From 789a2f27ea231ff535f95f85d91e62e9d3228c26 Mon Sep 17 00:00:00 2001 From: agessaman Date: Tue, 24 Feb 2026 21:51:56 -0800 Subject: [PATCH] Enhance PacketRouter and CompanionFrameServer for improved packet delivery and contact persistence - Updated PacketRouter to deliver packets to all companion bridges when the destination is not recognized, ensuring better handling of ephemeral destinations. - Refactored CompanionFrameServer to separate contact serialization and persistence logic, allowing for non-blocking database operations. - Introduced a unique index for companion contacts in SQLite to support upsert functionality, enhancing data integrity and performance. - Improved AdvertHelper to run database operations in a separate thread, preventing event loop blocking and maintaining responsiveness. --- repeater/companion/frame_server.py | 76 +++++++++++++-------- repeater/data_acquisition/sqlite_handler.py | 66 ++++++++++++++++++ repeater/handler_helpers/advert.py | 18 +++-- repeater/packet_router.py | 44 ++++++------ 4 files changed, 152 insertions(+), 52 deletions(-) diff --git a/repeater/companion/frame_server.py b/repeater/companion/frame_server.py index a019702..499bbcf 100644 --- a/repeater/companion/frame_server.py +++ b/repeater/companion/frame_server.py @@ -83,37 +83,53 @@ class CompanionFrameServer(_BaseFrameServer): path_len=msg_dict.get("path_len", 0), ) - def _save_contacts(self) -> None: - """Persist contacts to SQLite.""" + @staticmethod + def _contact_to_dict(c) -> dict: + """Convert a Contact object to a persistence dict.""" + pk = c.public_key if isinstance(c.public_key, bytes) else bytes.fromhex(c.public_key) + return { + "pubkey": pk, + "name": c.name, + "adv_type": c.adv_type, + "flags": c.flags, + "out_path_len": c.out_path_len, + "out_path": ( + c.out_path + if isinstance(c.out_path, bytes) + else (bytes.fromhex(c.out_path) if c.out_path else b"") + ), + "last_advert_timestamp": c.last_advert_timestamp, + "lastmod": c.lastmod, + "gps_lat": c.gps_lat, + "gps_lon": c.gps_lon, + "sync_since": c.sync_since, + } + + async def _persist_contact(self, contact) -> None: + """Upsert a single contact to SQLite (non-blocking).""" + if not self.sqlite_handler: + return + contact_dict = self._contact_to_dict(contact) + await asyncio.to_thread( + self.sqlite_handler.companion_upsert_contact, + self.companion_hash, + contact_dict, + ) + + async def _save_contacts(self) -> None: + """Persist all contacts to SQLite (non-blocking).""" if not self.sqlite_handler: return contacts = self.bridge.get_contacts() - dicts = [] - for c in contacts: - pk = c.public_key if isinstance(c.public_key, bytes) else bytes.fromhex(c.public_key) - dicts.append( - { - "pubkey": pk, - "name": c.name, - "adv_type": c.adv_type, - "flags": c.flags, - "out_path_len": c.out_path_len, - "out_path": ( - c.out_path - if isinstance(c.out_path, bytes) - else (bytes.fromhex(c.out_path) if c.out_path else b"") - ), - "last_advert_timestamp": c.last_advert_timestamp, - "lastmod": c.lastmod, - "gps_lat": c.gps_lat, - "gps_lon": c.gps_lon, - "sync_since": c.sync_since, - } - ) - self.sqlite_handler.companion_save_contacts(self.companion_hash, dicts) + dicts = [self._contact_to_dict(c) for c in contacts] + await asyncio.to_thread( + self.sqlite_handler.companion_save_contacts, + self.companion_hash, + dicts, + ) - def _save_channels(self) -> None: - """Persist channels to SQLite.""" + async def _save_channels(self) -> None: + """Persist channels to SQLite (non-blocking).""" if not self.sqlite_handler: return channels = [] @@ -128,4 +144,8 @@ class CompanionFrameServer(_BaseFrameServer): "secret": ch.secret, } ) - self.sqlite_handler.companion_save_channels(self.companion_hash, channels) + await asyncio.to_thread( + self.sqlite_handler.companion_save_channels, + self.companion_hash, + channels, + ) diff --git a/repeater/data_acquisition/sqlite_handler.py b/repeater/data_acquisition/sqlite_handler.py index 812dbf6..c006cc1 100644 --- a/repeater/data_acquisition/sqlite_handler.py +++ b/repeater/data_acquisition/sqlite_handler.py @@ -381,6 +381,29 @@ class SQLiteHandler: ) logger.info(f"Migration '{migration_name}' applied successfully") + # Migration 5: Add UNIQUE index on companion_contacts(companion_hash, pubkey) + # Required for ON CONFLICT upsert in companion_upsert_contact. + migration_name = "unique_companion_contacts_pubkey" + existing = conn.execute( + "SELECT migration_name FROM migrations WHERE migration_name = ?", + (migration_name,), + ).fetchone() + + if not existing: + # Replace the non-unique index with a UNIQUE one + conn.execute( + "DROP INDEX IF EXISTS idx_companion_contacts_pubkey" + ) + conn.execute( + "CREATE UNIQUE INDEX IF NOT EXISTS idx_companion_contacts_hash_pubkey " + "ON companion_contacts (companion_hash, pubkey)" + ) + conn.execute( + "INSERT INTO migrations (migration_name, applied_at) VALUES (?, ?)", + (migration_name, time.time()), + ) + logger.info(f"Migration '{migration_name}' applied successfully") + conn.commit() except Exception as e: @@ -1674,6 +1697,49 @@ class SQLiteHandler: logger.error(f"Failed to save companion contacts: {e}") return False + def companion_upsert_contact(self, companion_hash: str, contact: dict) -> bool: + """Insert or update a single contact for a companion in storage.""" + try: + with sqlite3.connect(self.sqlite_path) as conn: + now = time.time() + conn.execute( + """ + INSERT INTO companion_contacts + (companion_hash, pubkey, name, adv_type, flags, out_path_len, out_path, + last_advert_timestamp, lastmod, gps_lat, gps_lon, sync_since, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(companion_hash, pubkey) + DO UPDATE SET + name=excluded.name, adv_type=excluded.adv_type, + flags=excluded.flags, out_path_len=excluded.out_path_len, + out_path=excluded.out_path, + last_advert_timestamp=excluded.last_advert_timestamp, + lastmod=excluded.lastmod, gps_lat=excluded.gps_lat, + gps_lon=excluded.gps_lon, sync_since=excluded.sync_since, + updated_at=excluded.updated_at + """, + ( + companion_hash, + contact.get("pubkey", b""), + contact.get("name", ""), + contact.get("adv_type", 0), + contact.get("flags", 0), + contact.get("out_path_len", -1), + contact.get("out_path", b""), + contact.get("last_advert_timestamp", 0), + contact.get("lastmod", 0), + contact.get("gps_lat", 0.0), + contact.get("gps_lon", 0.0), + contact.get("sync_since", 0), + now, + ), + ) + conn.commit() + return True + except Exception as e: + logger.error(f"Failed to upsert companion contact: {e}") + return False + def companion_load_channels(self, companion_hash: str) -> List[Dict]: """Load channels for a companion from storage.""" try: diff --git a/repeater/handler_helpers/advert.py b/repeater/handler_helpers/advert.py index 74354d5..3a6fc91 100644 --- a/repeater/handler_helpers/advert.py +++ b/repeater/handler_helpers/advert.py @@ -4,6 +4,7 @@ Advertisement packet handling helper for pyMC Repeater. This module processes advertisement packets for neighbor tracking and discovery. """ +import asyncio import logging import time @@ -76,11 +77,16 @@ class AdvertHelper: route_type = packet.header & PH_ROUTE_MASK - # Check if this is a new neighbor + # Check if this is a new neighbor (run DB read in thread to avoid blocking event loop) current_time = time.time() if pubkey not in self._known_neighbors: # Only check database if not in cache - current_neighbors = self.storage.get_neighbors() if self.storage else {} + if self.storage: + current_neighbors = await asyncio.to_thread( + self.storage.get_neighbors + ) + else: + current_neighbors = {} is_new_neighbor = pubkey not in current_neighbors if is_new_neighbor: @@ -110,10 +116,14 @@ class AdvertHelper: "zero_hop": zero_hop, } - # Store to database + # Store to database (run in thread so event loop stays responsive; + # blocking here can cause companion TCP clients to disconnect) if self.storage: try: - self.storage.record_advert(advert_record) + await asyncio.to_thread( + self.storage.record_advert, + advert_record, + ) except Exception as e: logger.error(f"Failed to store advert record: {e}") diff --git a/repeater/packet_router.py b/repeater/packet_router.py index 80343d1..843fc44 100644 --- a/repeater/packet_router.py +++ b/repeater/packet_router.py @@ -209,6 +209,20 @@ class PacketRouter: if self._should_deliver_path_to_companions(packet): await companion_bridges[dest_hash].process_received_packet(packet) processed_by_injection = True + elif companion_bridges and self._should_deliver_path_to_companions(packet): + # Dest not in bridges: path-return with ephemeral dest (e.g. multi-hop login). + # Deliver to all bridges; each will try to decrypt and ignore if not relevant. + for bridge in companion_bridges.values(): + try: + await bridge.process_received_packet(packet) + except Exception as e: + logger.debug(f"Companion bridge PATH error: {e}") + logger.debug( + "PATH dest=0x%02x (anon) delivered to %d bridge(s) for matching", + dest_hash or 0, + len(companion_bridges), + ) + processed_by_injection = True elif self.daemon.path_helper: await self.daemon.path_helper.process_path_packet(packet) @@ -243,29 +257,19 @@ class PacketRouter: len(companion_bridges), ) processed_by_injection = True - elif companion_bridges and len(companion_bridges) == 1: - # Single bridge and dest not in bridges: likely ANON_REQ response (dest = ephemeral - # sender hash). Deliver to the only bridge so telemetry/login responses reach the client. - (single_bridge_hash,) = companion_bridges.keys() - try: - await list(companion_bridges.values())[0].process_received_packet(packet) - logger.info( - "RESPONSE dest=0x%02x (anon) delivered to sole companion bridge 0x%02x", - dest_hash or 0, - single_bridge_hash, - ) - except Exception as e: - logger.debug(f"Companion bridge RESPONSE (anon) error: {e}") - processed_by_injection = True elif companion_bridges: - # Multiple bridges; cannot guess which one. Log and drop. - src_hash = packet.payload[1] if packet.payload and len(packet.payload) >= 2 else None + # Dest not in bridges and not local: likely ANON_REQ response (dest = ephemeral + # sender hash). Deliver to all bridges; each will try to decrypt and ignore if + # not relevant (firmware-like behavior, works with multiple companion bridges). + for bridge in companion_bridges.values(): + try: + await bridge.process_received_packet(packet) + except Exception as e: + logger.debug(f"Companion bridge RESPONSE error: {e}") logger.debug( - "RESPONSE dest=0x%02x src=0x%02x not for us (bridges %s, local=0x%02x)", + "RESPONSE dest=0x%02x (anon) delivered to %d bridge(s) for matching", dest_hash or 0, - src_hash if src_hash is not None else 0, - [f"0x{h:02x}" for h in companion_bridges], - local_hash if local_hash is not None else 0, + len(companion_bridges), ) processed_by_injection = True