Make sure to count packets once per node/channel, even if received by multiple gateways

This commit is contained in:
Daniel Pupius
2025-04-24 12:58:20 -07:00
parent 28cb7072f7
commit 69b0ea6615

View File

@@ -1,5 +1,12 @@
import { createSlice, PayloadAction } from "@reduxjs/toolkit";
import { Packet, DeviceMetrics, EnvironmentMetrics, Position, User, Telemetry } from "../../lib/types";
import {
Packet,
DeviceMetrics,
EnvironmentMetrics,
Position,
User,
Telemetry,
} from "../../lib/types";
// Types for aggregated data
export interface NodeData {
@@ -54,6 +61,8 @@ interface AggregatorState {
channels: Record<string, ChannelData>;
messages: Record<string, TextMessage[]>;
selectedNodeId?: number;
// Track already processed packet IDs to prevent duplicates
processedPackets: Record<string, boolean>;
}
const initialState: AggregatorState = {
@@ -61,6 +70,7 @@ const initialState: AggregatorState = {
gateways: {},
channels: {},
messages: {},
processedPackets: {},
};
// Helper to create a key for message collections (by channelId)
@@ -78,9 +88,28 @@ const processPacket = (state: AggregatorState, packet: Packet) => {
const nodeId = data.from;
const timestamp = data.rxTime || Math.floor(Date.now() / 1000);
// Skip packets without valid from/id
if (nodeId === undefined || data.id === undefined) {
return;
}
// Create a unique packet key (same format as in packetSlice)
const nodeIdHex = `!${nodeId.toString(16).toLowerCase()}`;
const packetKey = `${nodeIdHex}_${data.id}`;
// Check if we've already processed this packet
const isNewPacket = !state.processedPackets[packetKey];
// Always mark this packet as processed
state.processedPackets[packetKey] = true;
// Update gateway data, but only if it's reporting packets from a different nodeId
// (a true gateway is relaying data from other nodes, not just its own data)
if (gatewayId && nodeId !== undefined && gatewayId !== `!${nodeId.toString(16)}`) {
if (
gatewayId &&
nodeId !== undefined &&
gatewayId !== `!${nodeId.toString(16)}`
) {
if (!state.gateways[gatewayId]) {
state.gateways[gatewayId] = {
gatewayId,
@@ -96,8 +125,6 @@ const processPacket = (state: AggregatorState, packet: Packet) => {
const gateway = state.gateways[gatewayId];
gateway.lastHeard = Math.max(gateway.lastHeard, timestamp);
gateway.messageCount++;
// Track text messages
if (data.textMessage) {
gateway.textMessageCount++;
}
@@ -105,13 +132,18 @@ const processPacket = (state: AggregatorState, packet: Packet) => {
if (channelId && !gateway.channelIds.includes(channelId)) {
gateway.channelIds.push(channelId);
}
// Record node in observed nodes
if (!gateway.observedNodes.includes(nodeId)) {
gateway.observedNodes.push(nodeId);
}
}
if (!isNewPacket) {
// Channel and node stats will already have been updated.
return;
}
// Update channel data
if (channelId) {
if (!state.channels[channelId]) {
@@ -125,6 +157,7 @@ const processPacket = (state: AggregatorState, packet: Packet) => {
const channel = state.channels[channelId];
channel.messageCount++;
channel.lastMessage = timestamp;
if (gatewayId && !channel.gateways.includes(gatewayId)) {
@@ -151,12 +184,10 @@ const processPacket = (state: AggregatorState, packet: Packet) => {
const node = state.nodes[nodeId];
node.lastHeard = Math.max(node.lastHeard, timestamp);
node.messageCount++;
// Track text messages
if (data.textMessage) {
node.textMessageCount++;
}
// Set channelId and gatewayId if available
if (channelId) {
node.channelId = channelId;
@@ -181,17 +212,18 @@ const processPacket = (state: AggregatorState, packet: Packet) => {
}
}
// Process text messages
// Process text messages.
if (data.textMessage && nodeId !== undefined && channelId) {
const channelKey = getChannelKey(channelId);
if (!state.messages[channelKey]) {
state.messages[channelKey] = [];
}
// Add the new message
const nodeName = state.nodes[nodeId]?.shortName || state.nodes[nodeId]?.longName;
const nodeName =
state.nodes[nodeId]?.shortName || state.nodes[nodeId]?.longName;
state.messages[channelKey].push({
id: data.id || Math.random(),
from: nodeId,
@@ -199,7 +231,7 @@ const processPacket = (state: AggregatorState, packet: Packet) => {
text: data.textMessage,
timestamp,
channelId,
gatewayId: gatewayId || '',
gatewayId: gatewayId || "",
});
// Sort messages by timestamp (newest first)
@@ -207,7 +239,10 @@ const processPacket = (state: AggregatorState, packet: Packet) => {
// Limit the number of messages per channel
if (state.messages[channelKey].length > MAX_MESSAGES_PER_CHANNEL) {
state.messages[channelKey] = state.messages[channelKey].slice(0, MAX_MESSAGES_PER_CHANNEL);
state.messages[channelKey] = state.messages[channelKey].slice(
0,
MAX_MESSAGES_PER_CHANNEL
);
}
}
};
@@ -218,7 +253,8 @@ const updateNodeInfo = (node: NodeData, nodeInfo: User) => {
if (nodeInfo.longName) node.longName = nodeInfo.longName;
if (nodeInfo.macaddr) node.macAddr = nodeInfo.macaddr;
if (nodeInfo.hwModel) node.hwModel = nodeInfo.hwModel;
if (nodeInfo.batteryLevel !== undefined) node.batteryLevel = nodeInfo.batteryLevel;
if (nodeInfo.batteryLevel !== undefined)
node.batteryLevel = nodeInfo.batteryLevel;
if (nodeInfo.snr !== undefined) node.snr = nodeInfo.snr;
};
@@ -226,13 +262,13 @@ const updateNodeInfo = (node: NodeData, nodeInfo: User) => {
const updateTelemetry = (node: NodeData, telemetry: Telemetry) => {
if (telemetry.deviceMetrics) {
node.deviceMetrics = { ...telemetry.deviceMetrics };
// Update battery level from device metrics if available
if (telemetry.deviceMetrics.batteryLevel !== undefined) {
node.batteryLevel = telemetry.deviceMetrics.batteryLevel;
}
}
if (telemetry.environmentMetrics) {
node.environmentMetrics = { ...telemetry.environmentMetrics };
}
@@ -250,6 +286,7 @@ const aggregatorSlice = createSlice({
state.gateways = {};
state.channels = {};
state.messages = {};
state.processedPackets = {};
state.selectedNodeId = undefined;
},
selectNode: (state, action: PayloadAction<number | undefined>) => {
@@ -258,6 +295,7 @@ const aggregatorSlice = createSlice({
},
});
export const { processNewPacket, clearAggregatedData, selectNode } = aggregatorSlice.actions;
export const { processNewPacket, clearAggregatedData, selectNode } =
aggregatorSlice.actions;
export default aggregatorSlice.reducer;