From 64b026646d7ed8bdbcb45379d9b7ea7dd5bc7aa3 Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Wed, 30 Apr 2025 10:36:45 -0700 Subject: [PATCH] Use env and flags for configuration --- Makefile | 2 +- main.go | 211 +++++++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 181 insertions(+), 32 deletions(-) diff --git a/Makefile b/Makefile index 8d88379..4d99af3 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ build: # Run the application with json log formatting run: build - @./dist/meshstream 2>&1 | go tool github.com/dpup/logista + @./dist/meshstream --verbose 2>&1 | go tool github.com/dpup/logista # Generate Go code from Protocol Buffers gen-proto: tools diff --git a/main.go b/main.go index 493d8cc..53f2737 100644 --- a/main.go +++ b/main.go @@ -1,8 +1,11 @@ package main import ( + "flag" + "fmt" "os" "os/signal" + "strings" "syscall" "time" @@ -13,40 +16,181 @@ import ( "meshstream/server" ) -const ( - mqttBroker = "mqtt.bayme.sh" - mqttUsername = "meshdev" - mqttPassword = "large4cats" - mqttTopicPrefix = "msh/US/CA/Motherlode" +// Config holds all the configuration parameters +type Config struct { + // MQTT Configuration + MQTTBroker string + MQTTUsername string + MQTTPassword string + MQTTTopicPrefix string + MQTTClientID string // Web server configuration - serverHost = "localhost" - serverPort = "8080" -) + ServerHost string + ServerPort string -func main() { - // Set up logging - logger := logging.NewProdLogger().Named("main") + // Logging configuration + LogLevel string + LogFormat string - // Initialize default channel key - err := decoder.AddChannelKey("LongFast", decoder.DefaultPrivateKey) - if err != nil { - logger.Errorw("Failed to initialize default channel key", "error", err) + // Channel keys configuration (name:key pairs) + ChannelKeys []string + + // Statistics configuration + StatsInterval time.Duration + CacheSize int + VerboseLogging bool +} + +// getEnv retrieves an environment variable with the given prefix or returns the default value +func getEnv(key, defaultValue string) string { + envKey := "MESHSTREAM_" + key + if val, exists := os.LookupEnv(envKey); exists { + return val + } + return defaultValue +} + +// parseConfig parses command line flags and environment variables +func parseConfig() *Config { + // Print custom usage message + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "MeshStream: A Meshtastic MQTT streaming service\n\n") + fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS]\n\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Options:\n") + flag.PrintDefaults() + fmt.Fprintf(os.Stderr, "\nAll options can also be set using environment variables with the MESHSTREAM_ prefix.\n") + fmt.Fprintf(os.Stderr, "Example: MESHSTREAM_MQTT_BROKER=mqtt.example.com MESHSTREAM_SERVER_PORT=8081 %s\n\n", os.Args[0]) } - if err := decoder.AddChannelKey("ERSN", "VIuMtC5uDDJtC/ojdH314HLkDIHanX4LdbK5yViV9jA="); err != nil { - logger.Errorw("Failed to initialize ERSN channel key", "error", err) + config := &Config{} + + // MQTT configuration + flag.StringVar(&config.MQTTBroker, "mqtt-broker", getEnv("MQTT_BROKER", "mqtt.bayme.sh"), "MQTT broker address") + flag.StringVar(&config.MQTTUsername, "mqtt-username", getEnv("MQTT_USERNAME", "meshdev"), "MQTT username") + flag.StringVar(&config.MQTTPassword, "mqtt-password", getEnv("MQTT_PASSWORD", "large4cats"), "MQTT password") + flag.StringVar(&config.MQTTTopicPrefix, "mqtt-topic-prefix", getEnv("MQTT_TOPIC_PREFIX", "msh/US/CA/Motherlode"), "MQTT topic prefix") + flag.StringVar(&config.MQTTClientID, "mqtt-client-id", getEnv("MQTT_CLIENT_ID", "meshstream-client"), "MQTT client ID") + + // Web server configuration + flag.StringVar(&config.ServerHost, "server-host", getEnv("SERVER_HOST", "localhost"), "Web server host") + flag.StringVar(&config.ServerPort, "server-port", getEnv("SERVER_PORT", "8080"), "Web server port") + + // Logging configuration + flag.StringVar(&config.LogLevel, "log-level", getEnv("LOG_LEVEL", "info"), "Log level (debug, info, warn, error)") + flag.StringVar(&config.LogFormat, "log-format", getEnv("LOG_FORMAT", "json"), "Log format (json, console)") + + // Channel key configuration (comma separated list of name:key pairs) + channelKeysDefault := getEnv("CHANNEL_KEYS", "LongFast:"+decoder.DefaultPrivateKey+",ERSN:VIuMtC5uDDJtC/ojdH314HLkDIHanX4LdbK5yViV9jA=") + channelKeysFlag := flag.String("channel-keys", channelKeysDefault, "Comma-separated list of channel:key pairs for encrypted channels") + + // Statistics configuration + statsIntervalStr := getEnv("STATS_INTERVAL", "30s") + flag.DurationVar(&config.StatsInterval, "stats-interval", mustParseDuration(statsIntervalStr), "Interval for statistics reporting") + flag.IntVar(&config.CacheSize, "cache-size", intFromEnv("CACHE_SIZE", 50), "Number of packets to cache for new subscribers") + flag.BoolVar(&config.VerboseLogging, "verbose", boolFromEnv("VERBOSE_LOGGING", false), "Enable verbose message logging") + + // Parse flags + flag.Parse() + + // Process channel keys from the command line + if *channelKeysFlag != "" { + config.ChannelKeys = strings.Split(*channelKeysFlag, ",") + } + + return config +} + +// Helper function to parse duration from environment +func mustParseDuration(durationStr string) time.Duration { + duration, err := time.ParseDuration(durationStr) + if err != nil { + fmt.Fprintf(os.Stderr, "Invalid duration format: %s\n", durationStr) + os.Exit(1) + } + return duration +} + +// Helper function to parse int from environment +func intFromEnv(key string, defaultValue int) int { + envVal := getEnv(key, "") + if envVal == "" { + return defaultValue + } + var result int + _, err := fmt.Sscanf(envVal, "%d", &result) + if err != nil { + fmt.Fprintf(os.Stderr, "Invalid integer format for %s: %s\n", key, envVal) + os.Exit(1) + } + return result +} + +// Helper function to parse bool from environment +func boolFromEnv(key string, defaultValue bool) bool { + envVal := getEnv(key, "") + if envVal == "" { + return defaultValue + } + + switch strings.ToLower(envVal) { + case "true", "yes", "1", "y", "t": + return true + case "false", "no", "0", "n", "f": + return false + default: + fmt.Fprintf(os.Stderr, "Invalid boolean format for %s: %s\n", key, envVal) + os.Exit(1) + return defaultValue + } +} + +func main() { + // Parse configuration from flags and environment variables + config := parseConfig() + + // Set up logging + var logger logging.Logger + + // Use the production logger (JSON format) + logger = logging.NewProdLogger() + + // Add main component name + logger = logger.Named("main") + + // Log our configuration + logger.Infof("Logger initialized with level: %s, format: %s", + config.LogLevel, config.LogFormat) + + // Initialize channel keys + for _, channelKeyPair := range config.ChannelKeys { + parts := strings.SplitN(channelKeyPair, ":", 2) + if len(parts) != 2 { + logger.Errorw("Invalid channel key format, should be 'channel:key'", "pair", channelKeyPair) + continue + } + + channelName := parts[0] + channelKey := parts[1] + + err := decoder.AddChannelKey(channelName, channelKey) + if err != nil { + logger.Errorw("Failed to initialize channel key", "channel", channelName, "error", err) + } else { + logger.Infof("Initialized channel key for '%s'", channelName) + } } // Configure and create the MQTT client mqttConfig := mqtt.Config{ - Broker: mqttBroker, - Username: mqttUsername, - Password: mqttPassword, - ClientID: "meshstream-client", - Topic: mqttTopicPrefix + "/#", + Broker: config.MQTTBroker, + Username: config.MQTTUsername, + Password: config.MQTTPassword, + ClientID: config.MQTTClientID, + Topic: config.MQTTTopicPrefix + "/#", } + logger.Infof("Connecting to MQTT broker: %s with topic prefix: %s", config.MQTTBroker, config.MQTTTopicPrefix) mqttClient := mqtt.NewClient(mqttConfig, logger) // Connect to the MQTT broker @@ -58,34 +202,39 @@ func main() { messagesChan := mqttClient.Messages() // Create a message broker to distribute messages to multiple consumers - // Cache the last 50 packets for new subscribers - broker := mqtt.NewBroker(messagesChan, 50, logger) + // Cache packets for new subscribers based on configuration + broker := mqtt.NewBroker(messagesChan, config.CacheSize, logger) + logger.Infof("Message broker initialized with cache size: %d", config.CacheSize) // Create a stats tracker that subscribes to the broker - // with statistics printed every 30 seconds - stats := mqtt.NewMessageStats(broker, 30*time.Second, logger) + // with statistics printed based on configured interval + stats := mqtt.NewMessageStats(broker, config.StatsInterval, logger) + logger.Infof("Stats tracker initialized with interval: %s", config.StatsInterval) // Create a message logger that subscribes to the broker - // and also logs to stdout with a separator + // and also logs to stdout messageLogger, err := mqtt.NewMessageLogger( broker, - false, // Use brief mode for more concise logs + !config.VerboseLogging, logger, ) if err != nil { logger.Warnw("Failed to initialize message logger", "error", err) + } else { + logger.Infof("Message logger initialized with verbose mode: %t", config.VerboseLogging) } // Start the web server webServer := server.New(server.Config{ - Host: serverHost, - Port: serverPort, + Host: config.ServerHost, + Port: config.ServerPort, Broker: broker, Logger: logger, }) // Start the server in a goroutine go func() { + logger.Infof("Starting web server at http://%s:%s", config.ServerHost, config.ServerPort) if err := webServer.Start(); err != nil { logger.Errorw("Web server error", "error", err) } @@ -97,7 +246,7 @@ func main() { // Process messages until interrupt received logger.Info("Waiting for messages... Press Ctrl+C to exit") - logger.Infof("Web server running at http://%s:%s", serverHost, serverPort) + logger.Infof("Web server running at http://%s:%s", config.ServerHost, config.ServerPort) // Wait for interrupt signal <-sig