pyMC_Repeater/repeater/data_acquisition/rrdtool_handler.py
TJ Downes d592af6e19 fix(rrdtool): replace rrdtool.info() with self-tracked timestamp to eliminate allocation storm
Problem
-------
update_packet_metrics() called rrdtool.info() (cached for 5 s) to get the
RRD's last_update timestamp.  rrdtool.info() returns a massive Python dict:
25 data sources (17 of them per-type packet counters) × 5 RRAs adds up to
700+ dict entries per call.
tracemalloc showed +10696 new allocations / +251 KB at this exact line,
flagged as "Investigate" in the memory diagnostics dashboard.

The rrdtool.info() approach was also unnecessarily complex: it required a
5-second secondary cache, a _pending_rrd_update buffer, and two extra
instance attributes — all to answer one question ("did we already write
this period?") that we can answer ourselves with a single integer.
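
The removed pattern was roughly the following (reconstructed for
illustration; the exact code is no longer in the tree):

    info = rrdtool.info(str(self.rrd_path))   # builds the ~700-entry dict
    last_update = info["last_update"]          # ...to read one integer
    if timestamp <= last_update:
        return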

Fix
---
Replace _last_rrd_info_cache / _last_rrd_info_time / _pending_rrd_update
with a single self._last_rrd_update: int = 0 that stores the step-aligned
timestamp of the last successful rrdtool.update() call.  The throttle check
becomes:

    if timestamp <= self._last_rrd_update:
        return

On success: self._last_rrd_update = timestamp

Zero dict allocations per call.  The only downside vs rrdtool.info() is
that _last_rrd_update resets to 0 on process restart, meaning the first
packet after a restart always triggers a write — correct behaviour.
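
For reference, a hotspot like this can be reproduced with a generic
tracemalloc comparison (a sketch, not the project's diagnostics code):

    import tracemalloc

    tracemalloc.start()
    before = tracemalloc.take_snapshot()
    # ... exercise the packet path that calls update_packet_metrics() ...
    after = tracemalloc.take_snapshot()
    for stat in after.compare_to(before, "lineno")[:5]:
        print(stat)  # per-line allocation count and size deltas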

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 20:50:27 -07:00


import logging
import time
from pathlib import Path
from typing import Any, Dict, Optional

try:
    import rrdtool

    RRDTOOL_AVAILABLE = True
except ImportError:
    RRDTOOL_AVAILABLE = False

logger = logging.getLogger("RRDToolHandler")

class RRDToolHandler:
    def __init__(self, storage_dir: Path):
        self.storage_dir = storage_dir
        self.rrd_path = self.storage_dir / "metrics.rrd"
        self.available = RRDTOOL_AVAILABLE
        self._init_rrd()
        # Timestamp of the last successful rrdtool.update() call (unix seconds,
        # aligned to the 60-second RRD step). Used to skip writes whose period
        # has already been committed — no rrdtool.info() call needed.
        self._last_rrd_update: int = 0
        # Read-side cache: rrdtool.fetch() returns 24 h of data and is a
        # blocking disk read. Cache the result for 60 s — matching the RRD
        # step size — so repeated dashboard refreshes don't hammer the SD card.
        self._get_data_cache: tuple = (0.0, None)  # (fetched_at, result)

    def _init_rrd(self):
        if not self.available:
            logger.warning("RRDTool not available - skipping RRD initialization")
            return
        if self.rrd_path.exists():
            logger.info(f"RRD database exists: {self.rrd_path}")
            return
        try:
            rrdtool.create(
                str(self.rrd_path),
                "--step",
                "60",
                "--start",
                str(int(time.time() - 60)),
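                # DS format: DS:<name>:<type>:<heartbeat>:<min>:<max>.
                # heartbeat=120 means a slot goes UNKNOWN if no update arrives
                # within two 60 s steps; COUNTER derives a per-second rate from
                # monotonically increasing totals, GAUGE stores values as-is.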
"DS:rx_count:COUNTER:120:0:U",
"DS:tx_count:COUNTER:120:0:U",
"DS:drop_count:COUNTER:120:0:U",
"DS:avg_rssi:GAUGE:120:-200:0",
"DS:avg_snr:GAUGE:120:-30:30",
"DS:avg_length:GAUGE:120:0:256",
"DS:avg_score:GAUGE:120:0:1",
"DS:neighbor_count:GAUGE:120:0:U",
"DS:type_0:COUNTER:120:0:U",
"DS:type_1:COUNTER:120:0:U",
"DS:type_2:COUNTER:120:0:U",
"DS:type_3:COUNTER:120:0:U",
"DS:type_4:COUNTER:120:0:U",
"DS:type_5:COUNTER:120:0:U",
"DS:type_6:COUNTER:120:0:U",
"DS:type_7:COUNTER:120:0:U",
"DS:type_8:COUNTER:120:0:U",
"DS:type_9:COUNTER:120:0:U",
"DS:type_10:COUNTER:120:0:U",
"DS:type_11:COUNTER:120:0:U",
"DS:type_12:COUNTER:120:0:U",
"DS:type_13:COUNTER:120:0:U",
"DS:type_14:COUNTER:120:0:U",
"DS:type_15:COUNTER:120:0:U",
"DS:type_other:COUNTER:120:0:U",
"RRA:AVERAGE:0.5:1:10080",
"RRA:AVERAGE:0.5:5:8640",
"RRA:AVERAGE:0.5:60:8760",
"RRA:MAX:0.5:1:10080",
"RRA:MIN:0.5:1:10080",
)
logger.info(f"RRD database created: {self.rrd_path}")
except Exception as e:
logger.error(f"Failed to create RRD database: {e}")

    def update_packet_metrics(self, record: dict, cumulative_counts: dict):
        """Write packet metrics to RRD, throttled to once per 60-second step.

        RRD enforces a 60-second minimum step between updates.  We track the
        last written timestamp ourselves instead of calling rrdtool.info(),
        which previously allocated thousands of Python objects per call.
        """
        if not self.available or not self.rrd_path.exists():
            return
        try:
            timestamp = int(record.get("timestamp", time.time()))
            # Align to the start of the 60-second step so the comparison below
            # is per-period rather than per-second.
            timestamp -= timestamp % 60
            # Skip if this packet falls in the same 60-second period we already wrote.
            if timestamp <= self._last_rrd_update:
                return
            # Build update string from cumulative counts
            rx_total = cumulative_counts.get("rx_total", 0)
            tx_total = cumulative_counts.get("tx_total", 0)
            drop_total = cumulative_counts.get("drop_total", 0)
            type_counts = cumulative_counts.get("type_counts", {})
            type_values = []
            for i in range(16):
                type_values.append(str(type_counts.get(f"type_{i}", 0)))
            type_values.append(str(type_counts.get("type_other", 0)))
            rssi = record.get("rssi")
            snr = record.get("snr")
            score = record.get("score")
            rssi_val = "U" if rssi is None else str(rssi)
            snr_val = "U" if snr is None else str(snr)
            score_val = "U" if score is None else str(score)
            length_val = str(record.get("length", 0))
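            # The trailing "U" fills the neighbor_count DS, which is not known
            # at packet time; rrdtool treats "U" as UNKNOWN.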
            basic_values = (
                f"{timestamp}:{rx_total}:{tx_total}:{drop_total}:"
                f"{rssi_val}:{snr_val}:{length_val}:{score_val}:"
                f"U"
            )
            type_values_str = ":".join(type_values)
            values = f"{basic_values}:{type_values_str}"
            rrdtool.update(str(self.rrd_path), values)
            self._last_rrd_update = timestamp
        except Exception as e:
            logger.error(f"Failed to update RRD packet metrics: {e}")
            logger.debug(f"RRD packet update failed - record: {record}")

    def get_data(
        self,
        start_time: Optional[int] = None,
        end_time: Optional[int] = None,
        resolution: str = "average",
    ) -> Optional[dict]:
        if not self.available or not self.rrd_path.exists():
            logger.error(
                f"RRD not available: available={self.available}, rrd_path exists={self.rrd_path.exists()}"
            )
            return None
        # Serve from cache if result is still fresh. RRD step is 60 s, so
        # anything newer than that is guaranteed to be identical to a live fetch.
        # Only the default (full 24-hour, no explicit bounds) call is cached —
        # explicit start/end requests always bypass the cache.
        now = time.time()
        use_cache = start_time is None and end_time is None
        if use_cache:
            cache_fetched_at, cache_result = self._get_data_cache
            if now - cache_fetched_at < 60.0 and cache_result is not None:
                return cache_result
        try:
            if end_time is None:
                end_time = int(now)
            if start_time is None:
                start_time = end_time - (24 * 3600)
            fetch_result = rrdtool.fetch(
                str(self.rrd_path),
                resolution.upper(),
                "--start",
                str(start_time),
                "--end",
                str(end_time),
            )
            if not fetch_result:
                logger.error("RRD fetch returned None")
                return None
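            # rrdtool.fetch returns ((start, end, step), ds_names, rows), where
            # each row is one tuple of float-or-None values per step.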
            (start, end, step), data_sources, data_points = fetch_result
            if not data_points:
                logger.warning("No data points returned from RRD fetch")
            result = {
                "start_time": start,
                "end_time": end,
                "step": step,
                "data_sources": data_sources,
                "packet_types": {},
                "metrics": {},
            }
            timestamps = []
            current_time = start
            for ds in data_sources:
                if ds.startswith("type_"):
                    result["packet_types"][ds] = []
                else:
                    result["metrics"][ds] = []
            for point in data_points:
                timestamps.append(current_time)
                for i, value in enumerate(point):
                    ds_name = data_sources[i]
                    if ds_name.startswith("type_"):
                        result["packet_types"][ds_name].append(value)
                    else:
                        result["metrics"][ds_name].append(value)
                current_time += step
            result["timestamps"] = timestamps
            # Populate read cache for default (unconstrained) calls only.
            if use_cache:
                self._get_data_cache = (now, result)
            return result
        except Exception as e:
            logger.error(f"Failed to get RRD data: {e}")
            return None

    def get_packet_type_stats(self, hours: int = 24) -> Optional[dict]:
        try:
            end_time = int(time.time())
            start_time = end_time - (hours * 3600)
            rrd_data = self.get_data(start_time, end_time)
            if not rrd_data or "packet_types" not in rrd_data:
                logger.warning("No RRD data available")
                return None
            type_totals = {}
            packet_type_names = {
                "type_0": "Request (REQ)",
                "type_1": "Response (RESPONSE)",
                "type_2": "Plain Text Message (TXT_MSG)",
                "type_3": "Acknowledgment (ACK)",
                "type_4": "Node Advertisement (ADVERT)",
                "type_5": "Group Text Message (GRP_TXT)",
                "type_6": "Group Datagram (GRP_DATA)",
                "type_7": "Anonymous Request (ANON_REQ)",
                "type_8": "Returned Path (PATH)",
                "type_9": "Trace (TRACE)",
                "type_10": "Multi-part Packet (MULTIPART)",
                "type_11": "Control (CONTROL)",
                "type_12": "Reserved Type 12",
                "type_13": "Reserved Type 13",
                "type_14": "Reserved Type 14",
                "type_15": "Custom Packet (RAW_CUSTOM)",
                "type_other": "Other Types (>15)",
            }
            total_valid_points = 0
            for data_points in rrd_data["packet_types"].values():
                valid_points = [p for p in data_points if p is not None]
                total_valid_points += len(valid_points)
            if total_valid_points < 10:
                logger.warning(f"RRD data too sparse ({total_valid_points} valid points)")
                return None
            for type_key, data_points in rrd_data["packet_types"].items():
                valid_points = [p for p in data_points if p is not None]
                if len(valid_points) >= 2:
                    total = max(valid_points) - min(valid_points)
                elif len(valid_points) == 1:
                    total = valid_points[0]
                else:
                    total = 0
                type_name = packet_type_names.get(type_key, type_key)
                type_totals[type_name] = max(0, total or 0)
            result = {
                "hours": hours,
                "packet_type_totals": type_totals,
                "total_packets": sum(type_totals.values()),
                "period": f"{hours} hours",
                "data_source": "rrd",
            }
            return result
        except Exception as e:
            logger.error(f"Failed to get packet type stats from RRD: {e}")
            return None
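

# Minimal usage sketch (not part of the original module): how the handler
# might be driven by a packet pipeline. The record / cumulative_counts field
# names below are taken from the reads above; the concrete values are invented.
if __name__ == "__main__":
    import tempfile

    handler = RRDToolHandler(Path(tempfile.mkdtemp()))
    handler.update_packet_metrics(
        {"timestamp": time.time(), "rssi": -92, "snr": 7.5, "length": 48, "score": 0.9},
        {"rx_total": 120, "tx_total": 80, "drop_total": 3, "type_counts": {"type_4": 50}},
    )
    print(handler.get_packet_type_stats(hours=24))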