diff --git a/repeater/data_acquisition/sqlite_handler.py b/repeater/data_acquisition/sqlite_handler.py index 422067b..2acef95 100644 --- a/repeater/data_acquisition/sqlite_handler.py +++ b/repeater/data_acquisition/sqlite_handler.py @@ -101,6 +101,41 @@ class SQLiteHandler: conn.execute("CREATE INDEX IF NOT EXISTS idx_transport_keys_name ON transport_keys(name)") conn.execute("CREATE INDEX IF NOT EXISTS idx_transport_keys_parent ON transport_keys(parent_id)") + # Room server tables + conn.execute(""" + CREATE TABLE IF NOT EXISTS room_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + room_hash TEXT NOT NULL, + author_pubkey TEXT NOT NULL, + post_timestamp REAL NOT NULL, + sender_timestamp REAL, + message_text TEXT NOT NULL, + txt_type INTEGER NOT NULL, + created_at REAL NOT NULL + ) + """) + + conn.execute(""" + CREATE TABLE IF NOT EXISTS room_client_sync ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + room_hash TEXT NOT NULL, + client_pubkey TEXT NOT NULL, + sync_since REAL NOT NULL DEFAULT 0, + pending_ack_crc INTEGER DEFAULT 0, + push_post_timestamp REAL DEFAULT 0, + ack_timeout_time REAL DEFAULT 0, + push_failures INTEGER DEFAULT 0, + last_activity REAL NOT NULL, + updated_at REAL NOT NULL, + UNIQUE(room_hash, client_pubkey) + ) + """) + + conn.execute("CREATE INDEX IF NOT EXISTS idx_room_messages_room ON room_messages(room_hash, post_timestamp)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_room_messages_author ON room_messages(author_pubkey)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_room_client_sync_room ON room_client_sync(room_hash, client_pubkey)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_room_client_sync_pending ON room_client_sync(pending_ack_crc)") + conn.commit() logger.info(f"SQLite database initialized: {self.sqlite_path}") @@ -868,4 +903,172 @@ class SQLiteHandler: return cursor.rowcount > 0 except Exception as e: logger.error(f"Failed to delete advert: {e}") - return False \ No newline at end of file + return False + + # ------------------------------------------------------------------ + # Room Server Methods + # ------------------------------------------------------------------ + + def insert_room_message(self, room_hash: str, author_pubkey: str, message_text: str, + post_timestamp: float, sender_timestamp: float = None, + txt_type: int = 0) -> Optional[int]: + """Insert a new room message and return its ID.""" + try: + with sqlite3.connect(self.sqlite_path) as conn: + cursor = conn.execute(""" + INSERT INTO room_messages ( + room_hash, author_pubkey, post_timestamp, sender_timestamp, + message_text, txt_type, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?) + """, ( + room_hash, author_pubkey, post_timestamp, sender_timestamp, + message_text, txt_type, time.time() + )) + return cursor.lastrowid + except Exception as e: + logger.error(f"Failed to insert room message: {e}") + return None + + def get_unsynced_messages(self, room_hash: str, client_pubkey: str, + sync_since: float, limit: int = 100) -> List[Dict]: + """Get messages for a room that client hasn't synced yet.""" + try: + with sqlite3.connect(self.sqlite_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute(""" + SELECT * FROM room_messages + WHERE room_hash = ? + AND post_timestamp > ? + AND author_pubkey != ? + ORDER BY post_timestamp ASC + LIMIT ? + """, (room_hash, sync_since, client_pubkey, limit)) + return [dict(row) for row in cursor.fetchall()] + except Exception as e: + logger.error(f"Failed to get unsynced messages: {e}") + return [] + + def get_unsynced_count(self, room_hash: str, client_pubkey: str, sync_since: float) -> int: + """Count unsynced messages for a client.""" + try: + with sqlite3.connect(self.sqlite_path) as conn: + cursor = conn.execute(""" + SELECT COUNT(*) FROM room_messages + WHERE room_hash = ? + AND post_timestamp > ? + AND author_pubkey != ? + """, (room_hash, sync_since, client_pubkey)) + return cursor.fetchone()[0] + except Exception as e: + logger.error(f"Failed to count unsynced messages: {e}") + return 0 + + def upsert_client_sync(self, room_hash: str, client_pubkey: str, **kwargs) -> bool: + """Insert or update client sync state.""" + try: + with sqlite3.connect(self.sqlite_path) as conn: + # Check if exists + cursor = conn.execute(""" + SELECT id FROM room_client_sync + WHERE room_hash = ? AND client_pubkey = ? + """, (room_hash, client_pubkey)) + existing = cursor.fetchone() + + kwargs['updated_at'] = time.time() + + if existing: + # Update + set_clauses = [] + values = [] + for key, value in kwargs.items(): + set_clauses.append(f"{key} = ?") + values.append(value) + values.extend([room_hash, client_pubkey]) + + conn.execute(f""" + UPDATE room_client_sync + SET {', '.join(set_clauses)} + WHERE room_hash = ? AND client_pubkey = ? + """, values) + else: + # Insert with defaults + kwargs.setdefault('sync_since', 0) + kwargs.setdefault('pending_ack_crc', 0) + kwargs.setdefault('push_post_timestamp', 0) + kwargs.setdefault('ack_timeout_time', 0) + kwargs.setdefault('push_failures', 0) + kwargs.setdefault('last_activity', time.time()) + + columns = ['room_hash', 'client_pubkey'] + list(kwargs.keys()) + placeholders = ['?'] * len(columns) + values = [room_hash, client_pubkey] + list(kwargs.values()) + + conn.execute(f""" + INSERT INTO room_client_sync ({', '.join(columns)}) + VALUES ({', '.join(placeholders)}) + """, values) + + conn.commit() + return True + except Exception as e: + logger.error(f"Failed to upsert client sync: {e}") + return False + + def get_client_sync(self, room_hash: str, client_pubkey: str) -> Optional[Dict]: + """Get client sync state.""" + try: + with sqlite3.connect(self.sqlite_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute(""" + SELECT * FROM room_client_sync + WHERE room_hash = ? AND client_pubkey = ? + """, (room_hash, client_pubkey)) + row = cursor.fetchone() + return dict(row) if row else None + except Exception as e: + logger.error(f"Failed to get client sync: {e}") + return None + + def get_all_room_clients(self, room_hash: str) -> List[Dict]: + """Get all clients for a room.""" + try: + with sqlite3.connect(self.sqlite_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute(""" + SELECT * FROM room_client_sync + WHERE room_hash = ? + ORDER BY last_activity DESC + """, (room_hash,)) + return [dict(row) for row in cursor.fetchall()] + except Exception as e: + logger.error(f"Failed to get room clients: {e}") + return [] + + def cleanup_old_messages(self, room_hash: str, keep_count: int = 32) -> int: + """Keep only the most recent N messages per room.""" + try: + with sqlite3.connect(self.sqlite_path) as conn: + # First check if cleanup is needed + cursor = conn.execute(""" + SELECT COUNT(*) FROM room_messages WHERE room_hash = ? + """, (room_hash,)) + total_count = cursor.fetchone()[0] + + if total_count <= keep_count: + return 0 # No cleanup needed + + # Delete old messages + cursor = conn.execute(""" + DELETE FROM room_messages + WHERE room_hash = ? + AND id NOT IN ( + SELECT id FROM room_messages + WHERE room_hash = ? + ORDER BY post_timestamp DESC + LIMIT ? + ) + """, (room_hash, room_hash, keep_count)) + return cursor.rowcount + except Exception as e: + logger.error(f"Failed to cleanup old messages: {e}") + return 0 diff --git a/repeater/handler_helpers/login.py b/repeater/handler_helpers/login.py index f7d3030..e57dd55 100644 --- a/repeater/handler_helpers/login.py +++ b/repeater/handler_helpers/login.py @@ -76,12 +76,13 @@ class LoginHelper: logger.info(f"Created ACL for {identity_type} '{name}': hash=0x{hash_byte:02X}") # Create auth callback that uses this identity's ACL - def auth_callback_with_context(client_identity, shared_secret, password, timestamp): + def auth_callback_with_context(client_identity, shared_secret, password, timestamp, sync_since=None): return identity_acl.authenticate_client( client_identity=client_identity, shared_secret=shared_secret, password=password, timestamp=timestamp, + sync_since=sync_since, target_identity_hash=hash_byte, target_identity_name=name, target_identity_config=config diff --git a/repeater/handler_helpers/room_server.py b/repeater/handler_helpers/room_server.py new file mode 100644 index 0000000..89a0790 --- /dev/null +++ b/repeater/handler_helpers/room_server.py @@ -0,0 +1,605 @@ +import asyncio +import logging +import time +from typing import Optional, Dict + +from pymc_core.protocol import PacketBuilder, CryptoUtils +from pymc_core.protocol.constants import PAYLOAD_TYPE_TXT_MSG + +logger = logging.getLogger("RoomServer") + +# Hard limit from C++ simple_room_server +MAX_UNSYNCED_POSTS = 32 + +# Text message type constants +TXT_TYPE_PLAIN = 0x00 +TXT_TYPE_CLI_DATA = 0x01 +TXT_TYPE_SIGNED_PLAIN = 0x02 + +# Push timing constants (from C++ simple_room_server) +PUSH_NOTIFY_DELAY_MS = 2000 +SYNC_PUSH_INTERVAL_MS = 1200 +POST_SYNC_DELAY_SECS = 6 +PUSH_ACK_TIMEOUT_FLOOD_MS = 12000 +PUSH_TIMEOUT_BASE_MS = 4000 +PUSH_ACK_TIMEOUT_FACTOR_MS = 2000 + +# Safety limits and protections +MAX_MESSAGE_LENGTH = 160 # Match C++ MAX_POST_TEXT_LEN (151 bytes for text) +MAX_POSTS_PER_CLIENT_PER_MINUTE = 10 # Prevent spam +MAX_CLIENTS_PER_ROOM = 50 # From ACL default +MAX_PUSH_FAILURES = 3 # Evict after this many consecutive failures +INACTIVE_CLIENT_TIMEOUT = 3600 # Evict after 1 hour inactivity (seconds) +MAX_CONSECUTIVE_SYNC_ERRORS = 10 # Circuit breaker threshold +DB_ERROR_RETRY_DELAY = 60 # Wait 1 minute on DB error (seconds) + +# Backoff strategy for failed pushes (seconds) +RETRY_BACKOFF_SCHEDULE = [0, 30, 300, 3600] # 0s, 30s, 5min, 1hr + +# Global rate limiter (shared across all rooms) +_global_push_limiter = None +_global_push_lock = asyncio.Lock() +GLOBAL_MIN_GAP_BETWEEN_MESSAGES = 1.1 # 1.1s minimum gap between transmissions + + +class GlobalRateLimiter: + + def __init__(self, min_gap_seconds: float = 0.1): + self.min_gap = min_gap_seconds # Minimum gap between consecutive messages + self.lock = asyncio.Lock() # Only one transmission at a time + self.last_release_time = 0 + + async def acquire(self): + + async with self.lock: + # Enforce minimum gap between consecutive transmissions + now = time.time() + time_since_last = now - self.last_release_time + if time_since_last < self.min_gap: + wait_time = self.min_gap - time_since_last + logger.debug(f"Global rate limiter: waiting {wait_time*1000:.0f}ms") + await asyncio.sleep(wait_time) + # Lock is now held - caller can transmit + # Will be released when context exits + + def release(self): + self.last_release_time = time.time() + + +class RoomServer: + + def __init__( + self, + room_hash: int, + room_name: str, + local_identity, + sqlite_handler, + packet_injector, + acl, + max_posts: int = 32 + ): + + self.room_hash = room_hash + self.room_name = room_name + self.local_identity = local_identity + self.db = sqlite_handler + self.packet_injector = packet_injector + self.acl = acl + + # Enforce hard limit (match C++ MAX_UNSYNCED_POSTS) + if max_posts > MAX_UNSYNCED_POSTS: + logger.warning( + f"Room '{room_name}': max_posts={max_posts} exceeds hard limit " + f"of {MAX_UNSYNCED_POSTS}, capping to {MAX_UNSYNCED_POSTS}" + ) + max_posts = MAX_UNSYNCED_POSTS + self.max_posts = max_posts + + # Round-robin state + self.next_client_idx = 0 + self.next_push_time = 0 + + # Cleanup tracking + self.last_cleanup_time = time.time() + self.cleanup_interval = 600 # Cleanup every 10 minutes + + # Safety and monitoring + self.client_post_times = {} # Track last N post times per client for rate limiting + self.consecutive_sync_errors = 0 # Circuit breaker counter + self.last_eviction_check = time.time() + self.eviction_check_interval = 300 # Check every 5 minutes + + # Initialize global rate limiter (singleton) + global _global_push_limiter + if _global_push_limiter is None: + _global_push_limiter = GlobalRateLimiter(GLOBAL_MIN_GAP_BETWEEN_MESSAGES) + self.global_limiter = _global_push_limiter + + # Background task handle + self._sync_task = None + self._running = False + + logger.info( + f"RoomServer initialized: name='{room_name}', " + f"hash=0x{room_hash:02X}, max_posts={max_posts}" + ) + + async def start(self): + if self._running: + logger.warning(f"Room '{self.room_name}' sync loop already running") + return + + self._running = True + self._sync_task = asyncio.create_task(self._sync_loop()) + logger.info(f"Room '{self.room_name}' sync loop started") + + async def stop(self): + self._running = False + if self._sync_task: + self._sync_task.cancel() + try: + await self._sync_task + except asyncio.CancelledError: + pass + logger.info(f"Room '{self.room_name}' sync loop stopped") + + async def add_post( + self, + client_pubkey: bytes, + message_text: str, + sender_timestamp: int, + txt_type: int = TXT_TYPE_PLAIN + ) -> bool: + + try: + # SAFETY: Validate message length + if len(message_text) > MAX_MESSAGE_LENGTH: + logger.warning( + f"Room '{self.room_name}': Message from {client_pubkey[:4].hex()} " + f"exceeds max length ({len(message_text)} > {MAX_MESSAGE_LENGTH}), truncating" + ) + message_text = message_text[:MAX_MESSAGE_LENGTH] + + # SAFETY: Rate limit per client + client_key = client_pubkey.hex() + now = time.time() + + if client_key not in self.client_post_times: + self.client_post_times[client_key] = [] + + # Remove timestamps older than 1 minute + self.client_post_times[client_key] = [ + t for t in self.client_post_times[client_key] + if now - t < 60 + ] + + # Check rate limit + if len(self.client_post_times[client_key]) >= MAX_POSTS_PER_CLIENT_PER_MINUTE: + logger.warning( + f"Room '{self.room_name}': Client {client_pubkey[:4].hex()} " + f"exceeded rate limit ({MAX_POSTS_PER_CLIENT_PER_MINUTE} posts/min), dropping message" + ) + return False + + # Record this post time + self.client_post_times[client_key].append(now) + + # Use our RTC time for post_timestamp + post_timestamp = time.time() + + # Store to database + msg_id = self.db.insert_room_message( + room_hash=f"0x{self.room_hash:02X}", + author_pubkey=client_pubkey.hex(), + message_text=message_text, + post_timestamp=post_timestamp, + sender_timestamp=sender_timestamp, + txt_type=txt_type + ) + + if msg_id: + logger.info( + f"Room '{self.room_name}': New post #{msg_id} from " + f"{client_pubkey[:4].hex()}: {message_text[:50]}" + ) + + # Trigger push notification + self.next_push_time = time.time() + (PUSH_NOTIFY_DELAY_MS / 1000.0) + + return True + else: + logger.error(f"Failed to store message to database") + return False + + except Exception as e: + logger.error(f"Error adding post: {e}", exc_info=True) + return False + + async def push_post_to_client(self, client_info, post: Dict) -> bool: + + try: + # SAFETY: Global transmission lock - only ONE message on radio at a time + # This is critical because LoRa is serial (0.5-9s airtime per message) + await self.global_limiter.acquire() + + # SAFETY: Check client failure backoff + sync_state = self.db.get_client_sync( + room_hash=f"0x{self.room_hash:02X}", + client_pubkey=client_info.id.get_public_key().hex() + ) + + if sync_state: + failures = sync_state.get('push_failures', 0) + if failures > 0: + # Apply exponential backoff + backoff_idx = min(failures, len(RETRY_BACKOFF_SCHEDULE) - 1) + backoff_delay = RETRY_BACKOFF_SCHEDULE[backoff_idx] + last_failure_time = sync_state.get('updated_at', 0) + time_since_failure = time.time() - last_failure_time + + if time_since_failure < backoff_delay: + wait_time = backoff_delay - time_since_failure + logger.debug( + f"Room '{self.room_name}': Client 0x{client_info.id.get_public_key()[0]:02X} " + f"in backoff (failure {failures}), waiting {wait_time:.0f}s" + ) + return False # Skip this client for now + + # Build message payload + timestamp = int(time.time()) + flags = (TXT_TYPE_SIGNED_PLAIN << 2) # Include author prefix + + # Author prefix (first 4 bytes of pubkey) + author_pubkey = bytes.fromhex(post['author_pubkey']) + author_prefix = author_pubkey[:4] + + # Plaintext: timestamp(4) + flags(1) + author_prefix(4) + text + message_bytes = post['message_text'].encode('utf-8') + plaintext = ( + timestamp.to_bytes(4, 'little') + + bytes([flags]) + + author_prefix + + message_bytes + ) + + # Calculate expected ACK (same algorithm as pymc_core) + attempt = 0 + pack_data = PacketBuilder._pack_timestamp_data(timestamp, attempt, message_bytes) + ack_hash = CryptoUtils.sha256(pack_data + client_info.id.get_public_key())[:4] + expected_ack_crc = int.from_bytes(ack_hash, 'little') + + # Determine routing based on stored out_path + route_type = "flood" if client_info.out_path_len < 0 else "direct" + + # Create datagram + packet = PacketBuilder.create_datagram( + ptype=PAYLOAD_TYPE_TXT_MSG, + dest=client_info.id, + local_identity=self.local_identity, + secret=client_info.shared_secret, + plaintext=plaintext, + route_type=route_type + ) + + # Add stored path for direct routing + if route_type == "direct" and len(client_info.out_path) > 0: + packet.path = bytearray(client_info.out_path[:client_info.out_path_len]) + packet.path_len = client_info.out_path_len + + # Calculate ACK timeout + if route_type == "flood": + ack_timeout = PUSH_ACK_TIMEOUT_FLOOD_MS / 1000.0 + else: + path_len = client_info.out_path_len if client_info.out_path_len >= 0 else 0 + ack_timeout = (PUSH_TIMEOUT_BASE_MS + PUSH_ACK_TIMEOUT_FACTOR_MS * (path_len + 1)) / 1000.0 + + # Update client sync state with pending ACK + self.db.upsert_client_sync( + room_hash=f"0x{self.room_hash:02X}", + client_pubkey=client_info.id.get_public_key().hex(), + pending_ack_crc=expected_ack_crc, + push_post_timestamp=post['post_timestamp'], + ack_timeout_time=time.time() + ack_timeout + ) + # Send packet (dispatcher will track ACK automatically) + # This blocks for the entire transmission duration (0.5-9 seconds) + success = await self.packet_injector(packet, wait_for_ack=True) + + # SAFETY: Release transmission lock AFTER send completes + self.global_limiter.release() + + if success: + # ACK received! Update sync state + await self._handle_ack_received( + client_info.id.get_public_key(), + post['post_timestamp'] + ) + logger.info( + f"Room '{self.room_name}': Pushed post to " + f"0x{client_info.id.get_public_key()[0]:02X} via {route_type.upper()}, ACK received" + ) + else: + # ACK timeout + await self._handle_ack_timeout(client_info.id.get_public_key()) + logger.warning( + f"Room '{self.room_name}': Push to " + f"0x{client_info.id.get_public_key()[0]:02X} timed out" + ) + + return success + + except Exception as e: + logger.error(f"Error pushing post to client: {e}", exc_info=True) + return False + + async def _handle_ack_received(self, client_pubkey: bytes, post_timestamp: float): + + try: + # Update sync state: advance sync_since, clear pending_ack, reset failures + self.db.upsert_client_sync( + room_hash=f"0x{self.room_hash:02X}", + client_pubkey=client_pubkey.hex(), + sync_since=post_timestamp, + pending_ack_crc=0, + push_failures=0, + last_activity=time.time() + ) + except Exception as e: + logger.error(f"Error handling ACK received: {e}") + + async def _handle_ack_timeout(self, client_pubkey: bytes): + try: + # Get current sync state + sync_state = self.db.get_client_sync( + room_hash=f"0x{self.room_hash:02X}", + client_pubkey=client_pubkey.hex() + ) + + if sync_state: + # Increment failure counter, clear pending_ack + failures = sync_state.get('push_failures', 0) + 1 + self.db.upsert_client_sync( + room_hash=f"0x{self.room_hash:02X}", + client_pubkey=client_pubkey.hex(), + push_failures=failures, + pending_ack_crc=0 + ) + + if failures >= 3: + logger.warning( + f"Room '{self.room_name}': Client 0x{client_pubkey[0]:02X} " + f"has {failures} consecutive failures" + ) + except Exception as e: + logger.error(f"Error handling ACK timeout: {e}") + + def get_unsynced_count(self, client_pubkey: bytes) -> int: + try: + # Get client's sync state + sync_state = self.db.get_client_sync( + room_hash=f"0x{self.room_hash:02X}", + client_pubkey=client_pubkey.hex() + ) + + sync_since = sync_state['sync_since'] if sync_state else 0 + + return self.db.get_unsynced_count( + room_hash=f"0x{self.room_hash:02X}", + client_pubkey=client_pubkey.hex(), + sync_since=sync_since + ) + except Exception as e: + logger.error(f"Error getting unsynced count: {e}") + return 0 + + async def _evict_failed_clients(self): + try: + now = time.time() + all_sync_states = self.db.get_all_room_clients(f"0x{self.room_hash:02X}") + + for sync_state in all_sync_states: + client_pubkey_hex = sync_state['client_pubkey'] + push_failures = sync_state.get('push_failures', 0) + last_activity = sync_state.get('last_activity', 0) + + evict = False + reason = "" + + # Check max failures + if push_failures >= MAX_PUSH_FAILURES: + evict = True + reason = f"max failures ({push_failures})" + + # Check inactivity timeout + elif now - last_activity > INACTIVE_CLIENT_TIMEOUT: + evict = True + reason = f"inactive for {(now - last_activity) / 60:.0f} minutes" + + if evict: + # Remove from database + self.db.upsert_client_sync( + room_hash=f"0x{self.room_hash:02X}", + client_pubkey=client_pubkey_hex, + last_activity=0 # Mark as evicted + ) + + # Remove from ACL + client_pubkey = bytes.fromhex(client_pubkey_hex) + self.acl.remove_client(client_pubkey) + + logger.info( + f"Room '{self.room_name}': Evicted client " + f"0x{client_pubkey[0]:02X} ({reason})" + ) + + except Exception as e: + logger.error(f"Error evicting failed clients: {e}", exc_info=True) + + async def _sync_loop(self): + + # SAFETY: Stagger room startup to prevent thundering herd + import random + startup_delay = random.uniform(0, 5) # 0-5 second random delay + await asyncio.sleep(startup_delay) + + logger.info(f"Room '{self.room_name}' sync loop starting (delayed {startup_delay:.1f}s)") + + while self._running: + try: + await asyncio.sleep(SYNC_PUSH_INTERVAL_MS / 1000.0) + + # SAFETY: Circuit breaker - stop if too many consecutive errors + if self.consecutive_sync_errors >= MAX_CONSECUTIVE_SYNC_ERRORS: + logger.error( + f"Room '{self.room_name}': Circuit breaker tripped! " + f"{self.consecutive_sync_errors} consecutive errors. Pausing for {DB_ERROR_RETRY_DELAY}s" + ) + await asyncio.sleep(DB_ERROR_RETRY_DELAY) + self.consecutive_sync_errors = 0 # Reset after pause + continue + + # SAFETY: Periodic eviction check (every 5 minutes) + if time.time() - self.last_eviction_check > self.eviction_check_interval: + await self._evict_failed_clients() + self.last_eviction_check = time.time() + + # Periodic cleanup check (every 10 minutes) + if time.time() - self.last_cleanup_time > self.cleanup_interval: + await self._cleanup_old_messages() + self.last_cleanup_time = time.time() + + # Check if it's time to push + if time.time() < self.next_push_time: + continue + + # Get all clients for this room + all_clients = self.acl.get_all_clients() + if not all_clients: + self.next_push_time = time.time() + 1.0 # Check again in 1 second + continue + + # SAFETY: Limit number of clients + if len(all_clients) > MAX_CLIENTS_PER_ROOM: + logger.warning( + f"Room '{self.room_name}': Too many clients ({len(all_clients)} > {MAX_CLIENTS_PER_ROOM})" + ) + all_clients = all_clients[:MAX_CLIENTS_PER_ROOM] + + # Check for ACK timeouts first + await self._check_ack_timeouts() + + # Round-robin: get next client + if self.next_client_idx >= len(all_clients): + self.next_client_idx = 0 + + client = all_clients[self.next_client_idx] + self.next_client_idx = (self.next_client_idx + 1) % len(all_clients) + + # Get client sync state + sync_state = self.db.get_client_sync( + room_hash=f"0x{self.room_hash:02X}", + client_pubkey=client.id.get_public_key().hex() + ) + + # Skip if already waiting for ACK, evicted, or max failures + if sync_state: + pending_ack = sync_state.get('pending_ack_crc', 0) + last_activity = sync_state.get('last_activity', 0) + push_failures = sync_state.get('push_failures', 0) + + if pending_ack != 0: + logger.debug(f"Skipping client 0x{client.id.get_public_key()[0]:02X} (waiting for ACK)") + self.next_push_time = time.time() + (SYNC_PUSH_INTERVAL_MS / 8000.0) + continue + + if last_activity == 0: + logger.debug(f"Skipping client 0x{client.id.get_public_key()[0]:02X} (evicted)") + continue + + if push_failures >= 3: + logger.debug(f"Skipping client 0x{client.id.get_public_key()[0]:02X} (max failures)") + continue + + sync_since = sync_state.get('sync_since', 0) + else: + # Initialize sync state for new client + # Use sync_since from ACL client (sent during login) if available + sync_since = client.sync_since if hasattr(client, 'sync_since') else 0 + logger.info( + f"Room '{self.room_name}': Initializing client " + f"0x{client.id.get_public_key()[0]:02X} with sync_since={sync_since}" + ) + self.db.upsert_client_sync( + room_hash=f"0x{self.room_hash:02X}", + client_pubkey=client.id.get_public_key().hex(), + sync_since=sync_since, + last_activity=time.time() + ) + + # Find next unsynced message for this client + unsynced = self.db.get_unsynced_messages( + room_hash=f"0x{self.room_hash:02X}", + client_pubkey=client.id.get_public_key().hex(), + sync_since=sync_since, + limit=1 + ) + + if unsynced: + post = unsynced[0] + # Check if enough time has passed since post creation + now = time.time() + if now >= post['post_timestamp'] + POST_SYNC_DELAY_SECS: + # Push this post + await self.push_post_to_client(client, post) + self.next_push_time = time.time() + (SYNC_PUSH_INTERVAL_MS / 1000.0) + else: + # Not ready yet, check sooner + self.next_push_time = time.time() + (SYNC_PUSH_INTERVAL_MS / 8000.0) + else: + # No unsynced posts, check next client sooner + self.next_push_time = time.time() + (SYNC_PUSH_INTERVAL_MS / 8000.0) + + # SAFETY: Reset error counter on successful iteration + self.consecutive_sync_errors = 0 + + except asyncio.CancelledError: + break + except Exception as e: + # SAFETY: Track consecutive errors for circuit breaker + self.consecutive_sync_errors += 1 + logger.error( + f"Room '{self.room_name}': Sync loop error #{self.consecutive_sync_errors}: {e}", + exc_info=True + ) + + # SAFETY: Back off on errors + backoff = min(self.consecutive_sync_errors, 10) # Cap at 10 seconds + await asyncio.sleep(backoff) + + logger.info(f"Room '{self.room_name}' sync loop stopped") + + async def _check_ack_timeouts(self): + try: + now = time.time() + all_sync_states = self.db.get_all_room_clients(f"0x{self.room_hash:02X}") + + for sync_state in all_sync_states: + if sync_state['pending_ack_crc'] != 0: + timeout_time = sync_state.get('ack_timeout_time', 0) + if now >= timeout_time: + # ACK timeout + client_pubkey = bytes.fromhex(sync_state['client_pubkey']) + await self._handle_ack_timeout(client_pubkey) + except Exception as e: + logger.error(f"Error checking ACK timeouts: {e}") + + async def _cleanup_old_messages(self): + try: + deleted = self.db.cleanup_old_messages( + room_hash=f"0x{self.room_hash:02X}", + keep_count=self.max_posts + ) + if deleted > 0: + logger.info(f"Room '{self.room_name}': Cleaned up {deleted} old messages") + except Exception as e: + logger.error(f"Error cleaning up old messages: {e}") diff --git a/repeater/handler_helpers/text.py b/repeater/handler_helpers/text.py index 864b623..fa24193 100644 --- a/repeater/handler_helpers/text.py +++ b/repeater/handler_helpers/text.py @@ -8,9 +8,11 @@ Also handles CLI commands for admin users on the repeater identity. import asyncio import logging +import struct from pymc_core.node.handlers.text import TextMessageHandler from .repeater_cli import RepeaterCLI +from .room_server import RoomServer logger = logging.getLogger("TextHelper") @@ -18,16 +20,21 @@ logger = logging.getLogger("TextHelper") class TextHelper: def __init__(self, identity_manager, packet_injector=None, acl_dict=None, log_fn=None, - config_path: str = None, config: dict = None, save_config_callback=None): + config_path: str = None, config: dict = None, save_config_callback=None, + sqlite_handler=None): self.identity_manager = identity_manager self.packet_injector = packet_injector self.log_fn = log_fn or logger.info self.acl_dict = acl_dict or {} # Per-identity ACLs keyed by hash_byte + self.sqlite_handler = sqlite_handler # For room server database operations # Dictionary of handlers keyed by dest_hash self.handlers = {} + # Dictionary of room servers keyed by dest_hash + self.room_servers = {} + # Track repeater identity for CLI commands self.repeater_hash = None @@ -79,6 +86,44 @@ class TextHelper: self.repeater_hash = hash_byte logger.info(f"Set repeater hash for CLI: 0x{hash_byte:02X}") + # Create RoomServer instance for room_server identities + if identity_type == "room_server" and self.sqlite_handler: + try: + from .room_server import MAX_UNSYNCED_POSTS + + room_config = radio_config or {} + max_posts = room_config.get('max_posts', MAX_UNSYNCED_POSTS) + + # Enforce hard limit + if max_posts > MAX_UNSYNCED_POSTS: + logger.warning( + f"Room '{name}': Configured max_posts={max_posts} exceeds hard limit " + f"of {MAX_UNSYNCED_POSTS}, capping to {MAX_UNSYNCED_POSTS}" + ) + max_posts = MAX_UNSYNCED_POSTS + + room_server = RoomServer( + room_hash=hash_byte, + room_name=name, + local_identity=identity, + sqlite_handler=self.sqlite_handler, + packet_injector=self.packet_injector, + acl=identity_acl, + max_posts=max_posts + ) + + self.room_servers[hash_byte] = room_server + + # Start sync loop + asyncio.create_task(room_server.start()) + + logger.info( + f"Registered room server '{name}': hash=0x{hash_byte:02X}, " + f"max_posts={max_posts}" + ) + except Exception as e: + logger.error(f"Failed to create room server '{name}': {e}", exc_info=True) + logger.info( f"Registered {identity_type} '{name}' text handler: hash=0x{hash_byte:02X}" ) @@ -171,6 +216,43 @@ class TextHelper: f"[{identity_type}:{identity_name}] Message: {message_text}" ) + # Handle room server messages - store to database + if identity_type == "room_server" and dest_hash in self.room_servers: + try: + room_server = self.room_servers[dest_hash] + + # Get sender's full public key from ACL + identity_acl = self.acl_dict.get(dest_hash) + sender_pubkey = bytes([src_hash]) + b'\x00' * 31 # Default + if identity_acl: + for client_info in identity_acl.get_all_clients(): + if client_info.id.get_public_key()[0] == src_hash: + sender_pubkey = client_info.id.get_public_key() + break + + # Extract timestamp and txt_type from decrypted data + # Packet decryption already happened in TextMessageHandler + # We need to extract from original payload if available + sender_timestamp = int(packet.decrypted.get('timestamp', 0)) if hasattr(packet, 'decrypted') else 0 + txt_type = 0 # TXT_TYPE_PLAIN by default + + # Store message to room database + await room_server.add_post( + client_pubkey=sender_pubkey, + message_text=message_text, + sender_timestamp=sender_timestamp, + txt_type=txt_type + ) + + logger.info( + f"Room '{identity_name}': Stored message from 0x{src_hash:02X}" + ) + except Exception as e: + logger.error(f"Failed to store room message: {e}", exc_info=True) + + # Room messages don't need further processing + return + # Check if this is a CLI command to the repeater (AFTER decryption) if dest_hash == self.repeater_hash and self.cli and self._is_cli_command(message_text): try: @@ -235,6 +317,17 @@ class TextHelper: for hash_byte, info in self.handlers.items() ] + async def cleanup(self): + """Cleanup room servers and handlers.""" + # Stop all room server sync loops + for room_server in self.room_servers.values(): + try: + await room_server.stop() + except Exception as e: + logger.error(f"Error stopping room server: {e}") + + logger.info("TextHelper cleanup complete") + def _is_cli_command(self, message: str) -> bool: """Check if message looks like a CLI command.""" # Strip optional sequence prefix (XX|) diff --git a/repeater/main.py b/repeater/main.py index 5c4d7a0..971cc1b 100644 --- a/repeater/main.py +++ b/repeater/main.py @@ -196,6 +196,7 @@ class RepeaterDaemon: config_path=getattr(self, 'config_path', None), # For CLI to save changes config=self.config, # For CLI to read/modify settings save_config_callback=lambda: self._save_config(getattr(self, 'config_path', '/tmp/config.yaml')), # For CLI to persist changes + sqlite_handler=self.repeater_handler.storage.sqlite if self.repeater_handler and self.repeater_handler.storage else None, # For room server database ) # Register default repeater identity for text messages @@ -212,7 +213,7 @@ class RepeaterDaemon: name=name, identity=identity, identity_type="room_server", - radio_config=self.config.get("radio", {}) + radio_config=config # Pass room-specific config (includes max_posts, etc.) ) logger.info("Text message processing helper initialized")