diff --git a/app/fanout/bot.py b/app/fanout/bot.py index 727453f..bb22412 100644 --- a/app/fanout/bot.py +++ b/app/fanout/bot.py @@ -136,6 +136,7 @@ class BotModule(FanoutModule): if path_value is None and paths and isinstance(paths, list) and len(paths) > 0: path_value = paths[0].get("path") if isinstance(paths[0], dict) else None path_bytes_per_hop = _derive_path_bytes_per_hop(paths, path_value) + packet_hash = data.get("packet_hash") # Wait for message to settle (allows retransmissions to be deduped) await asyncio.sleep(2) @@ -161,6 +162,7 @@ class BotModule(FanoutModule): path_value, is_outgoing, path_bytes_per_hop, + packet_hash, ), timeout=BOT_EXECUTION_TIMEOUT, ) diff --git a/app/fanout/bot_exec.py b/app/fanout/bot_exec.py index 253cfed..878c66f 100644 --- a/app/fanout/bot_exec.py +++ b/app/fanout/bot_exec.py @@ -69,14 +69,14 @@ def _analyze_bot_signature(bot_func_or_sig) -> BotCallPlan: has_varargs = any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in param_values) has_kwargs = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in param_values) explicit_optional_names = tuple( - name for name in ("is_outgoing", "path_bytes_per_hop") if name in params + name for name in ("is_outgoing", "path_bytes_per_hop", "packet_hash") if name in params ) unsupported_required_kwonly = [ p.name for p in param_values if p.kind == inspect.Parameter.KEYWORD_ONLY and p.default is inspect.Parameter.empty - and p.name not in {"is_outgoing", "path_bytes_per_hop"} + and p.name not in {"is_outgoing", "path_bytes_per_hop", "packet_hash"} ] if unsupported_required_kwonly: raise ValueError( @@ -102,6 +102,8 @@ def _analyze_bot_signature(bot_func_or_sig) -> BotCallPlan: keyword_args["is_outgoing"] = False if has_kwargs or "path_bytes_per_hop" in params: keyword_args["path_bytes_per_hop"] = 1 + if has_kwargs or "packet_hash" in params: + keyword_args["packet_hash"] = "" candidate_specs.append(("keyword", [], keyword_args)) if not has_kwargs and explicit_optional_names: @@ -110,8 +112,12 @@ def _analyze_bot_signature(bot_func_or_sig) -> BotCallPlan: kwargs["is_outgoing"] = False if has_kwargs or "path_bytes_per_hop" in params: kwargs["path_bytes_per_hop"] = 1 + if has_kwargs or "packet_hash" in params: + kwargs["packet_hash"] = "" candidate_specs.append(("mixed_keyword", base_args, kwargs)) + if has_varargs or positional_capacity >= 11: + candidate_specs.append(("positional_11", base_args + [False, 1, ""], {})) if has_varargs or positional_capacity >= 10: candidate_specs.append(("positional_10", base_args + [False, 1], {})) if has_varargs or positional_capacity >= 9: @@ -132,6 +138,7 @@ def _analyze_bot_signature(bot_func_or_sig) -> BotCallPlan: "Bot function signature is not supported. Use the default bot template as a reference. " "Supported trailing parameters are: path; path + is_outgoing; " "path + path_bytes_per_hop; path + is_outgoing + path_bytes_per_hop; " + "path + is_outgoing + path_bytes_per_hop + packet_hash; " "or use **kwargs for forward compatibility." ) @@ -148,12 +155,13 @@ def execute_bot_code( path: str | None, is_outgoing: bool = False, path_bytes_per_hop: int | None = None, + packet_hash: str | None = None, ) -> str | list[str] | None: """ Execute user-provided bot code with message context. The code should define a function: - `bot(sender_name, sender_key, message_text, is_dm, channel_key, channel_name, sender_timestamp, path, is_outgoing, path_bytes_per_hop)` + `bot(sender_name, sender_key, message_text, is_dm, channel_key, channel_name, sender_timestamp, path, is_outgoing, path_bytes_per_hop, packet_hash)` or use named parameters / `**kwargs`. that returns either None (no response), a string (single response message), or a list of strings (multiple messages sent in order). @@ -173,6 +181,7 @@ def execute_bot_code( path: Hex-encoded routing path (may be None) is_outgoing: True if this is our own outgoing message path_bytes_per_hop: Number of bytes per routing hop (1, 2, or 3), if known + packet_hash: MeshCore packet hash (first 16 hex chars of SHA256, uppercase), if known Returns: Response string, list of strings, or None. @@ -208,7 +217,21 @@ def execute_bot_code( try: # Call the bot function with appropriate signature - if call_plan.call_style == "positional_10": + if call_plan.call_style == "positional_11": + result = bot_func( + sender_name, + sender_key, + message_text, + is_dm, + channel_key, + channel_name, + sender_timestamp, + path, + is_outgoing, + path_bytes_per_hop, + packet_hash, + ) + elif call_plan.call_style == "positional_10": result = bot_func( sender_name, sender_key, @@ -255,6 +278,8 @@ def execute_bot_code( keyword_args["is_outgoing"] = is_outgoing if "path_bytes_per_hop" in call_plan.keyword_args: keyword_args["path_bytes_per_hop"] = path_bytes_per_hop + if "packet_hash" in call_plan.keyword_args: + keyword_args["packet_hash"] = packet_hash result = bot_func(**keyword_args) else: result = bot_func( diff --git a/app/fanout/community_mqtt.py b/app/fanout/community_mqtt.py index 8407428..2e04f58 100644 --- a/app/fanout/community_mqtt.py +++ b/app/fanout/community_mqtt.py @@ -11,7 +11,6 @@ from __future__ import annotations import asyncio import base64 -import hashlib import json import logging import ssl @@ -23,7 +22,7 @@ import aiomqtt from app.fanout.mqtt_base import BaseMqttPublisher from app.keystore import ed25519_sign_expanded -from app.path_utils import parse_packet_envelope, split_path_hex +from app.path_utils import calculate_packet_hash, parse_packet_envelope, split_path_hex from app.version_info import get_app_build_info logger = logging.getLogger(__name__) @@ -110,34 +109,6 @@ def _generate_jwt_token( return f"{header_b64}.{payload_b64}.{signature.hex()}" -def _calculate_packet_hash(raw_bytes: bytes) -> str: - """Calculate packet hash matching MeshCore's Packet::calculatePacketHash(). - - Parses the packet structure to extract payload type and payload data, - then hashes: payload_type(1 byte) [+ path_len(2 bytes LE) for TRACE] + payload_data. - Returns first 16 hex characters (uppercase). - """ - if not raw_bytes: - return "0" * 16 - - try: - envelope = parse_packet_envelope(raw_bytes) - if envelope is None: - return "0" * 16 - - # Hash: payload_type(1 byte) [+ path_byte as uint16_t LE for TRACE] + payload_data - # IMPORTANT: TRACE hash uses the raw wire byte (not decoded hop count) to match firmware. - hash_obj = hashlib.sha256() - hash_obj.update(bytes([envelope.payload_type])) - if envelope.payload_type == 9: # PAYLOAD_TYPE_TRACE - hash_obj.update(envelope.path_byte.to_bytes(2, byteorder="little")) - hash_obj.update(envelope.payload) - - return hash_obj.hexdigest()[:16].upper() - except Exception: - return "0" * 16 - - def _decode_packet_fields(raw_bytes: bytes) -> tuple[str, str, str, list[str], int | None]: """Decode packet fields used by the community uploader payload format. @@ -192,7 +163,7 @@ def _format_raw_packet(data: dict[str, Any], device_name: str, public_key_hex: s snr: float | str = float(snr_val) if snr_val is not None else "Unknown" rssi: int | str = int(rssi_val) if rssi_val is not None else "Unknown" - packet_hash = _calculate_packet_hash(raw_bytes) + packet_hash = calculate_packet_hash(raw_bytes) packet = { "origin": device_name or "MeshCore Device", diff --git a/app/packet_processor.py b/app/packet_processor.py index 8c84d16..16dae31 100644 --- a/app/packet_processor.py +++ b/app/packet_processor.py @@ -35,6 +35,7 @@ from app.models import ( RawPacketBroadcast, RawPacketDecryptedInfo, ) +from app.path_utils import calculate_packet_hash from app.repository import ( ChannelRepository, ContactAdvertPathRepository, @@ -73,6 +74,7 @@ async def create_message_from_decrypted( snr: float | None = None, channel_name: str | None = None, realtime: bool = True, + packet_hash: str | None = None, ) -> int | None: """Store a decrypted channel message via the shared message service.""" return await _create_message_from_decrypted( @@ -89,6 +91,7 @@ async def create_message_from_decrypted( channel_name=channel_name, realtime=realtime, broadcast_fn=broadcast_event, + packet_hash=packet_hash, ) @@ -104,6 +107,7 @@ async def create_dm_message_from_decrypted( snr: float | None = None, outgoing: bool = False, realtime: bool = True, + packet_hash: str | None = None, ) -> int | None: """Store a decrypted direct message via the shared message service.""" return await _create_dm_message_from_decrypted( @@ -119,6 +123,7 @@ async def create_dm_message_from_decrypted( outgoing=outgoing, realtime=realtime, broadcast_fn=broadcast_event, + packet_hash=packet_hash, ) @@ -323,13 +328,16 @@ async def process_raw_packet( "sender": None, } + # Compute packet hash once for threading into message broadcasts (used by bot fanout). + pkt_hash = calculate_packet_hash(raw_bytes) + # Process packets based on payload type # For GROUP_TEXT, we always try to decrypt even for duplicate packets - the message # deduplication in create_message_from_decrypted handles adding paths to existing messages. # This is more reliable than trying to look up the message via raw packet linking. if payload_type == PayloadType.GROUP_TEXT: decrypt_result = await _process_group_text( - raw_bytes, packet_id, ts, packet_info, rssi=rssi, snr=snr + raw_bytes, packet_id, ts, packet_info, rssi=rssi, snr=snr, packet_hash=pkt_hash ) if decrypt_result: result.update(decrypt_result) @@ -342,7 +350,7 @@ async def process_raw_packet( elif payload_type == PayloadType.TEXT_MESSAGE: # Try to decrypt direct messages using stored private key and known contacts decrypt_result = await _process_direct_message( - raw_bytes, packet_id, ts, packet_info, rssi=rssi, snr=snr + raw_bytes, packet_id, ts, packet_info, rssi=rssi, snr=snr, packet_hash=pkt_hash ) if decrypt_result: result.update(decrypt_result) @@ -384,6 +392,7 @@ async def _process_group_text( packet_info: PacketInfo | None, rssi: int | None = None, snr: float | None = None, + packet_hash: str | None = None, ) -> dict | None: """ Process a GroupText (channel message) packet. @@ -422,6 +431,7 @@ async def _process_group_text( path_len=packet_info.path_length if packet_info else None, rssi=rssi, snr=snr, + packet_hash=packet_hash, ) return { @@ -567,6 +577,7 @@ async def _process_direct_message( packet_info: PacketInfo | None, rssi: int | None = None, snr: float | None = None, + packet_hash: str | None = None, ) -> dict | None: """ Process a TEXT_MESSAGE (direct message) packet. @@ -690,6 +701,7 @@ async def _process_direct_message( rssi=rssi, snr=snr, outgoing=effective_outgoing, + packet_hash=packet_hash, ) return { diff --git a/app/path_utils.py b/app/path_utils.py index 99b87f2..1996889 100644 --- a/app/path_utils.py +++ b/app/path_utils.py @@ -9,6 +9,7 @@ The path_len wire byte is packed as [hash_mode:2][hop_count:6]: Mode 3 (hash_size=4) is reserved and rejected. """ +import hashlib from collections.abc import Iterable from dataclasses import dataclass @@ -289,3 +290,30 @@ def bucket_path_hash_widths(rows: Iterable) -> dict[str, int | float]: "double_byte_pct": (double_byte / total) * 100, "triple_byte_pct": (triple_byte / total) * 100, } + + +def calculate_packet_hash(raw_bytes: bytes) -> str: + """Calculate packet hash matching MeshCore's Packet::calculatePacketHash(). + + Parses the packet structure to extract payload type and payload data, + then hashes: payload_type(1 byte) [+ path_len(2 bytes LE) for TRACE] + payload_data. + Returns first 16 hex characters (uppercase). + """ + if not raw_bytes: + return "0" * 16 + + try: + envelope = parse_packet_envelope(raw_bytes) + if envelope is None: + return "0" * 16 + + hash_obj = hashlib.sha256() + hash_obj.update(bytes([envelope.payload_type])) + # TRACE hash uses the raw wire byte (not decoded hop count) to match firmware. + if envelope.payload_type == 9: # PAYLOAD_TYPE_TRACE + hash_obj.update(envelope.path_byte.to_bytes(2, byteorder="little")) + hash_obj.update(envelope.payload) + + return hash_obj.hexdigest()[:16].upper() + except Exception: + return "0" * 16 diff --git a/app/services/dm_ingest.py b/app/services/dm_ingest.py index 4256801..318aafc 100644 --- a/app/services/dm_ingest.py +++ b/app/services/dm_ingest.py @@ -154,6 +154,7 @@ async def _store_direct_message( update_last_contacted_key: str | None, best_effort_content_dedup: bool, linked_packet_dedup: bool, + packet_hash: str | None = None, message_repository=MessageRepository, contact_repository=ContactRepository, raw_packet_repository=RawPacketRepository, @@ -248,7 +249,9 @@ async def _store_direct_message( sender_name=sender_name, packet_id=packet_id, ) - broadcast_message(message=message, broadcast_fn=broadcast_fn, realtime=realtime) + broadcast_message( + message=message, broadcast_fn=broadcast_fn, realtime=realtime, packet_hash=packet_hash + ) if update_last_contacted_key: await contact_repository.update_last_contacted(update_last_contacted_key, received_at) @@ -279,6 +282,7 @@ async def ingest_decrypted_direct_message( outgoing: bool = False, realtime: bool = True, broadcast_fn: BroadcastFn, + packet_hash: str | None = None, contact_repository=ContactRepository, ) -> Message | None: conversation_key = their_public_key.lower() @@ -338,6 +342,7 @@ async def ingest_decrypted_direct_message( update_last_contacted_key=conversation_key, best_effort_content_dedup=outgoing, linked_packet_dedup=True, + packet_hash=packet_hash, ) if message is None: return None diff --git a/app/services/messages.py b/app/services/messages.py index f6445b3..0f1d5f4 100644 --- a/app/services/messages.py +++ b/app/services/messages.py @@ -95,9 +95,12 @@ def broadcast_message( message: Message, broadcast_fn: BroadcastFn, realtime: bool | None = None, + packet_hash: str | None = None, ) -> None: """Broadcast a message payload, preserving the caller's broadcast signature.""" payload = message.model_dump() + if packet_hash is not None: + payload["packet_hash"] = packet_hash if realtime is None: broadcast_fn("message", payload) else: @@ -272,6 +275,7 @@ async def create_message_from_decrypted( channel_name: str | None = None, realtime: bool = True, broadcast_fn: BroadcastFn, + packet_hash: str | None = None, ) -> int | None: """Store and broadcast a decrypted channel message.""" received = received_at or int(time.time()) @@ -340,6 +344,7 @@ async def create_message_from_decrypted( ), broadcast_fn=broadcast_fn, realtime=realtime, + packet_hash=packet_hash, ) return msg_id @@ -359,6 +364,7 @@ async def create_dm_message_from_decrypted( outgoing: bool = False, realtime: bool = True, broadcast_fn: BroadcastFn, + packet_hash: str | None = None, ) -> int | None: """Store and broadcast a decrypted direct message.""" from app.services.dm_ingest import ingest_decrypted_direct_message @@ -375,6 +381,7 @@ async def create_dm_message_from_decrypted( outgoing=outgoing, realtime=realtime, broadcast_fn=broadcast_fn, + packet_hash=packet_hash, ) return message.id if message is not None else None diff --git a/tests/test_community_mqtt.py b/tests/test_community_mqtt.py index be3c72b..966bb20 100644 --- a/tests/test_community_mqtt.py +++ b/tests/test_community_mqtt.py @@ -17,7 +17,6 @@ from app.fanout.community_mqtt import ( _base64url_encode, _build_radio_info, _build_status_topic, - _calculate_packet_hash, _decode_packet_fields, _format_raw_packet, _generate_jwt_token, @@ -29,6 +28,7 @@ from app.fanout.mqtt_community import ( _render_packet_topic, ) from app.keystore import ed25519_sign_expanded +from app.path_utils import calculate_packet_hash as _calculate_packet_hash def _make_test_keys() -> tuple[bytes, bytes]: diff --git a/tests/test_fanout_hitlist.py b/tests/test_fanout_hitlist.py index 640352b..722b047 100644 --- a/tests/test_fanout_hitlist.py +++ b/tests/test_fanout_hitlist.py @@ -37,6 +37,7 @@ class TestBotModuleParameterExtraction: path, is_outgoing, path_bytes_per_hop, + packet_hash, ): captured["is_outgoing"] = is_outgoing captured["is_dm"] = is_dm @@ -86,6 +87,7 @@ class TestBotModuleParameterExtraction: path, is_outgoing, path_bytes_per_hop, + packet_hash, ): captured["is_outgoing"] = is_outgoing return None @@ -132,6 +134,7 @@ class TestBotModuleParameterExtraction: path, is_outgoing, path_bytes_per_hop, + packet_hash, ): captured["path"] = path captured["path_bytes_per_hop"] = path_bytes_per_hop @@ -180,6 +183,7 @@ class TestBotModuleParameterExtraction: path, is_outgoing, path_bytes_per_hop, + packet_hash, ): captured["message_text"] = message_text captured["sender_name"] = sender_name @@ -228,6 +232,7 @@ class TestBotModuleParameterExtraction: path, is_outgoing, path_bytes_per_hop, + packet_hash, ): captured["channel_name"] = channel_name return None @@ -275,6 +280,7 @@ class TestBotModuleParameterExtraction: path, is_outgoing, path_bytes_per_hop, + packet_hash, ): captured["sender_name"] = sender_name captured["sender_key"] = sender_key