Files
mc-webui/app/migrate_v1.py
MarekWo c20e7c20ad fix(v2): Fix DM contact names showing as pubkey prefix
- DM handler: don't overwrite contact name with prefix when name unknown
- Migration: upsert contact with sender name from v1 PRIV entries
- Fixes conversations showing "4e45565e" instead of "demo mc-webui"

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 13:58:15 +01:00

360 lines
12 KiB
Python

"""
Migrate v1 data (.msgs JSONL) into v2 SQLite database.
Runs automatically on startup if .msgs file exists and database is empty.
Can also be run manually: python -m app.migrate_v1
Migrates:
- Live .msgs file (today's messages)
- Archive .msgs files (historical messages)
- Channel messages (CHAN, SENT_CHAN)
- Direct messages (PRIV, SENT_MSG)
"""
import json
import logging
from pathlib import Path
from typing import Optional, List
logger = logging.getLogger(__name__)
def _find_msgs_file(data_dir: Path, device_name: str) -> Optional[Path]:
    """Locate the live (non-archive) .msgs file for *device_name*.

    ``<data_dir>/<device_name>.msgs`` is returned if it exists. Otherwise, if
    ``data_dir`` contains exactly one ``*.msgs`` file whose stem has no dot
    (archives are named ``name.YYYY-MM-DD.msgs`` and therefore do), that
    file is returned. In every other case the result is None.
    """
    expected = data_dir / f"{device_name}.msgs"
    if expected.exists():
        return expected
    # Fallback: the device may have been renamed; accept a single undated file.
    undated = [path for path in data_dir.glob("*.msgs") if '.' not in path.stem]
    if len(undated) != 1:
        return None
    return undated[0]
def _find_archive_files(data_dir: Path, device_name: str) -> List[Path]:
    """Find all archive .msgs files for a device, sorted oldest first.

    v1 archives are named ``<device_name>.YYYY-MM-DD.msgs``. They are searched
    for in ``data_dir/archive``, in an ``archive`` directory next to
    ``data_dir``, and in ``data_dir`` itself.

    Args:
        data_dir: Directory containing the live .msgs file.
        device_name: Device name used as the archive filename prefix.

    Returns:
        Archive paths sorted by filename, which for this naming scheme is
        chronological order. Empty list when nothing is found.
    """
    import re

    prefix = f"{device_name}."
    date_suffix = re.compile(r"\d{4}-\d{2}-\d{2}")
    search_dirs = [
        data_dir / 'archive',         # /data/archive/
        data_dir.parent / 'archive',  # sibling archive dir
        data_dir,                     # archives stored next to the live file
    ]
    archive_files = []
    seen = set()
    for search_dir in search_dirs:
        if not search_dir.is_dir():
            continue
        # Match the device prefix with plain string operations instead of
        # embedding device_name in a glob pattern: names containing glob
        # metacharacters such as '[', '*' or '?' would otherwise mis-match.
        for f in search_dir.glob("*.msgs"):
            stem = f.stem
            if not stem.startswith(prefix):
                continue
            # Require a real date suffix so that, e.g., the live file of a
            # device named "<device_name>.x" is not mistaken for an archive.
            if not date_suffix.fullmatch(stem[len(prefix):]):
                continue
            # The same directory may be reachable twice (data_dir at the
            # filesystem root, a symlinked archive dir); list each file once.
            resolved = f.resolve()
            if resolved in seen:
                continue
            seen.add(resolved)
            archive_files.append(f)
    # Same prefix + ISO date => lexical filename order is chronological.
    archive_files.sort(key=lambda f: f.name)
    return archive_files
def migrate_v1_data(db, data_dir: Path, device_name: str) -> dict:
    """
    Import v1 .msgs data into the v2 SQLite database.

    Archive files are processed first (in the order returned by
    _find_archive_files, oldest first), then the live .msgs file. The dedup
    sets are shared across all files, so a message stored in both an archive
    and the live file is imported once.

    Args:
        db: Database instance
        data_dir: Path to meshcore config dir containing .msgs file
        device_name: Device name (used for .msgs filename and own message detection)

    Returns:
        On success, a dict with 'channel_messages', 'direct_messages',
        'skipped', 'errors' and 'files_processed' counts plus
        'status' == 'completed'. When no .msgs file exists at all,
        {'status': 'skipped', 'reason': 'no_msgs_files'}.
    """
    archives = _find_archive_files(data_dir, device_name)
    if archives:
        logger.info(f"Found {len(archives)} archive files to migrate")
    live = _find_msgs_file(data_dir, device_name)

    sources = list(archives)
    if live:
        sources.append(live)
    if not sources:
        logger.info("No .msgs files found, skipping v1 migration")
        return {'status': 'skipped', 'reason': 'no_msgs_files'}
    logger.info(f"Starting v1 data migration: {len(sources)} files to process")

    counters = ('channel_messages', 'direct_messages', 'skipped', 'errors')
    totals = dict.fromkeys(counters + ('files_processed',), 0)
    # One pair of dedup sets for the whole run, not per file.
    seen_channel, seen_dm = set(), set()
    for path in sources:
        per_file = _import_msgs_file(db, path, device_name, seen_channel, seen_dm)
        for name in counters:
            totals[name] += per_file[name]
        totals['files_processed'] += 1

    totals['status'] = 'completed'
    logger.info(
        f"v1 migration complete: {totals['files_processed']} files, "
        f"{totals['channel_messages']} channel msgs, "
        f"{totals['direct_messages']} DMs, {totals['skipped']} skipped, "
        f"{totals['errors']} errors"
    )
    return totals
def _dedup_key(entry: dict) -> tuple:
    """Return the duplicate-detection key for a v1 message entry.

    The same message stored in both an archive and the live file yields the
    same key, while distinct messages that merely share a timestamp and text
    (on another channel, with another peer, or in the other direction) do not.
    """
    msg_type = entry.get('type')
    ts = entry.get('timestamp', 0)
    text = entry.get('text') or ''
    if msg_type in ('CHAN', 'SENT_CHAN'):
        peer = entry.get('channel_idx', 0)
    elif msg_type == 'PRIV':
        peer = entry.get('pubkey_prefix', '')
    else:
        # SENT_MSG: v1 only records the recipient by name.
        peer = entry.get('recipient', entry.get('name', ''))
    return (msg_type, peer, ts, text)


def _import_msgs_file(db, msgs_file: Path, device_name: str,
                      seen_channel: set, seen_dm: set) -> dict:
    """Import a single v1 .msgs (JSONL) file. Returns per-file stats.

    Args:
        db: Database instance handed to the per-type migrate helpers.
        msgs_file: The .msgs file to read.
        device_name: Own device name, forwarded to the channel/outgoing helpers.
        seen_channel: Dedup keys of channel messages imported so far; shared
            across files and updated in place.
        seen_dm: Dedup keys of direct messages imported so far; shared across
            files and updated in place.

    Returns:
        dict with 'channel_messages', 'direct_messages', 'skipped' and
        'errors' counts. An unreadable file, undecodable lines, lines that
        are valid JSON but not an object, and per-entry migration failures
        are all counted as errors.
    """
    stats = {'channel_messages': 0, 'direct_messages': 0, 'skipped': 0, 'errors': 0}
    logger.info(f"Importing {msgs_file.name}...")
    try:
        content = msgs_file.read_text(encoding='utf-8', errors='replace')
    except Exception as e:
        logger.error(f"Failed to read {msgs_file}: {e}")
        stats['errors'] += 1
        return stats

    # JSONL records are separated by '\n' only. str.splitlines() would also
    # break on U+2028/U+2029/U+0085, which can appear unescaped inside JSON
    # strings (ensure_ascii=False) and would corrupt such records.
    for line_num, raw_line in enumerate(content.split('\n'), 1):
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            entry = json.loads(raw_line)
        except json.JSONDecodeError:
            stats['errors'] += 1
            continue
        # Valid JSON that is not an object (number, string, list, null) has
        # no .get(); count it instead of letting AttributeError abort the
        # whole migration.
        if not isinstance(entry, dict):
            stats['errors'] += 1
            continue
        msg_type = entry.get('type')
        try:
            if msg_type in ('CHAN', 'SENT_CHAN'):
                dedup = _dedup_key(entry)
                if dedup in seen_channel:
                    stats['skipped'] += 1
                    continue
                seen_channel.add(dedup)
                _migrate_channel_msg(db, entry, device_name)
                stats['channel_messages'] += 1
            elif msg_type == 'PRIV':
                dedup = _dedup_key(entry)
                if dedup in seen_dm:
                    stats['skipped'] += 1
                    continue
                seen_dm.add(dedup)
                _migrate_dm_incoming(db, entry)
                stats['direct_messages'] += 1
            elif msg_type == 'SENT_MSG':
                if entry.get('txt_type', 0) != 0:
                    # Only txt_type 0 SENT_MSG entries are private messages.
                    stats['skipped'] += 1
                    continue
                dedup = _dedup_key(entry)
                if dedup in seen_dm:
                    stats['skipped'] += 1
                    continue
                seen_dm.add(dedup)
                _migrate_dm_outgoing(db, entry, device_name)
                stats['direct_messages'] += 1
            else:
                stats['skipped'] += 1
        except Exception as e:
            stats['errors'] += 1
            # Cap per-file warnings so a badly broken file does not flood logs.
            if stats['errors'] <= 5:
                logger.warning(f"Migration error in {msgs_file.name} line {line_num}: {e}")

    logger.info(
        f"  {msgs_file.name}: {stats['channel_messages']} chan, "
        f"{stats['direct_messages']} DMs, {stats['skipped']} skip, "
        f"{stats['errors']} err"
    )
    return stats
def _migrate_channel_msg(db, entry: dict, device_name: str):
    """Insert one v1 channel entry (CHAN received, SENT_CHAN own) into the DB.

    Received CHAN text is stored by v1 as ``"SenderName: message"`` and is
    split on the first colon; text without a colon is attributed to
    ``'Unknown'``. Own (SENT_CHAN) messages use the entry's ``sender`` field,
    defaulting to *device_name*. Entries whose text is empty are ignored.
    """
    body = entry.get('text', '').strip()
    if not body:
        return

    own = entry.get('type') == 'SENT_CHAN'
    if own:
        author, message = entry.get('sender', device_name), body
    else:
        head, sep, tail = body.partition(':')
        if sep:
            author, message = head.strip(), tail.strip()
        else:
            author, message = 'Unknown', body

    db.insert_channel_message(
        channel_idx=entry.get('channel_idx', 0),
        sender=author,
        content=message,
        timestamp=entry.get('timestamp', 0),
        sender_timestamp=entry.get('sender_timestamp'),
        is_own=own,
        txt_type=entry.get('txt_type', 0),
        snr=entry.get('SNR'),
        path_len=entry.get('path_len'),
        pkt_payload=entry.get('pkt_payload'),
        raw_json=json.dumps(entry, default=str),
    )
def _migrate_dm_incoming(db, entry: dict):
    """Migrate a PRIV (incoming DM) entry.

    v1 PRIV entries identify the sender by a public-key prefix plus an
    optional display name. The prefix is resolved to a full key when a
    matching contact exists in the DB; otherwise the prefix itself is kept as
    the conversation key, so the message stays attached to its sender
    instead of being stored with no contact at all. When the entry carries a
    name, the contact is upserted with it so the conversation is labelled by
    name rather than by key.

    Entries with empty or missing text are ignored.
    """
    text = (entry.get('text') or '').strip()
    if not text:
        return
    pubkey_prefix = entry.get('pubkey_prefix') or ''
    sender_name = entry.get('name') or ''

    contact_key = None
    if pubkey_prefix:
        # _resolve_pubkey returns None for unknown contacts; fall back to the
        # raw prefix rather than dropping the sender association.
        contact_key = _resolve_pubkey(db, pubkey_prefix) or pubkey_prefix

    # Create/update contact with sender name from v1 data
    if contact_key and sender_name:
        db.upsert_contact(
            public_key=contact_key,
            name=sender_name,
            source='message',
        )
    db.insert_direct_message(
        contact_pubkey=contact_key,
        direction='in',
        content=text,
        timestamp=entry.get('timestamp', 0),
        sender_timestamp=entry.get('sender_timestamp'),
        txt_type=entry.get('txt_type', 0),
        snr=entry.get('SNR'),
        path_len=entry.get('path_len'),
        pkt_payload=entry.get('pkt_payload'),
        raw_json=json.dumps(entry, default=str),
    )
def _migrate_dm_outgoing(db, entry: dict, device_name: str):
    """Insert one v1 SENT_MSG (outgoing DM) entry into the database.

    v1 does not store the recipient's public key for outgoing DMs, only a
    name (``recipient``, falling back to ``name``). The key is looked up in
    the contacts table by that name and is None when no contact matches; the
    complete original entry is kept in ``raw_json`` either way.
    ``device_name`` is currently unused. Entries with empty text are ignored.
    """
    message = entry.get('text', '').strip()
    if not message:
        return
    recipient_name = entry.get('recipient', entry.get('name', ''))
    db.insert_direct_message(
        contact_pubkey=_lookup_pubkey_by_name(db, recipient_name),
        direction='out',
        content=message,
        timestamp=entry.get('timestamp', 0),
        sender_timestamp=entry.get('sender_timestamp'),
        txt_type=entry.get('txt_type', 0),
        expected_ack=entry.get('expected_ack'),
        pkt_payload=entry.get('pkt_payload'),
        raw_json=json.dumps(entry, default=str),
    )
def _resolve_pubkey(db, pubkey_prefix: str) -> Optional[str]:
    """Return the full public key of a contact whose key starts with *pubkey_prefix*.

    Matching is case-insensitive and the returned key is lower-cased. If
    several contacts share the prefix, the first one in ``db.get_contacts()``
    order wins.

    Returns None for an empty prefix, when no contact matches, or when the
    contacts cannot be read. The lookup is best-effort, so such a failure is
    logged at debug level rather than raised.
    """
    if not pubkey_prefix:
        return None
    try:
        prefix = pubkey_prefix.lower()
        for contact in db.get_contacts():
            full_key = (contact.get('public_key') or '').lower()
            if full_key and full_key.startswith(prefix):
                return full_key
    except Exception:
        logger.debug("Could not resolve pubkey prefix %r", pubkey_prefix, exc_info=True)
    return None
def _lookup_pubkey_by_name(db, name: str) -> Optional[str]:
    """Return the public_key of the first contact whose name equals *name*.

    The comparison is exact (case-sensitive). Returns None for an empty
    name, when no contact matches, or when the contacts cannot be read. The
    lookup is best-effort, so such a failure is logged at debug level rather
    than raised.
    """
    if not name:
        return None
    try:
        for contact in db.get_contacts():
            if contact.get('name') == name:
                return contact.get('public_key')
    except Exception:
        logger.debug("Could not look up contact %r by name", name, exc_info=True)
    return None
def should_migrate(db, data_dir: Path, device_name: str) -> bool:
    """Decide whether the v1 -> v2 migration should run.

    True only when at least one v1 .msgs file (live or archive) is present
    AND the database reports zero channel messages plus direct messages.
    Any failure while reading the database stats yields False.
    """
    live_present = _find_msgs_file(data_dir, device_name) is not None
    archives_present = bool(_find_archive_files(data_dir, device_name))
    if not (live_present or archives_present):
        return False
    # Never migrate into a database that already holds messages.
    try:
        db_stats = db.get_stats()
        message_total = (db_stats.get('channel_messages', 0)
                         + db_stats.get('direct_messages', 0))
        return message_total == 0
    except Exception:
        return False