mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-05-03 12:12:14 +02:00
Merge pull request #193 from tjdownes/perf/sqlite-wal-threadlocal
perf: thread-local SQLite connections, synchronous=NORMAL, dedup indexes
This commit is contained in:
@@ -3,6 +3,7 @@ import json
|
||||
import logging
|
||||
import secrets
|
||||
import sqlite3
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
@@ -19,14 +20,50 @@ class SQLiteHandler:
|
||||
self._hot_cache_ttl_sec = 60
|
||||
self._packet_stats_cache = {}
|
||||
self._neighbors_cache = {"timestamp": 0.0, "value": None}
|
||||
# Thread-local storage for persistent SQLite connections.
|
||||
# Opening a new connection on every DB call is expensive on SD-card
|
||||
# storage: each sqlite3.connect() call triggers file-system operations
|
||||
# and each subsequent PRAGMA runs as a round-trip. Thread-local keeps
|
||||
# one long-lived connection per thread (typically one for the write
|
||||
# executor and one for the event-loop / HTTP threads), eliminating
|
||||
# repeated setup overhead while maintaining correct isolation.
|
||||
self._local = threading.local()
|
||||
self._init_database()
|
||||
self._run_migrations()
|
||||
|
||||
def _connect(self) -> sqlite3.Connection:
|
||||
"""Create a connection with WAL mode and busy timeout to avoid 'database is locked' errors."""
|
||||
conn = sqlite3.connect(self.sqlite_path, timeout=30)
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA busy_timeout=30000")
|
||||
"""Return a persistent thread-local SQLite connection.
|
||||
|
||||
The first call from a given thread opens the connection and configures
|
||||
it once. Subsequent calls from the same thread return the cached
|
||||
connection, avoiding per-call connection overhead and repeated PRAGMA
|
||||
round-trips.
|
||||
|
||||
WAL (Write-Ahead Logging) mode:
|
||||
Default journal mode (DELETE) takes an exclusive lock for every write,
|
||||
blocking all readers. WAL allows one writer and multiple readers to
|
||||
operate concurrently — critical on SD-card storage where a single
|
||||
write can take 5–20 ms.
|
||||
|
||||
synchronous=NORMAL:
|
||||
Default FULL flushes WAL frames to disk after every transaction.
|
||||
NORMAL flushes only at WAL checkpoints — safe (no data loss on power
|
||||
failure beyond the current transaction) and significantly faster on
|
||||
SD cards, which have slow fsync.
|
||||
|
||||
busy_timeout=5000:
|
||||
Under concurrent access SQLite would immediately raise
|
||||
'database is locked'. 5 s of automatic retry eliminates transient
|
||||
contention errors when the write executor and the HTTP thread
|
||||
briefly compete for the WAL write lock.
|
||||
"""
|
||||
conn = getattr(self._local, "conn", None)
|
||||
if conn is None:
|
||||
conn = sqlite3.connect(str(self.sqlite_path))
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA synchronous=NORMAL")
|
||||
conn.execute("PRAGMA busy_timeout=5000")
|
||||
self._local.conn = conn
|
||||
return conn
|
||||
|
||||
def _invalidate_hot_caches(self) -> None:
|
||||
@@ -479,6 +516,57 @@ class SQLiteHandler:
|
||||
)
|
||||
logger.info(f"Migration '{migration_name}' applied successfully")
|
||||
|
||||
# Migration 8: UNIQUE index on companion_messages for dedup by
|
||||
# (companion_hash, packet_hash). Enables INSERT OR IGNORE
|
||||
# deduplication in companion_push_message, replacing the
|
||||
# Python-level SELECT + INSERT round-trip.
|
||||
migration_name = "companion_messages_packet_hash_unique"
|
||||
existing = conn.execute(
|
||||
"SELECT migration_name FROM migrations WHERE migration_name = ?",
|
||||
(migration_name,),
|
||||
).fetchone()
|
||||
if not existing:
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_companion_messages_dedup
|
||||
ON companion_messages(companion_hash, packet_hash)
|
||||
WHERE packet_hash IS NOT NULL
|
||||
"""
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO migrations (migration_name, applied_at) VALUES (?, ?)",
|
||||
(migration_name, time.time()),
|
||||
)
|
||||
logger.info(f"Migration '{migration_name}' applied successfully")
|
||||
|
||||
# Migration 9: Deduplicate adverts and enforce UNIQUE on pubkey.
|
||||
# Without this index store_advert's ON CONFLICT clause cannot
|
||||
# function and each advert inserts a new row instead of updating
|
||||
# the existing one, causing unbounded table growth on busy meshes.
|
||||
migration_name = "adverts_unique_pubkey"
|
||||
existing = conn.execute(
|
||||
"SELECT migration_name FROM migrations WHERE migration_name = ?",
|
||||
(migration_name,),
|
||||
).fetchone()
|
||||
if not existing:
|
||||
# Keep only the most recently seen row per pubkey
|
||||
conn.execute(
|
||||
"""
|
||||
DELETE FROM adverts WHERE id NOT IN (
|
||||
SELECT MAX(id) FROM adverts GROUP BY pubkey
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.execute("DROP INDEX IF EXISTS idx_adverts_pubkey")
|
||||
conn.execute(
|
||||
"CREATE UNIQUE INDEX IF NOT EXISTS idx_adverts_pubkey ON adverts(pubkey)"
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO migrations (migration_name, applied_at) VALUES (?, ?)",
|
||||
(migration_name, time.time()),
|
||||
)
|
||||
logger.info(f"Migration '{migration_name}' applied successfully")
|
||||
|
||||
conn.commit()
|
||||
|
||||
except Exception as e:
|
||||
@@ -1902,24 +1990,6 @@ class SQLiteHandler:
|
||||
logger.error(f"Failed to get unsynced messages: {e}")
|
||||
return []
|
||||
|
||||
def get_unsynced_count(self, room_hash: str, client_pubkey: str, sync_since: float) -> int:
|
||||
"""Count unsynced messages for a client."""
|
||||
try:
|
||||
with self._connect() as conn:
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
SELECT COUNT(*) FROM room_messages
|
||||
WHERE room_hash = ?
|
||||
AND post_timestamp > ?
|
||||
AND author_pubkey != ?
|
||||
""",
|
||||
(room_hash, sync_since, client_pubkey),
|
||||
)
|
||||
return cursor.fetchone()[0]
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to count unsynced messages: {e}")
|
||||
return 0
|
||||
|
||||
def upsert_client_sync(self, room_hash: str, client_pubkey: str, **kwargs) -> bool:
|
||||
"""Insert or update client sync state using single upsert operation."""
|
||||
try:
|
||||
@@ -2045,7 +2115,13 @@ class SQLiteHandler:
|
||||
return []
|
||||
|
||||
def get_unsynced_count(self, room_hash: str, client_pubkey: str, sync_since: float) -> int:
|
||||
"""Get count of unsynced messages for a client."""
|
||||
"""Get count of unsynced messages for a client.
|
||||
|
||||
Note: a duplicate definition of this method existed earlier in the file
|
||||
with the same signature but reversed parameter-binding order in the SQL.
|
||||
Python silently uses the last definition; the first was dead code.
|
||||
The dead definition has been removed.
|
||||
"""
|
||||
try:
|
||||
with self._connect() as conn:
|
||||
cursor = conn.execute(
|
||||
@@ -2426,30 +2502,28 @@ class SQLiteHandler:
|
||||
return []
|
||||
|
||||
def companion_push_message(self, companion_hash: str, msg: Dict) -> bool:
|
||||
"""Append a message to the companion's queue. Deduplicates by packet_hash when present. Returns True if inserted, False if duplicate (skipped)."""
|
||||
"""Append a message to the companion's queue.
|
||||
|
||||
Deduplicates by (companion_hash, packet_hash) using INSERT OR IGNORE
|
||||
backed by the UNIQUE index added in migration 8. This replaces the
|
||||
previous SELECT + INSERT round-trip (two statements, two SD-card reads)
|
||||
with a single atomic statement.
|
||||
|
||||
Returns True if inserted, False if the message was a duplicate (skipped).
|
||||
"""
|
||||
try:
|
||||
packet_hash = msg.get("packet_hash") or None
|
||||
if isinstance(packet_hash, bytes):
|
||||
packet_hash = packet_hash.decode("utf-8", errors="replace") if packet_hash else None
|
||||
sender_key = msg.get("sender_key", b"")
|
||||
with self._connect() as conn:
|
||||
if packet_hash:
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
SELECT id FROM companion_messages
|
||||
WHERE companion_hash = ? AND packet_hash = ?
|
||||
LIMIT 1
|
||||
""",
|
||||
(companion_hash, packet_hash),
|
||||
)
|
||||
if cursor.fetchone():
|
||||
return False
|
||||
conn.execute(
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
INSERT INTO companion_messages
|
||||
(companion_hash, sender_key, txt_type, timestamp, text, is_channel, channel_idx, path_len, packet_hash, created_at)
|
||||
INSERT OR IGNORE INTO companion_messages
|
||||
(companion_hash, sender_key, txt_type, timestamp, text,
|
||||
is_channel, channel_idx, path_len, packet_hash, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
""",
|
||||
(
|
||||
companion_hash,
|
||||
sender_key,
|
||||
@@ -2464,7 +2538,7 @@ class SQLiteHandler:
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
return True
|
||||
return cursor.rowcount > 0
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to push companion message: {e}")
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user