From 2d61d369b0abe81b53b343d1ff948894d3807dac Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Tue, 22 Apr 2025 10:30:39 -0700 Subject: [PATCH] Update MQTT packet handling to use protobuf structures directly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Refactored decoder to use protobuf-defined Packet, TopicInfo, and Data structures - Updated MQTT client and broker to work with protobuf structures directly - Improved logging to properly serialize protobuf messages for structured output - Optimized .logista.yaml template for cleaner, more compact log display 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .logista.yaml | 72 ++++++++++++++++++++++++++++++++------------------ mqtt/logger.go | 34 +++++++++++++++++++++--- mqtt/stats.go | 20 +++++++------- 3 files changed, 86 insertions(+), 40 deletions(-) diff --git a/.logista.yaml b/.logista.yaml index 37b739c..97b7ad0 100644 --- a/.logista.yaml +++ b/.logista.yaml @@ -1,41 +1,61 @@ # Meshstream Logista Configuration # {"level":"info","ts":1745264398.907407,"logger":"main.server","caller":"meshstream/main.go:88","msg":"Starting server on localhost:8080"} -# {"level":"info","ts":1745264404.905592,"logger":"main.mqtt.MessageLogger","caller":"mqtt/subscriber.go:82","msg":"[NODEINFO_APP] Gateway:!fa74d32c Message type: NODEINFO_APP","packet":{"ChannelID":"MediumSlow","GatewayID":"!fa74d32c","ID":1512657627,"From":4201960236,"To":144028282,"HopLimit":4,"HopStart":0,"WantACK":false,"Priority":"BACKGROUND","ViaMQTT":false,"NextHop":0,"RelayNode":0,"PortNum":4,"Payload":{"id":"!fa74d32c","long_name":"MeshLager","short_name":"🍺","macaddr":"9BL6dNMs","hw_model":44},"RequestID":0,"ReplyID":0,"Emoji":0,"Dest":0,"Source":0,"WantResponse":true,"DecodeError":null,"FullTopic":"msh/US/bayarea/2/c/MediumSlow/!fa74d32c","RegionPath":"US/bayarea","Version":"2","Format":"c","Channel":"MediumSlow","UserID":"!fa74d32c"}} -# {"level":"info","ts":1745264405.2398899,"logger":"main.mqtt.MessageLogger","caller":"mqtt/subscriber.go:82","msg":"[TELEMETRY_APP] Gateway:!eb976cd5 Telemetry: device","packet":{"ChannelID":"MediumSlow","GatewayID":"!eb976cd5","ID":2022965751,"From":3149439480,"To":4294967295,"HopLimit":3,"HopStart":3,"WantACK":false,"Priority":"UNSET","ViaMQTT":false,"NextHop":0,"RelayNode":0,"PortNum":67,"Payload":{"time":1745264403,"Variant":{"DeviceMetrics":{"battery_level":100,"voltage":4.19,"channel_utilization":13.37,"air_util_tx":2.5811112,"uptime_seconds":771327}}},"RequestID":0,"ReplyID":0,"Emoji":0,"Dest":0,"Source":0,"WantResponse":false,"DecodeError":null,"FullTopic":"msh/US/bayarea/2/e/MediumSlow/!eb976cd5","RegionPath":"US/bayarea","Version":"2","Format":"e","Channel":"MediumSlow","UserID":"!eb976cd5"}} +# Updated log examples: +# map[caller:mqtt/subscriber.go:83 level:info logger:main.mqtt.MessageLogger msg:[NODEINFO_APP] Gateway:!4358bcb4 Message type: NODEINFO_APP packet:info:{full_topic:"msh/US/bayarea/2/e/LongFast/!4358bcb4" region_path:"US/bayarea" version:"2" format:"e" channel:"LongFast" user_id:"!4358bcb4"} data:{channel_id:"LongFast" gateway_id:"!4358bcb4" id:1496068057 from:4228900486 to:902833813 hop_limit:5 hop_start:5 priority:"UNSET" port_num:NODEINFO_APP node_info:{id:"!fc0fe686" long_name:"Meshtastic e686" short_name:"e686" macaddr:"\xc1.\xfc\x0f\xe6\x86" hw_model:TRACKER_T1000_E role:CLIENT_MUTE public_key:"\xb3\x82\x08\x0f\x1e\xbc\x1e *\xdc\xf6ZW\x0fナ\x14\xb3\x18Gk<7\x83\x86\xb7\xf5\xf2\xc5J\x01"} want_response:true} ts:1.745300044594027e+09] format: | {{- $timestamp := .ts | date | color "blue" -}} {{- $level := .level | colorByLevel .level | bold -}} {{- $logger := .logger | dim -}} - {{- $message := .msg -}} - + {{- $message := .msg }} {{$timestamp }} {{ $level }} [{{ $logger }}] {{ $message }} - {{if .packet -}} - {{- $data := .packet.data -}} - {{"Channel:" | dim }} {{ $data.ChannelID | color "green" }} - {{ "From:" | dim }} {{ $data.From | mult 1 }} {{ "To:" | dim }} {{ $data.To | mult 1 }} - {{ "Gateway:" | dim }} {{ $data.GatewayID | color "yellow" }} - {{ "Priority:" | dim }} {{ $data.Priority }} - {{ "PortNum:" | dim }} {{ $data.PortNum }} - - {{- if $data.HopLimit }} - {{ "Hop:" | dim }} {{ $data.HopStart }}/{{ $data.HopLimit }} - {{- end }} - {{- if $data.DecodeError }} - {{ "Error:" | dim }} {{ $data.DecodeError | color "red" }} - {{- end }} - {{ "Payload:" | dim }} - {{ $data.Payload | table }} - - {{end -}} - - {{- if not .packet -}} - {{- filter . "level" "ts" "logger" "caller" "msg" | table -}} + {{- if .packet -}} + {{- $info := .packet.info -}} + {{- $data := .packet.data -}} + {{- if $info }} + {{ "Channel:" | dim }} {{ $info.channel | color "green" }}{{ " Topic:" | dim }} {{ $info.fullTopic | color "cyan" }} + {{- end }} + {{- if $data }} + {{ "From:" | dim }} {{ $data.from | mult 1 }}{{ " To:" | dim }} {{ $data.to | mult 1 }}{{ " Gateway:" | dim }} {{ $data.gatewayId | color "yellow" }} + {{ "Priority:" | dim }} {{ $data.priority }}{{ " PortNum:" | dim }} {{ $data.portNum }} + {{- if $data.hopLimit }}{{ " Hop:" | dim }} {{ $data.hopStart }}/{{ $data.hopLimit }}{{ end }} + {{- if $data.decodeError }} + {{ "Error:" | dim }} {{ $data.decodeError | color "red" }} + {{- end -}} + {{- if $data.textMessage }} + {{ "Text:" | dim }} {{ $data.textMessage }} + {{- else if $data.mapReport }} + {{ "Map Report:" | dim }} {{ $data.mapReport | table }} + {{- else if $data.position }} + {{ "Position:" | dim }} Lat: {{ $data.position.latitudeI | mult 0.0000001 }}, Long: {{ $data.position.longitudeI | mult 0.0000001 }} + {{- else if $data.nodeInfo }} + {{ "NodeInfo:" | dim }} {{ $data.nodeInfo.id }} {{ $data.nodeInfo.longName }} + {{- else if $data.telemetry }} + {{ "Telemetry:" | dim }} + {{- if $data.telemetry.deviceMetrics }} + {{ "Battery:" | dim }} {{ $data.telemetry.deviceMetrics.batteryLevel }}%, {{ $data.telemetry.deviceMetrics.voltage }}V + {{- end }} + {{- if $data.telemetry.environmentMetrics }} + {{ "Environment:" | dim }} Temp: {{ $data.telemetry.environmentMetrics.temperature }}°C, Humidity: {{ $data.telemetry.environmentMetrics.relativeHumidity }}% + {{- end }} + {{- else }} + {{ "Payload:" | dim }} + {{ filter $data "channelId" "gatewayId" "id" "from" "to" "hopLimit" "hopStart" "priority" "portNum" "decodeError" | table }} + {{- end }} + {{- else }} + {{ .packet | table }} + {{- end }} + {{- else if .nodeCounts}} + {{ .nodeCounts | table}} + {{- else if .portCounts}} + {{ .portCounts | table}} + {{- else}} + {{ filter . "level" "ts" "logger" "caller" "msg" | table }} {{- end -}} # Date formatting -date_format: "15:04:05.000" +date_format: "15:04:05" # Handle non-JSON lines handle_non_json: true diff --git a/mqtt/logger.go b/mqtt/logger.go index 4ebb6b6..6c82fdd 100644 --- a/mqtt/logger.go +++ b/mqtt/logger.go @@ -1,13 +1,15 @@ package mqtt import ( + "encoding/json" "fmt" "strings" "github.com/dpup/prefab/logging" + "google.golang.org/protobuf/encoding/protojson" - pb "meshstream/generated/meshtastic" meshtreampb "meshstream/generated/meshstream" + pb "meshstream/generated/meshtastic" ) // MessageLogger logs messages using the provided logger @@ -124,7 +126,31 @@ func (ml *MessageLogger) logMessage(packet *meshtreampb.Packet) { } ml.logger.Infow(briefSummary, fields...) } else { - ml.logger.Infow(briefSummary, "packet", packet) - } + // Convert the protobuf message to a structured map for logging + // Use protojson to properly handle all fields and nested messages + marshaler := protojson.MarshalOptions{ + EmitUnpopulated: false, + UseProtoNames: false, // Use camelCase names for consistency with other logging + } -} \ No newline at end of file + // Marshal the packet to JSON + packetJSON, err := marshaler.Marshal(packet) + if err != nil { + ml.logger.Warnw("Failed to marshal packet to JSON", "error", err) + ml.logger.Infow(briefSummary, "packet", packet) + return + } + + // Unmarshal back to a map for structured logging + var packetMap map[string]interface{} + err = json.Unmarshal(packetJSON, &packetMap) + if err != nil { + ml.logger.Warnw("Failed to unmarshal packet JSON to map", "error", err) + ml.logger.Infow(briefSummary, "packet", packet) + return + } + + // Log with the structured map + ml.logger.Infow(briefSummary, "packet", packetMap) + } +} diff --git a/mqtt/stats.go b/mqtt/stats.go index 9c4b247..f956a48 100644 --- a/mqtt/stats.go +++ b/mqtt/stats.go @@ -7,8 +7,8 @@ import ( "github.com/dpup/prefab/logging" - pb "meshstream/generated/meshtastic" meshtreampb "meshstream/generated/meshstream" + pb "meshstream/generated/meshtastic" ) // MessageStats tracks statistics about received messages @@ -89,21 +89,21 @@ func (s *MessageStats) PrintStats() { // Log the basic statistics with structured fields s.logger.Infow("Message Statistics Summary", - "total_messages", s.TotalMessages, - "messages_per_second", msgPerSec, - "duration_seconds", duration.Seconds(), + "totalMessages", s.TotalMessages, + "messagesPerSecond", msgPerSec, + "durationSeconds", duration.Seconds(), ) // Create maps for structured node and port stats nodeStats := make(map[string]int) for nodeID, count := range s.ByNode { - nodeStats[fmt.Sprintf("node_%d", nodeID)] = count + nodeStats[fmt.Sprintf("node.%d", nodeID)] = count } // Log node statistics with structured fields s.logger.Infow("Messages by Node", - "node_counts", nodeStats, - "active_nodes", len(s.ByNode), + "nodeCounts", nodeStats, + "activeNodes", len(s.ByNode), ) // Create maps for structured port stats @@ -114,8 +114,8 @@ func (s *MessageStats) PrintStats() { // Log port type statistics with structured fields s.logger.Infow("Messages by Port Type", - "port_counts", portStats, - "active_ports", len(s.ByPortType), + "portCounts", portStats, + "activePorts", len(s.ByPortType), ) // Reset counters for rate calculation @@ -123,4 +123,4 @@ func (s *MessageStats) PrintStats() { s.ByNode = make(map[uint32]int) s.ByPortType = make(map[pb.PortNum]int) s.LastStatsPrinted = now -} \ No newline at end of file +}