Tighten up message broadcast contract

This commit is contained in:
Jack Kingsman
2026-03-06 15:55:04 -08:00
parent 3330028d27
commit dd13768a44
6 changed files with 220 additions and 8 deletions

View File

@@ -122,6 +122,28 @@ THE SOFTWARE.
</details>
### httpx (0.28.1) — BSD License
<details>
<summary>Full license text</summary>
```
Copyright © 2019, [Encode OSS Ltd](https://www.encode.io/).
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
```
</details>
### meshcore (2.2.5) — MIT
<details>

View File

@@ -56,7 +56,7 @@ class BotModule(FanoutModule):
# Extract bot parameters from broadcast data
if is_dm:
conversation_key = data.get("conversation_key", "")
sender_key = conversation_key
sender_key = data.get("sender_key") or conversation_key
is_outgoing = data.get("outgoing", False)
message_text = data.get("text", "")
channel_key = None
@@ -66,10 +66,12 @@ class BotModule(FanoutModule):
if is_outgoing:
sender_name = None
else:
from app.repository import ContactRepository
sender_name = data.get("sender_name")
if sender_name is None:
from app.repository import ContactRepository
contact = await ContactRepository.get_by_key(conversation_key)
sender_name = contact.name if contact else None
contact = await ContactRepository.get_by_key(conversation_key)
sender_name = contact.name if contact else None
else:
conversation_key = data.get("conversation_key", "")
sender_key = None
@@ -77,11 +79,12 @@ class BotModule(FanoutModule):
sender_name = data.get("sender_name")
channel_key = conversation_key
# Look up channel name
from app.repository import ChannelRepository
channel_name = data.get("channel_name")
if channel_name is None:
from app.repository import ChannelRepository
channel = await ChannelRepository.get_by_key(conversation_key)
channel_name = channel.name if channel else None
channel = await ChannelRepository.get_by_key(conversation_key)
channel_name = channel.name if channel else None
# Strip "sender: " prefix from channel message text
text = data.get("text", "")

View File

@@ -296,6 +296,7 @@ async def send_channel_message(request: SendChannelMessageRequest) -> Message:
acked=0,
sender_name=radio_name or None,
sender_key=our_public_key,
channel_name=db_channel.name,
).model_dump(),
)
@@ -316,6 +317,7 @@ async def send_channel_message(request: SendChannelMessageRequest) -> Message:
paths=paths,
sender_name=radio_name or None,
sender_key=our_public_key,
channel_name=db_channel.name,
)
return message
@@ -444,6 +446,7 @@ async def resend_channel_message(
acked=0,
sender_name=radio_name or None,
sender_key=resend_public_key,
channel_name=db_channel.name,
).model_dump(),
)

View File

@@ -203,6 +203,101 @@ class TestBotModuleParameterExtraction:
assert captured["message_text"] == "the actual message"
assert captured["sender_name"] == "Alice"
@pytest.mark.asyncio
async def test_channel_name_uses_payload_before_db_lookup(self):
"""Channel fanout payload channel_name is preserved even if the DB lookup misses."""
from app.fanout.bot import BotModule
captured = {}
def fake_execute(
code,
sender_name,
sender_key,
message_text,
is_dm,
channel_key,
channel_name,
sender_timestamp,
path,
is_outgoing,
):
captured["channel_name"] = channel_name
return None
mod = BotModule("test", {"code": "def bot(**k): pass"}, name="Test")
with (
patch("app.fanout.bot_exec.execute_bot_code", side_effect=fake_execute),
patch(
"app.fanout.bot_exec._bot_semaphore",
MagicMock(__aenter__=AsyncMock(), __aexit__=AsyncMock()),
),
patch("app.fanout.bot.asyncio.sleep", new_callable=AsyncMock),
patch("app.repository.ChannelRepository") as mock_chan,
):
mock_chan.get_by_key = AsyncMock(return_value=None)
await mod._run_for_message(
{
"type": "CHAN",
"conversation_key": "ch1",
"channel_name": "#payload",
"text": "Alice: hello",
"sender_name": "Alice",
}
)
assert captured["channel_name"] == "#payload"
@pytest.mark.asyncio
async def test_dm_sender_name_uses_payload_before_db_lookup(self):
"""Incoming DM sender_name from the message payload should be preserved."""
from app.fanout.bot import BotModule
captured = {}
def fake_execute(
code,
sender_name,
sender_key,
message_text,
is_dm,
channel_key,
channel_name,
sender_timestamp,
path,
is_outgoing,
):
captured["sender_name"] = sender_name
captured["sender_key"] = sender_key
return None
mod = BotModule("test", {"code": "def bot(**k): pass"}, name="Test")
with (
patch("app.fanout.bot_exec.execute_bot_code", side_effect=fake_execute),
patch(
"app.fanout.bot_exec._bot_semaphore",
MagicMock(__aenter__=AsyncMock(), __aexit__=AsyncMock()),
),
patch("app.fanout.bot.asyncio.sleep", new_callable=AsyncMock),
patch("app.repository.ContactRepository") as mock_contact,
):
mock_contact.get_by_key = AsyncMock(return_value=None)
await mod._run_for_message(
{
"type": "PRIV",
"conversation_key": "pk1",
"sender_name": "PayloadAlice",
"sender_key": "pk1",
"text": "hello",
"outgoing": False,
}
)
assert captured["sender_name"] == "PayloadAlice"
assert captured["sender_key"] == "pk1"
# ---------------------------------------------------------------------------
# T2: Migration 036, 037, 038 tests

View File

@@ -196,6 +196,54 @@ class TestFanoutMqttIntegration:
assert "alpha/dm:pk1" in topics
assert "beta/dm:pk1" in topics
@pytest.mark.asyncio
async def test_private_mqtt_preserves_full_message_payload(self, mqtt_broker, integration_db):
"""Private MQTT publishes the full message payload without dropping fields."""
from unittest.mock import patch
cfg = await FanoutConfigRepository.create(
config_type="mqtt_private",
name="Full Payload",
config=_private_config(mqtt_broker.port, "mesh"),
scope={"messages": "all", "raw_packets": "all"},
enabled=True,
)
payload = {
"type": "CHAN",
"conversation_key": "ch1",
"channel_name": "#general",
"text": "Alice: hello mqtt",
"sender_name": "Alice",
"sender_key": "ab" * 32,
"sender_timestamp": 1700000000,
"received_at": 1700000001,
"paths": [{"path": "aabb", "received_at": 1700000001}],
"outgoing": False,
"acked": 2,
}
manager = FanoutManager()
with (
patch("app.fanout.mqtt_base._broadcast_health"),
patch("app.websocket.broadcast_success"),
patch("app.websocket.broadcast_error"),
patch("app.websocket.broadcast_health"),
):
try:
await manager.load_from_db()
await _wait_connected(manager, cfg["id"])
await manager.broadcast_message(payload)
messages = await mqtt_broker.wait_for(1)
finally:
await manager.stop_all()
assert len(messages) == 1
topic, body = messages[0]
assert topic == "mesh/gm:ch1"
assert body == payload
@pytest.mark.asyncio
async def test_one_disabled_only_enabled_receives(self, mqtt_broker, integration_db):
"""Disabled integration must not publish any messages."""
@@ -565,6 +613,44 @@ class TestFanoutWebhookIntegration:
assert len(results) == 1
assert results[0]["headers"].get("x-custom") == "my-value"
@pytest.mark.asyncio
async def test_webhook_preserves_full_message_payload(self, webhook_server, integration_db):
"""Webhook delivers the full message payload body without dropping fields."""
cfg = await FanoutConfigRepository.create(
config_type="webhook",
name="Full Payload Hook",
config=_webhook_config(webhook_server.port),
scope={"messages": "all", "raw_packets": "none"},
enabled=True,
)
payload = {
"type": "CHAN",
"conversation_key": "ch1",
"channel_name": "#general",
"text": "Alice: hello webhook",
"sender_name": "Alice",
"sender_key": "ab" * 32,
"sender_timestamp": 1700000000,
"received_at": 1700000001,
"paths": [{"path": "aabb", "received_at": 1700000001}],
"outgoing": False,
"acked": 2,
}
manager = FanoutManager()
try:
await manager.load_from_db()
await _wait_connected(manager, cfg["id"])
await manager.broadcast_message(payload)
results = await webhook_server.wait_for(1)
finally:
await manager.stop_all()
assert len(results) == 1
assert results[0]["body"] == payload
@pytest.mark.asyncio
async def test_webhook_hmac_signature(self, webhook_server, integration_db):
"""Webhook sends HMAC-SHA256 signature when hmac_secret is configured."""

View File

@@ -157,6 +157,7 @@ class TestOutgoingChannelBroadcast:
assert data["type"] == "CHAN"
assert data["conversation_key"] == chan_key.upper()
assert data["sender_name"] == "MyNode"
assert data["channel_name"] == "#general"
@pytest.mark.asyncio
async def test_send_channel_msg_response_includes_current_ack_count(self, test_db):
@@ -177,6 +178,7 @@ class TestOutgoingChannelBroadcast:
# Fresh message has acked=0
assert message.id is not None
assert message.acked == 0
assert message.channel_name == "#acked"
@pytest.mark.asyncio
async def test_send_channel_msg_includes_sender_key(self, test_db):
@@ -498,6 +500,7 @@ class TestResendChannelMessage:
assert event_type == "message"
assert event_data["id"] == result["message_id"]
assert event_data["outgoing"] is True
assert event_data["channel_name"] == "#broadcast"
@pytest.mark.asyncio
async def test_resend_byte_perfect_still_enforces_window(self, test_db):