feat: Add DM delivery tracking via ACK packet detection

Bridge captures ACK packets from meshcli stdout (json_log_rx),
persists to .acks.jsonl, and exposes /ack_status endpoint.
Delivery status is merged server-side into DM messages and
displayed as a green checkmark with SNR/route tooltip.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-02-18 09:30:33 +01:00
parent cf537628cf
commit 7a960f2556
5 changed files with 210 additions and 10 deletions

View File

@@ -397,6 +397,36 @@ def send_dm(recipient: str, text: str) -> Tuple[bool, str]:
return success, stdout or stderr
def check_dm_delivery(ack_codes: list) -> Tuple[bool, Dict, str]:
"""
Check delivery status for sent DMs by their expected_ack codes.
Args:
ack_codes: List of expected_ack hex strings from SENT_MSG log entries
Returns:
Tuple of (success, ack_status_dict, error_message)
ack_status_dict maps ack_code -> ack_info dict or None
"""
try:
response = requests.get(
f"{config.MC_BRIDGE_URL.replace('/cli', '/ack_status')}",
params={'ack_codes': ','.join(ack_codes)},
timeout=DEFAULT_TIMEOUT
)
if response.status_code != 200:
return False, {}, f"Bridge error: {response.status_code}"
data = response.json()
return data.get('success', False), data.get('acks', {}), ''
except requests.exceptions.ConnectionError:
return False, {}, 'Cannot connect to bridge'
except Exception as e:
return False, {}, str(e)
# =============================================================================
# Contact Management (Existing & Pending Contacts)
# =============================================================================

View File

@@ -440,7 +440,8 @@ def _parse_sent_msg(line: Dict) -> Optional[Dict]:
'is_own': True,
'txt_type': txt_type,
'conversation_id': conversation_id,
'dedup_key': dedup_key
'dedup_key': dedup_key,
'expected_ack': line.get('expected_ack'),
}

View File

@@ -1592,6 +1592,23 @@ def get_dm_messages():
elif msg['direction'] == 'outgoing' and msg.get('recipient'):
display_name = msg['recipient']
# Merge delivery status from ACK tracking
ack_codes = [msg['expected_ack'] for msg in messages
if msg.get('direction') == 'outgoing' and msg.get('expected_ack')]
if ack_codes:
try:
success_ack, acks, _ = cli.check_dm_delivery(ack_codes)
if success_ack:
for msg in messages:
ack_code = msg.get('expected_ack')
if ack_code and acks.get(ack_code):
ack_info = acks[ack_code]
msg['status'] = 'delivered'
msg['delivery_snr'] = ack_info.get('snr')
msg['delivery_route'] = ack_info.get('route')
except Exception as e:
logger.debug(f"ACK status fetch failed (non-critical): {e}")
return jsonify({
'success': True,
'conversation_id': conversation_id,

View File

@@ -425,12 +425,20 @@ function displayMessages(messages) {
// Status icon for own messages
let statusIcon = '';
if (msg.is_own && msg.status) {
const icons = {
'pending': '<i class="bi bi-clock dm-status pending" title="Sending..."></i>',
'delivered': '<i class="bi bi-check2 dm-status delivered" title="Delivered"></i>',
'timeout': '<i class="bi bi-x-circle dm-status timeout" title="Not delivered"></i>'
};
statusIcon = icons[msg.status] || '';
if (msg.status === 'delivered') {
let title = 'Delivered';
if (msg.delivery_snr !== null && msg.delivery_snr !== undefined) {
title += `, SNR: ${msg.delivery_snr.toFixed(1)} dB`;
}
if (msg.delivery_route) title += ` (${msg.delivery_route})`;
statusIcon = `<i class="bi bi-check2 dm-status delivered" title="${title}"></i>`;
} else {
const icons = {
'pending': '<i class="bi bi-clock dm-status pending" title="Sending..."></i>',
'timeout': '<i class="bi bi-x-circle dm-status timeout" title="Not delivered"></i>'
};
statusIcon = icons[msg.status] || '';
}
}
// Metadata for incoming messages

View File

@@ -153,19 +153,29 @@ class MeshCLISession:
self.echo_lock = threading.Lock()
self.echo_log_path = self.config_dir / f"{device_name}.echoes.jsonl"
# Load persisted echo data from disk
# ACK tracking for DM delivery status
self.acks = {} # ack_code -> {snr, rssi, route, path, ts}
self.acks_file = self.config_dir / f"{device_name}.acks.jsonl"
# Load persisted data from disk
self._load_echoes()
self._load_acks()
# Start session
self._start_session()
def _update_log_paths(self, new_name):
"""Update advert/echo log paths after device name detection, renaming existing files."""
"""Update advert/echo/ack log paths after device name detection, renaming existing files."""
new_advert = self.config_dir / f"{new_name}.adverts.jsonl"
new_echo = self.config_dir / f"{new_name}.echoes.jsonl"
new_acks = self.config_dir / f"{new_name}.acks.jsonl"
# Rename existing files if they use the old (configured) name
for old_path, new_path in [(self.advert_log_path, new_advert), (self.echo_log_path, new_echo)]:
for old_path, new_path in [
(self.advert_log_path, new_advert),
(self.echo_log_path, new_echo),
(self.acks_file, new_acks),
]:
if old_path != new_path and old_path.exists() and not new_path.exists():
try:
old_path.rename(new_path)
@@ -175,6 +185,7 @@ class MeshCLISession:
self.advert_log_path = new_advert
self.echo_log_path = new_echo
self.acks_file = new_acks
logger.info(f"Log paths updated for device: {new_name}")
def _start_session(self):
@@ -344,6 +355,12 @@ class MeshCLISession:
self._process_echo(echo_data)
continue
# Try to parse as ACK packet (for DM delivery tracking)
ack_data = self._parse_ack_packet(line)
if ack_data:
self._process_ack(ack_data)
continue
# Otherwise, append to current CLI response
self._append_to_current_response(line)
@@ -680,6 +697,99 @@ class MeshCLISession:
except Exception as e:
logger.error(f"Failed to load echoes: {e}")
# =========================================================================
# ACK tracking for DM delivery status
# =========================================================================
def _parse_ack_packet(self, line):
"""Parse ACK JSON packet from stdout, return data dict or None."""
try:
data = json.loads(line)
if isinstance(data, dict) and data.get("payload_typename") == "ACK":
return {
'ack_code': data.get('pkt_payload'),
'snr': data.get('snr'),
'rssi': data.get('rssi'),
'route': data.get('route_typename'),
'path': data.get('path', ''),
'path_len': data.get('path_len', 0),
}
except (json.JSONDecodeError, ValueError):
pass
return None
def _process_ack(self, ack_data):
"""Process an ACK packet: store delivery confirmation."""
ack_code = ack_data.get('ack_code')
if not ack_code:
return
# Only store the first ACK per code (ignore duplicates from multi_acks)
if ack_code in self.acks:
logger.debug(f"ACK duplicate ignored: code={ack_code}")
return
record = {
'ack_code': ack_code,
'snr': ack_data.get('snr'),
'rssi': ack_data.get('rssi'),
'route': ack_data.get('route'),
'path': ack_data.get('path', ''),
'ts': time.time(),
}
self.acks[ack_code] = record
self._save_ack(record)
logger.info(f"ACK received: code={ack_code}, snr={ack_data.get('snr')}, route={ack_data.get('route')}")
def _save_ack(self, record):
"""Append ACK record to .acks.jsonl file."""
try:
with open(self.acks_file, 'a', encoding='utf-8') as f:
f.write(json.dumps(record, ensure_ascii=False) + '\n')
except Exception as e:
logger.error(f"Failed to save ACK: {e}")
def _load_acks(self):
"""Load ACK data from .acks.jsonl on startup with 7-day cleanup."""
if not self.acks_file.exists():
return
cutoff = time.time() - (7 * 24 * 3600) # 7 days
kept_lines = []
loaded = 0
try:
with open(self.acks_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line:
continue
try:
record = json.loads(line)
except json.JSONDecodeError:
continue
ts = record.get('ts', 0)
if ts < cutoff:
continue # Skip old records
kept_lines.append(line)
ack_code = record.get('ack_code')
if ack_code:
self.acks[ack_code] = record
loaded += 1
# Rewrite file with only recent records (compact)
with open(self.acks_file, 'w', encoding='utf-8') as f:
for line in kept_lines:
f.write(line + '\n')
logger.info(f"Loaded ACKs from disk: {loaded} records (kept {len(kept_lines)})")
except Exception as e:
logger.error(f"Failed to load ACKs: {e}")
def _log_advert(self, json_line):
"""Log advert JSON to .jsonl file with timestamp"""
try:
@@ -1314,6 +1424,40 @@ def get_echo_counts():
}), 200
# =============================================================================
# ACK tracking endpoint for DM delivery status
# =============================================================================
@app.route('/ack_status', methods=['GET'])
def get_ack_status():
"""
Get ACK status for sent DMs by their expected_ack codes.
Query params:
ack_codes: comma-separated list of expected_ack hex codes
Response JSON:
{
"success": true,
"acks": {
"544a4d8f": {"snr": 13.0, "rssi": -32, "route": "DIRECT", "ts": 1706500000.123},
"ff3b55ce": null
}
}
"""
if not meshcli_session:
return jsonify({'success': False, 'error': 'Not initialized'}), 503
requested = request.args.get('ack_codes', '')
codes = [c.strip() for c in requested.split(',') if c.strip()]
result = {}
for code in codes:
result[code] = meshcli_session.acks.get(code)
return jsonify({'success': True, 'acks': result}), 200
# =============================================================================
# WebSocket handlers for console
# =============================================================================