diff --git a/app/main.py b/app/main.py index 2416d0e..0babf42 100644 --- a/app/main.py +++ b/app/main.py @@ -81,7 +81,7 @@ def create_app(): # Start device connection in background (non-blocking) device_manager.start() - # Update runtime config when device connects + # Update runtime config when device connects, then run v1 migration if needed def _wait_for_device_name(): """Wait for device manager to connect and update runtime config.""" for _ in range(60): # wait up to 60 seconds @@ -91,6 +91,20 @@ def create_app(): device_manager.device_name, "device" ) logger.info(f"Device name resolved: {device_manager.device_name}") + + # Auto-migrate v1 data if .msgs file exists and DB is empty + try: + from app.migrate_v1 import should_migrate, migrate_v1_data + from pathlib import Path + data_dir = Path(config.MC_CONFIG_DIR) + dev_name = device_manager.device_name + if should_migrate(db, data_dir, dev_name): + logger.info("v1 .msgs file detected with empty DB — starting migration") + result = migrate_v1_data(db, data_dir, dev_name) + logger.info(f"v1 migration result: {result}") + except Exception as e: + logger.error(f"v1 migration failed: {e}") + return logger.warning("Timeout waiting for device connection") diff --git a/app/migrate_v1.py b/app/migrate_v1.py new file mode 100644 index 0000000..c03e26c --- /dev/null +++ b/app/migrate_v1.py @@ -0,0 +1,249 @@ +""" +Migrate v1 data (.msgs JSONL) into v2 SQLite database. + +Runs automatically on startup if .msgs file exists and database is empty. 
import json
import logging
import time
from pathlib import Path
from datetime import datetime
from typing import Optional

logger = logging.getLogger(__name__)


def _find_msgs_file(data_dir: Path, device_name: str) -> Optional[Path]:
    """Locate the live v1 ``.msgs`` file for *device_name* inside *data_dir*.

    An exact ``<device_name>.msgs`` match wins. Otherwise, if the directory
    holds exactly one non-archive ``.msgs`` file, that file is used. With
    zero or several candidates the choice would be a guess, so ``None`` is
    returned.
    """
    msgs_file = data_dir / f"{device_name}.msgs"
    if msgs_file.is_file():
        return msgs_file

    # Archives are named "<name>.YYYY-MM-DD.msgs", so their stem has a dot.
    live_files = [
        f for f in data_dir.glob("*.msgs")
        if f.is_file() and '.' not in f.stem
    ]
    if len(live_files) == 1:
        return live_files[0]

    return None


def migrate_v1_data(db, data_dir: Path, device_name: str) -> dict:
    """
    Import v1 .msgs data into v2 SQLite database.

    Args:
        db: Database instance
        data_dir: Path to meshcore config dir containing .msgs file
        device_name: Device name (used for .msgs filename and own message detection)

    Returns:
        dict with migration stats. ``status`` is ``'completed'``,
        ``'skipped'`` (no .msgs file) or ``'error'`` (file unreadable, with
        the message in ``reason``). Counters: ``channel_messages`` and
        ``direct_messages`` are entries actually inserted, ``skipped`` are
        entries deliberately not imported (unknown type, non-private
        SENT_MSG, blank text), ``errors`` are undecodable lines or failed
        inserts.
    """
    msgs_file = _find_msgs_file(data_dir, device_name)
    if not msgs_file:
        logger.info("No .msgs file found, skipping v1 migration")
        return {'status': 'skipped', 'reason': 'no_msgs_file'}

    stats = {
        'channel_messages': 0,
        'direct_messages': 0,
        'skipped': 0,
        'errors': 0,
        'file': str(msgs_file),
    }

    logger.info(f"Starting v1 data migration from {msgs_file}")

    try:
        # Iterate the file object instead of read_text().splitlines():
        # splitlines() also breaks on U+2028/U+2029/U+0085, which may appear
        # unescaped inside a JSON string and would cut a record in two.
        with msgs_file.open(encoding='utf-8', errors='replace') as handle:
            for line_num, raw_line in enumerate(handle, 1):
                raw_line = raw_line.strip()
                if raw_line:
                    _migrate_line(db, raw_line, line_num, device_name, stats)
    except OSError as e:
        logger.error(f"Failed to read .msgs file: {e}")
        return {**stats, 'status': 'error', 'reason': str(e)}

    stats['status'] = 'completed'
    logger.info(
        f"v1 migration complete: {stats['channel_messages']} channel msgs, "
        f"{stats['direct_messages']} DMs, {stats['skipped']} skipped, "
        f"{stats['errors']} errors"
    )
    return stats


def _migrate_line(db, raw_line: str, line_num: int, device_name: str,
                  stats: dict) -> None:
    """Decode one JSONL record, import it, and update *stats* in place."""
    try:
        entry = json.loads(raw_line)
    except json.JSONDecodeError:
        stats['errors'] += 1
        return

    # A line such as `123` or `[1, 2]` is valid JSON but not a message
    # record; without this guard `.get` would raise and abort the whole run.
    if not isinstance(entry, dict):
        stats['errors'] += 1
        return

    msg_type = entry.get('type')

    try:
        if msg_type in ('CHAN', 'SENT_CHAN'):
            counter = 'channel_messages'
            inserted = _migrate_channel_msg(db, entry, device_name)
        elif msg_type == 'PRIV':
            counter = 'direct_messages'
            inserted = _migrate_dm_incoming(db, entry)
        elif msg_type == 'SENT_MSG' and entry.get('txt_type', 0) == 0:
            # Only private messages
            counter = 'direct_messages'
            inserted = _migrate_dm_outgoing(db, entry, device_name)
        else:
            stats['skipped'] += 1
            return
    except Exception as e:
        stats['errors'] += 1
        # Cap the log noise; the final summary still reports the total.
        if stats['errors'] <= 5:
            logger.warning(f"Migration error on line {line_num}: {e}")
        return

    # Helpers return False when they dropped the entry (blank text).
    stats[counter if inserted else 'skipped'] += 1


def _entry_text(entry: dict) -> str:
    """Return the entry's stripped ``text``, or ``''`` if missing/not a string."""
    text = entry.get('text')
    return text.strip() if isinstance(text, str) else ''


def _migrate_channel_msg(db, entry: dict, device_name: str) -> bool:
    """Migrate a CHAN or SENT_CHAN entry.

    Returns True if a row was inserted, False if the entry had no text.
    """
    raw_text = _entry_text(entry)
    if not raw_text:
        return False

    is_own = entry.get('type') == 'SENT_CHAN'

    if is_own:
        sender = entry.get('sender') or device_name
        content = raw_text
    else:
        # Parse sender from "SenderName: message" format
        name, sep, body = raw_text.partition(':')
        if sep:
            sender = name.strip()
            content = body.strip()
        else:
            sender = 'Unknown'
            content = raw_text

    db.insert_channel_message(
        channel_idx=entry.get('channel_idx', 0),
        sender=sender,
        content=content,
        timestamp=entry.get('timestamp', 0),
        sender_timestamp=entry.get('sender_timestamp'),
        is_own=is_own,
        txt_type=entry.get('txt_type', 0),
        snr=entry.get('SNR'),
        path_len=entry.get('path_len'),
        pkt_payload=entry.get('pkt_payload'),
        raw_json=json.dumps(entry, default=str),
    )
    return True


def _migrate_dm_incoming(db, entry: dict) -> bool:
    """Migrate a PRIV (incoming DM) entry.

    Returns True if a row was inserted, False if the entry had no text.
    """
    text = _entry_text(entry)
    if not text:
        return False

    # None when the prefix matches no known contact (FK constraint).
    contact_key = _resolve_pubkey(db, entry.get('pubkey_prefix', ''))

    db.insert_direct_message(
        contact_pubkey=contact_key,
        direction='in',
        content=text,
        timestamp=entry.get('timestamp', 0),
        sender_timestamp=entry.get('sender_timestamp'),
        txt_type=entry.get('txt_type', 0),
        snr=entry.get('SNR'),
        path_len=entry.get('path_len'),
        pkt_payload=entry.get('pkt_payload'),
        raw_json=json.dumps(entry, default=str),
    )
    return True


def _migrate_dm_outgoing(db, entry: dict, device_name: str) -> bool:
    """Migrate a SENT_MSG (outgoing DM) entry.

    Returns True if a row was inserted, False if the entry had no text.
    """
    text = _entry_text(entry)
    if not text:
        return False

    # v1 data carries no recipient pubkey, only a name. Resolve it through
    # the contacts table; when no contact matches, the row is stored with a
    # NULL contact and the name remains available in raw_json.
    recipient = entry.get('recipient') or entry.get('name') or ''
    contact_pubkey = _lookup_pubkey_by_name(db, recipient)

    db.insert_direct_message(
        contact_pubkey=contact_pubkey,
        direction='out',
        content=text,
        timestamp=entry.get('timestamp', 0),
        sender_timestamp=entry.get('sender_timestamp'),
        txt_type=entry.get('txt_type', 0),
        expected_ack=entry.get('expected_ack'),
        pkt_payload=entry.get('pkt_payload'),
        raw_json=json.dumps(entry, default=str),
    )
    return True


def _resolve_pubkey(db, pubkey_prefix: str) -> Optional[str]:
    """Check if a pubkey prefix matches a contact. Returns full key or None.

    Matching is case-insensitive, but the key is returned exactly as stored
    so it stays equal to the contacts row it refers to.
    """
    if not pubkey_prefix:
        return None
    try:
        prefix = pubkey_prefix.lower()
        for contact in db.get_contacts():
            stored = contact.get('public_key') or ''
            if stored and stored.lower().startswith(prefix):
                return stored
    except Exception as e:
        # Best effort: an unresolved key only means the DM is stored unlinked.
        logger.debug(f"Contact lookup by pubkey prefix failed: {e}")
    return None


def _lookup_pubkey_by_name(db, name: str) -> Optional[str]:
    """Look up a contact's public_key by name. Returns None if not found."""
    if not name:
        return None
    try:
        for contact in db.get_contacts():
            if contact.get('name') == name:
                return contact.get('public_key')
    except Exception as e:
        # Best effort: an unresolved name only means the DM is stored unlinked.
        logger.debug(f"Contact lookup by name failed: {e}")
    return None


def should_migrate(db, data_dir: Path, device_name: str) -> bool:
    """Check if migration is needed: .msgs exists and DB has no messages."""
    if not _find_msgs_file(data_dir, device_name):
        return False

    # Only migrate if DB is empty (no channel messages and no DMs)
    try:
        stats = db.get_stats()
        total = stats.get('channel_messages', 0) + stats.get('direct_messages', 0)
    except Exception as e:
        # Unknown DB state: do not risk importing duplicates.
        logger.warning(f"Could not read DB stats, skipping v1 migration: {e}")
        return False
    return total == 0
class TestV1Migration:
    """Tests for importing v1 ``.msgs`` JSONL data via ``app.migrate_v1``."""

    def _write_msgs(self, path, lines):
        """Write JSONL lines to a .msgs file."""
        import json
        with open(path, 'w') as f:
            f.writelines(json.dumps(line) + '\n' for line in lines)

    def test_migrate_channel_messages(self, db):
        """Received and sent channel entries are imported with parsed senders."""
        import tempfile
        from app.migrate_v1 import migrate_v1_data, should_migrate

        entries = [
            {'type': 'CHAN', 'channel_idx': 0, 'text': 'Alice: Hello world', 'timestamp': 1000, 'SNR': -5.0, 'path_len': 2},
            {'type': 'SENT_CHAN', 'channel_idx': 0, 'text': 'My message', 'timestamp': 1001, 'sender': 'TestDevice'},
            {'type': 'CHAN', 'channel_idx': 1, 'text': 'Bob: On channel 1', 'timestamp': 1002},
        ]

        with tempfile.TemporaryDirectory() as tmp:
            data_dir = Path(tmp)
            self._write_msgs(data_dir / 'TestDevice.msgs', entries)

            assert should_migrate(db, data_dir, 'TestDevice')

            result = migrate_v1_data(db, data_dir, 'TestDevice')
            assert result['status'] == 'completed'
            assert result['channel_messages'] == 3

            msgs = db.get_channel_messages()
            assert len(msgs) == 3

            from_alice, own, from_bob = msgs[0], msgs[1], msgs[2]
            assert from_alice['sender'] == 'Alice'
            assert from_alice['content'] == 'Hello world'
            assert own['sender'] == 'TestDevice'
            assert own['content'] == 'My message'
            assert own['is_own'] == 1
            assert from_bob['sender'] == 'Bob'
            assert from_bob['channel_idx'] == 1

    def test_migrate_dm_messages(self, db):
        """Private messages are imported; a non-private SENT_MSG is skipped."""
        import tempfile
        from app.migrate_v1 import migrate_v1_data

        entries = [
            {'type': 'PRIV', 'text': 'Hello from Alice', 'timestamp': 2000, 'pubkey_prefix': 'aabb', 'name': 'Alice'},
            {'type': 'SENT_MSG', 'text': 'Reply to Alice', 'timestamp': 2001, 'recipient': 'Alice', 'txt_type': 0},
            # txt_type 1 is not a private message, so it should be skipped
            {'type': 'SENT_MSG', 'text': 'Channel sent', 'timestamp': 2002, 'txt_type': 1},
        ]

        with tempfile.TemporaryDirectory() as tmp:
            data_dir = Path(tmp)
            self._write_msgs(data_dir / 'TestDevice.msgs', entries)

            result = migrate_v1_data(db, data_dir, 'TestDevice')
            assert result['status'] == 'completed'
            assert result['direct_messages'] == 2
            assert result['skipped'] == 1

    def test_should_migrate_false_when_db_has_data(self, db):
        """A database that already holds a message must not be migrated into."""
        import tempfile
        from app.migrate_v1 import should_migrate

        with tempfile.TemporaryDirectory() as tmp:
            data_dir = Path(tmp)
            self._write_msgs(data_dir / 'Dev.msgs', [
                {'type': 'CHAN', 'text': 'Test: msg', 'timestamp': 1000},
            ])

            # Add a message to DB first
            db.insert_channel_message(0, 'X', 'Existing', int(time.time()))

            assert not should_migrate(db, data_dir, 'Dev')

    def test_should_migrate_false_when_no_msgs_file(self, db):
        """Without any .msgs file there is nothing to migrate."""
        import tempfile
        from app.migrate_v1 import should_migrate

        with tempfile.TemporaryDirectory() as tmp:
            empty_dir = Path(tmp)
            assert not should_migrate(db, empty_dir, 'NoDevice')