Add per-config id lock to reload and remove stale comment

This commit is contained in:
Jack Kingsman
2026-03-05 23:47:08 -08:00
parent 22e28a9e5b
commit 4d15c7d894

View File

@@ -11,7 +11,7 @@ from app.fanout.base import FanoutModule
logger = logging.getLogger(__name__)
_DISPATCH_TIMEOUT_SECONDS = 30.0
# Type string -> module class mapping (extended in Phase 2/3)
# Type string -> module class mapping
_MODULE_TYPES: dict[str, type] = {}
@@ -122,14 +122,16 @@ class FanoutManager:
async def reload_config(self, config_id: str) -> None:
"""Stop old module (if any) and start updated config."""
await self.remove_config(config_id)
lock = self._restart_locks.setdefault(config_id, asyncio.Lock())
async with lock:
await self.remove_config(config_id)
from app.repository.fanout import FanoutConfigRepository
from app.repository.fanout import FanoutConfigRepository
cfg = await FanoutConfigRepository.get(config_id)
if cfg is None or not cfg["enabled"]:
return
await self._start_module(cfg)
cfg = await FanoutConfigRepository.get(config_id)
if cfg is None or not cfg["enabled"]:
return
await self._start_module(cfg)
async def remove_config(self, config_id: str) -> None:
"""Stop and remove a module."""
@@ -140,7 +142,6 @@ class FanoutManager:
await module.stop()
except Exception:
logger.exception("Error stopping fanout module %s", config_id)
self._restart_locks.pop(config_id, None)
async def _dispatch_matching(
self,