Implement room server functionality: add database schema, message handling, and client synchronization

This commit is contained in:
Lloyd
2025-12-18 10:44:00 +00:00
parent 02d3cbc396
commit 710ee5b666
5 changed files with 907 additions and 4 deletions
+204 -1
View File
@@ -101,6 +101,41 @@ class SQLiteHandler:
conn.execute("CREATE INDEX IF NOT EXISTS idx_transport_keys_name ON transport_keys(name)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_transport_keys_parent ON transport_keys(parent_id)")
# Room server tables
conn.execute("""
CREATE TABLE IF NOT EXISTS room_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
room_hash TEXT NOT NULL,
author_pubkey TEXT NOT NULL,
post_timestamp REAL NOT NULL,
sender_timestamp REAL,
message_text TEXT NOT NULL,
txt_type INTEGER NOT NULL,
created_at REAL NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS room_client_sync (
id INTEGER PRIMARY KEY AUTOINCREMENT,
room_hash TEXT NOT NULL,
client_pubkey TEXT NOT NULL,
sync_since REAL NOT NULL DEFAULT 0,
pending_ack_crc INTEGER DEFAULT 0,
push_post_timestamp REAL DEFAULT 0,
ack_timeout_time REAL DEFAULT 0,
push_failures INTEGER DEFAULT 0,
last_activity REAL NOT NULL,
updated_at REAL NOT NULL,
UNIQUE(room_hash, client_pubkey)
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_room_messages_room ON room_messages(room_hash, post_timestamp)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_room_messages_author ON room_messages(author_pubkey)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_room_client_sync_room ON room_client_sync(room_hash, client_pubkey)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_room_client_sync_pending ON room_client_sync(pending_ack_crc)")
conn.commit()
logger.info(f"SQLite database initialized: {self.sqlite_path}")
@@ -868,4 +903,172 @@ class SQLiteHandler:
return cursor.rowcount > 0
except Exception as e:
logger.error(f"Failed to delete advert: {e}")
return False
return False
# ------------------------------------------------------------------
# Room Server Methods
# ------------------------------------------------------------------
def insert_room_message(self, room_hash: str, author_pubkey: str, message_text: str,
                        post_timestamp: float, sender_timestamp: float = None,
                        txt_type: int = 0) -> Optional[int]:
    """Persist one room post and return its new row ID, or None on failure.

    Args:
        room_hash: Room identifier string (e.g. "0x2A").
        author_pubkey: Hex-encoded public key of the author.
        message_text: Message body.
        post_timestamp: Server-side post time (used for sync ordering).
        sender_timestamp: Timestamp reported by the sender, if any.
        txt_type: Text-message type flag.
    """
    sql = """
        INSERT INTO room_messages (
            room_hash, author_pubkey, post_timestamp, sender_timestamp,
            message_text, txt_type, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
    """
    params = (room_hash, author_pubkey, post_timestamp, sender_timestamp,
              message_text, txt_type, time.time())
    try:
        # The connection context manager commits on success, rolls back on error.
        with sqlite3.connect(self.sqlite_path) as conn:
            return conn.execute(sql, params).lastrowid
    except Exception as e:
        logger.error(f"Failed to insert room message: {e}")
        return None
def get_unsynced_messages(self, room_hash: str, client_pubkey: str,
                          sync_since: float, limit: int = 100) -> List[Dict]:
    """Return up to *limit* posts in *room_hash* newer than *sync_since*.

    The client's own posts are excluded; results are ordered oldest first
    so they can be pushed in chronological order. Returns [] on error.
    """
    query = (
        "SELECT * FROM room_messages "
        "WHERE room_hash = ? AND post_timestamp > ? AND author_pubkey != ? "
        "ORDER BY post_timestamp ASC LIMIT ?"
    )
    try:
        with sqlite3.connect(self.sqlite_path) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(query, (room_hash, sync_since, client_pubkey, limit)).fetchall()
        return [dict(r) for r in rows]
    except Exception as e:
        logger.error(f"Failed to get unsynced messages: {e}")
        return []
def get_unsynced_count(self, room_hash: str, client_pubkey: str, sync_since: float) -> int:
    """Count posts newer than *sync_since* not authored by the client (0 on error)."""
    query = (
        "SELECT COUNT(*) FROM room_messages "
        "WHERE room_hash = ? AND post_timestamp > ? AND author_pubkey != ?"
    )
    try:
        with sqlite3.connect(self.sqlite_path) as conn:
            (count,) = conn.execute(query, (room_hash, sync_since, client_pubkey)).fetchone()
            return count
    except Exception as e:
        logger.error(f"Failed to count unsynced messages: {e}")
        return 0
def upsert_client_sync(self, room_hash: str, client_pubkey: str, **kwargs) -> bool:
    """Insert or update a client's per-room sync state.

    Args:
        room_hash: Room identifier string.
        client_pubkey: Hex-encoded client public key.
        **kwargs: Column values to set. Only known room_client_sync columns
            are accepted; unknown keys are rejected because column names are
            interpolated into the SQL text and must never come from
            uncontrolled input (SQL-injection hardening).

    Returns:
        True on success, False on rejection or database error.
    """
    # Whitelist of updatable columns — keys are interpolated into SQL below.
    allowed = {"sync_since", "pending_ack_crc", "push_post_timestamp",
               "ack_timeout_time", "push_failures", "last_activity", "updated_at"}
    unknown = set(kwargs) - allowed
    if unknown:
        logger.error(f"Rejected unknown client sync columns: {sorted(unknown)}")
        return False
    kwargs['updated_at'] = time.time()
    try:
        with sqlite3.connect(self.sqlite_path) as conn:
            # Check if a row already exists for this (room, client) pair
            cursor = conn.execute(
                "SELECT id FROM room_client_sync WHERE room_hash = ? AND client_pubkey = ?",
                (room_hash, client_pubkey),
            )
            if cursor.fetchone():
                # Update only the supplied columns
                set_sql = ", ".join(f"{key} = ?" for key in kwargs)
                conn.execute(
                    f"UPDATE room_client_sync SET {set_sql} "
                    "WHERE room_hash = ? AND client_pubkey = ?",
                    list(kwargs.values()) + [room_hash, client_pubkey],
                )
            else:
                # Insert with defaults for any column not supplied
                kwargs.setdefault('sync_since', 0)
                kwargs.setdefault('pending_ack_crc', 0)
                kwargs.setdefault('push_post_timestamp', 0)
                kwargs.setdefault('ack_timeout_time', 0)
                kwargs.setdefault('push_failures', 0)
                kwargs.setdefault('last_activity', time.time())
                columns = ['room_hash', 'client_pubkey'] + list(kwargs.keys())
                placeholders = ", ".join(["?"] * len(columns))
                conn.execute(
                    f"INSERT INTO room_client_sync ({', '.join(columns)}) "
                    f"VALUES ({placeholders})",
                    [room_hash, client_pubkey] + list(kwargs.values()),
                )
            conn.commit()
        return True
    except Exception as e:
        logger.error(f"Failed to upsert client sync: {e}")
        return False
def get_client_sync(self, room_hash: str, client_pubkey: str) -> Optional[Dict]:
    """Fetch one client's sync row as a dict, or None if absent or on error."""
    try:
        with sqlite3.connect(self.sqlite_path) as conn:
            conn.row_factory = sqlite3.Row
            row = conn.execute(
                "SELECT * FROM room_client_sync "
                "WHERE room_hash = ? AND client_pubkey = ?",
                (room_hash, client_pubkey),
            ).fetchone()
            return dict(row) if row is not None else None
    except Exception as e:
        logger.error(f"Failed to get client sync: {e}")
        return None
def get_all_room_clients(self, room_hash: str) -> List[Dict]:
    """List every client sync row for a room, most recently active first ([] on error)."""
    try:
        with sqlite3.connect(self.sqlite_path) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                "SELECT * FROM room_client_sync "
                "WHERE room_hash = ? "
                "ORDER BY last_activity DESC",
                (room_hash,),
            ).fetchall()
            return [dict(r) for r in rows]
    except Exception as e:
        logger.error(f"Failed to get room clients: {e}")
        return []
def cleanup_old_messages(self, room_hash: str, keep_count: int = 32) -> int:
    """Trim a room down to its newest *keep_count* posts.

    Returns the number of rows deleted (0 when nothing needed trimming or
    on error).
    """
    try:
        with sqlite3.connect(self.sqlite_path) as conn:
            # Cheap pre-check: skip the DELETE entirely when under the limit.
            (total,) = conn.execute(
                "SELECT COUNT(*) FROM room_messages WHERE room_hash = ?",
                (room_hash,),
            ).fetchone()
            if total <= keep_count:
                return 0
            # Delete everything except the keep_count newest posts.
            result = conn.execute(
                "DELETE FROM room_messages "
                "WHERE room_hash = ? AND id NOT IN ("
                " SELECT id FROM room_messages WHERE room_hash = ?"
                " ORDER BY post_timestamp DESC LIMIT ?)",
                (room_hash, room_hash, keep_count),
            )
            return result.rowcount
    except Exception as e:
        logger.error(f"Failed to cleanup old messages: {e}")
        return 0
+2 -1
View File
@@ -76,12 +76,13 @@ class LoginHelper:
logger.info(f"Created ACL for {identity_type} '{name}': hash=0x{hash_byte:02X}")
# Create auth callback that uses this identity's ACL
def auth_callback_with_context(client_identity, shared_secret, password, timestamp):
def auth_callback_with_context(client_identity, shared_secret, password, timestamp, sync_since=None):
return identity_acl.authenticate_client(
client_identity=client_identity,
shared_secret=shared_secret,
password=password,
timestamp=timestamp,
sync_since=sync_since,
target_identity_hash=hash_byte,
target_identity_name=name,
target_identity_config=config
+605
View File
@@ -0,0 +1,605 @@
import asyncio
import logging
import time
from typing import Optional, Dict
from pymc_core.protocol import PacketBuilder, CryptoUtils
from pymc_core.protocol.constants import PAYLOAD_TYPE_TXT_MSG
logger = logging.getLogger("RoomServer")
# Hard limit from C++ simple_room_server
MAX_UNSYNCED_POSTS = 32

# Text message type constants
TXT_TYPE_PLAIN = 0x00
TXT_TYPE_CLI_DATA = 0x01
TXT_TYPE_SIGNED_PLAIN = 0x02

# Push timing constants (from C++ simple_room_server)
PUSH_NOTIFY_DELAY_MS = 2000        # delay before first push after a new post
SYNC_PUSH_INTERVAL_MS = 1200       # base interval between sync-loop iterations
POST_SYNC_DELAY_SECS = 6           # posts younger than this are not pushed yet
PUSH_ACK_TIMEOUT_FLOOD_MS = 12000  # ACK wait for flood-routed pushes
PUSH_TIMEOUT_BASE_MS = 4000        # base ACK wait for direct-routed pushes
PUSH_ACK_TIMEOUT_FACTOR_MS = 2000  # extra ACK wait per path hop

# Safety limits and protections
MAX_MESSAGE_LENGTH = 160  # Match C++ MAX_POST_TEXT_LEN (151 bytes for text)
MAX_POSTS_PER_CLIENT_PER_MINUTE = 10  # Prevent spam
MAX_CLIENTS_PER_ROOM = 50  # From ACL default
MAX_PUSH_FAILURES = 3  # Evict after this many consecutive failures
INACTIVE_CLIENT_TIMEOUT = 3600  # Evict after 1 hour inactivity (seconds)
MAX_CONSECUTIVE_SYNC_ERRORS = 10  # Circuit breaker threshold
DB_ERROR_RETRY_DELAY = 60  # Wait 1 minute on DB error (seconds)

# Backoff strategy for failed pushes (seconds)
RETRY_BACKOFF_SCHEDULE = [0, 30, 300, 3600]  # 0s, 30s, 5min, 1hr

# Global rate limiter (shared across all rooms)
_global_push_limiter = None
# NOTE(review): _global_push_lock appears unused in this module — confirm
# before removing. Also note asyncio.Lock() is created at import time here.
_global_push_lock = asyncio.Lock()
GLOBAL_MIN_GAP_BETWEEN_MESSAGES = 1.1  # 1.1s minimum gap between transmissions
class GlobalRateLimiter:
    """Serializes radio transmissions across all rooms.

    ``acquire()`` takes the shared lock and enforces a minimum quiet gap
    after the previous transmission. The caller transmits while holding the
    lock and MUST call ``release()`` afterwards (ideally in a ``finally``).

    BUGFIX: the previous implementation used ``async with self.lock`` inside
    ``acquire()``, which released the lock before ``acquire()`` returned —
    so, contrary to the stated intent, two callers could transmit
    concurrently. The lock is now held from ``acquire()`` until
    ``release()``.
    """

    def __init__(self, min_gap_seconds: float = 0.1):
        self.min_gap = min_gap_seconds  # minimum gap between consecutive messages
        self.lock = asyncio.Lock()      # held for the duration of one transmission
        self.last_release_time = 0.0    # wall-clock time of the last release()

    async def acquire(self):
        """Take the transmission slot, sleeping to honor the minimum gap."""
        await self.lock.acquire()
        elapsed = time.time() - self.last_release_time
        if elapsed < self.min_gap:
            wait_time = self.min_gap - elapsed
            logger.debug(f"Global rate limiter: waiting {wait_time*1000:.0f}ms")
            await asyncio.sleep(wait_time)
        # Lock stays held until release() — the caller may now transmit.

    def release(self):
        """Free the transmission slot and start the quiet-gap timer."""
        self.last_release_time = time.time()
        # Guard so an extra release() (e.g. from a defensive finally) is a no-op.
        if self.lock.locked():
            self.lock.release()
class RoomServer:
def __init__(
    self,
    room_hash: int,
    room_name: str,
    local_identity,
    sqlite_handler,
    packet_injector,
    acl,
    max_posts: int = 32
):
    """Create a room server.

    Args:
        room_hash: One-byte destination hash identifying this room.
        room_name: Human-readable room name (used in logs).
        local_identity: Identity used for outgoing packets.
        sqlite_handler: DB handler providing the room_* persistence methods.
        packet_injector: Awaitable callable that transmits a packet.
        acl: Access-control list holding this room's clients.
        max_posts: Retention limit; capped at MAX_UNSYNCED_POSTS.
    """
    self.room_hash = room_hash
    self.room_name = room_name
    self.local_identity = local_identity
    self.db = sqlite_handler
    self.packet_injector = packet_injector
    self.acl = acl
    # Enforce hard limit (match C++ MAX_UNSYNCED_POSTS)
    if max_posts > MAX_UNSYNCED_POSTS:
        logger.warning(
            f"Room '{room_name}': max_posts={max_posts} exceeds hard limit "
            f"of {MAX_UNSYNCED_POSTS}, capping to {MAX_UNSYNCED_POSTS}"
        )
        max_posts = MAX_UNSYNCED_POSTS
    self.max_posts = max_posts
    # Round-robin state
    self.next_client_idx = 0
    self.next_push_time = 0
    # Cleanup tracking
    self.last_cleanup_time = time.time()
    self.cleanup_interval = 600  # Cleanup every 10 minutes
    # Safety and monitoring
    self.client_post_times = {}  # Track last N post times per client for rate limiting
    self.consecutive_sync_errors = 0  # Circuit breaker counter
    self.last_eviction_check = time.time()
    self.eviction_check_interval = 300  # Check every 5 minutes
    # Initialize global rate limiter (singleton shared by every room)
    global _global_push_limiter
    if _global_push_limiter is None:
        _global_push_limiter = GlobalRateLimiter(GLOBAL_MIN_GAP_BETWEEN_MESSAGES)
    self.global_limiter = _global_push_limiter
    # Background task handle
    self._sync_task = None
    self._running = False
    logger.info(
        f"RoomServer initialized: name='{room_name}', "
        f"hash=0x{room_hash:02X}, max_posts={max_posts}"
    )
async def start(self):
    """Start the background sync-loop task; no-op if already running."""
    if self._running:
        logger.warning(f"Room '{self.room_name}' sync loop already running")
        return
    self._running = True
    self._sync_task = asyncio.create_task(self._sync_loop())
    logger.info(f"Room '{self.room_name}' sync loop started")
async def stop(self):
    """Cancel the sync loop and wait for the task to finish cancelling."""
    self._running = False
    if self._sync_task:
        self._sync_task.cancel()
        try:
            await self._sync_task
        except asyncio.CancelledError:
            # Expected: cancel() surfaces CancelledError when awaited.
            pass
    logger.info(f"Room '{self.room_name}' sync loop stopped")
async def add_post(
    self,
    client_pubkey: bytes,
    message_text: str,
    sender_timestamp: int,
    txt_type: int = TXT_TYPE_PLAIN
) -> bool:
    """Validate, rate-limit and persist a post from a client.

    Args:
        client_pubkey: Full public key of the author.
        message_text: Message body; silently truncated to MAX_MESSAGE_LENGTH.
        sender_timestamp: Timestamp as reported by the sender's clock.
        txt_type: Text-message type flag (default TXT_TYPE_PLAIN).

    Returns:
        True if the post was stored; False if dropped (rate limit) or on error.
    """
    try:
        # SAFETY: Validate message length
        if len(message_text) > MAX_MESSAGE_LENGTH:
            logger.warning(
                f"Room '{self.room_name}': Message from {client_pubkey[:4].hex()} "
                f"exceeds max length ({len(message_text)} > {MAX_MESSAGE_LENGTH}), truncating"
            )
            message_text = message_text[:MAX_MESSAGE_LENGTH]
        # SAFETY: Rate limit per client (sliding one-minute window)
        client_key = client_pubkey.hex()
        now = time.time()
        if client_key not in self.client_post_times:
            self.client_post_times[client_key] = []
        # Remove timestamps older than 1 minute
        self.client_post_times[client_key] = [
            t for t in self.client_post_times[client_key]
            if now - t < 60
        ]
        # Check rate limit
        if len(self.client_post_times[client_key]) >= MAX_POSTS_PER_CLIENT_PER_MINUTE:
            logger.warning(
                f"Room '{self.room_name}': Client {client_pubkey[:4].hex()} "
                f"exceeded rate limit ({MAX_POSTS_PER_CLIENT_PER_MINUTE} posts/min), dropping message"
            )
            return False
        # Record this post time
        self.client_post_times[client_key].append(now)
        # Use our RTC time for post_timestamp (sync ordering uses server time,
        # not the sender's possibly-skewed clock)
        post_timestamp = time.time()
        # Store to database
        msg_id = self.db.insert_room_message(
            room_hash=f"0x{self.room_hash:02X}",
            author_pubkey=client_pubkey.hex(),
            message_text=message_text,
            post_timestamp=post_timestamp,
            sender_timestamp=sender_timestamp,
            txt_type=txt_type
        )
        if msg_id:
            logger.info(
                f"Room '{self.room_name}': New post #{msg_id} from "
                f"{client_pubkey[:4].hex()}: {message_text[:50]}"
            )
            # Trigger push notification after a short settle delay
            self.next_push_time = time.time() + (PUSH_NOTIFY_DELAY_MS / 1000.0)
            return True
        else:
            logger.error(f"Failed to store message to database")
            return False
    except Exception as e:
        logger.error(f"Error adding post: {e}", exc_info=True)
        return False
async def push_post_to_client(self, client_info, post: Dict) -> bool:
    """Transmit one stored post to one client and wait for its ACK.

    Holds the global transmission limiter for the duration of the radio
    send, because LoRa airtime is long (0.5-9s) and transmissions must be
    serialized.

    Args:
        client_info: ACL entry for the destination (identity, shared secret,
            stored out_path/out_path_len for direct routing).
        post: Row dict from room_messages.

    Returns:
        True if the push was ACKed; False on backoff skip, timeout or error.
    """
    acquired = False
    released = False
    try:
        # SAFETY: Global transmission lock - only ONE message on radio at a time
        # This is critical because LoRa is serial (0.5-9s airtime per message)
        await self.global_limiter.acquire()
        acquired = True
        # SAFETY: Check client failure backoff
        sync_state = self.db.get_client_sync(
            room_hash=f"0x{self.room_hash:02X}",
            client_pubkey=client_info.id.get_public_key().hex()
        )
        if sync_state:
            failures = sync_state.get('push_failures', 0)
            if failures > 0:
                # Apply exponential backoff per RETRY_BACKOFF_SCHEDULE
                backoff_idx = min(failures, len(RETRY_BACKOFF_SCHEDULE) - 1)
                backoff_delay = RETRY_BACKOFF_SCHEDULE[backoff_idx]
                last_failure_time = sync_state.get('updated_at', 0)
                time_since_failure = time.time() - last_failure_time
                if time_since_failure < backoff_delay:
                    wait_time = backoff_delay - time_since_failure
                    logger.debug(
                        f"Room '{self.room_name}': Client 0x{client_info.id.get_public_key()[0]:02X} "
                        f"in backoff (failure {failures}), waiting {wait_time:.0f}s"
                    )
                    return False  # Skip this client for now (limiter freed in finally)
        # Build message payload: timestamp(4) + flags(1) + author_prefix(4) + text
        timestamp = int(time.time())
        flags = (TXT_TYPE_SIGNED_PLAIN << 2)  # Include author prefix
        author_pubkey = bytes.fromhex(post['author_pubkey'])
        author_prefix = author_pubkey[:4]
        message_bytes = post['message_text'].encode('utf-8')
        plaintext = (
            timestamp.to_bytes(4, 'little') +
            bytes([flags]) +
            author_prefix +
            message_bytes
        )
        # Calculate expected ACK (same algorithm as pymc_core)
        attempt = 0
        pack_data = PacketBuilder._pack_timestamp_data(timestamp, attempt, message_bytes)
        ack_hash = CryptoUtils.sha256(pack_data + client_info.id.get_public_key())[:4]
        expected_ack_crc = int.from_bytes(ack_hash, 'little')
        # Determine routing based on stored out_path (negative length = no path)
        route_type = "flood" if client_info.out_path_len < 0 else "direct"
        packet = PacketBuilder.create_datagram(
            ptype=PAYLOAD_TYPE_TXT_MSG,
            dest=client_info.id,
            local_identity=self.local_identity,
            secret=client_info.shared_secret,
            plaintext=plaintext,
            route_type=route_type
        )
        # Add stored path for direct routing
        if route_type == "direct" and len(client_info.out_path) > 0:
            packet.path = bytearray(client_info.out_path[:client_info.out_path_len])
            packet.path_len = client_info.out_path_len
        # Calculate ACK timeout: flat for flood, scales with hop count for direct
        if route_type == "flood":
            ack_timeout = PUSH_ACK_TIMEOUT_FLOOD_MS / 1000.0
        else:
            path_len = client_info.out_path_len if client_info.out_path_len >= 0 else 0
            ack_timeout = (PUSH_TIMEOUT_BASE_MS + PUSH_ACK_TIMEOUT_FACTOR_MS * (path_len + 1)) / 1000.0
        # Record the pending ACK before transmitting
        self.db.upsert_client_sync(
            room_hash=f"0x{self.room_hash:02X}",
            client_pubkey=client_info.id.get_public_key().hex(),
            pending_ack_crc=expected_ack_crc,
            push_post_timestamp=post['post_timestamp'],
            ack_timeout_time=time.time() + ack_timeout
        )
        # Send packet (dispatcher will track ACK automatically)
        # This blocks for the entire transmission duration (0.5-9 seconds)
        success = await self.packet_injector(packet, wait_for_ack=True)
        # Release the transmission slot as soon as airtime is over, before
        # the (non-radio) DB bookkeeping below.
        self.global_limiter.release()
        released = True
        if success:
            # ACK received! Update sync state
            await self._handle_ack_received(
                client_info.id.get_public_key(),
                post['post_timestamp']
            )
            logger.info(
                f"Room '{self.room_name}': Pushed post to "
                f"0x{client_info.id.get_public_key()[0]:02X} via {route_type.upper()}, ACK received"
            )
        else:
            # ACK timeout
            await self._handle_ack_timeout(client_info.id.get_public_key())
            logger.warning(
                f"Room '{self.room_name}': Push to "
                f"0x{client_info.id.get_public_key()[0]:02X} timed out"
            )
        return success
    except Exception as e:
        logger.error(f"Error pushing post to client: {e}", exc_info=True)
        return False
    finally:
        # BUGFIX: the limiter was previously never released on the backoff
        # early-return or on exceptions; ensure exactly one release per acquire.
        if acquired and not released:
            self.global_limiter.release()
async def _handle_ack_received(self, client_pubkey: bytes, post_timestamp: float):
try:
# Update sync state: advance sync_since, clear pending_ack, reset failures
self.db.upsert_client_sync(
room_hash=f"0x{self.room_hash:02X}",
client_pubkey=client_pubkey.hex(),
sync_since=post_timestamp,
pending_ack_crc=0,
push_failures=0,
last_activity=time.time()
)
except Exception as e:
logger.error(f"Error handling ACK received: {e}")
async def _handle_ack_timeout(self, client_pubkey: bytes):
    """Record a failed push: bump the failure counter and clear the pending ACK.

    Args:
        client_pubkey: Full public key of the client whose ACK timed out.
    """
    try:
        # Get current sync state
        sync_state = self.db.get_client_sync(
            room_hash=f"0x{self.room_hash:02X}",
            client_pubkey=client_pubkey.hex()
        )
        if sync_state:
            # Increment failure counter, clear pending_ack
            failures = sync_state.get('push_failures', 0) + 1
            self.db.upsert_client_sync(
                room_hash=f"0x{self.room_hash:02X}",
                client_pubkey=client_pubkey.hex(),
                push_failures=failures,
                pending_ack_crc=0
            )
            # Consistency fix: use the module constant instead of a hardcoded 3,
            # so the warning threshold tracks the eviction threshold.
            if failures >= MAX_PUSH_FAILURES:
                logger.warning(
                    f"Room '{self.room_name}': Client 0x{client_pubkey[0]:02X} "
                    f"has {failures} consecutive failures"
                )
    except Exception as e:
        logger.error(f"Error handling ACK timeout: {e}")
def get_unsynced_count(self, client_pubkey: bytes) -> int:
    """Return how many stored posts this client has not yet received (0 on error)."""
    room_key = f"0x{self.room_hash:02X}"
    pubkey_hex = client_pubkey.hex()
    try:
        # Start counting from the client's recorded sync cursor (0 if unknown).
        state = self.db.get_client_sync(room_hash=room_key, client_pubkey=pubkey_hex)
        since = state['sync_since'] if state else 0
        return self.db.get_unsynced_count(
            room_hash=room_key,
            client_pubkey=pubkey_hex,
            sync_since=since,
        )
    except Exception as e:
        logger.error(f"Error getting unsynced count: {e}")
        return 0
async def _evict_failed_clients(self):
    """Evict clients that keep failing pushes or have gone inactive.

    Eviction marks the DB row with last_activity=0 (the sync loop skips
    such rows) and removes the client from the ACL.
    """
    try:
        now = time.time()
        all_sync_states = self.db.get_all_room_clients(f"0x{self.room_hash:02X}")
        for sync_state in all_sync_states:
            client_pubkey_hex = sync_state['client_pubkey']
            push_failures = sync_state.get('push_failures', 0)
            last_activity = sync_state.get('last_activity', 0)
            evict = False
            reason = ""
            # Check max failures
            if push_failures >= MAX_PUSH_FAILURES:
                evict = True
                reason = f"max failures ({push_failures})"
            # Check inactivity timeout
            elif now - last_activity > INACTIVE_CLIENT_TIMEOUT:
                evict = True
                reason = f"inactive for {(now - last_activity) / 60:.0f} minutes"
            if evict:
                # Remove from database (last_activity=0 is the eviction marker)
                self.db.upsert_client_sync(
                    room_hash=f"0x{self.room_hash:02X}",
                    client_pubkey=client_pubkey_hex,
                    last_activity=0  # Mark as evicted
                )
                # Remove from ACL
                client_pubkey = bytes.fromhex(client_pubkey_hex)
                self.acl.remove_client(client_pubkey)
                logger.info(
                    f"Room '{self.room_name}': Evicted client "
                    f"0x{client_pubkey[0]:02X} ({reason})"
                )
    except Exception as e:
        logger.error(f"Error evicting failed clients: {e}", exc_info=True)
async def _sync_loop(self):
    """Background task: round-robin push of unsynced posts to clients.

    Runs until stop() cancels the task. Each iteration pushes at most one
    post to one client so a single room cannot monopolize the radio.
    """
    # SAFETY: Stagger room startup to prevent thundering herd
    import random
    startup_delay = random.uniform(0, 5)  # 0-5 second random delay
    await asyncio.sleep(startup_delay)
    logger.info(f"Room '{self.room_name}' sync loop starting (delayed {startup_delay:.1f}s)")
    while self._running:
        try:
            await asyncio.sleep(SYNC_PUSH_INTERVAL_MS / 1000.0)
            # SAFETY: Circuit breaker - stop if too many consecutive errors
            if self.consecutive_sync_errors >= MAX_CONSECUTIVE_SYNC_ERRORS:
                logger.error(
                    f"Room '{self.room_name}': Circuit breaker tripped! "
                    f"{self.consecutive_sync_errors} consecutive errors. Pausing for {DB_ERROR_RETRY_DELAY}s"
                )
                await asyncio.sleep(DB_ERROR_RETRY_DELAY)
                self.consecutive_sync_errors = 0  # Reset after pause
                continue
            # SAFETY: Periodic eviction check (every 5 minutes)
            if time.time() - self.last_eviction_check > self.eviction_check_interval:
                await self._evict_failed_clients()
                self.last_eviction_check = time.time()
            # Periodic cleanup check (every 10 minutes)
            if time.time() - self.last_cleanup_time > self.cleanup_interval:
                await self._cleanup_old_messages()
                self.last_cleanup_time = time.time()
            # Check if it's time to push
            if time.time() < self.next_push_time:
                continue
            # Get all clients for this room
            all_clients = self.acl.get_all_clients()
            if not all_clients:
                self.next_push_time = time.time() + 1.0  # Check again in 1 second
                continue
            # SAFETY: Limit number of clients
            if len(all_clients) > MAX_CLIENTS_PER_ROOM:
                logger.warning(
                    f"Room '{self.room_name}': Too many clients ({len(all_clients)} > {MAX_CLIENTS_PER_ROOM})"
                )
                all_clients = all_clients[:MAX_CLIENTS_PER_ROOM]
            # Check for ACK timeouts first
            await self._check_ack_timeouts()
            # Round-robin: get next client
            if self.next_client_idx >= len(all_clients):
                self.next_client_idx = 0
            client = all_clients[self.next_client_idx]
            self.next_client_idx = (self.next_client_idx + 1) % len(all_clients)
            # Get client sync state
            sync_state = self.db.get_client_sync(
                room_hash=f"0x{self.room_hash:02X}",
                client_pubkey=client.id.get_public_key().hex()
            )
            # Skip if already waiting for ACK, evicted, or max failures
            if sync_state:
                pending_ack = sync_state.get('pending_ack_crc', 0)
                last_activity = sync_state.get('last_activity', 0)
                push_failures = sync_state.get('push_failures', 0)
                if pending_ack != 0:
                    logger.debug(f"Skipping client 0x{client.id.get_public_key()[0]:02X} (waiting for ACK)")
                    self.next_push_time = time.time() + (SYNC_PUSH_INTERVAL_MS / 8000.0)
                    continue
                if last_activity == 0:
                    # last_activity == 0 is the eviction marker set by _evict_failed_clients
                    logger.debug(f"Skipping client 0x{client.id.get_public_key()[0]:02X} (evicted)")
                    continue
                if push_failures >= 3:
                    # NOTE(review): hardcoded 3 duplicates MAX_PUSH_FAILURES — confirm and unify
                    logger.debug(f"Skipping client 0x{client.id.get_public_key()[0]:02X} (max failures)")
                    continue
                sync_since = sync_state.get('sync_since', 0)
            else:
                # Initialize sync state for new client
                # Use sync_since from ACL client (sent during login) if available
                sync_since = client.sync_since if hasattr(client, 'sync_since') else 0
                logger.info(
                    f"Room '{self.room_name}': Initializing client "
                    f"0x{client.id.get_public_key()[0]:02X} with sync_since={sync_since}"
                )
                self.db.upsert_client_sync(
                    room_hash=f"0x{self.room_hash:02X}",
                    client_pubkey=client.id.get_public_key().hex(),
                    sync_since=sync_since,
                    last_activity=time.time()
                )
            # Find next unsynced message for this client (oldest first, one at a time)
            unsynced = self.db.get_unsynced_messages(
                room_hash=f"0x{self.room_hash:02X}",
                client_pubkey=client.id.get_public_key().hex(),
                sync_since=sync_since,
                limit=1
            )
            if unsynced:
                post = unsynced[0]
                # Check if enough time has passed since post creation
                now = time.time()
                if now >= post['post_timestamp'] + POST_SYNC_DELAY_SECS:
                    # Push this post
                    await self.push_post_to_client(client, post)
                    self.next_push_time = time.time() + (SYNC_PUSH_INTERVAL_MS / 1000.0)
                else:
                    # Not ready yet, check sooner
                    self.next_push_time = time.time() + (SYNC_PUSH_INTERVAL_MS / 8000.0)
            else:
                # No unsynced posts, check next client sooner
                self.next_push_time = time.time() + (SYNC_PUSH_INTERVAL_MS / 8000.0)
            # SAFETY: Reset error counter on successful iteration
            self.consecutive_sync_errors = 0
        except asyncio.CancelledError:
            break
        except Exception as e:
            # SAFETY: Track consecutive errors for circuit breaker
            self.consecutive_sync_errors += 1
            logger.error(
                f"Room '{self.room_name}': Sync loop error #{self.consecutive_sync_errors}: {e}",
                exc_info=True
            )
            # SAFETY: Back off on errors
            backoff = min(self.consecutive_sync_errors, 10)  # Cap at 10 seconds
            await asyncio.sleep(backoff)
    logger.info(f"Room '{self.room_name}' sync loop stopped")
async def _check_ack_timeouts(self):
try:
now = time.time()
all_sync_states = self.db.get_all_room_clients(f"0x{self.room_hash:02X}")
for sync_state in all_sync_states:
if sync_state['pending_ack_crc'] != 0:
timeout_time = sync_state.get('ack_timeout_time', 0)
if now >= timeout_time:
# ACK timeout
client_pubkey = bytes.fromhex(sync_state['client_pubkey'])
await self._handle_ack_timeout(client_pubkey)
except Exception as e:
logger.error(f"Error checking ACK timeouts: {e}")
async def _cleanup_old_messages(self):
try:
deleted = self.db.cleanup_old_messages(
room_hash=f"0x{self.room_hash:02X}",
keep_count=self.max_posts
)
if deleted > 0:
logger.info(f"Room '{self.room_name}': Cleaned up {deleted} old messages")
except Exception as e:
logger.error(f"Error cleaning up old messages: {e}")
+94 -1
View File
@@ -8,9 +8,11 @@ Also handles CLI commands for admin users on the repeater identity.
import asyncio
import logging
import struct
from pymc_core.node.handlers.text import TextMessageHandler
from .repeater_cli import RepeaterCLI
from .room_server import RoomServer
logger = logging.getLogger("TextHelper")
@@ -18,16 +20,21 @@ logger = logging.getLogger("TextHelper")
class TextHelper:
def __init__(self, identity_manager, packet_injector=None, acl_dict=None, log_fn=None,
config_path: str = None, config: dict = None, save_config_callback=None):
config_path: str = None, config: dict = None, save_config_callback=None,
sqlite_handler=None):
self.identity_manager = identity_manager
self.packet_injector = packet_injector
self.log_fn = log_fn or logger.info
self.acl_dict = acl_dict or {} # Per-identity ACLs keyed by hash_byte
self.sqlite_handler = sqlite_handler # For room server database operations
# Dictionary of handlers keyed by dest_hash
self.handlers = {}
# Dictionary of room servers keyed by dest_hash
self.room_servers = {}
# Track repeater identity for CLI commands
self.repeater_hash = None
@@ -79,6 +86,44 @@ class TextHelper:
self.repeater_hash = hash_byte
logger.info(f"Set repeater hash for CLI: 0x{hash_byte:02X}")
# Create RoomServer instance for room_server identities
if identity_type == "room_server" and self.sqlite_handler:
try:
from .room_server import MAX_UNSYNCED_POSTS
room_config = radio_config or {}
max_posts = room_config.get('max_posts', MAX_UNSYNCED_POSTS)
# Enforce hard limit
if max_posts > MAX_UNSYNCED_POSTS:
logger.warning(
f"Room '{name}': Configured max_posts={max_posts} exceeds hard limit "
f"of {MAX_UNSYNCED_POSTS}, capping to {MAX_UNSYNCED_POSTS}"
)
max_posts = MAX_UNSYNCED_POSTS
room_server = RoomServer(
room_hash=hash_byte,
room_name=name,
local_identity=identity,
sqlite_handler=self.sqlite_handler,
packet_injector=self.packet_injector,
acl=identity_acl,
max_posts=max_posts
)
self.room_servers[hash_byte] = room_server
# Start sync loop
asyncio.create_task(room_server.start())
logger.info(
f"Registered room server '{name}': hash=0x{hash_byte:02X}, "
f"max_posts={max_posts}"
)
except Exception as e:
logger.error(f"Failed to create room server '{name}': {e}", exc_info=True)
logger.info(
f"Registered {identity_type} '{name}' text handler: hash=0x{hash_byte:02X}"
)
@@ -171,6 +216,43 @@ class TextHelper:
f"[{identity_type}:{identity_name}] Message: {message_text}"
)
# Handle room server messages - store to database
if identity_type == "room_server" and dest_hash in self.room_servers:
try:
room_server = self.room_servers[dest_hash]
# Get sender's full public key from ACL
identity_acl = self.acl_dict.get(dest_hash)
sender_pubkey = bytes([src_hash]) + b'\x00' * 31 # Default
if identity_acl:
for client_info in identity_acl.get_all_clients():
if client_info.id.get_public_key()[0] == src_hash:
sender_pubkey = client_info.id.get_public_key()
break
# Extract timestamp and txt_type from decrypted data
# Packet decryption already happened in TextMessageHandler
# We need to extract from original payload if available
sender_timestamp = int(packet.decrypted.get('timestamp', 0)) if hasattr(packet, 'decrypted') else 0
txt_type = 0 # TXT_TYPE_PLAIN by default
# Store message to room database
await room_server.add_post(
client_pubkey=sender_pubkey,
message_text=message_text,
sender_timestamp=sender_timestamp,
txt_type=txt_type
)
logger.info(
f"Room '{identity_name}': Stored message from 0x{src_hash:02X}"
)
except Exception as e:
logger.error(f"Failed to store room message: {e}", exc_info=True)
# Room messages don't need further processing
return
# Check if this is a CLI command to the repeater (AFTER decryption)
if dest_hash == self.repeater_hash and self.cli and self._is_cli_command(message_text):
try:
@@ -235,6 +317,17 @@ class TextHelper:
for hash_byte, info in self.handlers.items()
]
async def cleanup(self):
    """Shut down every room server's sync loop before teardown."""
    for server in list(self.room_servers.values()):
        try:
            await server.stop()
        except Exception as e:
            logger.error(f"Error stopping room server: {e}")
    logger.info("TextHelper cleanup complete")
def _is_cli_command(self, message: str) -> bool:
"""Check if message looks like a CLI command."""
# Strip optional sequence prefix (XX|)
+2 -1
View File
@@ -196,6 +196,7 @@ class RepeaterDaemon:
config_path=getattr(self, 'config_path', None), # For CLI to save changes
config=self.config, # For CLI to read/modify settings
save_config_callback=lambda: self._save_config(getattr(self, 'config_path', '/tmp/config.yaml')), # For CLI to persist changes
sqlite_handler=self.repeater_handler.storage.sqlite if self.repeater_handler and self.repeater_handler.storage else None, # For room server database
)
# Register default repeater identity for text messages
@@ -212,7 +213,7 @@ class RepeaterDaemon:
name=name,
identity=identity,
identity_type="room_server",
radio_config=self.config.get("radio", {})
radio_config=config # Pass room-specific config (includes max_posts, etc.)
)
logger.info("Text message processing helper initialized")