import logging import time from pathlib import Path from typing import Any, Dict, Optional try: import rrdtool RRDTOOL_AVAILABLE = True except ImportError: RRDTOOL_AVAILABLE = False logger = logging.getLogger("RRDToolHandler") class RRDToolHandler: def __init__(self, storage_dir: Path): self.storage_dir = storage_dir self.rrd_path = self.storage_dir / "metrics.rrd" self.available = RRDTOOL_AVAILABLE self._init_rrd() def _init_rrd(self): if not self.available: logger.warning("RRDTool not available - skipping RRD initialization") return if self.rrd_path.exists(): logger.info(f"RRD database exists: {self.rrd_path}") return try: rrdtool.create( str(self.rrd_path), "--step", "60", "--start", str(int(time.time() - 60)), "DS:rx_count:COUNTER:120:0:U", "DS:tx_count:COUNTER:120:0:U", "DS:drop_count:COUNTER:120:0:U", "DS:avg_rssi:GAUGE:120:-200:0", "DS:avg_snr:GAUGE:120:-30:30", "DS:avg_length:GAUGE:120:0:256", "DS:avg_score:GAUGE:120:0:1", "DS:neighbor_count:GAUGE:120:0:U", "DS:type_0:COUNTER:120:0:U", "DS:type_1:COUNTER:120:0:U", "DS:type_2:COUNTER:120:0:U", "DS:type_3:COUNTER:120:0:U", "DS:type_4:COUNTER:120:0:U", "DS:type_5:COUNTER:120:0:U", "DS:type_6:COUNTER:120:0:U", "DS:type_7:COUNTER:120:0:U", "DS:type_8:COUNTER:120:0:U", "DS:type_9:COUNTER:120:0:U", "DS:type_10:COUNTER:120:0:U", "DS:type_11:COUNTER:120:0:U", "DS:type_12:COUNTER:120:0:U", "DS:type_13:COUNTER:120:0:U", "DS:type_14:COUNTER:120:0:U", "DS:type_15:COUNTER:120:0:U", "DS:type_other:COUNTER:120:0:U", "RRA:AVERAGE:0.5:1:10080", "RRA:AVERAGE:0.5:5:8640", "RRA:AVERAGE:0.5:60:8760", "RRA:MAX:0.5:1:10080", "RRA:MIN:0.5:1:10080", ) logger.info(f"RRD database created: {self.rrd_path}") except Exception as e: logger.error(f"Failed to create RRD database: {e}") def update_packet_metrics(self, record: dict, cumulative_counts: dict): if not self.available or not self.rrd_path.exists(): return try: timestamp = int(record.get("timestamp", time.time())) try: info = rrdtool.info(str(self.rrd_path)) last_update = int(info.get("last_update", timestamp - 60)) if timestamp <= last_update: return except Exception as e: logger.debug(f"Failed to get RRD info for packet update: {e}") rx_total = cumulative_counts.get("rx_total", 0) tx_total = cumulative_counts.get("tx_total", 0) drop_total = cumulative_counts.get("drop_total", 0) type_counts = cumulative_counts.get("type_counts", {}) type_values = [] for i in range(16): type_values.append(str(type_counts.get(f"type_{i}", 0))) type_values.append(str(type_counts.get("type_other", 0))) # Handle None values for TX packets - use 'U' (unknown) for RRD rssi = record.get("rssi") snr = record.get("snr") score = record.get("score") rssi_val = "U" if rssi is None else str(rssi) snr_val = "U" if snr is None else str(snr) score_val = "U" if score is None else str(score) length_val = str(record.get("length", 0)) basic_values = ( f"{timestamp}:{rx_total}:{tx_total}:{drop_total}:" f"{rssi_val}:{snr_val}:{length_val}:{score_val}:" f"U" ) type_values_str = ":".join(type_values) values = f"{basic_values}:{type_values_str}" rrdtool.update(str(self.rrd_path), values) except Exception as e: logger.error(f"Failed to update RRD packet metrics: {e}") logger.debug(f"RRD packet update failed - record: {record}") def get_data( self, start_time: Optional[int] = None, end_time: Optional[int] = None, resolution: str = "average", ) -> Optional[dict]: if not self.available or not self.rrd_path.exists(): logger.error( f"RRD not available: available={self.available}, rrd_path exists={self.rrd_path.exists()}" ) return None try: if end_time is None: end_time = int(time.time()) if start_time is None: start_time = end_time - (24 * 3600) fetch_result = rrdtool.fetch( str(self.rrd_path), resolution.upper(), "--start", str(start_time), "--end", str(end_time), ) if not fetch_result: logger.error("RRD fetch returned None") return None (start, end, step), data_sources, data_points = fetch_result if not data_points: logger.warning("No data points returned from RRD fetch") result = { "start_time": start, "end_time": end, "step": step, "data_sources": data_sources, "packet_types": {}, "metrics": {}, } timestamps = [] current_time = start for ds in data_sources: if ds.startswith("type_"): if "packet_types" not in result: result["packet_types"] = {} result["packet_types"][ds] = [] else: result["metrics"][ds] = [] for point in data_points: timestamps.append(current_time) for i, value in enumerate(point): ds_name = data_sources[i] if ds_name.startswith("type_"): result["packet_types"][ds_name].append(value) else: result["metrics"][ds_name].append(value) current_time += step result["timestamps"] = timestamps return result except Exception as e: logger.error(f"Failed to get RRD data: {e}") return None def get_packet_type_stats(self, hours: int = 24) -> Optional[dict]: try: end_time = int(time.time()) start_time = end_time - (hours * 3600) rrd_data = self.get_data(start_time, end_time) if not rrd_data or "packet_types" not in rrd_data: logger.warning(f"No RRD data available") return None type_totals = {} packet_type_names = { "type_0": "Request (REQ)", "type_1": "Response (RESPONSE)", "type_2": "Plain Text Message (TXT_MSG)", "type_3": "Acknowledgment (ACK)", "type_4": "Node Advertisement (ADVERT)", "type_5": "Group Text Message (GRP_TXT)", "type_6": "Group Datagram (GRP_DATA)", "type_7": "Anonymous Request (ANON_REQ)", "type_8": "Returned Path (PATH)", "type_9": "Trace (TRACE)", "type_10": "Multi-part Packet (MULTIPART)", "type_11": "Control (CONTROL)", "type_12": "Reserved Type 12", "type_13": "Reserved Type 13", "type_14": "Reserved Type 14", "type_15": "Custom Packet (RAW_CUSTOM)", "type_other": "Other Types (>15)", } total_valid_points = 0 for type_key, data_points in rrd_data["packet_types"].items(): valid_points = [p for p in data_points if p is not None] total_valid_points += len(valid_points) if total_valid_points < 10: logger.warning(f"RRD data too sparse ({total_valid_points} valid points)") return None for type_key, data_points in rrd_data["packet_types"].items(): valid_points = [p for p in data_points if p is not None] if len(valid_points) >= 2: total = max(valid_points) - min(valid_points) elif len(valid_points) == 1: total = valid_points[0] else: total = 0 type_name = packet_type_names.get(type_key, type_key) type_totals[type_name] = max(0, total or 0) result = { "hours": hours, "packet_type_totals": type_totals, "total_packets": sum(type_totals.values()), "period": f"{hours} hours", "data_source": "rrd", } return result except Exception as e: logger.error(f"Failed to get packet type stats from RRD: {e}") return None