mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-03-28 17:43:06 +01:00
refactor: streamline packet and advert recording by consolidating RRD metrics updates
This commit is contained in:
@@ -192,15 +192,12 @@ class StorageCollector:
|
||||
self.mqtt_client = None
|
||||
|
||||
def record_packet(self, packet_record: dict):
|
||||
|
||||
self._store_packet_sqlite(packet_record)
|
||||
self._update_rrd_metrics(packet_record, record_type="packet")
|
||||
self._update_rrd_packet_metrics(packet_record)
|
||||
self._publish_mqtt(packet_record, "packet")
|
||||
|
||||
def record_advert(self, advert_record: dict):
|
||||
|
||||
self._store_advert_sqlite(advert_record)
|
||||
self._update_rrd_metrics(advert_record, record_type="advert")
|
||||
self._publish_mqtt(advert_record, "advert")
|
||||
|
||||
def _store_packet_sqlite(self, record: dict):
|
||||
@@ -311,71 +308,49 @@ class StorageCollector:
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to store advert in SQLite: {e}")
|
||||
|
||||
def _update_rrd_metrics(self, record: dict, record_type: str):
|
||||
def _update_rrd_packet_metrics(self, record: dict):
|
||||
if not RRDTOOL_AVAILABLE or not self.rrd_path.exists():
|
||||
return
|
||||
|
||||
try:
|
||||
# Get current timestamp
|
||||
timestamp = int(record.get("timestamp", time.time()))
|
||||
|
||||
# Get current values from RRD (for counters we need to increment)
|
||||
# Skip if trying to update with old data
|
||||
try:
|
||||
info = rrdtool.info(str(self.rrd_path))
|
||||
last_update = int(info.get("last_update", timestamp - 60))
|
||||
|
||||
# Skip if trying to update with old data
|
||||
if timestamp <= last_update:
|
||||
return
|
||||
|
||||
except Exception:
|
||||
# If we can't read info, proceed with update
|
||||
pass
|
||||
|
||||
# Prepare update values based on record type
|
||||
if record_type == "packet":
|
||||
# Get packet type for counter tracking
|
||||
packet_type = record.get("type", 0)
|
||||
|
||||
# For packets, we update counters and gauges
|
||||
rx_inc = 1
|
||||
tx_inc = 1 if record.get("transmitted", False) else 0
|
||||
drop_inc = 0 if record.get("transmitted", False) else 1
|
||||
|
||||
# Initialize packet type counters (all start with 0)
|
||||
type_counters = ["0"] * 17 # type_0 through type_15 plus type_other
|
||||
|
||||
# Increment the appropriate packet type counter
|
||||
if 0 <= packet_type <= 15:
|
||||
type_counters[packet_type] = "1"
|
||||
else:
|
||||
type_counters[16] = "1" # type_other for packet types > 15
|
||||
|
||||
# Build the values string: basic metrics + packet type counters
|
||||
basic_values = f"{timestamp}:{rx_inc}:{tx_inc}:{drop_inc}:" \
|
||||
f"{record.get('rssi', 'U')}:{record.get('snr', 'U')}:" \
|
||||
f"{record.get('length', 'U')}:{record.get('score', 'U')}:U"
|
||||
|
||||
type_values = ":".join(type_counters)
|
||||
values = f"{basic_values}:{type_values}"
|
||||
|
||||
elif record_type == "advert":
|
||||
# For adverts, we mainly update gauges, packet type counters stay at 0
|
||||
type_counters = ["0"] * 17 # All packet type counters set to 0
|
||||
type_values = ":".join(type_counters)
|
||||
|
||||
basic_values = f"{timestamp}:0:0:0:" \
|
||||
f"{record.get('rssi', 'U')}:{record.get('snr', 'U')}:" \
|
||||
f"U:U:1"
|
||||
|
||||
values = f"{basic_values}:{type_values}"
|
||||
# For packets, we update counters and gauges
|
||||
packet_type = record.get("type", 0)
|
||||
rx_inc = 1
|
||||
tx_inc = 1 if record.get("transmitted", False) else 0
|
||||
drop_inc = 0 if record.get("transmitted", False) else 1
|
||||
|
||||
# Initialize packet type counters (all start with 0)
|
||||
type_counters = ["0"] * 17 # type_0 through type_15 plus type_other
|
||||
|
||||
# Increment the appropriate packet type counter
|
||||
if 0 <= packet_type <= 15:
|
||||
type_counters[packet_type] = "1"
|
||||
else:
|
||||
return
|
||||
|
||||
type_counters[16] = "1" # type_other for packet types > 15
|
||||
|
||||
# Build the values string: basic metrics + packet type counters
|
||||
basic_values = f"{timestamp}:{rx_inc}:{tx_inc}:{drop_inc}:" \
|
||||
f"{record.get('rssi', 'U')}:{record.get('snr', 'U')}:" \
|
||||
f"{record.get('length', 'U')}:{record.get('score', 'U')}:U"
|
||||
|
||||
type_values = ":".join(type_counters)
|
||||
values = f"{basic_values}:{type_values}"
|
||||
|
||||
rrdtool.update(str(self.rrd_path), values)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update RRD metrics: {e}")
|
||||
logger.error(f"Failed to update RRD packet metrics: {e}")
|
||||
|
||||
def _publish_mqtt(self, record: dict, record_type: str):
|
||||
"""Publish record to MQTT broker."""
|
||||
@@ -385,16 +360,9 @@ class StorageCollector:
|
||||
try:
|
||||
base_topic = self.mqtt_config.get("base_topic", "meshcore/repeater")
|
||||
node_name = self.config.get("repeater", {}).get("node_name", "unknown")
|
||||
|
||||
topic = f"{base_topic}/{node_name}/{record_type}"
|
||||
|
||||
# Create clean payload (remove non-serializable items)
|
||||
payload = {k: v for k, v in record.items() if v is not None}
|
||||
|
||||
# Convert to JSON
|
||||
message = json.dumps(payload, default=str)
|
||||
|
||||
# Publish
|
||||
self.mqtt_client.publish(topic, message, qos=0, retain=False)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user