Initial commit

PyMeshCore GUI is an open-source desktop application for interacting with the MeshCore network.
It focuses on **chatting, prototyping, and development** on top of MeshCore, with an emphasis on
desktop workflows and developer accessibility.

The project is built using PySide6 (Qt for Python) and meshcore-py.
This commit is contained in:
Stefan de Konink
2026-01-04 20:54:12 +01:00
commit 18e21ffcdb
39 changed files with 2342 additions and 0 deletions

View File

@@ -0,0 +1,8 @@
Metadata-Version: 2.4
Name: PyMeshCoreGUI
Version: 0.1.0
Summary: An open source Qt application build on MeshCore.
Author-email: Stefan de Konink <stefan@konink.de>
Requires-Python: >=3.12
Requires-Dist: PySide6
Requires-Dist: meshcore

View File

@@ -0,0 +1,35 @@
pyproject.toml
src/PyMeshCoreGUI.egg-info/PKG-INFO
src/PyMeshCoreGUI.egg-info/SOURCES.txt
src/PyMeshCoreGUI.egg-info/dependency_links.txt
src/PyMeshCoreGUI.egg-info/entry_points.txt
src/PyMeshCoreGUI.egg-info/requires.txt
src/PyMeshCoreGUI.egg-info/top_level.txt
src/meshcore_gui/__init__.py
src/meshcore_gui/main.py
src/meshcore_gui/controllers/__init__.py
src/meshcore_gui/controllers/channel_controller.py
src/meshcore_gui/controllers/contact_controller.py
src/meshcore_gui/controllers/meshcore_controller.py
src/meshcore_gui/controllers/message_controller.py
src/meshcore_gui/models/__init__.py
src/meshcore_gui/models/channel.py
src/meshcore_gui/models/channel_list_model.py
src/meshcore_gui/models/contact.py
src/meshcore_gui/models/contact_list_model.py
src/meshcore_gui/models/message.py
src/meshcore_gui/models/message_list_model.py
src/meshcore_gui/utils/__init__.py
src/meshcore_gui/utils/async_executor.py
src/meshcore_gui/utils/parsing.py
src/meshcore_gui/views/__init__.py
src/meshcore_gui/views/main_window.py
src/meshcore_gui/wizard/__init__.py
src/meshcore_gui/wizard/wizard_companion.py
src/meshcore_gui/wizard/companion/__init__.py
src/meshcore_gui/wizard/companion/community_presets.py
src/meshcore_gui/wizard/companion/connection.py
src/meshcore_gui/wizard/companion/deviceselection.py
src/meshcore_gui/wizard/companion/devicestatus.py
src/meshcore_gui/wizard/companion/mc_bluetooth.py
src/meshcore_gui/wizard/companion/mc_serial.py

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,2 @@
[console_scripts]
meshcore-gui = meshcore_gui.main:main

View File

@@ -0,0 +1,2 @@
PySide6
meshcore

View File

@@ -0,0 +1 @@
meshcore_gui

View File

View File

View File

@@ -0,0 +1,49 @@
from PySide6.QtCore import QObject, Slot, Signal, Qt, QItemSelection
from PySide6.QtWidgets import QListView
from meshcore import EventType
from meshcore.events import Event
from ..models.channel import Channel
from ..models.channel_list_model import ChannelListModel
class ChannelController(QObject):
selected = Signal(object)
def __init__(self, view: QListView, parent: QObject | None = None):
super().__init__(parent)
self.view = view
self.model = ChannelListModel()
self.view.setModel(self.model)
self.view.setSelectionMode(QListView.SingleSelection)
selection_model = self.view.selectionModel()
selection_model.selectionChanged.connect(self._on_selected)
def _on_selected(self, selected: QItemSelection, deselected: QItemSelection):
if not selected.indexes():
return
index = selected.indexes()[0]
data = index.data(Qt.UserRole)
if data and "channel_idx" in data:
self.model.set_new_message(data["channel_idx"], True)
self.selected.emit(data)
@Slot(int)
def new_channel_message(self, channel_idx: int):
self.model.set_new_message(channel_idx, False)
@Slot()
def deselect(self):
if self.view.selectionModel().hasSelection():
self.view.selectionModel().clearSelection()
@Slot(object)
def process_event(self, event: Event):
if event.type == EventType.CHANNEL_INFO:
channel = Channel(event.payload)
if not channel.is_valid():
return # Ignore invalid channel
self.model.add_or_update(channel)

View File

@@ -0,0 +1,53 @@
from PySide6.QtCore import QObject, Slot, Signal, Qt, QItemSelection
from PySide6.QtWidgets import QListView
from meshcore import EventType
from meshcore.events import Event
from ..models.contact import Contact
from ..models.contact_list_model import ContactListModel
class ContactController(QObject):
selected = Signal(object)
def __init__(self, view: QListView, parent: QObject | None = None):
super().__init__(parent)
self.view = view
self.model = ContactListModel()
self.view.setModel(self.model)
self.view.setSelectionMode(QListView.SingleSelection)
selection_model = self.view.selectionModel()
selection_model.selectionChanged.connect(self._on_selected)
def _on_selected(self, selected: QItemSelection, deselected: QItemSelection):
if not selected.indexes():
return
index = selected.indexes()[0]
data = index.data(Qt.UserRole)
if data and "public_key" in data:
self.model.set_new_message(data["public_key"], True)
self.selected.emit(data)
@Slot(str)
def new_private_message(self, pubkey_prefix: str):
self.model.set_new_message(pubkey_prefix, False)
@Slot()
def deselect(self):
if self.view.selectionModel().hasSelection():
self.view.selectionModel().clearSelection()
@Slot(object)
def process_event(self, event: Event):
if event.type == EventType.CONTACTS:
for contact_payload in event.payload.values():
contact = Contact(contact_payload)
self.model.add_contact(contact)
elif (
event.type == EventType.NEW_CONTACT or event.type == EventType.NEXT_CONTACT
):
contact = Contact(event.payload)
self.model.add_contact(contact)

View File

@@ -0,0 +1,126 @@
import asyncio
from typing import Any
from PySide6.QtCore import QObject, Signal, Slot
from meshcore import MeshCore, EventType
from meshcore.events import Event
from ..utils.async_executor import AsyncExecutor
from ..utils.parsing import parse_rx_log_data
class MeshCoreController(QObject):
self_info_ready = Signal(object)
contacts_ready = Signal(object)
channels_ready = Signal(object)
event_received = Signal(object)
message_sent = Signal(object)
def __init__(self, mc: MeshCore, executor: AsyncExecutor):
super().__init__()
self.mc = mc
self.executor = executor
self._subscriptions = []
self._shutdown_called = False
@Slot(object)
def process_event(self, event: Event):
if event.type == EventType.RX_LOG_DATA:
# TODO: Publish this to a dedicated debugging tab
print("RX_LOG_DATA", parse_rx_log_data(event.payload))
elif event.type == EventType.NO_MORE_MSGS:
pass
@Slot()
def get_self_info(self):
"""Emit self_info immediately"""
self.self_info_ready.emit(self.mc.self_info)
@Slot()
def fetch_contacts(self):
"""Fetch contacts async via executor"""
self.executor.submit_async(
self.mc.commands.get_contacts_async(), self.contacts_ready
)
@Slot()
def channel_info(self):
"""Fetch contacts async via executor"""
async def all_channels():
for channel_idx in range(0, 40):
await self.mc.commands.get_channel(channel_idx)
self.executor.submit_async(all_channels(), self.channels_ready)
@Slot()
def send_adv(self):
self.executor.submit_async(self.mc.commands.send_advert(True), None)
def set_radio(self, freq: float, bw: float, sf: int, cr: int):
self.executor.submit_async(self.mc.commands.set_radio(freq, bw, sf, cr), None)
def reboot(self):
self.executor.submit_async(self.mc.commands.reboot(), self.reboot)
def set_name(self, name: str):
self.executor.submit_async(self.mc.commands.set_name(name), None)
def start_message_stream(self):
"""
Subscribe to MeshCore native message events.
Each event triggers a Qt signal to the GUI.
"""
async def handle_message(event: Event):
self.event_received.emit(event)
sub1 = self.mc.subscribe(None, handle_message)
# Keep track of subscriptions so we can unsubscribe them later
self._subscriptions.extend([sub1])
# Start MeshCore automatic fetching
asyncio.run_coroutine_threadsafe(
self.mc.start_auto_message_fetching(), self.executor.loop
)
@Slot(dict, str)
def send_message(self, target: dict[str, Any], text: str):
if "channel_idx" in target:
self.executor.submit_async(
self.mc.commands.send_chan_msg(target["channel_idx"], text),
self.message_sent,
)
elif "public_key" in target:
self.executor.submit_async(
self.mc.commands.send_msg(target["public_key"], text),
self.message_sent,
)
def shutdown(self):
"""Idempotent shutdown"""
if self._shutdown_called:
return
self._shutdown_called = True
# Cancel subscriptions
for sub in self._subscriptions:
try:
self.mc.unsubscribe(sub)
except Exception:
pass
self._subscriptions.clear()
async def stop_fetching():
try:
await self.mc.stop_auto_message_fetching()
await self.mc.disconnect()
except Exception:
pass
asyncio.run_coroutine_threadsafe(stop_fetching(), self.executor.loop)
self.executor.shutdown()

View File

@@ -0,0 +1,76 @@
import time
from typing import Any
from PySide6.QtCore import QObject, Signal, Slot
from PySide6.QtWidgets import QListView
from meshcore import EventType
from meshcore.events import Event
from ..models.message import Message
from ..models.message_list_model import MessageListModel
class MessageController(QObject):
new_channel_message = Signal(int)
new_private_message = Signal(str)
def __init__(self, view: QListView, parent: QObject | None = None):
super().__init__(parent)
self.view: QListView = view
self.current_chat: int | str | None = None
self.channel_models: dict[int | str, MessageListModel] = {}
def get_channel_model(self, chat: int | str) -> MessageListModel:
if chat not in self.channel_models:
self.channel_models[chat] = MessageListModel()
return self.channel_models[chat]
@Slot(object)
def set_chat(self, payload: dict[str, Any]):
chat_id = None
if "channel_idx" in payload:
chat_id = payload["channel_idx"]
elif "public_key" in payload:
chat_id = payload["public_key"][0:12]
if chat_id is not None:
self.current_chat = chat_id
model = self.get_channel_model(chat_id)
self.view.setModel(model)
@Slot(dict, str)
def send_message(self, target: dict[str, Any], text: str):
chat_id = None
if "channel_idx" in target:
chat_id = target["channel_idx"]
elif "public_key" in target:
chat_id = target["public_key"][0:12]
if chat_id is not None:
model = self.get_channel_model(chat_id)
model.add_message(
Message({"text": text, "sender_timestamp": int(time.time())})
)
@Slot(object)
def process_event(self, event: Event):
if event.type == EventType.CHANNEL_MSG_RECV:
payload = event.payload
chat = payload["channel_idx"]
model = self.get_channel_model(chat)
model.add_message(Message(payload))
if self.current_chat != chat:
self.new_channel_message.emit(chat)
elif event.type == EventType.CONTACT_MSG_RECV:
payload = event.payload
chat = payload["pubkey_prefix"]
model = self.get_channel_model(chat)
model.add_message(Message(payload))
if self.current_chat != chat:
self.new_private_message.emit(chat)

222
src/meshcore_gui/main.py Normal file
View File

@@ -0,0 +1,222 @@
import argparse
import asyncio
import signal
import sys
from meshcore import MeshCore
from PySide6.QtCore import QCoreApplication, QObject, QSettings, QThread, QTimer, Slot
from PySide6.QtWidgets import QApplication
from .controllers.channel_controller import ChannelController
from .controllers.contact_controller import ContactController
from .controllers.message_controller import MessageController
from .controllers.meshcore_controller import MeshCoreController
from .utils.async_executor import AsyncExecutor
from .views.main_window import MeshCoreWidget
from .wizard.wizard_companion import CompanionWizard
class ApplicationController(QObject):
def __init__(self, executor, loop):
self.mc = None
self.meshcore_controller = None
self.executor = executor
self.loop = loop
def attempt_connection(self):
async def init_mc(port, interace_type: str):
try:
if interface_type == "serial":
mc = await MeshCore.create_serial(port)
return mc
elif interface_type == "bluetooth":
mc = await MeshCore.create_ble(port)
return mc
except:
pass
return None
async def disconnect_mc(mc):
try:
await mc.disconnect()
except:
pass
if self.mc is not None:
future = asyncio.run_coroutine_threadsafe(disconnect_mc(self.mc), self.loop)
self.mc = None
port = None
settings = QSettings()
settings.beginGroup("interface")
interface_type = settings.value("type", False)
if interface_type == "serial":
port = settings.value("port")
elif interface_type == "bluetooth":
port = settings.value("addr")
settings.endGroup()
if port is None:
return None
future = asyncio.run_coroutine_threadsafe(
init_mc(port, interface_type), self.loop
)
self.mc = future.result()
self.meshcore_controller = MeshCoreController(self.mc, self.executor)
return self.mc is not None
def start(self, force_wizard=False):
if force_wizard:
self.start_wizard()
else:
if self.mc is None:
self.attempt_connection()
if self.mc is not None:
self.start_main()
else:
self.start_wizard()
@Slot(bool)
def on_wizard_done(self, success: bool):
if success:
self.start()
def start_wizard(self):
self.wizard = CompanionWizard(self)
self.wizard.completed.connect(self.on_wizard_done)
self.wizard.show()
def start_main(self):
# --- GUI ---
self.widget = MeshCoreWidget()
self.widget.setWindowTitle("MeshCore Chat")
self.widget.resize(400, 600)
self.widget.show()
# --- Controllers ---
self.message_controller = MessageController(self.widget.list_msgs)
self.contact_controller = ContactController(self.widget.list_contacts)
self.channel_controller = ChannelController(self.widget.list_channels)
# --- Signal/Slot Connections ---
self.meshcore_controller.event_received.connect(
self.meshcore_controller.process_event
)
self.meshcore_controller.event_received.connect(
self.message_controller.process_event
)
self.meshcore_controller.event_received.connect(
self.contact_controller.process_event
)
self.meshcore_controller.event_received.connect(
self.channel_controller.process_event
)
self.meshcore_controller.self_info_ready.connect(self.widget.update_self_info)
self.meshcore_controller.message_sent.connect(self.widget.on_sent)
self.contact_controller.selected.connect(self.message_controller.set_chat)
self.contact_controller.selected.connect(self.channel_controller.deselect)
self.contact_controller.selected.connect(self.widget.set_target)
self.channel_controller.selected.connect(self.contact_controller.deselect)
self.channel_controller.selected.connect(self.message_controller.set_chat)
self.channel_controller.selected.connect(self.widget.set_target)
self.message_controller.new_channel_message.connect(
self.channel_controller.new_channel_message
)
self.message_controller.new_private_message.connect(
self.contact_controller.new_private_message
)
self.widget.text_submitted.connect(self.meshcore_controller.send_message)
self.widget.text_submitted.connect(self.message_controller.send_message)
self.widget.button_adv.clicked.connect(self.meshcore_controller.send_adv)
# --- Initial actions ---
self.meshcore_controller.start_message_stream()
self.meshcore_controller.channel_info()
self.meshcore_controller.get_self_info()
self.meshcore_controller.fetch_contacts()
def main():
parser = argparse.ArgumentParser(description="MeshCore GUI")
parser.add_argument(
"--port", default="/dev/ttyACM0", help="Serial port for the MeshCore device"
)
parser.add_argument(
"--wizard", default=False, action="store_true", help="Start the wizard"
)
args = parser.parse_args()
if args.port is not None:
settings = QSettings()
settings.beginGroup("interface")
settings.setValue("port", args.port)
settings.endGroup()
app = QApplication(sys.argv)
app.setQuitOnLastWindowClosed(True)
# --- asyncio loop in background thread ---
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop_thread = QThread()
def start_loop():
asyncio.set_event_loop(loop)
loop.run_forever()
loop_thread.run = start_loop
loop_thread.start()
executor = AsyncExecutor(loop)
controller = ApplicationController(executor, loop)
controller.start(args.wizard)
# --- Idempotent shutdown ---
shutdown_called = False
def shutdown(*args):
nonlocal shutdown_called
if shutdown_called:
return
shutdown_called = True
if controller.meshcore_controller:
controller.meshcore_controller.shutdown()
loop.call_soon_threadsafe(loop.stop)
loop_thread.quit()
loop_thread.wait()
app.quit()
# --- Signal Handlers ---
signal.signal(signal.SIGINT, shutdown)
app.aboutToQuit.connect(shutdown)
# Timer to allow Python's signal handler to run
timer = QTimer()
timer.start(500)
timer.timeout.connect(lambda: None)
sys.exit(app.exec())
if __name__ == "__main__":
QCoreApplication.setOrganizationName("Bliksem Labs B.V.")
QCoreApplication.setOrganizationDomain("bliksemlabs.com")
QCoreApplication.setApplicationName("MeshCoreGUI")
main()

View File

View File

@@ -0,0 +1,29 @@
from typing import Any
class Channel:
def __init__(self, payload: dict[str, Any]):
self.payload = payload
self.new_message = 0
def set_new_message(self, value: int):
if value == 0:
self.new_message = 0
else:
self.new_message += value
@property
def channel_idx(self) -> int | None:
return self.payload.get("channel_idx")
@property
def name(self) -> str:
return self.payload.get("channel_name", "Unnamed channel")
@property
def secret(self) -> bytes | None:
return self.payload.get("channel_secret")
def is_valid(self) -> bool:
# Ignore channel if secret is 16 null-bytes
return self.secret is not None and self.secret != b"\x00" * 16

View File

@@ -0,0 +1,63 @@
from PySide6.QtCore import QAbstractListModel, QModelIndex, Qt
from PySide6.QtGui import QFont, QBrush, QColor
from .channel import Channel
class ChannelListModel(QAbstractListModel):
def __init__(self, channels: list[Channel] | None = None):
super().__init__()
self._channels: list[Channel] = channels or []
def rowCount(self, parent: QModelIndex = QModelIndex()) -> int:
return len(self._channels)
def data(self, index: QModelIndex, role: int = Qt.DisplayRole):
if not index.isValid():
return None
channel = self._channels[index.row()]
if role == Qt.DisplayRole:
return channel.name
elif role == Qt.UserRole:
return channel.payload
elif role == Qt.FontRole and channel.new_message > 0:
font = QFont()
font.setBold(True)
return font
elif role == Qt.ForegroundRole and channel.new_message > 0:
return QBrush(QColor("red")) # Mark new messages
return None
def add_or_update(self, channel: Channel):
# Update existing channel by channel_idx
for row, existing in enumerate(self._channels):
if existing.channel_idx == channel.channel_idx:
self._channels[row] = channel
idx = self.index(row)
self.dataChanged.emit(idx, idx, [Qt.DisplayRole, Qt.UserRole])
return
# New channel
self.beginInsertRows(QModelIndex(), len(self._channels), len(self._channels))
self._channels.append(channel)
self.endInsertRows()
def set_new_message(self, channel_idx: int, reset: bool = False):
for row, item in enumerate(self._channels):
if item.channel_idx == channel_idx:
if reset:
item.set_new_message(0)
else:
item.set_new_message(1)
idx = self.index(row, 0)
self.dataChanged.emit(
idx, idx, [Qt.DisplayRole, Qt.FontRole, Qt.ForegroundRole]
)
break

View File

@@ -0,0 +1,23 @@
from typing import Any
class Contact:
"""Simple container for contact payload"""
def __init__(self, payload: dict[str, Any]):
self.payload = payload
self.new_message = 0
def set_new_message(self, value: int):
if value == 0:
self.new_message = 0
else:
self.new_message += value
@property
def name(self) -> str:
return self.payload.get("adv_name", "Unknown")
@property
def public_key(self) -> str | None:
return self.payload.get("public_key")

View File

@@ -0,0 +1,77 @@
from PySide6.QtCore import QAbstractListModel, QModelIndex, Qt
from PySide6.QtGui import QFont, QBrush, QColor
from .contact import Contact
class ContactListModel(QAbstractListModel):
"""Model that manages a list of contacts for a QListView"""
def __init__(self, contacts: list[Contact] | None = None):
super().__init__()
self._contacts = contacts or []
def data(self, index: QModelIndex, role: int = Qt.DisplayRole):
if not index.isValid():
return None
contact = self._contacts[index.row()]
if role == Qt.DisplayRole:
return contact.name
elif role == Qt.UserRole:
return contact.payload
elif role == Qt.FontRole and contact.new_message > 0:
font = QFont()
font.setBold(True)
return font
elif role == Qt.ForegroundRole and contact.new_message > 0:
return QBrush(QColor("red"))
return None
def rowCount(self, parent: QModelIndex = QModelIndex()) -> int:
return len(self._contacts)
def add_contact(self, contact: Contact):
"""Add a new contact or update an existing one based on public_key"""
for row, existing in enumerate(self._contacts):
if existing.public_key == contact.public_key:
self._contacts[row] = contact
index = self.index(row)
self.dataChanged.emit(index, index, [Qt.DisplayRole, Qt.UserRole])
return
# If not found, add as a new contact
self.beginInsertRows(QModelIndex(), len(self._contacts), len(self._contacts))
self._contacts.append(contact)
self.endInsertRows()
def remove_contact(self, row: int):
"""Remove contact at index"""
if 0 <= row < len(self._contacts):
self.beginRemoveRows(QModelIndex(), row, row)
self._contacts.pop(row)
self.endRemoveRows()
def clear(self):
"""Remove all contacts"""
self.beginResetModel()
self._contacts.clear()
self.endResetModel()
def set_new_message(self, pubkey_prefix: str, reset: bool = False):
for row, item in enumerate(self._contacts):
if item.public_key and item.public_key.startswith(pubkey_prefix):
if reset:
item.set_new_message(0)
else:
item.set_new_message(1)
idx = self.index(row, 0)
self.dataChanged.emit(
idx, idx, [Qt.DisplayRole, Qt.FontRole, Qt.ForegroundRole]
)
break

View File

@@ -0,0 +1,22 @@
from datetime import datetime
from typing import Any
class Message:
def __init__(self, payload: dict[str, Any]):
self.payload = payload
@property
def timestamp(self) -> datetime | None:
ts = self.payload.get("sender_timestamp")
return datetime.fromtimestamp(ts) if ts else None
@property
def text(self) -> str:
return self.payload.get("text", "")
def formatted(self) -> str:
if self.timestamp:
time_str = self.timestamp.strftime("%Y-%m-%d %H:%M:%S")
return f"[{time_str}] {self.text}"
return self.text

View File

@@ -0,0 +1,36 @@
from PySide6.QtCore import QAbstractListModel, QModelIndex, Qt
from .message import Message
class MessageListModel(QAbstractListModel):
def __init__(self, messages: list[Message] | None = None):
super().__init__()
self._messages = messages or []
def rowCount(self, parent: QModelIndex = QModelIndex()) -> int:
return len(self._messages)
def data(self, index: QModelIndex, role: int = Qt.DisplayRole):
if not index.isValid():
return None
msg = self._messages[index.row()]
if role == Qt.DisplayRole:
return msg.formatted()
if role == Qt.UserRole:
return msg.payload
return None
def add_message(self, message: Message):
self.beginInsertRows(QModelIndex(), len(self._messages), len(self._messages))
self._messages.append(message)
self.endInsertRows()
def clear(self):
self.beginResetModel()
self._messages.clear()
self.endResetModel()

View File

View File

@@ -0,0 +1,29 @@
import asyncio
from typing import Optional, Coroutine
from PySide6.QtCore import QObject, Signal
class AsyncExecutor(QObject):
def __init__(self, loop: asyncio.AbstractEventLoop):
super().__init__()
self.loop = loop
def submit_async(self, coro: Coroutine, signal: Optional[Signal] = None):
"""Submit coroutine and emit signal on completion"""
future = asyncio.run_coroutine_threadsafe(coro, self.loop)
if signal:
future.add_done_callback(lambda f: self._emit_safe(f, signal))
def _emit_safe(self, fut: asyncio.Future, signal: Signal):
try:
result = fut.result()
except Exception as e:
print("Async call failed:", e)
return
signal.emit(result)
def shutdown(self):
"""Stop the loop safely"""
if self.loop.is_running():
self.loop.call_soon_threadsafe(self.loop.stop)

View File

@@ -0,0 +1,63 @@
import logging
from typing import Any
_LOGGER = logging.getLogger(__name__)
def parse_rx_log_data(payload: Any) -> dict[str, Any]:
"""Parse RX_LOG event payload to extract LoRa packet details.
Expected format (hex):
byte0: header
byte1: path_len
next path_len bytes: path nodes
next byte: channel_hash (optional)
"""
result: dict[str, Any] = {}
try:
hex_str = None
if isinstance(payload, dict):
hex_str = payload.get("payload") or payload.get("raw_hex")
elif isinstance(payload, (str, bytes)):
hex_str = payload
if not hex_str:
return result
if isinstance(hex_str, bytes):
hex_str = hex_str.hex()
hex_str = (
str(hex_str).lower().replace(" ", "").replace("\n", "").replace("\r", "")
)
if len(hex_str) < 4:
return result
result["header"] = hex_str[0:2]
try:
path_len = int(hex_str[2:4], 16)
result["path_len"] = path_len
except ValueError:
return {}
path_start = 4
path_end = path_start + (path_len * 2)
if len(hex_str) < path_end:
return {}
path_hex = hex_str[path_start:path_end]
result["path"] = path_hex
result["path_nodes"] = [path_hex[i : i + 2] for i in range(0, len(path_hex), 2)]
if len(hex_str) >= path_end + 2:
result["channel_hash"] = hex_str[path_end : path_end + 2]
except Exception as ex:
_LOGGER.debug(f"Error parsing RX_LOG data: {ex}")
return result

View File

View File

@@ -0,0 +1,92 @@
from typing import Any, TYPE_CHECKING
from PySide6.QtCore import Signal, Slot
from PySide6.QtGui import QCloseEvent
from PySide6.QtWidgets import (
QWidget,
QVBoxLayout,
QLabel,
QPushButton,
QListView,
QHBoxLayout,
QLineEdit,
)
if TYPE_CHECKING:
from controllers.meshcore_controller import MeshCoreController
class MeshCoreWidget(QWidget):
text_submitted = Signal(object, str)
def __init__(self):
super().__init__()
self.controller: "MeshCoreController" | None = None
self.current_target: dict[str, Any] | None = None
# GUI layout
self.layout = QVBoxLayout(self)
self.label_info = QLabel("Node info: ...")
self.button_adv = QPushButton("Adv")
self.list_contacts = QListView()
self.list_channels = QListView()
self.list_msgs = QListView()
option_layout = QHBoxLayout()
option_layout.addWidget(self.label_info, stretch=1)
option_layout.addWidget(self.button_adv)
option_panel = QWidget()
option_panel.setLayout(option_layout)
self.layout.addWidget(option_panel)
self.layout.addWidget(QLabel("Contacts:"))
self.layout.addWidget(self.list_contacts)
self.layout.addWidget(QLabel("Channels:"))
self.layout.addWidget(self.list_channels)
self.layout.addWidget(QLabel("Messages:"))
self.layout.addWidget(self.list_msgs)
# --- Input area ---
self.input_line = QLineEdit()
self.input_line.setPlaceholderText("Type a message…")
self.send_button = QPushButton("Send")
input_layout = QHBoxLayout()
input_layout.addWidget(self.input_line, stretch=1)
input_layout.addWidget(self.send_button)
input_panel = QWidget()
input_panel.setLayout(input_layout)
self.layout.addWidget(input_panel)
self.send_button.clicked.connect(self._on_send_clicked)
self.input_line.returnPressed.connect(self._on_send_clicked)
@Slot(object)
def set_target(self, target: dict[str, Any]):
self.current_target = target
def _on_send_clicked(self):
if self.current_target is not None:
text = self.input_line.text().strip()
if text:
self.text_submitted.emit(self.current_target, text)
@Slot(object)
def on_sent(self, event: Any):
self.input_line.clear()
@Slot(object)
def update_self_info(self, info: dict[str, Any]):
self.label_info.setText(f"Node info: {info.get('name', 'N/A')}")
def setController(self, controller: "MeshCoreController"):
self.controller = controller
def closeEvent(self, event: QCloseEvent):
if self.controller:
# Stop streaming and executor loop
self.controller.shutdown()
event.accept() # Allow the window to close

View File

View File

@@ -0,0 +1,133 @@
{
"version": "community-presets-1.0",
"source": "MeshCore mobile app community suggested presets",
"notes": "These presets are community-suggested RF settings as exposed in the MeshCore app UI. They are not guaranteed to be legally valid in all regions.",
"presets": [
{
"id": "australia",
"name": "Australia",
"region": "Australia",
"frequency_mhz": 915.800,
"spreading_factor": 10,
"bandwidth_khz": 250,
"coding_rate": 5
},
{
"id": "australia_victoria",
"name": "Australia: Victoria",
"region": "Australia",
"frequency_mhz": 916.575,
"spreading_factor": 7,
"bandwidth_khz": 62.5,
"coding_rate": 8
},
{
"id": "eu_uk_narrow",
"name": "EU/UK (Narrow)",
"region": "EU/UK",
"frequency_mhz": 869.618,
"spreading_factor": 8,
"bandwidth_khz": 62.5,
"coding_rate": 8
},
{
"id": "eu_uk_long_range",
"name": "EU/UK (Long Range)",
"region": "EU/UK",
"frequency_mhz": 869.525,
"spreading_factor": 11,
"bandwidth_khz": 250,
"coding_rate": 5
},
{
"id": "eu_uk_medium_range",
"name": "EU/UK (Medium Range)",
"region": "EU/UK",
"frequency_mhz": 869.525,
"spreading_factor": 10,
"bandwidth_khz": 250,
"coding_rate": 5
},
{
"id": "czech_republic_narrow",
"name": "Czech Republic (Narrow)",
"region": "Czech Republic",
"frequency_mhz": 869.525,
"spreading_factor": 7,
"bandwidth_khz": 62.5,
"coding_rate": 5
},
{
"id": "eu_433_long_range",
"name": "EU 433MHz (Long Range)",
"region": "EU 433MHz",
"frequency_mhz": 433.650,
"spreading_factor": 11,
"bandwidth_khz": 250,
"coding_rate": 5
},
{
"id": "new_zealand",
"name": "New Zealand",
"region": "New Zealand",
"frequency_mhz": 917.375,
"spreading_factor": 11,
"bandwidth_khz": 250,
"coding_rate": 5
},
{
"id": "new_zealand_narrow",
"name": "New Zealand (Narrow)",
"region": "New Zealand",
"frequency_mhz": 917.375,
"spreading_factor": 7,
"bandwidth_khz": 62.5,
"coding_rate": 5
},
{
"id": "portugal_433",
"name": "Portugal 433",
"region": "Portugal",
"frequency_mhz": 433.375,
"spreading_factor": 9,
"bandwidth_khz": 62.5,
"coding_rate": 6
},
{
"id": "portugal_868",
"name": "Portugal 868",
"region": "Portugal",
"frequency_mhz": 869.618,
"spreading_factor": 7,
"bandwidth_khz": 62.5,
"coding_rate": 6
},
{
"id": "switzerland",
"name": "Switzerland",
"region": "Switzerland",
"frequency_mhz": 869.618,
"spreading_factor": 8,
"bandwidth_khz": 62.5,
"coding_rate": 8
},
{
"id": "usa_canada_recommended",
"name": "USA/Canada (Recommended)",
"region": "USA/Canada",
"frequency_mhz": 910.525,
"spreading_factor": 7,
"bandwidth_khz": 62.5,
"coding_rate": 5
},
{
"id": "vietnam",
"name": "Vietnam",
"region": "Vietnam",
"frequency_mhz": 920.250,
"spreading_factor": 11,
"bandwidth_khz": 250,
"coding_rate": 5
}
]
}

View File

@@ -0,0 +1,61 @@
from PySide6.QtWidgets import QWidget, QVBoxLayout, QComboBox
from PySide6.QtCore import QFile, QIODevice
from PySide6.QtCore import QJsonDocument
class PresetSelector(QWidget):
def __init__(self, json_file):
super().__init__()
self.setWindowTitle("Community Presets")
# Layout
layout = QVBoxLayout(self)
# ComboBox
self.combo = QComboBox()
layout.addWidget(self.combo)
# Load JSON and populate combobox
self.load_presets(json_file)
def load_presets(self, json_file):
file = QFile(json_file)
if not file.open(QIODevice.ReadOnly | QIODevice.Text):
raise RuntimeError(f"Cannot open {json_file}")
data = file.readAll()
file.close()
doc = QJsonDocument.fromJson(data)
if not doc.isObject():
raise ValueError("Invalid JSON root object")
root = doc.object()
presets = root.get("presets", [])
for preset in presets:
preset_lookup = "-".join(
[
str(preset["frequency_mhz"]),
str(int(preset["bandwidth_khz"])),
str(preset["spreading_factor"]),
str(preset["coding_rate"]),
]
)
self.combo.addItem(preset["name"], userData=preset_lookup)
def select_by_self_info(self, self_info: dict):
preset_lookup = "-".join(
[
str(self_info["radio_freq"]),
str(int(self_info["radio_bw"])),
str(self_info["radio_sf"]),
str(self_info["radio_cr"]),
]
)
index = self.combo.findData(preset_lookup)
if index != -1:
self.combo.setCurrentIndex(index)
else:
self.combo.addItem("Custom", userData=preset_lookup)
index = self.combo.findData(preset_lookup)
self.combo.setCurrentIndex(index)

View File

@@ -0,0 +1,42 @@
from PySide6.QtWidgets import (
QWizardPage,
QVBoxLayout,
QLabel,
QRadioButton,
QButtonGroup,
)
# ========================
# Wizard Pages
# ========================
class ConnectionTypePage(QWizardPage):
def __init__(self):
super().__init__()
self.setTitle("Choose connection type")
layout = QVBoxLayout()
self.setLayout(layout)
layout.addWidget(QLabel("In which way do you wat to connect?"))
self.serial_radio = QRadioButton("Serial")
self.serial_radio.setChecked(True)
self.bt_radio = QRadioButton("Bluetooth")
self.bt_radio.setEnabled(True)
self.tcp_radio = QRadioButton("TCP/IP")
self.tcp_radio.setEnabled(False)
layout.addWidget(self.serial_radio)
layout.addWidget(self.bt_radio)
layout.addWidget(self.tcp_radio)
self.group = QButtonGroup()
self.group.addButton(self.serial_radio)
self.group.addButton(self.bt_radio)
self.group.addButton(self.tcp_radio)
def validatePage(self) -> bool:
return (
self.bt_radio.isChecked()
or self.serial_radio.isChecked()
or self.tcp_radio.isChecked()
)

View File

@@ -0,0 +1,74 @@
from PySide6.QtWidgets import (
QWizardPage,
QVBoxLayout,
QLabel,
QListWidget,
QLineEdit,
)
from PySide6.QtCore import Slot
from .mc_bluetooth import BluetoothConnection
from .mc_serial import SerialConnection
class DeviceSelectionPage(QWizardPage):
def __init__(self, wizard):
super().__init__()
self.setTitle("Select Device")
self.wizard_ref = wizard
self.layout = QVBoxLayout()
self.setLayout(self.layout)
self.label = QLabel("Select Device:")
self.layout.addWidget(self.label)
self.device_list = QListWidget()
self.ip_field = QLineEdit()
self.ip_field.setPlaceholderText("Hostname / IP")
self.port_field = QLineEdit()
self.port_field.setPlaceholderText("Port")
def initializePage(self):
conn_page = self.wizard_ref.page(0)
# Clear old widgets
self.device_list.clear()
self.layout.removeWidget(self.device_list)
self.layout.removeWidget(self.ip_field)
self.layout.removeWidget(self.port_field)
if conn_page.bt_radio.isChecked():
self.wizard_ref.connection = BluetoothConnection(
self.wizard_ref.application_controller
)
self.label.setText("Select a Bluetooth device")
self.layout.addWidget(self.device_list)
agent = self.wizard_ref.connection.agent
agent.deviceDiscovered.connect(self.add_device)
agent.finished.connect(self.scan_finished)
agent.start()
elif conn_page.serial_radio.isChecked():
self.wizard_ref.connection = SerialConnection(
self.wizard_ref.application_controller
)
self.label.setText("Select a serial port")
devices = self.wizard_ref.connection.scan_devices()
self.device_list.addItems(devices)
self.layout.addWidget(self.device_list)
elif conn_page.tcp_radio.isChecked():
self.label.setText("TCP/IP connection")
self.layout.addWidget(self.ip_field)
self.layout.addWidget(self.port_field)
@Slot(object)
def add_device(self, device_info):
self.device_list.addItem(device_info.name())
self.wizard_ref.connection.device_list.append(device_info)
@Slot()
def scan_finished(self):
pass

View File

@@ -0,0 +1,116 @@
from PySide6.QtWidgets import (
QWizardPage,
QVBoxLayout,
QLabel,
QLineEdit,
QPushButton,
)
from pathlib import Path
from .mc_bluetooth import BluetoothConnection
from .mc_serial import SerialConnection
from .community_presets import PresetSelector
class DeviceStatusPage(QWizardPage):
def __init__(self, wizard):
super().__init__()
self.setTitle("Check device properties")
self.wizard_ref = wizard
layout = QVBoxLayout()
self.setLayout(layout)
self.name_label = QLabel("Name")
self.name = QLineEdit()
layout.addWidget(self.name_label)
layout.addWidget(self.name)
self.pubkey = QLineEdit()
self.pubkey.setEnabled(False)
self.pubkey_label = QLabel("Public Key")
layout.addWidget(self.pubkey_label)
layout.addWidget(self.pubkey)
base_path = Path(__file__).parent
self.profile_dropdown = PresetSelector(
base_path / "community-presets.json"
)
layout.addWidget(self.profile_dropdown)
self.button_save = QPushButton("Save")
self.button_save.setEnabled(False)
layout.addWidget(self.button_save)
self._initial_name = None
self._initial_profile = None
self.name.textChanged.connect(self._update_dirty_state)
self.profile_dropdown.combo.currentIndexChanged.connect(
self._update_dirty_state
)
self.button_save.clicked.connect(self._on_save_clicked)
def _update_dirty_state(self) -> None:
name_changed = self.name.text() != self._initial_name
profile_changed = (
self.profile_dropdown.combo.currentData() != self._initial_profile
)
self._is_dirty = name_changed or profile_changed
self.button_save.setEnabled(self._is_dirty)
def _on_save_clicked(self) -> None:
if not self._is_dirty:
return
self._save_data()
def _save_data(self) -> None:
new_name = self.name.text()
new_profile = self.profile_dropdown.combo.currentData()
if new_name != self._initial_name:
self.wizard_ref.application_controller.meshcore_controller.set_name(
new_name
)
self._initial_name = new_name
if new_profile != self._initial_profile:
freq, bw, sf, cr = new_profile.split("-")
self.wizard_ref.application_controller.meshcore_controller.set_radio(
float(freq), float(bw), int(sf), int(cr)
)
self._initial_profile = new_profile
self._is_dirty = False
self.button_save.setEnabled(False)
def validatePage(self) -> bool:
if self._is_dirty:
self._save_data()
return True
def update_profile(self, application_controller):
self.info = self.wizard_ref.application_controller.mc.self_info
if "name" in self.info:
self.name.setText(self.info["name"])
self.pubkey.setText(self.info["public_key"])
self.profile_dropdown.select_by_self_info(self.info)
self._initial_name = self.name.text()
self._initial_profile = self.profile_dropdown.combo.currentData()
self._is_dirty = False
self.button_save.setEnabled(False)
def initializePage(self):
conn = self.wizard_ref.connection
if isinstance(conn, SerialConnection):
conn.connect(
self.wizard_ref.page(1).device_list.currentItem().text(),
self.update_profile,
)
elif isinstance(conn, BluetoothConnection):
conn.connect(
self.wizard_ref.page(1).device_list.currentItem().text(),
self.update_profile,
)

View File

@@ -0,0 +1,53 @@
from PySide6.QtCore import QObject, Slot, Signal, QSettings
from PySide6.QtBluetooth import (
QBluetoothDeviceDiscoveryAgent,
QBluetoothDeviceInfo,
)
class BluetoothConnection(QObject):
"""
Bluetooth connection implemented using PySide6 QtBluetooth.
Supports asynchronous scanning with live updates, connecting via RFCOMM,
and returning device info after successful connection.
"""
device_discovered = Signal(QBluetoothDeviceInfo) # signaal voor GUI live update
scan_finished = Signal() # signaal als scan klaar is
def __init__(self, application_controller):
super().__init__()
self.application_controller = application_controller
self.device_list = [] # opgeslagen QBluetoothDeviceInfo objecten
self.connected_socket = None # QBluetoothSocket
self.device_info = None # dict met name/public_key
self.agent = QBluetoothDeviceDiscoveryAgent()
self.agent.setLowEnergyDiscoveryTimeout(5000)
# Verbind agent signalen naar interne slots
self.agent.deviceDiscovered.connect(self._on_device_discovered)
self.agent.finished.connect(self._on_scan_finished)
def start_scan(self):
self.device_list.clear()
self.agent.start()
@Slot(QBluetoothDeviceInfo)
def _on_device_discovered(self, device_info: QBluetoothDeviceInfo):
self.device_list.append(device_info)
self.device_discovered.emit(device_info)
@Slot()
def _on_scan_finished(self):
self.scan_finished.emit()
def connect(self, device_name_or_address: str, slot) -> bool:
print(device_name_or_address)
settings = QSettings()
settings.beginGroup("interface")
settings.setValue("type", "bluetooth")
settings.setValue("addr", device_name_or_address)
settings.endGroup()
if self.application_controller.attempt_connection():
slot(self.application_controller)

View File

@@ -0,0 +1,39 @@
from PySide6.QtSerialPort import QSerialPortInfo
from PySide6.QtCore import QSettings
class SerialConnection:
"""
Serial connection class using PySide6 QtSerialPort.
Only scans available serial ports on the host system.
"""
def __init__(self, application_controller):
self.application_controller = application_controller
self.device_list = []
self.connected_port = None
self.device_info = None
def scan_devices(self) -> list[str]:
"""
Scan beschikbare seriële poorten en retourneer een lijst van naamstrings.
"""
self.device_list = QSerialPortInfo.availablePorts()
# Return human-readable strings, bijv. "COM3 - USB Serial"
devices = []
for port in self.device_list:
name = port.systemLocation() # COM3 or /dev/ttyUSB0
desc = port.description() # bv. "USB Serial"
devices.append(f"{name} - {desc}")
return devices
def connect(self, device_string: str, slot) -> bool:
settings = QSettings()
settings.beginGroup("interface")
settings.setValue("type", "serial")
settings.setValue("port", device_string.split(" - ")[0]) # TODO
settings.setValue("baudrate", 115200)
settings.endGroup()
if self.application_controller.attempt_connection():
slot(self.application_controller)

View File

@@ -0,0 +1,44 @@
from PySide6.QtWidgets import QApplication, QWizard
from PySide6.QtCore import Signal
from .companion.connection import ConnectionTypePage
from .companion.deviceselection import DeviceSelectionPage
from .companion.devicestatus import DeviceStatusPage
# ========================
# Wizard
# ========================
class CompanionWizard(QWizard):
completed = Signal(bool)
def accept(self):
self.completed.emit(True)
super().accept()
def reject(self):
self.completed.emit(False)
super().reject()
def __init__(self, application_controller):
super().__init__()
self.application_controller = application_controller
self.addPage(ConnectionTypePage())
self.addPage(DeviceSelectionPage(self))
self.addPage(DeviceStatusPage(self))
self.setWindowTitle("Companion Wizard")
# ========================
# Main
# ========================
if __name__ == "__main__":
import sys
app = QApplication(sys.argv)
wizard = CompanionWizard()
wizard.show()
sys.exit(app.exec())