From 2490284435b50b08e2ad6db2d191dbeee8870164 Mon Sep 17 00:00:00 2001 From: pablorevilla-meshtastic Date: Thu, 5 Mar 2026 10:05:14 -0800 Subject: [PATCH] fix ingestion error --- meshview/mqtt_store.py | 59 ++++++++++++++++++++++++++---------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/meshview/mqtt_store.py b/meshview/mqtt_store.py index 03f1ad2..abedc49 100644 --- a/meshview/mqtt_store.py +++ b/meshview/mqtt_store.py @@ -181,7 +181,7 @@ async def process_envelope(topic, env): except IntegrityError: pass - # --- PacketSeen (no conflict handling here, normal insert) + # --- PacketSeen insert with conflict-safe handling if not env.gateway_id: print("WARNING: Missing gateway_id, skipping PacketSeen entry") @@ -196,29 +196,44 @@ async def process_envelope(topic, env): update(Node).where(Node.node_id == node_id).values(is_mqtt_gateway=True) ) - result = await session.execute( - select(PacketSeen).where( - PacketSeen.packet_id == env.packet.id, - PacketSeen.node_id == node_id, - PacketSeen.rx_time == env.packet.rx_time, + now_us = int(time.time() * 1_000_000) + seen_values = { + "packet_id": env.packet.id, + "node_id": int(env.gateway_id[1:], 16), + "channel": env.channel_id, + "rx_time": env.packet.rx_time, + "rx_snr": env.packet.rx_snr, + "rx_rssi": env.packet.rx_rssi, + "hop_limit": env.packet.hop_limit, + "hop_start": env.packet.hop_start, + "relay_node": env.packet.relay_node if env.packet.relay_node else None, + "topic": topic, + "import_time_us": now_us, + } + dialect = session.get_bind().dialect.name + seen_stmt = None + if dialect == "sqlite": + seen_stmt = ( + sqlite_insert(PacketSeen) + .values(**seen_values) + .on_conflict_do_nothing(index_elements=["packet_id", "node_id", "rx_time"]) ) - ) - if not result.scalar_one_or_none(): - now_us = int(time.time() * 1_000_000) - seen = PacketSeen( - packet_id=env.packet.id, - node_id=int(env.gateway_id[1:], 16), - channel=env.channel_id, - rx_time=env.packet.rx_time, - rx_snr=env.packet.rx_snr, - rx_rssi=env.packet.rx_rssi, - hop_limit=env.packet.hop_limit, - hop_start=env.packet.hop_start, - relay_node=env.packet.relay_node if env.packet.relay_node else None, - topic=topic, - import_time_us=now_us, + elif dialect == "postgresql": + seen_stmt = ( + pg_insert(PacketSeen) + .values(**seen_values) + .on_conflict_do_nothing(index_elements=["packet_id", "node_id", "rx_time"]) ) - session.add(seen) + + if seen_stmt is not None: + await session.execute(seen_stmt) + else: + try: + async with session.begin_nested(): + session.add(PacketSeen(**seen_values)) + await session.flush() + except IntegrityError: + pass # --- NODEINFO_APP handling if env.packet.decoded.portnum == PortNum.NODEINFO_APP: