mirror of
https://github.com/dpup/meshstream.git
synced 2026-03-28 17:42:37 +01:00
668 lines
20 KiB
Go
668 lines
20 KiB
Go
package mqtt
|
|
|
|
import (
|
|
"os"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/dpup/prefab/logging"
|
|
|
|
meshtreampb "meshstream/generated/meshstream"
|
|
pb "meshstream/generated/meshtastic"
|
|
)
|
|
|
|
// TestMain disables age protection globally so pressure tests run without
|
|
// needing to advance mock clocks. Individual tests that specifically cover
|
|
// age-based behaviour set minEvictAge themselves.
|
|
func TestMain(m *testing.M) {
|
|
minEvictAge = 0
|
|
os.Exit(m.Run())
|
|
}
|
|
|
|
// pkt is a test helper that builds a Packet with the given ID, from-node, and port.
|
|
func pkt(id, from uint32, port pb.PortNum) *meshtreampb.Packet {
|
|
return &meshtreampb.Packet{
|
|
Data: &meshtreampb.Data{Id: id, From: from, PortNum: port},
|
|
Info: &meshtreampb.TopicInfo{},
|
|
}
|
|
}
|
|
|
|
// routerNodeInfoPkt builds a NODEINFO_APP packet whose User.Role is ROUTER,
|
|
// causing the cache to mark the sender as a router node.
|
|
func routerNodeInfoPkt(id, from uint32) *meshtreampb.Packet {
|
|
return &meshtreampb.Packet{
|
|
Data: &meshtreampb.Data{
|
|
Id: id,
|
|
From: from,
|
|
PortNum: pb.PortNum_NODEINFO_APP,
|
|
Payload: &meshtreampb.Data_NodeInfo{
|
|
NodeInfo: &pb.User{Role: pb.Config_DeviceConfig_ROUTER},
|
|
},
|
|
},
|
|
Info: &meshtreampb.TopicInfo{},
|
|
}
|
|
}
|
|
|
|
// ── NodeAwareCache unit tests ──────────────────────────────────────────────────
|
|
|
|
func TestNodeAwareCacheEmpty(t *testing.T) {
|
|
c := NewNodeAwareCache(100, time.Hour)
|
|
if got := c.GetAll(); len(got) != 0 {
|
|
t.Errorf("expected empty, got %d packets", len(got))
|
|
}
|
|
}
|
|
|
|
func TestNodeAwareCacheBasicOrdering(t *testing.T) {
|
|
c := NewNodeAwareCache(100, time.Hour)
|
|
c.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP))
|
|
c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP))
|
|
c.Add(pkt(3, 3, pb.PortNum_NODEINFO_APP))
|
|
|
|
got := c.GetAll()
|
|
if len(got) != 3 {
|
|
t.Fatalf("expected 3, got %d", len(got))
|
|
}
|
|
for i, p := range got {
|
|
if p.Data.Id != uint32(i+1) {
|
|
t.Errorf("pos %d: want ID %d, got %d", i, i+1, p.Data.Id)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestPressureEvictsOldestOfLowestPriority verifies that under cap pressure, the
|
|
// oldest packet of the lowest-priority type is evicted while higher-priority
|
|
// types are preserved.
|
|
func TestPressureEvictsOldestOfLowestPriority(t *testing.T) {
|
|
c := NewNodeAwareCache(3, time.Hour)
|
|
|
|
c.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP)) // priority 2
|
|
c.Add(pkt(2, 1, pb.PortNum_NEIGHBORINFO_APP)) // priority 4
|
|
c.Add(pkt(3, 1, pb.PortNum_NODEINFO_APP)) // priority 2 — at cap
|
|
c.Add(pkt(4, 1, pb.PortNum_NODEINFO_APP)) // pressure: evicts oldest pri=2 (ID 1)
|
|
|
|
got := c.GetAll()
|
|
if len(got) != 3 {
|
|
t.Fatalf("expected 3 (global cap), got %d: %v", len(got), ids(got))
|
|
}
|
|
|
|
wantIDs := []uint32{2, 3, 4}
|
|
for i, p := range got {
|
|
if p.Data.Id != wantIDs[i] {
|
|
t.Errorf("pos %d: want ID %d, got %d", i, wantIDs[i], p.Data.Id)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestRareTypeOutlastsFrequentType is the core scenario: a node sends many
|
|
// node-info packets and one neighbor-info packet. Under cache pressure, the
|
|
// neighbor-info (higher priority) should survive while old node-infos are evicted.
|
|
func TestRareTypeOutlastsFrequentType(t *testing.T) {
|
|
// Cap=5 creates pressure when we add 13 packets.
|
|
c := NewNodeAwareCache(5, time.Hour)
|
|
|
|
neighborID := uint32(50)
|
|
c.Add(pkt(neighborID, 1, pb.PortNum_NEIGHBORINFO_APP)) // priority 4
|
|
for i := uint32(1); i <= 12; i++ {
|
|
c.Add(pkt(i, 1, pb.PortNum_NODEINFO_APP)) // priority 2
|
|
}
|
|
|
|
got := c.GetAll()
|
|
|
|
var nodeInfoCount, neighborCount int
|
|
for _, p := range got {
|
|
switch p.Data.PortNum {
|
|
case pb.PortNum_NODEINFO_APP:
|
|
nodeInfoCount++
|
|
case pb.PortNum_NEIGHBORINFO_APP:
|
|
neighborCount++
|
|
}
|
|
}
|
|
|
|
if len(got) != 5 {
|
|
t.Errorf("expected 5 packets (cap), got %d: %v", len(got), ids(got))
|
|
}
|
|
if neighborCount != 1 {
|
|
t.Errorf("expected neighbor-info to survive, got %d", neighborCount)
|
|
}
|
|
if nodeInfoCount != 4 {
|
|
t.Errorf("expected 4 node-infos (cap-1), got %d", nodeInfoCount)
|
|
}
|
|
}
|
|
|
|
// TestPressureIsGlobal verifies that eviction competes globally across all nodes:
|
|
// when the cap is hit, the oldest packet of the lowest priority is removed
|
|
// regardless of which node it came from.
|
|
func TestPressureIsGlobal(t *testing.T) {
|
|
c := NewNodeAwareCache(3, time.Hour)
|
|
|
|
c.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP)) // node 1
|
|
c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // node 2
|
|
c.Add(pkt(3, 1, pb.PortNum_NODEINFO_APP)) // node 1 — at cap
|
|
c.Add(pkt(4, 2, pb.PortNum_NODEINFO_APP)) // node 2 — evicts globally oldest = ID 1
|
|
|
|
got := c.GetAll()
|
|
if len(got) != 3 {
|
|
t.Fatalf("expected 3, got %d: %v", len(got), ids(got))
|
|
}
|
|
if got[0].Data.Id != 2 {
|
|
t.Errorf("expected oldest surviving ID 2, got %d", got[0].Data.Id)
|
|
}
|
|
}
|
|
|
|
// TestBeladyApproximation verifies the node-aware eviction: among same-priority
|
|
// old packets, the one from the most recently active node is evicted first
|
|
// (it is most likely to resend, making its old packet cheapest to lose).
|
|
// A flaky node's old packet therefore outlives a reliable node's old packet.
|
|
func TestBeladyApproximation(t *testing.T) {
|
|
now := time.Unix(0, 0)
|
|
c := NewNodeAwareCache(3, 24*time.Hour)
|
|
c.nowFunc = func() time.Time { return now }
|
|
|
|
// At T=0: both nodes send a node-info.
|
|
c.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP)) // node F (flaky)
|
|
c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // node R (reliable)
|
|
|
|
// At T=90m: R sends again — it is now the most recently active node.
|
|
// F has been silent, so its T=0 packet is the "harder to replace" one.
|
|
now = time.Unix(int64(90*time.Minute/time.Second), 0)
|
|
c.Add(pkt(3, 2, pb.PortNum_NODEINFO_APP)) // R resends, cache now at cap=3
|
|
|
|
// At T=2h: pressure. Both node 1 and node 2 have old packets (age > minEvictAge=0).
|
|
// Among lowest-priority (both are NODEINFO), evict from node 2 (most recently active
|
|
// at T=90m) rather than node 1 (last seen T=0).
|
|
now = time.Unix(int64(2*time.Hour/time.Second), 0)
|
|
minEvictAge = time.Hour
|
|
t.Cleanup(func() { minEvictAge = 0 })
|
|
|
|
c.Add(pkt(4, 3, pb.PortNum_NODEINFO_APP)) // new node — triggers eviction
|
|
|
|
got := c.GetAll()
|
|
var survivingNodes []uint32
|
|
for _, p := range got {
|
|
survivingNodes = append(survivingNodes, p.Data.From)
|
|
}
|
|
|
|
// Node 1's packet (ID 1, from flaky node) must survive.
|
|
flakySurvived := false
|
|
reliableOldSurvived := false
|
|
for _, p := range got {
|
|
if p.Data.Id == 1 {
|
|
flakySurvived = true
|
|
}
|
|
if p.Data.Id == 2 { // R's old T=0 packet
|
|
reliableOldSurvived = true
|
|
}
|
|
}
|
|
|
|
if !flakySurvived {
|
|
t.Error("flaky node's only packet should survive (Bélády: it won't be resent soon)")
|
|
}
|
|
if reliableOldSurvived {
|
|
t.Error("reliable node's old packet should be evicted (node recently resent, cheapest to lose)")
|
|
}
|
|
}
|
|
|
|
// TestMinAgeProtectsRecentPackets verifies that packets younger than minEvictAge
|
|
// are never evicted even under heavy cap pressure.
|
|
func TestMinAgeProtectsRecentPackets(t *testing.T) {
|
|
minEvictAge = time.Hour
|
|
t.Cleanup(func() { minEvictAge = 0 })
|
|
|
|
now := time.Unix(0, 0)
|
|
c := NewNodeAwareCache(3, 24*time.Hour)
|
|
c.nowFunc = func() time.Time { return now }
|
|
|
|
// Three packets at T=0 fill the cache.
|
|
for i := uint32(1); i <= 3; i++ {
|
|
c.Add(pkt(i, i, pb.PortNum_NODEINFO_APP))
|
|
}
|
|
|
|
// T=30m: add a fourth packet. All are < 1h old — none are eligible for
|
|
// priority eviction. The cap must still be respected, so one is evicted
|
|
// via the fallback (all-entries pickEvictTarget).
|
|
now = time.Unix(int64(30*time.Minute/time.Second), 0)
|
|
c.Add(pkt(4, 4, pb.PortNum_NODEINFO_APP))
|
|
|
|
got := c.GetAll()
|
|
if len(got) != 3 {
|
|
t.Fatalf("expected cap=3 enforced even with all-recent entries, got %d", len(got))
|
|
}
|
|
}
|
|
|
|
// TestRetentionEvictsStaleNode verifies whole-node eviction after silence.
|
|
func TestRetentionEvictsStaleNode(t *testing.T) {
|
|
now := time.Now()
|
|
c := NewNodeAwareCache(1000, 3*time.Hour)
|
|
c.nowFunc = func() time.Time { return now }
|
|
|
|
c.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP))
|
|
c.Add(pkt(2, 1, pb.PortNum_NODEINFO_APP))
|
|
|
|
// Node 2 is active after node 1 has gone silent.
|
|
c.nowFunc = func() time.Time { return now.Add(1 * time.Minute) }
|
|
c.Add(pkt(3, 2, pb.PortNum_NODEINFO_APP))
|
|
|
|
// Past node 1's retention window.
|
|
c.nowFunc = func() time.Time { return now.Add(3*time.Hour + 1*time.Second) }
|
|
|
|
got := c.GetAll()
|
|
if len(got) != 1 || got[0].Data.Id != 3 {
|
|
t.Errorf("expected only node 2's packet (ID 3), got %v", ids(got))
|
|
}
|
|
}
|
|
|
|
// TestRetentionExtendsOnNewPacket verifies that a new packet resets the retention clock.
|
|
func TestRetentionExtendsOnNewPacket(t *testing.T) {
|
|
now := time.Now()
|
|
c := NewNodeAwareCache(1000, 3*time.Hour)
|
|
c.nowFunc = func() time.Time { return now }
|
|
|
|
c.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP))
|
|
|
|
// Node 1 sends again at t=2h, refreshing its window.
|
|
c.nowFunc = func() time.Time { return now.Add(2 * time.Hour) }
|
|
c.Add(pkt(2, 1, pb.PortNum_NODEINFO_APP))
|
|
|
|
// t=3h+1s — past first packet's original window, but node refreshed at 2h.
|
|
c.nowFunc = func() time.Time { return now.Add(3*time.Hour + 1*time.Second) }
|
|
|
|
got := c.GetAll()
|
|
if len(got) != 2 {
|
|
t.Errorf("expected both packets retained (node still active), got %d", len(got))
|
|
}
|
|
}
|
|
|
|
// TestNoSourcePacketsAlwaysIncluded verifies from=0 packets bypass node retention.
|
|
func TestNoSourcePacketsAlwaysIncluded(t *testing.T) {
|
|
now := time.Now()
|
|
c := NewNodeAwareCache(1000, time.Hour)
|
|
c.nowFunc = func() time.Time { return now }
|
|
|
|
c.Add(pkt(1, 0, pb.PortNum_NODEINFO_APP))
|
|
|
|
c.nowFunc = func() time.Time { return now.Add(48 * time.Hour) }
|
|
|
|
got := c.GetAll()
|
|
if len(got) != 1 {
|
|
t.Errorf("expected from=0 packet to always be included, got %d", len(got))
|
|
}
|
|
}
|
|
|
|
// TestGlobalCapSamePriorityFIFO verifies that when all packets share the same
|
|
// priority, the oldest is evicted first (FIFO behaviour within a priority tier).
|
|
func TestGlobalCapSamePriorityFIFO(t *testing.T) {
|
|
c := NewNodeAwareCache(3, time.Hour)
|
|
|
|
// All from=0 packets of the same type — same priority, pure FIFO.
|
|
c.Add(pkt(1, 0, pb.PortNum_NODEINFO_APP))
|
|
c.Add(pkt(2, 0, pb.PortNum_NODEINFO_APP))
|
|
c.Add(pkt(3, 0, pb.PortNum_NODEINFO_APP))
|
|
c.Add(pkt(4, 0, pb.PortNum_NODEINFO_APP)) // should push out ID 1
|
|
|
|
got := c.GetAll()
|
|
if len(got) != 3 {
|
|
t.Fatalf("expected 3 (global cap), got %d", len(got))
|
|
}
|
|
if got[0].Data.Id != 2 {
|
|
t.Errorf("expected oldest surviving ID 2, got %d", got[0].Data.Id)
|
|
}
|
|
}
|
|
|
|
// TestRouterNodeSurvivesRetention verifies that router nodes are exempt from
|
|
// the retention window: GetAll still returns their packets after a long silence.
|
|
func TestRouterNodeSurvivesRetention(t *testing.T) {
|
|
now := time.Now()
|
|
c := NewNodeAwareCache(1000, 3*time.Hour)
|
|
c.nowFunc = func() time.Time { return now }
|
|
|
|
c.Add(routerNodeInfoPkt(1, 1)) // router node
|
|
c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // regular node
|
|
|
|
// Advance past both nodes' retention window.
|
|
c.nowFunc = func() time.Time { return now.Add(3*time.Hour + 1*time.Second) }
|
|
|
|
got := c.GetAll()
|
|
var routerSurvived, regularSurvived bool
|
|
for _, p := range got {
|
|
switch p.Data.Id {
|
|
case 1:
|
|
routerSurvived = true
|
|
case 2:
|
|
regularSurvived = true
|
|
}
|
|
}
|
|
if !routerSurvived {
|
|
t.Error("router node's packet should survive past the retention window")
|
|
}
|
|
if regularSurvived {
|
|
t.Error("regular node's packet should be excluded after the retention window")
|
|
}
|
|
}
|
|
|
|
// TestRouterNodeNotPrunedUnderPressure verifies that pruneStale skips router
|
|
// nodes even when the cache is over capacity and regular stale nodes are removed.
|
|
func TestRouterNodeNotPrunedUnderPressure(t *testing.T) {
|
|
now := time.Now()
|
|
c := NewNodeAwareCache(4, 3*time.Hour)
|
|
c.nowFunc = func() time.Time { return now }
|
|
|
|
c.Add(routerNodeInfoPkt(1, 1)) // router node
|
|
c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // regular node
|
|
|
|
// Advance past both nodes' retention window.
|
|
c.nowFunc = func() time.Time { return now.Add(3*time.Hour + 1*time.Second) }
|
|
|
|
// Two new packets push the cache over cap, triggering pruneStale.
|
|
c.Add(pkt(3, 3, pb.PortNum_NODEINFO_APP))
|
|
c.Add(pkt(4, 4, pb.PortNum_NODEINFO_APP))
|
|
c.Add(pkt(5, 5, pb.PortNum_NODEINFO_APP))
|
|
|
|
got := c.GetAll()
|
|
routerSurvived := false
|
|
for _, p := range got {
|
|
if p.Data.Id == 1 {
|
|
routerSurvived = true
|
|
}
|
|
}
|
|
if !routerSurvived {
|
|
t.Error("router node's packet should not be pruned under cache pressure")
|
|
}
|
|
}
|
|
|
|
// TestRouterNodeInfoElevatedPriority verifies that NODEINFO_APP and POSITION_APP
|
|
// packets from router nodes are protected over the same types from regular nodes.
|
|
func TestRouterNodeInfoElevatedPriority(t *testing.T) {
|
|
c := NewNodeAwareCache(5, time.Hour)
|
|
|
|
c.Add(routerNodeInfoPkt(1, 1)) // router NODEINFO — elevated to priority 4
|
|
c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // regular NODEINFO — priority 2
|
|
c.Add(pkt(3, 3, pb.PortNum_NODEINFO_APP)) // regular NODEINFO — priority 2
|
|
c.Add(pkt(4, 4, pb.PortNum_NODEINFO_APP)) // regular NODEINFO — priority 2
|
|
c.Add(pkt(5, 5, pb.PortNum_NODEINFO_APP)) // regular NODEINFO — priority 2, at cap
|
|
c.Add(pkt(6, 6, pb.PortNum_NODEINFO_APP)) // pressure: must evict a regular NODEINFO
|
|
|
|
got := c.GetAll()
|
|
if len(got) != 5 {
|
|
t.Fatalf("expected cap=5, got %d: %v", len(got), ids(got))
|
|
}
|
|
routerSurvived := false
|
|
for _, p := range got {
|
|
if p.Data.Id == 1 {
|
|
routerSurvived = true
|
|
}
|
|
}
|
|
if !routerSurvived {
|
|
t.Error("router node's NODEINFO should survive eviction due to elevated priority")
|
|
}
|
|
}
|
|
|
|
// ids extracts packet IDs for readable failure messages.
|
|
func ids(packets []*meshtreampb.Packet) []uint32 {
|
|
out := make([]uint32, len(packets))
|
|
for i, p := range packets {
|
|
out[i] = p.Data.Id
|
|
}
|
|
return out
|
|
}
|
|
|
|
// ── Broker integration tests ───────────────────────────────────────────────────
|
|
|
|
func newTestBroker(sourceChan chan *meshtreampb.Packet, cacheSize int) *Broker {
|
|
return NewBroker(sourceChan, cacheSize, time.Hour, logging.NewDevLogger().Named("test"))
|
|
}
|
|
|
|
func TestBrokerSubscribeUnsubscribe(t *testing.T) {
|
|
sourceChan := make(chan *meshtreampb.Packet, 10)
|
|
broker := newTestBroker(sourceChan, 5)
|
|
defer broker.Close()
|
|
|
|
subscriber1 := broker.Subscribe(5)
|
|
subscriber2 := broker.Subscribe(5)
|
|
|
|
broker.subscriberMutex.RLock()
|
|
count := len(broker.subscribers)
|
|
broker.subscriberMutex.RUnlock()
|
|
|
|
if count != 2 {
|
|
t.Errorf("expected 2 subscribers, got %d", count)
|
|
}
|
|
|
|
sourceChan <- pkt(1, 0, pb.PortNum_UNKNOWN_APP)
|
|
|
|
select {
|
|
case p := <-subscriber1:
|
|
if p.Data.Id != 1 {
|
|
t.Errorf("subscriber1: want ID 1, got %d", p.Data.Id)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("subscriber1 timed out")
|
|
}
|
|
select {
|
|
case p := <-subscriber2:
|
|
if p.Data.Id != 1 {
|
|
t.Errorf("subscriber2: want ID 1, got %d", p.Data.Id)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("subscriber2 timed out")
|
|
}
|
|
|
|
broker.Unsubscribe(subscriber1)
|
|
|
|
broker.subscriberMutex.RLock()
|
|
count = len(broker.subscribers)
|
|
broker.subscriberMutex.RUnlock()
|
|
if count != 1 {
|
|
t.Errorf("expected 1 subscriber after unsubscribe, got %d", count)
|
|
}
|
|
|
|
sourceChan <- pkt(2, 0, pb.PortNum_UNKNOWN_APP)
|
|
select {
|
|
case p := <-subscriber2:
|
|
if p.Data.Id != 2 {
|
|
t.Errorf("subscriber2: want ID 2, got %d", p.Data.Id)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("subscriber2 timed out on second packet")
|
|
}
|
|
}
|
|
|
|
func TestBrokerMultipleSubscribers(t *testing.T) {
|
|
sourceChan := make(chan *meshtreampb.Packet, 10)
|
|
broker := newTestBroker(sourceChan, 10)
|
|
defer broker.Close()
|
|
|
|
const n = 10
|
|
subs := make([]<-chan *meshtreampb.Packet, n)
|
|
for i := range subs {
|
|
subs[i] = broker.Subscribe(5)
|
|
}
|
|
|
|
sourceChan <- pkt(42, 0, pb.PortNum_UNKNOWN_APP)
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(n)
|
|
for i, ch := range subs {
|
|
go func(idx int, c <-chan *meshtreampb.Packet) {
|
|
defer wg.Done()
|
|
select {
|
|
case p := <-c:
|
|
if p.Data.Id != 42 {
|
|
t.Errorf("sub %d: want 42, got %d", idx, p.Data.Id)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Errorf("sub %d timed out", idx)
|
|
}
|
|
}(i, ch)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func TestBrokerSlowSubscriber(t *testing.T) {
|
|
sourceChan := make(chan *meshtreampb.Packet, 10)
|
|
broker := newTestBroker(sourceChan, 10)
|
|
defer broker.Close()
|
|
|
|
slow := broker.Subscribe(1)
|
|
fast := broker.Subscribe(5)
|
|
|
|
sourceChan <- pkt(101, 0, pb.PortNum_UNKNOWN_APP)
|
|
time.Sleep(10 * time.Millisecond)
|
|
sourceChan <- pkt(102, 0, pb.PortNum_UNKNOWN_APP)
|
|
|
|
for _, want := range []uint32{101, 102} {
|
|
select {
|
|
case p := <-fast:
|
|
if p.Data.Id != want {
|
|
t.Errorf("fast: want %d, got %d", want, p.Data.Id)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Errorf("fast: timed out waiting for %d", want)
|
|
}
|
|
}
|
|
select {
|
|
case p := <-slow:
|
|
if p.Data.Id != 101 {
|
|
t.Errorf("slow: want 101, got %d", p.Data.Id)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("slow: timed out")
|
|
}
|
|
}
|
|
|
|
func TestBrokerCloseWithSubscribers(t *testing.T) {
|
|
sourceChan := make(chan *meshtreampb.Packet, 10)
|
|
broker := newTestBroker(sourceChan, 5)
|
|
|
|
sub := broker.Subscribe(5)
|
|
broker.Close()
|
|
|
|
select {
|
|
case _, ok := <-sub:
|
|
if ok {
|
|
t.Error("expected channel to be closed")
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("channel not closed")
|
|
}
|
|
}
|
|
|
|
func TestBrokerPacketCaching(t *testing.T) {
|
|
sourceChan := make(chan *meshtreampb.Packet, 10)
|
|
broker := newTestBroker(sourceChan, 100)
|
|
defer broker.Close()
|
|
|
|
for i := uint32(1); i <= 3; i++ {
|
|
sourceChan <- pkt(i, i, pb.PortNum_NODEINFO_APP)
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
sub := broker.Subscribe(10)
|
|
var received []uint32
|
|
for i := 0; i < 3; i++ {
|
|
select {
|
|
case p := <-sub:
|
|
received = append(received, p.Data.Id)
|
|
case <-time.After(200 * time.Millisecond):
|
|
t.Fatalf("timed out waiting for packet %d", i+1)
|
|
}
|
|
}
|
|
for i, id := range received {
|
|
if id != uint32(i+1) {
|
|
t.Errorf("pos %d: want %d, got %d", i, i+1, id)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestBrokerPriorityEvictionOnReplay verifies that when a subscriber joins after
|
|
// mixed packets have been received, the cached set reflects priority eviction:
|
|
// high-priority types survive while low-priority types are trimmed.
|
|
func TestBrokerPriorityEvictionOnReplay(t *testing.T) {
|
|
sourceChan := make(chan *meshtreampb.Packet, 20)
|
|
// Tight cache: cap=5 creates pressure with 9 packets.
|
|
broker := NewBroker(sourceChan, 5, time.Hour, logging.NewDevLogger().Named("test"))
|
|
defer broker.Close()
|
|
|
|
// 1 neighbor-info (priority 4) + 8 node-infos (priority 2).
|
|
sourceChan <- pkt(99, 1, pb.PortNum_NEIGHBORINFO_APP)
|
|
for i := 1; i <= 8; i++ {
|
|
sourceChan <- pkt(uint32(i), 1, pb.PortNum_NODEINFO_APP)
|
|
time.Sleep(5 * time.Millisecond)
|
|
}
|
|
time.Sleep(20 * time.Millisecond)
|
|
|
|
sub := broker.Subscribe(20)
|
|
var nodeInfoCount, neighborCount int
|
|
for i := 0; i < 5; i++ { // expect exactly cap=5 packets
|
|
select {
|
|
case p := <-sub:
|
|
switch p.Data.PortNum {
|
|
case pb.PortNum_NODEINFO_APP:
|
|
nodeInfoCount++
|
|
case pb.PortNum_NEIGHBORINFO_APP:
|
|
neighborCount++
|
|
}
|
|
case <-time.After(200 * time.Millisecond):
|
|
t.Fatalf("timed out waiting for cached packet %d", i+1)
|
|
}
|
|
}
|
|
|
|
if neighborCount != 1 {
|
|
t.Errorf("expected neighbor-info to survive in cache, got %d", neighborCount)
|
|
}
|
|
if nodeInfoCount != 4 {
|
|
t.Errorf("expected 4 node-infos (cap-1), got %d", nodeInfoCount)
|
|
}
|
|
}
|
|
|
|
// TestSubscriberBufferFull verifies that when cache replay times out due to a
|
|
// full subscriber buffer, live packets can still be delivered once there is room.
|
|
func TestSubscriberBufferFull(t *testing.T) {
|
|
orig := cacheGracePeriod
|
|
cacheGracePeriod = time.Millisecond // short enough to time out on packet 2+
|
|
t.Cleanup(func() { cacheGracePeriod = orig })
|
|
|
|
sourceChan := make(chan *meshtreampb.Packet, 10)
|
|
broker := newTestBroker(sourceChan, 100)
|
|
defer broker.Close()
|
|
|
|
for i := uint32(1); i <= 5; i++ {
|
|
sourceChan <- pkt(i, i, pb.PortNum_NODEINFO_APP)
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
small := broker.Subscribe(1)
|
|
|
|
// Read the first cached packet to confirm replay started.
|
|
select {
|
|
case <-small:
|
|
case <-time.After(500 * time.Millisecond):
|
|
t.Error("timed out waiting for first cached packet")
|
|
return
|
|
}
|
|
|
|
// Wait for the cache goroutine to finish (timeout = 1 ms; 10 ms is ample).
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
// Drain any residual cached packets that squeezed in before the timeout.
|
|
drained := false
|
|
for !drained {
|
|
select {
|
|
case <-small:
|
|
default:
|
|
drained = true
|
|
}
|
|
}
|
|
|
|
// Buffer is now empty — a new live packet should be deliverable.
|
|
sourceChan <- pkt(99, 99, pb.PortNum_NODEINFO_APP)
|
|
select {
|
|
case p := <-small:
|
|
if p.Data.Id != 99 {
|
|
t.Errorf("want live packet ID 99, got %d", p.Data.Id)
|
|
}
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Error("timed out waiting for live packet")
|
|
}
|
|
}
|