mirror of
https://github.com/MarekWo/mc-webui.git
synced 2026-03-28 17:42:45 +01:00
Compare commits
6 Commits
92a88cae22
...
d54d8f58dd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d54d8f58dd | ||
|
|
2c73e20775 | ||
|
|
f9bcbabb86 | ||
|
|
5ccd882c5a | ||
|
|
2a9f90c01d | ||
|
|
acfa5d3550 |
@@ -26,7 +26,7 @@ class Config:
|
||||
MC_ARCHIVE_RETENTION_DAYS = int(os.getenv('MC_ARCHIVE_RETENTION_DAYS', '7'))
|
||||
|
||||
# v2: Database
|
||||
MC_DB_PATH = os.getenv('MC_DB_PATH', '') # empty = auto: {MC_CONFIG_DIR}/{device_name}.db
|
||||
MC_DB_PATH = os.getenv('MC_DB_PATH', '') # empty = auto: {MC_CONFIG_DIR}/mc_{pubkey_prefix}.db
|
||||
|
||||
# v2: TCP connection (alternative to serial, e.g. meshcore-proxy)
|
||||
MC_TCP_HOST = os.getenv('MC_TCP_HOST', '') # empty = use serial
|
||||
|
||||
@@ -1,159 +1,59 @@
|
||||
"""
|
||||
Contacts Cache - Persistent storage of all known node names + public keys.
|
||||
Contacts Cache - DB-backed contact name/key lookup.
|
||||
|
||||
Stores every node name ever seen (from device contacts and adverts),
|
||||
so @mention autocomplete works even for removed contacts.
|
||||
All contact data is stored in the SQLite contacts table.
|
||||
JSONL files are no longer used.
|
||||
|
||||
File format: JSONL ({device_name}.contacts_cache.jsonl)
|
||||
Each line: {"public_key": "...", "name": "...", "first_seen": ts, "last_seen": ts,
|
||||
"source": "advert"|"device", "lat": float, "lon": float, "type_label": "COM"|"REP"|...}
|
||||
Kept for backward compatibility: get_all_names(), get_all_contacts(),
|
||||
parse_advert_payload().
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
import struct
|
||||
import time
|
||||
from pathlib import Path
|
||||
from threading import Lock
|
||||
|
||||
from app.config import config, runtime_config
|
||||
from flask import current_app
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_cache_lock = Lock()
|
||||
_cache: dict = {} # {public_key: {name, first_seen, last_seen, source}}
|
||||
_cache_loaded = False
|
||||
_adverts_offset = 0 # File offset for incremental advert scanning
|
||||
_TYPE_LABELS = {0: 'COM', 1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'}
|
||||
|
||||
|
||||
def _get_cache_path() -> Path:
|
||||
device_name = runtime_config.get_device_name()
|
||||
return Path(config.MC_CONFIG_DIR) / f"{device_name}.contacts_cache.jsonl"
|
||||
|
||||
|
||||
def _get_adverts_path() -> Path:
|
||||
device_name = runtime_config.get_device_name()
|
||||
return Path(config.MC_CONFIG_DIR) / f"{device_name}.adverts.jsonl"
|
||||
|
||||
|
||||
def load_cache() -> dict:
|
||||
"""Load cache from disk into memory. Returns copy of cache dict."""
|
||||
global _cache, _cache_loaded
|
||||
|
||||
with _cache_lock:
|
||||
if _cache_loaded:
|
||||
return _cache.copy()
|
||||
|
||||
cache_path = _get_cache_path()
|
||||
_cache = {}
|
||||
|
||||
if not cache_path.exists():
|
||||
_cache_loaded = True
|
||||
logger.info("Contacts cache file does not exist yet")
|
||||
return _cache.copy()
|
||||
|
||||
try:
|
||||
with open(cache_path, 'r', encoding='utf-8') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
entry = json.loads(line)
|
||||
pk = entry.get('public_key', '').lower()
|
||||
if pk:
|
||||
# Migrate old "CLI" label to "COM"
|
||||
if entry.get('type_label') == 'CLI':
|
||||
entry['type_label'] = 'COM'
|
||||
_cache[pk] = entry
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
_cache_loaded = True
|
||||
logger.info(f"Loaded contacts cache: {len(_cache)} entries")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load contacts cache: {e}")
|
||||
_cache_loaded = True
|
||||
|
||||
return _cache.copy()
|
||||
|
||||
|
||||
def save_cache() -> bool:
|
||||
"""Write full cache to disk (atomic write)."""
|
||||
with _cache_lock:
|
||||
cache_path = _get_cache_path()
|
||||
try:
|
||||
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
temp_file = cache_path.with_suffix('.tmp')
|
||||
with open(temp_file, 'w', encoding='utf-8') as f:
|
||||
for entry in _cache.values():
|
||||
f.write(json.dumps(entry, ensure_ascii=False) + '\n')
|
||||
temp_file.replace(cache_path)
|
||||
logger.debug(f"Saved contacts cache: {len(_cache)} entries")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save contacts cache: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def upsert_contact(public_key: str, name: str, source: str = "advert",
|
||||
lat: float = 0.0, lon: float = 0.0, type_label: str = "") -> bool:
|
||||
"""Add or update a contact in the cache. Returns True if cache was modified."""
|
||||
pk = public_key.lower()
|
||||
now = int(time.time())
|
||||
|
||||
with _cache_lock:
|
||||
existing = _cache.get(pk)
|
||||
if existing:
|
||||
changed = False
|
||||
if name and name != existing.get('name'):
|
||||
existing['name'] = name
|
||||
changed = True
|
||||
# Update lat/lon if new values are non-zero
|
||||
if lat != 0.0 or lon != 0.0:
|
||||
if lat != existing.get('lat') or lon != existing.get('lon'):
|
||||
existing['lat'] = lat
|
||||
existing['lon'] = lon
|
||||
changed = True
|
||||
# Update type_label if provided and not already set
|
||||
if type_label and type_label != existing.get('type_label'):
|
||||
existing['type_label'] = type_label
|
||||
changed = True
|
||||
existing['last_seen'] = now
|
||||
return changed
|
||||
else:
|
||||
if not name:
|
||||
return False
|
||||
entry = {
|
||||
'public_key': pk,
|
||||
'name': name,
|
||||
'first_seen': now,
|
||||
'last_seen': now,
|
||||
'source': source,
|
||||
}
|
||||
if lat != 0.0 or lon != 0.0:
|
||||
entry['lat'] = lat
|
||||
entry['lon'] = lon
|
||||
if type_label:
|
||||
entry['type_label'] = type_label
|
||||
_cache[pk] = entry
|
||||
return True
|
||||
def _get_db():
|
||||
"""Get database instance from Flask app context."""
|
||||
return getattr(current_app, 'db', None)
|
||||
|
||||
|
||||
def get_all_contacts() -> list:
|
||||
"""Get all cached contacts as a list of dicts (shallow copies)."""
|
||||
with _cache_lock:
|
||||
return [entry.copy() for entry in _cache.values()]
|
||||
"""Get all known contacts from DB."""
|
||||
try:
|
||||
db = _get_db()
|
||||
if db:
|
||||
contacts = db.get_contacts()
|
||||
return [{
|
||||
'public_key': c.get('public_key', ''),
|
||||
'name': c.get('name', ''),
|
||||
'first_seen': c.get('first_seen', ''),
|
||||
'last_seen': c.get('last_seen', ''),
|
||||
'source': c.get('source', ''),
|
||||
'lat': c.get('adv_lat', 0.0) or 0.0,
|
||||
'lon': c.get('adv_lon', 0.0) or 0.0,
|
||||
'type_label': _TYPE_LABELS.get(c.get('type', 1), 'UNKNOWN'),
|
||||
} for c in contacts]
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get contacts: {e}")
|
||||
return []
|
||||
|
||||
|
||||
def get_all_names() -> list:
|
||||
"""Get all unique non-empty contact names sorted alphabetically."""
|
||||
with _cache_lock:
|
||||
return sorted(set(
|
||||
entry['name'] for entry in _cache.values()
|
||||
if entry.get('name')
|
||||
))
|
||||
try:
|
||||
db = _get_db()
|
||||
if db:
|
||||
contacts = db.get_contacts()
|
||||
return sorted(set(c.get('name', '') for c in contacts if c.get('name')))
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get contact names: {e}")
|
||||
return []
|
||||
|
||||
|
||||
def parse_advert_payload(pkt_payload_hex: str):
|
||||
@@ -208,69 +108,3 @@ def parse_advert_payload(pkt_payload_hex: str):
|
||||
return public_key, node_name if node_name else None, lat, lon
|
||||
except Exception:
|
||||
return None, None, 0.0, 0.0
|
||||
|
||||
|
||||
def scan_new_adverts() -> int:
|
||||
"""
|
||||
Scan .adverts.jsonl for new entries since last scan.
|
||||
Returns number of new/updated contacts.
|
||||
"""
|
||||
global _adverts_offset
|
||||
|
||||
adverts_path = _get_adverts_path()
|
||||
if not adverts_path.exists():
|
||||
return 0
|
||||
|
||||
updated = 0
|
||||
try:
|
||||
with open(adverts_path, 'r', encoding='utf-8') as f:
|
||||
f.seek(_adverts_offset)
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
advert = json.loads(line)
|
||||
pkt_payload = advert.get('pkt_payload', '')
|
||||
if not pkt_payload:
|
||||
continue
|
||||
pk, name, lat, lon = parse_advert_payload(pkt_payload)
|
||||
if pk and name:
|
||||
if upsert_contact(pk, name, source="advert", lat=lat, lon=lon):
|
||||
updated += 1
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
_adverts_offset = f.tell()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to scan adverts: {e}")
|
||||
|
||||
if updated > 0:
|
||||
save_cache()
|
||||
logger.info(f"Contacts cache updated: {updated} new/changed entries")
|
||||
|
||||
return updated
|
||||
|
||||
|
||||
_TYPE_LABELS = {1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'}
|
||||
|
||||
|
||||
def initialize_from_device(contacts_detailed: dict):
|
||||
"""
|
||||
Seed cache from /api/contacts/detailed response dict.
|
||||
Called once at startup if cache file doesn't exist.
|
||||
|
||||
Args:
|
||||
contacts_detailed: dict of {public_key: {adv_name, type, adv_lat, adv_lon, ...}} from meshcli
|
||||
"""
|
||||
added = 0
|
||||
for pk, details in contacts_detailed.items():
|
||||
name = details.get('adv_name', '')
|
||||
lat = details.get('adv_lat', 0.0) or 0.0
|
||||
lon = details.get('adv_lon', 0.0) or 0.0
|
||||
type_label = _TYPE_LABELS.get(details.get('type'), '')
|
||||
if upsert_contact(pk, name, source="device", lat=lat, lon=lon, type_label=type_label):
|
||||
added += 1
|
||||
|
||||
if added > 0:
|
||||
save_cache()
|
||||
logger.info(f"Initialized contacts cache from device: {added} contacts")
|
||||
|
||||
@@ -80,6 +80,12 @@ class Database:
|
||||
row = conn.execute("SELECT * FROM device WHERE id = 1").fetchone()
|
||||
return dict(row) if row else None
|
||||
|
||||
def get_public_key(self) -> Optional[str]:
|
||||
"""Get device public key (used for DB filename resolution)."""
|
||||
with self._connect() as conn:
|
||||
row = conn.execute("SELECT public_key FROM device WHERE id = 1").fetchone()
|
||||
return row['public_key'] if row and row['public_key'] else None
|
||||
|
||||
# ================================================================
|
||||
# Contacts
|
||||
# ================================================================
|
||||
@@ -963,6 +969,14 @@ class Database:
|
||||
(key, 1 if muted else 0)
|
||||
)
|
||||
|
||||
def get_muted_channels(self) -> List[int]:
|
||||
"""Get list of muted channel indices."""
|
||||
with self._connect() as conn:
|
||||
rows = conn.execute(
|
||||
"SELECT key FROM read_status WHERE is_muted = 1 AND key LIKE 'chan_%'"
|
||||
).fetchall()
|
||||
return [int(r['key'][5:]) for r in rows]
|
||||
|
||||
# ================================================================
|
||||
# Full-Text Search
|
||||
# ================================================================
|
||||
@@ -1050,7 +1064,8 @@ class Database:
|
||||
backup_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
date_str = datetime.now().strftime('%Y-%m-%d')
|
||||
backup_path = backup_dir / f"mc-webui.{date_str}.db"
|
||||
prefix = self.db_path.stem # e.g. "mc_9cebbd27"
|
||||
backup_path = backup_dir / f"{prefix}.{date_str}.db"
|
||||
|
||||
source = sqlite3.connect(str(self.db_path))
|
||||
dest = sqlite3.connect(str(backup_path))
|
||||
@@ -1070,7 +1085,7 @@ class Database:
|
||||
return []
|
||||
|
||||
backups = []
|
||||
for f in sorted(backup_dir.glob("mc-webui.*.db"), reverse=True):
|
||||
for f in sorted(backup_dir.glob("*.db"), reverse=True):
|
||||
backups.append({
|
||||
'filename': f.name,
|
||||
'path': str(f),
|
||||
@@ -1087,7 +1102,7 @@ class Database:
|
||||
|
||||
cutoff = datetime.now() - timedelta(days=retention_days)
|
||||
removed = 0
|
||||
for f in backup_dir.glob("mc-webui.*.db"):
|
||||
for f in backup_dir.glob("*.db"):
|
||||
if datetime.fromtimestamp(f.stat().st_mtime) < cutoff:
|
||||
f.unlink()
|
||||
removed += 1
|
||||
|
||||
@@ -2335,7 +2335,7 @@ class DeviceManager:
|
||||
from meshcore.events import EventType
|
||||
types = 0xFF # all types
|
||||
if type_filter:
|
||||
type_map = {'cli': 1, 'rep': 2, 'room': 3, 'sensor': 4, 'sens': 4}
|
||||
type_map = {'com': 1, 'rep': 2, 'room': 3, 'sensor': 4, 'sens': 4}
|
||||
t = type_map.get(type_filter.lower())
|
||||
if t:
|
||||
types = t
|
||||
|
||||
214
app/main.py
214
app/main.py
@@ -8,9 +8,11 @@ import json
|
||||
import logging
|
||||
import re
|
||||
import shlex
|
||||
import sqlite3
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from flask import Flask, request as flask_request
|
||||
from flask_socketio import SocketIO, emit
|
||||
from app.config import config, runtime_config
|
||||
@@ -52,21 +54,53 @@ db = None
|
||||
device_manager = None
|
||||
|
||||
|
||||
def _sanitize_db_name(name: str) -> str:
|
||||
"""Sanitize device name for use as database filename."""
|
||||
sanitized = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', name)
|
||||
sanitized = sanitized.strip('. ')
|
||||
return sanitized or 'device'
|
||||
def _pubkey_db_name(public_key: str) -> str:
|
||||
"""Return stable DB filename based on device public key prefix."""
|
||||
return f"mc_{public_key[:8].lower()}.db"
|
||||
|
||||
|
||||
def _read_pubkey_from_db(db_path: Path) -> Optional[str]:
|
||||
"""Probe an existing DB file for the device public key.
|
||||
|
||||
Uses a raw sqlite3 connection (not Database class) to avoid
|
||||
WAL creation side effects on a file that may be about to be renamed.
|
||||
"""
|
||||
try:
|
||||
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
|
||||
try:
|
||||
row = conn.execute("SELECT public_key FROM device WHERE id = 1").fetchone()
|
||||
if row and row[0]:
|
||||
return row[0]
|
||||
finally:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def _rename_db_files(src: Path, dst: Path) -> bool:
|
||||
"""Rename DB + WAL + SHM files. Returns True on success."""
|
||||
for suffix in ['', '-wal', '-shm']:
|
||||
s = Path(str(src) + suffix)
|
||||
d = Path(str(dst) + suffix)
|
||||
if s.exists():
|
||||
try:
|
||||
s.rename(d)
|
||||
except OSError as e:
|
||||
logger.error(f"Failed to rename {s.name} -> {d.name}: {e}")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _resolve_db_path() -> Path:
|
||||
"""Resolve database path, preferring existing device-named DB files.
|
||||
"""Resolve database path using public-key-based naming.
|
||||
|
||||
Priority:
|
||||
1. Explicit MC_DB_PATH that is NOT mc-webui.db -> use as-is
|
||||
2. Existing device-named .db file in config dir (most recently modified)
|
||||
3. Existing mc-webui.db (legacy, will be renamed on device connect)
|
||||
4. New mc-webui.db (will be renamed on device connect)
|
||||
1. Explicit MC_DB_PATH (not mc-webui.db) -> use as-is
|
||||
2. Existing mc_*.db file (new pubkey-based format) -> use most recent
|
||||
3. Existing *.db (old device-name format) -> probe for pubkey, rename if possible
|
||||
4. Existing mc-webui.db (legacy default) -> probe for pubkey, rename if possible
|
||||
5. New install -> create mc-webui.db (will be renamed on first device connect)
|
||||
"""
|
||||
if config.MC_DB_PATH:
|
||||
p = Path(config.MC_DB_PATH)
|
||||
@@ -76,35 +110,69 @@ def _resolve_db_path() -> Path:
|
||||
else:
|
||||
db_dir = Path(config.MC_CONFIG_DIR)
|
||||
|
||||
# Scan for existing device-named DBs (anything except mc-webui.db)
|
||||
# 1. Scan for new-format DBs (mc_????????.db)
|
||||
try:
|
||||
existing = sorted(
|
||||
[f for f in db_dir.glob('*.db')
|
||||
if f.name != 'mc-webui.db' and f.is_file()],
|
||||
new_format = sorted(
|
||||
[f for f in db_dir.glob('mc_????????.db') if f.is_file()],
|
||||
key=lambda f: f.stat().st_mtime,
|
||||
reverse=True
|
||||
)
|
||||
if existing:
|
||||
logger.info(f"Found device-named database: {existing[0].name}")
|
||||
return existing[0]
|
||||
if new_format:
|
||||
logger.info(f"Found database: {new_format[0].name}")
|
||||
return new_format[0]
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# Fallback: mc-webui.db (legacy or new install)
|
||||
return db_dir / 'mc-webui.db'
|
||||
# 2. Scan for old device-named DBs (anything except mc-webui.db and mc_*.db)
|
||||
try:
|
||||
old_format = sorted(
|
||||
[f for f in db_dir.glob('*.db')
|
||||
if f.name != 'mc-webui.db'
|
||||
and not re.match(r'^mc_[0-9a-f]{8}\.db$', f.name)
|
||||
and f.is_file()],
|
||||
key=lambda f: f.stat().st_mtime,
|
||||
reverse=True
|
||||
)
|
||||
if old_format:
|
||||
db_file = old_format[0]
|
||||
pubkey = _read_pubkey_from_db(db_file)
|
||||
if pubkey:
|
||||
target = db_dir / _pubkey_db_name(pubkey)
|
||||
if not target.exists() and _rename_db_files(db_file, target):
|
||||
logger.info(f"Migrated database: {db_file.name} -> {target.name}")
|
||||
return target
|
||||
elif target.exists():
|
||||
logger.info(f"Found database: {target.name}")
|
||||
return target
|
||||
# No pubkey in device table yet — use as-is, rename deferred
|
||||
logger.info(f"Found legacy database: {db_file.name} (rename deferred)")
|
||||
return db_file
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# 3. Check for mc-webui.db (legacy default)
|
||||
legacy = db_dir / 'mc-webui.db'
|
||||
if legacy.exists():
|
||||
pubkey = _read_pubkey_from_db(legacy)
|
||||
if pubkey:
|
||||
target = db_dir / _pubkey_db_name(pubkey)
|
||||
if not target.exists() and _rename_db_files(legacy, target):
|
||||
logger.info(f"Migrated database: {legacy.name} -> {target.name}")
|
||||
return target
|
||||
return legacy
|
||||
|
||||
# 4. New install — will be renamed on first device connect
|
||||
return legacy
|
||||
|
||||
|
||||
def _migrate_db_to_device_name(db, device_name: str):
|
||||
"""Rename DB file to match device name if needed.
|
||||
def _migrate_db_to_pubkey(db, public_key: str):
|
||||
"""Rename DB file to public-key-based name if needed.
|
||||
|
||||
Handles three cases:
|
||||
- Current DB already matches device name -> no-op
|
||||
- Target DB exists (different device was here before) -> switch to it
|
||||
- Target DB doesn't exist -> rename current DB files
|
||||
Called after device connects and provides its public key.
|
||||
"""
|
||||
safe_name = _sanitize_db_name(device_name)
|
||||
target_name = _pubkey_db_name(public_key)
|
||||
current = db.db_path
|
||||
target = current.parent / f"{safe_name}.db"
|
||||
target = current.parent / target_name
|
||||
|
||||
if current.resolve() == target.resolve():
|
||||
return
|
||||
@@ -123,19 +191,28 @@ def _migrate_db_to_device_name(db, device_name: str):
|
||||
except Exception as e:
|
||||
logger.warning(f"WAL checkpoint before rename: {e}")
|
||||
|
||||
# Rename DB + WAL + SHM files
|
||||
for suffix in ['', '-wal', '-shm']:
|
||||
src = Path(str(current) + suffix)
|
||||
dst = Path(str(target) + suffix)
|
||||
if src.exists():
|
||||
try:
|
||||
src.rename(dst)
|
||||
except OSError as e:
|
||||
logger.error(f"Failed to rename {src.name} -> {dst.name}: {e}")
|
||||
return # abort migration
|
||||
if _rename_db_files(current, target):
|
||||
db.db_path = target
|
||||
logger.info(f"Database renamed: {current.name} -> {target.name}")
|
||||
|
||||
db.db_path = target
|
||||
logger.info(f"Database renamed: {current.name} -> {target.name}")
|
||||
|
||||
def _cleanup_legacy_jsonl(data_dir: Path):
|
||||
"""Remove stale JSONL files whose data now lives in the database."""
|
||||
patterns = [
|
||||
'*.contacts_cache.jsonl',
|
||||
'*.adverts.jsonl',
|
||||
'*.acks.jsonl',
|
||||
'*.echoes.jsonl',
|
||||
'*.path.jsonl',
|
||||
'*_dm_sent.jsonl',
|
||||
]
|
||||
for pattern in patterns:
|
||||
for f in data_dir.glob(pattern):
|
||||
try:
|
||||
f.unlink()
|
||||
logger.info(f"Removed legacy file: {f.name}")
|
||||
except OSError as e:
|
||||
logger.warning(f"Could not remove {f.name}: {e}")
|
||||
|
||||
|
||||
def create_app():
|
||||
@@ -186,6 +263,27 @@ def create_app():
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not rename settings file: {e}")
|
||||
|
||||
# Migrate .read_status.json to DB (one-time)
|
||||
read_status_file = Path(config.MC_CONFIG_DIR) / '.read_status.json'
|
||||
if read_status_file.exists():
|
||||
try:
|
||||
import json as _json
|
||||
with open(read_status_file, 'r', encoding='utf-8') as f:
|
||||
rs_data = _json.load(f)
|
||||
migrated = 0
|
||||
for ch_idx, ts in rs_data.get('channels', {}).items():
|
||||
db.mark_read(f"chan_{ch_idx}", int(ts))
|
||||
migrated += 1
|
||||
for conv_id, ts in rs_data.get('dm', {}).items():
|
||||
db.mark_read(f"dm_{conv_id}", int(ts))
|
||||
migrated += 1
|
||||
for ch_idx in rs_data.get('muted_channels', []):
|
||||
db.set_channel_muted(int(ch_idx), True)
|
||||
read_status_file.rename(read_status_file.with_suffix('.json.bak'))
|
||||
logger.info(f"Migrated {migrated} read status entries to DB")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to migrate .read_status.json: {e}")
|
||||
|
||||
# v2: Initialize and start device manager
|
||||
device_manager = DeviceManager(config, db, socketio)
|
||||
app.device_manager = device_manager
|
||||
@@ -203,17 +301,20 @@ def create_app():
|
||||
runtime_config.set_device_name(dev_name, "device")
|
||||
logger.info(f"Device name resolved: {dev_name}")
|
||||
|
||||
# Rename DB to match device name (mc-webui.db -> {name}.db)
|
||||
_migrate_db_to_device_name(db, dev_name)
|
||||
|
||||
# Ensure device info is stored in current DB
|
||||
pubkey = ''
|
||||
if device_manager.self_info:
|
||||
pubkey = device_manager.self_info.get('public_key', '')
|
||||
db.set_device_info(
|
||||
public_key=device_manager.self_info.get('public_key', ''),
|
||||
public_key=pubkey,
|
||||
name=dev_name,
|
||||
self_info=json.dumps(device_manager.self_info, default=str)
|
||||
)
|
||||
|
||||
# Rename DB to pubkey-based name (e.g. mc-webui.db -> mc_9cebbd27.db)
|
||||
if pubkey:
|
||||
_migrate_db_to_pubkey(db, pubkey)
|
||||
|
||||
# Auto-migrate v1 data if .msgs file exists and DB is empty
|
||||
try:
|
||||
from app.migrate_v1 import should_migrate, migrate_v1_data
|
||||
@@ -225,6 +326,9 @@ def create_app():
|
||||
except Exception as e:
|
||||
logger.error(f"v1 migration failed: {e}")
|
||||
|
||||
# Clean up stale JSONL files (data is now in DB)
|
||||
_cleanup_legacy_jsonl(Path(config.MC_CONFIG_DIR))
|
||||
|
||||
return
|
||||
logger.warning("Timeout waiting for device connection")
|
||||
|
||||
@@ -1000,12 +1104,30 @@ def _execute_console_command(args: list) -> str:
|
||||
data = result['data']
|
||||
if not data:
|
||||
return "No nodes discovered"
|
||||
type_names = ["NONE", "COM", "REP", "ROOM", "SENS"]
|
||||
lines = [f"Discovered nodes ({len(data)}):"]
|
||||
for node in data:
|
||||
if isinstance(node, dict):
|
||||
name = node.get('adv_name', node.get('name', '?'))
|
||||
pk = node.get('public_key', '')[:12]
|
||||
lines.append(f" {name} ({pk}...)")
|
||||
pk = node.get('pubkey', '')
|
||||
# Try to resolve name from contacts
|
||||
name = None
|
||||
if pk and device_manager.mc:
|
||||
try:
|
||||
contact = device_manager.mc.get_contact_by_key_prefix(pk)
|
||||
if contact:
|
||||
name = contact.get('adv_name', '')
|
||||
except Exception:
|
||||
pass
|
||||
if name:
|
||||
label = f"{pk[:6]} {name}"
|
||||
else:
|
||||
label = pk[:16] or '?'
|
||||
nt = node.get('node_type', 0)
|
||||
type_str = type_names[nt] if nt < len(type_names) else f"t:{nt}"
|
||||
snr_in = node.get('SNR_in', 0)
|
||||
snr = node.get('SNR', 0)
|
||||
rssi = node.get('RSSI', 0)
|
||||
lines.append(f" {label:28} {type_str:>4} SNR: {snr_in:6.2f}->{snr:6.2f} RSSI: {rssi}")
|
||||
else:
|
||||
lines.append(f" {node}")
|
||||
return "\n".join(lines)
|
||||
|
||||
@@ -1,198 +1,101 @@
|
||||
"""
|
||||
Read Status Manager - Server-side storage for message read status
|
||||
Read Status Manager - DB-backed storage for message read status
|
||||
|
||||
Manages the last seen timestamps for channels and DM conversations,
|
||||
providing cross-device synchronization for unread message tracking.
|
||||
All data is stored in the read_status table of the SQLite database.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from threading import Lock
|
||||
from app.config import config
|
||||
from flask import current_app
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Thread-safe lock for file operations
|
||||
_status_lock = Lock()
|
||||
|
||||
# Path to read status file
|
||||
READ_STATUS_FILE = Path(config.MC_CONFIG_DIR) / '.read_status.json'
|
||||
|
||||
|
||||
def _get_default_status():
|
||||
"""Get default read status structure"""
|
||||
return {
|
||||
'channels': {}, # {"0": timestamp, "1": timestamp, ...}
|
||||
'dm': {}, # {"name_User1": timestamp, "pk_abc123": timestamp, ...}
|
||||
'muted_channels': [] # [2, 5, 7] - channel indices with muted notifications
|
||||
}
|
||||
def _get_db():
|
||||
"""Get database instance from Flask app context."""
|
||||
return getattr(current_app, 'db', None)
|
||||
|
||||
|
||||
def load_read_status():
|
||||
"""
|
||||
Load read status from disk.
|
||||
"""Load read status from database.
|
||||
|
||||
Returns:
|
||||
dict: Read status with 'channels' and 'dm' keys
|
||||
dict: Read status with 'channels', 'dm', and 'muted_channels' keys
|
||||
"""
|
||||
with _status_lock:
|
||||
try:
|
||||
if not READ_STATUS_FILE.exists():
|
||||
logger.info("Read status file does not exist, creating default")
|
||||
return _get_default_status()
|
||||
try:
|
||||
db = _get_db()
|
||||
rows = db.get_read_status()
|
||||
|
||||
with open(READ_STATUS_FILE, 'r', encoding='utf-8') as f:
|
||||
status = json.load(f)
|
||||
channels = {}
|
||||
dm = {}
|
||||
muted_channels = []
|
||||
|
||||
# Validate structure
|
||||
if not isinstance(status, dict):
|
||||
logger.warning("Invalid read status structure, resetting")
|
||||
return _get_default_status()
|
||||
for key, row in rows.items():
|
||||
if key.startswith('chan_'):
|
||||
chan_idx = key[5:] # "chan_0" -> "0"
|
||||
channels[chan_idx] = row['last_seen_ts']
|
||||
if row.get('is_muted'):
|
||||
try:
|
||||
muted_channels.append(int(chan_idx))
|
||||
except ValueError:
|
||||
pass
|
||||
elif key.startswith('dm_'):
|
||||
conv_id = key[3:] # "dm_name_User1" -> "name_User1"
|
||||
dm[conv_id] = row['last_seen_ts']
|
||||
|
||||
# Ensure all keys exist
|
||||
if 'channels' not in status:
|
||||
status['channels'] = {}
|
||||
if 'dm' not in status:
|
||||
status['dm'] = {}
|
||||
if 'muted_channels' not in status:
|
||||
status['muted_channels'] = []
|
||||
return {
|
||||
'channels': channels,
|
||||
'dm': dm,
|
||||
'muted_channels': muted_channels,
|
||||
}
|
||||
|
||||
logger.debug(f"Loaded read status: {len(status['channels'])} channels, {len(status['dm'])} DM conversations")
|
||||
return status
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Failed to parse read status file: {e}")
|
||||
return _get_default_status()
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading read status: {e}")
|
||||
return _get_default_status()
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading read status: {e}")
|
||||
return {'channels': {}, 'dm': {}, 'muted_channels': []}
|
||||
|
||||
|
||||
def save_read_status(status):
|
||||
"""
|
||||
Save read status to disk.
|
||||
|
||||
Args:
|
||||
status (dict): Read status with 'channels' and 'dm' keys
|
||||
|
||||
Returns:
|
||||
bool: True if successful, False otherwise
|
||||
"""
|
||||
with _status_lock:
|
||||
try:
|
||||
# Ensure directory exists
|
||||
READ_STATUS_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Write atomically (write to temp file, then rename)
|
||||
temp_file = READ_STATUS_FILE.with_suffix('.tmp')
|
||||
with open(temp_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(status, f, indent=2)
|
||||
|
||||
# Atomic rename
|
||||
temp_file.replace(READ_STATUS_FILE)
|
||||
|
||||
logger.debug(f"Saved read status: {len(status['channels'])} channels, {len(status['dm'])} DM conversations")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving read status: {e}")
|
||||
return False
|
||||
"""No-op — data is written per-operation via mark_* functions."""
|
||||
return True
|
||||
|
||||
|
||||
def mark_channel_read(channel_idx, timestamp):
|
||||
"""
|
||||
Mark a channel as read up to a specific timestamp.
|
||||
|
||||
Args:
|
||||
channel_idx (int or str): Channel index (will be converted to string)
|
||||
timestamp (int or float): Unix timestamp of last read message
|
||||
|
||||
Returns:
|
||||
bool: True if successful, False otherwise
|
||||
"""
|
||||
"""Mark a channel as read up to a specific timestamp."""
|
||||
try:
|
||||
# Load current status
|
||||
status = load_read_status()
|
||||
|
||||
# Update channel timestamp (ensure key is string for JSON compatibility)
|
||||
channel_key = str(channel_idx)
|
||||
status['channels'][channel_key] = int(timestamp)
|
||||
|
||||
# Save updated status
|
||||
success = save_read_status(status)
|
||||
|
||||
if success:
|
||||
logger.debug(f"Marked channel {channel_idx} as read at timestamp {timestamp}")
|
||||
|
||||
return success
|
||||
|
||||
db = _get_db()
|
||||
db.mark_read(f"chan_{channel_idx}", int(timestamp))
|
||||
logger.debug(f"Marked channel {channel_idx} as read at timestamp {timestamp}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error marking channel {channel_idx} as read: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def mark_dm_read(conversation_id, timestamp):
|
||||
"""
|
||||
Mark a DM conversation as read up to a specific timestamp.
|
||||
|
||||
Args:
|
||||
conversation_id (str): Conversation identifier (e.g., "name_User1" or "pk_abc123")
|
||||
timestamp (int or float): Unix timestamp of last read message
|
||||
|
||||
Returns:
|
||||
bool: True if successful, False otherwise
|
||||
"""
|
||||
"""Mark a DM conversation as read up to a specific timestamp."""
|
||||
try:
|
||||
# Load current status
|
||||
status = load_read_status()
|
||||
|
||||
# Update DM timestamp
|
||||
status['dm'][conversation_id] = int(timestamp)
|
||||
|
||||
# Save updated status
|
||||
success = save_read_status(status)
|
||||
|
||||
if success:
|
||||
logger.debug(f"Marked DM conversation {conversation_id} as read at timestamp {timestamp}")
|
||||
|
||||
return success
|
||||
|
||||
db = _get_db()
|
||||
db.mark_read(f"dm_{conversation_id}", int(timestamp))
|
||||
logger.debug(f"Marked DM conversation {conversation_id} as read at timestamp {timestamp}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error marking DM conversation {conversation_id} as read: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def get_channel_last_seen(channel_idx):
|
||||
"""
|
||||
Get last seen timestamp for a specific channel.
|
||||
|
||||
Args:
|
||||
channel_idx (int or str): Channel index
|
||||
|
||||
Returns:
|
||||
int: Unix timestamp, or 0 if never seen
|
||||
"""
|
||||
"""Get last seen timestamp for a specific channel."""
|
||||
try:
|
||||
status = load_read_status()
|
||||
channel_key = str(channel_idx)
|
||||
return status['channels'].get(channel_key, 0)
|
||||
return status['channels'].get(str(channel_idx), 0)
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting last seen for channel {channel_idx}: {e}")
|
||||
return 0
|
||||
|
||||
|
||||
def get_dm_last_seen(conversation_id):
|
||||
"""
|
||||
Get last seen timestamp for a specific DM conversation.
|
||||
|
||||
Args:
|
||||
conversation_id (str): Conversation identifier
|
||||
|
||||
Returns:
|
||||
int: Unix timestamp, or 0 if never seen
|
||||
"""
|
||||
"""Get last seen timestamp for a specific DM conversation."""
|
||||
try:
|
||||
status = load_read_status()
|
||||
return status['dm'].get(conversation_id, 0)
|
||||
@@ -202,75 +105,39 @@ def get_dm_last_seen(conversation_id):
|
||||
|
||||
|
||||
def get_muted_channels():
|
||||
"""
|
||||
Get list of muted channel indices.
|
||||
|
||||
Returns:
|
||||
list: List of muted channel indices (integers)
|
||||
"""
|
||||
"""Get list of muted channel indices."""
|
||||
try:
|
||||
status = load_read_status()
|
||||
return status.get('muted_channels', [])
|
||||
db = _get_db()
|
||||
return db.get_muted_channels()
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting muted channels: {e}")
|
||||
return []
|
||||
|
||||
|
||||
def set_channel_muted(channel_idx, muted):
|
||||
"""
|
||||
Set mute state for a channel.
|
||||
|
||||
Args:
|
||||
channel_idx (int): Channel index
|
||||
muted (bool): True to mute, False to unmute
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
"""
|
||||
"""Set mute state for a channel."""
|
||||
try:
|
||||
status = load_read_status()
|
||||
muted_list = status.get('muted_channels', [])
|
||||
channel_idx = int(channel_idx)
|
||||
|
||||
if muted and channel_idx not in muted_list:
|
||||
muted_list.append(channel_idx)
|
||||
elif not muted and channel_idx in muted_list:
|
||||
muted_list.remove(channel_idx)
|
||||
|
||||
status['muted_channels'] = muted_list
|
||||
success = save_read_status(status)
|
||||
|
||||
if success:
|
||||
logger.info(f"Channel {channel_idx} {'muted' if muted else 'unmuted'}")
|
||||
return success
|
||||
|
||||
db = _get_db()
|
||||
db.set_channel_muted(int(channel_idx), muted)
|
||||
logger.info(f"Channel {channel_idx} {'muted' if muted else 'unmuted'}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error setting mute for channel {channel_idx}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def mark_all_channels_read(channel_timestamps):
|
||||
"""
|
||||
Mark all channels as read in bulk.
|
||||
"""Mark all channels as read in bulk.
|
||||
|
||||
Args:
|
||||
channel_timestamps (dict): {"0": timestamp, "1": timestamp, ...}
|
||||
|
||||
Returns:
|
||||
bool: True if successful
|
||||
"""
|
||||
try:
|
||||
status = load_read_status()
|
||||
|
||||
db = _get_db()
|
||||
for channel_key, timestamp in channel_timestamps.items():
|
||||
status['channels'][str(channel_key)] = int(timestamp)
|
||||
|
||||
success = save_read_status(status)
|
||||
|
||||
if success:
|
||||
logger.info(f"Marked {len(channel_timestamps)} channels as read")
|
||||
return success
|
||||
|
||||
db.mark_read(f"chan_{channel_key}", int(timestamp))
|
||||
logger.info(f"Marked {len(channel_timestamps)} channels as read")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error marking all channels as read: {e}")
|
||||
return False
|
||||
|
||||
@@ -392,7 +392,7 @@ class TestBackup:
|
||||
db.create_backup(backup_dir)
|
||||
backups = db.list_backups(backup_dir)
|
||||
assert len(backups) == 1
|
||||
assert 'mc-webui.' in backups[0]['filename']
|
||||
assert backups[0]['filename'].endswith('.db')
|
||||
|
||||
def test_list_backups_empty_dir(self, db):
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
|
||||
Reference in New Issue
Block a user