Enhance PacketRouter and CompanionFrameServer for improved packet delivery and contact persistence

- Updated PacketRouter to deliver packets to all companion bridges when the destination is not recognized, ensuring better handling of ephemeral destinations.
- Refactored CompanionFrameServer to separate contact serialization and persistence logic, allowing for non-blocking database operations.
- Introduced a unique index for companion contacts in SQLite to support upsert functionality, enhancing data integrity and performance.
- Improved AdvertHelper to run database operations in a separate thread, preventing event loop blocking and maintaining responsiveness.
This commit is contained in:
agessaman
2026-02-24 21:51:56 -08:00
parent 27bbaf80ac
commit 789a2f27ea
4 changed files with 152 additions and 52 deletions
+48 -28
View File
@@ -83,37 +83,53 @@ class CompanionFrameServer(_BaseFrameServer):
path_len=msg_dict.get("path_len", 0),
)
def _save_contacts(self) -> None:
"""Persist contacts to SQLite."""
@staticmethod
def _contact_to_dict(c) -> dict:
"""Convert a Contact object to a persistence dict."""
pk = c.public_key if isinstance(c.public_key, bytes) else bytes.fromhex(c.public_key)
return {
"pubkey": pk,
"name": c.name,
"adv_type": c.adv_type,
"flags": c.flags,
"out_path_len": c.out_path_len,
"out_path": (
c.out_path
if isinstance(c.out_path, bytes)
else (bytes.fromhex(c.out_path) if c.out_path else b"")
),
"last_advert_timestamp": c.last_advert_timestamp,
"lastmod": c.lastmod,
"gps_lat": c.gps_lat,
"gps_lon": c.gps_lon,
"sync_since": c.sync_since,
}
async def _persist_contact(self, contact) -> None:
"""Upsert a single contact to SQLite (non-blocking)."""
if not self.sqlite_handler:
return
contact_dict = self._contact_to_dict(contact)
await asyncio.to_thread(
self.sqlite_handler.companion_upsert_contact,
self.companion_hash,
contact_dict,
)
async def _save_contacts(self) -> None:
"""Persist all contacts to SQLite (non-blocking)."""
if not self.sqlite_handler:
return
contacts = self.bridge.get_contacts()
dicts = []
for c in contacts:
pk = c.public_key if isinstance(c.public_key, bytes) else bytes.fromhex(c.public_key)
dicts.append(
{
"pubkey": pk,
"name": c.name,
"adv_type": c.adv_type,
"flags": c.flags,
"out_path_len": c.out_path_len,
"out_path": (
c.out_path
if isinstance(c.out_path, bytes)
else (bytes.fromhex(c.out_path) if c.out_path else b"")
),
"last_advert_timestamp": c.last_advert_timestamp,
"lastmod": c.lastmod,
"gps_lat": c.gps_lat,
"gps_lon": c.gps_lon,
"sync_since": c.sync_since,
}
)
self.sqlite_handler.companion_save_contacts(self.companion_hash, dicts)
dicts = [self._contact_to_dict(c) for c in contacts]
await asyncio.to_thread(
self.sqlite_handler.companion_save_contacts,
self.companion_hash,
dicts,
)
def _save_channels(self) -> None:
"""Persist channels to SQLite."""
async def _save_channels(self) -> None:
"""Persist channels to SQLite (non-blocking)."""
if not self.sqlite_handler:
return
channels = []
@@ -128,4 +144,8 @@ class CompanionFrameServer(_BaseFrameServer):
"secret": ch.secret,
}
)
self.sqlite_handler.companion_save_channels(self.companion_hash, channels)
await asyncio.to_thread(
self.sqlite_handler.companion_save_channels,
self.companion_hash,
channels,
)
@@ -381,6 +381,29 @@ class SQLiteHandler:
)
logger.info(f"Migration '{migration_name}' applied successfully")
# Migration 5: Add UNIQUE index on companion_contacts(companion_hash, pubkey)
# Required for ON CONFLICT upsert in companion_upsert_contact.
migration_name = "unique_companion_contacts_pubkey"
existing = conn.execute(
"SELECT migration_name FROM migrations WHERE migration_name = ?",
(migration_name,),
).fetchone()
if not existing:
# Replace the non-unique index with a UNIQUE one
conn.execute(
"DROP INDEX IF EXISTS idx_companion_contacts_pubkey"
)
conn.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_companion_contacts_hash_pubkey "
"ON companion_contacts (companion_hash, pubkey)"
)
conn.execute(
"INSERT INTO migrations (migration_name, applied_at) VALUES (?, ?)",
(migration_name, time.time()),
)
logger.info(f"Migration '{migration_name}' applied successfully")
conn.commit()
except Exception as e:
@@ -1674,6 +1697,49 @@ class SQLiteHandler:
logger.error(f"Failed to save companion contacts: {e}")
return False
def companion_upsert_contact(self, companion_hash: str, contact: dict) -> bool:
    """Insert or update a single contact for a companion in storage.

    Relies on the UNIQUE index on ``(companion_hash, pubkey)`` for the
    ``ON CONFLICT`` upsert. Missing keys in *contact* fall back to neutral
    defaults (empty bytes, zeros, ``out_path_len`` -1).

    Returns True on success, False on any database error (logged, not raised).
    """
    try:
        with sqlite3.connect(self.sqlite_path) as db:
            now = time.time()
            params = (
                companion_hash,
                contact.get("pubkey", b""),
                contact.get("name", ""),
                contact.get("adv_type", 0),
                contact.get("flags", 0),
                contact.get("out_path_len", -1),
                contact.get("out_path", b""),
                contact.get("last_advert_timestamp", 0),
                contact.get("lastmod", 0),
                contact.get("gps_lat", 0.0),
                contact.get("gps_lon", 0.0),
                contact.get("sync_since", 0),
                now,
            )
            db.execute(
                """
                INSERT INTO companion_contacts
                (companion_hash, pubkey, name, adv_type, flags, out_path_len, out_path,
                 last_advert_timestamp, lastmod, gps_lat, gps_lon, sync_since, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(companion_hash, pubkey)
                DO UPDATE SET
                    name=excluded.name, adv_type=excluded.adv_type,
                    flags=excluded.flags, out_path_len=excluded.out_path_len,
                    out_path=excluded.out_path,
                    last_advert_timestamp=excluded.last_advert_timestamp,
                    lastmod=excluded.lastmod, gps_lat=excluded.gps_lat,
                    gps_lon=excluded.gps_lon, sync_since=excluded.sync_since,
                    updated_at=excluded.updated_at
                """,
                params,
            )
            db.commit()
    except Exception as e:
        logger.error(f"Failed to upsert companion contact: {e}")
        return False
    return True
def companion_load_channels(self, companion_hash: str) -> List[Dict]:
"""Load channels for a companion from storage."""
try:
+14 -4
View File
@@ -4,6 +4,7 @@ Advertisement packet handling helper for pyMC Repeater.
This module processes advertisement packets for neighbor tracking and discovery.
"""
import asyncio
import logging
import time
@@ -76,11 +77,16 @@ class AdvertHelper:
route_type = packet.header & PH_ROUTE_MASK
# Check if this is a new neighbor
# Check if this is a new neighbor (run DB read in thread to avoid blocking event loop)
current_time = time.time()
if pubkey not in self._known_neighbors:
# Only check database if not in cache
current_neighbors = self.storage.get_neighbors() if self.storage else {}
if self.storage:
current_neighbors = await asyncio.to_thread(
self.storage.get_neighbors
)
else:
current_neighbors = {}
is_new_neighbor = pubkey not in current_neighbors
if is_new_neighbor:
@@ -110,10 +116,14 @@ class AdvertHelper:
"zero_hop": zero_hop,
}
# Store to database
# Store to database (run in thread so event loop stays responsive;
# blocking here can cause companion TCP clients to disconnect)
if self.storage:
try:
self.storage.record_advert(advert_record)
await asyncio.to_thread(
self.storage.record_advert,
advert_record,
)
except Exception as e:
logger.error(f"Failed to store advert record: {e}")
+24 -20
View File
@@ -209,6 +209,20 @@ class PacketRouter:
if self._should_deliver_path_to_companions(packet):
await companion_bridges[dest_hash].process_received_packet(packet)
processed_by_injection = True
elif companion_bridges and self._should_deliver_path_to_companions(packet):
# Dest not in bridges: path-return with ephemeral dest (e.g. multi-hop login).
# Deliver to all bridges; each will try to decrypt and ignore if not relevant.
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge PATH error: {e}")
logger.debug(
"PATH dest=0x%02x (anon) delivered to %d bridge(s) for matching",
dest_hash or 0,
len(companion_bridges),
)
processed_by_injection = True
elif self.daemon.path_helper:
await self.daemon.path_helper.process_path_packet(packet)
@@ -243,29 +257,19 @@ class PacketRouter:
len(companion_bridges),
)
processed_by_injection = True
elif companion_bridges and len(companion_bridges) == 1:
# Single bridge and dest not in bridges: likely ANON_REQ response (dest = ephemeral
# sender hash). Deliver to the only bridge so telemetry/login responses reach the client.
(single_bridge_hash,) = companion_bridges.keys()
try:
await list(companion_bridges.values())[0].process_received_packet(packet)
logger.info(
"RESPONSE dest=0x%02x (anon) delivered to sole companion bridge 0x%02x",
dest_hash or 0,
single_bridge_hash,
)
except Exception as e:
logger.debug(f"Companion bridge RESPONSE (anon) error: {e}")
processed_by_injection = True
elif companion_bridges:
# Multiple bridges; cannot guess which one. Log and drop.
src_hash = packet.payload[1] if packet.payload and len(packet.payload) >= 2 else None
# Dest not in bridges and not local: likely ANON_REQ response (dest = ephemeral
# sender hash). Deliver to all bridges; each will try to decrypt and ignore if
# not relevant (firmware-like behavior, works with multiple companion bridges).
for bridge in companion_bridges.values():
try:
await bridge.process_received_packet(packet)
except Exception as e:
logger.debug(f"Companion bridge RESPONSE error: {e}")
logger.debug(
"RESPONSE dest=0x%02x src=0x%02x not for us (bridges %s, local=0x%02x)",
"RESPONSE dest=0x%02x (anon) delivered to %d bridge(s) for matching",
dest_hash or 0,
src_hash if src_hash is not None else 0,
[f"0x{h:02x}" for h in companion_bridges],
local_hash if local_hash is not None else 0,
len(companion_bridges),
)
processed_by_injection = True