""" Contacts Cache - Persistent storage of all known node names + public keys. Stores every node name ever seen (from device contacts and adverts), so @mention autocomplete works even for removed contacts. File format: JSONL ({device_name}.contacts_cache.jsonl) Each line: {"public_key": "...", "name": "...", "first_seen": ts, "last_seen": ts, "source": "advert"|"device", "lat": float, "lon": float, "type_label": "CLI"|"REP"|...} """ import json import logging import math import struct import time from pathlib import Path from threading import Lock from app.config import config, runtime_config logger = logging.getLogger(__name__) _cache_lock = Lock() _cache: dict = {} # {public_key: {name, first_seen, last_seen, source}} _cache_loaded = False _adverts_offset = 0 # File offset for incremental advert scanning def _get_cache_path() -> Path: device_name = runtime_config.get_device_name() return Path(config.MC_CONFIG_DIR) / f"{device_name}.contacts_cache.jsonl" def _get_adverts_path() -> Path: device_name = runtime_config.get_device_name() return Path(config.MC_CONFIG_DIR) / f"{device_name}.adverts.jsonl" def load_cache() -> dict: """Load cache from disk into memory. Returns copy of cache dict.""" global _cache, _cache_loaded with _cache_lock: if _cache_loaded: return _cache.copy() cache_path = _get_cache_path() _cache = {} if not cache_path.exists(): _cache_loaded = True logger.info("Contacts cache file does not exist yet") return _cache.copy() try: with open(cache_path, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if not line: continue try: entry = json.loads(line) pk = entry.get('public_key', '').lower() if pk: _cache[pk] = entry except json.JSONDecodeError: continue _cache_loaded = True logger.info(f"Loaded contacts cache: {len(_cache)} entries") except Exception as e: logger.error(f"Failed to load contacts cache: {e}") _cache_loaded = True return _cache.copy() def save_cache() -> bool: """Write full cache to disk (atomic write).""" with _cache_lock: cache_path = _get_cache_path() try: cache_path.parent.mkdir(parents=True, exist_ok=True) temp_file = cache_path.with_suffix('.tmp') with open(temp_file, 'w', encoding='utf-8') as f: for entry in _cache.values(): f.write(json.dumps(entry, ensure_ascii=False) + '\n') temp_file.replace(cache_path) logger.debug(f"Saved contacts cache: {len(_cache)} entries") return True except Exception as e: logger.error(f"Failed to save contacts cache: {e}") return False def upsert_contact(public_key: str, name: str, source: str = "advert", lat: float = 0.0, lon: float = 0.0, type_label: str = "") -> bool: """Add or update a contact in the cache. Returns True if cache was modified.""" pk = public_key.lower() now = int(time.time()) with _cache_lock: existing = _cache.get(pk) if existing: changed = False if name and name != existing.get('name'): existing['name'] = name changed = True # Update lat/lon if new values are non-zero if lat != 0.0 or lon != 0.0: if lat != existing.get('lat') or lon != existing.get('lon'): existing['lat'] = lat existing['lon'] = lon changed = True # Update type_label if provided and not already set if type_label and type_label != existing.get('type_label'): existing['type_label'] = type_label changed = True existing['last_seen'] = now return changed else: if not name: return False entry = { 'public_key': pk, 'name': name, 'first_seen': now, 'last_seen': now, 'source': source, } if lat != 0.0 or lon != 0.0: entry['lat'] = lat entry['lon'] = lon if type_label: entry['type_label'] = type_label _cache[pk] = entry return True def get_all_contacts() -> list: """Get all cached contacts as a list of dicts (shallow copies).""" with _cache_lock: return [entry.copy() for entry in _cache.values()] def get_all_names() -> list: """Get all unique non-empty contact names sorted alphabetically.""" with _cache_lock: return sorted(set( entry['name'] for entry in _cache.values() if entry.get('name') )) def parse_advert_payload(pkt_payload_hex: str): """ Parse advert pkt_payload to extract public_key, node_name, and GPS coordinates. Layout of pkt_payload (byte offsets): [0:32] Public Key (32 bytes = 64 hex chars) [32:36] Timestamp (4 bytes) [36:100] Signature (64 bytes) [100] App Flags (1 byte) - bit 4: Location, bit 7: Name [101+] If Location (bit 4): Lat (4 bytes, LE int32/1e6) + Lon (4 bytes, LE int32/1e6) If Name (bit 7): Node name (UTF-8, variable length) Returns: (public_key_hex, node_name, lat, lon) or (None, None, 0, 0) on failure """ try: raw = bytes.fromhex(pkt_payload_hex) if len(raw) < 101: return None, None, 0.0, 0.0 public_key = pkt_payload_hex[:64].lower() app_flags = raw[100] has_location = bool(app_flags & 0x10) # bit 4 has_name = bool(app_flags & 0x80) # bit 7 lat, lon = 0.0, 0.0 name_offset = 101 if has_location: if len(raw) >= 109: lat_i, lon_i = struct.unpack('= len(raw): return public_key, None, lat, lon name_bytes = raw[name_offset:] node_name = name_bytes.decode('utf-8', errors='replace').rstrip('\x00') return public_key, node_name if node_name else None, lat, lon except Exception: return None, None, 0.0, 0.0 def scan_new_adverts() -> int: """ Scan .adverts.jsonl for new entries since last scan. Returns number of new/updated contacts. """ global _adverts_offset adverts_path = _get_adverts_path() if not adverts_path.exists(): return 0 updated = 0 try: with open(adverts_path, 'r', encoding='utf-8') as f: f.seek(_adverts_offset) for line in f: line = line.strip() if not line: continue try: advert = json.loads(line) pkt_payload = advert.get('pkt_payload', '') if not pkt_payload: continue pk, name, lat, lon = parse_advert_payload(pkt_payload) if pk and name: if upsert_contact(pk, name, source="advert", lat=lat, lon=lon): updated += 1 except json.JSONDecodeError: continue _adverts_offset = f.tell() except Exception as e: logger.error(f"Failed to scan adverts: {e}") if updated > 0: save_cache() logger.info(f"Contacts cache updated: {updated} new/changed entries") return updated _TYPE_LABELS = {1: 'CLI', 2: 'REP', 3: 'ROOM', 4: 'SENS'} def initialize_from_device(contacts_detailed: dict): """ Seed cache from /api/contacts/detailed response dict. Called once at startup if cache file doesn't exist. Args: contacts_detailed: dict of {public_key: {adv_name, type, adv_lat, adv_lon, ...}} from meshcli """ added = 0 for pk, details in contacts_detailed.items(): name = details.get('adv_name', '') lat = details.get('adv_lat', 0.0) or 0.0 lon = details.get('adv_lon', 0.0) or 0.0 type_label = _TYPE_LABELS.get(details.get('type'), '') if upsert_contact(pk, name, source="device", lat=lat, lon=lon, type_label=type_label): added += 1 if added > 0: save_cache() logger.info(f"Initialized contacts cache from device: {added} contacts")