feat(v2): Route API endpoints through Database, remove bridge

- Update api.py: messages, contacts, DM endpoints read from SQLite DB
- Add DB fallback paths for parser.py backward compatibility
- Replace bridge echo registration with DeviceManager event handling
- Update status endpoint to use db.get_stats()
- Update channel updates/DM updates endpoints for DB queries
- Delete channel messages via DB instead of parser
- Remove meshcore-bridge/ directory (no longer needed in v2)
- Remove MC_BRIDGE_URL from config

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-03-01 09:28:14 +01:00
parent badf67cf74
commit df8e2d2218
5 changed files with 243 additions and 2523 deletions

View File

@@ -18,8 +18,7 @@ class Config:
MC_DEVICE_NAME = os.getenv('MC_DEVICE_NAME', 'MeshCore')
MC_CONFIG_DIR = os.getenv('MC_CONFIG_DIR', '/root/.config/meshcore')
# MeshCore Bridge configuration (v1 — will be removed in Phase 1)
MC_BRIDGE_URL = os.getenv('MC_BRIDGE_URL', 'http://meshcore-bridge:5001/cli')
# MC_BRIDGE_URL removed in v2 (direct device communication)
# Archive configuration (v1 — archives move to SQLite in v2)
MC_ARCHIVE_DIR = os.getenv('MC_ARCHIVE_DIR', '/root/.archive/meshcore')

View File

@@ -15,7 +15,7 @@ from Crypto.Cipher import AES
from datetime import datetime
from io import BytesIO
from pathlib import Path
from flask import Blueprint, jsonify, request, send_file
from flask import Blueprint, jsonify, request, send_file, current_app
from app.meshcore import cli, parser
from app.config import config, runtime_config
from app.archiver import manager as archive_manager
@@ -23,6 +23,16 @@ from app.contacts_cache import get_all_names, get_all_contacts
logger = logging.getLogger(__name__)
def _get_db():
"""Get Database instance from app context."""
return getattr(current_app, 'db', None)
def _get_dm():
"""Get DeviceManager instance from app context."""
return getattr(current_app, 'device_manager', None)
api_bp = Blueprint('api', __name__, url_prefix='/api')
# Simple cache for get_channels() to reduce USB/meshcli calls
@@ -332,85 +342,50 @@ def get_messages():
'error': f'Invalid date format: {archive_date}. Expected YYYY-MM-DD'
}), 400
# Read messages (from archive or live .msgs file)
messages = parser.read_messages(
limit=limit,
offset=offset,
archive_date=archive_date,
days=days,
channel_idx=channel_idx
)
# v2: Read messages from Database
db = _get_db()
if db and not archive_date:
ch = channel_idx if channel_idx is not None else 0
db_messages = db.get_channel_messages(
channel_idx=ch,
limit=limit or 50,
offset=offset,
days=days,
)
# Convert DB rows to frontend-compatible format
messages = []
for row in db_messages:
messages.append({
'sender': row.get('sender', ''),
'content': row.get('content', ''),
'timestamp': row.get('timestamp', 0),
'datetime': datetime.fromtimestamp(row['timestamp']).isoformat() if row.get('timestamp') else None,
'is_own': bool(row.get('is_own', 0)),
'snr': row.get('snr'),
'path_len': row.get('path_len'),
'channel_idx': row.get('channel_idx', 0),
'sender_timestamp': row.get('sender_timestamp'),
'txt_type': row.get('txt_type', 0),
'raw_text': row.get('content', ''),
})
# Fetch echo data from bridge (for "Heard X repeats" + path display)
if not archive_date: # Only for live messages, not archives
try:
bridge_url = config.MC_BRIDGE_URL.replace('/cli', '/echo_counts')
response = requests.get(bridge_url, timeout=2)
if response.ok:
resp_data = response.json()
echo_counts = resp_data.get('echo_counts', [])
incoming_paths = resp_data.get('incoming_paths', [])
if incoming_paths:
logger.debug(f"Echo data: {len(echo_counts)} sent, {len(incoming_paths)} incoming paths from bridge")
# Merge sent echo counts + paths into own messages
for msg in messages:
if msg.get('is_own'):
msg['echo_count'] = 0
msg['echo_paths'] = []
for ec in echo_counts:
if (msg.get('channel_idx') == ec.get('channel_idx') and
abs(msg['timestamp'] - ec['timestamp']) < 5):
msg['echo_count'] = ec['count']
msg['echo_paths'] = ec.get('paths', [])
pkt = ec.get('pkt_payload')
if pkt:
msg['analyzer_url'] = compute_analyzer_url(pkt)
break
# Merge incoming paths into received messages
# Deterministic matching via computed pkt_payload
incoming_by_payload = {ip['pkt_payload']: ip for ip in incoming_paths}
# Get channel secrets for payload computation
_, channels = get_channels_cached()
channel_secrets = {ch['index']: ch['key'] for ch in (channels or [])}
for msg in messages:
if not msg.get('is_own') and msg.get('sender_timestamp') and msg.get('channel_idx') in channel_secrets:
secret = channel_secrets[msg['channel_idx']]
# Always compute attempt=0 payload for analyzer URL
base_payload = compute_pkt_payload(
secret, msg['sender_timestamp'],
msg.get('txt_type', 0), msg.get('raw_text', ''), attempt=0
)
msg['analyzer_url'] = compute_analyzer_url(base_payload)
# Try all 4 attempt values for path matching
matched = False
for attempt in range(4):
try:
computed_payload = compute_pkt_payload(
secret, msg['sender_timestamp'],
msg.get('txt_type', 0), msg.get('raw_text', ''), attempt
)
except Exception:
break
if computed_payload in incoming_by_payload:
entry = incoming_by_payload[computed_payload]
msg['paths'] = entry.get('paths', [])
matched = True
break
if not matched and incoming_by_payload:
raw = msg.get('raw_text', '')
logger.debug(
f"Echo mismatch: ts={msg.get('sender_timestamp')} "
f"ch={msg.get('channel_idx')} "
f"text_bytes={len(raw.encode('utf-8'))} "
f"base_payload={base_payload[:16]}... "
f"text_preview={raw[:40]!r}"
)
except Exception as e:
logger.debug(f"Echo data fetch failed (non-critical): {e}")
# Enrich with echo data from DB (if available)
for msg in messages:
if msg.get('is_own'):
pkt = row.get('pkt_payload') if row else None
if pkt:
echoes = db.get_echoes_for_message(pkt)
msg['echo_count'] = len(echoes)
msg['echo_paths'] = [e.get('path', '') for e in echoes]
else:
# Fallback to parser for archive reads
messages = parser.read_messages(
limit=limit,
offset=offset,
archive_date=archive_date,
days=days,
channel_idx=channel_idx
)
return jsonify({
'success': True,
@@ -473,16 +448,7 @@ def send_message():
success, message = cli.send_message(text, reply_to=reply_to, channel_index=channel_idx)
if success:
# Register for echo tracking ("Heard X repeats" feature)
try:
bridge_url = config.MC_BRIDGE_URL.replace('/cli', '/register_echo')
requests.post(
bridge_url,
json={'channel_idx': channel_idx, 'timestamp': time.time()},
timeout=2
)
except Exception as e:
logger.debug(f"Echo registration failed (non-critical): {e}")
# v2: Echo tracking is handled automatically by DeviceManager events
return jsonify({
'success': True,
@@ -515,12 +481,21 @@ def get_status():
# Check if device is accessible
connected = cli.check_connection()
# Get message count
message_count = parser.count_messages()
# Get latest message timestamp
latest = parser.get_latest_message()
latest_timestamp = latest['timestamp'] if latest else None
# v2: Get message count from Database
db = _get_db()
message_count = 0
latest_timestamp = None
if db:
stats = db.get_stats()
message_count = stats.get('channel_messages', 0) + stats.get('direct_messages', 0)
# Get latest channel message timestamp
recent = db.get_channel_messages(limit=1)
if recent:
latest_timestamp = recent[0].get('timestamp')
else:
message_count = parser.count_messages()
latest = parser.get_latest_message()
latest_timestamp = latest['timestamp'] if latest else None
return jsonify({
'success': True,
@@ -586,23 +561,55 @@ def get_cached_contacts():
try:
fmt = request.args.get('format', 'names')
if fmt == 'full':
contacts = get_all_contacts()
# Add public_key_prefix for display
for c in contacts:
c['public_key_prefix'] = c.get('public_key', '')[:12]
return jsonify({
'success': True,
'contacts': contacts,
'count': len(contacts)
}), 200
# v2: Read from Database (fallback to contacts_cache)
db = _get_db()
if db:
db_contacts = db.get_contacts()
if fmt == 'full':
contacts = []
for c in db_contacts:
pk = c.get('public_key', '')
contacts.append({
'public_key': pk,
'public_key_prefix': pk[:12],
'name': c.get('name', ''),
'first_seen': c.get('first_seen', ''),
'last_seen': c.get('last_seen', ''),
'source': c.get('source', ''),
'adv_lat': c.get('adv_lat'),
'adv_lon': c.get('adv_lon'),
'type_label': {0: 'CLI', 1: 'CLI', 2: 'REP', 3: 'ROOM', 4: 'SENS'}.get(c.get('type', 1), 'UNKNOWN'),
})
return jsonify({
'success': True,
'contacts': contacts,
'count': len(contacts)
}), 200
else:
names = sorted(set(c.get('name', '') for c in db_contacts if c.get('name')))
return jsonify({
'success': True,
'contacts': names,
'count': len(names)
}), 200
else:
names = get_all_names()
return jsonify({
'success': True,
'contacts': names,
'count': len(names)
}), 200
# Fallback to contacts_cache
if fmt == 'full':
contacts = get_all_contacts()
for c in contacts:
c['public_key_prefix'] = c.get('public_key', '')[:12]
return jsonify({
'success': True,
'contacts': contacts,
'count': len(contacts)
}), 200
else:
names = get_all_names()
return jsonify({
'success': True,
'contacts': names,
'count': len(names)
}), 200
except Exception as e:
logger.error(f"Error getting cached contacts: {e}")
@@ -1341,9 +1348,13 @@ def delete_channel(index):
"""
try:
# First, delete all messages for this channel
messages_deleted = parser.delete_channel_messages(index)
if not messages_deleted:
logger.warning(f"Failed to delete messages for channel {index}, continuing with channel removal")
db = _get_db()
if db:
db.delete_channel_messages(index)
else:
messages_deleted = parser.delete_channel_messages(index)
if not messages_deleted:
logger.warning(f"Failed to delete messages for channel {index}, continuing with channel removal")
# Then remove the channel itself
success, message = cli.remove_channel(index)
@@ -1515,10 +1526,11 @@ def get_messages_updates():
# OPTIMIZATION: Read ALL messages ONCE (no channel filter)
# Then compute per-channel statistics in memory
all_messages = parser.read_messages(
limit=None, # Get all messages
days=7 # Only last 7 days
)
db = _get_db()
if db:
all_messages = db.get_channel_messages(limit=None, days=7)
else:
all_messages = parser.read_messages(limit=None, days=7)
# Group messages by channel and compute stats
channel_stats = {} # channel_idx -> {latest_ts, messages_after_last_seen}
@@ -1617,7 +1629,25 @@ def get_dm_conversations():
try:
days = request.args.get('days', default=7, type=int)
conversations = parser.get_dm_conversations(days=days)
# v2: Read from Database
db = _get_db()
if db:
convos = db.get_dm_conversations()
# Convert to frontend-compatible format
conversations = []
for c in convos:
pk = c.get('contact_pubkey', '')
conversations.append({
'conversation_id': f"pk_{pk}" if pk else 'unknown',
'display_name': c.get('display_name', pk[:8] + '...' if pk else 'Unknown'),
'pubkey_prefix': pk[:12] if pk else '',
'last_message_timestamp': c.get('last_message_timestamp', 0),
'last_message_preview': (c.get('last_message_preview', '') or '')[:50],
'unread_count': 0,
'message_count': c.get('message_count', 0),
})
else:
conversations = parser.get_dm_conversations(days=days)
return jsonify({
'success': True,
@@ -1664,30 +1694,62 @@ def get_dm_messages():
limit = request.args.get('limit', default=100, type=int)
days = request.args.get('days', default=7, type=int)
messages, pubkey_to_name = parser.read_dm_messages(
limit=limit,
conversation_id=conversation_id,
days=days
)
# v2: Read from Database
db = _get_db()
pubkey_to_name = {}
# Filter out retry duplicate messages (keep only the first send)
try:
retry_codes = cli.get_retry_ack_codes()
if retry_codes:
messages = [msg for msg in messages
if not (msg.get('direction') == 'outgoing'
and msg.get('expected_ack') in retry_codes)]
except Exception as e:
logger.debug(f"Retry dedup failed (non-critical): {e}")
# Extract pubkey from conversation_id
contact_pubkey = ''
if conversation_id.startswith('pk_'):
contact_pubkey = conversation_id[3:]
elif conversation_id.startswith('name_'):
# Look up pubkey by name
contact_name = conversation_id[5:]
if db:
contacts = db.get_contacts()
for c in contacts:
if c.get('name', '').strip() == contact_name:
contact_pubkey = c['public_key']
break
# Secondary dedup: collapse retries with same text+recipient within 5min window
messages = parser.dedup_retry_messages(messages)
if db and contact_pubkey:
db_msgs = db.get_dm_messages(contact_pubkey, limit=limit)
messages = []
for row in db_msgs:
messages.append({
'type': 'dm',
'direction': 'incoming' if row['direction'] == 'in' else 'outgoing',
'sender': row.get('contact_pubkey', ''),
'content': row.get('content', ''),
'timestamp': row.get('timestamp', 0),
'datetime': datetime.fromtimestamp(row['timestamp']).isoformat() if row.get('timestamp') else None,
'is_own': row['direction'] == 'out',
'snr': row.get('snr'),
'path_len': row.get('path_len'),
'expected_ack': row.get('expected_ack'),
'conversation_id': conversation_id,
})
else:
messages, pubkey_to_name = parser.read_dm_messages(
limit=limit,
conversation_id=conversation_id,
days=days
)
messages = parser.dedup_retry_messages(messages)
# Determine display name from conversation_id or messages
# Determine display name
display_name = 'Unknown'
if conversation_id.startswith('pk_'):
pk = conversation_id[3:]
display_name = pubkey_to_name.get(pk, pk[:8] + '...')
# Try DB first
if db:
contact = db.get_contact(pk)
if contact:
display_name = contact.get('name', pk[:8] + '...')
else:
display_name = pubkey_to_name.get(pk, pk[:8] + '...')
else:
display_name = pubkey_to_name.get(pk, pk[:8] + '...')
elif conversation_id.startswith('name_'):
display_name = conversation_id[5:]
@@ -1871,37 +1933,56 @@ def get_dm_updates():
except json.JSONDecodeError:
last_seen = {}
# Get all conversations
conversations = parser.get_dm_conversations(days=7)
# v2: Read from Database
db = _get_db()
updates = []
total_unread = 0
for conv in conversations:
conv_id = conv['conversation_id']
last_seen_ts = last_seen.get(conv_id, 0)
if db:
convos = db.get_dm_conversations()
for c in convos:
pk = c.get('contact_pubkey', '')
conv_id = f"pk_{pk}" if pk else 'unknown'
display_name = c.get('display_name', pk[:8] + '...' if pk else 'Unknown')
last_msg_ts = c.get('last_message_timestamp', 0)
last_seen_ts = last_seen.get(conv_id, 0)
# Count unread
if conv['last_message_timestamp'] > last_seen_ts:
# Need to count actual unread messages (dedup retries)
messages, _ = parser.read_dm_messages(
conversation_id=conv_id,
days=7
)
messages = parser.dedup_retry_messages(messages)
unread_count = sum(1 for m in messages if m['timestamp'] > last_seen_ts)
else:
unread_count = 0
if last_msg_ts > last_seen_ts and pk:
db_msgs = db.get_dm_messages(pk, limit=200)
unread_count = sum(1 for m in db_msgs if m.get('timestamp', 0) > last_seen_ts)
else:
unread_count = 0
total_unread += unread_count
total_unread += unread_count
if unread_count > 0:
updates.append({
'conversation_id': conv_id,
'display_name': conv['display_name'],
'unread_count': unread_count,
'latest_timestamp': conv['last_message_timestamp']
})
if unread_count > 0:
updates.append({
'conversation_id': conv_id,
'display_name': display_name,
'unread_count': unread_count,
'latest_timestamp': last_msg_ts
})
else:
# Fallback to parser
conversations = parser.get_dm_conversations(days=7)
for conv in conversations:
conv_id = conv['conversation_id']
last_seen_ts = last_seen.get(conv_id, 0)
if conv['last_message_timestamp'] > last_seen_ts:
messages, _ = parser.read_dm_messages(conversation_id=conv_id, days=7)
messages = parser.dedup_retry_messages(messages)
unread_count = sum(1 for m in messages if m['timestamp'] > last_seen_ts)
else:
unread_count = 0
total_unread += unread_count
if unread_count > 0:
updates.append({
'conversation_id': conv_id,
'display_name': conv['display_name'],
'unread_count': unread_count,
'latest_timestamp': conv['last_message_timestamp']
})
return jsonify({
'success': True,

View File

@@ -1,28 +0,0 @@
# MeshCore Bridge Dockerfile
FROM python:3.11-slim
LABEL maintainer="mc-webui"
LABEL description="MeshCore CLI Bridge - HTTP API wrapper for meshcli"
WORKDIR /bridge
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
python3-dev \
&& rm -rf /var/lib/apt/lists/*
# Install meshcore-cli (from PyPI)
RUN pip install --no-cache-dir meshcore-cli==1.4.2
# Copy bridge application
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY bridge.py .
# Expose bridge API port
EXPOSE 5001
# Run bridge
CMD ["python", "bridge.py"]

File diff suppressed because it is too large Load Diff

View File

@@ -1,10 +0,0 @@
# MeshCore Bridge - Minimal dependencies
Flask==3.0.0
Werkzeug==3.0.1
# WebSocket support for console
flask-socketio==5.3.6
python-socketio==5.10.0
python-engineio==4.8.1
gevent==23.9.1
gevent-websocket==0.10.1