diff --git a/main.go b/main.go index be47493..6d2fb88 100644 --- a/main.go +++ b/main.go @@ -62,9 +62,9 @@ func main() { // Main event loop for { select { - case msg := <-messagesChan: + case packet := <-messagesChan: // Format and print the decoded message - formattedOutput := decoder.FormatTopicAndPacket(msg.TopicInfo, msg.DecodedPacket) + formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket) fmt.Println(formattedOutput) fmt.Println(strings.Repeat("-", 80)) diff --git a/mqtt/client.go b/mqtt/client.go index 2f207cb..62a0e7a 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -23,21 +23,15 @@ type Config struct { type Client struct { config Config client mqtt.Client - decodedMessages chan DecodedMessage + decodedMessages chan *Packet done chan struct{} } -// DecodedMessage contains a decoded packet and its topic info -type DecodedMessage struct { - TopicInfo *decoder.TopicInfo - DecodedPacket *decoder.DecodedPacket -} - // NewClient creates a new MQTT client with the provided configuration func NewClient(config Config) *Client { return &Client{ config: config, - decodedMessages: make(chan DecodedMessage, 100), // Buffer up to 100 messages + decodedMessages: make(chan *Packet, 100), // Buffer up to 100 messages done: make(chan struct{}), } } @@ -79,7 +73,7 @@ func (c *Client) Disconnect() { // Messages returns a channel of decoded messages // The consumer should read from this channel to receive decoded messages -func (c *Client) Messages() <-chan DecodedMessage { +func (c *Client) Messages() <-chan *Packet { return c.decodedMessages } @@ -103,9 +97,15 @@ func (c *Client) messageHandler(client mqtt.Client, msg mqtt.Message) { // Binary encoded protobuf message decodedPacket := decoder.DecodeMessage(msg.Payload(), topicInfo) + // Create packet with both the decoded packet and topic info + packet := &Packet{ + DecodedPacket: decodedPacket, + TopicInfo: topicInfo, + } + // Send the decoded message to the channel, but don't block if buffer is full select { - case c.decodedMessages <- DecodedMessage{TopicInfo: topicInfo, DecodedPacket: decodedPacket}: + case c.decodedMessages <- packet: // Message sent successfully case <-c.done: // Client is shutting down diff --git a/mqtt/client_test.go b/mqtt/client_test.go index 1204ef6..248dcdf 100644 --- a/mqtt/client_test.go +++ b/mqtt/client_test.go @@ -56,7 +56,7 @@ func TestMessagesChannel(t *testing.T) { // Test we can read from the channel go func() { - msg := DecodedMessage{} + msg := &Packet{} client.decodedMessages <- msg }() diff --git a/mqtt/packet.go b/mqtt/packet.go new file mode 100644 index 0000000..251a342 --- /dev/null +++ b/mqtt/packet.go @@ -0,0 +1,11 @@ +package mqtt + +import ( + "meshstream/decoder" +) + +// Packet extends the DecodedPacket with MQTT topic information +type Packet struct { + *decoder.DecodedPacket + *decoder.TopicInfo +} \ No newline at end of file