Autobackfill regions

This commit is contained in:
Jack Kingsman
2026-06-20 21:48:16 -07:00
parent 06556a853d
commit 66148c2c39
10 changed files with 183 additions and 2 deletions
+1
View File
@@ -371,6 +371,7 @@ All endpoints are prefixed with `/api` (e.g., `/api/health`).
| POST | `/api/messages/channel/{message_id}/resend` | Resend channel message (default: byte-perfect within 30s; `?new_timestamp=true`: fresh timestamp, no time limit, creates new message row) |
| GET | `/api/packets/undecrypted/count` | Count of undecrypted packets |
| GET | `/api/packets/{packet_id}` | Fetch one stored raw packet by row ID for on-demand inspection |
| POST | `/api/packets/region-backfill` | Re-resolve region scope for stored channel messages with retained raw packets |
| POST | `/api/packets/decrypt/historical` | Decrypt stored packets |
| POST | `/api/packets/maintenance` | Delete old packets and vacuum |
| GET | `/api/read-state/unreads` | Server-computed unread counts, mentions, last message times, and `last_read_ats` boundaries |
+1
View File
@@ -263,6 +263,7 @@ Web Push is a standalone subsystem in `app/push/`, separate from the fanout modu
### Packets
- `GET /packets/undecrypted/count`
- `POST /packets/region-backfill` — re-resolve region scope for stored channel messages that still have a retained raw packet (region is otherwise only tagged at ingest); returns `{scanned, scoped, named}`
- `GET /packets/{packet_id}` — fetch one stored raw packet by row ID for on-demand inspection
- `POST /packets/decrypt/historical`
- `POST /packets/maintenance`
+45
View File
@@ -1,6 +1,7 @@
import json
import re
import time
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Any
@@ -632,6 +633,50 @@ class MessageRepository:
async with conn.execute("DELETE FROM messages WHERE id = ?", (message_id,)):
pass
@staticmethod
async def stream_chan_messages_with_raw(
batch_size: int = 500,
) -> "AsyncIterator[tuple[int, bytes]]":
"""Yield (message_id, raw_packet_bytes) for CHAN messages that still have a
retained raw packet, in ascending id batches.
Used by the region backfill: region is a property of the on-air payload, so
any retained raw packet for the message yields the same transport code.
"""
last_id = 0
while True:
async with db.readonly() as conn:
async with conn.execute(
"""
SELECT m.id AS mid, rp.data AS data
FROM messages m
JOIN raw_packets rp ON rp.message_id = m.id
WHERE m.type = 'CHAN' AND m.id > ?
GROUP BY m.id
ORDER BY m.id ASC
LIMIT ?
""",
(last_id, batch_size),
) as cursor:
rows = await cursor.fetchall()
if not rows:
return
for row in rows:
yield row["mid"], bytes(row["data"])
last_id = row["mid"]
@staticmethod
async def set_transport_scope(
message_id: int, transport_code: int | None, region: str | None
) -> None:
"""Set the resolved transport code / region on a stored message."""
async with db.tx() as conn:
async with conn.execute(
"UPDATE messages SET transport_code = ?, region = ? WHERE id = ?",
(transport_code, region, message_id),
):
pass
@staticmethod
async def get_by_content(
msg_type: str,
+14
View File
@@ -17,6 +17,7 @@ from app.repository import (
MessageRepository,
RawPacketRepository,
)
from app.services.messages import backfill_message_regions
from app.websocket import broadcast_success
logger = logging.getLogger(__name__)
@@ -128,6 +129,19 @@ async def get_undecrypted_count() -> dict:
return {"count": count}
@router.post("/region-backfill")
async def backfill_regions() -> dict:
"""Re-resolve region scope for stored channel messages that still have a raw packet.
Region tagging normally happens at ingest, so messages stored before the feature
(or before a region name was added to ``known_regions``) have no region. This
recomputes them. Messages whose raw packet was already purged cannot be
re-evaluated. Clients should refetch the conversation to see updated badges.
"""
known_regions = (await AppSettingsRepository.get()).known_regions
return await backfill_message_regions(known_regions)
@router.get("/{packet_id}", response_model=RawPacketDetail)
async def get_raw_packet(packet_id: int) -> RawPacketDetail:
"""Fetch one stored raw packet by row ID for on-demand inspection."""
+12
View File
@@ -221,6 +221,7 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings:
# Known regions for scope decoding. Normalize to user-facing form (no leading
# '#'), trim blanks, and dedupe case-insensitively while preserving order.
known_regions_changed = False
if update.known_regions is not None:
cleaned_regions: list[str] = []
seen_regions: set[str] = set()
@@ -231,6 +232,8 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings:
if name and name.lower() not in seen_regions:
seen_regions.add(name.lower())
cleaned_regions.append(name)
current = await AppSettingsRepository.get()
known_regions_changed = cleaned_regions != current.known_regions
kwargs["known_regions"] = cleaned_regions
# Block lists
@@ -290,6 +293,15 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings:
except Exception as e:
logger.warning("Failed to apply flood_scope to radio: %s", e)
# Retroactively tag stored messages when the region list changed. Runs in
# the background since it walks every channel message with a retained raw
# packet; clients refetch conversations to see updated badges.
if known_regions_changed:
from app.services.messages import backfill_message_regions
logger.info("known_regions changed; scheduling region backfill")
asyncio.create_task(backfill_message_regions(result.known_regions))
return result
return await AppSettingsRepository.get()
+33
View File
@@ -360,6 +360,39 @@ async def create_message_from_decrypted(
return msg_id
async def backfill_message_regions(known_regions: list[str]) -> dict[str, int]:
"""Re-resolve region scope for stored channel messages that still have a raw packet.
Region is normally resolved at ingest, so messages stored before the feature
existed (or before a region was added to the list) carry no region. This walks
every CHAN message that still has a retained raw packet, recomputes its
transport code, and persists the resolved region.
Messages whose raw packet has been purged cannot be re-evaluated.
Returns counts: ``scanned`` (messages examined), ``scoped`` (transport-routed),
and ``named`` (matched a known region).
"""
from app.path_utils import parse_packet_envelope
from app.region_resolver import resolve_region
scanned = scoped = named = 0
async for message_id, raw_bytes in MessageRepository.stream_chan_messages_with_raw():
scanned += 1
env = parse_packet_envelope(raw_bytes)
if env is None or env.transport_codes is None:
continue
scoped += 1
transport_code = env.transport_codes[0]
region = resolve_region(int(env.payload_type), env.payload, transport_code, known_regions)
if region:
named += 1
await MessageRepository.set_transport_scope(message_id, transport_code, region)
logger.info("Region backfill complete: scanned=%d scoped=%d named=%d", scanned, scoped, named)
return {"scanned": scanned, "scoped": scoped, "named": named}
async def create_dm_message_from_decrypted(
*,
packet_id: int,
@@ -1181,8 +1181,8 @@ export function SettingsRadioSection({
<p className="text-[0.8125rem] text-muted-foreground">
One region name per line. Incoming region-scoped (TransportFlood/TransportDirect) packets
are matched against this list so messages and the packet inspector show a readable region
label instead of a raw transport code. The list is seeded from your channels' regions and
can be edited freely.
label instead of a raw transport code. Saving a change re-tags existing messages whose
original packet is still stored.
</p>
</div>
+11
View File
@@ -56,6 +56,17 @@ class TestUndecryptedCount:
assert response.json()["count"] == 3
class TestRegionBackfill:
"""Test POST /api/packets/region-backfill."""
@pytest.mark.asyncio
async def test_returns_zero_counts_on_empty_db(self, test_db, client):
response = await client.post("/api/packets/region-backfill")
assert response.status_code == 200
assert response.json() == {"scanned": 0, "scoped": 0, "named": 0}
class TestGetRawPacket:
"""Test GET /api/packets/{id}."""
+37
View File
@@ -174,6 +174,43 @@ class TestRegionPersistedOnChannelMessage:
assert messages[0].transport_code == code
assert messages[0].region is None
@pytest.mark.asyncio
async def test_backfill_tags_messages_ingested_before_region_was_known(
self, test_db, captured_broadcasts
):
from app.packet_processor import process_raw_packet
from app.repository import AppSettingsRepository, ChannelRepository, MessageRepository
from app.services.messages import backfill_message_regions
fixture = FIXTURES["channel_message"]
await ChannelRepository.upsert(
key=fixture["channel_key_hex"].upper(), name=fixture["channel_name"], is_hashtag=True
)
# Ingest a region-scoped message while the region is NOT yet in the list.
await AppSettingsRepository.update(known_regions=[])
packet_bytes, code = _build_transport_channel_packet("nl-gr")
_, mock_broadcast = captured_broadcasts
with patch("app.packet_processor.broadcast_event", mock_broadcast):
await process_raw_packet(packet_bytes, timestamp=1700000000)
messages = await MessageRepository.get_all(
msg_type="CHAN", conversation_key=fixture["channel_key_hex"].upper(), limit=10
)
assert len(messages) == 1
# transport_code is set at ingest, but region is unresolved (empty list).
assert messages[0].transport_code == code
assert messages[0].region is None
# Operator adds the region and runs the backfill.
result = await backfill_message_regions(["nl-gr"])
assert result["named"] == 1
refreshed = await MessageRepository.get_all(
msg_type="CHAN", conversation_key=fixture["channel_key_hex"].upper(), limit=10
)
assert refreshed[0].region == "nl-gr"
assert refreshed[0].transport_code == code
@pytest.mark.asyncio
async def test_plain_flood_message_is_unscoped(self, test_db, captured_broadcasts):
from app.packet_processor import process_raw_packet
+27
View File
@@ -104,6 +104,33 @@ class TestUpdateSettings:
)
assert result.known_regions == ["nl-gr", "DE-BY"]
@pytest.mark.asyncio
async def test_known_regions_change_triggers_backfill(self, test_db):
"""Changing the region list schedules a background message re-tag."""
import asyncio
with patch(
"app.services.messages.backfill_message_regions", new_callable=AsyncMock
) as mock_backfill:
await update_settings(AppSettingsUpdate(known_regions=["nl-gr", "de-by"]))
await asyncio.sleep(0) # let the scheduled task run
mock_backfill.assert_awaited_once_with(["nl-gr", "de-by"])
@pytest.mark.asyncio
async def test_known_regions_unchanged_skips_backfill(self, test_db):
"""Saving the same region list does not re-run the backfill."""
import asyncio
await update_settings(AppSettingsUpdate(known_regions=["nl-gr"]))
with patch(
"app.services.messages.backfill_message_regions", new_callable=AsyncMock
) as mock_backfill:
await update_settings(AppSettingsUpdate(known_regions=["#nl-gr"])) # same after cleanup
await asyncio.sleep(0)
mock_backfill.assert_not_awaited()
@pytest.mark.asyncio
async def test_flood_scope_applies_to_radio(self, test_db):
"""When radio is connected, setting flood_scope calls set_flood_scope on radio."""