From 9c438713de6a793a5e215dac5d862a9ea2ecb4e3 Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Mon, 21 Apr 2025 10:38:59 -0700 Subject: [PATCH] Add graceful shutdown for SSE connections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add atomic shutdown flag and shutdown channel to handle graceful termination of Server-Sent Events connections. This ensures: 1. New connections are rejected with a proper status code during shutdown 2. Existing connections receive a notification before being closed 3. All subscriber channels are properly unsubscribed from the broker This implementation safely handles multiple in-flight requests during shutdown by using an atomic flag to track server state. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- server/server.go | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index caed58e..42631cb 100644 --- a/server/server.go +++ b/server/server.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "strconv" + "sync/atomic" "time" "github.com/dpup/prefab" @@ -24,6 +25,10 @@ type Config struct { type Server struct { config Config server *prefab.Server + // Channel to signal shutdown to active connections + shutdown chan struct{} + // Atomic flag to indicate if server is shutting down + isShuttingDown atomic.Bool } // New creates a new server instance @@ -33,7 +38,8 @@ func New(config Config) *Server { } return &Server{ - config: config, + config: config, + shutdown: make(chan struct{}), } } @@ -61,6 +67,13 @@ func (s *Server) Start() error { // Stop shuts down the server func (s *Server) Stop() error { + // Set the shutdown flag first to prevent new connections from starting streams + s.isShuttingDown.Store(true) + + // Signal all active connections to close + close(s.shutdown) + + // Then shut down the HTTP server if s.server != nil { return s.server.Shutdown() } @@ -80,6 +93,12 @@ func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { // handleStream handles Server-Sent Events streaming of MQTT messages func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { + // Check if the server is shutting down + if s.isShuttingDown.Load() { + http.Error(w, "Server is shutting down", http.StatusServiceUnavailable) + return + } + // Check if broker is available if s.config.Broker == nil { http.Error(w, "MQTT broker not available", http.StatusServiceUnavailable) @@ -118,6 +137,14 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { s.config.Broker.Unsubscribe(packetChan) return + case <-s.shutdown: + // Server is shutting down, send a message to client and close + log.Println("Server shutting down, closing SSE connection") + fmt.Fprintf(w, "event: info\ndata: Server shutting down, connection closed\n\n") + flusher.Flush() + s.config.Broker.Unsubscribe(packetChan) + return + case packet, ok := <-packetChan: if !ok { // Channel closed, probably shutting down