mirror of
https://github.com/MarekWo/mc-webui.git
synced 2026-03-28 17:42:45 +01:00
- DM handler: don't overwrite contact name with prefix when name unknown - Migration: upsert contact with sender name from v1 PRIV entries - Fixes conversations showing "4e45565e" instead of "demo mc-webui" Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
360 lines
12 KiB
Python
360 lines
12 KiB
Python
"""
|
|
Migrate v1 data (.msgs JSONL) into v2 SQLite database.
|
|
|
|
Runs automatically on startup if .msgs file exists and database is empty.
|
|
Can also be run manually: python -m app.migrate_v1
|
|
|
|
Migrates:
|
|
- Live .msgs file (today's messages)
|
|
- Archive .msgs files (historical messages)
|
|
- Channel messages (CHAN, SENT_CHAN)
|
|
- Direct messages (PRIV, SENT_MSG)
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Optional, List
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _find_msgs_file(data_dir: Path, device_name: str) -> Optional[Path]:
|
|
"""Find the live .msgs file for the given device name."""
|
|
msgs_file = data_dir / f"{device_name}.msgs"
|
|
if msgs_file.exists():
|
|
return msgs_file
|
|
|
|
# Try to find any .msgs file in the data dir
|
|
candidates = list(data_dir.glob("*.msgs"))
|
|
# Exclude archive files (pattern: name.YYYY-MM-DD.msgs)
|
|
live_files = [f for f in candidates if f.stem.count('.') == 0]
|
|
if len(live_files) == 1:
|
|
return live_files[0]
|
|
|
|
return None
|
|
|
|
|
|
def _find_archive_files(data_dir: Path, device_name: str) -> List[Path]:
|
|
"""Find all archive .msgs files, sorted oldest first."""
|
|
archive_files = []
|
|
|
|
# Check common archive locations
|
|
archive_dirs = [
|
|
data_dir / 'archive', # /data/archive/
|
|
data_dir.parent / 'archive', # sibling archive dir
|
|
]
|
|
|
|
for archive_dir in archive_dirs:
|
|
if archive_dir.exists():
|
|
# Pattern: DeviceName.YYYY-MM-DD.msgs
|
|
for f in archive_dir.glob(f"{device_name}.*.msgs"):
|
|
# Validate it's an archive file (has date in name)
|
|
parts = f.stem.split('.')
|
|
if len(parts) >= 2:
|
|
archive_files.append(f)
|
|
|
|
# Also check data_dir itself for archives
|
|
for f in data_dir.glob(f"{device_name}.*.msgs"):
|
|
parts = f.stem.split('.')
|
|
if len(parts) >= 2 and f not in archive_files:
|
|
archive_files.append(f)
|
|
|
|
# Sort by filename (which sorts by date since format is Name.YYYY-MM-DD)
|
|
archive_files.sort(key=lambda f: f.name)
|
|
|
|
return archive_files
|
|
|
|
|
|
def migrate_v1_data(db, data_dir: Path, device_name: str) -> dict:
|
|
"""
|
|
Import v1 .msgs data into v2 SQLite database.
|
|
Imports both live .msgs file and all archive files.
|
|
|
|
Args:
|
|
db: Database instance
|
|
data_dir: Path to meshcore config dir containing .msgs file
|
|
device_name: Device name (used for .msgs filename and own message detection)
|
|
|
|
Returns:
|
|
dict with migration stats
|
|
"""
|
|
stats = {
|
|
'channel_messages': 0,
|
|
'direct_messages': 0,
|
|
'skipped': 0,
|
|
'errors': 0,
|
|
'files_processed': 0,
|
|
}
|
|
|
|
# Collect all files to import: archives first (oldest), then live
|
|
files_to_import = []
|
|
|
|
archive_files = _find_archive_files(data_dir, device_name)
|
|
if archive_files:
|
|
files_to_import.extend(archive_files)
|
|
logger.info(f"Found {len(archive_files)} archive files to migrate")
|
|
|
|
live_file = _find_msgs_file(data_dir, device_name)
|
|
if live_file:
|
|
files_to_import.append(live_file)
|
|
|
|
if not files_to_import:
|
|
logger.info("No .msgs files found, skipping v1 migration")
|
|
return {'status': 'skipped', 'reason': 'no_msgs_files'}
|
|
|
|
logger.info(f"Starting v1 data migration: {len(files_to_import)} files to process")
|
|
|
|
# Track seen timestamps+text to avoid duplicates across archive and live file
|
|
seen_channel = set()
|
|
seen_dm = set()
|
|
|
|
for msgs_file in files_to_import:
|
|
file_stats = _import_msgs_file(
|
|
db, msgs_file, device_name, seen_channel, seen_dm
|
|
)
|
|
stats['channel_messages'] += file_stats['channel_messages']
|
|
stats['direct_messages'] += file_stats['direct_messages']
|
|
stats['skipped'] += file_stats['skipped']
|
|
stats['errors'] += file_stats['errors']
|
|
stats['files_processed'] += 1
|
|
|
|
stats['status'] = 'completed'
|
|
logger.info(
|
|
f"v1 migration complete: {stats['files_processed']} files, "
|
|
f"{stats['channel_messages']} channel msgs, "
|
|
f"{stats['direct_messages']} DMs, {stats['skipped']} skipped, "
|
|
f"{stats['errors']} errors"
|
|
)
|
|
return stats
|
|
|
|
|
|
def _import_msgs_file(db, msgs_file: Path, device_name: str,
|
|
seen_channel: set, seen_dm: set) -> dict:
|
|
"""Import a single .msgs file. Returns per-file stats."""
|
|
stats = {'channel_messages': 0, 'direct_messages': 0, 'skipped': 0, 'errors': 0}
|
|
|
|
logger.info(f"Importing {msgs_file.name}...")
|
|
|
|
try:
|
|
lines = msgs_file.read_text(encoding='utf-8', errors='replace').splitlines()
|
|
except Exception as e:
|
|
logger.error(f"Failed to read {msgs_file}: {e}")
|
|
stats['errors'] += 1
|
|
return stats
|
|
|
|
for line_num, raw_line in enumerate(lines, 1):
|
|
raw_line = raw_line.strip()
|
|
if not raw_line:
|
|
continue
|
|
|
|
try:
|
|
entry = json.loads(raw_line)
|
|
except json.JSONDecodeError:
|
|
stats['errors'] += 1
|
|
continue
|
|
|
|
msg_type = entry.get('type')
|
|
|
|
try:
|
|
if msg_type in ('CHAN', 'SENT_CHAN'):
|
|
# Dedup key: timestamp + first 50 chars of text
|
|
ts = entry.get('timestamp', 0)
|
|
text = entry.get('text', '')[:50]
|
|
dedup = (ts, text)
|
|
if dedup in seen_channel:
|
|
stats['skipped'] += 1
|
|
continue
|
|
seen_channel.add(dedup)
|
|
|
|
_migrate_channel_msg(db, entry, device_name)
|
|
stats['channel_messages'] += 1
|
|
elif msg_type == 'PRIV':
|
|
ts = entry.get('timestamp', 0)
|
|
text = entry.get('text', '')[:50]
|
|
dedup = (ts, text)
|
|
if dedup in seen_dm:
|
|
stats['skipped'] += 1
|
|
continue
|
|
seen_dm.add(dedup)
|
|
|
|
_migrate_dm_incoming(db, entry)
|
|
stats['direct_messages'] += 1
|
|
elif msg_type == 'SENT_MSG':
|
|
if entry.get('txt_type', 0) == 0: # Only private messages
|
|
ts = entry.get('timestamp', 0)
|
|
text = entry.get('text', '')[:50]
|
|
dedup = (ts, text)
|
|
if dedup in seen_dm:
|
|
stats['skipped'] += 1
|
|
continue
|
|
seen_dm.add(dedup)
|
|
|
|
_migrate_dm_outgoing(db, entry, device_name)
|
|
stats['direct_messages'] += 1
|
|
else:
|
|
stats['skipped'] += 1
|
|
else:
|
|
stats['skipped'] += 1
|
|
except Exception as e:
|
|
stats['errors'] += 1
|
|
if stats['errors'] <= 5:
|
|
logger.warning(f"Migration error in {msgs_file.name} line {line_num}: {e}")
|
|
|
|
logger.info(
|
|
f" {msgs_file.name}: {stats['channel_messages']} chan, "
|
|
f"{stats['direct_messages']} DMs, {stats['skipped']} skip, "
|
|
f"{stats['errors']} err"
|
|
)
|
|
return stats
|
|
|
|
|
|
def _migrate_channel_msg(db, entry: dict, device_name: str):
|
|
"""Migrate a CHAN or SENT_CHAN entry."""
|
|
raw_text = entry.get('text', '').strip()
|
|
if not raw_text:
|
|
return
|
|
|
|
is_own = entry.get('type') == 'SENT_CHAN'
|
|
channel_idx = entry.get('channel_idx', 0)
|
|
timestamp = entry.get('timestamp', 0)
|
|
|
|
if is_own:
|
|
sender = entry.get('sender', device_name)
|
|
content = raw_text
|
|
else:
|
|
# Parse sender from "SenderName: message" format
|
|
if ':' in raw_text:
|
|
sender, content = raw_text.split(':', 1)
|
|
sender = sender.strip()
|
|
content = content.strip()
|
|
else:
|
|
sender = 'Unknown'
|
|
content = raw_text
|
|
|
|
db.insert_channel_message(
|
|
channel_idx=channel_idx,
|
|
sender=sender,
|
|
content=content,
|
|
timestamp=timestamp,
|
|
sender_timestamp=entry.get('sender_timestamp'),
|
|
is_own=is_own,
|
|
txt_type=entry.get('txt_type', 0),
|
|
snr=entry.get('SNR'),
|
|
path_len=entry.get('path_len'),
|
|
pkt_payload=entry.get('pkt_payload'),
|
|
raw_json=json.dumps(entry, default=str),
|
|
)
|
|
|
|
|
|
def _migrate_dm_incoming(db, entry: dict):
|
|
"""Migrate a PRIV (incoming DM) entry."""
|
|
text = entry.get('text', '').strip()
|
|
if not text:
|
|
return
|
|
|
|
pubkey_prefix = entry.get('pubkey_prefix', '')
|
|
sender_name = entry.get('name', '')
|
|
|
|
# Resolve prefix to full key if contact exists
|
|
contact_key = pubkey_prefix if pubkey_prefix else None
|
|
if contact_key:
|
|
contact_key = _resolve_pubkey(db, contact_key)
|
|
|
|
# Create/update contact with sender name from v1 data
|
|
if contact_key and sender_name:
|
|
db.upsert_contact(
|
|
public_key=contact_key,
|
|
name=sender_name,
|
|
source='message',
|
|
)
|
|
|
|
db.insert_direct_message(
|
|
contact_pubkey=contact_key,
|
|
direction='in',
|
|
content=text,
|
|
timestamp=entry.get('timestamp', 0),
|
|
sender_timestamp=entry.get('sender_timestamp'),
|
|
txt_type=entry.get('txt_type', 0),
|
|
snr=entry.get('SNR'),
|
|
path_len=entry.get('path_len'),
|
|
pkt_payload=entry.get('pkt_payload'),
|
|
raw_json=json.dumps(entry, default=str),
|
|
)
|
|
|
|
|
|
def _migrate_dm_outgoing(db, entry: dict, device_name: str):
|
|
"""Migrate a SENT_MSG (outgoing DM) entry."""
|
|
text = entry.get('text', '').strip()
|
|
if not text:
|
|
return
|
|
|
|
# For outgoing DMs, we don't have recipient pubkey in v1 data.
|
|
# In v1, conversation_id was "name_{recipient}" — we store the name
|
|
# in raw_json for reference.
|
|
recipient = entry.get('recipient', entry.get('name', ''))
|
|
|
|
# Try to find pubkey from contacts table by recipient name
|
|
contact_pubkey = _lookup_pubkey_by_name(db, recipient)
|
|
|
|
db.insert_direct_message(
|
|
contact_pubkey=contact_pubkey,
|
|
direction='out',
|
|
content=text,
|
|
timestamp=entry.get('timestamp', 0),
|
|
sender_timestamp=entry.get('sender_timestamp'),
|
|
txt_type=entry.get('txt_type', 0),
|
|
expected_ack=entry.get('expected_ack'),
|
|
pkt_payload=entry.get('pkt_payload'),
|
|
raw_json=json.dumps(entry, default=str),
|
|
)
|
|
|
|
|
|
def _resolve_pubkey(db, pubkey_prefix: str) -> Optional[str]:
|
|
"""Check if a pubkey prefix matches a contact. Returns full key or None."""
|
|
if not pubkey_prefix:
|
|
return None
|
|
try:
|
|
contacts = db.get_contacts()
|
|
prefix = pubkey_prefix.lower()
|
|
for c in contacts:
|
|
pk = (c.get('public_key') or '').lower()
|
|
if pk and pk.startswith(prefix):
|
|
return pk
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def _lookup_pubkey_by_name(db, name: str) -> Optional[str]:
|
|
"""Look up a contact's public_key by name. Returns None if not found."""
|
|
if not name:
|
|
return None
|
|
try:
|
|
contacts = db.get_contacts()
|
|
for c in contacts:
|
|
if c.get('name') == name:
|
|
return c.get('public_key')
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def should_migrate(db, data_dir: Path, device_name: str) -> bool:
|
|
"""Check if migration is needed: .msgs files exist and DB has no messages."""
|
|
# Check for live file
|
|
has_live = _find_msgs_file(data_dir, device_name) is not None
|
|
# Check for archive files
|
|
has_archives = len(_find_archive_files(data_dir, device_name)) > 0
|
|
|
|
if not has_live and not has_archives:
|
|
return False
|
|
|
|
# Only migrate if DB is empty (no channel messages and no DMs)
|
|
try:
|
|
stats = db.get_stats()
|
|
total = stats.get('channel_messages', 0) + stats.get('direct_messages', 0)
|
|
return total == 0
|
|
except Exception:
|
|
return False
|