mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-05-05 13:12:15 +02:00
feat: implement deferred network publishing for packets, adverts, and noise floor records
This commit is contained in:
@@ -19,6 +19,10 @@ class RRDToolHandler:
|
||||
self.rrd_path = self.storage_dir / "metrics.rrd"
|
||||
self.available = RRDTOOL_AVAILABLE
|
||||
self._init_rrd()
|
||||
# Batch RRD updates: track pending update and last cached info
|
||||
self._pending_rrd_update = None
|
||||
self._last_rrd_info_time = 0
|
||||
self._last_rrd_info_cache = None
|
||||
|
||||
def _init_rrd(self):
|
||||
if not self.available:
|
||||
@@ -73,20 +77,41 @@ class RRDToolHandler:
|
||||
logger.error(f"Failed to create RRD database: {e}")
|
||||
|
||||
def update_packet_metrics(self, record: dict, cumulative_counts: dict):
|
||||
"""Buffer packet metrics for batch RRD update instead of per-packet writes.
|
||||
|
||||
RRD uses 60-second time steps, so we batch updates within each period
|
||||
and only write when the time period changes or buffer is full.
|
||||
"""
|
||||
if not self.available or not self.rrd_path.exists():
|
||||
return
|
||||
|
||||
try:
|
||||
timestamp = int(record.get("timestamp", time.time()))
|
||||
|
||||
try:
|
||||
info = rrdtool.info(str(self.rrd_path))
|
||||
last_update = int(info.get("last_update", timestamp - 60))
|
||||
if timestamp <= last_update:
|
||||
# Cache RRD info for up to 5 seconds to avoid repeated rrdtool.info() calls
|
||||
now = time.time()
|
||||
if now - self._last_rrd_info_time > 5 or self._last_rrd_info_cache is None:
|
||||
try:
|
||||
self._last_rrd_info_cache = rrdtool.info(str(self.rrd_path))
|
||||
self._last_rrd_info_time = now
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to cache RRD info: {e}")
|
||||
self._last_rrd_info_cache = None
|
||||
return
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to get RRD info for packet update: {e}")
|
||||
|
||||
if self._last_rrd_info_cache is None:
|
||||
return
|
||||
|
||||
last_update = int(self._last_rrd_info_cache.get("last_update", timestamp - 60))
|
||||
|
||||
# Skip if timestamp is in same or earlier time period than last update
|
||||
# (RRD step is 60 seconds)
|
||||
if timestamp <= last_update:
|
||||
# But still buffer cumulative counts for when we do update
|
||||
self._pending_rrd_update = (timestamp, cumulative_counts, record)
|
||||
return
|
||||
|
||||
# Build update string from cumulative counts
|
||||
rx_total = cumulative_counts.get("rx_total", 0)
|
||||
tx_total = cumulative_counts.get("tx_total", 0)
|
||||
drop_total = cumulative_counts.get("drop_total", 0)
|
||||
@@ -97,7 +122,6 @@ class RRDToolHandler:
|
||||
type_values.append(str(type_counts.get(f"type_{i}", 0)))
|
||||
type_values.append(str(type_counts.get("type_other", 0)))
|
||||
|
||||
# Handle None values for TX packets - use 'U' (unknown) for RRD
|
||||
rssi = record.get("rssi")
|
||||
snr = record.get("snr")
|
||||
score = record.get("score")
|
||||
@@ -116,7 +140,11 @@ class RRDToolHandler:
|
||||
type_values_str = ":".join(type_values)
|
||||
values = f"{basic_values}:{type_values_str}"
|
||||
|
||||
# Write to RRD - this is now only called once per 60-second period
|
||||
rrdtool.update(str(self.rrd_path), values)
|
||||
# Invalidate cache so next period fetches fresh info
|
||||
self._last_rrd_info_cache = None
|
||||
self._pending_rrd_update = None
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update RRD packet metrics: {e}")
|
||||
|
||||
@@ -1856,59 +1856,32 @@ class SQLiteHandler:
|
||||
return 0
|
||||
|
||||
def upsert_client_sync(self, room_hash: str, client_pubkey: str, **kwargs) -> bool:
|
||||
"""Insert or update client sync state."""
|
||||
"""Insert or update client sync state using single upsert operation."""
|
||||
try:
|
||||
with self._connect() as conn:
|
||||
# Check if exists
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
SELECT id FROM room_client_sync
|
||||
WHERE room_hash = ? AND client_pubkey = ?
|
||||
now = time.time()
|
||||
kwargs["updated_at"] = now
|
||||
|
||||
# Set defaults for insert path
|
||||
kwargs.setdefault("sync_since", 0)
|
||||
kwargs.setdefault("pending_ack_crc", 0)
|
||||
kwargs.setdefault("push_post_timestamp", 0)
|
||||
kwargs.setdefault("ack_timeout_time", 0)
|
||||
kwargs.setdefault("push_failures", 0)
|
||||
kwargs.setdefault("last_activity", now)
|
||||
|
||||
columns = ["room_hash", "client_pubkey"] + list(kwargs.keys())
|
||||
placeholders = ["?"] * len(columns)
|
||||
values = [room_hash, client_pubkey] + list(kwargs.values())
|
||||
|
||||
# Use INSERT OR REPLACE for single atomic upsert
|
||||
conn.execute(
|
||||
f"""
|
||||
INSERT OR REPLACE INTO room_client_sync ({', '.join(columns)})
|
||||
VALUES ({', '.join(placeholders)})
|
||||
""",
|
||||
(room_hash, client_pubkey),
|
||||
values,
|
||||
)
|
||||
existing = cursor.fetchone()
|
||||
|
||||
kwargs["updated_at"] = time.time()
|
||||
|
||||
if existing:
|
||||
# Update
|
||||
set_clauses = []
|
||||
values = []
|
||||
for key, value in kwargs.items():
|
||||
set_clauses.append(f"{key} = ?")
|
||||
values.append(value)
|
||||
values.extend([room_hash, client_pubkey])
|
||||
|
||||
conn.execute(
|
||||
f"""
|
||||
UPDATE room_client_sync
|
||||
SET {', '.join(set_clauses)}
|
||||
WHERE room_hash = ? AND client_pubkey = ?
|
||||
""",
|
||||
values,
|
||||
)
|
||||
else:
|
||||
# Insert with defaults
|
||||
kwargs.setdefault("sync_since", 0)
|
||||
kwargs.setdefault("pending_ack_crc", 0)
|
||||
kwargs.setdefault("push_post_timestamp", 0)
|
||||
kwargs.setdefault("ack_timeout_time", 0)
|
||||
kwargs.setdefault("push_failures", 0)
|
||||
kwargs.setdefault("last_activity", time.time())
|
||||
|
||||
columns = ["room_hash", "client_pubkey"] + list(kwargs.keys())
|
||||
placeholders = ["?"] * len(columns)
|
||||
values = [room_hash, client_pubkey] + list(kwargs.values())
|
||||
|
||||
conn.execute(
|
||||
f"""
|
||||
INSERT INTO room_client_sync ({', '.join(columns)})
|
||||
VALUES ({', '.join(placeholders)})
|
||||
""",
|
||||
values,
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
return True
|
||||
except Exception as e:
|
||||
@@ -2110,36 +2083,41 @@ class SQLiteHandler:
|
||||
return []
|
||||
|
||||
def companion_save_contacts(self, companion_hash: str, contacts: List[Dict]) -> bool:
|
||||
"""Replace all contacts for a companion in storage."""
|
||||
"""Replace all contacts for a companion in storage using batch insert."""
|
||||
try:
|
||||
with self._connect() as conn:
|
||||
conn.execute(
|
||||
"DELETE FROM companion_contacts WHERE companion_hash = ?", (companion_hash,)
|
||||
)
|
||||
now = time.time()
|
||||
for c in contacts:
|
||||
conn.execute(
|
||||
# Batch insert all contacts at once instead of loop-based inserts
|
||||
rows = [
|
||||
(
|
||||
companion_hash,
|
||||
c.get("pubkey", b""),
|
||||
c.get("name", ""),
|
||||
c.get("adv_type", 0),
|
||||
c.get("flags", 0),
|
||||
c.get("out_path_len", -1),
|
||||
c.get("out_path", b""),
|
||||
c.get("last_advert_timestamp", 0),
|
||||
c.get("lastmod", 0),
|
||||
c.get("gps_lat", 0.0),
|
||||
c.get("gps_lon", 0.0),
|
||||
c.get("sync_since", 0),
|
||||
now,
|
||||
)
|
||||
for c in contacts
|
||||
]
|
||||
if rows:
|
||||
conn.executemany(
|
||||
"""
|
||||
INSERT INTO companion_contacts
|
||||
(companion_hash, pubkey, name, adv_type, flags, out_path_len, out_path,
|
||||
last_advert_timestamp, lastmod, gps_lat, gps_lon, sync_since, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
companion_hash,
|
||||
c.get("pubkey", b""),
|
||||
c.get("name", ""),
|
||||
c.get("adv_type", 0),
|
||||
c.get("flags", 0),
|
||||
c.get("out_path_len", -1),
|
||||
c.get("out_path", b""),
|
||||
c.get("last_advert_timestamp", 0),
|
||||
c.get("lastmod", 0),
|
||||
c.get("gps_lat", 0.0),
|
||||
c.get("gps_lon", 0.0),
|
||||
c.get("sync_since", 0),
|
||||
now,
|
||||
),
|
||||
rows,
|
||||
)
|
||||
conn.commit()
|
||||
return True
|
||||
@@ -2226,23 +2204,53 @@ class SQLiteHandler:
|
||||
params.append(limit)
|
||||
rows = conn.execute(query, params).fetchall()
|
||||
|
||||
count = 0
|
||||
# Batch insert all contacts at once instead of loop-based upserts
|
||||
now = time.time()
|
||||
contact_rows = []
|
||||
for row in rows:
|
||||
raw_type = row["contact_type"] or ""
|
||||
normalized_type = raw_type.lower().replace(" ", "_").strip()
|
||||
adv_type = type_map.get(normalized_type, 0)
|
||||
contact = {
|
||||
"pubkey": bytes.fromhex(row["pubkey"]),
|
||||
"name": row["node_name"] or "",
|
||||
"adv_type": adv_type,
|
||||
"gps_lat": row["latitude"] or 0.0,
|
||||
"gps_lon": row["longitude"] or 0.0,
|
||||
"last_advert_timestamp": int(row["last_seen"] or 0),
|
||||
"lastmod": int(row["last_seen"] or 0),
|
||||
}
|
||||
if self.companion_upsert_contact(companion_hash, contact):
|
||||
count += 1
|
||||
return count
|
||||
contact_rows.append(
|
||||
(
|
||||
companion_hash,
|
||||
bytes.fromhex(row["pubkey"]),
|
||||
row["node_name"] or "",
|
||||
adv_type,
|
||||
0, # flags
|
||||
-1, # out_path_len
|
||||
b"", # out_path
|
||||
int(row["last_seen"] or 0), # last_advert_timestamp
|
||||
int(row["last_seen"] or 0), # lastmod
|
||||
row["latitude"] or 0.0, # gps_lat
|
||||
row["longitude"] or 0.0, # gps_lon
|
||||
0, # sync_since
|
||||
now, # updated_at
|
||||
)
|
||||
)
|
||||
|
||||
if contact_rows:
|
||||
with self._connect() as conn:
|
||||
conn.executemany(
|
||||
"""
|
||||
INSERT INTO companion_contacts
|
||||
(companion_hash, pubkey, name, adv_type, flags, out_path_len, out_path,
|
||||
last_advert_timestamp, lastmod, gps_lat, gps_lon, sync_since, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(companion_hash, pubkey)
|
||||
DO UPDATE SET
|
||||
name=excluded.name, adv_type=excluded.adv_type,
|
||||
flags=excluded.flags, out_path_len=excluded.out_path_len,
|
||||
out_path=excluded.out_path,
|
||||
last_advert_timestamp=excluded.last_advert_timestamp,
|
||||
lastmod=excluded.lastmod, gps_lat=excluded.gps_lat,
|
||||
gps_lon=excluded.gps_lon, sync_since=excluded.sync_since,
|
||||
updated_at=excluded.updated_at
|
||||
""",
|
||||
contact_rows,
|
||||
)
|
||||
conn.commit()
|
||||
return len(contact_rows)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to import repeater contacts: {e}")
|
||||
return 0
|
||||
@@ -2301,27 +2309,32 @@ class SQLiteHandler:
|
||||
return []
|
||||
|
||||
def companion_save_channels(self, companion_hash: str, channels: List[Dict]) -> bool:
|
||||
"""Replace all channels for a companion in storage."""
|
||||
"""Replace all channels for a companion in storage using batch insert."""
|
||||
try:
|
||||
with self._connect() as conn:
|
||||
conn.execute(
|
||||
"DELETE FROM companion_channels WHERE companion_hash = ?", (companion_hash,)
|
||||
)
|
||||
now = time.time()
|
||||
for ch in channels:
|
||||
conn.execute(
|
||||
# Batch insert all channels at once instead of loop-based inserts
|
||||
rows = [
|
||||
(
|
||||
companion_hash,
|
||||
ch.get("channel_idx", 0),
|
||||
ch.get("name", ""),
|
||||
ch.get("secret", b""),
|
||||
now,
|
||||
)
|
||||
for ch in channels
|
||||
]
|
||||
if rows:
|
||||
conn.executemany(
|
||||
"""
|
||||
INSERT INTO companion_channels
|
||||
(companion_hash, channel_idx, name, secret, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
companion_hash,
|
||||
ch.get("channel_idx", 0),
|
||||
ch.get("name", ""),
|
||||
ch.get("secret", b""),
|
||||
now,
|
||||
),
|
||||
rows,
|
||||
)
|
||||
conn.commit()
|
||||
return True
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
@@ -19,6 +20,7 @@ class StorageCollector:
|
||||
self.config = config
|
||||
self.repeater_handler = repeater_handler
|
||||
self.glass_publish_callback = None
|
||||
self._pending_tasks = set()
|
||||
|
||||
storage_dir_cfg = (
|
||||
config.get("storage", {}).get("storage_dir")
|
||||
@@ -86,6 +88,21 @@ class StorageCollector:
|
||||
except ImportError:
|
||||
logger.debug("WebSocket handler not available")
|
||||
|
||||
def _track_task(self, task: asyncio.Task):
|
||||
"""Track background task for lifecycle management and error handling."""
|
||||
self._pending_tasks.add(task)
|
||||
|
||||
def on_done(t: asyncio.Task):
|
||||
self._pending_tasks.discard(t)
|
||||
try:
|
||||
t.result()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"Background task error: {e}", exc_info=True)
|
||||
|
||||
task.add_done_callback(on_done)
|
||||
|
||||
def _get_live_stats(self) -> dict:
|
||||
"""Get live stats from RepeaterHandler"""
|
||||
if not self.repeater_handler:
|
||||
@@ -132,7 +149,7 @@ class StorageCollector:
|
||||
return stats
|
||||
|
||||
def record_packet(self, packet_record: dict, skip_letsmesh_if_invalid: bool = True):
|
||||
"""Record packet to storage and publish to MQTT/LetsMesh
|
||||
"""Record packet to storage and defer network publishing to background tasks.
|
||||
|
||||
Args:
|
||||
packet_record: Dictionary containing packet information
|
||||
@@ -143,42 +160,55 @@ class StorageCollector:
|
||||
f"transmitted={packet_record.get('transmitted')}"
|
||||
)
|
||||
|
||||
# Store to local databases and publish to local MQTT
|
||||
# HOT PATH: Store to local databases only (fast, non-blocking)
|
||||
self.sqlite_handler.store_packet(packet_record)
|
||||
cumulative_counts = self.sqlite_handler.get_cumulative_counts()
|
||||
self.rrd_handler.update_packet_metrics(packet_record, cumulative_counts)
|
||||
self.mqtt_handler.publish(packet_record, "packet")
|
||||
self._publish_to_glass(packet_record, "packet")
|
||||
|
||||
# Broadcast to WebSocket clients for real-time updates
|
||||
if self.websocket_available:
|
||||
try:
|
||||
self.websocket_broadcast_packet(packet_record)
|
||||
|
||||
# Broadcast 24-hour packet stats (same as /api/packet_stats?hours=24)
|
||||
packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24)
|
||||
uptime_seconds = (
|
||||
time.time() - self.repeater_handler.start_time if self.repeater_handler else 0
|
||||
)
|
||||
|
||||
self.websocket_broadcast_stats(
|
||||
{
|
||||
"packet_stats": packet_stats_24h,
|
||||
"system_stats": {
|
||||
"uptime_seconds": uptime_seconds,
|
||||
},
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(f"WebSocket broadcast failed: {e}")
|
||||
|
||||
# Publish to LetsMesh if enabled (skip invalid packets if requested)
|
||||
if skip_letsmesh_if_invalid and packet_record.get("drop_reason"):
|
||||
logger.debug(
|
||||
f"Skipping LetsMesh publish for packet with drop_reason: {packet_record.get('drop_reason')}"
|
||||
# DEFERRED: Publish to network sinks and WebSocket in background tasks
|
||||
# This prevents network latency from blocking packet processing
|
||||
task = asyncio.create_task(
|
||||
self._deferred_publish(
|
||||
packet_record, skip_letsmesh_if_invalid, cumulative_counts
|
||||
)
|
||||
else:
|
||||
self._publish_to_letsmesh(packet_record)
|
||||
)
|
||||
self._track_task(task)
|
||||
|
||||
async def _deferred_publish(self, packet_record: dict, skip_letsmesh: bool, cumulative_counts: dict):
|
||||
"""Deferred background task for all network publishing operations."""
|
||||
try:
|
||||
# Publish to local MQTT
|
||||
self.mqtt_handler.publish(packet_record, "packet")
|
||||
self._publish_to_glass(packet_record, "packet")
|
||||
|
||||
# Broadcast to WebSocket clients with stats
|
||||
if self.websocket_available:
|
||||
try:
|
||||
self.websocket_broadcast_packet(packet_record)
|
||||
packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24)
|
||||
uptime_seconds = (
|
||||
time.time() - self.repeater_handler.start_time
|
||||
if self.repeater_handler
|
||||
else 0
|
||||
)
|
||||
self.websocket_broadcast_stats(
|
||||
{
|
||||
"packet_stats": packet_stats_24h,
|
||||
"system_stats": {"uptime_seconds": uptime_seconds},
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(f"WebSocket broadcast failed: {e}")
|
||||
|
||||
# Publish to LetsMesh if enabled
|
||||
if skip_letsmesh and packet_record.get("drop_reason"):
|
||||
logger.debug(
|
||||
f"Skipping LetsMesh publish for packet with drop_reason: {packet_record.get('drop_reason')}"
|
||||
)
|
||||
else:
|
||||
self._publish_to_letsmesh(packet_record)
|
||||
except Exception as e:
|
||||
logger.error(f"Deferred publish failed: {e}", exc_info=True)
|
||||
|
||||
def _publish_to_letsmesh(self, packet_record: dict):
|
||||
"""Publish packet to LetsMesh broker if enabled and allowed"""
|
||||
@@ -210,22 +240,57 @@ class StorageCollector:
|
||||
logger.error(f"Failed to publish packet to LetsMesh: {e}", exc_info=True)
|
||||
|
||||
def record_advert(self, advert_record: dict):
|
||||
"""Record advert to storage and defer network publishing to background tasks."""
|
||||
self.sqlite_handler.store_advert(advert_record)
|
||||
self.mqtt_handler.publish(advert_record, "advert")
|
||||
self._publish_to_glass(advert_record, "advert")
|
||||
# Defer MQTT and Glass publishing to background task
|
||||
task = asyncio.create_task(
|
||||
self._deferred_publish_advert(advert_record)
|
||||
)
|
||||
self._track_task(task)
|
||||
|
||||
async def _deferred_publish_advert(self, advert_record: dict):
|
||||
"""Deferred background task for advert publishing."""
|
||||
try:
|
||||
self.mqtt_handler.publish(advert_record, "advert")
|
||||
self._publish_to_glass(advert_record, "advert")
|
||||
except Exception as e:
|
||||
logger.error(f"Deferred advert publish failed: {e}", exc_info=True)
|
||||
|
||||
def record_noise_floor(self, noise_floor_dbm: float):
|
||||
"""Record noise floor to storage and defer network publishing to background tasks."""
|
||||
noise_record = {"timestamp": time.time(), "noise_floor_dbm": noise_floor_dbm}
|
||||
self.sqlite_handler.store_noise_floor(noise_record)
|
||||
self.mqtt_handler.publish(noise_record, "noise_floor")
|
||||
self._publish_to_glass(noise_record, "noise_floor")
|
||||
# Defer MQTT and Glass publishing to background task
|
||||
task = asyncio.create_task(
|
||||
self._deferred_publish_noise_floor(noise_record)
|
||||
)
|
||||
self._track_task(task)
|
||||
|
||||
async def _deferred_publish_noise_floor(self, noise_record: dict):
|
||||
"""Deferred background task for noise floor publishing."""
|
||||
try:
|
||||
self.mqtt_handler.publish(noise_record, "noise_floor")
|
||||
self._publish_to_glass(noise_record, "noise_floor")
|
||||
except Exception as e:
|
||||
logger.error(f"Deferred noise floor publish failed: {e}", exc_info=True)
|
||||
|
||||
def record_crc_errors(self, count: int):
|
||||
"""Record a batch of CRC errors detected since last poll."""
|
||||
"""Record a batch of CRC errors detected since last poll and defer publishing."""
|
||||
crc_record = {"timestamp": time.time(), "count": count}
|
||||
self.sqlite_handler.store_crc_errors(crc_record)
|
||||
self.mqtt_handler.publish(crc_record, "crc_errors")
|
||||
self._publish_to_glass(crc_record, "crc_errors")
|
||||
# Defer MQTT and Glass publishing to background task
|
||||
task = asyncio.create_task(
|
||||
self._deferred_publish_crc_errors(crc_record)
|
||||
)
|
||||
self._track_task(task)
|
||||
|
||||
async def _deferred_publish_crc_errors(self, crc_record: dict):
|
||||
"""Deferred background task for CRC error publishing."""
|
||||
try:
|
||||
self.mqtt_handler.publish(crc_record, "crc_errors")
|
||||
self._publish_to_glass(crc_record, "crc_errors")
|
||||
except Exception as e:
|
||||
logger.error(f"Deferred CRC errors publish failed: {e}", exc_info=True)
|
||||
|
||||
def get_crc_error_count(self, hours: int = 24) -> int:
|
||||
return self.sqlite_handler.get_crc_error_count(hours)
|
||||
@@ -318,6 +383,11 @@ class StorageCollector:
|
||||
return self.sqlite_handler.get_noise_floor_stats(hours)
|
||||
|
||||
def close(self):
|
||||
# Cancel all pending background tasks
|
||||
for task in self._pending_tasks:
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
|
||||
self.mqtt_handler.close()
|
||||
if self.letsmesh_handler:
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user