diff --git a/main.go b/main.go index 68c8b1c..95365a6 100644 --- a/main.go +++ b/main.go @@ -71,7 +71,7 @@ func main() { // and also logs to stdout with a separator messageLogger, err := mqtt.NewMessageLogger( broker, - logsDir, + true, // Use brief mode for more concise logs true, // Enable logging to stdout strings.Repeat("-", 80), // Use separator logger, diff --git a/mqtt/logger.go b/mqtt/logger.go index 6c87337..d5577ff 100644 --- a/mqtt/logger.go +++ b/mqtt/logger.go @@ -2,12 +2,7 @@ package mqtt import ( "fmt" - "io" - "os" - "path/filepath" "strings" - "sync" - "time" "github.com/dpup/prefab/logging" @@ -16,25 +11,17 @@ import ( pb "meshstream/proto/generated/meshtastic" ) -// MessageLogger logs messages to files and optionally to stdout +// MessageLogger logs messages using the provided logger type MessageLogger struct { *BaseSubscriber - logDir string - loggers map[pb.PortNum]io.Writer - files map[pb.PortNum]*os.File - mutex sync.Mutex - logToStdout bool // Flag to enable console output - stdoutSeparator string // Separator for console output - logger logging.Logger // Main logger instance + logger logging.Logger // Main logger instance + briefMode bool // Whether to log brief summaries instead of full packets + logToStdout bool // Flag to enable console output + stdoutSeparator string // Separator for console output } // NewMessageLogger creates a new message logger that subscribes to the broker -func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSeparator string, logger logging.Logger) (*MessageLogger, error) { - // Ensure log directory exists - if err := os.MkdirAll(logDir, 0755); err != nil { - return nil, fmt.Errorf("failed to create log directory: %v", err) - } - +func NewMessageLogger(broker *Broker, briefMode bool, logToStdout bool, stdoutSeparator string, logger logging.Logger) (*MessageLogger, error) { // Use provided logger or create a default one if logger == nil { logger = logging.NewDevLogger() @@ -42,12 +29,10 @@ func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSep messageLoggerLogger := logger.Named("mqtt.message_logger") ml := &MessageLogger{ - logDir: logDir, - loggers: make(map[pb.PortNum]io.Writer), - files: make(map[pb.PortNum]*os.File), - logToStdout: logToStdout, - stdoutSeparator: stdoutSeparator, - logger: messageLoggerLogger, + briefMode: briefMode, + logToStdout: logToStdout, + stdoutSeparator: stdoutSeparator, + logger: messageLoggerLogger, } // Create base subscriber with logger's message handler @@ -56,7 +41,6 @@ func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSep Broker: broker, BufferSize: 100, Processor: ml.logMessage, - CloseHook: ml.closeLogFiles, Logger: logger, }) @@ -66,59 +50,90 @@ func NewMessageLogger(broker *Broker, logDir string, logToStdout bool, stdoutSep return ml, nil } -// closeLogFiles is called when the subscriber is closed -func (ml *MessageLogger) closeLogFiles() { - ml.mutex.Lock() - defer ml.mutex.Unlock() +// getBriefSummary returns a brief summary of the packet +func (ml *MessageLogger) getBriefSummary(packet *Packet) string { + var summary string - for portNum, file := range ml.files { - ml.logger.Infof("Closing log file for %s", portNum) - file.Close() + if packet.DecodedPacket.DecodeError != nil { + return fmt.Sprintf("Error decoding packet: %v", packet.DecodedPacket.DecodeError) } + + // Create a basic summary based on the port type + switch packet.PortNum { + case pb.PortNum_TEXT_MESSAGE_APP: + // For text messages, include the text content + if text, ok := packet.Payload.(string); ok { + summary = fmt.Sprintf("Text message: %s", text) + } else { + summary = "Text message (invalid format)" + } + + case pb.PortNum_POSITION_APP: + // For position messages, include a compact location summary + if pos, ok := packet.Payload.(*pb.Position); ok { + lat := float64(pos.GetLatitudeI()) / 10000000.0 + lon := float64(pos.GetLongitudeI()) / 10000000.0 + summary = fmt.Sprintf("Position: %.5f, %.5f", lat, lon) + } else { + summary = "Position update (invalid format)" + } + + case pb.PortNum_TELEMETRY_APP: + // For telemetry, give a short summary of what's included + if telemetry, ok := packet.Payload.(*pb.Telemetry); ok { + parts := []string{} + if telemetry.GetEnvironmentMetrics() != nil { + parts = append(parts, "environment") + } + if telemetry.GetDeviceMetrics() != nil { + parts = append(parts, "device") + } + if telemetry.GetPowerMetrics() != nil { + parts = append(parts, "power") + } + summary = fmt.Sprintf("Telemetry: %s", strings.Join(parts, ", ")) + } else { + summary = "Telemetry (invalid format)" + } + + default: + // For other types, just mention the port type + summary = fmt.Sprintf("Message type: %s", packet.PortNum.String()) + } + + return summary } -// getLogWriter returns a writer for the specified port type -func (ml *MessageLogger) getLogWriter(portNum pb.PortNum) io.Writer { - ml.mutex.Lock() - defer ml.mutex.Unlock() - - // Check if we already have a writer for this port type - if writer, ok := ml.loggers[portNum]; ok { - return writer - } - - // Create a new log file for this port type - filename := fmt.Sprintf("%s.log", strings.ToLower(portNum.String())) - filepath := filepath.Join(ml.logDir, filename) - - file, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) - if err != nil { - ml.logger.Errorw("Error opening log file", "filepath", filepath, "error", err) - return nil - } - - // Store the writer and file handle - ml.loggers[portNum] = file - ml.files[portNum] = file - - return file -} - -// logMessage logs a message to the appropriate file and optionally to stdout +// logMessage logs a message using the structured logger func (ml *MessageLogger) logMessage(packet *Packet) { - // Format the message + // Get the full formatted output if needed for stdout or debug logging formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket) - // Add a timestamp and node info - logEntry := fmt.Sprintf("[Node %d] %s\n%s\n", - packet.From, - time.Now().Format(time.RFC3339), - formattedOutput) + // Get a brief summary for structured logging + briefSummary := ml.getBriefSummary(packet) - // Log to file - writer := ml.getLogWriter(packet.PortNum) - if writer != nil { - fmt.Fprint(writer, logEntry) + // Prepare common fields for both logging modes + fields := []interface{}{ + "portNum", packet.PortNum.String(), + "from", packet.From, + "to", packet.To, + "channel", packet.TopicInfo.Channel, + "region", packet.TopicInfo.RegionPath, + } + + // If packet had a decode error, add it to the fields + if packet.DecodedPacket.DecodeError != nil { + fields = append(fields, "error", packet.DecodedPacket.DecodeError.Error()) + } + + // Log based on mode + if ml.briefMode { + // Brief mode - just log the summary with structured fields + ml.logger.Infow(briefSummary, fields...) + } else { + // Full mode - include the full formatted output + allFields := append(fields, "fullOutput", formattedOutput) + ml.logger.Infow("Message received", allFields...) } // Log to stdout if enabled @@ -128,12 +143,5 @@ func (ml *MessageLogger) logMessage(packet *Packet) { fmt.Println(ml.stdoutSeparator) } } - - // Also log a brief message with the structured logger - ml.logger.Debugw("Message logged", - "portNum", packet.PortNum.String(), - "from", packet.From, - "to", packet.To, - ) }