Files
meshstream/mqtt/broker_test.go
2026-03-16 10:47:50 -07:00

668 lines
20 KiB
Go

package mqtt
import (
"os"
"sync"
"testing"
"time"
"github.com/dpup/prefab/logging"
meshtreampb "meshstream/generated/meshstream"
pb "meshstream/generated/meshtastic"
)
// TestMain disables age protection globally so pressure tests run without
// needing to advance mock clocks. Individual tests that specifically cover
// age-based behaviour set minEvictAge themselves.
func TestMain(m *testing.M) {
	minEvictAge = 0
	code := m.Run()
	os.Exit(code)
}
// pkt is a test helper that builds a Packet with the given ID, from-node, and port.
func pkt(id, from uint32, port pb.PortNum) *meshtreampb.Packet {
	data := &meshtreampb.Data{
		Id:      id,
		From:    from,
		PortNum: port,
	}
	return &meshtreampb.Packet{Data: data, Info: &meshtreampb.TopicInfo{}}
}
// routerNodeInfoPkt builds a NODEINFO_APP packet whose User.Role is ROUTER,
// causing the cache to mark the sender as a router node.
func routerNodeInfoPkt(id, from uint32) *meshtreampb.Packet {
	routerUser := &pb.User{Role: pb.Config_DeviceConfig_ROUTER}
	return &meshtreampb.Packet{
		Data: &meshtreampb.Data{
			Id:      id,
			From:    from,
			PortNum: pb.PortNum_NODEINFO_APP,
			Payload: &meshtreampb.Data_NodeInfo{NodeInfo: routerUser},
		},
		Info: &meshtreampb.TopicInfo{},
	}
}
// ── NodeAwareCache unit tests ──────────────────────────────────────────────────

// TestNodeAwareCacheEmpty verifies GetAll on a freshly-created cache.
func TestNodeAwareCacheEmpty(t *testing.T) {
	cache := NewNodeAwareCache(100, time.Hour)
	packets := cache.GetAll()
	if len(packets) != 0 {
		t.Errorf("expected empty, got %d packets", len(packets))
	}
}
// TestNodeAwareCacheBasicOrdering verifies packets come back in insertion order.
func TestNodeAwareCacheBasicOrdering(t *testing.T) {
	cache := NewNodeAwareCache(100, time.Hour)
	for id := uint32(1); id <= 3; id++ {
		cache.Add(pkt(id, id, pb.PortNum_NODEINFO_APP))
	}
	got := cache.GetAll()
	if len(got) != 3 {
		t.Fatalf("expected 3, got %d", len(got))
	}
	for i, p := range got {
		want := uint32(i + 1)
		if p.Data.Id != want {
			t.Errorf("pos %d: want ID %d, got %d", i, want, p.Data.Id)
		}
	}
}
// TestPressureEvictsOldestOfLowestPriority verifies that under cap pressure, the
// oldest packet of the lowest-priority type is evicted while higher-priority
// types are preserved.
func TestPressureEvictsOldestOfLowestPriority(t *testing.T) {
	cache := NewNodeAwareCache(3, time.Hour)
	cache.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP))     // priority 2
	cache.Add(pkt(2, 1, pb.PortNum_NEIGHBORINFO_APP)) // priority 4
	cache.Add(pkt(3, 1, pb.PortNum_NODEINFO_APP))     // priority 2 — at cap
	cache.Add(pkt(4, 1, pb.PortNum_NODEINFO_APP))     // pressure: evicts oldest pri=2 (ID 1)
	got := cache.GetAll()
	if len(got) != 3 {
		t.Fatalf("expected 3 (global cap), got %d: %v", len(got), ids(got))
	}
	for i, want := range []uint32{2, 3, 4} {
		if got[i].Data.Id != want {
			t.Errorf("pos %d: want ID %d, got %d", i, want, got[i].Data.Id)
		}
	}
}
// TestRareTypeOutlastsFrequentType is the core scenario: a node sends many
// node-info packets and one neighbor-info packet. Under cache pressure, the
// neighbor-info (higher priority) should survive while old node-infos are evicted.
func TestRareTypeOutlastsFrequentType(t *testing.T) {
	// Cap=5 creates pressure when we add 13 packets.
	cache := NewNodeAwareCache(5, time.Hour)
	const neighborID uint32 = 50
	cache.Add(pkt(neighborID, 1, pb.PortNum_NEIGHBORINFO_APP)) // priority 4
	for i := uint32(1); i <= 12; i++ {
		cache.Add(pkt(i, 1, pb.PortNum_NODEINFO_APP)) // priority 2
	}
	got := cache.GetAll()
	// Tally survivors by port type.
	counts := map[pb.PortNum]int{}
	for _, p := range got {
		counts[p.Data.PortNum]++
	}
	if len(got) != 5 {
		t.Errorf("expected 5 packets (cap), got %d: %v", len(got), ids(got))
	}
	if n := counts[pb.PortNum_NEIGHBORINFO_APP]; n != 1 {
		t.Errorf("expected neighbor-info to survive, got %d", n)
	}
	if n := counts[pb.PortNum_NODEINFO_APP]; n != 4 {
		t.Errorf("expected 4 node-infos (cap-1), got %d", n)
	}
}
// TestPressureIsGlobal verifies that eviction competes globally across all nodes:
// when the cap is hit, the oldest packet of the lowest priority is removed
// regardless of which node it came from.
func TestPressureIsGlobal(t *testing.T) {
	cache := NewNodeAwareCache(3, time.Hour)
	cache.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP)) // node 1
	cache.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // node 2
	cache.Add(pkt(3, 1, pb.PortNum_NODEINFO_APP)) // node 1 — at cap
	cache.Add(pkt(4, 2, pb.PortNum_NODEINFO_APP)) // node 2 — evicts globally oldest = ID 1
	got := cache.GetAll()
	if len(got) != 3 {
		t.Fatalf("expected 3, got %d: %v", len(got), ids(got))
	}
	if oldest := got[0].Data.Id; oldest != 2 {
		t.Errorf("expected oldest surviving ID 2, got %d", oldest)
	}
}
// TestBeladyApproximation verifies the node-aware eviction: among same-priority
// old packets, the one from the most recently active node is evicted first
// (it is most likely to resend, making its old packet cheapest to lose).
// A flaky node's old packet therefore outlives a reliable node's old packet.
func TestBeladyApproximation(t *testing.T) {
	now := time.Unix(0, 0)
	c := NewNodeAwareCache(3, 24*time.Hour)
	c.nowFunc = func() time.Time { return now }
	// At T=0: both nodes send a node-info.
	c.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP)) // node F (flaky)
	c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // node R (reliable)
	// At T=90m: R sends again — it is now the most recently active node.
	// F has been silent, so its T=0 packet is the "harder to replace" one.
	now = time.Unix(int64(90*time.Minute/time.Second), 0)
	c.Add(pkt(3, 2, pb.PortNum_NODEINFO_APP)) // R resends, cache now at cap=3
	// At T=2h: pressure. Both T=0 packets are older than minEvictAge=1h and so
	// are eligible for eviction (R's T=90m packet is only 30m old and protected).
	// Among lowest-priority candidates (both are NODEINFO), evict from node 2
	// (most recently active at T=90m) rather than node 1 (last seen T=0).
	now = time.Unix(int64(2*time.Hour/time.Second), 0)
	minEvictAge = time.Hour
	t.Cleanup(func() { minEvictAge = 0 })
	c.Add(pkt(4, 3, pb.PortNum_NODEINFO_APP)) // new node — triggers eviction
	got := c.GetAll()
	// Node 1's packet (ID 1, from the flaky node) must survive; node 2's old
	// T=0 packet (ID 2) must be the one evicted.
	flakySurvived := false
	reliableOldSurvived := false
	for _, p := range got {
		switch p.Data.Id {
		case 1:
			flakySurvived = true
		case 2:
			reliableOldSurvived = true
		}
	}
	if !flakySurvived {
		t.Error("flaky node's only packet should survive (Bélády: it won't be resent soon)")
	}
	if reliableOldSurvived {
		t.Error("reliable node's old packet should be evicted (node recently resent, cheapest to lose)")
	}
}
// TestMinAgeProtectsRecentPackets verifies that packets younger than minEvictAge
// are never evicted even under heavy cap pressure.
func TestMinAgeProtectsRecentPackets(t *testing.T) {
	minEvictAge = time.Hour
	t.Cleanup(func() { minEvictAge = 0 })
	now := time.Unix(0, 0)
	cache := NewNodeAwareCache(3, 24*time.Hour)
	cache.nowFunc = func() time.Time { return now }
	// Three packets at T=0 fill the cache.
	for id := uint32(1); id <= 3; id++ {
		cache.Add(pkt(id, id, pb.PortNum_NODEINFO_APP))
	}
	// T=30m: add a fourth packet. All are < 1h old — none are eligible for
	// priority eviction. The cap must still be respected, so one is evicted
	// via the fallback (all-entries pickEvictTarget).
	now = time.Unix(int64(30*time.Minute/time.Second), 0)
	cache.Add(pkt(4, 4, pb.PortNum_NODEINFO_APP))
	if got := cache.GetAll(); len(got) != 3 {
		t.Fatalf("expected cap=3 enforced even with all-recent entries, got %d", len(got))
	}
}
// TestRetentionEvictsStaleNode verifies whole-node eviction after silence.
func TestRetentionEvictsStaleNode(t *testing.T) {
	start := time.Now()
	cache := NewNodeAwareCache(1000, 3*time.Hour)
	cache.nowFunc = func() time.Time { return start }
	cache.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP))
	cache.Add(pkt(2, 1, pb.PortNum_NODEINFO_APP))
	// Node 2 is active after node 1 has gone silent.
	cache.nowFunc = func() time.Time { return start.Add(1 * time.Minute) }
	cache.Add(pkt(3, 2, pb.PortNum_NODEINFO_APP))
	// Past node 1's retention window.
	cache.nowFunc = func() time.Time { return start.Add(3*time.Hour + 1*time.Second) }
	got := cache.GetAll()
	if len(got) != 1 || got[0].Data.Id != 3 {
		t.Errorf("expected only node 2's packet (ID 3), got %v", ids(got))
	}
}
// TestRetentionExtendsOnNewPacket verifies that a new packet resets the retention clock.
func TestRetentionExtendsOnNewPacket(t *testing.T) {
	start := time.Now()
	cache := NewNodeAwareCache(1000, 3*time.Hour)
	cache.nowFunc = func() time.Time { return start }
	cache.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP))
	// Node 1 sends again at t=2h, refreshing its window.
	cache.nowFunc = func() time.Time { return start.Add(2 * time.Hour) }
	cache.Add(pkt(2, 1, pb.PortNum_NODEINFO_APP))
	// t=3h+1s — past first packet's original window, but node refreshed at 2h.
	cache.nowFunc = func() time.Time { return start.Add(3*time.Hour + 1*time.Second) }
	if got := cache.GetAll(); len(got) != 2 {
		t.Errorf("expected both packets retained (node still active), got %d", len(got))
	}
}
// TestNoSourcePacketsAlwaysIncluded verifies from=0 packets bypass node retention.
func TestNoSourcePacketsAlwaysIncluded(t *testing.T) {
	start := time.Now()
	cache := NewNodeAwareCache(1000, time.Hour)
	cache.nowFunc = func() time.Time { return start }
	cache.Add(pkt(1, 0, pb.PortNum_NODEINFO_APP))
	// Far past any retention window.
	cache.nowFunc = func() time.Time { return start.Add(48 * time.Hour) }
	if got := cache.GetAll(); len(got) != 1 {
		t.Errorf("expected from=0 packet to always be included, got %d", len(got))
	}
}
// TestGlobalCapSamePriorityFIFO verifies that when all packets share the same
// priority, the oldest is evicted first (FIFO behaviour within a priority tier).
func TestGlobalCapSamePriorityFIFO(t *testing.T) {
	cache := NewNodeAwareCache(3, time.Hour)
	// All from=0 packets of the same type — same priority, pure FIFO.
	// The fourth add should push out ID 1.
	for id := uint32(1); id <= 4; id++ {
		cache.Add(pkt(id, 0, pb.PortNum_NODEINFO_APP))
	}
	got := cache.GetAll()
	if len(got) != 3 {
		t.Fatalf("expected 3 (global cap), got %d", len(got))
	}
	if oldest := got[0].Data.Id; oldest != 2 {
		t.Errorf("expected oldest surviving ID 2, got %d", oldest)
	}
}
// TestRouterNodeSurvivesRetention verifies that router nodes are exempt from
// the retention window: GetAll still returns their packets after a long silence.
func TestRouterNodeSurvivesRetention(t *testing.T) {
	start := time.Now()
	cache := NewNodeAwareCache(1000, 3*time.Hour)
	cache.nowFunc = func() time.Time { return start }
	cache.Add(routerNodeInfoPkt(1, 1))            // router node
	cache.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // regular node
	// Advance past both nodes' retention window.
	cache.nowFunc = func() time.Time { return start.Add(3*time.Hour + 1*time.Second) }
	var routerSurvived, regularSurvived bool
	for _, p := range cache.GetAll() {
		if p.Data.Id == 1 {
			routerSurvived = true
		}
		if p.Data.Id == 2 {
			regularSurvived = true
		}
	}
	if !routerSurvived {
		t.Error("router node's packet should survive past the retention window")
	}
	if regularSurvived {
		t.Error("regular node's packet should be excluded after the retention window")
	}
}
// TestRouterNodeNotPrunedUnderPressure verifies that pruneStale skips router
// nodes even when the cache is over capacity and regular stale nodes are removed.
func TestRouterNodeNotPrunedUnderPressure(t *testing.T) {
	start := time.Now()
	cache := NewNodeAwareCache(4, 3*time.Hour)
	cache.nowFunc = func() time.Time { return start }
	cache.Add(routerNodeInfoPkt(1, 1))            // router node
	cache.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // regular node
	// Advance past both nodes' retention window.
	cache.nowFunc = func() time.Time { return start.Add(3*time.Hour + 1*time.Second) }
	// Three new packets push the cache over cap=4, triggering pruneStale.
	for id := uint32(3); id <= 5; id++ {
		cache.Add(pkt(id, id, pb.PortNum_NODEINFO_APP))
	}
	routerSurvived := false
	for _, p := range cache.GetAll() {
		if p.Data.Id == 1 {
			routerSurvived = true
		}
	}
	if !routerSurvived {
		t.Error("router node's packet should not be pruned under cache pressure")
	}
}
// TestRouterNodeInfoElevatedPriority verifies that NODEINFO_APP and POSITION_APP
// packets from router nodes are protected over the same types from regular nodes.
func TestRouterNodeInfoElevatedPriority(t *testing.T) {
	cache := NewNodeAwareCache(5, time.Hour)
	cache.Add(routerNodeInfoPkt(1, 1)) // router NODEINFO — elevated to priority 4
	// Four regular NODEINFOs (priority 2) bring the cache to cap=5.
	for id := uint32(2); id <= 5; id++ {
		cache.Add(pkt(id, id, pb.PortNum_NODEINFO_APP))
	}
	cache.Add(pkt(6, 6, pb.PortNum_NODEINFO_APP)) // pressure: must evict a regular NODEINFO
	got := cache.GetAll()
	if len(got) != 5 {
		t.Fatalf("expected cap=5, got %d: %v", len(got), ids(got))
	}
	routerSurvived := false
	for _, p := range got {
		if p.Data.Id == 1 {
			routerSurvived = true
		}
	}
	if !routerSurvived {
		t.Error("router node's NODEINFO should survive eviction due to elevated priority")
	}
}
// ids extracts packet IDs for readable failure messages.
func ids(packets []*meshtreampb.Packet) []uint32 {
	result := make([]uint32, 0, len(packets))
	for _, p := range packets {
		result = append(result, p.Data.Id)
	}
	return result
}
// ── Broker integration tests ───────────────────────────────────────────────────

// newTestBroker builds a Broker over sourceChan with an hour retention window
// and a named dev logger.
func newTestBroker(sourceChan chan *meshtreampb.Packet, cacheSize int) *Broker {
	logger := logging.NewDevLogger().Named("test")
	return NewBroker(sourceChan, cacheSize, time.Hour, logger)
}
// TestBrokerSubscribeUnsubscribe verifies subscriber registration, packet
// fan-out, and removal via Unsubscribe.
func TestBrokerSubscribeUnsubscribe(t *testing.T) {
	source := make(chan *meshtreampb.Packet, 10)
	broker := newTestBroker(source, 5)
	defer broker.Close()
	subscriberCount := func() int {
		broker.subscriberMutex.RLock()
		defer broker.subscriberMutex.RUnlock()
		return len(broker.subscribers)
	}
	sub1 := broker.Subscribe(5)
	sub2 := broker.Subscribe(5)
	if n := subscriberCount(); n != 2 {
		t.Errorf("expected 2 subscribers, got %d", n)
	}
	source <- pkt(1, 0, pb.PortNum_UNKNOWN_APP)
	select {
	case p := <-sub1:
		if p.Data.Id != 1 {
			t.Errorf("subscriber1: want ID 1, got %d", p.Data.Id)
		}
	case <-time.After(100 * time.Millisecond):
		t.Error("subscriber1 timed out")
	}
	select {
	case p := <-sub2:
		if p.Data.Id != 1 {
			t.Errorf("subscriber2: want ID 1, got %d", p.Data.Id)
		}
	case <-time.After(100 * time.Millisecond):
		t.Error("subscriber2 timed out")
	}
	broker.Unsubscribe(sub1)
	if n := subscriberCount(); n != 1 {
		t.Errorf("expected 1 subscriber after unsubscribe, got %d", n)
	}
	source <- pkt(2, 0, pb.PortNum_UNKNOWN_APP)
	select {
	case p := <-sub2:
		if p.Data.Id != 2 {
			t.Errorf("subscriber2: want ID 2, got %d", p.Data.Id)
		}
	case <-time.After(100 * time.Millisecond):
		t.Error("subscriber2 timed out on second packet")
	}
}
// TestBrokerMultipleSubscribers verifies a single packet fans out to every
// subscriber concurrently.
func TestBrokerMultipleSubscribers(t *testing.T) {
	source := make(chan *meshtreampb.Packet, 10)
	broker := newTestBroker(source, 10)
	defer broker.Close()
	const numSubs = 10
	channels := make([]<-chan *meshtreampb.Packet, numSubs)
	for i := range channels {
		channels[i] = broker.Subscribe(5)
	}
	source <- pkt(42, 0, pb.PortNum_UNKNOWN_APP)
	var wg sync.WaitGroup
	wg.Add(numSubs)
	for i, ch := range channels {
		// Pass loop variables explicitly to avoid pre-1.22 capture issues.
		go func(idx int, c <-chan *meshtreampb.Packet) {
			defer wg.Done()
			select {
			case p := <-c:
				if p.Data.Id != 42 {
					t.Errorf("sub %d: want 42, got %d", idx, p.Data.Id)
				}
			case <-time.After(100 * time.Millisecond):
				t.Errorf("sub %d timed out", idx)
			}
		}(i, ch)
	}
	wg.Wait()
}
// TestBrokerSlowSubscriber verifies that a subscriber with a tiny buffer does
// not prevent delivery to a faster subscriber.
func TestBrokerSlowSubscriber(t *testing.T) {
	source := make(chan *meshtreampb.Packet, 10)
	broker := newTestBroker(source, 10)
	defer broker.Close()
	slowSub := broker.Subscribe(1)
	fastSub := broker.Subscribe(5)
	source <- pkt(101, 0, pb.PortNum_UNKNOWN_APP)
	time.Sleep(10 * time.Millisecond)
	source <- pkt(102, 0, pb.PortNum_UNKNOWN_APP)
	for _, want := range []uint32{101, 102} {
		select {
		case p := <-fastSub:
			if p.Data.Id != want {
				t.Errorf("fast: want %d, got %d", want, p.Data.Id)
			}
		case <-time.After(100 * time.Millisecond):
			t.Errorf("fast: timed out waiting for %d", want)
		}
	}
	select {
	case p := <-slowSub:
		if p.Data.Id != 101 {
			t.Errorf("slow: want 101, got %d", p.Data.Id)
		}
	case <-time.After(100 * time.Millisecond):
		t.Error("slow: timed out")
	}
}
// TestBrokerCloseWithSubscribers verifies Close closes subscriber channels.
func TestBrokerCloseWithSubscribers(t *testing.T) {
	source := make(chan *meshtreampb.Packet, 10)
	broker := newTestBroker(source, 5)
	sub := broker.Subscribe(5)
	broker.Close()
	select {
	case _, open := <-sub:
		if open {
			t.Error("expected channel to be closed")
		}
	case <-time.After(100 * time.Millisecond):
		t.Error("channel not closed")
	}
}
// TestBrokerPacketCaching verifies a late subscriber receives previously
// cached packets in their original order.
func TestBrokerPacketCaching(t *testing.T) {
	source := make(chan *meshtreampb.Packet, 10)
	broker := newTestBroker(source, 100)
	defer broker.Close()
	for id := uint32(1); id <= 3; id++ {
		source <- pkt(id, id, pb.PortNum_NODEINFO_APP)
		time.Sleep(10 * time.Millisecond)
	}
	sub := broker.Subscribe(10)
	received := make([]uint32, 0, 3)
	for i := 0; i < 3; i++ {
		select {
		case p := <-sub:
			received = append(received, p.Data.Id)
		case <-time.After(200 * time.Millisecond):
			t.Fatalf("timed out waiting for packet %d", i+1)
		}
	}
	for i, id := range received {
		if want := uint32(i + 1); id != want {
			t.Errorf("pos %d: want %d, got %d", i, want, id)
		}
	}
}
// TestBrokerPriorityEvictionOnReplay verifies that when a subscriber joins after
// mixed packets have been received, the cached set reflects priority eviction:
// high-priority types survive while low-priority types are trimmed.
func TestBrokerPriorityEvictionOnReplay(t *testing.T) {
	source := make(chan *meshtreampb.Packet, 20)
	// Tight cache: cap=5 creates pressure with 9 packets.
	broker := newTestBroker(source, 5)
	defer broker.Close()
	// 1 neighbor-info (priority 4) + 8 node-infos (priority 2).
	source <- pkt(99, 1, pb.PortNum_NEIGHBORINFO_APP)
	for i := uint32(1); i <= 8; i++ {
		source <- pkt(i, 1, pb.PortNum_NODEINFO_APP)
		time.Sleep(5 * time.Millisecond)
	}
	time.Sleep(20 * time.Millisecond)
	sub := broker.Subscribe(20)
	counts := map[pb.PortNum]int{}
	for i := 0; i < 5; i++ { // expect exactly cap=5 packets
		select {
		case p := <-sub:
			counts[p.Data.PortNum]++
		case <-time.After(200 * time.Millisecond):
			t.Fatalf("timed out waiting for cached packet %d", i+1)
		}
	}
	if n := counts[pb.PortNum_NEIGHBORINFO_APP]; n != 1 {
		t.Errorf("expected neighbor-info to survive in cache, got %d", n)
	}
	if n := counts[pb.PortNum_NODEINFO_APP]; n != 4 {
		t.Errorf("expected 4 node-infos (cap-1), got %d", n)
	}
}
// TestSubscriberBufferFull verifies that when cache replay times out due to a
// full subscriber buffer, live packets can still be delivered once there is room.
func TestSubscriberBufferFull(t *testing.T) {
	savedGrace := cacheGracePeriod
	cacheGracePeriod = time.Millisecond // short enough to time out on packet 2+
	t.Cleanup(func() { cacheGracePeriod = savedGrace })
	source := make(chan *meshtreampb.Packet, 10)
	broker := newTestBroker(source, 100)
	defer broker.Close()
	for id := uint32(1); id <= 5; id++ {
		source <- pkt(id, id, pb.PortNum_NODEINFO_APP)
		time.Sleep(10 * time.Millisecond)
	}
	small := broker.Subscribe(1)
	// Read the first cached packet to confirm replay started.
	select {
	case <-small:
	case <-time.After(500 * time.Millisecond):
		t.Error("timed out waiting for first cached packet")
		return
	}
	// Wait for the cache goroutine to finish (timeout = 1 ms; 10 ms is ample).
	time.Sleep(10 * time.Millisecond)
	// Drain any residual cached packets that squeezed in before the timeout.
drain:
	for {
		select {
		case <-small:
		default:
			break drain
		}
	}
	// Buffer is now empty — a new live packet should be deliverable.
	source <- pkt(99, 99, pb.PortNum_NODEINFO_APP)
	select {
	case p := <-small:
		if p.Data.Id != 99 {
			t.Errorf("want live packet ID 99, got %d", p.Data.Id)
		}
	case <-time.After(100 * time.Millisecond):
		t.Error("timed out waiting for live packet")
	}
}