Files
LoraSA/rtl/rtl-scanner.py
2025-01-06 00:02:55 -08:00

187 lines
6.3 KiB
Python

import numpy as np
from rtlsdr import RtlSdr
import asyncio
import traceback
import time
import os
def printxy(x, y, text):
"""
Print text at a specific (x, y) coordinate in the console.
:param x: Column number (1-based)
:param y: Row number (1-based)
:param text: The text to print
"""
# ANSI escape code to move the cursor to (y, x)
print(f"\033[{y};{x}H{text}", end="", flush=True)
def bar_draw(x,y,rssi,max_height=20,symbol="#"):
normalized = rssi / (rssi + 1e-6) # Normalize to 0-1
height = int(normalized * max_height)
for i in range(max_height):
printxy(x,y-i," ")
for i in range(height):
printxy(x,y-i,symbol)
def ascii_bar_chart(data, start_freq, step, max_height=20, symbol='#'):
"""
Converts an array of RSSI values into an ASCII vertical bar chart with RSSI labels on the left and MHz labels under every 5th step.
"""
min_rssi = min(data)
max_rssi = max(data)
normalized = [(x - min_rssi) / (max_rssi - min_rssi + 1e-6) for x in data] # Normalize to 0-1
heights = [int(value * max_height) for value in normalized]
# Generate left-aligned RSSI labels
rssi_range = np.linspace(max_rssi, min_rssi, max_height)
rssi_labels = [f"{rssi:.1f} dB".rjust(8) for rssi in rssi_range]
# Print the vertical bars with RSSI labels
for level, label in zip(range(max_height, 0, -1), rssi_labels):
line = label + ' ' # Add RSSI label
for height in heights:
line += symbol if height >= level else ' '
print(line)
# Print x-axis
freq_labels = [
f"{(start_freq + step * i) / 1e6:.0f}" if i % 5 == 0 else ""
for i in range(len(data))
]
print(" " * 9 + '-' * len(data)) # Bar separator
x_axis = " " * 9
printed = 0
previous_label = ""
for i, label in enumerate(freq_labels):
if label:
printed = 0
previous_label = label
x_axis += label #.center(1) # Center the MHz label under the bar
else:
if printed < (5 - len(previous_label)):
x_axis += "|" # No space between bars
printed += 1
print(x_axis)
init=True
async def read_samples_async(sdr, fft_size):
"""
Wrapper to make the blocking read_samples method work asynchronously.
"""
return await asyncio.to_thread(sdr.read_samples, fft_size)
async def scan_frequency_range(start_freq, end_freq, step, sdr1, sdr2, fft_size=1024*4):
"""
Scans a frequency range using two RTL-SDR devices and calculates RSSI for each frequency.
"""
center_frequencies = np.arange(start_freq, end_freq + step, step)
rssi_values1 = []
rssi_values2 = []
x=9
y=20
for freq in center_frequencies:
sdr1.center_freq = freq
sdr2.center_freq = freq
sdr1.read_samples(2048)
sdr2.read_samples(2048)
# Read samples asynchronously from both devices
samples1, samples2 = await asyncio.gather(
read_samples_async(sdr1, fft_size), #sdr1.read_samples(fft_size),
read_samples_async(sdr2, fft_size),
)
# Perform FFT and calculate RSSI
power_spectrum1 = np.abs(np.fft.fft(samples1))**2
power_spectrum2 = np.abs(np.fft.fft(samples2))**2
power_db1 = 10 * np.log10(power_spectrum1 + 1e-6) # Convert to dB
power_db2 = 10 * np.log10(power_spectrum2 + 1e-6) # Convert to dB
avg_rssi1 = np.percentile(power_db1, 85) # 90th percentile RSSI for this frequency
avg_rssi2 = np.percentile(power_db2, 85) # 90th percentile RSSI for this frequency
rssi_values1.append(avg_rssi1)
#bar_draw(x,y,avg_rssi1,20,"#")
rssi_values2.append(avg_rssi2)
#bar_draw(x,y+24,avg_rssi2,20,"#")
x=x+1
return rssi_values1, rssi_values2
async def process_samples_async(samples):
"""
Wrapper to make the blocking read_samples method work asynchronously.
"""
return await asyncio.to_thread(process_samples, samples)
def process_samples(samples):
rssi_values = []
power_spectrum = np.abs(np.fft.fft(samples))**2
power_db = 10 * np.log10(power_spectrum + 1e-6) # Convert to dB
avg_rssi = np.percentile(power_db, 90) # 90th percentile RSSI for this frequency
rssi_values.append(avg_rssi)
return rssi_values
async def main():
# Frequency range in Hz
start_freq = 880e6 # 800 MHz
end_freq = 960e6 # 900 MHz
step = 1e6 # 1 MHz steps
# Initialize two RTL-SDR devices
sdr1 = RtlSdr(0) # First SDR device
sdr2 = RtlSdr(1) # Second SDR device
# List available devices
devices = RtlSdr.get_device_serial_addresses()
print("Available devices:", devices)
# Set parameters for both devices
for sdr in [sdr1, sdr2]:
sdr.sample_rate = 2.048e6 # 2.048 MSPS
sdr.gain = 40 # Adjust gain as needed
try:
while True:
try:
start_time = time.time()
rssi_values1, rssi_values2 = await scan_frequency_range(start_freq, end_freq, step, sdr1, sdr2)
end_time = time.time()
# Clear screen
os.system('clear' if os.name == 'posix' else 'cls')
# Scan frequency range with both devices simultaneously
print(f"Scanning range: {start_freq / 1e6:.0f} MHz to {end_freq / 1e6:.0f} MHz")
print(f"Scanning time: {end_time - start_time:.2f} seconds")
print(f"Single MHz time: {((end_time - start_time)/(end_freq / 1e6 - start_freq / 1e6)):.2f} seconds")
# Display ASCII bar chart for Device 1
print("RSSI (dB) for Device 1:")
ascii_bar_chart(rssi_values1, start_freq, step)
# Display ASCII bar chart for Device 2
print("\nRSSI (dB) for Device 2:")
ascii_bar_chart(rssi_values2, start_freq, step)
except Exception as e:
print(f"Error during scanning: {e}")
traceback.print_exc()
# Optionally, wait before retrying
time.sleep(0.5)
#time.sleep(0.5) # Refresh every 0.5 seconds
except KeyboardInterrupt:
print("\nStopping scan...")
finally:
sdr1.close()
sdr2.close()
if __name__ == '__main__':
asyncio.run(main())