Merge branch 'feat/companion' into dev-companion-v2-cleanup

This commit is contained in:
Lloyd
2026-03-13 09:07:34 +00:00
committed by GitHub
40 changed files with 3405 additions and 149 deletions
+3
View File
@@ -62,3 +62,6 @@ data/
*.log
.DS_Store
syncpi.sh
# Docker
/data
+22 -5
View File
@@ -320,17 +320,34 @@ This script will:
The script will prompt you for each optional removal step.
## Docker
## Docker Compose
You can now run PyMC Repeater from within a [Docker Container](https://www.docker.com/). Checkout the example [Docker Compose](./docker-compose.yml) file before you get started.
You can now run pyMC Repeater from within a [Docker Container](https://www.docker.com/). Checkout the example [Docker Compose](./docker-compose.yml) file before you get started. It will need some configuration changes based on what hardware you're using (USB vs SPI). Look at the commented out lines to see which hardware requires what lines and only enable what you need.
Here is what you'll need to do in order to get the container running:
1. Copy the `config.yaml.example` to `config.yaml`
```bash
cp ./config.yaml.example ./config.yaml
```
2. Run the configuration script and follow the prompts.
```bash
sudo bash ./setup-radio-config.sh
```
3. Modify the `config.yaml` file with a unique web UI password. This allows you to bypass the `/setup` page when logging for the first time. You can find the value under `repeater.security.admin_password`. Change to _anything_ besides the default of `admin123`.
4. Configure the [docker compose](./docker-compose.yml) to your specific hardware and file paths. Be sure to comment-out or delete lines that aren't required for your hardware. Please note that your hardware devices might be at a different path than those listed in the docker compose file.
5. Build and start the container.
```bash
docker compose up -d --force-recreate --build
```
Just note that you will have to pass in a `config.yaml` into the container. You can create a new config by following the instructions in the [Configuration section](#configuration).
## Roadmap / Planned Features
- [ ] **Public Map Integration** - Submit repeater location and details to public map for discovery
+7
View File
@@ -6,10 +6,17 @@ services:
ports:
- 8000:8000
devices:
# SPI DEVICES (Your path may differ)
- /dev/spidev0.0
- /dev/gpiochip0
# USB DEVICES (Your path may differ)
- /dev/bus/usb/002:/dev/bus/usb/002
# SPI DEVICES PERMISSIONS
cap_add:
- SYS_RAWIO
# USB DEVICSE PERMISSIONS
group_add:
- plugdev
volumes:
- ./config.yaml:/etc/pymc_repeater/config.yaml
- ./data:/var/lib/pymc_repeater
+3
View File
@@ -12,6 +12,7 @@ RUN apt-get update && apt-get install -y \
python3-rrdtool \
jq \
wget \
libusb-1.0-0 \
swig \
git \
build-essential \
@@ -26,6 +27,8 @@ WORKDIR ${INSTALL_DIR}
# Copy source
COPY repeater ./repeater
COPY pyproject.toml .
COPY radio-presets.json .
COPY radio-settings.json .
# Install package
RUN pip install --no-cache-dir .
+112 -74
View File
@@ -96,14 +96,9 @@ is_enabled() {
# Function to get current version
get_version() {
# Try to read from _version.py first (generated by setuptools_scm)
if [ -f "$INSTALL_DIR/repeater/_version.py" ]; then
grep "^__version__ = version = " "$INSTALL_DIR/repeater/_version.py" | cut -d"'" -f2 2>/dev/null || echo "unknown"
elif [ -f "$INSTALL_DIR/pyproject.toml" ]; then
grep "^version" "$INSTALL_DIR/pyproject.toml" | cut -d'"' -f2 2>/dev/null || echo "unknown"
else
echo "not installed"
fi
# Read version from the pip-installed package in dist-packages
python3 -c "from importlib.metadata import version; print(version('pymc_repeater'))" 2>/dev/null \
|| echo "not installed"
}
# Function to get service status for display
@@ -266,6 +261,7 @@ install_repeater() {
useradd --system --home /var/lib/pymc_repeater --shell /sbin/nologin "$SERVICE_USER"
fi
(
echo "10"; echo "# Adding user to hardware groups..."
for grp in plugdev dialout gpio i2c spi; do
getent group "$grp" >/dev/null 2>&1 && usermod -a -G "$grp" "$SERVICE_USER" 2>/dev/null || true
@@ -296,31 +292,7 @@ install_repeater() {
wget -qO /usr/local/bin/yq "https://github.com/mikefarah/yq/releases/download/${YQ_VERSION}/${YQ_BINARY}" 2>/dev/null && chmod +x /usr/local/bin/yq
fi
echo "28"; echo "# Generating version file..."
cd "$SCRIPT_DIR"
# Generate version file using setuptools_scm before copying
if [ -d .git ]; then
git fetch --tags >/dev/null 2>&1 || true
# Write the version file that will be copied
python3 -m setuptools_scm >/dev/null 2>&1 || true
python3 -c "from setuptools_scm import get_version; get_version(write_to='repeater/_version.py')" >/dev/null 2>&1 || true
fi
# Clean up stale bytecode in source directory before copying
find "$SCRIPT_DIR/repeater" -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
find "$SCRIPT_DIR/repeater" -type f -name '*.pyc' -delete 2>/dev/null || true
echo "29"; echo "# Cleaning old installation files..."
# Remove old repeater directory to ensure clean install
rm -rf "$INSTALL_DIR/repeater" 2>/dev/null || true
# Clean up old Python bytecode
find "$INSTALL_DIR" -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
find "$INSTALL_DIR" -type f -name '*.pyc' -delete 2>/dev/null || true
echo "30"; echo "# Installing files..."
cp -r "$SCRIPT_DIR/repeater" "$INSTALL_DIR/"
cp "$SCRIPT_DIR/pyproject.toml" "$INSTALL_DIR/"
cp "$SCRIPT_DIR/README.md" "$INSTALL_DIR/"
echo "29"; echo "# Installing files..."
cp "$SCRIPT_DIR/manage.sh" "$INSTALL_DIR/" 2>/dev/null || true
cp "$SCRIPT_DIR/pymc-repeater.service" "$INSTALL_DIR/" 2>/dev/null || true
cp "$SCRIPT_DIR/radio-settings.json" /var/lib/pymc_repeater/ 2>/dev/null || true
@@ -371,10 +343,51 @@ EOF
mkdir -p /etc/sudoers.d
cat > /etc/sudoers.d/pymc-repeater <<'EOF'
# Allow repeater user to manage the pymc-repeater service without password
repeater ALL=(root) NOPASSWD: /usr/bin/systemctl restart pymc-repeater, /usr/bin/systemctl stop pymc-repeater, /usr/bin/systemctl start pymc-repeater, /usr/bin/systemctl status pymc-repeater
repeater ALL=(root) NOPASSWD: /usr/bin/systemctl restart pymc-repeater, /usr/bin/systemctl stop pymc-repeater, /usr/bin/systemctl start pymc-repeater, /usr/bin/systemctl status pymc-repeater, /usr/local/bin/pymc-do-upgrade
EOF
chmod 0440 /etc/sudoers.d/pymc-repeater
echo ">>> Installing OTA upgrade wrapper..."
cat > /usr/local/bin/pymc-do-upgrade <<'UPGRADEEOF'
#!/bin/bash
# pymc-do-upgrade: invoked by the repeater service user via sudo for OTA upgrades.
# Usage: sudo /usr/local/bin/pymc-do-upgrade [channel] [pretend-version]
set -e
CHANNEL="${1:-main}"
PRETEND_VERSION="${2:-}"
# Validate: only allow safe git ref characters
if ! [[ "$CHANNEL" =~ ^[a-zA-Z0-9._/-]{1,80}$ ]]; then
echo "Invalid channel name: $CHANNEL" >&2
exit 1
fi
export PIP_ROOT_USER_ACTION=ignore
# If caller supplied a version string, tell setuptools_scm to use it (sudo
# strips env vars so it is passed as a positional argument instead).
[ -n "$PRETEND_VERSION" ] && export SETUPTOOLS_SCM_PRETEND_VERSION="$PRETEND_VERSION"
# Migration: remove legacy PYTHONPATH from service unit if present.
# Old installs set PYTHONPATH=/opt/pymc_repeater which caused the service to
# load from a stale source copy instead of the pip-installed dist-packages.
SVC_UNIT=/etc/systemd/system/pymc-repeater.service
if grep -q 'PYTHONPATH' "$SVC_UNIT" 2>/dev/null; then
sed -i '/^Environment=.*PYTHONPATH/d' "$SVC_UNIT"
systemctl daemon-reload
fi
# Migration: fix WorkingDirectory if it still points at the old source checkout.
# /opt/pymc_repeater contains a repeater/ subdirectory which shadows the
# pip-installed package, causing updates to have no effect on the running process.
if grep -q 'WorkingDirectory=/opt/pymc_repeater' "$SVC_UNIT" 2>/dev/null; then
sed -i 's|WorkingDirectory=/opt/pymc_repeater|WorkingDirectory=/var/lib/pymc_repeater|' "$SVC_UNIT"
systemctl daemon-reload
fi
exec python3 -m pip install \
--break-system-packages \
--no-cache-dir \
--force-reinstall \
--ignore-installed \
"pymc_repeater[hardware] @ git+https://github.com/rightup/pyMC_Repeater.git@${CHANNEL}"
UPGRADEEOF
chmod 0755 /usr/local/bin/pymc-do-upgrade
echo "75"; echo "# Starting service..."
systemctl enable "$SERVICE_NAME"
@@ -410,7 +423,13 @@ EOF
echo "Note: Using optimized binary wheels for faster installation"
echo ""
if pip install --break-system-packages --no-cache-dir .[hardware]; then
# Remove old pymc_core first so no stale .py/.pyc files linger
python3 -m pip uninstall -y pymc_core 2>/dev/null || true
# Install with --force-reinstall to ensure fresh pymc_core from GitHub
# --ignore-installed avoids failures on system-managed packages (e.g. PyYAML)
echo "Installing pymc_repeater with fresh dependencies from pyproject.toml..."
if python3 -m pip install --break-system-packages --no-cache-dir --force-reinstall --ignore-installed .[hardware]; then
echo ""
echo "✓ Python package installation completed successfully!"
@@ -602,37 +621,13 @@ upgrade_repeater() {
fi
echo " ✓ Dependencies updated"
echo "[3.5/9] Generating version file..."
echo "[4/9] Installing files..."
SCRIPT_DIR="$(dirname "$0")"
cd "$SCRIPT_DIR"
# Generate version file using setuptools_scm before copying
if [ -d .git ]; then
git fetch --tags 2>/dev/null || true
# Write the version file that will be copied
GENERATED_VERSION=$(python3 -m setuptools_scm 2>&1 || echo "unknown (setuptools_scm not available)")
python3 -c "from setuptools_scm import get_version; get_version(write_to='repeater/_version.py')" 2>&1 || echo " Warning: Could not generate _version.py file"
echo " Generated version: $GENERATED_VERSION"
if ! cp "$SCRIPT_DIR/pymc-repeater.service" /etc/systemd/system/; then
echo " ⚠ Warning: Failed to update service file old service file may remain"
fi
# Clean up stale bytecode in source directory before copying
find "$SCRIPT_DIR/repeater" -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
find "$SCRIPT_DIR/repeater" -type f -name '*.pyc' -delete 2>/dev/null || true
echo " ✓ Version file generated and bytecode cleaned"
echo "[3.8/9] Cleaning old installation files..."
# Remove old repeater directory to ensure clean upgrade
rm -rf "$INSTALL_DIR/repeater" 2>/dev/null || true
# Clean up old Python bytecode
find "$INSTALL_DIR" -type d -name __pycache__ -exec rm -rf {} + 2>/dev/null || true
find "$INSTALL_DIR" -type f -name '*.pyc' -delete 2>/dev/null || true
echo " ✓ Old files cleaned"
echo "[4/9] Installing new files..."
cp -r repeater "$INSTALL_DIR/" 2>/dev/null || true
cp pyproject.toml "$INSTALL_DIR/" 2>/dev/null || true
cp README.md "$INSTALL_DIR/" 2>/dev/null || true
cp pymc-repeater.service /etc/systemd/system/ 2>/dev/null || true
cp radio-settings.json /var/lib/pymc_repeater/ 2>/dev/null || true
cp radio-presets.json /var/lib/pymc_repeater/ 2>/dev/null || true
cp "$SCRIPT_DIR/radio-settings.json" /var/lib/pymc_repeater/ 2>/dev/null || true
cp "$SCRIPT_DIR/radio-presets.json" /var/lib/pymc_repeater/ 2>/dev/null || true
echo " ✓ Files updated"
echo "[5/9] Validating and updating configuration..."
@@ -681,9 +676,49 @@ EOF
mkdir -p /etc/sudoers.d
cat > /etc/sudoers.d/pymc-repeater <<'EOF'
# Allow repeater user to manage the pymc-repeater service without password
repeater ALL=(root) NOPASSWD: /usr/bin/systemctl restart pymc-repeater, /usr/bin/systemctl stop pymc-repeater, /usr/bin/systemctl start pymc-repeater, /usr/bin/systemctl status pymc-repeater
repeater ALL=(root) NOPASSWD: /usr/bin/systemctl restart pymc-repeater, /usr/bin/systemctl stop pymc-repeater, /usr/bin/systemctl start pymc-repeater, /usr/bin/systemctl status pymc-repeater, /usr/local/bin/pymc-do-upgrade
EOF
chmod 0440 /etc/sudoers.d/pymc-repeater
# Install / refresh OTA upgrade wrapper
cat > /usr/local/bin/pymc-do-upgrade <<'UPGRADEEOF'
#!/bin/bash
# pymc-do-upgrade: invoked by the repeater service user via sudo for OTA upgrades.
# Usage: sudo /usr/local/bin/pymc-do-upgrade [channel] [pretend-version]
set -e
CHANNEL="${1:-main}"
PRETEND_VERSION="${2:-}"
# Validate: only allow safe git ref characters
if ! [[ "$CHANNEL" =~ ^[a-zA-Z0-9._/-]{1,80}$ ]]; then
echo "Invalid channel name: $CHANNEL" >&2
exit 1
fi
export PIP_ROOT_USER_ACTION=ignore
# If caller supplied a version string, tell setuptools_scm to use it (sudo
# strips env vars so it is passed as a positional argument instead).
[ -n "$PRETEND_VERSION" ] && export SETUPTOOLS_SCM_PRETEND_VERSION="$PRETEND_VERSION"
# Migration: remove legacy PYTHONPATH from service unit if present.
# Old installs set PYTHONPATH=/opt/pymc_repeater which caused the service to
# load from a stale source copy instead of the pip-installed dist-packages.
SVC_UNIT=/etc/systemd/system/pymc-repeater.service
if grep -q 'PYTHONPATH' "$SVC_UNIT" 2>/dev/null; then
sed -i '/^Environment=.*PYTHONPATH/d' "$SVC_UNIT"
systemctl daemon-reload
fi
# Migration: fix WorkingDirectory if it still points at the old source checkout.
# /opt/pymc_repeater contains a repeater/ subdirectory which shadows the
# pip-installed package, causing updates to have no effect on the running process.
if grep -q 'WorkingDirectory=/opt/pymc_repeater' "$SVC_UNIT" 2>/dev/null; then
sed -i 's|WorkingDirectory=/opt/pymc_repeater|WorkingDirectory=/var/lib/pymc_repeater|' "$SVC_UNIT"
systemctl daemon-reload
fi
exec python3 -m pip install \
--break-system-packages \
--no-cache-dir \
--force-reinstall \
--ignore-installed \
"pymc_repeater[hardware] @ git+https://github.com/rightup/pyMC_Repeater.git@${CHANNEL}"
UPGRADEEOF
chmod 0755 /usr/local/bin/pymc-do-upgrade
echo " ✓ Permissions updated"
echo "[7/9] Reloading systemd..."
@@ -715,23 +750,24 @@ EOF
# Force binary wheels for slow-to-compile packages (much faster on Raspberry Pi)
export PIP_ONLY_BINARY=pycryptodome,cffi,PyNaCl,psutil
echo "Note: Using optimized binary wheels and cached packages for faster installation"
echo "Note: Using optimized binary wheels for faster installation"
echo ""
# Upgrade packages (uses cache for unchanged dependencies - much faster)
if python3 -m pip install --break-system-packages --upgrade --upgrade-strategy eager .[hardware]; then
# Remove old pymc_core first so no stale .py/.pyc files linger
python3 -m pip uninstall -y pymc_core 2>/dev/null || true
# Install with --force-reinstall to ensure fresh pymc_core from GitHub
# --ignore-installed avoids failures on system-managed packages (e.g. PyYAML)
echo "Upgrading pymc_repeater with fresh dependencies from pyproject.toml..."
if python3 -m pip install --break-system-packages --no-cache-dir --force-reinstall --ignore-installed .[hardware]; then
echo ""
echo "✓ Package and dependencies updated successfully!"
echo "✓ Package and dependencies upgraded successfully!"
else
echo ""
echo "⚠ Package update failed, but continuing..."
echo "⚠ Package upgrade failed, but continuing..."
fi
echo ""
echo "✓ All packages including pymc_core reinstalled successfully"
echo "[8/9] Starting service..."
systemctl daemon-reload
systemctl start "$SERVICE_NAME"
@@ -831,6 +867,7 @@ uninstall_repeater() {
systemctl stop "$SERVICE_NAME" 2>/dev/null || true
systemctl disable "$SERVICE_NAME" 2>/dev/null || true
(
echo "20"; echo "# Backing up configuration..."
if [ -d "$CONFIG_DIR" ]; then
cp -r "$CONFIG_DIR" "/tmp/pymc_repeater_config_backup_$(date +%Y%m%d_%H%M%S)" 2>/dev/null || true
@@ -843,6 +880,7 @@ uninstall_repeater() {
echo "50"; echo "# Removing polkit and sudoers rules..."
rm -f /etc/polkit-1/rules.d/10-pymc-repeater.rules
rm -f /etc/sudoers.d/pymc-repeater
rm -f /usr/local/bin/pymc-do-upgrade
echo "60"; echo "# Removing installation..."
rm -rf "$INSTALL_DIR"
+1 -2
View File
@@ -10,8 +10,7 @@ Wants=network-online.target
Type=simple
User=repeater
Group=repeater
WorkingDirectory=/opt/pymc_repeater
Environment="PYTHONPATH=/opt/pymc_repeater"
WorkingDirectory=/var/lib/pymc_repeater
# Start command - use python module directly with proper path
ExecStart=/usr/bin/python3 -m repeater.main --config /etc/pymc_repeater/config.yaml
+5 -4
View File
@@ -29,9 +29,8 @@ classifiers = [
keywords = ["mesh", "networking", "lora", "repeater", "daemon", "iot"]
dependencies = [
"pymc_core",
"pymc_core[hardware]@git+https://github.com/rightup/pyMC_core.git@feat/companion",
"pyyaml>=6.0.0",
"cherrypy>=18.0.0",
"paho-mqtt>=1.6.0",
@@ -63,8 +62,9 @@ dev = [
[project.scripts]
pymc-repeater = "repeater.main:main"
[tool.setuptools]
packages = ["repeater"]
[tool.setuptools.packages.find]
where = ["."]
include = ["repeater*"]
[tool.setuptools.package-data]
repeater = [
@@ -86,3 +86,4 @@ line_length = 100
[tool.setuptools_scm]
version_scheme = "guess-next-dev"
local_scheme = "no-local-version"
version_file = "repeater/_version.py"
+37 -37
View File
@@ -132,22 +132,22 @@ class _BrokerConnection:
try:
signature = self.local_identity.sign(signing_input)
except Exception as e:
logging.error(f"JWT signing failed for {self.broker['name']}: {e}")
logging.error(f" - public_key: {self.public_key}")
logging.error(f" - signing_input length: {len(signing_input)}")
logger.error(f"JWT signing failed for {self.broker['name']}: {e}")
logger.error(f" - public_key: {self.public_key}")
logger.error(f" - signing_input length: {len(signing_input)}")
raise
signature_hex = binascii.hexlify(signature).decode()
token = f"{header_b64}.{payload_b64}.{signature_hex}"
logging.debug(f"JWT token generated for {self.broker['name']}: {token[:50]}...")
logger.debug(f"JWT token generated for {self.broker['name']}: {token[:50]}...")
return token
def _on_connect(self, client, userdata, flags, rc):
"""MQTT connection callback"""
if rc == 0:
logging.info(f"Connected to {self.broker['name']}")
logger.info(f"Connected to {self.broker['name']}")
self._running = True
self._reconnect_attempts = 0 # Reset counter on success
self._schedule_jwt_refresh() # Schedule proactive JWT refresh
@@ -155,7 +155,7 @@ class _BrokerConnection:
self._on_connect_callback(self.broker["name"])
else:
error_msg = get_mqtt_error_message(rc, is_disconnect=False)
logging.error(f"Failed to connect to {self.broker['name']}: {error_msg}")
logger.error(f"Failed to connect to {self.broker['name']}: {error_msg}")
self._schedule_reconnect()
def _on_disconnect(self, client, userdata, rc):
@@ -165,11 +165,11 @@ class _BrokerConnection:
if rc != 0: # Unexpected disconnect
error_msg = get_mqtt_error_message(rc, is_disconnect=True)
logging.warning(f"Disconnected from {self.broker['name']} (rc={rc}): {error_msg}")
logger.warning(f"Disconnected from {self.broker['name']} (rc={rc}): {error_msg}")
if was_running: # Only reconnect if we were intentionally connected
self._schedule_reconnect(reason=error_msg)
else:
logging.info(f"Clean disconnect from {self.broker['name']}")
logger.info(f"Clean disconnect from {self.broker['name']}")
if self._on_disconnect_callback:
self._on_disconnect_callback(self.broker["name"])
@@ -183,7 +183,7 @@ class _BrokerConnection:
delay = min(5 * (2**self._reconnect_attempts), self._max_reconnect_delay)
self._reconnect_attempts += 1
logging.info(
logger.info(
f"Scheduling reconnect to {self.broker['name']} in {delay}s (attempt {self._reconnect_attempts}, reason: {reason})"
)
self._reconnect_timer = threading.Timer(delay, lambda: self._attempt_reconnect(reason))
@@ -193,7 +193,7 @@ class _BrokerConnection:
def _attempt_reconnect(self, reason: str = "connection lost"):
"""Attempt to reconnect to broker with fresh JWT"""
try:
logging.info(f"Attempting reconnection to {self.broker['name']} (reason: {reason})...")
logger.info(f"Attempting reconnection to {self.broker['name']} (reason: {reason})...")
# Stop the loop if it's still running (websocket mode requires clean restart)
try:
@@ -208,7 +208,7 @@ class _BrokerConnection:
self.client.loop_start()
self._loop_running = True
except Exception as e:
logging.error(f"Reconnection failed for {self.broker['name']}: {e}")
logger.error(f"Reconnection failed for {self.broker['name']}: {e}")
self._schedule_reconnect() # Try again later
def _set_jwt_credentials(self):
@@ -218,11 +218,11 @@ class _BrokerConnection:
username = f"v1_{self.public_key}"
self.client.username_pw_set(username=username, password=token)
self._connect_time = datetime.now(UTC)
logging.debug(f"JWT credentials set for {self.broker['name']}")
logging.debug(f"Using username: {username}")
logging.debug(f"Public key: {self.public_key[:16]}...{self.public_key[-16:]}")
logger.debug(f"JWT credentials set for {self.broker['name']}")
logger.debug(f"Using username: {username}")
logger.debug(f"Public key: {self.public_key[:16]}...{self.public_key[-16:]}")
except Exception as e:
logging.error(f"Failed to set JWT credentials for {self.broker['name']}: {e}")
logger.error(f"Failed to set JWT credentials for {self.broker['name']}: {e}")
raise
def connect(self):
@@ -241,7 +241,7 @@ class _BrokerConnection:
# Set JWT credentials before CONNECT handshake
self._set_jwt_credentials()
logging.info(
logger.info(
f"Connecting to {self.broker['name']} "
f"({protocol}://{self.broker['host']}:{self.broker['port']}) ..."
)
@@ -265,7 +265,7 @@ class _BrokerConnection:
self.client.loop_stop()
self.client.disconnect()
logging.info(f"Disconnected from {self.broker['name']}")
logger.info(f"Disconnected from {self.broker['name']}")
def publish(self, topic: str, payload: str, retain: bool = False):
"""Publish message to broker"""
@@ -306,7 +306,7 @@ class _BrokerConnection:
refresh_threshold = 0.80 + stagger_offset
refresh_delay = expiry_seconds * refresh_threshold
logging.info(
logger.info(
f"JWT refresh scheduled for {self.broker['name']} in {refresh_delay:.0f}s "
f"({refresh_threshold*100:.0f}% of {self.jwt_expiry_minutes}min token lifetime)"
)
@@ -319,7 +319,7 @@ class _BrokerConnection:
if not self._running:
return
logging.info(f"JWT token expiring soon for {self.broker['name']}, refreshing...")
logger.info(f"JWT token expiring soon for {self.broker['name']}, refreshing...")
self._running = False
self._jwt_refresh_timer = None
self.client.disconnect() # Triggers clean disconnect, then reconnect via timer
@@ -364,11 +364,11 @@ class MeshCoreToMqttJwtPusher:
if broker_index == -2:
# Custom brokers only - no built-in brokers
self.brokers = []
logging.info("Custom broker mode: using only user-defined brokers")
logger.info("Custom broker mode: using only user-defined brokers")
elif broker_index is None or broker_index == -1:
# Connect to all built-in brokers + additional ones
self.brokers = LETSMESH_BROKERS.copy()
logging.info(
logger.info(
f"Multi-broker mode: connecting to all {len(LETSMESH_BROKERS)} built-in brokers"
)
else:
@@ -376,16 +376,16 @@ class MeshCoreToMqttJwtPusher:
if broker_index >= len(LETSMESH_BROKERS):
raise ValueError(f"Invalid broker_index {broker_index}")
self.brokers = [LETSMESH_BROKERS[broker_index]]
logging.info(f"Single broker mode: connecting to {self.brokers[0]['name']}")
logger.info(f"Single broker mode: connecting to {self.brokers[0]['name']}")
# Add additional brokers from config
if additional_brokers:
for broker_config in additional_brokers:
if all(k in broker_config for k in ["name", "host", "port", "audience"]):
self.brokers.append(broker_config)
logging.info(f"Added custom broker: {broker_config['name']}")
logger.info(f"Added custom broker: {broker_config['name']}")
else:
logging.warning(f"Skipping invalid broker config: {broker_config}")
logger.warning(f"Skipping invalid broker config: {broker_config}")
# Validate that we have at least one broker
if not self.brokers:
@@ -426,7 +426,7 @@ class MeshCoreToMqttJwtPusher:
)
self.connections.append(conn)
logging.info(f"Initialized with {len(self.connections)} broker connection(s)")
logger.info(f"Initialized with {len(self.connections)} broker connection(s)")
def _on_broker_connected(self, broker_name: str):
"""Callback when a broker connects"""
@@ -439,7 +439,7 @@ class MeshCoreToMqttJwtPusher:
# Start heartbeat thread
self._status_task = threading.Thread(target=self._status_heartbeat_loop, daemon=True)
self._status_task.start()
logging.info(f"Started status heartbeat (interval: {self.status_interval}s)")
logger.info(f"Started status heartbeat (interval: {self.status_interval}s)")
def _on_broker_disconnected(self, broker_name: str):
"""Callback when a broker disconnects"""
@@ -448,9 +448,9 @@ class MeshCoreToMqttJwtPusher:
any_reconnecting = any(conn.has_pending_reconnect() for conn in self.connections)
if all_down and not any_reconnecting:
logging.warning("All broker connections lost with no pending reconnects")
logger.warning("All broker connections lost with no pending reconnects")
elif all_down:
logging.info("All brokers temporarily disconnected, reconnects pending")
logger.info("All brokers temporarily disconnected, reconnects pending")
def connect(self):
"""Establish connections to all configured brokers"""
@@ -462,19 +462,19 @@ class MeshCoreToMqttJwtPusher:
else:
# Stagger additional brokers using background timers
delay = idx * 30
logging.info(f"Staggering connection to {conn.broker['name']} by {delay}s")
logger.info(f"Staggering connection to {conn.broker['name']} by {delay}s")
timer = threading.Timer(delay, lambda c=conn: self._delayed_connect(c))
timer.daemon = True
timer.start()
except Exception as e:
logging.error(f"Failed to connect to {conn.broker['name']}: {e}")
logger.error(f"Failed to connect to {conn.broker['name']}: {e}")
def _delayed_connect(self, conn):
"""Connect a broker after a delay (called by timer)"""
try:
conn.connect()
except Exception as e:
logging.error(f"Failed to connect to {conn.broker['name']}: {e}")
logger.error(f"Failed to connect to {conn.broker['name']}: {e}")
def disconnect(self):
"""Disconnect from all brokers"""
@@ -493,9 +493,9 @@ class MeshCoreToMqttJwtPusher:
try:
conn.disconnect()
except Exception as e:
logging.error(f"Error disconnecting from {conn.broker['name']}: {e}")
logger.error(f"Error disconnecting from {conn.broker['name']}: {e}")
logging.info("Disconnected from all brokers")
logger.info("Disconnected from all brokers")
def _status_heartbeat_loop(self):
"""Background thread that publishes periodic status updates"""
@@ -507,11 +507,11 @@ class MeshCoreToMqttJwtPusher:
self.publish_status(
state="online", origin=self.node_name, radio_config=self.radio_config
)
logging.debug(f"Status heartbeat sent (next in {self.status_interval}s)")
logger.debug(f"Status heartbeat sent (next in {self.status_interval}s)")
time.sleep(self.status_interval)
except Exception as e:
logging.error(f"Status heartbeat error: {e}")
logger.error(f"Status heartbeat error: {e}")
time.sleep(self.status_interval)
# ----------------------------------------------------------------
@@ -582,10 +582,10 @@ class MeshCoreToMqttJwtPusher:
if conn.is_connected():
result = conn.publish(topic, message, retain=retain)
results.append((conn.broker["name"], result))
logging.debug(f"Published to {conn.broker['name']}/{topic}")
logger.debug(f"Published to {conn.broker['name']}/{topic}")
if not results:
logging.warning(f"No active broker connections for publishing to {topic}")
logger.warning(f"No active broker connections for publishing to {topic}")
return results
+10 -1
View File
@@ -1003,8 +1003,15 @@ class RepeaterHandler(BaseHandler):
# Get neighbors from database
neighbors = self.storage.get_neighbors() if self.storage else {}
# Format local_hash respecting path_hash_mode
phm = self.config.get("mesh", {}).get("path_hash_mode", 0)
_bc = {0: 1, 1: 2, 2: 3}.get(phm, 1)
_hc = _bc * 2
_val = int.from_bytes(bytes(self.local_hash_bytes[:_bc]), "big")
local_hash_str = f"0x{_val:0{_hc}x}"
stats = {
"local_hash": f"0x{self.local_hash:02x}",
"local_hash": local_hash_str,
"duplicate_cache_size": len(self.seen_packets),
"cache_ttl": self.cache_ttl,
"rx_count": self.rx_count,
@@ -1055,6 +1062,8 @@ class RepeaterHandler(BaseHandler):
},
"web": self.config.get("web", {}), # Include web configuration
"mesh": {
"loop_detect": self.config.get("mesh", {}).get("loop_detect", "off"),
"global_flood_allow": self.config.get("mesh", {}).get("global_flood_allow", True),
"path_hash_mode": self.config.get("mesh", {}).get("path_hash_mode", 0),
},
},
+73 -8
View File
@@ -8,6 +8,7 @@ Includes adaptive rate limiting based on mesh activity.
import asyncio
import logging
import time
from collections import OrderedDict
from enum import Enum
from typing import Dict, Optional, Tuple
@@ -89,10 +90,16 @@ class AdvertHelper:
float(penalty_cfg.get("max_penalty_seconds", 86400.0)),
)
# --- Advert dedupe config ---
dedupe_cfg = repeater_cfg.get("advert_dedupe", {})
self._advert_dedupe_ttl_seconds = max(1.0, float(dedupe_cfg.get("ttl_seconds", 120.0)))
self._advert_dedupe_max_hashes = max(100, int(dedupe_cfg.get("max_hashes", 10000)))
# --- Per-pubkey state ---
self._bucket_state: Dict[str, dict] = {}
self._penalty_until: Dict[str, float] = {}
self._violation_state: Dict[str, dict] = {}
self._recent_advert_hashes: OrderedDict[str, float] = OrderedDict()
# --- Adaptive metrics state ---
self._adverts_ewma = 0.0 # EWMA of adverts per minute
@@ -113,6 +120,7 @@ class AdvertHelper:
# Stats counters
self._stats_adverts_allowed = 0
self._stats_adverts_dropped = 0
self._stats_advert_duplicates = 0
self._stats_tier_changes = 0
# Recent drops tracking (keep last 20)
@@ -129,7 +137,8 @@ class AdvertHelper:
f"Advert limiter: adaptive={self._adaptive_enabled}, "
f"rate_limit={self._rate_limit_enabled}, "
f"bucket={self._base_bucket_capacity:.1f}, "
f"penalty={self._penalty_enabled}"
f"penalty={self._penalty_enabled}, "
f"dedupe=True"
)
# -------------------------------------------------------------------------
@@ -138,19 +147,27 @@ class AdvertHelper:
def _cleanup_old_state(self, now: float) -> None:
"""Clean up old/expired entries to prevent unbounded memory growth."""
# 1. Remove expired penalties
while self._recent_advert_hashes:
oldest_hash, expires_at = next(iter(self._recent_advert_hashes.items()))
if expires_at > now:
break
self._recent_advert_hashes.pop(oldest_hash, None)
while len(self._recent_advert_hashes) > self._advert_dedupe_max_hashes:
self._recent_advert_hashes.popitem(last=False)
expired_penalties = [pk for pk, until in self._penalty_until.items() if until < now]
for pk in expired_penalties:
del self._penalty_until[pk]
# 2. Remove old bucket states for inactive pubkeys
inactive_pubkeys = [
pk for pk, state in self._bucket_state.items()
if now - state.get("last_seen", 0) > self._bucket_state_retention_seconds
]
for pk in inactive_pubkeys:
del self._bucket_state[pk]
# Also clean up related violation state
if pk in self._violation_state:
del self._violation_state[pk]
@@ -161,7 +178,6 @@ class AdvertHelper:
# Reset violation count after decay period
vstate["count"] = 0
# 4. Hard limit: if we're tracking too many pubkeys, remove oldest inactive ones
if len(self._bucket_state) > self._max_tracked_pubkeys:
# Sort by last_seen and remove oldest 10%
sorted_pubkeys = sorted(
@@ -187,9 +203,33 @@ class AdvertHelper:
f"{len(inactive_pubkeys)} inactive pubkeys. "
f"Tracking: {len(self._bucket_state)} buckets, "
f"{len(self._penalty_until)} penalties, "
f"{len(self._known_neighbors)} neighbors"
f"{len(self._known_neighbors)} neighbors, "
f"{len(self._recent_advert_hashes)} advert hashes"
)
def _dedupe_advert_packet_hash(self, packet, now: float) -> bool:
"""Return True when advert packet hash was already seen recently."""
try:
pkt_hash = packet.calculate_packet_hash().hex().upper()
except Exception:
return False
expires_at = self._recent_advert_hashes.get(pkt_hash)
if expires_at and expires_at > now:
# Move to end so hot hashes remain least likely to be evicted
self._recent_advert_hashes.move_to_end(pkt_hash)
return True
# Track first-seen (or expired hash re-seen)
self._recent_advert_hashes[pkt_hash] = now + self._advert_dedupe_ttl_seconds
self._recent_advert_hashes.move_to_end(pkt_hash)
# Opportunistic cleanup to keep memory bounded between scheduled cleanup runs
while len(self._recent_advert_hashes) > self._advert_dedupe_max_hashes:
self._recent_advert_hashes.popitem(last=False)
return False
# -------------------------------------------------------------------------
# Adaptive tier calculation
# -------------------------------------------------------------------------
@@ -451,11 +491,18 @@ class AdvertHelper:
"stats": {
"adverts_allowed": self._stats_adverts_allowed,
"adverts_dropped": self._stats_adverts_dropped,
"adverts_duplicate_reheard": self._stats_advert_duplicates,
"drop_rate": round(
self._stats_adverts_dropped / max(1, self._stats_adverts_allowed + self._stats_adverts_dropped),
3,
),
},
"dedupe": {
"enabled": True,
"ttl_seconds": self._advert_dedupe_ttl_seconds,
"tracked_hashes": len(self._recent_advert_hashes),
"max_hashes": self._advert_dedupe_max_hashes,
},
"active_penalties": active_penalties,
"tracked_pubkeys": len(self._bucket_state),
"bucket_states": bucket_summary,
@@ -501,8 +548,20 @@ class AdvertHelper:
node_name = advert_data["name"]
contact_type = advert_data["contact_type"]
# Per-pubkey rate limiting (token bucket + penalty box)
now = time.time()
# Re-heard duplicates should be measured but not consume limiter tokens.
if self._dedupe_advert_packet_hash(packet, now):
self._stats_advert_duplicates += 1
self._update_metrics_window(now, is_advert=False, is_duplicate=True)
logger.debug(
"Duplicate advert re-heard from '%s' (%s...), skipping limiter/storage",
node_name,
pubkey[:16],
)
return
# Per-pubkey rate limiting (token bucket + penalty box)
allowed, reason = self._allow_advert(pubkey, now)
if not allowed:
logger.warning(f"Dropping advert from '{node_name}' ({pubkey[:16]}...): {reason}")
@@ -630,9 +689,15 @@ class AdvertHelper:
float(penalty_cfg.get("max_penalty_seconds", 86400.0)),
)
# Advert dedupe config
dedupe_cfg = repeater_cfg.get("advert_dedupe", {})
self._advert_dedupe_ttl_seconds = max(1.0, float(dedupe_cfg.get("ttl_seconds", 120.0)))
self._advert_dedupe_max_hashes = max(100, int(dedupe_cfg.get("max_hashes", 10000)))
logger.info(
f"Advert limiter config reloaded: adaptive={self._adaptive_enabled}, "
f"rate_limit={self._rate_limit_enabled}, bucket={self._base_bucket_capacity:.1f}"
f"rate_limit={self._rate_limit_enabled}, bucket={self._base_bucket_capacity:.1f}, "
f"dedupe=True"
)
except Exception as e:
logger.error(f"Error reloading advert limiter config: {e}")
+8 -1
View File
@@ -74,12 +74,19 @@ class TraceHelper:
# Check if this is a response to one of our pings
trace_tag = parsed_data.get("tag")
if trace_tag in self.pending_pings:
rssi_val = getattr(packet, "rssi", 0)
if rssi_val == 0:
logger.warning(
f"Ignoring trace response for tag {trace_tag} "
"with RSSI=0 (no signal data)"
)
return # wait for a valid response or let timeout handle it
ping_info = self.pending_pings[trace_tag]
# Store response data
ping_info["result"] = {
"path": trace_path,
"snr": packet.get_snr(),
"rssi": getattr(packet, "rssi", 0),
"rssi": rssi_val,
"received_at": time.time(),
}
# Signal the waiting coroutine
+2
View File
@@ -1,6 +1,7 @@
from .api_endpoints import APIEndpoints
from .cad_calibration_engine import CADCalibrationEngine
from .http_server import HTTPStatsServer, LogBuffer, StatsApp, _log_buffer
from .update_endpoints import UpdateAPIEndpoints
__all__ = [
"HTTPStatsServer",
@@ -8,5 +9,6 @@ __all__ = [
"LogBuffer",
"APIEndpoints",
"CADCalibrationEngine",
"UpdateAPIEndpoints",
"_log_buffer",
]
+74 -14
View File
@@ -16,6 +16,7 @@ from .auth.middleware import require_auth
from .auth_endpoints import AuthAPIEndpoints
from .cad_calibration_engine import CADCalibrationEngine
from .companion_endpoints import CompanionAPIEndpoints
from .update_endpoints import UpdateAPIEndpoints
logger = logging.getLogger("HTTPServer")
@@ -113,6 +114,14 @@ logger = logging.getLogger("HTTPServer")
# DELETE /api/room_message?room_name=General&message_id=123 - Delete specific message
# DELETE /api/room_messages_clear?room_name=General - Clear all messages in room
# OTA Updates
# GET /api/update/status - Current + latest version, channel, state
# POST /api/update/check - Force fresh GitHub version check
# POST /api/update/install - Start background upgrade; stream via /progress
# GET /api/update/progress - SSE stream of live install log lines
# GET /api/update/channels - List available release channels (branches)
# POST /api/update/set_channel - Switch release channel {"channel": "dev"}
# Setup Wizard
# GET /api/needs_setup - Check if repeater needs initial setup
# GET /api/hardware_options - Get available hardware configurations
@@ -163,6 +172,9 @@ class APIEndpoints:
daemon_instance, event_loop, self.config, self.config_manager
)
# Create nested update object for /api/update/* routes
self.update = UpdateAPIEndpoints()
def _is_cors_enabled(self):
return self.config.get("web", {}).get("cors_enabled", False)
@@ -229,6 +241,19 @@ class APIEndpoints:
cherrypy.response.headers["Allow"] = "POST"
raise cherrypy.HTTPError(405, "Method not allowed. This endpoint requires POST.")
def _fmt_hash(self, pubkey: bytes) -> str:
"""Format a node hash as a hex string respecting the configured path_hash_mode.
path_hash_mode 0 (default) 1-byte "0x19"
path_hash_mode 1 2-byte "0x1927"
path_hash_mode 2 3-byte "0x192722"
"""
mode = self.config.get("mesh", {}).get("path_hash_mode", 0)
byte_count = {0: 1, 1: 2, 2: 3}.get(mode, 1)
hex_chars = byte_count * 2
value = int.from_bytes(bytes(pubkey[:byte_count]), "big")
return f"0x{value:0{hex_chars}X}"
def _get_time_range(self, hours):
end_time = int(time.time())
return end_time - (hours * 3600), end_time
@@ -498,6 +523,8 @@ class APIEndpoints:
config_yaml["sx1262"]["txen_pin"] = hw_config.get("txen_pin", -1)
if "rxen_pin" in hw_config:
config_yaml["sx1262"]["rxen_pin"] = hw_config.get("rxen_pin", -1)
if "en_pin" in hw_config:
config_yaml["sx1262"]["en_pin"] = hw_config.get("en_pin", -1)
if "cs_pin" in hw_config:
config_yaml["sx1262"]["cs_pin"] = hw_config.get("cs_pin", -1)
if "txled_pin" in hw_config:
@@ -1608,11 +1635,21 @@ class APIEndpoints:
self.config["kiss"]["baud_rate"] = int(data["kiss_baud_rate"])
applied.append("kiss.baud_rate")
# Update flood loop detection mode
if "loop_detect" in data:
mode = str(data["loop_detect"]).strip().lower()
if mode not in ("off", "minimal", "moderate", "strict"):
return self._error("loop_detect must be one of: off, minimal, moderate, strict")
if "mesh" not in self.config:
self.config["mesh"] = {}
self.config["mesh"]["loop_detect"] = mode
applied.append(f"loop_detect={mode}")
if not applied:
return self._error("No valid settings provided")
live_sections = ["repeater", "delays", "radio"]
if "mesh" in self.config and any(k in data for k in ("path_hash_mode",)):
if "mesh" in self.config and any(k in data for k in ("path_hash_mode", "loop_detect")):
live_sections.append("mesh")
if "kiss" in self.config:
live_sections.append("kiss")
@@ -2052,7 +2089,6 @@ class APIEndpoints:
else:
return self._error("Method not supported")
@cherrypy.expose
@cherrypy.expose
@cherrypy.tools.json_out()
@cherrypy.tools.json_in()
@@ -2074,11 +2110,21 @@ class APIEndpoints:
if not target_id:
return self._error("Missing target_id parameter")
# Parse target hash (accepts hex string like "0xA5" or "a5")
# Derive byte width from path_hash_mode (issue #133):
# 0 = 1-byte (legacy), 1 = 2-byte, 2 = 3-byte
path_hash_mode = self.config.get("mesh", {}).get("path_hash_mode", 0)
byte_count = {0: 1, 1: 2, 2: 3}.get(path_hash_mode, 1)
hex_chars = byte_count * 2
max_hash = (1 << (byte_count * 8)) - 1
# Parse target hash (accepts hex string like "0xA5", "0xA5F0", or bare hex)
try:
target_hash = int(target_id, 16) if isinstance(target_id, str) else int(target_id)
if target_hash < 0 or target_hash > 255:
return self._error("target_id must be a valid byte (0x00-0xFF)")
if target_hash < 0 or target_hash > max_hash:
return self._error(
f"target_id must be a valid {byte_count}-byte hash "
f"(0x00-0x{max_hash:0{hex_chars}X})"
)
except ValueError:
return self._error(f"Invalid target_id format: {target_id}")
@@ -2100,8 +2146,9 @@ class APIEndpoints:
# Create trace packet
from pymc_core.protocol import PacketBuilder
path_bytes = list(target_hash.to_bytes(byte_count, "big"))
packet = PacketBuilder.create_trace(
tag=trace_tag, auth_code=0x12345678, flags=0x00, path=[target_hash]
tag=trace_tag, auth_code=0x12345678, flags=0x00, path=path_bytes
)
# Wait for response with timeout
@@ -2114,7 +2161,7 @@ class APIEndpoints:
# Send packet via router
await router.inject_packet(packet)
logger.info(f"Ping sent to 0x{target_hash:02x} with tag {trace_tag}")
logger.info(f"Ping sent to 0x{target_hash:0{hex_chars}x} with tag {trace_tag} (path_hash_mode={path_hash_mode})")
try:
await asyncio.wait_for(event.wait(), timeout=timeout)
@@ -2145,14 +2192,27 @@ class APIEndpoints:
# Calculate round-trip time
rtt_ms = (result["received_at"] - ping_info["sent_at"]) * 1000
# result["path"] is a flat byte list from _parse_trace_payload.
# For multi-byte hash mode, group into byte_count-sized chunks
# before formatting (e.g. [0xb5, 0xd8] → ["0xb5d8"] for 2-byte mode).
raw_path = result["path"]
if byte_count > 1:
grouped_path = [
int.from_bytes(bytes(raw_path[i:i + byte_count]), "big")
for i in range(0, len(raw_path), byte_count)
]
else:
grouped_path = raw_path
return self._success(
{
"target_id": f"0x{target_hash:02x}",
"target_id": f"0x{target_hash:0{hex_chars}x}",
"rtt_ms": round(rtt_ms, 2),
"snr_db": result["snr"],
"rssi": result["rssi"],
"path": [f"0x{h:02x}" for h in result["path"]],
"path": [f"0x{h:0{hex_chars}x}" for h in grouped_path],
"tag": trace_tag,
"path_hash_mode": path_hash_mode,
},
message="Ping successful",
)
@@ -2307,7 +2367,7 @@ class APIEndpoints:
if runtime_info:
identity_obj, config, identity_type = runtime_info
identity_config["runtime"] = {
"hash": f"0x{identity_obj.get_public_key()[0]:02X}",
"hash": self._fmt_hash(identity_obj.get_public_key()),
"address": identity_obj.get_address_bytes().hex(),
"type": identity_type,
"registered": True,
@@ -3043,7 +3103,7 @@ class APIEndpoints:
{
"name": "repeater",
"type": "repeater",
"hash": f"0x{repeater_hash:02X}",
"hash": self._fmt_hash(self.daemon_instance.local_identity.get_public_key()),
"max_clients": repeater_acl.max_clients,
"authenticated_clients": repeater_acl.get_num_clients(),
"has_admin_password": bool(repeater_acl.admin_password),
@@ -3062,7 +3122,7 @@ class APIEndpoints:
{
"name": name,
"type": "room_server",
"hash": f"0x{hash_byte:02X}",
"hash": self._fmt_hash(identity.get_public_key()),
"max_clients": acl.max_clients,
"authenticated_clients": acl.get_num_clients(),
"has_admin_password": bool(acl.admin_password),
@@ -3163,7 +3223,7 @@ class APIEndpoints:
identity_map[repeater_hash] = {
"name": "repeater",
"type": "repeater",
"hash": f"0x{repeater_hash:02X}",
"hash": self._fmt_hash(self.daemon_instance.local_identity.get_public_key()),
}
# Add room servers
@@ -3172,7 +3232,7 @@ class APIEndpoints:
identity_map[hash_byte] = {
"name": name,
"type": "room_server",
"hash": f"0x{hash_byte:02X}",
"hash": self._fmt_hash(identity.get_public_key()),
}
# Add companions
+177
View File
@@ -0,0 +1,177 @@
"""
WebSocket proxy for the companion frame protocol.
Bridges browser WebSocket to the companion TCP frame server.
Raw byte pipe no parsing, all protocol logic lives in the client.
"""
import logging
import socket
import threading
from urllib.parse import parse_qs
import cherrypy
from ws4py.websocket import WebSocket
logger = logging.getLogger("CompanionWSProxy")
# Set by http_server.py before CherryPy starts
_daemon = None
def set_daemon(instance):
global _daemon
_daemon = instance
class CompanionFrameWebSocket(WebSocket):
def opened(self):
"""Authenticate, resolve companion, open TCP socket, start reader."""
# JWT auth — same pattern as PacketWebSocket
jwt_handler = cherrypy.config.get("jwt_handler")
qs = ""
if hasattr(self, "environ"):
qs = self.environ.get("QUERY_STRING", "")
params = parse_qs(qs)
token = params.get("token", [None])[0]
companion_name = params.get("companion_name", [None])[0]
if not jwt_handler:
logger.warning("Connection rejected: no JWT handler configured")
self.close(code=1011, reason="server configuration error")
return
if not token:
logger.warning("Connection rejected: missing token")
self.close(code=1008, reason="unauthorized")
return
try:
payload = jwt_handler.verify_jwt(token)
if not payload:
logger.warning("Connection rejected: invalid token")
self.close(code=1008, reason="unauthorized")
return
except Exception as e:
logger.warning(f"Auth error: {e}")
self.close(code=1008, reason="unauthorized")
return
if not companion_name:
logger.warning("Connection rejected: missing companion_name")
self.close(code=1008, reason="missing companion_name")
return
# Resolve companion TCP port from config
tcp_port = self._resolve_tcp_port(companion_name)
if tcp_port is None:
logger.warning(f"Connection rejected: companion '{companion_name}' not found")
self.close(code=1008, reason="companion not found")
return
# Open TCP socket to the companion frame server
try:
self._tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._tcp.settimeout(5.0)
self._tcp.connect(("127.0.0.1", tcp_port))
self._tcp.settimeout(None)
except Exception as e:
logger.error(f"TCP connect failed for '{companion_name}' port {tcp_port}: {e}")
self._tcp = None
self.close(code=1011, reason="TCP connect failed")
return
self._closing = False
self._reader = threading.Thread(target=self._tcp_to_ws, daemon=True)
self._reader.start()
user = payload.get("sub", "unknown")
logger.info(f"Companion WS opened: user={user}, companion={companion_name}, port={tcp_port}")
def received_message(self, message):
"""WS → TCP"""
tcp = getattr(self, "_tcp", None)
if tcp is None or getattr(self, "_closing", True):
return
try:
data = message.data
if isinstance(data, str):
data = data.encode("latin-1")
tcp.sendall(data)
except Exception:
self._teardown()
def closed(self, code, reason=None):
logger.info(f"Companion WS closed (code={code})")
self._teardown()
# ── internal ─────────────────────────────────────────────────────
def _resolve_tcp_port(self, companion_name):
"""Look up companion TCP port from daemon config. Returns port or None."""
if not _daemon:
return None
# Verify the companion is actually registered
identity_manager = getattr(_daemon, "identity_manager", None)
bridges = getattr(_daemon, "companion_bridges", {})
if not identity_manager or not bridges:
return None
found = False
for name, identity, _cfg in identity_manager.get_identities_by_type("companion"):
if name == companion_name:
h = identity.get_public_key()[0]
if h in bridges:
found = True
break
if not found:
return None
companions = _daemon.config.get("identities", {}).get("companions") or []
for entry in companions:
if entry.get("name") == companion_name:
return (entry.get("settings") or {}).get("tcp_port", 5000)
return None
def _tcp_to_ws(self):
"""TCP → WS reader loop"""
tcp = getattr(self, "_tcp", None)
if tcp is None:
return
try:
while not getattr(self, "_closing", True):
data = tcp.recv(4096)
if not data:
break
try:
self.send(data, binary=True)
except Exception:
break
except Exception:
pass
finally:
self._teardown()
def _teardown(self):
if getattr(self, "_closing", True):
return
self._closing = True
tcp = getattr(self, "_tcp", None)
if tcp:
try:
tcp.close()
except Exception:
pass
self._tcp = None
try:
self.close()
except Exception:
pass
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -0,0 +1 @@
import{a as p,b as n,g as m,e as t,s as g,t as s,j as d,p as l}from"./index-BvDdpPbD.js";const f={class:"flex items-center justify-between mb-4"},w={class:"text-xl font-semibold text-content-primary dark:text-content-primary"},v={class:"mb-6"},h={key:0,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},y={key:1,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},C={key:2,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},B={class:"text-content-secondary dark:text-content-primary/80 text-base leading-relaxed"},j={class:"flex gap-3"},_=p({__name:"ConfirmDialog",props:{show:{type:Boolean},title:{default:"Confirm Action"},message:{},confirmText:{default:"Confirm"},cancelText:{default:"Cancel"},variant:{default:"warning"}},emits:["close","confirm"],setup(c,{emit:b}){const o=c,r=b,u=i=>{i.target===i.currentTarget&&r("close")},k={danger:"bg-red-100 dark:bg-red-500/20 border-red-500/30 text-red-600 dark:text-red-400",warning:"bg-yellow-100 dark:bg-yellow-500/20 border-yellow-500/30 text-yellow-600 dark:text-yellow-400",info:"bg-blue-500/20 border-blue-500/30 text-blue-600 dark:text-blue-400"},x={danger:"bg-red-500 hover:bg-red-600",warning:"bg-yellow-500 hover:bg-yellow-600",info:"bg-blue-500 hover:bg-blue-600"};return(i,e)=>o.show?(l(),n("div",{key:0,onClick:u,class:"fixed inset-0 bg-black/40 backdrop-blur-lg z-[99999] flex items-center justify-center p-4",style:{"backdrop-filter":"blur(8px) saturate(180%)",position:"fixed",top:"0",left:"0",right:"0",bottom:"0"}},[t("div",{class:"bg-white dark:bg-surface-elevated backdrop-blur-xl rounded-[20px] p-6 w-full max-w-md border border-stroke-subtle dark:border-white/10",onClick:e[3]||(e[3]=g(()=>{},["stop"]))},[t("div",f,[t("h3",w,s(o.title),1),t("button",{onClick:e[0]||(e[0]=a=>r("close")),class:"text-content-secondary dark:text-content-muted hover:text-content-primary dark:hover:text-content-primary transition-colors"},e[4]||(e[4]=[t("svg",{class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},[t("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M6 18L18 6M6 6l12 12"})],-1)]))]),t("div",v,[t("div",{class:d(["inline-flex p-3 rounded-xl mb-4",k[o.variant]])},[o.variant==="danger"?(l(),n("svg",h,e[5]||(e[5]=[t("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z"},null,-1)]))):o.variant==="warning"?(l(),n("svg",y,e[6]||(e[6]=[t("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z"},null,-1)]))):(l(),n("svg",C,e[7]||(e[7]=[t("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z"},null,-1)])))],2),t("p",B,s(o.message),1)]),t("div",j,[t("button",{onClick:e[1]||(e[1]=a=>r("close")),class:"flex-1 px-4 py-3 rounded-xl bg-background-mute dark:bg-white/5 hover:bg-stroke-subtle dark:hover:bg-white/10 text-content-primary dark:text-content-primary transition-all duration-200 border border-stroke-subtle dark:border-stroke/10"},s(o.cancelText),1),t("button",{onClick:e[2]||(e[2]=a=>r("confirm")),class:d(["flex-1 px-4 py-3 rounded-xl text-white transition-all duration-200",x[o.variant]])},s(o.confirmText),3)])])])):m("",!0)}});export{_};
File diff suppressed because one or more lines are too long
@@ -0,0 +1 @@
import{a as e,b as r,i as o,p as n}from"./index-BvDdpPbD.js";const d=e({name:"HelpView",__name:"Help",setup(a){return(i,t)=>(n(),r("div",null,t[0]||(t[0]=[o('<div class="glass-card backdrop-blur border border-stroke-subtle dark:border-white/10 rounded-[15px] p-8"><h1 class="text-content-primary dark:text-content-primary text-2xl font-semibold mb-6">Help &amp; Documentation</h1><div class="text-center py-12"><div class="text-primary mb-6"><svg class="w-20 h-20 mx-auto mb-4" fill="none" stroke="currentColor" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 6.253v13m0-13C10.832 5.477 9.246 5 7.5 5S4.168 5.477 3 6.253v13C4.168 18.477 5.754 18 7.5 18s3.332.477 4.5 1.253m0-13C13.168 5.477 14.754 5 16.5 5c1.746 0 3.332.477 4.5 1.253v13C19.832 18.477 18.246 18 16.5 18c-1.746 0-3.332.477-4.5 1.253"></path></svg></div><h2 class="text-content-primary dark:text-content-primary text-xl font-medium mb-3">pyMC Repeater Wiki</h2><p class="text-content-secondary dark:text-content-muted mb-8 max-w-md mx-auto"> Access documentation, setup guides, troubleshooting tips, and community resources on our official wiki. </p><a href="https://github.com/rightup/pyMC_Repeater/wiki" target="_blank" rel="noopener noreferrer" class="inline-flex items-center gap-2 bg-primary hover:bg-primary/80 text-white dark:text-background font-medium py-3 px-6 rounded-xl transition-colors duration-200"><svg class="w-5 h-5" fill="none" stroke="currentColor" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10 6H6a2 2 0 00-2 2v10a2 2 0 002 2h10a2 2 0 002-2v-4M14 4h6m0 0v6m0-6L10 14"></path></svg> Visit Wiki Documentation </a><div class="mt-8 text-xs text-content-muted dark:text-content-muted"> Opens in a new tab </div></div></div>',1)])))}});export{d as default};
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -0,0 +1 @@
import{a as k,b as o,g,e as r,j as a,t as p,s as x,p as s}from"./index-BvDdpPbD.js";const f={class:"mb-6"},m={key:0,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},v={key:1,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},h={key:2,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},w={class:"text-content-secondary dark:text-content-primary/80 text-base leading-relaxed"},C={class:"flex"},B=k({__name:"MessageDialog",props:{show:{type:Boolean},message:{},variant:{default:"success"}},emits:["close"],setup(i,{emit:d}){const t=i,l=d,c=n=>{n.target===n.currentTarget&&l("close")},b={success:"bg-green-100 dark:bg-green-500/20 border-green-600/40 dark:border-green-500/30 text-green-600 dark:text-green-400",error:"bg-red-100 dark:bg-red-500/20 border-red-500/30 text-red-600 dark:text-red-400",info:"bg-blue-500/20 border-blue-500/30 text-blue-600 dark:text-blue-400"},u={success:"bg-green-500 hover:bg-green-600",error:"bg-red-500 hover:bg-red-600",info:"bg-blue-500 hover:bg-blue-600"};return(n,e)=>t.show?(s(),o("div",{key:0,onClick:c,class:"fixed inset-0 bg-black/40 backdrop-blur-lg z-[99999] flex items-center justify-center p-4",style:{"backdrop-filter":"blur(8px) saturate(180%)",position:"fixed",top:"0",left:"0",right:"0",bottom:"0"}},[r("div",{class:"bg-white dark:bg-surface-elevated backdrop-blur-xl rounded-[20px] p-6 w-full max-w-md border border-stroke-subtle dark:border-white/10",onClick:e[1]||(e[1]=x(()=>{},["stop"]))},[r("div",f,[r("div",{class:a(["inline-flex p-3 rounded-xl mb-4",b[t.variant]])},[t.variant==="success"?(s(),o("svg",m,e[2]||(e[2]=[r("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M5 13l4 4L19 7"},null,-1)]))):t.variant==="error"?(s(),o("svg",v,e[3]||(e[3]=[r("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M6 18L18 6M6 6l12 12"},null,-1)]))):(s(),o("svg",h,e[4]||(e[4]=[r("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z"},null,-1)])))],2),r("p",w,p(t.message),1)]),r("div",C,[r("button",{onClick:e[0]||(e[0]=y=>l("close")),class:a(["flex-1 px-4 py-3 rounded-xl text-white transition-all duration-200",u[t.variant]])}," OK ",2)])])])):g("",!0)}});export{B as _};
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -0,0 +1 @@
import{M as x,c as s}from"./index-BvDdpPbD.js";const l={7:-7.5,8:-10,9:-12.5,10:-15,11:-17.5,12:-20},d=-116,i=8,u=5;function y(t,e){return t-e}function S(t){return l[t]??l[i]}function f(t,e){const r=e+u;if(t<=e){const o=t<=e-5?0:1;return{bars:o,color:"text-red-600 dark:text-red-400",snr:t,quality:o===0?"none":"poor"}}if(t<r){const n=(t-e)/u<.5?2:3;return{bars:n,color:n===2?"text-orange-600 dark:text-orange-400":"text-yellow-600 dark:text-yellow-400",snr:t,quality:"fair"}}const a=t-r>=10?5:4;return{bars:a,color:a===5?"text-green-600 dark:text-green-400":"text-green-600 dark:text-green-300",snr:t,quality:a===5?"excellent":"good"}}function N(){const t=x(),e=s(()=>t.noiseFloorDbm??d),r=s(()=>t.stats?.config?.radio?.spreading_factor??i),c=s(()=>S(r.value));return{getSignalQuality:o=>{if(!o||o>0||o<-120)return{bars:0,color:"text-gray-400 dark:text-gray-500",snr:-999,quality:"none"};const n=y(o,e.value),g=Math.max(-30,Math.min(20,n));return f(g,c.value)},noiseFloor:e,spreadingFactor:r,minSNR:c}}export{N as u};
+15 -1
View File
@@ -28,6 +28,7 @@ try:
broadcast_packet,
init_websocket,
)
from .companion_ws_proxy import CompanionFrameWebSocket, set_daemon as _set_companion_daemon
WEBSOCKET_AVAILABLE = True
except ImportError:
@@ -163,7 +164,7 @@ class StatsApp:
raise cherrypy.NotFound()
# Handle WebSocket routes
if args and len(args) >= 2 and args[0] == "ws" and args[1] == "packets":
if args and len(args) >= 2 and args[0] == "ws" and args[1] in ("packets", "companion_frame"):
# WebSocket tool will intercept this
return ""
@@ -191,6 +192,7 @@ class HTTPStatsServer:
self.port = port
self.config = config or {}
self.config_path = config_path
self.daemon_instance = daemon_instance
# Initialize authentication handlers
self._init_auth_handlers()
@@ -359,6 +361,18 @@ class HTTPStatsServer:
"tools.gzip.on": False,
}
logger.info("WebSocket endpoint configured at /ws/packets")
# Companion frame proxy (binary WS ↔ TCP byte pipe)
if self.daemon_instance:
_set_companion_daemon(self.daemon_instance)
config["/ws/companion_frame"] = {
"tools.websocket.on": True,
"tools.websocket.handler_cls": CompanionFrameWebSocket,
"tools.trailing_slash.on": False,
"tools.require_auth.on": False,
"tools.gzip.on": False,
}
logger.info("WebSocket endpoint configured at /ws/companion_frame")
except Exception as e:
logger.error(f"Failed to initialize WebSocket: {e}")
import traceback
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -1,5 +1,5 @@
"""
Comprehensive tests for pyMC_Repeater engine.py RepeaterHandler.
tests for pyMC_Repeater engine.py RepeaterHandler.
Covers: flood_forward, direct_forward, process_packet, duplicate detection,
mark_seen, validate_packet, packet scoring, TX delay, cache management,
+720
View File
@@ -0,0 +1,720 @@
"""
Tests for flood packet loop detection and duplicate suppression.
Exercises the real RepeaterHandler engine with real pymc_core Packet/PathUtils
objects to verify:
- Duplicate packet suppression via calculate_packet_hash (SHA256-based)
- Loop detection modes (off, minimal, moderate, strict) with real path bytes
- Flood re-forwarding prevention (own hash already in path)
- Multi-byte hash mode interaction with loop/dedup
- Global flood policy enforcement
- mark_seen / is_duplicate cache behaviour
- do_not_retransmit flag handling
"""
from unittest.mock import MagicMock, patch
import pytest
from pymc_core.protocol import Packet, PathUtils
from pymc_core.protocol.constants import (
MAX_PATH_SIZE,
ROUTE_TYPE_FLOOD,
ROUTE_TYPE_TRANSPORT_FLOOD,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
LOCAL_HASH_BYTES = bytes([0xAB, 0xCD, 0xEF])
def _make_flood_packet(
path_bytes: bytes = b"",
hash_size: int = 1,
hash_count: int = 0,
payload: bytes = b"\x01\x02\x03\x04",
) -> Packet:
"""Create a real flood Packet with the given path encoding."""
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
pkt.path = bytearray(path_bytes)
pkt.path_len = PathUtils.encode_path_len(hash_size, hash_count)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
return pkt
def _make_handler(
loop_detect="off",
path_hash_mode=0,
local_hash_bytes=None,
global_flood_allow=True,
):
"""Create a RepeaterHandler with real engine logic, mocking only hardware."""
lhb = local_hash_bytes or LOCAL_HASH_BYTES
config = {
"repeater": {
"mode": "forward",
"cache_ttl": 3600,
"use_score_for_tx": False,
"score_threshold": 0.3,
"send_advert_interval_hours": 0,
"node_name": "test-node",
},
"mesh": {
"global_flood_allow": global_flood_allow,
"loop_detect": loop_detect,
"path_hash_mode": path_hash_mode,
},
"delays": {"tx_delay_factor": 1.0, "direct_tx_delay_factor": 0.5},
"duty_cycle": {"max_airtime_per_minute": 3600, "enforcement_enabled": True},
"radio": {
"spreading_factor": 8,
"bandwidth": 125000,
"coding_rate": 8,
"preamble_length": 17,
},
}
dispatcher = MagicMock()
dispatcher.radio = MagicMock(
spreading_factor=8,
bandwidth=125000,
coding_rate=8,
preamble_length=17,
frequency=915000000,
tx_power=14,
)
dispatcher.local_identity = MagicMock()
with (
patch("repeater.engine.StorageCollector"),
patch("repeater.engine.RepeaterHandler._start_background_tasks"),
):
from repeater.engine import RepeaterHandler
h = RepeaterHandler(config, dispatcher, lhb[0], local_hash_bytes=lhb)
return h
# ===================================================================
# 1. Duplicate suppression — real packet hash (SHA256)
# ===================================================================
class TestDuplicateSuppression:
"""Verify duplicate packets are detected via real calculate_packet_hash."""
def test_same_packet_forwarded_twice_is_duplicate(self):
"""Forwarding the same packet a second time must be rejected as duplicate."""
h = _make_handler()
pkt1 = _make_flood_packet(payload=b"\xDE\xAD")
result1 = h.flood_forward(pkt1)
assert result1 is not None
# Same content in a fresh Packet object
pkt2 = _make_flood_packet(payload=b"\xDE\xAD")
result2 = h.flood_forward(pkt2)
assert result2 is None
assert pkt2.drop_reason == "Duplicate"
def test_different_payload_not_duplicate(self):
"""Packets with different payloads have different hashes."""
h = _make_handler()
pkt1 = _make_flood_packet(payload=b"\x01\x02")
assert h.flood_forward(pkt1) is not None
pkt2 = _make_flood_packet(payload=b"\x03\x04")
assert h.flood_forward(pkt2) is not None
def test_mark_seen_makes_is_duplicate_true(self):
"""mark_seen records the hash; is_duplicate finds it."""
h = _make_handler()
pkt = _make_flood_packet(payload=b"\xAA\xBB")
assert not h.is_duplicate(pkt)
h.mark_seen(pkt)
assert h.is_duplicate(pkt)
def test_packet_hash_uses_real_sha256(self):
"""Verify the hash comes from real Packet.calculate_packet_hash."""
pkt = _make_flood_packet(payload=b"\x01\x02\x03")
hash_bytes = pkt.calculate_packet_hash()
assert isinstance(hash_bytes, bytes)
assert len(hash_bytes) > 0
# Same content → same hash
pkt2 = _make_flood_packet(payload=b"\x01\x02\x03")
assert pkt2.calculate_packet_hash() == hash_bytes
def test_different_path_same_payload_same_hash(self):
"""
Packet hash is based on payload_type + payload (not path),
except for TRACE packets. Two flood packets with different paths
but same payload have the same hash.
"""
pkt_a = _make_flood_packet(path_bytes=b"\x11", hash_size=1, hash_count=1,
payload=b"\xFF")
pkt_b = _make_flood_packet(path_bytes=b"\x22", hash_size=1, hash_count=1,
payload=b"\xFF")
assert pkt_a.calculate_packet_hash() == pkt_b.calculate_packet_hash()
def test_seen_cache_eviction(self):
"""When cache exceeds max_cache_size, oldest entries are evicted."""
h = _make_handler()
h.max_cache_size = 3
packets = [_make_flood_packet(payload=bytes([i, i + 1])) for i in range(5)]
for p in packets:
h.mark_seen(p)
# Oldest entries (0, 1) should have been evicted
assert not h.is_duplicate(packets[0])
assert not h.is_duplicate(packets[1])
# Recent entries still present
assert h.is_duplicate(packets[2])
assert h.is_duplicate(packets[3])
assert h.is_duplicate(packets[4])
# ===================================================================
# 2. Loop detection modes — 1-byte hash
# ===================================================================
class TestLoopDetection1Byte:
"""Loop detection with 1-byte hash paths (mode 0)."""
def test_loop_detect_off_allows_own_hash(self):
"""With loop_detect=off, packet with our hash in path is forwarded."""
h = _make_handler(loop_detect="off",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Path contains our 1-byte hash (0xAB) once
pkt = _make_flood_packet(b"\xAB", hash_size=1, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
def test_loop_detect_strict_blocks_single_occurrence(self):
"""strict mode (threshold=1): one occurrence of our hash → loop."""
h = _make_handler(loop_detect="strict",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\xAB", hash_size=1, hash_count=1)
result = h.flood_forward(pkt)
assert result is None
assert "loop" in pkt.drop_reason.lower()
def test_loop_detect_moderate_allows_one_occurrence(self):
"""moderate mode (threshold=2): one occurrence is fine."""
h = _make_handler(loop_detect="moderate",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\xAB", hash_size=1, hash_count=2)
result = h.flood_forward(pkt)
assert result is not None
def test_loop_detect_moderate_blocks_two_occurrences(self):
"""moderate mode (threshold=2): two occurrences → loop."""
h = _make_handler(loop_detect="moderate",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\xAB\x11\xAB", hash_size=1, hash_count=3)
result = h.flood_forward(pkt)
assert result is None
assert "loop" in pkt.drop_reason.lower()
def test_loop_detect_minimal_allows_three_occurrences(self):
"""minimal mode (threshold=4): three occurrences still OK."""
h = _make_handler(loop_detect="minimal",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\xAB\x11\xAB\x22\xAB", hash_size=1, hash_count=5)
result = h.flood_forward(pkt)
assert result is not None
def test_loop_detect_minimal_blocks_four_occurrences(self):
"""minimal mode (threshold=4): four occurrences → loop."""
h = _make_handler(loop_detect="minimal",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(
b"\xAB\x11\xAB\x22\xAB\x33\xAB", hash_size=1, hash_count=7
)
result = h.flood_forward(pkt)
assert result is None
assert "loop" in pkt.drop_reason.lower()
def test_loop_detect_no_match_passes(self):
"""Strict mode still passes if our hash is not in the path."""
h = _make_handler(loop_detect="strict",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\x22\x33", hash_size=1, hash_count=3)
result = h.flood_forward(pkt)
assert result is not None
# ===================================================================
# 3. Own-hash re-forwarding prevention
# ===================================================================
class TestOwnHashReForwarding:
"""
After a repeater flood_forwards a packet (appending its own hash),
receiving that same packet back should be handled correctly.
"""
def test_forward_then_receive_again_is_duplicate(self):
"""
After forwarding, the packet hash is in seen_packets.
Receiving an identical packet is a duplicate.
"""
h = _make_handler(loop_detect="off")
pkt = _make_flood_packet(b"\x11", hash_size=1, hash_count=1,
payload=b"\xAA\xBB")
result = h.flood_forward(pkt)
assert result is not None
# The original packet's payload hash was marked seen
# Receiving same original packet again (before our hop was appended)
pkt2 = _make_flood_packet(b"\x11", hash_size=1, hash_count=1,
payload=b"\xAA\xBB")
result2 = h.flood_forward(pkt2)
assert result2 is None
assert pkt2.drop_reason == "Duplicate"
def test_strict_detects_own_hash_after_flood_chain(self):
"""
If the forwarded packet (with our hash appended) loops back to us,
strict mode detects our hash in the path.
"""
our_hash = bytes([0xAB, 0xCD, 0xEF])
h = _make_handler(loop_detect="strict", local_hash_bytes=our_hash)
# Original packet arrives, we forward (appending 0xAB)
pkt = _make_flood_packet(b"\x11", hash_size=1, hash_count=1,
payload=b"\xDD\xEE")
result = h.flood_forward(pkt)
assert result is not None
# Now path is [0x11, 0xAB], and this exact payload is in seen_packets
# Suppose another node re-forwards it with an additional hop,
# so it's a new payload in the packet hash sense (different path iteration)
# but path contains our hash 0xAB
looped_pkt = _make_flood_packet(
b"\x11\xAB\x22", hash_size=1, hash_count=3,
payload=b"\xDD\xEE\xFF" # different payload → not a duplicate
)
result2 = h.flood_forward(looped_pkt)
assert result2 is None
assert "loop" in looped_pkt.drop_reason.lower()
def test_off_mode_still_catches_duplicate_after_own_forward(self):
"""Even with loop_detect=off, duplicate suppression still works."""
h = _make_handler(loop_detect="off")
pkt = _make_flood_packet(payload=b"\x42\x43")
assert h.flood_forward(pkt) is not None
pkt2 = _make_flood_packet(payload=b"\x42\x43")
result = h.flood_forward(pkt2)
assert result is None
assert pkt2.drop_reason == "Duplicate"
# ===================================================================
# 4. Multi-byte hash + loop detection
# ===================================================================
class TestLoopDetectionMultiByte:
"""
Loop detection currently counts byte-level matches against local_hash
(single int). In multi-byte mode the per-hop hash is >1 byte, so
individual bytes in the path may coincidentally match.
These tests verify the actual engine behaviour.
"""
def test_2_byte_mode_strict_byte_level_match(self):
"""
In 2-byte mode with strict, _is_flood_looped scans individual bytes.
If local_hash (0xAB) appears as a byte anywhere in the 2-byte path
entries, it counts as a match.
"""
h = _make_handler(loop_detect="strict",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Path: 2-byte hop [0xAB, 0x11] — byte 0xAB appears once
pkt = _make_flood_packet(b"\xAB\x11", hash_size=2, hash_count=1)
result = h.flood_forward(pkt)
# strict threshold=1, 0xAB appears once in raw bytes → loop detected
assert result is None
assert "loop" in pkt.drop_reason.lower()
def test_2_byte_mode_off_ignores_byte_match(self):
"""With loop_detect=off, even byte-level 0xAB matches are ignored."""
h = _make_handler(loop_detect="off",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\xAB\x11", hash_size=2, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
def test_2_byte_no_local_hash_byte_passes_strict(self):
"""If local_hash byte doesn't appear anywhere in the 2-byte path, strict passes."""
h = _make_handler(loop_detect="strict",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Path: [0x11, 0x22] — no 0xAB byte
pkt = _make_flood_packet(b"\x11\x22", hash_size=2, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
def test_3_byte_mode_local_hash_byte_in_path(self):
"""In 3-byte mode, the 0xAB byte anywhere triggers strict loop detection."""
h = _make_handler(loop_detect="strict",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# 3-byte hop: [0x11, 0xAB, 0x33] — 0xAB in the middle
pkt = _make_flood_packet(b"\x11\xAB\x33", hash_size=3, hash_count=1)
result = h.flood_forward(pkt)
assert result is None
def test_moderate_multi_byte_counts_all_byte_occurrences(self):
"""
moderate threshold=2. With 2-byte hops, each byte is counted
independently, so two occurrences of 0xAB across different hops
triggers the loop.
"""
h = _make_handler(loop_detect="moderate",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Two 2-byte hops: [0xAB, 0x11, 0xAB, 0x22] — 0xAB appears twice
pkt = _make_flood_packet(b"\xAB\x11\xAB\x22", hash_size=2, hash_count=2)
result = h.flood_forward(pkt)
assert result is None
assert "loop" in pkt.drop_reason.lower()
def test_2_byte_flood_forward_appends_correctly(self):
"""
After flood_forward in 2-byte mode, verify the path contains
only the expected bytes (no extra, no corruption).
"""
h = _make_handler(loop_detect="off",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\x22", hash_size=2, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 2
hashes = result.get_path_hashes()
assert hashes == [b"\x11\x22", b"\xAB\xCD"]
# ===================================================================
# 5. Global flood policy
# ===================================================================
class TestGlobalFloodPolicy:
"""Test global_flood_allow=False blocks flood packets."""
def test_global_flood_disabled_drops_flood(self):
h = _make_handler(global_flood_allow=False)
pkt = _make_flood_packet(payload=b"\x01\x02")
result = h.flood_forward(pkt)
assert result is None
assert pkt.drop_reason is not None
def test_global_flood_enabled_allows_flood(self):
h = _make_handler(global_flood_allow=True)
pkt = _make_flood_packet(payload=b"\x01\x02")
result = h.flood_forward(pkt)
assert result is not None
def test_transport_flood_without_codes_drops(self):
"""ROUTE_TYPE_TRANSPORT_FLOOD with global_flood_allow=False and no valid codes."""
h = _make_handler(global_flood_allow=False)
# Nullify the storage to ensure transport code check fails
h.storage = None
pkt = Packet()
pkt.header = ROUTE_TYPE_TRANSPORT_FLOOD
pkt.path = bytearray()
pkt.path_len = PathUtils.encode_path_len(1, 0)
pkt.payload = bytearray(b"\x01\x02")
pkt.payload_len = 2
pkt.transport_codes = [0x1234, 0x5678]
result = h.flood_forward(pkt)
assert result is None
# ===================================================================
# 6. do_not_retransmit flag
# ===================================================================
class TestDoNotRetransmit:
"""Verify packets flagged do_not_retransmit are dropped."""
def test_flagged_packet_dropped(self):
h = _make_handler()
pkt = _make_flood_packet(payload=b"\x01\x02")
pkt.mark_do_not_retransmit()
result = h.flood_forward(pkt)
assert result is None
assert "retransmit" in pkt.drop_reason.lower()
def test_unflagged_packet_passes(self):
h = _make_handler()
pkt = _make_flood_packet(payload=b"\x01\x02")
assert not pkt.is_marked_do_not_retransmit()
result = h.flood_forward(pkt)
assert result is not None
# ===================================================================
# 7. validate_packet edge cases
# ===================================================================
class TestValidatePacket:
"""Test packet validation rules used by flood_forward."""
def test_empty_payload_rejected(self):
h = _make_handler()
pkt = _make_flood_packet(payload=b"")
pkt.payload = bytearray()
result = h.flood_forward(pkt)
assert result is None
assert "Empty payload" in (pkt.drop_reason or "")
def test_max_path_size_rejected(self):
"""Path at MAX_PATH_SIZE (64 bytes) is rejected before forwarding."""
h = _make_handler()
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
pkt.path = bytearray(range(64))
# Manually set path_len to bypass encode_path_len validation
pkt.path_len = PathUtils.encode_path_len(1, 63)
pkt.payload = bytearray(b"\x01\x02")
pkt.payload_len = 2
# validate_packet checks len(path) >= MAX_PATH_SIZE
result = h.flood_forward(pkt)
assert result is None
def test_none_payload_rejected(self):
h = _make_handler()
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
pkt.payload = None
result = h.flood_forward(pkt)
assert result is None
def test_hop_count_63_rejected(self):
"""At 63 hops (1-byte mode), appending would overflow 6-bit counter."""
h = _make_handler()
# 63 bytes of path with hash_count=63
pkt = _make_flood_packet(bytes(63), hash_size=1, hash_count=63)
result = h.flood_forward(pkt)
assert result is None
assert "maximum" in (pkt.drop_reason or "").lower() or \
"exceed" in (pkt.drop_reason or "").lower()
# ===================================================================
# 8. Serialization round-trip after dedup/loop decisions
# ===================================================================
class TestSerializationAfterForward:
"""
Verify packets that survive loop/dedup checks can serialize
and deserialize correctly, preserving the appended path.
"""
def test_forwarded_1_byte_round_trips(self):
h = _make_handler(loop_detect="moderate",
local_hash_bytes=bytes([0x42, 0x00, 0x00]))
pkt = _make_flood_packet(b"\x11\x22", hash_size=1, hash_count=2,
payload=b"\xAA\xBB")
result = h.flood_forward(pkt)
assert result is not None
wire = result.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_count() == 3
assert pkt2.get_path_hashes() == [b"\x11", b"\x22", b"\x42"]
assert pkt2.get_payload() == b"\xAA\xBB"
def test_forwarded_2_byte_round_trips(self):
h = _make_handler(loop_detect="off",
local_hash_bytes=bytes([0xAA, 0xBB, 0xCC]))
pkt = _make_flood_packet(b"\x11\x22", hash_size=2, hash_count=1,
payload=b"\xDE\xAD")
result = h.flood_forward(pkt)
assert result is not None
wire = result.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 2
assert pkt2.get_path_hash_count() == 2
assert pkt2.get_path_hashes() == [b"\x11\x22", b"\xAA\xBB"]
def test_forwarded_3_byte_round_trips(self):
h = _make_handler(loop_detect="off",
local_hash_bytes=bytes([0xAA, 0xBB, 0xCC]))
pkt = _make_flood_packet(b"\x11\x22\x33", hash_size=3, hash_count=1,
payload=b"\xBE\xEF")
result = h.flood_forward(pkt)
assert result is not None
wire = result.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 3
assert pkt2.get_path_hash_count() == 2
assert pkt2.get_path_hashes() == [b"\x11\x22\x33", b"\xAA\xBB\xCC"]
# ===================================================================
# 9. Multi-repeater flood chain with loop detection
# ===================================================================
class TestFloodChainLoopDetection:
"""
Simulate a flood packet traversing multiple repeaters and verify
loop detection works across the chain.
"""
def test_three_repeater_chain_no_loop(self):
"""A→B→C with distinct hashes: no loop at any step (strict mode)."""
hashes = [
bytes([0x11, 0x00, 0x00]),
bytes([0x22, 0x00, 0x00]),
bytes([0x33, 0x00, 0x00]),
]
handlers = [_make_handler(loop_detect="strict", local_hash_bytes=h) for h in hashes]
pkt = _make_flood_packet(payload=b"\xFE\xED")
for i, h in enumerate(handlers):
result = h.flood_forward(pkt)
assert result is not None, f"repeater {i} unexpectedly dropped packet"
# Use different payload to avoid dedup between handlers
# (In real life they'd be on different nodes)
pkt = _make_flood_packet(
path_bytes=bytes(result.path),
hash_size=1,
hash_count=result.get_path_hash_count(),
payload=b"\xFE\xED" + bytes([i + 1]),
)
assert pkt.get_path_hash_count() == 3
def test_circular_topology_strict_blocks_loop(self):
"""
ABCA: when the packet returns to A, strict mode detects
A's hash (0x11) already in the path.
"""
hash_a = bytes([0x11, 0x00, 0x00])
hash_b = bytes([0x22, 0x00, 0x00])
hash_c = bytes([0x33, 0x00, 0x00])
h_a = _make_handler(loop_detect="strict", local_hash_bytes=hash_a)
h_b = _make_handler(loop_detect="strict", local_hash_bytes=hash_b)
h_c = _make_handler(loop_detect="strict", local_hash_bytes=hash_c)
# A originates
pkt = _make_flood_packet(payload=b"\x01\x02\x03")
pkt = h_a.flood_forward(pkt)
assert pkt is not None # path: [0x11]
# B forwards (new payload to avoid dedup)
pkt_b = _make_flood_packet(
bytes(pkt.path), hash_size=1,
hash_count=pkt.get_path_hash_count(),
payload=b"\x01\x02\x03\x04",
)
pkt_b = h_b.flood_forward(pkt_b)
assert pkt_b is not None # path: [0x11, 0x22]
# C forwards
pkt_c = _make_flood_packet(
bytes(pkt_b.path), hash_size=1,
hash_count=pkt_b.get_path_hash_count(),
payload=b"\x01\x02\x03\x04\x05",
)
pkt_c = h_c.flood_forward(pkt_c)
assert pkt_c is not None # path: [0x11, 0x22, 0x33]
# Back to A — 0x11 is already in path → strict blocks it
pkt_a2 = _make_flood_packet(
bytes(pkt_c.path), hash_size=1,
hash_count=pkt_c.get_path_hash_count(),
payload=b"\x01\x02\x03\x04\x05\x06",
)
result = h_a.flood_forward(pkt_a2)
assert result is None
assert "loop" in pkt_a2.drop_reason.lower()
def test_circular_topology_off_allows_loop_but_dedup_catches(self):
"""
With loop_detect=off and the exact same payload, duplicate
suppression catches the re-visit even without loop detection.
"""
h = _make_handler(loop_detect="off", local_hash_bytes=bytes([0x11, 0x00, 0x00]))
pkt = _make_flood_packet(payload=b"\xAA\xBB")
assert h.flood_forward(pkt) is not None
# Same payload comes back
pkt2 = _make_flood_packet(payload=b"\xAA\xBB")
result = h.flood_forward(pkt2)
assert result is None
assert pkt2.drop_reason == "Duplicate"
def test_two_byte_chain_loop_detected(self):
"""2-byte mode: circular path A→B→A detected via byte-level scan."""
hash_a = bytes([0xAA, 0xBB, 0x00])
hash_b = bytes([0xCC, 0xDD, 0x00])
h_a = _make_handler(loop_detect="strict", local_hash_bytes=hash_a)
h_b = _make_handler(loop_detect="strict", local_hash_bytes=hash_b)
# A forwards (2-byte mode)
pkt = _make_flood_packet(b"", hash_size=2, hash_count=0, payload=b"\x01\x02")
pkt = h_a.flood_forward(pkt)
assert pkt is not None # path: [0xAA, 0xBB]
# B forwards
pkt_b = _make_flood_packet(
bytes(pkt.path), hash_size=2,
hash_count=pkt.get_path_hash_count(),
payload=b"\x01\x02\x03",
)
pkt_b = h_b.flood_forward(pkt_b)
assert pkt_b is not None # path: [0xAA, 0xBB, 0xCC, 0xDD]
# Back to A — byte 0xAA is in path → strict detects it
pkt_a2 = _make_flood_packet(
bytes(pkt_b.path), hash_size=2,
hash_count=pkt_b.get_path_hash_count(),
payload=b"\x01\x02\x03\x04",
)
result = h_a.flood_forward(pkt_a2)
assert result is None
assert "loop" in pkt_a2.drop_reason.lower()
# ===================================================================
# 10. Normalize loop_detect_mode edge cases
# ===================================================================
class TestNormalizeLoopDetectMode:
"""Verify _normalize_loop_detect_mode handles edge cases."""
def test_uppercase_normalized(self):
h = _make_handler(loop_detect="STRICT")
assert h.loop_detect_mode == "strict"
def test_whitespace_stripped(self):
h = _make_handler(loop_detect=" moderate ")
assert h.loop_detect_mode == "moderate"
def test_invalid_defaults_to_off(self):
h = _make_handler(loop_detect="invalid_value")
assert h.loop_detect_mode == "off"
def test_numeric_defaults_to_off(self):
h = _make_handler(loop_detect=42)
assert h.loop_detect_mode == "off"
def test_none_defaults_to_off(self):
h = _make_handler(loop_detect=None)
assert h.loop_detect_mode == "off"
+780
View File
@@ -0,0 +1,780 @@
"""
Integration tests for multi-byte path hash support using real pymc_core protocol objects.
Exercises actual Packet, PathUtils, PacketBuilder, and engine forwarding
rather than mocking the protocol layer. Covers:
- PathUtils encode/decode round-trips for all hash sizes
- Packet serialization/deserialization with multi-byte paths
- Packet.apply_path_hash_mode and get_path_hashes
- Engine flood_forward with real multi-byte encoded packets
- Engine direct_forward with real multi-byte encoded packets
- PacketBuilder.create_trace payload structure + TraceHandler parsing
- Max-hop boundary enforcement per hash size
"""
import struct
from collections import OrderedDict
from unittest.mock import MagicMock, patch
import pytest
from pymc_core.protocol import Packet, PacketBuilder, PathUtils
from pymc_core.protocol.constants import (
MAX_PATH_SIZE,
PATH_HASH_COUNT_MASK,
PATH_HASH_SIZE_SHIFT,
PAYLOAD_TYPE_TRACE,
PH_TYPE_SHIFT,
ROUTE_TYPE_DIRECT,
ROUTE_TYPE_FLOOD,
)
from pymc_core.node.handlers.trace import TraceHandler
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
LOCAL_HASH_BYTES = bytes([0xAB, 0xCD, 0xEF])
def _make_flood_packet(path_bytes: bytes, hash_size: int, hash_count: int,
payload: bytes = b"\x01\x02\x03\x04") -> Packet:
"""Create a real flood Packet with the given multi-byte path encoding."""
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
pkt.path = bytearray(path_bytes)
pkt.path_len = PathUtils.encode_path_len(hash_size, hash_count)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
return pkt
def _make_direct_packet(path_bytes: bytes, hash_size: int, hash_count: int,
payload: bytes = b"\x01\x02\x03\x04") -> Packet:
"""Create a real direct-routed Packet."""
pkt = Packet()
pkt.header = ROUTE_TYPE_DIRECT
pkt.path = bytearray(path_bytes)
pkt.path_len = PathUtils.encode_path_len(hash_size, hash_count)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
return pkt
def _make_handler(path_hash_mode=0, local_hash_bytes=None):
"""Create a real RepeaterHandler with minimal mocking (only radio/storage)."""
lhb = local_hash_bytes or LOCAL_HASH_BYTES
config = {
"repeater": {
"mode": "forward",
"cache_ttl": 3600,
"use_score_for_tx": False,
"score_threshold": 0.3,
"send_advert_interval_hours": 0,
"node_name": "test-node",
},
"mesh": {
"global_flood_allow": True,
"loop_detect": "off",
"path_hash_mode": path_hash_mode,
},
"delays": {"tx_delay_factor": 1.0, "direct_tx_delay_factor": 0.5},
"duty_cycle": {"max_airtime_per_minute": 3600, "enforcement_enabled": True},
"radio": {
"spreading_factor": 8,
"bandwidth": 125000,
"coding_rate": 8,
"preamble_length": 17,
},
}
dispatcher = MagicMock()
dispatcher.radio = MagicMock(
spreading_factor=8, bandwidth=125000, coding_rate=8,
preamble_length=17, frequency=915000000, tx_power=14,
)
dispatcher.local_identity = MagicMock()
with (
patch("repeater.engine.StorageCollector"),
patch("repeater.engine.RepeaterHandler._start_background_tasks"),
):
from repeater.engine import RepeaterHandler
h = RepeaterHandler(config, dispatcher, lhb[0], local_hash_bytes=lhb)
return h
# ===================================================================
# 1. PathUtils — encode/decode round-trips
# ===================================================================
class TestPathUtilsRoundTrip:
"""Verify PathUtils encode/decode for all valid hash sizes and hop counts."""
@pytest.mark.parametrize("hash_size", [1, 2, 3])
def test_encode_decode_hash_size(self, hash_size):
encoded = PathUtils.encode_path_len(hash_size, 0)
assert PathUtils.get_path_hash_size(encoded) == hash_size
assert PathUtils.get_path_hash_count(encoded) == 0
@pytest.mark.parametrize("hash_size,count", [
(1, 1), (1, 10), (1, 63),
(2, 1), (2, 15), (2, 32),
(3, 1), (3, 10), (3, 21),
])
def test_encode_decode_round_trip(self, hash_size, count):
encoded = PathUtils.encode_path_len(hash_size, count)
assert PathUtils.get_path_hash_size(encoded) == hash_size
assert PathUtils.get_path_hash_count(encoded) == count
assert PathUtils.get_path_byte_len(encoded) == hash_size * count
@pytest.mark.parametrize("hash_size", [1, 2, 3])
def test_encode_zero_hops(self, hash_size):
encoded = PathUtils.encode_path_len(hash_size, 0)
assert PathUtils.get_path_byte_len(encoded) == 0
assert PathUtils.is_valid_path_len(encoded)
def test_encode_preserves_bit_layout(self):
"""Verify the actual bit layout: bits 6-7 = (hash_size-1), bits 0-5 = count."""
for hs in (1, 2, 3):
for count in (0, 1, 30, 63):
if count * hs > MAX_PATH_SIZE:
continue
encoded = PathUtils.encode_path_len(hs, count)
assert (encoded >> PATH_HASH_SIZE_SHIFT) == hs - 1
assert (encoded & PATH_HASH_COUNT_MASK) == count
def test_1_byte_backward_compatible(self):
"""For hash_size=1, encoded byte == raw hop count (legacy compat)."""
for count in range(64):
encoded = PathUtils.encode_path_len(1, count)
assert encoded == count
def test_encode_hop_count_overflow_raises(self):
with pytest.raises(ValueError, match="hop count must be 0-63"):
PathUtils.encode_path_len(1, 64)
@pytest.mark.parametrize("hash_size,max_hops", [(1, 63), (2, 32), (3, 21)])
def test_max_hops_boundary(self, hash_size, max_hops):
at_max = PathUtils.encode_path_len(hash_size, max_hops)
assert PathUtils.is_path_at_max_hops(at_max)
assert PathUtils.is_valid_path_len(at_max)
@pytest.mark.parametrize("hash_size,below_max", [(1, 62), (2, 31), (3, 20)])
def test_below_max_hops_not_at_max(self, hash_size, below_max):
encoded = PathUtils.encode_path_len(hash_size, below_max)
assert not PathUtils.is_path_at_max_hops(encoded)
def test_zero_path_len_not_at_max(self):
assert not PathUtils.is_path_at_max_hops(0)
def test_invalid_path_len_too_many_bytes(self):
"""33 hops of 2 bytes = 66, exceeds MAX_PATH_SIZE=64."""
encoded = PathUtils.encode_path_len(2, 33)
assert not PathUtils.is_valid_path_len(encoded)
def test_hash_size_4_reserved_invalid(self):
"""hash_size=4 (bits 6-7 = 0b11) is reserved and invalid."""
# Manually construct: (3 << 6) | 1 = 0xC1
raw = (3 << PATH_HASH_SIZE_SHIFT) | 1
assert not PathUtils.is_valid_path_len(raw)
# ===================================================================
# 2. Packet — multi-byte path serialization round-trip
# ===================================================================
class TestPacketMultiBytePath:
"""Verify Packet write_to/read_from preserves multi-byte path encoding."""
def test_1_byte_path_round_trip(self):
pkt = _make_flood_packet(b"\xAA\xBB\xCC", hash_size=1, hash_count=3)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 1
assert pkt2.get_path_hash_count() == 3
assert bytes(pkt2.path) == b"\xAA\xBB\xCC"
def test_2_byte_path_round_trip(self):
path = b"\xAA\xBB\xCC\xDD" # 2 hops of 2 bytes
pkt = _make_flood_packet(path, hash_size=2, hash_count=2)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 2
assert pkt2.get_path_hash_count() == 2
assert bytes(pkt2.path) == path
def test_3_byte_path_round_trip(self):
path = b"\xAA\xBB\xCC\xDD\xEE\xFF" # 2 hops of 3 bytes
pkt = _make_flood_packet(path, hash_size=3, hash_count=2)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 3
assert pkt2.get_path_hash_count() == 2
assert bytes(pkt2.path) == path
def test_empty_path_2_byte_mode(self):
pkt = _make_flood_packet(b"", hash_size=2, hash_count=0)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 2
assert pkt2.get_path_hash_count() == 0
assert bytes(pkt2.path) == b""
def test_payload_preserved_after_multibyte_path(self):
"""Payload bytes after a multi-byte path are correctly sliced."""
payload = b"\xDE\xAD\xBE\xEF"
path = b"\x11\x22\x33\x44\x55\x66"
pkt = _make_flood_packet(path, hash_size=3, hash_count=2, payload=payload)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_payload() == payload
def test_path_len_byte_on_wire(self):
"""The encoded path_len byte on the wire has the correct bit pattern."""
pkt = _make_flood_packet(b"\x11\x22\x33\x44", hash_size=2, hash_count=2)
wire = pkt.write_to()
# Wire: header(1) + path_len(1) + path(4) + payload(4)
# For ROUTE_TYPE_FLOOD (no transport codes), path_len is at index 1
path_len_on_wire = wire[1]
assert PathUtils.get_path_hash_size(path_len_on_wire) == 2
assert PathUtils.get_path_hash_count(path_len_on_wire) == 2
class TestPacketGetPathHashes:
"""Verify Packet.get_path_hashes splits path into per-hop byte entries."""
def test_1_byte_hashes(self):
pkt = _make_flood_packet(b"\xAA\xBB\xCC", hash_size=1, hash_count=3)
hashes = pkt.get_path_hashes()
assert hashes == [b"\xAA", b"\xBB", b"\xCC"]
def test_2_byte_hashes(self):
pkt = _make_flood_packet(b"\xAA\xBB\xCC\xDD", hash_size=2, hash_count=2)
hashes = pkt.get_path_hashes()
assert hashes == [b"\xAA\xBB", b"\xCC\xDD"]
def test_3_byte_hashes(self):
pkt = _make_flood_packet(
b"\xAA\xBB\xCC\xDD\xEE\xFF", hash_size=3, hash_count=2
)
hashes = pkt.get_path_hashes()
assert hashes == [b"\xAA\xBB\xCC", b"\xDD\xEE\xFF"]
def test_empty_path(self):
pkt = _make_flood_packet(b"", hash_size=2, hash_count=0)
assert pkt.get_path_hashes() == []
def test_hashes_hex_output(self):
pkt = _make_flood_packet(b"\x0A\x0B\x0C\x0D", hash_size=2, hash_count=2)
hex_hashes = pkt.get_path_hashes_hex()
assert hex_hashes == ["0A0B", "0C0D"]
class TestPacketApplyPathHashMode:
"""Verify Packet.apply_path_hash_mode sets encoding for 0-hop packets."""
@pytest.mark.parametrize("mode,expected_hash_size", [(0, 1), (1, 2), (2, 3)])
def test_apply_mode_sets_hash_size(self, mode, expected_hash_size):
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
pkt.payload = bytearray(b"\x01")
pkt.payload_len = 1
pkt.apply_path_hash_mode(mode)
assert pkt.get_path_hash_size() == expected_hash_size
assert pkt.get_path_hash_count() == 0
def test_apply_mode_skips_nonzero_hop_count(self):
"""Mode should not be re-applied if path already has hops."""
pkt = _make_flood_packet(b"\xAA\xBB", hash_size=2, hash_count=1)
original_path_len = pkt.path_len
pkt.apply_path_hash_mode(0) # try to override to 1-byte
assert pkt.path_len == original_path_len # unchanged
def test_apply_mode_skips_trace_packets(self):
"""Trace packets should never have path_hash_mode applied."""
pkt = PacketBuilder.create_trace(tag=1, auth_code=2, flags=0)
pkt.apply_path_hash_mode(2)
# Trace packet path_len stays 0 (no routing path)
assert pkt.path_len == 0
def test_apply_invalid_mode_raises(self):
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
with pytest.raises(ValueError, match="path_hash_mode must be 0, 1, or 2"):
pkt.apply_path_hash_mode(3)
class TestPacketSetPath:
"""Verify Packet.set_path with explicit path_len_encoded."""
def test_set_path_with_encoded_len(self):
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
path = b"\xAA\xBB\xCC\xDD"
encoded = PathUtils.encode_path_len(2, 2)
pkt.set_path(path, path_len_encoded=encoded)
assert pkt.get_path_hash_size() == 2
assert pkt.get_path_hash_count() == 2
assert bytes(pkt.path) == path
def test_set_path_without_encoded_defaults_1_byte(self):
"""Without explicit path_len_encoded, defaults to 1-byte hash_size."""
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
pkt.set_path(b"\xAA\xBB\xCC")
assert pkt.get_path_hash_size() == 1
assert pkt.get_path_hash_count() == 3
# ===================================================================
# 3. Engine flood_forward — real Packet objects
# ===================================================================
class TestFloodForwardMultiByte:
"""Test RepeaterHandler.flood_forward with real multi-byte Packet objects."""
def test_1_byte_mode_appends_single_byte(self):
h = _make_handler(path_hash_mode=0, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11", hash_size=1, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 2
assert result.get_path_hash_size() == 1
hashes = result.get_path_hashes()
assert hashes[0] == b"\x11"
assert hashes[1] == b"\xAB" # first byte of local_hash_bytes
def test_2_byte_mode_appends_two_bytes(self):
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\x22", hash_size=2, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 2
assert result.get_path_hash_size() == 2
hashes = result.get_path_hashes()
assert hashes[0] == b"\x11\x22"
assert hashes[1] == b"\xAB\xCD"
def test_3_byte_mode_appends_three_bytes(self):
h = _make_handler(path_hash_mode=2, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\x22\x33", hash_size=3, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 2
assert result.get_path_hash_size() == 3
hashes = result.get_path_hashes()
assert hashes[0] == b"\x11\x22\x33"
assert hashes[1] == b"\xAB\xCD\xEF"
def test_empty_path_gets_local_hash_appended(self):
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"", hash_size=2, hash_count=0)
result = h.flood_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 1
hashes = result.get_path_hashes()
assert hashes[0] == b"\xAB\xCD"
def test_path_len_re_encoded_after_forward(self):
"""After appending, path_len byte should encode (hash_size, count+1)."""
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\x22\x33\x44", hash_size=2, hash_count=2)
result = h.flood_forward(pkt)
assert result is not None
expected_path_len = PathUtils.encode_path_len(2, 3)
assert result.path_len == expected_path_len
def test_forwarded_packet_serializes_correctly(self):
"""The forwarded packet should serialize and deserialize cleanly."""
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\x22", hash_size=2, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
wire = result.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 2
assert pkt2.get_path_hash_count() == 2
assert pkt2.get_path_hashes() == [b"\x11\x22", b"\xAB\xCD"]
def test_flood_rejects_at_max_hops_2_byte(self):
"""At 32 hops (2-byte mode), flood_forward should drop the packet."""
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
path = bytes([0x00, 0x01] * 32) # 32 hops × 2 bytes = 64 bytes
pkt = _make_flood_packet(path, hash_size=2, hash_count=32)
result = h.flood_forward(pkt)
assert result is None
assert pkt.drop_reason is not None
def test_flood_rejects_at_max_hops_3_byte(self):
"""At 21 hops (3-byte mode), adding one more would exceed MAX_PATH_SIZE."""
h = _make_handler(path_hash_mode=2, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
path = bytes([0x00, 0x01, 0x02] * 21) # 21 hops × 3 bytes = 63 bytes
pkt = _make_flood_packet(path, hash_size=3, hash_count=21)
result = h.flood_forward(pkt)
assert result is None
assert pkt.drop_reason is not None
def test_flood_allows_below_max_2_byte(self):
"""At 31 hops (2-byte), one more should succeed."""
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
path = bytes(range(62)) # 31 hops × 2 bytes
pkt = _make_flood_packet(path, hash_size=2, hash_count=31)
result = h.flood_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 32
def test_flood_rejects_empty_payload(self):
h = _make_handler(path_hash_mode=0)
pkt = _make_flood_packet(b"", hash_size=1, hash_count=0, payload=b"")
pkt.payload = bytearray()
result = h.flood_forward(pkt)
assert result is None
assert "Empty payload" in (pkt.drop_reason or "")
# ===================================================================
# 4. Engine direct_forward — real Packet objects
# ===================================================================
class TestDirectForwardMultiByte:
"""Test RepeaterHandler.direct_forward with real multi-byte Packet objects."""
def test_1_byte_match_strips_first_hop(self):
h = _make_handler(path_hash_mode=0, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Path: [0xAB, 0x11] — first hop matches local_hash_bytes[0]
pkt = _make_direct_packet(b"\xAB\x11", hash_size=1, hash_count=2)
result = h.direct_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 1
assert result.get_path_hash_size() == 1
assert result.get_path_hashes() == [b"\x11"]
def test_2_byte_match_strips_first_hop(self):
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Path: [0xAB,0xCD, 0x11,0x22] — first 2-byte hop matches local_hash_bytes[:2]
pkt = _make_direct_packet(b"\xAB\xCD\x11\x22", hash_size=2, hash_count=2)
result = h.direct_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 1
assert result.get_path_hash_size() == 2
assert result.get_path_hashes() == [b"\x11\x22"]
def test_3_byte_match_strips_first_hop(self):
h = _make_handler(path_hash_mode=2, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_direct_packet(
b"\xAB\xCD\xEF\x11\x22\x33", hash_size=3, hash_count=2
)
result = h.direct_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 1
assert result.get_path_hash_size() == 3
assert result.get_path_hashes() == [b"\x11\x22\x33"]
def test_2_byte_mismatch_rejects(self):
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Path: [0xFF,0xEE, ...] — first 2-byte hop doesn't match
pkt = _make_direct_packet(b"\xFF\xEE\x11\x22", hash_size=2, hash_count=2)
result = h.direct_forward(pkt)
assert result is None
assert "not for us" in (pkt.drop_reason or "")
def test_path_len_re_encoded_after_strip(self):
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_direct_packet(b"\xAB\xCD\x11\x22\x33\x44", hash_size=2, hash_count=3)
result = h.direct_forward(pkt)
assert result is not None
expected_path_len = PathUtils.encode_path_len(2, 2)
assert result.path_len == expected_path_len
def test_last_hop_strips_to_empty(self):
"""When only one hop remains and it matches, path becomes empty."""
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_direct_packet(b"\xAB\xCD", hash_size=2, hash_count=1)
result = h.direct_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 0
assert bytes(result.path) == b""
def test_forwarded_direct_serializes_correctly(self):
"""After stripping, the packet should serialize/deserialize cleanly."""
h = _make_handler(path_hash_mode=2, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_direct_packet(
b"\xAB\xCD\xEF\x11\x22\x33\x44\x55\x66", hash_size=3, hash_count=3
)
result = h.direct_forward(pkt)
assert result is not None
wire = result.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 3
assert pkt2.get_path_hash_count() == 2
assert pkt2.get_path_hashes() == [b"\x11\x22\x33", b"\x44\x55\x66"]
def test_no_path_rejects(self):
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_direct_packet(b"", hash_size=2, hash_count=0)
result = h.direct_forward(pkt)
assert result is None
assert "no path" in (pkt.drop_reason or "").lower()
def test_path_too_short_for_hash_size(self):
"""If path has fewer bytes than hash_size, reject."""
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_direct_packet(b"\xAB", hash_size=2, hash_count=1)
# path has 1 byte but hash_size is 2
result = h.direct_forward(pkt)
assert result is None
# ===================================================================
# 5. Flood → Direct — multi-hop forwarding chain
# ===================================================================
class TestMultiHopForwardingChain:
"""Simulate a multi-hop path being built and then consumed."""
def test_flood_chain_builds_path_then_direct_consumes(self):
"""
Simulate: node_A floods repeater_1 forwards repeater_2 forwards
Then the return direct packet strips hops in reverse order.
"""
node_a_hash = bytes([0x11, 0x22, 0x33])
rep1_hash = bytes([0xAA, 0xBB, 0xCC])
rep2_hash = bytes([0xDD, 0xEE, 0xFF])
# Step 1: node_A creates a flood packet with 0 hops, 2-byte mode
pkt = _make_flood_packet(b"", hash_size=2, hash_count=0)
# Step 2: repeater_1 flood_forward adds its 2-byte hash
h1 = _make_handler(path_hash_mode=1, local_hash_bytes=rep1_hash)
pkt = h1.flood_forward(pkt)
assert pkt is not None
assert pkt.get_path_hashes() == [rep1_hash[:2]]
# Step 3: repeater_2 flood_forward adds its 2-byte hash
h2 = _make_handler(path_hash_mode=1, local_hash_bytes=rep2_hash)
pkt = h2.flood_forward(pkt)
assert pkt is not None
assert pkt.get_path_hashes() == [rep1_hash[:2], rep2_hash[:2]]
# Verify the path serializes correctly
wire = pkt.write_to()
pkt_rx = Packet()
pkt_rx.read_from(wire)
assert pkt_rx.get_path_hash_size() == 2
assert pkt_rx.get_path_hash_count() == 2
# Step 4: Now simulate a direct reply going back through the path
# The path should be [rep1, rep2] — direct packet addressed to rep1 first
# (Direct packets strip from the front)
direct_pkt = _make_direct_packet(
bytes(pkt_rx.path), hash_size=2, hash_count=2,
payload=b"\xFE\xED"
)
# repeater_1 strips its hop
result = h1.direct_forward(direct_pkt)
assert result is not None
assert result.get_path_hash_count() == 1
assert result.get_path_hashes() == [rep2_hash[:2]]
# repeater_2 strips its hop
result = h2.direct_forward(result)
assert result is not None
assert result.get_path_hash_count() == 0
assert bytes(result.path) == b""
# ===================================================================
# 6. PacketBuilder.create_trace — payload structure
# ===================================================================
class TestTracePacketStructure:
"""Verify real trace packet creation and payload structure."""
def test_create_trace_basic(self):
pkt = PacketBuilder.create_trace(tag=0x12345678, auth_code=0xDEADBEEF, flags=0x01)
assert pkt.get_payload_type() == PAYLOAD_TYPE_TRACE
assert pkt.path_len == 0
assert len(pkt.path) == 0
payload = pkt.get_payload()
assert len(payload) == 9
tag, auth_code, flags = struct.unpack("<IIB", payload[:9])
assert tag == 0x12345678
assert auth_code == 0xDEADBEEF
assert flags == 0x01
def test_create_trace_with_path_bytes(self):
"""Trace path goes into payload, not routing path."""
path_bytes = [0xAA, 0xBB, 0xCC, 0xDD]
pkt = PacketBuilder.create_trace(
tag=1, auth_code=2, flags=0, path=path_bytes
)
payload = pkt.get_payload()
assert len(payload) == 9 + 4
# Routing path stays empty
assert pkt.path_len == 0
assert len(pkt.path) == 0
# Path bytes are in the payload after the 9-byte header
assert list(payload[9:]) == path_bytes
def test_trace_is_direct_route(self):
pkt = PacketBuilder.create_trace(tag=0, auth_code=0, flags=0)
assert pkt.is_route_direct()
def test_trace_apply_path_hash_mode_is_noop(self):
"""apply_path_hash_mode should not alter trace packets."""
pkt = PacketBuilder.create_trace(tag=0, auth_code=0, flags=0)
pkt.apply_path_hash_mode(2)
assert pkt.path_len == 0 # unchanged
def test_trace_serialization_round_trip(self):
path_bytes = [0x11, 0x22, 0x33]
pkt = PacketBuilder.create_trace(tag=42, auth_code=99, flags=3, path=path_bytes)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_payload_type() == PAYLOAD_TYPE_TRACE
payload = pkt2.get_payload()
tag, auth_code, flags = struct.unpack("<IIB", payload[:9])
assert tag == 42
assert auth_code == 99
assert flags == 3
assert list(payload[9:]) == path_bytes
# ===================================================================
# 7. TraceHandler._parse_trace_payload — real parsing
# ===================================================================
class TestTracePayloadParsing:
"""Verify TraceHandler._parse_trace_payload with real trace payloads."""
def _make_trace_handler(self):
handler = object.__new__(TraceHandler)
return handler
def test_parse_basic_trace(self):
th = self._make_trace_handler()
payload = struct.pack("<IIB", 0x12345678, 0xAABBCCDD, 0x05)
result = th._parse_trace_payload(payload)
assert result["valid"]
assert result["tag"] == 0x12345678
assert result["auth_code"] == 0xAABBCCDD
assert result["flags"] == 0x05
assert result["trace_path"] == []
def test_parse_trace_with_1_byte_path(self):
th = self._make_trace_handler()
payload = struct.pack("<IIB", 1, 2, 0) + bytes([0xAA, 0xBB, 0xCC])
result = th._parse_trace_payload(payload)
assert result["valid"]
assert result["trace_path"] == [0xAA, 0xBB, 0xCC]
assert result["path_length"] == 3
def test_parse_trace_with_multibyte_path_is_flat(self):
"""
Trace path is raw bytes in payload _parse_trace_payload returns it flat.
Multi-byte grouping is NOT done at the trace parser level.
"""
th = self._make_trace_handler()
# 2 hops of 2-byte hashes → 4 flat bytes in the payload
path = bytes([0xAA, 0xBB, 0xCC, 0xDD])
payload = struct.pack("<IIB", 10, 20, 0) + path
result = th._parse_trace_payload(payload)
assert result["valid"]
# Returns flat list, not grouped
assert result["trace_path"] == [0xAA, 0xBB, 0xCC, 0xDD]
assert result["path_length"] == 4
def test_parse_from_real_packet(self):
"""Create a trace with PacketBuilder, serialize, deserialize, then parse."""
th = self._make_trace_handler()
trace_path = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66]
pkt = PacketBuilder.create_trace(
tag=100, auth_code=200, flags=7, path=trace_path
)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
result = th._parse_trace_payload(pkt2.get_payload())
assert result["valid"]
assert result["tag"] == 100
assert result["auth_code"] == 200
assert result["flags"] == 7
assert result["trace_path"] == trace_path
def test_parse_too_short_payload(self):
th = self._make_trace_handler()
result = th._parse_trace_payload(b"\x01\x02\x03")
assert "error" in result
assert "too short" in result["error"].lower()
# ===================================================================
# 8. Wire-level verification — manual byte inspection
# ===================================================================
class TestWireLevelEncoding:
"""Verify exact byte layout of serialized multi-byte path packets."""
def test_2_byte_mode_wire_format(self):
"""
ROUTE_TYPE_FLOOD (no transport codes):
[header(1)] [path_len(1)] [path(N)] [payload(M)]
"""
pkt = _make_flood_packet(
b"\xAA\xBB\xCC\xDD", hash_size=2, hash_count=2,
payload=b"\xFE"
)
wire = pkt.write_to()
assert wire[0] == ROUTE_TYPE_FLOOD # header
path_len = wire[1]
assert PathUtils.get_path_hash_size(path_len) == 2
assert PathUtils.get_path_hash_count(path_len) == 2
assert wire[2:6] == b"\xAA\xBB\xCC\xDD" # path bytes
assert wire[6:] == b"\xFE" # payload
def test_3_byte_mode_wire_format(self):
pkt = _make_flood_packet(
b"\x11\x22\x33\x44\x55\x66", hash_size=3, hash_count=2,
payload=b"\xAA"
)
wire = pkt.write_to()
assert wire[0] == ROUTE_TYPE_FLOOD
path_len = wire[1]
assert PathUtils.get_path_hash_size(path_len) == 3
assert PathUtils.get_path_hash_count(path_len) == 2
assert wire[2:8] == b"\x11\x22\x33\x44\x55\x66"
assert wire[8:] == b"\xAA"
def test_1_byte_mode_backward_compat_wire(self):
"""1-byte mode: path_len byte on wire == hop count (legacy format)."""
pkt = _make_flood_packet(b"\xAA\xBB", hash_size=1, hash_count=2)
wire = pkt.write_to()
assert wire[1] == 2 # path_len == hop_count for 1-byte mode
def test_read_from_2_byte_wire(self):
"""Manually construct wire bytes and verify read_from parses correctly."""
# header=ROUTE_TYPE_FLOOD, path_len=encode(2, 2), path=4 bytes, payload=2 bytes
path_len = PathUtils.encode_path_len(2, 2)
wire = bytes([ROUTE_TYPE_FLOOD, path_len]) + b"\xAA\xBB\xCC\xDD" + b"\xFE\xED"
pkt = Packet()
pkt.read_from(wire)
assert pkt.get_path_hash_size() == 2
assert pkt.get_path_hash_count() == 2
assert pkt.get_path_hashes() == [b"\xAA\xBB", b"\xCC\xDD"]
assert pkt.get_payload() == b"\xFE\xED"