Files
meshstream/main.go
2025-04-30 10:36:45 -07:00

277 lines
8.4 KiB
Go

package main
import (
	"flag"
	"fmt"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/dpup/prefab/logging"

	"meshstream/decoder"
	"meshstream/mqtt"
	"meshstream/server"
)
// Config collects every runtime setting of the service. parseConfig fills it
// in from command line flags, whose defaults come from MESHSTREAM_-prefixed
// environment variables or, failing that, from built-in values.
type Config struct {
	// MQTTBroker is the address of the MQTT broker to connect to.
	MQTTBroker string
	// MQTTUsername and MQTTPassword are the broker credentials.
	MQTTUsername string
	MQTTPassword string
	// MQTTTopicPrefix is the topic prefix; main subscribes to it with "/#"
	// appended.
	MQTTTopicPrefix string
	// MQTTClientID is the client identifier handed to the MQTT client.
	MQTTClientID string

	// ServerHost and ServerPort are the host and port given to the web server.
	ServerHost, ServerPort string

	// LogLevel (debug, info, warn, error) and LogFormat (json, console) are
	// the requested logging settings.
	LogLevel, LogFormat string

	// ChannelKeys lists the encrypted channels, one "name:key" entry each.
	ChannelKeys []string

	// StatsInterval is the interval for statistics reporting.
	StatsInterval time.Duration
	// CacheSize is the number of packets to cache for new subscribers.
	CacheSize int
	// VerboseLogging enables verbose message logging.
	VerboseLogging bool
}
// getEnv looks up the environment variable "MESHSTREAM_"+key. It returns the
// variable's value when it is set (even if set to the empty string) and
// defaultValue when it is not set at all.
func getEnv(key, defaultValue string) string {
	value, found := os.LookupEnv("MESHSTREAM_" + key)
	if !found {
		return defaultValue
	}
	return value
}
// parseConfig builds the service configuration from command line flags and
// environment variables.
//
// Every flag's default is taken from the matching MESHSTREAM_-prefixed
// environment variable, or from a built-in value when that variable is not
// set. Precedence is therefore: command line flag, then environment variable,
// then built-in default. The environment-derived defaults are computed before
// flag.Parse runs, so an invalid STATS_INTERVAL, CACHE_SIZE or VERBOSE_LOGGING
// value terminates the process through the helper that parses it.
//
// It returns a freshly allocated, fully populated *Config.
func parseConfig() *Config {
	// Custom usage message. It is written to the flag package's configured
	// output (os.Stderr by default) so it ends up next to PrintDefaults.
	flag.Usage = func() {
		out := flag.CommandLine.Output()
		fmt.Fprintf(out, "MeshStream: A Meshtastic MQTT streaming service\n\n")
		fmt.Fprintf(out, "Usage: %s [OPTIONS]\n\n", os.Args[0])
		fmt.Fprintf(out, "Options:\n")
		flag.PrintDefaults()
		fmt.Fprintf(out, "\nAll options can also be set using environment variables with the MESHSTREAM_ prefix.\n")
		fmt.Fprintf(out, "Example: MESHSTREAM_MQTT_BROKER=mqtt.example.com MESHSTREAM_SERVER_PORT=8081 %s\n\n", os.Args[0])
	}

	config := &Config{}

	// MQTT configuration
	flag.StringVar(&config.MQTTBroker, "mqtt-broker", getEnv("MQTT_BROKER", "mqtt.bayme.sh"), "MQTT broker address")
	flag.StringVar(&config.MQTTUsername, "mqtt-username", getEnv("MQTT_USERNAME", "meshdev"), "MQTT username")
	flag.StringVar(&config.MQTTPassword, "mqtt-password", getEnv("MQTT_PASSWORD", "large4cats"), "MQTT password")
	flag.StringVar(&config.MQTTTopicPrefix, "mqtt-topic-prefix", getEnv("MQTT_TOPIC_PREFIX", "msh/US/CA/Motherlode"), "MQTT topic prefix")
	flag.StringVar(&config.MQTTClientID, "mqtt-client-id", getEnv("MQTT_CLIENT_ID", "meshstream-client"), "MQTT client ID")

	// Web server configuration
	flag.StringVar(&config.ServerHost, "server-host", getEnv("SERVER_HOST", "localhost"), "Web server host")
	flag.StringVar(&config.ServerPort, "server-port", getEnv("SERVER_PORT", "8080"), "Web server port")

	// Logging configuration
	flag.StringVar(&config.LogLevel, "log-level", getEnv("LOG_LEVEL", "info"), "Log level (debug, info, warn, error)")
	flag.StringVar(&config.LogFormat, "log-format", getEnv("LOG_FORMAT", "json"), "Log format (json, console)")

	// Channel key configuration (comma separated list of name:key pairs)
	channelKeysDefault := getEnv("CHANNEL_KEYS", "LongFast:"+decoder.DefaultPrivateKey+",ERSN:VIuMtC5uDDJtC/ojdH314HLkDIHanX4LdbK5yViV9jA=")
	channelKeysFlag := flag.String("channel-keys", channelKeysDefault, "Comma-separated list of channel:key pairs for encrypted channels")

	// Statistics configuration
	statsIntervalStr := getEnv("STATS_INTERVAL", "30s")
	flag.DurationVar(&config.StatsInterval, "stats-interval", mustParseDuration(statsIntervalStr), "Interval for statistics reporting")
	flag.IntVar(&config.CacheSize, "cache-size", intFromEnv("CACHE_SIZE", 50), "Number of packets to cache for new subscribers")
	flag.BoolVar(&config.VerboseLogging, "verbose", boolFromEnv("VERBOSE_LOGGING", false), "Enable verbose message logging")

	flag.Parse()

	// The channel key list is only known once the flags have been parsed.
	config.ChannelKeys = splitChannelKeys(*channelKeysFlag)

	return config
}

// splitChannelKeys breaks a comma-separated "name:key" list into its entries.
// Whitespace around each entry is removed and empty entries (for example from
// a trailing comma) are dropped, so "A:k1, B:k2," yields ["A:k1", "B:k2"]
// rather than a channel named " B" and an empty pair. It returns nil when no
// entries remain.
func splitChannelKeys(raw string) []string {
	var pairs []string
	for _, entry := range strings.Split(raw, ",") {
		if entry = strings.TrimSpace(entry); entry != "" {
			pairs = append(pairs, entry)
		}
	}
	return pairs
}
// mustParseDuration converts durationStr (for example "30s" or "1h30m") with
// time.ParseDuration. If the string is not a valid duration it prints a
// message to stderr and terminates the process with exit status 1.
func mustParseDuration(durationStr string) time.Duration {
	parsed, err := time.ParseDuration(durationStr)
	if err == nil {
		return parsed
	}

	fmt.Fprintf(os.Stderr, "Invalid duration format: %s\n", durationStr)
	os.Exit(1)
	return parsed // not reached; os.Exit does not return
}
// intFromEnv reads an integer from the environment variable identified by key
// (looked up through getEnv, which applies the MESHSTREAM_ prefix).
//
// It returns defaultValue when the variable is unset or empty. Otherwise the
// value, with surrounding whitespace removed, must be a complete base-10
// integer; anything else — including a number followed by trailing characters
// such as "50abc" or "12.5" — is reported on stderr and terminates the
// process with exit status 1.
func intFromEnv(key string, defaultValue int) int {
	envVal := getEnv(key, "")
	if envVal == "" {
		return defaultValue
	}

	// strconv.Atoi validates the whole string. Scanning with "%d" would stop
	// at the first non-digit and silently ignore the remainder.
	result, err := strconv.Atoi(strings.TrimSpace(envVal))
	if err != nil {
		fmt.Fprintf(os.Stderr, "Invalid integer format for %s: %s\n", key, envVal)
		os.Exit(1)
	}
	return result
}
// boolFromEnv reads a boolean from the environment variable identified by key
// (looked up through getEnv, which applies the MESHSTREAM_ prefix).
//
// An unset or empty variable yields defaultValue. Matching is
// case-insensitive: "true", "yes", "1", "y" and "t" mean true; "false", "no",
// "0", "n" and "f" mean false. Any other value is reported on stderr and
// terminates the process with exit status 1.
func boolFromEnv(key string, defaultValue bool) bool {
	raw := getEnv(key, "")
	if raw == "" {
		return defaultValue
	}

	switch lowered := strings.ToLower(raw); lowered {
	case "1", "t", "y", "yes", "true":
		return true
	case "0", "f", "n", "no", "false":
		return false
	}

	// Not a recognised spelling: complain and abort.
	fmt.Fprintf(os.Stderr, "Invalid boolean format for %s: %s\n", key, raw)
	os.Exit(1)
	return defaultValue // not reached; os.Exit does not return
}
func main() {
// Parse configuration from flags and environment variables
config := parseConfig()
// Set up logging
var logger logging.Logger
// Use the production logger (JSON format)
logger = logging.NewProdLogger()
// Add main component name
logger = logger.Named("main")
// Log our configuration
logger.Infof("Logger initialized with level: %s, format: %s",
config.LogLevel, config.LogFormat)
// Initialize channel keys
for _, channelKeyPair := range config.ChannelKeys {
parts := strings.SplitN(channelKeyPair, ":", 2)
if len(parts) != 2 {
logger.Errorw("Invalid channel key format, should be 'channel:key'", "pair", channelKeyPair)
continue
}
channelName := parts[0]
channelKey := parts[1]
err := decoder.AddChannelKey(channelName, channelKey)
if err != nil {
logger.Errorw("Failed to initialize channel key", "channel", channelName, "error", err)
} else {
logger.Infof("Initialized channel key for '%s'", channelName)
}
}
// Configure and create the MQTT client
mqttConfig := mqtt.Config{
Broker: config.MQTTBroker,
Username: config.MQTTUsername,
Password: config.MQTTPassword,
ClientID: config.MQTTClientID,
Topic: config.MQTTTopicPrefix + "/#",
}
logger.Infof("Connecting to MQTT broker: %s with topic prefix: %s", config.MQTTBroker, config.MQTTTopicPrefix)
mqttClient := mqtt.NewClient(mqttConfig, logger)
// Connect to the MQTT broker
if err := mqttClient.Connect(); err != nil {
logger.Fatalw("Failed to connect to MQTT broker", "error", err)
}
// Get the messages channel to receive decoded messages
messagesChan := mqttClient.Messages()
// Create a message broker to distribute messages to multiple consumers
// Cache packets for new subscribers based on configuration
broker := mqtt.NewBroker(messagesChan, config.CacheSize, logger)
logger.Infof("Message broker initialized with cache size: %d", config.CacheSize)
// Create a stats tracker that subscribes to the broker
// with statistics printed based on configured interval
stats := mqtt.NewMessageStats(broker, config.StatsInterval, logger)
logger.Infof("Stats tracker initialized with interval: %s", config.StatsInterval)
// Create a message logger that subscribes to the broker
// and also logs to stdout
messageLogger, err := mqtt.NewMessageLogger(
broker,
!config.VerboseLogging,
logger,
)
if err != nil {
logger.Warnw("Failed to initialize message logger", "error", err)
} else {
logger.Infof("Message logger initialized with verbose mode: %t", config.VerboseLogging)
}
// Start the web server
webServer := server.New(server.Config{
Host: config.ServerHost,
Port: config.ServerPort,
Broker: broker,
Logger: logger,
})
// Start the server in a goroutine
go func() {
logger.Infof("Starting web server at http://%s:%s", config.ServerHost, config.ServerPort)
if err := webServer.Start(); err != nil {
logger.Errorw("Web server error", "error", err)
}
}()
// Setup signal handling for graceful shutdown
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
// Process messages until interrupt received
logger.Info("Waiting for messages... Press Ctrl+C to exit")
logger.Infof("Web server running at http://%s:%s", config.ServerHost, config.ServerPort)
// Wait for interrupt signal
<-sig
// Got an interrupt signal, shutting down
logger.Info("Shutting down...")
// Close components in reverse order of creation
// First stop the web server
if err := webServer.Stop(); err != nil {
logger.Errorw("Error stopping web server", "error", err)
}
// Then stop the logger
if messageLogger != nil {
messageLogger.Close()
}
// Stop the stats collector
stats.Close()
// Close the broker (which will close all subscriber channels)
broker.Close()
// Then disconnect the MQTT client
mqttClient.Disconnect()
}