forked from iarv/Remote-Terminal-for-MeshCore
Add database-level payload dedupe
This commit is contained in:
@@ -215,8 +215,6 @@ All endpoints are prefixed with `/api` (e.g., `/api/health`).
|
||||
| POST | `/api/packets/decrypt/historical` | Decrypt stored packets |
|
||||
| GET | `/api/packets/decrypt/progress` | Get historical decryption progress |
|
||||
| POST | `/api/packets/maintenance` | Delete old packets (cleanup) |
|
||||
| POST | `/api/packets/dedup` | Remove duplicate raw packets |
|
||||
| GET | `/api/packets/dedup/progress` | Get deduplication progress |
|
||||
| POST | `/api/contacts/{key}/mark-read` | Mark contact conversation as read |
|
||||
| POST | `/api/channels/{key}/mark-read` | Mark channel as read |
|
||||
| POST | `/api/read-state/mark-all-read` | Mark all conversations as read |
|
||||
|
||||
@@ -50,12 +50,14 @@ CREATE TABLE IF NOT EXISTS raw_packets (
|
||||
timestamp INTEGER NOT NULL,
|
||||
data BLOB NOT NULL,
|
||||
message_id INTEGER,
|
||||
payload_hash TEXT,
|
||||
FOREIGN KEY (message_id) REFERENCES messages(id)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_conversation ON messages(type, conversation_key);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_received ON messages(received_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_raw_packets_message_id ON raw_packets(message_id);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_raw_packets_payload_hash ON raw_packets(payload_hash);
|
||||
CREATE INDEX IF NOT EXISTS idx_contacts_on_radio ON contacts(on_radio);
|
||||
"""
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ so all migrations run in order on first startup after upgrade.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from hashlib import sha256
|
||||
|
||||
import aiosqlite
|
||||
|
||||
@@ -57,6 +58,20 @@ async def run_migrations(conn: aiosqlite.Connection) -> int:
|
||||
await set_version(conn, 3)
|
||||
applied += 1
|
||||
|
||||
# Migration 4: Add payload_hash column for deduplication
|
||||
if version < 4:
|
||||
logger.info("Applying migration 4: add payload_hash column")
|
||||
await _migrate_004_add_payload_hash_column(conn)
|
||||
await set_version(conn, 4)
|
||||
applied += 1
|
||||
|
||||
# Migration 5: Backfill payload hashes and deduplicate existing packets
|
||||
if version < 5:
|
||||
logger.info("Applying migration 5: backfill payload hashes and dedupe")
|
||||
await _migrate_005_backfill_payload_hashes(conn)
|
||||
await set_version(conn, 5)
|
||||
applied += 1
|
||||
|
||||
if applied > 0:
|
||||
logger.info(
|
||||
"Applied %d migration(s), schema now at version %d", applied, await get_version(conn)
|
||||
@@ -169,3 +184,167 @@ async def _migrate_003_drop_decrypted_column(conn: aiosqlite.Connection) -> None
|
||||
raise
|
||||
|
||||
await conn.commit()
|
||||
|
||||
|
||||
async def _migrate_004_add_payload_hash_column(conn: aiosqlite.Connection) -> None:
    """
    Add a nullable ``payload_hash`` TEXT column to the ``raw_packets`` table.

    The column is intended to hold the SHA-256 hash of a packet's payload
    (routing/path bytes excluded) so that a unique index can later reject
    duplicate packets. If the column is already present the ALTER TABLE error
    is swallowed, which makes the migration safe to re-run.

    Args:
        conn: Open database connection; the change is committed before returning.

    Raises:
        aiosqlite.OperationalError: If ALTER TABLE fails for any reason other
            than the column already existing.
    """
    try:
        await conn.execute("ALTER TABLE raw_packets ADD COLUMN payload_hash TEXT")
    except aiosqlite.OperationalError as exc:
        # Only the "column is already there" failure is benign; propagate anything else.
        if "duplicate column name" not in str(exc).lower():
            raise
        logger.debug("raw_packets.payload_hash already exists, skipping")
    else:
        logger.debug("Added payload_hash column to raw_packets table")

    await conn.commit()
|
||||
|
||||
|
||||
def _extract_payload_for_hash(raw_packet: bytes) -> bytes | None:
|
||||
"""
|
||||
Extract payload from a raw packet for hashing (migration-local copy of decoder logic).
|
||||
|
||||
Returns the payload bytes, or None if packet is malformed.
|
||||
"""
|
||||
if len(raw_packet) < 2:
|
||||
return None
|
||||
|
||||
try:
|
||||
header = raw_packet[0]
|
||||
route_type = header & 0x03
|
||||
offset = 1
|
||||
|
||||
# Skip transport codes if present (TRANSPORT_FLOOD=0, TRANSPORT_DIRECT=3)
|
||||
if route_type in (0x00, 0x03):
|
||||
if len(raw_packet) < offset + 4:
|
||||
return None
|
||||
offset += 4
|
||||
|
||||
# Get path length
|
||||
if len(raw_packet) < offset + 1:
|
||||
return None
|
||||
path_length = raw_packet[offset]
|
||||
offset += 1
|
||||
|
||||
# Skip path bytes
|
||||
if len(raw_packet) < offset + path_length:
|
||||
return None
|
||||
offset += path_length
|
||||
|
||||
# Rest is payload (may be empty, matching decoder.py behavior)
|
||||
return raw_packet[offset:]
|
||||
except (IndexError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
async def _migrate_005_backfill_payload_hashes(conn: aiosqlite.Connection) -> None:
    """
    Populate ``payload_hash`` for stored packets, drop duplicates, add the unique index.

    When at least one row has a NULL ``payload_hash``, every row of
    ``raw_packets`` is scanned in ascending id order. The first row seen for a
    given hash is kept and receives the hash; later rows with the same hash
    are deleted. The hash is the SHA-256 hex digest of the extracted payload,
    or of the whole packet when the payload is empty or cannot be extracted.
    Updates and deletes are committed together after the scan.

    The unique index on ``payload_hash`` is created (if missing) whether or
    not any backfill work was needed.

    Args:
        conn: Open database connection.

    Raises:
        aiosqlite.OperationalError: If index creation fails for a reason other
            than the index already existing.
    """
    count_cursor = await conn.execute(
        "SELECT COUNT(*) FROM raw_packets WHERE payload_hash IS NULL"
    )
    count_row = await count_cursor.fetchone()
    total = count_row[0] if count_row else 0

    if total:
        logger.info("Backfilling payload hashes for %d packets. This may take a while...", total)

        fetch_size = 1000  # rows pulled from the cursor per round trip
        delete_chunk = 500  # ids per DELETE, to stay under SQLite's bound-variable limit
        log_every = 10000  # progress-log cadence

        known_hashes: set[str] = set()
        keepers: list[tuple[str, int]] = []  # (hash, id) for rows that survive
        doomed_ids: list[int] = []  # ids of rows whose hash was already seen
        processed = 0

        # Pass 1: hash every packet; ascending id order means the oldest copy wins.
        scan = await conn.execute("SELECT id, data FROM raw_packets ORDER BY id ASC")
        while rows := await scan.fetchmany(fetch_size):
            for packet_id, blob in rows:
                packet_data = bytes(blob)
                payload = _extract_payload_for_hash(packet_data)
                # Empty or unparseable payload: fall back to hashing the full packet.
                digest = sha256(payload or packet_data).hexdigest()

                if digest in known_hashes:
                    doomed_ids.append(packet_id)
                else:
                    known_hashes.add(digest)
                    keepers.append((digest, packet_id))

                processed += 1
                if processed % log_every == 0:
                    logger.info("Processed %d/%d packets...", processed, total)

        # Pass 2: write the hash onto each surviving row.
        total_updates = len(keepers)
        logger.info("Updating %d packets with hashes...", total_updates)
        for done, (digest, packet_id) in enumerate(keepers, start=1):
            await conn.execute(
                "UPDATE raw_packets SET payload_hash = ? WHERE id = ?",
                (digest, packet_id),
            )
            if done % log_every == 0:
                logger.info("Updated %d/%d packets...", done, total_updates)

        # Pass 3: remove the duplicate rows in bounded chunks.
        total_deletes = len(doomed_ids)
        if total_deletes:
            logger.info("Removing %d duplicate packets...", total_deletes)
            removed = 0
            for start in range(0, total_deletes, delete_chunk):
                chunk = doomed_ids[start : start + delete_chunk]
                marks = ",".join(["?"] * len(chunk))
                await conn.execute(f"DELETE FROM raw_packets WHERE id IN ({marks})", chunk)
                removed += len(chunk)
                if removed % log_every < delete_chunk:  # Log roughly every 10k
                    logger.info("Removed %d/%d duplicates...", removed, total_deletes)

        await conn.commit()
        logger.info(
            "Hash backfill complete: %d packets updated, %d duplicates removed",
            total_updates,
            total_deletes,
        )
    else:
        logger.debug("No packets need hash backfill")

    # The unique index is what enforces deduplication for rows inserted from now on.
    try:
        await conn.execute(
            "CREATE UNIQUE INDEX IF NOT EXISTS idx_raw_packets_payload_hash "
            "ON raw_packets(payload_hash)"
        )
    except aiosqlite.OperationalError as exc:
        if "already exists" not in str(exc).lower():
            raise
    else:
        logger.debug("Created unique index on payload_hash")

    await conn.commit()
|
||||
|
||||
@@ -155,10 +155,15 @@ async def process_raw_packet(
|
||||
Process an incoming raw packet.
|
||||
|
||||
This is the main entry point for all incoming RF packets.
|
||||
|
||||
Note: Packets are deduplicated by payload hash in the database. If we receive
|
||||
a duplicate packet (same payload, different path), we still broadcast it to
|
||||
the frontend (for the real-time packet feed) but skip decryption processing
|
||||
since the original packet was already processed.
|
||||
"""
|
||||
ts = timestamp or int(time.time())
|
||||
|
||||
packet_id = await RawPacketRepository.create(raw_bytes, ts)
|
||||
packet_id, is_new_packet = await RawPacketRepository.create(raw_bytes, ts)
|
||||
raw_hex = raw_bytes.hex()
|
||||
|
||||
# Parse packet to get type
|
||||
@@ -179,22 +184,25 @@ async def process_raw_packet(
|
||||
"sender": None,
|
||||
}
|
||||
|
||||
# Try to decrypt/parse based on payload type
|
||||
if payload_type == PayloadType.GROUP_TEXT:
|
||||
decrypt_result = await _process_group_text(raw_bytes, packet_id, ts, packet_info)
|
||||
if decrypt_result:
|
||||
result.update(decrypt_result)
|
||||
# Only process new packets - duplicates were already processed when first received
|
||||
if is_new_packet:
|
||||
# Try to decrypt/parse based on payload type
|
||||
if payload_type == PayloadType.GROUP_TEXT:
|
||||
decrypt_result = await _process_group_text(raw_bytes, packet_id, ts, packet_info)
|
||||
if decrypt_result:
|
||||
result.update(decrypt_result)
|
||||
|
||||
elif payload_type == PayloadType.ADVERT:
|
||||
await _process_advertisement(raw_bytes, ts, packet_info)
|
||||
elif payload_type == PayloadType.ADVERT:
|
||||
await _process_advertisement(raw_bytes, ts, packet_info)
|
||||
|
||||
# TODO: Add TEXT_MESSAGE (direct message) decryption when private key is available
|
||||
# elif payload_type == PayloadType.TEXT_MESSAGE:
|
||||
# decrypt_result = await _process_direct_message(raw_bytes, packet_id, ts, packet_info)
|
||||
# if decrypt_result:
|
||||
# result.update(decrypt_result)
|
||||
# TODO: Add TEXT_MESSAGE (direct message) decryption when private key is available
|
||||
# elif payload_type == PayloadType.TEXT_MESSAGE:
|
||||
# decrypt_result = await _process_direct_message(raw_bytes, packet_id, ts, packet_info)
|
||||
# if decrypt_result:
|
||||
# result.update(decrypt_result)
|
||||
|
||||
# Broadcast raw packet for the packet feed UI
|
||||
# Always broadcast raw packet for the packet feed UI (even duplicates)
|
||||
# This enables the frontend cracker to see all incoming packets in real-time
|
||||
broadcast_payload = RawPacketBroadcast(
|
||||
id=packet_id,
|
||||
timestamp=ts,
|
||||
|
||||
@@ -1,9 +1,15 @@
|
||||
import logging
|
||||
import sqlite3
|
||||
import time
|
||||
from hashlib import sha256
|
||||
from typing import Any
|
||||
|
||||
from app.database import db
|
||||
from app.decoder import extract_payload
|
||||
from app.models import Channel, Contact, Message, RawPacket
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ContactRepository:
|
||||
@staticmethod
|
||||
@@ -454,17 +460,62 @@ class MessageRepository:
|
||||
|
||||
class RawPacketRepository:
|
||||
@staticmethod
|
||||
async def create(data: bytes, timestamp: int | None = None) -> int:
|
||||
"""Create a raw packet. Returns the packet ID."""
|
||||
async def create(data: bytes, timestamp: int | None = None) -> tuple[int, bool]:
|
||||
"""
|
||||
Create a raw packet with payload-based deduplication.
|
||||
|
||||
Returns (packet_id, is_new) tuple:
|
||||
- is_new=True: New packet stored, packet_id is the new row ID
|
||||
- is_new=False: Duplicate payload detected, packet_id is the existing row ID
|
||||
|
||||
Deduplication is based on the SHA-256 hash of the packet payload
|
||||
(excluding routing/path information).
|
||||
"""
|
||||
ts = timestamp or int(time.time())
|
||||
|
||||
# Compute payload hash for deduplication
|
||||
payload = extract_payload(data)
|
||||
if payload:
|
||||
payload_hash = sha256(payload).hexdigest()
|
||||
else:
|
||||
# For malformed packets, hash the full data
|
||||
payload_hash = sha256(data).hexdigest()
|
||||
|
||||
# Check if this payload already exists
|
||||
cursor = await db.conn.execute(
|
||||
"INSERT INTO raw_packets (timestamp, data) VALUES (?, ?)",
|
||||
(ts, data),
|
||||
"SELECT id FROM raw_packets WHERE payload_hash = ?", (payload_hash,)
|
||||
)
|
||||
await db.conn.commit()
|
||||
assert cursor.lastrowid is not None # INSERT always returns a row ID
|
||||
return cursor.lastrowid
|
||||
existing = await cursor.fetchone()
|
||||
|
||||
if existing:
|
||||
# Duplicate - return existing packet ID
|
||||
return (existing["id"], False)
|
||||
|
||||
# New packet - insert with hash
|
||||
try:
|
||||
cursor = await db.conn.execute(
|
||||
"INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)",
|
||||
(ts, data, payload_hash),
|
||||
)
|
||||
await db.conn.commit()
|
||||
assert cursor.lastrowid is not None # INSERT always returns a row ID
|
||||
return (cursor.lastrowid, True)
|
||||
except sqlite3.IntegrityError:
|
||||
# Race condition: another insert with same payload_hash happened between
|
||||
# our SELECT and INSERT. This is expected for duplicate packets arriving
|
||||
# close together. Query again to get the existing ID.
|
||||
logger.debug(
|
||||
"Duplicate packet detected via race condition (payload_hash=%s), dropping",
|
||||
payload_hash[:16],
|
||||
)
|
||||
cursor = await db.conn.execute(
|
||||
"SELECT id FROM raw_packets WHERE payload_hash = ?", (payload_hash,)
|
||||
)
|
||||
existing = await cursor.fetchone()
|
||||
if existing:
|
||||
return (existing["id"], False)
|
||||
# This shouldn't happen, but if it does, re-raise
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
async def get_undecrypted_count() -> int:
|
||||
|
||||
@@ -5,7 +5,7 @@ from fastapi import APIRouter, BackgroundTasks
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from app.database import db
|
||||
from app.decoder import extract_payload, try_decrypt_packet_with_channel_key
|
||||
from app.decoder import try_decrypt_packet_with_channel_key
|
||||
from app.packet_processor import create_message_from_decrypted
|
||||
from app.repository import RawPacketRepository
|
||||
|
||||
@@ -209,131 +209,3 @@ async def run_maintenance(request: MaintenanceRequest) -> MaintenanceResult:
|
||||
logger.info("Database vacuumed")
|
||||
|
||||
return MaintenanceResult(packets_deleted=deleted, vacuumed=True)
|
||||
|
||||
|
||||
class DedupProgress(BaseModel):
|
||||
total: int
|
||||
processed: int
|
||||
duplicates_removed: int
|
||||
in_progress: bool
|
||||
|
||||
|
||||
class DedupResult(BaseModel):
|
||||
started: bool
|
||||
total_packets: int
|
||||
message: str
|
||||
|
||||
|
||||
# Global state for tracking dedup progress
|
||||
_dedup_progress: DedupProgress | None = None
|
||||
|
||||
|
||||
async def _run_payload_dedup() -> None:
|
||||
"""Background task to remove duplicate-payload packets."""
|
||||
global _dedup_progress
|
||||
|
||||
# Get all undecrypted packets
|
||||
packets = await RawPacketRepository.get_all_undecrypted()
|
||||
total = len(packets)
|
||||
|
||||
_dedup_progress = DedupProgress(
|
||||
total=total, processed=0, duplicates_removed=0, in_progress=True
|
||||
)
|
||||
|
||||
logger.info("Starting payload deduplication of %d packets", total)
|
||||
|
||||
# Group packets by payload hash
|
||||
payload_groups: dict[str, list[int]] = {} # hash -> list of packet IDs
|
||||
|
||||
for packet_id, packet_data, _timestamp in packets:
|
||||
payload = extract_payload(packet_data)
|
||||
if payload is None:
|
||||
continue
|
||||
|
||||
payload_hash = sha256(payload).hexdigest()
|
||||
|
||||
if payload_hash not in payload_groups:
|
||||
payload_groups[payload_hash] = []
|
||||
payload_groups[payload_hash].append(packet_id)
|
||||
|
||||
_dedup_progress = DedupProgress(
|
||||
total=total,
|
||||
processed=_dedup_progress.processed + 1,
|
||||
duplicates_removed=_dedup_progress.duplicates_removed,
|
||||
in_progress=True,
|
||||
)
|
||||
|
||||
# Delete duplicates (keep the first/oldest packet in each group)
|
||||
duplicates_removed = 0
|
||||
for packet_ids in payload_groups.values():
|
||||
if len(packet_ids) > 1:
|
||||
# Keep the first one, delete the rest
|
||||
ids_to_delete = packet_ids[1:]
|
||||
for packet_id in ids_to_delete:
|
||||
await db.conn.execute("DELETE FROM raw_packets WHERE id = ?", (packet_id,))
|
||||
duplicates_removed += 1
|
||||
|
||||
_dedup_progress = DedupProgress(
|
||||
total=total,
|
||||
processed=_dedup_progress.processed,
|
||||
duplicates_removed=duplicates_removed,
|
||||
in_progress=True,
|
||||
)
|
||||
|
||||
await db.conn.commit()
|
||||
|
||||
# Run VACUUM to reclaim space (must be outside transaction, use executescript)
|
||||
await db.conn.executescript("VACUUM;")
|
||||
|
||||
_dedup_progress = DedupProgress(
|
||||
total=total,
|
||||
processed=total,
|
||||
duplicates_removed=duplicates_removed,
|
||||
in_progress=False,
|
||||
)
|
||||
|
||||
logger.info("Payload deduplication complete: removed %d duplicates", duplicates_removed)
|
||||
|
||||
|
||||
@router.post("/dedup", response_model=DedupResult)
|
||||
async def deduplicate_packets(background_tasks: BackgroundTasks) -> DedupResult:
|
||||
"""
|
||||
Remove packets with duplicate payloads (keeps one copy of each unique payload).
|
||||
|
||||
This operation runs in the background and may take a long time for large databases.
|
||||
Use GET /packets/dedup/progress to check status.
|
||||
|
||||
Note: This only affects undecrypted packets. Packets that arrive through different
|
||||
repeater paths have different full data but the same payload - this removes those duplicates.
|
||||
"""
|
||||
global _dedup_progress
|
||||
|
||||
# Check if dedup is already in progress
|
||||
if _dedup_progress and _dedup_progress.in_progress:
|
||||
return DedupResult(
|
||||
started=False,
|
||||
total_packets=_dedup_progress.total,
|
||||
message=f"Deduplication already in progress: {_dedup_progress.processed}/{_dedup_progress.total}",
|
||||
)
|
||||
|
||||
# Get count of undecrypted packets
|
||||
count = await RawPacketRepository.get_undecrypted_count()
|
||||
if count == 0:
|
||||
return DedupResult(
|
||||
started=False, total_packets=0, message="No undecrypted packets to process"
|
||||
)
|
||||
|
||||
# Start background dedup
|
||||
background_tasks.add_task(_run_payload_dedup)
|
||||
|
||||
return DedupResult(
|
||||
started=True,
|
||||
total_packets=count,
|
||||
message=f"Started deduplication of {count} packets in background. This may take a while.",
|
||||
)
|
||||
|
||||
|
||||
@router.get("/dedup/progress", response_model=DedupProgress | None)
|
||||
async def get_dedup_progress() -> DedupProgress | None:
|
||||
"""Get the current progress of payload deduplication."""
|
||||
return _dedup_progress
|
||||
|
||||
@@ -46,7 +46,7 @@ frontend/
|
||||
│ │ ├── CrackerPanel.tsx # WebGPU channel key cracker (lazy-loads wordlist)
|
||||
│ │ ├── NewMessageModal.tsx
|
||||
│ │ ├── ConfigModal.tsx # Radio config + app settings
|
||||
│ │ └── MaintenanceModal.tsx # Database maintenance (cleanup, dedup)
|
||||
│ │ └── MaintenanceModal.tsx # Database maintenance (packet cleanup)
|
||||
│ └── test/
|
||||
│ ├── setup.ts # Test setup (jsdom, matchers)
|
||||
│ ├── messageParser.test.ts
|
||||
|
||||
1
frontend/dist/assets/index-DiX6e5Wm.js.map
vendored
1
frontend/dist/assets/index-DiX6e5Wm.js.map
vendored
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
1
frontend/dist/assets/index-E6l20oLD.js.map
vendored
Normal file
1
frontend/dist/assets/index-E6l20oLD.js.map
vendored
Normal file
File diff suppressed because one or more lines are too long
2
frontend/dist/index.html
vendored
2
frontend/dist/index.html
vendored
@@ -13,7 +13,7 @@
|
||||
<link rel="shortcut icon" href="/favicon.ico" />
|
||||
<link rel="apple-touch-icon" sizes="180x180" href="/apple-touch-icon.png" />
|
||||
<link rel="manifest" href="/site.webmanifest" />
|
||||
<script type="module" crossorigin src="/assets/index-DiX6e5Wm.js"></script>
|
||||
<script type="module" crossorigin src="/assets/index-E6l20oLD.js"></script>
|
||||
<link rel="stylesheet" crossorigin href="/assets/index-C5j7uJOC.css">
|
||||
</head>
|
||||
<body>
|
||||
|
||||
@@ -4,7 +4,6 @@ import type {
|
||||
Channel,
|
||||
CommandResponse,
|
||||
Contact,
|
||||
DedupResult,
|
||||
HealthStatus,
|
||||
MaintenanceResult,
|
||||
Message,
|
||||
@@ -169,7 +168,6 @@ export const api = {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({ prune_undecrypted_days: pruneUndecryptedDays }),
|
||||
}),
|
||||
deduplicatePackets: () => fetchJson<DedupResult>('/packets/dedup', { method: 'POST' }),
|
||||
|
||||
// Read State
|
||||
markAllRead: () =>
|
||||
|
||||
@@ -22,7 +22,6 @@ export function MaintenanceModal({
|
||||
}: MaintenanceModalProps) {
|
||||
const [retentionDays, setRetentionDays] = useState('14');
|
||||
const [cleaning, setCleaning] = useState(false);
|
||||
const [deduping, setDeduping] = useState(false);
|
||||
|
||||
const handleCleanup = async () => {
|
||||
const days = parseInt(retentionDays, 10);
|
||||
@@ -52,30 +51,6 @@ export function MaintenanceModal({
|
||||
}
|
||||
};
|
||||
|
||||
const handleDedup = async () => {
|
||||
setDeduping(true);
|
||||
|
||||
try {
|
||||
const result = await api.deduplicatePackets();
|
||||
if (result.started) {
|
||||
toast.success('Deduplication started', {
|
||||
description: result.message,
|
||||
});
|
||||
} else {
|
||||
toast.info('Deduplication', {
|
||||
description: result.message,
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('Failed to start deduplication:', err);
|
||||
toast.error('Deduplication failed', {
|
||||
description: err instanceof Error ? err.message : 'Unknown error',
|
||||
});
|
||||
} finally {
|
||||
setDeduping(false);
|
||||
}
|
||||
};
|
||||
|
||||
return (
|
||||
<Dialog open={open} onOpenChange={(isOpen) => !isOpen && onClose()}>
|
||||
<DialogContent className="sm:max-w-[400px]">
|
||||
@@ -115,17 +90,6 @@ export function MaintenanceModal({
|
||||
</Button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div className="space-y-3">
|
||||
<Label>Remove Duplicate Packets</Label>
|
||||
<p className="text-xs text-muted-foreground">
|
||||
Remove packets with duplicate payloads (same message received via different paths).
|
||||
Runs in background and may take a long time.
|
||||
</p>
|
||||
<Button variant="outline" onClick={handleDedup} disabled={deduping}>
|
||||
{deduping ? 'Starting...' : 'Remove Duplicates'}
|
||||
</Button>
|
||||
</div>
|
||||
</div>
|
||||
</DialogContent>
|
||||
</Dialog>
|
||||
|
||||
@@ -49,12 +49,6 @@ export interface MaintenanceResult {
|
||||
vacuumed: boolean;
|
||||
}
|
||||
|
||||
export interface DedupResult {
|
||||
started: boolean;
|
||||
total_packets: number;
|
||||
message: string;
|
||||
}
|
||||
|
||||
export interface Contact {
|
||||
public_key: PublicKey;
|
||||
name: string | null;
|
||||
|
||||
@@ -477,15 +477,19 @@ class TestRawPacketRepository:
|
||||
conn = await aiosqlite.connect(":memory:")
|
||||
conn.row_factory = aiosqlite.Row
|
||||
|
||||
# Create the raw_packets table
|
||||
# Create the raw_packets table with payload_hash for deduplication
|
||||
await conn.execute("""
|
||||
CREATE TABLE raw_packets (
|
||||
id INTEGER PRIMARY KEY,
|
||||
timestamp INTEGER NOT NULL,
|
||||
data BLOB NOT NULL UNIQUE,
|
||||
message_id INTEGER
|
||||
data BLOB NOT NULL,
|
||||
message_id INTEGER,
|
||||
payload_hash TEXT
|
||||
)
|
||||
""")
|
||||
await conn.execute(
|
||||
"CREATE UNIQUE INDEX IF NOT EXISTS idx_raw_packets_payload_hash ON raw_packets(payload_hash)"
|
||||
)
|
||||
await conn.commit()
|
||||
|
||||
# Patch the db._connection to use our test connection
|
||||
@@ -494,10 +498,11 @@ class TestRawPacketRepository:
|
||||
|
||||
try:
|
||||
packet_data = b"\x01\x02\x03\x04\x05"
|
||||
packet_id = await RawPacketRepository.create(packet_data, 1234567890)
|
||||
packet_id, is_new = await RawPacketRepository.create(packet_data, 1234567890)
|
||||
|
||||
assert packet_id is not None
|
||||
assert packet_id > 0
|
||||
assert is_new is True
|
||||
finally:
|
||||
db._connection = original_conn
|
||||
await conn.close()
|
||||
@@ -514,15 +519,19 @@ class TestRawPacketRepository:
|
||||
conn = await aiosqlite.connect(":memory:")
|
||||
conn.row_factory = aiosqlite.Row
|
||||
|
||||
# Create the raw_packets table
|
||||
# Create the raw_packets table with payload_hash for deduplication
|
||||
await conn.execute("""
|
||||
CREATE TABLE raw_packets (
|
||||
id INTEGER PRIMARY KEY,
|
||||
timestamp INTEGER NOT NULL,
|
||||
data BLOB NOT NULL UNIQUE,
|
||||
message_id INTEGER
|
||||
data BLOB NOT NULL,
|
||||
message_id INTEGER,
|
||||
payload_hash TEXT
|
||||
)
|
||||
""")
|
||||
await conn.execute(
|
||||
"CREATE UNIQUE INDEX IF NOT EXISTS idx_raw_packets_payload_hash ON raw_packets(payload_hash)"
|
||||
)
|
||||
await conn.commit()
|
||||
|
||||
# Patch the db._connection to use our test connection
|
||||
@@ -533,12 +542,106 @@ class TestRawPacketRepository:
|
||||
packet1 = b"\x01\x02\x03"
|
||||
packet2 = b"\x04\x05\x06"
|
||||
|
||||
id1 = await RawPacketRepository.create(packet1, 1234567890)
|
||||
id2 = await RawPacketRepository.create(packet2, 1234567891)
|
||||
id1, is_new1 = await RawPacketRepository.create(packet1, 1234567890)
|
||||
id2, is_new2 = await RawPacketRepository.create(packet2, 1234567891)
|
||||
|
||||
assert id1 is not None
|
||||
assert id2 is not None
|
||||
assert id1 != id2
|
||||
assert is_new1 is True
|
||||
assert is_new2 is True
|
||||
finally:
|
||||
db._connection = original_conn
|
||||
await conn.close()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_duplicate_packet_returns_existing_id(self):
|
||||
"""Inserting same payload twice returns existing ID and is_new=False."""
|
||||
import aiosqlite
|
||||
|
||||
from app.database import db
|
||||
from app.repository import RawPacketRepository
|
||||
|
||||
conn = await aiosqlite.connect(":memory:")
|
||||
conn.row_factory = aiosqlite.Row
|
||||
|
||||
# Create the raw_packets table with payload_hash for deduplication
|
||||
await conn.execute("""
|
||||
CREATE TABLE raw_packets (
|
||||
id INTEGER PRIMARY KEY,
|
||||
timestamp INTEGER NOT NULL,
|
||||
data BLOB NOT NULL,
|
||||
message_id INTEGER,
|
||||
payload_hash TEXT
|
||||
)
|
||||
""")
|
||||
await conn.execute(
|
||||
"CREATE UNIQUE INDEX IF NOT EXISTS idx_raw_packets_payload_hash ON raw_packets(payload_hash)"
|
||||
)
|
||||
await conn.commit()
|
||||
|
||||
original_conn = db._connection
|
||||
db._connection = conn
|
||||
|
||||
try:
|
||||
# Same packet data inserted twice
|
||||
packet_data = b"\x01\x02\x03\x04\x05"
|
||||
id1, is_new1 = await RawPacketRepository.create(packet_data, 1234567890)
|
||||
id2, is_new2 = await RawPacketRepository.create(packet_data, 1234567891)
|
||||
|
||||
# Both should return the same ID
|
||||
assert id1 == id2
|
||||
# First is new, second is not
|
||||
assert is_new1 is True
|
||||
assert is_new2 is False
|
||||
finally:
|
||||
db._connection = original_conn
|
||||
await conn.close()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_malformed_packet_uses_full_data_hash(self):
|
||||
"""Malformed packets (can't extract payload) hash full data for dedup."""
|
||||
import aiosqlite
|
||||
|
||||
from app.database import db
|
||||
from app.repository import RawPacketRepository
|
||||
|
||||
conn = await aiosqlite.connect(":memory:")
|
||||
conn.row_factory = aiosqlite.Row
|
||||
|
||||
await conn.execute("""
|
||||
CREATE TABLE raw_packets (
|
||||
id INTEGER PRIMARY KEY,
|
||||
timestamp INTEGER NOT NULL,
|
||||
data BLOB NOT NULL,
|
||||
message_id INTEGER,
|
||||
payload_hash TEXT
|
||||
)
|
||||
""")
|
||||
await conn.execute(
|
||||
"CREATE UNIQUE INDEX IF NOT EXISTS idx_raw_packets_payload_hash ON raw_packets(payload_hash)"
|
||||
)
|
||||
await conn.commit()
|
||||
|
||||
original_conn = db._connection
|
||||
db._connection = conn
|
||||
|
||||
try:
|
||||
# Single byte is too short to be valid packet (extract_payload returns None)
|
||||
malformed = b"\x01"
|
||||
id1, is_new1 = await RawPacketRepository.create(malformed, 1234567890)
|
||||
id2, is_new2 = await RawPacketRepository.create(malformed, 1234567891)
|
||||
|
||||
# Should still deduplicate using full data hash
|
||||
assert id1 == id2
|
||||
assert is_new1 is True
|
||||
assert is_new2 is False
|
||||
|
||||
# Different malformed packet should get different ID
|
||||
different_malformed = b"\x02"
|
||||
id3, is_new3 = await RawPacketRepository.create(different_malformed, 1234567892)
|
||||
assert id3 != id1
|
||||
assert is_new3 is True
|
||||
finally:
|
||||
db._connection = original_conn
|
||||
await conn.close()
|
||||
|
||||
@@ -83,8 +83,8 @@ class TestMigration001:
|
||||
# Run migrations
|
||||
applied = await run_migrations(conn)
|
||||
|
||||
assert applied == 3 # All 3 migrations run
|
||||
assert await get_version(conn) == 3
|
||||
assert applied == 5 # All 5 migrations run
|
||||
assert await get_version(conn) == 5
|
||||
|
||||
# Verify columns exist by inserting and selecting
|
||||
await conn.execute(
|
||||
@@ -149,9 +149,9 @@ class TestMigration001:
|
||||
applied1 = await run_migrations(conn)
|
||||
applied2 = await run_migrations(conn)
|
||||
|
||||
assert applied1 == 3 # All 3 migrations run
|
||||
assert applied1 == 5 # All 5 migrations run
|
||||
assert applied2 == 0 # No migrations on second run
|
||||
assert await get_version(conn) == 3
|
||||
assert await get_version(conn) == 5
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
@@ -194,9 +194,9 @@ class TestMigration001:
|
||||
# Run migrations - should not fail
|
||||
applied = await run_migrations(conn)
|
||||
|
||||
# All 3 migrations applied (version incremented) but no error
|
||||
assert applied == 3
|
||||
assert await get_version(conn) == 3
|
||||
# All 5 migrations applied (version incremented) but no error
|
||||
assert applied == 5
|
||||
assert await get_version(conn) == 5
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
|
||||
@@ -393,7 +393,7 @@ class TestCreateMessageFromDecrypted:
|
||||
from app.packet_processor import create_message_from_decrypted
|
||||
|
||||
# Create a raw packet first (required for the function)
|
||||
packet_id = await RawPacketRepository.create(b"test_packet_data", 1700000000)
|
||||
packet_id, _ = await RawPacketRepository.create(b"test_packet_data", 1700000000)
|
||||
|
||||
broadcasts, mock_broadcast = captured_broadcasts
|
||||
|
||||
@@ -439,7 +439,7 @@ class TestCreateMessageFromDecrypted:
|
||||
"""create_message_from_decrypted handles messages without sender prefix."""
|
||||
from app.packet_processor import create_message_from_decrypted
|
||||
|
||||
packet_id = await RawPacketRepository.create(b"test_packet_data_2", 1700000000)
|
||||
packet_id, _ = await RawPacketRepository.create(b"test_packet_data_2", 1700000000)
|
||||
|
||||
broadcasts, mock_broadcast = captured_broadcasts
|
||||
|
||||
@@ -467,8 +467,8 @@ class TestCreateMessageFromDecrypted:
|
||||
"""create_message_from_decrypted returns None for duplicate message."""
|
||||
from app.packet_processor import create_message_from_decrypted
|
||||
|
||||
packet_id_1 = await RawPacketRepository.create(b"packet_1", 1700000000)
|
||||
packet_id_2 = await RawPacketRepository.create(b"packet_2", 1700000001)
|
||||
packet_id_1, _ = await RawPacketRepository.create(b"packet_1", 1700000000)
|
||||
packet_id_2, _ = await RawPacketRepository.create(b"packet_2", 1700000001)
|
||||
|
||||
broadcasts, mock_broadcast = captured_broadcasts
|
||||
|
||||
@@ -505,7 +505,7 @@ class TestCreateMessageFromDecrypted:
|
||||
"""create_message_from_decrypted links raw packet to created message."""
|
||||
from app.packet_processor import create_message_from_decrypted
|
||||
|
||||
packet_id = await RawPacketRepository.create(b"test_packet", 1700000000)
|
||||
packet_id, _ = await RawPacketRepository.create(b"test_packet", 1700000000)
|
||||
|
||||
broadcasts, mock_broadcast = captured_broadcasts
|
||||
|
||||
|
||||
Reference in New Issue
Block a user