feat(v2): Auto-migrate v1 .msgs data to SQLite on first startup

Reads the existing .msgs JSONL file and imports channel messages and DMs
into the v2 SQLite database. Runs automatically when device connects and
DB is empty. Handles sender parsing, pubkey resolution, and FK constraints.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-03-01 11:32:28 +01:00
parent 64860ba178
commit 97a2014af2
3 changed files with 344 additions and 1 deletions

View File

@@ -81,7 +81,7 @@ def create_app():
# Start device connection in background (non-blocking)
device_manager.start()
# Update runtime config when device connects
# Update runtime config when device connects, then run v1 migration if needed
def _wait_for_device_name():
"""Wait for device manager to connect and update runtime config."""
for _ in range(60): # wait up to 60 seconds
@@ -91,6 +91,20 @@ def create_app():
device_manager.device_name, "device"
)
logger.info(f"Device name resolved: {device_manager.device_name}")
# Auto-migrate v1 data if .msgs file exists and DB is empty
try:
from app.migrate_v1 import should_migrate, migrate_v1_data
from pathlib import Path
data_dir = Path(config.MC_CONFIG_DIR)
dev_name = device_manager.device_name
if should_migrate(db, data_dir, dev_name):
logger.info("v1 .msgs file detected with empty DB — starting migration")
result = migrate_v1_data(db, data_dir, dev_name)
logger.info(f"v1 migration result: {result}")
except Exception as e:
logger.error(f"v1 migration failed: {e}")
return
logger.warning("Timeout waiting for device connection")

249
app/migrate_v1.py Normal file
View File

@@ -0,0 +1,249 @@
"""
Migrate v1 data (.msgs JSONL) into v2 SQLite database.
Runs automatically on startup if .msgs file exists and database is empty.
Can also be run manually: python -m app.migrate_v1
Migrates:
- Channel messages (CHAN, SENT_CHAN)
- Direct messages (PRIV, SENT_MSG)
"""
import json
import logging
import time
from pathlib import Path
from datetime import datetime
from typing import Optional
logger = logging.getLogger(__name__)
def _find_msgs_file(data_dir: Path, device_name: str) -> Optional[Path]:
"""Find the .msgs file for the given device name."""
msgs_file = data_dir / f"{device_name}.msgs"
if msgs_file.exists():
return msgs_file
# Try to find any .msgs file in the data dir
candidates = list(data_dir.glob("*.msgs"))
# Exclude archive files (pattern: name.YYYY-MM-DD.msgs)
live_files = [f for f in candidates if f.stem.count('.') == 0]
if len(live_files) == 1:
return live_files[0]
return None
def migrate_v1_data(db, data_dir: Path, device_name: str) -> dict:
"""
Import v1 .msgs data into v2 SQLite database.
Args:
db: Database instance
data_dir: Path to meshcore config dir containing .msgs file
device_name: Device name (used for .msgs filename and own message detection)
Returns:
dict with migration stats
"""
msgs_file = _find_msgs_file(data_dir, device_name)
if not msgs_file:
logger.info("No .msgs file found, skipping v1 migration")
return {'status': 'skipped', 'reason': 'no_msgs_file'}
stats = {
'channel_messages': 0,
'direct_messages': 0,
'skipped': 0,
'errors': 0,
'file': str(msgs_file),
}
logger.info(f"Starting v1 data migration from {msgs_file}")
try:
lines = msgs_file.read_text(encoding='utf-8', errors='replace').splitlines()
except Exception as e:
logger.error(f"Failed to read .msgs file: {e}")
return {'status': 'error', 'reason': str(e)}
for line_num, raw_line in enumerate(lines, 1):
raw_line = raw_line.strip()
if not raw_line:
continue
try:
entry = json.loads(raw_line)
except json.JSONDecodeError:
stats['errors'] += 1
continue
msg_type = entry.get('type')
try:
if msg_type in ('CHAN', 'SENT_CHAN'):
_migrate_channel_msg(db, entry, device_name)
stats['channel_messages'] += 1
elif msg_type == 'PRIV':
_migrate_dm_incoming(db, entry)
stats['direct_messages'] += 1
elif msg_type == 'SENT_MSG':
if entry.get('txt_type', 0) == 0: # Only private messages
_migrate_dm_outgoing(db, entry, device_name)
stats['direct_messages'] += 1
else:
stats['skipped'] += 1
else:
stats['skipped'] += 1
except Exception as e:
stats['errors'] += 1
if stats['errors'] <= 5:
logger.warning(f"Migration error on line {line_num}: {e}")
stats['status'] = 'completed'
logger.info(
f"v1 migration complete: {stats['channel_messages']} channel msgs, "
f"{stats['direct_messages']} DMs, {stats['skipped']} skipped, "
f"{stats['errors']} errors"
)
return stats
def _migrate_channel_msg(db, entry: dict, device_name: str):
"""Migrate a CHAN or SENT_CHAN entry."""
raw_text = entry.get('text', '').strip()
if not raw_text:
return
is_own = entry.get('type') == 'SENT_CHAN'
channel_idx = entry.get('channel_idx', 0)
timestamp = entry.get('timestamp', 0)
if is_own:
sender = entry.get('sender', device_name)
content = raw_text
else:
# Parse sender from "SenderName: message" format
if ':' in raw_text:
sender, content = raw_text.split(':', 1)
sender = sender.strip()
content = content.strip()
else:
sender = 'Unknown'
content = raw_text
db.insert_channel_message(
channel_idx=channel_idx,
sender=sender,
content=content,
timestamp=timestamp,
sender_timestamp=entry.get('sender_timestamp'),
is_own=is_own,
txt_type=entry.get('txt_type', 0),
snr=entry.get('SNR'),
path_len=entry.get('path_len'),
pkt_payload=entry.get('pkt_payload'),
raw_json=json.dumps(entry, default=str),
)
def _migrate_dm_incoming(db, entry: dict):
"""Migrate a PRIV (incoming DM) entry."""
text = entry.get('text', '').strip()
if not text:
return
pubkey_prefix = entry.get('pubkey_prefix', '')
# Use None if pubkey not in contacts table (FK constraint)
contact_key = pubkey_prefix if pubkey_prefix else None
if contact_key:
contact_key = _resolve_pubkey(db, contact_key)
db.insert_direct_message(
contact_pubkey=contact_key,
direction='in',
content=text,
timestamp=entry.get('timestamp', 0),
sender_timestamp=entry.get('sender_timestamp'),
txt_type=entry.get('txt_type', 0),
snr=entry.get('SNR'),
path_len=entry.get('path_len'),
pkt_payload=entry.get('pkt_payload'),
raw_json=json.dumps(entry, default=str),
)
def _migrate_dm_outgoing(db, entry: dict, device_name: str):
"""Migrate a SENT_MSG (outgoing DM) entry."""
text = entry.get('text', '').strip()
if not text:
return
# For outgoing DMs, we don't have recipient pubkey in v1 data.
# Use empty string — the messages will appear once a contact with
# matching name is found.
# In v1, conversation_id was "name_{recipient}" — we store the name
# in raw_json for reference.
recipient = entry.get('recipient', entry.get('name', ''))
# Try to find pubkey from contacts table by recipient name
contact_pubkey = _lookup_pubkey_by_name(db, recipient)
db.insert_direct_message(
contact_pubkey=contact_pubkey,
direction='out',
content=text,
timestamp=entry.get('timestamp', 0),
sender_timestamp=entry.get('sender_timestamp'),
txt_type=entry.get('txt_type', 0),
expected_ack=entry.get('expected_ack'),
pkt_payload=entry.get('pkt_payload'),
raw_json=json.dumps(entry, default=str),
)
def _resolve_pubkey(db, pubkey_prefix: str) -> Optional[str]:
"""Check if a pubkey prefix matches a contact. Returns full key or None."""
if not pubkey_prefix:
return None
try:
contacts = db.get_contacts()
prefix = pubkey_prefix.lower()
for c in contacts:
pk = (c.get('public_key') or '').lower()
if pk and pk.startswith(prefix):
return pk
except Exception:
pass
return None
def _lookup_pubkey_by_name(db, name: str) -> Optional[str]:
"""Look up a contact's public_key by name. Returns None if not found."""
if not name:
return None
try:
contacts = db.get_contacts()
for c in contacts:
if c.get('name') == name:
return c.get('public_key')
except Exception:
pass
return None
def should_migrate(db, data_dir: Path, device_name: str) -> bool:
"""Check if migration is needed: .msgs exists and DB has no messages."""
msgs_file = _find_msgs_file(data_dir, device_name)
if not msgs_file:
return False
# Only migrate if DB is empty (no channel messages and no DMs)
try:
stats = db.get_stats()
total = stats.get('channel_messages', 0) + stats.get('direct_messages', 0)
return total == 0
except Exception:
return False

View File

@@ -413,3 +413,83 @@ class TestPaths:
db.insert_path('aa', pkt_payload='PKT', path='A>B>C', snr=-5.0, path_len=3)
stats = db.get_stats()
assert stats['paths'] == 1
# ================================================================
# v1 Migration
# ================================================================
class TestV1Migration:
def _write_msgs(self, path, lines):
"""Write JSONL lines to a .msgs file."""
import json
with open(path, 'w') as f:
for line in lines:
f.write(json.dumps(line) + '\n')
def test_migrate_channel_messages(self, db):
import tempfile, json
from app.migrate_v1 import migrate_v1_data, should_migrate
with tempfile.TemporaryDirectory() as tmp:
data_dir = Path(tmp)
self._write_msgs(data_dir / 'TestDevice.msgs', [
{'type': 'CHAN', 'channel_idx': 0, 'text': 'Alice: Hello world', 'timestamp': 1000, 'SNR': -5.0, 'path_len': 2},
{'type': 'SENT_CHAN', 'channel_idx': 0, 'text': 'My message', 'timestamp': 1001, 'sender': 'TestDevice'},
{'type': 'CHAN', 'channel_idx': 1, 'text': 'Bob: On channel 1', 'timestamp': 1002},
])
assert should_migrate(db, data_dir, 'TestDevice')
result = migrate_v1_data(db, data_dir, 'TestDevice')
assert result['status'] == 'completed'
assert result['channel_messages'] == 3
msgs = db.get_channel_messages()
assert len(msgs) == 3
assert msgs[0]['sender'] == 'Alice'
assert msgs[0]['content'] == 'Hello world'
assert msgs[1]['sender'] == 'TestDevice'
assert msgs[1]['content'] == 'My message'
assert msgs[1]['is_own'] == 1
assert msgs[2]['sender'] == 'Bob'
assert msgs[2]['channel_idx'] == 1
def test_migrate_dm_messages(self, db):
import tempfile, json
from app.migrate_v1 import migrate_v1_data
with tempfile.TemporaryDirectory() as tmp:
data_dir = Path(tmp)
self._write_msgs(data_dir / 'TestDevice.msgs', [
{'type': 'PRIV', 'text': 'Hello from Alice', 'timestamp': 2000, 'pubkey_prefix': 'aabb', 'name': 'Alice'},
{'type': 'SENT_MSG', 'text': 'Reply to Alice', 'timestamp': 2001, 'recipient': 'Alice', 'txt_type': 0},
{'type': 'SENT_MSG', 'text': 'Channel sent', 'timestamp': 2002, 'txt_type': 1}, # should be skipped
])
result = migrate_v1_data(db, data_dir, 'TestDevice')
assert result['status'] == 'completed'
assert result['direct_messages'] == 2
assert result['skipped'] == 1
def test_should_migrate_false_when_db_has_data(self, db):
import tempfile
from app.migrate_v1 import should_migrate
with tempfile.TemporaryDirectory() as tmp:
data_dir = Path(tmp)
self._write_msgs(data_dir / 'Dev.msgs', [
{'type': 'CHAN', 'text': 'Test: msg', 'timestamp': 1000},
])
# Add a message to DB first
db.insert_channel_message(0, 'X', 'Existing', int(time.time()))
assert not should_migrate(db, data_dir, 'Dev')
def test_should_migrate_false_when_no_msgs_file(self, db):
import tempfile
from app.migrate_v1 import should_migrate
with tempfile.TemporaryDirectory() as tmp:
assert not should_migrate(db, Path(tmp), 'NoDevice')