Update to the data models to show new fields on DB

This commit is contained in:
Pablo Revilla
2025-01-25 18:27:11 -08:00
parent 37ed0369e4
commit 40310d782a
2 changed files with 358 additions and 4 deletions

View File

@@ -9,7 +9,7 @@ from sqlalchemy import ForeignKey, BigInteger
class Base(AsyncAttrs, DeclarativeBase):
pass
# Node
class Node(Base):
__tablename__ = "node"
id: Mapped[str] = mapped_column(primary_key=True)

View File

@@ -1086,18 +1086,372 @@ async def stats(request):
total_packets = await store.get_total_packet_count()
total_nodes = await store.get_total_node_count()
total_packets_seen = await store.get_total_packet_seen_count()
total_nodes_longfast = await store.get_total_node_count_longfast()
total_nodes_mediumslow = await store.get_total_node_count_mediumslow()
# Render the stats template with the total packet count
template = env.get_template("stats.html")
return web.Response(
text=template.render(total_packets=total_packets, total_nodes=total_nodes,total_packets_seen=total_packets_seen ),
text=template.render(total_packets=total_packets, total_nodes=total_nodes,total_packets_seen=total_packets_seen,total_nodes_longfast=total_nodes_longfast, total_nodes_mediumslow=total_nodes_mediumslow ),
content_type="text/html",
)
# In the stats.html template, you would include something like:
# <p>Total Packets: {{ total_packets }}</p>
@routes.get("/graph/longfast")
async def graph_network_longfast(request):
try:
root = request.query.get("root")
depth = int(request.query.get("depth", 5))
hours = int(request.query.get("hours", 24))
minutes = int(request.query.get("minutes", 0))
# Validation
if root and not root.isdigit():
return web.Response(status=400, text="Invalid root node ID.")
if depth < 1:
return web.Response(status=400, text="Depth must be at least 1.")
since = datetime.timedelta(hours=hours, minutes=minutes)
nodes = {}
node_ids = set()
traceroutes = []
# Fetch traceroutes
for tr in await store.get_traceroutes_longfast(since):
node_ids.add(tr.gateway_node_id)
node_ids.add(tr.packet.from_node_id)
node_ids.add(tr.packet.to_node_id)
route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route)
node_ids.update(route.route)
path = [tr.packet.from_node_id]
path.extend(route.route)
if tr.done:
path.append(tr.packet.to_node_id)
else:
if path[-1] != tr.gateway_node_id:
path.append(tr.gateway_node_id)
traceroutes.append((tr, path))
edges = Counter()
edge_type = {}
used_nodes = set()
# Fetch MQTT neighbors
for ps, p in await store.get_mqtt_neighbors_longfast(since):
node_ids.add(ps.node_id)
node_ids.add(p.from_node_id)
used_nodes.add(ps.node_id)
used_nodes.add(p.from_node_id)
edges[(p.from_node_id, ps.node_id)] += 1
edge_type[(p.from_node_id, ps.node_id)] = 'sni'
# Fetch NeighborInfo packets
for packet in await store.get_packets_longfast(portnum=PortNum.NEIGHBORINFO_APP, since=since):
try:
_, neighbor_info = decode_payload.decode(packet)
node_ids.add(packet.from_node_id)
used_nodes.add(packet.from_node_id)
for node in neighbor_info.neighbors:
node_ids.add(node.node_id)
used_nodes.add(node.node_id)
edges[(node.node_id, packet.from_node_id)] += 1
edge_type[(node.node_id, packet.from_node_id)] = 'ni'
except Exception as e:
print(f"Error decoding NeighborInfo packet: {e}")
# Retrieve node details concurrently
async with asyncio.TaskGroup() as tg:
for node_id in node_ids:
nodes[node_id] = tg.create_task(store.get_node(node_id))
tr_done = set()
for tr, path in traceroutes:
if tr.done:
if tr.packet_id in tr_done:
continue
else:
tr_done.add(tr.packet_id)
for src, dest in zip(path, path[1:]):
used_nodes.add(src)
used_nodes.add(dest)
edges[(src, dest)] += 1
edge_type[(src, dest)] = 'tr'
# Helper to get node name
async def get_node_name(node_id):
try:
node = await nodes[node_id]
if node:
return f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}'
return node_id_to_hex(node_id)
except Exception as e:
return f"Error retrieving node: {str(e)}"
# Filter nodes and edges if root is specified
if root:
new_used_nodes = set()
new_edges = Counter()
edge_map = {}
for src, dest in edges:
edge_map.setdefault(dest, []).append(src)
queue = [int(root)]
for _ in range(depth):
next_queue = []
for node in queue:
new_used_nodes.add(node)
for dest in edge_map.get(node, []):
new_used_nodes.add(dest)
new_edges[(dest, node)] += 1
next_queue.append(dest)
queue = next_queue
used_nodes = new_used_nodes
edges = new_edges
# Create graph
graph = pydot.Dot('network', graph_type="digraph", layout="neato", overlap="false", model='subset', esep="+5")
for node_id in used_nodes:
node = await nodes[node_id]
color = '#000000'
node_name = await get_node_name(node_id)
if node and node.role in ('ROUTER', 'ROUTER_CLIENT', 'REPEATER'):
color = '#0000FF'
elif node and node.role == 'CLIENT_MUTE':
color = '#00FF00'
graph.add_node(pydot.Node(
str(node_id),
label=node_name,
shape='box',
color=color,
href=f"/graph/network?root={node_id}&amp;depth={depth-1}",
))
# Adjust edge visualization
if edges:
max_edge_count = edges.most_common(1)[0][1]
else:
max_edge_count = 1
size_ratio = 2. / max_edge_count
edge_added = set()
for (src, dest), edge_count in edges.items():
size = max(size_ratio * edge_count, .25)
arrowsize = max(size_ratio * edge_count, .5)
if edge_type[(src, dest)] in ('ni'):
color = '#FF0000'
elif edge_type[(src, dest)] in ('sni'):
color = '#00FF00'
else:
color = '#000000'
edge_dir = "forward"
if (dest, src) in edges and edge_type[(src, dest)] == edge_type[(dest, src)]:
edge_dir = "both"
edge_added.add((dest, src))
if (src, dest) not in edge_added:
edge_added.add((src, dest))
graph.add_edge(pydot.Edge(
str(src),
str(dest),
color=color,
tooltip=f'{await get_node_name(src)} -> {await get_node_name(dest)}',
penwidth=1.85,
dir=edge_dir,
))
return web.Response(
body=graph.create_svg(),
content_type="image/svg+xml",
)
except Exception as e:
print(f"Error in graph_network_longfast: {e}")
return web.Response(status=500, text="Internal Server Error")
@routes.get("/graph/mediumslow")
async def graph_network_mediumslow(request):
try:
root = request.query.get("root")
depth = int(request.query.get("depth", 5))
hours = int(request.query.get("hours", 24))
minutes = int(request.query.get("minutes", 0))
# Validation
if root and not root.isdigit():
return web.Response(status=400, text="Invalid root node ID.")
if depth < 1:
return web.Response(status=400, text="Depth must be at least 1.")
since = datetime.timedelta(hours=hours, minutes=minutes)
nodes = {}
node_ids = set()
traceroutes = []
# Fetch traceroutes
for tr in await store.get_traceroutes_mediumslow(since):
node_ids.add(tr.gateway_node_id)
node_ids.add(tr.packet.from_node_id)
node_ids.add(tr.packet.to_node_id)
route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route)
node_ids.update(route.route)
path = [tr.packet.from_node_id]
path.extend(route.route)
if tr.done:
path.append(tr.packet.to_node_id)
else:
if path[-1] != tr.gateway_node_id:
path.append(tr.gateway_node_id)
traceroutes.append((tr, path))
edges = Counter()
edge_type = {}
used_nodes = set()
# Fetch MQTT neighbors
for ps, p in await store.get_mqtt_neighbors_mediumslow(since):
node_ids.add(ps.node_id)
node_ids.add(p.from_node_id)
used_nodes.add(ps.node_id)
used_nodes.add(p.from_node_id)
edges[(p.from_node_id, ps.node_id)] += 1
edge_type[(p.from_node_id, ps.node_id)] = 'sni'
# Fetch NeighborInfo packets
for packet in await store.get_packets_mediumslow(portnum=PortNum.NEIGHBORINFO_APP, since=since):
try:
_, neighbor_info = decode_payload.decode(packet)
node_ids.add(packet.from_node_id)
used_nodes.add(packet.from_node_id)
for node in neighbor_info.neighbors:
node_ids.add(node.node_id)
used_nodes.add(node.node_id)
edges[(node.node_id, packet.from_node_id)] += 1
edge_type[(node.node_id, packet.from_node_id)] = 'ni'
except Exception as e:
print(f"Error decoding NeighborInfo packet: {e}")
# Retrieve node details concurrently
async with asyncio.TaskGroup() as tg:
for node_id in node_ids:
nodes[node_id] = tg.create_task(store.get_node(node_id))
tr_done = set()
for tr, path in traceroutes:
if tr.done:
if tr.packet_id in tr_done:
continue
else:
tr_done.add(tr.packet_id)
for src, dest in zip(path, path[1:]):
used_nodes.add(src)
used_nodes.add(dest)
edges[(src, dest)] += 1
edge_type[(src, dest)] = 'tr'
# Helper to get node name
async def get_node_name(node_id):
try:
node = await nodes[node_id]
if node:
return f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}'
return node_id_to_hex(node_id)
except Exception as e:
return f"Error retrieving node: {str(e)}"
# Filter nodes and edges if root is specified
if root:
new_used_nodes = set()
new_edges = Counter()
edge_map = {}
for src, dest in edges:
edge_map.setdefault(dest, []).append(src)
queue = [int(root)]
for _ in range(depth):
next_queue = []
for node in queue:
new_used_nodes.add(node)
for dest in edge_map.get(node, []):
new_used_nodes.add(dest)
new_edges[(dest, node)] += 1
next_queue.append(dest)
queue = next_queue
used_nodes = new_used_nodes
edges = new_edges
# Create graph
graph = pydot.Dot('network', graph_type="digraph", layout="neato", overlap="false", model='subset', esep="+5")
for node_id in used_nodes:
node = await nodes[node_id]
color = '#000000'
node_name = await get_node_name(node_id)
if node and node.role in ('ROUTER', 'ROUTER_CLIENT', 'REPEATER'):
color = '#0000FF'
elif node and node.role == 'CLIENT_MUTE':
color = '#00FF00'
graph.add_node(pydot.Node(
str(node_id),
label=node_name,
shape='box',
color=color,
href=f"/graph/mediumslow?root={node_id}&amp;depth={depth-1}",
))
# Adjust edge visualization
if edges:
max_edge_count = edges.most_common(1)[0][1]
else:
max_edge_count = 1
size_ratio = 2. / max_edge_count
edge_added = set()
for (src, dest), edge_count in edges.items():
size = max(size_ratio * edge_count, .25)
arrowsize = max(size_ratio * edge_count, .5)
if edge_type[(src, dest)] in ('ni'):
color = '#FF0000'
elif edge_type[(src, dest)] in ('sni'):
color = '#00FF00'
else:
color = '#000000'
edge_dir = "forward"
if (dest, src) in edges and edge_type[(src, dest)] == edge_type[(dest, src)]:
edge_dir = "both"
edge_added.add((dest, src))
if (src, dest) not in edge_added:
edge_added.add((src, dest))
graph.add_edge(pydot.Edge(
str(src),
str(dest),
color=color,
tooltip=f'{await get_node_name(src)} -> {await get_node_name(dest)}',
penwidth=1.85,
dir=edge_dir,
))
return web.Response(
body=graph.create_svg(),
content_type="image/svg+xml",
)
except Exception as e:
print(f"Error in graph_network_longfast: {e}")
return web.Response(status=500, text="Internal Server Error")