From e19ec79b4910ad5a6a1a4a2800fc5e120894ffe2 Mon Sep 17 00:00:00 2001 From: Lloyd Date: Fri, 6 Mar 2026 16:47:38 +0000 Subject: [PATCH] Implement advertisement packet deduplication with configurable TTL and max hashes --- repeater/handler_helpers/advert.py | 81 +++++++++++++++++++++++++++--- 1 file changed, 73 insertions(+), 8 deletions(-) diff --git a/repeater/handler_helpers/advert.py b/repeater/handler_helpers/advert.py index 4c825bb..c31cbe2 100644 --- a/repeater/handler_helpers/advert.py +++ b/repeater/handler_helpers/advert.py @@ -8,6 +8,7 @@ Includes adaptive rate limiting based on mesh activity. import asyncio import logging import time +from collections import OrderedDict from enum import Enum from typing import Dict, Optional, Tuple @@ -89,10 +90,16 @@ class AdvertHelper: float(penalty_cfg.get("max_penalty_seconds", 86400.0)), ) + # --- Advert dedupe config --- + dedupe_cfg = repeater_cfg.get("advert_dedupe", {}) + self._advert_dedupe_ttl_seconds = max(1.0, float(dedupe_cfg.get("ttl_seconds", 120.0))) + self._advert_dedupe_max_hashes = max(100, int(dedupe_cfg.get("max_hashes", 10000))) + # --- Per-pubkey state --- self._bucket_state: Dict[str, dict] = {} self._penalty_until: Dict[str, float] = {} self._violation_state: Dict[str, dict] = {} + self._recent_advert_hashes: OrderedDict[str, float] = OrderedDict() # --- Adaptive metrics state --- self._adverts_ewma = 0.0 # EWMA of adverts per minute @@ -113,6 +120,7 @@ class AdvertHelper: # Stats counters self._stats_adverts_allowed = 0 self._stats_adverts_dropped = 0 + self._stats_advert_duplicates = 0 self._stats_tier_changes = 0 # Recent drops tracking (keep last 20) @@ -129,7 +137,8 @@ class AdvertHelper: f"Advert limiter: adaptive={self._adaptive_enabled}, " f"rate_limit={self._rate_limit_enabled}, " f"bucket={self._base_bucket_capacity:.1f}, " - f"penalty={self._penalty_enabled}" + f"penalty={self._penalty_enabled}, " + f"dedupe=True" ) # ------------------------------------------------------------------------- @@ -138,19 +147,27 @@ class AdvertHelper: def _cleanup_old_state(self, now: float) -> None: """Clean up old/expired entries to prevent unbounded memory growth.""" - # 1. Remove expired penalties + while self._recent_advert_hashes: + oldest_hash, expires_at = next(iter(self._recent_advert_hashes.items())) + if expires_at > now: + break + self._recent_advert_hashes.pop(oldest_hash, None) + + while len(self._recent_advert_hashes) > self._advert_dedupe_max_hashes: + self._recent_advert_hashes.popitem(last=False) + + expired_penalties = [pk for pk, until in self._penalty_until.items() if until < now] for pk in expired_penalties: del self._penalty_until[pk] - # 2. Remove old bucket states for inactive pubkeys + inactive_pubkeys = [ pk for pk, state in self._bucket_state.items() if now - state.get("last_seen", 0) > self._bucket_state_retention_seconds ] for pk in inactive_pubkeys: del self._bucket_state[pk] - # Also clean up related violation state if pk in self._violation_state: del self._violation_state[pk] @@ -161,7 +178,6 @@ class AdvertHelper: # Reset violation count after decay period vstate["count"] = 0 - # 4. Hard limit: if we're tracking too many pubkeys, remove oldest inactive ones if len(self._bucket_state) > self._max_tracked_pubkeys: # Sort by last_seen and remove oldest 10% sorted_pubkeys = sorted( @@ -187,9 +203,33 @@ class AdvertHelper: f"{len(inactive_pubkeys)} inactive pubkeys. " f"Tracking: {len(self._bucket_state)} buckets, " f"{len(self._penalty_until)} penalties, " - f"{len(self._known_neighbors)} neighbors" + f"{len(self._known_neighbors)} neighbors, " + f"{len(self._recent_advert_hashes)} advert hashes" ) + def _dedupe_advert_packet_hash(self, packet, now: float) -> bool: + """Return True when advert packet hash was already seen recently.""" + try: + pkt_hash = packet.calculate_packet_hash().hex().upper() + except Exception: + return False + + expires_at = self._recent_advert_hashes.get(pkt_hash) + if expires_at and expires_at > now: + # Move to end so hot hashes remain least likely to be evicted + self._recent_advert_hashes.move_to_end(pkt_hash) + return True + + # Track first-seen (or expired hash re-seen) + self._recent_advert_hashes[pkt_hash] = now + self._advert_dedupe_ttl_seconds + self._recent_advert_hashes.move_to_end(pkt_hash) + + # Opportunistic cleanup to keep memory bounded between scheduled cleanup runs + while len(self._recent_advert_hashes) > self._advert_dedupe_max_hashes: + self._recent_advert_hashes.popitem(last=False) + + return False + # ------------------------------------------------------------------------- # Adaptive tier calculation # ------------------------------------------------------------------------- @@ -451,11 +491,18 @@ class AdvertHelper: "stats": { "adverts_allowed": self._stats_adverts_allowed, "adverts_dropped": self._stats_adverts_dropped, + "adverts_duplicate_reheard": self._stats_advert_duplicates, "drop_rate": round( self._stats_adverts_dropped / max(1, self._stats_adverts_allowed + self._stats_adverts_dropped), 3, ), }, + "dedupe": { + "enabled": True, + "ttl_seconds": self._advert_dedupe_ttl_seconds, + "tracked_hashes": len(self._recent_advert_hashes), + "max_hashes": self._advert_dedupe_max_hashes, + }, "active_penalties": active_penalties, "tracked_pubkeys": len(self._bucket_state), "bucket_states": bucket_summary, @@ -501,8 +548,20 @@ class AdvertHelper: node_name = advert_data["name"] contact_type = advert_data["contact_type"] - # Per-pubkey rate limiting (token bucket + penalty box) now = time.time() + + # Re-heard duplicates should be measured but not consume limiter tokens. + if self._dedupe_advert_packet_hash(packet, now): + self._stats_advert_duplicates += 1 + self._update_metrics_window(now, is_advert=False, is_duplicate=True) + logger.debug( + "Duplicate advert re-heard from '%s' (%s...), skipping limiter/storage", + node_name, + pubkey[:16], + ) + return + + # Per-pubkey rate limiting (token bucket + penalty box) allowed, reason = self._allow_advert(pubkey, now) if not allowed: logger.warning(f"Dropping advert from '{node_name}' ({pubkey[:16]}...): {reason}") @@ -630,9 +689,15 @@ class AdvertHelper: float(penalty_cfg.get("max_penalty_seconds", 86400.0)), ) + # Advert dedupe config + dedupe_cfg = repeater_cfg.get("advert_dedupe", {}) + self._advert_dedupe_ttl_seconds = max(1.0, float(dedupe_cfg.get("ttl_seconds", 120.0))) + self._advert_dedupe_max_hashes = max(100, int(dedupe_cfg.get("max_hashes", 10000))) + logger.info( f"Advert limiter config reloaded: adaptive={self._adaptive_enabled}, " - f"rate_limit={self._rate_limit_enabled}, bucket={self._base_bucket_capacity:.1f}" + f"rate_limit={self._rate_limit_enabled}, bucket={self._base_bucket_capacity:.1f}, " + f"dedupe=True" ) except Exception as e: logger.error(f"Error reloading advert limiter config: {e}")