Compare commits

...

3 Commits

Author SHA1 Message Date
pdxlocations
c5fa47f2ff get environment_metrics from payload 2025-08-23 00:57:34 -07:00
pdxlocations
6d6a121a56 get device_metrics from payload 2025-08-23 00:55:20 -07:00
pdxlocations
43430b3725 parse protobufs 2025-08-23 00:27:07 -07:00
2 changed files with 42 additions and 7 deletions

View File

@@ -7,6 +7,7 @@ from typing import Union
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
from contact.settings import settings_menu
from contact.message_handlers.tx_handler import send_message, send_traceroute
from contact.utilities.utils import parse_protobuf
from contact.ui.colors import get_color
from contact.utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
from contact.utilities.input_handlers import get_list_input
@@ -15,6 +16,7 @@ import contact.ui.dialog
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
from contact.utilities.singleton import ui_state, interface_state, menu_state
MIN_COL = 1 # "effectively zero" without breaking curses
root_win = None # set in main_ui
@@ -832,14 +834,14 @@ def draw_packetlog_win() -> None:
else get_name_from_database(packet["to"], "short").ljust(columns[1])
)
if "decoded" in packet:
port = packet["decoded"]["portnum"].ljust(columns[2])
payload = (packet["decoded"]["payload"]).ljust(columns[3])
port = str(packet["decoded"].get("portnum", "")).ljust(columns[2])
parsed_payload = parse_protobuf(packet)
else:
port = "NO KEY".ljust(columns[2])
payload = "NO KEY".ljust(columns[3])
parsed_payload = "NO KEY"
# Combine and truncate if necessary
logString = f"{from_id} {to_id} {port} {payload}"
logString = f"{from_id} {to_id} {port} {parsed_payload}"
logString = logString[: width - 3]
# Add to the window

View File

@@ -1,8 +1,11 @@
import datetime
import time
from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config
from typing import Optional, Union
from google.protobuf.message import DecodeError
from meshtastic import protocols
from meshtastic.protobuf import config_pb2, mesh_pb2, portnums_pb2
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
@@ -136,6 +139,7 @@ def get_time_ago(timestamp):
return f"{value} {unit} ago"
return "now"
def add_new_message(channel_id, prefix, message):
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
@@ -162,4 +166,33 @@ def add_new_message(channel_id, prefix, message):
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
# Add the message
ui_state.all_messages[channel_id].append((prefix,message))
ui_state.all_messages[channel_id].append((prefix, message))
def parse_protobuf(packet: dict) -> Union[str, dict]:
"""Attempt to parse a decoded payload using the registered protobuf handler."""
try:
decoded = packet.get("decoded") or {}
portnum = decoded.get("portnum")
payload = decoded.get("payload")
if isinstance(payload, str):
return payload
handler = protocols.get(portnums_pb2.PortNum.Value(portnum)) if portnum is not None else None
if handler is not None and handler.protobufFactory is not None:
try:
pb = handler.protobufFactory()
pb.ParseFromString(bytes(payload))
if hasattr(pb, "device_metrics") and pb.HasField("device_metrics"):
return str(pb.device_metrics).replace("\n", " ").replace("\r", " ").strip()
if hasattr(pb, "environment_metrics") and pb.HasField("environment_metrics"):
return str(pb.environment_metrics).replace("\n", " ").replace("\r", " ").strip()
return str(pb).replace("\n", " ").replace("\r", " ").strip()
except DecodeError:
return payload
return payload
except Exception:
return payload