From 93c0e9e94fbae8315108f61e4ebbead261b5b2eb Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Fri, 18 Apr 2025 17:48:21 -0700 Subject: [PATCH] Refactor decoder to use protocol buffers directly for ServiceEnvelope decoding --- Makefile | 2 +- decoder/decoder.go | 400 +++++++++++++++++---------------------------- main.go | 13 +- 3 files changed, 157 insertions(+), 258 deletions(-) diff --git a/Makefile b/Makefile index 279a4a4..8bddafa 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,7 @@ gen-proto: tools -I./proto \ --go_out=./proto/generated \ --go_opt=paths=source_relative \ - ./proto/meshtastic/*.proto ./proto/nanopb.proto + ./proto/meshtastic/*.proto # Clean generated files clean: diff --git a/decoder/decoder.go b/decoder/decoder.go index b090006..55634cf 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -4,63 +4,38 @@ import ( "encoding/json" "fmt" "strings" - "unicode/utf8" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/encoding/protojson" + + pb "meshstream/proto/generated/meshtastic" ) -// PacketType represents the type of a Meshtastic packet -type PacketType string - -const ( - TypeJSON PacketType = "json" - TypeEncoded PacketType = "encoded" - TypeText PacketType = "text" -) - -// DecodedPacket contains information about a decoded packet -type DecodedPacket struct { - // Topic structure fields - Topic string +// TopicInfo contains parsed information about a Meshtastic MQTT topic +type TopicInfo struct { + FullTopic string RegionPath string Version string Format string Channel string UserID string - Type PacketType - - // JSON message fields - JSONData map[string]interface{} - FromNode string - ToNode string - Text string - Timestamp string - - // Encoded message fields - ChannelID string - GatewayID string - PacketID string - - // Raw data - RawData []byte } -// DecodePacket attempts to decode a packet from MQTT -func DecodePacket(topic string, payload []byte) (*DecodedPacket, error) { - packet := &DecodedPacket{ - Topic: topic, - RawData: payload, - JSONData: make(map[string]interface{}), +// ParseTopic parses a Meshtastic MQTT topic into its components +func ParseTopic(topic string) (*TopicInfo, error) { + info := &TopicInfo{ + FullTopic: topic, } - // Extract topic components // Format: msh/REGION_PATH/VERSION/FORMAT/CHANNELNAME/USERID // Example: msh/US/CA/Motherlode/2/e/LongFast/!abcd1234 // Example: msh/US/CA/Motherlode/2/json/LongFast/!abcd1234 parts := strings.Split(topic, "/") if len(parts) < 4 { - return packet, fmt.Errorf("invalid topic format: %s", topic) + return info, fmt.Errorf("invalid topic format: %s", topic) } - // Find protocol version and format indices by looking for "2" followed by "e", "c", or "json" + // Find protocol version and format indices by looking for "2" followed by "e" or "json" versionIndex := -1 formatIndex := -1 @@ -75,248 +50,171 @@ func DecodePacket(topic string, payload []byte) (*DecodedPacket, error) { if versionIndex == -1 || formatIndex >= len(parts) { // Could not find proper version/format markers - return packet, fmt.Errorf("invalid topic format, missing version/format: %s", topic) + return info, fmt.Errorf("invalid topic format, missing version/format: %s", topic) } // Extract region path (all segments between "msh" and version) if versionIndex > 1 { - packet.RegionPath = strings.Join(parts[1:versionIndex], "/") + info.RegionPath = strings.Join(parts[1:versionIndex], "/") } // Extract version and format - packet.Version = parts[versionIndex] - packet.Format = parts[formatIndex] + info.Version = parts[versionIndex] + info.Format = parts[formatIndex] - // Process based on format type + // Extract channel and user ID channelIndex := formatIndex + 1 userIdIndex := channelIndex + 1 if channelIndex < len(parts) { - packet.Channel = parts[channelIndex] + info.Channel = parts[channelIndex] } if userIdIndex < len(parts) { - packet.UserID = parts[userIdIndex] + info.UserID = parts[userIdIndex] } - // Process based on format type - if packet.Format == "e" { - // Encoded protobuf packet (using ServiceEnvelope) - packet.Type = TypeEncoded - - // For encoded packets, try to parse if the payload looks like a JSON ServiceEnvelope - // (Some gateways present ServiceEnvelope in JSON format) - if len(payload) > 0 && payload[0] == '{' { - var serviceEnvelope map[string]interface{} - if err := json.Unmarshal(payload, &serviceEnvelope); err == nil { - // Successfully parsed as JSON ServiceEnvelope - packet.JSONData = serviceEnvelope - - // Extract ServiceEnvelope metadata fields - if channelId, ok := serviceEnvelope["channel_id"].(string); ok { - packet.ChannelID = channelId - } - if gatewayId, ok := serviceEnvelope["gateway_id"].(string); ok { - packet.GatewayID = gatewayId - } - - // Try to extract data from the packet field - if packetData, ok := serviceEnvelope["packet"].(map[string]interface{}); ok { - if id, ok := packetData["id"].(float64); ok { - packet.PacketID = fmt.Sprintf("%d", int(id)) - } - if from, ok := packetData["from"].(float64); ok { - packet.FromNode = fmt.Sprintf("%d", int(from)) - } - if to, ok := packetData["to"].(float64); ok { - packet.ToNode = fmt.Sprintf("%d", int(to)) - } - - // Try to extract decoded payload if available - if decoded, ok := packetData["decoded"].(map[string]interface{}); ok { - if payload, ok := decoded["payload"].(string); ok { - packet.Text = payload - } - } - } - } - } - - // Note: For binary protocol buffer decoding we would need to use the generated protobuf code, - // but we'll defer that for now and just track that this is an encoded packet. - - } else if packet.Format == "json" { - // JSON format - packet.Type = TypeJSON - if err := json.Unmarshal(payload, &packet.JSONData); err != nil { - return packet, fmt.Errorf("failed to parse JSON: %v", err) - } - - // Extract common fields - if from, ok := packet.JSONData["from"].(string); ok { - packet.FromNode = from - } - if to, ok := packet.JSONData["to"].(string); ok { - packet.ToNode = to - } - if text, ok := packet.JSONData["payload"].(string); ok { - packet.Text = text - } - if ts, ok := packet.JSONData["timestamp"].(string); ok { - packet.Timestamp = ts - } - } else { - // Unknown format, try to infer from content - if len(payload) > 0 && payload[0] == '{' { - // Looks like JSON - packet.Type = TypeJSON - if err := json.Unmarshal(payload, &packet.JSONData); err == nil { - // Successfully parsed as JSON - - // Extract common fields - if from, ok := packet.JSONData["from"].(string); ok { - packet.FromNode = from - } - if to, ok := packet.JSONData["to"].(string); ok { - packet.ToNode = to - } - if text, ok := packet.JSONData["payload"].(string); ok { - packet.Text = text - } - if ts, ok := packet.JSONData["timestamp"].(string); ok { - packet.Timestamp = ts - } - } - } else if utf8.Valid(payload) && !containsBinaryData(payload) { - // Probably text - packet.Type = TypeText - packet.Text = string(payload) - } else { - // Encoded but not in JSON format - packet.Type = TypeEncoded - } - } - - return packet, nil + return info, nil } -// containsBinaryData does a simple check to see if a byte slice likely contains binary data -// by checking for control characters that aren't common in text -func containsBinaryData(data []byte) bool { - for _, b := range data { - // Skip common control characters - if b == '\n' || b == '\r' || b == '\t' { - continue - } - - // If we find a control character, it's probably binary data - if b < 32 || b > 126 { - return true - } +// DecodeEncodedMessage decodes a binary encoded message (format "e") +func DecodeEncodedMessage(payload []byte) (*pb.ServiceEnvelope, error) { + var serviceEnvelope pb.ServiceEnvelope + if err := proto.Unmarshal(payload, &serviceEnvelope); err != nil { + return nil, fmt.Errorf("failed to unmarshal ServiceEnvelope: %v", err) } - return false + return &serviceEnvelope, nil } -// FormatPacket formats a decoded packet for display -func FormatPacket(packet *DecodedPacket) string { +// DecodeJSONMessage decodes a JSON message (format "json") +func DecodeJSONMessage(payload []byte) (map[string]interface{}, error) { + var jsonData map[string]interface{} + if err := json.Unmarshal(payload, &jsonData); err != nil { + return nil, fmt.Errorf("failed to parse JSON: %v", err) + } + return jsonData, nil +} + +// FormatServiceEnvelope formats a ServiceEnvelope message into a human-readable string +func FormatServiceEnvelope(envelope *pb.ServiceEnvelope) string { var builder strings.Builder - builder.WriteString(fmt.Sprintf("Topic: %s\n", packet.Topic)) + builder.WriteString("ServiceEnvelope:\n") - // Show basic topic structure - builder.WriteString(fmt.Sprintf("Region Path: %s\n", packet.RegionPath)) - if packet.Version != "" { - builder.WriteString(fmt.Sprintf("Version: %s\n", packet.Version)) - } - if packet.Format != "" { - builder.WriteString(fmt.Sprintf("Format: %s\n", packet.Format)) - } - if packet.Channel != "" { - builder.WriteString(fmt.Sprintf("Channel: %s\n", packet.Channel)) - } - if packet.UserID != "" { - builder.WriteString(fmt.Sprintf("User ID: %s\n", packet.UserID)) + // Print basic envelope info + builder.WriteString(fmt.Sprintf(" Channel ID: %s\n", envelope.GetChannelId())) + builder.WriteString(fmt.Sprintf(" Gateway ID: %s\n", envelope.GetGatewayId())) + + // Print MeshPacket info if available + if packet := envelope.GetPacket(); packet != nil { + builder.WriteString("\nMeshPacket:\n") + builder.WriteString(fmt.Sprintf(" ID: %d\n", packet.GetId())) + builder.WriteString(fmt.Sprintf(" From: %d\n", packet.GetFrom())) + builder.WriteString(fmt.Sprintf(" To: %d\n", packet.GetTo())) + + // Try to output hop info + builder.WriteString(fmt.Sprintf(" Hop Limit: %d\n", packet.GetHopLimit())) + builder.WriteString(fmt.Sprintf(" Hop Start: %d\n", packet.GetHopStart())) + + // Determine payload type + if packet.GetDecoded() != nil { + data := packet.GetDecoded() + builder.WriteString(fmt.Sprintf(" Port Number: %s\n", data.GetPortnum())) + + // For text messages, print the text + if data.GetPortnum() == pb.PortNum_TEXT_MESSAGE_APP { + builder.WriteString(fmt.Sprintf(" Text Message: %s\n", string(data.GetPayload()))) + } else { + // For other message types, print the payload as hex + builder.WriteString(fmt.Sprintf(" Payload (%d bytes): %x\n", len(data.GetPayload()), data.GetPayload())) + } + } else if packet.GetEncrypted() != nil { + builder.WriteString(fmt.Sprintf(" Encrypted Payload (%d bytes): %x\n", len(packet.GetEncrypted()), packet.GetEncrypted())) + } } - switch packet.Type { - case TypeJSON: - builder.WriteString("Type: JSON\n") - if packet.FromNode != "" { - builder.WriteString(fmt.Sprintf("From: %s\n", packet.FromNode)) - } - if packet.ToNode != "" { - builder.WriteString(fmt.Sprintf("To: %s\n", packet.ToNode)) - } - if packet.Text != "" { - builder.WriteString(fmt.Sprintf("Text: %s\n", packet.Text)) - } - if packet.Timestamp != "" { - builder.WriteString(fmt.Sprintf("Timestamp: %s\n", packet.Timestamp)) - } - - // Format remaining JSON data - jsonBytes, _ := json.MarshalIndent(packet.JSONData, "", " ") - builder.WriteString(fmt.Sprintf("Data: %s\n", jsonBytes)) - - case TypeEncoded: - builder.WriteString("Type: Encoded (ServiceEnvelope)\n") - - // Display ServiceEnvelope metadata if available - if packet.ChannelID != "" { - builder.WriteString(fmt.Sprintf("Channel ID: %s\n", packet.ChannelID)) - } - if packet.GatewayID != "" { - builder.WriteString(fmt.Sprintf("Gateway ID: %s\n", packet.GatewayID)) - } - if packet.PacketID != "" { - builder.WriteString(fmt.Sprintf("Packet ID: %s\n", packet.PacketID)) - } - if packet.FromNode != "" { - builder.WriteString(fmt.Sprintf("From Node: %s\n", packet.FromNode)) - } - if packet.ToNode != "" { - builder.WriteString(fmt.Sprintf("To Node: %s\n", packet.ToNode)) - } - if packet.Text != "" { - builder.WriteString(fmt.Sprintf("Payload: %s\n", packet.Text)) - } - - // If we were able to parse as JSON, show the data - if len(packet.JSONData) > 0 { - jsonBytes, _ := json.MarshalIndent(packet.JSONData, "", " ") - builder.WriteString(fmt.Sprintf("Service Envelope: %s\n", jsonBytes)) + // Use protojson to generate a full JSON representation for debugging + marshaler := protojson.MarshalOptions{ + Multiline: true, + Indent: " ", + } + jsonBytes, err := marshaler.Marshal(envelope) + if err == nil { + builder.WriteString("\nFull Protobuf Structure:\n") + builder.WriteString(string(jsonBytes)) + } + + return builder.String() +} + +// FormatJSONMessage formats a JSON message into a human-readable string +func FormatJSONMessage(jsonData map[string]interface{}) string { + var builder strings.Builder + + builder.WriteString("JSON Message:\n") + + // Extract and display common fields + if from, ok := jsonData["from"].(string); ok { + builder.WriteString(fmt.Sprintf(" From: %s\n", from)) + } + if to, ok := jsonData["to"].(string); ok { + builder.WriteString(fmt.Sprintf(" To: %s\n", to)) + } + if message, ok := jsonData["payload"].(string); ok { + builder.WriteString(fmt.Sprintf(" Message: %s\n", message)) + } + if timestamp, ok := jsonData["timestamp"].(string); ok { + builder.WriteString(fmt.Sprintf(" Timestamp: %s\n", timestamp)) + } + + // Format the full JSON for reference + jsonBytes, err := json.MarshalIndent(jsonData, " ", " ") + if err == nil { + builder.WriteString("\nFull JSON Structure:\n ") + builder.WriteString(string(jsonBytes)) + } + + return builder.String() +} + +// FormatMessage formats a decoded message based on its format +func FormatMessage(topicInfo *TopicInfo, payload []byte) string { + var builder strings.Builder + + // Display topic information + builder.WriteString(fmt.Sprintf("Topic: %s\n", topicInfo.FullTopic)) + builder.WriteString(fmt.Sprintf("Region Path: %s\n", topicInfo.RegionPath)) + builder.WriteString(fmt.Sprintf("Version: %s\n", topicInfo.Version)) + builder.WriteString(fmt.Sprintf("Format: %s\n", topicInfo.Format)) + builder.WriteString(fmt.Sprintf("Channel: %s\n", topicInfo.Channel)) + if topicInfo.UserID != "" { + builder.WriteString(fmt.Sprintf("User ID: %s\n", topicInfo.UserID)) + } + builder.WriteString("\n") + + // Decode and format based on the format + if topicInfo.Format == "e" { + // Encoded protobuf message (ServiceEnvelope) + serviceEnvelope, err := DecodeEncodedMessage(payload) + if err != nil { + builder.WriteString(fmt.Sprintf("Error decoding encoded message: %v\n", err)) + builder.WriteString(fmt.Sprintf("Raw Binary (%d bytes): %x\n", len(payload), payload)) } else { - // Show raw binary data - builder.WriteString(fmt.Sprintf("Raw Data (%d bytes): %x\n", len(packet.RawData), packet.RawData)) + builder.WriteString(FormatServiceEnvelope(serviceEnvelope)) } - - case TypeText: - builder.WriteString("Type: Text\n") - builder.WriteString(fmt.Sprintf("Content: %s\n", packet.Text)) - - default: - // Debug case for unknown packet types - builder.WriteString(fmt.Sprintf("Type: UNKNOWN (%s)\n", packet.Type)) - builder.WriteString("---DEBUG INFO---\n") - builder.WriteString(fmt.Sprintf("Raw Payload (%d bytes): %x\n", len(packet.RawData), packet.RawData)) - - // Try to show as string if possible - if len(packet.RawData) > 0 { - builder.WriteString(fmt.Sprintf("As String: %s\n", string(packet.RawData))) - } - - // Topic parts - topicParts := strings.Split(packet.Topic, "/") - builder.WriteString("Topic Parts:\n") - for i, part := range topicParts { - builder.WriteString(fmt.Sprintf(" [%d]: %s\n", i, part)) - } - - // Show any JSON data if present - if len(packet.JSONData) > 0 { - jsonBytes, _ := json.MarshalIndent(packet.JSONData, "", " ") - builder.WriteString(fmt.Sprintf("JSON Data: %s\n", jsonBytes)) + } else if topicInfo.Format == "json" { + // JSON message + jsonData, err := DecodeJSONMessage(payload) + if err != nil { + builder.WriteString(fmt.Sprintf("Error decoding JSON message: %v\n", err)) + builder.WriteString(fmt.Sprintf("Raw Data: %s\n", string(payload))) + } else { + builder.WriteString(FormatJSONMessage(jsonData)) } + } else { + // Unknown format + builder.WriteString(fmt.Sprintf("Unsupported format: %s\n", topicInfo.Format)) + builder.WriteString(fmt.Sprintf("Raw Data (%d bytes): %x\n", len(payload), payload)) } return builder.String() diff --git a/main.go b/main.go index a33dd1b..af58a52 100644 --- a/main.go +++ b/main.go @@ -24,14 +24,15 @@ const ( var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { fmt.Printf("Received message from topic: %s\n", msg.Topic()) - // Decode the packet - packet, err := decoder.DecodePacket(msg.Topic(), msg.Payload()) + // Parse the topic structure + topicInfo, err := decoder.ParseTopic(msg.Topic()) if err != nil { - fmt.Printf("Error decoding packet: %v\n", err) - fmt.Printf("Raw payload: %s\n", msg.Payload()) + fmt.Printf("Error parsing topic: %v\n", err) + fmt.Printf("Raw topic: %s\n", msg.Topic()) + fmt.Printf("Raw payload: %x\n", msg.Payload()) } else { - // Format and print the packet - formattedOutput := decoder.FormatPacket(packet) + // Format and print the message + formattedOutput := decoder.FormatMessage(topicInfo, msg.Payload()) fmt.Println(formattedOutput) }