mirror of
https://github.com/dpup/meshstream.git
synced 2026-03-28 17:42:37 +01:00
Retain router position packets mroe aggressively
This commit is contained in:
@@ -40,12 +40,17 @@ var typePriority = map[pb.PortNum]int{
|
||||
// defaultTypePriority applies to any port type not listed above.
|
||||
const defaultTypePriority = 1
|
||||
|
||||
// packetPriority returns the eviction priority for p.
|
||||
func packetPriority(p *meshtreampb.Packet) int {
|
||||
if pri, ok := typePriority[p.GetData().GetPortNum()]; ok {
|
||||
return pri
|
||||
// isRouterRole returns true for device roles that act as infrastructure nodes.
|
||||
// These nodes transmit position and node-info far less often than client nodes,
|
||||
// so their state packets need elevated protection from eviction.
|
||||
func isRouterRole(role pb.Config_DeviceConfig_Role) bool {
|
||||
switch role {
|
||||
case pb.Config_DeviceConfig_ROUTER,
|
||||
pb.Config_DeviceConfig_ROUTER_CLIENT,
|
||||
pb.Config_DeviceConfig_ROUTER_LATE:
|
||||
return true
|
||||
}
|
||||
return defaultTypePriority
|
||||
return false
|
||||
}
|
||||
|
||||
// entry wraps a packet with its cache insertion timestamp.
|
||||
@@ -68,6 +73,8 @@ type entry struct {
|
||||
//
|
||||
// Node retention: once a node has been silent for [retention], its packets are
|
||||
// excluded from GetAll and proactively pruned when the cache is under pressure.
|
||||
// Router nodes (ROUTER, ROUTER_CLIENT, ROUTER_LATE) are exempt from retention
|
||||
// pruning — they transmit far less frequently and their state must be preserved.
|
||||
//
|
||||
// Packets with from=0 (no identified source node) are always included in GetAll.
|
||||
// They are never associated with a node's send rate, so they survive eviction
|
||||
@@ -75,8 +82,9 @@ type entry struct {
|
||||
type NodeAwareCache struct {
|
||||
mu sync.Mutex
|
||||
entries []entry
|
||||
nodeLastSeen map[uint32]int64 // nodeID → unix timestamp of most recent packet
|
||||
maxSize int // global safety cap
|
||||
nodeLastSeen map[uint32]int64 // nodeID → unix timestamp of most recent packet
|
||||
routerNodes map[uint32]bool // nodeIDs identified as router-role devices
|
||||
maxSize int // global safety cap
|
||||
retention time.Duration
|
||||
nowFunc func() time.Time // injectable for testing
|
||||
}
|
||||
@@ -87,12 +95,32 @@ func NewNodeAwareCache(maxSize int, retention time.Duration) *NodeAwareCache {
|
||||
return &NodeAwareCache{
|
||||
entries: make([]entry, 0, min(maxSize, 256)),
|
||||
nodeLastSeen: make(map[uint32]int64),
|
||||
routerNodes: make(map[uint32]bool),
|
||||
maxSize: maxSize,
|
||||
retention: retention,
|
||||
nowFunc: time.Now,
|
||||
}
|
||||
}
|
||||
|
||||
// packetPriority returns the eviction priority for p. Router nodes' position
|
||||
// and node-info packets are elevated to priority 4 (equal to NEIGHBORINFO_APP)
|
||||
// because those nodes transmit far less frequently and their state is harder to
|
||||
// replace.
|
||||
// Must be called with c.mu held.
|
||||
func (c *NodeAwareCache) packetPriority(p *meshtreampb.Packet) int {
|
||||
port := p.GetData().GetPortNum()
|
||||
if c.routerNodes[p.GetData().GetFrom()] {
|
||||
switch port {
|
||||
case pb.PortNum_POSITION_APP, pb.PortNum_NODEINFO_APP:
|
||||
return 4 // elevated: same tier as NEIGHBORINFO_APP
|
||||
}
|
||||
}
|
||||
if pri, ok := typePriority[port]; ok {
|
||||
return pri
|
||||
}
|
||||
return defaultTypePriority
|
||||
}
|
||||
|
||||
// Add records a packet. Recent packets (younger than minEvictAge) are never
|
||||
// evicted. When the global cap is hit, stale-node packets are pruned first;
|
||||
// if still over the limit, the best historical eviction candidate is removed.
|
||||
@@ -105,6 +133,17 @@ func (c *NodeAwareCache) Add(packet *meshtreampb.Packet) {
|
||||
|
||||
if nodeID != 0 {
|
||||
c.nodeLastSeen[nodeID] = nowUnix
|
||||
|
||||
// Keep router-node membership up to date from NODEINFO_APP packets.
|
||||
if packet.GetData().GetPortNum() == pb.PortNum_NODEINFO_APP {
|
||||
if user := packet.GetData().GetNodeInfo(); user != nil {
|
||||
if isRouterRole(user.GetRole()) {
|
||||
c.routerNodes[nodeID] = true
|
||||
} else {
|
||||
delete(c.routerNodes, nodeID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.entries = append(c.entries, entry{pkt: packet, insertedAt: nowUnix})
|
||||
@@ -132,7 +171,7 @@ func (c *NodeAwareCache) GetAll() []*meshtreampb.Packet {
|
||||
|
||||
activeNodes := make(map[uint32]bool, len(c.nodeLastSeen))
|
||||
for nodeID, lastSeen := range c.nodeLastSeen {
|
||||
if lastSeen >= cutoff {
|
||||
if c.routerNodes[nodeID] || lastSeen >= cutoff {
|
||||
activeNodes[nodeID] = true
|
||||
}
|
||||
}
|
||||
@@ -183,7 +222,7 @@ func (c *NodeAwareCache) pickEvictTarget(ageThreshold int64) int {
|
||||
if ageThreshold >= 0 && e.insertedAt > ageThreshold {
|
||||
continue
|
||||
}
|
||||
if pri := packetPriority(e.pkt); minPri < 0 || pri < minPri {
|
||||
if pri := c.packetPriority(e.pkt); minPri < 0 || pri < minPri {
|
||||
minPri = pri
|
||||
}
|
||||
}
|
||||
@@ -199,7 +238,7 @@ func (c *NodeAwareCache) pickEvictTarget(ageThreshold int64) int {
|
||||
if ageThreshold >= 0 && e.insertedAt > ageThreshold {
|
||||
continue
|
||||
}
|
||||
if packetPriority(e.pkt) != minPri {
|
||||
if c.packetPriority(e.pkt) != minPri {
|
||||
continue
|
||||
}
|
||||
nodeID := e.pkt.GetData().GetFrom()
|
||||
@@ -220,7 +259,7 @@ func (c *NodeAwareCache) pruneStale(nowUnix int64) {
|
||||
|
||||
stale := make(map[uint32]bool)
|
||||
for nodeID, lastSeen := range c.nodeLastSeen {
|
||||
if lastSeen < cutoff {
|
||||
if !c.routerNodes[nodeID] && lastSeen < cutoff {
|
||||
stale[nodeID] = true
|
||||
delete(c.nodeLastSeen, nodeID)
|
||||
}
|
||||
|
||||
@@ -28,6 +28,22 @@ func pkt(id, from uint32, port pb.PortNum) *meshtreampb.Packet {
|
||||
}
|
||||
}
|
||||
|
||||
// routerNodeInfoPkt builds a NODEINFO_APP packet whose User.Role is ROUTER,
|
||||
// causing the cache to mark the sender as a router node.
|
||||
func routerNodeInfoPkt(id, from uint32) *meshtreampb.Packet {
|
||||
return &meshtreampb.Packet{
|
||||
Data: &meshtreampb.Data{
|
||||
Id: id,
|
||||
From: from,
|
||||
PortNum: pb.PortNum_NODEINFO_APP,
|
||||
Payload: &meshtreampb.Data_NodeInfo{
|
||||
NodeInfo: &pb.User{Role: pb.Config_DeviceConfig_ROUTER},
|
||||
},
|
||||
},
|
||||
Info: &meshtreampb.TopicInfo{},
|
||||
}
|
||||
}
|
||||
|
||||
// ── NodeAwareCache unit tests ──────────────────────────────────────────────────
|
||||
|
||||
func TestNodeAwareCacheEmpty(t *testing.T) {
|
||||
@@ -293,6 +309,94 @@ func TestGlobalCapSamePriorityFIFO(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRouterNodeSurvivesRetention verifies that router nodes are exempt from
|
||||
// the retention window: GetAll still returns their packets after a long silence.
|
||||
func TestRouterNodeSurvivesRetention(t *testing.T) {
|
||||
now := time.Now()
|
||||
c := NewNodeAwareCache(1000, 3*time.Hour)
|
||||
c.nowFunc = func() time.Time { return now }
|
||||
|
||||
c.Add(routerNodeInfoPkt(1, 1)) // router node
|
||||
c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // regular node
|
||||
|
||||
// Advance past both nodes' retention window.
|
||||
c.nowFunc = func() time.Time { return now.Add(3*time.Hour + 1*time.Second) }
|
||||
|
||||
got := c.GetAll()
|
||||
var routerSurvived, regularSurvived bool
|
||||
for _, p := range got {
|
||||
switch p.Data.Id {
|
||||
case 1:
|
||||
routerSurvived = true
|
||||
case 2:
|
||||
regularSurvived = true
|
||||
}
|
||||
}
|
||||
if !routerSurvived {
|
||||
t.Error("router node's packet should survive past the retention window")
|
||||
}
|
||||
if regularSurvived {
|
||||
t.Error("regular node's packet should be excluded after the retention window")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRouterNodeNotPrunedUnderPressure verifies that pruneStale skips router
|
||||
// nodes even when the cache is over capacity and regular stale nodes are removed.
|
||||
func TestRouterNodeNotPrunedUnderPressure(t *testing.T) {
|
||||
now := time.Now()
|
||||
c := NewNodeAwareCache(4, 3*time.Hour)
|
||||
c.nowFunc = func() time.Time { return now }
|
||||
|
||||
c.Add(routerNodeInfoPkt(1, 1)) // router node
|
||||
c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // regular node
|
||||
|
||||
// Advance past both nodes' retention window.
|
||||
c.nowFunc = func() time.Time { return now.Add(3*time.Hour + 1*time.Second) }
|
||||
|
||||
// Two new packets push the cache over cap, triggering pruneStale.
|
||||
c.Add(pkt(3, 3, pb.PortNum_NODEINFO_APP))
|
||||
c.Add(pkt(4, 4, pb.PortNum_NODEINFO_APP))
|
||||
c.Add(pkt(5, 5, pb.PortNum_NODEINFO_APP))
|
||||
|
||||
got := c.GetAll()
|
||||
routerSurvived := false
|
||||
for _, p := range got {
|
||||
if p.Data.Id == 1 {
|
||||
routerSurvived = true
|
||||
}
|
||||
}
|
||||
if !routerSurvived {
|
||||
t.Error("router node's packet should not be pruned under cache pressure")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRouterNodeInfoElevatedPriority verifies that NODEINFO_APP and POSITION_APP
|
||||
// packets from router nodes are protected over the same types from regular nodes.
|
||||
func TestRouterNodeInfoElevatedPriority(t *testing.T) {
|
||||
c := NewNodeAwareCache(5, time.Hour)
|
||||
|
||||
c.Add(routerNodeInfoPkt(1, 1)) // router NODEINFO — elevated to priority 4
|
||||
c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // regular NODEINFO — priority 2
|
||||
c.Add(pkt(3, 3, pb.PortNum_NODEINFO_APP)) // regular NODEINFO — priority 2
|
||||
c.Add(pkt(4, 4, pb.PortNum_NODEINFO_APP)) // regular NODEINFO — priority 2
|
||||
c.Add(pkt(5, 5, pb.PortNum_NODEINFO_APP)) // regular NODEINFO — priority 2, at cap
|
||||
c.Add(pkt(6, 6, pb.PortNum_NODEINFO_APP)) // pressure: must evict a regular NODEINFO
|
||||
|
||||
got := c.GetAll()
|
||||
if len(got) != 5 {
|
||||
t.Fatalf("expected cap=5, got %d: %v", len(got), ids(got))
|
||||
}
|
||||
routerSurvived := false
|
||||
for _, p := range got {
|
||||
if p.Data.Id == 1 {
|
||||
routerSurvived = true
|
||||
}
|
||||
}
|
||||
if !routerSurvived {
|
||||
t.Error("router node's NODEINFO should survive eviction due to elevated priority")
|
||||
}
|
||||
}
|
||||
|
||||
// ids extracts packet IDs for readable failure messages.
|
||||
func ids(packets []*meshtreampb.Packet) []uint32 {
|
||||
out := make([]uint32, len(packets))
|
||||
|
||||
Reference in New Issue
Block a user