mirror of
https://github.com/dpup/meshstream.git
synced 2026-03-28 17:42:37 +01:00
Remove stats listener since it is noisy in prod
This commit is contained in:
11
main.go
11
main.go
@@ -92,9 +92,6 @@ func parseConfig() *Config {
|
||||
channelKeysDefault := getEnv("CHANNEL_KEYS", "LongFast:"+decoder.DefaultPrivateKey)
|
||||
channelKeysFlag := flag.String("channel-keys", channelKeysDefault, "Comma-separated list of channel:key pairs for encrypted channels")
|
||||
|
||||
// Statistics configuration
|
||||
statsIntervalStr := getEnv("STATS_INTERVAL", "30s")
|
||||
flag.DurationVar(&config.StatsInterval, "stats-interval", mustParseDuration(statsIntervalStr), "Interval for statistics reporting")
|
||||
flag.IntVar(&config.CacheSize, "cache-size", intFromEnv("CACHE_SIZE", 50), "Number of packets to cache for new subscribers")
|
||||
flag.BoolVar(&config.VerboseLogging, "verbose", boolFromEnv("VERBOSE_LOGGING", false), "Enable verbose message logging")
|
||||
|
||||
@@ -224,11 +221,6 @@ func main() {
|
||||
broker := mqtt.NewBroker(messagesChan, config.CacheSize, logger)
|
||||
logger.Infof("Message broker initialized with cache size: %d", config.CacheSize)
|
||||
|
||||
// Create a stats tracker that subscribes to the broker
|
||||
// with statistics printed based on configured interval
|
||||
stats := mqtt.NewMessageStats(broker, config.StatsInterval, logger)
|
||||
logger.Infof("Stats tracker initialized with interval: %s", config.StatsInterval)
|
||||
|
||||
// Create a message logger that subscribes to the broker
|
||||
// and also logs to stdout
|
||||
messageLogger, err := mqtt.NewMessageLogger(
|
||||
@@ -287,9 +279,6 @@ func main() {
|
||||
messageLogger.Close()
|
||||
}
|
||||
|
||||
// Stop the stats collector
|
||||
stats.Close()
|
||||
|
||||
// Close the broker (which will close all subscriber channels)
|
||||
broker.Close()
|
||||
|
||||
|
||||
126
mqtt/stats.go
126
mqtt/stats.go
@@ -1,126 +0,0 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/dpup/prefab/logging"
|
||||
|
||||
meshtreampb "meshstream/generated/meshstream"
|
||||
pb "meshstream/generated/meshtastic"
|
||||
)
|
||||
|
||||
// MessageStats tracks statistics about received messages
|
||||
type MessageStats struct {
|
||||
*BaseSubscriber
|
||||
sync.Mutex
|
||||
TotalMessages int
|
||||
ByNode map[uint32]int
|
||||
ByPortType map[pb.PortNum]int
|
||||
LastStatsPrinted time.Time
|
||||
ticker *time.Ticker
|
||||
logger logging.Logger
|
||||
}
|
||||
|
||||
// NewMessageStats creates a new MessageStats instance
|
||||
func NewMessageStats(broker *Broker, printInterval time.Duration, logger logging.Logger) *MessageStats {
|
||||
statsLogger := logger.Named("mqtt.MessageStats")
|
||||
|
||||
s := &MessageStats{
|
||||
ByNode: make(map[uint32]int),
|
||||
ByPortType: make(map[pb.PortNum]int),
|
||||
LastStatsPrinted: time.Now(),
|
||||
ticker: time.NewTicker(printInterval),
|
||||
logger: statsLogger,
|
||||
}
|
||||
|
||||
// Create base subscriber with stats message handler
|
||||
s.BaseSubscriber = NewBaseSubscriber(SubscriberConfig{
|
||||
Name: "MessageStats",
|
||||
Broker: broker,
|
||||
BufferSize: 50,
|
||||
Processor: s.recordMessage,
|
||||
StartHook: func() { go s.runTicker() },
|
||||
CloseHook: func() { s.ticker.Stop() },
|
||||
Logger: statsLogger,
|
||||
})
|
||||
|
||||
// Start processing messages
|
||||
s.Start()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// runTicker handles the periodic stats printing
|
||||
func (s *MessageStats) runTicker() {
|
||||
for {
|
||||
select {
|
||||
case <-s.ticker.C:
|
||||
s.PrintStats()
|
||||
case <-s.done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// recordMessage records a message in the statistics
|
||||
func (s *MessageStats) recordMessage(packet *meshtreampb.Packet) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
s.TotalMessages++
|
||||
|
||||
// Count by source node
|
||||
s.ByNode[packet.Data.From]++
|
||||
|
||||
// Count by port type
|
||||
s.ByPortType[packet.Data.PortNum]++
|
||||
}
|
||||
|
||||
// PrintStats logs current statistics using the structured logger
|
||||
func (s *MessageStats) PrintStats() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
duration := now.Sub(s.LastStatsPrinted)
|
||||
msgPerSec := float64(s.TotalMessages) / duration.Seconds()
|
||||
|
||||
// Log the basic statistics with structured fields
|
||||
s.logger.Infow("Message Statistics Summary",
|
||||
"totalMessages", s.TotalMessages,
|
||||
"messagesPerSecond", msgPerSec,
|
||||
"durationSeconds", duration.Seconds(),
|
||||
)
|
||||
|
||||
// Create maps for structured node and port stats
|
||||
nodeStats := make(map[string]int)
|
||||
for nodeID, count := range s.ByNode {
|
||||
nodeStats[fmt.Sprintf("node.%d", nodeID)] = count
|
||||
}
|
||||
|
||||
// Log node statistics with structured fields
|
||||
s.logger.Infow("Messages by Node",
|
||||
"nodeCounts", nodeStats,
|
||||
"activeNodes", len(s.ByNode),
|
||||
)
|
||||
|
||||
// Create maps for structured port stats
|
||||
portStats := make(map[string]int)
|
||||
for portType, count := range s.ByPortType {
|
||||
portStats[portType.String()] = count
|
||||
}
|
||||
|
||||
// Log port type statistics with structured fields
|
||||
s.logger.Infow("Messages by Port Type",
|
||||
"portCounts", portStats,
|
||||
"activePorts", len(s.ByPortType),
|
||||
)
|
||||
|
||||
// Reset counters for rate calculation
|
||||
s.TotalMessages = 0
|
||||
s.ByNode = make(map[uint32]int)
|
||||
s.ByPortType = make(map[pb.PortNum]int)
|
||||
s.LastStatsPrinted = now
|
||||
}
|
||||
Reference in New Issue
Block a user