Remote-Terminal-for-MeshCore/app/AGENTS.md
2026-02-04 13:14:08 -08:00

Backend AGENTS.md

This document provides context for AI assistants and developers working on the FastAPI backend.

Technology Stack

  • FastAPI - Async web framework with automatic OpenAPI docs
  • aiosqlite - Async SQLite driver
  • meshcore - MeshCore radio library (local dependency at ../meshcore_py)
  • Pydantic - Data validation and settings management
  • PyCryptodome - AES-128 encryption for packet decryption
  • UV - Python package manager

Directory Structure

app/
├── main.py           # FastAPI app, lifespan, router registration, static file serving
├── config.py         # Pydantic settings (env vars: MESHCORE_*)
├── database.py       # SQLite schema, connection management, runs migrations
├── migrations.py     # Database migrations using SQLite user_version pragma
├── models.py         # Pydantic models for API request/response
├── repository.py     # Database CRUD (ContactRepository, ChannelRepository, etc.)
├── radio.py          # RadioManager - serial connection to MeshCore device
├── radio_sync.py     # Periodic sync, contact auto-loading to radio
├── decoder.py        # Packet decryption (channel + direct messages)
├── packet_processor.py # Raw packet processing, advertisement handling
├── keystore.py       # Ephemeral key store (private key in memory only)
├── event_handlers.py # Radio event subscriptions, ACK tracking, repeat detection
├── websocket.py      # WebSocketManager for real-time client updates
└── routers/          # All routes prefixed with /api
    ├── health.py     # GET /api/health
    ├── radio.py      # Radio config, advertise, private key, reboot
    ├── contacts.py   # Contact CRUD, radio sync, mark-read
    ├── channels.py   # Channel CRUD, radio sync, mark-read
    ├── messages.py   # Message list and send (direct/channel)
    ├── packets.py    # Raw packet endpoints, historical decryption
    ├── read_state.py # Read state: unread counts, mark-all-read
    ├── settings.py   # App settings (max_radio_contacts)
    └── ws.py         # WebSocket endpoint at /api/ws

Key Architectural Patterns

Repository Pattern

All database operations go through repository classes in repository.py:

from app.repository import ContactRepository, ChannelRepository, MessageRepository, RawPacketRepository, AppSettingsRepository

# Examples
contact = await ContactRepository.get_by_key_prefix("abc123")
await MessageRepository.create(msg_type="PRIV", text="Hello", received_at=timestamp)
await RawPacketRepository.mark_decrypted(packet_id, message_id)

# App settings (single-row pattern)
settings = await AppSettingsRepository.get()
await AppSettingsRepository.update(auto_decrypt_dm_on_advert=True)
await AppSettingsRepository.add_favorite("contact", public_key)

Radio Connection

RadioManager in radio.py handles serial connection:

from app.radio import radio_manager

# Access meshcore instance
if radio_manager.meshcore:
    await radio_manager.meshcore.commands.send_msg(dst, msg)

Auto-detection scans common serial ports when MESHCORE_SERIAL_PORT is not set.

Event-Driven Architecture

Radio events flow through event_handlers.py:

Event              Handler              Actions
CONTACT_MSG_RECV   on_contact_message   Fallback only - stores DM if packet processor didn't handle it
RX_LOG_DATA        on_rx_log_data       Store packet, decrypt channels/DMs, broadcast via WS
PATH_UPDATE        on_path_update       Update contact path info
NEW_CONTACT        on_new_contact       Sync contact from radio's internal database
ACK                on_ack               Match pending ACKs, mark message acked, broadcast

Note on DM handling: Direct messages are primarily handled by the packet processor via RX_LOG_DATA, which decrypts using the exported private key. The CONTACT_MSG_RECV handler exists as a fallback for radios without ENABLE_PRIVATE_KEY_EXPORT=1 in firmware.

WebSocket Broadcasting

Real-time updates use ws_manager singleton:

from app.websocket import ws_manager

# Broadcast to all connected clients
await ws_manager.broadcast("message", {"id": 1, "text": "Hello"})

Event types: health, contacts, channels, message, contact, raw_packet, message_acked, error

Note: The WebSocket initial connect only sends health. Contacts and channels are fetched via REST (GET /api/contacts, GET /api/channels) for faster parallel loading. The WS still broadcasts real-time contacts/channels updates when data changes.

Helper functions for common broadcasts:

from app.websocket import broadcast_error, broadcast_health

# Notify clients of errors (shows toast in frontend)
broadcast_error("Operation failed", "Additional details")

# Notify clients of connection status change
broadcast_health(radio_connected=True, serial_port="/dev/ttyUSB0")

Connection Monitoring

RadioManager includes a background task that monitors connection status:

  • Checks connection every 5 seconds
  • Broadcasts health event on status change
  • Attempts automatic reconnection when connection lost
  • Runs full post_connect_setup() after successful reconnect (event handlers, key export, time sync, contact/channel sync, advertisements, message polling)
  • Resilient to transient errors (logs and continues rather than crashing)
  • Supports manual reconnection via POST /api/radio/reconnect

from app.radio import radio_manager

# Manual reconnection
success = await radio_manager.reconnect()

# Background monitor (started automatically in app lifespan)
await radio_manager.start_connection_monitor()
await radio_manager.stop_connection_monitor()

Message Polling

Periodic message polling serves as a fallback for platforms where push events are unreliable. Use pause_polling() to temporarily suspend polling during operations that need exclusive radio access (e.g., repeater CLI commands):

from app.radio_sync import pause_polling, is_polling_paused

# Pause polling during sensitive operations (supports nesting)
async with pause_polling():
    # Polling is paused here
    await do_repeater_operation()

    async with pause_polling():
        # Still paused (nested)
        await do_another_operation()

    # Still paused (outer context active)

# Polling resumes when all contexts exit

# Check current state
if is_polling_paused():
    print("Polling is currently paused")
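
The nesting behaviour above can be obtained with a simple depth counter. The following is a minimal sketch of that idea, not the actual radio_sync.py code: the module-level _pause_depth variable is a hypothetical name, and the real implementation may track state differently.

```python
from contextlib import asynccontextmanager

# Hypothetical module-level counter; radio_sync.py may store this differently.
_pause_depth = 0


@asynccontextmanager
async def pause_polling():
    """Pause polling while the context is active; nested uses stack."""
    global _pause_depth
    _pause_depth += 1
    try:
        yield
    finally:
        # Decrement even if the body raised, so polling cannot stay paused forever.
        _pause_depth -= 1


def is_polling_paused() -> bool:
    """True while at least one pause_polling() context is open."""
    return _pause_depth > 0
```

A polling loop built on this would check is_polling_paused() before each poll and skip the cycle when it returns True.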

Periodic Advertisement

The server automatically sends an advertisement every hour to announce presence on the mesh. This helps maintain visibility to other nodes and refreshes routing information.

  • Started automatically on radio connection
  • Interval: 1 hour (3600 seconds)
  • Uses flood mode for maximum reach

from app.radio_sync import start_periodic_advert, stop_periodic_advert, send_advertisement

# Start/stop periodic advertising
start_periodic_advert()  # Started automatically in lifespan
await stop_periodic_advert()

# Manual advertisement
await send_advertisement()  # Returns True on success

Database Schema

contacts (
    public_key TEXT PRIMARY KEY,  -- 64-char hex
    name TEXT,
    type INTEGER DEFAULT 0,       -- 0=unknown, 1=client, 2=repeater, 3=room
    flags INTEGER DEFAULT 0,
    last_path TEXT,               -- Routing path hex
    last_path_len INTEGER DEFAULT -1,
    last_advert INTEGER,          -- Unix timestamp of last advertisement
    lat REAL, lon REAL,
    last_seen INTEGER,
    on_radio INTEGER DEFAULT 0,   -- Boolean: contact loaded on radio
    last_contacted INTEGER,       -- Unix timestamp of last message sent/received
    last_read_at INTEGER          -- Unix timestamp when conversation was last read
)

channels (
    key TEXT PRIMARY KEY,         -- 32-char hex channel key
    name TEXT NOT NULL,
    is_hashtag INTEGER DEFAULT 0, -- Key derived from SHA256(name)[:16]
    on_radio INTEGER DEFAULT 0,
    last_read_at INTEGER          -- Unix timestamp when channel was last read
)

messages (
    id INTEGER PRIMARY KEY,
    type TEXT NOT NULL,           -- 'PRIV' or 'CHAN'
    conversation_key TEXT NOT NULL, -- User pubkey for PRIV, channel key for CHAN
    text TEXT NOT NULL,
    sender_timestamp INTEGER,
    received_at INTEGER NOT NULL,
    path TEXT,                    -- Hex-encoded routing path (2 chars per hop), null for outgoing
    txt_type INTEGER DEFAULT 0,
    signature TEXT,
    outgoing INTEGER DEFAULT 0,
    acked INTEGER DEFAULT 0,
    UNIQUE(type, conversation_key, text, sender_timestamp)  -- Deduplication
)

raw_packets (
    id INTEGER PRIMARY KEY,
    timestamp INTEGER NOT NULL,
    data BLOB NOT NULL,           -- Raw packet bytes
    decrypted INTEGER DEFAULT 0,
    message_id INTEGER,           -- FK to messages if decrypted
    decrypt_attempts INTEGER DEFAULT 0,
    last_attempt INTEGER,
    FOREIGN KEY (message_id) REFERENCES messages(id)
)

app_settings (
    id INTEGER PRIMARY KEY CHECK (id = 1),  -- Single-row pattern
    max_radio_contacts INTEGER DEFAULT 200,
    favorites TEXT DEFAULT '[]',            -- JSON array of {type, id}
    auto_decrypt_dm_on_advert INTEGER DEFAULT 0,
    sidebar_sort_order TEXT DEFAULT 'recent',  -- 'recent' or 'alpha'
    last_message_times TEXT DEFAULT '{}',   -- JSON object of state_key -> timestamp
    preferences_migrated INTEGER DEFAULT 0  -- One-time migration flag
)
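
The UNIQUE constraint on messages is what makes deduplication work when the same message arrives twice. The sketch below shows one way to lean on it, using the standard-library sqlite3 module and a trimmed-down table for brevity. The store_message helper and the use of INSERT OR IGNORE are assumptions for illustration; MessageRepository.create may handle duplicates differently.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE messages (
        id INTEGER PRIMARY KEY,
        type TEXT NOT NULL,
        conversation_key TEXT NOT NULL,
        text TEXT NOT NULL,
        sender_timestamp INTEGER,
        received_at INTEGER NOT NULL,
        UNIQUE(type, conversation_key, text, sender_timestamp)
    )
    """
)


def store_message(msg_type, conversation_key, text, sender_timestamp, received_at):
    """Hypothetical helper: insert a message, return its id or None if duplicate."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO messages "
        "(type, conversation_key, text, sender_timestamp, received_at) "
        "VALUES (?, ?, ?, ?, ?)",
        (msg_type, conversation_key, text, sender_timestamp, received_at),
    )
    # rowcount is 0 when the UNIQUE constraint caused the insert to be ignored.
    return cur.lastrowid if cur.rowcount == 1 else None
```

Note that SQLite treats NULL values as distinct in UNIQUE constraints, so two rows with a NULL sender_timestamp are never considered duplicates of each other.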

Database Migrations (migrations.py)

Schema migrations use SQLite's user_version pragma for version tracking:

from app.migrations import get_version, set_version, run_migrations

# Check current schema version
version = await get_version(conn)  # Returns int (0 for new/unmigrated DB)

# Run pending migrations (called automatically on startup)
applied = await run_migrations(conn)  # Returns number of migrations applied

How It Works

  1. database.py calls run_migrations() after schema initialization
  2. Each migration checks user_version and runs if needed
  3. Migrations are idempotent (safe to run multiple times)
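  4. ALTER TABLE ADD COLUMN migrations tolerate columns that already exist (SQLite itself rejects a duplicate column name, so the migration has to check for or catch that case)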

Adding a New Migration

# In migrations.py
async def run_migrations(conn: aiosqlite.Connection) -> int:
    version = await get_version(conn)
    applied = 0

    if version < 1:
        await _migrate_001_add_last_read_at(conn)
        await set_version(conn, 1)
        applied += 1

    # Add new migrations here:
    # if version < 2:
    #     await _migrate_002_something(conn)
    #     await set_version(conn, 2)
    #     applied += 1

    return applied
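
For reference, here is a self-contained sketch of the same pattern using the synchronous sqlite3 module, so it runs without aiosqlite. The column_exists helper is hypothetical, and the body of the 001 migration (adding last_read_at to contacts and channels) is inferred from its name and the schema above rather than copied from migrations.py.

```python
import sqlite3


def get_version(conn: sqlite3.Connection) -> int:
    """Read the schema version stored in the database header."""
    return conn.execute("PRAGMA user_version").fetchone()[0]


def set_version(conn: sqlite3.Connection, version: int) -> None:
    # PRAGMA does not accept bound parameters, so format a validated int.
    conn.execute(f"PRAGMA user_version = {int(version)}")


def column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool:
    """Hypothetical helper: check table_info so ADD COLUMN stays idempotent."""
    rows = conn.execute(f"PRAGMA table_info({table})")
    return any(row[1] == column for row in rows)  # row[1] is the column name


def migrate_001_add_last_read_at(conn: sqlite3.Connection) -> None:
    # Assumed behaviour: add last_read_at to both tables if it is missing.
    for table in ("contacts", "channels"):
        if not column_exists(conn, table, "last_read_at"):
            conn.execute(f"ALTER TABLE {table} ADD COLUMN last_read_at INTEGER")


def run_migrations(conn: sqlite3.Connection) -> int:
    """Apply pending migrations and return how many ran."""
    applied = 0
    if get_version(conn) < 1:
        migrate_001_add_last_read_at(conn)
        set_version(conn, 1)
        applied += 1
    conn.commit()
    return applied
```

Checking table_info before ALTER TABLE is one way to get the idempotence described above; catching the duplicate-column error is another.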

Packet Decryption (decoder.py)

The decoder handles MeshCore packet decryption for historical packet analysis:

Packet Types

from enum import IntEnum

class PayloadType(IntEnum):
    TEXT_MESSAGE = 0x02  # Direct messages
    ACK = 0x03
    ADVERT = 0x04
    GROUP_TEXT = 0x05    # Channel messages (decryptable)
    # ... see decoder.py for full list

Channel Key Derivation

Hashtag channels derive keys from name:

import hashlib

channel_key = hashlib.sha256(b"#channelname").digest()[:16]  # 16 bytes = 32 hex chars
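
Wrapped as a function, the derivation might look like the sketch below. The derive_hashtag_key name is hypothetical, and the normalization step assumes the leading "#" is always part of the hashed name, as in the line above.

```python
import hashlib


def derive_hashtag_key(name: str) -> bytes:
    """Return the 16-byte key for a hashtag channel name such as "#general"."""
    # Assumption: the leading "#" is part of the hashed name.
    if not name.startswith("#"):
        name = "#" + name
    return hashlib.sha256(name.encode("utf-8")).digest()[:16]
```

Calling .hex() on the result gives the 32-character value stored in channels.key.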

Decryption Flow

  1. Parse packet header to get payload type
  2. For GROUP_TEXT: extract channel_hash (1 byte), cipher_mac (2 bytes), ciphertext
  3. Verify HMAC-SHA256 using 32-byte secret (key + 16 zero bytes)
  4. Decrypt with AES-128 ECB
  5. Parse decrypted content: timestamp (4 bytes), flags (1 byte), "sender: message" text

from app.decoder import try_decrypt_packet_with_channel_key

result = try_decrypt_packet_with_channel_key(raw_bytes, channel_key)
if result:
    print(f"{result.sender}: {result.message}")
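
Steps 3 and 5 of the flow can be sketched with the standard library alone (the AES step is omitted so the example has no third-party dependency). This is an illustration under stated assumptions, not the decoder.py implementation: it assumes the MAC is computed over the ciphertext and truncated to its first 2 bytes, that the timestamp is little-endian, and that the plaintext is zero-padded UTF-8. The function and class names are hypothetical.

```python
import hashlib
import hmac
import struct
from typing import NamedTuple, Optional


class ChannelText(NamedTuple):
    timestamp: int
    flags: int
    sender: str
    message: str


def mac_is_valid(channel_key: bytes, cipher_mac: bytes, ciphertext: bytes) -> bool:
    """Step 3: compare the 2-byte MAC against a truncated HMAC-SHA256."""
    secret = channel_key + bytes(16)  # 16-byte key padded to a 32-byte secret
    # Assumption: the MAC covers the ciphertext and keeps the first 2 bytes.
    expected = hmac.new(secret, ciphertext, hashlib.sha256).digest()[:2]
    return hmac.compare_digest(expected, cipher_mac)


def parse_plaintext(plaintext: bytes) -> Optional[ChannelText]:
    """Step 5: split decrypted bytes into timestamp, flags, sender, message."""
    if len(plaintext) < 5:
        return None
    # Assumption: 4-byte little-endian timestamp followed by a 1-byte flags field.
    timestamp, flags = struct.unpack_from("<IB", plaintext, 0)
    # Assumption: text is UTF-8 and zero-padded to the AES block size.
    text = plaintext[5:].split(b"\x00", 1)[0].decode("utf-8", errors="replace")
    sender, sep, message = text.partition(": ")
    if not sep:
        return ChannelText(timestamp, flags, "", text)
    return ChannelText(timestamp, flags, sender, message)
```

Checking the MAC before decrypting lets the decoder reject packets for other channels cheaply, since a wrong key almost always fails the 2-byte comparison.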

Direct Message Decryption

Direct messages use ECDH key exchange (Ed25519 → X25519) for shared secret derivation.

Key storage: The private key is exported from the radio on startup and stored in memory via keystore.py. This enables server-side DM decryption even when contacts aren't loaded on the radio.

Primary path (packet processor): When an RX_LOG_DATA event contains a TEXT_MESSAGE packet, packet_processor.py handles the complete flow:

  1. Decrypts using known contact public keys and stored private key
  2. Filters CLI responses (txt_type=1 in flags)
  3. Stores message in database
  4. Broadcasts via WebSocket
  5. Updates contact's last_contacted timestamp
  6. Triggers bot if enabled

Fallback path (event handler): If the packet processor can't decrypt (no private key export, unknown contact), on_contact_message handles DMs from the MeshCore library's CONTACT_MSG_RECV event. DB deduplication prevents double-storage when both paths fire.

Historical decryption: When creating a contact with try_historical=True, the server attempts to decrypt all stored TEXT_MESSAGE packets for that contact.

Direction detection: The decoder compares the packet's 1-byte dest_hash and src_hash to determine whether a message is incoming or outgoing. Edge case: when both hashes are equal (a 1/256 chance), the direction is ambiguous and the message is treated as incoming.

from app.decoder import try_decrypt_dm

result = try_decrypt_dm(raw_bytes, private_key, contact_public_key)
if result:
    print(f"{result.message} (timestamp={result.timestamp})")
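
The direction check described above might be sketched as follows. It assumes each 1-byte hash is the first byte of the corresponding public key; the detect_direction function is hypothetical and is not part of app.decoder.

```python
from typing import Optional


def detect_direction(dest_hash: int, src_hash: int, our_public_key: bytes) -> Optional[str]:
    """Classify a DM as "incoming" or "outgoing" from its 1-byte hashes."""
    # Assumption: a node's hash is the first byte of its public key.
    our_hash = our_public_key[0]
    if dest_hash == our_hash:
        # Also covers the ambiguous case where src_hash equals our hash too.
        return "incoming"
    if src_hash == our_hash:
        return "outgoing"
    return None  # Packet is neither addressed to us nor sent by us
```

Testing the destination first is what makes the ambiguous case default to incoming.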

Advertisement Parsing (decoder.py)

Advertisement packets contain contact information including optional GPS coordinates.

Packet Structure

Bytes 0-31:   Public key (32 bytes)
Bytes 32-35:  Timestamp (4 bytes, little-endian Unix timestamp)
Bytes 36-99:  Signature (64 bytes)
Byte 100:     App flags
Bytes 101+:   Optional fields (location, name) based on flags

App Flags (byte 100)

  • Bits 0-3: Device role (1=Chat, 2=Repeater, 3=Room, 4=Sensor)
  • Bit 4: Has location (lat/lon follow)
  • Bit 5: Has feature 1
  • Bit 6: Has feature 2
  • Bit 7: Has name (null-terminated string at end)

GPS Extraction

When bit 4 is set, latitude and longitude follow as signed int32 little-endian values, divided by 1,000,000 to get decimal degrees:

from app.decoder import parse_advertisement

advert = parse_advertisement(payload_bytes)
if advert:
    print(f"Device role: {advert.device_role}")  # 1=Chat, 2=Repeater
    if advert.lat is not None and advert.lon is not None:  # 0.0 is a valid coordinate
        print(f"Location: {advert.lat}, {advert.lon}")
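
To make the byte layout concrete, here is a minimal parsing sketch using struct. It follows the offsets and flag bits listed above, but it is not the parse_advertisement implementation: the Advert tuple and parse_advert_sketch name are hypothetical, and the 2-byte size of each feature field is an assumption, since the layout above does not state it.

```python
import struct
from typing import NamedTuple, Optional


class Advert(NamedTuple):
    public_key: bytes
    timestamp: int
    device_role: int
    lat: Optional[float]
    lon: Optional[float]
    name: Optional[str]


def parse_advert_sketch(payload: bytes) -> Optional[Advert]:
    """Parse the fixed header, then optional fields selected by the flags byte."""
    if len(payload) < 101:
        return None
    public_key = payload[0:32]
    (timestamp,) = struct.unpack_from("<I", payload, 32)  # little-endian uint32
    flags = payload[100]
    offset = 101
    lat = lon = None
    if flags & 0x10:  # bit 4: location present
        if len(payload) < offset + 8:
            return None
        raw_lat, raw_lon = struct.unpack_from("<ii", payload, offset)
        lat, lon = raw_lat / 1_000_000, raw_lon / 1_000_000
        offset += 8
    # Assumption: feature 1 (bit 5) and feature 2 (bit 6) are 2 bytes each.
    for bit in (0x20, 0x40):
        if flags & bit:
            offset += 2
    name = None
    if flags & 0x80:  # bit 7: name occupies the rest of the payload
        raw_name = payload[offset:].split(b"\x00", 1)[0]
        name = raw_name.decode("utf-8", errors="replace")
    return Advert(public_key, timestamp, flags & 0x0F, lat, lon, name)
```

Because lat and lon are None when bit 4 is clear, callers should test them with "is not None" rather than truthiness, so that a coordinate of exactly 0.0 is not discarded.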

Data Flow

  1. event_handlers.py receives ADVERTISEMENT event
  2. packet_processor.py calls parse_advertisement() to extract data
  3. Contact is upserted with location data (lat, lon) and device_role as type
  4. Frontend MapView displays contacts with GPS coordinates

ACK and Repeat Detection

The acked field is an integer count, not a boolean:

  • 0 = not acked
  • 1 = one ACK/echo received
  • 2+ = multiple flood echoes received

Direct Message ACKs

When sending a direct message, an expected ACK code is tracked:

from app.event_handlers import track_pending_ack

track_pending_ack(expected_ack="abc123", message_id=42, timeout_ms=30000)

When an ACK event arrives with a matching code, the message's ack count is incremented.
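
One way to picture the tracking is a dictionary from ACK code to message id with a deadline. The sketch below is hypothetical and is not the event_handlers.py code: the _pending dict, the resolve_ack function, and the injectable clock are all assumptions made so the example is testable.

```python
import time
from typing import Callable, Dict, Optional, Tuple

# Hypothetical store: expected_ack -> (message_id, deadline in seconds)
_pending: Dict[str, Tuple[int, float]] = {}


def track_pending_ack_sketch(
    expected_ack: str,
    message_id: int,
    timeout_ms: int,
    now: Callable[[], float] = time.monotonic,
) -> None:
    """Remember which message an ACK code belongs to, until the timeout."""
    _pending[expected_ack] = (message_id, now() + timeout_ms / 1000)


def resolve_ack(ack_code: str, now: Callable[[], float] = time.monotonic) -> Optional[int]:
    """Return the message id waiting on ack_code, or None if unknown or expired."""
    entry = _pending.pop(ack_code, None)
    if entry is None:
        return None
    message_id, deadline = entry
    return message_id if now() <= deadline else None
```

On a successful match, the handler would then increment acked for that message and broadcast a message_acked event.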

Channel Message Repeats

Flood messages echo back through repeaters. Detection uses:

  • Channel key
  • Text hash
  • Timestamp (±5 second window)

Each repeat increments the ack count. The frontend displays:

  • ? = no acks
  • ✓ = 1 echo
  • ✓2, ✓3, etc. = multiple echoes (real-time updates via WebSocket)
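
The three matching criteria (channel key, text hash, timestamp window) can be sketched as an in-memory lookup. This is an illustration only: the _sent store and both function names are hypothetical, SHA-256 is an assumed choice of text hash, and the real handler works against the database instead.

```python
import hashlib
from typing import Dict, List, Tuple

WINDOW_SECONDS = 5  # the ±5 second window described above

# Hypothetical store: (channel_key, text_hash) -> list of [timestamp, ack_count]
_sent: Dict[Tuple[str, str], List[List[int]]] = {}


def _text_hash(text: str) -> str:
    # Assumption: any stable hash of the text works; SHA-256 is used here.
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def record_outgoing(channel_key: str, text: str, timestamp: int) -> None:
    """Register a channel message we sent so later echoes can be matched."""
    _sent.setdefault((channel_key, _text_hash(text)), []).append([timestamp, 0])


def record_echo(channel_key: str, text: str, timestamp: int) -> int:
    """Return the new ack count for a matching message, or 0 if none matched."""
    for entry in _sent.get((channel_key, _text_hash(text)), []):
        if abs(entry[0] - timestamp) <= WINDOW_SECONDS:
            entry[1] += 1
            return entry[1]
    return 0
```

The timestamp window keeps two identical messages sent minutes apart from being counted as echoes of each other.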

Auto-Contact Sync to Radio

To enable the radio to auto-ACK incoming DMs, recent non-repeater contacts are automatically loaded onto the radio. The limit is configured via the max_radio_contacts setting (default 200).

  • Triggered on each advertisement from a non-repeater contact
  • Loads most recently contacted non-repeaters (by last_contacted timestamp)
  • Throttled to at most once per 30 seconds
  • last_contacted updated on message send/receive

from app.radio_sync import sync_recent_contacts_to_radio

result = await sync_recent_contacts_to_radio(force=True)
# Returns: {"loaded": 5, "already_on_radio": 195, "failed": 0}
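
The selection and throttling rules can be sketched independently of the radio. Both names below are hypothetical and the contact dictionaries are a simplified stand-in for the repository models; the actual logic lives in sync_recent_contacts_to_radio.

```python
from typing import Dict, List, Optional

REPEATER_TYPE = 2  # contacts.type value for repeaters


def select_contacts_for_radio(contacts: List[Dict], max_radio_contacts: int = 200) -> List[Dict]:
    """Pick the most recently contacted non-repeaters, up to the limit."""
    candidates = [c for c in contacts if c.get("type") != REPEATER_TYPE]
    # Contacts never messaged (last_contacted is None) sort last.
    candidates.sort(key=lambda c: c.get("last_contacted") or 0, reverse=True)
    return candidates[:max_radio_contacts]


class Throttle:
    """Allow an action at most once per interval (seconds) unless forced."""

    def __init__(self, interval: float = 30.0) -> None:
        self.interval = interval
        self._last: Optional[float] = None

    def allow(self, now: float, force: bool = False) -> bool:
        if force or self._last is None or now - self._last >= self.interval:
            self._last = now
            return True
        return False
```

The force flag mirrors force=True in the call above: it bypasses the 30-second throttle for a manually requested sync.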

API Endpoints

All endpoints are prefixed with /api.

Health

  • GET /api/health - Connection status, serial port

Radio

  • GET /api/radio/config - Read config (public key, name, radio params)
  • PATCH /api/radio/config - Update name, lat/lon, tx_power, radio params
  • PUT /api/radio/private-key - Import private key to radio (write-only)
  • POST /api/radio/advertise?flood=true - Send advertisement
  • POST /api/radio/reboot - Reboot radio or reconnect if disconnected
  • POST /api/radio/reconnect - Manual reconnection attempt

Contacts

  • GET /api/contacts - List from database
  • GET /api/contacts/{key} - Get by public key or prefix
  • POST /api/contacts - Create contact (optionally trigger historical DM decryption)
  • POST /api/contacts/sync - Pull from radio to database
  • POST /api/contacts/{key}/add-to-radio - Push to radio
  • POST /api/contacts/{key}/remove-from-radio - Remove from radio
  • POST /api/contacts/{key}/mark-read - Mark conversation as read (updates last_read_at)
  • POST /api/contacts/{key}/telemetry - Request telemetry from repeater (see below)

Channels

  • GET /api/channels - List from database
  • GET /api/channels/{key} - Get by channel key
  • POST /api/channels - Create (hashtag if name starts with # or no key provided)
  • POST /api/channels/sync - Pull from radio
  • POST /api/channels/{key}/mark-read - Mark channel as read (updates last_read_at)
  • DELETE /api/channels/{key} - Delete channel

Read State

  • GET /api/read-state/unreads?name=X - Server-computed unread counts, mention flags, and last message times
  • POST /api/read-state/mark-all-read - Mark all contacts and channels as read

Messages

  • GET /api/messages?type=&conversation_key=&limit=&offset= - List with filters
  • POST /api/messages/direct - Send direct message
  • POST /api/messages/channel - Send channel message

Packets

  • GET /api/packets/undecrypted/count - Count of undecrypted packets
  • POST /api/packets/decrypt/historical - Try decrypting old packets with new key

Settings

  • GET /api/settings - Get all app settings
  • PATCH /api/settings - Update settings (max_radio_contacts, auto_decrypt_dm_on_advert, sidebar_sort_order)
  • POST /api/settings/favorites - Add a favorite
  • DELETE /api/settings/favorites - Remove a favorite
  • POST /api/settings/favorites/toggle - Toggle favorite status
  • POST /api/settings/migrate - One-time migration from frontend localStorage

WebSocket

  • WS /api/ws - Real-time updates (health, contacts, channels, messages, raw packets)

Static Files (Production)

In production, the backend also serves the frontend:

  • / - Serves frontend/dist/index.html
  • /assets/* - Serves compiled JS/CSS from frontend/dist/assets/
  • /* - Falls back to index.html for SPA routing

Testing

Run tests with:

PYTHONPATH=. uv run pytest tests/ -v

Key test files:

  • tests/test_decoder.py - Channel + direct message decryption, key exchange, real-world test vectors
  • tests/test_keystore.py - Ephemeral key store operations
  • tests/test_event_handlers.py - ACK tracking, repeat detection, CLI response filtering
  • tests/test_api.py - API endpoint tests, read state tracking
  • tests/test_migrations.py - Migration system, schema versioning

Common Tasks

Adding a New Endpoint

  1. Create or update router in app/routers/
  2. Define Pydantic models in app/models.py if needed
  3. Add repository methods in app/repository.py for database operations
  4. Register router in app/main.py if new file
  5. Add tests in tests/

Adding a New Event Handler

  1. Define handler in app/event_handlers.py
  2. Register in register_event_handlers() function
  3. Broadcast updates via ws_manager as needed

Working with Radio Commands

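from app.radio import radio_manager

mc = radio_manager.meshcore  # Check that this is set before use (see Radio Connection)

# Available via radio_manager.meshcore.commands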
await mc.commands.send_msg(dst, msg)
await mc.commands.send_chan_msg(chan, msg)
await mc.commands.get_contacts()
await mc.commands.add_contact(contact_dict)
await mc.commands.set_channel(idx, name, key)
await mc.commands.send_advert(flood=True)

Repeater Telemetry

The POST /api/contacts/{key}/telemetry endpoint fetches status, neighbors, and ACL from repeaters (contact type=2).

Request Flow

  1. Verify contact exists and is a repeater (type=2)
  2. Add contact to radio with stored path data (from advertisements)
  3. Send login with password
  4. Request status with retries (3 attempts, 10s timeout)
  5. Fetch neighbors with fetch_all_neighbours() (handles pagination)
  6. Fetch ACL with req_acl_sync()
  7. Resolve pubkey prefixes to contact names from database
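
Step 4 (status request with 3 attempts and a 10s timeout) follows a common retry pattern. A minimal sketch with asyncio.wait_for is shown below; request_with_retries and its request callable are hypothetical stand-ins for the actual radio call.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def request_with_retries(
    request: Callable[[], Awaitable[T]],
    attempts: int = 3,
    timeout: float = 10.0,
) -> Optional[T]:
    """Call request() up to `attempts` times; return None if every try times out."""
    for _ in range(attempts):
        try:
            return await asyncio.wait_for(request(), timeout=timeout)
        except asyncio.TimeoutError:
            continue  # Timed out: try again with a fresh request
    return None
```

A caller would typically translate a None result into an HTTP error response for the telemetry endpoint.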

ACL Permission Levels

ACL_PERMISSION_NAMES = {
    0: "Guest",
    1: "Read-only",
    2: "Read-write",
    3: "Admin",
}

Response Models

class NeighborInfo(BaseModel):
    pubkey_prefix: str      # 4-12 char prefix
    name: str | None        # Resolved contact name
    snr: float              # Signal-to-noise ratio in dB
    last_heard_seconds: int # Seconds since last heard

class AclEntry(BaseModel):
    pubkey_prefix: str      # 12 char prefix
    name: str | None        # Resolved contact name
    permission: int         # 0-3
    permission_name: str    # Human-readable name

class TelemetryResponse(BaseModel):
    # Status fields
    pubkey_prefix: str
    battery_volts: float    # Converted from mV
    uptime_seconds: int
    # ... signal quality, packet counts, etc.

    # Related data
    neighbors: list[NeighborInfo]
    acl: list[AclEntry]

Repeater CLI Commands

After logging in via the telemetry endpoint, you can send CLI commands to repeaters:

Endpoint

POST /api/contacts/{key}/command - Send a CLI command (assumes already logged in)

Request/Response

class CommandRequest(BaseModel):
    command: str  # CLI command to send

class CommandResponse(BaseModel):
    command: str           # Echo of sent command
    response: str          # Response from repeater
    sender_timestamp: int | None  # Timestamp from response

Common Commands

get name / set name <value>     # Repeater name
get tx / set tx <dbm>           # TX power
get radio / set radio <freq,bw,sf,cr>  # Radio params
tempradio <freq,bw,sf,cr,mins>  # Temporary radio change
setperm <pubkey> <0-3>          # ACL: 0=guest, 1=ro, 2=rw, 3=admin
clock / clock sync              # Get/sync time
ver                             # Firmware version
reboot                          # Restart repeater

CLI Response Filtering

CLI responses have txt_type=1 (vs txt_type=0 for normal messages). Both DM handling paths filter these out so they are never stored; the command endpoint returns the response directly to the caller instead.

Packet processor path (primary):

# In create_dm_message_from_decrypted()
txt_type = decrypted.flags & 0x0F
if txt_type == 1:
    return None  # Skip CLI responses

Event handler path (fallback):

# In on_contact_message()
txt_type = payload.get("txt_type", 0)
if txt_type == 1:
    return  # Skip CLI responses

Helper Function

prepare_repeater_connection() handles the login dance:

  1. Add contact to radio with stored path from DB (out_path, out_path_len)
  2. Send login with password
  3. Wait for key exchange to complete

Contact Path Tracking

When advertisements are received, path data is extracted and stored:

  • last_path: Hex string of routing path bytes
  • last_path_len: Number of hops (-1=flood/unknown, 0=direct, >0=hops through repeaters)

Shortest path selection: When receiving echoed advertisements within 60 seconds, the shortest path is kept. This ensures we use the most efficient route when multiple paths exist.
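
The selection rule can be sketched as a pure function over (path, length, time) tuples. The choose_path name and tuple layout are hypothetical, and treating -1 (flood/unknown) as worse than any known path is an assumption not stated above.

```python
from typing import Optional, Tuple

ECHO_WINDOW_SECONDS = 60

# (last_path hex, last_path_len, received_at Unix timestamp)
Path = Tuple[str, int, int]


def choose_path(current: Optional[Path], new: Path) -> Path:
    """Return the path to store after receiving a new advertisement."""
    if current is None:
        return new
    _, cur_len, cur_time = current
    _, new_len, new_time = new
    if new_time - cur_time > ECHO_WINDOW_SECONDS:
        return new  # Outside the echo window: newer data replaces the old path
    # Inside the window: keep the shorter path.
    # Assumption: -1 (flood/unknown) never beats a known path.
    if new_len < 0:
        return current
    if cur_len < 0 or new_len < cur_len:
        return new
    return current
```

For example, a direct path (length 0) heard first is kept when a 2-hop echo of the same advertisement arrives ten seconds later.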