matrix: create potato-matrix-bridge (#528)

* matrix: create potato-matrix-bridge

* matrix: add unit tests

* matrix: address review comments

* ci: condition github actions to only run on paths affected...

* Add comprehensive unit tests for config, matrix, potatomesh, and main modules

* Revert "Add comprehensive unit tests for config, matrix, potatomesh, and main modules"

This reverts commit 212522b4a2.

* matrix: add unit tests

* matrix: add unit tests

* matrix: add unit tests
This commit is contained in:
l5y
2025-11-29 08:52:20 +01:00
committed by GitHub
parent 7160d72aae
commit 2bd69415c1
14 changed files with 1575 additions and 0 deletions

View File

@@ -36,6 +36,8 @@ jobs:
include:
- language: python
build-mode: none
- language: rust
build-mode: none
- language: ruby
build-mode: none
- language: javascript-typescript

View File

@@ -19,6 +19,9 @@ on:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
paths:
- 'web/**'
- 'tests/**'
permissions:
contents: read
@@ -47,6 +50,7 @@ jobs:
files: web/reports/javascript-coverage.json
flags: frontend
name: frontend
fail_ci_if_error: false
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
- name: Upload test results to Codecov

View File

@@ -19,6 +19,9 @@ on:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
paths:
- 'app/**'
- 'tests/**'
permissions:
contents: read
@@ -63,5 +66,6 @@ jobs:
files: coverage/lcov.info
flags: flutter-mobile
name: flutter-mobile
fail_ci_if_error: false
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

View File

@@ -19,6 +19,9 @@ on:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
paths:
- 'data/**'
- 'tests/**'
permissions:
contents: read
@@ -47,6 +50,7 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}
files: reports/python-coverage.xml
flags: python-ingestor
fail_ci_if_error: false
name: python-ingestor
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

View File

@@ -19,6 +19,9 @@ on:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
paths:
- 'web/**'
- 'tests/**'
permissions:
contents: read

78
.github/workflows/rust.yml vendored Normal file
View File

@@ -0,0 +1,78 @@
# Copyright © 2025-26 l5yth & contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: Rust
on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
paths:
- '.github/**'
- 'matrix/**'
- 'tests/**'
permissions:
contents: read
jobs:
matrix:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v5
- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable
- name: Cache Cargo registry
uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
./matrix/target
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.toml', '**/Cargo.lock') }}
restore-keys: |
${{ runner.os }}-cargo-
- name: Show rustc version
run: rustc --version
- name: Install llvm-tools-preview component
run: rustup component add llvm-tools-preview --toolchain stable
- name: Install cargo-llvm-cov
working-directory: ./matrix
run: cargo install cargo-llvm-cov --locked
- name: Check formatting
working-directory: ./matrix
run: cargo fmt --all -- --check
- name: Clippy lint
working-directory: ./matrix
run: cargo clippy --all-targets --all-features -- -D warnings
- name: Build
working-directory: ./matrix
run: cargo build --all --all-features
- name: Test
working-directory: ./matrix
run: cargo test --all --all-features --verbose
- name: Run tests with coverage
working-directory: ./matrix
run: |
cargo llvm-cov --all-features --workspace --lcov --output-path coverage.lcov
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: ./matrix/coverage.lcov
flags: matrix-bridge
name: matrix-bridge
fail_ci_if_error: false

3
matrix/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
target/
Cargo.lock
coverage.lcov

20
matrix/Cargo.toml Normal file
View File

@@ -0,0 +1,20 @@
[package]
name = "potatomesh-matrix-bridge"
version = "0.5.7"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
reqwest = { version = "0.12", features = ["json", "rustls-tls"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
toml = "0.9"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
urlencoding = "2"
[dev-dependencies]
tempfile = "3"
mockito = "1"
serial_test = "3"

19
matrix/Config.toml Normal file
View File

@@ -0,0 +1,19 @@
[potatomesh]
# Base URL without trailing slash
base_url = "https://potatomesh.net/api"
# Poll interval in seconds
poll_interval_secs = 60
[matrix]
# Homeserver base URL (client API) without trailing slash
homeserver = "https://matrix.example.org"
# Appservice access token (from your registration.yaml)
as_token = "YOUR_APPSERVICE_AS_TOKEN"
# Server name (domain) part of Matrix user IDs
server_name = "example.org"
# Room ID to send into (must be joined by the appservice / puppets)
room_id = "!yourroomid:example.org"
[state]
# Where to persist last seen message id (optional but recommended)
state_file = "bridge_state.json"

249
matrix/README.md Normal file
View File

@@ -0,0 +1,249 @@
# potatomesh-matrix-bridge
A small Rust daemon that bridges **PotatoMesh** LoRa messages into a **Matrix** room.
For each PotatoMesh node, the bridge creates (or uses) a **Matrix puppet user**:
- Matrix localpart: the hex node id (without `!`), e.g. `!67fc83cb``@67fc83cb:example.org`
- Matrix display name: the nodes `long_name` from the PotatoMesh API
Messages from PotatoMesh are periodically fetched and forwarded to a single Matrix room as those puppet users.
---
## Features
- Polls `https://potatomesh.net/api/messages` (or any configured base URL)
- Looks up node metadata via `GET /nodes/{hex}` and caches it
- One Matrix user per node:
- username: hex node id
- display name: `long_name`
- Forwards `TEXT_MESSAGE_APP` messages into a single Matrix room
- Persists last-seen message ID to avoid duplicates across restarts
---
## Architecture Overview
- **PotatoMesh side**
- `GET /messages` returns an array of messages
- `GET /nodes/{hex}` returns node metadata (including `long_name`)
- **Matrix side**
- Uses the Matrix Client-Server API with an **appservice access token**
- Impersonates puppet users via `user_id=@{hex}:{server_name}&access_token={as_token}`
- Sends `m.room.message` events into a configured room
This is **not** a full appservice framework; it just speaks the minimal HTTP needed.
---
## Requirements
- Rust (stable) and `cargo`
- A Matrix homeserver you control (e.g. Synapse)
- An **application service registration** on your homeserver that:
- Whitelists the puppet user namespace (e.g. `@[0-9a-f]{8}:example.org`)
- Provides an `as_token` the bridge can use
- Network access from the bridge host to:
- `https://potatomesh.net/api` (or your configured PotatoMesh API)
- Your Matrix homeserver (`https://matrix.example.org`)
---
## Configuration
All configuration is in `Config.toml` in the project root.
Example:
```toml
[potatomesh]
# Base URL without trailing slash
base_url = "https://potatomesh.net/api"
# Poll interval in seconds
poll_interval_secs = 10
[matrix]
# Homeserver base URL (client API) without trailing slash
homeserver = "https://matrix.example.org"
# Appservice access token (from your registration.yaml)
as_token = "YOUR_APPSERVICE_AS_TOKEN"
# Server name (domain) part of Matrix user IDs
server_name = "example.org"
# Room ID to send into (must be joined by the appservice / puppets)
room_id = "!yourroomid:example.org"
[state]
# Where to persist last seen message id
state_file = "bridge_state.json"
````
### PotatoMesh API
The bridge assumes:
* Messages: `GET {base_url}/messages` JSON array, for example:
```json
[
{
"id": 2947676906,
"rx_time": 1764241436,
"rx_iso": "2025-11-27T11:03:56Z",
"from_id": "!da6556d4",
"to_id": "^all",
"channel": 1,
"portnum": "TEXT_MESSAGE_APP",
"text": "Ping",
"rssi": -111,
"hop_limit": 1,
"lora_freq": 868,
"modem_preset": "MediumFast",
"channel_name": "TEST",
"snr": -9.0,
"node_id": "!06871773"
}
]
```
* Nodes: `GET {base_url}/nodes/{hex}` JSON, for example:
```json
{
"node_id": "!67fc83cb",
"short_name": "83CB",
"long_name": "Meshtastic 83CB",
"role": "CLIENT_HIDDEN",
"last_heard": 1764250515,
"first_heard": 1758993817,
"last_seen_iso": "2025-11-27T13:35:15Z"
}
```
Node hex ID is derived from `node_id` by stripping the leading `!` and using the remainder as the Matrix localpart.
---
## Matrix Appservice Setup (Synapse example)
You need an appservice registration file (e.g. `potatomesh-bridge.yaml`) configured in Synapse.
A minimal example sketch (you **must** adjust URLs, secrets, namespaces):
```yaml
id: potatomesh-bridge
url: "http://your-bridge-host:8080" # not used by this bridge if it only calls out
as_token: "YOUR_APPSERVICE_AS_TOKEN"
hs_token: "SECRET_HS_TOKEN"
sender_localpart: "potatomesh-bridge"
rate_limited: false
namespaces:
users:
- exclusive: true
regex: "@[0-9a-f]{8}:example.org"
```
For this bridge, only the `as_token` and `namespaces.users` actually matter. The bridge does not accept inbound events; it only uses the `as_token` to call the homeserver.
In Synapses `homeserver.yaml`, add the registration file under `app_service_config_files`, restart, and invite a puppet user to your target room (or use room ID directly).
---
## Build
```bash
# clone
git clone https://github.com/YOUR_USER/potatomesh-matrix-bridge.git
cd potatomesh-matrix-bridge
# build
cargo build --release
```
The resulting binary will be at:
```bash
target/release/potatomesh-matrix-bridge
```
---
## Run
Ensure `Config.toml` is present and valid, then:
```bash
./target/release/potatomesh-matrix-bridge
```
Environment variables you may care about:
* `RUST_LOG` for logging, e.g.:
```bash
RUST_LOG=info,reqwest=warn ./target/release/potatomesh-matrix-bridge
```
The bridge will:
1. Load state from `bridge_state.json` (if present).
2. Poll PotatoMesh every `poll_interval_secs`.
3. For each new `TEXT_MESSAGE_APP`:
* Fetch node info.
* Ensure puppet is registered (`@{hex}:{server_name}`).
* Set puppet display name to `long_name`.
* Send a formatted text message into `room_id` as that puppet.
* Update and persist `bridge_state.json`.
Delete `bridge_state.json` if you want it to replay all currently available messages.
---
## Development
Run tests (currently mostly compile checks, no real tests yet):
```bash
cargo test
```
Format code:
```bash
cargo fmt
```
Lint (optional but recommended):
```bash
cargo clippy -- -D warnings
```
---
## GitHub Actions CI
This repository includes a GitHub Actions workflow (`.github/workflows/ci.yml`) that:
* runs on pushes and pull requests
* caches Cargo dependencies
* runs:
* `cargo fmt --check`
* `cargo clippy`
* `cargo test`
See the workflow file for details.
---
## Caveats & Future Work
* No E2EE: this bridge posts into unencrypted (or server-side managed) rooms. For encrypted rooms, youd need real E2EE support and key management.
* No inbound Matrix → PotatoMesh direction yet. This is a one-way bridge (PotatoMesh → Matrix).
* No pagination or `since` support on the PotatoMesh API. The bridge simply deduplicates by message `id` and stores the highest seen.
If you change the PotatoMesh API, adjust the types in `src/potatomesh.rs` accordingly.

143
matrix/src/config.rs Normal file
View File

@@ -0,0 +1,143 @@
use serde::Deserialize;
use std::{fs, path::Path};
#[derive(Debug, Deserialize, Clone)]
pub struct PotatomeshConfig {
pub base_url: String,
pub poll_interval_secs: u64,
}
#[derive(Debug, Deserialize, Clone)]
pub struct MatrixConfig {
pub homeserver: String,
pub as_token: String,
pub server_name: String,
pub room_id: String,
}
#[derive(Debug, Deserialize, Clone)]
pub struct StateConfig {
pub state_file: String,
}
#[derive(Debug, Deserialize, Clone)]
pub struct Config {
pub potatomesh: PotatomeshConfig,
pub matrix: MatrixConfig,
pub state: StateConfig,
}
impl Config {
pub fn load_from_file(path: &str) -> anyhow::Result<Self> {
let contents = fs::read_to_string(path)?;
let cfg = toml::from_str(&contents)?;
Ok(cfg)
}
pub fn from_default_path() -> anyhow::Result<Self> {
let path = "Config.toml";
if !Path::new(path).exists() {
anyhow::bail!("Config file {path} not found");
}
Self::load_from_file(path)
}
}
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
use std::io::Write;
#[test]
fn parse_minimal_config_from_toml_str() {
let toml_str = r#"
[potatomesh]
base_url = "https://potatomesh.net/api"
poll_interval_secs = 10
[matrix]
homeserver = "https://matrix.example.org"
as_token = "AS_TOKEN"
server_name = "example.org"
room_id = "!roomid:example.org"
[state]
state_file = "bridge_state.json"
"#;
let cfg: Config = toml::from_str(toml_str).expect("toml should parse");
assert_eq!(cfg.potatomesh.base_url, "https://potatomesh.net/api");
assert_eq!(cfg.potatomesh.poll_interval_secs, 10);
assert_eq!(cfg.matrix.homeserver, "https://matrix.example.org");
assert_eq!(cfg.matrix.as_token, "AS_TOKEN");
assert_eq!(cfg.matrix.server_name, "example.org");
assert_eq!(cfg.matrix.room_id, "!roomid:example.org");
assert_eq!(cfg.state.state_file, "bridge_state.json");
}
#[test]
fn load_from_file_not_found() {
let result = Config::load_from_file("file_that_does_not_exist.toml");
assert!(result.is_err());
}
#[test]
fn load_from_file_valid_file() {
let toml_str = r#"
[potatomesh]
base_url = "https://potatomesh.net/api"
poll_interval_secs = 10
[matrix]
homeserver = "https://matrix.example.org"
as_token = "AS_TOKEN"
server_name = "example.org"
room_id = "!roomid:example.org"
[state]
state_file = "bridge_state.json"
"#;
let mut file = tempfile::NamedTempFile::new().unwrap();
write!(file, "{}", toml_str).unwrap();
let result = Config::load_from_file(file.path().to_str().unwrap());
assert!(result.is_ok());
}
#[test]
#[serial]
fn from_default_path_not_found() {
let tmp_dir = tempfile::tempdir().unwrap();
std::env::set_current_dir(tmp_dir.path()).unwrap();
let result = Config::from_default_path();
assert!(result.is_err());
}
#[test]
#[serial]
fn from_default_path_found() {
let toml_str = r#"
[potatomesh]
base_url = "https://potatomesh.net/api"
poll_interval_secs = 10
[matrix]
homeserver = "https://matrix.example.org"
as_token = "AS_TOKEN"
server_name = "example.org"
room_id = "!roomid:example.org"
[state]
state_file = "bridge_state.json"
"#;
let tmp_dir = tempfile::tempdir().unwrap();
let file_path = tmp_dir.path().join("Config.toml");
let mut file = std::fs::File::create(file_path).unwrap();
write!(file, "{}", toml_str).unwrap();
std::env::set_current_dir(tmp_dir.path()).unwrap();
let result = Config::from_default_path();
assert!(result.is_ok());
}
}

321
matrix/src/main.rs Normal file
View File

@@ -0,0 +1,321 @@
mod config;
mod matrix;
mod potatomesh;
use std::{fs, path::Path};
use anyhow::Result;
use tokio::time::{sleep, Duration};
use tracing::{error, info};
use crate::config::Config;
use crate::matrix::MatrixAppserviceClient;
use crate::potatomesh::{PotatoClient, PotatoMessage};
#[derive(Debug, serde::Serialize, serde::Deserialize, Default)]
pub struct BridgeState {
last_message_id: Option<u64>,
}
impl BridgeState {
fn load(path: &str) -> Result<Self> {
if !Path::new(path).exists() {
return Ok(Self::default());
}
let data = fs::read_to_string(path)?;
let s: Self = serde_json::from_str(&data)?;
Ok(s)
}
fn save(&self, path: &str) -> Result<()> {
let data = serde_json::to_string_pretty(self)?;
fs::write(path, data)?;
Ok(())
}
fn should_forward(&self, msg: &PotatoMessage) -> bool {
match self.last_message_id {
None => true,
Some(last) => msg.id > last,
}
}
fn update_with(&mut self, msg: &PotatoMessage) {
self.last_message_id = Some(match self.last_message_id {
None => msg.id,
Some(last) => last.max(msg.id),
});
}
}
#[tokio::main]
async fn main() -> Result<()> {
// Logging: RUST_LOG=info,bridge=debug,reqwest=warn ...
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("potatomesh_matrix_bridge=info".parse().unwrap_or_default())
.add_directive("reqwest=warn".parse().unwrap_or_default()),
)
.init();
let cfg = Config::from_default_path()?;
info!("Loaded config: {:?}", cfg);
let http = reqwest::Client::builder().build()?;
let potato = PotatoClient::new(http.clone(), cfg.potatomesh.clone());
let matrix = MatrixAppserviceClient::new(http.clone(), cfg.matrix.clone());
let state_path = &cfg.state.state_file;
let mut state = BridgeState::load(state_path)?;
info!("Loaded state: {:?}", state);
let poll_interval = Duration::from_secs(cfg.potatomesh.poll_interval_secs);
loop {
match potato.fetch_messages().await {
Ok(mut msgs) => {
// sort by id ascending so we process in order
msgs.sort_by_key(|m| m.id);
for msg in msgs {
if !state.should_forward(&msg) {
continue;
}
// Filter to the ports you care about
if msg.portnum != "TEXT_MESSAGE_APP" {
state.update_with(&msg);
continue;
}
if let Err(e) = handle_message(&potato, &matrix, &mut state, &msg).await {
error!("Error handling message {}: {:?}", msg.id, e);
}
// persist after each processed message
if let Err(e) = state.save(state_path) {
error!("Error saving state: {:?}", e);
}
}
}
Err(e) => {
error!("Error fetching PotatoMesh messages: {:?}", e);
}
}
sleep(poll_interval).await;
}
}
async fn handle_message(
potato: &PotatoClient,
matrix: &MatrixAppserviceClient,
state: &mut BridgeState,
msg: &PotatoMessage,
) -> Result<()> {
let node = potato.get_node(&msg.node_id).await?;
let localpart = MatrixAppserviceClient::localpart_from_node_id(&msg.node_id);
let user_id = matrix.user_id(&localpart);
// Ensure puppet exists & has display name
matrix.ensure_user_registered(&localpart).await?;
matrix.set_display_name(&user_id, &node.long_name).await?;
// Format the bridged message
let short = node
.short_name
.clone()
.unwrap_or_else(|| node.long_name.clone());
let body = format!(
"[{short}] {text}\n({from_id}{to_id}, RSSI {rssi} dB, SNR {snr} dB, {chan}/{preset})",
short = short,
text = msg.text,
from_id = msg.from_id,
to_id = msg.to_id,
rssi = msg.rssi,
snr = msg.snr,
chan = msg.channel_name,
preset = msg.modem_preset,
);
matrix.send_text_message_as(&user_id, &body).await?;
state.update_with(msg);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{MatrixConfig, PotatomeshConfig};
use crate::matrix::MatrixAppserviceClient;
use crate::potatomesh::PotatoClient;
fn sample_msg(id: u64) -> PotatoMessage {
PotatoMessage {
id,
rx_time: 0,
rx_iso: "2025-11-27T00:00:00Z".to_string(),
from_id: "!abcd1234".to_string(),
to_id: "^all".to_string(),
channel: 1,
portnum: "TEXT_MESSAGE_APP".to_string(),
text: "Ping".to_string(),
rssi: -100,
hop_limit: 1,
lora_freq: 868,
modem_preset: "MediumFast".to_string(),
channel_name: "TEST".to_string(),
snr: 0.0,
reply_id: None,
node_id: "!abcd1234".to_string(),
}
}
#[test]
fn bridge_state_initially_forwards_all() {
let state = BridgeState::default();
let msg = sample_msg(42);
assert!(state.should_forward(&msg));
}
#[test]
fn bridge_state_tracks_highest_id_and_skips_older() {
let mut state = BridgeState::default();
let m1 = sample_msg(10);
let m2 = sample_msg(20);
let m3 = sample_msg(15);
// First message, should forward
assert!(state.should_forward(&m1));
state.update_with(&m1);
assert_eq!(state.last_message_id, Some(10));
// Second message, higher id, should forward
assert!(state.should_forward(&m2));
state.update_with(&m2);
assert_eq!(state.last_message_id, Some(20));
// Third message, lower than last, should NOT forward
assert!(!state.should_forward(&m3));
// state remains unchanged
assert_eq!(state.last_message_id, Some(20));
}
#[test]
fn bridge_state_update_is_monotonic() {
let mut state = BridgeState {
last_message_id: Some(50),
};
let m = sample_msg(40);
state.update_with(&m); // id is lower than current
// last_message_id must stay at 50
assert_eq!(state.last_message_id, Some(50));
}
#[test]
fn bridge_state_load_save_roundtrip() {
let tmp_dir = tempfile::tempdir().unwrap();
let file_path = tmp_dir.path().join("state.json");
let path_str = file_path.to_str().unwrap();
let state = BridgeState {
last_message_id: Some(12345),
};
state.save(path_str).unwrap();
let loaded_state = BridgeState::load(path_str).unwrap();
assert_eq!(loaded_state.last_message_id, Some(12345));
}
#[test]
fn bridge_state_load_nonexistent() {
let tmp_dir = tempfile::tempdir().unwrap();
let file_path = tmp_dir.path().join("nonexistent.json");
let path_str = file_path.to_str().unwrap();
let state = BridgeState::load(path_str).unwrap();
assert_eq!(state.last_message_id, None);
}
#[tokio::test]
async fn test_handle_message() {
let mut server = mockito::Server::new_async().await;
let potatomesh_cfg = PotatomeshConfig {
base_url: server.url(),
poll_interval_secs: 1,
};
let matrix_cfg = MatrixConfig {
homeserver: server.url(),
as_token: "AS_TOKEN".to_string(),
server_name: "example.org".to_string(),
room_id: "!roomid:example.org".to_string(),
};
let node_id = "abcd1234";
let user_id = format!("@{}:{}", node_id, matrix_cfg.server_name);
let encoded_user = urlencoding::encode(&user_id);
let mock_get_node = server
.mock("GET", "/nodes/abcd1234")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"node_id": "!abcd1234", "long_name": "Test Node", "short_name": "TN"}"#)
.create();
let mock_register = server
.mock("POST", "/_matrix/client/v3/register")
.match_query("kind=user&access_token=AS_TOKEN")
.with_status(200)
.create();
let mock_display_name = server
.mock(
"PUT",
format!("/_matrix/client/v3/profile/{}/displayname", encoded_user).as_str(),
)
.match_query(format!("user_id={}&access_token=AS_TOKEN", encoded_user).as_str())
.with_status(200)
.create();
let http_client = reqwest::Client::new();
let matrix_client = MatrixAppserviceClient::new(http_client.clone(), matrix_cfg);
let room_id = &matrix_client.cfg.room_id;
let encoded_room = urlencoding::encode(room_id);
let txn_id = matrix_client
.txn_counter
.load(std::sync::atomic::Ordering::SeqCst);
let mock_send = server
.mock(
"PUT",
format!(
"/_matrix/client/v3/rooms/{}/send/m.room.message/{}",
encoded_room, txn_id
)
.as_str(),
)
.match_query(format!("user_id={}&access_token=AS_TOKEN", encoded_user).as_str())
.with_status(200)
.create();
let potato_client = PotatoClient::new(http_client.clone(), potatomesh_cfg);
let mut state = BridgeState::default();
let msg = sample_msg(100);
let result = handle_message(&potato_client, &matrix_client, &mut state, &msg).await;
assert!(result.is_ok());
mock_get_node.assert();
mock_register.assert();
mock_display_name.assert();
mock_send.assert();
assert_eq!(state.last_message_id, Some(100));
}
}

362
matrix/src/matrix.rs Normal file
View File

@@ -0,0 +1,362 @@
use serde::Serialize;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use crate::config::MatrixConfig;
#[derive(Clone)]
pub struct MatrixAppserviceClient {
http: reqwest::Client,
pub cfg: MatrixConfig,
pub txn_counter: Arc<AtomicU64>,
}
impl MatrixAppserviceClient {
pub fn new(http: reqwest::Client, cfg: MatrixConfig) -> Self {
let start = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
Self {
http,
cfg,
txn_counter: Arc::new(AtomicU64::new(start)),
}
}
/// Convert a node_id like "!deadbeef" into Matrix localpart "deadbeef".
pub fn localpart_from_node_id(node_id: &str) -> String {
node_id.trim_start_matches('!').to_string()
}
/// Build a full Matrix user_id from localpart.
pub fn user_id(&self, localpart: &str) -> String {
format!("@{}:{}", localpart, self.cfg.server_name)
}
fn auth_query(&self) -> String {
format!("access_token={}", urlencoding::encode(&self.cfg.as_token))
}
/// Ensure the puppet user exists (register via appservice registration).
pub async fn ensure_user_registered(&self, localpart: &str) -> anyhow::Result<()> {
#[derive(Serialize)]
struct RegisterReq<'a> {
#[serde(rename = "type")]
typ: &'a str,
username: &'a str,
}
let url = format!(
"{}/_matrix/client/v3/register?kind=user&{}",
self.cfg.homeserver,
self.auth_query()
);
let body = RegisterReq {
typ: "m.login.application_service",
username: localpart,
};
let resp = self.http.post(&url).json(&body).send().await?;
if resp.status().is_success() {
Ok(())
} else {
// If user already exists, Synapse / HS usually returns 400 M_USER_IN_USE.
// We'll just ignore non-success and hope it's that case.
Ok(())
}
}
/// Set display name for puppet user.
pub async fn set_display_name(&self, user_id: &str, display_name: &str) -> anyhow::Result<()> {
#[derive(Serialize)]
struct DisplayNameReq<'a> {
displayname: &'a str,
}
let encoded_user = urlencoding::encode(user_id);
let url = format!(
"{}/_matrix/client/v3/profile/{}/displayname?user_id={}&{}",
self.cfg.homeserver,
encoded_user,
encoded_user,
self.auth_query()
);
let body = DisplayNameReq {
displayname: display_name,
};
let resp = self.http.put(&url).json(&body).send().await?;
if resp.status().is_success() {
Ok(())
} else {
// Non-fatal.
tracing::warn!(
"Failed to set display name for {}: {}",
user_id,
resp.status()
);
Ok(())
}
}
/// Send a plain text message into the configured room as puppet user_id.
pub async fn send_text_message_as(&self, user_id: &str, body_text: &str) -> anyhow::Result<()> {
#[derive(Serialize)]
struct MsgContent<'a> {
msgtype: &'a str,
body: &'a str,
}
let txn_id = self.txn_counter.fetch_add(1, Ordering::SeqCst);
let encoded_room = urlencoding::encode(&self.cfg.room_id);
let encoded_user = urlencoding::encode(user_id);
let url = format!(
"{}/_matrix/client/v3/rooms/{}/send/m.room.message/{}?user_id={}&{}",
self.cfg.homeserver,
encoded_room,
txn_id,
encoded_user,
self.auth_query()
);
let content = MsgContent {
msgtype: "m.text",
body: body_text,
};
let resp = self.http.put(&url).json(&content).send().await?;
if !resp.status().is_success() {
let status = resp.status();
// optional: pull a short body snippet for debugging
let body_snip = resp.text().await.unwrap_or_default();
// Log for observability
tracing::warn!(
"Failed to send message as {}: status {}, body: {}",
user_id,
status,
body_snip
);
// Propagate an error so callers know this message was NOT delivered
return Err(anyhow::anyhow!(
"Matrix send failed for {} with status {}",
user_id,
status
));
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn dummy_cfg() -> MatrixConfig {
MatrixConfig {
homeserver: "https://matrix.example.org".to_string(),
as_token: "AS_TOKEN".to_string(),
server_name: "example.org".to_string(),
room_id: "!roomid:example.org".to_string(),
}
}
#[test]
fn localpart_strips_bang_correctly() {
assert_eq!(
MatrixAppserviceClient::localpart_from_node_id("!deadbeef"),
"deadbeef"
);
assert_eq!(
MatrixAppserviceClient::localpart_from_node_id("cafebabe"),
"cafebabe"
);
}
#[test]
fn user_id_builds_from_localpart_and_server_name() {
let http = reqwest::Client::builder().build().unwrap();
let client = MatrixAppserviceClient::new(http, dummy_cfg());
let uid = client.user_id("deadbeef");
assert_eq!(uid, "@deadbeef:example.org");
}
#[test]
fn auth_query_contains_access_token() {
let http = reqwest::Client::builder().build().unwrap();
let client = MatrixAppserviceClient::new(http, dummy_cfg());
let q = client.auth_query();
assert!(q.starts_with("access_token="));
assert!(q.contains("AS_TOKEN"));
}
#[test]
fn test_new_matrix_client() {
let http_client = reqwest::Client::new();
let config = dummy_cfg();
let client = MatrixAppserviceClient::new(http_client, config);
assert_eq!(client.cfg.homeserver, "https://matrix.example.org");
assert_eq!(client.cfg.as_token, "AS_TOKEN");
assert!(client.txn_counter.load(Ordering::SeqCst) > 0);
}
#[tokio::test]
async fn test_ensure_user_registered_success() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("POST", "/_matrix/client/v3/register")
.match_query("kind=user&access_token=AS_TOKEN")
.with_status(200)
.create();
let mut cfg = dummy_cfg();
cfg.homeserver = server.url();
let client = MatrixAppserviceClient::new(reqwest::Client::new(), cfg);
let result = client.ensure_user_registered("testuser").await;
mock.assert();
assert!(result.is_ok());
}
#[tokio::test]
async fn test_ensure_user_registered_user_in_use() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("POST", "/_matrix/client/v3/register")
.match_query("kind=user&access_token=AS_TOKEN")
.with_status(400) // M_USER_IN_USE
.create();
let mut cfg = dummy_cfg();
cfg.homeserver = server.url();
let client = MatrixAppserviceClient::new(reqwest::Client::new(), cfg);
let result = client.ensure_user_registered("testuser").await;
mock.assert();
assert!(result.is_ok());
}
#[tokio::test]
async fn test_set_display_name_success() {
let mut server = mockito::Server::new_async().await;
let user_id = "@test:example.org";
let encoded_user = urlencoding::encode(user_id);
let query = format!("user_id={}&access_token=AS_TOKEN", encoded_user);
let path = format!("/_matrix/client/v3/profile/{}/displayname", encoded_user);
let mock = server
.mock("PUT", path.as_str())
.match_query(query.as_str())
.with_status(200)
.create();
let mut cfg = dummy_cfg();
cfg.homeserver = server.url();
let client = MatrixAppserviceClient::new(reqwest::Client::new(), cfg);
let result = client.set_display_name(user_id, "Test Name").await;
mock.assert();
assert!(result.is_ok());
}
#[tokio::test]
async fn test_set_display_name_fail_is_ok() {
let mut server = mockito::Server::new_async().await;
let user_id = "@test:example.org";
let encoded_user = urlencoding::encode(user_id);
let query = format!("user_id={}&access_token=AS_TOKEN", encoded_user);
let path = format!("/_matrix/client/v3/profile/{}/displayname", encoded_user);
let mock = server
.mock("PUT", path.as_str())
.match_query(query.as_str())
.with_status(500)
.create();
let mut cfg = dummy_cfg();
cfg.homeserver = server.url();
let client = MatrixAppserviceClient::new(reqwest::Client::new(), cfg);
let result = client.set_display_name(user_id, "Test Name").await;
mock.assert();
assert!(result.is_ok());
}
#[tokio::test]
async fn test_send_text_message_as_success() {
let mut server = mockito::Server::new_async().await;
let user_id = "@test:example.org";
let room_id = "!roomid:example.org";
let encoded_user = urlencoding::encode(user_id);
let encoded_room = urlencoding::encode(room_id);
let client = {
let mut cfg = dummy_cfg();
cfg.homeserver = server.url();
cfg.room_id = room_id.to_string();
MatrixAppserviceClient::new(reqwest::Client::new(), cfg)
};
let txn_id = client.txn_counter.load(Ordering::SeqCst);
let query = format!("user_id={}&access_token=AS_TOKEN", encoded_user);
let path = format!(
"/_matrix/client/v3/rooms/{}/send/m.room.message/{}",
encoded_room, txn_id
);
let mock = server
.mock("PUT", path.as_str())
.match_query(query.as_str())
.with_status(200)
.create();
let result = client.send_text_message_as(user_id, "hello").await;
mock.assert();
assert!(result.is_ok());
}
#[tokio::test]
async fn test_send_text_message_as_fail() {
let mut server = mockito::Server::new_async().await;
let user_id = "@test:example.org";
let room_id = "!roomid:example.org";
let encoded_user = urlencoding::encode(user_id);
let encoded_room = urlencoding::encode(room_id);
let client = {
let mut cfg = dummy_cfg();
cfg.homeserver = server.url();
cfg.room_id = room_id.to_string();
MatrixAppserviceClient::new(reqwest::Client::new(), cfg)
};
let txn_id = client.txn_counter.load(Ordering::SeqCst);
let query = format!("user_id={}&access_token=AS_TOKEN", encoded_user);
let path = format!(
"/_matrix/client/v3/rooms/{}/send/m.room.message/{}",
encoded_room, txn_id
);
let mock = server
.mock("PUT", path.as_str())
.match_query(query.as_str())
.with_status(500)
.create();
let result = client.send_text_message_as(user_id, "hello").await;
mock.assert();
assert!(result.is_err());
}
}

363
matrix/src/potatomesh.rs Normal file
View File

@@ -0,0 +1,363 @@
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::config::PotatomeshConfig;
#[allow(dead_code)]
#[derive(Debug, Deserialize, Clone)]
pub struct PotatoMessage {
pub id: u64,
pub rx_time: u64,
pub rx_iso: String,
pub from_id: String,
pub to_id: String,
pub channel: u8,
pub portnum: String,
pub text: String,
pub rssi: i16,
pub hop_limit: u8,
pub lora_freq: u32,
pub modem_preset: String,
pub channel_name: String,
pub snr: f32,
#[serde(default)]
pub reply_id: Option<u64>,
pub node_id: String,
}
#[allow(dead_code)]
#[derive(Debug, Deserialize, Clone)]
pub struct PotatoNode {
pub node_id: String,
#[serde(default)]
pub short_name: Option<String>,
pub long_name: String,
#[serde(default)]
pub role: Option<String>,
#[serde(default)]
pub hw_model: Option<String>,
#[serde(default)]
pub last_heard: Option<u64>,
#[serde(default)]
pub first_heard: Option<u64>,
#[serde(default)]
pub latitude: Option<f64>,
#[serde(default)]
pub longitude: Option<f64>,
#[serde(default)]
pub altitude: Option<f64>,
}
#[derive(Clone)]
pub struct PotatoClient {
http: reqwest::Client,
cfg: PotatomeshConfig,
// simple in-memory cache for node metadata
nodes_cache: Arc<RwLock<HashMap<String, PotatoNode>>>,
}
impl PotatoClient {
pub fn new(http: reqwest::Client, cfg: PotatomeshConfig) -> Self {
Self {
http,
cfg,
nodes_cache: Arc::new(RwLock::new(HashMap::new())),
}
}
fn messages_url(&self) -> String {
format!("{}/messages", self.cfg.base_url)
}
fn node_url(&self, hex_id: &str) -> String {
// e.g. https://potatomesh.net/api/nodes/67fc83cb
format!("{}/nodes/{}", self.cfg.base_url, hex_id)
}
pub async fn fetch_messages(&self) -> anyhow::Result<Vec<PotatoMessage>> {
let resp = self
.http
.get(self.messages_url())
.send()
.await?
.error_for_status()?;
let msgs: Vec<PotatoMessage> = resp.json().await?;
Ok(msgs)
}
pub async fn get_node(&self, node_id_with_bang: &str) -> anyhow::Result<PotatoNode> {
// node_id is like "!67fc83cb" → we need "67fc83cb"
let hex = node_id_with_bang.trim_start_matches('!').to_string();
{
let cache = self.nodes_cache.read().await;
if let Some(n) = cache.get(&hex) {
return Ok(n.clone());
}
}
let url = self.node_url(&hex);
let resp = self.http.get(url).send().await?.error_for_status()?;
let node: PotatoNode = resp.json().await?;
{
let mut cache = self.nodes_cache.write().await;
cache.insert(hex, node.clone());
}
Ok(node)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn deserialize_sample_message_array() {
let json = r#"
[
{
"id": 2947676906,
"rx_time": 1764241436,
"rx_iso": "2025-11-27T11:03:56Z",
"from_id": "!da6556d4",
"to_id": "^all",
"channel": 1,
"portnum": "TEXT_MESSAGE_APP",
"text": "Ping",
"rssi": -111,
"hop_limit": 1,
"lora_freq": 868,
"modem_preset": "MediumFast",
"channel_name": "TEST",
"snr": -9.0,
"node_id": "!06871773"
}
]
"#;
let msgs: Vec<PotatoMessage> = serde_json::from_str(json).expect("valid message json");
assert_eq!(msgs.len(), 1);
let m = &msgs[0];
assert_eq!(m.id, 2947676906);
assert_eq!(m.from_id, "!da6556d4");
assert_eq!(m.node_id, "!06871773");
assert_eq!(m.portnum, "TEXT_MESSAGE_APP");
assert_eq!(m.lora_freq, 868);
assert!((m.snr - (-9.0)).abs() < f32::EPSILON);
}
#[test]
fn deserialize_sample_node() {
let json = r#"
{
"node_id": "!67fc83cb",
"short_name": "83CB",
"long_name": "Meshtastic 83CB",
"role": "CLIENT_HIDDEN",
"last_heard": 1764250515,
"first_heard": 1758993817,
"last_seen_iso": "2025-11-27T13:35:15Z"
}
"#;
let node: PotatoNode = serde_json::from_str(json).expect("valid node json");
assert_eq!(node.node_id, "!67fc83cb");
assert_eq!(node.short_name.as_deref(), Some("83CB"));
assert_eq!(node.long_name, "Meshtastic 83CB");
assert_eq!(node.role.as_deref(), Some("CLIENT_HIDDEN"));
assert_eq!(node.last_heard, Some(1764250515));
assert_eq!(node.first_heard, Some(1758993817));
assert!(node.latitude.is_none());
}
#[test]
fn node_hex_id_is_stripped_correctly() {
let with_bang = "!deadbeef";
let hex = with_bang.trim_start_matches('!');
assert_eq!(hex, "deadbeef");
let already_hex = "cafebabe";
let hex2 = already_hex.trim_start_matches('!');
assert_eq!(hex2, "cafebabe");
}
#[test]
fn test_new_potato_client() {
let http_client = reqwest::Client::new();
let config = PotatomeshConfig {
base_url: "http://localhost:8080".to_string(),
poll_interval_secs: 60,
};
let client = PotatoClient::new(http_client, config);
assert_eq!(client.cfg.base_url, "http://localhost:8080");
assert_eq!(client.cfg.poll_interval_secs, 60);
}
#[test]
fn test_messages_url() {
let http_client = reqwest::Client::new();
let config = PotatomeshConfig {
base_url: "http://localhost:8080".to_string(),
poll_interval_secs: 60,
};
let client = PotatoClient::new(http_client, config);
assert_eq!(client.messages_url(), "http://localhost:8080/messages");
}
#[test]
fn test_node_url() {
let http_client = reqwest::Client::new();
let config = PotatomeshConfig {
base_url: "http://localhost:8080".to_string(),
poll_interval_secs: 60,
};
let client = PotatoClient::new(http_client, config);
assert_eq!(
client.node_url("!1234"),
"http://localhost:8080/nodes/!1234"
);
}
#[tokio::test]
async fn test_fetch_messages_success() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("GET", "/messages")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"
[
{
"id": 2947676906, "rx_time": 1764241436, "rx_iso": "2025-11-27T11:03:56Z",
"from_id": "!da6556d4", "to_id": "^all", "channel": 1,
"portnum": "TEXT_MESSAGE_APP", "text": "Ping", "rssi": -111,
"hop_limit": 1, "lora_freq": 868, "modem_preset": "MediumFast",
"channel_name": "TEST", "snr": -9.0, "node_id": "!06871773"
}
]
"#,
)
.create();
let http_client = reqwest::Client::new();
let config = PotatomeshConfig {
base_url: server.url(),
poll_interval_secs: 60,
};
let client = PotatoClient::new(http_client, config);
let result = client.fetch_messages().await;
mock.assert();
assert!(result.is_ok());
let messages = result.unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].id, 2947676906);
}
#[tokio::test]
async fn test_fetch_messages_error() {
let mut server = mockito::Server::new_async().await;
let mock = server.mock("GET", "/messages").with_status(500).create();
let http_client = reqwest::Client::new();
let config = PotatomeshConfig {
base_url: server.url(),
poll_interval_secs: 60,
};
let client = PotatoClient::new(http_client, config);
let result = client.fetch_messages().await;
mock.assert();
assert!(result.is_err());
}
#[tokio::test]
async fn test_get_node_cache_hit() {
let http_client = reqwest::Client::new();
let config = PotatomeshConfig {
base_url: "http://localhost:8080".to_string(),
poll_interval_secs: 60,
};
let client = PotatoClient::new(http_client, config);
let node = PotatoNode {
node_id: "!1234".to_string(),
short_name: Some("test".to_string()),
long_name: "test node".to_string(),
role: None,
hw_model: None,
last_heard: None,
first_heard: None,
latitude: None,
longitude: None,
altitude: None,
};
client
.nodes_cache
.write()
.await
.insert("1234".to_string(), node.clone());
let result = client.get_node("!1234").await;
assert!(result.is_ok());
let got = result.unwrap();
assert_eq!(got.node_id, "!1234");
assert_eq!(got.short_name.unwrap(), "test");
}
#[tokio::test]
async fn test_get_node_cache_miss() {
let mut server = mockito::Server::new_async().await;
let mock = server
.mock("GET", "/nodes/1234")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"
{
"node_id": "!1234", "short_name": "test", "long_name": "test node",
"role": "test", "hw_model": "test", "last_heard": 1, "first_heard": 1,
"latitude": 1.0, "longitude": 1.0, "altitude": 1.0
}
"#,
)
.create();
let http_client = reqwest::Client::new();
let config = PotatomeshConfig {
base_url: server.url(),
poll_interval_secs: 60,
};
let client = PotatoClient::new(http_client, config);
// first call, should miss cache and hit the server
let result = client.get_node("!1234").await;
mock.assert();
assert!(result.is_ok());
// second call, should hit cache
let result2 = client.get_node("!1234").await;
assert!(result2.is_ok());
// mockito would panic here if we made a second request
}
#[tokio::test]
async fn test_get_node_error() {
let mut server = mockito::Server::new_async().await;
let mock = server.mock("GET", "/nodes/1234").with_status(500).create();
let http_client = reqwest::Client::new();
let config = PotatomeshConfig {
base_url: server.url(),
poll_interval_secs: 60,
};
let client = PotatoClient::new(http_client, config);
let result = client.get_node("!1234").await;
mock.assert();
assert!(result.is_err());
}
}