From be56e919fd636f4e1714757bfa7deca51f5b56a3 Mon Sep 17 00:00:00 2001 From: Lloyd Date: Tue, 21 Apr 2026 14:46:30 +0100 Subject: [PATCH] feat: add server-side airtime bucket aggregation for optimized chart rendering --- repeater/data_acquisition/sqlite_handler.py | 77 +++++++++++++++++-- .../data_acquisition/storage_collector.py | 14 ++++ repeater/web/api_endpoints.py | 36 +++++++++ repeater/web/companion_endpoints.py | 14 ++-- repeater/web/http_server.py | 21 +++++ 5 files changed, 151 insertions(+), 11 deletions(-) diff --git a/repeater/data_acquisition/sqlite_handler.py b/repeater/data_acquisition/sqlite_handler.py index d37ed81..6f55ad1 100644 --- a/repeater/data_acquisition/sqlite_handler.py +++ b/repeater/data_acquisition/sqlite_handler.py @@ -852,9 +852,9 @@ class SQLiteHandler: SELECT timestamp, type, route, length, rssi, snr, score, transmitted, is_duplicate, drop_reason, src_hash, dst_hash, path_hash, - header, transport_codes, payload, payload_length, - tx_delay_ms, packet_hash, original_path, forwarded_path, raw_packet, - lbt_attempts, lbt_backoff_delays_ms, lbt_channel_busy + transport_codes, payload, payload_length, + tx_delay_ms, packet_hash, original_path, forwarded_path, + lbt_attempts, lbt_channel_busy FROM packets ORDER BY timestamp DESC LIMIT ? @@ -904,9 +904,9 @@ class SQLiteHandler: SELECT timestamp, type, route, length, rssi, snr, score, transmitted, is_duplicate, drop_reason, src_hash, dst_hash, path_hash, - header, transport_codes, payload, payload_length, - tx_delay_ms, packet_hash, original_path, forwarded_path, raw_packet, - lbt_attempts, lbt_backoff_delays_ms, lbt_channel_busy + transport_codes, payload, payload_length, + tx_delay_ms, packet_hash, original_path, forwarded_path, + lbt_attempts, lbt_channel_busy FROM packets """ @@ -955,6 +955,71 @@ class SQLiteHandler: logger.error(f"Failed to get airtime data: {e}") return [] + def get_airtime_buckets( + self, + start_timestamp: float, + end_timestamp: float, + bucket_seconds: int = 60, + sf: int = 9, + bw_hz: int = 62500, + cr: int = 5, + preamble: int = 17, + ) -> list: + """Return pre-aggregated airtime buckets for chart rendering. + + Applies the Semtech LoRa airtime formula server-side and groups results + into time buckets, drastically reducing response size vs raw packet rows. + """ + import math + + bw_khz = bw_hz / 1000 + t_sym = (2**sf) / bw_khz # ms per symbol + t_preamble = (preamble + 4.25) * t_sym + de = 1 if sf >= 11 and bw_hz <= 125000 else 0 + + def _airtime_ms(length_bytes: int) -> float: + length_bytes = max(length_bytes or 32, 1) + numerator = max(8 * length_bytes - 4 * sf + 28 + 16, 0) # CRC=1, H=0 + denominator = 4 * (sf - 2 * de) + n_payload = 8 + math.ceil(numerator / denominator) * cr + return t_preamble + n_payload * t_sym + + try: + with self._connect() as conn: + conn.row_factory = sqlite3.Row + rows = conn.execute( + "SELECT timestamp, length, transmitted FROM packets " + "WHERE timestamp >= ? AND timestamp <= ? ORDER BY timestamp ASC", + (start_timestamp, end_timestamp), + ).fetchall() + + buckets: dict = {} + rx_total = 0 + tx_total = 0 + for row in rows: + bucket_ts = int(row["timestamp"] / bucket_seconds) * bucket_seconds + ms = _airtime_ms(row["length"]) + if bucket_ts not in buckets: + buckets[bucket_ts] = {"timestamp": bucket_ts, "rx_ms": 0.0, "tx_ms": 0.0, "rx_count": 0, "tx_count": 0} + if row["transmitted"]: + buckets[bucket_ts]["tx_ms"] += ms + buckets[bucket_ts]["tx_count"] += 1 + tx_total += 1 + else: + buckets[bucket_ts]["rx_ms"] += ms + buckets[bucket_ts]["rx_count"] += 1 + rx_total += 1 + + return { + "buckets": sorted(buckets.values(), key=lambda x: x["timestamp"]), + "bucket_seconds": bucket_seconds, + "rx_total": rx_total, + "tx_total": tx_total, + } + except Exception as e: + logger.error(f"Failed to get airtime buckets: {e}") + return {"buckets": [], "bucket_seconds": bucket_seconds, "rx_total": 0, "tx_total": 0} + def get_packet_by_hash(self, packet_hash: str) -> Optional[dict]: try: with self._connect() as conn: diff --git a/repeater/data_acquisition/storage_collector.py b/repeater/data_acquisition/storage_collector.py index c980854..f800617 100644 --- a/repeater/data_acquisition/storage_collector.py +++ b/repeater/data_acquisition/storage_collector.py @@ -345,6 +345,20 @@ class StorageCollector: ) -> list: return self.sqlite_handler.get_airtime_data(start_timestamp, end_timestamp, limit) + def get_airtime_buckets( + self, + start_timestamp: float, + end_timestamp: float, + bucket_seconds: int = 60, + sf: int = 9, + bw_hz: int = 62500, + cr: int = 5, + preamble: int = 17, + ) -> dict: + return self.sqlite_handler.get_airtime_buckets( + start_timestamp, end_timestamp, bucket_seconds, sf, bw_hz, cr, preamble + ) + def get_packet_by_hash(self, packet_hash: str) -> Optional[dict]: return self.sqlite_handler.get_packet_by_hash(packet_hash) diff --git a/repeater/web/api_endpoints.py b/repeater/web/api_endpoints.py index ff65b3b..7b4f78a 100644 --- a/repeater/web/api_endpoints.py +++ b/repeater/web/api_endpoints.py @@ -1457,6 +1457,42 @@ class APIEndpoints: logger.error(f"Error getting airtime data: {e}") return self._error(e) + @cherrypy.expose + @cherrypy.tools.json_out() + def airtime_chart_data( + self, + start_timestamp=None, + end_timestamp=None, + bucket_seconds=60, + sf=9, + bw_hz=62500, + cr=5, + preamble=17, + ): + """Server-side aggregated airtime utilization for chart rendering. + + Returns pre-bucketed rx_ms/tx_ms per time bucket instead of raw packet rows, + reducing response size from potentially hundreds of KB to a few KB. + """ + try: + now = __import__("time").time() + start_ts = float(start_timestamp) if start_timestamp is not None else now - 86400 + end_ts = float(end_timestamp) if end_timestamp is not None else now + bucket_s = max(10, min(int(bucket_seconds), 3600)) + result = self._get_storage().get_airtime_buckets( + start_timestamp=start_ts, + end_timestamp=end_ts, + bucket_seconds=bucket_s, + sf=int(sf), + bw_hz=int(bw_hz), + cr=int(cr), + preamble=int(preamble), + ) + return self._success(result) + except Exception as e: + logger.error(f"Error getting airtime chart data: {e}") + return self._error(e) + @cherrypy.expose @cherrypy.tools.json_out() def packet_by_hash(self, packet_hash=None): diff --git a/repeater/web/companion_endpoints.py b/repeater/web/companion_endpoints.py index d33384d..7a5688d 100644 --- a/repeater/web/companion_endpoints.py +++ b/repeater/web/companion_endpoints.py @@ -37,6 +37,10 @@ class CompanionAPIEndpoints: self.config = config or {} self.config_manager = config_manager + http_cfg = self.config.get("http", {}) if isinstance(self.config, dict) else {} + self._sse_queue_maxsize = max(32, int(http_cfg.get("sse_queue_maxsize", 64))) + self._sse_keepalive_sec = max(5, int(http_cfg.get("sse_keepalive_sec", 15))) + # SSE clients: each gets a thread-safe queue self._sse_clients: list[queue.Queue] = [] self._sse_lock = threading.Lock() @@ -666,7 +670,7 @@ class CompanionAPIEndpoints: cherrypy.response.headers["Connection"] = "keep-alive" cherrypy.response.headers["X-Accel-Buffering"] = "no" - client_queue: queue.Queue = queue.Queue(maxsize=256) + client_queue: queue.Queue = queue.Queue(maxsize=self._sse_queue_maxsize) with self._sse_lock: self._sse_clients.append(client_queue) @@ -677,12 +681,12 @@ class CompanionAPIEndpoints: while True: try: - item = client_queue.get(timeout=15.0) + item = client_queue.get(timeout=float(self._sse_keepalive_sec)) yield f"data: {json.dumps(item)}\n\n" except queue.Empty: - # Keep-alive comment - payload = {"event": "keepalive", "timestamp": int(time.time())} - yield f"data: {json.dumps(payload)}\n\n" + # Keep-alive comment frame keeps EventSource connected + # without allocating additional JSON payload objects. + yield ": keepalive\n\n" except GeneratorExit: pass except Exception as exc: diff --git a/repeater/web/http_server.py b/repeater/web/http_server.py index a7aa4ca..2067313 100644 --- a/repeater/web/http_server.py +++ b/repeater/web/http_server.py @@ -432,10 +432,17 @@ class HTTPStatsServer: config["/_next"]["cors.expose.on"] = True config["/favicon.ico"]["cors.expose.on"] = True + http_cfg = self.config.get("http", {}) if isinstance(self.config, dict) else {} + thread_pool = max(2, int(http_cfg.get("thread_pool", 8))) + thread_pool_max = max(thread_pool, int(http_cfg.get("thread_pool_max", 16))) + socket_timeout = max(15, int(http_cfg.get("socket_timeout", 65))) + socket_queue_size = max(10, int(http_cfg.get("socket_queue_size", 100))) + cherrypy.config.update( { "server.socket_host": self.host, "server.socket_port": self.port, + "server.socket_queue_size": socket_queue_size, "engine.autoreload.on": False, "log.screen": False, "log.access_file": "", # Disable access log file @@ -447,8 +454,22 @@ class HTTPStatsServer: # Add auth handlers to config so they're accessible in endpoints "jwt_handler": self.jwt_handler, "token_manager": self.token_manager, + # Bound the thread pool to prevent unbounded growth. + # SSE streams each hold one thread; allow headroom for concurrent + # SSE clients plus normal API polling without growing unboundedly. + "server.thread_pool": thread_pool, + "server.thread_pool_max": thread_pool_max, + # Close idle/stale connections so their threads return to the pool. + "server.socket_timeout": socket_timeout, } ) + logger.info( + "HTTP worker config: thread_pool=%s, thread_pool_max=%s, socket_timeout=%ss, socket_queue_size=%s", + thread_pool, + thread_pool_max, + socket_timeout, + socket_queue_size, + ) # Mount main app cherrypy.tree.mount(self.app, "/", config)