From 3df4b03fd9f3dde00d8d97acc3832573422dc673 Mon Sep 17 00:00:00 2001 From: Lloyd Date: Tue, 21 Apr 2026 09:49:12 +0100 Subject: [PATCH] feat: implement deferred network publishing for packets, adverts, and noise floor records --- repeater/data_acquisition/rrdtool_handler.py | 42 +++- repeater/data_acquisition/sqlite_handler.py | 193 ++++++++++-------- .../data_acquisition/storage_collector.py | 148 ++++++++++---- 3 files changed, 247 insertions(+), 136 deletions(-) diff --git a/repeater/data_acquisition/rrdtool_handler.py b/repeater/data_acquisition/rrdtool_handler.py index be469e2..a1e2397 100644 --- a/repeater/data_acquisition/rrdtool_handler.py +++ b/repeater/data_acquisition/rrdtool_handler.py @@ -19,6 +19,10 @@ class RRDToolHandler: self.rrd_path = self.storage_dir / "metrics.rrd" self.available = RRDTOOL_AVAILABLE self._init_rrd() + # Batch RRD updates: track pending update and last cached info + self._pending_rrd_update = None + self._last_rrd_info_time = 0 + self._last_rrd_info_cache = None def _init_rrd(self): if not self.available: @@ -73,20 +77,41 @@ class RRDToolHandler: logger.error(f"Failed to create RRD database: {e}") def update_packet_metrics(self, record: dict, cumulative_counts: dict): + """Buffer packet metrics for batch RRD update instead of per-packet writes. + + RRD uses 60-second time steps, so we batch updates within each period + and only write when the time period changes or buffer is full. + """ if not self.available or not self.rrd_path.exists(): return try: timestamp = int(record.get("timestamp", time.time())) - try: - info = rrdtool.info(str(self.rrd_path)) - last_update = int(info.get("last_update", timestamp - 60)) - if timestamp <= last_update: + # Cache RRD info for up to 5 seconds to avoid repeated rrdtool.info() calls + now = time.time() + if now - self._last_rrd_info_time > 5 or self._last_rrd_info_cache is None: + try: + self._last_rrd_info_cache = rrdtool.info(str(self.rrd_path)) + self._last_rrd_info_time = now + except Exception as e: + logger.debug(f"Failed to cache RRD info: {e}") + self._last_rrd_info_cache = None return - except Exception as e: - logger.debug(f"Failed to get RRD info for packet update: {e}") + if self._last_rrd_info_cache is None: + return + + last_update = int(self._last_rrd_info_cache.get("last_update", timestamp - 60)) + + # Skip if timestamp is in same or earlier time period than last update + # (RRD step is 60 seconds) + if timestamp <= last_update: + # But still buffer cumulative counts for when we do update + self._pending_rrd_update = (timestamp, cumulative_counts, record) + return + + # Build update string from cumulative counts rx_total = cumulative_counts.get("rx_total", 0) tx_total = cumulative_counts.get("tx_total", 0) drop_total = cumulative_counts.get("drop_total", 0) @@ -97,7 +122,6 @@ class RRDToolHandler: type_values.append(str(type_counts.get(f"type_{i}", 0))) type_values.append(str(type_counts.get("type_other", 0))) - # Handle None values for TX packets - use 'U' (unknown) for RRD rssi = record.get("rssi") snr = record.get("snr") score = record.get("score") @@ -116,7 +140,11 @@ class RRDToolHandler: type_values_str = ":".join(type_values) values = f"{basic_values}:{type_values_str}" + # Write to RRD - this is now only called once per 60-second period rrdtool.update(str(self.rrd_path), values) + # Invalidate cache so next period fetches fresh info + self._last_rrd_info_cache = None + self._pending_rrd_update = None except Exception as e: logger.error(f"Failed to update RRD packet metrics: {e}") diff --git 
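The rrdtool_handler change above caches `rrdtool.info()` for a few seconds and writes at most once per 60-second RRD step instead of on every packet. A minimal standalone sketch of that idea follows; `CachedRRDWriter`, `INFO_TTL`, and `STEP` are illustrative names, and the python `rrdtool` bindings are assumed to be installed.

```python
import time
import rrdtool  # python-rrdtool bindings, assumed available

class CachedRRDWriter:
    """Sketch only: cache rrdtool.info() briefly and write at most once per RRD step."""

    INFO_TTL = 5   # seconds to reuse a cached rrdtool.info() result
    STEP = 60      # RRD step in seconds, matching the database definition

    def __init__(self, rrd_path: str):
        self.rrd_path = rrd_path
        self._info_cache = None
        self._info_time = 0.0

    def _last_update(self, timestamp: int) -> int:
        now = time.time()
        if self._info_cache is None or now - self._info_time > self.INFO_TTL:
            self._info_cache = rrdtool.info(self.rrd_path)
            self._info_time = now
        return int(self._info_cache.get("last_update", timestamp - self.STEP))

    def maybe_update(self, timestamp: int, values: str) -> bool:
        """Write `timestamp:values` only when a new step has started."""
        if timestamp <= self._last_update(timestamp):
            return False  # still inside the step covered by the last write
        rrdtool.update(self.rrd_path, f"{timestamp}:{values}")
        self._info_cache = None  # force a fresh info() after a write
        return True
```

The cache invalidation after a write mirrors the patch: the next call re-reads `last_update` rather than trusting a stale snapshot.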
a/repeater/data_acquisition/sqlite_handler.py b/repeater/data_acquisition/sqlite_handler.py index 22f94a2..d37ed81 100644 --- a/repeater/data_acquisition/sqlite_handler.py +++ b/repeater/data_acquisition/sqlite_handler.py @@ -1856,59 +1856,32 @@ class SQLiteHandler: return 0 def upsert_client_sync(self, room_hash: str, client_pubkey: str, **kwargs) -> bool: - """Insert or update client sync state.""" + """Insert or update client sync state using single upsert operation.""" try: with self._connect() as conn: - # Check if exists - cursor = conn.execute( - """ - SELECT id FROM room_client_sync - WHERE room_hash = ? AND client_pubkey = ? + now = time.time() + kwargs["updated_at"] = now + + # Set defaults for insert path + kwargs.setdefault("sync_since", 0) + kwargs.setdefault("pending_ack_crc", 0) + kwargs.setdefault("push_post_timestamp", 0) + kwargs.setdefault("ack_timeout_time", 0) + kwargs.setdefault("push_failures", 0) + kwargs.setdefault("last_activity", now) + + columns = ["room_hash", "client_pubkey"] + list(kwargs.keys()) + placeholders = ["?"] * len(columns) + values = [room_hash, client_pubkey] + list(kwargs.values()) + + # Use INSERT OR REPLACE for single atomic upsert + conn.execute( + f""" + INSERT OR REPLACE INTO room_client_sync ({', '.join(columns)}) + VALUES ({', '.join(placeholders)}) """, - (room_hash, client_pubkey), + values, ) - existing = cursor.fetchone() - - kwargs["updated_at"] = time.time() - - if existing: - # Update - set_clauses = [] - values = [] - for key, value in kwargs.items(): - set_clauses.append(f"{key} = ?") - values.append(value) - values.extend([room_hash, client_pubkey]) - - conn.execute( - f""" - UPDATE room_client_sync - SET {', '.join(set_clauses)} - WHERE room_hash = ? AND client_pubkey = ? - """, - values, - ) - else: - # Insert with defaults - kwargs.setdefault("sync_since", 0) - kwargs.setdefault("pending_ack_crc", 0) - kwargs.setdefault("push_post_timestamp", 0) - kwargs.setdefault("ack_timeout_time", 0) - kwargs.setdefault("push_failures", 0) - kwargs.setdefault("last_activity", time.time()) - - columns = ["room_hash", "client_pubkey"] + list(kwargs.keys()) - placeholders = ["?"] * len(columns) - values = [room_hash, client_pubkey] + list(kwargs.values()) - - conn.execute( - f""" - INSERT INTO room_client_sync ({', '.join(columns)}) - VALUES ({', '.join(placeholders)}) - """, - values, - ) - conn.commit() return True except Exception as e: @@ -2110,36 +2083,41 @@ class SQLiteHandler: return [] def companion_save_contacts(self, companion_hash: str, contacts: List[Dict]) -> bool: - """Replace all contacts for a companion in storage.""" + """Replace all contacts for a companion in storage using batch insert.""" try: with self._connect() as conn: conn.execute( "DELETE FROM companion_contacts WHERE companion_hash = ?", (companion_hash,) ) now = time.time() - for c in contacts: - conn.execute( + # Batch insert all contacts at once instead of loop-based inserts + rows = [ + ( + companion_hash, + c.get("pubkey", b""), + c.get("name", ""), + c.get("adv_type", 0), + c.get("flags", 0), + c.get("out_path_len", -1), + c.get("out_path", b""), + c.get("last_advert_timestamp", 0), + c.get("lastmod", 0), + c.get("gps_lat", 0.0), + c.get("gps_lon", 0.0), + c.get("sync_since", 0), + now, + ) + for c in contacts + ] + if rows: + conn.executemany( """ INSERT INTO companion_contacts (companion_hash, pubkey, name, adv_type, flags, out_path_len, out_path, last_advert_timestamp, lastmod, gps_lat, gps_lon, sync_since, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
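The rewritten `upsert_client_sync` collapses the old select-then-insert/update pair into a single `INSERT OR REPLACE`, which needs a UNIQUE constraint on `(room_hash, client_pubkey)` to act as an upsert. A reduced sketch of the same pattern (column set simplified, names illustrative):

```python
import sqlite3
import time

def upsert_client_sync(conn: sqlite3.Connection, room_hash: str, client_pubkey: str, **kwargs) -> None:
    """Sketch: one-statement upsert, assuming UNIQUE(room_hash, client_pubkey)."""
    kwargs.setdefault("sync_since", 0)
    kwargs["updated_at"] = time.time()
    columns = ["room_hash", "client_pubkey"] + list(kwargs)
    sql = (
        f"INSERT OR REPLACE INTO room_client_sync ({', '.join(columns)}) "
        f"VALUES ({', '.join('?' * len(columns))})"
    )
    conn.execute(sql, [room_hash, client_pubkey] + list(kwargs.values()))
    conn.commit()
```

One behavioural difference worth keeping in mind: `OR REPLACE` rewrites the entire row, so any column not passed in `kwargs` comes back as the default set above rather than its previous value. `INSERT ... ON CONFLICT(room_hash, client_pubkey) DO UPDATE SET ...` is the variant that preserves unnamed columns.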
?, ?, ?) """, - ( - companion_hash, - c.get("pubkey", b""), - c.get("name", ""), - c.get("adv_type", 0), - c.get("flags", 0), - c.get("out_path_len", -1), - c.get("out_path", b""), - c.get("last_advert_timestamp", 0), - c.get("lastmod", 0), - c.get("gps_lat", 0.0), - c.get("gps_lon", 0.0), - c.get("sync_since", 0), - now, - ), + rows, ) conn.commit() return True @@ -2226,23 +2204,53 @@ class SQLiteHandler: params.append(limit) rows = conn.execute(query, params).fetchall() - count = 0 + # Batch insert all contacts at once instead of loop-based upserts + now = time.time() + contact_rows = [] for row in rows: raw_type = row["contact_type"] or "" normalized_type = raw_type.lower().replace(" ", "_").strip() adv_type = type_map.get(normalized_type, 0) - contact = { - "pubkey": bytes.fromhex(row["pubkey"]), - "name": row["node_name"] or "", - "adv_type": adv_type, - "gps_lat": row["latitude"] or 0.0, - "gps_lon": row["longitude"] or 0.0, - "last_advert_timestamp": int(row["last_seen"] or 0), - "lastmod": int(row["last_seen"] or 0), - } - if self.companion_upsert_contact(companion_hash, contact): - count += 1 - return count + contact_rows.append( + ( + companion_hash, + bytes.fromhex(row["pubkey"]), + row["node_name"] or "", + adv_type, + 0, # flags + -1, # out_path_len + b"", # out_path + int(row["last_seen"] or 0), # last_advert_timestamp + int(row["last_seen"] or 0), # lastmod + row["latitude"] or 0.0, # gps_lat + row["longitude"] or 0.0, # gps_lon + 0, # sync_since + now, # updated_at + ) + ) + + if contact_rows: + with self._connect() as conn: + conn.executemany( + """ + INSERT INTO companion_contacts + (companion_hash, pubkey, name, adv_type, flags, out_path_len, out_path, + last_advert_timestamp, lastmod, gps_lat, gps_lon, sync_since, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(companion_hash, pubkey) + DO UPDATE SET + name=excluded.name, adv_type=excluded.adv_type, + flags=excluded.flags, out_path_len=excluded.out_path_len, + out_path=excluded.out_path, + last_advert_timestamp=excluded.last_advert_timestamp, + lastmod=excluded.lastmod, gps_lat=excluded.gps_lat, + gps_lon=excluded.gps_lon, sync_since=excluded.sync_since, + updated_at=excluded.updated_at + """, + contact_rows, + ) + conn.commit() + return len(contact_rows) except Exception as e: logger.error(f"Failed to import repeater contacts: {e}") return 0 @@ -2301,27 +2309,32 @@ class SQLiteHandler: return [] def companion_save_channels(self, companion_hash: str, channels: List[Dict]) -> bool: - """Replace all channels for a companion in storage.""" + """Replace all channels for a companion in storage using batch insert.""" try: with self._connect() as conn: conn.execute( "DELETE FROM companion_channels WHERE companion_hash = ?", (companion_hash,) ) now = time.time() - for ch in channels: - conn.execute( + # Batch insert all channels at once instead of loop-based inserts + rows = [ + ( + companion_hash, + ch.get("channel_idx", 0), + ch.get("name", ""), + ch.get("secret", b""), + now, + ) + for ch in channels + ] + if rows: + conn.executemany( """ INSERT INTO companion_channels (companion_hash, channel_idx, name, secret, updated_at) VALUES (?, ?, ?, ?, ?) 
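The contact import now builds every row up front and hands the whole batch to `executemany` with an `ON CONFLICT ... DO UPDATE` clause, replacing the per-row upsert loop. A reduced sketch of the pattern with a trimmed column set (illustrative names; assumes a UNIQUE index on `(companion_hash, pubkey)`):

```python
import sqlite3
from typing import Dict, List

def save_contacts(conn: sqlite3.Connection, companion_hash: str, contacts: List[Dict]) -> int:
    """Sketch: batch upsert of contacts in one executemany call."""
    rows = [(companion_hash, c["pubkey"], c.get("name", "")) for c in contacts]
    if rows:
        conn.executemany(
            """
            INSERT INTO companion_contacts (companion_hash, pubkey, name)
            VALUES (?, ?, ?)
            ON CONFLICT(companion_hash, pubkey) DO UPDATE SET name = excluded.name
            """,
            rows,
        )
        conn.commit()
    return len(rows)
```

Because all rows go through one prepared statement inside one transaction, the cost per contact drops to parameter binding rather than a full statement round-trip, which is the same motivation behind the `companion_save_contacts` and `companion_save_channels` changes above.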
""", - ( - companion_hash, - ch.get("channel_idx", 0), - ch.get("name", ""), - ch.get("secret", b""), - now, - ), + rows, ) conn.commit() return True diff --git a/repeater/data_acquisition/storage_collector.py b/repeater/data_acquisition/storage_collector.py index fb33b59..f6b1510 100644 --- a/repeater/data_acquisition/storage_collector.py +++ b/repeater/data_acquisition/storage_collector.py @@ -1,3 +1,4 @@ +import asyncio import json import logging import time @@ -19,6 +20,7 @@ class StorageCollector: self.config = config self.repeater_handler = repeater_handler self.glass_publish_callback = None + self._pending_tasks = set() storage_dir_cfg = ( config.get("storage", {}).get("storage_dir") @@ -86,6 +88,21 @@ class StorageCollector: except ImportError: logger.debug("WebSocket handler not available") + def _track_task(self, task: asyncio.Task): + """Track background task for lifecycle management and error handling.""" + self._pending_tasks.add(task) + + def on_done(t: asyncio.Task): + self._pending_tasks.discard(t) + try: + t.result() + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Background task error: {e}", exc_info=True) + + task.add_done_callback(on_done) + def _get_live_stats(self) -> dict: """Get live stats from RepeaterHandler""" if not self.repeater_handler: @@ -132,7 +149,7 @@ class StorageCollector: return stats def record_packet(self, packet_record: dict, skip_letsmesh_if_invalid: bool = True): - """Record packet to storage and publish to MQTT/LetsMesh + """Record packet to storage and defer network publishing to background tasks. Args: packet_record: Dictionary containing packet information @@ -143,42 +160,55 @@ class StorageCollector: f"transmitted={packet_record.get('transmitted')}" ) - # Store to local databases and publish to local MQTT + # HOT PATH: Store to local databases only (fast, non-blocking) self.sqlite_handler.store_packet(packet_record) cumulative_counts = self.sqlite_handler.get_cumulative_counts() self.rrd_handler.update_packet_metrics(packet_record, cumulative_counts) - self.mqtt_handler.publish(packet_record, "packet") - self._publish_to_glass(packet_record, "packet") - # Broadcast to WebSocket clients for real-time updates - if self.websocket_available: - try: - self.websocket_broadcast_packet(packet_record) - - # Broadcast 24-hour packet stats (same as /api/packet_stats?hours=24) - packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24) - uptime_seconds = ( - time.time() - self.repeater_handler.start_time if self.repeater_handler else 0 - ) - - self.websocket_broadcast_stats( - { - "packet_stats": packet_stats_24h, - "system_stats": { - "uptime_seconds": uptime_seconds, - }, - } - ) - except Exception as e: - logger.debug(f"WebSocket broadcast failed: {e}") - - # Publish to LetsMesh if enabled (skip invalid packets if requested) - if skip_letsmesh_if_invalid and packet_record.get("drop_reason"): - logger.debug( - f"Skipping LetsMesh publish for packet with drop_reason: {packet_record.get('drop_reason')}" + # DEFERRED: Publish to network sinks and WebSocket in background tasks + # This prevents network latency from blocking packet processing + task = asyncio.create_task( + self._deferred_publish( + packet_record, skip_letsmesh_if_invalid, cumulative_counts ) - else: - self._publish_to_letsmesh(packet_record) + ) + self._track_task(task) + + async def _deferred_publish(self, packet_record: dict, skip_letsmesh: bool, cumulative_counts: dict): + """Deferred background task for all network publishing operations.""" + 
try: + # Publish to local MQTT + self.mqtt_handler.publish(packet_record, "packet") + self._publish_to_glass(packet_record, "packet") + + # Broadcast to WebSocket clients with stats + if self.websocket_available: + try: + self.websocket_broadcast_packet(packet_record) + packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24) + uptime_seconds = ( + time.time() - self.repeater_handler.start_time + if self.repeater_handler + else 0 + ) + self.websocket_broadcast_stats( + { + "packet_stats": packet_stats_24h, + "system_stats": {"uptime_seconds": uptime_seconds}, + } + ) + except Exception as e: + logger.debug(f"WebSocket broadcast failed: {e}") + + # Publish to LetsMesh if enabled + if skip_letsmesh and packet_record.get("drop_reason"): + logger.debug( + f"Skipping LetsMesh publish for packet with drop_reason: {packet_record.get('drop_reason')}" + ) + else: + self._publish_to_letsmesh(packet_record) + except Exception as e: + logger.error(f"Deferred publish failed: {e}", exc_info=True) def _publish_to_letsmesh(self, packet_record: dict): """Publish packet to LetsMesh broker if enabled and allowed""" @@ -210,22 +240,57 @@ class StorageCollector: logger.error(f"Failed to publish packet to LetsMesh: {e}", exc_info=True) def record_advert(self, advert_record: dict): + """Record advert to storage and defer network publishing to background tasks.""" self.sqlite_handler.store_advert(advert_record) - self.mqtt_handler.publish(advert_record, "advert") - self._publish_to_glass(advert_record, "advert") + # Defer MQTT and Glass publishing to background task + task = asyncio.create_task( + self._deferred_publish_advert(advert_record) + ) + self._track_task(task) + + async def _deferred_publish_advert(self, advert_record: dict): + """Deferred background task for advert publishing.""" + try: + self.mqtt_handler.publish(advert_record, "advert") + self._publish_to_glass(advert_record, "advert") + except Exception as e: + logger.error(f"Deferred advert publish failed: {e}", exc_info=True) def record_noise_floor(self, noise_floor_dbm: float): + """Record noise floor to storage and defer network publishing to background tasks.""" noise_record = {"timestamp": time.time(), "noise_floor_dbm": noise_floor_dbm} self.sqlite_handler.store_noise_floor(noise_record) - self.mqtt_handler.publish(noise_record, "noise_floor") - self._publish_to_glass(noise_record, "noise_floor") + # Defer MQTT and Glass publishing to background task + task = asyncio.create_task( + self._deferred_publish_noise_floor(noise_record) + ) + self._track_task(task) + + async def _deferred_publish_noise_floor(self, noise_record: dict): + """Deferred background task for noise floor publishing.""" + try: + self.mqtt_handler.publish(noise_record, "noise_floor") + self._publish_to_glass(noise_record, "noise_floor") + except Exception as e: + logger.error(f"Deferred noise floor publish failed: {e}", exc_info=True) def record_crc_errors(self, count: int): - """Record a batch of CRC errors detected since last poll.""" + """Record a batch of CRC errors detected since last poll and defer publishing.""" crc_record = {"timestamp": time.time(), "count": count} self.sqlite_handler.store_crc_errors(crc_record) - self.mqtt_handler.publish(crc_record, "crc_errors") - self._publish_to_glass(crc_record, "crc_errors") + # Defer MQTT and Glass publishing to background task + task = asyncio.create_task( + self._deferred_publish_crc_errors(crc_record) + ) + self._track_task(task) + + async def _deferred_publish_crc_errors(self, crc_record: dict): + """Deferred 
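One caveat about the deferred coroutines: they still call synchronous handlers (MQTT publish, Glass publish, SQLite stats queries), so when they eventually run they occupy the event loop for the duration of those calls. Deferral removes the cost from the packet hot path but does not make the publishing itself non-blocking. If that ever becomes a problem, a hedged variant, not what this patch does, is to push the blocking calls onto the default thread pool (requires Python 3.9+ for `asyncio.to_thread`; `mqtt_handler` and `publish_to_glass` stand in for the real handlers):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

# Hypothetical sketch: run the blocking publishers off the event loop.
async def deferred_publish_advert(mqtt_handler, publish_to_glass, advert_record: dict) -> None:
    try:
        await asyncio.to_thread(mqtt_handler.publish, advert_record, "advert")
        await asyncio.to_thread(publish_to_glass, advert_record, "advert")
    except Exception:
        logger.exception("Deferred advert publish failed")
```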
background task for CRC error publishing.""" + try: + self.mqtt_handler.publish(crc_record, "crc_errors") + self._publish_to_glass(crc_record, "crc_errors") + except Exception as e: + logger.error(f"Deferred CRC errors publish failed: {e}", exc_info=True) def get_crc_error_count(self, hours: int = 24) -> int: return self.sqlite_handler.get_crc_error_count(hours) @@ -318,6 +383,11 @@ class StorageCollector: return self.sqlite_handler.get_noise_floor_stats(hours) def close(self): + # Cancel all pending background tasks + for task in self._pending_tasks: + if not task.done(): + task.cancel() + self.mqtt_handler.close() if self.letsmesh_handler: try:
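`close()` cancels whatever is still in `_pending_tasks`, but cancellation only takes effect the next time the event loop schedules each task. A caller that can await therefore has the option of draining the set as well; the patch keeps `close()` synchronous, so the following is only a sketch of that variant with an illustrative `drain_pending` helper:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def drain_pending(pending: set, timeout: float = 5.0) -> None:
    """Sketch: cancel outstanding publish tasks, then wait briefly for them to finish."""
    for task in pending:
        if not task.done():
            task.cancel()
    if not pending:
        return
    done, still_running = await asyncio.wait(pending, timeout=timeout)
    for task in done:
        if not task.cancelled() and task.exception() is not None:
            logger.error("Publish task failed during shutdown: %s", task.exception())
    for task in still_running:
        logger.warning("Publish task did not stop within %.1fs: %r", timeout, task)
```

The explicit wait guarantees that any publish already in flight either completes or is logged before the MQTT and LetsMesh handlers are closed underneath it.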