Files
Remote-Terminal-for-MeshCore/app/repository.py
2026-02-04 12:06:36 -08:00

909 lines
34 KiB
Python

import json
import logging
import sqlite3
import time
from hashlib import sha256
from typing import Any, Literal
from app.database import db
from app.decoder import PayloadType, extract_payload, get_packet_payload_type
from app.models import (
AppSettings,
BotConfig,
Channel,
Contact,
Favorite,
Message,
MessagePath,
)
# Module-level logger, named after this module, used by the repositories below.
logger = logging.getLogger(__name__)
class ContactRepository:
    """Static persistence helpers for the ``contacts`` table.

    Every method runs against the shared ``db.conn`` connection. Public keys
    are lower-cased before they are written or compared.
    """

    @staticmethod
    def _escape_like(value: str) -> str:
        """Escape LIKE wildcards in *value* so it is matched literally.

        The result is meant to be used with a backslash ESCAPE clause.
        """
        return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

    @staticmethod
    async def upsert(contact: dict[str, Any]) -> None:
        """Insert a contact, or merge it into the existing row with the same key.

        On conflict:
        - name, last_path, last_advert, lat, lon and last_contacted keep the
          stored value when the incoming value is NULL;
        - type keeps the stored value when the incoming type is 0;
        - flags, last_path_len, last_seen and on_radio are always overwritten.

        Args:
            contact: Contact fields. ``adv_name``, ``out_path``,
                ``out_path_len``, ``adv_lat`` and ``adv_lon`` are accepted as
                fallbacks for ``name``, ``last_path``, ``last_path_len``,
                ``lat`` and ``lon`` respectively.
        """
        # An explicit "last_path_len" key wins even when its value is None;
        # otherwise fall back to "out_path_len" (default -1).
        if "last_path_len" in contact:
            path_len = contact.get("last_path_len")
        else:
            path_len = contact.get("out_path_len", -1)

        # NOTE(review): the `or` fallbacks below treat falsy values ("" / 0.0)
        # as missing, so e.g. lat=0.0 falls through to adv_lat. Confirm this is
        # intended before changing it.
        params = (
            contact.get("public_key", "").lower(),
            contact.get("name") or contact.get("adv_name"),
            contact.get("type", 0),
            contact.get("flags", 0),
            contact.get("last_path") or contact.get("out_path"),
            path_len,
            contact.get("last_advert"),
            contact.get("lat") or contact.get("adv_lat"),
            contact.get("lon") or contact.get("adv_lon"),
            contact.get("last_seen", int(time.time())),
            contact.get("on_radio", False),
            contact.get("last_contacted"),
        )
        await db.conn.execute(
            """
            INSERT INTO contacts (public_key, name, type, flags, last_path, last_path_len,
                                  last_advert, lat, lon, last_seen, on_radio, last_contacted)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(public_key) DO UPDATE SET
                name = COALESCE(excluded.name, contacts.name),
                type = CASE WHEN excluded.type = 0 THEN contacts.type ELSE excluded.type END,
                flags = excluded.flags,
                last_path = COALESCE(excluded.last_path, contacts.last_path),
                last_path_len = excluded.last_path_len,
                last_advert = COALESCE(excluded.last_advert, contacts.last_advert),
                lat = COALESCE(excluded.lat, contacts.lat),
                lon = COALESCE(excluded.lon, contacts.lon),
                last_seen = excluded.last_seen,
                on_radio = excluded.on_radio,
                last_contacted = COALESCE(excluded.last_contacted, contacts.last_contacted)
            """,
            params,
        )
        await db.conn.commit()

    @staticmethod
    def _row_to_contact(row) -> Contact:
        """Convert a database row to a Contact model."""
        return Contact(
            public_key=row["public_key"],
            name=row["name"],
            type=row["type"],
            flags=row["flags"],
            last_path=row["last_path"],
            last_path_len=row["last_path_len"],
            last_advert=row["last_advert"],
            lat=row["lat"],
            lon=row["lon"],
            last_seen=row["last_seen"],
            on_radio=bool(row["on_radio"]),
            last_contacted=row["last_contacted"],
            last_read_at=row["last_read_at"],
        )

    @staticmethod
    async def get_by_key(public_key: str) -> Contact | None:
        """Return the contact whose public key equals *public_key*, or None."""
        cursor = await db.conn.execute(
            "SELECT * FROM contacts WHERE public_key = ?", (public_key.lower(),)
        )
        row = await cursor.fetchone()
        return ContactRepository._row_to_contact(row) if row else None

    @staticmethod
    async def get_by_key_prefix(prefix: str) -> Contact | None:
        """Return one contact whose public key starts with *prefix*, or None.

        LIKE wildcards (``%`` and ``_``) in *prefix* are escaped, so the
        prefix is matched literally rather than as a pattern.
        """
        pattern = f"{ContactRepository._escape_like(prefix.lower())}%"
        cursor = await db.conn.execute(
            "SELECT * FROM contacts WHERE public_key LIKE ? ESCAPE '\\' LIMIT 1",
            (pattern,),
        )
        row = await cursor.fetchone()
        return ContactRepository._row_to_contact(row) if row else None

    @staticmethod
    async def get_by_key_or_prefix(key_or_prefix: str) -> Contact | None:
        """Get a contact by exact key match, falling back to prefix match.

        Useful when the input might be a full 64-char public key or a shorter prefix.
        """
        contact = await ContactRepository.get_by_key(key_or_prefix)
        if not contact:
            contact = await ContactRepository.get_by_key_prefix(key_or_prefix)
        return contact

    @staticmethod
    async def get_all(limit: int = 100, offset: int = 0) -> list[Contact]:
        """Return a page of contacts ordered by name (public key when name is NULL)."""
        cursor = await db.conn.execute(
            "SELECT * FROM contacts ORDER BY COALESCE(name, public_key) LIMIT ? OFFSET ?",
            (limit, offset),
        )
        rows = await cursor.fetchall()
        return [ContactRepository._row_to_contact(row) for row in rows]

    @staticmethod
    async def get_recent_non_repeaters(limit: int = 200) -> list[Contact]:
        """Get the most recently active non-repeater contacts.

        Orders by most recent activity (last_contacted, then last_advert),
        excluding repeaters (type=2).
        """
        cursor = await db.conn.execute(
            """
            SELECT * FROM contacts
            WHERE type != 2
            ORDER BY COALESCE(last_contacted, 0) DESC, COALESCE(last_advert, 0) DESC
            LIMIT ?
            """,
            (limit,),
        )
        rows = await cursor.fetchall()
        return [ContactRepository._row_to_contact(row) for row in rows]

    @staticmethod
    async def update_path(public_key: str, path: str, path_len: int) -> None:
        """Store a new path for a contact and set last_seen to the current time."""
        await db.conn.execute(
            "UPDATE contacts SET last_path = ?, last_path_len = ?, last_seen = ? WHERE public_key = ?",
            (path, path_len, int(time.time()), public_key.lower()),
        )
        await db.conn.commit()

    @staticmethod
    async def set_on_radio(public_key: str, on_radio: bool) -> None:
        """Set the on_radio flag for a contact."""
        await db.conn.execute(
            "UPDATE contacts SET on_radio = ? WHERE public_key = ?",
            (on_radio, public_key.lower()),
        )
        await db.conn.commit()

    @staticmethod
    async def delete(public_key: str) -> None:
        """Delete the contact with the given public key."""
        await db.conn.execute(
            "DELETE FROM contacts WHERE public_key = ?",
            (public_key.lower(),),
        )
        await db.conn.commit()

    @staticmethod
    async def update_last_contacted(public_key: str, timestamp: int | None = None) -> None:
        """Update last_contacted (and last_seen) for a contact.

        Args:
            public_key: Contact public key.
            timestamp: Value to store; the current time is used only when None,
                so an explicit 0 is stored as 0.
        """
        ts = int(time.time()) if timestamp is None else timestamp
        await db.conn.execute(
            "UPDATE contacts SET last_contacted = ?, last_seen = ? WHERE public_key = ?",
            (ts, ts, public_key.lower()),
        )
        await db.conn.commit()

    @staticmethod
    async def update_last_read_at(public_key: str, timestamp: int | None = None) -> bool:
        """Update the last_read_at timestamp for a contact.

        Args:
            public_key: Contact public key.
            timestamp: Value to store; the current time is used only when None,
                so an explicit 0 is stored as 0.

        Returns:
            True if a row was updated, False if the contact was not found.
        """
        ts = int(time.time()) if timestamp is None else timestamp
        cursor = await db.conn.execute(
            "UPDATE contacts SET last_read_at = ? WHERE public_key = ?",
            (ts, public_key.lower()),
        )
        await db.conn.commit()
        return cursor.rowcount > 0

    @staticmethod
    async def get_by_pubkey_first_byte(hex_byte: str) -> list[Contact]:
        """Get contacts whose public key starts with the given hex byte (2 chars)."""
        cursor = await db.conn.execute(
            "SELECT * FROM contacts WHERE substr(public_key, 1, 2) = ?",
            (hex_byte.lower(),),
        )
        rows = await cursor.fetchall()
        return [ContactRepository._row_to_contact(row) for row in rows]
class ChannelRepository:
    """Static persistence helpers for the ``channels`` table.

    Channel keys are upper-cased before they are written or compared.
    """

    @staticmethod
    def _row_to_channel(row) -> Channel:
        """Convert a database row to a Channel model."""
        return Channel(
            key=row["key"],
            name=row["name"],
            is_hashtag=bool(row["is_hashtag"]),
            on_radio=bool(row["on_radio"]),
            last_read_at=row["last_read_at"],
        )

    @staticmethod
    async def upsert(key: str, name: str, is_hashtag: bool = False, on_radio: bool = False) -> None:
        """Upsert a channel. Key is 32-char hex string.

        On conflict the stored name, is_hashtag and on_radio values are
        overwritten with the supplied ones.
        """
        await db.conn.execute(
            """
            INSERT INTO channels (key, name, is_hashtag, on_radio)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(key) DO UPDATE SET
                name = excluded.name,
                is_hashtag = excluded.is_hashtag,
                on_radio = excluded.on_radio
            """,
            (key.upper(), name, is_hashtag, on_radio),
        )
        await db.conn.commit()

    @staticmethod
    async def get_by_key(key: str) -> Channel | None:
        """Get a channel by its key (32-char hex string), or None if absent."""
        cursor = await db.conn.execute(
            "SELECT key, name, is_hashtag, on_radio, last_read_at FROM channels WHERE key = ?",
            (key.upper(),),
        )
        row = await cursor.fetchone()
        return ChannelRepository._row_to_channel(row) if row else None

    @staticmethod
    async def get_all() -> list[Channel]:
        """Return every channel, ordered by name."""
        cursor = await db.conn.execute(
            "SELECT key, name, is_hashtag, on_radio, last_read_at FROM channels ORDER BY name"
        )
        rows = await cursor.fetchall()
        return [ChannelRepository._row_to_channel(row) for row in rows]

    @staticmethod
    async def delete(key: str) -> None:
        """Delete a channel by key."""
        await db.conn.execute(
            "DELETE FROM channels WHERE key = ?",
            (key.upper(),),
        )
        await db.conn.commit()

    @staticmethod
    async def update_last_read_at(key: str, timestamp: int | None = None) -> bool:
        """Update the last_read_at timestamp for a channel.

        Args:
            key: Channel key.
            timestamp: Value to store; the current time is used only when None,
                so an explicit 0 is stored as 0.

        Returns:
            True if a row was updated, False if the channel was not found.
        """
        ts = int(time.time()) if timestamp is None else timestamp
        cursor = await db.conn.execute(
            "UPDATE channels SET last_read_at = ? WHERE key = ?",
            (ts, key.upper()),
        )
        await db.conn.commit()
        return cursor.rowcount > 0
class MessageRepository:
    """Static persistence helpers for the ``messages`` table."""

    @staticmethod
    def _escape_like(value: str) -> str:
        """Escape LIKE wildcards in *value* so it is matched literally.

        The result is meant to be used with a backslash ESCAPE clause.
        """
        return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

    @staticmethod
    def _parse_paths(paths_json: str | None) -> list[MessagePath] | None:
        """Parse a paths JSON string into a list of MessagePath objects.

        Returns None when the input is empty or cannot be parsed.
        """
        if not paths_json:
            return None
        try:
            return [MessagePath(**entry) for entry in json.loads(paths_json)]
        except (ValueError, TypeError, KeyError):
            # ValueError covers json.JSONDecodeError and any validation error
            # MessagePath raises as a ValueError subclass.
            return None

    @staticmethod
    def _row_to_message(row) -> Message:
        """Convert a database row to a Message model."""
        return Message(
            id=row["id"],
            type=row["type"],
            conversation_key=row["conversation_key"],
            text=row["text"],
            sender_timestamp=row["sender_timestamp"],
            received_at=row["received_at"],
            paths=MessageRepository._parse_paths(row["paths"]),
            txt_type=row["txt_type"],
            signature=row["signature"],
            outgoing=bool(row["outgoing"]),
            acked=row["acked"],
        )

    @staticmethod
    async def create(
        msg_type: str,
        text: str,
        received_at: int,
        conversation_key: str,
        sender_timestamp: int | None = None,
        path: str | None = None,
        txt_type: int = 0,
        signature: str | None = None,
        outgoing: bool = False,
    ) -> int | None:
        """Create a message, returning the ID or None if duplicate.

        Uses INSERT OR IGNORE to handle the UNIQUE constraint on
        (type, conversation_key, text, sender_timestamp). This prevents
        duplicate messages when the same message arrives via multiple RF paths.

        The path parameter is converted to the paths JSON array format.
        """
        # Convert single path to paths array format
        paths_json = None
        if path is not None:
            paths_json = json.dumps([{"path": path, "received_at": received_at}])

        cursor = await db.conn.execute(
            """
            INSERT OR IGNORE INTO messages (type, conversation_key, text, sender_timestamp,
                                            received_at, paths, txt_type, signature, outgoing)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                msg_type,
                conversation_key,
                text,
                sender_timestamp,
                received_at,
                paths_json,
                txt_type,
                signature,
                outgoing,
            ),
        )
        await db.conn.commit()

        # rowcount is 0 if INSERT was ignored due to UNIQUE constraint violation
        if cursor.rowcount == 0:
            return None
        return cursor.lastrowid

    @staticmethod
    async def add_path(
        message_id: int, path: str, received_at: int | None = None
    ) -> list[MessagePath]:
        """Add a new path to an existing message.

        This is used when a repeat/echo of a message arrives via a different route.

        Args:
            message_id: ID of the message to extend.
            path: Path to append.
            received_at: Receive time for the path; the current time is used
                only when None.

        Returns:
            The updated list of paths, or an empty list if the message does
            not exist.
        """
        ts = int(time.time()) if received_at is None else received_at

        cursor = await db.conn.execute("SELECT paths FROM messages WHERE id = ?", (message_id,))
        row = await cursor.fetchone()
        if not row:
            return []

        # Start from the stored paths; anything unparseable or not a JSON
        # array is treated as "no paths yet" rather than crashing on append.
        existing_paths: list[Any] = []
        if row["paths"]:
            try:
                decoded = json.loads(row["paths"])
            except json.JSONDecodeError:
                decoded = None
            if isinstance(decoded, list):
                existing_paths = decoded

        existing_paths.append({"path": path, "received_at": ts})

        await db.conn.execute(
            "UPDATE messages SET paths = ? WHERE id = ?",
            (json.dumps(existing_paths), message_id),
        )
        await db.conn.commit()
        return [MessagePath(**p) for p in existing_paths]

    @staticmethod
    async def claim_prefix_messages(full_key: str) -> int:
        """Promote prefix-stored messages to the full conversation key.

        When a full key becomes known for a contact, any PRIV messages stored
        with only a prefix of that key as conversation_key are updated to use
        the full key. Returns the number of rows updated.
        """
        lower_key = full_key.lower()
        cursor = await db.conn.execute(
            """UPDATE messages SET conversation_key = ?
               WHERE type = 'PRIV' AND length(conversation_key) < 64
               AND ? LIKE conversation_key || '%'""",
            (lower_key, lower_key),
        )
        await db.conn.commit()
        return cursor.rowcount

    @staticmethod
    async def get_all(
        limit: int = 100,
        offset: int = 0,
        msg_type: str | None = None,
        conversation_key: str | None = None,
        before: int | None = None,
        before_id: int | None = None,
    ) -> list[Message]:
        """Return messages, newest first, optionally filtered and paginated.

        Args:
            limit: Maximum number of rows to return.
            offset: Rows to skip; ignored when cursor pagination is used.
            msg_type: If given, only messages of this type.
            conversation_key: If given, only messages whose conversation key
                starts with this value (matched literally).
            before: Cursor timestamp; used together with ``before_id``.
            before_id: Cursor message ID; used together with ``before``.

        Cursor pagination applies only when both ``before`` and ``before_id``
        are provided; otherwise ``offset`` is used.
        """
        use_cursor = before is not None and before_id is not None

        query = "SELECT * FROM messages WHERE 1=1"
        params: list[Any] = []
        if msg_type:
            query += " AND type = ?"
            params.append(msg_type)
        if conversation_key:
            # Support both exact match and prefix match for DMs
            query += " AND conversation_key LIKE ? ESCAPE '\\'"
            params.append(f"{MessageRepository._escape_like(conversation_key)}%")
        if use_cursor:
            query += " AND (received_at < ? OR (received_at = ? AND id < ?))"
            params.extend([before, before, before_id])

        query += " ORDER BY received_at DESC, id DESC LIMIT ?"
        params.append(limit)
        if not use_cursor:
            query += " OFFSET ?"
            params.append(offset)

        cursor = await db.conn.execute(query, params)
        rows = await cursor.fetchall()
        return [MessageRepository._row_to_message(row) for row in rows]

    @staticmethod
    async def increment_ack_count(message_id: int) -> int:
        """Increment ack count and return the new value.

        Returns 1 when no row with *message_id* can be read back.
        """
        await db.conn.execute("UPDATE messages SET acked = acked + 1 WHERE id = ?", (message_id,))
        await db.conn.commit()
        cursor = await db.conn.execute("SELECT acked FROM messages WHERE id = ?", (message_id,))
        row = await cursor.fetchone()
        return row["acked"] if row else 1

    @staticmethod
    async def get_ack_count(message_id: int) -> int:
        """Get the current ack count for a message (0 if it does not exist)."""
        cursor = await db.conn.execute("SELECT acked FROM messages WHERE id = ?", (message_id,))
        row = await cursor.fetchone()
        return row["acked"] if row else 0

    @staticmethod
    async def get_by_content(
        msg_type: str,
        conversation_key: str,
        text: str,
        sender_timestamp: int | None,
    ) -> "Message | None":
        """Look up a message by its unique content fields.

        A None ``sender_timestamp`` matches rows whose sender_timestamp is NULL.
        Returns None when no row matches.
        """
        cursor = await db.conn.execute(
            """
            SELECT id, type, conversation_key, text, sender_timestamp, received_at,
                   paths, txt_type, signature, outgoing, acked
            FROM messages
            WHERE type = ? AND conversation_key = ? AND text = ?
              AND (sender_timestamp = ? OR (sender_timestamp IS NULL AND ? IS NULL))
            """,
            (msg_type, conversation_key, text, sender_timestamp, sender_timestamp),
        )
        row = await cursor.fetchone()
        return MessageRepository._row_to_message(row) if row else None

    @staticmethod
    async def get_unread_counts(name: str | None = None) -> dict:
        """Get unread message counts, mention flags, and last message times for all conversations.

        Args:
            name: User's display name for @[name] mention detection. If None,
                mentions are skipped. LIKE wildcards in the name are escaped so
                the name is matched literally.

        Returns:
            Dict with 'counts', 'mentions', and 'last_message_times' keys.
            Keys inside each are "channel-<key>" or "contact-<key>".
        """
        counts: dict[str, int] = {}
        mention_flags: dict[str, bool] = {}
        last_message_times: dict[str, int] = {}

        mention_pattern = (
            f"%@[{MessageRepository._escape_like(name)}]%" if name else None
        )
        # With no name, an empty pattern is bound; has_mention is then ignored.
        like_param = (mention_pattern or "",)

        # Channel unreads
        cursor = await db.conn.execute(
            """
            SELECT m.conversation_key,
                   COUNT(*) as unread_count,
                   MAX(m.received_at) as last_message_time,
                   SUM(CASE WHEN m.text LIKE ? ESCAPE '\\' THEN 1 ELSE 0 END) > 0 as has_mention
            FROM messages m
            JOIN channels c ON m.conversation_key = c.key
            WHERE m.type = 'CHAN' AND m.outgoing = 0
              AND m.received_at > COALESCE(c.last_read_at, 0)
            GROUP BY m.conversation_key
            """,
            like_param,
        )
        for row in await cursor.fetchall():
            state_key = f"channel-{row['conversation_key']}"
            counts[state_key] = row["unread_count"]
            if mention_pattern and row["has_mention"]:
                mention_flags[state_key] = True

        # Contact unreads
        cursor = await db.conn.execute(
            """
            SELECT m.conversation_key,
                   COUNT(*) as unread_count,
                   MAX(m.received_at) as last_message_time,
                   SUM(CASE WHEN m.text LIKE ? ESCAPE '\\' THEN 1 ELSE 0 END) > 0 as has_mention
            FROM messages m
            JOIN contacts ct ON m.conversation_key = ct.public_key
            WHERE m.type = 'PRIV' AND m.outgoing = 0
              AND m.received_at > COALESCE(ct.last_read_at, 0)
            GROUP BY m.conversation_key
            """,
            like_param,
        )
        for row in await cursor.fetchall():
            state_key = f"contact-{row['conversation_key']}"
            counts[state_key] = row["unread_count"]
            if mention_pattern and row["has_mention"]:
                mention_flags[state_key] = True

        # Last message times for all conversations (including read ones)
        cursor = await db.conn.execute(
            """
            SELECT type, conversation_key, MAX(received_at) as last_message_time
            FROM messages
            GROUP BY type, conversation_key
            """
        )
        for row in await cursor.fetchall():
            prefix = "channel" if row["type"] == "CHAN" else "contact"
            state_key = f"{prefix}-{row['conversation_key']}"
            last_message_times[state_key] = row["last_message_time"]

        return {
            "counts": counts,
            "mentions": mention_flags,
            "last_message_times": last_message_times,
        }
class RawPacketRepository:
    """Static persistence helpers for the ``raw_packets`` table."""

    @staticmethod
    async def _find_id_by_hash(payload_hash: str) -> int | None:
        """Return the ID of the stored packet with *payload_hash*, or None."""
        cursor = await db.conn.execute(
            "SELECT id FROM raw_packets WHERE payload_hash = ?", (payload_hash,)
        )
        row = await cursor.fetchone()
        return row["id"] if row else None

    @staticmethod
    async def create(data: bytes, timestamp: int | None = None) -> tuple[int, bool]:
        """Create a raw packet with payload-based deduplication.

        Deduplication is based on the SHA-256 hash of the packet payload as
        returned by ``extract_payload``; when no payload is returned, the full
        packet data is hashed instead.

        Args:
            data: Raw packet bytes.
            timestamp: Receive time to store; the current time is used only
                when None.

        Returns:
            (packet_id, is_new) tuple:
            - is_new=True: new packet stored, packet_id is the new row ID
            - is_new=False: duplicate payload, packet_id is the existing row ID

        Raises:
            sqlite3.IntegrityError: If the insert fails on a constraint and no
                existing row with the same payload hash can be found.
        """
        ts = int(time.time()) if timestamp is None else timestamp

        # Compute payload hash for deduplication; for malformed packets
        # (no payload extracted), hash the full data.
        payload = extract_payload(data)
        payload_hash = sha256(payload if payload else data).hexdigest()

        existing_id = await RawPacketRepository._find_id_by_hash(payload_hash)
        if existing_id is not None:
            logger.debug(
                "Duplicate payload detected (hash=%s..., existing_id=%d)",
                payload_hash[:12],
                existing_id,
            )
            return (existing_id, False)

        # New packet - insert with hash
        try:
            cursor = await db.conn.execute(
                "INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)",
                (ts, data, payload_hash),
            )
            await db.conn.commit()
            assert cursor.lastrowid is not None  # INSERT always returns a row ID
            return (cursor.lastrowid, True)
        except sqlite3.IntegrityError:
            # Race condition: another insert with same payload_hash happened between
            # our SELECT and INSERT. This is expected for duplicate packets arriving
            # close together. Query again to get the existing ID.
            logger.debug(
                "Duplicate packet detected via race condition (payload_hash=%s), dropping",
                payload_hash[:16],
            )
            existing_id = await RawPacketRepository._find_id_by_hash(payload_hash)
            if existing_id is not None:
                return (existing_id, False)
            # This shouldn't happen, but if it does, re-raise
            raise

    @staticmethod
    async def get_undecrypted_count() -> int:
        """Get count of undecrypted packets (those without a linked message)."""
        cursor = await db.conn.execute(
            "SELECT COUNT(*) as count FROM raw_packets WHERE message_id IS NULL"
        )
        row = await cursor.fetchone()
        return row["count"] if row else 0

    @staticmethod
    async def get_oldest_undecrypted() -> int | None:
        """Get timestamp of oldest undecrypted packet, or None if none exist."""
        cursor = await db.conn.execute(
            "SELECT MIN(timestamp) as oldest FROM raw_packets WHERE message_id IS NULL"
        )
        row = await cursor.fetchone()
        return row["oldest"] if row and row["oldest"] is not None else None

    @staticmethod
    async def get_all_undecrypted() -> list[tuple[int, bytes, int]]:
        """Get all undecrypted packets as (id, data, timestamp) tuples, oldest first."""
        cursor = await db.conn.execute(
            "SELECT id, data, timestamp FROM raw_packets WHERE message_id IS NULL ORDER BY timestamp ASC"
        )
        rows = await cursor.fetchall()
        return [(row["id"], bytes(row["data"]), row["timestamp"]) for row in rows]

    @staticmethod
    async def mark_decrypted(packet_id: int, message_id: int) -> None:
        """Link a raw packet to its decrypted message."""
        await db.conn.execute(
            "UPDATE raw_packets SET message_id = ? WHERE id = ?",
            (message_id, packet_id),
        )
        await db.conn.commit()

    @staticmethod
    async def prune_old_undecrypted(max_age_days: int) -> int:
        """Delete undecrypted packets older than max_age_days. Returns count deleted."""
        cutoff = int(time.time()) - (max_age_days * 86400)
        cursor = await db.conn.execute(
            "DELETE FROM raw_packets WHERE message_id IS NULL AND timestamp < ?",
            (cutoff,),
        )
        await db.conn.commit()
        return cursor.rowcount

    @staticmethod
    async def get_undecrypted_text_messages() -> list[tuple[int, bytes, int]]:
        """Get all undecrypted TEXT_MESSAGE packets as (id, data, timestamp) tuples.

        Loads every undecrypted packet (oldest first) and keeps those whose
        payload type, per ``get_packet_payload_type``, is
        ``PayloadType.TEXT_MESSAGE``.
        """
        packets = await RawPacketRepository.get_all_undecrypted()
        return [
            packet
            for packet in packets
            if get_packet_payload_type(packet[1]) == PayloadType.TEXT_MESSAGE
        ]
class AppSettingsRepository:
    """Repository for app_settings table (single-row pattern, row id = 1)."""

    @staticmethod
    def _parse_model_list(raw: str | None, model: type, label: str) -> list[Any]:
        """Decode a JSON array stored in a settings column into model instances.

        Args:
            raw: JSON text from the database (may be None or empty).
            model: Class called as ``model(**item)`` for each array element.
            label: Column name, used only in the warning message.

        Returns:
            The list of model instances, or an empty list when the column is
            empty or cannot be decoded/validated (a warning is logged).
        """
        if not raw:
            return []
        try:
            return [model(**item) for item in json.loads(raw)]
        except (ValueError, TypeError, KeyError) as e:
            # ValueError covers json.JSONDecodeError as well as validation
            # errors the model raises as ValueError subclasses.
            logger.warning(
                "Failed to parse %s JSON, using empty list: %s (data=%r)",
                label,
                e,
                raw[:100],
            )
            return []

    @staticmethod
    async def get() -> AppSettings:
        """Get the current app settings.

        Always returns settings: when the row with id = 1 is missing, a
        default-constructed ``AppSettings()`` is returned (no row is created
        here). Malformed JSON columns fall back to empty values with a
        warning, and an unrecognised sidebar_sort_order falls back to "recent".
        """
        cursor = await db.conn.execute(
            """
            SELECT max_radio_contacts, favorites, auto_decrypt_dm_on_advert,
                   sidebar_sort_order, last_message_times, preferences_migrated,
                   advert_interval, last_advert_time, bots
            FROM app_settings WHERE id = 1
            """
        )
        row = await cursor.fetchone()
        if not row:
            # Should not happen after migration, but handle gracefully
            return AppSettings()

        favorites: list[Favorite] = AppSettingsRepository._parse_model_list(
            row["favorites"], Favorite, "favorites"
        )
        bots: list[BotConfig] = AppSettingsRepository._parse_model_list(
            row["bots"], BotConfig, "bots"
        )

        # Parse last_message_times JSON; anything that is not a JSON object
        # is discarded so a dict is always handed to AppSettings.
        last_message_times: dict[str, int] = {}
        if row["last_message_times"]:
            try:
                decoded = json.loads(row["last_message_times"])
            except (ValueError, TypeError) as e:
                logger.warning(
                    "Failed to parse last_message_times JSON, using empty dict: %s",
                    e,
                )
            else:
                if isinstance(decoded, dict):
                    last_message_times = decoded
                else:
                    logger.warning(
                        "Failed to parse last_message_times JSON, using empty dict: %s",
                        f"expected object, got {type(decoded).__name__}",
                    )

        # Validate sidebar_sort_order (fallback to "recent" if invalid)
        sort_order = row["sidebar_sort_order"]
        if sort_order not in ("recent", "alpha"):
            sort_order = "recent"

        return AppSettings(
            max_radio_contacts=row["max_radio_contacts"],
            favorites=favorites,
            auto_decrypt_dm_on_advert=bool(row["auto_decrypt_dm_on_advert"]),
            sidebar_sort_order=sort_order,
            last_message_times=last_message_times,
            preferences_migrated=bool(row["preferences_migrated"]),
            advert_interval=row["advert_interval"] or 0,
            last_advert_time=row["last_advert_time"] or 0,
            bots=bots,
        )

    @staticmethod
    async def update(
        max_radio_contacts: int | None = None,
        favorites: list[Favorite] | None = None,
        auto_decrypt_dm_on_advert: bool | None = None,
        sidebar_sort_order: str | None = None,
        last_message_times: dict[str, int] | None = None,
        preferences_migrated: bool | None = None,
        advert_interval: int | None = None,
        last_advert_time: int | None = None,
        bots: list[BotConfig] | None = None,
    ) -> AppSettings:
        """Update app settings. Only provided (non-None) fields are updated.

        Favorites and bots are serialised via ``model_dump()`` to JSON;
        booleans are stored as 1/0. Returns the settings as re-read from the
        database after the update.
        """
        # (column, value) pairs for every field the caller supplied.
        assignments: list[tuple[str, Any]] = []
        if max_radio_contacts is not None:
            assignments.append(("max_radio_contacts", max_radio_contacts))
        if favorites is not None:
            assignments.append(("favorites", json.dumps([f.model_dump() for f in favorites])))
        if auto_decrypt_dm_on_advert is not None:
            assignments.append(("auto_decrypt_dm_on_advert", 1 if auto_decrypt_dm_on_advert else 0))
        if sidebar_sort_order is not None:
            assignments.append(("sidebar_sort_order", sidebar_sort_order))
        if last_message_times is not None:
            assignments.append(("last_message_times", json.dumps(last_message_times)))
        if preferences_migrated is not None:
            assignments.append(("preferences_migrated", 1 if preferences_migrated else 0))
        if advert_interval is not None:
            assignments.append(("advert_interval", advert_interval))
        if last_advert_time is not None:
            assignments.append(("last_advert_time", last_advert_time))
        if bots is not None:
            assignments.append(("bots", json.dumps([b.model_dump() for b in bots])))

        if assignments:
            # Column names come from the fixed list above, never from callers,
            # so interpolating them into the statement is safe.
            set_clause = ", ".join(f"{column} = ?" for column, _ in assignments)
            params = [value for _, value in assignments]
            await db.conn.execute(f"UPDATE app_settings SET {set_clause} WHERE id = 1", params)
            await db.conn.commit()

        return await AppSettingsRepository.get()

    @staticmethod
    async def add_favorite(fav_type: Literal["channel", "contact"], fav_id: str) -> AppSettings:
        """Add a favorite, avoiding duplicates.

        Returns the current settings unchanged when the favorite already exists.
        """
        settings = await AppSettingsRepository.get()

        # Check if already favorited
        if any(f.type == fav_type and f.id == fav_id for f in settings.favorites):
            return settings

        new_favorites = settings.favorites + [Favorite(type=fav_type, id=fav_id)]
        return await AppSettingsRepository.update(favorites=new_favorites)

    @staticmethod
    async def remove_favorite(fav_type: Literal["channel", "contact"], fav_id: str) -> AppSettings:
        """Remove a favorite (no-op on the list if it is not present)."""
        settings = await AppSettingsRepository.get()
        new_favorites = [
            f for f in settings.favorites if not (f.type == fav_type and f.id == fav_id)
        ]
        return await AppSettingsRepository.update(favorites=new_favorites)

    @staticmethod
    async def migrate_preferences_from_frontend(
        favorites: list[dict],
        sort_order: str,
        last_message_times: dict[str, int],
    ) -> tuple[AppSettings, bool]:
        """Migrate all preferences from frontend localStorage.

        This is a one-time migration. If already migrated, returns current settings
        without overwriting. Favorites without a "channel"/"contact" type or
        without an id are dropped, and an unrecognised sort order becomes
        "recent".

        Returns:
            (settings, did_migrate) tuple.
        """
        settings = await AppSettingsRepository.get()

        if settings.preferences_migrated:
            # Already migrated, don't overwrite
            return settings, False

        # Convert frontend favorites format to Favorite objects
        new_favorites = [
            Favorite(type=f["type"], id=f["id"])
            for f in favorites
            if f.get("type") in ("channel", "contact") and f.get("id")
        ]

        # Update with migrated preferences and mark as migrated
        settings = await AppSettingsRepository.update(
            favorites=new_favorites,
            sidebar_sort_order=sort_order if sort_order in ("recent", "alpha") else "recent",
            last_message_times=last_message_times,
            preferences_migrated=True,
        )
        return settings, True