diff --git a/repeater/companion/frame_server.py b/repeater/companion/frame_server.py index 80b1eea..d101814 100644 --- a/repeater/companion/frame_server.py +++ b/repeater/companion/frame_server.py @@ -149,7 +149,7 @@ class CompanionFrameServer: logger.info(f"Companion frame server stopped (port={self.port})") def _persist_companion_message(self, msg_dict: dict) -> None: - """Persist a message to SQLite and remove it from the bridge queue so it is delivered once from SQLite.""" + """Persist a message to SQLite (deduplicated) and remove it from the bridge queue so it is delivered once from SQLite.""" if not self.sqlite_handler: return self.sqlite_handler.companion_push_message(self.companion_hash, msg_dict) @@ -167,7 +167,7 @@ class CompanionFrameServer: except Exception as e: logger.debug(f"Push write error: {e}") - async def on_message_received(sender_key, text, timestamp, txt_type): + async def on_message_received(sender_key, text, timestamp, txt_type, packet_hash=None): msg_dict = { "sender_key": sender_key, "text": text, @@ -176,6 +176,7 @@ class CompanionFrameServer: "is_channel": False, "channel_idx": 0, "path_len": 0, + "packet_hash": packet_hash, } self._persist_companion_message(msg_dict) _write_push(bytes([PUSH_CODE_MSG_WAITING])) @@ -232,7 +233,7 @@ class CompanionFrameServer: _write_push(bytes([PUSH_CODE_PATH_UPDATED]) + pub_key[:32]) async def on_channel_message_received( - channel_name, sender_name, message_text, timestamp, path_len=0, channel_idx=0 + channel_name, sender_name, message_text, timestamp, path_len=0, channel_idx=0, packet_hash=None ): msg_dict = { "sender_key": b"", @@ -242,6 +243,7 @@ class CompanionFrameServer: "is_channel": True, "channel_idx": channel_idx, "path_len": path_len, + "packet_hash": packet_hash, } self._persist_companion_message(msg_dict) _write_push(bytes([PUSH_CODE_MSG_WAITING])) diff --git a/repeater/data_acquisition/sqlite_handler.py b/repeater/data_acquisition/sqlite_handler.py index 21ebca5..883017b 100644 --- a/repeater/data_acquisition/sqlite_handler.py +++ b/repeater/data_acquisition/sqlite_handler.py @@ -301,6 +301,7 @@ class SQLiteHandler: is_channel INTEGER NOT NULL DEFAULT 0, channel_idx INTEGER NOT NULL DEFAULT 0, path_len INTEGER NOT NULL DEFAULT 0, + packet_hash TEXT, created_at REAL NOT NULL ) """) @@ -308,6 +309,9 @@ class SQLiteHandler: conn.execute("CREATE INDEX IF NOT EXISTS idx_companion_contacts_pubkey ON companion_contacts(companion_hash, pubkey)") conn.execute("CREATE INDEX IF NOT EXISTS idx_companion_channels_hash ON companion_channels(companion_hash)") conn.execute("CREATE INDEX IF NOT EXISTS idx_companion_messages_hash ON companion_messages(companion_hash)") + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_companion_messages_hash_packet ON companion_messages(companion_hash, packet_hash)" + ) logger.info("Created companion_contacts, companion_channels, companion_messages tables") conn.execute( @@ -1514,22 +1518,35 @@ class SQLiteHandler: return [] def companion_push_message(self, companion_hash: str, msg: Dict) -> bool: - """Append a message to the companion's queue.""" + """Append a message to the companion's queue. Deduplicates by packet_hash when present. Returns True if inserted, False if duplicate (skipped).""" try: + packet_hash = msg.get("packet_hash") or None + if isinstance(packet_hash, bytes): + packet_hash = packet_hash.decode("utf-8", errors="replace") if packet_hash else None + sender_key = msg.get("sender_key", b"") with sqlite3.connect(self.sqlite_path) as conn: + if packet_hash: + cursor = conn.execute(""" + SELECT id FROM companion_messages + WHERE companion_hash = ? AND packet_hash = ? + LIMIT 1 + """, (companion_hash, packet_hash)) + if cursor.fetchone(): + return False conn.execute(""" INSERT INTO companion_messages - (companion_hash, sender_key, txt_type, timestamp, text, is_channel, channel_idx, path_len, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + (companion_hash, sender_key, txt_type, timestamp, text, is_channel, channel_idx, path_len, packet_hash, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( companion_hash, - msg.get("sender_key", b""), + sender_key, msg.get("txt_type", 0), msg.get("timestamp", 0), msg.get("text", ""), int(msg.get("is_channel", False)), msg.get("channel_idx", 0), msg.get("path_len", 0), + packet_hash, time.time(), )) conn.commit()