mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
180 lines
6.2 KiB
Python
180 lines
6.2 KiB
Python
"""Fanout module wrapping bot execution logic."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
|
|
from app.fanout.base import FanoutModule
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _derive_path_bytes_per_hop(paths: object, path_value: str | None) -> int | None:
|
|
"""Derive hop width from the first serialized message path when possible."""
|
|
if not isinstance(path_value, str) or not path_value:
|
|
return None
|
|
if not isinstance(paths, list) or not paths:
|
|
return None
|
|
|
|
first_path = paths[0]
|
|
if not isinstance(first_path, dict):
|
|
return None
|
|
|
|
path_hops = first_path.get("path_len")
|
|
if not isinstance(path_hops, int) or path_hops <= 0:
|
|
return None
|
|
|
|
path_hex_chars = len(path_value)
|
|
if path_hex_chars % 2 != 0:
|
|
return None
|
|
|
|
path_bytes = path_hex_chars // 2
|
|
if path_bytes % path_hops != 0:
|
|
return None
|
|
|
|
hop_width = path_bytes // path_hops
|
|
if hop_width not in (1, 2, 3):
|
|
return None
|
|
|
|
return hop_width
|
|
|
|
|
|
class BotModule(FanoutModule):
    """Wraps a single bot's code execution and response routing.

    Each BotModule represents one bot configuration. It receives decoded
    messages via ``on_message``, executes the bot's Python code in a
    background task (after a 2-second settle delay), and sends any response
    back through the radio.
    """

    def __init__(self, config_id: str, config: dict, *, name: str = "Bot") -> None:
        """Initialise the module with no in-flight tasks and mark it active."""
        super().__init__(config_id, config, name=name)
        # In-flight background tasks; kept here so stop() can cancel them.
        self._tasks: set[asyncio.Task] = set()
        # Cleared by stop(); responses are only sent while this is True.
        self._active = True

    async def stop(self) -> None:
        """Deactivate the module and cancel every in-flight bot task."""
        self._active = False
        # Work on a snapshot: done-callbacks remove tasks from the live set
        # while we are awaiting below.
        pending = list(self._tasks)
        for task in pending:
            task.cancel()
        # Wait briefly for tasks to acknowledge cancellation
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
        self._tasks.clear()

    async def on_message(self, data: dict) -> None:
        """Kick off bot execution in a background task so we don't block dispatch."""
        task = asyncio.create_task(self._run_for_message(data))
        self._tasks.add(task)
        task.add_done_callback(self._on_task_done)

    def _on_task_done(self, task: asyncio.Task) -> None:
        """Forget a finished task and log any exception it raised.

        Without retrieving the exception here, failures outside the guarded
        execution section (repository lookups, response delivery) would go
        unreported.
        """
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error(
                "Bot '%s' background task failed", self.name, exc_info=exc
            )

    async def _run_for_message(self, data: dict) -> None:
        """Run the configured bot code for one message and route its response.

        Args:
            data: Message payload. Keys read here: ``type``,
                ``conversation_key``, ``outgoing``, ``sender_key``,
                ``sender_name``, ``channel_name``, ``text``,
                ``sender_timestamp``, ``path`` and ``paths``.
        """
        # Imported lazily, as in the original module layout.
        from app.fanout.bot_exec import (
            BOT_EXECUTION_TIMEOUT,
            execute_bot_code,
            process_bot_response,
        )

        code = self.config.get("code", "")
        if not code or not code.strip():
            return

        msg_type = data.get("type", "")
        is_dm = msg_type == "PRIV"
        conversation_key = data.get("conversation_key", "")
        is_outgoing = bool(data.get("outgoing", False))
        # A present-but-null "text" must not reach str methods below.
        text = data.get("text") or ""
        logger.debug(
            "Bot '%s' starting for type=%s conversation=%s outgoing=%s",
            self.name,
            msg_type or "unknown",
            conversation_key[:12] if conversation_key else "(none)",
            is_outgoing,
        )

        # Extract bot parameters from broadcast data
        if is_dm:
            sender_key = data.get("sender_key") or conversation_key
            message_text = text
            channel_key = None
            channel_name = None

            # Outgoing DMs: sender is us, not the contact
            if is_outgoing:
                sender_name = None
            else:
                sender_name = data.get("sender_name")
                if sender_name is None:
                    from app.repository import ContactRepository

                    contact = await ContactRepository.get_by_key(conversation_key)
                    sender_name = contact.name if contact else None
        else:
            sender_key = None
            sender_name = data.get("sender_name")
            channel_key = conversation_key

            channel_name = data.get("channel_name")
            if channel_name is None:
                from app.repository import ChannelRepository

                channel = await ChannelRepository.get_by_key(conversation_key)
                channel_name = channel.name if channel else None

            # Strip "sender: " prefix from channel message text
            prefix = f"{sender_name}: " if sender_name else ""
            if prefix and text.startswith(prefix):
                message_text = text[len(prefix) :]
            else:
                message_text = text

        sender_timestamp = data.get("sender_timestamp")
        path_value = data.get("path")
        paths = data.get("paths")
        # Message model serializes paths as list of dicts; extract first path string
        if path_value is None and isinstance(paths, list) and paths:
            first_path = paths[0]
            path_value = first_path.get("path") if isinstance(first_path, dict) else None
        path_bytes_per_hop = _derive_path_bytes_per_hop(paths, path_value)

        # Wait for message to settle (allows retransmissions to be deduped)
        await asyncio.sleep(2)

        # Execute bot code in thread pool with timeout
        from app.fanout.bot_exec import _bot_executor, _bot_semaphore

        async with _bot_semaphore:
            loop = asyncio.get_running_loop()
            try:
                response = await asyncio.wait_for(
                    loop.run_in_executor(
                        _bot_executor,
                        execute_bot_code,
                        code,
                        sender_name,
                        sender_key,
                        message_text,
                        is_dm,
                        channel_key,
                        channel_name,
                        sender_timestamp,
                        path_value,
                        is_outgoing,
                        path_bytes_per_hop,
                    ),
                    timeout=BOT_EXECUTION_TIMEOUT,
                )
            except asyncio.TimeoutError:
                logger.warning("Bot '%s' execution timed out", self.name)
                return
            except Exception:
                logger.exception("Bot '%s' execution error", self.name)
                return

        # Skip delivery if the module was stopped while the bot was running.
        if response and self._active:
            await process_bot_response(response, is_dm, sender_key or "", channel_key)

    @property
    def status(self) -> str:
        """Return the module status; this implementation always reports "connected"."""
        return "connected"
|