2 Commits

Author SHA1 Message Date
Jason Michalski
b19b51d6a6 hack 2024-06-28 21:20:48 -07:00
Jason Michalski
414beaaab0 Add traceroute times. 2024-06-11 21:00:57 -07:00
4 changed files with 64 additions and 42 deletions

View File

@@ -3,7 +3,7 @@ from datetime import datetime
from sqlalchemy.orm import DeclarativeBase, foreign
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import mapped_column, relationship, Mapped
from sqlalchemy import ForeignKey, BigInteger
from sqlalchemy import ForeignKey, BigInteger, DateTime
class Base(AsyncAttrs, DeclarativeBase):
@@ -23,7 +23,8 @@ class Node(Base):
class Packet(Base):
__tablename__ = "packet"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
pk: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
id: Mapped[int] = mapped_column(BigInteger)
portnum: Mapped[int]
from_node_id: Mapped[int] = mapped_column(BigInteger)
from_node: Mapped["Node"] = relationship(
@@ -34,17 +35,18 @@ class Packet(Base):
primaryjoin="Packet.to_node_id == foreign(Node.node_id)", lazy="joined"
)
payload: Mapped[bytes]
import_time: Mapped[datetime]
import_time = mapped_column(DateTime)
class PacketSeen(Base):
__tablename__ = "packet_seen"
packet_id = mapped_column(ForeignKey("packet.id"), primary_key=True)
node_id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
pk: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
packet_id = mapped_column(ForeignKey("packet.id"))
node_id: Mapped[int] = mapped_column(BigInteger)
node: Mapped["Node"] = relationship(
lazy="joined", primaryjoin="PacketSeen.node_id == foreign(Node.node_id)"
)
rx_time: Mapped[int] = mapped_column(BigInteger, primary_key=True)
rx_time: Mapped[int] = mapped_column(BigInteger)
hop_limit: Mapped[int]
channel: Mapped[str]
rx_snr: Mapped[float] = mapped_column(nullable=True)
@@ -60,4 +62,4 @@ class Traceroute(Base):
gateway_node_id: Mapped[int] = mapped_column(BigInteger)
done: Mapped[bool]
route: Mapped[bytes]
import_time: Mapped[datetime]

View File

@@ -30,13 +30,14 @@ async def get_topic_envelopes(topics):
while True:
try:
async with aiomqtt.Client(
"mqtt.meshtastic.org", username="meshdev", password="large4cats"
"192.168.10.52"
) as client:
for topic in topics:
await client.subscribe(topic)
async for msg in client.messages:
try:
envelope = ServiceEnvelope.FromString(msg.payload)
print(envelope)
except DecodeError:
continue
decrypt(envelope.packet)

View File

@@ -17,40 +17,40 @@ async def process_envelope(topic, env):
async with database.async_session() as session:
result = await session.execute(select(Packet).where(Packet.id == env.packet.id))
new_packet = False
packet = result.scalar_one_or_none()
if not packet:
new_packet = True
packet = Packet(
id=env.packet.id,
portnum=env.packet.decoded.portnum,
from_node_id=getattr(env.packet, "from"),
to_node_id=env.packet.to,
payload=env.packet.SerializeToString(),
import_time=datetime.datetime.utcnow(),
)
session.add(packet)
result = await session.execute(
select(PacketSeen).where(
PacketSeen.packet_id == env.packet.id,
PacketSeen.node_id == int(env.gateway_id[1:], 16),
PacketSeen.rx_time == env.packet.rx_time,
)
#packet = result.scalar_one_or_none()
#if not packet:
new_packet = True
packet = Packet(
id=env.packet.id,
portnum=env.packet.decoded.portnum,
from_node_id=getattr(env.packet, "from"),
to_node_id=env.packet.to,
payload=env.packet.SerializeToString(),
import_time=datetime.datetime.utcnow(),
)
seen = None
if not result.scalar_one_or_none():
seen = PacketSeen(
packet_id=env.packet.id,
node_id=int(env.gateway_id[1:], 16),
channel=env.channel_id,
rx_time=env.packet.rx_time,
rx_snr=env.packet.rx_snr,
rx_rssi=env.packet.rx_rssi,
hop_limit=env.packet.hop_limit,
topic=topic,
import_time=datetime.datetime.utcnow(),
)
session.add(seen)
session.add(packet)
#result = await session.execute(
# select(PacketSeen).where(
# PacketSeen.packet_id == env.packet.id,
# PacketSeen.node_id == int(env.gateway_id[1:], 16),
# PacketSeen.rx_time == env.packet.rx_time,
# )
#)
#seen = None
#if not result.scalar_one_or_none():
seen = PacketSeen(
packet_id=env.packet.id,
node_id=int(env.gateway_id[1:], 16),
channel=env.channel_id,
rx_time=env.packet.rx_time,
rx_snr=env.packet.rx_snr,
rx_rssi=env.packet.rx_rssi,
hop_limit=env.packet.hop_limit,
topic=topic,
import_time=datetime.datetime.utcnow(),
)
session.add(seen)
if env.packet.decoded.portnum == PortNum.NODEINFO_APP:
user = decode_payload.decode_payload(
@@ -109,6 +109,7 @@ async def process_envelope(topic, env):
route=env.packet.decoded.payload,
done=not env.packet.decoded.want_response,
gateway_node_id=int(env.gateway_id[1:], 16),
import_time=datetime.datetime.utcnow(),
))
await session.commit()
@@ -169,7 +170,7 @@ async def get_packets_from(node_id=None, portnum=None, limit=500):
async def get_packet(packet_id):
async with database.async_session() as session:
q = select(Packet).where(Packet.id == packet_id)
q = select(Packet).where(Packet.id == packet_id).limit(1)
result = await session.execute(q)
return result.scalar_one_or_none()
@@ -207,5 +208,6 @@ async def get_traceroute(packet_id):
result = await session.execute(
select(Traceroute)
.where(Traceroute.packet_id == packet_id)
.order_by(Traceroute.import_time)
)
return result.scalars()

View File

@@ -557,6 +557,7 @@ async def graph_traceroute(request):
mqtt_nodes = set()
saw_reply = set()
dest = None
node_seen_time = {}
for tr in traceroutes:
if tr.done:
saw_reply.add(tr.gateway_node_id)
@@ -571,6 +572,13 @@ async def graph_traceroute(request):
elif path[-1] != tr.gateway_node_id:
# It seems some nodes add them self to the list before uplinking
path.append(tr.gateway_node_id)
if not tr.done:
for node_id in path[-2:]:
if node_id not in node_seen_time:
if tr.import_time:
node_seen_time[node_id] = tr.import_time
mqtt_nodes.add(tr.gateway_node_id)
node_color[path[-1]] = '#' + hex(hash(tuple(path)))[3:9]
paths.add(tuple(path))
@@ -579,12 +587,21 @@ async def graph_traceroute(request):
for path in paths:
used_nodes.update(path)
import_times = [tr.import_time for tr in traceroutes if tr.import_time]
if import_times:
first_time = min(import_times)
else:
first_time = 0
for node_id in used_nodes:
node = await nodes[node_id]
if not node:
node_name = node_id_to_hex(node_id)
else:
node_name = f'[{node.short_name}] {node.long_name} - {node_id_to_hex(node_id)}'
if node_id in node_seen_time:
ms = (node_seen_time[node_id] - first_time).total_seconds() * 1000
node_name += f'\n {ms:.2f}ms'
style = 'dashed'
if node_id == dest:
style = 'filled'