fix: remove dead code, fix SSE error handling, bound dedup maps

- Remove unused mustParseDuration from main.go
- Replace http.Error calls after SSE headers with plain returns;
  skip bad packets instead of killing the stream on marshal error
- Change processedPackets/seenPackets values from booleans to unix-second
  timestamps and prune entries older than 24h on each packet to prevent
  unbounded memory growth in long-running sessions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Daniel Pupius
2026-03-15 16:27:57 +00:00
parent 9e5fd5bcae
commit 1d61a89505
4 changed files with 29 additions and 24 deletions

10
main.go
View File

@@ -107,16 +107,6 @@ func parseConfig() *Config {
return config
}
// Helper function to parse duration from environment
func mustParseDuration(durationStr string) time.Duration {
duration, err := time.ParseDuration(durationStr)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid duration format: %s\n", durationStr)
os.Exit(1)
}
return duration
}
// Helper function to parse duration from environment with default
func durationFromEnv(key string, defaultValue time.Duration) time.Duration {
envVal := getEnv(key, "")

View File

@@ -236,7 +236,6 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
// Client disconnected, unsubscribe and return
logger.Info("Client disconnected, unsubscribing from broker")
s.config.Broker.Unsubscribe(packetChan)
http.Error(w, "Client disconnected", http.StatusGone)
return
case <-s.shutdown:
@@ -245,7 +244,6 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "event: info\ndata: Server shutting down, connection closed\n\n")
flusher.Flush()
s.config.Broker.Unsubscribe(packetChan)
http.Error(w, "Server is shutting down", http.StatusServiceUnavailable)
return
case <-heartbeatTicker.C:
@@ -259,7 +257,6 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
if !ok {
// Channel closed, probably shutting down
logger.Info("Packet channel closed, ending stream")
http.Error(w, "Server is shutting down", http.StatusServiceUnavailable)
return
}
@@ -278,8 +275,7 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
data, err := marshaler.Marshal(packet)
if err != nil {
logger.Errorw("Error marshaling packet to JSON", "error", err)
http.Error(w, "Error marshaling packet", http.StatusInternalServerError)
return
continue
}
// Send the event

View File

@@ -72,8 +72,8 @@ interface AggregatorState {
channels: Record<string, ChannelData>;
messages: Record<string, TextMessage[]>;
selectedNodeId?: number;
// Track already processed packet IDs to prevent duplicates
processedPackets: Record<string, boolean>;
// Track already processed packet IDs to prevent duplicates (value = unix timestamp)
processedPackets: Record<string, number>;
}
const initialState: AggregatorState = {
@@ -108,11 +108,19 @@ const processPacket = (state: AggregatorState, packet: Packet) => {
const nodeIdHex = `!${nodeId.toString(16).toLowerCase()}`;
const packetKey = `${nodeIdHex}_${data.id}`;
// Prune processed packets older than 24 hours
const cutoff = timestamp - 86400;
for (const key of Object.keys(state.processedPackets)) {
if (state.processedPackets[key] < cutoff) {
delete state.processedPackets[key];
}
}
// Check if we've already processed this packet
const isNewPacket = !state.processedPackets[packetKey];
// Always mark this packet as processed
state.processedPackets[packetKey] = true;
// Always mark this packet as processed with its timestamp
state.processedPackets[packetKey] = timestamp;
// Update gateway data
// Handle both cases:

View File

@@ -10,7 +10,7 @@ interface PacketState {
error: string | null;
streamPaused: boolean;
bufferedPackets: Packet[]; // Holds packets received while paused
seenPackets: Record<string, boolean>; // Tracks already seen packet IDs to prevent duplicates
seenPackets: Record<string, number>; // Tracks already seen packet IDs (value = unix timestamp) to prevent duplicates
}
const initialState: PacketState = {
@@ -19,7 +19,7 @@ const initialState: PacketState = {
error: null,
streamPaused: false,
bufferedPackets: [],
seenPackets: {}, // Empty object to track seen packets
seenPackets: {}, // Empty object to track seen packets (key → unix timestamp)
};
const packetSlice = createSlice({
@@ -40,7 +40,8 @@ const packetSlice = createSlice({
const packetKey = `${nodeId}_${packet.data.id}`;
if (!state.seenPackets[packetKey]) {
state.seenPackets[packetKey] = true;
const ts = packet.data.rxTime || Math.floor(Date.now() / 1000);
state.seenPackets[packetKey] = ts;
uniquePackets.push(packet);
}
} else {
@@ -65,6 +66,16 @@ const packetSlice = createSlice({
return;
}
const now = packet.data.rxTime || Math.floor(Date.now() / 1000);
// Prune seen packets older than 24 hours
const cutoff = now - 86400;
for (const key of Object.keys(state.seenPackets)) {
if (state.seenPackets[key] < cutoff) {
delete state.seenPackets[key];
}
}
// Create a Meshtastic node ID format
const nodeId = `!${packet.data.from.toString(16).toLowerCase()}`;
const packetKey = `${nodeId}_${packet.data.id}`;
@@ -75,8 +86,8 @@ const packetSlice = createSlice({
return;
}
// Mark this packet as seen
state.seenPackets[packetKey] = true;
// Mark this packet as seen with its timestamp
state.seenPackets[packetKey] = now;
if (state.streamPaused) {
// When paused, add to buffer instead of main list