From 23deefe0612e4ae5e09200a64e2a589f51c1c509 Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Fri, 2 May 2025 13:20:57 -0700 Subject: [PATCH] More fixes for CI/CD --- .github/workflows/cicd.yml | 3 -- .github/workflows/pr-validation.yml | 7 ++- decoder/decode_test.go | 60 +++++++++++++------------ mqtt/broker.go | 5 ++- mqtt/broker_test.go | 70 +++++++++++++++-------------- 5 files changed, 78 insertions(+), 67 deletions(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index e1d7746..701b08d 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -23,9 +23,6 @@ jobs: go-version: ${{ env.GO_VERSION }} cache: true - - name: Cleanup any old data - run: make clean - - name: Install Protocol Buffers Compiler run: | sudo apt-get update diff --git a/.github/workflows/pr-validation.yml b/.github/workflows/pr-validation.yml index e5053a6..4135a39 100644 --- a/.github/workflows/pr-validation.yml +++ b/.github/workflows/pr-validation.yml @@ -25,7 +25,12 @@ jobs: run: | sudo apt-get update sudo apt-get install -y protobuf-compiler - + + - name: Install protoc-gen-go + run: | + go install google.golang.org/protobuf/cmd/protoc-gen-go@latest + echo "$(go env GOPATH)/bin" >> $GITHUB_PATH + - name: Generate Protocol Buffers run: make gen-proto diff --git a/decoder/decode_test.go b/decoder/decode_test.go index 87846d9..1ecf56b 100644 --- a/decoder/decode_test.go +++ b/decoder/decode_test.go @@ -8,6 +8,7 @@ import ( meshtreampb "meshstream/generated/meshstream" pb "meshstream/generated/meshtastic" + "google.golang.org/protobuf/proto" ) @@ -27,7 +28,8 @@ func TestManualDecode(t *testing.T) { if err != nil { fmt.Printf("Failed to unmarshal as MapReport: %v\n", err) } else { - fmt.Printf("Successfully decoded as MapReport: %+v\n", mapReport) + // Use the pointer to avoid copying mutex + fmt.Printf("Successfully decoded as MapReport: %v\n", mapReport.String()) } // Try decoding as ServiceEnvelope @@ -37,7 +39,8 @@ func TestManualDecode(t *testing.T) { if err != nil { fmt.Printf("Failed to unmarshal as ServiceEnvelope: %v\n", err) } else { - fmt.Printf("Successfully decoded as ServiceEnvelope: %+v\n", serviceEnvelope) + // Use the pointer to avoid copying mutex + fmt.Printf("Successfully decoded as ServiceEnvelope: %v\n", serviceEnvelope.String()) } // Try decoding as MeshPacket @@ -47,7 +50,8 @@ func TestManualDecode(t *testing.T) { if err != nil { fmt.Printf("Failed to unmarshal as MeshPacket: %v\n", err) } else { - fmt.Printf("Successfully decoded as MeshPacket: %+v\n", meshPacket) + // Use the pointer to avoid copying mutex + fmt.Printf("Successfully decoded as MeshPacket: %v\n", meshPacket.String()) } } @@ -60,41 +64,41 @@ func TestDecodeMapReport(t *testing.T) { // Try to decode as different message types testCases := []struct { - name string - unmarshal func([]byte, proto.Message) error - message proto.Message - shouldPass bool + name string + unmarshal func([]byte, proto.Message) error + message proto.Message + shouldPass bool }{ { - name: "MapReport", - unmarshal: proto.Unmarshal, - message: &pb.MapReport{}, - shouldPass: false, // Expected to fail based on our findings + name: "MapReport", + unmarshal: proto.Unmarshal, + message: &pb.MapReport{}, + shouldPass: false, // Expected to fail based on our findings }, { - name: "ServiceEnvelope", - unmarshal: proto.Unmarshal, - message: &pb.ServiceEnvelope{}, - shouldPass: true, // This should work + name: "ServiceEnvelope", + unmarshal: proto.Unmarshal, + message: &pb.ServiceEnvelope{}, + shouldPass: true, // This should work }, { - name: "MeshPacket", - unmarshal: proto.Unmarshal, - message: &pb.MeshPacket{}, - shouldPass: true, // This also works - but with unparsed fields + name: "MeshPacket", + unmarshal: proto.Unmarshal, + message: &pb.MeshPacket{}, + shouldPass: true, // This also works - but with unparsed fields }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { err := tc.unmarshal(data, tc.message) - + if tc.shouldPass && err != nil { t.Errorf("Expected %s to parse successfully, but got error: %v", tc.name, err) } else if !tc.shouldPass && err == nil { - t.Errorf("Expected %s to fail parsing, but it succeeded: %+v", tc.name, tc.message) + t.Errorf("Expected %s to fail parsing, but it succeeded", tc.name) } - + if err == nil { // Do some additional validation if needed if tc.name == "ServiceEnvelope" { @@ -106,11 +110,11 @@ func TestDecodeMapReport(t *testing.T) { t.Errorf("Expected ServiceEnvelope to have a decoded packet") } if envelope.GetPacket().GetDecoded().GetPortnum() != pb.PortNum_MAP_REPORT_APP { - t.Errorf("Expected PortNum to be MAP_REPORT_APP, got %s", + t.Errorf("Expected PortNum to be MAP_REPORT_APP, got %s", envelope.GetPacket().GetDecoded().GetPortnum()) } } - + t.Logf("Successfully decoded as %s", tc.name) } else { t.Logf("Failed to decode as %s: %v", tc.name, err) @@ -157,18 +161,18 @@ func TestDecodeMessageWithMapPayload(t *testing.T) { // Format the output and check it contains expected components formattedOutput := FormatTopicAndPacket(topicInfo, decodedData) - + // Print out the formatted output to debug t.Logf("Formatted output: %s", formattedOutput) - + // Just verify the basic information is included if !strings.Contains(formattedOutput, "Format: map") { t.Error("Expected formatted output to contain 'Format: map'") } - + if !strings.Contains(formattedOutput, "Port:") { t.Error("Expected formatted output to contain 'Port:' in packet information") } t.Logf("Successfully decoded MAP format message") -} \ No newline at end of file +} diff --git a/mqtt/broker.go b/mqtt/broker.go index 7e3a5ea..0b99222 100644 --- a/mqtt/broker.go +++ b/mqtt/broker.go @@ -9,6 +9,9 @@ import ( "github.com/dpup/prefab/logging" ) +// Time to wait before giving up on sending cached packets +var cacheGracePeriod = 1500 * time.Millisecond + // CircularBuffer implements a fixed-size circular buffer for caching packets type CircularBuffer struct { buffer []*meshtreampb.Packet // Fixed size buffer to store packets @@ -128,7 +131,7 @@ func (b *Broker) Subscribe(bufferSize int) <-chan *meshtreampb.Packet { select { case subscriberChan <- packet: // Successfully sent packet - case <-time.After(1500 * time.Millisecond): + case <-time.After(cacheGracePeriod): // Give up after waiting some time to avoid blocking indefinitely b.logger.Warn("Timeout when sending cached packet to new subscriber") return diff --git a/mqtt/broker_test.go b/mqtt/broker_test.go index be4811d..39b834a 100644 --- a/mqtt/broker_test.go +++ b/mqtt/broker_test.go @@ -72,7 +72,7 @@ func TestCircularBuffer(t *testing.T) { func TestBrokerSubscribeUnsubscribe(t *testing.T) { // Create a test source channel sourceChan := make(chan *meshtreampb.Packet, 10) - + // Create a broker with the source channel testLogger := logging.NewDevLogger().Named("test") broker := NewBroker(sourceChan, 5, testLogger) @@ -86,20 +86,20 @@ func TestBrokerSubscribeUnsubscribe(t *testing.T) { broker.subscriberMutex.RLock() subscriberCount := len(broker.subscribers) broker.subscriberMutex.RUnlock() - + if subscriberCount != 2 { t.Errorf("Expected 2 subscribers, got %d", subscriberCount) } // We need to use sequential packets because our implementation is asynchronous // and exact packet matching may not work reliably - + // First packet with ID 1 packet1 := &meshtreampb.Packet{ Data: &meshtreampb.Data{Id: 1}, Info: &meshtreampb.TopicInfo{}, } - + // Send the packet sourceChan <- packet1 @@ -124,12 +124,12 @@ func TestBrokerSubscribeUnsubscribe(t *testing.T) { // Unsubscribe the first subscriber broker.Unsubscribe(subscriber1) - + // Verify the subscriber was removed broker.subscriberMutex.RLock() subscriberCount = len(broker.subscribers) broker.subscriberMutex.RUnlock() - + if subscriberCount != 1 { t.Errorf("Expected 1 subscriber after unsubscribe, got %d", subscriberCount) } @@ -139,7 +139,7 @@ func TestBrokerSubscribeUnsubscribe(t *testing.T) { Data: &meshtreampb.Data{Id: 2}, Info: &meshtreampb.TopicInfo{}, } - + // Send the second packet sourceChan <- packet2 @@ -158,7 +158,7 @@ func TestBrokerSubscribeUnsubscribe(t *testing.T) { func TestBrokerMultipleSubscribers(t *testing.T) { // Create a test source channel sourceChan := make(chan *meshtreampb.Packet, 10) - + // Create a broker with the source channel testLogger := logging.NewDevLogger().Named("test") broker := NewBroker(sourceChan, 5, testLogger) @@ -204,7 +204,7 @@ func TestBrokerMultipleSubscribers(t *testing.T) { func TestBrokerSlowSubscriber(t *testing.T) { // Create a test source channel sourceChan := make(chan *meshtreampb.Packet, 10) - + // Create a broker with the source channel testLogger := logging.NewDevLogger().Named("test") broker := NewBroker(sourceChan, 5, testLogger) @@ -212,15 +212,15 @@ func TestBrokerSlowSubscriber(t *testing.T) { // Create a slow subscriber with buffer size 1 slowSubscriber := broker.Subscribe(1) - + // And a normal subscriber normalSubscriber := broker.Subscribe(5) - + // Verify we have two subscribers broker.subscriberMutex.RLock() subscriberCount := len(broker.subscribers) broker.subscriberMutex.RUnlock() - + if subscriberCount != 2 { t.Errorf("Expected 2 subscribers, got %d", subscriberCount) } @@ -234,12 +234,12 @@ func TestBrokerSlowSubscriber(t *testing.T) { Data: &meshtreampb.Data{Id: 102}, Info: &meshtreampb.TopicInfo{}, } - + sourceChan <- testPacket1 - + // Give the broker time to distribute the first packet time.Sleep(10 * time.Millisecond) - + sourceChan <- testPacket2 // The normal subscriber should receive both packets @@ -276,19 +276,19 @@ func TestBrokerSlowSubscriber(t *testing.T) { func TestBrokerCloseWithSubscribers(t *testing.T) { // Create a test source channel sourceChan := make(chan *meshtreampb.Packet, 10) - + // Create a broker with the source channel testLogger := logging.NewDevLogger().Named("test") broker := NewBroker(sourceChan, 5, testLogger) // Subscribe to the broker subscriber := broker.Subscribe(5) - + // Verify we have one subscriber broker.subscriberMutex.RLock() subscriberCount := len(broker.subscribers) broker.subscriberMutex.RUnlock() - + if subscriberCount != 1 { t.Errorf("Expected 1 subscriber, got %d", subscriberCount) } @@ -312,7 +312,7 @@ func TestBrokerCloseWithSubscribers(t *testing.T) { func TestBrokerPacketCaching(t *testing.T) { // Create a test source channel sourceChan := make(chan *meshtreampb.Packet, 10) - + // Create a broker with a small cache size testLogger := logging.NewDevLogger().Named("test") broker := NewBroker(sourceChan, 3, testLogger) @@ -325,17 +325,17 @@ func TestBrokerPacketCaching(t *testing.T) { Info: &meshtreampb.TopicInfo{}, } sourceChan <- packet - + // Give the broker time to process the packet time.Sleep(10 * time.Millisecond) } // Create a subscriber after the packets were sent subscriber := broker.Subscribe(5) - + // The subscriber should receive all three cached packets receivedIds := make([]uint32, 0, 3) - + // We need to receive 3 packets for i := 0; i < 3; i++ { select { @@ -345,7 +345,7 @@ func TestBrokerPacketCaching(t *testing.T) { t.Errorf("Subscriber didn't receive cached packet %d within timeout", i+1) } } - + // Check that we received all packets in the correct order if len(receivedIds) != 3 { t.Errorf("Expected to receive 3 packets, got %d", len(receivedIds)) @@ -362,7 +362,7 @@ func TestBrokerPacketCaching(t *testing.T) { func TestBrokerCacheOverflow(t *testing.T) { // Create a test source channel sourceChan := make(chan *meshtreampb.Packet, 10) - + // Create a broker with a small cache size testLogger := logging.NewDevLogger().Named("test") cacheSize := 3 @@ -376,17 +376,17 @@ func TestBrokerCacheOverflow(t *testing.T) { Info: &meshtreampb.TopicInfo{}, } sourceChan <- packet - + // Give the broker time to process the packet time.Sleep(10 * time.Millisecond) } // Create a subscriber after the packets were sent subscriber := broker.Subscribe(5) - + // The subscriber should receive the last 3 packets (3, 4, 5) receivedIds := make([]uint32, 0, cacheSize) - + // We expect to receive exactly cacheSize packets for i := 0; i < cacheSize; i++ { select { @@ -396,7 +396,7 @@ func TestBrokerCacheOverflow(t *testing.T) { t.Errorf("Subscriber didn't receive cached packet %d within timeout", i+1) } } - + // Verify no more packets are coming select { case received := <-subscriber: @@ -404,7 +404,7 @@ func TestBrokerCacheOverflow(t *testing.T) { case <-time.After(50 * time.Millisecond): // This is expected, no more packets should be received } - + // Check that we received only the last 3 packets in the correct order expectedIds := []uint32{3, 4, 5} if len(receivedIds) != len(expectedIds) { @@ -420,9 +420,11 @@ func TestBrokerCacheOverflow(t *testing.T) { // TestSubscriberBufferFull tests the behavior when a subscriber's buffer is full func TestSubscriberBufferFull(t *testing.T) { + cacheGracePeriod = 300 + // Create a test source channel sourceChan := make(chan *meshtreampb.Packet, 10) - + // Create a broker with a cache size of 5 testLogger := logging.NewDevLogger().Named("test") broker := NewBroker(sourceChan, 5, testLogger) @@ -440,7 +442,7 @@ func TestSubscriberBufferFull(t *testing.T) { // Create a subscriber with a very small buffer size (1) smallSubscriber := broker.Subscribe(1) - + // The small subscriber should receive at least one cached packet // The others will be dropped because the buffer is full select { @@ -451,7 +453,7 @@ func TestSubscriberBufferFull(t *testing.T) { case <-time.After(100 * time.Millisecond): t.Error("Subscriber didn't receive any cached packet within timeout") } - + // Check that no more packets are immediately available // This is a bit tricky to test since we can't guarantee how many // packets were dropped due to the full buffer, but we can check @@ -473,7 +475,7 @@ func TestSubscriberBufferFull(t *testing.T) { Info: &meshtreampb.TopicInfo{}, } sourceChan <- newPacket - + // The subscriber should receive this packet if they read the first one select { case received := <-smallSubscriber: @@ -483,4 +485,4 @@ func TestSubscriberBufferFull(t *testing.T) { case <-time.After(100 * time.Millisecond): t.Error("Subscriber didn't receive new packet within timeout") } -} \ No newline at end of file +}