Update MessageLogger to log all packet types

This commit is contained in:
Daniel Pupius
2025-04-20 20:13:04 -07:00
parent a4be4caa55
commit c129be3be6
3 changed files with 286 additions and 214 deletions

226
main.go
View File

@@ -5,16 +5,12 @@ import (
"log"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"meshstream/decoder"
"meshstream/mqtt"
pb "meshstream/proto/generated/meshtastic"
)
const (
@@ -25,162 +21,6 @@ const (
logsDir = "./logs"
)
// MessageStats tracks statistics about received messages
type MessageStats struct {
sync.Mutex
TotalMessages int
ByNode map[uint32]int
ByPortType map[pb.PortNum]int
LastStatsPrinted time.Time
}
// NewMessageStats creates a new MessageStats instance
func NewMessageStats() *MessageStats {
return &MessageStats{
ByNode: make(map[uint32]int),
ByPortType: make(map[pb.PortNum]int),
LastStatsPrinted: time.Now(),
}
}
// RecordMessage records a message in the statistics
func (s *MessageStats) RecordMessage(packet *mqtt.Packet) {
s.Lock()
defer s.Unlock()
s.TotalMessages++
// Count by source node
s.ByNode[packet.From]++
// Count by port type
s.ByPortType[packet.PortNum]++
}
// PrintStats prints current statistics
func (s *MessageStats) PrintStats() {
s.Lock()
defer s.Unlock()
now := time.Now()
duration := now.Sub(s.LastStatsPrinted)
msgPerSec := float64(s.TotalMessages) / duration.Seconds()
fmt.Println("\n==== Message Statistics ====")
fmt.Printf("Total messages: %d (%.2f msg/sec)\n", s.TotalMessages, msgPerSec)
// Print node statistics
fmt.Println("\nMessages by Node:")
for nodeID, count := range s.ByNode {
fmt.Printf(" Node %d: %d messages\n", nodeID, count)
}
// Print port type statistics
fmt.Println("\nMessages by Port Type:")
for portType, count := range s.ByPortType {
fmt.Printf(" %s: %d messages\n", portType, count)
}
fmt.Println(strings.Repeat("=", 30))
// Reset counters for rate calculation
s.TotalMessages = 0
s.ByNode = make(map[uint32]int)
s.ByPortType = make(map[pb.PortNum]int)
s.LastStatsPrinted = now
}
// MessageLogger logs messages of specific types to separate files
type MessageLogger struct {
logDir string
loggers map[pb.PortNum]*log.Logger
files map[pb.PortNum]*os.File
mutex sync.Mutex
}
// NewMessageLogger creates a new message logger
func NewMessageLogger(logDir string) (*MessageLogger, error) {
// Ensure log directory exists
if err := os.MkdirAll(logDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create log directory: %v", err)
}
return &MessageLogger{
logDir: logDir,
loggers: make(map[pb.PortNum]*log.Logger),
files: make(map[pb.PortNum]*os.File),
}, nil
}
// getLogger returns a logger for the specified port type
func (ml *MessageLogger) getLogger(portNum pb.PortNum) *log.Logger {
ml.mutex.Lock()
defer ml.mutex.Unlock()
// Check if we already have a logger for this port type
if logger, ok := ml.loggers[portNum]; ok {
return logger
}
// Create a new log file for this port type
filename := fmt.Sprintf("%s.log", strings.ToLower(portNum.String()))
filepath := filepath.Join(ml.logDir, filename)
file, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Printf("Error opening log file %s: %v", filepath, err)
return nil
}
// Create a new logger
logger := log.New(file, "", log.LstdFlags)
// Store the logger and file handle
ml.loggers[portNum] = logger
ml.files[portNum] = file
return logger
}
// LogMessage logs a message to the appropriate file based on its port type
func (ml *MessageLogger) LogMessage(packet *mqtt.Packet) {
// We only log specific message types
switch packet.PortNum {
case pb.PortNum_POSITION_APP,
pb.PortNum_TELEMETRY_APP,
pb.PortNum_NODEINFO_APP,
pb.PortNum_MAP_REPORT_APP,
pb.PortNum_TRACEROUTE_APP,
pb.PortNum_NEIGHBORINFO_APP:
// Get the logger for this port type
logger := ml.getLogger(packet.PortNum)
if logger != nil {
// Format the message
formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket)
// Add a timestamp and node info
logEntry := fmt.Sprintf("[Node %d] %s\n%s\n",
packet.From,
time.Now().Format(time.RFC3339),
formattedOutput)
// Write to the log
logger.Println(logEntry)
}
}
}
// Close closes all log files
func (ml *MessageLogger) Close() {
ml.mutex.Lock()
defer ml.mutex.Unlock()
for portNum, file := range ml.files {
log.Printf("Closing log file for %s", portNum)
file.Close()
}
}
func main() {
// Set up logging
log.SetOutput(os.Stdout)
@@ -220,65 +60,20 @@ func main() {
// Create a consumer channel for display with buffer size 10
displayChan := broker.Subscribe(10)
// Create a consumer channel for statistics with buffer size 50
statsChan := broker.Subscribe(50)
// Create a stats tracker that subscribes to the broker
// with statistics printed every 30 seconds
stats := mqtt.NewMessageStats(broker, 30*time.Second)
// Create a consumer channel for logging with buffer size 100
loggerChan := broker.Subscribe(100)
// Create a stats tracker
stats := NewMessageStats()
// Create a message logger
messageLogger, err := NewMessageLogger(logsDir)
// Create a message logger that subscribes to the broker
messageLogger, err := mqtt.NewMessageLogger(broker, logsDir)
if err != nil {
log.Printf("Warning: Failed to initialize message logger: %v", err)
}
// Create a ticker for periodically printing stats
statsTicker := time.NewTicker(30 * time.Second)
// Setup signal handling for graceful shutdown
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
// Start a goroutine for processing statistics
go func() {
for {
select {
case packet, ok := <-statsChan:
if !ok {
// Channel closed
return
}
if packet != nil {
stats.RecordMessage(packet)
}
case <-statsTicker.C:
stats.PrintStats()
}
}
}()
// Start a goroutine for logging specific message types
go func() {
if messageLogger != nil {
for {
packet, ok := <-loggerChan
if !ok {
// Channel closed
return
}
if packet != nil {
messageLogger.LogMessage(packet)
}
}
}
}()
// Process messages until interrupt received
fmt.Println("Waiting for messages... Press Ctrl+C to exit")
fmt.Println("Statistics will be printed every 30 seconds")
@@ -301,14 +96,17 @@ func main() {
case <-sig:
// Got an interrupt signal, shutting down
fmt.Println("Shutting down...")
// Stop the ticker
statsTicker.Stop()
// Close the message logger
// Close components in reverse order of creation
if messageLogger != nil {
messageLogger.Close()
}
// Close the broker first (which will close all subscriber channels)
stats.Close()
// Close the broker (which will close all subscriber channels)
broker.Close()
// Then disconnect the MQTT client
mqttClient.Disconnect()
return

144
mqtt/logger.go Normal file
View File

@@ -0,0 +1,144 @@
package mqtt
import (
"fmt"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
"meshstream/decoder"
pb "meshstream/proto/generated/meshtastic"
)
// MessageLogger logs messages of specific types to separate files
type MessageLogger struct {
logDir string
broker *Broker
subscriber <-chan *Packet
loggers map[pb.PortNum]*log.Logger
files map[pb.PortNum]*os.File
mutex sync.Mutex
done chan struct{}
wg sync.WaitGroup
}
// NewMessageLogger creates a new message logger that subscribes to the broker
func NewMessageLogger(broker *Broker, logDir string) (*MessageLogger, error) {
// Ensure log directory exists
if err := os.MkdirAll(logDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create log directory: %v", err)
}
ml := &MessageLogger{
logDir: logDir,
broker: broker,
loggers: make(map[pb.PortNum]*log.Logger),
files: make(map[pb.PortNum]*os.File),
done: make(chan struct{}),
}
// Subscribe to the broker with a large buffer
ml.subscriber = broker.Subscribe(100)
// Start processing messages
ml.wg.Add(1)
go ml.run()
return ml, nil
}
// run processes incoming messages
func (ml *MessageLogger) run() {
defer ml.wg.Done()
for {
select {
case packet, ok := <-ml.subscriber:
if !ok {
// Channel closed
return
}
if packet != nil {
ml.logMessage(packet)
}
case <-ml.done:
return
}
}
}
// getLogger returns a logger for the specified port type
func (ml *MessageLogger) getLogger(portNum pb.PortNum) *log.Logger {
ml.mutex.Lock()
defer ml.mutex.Unlock()
// Check if we already have a logger for this port type
if logger, ok := ml.loggers[portNum]; ok {
return logger
}
// Create a new log file for this port type
filename := fmt.Sprintf("%s.log", strings.ToLower(portNum.String()))
filepath := filepath.Join(ml.logDir, filename)
file, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Printf("Error opening log file %s: %v", filepath, err)
return nil
}
// Create a new logger
logger := log.New(file, "", log.LstdFlags)
// Store the logger and file handle
ml.loggers[portNum] = logger
ml.files[portNum] = file
return logger
}
// logMessage logs a message to the appropriate file based on its port type
func (ml *MessageLogger) logMessage(packet *Packet) {
// Log all message types by getting a logger for the packet's port type
logger := ml.getLogger(packet.PortNum)
if logger != nil {
// Format the message
formattedOutput := decoder.FormatTopicAndPacket(packet.TopicInfo, packet.DecodedPacket)
// Add a timestamp and node info
logEntry := fmt.Sprintf("[Node %d] %s\n%s\n",
packet.From,
time.Now().Format(time.RFC3339),
formattedOutput)
// Write to the log
logger.Println(logEntry)
}
}
// Close stops the logger and closes all log files
func (ml *MessageLogger) Close() {
// Signal the processing loop to stop
close(ml.done)
// Unsubscribe from the broker
ml.broker.Unsubscribe(ml.subscriber)
// Wait for the processing loop to exit
ml.wg.Wait()
// Close all log files
ml.mutex.Lock()
defer ml.mutex.Unlock()
for portNum, file := range ml.files {
log.Printf("Closing log file for %s", portNum)
file.Close()
}
}

130
mqtt/stats.go Normal file
View File

@@ -0,0 +1,130 @@
package mqtt
import (
"fmt"
"strings"
"sync"
"time"
pb "meshstream/proto/generated/meshtastic"
)
// MessageStats tracks statistics about received messages
type MessageStats struct {
sync.Mutex
broker *Broker
subscriber <-chan *Packet
TotalMessages int
ByNode map[uint32]int
ByPortType map[pb.PortNum]int
LastStatsPrinted time.Time
done chan struct{}
wg sync.WaitGroup
}
// NewMessageStats creates a new MessageStats instance
func NewMessageStats(broker *Broker, printInterval time.Duration) *MessageStats {
s := &MessageStats{
broker: broker,
ByNode: make(map[uint32]int),
ByPortType: make(map[pb.PortNum]int),
LastStatsPrinted: time.Now(),
done: make(chan struct{}),
}
// Subscribe to the broker with a larger buffer to handle bursts
s.subscriber = broker.Subscribe(50)
// Start the collection loop
s.wg.Add(1)
go s.run(printInterval)
return s
}
// run handles statistics collection and periodic printing
func (s *MessageStats) run(printInterval time.Duration) {
defer s.wg.Done()
// Create a ticker for periodically printing stats
statsTicker := time.NewTicker(printInterval)
defer statsTicker.Stop()
for {
select {
case packet, ok := <-s.subscriber:
if !ok {
// Channel closed
return
}
if packet != nil {
s.recordMessage(packet)
}
case <-statsTicker.C:
s.PrintStats()
case <-s.done:
return
}
}
}
// recordMessage records a message in the statistics
func (s *MessageStats) recordMessage(packet *Packet) {
s.Lock()
defer s.Unlock()
s.TotalMessages++
// Count by source node
s.ByNode[packet.From]++
// Count by port type
s.ByPortType[packet.PortNum]++
}
// PrintStats prints current statistics
func (s *MessageStats) PrintStats() {
s.Lock()
defer s.Unlock()
now := time.Now()
duration := now.Sub(s.LastStatsPrinted)
msgPerSec := float64(s.TotalMessages) / duration.Seconds()
fmt.Println("\n==== Message Statistics ====")
fmt.Printf("Total messages: %d (%.2f msg/sec)\n", s.TotalMessages, msgPerSec)
// Print node statistics
fmt.Println("\nMessages by Node:")
for nodeID, count := range s.ByNode {
fmt.Printf(" Node %d: %d messages\n", nodeID, count)
}
// Print port type statistics
fmt.Println("\nMessages by Port Type:")
for portType, count := range s.ByPortType {
fmt.Printf(" %s: %d messages\n", portType, count)
}
fmt.Println(strings.Repeat("=", 30))
// Reset counters for rate calculation
s.TotalMessages = 0
s.ByNode = make(map[uint32]int)
s.ByPortType = make(map[pb.PortNum]int)
s.LastStatsPrinted = now
}
// Close stops the stats collector and unsubscribes from the broker
func (s *MessageStats) Close() {
// Signal the collection loop to stop
close(s.done)
// Unsubscribe from the broker
s.broker.Unsubscribe(s.subscriber)
// Wait for the collection loop to exit
s.wg.Wait()
}