From 4d15c7d8948a491b5ffdbcbc5d529f958368625c Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Thu, 5 Mar 2026 23:47:08 -0800 Subject: [PATCH] Add per-config id lock to reload and remove stale comment --- app/fanout/manager.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/app/fanout/manager.py b/app/fanout/manager.py index 644b643..b58041b 100644 --- a/app/fanout/manager.py +++ b/app/fanout/manager.py @@ -11,7 +11,7 @@ from app.fanout.base import FanoutModule logger = logging.getLogger(__name__) _DISPATCH_TIMEOUT_SECONDS = 30.0 -# Type string -> module class mapping (extended in Phase 2/3) +# Type string -> module class mapping _MODULE_TYPES: dict[str, type] = {} @@ -122,14 +122,16 @@ class FanoutManager: async def reload_config(self, config_id: str) -> None: """Stop old module (if any) and start updated config.""" - await self.remove_config(config_id) + lock = self._restart_locks.setdefault(config_id, asyncio.Lock()) + async with lock: + await self.remove_config(config_id) - from app.repository.fanout import FanoutConfigRepository + from app.repository.fanout import FanoutConfigRepository - cfg = await FanoutConfigRepository.get(config_id) - if cfg is None or not cfg["enabled"]: - return - await self._start_module(cfg) + cfg = await FanoutConfigRepository.get(config_id) + if cfg is None or not cfg["enabled"]: + return + await self._start_module(cfg) async def remove_config(self, config_id: str) -> None: """Stop and remove a module.""" @@ -140,7 +142,6 @@ class FanoutManager: await module.stop() except Exception: logger.exception("Error stopping fanout module %s", config_id) - self._restart_locks.pop(config_id, None) async def _dispatch_matching( self,