mirror of
https://github.com/pyMC-dev/pyMC_Repeater.git
synced 2026-06-11 08:44:46 +02:00
45a44eb47b
- Updated byte representations in tests to use lowercase hex format for consistency. - Reformatted code for better readability, including line breaks and indentation adjustments. - Consolidated multiple lines into single lines where appropriate to enhance clarity. - Ensured that all test cases maintain consistent formatting and style across the test suite.
133 lines
4.4 KiB
Python
133 lines
4.4 KiB
Python
"""Tests for repeater.airtime using radio preset configurations.
|
|
|
|
This complements duration-focused tests by validating AirtimeManager behavior
|
|
across real-world SF/BW/CR combinations from radio-presets.json.
|
|
"""
|
|
|
|
import json
|
|
import math
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from repeater.airtime import AirtimeManager
|
|
|
|
|
|
def _semtech_airtime_ms(payload_len: int, sf: int, bw_hz: int, cr: int, preamble: int) -> float:
|
|
"""Independent Semtech reference formula used as oracle in tests."""
|
|
crc = 1
|
|
h = 0 # explicit header
|
|
de = 1 if (sf >= 11 and bw_hz <= 125000) else 0
|
|
t_sym = (2**sf) / (bw_hz / 1000)
|
|
t_preamble = (preamble + 4.25) * t_sym
|
|
numerator = max(8 * payload_len - 4 * sf + 28 + 16 * crc - 20 * h, 0)
|
|
denominator = 4 * (sf - 2 * de)
|
|
n_payload = 8 + math.ceil(numerator / denominator) * cr
|
|
return t_preamble + n_payload * t_sym
|
|
|
|
|
|
def _load_all_presets():
|
|
"""Load preset tuples (title, sf, bw_hz, cr) from JSON."""
|
|
preset_file = Path(__file__).resolve().parents[1] / "radio-presets.json"
|
|
data = json.loads(preset_file.read_text(encoding="utf-8"))
|
|
entries = data["config"]["suggested_radio_settings"]["entries"]
|
|
|
|
selected = []
|
|
for e in entries:
|
|
selected.append(
|
|
(
|
|
e["title"],
|
|
int(e["spreading_factor"]),
|
|
int(float(e["bandwidth"]) * 1000),
|
|
int(e["coding_rate"]),
|
|
)
|
|
)
|
|
return selected
|
|
|
|
|
|
ALL_PRESETS = _load_all_presets()
|
|
ALL_PRESET_IDS = [p[0] for p in ALL_PRESETS]
|
|
|
|
|
|
def _make_mgr(sf: int, bw_hz: int, cr: int, preamble: int = 8, max_airtime_per_minute: int = 3600):
|
|
cfg = {
|
|
"radio": {
|
|
"spreading_factor": sf,
|
|
"bandwidth": bw_hz,
|
|
"coding_rate": cr,
|
|
"preamble_length": preamble,
|
|
},
|
|
"duty_cycle": {
|
|
"max_airtime_per_minute": max_airtime_per_minute,
|
|
"enforcement_enabled": True,
|
|
},
|
|
}
|
|
return AirtimeManager(cfg)
|
|
|
|
|
|
def test_all_presets_loaded():
|
|
assert ALL_PRESETS
|
|
|
|
|
|
@pytest.mark.parametrize("_title,sf,bw_hz,cr", ALL_PRESETS, ids=ALL_PRESET_IDS)
|
|
def test_all_presets_match_semtech_formula(_title, sf, bw_hz, cr):
|
|
mgr = _make_mgr(sf, bw_hz, cr, preamble=8)
|
|
for payload_len in (16, 32, 64, 128):
|
|
actual = mgr.calculate_airtime(payload_len)
|
|
expected = _semtech_airtime_ms(payload_len, sf=sf, bw_hz=bw_hz, cr=cr, preamble=8)
|
|
assert math.isclose(actual, expected, rel_tol=1e-9), (
|
|
f"{_title} mismatch for {payload_len}B: got {actual}, expected {expected}"
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize("_title,sf,bw_hz,cr", ALL_PRESETS, ids=ALL_PRESET_IDS)
|
|
def test_all_presets_airtime_increases_with_payload(_title, sf, bw_hz, cr):
|
|
mgr = _make_mgr(sf, bw_hz, cr, preamble=8)
|
|
short = mgr.calculate_airtime(16)
|
|
medium = mgr.calculate_airtime(64)
|
|
long_ = mgr.calculate_airtime(128)
|
|
assert short < medium < long_
|
|
|
|
|
|
def test_long_range_preset_has_higher_airtime_than_fast_preset_for_same_payload():
|
|
# EU/UK long-range profile vs US recommended profile from presets.
|
|
long_mgr = _make_mgr(sf=11, bw_hz=250000, cr=5, preamble=8)
|
|
fast_mgr = _make_mgr(sf=7, bw_hz=62500, cr=5, preamble=8)
|
|
payload_len = 64
|
|
assert long_mgr.calculate_airtime(payload_len) > fast_mgr.calculate_airtime(payload_len)
|
|
|
|
|
|
@pytest.mark.parametrize("_title,sf,bw_hz,cr", ALL_PRESETS, ids=ALL_PRESET_IDS)
|
|
def test_can_transmit_blocks_after_budget_exhausted_for_each_preset(_title, sf, bw_hz, cr):
|
|
mgr = _make_mgr(sf, bw_hz, cr, preamble=8, max_airtime_per_minute=600)
|
|
airtime = mgr.calculate_airtime(64)
|
|
|
|
# Feed TX history until just before the limit.
|
|
sent = 0
|
|
while True:
|
|
can_tx, _ = mgr.can_transmit(airtime)
|
|
if not can_tx:
|
|
break
|
|
mgr.record_tx(airtime)
|
|
sent += 1
|
|
# Safety guard against accidental infinite loops.
|
|
assert sent < 1000
|
|
|
|
can_tx_after, wait = mgr.can_transmit(airtime)
|
|
assert can_tx_after is False
|
|
assert wait >= 0.0
|
|
|
|
|
|
def test_stats_report_tx_rx_airtime_totals():
|
|
mgr = _make_mgr(sf=8, bw_hz=62500, cr=8, preamble=17)
|
|
tx_airtime = mgr.calculate_airtime(50)
|
|
rx_airtime = mgr.calculate_airtime(20)
|
|
|
|
mgr.record_tx(tx_airtime)
|
|
mgr.record_rx(rx_airtime)
|
|
|
|
stats = mgr.get_stats()
|
|
assert stats["total_airtime_ms"] == pytest.approx(tx_airtime)
|
|
assert stats["total_rx_airtime_ms"] == pytest.approx(rx_airtime)
|
|
assert stats["current_airtime_ms"] == pytest.approx(tx_airtime)
|