Files
Remote-Terminal-for-MeshCore/app/repository/fanout.py
2026-03-06 14:41:51 -08:00

138 lines
4.4 KiB
Python

"""Repository for fanout_configs table."""
import json
import logging
import time
import uuid
from typing import Any
from app.database import db
logger = logging.getLogger(__name__)
# In-memory cache of config metadata (name, type) for status reporting.
# Populated by get_all/get/create/update and read by FanoutManager.get_statuses().
_configs_cache: dict[str, dict[str, Any]] = {}
def _row_to_dict(row: Any) -> dict[str, Any]:
"""Convert a database row to a config dict."""
result = {
"id": row["id"],
"type": row["type"],
"name": row["name"],
"enabled": bool(row["enabled"]),
"config": json.loads(row["config"]) if row["config"] else {},
"scope": json.loads(row["scope"]) if row["scope"] else {},
"sort_order": row["sort_order"] or 0,
"created_at": row["created_at"] or 0,
}
_configs_cache[result["id"]] = result
return result
class FanoutConfigRepository:
"""CRUD operations for fanout_configs table."""
@staticmethod
async def get_all() -> list[dict[str, Any]]:
"""Get all fanout configs ordered by sort_order."""
cursor = await db.conn.execute(
"SELECT * FROM fanout_configs ORDER BY sort_order, created_at"
)
rows = await cursor.fetchall()
return [_row_to_dict(row) for row in rows]
@staticmethod
async def get(config_id: str) -> dict[str, Any] | None:
"""Get a single fanout config by ID."""
cursor = await db.conn.execute("SELECT * FROM fanout_configs WHERE id = ?", (config_id,))
row = await cursor.fetchone()
if row is None:
return None
return _row_to_dict(row)
@staticmethod
async def create(
config_type: str,
name: str,
config: dict,
scope: dict,
enabled: bool = True,
config_id: str | None = None,
) -> dict[str, Any]:
"""Create a new fanout config."""
new_id = config_id or str(uuid.uuid4())
now = int(time.time())
# Get next sort_order
cursor = await db.conn.execute(
"SELECT COALESCE(MAX(sort_order), -1) + 1 FROM fanout_configs"
)
row = await cursor.fetchone()
sort_order = row[0] if row else 0
await db.conn.execute(
"""
INSERT INTO fanout_configs (id, type, name, enabled, config, scope, sort_order, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
new_id,
config_type,
name,
1 if enabled else 0,
json.dumps(config),
json.dumps(scope),
sort_order,
now,
),
)
await db.conn.commit()
result = await FanoutConfigRepository.get(new_id)
assert result is not None
return result
@staticmethod
async def update(config_id: str, **fields: Any) -> dict[str, Any] | None:
"""Update a fanout config. Only provided fields are updated."""
updates = []
params: list[Any] = []
for field in ("name", "enabled", "config", "scope", "sort_order"):
if field in fields:
value = fields[field]
if field == "enabled":
value = 1 if value else 0
elif field in ("config", "scope"):
value = json.dumps(value)
updates.append(f"{field} = ?")
params.append(value)
if not updates:
return await FanoutConfigRepository.get(config_id)
params.append(config_id)
query = f"UPDATE fanout_configs SET {', '.join(updates)} WHERE id = ?"
await db.conn.execute(query, params)
await db.conn.commit()
return await FanoutConfigRepository.get(config_id)
@staticmethod
async def delete(config_id: str) -> None:
"""Delete a fanout config."""
await db.conn.execute("DELETE FROM fanout_configs WHERE id = ?", (config_id,))
await db.conn.commit()
_configs_cache.pop(config_id, None)
@staticmethod
async def get_enabled() -> list[dict[str, Any]]:
"""Get all enabled fanout configs."""
cursor = await db.conn.execute(
"SELECT * FROM fanout_configs WHERE enabled = 1 ORDER BY sort_order, created_at"
)
rows = await cursor.fetchall()
return [_row_to_dict(row) for row in rows]