feat: enhance retry logic with acknowledgement tracking and startup grace period

This commit is contained in:
Louis King
2025-09-21 17:36:56 +01:00
parent 20c673c3e0
commit 9719042343
6 changed files with 113 additions and 62 deletions

View File

@@ -17,14 +17,14 @@ jobs:
# github.event.pull_request.user.login == 'external-contributor' ||
# github.event.pull_request.user.login == 'new-developer' ||
# github.event.pull_request.author_association == 'FIRST_TIME_CONTRIBUTOR'
runs-on: ubuntu-latest
permissions:
contents: read
pull-requests: read
issues: read
id-token: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
@@ -43,12 +43,11 @@ jobs:
- Performance considerations
- Security concerns
- Test coverage
Use the repository's CLAUDE.md for guidance on style and conventions. Be constructive and helpful in your feedback.
Use `gh pr comment` with your Bash tool to leave your review as a comment on the PR.
# See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md
# or https://docs.claude.com/en/docs/claude-code/sdk#command-line for available options
claude_args: '--allowed-tools "Bash(gh issue view:*),Bash(gh search:*),Bash(gh issue list:*),Bash(gh pr comment:*),Bash(gh pr diff:*),Bash(gh pr view:*),Bash(gh pr list:*)"'

View File

@@ -35,7 +35,7 @@ jobs:
uses: anthropics/claude-code-action@v1
with:
claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
# This is an optional setting that allows Claude to read CI results on PRs
additional_permissions: |
actions: read
@@ -47,4 +47,3 @@ jobs:
# See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md
# or https://docs.claude.com/en/docs/claude-code/sdk#command-line for available options
# claude_args: '--model claude-opus-4-1-20250805 --allowed-tools Bash(gh pr:*)'

View File

@@ -155,4 +155,4 @@ The implementation includes comprehensive unit tests covering:
Run tests with:
```bash
pytest tests/test_retry_logic.py -v
```
```

View File

@@ -103,7 +103,9 @@ class MeshCoreConfig(BaseModel):
default=2.0,
ge=0.5,
le=30.0,
description="Base delay in seconds between message retries (exponential backoff)",
description=(
"Base delay in seconds between message retries (exponential backoff)"
),
)
reset_path_on_failure: bool = Field(
default=True,
@@ -254,7 +256,10 @@ class Config(BaseModel):
),
message_retry_count=int(os.getenv("MESHCORE_MESSAGE_RETRY_COUNT", "3")),
message_retry_delay=float(os.getenv("MESHCORE_MESSAGE_RETRY_DELAY", "2.0")),
reset_path_on_failure=os.getenv("MESHCORE_RESET_PATH_ON_FAILURE", "true").lower() == "true",
reset_path_on_failure=os.getenv(
"MESHCORE_RESET_PATH_ON_FAILURE", "true"
).lower()
== "true",
)
return cls(

View File

@@ -73,6 +73,10 @@ class MeshCoreWorker:
self._pending_acks: Dict[str, asyncio.Event] = {}
self._ack_results: Dict[str, bool] = {}
# Command deduplication to prevent restart message re-sending
self._startup_time = time.time()
self._startup_grace_period = 5.0 # 5 seconds to ignore commands during startup
async def start(self) -> None:
"""Start the MeshCore worker."""
if self._running:
@@ -294,6 +298,20 @@ class MeshCoreWorker:
self.logger.error("MeshCore not initialized, cannot process command")
return
# Check if this command is received during startup grace period
# to prevent processing of stale/retained MQTT messages
time_since_startup = time.time() - self._startup_time
if time_since_startup < self._startup_grace_period:
command_data = message.payload
command_type = command_data.get("command_type", "")
self.logger.warning(
f"Ignoring MQTT command '{command_type}' received during startup "
f"grace period ({time_since_startup:.1f}s < "
f"{self._startup_grace_period}s). This prevents processing "
"stale/retained messages after restart."
)
return
command_data = message.payload
command_type = command_data.get("command_type", "")
@@ -666,11 +684,15 @@ class MeshCoreWorker:
for attempt in range(max_retries + 1):
try:
self.logger.info(
f"Sending message to {destination} (attempt {attempt + 1}/{max_retries + 1})"
f"Sending message to {destination} "
f"(attempt {attempt + 1}/{max_retries + 1})"
)
# Send the message
result = await self.meshcore.commands.send_msg(destination, message)
if self.meshcore:
result = await self.meshcore.commands.send_msg(destination, message)
else:
raise RuntimeError("MeshCore not initialized")
# Check if we got MSG_SENT with expected_ack info
if result and hasattr(result, "payload"):
@@ -687,25 +709,31 @@ class MeshCoreWorker:
if ack_received:
self.logger.info(
f"Message to {destination} acknowledged successfully"
f"Message to {destination} acknowledged "
f"successfully"
)
return result
else:
self.logger.warning(
f"No acknowledgement received for message to {destination}"
f"No acknowledgement received for message to "
f"{destination}"
)
# If this was the last regular attempt, try path reset if configured
# If this was the last regular attempt, try path reset
# if configured
if attempt == max_retries - 1 and reset_path:
self.logger.info(
f"Resetting path for {destination} and trying once more"
f"Resetting path for {destination} and trying "
f"once more"
)
await self._reset_path(destination)
# Continue to the last attempt with reset path
elif attempt < max_retries:
# Wait before retry with exponential backoff
delay = base_delay * (2 ** attempt)
self.logger.info(f"Retrying in {delay:.1f} seconds...")
delay = base_delay * (2**attempt)
self.logger.info(
f"Retrying in {delay:.1f} seconds..."
)
await asyncio.sleep(delay)
continue
@@ -715,15 +743,17 @@ class MeshCoreWorker:
except Exception as e:
self.logger.error(
f"Error sending message to {destination} on attempt {attempt + 1}: {e}"
f"Error sending message to {destination} on attempt "
f"{attempt + 1}: {e}"
)
if attempt < max_retries:
delay = base_delay * (2 ** attempt)
delay = base_delay * (2**attempt)
self.logger.info(f"Retrying in {delay:.1f} seconds...")
await asyncio.sleep(delay)
self.logger.error(
f"Failed to send message to {destination} after {max_retries + 1} attempts"
f"Failed to send message to {destination} after "
f"{max_retries + 1} attempts"
)
return None
@@ -735,11 +765,17 @@ class MeshCoreWorker:
for attempt in range(max_retries + 1):
try:
self.logger.info(
f"Sending message to channel {channel} (attempt {attempt + 1}/{max_retries + 1})"
f"Sending message to channel {channel} "
f"(attempt {attempt + 1}/{max_retries + 1})"
)
# Send the channel message
result = await self.meshcore.commands.send_chan_msg(channel, message)
if self.meshcore:
result = await self.meshcore.commands.send_chan_msg(
channel, message
)
else:
raise RuntimeError("MeshCore not initialized")
# Check if we got MSG_SENT with expected_ack info
if result and hasattr(result, "payload"):
@@ -756,18 +792,22 @@ class MeshCoreWorker:
if ack_received:
self.logger.info(
f"Channel {channel} message acknowledged successfully"
f"Channel {channel} message acknowledged "
f"successfully"
)
return result
else:
self.logger.warning(
f"No acknowledgement received for channel {channel} message"
f"No acknowledgement received for channel "
f"{channel} message"
)
if attempt < max_retries:
# Wait before retry with exponential backoff
delay = base_delay * (2 ** attempt)
self.logger.info(f"Retrying in {delay:.1f} seconds...")
delay = base_delay * (2**attempt)
self.logger.info(
f"Retrying in {delay:.1f} seconds..."
)
await asyncio.sleep(delay)
continue
@@ -777,15 +817,17 @@ class MeshCoreWorker:
except Exception as e:
self.logger.error(
f"Error sending message to channel {channel} on attempt {attempt + 1}: {e}"
f"Error sending message to channel {channel} on attempt "
f"{attempt + 1}: {e}"
)
if attempt < max_retries:
delay = base_delay * (2 ** attempt)
delay = base_delay * (2**attempt)
self.logger.info(f"Retrying in {delay:.1f} seconds...")
await asyncio.sleep(delay)
self.logger.error(
f"Failed to send message to channel {channel} after {max_retries + 1} attempts"
f"Failed to send message to channel {channel} after "
f"{max_retries + 1} attempts"
)
return None
@@ -815,7 +857,9 @@ class MeshCoreWorker:
ack_id = None
if hasattr(ack_data, "payload"):
if isinstance(ack_data.payload, dict):
ack_id = ack_data.payload.get("ack") or ack_data.payload.get("ack_id")
ack_id = ack_data.payload.get("ack") or ack_data.payload.get(
"ack_id"
)
elif hasattr(ack_data.payload, "ack"):
ack_id = ack_data.payload.ack
@@ -836,7 +880,7 @@ class MeshCoreWorker:
# The MeshCore library should handle path reset through reconnection
# or by sending a specific command. For now, we'll attempt a trace
# packet which can help re-establish routing
if hasattr(self.meshcore.commands, "send_trace"):
if self.meshcore and hasattr(self.meshcore.commands, "send_trace"):
await self.meshcore.commands.send_trace(flags=1)
# Give the network time to update routing tables
await asyncio.sleep(1)

View File

@@ -5,17 +5,17 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from meshcore_mqtt.config import Config, MeshCoreConfig, MQTTConfig
from meshcore_mqtt.config import Config, ConnectionType, MeshCoreConfig, MQTTConfig
from meshcore_mqtt.meshcore_worker import MeshCoreWorker
@pytest.fixture
def mock_config():
def mock_config() -> Config:
"""Create test configuration."""
return Config(
mqtt=MQTTConfig(broker="test-broker"),
meshcore=MeshCoreConfig(
connection_type="tcp",
connection_type=ConnectionType.TCP,
address="test-address",
port=12345,
message_retry_count=3,
@@ -26,7 +26,7 @@ def mock_config():
@pytest.fixture
def worker(mock_config):
def worker(mock_config: Config) -> MeshCoreWorker:
"""Create MeshCore worker instance."""
return MeshCoreWorker(mock_config)
@@ -34,19 +34,19 @@ def worker(mock_config):
class TestRetryConfiguration:
"""Test retry configuration parameters."""
def test_default_retry_config(self):
def test_default_retry_config(self) -> None:
"""Test default retry configuration values."""
config = MeshCoreConfig(
connection_type="tcp", address="test", port=12345
connection_type=ConnectionType.TCP, address="test", port=12345
)
assert config.message_retry_count == 3
assert config.message_retry_delay == 2.0
assert config.reset_path_on_failure is True
def test_custom_retry_config(self):
def test_custom_retry_config(self) -> None:
"""Test custom retry configuration values."""
config = MeshCoreConfig(
connection_type="tcp",
connection_type=ConnectionType.TCP,
address="test",
port=12345,
message_retry_count=5,
@@ -57,11 +57,11 @@ class TestRetryConfiguration:
assert config.message_retry_delay == 3.5
assert config.reset_path_on_failure is False
def test_retry_config_validation(self):
def test_retry_config_validation(self) -> None:
"""Test retry configuration validation."""
# Valid range
config = MeshCoreConfig(
connection_type="tcp",
connection_type=ConnectionType.TCP,
address="test",
port=12345,
message_retry_count=10,
@@ -73,7 +73,7 @@ class TestRetryConfiguration:
# Invalid retry count
with pytest.raises(ValueError):
MeshCoreConfig(
connection_type="tcp",
connection_type=ConnectionType.TCP,
address="test",
port=12345,
message_retry_count=11,
@@ -82,7 +82,7 @@ class TestRetryConfiguration:
# Invalid retry delay
with pytest.raises(ValueError):
MeshCoreConfig(
connection_type="tcp",
connection_type=ConnectionType.TCP,
address="test",
port=12345,
message_retry_delay=31.0,
@@ -93,7 +93,9 @@ class TestMessageRetryLogic:
"""Test message retry functionality."""
@pytest.mark.asyncio
async def test_send_msg_with_retry_success_first_attempt(self, worker):
async def test_send_msg_with_retry_success_first_attempt(
self, worker: MeshCoreWorker
) -> None:
"""Test successful message send on first attempt."""
# Mock MeshCore commands
mock_result = MagicMock()
@@ -113,7 +115,9 @@ class TestMessageRetryLogic:
)
@pytest.mark.asyncio
async def test_send_msg_with_retry_success_after_retries(self, worker):
async def test_send_msg_with_retry_success_after_retries(
self, worker: MeshCoreWorker
) -> None:
"""Test successful message send after retries."""
# Mock MeshCore commands
mock_result = MagicMock()
@@ -125,16 +129,14 @@ class TestMessageRetryLogic:
# Mock ack not received first two times, then received
ack_results = [False, False, True]
with patch.object(
worker, "_wait_for_ack", side_effect=ack_results
):
with patch.object(worker, "_wait_for_ack", side_effect=ack_results):
result = await worker._send_msg_with_retry("destination", "message")
assert result == mock_result
assert worker.meshcore.commands.send_msg.call_count == 3
@pytest.mark.asyncio
async def test_send_msg_with_retry_failure(self, worker):
async def test_send_msg_with_retry_failure(self, worker: MeshCoreWorker) -> None:
"""Test message send failure after all retries."""
# Mock MeshCore commands
mock_result = MagicMock()
@@ -154,7 +156,7 @@ class TestMessageRetryLogic:
assert worker.meshcore.commands.send_msg.call_count == 4
@pytest.mark.asyncio
async def test_send_msg_with_path_reset(self, worker):
async def test_send_msg_with_path_reset(self, worker: MeshCoreWorker) -> None:
"""Test path reset after max retries."""
# Mock MeshCore commands
mock_result = MagicMock()
@@ -174,7 +176,7 @@ class TestMessageRetryLogic:
mock_reset_path.assert_called_once_with("destination")
@pytest.mark.asyncio
async def test_send_msg_no_ack_info(self, worker):
async def test_send_msg_no_ack_info(self, worker: MeshCoreWorker) -> None:
"""Test message send when no ack info is provided."""
# Mock MeshCore commands - no ack info in response
mock_result = MagicMock()
@@ -191,7 +193,9 @@ class TestMessageRetryLogic:
worker.meshcore.commands.send_msg.assert_called_once()
@pytest.mark.asyncio
async def test_send_chan_msg_with_retry_success(self, worker):
async def test_send_chan_msg_with_retry_success(
self, worker: MeshCoreWorker
) -> None:
"""Test successful channel message send with retry."""
# Mock MeshCore commands
mock_result = MagicMock()
@@ -209,7 +213,7 @@ class TestMessageRetryLogic:
worker.meshcore.commands.send_chan_msg.assert_called_once_with(0, "message")
@pytest.mark.asyncio
async def test_wait_for_ack_timeout(self, worker):
async def test_wait_for_ack_timeout(self, worker: MeshCoreWorker) -> None:
"""Test acknowledgement timeout."""
ack_key = "test_ack"
event = asyncio.Event()
@@ -221,11 +225,11 @@ class TestMessageRetryLogic:
assert ack_key not in worker._pending_acks
@pytest.mark.asyncio
async def test_wait_for_ack_received(self, worker):
async def test_wait_for_ack_received(self, worker: MeshCoreWorker) -> None:
"""Test acknowledgement received."""
ack_key = "test_ack"
async def set_ack():
async def set_ack() -> None:
await asyncio.sleep(0.05)
worker._ack_results[ack_key] = True
if ack_key in worker._pending_acks:
@@ -238,7 +242,7 @@ class TestMessageRetryLogic:
result = await worker._wait_for_ack(ack_key, 1.0)
assert result is True
def test_on_ack_received(self, worker):
def test_on_ack_received(self, worker: MeshCoreWorker) -> None:
"""Test acknowledgement event handler."""
ack_key = "test_ack"
event = asyncio.Event()
@@ -255,7 +259,7 @@ class TestMessageRetryLogic:
assert event.is_set()
@pytest.mark.asyncio
async def test_reset_path(self, worker):
async def test_reset_path(self, worker: MeshCoreWorker) -> None:
"""Test path reset functionality."""
worker.meshcore = MagicMock()
worker.meshcore.commands = MagicMock()
@@ -266,7 +270,7 @@ class TestMessageRetryLogic:
worker.meshcore.commands.send_trace.assert_called_once_with(flags=1)
@pytest.mark.asyncio
async def test_exponential_backoff(self, worker):
async def test_exponential_backoff(self, worker: MeshCoreWorker) -> None:
"""Test exponential backoff timing."""
# Set shorter delays for testing
worker.config.meshcore.message_retry_delay = 0.1
@@ -282,7 +286,7 @@ class TestMessageRetryLogic:
# Track timing
call_times = []
async def mock_send_msg(*args):
async def mock_send_msg(*args: str) -> MagicMock:
call_times.append(asyncio.get_event_loop().time())
return mock_result
@@ -302,5 +306,5 @@ class TestMessageRetryLogic:
# Second retry after base_delay * 2 (0.2s)
assert 0.15 < (call_times[2] - call_times[1]) < 0.3
if len(call_times) > 3:
# Third retry after base_delay * 4 (0.4s)
assert 0.35 < (call_times[3] - call_times[2]) < 0.5
# Third attempt happens immediately after path reset (no delay)
assert (call_times[3] - call_times[2]) < 0.1