From d80f9a7b3a606dc1aa8541b37a2e3590ed760e1c Mon Sep 17 00:00:00 2001 From: MarekWo Date: Thu, 19 Mar 2026 08:09:03 +0100 Subject: [PATCH] feat(console): add contact management commands (Etap 2) Add 14 console commands for contact management: contact_info, path, disc_path, reset_path, change_path, advert_path, share_contact, export_contact, import_contact, remove_contact, change_flags, pending_contacts, add_pending, flush_pending. Co-Authored-By: Claude Opus 4.6 --- app/device_manager.py | 149 ++++++++++++++++++++++++++++++++++++ app/main.py | 173 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 322 insertions(+) diff --git a/app/device_manager.py b/app/device_manager.py index a554c0a..5f4f80e 100644 --- a/app/device_manager.py +++ b/app/device_manager.py @@ -1739,3 +1739,152 @@ class DeviceManager: except Exception as e: logger.error(f"req_mma failed: {e}") return {'success': False, 'error': str(e)} + + # ── Contact Management (extended) ──────────────────────────── + + def contact_info(self, name_or_key: str) -> Dict: + """Get full info for a contact.""" + if not self.is_connected: + return {'success': False, 'error': 'Device not connected'} + contact = self.resolve_contact(name_or_key) + if not contact: + return {'success': False, 'error': f"Contact not found: {name_or_key}"} + return {'success': True, 'data': dict(contact)} + + def contact_path(self, name_or_key: str) -> Dict: + """Get path info for a contact.""" + if not self.is_connected: + return {'success': False, 'error': 'Device not connected'} + contact = self.resolve_contact(name_or_key) + if not contact: + return {'success': False, 'error': f"Contact not found: {name_or_key}"} + return {'success': True, 'data': { + 'out_path': contact.get('out_path', ''), + 'out_path_len': contact.get('out_path_len', -1), + 'out_path_hash_len': contact.get('out_path_hash_len', 0), + }} + + def discover_path(self, name_or_key: str) -> Dict: + """Discover a new path to a contact.""" + if not self.is_connected: + return {'success': False, 'error': 'Device not connected'} + contact = self.resolve_contact(name_or_key) + if not contact: + return {'success': False, 'error': f"Contact not found: {name_or_key}"} + try: + from meshcore.events import EventType + res = self.execute( + self.mc.commands.send_path_discovery(contact), + timeout=10 + ) + timeout = 30 + if res and hasattr(res, 'payload') and 'suggested_timeout' in res.payload: + timeout = res.payload['suggested_timeout'] / 600 + timeout = max(timeout, contact.get('timeout', 0) or 30) + event = self.execute( + self.mc.wait_for_event(EventType.PATH_RESPONSE, timeout=timeout), + timeout=timeout + 5 + ) + if event and hasattr(event, 'payload'): + return {'success': True, 'data': event.payload} + return {'success': False, 'error': 'No path response (timeout)'} + except Exception as e: + logger.error(f"discover_path failed: {e}") + return {'success': False, 'error': str(e)} + + def change_path(self, name_or_key: str, path: str) -> Dict: + """Change the path to a contact.""" + if not self.is_connected: + return {'success': False, 'error': 'Device not connected'} + contact = self.resolve_contact(name_or_key) + if not contact: + return {'success': False, 'error': f"Contact not found: {name_or_key}"} + try: + self.execute(self.mc.commands.change_contact_path(contact, path), timeout=10) + return {'success': True, 'message': f'Path changed for {contact.get("adv_name", name_or_key)}'} + except Exception as e: + logger.error(f"change_path failed: {e}") + return {'success': False, 'error': str(e)} + + def advert_path(self, name_or_key: str) -> Dict: + """Get advertisement path for a contact.""" + if not self.is_connected: + return {'success': False, 'error': 'Device not connected'} + contact = self.resolve_contact(name_or_key) + if not contact: + return {'success': False, 'error': f"Contact not found: {name_or_key}"} + try: + event = self.execute(self.mc.commands.get_advert_path(contact), timeout=10) + if event and hasattr(event, 'payload'): + return {'success': True, 'data': event.payload} + return {'success': False, 'error': 'No advert path response'} + except Exception as e: + logger.error(f"advert_path failed: {e}") + return {'success': False, 'error': str(e)} + + def share_contact(self, name_or_key: str) -> Dict: + """Share a contact with others on the mesh.""" + if not self.is_connected: + return {'success': False, 'error': 'Device not connected'} + contact = self.resolve_contact(name_or_key) + if not contact: + return {'success': False, 'error': f"Contact not found: {name_or_key}"} + try: + self.execute(self.mc.commands.share_contact(contact), timeout=10) + return {'success': True, 'message': f'Contact shared: {contact.get("adv_name", name_or_key)}'} + except Exception as e: + logger.error(f"share_contact failed: {e}") + return {'success': False, 'error': str(e)} + + def export_contact(self, name_or_key: str) -> Dict: + """Export a contact as URI.""" + if not self.is_connected: + return {'success': False, 'error': 'Device not connected'} + contact = self.resolve_contact(name_or_key) + if not contact: + return {'success': False, 'error': f"Contact not found: {name_or_key}"} + try: + event = self.execute(self.mc.commands.export_contact(contact), timeout=10) + if event and hasattr(event, 'payload'): + uri = event.payload.get('uri', '') + if isinstance(uri, bytes): + uri = 'meshcore://' + uri.hex() + return {'success': True, 'data': {'uri': uri}} + return {'success': False, 'error': 'No export response'} + except Exception as e: + logger.error(f"export_contact failed: {e}") + return {'success': False, 'error': str(e)} + + def import_contact_uri(self, uri: str) -> Dict: + """Import a contact from meshcore:// URI.""" + if not self.is_connected: + return {'success': False, 'error': 'Device not connected'} + try: + if uri.startswith('meshcore://'): + hex_data = uri[11:] + else: + hex_data = uri + contact_bytes = bytes.fromhex(hex_data) + self.execute(self.mc.commands.import_contact(contact_bytes), timeout=10) + # Refresh contacts + self.execute(self.mc.commands.get_contacts(), timeout=10) + return {'success': True, 'message': 'Contact imported'} + except ValueError: + return {'success': False, 'error': 'Invalid URI format (expected hex data)'} + except Exception as e: + logger.error(f"import_contact failed: {e}") + return {'success': False, 'error': str(e)} + + def change_contact_flags(self, name_or_key: str, flags: int) -> Dict: + """Change flags for a contact.""" + if not self.is_connected: + return {'success': False, 'error': 'Device not connected'} + contact = self.resolve_contact(name_or_key) + if not contact: + return {'success': False, 'error': f"Contact not found: {name_or_key}"} + try: + self.execute(self.mc.commands.change_contact_flags(contact, flags), timeout=10) + return {'success': True, 'message': f'Flags changed for {contact.get("adv_name", name_or_key)}'} + except Exception as e: + logger.error(f"change_flags failed: {e}") + return {'success': False, 'error': str(e)} diff --git a/app/main.py b/app/main.py index d448593..a6c0439 100644 --- a/app/main.py +++ b/app/main.py @@ -503,6 +503,164 @@ def _execute_console_command(args: list) -> str: elif cmd == 'req_mma': return "Usage: req_mma \n Time format: number with optional suffix s/m/h (default: minutes)" + # ── Contact management commands ────────────────────────────── + + elif cmd == 'contact_info' and len(args) >= 2: + name = ' '.join(args[1:]) + result = device_manager.contact_info(name) + if result.get('success'): + data = result['data'] + lines = [f"Contact: {data.get('adv_name', data.get('name', name))}"] + for k, v in sorted(data.items()): + if isinstance(v, bytes): + v = v.hex() + lines.append(f" {k}: {v}") + return "\n".join(lines) + return f"Error: {result.get('error')}" + + elif cmd == 'path' and len(args) >= 2: + name = ' '.join(args[1:]) + result = device_manager.contact_path(name) + if result.get('success'): + data = result['data'] + opl = data.get('out_path_len', -1) + raw = data.get('out_path', '') + if opl > 0: + hop_count = opl & 0x3F + hash_size = (opl >> 6) + 1 + meaningful = raw[:hop_count * hash_size * 2] + chunk = hash_size * 2 + hops = [meaningful[i:i+chunk].upper() for i in range(0, len(meaningful), chunk)] + path_str = ' → '.join(hops) if hops else f'len:{opl}' + return f"Path to {name}: {path_str} ({hop_count} hops)" + elif opl == 0: + return f"Path to {name}: Direct" + else: + return f"Path to {name}: Flood (no path)" + return f"Error: {result.get('error')}" + + elif cmd == 'disc_path' and len(args) >= 2: + name = ' '.join(args[1:]) + result = device_manager.discover_path(name) + if result.get('success'): + data = result['data'] + lines = [f"Discovered path to {name}:"] + for k, v in data.items(): + lines.append(f" {k}: {v}") + return "\n".join(lines) + return f"Error: {result.get('error')}" + + elif cmd == 'reset_path' and len(args) >= 2: + name = ' '.join(args[1:]) + contact = device_manager.resolve_contact(name) + if not contact: + return f"Error: Contact not found: {name}" + result = device_manager.reset_path(contact.get('public_key', name)) + if result.get('success'): + return f"Path reset for {contact.get('adv_name', name)}" + return f"Error: {result.get('error')}" + + elif cmd == 'change_path' and len(args) >= 3: + name = args[1] + path = args[2] + result = device_manager.change_path(name, path) + if result.get('success'): + return result.get('message', 'OK') + return f"Error: {result.get('error')}" + + elif cmd == 'change_path': + return "Usage: change_path \n Path: hex string, e.g. 6a61" + + elif cmd == 'advert_path' and len(args) >= 2: + name = ' '.join(args[1:]) + result = device_manager.advert_path(name) + if result.get('success'): + data = result['data'] + lines = [f"Advert path to {name}:"] + for k, v in data.items(): + lines.append(f" {k}: {v}") + return "\n".join(lines) + return f"Error: {result.get('error')}" + + elif cmd == 'share_contact' and len(args) >= 2: + name = ' '.join(args[1:]) + result = device_manager.share_contact(name) + if result.get('success'): + return result.get('message', 'OK') + return f"Error: {result.get('error')}" + + elif cmd == 'export_contact' and len(args) >= 2: + name = ' '.join(args[1:]) + result = device_manager.export_contact(name) + if result.get('success'): + uri = result['data'].get('uri', '') + return f"URI: {uri}" + return f"Error: {result.get('error')}" + + elif cmd == 'import_contact' and len(args) >= 2: + uri = args[1] + result = device_manager.import_contact_uri(uri) + if result.get('success'): + return result.get('message', 'OK') + return f"Error: {result.get('error')}" + + elif cmd == 'remove_contact' and len(args) >= 2: + name = ' '.join(args[1:]) + contact = device_manager.resolve_contact(name) + if not contact: + return f"Error: Contact not found: {name}" + result = device_manager.delete_contact(contact.get('public_key', name)) + if result.get('success'): + return f"Contact removed: {contact.get('adv_name', name)}" + return f"Error: {result.get('error')}" + + elif cmd == 'change_flags' and len(args) >= 3: + name = args[1] + try: + flags = int(args[2]) + except ValueError: + return "Error: flags must be an integer" + result = device_manager.change_contact_flags(name, flags) + if result.get('success'): + return result.get('message', 'OK') + return f"Error: {result.get('error')}" + + elif cmd == 'change_flags': + return "Usage: change_flags \n Flags: integer (tel_l|tel_a|star)" + + elif cmd == 'pending_contacts': + result = device_manager.get_pending_contacts() + if not result: + return "No pending contacts" + lines = [f"Pending contacts ({len(result)}):"] + for c in result: + name = c.get('adv_name', c.get('name', '?')) + pk = c.get('public_key', '')[:12] + lines.append(f" {name} ({pk}...)") + return "\n".join(lines) + + elif cmd == 'add_pending' and len(args) >= 2: + key_or_name = ' '.join(args[1:]) + # Try to find in pending contacts + pending = device_manager.get_pending_contacts() + target = None + for c in (pending or []): + if c.get('public_key', '').startswith(key_or_name) or c.get('adv_name', '') == key_or_name: + target = c + break + if not target: + return f"Error: Not found in pending: {key_or_name}" + result = device_manager.approve_contact(target['public_key']) + if result.get('success'): + return f"Contact added: {target.get('adv_name', key_or_name)}" + return f"Error: {result.get('error')}" + + elif cmd == 'flush_pending': + result = device_manager.clear_pending_contacts() + if result.get('success'): + return result.get('message', 'Pending contacts flushed') + return f"Error: {result.get('error')}" + elif cmd == 'help': return ( "Available commands:\n\n" @@ -522,6 +680,21 @@ def _execute_console_command(args: list) -> str: " telemetry — Request sensor telemetry\n" " neighbors — List neighbors of a node\n" " trace [tag] — Send trace packet\n\n" + " Contacts\n" + " contact_info — Contact details (JSON)\n" + " path — Show path to contact\n" + " disc_path — Discover new path\n" + " reset_path — Reset path to flood\n" + " change_path

— Change path to contact\n" + " advert_path — Get path from advert\n" + " share_contact — Share contact with mesh\n" + " export_contact — Export contact URI\n" + " import_contact — Import contact from URI\n" + " remove_contact — Remove contact from device\n" + " change_flags — Change contact flags\n" + " pending_contacts — Show pending contacts\n" + " add_pending — Add pending contact\n" + " flush_pending — Flush pending list\n\n" " Repeaters\n" " login — Log into a repeater\n" " logout — Log out of a repeater\n"