mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-07-04 08:52:03 +02:00
Merge branch 'main' of github.com:maplemesh/Remote-Terminal-for-MeshCore into gnomeadrift/repeater_telemetry_history
This commit is contained in:
+26
-10
@@ -25,18 +25,22 @@ Keep it aligned with `app/` source files and router behavior.
|
||||
app/
|
||||
├── main.py # App startup/lifespan, router registration, static frontend mounting
|
||||
├── config.py # Env-driven runtime settings
|
||||
├── channel_constants.py # Public/default channel constants shared across sync/send logic
|
||||
├── database.py # SQLite connection + base schema + migration runner
|
||||
├── migrations.py # Schema migrations (SQLite user_version)
|
||||
├── models.py # Pydantic request/response models and typed write contracts (for example ContactUpsert)
|
||||
├── version_info.py # Unified version/build metadata resolution for debug + startup surfaces
|
||||
├── repository/ # Data access layer (contacts, channels, messages, raw_packets, settings, fanout)
|
||||
├── services/ # Shared orchestration/domain services
|
||||
│ ├── messages.py # Shared message creation, dedup, ACK application
|
||||
│ ├── message_send.py # Direct send, channel send, resend workflows
|
||||
│ ├── dm_ingest.py # Shared direct-message ingest / dedup seam for packet + fallback paths
|
||||
│ ├── dm_ack_apply.py # Shared DM ACK application over pending/buffered ACK state
|
||||
│ ├── dm_ack_tracker.py # Pending DM ACK state
|
||||
│ ├── contact_reconciliation.py # Prefix-claim, sender-key backfill, name-history wiring
|
||||
│ ├── radio_lifecycle.py # Post-connect setup and reconnect/setup helpers
|
||||
│ ├── radio_commands.py # Radio config/private-key command workflows
|
||||
│ ├── radio_noise_floor.py # In-memory local radio noise-floor sampling/history
|
||||
│ └── radio_runtime.py # Router/dependency seam over the global RadioManager
|
||||
├── radio.py # RadioManager transport/session state + lock management
|
||||
├── radio_sync.py # Polling, sync, periodic advertisement loop
|
||||
@@ -61,6 +65,8 @@ app/
|
||||
├── messages.py
|
||||
├── packets.py
|
||||
├── read_state.py
|
||||
├── rooms.py
|
||||
├── server_control.py
|
||||
├── settings.py
|
||||
├── fanout.py
|
||||
├── repeaters.py
|
||||
@@ -174,6 +180,7 @@ app/
|
||||
- `PUT /radio/private-key`
|
||||
- `POST /radio/advertise` — manual advert send; request body may set `mode` to `flood` or `zero_hop` (defaults to `flood`)
|
||||
- `POST /radio/discover` — short mesh discovery sweep for nearby repeaters/sensors
|
||||
- `POST /radio/trace` — send a multi-hop trace loop through known repeaters and back to the local radio
|
||||
- `POST /radio/disconnect`
|
||||
- `POST /radio/reboot`
|
||||
- `POST /radio/reconnect`
|
||||
@@ -198,6 +205,10 @@ app/
|
||||
- `POST /contacts/{public_key}/repeater/radio-settings`
|
||||
- `POST /contacts/{public_key}/repeater/advert-intervals`
|
||||
- `POST /contacts/{public_key}/repeater/owner-info`
|
||||
- `POST /contacts/{public_key}/room/login`
|
||||
- `POST /contacts/{public_key}/room/status`
|
||||
- `POST /contacts/{public_key}/room/lpp-telemetry`
|
||||
- `POST /contacts/{public_key}/room/acl`
|
||||
|
||||
### Channels
|
||||
- `GET /channels`
|
||||
@@ -216,6 +227,7 @@ app/
|
||||
|
||||
### Packets
|
||||
- `GET /packets/undecrypted/count`
|
||||
- `GET /packets/{packet_id}` — fetch one stored raw packet by row ID for on-demand inspection
|
||||
- `POST /packets/decrypt/historical`
|
||||
- `POST /packets/maintenance`
|
||||
|
||||
@@ -236,6 +248,7 @@ app/
|
||||
- `POST /fanout` — create new fanout config
|
||||
- `PATCH /fanout/{id}` — update fanout config (triggers module reload)
|
||||
- `DELETE /fanout/{id}` — delete fanout config (stops module)
|
||||
- `POST /fanout/bots/disable-until-restart` — stop bot modules and keep bots disabled until restart
|
||||
|
||||
### Statistics
|
||||
- `GET /statistics` — aggregated mesh network stats (entity counts, message/packet splits, activity windows, busiest channels)
|
||||
@@ -322,9 +335,11 @@ tests/
|
||||
├── conftest.py # Shared fixtures
|
||||
├── test_ack_tracking_wiring.py # DM ACK tracking extraction and wiring
|
||||
├── test_api.py # REST endpoint integration tests
|
||||
├── test_block_lists.py # Blocked keys/names filtering across list/search surfaces
|
||||
├── test_bot.py # Bot execution and sandboxing
|
||||
├── test_channels_router.py # Channels router endpoints
|
||||
├── test_channel_sender_backfill.py # Sender-key backfill uniqueness rules for channel messages
|
||||
├── test_channels_router.py # Channels router endpoints
|
||||
├── test_community_mqtt.py # Community MQTT publisher (JWT, packet format, hash, broadcast)
|
||||
├── test_config.py # Configuration validation
|
||||
├── test_contact_reconciliation_service.py # Prefix/contact reconciliation service helpers
|
||||
├── test_contacts_router.py # Contacts router endpoints
|
||||
@@ -332,40 +347,41 @@ tests/
|
||||
├── test_disable_bots.py # MESHCORE_DISABLE_BOTS=true feature
|
||||
├── test_echo_dedup.py # Echo/repeat deduplication (incl. concurrent)
|
||||
├── test_fanout.py # Fanout bus CRUD, scope matching, manager dispatch
|
||||
├── test_fanout_integration.py # Fanout integration tests
|
||||
├── test_fanout_hitlist.py # Fanout-related hitlist regression tests
|
||||
├── test_fanout_integration.py # Fanout integration tests
|
||||
├── test_event_handlers.py # ACK tracking, event registration, cleanup
|
||||
├── test_frontend_static.py # Frontend static file serving
|
||||
├── test_health_mqtt_status.py # Health endpoint MQTT status field
|
||||
├── test_http_quality.py # Cache-control / gzip / basic-auth HTTP quality checks
|
||||
├── test_key_normalization.py # Public key normalization
|
||||
├── test_keystore.py # Ephemeral keystore
|
||||
├── test_main_startup.py # App startup and lifespan
|
||||
├── test_map_upload.py # Map upload fanout module
|
||||
├── test_message_pagination.py # Cursor-based message pagination
|
||||
├── test_message_prefix_claim.py # Message prefix claim logic
|
||||
├── test_migrations.py # Schema migration system
|
||||
├── test_community_mqtt.py # Community MQTT publisher (JWT, packet format, hash, broadcast)
|
||||
├── test_mqtt.py # MQTT publisher topic routing and lifecycle
|
||||
├── test_messages_search.py # Message search, around, forward pagination
|
||||
├── test_migrations.py # Schema migration system
|
||||
├── test_packet_pipeline.py # End-to-end packet processing
|
||||
├── test_packets_router.py # Packets router endpoints (decrypt, maintenance)
|
||||
├── test_path_utils.py # Path hex rendering helpers
|
||||
├── test_radio.py # RadioManager, serial detection
|
||||
├── test_radio_commands_service.py # Radio config/private-key service workflows
|
||||
├── test_radio_lifecycle_service.py # Reconnect/setup orchestration helpers
|
||||
├── test_radio_runtime_service.py # radio_runtime seam behavior and helpers
|
||||
├── test_real_crypto.py # Real cryptographic operations
|
||||
├── test_radio_operation.py # radio_operation() context manager
|
||||
├── test_radio_router.py # Radio router endpoints
|
||||
├── test_radio_runtime_service.py # radio_runtime seam behavior and helpers
|
||||
├── test_radio_sync.py # Polling, sync, advertisement
|
||||
├── test_real_crypto.py # Real cryptographic operations
|
||||
├── test_repeater_routes.py # Repeater command/telemetry/trace + granular pane endpoints
|
||||
├── test_repository.py # Data access layer
|
||||
├── test_room_routes.py # Room-server login/status/telemetry/ACL endpoints
|
||||
├── test_rx_log_data.py # on_rx_log_data event handler integration
|
||||
├── test_messages_search.py # Message search, around, forward pagination
|
||||
├── test_block_lists.py # Blocked keys/names filtering
|
||||
├── test_security.py # Optional Basic Auth middleware / config behavior
|
||||
├── test_send_messages.py # Outgoing messages, bot triggers, concurrent sends
|
||||
├── test_settings_router.py # Settings endpoints, advert validation
|
||||
├── test_statistics.py # Statistics aggregation
|
||||
├── test_main_startup.py # App startup and lifespan
|
||||
├── test_path_utils.py # Path hex rendering helpers
|
||||
├── test_version_info.py # Version/build metadata resolution
|
||||
├── test_websocket.py # WS manager broadcast/cleanup
|
||||
└── test_websocket_route.py # WS endpoint lifecycle
|
||||
```
|
||||
|
||||
@@ -96,8 +96,12 @@ CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_dedup_null_safe
|
||||
ON messages(type, conversation_key, text, COALESCE(sender_timestamp, 0))
|
||||
WHERE type = 'CHAN';
|
||||
CREATE INDEX IF NOT EXISTS idx_raw_packets_message_id ON raw_packets(message_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_raw_packets_timestamp ON raw_packets(timestamp);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_raw_packets_payload_hash ON raw_packets(payload_hash);
|
||||
CREATE INDEX IF NOT EXISTS idx_contacts_on_radio ON contacts(on_radio);
|
||||
CREATE INDEX IF NOT EXISTS idx_contacts_type_last_seen ON contacts(type, last_seen);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_type_received_conversation
|
||||
ON messages(type, received_at, conversation_key);
|
||||
-- idx_messages_sender_key is created by migration 25 (after adding the sender_key column)
|
||||
-- idx_messages_incoming_priv_dedup is created by migration 44 after legacy rows are reconciled
|
||||
CREATE INDEX IF NOT EXISTS idx_contact_advert_paths_recent
|
||||
|
||||
@@ -30,8 +30,6 @@ logger = logging.getLogger(__name__)
|
||||
# Track active subscriptions so we can unsubscribe before re-registering
|
||||
# This prevents handler duplication after reconnects
|
||||
_active_subscriptions: list["Subscription"] = []
|
||||
_pending_acks = dm_ack_tracker._pending_acks
|
||||
_buffered_acks = dm_ack_tracker._buffered_acks
|
||||
|
||||
|
||||
def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> bool:
|
||||
|
||||
@@ -39,6 +39,7 @@ from app.routers import (
|
||||
ws,
|
||||
)
|
||||
from app.security import add_optional_basic_auth_middleware
|
||||
from app.services.radio_noise_floor import start_noise_floor_sampling, stop_noise_floor_sampling
|
||||
from app.services.radio_runtime import radio_runtime as radio_manager
|
||||
from app.version_info import get_app_build_info
|
||||
|
||||
@@ -70,6 +71,7 @@ async def lifespan(app: FastAPI):
|
||||
from app.radio_sync import ensure_default_channels
|
||||
|
||||
await ensure_default_channels()
|
||||
await start_noise_floor_sampling()
|
||||
|
||||
# Always start connection monitor (even if initial connection failed)
|
||||
await radio_manager.start_connection_monitor()
|
||||
@@ -98,6 +100,7 @@ async def lifespan(app: FastAPI):
|
||||
await radio_manager.stop_connection_monitor()
|
||||
await stop_background_contact_reconciliation()
|
||||
await stop_message_polling()
|
||||
await stop_noise_floor_sampling()
|
||||
await stop_periodic_advert()
|
||||
await stop_periodic_sync()
|
||||
if radio_manager.meshcore:
|
||||
|
||||
+45
-4
@@ -360,13 +360,20 @@ async def run_migrations(conn: aiosqlite.Connection) -> int:
|
||||
await set_version(conn, 46)
|
||||
applied += 1
|
||||
|
||||
# Migration 47: Repeater telemetry history table + tracking opt-in column
|
||||
# Migration 47: Add statistics indexes for time-windowed scans
|
||||
if version < 47:
|
||||
logger.info("Applying migration 47: repeater telemetry history")
|
||||
await _migrate_047_repeater_telemetry_history(conn)
|
||||
logger.info("Applying migration 47: add statistics indexes")
|
||||
await _migrate_047_add_statistics_indexes(conn)
|
||||
await set_version(conn, 47)
|
||||
applied += 1
|
||||
|
||||
# Migration 48: Repeater telemetry history table + tracking opt-in column
|
||||
if version < 48:
|
||||
logger.info("Applying migration 48: repeater telemetry history")
|
||||
await _migrate_048_repeater_telemetry_history(conn)
|
||||
await set_version(conn, 48)
|
||||
applied += 1
|
||||
|
||||
if applied > 0:
|
||||
logger.info(
|
||||
"Applied %d migration(s), schema now at version %d", applied, await get_version(conn)
|
||||
@@ -2877,7 +2884,41 @@ async def _migrate_046_cleanup_orphaned_contact_child_rows(conn: aiosqlite.Conne
|
||||
await conn.commit()
|
||||
|
||||
|
||||
async def _migrate_047_repeater_telemetry_history(conn: aiosqlite.Connection) -> None:
|
||||
async def _migrate_047_add_statistics_indexes(conn: aiosqlite.Connection) -> None:
|
||||
"""Add indexes used by the statistics endpoint's time-windowed scans."""
|
||||
cursor = await conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
|
||||
tables = {row[0] for row in await cursor.fetchall()}
|
||||
|
||||
if "raw_packets" in tables:
|
||||
cursor = await conn.execute("PRAGMA table_info(raw_packets)")
|
||||
raw_packet_columns = {row[1] for row in await cursor.fetchall()}
|
||||
if "timestamp" in raw_packet_columns:
|
||||
await conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_raw_packets_timestamp ON raw_packets(timestamp)"
|
||||
)
|
||||
|
||||
if "contacts" in tables:
|
||||
cursor = await conn.execute("PRAGMA table_info(contacts)")
|
||||
contact_columns = {row[1] for row in await cursor.fetchall()}
|
||||
if {"type", "last_seen"}.issubset(contact_columns):
|
||||
await conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_contacts_type_last_seen ON contacts(type, last_seen)"
|
||||
)
|
||||
|
||||
if "messages" in tables:
|
||||
cursor = await conn.execute("PRAGMA table_info(messages)")
|
||||
message_columns = {row[1] for row in await cursor.fetchall()}
|
||||
if {"type", "received_at", "conversation_key"}.issubset(message_columns):
|
||||
await conn.execute(
|
||||
"""
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_type_received_conversation
|
||||
ON messages(type, received_at, conversation_key)
|
||||
"""
|
||||
)
|
||||
await conn.commit()
|
||||
|
||||
|
||||
async def _migrate_048_repeater_telemetry_history(conn: aiosqlite.Connection) -> None:
|
||||
"""Create repeater_telemetry_history table and add tracking opt-in column to app_settings."""
|
||||
await conn.execute(
|
||||
"""
|
||||
|
||||
@@ -628,6 +628,59 @@ class TraceResponse(BaseModel):
|
||||
path_len: int = Field(description="Number of hops in the trace path")
|
||||
|
||||
|
||||
class RadioTraceHopRequest(BaseModel):
|
||||
"""One requested hop in a radio trace path."""
|
||||
|
||||
public_key: str | None = Field(
|
||||
default=None,
|
||||
description="Full repeater public key when this hop maps to a known repeater",
|
||||
)
|
||||
hop_hex: str | None = Field(
|
||||
default=None,
|
||||
description="Raw hop hash hex when using a custom repeater prefix",
|
||||
)
|
||||
|
||||
|
||||
class RadioTraceRequest(BaseModel):
|
||||
"""Ordered trace path for a radio trace loop."""
|
||||
|
||||
hop_hash_bytes: Literal[1, 2, 4] = Field(
|
||||
default=4,
|
||||
description="Hash width in bytes for every hop in this trace path",
|
||||
)
|
||||
hops: list[RadioTraceHopRequest] = Field(
|
||||
min_length=1,
|
||||
description="Ordered repeater hops, using either known repeater keys or custom hop hex",
|
||||
)
|
||||
|
||||
|
||||
class RadioTraceNode(BaseModel):
|
||||
"""One resolved node in a radio trace result."""
|
||||
|
||||
role: Literal["repeater", "custom", "local"] = Field(description="Node role in the trace")
|
||||
public_key: str | None = Field(
|
||||
default=None,
|
||||
description="Resolved full public key for this node when known",
|
||||
)
|
||||
name: str | None = Field(default=None, description="Display name for this node when known")
|
||||
observed_hash: str | None = Field(
|
||||
default=None,
|
||||
description="Observed 4-byte trace hash for this node as hex",
|
||||
)
|
||||
snr: float | None = Field(default=None, description="Reported SNR for this node in dB")
|
||||
|
||||
|
||||
class RadioTraceResponse(BaseModel):
|
||||
"""Resolved multi-hop radio trace result."""
|
||||
|
||||
path_len: int = Field(description="Number of hashed nodes returned by the trace response")
|
||||
timeout_seconds: float = Field(description="Timeout window used while waiting for the trace")
|
||||
nodes: list[RadioTraceNode] = Field(
|
||||
default_factory=list,
|
||||
description="Ordered trace nodes: repeater hops followed by the terminal local radio",
|
||||
)
|
||||
|
||||
|
||||
class PathDiscoveryRoute(BaseModel):
|
||||
"""One resolved route returned by contact path discovery."""
|
||||
|
||||
@@ -681,6 +734,10 @@ class RadioDiscoveryResult(BaseModel):
|
||||
"""One mesh node heard during a discovery sweep."""
|
||||
|
||||
public_key: str = Field(description="Discovered node public key as hex")
|
||||
name: str | None = Field(
|
||||
default=None,
|
||||
description="Known name for this node from contacts DB, if any",
|
||||
)
|
||||
node_type: Literal["repeater", "sensor"] = Field(description="Discovered node class")
|
||||
heard_count: int = Field(default=1, description="How many responses were heard from this node")
|
||||
local_snr: float | None = Field(
|
||||
@@ -824,6 +881,27 @@ class PathHashWidthStats(BaseModel):
|
||||
triple_byte_pct: float
|
||||
|
||||
|
||||
class NoiseFloorSample(BaseModel):
|
||||
timestamp: int = Field(description="Unix timestamp of the sampled reading")
|
||||
noise_floor_dbm: int = Field(description="Noise floor in dBm")
|
||||
|
||||
|
||||
class NoiseFloorHistoryStats(BaseModel):
|
||||
sample_interval_seconds: int = Field(description="Expected spacing between samples")
|
||||
coverage_seconds: int = Field(description="How much of the last 24 hours is represented")
|
||||
latest_noise_floor_dbm: int | None = Field(
|
||||
default=None, description="Most recent sampled noise floor in dBm"
|
||||
)
|
||||
latest_timestamp: int | None = Field(
|
||||
default=None, description="Unix timestamp of the most recent sample"
|
||||
)
|
||||
supported: bool | None = Field(
|
||||
default=None,
|
||||
description="Whether the connected radio appears to support radio stats sampling",
|
||||
)
|
||||
samples: list[NoiseFloorSample] = Field(default_factory=list)
|
||||
|
||||
|
||||
class StatisticsResponse(BaseModel):
|
||||
busiest_channels_24h: list[BusyChannel]
|
||||
contact_count: int
|
||||
@@ -839,6 +917,7 @@ class StatisticsResponse(BaseModel):
|
||||
repeaters_heard: ContactActivityCounts
|
||||
known_channels_active: ContactActivityCounts
|
||||
path_hash_width_24h: PathHashWidthStats
|
||||
noise_floor_24h: NoiseFloorHistoryStats
|
||||
|
||||
|
||||
class TelemetryHistoryEntry(BaseModel):
|
||||
|
||||
+16
-11
@@ -122,20 +122,20 @@ async def run_historical_dm_decryption(
|
||||
"""Background task to decrypt historical DM packets with contact's key."""
|
||||
from app.websocket import broadcast_success
|
||||
|
||||
packets = await RawPacketRepository.get_undecrypted_text_messages()
|
||||
total = len(packets)
|
||||
total = 0
|
||||
decrypted_count = 0
|
||||
|
||||
if total == 0:
|
||||
logger.info("No undecrypted TEXT_MESSAGE packets to process")
|
||||
return
|
||||
|
||||
logger.info("Starting historical DM decryption of %d TEXT_MESSAGE packets", total)
|
||||
logger.info("Starting historical DM decryption scan for undecrypted TEXT_MESSAGE packets")
|
||||
|
||||
# Derive our public key from the private key
|
||||
our_public_key_bytes = derive_public_key(private_key_bytes)
|
||||
|
||||
for packet_id, packet_data, packet_timestamp in packets:
|
||||
async for (
|
||||
packet_id,
|
||||
packet_data,
|
||||
packet_timestamp,
|
||||
) in RawPacketRepository.stream_undecrypted_text_messages():
|
||||
total += 1
|
||||
# Note: passing our_public_key=None disables the outbound hash check in
|
||||
# try_decrypt_dm (only the inbound check src_hash == their_first_byte runs).
|
||||
# For the 255/256 case where our first byte differs from the contact's,
|
||||
@@ -187,6 +187,10 @@ async def run_historical_dm_decryption(
|
||||
if msg_id is not None:
|
||||
decrypted_count += 1
|
||||
|
||||
if total == 0:
|
||||
logger.info("No undecrypted TEXT_MESSAGE packets to process")
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Historical DM decryption complete: %d/%d packets decrypted",
|
||||
decrypted_count,
|
||||
@@ -264,9 +268,10 @@ async def process_raw_packet(
|
||||
This is the main entry point for all incoming RF packets.
|
||||
|
||||
Note: Packets are deduplicated by payload hash in the database. If we receive
|
||||
a duplicate packet (same payload, different path), we still broadcast it to
|
||||
the frontend (for the real-time packet feed) but skip decryption processing
|
||||
since the original packet was already processed.
|
||||
a duplicate payload (same payload, different path), we still broadcast it to
|
||||
the frontend for realtime packet-feed fidelity. Some payload types are also
|
||||
intentionally reprocessed on duplicate arrival so message-level dedup/path
|
||||
merge logic and advert/path-history tracking still see each observation.
|
||||
"""
|
||||
ts = timestamp or int(time.time())
|
||||
observation_id = next(_raw_observation_counter)
|
||||
|
||||
+18
-2
@@ -20,7 +20,7 @@ from meshcore import EventType, MeshCore
|
||||
|
||||
from app.channel_constants import PUBLIC_CHANNEL_KEY, PUBLIC_CHANNEL_NAME
|
||||
from app.config import settings
|
||||
from app.event_handlers import cleanup_expired_acks
|
||||
from app.event_handlers import cleanup_expired_acks, on_contact_message
|
||||
from app.models import Contact, ContactUpsert
|
||||
from app.radio import RadioOperationBusyError
|
||||
from app.repository import (
|
||||
@@ -388,6 +388,14 @@ async def _resolve_channel_for_pending_message(
|
||||
return cached_key, channel.name if channel else None
|
||||
|
||||
|
||||
async def _store_pending_direct_message(event) -> None:
|
||||
"""Route a CONTACT_MSG_RECV event pulled via get_msg() through the DM ingest path."""
|
||||
try:
|
||||
await on_contact_message(event)
|
||||
except Exception:
|
||||
logger.warning("Failed to store pending direct message", exc_info=True)
|
||||
|
||||
|
||||
async def _store_pending_channel_message(mc: MeshCore, payload: dict) -> None:
|
||||
"""Persist a CHANNEL_MSG_RECV event pulled via get_msg()."""
|
||||
channel_idx = payload.get("channel_idx")
|
||||
@@ -412,7 +420,8 @@ async def _store_pending_channel_message(mc: MeshCore, payload: dict) -> None:
|
||||
return
|
||||
|
||||
received_at = int(time.time())
|
||||
sender_timestamp = payload.get("sender_timestamp") or received_at
|
||||
ts = payload.get("sender_timestamp")
|
||||
sender_timestamp = ts if ts is not None else received_at
|
||||
sender_name, message_text = _split_channel_sender_and_text(payload.get("text", ""))
|
||||
|
||||
await create_fallback_channel_message(
|
||||
@@ -497,6 +506,8 @@ async def drain_pending_messages(mc: MeshCore) -> int:
|
||||
elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
|
||||
if result.type == EventType.CHANNEL_MSG_RECV:
|
||||
await _store_pending_channel_message(mc, result.payload)
|
||||
elif result.type == EventType.CONTACT_MSG_RECV:
|
||||
await _store_pending_direct_message(result)
|
||||
count += 1
|
||||
|
||||
# Small delay between fetches
|
||||
@@ -534,6 +545,8 @@ async def poll_for_messages(mc: MeshCore) -> int:
|
||||
elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
|
||||
if result.type == EventType.CHANNEL_MSG_RECV:
|
||||
await _store_pending_channel_message(mc, result.payload)
|
||||
elif result.type == EventType.CONTACT_MSG_RECV:
|
||||
await _store_pending_direct_message(result)
|
||||
count += 1
|
||||
# If we got a message, there might be more - drain them
|
||||
count += await drain_pending_messages(mc)
|
||||
@@ -1119,6 +1132,7 @@ _last_contact_sync: float = 0.0
|
||||
CONTACT_SYNC_THROTTLE_SECONDS = 30 # Don't sync more than once per 30 seconds
|
||||
CONTACT_RECONCILE_BATCH_SIZE = 2
|
||||
CONTACT_RECONCILE_YIELD_SECONDS = 0.05
|
||||
CONTACT_RECONCILE_BUSY_BACKOFF_SECONDS = 2.0
|
||||
|
||||
|
||||
def _evict_removed_contact_from_library_cache(mc: MeshCore, public_key: str) -> None:
|
||||
@@ -1328,6 +1342,8 @@ async def _reconcile_radio_contacts_in_background(
|
||||
)
|
||||
except RadioOperationBusyError:
|
||||
logger.debug("Background contact reconcile yielding: radio busy")
|
||||
await asyncio.sleep(CONTACT_RECONCILE_BUSY_BACKOFF_SECONDS)
|
||||
continue
|
||||
|
||||
await asyncio.sleep(CONTACT_RECONCILE_YIELD_SECONDS)
|
||||
if not progressed:
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import logging
|
||||
import sqlite3
|
||||
import time
|
||||
from collections.abc import AsyncIterator
|
||||
from hashlib import sha256
|
||||
|
||||
from app.database import db
|
||||
@@ -8,6 +9,8 @@ from app.decoder import PayloadType, extract_payload, get_packet_payload_type
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
UNDECRYPTED_PACKET_BATCH_SIZE = 500
|
||||
|
||||
|
||||
class RawPacketRepository:
|
||||
@staticmethod
|
||||
@@ -100,6 +103,40 @@ class RawPacketRepository:
|
||||
rows = await cursor.fetchall()
|
||||
return [(row["id"], bytes(row["data"]), row["timestamp"]) for row in rows]
|
||||
|
||||
@staticmethod
|
||||
async def stream_undecrypted_text_messages(
|
||||
batch_size: int = UNDECRYPTED_PACKET_BATCH_SIZE,
|
||||
) -> AsyncIterator[tuple[int, bytes, int]]:
|
||||
"""Yield undecrypted TEXT_MESSAGE packets in bounded-size batches."""
|
||||
cursor = await db.conn.execute(
|
||||
"SELECT id, data, timestamp FROM raw_packets WHERE message_id IS NULL ORDER BY timestamp ASC"
|
||||
)
|
||||
try:
|
||||
while True:
|
||||
rows = await cursor.fetchmany(batch_size)
|
||||
if not rows:
|
||||
break
|
||||
|
||||
for row in rows:
|
||||
data = bytes(row["data"])
|
||||
payload_type = get_packet_payload_type(data)
|
||||
if payload_type == PayloadType.TEXT_MESSAGE:
|
||||
yield (row["id"], data, row["timestamp"])
|
||||
finally:
|
||||
await cursor.close()
|
||||
|
||||
@staticmethod
|
||||
async def count_undecrypted_text_messages(
|
||||
batch_size: int = UNDECRYPTED_PACKET_BATCH_SIZE,
|
||||
) -> int:
|
||||
"""Count undecrypted TEXT_MESSAGE packets without materializing them all."""
|
||||
count = 0
|
||||
async for _packet in RawPacketRepository.stream_undecrypted_text_messages(
|
||||
batch_size=batch_size
|
||||
):
|
||||
count += 1
|
||||
return count
|
||||
|
||||
@staticmethod
|
||||
async def mark_decrypted(packet_id: int, message_id: int) -> None:
|
||||
"""Link a raw packet to its decrypted message."""
|
||||
@@ -158,17 +195,4 @@ class RawPacketRepository:
|
||||
Filters raw packets to only include those with PayloadType.TEXT_MESSAGE (0x02).
|
||||
These are direct messages that can be decrypted with contact ECDH keys.
|
||||
"""
|
||||
cursor = await db.conn.execute(
|
||||
"SELECT id, data, timestamp FROM raw_packets WHERE message_id IS NULL ORDER BY timestamp ASC"
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
|
||||
# Filter for TEXT_MESSAGE packets
|
||||
result = []
|
||||
for row in rows:
|
||||
data = bytes(row["data"])
|
||||
payload_type = get_packet_payload_type(data)
|
||||
if payload_type == PayloadType.TEXT_MESSAGE:
|
||||
result.append((row["id"], data, row["timestamp"]))
|
||||
|
||||
return result
|
||||
return [packet async for packet in RawPacketRepository.stream_undecrypted_text_messages()]
|
||||
|
||||
+56
-37
@@ -12,6 +12,7 @@ logger = logging.getLogger(__name__)
|
||||
SECONDS_1H = 3600
|
||||
SECONDS_24H = 86400
|
||||
SECONDS_7D = 604800
|
||||
RAW_PACKET_STATS_BATCH_SIZE = 500
|
||||
|
||||
|
||||
class AppSettingsRepository:
|
||||
@@ -271,6 +272,26 @@ class AppSettingsRepository:
|
||||
|
||||
|
||||
class StatisticsRepository:
|
||||
@staticmethod
|
||||
async def get_database_message_totals() -> dict[str, int]:
|
||||
"""Return message totals needed by lightweight debug surfaces."""
|
||||
cursor = await db.conn.execute(
|
||||
"""
|
||||
SELECT
|
||||
SUM(CASE WHEN type = 'PRIV' THEN 1 ELSE 0 END) AS total_dms,
|
||||
SUM(CASE WHEN type = 'CHAN' THEN 1 ELSE 0 END) AS total_channel_messages,
|
||||
SUM(CASE WHEN outgoing = 1 THEN 1 ELSE 0 END) AS total_outgoing
|
||||
FROM messages
|
||||
"""
|
||||
)
|
||||
row = await cursor.fetchone()
|
||||
assert row is not None
|
||||
return {
|
||||
"total_dms": row["total_dms"] or 0,
|
||||
"total_channel_messages": row["total_channel_messages"] or 0,
|
||||
"total_outgoing": row["total_outgoing"] or 0,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
async def _activity_counts(*, contact_type: int, exclude: bool = False) -> dict[str, int]:
|
||||
"""Get time-windowed counts for contacts/repeaters heard."""
|
||||
@@ -297,17 +318,26 @@ class StatisticsRepository:
|
||||
|
||||
@staticmethod
|
||||
async def _known_channels_active() -> dict[str, int]:
|
||||
"""Count distinct known channel keys with channel traffic in each time window."""
|
||||
"""Count known channel keys with any traffic in each time window.
|
||||
|
||||
Channel keys are stored canonically as uppercase hex, so we can avoid
|
||||
the old UPPER(...) join and aggregate per known channel directly.
|
||||
"""
|
||||
now = int(time.time())
|
||||
cursor = await db.conn.execute(
|
||||
"""
|
||||
WITH known AS (
|
||||
SELECT conversation_key, MAX(received_at) AS last_received_at
|
||||
FROM messages
|
||||
WHERE type = 'CHAN'
|
||||
AND conversation_key IN (SELECT key FROM channels)
|
||||
GROUP BY conversation_key
|
||||
)
|
||||
SELECT
|
||||
COUNT(DISTINCT CASE WHEN m.received_at >= ? THEN m.conversation_key END) AS last_hour,
|
||||
COUNT(DISTINCT CASE WHEN m.received_at >= ? THEN m.conversation_key END) AS last_24_hours,
|
||||
COUNT(DISTINCT CASE WHEN m.received_at >= ? THEN m.conversation_key END) AS last_week
|
||||
FROM messages m
|
||||
INNER JOIN channels c ON UPPER(m.conversation_key) = UPPER(c.key)
|
||||
WHERE m.type = 'CHAN'
|
||||
SUM(CASE WHEN last_received_at >= ? THEN 1 ELSE 0 END) AS last_hour,
|
||||
SUM(CASE WHEN last_received_at >= ? THEN 1 ELSE 0 END) AS last_24_hours,
|
||||
SUM(CASE WHEN last_received_at >= ? THEN 1 ELSE 0 END) AS last_week
|
||||
FROM known
|
||||
""",
|
||||
(now - SECONDS_1H, now - SECONDS_24H, now - SECONDS_7D),
|
||||
)
|
||||
@@ -327,22 +357,26 @@ class StatisticsRepository:
|
||||
"SELECT data FROM raw_packets WHERE timestamp >= ?",
|
||||
(now - SECONDS_24H,),
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
|
||||
single_byte = 0
|
||||
double_byte = 0
|
||||
triple_byte = 0
|
||||
|
||||
for row in rows:
|
||||
envelope = parse_packet_envelope(bytes(row["data"]))
|
||||
if envelope is None:
|
||||
continue
|
||||
if envelope.hash_size == 1:
|
||||
single_byte += 1
|
||||
elif envelope.hash_size == 2:
|
||||
double_byte += 1
|
||||
elif envelope.hash_size == 3:
|
||||
triple_byte += 1
|
||||
while True:
|
||||
rows = await cursor.fetchmany(RAW_PACKET_STATS_BATCH_SIZE)
|
||||
if not rows:
|
||||
break
|
||||
|
||||
for row in rows:
|
||||
envelope = parse_packet_envelope(bytes(row["data"]))
|
||||
if envelope is None:
|
||||
continue
|
||||
if envelope.hash_size == 1:
|
||||
single_byte += 1
|
||||
elif envelope.hash_size == 2:
|
||||
double_byte += 1
|
||||
elif envelope.hash_size == 3:
|
||||
triple_byte += 1
|
||||
|
||||
total_packets = single_byte + double_byte + triple_byte
|
||||
if total_packets == 0:
|
||||
@@ -425,22 +459,7 @@ class StatisticsRepository:
|
||||
decrypted_packets = pkt_row["decrypted"] or 0
|
||||
undecrypted_packets = total_packets - decrypted_packets
|
||||
|
||||
# Message type counts
|
||||
cursor = await db.conn.execute("SELECT COUNT(*) AS cnt FROM messages WHERE type = 'PRIV'")
|
||||
row = await cursor.fetchone()
|
||||
assert row is not None
|
||||
total_dms: int = row["cnt"]
|
||||
|
||||
cursor = await db.conn.execute("SELECT COUNT(*) AS cnt FROM messages WHERE type = 'CHAN'")
|
||||
row = await cursor.fetchone()
|
||||
assert row is not None
|
||||
total_channel_messages: int = row["cnt"]
|
||||
|
||||
# Outgoing count
|
||||
cursor = await db.conn.execute("SELECT COUNT(*) AS cnt FROM messages WHERE outgoing = 1")
|
||||
row = await cursor.fetchone()
|
||||
assert row is not None
|
||||
total_outgoing: int = row["cnt"]
|
||||
message_totals = await StatisticsRepository.get_database_message_totals()
|
||||
|
||||
# Activity windows
|
||||
contacts_heard = await StatisticsRepository._activity_counts(contact_type=2, exclude=True)
|
||||
@@ -456,9 +475,9 @@ class StatisticsRepository:
|
||||
"total_packets": total_packets,
|
||||
"decrypted_packets": decrypted_packets,
|
||||
"undecrypted_packets": undecrypted_packets,
|
||||
"total_dms": total_dms,
|
||||
"total_channel_messages": total_channel_messages,
|
||||
"total_outgoing": total_outgoing,
|
||||
"total_dms": message_totals["total_dms"],
|
||||
"total_channel_messages": message_totals["total_channel_messages"],
|
||||
"total_outgoing": message_totals["total_outgoing"],
|
||||
"contacts_heard": contacts_heard,
|
||||
"repeaters_heard": repeaters_heard,
|
||||
"known_channels_active": known_channels_active,
|
||||
|
||||
+14
-6
@@ -40,6 +40,10 @@ logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/contacts", tags=["contacts"])
|
||||
|
||||
|
||||
TRACE_HASH_BYTES = 4
|
||||
TRACE_FLAGS_4BYTE = 2
|
||||
|
||||
|
||||
def _ambiguous_contact_detail(err: AmbiguousPublicKeyPrefixError) -> str:
|
||||
sample = ", ".join(key[:12] for key in err.matches[:2])
|
||||
return (
|
||||
@@ -373,17 +377,17 @@ async def delete_contact(public_key: str) -> dict:
|
||||
async def request_trace(public_key: str) -> TraceResponse:
|
||||
"""Send a single-hop trace to a contact and wait for the result.
|
||||
|
||||
The trace path contains the contact's 1-byte pubkey hash as the sole hop
|
||||
(no intermediate repeaters). The radio firmware requires at least one
|
||||
node in the path.
|
||||
The trace path contains the contact's 4-byte pubkey hash as the sole hop
|
||||
(no intermediate repeaters). This uses TRACE's dedicated width flags rather
|
||||
than the radio's normal path_hash_mode setting.
|
||||
"""
|
||||
require_connected()
|
||||
|
||||
contact = await _resolve_contact_or_404(public_key)
|
||||
|
||||
tag = random.randint(1, 0xFFFFFFFF)
|
||||
# First 2 hex chars of pubkey = 1-byte hash used by the trace protocol
|
||||
contact_hash = contact.public_key[:2]
|
||||
# Use a 4-byte contact hash for low-collision direct trace targeting.
|
||||
contact_hash = contact.public_key[: TRACE_HASH_BYTES * 2]
|
||||
|
||||
# Trace does not need auto-fetch suspension: response arrives as TRACE_DATA
|
||||
# from the reader loop, not via get_msg().
|
||||
@@ -394,7 +398,11 @@ async def request_trace(public_key: str) -> TraceResponse:
|
||||
logger.info(
|
||||
"Sending trace to %s (tag=%d, hash=%s)", contact.public_key[:12], tag, contact_hash
|
||||
)
|
||||
result = await mc.commands.send_trace(path=contact_hash, tag=tag)
|
||||
result = await mc.commands.send_trace(
|
||||
path=contact_hash,
|
||||
tag=tag,
|
||||
flags=TRACE_FLAGS_4BYTE,
|
||||
)
|
||||
|
||||
if result.type == EventType.ERROR:
|
||||
raise HTTPException(status_code=500, detail=f"Failed to send trace: {result.payload}")
|
||||
|
||||
@@ -265,7 +265,7 @@ async def _probe_radio() -> DebugRadioProbe:
|
||||
async def debug_support_snapshot() -> DebugSnapshotResponse:
|
||||
"""Return a support/debug snapshot with recent logs and live radio state."""
|
||||
health_data = await build_health_data(radio_runtime.is_connected, radio_runtime.connection_info)
|
||||
statistics = await StatisticsRepository.get_all()
|
||||
message_totals = await StatisticsRepository.get_database_message_totals()
|
||||
radio_probe = await _probe_radio()
|
||||
channels_with_incoming_messages = (
|
||||
await MessageRepository.count_channels_with_incoming_messages()
|
||||
@@ -291,9 +291,9 @@ async def debug_support_snapshot() -> DebugSnapshotResponse:
|
||||
},
|
||||
),
|
||||
database=DebugDatabaseInfo(
|
||||
total_dms=statistics["total_dms"],
|
||||
total_channel_messages=statistics["total_channel_messages"],
|
||||
total_outgoing=statistics["total_outgoing"],
|
||||
total_dms=message_totals["total_dms"],
|
||||
total_channel_messages=message_totals["total_channel_messages"],
|
||||
total_outgoing=message_totals["total_outgoing"],
|
||||
),
|
||||
radio_probe=radio_probe,
|
||||
logs=[*LOG_COPY_BOUNDARY_PREFIX, *get_recent_log_lines(limit=1000)],
|
||||
|
||||
@@ -210,8 +210,7 @@ async def decrypt_historical_packets(
|
||||
except ValueError:
|
||||
raise _bad_request("Invalid hex string for contact public key") from None
|
||||
|
||||
packets = await RawPacketRepository.get_undecrypted_text_messages()
|
||||
count = len(packets)
|
||||
count = await RawPacketRepository.count_undecrypted_text_messages()
|
||||
if count == 0:
|
||||
return DecryptResult(
|
||||
started=False,
|
||||
|
||||
@@ -2,6 +2,7 @@ import asyncio
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
from contextlib import suppress
|
||||
from typing import Literal, TypeAlias
|
||||
|
||||
from fastapi import APIRouter, HTTPException
|
||||
@@ -10,14 +11,20 @@ from pydantic import BaseModel, Field
|
||||
|
||||
from app.dependencies import require_connected
|
||||
from app.models import (
|
||||
CONTACT_TYPE_REPEATER,
|
||||
ContactUpsert,
|
||||
RadioDiscoveryRequest,
|
||||
RadioDiscoveryResponse,
|
||||
RadioDiscoveryResult,
|
||||
RadioTraceHopRequest,
|
||||
RadioTraceNode,
|
||||
RadioTraceRequest,
|
||||
RadioTraceResponse,
|
||||
)
|
||||
from app.radio_sync import send_advertisement as do_send_advertisement
|
||||
from app.radio_sync import sync_radio_time
|
||||
from app.repository import ContactRepository
|
||||
from app.services.contact_reconciliation import promote_prefix_contacts_for_contact
|
||||
from app.services.radio_commands import (
|
||||
KeystoreRefreshError,
|
||||
PathHashModeUnsupportedError,
|
||||
@@ -44,6 +51,12 @@ _DISCOVERY_NODE_TYPES: dict[int, DiscoveryNodeType] = {
|
||||
2: "repeater",
|
||||
4: "sensor",
|
||||
}
|
||||
TRACE_WAIT_TIMEOUT_SECONDS = 45.0
|
||||
TRACE_DEFAULT_TIMEOUT_SECONDS = 15.0
|
||||
TRACE_TIMEOUT_MIN_SECONDS = 5.0
|
||||
TRACE_TIMEOUT_MAX_SECONDS = 30.0
|
||||
TRACE_TIMEOUT_MARGIN = 1.2
|
||||
TRACE_HASH_FLAGS = {1: 0, 2: 1, 4: 2}
|
||||
|
||||
|
||||
async def _prepare_connected(*, broadcast_on_success: bool) -> bool:
|
||||
@@ -197,9 +210,118 @@ async def _persist_new_discovery_contacts(results: list[RadioDiscoveryResult]) -
|
||||
on_radio=False,
|
||||
)
|
||||
await ContactRepository.upsert(contact)
|
||||
promoted_keys = await promote_prefix_contacts_for_contact(
|
||||
public_key=result.public_key,
|
||||
log=logger,
|
||||
)
|
||||
created = await ContactRepository.get_by_key(result.public_key)
|
||||
if created is not None:
|
||||
broadcast_event("contact", created.model_dump())
|
||||
for old_key in promoted_keys:
|
||||
broadcast_event("contact_deleted", {"public_key": old_key})
|
||||
|
||||
|
||||
async def _attach_known_names(results: list[RadioDiscoveryResult]) -> None:
|
||||
"""Resolve known contact names for discovery results from the DB."""
|
||||
for result in results:
|
||||
contact = await ContactRepository.get_by_key(result.public_key)
|
||||
if contact is not None and contact.name:
|
||||
result.name = contact.name
|
||||
|
||||
|
||||
def _trace_hash_for_key(public_key: str, hop_hash_bytes: int) -> str:
|
||||
return public_key[: hop_hash_bytes * 2].lower()
|
||||
|
||||
|
||||
def _trace_timeout_seconds(send_result: object) -> float:
|
||||
payload = getattr(send_result, "payload", None) or {}
|
||||
suggested_timeout = payload.get("suggested_timeout")
|
||||
try:
|
||||
if suggested_timeout is None:
|
||||
raise TypeError
|
||||
timeout_seconds = float(suggested_timeout) / 1000.0 * TRACE_TIMEOUT_MARGIN
|
||||
except (TypeError, ValueError):
|
||||
timeout_seconds = TRACE_DEFAULT_TIMEOUT_SECONDS
|
||||
return max(TRACE_TIMEOUT_MIN_SECONDS, min(TRACE_TIMEOUT_MAX_SECONDS, timeout_seconds))
|
||||
|
||||
|
||||
async def _resolve_trace_hops(
|
||||
hops: list[RadioTraceHopRequest], hop_hash_bytes: int
|
||||
) -> tuple[list[RadioTraceNode], list[str]]:
|
||||
trace_nodes: list[RadioTraceNode] = []
|
||||
requested_hashes: list[str] = []
|
||||
expected_hex_len = hop_hash_bytes * 2
|
||||
|
||||
for hop in hops:
|
||||
public_key = hop.public_key.strip().lower() if isinstance(hop.public_key, str) else None
|
||||
hop_hex = hop.hop_hex.strip().lower() if isinstance(hop.hop_hex, str) else None
|
||||
if bool(public_key) == bool(hop_hex):
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Each trace hop must provide exactly one of public_key or hop_hex",
|
||||
)
|
||||
|
||||
if public_key:
|
||||
if len(public_key) != 64:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Trace repeater keys must be full 64-character public keys",
|
||||
)
|
||||
try:
|
||||
bytes.fromhex(public_key)
|
||||
except ValueError as exc:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Trace repeater keys must be valid hex public keys",
|
||||
) from exc
|
||||
|
||||
contact = await ContactRepository.get_by_key(public_key)
|
||||
if contact is None:
|
||||
raise HTTPException(
|
||||
status_code=404, detail=f"Trace repeater not found: {public_key}"
|
||||
)
|
||||
if contact.type != CONTACT_TYPE_REPEATER:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Trace node is not a repeater: {public_key[:12]}",
|
||||
)
|
||||
requested_hashes.append(_trace_hash_for_key(contact.public_key, hop_hash_bytes))
|
||||
trace_nodes.append(
|
||||
RadioTraceNode(
|
||||
role="repeater",
|
||||
public_key=contact.public_key,
|
||||
name=contact.name,
|
||||
observed_hash=None,
|
||||
snr=None,
|
||||
)
|
||||
)
|
||||
continue
|
||||
|
||||
assert hop_hex is not None
|
||||
if len(hop_hex) != expected_hex_len:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Custom trace hops must be exactly {expected_hex_len} hex characters",
|
||||
)
|
||||
try:
|
||||
bytes.fromhex(hop_hex)
|
||||
except ValueError as exc:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Custom trace hops must be valid hex",
|
||||
) from exc
|
||||
requested_hashes.append(hop_hex)
|
||||
trace_nodes.append(
|
||||
RadioTraceNode(
|
||||
role="custom",
|
||||
public_key=None,
|
||||
name=None,
|
||||
observed_hash=hop_hex,
|
||||
snr=None,
|
||||
)
|
||||
)
|
||||
|
||||
return trace_nodes, requested_hashes
|
||||
|
||||
|
||||
@router.get("/config", response_model=RadioConfigResponse)
|
||||
@@ -365,6 +487,7 @@ async def discover_mesh(request: RadioDiscoveryRequest) -> RadioDiscoveryRespons
|
||||
),
|
||||
)
|
||||
await _persist_new_discovery_contacts(results)
|
||||
await _attach_known_names(results)
|
||||
return RadioDiscoveryResponse(
|
||||
target=request.target,
|
||||
duration_seconds=DISCOVERY_WINDOW_SECONDS,
|
||||
@@ -372,6 +495,105 @@ async def discover_mesh(request: RadioDiscoveryRequest) -> RadioDiscoveryRespons
|
||||
)
|
||||
|
||||
|
||||
@router.post("/trace", response_model=RadioTraceResponse)
|
||||
async def trace_path(request: RadioTraceRequest) -> RadioTraceResponse:
|
||||
"""Send a multi-hop trace loop through known repeaters and back to the local radio."""
|
||||
require_connected()
|
||||
trace_nodes, requested_hashes = await _resolve_trace_hops(request.hops, request.hop_hash_bytes)
|
||||
|
||||
tag = random.randint(1, 0xFFFFFFFF)
|
||||
trace_flags = TRACE_HASH_FLAGS[request.hop_hash_bytes]
|
||||
|
||||
async with radio_manager.radio_operation("radio_trace", pause_polling=True) as mc:
|
||||
local_public_key = str((mc.self_info or {}).get("public_key") or "").lower()
|
||||
if len(local_public_key) != 64:
|
||||
raise HTTPException(status_code=503, detail="Local radio public key is unavailable")
|
||||
local_name = (mc.self_info or {}).get("name")
|
||||
|
||||
response_task = asyncio.create_task(
|
||||
mc.wait_for_event(
|
||||
EventType.TRACE_DATA,
|
||||
attribute_filters={"tag": tag},
|
||||
timeout=TRACE_WAIT_TIMEOUT_SECONDS,
|
||||
)
|
||||
)
|
||||
try:
|
||||
send_result = await mc.commands.send_trace(
|
||||
path=",".join(requested_hashes),
|
||||
tag=tag,
|
||||
flags=trace_flags,
|
||||
)
|
||||
if send_result is None or send_result.type == EventType.ERROR:
|
||||
raise HTTPException(status_code=500, detail="Failed to send trace")
|
||||
|
||||
timeout_seconds = _trace_timeout_seconds(send_result)
|
||||
try:
|
||||
event = await asyncio.wait_for(response_task, timeout=timeout_seconds)
|
||||
except asyncio.TimeoutError as exc:
|
||||
raise HTTPException(status_code=504, detail="No trace response heard") from exc
|
||||
finally:
|
||||
if not response_task.done():
|
||||
response_task.cancel()
|
||||
with suppress(asyncio.CancelledError):
|
||||
await response_task
|
||||
|
||||
if event is None:
|
||||
raise HTTPException(status_code=504, detail="No trace response heard")
|
||||
|
||||
payload = event.payload if isinstance(event.payload, dict) else {}
|
||||
path_len = payload.get("path_len")
|
||||
if not isinstance(path_len, int):
|
||||
raise HTTPException(status_code=500, detail="Trace response was malformed")
|
||||
|
||||
raw_path = payload.get("path")
|
||||
path_nodes = raw_path if isinstance(raw_path, list) else []
|
||||
final_local_node = (
|
||||
path_nodes[-1]
|
||||
if path_nodes
|
||||
and isinstance(path_nodes[-1], dict)
|
||||
and not isinstance(path_nodes[-1].get("hash"), str)
|
||||
else None
|
||||
)
|
||||
hashed_nodes = path_nodes[:-1] if final_local_node is not None else path_nodes
|
||||
|
||||
if len(hashed_nodes) < len(trace_nodes):
|
||||
raise HTTPException(status_code=500, detail="Trace response was incomplete")
|
||||
|
||||
nodes: list[RadioTraceNode] = []
|
||||
for index, trace_node in enumerate(trace_nodes):
|
||||
observed = hashed_nodes[index] if index < len(hashed_nodes) else {}
|
||||
observed_hash = observed.get("hash") if isinstance(observed, dict) else None
|
||||
observed_snr = observed.get("snr") if isinstance(observed, dict) else None
|
||||
nodes.append(
|
||||
RadioTraceNode(
|
||||
role=trace_node.role,
|
||||
public_key=trace_node.public_key,
|
||||
name=trace_node.name,
|
||||
observed_hash=(
|
||||
observed_hash if isinstance(observed_hash, str) else trace_node.observed_hash
|
||||
),
|
||||
snr=float(observed_snr) if isinstance(observed_snr, (int, float)) else None,
|
||||
)
|
||||
)
|
||||
|
||||
terminal_snr_value = final_local_node.get("snr") if isinstance(final_local_node, dict) else None
|
||||
nodes.append(
|
||||
RadioTraceNode(
|
||||
role="local",
|
||||
public_key=local_public_key,
|
||||
name=local_name if isinstance(local_name, str) and local_name else None,
|
||||
observed_hash=None,
|
||||
snr=float(terminal_snr_value) if isinstance(terminal_snr_value, (int, float)) else None,
|
||||
)
|
||||
)
|
||||
|
||||
return RadioTraceResponse(
|
||||
path_len=path_len,
|
||||
timeout_seconds=timeout_seconds,
|
||||
nodes=nodes,
|
||||
)
|
||||
|
||||
|
||||
async def _attempt_reconnect() -> dict:
|
||||
"""Shared reconnection logic for reboot and reconnect endpoints."""
|
||||
radio_manager.resume_connection()
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
@@ -31,7 +30,6 @@ from app.models import (
|
||||
from app.repository import ContactRepository, RepeaterTelemetryRepository
|
||||
from app.routers.contacts import _ensure_on_radio, _resolve_contact_or_404
|
||||
from app.routers.server_control import (
|
||||
_monotonic,
|
||||
batch_cli_fetch,
|
||||
extract_response_text,
|
||||
prepare_authenticated_contact_connection,
|
||||
@@ -40,9 +38,6 @@ from app.routers.server_control import (
|
||||
)
|
||||
from app.services.radio_runtime import radio_runtime as radio_manager
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from meshcore.events import Event
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ACL permission level names
|
||||
@@ -60,58 +55,6 @@ def _extract_response_text(event) -> str:
|
||||
return extract_response_text(event)
|
||||
|
||||
|
||||
async def _fetch_repeater_response(
|
||||
mc,
|
||||
target_pubkey_prefix: str,
|
||||
timeout: float = 20.0,
|
||||
) -> "Event | None":
|
||||
deadline = _monotonic() + timeout
|
||||
|
||||
while _monotonic() < deadline:
|
||||
try:
|
||||
result = await mc.commands.get_msg(timeout=2.0)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except Exception as exc:
|
||||
logger.debug("get_msg() exception: %s", exc)
|
||||
await asyncio.sleep(1.0)
|
||||
continue
|
||||
|
||||
if result.type == EventType.NO_MORE_MSGS:
|
||||
await asyncio.sleep(1.0)
|
||||
continue
|
||||
|
||||
if result.type == EventType.ERROR:
|
||||
logger.debug("get_msg() error: %s", result.payload)
|
||||
await asyncio.sleep(1.0)
|
||||
continue
|
||||
|
||||
if result.type == EventType.CONTACT_MSG_RECV:
|
||||
msg_prefix = result.payload.get("pubkey_prefix", "")
|
||||
txt_type = result.payload.get("txt_type", 0)
|
||||
if msg_prefix == target_pubkey_prefix and txt_type == 1:
|
||||
return result
|
||||
logger.debug(
|
||||
"Skipping non-target message (from=%s, txt_type=%d) while waiting for %s",
|
||||
msg_prefix,
|
||||
txt_type,
|
||||
target_pubkey_prefix,
|
||||
)
|
||||
continue
|
||||
|
||||
if result.type == EventType.CHANNEL_MSG_RECV:
|
||||
logger.debug(
|
||||
"Skipping channel message (channel_idx=%s) during repeater fetch",
|
||||
result.payload.get("channel_idx"),
|
||||
)
|
||||
continue
|
||||
|
||||
logger.debug("Unexpected event type %s during repeater fetch, skipping", result.type)
|
||||
|
||||
logger.warning("No CLI response from repeater %s within %.1fs", target_pubkey_prefix, timeout)
|
||||
return None
|
||||
|
||||
|
||||
async def prepare_repeater_connection(mc, contact: Contact, password: str) -> RepeaterLoginResponse:
|
||||
return await prepare_authenticated_contact_connection(
|
||||
mc,
|
||||
|
||||
@@ -13,6 +13,7 @@ from app.models import (
|
||||
Contact,
|
||||
RepeaterLoginResponse,
|
||||
)
|
||||
from app.radio_sync import _store_pending_channel_message, _store_pending_direct_message
|
||||
from app.routers.contacts import _ensure_on_radio
|
||||
from app.services.radio_runtime import radio_runtime as radio_manager
|
||||
|
||||
@@ -115,18 +116,20 @@ async def fetch_contact_cli_response(
|
||||
if msg_prefix == target_pubkey_prefix and txt_type == 1:
|
||||
return result
|
||||
logger.debug(
|
||||
"Skipping non-target message (from=%s, txt_type=%d) while waiting for %s",
|
||||
"Storing non-target DM (from=%s, txt_type=%d) consumed while waiting for %s",
|
||||
msg_prefix,
|
||||
txt_type,
|
||||
target_pubkey_prefix,
|
||||
)
|
||||
await _store_pending_direct_message(result)
|
||||
continue
|
||||
|
||||
if result.type == EventType.CHANNEL_MSG_RECV:
|
||||
logger.debug(
|
||||
"Skipping channel message (channel_idx=%s) during CLI fetch",
|
||||
"Storing channel message (channel_idx=%s) consumed during CLI fetch",
|
||||
result.payload.get("channel_idx"),
|
||||
)
|
||||
await _store_pending_channel_message(mc, result.payload)
|
||||
continue
|
||||
|
||||
logger.debug("Unexpected event type %s during CLI fetch, skipping", result.type)
|
||||
|
||||
@@ -2,6 +2,7 @@ from fastapi import APIRouter
|
||||
|
||||
from app.models import StatisticsResponse
|
||||
from app.repository import StatisticsRepository
|
||||
from app.services.radio_noise_floor import get_noise_floor_history
|
||||
|
||||
router = APIRouter(prefix="/statistics", tags=["statistics"])
|
||||
|
||||
@@ -9,4 +10,5 @@ router = APIRouter(prefix="/statistics", tags=["statistics"])
|
||||
@router.get("", response_model=StatisticsResponse)
|
||||
async def get_statistics() -> StatisticsResponse:
|
||||
data = await StatisticsRepository.get_all()
|
||||
data["noise_floor_24h"] = await get_noise_floor_history()
|
||||
return StatisticsResponse(**data)
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
"""In-memory local-radio noise floor history sampling."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from collections import deque
|
||||
|
||||
from meshcore import EventType
|
||||
|
||||
from app.radio import RadioDisconnectedError, RadioOperationBusyError
|
||||
from app.services.radio_runtime import radio_runtime as radio_manager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
NOISE_FLOOR_SAMPLE_INTERVAL_SECONDS = 300
|
||||
NOISE_FLOOR_WINDOW_SECONDS = 24 * 60 * 60
|
||||
MAX_NOISE_FLOOR_SAMPLES = 300
|
||||
|
||||
_noise_floor_task: asyncio.Task | None = None
|
||||
_noise_floor_samples: deque[tuple[int, int]] = deque(maxlen=MAX_NOISE_FLOOR_SAMPLES)
|
||||
_noise_floor_supported: bool | None = None
|
||||
_samples_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def _append_sample(timestamp: int, noise_floor_dbm: int) -> None:
|
||||
async with _samples_lock:
|
||||
_noise_floor_samples.append((timestamp, noise_floor_dbm))
|
||||
|
||||
|
||||
async def sample_noise_floor_once(*, blocking: bool = False) -> None:
|
||||
"""Fetch the current radio noise floor once and record it when available."""
|
||||
global _noise_floor_supported
|
||||
|
||||
if not radio_manager.is_connected:
|
||||
return
|
||||
|
||||
try:
|
||||
async with radio_manager.radio_operation("noise_floor_sample", blocking=blocking) as mc:
|
||||
event = await mc.commands.get_stats_radio()
|
||||
except (RadioDisconnectedError, RadioOperationBusyError):
|
||||
return
|
||||
except Exception as exc:
|
||||
logger.debug("Noise floor sampling failed: %s", exc)
|
||||
return
|
||||
|
||||
if event.type == EventType.ERROR:
|
||||
_noise_floor_supported = False
|
||||
return
|
||||
|
||||
if event.type != EventType.STATS_RADIO:
|
||||
return
|
||||
|
||||
noise_floor = event.payload.get("noise_floor")
|
||||
if not isinstance(noise_floor, int):
|
||||
return
|
||||
|
||||
_noise_floor_supported = True
|
||||
await _append_sample(int(time.time()), noise_floor)
|
||||
|
||||
|
||||
async def _noise_floor_sampling_loop() -> None:
|
||||
while True:
|
||||
await sample_noise_floor_once()
|
||||
await asyncio.sleep(NOISE_FLOOR_SAMPLE_INTERVAL_SECONDS)
|
||||
|
||||
|
||||
async def start_noise_floor_sampling() -> None:
|
||||
global _noise_floor_task
|
||||
if _noise_floor_task is not None and not _noise_floor_task.done():
|
||||
return
|
||||
_noise_floor_task = asyncio.create_task(_noise_floor_sampling_loop())
|
||||
|
||||
|
||||
async def stop_noise_floor_sampling() -> None:
|
||||
global _noise_floor_task
|
||||
if _noise_floor_task is None:
|
||||
return
|
||||
if not _noise_floor_task.done():
|
||||
_noise_floor_task.cancel()
|
||||
try:
|
||||
await _noise_floor_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
_noise_floor_task = None
|
||||
|
||||
|
||||
async def get_noise_floor_history() -> dict:
|
||||
"""Return the current 24-hour in-memory noise floor history snapshot."""
|
||||
now = int(time.time())
|
||||
cutoff = now - NOISE_FLOOR_WINDOW_SECONDS
|
||||
|
||||
async with _samples_lock:
|
||||
samples = [
|
||||
{"timestamp": timestamp, "noise_floor_dbm": noise_floor_dbm}
|
||||
for timestamp, noise_floor_dbm in _noise_floor_samples
|
||||
if timestamp >= cutoff
|
||||
]
|
||||
|
||||
latest = samples[-1] if samples else None
|
||||
oldest_timestamp = samples[0]["timestamp"] if samples else None
|
||||
coverage_seconds = 0 if oldest_timestamp is None else max(0, now - oldest_timestamp)
|
||||
|
||||
return {
|
||||
"sample_interval_seconds": NOISE_FLOOR_SAMPLE_INTERVAL_SECONDS,
|
||||
"coverage_seconds": coverage_seconds,
|
||||
"latest_noise_floor_dbm": latest["noise_floor_dbm"] if latest else None,
|
||||
"latest_timestamp": latest["timestamp"] if latest else None,
|
||||
"supported": _noise_floor_supported,
|
||||
"samples": samples,
|
||||
}
|
||||
@@ -43,9 +43,6 @@ class WebSocketManager:
|
||||
3. Send to all clients concurrently with timeout
|
||||
4. Re-acquire lock to clean up disconnected clients
|
||||
"""
|
||||
if not self.active_connections:
|
||||
return
|
||||
|
||||
message = dump_ws_event(event_type, data)
|
||||
|
||||
# Copy connection list under lock to avoid holding lock during I/O
|
||||
|
||||
Reference in New Issue
Block a user