mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-03-28 17:43:06 +01:00
- Updated the SQLiteHandler to apply a migration that prefixes companion_hash values with '0x' for consistency with room_hash patterns. - Adjusted the main.py file to reflect the new formatting for companion_hash when generating its string representation.
1920 lines
79 KiB
Python
1920 lines
79 KiB
Python
import base64
|
|
import json
|
|
import logging
|
|
import secrets
|
|
import sqlite3
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
logger = logging.getLogger("SQLiteHandler")
|
|
|
|
|
|
class SQLiteHandler:
|
|
def __init__(self, storage_dir: Path):
|
|
self.storage_dir = storage_dir
|
|
self.sqlite_path = self.storage_dir / "repeater.db"
|
|
self._init_database()
|
|
self._run_migrations()
|
|
|
|
def _init_database(self):
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS packets (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp REAL NOT NULL,
|
|
type INTEGER NOT NULL,
|
|
route INTEGER NOT NULL,
|
|
length INTEGER NOT NULL,
|
|
rssi INTEGER,
|
|
snr REAL,
|
|
score REAL,
|
|
transmitted BOOLEAN NOT NULL,
|
|
is_duplicate BOOLEAN NOT NULL,
|
|
drop_reason TEXT,
|
|
src_hash TEXT,
|
|
dst_hash TEXT,
|
|
path_hash TEXT,
|
|
header TEXT,
|
|
transport_codes TEXT,
|
|
payload TEXT,
|
|
payload_length INTEGER,
|
|
tx_delay_ms REAL,
|
|
packet_hash TEXT,
|
|
original_path TEXT,
|
|
forwarded_path TEXT,
|
|
raw_packet TEXT
|
|
)
|
|
"""
|
|
)
|
|
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS adverts (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp REAL NOT NULL,
|
|
pubkey TEXT NOT NULL,
|
|
node_name TEXT,
|
|
is_repeater BOOLEAN NOT NULL,
|
|
route_type INTEGER,
|
|
contact_type TEXT,
|
|
latitude REAL,
|
|
longitude REAL,
|
|
first_seen REAL NOT NULL,
|
|
last_seen REAL NOT NULL,
|
|
rssi INTEGER,
|
|
snr REAL,
|
|
advert_count INTEGER NOT NULL DEFAULT 1,
|
|
is_new_neighbor BOOLEAN NOT NULL,
|
|
zero_hop BOOLEAN NOT NULL DEFAULT FALSE
|
|
)
|
|
"""
|
|
)
|
|
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS noise_floor (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
timestamp REAL NOT NULL,
|
|
noise_floor_dbm REAL NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS transport_keys (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT NOT NULL UNIQUE,
|
|
flood_policy TEXT NOT NULL CHECK (flood_policy IN ('allow', 'deny')),
|
|
transport_key TEXT NOT NULL,
|
|
last_used REAL,
|
|
parent_id INTEGER,
|
|
created_at REAL NOT NULL,
|
|
updated_at REAL NOT NULL,
|
|
FOREIGN KEY (parent_id) REFERENCES transport_keys(id)
|
|
)
|
|
"""
|
|
)
|
|
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS api_tokens (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT NOT NULL,
|
|
token_hash TEXT NOT NULL UNIQUE,
|
|
created_at REAL NOT NULL,
|
|
last_used REAL
|
|
)
|
|
"""
|
|
)
|
|
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_packets_timestamp ON packets(timestamp)"
|
|
)
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_packets_type ON packets(type)")
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_packets_hash ON packets(packet_hash)")
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_packets_transmitted ON packets(transmitted)"
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_adverts_timestamp ON adverts(timestamp)"
|
|
)
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_adverts_pubkey ON adverts(pubkey)")
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_noise_timestamp ON noise_floor(timestamp)"
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_transport_keys_name ON transport_keys(name)"
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_transport_keys_parent ON transport_keys(parent_id)"
|
|
)
|
|
|
|
# Room server tables
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS room_messages (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
room_hash TEXT NOT NULL,
|
|
author_pubkey TEXT NOT NULL,
|
|
post_timestamp REAL NOT NULL,
|
|
sender_timestamp REAL,
|
|
message_text TEXT NOT NULL,
|
|
txt_type INTEGER NOT NULL,
|
|
created_at REAL NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS room_client_sync (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
room_hash TEXT NOT NULL,
|
|
client_pubkey TEXT NOT NULL,
|
|
sync_since REAL NOT NULL DEFAULT 0,
|
|
pending_ack_crc INTEGER DEFAULT 0,
|
|
push_post_timestamp REAL DEFAULT 0,
|
|
ack_timeout_time REAL DEFAULT 0,
|
|
push_failures INTEGER DEFAULT 0,
|
|
last_activity REAL NOT NULL,
|
|
updated_at REAL NOT NULL,
|
|
UNIQUE(room_hash, client_pubkey)
|
|
)
|
|
"""
|
|
)
|
|
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_room_messages_room ON room_messages(room_hash, post_timestamp)"
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_room_messages_author ON room_messages(author_pubkey)"
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_room_client_sync_room ON room_client_sync(room_hash, client_pubkey)"
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_room_client_sync_pending ON room_client_sync(pending_ack_crc)"
|
|
)
|
|
|
|
conn.commit()
|
|
logger.info(f"SQLite database initialized: {self.sqlite_path}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize SQLite: {e}")
|
|
|
|
def _run_migrations(self):
|
|
"""Run database migrations"""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
# Create migrations table if it doesn't exist
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS migrations (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
migration_name TEXT NOT NULL UNIQUE,
|
|
applied_at REAL NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
|
|
# Migration 1: Add zero_hop column to adverts table
|
|
migration_name = "add_zero_hop_to_adverts"
|
|
existing = conn.execute(
|
|
"SELECT migration_name FROM migrations WHERE migration_name = ?",
|
|
(migration_name,),
|
|
).fetchone()
|
|
|
|
if not existing:
|
|
# Check if zero_hop column already exists
|
|
cursor = conn.execute("PRAGMA table_info(adverts)")
|
|
columns = [column[1] for column in cursor.fetchall()]
|
|
|
|
if "zero_hop" not in columns:
|
|
conn.execute(
|
|
"ALTER TABLE adverts ADD COLUMN zero_hop BOOLEAN NOT NULL DEFAULT FALSE"
|
|
)
|
|
logger.info("Added zero_hop column to adverts table")
|
|
|
|
# Mark migration as applied
|
|
conn.execute(
|
|
"INSERT INTO migrations (migration_name, applied_at) VALUES (?, ?)",
|
|
(migration_name, time.time()),
|
|
)
|
|
logger.info(f"Migration '{migration_name}' applied successfully")
|
|
|
|
# Migration 2: Add LBT metrics columns to packets table
|
|
migration_name = "add_lbt_metrics_to_packets"
|
|
existing = conn.execute(
|
|
"SELECT migration_name FROM migrations WHERE migration_name = ?",
|
|
(migration_name,),
|
|
).fetchone()
|
|
|
|
if not existing:
|
|
# Check if columns already exist
|
|
cursor = conn.execute("PRAGMA table_info(packets)")
|
|
columns = [column[1] for column in cursor.fetchall()]
|
|
|
|
if "lbt_attempts" not in columns:
|
|
conn.execute(
|
|
"ALTER TABLE packets ADD COLUMN lbt_attempts INTEGER DEFAULT 0"
|
|
)
|
|
logger.info("Added lbt_attempts column to packets table")
|
|
|
|
if "lbt_backoff_delays_ms" not in columns:
|
|
conn.execute("ALTER TABLE packets ADD COLUMN lbt_backoff_delays_ms TEXT")
|
|
logger.info("Added lbt_backoff_delays_ms column to packets table")
|
|
|
|
if "lbt_channel_busy" not in columns:
|
|
conn.execute(
|
|
"ALTER TABLE packets ADD COLUMN lbt_channel_busy BOOLEAN DEFAULT FALSE"
|
|
)
|
|
logger.info("Added lbt_channel_busy column to packets table")
|
|
|
|
# Mark migration as applied
|
|
conn.execute(
|
|
"INSERT INTO migrations (migration_name, applied_at) VALUES (?, ?)",
|
|
(migration_name, time.time()),
|
|
)
|
|
logger.info(f"Migration '{migration_name}' applied successfully")
|
|
|
|
# Migration 3: Add api_tokens table
|
|
migration_name = "add_api_tokens_table"
|
|
existing = conn.execute(
|
|
"SELECT migration_name FROM migrations WHERE migration_name = ?",
|
|
(migration_name,),
|
|
).fetchone()
|
|
|
|
if not existing:
|
|
# Check if api_tokens table already exists
|
|
cursor = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='api_tokens'"
|
|
)
|
|
|
|
if not cursor.fetchone():
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE api_tokens (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT NOT NULL,
|
|
token_hash TEXT NOT NULL UNIQUE,
|
|
created_at REAL NOT NULL,
|
|
last_used REAL
|
|
)
|
|
"""
|
|
)
|
|
logger.info("Created api_tokens table")
|
|
|
|
# Mark migration as applied
|
|
conn.execute(
|
|
"INSERT INTO migrations (migration_name, applied_at) VALUES (?, ?)",
|
|
(migration_name, time.time()),
|
|
)
|
|
logger.info(f"Migration '{migration_name}' applied successfully")
|
|
|
|
# Migration 4: Add companion tables for companion identity persistence
|
|
migration_name = "add_companion_tables"
|
|
existing = conn.execute(
|
|
"SELECT migration_name FROM migrations WHERE migration_name = ?",
|
|
(migration_name,),
|
|
).fetchone()
|
|
|
|
if not existing:
|
|
cursor = conn.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='companion_contacts'"
|
|
)
|
|
if not cursor.fetchone():
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE companion_contacts (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
companion_hash TEXT NOT NULL,
|
|
pubkey BLOB NOT NULL,
|
|
name TEXT NOT NULL,
|
|
adv_type INTEGER NOT NULL DEFAULT 0,
|
|
flags INTEGER NOT NULL DEFAULT 0,
|
|
out_path_len INTEGER NOT NULL DEFAULT -1,
|
|
out_path BLOB,
|
|
last_advert_timestamp INTEGER NOT NULL DEFAULT 0,
|
|
lastmod INTEGER NOT NULL DEFAULT 0,
|
|
gps_lat REAL NOT NULL DEFAULT 0,
|
|
gps_lon REAL NOT NULL DEFAULT 0,
|
|
sync_since INTEGER NOT NULL DEFAULT 0,
|
|
updated_at REAL NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE companion_channels (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
companion_hash TEXT NOT NULL,
|
|
channel_idx INTEGER NOT NULL,
|
|
name TEXT NOT NULL,
|
|
secret BLOB NOT NULL,
|
|
updated_at REAL NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE companion_messages (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
companion_hash TEXT NOT NULL,
|
|
sender_key BLOB NOT NULL,
|
|
txt_type INTEGER NOT NULL DEFAULT 0,
|
|
timestamp INTEGER NOT NULL DEFAULT 0,
|
|
text TEXT NOT NULL,
|
|
is_channel INTEGER NOT NULL DEFAULT 0,
|
|
channel_idx INTEGER NOT NULL DEFAULT 0,
|
|
path_len INTEGER NOT NULL DEFAULT 0,
|
|
packet_hash TEXT,
|
|
created_at REAL NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_companion_contacts_hash ON companion_contacts(companion_hash)"
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_companion_contacts_pubkey ON companion_contacts(companion_hash, pubkey)"
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_companion_channels_hash ON companion_channels(companion_hash)"
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_companion_messages_hash ON companion_messages(companion_hash)"
|
|
)
|
|
conn.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_companion_messages_hash_packet ON companion_messages(companion_hash, packet_hash)"
|
|
)
|
|
logger.info(
|
|
"Created companion_contacts, companion_channels, companion_messages tables"
|
|
)
|
|
|
|
conn.execute(
|
|
"INSERT INTO migrations (migration_name, applied_at) VALUES (?, ?)",
|
|
(migration_name, time.time()),
|
|
)
|
|
logger.info(f"Migration '{migration_name}' applied successfully")
|
|
|
|
# Migration 5: Add UNIQUE index on companion_contacts(companion_hash, pubkey)
|
|
# Required for ON CONFLICT upsert in companion_upsert_contact.
|
|
migration_name = "unique_companion_contacts_pubkey"
|
|
existing = conn.execute(
|
|
"SELECT migration_name FROM migrations WHERE migration_name = ?",
|
|
(migration_name,),
|
|
).fetchone()
|
|
|
|
if not existing:
|
|
# Replace the non-unique index with a UNIQUE one
|
|
conn.execute(
|
|
"DROP INDEX IF EXISTS idx_companion_contacts_pubkey"
|
|
)
|
|
conn.execute(
|
|
"CREATE UNIQUE INDEX IF NOT EXISTS idx_companion_contacts_hash_pubkey "
|
|
"ON companion_contacts (companion_hash, pubkey)"
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO migrations (migration_name, applied_at) VALUES (?, ?)",
|
|
(migration_name, time.time()),
|
|
)
|
|
logger.info(f"Migration '{migration_name}' applied successfully")
|
|
|
|
# Migration 6: Normalize companion_hash to 0x-prefixed hex (match room_hash pattern)
|
|
migration_name = "companion_hash_0x_prefix"
|
|
existing = conn.execute(
|
|
"SELECT migration_name FROM migrations WHERE migration_name = ?",
|
|
(migration_name,),
|
|
).fetchone()
|
|
|
|
if not existing:
|
|
for table in ("companion_contacts", "companion_channels", "companion_messages"):
|
|
conn.execute(
|
|
f"UPDATE {table} SET companion_hash = '0x' || companion_hash "
|
|
"WHERE companion_hash NOT LIKE '0x%'"
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO migrations (migration_name, applied_at) VALUES (?, ?)",
|
|
(migration_name, time.time()),
|
|
)
|
|
logger.info(f"Migration '{migration_name}' applied successfully")
|
|
|
|
conn.commit()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to run migrations: {e}")
|
|
|
|
# API Token methods
|
|
def create_api_token(self, name: str, token_hash: str) -> int:
|
|
"""Create a new API token entry"""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute(
|
|
"INSERT INTO api_tokens (name, token_hash, created_at) VALUES (?, ?, ?)",
|
|
(name, token_hash, time.time()),
|
|
)
|
|
return cursor.lastrowid
|
|
except Exception as e:
|
|
logger.error(f"Failed to create API token: {e}")
|
|
raise
|
|
|
|
def verify_api_token(self, token_hash: str) -> Optional[Dict[str, Any]]:
|
|
"""Verify API token and update last_used timestamp"""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute(
|
|
"SELECT id, name, created_at FROM api_tokens WHERE token_hash = ?",
|
|
(token_hash,),
|
|
)
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
token_id, name, created_at = row
|
|
|
|
# Update last_used timestamp
|
|
conn.execute(
|
|
"UPDATE api_tokens SET last_used = ? WHERE id = ?", (time.time(), token_id)
|
|
)
|
|
conn.commit()
|
|
|
|
return {"id": token_id, "name": name, "created_at": created_at}
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Failed to verify API token: {e}")
|
|
return None
|
|
|
|
def revoke_api_token(self, token_id: int) -> bool:
|
|
"""Revoke (delete) an API token"""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute("DELETE FROM api_tokens WHERE id = ?", (token_id,))
|
|
return cursor.rowcount > 0
|
|
except Exception as e:
|
|
logger.error(f"Failed to revoke API token: {e}")
|
|
return False
|
|
|
|
def list_api_tokens(self) -> List[Dict[str, Any]]:
|
|
"""List all API tokens (without sensitive data)"""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute(
|
|
"SELECT id, name, created_at, last_used FROM api_tokens ORDER BY created_at DESC"
|
|
)
|
|
|
|
tokens = []
|
|
for row in cursor.fetchall():
|
|
tokens.append(
|
|
{"id": row[0], "name": row[1], "created_at": row[2], "last_used": row[3]}
|
|
)
|
|
return tokens
|
|
except Exception as e:
|
|
logger.error(f"Failed to list API tokens: {e}")
|
|
return []
|
|
|
|
def store_packet(self, record: dict):
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
orig_path = record.get("original_path")
|
|
fwd_path = record.get("forwarded_path")
|
|
try:
|
|
orig_path_val = json.dumps(orig_path) if orig_path is not None else None
|
|
except Exception:
|
|
orig_path_val = str(orig_path)
|
|
try:
|
|
fwd_path_val = json.dumps(fwd_path) if fwd_path is not None else None
|
|
except Exception:
|
|
fwd_path_val = str(fwd_path)
|
|
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO packets (
|
|
timestamp, type, route, length, rssi, snr, score,
|
|
transmitted, is_duplicate, drop_reason, src_hash, dst_hash, path_hash,
|
|
header, transport_codes, payload, payload_length,
|
|
tx_delay_ms, packet_hash, original_path, forwarded_path, raw_packet,
|
|
lbt_attempts, lbt_backoff_delays_ms, lbt_channel_busy
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
record.get("timestamp", time.time()),
|
|
record.get("type", 0),
|
|
record.get("route", 0),
|
|
record.get("length", 0),
|
|
record.get("rssi"),
|
|
record.get("snr"),
|
|
record.get("score"),
|
|
int(bool(record.get("transmitted", False))),
|
|
int(bool(record.get("is_duplicate", False))),
|
|
record.get("drop_reason"),
|
|
record.get("src_hash"),
|
|
record.get("dst_hash"),
|
|
record.get("path_hash"),
|
|
record.get("header"),
|
|
record.get("transport_codes"),
|
|
record.get("payload"),
|
|
record.get("payload_length"),
|
|
record.get("tx_delay_ms"),
|
|
record.get("packet_hash"),
|
|
orig_path_val,
|
|
fwd_path_val,
|
|
record.get("raw_packet"),
|
|
record.get("lbt_attempts", 0),
|
|
(
|
|
json.dumps(record.get("lbt_backoff_delays_ms"))
|
|
if record.get("lbt_backoff_delays_ms")
|
|
else None
|
|
),
|
|
int(bool(record.get("lbt_channel_busy", False))),
|
|
),
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to store packet in SQLite: {e}")
|
|
|
|
def store_advert(self, record: dict):
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
existing = conn.execute(
|
|
"SELECT pubkey, first_seen, advert_count, zero_hop, rssi, snr FROM adverts WHERE pubkey = ? ORDER BY last_seen DESC LIMIT 1",
|
|
(record.get("pubkey", ""),),
|
|
).fetchone()
|
|
|
|
current_time = record.get("timestamp", time.time())
|
|
|
|
if existing:
|
|
# Use incoming zero_hop value (already calculated from route_type + path_len)
|
|
incoming_zero_hop = record.get("zero_hop", False)
|
|
existing_zero_hop = bool(existing["zero_hop"])
|
|
|
|
# Signal measurement logic:
|
|
# - If incoming is zero-hop: ALWAYS store incoming rssi/snr (most recent zero-hop measurement)
|
|
# - If incoming is multi-hop and existing was zero-hop: preserve existing (don't overwrite zero-hop with multi-hop)
|
|
# - If both are multi-hop: signal measurements are not applicable
|
|
if incoming_zero_hop:
|
|
rssi_to_store = record.get("rssi")
|
|
snr_to_store = record.get("snr")
|
|
zero_hop_to_store = True
|
|
elif existing_zero_hop:
|
|
rssi_to_store = existing["rssi"]
|
|
snr_to_store = existing["snr"]
|
|
zero_hop_to_store = True
|
|
else:
|
|
rssi_to_store = None
|
|
snr_to_store = None
|
|
zero_hop_to_store = False
|
|
|
|
conn.execute(
|
|
"""
|
|
UPDATE adverts
|
|
SET timestamp = ?, node_name = ?, is_repeater = ?, route_type = ?,
|
|
contact_type = ?, latitude = ?, longitude = ?, last_seen = ?,
|
|
rssi = ?, snr = ?, advert_count = advert_count + 1, is_new_neighbor = 0,
|
|
zero_hop = ?
|
|
WHERE pubkey = ?
|
|
""",
|
|
(
|
|
current_time,
|
|
record.get("node_name"),
|
|
record.get("is_repeater", False),
|
|
record.get("route_type"),
|
|
record.get("contact_type"),
|
|
record.get("latitude"),
|
|
record.get("longitude"),
|
|
current_time,
|
|
rssi_to_store,
|
|
snr_to_store,
|
|
zero_hop_to_store,
|
|
record.get("pubkey", ""),
|
|
),
|
|
)
|
|
else:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO adverts (
|
|
timestamp, pubkey, node_name, is_repeater, route_type, contact_type,
|
|
latitude, longitude, first_seen, last_seen, rssi, snr, advert_count,
|
|
is_new_neighbor, zero_hop
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
current_time,
|
|
record.get("pubkey", ""),
|
|
record.get("node_name"),
|
|
record.get("is_repeater", False),
|
|
record.get("route_type"),
|
|
record.get("contact_type"),
|
|
record.get("latitude"),
|
|
record.get("longitude"),
|
|
current_time,
|
|
current_time,
|
|
record.get("rssi"),
|
|
record.get("snr"),
|
|
1,
|
|
True,
|
|
record.get("zero_hop", False),
|
|
),
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to store advert in SQLite: {e}")
|
|
|
|
def store_noise_floor(self, record: dict):
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO noise_floor (timestamp, noise_floor_dbm)
|
|
VALUES (?, ?)
|
|
""",
|
|
(record.get("timestamp", time.time()), record.get("noise_floor_dbm")),
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Failed to store noise floor in SQLite: {e}")
|
|
|
|
def get_packet_stats(self, hours: int = 24) -> dict:
|
|
try:
|
|
cutoff = time.time() - (hours * 3600)
|
|
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
stats = conn.execute(
|
|
"""
|
|
SELECT
|
|
COUNT(*) as total_packets,
|
|
SUM(transmitted) as transmitted_packets,
|
|
SUM(CASE WHEN transmitted = 0 THEN 1 ELSE 0 END) as dropped_packets,
|
|
AVG(rssi) as avg_rssi,
|
|
AVG(snr) as avg_snr,
|
|
AVG(score) as avg_score,
|
|
AVG(payload_length) as avg_payload_length,
|
|
AVG(tx_delay_ms) as avg_tx_delay
|
|
FROM packets
|
|
WHERE timestamp > ?
|
|
""",
|
|
(cutoff,),
|
|
).fetchone()
|
|
|
|
types = conn.execute(
|
|
"""
|
|
SELECT type, COUNT(*) as count
|
|
FROM packets
|
|
WHERE timestamp > ?
|
|
GROUP BY type
|
|
ORDER BY count DESC
|
|
""",
|
|
(cutoff,),
|
|
).fetchall()
|
|
|
|
drop_reasons = conn.execute(
|
|
"""
|
|
SELECT drop_reason, COUNT(*) as count
|
|
FROM packets
|
|
WHERE timestamp > ? AND transmitted = 0 AND drop_reason IS NOT NULL
|
|
GROUP BY drop_reason
|
|
ORDER BY count DESC
|
|
""",
|
|
(cutoff,),
|
|
).fetchall()
|
|
|
|
return {
|
|
"total_packets": stats["total_packets"],
|
|
"transmitted_packets": stats["transmitted_packets"],
|
|
"dropped_packets": stats["dropped_packets"],
|
|
"avg_rssi": round(stats["avg_rssi"] or 0, 1),
|
|
"avg_snr": round(stats["avg_snr"] or 0, 1),
|
|
"avg_score": round(stats["avg_score"] or 0, 3),
|
|
"avg_payload_length": round(stats["avg_payload_length"] or 0, 1),
|
|
"avg_tx_delay": round(stats["avg_tx_delay"] or 0, 1),
|
|
"packet_types": [{"type": row["type"], "count": row["count"]} for row in types],
|
|
"drop_reasons": [
|
|
{"reason": row["drop_reason"], "count": row["count"]}
|
|
for row in drop_reasons
|
|
],
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get packet stats: {e}")
|
|
return {}
|
|
|
|
def get_recent_packets(self, limit: int = 100) -> list:
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
packets = conn.execute(
|
|
"""
|
|
SELECT
|
|
timestamp, type, route, length, rssi, snr, score,
|
|
transmitted, is_duplicate, drop_reason, src_hash, dst_hash, path_hash,
|
|
header, transport_codes, payload, payload_length,
|
|
tx_delay_ms, packet_hash, original_path, forwarded_path, raw_packet,
|
|
lbt_attempts, lbt_backoff_delays_ms, lbt_channel_busy
|
|
FROM packets
|
|
ORDER BY timestamp DESC
|
|
LIMIT ?
|
|
""",
|
|
(limit,),
|
|
).fetchall()
|
|
|
|
return [dict(row) for row in packets]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get recent packets: {e}")
|
|
return []
|
|
|
|
def get_filtered_packets(
|
|
self,
|
|
packet_type: Optional[int] = None,
|
|
route: Optional[int] = None,
|
|
start_timestamp: Optional[float] = None,
|
|
end_timestamp: Optional[float] = None,
|
|
limit: int = 1000,
|
|
offset: int = 0,
|
|
) -> list:
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
where_clauses = []
|
|
params = []
|
|
|
|
if packet_type is not None:
|
|
where_clauses.append("type = ?")
|
|
params.append(packet_type)
|
|
|
|
if route is not None:
|
|
where_clauses.append("route = ?")
|
|
params.append(route)
|
|
|
|
if start_timestamp is not None:
|
|
where_clauses.append("timestamp >= ?")
|
|
params.append(start_timestamp)
|
|
|
|
if end_timestamp is not None:
|
|
where_clauses.append("timestamp <= ?")
|
|
params.append(end_timestamp)
|
|
|
|
base_query = """
|
|
SELECT
|
|
timestamp, type, route, length, rssi, snr, score,
|
|
transmitted, is_duplicate, drop_reason, src_hash, dst_hash, path_hash,
|
|
header, transport_codes, payload, payload_length,
|
|
tx_delay_ms, packet_hash, original_path, forwarded_path, raw_packet,
|
|
lbt_attempts, lbt_backoff_delays_ms, lbt_channel_busy
|
|
FROM packets
|
|
"""
|
|
|
|
if where_clauses:
|
|
query = f"{base_query} WHERE {' AND '.join(where_clauses)}"
|
|
else:
|
|
query = base_query
|
|
|
|
query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?"
|
|
params.append(limit)
|
|
params.append(offset)
|
|
|
|
packets = conn.execute(query, params).fetchall()
|
|
|
|
return [dict(row) for row in packets]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get filtered packets: {e}")
|
|
return []
|
|
|
|
def get_packet_by_hash(self, packet_hash: str) -> Optional[dict]:
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
packet = conn.execute(
|
|
"""
|
|
SELECT
|
|
timestamp, type, route, length, rssi, snr, score,
|
|
transmitted, is_duplicate, drop_reason, src_hash, dst_hash, path_hash,
|
|
header, transport_codes, payload, payload_length,
|
|
tx_delay_ms, packet_hash, original_path, forwarded_path, raw_packet,
|
|
lbt_attempts, lbt_backoff_delays_ms, lbt_channel_busy
|
|
FROM packets
|
|
WHERE packet_hash = ?
|
|
""",
|
|
(packet_hash,),
|
|
).fetchone()
|
|
|
|
return dict(packet) if packet else None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get packet by hash: {e}")
|
|
return None
|
|
|
|
def get_packet_type_stats(self, hours: int = 24) -> dict:
|
|
try:
|
|
cutoff = time.time() - (hours * 3600)
|
|
|
|
# Align with pyMC_core feat/newRadios PAYLOAD_TYPES (0x0B = CONTROL)
|
|
try:
|
|
from pymc_core.protocol.utils import PAYLOAD_TYPES as _PT
|
|
_human = {
|
|
"REQ": "Request",
|
|
"RESPONSE": "Response",
|
|
"TXT_MSG": "Plain Text Message",
|
|
"ACK": "Acknowledgment",
|
|
"ADVERT": "Node Advertisement",
|
|
"GRP_TXT": "Group Text Message",
|
|
"GRP_DATA": "Group Datagram",
|
|
"ANON_REQ": "Anonymous Request",
|
|
"PATH": "Returned Path",
|
|
"TRACE": "Trace",
|
|
"MULTIPART": "Multi-part Packet",
|
|
"CONTROL": "Control",
|
|
"RAW_CUSTOM": "Custom Packet",
|
|
}
|
|
packet_type_names = {}
|
|
for i in range(16):
|
|
code = _PT.get(i)
|
|
if code:
|
|
label = _human.get(code, code.replace("_", " ").title())
|
|
packet_type_names[i] = f"{label} ({code})"
|
|
else:
|
|
packet_type_names[i] = f"Reserved Type {i}"
|
|
except ImportError:
|
|
packet_type_names = {
|
|
0: "Request (REQ)",
|
|
1: "Response (RESPONSE)",
|
|
2: "Plain Text Message (TXT_MSG)",
|
|
3: "Acknowledgment (ACK)",
|
|
4: "Node Advertisement (ADVERT)",
|
|
5: "Group Text Message (GRP_TXT)",
|
|
6: "Group Datagram (GRP_DATA)",
|
|
7: "Anonymous Request (ANON_REQ)",
|
|
8: "Returned Path (PATH)",
|
|
9: "Trace (TRACE)",
|
|
10: "Multi-part Packet (MULTIPART)",
|
|
11: "Control (CONTROL)",
|
|
12: "Reserved Type 12",
|
|
13: "Reserved Type 13",
|
|
14: "Reserved Type 14",
|
|
15: "Custom Packet (RAW_CUSTOM)",
|
|
}
|
|
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
type_counts = {}
|
|
for packet_type in range(16):
|
|
count = conn.execute(
|
|
"SELECT COUNT(*) FROM packets WHERE type = ? AND timestamp > ?",
|
|
(packet_type, cutoff),
|
|
).fetchone()[0]
|
|
|
|
type_name = packet_type_names.get(packet_type, f"Type {packet_type}")
|
|
if count > 0:
|
|
type_counts[type_name] = count
|
|
|
|
other_count = conn.execute(
|
|
"SELECT COUNT(*) FROM packets WHERE type > 15 AND timestamp > ?", (cutoff,)
|
|
).fetchone()[0]
|
|
if other_count > 0:
|
|
type_counts["Other Types (>15)"] = other_count
|
|
|
|
return {
|
|
"hours": hours,
|
|
"packet_type_totals": type_counts,
|
|
"total_packets": sum(type_counts.values()),
|
|
"period": f"{hours} hours",
|
|
"data_source": "sqlite",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get packet type stats from SQLite: {e}")
|
|
return {"error": str(e), "data_source": "error"}
|
|
|
|
def get_route_stats(self, hours: int = 24) -> dict:
|
|
|
|
try:
|
|
cutoff = time.time() - (hours * 3600)
|
|
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
route_counts = {}
|
|
route_names = {0: "Transport Flood", 1: "Flood", 2: "Direct", 3: "Transport Direct"}
|
|
|
|
for route_type in range(4):
|
|
count = conn.execute(
|
|
"SELECT COUNT(*) FROM packets WHERE route = ? AND timestamp > ?",
|
|
(route_type, cutoff),
|
|
).fetchone()[0]
|
|
|
|
route_name = route_names.get(route_type, f"Route {route_type}")
|
|
if count > 0:
|
|
route_counts[route_name] = count
|
|
|
|
# Count any other route types > 3
|
|
other_count = conn.execute(
|
|
"SELECT COUNT(*) FROM packets WHERE route > 3 AND timestamp > ?", (cutoff,)
|
|
).fetchone()[0]
|
|
if other_count > 0:
|
|
route_counts["Other Routes (>3)"] = other_count
|
|
|
|
return {
|
|
"hours": hours,
|
|
"route_totals": route_counts,
|
|
"total_packets": sum(route_counts.values()),
|
|
"period": f"{hours} hours",
|
|
"data_source": "sqlite",
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get route stats from SQLite: {e}")
|
|
return {"error": str(e), "data_source": "error"}
|
|
|
|
def get_neighbors(self) -> dict:
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
neighbors = conn.execute(
|
|
"""
|
|
SELECT pubkey, node_name, is_repeater, route_type, contact_type,
|
|
latitude, longitude, first_seen, last_seen, rssi, snr, advert_count, zero_hop
|
|
FROM adverts a1
|
|
WHERE last_seen = (
|
|
SELECT MAX(last_seen)
|
|
FROM adverts a2
|
|
WHERE a2.pubkey = a1.pubkey
|
|
)
|
|
ORDER BY last_seen DESC
|
|
"""
|
|
).fetchall()
|
|
|
|
result = {}
|
|
for row in neighbors:
|
|
result[row["pubkey"]] = {
|
|
"node_name": row["node_name"],
|
|
"is_repeater": bool(row["is_repeater"]),
|
|
"route_type": row["route_type"],
|
|
"contact_type": row["contact_type"],
|
|
"latitude": row["latitude"],
|
|
"longitude": row["longitude"],
|
|
"first_seen": row["first_seen"],
|
|
"last_seen": row["last_seen"],
|
|
"rssi": row["rssi"],
|
|
"snr": row["snr"],
|
|
"advert_count": row["advert_count"],
|
|
"zero_hop": bool(row["zero_hop"]),
|
|
}
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get neighbors: {e}")
|
|
return {}
|
|
|
|
def get_noise_floor_history(self, hours: int = 24, limit: int = None) -> list:
|
|
try:
|
|
cutoff = time.time() - (hours * 3600)
|
|
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
# Build query with optional limit
|
|
query = """
|
|
SELECT timestamp, noise_floor_dbm
|
|
FROM noise_floor
|
|
WHERE timestamp > ?
|
|
ORDER BY timestamp DESC
|
|
"""
|
|
|
|
if limit:
|
|
query += f" LIMIT {int(limit)}"
|
|
|
|
measurements = conn.execute(query, (cutoff,)).fetchall()
|
|
|
|
# Reverse to get chronological order (oldest to newest)
|
|
result = [
|
|
{"timestamp": row["timestamp"], "noise_floor_dbm": row["noise_floor_dbm"]}
|
|
for row in reversed(measurements)
|
|
]
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get noise floor history: {e}")
|
|
return []
|
|
|
|
def get_noise_floor_stats(self, hours: int = 24) -> dict:
|
|
try:
|
|
cutoff = time.time() - (hours * 3600)
|
|
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
stats = conn.execute(
|
|
"""
|
|
SELECT
|
|
COUNT(*) as measurement_count,
|
|
AVG(noise_floor_dbm) as avg_noise_floor,
|
|
MIN(noise_floor_dbm) as min_noise_floor,
|
|
MAX(noise_floor_dbm) as max_noise_floor
|
|
FROM noise_floor
|
|
WHERE timestamp > ?
|
|
""",
|
|
(cutoff,),
|
|
).fetchone()
|
|
|
|
return {
|
|
"measurement_count": stats["measurement_count"],
|
|
"avg_noise_floor": round(stats["avg_noise_floor"] or 0, 1),
|
|
"min_noise_floor": round(stats["min_noise_floor"] or 0, 1),
|
|
"max_noise_floor": round(stats["max_noise_floor"] or 0, 1),
|
|
"hours": hours,
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get noise floor stats: {e}")
|
|
return {}
|
|
|
|
def cleanup_old_data(self, days: int = 7):
|
|
try:
|
|
cutoff = time.time() - (days * 24 * 3600)
|
|
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
result = conn.execute("DELETE FROM packets WHERE timestamp < ?", (cutoff,))
|
|
packets_deleted = result.rowcount
|
|
|
|
result = conn.execute("DELETE FROM adverts WHERE timestamp < ?", (cutoff,))
|
|
adverts_deleted = result.rowcount
|
|
|
|
result = conn.execute("DELETE FROM noise_floor WHERE timestamp < ?", (cutoff,))
|
|
noise_deleted = result.rowcount
|
|
|
|
conn.commit()
|
|
|
|
if packets_deleted > 0 or adverts_deleted > 0 or noise_deleted > 0:
|
|
logger.info(
|
|
f"Cleaned up {packets_deleted} old packets, {adverts_deleted} old adverts, {noise_deleted} old noise measurements"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to cleanup old data: {e}")
|
|
|
|
def get_cumulative_counts(self) -> dict:
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
type_counts = {}
|
|
for i in range(16):
|
|
count = conn.execute(
|
|
"SELECT COUNT(*) FROM packets WHERE type = ?", (i,)
|
|
).fetchone()[0]
|
|
type_counts[f"type_{i}"] = count
|
|
|
|
other_count = conn.execute(
|
|
"SELECT COUNT(*) FROM packets WHERE type > 15"
|
|
).fetchone()[0]
|
|
type_counts["type_other"] = other_count
|
|
|
|
rx_total = conn.execute("SELECT COUNT(*) FROM packets").fetchone()[0]
|
|
tx_total = conn.execute(
|
|
"SELECT COUNT(*) FROM packets WHERE transmitted = 1"
|
|
).fetchone()[0]
|
|
drop_total = conn.execute(
|
|
"SELECT COUNT(*) FROM packets WHERE transmitted = 0"
|
|
).fetchone()[0]
|
|
|
|
return {
|
|
"rx_total": rx_total,
|
|
"tx_total": tx_total,
|
|
"drop_total": drop_total,
|
|
"type_counts": type_counts,
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get cumulative counts: {e}")
|
|
return {"rx_total": 0, "tx_total": 0, "drop_total": 0, "type_counts": {}}
|
|
|
|
def get_adverts_by_contact_type(
|
|
self, contact_type: str, limit: Optional[int] = None, hours: Optional[int] = None
|
|
) -> List[dict]:
|
|
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
query = """
|
|
SELECT id, timestamp, pubkey, node_name, is_repeater, route_type,
|
|
contact_type, latitude, longitude, first_seen, last_seen,
|
|
rssi, snr, advert_count, is_new_neighbor, zero_hop
|
|
FROM adverts
|
|
WHERE contact_type = ?
|
|
"""
|
|
params = [contact_type]
|
|
|
|
if hours is not None:
|
|
cutoff = time.time() - (hours * 3600)
|
|
query += " AND timestamp > ?"
|
|
params.append(cutoff)
|
|
|
|
query += " ORDER BY timestamp DESC"
|
|
|
|
if limit is not None:
|
|
query += " LIMIT ?"
|
|
params.append(limit)
|
|
|
|
rows = conn.execute(query, params).fetchall()
|
|
|
|
adverts = []
|
|
for row in rows:
|
|
advert = {
|
|
"id": row["id"],
|
|
"timestamp": row["timestamp"],
|
|
"pubkey": row["pubkey"],
|
|
"node_name": row["node_name"],
|
|
"is_repeater": bool(row["is_repeater"]),
|
|
"route_type": row["route_type"],
|
|
"contact_type": row["contact_type"],
|
|
"latitude": row["latitude"],
|
|
"longitude": row["longitude"],
|
|
"first_seen": row["first_seen"],
|
|
"last_seen": row["last_seen"],
|
|
"rssi": row["rssi"],
|
|
"snr": row["snr"],
|
|
"advert_count": row["advert_count"],
|
|
"is_new_neighbor": bool(row["is_new_neighbor"]),
|
|
"zero_hop": bool(row["zero_hop"]),
|
|
}
|
|
adverts.append(advert)
|
|
|
|
return adverts
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get adverts by contact_type '{contact_type}': {e}")
|
|
return []
|
|
|
|
def generate_transport_key(self, name: str, key_length_bytes: int = 32) -> str:
|
|
"""
|
|
Generate a transport key using the proper MeshCore key derivation.
|
|
|
|
Args:
|
|
name: The key name to derive the key from
|
|
key_length_bytes: Length of the key in bytes (default: 32 bytes = 256 bits)
|
|
|
|
Returns:
|
|
A base64-encoded transport key derived from the name
|
|
"""
|
|
try:
|
|
from pymc_core.protocol.transport_keys import get_auto_key_for
|
|
|
|
# Use the proper MeshCore key derivation function
|
|
key_bytes = get_auto_key_for(name)
|
|
|
|
# Encode to base64 for safe storage and transmission
|
|
key = base64.b64encode(key_bytes).decode("utf-8")
|
|
|
|
logger.debug(
|
|
f"Generated transport key for '{name}' with {len(key_bytes)} bytes ({len(key)} base64 chars)"
|
|
)
|
|
return key
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to generate transport key using get_auto_key_for: {e}")
|
|
# Fallback to secure random if MeshCore function fails
|
|
try:
|
|
random_bytes = secrets.token_bytes(key_length_bytes)
|
|
key = base64.b64encode(random_bytes).decode("utf-8")
|
|
logger.warning(f"Using fallback random key generation for '{name}'")
|
|
return key
|
|
except Exception as fallback_e:
|
|
logger.error(f"Fallback key generation also failed: {fallback_e}")
|
|
raise
|
|
|
|
def create_transport_key(
|
|
self,
|
|
name: str,
|
|
flood_policy: str,
|
|
transport_key: Optional[str] = None,
|
|
parent_id: Optional[int] = None,
|
|
last_used: Optional[float] = None,
|
|
) -> Optional[int]:
|
|
try:
|
|
# Generate key if not provided
|
|
if transport_key is None:
|
|
transport_key = self.generate_transport_key(name)
|
|
|
|
current_time = time.time()
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
INSERT INTO transport_keys (name, flood_policy, transport_key, parent_id, last_used, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
name,
|
|
flood_policy,
|
|
transport_key,
|
|
parent_id,
|
|
last_used,
|
|
current_time,
|
|
current_time,
|
|
),
|
|
)
|
|
return cursor.lastrowid
|
|
except Exception as e:
|
|
logger.error(f"Failed to create transport key: {e}")
|
|
return None
|
|
|
|
def get_transport_keys(self) -> List[dict]:
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT id, name, flood_policy, transport_key, parent_id, last_used, created_at, updated_at
|
|
FROM transport_keys
|
|
ORDER BY created_at ASC
|
|
"""
|
|
).fetchall()
|
|
|
|
return [
|
|
{
|
|
"id": row["id"],
|
|
"name": row["name"],
|
|
"flood_policy": row["flood_policy"],
|
|
"transport_key": row["transport_key"],
|
|
"parent_id": row["parent_id"],
|
|
"last_used": row["last_used"],
|
|
"created_at": row["created_at"],
|
|
"updated_at": row["updated_at"],
|
|
}
|
|
for row in rows
|
|
]
|
|
except Exception as e:
|
|
logger.error(f"Failed to get transport keys: {e}")
|
|
return []
|
|
|
|
def get_transport_key_by_id(self, key_id: int) -> Optional[dict]:
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
row = conn.execute(
|
|
"""
|
|
SELECT id, name, flood_policy, transport_key, parent_id, last_used, created_at, updated_at
|
|
FROM transport_keys WHERE id = ?
|
|
""",
|
|
(key_id,),
|
|
).fetchone()
|
|
|
|
if row:
|
|
return {
|
|
"id": row["id"],
|
|
"name": row["name"],
|
|
"flood_policy": row["flood_policy"],
|
|
"transport_key": row["transport_key"],
|
|
"parent_id": row["parent_id"],
|
|
"last_used": row["last_used"],
|
|
"created_at": row["created_at"],
|
|
"updated_at": row["updated_at"],
|
|
}
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Failed to get transport key by id: {e}")
|
|
return None
|
|
|
|
def update_transport_key(
|
|
self,
|
|
key_id: int,
|
|
name: Optional[str] = None,
|
|
flood_policy: Optional[str] = None,
|
|
transport_key: Optional[str] = None,
|
|
parent_id: Optional[int] = None,
|
|
last_used: Optional[float] = None,
|
|
) -> bool:
|
|
try:
|
|
updates = []
|
|
params = []
|
|
|
|
if name is not None:
|
|
updates.append("name = ?")
|
|
params.append(name)
|
|
if flood_policy is not None:
|
|
updates.append("flood_policy = ?")
|
|
params.append(flood_policy)
|
|
if transport_key is not None:
|
|
updates.append("transport_key = ?")
|
|
params.append(transport_key)
|
|
if parent_id is not None:
|
|
updates.append("parent_id = ?")
|
|
params.append(parent_id)
|
|
if last_used is not None:
|
|
updates.append("last_used = ?")
|
|
params.append(last_used)
|
|
|
|
if not updates:
|
|
return False
|
|
|
|
updates.append("updated_at = ?")
|
|
params.append(time.time())
|
|
params.append(key_id)
|
|
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute(
|
|
f"""
|
|
UPDATE transport_keys SET {', '.join(updates)}
|
|
WHERE id = ?
|
|
""",
|
|
params,
|
|
)
|
|
return cursor.rowcount > 0
|
|
except Exception as e:
|
|
logger.error(f"Failed to update transport key: {e}")
|
|
return False
|
|
|
|
def delete_transport_key(self, key_id: int) -> bool:
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute("DELETE FROM transport_keys WHERE id = ?", (key_id,))
|
|
return cursor.rowcount > 0
|
|
except Exception as e:
|
|
logger.error(f"Failed to delete transport key: {e}")
|
|
return False
|
|
|
|
def delete_advert(self, advert_id: int) -> bool:
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute("DELETE FROM adverts WHERE id = ?", (advert_id,))
|
|
return cursor.rowcount > 0
|
|
except Exception as e:
|
|
logger.error(f"Failed to delete advert: {e}")
|
|
return False
|
|
|
|
# ------------------------------------------------------------------
|
|
# Room Server Methods
|
|
# ------------------------------------------------------------------
|
|
|
|
def insert_room_message(
|
|
self,
|
|
room_hash: str,
|
|
author_pubkey: str,
|
|
message_text: str,
|
|
post_timestamp: float,
|
|
sender_timestamp: float = None,
|
|
txt_type: int = 0,
|
|
) -> Optional[int]:
|
|
"""Insert a new room message and return its ID."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
INSERT INTO room_messages (
|
|
room_hash, author_pubkey, post_timestamp, sender_timestamp,
|
|
message_text, txt_type, created_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
room_hash,
|
|
author_pubkey,
|
|
post_timestamp,
|
|
sender_timestamp,
|
|
message_text,
|
|
txt_type,
|
|
time.time(),
|
|
),
|
|
)
|
|
return cursor.lastrowid
|
|
except Exception as e:
|
|
logger.error(f"Failed to insert room message: {e}")
|
|
return None
|
|
|
|
def get_unsynced_messages(
|
|
self, room_hash: str, client_pubkey: str, sync_since: float, limit: int = 100
|
|
) -> List[Dict]:
|
|
"""Get messages for a room that client hasn't synced yet."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT * FROM room_messages
|
|
WHERE room_hash = ?
|
|
AND post_timestamp > ?
|
|
AND author_pubkey != ?
|
|
ORDER BY post_timestamp ASC
|
|
LIMIT ?
|
|
""",
|
|
(room_hash, sync_since, client_pubkey, limit),
|
|
)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
except Exception as e:
|
|
logger.error(f"Failed to get unsynced messages: {e}")
|
|
return []
|
|
|
|
def get_unsynced_count(self, room_hash: str, client_pubkey: str, sync_since: float) -> int:
|
|
"""Count unsynced messages for a client."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM room_messages
|
|
WHERE room_hash = ?
|
|
AND post_timestamp > ?
|
|
AND author_pubkey != ?
|
|
""",
|
|
(room_hash, sync_since, client_pubkey),
|
|
)
|
|
return cursor.fetchone()[0]
|
|
except Exception as e:
|
|
logger.error(f"Failed to count unsynced messages: {e}")
|
|
return 0
|
|
|
|
def upsert_client_sync(self, room_hash: str, client_pubkey: str, **kwargs) -> bool:
|
|
"""Insert or update client sync state."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
# Check if exists
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT id FROM room_client_sync
|
|
WHERE room_hash = ? AND client_pubkey = ?
|
|
""",
|
|
(room_hash, client_pubkey),
|
|
)
|
|
existing = cursor.fetchone()
|
|
|
|
kwargs["updated_at"] = time.time()
|
|
|
|
if existing:
|
|
# Update
|
|
set_clauses = []
|
|
values = []
|
|
for key, value in kwargs.items():
|
|
set_clauses.append(f"{key} = ?")
|
|
values.append(value)
|
|
values.extend([room_hash, client_pubkey])
|
|
|
|
conn.execute(
|
|
f"""
|
|
UPDATE room_client_sync
|
|
SET {', '.join(set_clauses)}
|
|
WHERE room_hash = ? AND client_pubkey = ?
|
|
""",
|
|
values,
|
|
)
|
|
else:
|
|
# Insert with defaults
|
|
kwargs.setdefault("sync_since", 0)
|
|
kwargs.setdefault("pending_ack_crc", 0)
|
|
kwargs.setdefault("push_post_timestamp", 0)
|
|
kwargs.setdefault("ack_timeout_time", 0)
|
|
kwargs.setdefault("push_failures", 0)
|
|
kwargs.setdefault("last_activity", time.time())
|
|
|
|
columns = ["room_hash", "client_pubkey"] + list(kwargs.keys())
|
|
placeholders = ["?"] * len(columns)
|
|
values = [room_hash, client_pubkey] + list(kwargs.values())
|
|
|
|
conn.execute(
|
|
f"""
|
|
INSERT INTO room_client_sync ({', '.join(columns)})
|
|
VALUES ({', '.join(placeholders)})
|
|
""",
|
|
values,
|
|
)
|
|
|
|
conn.commit()
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to upsert client sync: {e}")
|
|
return False
|
|
|
|
def get_client_sync(self, room_hash: str, client_pubkey: str) -> Optional[Dict]:
|
|
"""Get client sync state."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT * FROM room_client_sync
|
|
WHERE room_hash = ? AND client_pubkey = ?
|
|
""",
|
|
(room_hash, client_pubkey),
|
|
)
|
|
row = cursor.fetchone()
|
|
return dict(row) if row else None
|
|
except Exception as e:
|
|
logger.error(f"Failed to get client sync: {e}")
|
|
return None
|
|
|
|
def get_all_room_clients(self, room_hash: str) -> List[Dict]:
|
|
"""Get all clients for a room."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT * FROM room_client_sync
|
|
WHERE room_hash = ?
|
|
ORDER BY last_activity DESC
|
|
""",
|
|
(room_hash,),
|
|
)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
except Exception as e:
|
|
logger.error(f"Failed to get room clients: {e}")
|
|
return []
|
|
|
|
def get_room_message_count(self, room_hash: str) -> int:
|
|
"""Get total number of messages in a room."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM room_messages WHERE room_hash = ?
|
|
""",
|
|
(room_hash,),
|
|
)
|
|
return cursor.fetchone()[0]
|
|
except Exception as e:
|
|
logger.error(f"Failed to get room message count: {e}")
|
|
return 0
|
|
|
|
def get_room_messages(self, room_hash: str, limit: int = 50, offset: int = 0) -> List[Dict]:
|
|
"""Get messages from a room with pagination."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT * FROM room_messages
|
|
WHERE room_hash = ?
|
|
ORDER BY post_timestamp DESC
|
|
LIMIT ? OFFSET ?
|
|
""",
|
|
(room_hash, limit, offset),
|
|
)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
except Exception as e:
|
|
logger.error(f"Failed to get room messages: {e}")
|
|
return []
|
|
|
|
def get_messages_since(
|
|
self, room_hash: str, since_timestamp: float, limit: int = 50
|
|
) -> List[Dict]:
|
|
"""Get messages posted after a specific timestamp."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT * FROM room_messages
|
|
WHERE room_hash = ? AND post_timestamp > ?
|
|
ORDER BY post_timestamp DESC
|
|
LIMIT ?
|
|
""",
|
|
(room_hash, since_timestamp, limit),
|
|
)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
except Exception as e:
|
|
logger.error(f"Failed to get messages since timestamp: {e}")
|
|
return []
|
|
|
|
def get_unsynced_count(self, room_hash: str, client_pubkey: str, sync_since: float) -> int:
|
|
"""Get count of unsynced messages for a client."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM room_messages
|
|
WHERE room_hash = ?
|
|
AND author_pubkey != ?
|
|
AND post_timestamp > ?
|
|
""",
|
|
(room_hash, client_pubkey, sync_since),
|
|
)
|
|
return cursor.fetchone()[0]
|
|
except Exception as e:
|
|
logger.error(f"Failed to get unsynced count: {e}")
|
|
return 0
|
|
|
|
def delete_room_message(self, room_hash: str, message_id: int) -> bool:
|
|
"""Delete a specific message by ID."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
DELETE FROM room_messages
|
|
WHERE room_hash = ? AND id = ?
|
|
""",
|
|
(room_hash, message_id),
|
|
)
|
|
return cursor.rowcount > 0
|
|
except Exception as e:
|
|
logger.error(f"Failed to delete message: {e}")
|
|
return False
|
|
|
|
def clear_room_messages(self, room_hash: str) -> int:
|
|
"""Clear all messages from a room."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
cursor = conn.execute(
|
|
"""
|
|
DELETE FROM room_messages WHERE room_hash = ?
|
|
""",
|
|
(room_hash,),
|
|
)
|
|
return cursor.rowcount
|
|
except Exception as e:
|
|
logger.error(f"Failed to clear room messages: {e}")
|
|
return 0
|
|
|
|
def cleanup_old_messages(self, room_hash: str, keep_count: int = 32) -> int:
|
|
"""Keep only the most recent N messages per room."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
# First check if cleanup is needed
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM room_messages WHERE room_hash = ?
|
|
""",
|
|
(room_hash,),
|
|
)
|
|
total_count = cursor.fetchone()[0]
|
|
|
|
if total_count <= keep_count:
|
|
return 0 # No cleanup needed
|
|
|
|
# Delete old messages
|
|
cursor = conn.execute(
|
|
"""
|
|
DELETE FROM room_messages
|
|
WHERE room_hash = ?
|
|
AND id NOT IN (
|
|
SELECT id FROM room_messages
|
|
WHERE room_hash = ?
|
|
ORDER BY post_timestamp DESC
|
|
LIMIT ?
|
|
)
|
|
""",
|
|
(room_hash, room_hash, keep_count),
|
|
)
|
|
return cursor.rowcount
|
|
except Exception as e:
|
|
logger.error(f"Failed to cleanup old messages: {e}")
|
|
return 0
|
|
|
|
# Companion persistence methods
|
|
def companion_load_contacts(self, companion_hash: str) -> List[Dict]:
|
|
"""Load contacts for a companion from storage."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT pubkey, name, adv_type, flags, out_path_len, out_path,
|
|
last_advert_timestamp, lastmod, gps_lat, gps_lon, sync_since
|
|
FROM companion_contacts WHERE companion_hash = ?
|
|
""",
|
|
(companion_hash,),
|
|
)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
except Exception as e:
|
|
logger.error(f"Failed to load companion contacts: {e}")
|
|
return []
|
|
|
|
def companion_save_contacts(self, companion_hash: str, contacts: List[Dict]) -> bool:
|
|
"""Replace all contacts for a companion in storage."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.execute(
|
|
"DELETE FROM companion_contacts WHERE companion_hash = ?", (companion_hash,)
|
|
)
|
|
now = time.time()
|
|
for c in contacts:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO companion_contacts
|
|
(companion_hash, pubkey, name, adv_type, flags, out_path_len, out_path,
|
|
last_advert_timestamp, lastmod, gps_lat, gps_lon, sync_since, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
companion_hash,
|
|
c.get("pubkey", b""),
|
|
c.get("name", ""),
|
|
c.get("adv_type", 0),
|
|
c.get("flags", 0),
|
|
c.get("out_path_len", -1),
|
|
c.get("out_path", b""),
|
|
c.get("last_advert_timestamp", 0),
|
|
c.get("lastmod", 0),
|
|
c.get("gps_lat", 0.0),
|
|
c.get("gps_lon", 0.0),
|
|
c.get("sync_since", 0),
|
|
now,
|
|
),
|
|
)
|
|
conn.commit()
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to save companion contacts: {e}")
|
|
return False
|
|
|
|
def companion_upsert_contact(self, companion_hash: str, contact: dict) -> bool:
|
|
"""Insert or update a single contact for a companion in storage."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
now = time.time()
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO companion_contacts
|
|
(companion_hash, pubkey, name, adv_type, flags, out_path_len, out_path,
|
|
last_advert_timestamp, lastmod, gps_lat, gps_lon, sync_since, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(companion_hash, pubkey)
|
|
DO UPDATE SET
|
|
name=excluded.name, adv_type=excluded.adv_type,
|
|
flags=excluded.flags, out_path_len=excluded.out_path_len,
|
|
out_path=excluded.out_path,
|
|
last_advert_timestamp=excluded.last_advert_timestamp,
|
|
lastmod=excluded.lastmod, gps_lat=excluded.gps_lat,
|
|
gps_lon=excluded.gps_lon, sync_since=excluded.sync_since,
|
|
updated_at=excluded.updated_at
|
|
""",
|
|
(
|
|
companion_hash,
|
|
contact.get("pubkey", b""),
|
|
contact.get("name", ""),
|
|
contact.get("adv_type", 0),
|
|
contact.get("flags", 0),
|
|
contact.get("out_path_len", -1),
|
|
contact.get("out_path", b""),
|
|
contact.get("last_advert_timestamp", 0),
|
|
contact.get("lastmod", 0),
|
|
contact.get("gps_lat", 0.0),
|
|
contact.get("gps_lon", 0.0),
|
|
contact.get("sync_since", 0),
|
|
now,
|
|
),
|
|
)
|
|
conn.commit()
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to upsert companion contact: {e}")
|
|
return False
|
|
|
|
def companion_load_channels(self, companion_hash: str) -> List[Dict]:
|
|
"""Load channels for a companion from storage."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT channel_idx, name, secret FROM companion_channels
|
|
WHERE companion_hash = ? ORDER BY channel_idx
|
|
""",
|
|
(companion_hash,),
|
|
)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
except Exception as e:
|
|
logger.error(f"Failed to load companion channels: {e}")
|
|
return []
|
|
|
|
def companion_save_channels(self, companion_hash: str, channels: List[Dict]) -> bool:
|
|
"""Replace all channels for a companion in storage."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.execute(
|
|
"DELETE FROM companion_channels WHERE companion_hash = ?", (companion_hash,)
|
|
)
|
|
now = time.time()
|
|
for ch in channels:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO companion_channels
|
|
(companion_hash, channel_idx, name, secret, updated_at)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
companion_hash,
|
|
ch.get("channel_idx", 0),
|
|
ch.get("name", ""),
|
|
ch.get("secret", b""),
|
|
now,
|
|
),
|
|
)
|
|
conn.commit()
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to save companion channels: {e}")
|
|
return False
|
|
|
|
def companion_load_messages(self, companion_hash: str, limit: int = 100) -> List[Dict]:
|
|
"""Load queued messages for a companion (oldest first for queue order)."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT sender_key, txt_type, timestamp, text, is_channel, channel_idx, path_len
|
|
FROM companion_messages WHERE companion_hash = ?
|
|
ORDER BY created_at ASC LIMIT ?
|
|
""",
|
|
(companion_hash, limit),
|
|
)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
except Exception as e:
|
|
logger.error(f"Failed to load companion messages: {e}")
|
|
return []
|
|
|
|
def companion_push_message(self, companion_hash: str, msg: Dict) -> bool:
|
|
"""Append a message to the companion's queue. Deduplicates by packet_hash when present. Returns True if inserted, False if duplicate (skipped)."""
|
|
try:
|
|
packet_hash = msg.get("packet_hash") or None
|
|
if isinstance(packet_hash, bytes):
|
|
packet_hash = packet_hash.decode("utf-8", errors="replace") if packet_hash else None
|
|
sender_key = msg.get("sender_key", b"")
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
if packet_hash:
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT id FROM companion_messages
|
|
WHERE companion_hash = ? AND packet_hash = ?
|
|
LIMIT 1
|
|
""",
|
|
(companion_hash, packet_hash),
|
|
)
|
|
if cursor.fetchone():
|
|
return False
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO companion_messages
|
|
(companion_hash, sender_key, txt_type, timestamp, text, is_channel, channel_idx, path_len, packet_hash, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
companion_hash,
|
|
sender_key,
|
|
msg.get("txt_type", 0),
|
|
msg.get("timestamp", 0),
|
|
msg.get("text", ""),
|
|
int(msg.get("is_channel", False)),
|
|
msg.get("channel_idx", 0),
|
|
msg.get("path_len", 0),
|
|
packet_hash,
|
|
time.time(),
|
|
),
|
|
)
|
|
conn.commit()
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to push companion message: {e}")
|
|
return False
|
|
|
|
def companion_pop_message(self, companion_hash: str) -> Optional[Dict]:
|
|
"""Remove and return the oldest message from the companion's queue."""
|
|
try:
|
|
with sqlite3.connect(self.sqlite_path) as conn:
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.execute(
|
|
"""
|
|
SELECT id, sender_key, txt_type, timestamp, text, is_channel, channel_idx, path_len
|
|
FROM companion_messages WHERE companion_hash = ?
|
|
ORDER BY created_at ASC LIMIT 1
|
|
""",
|
|
(companion_hash,),
|
|
)
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
return None
|
|
msg = dict(row)
|
|
conn.execute("DELETE FROM companion_messages WHERE id = ?", (msg["id"],))
|
|
conn.commit()
|
|
return {k: v for k, v in msg.items() if k != "id"}
|
|
except Exception as e:
|
|
logger.error(f"Failed to pop companion message: {e}")
|
|
return None
|