diff --git a/AGENTS.md b/AGENTS.md index d77d19d..ba3786b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -371,6 +371,7 @@ All endpoints are prefixed with `/api` (e.g., `/api/health`). | POST | `/api/messages/channel/{message_id}/resend` | Resend channel message (default: byte-perfect within 30s; `?new_timestamp=true`: fresh timestamp, no time limit, creates new message row) | | GET | `/api/packets/undecrypted/count` | Count of undecrypted packets | | GET | `/api/packets/{packet_id}` | Fetch one stored raw packet by row ID for on-demand inspection | +| POST | `/api/packets/region-backfill` | Re-resolve region scope for stored channel messages with retained raw packets | | POST | `/api/packets/decrypt/historical` | Decrypt stored packets | | POST | `/api/packets/maintenance` | Delete old packets and vacuum | | GET | `/api/read-state/unreads` | Server-computed unread counts, mentions, last message times, and `last_read_ats` boundaries | diff --git a/app/AGENTS.md b/app/AGENTS.md index a8ec381..dbdb032 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -263,6 +263,7 @@ Web Push is a standalone subsystem in `app/push/`, separate from the fanout modu ### Packets - `GET /packets/undecrypted/count` +- `POST /packets/region-backfill` — re-resolve region scope for stored channel messages that still have a retained raw packet (region is otherwise only tagged at ingest); returns `{scanned, scoped, named}` - `GET /packets/{packet_id}` — fetch one stored raw packet by row ID for on-demand inspection - `POST /packets/decrypt/historical` - `POST /packets/maintenance` diff --git a/app/repository/messages.py b/app/repository/messages.py index c0eeafb..cd66549 100644 --- a/app/repository/messages.py +++ b/app/repository/messages.py @@ -1,6 +1,7 @@ import json import re import time +from collections.abc import AsyncIterator from dataclasses import dataclass from typing import Any @@ -632,6 +633,50 @@ class MessageRepository: async with conn.execute("DELETE FROM messages WHERE id = ?", (message_id,)): pass + @staticmethod + async def stream_chan_messages_with_raw( + batch_size: int = 500, + ) -> "AsyncIterator[tuple[int, bytes]]": + """Yield (message_id, raw_packet_bytes) for CHAN messages that still have a + retained raw packet, in ascending id batches. + + Used by the region backfill: region is a property of the on-air payload, so + any retained raw packet for the message yields the same transport code. + """ + last_id = 0 + while True: + async with db.readonly() as conn: + async with conn.execute( + """ + SELECT m.id AS mid, rp.data AS data + FROM messages m + JOIN raw_packets rp ON rp.message_id = m.id + WHERE m.type = 'CHAN' AND m.id > ? + GROUP BY m.id + ORDER BY m.id ASC + LIMIT ? + """, + (last_id, batch_size), + ) as cursor: + rows = await cursor.fetchall() + if not rows: + return + for row in rows: + yield row["mid"], bytes(row["data"]) + last_id = row["mid"] + + @staticmethod + async def set_transport_scope( + message_id: int, transport_code: int | None, region: str | None + ) -> None: + """Set the resolved transport code / region on a stored message.""" + async with db.tx() as conn: + async with conn.execute( + "UPDATE messages SET transport_code = ?, region = ? WHERE id = ?", + (transport_code, region, message_id), + ): + pass + @staticmethod async def get_by_content( msg_type: str, diff --git a/app/routers/packets.py b/app/routers/packets.py index fc27ba7..a80f847 100644 --- a/app/routers/packets.py +++ b/app/routers/packets.py @@ -17,6 +17,7 @@ from app.repository import ( MessageRepository, RawPacketRepository, ) +from app.services.messages import backfill_message_regions from app.websocket import broadcast_success logger = logging.getLogger(__name__) @@ -128,6 +129,19 @@ async def get_undecrypted_count() -> dict: return {"count": count} +@router.post("/region-backfill") +async def backfill_regions() -> dict: + """Re-resolve region scope for stored channel messages that still have a raw packet. + + Region tagging normally happens at ingest, so messages stored before the feature + (or before a region name was added to ``known_regions``) have no region. This + recomputes them. Messages whose raw packet was already purged cannot be + re-evaluated. Clients should refetch the conversation to see updated badges. + """ + known_regions = (await AppSettingsRepository.get()).known_regions + return await backfill_message_regions(known_regions) + + @router.get("/{packet_id}", response_model=RawPacketDetail) async def get_raw_packet(packet_id: int) -> RawPacketDetail: """Fetch one stored raw packet by row ID for on-demand inspection.""" diff --git a/app/routers/settings.py b/app/routers/settings.py index b707c37..1100786 100644 --- a/app/routers/settings.py +++ b/app/routers/settings.py @@ -221,6 +221,7 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings: # Known regions for scope decoding. Normalize to user-facing form (no leading # '#'), trim blanks, and dedupe case-insensitively while preserving order. + known_regions_changed = False if update.known_regions is not None: cleaned_regions: list[str] = [] seen_regions: set[str] = set() @@ -231,6 +232,8 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings: if name and name.lower() not in seen_regions: seen_regions.add(name.lower()) cleaned_regions.append(name) + current = await AppSettingsRepository.get() + known_regions_changed = cleaned_regions != current.known_regions kwargs["known_regions"] = cleaned_regions # Block lists @@ -290,6 +293,15 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings: except Exception as e: logger.warning("Failed to apply flood_scope to radio: %s", e) + # Retroactively tag stored messages when the region list changed. Runs in + # the background since it walks every channel message with a retained raw + # packet; clients refetch conversations to see updated badges. + if known_regions_changed: + from app.services.messages import backfill_message_regions + + logger.info("known_regions changed; scheduling region backfill") + asyncio.create_task(backfill_message_regions(result.known_regions)) + return result return await AppSettingsRepository.get() diff --git a/app/services/messages.py b/app/services/messages.py index 47c13bf..9e37881 100644 --- a/app/services/messages.py +++ b/app/services/messages.py @@ -360,6 +360,39 @@ async def create_message_from_decrypted( return msg_id +async def backfill_message_regions(known_regions: list[str]) -> dict[str, int]: + """Re-resolve region scope for stored channel messages that still have a raw packet. + + Region is normally resolved at ingest, so messages stored before the feature + existed (or before a region was added to the list) carry no region. This walks + every CHAN message that still has a retained raw packet, recomputes its + transport code, and persists the resolved region. + + Messages whose raw packet has been purged cannot be re-evaluated. + + Returns counts: ``scanned`` (messages examined), ``scoped`` (transport-routed), + and ``named`` (matched a known region). + """ + from app.path_utils import parse_packet_envelope + from app.region_resolver import resolve_region + + scanned = scoped = named = 0 + async for message_id, raw_bytes in MessageRepository.stream_chan_messages_with_raw(): + scanned += 1 + env = parse_packet_envelope(raw_bytes) + if env is None or env.transport_codes is None: + continue + scoped += 1 + transport_code = env.transport_codes[0] + region = resolve_region(int(env.payload_type), env.payload, transport_code, known_regions) + if region: + named += 1 + await MessageRepository.set_transport_scope(message_id, transport_code, region) + + logger.info("Region backfill complete: scanned=%d scoped=%d named=%d", scanned, scoped, named) + return {"scanned": scanned, "scoped": scoped, "named": named} + + async def create_dm_message_from_decrypted( *, packet_id: int, diff --git a/frontend/src/components/settings/SettingsRadioSection.tsx b/frontend/src/components/settings/SettingsRadioSection.tsx index 07d79cb..fc1cd2c 100644 --- a/frontend/src/components/settings/SettingsRadioSection.tsx +++ b/frontend/src/components/settings/SettingsRadioSection.tsx @@ -1181,8 +1181,8 @@ export function SettingsRadioSection({
One region name per line. Incoming region-scoped (TransportFlood/TransportDirect) packets are matched against this list so messages and the packet inspector show a readable region - label instead of a raw transport code. The list is seeded from your channels' regions and - can be edited freely. + label instead of a raw transport code. Saving a change re-tags existing messages whose + original packet is still stored.
diff --git a/tests/test_packets_router.py b/tests/test_packets_router.py index 2b38b01..c0d0796 100644 --- a/tests/test_packets_router.py +++ b/tests/test_packets_router.py @@ -56,6 +56,17 @@ class TestUndecryptedCount: assert response.json()["count"] == 3 +class TestRegionBackfill: + """Test POST /api/packets/region-backfill.""" + + @pytest.mark.asyncio + async def test_returns_zero_counts_on_empty_db(self, test_db, client): + response = await client.post("/api/packets/region-backfill") + + assert response.status_code == 200 + assert response.json() == {"scanned": 0, "scoped": 0, "named": 0} + + class TestGetRawPacket: """Test GET /api/packets/{id}.""" diff --git a/tests/test_region_resolver.py b/tests/test_region_resolver.py index 1c7172a..8ebfc7c 100644 --- a/tests/test_region_resolver.py +++ b/tests/test_region_resolver.py @@ -174,6 +174,43 @@ class TestRegionPersistedOnChannelMessage: assert messages[0].transport_code == code assert messages[0].region is None + @pytest.mark.asyncio + async def test_backfill_tags_messages_ingested_before_region_was_known( + self, test_db, captured_broadcasts + ): + from app.packet_processor import process_raw_packet + from app.repository import AppSettingsRepository, ChannelRepository, MessageRepository + from app.services.messages import backfill_message_regions + + fixture = FIXTURES["channel_message"] + await ChannelRepository.upsert( + key=fixture["channel_key_hex"].upper(), name=fixture["channel_name"], is_hashtag=True + ) + # Ingest a region-scoped message while the region is NOT yet in the list. + await AppSettingsRepository.update(known_regions=[]) + packet_bytes, code = _build_transport_channel_packet("nl-gr") + _, mock_broadcast = captured_broadcasts + with patch("app.packet_processor.broadcast_event", mock_broadcast): + await process_raw_packet(packet_bytes, timestamp=1700000000) + + messages = await MessageRepository.get_all( + msg_type="CHAN", conversation_key=fixture["channel_key_hex"].upper(), limit=10 + ) + assert len(messages) == 1 + # transport_code is set at ingest, but region is unresolved (empty list). + assert messages[0].transport_code == code + assert messages[0].region is None + + # Operator adds the region and runs the backfill. + result = await backfill_message_regions(["nl-gr"]) + assert result["named"] == 1 + + refreshed = await MessageRepository.get_all( + msg_type="CHAN", conversation_key=fixture["channel_key_hex"].upper(), limit=10 + ) + assert refreshed[0].region == "nl-gr" + assert refreshed[0].transport_code == code + @pytest.mark.asyncio async def test_plain_flood_message_is_unscoped(self, test_db, captured_broadcasts): from app.packet_processor import process_raw_packet diff --git a/tests/test_settings_router.py b/tests/test_settings_router.py index 30e8468..50c59b3 100644 --- a/tests/test_settings_router.py +++ b/tests/test_settings_router.py @@ -104,6 +104,33 @@ class TestUpdateSettings: ) assert result.known_regions == ["nl-gr", "DE-BY"] + @pytest.mark.asyncio + async def test_known_regions_change_triggers_backfill(self, test_db): + """Changing the region list schedules a background message re-tag.""" + import asyncio + + with patch( + "app.services.messages.backfill_message_regions", new_callable=AsyncMock + ) as mock_backfill: + await update_settings(AppSettingsUpdate(known_regions=["nl-gr", "de-by"])) + await asyncio.sleep(0) # let the scheduled task run + + mock_backfill.assert_awaited_once_with(["nl-gr", "de-by"]) + + @pytest.mark.asyncio + async def test_known_regions_unchanged_skips_backfill(self, test_db): + """Saving the same region list does not re-run the backfill.""" + import asyncio + + await update_settings(AppSettingsUpdate(known_regions=["nl-gr"])) + with patch( + "app.services.messages.backfill_message_regions", new_callable=AsyncMock + ) as mock_backfill: + await update_settings(AppSettingsUpdate(known_regions=["#nl-gr"])) # same after cleanup + await asyncio.sleep(0) + + mock_backfill.assert_not_awaited() + @pytest.mark.asyncio async def test_flood_scope_applies_to_radio(self, test_db): """When radio is connected, setting flood_scope calls set_flood_scope on radio."""