Add BaseSubscriber to reduce duplication between subscribers

This commit is contained in:
Daniel Pupius
2025-04-21 09:17:05 -07:00
parent ef660b2025
commit b52834dd8a
3 changed files with 149 additions and 86 deletions

View File

@@ -16,14 +16,11 @@ import (
// MessageLogger logs messages to files and optionally to stdout
type MessageLogger struct {
*BaseSubscriber
logDir string
broker *Broker
subscriber <-chan *Packet
loggers map[pb.PortNum]*log.Logger
files map[pb.PortNum]*os.File
mutex sync.Mutex
done chan struct{}
wg sync.WaitGroup
logToStdout bool // Flag to enable console output
stdoutSeparator string // Separator for console output
}
@@ -37,43 +34,35 @@ func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSep
ml := &MessageLogger{
logDir: logDir,
broker: broker,
loggers: make(map[pb.PortNum]*log.Logger),
files: make(map[pb.PortNum]*os.File),
done: make(chan struct{}),
logToStdout: logToStdout,
stdoutSeparator: stdoutSeparator,
}
// Subscribe to the broker with a large buffer
ml.subscriber = broker.Subscribe(100)
// Create base subscriber with logger's message handler
ml.BaseSubscriber = NewBaseSubscriber(SubscriberConfig{
Name: "MessageLogger",
Broker: broker,
BufferSize: 100,
Processor: ml.logMessage,
CloseHook: ml.closeLogFiles,
})
// Start processing messages
ml.wg.Add(1)
go ml.run()
ml.Start()
return ml, nil
}
// run processes incoming messages
func (ml *MessageLogger) run() {
defer ml.wg.Done()
// closeLogFiles is called when the subscriber is closed
func (ml *MessageLogger) closeLogFiles() {
ml.mutex.Lock()
defer ml.mutex.Unlock()
for {
select {
case packet, ok := <-ml.subscriber:
if !ok {
// Channel closed
return
}
if packet != nil {
ml.logMessage(packet)
}
case <-ml.done:
return
}
for portNum, file := range ml.files {
log.Printf("Closing log file for %s", portNum)
file.Close()
}
}
@@ -133,23 +122,8 @@ func (ml *MessageLogger) logMessage(packet *Packet) {
}
}
// Close stops the logger and closes all log files
// Close overrides the BaseSubscriber.Close method
func (ml *MessageLogger) Close() {
// Signal the processing loop to stop
close(ml.done)
// Unsubscribe from the broker
ml.broker.Unsubscribe(ml.subscriber)
// Wait for the processing loop to exit
ml.wg.Wait()
// Close all log files
ml.mutex.Lock()
defer ml.mutex.Unlock()
for portNum, file := range ml.files {
log.Printf("Closing log file for %s", portNum)
file.Close()
}
// Call the base close method which will handle unsubscribe and closeLogFiles
ml.BaseSubscriber.Close()
}

View File

@@ -11,60 +11,46 @@ import (
// MessageStats tracks statistics about received messages
type MessageStats struct {
*BaseSubscriber
sync.Mutex
broker *Broker
subscriber <-chan *Packet
TotalMessages int
ByNode map[uint32]int
ByPortType map[pb.PortNum]int
LastStatsPrinted time.Time
done chan struct{}
wg sync.WaitGroup
ticker *time.Ticker
}
// NewMessageStats creates a new MessageStats instance
func NewMessageStats(broker *Broker, printInterval time.Duration) *MessageStats {
s := &MessageStats{
broker: broker,
ByNode: make(map[uint32]int),
ByPortType: make(map[pb.PortNum]int),
LastStatsPrinted: time.Now(),
done: make(chan struct{}),
ticker: time.NewTicker(printInterval),
}
// Subscribe to the broker with a larger buffer to handle bursts
s.subscriber = broker.Subscribe(50)
// Create base subscriber with stats message handler
s.BaseSubscriber = NewBaseSubscriber(SubscriberConfig{
Name: "MessageStats",
Broker: broker,
BufferSize: 50,
Processor: s.recordMessage,
StartHook: func() { go s.runTicker() },
CloseHook: func() { s.ticker.Stop() },
})
// Start the collection loop
s.wg.Add(1)
go s.run(printInterval)
// Start processing messages
s.Start()
return s
}
// run handles statistics collection and periodic printing
func (s *MessageStats) run(printInterval time.Duration) {
defer s.wg.Done()
// Create a ticker for periodically printing stats
statsTicker := time.NewTicker(printInterval)
defer statsTicker.Stop()
// runTicker handles the periodic stats printing
func (s *MessageStats) runTicker() {
for {
select {
case packet, ok := <-s.subscriber:
if !ok {
// Channel closed
return
}
if packet != nil {
s.recordMessage(packet)
}
case <-statsTicker.C:
case <-s.ticker.C:
s.PrintStats()
case <-s.done:
return
}
@@ -117,14 +103,8 @@ func (s *MessageStats) PrintStats() {
s.LastStatsPrinted = now
}
// Close stops the stats collector and unsubscribes from the broker
// Close overrides the BaseSubscriber.Close method
func (s *MessageStats) Close() {
// Signal the collection loop to stop
close(s.done)
// Unsubscribe from the broker
s.broker.Unsubscribe(s.subscriber)
// Wait for the collection loop to exit
s.wg.Wait()
// Call the base close method which will handle unsubscribe and stop the ticker
s.BaseSubscriber.Close()
}

109
mqtt/subscriber.go Normal file
View File

@@ -0,0 +1,109 @@
package mqtt
import (
"log"
"sync"
)
// SubscriberConfig holds configuration for creating a subscriber
type SubscriberConfig struct {
Name string // Descriptive name for the subscriber
Broker *Broker // The broker to subscribe to
BufferSize int // Channel buffer size
Processor func(*Packet) // Function to process each packet
StartHook func() // Optional hook called when starting
CloseHook func() // Optional hook called when closing
}
// BaseSubscriber implements common subscriber functionality
type BaseSubscriber struct {
broker *Broker
channel <-chan *Packet
done chan struct{}
wg sync.WaitGroup
name string
processor func(*Packet)
startHook func()
closeHook func()
BufferSize int
}
// NewBaseSubscriber creates a new base subscriber
func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber {
return &BaseSubscriber{
broker: config.Broker,
name: config.Name,
processor: config.Processor,
done: make(chan struct{}),
startHook: config.StartHook,
closeHook: config.CloseHook,
BufferSize: config.BufferSize,
}
}
// Start begins subscriber processing
func (b *BaseSubscriber) Start() {
// Subscribe to the broker
b.channel = b.broker.Subscribe(b.BufferSize)
// Call the start hook if provided
if b.startHook != nil {
b.startHook()
}
// Start the processing loop
b.wg.Add(1)
go b.run()
log.Printf("Subscriber %s started", b.name)
}
// run processes messages from the channel
func (b *BaseSubscriber) run() {
defer b.wg.Done()
for {
select {
case packet, ok := <-b.channel:
if !ok {
// Channel closed
log.Printf("Channel closed for subscriber %s", b.name)
return
}
if packet != nil && b.processor != nil {
b.processor(packet)
}
case <-b.done:
log.Printf("Subscriber %s received shutdown signal", b.name)
return
}
}
}
// Close stops the subscriber and releases resources
func (b *BaseSubscriber) Close() {
log.Printf("Closing subscriber %s", b.name)
// Signal the processing loop to stop
close(b.done)
// Unsubscribe from the broker
b.broker.Unsubscribe(b.channel)
// Wait for processing to finish
b.wg.Wait()
// Call the close hook if provided
if b.closeHook != nil {
b.closeHook()
}
log.Printf("Subscriber %s closed", b.name)
}
// Name returns the subscriber's name
func (b *BaseSubscriber) Name() string {
return b.name
}