mirror of
https://github.com/dpup/meshstream.git
synced 2026-03-28 17:42:37 +01:00
Inject loggers from main.go instead of creating them locally
- Update all components to accept a logger parameter - Add default fallbacks when logger is not provided - Ensure consistent logger naming with parent.Named() pattern - Create a proper logger hierarchy originating from main.go - Update MessageLogger to pass logger to BaseSubscriber
This commit is contained in:
7
main.go
7
main.go
@@ -50,7 +50,7 @@ func main() {
|
||||
Topic: mqttTopicPrefix + "/#",
|
||||
}
|
||||
|
||||
mqttClient := mqtt.NewClient(mqttConfig)
|
||||
mqttClient := mqtt.NewClient(mqttConfig, logger)
|
||||
|
||||
// Connect to the MQTT broker
|
||||
if err := mqttClient.Connect(); err != nil {
|
||||
@@ -65,7 +65,7 @@ func main() {
|
||||
|
||||
// Create a stats tracker that subscribes to the broker
|
||||
// with statistics printed every 30 seconds
|
||||
stats := mqtt.NewMessageStats(broker, 30*time.Second)
|
||||
stats := mqtt.NewMessageStats(broker, 30*time.Second, logger)
|
||||
|
||||
// Create a message logger that subscribes to the broker
|
||||
// and also logs to stdout with a separator
|
||||
@@ -74,6 +74,7 @@ func main() {
|
||||
logsDir,
|
||||
true, // Enable logging to stdout
|
||||
strings.Repeat("-", 80), // Use separator
|
||||
logger,
|
||||
)
|
||||
if err != nil {
|
||||
logger.Warnw("Failed to initialize message logger", "error", err)
|
||||
@@ -84,7 +85,7 @@ func main() {
|
||||
Host: serverHost,
|
||||
Port: serverPort,
|
||||
Broker: broker,
|
||||
})
|
||||
}, logger)
|
||||
|
||||
// Start the server in a goroutine
|
||||
go func() {
|
||||
|
||||
@@ -29,14 +29,18 @@ type Client struct {
|
||||
}
|
||||
|
||||
// NewClient creates a new MQTT client with the provided configuration
|
||||
func NewClient(config Config) *Client {
|
||||
logger := logging.NewDevLogger().Named("mqtt.client")
|
||||
func NewClient(config Config, logger logging.Logger) *Client {
|
||||
// Use provided logger or create a default one
|
||||
if logger == nil {
|
||||
logger = logging.NewDevLogger()
|
||||
}
|
||||
clientLogger := logger.Named("mqtt.client")
|
||||
|
||||
return &Client{
|
||||
config: config,
|
||||
decodedMessages: make(chan *Packet, 100), // Buffer up to 100 messages
|
||||
done: make(chan struct{}),
|
||||
logger: logger,
|
||||
logger: clientLogger,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -29,14 +29,17 @@ type MessageLogger struct {
|
||||
}
|
||||
|
||||
// NewMessageLogger creates a new message logger that subscribes to the broker
|
||||
func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSeparator string) (*MessageLogger, error) {
|
||||
func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSeparator string, logger logging.Logger) (*MessageLogger, error) {
|
||||
// Ensure log directory exists
|
||||
if err := os.MkdirAll(logDir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("failed to create log directory: %v", err)
|
||||
}
|
||||
|
||||
// Create a logger with appropriate name
|
||||
logger := logging.NewDevLogger().Named("mqtt.message_logger")
|
||||
// Use provided logger or create a default one
|
||||
if logger == nil {
|
||||
logger = logging.NewDevLogger()
|
||||
}
|
||||
messageLoggerLogger := logger.Named("mqtt.message_logger")
|
||||
|
||||
ml := &MessageLogger{
|
||||
logDir: logDir,
|
||||
@@ -44,7 +47,7 @@ func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSep
|
||||
files: make(map[pb.PortNum]*os.File),
|
||||
logToStdout: logToStdout,
|
||||
stdoutSeparator: stdoutSeparator,
|
||||
logger: logger,
|
||||
logger: messageLoggerLogger,
|
||||
}
|
||||
|
||||
// Create base subscriber with logger's message handler
|
||||
@@ -54,6 +57,7 @@ func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSep
|
||||
BufferSize: 100,
|
||||
Processor: ml.logMessage,
|
||||
CloseHook: ml.closeLogFiles,
|
||||
Logger: logger,
|
||||
})
|
||||
|
||||
// Start processing messages
|
||||
|
||||
@@ -23,16 +23,19 @@ type MessageStats struct {
|
||||
}
|
||||
|
||||
// NewMessageStats creates a new MessageStats instance
|
||||
func NewMessageStats(broker *Broker, printInterval time.Duration) *MessageStats {
|
||||
// Create a logger for stats
|
||||
logger := logging.NewDevLogger().Named("mqtt.stats")
|
||||
func NewMessageStats(broker *Broker, printInterval time.Duration, logger logging.Logger) *MessageStats {
|
||||
// Use the provided logger or create a default one
|
||||
if logger == nil {
|
||||
logger = logging.NewDevLogger()
|
||||
}
|
||||
statsLogger := logger.Named("mqtt.stats")
|
||||
|
||||
s := &MessageStats{
|
||||
ByNode: make(map[uint32]int),
|
||||
ByPortType: make(map[pb.PortNum]int),
|
||||
LastStatsPrinted: time.Now(),
|
||||
ticker: time.NewTicker(printInterval),
|
||||
logger: logger,
|
||||
logger: statsLogger,
|
||||
}
|
||||
|
||||
// Create base subscriber with stats message handler
|
||||
|
||||
@@ -14,6 +14,7 @@ type SubscriberConfig struct {
|
||||
Processor func(*Packet) // Function to process each packet
|
||||
StartHook func() // Optional hook called when starting
|
||||
CloseHook func() // Optional hook called when closing
|
||||
Logger logging.Logger // Logger instance to use
|
||||
}
|
||||
|
||||
// BaseSubscriber implements common subscriber functionality
|
||||
@@ -32,8 +33,13 @@ type BaseSubscriber struct {
|
||||
|
||||
// NewBaseSubscriber creates a new base subscriber
|
||||
func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber {
|
||||
// Create a logger for this subscriber
|
||||
logger := logging.NewDevLogger().Named("mqtt.subscriber." + config.Name)
|
||||
// Use provided logger or create a default one
|
||||
var subscriberLogger logging.Logger
|
||||
if config.Logger == nil {
|
||||
subscriberLogger = logging.NewDevLogger().Named("mqtt.subscriber." + config.Name)
|
||||
} else {
|
||||
subscriberLogger = config.Logger.Named("mqtt.subscriber." + config.Name)
|
||||
}
|
||||
|
||||
return &BaseSubscriber{
|
||||
broker: config.Broker,
|
||||
@@ -43,7 +49,7 @@ func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber {
|
||||
startHook: config.StartHook,
|
||||
closeHook: config.CloseHook,
|
||||
BufferSize: config.BufferSize,
|
||||
logger: logger,
|
||||
logger: subscriberLogger,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -34,18 +34,21 @@ type Server struct {
|
||||
}
|
||||
|
||||
// New creates a new server instance
|
||||
func New(config Config) *Server {
|
||||
// Create a named logger
|
||||
logger := logging.NewDevLogger().Named("server")
|
||||
func New(config Config, logger logging.Logger) *Server {
|
||||
// Use provided logger or create a default one
|
||||
if logger == nil {
|
||||
logger = logging.NewDevLogger()
|
||||
}
|
||||
serverLogger := logger.Named("server")
|
||||
|
||||
if config.Broker == nil {
|
||||
logger.Info("Warning: Server created without a broker, streaming will not work")
|
||||
serverLogger.Info("Warning: Server created without a broker, streaming will not work")
|
||||
}
|
||||
|
||||
return &Server{
|
||||
config: config,
|
||||
shutdown: make(chan struct{}),
|
||||
logger: logger,
|
||||
logger: serverLogger,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user