From 1d61a8950559ebb53ff4769eb1ff7fdedb4ffcae Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Sun, 15 Mar 2026 16:27:57 +0000 Subject: [PATCH] fix: remove dead code, SSE error handling, bounded dedup maps - Remove unused mustParseDuration from main.go - Replace http.Error calls after SSE headers with plain returns; skip bad packets instead of killing the stream on marshal error - Change processedPackets/seenPackets from boolean to timestamp values and prune entries older than 24h on each packet to prevent unbounded memory growth in long-running sessions Co-Authored-By: Claude Sonnet 4.6 --- main.go | 10 ---------- server/server.go | 6 +----- web/src/store/slices/aggregatorSlice.ts | 16 ++++++++++++---- web/src/store/slices/packetSlice.ts | 21 ++++++++++++++++----- 4 files changed, 29 insertions(+), 24 deletions(-) diff --git a/main.go b/main.go index add494d..94224c4 100644 --- a/main.go +++ b/main.go @@ -107,16 +107,6 @@ func parseConfig() *Config { return config } -// Helper function to parse duration from environment -func mustParseDuration(durationStr string) time.Duration { - duration, err := time.ParseDuration(durationStr) - if err != nil { - fmt.Fprintf(os.Stderr, "Invalid duration format: %s\n", durationStr) - os.Exit(1) - } - return duration -} - // Helper function to parse duration from environment with default func durationFromEnv(key string, defaultValue time.Duration) time.Duration { envVal := getEnv(key, "") diff --git a/server/server.go b/server/server.go index 23c6f84..ad4f240 100644 --- a/server/server.go +++ b/server/server.go @@ -236,7 +236,6 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { // Client disconnected, unsubscribe and return logger.Info("Client disconnected, unsubscribing from broker") s.config.Broker.Unsubscribe(packetChan) - http.Error(w, "Client disconnected", http.StatusGone) return case <-s.shutdown: @@ -245,7 +244,6 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "event: info\ndata: Server shutting down, connection closed\n\n") flusher.Flush() s.config.Broker.Unsubscribe(packetChan) - http.Error(w, "Server is shutting down", http.StatusServiceUnavailable) return case <-heartbeatTicker.C: @@ -259,7 +257,6 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { if !ok { // Channel closed, probably shutting down logger.Info("Packet channel closed, ending stream") - http.Error(w, "Server is shutting down", http.StatusServiceUnavailable) return } @@ -278,8 +275,7 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { data, err := marshaler.Marshal(packet) if err != nil { logger.Errorw("Error marshaling packet to JSON", "error", err) - http.Error(w, "Error marshaling packet", http.StatusInternalServerError) - return + continue } // Send the event diff --git a/web/src/store/slices/aggregatorSlice.ts b/web/src/store/slices/aggregatorSlice.ts index 09ace43..601c89e 100644 --- a/web/src/store/slices/aggregatorSlice.ts +++ b/web/src/store/slices/aggregatorSlice.ts @@ -72,8 +72,8 @@ interface AggregatorState { channels: Record; messages: Record; selectedNodeId?: number; - // Track already processed packet IDs to prevent duplicates - processedPackets: Record; + // Track already processed packet IDs to prevent duplicates (value = unix timestamp) + processedPackets: Record; } const initialState: AggregatorState = { @@ -108,11 +108,19 @@ const processPacket = (state: AggregatorState, packet: Packet) => { const nodeIdHex = `!${nodeId.toString(16).toLowerCase()}`; const packetKey = `${nodeIdHex}_${data.id}`; + // Prune processed packets older than 24 hours + const cutoff = timestamp - 86400; + for (const key of Object.keys(state.processedPackets)) { + if (state.processedPackets[key] < cutoff) { + delete state.processedPackets[key]; + } + } + // Check if we've already processed this packet const isNewPacket = !state.processedPackets[packetKey]; - // Always mark this packet as processed - state.processedPackets[packetKey] = true; + // Always mark this packet as processed with its timestamp + state.processedPackets[packetKey] = timestamp; // Update gateway data // Handle both cases: diff --git a/web/src/store/slices/packetSlice.ts b/web/src/store/slices/packetSlice.ts index 2330dfd..a2be10f 100644 --- a/web/src/store/slices/packetSlice.ts +++ b/web/src/store/slices/packetSlice.ts @@ -10,7 +10,7 @@ interface PacketState { error: string | null; streamPaused: boolean; bufferedPackets: Packet[]; // Holds packets received while paused - seenPackets: Record; // Tracks already seen packet IDs to prevent duplicates + seenPackets: Record; // Tracks already seen packet IDs (value = unix timestamp) to prevent duplicates } const initialState: PacketState = { @@ -19,7 +19,7 @@ const initialState: PacketState = { error: null, streamPaused: false, bufferedPackets: [], - seenPackets: {}, // Empty object to track seen packets + seenPackets: {}, // Empty object to track seen packets (key → unix timestamp) }; const packetSlice = createSlice({ @@ -40,7 +40,8 @@ const packetSlice = createSlice({ const packetKey = `${nodeId}_${packet.data.id}`; if (!state.seenPackets[packetKey]) { - state.seenPackets[packetKey] = true; + const ts = packet.data.rxTime || Math.floor(Date.now() / 1000); + state.seenPackets[packetKey] = ts; uniquePackets.push(packet); } } else { @@ -65,6 +66,16 @@ const packetSlice = createSlice({ return; } + const now = packet.data.rxTime || Math.floor(Date.now() / 1000); + + // Prune seen packets older than 24 hours + const cutoff = now - 86400; + for (const key of Object.keys(state.seenPackets)) { + if (state.seenPackets[key] < cutoff) { + delete state.seenPackets[key]; + } + } + // Create a Meshtastic node ID format const nodeId = `!${packet.data.from.toString(16).toLowerCase()}`; const packetKey = `${nodeId}_${packet.data.id}`; @@ -75,8 +86,8 @@ const packetSlice = createSlice({ return; } - // Mark this packet as seen - state.seenPackets[packetKey] = true; + // Mark this packet as seen with its timestamp + state.seenPackets[packetKey] = now; if (state.streamPaused) { // When paused, add to buffer instead of main list