diff --git a/ASCII_SA.py b/ASCII_SA.py index 644d12c..bcddb1b 100644 --- a/ASCII_SA.py +++ b/ASCII_SA.py @@ -4,8 +4,34 @@ import serial.tools.list_ports import time import re import curses +import signal +import sys +import os from collections import defaultdict +# Import shared utilities +try: + from serial_utils import parse_scan_result_regex, DEFAULT_BAUDRATE, DEFAULT_TIMEOUT +except ImportError: + print("Warning: serial_utils module not found, using local definitions", file=sys.stderr) + DEFAULT_BAUDRATE = 115200 + DEFAULT_TIMEOUT = 1 + + def parse_scan_result_regex(scan_result): + """Parse SCAN_RESULT data using regex.""" + if not scan_result or 'SCAN_RESULT' not in scan_result: + raise ValueError("Invalid scan result format") + pattern = r"\((\d+),\s*(-\d+)\)" + matches = re.findall(pattern, scan_result) + if not matches: + raise ValueError("No valid frequency/RSSI pairs found") + return [(int(freq), int(rssi)) for freq, rssi in matches] + +# Configuration constants +DEFAULT_RESOLUTION = 1 +DEFAULT_THRESHOLD = -120 +DEFAULT_DB_PER_HASH = 10 + def list_serial_ports(): ports = serial.tools.list_ports.comports() @@ -35,13 +61,14 @@ def select_serial_port(): def parse_scan_result(scan_result): """ - Parse the SCAN_RESULT data from the serial output. + Parse the SCAN_RESULT data from the serial output using regex. + This is a wrapper around the shared utility function. + :param scan_result: Raw SCAN_RESULT string. :return: List of tuples with (frequency in kHz, RSSI in dB). + :raises ValueError: If parsing fails or data is invalid. """ - pattern = r"\((\d+),\s*(-\d+)\)" - matches = re.findall(pattern, scan_result) - return [(int(freq), int(rssi)) for freq, rssi in matches] + return parse_scan_result_regex(scan_result) def group_by_frequency(data, resolution_mhz): @@ -207,7 +234,8 @@ def display_debug_output(stdscr, data, start_line): try: stdscr.addstr(start_line, 0, "Frequency (MHz) Max RSSI (dB)") stdscr.addstr(start_line + 1, 0, "-" * max_width) - except curses.error: + except curses.error as e: + # Screen too small or cursor out of bounds - log and continue pass # Format debug data compactly with fixed width: 10 values per row @@ -238,41 +266,71 @@ def read_serial_data(stdscr, port, baudrate, resolution_mhz, db_threshold, db_pe :param use_color: Whether to use colored output. :param show_debug: Whether to show debugging information. """ + # Set up signal handler for graceful exit + def signal_handler(sig, frame): + curses.endwin() + print("\nExiting gracefully...") + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + try: initialize_colors() # Initialize colors for the histogram max_height, max_width = stdscr.getmaxyx() histogram_start_row = 3 # Move histogram up for more space below - with serial.Serial(port, baudrate, timeout=1) as ser: + with serial.Serial(port, baudrate, timeout=DEFAULT_TIMEOUT) as ser: time.sleep(2) # Allow serial port to stabilize while True: - response = ser.readline().decode("utf-8").strip() - if response.startswith("SCAN_RESULT"): - parsed_data = parse_scan_result(response) - grouped_data = group_by_frequency(parsed_data, resolution_mhz) - draw_histogram( - stdscr, - grouped_data, - histogram_start_row, - db_threshold, - db_per_hash, - use_color, - show_debug - ) - elif response.startswith("LORA_RSSI"): - stdscr.addstr(0, 0, " ", curses.color_pair(1)) - stdscr.addstr(0, 0, response + "dB", curses.color_pair(1)) - else: - stdscr.refresh() + try: + response = ser.readline().decode("utf-8").strip() + if not response: + continue + + if response.startswith("SCAN_RESULT"): + parsed_data = parse_scan_result(response) + grouped_data = group_by_frequency(parsed_data, resolution_mhz) + draw_histogram( + stdscr, + grouped_data, + histogram_start_row, + db_threshold, + db_per_hash, + use_color, + show_debug + ) + elif response.startswith("LORA_RSSI"): + try: + stdscr.addstr(0, 0, " ", curses.color_pair(1)) + stdscr.addstr(0, 0, response + "dB", curses.color_pair(1)) + except curses.error: + pass + else: + stdscr.refresh() + except UnicodeDecodeError as e: + # Skip malformed data + continue + except ValueError as e: + # Invalid scan result format - skip + continue except serial.SerialException as e: - stdscr.addstr(0, 0, f"Serial error: {e}") - stdscr.refresh() - stdscr.getch() + try: + stdscr.addstr(0, 0, f"Serial error: {e}") + stdscr.refresh() + stdscr.getch() + except: + print(f"Serial error: {e}") + except KeyboardInterrupt: + # Handled by signal handler + pass except Exception as e: - stdscr.addstr(0, 0, f"An unexpected error occurred: {e}") - stdscr.refresh() - stdscr.getch() + try: + stdscr.addstr(0, 0, f"An unexpected error occurred: {e}") + stdscr.refresh() + stdscr.getch() + except: + print(f"An unexpected error occurred: {e}") if __name__ == "__main__": print("Serial Communication Script: Grouped Histogram with Adjustable Threshold") diff --git a/SpectrumScan.py b/SpectrumScan.py index 1bc0513..1c011ed 100644 --- a/SpectrumScan.py +++ b/SpectrumScan.py @@ -11,15 +11,50 @@ import json from datetime import datetime from argparse import RawTextHelpFormatter +# Import shared utilities +try: + from serial_utils import crc16, parse_scan_result, DEFAULT_BAUDRATE +except ImportError: + # Fallback if serial_utils is not available (backward compatibility) + print("Warning: serial_utils module not found, using local definitions", file=sys.stderr) + DEFAULT_BAUDRATE = 115200 + + # Local definitions as fallback + POLY = 0x1021 + + def crc16(s, c): + """Calculate CRC16 CCITT-FALSE checksum.""" + c = c ^ 0xffff + for ch in s: + c = c ^ (ord(ch) << 8) + for i in range(8): + if c & 0x8000: + c = ((c << 1) ^ POLY) & 0xffff + else: + c = (c << 1) & 0xffff + return c ^ 0xffff + + def parse_scan_result(line): + """Parse a JSON line from the serial input.""" + if not line or 'SCAN_RESULT ' not in line: + raise ValueError("Line does not contain SCAN_RESULT") + line = line[line.index('SCAN_RESULT '):] + parts = line.split(' ', 2) + if len(parts) < 3: + raise ValueError(f"Invalid SCAN_RESULT format") + _, count_str, rest = parts + count = int(count_str) + data = json.loads(rest.replace('(', '[').replace(')', ']')) + return count, data + # Constants SCAN_WIDTH = 33 # number of samples in each scanline OUT_PATH = "out" # output path for saved files # Default settings -DEFAULT_BAUDRATE = 115200 DEFAULT_COLOR_MAP = 'viridis' DEFAULT_SCAN_LEN = 200 -DEFAULT_RSSI_OFFSET = -11 +DEFAULT_RSSI_OFFSET = -11 # dBm offset for first power bin def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=50, fill='█', print_end="\r"): """ @@ -41,25 +76,6 @@ def print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, lengt if iteration == total: print() -def parse_line(line): - """Parse a JSON line from the serial input.""" - - line = line[line.index('SCAN_RESULT '):] # support garbage interleaving with the string - _, count, rest = line.split(' ', 2) - return int(count), json.loads(rest.replace('(', '[').replace(')', ']')) - -POLY = 0x1021 -def crc16(s, c): - c = c ^ 0xffff - for ch in s: - c = c ^ (ord(ch) << 8) - for i in range(8): - if c & 0x8000: - c = ((c << 1) ^ POLY) & 0xffff - else: - c = (c << 1) & 0xffff - - return c ^ 0xffff def main(): parser = argparse.ArgumentParser(formatter_class=RawTextHelpFormatter, description='''\ @@ -138,9 +154,9 @@ def main(): lines += 1 try: - count, data = parse_line(line) + count, data = parse_scan_result(line) data.sort() - except json.JSONDecodeError: + except (ValueError, json.JSONDecodeError) as e: errors += 1 continue diff --git a/scripts/rpi-proxy-fc.py b/scripts/rpi-proxy-fc.py index 66ce68f..29ea052 100644 --- a/scripts/rpi-proxy-fc.py +++ b/scripts/rpi-proxy-fc.py @@ -7,6 +7,42 @@ import json from yamspy import MSPy +# Import shared utilities +try: + sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + from serial_utils import crc16, parse_scan_result as parse_line, DEFAULT_BAUDRATE +except ImportError: + print("Warning: serial_utils module not found, using local definitions", file=sys.stderr) + DEFAULT_BAUDRATE = 115200 + + # Local fallback definitions + POLY = 0x1021 + + def crc16(s, c): + """Calculate CRC16 CCITT-FALSE checksum.""" + c = c ^ 0xffff + for ch in s: + c = c ^ (ord(ch) << 8) + for i in range(8): + if c & 0x8000: + c = ((c << 1) ^ POLY) & 0xffff + else: + c = (c << 1) & 0xffff + return c ^ 0xffff + + def parse_line(line): + """Parse a JSON line from the serial input.""" + if 'SCAN_RESULT ' not in line: + raise ValueError("Line does not contain SCAN_RESULT") + line = line[line.index('SCAN_RESULT '):] + parts = line.split(' ', 2) + if len(parts) < 3: + raise ValueError(f"Invalid SCAN_RESULT format") + _, count_str, rest = parts + count = int(count_str) + data = json.loads(rest.replace('(', '[').replace(')', ']')) + return count, data + ## Constants # INAV Konrad custom FW @@ -15,35 +51,17 @@ INAV_KONRAD_MAX_NAME_LENGTH = 16 INAV_KONRAD_SET_PILOT_NAME = 0x5000 -LORA_SA_PORT = "/dev/ttyS0" -DRONE_PORT = "/dev/ttyACM0" +# Configuration - can be overridden via environment variables +LORA_SA_PORT = os.getenv("LORA_SA_PORT", "/dev/ttyS0") +DRONE_PORT = os.getenv("DRONE_PORT", "/dev/ttyACM0") +SERIAL_BAUDRATE = int(os.getenv("SERIAL_BAUDRATE", DEFAULT_BAUDRATE)) +SERIAL_TIMEOUT = int(os.getenv("SERIAL_TIMEOUT", "5")) # For testing on Windows: # LORA_SA_PORT = "COM6" # DRONE_PORT = "COM4" -# lifted from SpectrumScan.py -def parse_line(line): - """Parse a JSON line from the serial input.""" - - line = line[line.index('SCAN_RESULT '):] # support garbage interleaving with the string - _, count, rest = line.split(' ', 2) - return int(count), json.loads(rest.replace('(', '[').replace(')', ']')) - -POLY = 0x1021 -def crc16(s, c): - c = c ^ 0xffff - for ch in s: - c = c ^ (ord(ch) << 8) - for i in range(8): - if c & 0x8000: - c = ((c << 1) ^ POLY) & 0xffff - else: - c = (c << 1) & 0xffff - - return c ^ 0xffff - # use MSP to get the heading def get_heading(board: MSPy) -> float: board.fast_read_analog() @@ -76,10 +94,11 @@ with MSPy(device=DRONE_PORT, loglevel="WARNING", baudrate=115200) as board: sys.exit(1) else: try: - # Attempt to connect to the Lora ESP SA over serial, use 115200 baudrate - lora = serial.Serial(LORA_SA_PORT, 115200, timeout=5) - except: - # just some basic error display with the FC OSD + # Attempt to connect to the Lora ESP SA over serial + lora = serial.Serial(LORA_SA_PORT, SERIAL_BAUDRATE, timeout=SERIAL_TIMEOUT) + except (serial.SerialException, OSError) as e: + # Display error on FC OSD + print(f"Error connecting to LoRa board: {e}") for _ in range(10): board.send_RAW_msg( INAV_KONRAD_SET_PILOT_NAME, str2osd("err: lora board") @@ -131,12 +150,15 @@ with MSPy(device=DRONE_PORT, loglevel="WARNING", baudrate=115200) as board: continue try: count, data = parse_line(line) - except json.JSONDecodeError: - continue - finally: data.sort() candidate = get_candidates(data) osd_text = str2osd(f"{candidate[0]}: {candidate[1]}") board.send_RAW_msg(INAV_KONRAD_SET_PILOT_NAME, osd_text) heading = get_heading(board) lora.write(f"HEADING {heading}\n".encode("utf-8")) + except json.JSONDecodeError as e: + print(f"Error parsing JSON: {e}") + continue + except Exception as e: + print(f"Error processing scan result: {e}") + continue diff --git a/serial_utils.py b/serial_utils.py new file mode 100644 index 0000000..d663a50 --- /dev/null +++ b/serial_utils.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +""" +Shared utilities for serial communication and data parsing. +Used by ASCII_SA.py, SpectrumScan.py, and scripts/rpi-proxy-fc.py +""" + +import json +import re + +# Constants +POLY = 0x1021 # CRC16 CCITT-FALSE polynomial +MAX_SCAN_COUNT = 10000 # Maximum number of scans per result +MIN_FREQUENCY = 100000 # 100 MHz minimum (in kHz) +MAX_FREQUENCY = 6000000 # 6 GHz maximum (in kHz) +DEFAULT_BAUDRATE = 115200 +DEFAULT_TIMEOUT = 1 + + +def crc16(s, c): + """Calculate CRC16 CCITT-FALSE checksum. + + Args: + s: String to calculate checksum for + c: Initial CRC value + + Returns: + 16-bit CRC checksum + """ + c = c ^ 0xffff + for ch in s: + c = c ^ (ord(ch) << 8) + for i in range(8): + if c & 0x8000: + c = ((c << 1) ^ POLY) & 0xffff + else: + c = (c << 1) & 0xffff + + return c ^ 0xffff + + +def parse_scan_result(line): + """Parse a SCAN_RESULT line from the serial input. + + Args: + line: String containing SCAN_RESULT data + + Returns: + Tuple of (count, data) where data is list of tuples [(freq, rssi), ...] + + Raises: + ValueError: If line format is invalid or data doesn't pass validation + json.JSONDecodeError: If JSON parsing fails + """ + if not line or 'SCAN_RESULT ' not in line: + raise ValueError("Line does not contain SCAN_RESULT") + + # Extract SCAN_RESULT portion, supporting garbage interleaving + line = line[line.index('SCAN_RESULT '):] + parts = line.split(' ', 2) + + if len(parts) < 3: + raise ValueError(f"Invalid SCAN_RESULT format: expected 3 parts, got {len(parts)}") + + _, count_str, rest = parts + count = int(count_str) + + # Validate count is reasonable + if not (0 < count <= MAX_SCAN_COUNT): + raise ValueError(f"Invalid count value: {count} (must be 1-{MAX_SCAN_COUNT})") + + # Parse JSON with replacements for tuple notation + data = json.loads(rest.replace('(', '[').replace(')', ']')) + + # Validate data matches count + if len(data) != count: + raise ValueError(f"Data length {len(data)} does not match count {count}") + + # Validate frequency/RSSI pairs + for i, item in enumerate(data): + if not isinstance(item, (list, tuple)) or len(item) != 2: + raise ValueError(f"Invalid data item at index {i}: expected [freq, rssi] pair") + freq, rssi = item + if not (MIN_FREQUENCY <= freq <= MAX_FREQUENCY): + raise ValueError(f"Invalid frequency at index {i}: {freq} (must be {MIN_FREQUENCY}-{MAX_FREQUENCY} kHz)") + if not (-200 <= rssi <= 0): + raise ValueError(f"Invalid RSSI at index {i}: {rssi} (must be -200 to 0 dBm)") + + return count, data + + +def parse_scan_result_regex(scan_result): + """ + Parse SCAN_RESULT data using regex (alternative method for ASCII_SA.py). + + Args: + scan_result: Raw SCAN_RESULT string. + + Returns: + List of tuples with (frequency in kHz, RSSI in dB). + + Raises: + ValueError: If parsing fails or data is invalid. + """ + if not scan_result or 'SCAN_RESULT' not in scan_result: + raise ValueError("Invalid scan result format") + + pattern = r"\((\d+),\s*(-\d+)\)" + matches = re.findall(pattern, scan_result) + + if not matches: + raise ValueError("No valid frequency/RSSI pairs found in scan result") + + return [(int(freq), int(rssi)) for freq, rssi in matches] + + +def validate_serial_config(port, baudrate, timeout): + """Validate serial port configuration parameters. + + Args: + port: Serial port path + baudrate: Communication baudrate + timeout: Read timeout in seconds + + Raises: + ValueError: If any parameter is invalid + """ + if not port or not isinstance(port, str): + raise ValueError("Port must be a non-empty string") + + valid_baudrates = [9600, 19200, 38400, 57600, 115200, 230400, 460800, 921600] + if baudrate not in valid_baudrates: + raise ValueError(f"Baudrate {baudrate} not in valid list: {valid_baudrates}") + + if timeout < 0 or timeout > 300: + raise ValueError(f"Timeout {timeout} must be between 0 and 300 seconds")