From 67873e8dd9ff887cdbe3b89389b962ddb7f82d8e Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Fri, 3 Apr 2026 17:47:44 -0700 Subject: [PATCH] Drop some duplicated logic and defns --- app/path_utils.py | 48 ++++++++++++++++++++++++++++++++++++ app/repository/messages.py | 31 ++--------------------- app/repository/settings.py | 45 ++------------------------------- app/routers/radio.py | 5 +--- app/services/dm_ack_apply.py | 7 +----- app/services/dm_ingest.py | 6 ++--- app/services/message_send.py | 3 +-- tests/test_statistics.py | 2 +- 8 files changed, 58 insertions(+), 89 deletions(-) diff --git a/app/path_utils.py b/app/path_utils.py index 6a33faa..7d4d731 100644 --- a/app/path_utils.py +++ b/app/path_utils.py @@ -244,3 +244,51 @@ def parse_explicit_hop_route(route_text: str) -> tuple[str, int, int]: raise ValueError(f"Explicit path exceeds MAX_PATH_SIZE={MAX_PATH_SIZE} bytes") return "".join(hops), len(hops), hash_size - 1 + + +async def bucket_path_hash_widths(cursor, *, batch_size: int = 500) -> dict[str, int | float]: + """Bucket raw packet rows by hop hash width and return counts + percentages. + + *cursor* must be an already-executed async cursor whose rows have a ``data`` + column containing raw packet bytes. + """ + single_byte = 0 + double_byte = 0 + triple_byte = 0 + + while True: + rows = await cursor.fetchmany(batch_size) + if not rows: + break + for row in rows: + envelope = parse_packet_envelope(bytes(row["data"])) + if envelope is None: + continue + if envelope.hash_size == 1: + single_byte += 1 + elif envelope.hash_size == 2: + double_byte += 1 + elif envelope.hash_size == 3: + triple_byte += 1 + + total = single_byte + double_byte + triple_byte + if total == 0: + return { + "total_packets": 0, + "single_byte": 0, + "double_byte": 0, + "triple_byte": 0, + "single_byte_pct": 0.0, + "double_byte_pct": 0.0, + "triple_byte_pct": 0.0, + } + + return { + "total_packets": total, + "single_byte": single_byte, + "double_byte": double_byte, + "triple_byte": triple_byte, + "single_byte_pct": (single_byte / total) * 100, + "double_byte_pct": (double_byte / total) * 100, + "triple_byte_pct": (triple_byte / total) * 100, + } diff --git a/app/repository/messages.py b/app/repository/messages.py index cee28d3..d83e947 100644 --- a/app/repository/messages.py +++ b/app/repository/messages.py @@ -804,7 +804,7 @@ class MessageRepository: """ import time as _time - from app.path_utils import parse_packet_envelope + from app.path_utils import bucket_path_hash_widths now = int(_time.time()) t_1h = now - 3600 @@ -858,9 +858,6 @@ class MessageRepository: ] # Path hash width distribution for last 24h (in-Python parse of raw packet envelopes) - single_byte = 0 - double_byte = 0 - triple_byte = 0 cursor3 = await db.conn.execute( """ SELECT rp.data FROM raw_packets rp @@ -870,31 +867,7 @@ class MessageRepository: """, (conversation_key, t_24h), ) - while True: - batch = await cursor3.fetchmany(500) - if not batch: - break - for pkt_row in batch: - envelope = parse_packet_envelope(bytes(pkt_row["data"])) - if envelope is None: - continue - if envelope.hash_size == 1: - single_byte += 1 - elif envelope.hash_size == 2: - double_byte += 1 - elif envelope.hash_size == 3: - triple_byte += 1 - - hash_total = single_byte + double_byte + triple_byte - path_hash_width_24h = { - "total_packets": hash_total, - "single_byte": single_byte, - "double_byte": double_byte, - "triple_byte": triple_byte, - "single_byte_pct": (single_byte / hash_total * 100) if hash_total else 0.0, - "double_byte_pct": (double_byte / hash_total * 100) if hash_total else 0.0, - "triple_byte_pct": (triple_byte / hash_total * 100) if hash_total else 0.0, - } + path_hash_width_24h = await bucket_path_hash_widths(cursor3) return { "message_counts": message_counts, diff --git a/app/repository/settings.py b/app/repository/settings.py index 3f1b057..d7c82bb 100644 --- a/app/repository/settings.py +++ b/app/repository/settings.py @@ -5,7 +5,7 @@ from typing import Any, Literal from app.database import db from app.models import AppSettings, Favorite -from app.path_utils import parse_packet_envelope +from app.path_utils import bucket_path_hash_widths logger = logging.getLogger(__name__) @@ -324,48 +324,7 @@ class StatisticsRepository: "SELECT data FROM raw_packets WHERE timestamp >= ?", (now - SECONDS_24H,), ) - - single_byte = 0 - double_byte = 0 - triple_byte = 0 - - while True: - rows = await cursor.fetchmany(RAW_PACKET_STATS_BATCH_SIZE) - if not rows: - break - - for row in rows: - envelope = parse_packet_envelope(bytes(row["data"])) - if envelope is None: - continue - if envelope.hash_size == 1: - single_byte += 1 - elif envelope.hash_size == 2: - double_byte += 1 - elif envelope.hash_size == 3: - triple_byte += 1 - - total_packets = single_byte + double_byte + triple_byte - if total_packets == 0: - return { - "total_packets": 0, - "single_byte": 0, - "double_byte": 0, - "triple_byte": 0, - "single_byte_pct": 0.0, - "double_byte_pct": 0.0, - "triple_byte_pct": 0.0, - } - - return { - "total_packets": total_packets, - "single_byte": single_byte, - "double_byte": double_byte, - "triple_byte": triple_byte, - "single_byte_pct": (single_byte / total_packets) * 100, - "double_byte_pct": (double_byte / total_packets) * 100, - "triple_byte_pct": (triple_byte / total_packets) * 100, - } + return await bucket_path_hash_widths(cursor, batch_size=RAW_PACKET_STATS_BATCH_SIZE) @staticmethod async def get_all() -> dict: diff --git a/app/routers/radio.py b/app/routers/radio.py index 906500b..0bd4ce5 100644 --- a/app/routers/radio.py +++ b/app/routers/radio.py @@ -23,6 +23,7 @@ from app.models import ( from app.radio_sync import send_advertisement as do_send_advertisement from app.radio_sync import sync_radio_time from app.repository import ContactRepository +from app.routers.server_control import _monotonic from app.services.contact_reconciliation import ( promote_prefix_contacts_for_contact, reconcile_contact_messages, @@ -135,10 +136,6 @@ class RadioAdvertiseRequest(BaseModel): ) -def _monotonic() -> float: - return time.monotonic() - - def _better_signal(first: float | None, second: float | None) -> float | None: if first is None: return second diff --git a/app/services/dm_ack_apply.py b/app/services/dm_ack_apply.py index 66368dc..2e2cd9e 100644 --- a/app/services/dm_ack_apply.py +++ b/app/services/dm_ack_apply.py @@ -1,12 +1,7 @@ """Shared direct-message ACK application logic.""" -from collections.abc import Callable -from typing import Any - from app.services import dm_ack_tracker -from app.services.messages import increment_ack_and_broadcast - -BroadcastFn = Callable[..., Any] +from app.services.messages import BroadcastFn, increment_ack_and_broadcast async def apply_dm_ack_code(ack_code: str, *, broadcast_fn: BroadcastFn) -> bool: diff --git a/app/services/dm_ingest.py b/app/services/dm_ingest.py index 783cce4..00ec64a 100644 --- a/app/services/dm_ingest.py +++ b/app/services/dm_ingest.py @@ -1,9 +1,8 @@ import asyncio import logging import time -from collections.abc import Callable from dataclasses import dataclass -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING from app.models import CONTACT_TYPE_REPEATER, CONTACT_TYPE_ROOM, Contact, ContactUpsert, Message from app.repository import ( @@ -14,6 +13,7 @@ from app.repository import ( ) from app.services.contact_reconciliation import claim_prefix_messages_for_contact from app.services.messages import ( + BroadcastFn, broadcast_message, build_message_model, build_message_paths, @@ -27,8 +27,6 @@ if TYPE_CHECKING: from app.decoder import DecryptedDirectMessage logger = logging.getLogger(__name__) - -BroadcastFn = Callable[..., Any] _decrypted_dm_store_lock = asyncio.Lock() diff --git a/app/services/message_send.py b/app/services/message_send.py index 232ac15..dce3124 100644 --- a/app/services/message_send.py +++ b/app/services/message_send.py @@ -20,6 +20,7 @@ from app.repository import ( ) from app.services import dm_ack_tracker from app.services.messages import ( + BroadcastFn, broadcast_message, build_stored_outgoing_channel_message, create_outgoing_channel_message, @@ -33,8 +34,6 @@ NO_RADIO_RESPONSE_AFTER_SEND_DETAIL = ( "Send command was issued to the radio, but no response was heard back. " "The message may or may not have sent successfully." ) - -BroadcastFn = Callable[..., Any] TrackAckFn = Callable[[str, int, int], bool] NowFn = Callable[[], float] OutgoingReservationKey = tuple[str, str, str] diff --git a/tests/test_statistics.py b/tests/test_statistics.py index 79d5ff2..89c2d66 100644 --- a/tests/test_statistics.py +++ b/tests/test_statistics.py @@ -386,7 +386,7 @@ class TestPathHashWidthStats: with ( patch.object(test_db.conn, "execute", new=AsyncMock(return_value=fake_cursor)), - patch("app.repository.settings.parse_packet_envelope", side_effect=fake_parse), + patch("app.path_utils.parse_packet_envelope", side_effect=fake_parse), ): breakdown = await StatisticsRepository._path_hash_width_24h()