diff --git a/DEPLOYMENT.md b/DEPLOYMENT.md new file mode 100644 index 0000000..7f5c1d8 --- /dev/null +++ b/DEPLOYMENT.md @@ -0,0 +1,52 @@ +# Akita Meshtastic Meshcore Bridge Deployment Guide + +## Prerequisites +- Python 3.9+ (recommended: 3.11+) +- All hardware connected and serial port identified (e.g., COM8) +- MQTT broker (if using MQTT transport) +- (Optional) FastAPI/uvicorn for REST API + +## Installation +1. Clone the repository: + ```sh + git clone https://github.com/AkitaEngineering/Akita-Meshtastic-Meshcore-Bridge.git + cd Akita-Meshtastic-Meshcore-Bridge + ``` +2. Create and activate a virtual environment: + ```sh + python -m venv .venv + # Windows: + .venv\Scripts\activate + # Linux/macOS: + source .venv/bin/activate + ``` +3. Install dependencies: + ```sh + pip install -r requirements.txt + ``` + +## Configuration +- Copy and edit `examples/config.ini.example` to `config.ini`. +- Set the correct serial port, baud rate, MQTT, and API settings as needed. +- Ensure the `SERIAL_PROTOCOL` matches your device (`raw_serial` or `json_newline`). + +## Running the Bridge +```sh +python run_bridge_async.py +``` +- The bridge will log to console by default. Adjust logging in `config.ini` if needed. + +## Operational Notes +- Only one process can access the serial port at a time. +- To enable the REST API, set `api_enabled = true` in your config and ensure FastAPI/uvicorn are installed. +- For production, use a process manager (systemd, pm2, etc.) to keep the bridge running. +- Monitor logs for errors or disconnects. +- To update, pull the latest code and re-install requirements if needed. + +## Troubleshooting +- **Serial port access denied:** Ensure no other process is using the port. Reboot if needed. +- **No messages/events:** Check device connection and config. Enable debug logging for more details. +- **Unhandled exceptions:** Review logs. All async tasks now have robust exception handling and will log errors. + +## Support +- See README.md for more details and contact info. diff --git a/README.md b/README.md index f1fd7b0..354b9f8 100644 --- a/README.md +++ b/README.md @@ -51,21 +51,36 @@ Copy `examples/config.ini.example` to `config.ini` and edit it. - **For REST API (Optional):** Set `API_ENABLED = True` and configure `API_HOST` and `API_PORT` to enable the monitoring API. -### Run - python run_bridge.py + +### Run (Sync or Async) + +- **Synchronous (legacy):** + python run_bridge.py + +- **Async (recommended, for meshcore_py and async MQTT):** + python run_bridge_async.py + +The async entry point supports: + - Async Meshcore integration (meshcore_py) + - Async MQTT (asyncio-mqtt) + - Async REST API (FastAPI, if enabled) + +#### Async API Server +If `API_ENABLED = True` in your config, the async bridge will launch a FastAPI server for health, metrics, and control endpoints (see below). + ### REST API Endpoints (if enabled) -- `GET /api/health` - Get health status of all components -- `GET /api/metrics` - Get detailed metrics and statistics -- `GET /api/status` - Get combined health and metrics -- `GET /api/info` - Get bridge information -- `POST /api/control` - Control actions (e.g., reset metrics) +Endpoints are available on the configured API host/port (default: http://127.0.0.1:8080): + +- `GET /api/health` — Health status of all components +- `GET /api/metrics` — Detailed metrics and statistics +- `GET /api/status` — Combined health and metrics +- `GET /api/info` — Bridge information +- `POST /api/control` — Control actions (e.g., reset metrics) Example: -```bash -curl http://localhost:8080/api/health -curl http://localhost:8080/api/metrics -``` + curl http://localhost:8080/api/health + curl http://localhost:8080/api/metrics --- diff --git a/ammb/api_async.py b/ammb/api_async.py new file mode 100644 index 0000000..a6f555f --- /dev/null +++ b/ammb/api_async.py @@ -0,0 +1,53 @@ +# ammb/api_async.py +""" +Async REST API for monitoring and controlling the bridge using FastAPI. +""" +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse +import logging +from ammb.metrics import get_metrics +from ammb.health import get_health_monitor + +app = FastAPI() +logger = logging.getLogger(__name__) + +@app.get("/api/health") +async def health(): + health_monitor = get_health_monitor() + health_data = health_monitor.get_overall_health() + return JSONResponse(content=health_data) + +@app.get("/api/metrics") +async def metrics(): + metrics = get_metrics() + metrics_data = metrics.get_all_stats() + return JSONResponse(content=metrics_data) + +@app.get("/api/status") +async def status(): + health_monitor = get_health_monitor() + metrics = get_metrics() + status = { + "health": health_monitor.get_overall_health(), + "metrics": metrics.get_all_stats(), + } + return JSONResponse(content=status) + +@app.get("/api/info") +async def info(): + # This endpoint should be extended to include async bridge state if needed + return JSONResponse(content={ + "name": "Akita Meshtastic Meshcore Bridge", + "version": "1.0.0", + "external_transport": "async", + }) + +@app.post("/api/control") +async def control(request: Request): + data = await request.json() + action = data.get("action") + if action == "reset_metrics": + metrics = get_metrics() + metrics.reset() + return JSONResponse(content={"message": "Metrics reset"}) + return JSONResponse(content={"error": f"Unknown action: {action}"}, status_code=400) diff --git a/ammb/bridge_async.py b/ammb/bridge_async.py new file mode 100644 index 0000000..6f9c35b --- /dev/null +++ b/ammb/bridge_async.py @@ -0,0 +1,91 @@ +# ammb/bridge_async.py +""" +Async Bridge orchestrator using MeshcoreAsyncHandler. +""" + +import asyncio +import logging +from typing import Optional + +from ammb.meshcore_async_handler import MeshcoreAsyncHandler +from ammb.mqtt_async_handler import MQTTAsyncHandler +from ammb.config_handler import BridgeConfig + + + +class AsyncBridge: + def __init__(self, config: BridgeConfig): + self.logger = logging.getLogger(__name__) + self.config = config + self.meshcore_handler: Optional[MeshcoreAsyncHandler] = None + self.mqtt_handler: Optional[MQTTAsyncHandler] = None + self._running = False + + async def start(self): + self._running = True + try: + if self.config.external_transport == "serial": + self.meshcore_handler = MeshcoreAsyncHandler( + serial_port=self.config.serial_port, + baud=self.config.serial_baud or 115200, + debug=self.config.log_level == "DEBUG", + ) + try: + await self.meshcore_handler.run() + except Exception as e: + self.logger.error(f"Unhandled exception in meshcore handler: {e}", exc_info=True) + elif self.config.external_transport == "mqtt": + self.mqtt_handler = MQTTAsyncHandler( + broker=self.config.mqtt_broker, + port=self.config.mqtt_port, + topic_in=self.config.mqtt_topic_in, + topic_out=self.config.mqtt_topic_out, + username=self.config.mqtt_username, + password=self.config.mqtt_password, + qos=self.config.mqtt_qos or 0, + retain=self.config.mqtt_retain_out or False, + tls=self.config.mqtt_tls_enabled or False, + tls_ca_certs=self.config.mqtt_tls_ca_certs, + tls_insecure=self.config.mqtt_tls_insecure or False, + client_id=self.config.mqtt_client_id, + ) + try: + self.mqtt_handler.set_message_handler(self.handle_mqtt_message) + await self.mqtt_handler.connect() + await self.mqtt_handler.run() + except Exception as e: + self.logger.error(f"Unhandled exception in MQTT handler: {e}", exc_info=True) + else: + self.logger.error("Unsupported external transport in async bridge.") + except asyncio.CancelledError: + self.logger.info("AsyncBridge received cancellation signal.") + except Exception as e: + self.logger.critical(f"Unhandled exception in AsyncBridge: {e}", exc_info=True) + finally: + await self.shutdown() + + async def shutdown(self): + self.logger.info("Shutting down AsyncBridge and handlers...") + self._running = False + if self.meshcore_handler: + await self.meshcore_handler.disconnect() + if self.mqtt_handler: + await self.mqtt_handler.disconnect() + self.logger.info("AsyncBridge shutdown complete.") + + async def handle_incoming_message(self, event): + try: + self.logger.info(f"Received message: {event.payload}") + # Add additional async message processing here + except Exception as e: + self.logger.error(f"Error in handle_incoming_message: {e}", exc_info=True) + + def handle_mqtt_message(self, data): + try: + self.logger.info(f"Received MQTT message: {data}") + # Add additional async message processing here if needed + except Exception as e: + self.logger.error(f"Error in handle_mqtt_message: {e}", exc_info=True) + + def handle_mqtt_message(self, data): + self.logger.info(f"Received MQTT message: {data}") diff --git a/ammb/meshcore_async_handler.py b/ammb/meshcore_async_handler.py new file mode 100644 index 0000000..583d284 --- /dev/null +++ b/ammb/meshcore_async_handler.py @@ -0,0 +1,98 @@ +# ammb/meshcore_async_handler.py +""" +Async Meshcore handler using meshcore_py for serial communication. +""" + +import asyncio +import logging +from typing import Any, Dict, Optional, Callable +from meshcore import MeshCore, EventType + +class MeshcoreAsyncHandler: + """ + Async handler for Meshcore devices using meshcore_py. + """ + def __init__(self, serial_port: str, baud: int = 115200, debug: bool = False): + self.logger = logging.getLogger(__name__) + self.serial_port = serial_port + self.baud = baud + self.debug = debug + self.meshcore: Optional[MeshCore] = None + self._event_handlers: Dict[EventType, Callable] = {} + self._connected = asyncio.Event() + self._disconnect_requested = False + + async def connect(self): + self.logger.info(f"Connecting to Meshcore device on {self.serial_port}...") + self.meshcore = await MeshCore.create_serial(self.serial_port, self.baud, debug=self.debug) + self.meshcore.subscribe(EventType.CONNECTED, self._on_connected) + self.meshcore.subscribe(EventType.DISCONNECTED, self._on_disconnected) + self._connected.set() + self.logger.info("Meshcore device connected.") + + async def disconnect(self): + self._disconnect_requested = True + if self.meshcore: + await self.meshcore.disconnect() + self._connected.clear() + self.logger.info("Meshcore device disconnected.") + + async def send_message(self, contact_or_key: Any, message: str): + if not self.meshcore: + raise RuntimeError("Meshcore not connected.") + result = await self.meshcore.commands.send_msg(contact_or_key, message) + if result is None: + self.logger.error("send_msg returned None (no response from Meshcore).") + return False + if result.type == EventType.ERROR: + self.logger.error(f"Error sending message: {result.payload}") + return False + self.logger.info("Message sent successfully!") + return True + + def subscribe(self, event_type: EventType, handler: Callable): + if not self.meshcore: + raise RuntimeError("Meshcore not connected.") + # Wrap handler for centralized error logging + def safe_handler(event): + try: + if asyncio.iscoroutinefunction(handler): + asyncio.create_task(handler(event)) + else: + handler(event) + except Exception as e: + self.logger.error(f"Error in event handler for {event_type}: {e}", exc_info=True) + self.meshcore.subscribe(event_type, safe_handler) + self.logger.info(f"Subscribed to event: {event_type}") + + async def get_contacts(self): + if not self.meshcore: + raise RuntimeError("Meshcore not connected.") + result = await self.meshcore.commands.get_contacts() + if result is None: + self.logger.error("get_contacts returned None (no response from Meshcore).") + return None + if result.type == EventType.ERROR: + self.logger.error(f"Error getting contacts: {result.payload}") + return None + return result.payload + + async def run(self): + # Only connect once per run, and wait for disconnect request + try: + await self.connect() + while not self._disconnect_requested: + await asyncio.sleep(1) + except Exception as e: + self.logger.error(f"Unhandled exception in MeshcoreAsyncHandler.run: {e}", exc_info=True) + raise + finally: + # Ensure disconnect is called only once per run + await self.disconnect() + + def _on_connected(self, event): + self.logger.info(f"Connected: {event.payload}") + + def _on_disconnected(self, event): + self.logger.warning(f"Disconnected: {event.payload}") + self._connected.clear() diff --git a/ammb/mqtt_async_handler.py b/ammb/mqtt_async_handler.py new file mode 100644 index 0000000..c555190 --- /dev/null +++ b/ammb/mqtt_async_handler.py @@ -0,0 +1,76 @@ +# ammb/mqtt_async_handler.py +""" +Async MQTT handler using asyncio-mqtt for external network interface. +""" +import asyncio +import json +import logging +from typing import Any, Callable, Optional +from asyncio_mqtt import Client, MqttError + +class MQTTAsyncHandler: + def __init__(self, broker: str, port: int, topic_in: str, topic_out: str, username: Optional[str] = None, password: Optional[str] = None, qos: int = 0, retain: bool = False, tls: bool = False, tls_ca_certs: Optional[str] = None, tls_insecure: bool = False, client_id: Optional[str] = None): + self.logger = logging.getLogger(__name__) + self.broker = broker + self.port = port + self.topic_in = topic_in + self.topic_out = topic_out + self.username = username + self.password = password + self.qos = qos + self.retain = retain + self.tls = tls + self.tls_ca_certs = tls_ca_certs + self.tls_insecure = tls_insecure + self.client_id = client_id + self._client: Optional[Client] = None + self._message_handler: Optional[Callable[[dict], None]] = None + self._running = False + + async def connect(self): + self._client = Client(self.broker, port=self.port, username=self.username, password=self.password, client_id=self.client_id) + if self.tls: + tls_params = {} + if self.tls_ca_certs: + tls_params['ca_certs'] = self.tls_ca_certs + if self.tls_insecure: + tls_params['cert_reqs'] = False + self._client.tls_set(**tls_params) + await self._client.connect() + self.logger.info(f"Connected to MQTT broker {self.broker}:{self.port}") + + async def disconnect(self): + if self._client: + await self._client.disconnect() + self.logger.info("Disconnected from MQTT broker.") + + async def publish(self, message: dict): + if not self._client: + raise RuntimeError("MQTT client not connected.") + payload = json.dumps(message) + await self._client.publish(self.topic_out, payload, qos=self.qos, retain=self.retain) + self.logger.info(f"Published message to {self.topic_out}") + + def set_message_handler(self, handler: Callable[[dict], None]): + self._message_handler = handler + + async def run(self): + if not self._client: + await self.connect() + self._running = True + async with self._client.unfiltered_messages() as messages: + await self._client.subscribe(self.topic_in, qos=self.qos) + self.logger.info(f"Subscribed to {self.topic_in}") + async for msg in messages: + try: + payload = msg.payload.decode('utf-8') + data = json.loads(payload) + if self._message_handler: + self._message_handler(data) + except Exception as e: + self.logger.error(f"Error processing MQTT message: {e}") + if not self._running: + break + + def stop(self): + self._running = False diff --git a/requirements.txt b/requirements.txt index cfcc39a..b213b1a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,10 @@ # Runtime dependencies for the Akita Meshtastic-Meshcore Bridge meshtastic>=2.2.0,<3.0.0 +meshcore>=2.2.0 +asyncio-mqtt>=0.14.0 +fastapi>=0.110.0 +uvicorn>=0.27.0 pyserial>=3.5,<4.0.0 pypubsub>=4.0.0,<5.0.0 paho-mqtt>=1.6.0,<2.0.0 diff --git a/run_bridge_async.py b/run_bridge_async.py new file mode 100644 index 0000000..35922e2 --- /dev/null +++ b/run_bridge_async.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +# run_bridge_async.py +""" +Async entry point for Akita Meshtastic Meshcore Bridge using meshcore_py. +""" +import logging +import os +import sys +import asyncio + +project_root = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, project_root) + +try: + from ammb.config_handler import CONFIG_FILE, load_config + from ammb.utils import setup_logging + from ammb.bridge_async import AsyncBridge +except ImportError as e: + print(f"ERROR: Failed to import AMMB modules: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + logging.info("--- Akita Meshtastic Meshcore Bridge (Async) Starting ---") + + config_path = os.path.join(project_root, CONFIG_FILE) + logging.info(f"Loading configuration from: {config_path}") + config = load_config(config_path) + if not config: + logging.critical("Failed to load configuration. Bridge cannot start.") + sys.exit(1) + logging.info("Configuration loaded successfully.") + logging.info(f"Selected external transport: {config.external_transport}") + + setup_logging(config.log_level) + logging.debug(f"Logging level set to {config.log_level}") + + bridge = AsyncBridge(config) + + async def main(): + import uvicorn + from multiprocessing import Process + def run_api(): + uvicorn.run("ammb.api_async:app", host=config.api_host or "127.0.0.1", port=int(config.api_port or 8080), log_level="info") + + api_proc = None + if getattr(config, "api_enabled", False): + api_proc = Process(target=run_api) + api_proc.start() + logging.info(f"Async API server started on http://{config.api_host or '127.0.0.1'}:{config.api_port or 8080}") + try: + await bridge.start() + finally: + if api_proc: + api_proc.terminate() + api_proc.join() + + try: + asyncio.run(main()) + except KeyboardInterrupt: + logging.info("KeyboardInterrupt received. Initiating graceful shutdown...") + except Exception as e: + logging.critical(f"Unhandled critical exception in async bridge execution: {e}", exc_info=True) + sys.exit(1) + logging.info("--- Akita Meshtastic Meshcore Bridge (Async) Stopped ---") + sys.exit(0)