Files
2026-02-12 23:52:49 -08:00

351 lines
12 KiB
Python

"""
Bot execution module for automatic message responses.
This module provides functionality for executing user-defined Python code
in response to incoming messages. The user's code can process message data
and optionally return a response string or a list of strings.
SECURITY WARNING: This executes arbitrary Python code provided by the user.
It should only be enabled on trusted systems where the user understands
the security implications.
"""
import asyncio
import inspect
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any
from fastapi import HTTPException
logger = logging.getLogger(__name__)
# Limit concurrent bot executions to prevent resource exhaustion
_bot_semaphore = asyncio.Semaphore(100)
# Dedicated thread pool for bot execution (separate from default executor)
_bot_executor = ThreadPoolExecutor(max_workers=100, thread_name_prefix="bot_")
# Timeout for bot code execution (seconds)
BOT_EXECUTION_TIMEOUT = 10
# Minimum spacing between bot message sends (seconds)
# This ensures repeaters have time to return to listening mode
BOT_MESSAGE_SPACING = 2.0
# Global state for rate limiting bot sends
_bot_send_lock = asyncio.Lock()
_last_bot_send_time: float = 0.0
def execute_bot_code(
code: str,
sender_name: str | None,
sender_key: str | None,
message_text: str,
is_dm: bool,
channel_key: str | None,
channel_name: str | None,
sender_timestamp: int | None,
path: str | None,
is_outgoing: bool = False,
) -> str | list[str] | None:
"""
Execute user-provided bot code with message context.
The code should define a function:
`bot(sender_name, sender_key, message_text, is_dm, channel_key, channel_name, sender_timestamp, path, is_outgoing)`
that returns either None (no response), a string (single response message),
or a list of strings (multiple messages sent in order).
Legacy bot functions with 8 parameters (without is_outgoing) are detected
via inspect and called without the new parameter for backward compatibility.
Args:
code: Python code defining the bot function
sender_name: Display name of the sender (may be None)
sender_key: 64-char hex public key of sender for DMs, None for channel messages
message_text: The message content
is_dm: True for direct messages, False for channel messages
channel_key: 32-char hex channel key for channel messages, None for DMs
channel_name: Channel name (e.g. "#general" with hash), None for DMs
sender_timestamp: Sender's timestamp from the message (may be None)
path: Hex-encoded routing path (may be None)
is_outgoing: True if this is our own outgoing message
Returns:
Response string, list of strings, or None.
Note: This executes arbitrary code. Only use with trusted input.
"""
if not code or not code.strip():
return None
# Build execution namespace with allowed imports
namespace: dict[str, Any] = {
"__builtins__": __builtins__,
}
try:
# Execute the user's code to define the bot function
exec(code, namespace)
except Exception as e:
logger.warning("Bot code compilation failed: %s", e)
return None
# Check if bot function was defined
if "bot" not in namespace or not callable(namespace["bot"]):
logger.debug("Bot code does not define a callable 'bot' function")
return None
bot_func = namespace["bot"]
# Detect whether the bot function accepts is_outgoing (new 9-param signature)
# or uses the legacy 8-param signature, for backward compatibility.
# Three cases: explicit is_outgoing param or 9+ params (positional),
# **kwargs (pass as keyword), or legacy 8-param (omit).
call_style = "legacy" # "positional", "keyword", or "legacy"
try:
sig = inspect.signature(bot_func)
params = sig.parameters
non_variadic = [
p
for p in params.values()
if p.kind not in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
]
if "is_outgoing" in params or len(non_variadic) >= 9:
call_style = "positional"
elif any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values()):
call_style = "keyword"
except (ValueError, TypeError):
pass
try:
# Call the bot function with appropriate signature
if call_style == "positional":
result = bot_func(
sender_name,
sender_key,
message_text,
is_dm,
channel_key,
channel_name,
sender_timestamp,
path,
is_outgoing,
)
elif call_style == "keyword":
result = bot_func(
sender_name,
sender_key,
message_text,
is_dm,
channel_key,
channel_name,
sender_timestamp,
path,
is_outgoing=is_outgoing,
)
else:
result = bot_func(
sender_name,
sender_key,
message_text,
is_dm,
channel_key,
channel_name,
sender_timestamp,
path,
)
# Validate result
if result is None:
return None
if isinstance(result, str):
return result if result.strip() else None
if isinstance(result, list):
# Filter to non-empty strings only
valid_messages = [msg for msg in result if isinstance(msg, str) and msg.strip()]
return valid_messages if valid_messages else None
logger.debug("Bot function returned unsupported type: %s", type(result))
return None
except Exception as e:
logger.warning("Bot function execution failed: %s", e)
return None
async def process_bot_response(
response: str | list[str],
is_dm: bool,
sender_key: str,
channel_key: str | None,
) -> None:
"""
Send the bot's response message(s) using the existing message sending endpoints.
For DMs, sends a direct message back to the sender.
For channel messages, sends to the same channel.
Bot messages are rate-limited to ensure at least BOT_MESSAGE_SPACING seconds
between sends, giving repeaters time to return to listening mode.
Args:
response: The response text to send, or a list of messages to send in order
is_dm: Whether the original message was a DM
sender_key: Public key of the original sender (for DM replies)
channel_key: Channel key for channel message replies
"""
# Normalize to list for uniform processing
messages = [response] if isinstance(response, str) else response
for message_text in messages:
await _send_single_bot_message(message_text, is_dm, sender_key, channel_key)
async def _send_single_bot_message(
message_text: str,
is_dm: bool,
sender_key: str,
channel_key: str | None,
) -> None:
"""
Send a single bot message with rate limiting.
Args:
message_text: The message text to send
is_dm: Whether the original message was a DM
sender_key: Public key of the original sender (for DM replies)
channel_key: Channel key for channel message replies
"""
global _last_bot_send_time
from app.models import SendChannelMessageRequest, SendDirectMessageRequest
from app.routers.messages import send_channel_message, send_direct_message
# Serialize bot sends and enforce minimum spacing
async with _bot_send_lock:
# Calculate how long since last bot send
now = time.monotonic()
time_since_last = now - _last_bot_send_time
if _last_bot_send_time > 0 and time_since_last < BOT_MESSAGE_SPACING:
wait_time = BOT_MESSAGE_SPACING - time_since_last
logger.debug("Rate limiting bot send, waiting %.2fs", wait_time)
await asyncio.sleep(wait_time)
try:
if is_dm:
logger.info("Bot sending DM reply to %s", sender_key[:12])
request = SendDirectMessageRequest(destination=sender_key, text=message_text)
await send_direct_message(request)
elif channel_key:
logger.info("Bot sending channel reply to %s", channel_key[:8])
request = SendChannelMessageRequest(channel_key=channel_key, text=message_text)
await send_channel_message(request)
else:
logger.warning("Cannot send bot response: no destination")
return # Don't update timestamp if we didn't send
except HTTPException as e:
logger.error("Bot failed to send response: %s", e.detail)
return # Don't update timestamp on failure
except Exception as e:
logger.error("Bot failed to send response: %s", e)
return # Don't update timestamp on failure
# Update last send time after successful send
_last_bot_send_time = time.monotonic()
async def run_bot_for_message(
sender_name: str | None,
sender_key: str | None,
message_text: str,
is_dm: bool,
channel_key: str | None,
channel_name: str | None = None,
sender_timestamp: int | None = None,
path: str | None = None,
is_outgoing: bool = False,
) -> None:
"""
Run all enabled bots for a message (incoming or outgoing).
This is the main entry point called by message handlers after
a message is successfully decrypted and stored. Bots run serially,
and errors in one bot don't prevent others from running.
Args:
sender_name: Display name of the sender
sender_key: 64-char hex public key of sender (DMs only, None for channels)
message_text: The message content
is_dm: True for direct messages, False for channel messages
channel_key: Channel key for channel messages
channel_name: Channel name (e.g. "#general"), None for DMs
sender_timestamp: Sender's timestamp from the message
path: Hex-encoded routing path
is_outgoing: Whether this is our own outgoing message
"""
# Early check if any bots are enabled (will re-check after sleep)
from app.repository import AppSettingsRepository
settings = await AppSettingsRepository.get()
enabled_bots = [b for b in settings.bots if b.enabled and b.code.strip()]
if not enabled_bots:
return
async with _bot_semaphore:
logger.debug(
"Running %d bot(s) for message from %s (is_dm=%s)",
len(enabled_bots),
sender_name or (sender_key[:12] if sender_key else "unknown"),
is_dm,
)
# Wait for the initiating message's retransmissions to propagate through the mesh
await asyncio.sleep(2)
# Re-check settings after sleep (user may have changed bot config)
settings = await AppSettingsRepository.get()
enabled_bots = [b for b in settings.bots if b.enabled and b.code.strip()]
if not enabled_bots:
logger.debug("All bots disabled during wait, skipping")
return
# Run each enabled bot serially
loop = asyncio.get_event_loop()
for bot in enabled_bots:
logger.debug("Executing bot '%s'", bot.name)
try:
response = await asyncio.wait_for(
loop.run_in_executor(
_bot_executor,
execute_bot_code,
bot.code,
sender_name,
sender_key,
message_text,
is_dm,
channel_key,
channel_name,
sender_timestamp,
path,
is_outgoing,
),
timeout=BOT_EXECUTION_TIMEOUT,
)
except asyncio.TimeoutError:
logger.warning(
"Bot '%s' execution timed out after %ds", bot.name, BOT_EXECUTION_TIMEOUT
)
continue # Continue to next bot
except Exception as e:
logger.warning("Bot '%s' execution error: %s", bot.name, e)
continue # Continue to next bot
# Send response if any
if response:
await process_bot_response(response, is_dm, sender_key or "", channel_key)