Consolidate display and logging functionality in MessageLogger

This commit is contained in:
Daniel Pupius
2025-04-21 08:52:15 -07:00
parent c129be3be6
commit ef660b2025
2 changed files with 63 additions and 64 deletions

62
main.go
View File

@@ -57,15 +57,18 @@ func main() {
// Create a message broker to distribute messages to multiple consumers
broker := mqtt.NewBroker(messagesChan)
// Create a consumer channel for display with buffer size 10
displayChan := broker.Subscribe(10)
// Create a stats tracker that subscribes to the broker
// with statistics printed every 30 seconds
stats := mqtt.NewMessageStats(broker, 30*time.Second)
// Create a message logger that subscribes to the broker
messageLogger, err := mqtt.NewMessageLogger(broker, logsDir)
// and also logs to stdout with a separator
messageLogger, err := mqtt.NewMessageLogger(
broker,
logsDir,
true, // Enable logging to stdout
strings.Repeat("-", 80), // Use separator
)
if err != nil {
log.Printf("Warning: Failed to initialize message logger: %v", err)
}
@@ -77,39 +80,24 @@ func main() {
// Process messages until interrupt received
fmt.Println("Waiting for messages... Press Ctrl+C to exit")
fmt.Println("Statistics will be printed every 30 seconds")
fmt.Println("Specific message types will be logged to files in the ./logs directory")
fmt.Println("Messages will be logged to files in the ./logs directory")
// Main event loop for display
for {
select {
case packet := <-displayChan:
if packet == nil {
log.Println("Received nil packet, subscriber channel may be closed")
continue
}
// Format and print the decoded message
formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket)
fmt.Println(formattedOutput)
fmt.Println(strings.Repeat("-", 80))
case <-sig:
// Got an interrupt signal, shutting down
fmt.Println("Shutting down...")
// Close components in reverse order of creation
if messageLogger != nil {
messageLogger.Close()
}
stats.Close()
// Close the broker (which will close all subscriber channels)
broker.Close()
// Then disconnect the MQTT client
mqttClient.Disconnect()
return
}
// Wait for interrupt signal
<-sig
// Got an interrupt signal, shutting down
fmt.Println("Shutting down...")
// Close components in reverse order of creation
if messageLogger != nil {
messageLogger.Close()
}
stats.Close()
// Close the broker (which will close all subscriber channels)
broker.Close()
// Then disconnect the MQTT client
mqttClient.Disconnect()
}

View File

@@ -14,31 +14,35 @@ import (
pb "meshstream/proto/generated/meshtastic"
)
// MessageLogger logs messages of specific types to separate files
// MessageLogger logs messages to files and optionally to stdout
type MessageLogger struct {
logDir string
broker *Broker
subscriber <-chan *Packet
loggers map[pb.PortNum]*log.Logger
files map[pb.PortNum]*os.File
mutex sync.Mutex
done chan struct{}
wg sync.WaitGroup
logDir string
broker *Broker
subscriber <-chan *Packet
loggers map[pb.PortNum]*log.Logger
files map[pb.PortNum]*os.File
mutex sync.Mutex
done chan struct{}
wg sync.WaitGroup
logToStdout bool // Flag to enable console output
stdoutSeparator string // Separator for console output
}
// NewMessageLogger creates a new message logger that subscribes to the broker
func NewMessageLogger(broker *Broker, logDir string) (*MessageLogger, error) {
func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSeparator string) (*MessageLogger, error) {
// Ensure log directory exists
if err := os.MkdirAll(logDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create log directory: %v", err)
}
ml := &MessageLogger{
logDir: logDir,
broker: broker,
loggers: make(map[pb.PortNum]*log.Logger),
files: make(map[pb.PortNum]*os.File),
done: make(chan struct{}),
logDir: logDir,
broker: broker,
loggers: make(map[pb.PortNum]*log.Logger),
files: make(map[pb.PortNum]*os.File),
done: make(chan struct{}),
logToStdout: logToStdout,
stdoutSeparator: stdoutSeparator,
}
// Subscribe to the broker with a large buffer
@@ -103,23 +107,30 @@ func (ml *MessageLogger) getLogger(portNum pb.PortNum) *log.Logger {
return logger
}
// logMessage logs a message to the appropriate file based on its port type
// logMessage logs a message to the appropriate file and optionally to stdout
func (ml *MessageLogger) logMessage(packet *Packet) {
// Log all message types by getting a logger for the packet's port type
// Format the message
formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket)
// Add a timestamp and node info
logEntry := fmt.Sprintf("[Node %d] %s\n%s",
packet.From,
time.Now().Format(time.RFC3339),
formattedOutput)
// Log to file
logger := ml.getLogger(packet.PortNum)
if logger != nil {
// Format the message
formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket)
// Add a timestamp and node info
logEntry := fmt.Sprintf("[Node %d] %s\n%s\n",
packet.From,
time.Now().Format(time.RFC3339),
formattedOutput)
// Write to the log
logger.Println(logEntry)
}
// Log to stdout if enabled
if ml.logToStdout {
fmt.Println(formattedOutput)
if ml.stdoutSeparator != "" {
fmt.Println(ml.stdoutSeparator)
}
}
}
// Close stops the logger and closes all log files