mirror of
https://github.com/pablorevilla-meshtastic/meshview.git
synced 2026-07-03 16:31:51 +02:00
work on error where packet ids could be duplicate and crash the loop
This commit is contained in:
+11
-6
@@ -1,7 +1,7 @@
|
||||
import datetime
|
||||
import re
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import update
|
||||
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
|
||||
from meshtastic.protobuf.config_pb2 import Config
|
||||
from meshtastic.protobuf.portnums_pb2 import PortNum
|
||||
from meshtastic.protobuf.mesh_pb2 import User, HardwareModel
|
||||
@@ -12,7 +12,7 @@ from meshview.models import Packet, PacketSeen, Node, Traceroute
|
||||
|
||||
async def process_envelope(topic, env):
|
||||
|
||||
# Checking if the received packet is a MAP_REPORT
|
||||
# MAP_REPORT_APP
|
||||
if env.packet.decoded.portnum == PortNum.MAP_REPORT_APP:
|
||||
node_id = getattr(env.packet, "from")
|
||||
user_id = f"!{node_id:0{8}x}"
|
||||
@@ -72,6 +72,7 @@ async def process_envelope(topic, env):
|
||||
return
|
||||
|
||||
async with mqtt_database.async_session() as session:
|
||||
# --- Packet insert with ON CONFLICT DO NOTHING
|
||||
result = await session.execute(
|
||||
select(Packet).where(Packet.id == env.packet.id)
|
||||
)
|
||||
@@ -79,7 +80,7 @@ async def process_envelope(topic, env):
|
||||
packet = result.scalar_one_or_none()
|
||||
if not packet:
|
||||
new_packet = True
|
||||
packet = Packet(
|
||||
stmt = sqlite_insert(Packet).values(
|
||||
id=env.packet.id,
|
||||
portnum=env.packet.decoded.portnum,
|
||||
from_node_id=getattr(env.packet, "from"),
|
||||
@@ -87,9 +88,10 @@ async def process_envelope(topic, env):
|
||||
payload=env.packet.SerializeToString(),
|
||||
import_time=datetime.datetime.now(),
|
||||
channel=env.channel_id,
|
||||
)
|
||||
session.add(packet)
|
||||
).on_conflict_do_nothing(index_elements=["id"])
|
||||
await session.execute(stmt)
|
||||
|
||||
# --- PacketSeen (no conflict handling here, normal insert)
|
||||
result = await session.execute(
|
||||
select(PacketSeen).where(
|
||||
PacketSeen.packet_id == env.packet.id,
|
||||
@@ -112,13 +114,13 @@ async def process_envelope(topic, env):
|
||||
)
|
||||
session.add(seen)
|
||||
|
||||
# --- NODEINFO_APP handling
|
||||
if env.packet.decoded.portnum == PortNum.NODEINFO_APP:
|
||||
try:
|
||||
user = decode_payload.decode_payload(
|
||||
PortNum.NODEINFO_APP, env.packet.decoded.payload
|
||||
)
|
||||
if user and user.id:
|
||||
# ✅ Safe fix: only parse hex IDs, otherwise leave None
|
||||
if user.id[0] == "!" and re.fullmatch(r"[0-9a-fA-F]+", user.id[1:]):
|
||||
node_id = int(user.id[1:], 16)
|
||||
else:
|
||||
@@ -164,6 +166,7 @@ async def process_envelope(topic, env):
|
||||
except Exception as e:
|
||||
print(f"Error processing NODEINFO_APP: {e}")
|
||||
|
||||
# --- POSITION_APP handling
|
||||
if env.packet.decoded.portnum == PortNum.POSITION_APP:
|
||||
position = decode_payload.decode_payload(
|
||||
PortNum.POSITION_APP, env.packet.decoded.payload
|
||||
@@ -180,6 +183,7 @@ async def process_envelope(topic, env):
|
||||
node.last_long = position.longitude_i
|
||||
session.add(node)
|
||||
|
||||
# --- TRACEROUTE_APP (no conflict handling, normal insert)
|
||||
if env.packet.decoded.portnum == PortNum.TRACEROUTE_APP:
|
||||
packet_id = None
|
||||
if env.packet.decoded.want_response:
|
||||
@@ -202,6 +206,7 @@ async def process_envelope(topic, env):
|
||||
)
|
||||
|
||||
await session.commit()
|
||||
|
||||
if new_packet:
|
||||
await packet.awaitable_attrs.to_node
|
||||
await packet.awaitable_attrs.from_node
|
||||
|
||||
Reference in New Issue
Block a user