From 69b0ea66158b175c5769524ea13c9fc7ea63d766 Mon Sep 17 00:00:00 2001 From: Daniel Pupius Date: Thu, 24 Apr 2025 12:58:20 -0700 Subject: [PATCH] Make sure to count packets once per node/channel, even if received by multiple gateways --- web/src/store/slices/aggregatorSlice.ts | 74 +++++++++++++++++++------ 1 file changed, 56 insertions(+), 18 deletions(-) diff --git a/web/src/store/slices/aggregatorSlice.ts b/web/src/store/slices/aggregatorSlice.ts index 5cf0169..9c442c6 100644 --- a/web/src/store/slices/aggregatorSlice.ts +++ b/web/src/store/slices/aggregatorSlice.ts @@ -1,5 +1,12 @@ import { createSlice, PayloadAction } from "@reduxjs/toolkit"; -import { Packet, DeviceMetrics, EnvironmentMetrics, Position, User, Telemetry } from "../../lib/types"; +import { + Packet, + DeviceMetrics, + EnvironmentMetrics, + Position, + User, + Telemetry, +} from "../../lib/types"; // Types for aggregated data export interface NodeData { @@ -54,6 +61,8 @@ interface AggregatorState { channels: Record; messages: Record; selectedNodeId?: number; + // Track already processed packet IDs to prevent duplicates + processedPackets: Record; } const initialState: AggregatorState = { @@ -61,6 +70,7 @@ const initialState: AggregatorState = { gateways: {}, channels: {}, messages: {}, + processedPackets: {}, }; // Helper to create a key for message collections (by channelId) @@ -78,9 +88,28 @@ const processPacket = (state: AggregatorState, packet: Packet) => { const nodeId = data.from; const timestamp = data.rxTime || Math.floor(Date.now() / 1000); + // Skip packets without valid from/id + if (nodeId === undefined || data.id === undefined) { + return; + } + + // Create a unique packet key (same format as in packetSlice) + const nodeIdHex = `!${nodeId.toString(16).toLowerCase()}`; + const packetKey = `${nodeIdHex}_${data.id}`; + + // Check if we've already processed this packet + const isNewPacket = !state.processedPackets[packetKey]; + + // Always mark this packet as processed + state.processedPackets[packetKey] = true; + // Update gateway data, but only if it's reporting packets from a different nodeId // (a true gateway is relaying data from other nodes, not just its own data) - if (gatewayId && nodeId !== undefined && gatewayId !== `!${nodeId.toString(16)}`) { + if ( + gatewayId && + nodeId !== undefined && + gatewayId !== `!${nodeId.toString(16)}` + ) { if (!state.gateways[gatewayId]) { state.gateways[gatewayId] = { gatewayId, @@ -96,8 +125,6 @@ const processPacket = (state: AggregatorState, packet: Packet) => { const gateway = state.gateways[gatewayId]; gateway.lastHeard = Math.max(gateway.lastHeard, timestamp); gateway.messageCount++; - - // Track text messages if (data.textMessage) { gateway.textMessageCount++; } @@ -105,13 +132,18 @@ const processPacket = (state: AggregatorState, packet: Packet) => { if (channelId && !gateway.channelIds.includes(channelId)) { gateway.channelIds.push(channelId); } - + // Record node in observed nodes if (!gateway.observedNodes.includes(nodeId)) { gateway.observedNodes.push(nodeId); } } + if (!isNewPacket) { + // Channel and node stats will already have been updated. + return; + } + // Update channel data if (channelId) { if (!state.channels[channelId]) { @@ -125,6 +157,7 @@ const processPacket = (state: AggregatorState, packet: Packet) => { const channel = state.channels[channelId]; channel.messageCount++; + channel.lastMessage = timestamp; if (gatewayId && !channel.gateways.includes(gatewayId)) { @@ -151,12 +184,10 @@ const processPacket = (state: AggregatorState, packet: Packet) => { const node = state.nodes[nodeId]; node.lastHeard = Math.max(node.lastHeard, timestamp); node.messageCount++; - - // Track text messages if (data.textMessage) { node.textMessageCount++; } - + // Set channelId and gatewayId if available if (channelId) { node.channelId = channelId; @@ -181,17 +212,18 @@ const processPacket = (state: AggregatorState, packet: Packet) => { } } - // Process text messages + // Process text messages. if (data.textMessage && nodeId !== undefined && channelId) { const channelKey = getChannelKey(channelId); - + if (!state.messages[channelKey]) { state.messages[channelKey] = []; } // Add the new message - const nodeName = state.nodes[nodeId]?.shortName || state.nodes[nodeId]?.longName; - + const nodeName = + state.nodes[nodeId]?.shortName || state.nodes[nodeId]?.longName; + state.messages[channelKey].push({ id: data.id || Math.random(), from: nodeId, @@ -199,7 +231,7 @@ const processPacket = (state: AggregatorState, packet: Packet) => { text: data.textMessage, timestamp, channelId, - gatewayId: gatewayId || '', + gatewayId: gatewayId || "", }); // Sort messages by timestamp (newest first) @@ -207,7 +239,10 @@ const processPacket = (state: AggregatorState, packet: Packet) => { // Limit the number of messages per channel if (state.messages[channelKey].length > MAX_MESSAGES_PER_CHANNEL) { - state.messages[channelKey] = state.messages[channelKey].slice(0, MAX_MESSAGES_PER_CHANNEL); + state.messages[channelKey] = state.messages[channelKey].slice( + 0, + MAX_MESSAGES_PER_CHANNEL + ); } } }; @@ -218,7 +253,8 @@ const updateNodeInfo = (node: NodeData, nodeInfo: User) => { if (nodeInfo.longName) node.longName = nodeInfo.longName; if (nodeInfo.macaddr) node.macAddr = nodeInfo.macaddr; if (nodeInfo.hwModel) node.hwModel = nodeInfo.hwModel; - if (nodeInfo.batteryLevel !== undefined) node.batteryLevel = nodeInfo.batteryLevel; + if (nodeInfo.batteryLevel !== undefined) + node.batteryLevel = nodeInfo.batteryLevel; if (nodeInfo.snr !== undefined) node.snr = nodeInfo.snr; }; @@ -226,13 +262,13 @@ const updateNodeInfo = (node: NodeData, nodeInfo: User) => { const updateTelemetry = (node: NodeData, telemetry: Telemetry) => { if (telemetry.deviceMetrics) { node.deviceMetrics = { ...telemetry.deviceMetrics }; - + // Update battery level from device metrics if available if (telemetry.deviceMetrics.batteryLevel !== undefined) { node.batteryLevel = telemetry.deviceMetrics.batteryLevel; } } - + if (telemetry.environmentMetrics) { node.environmentMetrics = { ...telemetry.environmentMetrics }; } @@ -250,6 +286,7 @@ const aggregatorSlice = createSlice({ state.gateways = {}; state.channels = {}; state.messages = {}; + state.processedPackets = {}; state.selectedNodeId = undefined; }, selectNode: (state, action: PayloadAction) => { @@ -258,6 +295,7 @@ const aggregatorSlice = createSlice({ }, }); -export const { processNewPacket, clearAggregatedData, selectNode } = aggregatorSlice.actions; +export const { processNewPacket, clearAggregatedData, selectNode } = + aggregatorSlice.actions; export default aggregatorSlice.reducer;