Files
Remote-Terminal-for-MeshCore/app/database.py
2026-01-18 14:17:49 -08:00

98 lines
2.8 KiB
Python

import logging
from pathlib import Path
import aiosqlite
from app.config import settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# DDL script that Database.connect() runs via executescript() each time it connects.
# Every statement uses IF NOT EXISTS, so running it against a database that already
# has these tables/indexes does not recreate or alter them.
#
# Objects created:
#   contacts     - keyed by public_key
#   channels     - keyed by key
#   messages     - autoincrement id; UNIQUE(type, conversation_key, text, sender_timestamp)
#   raw_packets  - autoincrement id; message_id references messages(id);
#                  payload_hash has a unique index
#
# NOTE(review): sender_timestamp is nullable; SQLite treats NULLs as distinct in
# UNIQUE constraints, so rows with a NULL sender_timestamp are presumably not
# de-duplicated by the messages constraint - confirm this is intended.
# NOTE(review): SQLite only enforces FOREIGN KEY clauses when foreign-key support is
# switched on for the connection; nothing in the visible connect() code does that -
# confirm whether enforcement is expected.
SCHEMA = """
CREATE TABLE IF NOT EXISTS contacts (
public_key TEXT PRIMARY KEY,
name TEXT,
type INTEGER DEFAULT 0,
flags INTEGER DEFAULT 0,
last_path TEXT,
last_path_len INTEGER DEFAULT -1,
last_advert INTEGER,
lat REAL,
lon REAL,
last_seen INTEGER,
on_radio INTEGER DEFAULT 0,
last_contacted INTEGER
);
CREATE TABLE IF NOT EXISTS channels (
key TEXT PRIMARY KEY,
name TEXT NOT NULL,
is_hashtag INTEGER DEFAULT 0,
on_radio INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL,
conversation_key TEXT NOT NULL,
text TEXT NOT NULL,
sender_timestamp INTEGER,
received_at INTEGER NOT NULL,
path TEXT,
txt_type INTEGER DEFAULT 0,
signature TEXT,
outgoing INTEGER DEFAULT 0,
acked INTEGER DEFAULT 0,
UNIQUE(type, conversation_key, text, sender_timestamp)
);
CREATE TABLE IF NOT EXISTS raw_packets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp INTEGER NOT NULL,
data BLOB NOT NULL,
message_id INTEGER,
payload_hash TEXT,
FOREIGN KEY (message_id) REFERENCES messages(id)
);
CREATE INDEX IF NOT EXISTS idx_messages_conversation ON messages(type, conversation_key);
CREATE INDEX IF NOT EXISTS idx_messages_received ON messages(received_at);
CREATE INDEX IF NOT EXISTS idx_raw_packets_message_id ON raw_packets(message_id);
CREATE UNIQUE INDEX IF NOT EXISTS idx_raw_packets_payload_hash ON raw_packets(payload_hash);
CREATE INDEX IF NOT EXISTS idx_contacts_on_radio ON contacts(on_radio);
"""
class Database:
    """Holder for a single aiosqlite connection to the database at ``db_path``.

    The connection is opened by :meth:`connect`, closed by :meth:`disconnect`,
    and exposed through the :attr:`conn` property.
    """

    def __init__(self, db_path: str):
        """Remember the database path; no connection is opened here."""
        self.db_path = db_path
        self._connection: aiosqlite.Connection | None = None

    async def connect(self) -> None:
        """Open the connection, apply ``SCHEMA`` and run pending migrations.

        If a connection is already open it is closed first, so the previous
        handle is not orphaned. If schema setup or the migrations raise, the
        newly opened connection is closed and the instance is left
        disconnected before the exception propagates.
        """
        if self._connection is not None:
            # Reconnecting: release the old handle instead of overwriting it.
            await self.disconnect()

        logger.info("Connecting to database at %s", self.db_path)
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        self._connection = await aiosqlite.connect(self.db_path)
        try:
            self._connection.row_factory = aiosqlite.Row
            await self._connection.executescript(SCHEMA)
            await self._connection.commit()
            logger.debug("Database schema initialized")

            # Run any pending migrations.
            # Imported here rather than at module top, as in the original code.
            from app.migrations import run_migrations

            await run_migrations(self._connection)
        except BaseException:
            # Cleanup only, always re-raised: do not leave a half-initialised
            # connection open or reachable through `conn`.
            await self.disconnect()
            raise

    async def disconnect(self) -> None:
        """Close the connection if one is open; otherwise do nothing."""
        if self._connection is not None:
            await self._connection.close()
            self._connection = None
            logger.debug("Database connection closed")

    @property
    def conn(self) -> aiosqlite.Connection:
        """Return the open connection.

        Raises:
            RuntimeError: if the database is not connected.
        """
        if self._connection is None:
            raise RuntimeError("Database not connected")
        return self._connection
# Module-level Database instance configured from settings.database_path.
# Constructing it only stores the path; `db.conn` raises RuntimeError until
# `await db.connect()` has been called.
db = Database(settings.database_path)