From 0f151c31b98293d5886e5fc2e63219aa64f7fca9 Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Mon, 21 Apr 2025 11:39:50 -0700 Subject: [PATCH] Improve structured logging for better log aggregation - Add message type prefix in brief mode for quick identification - Include GatewayID in brief mode summary - Remove formatted output from structured fields - Use proper structured fields for each message type - Add common fields like hopLimit and ID for all messages - Extract specific data for position, telemetry, and text messages - Format structured data for better log aggregation compatibility --- main.go | 64 +++++++++++++-------------- mqtt/logger.go | 96 ++++++++++++++++------------------------ mqtt/stats.go | 37 +++++++--------- mqtt/subscriber.go | 40 ++++++++--------- server/static/index.html | 74 ++++++++++--------------------- 5 files changed, 129 insertions(+), 182 deletions(-) diff --git a/main.go b/main.go index 95365a6..e5a2e8b 100644 --- a/main.go +++ b/main.go @@ -1,10 +1,8 @@ package main import ( - "fmt" "os" "os/signal" - "strings" "syscall" "time" @@ -21,16 +19,16 @@ const ( mqttPassword = "large4cats" mqttTopicPrefix = "msh/US/bayarea" logsDir = "./logs" - + // Web server configuration - serverHost = "localhost" - serverPort = "8080" + serverHost = "localhost" + serverPort = "8080" ) func main() { // Set up logging - logger := logging.NewDevLogger().Named("main") - + logger := logging.NewProdLogger().Named("main") + // Initialize default channel key err := decoder.AddChannelKey("LongFast", decoder.DefaultPrivateKey) if err != nil { @@ -49,84 +47,82 @@ func main() { ClientID: "meshstream-client", Topic: mqttTopicPrefix + "/#", } - + mqttClient := mqtt.NewClient(mqttConfig, logger) - + // Connect to the MQTT broker if err := mqttClient.Connect(); err != nil { logger.Fatalw("Failed to connect to MQTT broker", "error", err) } - + // Get the messages channel to receive decoded messages messagesChan := mqttClient.Messages() - + // Create a message broker to distribute messages to multiple consumers broker := mqtt.NewBroker(messagesChan, logger) - + // Create a stats tracker that subscribes to the broker // with statistics printed every 30 seconds stats := mqtt.NewMessageStats(broker, 30*time.Second, logger) - + // Create a message logger that subscribes to the broker // and also logs to stdout with a separator messageLogger, err := mqtt.NewMessageLogger( - broker, - true, // Use brief mode for more concise logs - true, // Enable logging to stdout - strings.Repeat("-", 80), // Use separator + broker, + false, // Use brief mode for more concise logs logger, ) if err != nil { logger.Warnw("Failed to initialize message logger", "error", err) } - + // Start the web server webServer := server.New(server.Config{ Host: serverHost, Port: serverPort, Broker: broker, }, logger) - + // Start the server in a goroutine go func() { if err := webServer.Start(); err != nil { logger.Errorw("Web server error", "error", err) } }() - + // Setup signal handling for graceful shutdown sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGTERM) - + // Process messages until interrupt received - fmt.Println("Waiting for messages... Press Ctrl+C to exit") - fmt.Println("Statistics will be printed every 30 seconds") - fmt.Println("Messages will be logged to files in the ./logs directory") - fmt.Printf("Web server running at http://%s:%s\n", serverHost, serverPort) - + logger.Info("Waiting for messages... Press Ctrl+C to exit") + logger.Info("Statistics will be printed every 30 seconds") + logger.Info("Messages will be logged to files in the ./logs directory") + logger.Infof("Web server running at http://%s:%s\n", serverHost, serverPort) + // Wait for interrupt signal <-sig - + // Got an interrupt signal, shutting down - fmt.Println("Shutting down...") - + logger.Info("Shutting down...") + // Close components in reverse order of creation // First stop the web server if err := webServer.Stop(); err != nil { logger.Errorw("Error stopping web server", "error", err) } - + // Then stop the logger if messageLogger != nil { messageLogger.Close() } - + // Stop the stats collector stats.Close() - + // Close the broker (which will close all subscriber channels) broker.Close() - + // Then disconnect the MQTT client mqttClient.Disconnect() -} \ No newline at end of file +} diff --git a/mqtt/logger.go b/mqtt/logger.go index d5577ff..0fb0700 100644 --- a/mqtt/logger.go +++ b/mqtt/logger.go @@ -6,35 +6,25 @@ import ( "github.com/dpup/prefab/logging" - "meshstream/decoder" - pb "meshstream/proto/generated/meshtastic" ) // MessageLogger logs messages using the provided logger type MessageLogger struct { *BaseSubscriber - logger logging.Logger // Main logger instance - briefMode bool // Whether to log brief summaries instead of full packets - logToStdout bool // Flag to enable console output - stdoutSeparator string // Separator for console output + logger logging.Logger // Main logger instance + briefMode bool // Whether to log brief summaries instead of full packets } // NewMessageLogger creates a new message logger that subscribes to the broker -func NewMessageLogger(broker *Broker, briefMode bool, logToStdout bool, stdoutSeparator string, logger logging.Logger) (*MessageLogger, error) { - // Use provided logger or create a default one - if logger == nil { - logger = logging.NewDevLogger() - } - messageLoggerLogger := logger.Named("mqtt.message_logger") - +func NewMessageLogger(broker *Broker, briefMode bool, logger logging.Logger) (*MessageLogger, error) { + messageLoggerLogger := logger.Named("mqtt.MessageLogger") + ml := &MessageLogger{ - briefMode: briefMode, - logToStdout: logToStdout, - stdoutSeparator: stdoutSeparator, - logger: messageLoggerLogger, + briefMode: briefMode, + logger: messageLoggerLogger, } - + // Create base subscriber with logger's message handler ml.BaseSubscriber = NewBaseSubscriber(SubscriberConfig{ Name: "MessageLogger", @@ -43,21 +33,21 @@ func NewMessageLogger(broker *Broker, briefMode bool, logToStdout bool, stdoutSe Processor: ml.logMessage, Logger: logger, }) - + // Start processing messages ml.Start() - + return ml, nil } // getBriefSummary returns a brief summary of the packet func (ml *MessageLogger) getBriefSummary(packet *Packet) string { var summary string - + if packet.DecodedPacket.DecodeError != nil { return fmt.Sprintf("Error decoding packet: %v", packet.DecodedPacket.DecodeError) } - + // Create a basic summary based on the port type switch packet.PortNum { case pb.PortNum_TEXT_MESSAGE_APP: @@ -67,7 +57,7 @@ func (ml *MessageLogger) getBriefSummary(packet *Packet) string { } else { summary = "Text message (invalid format)" } - + case pb.PortNum_POSITION_APP: // For position messages, include a compact location summary if pos, ok := packet.Payload.(*pb.Position); ok { @@ -77,7 +67,7 @@ func (ml *MessageLogger) getBriefSummary(packet *Packet) string { } else { summary = "Position update (invalid format)" } - + case pb.PortNum_TELEMETRY_APP: // For telemetry, give a short summary of what's included if telemetry, ok := packet.Payload.(*pb.Telemetry); ok { @@ -95,53 +85,45 @@ func (ml *MessageLogger) getBriefSummary(packet *Packet) string { } else { summary = "Telemetry (invalid format)" } - + default: // For other types, just mention the port type summary = fmt.Sprintf("Message type: %s", packet.PortNum.String()) } - + return summary } // logMessage logs a message using the structured logger func (ml *MessageLogger) logMessage(packet *Packet) { - // Get the full formatted output if needed for stdout or debug logging - formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket) - // Get a brief summary for structured logging briefSummary := ml.getBriefSummary(packet) - - // Prepare common fields for both logging modes - fields := []interface{}{ - "portNum", packet.PortNum.String(), - "from", packet.From, - "to", packet.To, - "channel", packet.TopicInfo.Channel, - "region", packet.TopicInfo.RegionPath, + + // Build the message prefix with type and GatewayID info for brief mode + typePrefix := fmt.Sprintf("[%s]", packet.PortNum.String()) + if packet.GatewayID != "" { + briefSummary = fmt.Sprintf("%s Gateway:%s %s", typePrefix, packet.GatewayID, briefSummary) + } else { + briefSummary = fmt.Sprintf("%s %s", typePrefix, briefSummary) } - - // If packet had a decode error, add it to the fields - if packet.DecodedPacket.DecodeError != nil { - fields = append(fields, "error", packet.DecodedPacket.DecodeError.Error()) - } - - // Log based on mode + if ml.briefMode { - // Brief mode - just log the summary with structured fields + fields := []interface{}{ + "portNum", packet.PortNum.String(), + "from", packet.From, + "to", packet.To, + "gateway", packet.GatewayID, + "channel", packet.TopicInfo.Channel, + "region", packet.TopicInfo.RegionPath, + "hopLimit", packet.HopLimit, + "id", packet.ID, + } + if packet.DecodedPacket.DecodeError != nil { + fields = append(fields, "error", packet.DecodedPacket.DecodeError.Error()) + } ml.logger.Infow(briefSummary, fields...) } else { - // Full mode - include the full formatted output - allFields := append(fields, "fullOutput", formattedOutput) - ml.logger.Infow("Message received", allFields...) + ml.logger.Infow(briefSummary, "packet", packet) } - - // Log to stdout if enabled - if ml.logToStdout { - fmt.Println(formattedOutput) - if ml.stdoutSeparator != "" { - fmt.Println(ml.stdoutSeparator) - } - } -} +} diff --git a/mqtt/stats.go b/mqtt/stats.go index 84c480f..5fe15a2 100644 --- a/mqtt/stats.go +++ b/mqtt/stats.go @@ -24,12 +24,8 @@ type MessageStats struct { // NewMessageStats creates a new MessageStats instance func NewMessageStats(broker *Broker, printInterval time.Duration, logger logging.Logger) *MessageStats { - // Use the provided logger or create a default one - if logger == nil { - logger = logging.NewDevLogger() - } - statsLogger := logger.Named("mqtt.stats") - + statsLogger := logger.Named("mqtt.MessageStats") + s := &MessageStats{ ByNode: make(map[uint32]int), ByPortType: make(map[pb.PortNum]int), @@ -37,7 +33,7 @@ func NewMessageStats(broker *Broker, printInterval time.Duration, logger logging ticker: time.NewTicker(printInterval), logger: statsLogger, } - + // Create base subscriber with stats message handler s.BaseSubscriber = NewBaseSubscriber(SubscriberConfig{ Name: "MessageStats", @@ -47,10 +43,10 @@ func NewMessageStats(broker *Broker, printInterval time.Duration, logger logging StartHook: func() { go s.runTicker() }, CloseHook: func() { s.ticker.Stop() }, }) - + // Start processing messages s.Start() - + return s } @@ -72,10 +68,10 @@ func (s *MessageStats) recordMessage(packet *Packet) { defer s.Unlock() s.TotalMessages++ - + // Count by source node s.ByNode[packet.From]++ - + // Count by port type s.ByPortType[packet.PortNum]++ } @@ -88,42 +84,41 @@ func (s *MessageStats) PrintStats() { now := time.Now() duration := now.Sub(s.LastStatsPrinted) msgPerSec := float64(s.TotalMessages) / duration.Seconds() - + // Log the basic statistics with structured fields - s.logger.Infow("Message Statistics Summary", + s.logger.Infow("Message Statistics Summary", "total_messages", s.TotalMessages, "messages_per_second", msgPerSec, "duration_seconds", duration.Seconds(), ) - + // Create maps for structured node and port stats nodeStats := make(map[string]int) for nodeID, count := range s.ByNode { nodeStats[fmt.Sprintf("node_%d", nodeID)] = count } - + // Log node statistics with structured fields - s.logger.Infow("Messages by Node", + s.logger.Infow("Messages by Node", "node_counts", nodeStats, "active_nodes", len(s.ByNode), ) - + // Create maps for structured port stats portStats := make(map[string]int) for portType, count := range s.ByPortType { portStats[portType.String()] = count } - + // Log port type statistics with structured fields - s.logger.Infow("Messages by Port Type", + s.logger.Infow("Messages by Port Type", "port_counts", portStats, "active_ports", len(s.ByPortType), ) - + // Reset counters for rate calculation s.TotalMessages = 0 s.ByNode = make(map[uint32]int) s.ByPortType = make(map[pb.PortNum]int) s.LastStatsPrinted = now } - diff --git a/mqtt/subscriber.go b/mqtt/subscriber.go index b8935aa..32293ba 100644 --- a/mqtt/subscriber.go +++ b/mqtt/subscriber.go @@ -2,18 +2,18 @@ package mqtt import ( "sync" - + "github.com/dpup/prefab/logging" ) // SubscriberConfig holds configuration for creating a subscriber type SubscriberConfig struct { - Name string // Descriptive name for the subscriber - Broker *Broker // The broker to subscribe to - BufferSize int // Channel buffer size - Processor func(*Packet) // Function to process each packet - StartHook func() // Optional hook called when starting - CloseHook func() // Optional hook called when closing + Name string // Descriptive name for the subscriber + Broker *Broker // The broker to subscribe to + BufferSize int // Channel buffer size + Processor func(*Packet) // Function to process each packet + StartHook func() // Optional hook called when starting + CloseHook func() // Optional hook called when closing Logger logging.Logger // Logger instance to use } @@ -40,7 +40,7 @@ func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber { } else { subscriberLogger = config.Logger.Named("mqtt.subscriber." + config.Name) } - + return &BaseSubscriber{ broker: config.Broker, name: config.Name, @@ -57,23 +57,23 @@ func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber { func (b *BaseSubscriber) Start() { // Subscribe to the broker b.channel = b.broker.Subscribe(b.BufferSize) - + // Call the start hook if provided if b.startHook != nil { b.startHook() } - + // Start the processing loop b.wg.Add(1) go b.run() - + b.logger.Infof("Subscriber %s started", b.name) } // run processes messages from the channel func (b *BaseSubscriber) run() { defer b.wg.Done() - + for { select { case packet, ok := <-b.channel: @@ -82,11 +82,11 @@ func (b *BaseSubscriber) run() { b.logger.Infof("Channel closed for subscriber %s", b.name) return } - + if packet != nil && b.processor != nil { b.processor(packet) } - + case <-b.done: b.logger.Infof("Subscriber %s received shutdown signal", b.name) return @@ -97,25 +97,25 @@ func (b *BaseSubscriber) run() { // Close stops the subscriber and releases resources func (b *BaseSubscriber) Close() { b.logger.Infof("Closing subscriber %s", b.name) - + // Signal the processing loop to stop close(b.done) - + // Unsubscribe from the broker b.broker.Unsubscribe(b.channel) - + // Wait for processing to finish b.wg.Wait() - + // Call the close hook if provided if b.closeHook != nil { b.closeHook() } - + b.logger.Infof("Subscriber %s closed", b.name) } // Name returns the subscriber's name func (b *BaseSubscriber) Name() string { return b.name -} \ No newline at end of file +} diff --git a/server/static/index.html b/server/static/index.html index a8b9a2b..2f27ec3 100644 --- a/server/static/index.html +++ b/server/static/index.html @@ -1,5 +1,6 @@ + @@ -13,11 +14,13 @@ max-width: 960px; margin: 0 auto; } + h1 { color: #333; border-bottom: 1px solid #eee; padding-bottom: 10px; } + #messages { margin-top: 20px; border: 1px solid #ddd; @@ -29,13 +32,16 @@ font-family: 'Monaco', 'Consolas', monospace; font-size: 14px; } + .message { padding: 8px 0; border-bottom: 1px solid #eee; } + .message:last-child { border-bottom: none; } + .info { padding: 10px; background-color: #e9f7fe; @@ -44,14 +50,15 @@ } +

Meshtastic Stream

- +

This page displays real-time messages from Meshtastic nodes via MQTT.

Messages are streamed using Server-Sent Events (SSE) and will appear below as they arrive.

- +

Waiting for messages...

@@ -59,91 +66,57 @@ + \ No newline at end of file