feat: add automatic serial port locking to prevent concurrent access

Implements fcntl.flock() based locking for serial transport to prevent
USB serial conflicts when collect_companion and collect_repeater run
simultaneously. This addresses Ofelia's limitation where no-overlap
only prevents a job from overlapping with itself, not other jobs.

Key changes:
- Add connect_with_lock() async context manager to meshcore_client.py
- Use non-blocking LOCK_NB with async polling to avoid freezing event loop
- Only lock for serial transport (TCP/BLE don't need it)
- 60s timeout with clear error message if lock cannot be acquired
- Update collector scripts to use new context manager
- Remove external flock from cron examples (now handled in Python)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Jorijn Schrijvershof
2026-01-05 09:59:37 +01:00
parent 7eee23ec40
commit 3c5eace220
5 changed files with 277 additions and 204 deletions

View File

@@ -694,16 +694,14 @@ meshcore-cli -s /dev/ttyACM0 reset_path "repeater name"
## Cron Setup (Example)
Use `flock` to prevent USB serial conflicts when companion and repeater collection overlap.
```cron
MESHCORE=/path/to/meshcore-stats
# Companion: every minute
* * * * * cd $MESHCORE && flock -w 60 /tmp/meshcore.lock .venv/bin/python scripts/collect_companion.py
* * * * * cd $MESHCORE && .venv/bin/python scripts/collect_companion.py
# Repeater: every 15 minutes (offset by 1 min for staggering)
1,16,31,46 * * * * cd $MESHCORE && flock -w 60 /tmp/meshcore.lock .venv/bin/python scripts/collect_repeater.py
1,16,31,46 * * * * cd $MESHCORE && .venv/bin/python scripts/collect_repeater.py
# Charts: every 5 minutes (generates SVG charts from database)
*/5 * * * * cd $MESHCORE && .venv/bin/python scripts/render_charts.py
@@ -717,7 +715,7 @@ MESHCORE=/path/to/meshcore-stats
**Notes:**
- `cd $MESHCORE` is required because paths in the config are relative to the project root
- `flock -w 60` waits up to 60 seconds for the lock, preventing USB serial conflicts
- Serial port locking is handled automatically via `fcntl.flock()` in Python (no external `flock` needed)
## Adding New Metrics

View File

@@ -184,10 +184,10 @@ Add to your crontab (`crontab -e`):
MESHCORE=/path/to/meshcore-stats
# Companion: every minute
* * * * * cd $MESHCORE && flock -w 60 /tmp/meshcore.lock .venv/bin/python scripts/collect_companion.py
* * * * * cd $MESHCORE && .venv/bin/python scripts/collect_companion.py
# Repeater: every 15 minutes
1,16,31,46 * * * * cd $MESHCORE && flock -w 60 /tmp/meshcore.lock .venv/bin/python scripts/collect_repeater.py
1,16,31,46 * * * * cd $MESHCORE && .venv/bin/python scripts/collect_repeater.py
# Charts: every 5 minutes
*/5 * * * * cd $MESHCORE && .venv/bin/python scripts/render_charts.py

View File

@@ -25,7 +25,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from meshmon.env import get_config
from meshmon import log
from meshmon.meshcore_client import connect_from_env, run_command
from meshmon.meshcore_client import connect_with_lock, run_command
from meshmon.db import init_db, insert_metrics
@@ -39,138 +39,131 @@ async def collect_companion() -> int:
cfg = get_config()
ts = int(time.time())
log.debug("Connecting to companion node...")
mc = await connect_from_env()
if mc is None:
log.error("Failed to connect to companion node")
return 1
# Metrics to insert (firmware field names)
metrics: dict[str, float] = {}
commands_succeeded = 0
# Commands are accessed via mc.commands
cmd = mc.commands
log.debug("Connecting to companion node...")
async with connect_with_lock() as mc:
if mc is None:
log.error("Failed to connect to companion node")
return 1
try:
# send_appstart (already called during connect, but call again to get self_info)
ok, evt_type, payload, err = await run_command(
mc, cmd.send_appstart(), "send_appstart"
)
if ok:
commands_succeeded += 1
log.debug(f"appstart: {evt_type}")
else:
log.error(f"appstart failed: {err}")
# Commands are accessed via mc.commands
cmd = mc.commands
# send_device_query
ok, evt_type, payload, err = await run_command(
mc, cmd.send_device_query(), "send_device_query"
)
if ok:
commands_succeeded += 1
log.debug(f"device_query: {payload}")
else:
log.error(f"device_query failed: {err}")
try:
# send_appstart (already called during connect, but call again to get self_info)
ok, evt_type, payload, err = await run_command(
mc, cmd.send_appstart(), "send_appstart"
)
if ok:
commands_succeeded += 1
log.debug(f"appstart: {evt_type}")
else:
log.error(f"appstart failed: {err}")
# get_bat
ok, evt_type, payload, err = await run_command(
mc, cmd.get_bat(), "get_bat"
)
if ok:
commands_succeeded += 1
log.debug(f"get_bat: {payload}")
else:
log.error(f"get_bat failed: {err}")
# send_device_query
ok, evt_type, payload, err = await run_command(
mc, cmd.send_device_query(), "send_device_query"
)
if ok:
commands_succeeded += 1
log.debug(f"device_query: {payload}")
else:
log.error(f"device_query failed: {err}")
# get_time
ok, evt_type, payload, err = await run_command(
mc, cmd.get_time(), "get_time"
)
if ok:
commands_succeeded += 1
log.debug(f"get_time: {payload}")
else:
log.error(f"get_time failed: {err}")
# get_bat
ok, evt_type, payload, err = await run_command(
mc, cmd.get_bat(), "get_bat"
)
if ok:
commands_succeeded += 1
log.debug(f"get_bat: {payload}")
else:
log.error(f"get_bat failed: {err}")
# get_self_telemetry
ok, evt_type, payload, err = await run_command(
mc, cmd.get_self_telemetry(), "get_self_telemetry"
)
if ok:
commands_succeeded += 1
log.debug(f"get_self_telemetry: {payload}")
else:
log.error(f"get_self_telemetry failed: {err}")
# get_time
ok, evt_type, payload, err = await run_command(
mc, cmd.get_time(), "get_time"
)
if ok:
commands_succeeded += 1
log.debug(f"get_time: {payload}")
else:
log.error(f"get_time failed: {err}")
# get_custom_vars
ok, evt_type, payload, err = await run_command(
mc, cmd.get_custom_vars(), "get_custom_vars"
)
if ok:
commands_succeeded += 1
log.debug(f"get_custom_vars: {payload}")
else:
log.debug(f"get_custom_vars failed: {err}")
# get_self_telemetry
ok, evt_type, payload, err = await run_command(
mc, cmd.get_self_telemetry(), "get_self_telemetry"
)
if ok:
commands_succeeded += 1
log.debug(f"get_self_telemetry: {payload}")
else:
log.error(f"get_self_telemetry failed: {err}")
# get_contacts - count contacts
ok, evt_type, payload, err = await run_command(
mc, cmd.get_contacts(), "get_contacts"
)
if ok:
commands_succeeded += 1
contacts_count = len(payload) if payload else 0
metrics["contacts"] = float(contacts_count)
log.debug(f"get_contacts: found {contacts_count} contacts")
else:
log.error(f"get_contacts failed: {err}")
# get_custom_vars
ok, evt_type, payload, err = await run_command(
mc, cmd.get_custom_vars(), "get_custom_vars"
)
if ok:
commands_succeeded += 1
log.debug(f"get_custom_vars: {payload}")
else:
log.debug(f"get_custom_vars failed: {err}")
# Get statistics - these contain the main metrics
# Core stats (battery_mv, uptime_secs, errors, queue_len)
ok, evt_type, payload, err = await run_command(
mc, cmd.get_stats_core(), "get_stats_core"
)
if ok and payload and isinstance(payload, dict):
commands_succeeded += 1
# Insert all numeric fields from stats_core
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"stats_core: {payload}")
# get_contacts - count contacts
ok, evt_type, payload, err = await run_command(
mc, cmd.get_contacts(), "get_contacts"
)
if ok:
commands_succeeded += 1
contacts_count = len(payload) if payload else 0
metrics["contacts"] = float(contacts_count)
log.debug(f"get_contacts: found {contacts_count} contacts")
else:
log.error(f"get_contacts failed: {err}")
# Radio stats (noise_floor, last_rssi, last_snr, tx_air_secs, rx_air_secs)
ok, evt_type, payload, err = await run_command(
mc, cmd.get_stats_radio(), "get_stats_radio"
)
if ok and payload and isinstance(payload, dict):
commands_succeeded += 1
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"stats_radio: {payload}")
# Get statistics - these contain the main metrics
# Core stats (battery_mv, uptime_secs, errors, queue_len)
ok, evt_type, payload, err = await run_command(
mc, cmd.get_stats_core(), "get_stats_core"
)
if ok and payload and isinstance(payload, dict):
commands_succeeded += 1
# Insert all numeric fields from stats_core
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"stats_core: {payload}")
# Packet stats (recv, sent, flood_tx, direct_tx, flood_rx, direct_rx)
ok, evt_type, payload, err = await run_command(
mc, cmd.get_stats_packets(), "get_stats_packets"
)
if ok and payload and isinstance(payload, dict):
commands_succeeded += 1
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"stats_packets: {payload}")
# Radio stats (noise_floor, last_rssi, last_snr, tx_air_secs, rx_air_secs)
ok, evt_type, payload, err = await run_command(
mc, cmd.get_stats_radio(), "get_stats_radio"
)
if ok and payload and isinstance(payload, dict):
commands_succeeded += 1
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"stats_radio: {payload}")
except Exception as e:
log.error(f"Error during collection: {e}")
# Packet stats (recv, sent, flood_tx, direct_tx, flood_rx, direct_rx)
ok, evt_type, payload, err = await run_command(
mc, cmd.get_stats_packets(), "get_stats_packets"
)
if ok and payload and isinstance(payload, dict):
commands_succeeded += 1
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"stats_packets: {payload}")
finally:
# Close connection
if hasattr(mc, "disconnect"):
try:
await mc.disconnect()
except Exception:
pass
except Exception as e:
log.error(f"Error during collection: {e}")
# Connection closed and lock released by context manager
# Print summary
summary_parts = [f"ts={ts}"]

View File

@@ -27,7 +27,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from meshmon.env import get_config
from meshmon import log
from meshmon.meshcore_client import (
connect_from_env,
connect_with_lock,
run_command,
get_contact_by_name,
get_contact_by_key_prefix,
@@ -161,97 +161,90 @@ async def collect_repeater() -> int:
# Skip collection - no metrics to write
return 0
# Connect to companion
log.debug("Connecting to companion node...")
mc = await connect_from_env()
if mc is None:
log.error("Failed to connect to companion node")
return 1
# Metrics to insert (firmware field names from req_status_sync)
metrics: dict[str, float] = {}
node_name = "unknown"
status_ok = False
# Commands are accessed via mc.commands
cmd = mc.commands
try:
# Initialize (appstart already called during connect)
ok, evt_type, payload, err = await run_command(
mc, cmd.send_appstart(), "send_appstart"
)
if not ok:
log.error(f"appstart failed: {err}")
# Find repeater contact
contact = await find_repeater_contact(mc)
if contact is None:
log.error("Cannot find repeater contact")
# Connect to companion
log.debug("Connecting to companion node...")
async with connect_with_lock() as mc:
if mc is None:
log.error("Failed to connect to companion node")
return 1
# Store contact info
contact_info = extract_contact_info(contact)
node_name = contact_info.get("adv_name", "unknown")
# Commands are accessed via mc.commands
cmd = mc.commands
log.debug(f"Found repeater: {node_name}")
try:
# Initialize (appstart already called during connect)
ok, evt_type, payload, err = await run_command(
mc, cmd.send_appstart(), "send_appstart"
)
if not ok:
log.error(f"appstart failed: {err}")
# Optional login (if command exists)
if cfg.repeater_password and hasattr(cmd, "send_login"):
log.debug("Attempting login...")
try:
ok, evt_type, payload, err = await run_command(
mc,
cmd.send_login(contact, cfg.repeater_password),
"send_login",
)
if ok:
log.debug("Login successful")
else:
log.debug(f"Login failed or not supported: {err}")
except Exception as e:
log.debug(f"Login not supported: {e}")
# Find repeater contact
contact = await find_repeater_contact(mc)
# Query status (using _sync version which returns payload directly)
# Use timeout=0 to let the device suggest timeout, with min_timeout as floor
log.debug("Querying repeater status...")
success, payload, err = await query_repeater_with_retry(
mc,
contact,
"req_status_sync",
lambda: cmd.req_status_sync(contact, timeout=0, min_timeout=cfg.remote_timeout_s),
)
if success and payload and isinstance(payload, dict):
status_ok = True
# Insert all numeric fields from status response
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"req_status_sync: {payload}")
else:
log.warn(f"req_status_sync failed: {err}")
if contact is None:
log.error("Cannot find repeater contact")
return 1
# Update circuit breaker
if status_ok:
cb.record_success()
log.debug("Circuit breaker: recorded success")
else:
# Store contact info
contact_info = extract_contact_info(contact)
node_name = contact_info.get("adv_name", "unknown")
log.debug(f"Found repeater: {node_name}")
# Optional login (if command exists)
if cfg.repeater_password and hasattr(cmd, "send_login"):
log.debug("Attempting login...")
try:
ok, evt_type, payload, err = await run_command(
mc,
cmd.send_login(contact, cfg.repeater_password),
"send_login",
)
if ok:
log.debug("Login successful")
else:
log.debug(f"Login failed or not supported: {err}")
except Exception as e:
log.debug(f"Login not supported: {e}")
# Query status (using _sync version which returns payload directly)
# Use timeout=0 to let the device suggest timeout, with min_timeout as floor
log.debug("Querying repeater status...")
success, payload, err = await query_repeater_with_retry(
mc,
contact,
"req_status_sync",
lambda: cmd.req_status_sync(contact, timeout=0, min_timeout=cfg.remote_timeout_s),
)
if success and payload and isinstance(payload, dict):
status_ok = True
# Insert all numeric fields from status response
for key, value in payload.items():
if isinstance(value, (int, float)):
metrics[key] = float(value)
log.debug(f"req_status_sync: {payload}")
else:
log.warn(f"req_status_sync failed: {err}")
# Update circuit breaker
if status_ok:
cb.record_success()
log.debug("Circuit breaker: recorded success")
else:
cb.record_failure(cfg.remote_cb_fails, cfg.remote_cb_cooldown_s)
log.debug(f"Circuit breaker: recorded failure ({cb.consecutive_failures}/{cfg.remote_cb_fails})")
except Exception as e:
log.error(f"Error during collection: {e}")
cb.record_failure(cfg.remote_cb_fails, cfg.remote_cb_cooldown_s)
log.debug(f"Circuit breaker: recorded failure ({cb.consecutive_failures}/{cfg.remote_cb_fails})")
except Exception as e:
log.error(f"Error during collection: {e}")
cb.record_failure(cfg.remote_cb_fails, cfg.remote_cb_cooldown_s)
finally:
# Close connection
if hasattr(mc, "disconnect"):
try:
await mc.disconnect()
except Exception:
pass
# Connection closed and lock released by context manager
# Print summary
summary_parts = [f"ts={ts}"]

View File

@@ -1,7 +1,10 @@
"""MeshCore client wrapper with safe command execution and contact lookup."""
import asyncio
from typing import Any, Optional, Callable, Coroutine
import fcntl
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, AsyncIterator, Callable, Coroutine, Optional
from .env import get_config
from . import log
@@ -100,6 +103,92 @@ async def connect_from_env() -> Optional[Any]:
return None
async def _acquire_lock_async(
lock_file,
timeout: float = 60.0,
poll_interval: float = 0.1,
) -> None:
"""Acquire exclusive file lock without blocking the event loop.
Uses non-blocking LOCK_NB with async polling to avoid freezing the event loop.
Args:
lock_file: Open file handle to lock
timeout: Maximum seconds to wait for lock
poll_interval: Seconds between lock attempts
Raises:
TimeoutError: If lock cannot be acquired within timeout
"""
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
while True:
try:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
return
except BlockingIOError:
if loop.time() >= deadline:
raise TimeoutError(
f"Could not acquire serial lock within {timeout}s. "
"Another process may be using the serial port."
)
await asyncio.sleep(poll_interval)
@asynccontextmanager
async def connect_with_lock(
lock_timeout: float = 60.0,
) -> AsyncIterator[Optional[Any]]:
"""Connect to MeshCore with serial port locking to prevent concurrent access.
For serial transport: Acquires exclusive file lock before connecting.
For TCP/BLE: No locking needed (protocol handles multiple connections).
Args:
lock_timeout: Maximum seconds to wait for serial lock
Yields:
MeshCore client instance, or None if connection failed
"""
cfg = get_config()
lock_file = None
mc = None
needs_lock = cfg.mesh_transport.lower() == "serial"
try:
if needs_lock:
lock_path: Path = cfg.state_dir / "serial.lock"
lock_path.parent.mkdir(parents=True, exist_ok=True)
# Use 'a' mode: doesn't truncate, creates if missing
lock_file = open(lock_path, "a")
try:
await _acquire_lock_async(lock_file, timeout=lock_timeout)
log.debug(f"Acquired serial lock: {lock_path}")
except Exception:
# If lock acquisition fails, close file before re-raising
lock_file.close()
lock_file = None
raise
mc = await connect_from_env()
yield mc
finally:
# Disconnect first (while we still hold the lock)
if mc is not None and hasattr(mc, "disconnect"):
try:
await mc.disconnect()
except Exception as e:
log.debug(f"Error during disconnect (ignored): {e}")
# Release lock by closing the file (close() auto-releases flock)
if lock_file is not None:
lock_file.close()
log.debug("Released serial lock")
async def run_command(
mc: Any,
cmd_coro: Coroutine,