mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
388 lines
15 KiB
Python
388 lines
15 KiB
Python
import json
|
|
import logging
|
|
import time
|
|
from typing import Any, Literal
|
|
|
|
from app.database import db
|
|
from app.models import AppSettings, BotConfig, Favorite
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
SECONDS_1H = 3600
|
|
SECONDS_24H = 86400
|
|
SECONDS_7D = 604800
|
|
|
|
|
|
class AppSettingsRepository:
|
|
"""Repository for app_settings table (single-row pattern)."""
|
|
|
|
@staticmethod
|
|
async def get() -> AppSettings:
|
|
"""Get the current app settings.
|
|
|
|
Always returns settings - creates default row if needed (migration handles initial row).
|
|
"""
|
|
cursor = await db.conn.execute(
|
|
"""
|
|
SELECT max_radio_contacts, favorites, auto_decrypt_dm_on_advert,
|
|
sidebar_sort_order, last_message_times, preferences_migrated,
|
|
advert_interval, last_advert_time, bots,
|
|
mqtt_broker_host, mqtt_broker_port, mqtt_username, mqtt_password,
|
|
mqtt_use_tls, mqtt_tls_insecure, mqtt_topic_prefix,
|
|
mqtt_publish_messages, mqtt_publish_raw_packets
|
|
FROM app_settings WHERE id = 1
|
|
"""
|
|
)
|
|
row = await cursor.fetchone()
|
|
|
|
if not row:
|
|
# Should not happen after migration, but handle gracefully
|
|
return AppSettings()
|
|
|
|
# Parse favorites JSON
|
|
favorites = []
|
|
if row["favorites"]:
|
|
try:
|
|
favorites_data = json.loads(row["favorites"])
|
|
favorites = [Favorite(**f) for f in favorites_data]
|
|
except (json.JSONDecodeError, TypeError, KeyError) as e:
|
|
logger.warning(
|
|
"Failed to parse favorites JSON, using empty list: %s (data=%r)",
|
|
e,
|
|
row["favorites"][:100] if row["favorites"] else None,
|
|
)
|
|
favorites = []
|
|
|
|
# Parse last_message_times JSON
|
|
last_message_times: dict[str, int] = {}
|
|
if row["last_message_times"]:
|
|
try:
|
|
last_message_times = json.loads(row["last_message_times"])
|
|
except (json.JSONDecodeError, TypeError) as e:
|
|
logger.warning(
|
|
"Failed to parse last_message_times JSON, using empty dict: %s",
|
|
e,
|
|
)
|
|
last_message_times = {}
|
|
|
|
# Parse bots JSON
|
|
bots: list[BotConfig] = []
|
|
if row["bots"]:
|
|
try:
|
|
bots_data = json.loads(row["bots"])
|
|
bots = [BotConfig(**b) for b in bots_data]
|
|
except (json.JSONDecodeError, TypeError, KeyError) as e:
|
|
logger.warning(
|
|
"Failed to parse bots JSON, using empty list: %s (data=%r)",
|
|
e,
|
|
row["bots"][:100] if row["bots"] else None,
|
|
)
|
|
bots = []
|
|
|
|
# Validate sidebar_sort_order (fallback to "recent" if invalid)
|
|
sort_order = row["sidebar_sort_order"]
|
|
if sort_order not in ("recent", "alpha"):
|
|
sort_order = "recent"
|
|
|
|
return AppSettings(
|
|
max_radio_contacts=row["max_radio_contacts"],
|
|
favorites=favorites,
|
|
auto_decrypt_dm_on_advert=bool(row["auto_decrypt_dm_on_advert"]),
|
|
sidebar_sort_order=sort_order,
|
|
last_message_times=last_message_times,
|
|
preferences_migrated=bool(row["preferences_migrated"]),
|
|
advert_interval=row["advert_interval"] or 0,
|
|
last_advert_time=row["last_advert_time"] or 0,
|
|
bots=bots,
|
|
mqtt_broker_host=row["mqtt_broker_host"] or "",
|
|
mqtt_broker_port=row["mqtt_broker_port"] or 1883,
|
|
mqtt_username=row["mqtt_username"] or "",
|
|
mqtt_password=row["mqtt_password"] or "",
|
|
mqtt_use_tls=bool(row["mqtt_use_tls"]),
|
|
mqtt_tls_insecure=bool(row["mqtt_tls_insecure"]),
|
|
mqtt_topic_prefix=row["mqtt_topic_prefix"] or "meshcore",
|
|
mqtt_publish_messages=bool(row["mqtt_publish_messages"]),
|
|
mqtt_publish_raw_packets=bool(row["mqtt_publish_raw_packets"]),
|
|
)
|
|
|
|
@staticmethod
|
|
async def update(
|
|
max_radio_contacts: int | None = None,
|
|
favorites: list[Favorite] | None = None,
|
|
auto_decrypt_dm_on_advert: bool | None = None,
|
|
sidebar_sort_order: str | None = None,
|
|
last_message_times: dict[str, int] | None = None,
|
|
preferences_migrated: bool | None = None,
|
|
advert_interval: int | None = None,
|
|
last_advert_time: int | None = None,
|
|
bots: list[BotConfig] | None = None,
|
|
mqtt_broker_host: str | None = None,
|
|
mqtt_broker_port: int | None = None,
|
|
mqtt_username: str | None = None,
|
|
mqtt_password: str | None = None,
|
|
mqtt_use_tls: bool | None = None,
|
|
mqtt_tls_insecure: bool | None = None,
|
|
mqtt_topic_prefix: str | None = None,
|
|
mqtt_publish_messages: bool | None = None,
|
|
mqtt_publish_raw_packets: bool | None = None,
|
|
) -> AppSettings:
|
|
"""Update app settings. Only provided fields are updated."""
|
|
updates = []
|
|
params: list[Any] = []
|
|
|
|
if max_radio_contacts is not None:
|
|
updates.append("max_radio_contacts = ?")
|
|
params.append(max_radio_contacts)
|
|
|
|
if favorites is not None:
|
|
updates.append("favorites = ?")
|
|
favorites_json = json.dumps([f.model_dump() for f in favorites])
|
|
params.append(favorites_json)
|
|
|
|
if auto_decrypt_dm_on_advert is not None:
|
|
updates.append("auto_decrypt_dm_on_advert = ?")
|
|
params.append(1 if auto_decrypt_dm_on_advert else 0)
|
|
|
|
if sidebar_sort_order is not None:
|
|
updates.append("sidebar_sort_order = ?")
|
|
params.append(sidebar_sort_order)
|
|
|
|
if last_message_times is not None:
|
|
updates.append("last_message_times = ?")
|
|
params.append(json.dumps(last_message_times))
|
|
|
|
if preferences_migrated is not None:
|
|
updates.append("preferences_migrated = ?")
|
|
params.append(1 if preferences_migrated else 0)
|
|
|
|
if advert_interval is not None:
|
|
updates.append("advert_interval = ?")
|
|
params.append(advert_interval)
|
|
|
|
if last_advert_time is not None:
|
|
updates.append("last_advert_time = ?")
|
|
params.append(last_advert_time)
|
|
|
|
if bots is not None:
|
|
updates.append("bots = ?")
|
|
bots_json = json.dumps([b.model_dump() for b in bots])
|
|
params.append(bots_json)
|
|
|
|
if mqtt_broker_host is not None:
|
|
updates.append("mqtt_broker_host = ?")
|
|
params.append(mqtt_broker_host)
|
|
|
|
if mqtt_broker_port is not None:
|
|
updates.append("mqtt_broker_port = ?")
|
|
params.append(mqtt_broker_port)
|
|
|
|
if mqtt_username is not None:
|
|
updates.append("mqtt_username = ?")
|
|
params.append(mqtt_username)
|
|
|
|
if mqtt_password is not None:
|
|
updates.append("mqtt_password = ?")
|
|
params.append(mqtt_password)
|
|
|
|
if mqtt_use_tls is not None:
|
|
updates.append("mqtt_use_tls = ?")
|
|
params.append(1 if mqtt_use_tls else 0)
|
|
|
|
if mqtt_tls_insecure is not None:
|
|
updates.append("mqtt_tls_insecure = ?")
|
|
params.append(1 if mqtt_tls_insecure else 0)
|
|
|
|
if mqtt_topic_prefix is not None:
|
|
updates.append("mqtt_topic_prefix = ?")
|
|
params.append(mqtt_topic_prefix)
|
|
|
|
if mqtt_publish_messages is not None:
|
|
updates.append("mqtt_publish_messages = ?")
|
|
params.append(1 if mqtt_publish_messages else 0)
|
|
|
|
if mqtt_publish_raw_packets is not None:
|
|
updates.append("mqtt_publish_raw_packets = ?")
|
|
params.append(1 if mqtt_publish_raw_packets else 0)
|
|
|
|
if updates:
|
|
query = f"UPDATE app_settings SET {', '.join(updates)} WHERE id = 1"
|
|
await db.conn.execute(query, params)
|
|
await db.conn.commit()
|
|
|
|
return await AppSettingsRepository.get()
|
|
|
|
@staticmethod
|
|
async def add_favorite(fav_type: Literal["channel", "contact"], fav_id: str) -> AppSettings:
|
|
"""Add a favorite, avoiding duplicates."""
|
|
settings = await AppSettingsRepository.get()
|
|
|
|
# Check if already favorited
|
|
if any(f.type == fav_type and f.id == fav_id for f in settings.favorites):
|
|
return settings
|
|
|
|
new_favorites = settings.favorites + [Favorite(type=fav_type, id=fav_id)]
|
|
return await AppSettingsRepository.update(favorites=new_favorites)
|
|
|
|
@staticmethod
|
|
async def remove_favorite(fav_type: Literal["channel", "contact"], fav_id: str) -> AppSettings:
|
|
"""Remove a favorite."""
|
|
settings = await AppSettingsRepository.get()
|
|
new_favorites = [
|
|
f for f in settings.favorites if not (f.type == fav_type and f.id == fav_id)
|
|
]
|
|
return await AppSettingsRepository.update(favorites=new_favorites)
|
|
|
|
@staticmethod
|
|
async def migrate_preferences_from_frontend(
|
|
favorites: list[dict],
|
|
sort_order: str,
|
|
last_message_times: dict[str, int],
|
|
) -> tuple[AppSettings, bool]:
|
|
"""Migrate all preferences from frontend localStorage.
|
|
|
|
This is a one-time migration. If already migrated, returns current settings
|
|
without overwriting. Returns (settings, did_migrate) tuple.
|
|
"""
|
|
settings = await AppSettingsRepository.get()
|
|
|
|
if settings.preferences_migrated:
|
|
# Already migrated, don't overwrite
|
|
return settings, False
|
|
|
|
# Convert frontend favorites format to Favorite objects
|
|
new_favorites = []
|
|
for f in favorites:
|
|
if f.get("type") in ("channel", "contact") and f.get("id"):
|
|
new_favorites.append(Favorite(type=f["type"], id=f["id"]))
|
|
|
|
# Update with migrated preferences and mark as migrated
|
|
settings = await AppSettingsRepository.update(
|
|
favorites=new_favorites,
|
|
sidebar_sort_order=sort_order if sort_order in ("recent", "alpha") else "recent",
|
|
last_message_times=last_message_times,
|
|
preferences_migrated=True,
|
|
)
|
|
|
|
return settings, True
|
|
|
|
|
|
class StatisticsRepository:
|
|
@staticmethod
|
|
async def _activity_counts(*, contact_type: int, exclude: bool = False) -> dict[str, int]:
|
|
"""Get time-windowed counts for contacts/repeaters heard."""
|
|
now = int(time.time())
|
|
op = "!=" if exclude else "="
|
|
cursor = await db.conn.execute(
|
|
f"""
|
|
SELECT
|
|
SUM(CASE WHEN last_seen >= ? THEN 1 ELSE 0 END) AS last_hour,
|
|
SUM(CASE WHEN last_seen >= ? THEN 1 ELSE 0 END) AS last_24_hours,
|
|
SUM(CASE WHEN last_seen >= ? THEN 1 ELSE 0 END) AS last_week
|
|
FROM contacts
|
|
WHERE type {op} ? AND last_seen IS NOT NULL
|
|
""",
|
|
(now - SECONDS_1H, now - SECONDS_24H, now - SECONDS_7D, contact_type),
|
|
)
|
|
row = await cursor.fetchone()
|
|
assert row is not None # Aggregate query always returns a row
|
|
return {
|
|
"last_hour": row["last_hour"] or 0,
|
|
"last_24_hours": row["last_24_hours"] or 0,
|
|
"last_week": row["last_week"] or 0,
|
|
}
|
|
|
|
@staticmethod
|
|
async def get_all() -> dict:
|
|
"""Aggregate all statistics from existing tables."""
|
|
now = int(time.time())
|
|
|
|
# Top 5 busiest channels in last 24h
|
|
cursor = await db.conn.execute(
|
|
"""
|
|
SELECT m.conversation_key, COALESCE(c.name, m.conversation_key) AS channel_name,
|
|
COUNT(*) AS message_count
|
|
FROM messages m
|
|
LEFT JOIN channels c ON m.conversation_key = c.key
|
|
WHERE m.type = 'CHAN' AND m.received_at >= ?
|
|
GROUP BY m.conversation_key
|
|
ORDER BY COUNT(*) DESC
|
|
LIMIT 5
|
|
""",
|
|
(now - SECONDS_24H,),
|
|
)
|
|
rows = await cursor.fetchall()
|
|
busiest_channels_24h = [
|
|
{
|
|
"channel_key": row["conversation_key"],
|
|
"channel_name": row["channel_name"],
|
|
"message_count": row["message_count"],
|
|
}
|
|
for row in rows
|
|
]
|
|
|
|
# Entity counts
|
|
cursor = await db.conn.execute("SELECT COUNT(*) AS cnt FROM contacts WHERE type != 2")
|
|
row = await cursor.fetchone()
|
|
assert row is not None
|
|
contact_count: int = row["cnt"]
|
|
|
|
cursor = await db.conn.execute("SELECT COUNT(*) AS cnt FROM contacts WHERE type = 2")
|
|
row = await cursor.fetchone()
|
|
assert row is not None
|
|
repeater_count: int = row["cnt"]
|
|
|
|
cursor = await db.conn.execute("SELECT COUNT(*) AS cnt FROM channels")
|
|
row = await cursor.fetchone()
|
|
assert row is not None
|
|
channel_count: int = row["cnt"]
|
|
|
|
# Packet split
|
|
cursor = await db.conn.execute(
|
|
"""
|
|
SELECT COUNT(*) AS total,
|
|
SUM(CASE WHEN message_id IS NOT NULL THEN 1 ELSE 0 END) AS decrypted
|
|
FROM raw_packets
|
|
"""
|
|
)
|
|
pkt_row = await cursor.fetchone()
|
|
assert pkt_row is not None
|
|
total_packets = pkt_row["total"] or 0
|
|
decrypted_packets = pkt_row["decrypted"] or 0
|
|
undecrypted_packets = total_packets - decrypted_packets
|
|
|
|
# Message type counts
|
|
cursor = await db.conn.execute("SELECT COUNT(*) AS cnt FROM messages WHERE type = 'PRIV'")
|
|
row = await cursor.fetchone()
|
|
assert row is not None
|
|
total_dms: int = row["cnt"]
|
|
|
|
cursor = await db.conn.execute("SELECT COUNT(*) AS cnt FROM messages WHERE type = 'CHAN'")
|
|
row = await cursor.fetchone()
|
|
assert row is not None
|
|
total_channel_messages: int = row["cnt"]
|
|
|
|
# Outgoing count
|
|
cursor = await db.conn.execute("SELECT COUNT(*) AS cnt FROM messages WHERE outgoing = 1")
|
|
row = await cursor.fetchone()
|
|
assert row is not None
|
|
total_outgoing: int = row["cnt"]
|
|
|
|
# Activity windows
|
|
contacts_heard = await StatisticsRepository._activity_counts(contact_type=2, exclude=True)
|
|
repeaters_heard = await StatisticsRepository._activity_counts(contact_type=2)
|
|
|
|
return {
|
|
"busiest_channels_24h": busiest_channels_24h,
|
|
"contact_count": contact_count,
|
|
"repeater_count": repeater_count,
|
|
"channel_count": channel_count,
|
|
"total_packets": total_packets,
|
|
"decrypted_packets": decrypted_packets,
|
|
"undecrypted_packets": undecrypted_packets,
|
|
"total_dms": total_dms,
|
|
"total_channel_messages": total_channel_messages,
|
|
"total_outgoing": total_outgoing,
|
|
"contacts_heard": contacts_heard,
|
|
"repeaters_heard": repeaters_heard,
|
|
}
|