mirror of
https://github.com/pablorevilla-meshtastic/meshview.git
synced 2026-03-04 23:27:46 +01:00
Fix issues with decryption of secondary keys
This commit is contained in:
@@ -8,6 +8,7 @@ import aiomqtt
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from google.protobuf.message import DecodeError
|
||||
|
||||
from meshtastic.protobuf.mesh_pb2 import Data
|
||||
from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope
|
||||
from meshview.config import CONFIG
|
||||
|
||||
@@ -45,6 +46,12 @@ def _parse_skip_node_ids():
|
||||
return skip_ids
|
||||
|
||||
|
||||
def _strip_quotes(value):
|
||||
if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
|
||||
return value[1:-1]
|
||||
return value
|
||||
|
||||
|
||||
def _parse_secondary_keys():
|
||||
mqtt_config = CONFIG.get("mqtt", {})
|
||||
raw_value = mqtt_config.get("secondary_keys", "")
|
||||
@@ -62,7 +69,9 @@ def _parse_secondary_keys():
|
||||
keys = []
|
||||
for value in values:
|
||||
try:
|
||||
keys.append(base64.b64decode(value))
|
||||
cleaned = _strip_quotes(str(value).strip())
|
||||
if cleaned:
|
||||
keys.append(base64.b64decode(cleaned))
|
||||
except (TypeError, ValueError):
|
||||
logger.warning("Invalid base64 key in mqtt.secondary_keys: %s", value)
|
||||
return keys
|
||||
@@ -71,8 +80,16 @@ def _parse_secondary_keys():
|
||||
SKIP_NODE_IDS = _parse_skip_node_ids()
|
||||
SECONDARY_KEYS = _parse_secondary_keys()
|
||||
|
||||
logger.info("Primary key: %s", PRIMARY_KEY)
|
||||
if SECONDARY_KEYS:
|
||||
logger.info("Secondary keys: %s", SECONDARY_KEYS)
|
||||
else:
|
||||
logger.info("Secondary keys: []")
|
||||
|
||||
def _try_decrypt(packet, key):
|
||||
|
||||
def decrypt(packet, key):
|
||||
if packet.HasField("decoded"):
|
||||
return True
|
||||
packet_id = packet.id.to_bytes(8, "little")
|
||||
from_node_id = getattr(packet, "from").to_bytes(8, "little")
|
||||
nonce = packet_id + from_node_id
|
||||
@@ -81,24 +98,17 @@ def _try_decrypt(packet, key):
|
||||
decryptor = cipher.decryptor()
|
||||
raw_proto = decryptor.update(packet.encrypted) + decryptor.finalize()
|
||||
try:
|
||||
packet.decoded.ParseFromString(raw_proto)
|
||||
data = Data()
|
||||
data.ParseFromString(raw_proto)
|
||||
packet.decoded.CopyFrom(data)
|
||||
except DecodeError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def decrypt(packet):
|
||||
if packet.HasField("decoded"):
|
||||
return
|
||||
if _try_decrypt(packet, PRIMARY_KEY):
|
||||
return
|
||||
for key in SECONDARY_KEYS:
|
||||
if _try_decrypt(packet, key):
|
||||
return
|
||||
|
||||
|
||||
async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_passwd):
|
||||
identifier = str(random.getrandbits(16))
|
||||
keyring = [PRIMARY_KEY, *SECONDARY_KEYS]
|
||||
msg_count = 0
|
||||
start_time = None
|
||||
while True:
|
||||
@@ -125,8 +135,9 @@ async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_pa
|
||||
except DecodeError:
|
||||
continue
|
||||
|
||||
decrypt(envelope.packet)
|
||||
# print(envelope.packet.decoded)
|
||||
for key in keyring:
|
||||
if decrypt(envelope.packet, key):
|
||||
break
|
||||
if not envelope.packet.decoded:
|
||||
continue
|
||||
|
||||
|
||||
@@ -83,6 +83,7 @@ skip_node_ids =
|
||||
secondary_keys =
|
||||
|
||||
|
||||
|
||||
# -------------------------
|
||||
# Database Configuration
|
||||
# -------------------------
|
||||
|
||||
Reference in New Issue
Block a user