Files

180 lines
6.2 KiB
Python

"""Fanout module wrapping bot execution logic."""
from __future__ import annotations
import asyncio
import logging
from app.fanout.base import FanoutModule
logger = logging.getLogger(__name__)
def _derive_path_bytes_per_hop(paths: object, path_value: str | None) -> int | None:
"""Derive hop width from the first serialized message path when possible."""
if not isinstance(path_value, str) or not path_value:
return None
if not isinstance(paths, list) or not paths:
return None
first_path = paths[0]
if not isinstance(first_path, dict):
return None
path_hops = first_path.get("path_len")
if not isinstance(path_hops, int) or path_hops <= 0:
return None
path_hex_chars = len(path_value)
if path_hex_chars % 2 != 0:
return None
path_bytes = path_hex_chars // 2
if path_bytes % path_hops != 0:
return None
hop_width = path_bytes // path_hops
if hop_width not in (1, 2, 3):
return None
return hop_width
class BotModule(FanoutModule):
"""Wraps a single bot's code execution and response routing.
Each BotModule represents one bot configuration. It receives decoded
messages via ``on_message``, executes the bot's Python code in a
background task (after a 2-second settle delay), and sends any response
back through the radio.
"""
def __init__(self, config_id: str, config: dict, *, name: str = "Bot") -> None:
super().__init__(config_id, config, name=name)
self._tasks: set[asyncio.Task] = set()
self._active = True
async def stop(self) -> None:
self._active = False
for task in self._tasks:
task.cancel()
# Wait briefly for tasks to acknowledge cancellation
if self._tasks:
await asyncio.gather(*self._tasks, return_exceptions=True)
self._tasks.clear()
async def on_message(self, data: dict) -> None:
"""Kick off bot execution in a background task so we don't block dispatch."""
task = asyncio.create_task(self._run_for_message(data))
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)
async def _run_for_message(self, data: dict) -> None:
from app.fanout.bot_exec import (
BOT_EXECUTION_TIMEOUT,
execute_bot_code,
process_bot_response,
)
code = self.config.get("code", "")
if not code or not code.strip():
return
msg_type = data.get("type", "")
is_dm = msg_type == "PRIV"
conversation_key = data.get("conversation_key", "")
logger.debug(
"Bot '%s' starting for type=%s conversation=%s outgoing=%s",
self.name,
msg_type or "unknown",
conversation_key[:12] if conversation_key else "(none)",
bool(data.get("outgoing", False)),
)
# Extract bot parameters from broadcast data
if is_dm:
sender_key = data.get("sender_key") or conversation_key
is_outgoing = data.get("outgoing", False)
message_text = data.get("text", "")
channel_key = None
channel_name = None
# Outgoing DMs: sender is us, not the contact
if is_outgoing:
sender_name = None
else:
sender_name = data.get("sender_name")
if sender_name is None:
from app.repository import ContactRepository
contact = await ContactRepository.get_by_key(conversation_key)
sender_name = contact.name if contact else None
else:
sender_key = None
is_outgoing = bool(data.get("outgoing", False))
sender_name = data.get("sender_name")
channel_key = conversation_key
channel_name = data.get("channel_name")
if channel_name is None:
from app.repository import ChannelRepository
channel = await ChannelRepository.get_by_key(conversation_key)
channel_name = channel.name if channel else None
# Strip "sender: " prefix from channel message text
text = data.get("text", "")
if sender_name and text.startswith(f"{sender_name}: "):
message_text = text[len(f"{sender_name}: ") :]
else:
message_text = text
sender_timestamp = data.get("sender_timestamp")
path_value = data.get("path")
paths = data.get("paths")
# Message model serializes paths as list of dicts; extract first path string
if path_value is None and paths and isinstance(paths, list) and len(paths) > 0:
path_value = paths[0].get("path") if isinstance(paths[0], dict) else None
path_bytes_per_hop = _derive_path_bytes_per_hop(paths, path_value)
# Wait for message to settle (allows retransmissions to be deduped)
await asyncio.sleep(2)
# Execute bot code in thread pool with timeout
from app.fanout.bot_exec import _bot_executor, _bot_semaphore
async with _bot_semaphore:
loop = asyncio.get_running_loop()
try:
response = await asyncio.wait_for(
loop.run_in_executor(
_bot_executor,
execute_bot_code,
code,
sender_name,
sender_key,
message_text,
is_dm,
channel_key,
channel_name,
sender_timestamp,
path_value,
is_outgoing,
path_bytes_per_hop,
),
timeout=BOT_EXECUTION_TIMEOUT,
)
except asyncio.TimeoutError:
logger.warning("Bot '%s' execution timed out", self.name)
return
except Exception:
logger.exception("Bot '%s' execution error", self.name)
return
if response and self._active:
await process_bot_response(response, is_dm, sender_key or "", channel_key)
@property
def status(self) -> str:
return "connected"