From 52243cbc948e76001fabf72b7208de0946f5002c Mon Sep 17 00:00:00 2001 From: jkingsman Date: Thu, 4 Jun 2026 21:34:02 -0700 Subject: [PATCH] Extract ACK codes for standalone ACKs (i.e. normal/non-flood). Might resolve #278? --- app/AGENTS.md | 1 + app/packet_processor.py | 17 +++++++ tests/test_packet_pipeline.py | 93 +++++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+) diff --git a/app/AGENTS.md b/app/AGENTS.md index c1961b4..1a6e70a 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -137,6 +137,7 @@ app/ - Non-final DM attempts use the contact's effective route (`override > direct > flood`). The final retry is intentionally sent as flood even when a routing override exists. - DM ACK state is terminal on first ACK. Retry attempts may register multiple expected ACK codes for the same message, but sibling pending codes are cleared once one ACK wins so a DM should not accrue multiple delivery confirmations from retries. - ACKs are delivery state, not routing state. Bundled ACKs inside PATH packets still satisfy pending DM sends, but ACK history does not feed contact route learning. +- DM ACKs are matched from two independent radio emissions, so confirmation does not depend on the radio surfacing a host control frame: (1) the `EventType.ACK`/`SEND_CONFIRMED` host frame via `event_handlers.on_ack`, and (2) the raw RF packet itself via `packet_processor.process_raw_packet`. The packet processor extracts ACK codes both from PATH-return packets (flood replies, ACK embedded in `extra`) and from standalone `PayloadType.ACK` packets (direct replies, 4-byte cleartext payload), feeding both into `apply_dm_ack_code`. This matters for companion firmwares (e.g. pyMC over TCP) that do not reliably emit a separate host ACK frame for direct-routed replies. ### Echo/repeat dedup diff --git a/app/packet_processor.py b/app/packet_processor.py index 16dae31..e5ba30d 100644 --- a/app/packet_processor.py +++ b/app/packet_processor.py @@ -358,6 +358,23 @@ async def process_raw_packet( elif payload_type == PayloadType.PATH: await _process_path_packet(raw_bytes, ts, packet_info) + elif payload_type == PayloadType.ACK: + # Standalone ACK packets carry the 4-byte ack code in cleartext (the + # firmware just memcpy's the uint32 into the payload). A contact answers + # a *direct*-routed DM with one of these, whereas a *flood*-routed DM is + # answered with a PATH-return that has the ACK embedded (handled above in + # _process_path_packet). We match directly from the raw RF packet so DM + # delivery confirmation does not depend on the radio also surfacing a + # separate EventType.ACK host control frame, which some companion + # firmwares (e.g. pyMC over TCP) do not reliably emit for direct ACKs. + if packet_info is not None and len(packet_info.payload) >= 4: + ack_code = packet_info.payload[:4].hex() + matched = await apply_dm_ack_code(ack_code, broadcast_fn=broadcast_event) + if matched: + logger.info("Applied standalone ACK %s from raw packet", ack_code) + else: + logger.debug("Buffered/ignored standalone ACK %s from raw packet", ack_code) + # Always broadcast raw packet for the packet feed UI (even duplicates) # This enables the frontend cracker to see all incoming packets in real-time broadcast_payload = RawPacketBroadcast( diff --git a/tests/test_packet_pipeline.py b/tests/test_packet_pipeline.py index c5e0910..3e58f8c 100644 --- a/tests/test_packet_pipeline.py +++ b/tests/test_packet_pipeline.py @@ -74,6 +74,12 @@ def _build_path_packet( return header + payload +def _build_ack_packet(code: bytes, *, route_type: RouteType = RouteType.DIRECT) -> bytes: + """Build a standalone ACK packet whose cleartext payload is the 4-byte code.""" + header = bytes([(PayloadType.ACK << 2) | route_type, 0x00]) + return header + code + + class TestChannelMessagePipeline: """Test channel message flow: packet → decrypt → store → broadcast.""" @@ -763,6 +769,93 @@ class TestAckPipeline: assert "ack_count" in broadcast["data"] assert broadcast["data"]["ack_count"] == 1 + @pytest.mark.asyncio + async def test_standalone_ack_packet_marks_message_acked(self, test_db, captured_broadcasts): + """A standalone ACK RF packet satisfies a pending DM ACK from the raw feed. + + Direct-routed DMs are answered with a standalone PAYLOAD_TYPE_ACK packet + (vs. the PATH-embedded ACK used for flood). We must match it straight from + the raw packet so delivery confirmation does not depend on the radio also + emitting a separate EventType.ACK host control frame. + """ + from app.packet_processor import process_raw_packet + from app.services import dm_ack_tracker + + code = bytes.fromhex("01020304") + raw_packet = _build_ack_packet(code) + + message_id = await MessageRepository.create( + msg_type="PRIV", + text="waiting for direct ack", + conversation_key=PATH_TEST_CONTACT_PUB.hex(), + sender_timestamp=1700000000, + received_at=1700000000, + outgoing=True, + ) + + prev_pending = dm_ack_tracker._pending_acks.copy() + prev_buffered = dm_ack_tracker._buffered_acks.copy() + dm_ack_tracker._pending_acks.clear() + dm_ack_tracker._buffered_acks.clear() + dm_ack_tracker.track_pending_ack(code.hex(), message_id, 30000) + + broadcasts, mock_broadcast = captured_broadcasts + try: + with patch("app.packet_processor.broadcast_event", mock_broadcast): + result = await process_raw_packet(raw_packet, timestamp=1700000300) + finally: + dm_ack_tracker._pending_acks.clear() + dm_ack_tracker._pending_acks.update(prev_pending) + dm_ack_tracker._buffered_acks.clear() + dm_ack_tracker._buffered_acks.update(prev_buffered) + + assert result["payload_type"] == "ACK" + + messages = await MessageRepository.get_all( + msg_type="PRIV", + conversation_key=PATH_TEST_CONTACT_PUB.hex(), + limit=10, + ) + assert len(messages) == 1 + assert messages[0].acked == 1 + + ack_broadcasts = [b for b in broadcasts if b["type"] == "message_acked"] + assert len(ack_broadcasts) == 1 + assert ack_broadcasts[0]["data"] == {"message_id": message_id, "ack_count": 1} + + @pytest.mark.asyncio + async def test_standalone_ack_packet_with_no_pending_is_buffered( + self, test_db, captured_broadcasts + ): + """An unmatched standalone ACK is buffered (for late registration), not dropped.""" + from app.packet_processor import process_raw_packet + from app.services import dm_ack_tracker + + code = bytes.fromhex("aabbccdd") + raw_packet = _build_ack_packet(code) + + prev_pending = dm_ack_tracker._pending_acks.copy() + prev_buffered = dm_ack_tracker._buffered_acks.copy() + dm_ack_tracker._pending_acks.clear() + dm_ack_tracker._buffered_acks.clear() + + broadcasts, mock_broadcast = captured_broadcasts + try: + with patch("app.packet_processor.broadcast_event", mock_broadcast): + await process_raw_packet(raw_packet, timestamp=1700000300) + buffered_after = dm_ack_tracker._buffered_acks.copy() + finally: + dm_ack_tracker._pending_acks.clear() + dm_ack_tracker._pending_acks.update(prev_pending) + dm_ack_tracker._buffered_acks.clear() + dm_ack_tracker._buffered_acks.update(prev_buffered) + + # Code is buffered so a slightly-later send registration still matches it. + assert code.hex() in buffered_after + # No message exists, so nothing should be marked acked / broadcast. + ack_broadcasts = [b for b in broadcasts if b["type"] == "message_acked"] + assert ack_broadcasts == [] + class TestCreateMessageFromDecrypted: """Test the shared message creation function used by both real-time and historical decryption."""