mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-05-09 23:05:10 +02:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| abe8390237 | |||
| 5fe0ac0ad4 | |||
| b98102ccac | |||
@@ -12,6 +12,7 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any
|
||||
@@ -252,6 +253,26 @@ class BaseMqttPublisher(ABC):
|
||||
self._client = None
|
||||
self._last_error = _format_error_detail(e)
|
||||
|
||||
# Windows ProactorEventLoop does not implement add_reader /
|
||||
# add_writer, which paho-mqtt requires. Give a specific,
|
||||
# actionable toast instead of the generic connection error.
|
||||
if isinstance(e, NotImplementedError) and sys.platform == "win32":
|
||||
broadcast_error(
|
||||
"MQTT unavailable — Windows event loop incompatible",
|
||||
"The default Windows event loop (ProactorEventLoop) does "
|
||||
"not support MQTT. Restart with: uv run uvicorn "
|
||||
"app.main:app --loop none",
|
||||
)
|
||||
_broadcast_health()
|
||||
logger.error(
|
||||
"%s cannot run: Windows ProactorEventLoop does not "
|
||||
"implement add_reader/add_writer required by paho-mqtt. "
|
||||
"Restart uvicorn with '--loop none' to use "
|
||||
"SelectorEventLoop instead. Giving up (will not retry).",
|
||||
self._integration_label(),
|
||||
)
|
||||
return
|
||||
|
||||
title, detail = self._on_error()
|
||||
broadcast_error(title, detail)
|
||||
_broadcast_health()
|
||||
|
||||
+117
@@ -1,3 +1,120 @@
|
||||
import os
import sys

# ---------------------------------------------------------------------------
# Windows event-loop fix for MQTT fanout (aiomqtt / paho-mqtt) compatibility
# ---------------------------------------------------------------------------
# On Windows, uvicorn's default "auto" loop explicitly creates ProactorEventLoop,
# which does NOT implement add_reader()/add_writer() — calls that paho-mqtt
# requires internally. Setting the event loop *policy* alone is not enough
# because uvicorn's "auto" factory bypasses it.
#
# The fix: re-exec the current process with "--loop none", which tells uvicorn
# to let asyncio.run() create the loop through the standard policy (where we
# have just installed WindowsSelectorEventLoopPolicy).
#
# Guards:
#   - "--loop" already in argv (either "--loop none" or "--loop=none" spelling)
#     → we (or the operator) already handled it
#   - MESHCORE_NO_AUTO_LOOP_ON_WIN32=true → operator opt-out for custom
#     runners, test harnesses, or other non-uvicorn invocations
# ---------------------------------------------------------------------------
_win32_needs_reexec = (
    sys.platform == "win32"
    and os.environ.get("MESHCORE_NO_AUTO_LOOP_ON_WIN32", "").lower() not in ("true", "1")
    # Match both "--loop none" and "--loop=none" so a handled invocation is
    # never re-exec'd with a duplicate --loop option.
    and not any(arg == "--loop" or arg.startswith("--loop=") for arg in sys.argv)
)

# Skip re-exec when --reload is active: on Windows os.execv spawns a new
# process and exits, so the reloader's child dies and a fresh uvicorn
# (with its own reloader) starts — creating doubled watchers or a loop.
# Also skip if sys.executable is missing (embedded / frozen Python).
if _win32_needs_reexec and "--reload" in sys.argv:
    print(
        "\n" + "!" * 78 + "\n"
        " WINDOWS + --reload DETECTED\n" + "!" * 78 + "\n"
        "\n"
        " We can't auto-fix the event loop when --reload is active because\n"
        " the re-exec would fight with uvicorn's reloader process.\n"
        "\n"
        " If you need MQTT fanout, add --loop none to your command:\n"
        "\n"
        " uv run uvicorn app.main:app --reload \033[1m--loop none\033[0m [... other options ...]\n"
        "\n"
        " Everything else works fine as-is.\n"
        "\n" + "!" * 78 + "\n",
        file=sys.stderr,
        flush=True,
    )
    _win32_needs_reexec = False

if _win32_needs_reexec and not sys.executable:
    # Embedded or frozen Python — can't re-exec, just warn.
    _win32_needs_reexec = False

if _win32_needs_reexec:
    import asyncio as _asyncio

    _asyncio.set_event_loop_policy(
        _asyncio.WindowsSelectorEventLoopPolicy()  # type: ignore[attr-defined]
    )

    print(
        "\n" + "=" * 78 + "\n"
        " HALLO FRIEND WINDOWS USER <3 WE GOTTA ADJUST THINGS BEFORE YOU STARTUP\n"
        + "=" * 78 + "\n"
        "\n"
        " uvicorn's default event loop on Windows (ProactorEventLoop) is not\n"
        " compatible with aiomqtt/paho-mqtt, which require add_reader() /\n"
        " add_writer(). Re-executing with '--loop none' so uvicorn honours\n"
        " WindowsSelectorEventLoopPolicy and MQTT fanout can function.\n"
        # Fixed: this separator was "" (no blank line) while every other
        # paragraph break in these banners uses "\n".
        "\n"
        " In English: The code we use for MQTT is fussy. We're restarting\n"
        " the server with the right settings for MQTT to work.\n"
        "\n"
        " This may or may not work :) If the app starts up after this without a warning, you're good to go.\n"
        "\n" + "=" * 78 + "\n",
        file=sys.stderr,
        flush=True,
    )

    # sys.argv[0] on Windows is typically a .exe console-script launcher
    # (e.g. .venv\Scripts\uvicorn.exe) which Python can't open as a script.
    # use "python -m uvicorn" instead, forwarding the original arguments.
    # yes, this is brittle as all hell.
    try:
        os.execv(
            sys.executable,
            [sys.executable, "-m", "uvicorn"] + sys.argv[1:] + ["--loop", "none"],
        )
    except Exception:
        # execv failed — fall through and let the app start normally.
        # MQTT fanout will not work, but everything else will.
        print(
            "\n" + "!" * 78 + "\n"
            " AUTO-RESTART FAILED :<\n" + "!" * 78 + "\n"
            "\n"
            " We tried to restart uvicorn with the necessary settings\n"
            " automatically, but there was a problem with the invocation\n"
            " (not shocking; this is a fragile system).\n"
            "\n"
            " Please rerun RemoteTerm with a command like:\n"
            "\n"
            " uv run uvicorn app.main:app \033[1m--loop none\033[0m [... other options ...]\n"
            "\n"
            " Setting '--loop none' on uvicorn startup will put you in a good\n"
            " state for MQTT and bypass this self-repair.\n"
            "\n"
            " The server is starting anyway -- everything except MQTT fanout\n"
            " will work normally. If you want to suppress this attempt, \n"
            " set the env var MESHCORE_NO_AUTO_LOOP_ON_WIN32=true\n"
            "\n" + "!" * 78 + "\n",
            file=sys.stderr,
            flush=True,
        )
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
@@ -884,6 +884,11 @@ class NoiseFloorHistoryStats(BaseModel):
|
||||
samples: list[NoiseFloorSample] = Field(default_factory=list)
|
||||
|
||||
|
||||
class PacketsPerHourBucket(BaseModel):
    # One bucket of the hourly packet histogram exposed as
    # StatisticsResponse.packets_per_hour_72h. A plain `#` comment is used
    # instead of a docstring so the pydantic JSON schema is unchanged.
    timestamp: int = Field(description="Unix timestamp at the start of the hour")
    count: int = Field(description="Number of packets received in that hour")
|
||||
|
||||
|
||||
class StatisticsResponse(BaseModel):
|
||||
busiest_channels_24h: list[BusyChannel]
|
||||
contact_count: int
|
||||
@@ -899,6 +904,7 @@ class StatisticsResponse(BaseModel):
|
||||
repeaters_heard: ContactActivityCounts
|
||||
known_channels_active: ContactActivityCounts
|
||||
path_hash_width_24h: PathHashWidthStats
|
||||
packets_per_hour_72h: list[PacketsPerHourBucket]
|
||||
noise_floor_24h: NoiseFloorHistoryStats
|
||||
|
||||
|
||||
|
||||
@@ -692,9 +692,18 @@ class ContactAdvertPathRepository:
|
||||
cursor = await db.conn.execute(
|
||||
"""
|
||||
SELECT public_key, path_hex, path_len, first_seen, last_seen, heard_count
|
||||
FROM contact_advert_paths
|
||||
FROM (
|
||||
SELECT *,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY public_key
|
||||
ORDER BY last_seen DESC, heard_count DESC, path_len ASC, path_hex ASC
|
||||
) AS rn
|
||||
FROM contact_advert_paths
|
||||
)
|
||||
WHERE rn <= ?
|
||||
ORDER BY public_key ASC, last_seen DESC, heard_count DESC, path_len ASC, path_hex ASC
|
||||
"""
|
||||
""",
|
||||
(limit_per_contact,),
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
|
||||
@@ -705,8 +714,6 @@ class ContactAdvertPathRepository:
|
||||
if paths is None:
|
||||
paths = []
|
||||
grouped[key] = paths
|
||||
if len(paths) >= limit_per_contact:
|
||||
continue
|
||||
paths.append(ContactAdvertPathRepository._row_to_path(row))
|
||||
|
||||
return [
|
||||
|
||||
@@ -11,6 +11,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
SECONDS_1H = 3600
|
||||
SECONDS_24H = 86400
|
||||
SECONDS_72H = 259200
|
||||
SECONDS_7D = 604800
|
||||
RAW_PACKET_STATS_BATCH_SIZE = 500
|
||||
|
||||
@@ -274,6 +275,25 @@ class StatisticsRepository:
|
||||
"last_week": row["last_week"] or 0,
|
||||
}
|
||||
|
||||
@staticmethod
async def _packets_per_hour_72h() -> list[dict[str, int]]:
    """Return per-hour packet counts over the trailing 72-hour window.

    Each entry has "timestamp" (Unix seconds at the start of the hour)
    and "count" (packets received during that hour). Hours with no
    packets produce no row, so gaps are possible in the result.
    """
    window_start = int(time.time()) - SECONDS_72H
    # Integer division truncates each timestamp down to the top of its hour.
    cursor = await db.conn.execute(
        """
        SELECT (timestamp / 3600) * 3600 AS hour_ts, COUNT(*) AS count
        FROM raw_packets
        WHERE timestamp >= ?
        GROUP BY hour_ts
        ORDER BY hour_ts
        """,
        (window_start,),
    )
    buckets: list[dict[str, int]] = []
    for row in await cursor.fetchall():
        buckets.append({"timestamp": row["hour_ts"], "count": row["count"]})
    return buckets
|
||||
|
||||
@staticmethod
|
||||
async def _path_hash_width_24h() -> dict[str, int | float]:
|
||||
"""Count parsed raw packets from the last 24h by hop hash width."""
|
||||
@@ -350,6 +370,7 @@ class StatisticsRepository:
|
||||
repeaters_heard = await StatisticsRepository._activity_counts(contact_type=2)
|
||||
known_channels_active = await StatisticsRepository._known_channels_active()
|
||||
path_hash_width_24h = await StatisticsRepository._path_hash_width_24h()
|
||||
packets_per_hour_72h = await StatisticsRepository._packets_per_hour_72h()
|
||||
|
||||
return {
|
||||
"busiest_channels_24h": busiest_channels_24h,
|
||||
@@ -366,4 +387,5 @@ class StatisticsRepository:
|
||||
"repeaters_heard": repeaters_heard,
|
||||
"known_channels_active": known_channels_active,
|
||||
"path_hash_width_24h": path_hash_width_24h,
|
||||
"packets_per_hour_72h": packets_per_hour_72h,
|
||||
}
|
||||
|
||||
@@ -42,6 +42,87 @@ function formatTime(ts: number): string {
|
||||
});
|
||||
}
|
||||
|
||||
function formatDateTime(ts: number): string {
|
||||
const d = new Date(ts * 1000);
|
||||
return (
|
||||
d.toLocaleDateString([], { month: 'short', day: 'numeric' }) +
|
||||
' ' +
|
||||
d.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit', hour12: false })
|
||||
);
|
||||
}
|
||||
|
||||
/**
 * Area chart of packet counts per hour for the trailing 72-hour window.
 *
 * `buckets` is the packets_per_hour_72h series from the statistics API.
 * NOTE(review): the gap-fill below assumes buckets arrive sorted ascending
 * by timestamp — the backend's ORDER BY suggests so; confirm at the caller.
 */
function PacketsPerHourChart({ buckets }: { buckets: { timestamp: number; count: number }[] }) {
  // Fill gaps so hours with zero packets still appear on the chart
  const filled: { timestamp: number; count: number }[] = [];
  if (buckets.length > 0) {
    const first = buckets[0].timestamp;
    const last = buckets[buckets.length - 1].timestamp;
    const byTs = new Map(buckets.map((b) => [b.timestamp, b.count]));
    // Walk hour-by-hour (3600 s) across the observed span; missing hours
    // get an explicit zero so the area chart doesn't interpolate over them.
    for (let ts = first; ts <= last; ts += 3600) {
      filled.push({ timestamp: ts, count: byTs.get(ts) ?? 0 });
    }
  }

  // Re-key points by array index so the x-axis spacing is uniform; the
  // human-readable time only appears in tick labels and tooltips.
  const data = filled.map((b, i) => ({
    idx: i,
    label: formatDateTime(b.timestamp),
    count: b.count,
  }));

  // Show ~6 evenly-spaced tick labels
  const tickCount = Math.min(6, data.length);
  const tickIndices: number[] = [];
  if (data.length > 1) {
    for (let i = 0; i < tickCount; i++) {
      tickIndices.push(Math.round((i / (tickCount - 1)) * (data.length - 1)));
    }
  }

  return (
    <ResponsiveContainer width="100%" height={140}>
      <AreaChart data={data} margin={{ top: 4, right: 4, bottom: 0, left: -8 }}>
        <CartesianGrid strokeDasharray="3 3" stroke="hsl(var(--border))" vertical={false} />
        {/* Numeric axis over bucket indices; labels come from the lookup above */}
        <XAxis
          dataKey="idx"
          type="number"
          domain={[0, data.length - 1]}
          tick={{ fontSize: 10, fill: 'hsl(var(--muted-foreground))' }}
          tickLine={false}
          axisLine={false}
          ticks={tickIndices}
          tickFormatter={(idx) => data[idx]?.label ?? ''}
        />
        <YAxis
          tick={{ fontSize: 10, fill: 'hsl(var(--muted-foreground))' }}
          tickLine={false}
          axisLine={false}
          allowDecimals={false}
        />
        <RechartsTooltip
          {...TOOLTIP_STYLE}
          cursor={{
            stroke: 'hsl(var(--muted-foreground))',
            strokeWidth: 1,
            strokeDasharray: '3 3',
          }}
          labelFormatter={(idx) => data[Number(idx)]?.label ?? ''}
          formatter={(value) => [`${Number(value).toLocaleString()} packets`, 'Count']}
        />
        <Area
          type="monotone"
          dataKey="count"
          stroke="#0ea5e9"
          fill="#0ea5e9"
          fillOpacity={0.15}
          strokeWidth={1.5}
          dot={false}
          activeDot={{ r: 4, fill: '#0ea5e9', strokeWidth: 2, stroke: 'hsl(var(--popover))' }}
        />
      </AreaChart>
    </ResponsiveContainer>
  );
}
|
||||
|
||||
function NoiseFloorChart({
|
||||
samples,
|
||||
}: {
|
||||
@@ -241,6 +322,17 @@ export function SettingsStatisticsSection({ className }: { className?: string })
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{/* Packets per Hour (72h) */}
|
||||
{stats.packets_per_hour_72h?.length > 0 && (
|
||||
<>
|
||||
<Separator />
|
||||
<div>
|
||||
<h4 className="text-sm font-medium mb-2">Packets per Hour (72h)</h4>
|
||||
<PacketsPerHourChart buckets={stats.packets_per_hour_72h} />
|
||||
</div>
|
||||
</>
|
||||
)}
|
||||
|
||||
<Separator />
|
||||
|
||||
{/* Path Hash Width */}
|
||||
|
||||
@@ -652,6 +652,10 @@ describe('SettingsModal', () => {
|
||||
double_byte_pct: 30,
|
||||
triple_byte_pct: 20,
|
||||
},
|
||||
packets_per_hour_72h: [
|
||||
{ timestamp: 1711792800, count: 12 },
|
||||
{ timestamp: 1711796400, count: 8 },
|
||||
],
|
||||
noise_floor_24h: {
|
||||
sample_interval_seconds: 300,
|
||||
coverage_seconds: 3600,
|
||||
@@ -722,6 +726,7 @@ describe('SettingsModal', () => {
|
||||
double_byte_pct: 30,
|
||||
triple_byte_pct: 20,
|
||||
},
|
||||
packets_per_hour_72h: [],
|
||||
noise_floor_24h: {
|
||||
sample_interval_seconds: 300,
|
||||
coverage_seconds: 0,
|
||||
|
||||
@@ -544,6 +544,11 @@ export interface NoiseFloorHistoryStats {
|
||||
samples: NoiseFloorSample[];
|
||||
}
|
||||
|
||||
/**
 * One hourly bucket of the packets_per_hour_72h statistics series;
 * mirrors the backend PacketsPerHourBucket model.
 */
interface PacketsPerHourBucket {
  // Unix timestamp (seconds) at the start of the hour.
  timestamp: number;
  // Number of packets received in that hour.
  count: number;
}
|
||||
|
||||
export interface StatisticsResponse {
|
||||
busiest_channels_24h: BusyChannel[];
|
||||
contact_count: number;
|
||||
@@ -567,5 +572,6 @@ export interface StatisticsResponse {
|
||||
double_byte_pct: number;
|
||||
triple_byte_pct: number;
|
||||
};
|
||||
packets_per_hour_72h: PacketsPerHourBucket[];
|
||||
noise_floor_24h: NoiseFloorHistoryStats;
|
||||
}
|
||||
|
||||
@@ -53,6 +53,9 @@ ignore = [
|
||||
"SIM117", # nested with statements - can be clearer in tests
|
||||
]
|
||||
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"app/main.py" = ["E402"] # imports after Windows event-loop re-exec block
|
||||
|
||||
[tool.ruff.lint.isort]
|
||||
known-first-party = ["app"]
|
||||
|
||||
|
||||
@@ -43,6 +43,7 @@ class TestStatisticsEmpty:
|
||||
"double_byte_pct": 0.0,
|
||||
"triple_byte_pct": 0.0,
|
||||
}
|
||||
assert result["packets_per_hour_72h"] == []
|
||||
|
||||
|
||||
class TestStatisticsCounts:
|
||||
@@ -397,6 +398,54 @@ class TestPathHashWidthStats:
|
||||
assert breakdown["triple_byte"] == 1
|
||||
|
||||
|
||||
class TestPacketsPerHour:
    @pytest.mark.asyncio
    async def test_buckets_packets_by_hour(self, test_db):
        """Recent packets land in per-hour buckets keyed by hour start."""
        now = int(time.time())
        current_hour = (now // 3600) * 3600
        conn = test_db.conn
        insert_sql = "INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)"

        # Three packets inside the current hour...
        for offset in range(3):
            await conn.execute(insert_sql, (current_hour + offset, b"\x01", bytes([offset]) * 32))
        # ...one in the previous hour...
        await conn.execute(insert_sql, (current_hour - 1800, b"\x02", b"\xaa" * 32))
        # ...and one older than the 72h window, which must be excluded.
        await conn.execute(insert_sql, (now - 260000, b"\x03", b"\xbb" * 32))
        await conn.commit()

        buckets = (await StatisticsRepository.get_all())["packets_per_hour_72h"]

        assert len(buckets) == 2
        counts = {bucket["timestamp"]: bucket["count"] for bucket in buckets}
        assert counts[current_hour] == 3
        assert counts[current_hour - 3600] == 1

    @pytest.mark.asyncio
    async def test_empty_when_no_recent_packets(self, test_db):
        """Only packets older than 72h yield an empty series."""
        now = int(time.time())
        await test_db.conn.execute(
            "INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)",
            (now - 300000, b"\x01", b"\x01" * 32),
        )
        await test_db.conn.commit()

        stats = await StatisticsRepository.get_all()
        assert stats["packets_per_hour_72h"] == []
|
||||
|
||||
|
||||
class TestStatisticsEndpoint:
|
||||
@pytest.mark.asyncio
|
||||
async def test_statistics_endpoint_includes_noise_floor_history(self, test_db, client):
|
||||
|
||||
Reference in New Issue
Block a user