From 97a2014af2de7c5bd68cd4f5a5c1502559115b8b Mon Sep 17 00:00:00 2001
From: MarekWo
Date: Sun, 1 Mar 2026 11:32:28 +0100
Subject: [PATCH] feat(v2): Auto-migrate v1 .msgs data to SQLite on first startup

Reads the existing .msgs JSONL file and imports channel messages and DMs
into the v2 SQLite database. Runs automatically when device connects and
DB is empty. Handles sender parsing, pubkey resolution, and FK constraints.

Co-Authored-By: Claude Opus 4.6
---
 app/main.py            |  16 ++-
 app/migrate_v1.py      | 264 +++++++++++++++++++++++++++++++++++++++++
 tests/test_database.py |  80 +++++++++++++
 3 files changed, 359 insertions(+), 1 deletion(-)
 create mode 100644 app/migrate_v1.py

diff --git a/app/main.py b/app/main.py
index 2416d0e..0babf42 100644
--- a/app/main.py
+++ b/app/main.py
@@ -81,7 +81,7 @@ def create_app():
     # Start device connection in background (non-blocking)
     device_manager.start()
 
-    # Update runtime config when device connects
+    # Update runtime config when device connects, then run v1 migration if needed
     def _wait_for_device_name():
         """Wait for device manager to connect and update runtime config."""
         for _ in range(60):  # wait up to 60 seconds
@@ -91,6 +91,20 @@ def create_app():
                     device_manager.device_name, "device"
                 )
                 logger.info(f"Device name resolved: {device_manager.device_name}")
+
+                # Auto-migrate v1 data if .msgs file exists and DB is empty
+                try:
+                    from app.migrate_v1 import should_migrate, migrate_v1_data
+                    from pathlib import Path
+                    data_dir = Path(config.MC_CONFIG_DIR)
+                    dev_name = device_manager.device_name
+                    if should_migrate(db, data_dir, dev_name):
+                        logger.info("v1 .msgs file detected with empty DB — starting migration")
+                        result = migrate_v1_data(db, data_dir, dev_name)
+                        logger.info(f"v1 migration result: {result}")
+                except Exception as e:
+                    logger.error(f"v1 migration failed: {e}")
+
                 return
 
         logger.warning("Timeout waiting for device connection")
diff --git a/app/migrate_v1.py b/app/migrate_v1.py
new file mode 100644
index 0000000..c03e26c
--- /dev/null
+++ b/app/migrate_v1.py
@@ -0,0 +1,264 @@
+"""
+Migrate v1 data (.msgs JSONL) into v2 SQLite database.
+
+Runs automatically on startup if .msgs file exists and database is empty.
+There is no command-line entry point; call migrate_v1_data() from code.
+
+Migrates:
+- Channel messages (CHAN, SENT_CHAN)
+- Direct messages (PRIV, SENT_MSG)
+"""
+
+import json
+import logging
+import time
+from pathlib import Path
+from datetime import datetime
+from typing import Optional
+
+logger = logging.getLogger(__name__)
+
+
+def _find_msgs_file(data_dir: Path, device_name: str) -> Optional[Path]:
+    """Find the .msgs file for the given device name."""
+    msgs_file = data_dir / f"{device_name}.msgs"
+    if msgs_file.exists():
+        return msgs_file
+
+    # Try to find any .msgs file in the data dir
+    candidates = list(data_dir.glob("*.msgs"))
+    # Exclude archive files (pattern: name.YYYY-MM-DD.msgs)
+    live_files = [f for f in candidates if f.stem.count('.') == 0]
+    if len(live_files) == 1:
+        return live_files[0]
+
+    return None
+
+
+def migrate_v1_data(db, data_dir: Path, device_name: str) -> dict:
+    """
+    Import v1 .msgs data into v2 SQLite database.
+
+    Args:
+        db: Database instance
+        data_dir: Path to meshcore config dir containing .msgs file
+        device_name: Device name (used for .msgs filename and own message detection)
+
+    Returns:
+        dict with migration stats; entries with empty text and unsupported
+        entry types are counted under 'skipped', malformed lines under 'errors'
+    """
+    msgs_file = _find_msgs_file(data_dir, device_name)
+    if not msgs_file:
+        logger.info("No .msgs file found, skipping v1 migration")
+        return {'status': 'skipped', 'reason': 'no_msgs_file'}
+
+    stats = {
+        'channel_messages': 0,
+        'direct_messages': 0,
+        'skipped': 0,
+        'errors': 0,
+        'file': str(msgs_file),
+    }
+
+    logger.info(f"Starting v1 data migration from {msgs_file}")
+
+    try:
+        lines = msgs_file.read_text(encoding='utf-8', errors='replace').splitlines()
+    except Exception as e:
+        logger.error(f"Failed to read .msgs file: {e}")
+        return {'status': 'error', 'reason': str(e)}
+
+    for line_num, raw_line in enumerate(lines, 1):
+        raw_line = raw_line.strip()
+        if not raw_line:
+            continue
+
+        try:
+            entry = json.loads(raw_line)
+        except json.JSONDecodeError:
+            stats['errors'] += 1
+            continue
+
+        # Valid JSON that is not an object (null, number, list, ...) has no
+        # fields to migrate; count it as an error instead of crashing on .get()
+        if not isinstance(entry, dict):
+            stats['errors'] += 1
+            continue
+
+        msg_type = entry.get('type')
+
+        try:
+            if msg_type in ('CHAN', 'SENT_CHAN'):
+                counter = 'channel_messages'
+                migrated = _migrate_channel_msg(db, entry, device_name)
+            elif msg_type == 'PRIV':
+                counter = 'direct_messages'
+                migrated = _migrate_dm_incoming(db, entry)
+            elif msg_type == 'SENT_MSG' and entry.get('txt_type', 0) == 0:
+                # Only private messages; other SENT_MSG txt_types are skipped
+                counter = 'direct_messages'
+                migrated = _migrate_dm_outgoing(db, entry, device_name)
+            else:
+                counter = 'skipped'
+                migrated = False
+
+            # Helpers return False when they insert nothing (empty text)
+            stats[counter if migrated else 'skipped'] += 1
+        except Exception as e:
+            stats['errors'] += 1
+            if stats['errors'] <= 5:
+                logger.warning(f"Migration error on line {line_num}: {e}")
+
+    stats['status'] = 'completed'
+    logger.info(
+        f"v1 migration complete: {stats['channel_messages']} channel msgs, "
+        f"{stats['direct_messages']} DMs, {stats['skipped']} skipped, "
+        f"{stats['errors']} errors"
+    )
+    return stats
+
+
+def _migrate_channel_msg(db, entry: dict, device_name: str) -> bool:
+    """Migrate a CHAN or SENT_CHAN entry. Returns False if the text is empty."""
+    raw_text = entry.get('text', '').strip()
+    if not raw_text:
+        return False
+
+    is_own = entry.get('type') == 'SENT_CHAN'
+    channel_idx = entry.get('channel_idx', 0)
+    timestamp = entry.get('timestamp', 0)
+
+    if is_own:
+        sender = entry.get('sender', device_name)
+        content = raw_text
+    else:
+        # Parse sender from "SenderName: message" format
+        if ':' in raw_text:
+            sender, content = raw_text.split(':', 1)
+            sender = sender.strip()
+            content = content.strip()
+        else:
+            sender = 'Unknown'
+            content = raw_text
+
+    db.insert_channel_message(
+        channel_idx=channel_idx,
+        sender=sender,
+        content=content,
+        timestamp=timestamp,
+        sender_timestamp=entry.get('sender_timestamp'),
+        is_own=is_own,
+        txt_type=entry.get('txt_type', 0),
+        snr=entry.get('SNR'),
+        path_len=entry.get('path_len'),
+        pkt_payload=entry.get('pkt_payload'),
+        raw_json=json.dumps(entry, default=str),
+    )
+    return True
+
+
+def _migrate_dm_incoming(db, entry: dict) -> bool:
+    """Migrate a PRIV (incoming DM) entry. Returns False if the text is empty."""
+    text = entry.get('text', '').strip()
+    if not text:
+        return False
+
+    pubkey_prefix = entry.get('pubkey_prefix', '')
+
+    # Use None if pubkey not in contacts table (FK constraint)
+    contact_key = pubkey_prefix if pubkey_prefix else None
+    if contact_key:
+        contact_key = _resolve_pubkey(db, contact_key)
+
+    db.insert_direct_message(
+        contact_pubkey=contact_key,
+        direction='in',
+        content=text,
+        timestamp=entry.get('timestamp', 0),
+        sender_timestamp=entry.get('sender_timestamp'),
+        txt_type=entry.get('txt_type', 0),
+        snr=entry.get('SNR'),
+        path_len=entry.get('path_len'),
+        pkt_payload=entry.get('pkt_payload'),
+        raw_json=json.dumps(entry, default=str),
+    )
+    return True
+
+
+def _migrate_dm_outgoing(db, entry: dict, device_name: str) -> bool:
+    """Migrate a SENT_MSG (outgoing DM) entry. Returns False if the text is empty."""
+    text = entry.get('text', '').strip()
+    if not text:
+        return False
+
+    # For outgoing DMs, we don't have recipient pubkey in v1 data.
+    # contact_pubkey stays None unless a contact with a matching name
+    # already exists in the contacts table.
+    # In v1, conversation_id was "name_{recipient}" — the name is kept
+    # in raw_json for reference.
+    recipient = entry.get('recipient', entry.get('name', ''))
+
+    # Try to find pubkey from contacts table by recipient name
+    contact_pubkey = _lookup_pubkey_by_name(db, recipient)
+
+    db.insert_direct_message(
+        contact_pubkey=contact_pubkey,
+        direction='out',
+        content=text,
+        timestamp=entry.get('timestamp', 0),
+        sender_timestamp=entry.get('sender_timestamp'),
+        txt_type=entry.get('txt_type', 0),
+        expected_ack=entry.get('expected_ack'),
+        pkt_payload=entry.get('pkt_payload'),
+        raw_json=json.dumps(entry, default=str),
+    )
+    return True
+
+
+def _resolve_pubkey(db, pubkey_prefix: str) -> Optional[str]:
+    """Check if a pubkey prefix matches a contact. Returns full key or None."""
+    if not pubkey_prefix:
+        return None
+    try:
+        contacts = db.get_contacts()
+        prefix = pubkey_prefix.lower()
+        for c in contacts:
+            pk = c.get('public_key') or ''
+            # Return the key exactly as stored so the FK reference matches
+            if pk and pk.lower().startswith(prefix):
+                return pk
+    except Exception as e:
+        # Best effort: an unresolved prefix is stored as None
+        logger.debug(f"Contact lookup failed for prefix {pubkey_prefix!r}: {e}")
+    return None
+
+
+def _lookup_pubkey_by_name(db, name: str) -> Optional[str]:
+    """Look up a contact's public_key by name. Returns None if not found."""
+    if not name:
+        return None
+    try:
+        contacts = db.get_contacts()
+        for c in contacts:
+            if c.get('name') == name:
+                return c.get('public_key')
+    except Exception as e:
+        # Best effort: an unknown recipient is stored as None
+        logger.debug(f"Contact lookup failed for name {name!r}: {e}")
+    return None
+
+
+def should_migrate(db, data_dir: Path, device_name: str) -> bool:
+    """Check if migration is needed: .msgs exists and DB has no messages."""
+    msgs_file = _find_msgs_file(data_dir, device_name)
+    if not msgs_file:
+        return False
+
+    # Only migrate if DB is empty (no channel messages and no DMs)
+    try:
+        stats = db.get_stats()
+        total = stats.get('channel_messages', 0) + stats.get('direct_messages', 0)
+        return total == 0
+    except Exception:
+        return False
diff --git a/tests/test_database.py b/tests/test_database.py
index e25c809..d84f1fd 100644
--- a/tests/test_database.py
+++ b/tests/test_database.py
@@ -413,3 +413,83 @@ class TestPaths:
         db.insert_path('aa', pkt_payload='PKT', path='A>B>C', snr=-5.0, path_len=3)
         stats = db.get_stats()
         assert stats['paths'] == 1
+
+
+# ================================================================
+# v1 Migration
+# ================================================================
+
+class TestV1Migration:
+    def _write_msgs(self, path, lines):
+        """Write JSONL lines to a .msgs file."""
+        import json
+        with open(path, 'w') as f:
+            for line in lines:
+                f.write(json.dumps(line) + '\n')
+
+    def test_migrate_channel_messages(self, db):
+        import tempfile, json
+        from app.migrate_v1 import migrate_v1_data, should_migrate
+
+        with tempfile.TemporaryDirectory() as tmp:
+            data_dir = Path(tmp)
+            self._write_msgs(data_dir / 'TestDevice.msgs', [
+                {'type': 'CHAN', 'channel_idx': 0, 'text': 'Alice: Hello world', 'timestamp': 1000, 'SNR': -5.0, 'path_len': 2},
+                {'type': 'SENT_CHAN', 'channel_idx': 0, 'text': 'My message', 'timestamp': 1001, 'sender': 'TestDevice'},
+                {'type': 'CHAN', 'channel_idx': 1, 'text': 'Bob: On channel 1', 'timestamp': 1002},
+            ])
+
+            assert should_migrate(db, data_dir, 'TestDevice')
+
+            result = migrate_v1_data(db, data_dir, 'TestDevice')
+            assert result['status'] == 'completed'
+            assert result['channel_messages'] == 3
+
+            msgs = db.get_channel_messages()
+            assert len(msgs) == 3
+            assert msgs[0]['sender'] == 'Alice'
+            assert msgs[0]['content'] == 'Hello world'
+            assert msgs[1]['sender'] == 'TestDevice'
+            assert msgs[1]['content'] == 'My message'
+            assert msgs[1]['is_own'] == 1
+            assert msgs[2]['sender'] == 'Bob'
+            assert msgs[2]['channel_idx'] == 1
+
+    def test_migrate_dm_messages(self, db):
+        import tempfile, json
+        from app.migrate_v1 import migrate_v1_data
+
+        with tempfile.TemporaryDirectory() as tmp:
+            data_dir = Path(tmp)
+            self._write_msgs(data_dir / 'TestDevice.msgs', [
+                {'type': 'PRIV', 'text': 'Hello from Alice', 'timestamp': 2000, 'pubkey_prefix': 'aabb', 'name': 'Alice'},
+                {'type': 'SENT_MSG', 'text': 'Reply to Alice', 'timestamp': 2001, 'recipient': 'Alice', 'txt_type': 0},
+                {'type': 'SENT_MSG', 'text': 'Channel sent', 'timestamp': 2002, 'txt_type': 1},  # should be skipped
+            ])
+
+            result = migrate_v1_data(db, data_dir, 'TestDevice')
+            assert result['status'] == 'completed'
+            assert result['direct_messages'] == 2
+            assert result['skipped'] == 1
+
+    def test_should_migrate_false_when_db_has_data(self, db):
+        import tempfile
+        from app.migrate_v1 import should_migrate
+
+        with tempfile.TemporaryDirectory() as tmp:
+            data_dir = Path(tmp)
+            self._write_msgs(data_dir / 'Dev.msgs', [
+                {'type': 'CHAN', 'text': 'Test: msg', 'timestamp': 1000},
+            ])
+
+            # Add a message to DB first
+            db.insert_channel_message(0, 'X', 'Existing', int(time.time()))
+
+            assert not should_migrate(db, data_dir, 'Dev')
+
+    def test_should_migrate_false_when_no_msgs_file(self, db):
+        import tempfile
+        from app.migrate_v1 import should_migrate
+
+        with tempfile.TemporaryDirectory() as tmp:
+            assert not should_migrate(db, Path(tmp), 'NoDevice')