Add graceful shutdown for SSE connections

Add atomic shutdown flag and shutdown channel to handle graceful
termination of Server-Sent Events connections. This ensures:

1. New connections are rejected with a proper status code during shutdown
2. Existing connections receive a notification before being closed
3. All subscriber channels are properly unsubscribed from the broker

This implementation safely handles multiple in-flight requests during
shutdown by using an atomic flag to track server state.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Daniel Pupius
2025-04-21 10:38:59 -07:00
parent 9b73057103
commit 9c438713de

View File

@@ -6,6 +6,7 @@ import (
"log"
"net/http"
"strconv"
"sync/atomic"
"time"
"github.com/dpup/prefab"
@@ -24,6 +25,10 @@ type Config struct {
type Server struct {
config Config
server *prefab.Server
// Channel to signal shutdown to active connections
shutdown chan struct{}
// Atomic flag to indicate if server is shutting down
isShuttingDown atomic.Bool
}
// New creates a new server instance
@@ -33,7 +38,8 @@ func New(config Config) *Server {
}
return &Server{
config: config,
config: config,
shutdown: make(chan struct{}),
}
}
@@ -61,6 +67,13 @@ func (s *Server) Start() error {
// Stop shuts down the server
func (s *Server) Stop() error {
// Set the shutdown flag first to prevent new connections from starting streams
s.isShuttingDown.Store(true)
// Signal all active connections to close
close(s.shutdown)
// Then shut down the HTTP server
if s.server != nil {
return s.server.Shutdown()
}
@@ -80,6 +93,12 @@ func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
// handleStream handles Server-Sent Events streaming of MQTT messages
func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
// Check if the server is shutting down
if s.isShuttingDown.Load() {
http.Error(w, "Server is shutting down", http.StatusServiceUnavailable)
return
}
// Check if broker is available
if s.config.Broker == nil {
http.Error(w, "MQTT broker not available", http.StatusServiceUnavailable)
@@ -118,6 +137,14 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
s.config.Broker.Unsubscribe(packetChan)
return
case <-s.shutdown:
// Server is shutting down, send a message to client and close
log.Println("Server shutting down, closing SSE connection")
fmt.Fprintf(w, "event: info\ndata: Server shutting down, connection closed\n\n")
flusher.Flush()
s.config.Broker.Unsubscribe(packetChan)
return
case packet, ok := <-packetChan:
if !ok {
// Channel closed, probably shutting down