forked from iarv/Remote-Terminal-for-MeshCore
Fix clock filtering and contact lookup behavior bugs
This commit is contained in:
@@ -4,7 +4,7 @@ from typing import TYPE_CHECKING
|
||||
|
||||
from meshcore import EventType
|
||||
|
||||
from app.models import Contact
|
||||
from app.models import CONTACT_TYPE_REPEATER, Contact
|
||||
from app.packet_processor import process_raw_packet
|
||||
from app.repository import ContactRepository, MessageRepository
|
||||
from app.websocket import broadcast_event
|
||||
@@ -71,11 +71,20 @@ async def on_contact_message(event: "Event") -> None:
|
||||
sender_pubkey = payload.get("public_key") or payload.get("pubkey_prefix", "")
|
||||
received_at = int(time.time())
|
||||
|
||||
# Look up full public key from contact database if we only have prefix
|
||||
if len(sender_pubkey) < 64:
|
||||
contact = await ContactRepository.get_by_key_prefix(sender_pubkey)
|
||||
if contact:
|
||||
sender_pubkey = contact.public_key
|
||||
# Look up contact from database - use prefix lookup only if needed
|
||||
# (get_by_key_or_prefix does exact match first, then prefix fallback)
|
||||
contact = await ContactRepository.get_by_key_or_prefix(sender_pubkey)
|
||||
if contact:
|
||||
sender_pubkey = contact.public_key
|
||||
|
||||
# Skip messages from repeaters - they only send CLI responses, not chat messages.
|
||||
# CLI responses are handled by the command endpoint and txt_type filter above.
|
||||
if contact.type == CONTACT_TYPE_REPEATER:
|
||||
logger.debug(
|
||||
"Skipping message from repeater %s (not stored in chat history)",
|
||||
sender_pubkey[:12],
|
||||
)
|
||||
return
|
||||
|
||||
# Try to create message - INSERT OR IGNORE handles duplicates atomically
|
||||
# If the packet processor already stored this message, this returns None
|
||||
@@ -121,8 +130,7 @@ async def on_contact_message(event: "Event") -> None:
|
||||
},
|
||||
)
|
||||
|
||||
# Update contact last_contacted
|
||||
contact = await ContactRepository.get_by_key_prefix(sender_pubkey)
|
||||
# Update contact last_contacted (contact was already fetched above)
|
||||
if contact:
|
||||
await ContactRepository.update_last_contacted(contact.public_key, received_at)
|
||||
|
||||
|
||||
@@ -204,6 +204,9 @@ class TelemetryResponse(BaseModel):
|
||||
default_factory=list, description="List of neighbors seen by repeater"
|
||||
)
|
||||
acl: list[AclEntry] = Field(default_factory=list, description="Access control list")
|
||||
clock_output: str | None = Field(
|
||||
default=None, description="Output from 'clock' command (or error message)"
|
||||
)
|
||||
|
||||
|
||||
class CommandRequest(BaseModel):
|
||||
|
||||
@@ -212,13 +212,14 @@ async def create_dm_message_from_decrypted(
|
||||
|
||||
Returns the message ID if created, None if duplicate.
|
||||
"""
|
||||
# Extract txt_type from flags (lower 4 bits)
|
||||
# txt_type=1 is CLI response - don't store these in chat history
|
||||
txt_type = decrypted.flags & 0x0F
|
||||
if txt_type == 1:
|
||||
# Check if sender is a repeater - repeaters only send CLI responses, not chat messages.
|
||||
# CLI responses are handled by the command endpoint, not stored in chat history.
|
||||
contact = await ContactRepository.get_by_key_or_prefix(their_public_key)
|
||||
if contact and contact.type == CONTACT_TYPE_REPEATER:
|
||||
logger.debug(
|
||||
"Skipping CLI response from %s (txt_type=1)",
|
||||
"Skipping message from repeater %s (CLI responses not stored): %s",
|
||||
their_public_key[:12],
|
||||
(decrypted.message or "")[:50],
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
@@ -361,8 +361,49 @@ async def request_telemetry(public_key: str, request: TelemetryRequest) -> Telem
|
||||
)
|
||||
)
|
||||
|
||||
# Fetch clock output (up to 2 attempts)
|
||||
# Must pause polling and stop auto-fetch to prevent race condition where
|
||||
# the CLI response is consumed before we can call get_msg()
|
||||
logger.info("Fetching clock from repeater %s", contact.public_key[:12])
|
||||
clock_output: str | None = None
|
||||
|
||||
async with pause_polling():
|
||||
await mc.stop_auto_message_fetching()
|
||||
try:
|
||||
for attempt in range(1, 3):
|
||||
logger.debug("Clock request attempt %d/2", attempt)
|
||||
try:
|
||||
send_result = await mc.commands.send_cmd(contact.public_key, "clock")
|
||||
if send_result.type == EventType.ERROR:
|
||||
logger.debug("Clock command send error: %s", send_result.payload)
|
||||
continue
|
||||
|
||||
# Wait for response
|
||||
wait_result = await mc.wait_for_event(EventType.MESSAGES_WAITING, timeout=5.0)
|
||||
if wait_result is None:
|
||||
logger.debug("Clock request timeout, retrying...")
|
||||
continue
|
||||
|
||||
response_event = await mc.commands.get_msg()
|
||||
if response_event.type == EventType.ERROR:
|
||||
logger.debug("Clock get_msg error: %s", response_event.payload)
|
||||
continue
|
||||
|
||||
clock_output = response_event.payload.get("text", "")
|
||||
logger.info("Received clock output: %s", clock_output)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.debug("Clock request exception: %s", e)
|
||||
continue
|
||||
finally:
|
||||
await mc.start_auto_message_fetching()
|
||||
|
||||
if clock_output is None:
|
||||
clock_output = "Unable to fetch `clock` output (repeater did not respond)"
|
||||
|
||||
# Convert raw telemetry to response format
|
||||
# bat is in mV, convert to V (e.g., 3775 -> 3.775)
|
||||
|
||||
return TelemetryResponse(
|
||||
pubkey_prefix=status.get("pubkey_pre", contact.public_key[:12]),
|
||||
battery_volts=status.get("bat", 0) / 1000.0,
|
||||
@@ -384,6 +425,7 @@ async def request_telemetry(public_key: str, request: TelemetryRequest) -> Telem
|
||||
full_events=status.get("full_evts", 0),
|
||||
neighbors=neighbors,
|
||||
acl=acl_entries,
|
||||
clock_output=clock_output,
|
||||
)
|
||||
|
||||
|
||||
|
||||
File diff suppressed because one or more lines are too long
1
frontend/dist/assets/index-COA8MjNX.js.map
vendored
Normal file
1
frontend/dist/assets/index-COA8MjNX.js.map
vendored
Normal file
File diff suppressed because one or more lines are too long
1
frontend/dist/assets/index-D5ozmcKB.js.map
vendored
1
frontend/dist/assets/index-D5ozmcKB.js.map
vendored
File diff suppressed because one or more lines are too long
2
frontend/dist/index.html
vendored
2
frontend/dist/index.html
vendored
@@ -13,7 +13,7 @@
|
||||
<link rel="shortcut icon" href="/favicon.ico" />
|
||||
<link rel="apple-touch-icon" sizes="180x180" href="/apple-touch-icon.png" />
|
||||
<link rel="manifest" href="/site.webmanifest" />
|
||||
<script type="module" crossorigin src="/assets/index-D5ozmcKB.js"></script>
|
||||
<script type="module" crossorigin src="/assets/index-COA8MjNX.js"></script>
|
||||
<link rel="stylesheet" crossorigin href="/assets/index-H2C92sGV.css">
|
||||
</head>
|
||||
<body>
|
||||
|
||||
@@ -37,6 +37,7 @@ export function formatTelemetry(telemetry: TelemetryResponse): string {
|
||||
`Telemetry`,
|
||||
`Battery Voltage: ${telemetry.battery_volts.toFixed(3)}V`,
|
||||
`Uptime: ${formatDuration(telemetry.uptime_seconds)}`,
|
||||
...(telemetry.clock_output ? [`Clock: ${telemetry.clock_output}`] : []),
|
||||
`TX Airtime: ${formatDuration(telemetry.airtime_seconds)}`,
|
||||
`RX Airtime: ${formatDuration(telemetry.rx_airtime_seconds)}`,
|
||||
'',
|
||||
|
||||
@@ -78,6 +78,7 @@ describe('formatTelemetry', () => {
|
||||
full_events: 0,
|
||||
neighbors: [],
|
||||
acl: [],
|
||||
clock_output: null,
|
||||
};
|
||||
|
||||
const result = formatTelemetry(telemetry);
|
||||
@@ -119,6 +120,7 @@ describe('formatTelemetry', () => {
|
||||
full_events: 0,
|
||||
neighbors: [],
|
||||
acl: [],
|
||||
clock_output: null,
|
||||
};
|
||||
|
||||
const result = formatTelemetry(telemetry);
|
||||
|
||||
@@ -197,6 +197,7 @@ export interface TelemetryResponse {
|
||||
full_events: number;
|
||||
neighbors: NeighborInfo[];
|
||||
acl: AclEntry[];
|
||||
clock_output: string | null;
|
||||
}
|
||||
|
||||
export interface CommandResponse {
|
||||
|
||||
@@ -195,7 +195,7 @@ class TestContactMessageCLIFiltering:
|
||||
patch("app.bot.run_bot_for_message", new_callable=AsyncMock),
|
||||
):
|
||||
mock_repo.create = AsyncMock(return_value=42)
|
||||
mock_contact_repo.get_by_key_prefix = AsyncMock(return_value=None)
|
||||
mock_contact_repo.get_by_key_or_prefix = AsyncMock(return_value=None)
|
||||
|
||||
class MockEvent:
|
||||
payload = {
|
||||
@@ -224,7 +224,7 @@ class TestContactMessageCLIFiltering:
|
||||
patch("app.bot.run_bot_for_message", new_callable=AsyncMock),
|
||||
):
|
||||
mock_repo.create = AsyncMock(return_value=42)
|
||||
mock_contact_repo.get_by_key_prefix = AsyncMock(return_value=None)
|
||||
mock_contact_repo.get_by_key_or_prefix = AsyncMock(return_value=None)
|
||||
|
||||
class MockEvent:
|
||||
payload = {
|
||||
@@ -259,7 +259,7 @@ class TestContactMessageCLIFiltering:
|
||||
patch("app.bot.run_bot_for_message", new_callable=AsyncMock),
|
||||
):
|
||||
mock_repo.create = AsyncMock(return_value=42)
|
||||
mock_contact_repo.get_by_key_prefix = AsyncMock(return_value=None)
|
||||
mock_contact_repo.get_by_key_or_prefix = AsyncMock(return_value=None)
|
||||
|
||||
class MockEvent:
|
||||
payload = {
|
||||
|
||||
@@ -943,26 +943,47 @@ class TestDMDecryptionFunction:
|
||||
assert packet_id not in [p.id for p in undecrypted]
|
||||
|
||||
|
||||
class TestCLIResponseFiltering:
|
||||
"""Test that CLI responses (txt_type=1) are not stored in chat history."""
|
||||
class TestRepeaterMessageFiltering:
|
||||
"""Test that messages from repeaters are not stored in chat history.
|
||||
|
||||
A1B2C3_PUB = "a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7"
|
||||
FACE12_PUB = "FACE123334789E2B81519AFDBC39A3C9EB7EA3457AD367D3243597A484847E46"
|
||||
Repeaters only send CLI responses (not chat messages), and these are handled
|
||||
by the command endpoint. The packet processor filters them out based on
|
||||
contact type to prevent duplicate storage.
|
||||
"""
|
||||
|
||||
# A repeater contact
|
||||
REPEATER_PUB = "a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7"
|
||||
# A normal client contact
|
||||
CLIENT_PUB = "b2c3d4e4cb0a6fb9816ca956ff22dd7f12e2e5adbbf5e233bd8232774d6cffe8"
|
||||
# Our public key
|
||||
OUR_PUB = "FACE123334789E2B81519AFDBC39A3C9EB7EA3457AD367D3243597A484847E46"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cli_response_not_stored(self, test_db, captured_broadcasts):
|
||||
"""CLI responses (flags & 0x0F == 1) should not be stored in database."""
|
||||
async def test_repeater_message_not_stored(self, test_db, captured_broadcasts):
|
||||
"""Messages from repeaters should not be stored in database."""
|
||||
from app.decoder import DecryptedDirectMessage
|
||||
from app.models import CONTACT_TYPE_REPEATER
|
||||
from app.packet_processor import create_dm_message_from_decrypted
|
||||
from app.repository import MessageRepository, RawPacketRepository
|
||||
from app.repository import ContactRepository, MessageRepository, RawPacketRepository
|
||||
|
||||
# Store a raw packet first
|
||||
# Create a repeater contact first
|
||||
await ContactRepository.upsert(
|
||||
{
|
||||
"public_key": self.REPEATER_PUB,
|
||||
"name": "Test Repeater",
|
||||
"type": CONTACT_TYPE_REPEATER, # type=2 is repeater
|
||||
"flags": 0,
|
||||
"on_radio": False,
|
||||
}
|
||||
)
|
||||
|
||||
# Store a raw packet
|
||||
packet_id, _ = await RawPacketRepository.create(b"\x09\x00test", 1700000000)
|
||||
|
||||
# Create a DecryptedDirectMessage with flags=1 (CLI response)
|
||||
# Create a DecryptedDirectMessage (simulating a CLI response from repeater)
|
||||
decrypted = DecryptedDirectMessage(
|
||||
timestamp=1700000000,
|
||||
flags=1, # txt_type=1 (CLI response)
|
||||
flags=0, # flags don't matter - we filter by contact type
|
||||
message="cli response: version 1.0",
|
||||
dest_hash="fa",
|
||||
src_hash="a1",
|
||||
@@ -974,13 +995,13 @@ class TestCLIResponseFiltering:
|
||||
msg_id = await create_dm_message_from_decrypted(
|
||||
packet_id=packet_id,
|
||||
decrypted=decrypted,
|
||||
their_public_key=self.A1B2C3_PUB,
|
||||
our_public_key=self.FACE12_PUB,
|
||||
their_public_key=self.REPEATER_PUB,
|
||||
our_public_key=self.OUR_PUB,
|
||||
received_at=1700000001,
|
||||
outgoing=False,
|
||||
)
|
||||
|
||||
# Should return None (not stored)
|
||||
# Should return None (not stored because sender is a repeater)
|
||||
assert msg_id is None
|
||||
|
||||
# Should not broadcast
|
||||
@@ -988,25 +1009,36 @@ class TestCLIResponseFiltering:
|
||||
|
||||
# Should not be in database
|
||||
messages = await MessageRepository.get_all(
|
||||
msg_type="PRIV", conversation_key=self.A1B2C3_PUB.lower(), limit=10
|
||||
msg_type="PRIV", conversation_key=self.REPEATER_PUB.lower(), limit=10
|
||||
)
|
||||
assert len(messages) == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_normal_message_still_stored(self, test_db, captured_broadcasts):
|
||||
"""Normal messages (flags & 0x0F == 0) should still be stored."""
|
||||
async def test_client_message_still_stored(self, test_db, captured_broadcasts):
|
||||
"""Messages from normal clients should still be stored."""
|
||||
from app.decoder import DecryptedDirectMessage
|
||||
from app.packet_processor import create_dm_message_from_decrypted
|
||||
from app.repository import MessageRepository, RawPacketRepository
|
||||
from app.repository import ContactRepository, MessageRepository, RawPacketRepository
|
||||
|
||||
# Create a normal client contact (type=1)
|
||||
await ContactRepository.upsert(
|
||||
{
|
||||
"public_key": self.CLIENT_PUB,
|
||||
"name": "Test Client",
|
||||
"type": 1, # type=1 is client
|
||||
"flags": 0,
|
||||
"on_radio": False,
|
||||
}
|
||||
)
|
||||
|
||||
packet_id, _ = await RawPacketRepository.create(b"\x09\x00test2", 1700000000)
|
||||
|
||||
decrypted = DecryptedDirectMessage(
|
||||
timestamp=1700000000,
|
||||
flags=0, # txt_type=0 (normal message)
|
||||
flags=0,
|
||||
message="Hello, world!",
|
||||
dest_hash="fa",
|
||||
src_hash="a1",
|
||||
src_hash="b2",
|
||||
)
|
||||
|
||||
broadcasts, mock_broadcast = captured_broadcasts
|
||||
@@ -1015,13 +1047,13 @@ class TestCLIResponseFiltering:
|
||||
msg_id = await create_dm_message_from_decrypted(
|
||||
packet_id=packet_id,
|
||||
decrypted=decrypted,
|
||||
their_public_key=self.A1B2C3_PUB,
|
||||
our_public_key=self.FACE12_PUB,
|
||||
their_public_key=self.CLIENT_PUB,
|
||||
our_public_key=self.OUR_PUB,
|
||||
received_at=1700000001,
|
||||
outgoing=False,
|
||||
)
|
||||
|
||||
# Should return message ID
|
||||
# Should return message ID (stored because sender is a client)
|
||||
assert msg_id is not None
|
||||
|
||||
# Should broadcast
|
||||
@@ -1030,6 +1062,6 @@ class TestCLIResponseFiltering:
|
||||
|
||||
# Should be in database
|
||||
messages = await MessageRepository.get_all(
|
||||
msg_type="PRIV", conversation_key=self.A1B2C3_PUB.lower(), limit=10
|
||||
msg_type="PRIV", conversation_key=self.CLIENT_PUB.lower(), limit=10
|
||||
)
|
||||
assert len(messages) == 1
|
||||
|
||||
Reference in New Issue
Block a user