fix ingestion error

This commit is contained in:
pablorevilla-meshtastic
2026-03-05 10:05:14 -08:00
parent 90767f94b4
commit 2490284435
+37 -22
View File
@@ -181,7 +181,7 @@ async def process_envelope(topic, env):
except IntegrityError:
pass
# --- PacketSeen (no conflict handling here, normal insert)
# --- PacketSeen insert with conflict-safe handling
if not env.gateway_id:
print("WARNING: Missing gateway_id, skipping PacketSeen entry")
@@ -196,29 +196,44 @@ async def process_envelope(topic, env):
update(Node).where(Node.node_id == node_id).values(is_mqtt_gateway=True)
)
result = await session.execute(
select(PacketSeen).where(
PacketSeen.packet_id == env.packet.id,
PacketSeen.node_id == node_id,
PacketSeen.rx_time == env.packet.rx_time,
now_us = int(time.time() * 1_000_000)
seen_values = {
"packet_id": env.packet.id,
"node_id": int(env.gateway_id[1:], 16),
"channel": env.channel_id,
"rx_time": env.packet.rx_time,
"rx_snr": env.packet.rx_snr,
"rx_rssi": env.packet.rx_rssi,
"hop_limit": env.packet.hop_limit,
"hop_start": env.packet.hop_start,
"relay_node": env.packet.relay_node if env.packet.relay_node else None,
"topic": topic,
"import_time_us": now_us,
}
dialect = session.get_bind().dialect.name
seen_stmt = None
if dialect == "sqlite":
seen_stmt = (
sqlite_insert(PacketSeen)
.values(**seen_values)
.on_conflict_do_nothing(index_elements=["packet_id", "node_id", "rx_time"])
)
)
if not result.scalar_one_or_none():
now_us = int(time.time() * 1_000_000)
seen = PacketSeen(
packet_id=env.packet.id,
node_id=int(env.gateway_id[1:], 16),
channel=env.channel_id,
rx_time=env.packet.rx_time,
rx_snr=env.packet.rx_snr,
rx_rssi=env.packet.rx_rssi,
hop_limit=env.packet.hop_limit,
hop_start=env.packet.hop_start,
relay_node=env.packet.relay_node if env.packet.relay_node else None,
topic=topic,
import_time_us=now_us,
elif dialect == "postgresql":
seen_stmt = (
pg_insert(PacketSeen)
.values(**seen_values)
.on_conflict_do_nothing(index_elements=["packet_id", "node_id", "rx_time"])
)
session.add(seen)
if seen_stmt is not None:
await session.execute(seen_stmt)
else:
try:
async with session.begin_nested():
session.add(PacketSeen(**seen_values))
await session.flush()
except IntegrityError:
pass
# --- NODEINFO_APP handling
if env.packet.decoded.portnum == PortNum.NODEINFO_APP: