Update logging to use prefab/logging package

- Replace standard log package with prefab/logging
- Add context-aware logging in HTTP handlers
- Add structured logging with fields and levels
- Improve logging hierarchy and namespaces
- Implement named loggers for components
This commit is contained in:
Daniel Pupius
2025-04-21 11:01:45 -07:00
parent 9c438713de
commit 9447f44139
6 changed files with 115 additions and 64 deletions

21
main.go
View File

@@ -2,13 +2,14 @@ package main
import (
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/dpup/prefab/logging"
"meshstream/decoder"
"meshstream/mqtt"
"meshstream/server"
@@ -28,16 +29,16 @@ const (
func main() {
// Set up logging
log.SetOutput(os.Stdout)
logger := logging.NewDevLogger().Named("main")
// Initialize default channel key
err := decoder.AddChannelKey("LongFast", decoder.DefaultPrivateKey)
if err != nil {
log.Printf("Failed to initialize default channel key: %v", err)
logger.Errorw("Failed to initialize default channel key", "error", err)
}
if err := decoder.AddChannelKey("ERSN", "VIuMtC5uDDJtC/ojdH314HLkDIHanX4LdbK5yViV9jA="); err != nil {
log.Printf("Failed to initialize ERSN channel key: %v", err)
logger.Errorw("Failed to initialize ERSN channel key", "error", err)
}
// Configure and create the MQTT client
@@ -53,14 +54,14 @@ func main() {
// Connect to the MQTT broker
if err := mqttClient.Connect(); err != nil {
log.Fatalf("Failed to connect to MQTT broker: %v", err)
logger.Fatalw("Failed to connect to MQTT broker", "error", err)
}
// Get the messages channel to receive decoded messages
messagesChan := mqttClient.Messages()
// Create a message broker to distribute messages to multiple consumers
broker := mqtt.NewBroker(messagesChan)
broker := mqtt.NewBroker(messagesChan, logger)
// Create a stats tracker that subscribes to the broker
// with statistics printed every 30 seconds
@@ -75,7 +76,7 @@ func main() {
strings.Repeat("-", 80), // Use separator
)
if err != nil {
log.Printf("Warning: Failed to initialize message logger: %v", err)
logger.Warnw("Failed to initialize message logger", "error", err)
}
// Start the web server
@@ -88,7 +89,7 @@ func main() {
// Start the server in a goroutine
go func() {
if err := webServer.Start(); err != nil {
log.Printf("Web server error: %v", err)
logger.Errorw("Web server error", "error", err)
}
}()
@@ -111,7 +112,7 @@ func main() {
// Close components in reverse order of creation
// First stop the web server
if err := webServer.Stop(); err != nil {
log.Printf("Error stopping web server: %v", err)
logger.Errorw("Error stopping web server", "error", err)
}
// Then stop the logger

View File

@@ -1,8 +1,9 @@
package mqtt
import (
"log"
"sync"
"github.com/dpup/prefab/logging"
)
// Broker distributes messages from a source channel to multiple subscriber channels
@@ -12,14 +13,22 @@ type Broker struct {
subscriberMutex sync.RWMutex // Lock for modifying the subscribers map
done chan struct{} // Signal to stop the dispatch loop
wg sync.WaitGroup // Wait group to ensure clean shutdown
logger logging.Logger // Logger for broker operations
}
// NewBroker creates a new broker that distributes messages from sourceChannel to subscribers
func NewBroker(sourceChannel <-chan *Packet) *Broker {
func NewBroker(sourceChannel <-chan *Packet, logger logging.Logger) *Broker {
// Create a named logger if one was not provided
if logger == nil {
logger = logging.NewDevLogger()
}
brokerLogger := logger.Named("mqtt.broker")
broker := &Broker{
sourceChan: sourceChannel,
subscribers: make(map[chan *Packet]struct{}),
done: make(chan struct{}),
logger: brokerLogger,
}
// Start the dispatch loop
@@ -46,7 +55,6 @@ func (b *Broker) Subscribe(bufferSize int) <-chan *Packet {
// Unsubscribe removes a subscriber and closes its channel
func (b *Broker) Unsubscribe(ch <-chan *Packet) {
b.subscriberMutex.Lock()
defer b.subscriberMutex.Unlock()
@@ -60,7 +68,7 @@ func (b *Broker) Unsubscribe(ch <-chan *Packet) {
}
// If we get here, the channel wasn't found
log.Println("Warning: Subscriber channel not found - cannot unsubscribe")
b.logger.Warn("Subscriber channel not found - cannot unsubscribe")
}
// Close shuts down the broker and closes all subscriber channels
@@ -94,7 +102,7 @@ func (b *Broker) dispatchLoop() {
case packet, ok := <-b.sourceChan:
if !ok {
// Source channel has been closed, shut down the broker
log.Println("Source channel closed, shutting down broker")
b.logger.Info("Source channel closed, shutting down broker")
b.Close()
return
}
@@ -122,7 +130,7 @@ func (b *Broker) broadcast(packet *Packet) {
defer func() {
if r := recover(); r != nil {
// This can happen if the channel was closed after we took a snapshot
log.Println("Warning: Recovered from panic in broadcast, channel likely closed")
b.logger.Warn("Recovered from panic in broadcast, channel likely closed")
}
}()
@@ -132,7 +140,7 @@ func (b *Broker) broadcast(packet *Packet) {
// Message delivered successfully
default:
// Channel buffer is full, log warning and drop the message
log.Println("Warning: Subscriber buffer full, dropping message")
b.logger.Warn("Subscriber buffer full, dropping message")
}
}(ch)
}

View File

@@ -2,12 +2,12 @@ package mqtt
import (
"fmt"
"log"
"time"
"meshstream/decoder"
"github.com/dpup/prefab/logging"
mqtt "github.com/eclipse/paho.mqtt.golang"
"meshstream/decoder"
)
// Config holds configuration for the MQTT client
@@ -25,14 +25,18 @@ type Client struct {
client mqtt.Client
decodedMessages chan *Packet
done chan struct{}
logger logging.Logger
}
// NewClient creates a new MQTT client with the provided configuration
func NewClient(config Config) *Client {
logger := logging.NewDevLogger().Named("mqtt.client")
return &Client{
config: config,
decodedMessages: make(chan *Packet, 100), // Buffer up to 100 messages
done: make(chan struct{}),
logger: logger,
}
}
@@ -58,7 +62,7 @@ func (c *Client) Connect() error {
// Subscribe to the configured topic
token := c.client.Subscribe(c.config.Topic, 0, nil)
token.Wait()
log.Printf("Subscribed to topic: %s\n", c.config.Topic)
c.logger.Infof("Subscribed to topic: %s", c.config.Topic)
return nil
}
@@ -81,14 +85,16 @@ func (c *Client) Messages() <-chan *Packet {
// messageHandler processes incoming MQTT messages
func (c *Client) messageHandler(client mqtt.Client, msg mqtt.Message) {
log.Printf("Received message from topic: %s\n", msg.Topic())
c.logger.Debugf("Received message from topic: %s", msg.Topic())
// Parse the topic structure
topicInfo, err := decoder.ParseTopic(msg.Topic())
if err != nil {
log.Printf("Error parsing topic: %v\n", err)
log.Printf("Raw topic: %s\n", msg.Topic())
log.Printf("Raw payload: %x\n", msg.Payload())
c.logger.Errorw("Error parsing topic",
"error", err,
"topic", msg.Topic(),
"payload_hex", fmt.Sprintf("%x", msg.Payload()),
)
return
}
@@ -113,25 +119,25 @@ func (c *Client) messageHandler(client mqtt.Client, msg mqtt.Message) {
return
default:
// Channel buffer is full, log a warning and drop the message
log.Println("Warning: Message buffer full, dropping message")
c.logger.Warn("Message buffer full, dropping message")
}
case "json":
// TODO: Add support for JSON format messages in the future
log.Printf("Ignoring JSON format message from topic: %s\n", msg.Topic())
c.logger.Debugf("Ignoring JSON format message from topic: %s", msg.Topic())
default:
// Unsupported format, log and ignore
log.Printf("Unsupported format: %s from topic: %s\n", topicInfo.Format, msg.Topic())
c.logger.Infow("Unsupported format", "format", topicInfo.Format, "topic", msg.Topic())
}
}
// connectHandler is called when the client connects to the broker
func (c *Client) connectHandler(client mqtt.Client) {
log.Println("Connected to MQTT Broker!")
c.logger.Info("Connected to MQTT Broker")
}
// connectionLostHandler is called when the client loses connection
func (c *Client) connectionLostHandler(client mqtt.Client, err error) {
log.Printf("Connection lost: %v\n", err)
c.logger.Errorw("Connection lost", "error", err)
}

View File

@@ -2,13 +2,15 @@ package mqtt
import (
"fmt"
"log"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/dpup/prefab/logging"
"meshstream/decoder"
pb "meshstream/proto/generated/meshtastic"
@@ -18,11 +20,12 @@ import (
type MessageLogger struct {
*BaseSubscriber
logDir string
loggers map[pb.PortNum]*log.Logger
loggers map[pb.PortNum]io.Writer
files map[pb.PortNum]*os.File
mutex sync.Mutex
logToStdout bool // Flag to enable console output
stdoutSeparator string // Separator for console output
logger logging.Logger // Main logger instance
}
// NewMessageLogger creates a new message logger that subscribes to the broker
@@ -32,12 +35,16 @@ func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSep
return nil, fmt.Errorf("failed to create log directory: %v", err)
}
// Create a logger with appropriate name
logger := logging.NewDevLogger().Named("mqtt.message_logger")
ml := &MessageLogger{
logDir: logDir,
loggers: make(map[pb.PortNum]*log.Logger),
loggers: make(map[pb.PortNum]io.Writer),
files: make(map[pb.PortNum]*os.File),
logToStdout: logToStdout,
stdoutSeparator: stdoutSeparator,
logger: logger,
}
// Create base subscriber with logger's message handler
@@ -61,19 +68,19 @@ func (ml *MessageLogger) closeLogFiles() {
defer ml.mutex.Unlock()
for portNum, file := range ml.files {
log.Printf("Closing log file for %s", portNum)
ml.logger.Infof("Closing log file for %s", portNum)
file.Close()
}
}
// getLogger returns a logger for the specified port type
func (ml *MessageLogger) getLogger(portNum pb.PortNum) *log.Logger {
// getLogWriter returns a writer for the specified port type
func (ml *MessageLogger) getLogWriter(portNum pb.PortNum) io.Writer {
ml.mutex.Lock()
defer ml.mutex.Unlock()
// Check if we already have a logger for this port type
if logger, ok := ml.loggers[portNum]; ok {
return logger
// Check if we already have a writer for this port type
if writer, ok := ml.loggers[portNum]; ok {
return writer
}
// Create a new log file for this port type
@@ -82,18 +89,15 @@ func (ml *MessageLogger) getLogger(portNum pb.PortNum) *log.Logger {
file, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Printf("Error opening log file %s: %v", filepath, err)
ml.logger.Errorw("Error opening log file", "filepath", filepath, "error", err)
return nil
}
// Create a new logger
logger := log.New(file, "", log.LstdFlags)
// Store the logger and file handle
ml.loggers[portNum] = logger
// Store the writer and file handle
ml.loggers[portNum] = file
ml.files[portNum] = file
return logger
return file
}
// logMessage logs a message to the appropriate file and optionally to stdout
@@ -102,15 +106,15 @@ func (ml *MessageLogger) logMessage(packet *Packet) {
formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket)
// Add a timestamp and node info
logEntry := fmt.Sprintf("[Node %d] %s\n%s",
logEntry := fmt.Sprintf("[Node %d] %s\n%s\n",
packet.From,
time.Now().Format(time.RFC3339),
formattedOutput)
// Log to file
logger := ml.getLogger(packet.PortNum)
if logger != nil {
logger.Println(logEntry)
writer := ml.getLogWriter(packet.PortNum)
if writer != nil {
fmt.Fprint(writer, logEntry)
}
// Log to stdout if enabled
@@ -120,5 +124,12 @@ func (ml *MessageLogger) logMessage(packet *Packet) {
fmt.Println(ml.stdoutSeparator)
}
}
// Also log a brief message with the structured logger
ml.logger.Debugw("Message logged",
"portNum", packet.PortNum.String(),
"from", packet.From,
"to", packet.To,
)
}

View File

@@ -1,8 +1,9 @@
package mqtt
import (
"log"
"sync"
"github.com/dpup/prefab/logging"
)
// SubscriberConfig holds configuration for creating a subscriber
@@ -26,10 +27,14 @@ type BaseSubscriber struct {
startHook func()
closeHook func()
BufferSize int
logger logging.Logger
}
// NewBaseSubscriber creates a new base subscriber
func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber {
// Create a logger for this subscriber
logger := logging.NewDevLogger().Named("mqtt.subscriber." + config.Name)
return &BaseSubscriber{
broker: config.Broker,
name: config.Name,
@@ -38,6 +43,7 @@ func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber {
startHook: config.StartHook,
closeHook: config.CloseHook,
BufferSize: config.BufferSize,
logger: logger,
}
}
@@ -55,7 +61,7 @@ func (b *BaseSubscriber) Start() {
b.wg.Add(1)
go b.run()
log.Printf("Subscriber %s started", b.name)
b.logger.Infof("Subscriber %s started", b.name)
}
// run processes messages from the channel
@@ -67,7 +73,7 @@ func (b *BaseSubscriber) run() {
case packet, ok := <-b.channel:
if !ok {
// Channel closed
log.Printf("Channel closed for subscriber %s", b.name)
b.logger.Infof("Channel closed for subscriber %s", b.name)
return
}
@@ -76,7 +82,7 @@ func (b *BaseSubscriber) run() {
}
case <-b.done:
log.Printf("Subscriber %s received shutdown signal", b.name)
b.logger.Infof("Subscriber %s received shutdown signal", b.name)
return
}
}
@@ -84,7 +90,7 @@ func (b *BaseSubscriber) run() {
// Close stops the subscriber and releases resources
func (b *BaseSubscriber) Close() {
log.Printf("Closing subscriber %s", b.name)
b.logger.Infof("Closing subscriber %s", b.name)
// Signal the processing loop to stop
close(b.done)
@@ -100,7 +106,7 @@ func (b *BaseSubscriber) Close() {
b.closeHook()
}
log.Printf("Subscriber %s closed", b.name)
b.logger.Infof("Subscriber %s closed", b.name)
}
// Name returns the subscriber's name

View File

@@ -3,13 +3,13 @@ package server
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strconv"
"sync/atomic"
"time"
"github.com/dpup/prefab"
"github.com/dpup/prefab/logging"
"meshstream/mqtt"
)
@@ -29,17 +29,23 @@ type Server struct {
shutdown chan struct{}
// Atomic flag to indicate if server is shutting down
isShuttingDown atomic.Bool
// Logger instance
logger logging.Logger
}
// New creates a new server instance
func New(config Config) *Server {
// Create a named logger
logger := logging.NewDevLogger().Named("server")
if config.Broker == nil {
log.Println("Warning: Server created without a broker, streaming will not work")
logger.Info("Warning: Server created without a broker, streaming will not work")
}
return &Server{
config: config,
shutdown: make(chan struct{}),
logger: logger,
}
}
@@ -61,7 +67,7 @@ func (s *Server) Start() error {
)
// Start the server
log.Printf("Starting server on %s:%s", s.config.Host, s.config.Port)
s.logger.Infof("Starting server on %s:%s", s.config.Host, s.config.Port)
return s.server.Start()
}
@@ -82,17 +88,30 @@ func (s *Server) Stop() error {
// handleStatus is a placeholder API endpoint that returns server status
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// Ensure logger is in context
ctx = logging.EnsureLogger(ctx)
logger := logging.FromContext(ctx).Named("api.status")
status := map[string]interface{}{
"status": "ok",
"message": "Meshtastic Stream API is running",
}
logger.Debug("Status endpoint called")
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(status)
}
// handleStream handles Server-Sent Events streaming of MQTT messages
func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// Ensure we have a logger in the context
ctx = logging.EnsureLogger(ctx)
// Create request-scoped logger
logger := logging.FromContext(ctx).Named("sse")
// Check if the server is shutting down
if s.isShuttingDown.Load() {
http.Error(w, "Server is shutting down", http.StatusServiceUnavailable)
@@ -122,7 +141,7 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
packetChan := s.config.Broker.Subscribe(10)
// Signal when the client disconnects
notify := r.Context().Done()
notify := ctx.Done()
// Send an initial message
fmt.Fprintf(w, "event: info\ndata: Connected to Meshtastic stream\n\n")
@@ -133,13 +152,13 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
select {
case <-notify:
// Client disconnected, unsubscribe and return
log.Println("Client disconnected, unsubscribing from broker")
logger.Info("Client disconnected, unsubscribing from broker")
s.config.Broker.Unsubscribe(packetChan)
return
case <-s.shutdown:
// Server is shutting down, send a message to client and close
log.Println("Server shutting down, closing SSE connection")
logger.Info("Server shutting down, closing SSE connection")
fmt.Fprintf(w, "event: info\ndata: Server shutting down, connection closed\n\n")
flusher.Flush()
s.config.Broker.Unsubscribe(packetChan)
@@ -148,7 +167,7 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
case packet, ok := <-packetChan:
if !ok {
// Channel closed, probably shutting down
log.Println("Packet channel closed, ending stream")
logger.Info("Packet channel closed, ending stream")
return
}
@@ -172,7 +191,7 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
data, err := json.Marshal(packetWrapper)
if err != nil {
log.Printf("Error marshaling packet to JSON: %v", err)
logger.Errorw("Error marshaling packet to JSON", "error", err)
continue
}