Files
mqtt/Program.cs
Ben Meadors 6283f6e695 Moar
2025-04-16 09:17:47 -05:00

207 lines
6.7 KiB
C#

using MQTTnet.Server;
using Meshtastic.Protobufs;
using Google.Protobuf;
using Serilog;
using MQTTnet.Protocol;
using System.Runtime.Loader;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Serilog.Formatting.Compact;
using Meshtastic.Crypto;
using Meshtastic;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Reflection;
await RunMqttServer(args);
async Task RunMqttServer(string[] args)
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.WriteTo.Console(new RenderedCompactJsonFormatter())
// .WriteTo.File(new RenderedCompactJsonFormatter(), "log.json", rollingInterval: RollingInterval.Hour)
.CreateLogger();
// Create and configure MQTT server
using var mqttServer = new MqttServerFactory()
.CreateMqttServer(BuildMqttServerOptions());
ConfigureMqttServer(mqttServer);
// Set up host
using var host = CreateHostBuilder(args).Build();
await host.StartAsync();
var lifetime = host.Services.GetRequiredService<IHostApplicationLifetime>();
// Start MQTT server
await mqttServer.StartAsync();
// Configure graceful shutdown
await SetupGracefulShutdown(mqttServer, lifetime, host);
}
MqttServerOptions BuildMqttServerOptions()
{
var currentPath = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location)!;
#pragma warning disable SYSLIB0057 // Type or member is obsolete
var certificate = new X509Certificate2(
Path.Combine(currentPath, "certificate.pfx"),
"large4cats",
X509KeyStorageFlags.Exportable);
#pragma warning restore SYSLIB0057
var options = new MqttServerOptionsBuilder()
.WithoutDefaultEndpoint()
.WithEncryptedEndpoint()
.WithEncryptedEndpointPort(8883)
.WithEncryptionCertificate(certificate.Export(X509ContentType.Pfx))
.WithEncryptionSslProtocol(SslProtocols.Tls12)
.Build();
Log.Logger.Information("Using SSL certificate for MQTT server");
return options;
}
void ConfigureMqttServer(MqttServer mqttServer)
{
mqttServer.InterceptingPublishAsync += HandleInterceptingPublish;
mqttServer.InterceptingSubscriptionAsync += HandleInterceptingSubscription;
mqttServer.ValidatingConnectionAsync += HandleValidatingConnection;
}
async Task HandleInterceptingPublish(InterceptingPublishEventArgs args)
{
try
{
if (args.ApplicationMessage.Payload.Length == 0)
{
Log.Logger.Warning("Received empty payload on topic {@Topic} from {@ClientId}", args.ApplicationMessage.Topic, args.ClientId);
args.ProcessPublish = false;
return;
}
var serviceEnvelope = ServiceEnvelope.Parser.ParseFrom(args.ApplicationMessage.Payload);
if (!IsValidServiceEnvelope(serviceEnvelope))
{
Log.Logger.Warning("Service envelope or packet is malformed. Blocking packet on topic {@Topic} from {@ClientId}",
args.ApplicationMessage.Topic, args.ClientId);
args.ProcessPublish = false;
return;
}
// Spot for any async operations we might want to perform
await Task.FromResult(0);
var data = DecryptMeshPacket(serviceEnvelope);
// Uncomment to block unrecognized packets
// if (data == null)
// {
// Log.Logger.Warning("Service envelope does not contain a valid packet. Blocking packet");
// args.ProcessPublish = false;
// return;
// }
LogReceivedMessage(args.ApplicationMessage.Topic, args.ClientId, data);
args.ProcessPublish = true;
}
catch (InvalidProtocolBufferException)
{
Log.Logger.Warning("Failed to decode presumed protobuf packet. Blocking");
args.ProcessPublish = false;
}
catch (Exception ex)
{
Log.Logger.Error("Exception occurred while processing packet on {@Topic} from {@ClientId}: {@Exception}",
args.ApplicationMessage.Topic, args.ClientId, ex.Message);
args.ProcessPublish = false;
}
}
Task HandleInterceptingSubscription(InterceptingSubscriptionEventArgs args)
{
// Add filtering logic here if needed
args.ProcessSubscription = true;
return Task.CompletedTask;
}
Task HandleValidatingConnection(ValidatingConnectionEventArgs args)
{
// Add connection / authentication logic here if needed
args.ReasonCode = MqttConnectReasonCode.Success;
return Task.CompletedTask;
}
bool IsValidServiceEnvelope(ServiceEnvelope serviceEnvelope)
{
return !(String.IsNullOrWhiteSpace(serviceEnvelope.ChannelId) ||
String.IsNullOrWhiteSpace(serviceEnvelope.GatewayId) ||
serviceEnvelope.Packet == null ||
serviceEnvelope.Packet.Id < 1 ||
serviceEnvelope.Packet.From < 1 ||
serviceEnvelope.Packet.Encrypted == null ||
serviceEnvelope.Packet.Encrypted.Length < 1 ||
serviceEnvelope.Packet.Decoded != null);
}
void LogReceivedMessage(string topic, string clientId, Data? data)
{
if (data?.Portnum == PortNum.TextMessageApp)
{
Log.Logger.Information("Received text message on topic {@Topic} from {@ClientId}: {@Message}",
topic, clientId, data.Payload.ToStringUtf8());
}
else
{
Log.Logger.Information("Received packet on topic {@Topic} from {@ClientId} with port number: {@Portnum}",
topic, clientId, data?.Portnum);
}
}
static Data? DecryptMeshPacket(ServiceEnvelope serviceEnvelope)
{
var nonce = new NonceGenerator(serviceEnvelope.Packet.From, serviceEnvelope.Packet.Id).Create();
var decrypted = PacketEncryption.TransformPacket(serviceEnvelope.Packet.Encrypted.ToByteArray(), nonce, Resources.DEFAULT_PSK);
var payload = Data.Parser.ParseFrom(decrypted);
if (payload.Portnum > PortNum.UnknownApp && payload.Payload.Length > 0)
return payload;
return null;
}
async Task SetupGracefulShutdown(MqttServer mqttServer, IHostApplicationLifetime lifetime, IHost host)
{
var ended = new ManualResetEventSlim();
var starting = new ManualResetEventSlim();
AssemblyLoadContext.Default.Unloading += ctx =>
{
starting.Set();
Log.Logger.Debug("Waiting for completion");
ended.Wait();
};
starting.Wait();
Log.Logger.Debug("Received signal gracefully shutting down");
await mqttServer.StopAsync();
Thread.Sleep(500);
ended.Set();
lifetime.StopApplication();
await host.WaitForShutdownAsync();
}
static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder(args)
.UseConsoleLifetime()
.ConfigureServices((hostContext, services) =>
{
services.AddSingleton(Console.Out);
});
}