Add packet_hash to bot kwargs. Closes #268.

This commit is contained in:
Jack Kingsman
2026-06-01 18:50:19 -07:00
parent 50af30f3bb
commit bea51b894d
9 changed files with 95 additions and 39 deletions
+2
View File
@@ -136,6 +136,7 @@ class BotModule(FanoutModule):
if path_value is None and paths and isinstance(paths, list) and len(paths) > 0:
path_value = paths[0].get("path") if isinstance(paths[0], dict) else None
path_bytes_per_hop = _derive_path_bytes_per_hop(paths, path_value)
packet_hash = data.get("packet_hash")
# Wait for message to settle (allows retransmissions to be deduped)
await asyncio.sleep(2)
@@ -161,6 +162,7 @@ class BotModule(FanoutModule):
path_value,
is_outgoing,
path_bytes_per_hop,
packet_hash,
),
timeout=BOT_EXECUTION_TIMEOUT,
)
+29 -4
View File
@@ -69,14 +69,14 @@ def _analyze_bot_signature(bot_func_or_sig) -> BotCallPlan:
has_varargs = any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in param_values)
has_kwargs = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in param_values)
explicit_optional_names = tuple(
name for name in ("is_outgoing", "path_bytes_per_hop") if name in params
name for name in ("is_outgoing", "path_bytes_per_hop", "packet_hash") if name in params
)
unsupported_required_kwonly = [
p.name
for p in param_values
if p.kind == inspect.Parameter.KEYWORD_ONLY
and p.default is inspect.Parameter.empty
and p.name not in {"is_outgoing", "path_bytes_per_hop"}
and p.name not in {"is_outgoing", "path_bytes_per_hop", "packet_hash"}
]
if unsupported_required_kwonly:
raise ValueError(
@@ -102,6 +102,8 @@ def _analyze_bot_signature(bot_func_or_sig) -> BotCallPlan:
keyword_args["is_outgoing"] = False
if has_kwargs or "path_bytes_per_hop" in params:
keyword_args["path_bytes_per_hop"] = 1
if has_kwargs or "packet_hash" in params:
keyword_args["packet_hash"] = ""
candidate_specs.append(("keyword", [], keyword_args))
if not has_kwargs and explicit_optional_names:
@@ -110,8 +112,12 @@ def _analyze_bot_signature(bot_func_or_sig) -> BotCallPlan:
kwargs["is_outgoing"] = False
if has_kwargs or "path_bytes_per_hop" in params:
kwargs["path_bytes_per_hop"] = 1
if has_kwargs or "packet_hash" in params:
kwargs["packet_hash"] = ""
candidate_specs.append(("mixed_keyword", base_args, kwargs))
if has_varargs or positional_capacity >= 11:
candidate_specs.append(("positional_11", base_args + [False, 1, ""], {}))
if has_varargs or positional_capacity >= 10:
candidate_specs.append(("positional_10", base_args + [False, 1], {}))
if has_varargs or positional_capacity >= 9:
@@ -132,6 +138,7 @@ def _analyze_bot_signature(bot_func_or_sig) -> BotCallPlan:
"Bot function signature is not supported. Use the default bot template as a reference. "
"Supported trailing parameters are: path; path + is_outgoing; "
"path + path_bytes_per_hop; path + is_outgoing + path_bytes_per_hop; "
"path + is_outgoing + path_bytes_per_hop + packet_hash; "
"or use **kwargs for forward compatibility."
)
@@ -148,12 +155,13 @@ def execute_bot_code(
path: str | None,
is_outgoing: bool = False,
path_bytes_per_hop: int | None = None,
packet_hash: str | None = None,
) -> str | list[str] | None:
"""
Execute user-provided bot code with message context.
The code should define a function:
`bot(sender_name, sender_key, message_text, is_dm, channel_key, channel_name, sender_timestamp, path, is_outgoing, path_bytes_per_hop)`
`bot(sender_name, sender_key, message_text, is_dm, channel_key, channel_name, sender_timestamp, path, is_outgoing, path_bytes_per_hop, packet_hash)`
or use named parameters / `**kwargs`.
that returns either None (no response), a string (single response message),
or a list of strings (multiple messages sent in order).
@@ -173,6 +181,7 @@ def execute_bot_code(
path: Hex-encoded routing path (may be None)
is_outgoing: True if this is our own outgoing message
path_bytes_per_hop: Number of bytes per routing hop (1, 2, or 3), if known
packet_hash: MeshCore packet hash (first 16 hex chars of SHA256, uppercase), if known
Returns:
Response string, list of strings, or None.
@@ -208,7 +217,21 @@ def execute_bot_code(
try:
# Call the bot function with appropriate signature
if call_plan.call_style == "positional_10":
if call_plan.call_style == "positional_11":
result = bot_func(
sender_name,
sender_key,
message_text,
is_dm,
channel_key,
channel_name,
sender_timestamp,
path,
is_outgoing,
path_bytes_per_hop,
packet_hash,
)
elif call_plan.call_style == "positional_10":
result = bot_func(
sender_name,
sender_key,
@@ -255,6 +278,8 @@ def execute_bot_code(
keyword_args["is_outgoing"] = is_outgoing
if "path_bytes_per_hop" in call_plan.keyword_args:
keyword_args["path_bytes_per_hop"] = path_bytes_per_hop
if "packet_hash" in call_plan.keyword_args:
keyword_args["packet_hash"] = packet_hash
result = bot_func(**keyword_args)
else:
result = bot_func(
+2 -31
View File
@@ -11,7 +11,6 @@ from __future__ import annotations
import asyncio
import base64
import hashlib
import json
import logging
import ssl
@@ -23,7 +22,7 @@ import aiomqtt
from app.fanout.mqtt_base import BaseMqttPublisher
from app.keystore import ed25519_sign_expanded
from app.path_utils import parse_packet_envelope, split_path_hex
from app.path_utils import calculate_packet_hash, parse_packet_envelope, split_path_hex
from app.version_info import get_app_build_info
logger = logging.getLogger(__name__)
@@ -110,34 +109,6 @@ def _generate_jwt_token(
return f"{header_b64}.{payload_b64}.{signature.hex()}"
def _calculate_packet_hash(raw_bytes: bytes) -> str:
"""Calculate packet hash matching MeshCore's Packet::calculatePacketHash().
Parses the packet structure to extract payload type and payload data,
then hashes: payload_type(1 byte) [+ path_len(2 bytes LE) for TRACE] + payload_data.
Returns first 16 hex characters (uppercase).
"""
if not raw_bytes:
return "0" * 16
try:
envelope = parse_packet_envelope(raw_bytes)
if envelope is None:
return "0" * 16
# Hash: payload_type(1 byte) [+ path_byte as uint16_t LE for TRACE] + payload_data
# IMPORTANT: TRACE hash uses the raw wire byte (not decoded hop count) to match firmware.
hash_obj = hashlib.sha256()
hash_obj.update(bytes([envelope.payload_type]))
if envelope.payload_type == 9: # PAYLOAD_TYPE_TRACE
hash_obj.update(envelope.path_byte.to_bytes(2, byteorder="little"))
hash_obj.update(envelope.payload)
return hash_obj.hexdigest()[:16].upper()
except Exception:
return "0" * 16
def _decode_packet_fields(raw_bytes: bytes) -> tuple[str, str, str, list[str], int | None]:
"""Decode packet fields used by the community uploader payload format.
@@ -192,7 +163,7 @@ def _format_raw_packet(data: dict[str, Any], device_name: str, public_key_hex: s
snr: float | str = float(snr_val) if snr_val is not None else "Unknown"
rssi: int | str = int(rssi_val) if rssi_val is not None else "Unknown"
packet_hash = _calculate_packet_hash(raw_bytes)
packet_hash = calculate_packet_hash(raw_bytes)
packet = {
"origin": device_name or "MeshCore Device",
+14 -2
View File
@@ -35,6 +35,7 @@ from app.models import (
RawPacketBroadcast,
RawPacketDecryptedInfo,
)
from app.path_utils import calculate_packet_hash
from app.repository import (
ChannelRepository,
ContactAdvertPathRepository,
@@ -73,6 +74,7 @@ async def create_message_from_decrypted(
snr: float | None = None,
channel_name: str | None = None,
realtime: bool = True,
packet_hash: str | None = None,
) -> int | None:
"""Store a decrypted channel message via the shared message service."""
return await _create_message_from_decrypted(
@@ -89,6 +91,7 @@ async def create_message_from_decrypted(
channel_name=channel_name,
realtime=realtime,
broadcast_fn=broadcast_event,
packet_hash=packet_hash,
)
@@ -104,6 +107,7 @@ async def create_dm_message_from_decrypted(
snr: float | None = None,
outgoing: bool = False,
realtime: bool = True,
packet_hash: str | None = None,
) -> int | None:
"""Store a decrypted direct message via the shared message service."""
return await _create_dm_message_from_decrypted(
@@ -119,6 +123,7 @@ async def create_dm_message_from_decrypted(
outgoing=outgoing,
realtime=realtime,
broadcast_fn=broadcast_event,
packet_hash=packet_hash,
)
@@ -323,13 +328,16 @@ async def process_raw_packet(
"sender": None,
}
# Compute packet hash once for threading into message broadcasts (used by bot fanout).
pkt_hash = calculate_packet_hash(raw_bytes)
# Process packets based on payload type
# For GROUP_TEXT, we always try to decrypt even for duplicate packets - the message
# deduplication in create_message_from_decrypted handles adding paths to existing messages.
# This is more reliable than trying to look up the message via raw packet linking.
if payload_type == PayloadType.GROUP_TEXT:
decrypt_result = await _process_group_text(
raw_bytes, packet_id, ts, packet_info, rssi=rssi, snr=snr
raw_bytes, packet_id, ts, packet_info, rssi=rssi, snr=snr, packet_hash=pkt_hash
)
if decrypt_result:
result.update(decrypt_result)
@@ -342,7 +350,7 @@ async def process_raw_packet(
elif payload_type == PayloadType.TEXT_MESSAGE:
# Try to decrypt direct messages using stored private key and known contacts
decrypt_result = await _process_direct_message(
raw_bytes, packet_id, ts, packet_info, rssi=rssi, snr=snr
raw_bytes, packet_id, ts, packet_info, rssi=rssi, snr=snr, packet_hash=pkt_hash
)
if decrypt_result:
result.update(decrypt_result)
@@ -384,6 +392,7 @@ async def _process_group_text(
packet_info: PacketInfo | None,
rssi: int | None = None,
snr: float | None = None,
packet_hash: str | None = None,
) -> dict | None:
"""
Process a GroupText (channel message) packet.
@@ -422,6 +431,7 @@ async def _process_group_text(
path_len=packet_info.path_length if packet_info else None,
rssi=rssi,
snr=snr,
packet_hash=packet_hash,
)
return {
@@ -567,6 +577,7 @@ async def _process_direct_message(
packet_info: PacketInfo | None,
rssi: int | None = None,
snr: float | None = None,
packet_hash: str | None = None,
) -> dict | None:
"""
Process a TEXT_MESSAGE (direct message) packet.
@@ -690,6 +701,7 @@ async def _process_direct_message(
rssi=rssi,
snr=snr,
outgoing=effective_outgoing,
packet_hash=packet_hash,
)
return {
+28
View File
@@ -9,6 +9,7 @@ The path_len wire byte is packed as [hash_mode:2][hop_count:6]:
Mode 3 (hash_size=4) is reserved and rejected.
"""
import hashlib
from collections.abc import Iterable
from dataclasses import dataclass
@@ -289,3 +290,30 @@ def bucket_path_hash_widths(rows: Iterable) -> dict[str, int | float]:
"double_byte_pct": (double_byte / total) * 100,
"triple_byte_pct": (triple_byte / total) * 100,
}
def calculate_packet_hash(raw_bytes: bytes) -> str:
"""Calculate packet hash matching MeshCore's Packet::calculatePacketHash().
Parses the packet structure to extract payload type and payload data,
then hashes: payload_type(1 byte) [+ path_len(2 bytes LE) for TRACE] + payload_data.
Returns first 16 hex characters (uppercase).
"""
if not raw_bytes:
return "0" * 16
try:
envelope = parse_packet_envelope(raw_bytes)
if envelope is None:
return "0" * 16
hash_obj = hashlib.sha256()
hash_obj.update(bytes([envelope.payload_type]))
# TRACE hash uses the raw wire byte (not decoded hop count) to match firmware.
if envelope.payload_type == 9: # PAYLOAD_TYPE_TRACE
hash_obj.update(envelope.path_byte.to_bytes(2, byteorder="little"))
hash_obj.update(envelope.payload)
return hash_obj.hexdigest()[:16].upper()
except Exception:
return "0" * 16
+6 -1
View File
@@ -154,6 +154,7 @@ async def _store_direct_message(
update_last_contacted_key: str | None,
best_effort_content_dedup: bool,
linked_packet_dedup: bool,
packet_hash: str | None = None,
message_repository=MessageRepository,
contact_repository=ContactRepository,
raw_packet_repository=RawPacketRepository,
@@ -248,7 +249,9 @@ async def _store_direct_message(
sender_name=sender_name,
packet_id=packet_id,
)
broadcast_message(message=message, broadcast_fn=broadcast_fn, realtime=realtime)
broadcast_message(
message=message, broadcast_fn=broadcast_fn, realtime=realtime, packet_hash=packet_hash
)
if update_last_contacted_key:
await contact_repository.update_last_contacted(update_last_contacted_key, received_at)
@@ -279,6 +282,7 @@ async def ingest_decrypted_direct_message(
outgoing: bool = False,
realtime: bool = True,
broadcast_fn: BroadcastFn,
packet_hash: str | None = None,
contact_repository=ContactRepository,
) -> Message | None:
conversation_key = their_public_key.lower()
@@ -338,6 +342,7 @@ async def ingest_decrypted_direct_message(
update_last_contacted_key=conversation_key,
best_effort_content_dedup=outgoing,
linked_packet_dedup=True,
packet_hash=packet_hash,
)
if message is None:
return None
+7
View File
@@ -95,9 +95,12 @@ def broadcast_message(
message: Message,
broadcast_fn: BroadcastFn,
realtime: bool | None = None,
packet_hash: str | None = None,
) -> None:
"""Broadcast a message payload, preserving the caller's broadcast signature."""
payload = message.model_dump()
if packet_hash is not None:
payload["packet_hash"] = packet_hash
if realtime is None:
broadcast_fn("message", payload)
else:
@@ -272,6 +275,7 @@ async def create_message_from_decrypted(
channel_name: str | None = None,
realtime: bool = True,
broadcast_fn: BroadcastFn,
packet_hash: str | None = None,
) -> int | None:
"""Store and broadcast a decrypted channel message."""
received = received_at or int(time.time())
@@ -340,6 +344,7 @@ async def create_message_from_decrypted(
),
broadcast_fn=broadcast_fn,
realtime=realtime,
packet_hash=packet_hash,
)
return msg_id
@@ -359,6 +364,7 @@ async def create_dm_message_from_decrypted(
outgoing: bool = False,
realtime: bool = True,
broadcast_fn: BroadcastFn,
packet_hash: str | None = None,
) -> int | None:
"""Store and broadcast a decrypted direct message."""
from app.services.dm_ingest import ingest_decrypted_direct_message
@@ -375,6 +381,7 @@ async def create_dm_message_from_decrypted(
outgoing=outgoing,
realtime=realtime,
broadcast_fn=broadcast_fn,
packet_hash=packet_hash,
)
return message.id if message is not None else None
+1 -1
View File
@@ -17,7 +17,6 @@ from app.fanout.community_mqtt import (
_base64url_encode,
_build_radio_info,
_build_status_topic,
_calculate_packet_hash,
_decode_packet_fields,
_format_raw_packet,
_generate_jwt_token,
@@ -29,6 +28,7 @@ from app.fanout.mqtt_community import (
_render_packet_topic,
)
from app.keystore import ed25519_sign_expanded
from app.path_utils import calculate_packet_hash as _calculate_packet_hash
def _make_test_keys() -> tuple[bytes, bytes]:
+6
View File
@@ -37,6 +37,7 @@ class TestBotModuleParameterExtraction:
path,
is_outgoing,
path_bytes_per_hop,
packet_hash,
):
captured["is_outgoing"] = is_outgoing
captured["is_dm"] = is_dm
@@ -86,6 +87,7 @@ class TestBotModuleParameterExtraction:
path,
is_outgoing,
path_bytes_per_hop,
packet_hash,
):
captured["is_outgoing"] = is_outgoing
return None
@@ -132,6 +134,7 @@ class TestBotModuleParameterExtraction:
path,
is_outgoing,
path_bytes_per_hop,
packet_hash,
):
captured["path"] = path
captured["path_bytes_per_hop"] = path_bytes_per_hop
@@ -180,6 +183,7 @@ class TestBotModuleParameterExtraction:
path,
is_outgoing,
path_bytes_per_hop,
packet_hash,
):
captured["message_text"] = message_text
captured["sender_name"] = sender_name
@@ -228,6 +232,7 @@ class TestBotModuleParameterExtraction:
path,
is_outgoing,
path_bytes_per_hop,
packet_hash,
):
captured["channel_name"] = channel_name
return None
@@ -275,6 +280,7 @@ class TestBotModuleParameterExtraction:
path,
is_outgoing,
path_bytes_per_hop,
packet_hash,
):
captured["sender_name"] = sender_name
captured["sender_key"] = sender_key