Files
Remote-Terminal-for-MeshCore/app/repository/contacts.py
T

870 lines
35 KiB
Python

import logging
import time
from collections.abc import Mapping
from typing import Any
from app.database import db
from app.models import (
Contact,
ContactAdvertPath,
ContactAdvertPathSummary,
ContactNameHistory,
ContactUpsert,
)
from app.path_utils import first_hop_hex, normalize_contact_route, normalize_route_override
logger = logging.getLogger(__name__)
class AmbiguousPublicKeyPrefixError(ValueError):
"""Raised when a public key prefix matches multiple contacts."""
def __init__(self, prefix: str, matches: list[str]):
self.prefix = prefix.lower()
self.matches = matches
super().__init__(f"Ambiguous public key prefix '{self.prefix}'")
class ContactRepository:
@staticmethod
def _coerce_contact_upsert(
contact: ContactUpsert | Contact | Mapping[str, Any],
) -> ContactUpsert:
if isinstance(contact, ContactUpsert):
return contact
if isinstance(contact, Contact):
return contact.to_upsert()
return ContactUpsert.model_validate(contact)
@staticmethod
async def upsert(contact: ContactUpsert | Contact | Mapping[str, Any]) -> None:
contact_row = ContactRepository._coerce_contact_upsert(contact)
if (
contact_row.direct_path is None
and contact_row.direct_path_len is None
and contact_row.direct_path_hash_mode is None
):
direct_path = None
direct_path_len = None
direct_path_hash_mode = None
else:
direct_path, direct_path_len, direct_path_hash_mode = normalize_contact_route(
contact_row.direct_path,
contact_row.direct_path_len,
contact_row.direct_path_hash_mode,
)
route_override_path, route_override_len, route_override_hash_mode = (
normalize_route_override(
contact_row.route_override_path,
contact_row.route_override_len,
contact_row.route_override_hash_mode,
)
)
async with db.tx() as conn:
async with conn.execute(
"""
INSERT INTO contacts (public_key, name, type, flags, direct_path, direct_path_len,
direct_path_hash_mode, direct_path_updated_at,
route_override_path, route_override_len,
route_override_hash_mode,
last_advert, lat, lon, last_seen,
on_radio, last_contacted, first_seen)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(public_key) DO UPDATE SET
name = COALESCE(excluded.name, contacts.name),
type = CASE WHEN excluded.type = 0 THEN contacts.type ELSE excluded.type END,
flags = excluded.flags,
direct_path = COALESCE(excluded.direct_path, contacts.direct_path),
direct_path_len = COALESCE(excluded.direct_path_len, contacts.direct_path_len),
direct_path_hash_mode = COALESCE(
excluded.direct_path_hash_mode, contacts.direct_path_hash_mode
),
direct_path_updated_at = COALESCE(
excluded.direct_path_updated_at, contacts.direct_path_updated_at
),
route_override_path = COALESCE(
excluded.route_override_path, contacts.route_override_path
),
route_override_len = COALESCE(
excluded.route_override_len, contacts.route_override_len
),
route_override_hash_mode = COALESCE(
excluded.route_override_hash_mode, contacts.route_override_hash_mode
),
last_advert = COALESCE(excluded.last_advert, contacts.last_advert),
lat = COALESCE(excluded.lat, contacts.lat),
lon = COALESCE(excluded.lon, contacts.lon),
last_seen = CASE
WHEN excluded.last_seen IS NULL THEN contacts.last_seen
WHEN contacts.last_seen IS NULL THEN excluded.last_seen
WHEN excluded.last_seen > contacts.last_seen THEN excluded.last_seen
ELSE contacts.last_seen
END,
on_radio = COALESCE(excluded.on_radio, contacts.on_radio),
last_contacted = COALESCE(excluded.last_contacted, contacts.last_contacted),
first_seen = COALESCE(contacts.first_seen, excluded.first_seen)
""",
(
contact_row.public_key.lower(),
contact_row.name,
contact_row.type,
contact_row.flags,
direct_path,
direct_path_len,
direct_path_hash_mode,
contact_row.direct_path_updated_at,
route_override_path,
route_override_len,
route_override_hash_mode,
contact_row.last_advert,
contact_row.lat,
contact_row.lon,
contact_row.last_seen,
contact_row.on_radio,
contact_row.last_contacted,
contact_row.first_seen,
),
):
pass
@staticmethod
def _row_to_contact(row) -> Contact:
"""Convert a database row to a Contact model."""
available_columns = set(row.keys())
direct_path, direct_path_len, direct_path_hash_mode = normalize_contact_route(
row["direct_path"] if "direct_path" in available_columns else None,
row["direct_path_len"] if "direct_path_len" in available_columns else None,
row["direct_path_hash_mode"] if "direct_path_hash_mode" in available_columns else None,
)
route_override_path = (
row["route_override_path"] if "route_override_path" in available_columns else None
)
route_override_len = (
row["route_override_len"] if "route_override_len" in available_columns else None
)
route_override_hash_mode = (
row["route_override_hash_mode"]
if "route_override_hash_mode" in available_columns
else None
)
route_override_path, route_override_len, route_override_hash_mode = (
normalize_route_override(
route_override_path,
route_override_len,
route_override_hash_mode,
)
)
return Contact(
public_key=row["public_key"],
name=row["name"],
type=row["type"],
flags=row["flags"],
direct_path=direct_path,
direct_path_len=direct_path_len,
direct_path_hash_mode=direct_path_hash_mode,
direct_path_updated_at=(
row["direct_path_updated_at"]
if "direct_path_updated_at" in available_columns
else None
),
route_override_path=route_override_path,
route_override_len=route_override_len,
route_override_hash_mode=route_override_hash_mode,
last_advert=row["last_advert"],
lat=row["lat"],
lon=row["lon"],
last_seen=row["last_seen"],
on_radio=bool(row["on_radio"]),
favorite=bool(row["favorite"]) if "favorite" in available_columns else False,
last_contacted=row["last_contacted"],
last_read_at=row["last_read_at"],
first_seen=row["first_seen"],
)
@staticmethod
async def get_by_key(public_key: str) -> Contact | None:
async with db.readonly() as conn:
async with conn.execute(
"SELECT * FROM contacts WHERE public_key = ?", (public_key.lower(),)
) as cursor:
row = await cursor.fetchone()
return ContactRepository._row_to_contact(row) if row else None
@staticmethod
async def get_by_key_prefix(prefix: str) -> Contact | None:
"""Get a contact by key prefix only if it resolves uniquely.
Returns None when no contacts match OR when multiple contacts match
the prefix (to avoid silently selecting the wrong contact).
"""
normalized_prefix = prefix.lower()
exact = await ContactRepository.get_by_key(normalized_prefix)
if exact:
return exact
async with db.readonly() as conn:
async with conn.execute(
"SELECT * FROM contacts WHERE public_key LIKE ? ORDER BY public_key LIMIT 2",
(f"{normalized_prefix}%",),
) as cursor:
rows = list(await cursor.fetchall())
if len(rows) != 1:
return None
return ContactRepository._row_to_contact(rows[0])
@staticmethod
async def _get_prefix_matches(prefix: str, limit: int = 2) -> list[Contact]:
"""Get contacts matching a key prefix, up to limit."""
async with db.readonly() as conn:
async with conn.execute(
"SELECT * FROM contacts WHERE public_key LIKE ? ORDER BY public_key LIMIT ?",
(f"{prefix.lower()}%", limit),
) as cursor:
rows = list(await cursor.fetchall())
return [ContactRepository._row_to_contact(row) for row in rows]
@staticmethod
async def get_by_key_or_prefix(key_or_prefix: str) -> Contact | None:
"""Get a contact by exact key match, falling back to prefix match.
Useful when the input might be a full 64-char public key or a shorter prefix.
"""
contact = await ContactRepository.get_by_key(key_or_prefix)
if contact:
return contact
matches = await ContactRepository._get_prefix_matches(key_or_prefix, limit=2)
if len(matches) == 1:
return matches[0]
if len(matches) > 1:
raise AmbiguousPublicKeyPrefixError(
key_or_prefix,
[m.public_key for m in matches],
)
return None
@staticmethod
async def get_by_name(name: str) -> list[Contact]:
"""Get all contacts with the given exact name."""
async with db.readonly() as conn:
async with conn.execute("SELECT * FROM contacts WHERE name = ?", (name,)) as cursor:
rows = await cursor.fetchall()
return [ContactRepository._row_to_contact(row) for row in rows]
@staticmethod
async def resolve_prefixes(prefixes: list[str]) -> dict[str, Contact]:
"""Resolve multiple key prefixes to contacts in a single query.
Returns a dict mapping each prefix to its Contact, only for prefixes
that resolve uniquely (exactly one match). Ambiguous or unmatched
prefixes are omitted.
"""
if not prefixes:
return {}
normalized = [p.lower() for p in prefixes]
conditions = " OR ".join(["public_key LIKE ?"] * len(normalized))
params = [f"{p}%" for p in normalized]
async with db.readonly() as conn:
async with conn.execute(f"SELECT * FROM contacts WHERE {conditions}", params) as cursor:
rows = await cursor.fetchall()
# Group by which prefix each row matches
prefix_to_rows: dict[str, list] = {p: [] for p in normalized}
for row in rows:
pk = row["public_key"]
for p in normalized:
if pk.startswith(p):
prefix_to_rows[p].append(row)
# Only include uniquely-resolved prefixes
result: dict[str, Contact] = {}
for p in normalized:
if len(prefix_to_rows[p]) == 1:
result[p] = ContactRepository._row_to_contact(prefix_to_rows[p][0])
return result
@staticmethod
async def get_all(limit: int = 100, offset: int = 0) -> list[Contact]:
async with db.readonly() as conn:
async with conn.execute(
"SELECT * FROM contacts ORDER BY COALESCE(name, public_key) LIMIT ? OFFSET ?",
(limit, offset),
) as cursor:
rows = await cursor.fetchall()
return [ContactRepository._row_to_contact(row) for row in rows]
@staticmethod
async def get_recently_contacted_non_repeaters(limit: int = 200) -> list[Contact]:
"""Get recently interacted-with non-repeater contacts."""
async with db.readonly() as conn:
async with conn.execute(
"""
SELECT * FROM contacts
WHERE type != 2 AND last_contacted IS NOT NULL AND length(public_key) = 64
ORDER BY last_contacted DESC
LIMIT ?
""",
(limit,),
) as cursor:
rows = await cursor.fetchall()
return [ContactRepository._row_to_contact(row) for row in rows]
@staticmethod
async def get_recently_dm_active_non_repeaters(limit: int = 200) -> list[Contact]:
"""Get non-repeater contacts with the most recent DM activity (sent or received)."""
async with db.readonly() as conn:
async with conn.execute(
"""
SELECT c.*
FROM contacts c
INNER JOIN (
SELECT conversation_key, MAX(received_at) AS last_dm
FROM messages
WHERE type = 'PRIV'
GROUP BY conversation_key
) m ON c.public_key = m.conversation_key
WHERE c.type != 2 AND length(c.public_key) = 64
ORDER BY m.last_dm DESC
LIMIT ?
""",
(limit,),
) as cursor:
rows = await cursor.fetchall()
return [ContactRepository._row_to_contact(row) for row in rows]
@staticmethod
async def get_recently_advertised_non_repeaters(limit: int = 200) -> list[Contact]:
"""Get recently advert-heard non-repeater contacts."""
async with db.readonly() as conn:
async with conn.execute(
"""
SELECT * FROM contacts
WHERE type != 2 AND last_advert IS NOT NULL AND length(public_key) = 64
ORDER BY last_advert DESC
LIMIT ?
""",
(limit,),
) as cursor:
rows = await cursor.fetchall()
return [ContactRepository._row_to_contact(row) for row in rows]
@staticmethod
async def update_direct_path(
public_key: str,
path: str,
path_len: int,
path_hash_mode: int | None = None,
updated_at: int | None = None,
) -> None:
"""Persist a learned direct route for a contact.
Both callers (the RF PATH packet processor and the firmware PATH_UPDATE
event handler) are RF-backed: firmware ``onContactPathUpdated`` only
fires from ``onContactPathRecv`` during RF PATH packet reception. So
this method also advances ``last_seen`` monotonically. Never moves
``last_seen`` backwards if an out-of-order arrival lands with an older
timestamp.
"""
normalized_path, normalized_path_len, normalized_hash_mode = normalize_contact_route(
path,
path_len,
path_hash_mode,
)
ts = updated_at if updated_at is not None else int(time.time())
async with db.tx() as conn:
async with conn.execute(
"""UPDATE contacts SET direct_path = ?, direct_path_len = ?,
direct_path_hash_mode = COALESCE(?, direct_path_hash_mode),
direct_path_updated_at = ?,
last_seen = CASE
WHEN last_seen IS NULL THEN ?
WHEN ? > last_seen THEN ?
ELSE last_seen
END
WHERE public_key = ?""",
(
normalized_path,
normalized_path_len,
normalized_hash_mode,
ts,
ts,
ts,
ts,
public_key.lower(),
),
):
pass
@staticmethod
async def set_routing_override(
public_key: str,
path: str | None,
path_len: int | None,
path_hash_mode: int | None = None,
) -> None:
normalized_path, normalized_len, normalized_hash_mode = normalize_route_override(
path,
path_len,
path_hash_mode,
)
async with db.tx() as conn:
async with conn.execute(
"""
UPDATE contacts
SET route_override_path = ?, route_override_len = ?, route_override_hash_mode = ?
WHERE public_key = ?
""",
(
normalized_path,
normalized_len,
normalized_hash_mode,
public_key.lower(),
),
):
pass
@staticmethod
async def clear_routing_override(public_key: str) -> None:
async with db.tx() as conn:
async with conn.execute(
"""
UPDATE contacts
SET route_override_path = NULL,
route_override_len = NULL,
route_override_hash_mode = NULL
WHERE public_key = ?
""",
(public_key.lower(),),
):
pass
@staticmethod
async def clear_on_radio_except(keep_keys: list[str]) -> None:
"""Set on_radio=False for all contacts NOT in keep_keys."""
async with db.tx() as conn:
if not keep_keys:
async with conn.execute("UPDATE contacts SET on_radio = 0 WHERE on_radio = 1"):
pass
else:
placeholders = ",".join("?" * len(keep_keys))
async with conn.execute(
f"UPDATE contacts SET on_radio = 0 WHERE on_radio = 1 AND public_key NOT IN ({placeholders})",
keep_keys,
):
pass
@staticmethod
async def get_favorites() -> list[Contact]:
"""Return all contacts marked as favorite."""
async with db.readonly() as conn:
async with conn.execute(
"SELECT * FROM contacts WHERE favorite = 1 AND LENGTH(public_key) = 64"
) as cursor:
rows = await cursor.fetchall()
return [ContactRepository._row_to_contact(row) for row in rows]
@staticmethod
async def set_favorite(public_key: str, value: bool) -> None:
"""Set or clear the favorite flag for a contact."""
async with db.tx() as conn:
async with conn.execute(
"UPDATE contacts SET favorite = ? WHERE public_key = ?",
(1 if value else 0, public_key.lower()),
):
pass
@staticmethod
async def delete(public_key: str) -> None:
normalized = public_key.lower()
# contact_name_history and contact_advert_paths cascade via FK.
# Messages are intentionally preserved so history re-surfaces
# if the contact is re-added later.
async with db.tx() as conn:
async with conn.execute("DELETE FROM contacts WHERE public_key = ?", (normalized,)):
pass
@staticmethod
async def update_last_contacted(public_key: str, timestamp: int | None = None) -> None:
"""Update the last_contacted timestamp for a contact.
``last_contacted`` tracks the most recent direct-conversation activity
with this contact in either direction (incoming or outgoing DM). It is
the field that powers "recent conversations" ordering on the frontend.
It deliberately does not touch ``last_seen``: ``last_seen`` is reserved
for actual RF reception from the contact, and outgoing sends are not
evidence that we heard from them. RF observations from DM ingest update
``last_seen`` via :meth:`touch_last_seen` on incoming DMs only.
"""
ts = timestamp if timestamp is not None else int(time.time())
async with db.tx() as conn:
async with conn.execute(
"UPDATE contacts SET last_contacted = ? WHERE public_key = ?",
(ts, public_key.lower()),
):
pass
@staticmethod
async def touch_last_seen(public_key: str, timestamp: int) -> None:
"""Monotonically bump last_seen for a contact from an RF observation.
Never moves last_seen backwards; a no-op if the contact row does not
exist. Use this from packet-ingest paths that have attributed a packet
to a specific contact pubkey (advert, incoming DM, decrypted PATH, etc.).
"""
async with db.tx() as conn:
async with conn.execute(
"""
UPDATE contacts
SET last_seen = CASE
WHEN last_seen IS NULL THEN ?
WHEN ? > last_seen THEN ?
ELSE last_seen
END
WHERE public_key = ?
""",
(timestamp, timestamp, timestamp, public_key.lower()),
):
pass
@staticmethod
async def update_last_read_at(public_key: str, timestamp: int | None = None) -> bool:
"""Update the last_read_at timestamp for a contact.
Returns True if a row was updated, False if contact not found.
"""
ts = timestamp if timestamp is not None else int(time.time())
async with db.tx() as conn:
async with conn.execute(
"UPDATE contacts SET last_read_at = ? WHERE public_key = ?",
(ts, public_key.lower()),
) as cursor:
rowcount = cursor.rowcount
return rowcount > 0
@staticmethod
async def promote_prefix_placeholders(full_key: str) -> list[str]:
"""Promote prefix-only placeholder contacts to a resolved full key.
Returns the placeholder public keys that were merged into the full key.
All operations for the promotion happen inside one ``db.tx()`` so
partial promotions never leak to readers between steps.
"""
async def migrate_child_rows(conn, old_key: str, new_key: str) -> None:
async with conn.execute(
"""
INSERT INTO contact_name_history (public_key, name, first_seen, last_seen)
SELECT ?, name, first_seen, last_seen
FROM contact_name_history
WHERE public_key = ?
ON CONFLICT(public_key, name) DO UPDATE SET
first_seen = MIN(contact_name_history.first_seen, excluded.first_seen),
last_seen = MAX(contact_name_history.last_seen, excluded.last_seen)
""",
(new_key, old_key),
):
pass
async with conn.execute(
"""
INSERT INTO contact_advert_paths
(public_key, path_hex, path_len, first_seen, last_seen, heard_count)
SELECT ?, path_hex, path_len, first_seen, last_seen, heard_count
FROM contact_advert_paths
WHERE public_key = ?
ON CONFLICT(public_key, path_hex, path_len) DO UPDATE SET
first_seen = MIN(contact_advert_paths.first_seen, excluded.first_seen),
last_seen = MAX(contact_advert_paths.last_seen, excluded.last_seen),
heard_count = contact_advert_paths.heard_count + excluded.heard_count
""",
(new_key, old_key),
):
pass
async with conn.execute(
"DELETE FROM contact_name_history WHERE public_key = ?",
(old_key,),
):
pass
async with conn.execute(
"DELETE FROM contact_advert_paths WHERE public_key = ?",
(old_key,),
):
pass
normalized_full_key = full_key.lower()
promoted_keys: list[str] = []
async with db.tx() as conn:
async with conn.execute(
"""
SELECT public_key, last_seen, last_contacted, first_seen, last_read_at
FROM contacts
WHERE length(public_key) < 64
AND ? LIKE public_key || '%'
ORDER BY length(public_key) DESC, public_key
""",
(normalized_full_key,),
) as cursor:
rows = list(await cursor.fetchall())
if not rows:
return []
for row in rows:
old_key = row["public_key"]
if old_key == normalized_full_key:
continue
async with conn.execute(
"""
SELECT COUNT(*) AS match_count
FROM contacts
WHERE length(public_key) = 64
AND public_key LIKE ? || '%'
""",
(old_key,),
) as match_cursor:
match_row = await match_cursor.fetchone()
match_count = match_row["match_count"] if match_row is not None else 0
if match_count != 1:
logger.warning(
"Skipping prefix promotion for %s: %d full-key contacts match (expected 1)",
old_key,
match_count,
)
continue
await migrate_child_rows(conn, old_key, normalized_full_key)
# Merge timestamp metadata from the old prefix contact into the
# full-key contact (which all callers guarantee already exists),
# then delete the prefix placeholder.
async with conn.execute(
"""
UPDATE contacts
SET last_seen = CASE
WHEN contacts.last_seen IS NULL THEN ?
WHEN ? IS NULL THEN contacts.last_seen
WHEN ? > contacts.last_seen THEN ?
ELSE contacts.last_seen
END,
last_contacted = CASE
WHEN contacts.last_contacted IS NULL THEN ?
WHEN ? IS NULL THEN contacts.last_contacted
WHEN ? > contacts.last_contacted THEN ?
ELSE contacts.last_contacted
END,
first_seen = CASE
WHEN contacts.first_seen IS NULL THEN ?
WHEN ? IS NULL THEN contacts.first_seen
WHEN ? < contacts.first_seen THEN ?
ELSE contacts.first_seen
END,
last_read_at = CASE
WHEN contacts.last_read_at IS NULL THEN ?
WHEN ? IS NULL THEN contacts.last_read_at
WHEN ? > contacts.last_read_at THEN ?
ELSE contacts.last_read_at
END
WHERE public_key = ?
""",
(
row["last_seen"],
row["last_seen"],
row["last_seen"],
row["last_seen"],
row["last_contacted"],
row["last_contacted"],
row["last_contacted"],
row["last_contacted"],
row["first_seen"],
row["first_seen"],
row["first_seen"],
row["first_seen"],
row["last_read_at"],
row["last_read_at"],
row["last_read_at"],
row["last_read_at"],
normalized_full_key,
),
):
pass
async with conn.execute("DELETE FROM contacts WHERE public_key = ?", (old_key,)):
pass
promoted_keys.append(old_key)
return promoted_keys
@staticmethod
async def mark_all_read(timestamp: int) -> None:
"""Mark all contacts as read at the given timestamp."""
async with db.tx() as conn:
async with conn.execute("UPDATE contacts SET last_read_at = ?", (timestamp,)):
pass
@staticmethod
async def get_by_pubkey_first_byte(hex_byte: str) -> list[Contact]:
"""Get contacts whose public key starts with the given hex byte (2 chars)."""
async with db.readonly() as conn:
async with conn.execute(
"SELECT * FROM contacts WHERE substr(public_key, 1, 2) = ?",
(hex_byte.lower(),),
) as cursor:
rows = await cursor.fetchall()
return [ContactRepository._row_to_contact(row) for row in rows]
class ContactAdvertPathRepository:
"""Repository for recent unique advertisement paths per contact."""
@staticmethod
def _row_to_path(row) -> ContactAdvertPath:
path = row["path_hex"] or ""
path_len = row["path_len"]
next_hop = first_hop_hex(path, path_len)
return ContactAdvertPath(
path=path,
path_len=path_len,
next_hop=next_hop,
first_seen=row["first_seen"],
last_seen=row["last_seen"],
heard_count=row["heard_count"],
)
@staticmethod
async def record_observation(
public_key: str,
path_hex: str,
timestamp: int,
max_paths: int = 10,
hop_count: int | None = None,
) -> None:
"""
Upsert a unique advert path observation for a contact and prune to N most recent.
"""
if max_paths < 1:
max_paths = 1
normalized_key = public_key.lower()
normalized_path = path_hex.lower()
path_len = hop_count if hop_count is not None else len(normalized_path) // 2
async with db.tx() as conn:
async with conn.execute(
"""
INSERT INTO contact_advert_paths
(public_key, path_hex, path_len, first_seen, last_seen, heard_count)
VALUES (?, ?, ?, ?, ?, 1)
ON CONFLICT(public_key, path_hex, path_len) DO UPDATE SET
last_seen = MAX(contact_advert_paths.last_seen, excluded.last_seen),
heard_count = contact_advert_paths.heard_count + 1
""",
(normalized_key, normalized_path, path_len, timestamp, timestamp),
):
pass
# Keep only the N most recent unique paths per contact.
async with conn.execute(
"""
DELETE FROM contact_advert_paths
WHERE public_key = ?
AND id NOT IN (
SELECT id
FROM contact_advert_paths
WHERE public_key = ?
ORDER BY last_seen DESC, heard_count DESC, path_len ASC, path_hex ASC
LIMIT ?
)
""",
(normalized_key, normalized_key, max_paths),
):
pass
@staticmethod
async def get_recent_for_contact(public_key: str, limit: int = 10) -> list[ContactAdvertPath]:
async with db.readonly() as conn:
async with conn.execute(
"""
SELECT path_hex, path_len, first_seen, last_seen, heard_count
FROM contact_advert_paths
WHERE public_key = ?
ORDER BY last_seen DESC, heard_count DESC, path_len ASC, path_hex ASC
LIMIT ?
""",
(public_key.lower(), limit),
) as cursor:
rows = await cursor.fetchall()
return [ContactAdvertPathRepository._row_to_path(row) for row in rows]
@staticmethod
async def get_recent_for_all_contacts(
limit_per_contact: int = 10,
) -> list[ContactAdvertPathSummary]:
async with db.readonly() as conn:
async with conn.execute(
"""
SELECT public_key, path_hex, path_len, first_seen, last_seen, heard_count
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY public_key
ORDER BY last_seen DESC, heard_count DESC, path_len ASC, path_hex ASC
) AS rn
FROM contact_advert_paths
)
WHERE rn <= ?
ORDER BY public_key ASC, last_seen DESC, heard_count DESC, path_len ASC, path_hex ASC
""",
(limit_per_contact,),
) as cursor:
rows = await cursor.fetchall()
grouped: dict[str, list[ContactAdvertPath]] = {}
for row in rows:
key = row["public_key"]
paths = grouped.get(key)
if paths is None:
paths = []
grouped[key] = paths
paths.append(ContactAdvertPathRepository._row_to_path(row))
return [
ContactAdvertPathSummary(public_key=key, paths=paths) for key, paths in grouped.items()
]
class ContactNameHistoryRepository:
"""Repository for contact name change history."""
@staticmethod
async def record_name(public_key: str, name: str, timestamp: int) -> None:
"""Record a name observation. Upserts: updates last_seen if name already known."""
async with db.tx() as conn:
async with conn.execute(
"""
INSERT INTO contact_name_history (public_key, name, first_seen, last_seen)
VALUES (?, ?, ?, ?)
ON CONFLICT(public_key, name) DO UPDATE SET
last_seen = MAX(contact_name_history.last_seen, excluded.last_seen)
""",
(public_key.lower(), name, timestamp, timestamp),
):
pass
@staticmethod
async def get_history(public_key: str) -> list[ContactNameHistory]:
async with db.readonly() as conn:
async with conn.execute(
"""
SELECT name, first_seen, last_seen
FROM contact_name_history
WHERE public_key = ?
ORDER BY last_seen DESC
""",
(public_key.lower(),),
) as cursor:
rows = await cursor.fetchall()
return [
ContactNameHistory(
name=row["name"], first_seen=row["first_seen"], last_seen=row["last_seen"]
)
for row in rows
]