Update MQTT packet handling to use protobuf structures directly

- Refactored decoder to use protobuf-defined Packet, TopicInfo, and Data structures
- Updated MQTT client and broker to work with protobuf structures directly
- Improved logging to properly serialize protobuf messages for structured output
- Optimized .logista.yaml template for cleaner, more compact log display

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Daniel Pupius
2025-04-22 10:30:39 -07:00
parent 0784ccb958
commit 2d61d369b0
3 changed files with 86 additions and 40 deletions

View File

@@ -1,41 +1,61 @@
# Meshstream Logista Configuration
# {"level":"info","ts":1745264398.907407,"logger":"main.server","caller":"meshstream/main.go:88","msg":"Starting server on localhost:8080"}
# {"level":"info","ts":1745264404.905592,"logger":"main.mqtt.MessageLogger","caller":"mqtt/subscriber.go:82","msg":"[NODEINFO_APP] Gateway:!fa74d32c Message type: NODEINFO_APP","packet":{"ChannelID":"MediumSlow","GatewayID":"!fa74d32c","ID":1512657627,"From":4201960236,"To":144028282,"HopLimit":4,"HopStart":0,"WantACK":false,"Priority":"BACKGROUND","ViaMQTT":false,"NextHop":0,"RelayNode":0,"PortNum":4,"Payload":{"id":"!fa74d32c","long_name":"MeshLager","short_name":"🍺","macaddr":"9BL6dNMs","hw_model":44},"RequestID":0,"ReplyID":0,"Emoji":0,"Dest":0,"Source":0,"WantResponse":true,"DecodeError":null,"FullTopic":"msh/US/bayarea/2/c/MediumSlow/!fa74d32c","RegionPath":"US/bayarea","Version":"2","Format":"c","Channel":"MediumSlow","UserID":"!fa74d32c"}}
# {"level":"info","ts":1745264405.2398899,"logger":"main.mqtt.MessageLogger","caller":"mqtt/subscriber.go:82","msg":"[TELEMETRY_APP] Gateway:!eb976cd5 Telemetry: device","packet":{"ChannelID":"MediumSlow","GatewayID":"!eb976cd5","ID":2022965751,"From":3149439480,"To":4294967295,"HopLimit":3,"HopStart":3,"WantACK":false,"Priority":"UNSET","ViaMQTT":false,"NextHop":0,"RelayNode":0,"PortNum":67,"Payload":{"time":1745264403,"Variant":{"DeviceMetrics":{"battery_level":100,"voltage":4.19,"channel_utilization":13.37,"air_util_tx":2.5811112,"uptime_seconds":771327}}},"RequestID":0,"ReplyID":0,"Emoji":0,"Dest":0,"Source":0,"WantResponse":false,"DecodeError":null,"FullTopic":"msh/US/bayarea/2/e/MediumSlow/!eb976cd5","RegionPath":"US/bayarea","Version":"2","Format":"e","Channel":"MediumSlow","UserID":"!eb976cd5"}}
# Updated log examples:
# map[caller:mqtt/subscriber.go:83 level:info logger:main.mqtt.MessageLogger msg:[NODEINFO_APP] Gateway:!4358bcb4 Message type: NODEINFO_APP packet:info:{full_topic:"msh/US/bayarea/2/e/LongFast/!4358bcb4" region_path:"US/bayarea" version:"2" format:"e" channel:"LongFast" user_id:"!4358bcb4"} data:{channel_id:"LongFast" gateway_id:"!4358bcb4" id:1496068057 from:4228900486 to:902833813 hop_limit:5 hop_start:5 priority:"UNSET" port_num:NODEINFO_APP node_info:{id:"!fc0fe686" long_name:"Meshtastic e686" short_name:"e686" macaddr:"\xc1.\xfc\x0f\xe6\x86" hw_model:TRACKER_T1000_E role:CLIENT_MUTE public_key:"\xb3\x82\x08\x0f\x1e\xbc\x1e *\xdc\xf6ZW\x0fナ\x14\xb3\x18Gk<7\x83\x86\xb7\xf5\xf2\xc5J\x01"} want_response:true} ts:1.745300044594027e+09]
format: |
{{- $timestamp := .ts | date | color "blue" -}}
{{- $level := .level | colorByLevel .level | bold -}}
{{- $logger := .logger | dim -}}
{{- $message := .msg -}}
{{- $message := .msg }}
{{$timestamp }} {{ $level }} [{{ $logger }}] {{ $message }}
{{if .packet -}}
{{- $data := .packet.data -}}
{{"Channel:" | dim }} {{ $data.ChannelID | color "green" }}
{{ "From:" | dim }} {{ $data.From | mult 1 }} {{ "To:" | dim }} {{ $data.To | mult 1 }}
{{ "Gateway:" | dim }} {{ $data.GatewayID | color "yellow" }}
{{ "Priority:" | dim }} {{ $data.Priority }}
{{ "PortNum:" | dim }} {{ $data.PortNum }}
{{- if $data.HopLimit }}
{{ "Hop:" | dim }} {{ $data.HopStart }}/{{ $data.HopLimit }}
{{- end }}
{{- if $data.DecodeError }}
{{ "Error:" | dim }} {{ $data.DecodeError | color "red" }}
{{- end }}
{{ "Payload:" | dim }}
{{ $data.Payload | table }}
{{end -}}
{{- if not .packet -}}
{{- filter . "level" "ts" "logger" "caller" "msg" | table -}}
{{- if .packet -}}
{{- $info := .packet.info -}}
{{- $data := .packet.data -}}
{{- if $info }}
{{ "Channel:" | dim }} {{ $info.channel | color "green" }}{{ " Topic:" | dim }} {{ $info.fullTopic | color "cyan" }}
{{- end }}
{{- if $data }}
{{ "From:" | dim }} {{ $data.from | mult 1 }}{{ " To:" | dim }} {{ $data.to | mult 1 }}{{ " Gateway:" | dim }} {{ $data.gatewayId | color "yellow" }}
{{ "Priority:" | dim }} {{ $data.priority }}{{ " PortNum:" | dim }} {{ $data.portNum }}
{{- if $data.hopLimit }}{{ " Hop:" | dim }} {{ $data.hopStart }}/{{ $data.hopLimit }}{{ end }}
{{- if $data.decodeError }}
{{ "Error:" | dim }} {{ $data.decodeError | color "red" }}
{{- end -}}
{{- if $data.textMessage }}
{{ "Text:" | dim }} {{ $data.textMessage }}
{{- else if $data.mapReport }}
{{ "Map Report:" | dim }} {{ $data.mapReport | table }}
{{- else if $data.position }}
{{ "Position:" | dim }} Lat: {{ $data.position.latitudeI | mult 0.0000001 }}, Long: {{ $data.position.longitudeI | mult 0.0000001 }}
{{- else if $data.nodeInfo }}
{{ "NodeInfo:" | dim }} {{ $data.nodeInfo.id }} {{ $data.nodeInfo.longName }}
{{- else if $data.telemetry }}
{{ "Telemetry:" | dim }}
{{- if $data.telemetry.deviceMetrics }}
{{ "Battery:" | dim }} {{ $data.telemetry.deviceMetrics.batteryLevel }}%, {{ $data.telemetry.deviceMetrics.voltage }}V
{{- end }}
{{- if $data.telemetry.environmentMetrics }}
{{ "Environment:" | dim }} Temp: {{ $data.telemetry.environmentMetrics.temperature }}°C, Humidity: {{ $data.telemetry.environmentMetrics.relativeHumidity }}%
{{- end }}
{{- else }}
{{ "Payload:" | dim }}
{{ filter $data "channelId" "gatewayId" "id" "from" "to" "hopLimit" "hopStart" "priority" "portNum" "decodeError" | table }}
{{- end }}
{{- else }}
{{ .packet | table }}
{{- end }}
{{- else if .nodeCounts}}
{{ .nodeCounts | table}}
{{- else if .portCounts}}
{{ .portCounts | table}}
{{- else}}
{{ filter . "level" "ts" "logger" "caller" "msg" | table }}
{{- end -}}
# Date formatting
date_format: "15:04:05.000"
date_format: "15:04:05"
# Handle non-JSON lines
handle_non_json: true

View File

@@ -1,13 +1,15 @@
package mqtt
import (
"encoding/json"
"fmt"
"strings"
"github.com/dpup/prefab/logging"
"google.golang.org/protobuf/encoding/protojson"
pb "meshstream/generated/meshtastic"
meshtreampb "meshstream/generated/meshstream"
pb "meshstream/generated/meshtastic"
)
// MessageLogger logs messages using the provided logger
@@ -124,7 +126,31 @@ func (ml *MessageLogger) logMessage(packet *meshtreampb.Packet) {
}
ml.logger.Infow(briefSummary, fields...)
} else {
ml.logger.Infow(briefSummary, "packet", packet)
}
// Convert the protobuf message to a structured map for logging
// Use protojson to properly handle all fields and nested messages
marshaler := protojson.MarshalOptions{
EmitUnpopulated: false,
UseProtoNames: false, // Use camelCase names for consistency with other logging
}
}
// Marshal the packet to JSON
packetJSON, err := marshaler.Marshal(packet)
if err != nil {
ml.logger.Warnw("Failed to marshal packet to JSON", "error", err)
ml.logger.Infow(briefSummary, "packet", packet)
return
}
// Unmarshal back to a map for structured logging
var packetMap map[string]interface{}
err = json.Unmarshal(packetJSON, &packetMap)
if err != nil {
ml.logger.Warnw("Failed to unmarshal packet JSON to map", "error", err)
ml.logger.Infow(briefSummary, "packet", packet)
return
}
// Log with the structured map
ml.logger.Infow(briefSummary, "packet", packetMap)
}
}

View File

@@ -7,8 +7,8 @@ import (
"github.com/dpup/prefab/logging"
pb "meshstream/generated/meshtastic"
meshtreampb "meshstream/generated/meshstream"
pb "meshstream/generated/meshtastic"
)
// MessageStats tracks statistics about received messages
@@ -89,21 +89,21 @@ func (s *MessageStats) PrintStats() {
// Log the basic statistics with structured fields
s.logger.Infow("Message Statistics Summary",
"total_messages", s.TotalMessages,
"messages_per_second", msgPerSec,
"duration_seconds", duration.Seconds(),
"totalMessages", s.TotalMessages,
"messagesPerSecond", msgPerSec,
"durationSeconds", duration.Seconds(),
)
// Create maps for structured node and port stats
nodeStats := make(map[string]int)
for nodeID, count := range s.ByNode {
nodeStats[fmt.Sprintf("node_%d", nodeID)] = count
nodeStats[fmt.Sprintf("node.%d", nodeID)] = count
}
// Log node statistics with structured fields
s.logger.Infow("Messages by Node",
"node_counts", nodeStats,
"active_nodes", len(s.ByNode),
"nodeCounts", nodeStats,
"activeNodes", len(s.ByNode),
)
// Create maps for structured port stats
@@ -114,8 +114,8 @@ func (s *MessageStats) PrintStats() {
// Log port type statistics with structured fields
s.logger.Infow("Messages by Port Type",
"port_counts", portStats,
"active_ports", len(s.ByPortType),
"portCounts", portStats,
"activePorts", len(s.ByPortType),
)
// Reset counters for rate calculation
@@ -123,4 +123,4 @@ func (s *MessageStats) PrintStats() {
s.ByNode = make(map[uint32]int)
s.ByPortType = make(map[pb.PortNum]int)
s.LastStatsPrinted = now
}
}