Compact code bloat for message fire and channel sync loops

This commit is contained in:
Jack Kingsman
2026-02-27 17:03:18 -08:00
parent 884972f9e0
commit 17f6a2b8c5
4 changed files with 81 additions and 84 deletions

View File

@@ -5,7 +5,7 @@ from typing import TYPE_CHECKING
from meshcore import EventType
from app.models import CONTACT_TYPE_REPEATER, Contact
from app.models import CONTACT_TYPE_REPEATER, Contact, Message, MessagePath
from app.packet_processor import process_raw_packet
from app.repository import (
AmbiguousPublicKeyPrefixError,
@@ -126,24 +126,22 @@ async def on_contact_message(event: "Event") -> None:
# Build paths array for broadcast
path = payload.get("path")
paths = [{"path": path or "", "received_at": received_at}] if path is not None else None
paths = [MessagePath(path=path or "", received_at=received_at)] if path is not None else None
# Broadcast the new message
broadcast_event(
"message",
{
"id": msg_id,
"type": "PRIV",
"conversation_key": sender_pubkey,
"text": payload.get("text", ""),
"sender_timestamp": payload.get("sender_timestamp") or received_at,
"received_at": received_at,
"paths": paths,
"txt_type": txt_type,
"signature": payload.get("signature"),
"outgoing": False,
"acked": 0,
},
Message(
id=msg_id,
type="PRIV",
conversation_key=sender_pubkey,
text=payload.get("text", ""),
sender_timestamp=payload.get("sender_timestamp") or received_at,
received_at=received_at,
paths=paths,
txt_type=txt_type,
signature=payload.get("signature"),
).model_dump(),
)
# Update contact last_contacted (contact was already fetched above)

View File

@@ -28,7 +28,13 @@ from app.decoder import (
try_decrypt_packet_with_channel_key,
)
from app.keystore import get_private_key, get_public_key, has_private_key
from app.models import CONTACT_TYPE_REPEATER, RawPacketBroadcast, RawPacketDecryptedInfo
from app.models import (
CONTACT_TYPE_REPEATER,
Message,
MessagePath,
RawPacketBroadcast,
RawPacketDecryptedInfo,
)
from app.repository import (
ChannelRepository,
ContactAdvertPathRepository,
@@ -187,24 +193,20 @@ async def create_message_from_decrypted(
# Build paths array for broadcast
# Use "is not None" to include empty string (direct/0-hop messages)
paths = [{"path": path or "", "received_at": received}] if path is not None else None
paths = [MessagePath(path=path or "", received_at=received)] if path is not None else None
# Broadcast new message to connected clients
broadcast_event(
"message",
{
"id": msg_id,
"type": "CHAN",
"conversation_key": channel_key_normalized,
"text": text,
"sender_timestamp": timestamp,
"received_at": received,
"paths": paths,
"txt_type": 0,
"signature": None,
"outgoing": False,
"acked": 0,
},
Message(
id=msg_id,
type="CHAN",
conversation_key=channel_key_normalized,
text=text,
sender_timestamp=timestamp,
received_at=received,
paths=paths,
).model_dump(),
)
# Run bot if enabled (for incoming channel messages, not historical decryption)
@@ -307,24 +309,21 @@ async def create_dm_message_from_decrypted(
await RawPacketRepository.mark_decrypted(packet_id, msg_id)
# Build paths array for broadcast
paths = [{"path": path or "", "received_at": received}] if path is not None else None
paths = [MessagePath(path=path or "", received_at=received)] if path is not None else None
# Broadcast new message to connected clients
broadcast_event(
"message",
{
"id": msg_id,
"type": "PRIV",
"conversation_key": conversation_key,
"text": decrypted.message,
"sender_timestamp": decrypted.timestamp,
"received_at": received,
"paths": paths,
"txt_type": 0,
"signature": None,
"outgoing": outgoing,
"acked": 0,
},
Message(
id=msg_id,
type="PRIV",
conversation_key=conversation_key,
text=decrypted.message,
sender_timestamp=decrypted.timestamp,
received_at=received,
paths=paths,
outgoing=outgoing,
).model_dump(),
)
# Update contact's last_contacted timestamp (for sorting)

View File

@@ -29,6 +29,33 @@ from app.repository import (
logger = logging.getLogger(__name__)
async def upsert_channel_from_radio_slot(payload: dict, *, on_radio: bool) -> str | None:
"""Parse a radio channel-slot payload and upsert to the database.
Returns the uppercase hex key if a channel was upserted, or None if the
slot was empty/invalid.
"""
name = payload.get("channel_name", "")
secret = payload.get("channel_secret", b"")
# Skip empty channels
if not name or name == "\x00" * len(name):
return None
is_hashtag = name.startswith("#")
key_bytes = secret if isinstance(secret, bytes) else bytes(secret)
key_hex = key_bytes.hex().upper()
await ChannelRepository.upsert(
key=key_hex,
name=name,
is_hashtag=is_hashtag,
on_radio=on_radio,
)
return key_hex
# Message poll task handle
_message_poll_task: asyncio.Task | None = None
@@ -172,29 +199,15 @@ async def sync_and_offload_channels(mc: MeshCore) -> dict:
if result.type != EventType.CHANNEL_INFO:
continue
payload = result.payload
name = payload.get("channel_name", "")
secret = payload.get("channel_secret", b"")
# Skip empty channels
if not name or name == "\x00" * len(name):
continue
is_hashtag = name.startswith("#")
# Convert key bytes to hex string
key_bytes = secret if isinstance(secret, bytes) else bytes(secret)
key_hex = key_bytes.hex().upper()
# Save to database
await ChannelRepository.upsert(
key=key_hex,
name=name,
is_hashtag=is_hashtag,
key_hex = await upsert_channel_from_radio_slot(
result.payload,
on_radio=False, # We're about to clear it
)
if key_hex is None:
continue
synced += 1
logger.debug("Synced channel %s: %s", key_hex[:8], name)
logger.debug("Synced channel %s: %s", key_hex[:8], result.payload.get("channel_name"))
# Clear from radio (set empty name and zero key)
try:

View File

@@ -8,6 +8,7 @@ from pydantic import BaseModel, Field
from app.dependencies import require_connected
from app.models import Channel
from app.radio import radio_manager
from app.radio_sync import upsert_channel_from_radio_slot
from app.repository import ChannelRepository
logger = logging.getLogger(__name__)
@@ -92,26 +93,12 @@ async def sync_channels_from_radio(max_channels: int = Query(default=40, ge=1, l
result = await mc.commands.get_channel(idx)
if result.type == EventType.CHANNEL_INFO:
payload = result.payload
name = payload.get("channel_name", "")
secret = payload.get("channel_secret", b"")
# Skip empty channels
if not name or name == "\x00" * len(name):
continue
is_hashtag = name.startswith("#")
key_bytes = secret if isinstance(secret, bytes) else bytes(secret)
key_hex = key_bytes.hex().upper()
await ChannelRepository.upsert(
key=key_hex,
name=name,
is_hashtag=is_hashtag,
on_radio=True,
)
count += 1
logger.debug("Synced channel %s: %s", key_hex, name)
key_hex = await upsert_channel_from_radio_slot(result.payload, on_radio=True)
if key_hex is not None:
count += 1
logger.debug(
"Synced channel %s: %s", key_hex, result.payload.get("channel_name")
)
logger.info("Synced %d channels from radio", count)
return {"synced": count}