Add DB entry for outgoing inside the radio lock (didn't we just do the opposite?)

This commit is contained in:
Jack Kingsman
2026-03-19 20:43:35 -07:00
parent c2655c1809
commit 2b80760696
4 changed files with 170 additions and 54 deletions
+6
View File
@@ -554,6 +554,12 @@ class MessageRepository:
return MessageRepository._row_to_message(row)
@staticmethod
async def delete_by_id(message_id: int) -> None:
"""Delete a message row by ID."""
await db.conn.execute("DELETE FROM messages WHERE id = ?", (message_id,))
await db.conn.commit()
@staticmethod
async def get_by_content(
msg_type: str,
+64 -46
View File
@@ -13,7 +13,8 @@ from app.region_scope import normalize_region_scope
from app.repository import AppSettingsRepository, ContactRepository, MessageRepository
from app.services import dm_ack_tracker
from app.services.messages import (
build_message_model,
broadcast_message,
build_stored_outgoing_channel_message,
create_outgoing_channel_message,
create_outgoing_direct_message,
increment_ack_and_broadcast,
@@ -586,6 +587,23 @@ async def send_channel_message_to_channel(
requested_timestamp=sent_at,
)
timestamp_bytes = sender_timestamp.to_bytes(4, "little")
outgoing_message = await create_outgoing_channel_message(
conversation_key=channel_key_upper,
text=text_with_sender,
sender_timestamp=sender_timestamp,
received_at=sent_at,
sender_name=radio_name or None,
sender_key=our_public_key,
channel_name=channel.name,
broadcast_fn=broadcast_fn,
broadcast=False,
message_repository=message_repository,
)
if outgoing_message is None:
raise HTTPException(
status_code=500,
detail="Failed to store outgoing message - unexpected duplicate",
)
result = await send_channel_message_with_effective_scope(
mc=mc,
@@ -611,23 +629,11 @@ async def send_channel_message_to_channel(
raise HTTPException(
status_code=500, detail=f"Failed to send message: {result.payload}"
)
outgoing_message = await create_outgoing_channel_message(
conversation_key=channel_key_upper,
text=text_with_sender,
sender_timestamp=sender_timestamp,
received_at=sent_at,
sender_name=radio_name or None,
sender_key=our_public_key,
channel_name=channel.name,
broadcast_fn=broadcast_fn,
message_repository=message_repository,
)
if outgoing_message is None:
raise HTTPException(
status_code=500,
detail="Failed to store outgoing message - unexpected duplicate",
)
except Exception:
if outgoing_message is not None:
await message_repository.delete_by_id(outgoing_message.id)
outgoing_message = None
raise
finally:
if sender_timestamp is not None:
await release_outgoing_sender_timestamp(
@@ -640,22 +646,19 @@ async def send_channel_message_to_channel(
if sent_at is None or sender_timestamp is None or outgoing_message is None:
raise HTTPException(status_code=500, detail="Failed to store outgoing message")
message_id = outgoing_message.id
acked_count, paths = await message_repository.get_ack_and_paths(message_id)
return build_message_model(
message_id=message_id,
msg_type="CHAN",
outgoing_message = await build_stored_outgoing_channel_message(
message_id=outgoing_message.id,
conversation_key=channel_key_upper,
text=text_with_sender,
sender_timestamp=sender_timestamp,
received_at=sent_at,
paths=paths,
outgoing=True,
acked=acked_count,
sender_name=radio_name or None,
sender_key=our_public_key,
channel_name=channel.name,
message_repository=message_repository,
)
broadcast_message(message=outgoing_message, broadcast_fn=broadcast_fn)
return outgoing_message
async def resend_channel_message_record(
@@ -705,6 +708,23 @@ async def resend_channel_message_record(
requested_timestamp=sent_at,
)
timestamp_bytes = sender_timestamp.to_bytes(4, "little")
new_message = await create_outgoing_channel_message(
conversation_key=message.conversation_key,
text=message.text,
sender_timestamp=sender_timestamp,
received_at=sent_at,
sender_name=radio_name or None,
sender_key=resend_public_key,
channel_name=channel.name,
broadcast_fn=broadcast_fn,
broadcast=False,
message_repository=message_repository,
)
if new_message is None:
raise HTTPException(
status_code=500,
detail="Failed to store resent message - unexpected duplicate",
)
result = await send_channel_message_with_effective_scope(
mc=mc,
@@ -729,26 +749,11 @@ async def resend_channel_message_record(
status_code=500,
detail=f"Failed to resend message: {result.payload}",
)
if new_timestamp:
if sent_at is None:
raise HTTPException(status_code=500, detail="Failed to assign resend timestamp")
new_message = await create_outgoing_channel_message(
conversation_key=message.conversation_key,
text=message.text,
sender_timestamp=sender_timestamp,
received_at=sent_at,
sender_name=radio_name or None,
sender_key=resend_public_key,
channel_name=channel.name,
broadcast_fn=broadcast_fn,
message_repository=message_repository,
)
if new_message is None:
raise HTTPException(
status_code=500,
detail="Failed to store resent message - unexpected duplicate",
)
except Exception:
if new_message is not None:
await message_repository.delete_by_id(new_message.id)
new_message = None
raise
finally:
if new_timestamp and sent_at is not None:
await release_outgoing_sender_timestamp(
@@ -762,6 +767,19 @@ async def resend_channel_message_record(
if sent_at is None or new_message is None:
raise HTTPException(status_code=500, detail="Failed to assign resend timestamp")
new_message = await build_stored_outgoing_channel_message(
message_id=new_message.id,
conversation_key=message.conversation_key,
text=message.text,
sender_timestamp=sender_timestamp,
received_at=sent_at,
sender_name=radio_name or None,
sender_key=resend_public_key,
channel_name=channel.name,
message_repository=message_repository,
)
broadcast_message(message=new_message, broadcast_fn=broadcast_fn)
logger.info(
"Resent channel message %d as new message %d to %s",
message.id,
+35 -5
View File
@@ -96,6 +96,36 @@ def broadcast_message(
broadcast_fn("message", payload, realtime=realtime)
async def build_stored_outgoing_channel_message(
*,
message_id: int,
conversation_key: str,
text: str,
sender_timestamp: int,
received_at: int,
sender_name: str | None,
sender_key: str | None,
channel_name: str | None,
message_repository=MessageRepository,
) -> Message:
"""Build the current payload for a stored outgoing channel message."""
acked_count, paths = await message_repository.get_ack_and_paths(message_id)
return build_message_model(
message_id=message_id,
msg_type="CHAN",
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
received_at=received_at,
paths=paths,
outgoing=True,
acked=acked_count,
sender_name=sender_name,
sender_key=sender_key,
channel_name=channel_name,
)
def broadcast_message_acked(
*,
message_id: int,
@@ -428,6 +458,7 @@ async def create_outgoing_channel_message(
sender_key: str | None,
channel_name: str | None,
broadcast_fn: BroadcastFn,
broadcast: bool = True,
message_repository=MessageRepository,
) -> Message | None:
"""Store and broadcast an outgoing channel message."""
@@ -444,18 +475,17 @@ async def create_outgoing_channel_message(
if msg_id is None:
return None
message = build_message_model(
message = await build_stored_outgoing_channel_message(
message_id=msg_id,
msg_type="CHAN",
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
received_at=received_at,
outgoing=True,
acked=0,
sender_name=sender_name,
sender_key=sender_key,
channel_name=channel_name,
message_repository=message_repository,
)
broadcast_message(message=message, broadcast_fn=broadcast_fn)
if broadcast:
broadcast_message(message=message, broadcast_fn=broadcast_fn)
return message
+65 -3
View File
@@ -1521,10 +1521,10 @@ class TestConcurrentChannelSends:
class TestChannelSendLockScope:
"""Channel send should release the radio lock before DB persistence work."""
"""Channel send should persist the outgoing row while the radio lock is held."""
@pytest.mark.asyncio
async def test_channel_message_row_created_after_radio_lock_released(self, test_db):
async def test_channel_message_row_created_inside_radio_lock(self, test_db):
mc = _make_mc(name="TestNode")
chan_key = "de" * 16
await ChannelRepository.upsert(key=chan_key, name="#lockscope")
@@ -1549,4 +1549,66 @@ class TestChannelSendLockScope:
SendChannelMessageRequest(channel_key=chan_key, text="Lock scope test")
)
assert observed_lock_states == [False]
assert observed_lock_states == [True]
@pytest.mark.asyncio
async def test_channel_self_observation_during_send_reconciles_to_reserved_outgoing_row(
self, test_db
):
"""A self-observation that arrives during send should update the reserved outgoing row."""
from app.services.messages import create_fallback_channel_message
mc = _make_mc(name="TestNode")
chan_key = "ef" * 16
await ChannelRepository.upsert(key=chan_key, name="#race")
broadcasts = []
def capture_broadcast(event_type, data, *args, **kwargs):
broadcasts.append({"type": event_type, "data": data})
async def send_with_self_observation(*args, **kwargs):
timestamp_bytes = kwargs["timestamp"]
sender_timestamp = int.from_bytes(timestamp_bytes, "little")
await create_fallback_channel_message(
conversation_key=chan_key.upper(),
message_text="Hello race",
sender_timestamp=sender_timestamp,
received_at=int(time.time()),
path="a1b2",
path_len=2,
txt_type=0,
sender_name="TestNode",
channel_name="#race",
broadcast_fn=capture_broadcast,
)
return _make_radio_result()
mc.commands.send_chan_msg = AsyncMock(side_effect=send_with_self_observation)
with (
patch("app.routers.messages.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch("app.routers.messages.broadcast_event", side_effect=capture_broadcast),
):
message = await send_channel_message(
SendChannelMessageRequest(channel_key=chan_key, text="Hello race")
)
assert message.outgoing is True
assert message.acked == 1
assert message.paths is not None
assert len(message.paths) == 1
assert message.paths[0].path == "a1b2"
stored = await MessageRepository.get_all(
msg_type="CHAN", conversation_key=chan_key.upper(), limit=10
)
assert len(stored) == 1
assert stored[0].outgoing is True
assert stored[0].acked == 1
message_events = [entry for entry in broadcasts if entry["type"] == "message"]
ack_events = [entry for entry in broadcasts if entry["type"] == "message_acked"]
assert len(message_events) == 1
assert len(ack_events) == 1