mirror of
https://github.com/pyMC-dev/pyMC_Repeater.git
synced 2026-07-05 17:32:33 +02:00
fix(mqtt): publish Semtech-derived packet duration instead of hard-coded "0"
Every MQTT-published packet has shipped with duration="0" since the PacketRecord factory was introduced. The repeater already computes LoRa time-on-air via AirtimeManager.calculate_airtime() (the canonical Semtech reference formula) for duty-cycle gating and TX delay, but the result was thrown away after each packet - never stored on the packet_record dict that flows to MQTT/SQLite/Glass/websocket. What changes - engine.py: RepeaterHandler._build_packet_record() now computes airtime_ms once per packet (Semtech formula via AirtimeManager) and stores it as packet_record['airtime_ms']. Single source of truth for every downstream consumer. - storage_utils.py: PacketRecord.from_packet_record() reads the new airtime_ms field and serializes it as a rounded integer in the 'duration' field of the published JSON. Falls back to 0 if the field is missing (backward compatibility for any older code path). - storage_collector.py: _publish_packet_to_mqtt() simplified - no recomputation, no helper. The publish path is now a passthrough. Why MQTT consumers (firmware-compatible analyzers, dashboards, the upstream meshcoretomqtt project) expect the same time-on-air value the firmware emits. Hard-coded "0" makes airtime/utilization charts derived from the mqtt stream useless and silently diverges from firmware behavior. Plumbing the value through packet_record (instead of recomputing in the publish path) means any future consumer - SQLite schema, web UI charts, Glass telemetry - reads the same number without separate calculations. Tests tests/test_packet_duration.py - 5 tests covering: - backward compat (legacy packet_record without airtime_ms => '0') - airtime_ms field flows through to duration as rounded integer string - explicit zero stays '0' - AirtimeManager output matches an independently-implemented Semtech reference for typical MeshCore EU settings (SF8/62.5kHz/CR4-8) - low-data-rate optimization branch (SF12/125kHz triggers DE=1) Co-Authored-By: Oz <oz-agent@warp.dev>
This commit is contained in:
@@ -214,7 +214,13 @@ class StorageCollector:
|
||||
self._publish_packet_to_mqtt(packet_record)
|
||||
|
||||
def _publish_packet_to_mqtt(self, packet_record: dict):
|
||||
"""Publish packet to mqtt broker if enabled and allowed"""
|
||||
"""Publish packet to mqtt broker if enabled and allowed.
|
||||
|
||||
The ``duration`` field in the published JSON is sourced from
|
||||
``packet_record['airtime_ms']``, populated upstream by
|
||||
RepeaterHandler._build_packet_record using the Semtech reference
|
||||
time-on-air formula. No recomputation is needed here.
|
||||
"""
|
||||
if not self.mqtt_handler:
|
||||
return
|
||||
|
||||
|
||||
@@ -38,6 +38,12 @@ class PacketRecord:
|
||||
"""
|
||||
Create PacketRecord from internal packet_record format.
|
||||
|
||||
The ``duration`` field is sourced from ``packet_record['airtime_ms']``,
|
||||
which RepeaterHandler._build_packet_record populates using the
|
||||
Semtech-reference time-on-air formula on the active radio settings.
|
||||
Records produced by older code paths that pre-date that field fall
|
||||
back to 0 to preserve legacy behavior.
|
||||
|
||||
Args:
|
||||
packet_record: Internal packet record dictionary
|
||||
origin: Node name
|
||||
@@ -57,6 +63,8 @@ class PacketRecord:
|
||||
route_map = {1: "F", 2: "D"}
|
||||
route = route_map.get(packet_record.get("route", 0), str(packet_record.get("route", 0)))
|
||||
|
||||
airtime_ms = float(packet_record.get("airtime_ms", 0.0) or 0.0)
|
||||
|
||||
return cls(
|
||||
origin=origin,
|
||||
origin_id=origin_id,
|
||||
@@ -73,7 +81,7 @@ class PacketRecord:
|
||||
SNR=str(packet_record.get("snr", 0)),
|
||||
RSSI=str(packet_record.get("rssi", 0)),
|
||||
score=str(int(packet_record.get("score", 0) * 1000)),
|
||||
duration="0",
|
||||
duration=str(int(round(airtime_ms))),
|
||||
hash=packet_record.get("packet_hash", ""),
|
||||
)
|
||||
|
||||
|
||||
@@ -590,6 +590,20 @@ class RepeaterHandler(BaseHandler):
|
||||
pkt_hash = packet_hash or packet.calculate_packet_hash().hex().upper()
|
||||
payload = getattr(packet, "payload", None)
|
||||
payload_len = len(payload or b"")
|
||||
|
||||
# LoRa time-on-air for this packet (Semtech reference formula).
|
||||
# Computed once here so every downstream consumer (MQTT, SQLite, Glass,
|
||||
# websocket) reads the same value instead of recomputing or shipping
|
||||
# zeros. The same calculator is used by the RX accumulator and the
|
||||
# duty-cycle gate, keeping reporting and metering aligned.
|
||||
airtime_ms = 0.0
|
||||
try:
|
||||
raw_len = packet.get_raw_length() if hasattr(packet, "get_raw_length") else 0
|
||||
if raw_len > 0:
|
||||
airtime_ms = float(self.airtime_mgr.calculate_airtime(raw_len))
|
||||
except Exception as e:
|
||||
logger.debug(f"Could not compute airtime for packet record: {e}")
|
||||
|
||||
return {
|
||||
"timestamp": time.time(),
|
||||
"header": (
|
||||
@@ -608,6 +622,7 @@ class RepeaterHandler(BaseHandler):
|
||||
snr, payload_len, self.radio_config["spreading_factor"]
|
||||
),
|
||||
"tx_delay_ms": tx_delay_ms,
|
||||
"airtime_ms": airtime_ms,
|
||||
"transmitted": transmitted,
|
||||
"is_duplicate": is_duplicate,
|
||||
"packet_hash": pkt_hash[:16],
|
||||
|
||||
@@ -0,0 +1,243 @@
|
||||
"""End-to-end integration test for MQTT packet publishing.
|
||||
|
||||
Asserts the full wire path: AirtimeManager computes airtime_ms, the value is
|
||||
stored on packet_record, PacketRecord.from_packet_record serializes it as the
|
||||
``duration`` field, and mqtt_handler.publish_packet hands a JSON payload to
|
||||
the paho-mqtt client whose topic and content match the documented contract.
|
||||
|
||||
This complements ``tests/test_packet_duration.py`` (which unit-tests the
|
||||
serializer and Semtech formula in isolation) by locking the wire format end
|
||||
to end. paho-mqtt's network layer is mocked so no real broker is required.
|
||||
"""
|
||||
|
||||
import json
|
||||
import math
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from repeater.airtime import AirtimeManager
|
||||
from repeater.data_acquisition.mqtt_handler import MeshCoreToMqttPusher
|
||||
from repeater.data_acquisition.storage_utils import PacketRecord
|
||||
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
# Test scaffolding
|
||||
# --------------------------------------------------------------------
|
||||
class _FakeIdentity:
|
||||
"""Minimal LocalIdentity stand-in for constructor wiring."""
|
||||
|
||||
def __init__(self, public_key_hex: str):
|
||||
self._pk = bytes.fromhex(public_key_hex)
|
||||
|
||||
def get_public_key(self) -> bytes:
|
||||
return self._pk
|
||||
|
||||
|
||||
def _make_config(format_value: str = "letsmesh", iata_code: str = "LAX") -> dict:
|
||||
"""Minimal pyMC_Repeater config sufficient to construct MeshCoreToMqttPusher."""
|
||||
return {
|
||||
"repeater": {"node_name": "test-node"},
|
||||
"radio": {
|
||||
"spreading_factor": 8,
|
||||
"bandwidth": 62500,
|
||||
"coding_rate": 8,
|
||||
"preamble_length": 17,
|
||||
"frequency": 869618000,
|
||||
"tx_power": 14,
|
||||
},
|
||||
"duty_cycle": {"max_airtime_per_minute": 3600},
|
||||
"mqtt_brokers": {
|
||||
"iata_code": iata_code,
|
||||
"status_interval": 0,
|
||||
"owner": "",
|
||||
"email": "",
|
||||
"brokers": [
|
||||
{
|
||||
"name": "test-broker",
|
||||
"enabled": True,
|
||||
"host": "broker.example",
|
||||
"port": 1883,
|
||||
"transport": "tcp",
|
||||
"format": format_value,
|
||||
"use_jwt_auth": False,
|
||||
"tls": {"enabled": False, "insecure": False},
|
||||
}
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _attach_capturing_client(conn) -> list:
|
||||
"""Replace ``conn.client`` with a Mock and return the capture list.
|
||||
|
||||
The list is appended to on every paho-mqtt ``client.publish`` call. We also
|
||||
flip ``conn._running = True`` so the publish path doesn't short-circuit on
|
||||
the "not connected" guard.
|
||||
"""
|
||||
captured: list = []
|
||||
|
||||
def _fake_publish(topic, payload, retain=False, qos=0):
|
||||
captured.append(
|
||||
{"topic": topic, "payload": payload, "retain": retain, "qos": qos}
|
||||
)
|
||||
return None
|
||||
|
||||
conn._running = True
|
||||
conn.client = MagicMock()
|
||||
conn.client.publish = _fake_publish
|
||||
return captured
|
||||
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
# End-to-end integration test
|
||||
# --------------------------------------------------------------------
|
||||
def test_mqtt_published_packet_carries_semtech_duration_end_to_end():
|
||||
"""A packet flowing through the publisher must produce JSON whose
|
||||
``duration`` field equals the Semtech-derived airtime in ms.
|
||||
|
||||
Steps mirror production:
|
||||
1. AirtimeManager (engine.py would call this in _build_packet_record).
|
||||
2. Store airtime_ms on the packet_record dict.
|
||||
3. PacketRecord.from_packet_record serializes it as 'duration'.
|
||||
4. mqtt_handler.publish_packet flows JSON to the paho client.
|
||||
5. Topic and JSON content are asserted byte-by-byte.
|
||||
"""
|
||||
config = _make_config(format_value="letsmesh", iata_code="LAX")
|
||||
public_key_hex = "AB" * 32 # 64 hex chars = 32-byte Ed25519 pubkey
|
||||
identity = _FakeIdentity(public_key_hex)
|
||||
|
||||
# Construct the real publisher; this builds one _BrokerConnection that
|
||||
# holds a paho-mqtt Client object. We then swap that object for a Mock.
|
||||
pusher = MeshCoreToMqttPusher(local_identity=identity, config=config)
|
||||
assert len(pusher.connections) == 1
|
||||
captured = _attach_capturing_client(pusher.connections[0])
|
||||
|
||||
# Step 1: compute airtime exactly the way engine.py does.
|
||||
raw_bytes = bytes(range(40)) # 40-byte packet (typical MeshCore size)
|
||||
airtime_mgr = AirtimeManager(config)
|
||||
expected_airtime_ms = airtime_mgr.calculate_airtime(len(raw_bytes))
|
||||
expected_duration = str(int(round(expected_airtime_ms)))
|
||||
|
||||
# Step 2: build a packet_record with airtime_ms (matches _build_packet_record).
|
||||
packet_record = {
|
||||
"timestamp": 1700000000.0,
|
||||
"type": 4,
|
||||
"route": 1,
|
||||
"rssi": -90,
|
||||
"snr": 7.5,
|
||||
"score": 0.5,
|
||||
"payload_length": 32,
|
||||
"packet_hash": "DEADBEEF" + "00" * 4,
|
||||
"raw_packet": raw_bytes.hex(),
|
||||
"airtime_ms": expected_airtime_ms,
|
||||
}
|
||||
|
||||
# Step 3: serialize via the production code path.
|
||||
record = PacketRecord.from_packet_record(
|
||||
packet_record, origin="test-node", origin_id=public_key_hex.upper()
|
||||
)
|
||||
assert record is not None
|
||||
|
||||
# Step 4: publish via the real publisher chain.
|
||||
pusher.publish_packet(record.to_dict())
|
||||
|
||||
# Step 5: assertions on the wire output.
|
||||
assert len(captured) == 1, "expected exactly one paho publish call"
|
||||
publish = captured[0]
|
||||
|
||||
# Topic follows the MC2MQTT family convention for letsmesh format.
|
||||
assert publish["topic"] == f"meshcore/LAX/{public_key_hex.upper()}/packets"
|
||||
|
||||
# Payload is JSON; parse and verify duration is the Semtech value.
|
||||
payload_dict = json.loads(publish["payload"])
|
||||
assert payload_dict["duration"] == expected_duration
|
||||
assert payload_dict["duration"] != "0", "duration must not be hard-coded zero"
|
||||
assert 0 < int(payload_dict["duration"]) < 10_000, (
|
||||
"duration should be a sane time-on-air in ms"
|
||||
)
|
||||
|
||||
# Sanity: other key fields flowed through correctly.
|
||||
assert payload_dict["origin"] == "test-node"
|
||||
assert payload_dict["origin_id"] == public_key_hex.upper()
|
||||
assert payload_dict["len"] == str(len(raw_bytes))
|
||||
assert payload_dict["raw"] == raw_bytes.hex()
|
||||
assert payload_dict["RSSI"] == "-90"
|
||||
assert payload_dict["type"] == "PACKET"
|
||||
assert payload_dict["direction"] == "rx"
|
||||
assert payload_dict["route"] == "F" # route=1 -> "F" (flood)
|
||||
|
||||
|
||||
def test_mqtt_published_packet_topic_uses_mc2mqtt_structure_for_waev_format():
|
||||
"""The waev format flavor must publish on the same MC2MQTT topic structure
|
||||
as letsmesh. Locks the contract that all MC2MQTT family formats share
|
||||
``meshcore/{IATA}/{PUBKEY}/packets``.
|
||||
"""
|
||||
config = _make_config(format_value="waev", iata_code="SFO")
|
||||
public_key_hex = "CD" * 32
|
||||
identity = _FakeIdentity(public_key_hex)
|
||||
|
||||
pusher = MeshCoreToMqttPusher(local_identity=identity, config=config)
|
||||
captured = _attach_capturing_client(pusher.connections[0])
|
||||
|
||||
raw_bytes = bytes(range(20))
|
||||
airtime_mgr = AirtimeManager(config)
|
||||
packet_record = {
|
||||
"timestamp": 1700000000.0,
|
||||
"type": 1,
|
||||
"route": 2,
|
||||
"rssi": -75,
|
||||
"snr": 5.0,
|
||||
"score": 0.3,
|
||||
"payload_length": 14,
|
||||
"packet_hash": "CAFEBABE" + "00" * 4,
|
||||
"raw_packet": raw_bytes.hex(),
|
||||
"airtime_ms": airtime_mgr.calculate_airtime(len(raw_bytes)),
|
||||
}
|
||||
record = PacketRecord.from_packet_record(
|
||||
packet_record, origin="test", origin_id=public_key_hex.upper()
|
||||
)
|
||||
pusher.publish_packet(record.to_dict())
|
||||
|
||||
assert len(captured) == 1
|
||||
assert captured[0]["topic"] == f"meshcore/SFO/{public_key_hex.upper()}/packets"
|
||||
payload_dict = json.loads(captured[0]["payload"])
|
||||
assert int(payload_dict["duration"]) > 0
|
||||
assert payload_dict["route"] == "D" # route=2 -> "D" (direct)
|
||||
|
||||
|
||||
def test_mqtt_published_packet_legacy_mqtt_format_uses_singular_packet_topic():
|
||||
"""The legacy ``format: mqtt`` path keeps its custom topic shape:
|
||||
``meshcore/repeater/{node_name}/packet`` (singular). This test locks the
|
||||
backward-compat behavior we explicitly preserve.
|
||||
"""
|
||||
config = _make_config(format_value="mqtt", iata_code="LAX")
|
||||
public_key_hex = "EF" * 32
|
||||
identity = _FakeIdentity(public_key_hex)
|
||||
|
||||
pusher = MeshCoreToMqttPusher(local_identity=identity, config=config)
|
||||
captured = _attach_capturing_client(pusher.connections[0])
|
||||
|
||||
raw_bytes = bytes(range(16))
|
||||
airtime_mgr = AirtimeManager(config)
|
||||
packet_record = {
|
||||
"timestamp": 1700000000.0,
|
||||
"type": 1,
|
||||
"route": 1,
|
||||
"rssi": -80,
|
||||
"snr": 4.0,
|
||||
"score": 0.2,
|
||||
"payload_length": 10,
|
||||
"packet_hash": "BADC0DE0" + "00" * 4,
|
||||
"raw_packet": raw_bytes.hex(),
|
||||
"airtime_ms": airtime_mgr.calculate_airtime(len(raw_bytes)),
|
||||
}
|
||||
record = PacketRecord.from_packet_record(
|
||||
packet_record, origin="test-node", origin_id=public_key_hex.upper()
|
||||
)
|
||||
pusher.publish_packet(record.to_dict())
|
||||
|
||||
assert len(captured) == 1
|
||||
# Legacy "mqtt" format: custom topic prefix + singular subtopic
|
||||
assert captured[0]["topic"] == "meshcore/repeater/test-node/packet"
|
||||
payload_dict = json.loads(captured[0]["payload"])
|
||||
# Duration still flows through correctly even on the legacy topic
|
||||
assert int(payload_dict["duration"]) > 0
|
||||
@@ -0,0 +1,118 @@
|
||||
"""Tests for the MQTT packet `duration` field.
|
||||
|
||||
The duration value published to MQTT must match the LoRa Semtech time-on-air
|
||||
formula computed from the active radio settings, just like the firmware MQTT
|
||||
output. This file locks that contract and the backward-compatible default.
|
||||
"""
|
||||
|
||||
import math
|
||||
|
||||
from repeater.airtime import AirtimeManager
|
||||
from repeater.data_acquisition.storage_utils import PacketRecord
|
||||
|
||||
|
||||
def _semtech_airtime_ms(payload_len: int, sf: int, bw_hz: int, cr: int, preamble: int) -> float:
|
||||
"""Reference implementation copied verbatim from the Semtech LoRa
|
||||
calculator so we can compare against the production code without trusting
|
||||
its own implementation as the oracle.
|
||||
"""
|
||||
crc = 1
|
||||
h = 0 # explicit header
|
||||
de = 1 if (sf >= 11 and bw_hz <= 125000) else 0
|
||||
t_sym = (2 ** sf) / (bw_hz / 1000)
|
||||
t_preamble = (preamble + 4.25) * t_sym
|
||||
numerator = max(8 * payload_len - 4 * sf + 28 + 16 * crc - 20 * h, 0)
|
||||
denominator = 4 * (sf - 2 * de)
|
||||
n_payload = 8 + math.ceil(numerator / denominator) * cr
|
||||
return t_preamble + n_payload * t_sym
|
||||
|
||||
|
||||
def _make_packet_record(raw_packet_len_bytes: int = 32) -> dict:
|
||||
"""Minimal packet_record dict with a hex raw_packet of the desired length."""
|
||||
return {
|
||||
"timestamp": 1700000000,
|
||||
"type": 4,
|
||||
"route": 1,
|
||||
"rssi": -90,
|
||||
"snr": 7.5,
|
||||
"score": 0.5,
|
||||
"payload_length": raw_packet_len_bytes - 6, # arbitrary
|
||||
"packet_hash": "deadbeef",
|
||||
"raw_packet": "AB" * raw_packet_len_bytes, # hex string -> N bytes
|
||||
}
|
||||
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
# Backward compatibility: legacy packet_records without airtime_ms
|
||||
# --------------------------------------------------------------------
|
||||
def test_packet_record_defaults_duration_to_zero_when_airtime_ms_missing():
|
||||
"""packet_records produced before this change have no 'airtime_ms' key;
|
||||
the serializer must default to '0' rather than raising.
|
||||
"""
|
||||
pkt = _make_packet_record()
|
||||
pkt.pop("airtime_ms", None) # ensure not present
|
||||
record = PacketRecord.from_packet_record(pkt, origin="node", origin_id="ABCD")
|
||||
assert record is not None
|
||||
assert record.duration == "0"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
# Forward path: airtime_ms field flows through to duration
|
||||
# --------------------------------------------------------------------
|
||||
def test_packet_record_serializes_airtime_ms_as_rounded_integer_string():
|
||||
"""airtime_ms = 123.7 must serialize as duration='124'."""
|
||||
pkt = _make_packet_record()
|
||||
pkt["airtime_ms"] = 123.7
|
||||
record = PacketRecord.from_packet_record(pkt, origin="node", origin_id="ABCD")
|
||||
assert record is not None
|
||||
assert record.duration == "124"
|
||||
|
||||
|
||||
def test_packet_record_serializes_zero_airtime_as_zero_duration():
|
||||
pkt = _make_packet_record()
|
||||
pkt["airtime_ms"] = 0.0
|
||||
record = PacketRecord.from_packet_record(pkt, origin="node", origin_id="ABCD")
|
||||
assert record is not None
|
||||
assert record.duration == "0"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
# End-to-end: AirtimeManager output matches the Semtech reference
|
||||
# --------------------------------------------------------------------
|
||||
def test_airtime_manager_matches_semtech_reference_for_typical_meshcore_settings():
|
||||
"""The calculator wired into _publish_packet_to_mqtt must produce the same
|
||||
number as the Semtech reference formula for typical MeshCore EU settings.
|
||||
"""
|
||||
cfg = {
|
||||
"radio": {
|
||||
"spreading_factor": 8,
|
||||
"bandwidth": 62500,
|
||||
"coding_rate": 8,
|
||||
"preamble_length": 17,
|
||||
}
|
||||
}
|
||||
mgr = AirtimeManager(cfg)
|
||||
for payload_len in (16, 32, 64, 128, 200):
|
||||
actual = mgr.calculate_airtime(payload_len)
|
||||
expected = _semtech_airtime_ms(payload_len, sf=8, bw_hz=62500, cr=8, preamble=17)
|
||||
assert math.isclose(actual, expected, rel_tol=1e-9), (
|
||||
f"airtime mismatch for {payload_len}B: got {actual}, expected {expected}"
|
||||
)
|
||||
|
||||
|
||||
def test_airtime_manager_matches_semtech_reference_for_low_data_rate_optimization():
|
||||
"""SF11/SF12 at <=125kHz triggers low-data-rate optimization (DE=1).
|
||||
This test ensures both the reference and production path agree there.
|
||||
"""
|
||||
cfg = {
|
||||
"radio": {
|
||||
"spreading_factor": 12,
|
||||
"bandwidth": 125000,
|
||||
"coding_rate": 5,
|
||||
"preamble_length": 8,
|
||||
}
|
||||
}
|
||||
mgr = AirtimeManager(cfg)
|
||||
actual = mgr.calculate_airtime(50)
|
||||
expected = _semtech_airtime_ms(50, sf=12, bw_hz=125000, cr=5, preamble=8)
|
||||
assert math.isclose(actual, expected, rel_tol=1e-9)
|
||||
Reference in New Issue
Block a user