diff --git a/main.go b/main.go index 2cf550a..54a9af4 100644 --- a/main.go +++ b/main.go @@ -2,13 +2,14 @@ package main import ( "fmt" - "log" "os" "os/signal" "strings" "syscall" "time" + "github.com/dpup/prefab/logging" + "meshstream/decoder" "meshstream/mqtt" "meshstream/server" @@ -28,16 +29,16 @@ const ( func main() { // Set up logging - log.SetOutput(os.Stdout) - + logger := logging.NewDevLogger().Named("main") + // Initialize default channel key err := decoder.AddChannelKey("LongFast", decoder.DefaultPrivateKey) if err != nil { - log.Printf("Failed to initialize default channel key: %v", err) + logger.Errorw("Failed to initialize default channel key", "error", err) } if err := decoder.AddChannelKey("ERSN", "VIuMtC5uDDJtC/ojdH314HLkDIHanX4LdbK5yViV9jA="); err != nil { - log.Printf("Failed to initialize ERSN channel key: %v", err) + logger.Errorw("Failed to initialize ERSN channel key", "error", err) } // Configure and create the MQTT client @@ -53,14 +54,14 @@ func main() { // Connect to the MQTT broker if err := mqttClient.Connect(); err != nil { - log.Fatalf("Failed to connect to MQTT broker: %v", err) + logger.Fatalw("Failed to connect to MQTT broker", "error", err) } // Get the messages channel to receive decoded messages messagesChan := mqttClient.Messages() // Create a message broker to distribute messages to multiple consumers - broker := mqtt.NewBroker(messagesChan) + broker := mqtt.NewBroker(messagesChan, logger) // Create a stats tracker that subscribes to the broker // with statistics printed every 30 seconds @@ -75,7 +76,7 @@ func main() { strings.Repeat("-", 80), // Use separator ) if err != nil { - log.Printf("Warning: Failed to initialize message logger: %v", err) + logger.Warnw("Failed to initialize message logger", "error", err) } // Start the web server @@ -88,7 +89,7 @@ func main() { // Start the server in a goroutine go func() { if err := webServer.Start(); err != nil { - log.Printf("Web server error: %v", err) + logger.Errorw("Web server error", "error", err) } }() @@ -111,7 +112,7 @@ func main() { // Close components in reverse order of creation // First stop the web server if err := webServer.Stop(); err != nil { - log.Printf("Error stopping web server: %v", err) + logger.Errorw("Error stopping web server", "error", err) } // Then stop the logger diff --git a/mqtt/broker.go b/mqtt/broker.go index 9e79a65..ea6c2eb 100644 --- a/mqtt/broker.go +++ b/mqtt/broker.go @@ -1,8 +1,9 @@ package mqtt import ( - "log" "sync" + + "github.com/dpup/prefab/logging" ) // Broker distributes messages from a source channel to multiple subscriber channels @@ -12,14 +13,22 @@ type Broker struct { subscriberMutex sync.RWMutex // Lock for modifying the subscribers map done chan struct{} // Signal to stop the dispatch loop wg sync.WaitGroup // Wait group to ensure clean shutdown + logger logging.Logger // Logger for broker operations } // NewBroker creates a new broker that distributes messages from sourceChannel to subscribers -func NewBroker(sourceChannel <-chan *Packet) *Broker { +func NewBroker(sourceChannel <-chan *Packet, logger logging.Logger) *Broker { + // Create a named logger if one was not provided + if logger == nil { + logger = logging.NewDevLogger() + } + brokerLogger := logger.Named("mqtt.broker") + broker := &Broker{ sourceChan: sourceChannel, subscribers: make(map[chan *Packet]struct{}), done: make(chan struct{}), + logger: brokerLogger, } // Start the dispatch loop @@ -46,7 +55,6 @@ func (b *Broker) Subscribe(bufferSize int) <-chan *Packet { // Unsubscribe removes a subscriber and closes its channel func (b *Broker) Unsubscribe(ch <-chan *Packet) { - b.subscriberMutex.Lock() defer b.subscriberMutex.Unlock() @@ -60,7 +68,7 @@ func (b *Broker) Unsubscribe(ch <-chan *Packet) { } // If we get here, the channel wasn't found - log.Println("Warning: Subscriber channel not found - cannot unsubscribe") + b.logger.Warn("Subscriber channel not found - cannot unsubscribe") } // Close shuts down the broker and closes all subscriber channels @@ -94,7 +102,7 @@ func (b *Broker) dispatchLoop() { case packet, ok := <-b.sourceChan: if !ok { // Source channel has been closed, shut down the broker - log.Println("Source channel closed, shutting down broker") + b.logger.Info("Source channel closed, shutting down broker") b.Close() return } @@ -122,7 +130,7 @@ func (b *Broker) broadcast(packet *Packet) { defer func() { if r := recover(); r != nil { // This can happen if the channel was closed after we took a snapshot - log.Println("Warning: Recovered from panic in broadcast, channel likely closed") + b.logger.Warn("Recovered from panic in broadcast, channel likely closed") } }() @@ -132,7 +140,7 @@ func (b *Broker) broadcast(packet *Packet) { // Message delivered successfully default: // Channel buffer is full, log warning and drop the message - log.Println("Warning: Subscriber buffer full, dropping message") + b.logger.Warn("Subscriber buffer full, dropping message") } }(ch) } diff --git a/mqtt/client.go b/mqtt/client.go index 421d076..1da2c8a 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -2,12 +2,12 @@ package mqtt import ( "fmt" - "log" "time" - "meshstream/decoder" - + "github.com/dpup/prefab/logging" mqtt "github.com/eclipse/paho.mqtt.golang" + + "meshstream/decoder" ) // Config holds configuration for the MQTT client @@ -25,14 +25,18 @@ type Client struct { client mqtt.Client decodedMessages chan *Packet done chan struct{} + logger logging.Logger } // NewClient creates a new MQTT client with the provided configuration func NewClient(config Config) *Client { + logger := logging.NewDevLogger().Named("mqtt.client") + return &Client{ config: config, decodedMessages: make(chan *Packet, 100), // Buffer up to 100 messages done: make(chan struct{}), + logger: logger, } } @@ -58,7 +62,7 @@ func (c *Client) Connect() error { // Subscribe to the configured topic token := c.client.Subscribe(c.config.Topic, 0, nil) token.Wait() - log.Printf("Subscribed to topic: %s\n", c.config.Topic) + c.logger.Infof("Subscribed to topic: %s", c.config.Topic) return nil } @@ -81,14 +85,16 @@ func (c *Client) Messages() <-chan *Packet { // messageHandler processes incoming MQTT messages func (c *Client) messageHandler(client mqtt.Client, msg mqtt.Message) { - log.Printf("Received message from topic: %s\n", msg.Topic()) + c.logger.Debugf("Received message from topic: %s", msg.Topic()) // Parse the topic structure topicInfo, err := decoder.ParseTopic(msg.Topic()) if err != nil { - log.Printf("Error parsing topic: %v\n", err) - log.Printf("Raw topic: %s\n", msg.Topic()) - log.Printf("Raw payload: %x\n", msg.Payload()) + c.logger.Errorw("Error parsing topic", + "error", err, + "topic", msg.Topic(), + "payload_hex", fmt.Sprintf("%x", msg.Payload()), + ) return } @@ -113,25 +119,25 @@ func (c *Client) messageHandler(client mqtt.Client, msg mqtt.Message) { return default: // Channel buffer is full, log a warning and drop the message - log.Println("Warning: Message buffer full, dropping message") + c.logger.Warn("Message buffer full, dropping message") } case "json": // TODO: Add support for JSON format messages in the future - log.Printf("Ignoring JSON format message from topic: %s\n", msg.Topic()) + c.logger.Debugf("Ignoring JSON format message from topic: %s", msg.Topic()) default: // Unsupported format, log and ignore - log.Printf("Unsupported format: %s from topic: %s\n", topicInfo.Format, msg.Topic()) + c.logger.Infow("Unsupported format", "format", topicInfo.Format, "topic", msg.Topic()) } } // connectHandler is called when the client connects to the broker func (c *Client) connectHandler(client mqtt.Client) { - log.Println("Connected to MQTT Broker!") + c.logger.Info("Connected to MQTT Broker") } // connectionLostHandler is called when the client loses connection func (c *Client) connectionLostHandler(client mqtt.Client, err error) { - log.Printf("Connection lost: %v\n", err) + c.logger.Errorw("Connection lost", "error", err) } \ No newline at end of file diff --git a/mqtt/logger.go b/mqtt/logger.go index 209cbc2..8b619d9 100644 --- a/mqtt/logger.go +++ b/mqtt/logger.go @@ -2,13 +2,15 @@ package mqtt import ( "fmt" - "log" + "io" "os" "path/filepath" "strings" "sync" "time" + "github.com/dpup/prefab/logging" + "meshstream/decoder" pb "meshstream/proto/generated/meshtastic" @@ -18,11 +20,12 @@ import ( type MessageLogger struct { *BaseSubscriber logDir string - loggers map[pb.PortNum]*log.Logger + loggers map[pb.PortNum]io.Writer files map[pb.PortNum]*os.File mutex sync.Mutex logToStdout bool // Flag to enable console output stdoutSeparator string // Separator for console output + logger logging.Logger // Main logger instance } // NewMessageLogger creates a new message logger that subscribes to the broker @@ -32,12 +35,16 @@ func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSep return nil, fmt.Errorf("failed to create log directory: %v", err) } + // Create a logger with appropriate name + logger := logging.NewDevLogger().Named("mqtt.message_logger") + ml := &MessageLogger{ logDir: logDir, - loggers: make(map[pb.PortNum]*log.Logger), + loggers: make(map[pb.PortNum]io.Writer), files: make(map[pb.PortNum]*os.File), logToStdout: logToStdout, stdoutSeparator: stdoutSeparator, + logger: logger, } // Create base subscriber with logger's message handler @@ -61,19 +68,19 @@ func (ml *MessageLogger) closeLogFiles() { defer ml.mutex.Unlock() for portNum, file := range ml.files { - log.Printf("Closing log file for %s", portNum) + ml.logger.Infof("Closing log file for %s", portNum) file.Close() } } -// getLogger returns a logger for the specified port type -func (ml *MessageLogger) getLogger(portNum pb.PortNum) *log.Logger { +// getLogWriter returns a writer for the specified port type +func (ml *MessageLogger) getLogWriter(portNum pb.PortNum) io.Writer { ml.mutex.Lock() defer ml.mutex.Unlock() - // Check if we already have a logger for this port type - if logger, ok := ml.loggers[portNum]; ok { - return logger + // Check if we already have a writer for this port type + if writer, ok := ml.loggers[portNum]; ok { + return writer } // Create a new log file for this port type @@ -82,18 +89,15 @@ func (ml *MessageLogger) getLogger(portNum pb.PortNum) *log.Logger { file, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { - log.Printf("Error opening log file %s: %v", filepath, err) + ml.logger.Errorw("Error opening log file", "filepath", filepath, "error", err) return nil } - // Create a new logger - logger := log.New(file, "", log.LstdFlags) - - // Store the logger and file handle - ml.loggers[portNum] = logger + // Store the writer and file handle + ml.loggers[portNum] = file ml.files[portNum] = file - return logger + return file } // logMessage logs a message to the appropriate file and optionally to stdout @@ -102,15 +106,15 @@ func (ml *MessageLogger) logMessage(packet *Packet) { formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket) // Add a timestamp and node info - logEntry := fmt.Sprintf("[Node %d] %s\n%s", + logEntry := fmt.Sprintf("[Node %d] %s\n%s\n", packet.From, time.Now().Format(time.RFC3339), formattedOutput) // Log to file - logger := ml.getLogger(packet.PortNum) - if logger != nil { - logger.Println(logEntry) + writer := ml.getLogWriter(packet.PortNum) + if writer != nil { + fmt.Fprint(writer, logEntry) } // Log to stdout if enabled @@ -120,5 +124,12 @@ func (ml *MessageLogger) logMessage(packet *Packet) { fmt.Println(ml.stdoutSeparator) } } + + // Also log a brief message with the structured logger + ml.logger.Debugw("Message logged", + "portNum", packet.PortNum.String(), + "from", packet.From, + "to", packet.To, + ) } diff --git a/mqtt/subscriber.go b/mqtt/subscriber.go index 19c7da4..d6a1318 100644 --- a/mqtt/subscriber.go +++ b/mqtt/subscriber.go @@ -1,8 +1,9 @@ package mqtt import ( - "log" "sync" + + "github.com/dpup/prefab/logging" ) // SubscriberConfig holds configuration for creating a subscriber @@ -26,10 +27,14 @@ type BaseSubscriber struct { startHook func() closeHook func() BufferSize int + logger logging.Logger } // NewBaseSubscriber creates a new base subscriber func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber { + // Create a logger for this subscriber + logger := logging.NewDevLogger().Named("mqtt.subscriber." + config.Name) + return &BaseSubscriber{ broker: config.Broker, name: config.Name, @@ -38,6 +43,7 @@ func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber { startHook: config.StartHook, closeHook: config.CloseHook, BufferSize: config.BufferSize, + logger: logger, } } @@ -55,7 +61,7 @@ func (b *BaseSubscriber) Start() { b.wg.Add(1) go b.run() - log.Printf("Subscriber %s started", b.name) + b.logger.Infof("Subscriber %s started", b.name) } // run processes messages from the channel @@ -67,7 +73,7 @@ func (b *BaseSubscriber) run() { case packet, ok := <-b.channel: if !ok { // Channel closed - log.Printf("Channel closed for subscriber %s", b.name) + b.logger.Infof("Channel closed for subscriber %s", b.name) return } @@ -76,7 +82,7 @@ func (b *BaseSubscriber) run() { } case <-b.done: - log.Printf("Subscriber %s received shutdown signal", b.name) + b.logger.Infof("Subscriber %s received shutdown signal", b.name) return } } @@ -84,7 +90,7 @@ func (b *BaseSubscriber) run() { // Close stops the subscriber and releases resources func (b *BaseSubscriber) Close() { - log.Printf("Closing subscriber %s", b.name) + b.logger.Infof("Closing subscriber %s", b.name) // Signal the processing loop to stop close(b.done) @@ -100,7 +106,7 @@ func (b *BaseSubscriber) Close() { b.closeHook() } - log.Printf("Subscriber %s closed", b.name) + b.logger.Infof("Subscriber %s closed", b.name) } // Name returns the subscriber's name diff --git a/server/server.go b/server/server.go index 42631cb..105be44 100644 --- a/server/server.go +++ b/server/server.go @@ -3,13 +3,13 @@ package server import ( "encoding/json" "fmt" - "log" "net/http" "strconv" "sync/atomic" "time" "github.com/dpup/prefab" + "github.com/dpup/prefab/logging" "meshstream/mqtt" ) @@ -29,17 +29,23 @@ type Server struct { shutdown chan struct{} // Atomic flag to indicate if server is shutting down isShuttingDown atomic.Bool + // Logger instance + logger logging.Logger } // New creates a new server instance func New(config Config) *Server { + // Create a named logger + logger := logging.NewDevLogger().Named("server") + if config.Broker == nil { - log.Println("Warning: Server created without a broker, streaming will not work") + logger.Info("Warning: Server created without a broker, streaming will not work") } return &Server{ config: config, shutdown: make(chan struct{}), + logger: logger, } } @@ -61,7 +67,7 @@ func (s *Server) Start() error { ) // Start the server - log.Printf("Starting server on %s:%s", s.config.Host, s.config.Port) + s.logger.Infof("Starting server on %s:%s", s.config.Host, s.config.Port) return s.server.Start() } @@ -82,17 +88,30 @@ func (s *Server) Stop() error { // handleStatus is a placeholder API endpoint that returns server status func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + // Ensure logger is in context + ctx = logging.EnsureLogger(ctx) + logger := logging.FromContext(ctx).Named("api.status") + status := map[string]interface{}{ "status": "ok", "message": "Meshtastic Stream API is running", } + logger.Debug("Status endpoint called") + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(status) } // handleStream handles Server-Sent Events streaming of MQTT messages func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + // Ensure we have a logger in the context + ctx = logging.EnsureLogger(ctx) + // Create request-scoped logger + logger := logging.FromContext(ctx).Named("sse") + // Check if the server is shutting down if s.isShuttingDown.Load() { http.Error(w, "Server is shutting down", http.StatusServiceUnavailable) @@ -122,7 +141,7 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { packetChan := s.config.Broker.Subscribe(10) // Signal when the client disconnects - notify := r.Context().Done() + notify := ctx.Done() // Send an initial message fmt.Fprintf(w, "event: info\ndata: Connected to Meshtastic stream\n\n") @@ -133,13 +152,13 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { select { case <-notify: // Client disconnected, unsubscribe and return - log.Println("Client disconnected, unsubscribing from broker") + logger.Info("Client disconnected, unsubscribing from broker") s.config.Broker.Unsubscribe(packetChan) return case <-s.shutdown: // Server is shutting down, send a message to client and close - log.Println("Server shutting down, closing SSE connection") + logger.Info("Server shutting down, closing SSE connection") fmt.Fprintf(w, "event: info\ndata: Server shutting down, connection closed\n\n") flusher.Flush() s.config.Broker.Unsubscribe(packetChan) @@ -148,7 +167,7 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { case packet, ok := <-packetChan: if !ok { // Channel closed, probably shutting down - log.Println("Packet channel closed, ending stream") + logger.Info("Packet channel closed, ending stream") return } @@ -172,7 +191,7 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { data, err := json.Marshal(packetWrapper) if err != nil { - log.Printf("Error marshaling packet to JSON: %v", err) + logger.Errorw("Error marshaling packet to JSON", "error", err) continue }