Add missing pathing information to frontend

This commit is contained in:
Jack Kingsman
2026-01-18 14:17:49 -08:00
parent b13af6433d
commit 13220c4a8f
19 changed files with 281 additions and 40 deletions
+1 -1
View File
@@ -188,7 +188,7 @@ messages (
text TEXT NOT NULL,
sender_timestamp INTEGER,
received_at INTEGER NOT NULL,
path_len INTEGER,
path TEXT, -- Hex-encoded routing path (2 chars per hop), null for outgoing
txt_type INTEGER DEFAULT 0,
signature TEXT,
outgoing INTEGER DEFAULT 0,
+1 -1
View File
@@ -37,7 +37,7 @@ CREATE TABLE IF NOT EXISTS messages (
text TEXT NOT NULL,
sender_timestamp INTEGER,
received_at INTEGER NOT NULL,
path_len INTEGER,
path TEXT,
txt_type INTEGER DEFAULT 0,
signature TEXT,
outgoing INTEGER DEFAULT 0,
+2 -2
View File
@@ -80,7 +80,7 @@ async def on_contact_message(event: "Event") -> None:
conversation_key=sender_pubkey,
sender_timestamp=payload.get("sender_timestamp"),
received_at=received_at,
path_len=payload.get("path_len"),
path=payload.get("path"),
txt_type=payload.get("txt_type", 0),
signature=payload.get("signature"),
)
@@ -100,7 +100,7 @@ async def on_contact_message(event: "Event") -> None:
"text": payload.get("text", ""),
"sender_timestamp": payload.get("sender_timestamp"),
"received_at": received_at,
"path_len": payload.get("path_len"),
"path": payload.get("path"),
"txt_type": payload.get("txt_type", 0),
"signature": payload.get("signature"),
"outgoing": False,
+166
View File
@@ -72,6 +72,20 @@ async def run_migrations(conn: aiosqlite.Connection) -> int:
await set_version(conn, 5)
applied += 1
# Migration 6: Replace path_len with path column in messages
if version < 6:
logger.info("Applying migration 6: replace path_len with path column")
await _migrate_006_replace_path_len_with_path(conn)
await set_version(conn, 6)
applied += 1
# Migration 7: Backfill path from raw_packets for decrypted messages
if version < 7:
logger.info("Applying migration 7: backfill path from raw_packets")
await _migrate_007_backfill_message_paths(conn)
await set_version(conn, 7)
applied += 1
if applied > 0:
logger.info(
"Applied %d migration(s), schema now at version %d", applied, await get_version(conn)
@@ -348,3 +362,155 @@ async def _migrate_005_backfill_payload_hashes(conn: aiosqlite.Connection) -> No
raise
await conn.commit()
async def _migrate_006_replace_path_len_with_path(conn: aiosqlite.Connection) -> None:
    """
    Replace the messages.path_len INTEGER column with a path TEXT column.

    The path column stores the hex-encoded routing path bytes: each hop is
    one byte, i.e. two hex characters, so the old hop count is derivable as
    len(path) // 2 and path_len becomes redundant.

    Both steps are written to be idempotent so a re-run is harmless:
    the ADD COLUMN tolerates the column already existing, and the DROP
    COLUMN tolerates the column already being gone. SQLite only supports
    ALTER TABLE DROP COLUMN from 3.35.0; on older builds the drop is
    silently skipped and the unused path_len column simply remains.

    Args:
        conn: Open aiosqlite connection; the change is committed on success.

    Raises:
        aiosqlite.OperationalError: For any ALTER failure other than the
            recognized benign cases above.
    """
    # First, add the new path column
    try:
        await conn.execute("ALTER TABLE messages ADD COLUMN path TEXT")
        logger.debug("Added path column to messages table")
    except aiosqlite.OperationalError as e:
        if "duplicate column name" in str(e).lower():
            # Column already present (e.g. a previous partial run) -- done.
            logger.debug("messages.path already exists, skipping")
        else:
            raise
    # Try to drop the old path_len column
    try:
        await conn.execute("ALTER TABLE messages DROP COLUMN path_len")
        logger.debug("Dropped path_len from messages table")
    except aiosqlite.OperationalError as e:
        # SQLite reports these conditions only via the error message text,
        # so classification has to match on substrings of the message.
        error_msg = str(e).lower()
        if "no such column" in error_msg:
            logger.debug("messages.path_len already dropped, skipping")
        elif "syntax error" in error_msg or "drop column" in error_msg:
            # SQLite version doesn't support DROP COLUMN - harmless, column stays
            logger.debug("SQLite doesn't support DROP COLUMN, path_len column will remain")
        else:
            raise
    await conn.commit()
def _extract_path_from_packet(raw_packet: bytes) -> str | None:
"""
Extract path hex string from a raw packet (migration-local copy of decoder logic).
Returns the path as a hex string, or None if packet is malformed.
"""
if len(raw_packet) < 2:
return None
try:
header = raw_packet[0]
route_type = header & 0x03
offset = 1
# Skip transport codes if present (TRANSPORT_FLOOD=0, TRANSPORT_DIRECT=3)
if route_type in (0x00, 0x03):
if len(raw_packet) < offset + 4:
return None
offset += 4
# Get path length
if len(raw_packet) < offset + 1:
return None
path_length = raw_packet[offset]
offset += 1
# Extract path bytes
if len(raw_packet) < offset + path_length:
return None
path_bytes = raw_packet[offset : offset + path_length]
return path_bytes.hex()
except (IndexError, ValueError):
return None
async def _migrate_007_backfill_message_paths(conn: aiosqlite.Connection) -> None:
    """
    Backfill the messages.path column from linked raw packets.

    For each incoming message (outgoing=0) with path still NULL that has a
    raw_packets row linked via raw_packets.message_id, extract the routing
    path from the stored packet bytes and write it to the message.

    Outgoing messages are excluded since they carry no meaningful path
    data. Messages whose packet is malformed (extraction returns None)
    are left untouched and will be re-examined if the migration re-runs.

    Args:
        conn: Open aiosqlite connection; all updates are committed in a
            single transaction at the end.
    """
    # Get count of messages that need backfill (used only for progress logging).
    cursor = await conn.execute(
        """
        SELECT COUNT(*)
        FROM messages m
        JOIN raw_packets rp ON rp.message_id = m.id
        WHERE m.path IS NULL AND m.outgoing = 0
        """
    )
    row = await cursor.fetchone()
    total = row[0] if row else 0
    if total == 0:
        logger.debug("No messages need path backfill")
        return
    logger.info("Backfilling path for %d messages. This may take a while...", total)
    # Process in batches to bound memory use on large databases.
    batch_size = 1000
    processed = 0
    updated = 0
    cursor = await conn.execute(
        """
        SELECT m.id, rp.data
        FROM messages m
        JOIN raw_packets rp ON rp.message_id = m.id
        WHERE m.path IS NULL AND m.outgoing = 0
        ORDER BY m.id ASC
        """
    )
    # Collect updates first; they are applied only after the SELECT cursor
    # is fully drained, so the read is not interleaved with writes.
    updates: list[tuple[str, int]] = []  # (path, message_id)
    while True:
        rows = await cursor.fetchmany(batch_size)
        if not rows:
            break
        for row in rows:
            message_id = row[0]
            # rp.data may come back as a buffer type; normalize to bytes.
            packet_data = bytes(row[1])
            path_hex = _extract_path_from_packet(packet_data)
            if path_hex is not None:
                updates.append((path_hex, message_id))
            processed += 1
            if processed % 10000 == 0:
                logger.info("Processed %d/%d messages...", processed, total)
    # Apply updates in batches
    if updates:
        logger.info("Updating %d messages with path data...", len(updates))
        for idx, (path_hex, message_id) in enumerate(updates, 1):
            await conn.execute(
                "UPDATE messages SET path = ? WHERE id = ?",
                (path_hex, message_id),
            )
            updated += 1
            if idx % 10000 == 0:
                logger.info("Updated %d/%d messages...", idx, len(updates))
    # Single commit keeps the backfill atomic with respect to crashes.
    await conn.commit()
    logger.info("Path backfill complete: %d messages updated", updated)
+1 -1
View File
@@ -74,7 +74,7 @@ class Message(BaseModel):
text: str
sender_timestamp: int | None = None
received_at: int
path_len: int | None = None
path: str | None = Field(default=None, description="Hex-encoded routing path (2 chars per hop)")
txt_type: int = 0
signature: str | None = None
outgoing: bool = False
+5 -4
View File
@@ -48,7 +48,7 @@ async def create_message_from_decrypted(
message_text: str,
timestamp: int,
received_at: int | None = None,
path_len: int | None = None,
path: str | None = None,
) -> int | None:
"""Create a message record from decrypted channel packet content.
@@ -62,7 +62,7 @@ async def create_message_from_decrypted(
message_text: The decrypted message content
timestamp: Sender timestamp from the packet
received_at: When the packet was received (defaults to now)
path_len: Path length from packet routing (None for historical decryption)
path: Hex-encoded routing path (None for historical decryption)
Returns the message ID if created, None if duplicate.
"""
@@ -83,6 +83,7 @@ async def create_message_from_decrypted(
conversation_key=channel_key_normalized,
sender_timestamp=timestamp,
received_at=received,
path=path,
)
if msg_id is None:
@@ -111,7 +112,7 @@ async def create_message_from_decrypted(
"text": text,
"sender_timestamp": timestamp,
"received_at": received,
"path_len": path_len,
"path": path,
"txt_type": 0,
"signature": None,
"outgoing": False,
@@ -283,7 +284,7 @@ async def _process_group_text(
message_text=decrypted.message,
timestamp=decrypted.timestamp,
received_at=timestamp,
path_len=packet_info.path_length if packet_info else None,
path=packet_info.path.hex() if packet_info else None,
)
return {
+5 -5
View File
@@ -293,7 +293,7 @@ class MessageRepository:
received_at: int,
conversation_key: str,
sender_timestamp: int | None = None,
path_len: int | None = None,
path: str | None = None,
txt_type: int = 0,
signature: str | None = None,
outgoing: bool = False,
@@ -307,7 +307,7 @@ class MessageRepository:
cursor = await db.conn.execute(
"""
INSERT OR IGNORE INTO messages (type, conversation_key, text, sender_timestamp,
received_at, path_len, txt_type, signature, outgoing)
received_at, path, txt_type, signature, outgoing)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
@@ -316,7 +316,7 @@ class MessageRepository:
text,
sender_timestamp,
received_at,
path_len,
path,
txt_type,
signature,
outgoing,
@@ -359,7 +359,7 @@ class MessageRepository:
text=row["text"],
sender_timestamp=row["sender_timestamp"],
received_at=row["received_at"],
path_len=row["path_len"],
path=row["path"],
txt_type=row["txt_type"],
signature=row["signature"],
outgoing=bool(row["outgoing"]),
@@ -419,7 +419,7 @@ class MessageRepository:
text=row["text"],
sender_timestamp=row["sender_timestamp"],
received_at=row["received_at"],
path_len=row["path_len"],
path=row["path"],
txt_type=row["txt_type"],
signature=row["signature"],
outgoing=bool(row["outgoing"]),
+6 -1
View File
@@ -5,7 +5,7 @@ from fastapi import APIRouter, BackgroundTasks
from pydantic import BaseModel, Field
from app.database import db
from app.decoder import try_decrypt_packet_with_channel_key
from app.decoder import parse_packet, try_decrypt_packet_with_channel_key
from app.packet_processor import create_message_from_decrypted
from app.repository import RawPacketRepository
@@ -65,6 +65,10 @@ async def _run_historical_decryption(channel_key_bytes: bytes, channel_key_hex:
result.message[:50] if result.message else "",
)
# Extract path from the raw packet for storage
packet_info = parse_packet(packet_data)
path_hex = packet_info.path.hex() if packet_info else None
msg_id = await create_message_from_decrypted(
packet_id=packet_id,
channel_key=channel_key_hex,
@@ -72,6 +76,7 @@ async def _run_historical_decryption(channel_key_bytes: bytes, channel_key_hex:
message_text=result.message,
timestamp=result.timestamp,
received_at=packet_timestamp, # Use original packet timestamp for correct ordering
path=path_hex,
)
if msg_id is not None:
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
+1 -1
View File
@@ -13,7 +13,7 @@
<link rel="shortcut icon" href="/favicon.ico" />
<link rel="apple-touch-icon" sizes="180x180" href="/apple-touch-icon.png" />
<link rel="manifest" href="/site.webmanifest" />
<script type="module" crossorigin src="/assets/index-5vg6E5ET.js"></script>
<script type="module" crossorigin src="/assets/index-DRuKVg0T.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-D9NEpiho.css">
</head>
<body>
+1 -1
View File
@@ -100,7 +100,7 @@ function createLocalMessage(conversationKey: string, text: string, outgoing: boo
text,
sender_timestamp: now,
received_at: now,
path_len: null,
path: null,
txt_type: 0,
signature: null,
outgoing,
+1 -1
View File
@@ -99,7 +99,7 @@ function createLocalMessage(
text,
sender_timestamp: now,
received_at: now,
path_len: null,
path: null,
txt_type: 0,
signature: null,
outgoing,
+2 -2
View File
@@ -268,7 +268,7 @@ describe('Integration: ACK Events', () => {
text: 'Hello',
sender_timestamp: 1700000000,
received_at: 1700000000,
path_len: null,
path: null,
txt_type: 0,
signature: null,
outgoing: true,
@@ -301,7 +301,7 @@ describe('Integration: ACK Events', () => {
text: 'Hello',
sender_timestamp: 1700000000,
received_at: 1700000000,
path_len: null,
path: null,
txt_type: 0,
signature: null,
outgoing: true,
+1 -1
View File
@@ -72,7 +72,7 @@ describe('shouldIncrementUnread', () => {
text: 'Test',
sender_timestamp: null,
received_at: Date.now(),
path_len: null,
path: null,
txt_type: 0,
signature: null,
outgoing: false,
@@ -16,7 +16,7 @@ function createMessage(overrides: Partial<Message> = {}): Message {
text: 'Hello world',
sender_timestamp: 1700000000,
received_at: 1700000001,
path_len: null,
path: null,
txt_type: 0,
signature: null,
outgoing: false,
+2 -1
View File
@@ -82,7 +82,8 @@ export interface Message {
text: string;
sender_timestamp: number | null;
received_at: number;
path_len: number | null;
/** Hex-encoded routing path (2 chars per hop). Null for outgoing messages. */
path: string | null;
txt_type: number;
signature: string | null;
outgoing: boolean;
+75 -7
View File
@@ -78,13 +78,30 @@ class TestMigration001:
)
""")
await conn.execute("CREATE INDEX idx_raw_packets_decrypted ON raw_packets(decrypted)")
# Messages table with old schema (for migrations 6 and 7)
await conn.execute("""
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL,
conversation_key TEXT NOT NULL,
text TEXT NOT NULL,
sender_timestamp INTEGER,
received_at INTEGER NOT NULL,
path_len INTEGER,
txt_type INTEGER DEFAULT 0,
signature TEXT,
outgoing INTEGER DEFAULT 0,
acked INTEGER DEFAULT 0,
UNIQUE(type, conversation_key, text, sender_timestamp)
)
""")
await conn.commit()
# Run migrations
applied = await run_migrations(conn)
assert applied == 5 # All 5 migrations run
assert await get_version(conn) == 5
assert applied == 7 # All 7 migrations run
assert await get_version(conn) == 7
# Verify columns exist by inserting and selecting
await conn.execute(
@@ -143,15 +160,32 @@ class TestMigration001:
)
""")
await conn.execute("CREATE INDEX idx_raw_packets_decrypted ON raw_packets(decrypted)")
# Messages table with old schema (for migrations 6 and 7)
await conn.execute("""
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL,
conversation_key TEXT NOT NULL,
text TEXT NOT NULL,
sender_timestamp INTEGER,
received_at INTEGER NOT NULL,
path_len INTEGER,
txt_type INTEGER DEFAULT 0,
signature TEXT,
outgoing INTEGER DEFAULT 0,
acked INTEGER DEFAULT 0,
UNIQUE(type, conversation_key, text, sender_timestamp)
)
""")
await conn.commit()
# Run migrations twice
applied1 = await run_migrations(conn)
applied2 = await run_migrations(conn)
assert applied1 == 5 # All 5 migrations run
assert applied1 == 7 # All 7 migrations run
assert applied2 == 0 # No migrations on second run
assert await get_version(conn) == 5
assert await get_version(conn) == 7
finally:
await conn.close()
@@ -189,14 +223,31 @@ class TestMigration001:
)
""")
await conn.execute("CREATE INDEX idx_raw_packets_decrypted ON raw_packets(decrypted)")
# Messages table with old schema (for migrations 6 and 7)
await conn.execute("""
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL,
conversation_key TEXT NOT NULL,
text TEXT NOT NULL,
sender_timestamp INTEGER,
received_at INTEGER NOT NULL,
path_len INTEGER,
txt_type INTEGER DEFAULT 0,
signature TEXT,
outgoing INTEGER DEFAULT 0,
acked INTEGER DEFAULT 0,
UNIQUE(type, conversation_key, text, sender_timestamp)
)
""")
await conn.commit()
# Run migrations - should not fail
applied = await run_migrations(conn)
# All 5 migrations applied (version incremented) but no error
assert applied == 5
assert await get_version(conn) == 5
# All 7 migrations applied (version incremented) but no error
assert applied == 7
assert await get_version(conn) == 7
finally:
await conn.close()
@@ -234,6 +285,23 @@ class TestMigration001:
)
""")
await conn.execute("CREATE INDEX idx_raw_packets_decrypted ON raw_packets(decrypted)")
# Messages table with old schema (for migrations 6 and 7)
await conn.execute("""
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL,
conversation_key TEXT NOT NULL,
text TEXT NOT NULL,
sender_timestamp INTEGER,
received_at INTEGER NOT NULL,
path_len INTEGER,
txt_type INTEGER DEFAULT 0,
signature TEXT,
outgoing INTEGER DEFAULT 0,
acked INTEGER DEFAULT 0,
UNIQUE(type, conversation_key, text, sender_timestamp)
)
""")
await conn.execute(
"INSERT INTO contacts (public_key, name, type) VALUES (?, ?, ?)",
("existingkey", "ExistingContact", 1),
+6 -6
View File
@@ -430,7 +430,7 @@ class TestCreateMessageFromDecrypted:
assert broadcast["text"] == "TestSender: Hello world"
assert broadcast["sender_timestamp"] == 1700000000
assert broadcast["received_at"] == 1700000001
assert broadcast["path_len"] is None # Historical decryption has no path info
assert broadcast["path"] is None # Historical decryption has no path info
assert broadcast["outgoing"] is False
assert broadcast["acked"] == 0
@@ -529,8 +529,8 @@ class TestMessageBroadcastStructure:
"""Test that message broadcasts have the correct structure for frontend."""
@pytest.mark.asyncio
async def test_realtime_broadcast_includes_path_len(self, test_db, captured_broadcasts):
"""Real-time packet processing includes path_len in broadcast."""
async def test_realtime_broadcast_includes_path(self, test_db, captured_broadcasts):
"""Real-time packet processing includes path in broadcast."""
from app.packet_processor import process_raw_packet
fixture = FIXTURES["channel_message"]
@@ -550,9 +550,9 @@ class TestMessageBroadcastStructure:
assert len(message_broadcasts) == 1
broadcast = message_broadcasts[0]["data"]
# Real-time processing extracts path_len from packet (flood packets have path_len=0)
assert "path_len" in broadcast
# The test packet is a flood packet, so path_len should be 0 or None depending on packet structure
# Real-time processing extracts path from packet (flood packets have empty path)
assert "path" in broadcast
# The test packet is a flood packet, so path should be empty string ""
class TestRawPacketStorage: