refactor: eliminate JSONL companion files, delegate to DB

Remove contacts_cache.jsonl and adverts.jsonl file I/O — all contact
data is already in the SQLite contacts/advertisements tables. Clean up
stale JSONL files (acks, echoes, path, dm_sent) at startup.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-03-24 08:16:41 +01:00
parent 2a9f90c01d
commit 5ccd882c5a
2 changed files with 58 additions and 202 deletions

View File

@@ -1,159 +1,59 @@
""" """
Contacts Cache - Persistent storage of all known node names + public keys. Contacts Cache - DB-backed contact name/key lookup.
Stores every node name ever seen (from device contacts and adverts), All contact data is stored in the SQLite contacts table.
so @mention autocomplete works even for removed contacts. JSONL files are no longer used.
File format: JSONL ({device_name}.contacts_cache.jsonl) Kept for backward compatibility: get_all_names(), get_all_contacts(),
Each line: {"public_key": "...", "name": "...", "first_seen": ts, "last_seen": ts, parse_advert_payload().
"source": "advert"|"device", "lat": float, "lon": float, "type_label": "COM"|"REP"|...}
""" """
import json
import logging import logging
import math import math
import struct import struct
import time
from pathlib import Path
from threading import Lock
from app.config import config, runtime_config
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_cache_lock = Lock() _TYPE_LABELS = {0: 'COM', 1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'}
_cache: dict = {} # {public_key: {name, first_seen, last_seen, source}}
_cache_loaded = False
_adverts_offset = 0 # File offset for incremental advert scanning
def _get_cache_path() -> Path: def _get_db():
device_name = runtime_config.get_device_name() """Get database instance (deferred import to avoid circular imports)."""
return Path(config.MC_CONFIG_DIR) / f"{device_name}.contacts_cache.jsonl" from app.main import db
return db
def _get_adverts_path() -> Path:
    """Path of the per-device adverts JSONL file ({device_name}.adverts.jsonl)."""
    filename = f"{runtime_config.get_device_name()}.adverts.jsonl"
    return Path(config.MC_CONFIG_DIR, filename)
def load_cache() -> dict:
    """Load cache from disk into memory. Returns copy of cache dict.

    Idempotent: after the first successful (or failed) attempt the in-memory
    cache is served without touching the filesystem again. Malformed JSONL
    lines and entries without a public key are silently skipped.
    """
    global _cache, _cache_loaded
    with _cache_lock:
        if _cache_loaded:
            return _cache.copy()
        _cache = {}
        path = _get_cache_path()
        if not path.exists():
            _cache_loaded = True
            logger.info("Contacts cache file does not exist yet")
            return _cache.copy()
        try:
            with open(path, 'r', encoding='utf-8') as fh:
                for raw in fh:
                    raw = raw.strip()
                    if not raw:
                        continue
                    try:
                        record = json.loads(raw)
                    except json.JSONDecodeError:
                        continue
                    key = record.get('public_key', '').lower()
                    if not key:
                        continue
                    # Migrate old "CLI" label to "COM"
                    if record.get('type_label') == 'CLI':
                        record['type_label'] = 'COM'
                    _cache[key] = record
            _cache_loaded = True
            logger.info(f"Loaded contacts cache: {len(_cache)} entries")
        except Exception as e:
            logger.error(f"Failed to load contacts cache: {e}")
            _cache_loaded = True
        return _cache.copy()
def save_cache() -> bool:
    """Write full cache to disk (atomic write).

    Serializes every cached entry as one JSON line, writes to a temp file and
    renames it over the real file so readers never see a partial write.
    Returns True on success, False on any I/O/serialization failure.
    """
    with _cache_lock:
        cache_path = _get_cache_path()
        try:
            cache_path.parent.mkdir(parents=True, exist_ok=True)
            tmp = cache_path.with_suffix('.tmp')
            lines = [json.dumps(entry, ensure_ascii=False) + '\n'
                     for entry in _cache.values()]
            with open(tmp, 'w', encoding='utf-8') as fh:
                fh.writelines(lines)
            tmp.replace(cache_path)
            logger.debug(f"Saved contacts cache: {len(_cache)} entries")
            return True
        except Exception as e:
            logger.error(f"Failed to save contacts cache: {e}")
            return False
def upsert_contact(public_key: str, name: str, source: str = "advert",
                   lat: float = 0.0, lon: float = 0.0, type_label: str = "") -> bool:
    """Add or update a contact in the cache. Returns True if cache was modified.

    New keys require a non-empty name. For existing keys, name/position/type
    are updated only when they actually differ; last_seen is always bumped,
    but on its own does not count as a modification (keeps disk writes rare).
    """
    pk = public_key.lower()
    now = int(time.time())
    with _cache_lock:
        entry = _cache.get(pk)
        if not entry:
            # Unknown key: refuse to create a nameless record.
            if not name:
                return False
            fresh = {
                'public_key': pk,
                'name': name,
                'first_seen': now,
                'last_seen': now,
                'source': source,
            }
            # Only persist coordinates when at least one is non-zero.
            if lat != 0.0 or lon != 0.0:
                fresh['lat'] = lat
                fresh['lon'] = lon
            if type_label:
                fresh['type_label'] = type_label
            _cache[pk] = fresh
            return True

        changed = False
        if name and name != entry.get('name'):
            entry['name'] = name
            changed = True
        # Update lat/lon if new values are non-zero and differ from stored ones.
        if (lat != 0.0 or lon != 0.0) and (lat != entry.get('lat') or lon != entry.get('lon')):
            entry['lat'] = lat
            entry['lon'] = lon
            changed = True
        if type_label and type_label != entry.get('type_label'):
            entry['type_label'] = type_label
            changed = True
        entry['last_seen'] = now
        return changed
def get_all_contacts() -> list:
    """Get all known contacts from DB."""
    try:
        database = _get_db()
        if database:
            rows = []
            for c in database.get_contacts():
                rows.append({
                    'public_key': c.get('public_key', ''),
                    'name': c.get('name', ''),
                    'first_seen': c.get('first_seen', ''),
                    'last_seen': c.get('last_seen', ''),
                    'source': c.get('source', ''),
                    # DB stores position as adv_lat/adv_lon; normalize None to 0.0.
                    'lat': c.get('adv_lat', 0.0) or 0.0,
                    'lon': c.get('adv_lon', 0.0) or 0.0,
                    'type_label': _TYPE_LABELS.get(c.get('type', 1), 'UNKNOWN'),
                })
            return rows
    except Exception as e:
        logger.error(f"Failed to get contacts: {e}")
    return []
def get_all_names() -> list:
    """Get all unique non-empty contact names sorted alphabetically."""
    try:
        database = _get_db()
        if database:
            names = {c.get('name', '') for c in database.get_contacts() if c.get('name')}
            return sorted(names)
    except Exception as e:
        logger.error(f"Failed to get contact names: {e}")
    return []
def parse_advert_payload(pkt_payload_hex: str): def parse_advert_payload(pkt_payload_hex: str):
@@ -208,69 +108,3 @@ def parse_advert_payload(pkt_payload_hex: str):
return public_key, node_name if node_name else None, lat, lon return public_key, node_name if node_name else None, lat, lon
except Exception: except Exception:
return None, None, 0.0, 0.0 return None, None, 0.0, 0.0
def scan_new_adverts() -> int:
    """
    Scan .adverts.jsonl for new entries since last scan.
    Returns number of new/updated contacts.

    Resumes from the byte offset remembered in _adverts_offset, so each call
    only parses lines appended since the previous call. Persists the cache
    once at the end if anything changed.
    """
    global _adverts_offset
    path = _get_adverts_path()
    if not path.exists():
        return 0
    changed = 0
    try:
        with open(path, 'r', encoding='utf-8') as fh:
            fh.seek(_adverts_offset)
            for raw in fh:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    record = json.loads(raw)
                except json.JSONDecodeError:
                    continue
                payload = record.get('pkt_payload', '')
                if not payload:
                    continue
                pk, name, lat, lon = parse_advert_payload(payload)
                if pk and name and upsert_contact(pk, name, source="advert", lat=lat, lon=lon):
                    changed += 1
            # Remember where we stopped for the next incremental scan.
            _adverts_offset = fh.tell()
    except Exception as e:
        logger.error(f"Failed to scan adverts: {e}")
    if changed:
        save_cache()
        logger.info(f"Contacts cache updated: {changed} new/changed entries")
    return changed
_TYPE_LABELS = {1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'}
def initialize_from_device(contacts_detailed: dict):
    """
    Seed cache from /api/contacts/detailed response dict.
    Called once at startup if cache file doesn't exist.

    Args:
        contacts_detailed: dict of {public_key: {adv_name, type, adv_lat, adv_lon, ...}} from meshcli
    """
    added = 0
    for pk, info in contacts_detailed.items():
        # None positions from the device are normalized to 0.0 (= "unset").
        stored = upsert_contact(
            pk,
            info.get('adv_name', ''),
            source="device",
            lat=info.get('adv_lat', 0.0) or 0.0,
            lon=info.get('adv_lon', 0.0) or 0.0,
            type_label=_TYPE_LABELS.get(info.get('type'), ''),
        )
        if stored:
            added += 1
    if added:
        save_cache()
        logger.info(f"Initialized contacts cache from device: {added} contacts")

View File

@@ -196,6 +196,25 @@ def _migrate_db_to_pubkey(db, public_key: str):
logger.info(f"Database renamed: {current.name} -> {target.name}") logger.info(f"Database renamed: {current.name} -> {target.name}")
def _cleanup_legacy_jsonl(data_dir: Path):
    """Remove stale JSONL files whose data now lives in the database."""
    legacy_patterns = (
        '*.contacts_cache.jsonl',
        '*.adverts.jsonl',
        '*.acks.jsonl',
        '*.echoes.jsonl',
        '*.path.jsonl',
        '*_dm_sent.jsonl',
    )
    for pattern in legacy_patterns:
        for stale in data_dir.glob(pattern):
            try:
                stale.unlink()
            except OSError as e:
                # Best effort: a locked/permission-denied file is not fatal.
                logger.warning(f"Could not remove {stale.name}: {e}")
            else:
                logger.info(f"Removed legacy file: {stale.name}")
def create_app(): def create_app():
"""Create and configure Flask application""" """Create and configure Flask application"""
global db, device_manager global db, device_manager
@@ -307,6 +326,9 @@ def create_app():
except Exception as e: except Exception as e:
logger.error(f"v1 migration failed: {e}") logger.error(f"v1 migration failed: {e}")
# Clean up stale JSONL files (data is now in DB)
_cleanup_legacy_jsonl(Path(config.MC_CONFIG_DIR))
return return
logger.warning("Timeout waiting for device connection") logger.warning("Timeout waiting for device connection")