From b52834dd8a76d98f82f01a8efa62b65837e39a2e Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Mon, 21 Apr 2025 09:17:05 -0700 Subject: [PATCH] Add BaseSubscriber to reduce duplication between subscribers --- mqtt/logger.go | 66 +++++++++------------------ mqtt/stats.go | 60 +++++++++---------------- mqtt/subscriber.go | 109 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 149 insertions(+), 86 deletions(-) create mode 100644 mqtt/subscriber.go diff --git a/mqtt/logger.go b/mqtt/logger.go index de37ef4..fd3a85e 100644 --- a/mqtt/logger.go +++ b/mqtt/logger.go @@ -16,14 +16,11 @@ import ( // MessageLogger logs messages to files and optionally to stdout type MessageLogger struct { + *BaseSubscriber logDir string - broker *Broker - subscriber <-chan *Packet loggers map[pb.PortNum]*log.Logger files map[pb.PortNum]*os.File mutex sync.Mutex - done chan struct{} - wg sync.WaitGroup logToStdout bool // Flag to enable console output stdoutSeparator string // Separator for console output } @@ -37,43 +34,35 @@ func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSep ml := &MessageLogger{ logDir: logDir, - broker: broker, loggers: make(map[pb.PortNum]*log.Logger), files: make(map[pb.PortNum]*os.File), - done: make(chan struct{}), logToStdout: logToStdout, stdoutSeparator: stdoutSeparator, } - // Subscribe to the broker with a large buffer - ml.subscriber = broker.Subscribe(100) + // Create base subscriber with logger's message handler + ml.BaseSubscriber = NewBaseSubscriber(SubscriberConfig{ + Name: "MessageLogger", + Broker: broker, + BufferSize: 100, + Processor: ml.logMessage, + CloseHook: ml.closeLogFiles, + }) // Start processing messages - ml.wg.Add(1) - go ml.run() + ml.Start() return ml, nil } -// run processes incoming messages -func (ml *MessageLogger) run() { - defer ml.wg.Done() +// closeLogFiles is called when the subscriber is closed +func (ml *MessageLogger) closeLogFiles() { + ml.mutex.Lock() + defer ml.mutex.Unlock() - for { - select { - case packet, ok := <-ml.subscriber: - if !ok { - // Channel closed - return - } - - if packet != nil { - ml.logMessage(packet) - } - - case <-ml.done: - return - } + for portNum, file := range ml.files { + log.Printf("Closing log file for %s", portNum) + file.Close() } } @@ -133,23 +122,8 @@ func (ml *MessageLogger) logMessage(packet *Packet) { } } -// Close stops the logger and closes all log files +// Close overrides the BaseSubscriber.Close method func (ml *MessageLogger) Close() { - // Signal the processing loop to stop - close(ml.done) - - // Unsubscribe from the broker - ml.broker.Unsubscribe(ml.subscriber) - - // Wait for the processing loop to exit - ml.wg.Wait() - - // Close all log files - ml.mutex.Lock() - defer ml.mutex.Unlock() - - for portNum, file := range ml.files { - log.Printf("Closing log file for %s", portNum) - file.Close() - } + // Call the base close method which will handle unsubscribe and closeLogFiles + ml.BaseSubscriber.Close() } \ No newline at end of file diff --git a/mqtt/stats.go b/mqtt/stats.go index 10879a8..33c3576 100644 --- a/mqtt/stats.go +++ b/mqtt/stats.go @@ -11,60 +11,46 @@ import ( // MessageStats tracks statistics about received messages type MessageStats struct { + *BaseSubscriber sync.Mutex - broker *Broker - subscriber <-chan *Packet TotalMessages int ByNode map[uint32]int ByPortType map[pb.PortNum]int LastStatsPrinted time.Time - done chan struct{} - wg sync.WaitGroup + ticker *time.Ticker } // NewMessageStats creates a new MessageStats instance func NewMessageStats(broker *Broker, printInterval time.Duration) *MessageStats { s := &MessageStats{ - broker: broker, ByNode: make(map[uint32]int), ByPortType: make(map[pb.PortNum]int), LastStatsPrinted: time.Now(), - done: make(chan struct{}), + ticker: time.NewTicker(printInterval), } - // Subscribe to the broker with a larger buffer to handle bursts - s.subscriber = broker.Subscribe(50) + // Create base subscriber with stats message handler + s.BaseSubscriber = NewBaseSubscriber(SubscriberConfig{ + Name: "MessageStats", + Broker: broker, + BufferSize: 50, + Processor: s.recordMessage, + StartHook: func() { go s.runTicker() }, + CloseHook: func() { s.ticker.Stop() }, + }) - // Start the collection loop - s.wg.Add(1) - go s.run(printInterval) + // Start processing messages + s.Start() return s } -// run handles statistics collection and periodic printing -func (s *MessageStats) run(printInterval time.Duration) { - defer s.wg.Done() - - // Create a ticker for periodically printing stats - statsTicker := time.NewTicker(printInterval) - defer statsTicker.Stop() - +// runTicker handles the periodic stats printing +func (s *MessageStats) runTicker() { for { select { - case packet, ok := <-s.subscriber: - if !ok { - // Channel closed - return - } - - if packet != nil { - s.recordMessage(packet) - } - - case <-statsTicker.C: + case <-s.ticker.C: s.PrintStats() - case <-s.done: return } @@ -117,14 +103,8 @@ func (s *MessageStats) PrintStats() { s.LastStatsPrinted = now } -// Close stops the stats collector and unsubscribes from the broker +// Close overrides the BaseSubscriber.Close method func (s *MessageStats) Close() { - // Signal the collection loop to stop - close(s.done) - - // Unsubscribe from the broker - s.broker.Unsubscribe(s.subscriber) - - // Wait for the collection loop to exit - s.wg.Wait() + // Call the base close method which will handle unsubscribe and stop the ticker + s.BaseSubscriber.Close() } \ No newline at end of file diff --git a/mqtt/subscriber.go b/mqtt/subscriber.go new file mode 100644 index 0000000..19c7da4 --- /dev/null +++ b/mqtt/subscriber.go @@ -0,0 +1,109 @@ +package mqtt + +import ( + "log" + "sync" +) + +// SubscriberConfig holds configuration for creating a subscriber +type SubscriberConfig struct { + Name string // Descriptive name for the subscriber + Broker *Broker // The broker to subscribe to + BufferSize int // Channel buffer size + Processor func(*Packet) // Function to process each packet + StartHook func() // Optional hook called when starting + CloseHook func() // Optional hook called when closing +} + +// BaseSubscriber implements common subscriber functionality +type BaseSubscriber struct { + broker *Broker + channel <-chan *Packet + done chan struct{} + wg sync.WaitGroup + name string + processor func(*Packet) + startHook func() + closeHook func() + BufferSize int +} + +// NewBaseSubscriber creates a new base subscriber +func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber { + return &BaseSubscriber{ + broker: config.Broker, + name: config.Name, + processor: config.Processor, + done: make(chan struct{}), + startHook: config.StartHook, + closeHook: config.CloseHook, + BufferSize: config.BufferSize, + } +} + +// Start begins subscriber processing +func (b *BaseSubscriber) Start() { + // Subscribe to the broker + b.channel = b.broker.Subscribe(b.BufferSize) + + // Call the start hook if provided + if b.startHook != nil { + b.startHook() + } + + // Start the processing loop + b.wg.Add(1) + go b.run() + + log.Printf("Subscriber %s started", b.name) +} + +// run processes messages from the channel +func (b *BaseSubscriber) run() { + defer b.wg.Done() + + for { + select { + case packet, ok := <-b.channel: + if !ok { + // Channel closed + log.Printf("Channel closed for subscriber %s", b.name) + return + } + + if packet != nil && b.processor != nil { + b.processor(packet) + } + + case <-b.done: + log.Printf("Subscriber %s received shutdown signal", b.name) + return + } + } +} + +// Close stops the subscriber and releases resources +func (b *BaseSubscriber) Close() { + log.Printf("Closing subscriber %s", b.name) + + // Signal the processing loop to stop + close(b.done) + + // Unsubscribe from the broker + b.broker.Unsubscribe(b.channel) + + // Wait for processing to finish + b.wg.Wait() + + // Call the close hook if provided + if b.closeHook != nil { + b.closeHook() + } + + log.Printf("Subscriber %s closed", b.name) +} + +// Name returns the subscriber's name +func (b *BaseSubscriber) Name() string { + return b.name +} \ No newline at end of file