Implement advertisement packet deduplication with configurable TTL and max hashes

This commit is contained in:
Lloyd
2026-03-06 16:47:38 +00:00
parent 5c3b2eff2a
commit e19ec79b49

View File

@@ -8,6 +8,7 @@ Includes adaptive rate limiting based on mesh activity.
import asyncio
import logging
import time
from collections import OrderedDict
from enum import Enum
from typing import Dict, Optional, Tuple
@@ -89,10 +90,16 @@ class AdvertHelper:
float(penalty_cfg.get("max_penalty_seconds", 86400.0)),
)
# --- Advert dedupe config ---
dedupe_cfg = repeater_cfg.get("advert_dedupe", {})
self._advert_dedupe_ttl_seconds = max(1.0, float(dedupe_cfg.get("ttl_seconds", 120.0)))
self._advert_dedupe_max_hashes = max(100, int(dedupe_cfg.get("max_hashes", 10000)))
# --- Per-pubkey state ---
self._bucket_state: Dict[str, dict] = {}
self._penalty_until: Dict[str, float] = {}
self._violation_state: Dict[str, dict] = {}
self._recent_advert_hashes: OrderedDict[str, float] = OrderedDict()
# --- Adaptive metrics state ---
self._adverts_ewma = 0.0 # EWMA of adverts per minute
@@ -113,6 +120,7 @@ class AdvertHelper:
# Stats counters
self._stats_adverts_allowed = 0
self._stats_adverts_dropped = 0
self._stats_advert_duplicates = 0
self._stats_tier_changes = 0
# Recent drops tracking (keep last 20)
@@ -129,7 +137,8 @@ class AdvertHelper:
f"Advert limiter: adaptive={self._adaptive_enabled}, "
f"rate_limit={self._rate_limit_enabled}, "
f"bucket={self._base_bucket_capacity:.1f}, "
f"penalty={self._penalty_enabled}"
f"penalty={self._penalty_enabled}, "
f"dedupe=True"
)
# -------------------------------------------------------------------------
@@ -138,19 +147,27 @@ class AdvertHelper:
def _cleanup_old_state(self, now: float) -> None:
"""Clean up old/expired entries to prevent unbounded memory growth."""
# 1. Remove expired penalties
while self._recent_advert_hashes:
oldest_hash, expires_at = next(iter(self._recent_advert_hashes.items()))
if expires_at > now:
break
self._recent_advert_hashes.pop(oldest_hash, None)
while len(self._recent_advert_hashes) > self._advert_dedupe_max_hashes:
self._recent_advert_hashes.popitem(last=False)
expired_penalties = [pk for pk, until in self._penalty_until.items() if until < now]
for pk in expired_penalties:
del self._penalty_until[pk]
# 2. Remove old bucket states for inactive pubkeys
inactive_pubkeys = [
pk for pk, state in self._bucket_state.items()
if now - state.get("last_seen", 0) > self._bucket_state_retention_seconds
]
for pk in inactive_pubkeys:
del self._bucket_state[pk]
# Also clean up related violation state
if pk in self._violation_state:
del self._violation_state[pk]
@@ -161,7 +178,6 @@ class AdvertHelper:
# Reset violation count after decay period
vstate["count"] = 0
# 4. Hard limit: if we're tracking too many pubkeys, remove oldest inactive ones
if len(self._bucket_state) > self._max_tracked_pubkeys:
# Sort by last_seen and remove oldest 10%
sorted_pubkeys = sorted(
@@ -187,9 +203,33 @@ class AdvertHelper:
f"{len(inactive_pubkeys)} inactive pubkeys. "
f"Tracking: {len(self._bucket_state)} buckets, "
f"{len(self._penalty_until)} penalties, "
f"{len(self._known_neighbors)} neighbors"
f"{len(self._known_neighbors)} neighbors, "
f"{len(self._recent_advert_hashes)} advert hashes"
)
def _dedupe_advert_packet_hash(self, packet, now: float) -> bool:
"""Return True when advert packet hash was already seen recently."""
try:
pkt_hash = packet.calculate_packet_hash().hex().upper()
except Exception:
return False
expires_at = self._recent_advert_hashes.get(pkt_hash)
if expires_at and expires_at > now:
# Move to end so hot hashes remain least likely to be evicted
self._recent_advert_hashes.move_to_end(pkt_hash)
return True
# Track first-seen (or expired hash re-seen)
self._recent_advert_hashes[pkt_hash] = now + self._advert_dedupe_ttl_seconds
self._recent_advert_hashes.move_to_end(pkt_hash)
# Opportunistic cleanup to keep memory bounded between scheduled cleanup runs
while len(self._recent_advert_hashes) > self._advert_dedupe_max_hashes:
self._recent_advert_hashes.popitem(last=False)
return False
# -------------------------------------------------------------------------
# Adaptive tier calculation
# -------------------------------------------------------------------------
@@ -451,11 +491,18 @@ class AdvertHelper:
"stats": {
"adverts_allowed": self._stats_adverts_allowed,
"adverts_dropped": self._stats_adverts_dropped,
"adverts_duplicate_reheard": self._stats_advert_duplicates,
"drop_rate": round(
self._stats_adverts_dropped / max(1, self._stats_adverts_allowed + self._stats_adverts_dropped),
3,
),
},
"dedupe": {
"enabled": True,
"ttl_seconds": self._advert_dedupe_ttl_seconds,
"tracked_hashes": len(self._recent_advert_hashes),
"max_hashes": self._advert_dedupe_max_hashes,
},
"active_penalties": active_penalties,
"tracked_pubkeys": len(self._bucket_state),
"bucket_states": bucket_summary,
@@ -501,8 +548,20 @@ class AdvertHelper:
node_name = advert_data["name"]
contact_type = advert_data["contact_type"]
# Per-pubkey rate limiting (token bucket + penalty box)
now = time.time()
# Re-heard duplicates should be measured but not consume limiter tokens.
if self._dedupe_advert_packet_hash(packet, now):
self._stats_advert_duplicates += 1
self._update_metrics_window(now, is_advert=False, is_duplicate=True)
logger.debug(
"Duplicate advert re-heard from '%s' (%s...), skipping limiter/storage",
node_name,
pubkey[:16],
)
return
# Per-pubkey rate limiting (token bucket + penalty box)
allowed, reason = self._allow_advert(pubkey, now)
if not allowed:
logger.warning(f"Dropping advert from '{node_name}' ({pubkey[:16]}...): {reason}")
@@ -630,9 +689,15 @@ class AdvertHelper:
float(penalty_cfg.get("max_penalty_seconds", 86400.0)),
)
# Advert dedupe config
dedupe_cfg = repeater_cfg.get("advert_dedupe", {})
self._advert_dedupe_ttl_seconds = max(1.0, float(dedupe_cfg.get("ttl_seconds", 120.0)))
self._advert_dedupe_max_hashes = max(100, int(dedupe_cfg.get("max_hashes", 10000)))
logger.info(
f"Advert limiter config reloaded: adaptive={self._adaptive_enabled}, "
f"rate_limit={self._rate_limit_enabled}, bucket={self._base_bucket_capacity:.1f}"
f"rate_limit={self._rate_limit_enabled}, bucket={self._base_bucket_capacity:.1f}, "
f"dedupe=True"
)
except Exception as e:
logger.error(f"Error reloading advert limiter config: {e}")