From bd743275150b7ffb89c694501cf09c98268e2abc Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Mon, 16 Mar 2026 10:47:50 -0700 Subject: [PATCH] Retain router position packets more aggressively --- mqtt/broker.go | 61 +++++++++++++++++++++----- mqtt/broker_test.go | 104 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 154 insertions(+), 11 deletions(-) diff --git a/mqtt/broker.go b/mqtt/broker.go index 83bcd55..44949b0 100644 --- a/mqtt/broker.go +++ b/mqtt/broker.go @@ -40,12 +40,17 @@ var typePriority = map[pb.PortNum]int{ // defaultTypePriority applies to any port type not listed above. const defaultTypePriority = 1 -// packetPriority returns the eviction priority for p. -func packetPriority(p *meshtreampb.Packet) int { - if pri, ok := typePriority[p.GetData().GetPortNum()]; ok { - return pri +// isRouterRole returns true for device roles that act as infrastructure nodes. +// These nodes transmit position and node-info far less often than client nodes, +// so their state packets need elevated protection from eviction. +func isRouterRole(role pb.Config_DeviceConfig_Role) bool { + switch role { + case pb.Config_DeviceConfig_ROUTER, + pb.Config_DeviceConfig_ROUTER_CLIENT, + pb.Config_DeviceConfig_ROUTER_LATE: + return true } - return defaultTypePriority + return false } // entry wraps a packet with its cache insertion timestamp. @@ -68,6 +73,8 @@ type entry struct { // // Node retention: once a node has been silent for [retention], its packets are // excluded from GetAll and proactively pruned when the cache is under pressure. +// Router nodes (ROUTER, ROUTER_CLIENT, ROUTER_LATE) are exempt from retention +// pruning — they transmit far less frequently and their state must be preserved. // // Packets with from=0 (no identified source node) are always included in GetAll. 
// They are never associated with a node's send rate, so they survive eviction @@ -75,8 +82,9 @@ type entry struct { type NodeAwareCache struct { mu sync.Mutex entries []entry - nodeLastSeen map[uint32]int64 // nodeID → unix timestamp of most recent packet - maxSize int // global safety cap + nodeLastSeen map[uint32]int64 // nodeID → unix timestamp of most recent packet + routerNodes map[uint32]bool // nodeIDs identified as router-role devices + maxSize int // global safety cap retention time.Duration nowFunc func() time.Time // injectable for testing } @@ -87,12 +95,32 @@ func NewNodeAwareCache(maxSize int, retention time.Duration) *NodeAwareCache { return &NodeAwareCache{ entries: make([]entry, 0, min(maxSize, 256)), nodeLastSeen: make(map[uint32]int64), + routerNodes: make(map[uint32]bool), maxSize: maxSize, retention: retention, nowFunc: time.Now, } } +// packetPriority returns the eviction priority for p. Router nodes' position +// and node-info packets are elevated to priority 4 (equal to NEIGHBORINFO_APP) +// because those nodes transmit far less frequently and their state is harder to +// replace. +// Must be called with c.mu held. +func (c *NodeAwareCache) packetPriority(p *meshtreampb.Packet) int { + port := p.GetData().GetPortNum() + if c.routerNodes[p.GetData().GetFrom()] { + switch port { + case pb.PortNum_POSITION_APP, pb.PortNum_NODEINFO_APP: + return 4 // elevated: same tier as NEIGHBORINFO_APP + } + } + if pri, ok := typePriority[port]; ok { + return pri + } + return defaultTypePriority +} + // Add records a packet. Recent packets (younger than minEvictAge) are never // evicted. When the global cap is hit, stale-node packets are pruned first; // if still over the limit, the best historical eviction candidate is removed. @@ -105,6 +133,17 @@ func (c *NodeAwareCache) Add(packet *meshtreampb.Packet) { if nodeID != 0 { c.nodeLastSeen[nodeID] = nowUnix + + // Keep router-node membership up to date from NODEINFO_APP packets. 
+ if packet.GetData().GetPortNum() == pb.PortNum_NODEINFO_APP { + if user := packet.GetData().GetNodeInfo(); user != nil { + if isRouterRole(user.GetRole()) { + c.routerNodes[nodeID] = true + } else { + delete(c.routerNodes, nodeID) + } + } + } } c.entries = append(c.entries, entry{pkt: packet, insertedAt: nowUnix}) @@ -132,7 +171,7 @@ func (c *NodeAwareCache) GetAll() []*meshtreampb.Packet { activeNodes := make(map[uint32]bool, len(c.nodeLastSeen)) for nodeID, lastSeen := range c.nodeLastSeen { - if lastSeen >= cutoff { + if c.routerNodes[nodeID] || lastSeen >= cutoff { activeNodes[nodeID] = true } } @@ -183,7 +222,7 @@ func (c *NodeAwareCache) pickEvictTarget(ageThreshold int64) int { if ageThreshold >= 0 && e.insertedAt > ageThreshold { continue } - if pri := packetPriority(e.pkt); minPri < 0 || pri < minPri { + if pri := c.packetPriority(e.pkt); minPri < 0 || pri < minPri { minPri = pri } } @@ -199,7 +238,7 @@ func (c *NodeAwareCache) pickEvictTarget(ageThreshold int64) int { if ageThreshold >= 0 && e.insertedAt > ageThreshold { continue } - if packetPriority(e.pkt) != minPri { + if c.packetPriority(e.pkt) != minPri { continue } nodeID := e.pkt.GetData().GetFrom() @@ -220,7 +259,7 @@ func (c *NodeAwareCache) pruneStale(nowUnix int64) { stale := make(map[uint32]bool) for nodeID, lastSeen := range c.nodeLastSeen { - if lastSeen < cutoff { + if !c.routerNodes[nodeID] && lastSeen < cutoff { stale[nodeID] = true delete(c.nodeLastSeen, nodeID) } diff --git a/mqtt/broker_test.go b/mqtt/broker_test.go index 73f71ff..d045d09 100644 --- a/mqtt/broker_test.go +++ b/mqtt/broker_test.go @@ -28,6 +28,22 @@ func pkt(id, from uint32, port pb.PortNum) *meshtreampb.Packet { } } +// routerNodeInfoPkt builds a NODEINFO_APP packet whose User.Role is ROUTER, +// causing the cache to mark the sender as a router node. 
+func routerNodeInfoPkt(id, from uint32) *meshtreampb.Packet { + return &meshtreampb.Packet{ + Data: &meshtreampb.Data{ + Id: id, + From: from, + PortNum: pb.PortNum_NODEINFO_APP, + Payload: &meshtreampb.Data_NodeInfo{ + NodeInfo: &pb.User{Role: pb.Config_DeviceConfig_ROUTER}, + }, + }, + Info: &meshtreampb.TopicInfo{}, + } +} + // ── NodeAwareCache unit tests ────────────────────────────────────────────────── func TestNodeAwareCacheEmpty(t *testing.T) { @@ -293,6 +309,94 @@ func TestGlobalCapSamePriorityFIFO(t *testing.T) { } } +// TestRouterNodeSurvivesRetention verifies that router nodes are exempt from +// the retention window: GetAll still returns their packets after a long silence. +func TestRouterNodeSurvivesRetention(t *testing.T) { + now := time.Now() + c := NewNodeAwareCache(1000, 3*time.Hour) + c.nowFunc = func() time.Time { return now } + + c.Add(routerNodeInfoPkt(1, 1)) // router node + c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // regular node + + // Advance past both nodes' retention window. + c.nowFunc = func() time.Time { return now.Add(3*time.Hour + 1*time.Second) } + + got := c.GetAll() + var routerSurvived, regularSurvived bool + for _, p := range got { + switch p.Data.Id { + case 1: + routerSurvived = true + case 2: + regularSurvived = true + } + } + if !routerSurvived { + t.Error("router node's packet should survive past the retention window") + } + if regularSurvived { + t.Error("regular node's packet should be excluded after the retention window") + } +} + +// TestRouterNodeNotPrunedUnderPressure verifies that pruneStale skips router +// nodes even when the cache is over capacity and regular stale nodes are removed. +func TestRouterNodeNotPrunedUnderPressure(t *testing.T) { + now := time.Now() + c := NewNodeAwareCache(4, 3*time.Hour) + c.nowFunc = func() time.Time { return now } + + c.Add(routerNodeInfoPkt(1, 1)) // router node + c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // regular node + + // Advance past both nodes' retention window. 
+ c.nowFunc = func() time.Time { return now.Add(3*time.Hour + 1*time.Second) } + + // Three new packets push the cache over cap, triggering pruneStale. + c.Add(pkt(3, 3, pb.PortNum_NODEINFO_APP)) + c.Add(pkt(4, 4, pb.PortNum_NODEINFO_APP)) + c.Add(pkt(5, 5, pb.PortNum_NODEINFO_APP)) + + got := c.GetAll() + routerSurvived := false + for _, p := range got { + if p.Data.Id == 1 { + routerSurvived = true + } + } + if !routerSurvived { + t.Error("router node's packet should not be pruned under cache pressure") + } +} + +// TestRouterNodeInfoElevatedPriority verifies that NODEINFO_APP and POSITION_APP +// packets from router nodes are protected over the same types from regular nodes. +func TestRouterNodeInfoElevatedPriority(t *testing.T) { + c := NewNodeAwareCache(5, time.Hour) + + c.Add(routerNodeInfoPkt(1, 1)) // router NODEINFO — elevated to priority 4 + c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // regular NODEINFO — priority 2 + c.Add(pkt(3, 3, pb.PortNum_NODEINFO_APP)) // regular NODEINFO — priority 2 + c.Add(pkt(4, 4, pb.PortNum_NODEINFO_APP)) // regular NODEINFO — priority 2 + c.Add(pkt(5, 5, pb.PortNum_NODEINFO_APP)) // regular NODEINFO — priority 2, at cap + c.Add(pkt(6, 6, pb.PortNum_NODEINFO_APP)) // pressure: must evict a regular NODEINFO + + got := c.GetAll() + if len(got) != 5 { + t.Fatalf("expected cap=5, got %d: %v", len(got), ids(got)) + } + routerSurvived := false + for _, p := range got { + if p.Data.Id == 1 { + routerSurvived = true + } + } + if !routerSurvived { + t.Error("router node's NODEINFO should survive eviction due to elevated priority") + } +} + // ids extracts packet IDs for readable failure messages. func ids(packets []*meshtreampb.Packet) []uint32 { out := make([]uint32, len(packets))