Drop some duplicated logic and defns

This commit is contained in:
Jack Kingsman
2026-04-03 17:47:44 -07:00
parent e2ddf5f79f
commit 67873e8dd9
8 changed files with 58 additions and 89 deletions

View File

@@ -244,3 +244,51 @@ def parse_explicit_hop_route(route_text: str) -> tuple[str, int, int]:
raise ValueError(f"Explicit path exceeds MAX_PATH_SIZE={MAX_PATH_SIZE} bytes")
return "".join(hops), len(hops), hash_size - 1
async def bucket_path_hash_widths(cursor, *, batch_size: int = 500) -> dict[str, int | float]:
"""Bucket raw packet rows by hop hash width and return counts + percentages.
*cursor* must be an already-executed async cursor whose rows have a ``data``
column containing raw packet bytes.
"""
single_byte = 0
double_byte = 0
triple_byte = 0
while True:
rows = await cursor.fetchmany(batch_size)
if not rows:
break
for row in rows:
envelope = parse_packet_envelope(bytes(row["data"]))
if envelope is None:
continue
if envelope.hash_size == 1:
single_byte += 1
elif envelope.hash_size == 2:
double_byte += 1
elif envelope.hash_size == 3:
triple_byte += 1
total = single_byte + double_byte + triple_byte
if total == 0:
return {
"total_packets": 0,
"single_byte": 0,
"double_byte": 0,
"triple_byte": 0,
"single_byte_pct": 0.0,
"double_byte_pct": 0.0,
"triple_byte_pct": 0.0,
}
return {
"total_packets": total,
"single_byte": single_byte,
"double_byte": double_byte,
"triple_byte": triple_byte,
"single_byte_pct": (single_byte / total) * 100,
"double_byte_pct": (double_byte / total) * 100,
"triple_byte_pct": (triple_byte / total) * 100,
}

View File

@@ -804,7 +804,7 @@ class MessageRepository:
"""
import time as _time
from app.path_utils import parse_packet_envelope
from app.path_utils import bucket_path_hash_widths
now = int(_time.time())
t_1h = now - 3600
@@ -858,9 +858,6 @@ class MessageRepository:
]
# Path hash width distribution for last 24h (in-Python parse of raw packet envelopes)
single_byte = 0
double_byte = 0
triple_byte = 0
cursor3 = await db.conn.execute(
"""
SELECT rp.data FROM raw_packets rp
@@ -870,31 +867,7 @@ class MessageRepository:
""",
(conversation_key, t_24h),
)
while True:
batch = await cursor3.fetchmany(500)
if not batch:
break
for pkt_row in batch:
envelope = parse_packet_envelope(bytes(pkt_row["data"]))
if envelope is None:
continue
if envelope.hash_size == 1:
single_byte += 1
elif envelope.hash_size == 2:
double_byte += 1
elif envelope.hash_size == 3:
triple_byte += 1
hash_total = single_byte + double_byte + triple_byte
path_hash_width_24h = {
"total_packets": hash_total,
"single_byte": single_byte,
"double_byte": double_byte,
"triple_byte": triple_byte,
"single_byte_pct": (single_byte / hash_total * 100) if hash_total else 0.0,
"double_byte_pct": (double_byte / hash_total * 100) if hash_total else 0.0,
"triple_byte_pct": (triple_byte / hash_total * 100) if hash_total else 0.0,
}
path_hash_width_24h = await bucket_path_hash_widths(cursor3)
return {
"message_counts": message_counts,

View File

@@ -5,7 +5,7 @@ from typing import Any, Literal
from app.database import db
from app.models import AppSettings, Favorite
from app.path_utils import parse_packet_envelope
from app.path_utils import bucket_path_hash_widths
logger = logging.getLogger(__name__)
@@ -324,48 +324,7 @@ class StatisticsRepository:
"SELECT data FROM raw_packets WHERE timestamp >= ?",
(now - SECONDS_24H,),
)
single_byte = 0
double_byte = 0
triple_byte = 0
while True:
rows = await cursor.fetchmany(RAW_PACKET_STATS_BATCH_SIZE)
if not rows:
break
for row in rows:
envelope = parse_packet_envelope(bytes(row["data"]))
if envelope is None:
continue
if envelope.hash_size == 1:
single_byte += 1
elif envelope.hash_size == 2:
double_byte += 1
elif envelope.hash_size == 3:
triple_byte += 1
total_packets = single_byte + double_byte + triple_byte
if total_packets == 0:
return {
"total_packets": 0,
"single_byte": 0,
"double_byte": 0,
"triple_byte": 0,
"single_byte_pct": 0.0,
"double_byte_pct": 0.0,
"triple_byte_pct": 0.0,
}
return {
"total_packets": total_packets,
"single_byte": single_byte,
"double_byte": double_byte,
"triple_byte": triple_byte,
"single_byte_pct": (single_byte / total_packets) * 100,
"double_byte_pct": (double_byte / total_packets) * 100,
"triple_byte_pct": (triple_byte / total_packets) * 100,
}
return await bucket_path_hash_widths(cursor, batch_size=RAW_PACKET_STATS_BATCH_SIZE)
@staticmethod
async def get_all() -> dict:

View File

@@ -23,6 +23,7 @@ from app.models import (
from app.radio_sync import send_advertisement as do_send_advertisement
from app.radio_sync import sync_radio_time
from app.repository import ContactRepository
from app.routers.server_control import _monotonic
from app.services.contact_reconciliation import (
promote_prefix_contacts_for_contact,
reconcile_contact_messages,
@@ -135,10 +136,6 @@ class RadioAdvertiseRequest(BaseModel):
)
def _monotonic() -> float:
return time.monotonic()
def _better_signal(first: float | None, second: float | None) -> float | None:
if first is None:
return second

View File

@@ -1,12 +1,7 @@
"""Shared direct-message ACK application logic."""
from collections.abc import Callable
from typing import Any
from app.services import dm_ack_tracker
from app.services.messages import increment_ack_and_broadcast
BroadcastFn = Callable[..., Any]
from app.services.messages import BroadcastFn, increment_ack_and_broadcast
async def apply_dm_ack_code(ack_code: str, *, broadcast_fn: BroadcastFn) -> bool:

View File

@@ -1,9 +1,8 @@
import asyncio
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
from app.models import CONTACT_TYPE_REPEATER, CONTACT_TYPE_ROOM, Contact, ContactUpsert, Message
from app.repository import (
@@ -14,6 +13,7 @@ from app.repository import (
)
from app.services.contact_reconciliation import claim_prefix_messages_for_contact
from app.services.messages import (
BroadcastFn,
broadcast_message,
build_message_model,
build_message_paths,
@@ -27,8 +27,6 @@ if TYPE_CHECKING:
from app.decoder import DecryptedDirectMessage
logger = logging.getLogger(__name__)
BroadcastFn = Callable[..., Any]
_decrypted_dm_store_lock = asyncio.Lock()

View File

@@ -20,6 +20,7 @@ from app.repository import (
)
from app.services import dm_ack_tracker
from app.services.messages import (
BroadcastFn,
broadcast_message,
build_stored_outgoing_channel_message,
create_outgoing_channel_message,
@@ -33,8 +34,6 @@ NO_RADIO_RESPONSE_AFTER_SEND_DETAIL = (
"Send command was issued to the radio, but no response was heard back. "
"The message may or may not have sent successfully."
)
BroadcastFn = Callable[..., Any]
TrackAckFn = Callable[[str, int, int], bool]
NowFn = Callable[[], float]
OutgoingReservationKey = tuple[str, str, str]

View File

@@ -386,7 +386,7 @@ class TestPathHashWidthStats:
with (
patch.object(test_db.conn, "execute", new=AsyncMock(return_value=fake_cursor)),
patch("app.repository.settings.parse_packet_envelope", side_effect=fake_parse),
patch("app.path_utils.parse_packet_envelope", side_effect=fake_parse),
):
breakdown = await StatisticsRepository._path_hash_width_24h()