diff --git a/decoder/decode_test.go b/decoder/decode_test.go index 95f244c..87846d9 100644 --- a/decoder/decode_test.go +++ b/decoder/decode_test.go @@ -6,7 +6,8 @@ import ( "strings" "testing" - pb "meshstream/proto/generated/meshtastic" + meshtreampb "meshstream/generated/meshstream" + pb "meshstream/generated/meshtastic" "google.golang.org/protobuf/proto" ) @@ -127,38 +128,35 @@ func TestDecodeMessageWithMapPayload(t *testing.T) { } // Create a topic info structure for a map topic - topicInfo := &TopicInfo{ + topicInfo := &meshtreampb.TopicInfo{ FullTopic: "msh/US/bayarea/2/map/LongFast/!1234abcd", RegionPath: "US/bayarea", Version: "2", Format: "map", Channel: "LongFast", - UserID: "!1234abcd", + UserId: "!1234abcd", } // Call the actual DecodeMessage function we want to test - decodedPacket := DecodeMessage(data, topicInfo) + decodedData := DecodeMessage(data, topicInfo) // Check that the decoding was successful - if decodedPacket.DecodeError != nil { - t.Errorf("Expected successful decoding, but got error: %v", decodedPacket.DecodeError) + if decodedData.DecodeError != "" { + t.Errorf("Expected successful decoding, but got error: %v", decodedData.DecodeError) } // Verify the decoded packet has the expected format - if decodedPacket.PortNum != pb.PortNum_MAP_REPORT_APP { - t.Errorf("Expected PortNum to be MAP_REPORT_APP, got %s", decodedPacket.PortNum) + if decodedData.PortNum != pb.PortNum_MAP_REPORT_APP { + t.Errorf("Expected PortNum to be MAP_REPORT_APP, got %s", decodedData.PortNum) } - // These fields are no longer used - // Only verify that key metadata was correctly extracted - // Verify that key metadata was correctly extracted - if decodedPacket.From == 0 { + if decodedData.From == 0 { t.Error("Expected From field to be non-zero") } // Format the output and check it contains expected components - formattedOutput := FormatTopicAndPacket(topicInfo, decodedPacket) + formattedOutput := FormatTopicAndPacket(topicInfo, decodedData) // Print out the formatted output to debug t.Logf("Formatted output: %s", formattedOutput) diff --git a/decoder/decoder.go b/decoder/decoder.go index 1ff8ff9..593544b 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -6,13 +6,13 @@ import ( "google.golang.org/protobuf/proto" - mesh "meshstream/proto/generated" - pb "meshstream/proto/generated/meshtastic" + meshtreampb "meshstream/generated/meshstream" + pb "meshstream/generated/meshtastic" ) // ParseTopic parses a Meshtastic MQTT topic into its components -func ParseTopic(topic string) (*mesh.TopicInfo, error) { - info := &mesh.TopicInfo{ +func ParseTopic(topic string) (*meshtreampb.TopicInfo, error) { + info := &meshtreampb.TopicInfo{ FullTopic: topic, } @@ -75,78 +75,78 @@ func DecodeEncodedMessage(payload []byte) (*pb.ServiceEnvelope, error) { return &serviceEnvelope, nil } -// DecodeMessage creates a DecodedPacket from a binary encoded message -func DecodeMessage(payload []byte, topicInfo *mesh.TopicInfo) *mesh.DecodedPacket { - decoded := &mesh.DecodedPacket{} +// DecodeMessage creates a Data object from a binary encoded message +func DecodeMessage(payload []byte, topicInfo *meshtreampb.TopicInfo) *meshtreampb.Data { + data := &meshtreampb.Data{} // First decode the envelope envelope, err := DecodeEncodedMessage(payload) if err != nil { - decoded.DecodeError = err.Error() - return decoded + data.DecodeError = err.Error() + return data } // Extract envelope fields - decoded.ChannelId = envelope.GetChannelId() - decoded.GatewayId = envelope.GetGatewayId() + data.ChannelId = envelope.GetChannelId() + data.GatewayId = envelope.GetGatewayId() // Extract mesh packet fields if available packet := envelope.GetPacket() if packet == nil { - decoded.DecodeError = "no mesh packet in envelope" - return decoded + data.DecodeError = "no mesh packet in envelope" + return data } // Extract mesh packet fields - decoded.Id = packet.GetId() - decoded.From = packet.GetFrom() - decoded.To = packet.GetTo() - decoded.HopLimit = packet.GetHopLimit() - decoded.HopStart = packet.GetHopStart() - decoded.WantAck = packet.GetWantAck() - decoded.Priority = packet.GetPriority().String() - decoded.ViaMqtt = packet.GetViaMqtt() - decoded.NextHop = packet.GetNextHop() - decoded.RelayNode = packet.GetRelayNode() + data.Id = packet.GetId() + data.From = packet.GetFrom() + data.To = packet.GetTo() + data.HopLimit = packet.GetHopLimit() + data.HopStart = packet.GetHopStart() + data.WantAck = packet.GetWantAck() + data.Priority = packet.GetPriority().String() + data.ViaMqtt = packet.GetViaMqtt() + data.NextHop = packet.GetNextHop() + data.RelayNode = packet.GetRelayNode() // Process the payload if packet.GetDecoded() != nil { // Packet has already been decoded - decodeDataPayload(decoded, packet.GetDecoded()) + decodeDataPayload(data, packet.GetDecoded()) } else if packet.GetEncrypted() != nil { // Packet is encrypted, try to decrypt it - decodeEncryptedPayload(decoded, packet.GetEncrypted(), envelope.GetChannelId(), packet.GetId(), packet.GetFrom()) + decodeEncryptedPayload(data, packet.GetEncrypted(), envelope.GetChannelId(), packet.GetId(), packet.GetFrom()) } else { - decoded.DecodeError = "packet has no payload" + data.DecodeError = "packet has no payload" } - return decoded + return data } // decodeDataPayload extracts information from a Data message -func decodeDataPayload(decoded *mesh.DecodedPacket, data *pb.Data) { +func decodeDataPayload(data *meshtreampb.Data, pbData *pb.Data) { // Extract data fields - decoded.PortNum = data.GetPortnum() - decoded.RequestId = data.GetRequestId() - decoded.ReplyId = data.GetReplyId() - decoded.Emoji = data.GetEmoji() - decoded.Dest = data.GetDest() - decoded.Source = data.GetSource() - decoded.WantResponse = data.GetWantResponse() + data.PortNum = pbData.GetPortnum() + data.RequestId = pbData.GetRequestId() + data.ReplyId = pbData.GetReplyId() + data.Emoji = pbData.GetEmoji() + data.Dest = pbData.GetDest() + data.Source = pbData.GetSource() + data.WantResponse = pbData.GetWantResponse() // Process the payload based on port type - payload := data.GetPayload() + payload := pbData.GetPayload() - switch data.GetPortnum() { + switch pbData.GetPortnum() { case pb.PortNum_TEXT_MESSAGE_APP: // Text message - store as string - decoded.Payload = &mesh.DecodedPacket_TextMessage{ + data.Payload = &meshtreampb.Data_TextMessage{ TextMessage: string(payload), } case pb.PortNum_TEXT_MESSAGE_COMPRESSED_APP: // Compressed text - store the raw bytes - decoded.Payload = &mesh.DecodedPacket_CompressedText{ + data.Payload = &meshtreampb.Data_CompressedText{ CompressedText: payload, } @@ -154,9 +154,9 @@ func decodeDataPayload(decoded *mesh.DecodedPacket, data *pb.Data) { // Position data var position pb.Position if err := proto.Unmarshal(payload, &position); err != nil { - decoded.DecodeError = fmt.Sprintf("failed to unmarshal Position data: %v", err) + data.DecodeError = fmt.Sprintf("failed to unmarshal Position data: %v", err) } else { - decoded.Payload = &mesh.DecodedPacket_Position{ + data.Payload = &meshtreampb.Data_Position{ Position: &position, } } @@ -165,9 +165,9 @@ func decodeDataPayload(decoded *mesh.DecodedPacket, data *pb.Data) { // Node information var user pb.User if err := proto.Unmarshal(payload, &user); err != nil { - decoded.DecodeError = fmt.Sprintf("failed to unmarshal User data: %v", err) + data.DecodeError = fmt.Sprintf("failed to unmarshal User data: %v", err) } else { - decoded.Payload = &mesh.DecodedPacket_NodeInfo{ + data.Payload = &meshtreampb.Data_NodeInfo{ NodeInfo: &user, } } @@ -176,9 +176,9 @@ func decodeDataPayload(decoded *mesh.DecodedPacket, data *pb.Data) { // Telemetry data var telemetry pb.Telemetry if err := proto.Unmarshal(payload, &telemetry); err != nil { - decoded.DecodeError = fmt.Sprintf("failed to unmarshal Telemetry data: %v", err) + data.DecodeError = fmt.Sprintf("failed to unmarshal Telemetry data: %v", err) } else { - decoded.Payload = &mesh.DecodedPacket_Telemetry{ + data.Payload = &meshtreampb.Data_Telemetry{ Telemetry: &telemetry, } } @@ -187,9 +187,9 @@ func decodeDataPayload(decoded *mesh.DecodedPacket, data *pb.Data) { // Waypoint data var waypoint pb.Waypoint if err := proto.Unmarshal(payload, &waypoint); err != nil { - decoded.DecodeError = fmt.Sprintf("failed to unmarshal Waypoint data: %v", err) + data.DecodeError = fmt.Sprintf("failed to unmarshal Waypoint data: %v", err) } else { - decoded.Payload = &mesh.DecodedPacket_Waypoint{ + data.Payload = &meshtreampb.Data_Waypoint{ Waypoint: &waypoint, } } @@ -198,9 +198,9 @@ func decodeDataPayload(decoded *mesh.DecodedPacket, data *pb.Data) { // Map report data var mapReport pb.MapReport if err := proto.Unmarshal(payload, &mapReport); err != nil { - decoded.DecodeError = fmt.Sprintf("failed to unmarshal MapReport data: %v", err) + data.DecodeError = fmt.Sprintf("failed to unmarshal MapReport data: %v", err) } else { - decoded.Payload = &mesh.DecodedPacket_MapReport{ + data.Payload = &meshtreampb.Data_MapReport{ MapReport: &mapReport, } } @@ -209,9 +209,9 @@ func decodeDataPayload(decoded *mesh.DecodedPacket, data *pb.Data) { // Traceroute data var routeDiscovery pb.RouteDiscovery if err := proto.Unmarshal(payload, &routeDiscovery); err != nil { - decoded.DecodeError = fmt.Sprintf("failed to unmarshal RouteDiscovery data: %v", err) + data.DecodeError = fmt.Sprintf("failed to unmarshal RouteDiscovery data: %v", err) } else { - decoded.Payload = &mesh.DecodedPacket_RouteDiscovery{ + data.Payload = &meshtreampb.Data_RouteDiscovery{ RouteDiscovery: &routeDiscovery, } } @@ -220,54 +220,98 @@ func decodeDataPayload(decoded *mesh.DecodedPacket, data *pb.Data) { // Neighbor information data var neighborInfo pb.NeighborInfo if err := proto.Unmarshal(payload, &neighborInfo); err != nil { - decoded.DecodeError = fmt.Sprintf("failed to unmarshal NeighborInfo data: %v", err) + data.DecodeError = fmt.Sprintf("failed to unmarshal NeighborInfo data: %v", err) } else { - decoded.Payload = &mesh.DecodedPacket_NeighborInfo{ + data.Payload = &meshtreampb.Data_NeighborInfo{ NeighborInfo: &neighborInfo, } } + case pb.PortNum_REMOTE_HARDWARE_APP: + // Remote hardware data + var hardware pb.HardwareMessage + if err := proto.Unmarshal(payload, &hardware); err != nil { + data.DecodeError = fmt.Sprintf("failed to unmarshal HardwareMessage data: %v", err) + } else { + data.Payload = &meshtreampb.Data_RemoteHardware{ + RemoteHardware: &hardware, + } + } + + case pb.PortNum_ROUTING_APP: + // Routing data + var routing pb.Routing + if err := proto.Unmarshal(payload, &routing); err != nil { + data.DecodeError = fmt.Sprintf("failed to unmarshal Routing data: %v", err) + } else { + data.Payload = &meshtreampb.Data_Routing{ + Routing: &routing, + } + } + + case pb.PortNum_ADMIN_APP: + // Admin data + var admin pb.AdminMessage + if err := proto.Unmarshal(payload, &admin); err != nil { + data.DecodeError = fmt.Sprintf("failed to unmarshal AdminMessage data: %v", err) + } else { + data.Payload = &meshtreampb.Data_Admin{ + Admin: &admin, + } + } + + case pb.PortNum_PAXCOUNTER_APP: + // Paxcount data + var paxcount pb.Paxcount + if err := proto.Unmarshal(payload, &paxcount); err != nil { + data.DecodeError = fmt.Sprintf("failed to unmarshal Paxcount data: %v", err) + } else { + data.Payload = &meshtreampb.Data_Paxcounter{ + Paxcounter: &paxcount, + } + } + default: // For other types, just store the raw bytes - decoded.Payload = &mesh.DecodedPacket_BinaryData{ + data.Payload = &meshtreampb.Data_BinaryData{ BinaryData: payload, } } } // decodeEncryptedPayload tries to decrypt and decode encrypted payloads -func decodeEncryptedPayload(decoded *mesh.DecodedPacket, encrypted []byte, channelId string, packetId, fromNode uint32) { +func decodeEncryptedPayload(data *meshtreampb.Data, encrypted []byte, channelId string, packetId, fromNode uint32) { // Attempt to decrypt the payload using the channel key if channelId == "" { - decoded.DecodeError = "encrypted packet has no channel ID" + data.DecodeError = "encrypted packet has no channel ID" return } channelKey := GetChannelKey(channelId) decrypted, err := XOR(encrypted, channelKey, packetId, fromNode) if err != nil { - decoded.DecodeError = fmt.Sprintf("failed to decrypt payload: %v", err) + data.DecodeError = fmt.Sprintf("failed to decrypt payload: %v", err) return } // Try to parse as a Data message - var data pb.Data - if err := proto.Unmarshal(decrypted, &data); err != nil { + var pbData pb.Data + if err := proto.Unmarshal(decrypted, &pbData); err != nil { // If we can't parse as Data, check if it's ASCII text if IsASCII(decrypted) { - decoded.PortNum = pb.PortNum_TEXT_MESSAGE_APP - decoded.Payload = &mesh.DecodedPacket_TextMessage{ + data.PortNum = pb.PortNum_TEXT_MESSAGE_APP + data.Payload = &meshtreampb.Data_TextMessage{ TextMessage: string(decrypted), } } else { - decoded.DecodeError = fmt.Sprintf("failed to parse decrypted data: %v", err) - decoded.Payload = &mesh.DecodedPacket_BinaryData{ + data.DecodeError = fmt.Sprintf("failed to parse decrypted data: %v", err) + data.Payload = &meshtreampb.Data_BinaryData{ BinaryData: decrypted, } } } else { // Successfully decoded the payload - decodeDataPayload(decoded, &data) + decodeDataPayload(data, &pbData) } } diff --git a/decoder/formatter.go b/decoder/formatter.go index 5653817..9bdb276 100644 --- a/decoder/formatter.go +++ b/decoder/formatter.go @@ -1,5 +1,87 @@ package decoder -// This file has been intentionally left empty. -// Removed formatter functions as they are no longer needed. -// We now use protobuf serialization via protojson for formatting. \ No newline at end of file +import ( + "fmt" + "strings" + + meshtreampb "meshstream/generated/meshstream" +) + +// FormatTopicAndPacket creates a human-readable representation of a topic and packet +// for debugging purposes. +func FormatTopicAndPacket(topic *meshtreampb.TopicInfo, data *meshtreampb.Data) string { + var sb strings.Builder + + // Topic information + sb.WriteString("===== Topic Info =====\n") + sb.WriteString(fmt.Sprintf("Full Topic: %s\n", topic.FullTopic)) + sb.WriteString(fmt.Sprintf("Region Path: %s\n", topic.RegionPath)) + sb.WriteString(fmt.Sprintf("Version: %s\n", topic.Version)) + sb.WriteString(fmt.Sprintf("Format: %s\n", topic.Format)) + sb.WriteString(fmt.Sprintf("Channel: %s\n", topic.Channel)) + sb.WriteString(fmt.Sprintf("User ID: %s\n", topic.UserId)) + + // Packet information + sb.WriteString("\n===== Packet Info =====\n") + + // Check if we have a decode error + if data.DecodeError != "" { + sb.WriteString(fmt.Sprintf("ERROR: %s\n", data.DecodeError)) + return sb.String() + } + + // Basic packet information + sb.WriteString(fmt.Sprintf("ID: %d\n", data.Id)) + sb.WriteString(fmt.Sprintf("From: %d\n", data.From)) + sb.WriteString(fmt.Sprintf("To: %d\n", data.To)) + sb.WriteString(fmt.Sprintf("Channel ID: %s\n", data.ChannelId)) + sb.WriteString(fmt.Sprintf("Gateway ID: %s\n", data.GatewayId)) + sb.WriteString(fmt.Sprintf("Port: %s\n", data.PortNum.String())) + sb.WriteString(fmt.Sprintf("Hop Limit: %d\n", data.HopLimit)) + sb.WriteString(fmt.Sprintf("Request ID: %d\n", data.RequestId)) + + // Payload type-specific information + sb.WriteString("\n===== Payload Info =====\n") + + switch data.Payload.(type) { + case *meshtreampb.Data_TextMessage: + sb.WriteString(fmt.Sprintf("Type: Text Message\nContent: %s\n", data.GetTextMessage())) + + case *meshtreampb.Data_Position: + pos := data.GetPosition() + lat := float64(pos.GetLatitudeI()) / 10000000.0 + lon := float64(pos.GetLongitudeI()) / 10000000.0 + sb.WriteString(fmt.Sprintf("Type: Position\nLatitude: %.6f\nLongitude: %.6f\nAltitude: %d\n", + lat, lon, pos.GetAltitude())) + + case *meshtreampb.Data_Telemetry: + telemetry := data.GetTelemetry() + sb.WriteString("Type: Telemetry\n") + if telemetry.GetEnvironmentMetrics() != nil { + env := telemetry.GetEnvironmentMetrics() + sb.WriteString(fmt.Sprintf("Environment: Temp %.1f°C, Rel Humidity %.1f%%\n", + env.GetTemperature(), env.GetRelativeHumidity())) + } + if telemetry.GetDeviceMetrics() != nil { + dev := telemetry.GetDeviceMetrics() + sb.WriteString(fmt.Sprintf("Device: Battery %d%%, Voltage %.1fV\n", + dev.GetBatteryLevel(), dev.GetVoltage())) + } + + case *meshtreampb.Data_NodeInfo: + user := data.GetNodeInfo() + sb.WriteString(fmt.Sprintf("Type: User Info\nID: %s\nLongName: %s\nShortName: %s\n", + user.GetId(), user.GetLongName(), user.GetShortName())) + + case *meshtreampb.Data_MapReport: + sb.WriteString(fmt.Sprintf("Type: Map Report\n")) + + case *meshtreampb.Data_BinaryData: + sb.WriteString(fmt.Sprintf("Type: Binary Data\nLength: %d bytes\n", len(data.GetBinaryData()))) + + default: + sb.WriteString("Type: Unknown\n") + } + + return sb.String() +} \ No newline at end of file diff --git a/mqtt/broker_test.go b/mqtt/broker_test.go index fd25920..76d6df0 100644 --- a/mqtt/broker_test.go +++ b/mqtt/broker_test.go @@ -7,7 +7,7 @@ import ( "github.com/dpup/prefab/logging" - "meshstream/decoder" + meshtreampb "meshstream/generated/meshstream" ) // TestBrokerSubscribeUnsubscribe tests the basic subscribe and unsubscribe functionality @@ -38,8 +38,10 @@ func TestBrokerSubscribeUnsubscribe(t *testing.T) { // First packet with ID 1 packet1 := &Packet{ - DecodedPacket: &decoder.DecodedPacket{ID: 1}, - TopicInfo: &decoder.TopicInfo{}, + Packet: &meshtreampb.Packet{ + Data: &meshtreampb.Data{Id: 1}, + Info: &meshtreampb.TopicInfo{}, + }, } // Send the packet @@ -48,8 +50,8 @@ func TestBrokerSubscribeUnsubscribe(t *testing.T) { // Both subscribers should receive the packet select { case received := <-subscriber1: - if received.ID != 1 { - t.Errorf("Expected subscriber1 to receive packet with ID 1, got %d", received.ID) + if received.Data.Id != 1 { + t.Errorf("Expected subscriber1 to receive packet with ID 1, got %d", received.Data.Id) } case <-time.After(100 * time.Millisecond): t.Error("subscriber1 didn't receive packet within timeout") @@ -57,8 +59,8 @@ func TestBrokerSubscribeUnsubscribe(t *testing.T) { select { case received := <-subscriber2: - if received.ID != 1 { - t.Errorf("Expected subscriber2 to receive packet with ID 1, got %d", received.ID) + if received.Data.Id != 1 { + t.Errorf("Expected subscriber2 to receive packet with ID 1, got %d", received.Data.Id) } case <-time.After(100 * time.Millisecond): t.Error("subscriber2 didn't receive packet within timeout") @@ -78,8 +80,10 @@ func TestBrokerSubscribeUnsubscribe(t *testing.T) { // Second packet with ID 2 packet2 := &Packet{ - DecodedPacket: &decoder.DecodedPacket{ID: 2}, - TopicInfo: &decoder.TopicInfo{}, + Packet: &meshtreampb.Packet{ + Data: &meshtreampb.Data{Id: 2}, + Info: &meshtreampb.TopicInfo{}, + }, } // Send the second packet @@ -88,8 +92,8 @@ func TestBrokerSubscribeUnsubscribe(t *testing.T) { // The second subscriber should receive the packet select { case received := <-subscriber2: - if received.ID != 2 { - t.Errorf("Expected subscriber2 to receive packet with ID 2, got %d", received.ID) + if received.Data.Id != 2 { + t.Errorf("Expected subscriber2 to receive packet with ID 2, got %d", received.Data.Id) } case <-time.After(100 * time.Millisecond): t.Error("subscriber2 didn't receive second packet within timeout") @@ -115,8 +119,10 @@ func TestBrokerMultipleSubscribers(t *testing.T) { // Send a test packet with ID 42 testPacket := &Packet{ - DecodedPacket: &decoder.DecodedPacket{ID: 42}, - TopicInfo: &decoder.TopicInfo{}, + Packet: &meshtreampb.Packet{ + Data: &meshtreampb.Data{Id: 42}, + Info: &meshtreampb.TopicInfo{}, + }, } sourceChan <- testPacket @@ -129,8 +135,8 @@ func TestBrokerMultipleSubscribers(t *testing.T) { defer wg.Done() select { case received := <-ch: - if received.ID != 42 { - t.Errorf("subscriber %d expected packet ID 42, got %d", idx, received.ID) + if received.Data.Id != 42 { + t.Errorf("subscriber %d expected packet ID 42, got %d", idx, received.Data.Id) } case <-time.After(100 * time.Millisecond): t.Errorf("subscriber %d didn't receive packet within timeout", idx) @@ -169,12 +175,16 @@ func TestBrokerSlowSubscriber(t *testing.T) { // Send two packets quickly to fill the slow subscriber's buffer testPacket1 := &Packet{ - DecodedPacket: &decoder.DecodedPacket{ID: 101}, - TopicInfo: &decoder.TopicInfo{}, + Packet: &meshtreampb.Packet{ + Data: &meshtreampb.Data{Id: 101}, + Info: &meshtreampb.TopicInfo{}, + }, } testPacket2 := &Packet{ - DecodedPacket: &decoder.DecodedPacket{ID: 102}, - TopicInfo: &decoder.TopicInfo{}, + Packet: &meshtreampb.Packet{ + Data: &meshtreampb.Data{Id: 102}, + Info: &meshtreampb.TopicInfo{}, + }, } sourceChan <- testPacket1 @@ -187,8 +197,8 @@ func TestBrokerSlowSubscriber(t *testing.T) { // The normal subscriber should receive both packets select { case received := <-normalSubscriber: - if received.ID != 101 { - t.Errorf("normalSubscriber expected packet ID 101, got %d", received.ID) + if received.Data.Id != 101 { + t.Errorf("normalSubscriber expected packet ID 101, got %d", received.Data.Id) } case <-time.After(100 * time.Millisecond): t.Error("normalSubscriber didn't receive first packet within timeout") @@ -196,8 +206,8 @@ func TestBrokerSlowSubscriber(t *testing.T) { select { case received := <-normalSubscriber: - if received.ID != 102 { - t.Errorf("normalSubscriber expected packet ID 102, got %d", received.ID) + if received.Data.Id != 102 { + t.Errorf("normalSubscriber expected packet ID 102, got %d", received.Data.Id) } case <-time.After(100 * time.Millisecond): t.Error("normalSubscriber didn't receive second packet within timeout") @@ -206,8 +216,8 @@ func TestBrokerSlowSubscriber(t *testing.T) { // The slow subscriber should receive at least the first packet select { case received := <-slowSubscriber: - if received.ID != 101 { - t.Errorf("slowSubscriber expected packet ID 101, got %d", received.ID) + if received.Data.Id != 101 { + t.Errorf("slowSubscriber expected packet ID 101, got %d", received.Data.Id) } case <-time.After(100 * time.Millisecond): t.Error("slowSubscriber didn't receive first packet within timeout") diff --git a/mqtt/client.go b/mqtt/client.go index 85bce47..9e4de5e 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -8,7 +8,6 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "meshstream/decoder" - mesh "meshstream/proto/generated" ) // Config holds configuration for the MQTT client @@ -99,10 +98,10 @@ func (c *Client) messageHandler(client mqtt.Client, msg mqtt.Message) { switch topicInfo.Format { case "e", "c", "map": // Binary encoded protobuf message - decodedPacket := decoder.DecodeMessage(msg.Payload(), topicInfo) + data := decoder.DecodeMessage(msg.Payload(), topicInfo) - // Create packet with both the decoded packet and topic info - packet := NewPacket(decodedPacket, topicInfo) + // Create packet with both the data and topic info + packet := NewPacket(data, topicInfo) // Send the decoded message to the channel, but don't block if buffer is full select { diff --git a/mqtt/logger.go b/mqtt/logger.go index 0fb0700..3cf8816 100644 --- a/mqtt/logger.go +++ b/mqtt/logger.go @@ -6,7 +6,7 @@ import ( "github.com/dpup/prefab/logging" - pb "meshstream/proto/generated/meshtastic" + pb "meshstream/generated/meshtastic" ) // MessageLogger logs messages using the provided logger @@ -44,23 +44,23 @@ func NewMessageLogger(broker *Broker, briefMode bool, logger logging.Logger) (*M func (ml *MessageLogger) getBriefSummary(packet *Packet) string { var summary string - if packet.DecodedPacket.DecodeError != nil { - return fmt.Sprintf("Error decoding packet: %v", packet.DecodedPacket.DecodeError) + if packet.Data.DecodeError != "" { + return fmt.Sprintf("Error decoding packet: %v", packet.Data.DecodeError) } // Create a basic summary based on the port type - switch packet.PortNum { + switch packet.Data.PortNum { case pb.PortNum_TEXT_MESSAGE_APP: // For text messages, include the text content - if text, ok := packet.Payload.(string); ok { - summary = fmt.Sprintf("Text message: %s", text) + if packet.Data.GetTextMessage() != "" { + summary = fmt.Sprintf("Text message: %s", packet.Data.GetTextMessage()) } else { summary = "Text message (invalid format)" } case pb.PortNum_POSITION_APP: // For position messages, include a compact location summary - if pos, ok := packet.Payload.(*pb.Position); ok { + if pos := packet.Data.GetPosition(); pos != nil { lat := float64(pos.GetLatitudeI()) / 10000000.0 lon := float64(pos.GetLongitudeI()) / 10000000.0 summary = fmt.Sprintf("Position: %.5f, %.5f", lat, lon) @@ -70,7 +70,7 @@ func (ml *MessageLogger) getBriefSummary(packet *Packet) string { case pb.PortNum_TELEMETRY_APP: // For telemetry, give a short summary of what's included - if telemetry, ok := packet.Payload.(*pb.Telemetry); ok { + if telemetry := packet.Data.GetTelemetry(); telemetry != nil { parts := []string{} if telemetry.GetEnvironmentMetrics() != nil { parts = append(parts, "environment") @@ -88,7 +88,7 @@ func (ml *MessageLogger) getBriefSummary(packet *Packet) string { default: // For other types, just mention the port type - summary = fmt.Sprintf("Message type: %s", packet.PortNum.String()) + summary = fmt.Sprintf("Message type: %s", packet.Data.PortNum.String()) } return summary @@ -100,30 +100,30 @@ func (ml *MessageLogger) logMessage(packet *Packet) { briefSummary := ml.getBriefSummary(packet) // Build the message prefix with type and GatewayID info for brief mode - typePrefix := fmt.Sprintf("[%s]", packet.PortNum.String()) - if packet.GatewayID != "" { - briefSummary = fmt.Sprintf("%s Gateway:%s %s", typePrefix, packet.GatewayID, briefSummary) + typePrefix := fmt.Sprintf("[%s]", packet.Data.PortNum.String()) + if packet.Data.GatewayId != "" { + briefSummary = fmt.Sprintf("%s Gateway:%s %s", typePrefix, packet.Data.GatewayId, briefSummary) } else { briefSummary = fmt.Sprintf("%s %s", typePrefix, briefSummary) } if ml.briefMode { fields := []interface{}{ - "portNum", packet.PortNum.String(), - "from", packet.From, - "to", packet.To, - "gateway", packet.GatewayID, - "channel", packet.TopicInfo.Channel, - "region", packet.TopicInfo.RegionPath, - "hopLimit", packet.HopLimit, - "id", packet.ID, + "portNum", packet.Data.PortNum.String(), + "from", packet.Data.From, + "to", packet.Data.To, + "gateway", packet.Data.GatewayId, + "channel", packet.Info.Channel, + "region", packet.Info.RegionPath, + "hopLimit", packet.Data.HopLimit, + "id", packet.Data.Id, } - if packet.DecodedPacket.DecodeError != nil { - fields = append(fields, "error", packet.DecodedPacket.DecodeError.Error()) + if packet.Data.DecodeError != "" { + fields = append(fields, "error", packet.Data.DecodeError) } ml.logger.Infow(briefSummary, fields...) } else { ml.logger.Infow(briefSummary, "packet", packet) } -} +} \ No newline at end of file diff --git a/mqtt/packet.go b/mqtt/packet.go index 6ad0598..bfd58e4 100644 --- a/mqtt/packet.go +++ b/mqtt/packet.go @@ -1,151 +1,17 @@ package mqtt import ( - mesh "meshstream/proto/generated" + meshtreampb "meshstream/generated/meshstream" ) -// Packet extends the DecodedPacket with MQTT topic information -type Packet struct { - *mesh.Packet +// Type alias for proto packet. +type Packet meshtreampb.Packet + +// NewPacket creates a Packet from a data packet and topic info +func NewPacket(data *meshtreampb.Data, topicInfo *meshtreampb.TopicInfo) *Packet { + p := Packet(meshtreampb.Packet{ + Data: data, + Info: topicInfo, + }) + return &p } - -// NewPacket creates a Packet from a decoded packet and topic info -func NewPacket(decoded *mesh.DecodedPacket, topicInfo *mesh.TopicInfo) *Packet { - return &Packet{ - Packet: &mesh.Packet{ - DecodedPacket: decoded, - TopicInfo: topicInfo, - }, - } -} - -// Helper accessors to maintain backward compatibility with existing code -func (p *Packet) GetChannelID() string { - if p.DecodedPacket != nil { - return p.DecodedPacket.ChannelId - } - return "" -} - -func (p *Packet) GetGatewayID() string { - if p.DecodedPacket != nil { - return p.DecodedPacket.GatewayId - } - return "" -} - -func (p *Packet) GetID() uint32 { - if p.DecodedPacket != nil { - return p.DecodedPacket.Id - } - return 0 -} - -func (p *Packet) GetFrom() uint32 { - if p.DecodedPacket != nil { - return p.DecodedPacket.From - } - return 0 -} - -func (p *Packet) GetTo() uint32 { - if p.DecodedPacket != nil { - return p.DecodedPacket.To - } - return 0 -} - -func (p *Packet) GetPortNum() int32 { - if p.DecodedPacket != nil { - return int32(p.DecodedPacket.PortNum) - } - return 0 -} - -func (p *Packet) GetPortNumString() string { - if p.DecodedPacket != nil { - return p.DecodedPacket.PortNum.String() - } - return "UNKNOWN" -} - -func (p *Packet) GetPayload() interface{} { - if p.DecodedPacket == nil { - return nil - } - - // Depending on the payload type, return the appropriate value - switch x := p.DecodedPacket.Payload.(type) { - case *mesh.DecodedPacket_TextMessage: - return x.TextMessage - case *mesh.DecodedPacket_BinaryData: - return x.BinaryData - case *mesh.DecodedPacket_Position: - return x.Position - case *mesh.DecodedPacket_NodeInfo: - return x.NodeInfo - case *mesh.DecodedPacket_Telemetry: - return x.Telemetry - case *mesh.DecodedPacket_Waypoint: - return x.Waypoint - case *mesh.DecodedPacket_RouteDiscovery: - return x.RouteDiscovery - case *mesh.DecodedPacket_NeighborInfo: - return x.NeighborInfo - case *mesh.DecodedPacket_CompressedText: - return x.CompressedText - case *mesh.DecodedPacket_MapReport: - return x.MapReport - default: - return nil - } -} - -func (p *Packet) GetHopLimit() uint32 { - if p.DecodedPacket != nil { - return p.DecodedPacket.HopLimit - } - return 0 -} - -func (p *Packet) GetHopStart() uint32 { - if p.DecodedPacket != nil { - return p.DecodedPacket.HopStart - } - return 0 -} - -func (p *Packet) HasDecodeError() bool { - return p.DecodedPacket != nil && p.DecodedPacket.DecodeError != "" -} - -func (p *Packet) GetDecodeError() string { - if p.DecodedPacket != nil { - return p.DecodedPacket.DecodeError - } - return "" -} - -// GetFullTopic returns the MQTT topic this packet was received on -func (p *Packet) GetFullTopic() string { - if p.TopicInfo != nil { - return p.TopicInfo.FullTopic - } - return "" -} - -// GetRegionPath returns the region path from the topic -func (p *Packet) GetRegionPath() string { - if p.TopicInfo != nil { - return p.TopicInfo.RegionPath - } - return "" -} - -// GetChannel returns the channel name from the topic -func (p *Packet) GetChannel() string { - if p.TopicInfo != nil { - return p.TopicInfo.Channel - } - return "" -} \ No newline at end of file diff --git a/mqtt/stats.go b/mqtt/stats.go index 195e7cf..88e49ac 100644 --- a/mqtt/stats.go +++ b/mqtt/stats.go @@ -7,7 +7,7 @@ import ( "github.com/dpup/prefab/logging" - pb "meshstream/proto/generated/meshtastic" + pb "meshstream/generated/meshtastic" ) // MessageStats tracks statistics about received messages @@ -71,10 +71,10 @@ func (s *MessageStats) recordMessage(packet *Packet) { s.TotalMessages++ // Count by source node - s.ByNode[packet.From]++ + s.ByNode[packet.Data.From]++ // Count by port type - s.ByPortType[packet.PortNum]++ + s.ByPortType[packet.Data.PortNum]++ } // PrintStats logs current statistics using the structured logger @@ -122,4 +122,4 @@ func (s *MessageStats) PrintStats() { s.ByNode = make(map[uint32]int) s.ByPortType = make(map[pb.PortNum]int) s.LastStatsPrinted = now -} +} \ No newline at end of file