Compare commits

...

3 Commits

Author SHA1 Message Date
Jack Kingsman abe8390237 Test self-reexec on windows 2026-04-07 18:53:09 -07:00
Jack Kingsman 5fe0ac0ad4 Be more memory concious on recent contact fetch 2026-04-07 16:41:34 -07:00
Jack Kingsman b98102ccac Add 72hr packet density view 2026-04-07 16:26:01 -07:00
10 changed files with 332 additions and 4 deletions
+21
View File
@@ -12,6 +12,7 @@ from __future__ import annotations
import asyncio
import json
import logging
import sys
import time
from abc import ABC, abstractmethod
from typing import Any
@@ -252,6 +253,26 @@ class BaseMqttPublisher(ABC):
self._client = None
self._last_error = _format_error_detail(e)
# Windows ProactorEventLoop does not implement add_reader /
# add_writer, which paho-mqtt requires. Give a specific,
# actionable toast instead of the generic connection error.
if isinstance(e, NotImplementedError) and sys.platform == "win32":
broadcast_error(
"MQTT unavailable — Windows event loop incompatible",
"The default Windows event loop (ProactorEventLoop) does "
"not support MQTT. Restart with: uv run uvicorn "
"app.main:app --loop none",
)
_broadcast_health()
logger.error(
"%s cannot run: Windows ProactorEventLoop does not "
"implement add_reader/add_writer required by paho-mqtt. "
"Restart uvicorn with '--loop none' to use "
"SelectorEventLoop instead. Giving up (will not retry).",
self._integration_label(),
)
return
title, detail = self._on_error()
broadcast_error(title, detail)
_broadcast_health()
+117
View File
@@ -1,3 +1,120 @@
import os
import sys
# ---------------------------------------------------------------------------
# Windows event-loop fix for MQTT fanout (aiomqtt / paho-mqtt) compatibility
# ---------------------------------------------------------------------------
# On Windows, uvicorn's default "auto" loop explicitly creates ProactorEventLoop,
# which does NOT implement add_reader()/add_writer() — calls that paho-mqtt
# requires internally. Setting the event loop *policy* alone is not enough
# because uvicorn's "auto" factory bypasses it.
#
# The fix: re-exec the current process with "--loop none", which tells uvicorn
# to let asyncio.run() create the loop through the standard policy (where we
# have just installed WindowsSelectorEventLoopPolicy).
#
# Guards:
# - "--loop" already in argv → we (or the operator) already handled it
# - MESHCORE_NO_AUTO_LOOP_ON_WIN32=true → operator opt-out for custom
# runners, test harnesses, or other non-uvicorn invocations
# ---------------------------------------------------------------------------
_win32_needs_reexec = (
sys.platform == "win32"
and os.environ.get("MESHCORE_NO_AUTO_LOOP_ON_WIN32", "").lower() not in ("true", "1")
and "--loop" not in sys.argv
)
# Skip re-exec when --reload is active: on Windows os.execv spawns a new
# process and exits, so the reloader's child dies and a fresh uvicorn
# (with its own reloader) starts — creating doubled watchers or a loop.
# Also skip if sys.executable is missing (embedded / frozen Python).
if _win32_needs_reexec and "--reload" in sys.argv:
print(
"\n" + "!" * 78 + "\n"
" WINDOWS + --reload DETECTED\n" + "!" * 78 + "\n"
"\n"
" We can't auto-fix the event loop when --reload is active because\n"
" the re-exec would fight with uvicorn's reloader process.\n"
"\n"
" If you need MQTT fanout, add --loop none to your command:\n"
"\n"
" uv run uvicorn app.main:app --reload \033[1m--loop none\033[0m [... other options ...]\n"
"\n"
" Everything else works fine as-is.\n"
"\n" + "!" * 78 + "\n",
file=sys.stderr,
flush=True,
)
_win32_needs_reexec = False
if _win32_needs_reexec and not sys.executable:
# Embedded or frozen Python — can't re-exec, just warn.
_win32_needs_reexec = False
if _win32_needs_reexec:
import asyncio as _asyncio
_asyncio.set_event_loop_policy(
_asyncio.WindowsSelectorEventLoopPolicy() # type: ignore[attr-defined]
)
print(
"\n" + "=" * 78 + "\n"
" HALLO FRIEND WINDOWS USER <3 WE GOTTA ADJUST THINGS BEFORE YOU STARTUP\n"
+ "="
* 78
+ "\n"
"\n"
" uvicorn's default event loop on Windows (ProactorEventLoop) is not\n"
" compatible with aiomqtt/paho-mqtt, which require add_reader() /\n"
" add_writer(). Re-executing with '--loop none' so uvicorn honours\n"
" WindowsSelectorEventLoopPolicy and MQTT fanout can function.\n"
""
" In English: The code we use for MQTT is fussy. We're restarting\n"
" the server with the right settings for MQTT to work.\n"
"\n"
" This may or may not work :) If the app starts up after this without a warning, you're good to go.\n"
"\n" + "=" * 78 + "\n",
file=sys.stderr,
flush=True,
)
# sys.argv[0] on Windows is typically a .exe console-script launcher
# (e.g. .venv\Scripts\uvicorn.exe) which Python can't open as a script.
# use "python -m uvicorn" instead, forwarding the original arguments.
# yes, this is brittle as all hell.
try:
os.execv(
sys.executable,
[sys.executable, "-m", "uvicorn"] + sys.argv[1:] + ["--loop", "none"],
)
except Exception:
# execv failed — fall through and let the app start normally.
# MQTT fanout will not work, but everything else will.
print(
"\n" + "!" * 78 + "\n"
" AUTO-RESTART FAILED :<\n" + "!" * 78 + "\n"
"\n"
" We tried to restart uvicorn with the necessary settings\n"
" automatically, but there was a problem with the invocation\n"
" (not shocking; this is a fragile system).\n"
"\n"
" Please rerun RemoteTerm with a command like:\n"
"\n"
" uv run uvicorn app.main:app \033[1m--loop none\033[0m [... other options ...]\n"
"\n"
" Setting '--loop none' on uvicorn startup will put you in a good\n"
" state for MQTT and bypass this self-repair.\n"
"\n"
" The server is starting anyway -- everything except MQTT fanout\n"
" will work normally. If you want to suppress this attempt, \n"
" set the env var MESHCORE_NO_AUTO_LOOP_ON_WIN32=true\n"
"\n" + "!" * 78 + "\n",
file=sys.stderr,
flush=True,
)
# ---------------------------------------------------------------------------
import asyncio
import logging
from contextlib import asynccontextmanager
+6
View File
@@ -884,6 +884,11 @@ class NoiseFloorHistoryStats(BaseModel):
samples: list[NoiseFloorSample] = Field(default_factory=list)
class PacketsPerHourBucket(BaseModel):
timestamp: int = Field(description="Unix timestamp at the start of the hour")
count: int = Field(description="Number of packets received in that hour")
class StatisticsResponse(BaseModel):
busiest_channels_24h: list[BusyChannel]
contact_count: int
@@ -899,6 +904,7 @@ class StatisticsResponse(BaseModel):
repeaters_heard: ContactActivityCounts
known_channels_active: ContactActivityCounts
path_hash_width_24h: PathHashWidthStats
packets_per_hour_72h: list[PacketsPerHourBucket]
noise_floor_24h: NoiseFloorHistoryStats
+11 -4
View File
@@ -692,9 +692,18 @@ class ContactAdvertPathRepository:
cursor = await db.conn.execute(
"""
SELECT public_key, path_hex, path_len, first_seen, last_seen, heard_count
FROM contact_advert_paths
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY public_key
ORDER BY last_seen DESC, heard_count DESC, path_len ASC, path_hex ASC
) AS rn
FROM contact_advert_paths
)
WHERE rn <= ?
ORDER BY public_key ASC, last_seen DESC, heard_count DESC, path_len ASC, path_hex ASC
"""
""",
(limit_per_contact,),
)
rows = await cursor.fetchall()
@@ -705,8 +714,6 @@ class ContactAdvertPathRepository:
if paths is None:
paths = []
grouped[key] = paths
if len(paths) >= limit_per_contact:
continue
paths.append(ContactAdvertPathRepository._row_to_path(row))
return [
+22
View File
@@ -11,6 +11,7 @@ logger = logging.getLogger(__name__)
SECONDS_1H = 3600
SECONDS_24H = 86400
SECONDS_72H = 259200
SECONDS_7D = 604800
RAW_PACKET_STATS_BATCH_SIZE = 500
@@ -274,6 +275,25 @@ class StatisticsRepository:
"last_week": row["last_week"] or 0,
}
@staticmethod
async def _packets_per_hour_72h() -> list[dict[str, int]]:
"""Return packet counts bucketed by hour for the last 72 hours."""
now = int(time.time())
cutoff = now - SECONDS_72H
# Bucket timestamps to the start of each hour
cursor = await db.conn.execute(
"""
SELECT (timestamp / 3600) * 3600 AS hour_ts, COUNT(*) AS count
FROM raw_packets
WHERE timestamp >= ?
GROUP BY hour_ts
ORDER BY hour_ts
""",
(cutoff,),
)
rows = await cursor.fetchall()
return [{"timestamp": row["hour_ts"], "count": row["count"]} for row in rows]
@staticmethod
async def _path_hash_width_24h() -> dict[str, int | float]:
"""Count parsed raw packets from the last 24h by hop hash width."""
@@ -350,6 +370,7 @@ class StatisticsRepository:
repeaters_heard = await StatisticsRepository._activity_counts(contact_type=2)
known_channels_active = await StatisticsRepository._known_channels_active()
path_hash_width_24h = await StatisticsRepository._path_hash_width_24h()
packets_per_hour_72h = await StatisticsRepository._packets_per_hour_72h()
return {
"busiest_channels_24h": busiest_channels_24h,
@@ -366,4 +387,5 @@ class StatisticsRepository:
"repeaters_heard": repeaters_heard,
"known_channels_active": known_channels_active,
"path_hash_width_24h": path_hash_width_24h,
"packets_per_hour_72h": packets_per_hour_72h,
}
@@ -42,6 +42,87 @@ function formatTime(ts: number): string {
});
}
function formatDateTime(ts: number): string {
const d = new Date(ts * 1000);
return (
d.toLocaleDateString([], { month: 'short', day: 'numeric' }) +
' ' +
d.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit', hour12: false })
);
}
function PacketsPerHourChart({ buckets }: { buckets: { timestamp: number; count: number }[] }) {
// Fill gaps so hours with zero packets still appear on the chart
const filled: { timestamp: number; count: number }[] = [];
if (buckets.length > 0) {
const first = buckets[0].timestamp;
const last = buckets[buckets.length - 1].timestamp;
const byTs = new Map(buckets.map((b) => [b.timestamp, b.count]));
for (let ts = first; ts <= last; ts += 3600) {
filled.push({ timestamp: ts, count: byTs.get(ts) ?? 0 });
}
}
const data = filled.map((b, i) => ({
idx: i,
label: formatDateTime(b.timestamp),
count: b.count,
}));
// Show ~6 evenly-spaced tick labels
const tickCount = Math.min(6, data.length);
const tickIndices: number[] = [];
if (data.length > 1) {
for (let i = 0; i < tickCount; i++) {
tickIndices.push(Math.round((i / (tickCount - 1)) * (data.length - 1)));
}
}
return (
<ResponsiveContainer width="100%" height={140}>
<AreaChart data={data} margin={{ top: 4, right: 4, bottom: 0, left: -8 }}>
<CartesianGrid strokeDasharray="3 3" stroke="hsl(var(--border))" vertical={false} />
<XAxis
dataKey="idx"
type="number"
domain={[0, data.length - 1]}
tick={{ fontSize: 10, fill: 'hsl(var(--muted-foreground))' }}
tickLine={false}
axisLine={false}
ticks={tickIndices}
tickFormatter={(idx) => data[idx]?.label ?? ''}
/>
<YAxis
tick={{ fontSize: 10, fill: 'hsl(var(--muted-foreground))' }}
tickLine={false}
axisLine={false}
allowDecimals={false}
/>
<RechartsTooltip
{...TOOLTIP_STYLE}
cursor={{
stroke: 'hsl(var(--muted-foreground))',
strokeWidth: 1,
strokeDasharray: '3 3',
}}
labelFormatter={(idx) => data[Number(idx)]?.label ?? ''}
formatter={(value) => [`${Number(value).toLocaleString()} packets`, 'Count']}
/>
<Area
type="monotone"
dataKey="count"
stroke="#0ea5e9"
fill="#0ea5e9"
fillOpacity={0.15}
strokeWidth={1.5}
dot={false}
activeDot={{ r: 4, fill: '#0ea5e9', strokeWidth: 2, stroke: 'hsl(var(--popover))' }}
/>
</AreaChart>
</ResponsiveContainer>
);
}
function NoiseFloorChart({
samples,
}: {
@@ -241,6 +322,17 @@ export function SettingsStatisticsSection({ className }: { className?: string })
</div>
</div>
{/* Packets per Hour (72h) */}
{stats.packets_per_hour_72h?.length > 0 && (
<>
<Separator />
<div>
<h4 className="text-sm font-medium mb-2">Packets per Hour (72h)</h4>
<PacketsPerHourChart buckets={stats.packets_per_hour_72h} />
</div>
</>
)}
<Separator />
{/* Path Hash Width */}
+5
View File
@@ -652,6 +652,10 @@ describe('SettingsModal', () => {
double_byte_pct: 30,
triple_byte_pct: 20,
},
packets_per_hour_72h: [
{ timestamp: 1711792800, count: 12 },
{ timestamp: 1711796400, count: 8 },
],
noise_floor_24h: {
sample_interval_seconds: 300,
coverage_seconds: 3600,
@@ -722,6 +726,7 @@ describe('SettingsModal', () => {
double_byte_pct: 30,
triple_byte_pct: 20,
},
packets_per_hour_72h: [],
noise_floor_24h: {
sample_interval_seconds: 300,
coverage_seconds: 0,
+6
View File
@@ -544,6 +544,11 @@ export interface NoiseFloorHistoryStats {
samples: NoiseFloorSample[];
}
interface PacketsPerHourBucket {
timestamp: number;
count: number;
}
export interface StatisticsResponse {
busiest_channels_24h: BusyChannel[];
contact_count: number;
@@ -567,5 +572,6 @@ export interface StatisticsResponse {
double_byte_pct: number;
triple_byte_pct: number;
};
packets_per_hour_72h: PacketsPerHourBucket[];
noise_floor_24h: NoiseFloorHistoryStats;
}
+3
View File
@@ -53,6 +53,9 @@ ignore = [
"SIM117", # nested with statements - can be clearer in tests
]
[tool.ruff.lint.per-file-ignores]
"app/main.py" = ["E402"] # imports after Windows event-loop re-exec block
[tool.ruff.lint.isort]
known-first-party = ["app"]
+49
View File
@@ -43,6 +43,7 @@ class TestStatisticsEmpty:
"double_byte_pct": 0.0,
"triple_byte_pct": 0.0,
}
assert result["packets_per_hour_72h"] == []
class TestStatisticsCounts:
@@ -397,6 +398,54 @@ class TestPathHashWidthStats:
assert breakdown["triple_byte"] == 1
class TestPacketsPerHour:
@pytest.mark.asyncio
async def test_buckets_packets_by_hour(self, test_db):
"""Packets within 72h are bucketed by hour."""
now = int(time.time())
hour_start = (now // 3600) * 3600
conn = test_db.conn
# 3 packets in the current hour, 1 in the previous hour
for i in range(3):
await conn.execute(
"INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)",
(hour_start + i, b"\x01", bytes([i]) * 32),
)
await conn.execute(
"INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)",
(hour_start - 1800, b"\x02", b"\xaa" * 32),
)
# 1 packet outside the 72h window — should be excluded
await conn.execute(
"INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)",
(now - 260000, b"\x03", b"\xbb" * 32),
)
await conn.commit()
result = await StatisticsRepository.get_all()
buckets = result["packets_per_hour_72h"]
assert len(buckets) == 2
by_ts = {b["timestamp"]: b["count"] for b in buckets}
assert by_ts[hour_start] == 3
assert by_ts[hour_start - 3600] == 1
@pytest.mark.asyncio
async def test_empty_when_no_recent_packets(self, test_db):
"""Returns empty list when all packets are older than 72h."""
now = int(time.time())
conn = test_db.conn
await conn.execute(
"INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)",
(now - 300000, b"\x01", b"\x01" * 32),
)
await conn.commit()
result = await StatisticsRepository.get_all()
assert result["packets_per_hour_72h"] == []
class TestStatisticsEndpoint:
@pytest.mark.asyncio
async def test_statistics_endpoint_includes_noise_floor_history(self, test_db, client):