mirror of
https://github.com/dpup/meshstream.git
synced 2026-03-28 17:42:37 +01:00
121 lines
3.4 KiB
Go
121 lines
3.4 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"meshstream/decoder"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
)
|
|
|
|
const (
|
|
mqttBroker = "mqtt.bayme.sh"
|
|
mqttUsername = "meshdev"
|
|
mqttPassword = "large4cats"
|
|
mqttTopicPrefix = "msh/US/bayarea"
|
|
)
|
|
|
|
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
|
|
fmt.Printf("Received message from topic: %s\n", msg.Topic())
|
|
|
|
// Parse the topic structure
|
|
topicInfo, err := decoder.ParseTopic(msg.Topic())
|
|
if err != nil {
|
|
fmt.Printf("Error parsing topic: %v\n", err)
|
|
fmt.Printf("Raw topic: %s\n", msg.Topic())
|
|
fmt.Printf("Raw payload: %x\n", msg.Payload())
|
|
} else {
|
|
// First decode the message based on its format
|
|
var formattedOutput string
|
|
switch topicInfo.Format {
|
|
case "e", "c", "map":
|
|
// Binary encoded protobuf message (both regular and map formats use the same decoder)
|
|
decodedPacket := decoder.DecodeMessage(msg.Payload(), topicInfo)
|
|
formattedOutput = decoder.FormatTopicAndPacket(topicInfo, decodedPacket)
|
|
case "json":
|
|
// JSON format message
|
|
jsonData, err := decoder.DecodeJSONMessage(msg.Payload())
|
|
if err != nil {
|
|
fmt.Printf("Error decoding JSON message: %v\n", err)
|
|
formattedOutput = decoder.FormatTopicAndRawData(topicInfo, msg.Payload())
|
|
} else {
|
|
formattedOutput = decoder.FormatTopicAndJSONData(topicInfo, jsonData)
|
|
}
|
|
default:
|
|
// Unsupported format
|
|
formattedOutput = decoder.FormatTopicAndRawData(topicInfo, msg.Payload())
|
|
}
|
|
|
|
// Print the formatted output
|
|
fmt.Println(formattedOutput)
|
|
}
|
|
|
|
fmt.Println(strings.Repeat("-", 80))
|
|
}
|
|
|
|
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
|
|
fmt.Println("Connected to MQTT Broker!")
|
|
}
|
|
|
|
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
|
|
fmt.Printf("Connection lost: %v\n", err)
|
|
}
|
|
|
|
func main() {
|
|
// Set up logging
|
|
log.SetOutput(os.Stdout)
|
|
|
|
// Initialize default channel key
|
|
err := decoder.AddChannelKey("LongFast", decoder.DefaultPrivateKey)
|
|
if err != nil {
|
|
log.Printf("Failed to initialize default channel key: %v", err)
|
|
}
|
|
|
|
if err := decoder.AddChannelKey("ERSN", "VIuMtC5uDDJtC/ojdH314HLkDIHanX4LdbK5yViV9jA="); err != nil {
|
|
log.Printf("Failed to initialize ERSN channel key: %v", err)
|
|
}
|
|
|
|
// Create MQTT client options
|
|
opts := mqtt.NewClientOptions()
|
|
opts.AddBroker(fmt.Sprintf("tcp://%s:1883", mqttBroker))
|
|
opts.SetClientID("meshstream-client")
|
|
opts.SetUsername(mqttUsername)
|
|
opts.SetPassword(mqttPassword)
|
|
opts.SetDefaultPublishHandler(messagePubHandler)
|
|
opts.SetPingTimeout(1 * time.Second)
|
|
opts.OnConnect = connectHandler
|
|
opts.OnConnectionLost = connectLostHandler
|
|
|
|
// Create and start a client
|
|
client := mqtt.NewClient(opts)
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
log.Fatalf("Error connecting to MQTT broker: %v", token.Error())
|
|
}
|
|
|
|
// Subscribe to all topics for this region
|
|
// This will capture:
|
|
// - msh/US/CA/Motherlode/2/e/# (binary protobuf data)
|
|
// - msh/US/CA/Motherlode/2/json/# (JSON formatted data)
|
|
topic := mqttTopicPrefix + "/#"
|
|
token := client.Subscribe(topic, 0, nil)
|
|
token.Wait()
|
|
fmt.Printf("Subscribed to topic: %s\n", topic)
|
|
|
|
// Wait for interrupt signal to gracefully shutdown
|
|
sig := make(chan os.Signal, 1)
|
|
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
|
|
<-sig
|
|
|
|
// Unsubscribe and disconnect
|
|
fmt.Println("Unsubscribing and disconnecting...")
|
|
token = client.Unsubscribe(topic)
|
|
token.Wait()
|
|
client.Disconnect(250)
|
|
}
|