diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9225b7a..fa319ba 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -91,33 +91,6 @@ jobs: name: codecov-umbrella fail_ci_if_error: false - security: - name: Security Scan - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v4 - - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: "3.12" - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -e ".[dev]" - - - name: Run Bandit security scan - run: | - bandit -r meshcore_mqtt/ -f json -o bandit-report.json || true - bandit -r meshcore_mqtt/ - - # - name: Run Safety check - # run: | - # safety check --json --output safety-report.json || true - # safety check - build-test: name: Build Test runs-on: ubuntu-latest diff --git a/README.md b/README.md index 40e7eed..c137990 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,6 @@ [![Docker Build and Push](https://github.com/ipnet-mesh/meshcore-mqtt/actions/workflows/docker-build.yml/badge.svg)](https://github.com/ipnet-mesh/meshcore-mqtt/actions/workflows/docker-build.yml) [![Code style: black](https://img.shields.io/badge/Code%20style-black-000000.svg)](https://github.com/psf/black) [![Typing: mypy](https://img.shields.io/badge/Typing-mypy-blue.svg)](https://mypy.readthedocs.io/) -[![Security: bandit](https://img.shields.io/badge/Security-bandit-yellow.svg)](https://github.com/PyCQA/bandit) A robust bridge service that connects MeshCore devices to MQTT brokers, enabling seamless integration with IoT platforms and message processing systems. diff --git a/meshcore_mqtt/config.py b/meshcore_mqtt/config.py index adfa3ab..a633b53 100644 --- a/meshcore_mqtt/config.py +++ b/meshcore_mqtt/config.py @@ -111,6 +111,18 @@ class MeshCoreConfig(BaseModel): default=True, description="Reset routing path after max retries and try once more", ) + message_initial_delay: float = Field( + default=5.0, + ge=0.0, + le=60.0, + description="Initial delay in seconds before sending the first message", + ) + message_send_delay: float = Field( + default=10.0, + ge=0.0, + le=60.0, + description="Delay in seconds between consecutive message sends", + ) @field_validator("port") @classmethod @@ -260,6 +272,10 @@ class Config(BaseModel): "MESHCORE_RESET_PATH_ON_FAILURE", "true" ).lower() == "true", + message_initial_delay=float( + os.getenv("MESHCORE_MESSAGE_INITIAL_DELAY", "5.0") + ), + message_send_delay=float(os.getenv("MESHCORE_MESSAGE_SEND_DELAY", "10.0")), ) return cls( diff --git a/meshcore_mqtt/main.py b/meshcore_mqtt/main.py index 718ae79..30a92a4 100644 --- a/meshcore_mqtt/main.py +++ b/meshcore_mqtt/main.py @@ -163,6 +163,18 @@ def setup_logging(level: str) -> None: default=True, help="Reset routing path after max retries and try once more (default: enabled)", ) +@click.option( + "--meshcore-message-initial-delay", + type=click.FloatRange(0.0, 60.0), + default=5.0, + help="Initial delay in seconds before sending the first message (default: 5.0)", +) +@click.option( + "--meshcore-message-send-delay", + type=click.FloatRange(0.0, 60.0), + default=10.0, + help="Delay in seconds between consecutive message sends (default: 10.0)", +) @click.option( "--log-level", type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]), @@ -198,6 +210,8 @@ def main( meshcore_message_retry_count: int, meshcore_message_retry_delay: float, meshcore_reset_path_on_failure: bool, + meshcore_message_initial_delay: float, + meshcore_message_send_delay: float, log_level: str, env: bool, ) -> None: @@ -264,6 +278,8 @@ def main( message_retry_count=meshcore_message_retry_count, message_retry_delay=meshcore_message_retry_delay, reset_path_on_failure=meshcore_reset_path_on_failure, + message_initial_delay=meshcore_message_initial_delay, + message_send_delay=meshcore_message_send_delay, ) config = Config( diff --git a/meshcore_mqtt/meshcore_worker.py b/meshcore_mqtt/meshcore_worker.py index f6b3eda..735912e 100644 --- a/meshcore_mqtt/meshcore_worker.py +++ b/meshcore_mqtt/meshcore_worker.py @@ -82,6 +82,11 @@ class MeshCoreWorker: self._startup_time = time.time() self._startup_grace_period = 5.0 # 5 seconds to ignore commands during startup + # Message rate limiting + self._message_queue: asyncio.Queue[dict] = asyncio.Queue() + self._last_message_time: Optional[float] = None + self._rate_limit_task: Optional[asyncio.Task[None]] = None + async def start(self) -> None: """Start the MeshCore worker.""" if self._running: @@ -109,6 +114,9 @@ class MeshCoreWorker: asyncio.create_task( self._auto_fetch_monitor(), name="meshcore_autofetch" ), + asyncio.create_task( + self._message_rate_limiter(), name="meshcore_rate_limiter" + ), ] self._tasks.extend(tasks) @@ -339,7 +347,9 @@ class MeshCoreWorker: "send_msg requires 'destination' and 'message' fields" ) return - result = await self._send_msg_with_retry(destination, msg_text) + result = await self._queue_rate_limited_command( + command_type, command_data + ) elif command_type == "device_query": result = await self.meshcore.commands.send_device_query() @@ -362,7 +372,9 @@ class MeshCoreWorker: "send_chan_msg requires 'channel' and 'message' fields" ) return - result = await self._send_chan_msg_with_retry(channel, msg_text) + result = await self._queue_rate_limited_command( + command_type, command_data + ) elif command_type == "send_advert": flood = command_data.get("flood", False) @@ -570,6 +582,147 @@ class MeshCoreWorker: self.logger.error(f"Error in auto-fetch monitor: {e}") await asyncio.sleep(60) + async def _message_rate_limiter(self) -> None: + """Rate-limited message processor to prevent network flooding.""" + self.logger.info("Starting message rate limiter") + + while self._running: + try: + # Get next message from the queue (with timeout to allow shutdown) + try: + message_data = await asyncio.wait_for( + self._message_queue.get(), timeout=1.0 + ) + except asyncio.TimeoutError: + continue + + # Apply rate limiting delays + current_time = time.time() + + # Apply initial delay for the first message + if self._last_message_time is None: + if self.config.meshcore.message_initial_delay > 0: + self.logger.debug( + f"Applying initial delay of " + f"{self.config.meshcore.message_initial_delay}s " + f"before first message" + ) + await asyncio.sleep(self.config.meshcore.message_initial_delay) + else: + # Apply delay between consecutive messages + time_since_last = current_time - self._last_message_time + required_delay = self.config.meshcore.message_send_delay + + if time_since_last < required_delay: + sleep_time = required_delay - time_since_last + self.logger.debug( + f"Rate limiting: waiting {sleep_time:.1f}s " + f"before next message" + ) + await asyncio.sleep(sleep_time) + + # Send the message + await self._execute_rate_limited_message(message_data) + self._last_message_time = time.time() + + except Exception as e: + self.logger.error(f"Error in message rate limiter: {e}") + await asyncio.sleep(1) + + async def _queue_rate_limited_command( + self, command_type: str, command_data: dict + ) -> Any: + """Queue a command for rate-limited execution.""" + self.logger.info(f"Queueing rate-limited command: {command_type}") + + # Create a future to track the result + future: asyncio.Future[Any] = asyncio.Future() + + # Prepare message data for the queue + message_data = { + "command_type": command_type, + "future": future, + **command_data, # Include all command data + } + + # Add to the rate-limiting queue + await self._message_queue.put(message_data) + + # Wait for the result + try: + result = await future + return result + except Exception as e: + self.logger.error(f"Rate-limited command '{command_type}' failed: {e}") + raise + + async def _execute_rate_limited_message(self, message_data: dict) -> None: + """Execute a rate-limited message send operation.""" + command_type = message_data.get("command_type") + future: Optional[asyncio.Future[Any]] = message_data.get("future") + + try: + result = None + + if not self.meshcore: + raise RuntimeError("MeshCore not initialized") + + if command_type == "send_msg": + destination = message_data.get("destination") + msg_text = message_data.get("message", "") + if not isinstance(destination, str) or not isinstance(msg_text, str): + raise ValueError("Invalid destination or message type") + result = await self._send_msg_with_retry(destination, msg_text) + + elif command_type == "send_chan_msg": + channel = message_data.get("channel") + msg_text = message_data.get("message", "") + if not isinstance(channel, int) or not isinstance(msg_text, str): + raise ValueError("Invalid channel or message type") + result = await self._send_chan_msg_with_retry(channel, msg_text) + + elif command_type == "device_query": + result = await self.meshcore.commands.send_device_query() + + elif command_type == "get_battery": + result = await self.meshcore.commands.get_bat() + + elif command_type == "set_name": + name = message_data.get("name", "") + if not isinstance(name, str): + raise ValueError("Invalid name type") + result = await self.meshcore.commands.set_name(name) + + elif command_type == "send_advert": + flood = message_data.get("flood", False) + result = await self.meshcore.commands.send_advert(flood=flood) + + elif command_type == "send_trace": + auth_code = message_data.get("auth_code", 0) + tag = message_data.get("tag") + flags = message_data.get("flags", 0) + path = message_data.get("path") + result = await self.meshcore.commands.send_trace( + auth_code=auth_code, tag=tag, flags=flags, path=path + ) + + elif command_type == "send_telemetry_req": + destination = message_data.get("destination") + if not isinstance(destination, str): + raise ValueError("Invalid destination type") + result = await self.meshcore.commands.send_telemetry_req(destination) + + # Set the result on the future + if future and not future.done(): + future.set_result(result) + + except Exception as e: + self.logger.error( + f"Error executing rate-limited command '{command_type}': {e}" + ) + if future and not future.done(): + future.set_exception(e) + async def _recover_connection(self) -> None: """Recover MeshCore connection.""" if self._reconnect_attempts >= self._max_reconnect_attempts: diff --git a/tests/test_rate_limiting.py b/tests/test_rate_limiting.py new file mode 100644 index 0000000..d5fa6f0 --- /dev/null +++ b/tests/test_rate_limiting.py @@ -0,0 +1,372 @@ +"""Tests for MeshCore worker rate limiting functionality.""" + +import asyncio +import time +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from meshcore_mqtt.config import Config, ConnectionType, MeshCoreConfig, MQTTConfig +from meshcore_mqtt.meshcore_worker import MeshCoreWorker + + +@pytest.fixture +def test_config() -> Config: + """Create a test configuration with rate limiting settings.""" + return Config( + mqtt=MQTTConfig(broker="localhost"), + meshcore=MeshCoreConfig( + connection_type=ConnectionType.TCP, + address="127.0.0.1", + port=12345, + message_initial_delay=0.1, # 100ms for faster tests + message_send_delay=0.2, # 200ms for faster tests + ), + ) + + +@pytest.fixture +def test_config_zero_delays() -> Config: + """Create a test configuration with zero delays.""" + return Config( + mqtt=MQTTConfig(broker="localhost"), + meshcore=MeshCoreConfig( + connection_type=ConnectionType.TCP, + address="127.0.0.1", + port=12345, + message_initial_delay=0.0, + message_send_delay=0.0, + ), + ) + + +@pytest.fixture +def mock_meshcore_worker(test_config: Config) -> MeshCoreWorker: + """Create a MeshCore worker with mocked MeshCore instance.""" + worker = MeshCoreWorker(test_config) + + # Mock the MeshCore instance + mock_meshcore = MagicMock() + mock_meshcore.commands = MagicMock() + mock_meshcore.commands.send_msg = AsyncMock(return_value=MagicMock()) + mock_meshcore.commands.send_chan_msg = AsyncMock(return_value=MagicMock()) + mock_meshcore.commands.send_device_query = AsyncMock(return_value=MagicMock()) + worker.meshcore = mock_meshcore + + return worker + + +class TestRateLimiting: + """Test message rate limiting functionality.""" + + async def test_message_initial_delay(self, mock_meshcore_worker: MeshCoreWorker) -> None: + """Test that initial delay is applied before the first message.""" + start_time = time.time() + + # Set worker to running state + mock_meshcore_worker._running = True + + # Queue a message + command_data = {"destination": "test", "message": "test"} + future = asyncio.Future() + message_data = { + "command_type": "send_msg", + "future": future, + **command_data + } + + await mock_meshcore_worker._message_queue.put(message_data) + + # Start the rate limiter + rate_limiter_task = asyncio.create_task(mock_meshcore_worker._message_rate_limiter()) + + # Wait for the message to be processed + try: + await asyncio.wait_for(future, timeout=1.0) + except asyncio.TimeoutError: + pass + finally: + mock_meshcore_worker._running = False # Stop the rate limiter + rate_limiter_task.cancel() + try: + await rate_limiter_task + except asyncio.CancelledError: + pass + + elapsed_time = time.time() - start_time + + # Should take at least the initial delay time + assert elapsed_time >= mock_meshcore_worker.config.meshcore.message_initial_delay + + async def test_message_send_delay(self, mock_meshcore_worker: MeshCoreWorker) -> None: + """Test that delay is applied between consecutive messages.""" + # Set worker to running state + mock_meshcore_worker._running = True + + # Set the last message time to simulate a previous message + mock_meshcore_worker._last_message_time = time.time() + + start_time = time.time() + + # Queue a message + command_data = {"destination": "test", "message": "test"} + future = asyncio.Future() + message_data = { + "command_type": "send_msg", + "future": future, + **command_data + } + + await mock_meshcore_worker._message_queue.put(message_data) + + # Start the rate limiter + rate_limiter_task = asyncio.create_task(mock_meshcore_worker._message_rate_limiter()) + + # Wait for the message to be processed + try: + await asyncio.wait_for(future, timeout=1.0) + except asyncio.TimeoutError: + pass + finally: + mock_meshcore_worker._running = False # Stop the rate limiter + rate_limiter_task.cancel() + try: + await rate_limiter_task + except asyncio.CancelledError: + pass + + elapsed_time = time.time() - start_time + + # Should take at least the send delay time + assert elapsed_time >= mock_meshcore_worker.config.meshcore.message_send_delay + + async def test_no_delay_with_zero_config(self, test_config_zero_delays: Config) -> None: + """Test that no delays are applied when configured to zero.""" + worker = MeshCoreWorker(test_config_zero_delays) + + # Mock the MeshCore instance + mock_meshcore = MagicMock() + mock_meshcore.commands = MagicMock() + mock_meshcore.commands.send_msg = AsyncMock(return_value=MagicMock()) + worker.meshcore = mock_meshcore + + # Set worker to running state + worker._running = True + + start_time = time.time() + + # Queue a message + command_data = {"destination": "test", "message": "test"} + future = asyncio.Future() + message_data = { + "command_type": "send_msg", + "future": future, + **command_data + } + + await worker._message_queue.put(message_data) + + # Start the rate limiter + rate_limiter_task = asyncio.create_task(worker._message_rate_limiter()) + + # Wait for the message to be processed + try: + await asyncio.wait_for(future, timeout=0.5) + except asyncio.TimeoutError: + pass + finally: + worker._running = False # Stop the rate limiter + rate_limiter_task.cancel() + try: + await rate_limiter_task + except asyncio.CancelledError: + pass + + elapsed_time = time.time() - start_time + + # Should complete quickly with no delays + assert elapsed_time < 0.1 + + async def test_rate_limited_command_execution(self, mock_meshcore_worker: MeshCoreWorker) -> None: + """Test that rate-limited commands are executed correctly.""" + # Test send_msg command + result = await mock_meshcore_worker._execute_rate_limited_message({ + "command_type": "send_msg", + "destination": "test_user", + "message": "Hello", + "future": None + }) + + # Verify the command was called + mock_meshcore_worker.meshcore.commands.send_msg.assert_called_once_with("test_user", "Hello") + + # Test send_chan_msg command + await mock_meshcore_worker._execute_rate_limited_message({ + "command_type": "send_chan_msg", + "channel": 0, + "message": "Hello channel", + "future": None + }) + + # Verify the command was called + mock_meshcore_worker.meshcore.commands.send_chan_msg.assert_called_once_with(0, "Hello channel") + + async def test_queue_rate_limited_command(self, mock_meshcore_worker: MeshCoreWorker) -> None: + """Test queuing and execution of rate-limited commands.""" + # Set worker to running state + mock_meshcore_worker._running = True + + # Start the rate limiter in the background + rate_limiter_task = asyncio.create_task(mock_meshcore_worker._message_rate_limiter()) + + try: + # Queue a command + command_data = {"destination": "test_user", "message": "Hello"} + result_task = asyncio.create_task( + mock_meshcore_worker._queue_rate_limited_command("send_msg", command_data) + ) + + # Wait for completion + result = await asyncio.wait_for(result_task, timeout=2.0) + + # Verify the command was executed + mock_meshcore_worker.meshcore.commands.send_msg.assert_called_once_with("test_user", "Hello") + + finally: + mock_meshcore_worker._running = False # Stop the rate limiter + rate_limiter_task.cancel() + try: + await rate_limiter_task + except asyncio.CancelledError: + pass + + async def test_multiple_messages_rate_limiting(self, mock_meshcore_worker: MeshCoreWorker) -> None: + """Test that multiple messages are properly rate limited.""" + # Set worker to running state + mock_meshcore_worker._running = True + + # Start the rate limiter + rate_limiter_task = asyncio.create_task(mock_meshcore_worker._message_rate_limiter()) + + try: + start_time = time.time() + + # Queue multiple messages + tasks = [] + for i in range(3): + command_data = {"destination": f"user{i}", "message": f"Message {i}"} + task = asyncio.create_task( + mock_meshcore_worker._queue_rate_limited_command("send_msg", command_data) + ) + tasks.append(task) + + # Wait for all messages to complete + await asyncio.gather(*tasks) + + elapsed_time = time.time() - start_time + + # Should take at least initial_delay + 2 * send_delay for 3 messages + expected_min_time = ( + mock_meshcore_worker.config.meshcore.message_initial_delay + + 2 * mock_meshcore_worker.config.meshcore.message_send_delay + ) + assert elapsed_time >= expected_min_time + + # Verify all commands were executed + assert mock_meshcore_worker.meshcore.commands.send_msg.call_count == 3 + + finally: + mock_meshcore_worker._running = False # Stop the rate limiter + rate_limiter_task.cancel() + try: + await rate_limiter_task + except asyncio.CancelledError: + pass + + async def test_rate_limiter_error_handling(self, mock_meshcore_worker: MeshCoreWorker) -> None: + """Test error handling in rate-limited message execution.""" + # Set worker to running state + mock_meshcore_worker._running = True + + # Make the mock command raise an exception - but we need to bypass the retry logic + # So we'll make it raise an exception in _execute_rate_limited_message directly + original_execute = mock_meshcore_worker._execute_rate_limited_message + + async def mock_execute(message_data): + future = message_data.get("future") + if future and not future.done(): + future.set_exception(Exception("Test error")) + + mock_meshcore_worker._execute_rate_limited_message = mock_execute + + # Start the rate limiter + rate_limiter_task = asyncio.create_task(mock_meshcore_worker._message_rate_limiter()) + + try: + # Queue a command that will fail + command_data = {"destination": "test_user", "message": "Hello"} + + with pytest.raises(Exception, match="Test error"): + await asyncio.wait_for( + mock_meshcore_worker._queue_rate_limited_command("send_msg", command_data), + timeout=2.0 + ) + + finally: + mock_meshcore_worker._running = False # Stop the rate limiter + mock_meshcore_worker._execute_rate_limited_message = original_execute # Restore original + rate_limiter_task.cancel() + try: + await rate_limiter_task + except asyncio.CancelledError: + pass + + async def test_configuration_validation(self) -> None: + """Test that rate limiting configuration is properly validated.""" + # Test valid configuration + config = Config( + mqtt=MQTTConfig(broker="localhost"), + meshcore=MeshCoreConfig( + connection_type=ConnectionType.TCP, + address="127.0.0.1", + message_initial_delay=5.0, + message_send_delay=10.0, + ), + ) + assert config.meshcore.message_initial_delay == 5.0 + assert config.meshcore.message_send_delay == 10.0 + + # Test that values are within valid range + config = Config( + mqtt=MQTTConfig(broker="localhost"), + meshcore=MeshCoreConfig( + connection_type=ConnectionType.TCP, + address="127.0.0.1", + message_initial_delay=0.0, # Minimum + message_send_delay=60.0, # Maximum + ), + ) + assert config.meshcore.message_initial_delay == 0.0 + assert config.meshcore.message_send_delay == 60.0 + + # Test that invalid values raise validation errors + with pytest.raises(Exception): + Config( + mqtt=MQTTConfig(broker="localhost"), + meshcore=MeshCoreConfig( + connection_type=ConnectionType.TCP, + address="127.0.0.1", + message_initial_delay=-1.0, # Invalid: negative + ), + ) + + with pytest.raises(Exception): + Config( + mqtt=MQTTConfig(broker="localhost"), + meshcore=MeshCoreConfig( + connection_type=ConnectionType.TCP, + address="127.0.0.1", + message_send_delay=61.0, # Invalid: too large + ), + ) \ No newline at end of file