tidy configuration handling by adding get_node_info function

This commit is contained in:
Lloyd
2025-11-19 10:39:14 +00:00
parent 007182deb1
commit 1793973a84
3 changed files with 45 additions and 116 deletions

View File

@@ -9,6 +9,36 @@ import yaml
logger = logging.getLogger("Config")
def get_node_info(config: Dict[str, Any]) -> Dict[str, Any]:
"""
Extract node name, radio configuration, and LetsMesh settings from config.
Args:
config: Configuration dictionary
Returns:
Dictionary with node_name, radio_config, and LetsMesh configuration
"""
node_name = config.get("repeater", {}).get("node_name", "PyMC-Repeater")
radio_config = config.get("radio", {})
radio_freq = radio_config.get("frequency", 0.0)
radio_bw = radio_config.get("bandwidth", 0.0)
radio_sf = radio_config.get("spreading_factor", 7)
radio_cr = radio_config.get("coding_rate", 5)
radio_config_str = f"{radio_freq},{radio_bw},{radio_sf},{radio_cr}"
letsmesh_config = config.get("letsmesh", {})
return {
"node_name": node_name,
"radio_config": radio_config_str,
"iata_code": letsmesh_config.get("iata_code", "TEST"),
"broker_index": letsmesh_config.get("broker_index", 0),
"status_interval": letsmesh_config.get("status_interval", 60),
"model": letsmesh_config.get("model", "PyMC-Repeater")
}
def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
if config_path is None:
config_path = os.getenv("PYMC_REPEATER_CONFIG", "/etc/pymc_repeater/config.yaml")

View File

@@ -6,7 +6,7 @@ import paho.mqtt.client as mqtt
from datetime import datetime, timedelta, UTC
from nacl.signing import SigningKey
from .. import __version__
# --------------------------------------------------------------------
# Helper: Base64URL without padding (required by MeshCore broker)
@@ -67,8 +67,6 @@ class MeshCoreToMqttJwtPusher:
jwt_expiry_minutes: int = 10,
use_tls: bool = True,
status_interval: int = 60, # Heartbeat interval in seconds
model: str = "PyMC-Repeater",
app_version: str = "0.0.0",
node_name: str = None,
radio_config: str = None,
):
@@ -84,10 +82,9 @@ class MeshCoreToMqttJwtPusher:
self.jwt_expiry_minutes = jwt_expiry_minutes
self.use_tls = use_tls
self.status_interval = status_interval
self.model = model
self.app_version = app_version
self.app_version = __version__
self.node_name = node_name or "PyMC-Repeater"
self.radio_config = radio_config or "915.0,125.0,7,5"
self.radio_config = radio_config or "0.0,0.0,0,0"
self._status_task = None
self._running = False
self._packet_stats = {
@@ -273,7 +270,7 @@ class MeshCoreToMqttJwtPusher:
"timestamp": datetime.now(UTC).isoformat(),
"origin": origin or "PyMC-Repeater",
"origin_id": self.public_key,
"model": self.model,
# "model": self.model,
"firmware_version": self.app_version,
"radio": radio_config or "0.0,0.0,0,0",
"client_version": f"pyMC_repeater_{self.app_version}",
@@ -298,95 +295,3 @@ class MeshCoreToMqttJwtPusher:
result = self.client.publish(topic, message, retain=retain)
logging.debug(f"Published to {topic}: {message}")
return result
# --------------------------------------------------------------------
# Example Usage
# --------------------------------------------------------------------
if __name__ == "__main__":
import time
import sys
logging.basicConfig(level=logging.INFO)
# Parse command line arguments
mode = sys.argv[1] if len(sys.argv) > 1 else "live"
if mode not in ["live", "send"]:
print("Usage: python letsmesh_handler.py [live|send]")
print(" live - Run in live mode with periodic status updates")
print(" send - Send a single test packet and exit")
sys.exit(1)
test_packet = (
"1D02DB013E6203740EEFAAC93AEE45D0854005CB3327ECB8D7626792D4D934F81E"
"2559DC27BCA0DFC407ED729E84E713AB2DDB5A04A509"
)
# Use saved test keypair (or generate new ones)
USE_SAVED_KEYS = True # Set to False to generate new keys
if USE_SAVED_KEYS:
# Saved test keypair
private_key = "2d2893a803b6eaed8c7e92189b8f7b76098e043c3e4a4f6a247cb730866c6fc9"
public_key = "66508c1711742e7633384659dc8139fa32c972cea9a50043da13bb9cb498de34"
print(f"\n=== Using Saved Test Keypair ===")
print(f"Public key: {public_key}")
else:
# Generate a valid test keypair
print("Generating new test keypair...")
test_signer = SigningKey.generate()
private_key = binascii.hexlify(bytes(test_signer)).decode()
public_key = binascii.hexlify(bytes(test_signer.verify_key)).decode()
print(f"\n=== Valid Test Keypair ===")
print(f"Private key: {private_key}")
print(f"Public key: {public_key}")
print(f"\nSave these for future testing!")
print()
# Create pusher with appropriate configuration
pusher = MeshCoreToMqttJwtPusher(
private_key=private_key,
public_key=public_key,
iata_code="test",
broker_index=0,
status_interval=30 if mode == "live" else 0, # 30s heartbeat in live mode
model="PyMC-Gateway",
app_version="1.0.0"
)
pusher.connect()
# Wait for connection to complete
print(f"Mode: {mode.upper()}")
print("Waiting for connection to complete...")
time.sleep(2)
if mode == "send":
# Send mode: publish one packet and exit
print("Publishing test packet...")
pusher.publish_raw_data(test_packet)
# Wait for publish to complete
time.sleep(1)
print("Disconnecting...")
pusher.disconnect()
print("Done!")
elif mode == "live":
# Live mode: stay connected and send periodic status updates
print("Connected! Publishing status updates every 30s...")
print("\nPress Ctrl+C to disconnect\n")
try:
# Keep running until interrupted
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n\nShutting down...")
pusher.disconnect()
print("Done!")

View File

@@ -9,7 +9,7 @@ from .sqlite_handler import SQLiteHandler
from .rrdtool_handler import RRDToolHandler
from .mqtt_handler import MQTTHandler
from .letsmesh_handler import MeshCoreToMqttJwtPusher
from .. import __version__
logger = logging.getLogger("StorageCollector")
@@ -40,31 +40,25 @@ class StorageCollector:
logger.error("Cannot initialize LetsMesh: No identity_key found in mesh config")
else:
from ..config import get_node_info
private_key_hex = identity_key.hex()
public_key_hex = local_identity.get_public_key().hex()
# Get node name and radio config from config
node_name = self.config.get("repeater", {}).get("node_name", "PyMC-Repeater")
radio_config = self.config.get("radio", {})
radio_freq = radio_config.get("frequency", 915.0)
radio_bw = radio_config.get("bandwidth", 125.0)
radio_sf = radio_config.get("spreading_factor", 7)
radio_cr = radio_config.get("coding_rate", 5)
radio_config_str = f"{radio_freq},{radio_bw},{radio_sf},{radio_cr}"
# Get all config from config module
node_info = get_node_info(self.config)
self.letsmesh_handler = MeshCoreToMqttJwtPusher(
private_key=private_key_hex,
public_key=public_key_hex,
iata_code=letsmesh_config.get("iata_code", "test"),
broker_index=letsmesh_config.get("broker_index", 0),
status_interval=letsmesh_config.get("status_interval", 60),
model=letsmesh_config.get("model", "PyMC-Repeater"),
app_version=__version__,
node_name=node_name,
radio_config=radio_config_str
iata_code=node_info["iata_code"],
broker_index=node_info["broker_index"],
status_interval=node_info["status_interval"],
node_name=node_info["node_name"],
radio_config=node_info["radio_config"]
)
self.letsmesh_handler.connect()
logger.info(f"LetsMesh handler initialized (v{__version__}) with public key: {public_key_hex[:16]}...")
logger.info(f"LetsMesh handler initialized with public key: {public_key_hex[:16]}...")
except Exception as e:
logger.error(f"Failed to initialize LetsMesh handler: {e}")
self.letsmesh_handler = None