diff --git a/app/contacts_cache.py b/app/contacts_cache.py index f2fd39a..0e7383d 100644 --- a/app/contacts_cache.py +++ b/app/contacts_cache.py @@ -1,159 +1,59 @@ """ -Contacts Cache - Persistent storage of all known node names + public keys. +Contacts Cache - DB-backed contact name/key lookup. -Stores every node name ever seen (from device contacts and adverts), -so @mention autocomplete works even for removed contacts. +All contact data is stored in the SQLite contacts table. +JSONL files are no longer used. -File format: JSONL ({device_name}.contacts_cache.jsonl) -Each line: {"public_key": "...", "name": "...", "first_seen": ts, "last_seen": ts, - "source": "advert"|"device", "lat": float, "lon": float, "type_label": "COM"|"REP"|...} +Kept for backward compatibility: get_all_names(), get_all_contacts(), +parse_advert_payload(). """ -import json import logging import math import struct -import time -from pathlib import Path -from threading import Lock - -from app.config import config, runtime_config logger = logging.getLogger(__name__) -_cache_lock = Lock() -_cache: dict = {} # {public_key: {name, first_seen, last_seen, source}} -_cache_loaded = False -_adverts_offset = 0 # File offset for incremental advert scanning +_TYPE_LABELS = {0: 'COM', 1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'} -def _get_cache_path() -> Path: - device_name = runtime_config.get_device_name() - return Path(config.MC_CONFIG_DIR) / f"{device_name}.contacts_cache.jsonl" - - -def _get_adverts_path() -> Path: - device_name = runtime_config.get_device_name() - return Path(config.MC_CONFIG_DIR) / f"{device_name}.adverts.jsonl" - - -def load_cache() -> dict: - """Load cache from disk into memory. Returns copy of cache dict.""" - global _cache, _cache_loaded - - with _cache_lock: - if _cache_loaded: - return _cache.copy() - - cache_path = _get_cache_path() - _cache = {} - - if not cache_path.exists(): - _cache_loaded = True - logger.info("Contacts cache file does not exist yet") - return _cache.copy() - - try: - with open(cache_path, 'r', encoding='utf-8') as f: - for line in f: - line = line.strip() - if not line: - continue - try: - entry = json.loads(line) - pk = entry.get('public_key', '').lower() - if pk: - # Migrate old "CLI" label to "COM" - if entry.get('type_label') == 'CLI': - entry['type_label'] = 'COM' - _cache[pk] = entry - except json.JSONDecodeError: - continue - - _cache_loaded = True - logger.info(f"Loaded contacts cache: {len(_cache)} entries") - except Exception as e: - logger.error(f"Failed to load contacts cache: {e}") - _cache_loaded = True - - return _cache.copy() - - -def save_cache() -> bool: - """Write full cache to disk (atomic write).""" - with _cache_lock: - cache_path = _get_cache_path() - try: - cache_path.parent.mkdir(parents=True, exist_ok=True) - temp_file = cache_path.with_suffix('.tmp') - with open(temp_file, 'w', encoding='utf-8') as f: - for entry in _cache.values(): - f.write(json.dumps(entry, ensure_ascii=False) + '\n') - temp_file.replace(cache_path) - logger.debug(f"Saved contacts cache: {len(_cache)} entries") - return True - except Exception as e: - logger.error(f"Failed to save contacts cache: {e}") - return False - - -def upsert_contact(public_key: str, name: str, source: str = "advert", - lat: float = 0.0, lon: float = 0.0, type_label: str = "") -> bool: - """Add or update a contact in the cache. Returns True if cache was modified.""" - pk = public_key.lower() - now = int(time.time()) - - with _cache_lock: - existing = _cache.get(pk) - if existing: - changed = False - if name and name != existing.get('name'): - existing['name'] = name - changed = True - # Update lat/lon if new values are non-zero - if lat != 0.0 or lon != 0.0: - if lat != existing.get('lat') or lon != existing.get('lon'): - existing['lat'] = lat - existing['lon'] = lon - changed = True - # Update type_label if provided and not already set - if type_label and type_label != existing.get('type_label'): - existing['type_label'] = type_label - changed = True - existing['last_seen'] = now - return changed - else: - if not name: - return False - entry = { - 'public_key': pk, - 'name': name, - 'first_seen': now, - 'last_seen': now, - 'source': source, - } - if lat != 0.0 or lon != 0.0: - entry['lat'] = lat - entry['lon'] = lon - if type_label: - entry['type_label'] = type_label - _cache[pk] = entry - return True +def _get_db(): + """Get database instance (deferred import to avoid circular imports).""" + from app.main import db + return db def get_all_contacts() -> list: - """Get all cached contacts as a list of dicts (shallow copies).""" - with _cache_lock: - return [entry.copy() for entry in _cache.values()] + """Get all known contacts from DB.""" + try: + db = _get_db() + if db: + contacts = db.get_contacts() + return [{ + 'public_key': c.get('public_key', ''), + 'name': c.get('name', ''), + 'first_seen': c.get('first_seen', ''), + 'last_seen': c.get('last_seen', ''), + 'source': c.get('source', ''), + 'lat': c.get('adv_lat', 0.0) or 0.0, + 'lon': c.get('adv_lon', 0.0) or 0.0, + 'type_label': _TYPE_LABELS.get(c.get('type', 1), 'UNKNOWN'), + } for c in contacts] + except Exception as e: + logger.error(f"Failed to get contacts: {e}") + return [] def get_all_names() -> list: """Get all unique non-empty contact names sorted alphabetically.""" - with _cache_lock: - return sorted(set( - entry['name'] for entry in _cache.values() - if entry.get('name') - )) + try: + db = _get_db() + if db: + contacts = db.get_contacts() + return sorted(set(c.get('name', '') for c in contacts if c.get('name'))) + except Exception as e: + logger.error(f"Failed to get contact names: {e}") + return [] def parse_advert_payload(pkt_payload_hex: str): @@ -208,69 +108,3 @@ def parse_advert_payload(pkt_payload_hex: str): return public_key, node_name if node_name else None, lat, lon except Exception: return None, None, 0.0, 0.0 - - -def scan_new_adverts() -> int: - """ - Scan .adverts.jsonl for new entries since last scan. - Returns number of new/updated contacts. - """ - global _adverts_offset - - adverts_path = _get_adverts_path() - if not adverts_path.exists(): - return 0 - - updated = 0 - try: - with open(adverts_path, 'r', encoding='utf-8') as f: - f.seek(_adverts_offset) - for line in f: - line = line.strip() - if not line: - continue - try: - advert = json.loads(line) - pkt_payload = advert.get('pkt_payload', '') - if not pkt_payload: - continue - pk, name, lat, lon = parse_advert_payload(pkt_payload) - if pk and name: - if upsert_contact(pk, name, source="advert", lat=lat, lon=lon): - updated += 1 - except json.JSONDecodeError: - continue - _adverts_offset = f.tell() - except Exception as e: - logger.error(f"Failed to scan adverts: {e}") - - if updated > 0: - save_cache() - logger.info(f"Contacts cache updated: {updated} new/changed entries") - - return updated - - -_TYPE_LABELS = {1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'} - - -def initialize_from_device(contacts_detailed: dict): - """ - Seed cache from /api/contacts/detailed response dict. - Called once at startup if cache file doesn't exist. - - Args: - contacts_detailed: dict of {public_key: {adv_name, type, adv_lat, adv_lon, ...}} from meshcli - """ - added = 0 - for pk, details in contacts_detailed.items(): - name = details.get('adv_name', '') - lat = details.get('adv_lat', 0.0) or 0.0 - lon = details.get('adv_lon', 0.0) or 0.0 - type_label = _TYPE_LABELS.get(details.get('type'), '') - if upsert_contact(pk, name, source="device", lat=lat, lon=lon, type_label=type_label): - added += 1 - - if added > 0: - save_cache() - logger.info(f"Initialized contacts cache from device: {added} contacts") diff --git a/app/main.py b/app/main.py index 0dfef8b..210f6d8 100644 --- a/app/main.py +++ b/app/main.py @@ -196,6 +196,25 @@ def _migrate_db_to_pubkey(db, public_key: str): logger.info(f"Database renamed: {current.name} -> {target.name}") +def _cleanup_legacy_jsonl(data_dir: Path): + """Remove stale JSONL files whose data now lives in the database.""" + patterns = [ + '*.contacts_cache.jsonl', + '*.adverts.jsonl', + '*.acks.jsonl', + '*.echoes.jsonl', + '*.path.jsonl', + '*_dm_sent.jsonl', + ] + for pattern in patterns: + for f in data_dir.glob(pattern): + try: + f.unlink() + logger.info(f"Removed legacy file: {f.name}") + except OSError as e: + logger.warning(f"Could not remove {f.name}: {e}") + + def create_app(): """Create and configure Flask application""" global db, device_manager @@ -307,6 +326,9 @@ def create_app(): except Exception as e: logger.error(f"v1 migration failed: {e}") + # Clean up stale JSONL files (data is now in DB) + _cleanup_legacy_jsonl(Path(config.MC_CONFIG_DIR)) + return logger.warning("Timeout waiting for device connection")