diff --git a/.env.example b/.env.example index 361f056..4a1494f 100644 --- a/.env.example +++ b/.env.example @@ -16,7 +16,7 @@ MESHSTREAM_SITE_DESCRIPTION=Development instance - Meshtastic activity in Bay Ar # For local development, point to local server # (empty for production where API is on same domain) -MESHSTREAM_API_BASE_URL=http://localhost:8080 +MESHSTREAM_API_BASE_URL=http://localhost:5446 # Google Maps API configuration - required for maps to work # Get keys at: https://developers.google.com/maps/documentation/javascript/get-api-key @@ -36,7 +36,7 @@ MESHSTREAM_MQTT_TOPIC_PREFIX=msh/US/bayarea # Server configuration MESHSTREAM_SERVER_HOST=0.0.0.0 # Listen on all interfaces -MESHSTREAM_SERVER_PORT=8080 # Standard web port +MESHSTREAM_SERVER_PORT=5446 # Application port # Logging and debugging diff --git a/.gitignore b/.gitignore index 60b3e00..dddb121 100644 --- a/.gitignore +++ b/.gitignore @@ -78,4 +78,5 @@ temp/ # AI Agent files **/.claude/settings.local.json -.private-journal/* \ No newline at end of file +.private-journal/* +.pnpm-store/ diff --git a/Dockerfile b/Dockerfile index 618d212..e806a9d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,16 +24,12 @@ ARG MESHSTREAM_API_BASE_URL="" ARG MESHSTREAM_APP_ENV="production" ARG MESHSTREAM_SITE_TITLE ARG MESHSTREAM_SITE_DESCRIPTION -ARG MESHSTREAM_GOOGLE_MAPS_ID -ARG MESHSTREAM_GOOGLE_MAPS_API_KEY # Convert MESHSTREAM_ prefixed args to VITE_ environment variables for the web build ENV VITE_API_BASE_URL=${MESHSTREAM_API_BASE_URL} \ VITE_APP_ENV=${MESHSTREAM_APP_ENV} \ VITE_SITE_TITLE=${MESHSTREAM_SITE_TITLE} \ - VITE_SITE_DESCRIPTION=${MESHSTREAM_SITE_DESCRIPTION} \ - VITE_GOOGLE_MAPS_ID=${MESHSTREAM_GOOGLE_MAPS_ID} \ - VITE_GOOGLE_MAPS_API_KEY=${MESHSTREAM_GOOGLE_MAPS_API_KEY} + VITE_SITE_DESCRIPTION=${MESHSTREAM_SITE_DESCRIPTION} # Build the web app RUN pnpm build @@ -93,11 +89,11 @@ RUN chown -R meshstream:meshstream /app USER meshstream # Expose the application port 
-EXPOSE 8080 +EXPOSE 5446 # Server configuration ENV MESHSTREAM_SERVER_HOST=0.0.0.0 -ENV MESHSTREAM_SERVER_PORT=8080 +ENV MESHSTREAM_SERVER_PORT=5446 ENV MESHSTREAM_STATIC_DIR=/app/static # Reporting configuration diff --git a/Makefile b/Makefile index 898bfe8..d1c87a6 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ WEB_DIST_DIR := $(ROOT_DIR)/dist/static # Proto compilation PROTOC_GEN_GO := $(TOOLS_DIR)/protoc-gen-go -PROTO_FILES := $(shell find $(ROOT_DIR) -name "*.proto" | sed 's|$(ROOT_DIR)/||' ) +PROTO_FILES := $(shell find $(ROOT_DIR)/proto -name "*.proto" | sed 's|$(ROOT_DIR)/||' ) # Build the application build: @@ -56,23 +56,16 @@ gen-proto: tools $(PROTO_FILES) @echo "Generated Go code from Protocol Buffers" -# Clean generated files +# Clean generated files and tool binaries clean: rm -rf dist rm -rf $(BIN_DIR) find . -name "*.pb.go" -type f -delete -# Install tools needed for development -tools: $(PROTOC_GEN_GO) - -$(TOOLS_DIR): +# Install tools needed for development (always reinstalls to ensure correct platform binary) +tools: mkdir -p $(TOOLS_DIR) - -# Install the protoc-gen-go tool -$(PROTOC_GEN_GO): $(TOOLS_DIR) GOBIN=$(abspath $(TOOLS_DIR)) go install google.golang.org/protobuf/cmd/protoc-gen-go@latest - echo "Installed protoc-gen-go in $(TOOLS_DIR)" - ls $(TOOLS_DIR) # Web application commands # Run the web application in development mode @@ -102,21 +95,19 @@ docker-build: --build-arg "MESHSTREAM_APP_ENV=$${MESHSTREAM_APP_ENV:-production}" \ --build-arg "MESHSTREAM_SITE_TITLE=$${MESHSTREAM_SITE_TITLE:-Meshstream}" \ --build-arg "MESHSTREAM_SITE_DESCRIPTION=$${MESHSTREAM_SITE_DESCRIPTION:-Meshtastic activity monitoring}" \ - --build-arg "MESHSTREAM_GOOGLE_MAPS_ID=$${MESHSTREAM_GOOGLE_MAPS_ID:-4f089fb2d9fbb3db}" \ - --build-arg "MESHSTREAM_GOOGLE_MAPS_API_KEY=$${MESHSTREAM_GOOGLE_MAPS_API_KEY:-}" \ --load \ -t meshstream \ . 
# Run Docker container with environment variables docker-run: docker-build - docker run -p 8080:8080 \ + docker run -p 5446:5446 \ -e MESHSTREAM_MQTT_BROKER=$${MESHSTREAM_MQTT_BROKER:-mqtt.bayme.sh} \ -e MESHSTREAM_MQTT_USERNAME=$${MESHSTREAM_MQTT_USERNAME:-meshdev} \ -e MESHSTREAM_MQTT_PASSWORD=$${MESHSTREAM_MQTT_PASSWORD:-large4cats} \ -e MESHSTREAM_MQTT_TOPIC_PREFIX=$${MESHSTREAM_MQTT_TOPIC_PREFIX:-msh/US/bayarea} \ -e MESHSTREAM_SERVER_HOST=0.0.0.0 \ - -e MESHSTREAM_SERVER_PORT=$${MESHSTREAM_SERVER_PORT:-8080} \ + -e MESHSTREAM_SERVER_PORT=$${MESHSTREAM_SERVER_PORT:-5446} \ -e MESHSTREAM_STATIC_DIR=/app/static \ -e MESHSTREAM_LOG_LEVEL=$${MESHSTREAM_LOG_LEVEL:-info} \ -e MESHSTREAM_VERBOSE_LOGGING=$${MESHSTREAM_VERBOSE_LOGGING:-false} \ diff --git a/README.md b/README.md index 8add1de..e79fed1 100644 --- a/README.md +++ b/README.md @@ -80,14 +80,15 @@ Meshstream can be configured through environment variables, command-line flags, ### Web UI Configuration (Build-time) -These must be set at build time (via Docker build args): +These must be set at build time (via Docker build args or `web/.env.local`): -| Build Variable | Description | -|----------------|-------------| -| `MESHSTREAM_GOOGLE_MAPS_API_KEY` | Google Maps API key for map visualization | -| `MESHSTREAM_GOOGLE_MAPS_ID` | Google Maps map ID | -| `MESHSTREAM_SITE_TITLE` | Custom site title | -| `MESHSTREAM_SITE_DESCRIPTION` | Custom site description | +| Build Variable | Default | Description | +|----------------|---------|-------------| +| `VITE_SITE_TITLE` | My Mesh | Site title shown in the browser tab | +| `VITE_SITE_DESCRIPTION` | Realtime Meshtastic activity via MQTT. | Meta description | +| `VITE_API_BASE_URL` | _(empty — same origin)_ | API base URL, if serving from a different host | + +Maps use [MapLibre GL JS](https://maplibre.org/) with free CartoDB Dark Matter tiles — no API key required. For complete configuration options, see the Dockerfile and docker-compose.yml. 
diff --git a/decoder/decoder.go b/decoder/decoder.go index d2203f0..337999a 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -67,11 +67,19 @@ func ParseTopic(topic string) (*meshtreampb.TopicInfo, error) { return info, nil } +// maxPayloadBytes caps the encrypted payload size we will attempt to decrypt. +// Meshtastic radio packets are capped at ~256 bytes; this limit blocks malformed +// or oversized messages from consuming CPU on the decode path. +const maxPayloadBytes = 4096 + // DecodeEncodedMessage decodes a binary encoded message (format "e") func DecodeEncodedMessage(payload []byte) (*pb.ServiceEnvelope, error) { + if len(payload) > maxPayloadBytes { + return nil, fmt.Errorf("OVERSIZED_PAYLOAD") + } var serviceEnvelope pb.ServiceEnvelope if err := proto.Unmarshal(payload, &serviceEnvelope); err != nil { - return nil, fmt.Errorf("failed to unmarshal ServiceEnvelope: %v", err) + return nil, fmt.Errorf("DECODE_ERROR") } return &serviceEnvelope, nil } @@ -97,7 +105,7 @@ func DecodeMessage(payload []byte, topicInfo *meshtreampb.TopicInfo) *meshtreamp // Extract mesh packet fields if available packet := envelope.GetPacket() if packet == nil { - data.DecodeError = "no mesh packet in envelope" + data.DecodeError = "NO_PACKET" return data } @@ -123,7 +131,7 @@ func DecodeMessage(payload []byte, topicInfo *meshtreampb.TopicInfo) *meshtreamp // Packet is encrypted, try to decrypt it decodeEncryptedPayload(data, packet.GetEncrypted(), envelope.GetChannelId(), packet.GetId(), packet.GetFrom()) } else { - data.DecodeError = "packet has no payload" + data.DecodeError = "NO_PAYLOAD" } return data @@ -160,7 +168,7 @@ func decodeDataPayload(data *meshtreampb.Data, pbData *pb.Data) { // Position data var position pb.Position if err := proto.Unmarshal(payload, &position); err != nil { - data.DecodeError = fmt.Sprintf("failed to unmarshal Position data: %v", err) + data.DecodeError = "PARSE_ERROR" } else { data.Payload = &meshtreampb.Data_Position{ Position: 
&position, @@ -171,7 +179,7 @@ func decodeDataPayload(data *meshtreampb.Data, pbData *pb.Data) { // Node information var user pb.User if err := proto.Unmarshal(payload, &user); err != nil { - data.DecodeError = fmt.Sprintf("failed to unmarshal User data: %v", err) + data.DecodeError = "PARSE_ERROR" } else { data.Payload = &meshtreampb.Data_NodeInfo{ NodeInfo: &user, @@ -182,7 +190,7 @@ func decodeDataPayload(data *meshtreampb.Data, pbData *pb.Data) { // Telemetry data var telemetry pb.Telemetry if err := proto.Unmarshal(payload, &telemetry); err != nil { - data.DecodeError = fmt.Sprintf("failed to unmarshal Telemetry data: %v", err) + data.DecodeError = "PARSE_ERROR" } else { data.Payload = &meshtreampb.Data_Telemetry{ Telemetry: &telemetry, @@ -193,7 +201,7 @@ func decodeDataPayload(data *meshtreampb.Data, pbData *pb.Data) { // Waypoint data var waypoint pb.Waypoint if err := proto.Unmarshal(payload, &waypoint); err != nil { - data.DecodeError = fmt.Sprintf("failed to unmarshal Waypoint data: %v", err) + data.DecodeError = "PARSE_ERROR" } else { data.Payload = &meshtreampb.Data_Waypoint{ Waypoint: &waypoint, @@ -204,7 +212,7 @@ func decodeDataPayload(data *meshtreampb.Data, pbData *pb.Data) { // Map report data var mapReport pb.MapReport if err := proto.Unmarshal(payload, &mapReport); err != nil { - data.DecodeError = fmt.Sprintf("failed to unmarshal MapReport data: %v", err) + data.DecodeError = "PARSE_ERROR" } else { data.Payload = &meshtreampb.Data_MapReport{ MapReport: &mapReport, @@ -215,7 +223,7 @@ func decodeDataPayload(data *meshtreampb.Data, pbData *pb.Data) { // Traceroute data var routeDiscovery pb.RouteDiscovery if err := proto.Unmarshal(payload, &routeDiscovery); err != nil { - data.DecodeError = fmt.Sprintf("failed to unmarshal RouteDiscovery data: %v", err) + data.DecodeError = "PARSE_ERROR" } else { data.Payload = &meshtreampb.Data_RouteDiscovery{ RouteDiscovery: &routeDiscovery, @@ -226,7 +234,7 @@ func decodeDataPayload(data *meshtreampb.Data, 
pbData *pb.Data) { // Neighbor information data var neighborInfo pb.NeighborInfo if err := proto.Unmarshal(payload, &neighborInfo); err != nil { - data.DecodeError = fmt.Sprintf("failed to unmarshal NeighborInfo data: %v", err) + data.DecodeError = "PARSE_ERROR" } else { data.Payload = &meshtreampb.Data_NeighborInfo{ NeighborInfo: &neighborInfo, @@ -237,7 +245,7 @@ func decodeDataPayload(data *meshtreampb.Data, pbData *pb.Data) { // Remote hardware data var hardware pb.HardwareMessage if err := proto.Unmarshal(payload, &hardware); err != nil { - data.DecodeError = fmt.Sprintf("failed to unmarshal HardwareMessage data: %v", err) + data.DecodeError = "PARSE_ERROR" } else { data.Payload = &meshtreampb.Data_RemoteHardware{ RemoteHardware: &hardware, @@ -248,7 +256,7 @@ func decodeDataPayload(data *meshtreampb.Data, pbData *pb.Data) { // Routing data var routing pb.Routing if err := proto.Unmarshal(payload, &routing); err != nil { - data.DecodeError = fmt.Sprintf("failed to unmarshal Routing data: %v", err) + data.DecodeError = "PARSE_ERROR" } else { data.Payload = &meshtreampb.Data_Routing{ Routing: &routing, @@ -259,7 +267,7 @@ func decodeDataPayload(data *meshtreampb.Data, pbData *pb.Data) { // Admin data var admin pb.AdminMessage if err := proto.Unmarshal(payload, &admin); err != nil { - data.DecodeError = fmt.Sprintf("failed to unmarshal AdminMessage data: %v", err) + data.DecodeError = "PARSE_ERROR" } else { data.Payload = &meshtreampb.Data_Admin{ Admin: &admin, @@ -270,7 +278,7 @@ func decodeDataPayload(data *meshtreampb.Data, pbData *pb.Data) { // Paxcount data var paxcount pb.Paxcount if err := proto.Unmarshal(payload, &paxcount); err != nil { - data.DecodeError = fmt.Sprintf("failed to unmarshal Paxcount data: %v", err) + data.DecodeError = "PARSE_ERROR" } else { data.Payload = &meshtreampb.Data_Paxcounter{ Paxcounter: &paxcount, @@ -289,14 +297,14 @@ func decodeDataPayload(data *meshtreampb.Data, pbData *pb.Data) { func decodeEncryptedPayload(data 
*meshtreampb.Data, encrypted []byte, channelId string, packetId, fromNode uint32) { // Attempt to decrypt the payload using the channel key if channelId == "" { - data.DecodeError = "encrypted packet has no channel ID" + data.DecodeError = "NO_CHANNEL_ID" return } channelKey := GetChannelKey(channelId) decrypted, err := XOR(encrypted, channelKey, packetId, fromNode) if err != nil { - data.DecodeError = fmt.Sprintf("failed to decrypt payload: %v", err) + data.DecodeError = "DECRYPT_FAILED" return } @@ -312,9 +320,9 @@ func decodeEncryptedPayload(data *meshtreampb.Data, encrypted []byte, channelId } else { // Check if this channel is configured - if not, likely a private message if !IsChannelConfigured(channelId) { - data.DecodeError = fmt.Sprintf("PRIVATE_CHANNEL: failed to parse decrypted data on unconfigured channel '%s': %v", channelId, err) + data.DecodeError = "PRIVATE_CHANNEL" } else { - data.DecodeError = fmt.Sprintf("PARSE_ERROR: failed to parse decrypted data: %v", err) + data.DecodeError = "PARSE_ERROR" } data.Payload = &meshtreampb.Data_BinaryData{ BinaryData: decrypted, diff --git a/docker-compose.yml b/docker-compose.yml index 3e76b52..916d796 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,7 +17,7 @@ services: - MESHSTREAM_GOOGLE_MAPS_ID=${MESHSTREAM_GOOGLE_MAPS_ID} - MESHSTREAM_GOOGLE_MAPS_API_KEY=${MESHSTREAM_GOOGLE_MAPS_API_KEY} ports: - - "8080:8080" + - "5446:5446" environment: # Runtime configuration with defaults from .env file or inline defaults # MQTT connection settings @@ -37,7 +37,7 @@ services: # Server configuration - MESHSTREAM_SERVER_HOST=${MESHSTREAM_SERVER_HOST:-0.0.0.0} - - MESHSTREAM_SERVER_PORT=${MESHSTREAM_SERVER_PORT:-8080} + - MESHSTREAM_SERVER_PORT=${MESHSTREAM_SERVER_PORT:-5446} - MESHSTREAM_STATIC_DIR=${MESHSTREAM_STATIC_DIR:-/app/static} # Logging and debugging @@ -49,7 +49,7 @@ services: - MESHSTREAM_CHANNEL_KEYS=${MESHSTREAM_CHANNEL_KEYS:-} restart: unless-stopped healthcheck: - test: ["CMD", "wget", "-q", 
"--spider", "http://localhost:8080/api/status"] + test: ["CMD", "wget", "-q", "--spider", "http://localhost:5446/api/status"] interval: 30s timeout: 5s retries: 3 \ No newline at end of file diff --git a/generated/google/protobuf/descriptor.pb.go b/generated/google/protobuf/descriptor.pb.go index fc84964..ea59f35 100644 --- a/generated/google/protobuf/descriptor.pb.go +++ b/generated/google/protobuf/descriptor.pb.go @@ -39,7 +39,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: google/protobuf/descriptor.proto package descriptorpb diff --git a/generated/meshstream/meshstream.pb.go b/generated/meshstream/meshstream.pb.go index 5e26fac..beda4c3 100644 --- a/generated/meshstream/meshstream.pb.go +++ b/generated/meshstream/meshstream.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshstream/meshstream.proto package meshtreampb diff --git a/generated/meshtastic/admin.pb.go b/generated/meshtastic/admin.pb.go index bd1fb57..bb370c1 100644 --- a/generated/meshtastic/admin.pb.go +++ b/generated/meshtastic/admin.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/admin.proto package meshtastic diff --git a/generated/meshtastic/apponly.pb.go b/generated/meshtastic/apponly.pb.go index ff57be2..72265ce 100644 --- a/generated/meshtastic/apponly.pb.go +++ b/generated/meshtastic/apponly.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/apponly.proto package meshtastic diff --git a/generated/meshtastic/atak.pb.go b/generated/meshtastic/atak.pb.go index 7594c46..47466bb 100644 --- a/generated/meshtastic/atak.pb.go +++ b/generated/meshtastic/atak.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/atak.proto package meshtastic diff --git a/generated/meshtastic/cannedmessages.pb.go b/generated/meshtastic/cannedmessages.pb.go index 449ad0c..b2a025c 100644 --- a/generated/meshtastic/cannedmessages.pb.go +++ b/generated/meshtastic/cannedmessages.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/cannedmessages.proto package meshtastic diff --git a/generated/meshtastic/channel.pb.go b/generated/meshtastic/channel.pb.go index 8cf3268..93ebc54 100644 --- a/generated/meshtastic/channel.pb.go +++ b/generated/meshtastic/channel.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/channel.proto package meshtastic diff --git a/generated/meshtastic/clientonly.pb.go b/generated/meshtastic/clientonly.pb.go index f94f41e..dfc261a 100644 --- a/generated/meshtastic/clientonly.pb.go +++ b/generated/meshtastic/clientonly.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/clientonly.proto package meshtastic diff --git a/generated/meshtastic/config.pb.go b/generated/meshtastic/config.pb.go index d5bae3a..b404ea3 100644 --- a/generated/meshtastic/config.pb.go +++ b/generated/meshtastic/config.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. 
DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/config.proto package meshtastic diff --git a/generated/meshtastic/connection_status.pb.go b/generated/meshtastic/connection_status.pb.go index 359abfd..dfdcf97 100644 --- a/generated/meshtastic/connection_status.pb.go +++ b/generated/meshtastic/connection_status.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/connection_status.proto package meshtastic diff --git a/generated/meshtastic/device_ui.pb.go b/generated/meshtastic/device_ui.pb.go index 24301d5..1a4c72c 100644 --- a/generated/meshtastic/device_ui.pb.go +++ b/generated/meshtastic/device_ui.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/device_ui.proto package meshtastic diff --git a/generated/meshtastic/deviceonly.pb.go b/generated/meshtastic/deviceonly.pb.go index f9b420e..fcbd71d 100644 --- a/generated/meshtastic/deviceonly.pb.go +++ b/generated/meshtastic/deviceonly.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/deviceonly.proto package meshtastic diff --git a/generated/meshtastic/interdevice.pb.go b/generated/meshtastic/interdevice.pb.go index c83ab6f..0857091 100644 --- a/generated/meshtastic/interdevice.pb.go +++ b/generated/meshtastic/interdevice.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/interdevice.proto package meshtastic diff --git a/generated/meshtastic/localonly.pb.go b/generated/meshtastic/localonly.pb.go index 7ff62a3..6063a61 100644 --- a/generated/meshtastic/localonly.pb.go +++ b/generated/meshtastic/localonly.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/localonly.proto package meshtastic diff --git a/generated/meshtastic/mesh.pb.go b/generated/meshtastic/mesh.pb.go index c87eb87..a3ca02b 100644 --- a/generated/meshtastic/mesh.pb.go +++ b/generated/meshtastic/mesh.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/mesh.proto package meshtastic diff --git a/generated/meshtastic/module_config.pb.go b/generated/meshtastic/module_config.pb.go index 91cc83b..19ada24 100644 --- a/generated/meshtastic/module_config.pb.go +++ b/generated/meshtastic/module_config.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/module_config.proto package meshtastic diff --git a/generated/meshtastic/mqtt.pb.go b/generated/meshtastic/mqtt.pb.go index 63ac9b7..a69018c 100644 --- a/generated/meshtastic/mqtt.pb.go +++ b/generated/meshtastic/mqtt.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/mqtt.proto package meshtastic diff --git a/generated/meshtastic/nanopb.pb.go b/generated/meshtastic/nanopb.pb.go index 8053e21..82046ea 100644 --- a/generated/meshtastic/nanopb.pb.go +++ b/generated/meshtastic/nanopb.pb.go @@ -8,7 +8,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/nanopb.proto package meshtastic diff --git a/generated/meshtastic/paxcount.pb.go b/generated/meshtastic/paxcount.pb.go index 969e59c..f7ba7e1 100644 --- a/generated/meshtastic/paxcount.pb.go +++ b/generated/meshtastic/paxcount.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/paxcount.proto package meshtastic diff --git a/generated/meshtastic/portnums.pb.go b/generated/meshtastic/portnums.pb.go index 1a3ad0d..8cbaaf7 100644 --- a/generated/meshtastic/portnums.pb.go +++ b/generated/meshtastic/portnums.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/portnums.proto package meshtastic diff --git a/generated/meshtastic/powermon.pb.go b/generated/meshtastic/powermon.pb.go index b500dfa..96bbfc0 100644 --- a/generated/meshtastic/powermon.pb.go +++ b/generated/meshtastic/powermon.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/powermon.proto package meshtastic diff --git a/generated/meshtastic/remote_hardware.pb.go b/generated/meshtastic/remote_hardware.pb.go index b7650ed..562dc5c 100644 --- a/generated/meshtastic/remote_hardware.pb.go +++ b/generated/meshtastic/remote_hardware.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/remote_hardware.proto package meshtastic diff --git a/generated/meshtastic/rtttl.pb.go b/generated/meshtastic/rtttl.pb.go index 6d3e823..1e19cfe 100644 --- a/generated/meshtastic/rtttl.pb.go +++ b/generated/meshtastic/rtttl.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/rtttl.proto package meshtastic diff --git a/generated/meshtastic/storeforward.pb.go b/generated/meshtastic/storeforward.pb.go index 41d6702..6aec5c9 100644 --- a/generated/meshtastic/storeforward.pb.go +++ b/generated/meshtastic/storeforward.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/storeforward.proto package meshtastic diff --git a/generated/meshtastic/telemetry.pb.go b/generated/meshtastic/telemetry.pb.go index e5414a1..20bf631 100644 --- a/generated/meshtastic/telemetry.pb.go +++ b/generated/meshtastic/telemetry.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/telemetry.proto package meshtastic diff --git a/generated/meshtastic/xmodem.pb.go b/generated/meshtastic/xmodem.pb.go index 615fe40..685df2c 100644 --- a/generated/meshtastic/xmodem.pb.go +++ b/generated/meshtastic/xmodem.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: // protoc-gen-go v1.36.11 -// protoc v4.25.1 +// protoc v5.29.3 // source: meshtastic/xmodem.proto package meshtastic diff --git a/go.sum b/go.sum index 12eb04c..fe433c2 100644 --- a/go.sum +++ b/go.sum @@ -10,12 +10,6 @@ github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6N github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dpup/logista v1.0.7 h1:TTlwktow4qw2C1p454wQt1a/WoXJGMd+DstF/+cap+s= -github.com/dpup/logista v1.0.7/go.mod h1:B8txXLc5xuzFilYllCZbh+jBOqACXmjjRan0mwtog8U= -github.com/dpup/logista v1.0.8 h1:aP90nYVITQdKkP9JDpoz8EslMl+IUQLbq3sI2aCen+4= -github.com/dpup/logista v1.0.8/go.mod h1:B8txXLc5xuzFilYllCZbh+jBOqACXmjjRan0mwtog8U= -github.com/dpup/logista v1.0.9 h1:0jAyW5IW9rjqrz+5M81RhGEvEIa/Ua5++EiQ/7CJn9M= -github.com/dpup/logista v1.0.9/go.mod h1:B8txXLc5xuzFilYllCZbh+jBOqACXmjjRan0mwtog8U= github.com/dpup/logista v1.0.10 h1:d4zi4/wXhC6trcYLMyw47VK+8A6ELlfIDq/DoKXnEJ0= github.com/dpup/logista v1.0.10/go.mod h1:B8txXLc5xuzFilYllCZbh+jBOqACXmjjRan0mwtog8U= github.com/dpup/prefab v0.2.0 h1:g9SB58vTX0DO+U74occ0WR5xGEY0HXg6BillxnVTDJU= diff --git a/main.go b/main.go index 94224c4..635e7d6 100644 --- a/main.go +++ b/main.go @@ -40,9 +40,10 @@ type Config struct { ChannelKeys []string // Statistics configuration - StatsInterval time.Duration - CacheSize int - VerboseLogging bool + StatsInterval time.Duration + CacheSize int + CacheRetention time.Duration + VerboseLogging bool } // getEnv retrieves an environment variable with the given prefix or returns the default value @@ -85,14 +86,15 @@ func parseConfig() *Config { // Web server configuration flag.StringVar(&config.ServerHost, "server-host", getEnv("SERVER_HOST", "localhost"), "Web server host") - flag.StringVar(&config.ServerPort, "server-port", 
getEnv("SERVER_PORT", "8080"), "Web server port") + flag.StringVar(&config.ServerPort, "server-port", getEnv("SERVER_PORT", "5446"), "Web server port") flag.StringVar(&config.StaticDir, "static-dir", getEnv("STATIC_DIR", "./server/static"), "Directory containing static web files") // Channel key configuration (comma separated list of name:key pairs) channelKeysDefault := getEnv("CHANNEL_KEYS", "LongFast:"+decoder.DefaultPrivateKey) channelKeysFlag := flag.String("channel-keys", channelKeysDefault, "Comma-separated list of channel:key pairs for encrypted channels") - flag.IntVar(&config.CacheSize, "cache-size", intFromEnv("CACHE_SIZE", 200), "Number of packets to cache for new subscribers") + flag.IntVar(&config.CacheSize, "cache-size", intFromEnv("CACHE_SIZE", 5000), "Maximum number of packets to retain in the cache") + flag.DurationVar(&config.CacheRetention, "cache-retention", durationFromEnv("CACHE_RETENTION", 3*time.Hour), "How long to retain a node's packets after its last activity") flag.BoolVar(&config.VerboseLogging, "verbose", boolFromEnv("VERBOSE_LOGGING", false), "Enable verbose message logging") flag.Parse() @@ -208,8 +210,8 @@ func main() { // Create a message broker to distribute messages to multiple consumers // Cache packets for new subscribers based on configuration - broker := mqtt.NewBroker(messagesChan, config.CacheSize, logger) - logger.Infof("Message broker initialized with cache size: %d", config.CacheSize) + broker := mqtt.NewBroker(messagesChan, config.CacheSize, config.CacheRetention, logger) + logger.Infof("Message broker initialized with cache size: %d, retention: %s", config.CacheSize, config.CacheRetention) // Create a message logger that subscribes to the broker // and also logs to stdout diff --git a/moat.yaml b/moat.yaml index 11917b3..0dc1da4 100644 --- a/moat.yaml +++ b/moat.yaml @@ -15,12 +15,12 @@ env: MESHSTREAM_MQTT_PASSWORD: large4cats MESHSTREAM_MQTT_TOPIC_PREFIX: msh/US/bayarea MESHSTREAM_SERVER_HOST: "0.0.0.0" - 
MESHSTREAM_SERVER_PORT: "8080" + MESHSTREAM_SERVER_PORT: "5446" MESHSTREAM_LOG_LEVEL: info MESHSTREAM_VERBOSE_LOGGING: "false" MESHSTREAM_CACHE_SIZE: "1000" hooks: pre_run: "cd /workspace/web && pnpm install" ports: - web: 8080 + web: 5446 runtime: docker diff --git a/mqtt/broker.go b/mqtt/broker.go index 0b99222..c628078 100644 --- a/mqtt/broker.go +++ b/mqtt/broker.go @@ -5,124 +5,284 @@ import ( "time" meshtreampb "meshstream/generated/meshstream" + pb "meshstream/generated/meshtastic" "github.com/dpup/prefab/logging" ) -// Time to wait before giving up on sending cached packets +// Time to wait before giving up on sending cached packets to a new subscriber. var cacheGracePeriod = 1500 * time.Millisecond -// CircularBuffer implements a fixed-size circular buffer for caching packets -type CircularBuffer struct { - buffer []*meshtreampb.Packet // Fixed size buffer to store packets - size int // Size of the buffer - next int // Index where the next packet will be stored - count int // Number of packets currently in the buffer - mutex sync.RWMutex // Lock for thread-safe access +// minEvictAge is the minimum age a packet must reach before it is eligible for +// priority-based eviction. Recent traffic is never evicted; only historical +// data competes under cache pressure. +var minEvictAge = time.Hour + +// typePriority defines the eviction priority for each packet type. +// Higher values indicate greater importance: under cache pressure the oldest +// eligible packet of the lowest-priority type is evicted first, so rare +// high-value types (neighbour-info, chat) outlive frequent low-value types +// (node-info, telemetry) in the historical window. +// +// Packet types not listed use defaultTypePriority. 
+var typePriority = map[pb.PortNum]int{ + pb.PortNum_TEXT_MESSAGE_APP: 5, // chat — preserve history + pb.PortNum_TEXT_MESSAGE_COMPRESSED_APP: 5, + pb.PortNum_NEIGHBORINFO_APP: 4, // rare; protect from eviction + pb.PortNum_TRACEROUTE_APP: 3, + pb.PortNum_POSITION_APP: 3, + pb.PortNum_NODEINFO_APP: 2, // frequent; lower priority + pb.PortNum_TELEMETRY_APP: 2, + pb.PortNum_ROUTING_APP: 2, + pb.PortNum_MAP_REPORT_APP: 2, } -// NewCircularBuffer creates a new circular buffer with the given size -func NewCircularBuffer(size int) *CircularBuffer { - return &CircularBuffer{ - buffer: make([]*meshtreampb.Packet, size), - size: size, - next: 0, - count: 0, +// defaultTypePriority applies to any port type not listed above. +const defaultTypePriority = 1 + +// packetPriority returns the eviction priority for p. +func packetPriority(p *meshtreampb.Packet) int { + if pri, ok := typePriority[p.GetData().GetPortNum()]; ok { + return pri + } + return defaultTypePriority +} + +// entry wraps a packet with its cache insertion timestamp. +type entry struct { + pkt *meshtreampb.Packet + insertedAt int64 // unix timestamp when this packet was added to the cache +} + +// NodeAwareCache stores packets with two eviction axes: +// +// 1. Age protection: packets younger than minEvictAge are never evicted. +// This keeps all recent traffic intact regardless of pressure. +// +// 2. Priority-based eviction for historical data: when the global cap is hit +// and old packets must be removed, the cache evicts from the lowest-priority +// type first. Within that tier it picks the packet whose source node was +// most recently active — a Bélády approximation: the node that sent recently +// is most likely to resend, so its old packet is cheapest to lose. Silent +// nodes' (flaky/distant) historical packets are thus protected. +// +// Node retention: once a node has been silent for [retention], its packets are +// excluded from GetAll and proactively pruned when the cache is under pressure. 
+// +// Packets with from=0 (no identified source node) are always included in GetAll. +// They are never associated with a node's send rate, so they survive eviction +// longer than packets from chatty nodes within the same priority tier. +type NodeAwareCache struct { + mu sync.Mutex + entries []entry + nodeLastSeen map[uint32]int64 // nodeID → unix timestamp of most recent packet + maxSize int // global safety cap + retention time.Duration + nowFunc func() time.Time // injectable for testing +} + +// NewNodeAwareCache creates a cache with the given safety cap and node +// retention window. +func NewNodeAwareCache(maxSize int, retention time.Duration) *NodeAwareCache { + return &NodeAwareCache{ + entries: make([]entry, 0, min(maxSize, 256)), + nodeLastSeen: make(map[uint32]int64), + maxSize: maxSize, + retention: retention, + nowFunc: time.Now, } } -// Add adds a packet to the circular buffer -func (cb *CircularBuffer) Add(packet *meshtreampb.Packet) { - cb.mutex.Lock() - defer cb.mutex.Unlock() +// Add records a packet. Recent packets (younger than minEvictAge) are never +// evicted. When the global cap is hit, stale-node packets are pruned first; +// if still over the limit, the best historical eviction candidate is removed. 
+func (c *NodeAwareCache) Add(packet *meshtreampb.Packet) { + c.mu.Lock() + defer c.mu.Unlock() - cb.buffer[cb.next] = packet - cb.next = (cb.next + 1) % cb.size + nodeID := packet.GetData().GetFrom() + nowUnix := c.nowFunc().Unix() - // Update count if we haven't filled the buffer yet - if cb.count < cb.size { - cb.count++ + if nodeID != 0 { + c.nodeLastSeen[nodeID] = nowUnix + } + + c.entries = append(c.entries, entry{pkt: packet, insertedAt: nowUnix}) + + if len(c.entries) > c.maxSize { + c.pruneStale(nowUnix) + if len(c.entries) > c.maxSize { + c.evict(nowUnix) + } } } -// GetAll returns all packets in the buffer in chronological order -func (cb *CircularBuffer) GetAll() []*meshtreampb.Packet { - cb.mutex.RLock() - defer cb.mutex.RUnlock() +// GetAll returns all cached packets whose source node was active within the +// retention window. Packets with no source node (from=0) are always included. +// Returned in arrival order. +func (c *NodeAwareCache) GetAll() []*meshtreampb.Packet { + c.mu.Lock() + defer c.mu.Unlock() - if cb.count == 0 { + if len(c.entries) == 0 { return []*meshtreampb.Packet{} } - result := make([]*meshtreampb.Packet, cb.count) + cutoff := c.nowFunc().Unix() - int64(c.retention.Seconds()) - // If buffer isn't full yet, just copy from start to next - if cb.count < cb.size { - copy(result, cb.buffer[:cb.count]) - return result + activeNodes := make(map[uint32]bool, len(c.nodeLastSeen)) + for nodeID, lastSeen := range c.nodeLastSeen { + if lastSeen >= cutoff { + activeNodes[nodeID] = true + } } - // If buffer is full, we need to handle the wrap-around - // First copy from next to end - firstPartLength := cb.size - cb.next - if firstPartLength > 0 { - copy(result, cb.buffer[cb.next:]) + result := make([]*meshtreampb.Packet, 0, len(c.entries)) + for _, e := range c.entries { + nodeID := e.pkt.GetData().GetFrom() + if nodeID == 0 || activeNodes[nodeID] { + result = append(result, e.pkt) + } } - - // Then copy from start to next - if cb.next > 0 { - 
+//	Packets with from=0 have nodeLastSeen=0 and are thus the last resort within
+// +// Returns -1 if no qualifying entries exist. +func (c *NodeAwareCache) pickEvictTarget(ageThreshold int64) int { + // First pass: find minimum priority among qualifying entries. + minPri := -1 + for _, e := range c.entries { + if ageThreshold >= 0 && e.insertedAt > ageThreshold { + continue + } + if pri := packetPriority(e.pkt); minPri < 0 || pri < minPri { + minPri = pri + } + } + if minPri < 0 { + return -1 // no qualifying entries + } + + // Second pass: among qualifying entries at minPri, pick the one from the + // most recently active source node (highest nodeLastSeen). + bestIdx := -1 + bestLastSeen := int64(-1) + for i, e := range c.entries { + if ageThreshold >= 0 && e.insertedAt > ageThreshold { + continue + } + if packetPriority(e.pkt) != minPri { + continue + } + nodeID := e.pkt.GetData().GetFrom() + lastSeen := c.nodeLastSeen[nodeID] // 0 for from=0 packets + if bestIdx < 0 || lastSeen > bestLastSeen { + bestIdx = i + bestLastSeen = lastSeen + } + } + return bestIdx +} + +// pruneStale removes all packets from nodes that haven't been heard within the +// retention window, and cleans up their tracking entries. +// Must be called with c.mu held. +func (c *NodeAwareCache) pruneStale(nowUnix int64) { + cutoff := nowUnix - int64(c.retention.Seconds()) + + stale := make(map[uint32]bool) + for nodeID, lastSeen := range c.nodeLastSeen { + if lastSeen < cutoff { + stale[nodeID] = true + delete(c.nodeLastSeen, nodeID) + } + } + if len(stale) == 0 { + return + } + + // Filter entries in-place. + out := c.entries[:0] + for _, e := range c.entries { + if nodeID := e.pkt.GetData().GetFrom(); nodeID == 0 || !stale[nodeID] { + out = append(out, e) + } + } + c.entries = out +} + +// ── Broker ──────────────────────────────────────────────────────────────────── + +// Broker distributes messages from a source channel to multiple subscriber channels. 
+type Broker struct { + sourceChan <-chan *meshtreampb.Packet + subscribers map[chan *meshtreampb.Packet]struct{} + subscriberMutex sync.RWMutex + done chan struct{} + wg sync.WaitGroup + logger logging.Logger + cache *NodeAwareCache +} + +// NewBroker creates a new broker. cacheSize is the global safety cap on total +// retained packets; retention controls per-node eviction after silence. +func NewBroker(sourceChannel <-chan *meshtreampb.Packet, cacheSize int, retention time.Duration, logger logging.Logger) *Broker { broker := &Broker{ sourceChan: sourceChannel, subscribers: make(map[chan *meshtreampb.Packet]struct{}), done: make(chan struct{}), logger: logger.Named("mqtt.broker"), - cache: NewCircularBuffer(cacheSize), + cache: NewNodeAwareCache(cacheSize, retention), } - // Start the dispatch loop broker.wg.Add(1) go broker.dispatchLoop() return broker } -// Subscribe creates and returns a new subscriber channel -// The bufferSize parameter controls how many messages can be buffered in the channel +// Subscribe creates and returns a new subscriber channel. The subscriber +// immediately receives all currently cached packets. 
func (b *Broker) Subscribe(bufferSize int) <-chan *meshtreampb.Packet { - // Create a new channel for this subscriber subscriberChan := make(chan *meshtreampb.Packet, bufferSize) - // Register the new subscriber b.subscriberMutex.Lock() b.subscribers[subscriberChan] = struct{}{} b.subscriberMutex.Unlock() - // Send cached packets to the new subscriber cachedPackets := b.cache.GetAll() if len(cachedPackets) > 0 { go func() { defer func() { if r := recover(); r != nil { - // This can happen if the channel was closed while we were sending b.logger.Warn("Recovered from panic when sending cached packets, channel likely closed") } }() @@ -130,9 +290,7 @@ func (b *Broker) Subscribe(bufferSize int) <-chan *meshtreampb.Packet { for _, packet := range cachedPackets { select { case subscriberChan <- packet: - // Successfully sent packet case <-time.After(cacheGracePeriod): - // Give up after waiting some time to avoid blocking indefinitely b.logger.Warn("Timeout when sending cached packet to new subscriber") return } @@ -140,16 +298,14 @@ func (b *Broker) Subscribe(bufferSize int) <-chan *meshtreampb.Packet { }() } - // Return the channel return subscriberChan } -// Unsubscribe removes a subscriber and closes its channel +// Unsubscribe removes a subscriber and closes its channel. func (b *Broker) Unsubscribe(ch <-chan *meshtreampb.Packet) { b.subscriberMutex.Lock() defer b.subscriberMutex.Unlock() - // Find the channel in our subscribers map for subCh := range b.subscribers { if subCh == ch { delete(b.subscribers, subCh) @@ -158,19 +314,14 @@ func (b *Broker) Unsubscribe(ch <-chan *meshtreampb.Packet) { } } - // If we get here, the channel wasn't found b.logger.Warn("Subscriber channel not found - cannot unsubscribe") } -// Close shuts down the broker and closes all subscriber channels +// Close shuts down the broker and closes all subscriber channels. 
func (b *Broker) Close() { - // Signal the dispatch loop to stop close(b.done) - - // Wait for the dispatch loop to exit b.wg.Wait() - // Close all subscriber channels b.subscriberMutex.Lock() defer b.subscriberMutex.Unlock() @@ -180,36 +331,32 @@ func (b *Broker) Close() { b.subscribers = make(map[chan *meshtreampb.Packet]struct{}) } -// dispatchLoop continuously reads from the source channel and distributes to subscribers +// dispatchLoop continuously reads from the source channel and distributes to subscribers. func (b *Broker) dispatchLoop() { defer b.wg.Done() for { select { case <-b.done: - // Broker is shutting down return case packet, ok := <-b.sourceChan: if !ok { - // Source channel has been closed, shut down the broker + // Source channel has been closed — run Close in a goroutine to avoid + // deadlocking (Close calls wg.Wait, but we are the goroutine in the wg). b.logger.Info("Source channel closed, shutting down broker") - b.Close() + go b.Close() return } - // Add packet to the cache b.cache.Add(packet) - - // Distribute the packet to all subscribers b.broadcast(packet) } } } -// broadcast sends a packet to all active subscribers without blocking +// broadcast sends a packet to all active subscribers without blocking. 
func (b *Broker) broadcast(packet *meshtreampb.Packet) { - // Take a read lock to get a snapshot of the subscribers b.subscriberMutex.RLock() subscribers := make([]chan *meshtreampb.Packet, 0, len(b.subscribers)) for ch := range b.subscribers { @@ -217,23 +364,17 @@ func (b *Broker) broadcast(packet *meshtreampb.Packet) { } b.subscriberMutex.RUnlock() - // Distribute to all subscribers for _, ch := range subscribers { - // Use a goroutine and recover to ensure sending to a closed channel doesn't panic go func(ch chan *meshtreampb.Packet) { defer func() { if r := recover(); r != nil { - // This can happen if the channel was closed after we took a snapshot b.logger.Warn("Recovered from panic in broadcast, channel likely closed") } }() - // Try to send without blocking select { case ch <- packet: - // Message delivered successfully default: - // Channel buffer is full, log warning and drop the message b.logger.Warn("Subscriber buffer full, dropping message") } }(ch) diff --git a/mqtt/broker_test.go b/mqtt/broker_test.go index 224b4f8..6babda5 100644 --- a/mqtt/broker_test.go +++ b/mqtt/broker_test.go @@ -1,6 +1,7 @@ package mqtt import ( + "os" "sync" "testing" "time" @@ -8,481 +9,555 @@ import ( "github.com/dpup/prefab/logging" meshtreampb "meshstream/generated/meshstream" + pb "meshstream/generated/meshtastic" ) -// TestCircularBuffer tests the circular buffer implementation -func TestCircularBuffer(t *testing.T) { - // Create a circular buffer with size 3 - buffer := NewCircularBuffer(3) +// TestMain disables age protection globally so pressure tests run without +// needing to advance mock clocks. Individual tests that specifically cover +// age-based behaviour set minEvictAge themselves. 
+func TestMain(m *testing.M) { + minEvictAge = 0 + os.Exit(m.Run()) +} - // Test empty buffer returns empty slice - packets := buffer.GetAll() - if len(packets) != 0 { - t.Errorf("Expected empty buffer to return empty slice, got %d items", len(packets)) +// pkt is a test helper that builds a Packet with the given ID, from-node, and port. +func pkt(id, from uint32, port pb.PortNum) *meshtreampb.Packet { + return &meshtreampb.Packet{ + Data: &meshtreampb.Data{Id: id, From: from, PortNum: port}, + Info: &meshtreampb.TopicInfo{}, } +} - // Add 3 packets and verify count - for i := 1; i <= 3; i++ { - packet := &meshtreampb.Packet{ - Data: &meshtreampb.Data{Id: uint32(i)}, - Info: &meshtreampb.TopicInfo{}, - } - buffer.Add(packet) +// ── NodeAwareCache unit tests ────────────────────────────────────────────────── + +func TestNodeAwareCacheEmpty(t *testing.T) { + c := NewNodeAwareCache(100, time.Hour) + if got := c.GetAll(); len(got) != 0 { + t.Errorf("expected empty, got %d packets", len(got)) } +} - // Check that buffer has 3 packets - packets = buffer.GetAll() - if len(packets) != 3 { - t.Errorf("Expected buffer to have 3 packets, got %d", len(packets)) +func TestNodeAwareCacheBasicOrdering(t *testing.T) { + c := NewNodeAwareCache(100, time.Hour) + c.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP)) + c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) + c.Add(pkt(3, 3, pb.PortNum_NODEINFO_APP)) + + got := c.GetAll() + if len(got) != 3 { + t.Fatalf("expected 3, got %d", len(got)) } - - // Verify packets are in order - for i, packet := range packets { - expected := uint32(i + 1) - if packet.Data.Id != expected { - t.Errorf("Expected packet %d to have ID %d, got %d", i, expected, packet.Data.Id) - } - } - - // Add 2 more packets to test wrap-around - for i := 4; i <= 5; i++ { - packet := &meshtreampb.Packet{ - Data: &meshtreampb.Data{Id: uint32(i)}, - Info: &meshtreampb.TopicInfo{}, - } - buffer.Add(packet) - } - - // Check that buffer still has 3 packets (maxed out) - packets = 
buffer.GetAll() - if len(packets) != 3 { - t.Errorf("Expected buffer to have 3 packets after overflow, got %d", len(packets)) - } - - // Verify packets are the latest ones in order (3, 4, 5) - for i, packet := range packets { - expected := uint32(i + 3) - if packet.Data.Id != expected { - t.Errorf("Expected packet %d to have ID %d, got %d", i, expected, packet.Data.Id) + for i, p := range got { + if p.Data.Id != uint32(i+1) { + t.Errorf("pos %d: want ID %d, got %d", i, i+1, p.Data.Id) } } } -// TestBrokerSubscribeUnsubscribe tests the basic subscribe and unsubscribe functionality -func TestBrokerSubscribeUnsubscribe(t *testing.T) { - // Create a test source channel - sourceChan := make(chan *meshtreampb.Packet, 10) +// TestPressureEvictsOldestOfLowestPriority verifies that under cap pressure, the +// oldest packet of the lowest-priority type is evicted while higher-priority +// types are preserved. +func TestPressureEvictsOldestOfLowestPriority(t *testing.T) { + c := NewNodeAwareCache(3, time.Hour) - // Create a broker with the source channel - testLogger := logging.NewDevLogger().Named("test") - broker := NewBroker(sourceChan, 5, testLogger) + c.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP)) // priority 2 + c.Add(pkt(2, 1, pb.PortNum_NEIGHBORINFO_APP)) // priority 4 + c.Add(pkt(3, 1, pb.PortNum_NODEINFO_APP)) // priority 2 — at cap + c.Add(pkt(4, 1, pb.PortNum_NODEINFO_APP)) // pressure: evicts oldest pri=2 (ID 1) + + got := c.GetAll() + if len(got) != 3 { + t.Fatalf("expected 3 (global cap), got %d: %v", len(got), ids(got)) + } + + wantIDs := []uint32{2, 3, 4} + for i, p := range got { + if p.Data.Id != wantIDs[i] { + t.Errorf("pos %d: want ID %d, got %d", i, wantIDs[i], p.Data.Id) + } + } +} + +// TestRareTypeOutlastsFrequentType is the core scenario: a node sends many +// node-info packets and one neighbor-info packet. Under cache pressure, the +// neighbor-info (higher priority) should survive while old node-infos are evicted. 
+func TestRareTypeOutlastsFrequentType(t *testing.T) { + // Cap=5 creates pressure when we add 13 packets. + c := NewNodeAwareCache(5, time.Hour) + + neighborID := uint32(50) + c.Add(pkt(neighborID, 1, pb.PortNum_NEIGHBORINFO_APP)) // priority 4 + for i := uint32(1); i <= 12; i++ { + c.Add(pkt(i, 1, pb.PortNum_NODEINFO_APP)) // priority 2 + } + + got := c.GetAll() + + var nodeInfoCount, neighborCount int + for _, p := range got { + switch p.Data.PortNum { + case pb.PortNum_NODEINFO_APP: + nodeInfoCount++ + case pb.PortNum_NEIGHBORINFO_APP: + neighborCount++ + } + } + + if len(got) != 5 { + t.Errorf("expected 5 packets (cap), got %d: %v", len(got), ids(got)) + } + if neighborCount != 1 { + t.Errorf("expected neighbor-info to survive, got %d", neighborCount) + } + if nodeInfoCount != 4 { + t.Errorf("expected 4 node-infos (cap-1), got %d", nodeInfoCount) + } +} + +// TestPressureIsGlobal verifies that eviction competes globally across all nodes: +// when the cap is hit, the oldest packet of the lowest priority is removed +// regardless of which node it came from. +func TestPressureIsGlobal(t *testing.T) { + c := NewNodeAwareCache(3, time.Hour) + + c.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP)) // node 1 + c.Add(pkt(2, 2, pb.PortNum_NODEINFO_APP)) // node 2 + c.Add(pkt(3, 1, pb.PortNum_NODEINFO_APP)) // node 1 — at cap + c.Add(pkt(4, 2, pb.PortNum_NODEINFO_APP)) // node 2 — evicts globally oldest = ID 1 + + got := c.GetAll() + if len(got) != 3 { + t.Fatalf("expected 3, got %d: %v", len(got), ids(got)) + } + if got[0].Data.Id != 2 { + t.Errorf("expected oldest surviving ID 2, got %d", got[0].Data.Id) + } +} + +// TestBeladyApproximation verifies the node-aware eviction: among same-priority +// old packets, the one from the most recently active node is evicted first +// (it is most likely to resend, making its old packet cheapest to lose). +// A flaky node's old packet therefore outlives a reliable node's old packet. 
+// TestMinAgeProtectsRecentPackets verifies that the global cap is still enforced
+// when every entry is younger than minEvictAge, via the all-entries fallback.
+func TestMinAgeProtectsRecentPackets(t *testing.T) { + minEvictAge = time.Hour + t.Cleanup(func() { minEvictAge = 0 }) + + now := time.Unix(0, 0) + c := NewNodeAwareCache(3, 24*time.Hour) + c.nowFunc = func() time.Time { return now } + + // Three packets at T=0 fill the cache. + for i := uint32(1); i <= 3; i++ { + c.Add(pkt(i, i, pb.PortNum_NODEINFO_APP)) + } + + // T=30m: add a fourth packet. All are < 1h old — none are eligible for + // priority eviction. The cap must still be respected, so one is evicted + // via the fallback (all-entries pickEvictTarget). + now = time.Unix(int64(30*time.Minute/time.Second), 0) + c.Add(pkt(4, 4, pb.PortNum_NODEINFO_APP)) + + got := c.GetAll() + if len(got) != 3 { + t.Fatalf("expected cap=3 enforced even with all-recent entries, got %d", len(got)) + } +} + +// TestRetentionEvictsStaleNode verifies whole-node eviction after silence. +func TestRetentionEvictsStaleNode(t *testing.T) { + now := time.Now() + c := NewNodeAwareCache(1000, 3*time.Hour) + c.nowFunc = func() time.Time { return now } + + c.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP)) + c.Add(pkt(2, 1, pb.PortNum_NODEINFO_APP)) + + // Node 2 is active after node 1 has gone silent. + c.nowFunc = func() time.Time { return now.Add(1 * time.Minute) } + c.Add(pkt(3, 2, pb.PortNum_NODEINFO_APP)) + + // Past node 1's retention window. + c.nowFunc = func() time.Time { return now.Add(3*time.Hour + 1*time.Second) } + + got := c.GetAll() + if len(got) != 1 || got[0].Data.Id != 3 { + t.Errorf("expected only node 2's packet (ID 3), got %v", ids(got)) + } +} + +// TestRetentionExtendsOnNewPacket verifies that a new packet resets the retention clock. +func TestRetentionExtendsOnNewPacket(t *testing.T) { + now := time.Now() + c := NewNodeAwareCache(1000, 3*time.Hour) + c.nowFunc = func() time.Time { return now } + + c.Add(pkt(1, 1, pb.PortNum_NODEINFO_APP)) + + // Node 1 sends again at t=2h, refreshing its window. 
+ c.nowFunc = func() time.Time { return now.Add(2 * time.Hour) } + c.Add(pkt(2, 1, pb.PortNum_NODEINFO_APP)) + + // t=3h+1s — past first packet's original window, but node refreshed at 2h. + c.nowFunc = func() time.Time { return now.Add(3*time.Hour + 1*time.Second) } + + got := c.GetAll() + if len(got) != 2 { + t.Errorf("expected both packets retained (node still active), got %d", len(got)) + } +} + +// TestNoSourcePacketsAlwaysIncluded verifies from=0 packets bypass node retention. +func TestNoSourcePacketsAlwaysIncluded(t *testing.T) { + now := time.Now() + c := NewNodeAwareCache(1000, time.Hour) + c.nowFunc = func() time.Time { return now } + + c.Add(pkt(1, 0, pb.PortNum_NODEINFO_APP)) + + c.nowFunc = func() time.Time { return now.Add(48 * time.Hour) } + + got := c.GetAll() + if len(got) != 1 { + t.Errorf("expected from=0 packet to always be included, got %d", len(got)) + } +} + +// TestGlobalCapSamePriorityFIFO verifies that when all packets share the same +// priority, the oldest is evicted first (FIFO behaviour within a priority tier). +func TestGlobalCapSamePriorityFIFO(t *testing.T) { + c := NewNodeAwareCache(3, time.Hour) + + // All from=0 packets of the same type — same priority, pure FIFO. + c.Add(pkt(1, 0, pb.PortNum_NODEINFO_APP)) + c.Add(pkt(2, 0, pb.PortNum_NODEINFO_APP)) + c.Add(pkt(3, 0, pb.PortNum_NODEINFO_APP)) + c.Add(pkt(4, 0, pb.PortNum_NODEINFO_APP)) // should push out ID 1 + + got := c.GetAll() + if len(got) != 3 { + t.Fatalf("expected 3 (global cap), got %d", len(got)) + } + if got[0].Data.Id != 2 { + t.Errorf("expected oldest surviving ID 2, got %d", got[0].Data.Id) + } +} + +// ids extracts packet IDs for readable failure messages. 
+func ids(packets []*meshtreampb.Packet) []uint32 { + out := make([]uint32, len(packets)) + for i, p := range packets { + out[i] = p.Data.Id + } + return out +} + +// ── Broker integration tests ─────────────────────────────────────────────────── + +func newTestBroker(sourceChan chan *meshtreampb.Packet, cacheSize int) *Broker { + return NewBroker(sourceChan, cacheSize, time.Hour, logging.NewDevLogger().Named("test")) +} + +func TestBrokerSubscribeUnsubscribe(t *testing.T) { + sourceChan := make(chan *meshtreampb.Packet, 10) + broker := newTestBroker(sourceChan, 5) defer broker.Close() - // Subscribe to the broker subscriber1 := broker.Subscribe(5) subscriber2 := broker.Subscribe(5) - // Keep track of the internal broker state for testing broker.subscriberMutex.RLock() - subscriberCount := len(broker.subscribers) + count := len(broker.subscribers) broker.subscriberMutex.RUnlock() - if subscriberCount != 2 { - t.Errorf("Expected 2 subscribers, got %d", subscriberCount) + if count != 2 { + t.Errorf("expected 2 subscribers, got %d", count) } - // We need to use sequential packets because our implementation is asynchronous - // and exact packet matching may not work reliably - - // First packet with ID 1 - packet1 := &meshtreampb.Packet{ - Data: &meshtreampb.Data{Id: 1}, - Info: &meshtreampb.TopicInfo{}, - } - - // Send the packet - sourceChan <- packet1 - - // Both subscribers should receive the packet - select { - case received := <-subscriber1: - if received.Data.Id != 1 { - t.Errorf("Expected subscriber1 to receive packet with ID 1, got %d", received.Data.Id) - } - case <-time.After(100 * time.Millisecond): - t.Error("subscriber1 didn't receive packet within timeout") - } + sourceChan <- pkt(1, 0, pb.PortNum_UNKNOWN_APP) select { - case received := <-subscriber2: - if received.Data.Id != 1 { - t.Errorf("Expected subscriber2 to receive packet with ID 1, got %d", received.Data.Id) + case p := <-subscriber1: + if p.Data.Id != 1 { + t.Errorf("subscriber1: want ID 1, 
got %d", p.Data.Id) } case <-time.After(100 * time.Millisecond): - t.Error("subscriber2 didn't receive packet within timeout") + t.Error("subscriber1 timed out") + } + select { + case p := <-subscriber2: + if p.Data.Id != 1 { + t.Errorf("subscriber2: want ID 1, got %d", p.Data.Id) + } + case <-time.After(100 * time.Millisecond): + t.Error("subscriber2 timed out") } - // Unsubscribe the first subscriber broker.Unsubscribe(subscriber1) - // Verify the subscriber was removed broker.subscriberMutex.RLock() - subscriberCount = len(broker.subscribers) + count = len(broker.subscribers) broker.subscriberMutex.RUnlock() - - if subscriberCount != 1 { - t.Errorf("Expected 1 subscriber after unsubscribe, got %d", subscriberCount) + if count != 1 { + t.Errorf("expected 1 subscriber after unsubscribe, got %d", count) } - // Second packet with ID 2 - packet2 := &meshtreampb.Packet{ - Data: &meshtreampb.Data{Id: 2}, - Info: &meshtreampb.TopicInfo{}, - } - - // Send the second packet - sourceChan <- packet2 - - // The second subscriber should receive the packet + sourceChan <- pkt(2, 0, pb.PortNum_UNKNOWN_APP) select { - case received := <-subscriber2: - if received.Data.Id != 2 { - t.Errorf("Expected subscriber2 to receive packet with ID 2, got %d", received.Data.Id) + case p := <-subscriber2: + if p.Data.Id != 2 { + t.Errorf("subscriber2: want ID 2, got %d", p.Data.Id) } case <-time.After(100 * time.Millisecond): - t.Error("subscriber2 didn't receive second packet within timeout") + t.Error("subscriber2 timed out on second packet") } } -// TestBrokerMultipleSubscribers tests broadcasting to many subscribers func TestBrokerMultipleSubscribers(t *testing.T) { - // Create a test source channel sourceChan := make(chan *meshtreampb.Packet, 10) - - // Create a broker with the source channel - testLogger := logging.NewDevLogger().Named("test") - broker := NewBroker(sourceChan, 5, testLogger) + broker := newTestBroker(sourceChan, 10) defer broker.Close() - // Create multiple subscribers 
- const numSubscribers = 10 - subscribers := make([]<-chan *meshtreampb.Packet, numSubscribers) - for i := 0; i < numSubscribers; i++ { - subscribers[i] = broker.Subscribe(5) + const n = 10 + subs := make([]<-chan *meshtreampb.Packet, n) + for i := range subs { + subs[i] = broker.Subscribe(5) } - // Send a test packet with ID 42 - testPacket := &meshtreampb.Packet{ - Data: &meshtreampb.Data{Id: 42}, - Info: &meshtreampb.TopicInfo{}, - } - sourceChan <- testPacket + sourceChan <- pkt(42, 0, pb.PortNum_UNKNOWN_APP) - // All subscribers should receive the packet var wg sync.WaitGroup - wg.Add(numSubscribers) - - for i, subscriber := range subscribers { - go func(idx int, ch <-chan *meshtreampb.Packet) { + wg.Add(n) + for i, ch := range subs { + go func(idx int, c <-chan *meshtreampb.Packet) { defer wg.Done() select { - case received := <-ch: - if received.Data.Id != 42 { - t.Errorf("subscriber %d expected packet ID 42, got %d", idx, received.Data.Id) + case p := <-c: + if p.Data.Id != 42 { + t.Errorf("sub %d: want 42, got %d", idx, p.Data.Id) } case <-time.After(100 * time.Millisecond): - t.Errorf("subscriber %d didn't receive packet within timeout", idx) + t.Errorf("sub %d timed out", idx) } - }(i, subscriber) + }(i, ch) } - - // Wait for all goroutines to complete wg.Wait() } -// TestBrokerSlowSubscriber tests that a slow subscriber doesn't block others func TestBrokerSlowSubscriber(t *testing.T) { - // Create a test source channel sourceChan := make(chan *meshtreampb.Packet, 10) - - // Create a broker with the source channel - testLogger := logging.NewDevLogger().Named("test") - broker := NewBroker(sourceChan, 5, testLogger) + broker := newTestBroker(sourceChan, 10) defer broker.Close() - // Create a slow subscriber with buffer size 1 - slowSubscriber := broker.Subscribe(1) + slow := broker.Subscribe(1) + fast := broker.Subscribe(5) - // And a normal subscriber - normalSubscriber := broker.Subscribe(5) - - // Verify we have two subscribers - 
broker.subscriberMutex.RLock() - subscriberCount := len(broker.subscribers) - broker.subscriberMutex.RUnlock() - - if subscriberCount != 2 { - t.Errorf("Expected 2 subscribers, got %d", subscriberCount) - } - - // Send two packets quickly to fill the slow subscriber's buffer - testPacket1 := &meshtreampb.Packet{ - Data: &meshtreampb.Data{Id: 101}, - Info: &meshtreampb.TopicInfo{}, - } - testPacket2 := &meshtreampb.Packet{ - Data: &meshtreampb.Data{Id: 102}, - Info: &meshtreampb.TopicInfo{}, - } - - sourceChan <- testPacket1 - - // Give the broker time to distribute the first packet + sourceChan <- pkt(101, 0, pb.PortNum_UNKNOWN_APP) time.Sleep(10 * time.Millisecond) + sourceChan <- pkt(102, 0, pb.PortNum_UNKNOWN_APP) - sourceChan <- testPacket2 - - // The normal subscriber should receive both packets - select { - case received := <-normalSubscriber: - if received.Data.Id != 101 { - t.Errorf("normalSubscriber expected packet ID 101, got %d", received.Data.Id) + for _, want := range []uint32{101, 102} { + select { + case p := <-fast: + if p.Data.Id != want { + t.Errorf("fast: want %d, got %d", want, p.Data.Id) + } + case <-time.After(100 * time.Millisecond): + t.Errorf("fast: timed out waiting for %d", want) } - case <-time.After(100 * time.Millisecond): - t.Error("normalSubscriber didn't receive first packet within timeout") } - select { - case received := <-normalSubscriber: - if received.Data.Id != 102 { - t.Errorf("normalSubscriber expected packet ID 102, got %d", received.Data.Id) + case p := <-slow: + if p.Data.Id != 101 { + t.Errorf("slow: want 101, got %d", p.Data.Id) } case <-time.After(100 * time.Millisecond): - t.Error("normalSubscriber didn't receive second packet within timeout") - } - - // The slow subscriber should receive at least the first packet - select { - case received := <-slowSubscriber: - if received.Data.Id != 101 { - t.Errorf("slowSubscriber expected packet ID 101, got %d", received.Data.Id) - } - case <-time.After(100 * time.Millisecond): - 
t.Error("slowSubscriber didn't receive first packet within timeout") + t.Error("slow: timed out") } } -// TestBrokerCloseWithSubscribers tests closing the broker with active subscribers func TestBrokerCloseWithSubscribers(t *testing.T) { - // Create a test source channel sourceChan := make(chan *meshtreampb.Packet, 10) + broker := newTestBroker(sourceChan, 5) - // Create a broker with the source channel - testLogger := logging.NewDevLogger().Named("test") - broker := NewBroker(sourceChan, 5, testLogger) - - // Subscribe to the broker - subscriber := broker.Subscribe(5) - - // Verify we have one subscriber - broker.subscriberMutex.RLock() - subscriberCount := len(broker.subscribers) - broker.subscriberMutex.RUnlock() - - if subscriberCount != 1 { - t.Errorf("Expected 1 subscriber, got %d", subscriberCount) - } - - // Close the broker - this should close all subscriber channels + sub := broker.Subscribe(5) broker.Close() - // Trying to read from the subscriber channel should not block - // since it should be closed select { - case _, ok := <-subscriber: + case _, ok := <-sub: if ok { - t.Error("Expected subscriber channel to be closed") + t.Error("expected channel to be closed") } case <-time.After(100 * time.Millisecond): - t.Error("Subscriber channel should be closed but isn't") + t.Error("channel not closed") } } -// TestBrokerPacketCaching tests that the broker caches packets func TestBrokerPacketCaching(t *testing.T) { - // Create a test source channel sourceChan := make(chan *meshtreampb.Packet, 10) - - // Create a broker with a small cache size - testLogger := logging.NewDevLogger().Named("test") - broker := NewBroker(sourceChan, 3, testLogger) + broker := newTestBroker(sourceChan, 100) defer broker.Close() - // Send three packets - for i := 1; i <= 3; i++ { - packet := &meshtreampb.Packet{ - Data: &meshtreampb.Data{Id: uint32(i)}, - Info: &meshtreampb.TopicInfo{}, - } - sourceChan <- packet - - // Give the broker time to process the packet + for i := 
uint32(1); i <= 3; i++ { + sourceChan <- pkt(i, i, pb.PortNum_NODEINFO_APP) time.Sleep(10 * time.Millisecond) } - // Create a subscriber after the packets were sent - subscriber := broker.Subscribe(5) - - // The subscriber should receive all three cached packets - receivedIds := make([]uint32, 0, 3) - - // We need to receive 3 packets + sub := broker.Subscribe(10) + var received []uint32 for i := 0; i < 3; i++ { select { - case received := <-subscriber: - receivedIds = append(receivedIds, received.Data.Id) - case <-time.After(100 * time.Millisecond): - t.Errorf("Subscriber didn't receive cached packet %d within timeout", i+1) + case p := <-sub: + received = append(received, p.Data.Id) + case <-time.After(200 * time.Millisecond): + t.Fatalf("timed out waiting for packet %d", i+1) } } - - // Check that we received all packets in the correct order - if len(receivedIds) != 3 { - t.Errorf("Expected to receive 3 packets, got %d", len(receivedIds)) - } else { - for i, id := range receivedIds { - if id != uint32(i+1) { - t.Errorf("Expected packet %d to have ID %d, got %d", i, i+1, id) - } + for i, id := range received { + if id != uint32(i+1) { + t.Errorf("pos %d: want %d, got %d", i, i+1, id) } } } -// TestBrokerCacheOverflow tests that the broker correctly handles cache overflow -func TestBrokerCacheOverflow(t *testing.T) { - // Create a test source channel - sourceChan := make(chan *meshtreampb.Packet, 10) - - // Create a broker with a small cache size - testLogger := logging.NewDevLogger().Named("test") - cacheSize := 3 - broker := NewBroker(sourceChan, cacheSize, testLogger) +// TestBrokerPriorityEvictionOnReplay verifies that when a subscriber joins after +// mixed packets have been received, the cached set reflects priority eviction: +// high-priority types survive while low-priority types are trimmed. +func TestBrokerPriorityEvictionOnReplay(t *testing.T) { + sourceChan := make(chan *meshtreampb.Packet, 20) + // Tight cache: cap=5 creates pressure with 9 packets. 
+ broker := NewBroker(sourceChan, 5, time.Hour, logging.NewDevLogger().Named("test")) defer broker.Close() - // Send 5 packets (exceeding the cache size) - for i := 1; i <= 5; i++ { - packet := &meshtreampb.Packet{ - Data: &meshtreampb.Data{Id: uint32(i)}, - Info: &meshtreampb.TopicInfo{}, - } - sourceChan <- packet - - // Give the broker time to process the packet - time.Sleep(10 * time.Millisecond) + // 1 neighbor-info (priority 4) + 8 node-infos (priority 2). + sourceChan <- pkt(99, 1, pb.PortNum_NEIGHBORINFO_APP) + for i := 1; i <= 8; i++ { + sourceChan <- pkt(uint32(i), 1, pb.PortNum_NODEINFO_APP) + time.Sleep(5 * time.Millisecond) } + time.Sleep(20 * time.Millisecond) - // Create a subscriber after the packets were sent - subscriber := broker.Subscribe(5) - - // The subscriber should receive the last 3 packets (3, 4, 5) - receivedIds := make([]uint32, 0, cacheSize) - - // We expect to receive exactly cacheSize packets - for i := 0; i < cacheSize; i++ { + sub := broker.Subscribe(20) + var nodeInfoCount, neighborCount int + for i := 0; i < 5; i++ { // expect exactly cap=5 packets select { - case received := <-subscriber: - receivedIds = append(receivedIds, received.Data.Id) - case <-time.After(100 * time.Millisecond): - t.Errorf("Subscriber didn't receive cached packet %d within timeout", i+1) - } - } - - // Verify no more packets are coming - select { - case received := <-subscriber: - t.Errorf("Received unexpected packet with ID %d", received.Data.Id) - case <-time.After(50 * time.Millisecond): - // This is expected, no more packets should be received - } - - // Check that we received only the last 3 packets in the correct order - expectedIds := []uint32{3, 4, 5} - if len(receivedIds) != len(expectedIds) { - t.Errorf("Expected to receive %d packets, got %d", len(expectedIds), len(receivedIds)) - } else { - for i, id := range receivedIds { - if id != expectedIds[i] { - t.Errorf("Expected packet %d to have ID %d, got %d", i, expectedIds[i], id) + case p := 
<-sub: + switch p.Data.PortNum { + case pb.PortNum_NODEINFO_APP: + nodeInfoCount++ + case pb.PortNum_NEIGHBORINFO_APP: + neighborCount++ } + case <-time.After(200 * time.Millisecond): + t.Fatalf("timed out waiting for cached packet %d", i+1) } } + + if neighborCount != 1 { + t.Errorf("expected neighbor-info to survive in cache, got %d", neighborCount) + } + if nodeInfoCount != 4 { + t.Errorf("expected 4 node-infos (cap-1), got %d", nodeInfoCount) + } } -// TestSubscriberBufferFull tests the behavior when a subscriber's buffer is full +// TestSubscriberBufferFull verifies that when cache replay times out due to a +// full subscriber buffer, live packets can still be delivered once there is room. func TestSubscriberBufferFull(t *testing.T) { - cacheGracePeriod = 300 + orig := cacheGracePeriod + cacheGracePeriod = time.Millisecond // short enough to time out on packet 2+ + t.Cleanup(func() { cacheGracePeriod = orig }) - // Create a test source channel sourceChan := make(chan *meshtreampb.Packet, 10) - - // Create a broker with a cache size of 5 - testLogger := logging.NewDevLogger().Named("test") - broker := NewBroker(sourceChan, 5, testLogger) + broker := newTestBroker(sourceChan, 100) defer broker.Close() - // Prefill the cache with 5 packets - for i := 1; i <= 5; i++ { - packet := &meshtreampb.Packet{ - Data: &meshtreampb.Data{Id: uint32(i)}, - Info: &meshtreampb.TopicInfo{}, - } - sourceChan <- packet + for i := uint32(1); i <= 5; i++ { + sourceChan <- pkt(i, i, pb.PortNum_NODEINFO_APP) time.Sleep(10 * time.Millisecond) } - // Create a subscriber with a very small buffer size (1) - smallSubscriber := broker.Subscribe(1) + small := broker.Subscribe(1) - // The small subscriber should receive at least one cached packet - // The others will be dropped because the buffer is full + // Read the first cached packet to confirm replay started. 
select { - case received := <-smallSubscriber: - if received.Data.Id != 1 { - t.Errorf("Expected subscriber to receive packet with ID 1, got %d", received.Data.Id) - } + case <-small: case <-time.After(500 * time.Millisecond): - t.Error("Subscriber didn't receive any cached packet within timeout") + t.Error("timed out waiting for first cached packet") + return } - // Check that no more packets are immediately available - // This is a bit tricky to test since we can't guarantee how many - // packets were dropped due to the full buffer, but we can check - // that the channel isn't immediately ready with more packets - select { - case received := <-smallSubscriber: - // If we get here, it should be a later packet from the cache - if received.Data.Id < 1 { - t.Errorf("Received unexpected packet with ID %d", received.Data.Id) + // Wait for the cache goroutine to finish (timeout = 1 ms; 10 ms is ample). + time.Sleep(10 * time.Millisecond) + + // Drain any residual cached packets that squeezed in before the timeout. + drained := false + for !drained { + select { + case <-small: + default: + drained = true } - case <-time.After(50 * time.Millisecond): - // This is also acceptable - it means all attempts to send more cached - // packets found the buffer full and gave up } - // Send a new packet now that the subscriber is connected - newPacket := &meshtreampb.Packet{ - Data: &meshtreampb.Data{Id: 6}, - Info: &meshtreampb.TopicInfo{}, - } - sourceChan <- newPacket - - // The subscriber should receive this packet if they read the first one + // Buffer is now empty — a new live packet should be deliverable. 
+ sourceChan <- pkt(99, 99, pb.PortNum_NODEINFO_APP) select { - case received := <-smallSubscriber: - if received.Data.Id != 6 { - t.Errorf("Expected subscriber to receive packet with ID 6, got %d", received.Data.Id) + case p := <-small: + if p.Data.Id != 99 { + t.Errorf("want live packet ID 99, got %d", p.Data.Id) } case <-time.After(100 * time.Millisecond): - t.Error("Subscriber didn't receive new packet within timeout") + t.Error("timed out waiting for live packet") } } diff --git a/server/server.go b/server/server.go index ad4f240..c1cca51 100644 --- a/server/server.go +++ b/server/server.go @@ -27,6 +27,7 @@ type Config struct { MQTTTopicPath string // MQTT topic path being subscribed to StaticDir string // Directory containing static web files ChannelKeys []string // Channel keys for decryption + AllowedOrigin string // CORS allowed origin; defaults to "*" (public stream) } // Create connection info JSON to send to the client @@ -67,6 +68,16 @@ func New(config Config) *Server { } } +// securityHeaders wraps a handler to add common HTTP security headers. 
+func securityHeaders(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Content-Type-Options", "nosniff") + w.Header().Set("X-Frame-Options", "DENY") + w.Header().Set("Referrer-Policy", "strict-origin-when-cross-origin") + next(w, r) + } +} + // Start initializes and starts the web server func (s *Server) Start() error { // Get port as integer @@ -83,8 +94,8 @@ func (s *Server) Start() error { prefab.WithContext(baseCtx), prefab.WithHost(s.config.Host), prefab.WithPort(port), - prefab.WithHTTPHandlerFunc("/api/status", s.handleStatus), - prefab.WithHTTPHandlerFunc("/api/stream", s.handleStream), + prefab.WithHTTPHandlerFunc("/api/status", securityHeaders(s.handleStatus)), + prefab.WithHTTPHandlerFunc("/api/stream", securityHeaders(s.handleStream)), prefab.WithStaticFiles("/assets/", s.config.StaticDir), prefab.WithHTTPHandlerFunc("/", s.fallbackHandler), ) @@ -176,10 +187,14 @@ func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { } // Set headers for SSE + allowedOrigin := s.config.AllowedOrigin + if allowedOrigin == "" { + allowedOrigin = "*" + } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") - w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Origin", allowedOrigin) // Make sure that the writer supports flushing flusher, ok := w.(http.Flusher) diff --git a/todos/001-complete-p1-xss-infowindow-node-names.md b/todos/001-complete-p1-xss-infowindow-node-names.md deleted file mode 100644 index 08b89d1..0000000 --- a/todos/001-complete-p1-xss-infowindow-node-names.md +++ /dev/null @@ -1,66 +0,0 @@ ---- -status: complete -priority: p1 -issue_id: "001" -tags: [code-review, security, xss, frontend] -dependencies: [] ---- - -# XSS via Node Names in Google Maps InfoWindow - -## Problem Statement - -`showInfoWindow` in `NetworkMap.tsx` builds an HTML string 
using `nodeName` (sourced from `node.longName || node.shortName`, which comes from MQTT packets) and passes it to `infoWindowRef.current.setContent(infoContent)`. Google Maps `InfoWindow.setContent` renders its argument as live HTML. A Meshtastic device with a `longName` of `` will execute arbitrary JavaScript in every viewer's browser. - -Attack path: malicious device broadcasts NodeInfo on public MQTT → Go decoder stores it faithfully → SSE streams to browser → Redux stores verbatim → map renders with no sanitization. - -## Findings - -- **File:** `web/src/components/dashboard/NetworkMap.tsx`, lines 431-451 -- `nodeName` is interpolated raw into an HTML template literal -- `infoWindowRef.current.setContent(infoContent)` renders the HTML directly -- All SVG marker templates on lines 344 and 390 use the same unsafe pattern (lower risk currently since values are internally computed, but should be normalized) -- No sanitization at any layer (decoder, SSE, Redux, component) - -## Proposed Solutions - -### Option A: DOM construction with .textContent (Recommended) -Replace the HTML template literal with `document.createElement` tree. Use `.textContent` for all data-derived values. -- **Pros:** Completely eliminates XSS, no extra dependency, idiomatic -- **Cons:** More verbose than template literal -- **Effort:** Small -- **Risk:** Low - -### Option B: Pass Element directly to setContent -Build a `div` element using DOM APIs and pass the element object to `setContent` (which accepts `Element | string`). -- **Pros:** Clean, no string HTML at all -- **Cons:** Slightly more DOM manipulation code -- **Effort:** Small -- **Risk:** Low - -### Option C: Add DOMPurify sanitization -Run `nodeName` through `DOMPurify.sanitize()` before interpolation. 
-- **Pros:** Minimal code change -- **Cons:** Adds a dependency; still uses HTML string pattern; sanitizers can be bypassed -- **Effort:** Small -- **Risk:** Medium (sanitizer bypass potential) - -## Recommended Action - -_Use Option A — DOM construction. It eliminates the vulnerability class entirely rather than mitigating it._ - -## Technical Details - -- **Affected files:** `web/src/components/dashboard/NetworkMap.tsx` -- **Function:** `showInfoWindow` (line ~412) -- **Data origin:** `data.nodeInfo.longName` / `data.nodeInfo.shortName` from MQTT protobuf - -## Acceptance Criteria - -- [ ] `showInfoWindow` constructs InfoWindow content using DOM APIs, not HTML strings -- [ ] No user-supplied string data is interpolated into HTML template literals in NetworkMap.tsx -- [ ] A node with `longName = ""` renders safely as literal text in the info window - -## Work Log - -- 2026-03-15: Identified by security-sentinel and architecture-strategist review agents diff --git a/todos/002-pending-p1-hardcoded-mqtt-credentials.md b/todos/002-pending-p1-hardcoded-mqtt-credentials.md deleted file mode 100644 index 6f7c8f8..0000000 --- a/todos/002-pending-p1-hardcoded-mqtt-credentials.md +++ /dev/null @@ -1,54 +0,0 @@ ---- -status: pending -priority: p1 -issue_id: "002" -tags: [code-review, security, credentials, configuration] -dependencies: [] ---- - -# Hardcoded MQTT Credentials in Source Code - -## Problem Statement - -`meshdev` / `large4cats` are committed as default flag values in `main.go`. Anyone reading the source or forking the repo has these credentials. Operators who deploy without setting environment variables silently use these known-public credentials, believing they have a private configuration. 
- -## Findings - -- **File:** `main.go`, lines 73-74 -- `getEnv("MQTT_USERNAME", "meshdev")` and `getEnv("MQTT_PASSWORD", "large4cats")` expose credentials in source -- Credentials appear in `--help` output and any build artifact -- While these are community broker credentials (not a private service), they are real auth credentials enabling broker access - -## Proposed Solutions - -### Option A: Empty defaults + startup validation (Recommended) -Use `""` as the default for both fields. Add a startup check that logs a warning (or hard-fails if a `--require-auth` flag is set) when credentials are empty. -- **Pros:** Forces operators to be explicit; no credentials in source -- **Effort:** Small -- **Risk:** Low (may break zero-config dev setups) - -### Option B: Document-only approach -Keep defaults but add a prominent comment and README warning. -- **Pros:** Zero code change -- **Cons:** Credentials still in source; doesn't protect production deployments -- **Effort:** Trivial -- **Risk:** Medium - -## Recommended Action - -_Option A. 
At minimum remove the password default; a username default of "meshdev" is less sensitive but should also go._ - -## Technical Details - -- **Affected file:** `main.go`, lines 73-74 -- **Related:** `MQTT_USERNAME` / `MQTT_PASSWORD` env vars already supported — just remove the hardcoded fallback values - -## Acceptance Criteria - -- [ ] No credential values appear in source code as string literals -- [ ] Server starts cleanly with credentials provided via env vars -- [ ] Server logs a clear message when credentials are not configured - -## Work Log - -- 2026-03-15: Identified by security-sentinel review agent diff --git a/todos/003-complete-p1-processedpackets-unbounded-growth.md b/todos/003-complete-p1-processedpackets-unbounded-growth.md deleted file mode 100644 index a306ab4..0000000 --- a/todos/003-complete-p1-processedpackets-unbounded-growth.md +++ /dev/null @@ -1,64 +0,0 @@ ---- -status: complete -priority: p1 -issue_id: "003" -tags: [code-review, performance, memory, frontend, redux] -dependencies: [] ---- - -# Unbounded Memory Growth in processedPackets and seenPackets - -## Problem Statement - -`aggregatorSlice.processedPackets` and `packetSlice.seenPackets` are `Record` maps that grow forever. Every processed packet adds an entry and nothing ever removes them. A session receiving modest Meshtastic traffic (~1 packet/sec) accumulates tens of thousands of keys over hours. Both maps serialize on every Redux state snapshot, are included in DevTools, and will be included in any future persistence layer. - -`processedPackets` is P1 because it's in the hot aggregator path queried on every packet; `seenPackets` (in packetSlice) has the same issue at P2 because it's less frequently queried. 
- -## Findings - -- **File:** `web/src/store/slices/aggregatorSlice.ts`, lines 76, 115 -- `processedPackets[packetKey] = true` written on every packet, never pruned -- No eviction path; `clearAggregatedData` resets it but requires explicit user action -- **File:** `web/src/store/slices/packetSlice.ts`, lines ~13, ~78-79 -- `seenPackets` same pattern; `packets` array is trimmed to 5000 but map is not - -## Proposed Solutions - -### Option A: Time-based eviction with timestamp map (Recommended) -Change `processedPackets: Record` to `processedPackets: Record` (value = timestamp). On each `processNewPacket` call, prune entries older than a TTL (e.g., 24h). Apply the same pattern to `seenPackets`. -- **Pros:** Consistent with topologySlice TTL pattern already in the spec; self-healing -- **Effort:** Small -- **Risk:** Low - -### Option B: Fixed-size LRU cap -Keep only the most recent N packet keys (e.g., 10000). When the cap is exceeded, drop the oldest. Use an insertion-ordered structure or a separate ordered array. -- **Pros:** Hard memory bound -- **Cons:** More complex; need to track insertion order -- **Effort:** Medium -- **Risk:** Low - -### Option C: Rely on clearAggregatedData -Document that operators should reload periodically; no code change. -- **Pros:** Zero effort -- **Cons:** Doesn't fix the problem; long-running sessions still leak -- **Risk:** High (ongoing) - -## Recommended Action - -_Option A — add timestamps to both maps and prune during packet processing. 
The topologySlice will use the same pattern, so this creates consistency._ - -## Technical Details - -- **Affected files:** `web/src/store/slices/aggregatorSlice.ts`, `web/src/store/slices/packetSlice.ts` -- Pattern: `Record` where value is unix timestamp; prune `now - value > TTL` at start of each dispatch - -## Acceptance Criteria - -- [ ] `processedPackets` entries older than 24h are pruned during packet processing -- [ ] `seenPackets` entries are similarly bounded -- [ ] Deduplication still works correctly after pruning (no duplicate packets in ~24h window) -- [ ] Memory usage stabilizes after hours of packet ingestion - -## Work Log - -- 2026-03-15: Identified by architecture-strategist review agent diff --git a/todos/004-complete-p2-sse-http-error-after-headers.md b/todos/004-complete-p2-sse-http-error-after-headers.md deleted file mode 100644 index 1191966..0000000 --- a/todos/004-complete-p2-sse-http-error-after-headers.md +++ /dev/null @@ -1,51 +0,0 @@ ---- -status: complete -priority: p2 -issue_id: "004" -tags: [code-review, backend, sse, go, protocol] -dependencies: [] ---- - -# http.Error Called After SSE Headers Already Sent - -## Problem Statement - -In `server.go`, `http.Error()` is called in the client-disconnect, shutdown, and channel-closed cases after `w.WriteHeader(http.StatusOK)` and SSE headers have already been written. This produces `superfluous response.WriteHeader call` log noise and, more critically, writes HTTP error text into the SSE event stream body, corrupting the stream framing for any still-connected client. 
- -## Findings - -- **File:** `server/server.go`, lines 236-240, 245-249, 262-264 -- `w.WriteHeader(http.StatusOK)` + SSE headers written at line ~203 -- `http.Error()` calls in `<-notify`, `<-s.shutdown`, and `!ok` cases write to an already-open response body -- In the client-disconnect case this is harmless (connection is dead) -- In the `!ok` channel case the client may still be alive and receives garbled data - -## Proposed Solutions - -### Option A: Replace http.Error with plain return (Recommended) -In all three cases, simply `return` after logging. The SSE client handles reconnection automatically; no error response is needed on the already-open stream. -- **Effort:** Small -- **Risk:** Low - -### Option B: Flush a `data: error\n\n` SSE event before closing -Send a proper SSE event indicating shutdown, then return without calling `http.Error`. -- **Effort:** Small -- **Risk:** Low (slightly more client-visible information) - -## Recommended Action - -_Option A — just return. SSE clients reconnect automatically._ - -## Technical Details - -- **Affected file:** `server/server.go` - -## Acceptance Criteria - -- [ ] No `http.Error` calls after SSE headers have been written -- [ ] No `superfluous response.WriteHeader call` log warnings during normal client disconnect -- [ ] SSE stream body is never corrupted with HTTP error text - -## Work Log - -- 2026-03-15: Identified by architecture-strategist review agent diff --git a/todos/005-pending-p2-broker-shutdown-deadlock.md b/todos/005-pending-p2-broker-shutdown-deadlock.md deleted file mode 100644 index 66ebde4..0000000 --- a/todos/005-pending-p2-broker-shutdown-deadlock.md +++ /dev/null @@ -1,58 +0,0 @@ ---- -status: pending -priority: p2 -issue_id: "005" -tags: [code-review, backend, concurrency, go, mqtt] -dependencies: [] ---- - -# Broker dispatchLoop Self-Deadlock on Source Channel Close - -## Problem Statement - -When the MQTT source channel closes, `dispatchLoop` calls `b.Close()`. 
`Close` calls `b.wg.Wait()`. But `dispatchLoop` is running in the goroutine counted by `b.wg`, so `wg.Wait()` will never return — deadlock. The current shutdown order in `main.go` avoids this path, but the ordering dependency is implicit and undocumented. A future refactor inverting shutdown order would deadlock silently. - -## Findings - -- **File:** `mqtt/broker.go`, lines 193-199 -- `dispatchLoop` calls `b.Close()` when source channel closes -- `Close` calls `b.wg.Wait()`, waiting for `dispatchLoop` to finish -- `dispatchLoop` is blocked inside `Close` → deadlock -- Current `main.go` shutdown order (`broker.Close()` before `mqttClient.Disconnect()`) avoids triggering this, but the safety is fragile - -## Proposed Solutions - -### Option A: Send signal to Close rather than calling it directly (Recommended) -Have `dispatchLoop` close an internal `loopDone` channel rather than calling `b.Close()`. Have `b.Close()` check if `loopDone` is already closed to avoid double-work. -- **Effort:** Small-Medium -- **Risk:** Low - -### Option B: Use context cancellation -Pass a `context.Context` to `dispatchLoop`. When source closes, cancel the context. `Close` also cancels the context and waits for the loop to exit via `wg.Wait()`. -- **Effort:** Medium -- **Risk:** Low - -### Option C: Add comment documenting the ordering constraint -Document that `broker.Close()` must be called before `mqttClient.Disconnect()`. -- **Effort:** Trivial -- **Cons:** Fragile; future refactors will miss it -- **Risk:** Medium - -## Recommended Action - -_Option A or B. 
At minimum add a comment (Option C) as immediate mitigation, then refactor to remove the ordering dependency._ - -## Technical Details - -- **Affected file:** `mqtt/broker.go` -- **Related:** `main.go` shutdown sequence - -## Acceptance Criteria - -- [ ] Broker shuts down cleanly regardless of whether `broker.Close()` or MQTT disconnect is called first -- [ ] No goroutine leaks on shutdown -- [ ] Shutdown order is explicit and enforced, not accidental - -## Work Log - -- 2026-03-15: Identified by architecture-strategist review agent diff --git a/todos/006-complete-p2-networkmap-functions-redeclared.md b/todos/006-complete-p2-networkmap-functions-redeclared.md deleted file mode 100644 index 3011515..0000000 --- a/todos/006-complete-p2-networkmap-functions-redeclared.md +++ /dev/null @@ -1,60 +0,0 @@ ---- -status: complete -priority: p2 -issue_id: "006" -tags: [code-review, frontend, react, architecture, quality] -dependencies: [] ---- - -# NetworkMap.tsx Functions Redeclared in Component Body Every Render - -## Problem Statement - -`initializeMap`, `updateNodeMarkers`, `createMarker`, `updateMarker`, and `showInfoWindow` are declared inside the component function body without `useCallback`. They are redeclared on every render, cannot be properly listed in dependency arrays, and are the direct cause of the `/* eslint-disable react-hooks/exhaustive-deps */` at the top of the file. The planned topology polyline feature will make this worse if it follows the same pattern. 
- -## Findings - -- **File:** `web/src/components/dashboard/NetworkMap.tsx`, lines 261-455, line 1 (`eslint-disable`) -- `tryInitializeMap` useCallback (line 133) lists `updateNodeMarkers` and `initializeMap` in deps — but these change every render, defeating memoization -- The `eslint-disable` suppression acknowledges the problem but doesn't fix it -- Adding polyline management code to this pattern makes the component increasingly difficult to reason about - -## Proposed Solutions - -### Option A: Extract to module-level pure functions (Recommended) -Move functions that don't access component state/refs directly (e.g., `getMarkerIcon`, marker content builders) to module-level. Pass refs/state as parameters. -- **Pros:** Cleanest; enables proper useCallback dependency arrays; allows tree-shaking -- **Effort:** Medium -- **Risk:** Low - -### Option B: Wrap all in useCallback with correct deps -Convert all functions to useCallback with proper dependency lists. Remove the eslint-disable. -- **Pros:** Fixes the hook rules without restructuring -- **Cons:** Still keeps large functions in the component body -- **Effort:** Medium -- **Risk:** Low - -### Option C: Leave as-is, document for topology addition -Accept the pattern and apply it consistently for topology polylines. Remove the eslint-disable only when refactoring later. -- **Effort:** None now -- **Cons:** Technical debt grows; makes the topology feature harder to test -- **Risk:** Medium (ongoing) - -## Recommended Action - -_At minimum, do Option A for module-level pure helpers before adding topology polyline code. 
The topology feature addition is a natural trigger for this cleanup._ - -## Technical Details - -- **Affected file:** `web/src/components/dashboard/NetworkMap.tsx` -- This should be addressed before or alongside the topology polyline rendering implementation - -## Acceptance Criteria - -- [ ] The `/* eslint-disable react-hooks/exhaustive-deps */` comment is removed -- [ ] All `useCallback` hooks have accurate, complete dependency arrays -- [ ] Adding polyline management code does not require more eslint suppression - -## Work Log - -- 2026-03-15: Identified by architecture-strategist review agent diff --git a/todos/007-pending-p2-wildcard-cors-no-auth.md b/todos/007-pending-p2-wildcard-cors-no-auth.md deleted file mode 100644 index abad95e..0000000 --- a/todos/007-pending-p2-wildcard-cors-no-auth.md +++ /dev/null @@ -1,56 +0,0 @@ ---- -status: pending -priority: p2 -issue_id: "007" -tags: [code-review, security, cors, auth, backend] -dependencies: [] ---- - -# Wildcard CORS + No Authentication on SSE Endpoint - -## Problem Statement - -`/api/stream` sets `Access-Control-Allow-Origin: *`, meaning any JavaScript on any origin can subscribe to the full decoded packet feed. There is no authentication at any layer. The `/api/status` endpoint also exposes the MQTT server hostname and subscribed topic to any caller. - -## Findings - -- **File:** `server/server.go`, line ~182 -- `w.Header().Set("Access-Control-Allow-Origin", "*")` on the SSE endpoint -- No API key, session token, or any auth mechanism -- `/api/status` returns `mqttServer` and `mqttTopic` to unauthenticated callers - -## Proposed Solutions - -### Option A: Restrict CORS to same-origin or configured origin -Replace `*` with a configurable `MESHSTREAM_ALLOWED_ORIGIN` env var. Default to `""` (same-origin only). -- **Effort:** Small -- **Risk:** Low (may break cross-origin dev setups — document the env var) - -### Option B: Add optional static token auth -Add an optional `MESHSTREAM_API_TOKEN` env var. 
If set, require `Authorization: Bearer ` or `?token=` on all API requests. -- **Effort:** Small-Medium -- **Risk:** Low - -### Option C: No change for local-only deployments -Document that the server is intended for local/trusted network use only. -- **Effort:** Trivial -- **Cons:** Doesn't protect deployments that are inadvertently exposed -- **Risk:** Medium - -## Recommended Action - -_Option A at minimum (restrict CORS origin). Option B if public deployment is expected._ - -## Technical Details - -- **Affected file:** `server/server.go` - -## Acceptance Criteria - -- [ ] CORS origin is not `*` in default configuration -- [ ] CORS origin is configurable via environment variable -- [ ] Status endpoint does not expose internal MQTT details without auth (or is documented as intentionally public) - -## Work Log - -- 2026-03-15: Identified by security-sentinel review agent diff --git a/todos/008-pending-p2-unbounded-binary-payload.md b/todos/008-pending-p2-unbounded-binary-payload.md deleted file mode 100644 index 91c47e0..0000000 --- a/todos/008-pending-p2-unbounded-binary-payload.md +++ /dev/null @@ -1,51 +0,0 @@ ---- -status: pending -priority: p2 -issue_id: "008" -tags: [code-review, security, backend, memory, dos] -dependencies: [] ---- - -# Unbounded Binary Payload Forwarded to All SSE Clients - -## Problem Statement - -When decryption produces bytes that are neither valid protobuf nor ASCII, the raw decrypted bytes are stored verbatim in `BinaryData` and forwarded via SSE to all connected clients. There is no size limit on the encrypted payload or the binary blob. A malicious actor on the MQTT feed could craft large packets, causing memory amplification and excessive bandwidth to all SSE subscribers. 
- -## Findings - -- **File:** `decoder/decoder.go`, lines 316-318 -- Raw decrypted bytes stored in `BinaryData` field with no size cap -- Serialized by `protojson` and streamed to all SSE clients -- One large malicious packet → all connected clients receive it - -## Proposed Solutions - -### Option A: Add maximum payload size check before decryption (Recommended) -Reject encrypted payloads larger than a configurable limit (e.g., 4KB default). Return a `decodeError` without attempting decryption. -- **Effort:** Small -- **Risk:** Low - -### Option B: Drop binary data from SSE stream entirely -When `BinaryData` would be set, replace it with a size indicator only (e.g., `binaryDataSize: 1234`). Don't send the raw bytes to clients. -- **Effort:** Small -- **Risk:** Low (clients lose access to binary payloads they currently can't decode anyway) - -## Recommended Action - -_Both — add a size check before decryption AND drop binary blobs from SSE output._ - -## Technical Details - -- **Affected file:** `decoder/decoder.go`, lines 316-318 -- Reasonable limit: 1KB for most Meshtastic payloads; 4KB as a generous cap - -## Acceptance Criteria - -- [ ] Encrypted payloads larger than the limit are rejected with a `decodeError` -- [ ] Raw binary bytes are not forwarded to SSE clients -- [ ] Normal Meshtastic packets (all under ~256 bytes) are unaffected - -## Work Log - -- 2026-03-15: Identified by security-sentinel review agent diff --git a/todos/009-pending-p3-no-http-security-headers.md b/todos/009-pending-p3-no-http-security-headers.md deleted file mode 100644 index 213fadf..0000000 --- a/todos/009-pending-p3-no-http-security-headers.md +++ /dev/null @@ -1,46 +0,0 @@ ---- -status: pending -priority: p3 -issue_id: "009" -tags: [code-review, security, http-headers, backend] -dependencies: [001] ---- - -# No HTTP Security Headers - -## Problem Statement - -The server sets no `Content-Security-Policy`, `X-Content-Type-Options`, `X-Frame-Options`, or other security headers. 
A CSP would meaningfully reduce XSS blast radius (finding 001) and constrain resource loading. - -## Findings - -- **File:** `server/server.go` -- No security middleware layer -- Google Maps API and Fonts load from external origins — a CSP needs to allow these specifically -- `prefab` framework's default header behavior is unknown - -## Proposed Solutions - -### Option A: Add security headers middleware to the prefab server -Inject headers on all responses: `X-Content-Type-Options: nosniff`, `X-Frame-Options: DENY`, `Referrer-Policy: strict-origin-when-cross-origin`, and a CSP allowing `maps.googleapis.com`, `fonts.googleapis.com`, `fonts.gstatic.com`. -- **Effort:** Small -- **Risk:** Low (may need tuning for Google Maps inline styles/scripts) - -## Recommended Action - -_Option A after fixing finding 001 (XSS). CSP is most effective when combined with fixing the actual XSS root cause._ - -## Technical Details - -- **Affected file:** `server/server.go` - -## Acceptance Criteria - -- [ ] All HTTP responses include `X-Content-Type-Options: nosniff` -- [ ] All HTTP responses include `X-Frame-Options: DENY` -- [ ] CSP header allows required Google Maps and Fonts origins -- [ ] Google Maps continues to function with the CSP applied - -## Work Log - -- 2026-03-15: Identified by security-sentinel review agent diff --git a/todos/010-pending-p3-decode-error-leaks-internals.md b/todos/010-pending-p3-decode-error-leaks-internals.md deleted file mode 100644 index f576243..0000000 --- a/todos/010-pending-p3-decode-error-leaks-internals.md +++ /dev/null @@ -1,45 +0,0 @@ ---- -status: pending -priority: p3 -issue_id: "010" -tags: [code-review, security, information-disclosure, backend] -dependencies: [] ---- - -# Internal Error Details Leaked to Browser Clients via decodeError - -## Problem Statement - -Full Go error strings including channel names and parse error details are placed in the `decodeError` field and streamed to all browser clients. 
This leaks which channels are configured, internal protobuf error messages, and potentially version/path information. - -## Findings - -- **File:** `decoder/decoder.go`, lines 313-315 -- `"PRIVATE_CHANNEL: failed to parse decrypted data on unconfigured channel '%s': %v"` — channel name + Go error exposed -- All connected clients receive this in SSE stream -- Confirms channel key configuration to any observer - -## Proposed Solutions - -### Option A: Strip detail from decodeError before SSE; log server-side -Set `decodeError` to a short code (`"PRIVATE_CHANNEL"`, `"PARSE_ERROR"`, `"DECRYPT_FAILED"`) for the wire format. Log the full error server-side at debug level. -- **Effort:** Small -- **Risk:** Low (developers lose some frontend visibility — mitigated by server logs) - -## Recommended Action - -_Option A. Server logs preserve the detail for debugging; clients see only opaque codes._ - -## Technical Details - -- **Affected files:** `decoder/decoder.go`, all `data.DecodeError = fmt.Sprintf(...)` callsites - -## Acceptance Criteria - -- [ ] `decodeError` field sent to clients contains only short error codes, not Go error strings -- [ ] Full error detail is logged at the server -- [ ] Frontend can still distinguish error types from the short codes - -## Work Log - -- 2026-03-15: Identified by security-sentinel review agent diff --git a/todos/011-complete-p3-dead-code-mustparseduration.md b/todos/011-complete-p3-dead-code-mustparseduration.md deleted file mode 100644 index 4278d7b..0000000 --- a/todos/011-complete-p3-dead-code-mustparseduration.md +++ /dev/null @@ -1,34 +0,0 @@ ---- -status: complete -priority: p3 -issue_id: "011" -tags: [code-review, quality, cleanup, backend] -dependencies: [] ---- - -# Dead Code: mustParseDuration in main.go - -## Problem Statement - -`mustParseDuration` is defined in `main.go` but never called. `durationFromEnv` is used everywhere instead. 
- -## Findings - -- **File:** `main.go`, lines 111-118 -- Function defined, zero call sites -- Leftover scaffolding from earlier iteration - -## Proposed Solutions - -### Option A: Delete the function -- **Effort:** Trivial -- **Risk:** None - -## Acceptance Criteria - -- [ ] `mustParseDuration` removed from `main.go` -- [ ] `make build` still passes - -## Work Log - -- 2026-03-15: Identified by architecture-strategist review agent