"""Normalize public_key to lowercase and merge duplicate nodes

Revision ID: b1c2d3e4f5a6
Revises: a1b2c3d4e5f6
Create Date: 2026-04-21

Before this migration, public_key values could be stored in mixed case:
- tag_import.py stored them as lowercase (via validate_public_key)
- letsmesh_normalizer.py stored them as UPPERCASE (via _normalize_full_public_key)
- MQTT topic paths stored them as-is

This caused duplicate nodes for the same physical device, with tags
linked to one and mesh events linked to another.

This migration:
1. Merges duplicate nodes (picking the one with the earliest first_seen,
   then earliest created_at, then lowest id as a deterministic tie-breaker)
2. Re-points all FK references to the winner node
3. Deletes the loser nodes
4. Normalizes all remaining public_keys to lowercase
5. Also lowercases public_key columns in child tables (advertisements, telemetry)
"""

from typing import Any

from alembic import op
from sqlalchemy import text
from sqlalchemy.engine import Connection

revision = "b1c2d3e4f5a6"
down_revision = "a1b2c3d4e5f6"
branch_labels = None
depends_on = None

# (table, column) pairs that reference nodes.id and must follow the winner.
# The first two are the CASCADE tables (rows would be deleted with the loser),
# the rest are SET NULL tables (the link would be lost when the loser goes).
# The order matches the statement order of the original hand-written updates.
_NODE_REFERENCES = (
    ("node_tags", "node_id"),
    ("event_observers", "observer_node_id"),
    ("advertisements", "observer_node_id"),
    ("advertisements", "node_id"),
    ("telemetry", "observer_node_id"),
    ("telemetry", "node_id"),
    ("messages", "observer_node_id"),
    ("trace_paths", "observer_node_id"),
    ("events_log", "observer_node_id"),
)


def _duplicate_key_groups(conn: Connection) -> list[str]:
    """Return each lowercase public_key that is shared by more than one node."""
    # HAVING uses the aggregate itself rather than a select alias, and no
    # GROUP_CONCAT is needed, so the statement is not tied to one SQL dialect.
    rows = conn.execute(
        text("""
            SELECT LOWER(public_key) AS lower_pk
            FROM nodes
            GROUP BY LOWER(public_key)
            HAVING COUNT(*) > 1
        """)
    ).fetchall()
    return [row[0] for row in rows]


def _ranked_node_ids(conn: Connection, lower_pk: str) -> list[Any]:
    """Return ids of all nodes matching ``lower_pk``, preferred survivor first.

    The ids are returned exactly as the driver produced them, so the caller
    never has to compare a stringified id with a native one.
    """
    rows = conn.execute(
        text("""
            SELECT id FROM nodes
            WHERE LOWER(public_key) = :lower_pk
            ORDER BY first_seen ASC, created_at ASC, id ASC
        """),
        {"lower_pk": lower_pk},
    ).fetchall()
    return [row[0] for row in rows]


def _merge_node(conn: Connection, winner_id: Any, loser_id: Any) -> None:
    """Fold the node ``loser_id`` into ``winner_id`` and delete the loser."""
    params = {"winner_id": winner_id, "loser_id": loser_id}

    # NOTE(review): if node_tags or event_observers carry a unique constraint
    # that includes the node column, re-pointing a row that already exists for
    # the winner would raise an integrity error - confirm against the schema.
    for table, column in _NODE_REFERENCES:
        # Identifiers come from the constant tuple above, never from input;
        # the ids themselves are always passed as bound parameters.
        conn.execute(
            text(
                f"UPDATE {table} SET {column} = :winner_id "
                f"WHERE {column} = :loser_id"
            ),
            params,
        )

    # Merge scalar fields into the winner: keep the winner's value when it has
    # one, otherwise take the loser's; last_seen becomes the later of the two
    # (MAX ignores NULLs). The derived table is aliased because several
    # databases reject an unnamed subquery in FROM.
    conn.execute(
        text("""
            UPDATE nodes SET
                name = COALESCE(name, (SELECT name FROM nodes WHERE id = :loser_id)),
                adv_type = COALESCE(adv_type, (SELECT adv_type FROM nodes WHERE id = :loser_id)),
                last_seen = (
                    SELECT MAX(v) FROM (
                        SELECT last_seen AS v FROM nodes WHERE id = :winner_id
                        UNION ALL
                        SELECT last_seen AS v FROM nodes WHERE id = :loser_id
                    ) AS seen
                ),
                lat = COALESCE(lat, (SELECT lat FROM nodes WHERE id = :loser_id)),
                lon = COALESCE(lon, (SELECT lon FROM nodes WHERE id = :loser_id))
            WHERE id = :winner_id
        """),
        params,
    )

    # Delete the loser (CASCADE will clean up any remaining orphans).
    conn.execute(
        text("DELETE FROM nodes WHERE id = :loser_id"), {"loser_id": loser_id}
    )


def upgrade() -> None:
    """Merge case-variant duplicate nodes, then lowercase every public key."""
    conn = op.get_bind()

    for lower_pk in _duplicate_key_groups(conn):
        node_ids = _ranked_node_ids(conn, lower_pk)
        if len(node_ids) < 2:
            continue

        # Split positionally: the first id is the survivor. Comparing ids
        # parsed out of a concatenated string against the native id value
        # could fail to exclude the winner and end up deleting it.
        winner_id, *loser_ids = node_ids
        for loser_id in loser_ids:
            _merge_node(conn, winner_id, loser_id)

    # With duplicates gone, lowercasing cannot collide on public_key.
    conn.execute(text("UPDATE nodes SET public_key = LOWER(public_key)"))

    # Also lowercase public_key in child tables that store their own copy.
    conn.execute(text("UPDATE advertisements SET public_key = LOWER(public_key)"))
    conn.execute(text("UPDATE telemetry SET node_public_key = LOWER(node_public_key)"))


def downgrade() -> None:
    """Do nothing: this migration is intentionally irreversible."""
    # Cannot reverse the merge (duplicate data has been deleted).
    # The public_key normalization to lowercase is also irreversible
    # since we don't know the original case.
    pass
### Step 1: Backup diff --git a/src/meshcore_hub/collector/letsmesh_normalizer.py b/src/meshcore_hub/collector/letsmesh_normalizer.py index 8fa32b2..416df7f 100644 --- a/src/meshcore_hub/collector/letsmesh_normalizer.py +++ b/src/meshcore_hub/collector/letsmesh_normalizer.py @@ -784,7 +784,7 @@ class LetsMeshNormalizer: match = re.search(r"([0-9A-Fa-f]{64})", value) if not match: return None - return match.group(1).upper() + return match.group(1).lower() @classmethod def _parse_hex_or_int(cls, value: Any) -> int | None: @@ -903,7 +903,7 @@ class LetsMeshNormalizer: return None if any(ch not in "0123456789ABCDEF" for ch in normalized): return None - return normalized + return normalized.lower() @staticmethod def _normalize_pubkey_prefix(value: Any) -> str | None: diff --git a/src/meshcore_hub/common/models/node.py b/src/meshcore_hub/common/models/node.py index 7657b3a..f4d7987 100644 --- a/src/meshcore_hub/common/models/node.py +++ b/src/meshcore_hub/common/models/node.py @@ -37,6 +37,12 @@ class Node(Base, UUIDMixin, TimestampMixin): nullable=False, index=True, ) + + def __init__(self, **kwargs: object) -> None: + super().__init__(**kwargs) + if self.public_key and isinstance(self.public_key, str): + self.public_key = self.public_key.lower() + name: Mapped[Optional[str]] = mapped_column( String(255), nullable=True, diff --git a/src/meshcore_hub/common/mqtt.py b/src/meshcore_hub/common/mqtt.py index fcb9794..b4e4489 100644 --- a/src/meshcore_hub/common/mqtt.py +++ b/src/meshcore_hub/common/mqtt.py @@ -102,7 +102,7 @@ class TopicBuilder: ): public_key = parts[prefix_len] event_name = "/".join(parts[prefix_len + 2 :]) - return (public_key, event_name) + return (public_key.lower(), event_name) return None def parse_command_topic(self, topic: str) -> tuple[str, str] | None: @@ -124,7 +124,7 @@ class TopicBuilder: ): public_key = parts[prefix_len] command_name = "/".join(parts[prefix_len + 2 :]) - return (public_key, command_name) + return (public_key.lower(), 
command_name) return None def parse_letsmesh_upload_topic(self, topic: str) -> tuple[str, str] | None: @@ -145,7 +145,7 @@ class TopicBuilder: if feed_type not in {"packets", "status", "internal"}: return None - return (public_key, feed_type) + return (public_key.lower(), feed_type) MessageHandler = Callable[[str, str, dict[str, Any]], None] diff --git a/tests/test_collector/test_letsmesh_normalizer_integration.py b/tests/test_collector/test_letsmesh_normalizer_integration.py index 50b1206..3b303b0 100644 --- a/tests/test_collector/test_letsmesh_normalizer_integration.py +++ b/tests/test_collector/test_letsmesh_normalizer_integration.py @@ -43,8 +43,10 @@ def _make_decoder( return decoder -PUB_KEY = "AA" * 32 -OBSERVER_KEY = "BB" * 32 +PUB_KEY_UPPER = "AA" * 32 +PUB_KEY = PUB_KEY_UPPER.lower() +OBSERVER_KEY_UPPER = "BB" * 32 +OBSERVER_KEY = OBSERVER_KEY_UPPER.lower() class TestStatusFeed: @@ -90,7 +92,7 @@ class TestAdvertPacket: "payload": { "decoded": { "type": 4, - "publicKey": PUB_KEY, + "publicKey": PUB_KEY_UPPER, "timestamp": 1700000000, "signature": "CC" * 64, "appData": { @@ -130,7 +132,7 @@ class TestAdvertPacket: "payload": { "decoded": { "type": 4, - "publicKey": PUB_KEY, + "publicKey": PUB_KEY_UPPER, "timestamp": 1700000000, "signature": "CC" * 64, } @@ -346,10 +348,10 @@ class TestControlPacket: "flags": 1, "subType": 144, "dataHex": "", - "publicKey": PUB_KEY, + "publicKey": PUB_KEY_UPPER, "nodeType": 2, "parsed": { - "publicKey": PUB_KEY, + "publicKey": PUB_KEY_UPPER, "nodeType": 2, "snr": 10, }, @@ -411,7 +413,7 @@ class TestPathPacket: "pathLength": 4, "pathHashes": ["AA", "BB", "CC", "DD"], "extraType": 1, - "extraData": PUB_KEY, + "extraData": PUB_KEY_UPPER, } }, } @@ -447,7 +449,7 @@ class TestResponsePacket: "tag": 0, "decrypted": { "content": { - "node_public_key": PUB_KEY, + "node_public_key": PUB_KEY_UPPER, "battery_voltage": 3.7, "battery_percentage": 85, } @@ -484,7 +486,7 @@ class TestResponsePacket: "tag": 0, "decrypted": { "content": { 
- "node_public_key": PUB_KEY, + "node_public_key": PUB_KEY_UPPER, "parsed_data": {"temperature": 22.5, "humidity": 60}, } }, diff --git a/tests/test_collector/test_subscriber.py b/tests/test_collector/test_subscriber.py index c3ad793..a8dd00f 100644 --- a/tests/test_collector/test_subscriber.py +++ b/tests/test_collector/test_subscriber.py @@ -493,7 +493,7 @@ class TestSubscriber: public_key, event_type, payload, _db = handler.call_args.args assert public_key == "a" * 64 assert event_type == "advertisement" - assert payload["public_key"] == "B" * 64 + assert payload["public_key"] == "b" * 64 assert payload["name"] == "Concord Attic G2" assert payload["adv_type"] == "repeater" assert payload["flags"] == 146 @@ -548,7 +548,7 @@ class TestSubscriber: contact_handler.assert_called_once() _public_key, event_type, payload, _db = contact_handler.call_args.args assert event_type == "contact" - assert payload["public_key"] == "C" * 64 + assert payload["public_key"] == "c" * 64 assert payload["type"] == 2 assert payload["flags"] == 146 @@ -755,7 +755,7 @@ class TestSubscriber: assert payload["hop_count"] == 2 assert payload["path_hashes"] == ["AA", "BB"] assert payload["extra_type"] == 244 - assert payload["node_public_key"] == "D" * 64 + assert payload["node_public_key"] == "d" * 64 def test_letsmesh_packet_fallback_logs_decoded_payload( self, mock_mqtt_client, db_manager diff --git a/tests/test_common/test_mqtt.py b/tests/test_common/test_mqtt.py index 73d2405..58ab9c7 100644 --- a/tests/test_common/test_mqtt.py +++ b/tests/test_common/test_mqtt.py @@ -14,7 +14,7 @@ class TestTopicBuilder: "meshcore/ABCDEF1234567890/event/advertisement" ) - assert parsed == ("ABCDEF1234567890", "advertisement") + assert parsed == ("abcdef1234567890", "advertisement") def test_parse_event_topic_with_multi_segment_prefix(self) -> None: """Event topics are parsed correctly with a slash-delimited prefix.""" @@ -24,7 +24,7 @@ class TestTopicBuilder: 
"meshcore/BOS/ABCDEF1234567890/event/channel_msg_recv" ) - assert parsed == ("ABCDEF1234567890", "channel_msg_recv") + assert parsed == ("abcdef1234567890", "channel_msg_recv") def test_parse_command_topic_with_multi_segment_prefix(self) -> None: """Command topics are parsed correctly with a slash-delimited prefix.""" @@ -34,7 +34,7 @@ class TestTopicBuilder: "meshcore/BOS/ABCDEF123456/command/send_msg" ) - assert parsed == ("ABCDEF123456", "send_msg") + assert parsed == ("abcdef123456", "send_msg") def test_parse_letsmesh_upload_topic(self) -> None: """LetsMesh upload topics map to public key and feed type.""" @@ -44,7 +44,7 @@ class TestTopicBuilder: "meshcore/STN/ABCDEF1234567890/status" ) - assert parsed == ("ABCDEF1234567890", "status") + assert parsed == ("abcdef1234567890", "status") def test_parse_letsmesh_upload_topic_rejects_unknown_feed(self) -> None: """Unknown LetsMesh feed topics are rejected."""