Improve structured logging for better log aggregation

- Add message type prefix in brief mode for quick identification
- Include GatewayID in brief mode summary
- Remove formatted output from structured fields
- Use proper structured fields for each message type
- Add common fields like hopLimit and ID for all messages
- Extract specific data for position, telemetry, and text messages
- Format structured data for better log aggregation compatibility
This commit is contained in:
Daniel Pupius
2025-04-21 11:39:50 -07:00
parent 56a14fff61
commit 0f151c31b9
5 changed files with 129 additions and 182 deletions

64
main.go
View File

@@ -1,10 +1,8 @@
package main
import (
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"time"
@@ -21,16 +19,16 @@ const (
mqttPassword = "large4cats"
mqttTopicPrefix = "msh/US/bayarea"
logsDir = "./logs"
// Web server configuration
serverHost = "localhost"
serverPort = "8080"
serverHost = "localhost"
serverPort = "8080"
)
func main() {
// Set up logging
logger := logging.NewDevLogger().Named("main")
logger := logging.NewProdLogger().Named("main")
// Initialize default channel key
err := decoder.AddChannelKey("LongFast", decoder.DefaultPrivateKey)
if err != nil {
@@ -49,84 +47,82 @@ func main() {
ClientID: "meshstream-client",
Topic: mqttTopicPrefix + "/#",
}
mqttClient := mqtt.NewClient(mqttConfig, logger)
// Connect to the MQTT broker
if err := mqttClient.Connect(); err != nil {
logger.Fatalw("Failed to connect to MQTT broker", "error", err)
}
// Get the messages channel to receive decoded messages
messagesChan := mqttClient.Messages()
// Create a message broker to distribute messages to multiple consumers
broker := mqtt.NewBroker(messagesChan, logger)
// Create a stats tracker that subscribes to the broker
// with statistics printed every 30 seconds
stats := mqtt.NewMessageStats(broker, 30*time.Second, logger)
// Create a message logger that subscribes to the broker
// and also logs to stdout with a separator
messageLogger, err := mqtt.NewMessageLogger(
broker,
true, // Use brief mode for more concise logs
true, // Enable logging to stdout
strings.Repeat("-", 80), // Use separator
broker,
false, // Use brief mode for more concise logs
logger,
)
if err != nil {
logger.Warnw("Failed to initialize message logger", "error", err)
}
// Start the web server
webServer := server.New(server.Config{
Host: serverHost,
Port: serverPort,
Broker: broker,
}, logger)
// Start the server in a goroutine
go func() {
if err := webServer.Start(); err != nil {
logger.Errorw("Web server error", "error", err)
}
}()
// Setup signal handling for graceful shutdown
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
// Process messages until interrupt received
fmt.Println("Waiting for messages... Press Ctrl+C to exit")
fmt.Println("Statistics will be printed every 30 seconds")
fmt.Println("Messages will be logged to files in the ./logs directory")
fmt.Printf("Web server running at http://%s:%s\n", serverHost, serverPort)
logger.Info("Waiting for messages... Press Ctrl+C to exit")
logger.Info("Statistics will be printed every 30 seconds")
logger.Info("Messages will be logged to files in the ./logs directory")
logger.Infof("Web server running at http://%s:%s\n", serverHost, serverPort)
// Wait for interrupt signal
<-sig
// Got an interrupt signal, shutting down
fmt.Println("Shutting down...")
logger.Info("Shutting down...")
// Close components in reverse order of creation
// First stop the web server
if err := webServer.Stop(); err != nil {
logger.Errorw("Error stopping web server", "error", err)
}
// Then stop the logger
if messageLogger != nil {
messageLogger.Close()
}
// Stop the stats collector
stats.Close()
// Close the broker (which will close all subscriber channels)
broker.Close()
// Then disconnect the MQTT client
mqttClient.Disconnect()
}
}

View File

@@ -6,35 +6,25 @@ import (
"github.com/dpup/prefab/logging"
"meshstream/decoder"
pb "meshstream/proto/generated/meshtastic"
)
// MessageLogger logs messages using the provided logger
type MessageLogger struct {
*BaseSubscriber
logger logging.Logger // Main logger instance
briefMode bool // Whether to log brief summaries instead of full packets
logToStdout bool // Flag to enable console output
stdoutSeparator string // Separator for console output
logger logging.Logger // Main logger instance
briefMode bool // Whether to log brief summaries instead of full packets
}
// NewMessageLogger creates a new message logger that subscribes to the broker
func NewMessageLogger(broker *Broker, briefMode bool, logToStdout bool, stdoutSeparator string, logger logging.Logger) (*MessageLogger, error) {
// Use provided logger or create a default one
if logger == nil {
logger = logging.NewDevLogger()
}
messageLoggerLogger := logger.Named("mqtt.message_logger")
func NewMessageLogger(broker *Broker, briefMode bool, logger logging.Logger) (*MessageLogger, error) {
messageLoggerLogger := logger.Named("mqtt.MessageLogger")
ml := &MessageLogger{
briefMode: briefMode,
logToStdout: logToStdout,
stdoutSeparator: stdoutSeparator,
logger: messageLoggerLogger,
briefMode: briefMode,
logger: messageLoggerLogger,
}
// Create base subscriber with logger's message handler
ml.BaseSubscriber = NewBaseSubscriber(SubscriberConfig{
Name: "MessageLogger",
@@ -43,21 +33,21 @@ func NewMessageLogger(broker *Broker, briefMode bool, logToStdout bool, stdoutSe
Processor: ml.logMessage,
Logger: logger,
})
// Start processing messages
ml.Start()
return ml, nil
}
// getBriefSummary returns a brief summary of the packet
func (ml *MessageLogger) getBriefSummary(packet *Packet) string {
var summary string
if packet.DecodedPacket.DecodeError != nil {
return fmt.Sprintf("Error decoding packet: %v", packet.DecodedPacket.DecodeError)
}
// Create a basic summary based on the port type
switch packet.PortNum {
case pb.PortNum_TEXT_MESSAGE_APP:
@@ -67,7 +57,7 @@ func (ml *MessageLogger) getBriefSummary(packet *Packet) string {
} else {
summary = "Text message (invalid format)"
}
case pb.PortNum_POSITION_APP:
// For position messages, include a compact location summary
if pos, ok := packet.Payload.(*pb.Position); ok {
@@ -77,7 +67,7 @@ func (ml *MessageLogger) getBriefSummary(packet *Packet) string {
} else {
summary = "Position update (invalid format)"
}
case pb.PortNum_TELEMETRY_APP:
// For telemetry, give a short summary of what's included
if telemetry, ok := packet.Payload.(*pb.Telemetry); ok {
@@ -95,53 +85,45 @@ func (ml *MessageLogger) getBriefSummary(packet *Packet) string {
} else {
summary = "Telemetry (invalid format)"
}
default:
// For other types, just mention the port type
summary = fmt.Sprintf("Message type: %s", packet.PortNum.String())
}
return summary
}
// logMessage logs a message using the structured logger
func (ml *MessageLogger) logMessage(packet *Packet) {
// Get the full formatted output if needed for stdout or debug logging
formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket)
// Get a brief summary for structured logging
briefSummary := ml.getBriefSummary(packet)
// Prepare common fields for both logging modes
fields := []interface{}{
"portNum", packet.PortNum.String(),
"from", packet.From,
"to", packet.To,
"channel", packet.TopicInfo.Channel,
"region", packet.TopicInfo.RegionPath,
// Build the message prefix with type and GatewayID info for brief mode
typePrefix := fmt.Sprintf("[%s]", packet.PortNum.String())
if packet.GatewayID != "" {
briefSummary = fmt.Sprintf("%s Gateway:%s %s", typePrefix, packet.GatewayID, briefSummary)
} else {
briefSummary = fmt.Sprintf("%s %s", typePrefix, briefSummary)
}
// If packet had a decode error, add it to the fields
if packet.DecodedPacket.DecodeError != nil {
fields = append(fields, "error", packet.DecodedPacket.DecodeError.Error())
}
// Log based on mode
if ml.briefMode {
// Brief mode - just log the summary with structured fields
fields := []interface{}{
"portNum", packet.PortNum.String(),
"from", packet.From,
"to", packet.To,
"gateway", packet.GatewayID,
"channel", packet.TopicInfo.Channel,
"region", packet.TopicInfo.RegionPath,
"hopLimit", packet.HopLimit,
"id", packet.ID,
}
if packet.DecodedPacket.DecodeError != nil {
fields = append(fields, "error", packet.DecodedPacket.DecodeError.Error())
}
ml.logger.Infow(briefSummary, fields...)
} else {
// Full mode - include the full formatted output
allFields := append(fields, "fullOutput", formattedOutput)
ml.logger.Infow("Message received", allFields...)
ml.logger.Infow(briefSummary, "packet", packet)
}
// Log to stdout if enabled
if ml.logToStdout {
fmt.Println(formattedOutput)
if ml.stdoutSeparator != "" {
fmt.Println(ml.stdoutSeparator)
}
}
}
}

View File

@@ -24,12 +24,8 @@ type MessageStats struct {
// NewMessageStats creates a new MessageStats instance
func NewMessageStats(broker *Broker, printInterval time.Duration, logger logging.Logger) *MessageStats {
// Use the provided logger or create a default one
if logger == nil {
logger = logging.NewDevLogger()
}
statsLogger := logger.Named("mqtt.stats")
statsLogger := logger.Named("mqtt.MessageStats")
s := &MessageStats{
ByNode: make(map[uint32]int),
ByPortType: make(map[pb.PortNum]int),
@@ -37,7 +33,7 @@ func NewMessageStats(broker *Broker, printInterval time.Duration, logger logging
ticker: time.NewTicker(printInterval),
logger: statsLogger,
}
// Create base subscriber with stats message handler
s.BaseSubscriber = NewBaseSubscriber(SubscriberConfig{
Name: "MessageStats",
@@ -47,10 +43,10 @@ func NewMessageStats(broker *Broker, printInterval time.Duration, logger logging
StartHook: func() { go s.runTicker() },
CloseHook: func() { s.ticker.Stop() },
})
// Start processing messages
s.Start()
return s
}
@@ -72,10 +68,10 @@ func (s *MessageStats) recordMessage(packet *Packet) {
defer s.Unlock()
s.TotalMessages++
// Count by source node
s.ByNode[packet.From]++
// Count by port type
s.ByPortType[packet.PortNum]++
}
@@ -88,42 +84,41 @@ func (s *MessageStats) PrintStats() {
now := time.Now()
duration := now.Sub(s.LastStatsPrinted)
msgPerSec := float64(s.TotalMessages) / duration.Seconds()
// Log the basic statistics with structured fields
s.logger.Infow("Message Statistics Summary",
s.logger.Infow("Message Statistics Summary",
"total_messages", s.TotalMessages,
"messages_per_second", msgPerSec,
"duration_seconds", duration.Seconds(),
)
// Create maps for structured node and port stats
nodeStats := make(map[string]int)
for nodeID, count := range s.ByNode {
nodeStats[fmt.Sprintf("node_%d", nodeID)] = count
}
// Log node statistics with structured fields
s.logger.Infow("Messages by Node",
s.logger.Infow("Messages by Node",
"node_counts", nodeStats,
"active_nodes", len(s.ByNode),
)
// Create maps for structured port stats
portStats := make(map[string]int)
for portType, count := range s.ByPortType {
portStats[portType.String()] = count
}
// Log port type statistics with structured fields
s.logger.Infow("Messages by Port Type",
s.logger.Infow("Messages by Port Type",
"port_counts", portStats,
"active_ports", len(s.ByPortType),
)
// Reset counters for rate calculation
s.TotalMessages = 0
s.ByNode = make(map[uint32]int)
s.ByPortType = make(map[pb.PortNum]int)
s.LastStatsPrinted = now
}

View File

@@ -2,18 +2,18 @@ package mqtt
import (
"sync"
"github.com/dpup/prefab/logging"
)
// SubscriberConfig holds configuration for creating a subscriber
type SubscriberConfig struct {
Name string // Descriptive name for the subscriber
Broker *Broker // The broker to subscribe to
BufferSize int // Channel buffer size
Processor func(*Packet) // Function to process each packet
StartHook func() // Optional hook called when starting
CloseHook func() // Optional hook called when closing
Name string // Descriptive name for the subscriber
Broker *Broker // The broker to subscribe to
BufferSize int // Channel buffer size
Processor func(*Packet) // Function to process each packet
StartHook func() // Optional hook called when starting
CloseHook func() // Optional hook called when closing
Logger logging.Logger // Logger instance to use
}
@@ -40,7 +40,7 @@ func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber {
} else {
subscriberLogger = config.Logger.Named("mqtt.subscriber." + config.Name)
}
return &BaseSubscriber{
broker: config.Broker,
name: config.Name,
@@ -57,23 +57,23 @@ func NewBaseSubscriber(config SubscriberConfig) *BaseSubscriber {
func (b *BaseSubscriber) Start() {
// Subscribe to the broker
b.channel = b.broker.Subscribe(b.BufferSize)
// Call the start hook if provided
if b.startHook != nil {
b.startHook()
}
// Start the processing loop
b.wg.Add(1)
go b.run()
b.logger.Infof("Subscriber %s started", b.name)
}
// run processes messages from the channel
func (b *BaseSubscriber) run() {
defer b.wg.Done()
for {
select {
case packet, ok := <-b.channel:
@@ -82,11 +82,11 @@ func (b *BaseSubscriber) run() {
b.logger.Infof("Channel closed for subscriber %s", b.name)
return
}
if packet != nil && b.processor != nil {
b.processor(packet)
}
case <-b.done:
b.logger.Infof("Subscriber %s received shutdown signal", b.name)
return
@@ -97,25 +97,25 @@ func (b *BaseSubscriber) run() {
// Close stops the subscriber and releases resources
func (b *BaseSubscriber) Close() {
b.logger.Infof("Closing subscriber %s", b.name)
// Signal the processing loop to stop
close(b.done)
// Unsubscribe from the broker
b.broker.Unsubscribe(b.channel)
// Wait for processing to finish
b.wg.Wait()
// Call the close hook if provided
if b.closeHook != nil {
b.closeHook()
}
b.logger.Infof("Subscriber %s closed", b.name)
}
// Name returns the subscriber's name
func (b *BaseSubscriber) Name() string {
return b.name
}
}

View File

@@ -1,5 +1,6 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
@@ -13,11 +14,13 @@
max-width: 960px;
margin: 0 auto;
}
h1 {
color: #333;
border-bottom: 1px solid #eee;
padding-bottom: 10px;
}
#messages {
margin-top: 20px;
border: 1px solid #ddd;
@@ -29,13 +32,16 @@
font-family: 'Monaco', 'Consolas', monospace;
font-size: 14px;
}
.message {
padding: 8px 0;
border-bottom: 1px solid #eee;
}
.message:last-child {
border-bottom: none;
}
.info {
padding: 10px;
background-color: #e9f7fe;
@@ -44,14 +50,15 @@
}
</style>
</head>
<body>
<h1>Meshtastic Stream</h1>
<div class="info">
<p>This page displays real-time messages from Meshtastic nodes via MQTT.</p>
<p>Messages are streamed using Server-Sent Events (SSE) and will appear below as they arrive.</p>
</div>
<div id="messages">
<p>Waiting for messages...</p>
</div>
@@ -59,91 +66,57 @@
<script>
document.addEventListener('DOMContentLoaded', () => {
const messagesDiv = document.getElementById('messages');
// Function to add a message to the UI
function addMessage(text) {
const msgElement = document.createElement('div');
const msgElement = document.createElement('pre');
msgElement.className = 'message';
msgElement.textContent = text;
messagesDiv.appendChild(msgElement);
// Auto-scroll to the bottom
messagesDiv.scrollTop = messagesDiv.scrollHeight;
}
// Clear initial waiting message
messagesDiv.innerHTML = '';
// Connect to SSE endpoint
console.log('Connecting to SSE stream...');
const eventSource = new EventSource('/api/stream');
// Handle connection open
eventSource.onopen = () => {
console.log('SSE connection established');
addMessage('Connected to Meshtastic stream.');
};
// Handle connection error
eventSource.onerror = (error) => {
console.error('SSE connection error:', error);
addMessage('Error: Connection to server lost. Trying to reconnect...');
};
// Handle 'info' events
eventSource.addEventListener('info', (e) => {
console.log('Info event:', e.data);
addMessage(`Server info: ${e.data}`);
});
// Handle 'message' events
eventSource.addEventListener('message', (e) => {
console.log('Message event:', e.data);
try {
const packet = JSON.parse(e.data);
const timestamp = new Date(packet.received_at * 1000).toLocaleTimeString();
// Store the full packet data for later reference if needed
console.log('Full packet:', packet);
// Format the message with essential information
let msgText = `[${timestamp}] From Node ${packet.from} to ${packet.to} (${packet.port_string})`;
// Add packet ID and hop info if available
if (packet.id) {
msgText += ` • ID: ${packet.id}`;
}
if (packet.hop_limit && packet.hop_limit > 0) {
msgText += ` • Hop: ${packet.hop_start}/${packet.hop_limit}`;
}
// Determine payload type and add appropriate info
const payloadType = typeof packet.payload;
if (payloadType === 'string') {
// Text message
msgText += ` • Message: "${packet.payload}"`;
} else if (packet.payload && typeof packet.payload === 'object') {
// Handle various object payload types
if (Array.isArray(packet.payload)) {
// Binary data as array
msgText += ` • Binary data: ${packet.payload.length} bytes`;
} else {
// Structured data (protobuf object)
msgText += ` • Data: ${Object.keys(packet.payload).length} fields`;
}
} else if (packet.payload === null) {
msgText += ` • No payload`;
} else {
msgText += ` • Payload type: ${payloadType}`;
}
// Add channel ID if available
if (packet.channel_id) {
msgText += ` • Channel: ${packet.channel_id}`;
}
let msgText = `[${timestamp}] ${packet.port_string}\n\n`;
msgText += JSON.stringify(packet, null, 2);
addMessage(msgText);
} catch (err) {
console.error('Error parsing message:', err);
@@ -153,4 +126,5 @@
});
</script>
</body>
</html>