Flesh out the fanout agents a bit more

This commit is contained in:
Jack Kingsman
2026-03-05 22:29:45 -08:00
parent cb4333df4f
commit 5808504ee0

View File

@@ -5,11 +5,12 @@ The fanout bus is a unified system for dispatching mesh radio events (decoded me
## Core Concepts
### FanoutModule (base.py)
Abstract base class that all integration modules implement:
- `start()` / `stop()` — lifecycle management
- `on_message(data)` — receive decoded messages
- `on_raw(data)` — receive raw packets
- `status` property — "connected" | "disconnected"
Base class that all integration modules extend:
- `__init__(config_id, config, *, name="")` — constructor; receives the config UUID, the type-specific config dict, and the user-assigned name
- `start()` / `stop()` — async lifecycle (e.g. open/close connections)
- `on_message(data)` — receive decoded messages (DM/channel)
- `on_raw(data)` — receive raw RF packets
- `status` property (**must override**) — return `"connected"`, `"disconnected"`, or `"error"`
### FanoutManager (manager.py)
Singleton that owns all active modules and dispatches events:
@@ -21,6 +22,8 @@ Singleton that owns all active modules and dispatches events:
- `stop_all()` — shutdown
- `get_statuses()` — health endpoint data
All modules are constructed uniformly: `cls(config_id, config_blob, name=cfg.get("name", ""))`.
### Scope Matching
Each config has a `scope` JSON blob controlling what events reach it:
```json
@@ -33,12 +36,12 @@ Community MQTT always enforces `{"messages": "none", "raw_packets": "all"}`.
## Event Flow
```
Radio Event packet_processor / event_handler
broadcast_event("message"|"raw_packet", data, realtime=True)
WebSocket broadcast (always)
FanoutManager.broadcast_message/raw (only if realtime=True)
scope check per module
module.on_message / on_raw
Radio Event -> packet_processor / event_handler
-> broadcast_event("message"|"raw_packet", data, realtime=True)
-> WebSocket broadcast (always)
-> FanoutManager.broadcast_message/raw (only if realtime=True)
-> scope check per module
-> module.on_message / on_raw
```
Setting `realtime=False` (used during historical decryption) skips fanout dispatch entirely.
@@ -62,21 +65,180 @@ Wraps bot code execution via `app/fanout/bot_exec.py`. Config blob:
- Rate-limits outgoing messages for repeater compatibility
### webhook (webhook.py)
HTTP POST webhook delivery. Config blob:
- `url`, `secret` (optional HMAC signing key)
- Delivers messages and raw packets as JSON payloads
HTTP webhook delivery. Config blob:
- `url`, `method` (POST/PUT/PATCH)
- `hmac_secret` (optional) — when set, each request includes an HMAC-SHA256 signature of the JSON body
- `hmac_header` (optional, default `X-Webhook-Signature`) — header name for the signature (value format: `sha256=<hex>`)
- `headers` — arbitrary extra headers (JSON object)
### apprise (apprise_mod.py)
Push notifications via Apprise library. Config blob:
- `urls`list of Apprise notification service URLs
- Formats messages for human-readable notification delivery
- `urls`newline-separated Apprise notification service URLs
- `preserve_identity` — suppress Discord webhook name/avatar override
- `include_path` — include routing path in notification body
## Adding a New Integration Type
1. Create `app/fanout/my_type.py` with a class extending `FanoutModule`
2. Register it in `manager.py``_register_module_types()`
3. Add validation in `app/routers/fanout.py``_VALID_TYPES` and validator function
4. Add frontend editor component in `SettingsFanoutSection.tsx`
### Step-by-step checklist
#### 1. Backend module (`app/fanout/my_type.py`)
Create a class extending `FanoutModule`:
```python
from app.fanout.base import FanoutModule
class MyTypeModule(FanoutModule):
def __init__(self, config_id: str, config: dict, *, name: str = "") -> None:
super().__init__(config_id, config, name=name)
# Initialize module-specific state
async def start(self) -> None:
"""Open connections, create clients, etc."""
async def stop(self) -> None:
"""Close connections, clean up resources."""
async def on_message(self, data: dict) -> None:
"""Handle decoded messages. Omit if not needed."""
async def on_raw(self, data: dict) -> None:
"""Handle raw packets. Omit if not needed."""
@property
def status(self) -> str:
"""Required. Return 'connected', 'disconnected', or 'error'."""
...
```
Constructor requirements:
- Must accept `config_id: str, config: dict, *, name: str = ""`
- Must forward `name` to super: `super().__init__(config_id, config, name=name)`
#### 2. Register in manager (`app/fanout/manager.py`)
Add import and mapping in `_register_module_types()`:
```python
from app.fanout.my_type import MyTypeModule
_MODULE_TYPES["my_type"] = MyTypeModule
```
#### 3. Router changes (`app/routers/fanout.py`)
Three changes needed:
**a)** Add to `_VALID_TYPES` set:
```python
_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise", "my_type"}
```
**b)** Add a validation function:
```python
def _validate_my_type_config(config: dict) -> None:
"""Validate my_type config blob."""
if not config.get("some_required_field"):
raise HTTPException(status_code=400, detail="some_required_field is required")
```
**c)** Wire validation into both `create_fanout_config` and `update_fanout_config` — add an `elif` to the validation block in each:
```python
elif body.type == "my_type":
_validate_my_type_config(body.config)
```
Note: validation only runs when the config will be enabled (disabled configs are treated as drafts).
**d)** Add scope enforcement in `_enforce_scope()` if the type has fixed scope constraints (e.g. raw_packets always none). Otherwise it falls through to the `mqtt_private` default which allows both messages and raw_packets to be configurable.
#### 4. Frontend editor component (`SettingsFanoutSection.tsx`)
Four changes needed in this single file:
**a)** Add to `TYPE_LABELS` and `TYPE_OPTIONS` at the top:
```tsx
const TYPE_LABELS: Record<string, string> = {
// ... existing entries ...
my_type: 'My Type',
};
const TYPE_OPTIONS = [
// ... existing entries ...
{ value: 'my_type', label: 'My Type' },
];
```
**b)** Create an editor component (follows the same pattern as existing editors):
```tsx
function MyTypeConfigEditor({
config,
scope,
onChange,
onScopeChange,
}: {
config: Record<string, unknown>;
scope: Record<string, unknown>;
onChange: (config: Record<string, unknown>) => void;
onScopeChange: (scope: Record<string, unknown>) => void;
}) {
return (
<div className="space-y-3">
{/* Type-specific config fields */}
<Separator />
<ScopeSelector scope={scope} onChange={onScopeChange} />
</div>
);
}
```
If your type does NOT have user-configurable scope (like bot or community MQTT), omit the `scope`/`onScopeChange` props and the `ScopeSelector`. The `ScopeSelector` component is defined within the same file — it provides all/none/only/except radio buttons with channel and contact checklists.
**c)** Add default config and scope in `handleAddCreate`:
```tsx
const defaults: Record<string, Record<string, unknown>> = {
// ... existing entries ...
my_type: { some_field: '', other_field: true },
};
const defaultScopes: Record<string, Record<string, unknown>> = {
// ... existing entries ...
my_type: { messages: 'all', raw_packets: 'none' },
};
```
**d)** Wire the editor into the detail view's conditional render block:
```tsx
{editingConfig.type === 'my_type' && (
<MyTypeConfigEditor
config={editConfig}
scope={editScope}
onChange={setEditConfig}
onScopeChange={setEditScope}
/>
)}
```
#### 5. Tests
**Backend integration tests** (`tests/test_fanout_integration.py`):
- Test that a configured + enabled module receives messages via `FanoutManager.broadcast_message`
- Test scope filtering (all, none, selective)
- Test that a disabled module does not receive messages
**Backend unit tests** (`tests/test_fanout_hitlist.py` or a dedicated file):
- Test config validation (required fields, bad values)
- Test module-specific logic in isolation
**Frontend tests** (`frontend/src/test/fanoutSection.test.tsx`):
- The existing suite covers the list/edit/create flow generically. If your editor has special behavior, add specific test cases.
#### Summary of files to touch
| File | Change |
|------|--------|
| `app/fanout/my_type.py` | New module class |
| `app/fanout/manager.py` | Import + register in `_register_module_types()` |
| `app/routers/fanout.py` | `_VALID_TYPES` + validator function + scope enforcement |
| `frontend/.../SettingsFanoutSection.tsx` | `TYPE_LABELS` + `TYPE_OPTIONS` + editor component + defaults + detail view wiring |
| `tests/test_fanout_integration.py` | Integration tests |
## REST API
@@ -101,7 +263,7 @@ Migrations:
## Key Files
- `app/fanout/base.py` — FanoutModule ABC
- `app/fanout/base.py` — FanoutModule base class
- `app/fanout/manager.py` — FanoutManager singleton
- `app/fanout/mqtt_base.py` — BaseMqttPublisher ABC (shared MQTT connection loop)
- `app/fanout/mqtt.py` — MqttPublisher (private MQTT publishing)