Add packet cleanup

This commit is contained in:
Jack Kingsman
2026-01-13 12:49:27 -08:00
parent 3ea98e309c
commit 547a2adb94
14 changed files with 573 additions and 44 deletions

View File

@@ -522,3 +522,14 @@ class RawPacketRepository:
(int(time.time()), packet_id),
)
await db.conn.commit()
@staticmethod
async def prune_old_undecrypted(max_age_days: int) -> int:
"""Delete undecrypted packets older than max_age_days. Returns count deleted."""
cutoff = int(time.time()) - (max_age_days * 86400)
cursor = await db.conn.execute(
"DELETE FROM raw_packets WHERE decrypted = 0 AND timestamp < ?",
(cutoff,),
)
await db.conn.commit()
return cursor.rowcount

View File

@@ -1,6 +1,9 @@
import os
from fastapi import APIRouter
from pydantic import BaseModel
from app.config import settings
from app.radio import radio_manager
@@ -11,13 +14,23 @@ class HealthResponse(BaseModel):
status: str
radio_connected: bool
serial_port: str | None
database_size_mb: float
@router.get("/health", response_model=HealthResponse)
async def healthcheck() -> HealthResponse:
"""Check if the API is running and if the radio is connected."""
# Get database file size in MB
db_size_mb = 0.0
try:
db_size_bytes = os.path.getsize(settings.database_path)
db_size_mb = round(db_size_bytes / (1024 * 1024), 2)
except OSError:
pass
return HealthResponse(
status="ok" if radio_manager.is_connected else "degraded",
radio_connected=radio_manager.is_connected,
serial_port=radio_manager.port,
database_size_mb=db_size_mb,
)

View File

@@ -4,7 +4,8 @@ from hashlib import sha256
from fastapi import APIRouter, BackgroundTasks
from pydantic import BaseModel, Field
from app.decoder import try_decrypt_packet_with_channel_key
from app.database import db
from app.decoder import extract_payload, try_decrypt_packet_with_channel_key
from app.packet_processor import create_message_from_decrypted
from app.repository import RawPacketRepository
@@ -174,3 +175,166 @@ async def decrypt_historical_packets(
async def get_decrypt_progress() -> DecryptProgress | None:
"""Get the current progress of historical decryption."""
return _decrypt_progress
class MaintenanceRequest(BaseModel):
prune_undecrypted_days: int = Field(
ge=1,
description="Delete undecrypted packets older than this many days"
)
class MaintenanceResult(BaseModel):
packets_deleted: int
vacuumed: bool
@router.post("/maintenance", response_model=MaintenanceResult)
async def run_maintenance(request: MaintenanceRequest) -> MaintenanceResult:
"""
Clean up old undecrypted packets and reclaim disk space.
- Deletes undecrypted packets older than the specified number of days
- Runs VACUUM to reclaim disk space
"""
logger.info("Running maintenance: pruning packets older than %d days", request.prune_undecrypted_days)
# Prune old undecrypted packets
deleted = await RawPacketRepository.prune_old_undecrypted(request.prune_undecrypted_days)
logger.info("Deleted %d old undecrypted packets", deleted)
# Run VACUUM to reclaim space
await db.conn.execute("VACUUM")
logger.info("Database vacuumed")
return MaintenanceResult(packets_deleted=deleted, vacuumed=True)
class DedupProgress(BaseModel):
total: int
processed: int
duplicates_removed: int
in_progress: bool
class DedupResult(BaseModel):
started: bool
total_packets: int
message: str
# Global state for tracking dedup progress
_dedup_progress: DedupProgress | None = None
async def _run_payload_dedup() -> None:
"""Background task to remove duplicate-payload packets."""
global _dedup_progress
# Get all undecrypted packets
packets = await RawPacketRepository.get_all_undecrypted()
total = len(packets)
_dedup_progress = DedupProgress(
total=total, processed=0, duplicates_removed=0, in_progress=True
)
logger.info("Starting payload deduplication of %d packets", total)
# Group packets by payload hash
payload_groups: dict[str, list[int]] = {} # hash -> list of packet IDs
for packet_id, packet_data, _timestamp in packets:
payload = extract_payload(packet_data)
if payload is None:
continue
payload_hash = sha256(payload).hexdigest()
if payload_hash not in payload_groups:
payload_groups[payload_hash] = []
payload_groups[payload_hash].append(packet_id)
_dedup_progress = DedupProgress(
total=total,
processed=_dedup_progress.processed + 1,
duplicates_removed=_dedup_progress.duplicates_removed,
in_progress=True,
)
# Delete duplicates (keep the first/oldest packet in each group)
duplicates_removed = 0
for payload_hash, packet_ids in payload_groups.items():
if len(packet_ids) > 1:
# Keep the first one, delete the rest
ids_to_delete = packet_ids[1:]
for packet_id in ids_to_delete:
await db.conn.execute(
"DELETE FROM raw_packets WHERE id = ?", (packet_id,)
)
duplicates_removed += 1
_dedup_progress = DedupProgress(
total=total,
processed=_dedup_progress.processed,
duplicates_removed=duplicates_removed,
in_progress=True,
)
await db.conn.commit()
# Run VACUUM to reclaim space
await db.conn.execute("VACUUM")
_dedup_progress = DedupProgress(
total=total,
processed=total,
duplicates_removed=duplicates_removed,
in_progress=False,
)
logger.info("Payload deduplication complete: removed %d duplicates", duplicates_removed)
@router.post("/dedup", response_model=DedupResult)
async def deduplicate_packets(background_tasks: BackgroundTasks) -> DedupResult:
"""
Remove packets with duplicate payloads (keeps one copy of each unique payload).
This operation runs in the background and may take a long time for large databases.
Use GET /packets/dedup/progress to check status.
Note: This only affects undecrypted packets. Packets that arrive through different
repeater paths have different full data but the same payload - this removes those duplicates.
"""
global _dedup_progress
# Check if dedup is already in progress
if _dedup_progress and _dedup_progress.in_progress:
return DedupResult(
started=False,
total_packets=_dedup_progress.total,
message=f"Deduplication already in progress: {_dedup_progress.processed}/{_dedup_progress.total}",
)
# Get count of undecrypted packets
count = await RawPacketRepository.get_undecrypted_count()
if count == 0:
return DedupResult(
started=False, total_packets=0, message="No undecrypted packets to process"
)
# Start background dedup
background_tasks.add_task(_run_payload_dedup)
return DedupResult(
started=True,
total_packets=count,
message=f"Started deduplication of {count} packets in background. This may take a while.",
)
@router.get("/dedup/progress", response_model=DedupProgress | None)
async def get_dedup_progress() -> DedupProgress | None:
"""Get the current progress of payload deduplication."""
return _dedup_progress

View File

@@ -1,9 +1,11 @@
"""WebSocket router for real-time updates."""
import logging
import os
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.config import settings
from app.radio import radio_manager
from app.repository import ChannelRepository, ContactRepository
from app.websocket import ws_manager
@@ -20,9 +22,18 @@ async def websocket_endpoint(websocket: WebSocket) -> None:
# Send initial state
try:
# Health status
db_size_mb = 0.0
try:
db_size_bytes = os.path.getsize(settings.database_path)
db_size_mb = round(db_size_bytes / (1024 * 1024), 2)
except OSError:
pass
health_data = {
"status": "ok" if radio_manager.is_connected else "degraded",
"radio_connected": radio_manager.is_connected,
"serial_port": radio_manager.port,
"database_size_mb": db_size_mb,
}
await ws_manager.send_personal(websocket, "health", health_data)

View File

@@ -3,10 +3,13 @@
import asyncio
import json
import logging
import os
from typing import Any
from fastapi import WebSocket
from app.config import settings
logger = logging.getLogger(__name__)
@@ -85,8 +88,17 @@ def broadcast_error(message: str, details: str | None = None) -> None:
def broadcast_health(radio_connected: bool, serial_port: str | None = None) -> None:
"""Broadcast health status change to all connected clients."""
# Get database file size in MB
db_size_mb = 0.0
try:
db_size_bytes = os.path.getsize(settings.database_path)
db_size_mb = round(db_size_bytes / (1024 * 1024), 2)
except OSError:
pass
asyncio.create_task(ws_manager.broadcast("health", {
"status": "ok" if radio_connected else "degraded",
"radio_connected": radio_connected,
"serial_port": serial_port,
"database_size_mb": db_size_mb,
}))

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -13,8 +13,8 @@
<link rel="shortcut icon" href="/favicon.ico" />
<link rel="apple-touch-icon" sizes="180x180" href="/apple-touch-icon.png" />
<link rel="manifest" href="/site.webmanifest" />
<script type="module" crossorigin src="/assets/index-CTjJAYeA.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-DaLCXB8p.css">
<script type="module" crossorigin src="/assets/index-BW3IACj-.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-URSnSHtR.css">
</head>
<body>
<div id="root"></div>

View File

@@ -653,11 +653,16 @@ export function App() {
open={showConfig}
config={config}
appSettings={appSettings}
health={health}
onClose={() => setShowConfig(false)}
onSave={handleSaveConfig}
onSaveAppSettings={handleSaveAppSettings}
onSetPrivateKey={handleSetPrivateKey}
onReboot={handleReboot}
onHealthRefresh={async () => {
const data = await api.getHealth();
setHealth(data);
}}
/>
<Toaster position="top-right" />

View File

@@ -4,7 +4,9 @@ import type {
Channel,
CommandResponse,
Contact,
DedupResult,
HealthStatus,
MaintenanceResult,
Message,
RadioConfig,
RadioConfigUpdate,
@@ -164,6 +166,13 @@ export const api = {
method: 'POST',
body: JSON.stringify(params),
}),
runMaintenance: (pruneUndecryptedDays: number) =>
fetchJson<MaintenanceResult>('/packets/maintenance', {
method: 'POST',
body: JSON.stringify({ prune_undecrypted_days: pruneUndecryptedDays }),
}),
deduplicatePackets: () =>
fetchJson<DedupResult>('/packets/dedup', { method: 'POST' }),
// Read State
markAllRead: () =>

View File

@@ -1,5 +1,5 @@
import { useState, useEffect } from 'react';
import type { AppSettings, AppSettingsUpdate, RadioConfig, RadioConfigUpdate } from '../types';
import type { AppSettings, AppSettingsUpdate, HealthStatus, RadioConfig, RadioConfigUpdate } from '../types';
import {
Dialog,
DialogContent,
@@ -12,27 +12,33 @@ import { Label } from './ui/label';
import { Button } from './ui/button';
import { Separator } from './ui/separator';
import { Alert, AlertDescription } from './ui/alert';
import { toast } from './ui/sonner';
import { api } from '../api';
interface ConfigModalProps {
open: boolean;
config: RadioConfig | null;
appSettings: AppSettings | null;
health: HealthStatus | null;
onClose: () => void;
onSave: (update: RadioConfigUpdate) => Promise<void>;
onSaveAppSettings: (update: AppSettingsUpdate) => Promise<void>;
onSetPrivateKey: (key: string) => Promise<void>;
onReboot: () => Promise<void>;
onHealthRefresh: () => Promise<void>;
}
export function ConfigModal({
open,
config,
appSettings,
health,
onClose,
onSave,
onSaveAppSettings,
onSetPrivateKey,
onReboot,
onHealthRefresh,
}: ConfigModalProps) {
const [name, setName] = useState('');
const [lat, setLat] = useState('');
@@ -44,8 +50,11 @@ export function ConfigModal({
const [cr, setCr] = useState('');
const [privateKey, setPrivateKey] = useState('');
const [maxRadioContacts, setMaxRadioContacts] = useState('');
const [retentionDays, setRetentionDays] = useState('14');
const [loading, setLoading] = useState(false);
const [rebooting, setRebooting] = useState(false);
const [cleaning, setCleaning] = useState(false);
const [deduping, setDeduping] = useState(false);
const [error, setError] = useState('');
useEffect(() => {
@@ -135,6 +144,58 @@ export function ConfigModal({
}
};
const handleCleanup = async () => {
const days = parseInt(retentionDays, 10);
if (isNaN(days) || days < 1) {
setError('Retention days must be at least 1');
return;
}
setError('');
setCleaning(true);
try {
const result = await api.runMaintenance(days);
toast.success('Database cleanup complete', {
description: `Deleted ${result.packets_deleted} old packet${result.packets_deleted === 1 ? '' : 's'}`,
});
// Refresh health to get updated database size
await onHealthRefresh();
} catch (err) {
console.error('Failed to run maintenance:', err);
toast.error('Database cleanup failed', {
description: err instanceof Error ? err.message : 'Unknown error',
});
} finally {
setCleaning(false);
}
};
const handleDedup = async () => {
setError('');
setDeduping(true);
try {
const result = await api.deduplicatePackets();
if (result.started) {
toast.success('Deduplication started', {
description: result.message,
});
} else {
toast.info('Deduplication', {
description: result.message,
});
}
} catch (err) {
console.error('Failed to start deduplication:', err);
toast.error('Deduplication failed', {
description: err instanceof Error ? err.message : 'Unknown error',
});
} finally {
setDeduping(false);
}
};
return (
<Dialog open={open} onOpenChange={(isOpen) => !isOpen && onClose()}>
<DialogContent className="sm:max-w-[500px] max-h-[90vh] overflow-y-auto">
@@ -308,6 +369,52 @@ export function ConfigModal({
</Button>
</div>
<Separator className="my-4" />
<div className="space-y-3">
<Label>Database Maintenance</Label>
<p className="text-xs text-muted-foreground">
Current database size: <span className="font-medium">{health?.database_size_mb ?? '?'} MB</span>
</p>
<p className="text-xs text-muted-foreground">
Delete undecrypted packets older than the specified days. This helps manage storage
for packets that couldn't be decrypted (unknown channel keys).
</p>
<div className="flex gap-2 items-end">
<div className="space-y-1">
<Label htmlFor="retention-days" className="text-xs">Days to retain</Label>
<Input
id="retention-days"
type="number"
min="1"
max="365"
value={retentionDays}
onChange={(e) => setRetentionDays(e.target.value)}
className="w-20"
/>
</div>
<Button
variant="outline"
onClick={handleCleanup}
disabled={cleaning || loading}
>
{cleaning ? 'Cleaning...' : 'Cleanup'}
</Button>
</div>
<p className="text-xs text-muted-foreground mt-4">
Remove packets with duplicate payloads (same message received via different paths).
Runs in background and may take a long time.
</p>
<Button
variant="outline"
onClick={handleDedup}
disabled={deduping || loading}
>
{deduping ? 'Starting...' : 'Remove Duplicates'}
</Button>
</div>
{error && (
<div className="text-sm text-destructive">{error}</div>
)}

View File

@@ -41,6 +41,18 @@ export interface HealthStatus {
status: string;
radio_connected: boolean;
serial_port: string | null;
database_size_mb: number;
}
export interface MaintenanceResult {
packets_deleted: number;
vacuumed: boolean;
}
export interface DedupResult {
started: boolean;
total_packets: number;
message: string;
}
export interface Contact {

View File

@@ -563,3 +563,188 @@ class TestRawPacketRepository:
finally:
db._connection = original_conn
await conn.close()
@pytest.mark.asyncio
async def test_prune_old_undecrypted_deletes_old_packets(self):
"""Prune deletes undecrypted packets older than specified days."""
import aiosqlite
import time
from app.repository import RawPacketRepository
from app.database import db
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
await conn.execute("""
CREATE TABLE raw_packets (
id INTEGER PRIMARY KEY,
timestamp INTEGER NOT NULL,
data BLOB NOT NULL UNIQUE,
decrypted INTEGER DEFAULT 0,
message_id INTEGER,
decrypt_attempts INTEGER DEFAULT 0,
last_attempt INTEGER
)
""")
now = int(time.time())
old_timestamp = now - (15 * 86400) # 15 days ago
recent_timestamp = now - (5 * 86400) # 5 days ago
# Insert old undecrypted packet
await conn.execute(
"INSERT INTO raw_packets (timestamp, data, decrypted) VALUES (?, ?, 0)",
(old_timestamp, b"\x01\x02\x03")
)
# Insert recent undecrypted packet
await conn.execute(
"INSERT INTO raw_packets (timestamp, data, decrypted) VALUES (?, ?, 0)",
(recent_timestamp, b"\x04\x05\x06")
)
# Insert old but decrypted packet (should NOT be deleted)
await conn.execute(
"INSERT INTO raw_packets (timestamp, data, decrypted) VALUES (?, ?, 1)",
(old_timestamp, b"\x07\x08\x09")
)
await conn.commit()
original_conn = db._connection
db._connection = conn
try:
# Prune packets older than 10 days
deleted = await RawPacketRepository.prune_old_undecrypted(10)
assert deleted == 1 # Only the old undecrypted packet
# Verify remaining packets
cursor = await conn.execute("SELECT COUNT(*) as count FROM raw_packets")
row = await cursor.fetchone()
assert row["count"] == 2 # Recent undecrypted + old decrypted
finally:
db._connection = original_conn
await conn.close()
@pytest.mark.asyncio
async def test_prune_old_undecrypted_returns_zero_when_nothing_to_delete(self):
"""Prune returns 0 when no packets match criteria."""
import aiosqlite
import time
from app.repository import RawPacketRepository
from app.database import db
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
await conn.execute("""
CREATE TABLE raw_packets (
id INTEGER PRIMARY KEY,
timestamp INTEGER NOT NULL,
data BLOB NOT NULL UNIQUE,
decrypted INTEGER DEFAULT 0,
message_id INTEGER,
decrypt_attempts INTEGER DEFAULT 0,
last_attempt INTEGER
)
""")
now = int(time.time())
recent_timestamp = now - (5 * 86400) # 5 days ago
# Insert only recent packet
await conn.execute(
"INSERT INTO raw_packets (timestamp, data, decrypted) VALUES (?, ?, 0)",
(recent_timestamp, b"\x01\x02\x03")
)
await conn.commit()
original_conn = db._connection
db._connection = conn
try:
# Prune packets older than 10 days (none should match)
deleted = await RawPacketRepository.prune_old_undecrypted(10)
assert deleted == 0
finally:
db._connection = original_conn
await conn.close()
class TestMaintenanceEndpoint:
"""Test database maintenance endpoint."""
@pytest.mark.asyncio
async def test_maintenance_prunes_and_vacuums(self):
"""Maintenance endpoint prunes old packets and runs vacuum."""
import aiosqlite
import time
from app.repository import RawPacketRepository
from app.database import db
from app.routers.packets import run_maintenance, MaintenanceRequest
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
await conn.execute("""
CREATE TABLE raw_packets (
id INTEGER PRIMARY KEY,
timestamp INTEGER NOT NULL,
data BLOB NOT NULL UNIQUE,
decrypted INTEGER DEFAULT 0,
message_id INTEGER,
decrypt_attempts INTEGER DEFAULT 0,
last_attempt INTEGER
)
""")
now = int(time.time())
old_timestamp = now - (20 * 86400) # 20 days ago
# Insert old undecrypted packets
await conn.execute(
"INSERT INTO raw_packets (timestamp, data, decrypted) VALUES (?, ?, 0)",
(old_timestamp, b"\x01\x02\x03")
)
await conn.execute(
"INSERT INTO raw_packets (timestamp, data, decrypted) VALUES (?, ?, 0)",
(old_timestamp, b"\x04\x05\x06")
)
await conn.commit()
original_conn = db._connection
db._connection = conn
try:
request = MaintenanceRequest(prune_undecrypted_days=14)
result = await run_maintenance(request)
assert result.packets_deleted == 2
assert result.vacuumed is True
finally:
db._connection = original_conn
await conn.close()
class TestHealthEndpointDatabaseSize:
"""Test database size reporting in health endpoint."""
def test_health_includes_database_size(self):
"""Health endpoint includes database_size_mb field."""
from fastapi.testclient import TestClient
from unittest.mock import patch
with patch("app.routers.health.radio_manager") as mock_rm, \
patch("app.routers.health.os.path.getsize") as mock_getsize:
mock_rm.is_connected = True
mock_rm.port = "/dev/ttyUSB0"
mock_getsize.return_value = 10 * 1024 * 1024 # 10 MB
from app.main import app
client = TestClient(app)
response = client.get("/api/health")
assert response.status_code == 200
data = response.json()
assert "database_size_mb" in data
assert data["database_size_mb"] == 10.0