From 752c60f02d3e50463bbff966ea1e7c2bbf631bfe Mon Sep 17 00:00:00 2001 From: MarekWo Date: Sun, 1 Mar 2026 13:05:33 +0100 Subject: [PATCH] feat(v2): Import archive .msgs files + DB-based message history Migration now imports all archive files (oldest first) in addition to the live .msgs file, with deduplication. Archives endpoint and message history now query SQLite by date instead of reading .msgs files. Co-Authored-By: Claude Opus 4.6 --- app/database.py | 30 ++++++++++ app/migrate_v1.py | 147 ++++++++++++++++++++++++++++++++++++++-------- app/routes/api.py | 49 ++++++++++++---- 3 files changed, 190 insertions(+), 36 deletions(-) diff --git a/app/database.py b/app/database.py index d2c7173..fd3cb01 100644 --- a/app/database.py +++ b/app/database.py @@ -223,6 +223,36 @@ class Database: rows = conn.execute(query, params).fetchall() return [dict(r) for r in rows] + def get_message_dates(self) -> List[Dict]: + """Get distinct dates that have channel messages, with counts. + Returns list of {'date': 'YYYY-MM-DD', 'message_count': N}, newest first.""" + with self._connect() as conn: + rows = conn.execute( + """SELECT date(timestamp, 'unixepoch', 'localtime') as date, + COUNT(*) as message_count + FROM channel_messages + WHERE timestamp > 0 + GROUP BY date + ORDER BY date DESC""" + ).fetchall() + return [dict(r) for r in rows] + + def get_channel_messages_by_date(self, date_str: str, + channel_idx: int = None) -> List[Dict]: + """Get channel messages for a specific date (YYYY-MM-DD).""" + with self._connect() as conn: + conditions = ["date(timestamp, 'unixepoch', 'localtime') = ?"] + params: list = [date_str] + + if channel_idx is not None: + conditions.append("channel_idx = ?") + params.append(channel_idx) + + where = " WHERE " + " AND ".join(conditions) + query = f"SELECT * FROM channel_messages{where} ORDER BY timestamp ASC" + rows = conn.execute(query, params).fetchall() + return [dict(r) for r in rows] + def delete_channel_messages(self, channel_idx: int) -> int: with self._connect() as conn: cursor = conn.execute( diff --git a/app/migrate_v1.py b/app/migrate_v1.py index c03e26c..14d09ed 100644 --- a/app/migrate_v1.py +++ b/app/migrate_v1.py @@ -5,22 +5,22 @@ Runs automatically on startup if .msgs file exists and database is empty. Can also be run manually: python -m app.migrate_v1 Migrates: +- Live .msgs file (today's messages) +- Archive .msgs files (historical messages) - Channel messages (CHAN, SENT_CHAN) - Direct messages (PRIV, SENT_MSG) """ import json import logging -import time from pathlib import Path -from datetime import datetime -from typing import Optional +from typing import Optional, List logger = logging.getLogger(__name__) def _find_msgs_file(data_dir: Path, device_name: str) -> Optional[Path]: - """Find the .msgs file for the given device name.""" + """Find the live .msgs file for the given device name.""" msgs_file = data_dir / f"{device_name}.msgs" if msgs_file.exists(): return msgs_file @@ -35,9 +35,41 @@ def _find_msgs_file(data_dir: Path, device_name: str) -> Optional[Path]: return None +def _find_archive_files(data_dir: Path, device_name: str) -> List[Path]: + """Find all archive .msgs files, sorted oldest first.""" + archive_files = [] + + # Check common archive locations + archive_dirs = [ + data_dir / 'archive', # /data/archive/ + data_dir.parent / 'archive', # sibling archive dir + ] + + for archive_dir in archive_dirs: + if archive_dir.exists(): + # Pattern: DeviceName.YYYY-MM-DD.msgs + for f in archive_dir.glob(f"{device_name}.*.msgs"): + # Validate it's an archive file (has date in name) + parts = f.stem.split('.') + if len(parts) >= 2: + archive_files.append(f) + + # Also check data_dir itself for archives + for f in data_dir.glob(f"{device_name}.*.msgs"): + parts = f.stem.split('.') + if len(parts) >= 2 and f not in archive_files: + archive_files.append(f) + + # Sort by filename (which sorts by date since format is Name.YYYY-MM-DD) + archive_files.sort(key=lambda f: f.name) + + return archive_files + + def migrate_v1_data(db, data_dir: Path, device_name: str) -> dict: """ Import v1 .msgs data into v2 SQLite database. + Imports both live .msgs file and all archive files. Args: db: Database instance @@ -47,26 +79,69 @@ def migrate_v1_data(db, data_dir: Path, device_name: str) -> dict: Returns: dict with migration stats """ - msgs_file = _find_msgs_file(data_dir, device_name) - if not msgs_file: - logger.info("No .msgs file found, skipping v1 migration") - return {'status': 'skipped', 'reason': 'no_msgs_file'} - stats = { 'channel_messages': 0, 'direct_messages': 0, 'skipped': 0, 'errors': 0, - 'file': str(msgs_file), + 'files_processed': 0, } - logger.info(f"Starting v1 data migration from {msgs_file}") + # Collect all files to import: archives first (oldest), then live + files_to_import = [] + + archive_files = _find_archive_files(data_dir, device_name) + if archive_files: + files_to_import.extend(archive_files) + logger.info(f"Found {len(archive_files)} archive files to migrate") + + live_file = _find_msgs_file(data_dir, device_name) + if live_file: + files_to_import.append(live_file) + + if not files_to_import: + logger.info("No .msgs files found, skipping v1 migration") + return {'status': 'skipped', 'reason': 'no_msgs_files'} + + logger.info(f"Starting v1 data migration: {len(files_to_import)} files to process") + + # Track seen timestamps+text to avoid duplicates across archive and live file + seen_channel = set() + seen_dm = set() + + for msgs_file in files_to_import: + file_stats = _import_msgs_file( + db, msgs_file, device_name, seen_channel, seen_dm + ) + stats['channel_messages'] += file_stats['channel_messages'] + stats['direct_messages'] += file_stats['direct_messages'] + stats['skipped'] += file_stats['skipped'] + stats['errors'] += file_stats['errors'] + stats['files_processed'] += 1 + + stats['status'] = 'completed' + logger.info( + f"v1 migration complete: {stats['files_processed']} files, " + f"{stats['channel_messages']} channel msgs, " + f"{stats['direct_messages']} DMs, {stats['skipped']} skipped, " + f"{stats['errors']} errors" + ) + return stats + + +def _import_msgs_file(db, msgs_file: Path, device_name: str, + seen_channel: set, seen_dm: set) -> dict: + """Import a single .msgs file. Returns per-file stats.""" + stats = {'channel_messages': 0, 'direct_messages': 0, 'skipped': 0, 'errors': 0} + + logger.info(f"Importing {msgs_file.name}...") try: lines = msgs_file.read_text(encoding='utf-8', errors='replace').splitlines() except Exception as e: - logger.error(f"Failed to read .msgs file: {e}") - return {'status': 'error', 'reason': str(e)} + logger.error(f"Failed to read {msgs_file}: {e}") + stats['errors'] += 1 + return stats for line_num, raw_line in enumerate(lines, 1): raw_line = raw_line.strip() @@ -83,13 +158,38 @@ def migrate_v1_data(db, data_dir: Path, device_name: str) -> dict: try: if msg_type in ('CHAN', 'SENT_CHAN'): + # Dedup key: timestamp + first 50 chars of text + ts = entry.get('timestamp', 0) + text = entry.get('text', '')[:50] + dedup = (ts, text) + if dedup in seen_channel: + stats['skipped'] += 1 + continue + seen_channel.add(dedup) + _migrate_channel_msg(db, entry, device_name) stats['channel_messages'] += 1 elif msg_type == 'PRIV': + ts = entry.get('timestamp', 0) + text = entry.get('text', '')[:50] + dedup = (ts, text) + if dedup in seen_dm: + stats['skipped'] += 1 + continue + seen_dm.add(dedup) + _migrate_dm_incoming(db, entry) stats['direct_messages'] += 1 elif msg_type == 'SENT_MSG': if entry.get('txt_type', 0) == 0: # Only private messages + ts = entry.get('timestamp', 0) + text = entry.get('text', '')[:50] + dedup = (ts, text) + if dedup in seen_dm: + stats['skipped'] += 1 + continue + seen_dm.add(dedup) + _migrate_dm_outgoing(db, entry, device_name) stats['direct_messages'] += 1 else: @@ -99,13 +199,12 @@ def migrate_v1_data(db, data_dir: Path, device_name: str) -> dict: except Exception as e: stats['errors'] += 1 if stats['errors'] <= 5: - logger.warning(f"Migration error on line {line_num}: {e}") + logger.warning(f"Migration error in {msgs_file.name} line {line_num}: {e}") - stats['status'] = 'completed' logger.info( - f"v1 migration complete: {stats['channel_messages']} channel msgs, " - f"{stats['direct_messages']} DMs, {stats['skipped']} skipped, " - f"{stats['errors']} errors" + f" {msgs_file.name}: {stats['channel_messages']} chan, " + f"{stats['direct_messages']} DMs, {stats['skipped']} skip, " + f"{stats['errors']} err" ) return stats @@ -182,8 +281,6 @@ def _migrate_dm_outgoing(db, entry: dict, device_name: str): return # For outgoing DMs, we don't have recipient pubkey in v1 data. - # Use empty string — the messages will appear once a contact with - # matching name is found. # In v1, conversation_id was "name_{recipient}" — we store the name # in raw_json for reference. recipient = entry.get('recipient', entry.get('name', '')) @@ -235,9 +332,13 @@ def _lookup_pubkey_by_name(db, name: str) -> Optional[str]: def should_migrate(db, data_dir: Path, device_name: str) -> bool: - """Check if migration is needed: .msgs exists and DB has no messages.""" - msgs_file = _find_msgs_file(data_dir, device_name) - if not msgs_file: + """Check if migration is needed: .msgs files exist and DB has no messages.""" + # Check for live file + has_live = _find_msgs_file(data_dir, device_name) is not None + # Check for archive files + has_archives = len(_find_archive_files(data_dir, device_name)) > 0 + + if not has_live and not has_archives: return False # Only migrate if DB is empty (no channel messages and no DMs) diff --git a/app/routes/api.py b/app/routes/api.py index 996b751..247d540 100644 --- a/app/routes/api.py +++ b/app/routes/api.py @@ -344,14 +344,23 @@ def get_messages(): # v2: Read messages from Database db = _get_db() - if db and not archive_date: - ch = channel_idx if channel_idx is not None else 0 - db_messages = db.get_channel_messages( - channel_idx=ch, - limit=limit or 50, - offset=offset, - days=days, - ) + if db: + if archive_date: + # Archive view: query by specific date + db_messages = db.get_channel_messages_by_date( + date_str=archive_date, + channel_idx=channel_idx, + ) + else: + # Live view: query with limit/offset/days + ch = channel_idx if channel_idx is not None else 0 + db_messages = db.get_channel_messages( + channel_idx=ch, + limit=limit or 50, + offset=offset, + days=days, + ) + # Convert DB rows to frontend-compatible format messages = [] for row in db_messages: @@ -378,7 +387,7 @@ def get_messages(): msg['echo_count'] = len(echoes) msg['echo_paths'] = [e.get('path', '') for e in echoes] else: - # Fallback to parser for archive reads + # Fallback to parser for file-based reads messages = parser.read_messages( limit=limit, offset=offset, @@ -1065,15 +1074,29 @@ def sync_messages(): @api_bp.route('/archives', methods=['GET']) def get_archives(): """ - Get list of available message archives. + Get list of available message archives (dates with messages). Returns: JSON with list of archives, each with: - - date (str): Archive date in YYYY-MM-DD format - - message_count (int): Number of messages in archive - - file_size (int): Archive file size in bytes + - date (str): Date in YYYY-MM-DD format + - message_count (int): Number of messages on that date """ try: + # v2: Query distinct dates from SQLite + db = _get_db() + if db: + dates = db.get_message_dates() + # Exclude today (that's "Live") + from datetime import date as date_cls + today = date_cls.today().isoformat() + archives = [d for d in dates if d['date'] != today] + return jsonify({ + 'success': True, + 'archives': archives, + 'count': len(archives) + }), 200 + + # Fallback to file-based archives archives = archive_manager.list_archives() return jsonify({