# Mirror of https://github.com/Genaker/LoraSA.git
# Synced 2026-03-28 17:42:59 +01:00
# 285 lines, 11 KiB, Python
import serial
|
|
import serial.tools.list_ports
|
|
import time
|
|
import re
|
|
import curses
|
|
from collections import defaultdict
|
|
|
|
|
|
def list_serial_ports():
    """
    Enumerate the serial ports reported by ``serial.tools.list_ports``.

    :return: List with the ``device`` name of every port found.
    """
    device_names = []
    for port_info in serial.tools.list_ports.comports():
        device_names.append(port_info.device)
    return device_names
|
|
|
|
|
|
def select_serial_port():
    """
    Interactively ask the user which serial port to use.

    Prints a numbered menu of the ports returned by ``list_serial_ports()``
    and keeps prompting until a number inside the menu range is entered.

    :return: The chosen port's device name, or None when no ports were found.
    """
    available = list_serial_ports()
    if not available:
        print("No serial ports found. Please connect a device and try again.")
        return None

    print("Available Serial Ports:")
    for number, device in enumerate(available, start=1):
        print(f"{number}: {device}")

    port_count = len(available)
    while True:
        answer = input("Select a serial port (number): ")
        try:
            selection = int(answer)
        except ValueError:
            print("Invalid input. Please enter a number.")
            continue

        # Menu entries are numbered from 1, the list is indexed from 0
        if selection < 1 or selection > port_count:
            print(f"Please select a valid option between 1 and {port_count}.")
            continue
        return available[selection - 1]
|
|
|
|
|
|
def parse_scan_result(scan_result):
    """
    Parse the SCAN_RESULT data from the serial output.

    Every ``(frequency, rssi)`` pair found in the text is extracted; anything
    outside such pairs (line prefix, brackets, counters) is ignored.

    :param scan_result: Raw SCAN_RESULT string.
    :return: List of tuples with (frequency in kHz, RSSI in dB), in the order
        they appear in the input. Empty when no pair is found.
    """
    # The sign on the RSSI is optional so that readings of 0 dB and above are
    # kept instead of being silently dropped; whitespace inside the
    # parentheses is tolerated as well.
    pattern = r"\(\s*(\d+)\s*,\s*([-+]?\d+)\s*\)"
    return [(int(freq), int(rssi)) for freq, rssi in re.findall(pattern, scan_result)]
|
|
|
|
|
|
def group_by_frequency(data, resolution_mhz):
    """
    Bucket readings into frequency bands and keep the strongest RSSI per band.

    :param data: List of tuples (frequency in kHz, RSSI in dB).
    :param resolution_mhz: Width of each band in MHz.
    :return: List of (band start frequency in MHz, max RSSI), sorted ascending.
    """
    band_width_khz = resolution_mhz * 1000  # Readings arrive in kHz
    weakest = float('-inf')  # Starting value that any real reading beats
    strongest = {}

    for freq_khz, level in data:
        # Snap the reading down to the start of its band
        band_start = (freq_khz // band_width_khz) * band_width_khz
        strongest[band_start] = max(strongest.get(band_start, weakest), level)

    bands = [(band_start // 1000, level) for band_start, level in strongest.items()]
    bands.sort()
    return bands
|
|
|
|
|
|
def calculate_bar_length(rssi, db_threshold, db_per_hash, max_bar_length=15):
    """
    Work out how many characters tall a histogram bar should be.

    :param rssi: The RSSI value (dB).
    :param db_threshold: The threshold for noise filtering; readings below it get no bar.
    :param db_per_hash: Number of dB represented by each # in the bar.
    :param max_bar_length: Upper limit for the bar length in characters.
    :return: Length of the bar, never more than max_bar_length.
    """
    if rssi < db_threshold:
        return 0

    # Whole steps of db_per_hash that the reading sits above the threshold
    hashes = (rssi - db_threshold) // db_per_hash
    if hashes < max_bar_length:
        return hashes
    return max_bar_length
|
|
|
|
|
|
def clear_histogram_area(stdscr, start_row, height, width):
    """
    Clears the histogram area by overwriting it with spaces.

    Rows that fall outside the screen are skipped and curses write errors are
    ignored, so a terminal smaller than the requested area does not abort the
    caller.

    :param stdscr: The curses screen object.
    :param start_row: The starting row for the histogram area.
    :param height: The height of the area to clear.
    :param width: The width of the terminal.
    """
    screen_rows, _ = stdscr.getmaxyx()
    blank_line = " " * width
    first_row = max(start_row, 0)
    end_row = min(start_row + height, screen_rows)  # Exclusive; never past the screen

    for row in range(first_row, end_row):
        try:
            stdscr.addstr(row, 0, blank_line)
        except curses.error:
            # curses raises after filling the bottom-right cell because the
            # cursor cannot advance; the row has still been blanked.
            pass
|
|
|
|
|
|
def initialize_colors():
    """
    Initialize color pairs for curses.

    Pairs 1-4 are blue, green, yellow and red; pair 5 is a grey used for the
    weakest readings and for monochrome output. Every pair keeps the
    terminal's default background (-1).
    """
    curses.start_color()
    curses.use_default_colors()

    # Color 240 is a grey from the extended 256-color palette. init_pair
    # rejects color numbers the terminal does not have, so fall back to the
    # basic white on terminals with fewer colors.
    grey = 240 if curses.COLORS > 240 else curses.COLOR_WHITE

    # Define color pairs
    curses.init_pair(5, grey, -1)  # Lowest RSSI / color disabled
    curses.init_pair(1, curses.COLOR_BLUE, -1)  # Low RSSI
    curses.init_pair(2, curses.COLOR_GREEN, -1)  # Moderate RSSI
    curses.init_pair(3, curses.COLOR_YELLOW, -1)  # High RSSI
    curses.init_pair(4, curses.COLOR_RED, -1)  # Very High RSSI
|
|
|
|
|
|
def get_color_for_rssi(rssi, use_color=True, db_threshold=0):
    """
    Determine the color based on RSSI value.

    :param rssi: The RSSI value in dB.
    :param use_color: Whether to use colored output. Any falsy value selects
        pair 5 for every reading.
    :param db_threshold: The threshold value for noise filtering. Accepted for
        interface compatibility; the color bands below are fixed and do not
        use it.
    :return: Attribute value from ``curses.color_pair`` for the selected pair.
    """
    if rssi < -90 or not use_color:
        pair = 5  # Grey: weakest readings, or colored output disabled
    elif rssi < -80:
        pair = 1  # Blue for low RSSI
    elif rssi < -75:
        pair = 2  # Green for moderate RSSI
    elif rssi < -70:
        pair = 3  # Yellow for high RSSI
    else:
        pair = 4  # Red for very high RSSI
    return curses.color_pair(pair)
|
|
|
|
|
|
def draw_histogram(stdscr, data, histogram_start_row, db_threshold, db_per_hash, use_color=True, show_debug=True):
    """
    Draw the histogram with colored bars, dB labels on the left, and a two-line frequency legend.
    Optionally display debug information.

    Bars are capped to the rows covered by the dB scale so they never reach
    above the region that is cleared on each redraw.

    :param stdscr: The curses screen object.
    :param data: List of tuples containing (frequency in MHz, RSSI in dB).
    :param histogram_start_row: The starting row for the histogram area.
    :param db_threshold: The threshold for noise filtering.
    :param db_per_hash: Number of dB per # in the bar.
    :param use_color: Whether to use colored output.
    :param show_debug: Whether to show debugging information.
    """
    _, max_width = stdscr.getmaxyx()

    # Clear the histogram area
    clear_histogram_area(stdscr, histogram_start_row, 15, max_width)

    baseline_row = histogram_start_row + 12  # Set the baseline for bars
    legend_start = baseline_row + 1  # Legends go below the baseline

    # Calculate maximum bar height
    max_bar_height = baseline_row - histogram_start_row - 1

    # Draw dB scale on the left, offset 1 row below the start of the area
    for y in range(max_bar_height + 1):
        db_value = db_threshold + (max_bar_height - y) * db_per_hash
        try:
            stdscr.addstr(histogram_start_row + y + 1, 0, f"{db_value:>4}")
        except curses.error:
            pass

    # Grey pair, shared with the weakest / uncolored bars
    below_threshold_color = curses.color_pair(5)

    # Draw histogram
    for i, (freq_mhz, rssi) in enumerate(data):
        bar_x = i * 2 + 5  # Offset to align bars close to the dB legend

        if rssi >= db_threshold:
            # Limit the bar to the rows that carry a dB label; a taller bar
            # would be drawn above the cleared area and leave stale characters.
            bar_length = calculate_bar_length(
                rssi, db_threshold, db_per_hash, max_bar_length=max_bar_height + 1
            )
            bar_color = get_color_for_rssi(rssi, use_color)
            # Draw the bar vertically, starting from the baseline
            for y in range(bar_length):
                char = "=" if y == bar_length - 1 else "#"  # '=' caps the bar
                try:
                    stdscr.addstr(baseline_row - y, bar_x, char, bar_color)
                except curses.error:
                    pass
        else:
            # Show a grey '=' at the baseline for values below the threshold
            try:
                stdscr.addstr(baseline_row, bar_x, "=", below_threshold_color)
            except curses.error:
                pass

        # Write legends below the bars, alternating between two rows so that
        # neighbouring labels do not overlap
        try:
            stdscr.addstr(legend_start + (i % 2), bar_x, f"{freq_mhz}")
        except curses.error:
            pass

    if show_debug:
        # Display debug information if enabled
        debug_start = legend_start + 3
        display_debug_output(stdscr, data, debug_start)

    stdscr.refresh()
|
|
|
|
|
|
def display_debug_output(stdscr, data, start_line):
    """
    Print a compact frequency/RSSI table beneath the histogram.

    :param stdscr: The curses screen object.
    :param data: Grouped data containing frequencies and RSSI values.
    :param start_line: The screen row on which the table header is written.
    """
    screen_rows, screen_cols = stdscr.getmaxyx()

    # Title line followed by a full-width rule; curses write errors are ignored
    try:
        stdscr.addstr(start_line, 0, "Frequency (MHz) Max RSSI (dB)")
        stdscr.addstr(start_line + 1, 0, "-" * screen_cols)
    except curses.error:
        pass

    # One fixed-width "freq:rssi" cell per reading
    cells = []
    for freq_mhz, rssi in data:
        cells.append(f"{freq_mhz:4}:{rssi:4}")

    cells_per_row = 10
    first_data_row = start_line + 2
    for offset in range(0, len(cells), cells_per_row):
        target_row = first_data_row + offset // cells_per_row
        if target_row >= screen_rows:
            continue  # Row would fall below the bottom of the screen
        text = " ".join(cells[offset:offset + cells_per_row])
        try:
            stdscr.addstr(target_row, 0, text[:screen_cols])
        except curses.error:
            pass

    stdscr.refresh()
|
|
|
|
|
|
def read_serial_data(stdscr, port, baudrate, resolution_mhz, db_threshold, db_per_hash, use_color, show_debug):
    """
    Read serial data and dynamically update the histogram with optional debug output.

    Runs until an error occurs; the error text is then shown on the first
    screen row and the function returns after a key press.

    :param stdscr: The curses screen object.
    :param port: The serial port.
    :param baudrate: The baud rate.
    :param resolution_mhz: Frequency resolution in MHz for grouping.
    :param db_threshold: The threshold for noise filtering.
    :param db_per_hash: Number of dB per # in the bar.
    :param use_color: Whether to use colored output.
    :param show_debug: Whether to show debugging information.
    """
    try:
        initialize_colors()  # Initialize colors for the histogram
        histogram_start_row = 3  # Move histogram up for more space below

        with serial.Serial(port, baudrate, timeout=1) as ser:
            time.sleep(2)  # Allow serial port to stabilize

            while True:
                # A serial line can contain partial or corrupted bytes (for
                # example right after the port is opened). Replace them
                # instead of letting UnicodeDecodeError end the session; such
                # a line simply fails the SCAN_RESULT check below.
                response = ser.readline().decode("utf-8", errors="replace").strip()
                if response.startswith("SCAN_RESULT"):
                    parsed_data = parse_scan_result(response)
                    grouped_data = group_by_frequency(parsed_data, resolution_mhz)
                    draw_histogram(
                        stdscr,
                        grouped_data,
                        histogram_start_row,
                        db_threshold,
                        db_per_hash,
                        use_color,
                        show_debug
                    )
                else:
                    stdscr.refresh()
    except serial.SerialException as e:
        stdscr.addstr(0, 0, f"Serial error: {e}")
        stdscr.refresh()
        stdscr.getch()  # Keep the message visible until a key is pressed
    except Exception as e:
        # Top-level boundary of the curses session: report instead of crashing
        stdscr.addstr(0, 0, f"An unexpected error occurred: {e}")
        stdscr.refresh()
        stdscr.getch()
|
|
|
|
def _prompt_int(prompt, default, minimum=None):
    """
    Ask for an integer until a valid one is entered.

    :param prompt: Text shown to the user.
    :param default: Value returned when the user just presses Enter.
    :param minimum: Smallest accepted value, or None for no lower bound.
    :return: The entered integer, or ``default`` for empty input.
    """
    while True:
        raw = input(prompt).strip()
        if not raw:
            return default
        try:
            value = int(raw)
        except ValueError:
            print("Invalid input. Please enter a number.")
            continue
        if minimum is not None and value < minimum:
            print(f"Please enter a value of at least {minimum}.")
            continue
        return value


if __name__ == "__main__":
    print("Serial Communication Script: Grouped Histogram with Adjustable Threshold")
    selected_port = select_serial_port()

    if selected_port:
        baudrate = 115200
        # Resolution and dB-per-# are used as divisors further down, so zero
        # or negative values are rejected here instead of failing mid-display.
        resolution_mhz = _prompt_int("Enter frequency resolution in MHz (default: 1 MHz): ", 1, minimum=1)
        db_threshold = _prompt_int("Enter minimum RSSI value to start bars (default: -110): ", -110)
        db_per_hash = _prompt_int("Enter dB per #: (default: 5): ", 5, minimum=1)
        # Anything other than an explicit "no"/"n" keeps the option enabled
        show_debug = input("Show debug information? (yes/no, default: yes): ").strip().lower() not in ["no", "n"]
        use_color = input("Use colored output? (yes/no, default: yes): ").strip().lower() not in ["no", "n"]
        curses.wrapper(read_serial_data, selected_port, baudrate, resolution_mhz, db_threshold, db_per_hash, use_color, show_debug)
|
|
|