From 2b80760696bde0c9e74ec4b01f494e959251d7fa Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Thu, 19 Mar 2026 20:43:35 -0700 Subject: [PATCH] Add DB entry for outgoing inside the radio lock (didn't we just do the opposite?) --- app/repository/messages.py | 6 ++ app/services/message_send.py | 110 ++++++++++++++++++++--------------- app/services/messages.py | 40 +++++++++++-- tests/test_send_messages.py | 68 +++++++++++++++++++++- 4 files changed, 170 insertions(+), 54 deletions(-) diff --git a/app/repository/messages.py b/app/repository/messages.py index 8b73cf6..945017e 100644 --- a/app/repository/messages.py +++ b/app/repository/messages.py @@ -554,6 +554,12 @@ class MessageRepository: return MessageRepository._row_to_message(row) + @staticmethod + async def delete_by_id(message_id: int) -> None: + """Delete a message row by ID.""" + await db.conn.execute("DELETE FROM messages WHERE id = ?", (message_id,)) + await db.conn.commit() + @staticmethod async def get_by_content( msg_type: str, diff --git a/app/services/message_send.py b/app/services/message_send.py index 841efa7..d93136a 100644 --- a/app/services/message_send.py +++ b/app/services/message_send.py @@ -13,7 +13,8 @@ from app.region_scope import normalize_region_scope from app.repository import AppSettingsRepository, ContactRepository, MessageRepository from app.services import dm_ack_tracker from app.services.messages import ( - build_message_model, + broadcast_message, + build_stored_outgoing_channel_message, create_outgoing_channel_message, create_outgoing_direct_message, increment_ack_and_broadcast, @@ -586,6 +587,23 @@ async def send_channel_message_to_channel( requested_timestamp=sent_at, ) timestamp_bytes = sender_timestamp.to_bytes(4, "little") + outgoing_message = await create_outgoing_channel_message( + conversation_key=channel_key_upper, + text=text_with_sender, + sender_timestamp=sender_timestamp, + received_at=sent_at, + sender_name=radio_name or None, + sender_key=our_public_key, + channel_name=channel.name, + broadcast_fn=broadcast_fn, + broadcast=False, + message_repository=message_repository, + ) + if outgoing_message is None: + raise HTTPException( + status_code=500, + detail="Failed to store outgoing message - unexpected duplicate", + ) result = await send_channel_message_with_effective_scope( mc=mc, @@ -611,23 +629,11 @@ async def send_channel_message_to_channel( raise HTTPException( status_code=500, detail=f"Failed to send message: {result.payload}" ) - - outgoing_message = await create_outgoing_channel_message( - conversation_key=channel_key_upper, - text=text_with_sender, - sender_timestamp=sender_timestamp, - received_at=sent_at, - sender_name=radio_name or None, - sender_key=our_public_key, - channel_name=channel.name, - broadcast_fn=broadcast_fn, - message_repository=message_repository, - ) - if outgoing_message is None: - raise HTTPException( - status_code=500, - detail="Failed to store outgoing message - unexpected duplicate", - ) + except Exception: + if outgoing_message is not None: + await message_repository.delete_by_id(outgoing_message.id) + outgoing_message = None + raise finally: if sender_timestamp is not None: await release_outgoing_sender_timestamp( @@ -640,22 +646,19 @@ async def send_channel_message_to_channel( if sent_at is None or sender_timestamp is None or outgoing_message is None: raise HTTPException(status_code=500, detail="Failed to store outgoing message") - message_id = outgoing_message.id - acked_count, paths = await message_repository.get_ack_and_paths(message_id) - return build_message_model( - message_id=message_id, - msg_type="CHAN", + outgoing_message = await build_stored_outgoing_channel_message( + message_id=outgoing_message.id, conversation_key=channel_key_upper, text=text_with_sender, sender_timestamp=sender_timestamp, received_at=sent_at, - paths=paths, - outgoing=True, - acked=acked_count, sender_name=radio_name or None, sender_key=our_public_key, channel_name=channel.name, + message_repository=message_repository, ) + broadcast_message(message=outgoing_message, broadcast_fn=broadcast_fn) + return outgoing_message async def resend_channel_message_record( @@ -705,6 +708,23 @@ async def resend_channel_message_record( requested_timestamp=sent_at, ) timestamp_bytes = sender_timestamp.to_bytes(4, "little") + new_message = await create_outgoing_channel_message( + conversation_key=message.conversation_key, + text=message.text, + sender_timestamp=sender_timestamp, + received_at=sent_at, + sender_name=radio_name or None, + sender_key=resend_public_key, + channel_name=channel.name, + broadcast_fn=broadcast_fn, + broadcast=False, + message_repository=message_repository, + ) + if new_message is None: + raise HTTPException( + status_code=500, + detail="Failed to store resent message - unexpected duplicate", + ) result = await send_channel_message_with_effective_scope( mc=mc, @@ -729,26 +749,11 @@ async def resend_channel_message_record( status_code=500, detail=f"Failed to resend message: {result.payload}", ) - - if new_timestamp: - if sent_at is None: - raise HTTPException(status_code=500, detail="Failed to assign resend timestamp") - new_message = await create_outgoing_channel_message( - conversation_key=message.conversation_key, - text=message.text, - sender_timestamp=sender_timestamp, - received_at=sent_at, - sender_name=radio_name or None, - sender_key=resend_public_key, - channel_name=channel.name, - broadcast_fn=broadcast_fn, - message_repository=message_repository, - ) - if new_message is None: - raise HTTPException( - status_code=500, - detail="Failed to store resent message - unexpected duplicate", - ) + except Exception: + if new_message is not None: + await message_repository.delete_by_id(new_message.id) + new_message = None + raise finally: if new_timestamp and sent_at is not None: await release_outgoing_sender_timestamp( @@ -762,6 +767,19 @@ async def resend_channel_message_record( if sent_at is None or new_message is None: raise HTTPException(status_code=500, detail="Failed to assign resend timestamp") + new_message = await build_stored_outgoing_channel_message( + message_id=new_message.id, + conversation_key=message.conversation_key, + text=message.text, + sender_timestamp=sender_timestamp, + received_at=sent_at, + sender_name=radio_name or None, + sender_key=resend_public_key, + channel_name=channel.name, + message_repository=message_repository, + ) + broadcast_message(message=new_message, broadcast_fn=broadcast_fn) + logger.info( "Resent channel message %d as new message %d to %s", message.id, diff --git a/app/services/messages.py b/app/services/messages.py index da67882..5508a6a 100644 --- a/app/services/messages.py +++ b/app/services/messages.py @@ -96,6 +96,36 @@ def broadcast_message( broadcast_fn("message", payload, realtime=realtime) +async def build_stored_outgoing_channel_message( + *, + message_id: int, + conversation_key: str, + text: str, + sender_timestamp: int, + received_at: int, + sender_name: str | None, + sender_key: str | None, + channel_name: str | None, + message_repository=MessageRepository, +) -> Message: + """Build the current payload for a stored outgoing channel message.""" + acked_count, paths = await message_repository.get_ack_and_paths(message_id) + return build_message_model( + message_id=message_id, + msg_type="CHAN", + conversation_key=conversation_key, + text=text, + sender_timestamp=sender_timestamp, + received_at=received_at, + paths=paths, + outgoing=True, + acked=acked_count, + sender_name=sender_name, + sender_key=sender_key, + channel_name=channel_name, + ) + + def broadcast_message_acked( *, message_id: int, @@ -428,6 +458,7 @@ async def create_outgoing_channel_message( sender_key: str | None, channel_name: str | None, broadcast_fn: BroadcastFn, + broadcast: bool = True, message_repository=MessageRepository, ) -> Message | None: """Store and broadcast an outgoing channel message.""" @@ -444,18 +475,17 @@ async def create_outgoing_channel_message( if msg_id is None: return None - message = build_message_model( + message = await build_stored_outgoing_channel_message( message_id=msg_id, - msg_type="CHAN", conversation_key=conversation_key, text=text, sender_timestamp=sender_timestamp, received_at=received_at, - outgoing=True, - acked=0, sender_name=sender_name, sender_key=sender_key, channel_name=channel_name, + message_repository=message_repository, ) - broadcast_message(message=message, broadcast_fn=broadcast_fn) + if broadcast: + broadcast_message(message=message, broadcast_fn=broadcast_fn) return message diff --git a/tests/test_send_messages.py b/tests/test_send_messages.py index 41a8737..2b53531 100644 --- a/tests/test_send_messages.py +++ b/tests/test_send_messages.py @@ -1521,10 +1521,10 @@ class TestConcurrentChannelSends: class TestChannelSendLockScope: - """Channel send should release the radio lock before DB persistence work.""" + """Channel send should persist the outgoing row while the radio lock is held.""" @pytest.mark.asyncio - async def test_channel_message_row_created_after_radio_lock_released(self, test_db): + async def test_channel_message_row_created_inside_radio_lock(self, test_db): mc = _make_mc(name="TestNode") chan_key = "de" * 16 await ChannelRepository.upsert(key=chan_key, name="#lockscope") @@ -1549,4 +1549,66 @@ class TestChannelSendLockScope: SendChannelMessageRequest(channel_key=chan_key, text="Lock scope test") ) - assert observed_lock_states == [False] + assert observed_lock_states == [True] + + @pytest.mark.asyncio + async def test_channel_self_observation_during_send_reconciles_to_reserved_outgoing_row( + self, test_db + ): + """A self-observation that arrives during send should update the reserved outgoing row.""" + from app.services.messages import create_fallback_channel_message + + mc = _make_mc(name="TestNode") + chan_key = "ef" * 16 + await ChannelRepository.upsert(key=chan_key, name="#race") + + broadcasts = [] + + def capture_broadcast(event_type, data, *args, **kwargs): + broadcasts.append({"type": event_type, "data": data}) + + async def send_with_self_observation(*args, **kwargs): + timestamp_bytes = kwargs["timestamp"] + sender_timestamp = int.from_bytes(timestamp_bytes, "little") + await create_fallback_channel_message( + conversation_key=chan_key.upper(), + message_text="Hello race", + sender_timestamp=sender_timestamp, + received_at=int(time.time()), + path="a1b2", + path_len=2, + txt_type=0, + sender_name="TestNode", + channel_name="#race", + broadcast_fn=capture_broadcast, + ) + return _make_radio_result() + + mc.commands.send_chan_msg = AsyncMock(side_effect=send_with_self_observation) + + with ( + patch("app.routers.messages.require_connected", return_value=mc), + patch.object(radio_manager, "_meshcore", mc), + patch("app.routers.messages.broadcast_event", side_effect=capture_broadcast), + ): + message = await send_channel_message( + SendChannelMessageRequest(channel_key=chan_key, text="Hello race") + ) + + assert message.outgoing is True + assert message.acked == 1 + assert message.paths is not None + assert len(message.paths) == 1 + assert message.paths[0].path == "a1b2" + + stored = await MessageRepository.get_all( + msg_type="CHAN", conversation_key=chan_key.upper(), limit=10 + ) + assert len(stored) == 1 + assert stored[0].outgoing is True + assert stored[0].acked == 1 + + message_events = [entry for entry in broadcasts if entry["type"] == "message"] + ack_events = [entry for entry in broadcasts if entry["type"] == "message_acked"] + assert len(message_events) == 1 + assert len(ack_events) == 1