From 3c5eace2207279c55401dd8fa27294d5a94bb682 Mon Sep 17 00:00:00 2001 From: Jorijn Schrijvershof Date: Mon, 5 Jan 2026 09:59:37 +0100 Subject: [PATCH] feat: add automatic serial port locking to prevent concurrent access MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements fcntl.flock() based locking for serial transport to prevent USB serial conflicts when collect_companion and collect_repeater run simultaneously. This addresses Ofelia's limitation where no-overlap only prevents a job from overlapping with itself, not other jobs. Key changes: - Add connect_with_lock() async context manager to meshcore_client.py - Use non-blocking LOCK_NB with async polling to avoid freezing event loop - Only lock for serial transport (TCP/BLE don't need it) - 60s timeout with clear error message if lock cannot be acquired - Update collector scripts to use new context manager - Remove external flock from cron examples (now handled in Python) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- CLAUDE.md | 8 +- README.md | 4 +- scripts/collect_companion.py | 227 ++++++++++++++++----------------- scripts/collect_repeater.py | 151 +++++++++++----------- src/meshmon/meshcore_client.py | 91 ++++++++++++- 5 files changed, 277 insertions(+), 204 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 5fdc7bd..8ef90d9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -694,16 +694,14 @@ meshcore-cli -s /dev/ttyACM0 reset_path "repeater name" ## Cron Setup (Example) -Use `flock` to prevent USB serial conflicts when companion and repeater collection overlap. - ```cron MESHCORE=/path/to/meshcore-stats # Companion: every minute -* * * * * cd $MESHCORE && flock -w 60 /tmp/meshcore.lock .venv/bin/python scripts/collect_companion.py +* * * * * cd $MESHCORE && .venv/bin/python scripts/collect_companion.py # Repeater: every 15 minutes (offset by 1 min for staggering) -1,16,31,46 * * * * cd $MESHCORE && flock -w 60 /tmp/meshcore.lock .venv/bin/python scripts/collect_repeater.py +1,16,31,46 * * * * cd $MESHCORE && .venv/bin/python scripts/collect_repeater.py # Charts: every 5 minutes (generates SVG charts from database) */5 * * * * cd $MESHCORE && .venv/bin/python scripts/render_charts.py @@ -717,7 +715,7 @@ MESHCORE=/path/to/meshcore-stats **Notes:** - `cd $MESHCORE` is required because paths in the config are relative to the project root -- `flock -w 60` waits up to 60 seconds for the lock, preventing USB serial conflicts +- Serial port locking is handled automatically via `fcntl.flock()` in Python (no external `flock` needed) ## Adding New Metrics diff --git a/README.md b/README.md index ed1508a..f16b50e 100644 --- a/README.md +++ b/README.md @@ -184,10 +184,10 @@ Add to your crontab (`crontab -e`): MESHCORE=/path/to/meshcore-stats # Companion: every minute -* * * * * cd $MESHCORE && flock -w 60 /tmp/meshcore.lock .venv/bin/python scripts/collect_companion.py +* * * * * cd $MESHCORE && .venv/bin/python scripts/collect_companion.py # Repeater: every 15 minutes -1,16,31,46 * * * * cd $MESHCORE && flock -w 60 /tmp/meshcore.lock .venv/bin/python scripts/collect_repeater.py +1,16,31,46 * * * * cd $MESHCORE && .venv/bin/python scripts/collect_repeater.py # Charts: every 5 minutes */5 * * * * cd $MESHCORE && .venv/bin/python scripts/render_charts.py diff --git a/scripts/collect_companion.py b/scripts/collect_companion.py index 570dd99..2b649c3 100755 --- a/scripts/collect_companion.py +++ b/scripts/collect_companion.py @@ -25,7 +25,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent / "src")) from meshmon.env import get_config from meshmon import log -from meshmon.meshcore_client import connect_from_env, run_command +from meshmon.meshcore_client import connect_with_lock, run_command from meshmon.db import init_db, insert_metrics @@ -39,138 +39,131 @@ async def collect_companion() -> int: cfg = get_config() ts = int(time.time()) - log.debug("Connecting to companion node...") - mc = await connect_from_env() - - if mc is None: - log.error("Failed to connect to companion node") - return 1 - # Metrics to insert (firmware field names) metrics: dict[str, float] = {} commands_succeeded = 0 - # Commands are accessed via mc.commands - cmd = mc.commands + log.debug("Connecting to companion node...") + async with connect_with_lock() as mc: + if mc is None: + log.error("Failed to connect to companion node") + return 1 - try: - # send_appstart (already called during connect, but call again to get self_info) - ok, evt_type, payload, err = await run_command( - mc, cmd.send_appstart(), "send_appstart" - ) - if ok: - commands_succeeded += 1 - log.debug(f"appstart: {evt_type}") - else: - log.error(f"appstart failed: {err}") + # Commands are accessed via mc.commands + cmd = mc.commands - # send_device_query - ok, evt_type, payload, err = await run_command( - mc, cmd.send_device_query(), "send_device_query" - ) - if ok: - commands_succeeded += 1 - log.debug(f"device_query: {payload}") - else: - log.error(f"device_query failed: {err}") + try: + # send_appstart (already called during connect, but call again to get self_info) + ok, evt_type, payload, err = await run_command( + mc, cmd.send_appstart(), "send_appstart" + ) + if ok: + commands_succeeded += 1 + log.debug(f"appstart: {evt_type}") + else: + log.error(f"appstart failed: {err}") - # get_bat - ok, evt_type, payload, err = await run_command( - mc, cmd.get_bat(), "get_bat" - ) - if ok: - commands_succeeded += 1 - log.debug(f"get_bat: {payload}") - else: - log.error(f"get_bat failed: {err}") + # send_device_query + ok, evt_type, payload, err = await run_command( + mc, cmd.send_device_query(), "send_device_query" + ) + if ok: + commands_succeeded += 1 + log.debug(f"device_query: {payload}") + else: + log.error(f"device_query failed: {err}") - # get_time - ok, evt_type, payload, err = await run_command( - mc, cmd.get_time(), "get_time" - ) - if ok: - commands_succeeded += 1 - log.debug(f"get_time: {payload}") - else: - log.error(f"get_time failed: {err}") + # get_bat + ok, evt_type, payload, err = await run_command( + mc, cmd.get_bat(), "get_bat" + ) + if ok: + commands_succeeded += 1 + log.debug(f"get_bat: {payload}") + else: + log.error(f"get_bat failed: {err}") - # get_self_telemetry - ok, evt_type, payload, err = await run_command( - mc, cmd.get_self_telemetry(), "get_self_telemetry" - ) - if ok: - commands_succeeded += 1 - log.debug(f"get_self_telemetry: {payload}") - else: - log.error(f"get_self_telemetry failed: {err}") + # get_time + ok, evt_type, payload, err = await run_command( + mc, cmd.get_time(), "get_time" + ) + if ok: + commands_succeeded += 1 + log.debug(f"get_time: {payload}") + else: + log.error(f"get_time failed: {err}") - # get_custom_vars - ok, evt_type, payload, err = await run_command( - mc, cmd.get_custom_vars(), "get_custom_vars" - ) - if ok: - commands_succeeded += 1 - log.debug(f"get_custom_vars: {payload}") - else: - log.debug(f"get_custom_vars failed: {err}") + # get_self_telemetry + ok, evt_type, payload, err = await run_command( + mc, cmd.get_self_telemetry(), "get_self_telemetry" + ) + if ok: + commands_succeeded += 1 + log.debug(f"get_self_telemetry: {payload}") + else: + log.error(f"get_self_telemetry failed: {err}") - # get_contacts - count contacts - ok, evt_type, payload, err = await run_command( - mc, cmd.get_contacts(), "get_contacts" - ) - if ok: - commands_succeeded += 1 - contacts_count = len(payload) if payload else 0 - metrics["contacts"] = float(contacts_count) - log.debug(f"get_contacts: found {contacts_count} contacts") - else: - log.error(f"get_contacts failed: {err}") + # get_custom_vars + ok, evt_type, payload, err = await run_command( + mc, cmd.get_custom_vars(), "get_custom_vars" + ) + if ok: + commands_succeeded += 1 + log.debug(f"get_custom_vars: {payload}") + else: + log.debug(f"get_custom_vars failed: {err}") - # Get statistics - these contain the main metrics - # Core stats (battery_mv, uptime_secs, errors, queue_len) - ok, evt_type, payload, err = await run_command( - mc, cmd.get_stats_core(), "get_stats_core" - ) - if ok and payload and isinstance(payload, dict): - commands_succeeded += 1 - # Insert all numeric fields from stats_core - for key, value in payload.items(): - if isinstance(value, (int, float)): - metrics[key] = float(value) - log.debug(f"stats_core: {payload}") + # get_contacts - count contacts + ok, evt_type, payload, err = await run_command( + mc, cmd.get_contacts(), "get_contacts" + ) + if ok: + commands_succeeded += 1 + contacts_count = len(payload) if payload else 0 + metrics["contacts"] = float(contacts_count) + log.debug(f"get_contacts: found {contacts_count} contacts") + else: + log.error(f"get_contacts failed: {err}") - # Radio stats (noise_floor, last_rssi, last_snr, tx_air_secs, rx_air_secs) - ok, evt_type, payload, err = await run_command( - mc, cmd.get_stats_radio(), "get_stats_radio" - ) - if ok and payload and isinstance(payload, dict): - commands_succeeded += 1 - for key, value in payload.items(): - if isinstance(value, (int, float)): - metrics[key] = float(value) - log.debug(f"stats_radio: {payload}") + # Get statistics - these contain the main metrics + # Core stats (battery_mv, uptime_secs, errors, queue_len) + ok, evt_type, payload, err = await run_command( + mc, cmd.get_stats_core(), "get_stats_core" + ) + if ok and payload and isinstance(payload, dict): + commands_succeeded += 1 + # Insert all numeric fields from stats_core + for key, value in payload.items(): + if isinstance(value, (int, float)): + metrics[key] = float(value) + log.debug(f"stats_core: {payload}") - # Packet stats (recv, sent, flood_tx, direct_tx, flood_rx, direct_rx) - ok, evt_type, payload, err = await run_command( - mc, cmd.get_stats_packets(), "get_stats_packets" - ) - if ok and payload and isinstance(payload, dict): - commands_succeeded += 1 - for key, value in payload.items(): - if isinstance(value, (int, float)): - metrics[key] = float(value) - log.debug(f"stats_packets: {payload}") + # Radio stats (noise_floor, last_rssi, last_snr, tx_air_secs, rx_air_secs) + ok, evt_type, payload, err = await run_command( + mc, cmd.get_stats_radio(), "get_stats_radio" + ) + if ok and payload and isinstance(payload, dict): + commands_succeeded += 1 + for key, value in payload.items(): + if isinstance(value, (int, float)): + metrics[key] = float(value) + log.debug(f"stats_radio: {payload}") - except Exception as e: - log.error(f"Error during collection: {e}") + # Packet stats (recv, sent, flood_tx, direct_tx, flood_rx, direct_rx) + ok, evt_type, payload, err = await run_command( + mc, cmd.get_stats_packets(), "get_stats_packets" + ) + if ok and payload and isinstance(payload, dict): + commands_succeeded += 1 + for key, value in payload.items(): + if isinstance(value, (int, float)): + metrics[key] = float(value) + log.debug(f"stats_packets: {payload}") - finally: - # Close connection - if hasattr(mc, "disconnect"): - try: - await mc.disconnect() - except Exception: - pass + except Exception as e: + log.error(f"Error during collection: {e}") + + # Connection closed and lock released by context manager # Print summary summary_parts = [f"ts={ts}"] diff --git a/scripts/collect_repeater.py b/scripts/collect_repeater.py index ef8c2ab..d69106b 100755 --- a/scripts/collect_repeater.py +++ b/scripts/collect_repeater.py @@ -27,7 +27,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent / "src")) from meshmon.env import get_config from meshmon import log from meshmon.meshcore_client import ( - connect_from_env, + connect_with_lock, run_command, get_contact_by_name, get_contact_by_key_prefix, @@ -161,97 +161,90 @@ async def collect_repeater() -> int: # Skip collection - no metrics to write return 0 - # Connect to companion - log.debug("Connecting to companion node...") - mc = await connect_from_env() - - if mc is None: - log.error("Failed to connect to companion node") - return 1 - # Metrics to insert (firmware field names from req_status_sync) metrics: dict[str, float] = {} node_name = "unknown" status_ok = False - # Commands are accessed via mc.commands - cmd = mc.commands - - try: - # Initialize (appstart already called during connect) - ok, evt_type, payload, err = await run_command( - mc, cmd.send_appstart(), "send_appstart" - ) - if not ok: - log.error(f"appstart failed: {err}") - - # Find repeater contact - contact = await find_repeater_contact(mc) - - if contact is None: - log.error("Cannot find repeater contact") + # Connect to companion + log.debug("Connecting to companion node...") + async with connect_with_lock() as mc: + if mc is None: + log.error("Failed to connect to companion node") return 1 - # Store contact info - contact_info = extract_contact_info(contact) - node_name = contact_info.get("adv_name", "unknown") + # Commands are accessed via mc.commands + cmd = mc.commands - log.debug(f"Found repeater: {node_name}") + try: + # Initialize (appstart already called during connect) + ok, evt_type, payload, err = await run_command( + mc, cmd.send_appstart(), "send_appstart" + ) + if not ok: + log.error(f"appstart failed: {err}") - # Optional login (if command exists) - if cfg.repeater_password and hasattr(cmd, "send_login"): - log.debug("Attempting login...") - try: - ok, evt_type, payload, err = await run_command( - mc, - cmd.send_login(contact, cfg.repeater_password), - "send_login", - ) - if ok: - log.debug("Login successful") - else: - log.debug(f"Login failed or not supported: {err}") - except Exception as e: - log.debug(f"Login not supported: {e}") + # Find repeater contact + contact = await find_repeater_contact(mc) - # Query status (using _sync version which returns payload directly) - # Use timeout=0 to let the device suggest timeout, with min_timeout as floor - log.debug("Querying repeater status...") - success, payload, err = await query_repeater_with_retry( - mc, - contact, - "req_status_sync", - lambda: cmd.req_status_sync(contact, timeout=0, min_timeout=cfg.remote_timeout_s), - ) - if success and payload and isinstance(payload, dict): - status_ok = True - # Insert all numeric fields from status response - for key, value in payload.items(): - if isinstance(value, (int, float)): - metrics[key] = float(value) - log.debug(f"req_status_sync: {payload}") - else: - log.warn(f"req_status_sync failed: {err}") + if contact is None: + log.error("Cannot find repeater contact") + return 1 - # Update circuit breaker - if status_ok: - cb.record_success() - log.debug("Circuit breaker: recorded success") - else: + # Store contact info + contact_info = extract_contact_info(contact) + node_name = contact_info.get("adv_name", "unknown") + + log.debug(f"Found repeater: {node_name}") + + # Optional login (if command exists) + if cfg.repeater_password and hasattr(cmd, "send_login"): + log.debug("Attempting login...") + try: + ok, evt_type, payload, err = await run_command( + mc, + cmd.send_login(contact, cfg.repeater_password), + "send_login", + ) + if ok: + log.debug("Login successful") + else: + log.debug(f"Login failed or not supported: {err}") + except Exception as e: + log.debug(f"Login not supported: {e}") + + # Query status (using _sync version which returns payload directly) + # Use timeout=0 to let the device suggest timeout, with min_timeout as floor + log.debug("Querying repeater status...") + success, payload, err = await query_repeater_with_retry( + mc, + contact, + "req_status_sync", + lambda: cmd.req_status_sync(contact, timeout=0, min_timeout=cfg.remote_timeout_s), + ) + if success and payload and isinstance(payload, dict): + status_ok = True + # Insert all numeric fields from status response + for key, value in payload.items(): + if isinstance(value, (int, float)): + metrics[key] = float(value) + log.debug(f"req_status_sync: {payload}") + else: + log.warn(f"req_status_sync failed: {err}") + + # Update circuit breaker + if status_ok: + cb.record_success() + log.debug("Circuit breaker: recorded success") + else: + cb.record_failure(cfg.remote_cb_fails, cfg.remote_cb_cooldown_s) + log.debug(f"Circuit breaker: recorded failure ({cb.consecutive_failures}/{cfg.remote_cb_fails})") + + except Exception as e: + log.error(f"Error during collection: {e}") cb.record_failure(cfg.remote_cb_fails, cfg.remote_cb_cooldown_s) - log.debug(f"Circuit breaker: recorded failure ({cb.consecutive_failures}/{cfg.remote_cb_fails})") - except Exception as e: - log.error(f"Error during collection: {e}") - cb.record_failure(cfg.remote_cb_fails, cfg.remote_cb_cooldown_s) - - finally: - # Close connection - if hasattr(mc, "disconnect"): - try: - await mc.disconnect() - except Exception: - pass + # Connection closed and lock released by context manager # Print summary summary_parts = [f"ts={ts}"] diff --git a/src/meshmon/meshcore_client.py b/src/meshmon/meshcore_client.py index 658206a..44d8b34 100644 --- a/src/meshmon/meshcore_client.py +++ b/src/meshmon/meshcore_client.py @@ -1,7 +1,10 @@ """MeshCore client wrapper with safe command execution and contact lookup.""" import asyncio -from typing import Any, Optional, Callable, Coroutine +import fcntl +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Any, AsyncIterator, Callable, Coroutine, Optional from .env import get_config from . import log @@ -100,6 +103,92 @@ async def connect_from_env() -> Optional[Any]: return None +async def _acquire_lock_async( + lock_file, + timeout: float = 60.0, + poll_interval: float = 0.1, +) -> None: + """Acquire exclusive file lock without blocking the event loop. + + Uses non-blocking LOCK_NB with async polling to avoid freezing the event loop. + + Args: + lock_file: Open file handle to lock + timeout: Maximum seconds to wait for lock + poll_interval: Seconds between lock attempts + + Raises: + TimeoutError: If lock cannot be acquired within timeout + """ + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout + + while True: + try: + fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + return + except BlockingIOError: + if loop.time() >= deadline: + raise TimeoutError( + f"Could not acquire serial lock within {timeout}s. " + "Another process may be using the serial port." + ) + await asyncio.sleep(poll_interval) + + +@asynccontextmanager +async def connect_with_lock( + lock_timeout: float = 60.0, +) -> AsyncIterator[Optional[Any]]: + """Connect to MeshCore with serial port locking to prevent concurrent access. + + For serial transport: Acquires exclusive file lock before connecting. + For TCP/BLE: No locking needed (protocol handles multiple connections). + + Args: + lock_timeout: Maximum seconds to wait for serial lock + + Yields: + MeshCore client instance, or None if connection failed + """ + cfg = get_config() + lock_file = None + mc = None + needs_lock = cfg.mesh_transport.lower() == "serial" + + try: + if needs_lock: + lock_path: Path = cfg.state_dir / "serial.lock" + lock_path.parent.mkdir(parents=True, exist_ok=True) + + # Use 'a' mode: doesn't truncate, creates if missing + lock_file = open(lock_path, "a") + try: + await _acquire_lock_async(lock_file, timeout=lock_timeout) + log.debug(f"Acquired serial lock: {lock_path}") + except Exception: + # If lock acquisition fails, close file before re-raising + lock_file.close() + lock_file = None + raise + + mc = await connect_from_env() + yield mc + + finally: + # Disconnect first (while we still hold the lock) + if mc is not None and hasattr(mc, "disconnect"): + try: + await mc.disconnect() + except Exception as e: + log.debug(f"Error during disconnect (ignored): {e}") + + # Release lock by closing the file (close() auto-releases flock) + if lock_file is not None: + lock_file.close() + log.debug("Released serial lock") + + async def run_command( mc: Any, cmd_coro: Coroutine,