diff --git a/main.go b/main.go index 7ddebd2..fdedf9f 100644 --- a/main.go +++ b/main.go @@ -57,15 +57,18 @@ func main() { // Create a message broker to distribute messages to multiple consumers broker := mqtt.NewBroker(messagesChan) - // Create a consumer channel for display with buffer size 10 - displayChan := broker.Subscribe(10) - // Create a stats tracker that subscribes to the broker // with statistics printed every 30 seconds stats := mqtt.NewMessageStats(broker, 30*time.Second) // Create a message logger that subscribes to the broker - messageLogger, err := mqtt.NewMessageLogger(broker, logsDir) + // and also logs to stdout with a separator + messageLogger, err := mqtt.NewMessageLogger( + broker, + logsDir, + true, // Enable logging to stdout + strings.Repeat("-", 80), // Use separator + ) if err != nil { log.Printf("Warning: Failed to initialize message logger: %v", err) } @@ -77,39 +80,24 @@ func main() { // Process messages until interrupt received fmt.Println("Waiting for messages... Press Ctrl+C to exit") fmt.Println("Statistics will be printed every 30 seconds") - fmt.Println("Specific message types will be logged to files in the ./logs directory") + fmt.Println("Messages will be logged to files in the ./logs directory") - // Main event loop for display - for { - select { - case packet := <-displayChan: - if packet == nil { - log.Println("Received nil packet, subscriber channel may be closed") - continue - } - - // Format and print the decoded message - formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket) - fmt.Println(formattedOutput) - fmt.Println(strings.Repeat("-", 80)) - - case <-sig: - // Got an interrupt signal, shutting down - fmt.Println("Shutting down...") - - // Close components in reverse order of creation - if messageLogger != nil { - messageLogger.Close() - } - - stats.Close() - - // Close the broker (which will close all subscriber channels) - broker.Close() - - // Then disconnect the MQTT client - mqttClient.Disconnect() - return - } + // Wait for interrupt signal + <-sig + + // Got an interrupt signal, shutting down + fmt.Println("Shutting down...") + + // Close components in reverse order of creation + if messageLogger != nil { + messageLogger.Close() } + + stats.Close() + + // Close the broker (which will close all subscriber channels) + broker.Close() + + // Then disconnect the MQTT client + mqttClient.Disconnect() } \ No newline at end of file diff --git a/mqtt/logger.go b/mqtt/logger.go index 2c1a823..de37ef4 100644 --- a/mqtt/logger.go +++ b/mqtt/logger.go @@ -14,31 +14,35 @@ import ( pb "meshstream/proto/generated/meshtastic" ) -// MessageLogger logs messages of specific types to separate files +// MessageLogger logs messages to files and optionally to stdout type MessageLogger struct { - logDir string - broker *Broker - subscriber <-chan *Packet - loggers map[pb.PortNum]*log.Logger - files map[pb.PortNum]*os.File - mutex sync.Mutex - done chan struct{} - wg sync.WaitGroup + logDir string + broker *Broker + subscriber <-chan *Packet + loggers map[pb.PortNum]*log.Logger + files map[pb.PortNum]*os.File + mutex sync.Mutex + done chan struct{} + wg sync.WaitGroup + logToStdout bool // Flag to enable console output + stdoutSeparator string // Separator for console output } // NewMessageLogger creates a new message logger that subscribes to the broker -func NewMessageLogger(broker *Broker, logDir string) (*MessageLogger, error) { +func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSeparator string) (*MessageLogger, error) { // Ensure log directory exists if err := os.MkdirAll(logDir, 0755); err != nil { return nil, fmt.Errorf("failed to create log directory: %v", err) } ml := &MessageLogger{ - logDir: logDir, - broker: broker, - loggers: make(map[pb.PortNum]*log.Logger), - files: make(map[pb.PortNum]*os.File), - done: make(chan struct{}), + logDir: logDir, + broker: broker, + loggers: make(map[pb.PortNum]*log.Logger), + files: make(map[pb.PortNum]*os.File), + done: make(chan struct{}), + logToStdout: logToStdout, + stdoutSeparator: stdoutSeparator, } // Subscribe to the broker with a large buffer @@ -103,23 +107,30 @@ func (ml *MessageLogger) getLogger(portNum pb.PortNum) *log.Logger { return logger } -// logMessage logs a message to the appropriate file based on its port type +// logMessage logs a message to the appropriate file and optionally to stdout func (ml *MessageLogger) logMessage(packet *Packet) { - // Log all message types by getting a logger for the packet's port type + // Format the message + formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket) + + // Add a timestamp and node info + logEntry := fmt.Sprintf("[Node %d] %s\n%s", + packet.From, + time.Now().Format(time.RFC3339), + formattedOutput) + + // Log to file logger := ml.getLogger(packet.PortNum) if logger != nil { - // Format the message - formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket) - - // Add a timestamp and node info - logEntry := fmt.Sprintf("[Node %d] %s\n%s\n", - packet.From, - time.Now().Format(time.RFC3339), - formattedOutput) - - // Write to the log logger.Println(logEntry) } + + // Log to stdout if enabled + if ml.logToStdout { + fmt.Println(formattedOutput) + if ml.stdoutSeparator != "" { + fmt.Println(ml.stdoutSeparator) + } + } } // Close stops the logger and closes all log files