mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-03-28 17:43:06 +01:00
- Introduced `record_packet_only` method in `RepeaterHandler` to log packets for UI/storage without forwarding or duplicate checks, specifically for injection-only types like ANON_REQ and ACK. - Updated `PacketRouter` to call `_record_for_ui` method, ensuring that relevant metadata is recorded for packets processed by various handlers. - Enhanced handling of packet metadata, including RSSI and SNR values, to improve the visibility of packet information in the web UI.
1140 lines
46 KiB
Python
1140 lines
46 KiB
Python
import asyncio
|
|
import copy
|
|
import logging
|
|
import struct
|
|
import time
|
|
from collections import OrderedDict
|
|
from typing import Optional, Tuple
|
|
|
|
from pymc_core.node.handlers.base import BaseHandler
|
|
from pymc_core.protocol import Packet
|
|
from pymc_core.protocol.constants import (
|
|
MAX_PATH_SIZE,
|
|
PAYLOAD_TYPE_ADVERT,
|
|
PAYLOAD_TYPE_ANON_REQ,
|
|
PH_ROUTE_MASK,
|
|
PH_TYPE_MASK,
|
|
PH_TYPE_SHIFT,
|
|
ROUTE_TYPE_DIRECT,
|
|
ROUTE_TYPE_FLOOD,
|
|
ROUTE_TYPE_TRANSPORT_DIRECT,
|
|
ROUTE_TYPE_TRANSPORT_FLOOD,
|
|
)
|
|
from pymc_core.protocol.packet_utils import PacketHeaderUtils, PacketTimingUtils, PathUtils
|
|
|
|
from repeater.airtime import AirtimeManager
|
|
from repeater.data_acquisition import StorageCollector
|
|
|
|
logger = logging.getLogger("RepeaterHandler")
|
|
|
|
NOISE_FLOOR_INTERVAL = 30.0 # seconds
|
|
|
|
LOOP_DETECT_OFF = "off"
|
|
LOOP_DETECT_MINIMAL = "minimal"
|
|
LOOP_DETECT_MODERATE = "moderate"
|
|
LOOP_DETECT_STRICT = "strict"
|
|
|
|
# Thresholds for 1-byte path hashes loop detection.
|
|
# Count how many times our own hash already exists in the incoming FLOOD path.
|
|
# If occurrences >= threshold, treat as loop and drop.
|
|
LOOP_DETECT_MAX_COUNTERS = {
|
|
LOOP_DETECT_MINIMAL: 4,
|
|
LOOP_DETECT_MODERATE: 2,
|
|
LOOP_DETECT_STRICT: 1,
|
|
}
|
|
|
|
|
|
class RepeaterHandler(BaseHandler):
|
|
|
|
@staticmethod
|
|
def payload_type() -> int:
|
|
|
|
return 0xFF # Special marker (not a real payload type)
|
|
|
|
def __init__(self, config: dict, dispatcher, local_hash: int, *, local_hash_bytes=None, send_advert_func=None):
|
|
|
|
self.config = config
|
|
self.dispatcher = dispatcher
|
|
self.local_hash = local_hash
|
|
self.local_hash_bytes = local_hash_bytes or bytes([local_hash])
|
|
self.send_advert_func = send_advert_func
|
|
self.airtime_mgr = AirtimeManager(config)
|
|
self.seen_packets = OrderedDict()
|
|
self.cache_ttl = max(
|
|
300, config.get("repeater", {}).get("cache_ttl", 3600)
|
|
) # Min 5 min, default 1 hour
|
|
self.max_cache_size = 1000
|
|
self.tx_delay_factor = config.get("delays", {}).get("tx_delay_factor", 1.0)
|
|
self.direct_tx_delay_factor = config.get("delays", {}).get("direct_tx_delay_factor", 0.5)
|
|
self.use_score_for_tx = config.get("repeater", {}).get("use_score_for_tx", False)
|
|
self.score_threshold = config.get("repeater", {}).get("score_threshold", 0.3)
|
|
self.send_advert_interval_hours = config.get("repeater", {}).get(
|
|
"send_advert_interval_hours", 10
|
|
)
|
|
self.last_advert_time = time.time()
|
|
self.loop_detect_mode = self._normalize_loop_detect_mode(
|
|
config.get("mesh", {}).get("loop_detect", LOOP_DETECT_OFF)
|
|
)
|
|
|
|
radio = dispatcher.radio if dispatcher else None
|
|
if radio:
|
|
self.radio_config = {
|
|
"spreading_factor": getattr(radio, "spreading_factor", 8),
|
|
"bandwidth": getattr(radio, "bandwidth", 125000),
|
|
"coding_rate": getattr(radio, "coding_rate", 8),
|
|
"preamble_length": getattr(radio, "preamble_length", 17),
|
|
"frequency": getattr(radio, "frequency", 915000000),
|
|
"tx_power": getattr(radio, "tx_power", 14),
|
|
}
|
|
logger.info(
|
|
f"radio settings: SF={self.radio_config['spreading_factor']}, "
|
|
f"BW={self.radio_config['bandwidth']}Hz, CR={self.radio_config['coding_rate']}"
|
|
)
|
|
else:
|
|
raise RuntimeError("Radio object not available - cannot initialize repeater")
|
|
|
|
# Statistics tracking for dashboard
|
|
self.rx_count = 0
|
|
self.forwarded_count = 0
|
|
self.dropped_count = 0
|
|
self.recent_packets = []
|
|
self.max_recent_packets = 50
|
|
self.start_time = time.time()
|
|
|
|
# Storage collector for persistent packet logging
|
|
try:
|
|
|
|
local_identity = dispatcher.local_identity if dispatcher else None
|
|
self.storage = StorageCollector(config, local_identity, repeater_handler=self)
|
|
logger.info("StorageCollector initialized successfully")
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize StorageCollector: {e}")
|
|
self.storage = None
|
|
|
|
# Initialize background timer tracking
|
|
self.last_noise_measurement = time.time()
|
|
self.noise_floor_interval = NOISE_FLOOR_INTERVAL # 30 seconds
|
|
self._background_task = None
|
|
self._last_crc_error_count = 0 # Track radio counter for delta persistence
|
|
|
|
# Cache transport keys for efficient lookup
|
|
self._transport_keys_cache = None
|
|
self._transport_keys_cache_time = 0
|
|
self._transport_keys_cache_ttl = 60 # Cache for 60 seconds
|
|
|
|
self._start_background_tasks()
|
|
|
|
async def __call__(
|
|
self, packet: Packet, metadata: Optional[dict] = None, local_transmission: bool = False
|
|
) -> None:
|
|
|
|
if metadata is None:
|
|
metadata = {}
|
|
|
|
self.rx_count += 1
|
|
|
|
# Check if we're in monitor mode (receive only, no forwarding)
|
|
mode = self.config.get("repeater", {}).get("mode", "forward")
|
|
monitor_mode = mode == "monitor"
|
|
|
|
logger.debug(
|
|
f"RX packet: header=0x{packet.header:02x}, payload_len={len(packet.payload or b'')}, "
|
|
f"path_len={len(packet.path) if packet.path else 0}, "
|
|
f"rssi={metadata.get('rssi', 'N/A')}, snr={metadata.get('snr', 'N/A')}, mode={mode}"
|
|
)
|
|
|
|
# clone the packet to avoid modifying the original
|
|
processed_packet = copy.deepcopy(packet)
|
|
|
|
snr = metadata.get("snr", 0.0)
|
|
rssi = metadata.get("rssi", 0)
|
|
transmitted = False
|
|
tx_delay_ms = 0.0
|
|
drop_reason = None
|
|
|
|
original_path_hashes = packet.get_path_hashes_hex()
|
|
path_hash_size = packet.get_path_hash_size()
|
|
|
|
# Process for forwarding (skip if in monitor mode or if this is a local transmission)
|
|
result = (
|
|
None
|
|
if (monitor_mode or local_transmission)
|
|
else self.process_packet(processed_packet, snr)
|
|
)
|
|
forwarded_path_hashes = None
|
|
|
|
# For local transmissions, create a direct transmission result
|
|
if local_transmission and not monitor_mode:
|
|
# Mark local packet as seen to prevent duplicate processing when received back
|
|
self.mark_seen(packet)
|
|
# Calculate transmission delay for local packets
|
|
delay = self._calculate_tx_delay(packet, snr)
|
|
result = (packet, delay)
|
|
forwarded_path_hashes = packet.get_path_hashes_hex()
|
|
logger.debug(f"Local transmission: calculated delay {delay:.3f}s")
|
|
|
|
if result:
|
|
fwd_pkt, delay = result
|
|
tx_delay_ms = delay * 1000.0
|
|
|
|
# Capture the forwarded path (after modification)
|
|
forwarded_path_hashes = fwd_pkt.get_path_hashes_hex()
|
|
|
|
# Check duty-cycle before scheduling TX
|
|
airtime_ms = self.airtime_mgr.calculate_airtime(fwd_pkt.get_raw_length())
|
|
|
|
can_tx, wait_time = self.airtime_mgr.can_transmit(airtime_ms)
|
|
|
|
# LBT metadata (set after any TX path that awaits send)
|
|
tx_metadata = None
|
|
lbt_attempts = 0
|
|
lbt_backoff_delays_ms = None
|
|
lbt_channel_busy = False
|
|
|
|
if not can_tx:
|
|
if local_transmission:
|
|
# Defer local TX until duty cycle allows instead of dropping
|
|
deferred_delay = delay + wait_time
|
|
logger.info(
|
|
f"Duty-cycle limit: deferring local TX by {wait_time:.1f}s "
|
|
f"(airtime={airtime_ms:.1f}ms)"
|
|
)
|
|
self.forwarded_count += 1
|
|
transmitted = True
|
|
tx_task = await self.schedule_retransmit(
|
|
fwd_pkt, deferred_delay, airtime_ms, local_transmission=True
|
|
)
|
|
try:
|
|
await tx_task
|
|
except Exception as e:
|
|
self.forwarded_count -= 1
|
|
transmitted = False
|
|
drop_reason = "TX failed (deferred)"
|
|
logger.warning(f"Deferred local TX failed: {e}")
|
|
raise
|
|
tx_metadata = getattr(fwd_pkt, "_tx_metadata", None)
|
|
if tx_metadata:
|
|
lbt_attempts = tx_metadata.get("lbt_attempts", 0)
|
|
lbt_backoff_delays_ms = tx_metadata.get(
|
|
"lbt_backoff_delays_ms", []
|
|
)
|
|
lbt_channel_busy = tx_metadata.get("lbt_channel_busy", False)
|
|
if lbt_attempts > 0:
|
|
total_lbt_delay = sum(lbt_backoff_delays_ms)
|
|
logger.info(
|
|
f"LBT: {lbt_attempts} attempts, "
|
|
f"{total_lbt_delay:.0f}ms delay, "
|
|
f"backoffs={lbt_backoff_delays_ms}"
|
|
)
|
|
else:
|
|
logger.warning(
|
|
f"Duty-cycle limit exceeded. Airtime={airtime_ms:.1f}ms, "
|
|
f"wait={wait_time:.1f}s before retry"
|
|
)
|
|
self.dropped_count += 1
|
|
drop_reason = "Duty cycle limit"
|
|
else:
|
|
self.forwarded_count += 1
|
|
transmitted = True
|
|
tx_task = await self.schedule_retransmit(
|
|
fwd_pkt, delay, airtime_ms, local_transmission=local_transmission
|
|
)
|
|
try:
|
|
await tx_task
|
|
except Exception as e:
|
|
self.forwarded_count -= 1
|
|
transmitted = False
|
|
drop_reason = "TX failed"
|
|
logger.warning(f"Local TX failed: {e}")
|
|
raise
|
|
tx_metadata = getattr(fwd_pkt, "_tx_metadata", None)
|
|
if tx_metadata:
|
|
lbt_attempts = tx_metadata.get("lbt_attempts", 0)
|
|
lbt_backoff_delays_ms = tx_metadata.get("lbt_backoff_delays_ms", [])
|
|
lbt_channel_busy = tx_metadata.get("lbt_channel_busy", False)
|
|
|
|
if lbt_attempts > 0:
|
|
total_lbt_delay = sum(lbt_backoff_delays_ms)
|
|
logger.info(
|
|
f"LBT: {lbt_attempts} attempts, {total_lbt_delay:.0f}ms delay, "
|
|
f"backoffs={lbt_backoff_delays_ms}"
|
|
)
|
|
else:
|
|
self.dropped_count += 1
|
|
# Determine drop reason from process_packet result
|
|
if monitor_mode:
|
|
drop_reason = "Monitor mode"
|
|
else:
|
|
# Check if packet has a specific drop reason set by handlers
|
|
drop_reason = processed_packet.drop_reason or self._get_drop_reason(
|
|
processed_packet
|
|
)
|
|
logger.debug(f"Packet not forwarded: {drop_reason}")
|
|
|
|
# Extract packet type and route from header
|
|
if not hasattr(packet, "header") or packet.header is None:
|
|
logger.error(f"Packet missing header attribute! Packet: {packet}")
|
|
payload_type = 0
|
|
route_type = 0
|
|
else:
|
|
header_info = PacketHeaderUtils.parse_header(packet.header)
|
|
payload_type = header_info["payload_type"]
|
|
route_type = header_info["route_type"]
|
|
logger.debug(
|
|
f"Packet header=0x{packet.header:02x}, type={payload_type}, route={route_type}"
|
|
)
|
|
|
|
# Check if this is a duplicate
|
|
pkt_hash = packet.calculate_packet_hash().hex().upper()
|
|
is_dupe = pkt_hash in self.seen_packets and not transmitted
|
|
|
|
# Set drop reason for duplicates
|
|
if is_dupe and drop_reason is None:
|
|
drop_reason = "Duplicate"
|
|
|
|
path_hash = None
|
|
display_hashes = (
|
|
original_path_hashes if original_path_hashes else packet.get_path_hashes_hex()
|
|
)
|
|
if display_hashes:
|
|
display = display_hashes[:8]
|
|
if len(display_hashes) > 8:
|
|
display = list(display) + ["..."]
|
|
path_hash = "[" + ", ".join(display) + "]"
|
|
|
|
src_hash = None
|
|
dst_hash = None
|
|
|
|
# Payload types with dest_hash and src_hash as first 2 bytes
|
|
if payload_type in [0x00, 0x01, 0x02, 0x08]:
|
|
if hasattr(packet, "payload") and packet.payload and len(packet.payload) >= 2:
|
|
dst_hash = f"{packet.payload[0]:02X}"
|
|
src_hash = f"{packet.payload[1]:02X}"
|
|
|
|
# ADVERT packets have source identifier as first byte
|
|
elif payload_type == PAYLOAD_TYPE_ADVERT:
|
|
if hasattr(packet, "payload") and packet.payload and len(packet.payload) >= 1:
|
|
src_hash = f"{packet.payload[0]:02X}"
|
|
|
|
# Record packet for charts
|
|
packet_record = {
|
|
"timestamp": time.time(),
|
|
"header": (
|
|
f"0x{packet.header:02X}"
|
|
if hasattr(packet, "header") and packet.header is not None
|
|
else None
|
|
),
|
|
"payload": (
|
|
packet.payload.hex() if hasattr(packet, "payload") and packet.payload else None
|
|
),
|
|
"payload_length": (
|
|
len(packet.payload) if hasattr(packet, "payload") and packet.payload else 0
|
|
),
|
|
"type": payload_type,
|
|
"route": route_type,
|
|
"length": len(packet.payload or b""),
|
|
"rssi": rssi,
|
|
"snr": snr,
|
|
"score": self.calculate_packet_score(
|
|
snr, len(packet.payload or b""), self.radio_config["spreading_factor"]
|
|
),
|
|
"tx_delay_ms": tx_delay_ms,
|
|
"transmitted": transmitted,
|
|
"is_duplicate": is_dupe,
|
|
"packet_hash": pkt_hash[:16],
|
|
"drop_reason": drop_reason,
|
|
"path_hash": path_hash,
|
|
"src_hash": src_hash,
|
|
"dst_hash": dst_hash,
|
|
"original_path": original_path_hashes or None,
|
|
"forwarded_path": forwarded_path_hashes,
|
|
"path_hash_size": path_hash_size,
|
|
"raw_packet": packet.write_to().hex() if hasattr(packet, "write_to") else None,
|
|
"lbt_attempts": lbt_attempts if transmitted else 0,
|
|
"lbt_backoff_delays_ms": (
|
|
lbt_backoff_delays_ms if transmitted and lbt_backoff_delays_ms else None
|
|
),
|
|
"lbt_channel_busy": lbt_channel_busy if transmitted else False,
|
|
}
|
|
|
|
# Store packet record to persistent storage
|
|
# Skip LetsMesh only for invalid packets (not duplicates or operational drops)
|
|
if self.storage:
|
|
try:
|
|
# Only skip LetsMesh for actual invalid/bad packets
|
|
invalid_reasons = ["Invalid advert packet", "Empty payload", "Path too long"]
|
|
skip_letsmesh = drop_reason in invalid_reasons if drop_reason else False
|
|
self.storage.record_packet(packet_record, skip_letsmesh_if_invalid=skip_letsmesh)
|
|
except Exception as e:
|
|
logger.error(f"Failed to store packet record: {e}")
|
|
|
|
# If this is a duplicate, try to attach it to the original packet
|
|
if is_dupe and len(self.recent_packets) > 0:
|
|
# Find the original packet with same hash
|
|
for idx in range(len(self.recent_packets) - 1, -1, -1):
|
|
prev_pkt = self.recent_packets[idx]
|
|
if prev_pkt.get("packet_hash") == packet_record["packet_hash"]:
|
|
# Add duplicate to original packet's duplicate list
|
|
if "duplicates" not in prev_pkt:
|
|
prev_pkt["duplicates"] = []
|
|
prev_pkt["duplicates"].append(packet_record)
|
|
# Don't add duplicate to main list, just track in original
|
|
break
|
|
else:
|
|
# Original not found, add as regular packet
|
|
self.recent_packets.append(packet_record)
|
|
else:
|
|
# Not a duplicate or first occurrence
|
|
self.recent_packets.append(packet_record)
|
|
|
|
if len(self.recent_packets) > self.max_recent_packets:
|
|
self.recent_packets.pop(0)
|
|
|
|
def log_trace_record(self, packet_record: dict) -> None:
|
|
"""Manually log a packet trace record (used by external callers)"""
|
|
self.recent_packets.append(packet_record)
|
|
|
|
self.rx_count += 1
|
|
if packet_record.get("transmitted", False):
|
|
self.forwarded_count += 1
|
|
else:
|
|
self.dropped_count += 1
|
|
|
|
# Store to persistent storage (same as __call__ does)
|
|
if self.storage:
|
|
try:
|
|
self.storage.record_packet(packet_record)
|
|
except Exception as e:
|
|
logger.error(f"Failed to store packet record: {e}")
|
|
|
|
if len(self.recent_packets) > self.max_recent_packets:
|
|
self.recent_packets.pop(0)
|
|
|
|
def record_packet_only(self, packet: Packet, metadata: dict) -> None:
|
|
"""Record a packet for UI/storage without running forwarding or duplicate logic.
|
|
|
|
Used by the packet router for injection-only types (ANON_REQ, ACK, PATH, etc.)
|
|
so they still appear in the web UI.
|
|
"""
|
|
if not self.storage:
|
|
return
|
|
rssi = metadata.get("rssi", 0)
|
|
snr = metadata.get("snr", 0.0)
|
|
if not hasattr(packet, "header") or packet.header is None:
|
|
logger.debug("record_packet_only: packet missing header, skipping")
|
|
return
|
|
header_info = PacketHeaderUtils.parse_header(packet.header)
|
|
payload_type = header_info["payload_type"]
|
|
route_type = header_info["route_type"]
|
|
pkt_hash = packet.calculate_packet_hash().hex().upper()
|
|
original_path_hashes = packet.get_path_hashes_hex()
|
|
path_hash_size = packet.get_path_hash_size()
|
|
display_hashes = original_path_hashes
|
|
path_hash = None
|
|
if display_hashes:
|
|
display = display_hashes[:8]
|
|
if len(display_hashes) > 8:
|
|
display = list(display) + ["..."]
|
|
path_hash = "[" + ", ".join(display) + "]"
|
|
src_hash = None
|
|
dst_hash = None
|
|
if payload_type in [0x00, 0x01, 0x02, 0x08]:
|
|
if hasattr(packet, "payload") and packet.payload and len(packet.payload) >= 2:
|
|
dst_hash = f"{packet.payload[0]:02X}"
|
|
src_hash = f"{packet.payload[1]:02X}"
|
|
elif payload_type == PAYLOAD_TYPE_ADVERT:
|
|
if hasattr(packet, "payload") and packet.payload and len(packet.payload) >= 1:
|
|
src_hash = f"{packet.payload[0]:02X}"
|
|
elif payload_type == PAYLOAD_TYPE_ANON_REQ:
|
|
if hasattr(packet, "payload") and packet.payload and len(packet.payload) >= 1:
|
|
dst_hash = f"{packet.payload[0]:02X}"
|
|
packet_record = {
|
|
"timestamp": time.time(),
|
|
"header": f"0x{packet.header:02X}",
|
|
"payload": (
|
|
packet.payload.hex() if hasattr(packet, "payload") and packet.payload else None
|
|
),
|
|
"payload_length": (
|
|
len(packet.payload) if hasattr(packet, "payload") and packet.payload else 0
|
|
),
|
|
"type": payload_type,
|
|
"route": route_type,
|
|
"length": len(packet.payload or b""),
|
|
"rssi": rssi,
|
|
"snr": snr,
|
|
"score": self.calculate_packet_score(
|
|
snr, len(packet.payload or b""), self.radio_config["spreading_factor"]
|
|
),
|
|
"tx_delay_ms": 0.0,
|
|
"transmitted": False,
|
|
"is_duplicate": False,
|
|
"packet_hash": pkt_hash[:16],
|
|
"drop_reason": None,
|
|
"path_hash": path_hash,
|
|
"src_hash": src_hash,
|
|
"dst_hash": dst_hash,
|
|
"original_path": original_path_hashes or None,
|
|
"forwarded_path": None,
|
|
"path_hash_size": path_hash_size,
|
|
"raw_packet": packet.write_to().hex() if hasattr(packet, "write_to") else None,
|
|
"lbt_attempts": 0,
|
|
"lbt_backoff_delays_ms": None,
|
|
"lbt_channel_busy": False,
|
|
}
|
|
try:
|
|
self.storage.record_packet(packet_record, skip_letsmesh_if_invalid=False)
|
|
except Exception as e:
|
|
logger.error(f"Failed to store packet record (record_packet_only): {e}")
|
|
return
|
|
self.recent_packets.append(packet_record)
|
|
if len(self.recent_packets) > self.max_recent_packets:
|
|
self.recent_packets.pop(0)
|
|
self.rx_count += 1
|
|
|
|
def cleanup_cache(self):
|
|
|
|
now = time.time()
|
|
expired = [k for k, ts in self.seen_packets.items() if now - ts > self.cache_ttl]
|
|
for k in expired:
|
|
del self.seen_packets[k]
|
|
|
|
def _get_drop_reason(self, packet: Packet) -> str:
|
|
|
|
if self.is_duplicate(packet):
|
|
return "Duplicate"
|
|
|
|
if not packet or not packet.payload:
|
|
return "Empty payload"
|
|
|
|
if len(packet.path or []) >= MAX_PATH_SIZE:
|
|
return "Path too long"
|
|
|
|
route_type = packet.header & PH_ROUTE_MASK
|
|
|
|
if route_type == ROUTE_TYPE_FLOOD:
|
|
# Check if global flood policy blocked it
|
|
global_flood_allow = self.config.get("mesh", {}).get("global_flood_allow", True)
|
|
if not global_flood_allow:
|
|
return "Global flood policy disabled"
|
|
|
|
if route_type == ROUTE_TYPE_DIRECT:
|
|
hash_size = packet.get_path_hash_size()
|
|
if not packet.path or len(packet.path) < hash_size:
|
|
return "Direct: no path"
|
|
next_hop = bytes(packet.path[:hash_size])
|
|
if next_hop != self.local_hash_bytes[:hash_size]:
|
|
return "Direct: not for us"
|
|
|
|
# Default reason
|
|
return "Unknown"
|
|
|
|
def is_duplicate(self, packet: Packet) -> bool:
|
|
|
|
pkt_hash = packet.calculate_packet_hash().hex().upper()
|
|
if pkt_hash in self.seen_packets:
|
|
return True
|
|
return False
|
|
|
|
def mark_seen(self, packet: Packet):
|
|
|
|
pkt_hash = packet.calculate_packet_hash().hex().upper()
|
|
self.seen_packets[pkt_hash] = time.time()
|
|
|
|
if len(self.seen_packets) > self.max_cache_size:
|
|
self.seen_packets.popitem(last=False)
|
|
|
|
def validate_packet(self, packet: Packet) -> Tuple[bool, str]:
|
|
|
|
if not packet or not packet.payload:
|
|
return False, "Empty payload"
|
|
|
|
if len(packet.path or []) >= MAX_PATH_SIZE:
|
|
return (
|
|
False,
|
|
f"Path length {len(packet.path or [])} exceeds MAX_PATH_SIZE ({MAX_PATH_SIZE})",
|
|
)
|
|
|
|
return True, ""
|
|
|
|
def _normalize_loop_detect_mode(self, mode) -> str:
|
|
if isinstance(mode, str):
|
|
normalized = mode.strip().lower()
|
|
if normalized in {
|
|
LOOP_DETECT_OFF,
|
|
LOOP_DETECT_MINIMAL,
|
|
LOOP_DETECT_MODERATE,
|
|
LOOP_DETECT_STRICT,
|
|
}:
|
|
return normalized
|
|
return LOOP_DETECT_OFF
|
|
|
|
def _get_loop_detect_mode(self) -> str:
|
|
return self.loop_detect_mode
|
|
|
|
def _is_flood_looped(self, packet: Packet, mode: Optional[str] = None) -> bool:
|
|
mode = mode or self._get_loop_detect_mode()
|
|
if mode == LOOP_DETECT_OFF:
|
|
return False
|
|
|
|
max_counter = LOOP_DETECT_MAX_COUNTERS.get(mode)
|
|
if max_counter is None:
|
|
return False
|
|
|
|
path = packet.path or bytearray()
|
|
local_count = sum(1 for hop in path if hop == self.local_hash)
|
|
return local_count >= max_counter
|
|
|
|
def _check_transport_codes(self, packet: Packet) -> Tuple[bool, str]:
|
|
|
|
if not self.storage:
|
|
logger.warning("Transport code check failed: no storage available")
|
|
return False, "No storage available for transport key validation"
|
|
|
|
try:
|
|
from pymc_core.protocol.transport_keys import calc_transport_code
|
|
|
|
# Check cache validity
|
|
current_time = time.time()
|
|
if (
|
|
self._transport_keys_cache is None
|
|
or current_time - self._transport_keys_cache_time > self._transport_keys_cache_ttl
|
|
):
|
|
# Refresh cache
|
|
self._transport_keys_cache = self.storage.get_transport_keys()
|
|
self._transport_keys_cache_time = current_time
|
|
|
|
transport_keys = self._transport_keys_cache
|
|
|
|
if not transport_keys:
|
|
return False, "No transport keys configured"
|
|
|
|
# Check if packet has transport codes
|
|
if not packet.has_transport_codes():
|
|
return False, "No transport codes present"
|
|
|
|
transport_code_0 = packet.transport_codes[0] # First transport code
|
|
|
|
payload = packet.get_payload()
|
|
payload_type = (
|
|
packet.get_payload_type()
|
|
if hasattr(packet, "get_payload_type")
|
|
else ((packet.header & 0x3C) >> 2)
|
|
)
|
|
|
|
# Check packet against each transport key
|
|
for key_record in transport_keys:
|
|
transport_key_encoded = key_record.get("transport_key")
|
|
key_name = key_record.get("name", "unknown")
|
|
flood_policy = key_record.get("flood_policy", "deny")
|
|
|
|
if not transport_key_encoded:
|
|
continue
|
|
|
|
try:
|
|
import base64
|
|
|
|
transport_key = base64.b64decode(transport_key_encoded)
|
|
expected_code = calc_transport_code(transport_key, packet)
|
|
if transport_code_0 == expected_code:
|
|
logger.debug(
|
|
f"Transport code validated for key '{key_name}' with policy '{flood_policy}'"
|
|
)
|
|
|
|
# Update last_used timestamp for this key
|
|
try:
|
|
key_id = key_record.get("id")
|
|
if key_id:
|
|
self.storage.update_transport_key(
|
|
key_id=key_id, last_used=time.time()
|
|
)
|
|
logger.debug(
|
|
f"Updated last_used timestamp for transport key '{key_name}'"
|
|
)
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Failed to update last_used for transport key '{key_name}': {e}"
|
|
)
|
|
|
|
# Check flood policy for this key
|
|
if flood_policy == "allow":
|
|
return True, ""
|
|
else:
|
|
return False, f"Transport key '{key_name}' flood policy denied"
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error checking transport key '{key_name}': {e}")
|
|
continue
|
|
|
|
# No matching transport code found
|
|
logger.debug(
|
|
f"Transport code 0x{transport_code_0:04X} denied (checked {len(transport_keys)} keys)"
|
|
)
|
|
return False, "No matching transport code"
|
|
|
|
except Exception as e:
|
|
logger.error(f"Transport code validation error: {e}")
|
|
return False, f"Transport code validation error: {e}"
|
|
|
|
def flood_forward(self, packet: Packet) -> Optional[Packet]:
|
|
|
|
# Validate
|
|
valid, reason = self.validate_packet(packet)
|
|
if not valid:
|
|
packet.drop_reason = reason
|
|
return None
|
|
|
|
# Check if packet is marked do-not-retransmit
|
|
if packet.is_marked_do_not_retransmit():
|
|
# Check if packet has custom drop reason
|
|
if not packet.drop_reason:
|
|
packet.drop_reason = "Marked do not retransmit"
|
|
return None
|
|
|
|
# Check global flood policy
|
|
global_flood_allow = self.config.get("mesh", {}).get("global_flood_allow", True)
|
|
if not global_flood_allow:
|
|
route_type = packet.header & PH_ROUTE_MASK
|
|
if route_type == ROUTE_TYPE_FLOOD or route_type == ROUTE_TYPE_TRANSPORT_FLOOD:
|
|
|
|
allowed, check_reason = self._check_transport_codes(packet)
|
|
if not allowed:
|
|
packet.drop_reason = check_reason
|
|
return None
|
|
else:
|
|
packet.drop_reason = "Global flood policy disabled"
|
|
return None
|
|
|
|
mode = self._get_loop_detect_mode()
|
|
if self._is_flood_looped(packet, mode):
|
|
packet.drop_reason = f"FLOOD loop detected ({mode})"
|
|
return None
|
|
|
|
# Suppress duplicates
|
|
if self.is_duplicate(packet):
|
|
packet.drop_reason = "Duplicate"
|
|
return None
|
|
|
|
if packet.path is None:
|
|
packet.path = bytearray()
|
|
elif not isinstance(packet.path, bytearray):
|
|
packet.path = bytearray(packet.path)
|
|
|
|
hash_size = packet.get_path_hash_size()
|
|
hop_count = packet.get_path_hash_count()
|
|
|
|
# path_len encodes hop count in 6 bits (0-63); adding ourselves must not exceed 63
|
|
if hop_count >= 63:
|
|
packet.drop_reason = "Path hop count at maximum (63), cannot append"
|
|
return None
|
|
|
|
# Check path won't exceed MAX_PATH_SIZE after append
|
|
if (hop_count + 1) * hash_size > MAX_PATH_SIZE:
|
|
packet.drop_reason = "Path would exceed MAX_PATH_SIZE"
|
|
return None
|
|
|
|
self.mark_seen(packet)
|
|
|
|
# Append hash_size bytes from our public key prefix
|
|
packet.path.extend(self.local_hash_bytes[:hash_size])
|
|
packet.path_len = PathUtils.encode_path_len(hash_size, hop_count + 1)
|
|
|
|
return packet
|
|
|
|
def direct_forward(self, packet: Packet) -> Optional[Packet]:
|
|
|
|
# Validate packet (empty payload, oversized path, etc.)
|
|
valid, reason = self.validate_packet(packet)
|
|
if not valid:
|
|
packet.drop_reason = reason
|
|
return None
|
|
|
|
# Check if packet is marked do-not-retransmit
|
|
if packet.is_marked_do_not_retransmit():
|
|
if not packet.drop_reason:
|
|
packet.drop_reason = "Marked do not retransmit"
|
|
return None
|
|
|
|
hash_size = packet.get_path_hash_size()
|
|
hop_count = packet.get_path_hash_count()
|
|
|
|
# Check if we're the next hop
|
|
if not packet.path or len(packet.path) < hash_size:
|
|
packet.drop_reason = "Direct: no path"
|
|
return None
|
|
|
|
next_hop = bytes(packet.path[:hash_size])
|
|
if next_hop != self.local_hash_bytes[:hash_size]:
|
|
packet.drop_reason = "Direct: not for us"
|
|
return None
|
|
|
|
# Suppress duplicates
|
|
if self.is_duplicate(packet):
|
|
packet.drop_reason = "Duplicate"
|
|
return None
|
|
|
|
self.mark_seen(packet)
|
|
|
|
original_path = list(packet.path)
|
|
# Remove first hash entry (hash_size bytes)
|
|
packet.path = bytearray(packet.path[hash_size:])
|
|
packet.path_len = PathUtils.encode_path_len(hash_size, hop_count - 1)
|
|
|
|
return packet
|
|
|
|
@staticmethod
|
|
def calculate_packet_score(snr: float, packet_len: int, spreading_factor: int = 8) -> float:
|
|
|
|
# SNR thresholds per SF (from MeshCore RadioLibWrappers.cpp)
|
|
snr_thresholds = {7: -7.5, 8: -10.0, 9: -12.5, 10: -15.0, 11: -17.5, 12: -20.0}
|
|
|
|
if spreading_factor < 7:
|
|
return 0.0
|
|
|
|
threshold = snr_thresholds.get(spreading_factor, -10.0)
|
|
|
|
# Below threshold = no chance of success
|
|
if snr < threshold:
|
|
return 0.0
|
|
|
|
# Success rate based on SNR above threshold
|
|
success_rate_based_on_snr = (snr - threshold) / 10.0
|
|
|
|
# Collision penalty: longer packets more likely to collide (max 256 bytes)
|
|
collision_penalty = 1.0 - (packet_len / 256.0)
|
|
|
|
# Combined score
|
|
score = success_rate_based_on_snr * collision_penalty
|
|
|
|
return max(0.0, min(1.0, score))
|
|
|
|
def _calculate_tx_delay(self, packet: Packet, snr: float = 0.0) -> float:
|
|
|
|
import random
|
|
|
|
packet_len = packet.get_raw_length()
|
|
airtime_ms = self.airtime_mgr.calculate_airtime(packet_len)
|
|
|
|
route_type = packet.header & PH_ROUTE_MASK
|
|
|
|
# Base delay calculations
|
|
# this part took me along time to get right well i hope i got it right ;-)
|
|
|
|
if route_type == ROUTE_TYPE_FLOOD:
|
|
# Flood packets: random(0-5) * (airtime * 52/50 / 2) * tx_delay_factor
|
|
# This creates collision avoidance with tunable delay
|
|
base_delay_ms = (airtime_ms * 52 / 50) / 2.0 # From C++ implementation
|
|
random_mult = random.uniform(0, 5) # Random multiplier for collision avoidance
|
|
delay_ms = base_delay_ms * random_mult * self.tx_delay_factor
|
|
delay_s = delay_ms / 1000.0
|
|
else: # DIRECT
|
|
# Direct packets: use direct_tx_delay_factor (already in seconds)
|
|
# direct_tx_delay_factor is stored as seconds in config
|
|
delay_s = self.direct_tx_delay_factor
|
|
|
|
# Apply score-based delay adjustment ONLY if delay >= 50ms threshold
|
|
# (matching C++ reactive behavior in Dispatcher::calcRxDelay)
|
|
if delay_s >= 0.05 and self.use_score_for_tx:
|
|
score = self.calculate_packet_score(snr, packet_len)
|
|
# Higher score = shorter delay: max(0.2, 1.0 - score)
|
|
# score 1.0 → multiplier 0.2 (20% of original)
|
|
# score 0.0 → multiplier 1.0 (100% of original)
|
|
score_multiplier = max(0.2, 1.0 - score)
|
|
delay_s = delay_s * score_multiplier
|
|
logger.debug(
|
|
f"Congestion detected (delay >= 50ms), score={score:.2f}, "
|
|
f"delay multiplier={score_multiplier:.2f}"
|
|
)
|
|
|
|
# Cap at 5 seconds maximum
|
|
delay_s = min(delay_s, 5.0)
|
|
|
|
logger.debug(
|
|
f"Route={'FLOOD' if route_type == ROUTE_TYPE_FLOOD else 'DIRECT'}, "
|
|
f"len={packet_len}B, airtime={airtime_ms:.1f}ms, delay={delay_s:.3f}s"
|
|
)
|
|
|
|
return delay_s
|
|
|
|
def process_packet(self, packet: Packet, snr: float = 0.0) -> Optional[Tuple[Packet, float]]:
|
|
|
|
route_type = packet.header & PH_ROUTE_MASK
|
|
|
|
if route_type == ROUTE_TYPE_FLOOD or route_type == ROUTE_TYPE_TRANSPORT_FLOOD:
|
|
fwd_pkt = self.flood_forward(packet)
|
|
if fwd_pkt is None:
|
|
return None
|
|
delay = self._calculate_tx_delay(fwd_pkt, snr)
|
|
return fwd_pkt, delay
|
|
|
|
elif route_type == ROUTE_TYPE_DIRECT or route_type == ROUTE_TYPE_TRANSPORT_DIRECT:
|
|
fwd_pkt = self.direct_forward(packet)
|
|
if fwd_pkt is None:
|
|
return None
|
|
delay = self._calculate_tx_delay(fwd_pkt, snr)
|
|
return fwd_pkt, delay
|
|
|
|
else:
|
|
packet.drop_reason = f"Unknown route type: {route_type}"
|
|
return None
|
|
|
|
async def schedule_retransmit(
|
|
self,
|
|
fwd_pkt: Packet,
|
|
delay: float,
|
|
airtime_ms: float = 0.0,
|
|
local_transmission: bool = False,
|
|
):
|
|
"""Schedule a packet retransmission with delay and return the task.
|
|
|
|
If local_transmission is True and the first send fails, retry once after
|
|
a short delay (handles transient radio/LBT failures).
|
|
"""
|
|
|
|
async def delayed_send():
|
|
await asyncio.sleep(delay)
|
|
last_error = None
|
|
for attempt in range(2 if local_transmission else 1):
|
|
try:
|
|
await self.dispatcher.send_packet(fwd_pkt, wait_for_ack=False)
|
|
if airtime_ms > 0:
|
|
self.airtime_mgr.record_tx(airtime_ms)
|
|
packet_size = fwd_pkt.get_raw_length()
|
|
logger.info(
|
|
f"Retransmitted packet ({packet_size} bytes, "
|
|
f"{airtime_ms:.1f}ms airtime)"
|
|
)
|
|
return
|
|
except Exception as e:
|
|
last_error = e
|
|
logger.error(f"Retransmit failed: {e}")
|
|
if local_transmission and attempt == 0:
|
|
logger.info("Retrying local TX in 1s...")
|
|
await asyncio.sleep(1.0)
|
|
else:
|
|
raise
|
|
if last_error is not None:
|
|
raise last_error
|
|
|
|
return asyncio.create_task(delayed_send())
|
|
|
|
def get_noise_floor(self) -> Optional[float]:
|
|
try:
|
|
radio = self.dispatcher.radio if self.dispatcher else None
|
|
if radio and hasattr(radio, "get_noise_floor"):
|
|
return radio.get_noise_floor()
|
|
return None
|
|
except Exception as e:
|
|
logger.debug(f"Failed to get noise floor: {e}")
|
|
return None
|
|
|
|
def get_stats(self) -> dict:
|
|
|
|
uptime_seconds = time.time() - self.start_time
|
|
|
|
# Get config sections
|
|
repeater_config = self.config.get("repeater", {})
|
|
duty_cycle_config = self.config.get("duty_cycle", {})
|
|
delays_config = self.config.get("delays", {})
|
|
|
|
max_airtime_ms = duty_cycle_config.get("max_airtime_per_minute", 3600)
|
|
max_duty_cycle_percent = (max_airtime_ms / 60000) * 100 # 60000ms = 1 minute
|
|
|
|
# Calculate actual hourly rates (packets in last 3600 seconds)
|
|
now = time.time()
|
|
packets_last_hour = [p for p in self.recent_packets if now - p["timestamp"] < 3600]
|
|
rx_per_hour = len(packets_last_hour)
|
|
forwarded_per_hour = sum(1 for p in packets_last_hour if p.get("transmitted", False))
|
|
|
|
# Get current noise floor from radio
|
|
noise_floor_dbm = self.get_noise_floor()
|
|
|
|
# Get CRC error count from radio hardware
|
|
radio = self.dispatcher.radio if self.dispatcher else None
|
|
crc_error_count = getattr(radio, "crc_error_count", 0) if radio else 0
|
|
|
|
# Get neighbors from database
|
|
neighbors = self.storage.get_neighbors() if self.storage else {}
|
|
|
|
stats = {
|
|
"local_hash": f"0x{self.local_hash:02x}",
|
|
"duplicate_cache_size": len(self.seen_packets),
|
|
"cache_ttl": self.cache_ttl,
|
|
"rx_count": self.rx_count,
|
|
"forwarded_count": self.forwarded_count,
|
|
"dropped_count": self.dropped_count,
|
|
"rx_per_hour": rx_per_hour,
|
|
"forwarded_per_hour": forwarded_per_hour,
|
|
"recent_packets": self.recent_packets,
|
|
"neighbors": neighbors,
|
|
"uptime_seconds": uptime_seconds,
|
|
"noise_floor_dbm": noise_floor_dbm,
|
|
"crc_error_count": crc_error_count,
|
|
# Add configuration data
|
|
"config": {
|
|
"node_name": repeater_config.get("node_name", "Unknown"),
|
|
"repeater": {
|
|
"mode": repeater_config.get("mode", "forward"),
|
|
"use_score_for_tx": repeater_config.get("use_score_for_tx", False),
|
|
"score_threshold": repeater_config.get("score_threshold", 0.3),
|
|
"send_advert_interval_hours": repeater_config.get(
|
|
"send_advert_interval_hours", 10
|
|
),
|
|
"latitude": repeater_config.get("latitude", 0.0),
|
|
"longitude": repeater_config.get("longitude", 0.0),
|
|
"max_flood_hops": repeater_config.get("max_flood_hops", 3),
|
|
"advert_interval_minutes": repeater_config.get("advert_interval_minutes", 120),
|
|
"advert_rate_limit": repeater_config.get("advert_rate_limit", {}),
|
|
"advert_penalty_box": repeater_config.get("advert_penalty_box", {}),
|
|
"advert_adaptive": repeater_config.get("advert_adaptive", {}),
|
|
},
|
|
"radio": self.config.get(
|
|
"radio", {}
|
|
), # Read from live config, not cached radio_config
|
|
"duty_cycle": {
|
|
"max_airtime_percent": max_duty_cycle_percent,
|
|
"enforcement_enabled": duty_cycle_config.get("enforcement_enabled", True),
|
|
},
|
|
"delays": {
|
|
"tx_delay_factor": delays_config.get("tx_delay_factor", 1.0),
|
|
"direct_tx_delay_factor": delays_config.get("direct_tx_delay_factor", 0.5),
|
|
"rx_delay_base": delays_config.get("rx_delay_base", 0.0),
|
|
},
|
|
"web": self.config.get("web", {}), # Include web configuration
|
|
"mesh": {
|
|
"path_hash_mode": self.config.get("mesh", {}).get("path_hash_mode", 0),
|
|
},
|
|
},
|
|
"public_key": None,
|
|
}
|
|
# Add airtime stats
|
|
stats.update(self.airtime_mgr.get_stats())
|
|
return stats
|
|
|
|
def _start_background_tasks(self):
|
|
if self._background_task is None:
|
|
self._background_task = asyncio.create_task(self._background_timer_loop())
|
|
logger.info("Background timer started for noise floor and adverts")
|
|
|
|
async def _background_timer_loop(self):
|
|
try:
|
|
while True:
|
|
current_time = time.time()
|
|
|
|
# Check noise floor recording (every 30 seconds)
|
|
if current_time - self.last_noise_measurement >= self.noise_floor_interval:
|
|
await self._record_noise_floor_async()
|
|
await self._record_crc_errors_async()
|
|
self.last_noise_measurement = current_time
|
|
|
|
# Check advert sending (every N hours)
|
|
if self.send_advert_interval_hours > 0 and self.send_advert_func:
|
|
interval_seconds = self.send_advert_interval_hours * 3600
|
|
if current_time - self.last_advert_time >= interval_seconds:
|
|
await self._send_periodic_advert_async()
|
|
self.last_advert_time = current_time
|
|
|
|
# Sleep for 5 seconds before next check
|
|
await asyncio.sleep(5.0)
|
|
|
|
except asyncio.CancelledError:
|
|
logger.info("Background timer loop cancelled")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error in background timer loop: {e}")
|
|
# Restart the timer after a delay
|
|
await asyncio.sleep(30)
|
|
self._background_task = asyncio.create_task(self._background_timer_loop())
|
|
|
|
async def _record_noise_floor_async(self):
|
|
if not self.storage:
|
|
return
|
|
|
|
try:
|
|
# Run in executor so KISS modem's blocking _send_command (up to 5s timeout)
|
|
# does not block the event loop and hang the process / delay Ctrl+C.
|
|
loop = asyncio.get_running_loop()
|
|
noise_floor = await loop.run_in_executor(None, self.get_noise_floor)
|
|
if noise_floor is not None:
|
|
self.storage.record_noise_floor(noise_floor)
|
|
logger.debug(f"Recorded noise floor: {noise_floor} dBm")
|
|
else:
|
|
logger.debug("Unable to read noise floor from radio")
|
|
except Exception as e:
|
|
logger.error(f"Error recording noise floor: {e}")
|
|
|
|
async def _record_crc_errors_async(self):
|
|
"""Persist CRC error delta from the radio hardware counter."""
|
|
if not self.storage:
|
|
return
|
|
|
|
try:
|
|
radio = self.dispatcher.radio if self.dispatcher else None
|
|
current = getattr(radio, "crc_error_count", 0) if radio else 0
|
|
delta = current - self._last_crc_error_count
|
|
if delta > 0:
|
|
self.storage.record_crc_errors(delta)
|
|
logger.debug(f"Recorded {delta} CRC errors (total: {current})")
|
|
self._last_crc_error_count = current
|
|
except Exception as e:
|
|
logger.error(f"Error recording CRC errors: {e}")
|
|
|
|
async def _send_periodic_advert_async(self):
|
|
logger.info(
|
|
f"Periodic advert timer triggered (interval: {self.send_advert_interval_hours}h)"
|
|
)
|
|
try:
|
|
if self.send_advert_func:
|
|
success = await self.send_advert_func()
|
|
if success:
|
|
logger.info("Periodic advert sent successfully")
|
|
else:
|
|
logger.warning("Failed to send periodic advert")
|
|
else:
|
|
logger.debug("No send_advert_func configured")
|
|
except Exception as e:
|
|
logger.error(f"Error sending periodic advert: {e}")
|
|
|
|
def reload_runtime_config(self):
|
|
"""Reload runtime configuration from self.config (called after live config updates)."""
|
|
try:
|
|
# Refresh delay factors
|
|
self.tx_delay_factor = self.config.get("delays", {}).get("tx_delay_factor", 1.0)
|
|
self.direct_tx_delay_factor = self.config.get("delays", {}).get(
|
|
"direct_tx_delay_factor", 0.5
|
|
)
|
|
|
|
# Refresh repeater settings
|
|
repeater_config = self.config.get("repeater", {})
|
|
self.use_score_for_tx = repeater_config.get("use_score_for_tx", False)
|
|
self.score_threshold = repeater_config.get("score_threshold", 0.3)
|
|
self.send_advert_interval_hours = repeater_config.get("send_advert_interval_hours", 10)
|
|
self.cache_ttl = repeater_config.get("cache_ttl", 60)
|
|
self.loop_detect_mode = self._normalize_loop_detect_mode(
|
|
self.config.get("mesh", {}).get("loop_detect", LOOP_DETECT_OFF)
|
|
)
|
|
|
|
# Note: Radio config changes require restart as they affect hardware
|
|
# Note: Airtime manager has its own config reference that gets updated
|
|
|
|
logger.info("Runtime configuration reloaded successfully")
|
|
except Exception as e:
|
|
logger.error(f"Error reloading runtime config: {e}")
|
|
|
|
def cleanup(self):
|
|
if self._background_task and not self._background_task.done():
|
|
self._background_task.cancel()
|
|
logger.info("Background timer task cancelled")
|
|
|
|
if self.storage:
|
|
try:
|
|
self.storage.close()
|
|
logger.info("StorageCollector closed successfully")
|
|
except Exception as e:
|
|
logger.error(f"Error closing StorageCollector: {e}")
|
|
|
|
def __del__(self):
|
|
try:
|
|
self.cleanup()
|
|
except Exception:
|
|
pass
|