From c6d94b10d1c6c49c07cf57e61387c55ef56f31ad Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Mon, 21 Apr 2025 11:09:27 -0700 Subject: [PATCH] Inject loggers from main.go instead of creating them locally - Update all components to accept a logger parameter - Add default fallbacks when logger is not provided - Ensure consistent logger naming with parent.Named() pattern - Create a proper logger hierarchy originating from main.go - Update MessageLogger to pass logger to BaseSubscriber --- main.go | 7 ++++--- mqtt/client.go | 10 +++++++--- mqtt/logger.go | 12 ++++++++---- mqtt/stats.go | 11 +++++++---- mqtt/subscriber.go | 12 +++++++++--- server/server.go | 13 ++++++++----- 6 files changed, 43 insertions(+), 22 deletions(-) diff --git a/main.go b/main.go index 54a9af4..68c8b1c 100644 --- a/main.go +++ b/main.go @@ -50,7 +50,7 @@ func main() { Topic: mqttTopicPrefix + "/#", } - mqttClient := mqtt.NewClient(mqttConfig) + mqttClient := mqtt.NewClient(mqttConfig, logger) // Connect to the MQTT broker if err := mqttClient.Connect(); err != nil { @@ -65,7 +65,7 @@ func main() { // Create a stats tracker that subscribes to the broker // with statistics printed every 30 seconds - stats := mqtt.NewMessageStats(broker, 30*time.Second) + stats := mqtt.NewMessageStats(broker, 30*time.Second, logger) // Create a message logger that subscribes to the broker // and also logs to stdout with a separator @@ -74,6 +74,7 @@ func main() { logsDir, true, // Enable logging to stdout strings.Repeat("-", 80), // Use separator + logger, ) if err != nil { logger.Warnw("Failed to initialize message logger", "error", err) @@ -84,7 +85,7 @@ func main() { Host: serverHost, Port: serverPort, Broker: broker, - }) + }, logger) // Start the server in a goroutine go func() { diff --git a/mqtt/client.go b/mqtt/client.go index 1da2c8a..796dfc3 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -29,14 +29,18 @@ type Client struct { } // NewClient creates a new MQTT client with the provided configuration -func NewClient(config Config) *Client { - logger := logging.NewDevLogger().Named("mqtt.client") +func NewClient(config Config, logger logging.Logger) *Client { + // Use provided logger or create a default one + if logger == nil { + logger = logging.NewDevLogger() + } + clientLogger := logger.Named("mqtt.client") return &Client{ config: config, decodedMessages: make(chan *Packet, 100), // Buffer up to 100 messages done: make(chan struct{}), - logger: logger, + logger: clientLogger, } } diff --git a/mqtt/logger.go b/mqtt/logger.go index 8b619d9..6c87337 100644 --- a/mqtt/logger.go +++ b/mqtt/logger.go @@ -29,14 +29,17 @@ type MessageLogger struct { } // NewMessageLogger creates a new message logger that subscribes to the broker -func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSeparator string) (*MessageLogger, error) { +func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSeparator string, logger logging.Logger) (*MessageLogger, error) { // Ensure log directory exists if err := os.MkdirAll(logDir, 0755); err != nil { return nil, fmt.Errorf("failed to create log directory: %v", err) } - // Create a logger with appropriate name - logger := logging.NewDevLogger().Named("mqtt.message_logger") + // Use provided logger or create a default one + if logger == nil { + logger = logging.NewDevLogger() + } + messageLoggerLogger := logger.Named("mqtt.message_logger") ml := &MessageLogger{ logDir: logDir, @@ -44,7 +47,7 @@ func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSep files: make(map[pb.PortNum]*os.File), logToStdout: logToStdout, stdoutSeparator: stdoutSeparator, - logger: logger, + logger: messageLoggerLogger, } // Create base subscriber with logger's message handler @@ -54,6 +57,7 @@ func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSep BufferSize: 100, Processor: ml.logMessage, CloseHook: ml.closeLogFiles, + Logger: logger, }) // Start processing messages diff --git a/mqtt/stats.go b/mqtt/stats.go index ed4e10d..84c480f 100644 --- a/mqtt/stats.go +++ b/mqtt/stats.go @@ -23,16 +23,19 @@ type MessageStats struct { } // NewMessageStats creates a new MessageStats instance -func NewMessageStats(broker *Broker, printInterval time.Duration) *MessageStats { - // Create a logger for stats - logger := logging.NewDevLogger().Named("mqtt.stats") +func NewMessageStats(broker *Broker, printInterval time.Duration, logger logging.Logger) *MessageStats { + // Use the provided logger or create a default one + if logger == nil { + logger = logging.NewDevLogger() + } + statsLogger := logger.Named("mqtt.stats") s := &MessageStats{ ByNode: make(map[uint32]int), ByPortType: make(map[pb.PortNum]int), LastStatsPrinted: time.Now(), ticker: time.NewTicker(printInterval), - logger: logger, + logger: statsLogger, } // Create base subscriber with stats message handler diff --git a/mqtt/subscriber.go b/mqtt/subscriber.go index d6a1318..b8935aa 100644 --- a/mqtt/subscriber.go +++ b/mqtt/subscriber.go @@ -14,6 +14,7 @@ type SubscriberConfig struct { Processor func(*Packet) // Function to process each packet StartHook func() // Optional hook called when starting CloseHook func() // Optional hook called when closing + Logger logging.Logger // Logger instance to use } // BaseSubscriber implements common subscriber functionality @@ -32,8 +33,13 @@ type BaseSubscriber struct { // NewBaseSubscriber creates a new base subscriber func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber { - // Create a logger for this subscriber - logger := logging.NewDevLogger().Named("mqtt.subscriber." + config.Name) + // Use provided logger or create a default one + var subscriberLogger logging.Logger + if config.Logger == nil { + subscriberLogger = logging.NewDevLogger().Named("mqtt.subscriber." + config.Name) + } else { + subscriberLogger = config.Logger.Named("mqtt.subscriber." + config.Name) + } return &BaseSubscriber{ broker: config.Broker, @@ -43,7 +49,7 @@ func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber { startHook: config.StartHook, closeHook: config.CloseHook, BufferSize: config.BufferSize, - logger: logger, + logger: subscriberLogger, } } diff --git a/server/server.go b/server/server.go index 105be44..042ba11 100644 --- a/server/server.go +++ b/server/server.go @@ -34,18 +34,21 @@ type Server struct { } // New creates a new server instance -func New(config Config) *Server { - // Create a named logger - logger := logging.NewDevLogger().Named("server") +func New(config Config, logger logging.Logger) *Server { + // Use provided logger or create a default one + if logger == nil { + logger = logging.NewDevLogger() + } + serverLogger := logger.Named("server") if config.Broker == nil { - logger.Info("Warning: Server created without a broker, streaming will not work") + serverLogger.Info("Warning: Server created without a broker, streaming will not work") } return &Server{ config: config, shutdown: make(chan struct{}), - logger: logger, + logger: serverLogger, } }