mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-03-28 17:43:06 +01:00
Fix message persistence and deduplication in CompanionFrameServer
- Updated `_persist_companion_message` to clarify deduplication of messages in SQLite. - Modified `on_message_received` and `on_channel_message_received` to include `packet_hash` for message identification. - Enhanced `SQLiteHandler` to support deduplication by `packet_hash` when pushing messages, preventing duplicates in the database. - Added `packet_hash` column to the `companion_messages` table and created an index for efficient lookups.
This commit is contained in:
@@ -149,7 +149,7 @@ class CompanionFrameServer:
|
||||
logger.info(f"Companion frame server stopped (port={self.port})")
|
||||
|
||||
def _persist_companion_message(self, msg_dict: dict) -> None:
|
||||
"""Persist a message to SQLite and remove it from the bridge queue so it is delivered once from SQLite."""
|
||||
"""Persist a message to SQLite (deduplicated) and remove it from the bridge queue so it is delivered once from SQLite."""
|
||||
if not self.sqlite_handler:
|
||||
return
|
||||
self.sqlite_handler.companion_push_message(self.companion_hash, msg_dict)
|
||||
@@ -167,7 +167,7 @@ class CompanionFrameServer:
|
||||
except Exception as e:
|
||||
logger.debug(f"Push write error: {e}")
|
||||
|
||||
async def on_message_received(sender_key, text, timestamp, txt_type):
|
||||
async def on_message_received(sender_key, text, timestamp, txt_type, packet_hash=None):
|
||||
msg_dict = {
|
||||
"sender_key": sender_key,
|
||||
"text": text,
|
||||
@@ -176,6 +176,7 @@ class CompanionFrameServer:
|
||||
"is_channel": False,
|
||||
"channel_idx": 0,
|
||||
"path_len": 0,
|
||||
"packet_hash": packet_hash,
|
||||
}
|
||||
self._persist_companion_message(msg_dict)
|
||||
_write_push(bytes([PUSH_CODE_MSG_WAITING]))
|
||||
@@ -232,7 +233,7 @@ class CompanionFrameServer:
|
||||
_write_push(bytes([PUSH_CODE_PATH_UPDATED]) + pub_key[:32])
|
||||
|
||||
async def on_channel_message_received(
|
||||
channel_name, sender_name, message_text, timestamp, path_len=0, channel_idx=0
|
||||
channel_name, sender_name, message_text, timestamp, path_len=0, channel_idx=0, packet_hash=None
|
||||
):
|
||||
msg_dict = {
|
||||
"sender_key": b"",
|
||||
@@ -242,6 +243,7 @@ class CompanionFrameServer:
|
||||
"is_channel": True,
|
||||
"channel_idx": channel_idx,
|
||||
"path_len": path_len,
|
||||
"packet_hash": packet_hash,
|
||||
}
|
||||
self._persist_companion_message(msg_dict)
|
||||
_write_push(bytes([PUSH_CODE_MSG_WAITING]))
|
||||
|
||||
@@ -301,6 +301,7 @@ class SQLiteHandler:
|
||||
is_channel INTEGER NOT NULL DEFAULT 0,
|
||||
channel_idx INTEGER NOT NULL DEFAULT 0,
|
||||
path_len INTEGER NOT NULL DEFAULT 0,
|
||||
packet_hash TEXT,
|
||||
created_at REAL NOT NULL
|
||||
)
|
||||
""")
|
||||
@@ -308,6 +309,9 @@ class SQLiteHandler:
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_companion_contacts_pubkey ON companion_contacts(companion_hash, pubkey)")
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_companion_channels_hash ON companion_channels(companion_hash)")
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_companion_messages_hash ON companion_messages(companion_hash)")
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_companion_messages_hash_packet ON companion_messages(companion_hash, packet_hash)"
|
||||
)
|
||||
logger.info("Created companion_contacts, companion_channels, companion_messages tables")
|
||||
|
||||
conn.execute(
|
||||
@@ -1514,22 +1518,35 @@ class SQLiteHandler:
|
||||
return []
|
||||
|
||||
def companion_push_message(self, companion_hash: str, msg: Dict) -> bool:
|
||||
"""Append a message to the companion's queue."""
|
||||
"""Append a message to the companion's queue. Deduplicates by packet_hash when present. Returns True if inserted, False if duplicate (skipped)."""
|
||||
try:
|
||||
packet_hash = msg.get("packet_hash") or None
|
||||
if isinstance(packet_hash, bytes):
|
||||
packet_hash = packet_hash.decode("utf-8", errors="replace") if packet_hash else None
|
||||
sender_key = msg.get("sender_key", b"")
|
||||
with sqlite3.connect(self.sqlite_path) as conn:
|
||||
if packet_hash:
|
||||
cursor = conn.execute("""
|
||||
SELECT id FROM companion_messages
|
||||
WHERE companion_hash = ? AND packet_hash = ?
|
||||
LIMIT 1
|
||||
""", (companion_hash, packet_hash))
|
||||
if cursor.fetchone():
|
||||
return False
|
||||
conn.execute("""
|
||||
INSERT INTO companion_messages
|
||||
(companion_hash, sender_key, txt_type, timestamp, text, is_channel, channel_idx, path_len, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
(companion_hash, sender_key, txt_type, timestamp, text, is_channel, channel_idx, path_len, packet_hash, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
companion_hash,
|
||||
msg.get("sender_key", b""),
|
||||
sender_key,
|
||||
msg.get("txt_type", 0),
|
||||
msg.get("timestamp", 0),
|
||||
msg.get("text", ""),
|
||||
int(msg.get("is_channel", False)),
|
||||
msg.get("channel_idx", 0),
|
||||
msg.get("path_len", 0),
|
||||
packet_hash,
|
||||
time.time(),
|
||||
))
|
||||
conn.commit()
|
||||
|
||||
Reference in New Issue
Block a user