diff --git a/app/decoder.py b/app/decoder.py index 5640e55..fbef90c 100644 --- a/app/decoder.py +++ b/app/decoder.py @@ -53,9 +53,11 @@ class ParsedAdvertisement: """Result of parsing an advertisement packet.""" public_key: str # 64-char hex + timestamp: int # Unix timestamp from the advertisement name: str | None lat: float | None lon: float | None + device_role: int # 1=Chat, 2=Repeater, 3=Room, 4=Sensor @dataclass @@ -290,62 +292,91 @@ def parse_advertisement(payload: bytes) -> ParsedAdvertisement | None: """ Parse an advertisement payload. - Advertisement structure: - - public_key (32 bytes): Ed25519 public key - - signature (64 bytes): Ed25519 signature - - advert_data (variable): Contains name and possibly lat/lon - - The name is typically at the end of the payload as a UTF-8 string. + Advertisement payload structure (101+ bytes): + - Bytes 0-31 (32 bytes): Public Key (Ed25519) + - Bytes 32-35 (4 bytes): Timestamp (Unix timestamp, little-endian) + - Bytes 36-99 (64 bytes): Signature (Ed25519) + - Byte 100 (1 byte): App Flags + - Bits 0-3: Device Role (1=Chat, 2=Repeater, 3=Room, 4=Sensor) + - Bit 4 (0x10): HasLocation + - Bit 5 (0x20): HasFeature1 + - Bit 6 (0x40): HasFeature2 + - Bit 7 (0x80): HasName + - If HasLocation: 8 bytes (4 lat + 4 lon as signed int32 LE / 1e6) + - If HasFeature1: 2 bytes (skipped) + - If HasFeature2: 2 bytes (skipped) + - If HasName: remaining bytes = name (UTF-8) """ - # Minimum: 32 (pubkey) + 64 (sig) + at least 1 byte for flags/data - if len(payload) < 97: + # Minimum: pubkey(32) + timestamp(4) + sig(64) + flags(1) = 101 bytes + if len(payload) < 101: return None - public_key = payload[:32].hex() - # signature = payload[32:96] # Not currently verified - advert_data = payload[96:] + # Parse fixed-position fields + public_key = payload[0:32].hex() + timestamp = int.from_bytes(payload[32:36], byteorder="little") + # signature = payload[36:100] # Not currently verified + flags = payload[100] - if len(advert_data) == 0: - return ParsedAdvertisement( - public_key=public_key, - name=None, - lat=None, - lon=None, - ) + # Parse flags + device_role = flags & 0x0F + has_location = bool(flags & 0x10) + has_feature1 = bool(flags & 0x20) + has_feature2 = bool(flags & 0x40) + has_name = bool(flags & 0x80) - # Try to extract name from the advert data - # The structure varies, but the name is typically near the end - name = None + # Start parsing variable-length app data after flags + offset = 101 lat = None lon = None + name = None - # Try to decode the entire advert_data as UTF-8 to find the name - # Names are typically at the end after any binary data - try: - # Find the last valid UTF-8 string - for start in range(len(advert_data)): - try: - text = advert_data[start:].decode("utf-8") - # Filter out control characters and check if it looks like a name - null_idx = text.find("\x00") - if null_idx >= 0: - text = text[:null_idx] - text = text.strip() - if text and len(text) >= 1 and len(text) <= 40: - # Check if it contains printable characters - if any(c.isalnum() for c in text): - name = text - break - except UnicodeDecodeError: - continue - except Exception: - pass + # Parse location if present (8 bytes: 4 lat + 4 lon) + if has_location: + if len(payload) < offset + 8: + return ParsedAdvertisement( + public_key=public_key, + timestamp=timestamp, + name=None, + lat=None, + lon=None, + device_role=device_role, + ) + lat_raw = int.from_bytes(payload[offset:offset + 4], byteorder="little", signed=True) + lon_raw = int.from_bytes(payload[offset + 4:offset + 8], byteorder="little", signed=True) + lat = lat_raw / 1_000_000 + lon = lon_raw / 1_000_000 + offset += 8 + + # Skip feature fields if present + if has_feature1: + offset += 2 + if has_feature2: + offset += 2 + + # Parse name if present (remaining bytes) + if has_name and len(payload) > offset: + name_bytes = payload[offset:] + try: + # Decode name, strip null bytes and control characters + name = name_bytes.decode("utf-8", errors="ignore") + # Remove null terminator and anything after + null_idx = name.find("\x00") + if null_idx >= 0: + name = name[:null_idx] + # Strip control characters and whitespace + name = "".join(c for c in name if c >= " " or c in "\t").strip() + if not name or not any(c.isalnum() for c in name): + name = None + except Exception: + name = None return ParsedAdvertisement( public_key=public_key, + timestamp=timestamp, name=name, lat=lat, lon=lon, + device_role=device_role, ) diff --git a/app/packet_processor.py b/app/packet_processor.py index ca8e47e..305e7b1 100644 --- a/app/packet_processor.py +++ b/app/packet_processor.py @@ -344,24 +344,31 @@ async def _process_advertisement( logger.debug("Failed to parse advertisement packet") return - logger.debug("Parsed advertisement from %s: %s", advert.public_key[:12], advert.name) + logger.debug( + "Parsed advertisement from %s: %s (role=%d, lat=%s, lon=%s)", + advert.public_key[:12], advert.name, advert.device_role, advert.lat, advert.lon + ) # Try to find existing contact existing = await ContactRepository.get_by_key(advert.public_key) + # Use device_role from advertisement for contact type (1=Chat, 2=Repeater, 3=Room, 4=Sensor) + # Use advert.timestamp for last_advert (sender's timestamp), receive timestamp for last_seen + contact_type = advert.device_role if advert.device_role > 0 else (existing.type if existing else 0) + contact_data = { "public_key": advert.public_key, "name": advert.name, + "type": contact_type, "lat": advert.lat, "lon": advert.lon, - "last_advert": timestamp, + "last_advert": advert.timestamp if advert.timestamp > 0 else timestamp, "last_seen": timestamp, } await ContactRepository.upsert(contact_data) # Broadcast contact update to connected clients - contact_type = existing.type if existing else 0 broadcast_event("contact", { "public_key": advert.public_key, "name": advert.name, @@ -369,7 +376,7 @@ async def _process_advertisement( "flags": existing.flags if existing else 0, "last_path": existing.last_path if existing else None, "last_path_len": existing.last_path_len if existing else -1, - "last_advert": timestamp, + "last_advert": advert.timestamp if advert.timestamp > 0 else timestamp, "lat": advert.lat, "lon": advert.lon, "last_seen": timestamp, diff --git a/tests/test_decoder.py b/tests/test_decoder.py index 3539d75..9448e3b 100644 --- a/tests/test_decoder.py +++ b/tests/test_decoder.py @@ -252,36 +252,91 @@ class TestRealWorldPackets: class TestAdvertisementParsing: """Test parsing of advertisement packets.""" - def test_parse_real_advertisement(self): - """Parse a real advertisement packet from 'Flightless 🥝'.""" + def test_parse_repeater_advertisement_with_gps(self): + """Parse a repeater advertisement with GPS coordinates.""" from app.decoder import try_parse_advertisement - # Real advertisement packet + # Repeater packet with lat/lon of 49.02056 / -123.82935 + # Flags 0x92: Role=Repeater (2), Location=Yes, Name=Yes packet_hex = ( - "1200AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A83" - "54B55C6934EAC9C9BD98A99788B1725379BB25863731ADAB605BCD62F0BA0E467483" - "E0A21E81C9279665D117B265B192890B8E0C2AE03E48DA5AA28C3EFB842EF656670B" - "915128D902B72DB5F8466C696768746C65737320F09FA59D" + "1106538B1CD273868576DC7F679B493F9AB5AC316173E1A56D3388BC3BA75F583F63" + "AB0D1BA2A8ABD0BC6669DBF719E67E4C8517BA4E0D6F8C96A323E9D13A77F2630DED" + "965A5C17C3EC6ED1601EEFE857749DA24E9F39CBEACD722C3708F433DB5FA9BAF0BA" + "F9BC5B1241069290FEEB029A839EF843616E204F204D657368203220F09FA5AB" + ) + packet = bytes.fromhex(packet_hex) + + result = try_parse_advertisement(packet) + + assert result is not None + assert result.public_key == "8576dc7f679b493f9ab5ac316173e1a56d3388bc3ba75f583f63ab0d1ba2a8ab" + assert result.name == "Can O Mesh 2 🥫" + assert result.device_role == 2 # Repeater + assert result.timestamp > 0 # Has valid timestamp + assert result.lat is not None + assert result.lon is not None + assert abs(result.lat - 49.02056) < 0.000001 + assert abs(result.lon - (-123.82935)) < 0.000001 + + def test_parse_chat_node_advertisement_with_gps(self): + """Parse a chat node advertisement with GPS coordinates.""" + from app.decoder import try_parse_advertisement + + # Chat node packet with lat/lon of 47.786038 / -122.344096 + # Flags 0x91: Role=Chat (1), Location=Yes, Name=Yes + packet_hex = ( + "1100AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A83" + "2DBF6669128E9476F36320F21D1B37FF1CF31680F50F4B17EDABCC7CF8C47D3C5E1D" + "F3AFD0C8721EA06A8078462EF241DEF80AD6922751F206E3BB121DFB604F4146D60D" + "913628D902602DB5F8466C696768746C657373F09FA59D" ) packet = bytes.fromhex(packet_hex) result = try_parse_advertisement(packet) assert result is not None - # Public key is the first 32 bytes of payload assert result.public_key == "ae92564c5c9884854f04f469bbb2bab8871a078053af6cf4aa2c014b18ce8a83" - # Name should be extracted from the end - assert result.name == "Flightless 🥝" + assert result.name == "Flightless🥝" + assert result.device_role == 1 # Chat node + assert result.timestamp > 0 # Has valid timestamp + assert result.lat is not None + assert result.lon is not None + assert abs(result.lat - 47.786038) < 0.000001 + assert abs(result.lon - (-122.344096)) < 0.000001 + + def test_parse_advertisement_without_gps(self): + """Parse an advertisement without GPS coordinates.""" + from app.decoder import try_parse_advertisement + + # Chat node packet without location + # Flags 0x81: Role=Chat (1), Location=No, Name=Yes + packet_hex = ( + "1104D7F9E07A2E38C81F7DC0C1CEDDED6B415B4367CF48F578C5A092CED3490FF0C7" + "6EFDF1F5A4BD6669D3D143CFF384D8B3BD950CDCA31C98B7DA789D004D04DED31E16" + "B998E1AE352B283EAC8ABCF1F07214EC3BBF7AF3EB8EBF15C00417F2425A259E7CE6" + "A875BA0D814D656E6E697344" + ) + packet = bytes.fromhex(packet_hex) + + result = try_parse_advertisement(packet) + + assert result is not None + assert result.public_key == "2e38c81f7dc0c1cedded6b415b4367cf48f578c5a092ced3490ff0c76efdf1f5" + assert result.name == "MennisD" + assert result.device_role == 1 # Chat node + assert result.timestamp > 0 # Has valid timestamp + assert result.lat is None + assert result.lon is None def test_parse_advertisement_extracts_public_key(self): """Advertisement parsing extracts the public key correctly.""" from app.decoder import parse_packet, PayloadType packet_hex = ( - "1200AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A83" - "54B55C6934EAC9C9BD98A99788B1725379BB25863731ADAB605BCD62F0BA0E467483" - "E0A21E81C9279665D117B265B192890B8E0C2AE03E48DA5AA28C3EFB842EF656670B" - "915128D902B72DB5F8466C696768746C65737320F09FA59D" + "1100AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A83" + "2DBF6669128E9476F36320F21D1B37FF1CF31680F50F4B17EDABCC7CF8C47D3C5E1D" + "F3AFD0C8721EA06A8078462EF241DEF80AD6922751F206E3BB121DFB604F4146D60D" + "913628D902602DB5F8466C696768746C657373F09FA59D" ) packet = bytes.fromhex(packet_hex)