20 KiB
MeshCore MQTT Bridge - OpenCode Context
Instructions for OpenCode
- Run
pre-commit run --all-filesto ensure code quality and tests pass after making changes. - Activate and use Python virtual environment located at
./venvbefore any development or running commands.
Project Overview
This project is a MeshCore MQTT Bridge - a robust Python application that bridges MeshCore mesh networking devices to MQTT brokers, enabling seamless integration with IoT platforms and message processing systems.
Key Features
- Inbox/Outbox Architecture: Independent MeshCore and MQTT workers with message bus coordination
- Multi-connection support: TCP, Serial, and BLE connections to MeshCore devices
- TLS/SSL Support: Secure MQTT connections with configurable certificates
- Flexible configuration: JSON/YAML files, environment variables, CLI arguments
- Configurable event system: Subscribe to specific MeshCore event types
- Robust MQTT integration: Authentication, QoS, retention, auto-reconnection
- Message rate limiting: Configurable rate limiting to prevent network flooding and ensure reliable message delivery
- Health monitoring: Built-in health checks and automatic recovery for both workers
- Async architecture: Built with Python asyncio for high performance
- Type safety: Full type annotations with mypy support
- Comprehensive testing: 70+ tests with pytest and pytest-asyncio including rate limiting tests
- Streamlined CI/CD: Single pre-commit based CI workflow with automated quality checks
Architecture
The bridge uses an Inbox/Outbox Architecture with independent workers coordinated by a message bus.
Core Components
-
Bridge Coordinator (
meshcore_mqtt/bridge_coordinator.py)- Coordinates independent MeshCore and MQTT workers
- Manages shared message bus for inter-worker communication
- Provides health monitoring and system statistics
- Handles graceful startup and shutdown
-
Message Bus System (
meshcore_mqtt/message_queue.py)- Async message queue system for worker communication
- Thread-safe inbox/outbox pattern with asyncio.Queue
- Component status tracking and health monitoring
- Message types: MESHCORE_EVENT, MQTT_COMMAND, STATUS, HEALTH_CHECK, SHUTDOWN
-
MeshCore Worker (
meshcore_mqtt/meshcore_worker.py)- Independent worker managing MeshCore device connection
- Handles device commands forwarded from MQTT worker with rate limiting
- Auto-reconnection with exponential backoff and health monitoring
- Forwards MeshCore events to MQTT worker via message bus
- Manages auto-fetch restart after NO_MORE_MSGS events
- Configurable message rate limiting to prevent network flooding
-
MQTT Worker (
meshcore_mqtt/mqtt_worker.py)- Independent worker managing MQTT broker connection
- Subscribes to command topics and publishes events/status
- TLS/SSL support with configurable certificates
- Auto-reconnection with complete client recreation
- Forwards MQTT commands to MeshCore worker via message bus
-
Configuration System (
meshcore_mqtt/config.py)- Pydantic-based configuration with validation
- Support for multiple input methods (file, env, CLI)
- Event type validation and normalization
- TLS/SSL configuration validation
-
CLI Interface (
meshcore_mqtt/main.py)- Click-based command line interface
- Logging configuration for third-party libraries
- Configuration loading with precedence handling
Event System
The bridge supports configurable event subscriptions:
Default Events:
CONTACT_MSG_RECV,CHANNEL_MSG_RECV(messages)DEVICE_INFO,BATTERY,NEW_CONTACT(device info)ADVERTISEMENT,TRACE_DATA(network diagnostics)TELEMETRY_RESPONSE,CHANNEL_INFO(device details)
Additional Events:
CONNECTED,DISCONNECTED(connection status, can be noisy)LOGIN_SUCCESS,LOGIN_FAILED(authentication)MESSAGES_WAITING(notifications)CONTACTS,SELF_INFO(contact and device information)
Auto-Fetch Restart Feature
The bridge automatically handles NO_MORE_MSGS events from MeshCore by restarting the auto-fetch mechanism after a configurable delay:
- Purpose: Prevents message fetching from stopping when MeshCore reports no more messages
- Configuration:
auto_fetch_restart_delay(1-60 seconds, default: 5) - Behavior: When
NO_MORE_MSGSis received, waits the configured delay then restarts auto-fetching - Environment Variable:
MESHCORE_AUTO_FETCH_RESTART_DELAY=10 - CLI Argument:
--meshcore-auto-fetch-restart-delay 10
Message Rate Limiting Feature
The bridge includes configurable message rate limiting to prevent network flooding and ensure reliable message delivery:
- Purpose: Prevents overwhelming the MeshCore device with rapid message sending commands
- Initial Delay:
message_initial_delay(0.0-60.0 seconds, default: 15.0) - delay before sending the first message - Send Delay:
message_send_delay(0.0-60.0 seconds, default: 15.0) - delay between consecutive message sends - Behavior: Messages are queued and sent with appropriate delays using an async rate-limiting system
- Environment Variables:
MESHCORE_MESSAGE_INITIAL_DELAY=15.0MESHCORE_MESSAGE_SEND_DELAY=15.0
- CLI Arguments:
--meshcore-message-initial-delay 15.0--meshcore-message-send-delay 15.0
Guest Password Support for Telemetry Requests
The bridge supports requesting telemetry from repeaters that require guest password authentication:
Two-Step Authentication Process:
- Login: Use
send_logincommand with destination and password - Request Telemetry: Use
send_telemetry_reqcommand (with or without password)
Automatic Login Feature:
- When
send_telemetry_reqincludes apasswordfield, the bridge automatically:- Logs into the repeater using
send_login(destination, password) - Then requests telemetry using
send_telemetry_req(destination)
- Logs into the repeater using
- This simplifies the workflow for one-time telemetry requests
Manual Session Management: For more control over the authentication session:
- Use
send_loginto establish the session - Use
send_telemetry_req(without password) for multiple requests - Use
send_logoffto terminate the session when done
Use Cases:
- Automatic: Single telemetry request with password
- Manual: Multiple telemetry requests during one session
- Session Control: Explicit login/logout for resource management
MQTT Topics
The bridge publishes to structured MQTT topics:
{prefix}/message/channel/{channel_idx}- Channel messages (public channels){prefix}/message/direct/{pubkey_prefix}- Direct messages (private messages){prefix}/status- Connection status{prefix}/advertisement- Device advertisements{prefix}/battery- Battery updates{prefix}/device_info- Device information{prefix}/new_contact- Contact discovery{prefix}/login- Authentication status{prefix}/command/{type}- Commands (subscribed)
Message Topic Structure:
- For channel messages:
{prefix}/message/channel/{channel_idx}wherechannel_idxis the numeric channel identifier - For direct messages:
{prefix}/message/direct/{pubkey_prefix}wherepubkey_prefixis the sender's 6-byte public key prefix (hex encoded) - This allows subscribers to:
- Subscribe to all messages:
{prefix}/message/+/+ - Subscribe to all channel messages:
{prefix}/message/channel/+ - Subscribe to specific channel:
{prefix}/message/channel/0 - Subscribe to all direct messages:
{prefix}/message/direct/+ - Subscribe to messages from specific node:
{prefix}/message/direct/{specific_pubkey_prefix}
- Subscribe to all messages:
MQTT Command System
The bridge supports bidirectional communication via MQTT commands. Send commands to {prefix}/command/{command_type} with JSON payloads:
Available Commands (implemented in meshcore_worker.py:_handle_mqtt_command):
| Command | Description | Required Fields | MeshCore Method |
|---|---|---|---|
send_msg |
Send direct message | destination, message |
meshcore.commands.send_msg() |
send_chan_msg |
Send channel message | channel, message |
meshcore.commands.send_chan_msg() |
device_query |
Query device information | None | meshcore.commands.send_device_query() |
get_battery |
Get battery status | None | meshcore.commands.get_bat() |
set_name |
Set device name | name |
meshcore.commands.set_name() |
send_advert |
Send device advertisement | None (optional: flood) |
meshcore.commands.send_advert() |
send_trace |
Send trace packet for routing diagnostics | None (optional: auth_code, tag, flags, path) |
meshcore.commands.send_trace() |
send_telemetry_req |
Request telemetry data from a node | destination (optional: password) |
meshcore.commands.send_telemetry_req() |
send_login |
Login to a repeater with guest password | destination, password |
meshcore.commands.send_login() |
send_logoff |
Logoff from a repeater | destination |
meshcore.commands.send_logoff() |
Command Examples:
// Send direct message
{"destination": "node_id_or_contact_name", "message": "Hello!"}
// Send channel message
{"channel": 0, "message": "Hello channel!"}
// Device query
{}
// Get battery
{}
// Set device name
{"name": "MyDevice"}
// Send advertisement
{}
// Send advertisement with flood
{"flood": true}
// Send trace packet (basic)
{}
// Send trace packet with parameters
{"auth_code": 12345, "tag": 67890, "flags": 1, "path": "23,5f,3a"}
// Request telemetry data (without password)
{"destination": "node123"}
// Request telemetry data (with guest password)
{"destination": "repeater_node", "password": "guest_password"}
// Login to repeater with guest password
{"destination": "repeater_node", "password": "guest_password"}
// Logoff from repeater
{"destination": "repeater_node"}
Command Examples:
# Send direct message
mosquitto_pub -h localhost -t "meshcore/command/send_msg" \
-m '{"destination": "Alice", "message": "Hello Alice!"}'
# Send channel message
mosquitto_pub -h localhost -t "meshcore/command/send_chan_msg" \
-m '{"channel": 0, "message": "Hello everyone on channel 0!"}'
# Get device info
mosquitto_pub -h localhost -t "meshcore/command/device_query" -m '{}'
# Get battery status
mosquitto_pub -h localhost -t "meshcore/command/get_battery" -m '{}'
# Set device name
mosquitto_pub -h localhost -t "meshcore/command/set_name" \
-m '{"name": "MyMeshDevice"}'
# Send device advertisement
mosquitto_pub -h localhost -t "meshcore/command/send_advert" -m '{}'
# Send device advertisement with flood
mosquitto_pub -h localhost -t "meshcore/command/send_advert" \
-m '{"flood": true}'
# Send trace packet (basic)
mosquitto_pub -h localhost -t "meshcore/command/send_trace" -m '{}'
# Send trace packet with routing path
mosquitto_pub -h localhost -t "meshcore/command/send_trace" \
-m '{"auth_code": 12345, "path": "23,5f,3a"}'
# Request telemetry data from a node
mosquitto_pub -h localhost -t "meshcore/command/send_telemetry_req" \
-m '{"destination": "node123"}'
# Request telemetry data from repeater with guest password
mosquitto_pub -h localhost -t "meshcore/command/send_telemetry_req" \
-m '{"destination": "repeater_node", "password": "guest_password"}'
# Login to repeater with guest password
mosquitto_pub -h localhost -t "meshcore/command/send_login" \
-m '{"destination": "repeater_node", "password": "guest_password"}'
# Logoff from repeater
mosquitto_pub -h localhost -t "meshcore/command/send_logoff" \
-m '{"destination": "repeater_node"}'
Development Guidelines
Code Quality Tools
The project uses these tools (configured in pyproject.toml and .pre-commit-config.yaml):
- Black: Code formatting (line length: 88)
- Flake8: Linting with custom rules
- MyPy: Type checking with strict settings
- isort: Import sorting and organization
- Pytest: Testing with asyncio support
- Pre-commit: Automated code quality checks and hooks
- Streamlined CI: Single pre-commit based CI workflow replacing multiple separate workflows
Testing Strategy
Test Structure:
tests/test_config.py- Configuration system (18 tests)tests/test_bridge.py- Bridge coordinator functionality (10 tests)tests/test_configurable_events.py- Event configuration (13 tests)tests/test_json_serialization.py- JSON handling (10 tests)tests/test_logging.py- Logging configuration (6 tests)tests/test_rate_limiting.py- Message rate limiting functionality (9 tests)
Key Test Areas:
- Configuration validation and loading
- Worker coordination and message bus functionality
- Event handler mapping and subscription
- JSON serialization edge cases
- MQTT topic generation and command handling
- Message rate limiting and queue management
- Health monitoring and recovery mechanisms
- Logging setup and third-party library control
Architecture Benefits
Inbox/Outbox Pattern:
- Resilience: Workers operate independently, one can fail without affecting the other
- Scalability: Easy to add new workers or modify existing ones
- Testability: Each worker can be tested in isolation
- Monitoring: Built-in health checks and statistics for each component
- Recovery: Automatic reconnection and error recovery for both workers
Message Bus Design:
- Thread-safe: Uses asyncio.Queue for safe async operations
- Typed messages: Structured message types with validation
- Component tracking: Status monitoring for all registered components
- Graceful shutdown: Coordinated shutdown with cleanup
Running Commands
Development Setup:
python -m venv venv
source venv/bin/activate
pip install -r requirements-dev.txt
pre-commit install
Testing:
pytest # Run all tests
pytest -v # Verbose output
pytest --cov=meshcore_mqtt # With coverage
pytest tests/test_config.py # Specific test file
Code Quality:
black meshcore_mqtt/ tests/ # Format code
flake8 meshcore_mqtt/ tests/ # Lint code
mypy meshcore_mqtt/ tests/ # Type check
pre-commit run --all-files # Run all checks
Running the Application:
# With config file
python -m meshcore_mqtt.main --config-file config.yaml
# With CLI arguments
python -m meshcore_mqtt.main \
--mqtt-broker localhost \
--meshcore-connection tcp \
--meshcore-address 192.168.1.100 \
--meshcore-events "CONNECTED,BATTERY,ADVERTISEMENT"
# With environment variables
python -m meshcore_mqtt.main --env
Recent Development History
Major Features Implemented
-
Message Rate Limiting (Latest)
- Configurable rate limiting for message sending commands
- Initial delay and send delay configuration options
- Async message queue with proper timing controls
- Comprehensive test coverage for rate limiting functionality
-
Streamlined CI/CD Pipeline
- Consolidated multiple CI workflows into single pre-commit based pipeline
- Reduced from 3 separate workflows (ci.yml, test.yml, code-quality.yml) to 1
- Improved maintainability and faster CI execution
- Full integration with existing pre-commit configuration
-
Inbox/Outbox Architecture
- Complete restructure to independent worker pattern
- Message bus system for inter-worker communication
- Enhanced health monitoring and recovery
- Improved resilience and testability
-
TLS/SSL Support
- Secure MQTT connections with configurable certificates
- Support for custom CA, client certificates, and private keys
- Optional certificate verification bypass for testing
-
JSON Serialization
- Robust JSON serialization handling all Python data types
- Fallback mechanisms for complex objects
- Comprehensive error handling and validation
-
Configurable Events
- Made MeshCore event types configurable
- Support for config files, env vars, and CLI args
- Case-insensitive event parsing with validation
- Enhanced logging configuration for third-party libraries
Code Patterns
Worker Message Handling:
async def _handle_inbox_message(self, message: Message) -> None:
"""Handle messages from the inbox."""
if message.message_type == MessageType.MQTT_COMMAND:
await self._handle_mqtt_command(message)
elif message.message_type == MessageType.MESHCORE_EVENT:
await self._handle_meshcore_event(message)
Event Forwarding Pattern:
def _on_meshcore_event(self, event_data: Any) -> None:
"""Forward MeshCore events to MQTT worker."""
message = Message.create(
message_type=MessageType.MESHCORE_EVENT,
source=self.component_name,
target="mqtt",
payload={"event_data": event_data, "timestamp": time.time()}
)
asyncio.create_task(self.message_bus.send_message(message))
Rate Limited Command Execution:
async def _queue_rate_limited_command(
self, command_type: str, command_data: dict
) -> Any:
"""Queue a command for rate-limited execution."""
future: asyncio.Future[Any] = asyncio.Future()
message_data = {
"command_type": command_type,
"future": future,
**command_data,
}
await self._message_queue.put(message_data)
return await future
Configuration Validation:
@field_validator("field_name")
@classmethod
def validate_field(cls, v: Type) -> Type:
"""Validate field with custom logic."""
# Validation logic here
return v
Important Notes for OpenCode
When Making Changes
- Always run tests after changes:
pytest -v - Follow worker patterns for new functionality
- Update message bus if adding new message types
- Use type hints for all new code
- Handle errors gracefully with proper logging
- Test worker isolation and message passing
Configuration Precedence
- Command-line arguments (highest)
- Configuration file
- Environment variables (lowest)
Worker Development Guidelines
- Message Handling: Use inbox/outbox pattern for all inter-worker communication
- Health Monitoring: Implement health checks in
_perform_health_check() - Recovery Logic: Add automatic reconnection with exponential backoff
- Status Updates: Send status updates via message bus
- Graceful Shutdown: Handle shutdown messages and clean up resources
Command Implementation
- Add new commands to
meshcore_worker.py:_handle_mqtt_command() - Validate required fields before calling MeshCore methods
- Handle command results and errors appropriately
- Update activity timestamp on successful operations
Testing Requirements
- Test worker components in isolation
- Test message bus communication
- Test configuration validation
- Test error conditions and recovery scenarios
- Test health monitoring and status updates
- Maintain high test coverage
Logging Best Practices
- Use structured logging with proper levels
- Configure third-party library logging appropriately
- Provide meaningful log messages for debugging
- Use
self.loggerinstance for consistency - Log worker status changes and health events
This project demonstrates modern Python development practices with async programming, inbox/outbox architecture, comprehensive testing, and robust error handling. The codebase is well-structured and maintainable, with clear separation of concerns, independent workers, and extensive documentation.
Key Architectural Decisions
- Inbox/Outbox Pattern: Ensures worker independence and resilience
- Message Bus: Provides typed, thread-safe communication between components
- Health Monitoring: Built-in health checks and automatic recovery
- TLS/SSL Support: Secure MQTT connections for production deployments
- Async-First Design: All I/O operations are asynchronous for maximum performance
- Type Safety: Comprehensive type annotations with mypy validation
- Configuration Flexibility: Multiple input methods with proper validation