feat: add server-side airtime bucket aggregation for optimized chart rendering

This commit is contained in:
Lloyd
2026-04-21 14:46:30 +01:00
parent 81a3b70415
commit be56e919fd
5 changed files with 151 additions and 11 deletions

View File

@@ -852,9 +852,9 @@ class SQLiteHandler:
SELECT
timestamp, type, route, length, rssi, snr, score,
transmitted, is_duplicate, drop_reason, src_hash, dst_hash, path_hash,
header, transport_codes, payload, payload_length,
tx_delay_ms, packet_hash, original_path, forwarded_path, raw_packet,
lbt_attempts, lbt_backoff_delays_ms, lbt_channel_busy
transport_codes, payload, payload_length,
tx_delay_ms, packet_hash, original_path, forwarded_path,
lbt_attempts, lbt_channel_busy
FROM packets
ORDER BY timestamp DESC
LIMIT ?
@@ -904,9 +904,9 @@ class SQLiteHandler:
SELECT
timestamp, type, route, length, rssi, snr, score,
transmitted, is_duplicate, drop_reason, src_hash, dst_hash, path_hash,
header, transport_codes, payload, payload_length,
tx_delay_ms, packet_hash, original_path, forwarded_path, raw_packet,
lbt_attempts, lbt_backoff_delays_ms, lbt_channel_busy
transport_codes, payload, payload_length,
tx_delay_ms, packet_hash, original_path, forwarded_path,
lbt_attempts, lbt_channel_busy
FROM packets
"""
@@ -955,6 +955,71 @@ class SQLiteHandler:
logger.error(f"Failed to get airtime data: {e}")
return []
def get_airtime_buckets(
self,
start_timestamp: float,
end_timestamp: float,
bucket_seconds: int = 60,
sf: int = 9,
bw_hz: int = 62500,
cr: int = 5,
preamble: int = 17,
) -> list:
"""Return pre-aggregated airtime buckets for chart rendering.
Applies the Semtech LoRa airtime formula server-side and groups results
into time buckets, drastically reducing response size vs raw packet rows.
"""
import math
bw_khz = bw_hz / 1000
t_sym = (2**sf) / bw_khz # ms per symbol
t_preamble = (preamble + 4.25) * t_sym
de = 1 if sf >= 11 and bw_hz <= 125000 else 0
def _airtime_ms(length_bytes: int) -> float:
length_bytes = max(length_bytes or 32, 1)
numerator = max(8 * length_bytes - 4 * sf + 28 + 16, 0) # CRC=1, H=0
denominator = 4 * (sf - 2 * de)
n_payload = 8 + math.ceil(numerator / denominator) * cr
return t_preamble + n_payload * t_sym
try:
with self._connect() as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT timestamp, length, transmitted FROM packets "
"WHERE timestamp >= ? AND timestamp <= ? ORDER BY timestamp ASC",
(start_timestamp, end_timestamp),
).fetchall()
buckets: dict = {}
rx_total = 0
tx_total = 0
for row in rows:
bucket_ts = int(row["timestamp"] / bucket_seconds) * bucket_seconds
ms = _airtime_ms(row["length"])
if bucket_ts not in buckets:
buckets[bucket_ts] = {"timestamp": bucket_ts, "rx_ms": 0.0, "tx_ms": 0.0, "rx_count": 0, "tx_count": 0}
if row["transmitted"]:
buckets[bucket_ts]["tx_ms"] += ms
buckets[bucket_ts]["tx_count"] += 1
tx_total += 1
else:
buckets[bucket_ts]["rx_ms"] += ms
buckets[bucket_ts]["rx_count"] += 1
rx_total += 1
return {
"buckets": sorted(buckets.values(), key=lambda x: x["timestamp"]),
"bucket_seconds": bucket_seconds,
"rx_total": rx_total,
"tx_total": tx_total,
}
except Exception as e:
logger.error(f"Failed to get airtime buckets: {e}")
return {"buckets": [], "bucket_seconds": bucket_seconds, "rx_total": 0, "tx_total": 0}
def get_packet_by_hash(self, packet_hash: str) -> Optional[dict]:
try:
with self._connect() as conn:

View File

@@ -345,6 +345,20 @@ class StorageCollector:
) -> list:
return self.sqlite_handler.get_airtime_data(start_timestamp, end_timestamp, limit)
def get_airtime_buckets(
self,
start_timestamp: float,
end_timestamp: float,
bucket_seconds: int = 60,
sf: int = 9,
bw_hz: int = 62500,
cr: int = 5,
preamble: int = 17,
) -> dict:
return self.sqlite_handler.get_airtime_buckets(
start_timestamp, end_timestamp, bucket_seconds, sf, bw_hz, cr, preamble
)
def get_packet_by_hash(self, packet_hash: str) -> Optional[dict]:
return self.sqlite_handler.get_packet_by_hash(packet_hash)

View File

@@ -1457,6 +1457,42 @@ class APIEndpoints:
logger.error(f"Error getting airtime data: {e}")
return self._error(e)
@cherrypy.expose
@cherrypy.tools.json_out()
def airtime_chart_data(
self,
start_timestamp=None,
end_timestamp=None,
bucket_seconds=60,
sf=9,
bw_hz=62500,
cr=5,
preamble=17,
):
"""Server-side aggregated airtime utilization for chart rendering.
Returns pre-bucketed rx_ms/tx_ms per time bucket instead of raw packet rows,
reducing response size from potentially hundreds of KB to a few KB.
"""
try:
now = __import__("time").time()
start_ts = float(start_timestamp) if start_timestamp is not None else now - 86400
end_ts = float(end_timestamp) if end_timestamp is not None else now
bucket_s = max(10, min(int(bucket_seconds), 3600))
result = self._get_storage().get_airtime_buckets(
start_timestamp=start_ts,
end_timestamp=end_ts,
bucket_seconds=bucket_s,
sf=int(sf),
bw_hz=int(bw_hz),
cr=int(cr),
preamble=int(preamble),
)
return self._success(result)
except Exception as e:
logger.error(f"Error getting airtime chart data: {e}")
return self._error(e)
@cherrypy.expose
@cherrypy.tools.json_out()
def packet_by_hash(self, packet_hash=None):

View File

@@ -37,6 +37,10 @@ class CompanionAPIEndpoints:
self.config = config or {}
self.config_manager = config_manager
http_cfg = self.config.get("http", {}) if isinstance(self.config, dict) else {}
self._sse_queue_maxsize = max(32, int(http_cfg.get("sse_queue_maxsize", 64)))
self._sse_keepalive_sec = max(5, int(http_cfg.get("sse_keepalive_sec", 15)))
# SSE clients: each gets a thread-safe queue
self._sse_clients: list[queue.Queue] = []
self._sse_lock = threading.Lock()
@@ -666,7 +670,7 @@ class CompanionAPIEndpoints:
cherrypy.response.headers["Connection"] = "keep-alive"
cherrypy.response.headers["X-Accel-Buffering"] = "no"
client_queue: queue.Queue = queue.Queue(maxsize=256)
client_queue: queue.Queue = queue.Queue(maxsize=self._sse_queue_maxsize)
with self._sse_lock:
self._sse_clients.append(client_queue)
@@ -677,12 +681,12 @@ class CompanionAPIEndpoints:
while True:
try:
item = client_queue.get(timeout=15.0)
item = client_queue.get(timeout=float(self._sse_keepalive_sec))
yield f"data: {json.dumps(item)}\n\n"
except queue.Empty:
# Keep-alive comment
payload = {"event": "keepalive", "timestamp": int(time.time())}
yield f"data: {json.dumps(payload)}\n\n"
# Keep-alive comment frame keeps EventSource connected
# without allocating additional JSON payload objects.
yield ": keepalive\n\n"
except GeneratorExit:
pass
except Exception as exc:

View File

@@ -432,10 +432,17 @@ class HTTPStatsServer:
config["/_next"]["cors.expose.on"] = True
config["/favicon.ico"]["cors.expose.on"] = True
http_cfg = self.config.get("http", {}) if isinstance(self.config, dict) else {}
thread_pool = max(2, int(http_cfg.get("thread_pool", 8)))
thread_pool_max = max(thread_pool, int(http_cfg.get("thread_pool_max", 16)))
socket_timeout = max(15, int(http_cfg.get("socket_timeout", 65)))
socket_queue_size = max(10, int(http_cfg.get("socket_queue_size", 100)))
cherrypy.config.update(
{
"server.socket_host": self.host,
"server.socket_port": self.port,
"server.socket_queue_size": socket_queue_size,
"engine.autoreload.on": False,
"log.screen": False,
"log.access_file": "", # Disable access log file
@@ -447,8 +454,22 @@ class HTTPStatsServer:
# Add auth handlers to config so they're accessible in endpoints
"jwt_handler": self.jwt_handler,
"token_manager": self.token_manager,
# Bound the thread pool to prevent unbounded growth.
# SSE streams each hold one thread; allow headroom for concurrent
# SSE clients plus normal API polling without growing unboundedly.
"server.thread_pool": thread_pool,
"server.thread_pool_max": thread_pool_max,
# Close idle/stale connections so their threads return to the pool.
"server.socket_timeout": socket_timeout,
}
)
logger.info(
"HTTP worker config: thread_pool=%s, thread_pool_max=%s, socket_timeout=%ss, socket_queue_size=%s",
thread_pool,
thread_pool_max,
socket_timeout,
socket_queue_size,
)
# Mount main app
cherrypy.tree.mount(self.app, "/", config)