From 0feb2591ef2b7e61636295e0ba644ae9e920e77f Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Mon, 19 May 2025 10:59:33 -0700 Subject: [PATCH] Remove stats listener since it is noisy in prod --- main.go | 11 ----- mqtt/stats.go | 126 -------------------------------------------------- 2 files changed, 137 deletions(-) delete mode 100644 mqtt/stats.go diff --git a/main.go b/main.go index 5a96bf0..5b4ff3d 100644 --- a/main.go +++ b/main.go @@ -92,9 +92,6 @@ func parseConfig() *Config { channelKeysDefault := getEnv("CHANNEL_KEYS", "LongFast:"+decoder.DefaultPrivateKey) channelKeysFlag := flag.String("channel-keys", channelKeysDefault, "Comma-separated list of channel:key pairs for encrypted channels") - // Statistics configuration - statsIntervalStr := getEnv("STATS_INTERVAL", "30s") - flag.DurationVar(&config.StatsInterval, "stats-interval", mustParseDuration(statsIntervalStr), "Interval for statistics reporting") flag.IntVar(&config.CacheSize, "cache-size", intFromEnv("CACHE_SIZE", 50), "Number of packets to cache for new subscribers") flag.BoolVar(&config.VerboseLogging, "verbose", boolFromEnv("VERBOSE_LOGGING", false), "Enable verbose message logging") @@ -224,11 +221,6 @@ func main() { broker := mqtt.NewBroker(messagesChan, config.CacheSize, logger) logger.Infof("Message broker initialized with cache size: %d", config.CacheSize) - // Create a stats tracker that subscribes to the broker - // with statistics printed based on configured interval - stats := mqtt.NewMessageStats(broker, config.StatsInterval, logger) - logger.Infof("Stats tracker initialized with interval: %s", config.StatsInterval) - // Create a message logger that subscribes to the broker // and also logs to stdout messageLogger, err := mqtt.NewMessageLogger( @@ -287,9 +279,6 @@ func main() { messageLogger.Close() } - // Stop the stats collector - stats.Close() - // Close the broker (which will close all subscriber channels) broker.Close() diff --git a/mqtt/stats.go b/mqtt/stats.go deleted file mode 100644 index f956a48..0000000 --- a/mqtt/stats.go +++ /dev/null @@ -1,126 +0,0 @@ -package mqtt - -import ( - "fmt" - "sync" - "time" - - "github.com/dpup/prefab/logging" - - meshtreampb "meshstream/generated/meshstream" - pb "meshstream/generated/meshtastic" -) - -// MessageStats tracks statistics about received messages -type MessageStats struct { - *BaseSubscriber - sync.Mutex - TotalMessages int - ByNode map[uint32]int - ByPortType map[pb.PortNum]int - LastStatsPrinted time.Time - ticker *time.Ticker - logger logging.Logger -} - -// NewMessageStats creates a new MessageStats instance -func NewMessageStats(broker *Broker, printInterval time.Duration, logger logging.Logger) *MessageStats { - statsLogger := logger.Named("mqtt.MessageStats") - - s := &MessageStats{ - ByNode: make(map[uint32]int), - ByPortType: make(map[pb.PortNum]int), - LastStatsPrinted: time.Now(), - ticker: time.NewTicker(printInterval), - logger: statsLogger, - } - - // Create base subscriber with stats message handler - s.BaseSubscriber = NewBaseSubscriber(SubscriberConfig{ - Name: "MessageStats", - Broker: broker, - BufferSize: 50, - Processor: s.recordMessage, - StartHook: func() { go s.runTicker() }, - CloseHook: func() { s.ticker.Stop() }, - Logger: statsLogger, - }) - - // Start processing messages - s.Start() - - return s -} - -// runTicker handles the periodic stats printing -func (s *MessageStats) runTicker() { - for { - select { - case <-s.ticker.C: - s.PrintStats() - case <-s.done: - return - } - } -} - -// recordMessage records a message in the statistics -func (s *MessageStats) recordMessage(packet *meshtreampb.Packet) { - s.Lock() - defer s.Unlock() - - s.TotalMessages++ - - // Count by source node - s.ByNode[packet.Data.From]++ - - // Count by port type - s.ByPortType[packet.Data.PortNum]++ -} - -// PrintStats logs current statistics using the structured logger -func (s *MessageStats) PrintStats() { - s.Lock() - defer s.Unlock() - - now := time.Now() - duration := now.Sub(s.LastStatsPrinted) - msgPerSec := float64(s.TotalMessages) / duration.Seconds() - - // Log the basic statistics with structured fields - s.logger.Infow("Message Statistics Summary", - "totalMessages", s.TotalMessages, - "messagesPerSecond", msgPerSec, - "durationSeconds", duration.Seconds(), - ) - - // Create maps for structured node and port stats - nodeStats := make(map[string]int) - for nodeID, count := range s.ByNode { - nodeStats[fmt.Sprintf("node.%d", nodeID)] = count - } - - // Log node statistics with structured fields - s.logger.Infow("Messages by Node", - "nodeCounts", nodeStats, - "activeNodes", len(s.ByNode), - ) - - // Create maps for structured port stats - portStats := make(map[string]int) - for portType, count := range s.ByPortType { - portStats[portType.String()] = count - } - - // Log port type statistics with structured fields - s.logger.Infow("Messages by Port Type", - "portCounts", portStats, - "activePorts", len(s.ByPortType), - ) - - // Reset counters for rate calculation - s.TotalMessages = 0 - s.ByNode = make(map[uint32]int) - s.ByPortType = make(map[pb.PortNum]int) - s.LastStatsPrinted = now -}