More fixes for CI/CD

This commit is contained in:
Daniel Pupius
2025-05-02 13:20:57 -07:00
parent 5cae8fd5cf
commit 23deefe061
5 changed files with 78 additions and 67 deletions

View File

@@ -23,9 +23,6 @@ jobs:
go-version: ${{ env.GO_VERSION }}
cache: true
- name: Cleanup any old data
run: make clean
- name: Install Protocol Buffers Compiler
run: |
sudo apt-get update

View File

@@ -25,7 +25,12 @@ jobs:
run: |
sudo apt-get update
sudo apt-get install -y protobuf-compiler
- name: Install protoc-gen-go
run: |
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
echo "$(go env GOPATH)/bin" >> $GITHUB_PATH
- name: Generate Protocol Buffers
run: make gen-proto

View File

@@ -8,6 +8,7 @@ import (
meshtreampb "meshstream/generated/meshstream"
pb "meshstream/generated/meshtastic"
"google.golang.org/protobuf/proto"
)
@@ -27,7 +28,8 @@ func TestManualDecode(t *testing.T) {
if err != nil {
fmt.Printf("Failed to unmarshal as MapReport: %v\n", err)
} else {
fmt.Printf("Successfully decoded as MapReport: %+v\n", mapReport)
// Use the pointer to avoid copying mutex
fmt.Printf("Successfully decoded as MapReport: %v\n", mapReport.String())
}
// Try decoding as ServiceEnvelope
@@ -37,7 +39,8 @@ func TestManualDecode(t *testing.T) {
if err != nil {
fmt.Printf("Failed to unmarshal as ServiceEnvelope: %v\n", err)
} else {
fmt.Printf("Successfully decoded as ServiceEnvelope: %+v\n", serviceEnvelope)
// Use the pointer to avoid copying mutex
fmt.Printf("Successfully decoded as ServiceEnvelope: %v\n", serviceEnvelope.String())
}
// Try decoding as MeshPacket
@@ -47,7 +50,8 @@ func TestManualDecode(t *testing.T) {
if err != nil {
fmt.Printf("Failed to unmarshal as MeshPacket: %v\n", err)
} else {
fmt.Printf("Successfully decoded as MeshPacket: %+v\n", meshPacket)
// Use the pointer to avoid copying mutex
fmt.Printf("Successfully decoded as MeshPacket: %v\n", meshPacket.String())
}
}
@@ -60,41 +64,41 @@ func TestDecodeMapReport(t *testing.T) {
// Try to decode as different message types
testCases := []struct {
name string
unmarshal func([]byte, proto.Message) error
message proto.Message
shouldPass bool
name string
unmarshal func([]byte, proto.Message) error
message proto.Message
shouldPass bool
}{
{
name: "MapReport",
unmarshal: proto.Unmarshal,
message: &pb.MapReport{},
shouldPass: false, // Expected to fail based on our findings
name: "MapReport",
unmarshal: proto.Unmarshal,
message: &pb.MapReport{},
shouldPass: false, // Expected to fail based on our findings
},
{
name: "ServiceEnvelope",
unmarshal: proto.Unmarshal,
message: &pb.ServiceEnvelope{},
shouldPass: true, // This should work
name: "ServiceEnvelope",
unmarshal: proto.Unmarshal,
message: &pb.ServiceEnvelope{},
shouldPass: true, // This should work
},
{
name: "MeshPacket",
unmarshal: proto.Unmarshal,
message: &pb.MeshPacket{},
shouldPass: true, // This also works - but with unparsed fields
name: "MeshPacket",
unmarshal: proto.Unmarshal,
message: &pb.MeshPacket{},
shouldPass: true, // This also works - but with unparsed fields
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.unmarshal(data, tc.message)
if tc.shouldPass && err != nil {
t.Errorf("Expected %s to parse successfully, but got error: %v", tc.name, err)
} else if !tc.shouldPass && err == nil {
t.Errorf("Expected %s to fail parsing, but it succeeded: %+v", tc.name, tc.message)
t.Errorf("Expected %s to fail parsing, but it succeeded", tc.name)
}
if err == nil {
// Do some additional validation if needed
if tc.name == "ServiceEnvelope" {
@@ -106,11 +110,11 @@ func TestDecodeMapReport(t *testing.T) {
t.Errorf("Expected ServiceEnvelope to have a decoded packet")
}
if envelope.GetPacket().GetDecoded().GetPortnum() != pb.PortNum_MAP_REPORT_APP {
t.Errorf("Expected PortNum to be MAP_REPORT_APP, got %s",
t.Errorf("Expected PortNum to be MAP_REPORT_APP, got %s",
envelope.GetPacket().GetDecoded().GetPortnum())
}
}
t.Logf("Successfully decoded as %s", tc.name)
} else {
t.Logf("Failed to decode as %s: %v", tc.name, err)
@@ -157,18 +161,18 @@ func TestDecodeMessageWithMapPayload(t *testing.T) {
// Format the output and check it contains expected components
formattedOutput := FormatTopicAndPacket(topicInfo, decodedData)
// Print out the formatted output to debug
t.Logf("Formatted output: %s", formattedOutput)
// Just verify the basic information is included
if !strings.Contains(formattedOutput, "Format: map") {
t.Error("Expected formatted output to contain 'Format: map'")
}
if !strings.Contains(formattedOutput, "Port:") {
t.Error("Expected formatted output to contain 'Port:' in packet information")
}
t.Logf("Successfully decoded MAP format message")
}
}

View File

@@ -9,6 +9,9 @@ import (
"github.com/dpup/prefab/logging"
)
// Time to wait before giving up on sending cached packets
var cacheGracePeriod = 1500 * time.Millisecond
// CircularBuffer implements a fixed-size circular buffer for caching packets
type CircularBuffer struct {
buffer []*meshtreampb.Packet // Fixed size buffer to store packets
@@ -128,7 +131,7 @@ func (b *Broker) Subscribe(bufferSize int) <-chan *meshtreampb.Packet {
select {
case subscriberChan <- packet:
// Successfully sent packet
case <-time.After(1500 * time.Millisecond):
case <-time.After(cacheGracePeriod):
// Give up after waiting some time to avoid blocking indefinitely
b.logger.Warn("Timeout when sending cached packet to new subscriber")
return

View File

@@ -72,7 +72,7 @@ func TestCircularBuffer(t *testing.T) {
func TestBrokerSubscribeUnsubscribe(t *testing.T) {
// Create a test source channel
sourceChan := make(chan *meshtreampb.Packet, 10)
// Create a broker with the source channel
testLogger := logging.NewDevLogger().Named("test")
broker := NewBroker(sourceChan, 5, testLogger)
@@ -86,20 +86,20 @@ func TestBrokerSubscribeUnsubscribe(t *testing.T) {
broker.subscriberMutex.RLock()
subscriberCount := len(broker.subscribers)
broker.subscriberMutex.RUnlock()
if subscriberCount != 2 {
t.Errorf("Expected 2 subscribers, got %d", subscriberCount)
}
// We need to use sequential packets because our implementation is asynchronous
// and exact packet matching may not work reliably
// First packet with ID 1
packet1 := &meshtreampb.Packet{
Data: &meshtreampb.Data{Id: 1},
Info: &meshtreampb.TopicInfo{},
}
// Send the packet
sourceChan <- packet1
@@ -124,12 +124,12 @@ func TestBrokerSubscribeUnsubscribe(t *testing.T) {
// Unsubscribe the first subscriber
broker.Unsubscribe(subscriber1)
// Verify the subscriber was removed
broker.subscriberMutex.RLock()
subscriberCount = len(broker.subscribers)
broker.subscriberMutex.RUnlock()
if subscriberCount != 1 {
t.Errorf("Expected 1 subscriber after unsubscribe, got %d", subscriberCount)
}
@@ -139,7 +139,7 @@ func TestBrokerSubscribeUnsubscribe(t *testing.T) {
Data: &meshtreampb.Data{Id: 2},
Info: &meshtreampb.TopicInfo{},
}
// Send the second packet
sourceChan <- packet2
@@ -158,7 +158,7 @@ func TestBrokerSubscribeUnsubscribe(t *testing.T) {
func TestBrokerMultipleSubscribers(t *testing.T) {
// Create a test source channel
sourceChan := make(chan *meshtreampb.Packet, 10)
// Create a broker with the source channel
testLogger := logging.NewDevLogger().Named("test")
broker := NewBroker(sourceChan, 5, testLogger)
@@ -204,7 +204,7 @@ func TestBrokerMultipleSubscribers(t *testing.T) {
func TestBrokerSlowSubscriber(t *testing.T) {
// Create a test source channel
sourceChan := make(chan *meshtreampb.Packet, 10)
// Create a broker with the source channel
testLogger := logging.NewDevLogger().Named("test")
broker := NewBroker(sourceChan, 5, testLogger)
@@ -212,15 +212,15 @@ func TestBrokerSlowSubscriber(t *testing.T) {
// Create a slow subscriber with buffer size 1
slowSubscriber := broker.Subscribe(1)
// And a normal subscriber
normalSubscriber := broker.Subscribe(5)
// Verify we have two subscribers
broker.subscriberMutex.RLock()
subscriberCount := len(broker.subscribers)
broker.subscriberMutex.RUnlock()
if subscriberCount != 2 {
t.Errorf("Expected 2 subscribers, got %d", subscriberCount)
}
@@ -234,12 +234,12 @@ func TestBrokerSlowSubscriber(t *testing.T) {
Data: &meshtreampb.Data{Id: 102},
Info: &meshtreampb.TopicInfo{},
}
sourceChan <- testPacket1
// Give the broker time to distribute the first packet
time.Sleep(10 * time.Millisecond)
sourceChan <- testPacket2
// The normal subscriber should receive both packets
@@ -276,19 +276,19 @@ func TestBrokerSlowSubscriber(t *testing.T) {
func TestBrokerCloseWithSubscribers(t *testing.T) {
// Create a test source channel
sourceChan := make(chan *meshtreampb.Packet, 10)
// Create a broker with the source channel
testLogger := logging.NewDevLogger().Named("test")
broker := NewBroker(sourceChan, 5, testLogger)
// Subscribe to the broker
subscriber := broker.Subscribe(5)
// Verify we have one subscriber
broker.subscriberMutex.RLock()
subscriberCount := len(broker.subscribers)
broker.subscriberMutex.RUnlock()
if subscriberCount != 1 {
t.Errorf("Expected 1 subscriber, got %d", subscriberCount)
}
@@ -312,7 +312,7 @@ func TestBrokerCloseWithSubscribers(t *testing.T) {
func TestBrokerPacketCaching(t *testing.T) {
// Create a test source channel
sourceChan := make(chan *meshtreampb.Packet, 10)
// Create a broker with a small cache size
testLogger := logging.NewDevLogger().Named("test")
broker := NewBroker(sourceChan, 3, testLogger)
@@ -325,17 +325,17 @@ func TestBrokerPacketCaching(t *testing.T) {
Info: &meshtreampb.TopicInfo{},
}
sourceChan <- packet
// Give the broker time to process the packet
time.Sleep(10 * time.Millisecond)
}
// Create a subscriber after the packets were sent
subscriber := broker.Subscribe(5)
// The subscriber should receive all three cached packets
receivedIds := make([]uint32, 0, 3)
// We need to receive 3 packets
for i := 0; i < 3; i++ {
select {
@@ -345,7 +345,7 @@ func TestBrokerPacketCaching(t *testing.T) {
t.Errorf("Subscriber didn't receive cached packet %d within timeout", i+1)
}
}
// Check that we received all packets in the correct order
if len(receivedIds) != 3 {
t.Errorf("Expected to receive 3 packets, got %d", len(receivedIds))
@@ -362,7 +362,7 @@ func TestBrokerPacketCaching(t *testing.T) {
func TestBrokerCacheOverflow(t *testing.T) {
// Create a test source channel
sourceChan := make(chan *meshtreampb.Packet, 10)
// Create a broker with a small cache size
testLogger := logging.NewDevLogger().Named("test")
cacheSize := 3
@@ -376,17 +376,17 @@ func TestBrokerCacheOverflow(t *testing.T) {
Info: &meshtreampb.TopicInfo{},
}
sourceChan <- packet
// Give the broker time to process the packet
time.Sleep(10 * time.Millisecond)
}
// Create a subscriber after the packets were sent
subscriber := broker.Subscribe(5)
// The subscriber should receive the last 3 packets (3, 4, 5)
receivedIds := make([]uint32, 0, cacheSize)
// We expect to receive exactly cacheSize packets
for i := 0; i < cacheSize; i++ {
select {
@@ -396,7 +396,7 @@ func TestBrokerCacheOverflow(t *testing.T) {
t.Errorf("Subscriber didn't receive cached packet %d within timeout", i+1)
}
}
// Verify no more packets are coming
select {
case received := <-subscriber:
@@ -404,7 +404,7 @@ func TestBrokerCacheOverflow(t *testing.T) {
case <-time.After(50 * time.Millisecond):
// This is expected, no more packets should be received
}
// Check that we received only the last 3 packets in the correct order
expectedIds := []uint32{3, 4, 5}
if len(receivedIds) != len(expectedIds) {
@@ -420,9 +420,11 @@ func TestBrokerCacheOverflow(t *testing.T) {
// TestSubscriberBufferFull tests the behavior when a subscriber's buffer is full
func TestSubscriberBufferFull(t *testing.T) {
cacheGracePeriod = 300
// Create a test source channel
sourceChan := make(chan *meshtreampb.Packet, 10)
// Create a broker with a cache size of 5
testLogger := logging.NewDevLogger().Named("test")
broker := NewBroker(sourceChan, 5, testLogger)
@@ -440,7 +442,7 @@ func TestSubscriberBufferFull(t *testing.T) {
// Create a subscriber with a very small buffer size (1)
smallSubscriber := broker.Subscribe(1)
// The small subscriber should receive at least one cached packet
// The others will be dropped because the buffer is full
select {
@@ -451,7 +453,7 @@ func TestSubscriberBufferFull(t *testing.T) {
case <-time.After(100 * time.Millisecond):
t.Error("Subscriber didn't receive any cached packet within timeout")
}
// Check that no more packets are immediately available
// This is a bit tricky to test since we can't guarantee how many
// packets were dropped due to the full buffer, but we can check
@@ -473,7 +475,7 @@ func TestSubscriberBufferFull(t *testing.T) {
Info: &meshtreampb.TopicInfo{},
}
sourceChan <- newPacket
// The subscriber should receive this packet if they read the first one
select {
case received := <-smallSubscriber:
@@ -483,4 +485,4 @@ func TestSubscriberBufferFull(t *testing.T) {
case <-time.After(100 * time.Millisecond):
t.Error("Subscriber didn't receive new packet within timeout")
}
}
}