From ed1e719bca2bb88505fe39524ed7692079ab8db5 Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Mon, 21 Apr 2025 10:01:05 -0700 Subject: [PATCH] Add SSE endpoint for streaming MQTT messages to web browsers --- main.go | 5 +- server/server.go | 106 ++++++++++++++++++++++++++++++++++++++- server/static/index.html | 101 +++++++++++++++++++++++++++++++++++-- 3 files changed, 205 insertions(+), 7 deletions(-) diff --git a/main.go b/main.go index 3733d3a..2cf550a 100644 --- a/main.go +++ b/main.go @@ -80,8 +80,9 @@ func main() { // Start the web server webServer := server.New(server.Config{ - Host: serverHost, - Port: serverPort, + Host: serverHost, + Port: serverPort, + Broker: broker, }) // Start the server in a goroutine diff --git a/server/server.go b/server/server.go index 3a1a769..7cf38ae 100644 --- a/server/server.go +++ b/server/server.go @@ -2,17 +2,22 @@ package server import ( "encoding/json" + "fmt" "log" "net/http" "strconv" + "time" "github.com/dpup/prefab" + + "meshstream/mqtt" ) // Config holds server configuration type Config struct { - Host string - Port string + Host string + Port string + Broker *mqtt.Broker // The MQTT message broker } // Server encapsulates the HTTP server functionality @@ -23,6 +28,10 @@ type Server struct { // New creates a new server instance func New(config Config) *Server { + if config.Broker == nil { + log.Println("Warning: Server created without a broker, streaming will not work") + } + return &Server{ config: config, } @@ -41,6 +50,7 @@ func (s *Server) Start() error { prefab.WithHost(s.config.Host), prefab.WithPort(port), prefab.WithHTTPHandlerFunc("/api/status", s.handleStatus), + prefab.WithHTTPHandlerFunc("/api/stream", s.handleStream), prefab.WithStaticFiles("/", "./server/static"), ) @@ -66,4 +76,96 @@ func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(status) +} + +// handleStream handles Server-Sent Events streaming of MQTT messages +func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { + // Check if broker is available + if s.config.Broker == nil { + http.Error(w, "MQTT broker not available", http.StatusServiceUnavailable) + return + } + + // Set headers for SSE + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + // Make sure that the writer supports flushing + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming not supported", http.StatusInternalServerError) + return + } + + // Subscribe to the broker with a buffer size of 10 + packetChan := s.config.Broker.Subscribe(10) + + // Signal when the client disconnects + notify := r.Context().Done() + + // Send an initial message + fmt.Fprintf(w, "event: info\ndata: Connected to Meshtastic stream\n\n") + flusher.Flush() + + // Stream messages to the client + for { + select { + case <-notify: + // Client disconnected, unsubscribe and return + log.Println("Client disconnected, unsubscribing from broker") + s.config.Broker.Unsubscribe(packetChan) + return + + case packet, ok := <-packetChan: + if !ok { + // Channel closed, probably shutting down + log.Println("Packet channel closed, ending stream") + return + } + + if packet == nil { + continue + } + + // Create a simplified packet for the frontend + packetData := map[string]interface{}{ + "from": packet.From, + "to": packet.To, + "port": packet.PortNum.String(), + "timestamp": time.Now().Unix(), + "hop_limit": packet.HopLimit, + "hop_start": packet.HopStart, + "id": packet.ID, + "channel_id": packet.ChannelID, + } + + // Add payload information if available + switch v := packet.Payload.(type) { + case string: + packetData["payload_type"] = "text" + packetData["payload"] = v + case []byte: + packetData["payload_type"] = "binary" + packetData["payload_size"] = len(v) + case nil: + packetData["payload_type"] = "none" + default: + packetData["payload_type"] = fmt.Sprintf("%T", v) + } + + // Convert the packet to JSON + data, err := json.Marshal(packetData) + + if err != nil { + log.Printf("Error marshaling packet to JSON: %v", err) + continue + } + + // Send the event + fmt.Fprintf(w, "event: message\ndata: %s\n\n", data) + flusher.Flush() + } + } } \ No newline at end of file diff --git a/server/static/index.html b/server/static/index.html index 3951619..14d0cea 100644 --- a/server/static/index.html +++ b/server/static/index.html @@ -10,30 +10,125 @@ margin: 0; padding: 20px; line-height: 1.6; + max-width: 960px; + margin: 0 auto; } h1 { color: #333; + border-bottom: 1px solid #eee; + padding-bottom: 10px; } #messages { margin-top: 20px; border: 1px solid #ddd; + border-radius: 4px; padding: 10px; - height: 400px; + height: 500px; overflow-y: auto; background-color: #f9f9f9; + font-family: 'Monaco', 'Consolas', monospace; + font-size: 14px; + } + .message { + padding: 8px 0; + border-bottom: 1px solid #eee; + } + .message:last-child { + border-bottom: none; + } + .info { + padding: 10px; + background-color: #e9f7fe; + border-radius: 4px; + margin-bottom: 20px; }

Meshtastic Stream

+ +
+

This page displays real-time messages from Meshtastic nodes via MQTT.

+

Messages are streamed using Server-Sent Events (SSE) and will appear below as they arrive.

+
+

Waiting for messages...