diff --git a/main.go b/main.go index f7b9721..7ddebd2 100644 --- a/main.go +++ b/main.go @@ -5,16 +5,12 @@ import ( "log" "os" "os/signal" - "path/filepath" "strings" - "sync" "syscall" "time" "meshstream/decoder" "meshstream/mqtt" - - pb "meshstream/proto/generated/meshtastic" ) const ( @@ -25,162 +21,6 @@ const ( logsDir = "./logs" ) -// MessageStats tracks statistics about received messages -type MessageStats struct { - sync.Mutex - TotalMessages int - ByNode map[uint32]int - ByPortType map[pb.PortNum]int - LastStatsPrinted time.Time -} - -// NewMessageStats creates a new MessageStats instance -func NewMessageStats() *MessageStats { - return &MessageStats{ - ByNode: make(map[uint32]int), - ByPortType: make(map[pb.PortNum]int), - LastStatsPrinted: time.Now(), - } -} - -// RecordMessage records a message in the statistics -func (s *MessageStats) RecordMessage(packet *mqtt.Packet) { - s.Lock() - defer s.Unlock() - - s.TotalMessages++ - - // Count by source node - s.ByNode[packet.From]++ - - // Count by port type - s.ByPortType[packet.PortNum]++ -} - -// PrintStats prints current statistics -func (s *MessageStats) PrintStats() { - s.Lock() - defer s.Unlock() - - now := time.Now() - duration := now.Sub(s.LastStatsPrinted) - msgPerSec := float64(s.TotalMessages) / duration.Seconds() - - fmt.Println("\n==== Message Statistics ====") - fmt.Printf("Total messages: %d (%.2f msg/sec)\n", s.TotalMessages, msgPerSec) - - // Print node statistics - fmt.Println("\nMessages by Node:") - for nodeID, count := range s.ByNode { - fmt.Printf(" Node %d: %d messages\n", nodeID, count) - } - - // Print port type statistics - fmt.Println("\nMessages by Port Type:") - for portType, count := range s.ByPortType { - fmt.Printf(" %s: %d messages\n", portType, count) - } - fmt.Println(strings.Repeat("=", 30)) - - // Reset counters for rate calculation - s.TotalMessages = 0 - s.ByNode = make(map[uint32]int) - s.ByPortType = make(map[pb.PortNum]int) - s.LastStatsPrinted = now -} - -// MessageLogger logs messages of specific types to separate files -type MessageLogger struct { - logDir string - loggers map[pb.PortNum]*log.Logger - files map[pb.PortNum]*os.File - mutex sync.Mutex -} - -// NewMessageLogger creates a new message logger -func NewMessageLogger(logDir string) (*MessageLogger, error) { - // Ensure log directory exists - if err := os.MkdirAll(logDir, 0755); err != nil { - return nil, fmt.Errorf("failed to create log directory: %v", err) - } - - return &MessageLogger{ - logDir: logDir, - loggers: make(map[pb.PortNum]*log.Logger), - files: make(map[pb.PortNum]*os.File), - }, nil -} - -// getLogger returns a logger for the specified port type -func (ml *MessageLogger) getLogger(portNum pb.PortNum) *log.Logger { - ml.mutex.Lock() - defer ml.mutex.Unlock() - - // Check if we already have a logger for this port type - if logger, ok := ml.loggers[portNum]; ok { - return logger - } - - // Create a new log file for this port type - filename := fmt.Sprintf("%s.log", strings.ToLower(portNum.String())) - filepath := filepath.Join(ml.logDir, filename) - - file, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) - if err != nil { - log.Printf("Error opening log file %s: %v", filepath, err) - return nil - } - - // Create a new logger - logger := log.New(file, "", log.LstdFlags) - - // Store the logger and file handle - ml.loggers[portNum] = logger - ml.files[portNum] = file - - return logger -} - -// LogMessage logs a message to the appropriate file based on its port type -func (ml *MessageLogger) LogMessage(packet *mqtt.Packet) { - // We only log specific message types - switch packet.PortNum { - case pb.PortNum_POSITION_APP, - pb.PortNum_TELEMETRY_APP, - pb.PortNum_NODEINFO_APP, - pb.PortNum_MAP_REPORT_APP, - pb.PortNum_TRACEROUTE_APP, - pb.PortNum_NEIGHBORINFO_APP: - - // Get the logger for this port type - logger := ml.getLogger(packet.PortNum) - if logger != nil { - // Format the message - formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket) - - // Add a timestamp and node info - logEntry := fmt.Sprintf("[Node %d] %s\n%s\n", - packet.From, - time.Now().Format(time.RFC3339), - formattedOutput) - - // Write to the log - logger.Println(logEntry) - } - } -} - -// Close closes all log files -func (ml *MessageLogger) Close() { - ml.mutex.Lock() - defer ml.mutex.Unlock() - - for portNum, file := range ml.files { - log.Printf("Closing log file for %s", portNum) - file.Close() - } -} - func main() { // Set up logging log.SetOutput(os.Stdout) @@ -220,65 +60,20 @@ func main() { // Create a consumer channel for display with buffer size 10 displayChan := broker.Subscribe(10) - // Create a consumer channel for statistics with buffer size 50 - statsChan := broker.Subscribe(50) + // Create a stats tracker that subscribes to the broker + // with statistics printed every 30 seconds + stats := mqtt.NewMessageStats(broker, 30*time.Second) - // Create a consumer channel for logging with buffer size 100 - loggerChan := broker.Subscribe(100) - - // Create a stats tracker - stats := NewMessageStats() - - // Create a message logger - messageLogger, err := NewMessageLogger(logsDir) + // Create a message logger that subscribes to the broker + messageLogger, err := mqtt.NewMessageLogger(broker, logsDir) if err != nil { log.Printf("Warning: Failed to initialize message logger: %v", err) } - // Create a ticker for periodically printing stats - statsTicker := time.NewTicker(30 * time.Second) - // Setup signal handling for graceful shutdown sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGTERM) - // Start a goroutine for processing statistics - go func() { - for { - select { - case packet, ok := <-statsChan: - if !ok { - // Channel closed - return - } - - if packet != nil { - stats.RecordMessage(packet) - } - - case <-statsTicker.C: - stats.PrintStats() - } - } - }() - - // Start a goroutine for logging specific message types - go func() { - if messageLogger != nil { - for { - packet, ok := <-loggerChan - if !ok { - // Channel closed - return - } - - if packet != nil { - messageLogger.LogMessage(packet) - } - } - } - }() - // Process messages until interrupt received fmt.Println("Waiting for messages... Press Ctrl+C to exit") fmt.Println("Statistics will be printed every 30 seconds") @@ -301,14 +96,17 @@ func main() { case <-sig: // Got an interrupt signal, shutting down fmt.Println("Shutting down...") - // Stop the ticker - statsTicker.Stop() - // Close the message logger + + // Close components in reverse order of creation if messageLogger != nil { messageLogger.Close() } - // Close the broker first (which will close all subscriber channels) + + stats.Close() + + // Close the broker (which will close all subscriber channels) broker.Close() + // Then disconnect the MQTT client mqttClient.Disconnect() return diff --git a/mqtt/logger.go b/mqtt/logger.go new file mode 100644 index 0000000..2c1a823 --- /dev/null +++ b/mqtt/logger.go @@ -0,0 +1,144 @@ +package mqtt + +import ( + "fmt" + "log" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "meshstream/decoder" + + pb "meshstream/proto/generated/meshtastic" +) + +// MessageLogger logs messages of specific types to separate files +type MessageLogger struct { + logDir string + broker *Broker + subscriber <-chan *Packet + loggers map[pb.PortNum]*log.Logger + files map[pb.PortNum]*os.File + mutex sync.Mutex + done chan struct{} + wg sync.WaitGroup +} + +// NewMessageLogger creates a new message logger that subscribes to the broker +func NewMessageLogger(broker *Broker, logDir string) (*MessageLogger, error) { + // Ensure log directory exists + if err := os.MkdirAll(logDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create log directory: %v", err) + } + + ml := &MessageLogger{ + logDir: logDir, + broker: broker, + loggers: make(map[pb.PortNum]*log.Logger), + files: make(map[pb.PortNum]*os.File), + done: make(chan struct{}), + } + + // Subscribe to the broker with a large buffer + ml.subscriber = broker.Subscribe(100) + + // Start processing messages + ml.wg.Add(1) + go ml.run() + + return ml, nil +} + +// run processes incoming messages +func (ml *MessageLogger) run() { + defer ml.wg.Done() + + for { + select { + case packet, ok := <-ml.subscriber: + if !ok { + // Channel closed + return + } + + if packet != nil { + ml.logMessage(packet) + } + + case <-ml.done: + return + } + } +} + +// getLogger returns a logger for the specified port type +func (ml *MessageLogger) getLogger(portNum pb.PortNum) *log.Logger { + ml.mutex.Lock() + defer ml.mutex.Unlock() + + // Check if we already have a logger for this port type + if logger, ok := ml.loggers[portNum]; ok { + return logger + } + + // Create a new log file for this port type + filename := fmt.Sprintf("%s.log", strings.ToLower(portNum.String())) + filepath := filepath.Join(ml.logDir, filename) + + file, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + log.Printf("Error opening log file %s: %v", filepath, err) + return nil + } + + // Create a new logger + logger := log.New(file, "", log.LstdFlags) + + // Store the logger and file handle + ml.loggers[portNum] = logger + ml.files[portNum] = file + + return logger +} + +// logMessage logs a message to the appropriate file based on its port type +func (ml *MessageLogger) logMessage(packet *Packet) { + // Log all message types by getting a logger for the packet's port type + logger := ml.getLogger(packet.PortNum) + if logger != nil { + // Format the message + formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket) + + // Add a timestamp and node info + logEntry := fmt.Sprintf("[Node %d] %s\n%s\n", + packet.From, + time.Now().Format(time.RFC3339), + formattedOutput) + + // Write to the log + logger.Println(logEntry) + } +} + +// Close stops the logger and closes all log files +func (ml *MessageLogger) Close() { + // Signal the processing loop to stop + close(ml.done) + + // Unsubscribe from the broker + ml.broker.Unsubscribe(ml.subscriber) + + // Wait for the processing loop to exit + ml.wg.Wait() + + // Close all log files + ml.mutex.Lock() + defer ml.mutex.Unlock() + + for portNum, file := range ml.files { + log.Printf("Closing log file for %s", portNum) + file.Close() + } +} \ No newline at end of file diff --git a/mqtt/stats.go b/mqtt/stats.go new file mode 100644 index 0000000..10879a8 --- /dev/null +++ b/mqtt/stats.go @@ -0,0 +1,130 @@ +package mqtt + +import ( + "fmt" + "strings" + "sync" + "time" + + pb "meshstream/proto/generated/meshtastic" +) + +// MessageStats tracks statistics about received messages +type MessageStats struct { + sync.Mutex + broker *Broker + subscriber <-chan *Packet + TotalMessages int + ByNode map[uint32]int + ByPortType map[pb.PortNum]int + LastStatsPrinted time.Time + done chan struct{} + wg sync.WaitGroup +} + +// NewMessageStats creates a new MessageStats instance +func NewMessageStats(broker *Broker, printInterval time.Duration) *MessageStats { + s := &MessageStats{ + broker: broker, + ByNode: make(map[uint32]int), + ByPortType: make(map[pb.PortNum]int), + LastStatsPrinted: time.Now(), + done: make(chan struct{}), + } + + // Subscribe to the broker with a larger buffer to handle bursts + s.subscriber = broker.Subscribe(50) + + // Start the collection loop + s.wg.Add(1) + go s.run(printInterval) + + return s +} + +// run handles statistics collection and periodic printing +func (s *MessageStats) run(printInterval time.Duration) { + defer s.wg.Done() + + // Create a ticker for periodically printing stats + statsTicker := time.NewTicker(printInterval) + defer statsTicker.Stop() + + for { + select { + case packet, ok := <-s.subscriber: + if !ok { + // Channel closed + return + } + + if packet != nil { + s.recordMessage(packet) + } + + case <-statsTicker.C: + s.PrintStats() + + case <-s.done: + return + } + } +} + +// recordMessage records a message in the statistics +func (s *MessageStats) recordMessage(packet *Packet) { + s.Lock() + defer s.Unlock() + + s.TotalMessages++ + + // Count by source node + s.ByNode[packet.From]++ + + // Count by port type + s.ByPortType[packet.PortNum]++ +} + +// PrintStats prints current statistics +func (s *MessageStats) PrintStats() { + s.Lock() + defer s.Unlock() + + now := time.Now() + duration := now.Sub(s.LastStatsPrinted) + msgPerSec := float64(s.TotalMessages) / duration.Seconds() + + fmt.Println("\n==== Message Statistics ====") + fmt.Printf("Total messages: %d (%.2f msg/sec)\n", s.TotalMessages, msgPerSec) + + // Print node statistics + fmt.Println("\nMessages by Node:") + for nodeID, count := range s.ByNode { + fmt.Printf(" Node %d: %d messages\n", nodeID, count) + } + + // Print port type statistics + fmt.Println("\nMessages by Port Type:") + for portType, count := range s.ByPortType { + fmt.Printf(" %s: %d messages\n", portType, count) + } + fmt.Println(strings.Repeat("=", 30)) + + // Reset counters for rate calculation + s.TotalMessages = 0 + s.ByNode = make(map[uint32]int) + s.ByPortType = make(map[pb.PortNum]int) + s.LastStatsPrinted = now +} + +// Close stops the stats collector and unsubscribes from the broker +func (s *MessageStats) Close() { + // Signal the collection loop to stop + close(s.done) + + // Unsubscribe from the broker + s.broker.Unsubscribe(s.subscriber) + + // Wait for the collection loop to exit + s.wg.Wait() +} \ No newline at end of file