mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-03-28 17:43:06 +01:00
- Introduced `total_rx_airtime_ms` in `AirtimeManager` to track received packet airtime. - Added `record_rx` method to log received airtime in `AirtimeManager`. - Updated `RepeaterHandler` to count received packets and log RX airtime using the new method. - Enhanced statistics reporting in `get_stats` to include total received airtime. - Updated `ProtocolRequestHelper` to include total RX airtime in the RepeaterStats structure for better monitoring.
132 lines
5.0 KiB
Python
132 lines
5.0 KiB
Python
import logging
|
|
import math
|
|
import time
|
|
from typing import Tuple
|
|
|
|
logger = logging.getLogger("AirtimeManager")
|
|
|
|
|
|
class AirtimeManager:
|
|
def __init__(self, config: dict):
|
|
self.config = config
|
|
self.radio_config = config.get("radio", {})
|
|
self.max_airtime_per_minute = config.get("duty_cycle", {}).get(
|
|
"max_airtime_per_minute", 3600
|
|
)
|
|
|
|
# Store radio settings for airtime calculations
|
|
self.spreading_factor = self.radio_config.get("spreading_factor", 7)
|
|
self.bandwidth = self.radio_config.get("bandwidth", 125000)
|
|
self.coding_rate = self.radio_config.get("coding_rate", 5)
|
|
self.preamble_length = self.radio_config.get("preamble_length", 8)
|
|
|
|
# Track airtime in rolling window
|
|
self.tx_history = [] # [(timestamp, airtime_ms), ...]
|
|
self.window_size = 60 # seconds
|
|
self.total_airtime_ms = 0
|
|
self.total_rx_airtime_ms = 0
|
|
|
|
def calculate_airtime(
|
|
self,
|
|
payload_len: int,
|
|
spreading_factor: int = None,
|
|
bandwidth_hz: int = None,
|
|
coding_rate: int = None,
|
|
preamble_len: int = None,
|
|
crc_enabled: bool = True,
|
|
explicit_header: bool = True,
|
|
) -> float:
|
|
"""
|
|
Calculate LoRa packet airtime using the Semtech reference formula.
|
|
|
|
Reference: https://www.semtech.com/design-support/lora-calculator
|
|
|
|
Args:
|
|
payload_len: Payload length in bytes
|
|
spreading_factor: SF7-SF12 (uses config value if None)
|
|
bandwidth_hz: Bandwidth in Hz (uses config value if None)
|
|
coding_rate: CR denominator, 5=4/5, 6=4/6, 7=4/7, 8=4/8 (uses config value if None)
|
|
preamble_len: Preamble symbols (uses config value if None)
|
|
crc_enabled: Whether CRC is enabled (default: True)
|
|
explicit_header: Whether explicit header mode is used (default: True)
|
|
|
|
Returns:
|
|
Airtime in milliseconds
|
|
"""
|
|
sf = spreading_factor or self.spreading_factor
|
|
bw_hz = (bandwidth_hz or self.bandwidth)
|
|
cr = coding_rate or self.coding_rate
|
|
preamble_len = preamble_len or self.preamble_length
|
|
crc = 1 if crc_enabled else 0
|
|
h = 0 if explicit_header else 1 # H=0 for explicit, H=1 for implicit
|
|
|
|
# Low data rate optimization: required for SF11/SF12 at 125kHz
|
|
de = 1 if (sf >= 11 and bw_hz <= 125000) else 0
|
|
|
|
# Symbol time in milliseconds: T_sym = 2^SF / BW_kHz
|
|
t_sym = (2 ** sf) / (bw_hz / 1000)
|
|
|
|
# Preamble time: T_preamble = (n_preamble + 4.25) * T_sym
|
|
t_preamble = (preamble_len + 4.25) * t_sym
|
|
|
|
# Payload symbol calculation (Semtech formula):
|
|
# n_payload = 8 + ceil(max(8*PL - 4*SF + 28 + 16*CRC - 20*H, 0) / (4*(SF - 2*DE))) * CR
|
|
numerator = max(8 * payload_len - 4 * sf + 28 + 16 * crc - 20 * h, 0)
|
|
denominator = 4 * (sf - 2 * de)
|
|
n_payload = 8 + math.ceil(numerator / denominator) * cr
|
|
|
|
# Payload time
|
|
t_payload = n_payload * t_sym
|
|
|
|
# Total packet airtime
|
|
return t_preamble + t_payload
|
|
|
|
def can_transmit(self, airtime_ms: float) -> Tuple[bool, float]:
|
|
enforcement_enabled = self.config.get("duty_cycle", {}).get("enforcement_enabled", True)
|
|
if not enforcement_enabled:
|
|
# Duty cycle enforcement disabled - always allow
|
|
return True, 0.0
|
|
|
|
now = time.time()
|
|
|
|
# Remove old entries outside window
|
|
self.tx_history = [(ts, at) for ts, at in self.tx_history if now - ts < self.window_size]
|
|
|
|
# Calculate current airtime in window
|
|
current_airtime = sum(at for _, at in self.tx_history)
|
|
|
|
if current_airtime + airtime_ms <= self.max_airtime_per_minute:
|
|
return True, 0.0
|
|
|
|
# Calculate wait time until oldest entry expires
|
|
if self.tx_history:
|
|
oldest_ts, oldest_at = self.tx_history[0]
|
|
wait_time = (oldest_ts + self.window_size) - now
|
|
return False, max(0, wait_time)
|
|
|
|
return False, 1.0
|
|
|
|
def record_tx(self, airtime_ms: float):
|
|
self.tx_history.append((time.time(), airtime_ms))
|
|
self.total_airtime_ms += airtime_ms
|
|
logger.debug(f"TX recorded: {airtime_ms: .1f}ms (total: {self.total_airtime_ms: .0f}ms)")
|
|
|
|
def record_rx(self, airtime_ms: float):
|
|
"""Record received packet airtime (for total RX airtime stats)."""
|
|
self.total_rx_airtime_ms += airtime_ms
|
|
|
|
def get_stats(self) -> dict:
|
|
now = time.time()
|
|
self.tx_history = [(ts, at) for ts, at in self.tx_history if now - ts < self.window_size]
|
|
|
|
current_airtime = sum(at for _, at in self.tx_history)
|
|
utilization = (current_airtime / self.max_airtime_per_minute) * 100
|
|
|
|
return {
|
|
"current_airtime_ms": current_airtime,
|
|
"max_airtime_ms": self.max_airtime_per_minute,
|
|
"utilization_percent": utilization,
|
|
"total_airtime_ms": self.total_airtime_ms,
|
|
"total_rx_airtime_ms": self.total_rx_airtime_ms,
|
|
}
|