mirror of
https://github.com/Genaker/LoraSA.git
synced 2026-05-02 19:42:59 +02:00
rdl SDR dual
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
import numpy as np
|
||||
from rtlsdr import RtlSdr
|
||||
import asyncio
|
||||
import traceback
|
||||
import time
|
||||
import os
|
||||
|
||||
@@ -11,7 +13,7 @@ def ascii_bar_chart(data, start_freq, step, max_height=20, symbol='#'):
|
||||
max_rssi = max(data)
|
||||
normalized = [(x - min_rssi) / (max_rssi - min_rssi + 1e-6) for x in data] # Normalize to 0-1
|
||||
heights = [int(value * max_height) for value in normalized]
|
||||
|
||||
|
||||
# Generate left-aligned RSSI labels
|
||||
rssi_range = np.linspace(max_rssi, min_rssi, max_height)
|
||||
rssi_labels = [f"{rssi:.1f} dB".rjust(8) for rssi in rssi_range]
|
||||
@@ -22,9 +24,12 @@ def ascii_bar_chart(data, start_freq, step, max_height=20, symbol='#'):
|
||||
for height in heights:
|
||||
line += symbol if height >= level else ' '
|
||||
print(line)
|
||||
|
||||
|
||||
# Print x-axis
|
||||
freq_labels = [f"{(start_freq + step * i) / 1e6:.0f}" if i % 5 == 0 else "" for i in range(len(data))]
|
||||
freq_labels = [
|
||||
f"{(start_freq + step * i) / 1e6:.0f}" if i % 5 == 0 else ""
|
||||
for i in range(len(data))
|
||||
]
|
||||
print(" " * 8 + '-' * len(data)) # Bar separator
|
||||
x_axis = " " * 8
|
||||
printed = 0
|
||||
@@ -36,63 +41,121 @@ def ascii_bar_chart(data, start_freq, step, max_height=20, symbol='#'):
|
||||
x_axis += label #.center(1) # Center the MHz label under the bar
|
||||
else:
|
||||
if printed < (5 - len(previous_label)):
|
||||
x_axis += " " # No space between bars
|
||||
x_axis += "|" # No space between bars
|
||||
printed += 1
|
||||
print(x_axis)
|
||||
|
||||
def scan_frequency_range(start_freq, end_freq, step, sdr, fft_size=2*1024):
|
||||
async def read_samples_async(sdr, fft_size):
|
||||
"""
|
||||
Scans a frequency range using an RTL-SDR device and calculates RSSI for each frequency.
|
||||
Wrapper to make the blocking read_samples method work asynchronously.
|
||||
"""
|
||||
return await asyncio.to_thread(sdr.read_samples, fft_size)
|
||||
|
||||
async def scan_frequency_range(start_freq, end_freq, step, sdr1, sdr2, fft_size=2*1024):
|
||||
"""
|
||||
Scans a frequency range using two RTL-SDR devices and calculates RSSI for each frequency.
|
||||
"""
|
||||
center_frequencies = np.arange(start_freq, end_freq + step, step)
|
||||
rssi_values = []
|
||||
|
||||
rssi_values1 = []
|
||||
rssi_values2 = []
|
||||
|
||||
for freq in center_frequencies:
|
||||
sdr.center_freq = freq
|
||||
samples = sdr.read_samples(fft_size)
|
||||
sdr1.center_freq = freq
|
||||
sdr2.center_freq = freq
|
||||
|
||||
sdr1.read_samples(2048)
|
||||
sdr2.read_samples(2048)
|
||||
|
||||
# Read samples asynchronously from both devices
|
||||
samples1, samples2 = await asyncio.gather(
|
||||
read_samples_async(sdr1, fft_size), #sdr1.read_samples(fft_size),
|
||||
read_samples_async(sdr2, fft_size),
|
||||
)
|
||||
|
||||
# Perform FFT and calculate RSSI
|
||||
power_spectrum = np.abs(np.fft.fft(samples))**2
|
||||
power_db = 10 * np.log10(power_spectrum + 1e-6) # Convert to dB
|
||||
avg_rssi = np.percentile(power_db, 90) # 95 percentile RSSI for this frequency
|
||||
rssi_values.append(avg_rssi)
|
||||
|
||||
power_spectrum1 = np.abs(np.fft.fft(samples1))**2
|
||||
power_spectrum2 = np.abs(np.fft.fft(samples2))**2
|
||||
power_db1 = 10 * np.log10(power_spectrum1 + 1e-6) # Convert to dB
|
||||
power_db2 = 10 * np.log10(power_spectrum2 + 1e-6) # Convert to dB
|
||||
avg_rssi1 = np.percentile(power_db1, 90) # 90th percentile RSSI for this frequency
|
||||
avg_rssi2 = np.percentile(power_db2, 90) # 90th percentile RSSI for this frequency
|
||||
rssi_values1.append(avg_rssi1)
|
||||
rssi_values2.append(avg_rssi2)
|
||||
|
||||
return rssi_values1, rssi_values2
|
||||
|
||||
async def process_samples_async(samples):
|
||||
"""
|
||||
Wrapper to make the blocking read_samples method work asynchronously.
|
||||
"""
|
||||
return await asyncio.to_thread(process_samples, samples)
|
||||
|
||||
def process_samples(samples):
|
||||
rssi_values = []
|
||||
power_spectrum = np.abs(np.fft.fft(samples))**2
|
||||
power_db = 10 * np.log10(power_spectrum + 1e-6) # Convert to dB
|
||||
avg_rssi = np.percentile(power_db, 90) # 90th percentile RSSI for this frequency
|
||||
rssi_values.append(avg_rssi)
|
||||
return rssi_values
|
||||
|
||||
def main():
|
||||
async def main():
|
||||
# Frequency range in Hz
|
||||
start_freq = 800e6 # 800 MHz
|
||||
end_freq = 900e6 # 1200 MHz
|
||||
start_freq = 860e6 # 800 MHz
|
||||
end_freq = 1060e6 # 900 MHz
|
||||
step = 1e6 # 1 MHz steps
|
||||
|
||||
# Initialize RTL-SDR
|
||||
sdr = RtlSdr()
|
||||
sdr.sample_rate = 2.048e6 # 2.048 MSPS
|
||||
sdr.gain = 10 # Adjust gain as needed
|
||||
# Initialize two RTL-SDR devices
|
||||
sdr1 = RtlSdr(0) # First SDR device
|
||||
sdr2 = RtlSdr(1) # Second SDR device
|
||||
|
||||
# List available devices
|
||||
devices = RtlSdr.get_device_serial_addresses()
|
||||
print("Available devices:", devices)
|
||||
|
||||
# Set parameters for both devices
|
||||
for sdr in [sdr1, sdr2]:
|
||||
sdr.sample_rate = 2.048e6 # 2.048 MSPS
|
||||
sdr.gain = 10 # Adjust gain as needed
|
||||
|
||||
try:
|
||||
while True:
|
||||
start_time = time.time()
|
||||
rssi_values = scan_frequency_range(start_freq, end_freq, step, sdr)
|
||||
end_time = time.time()
|
||||
|
||||
try:
|
||||
start_time = time.time()
|
||||
rssi_values1, rssi_values2 = await scan_frequency_range(start_freq, end_freq, step, sdr1, sdr2)
|
||||
end_time = time.time()
|
||||
|
||||
# Clear screen
|
||||
os.system('clear' if os.name == 'posix' else 'cls')
|
||||
# Clear screen
|
||||
os.system('clear' if os.name == 'posix' else 'cls')
|
||||
|
||||
# Scan frequency range with both devices simultaneously
|
||||
print(f"Scanning range: {start_freq / 1e6:.0f} MHz to {end_freq / 1e6:.0f} MHz")
|
||||
print(f"Scanning time: {end_time - start_time:.2f} seconds")
|
||||
print(f"Single MHz time: {((end_time - start_time)/(end_freq / 1e6 - start_freq / 1e6)):.2f} seconds")
|
||||
|
||||
# Display ASCII bar chart for Device 1
|
||||
print("RSSI (dB) for Device 1:")
|
||||
ascii_bar_chart(rssi_values1, start_freq, step)
|
||||
|
||||
# Display ASCII bar chart for Device 2
|
||||
print("\nRSSI (dB) for Device 2:")
|
||||
ascii_bar_chart(rssi_values2, start_freq, step)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error during scanning: {e}")
|
||||
traceback.print_exc()
|
||||
# Optionally, wait before retrying
|
||||
time.sleep(0.5)
|
||||
|
||||
# Scan frequency range
|
||||
print(f"Scanning range: {start_freq / 1e6:.0f} MHz to {end_freq / 1e6:.0f} MHz")
|
||||
print(f"Scanning time: {end_time - start_time:.2f} seconds")
|
||||
|
||||
# Display ASCII bar chart with RSSI and MHz labels
|
||||
print("RSSI (dB):")
|
||||
ascii_bar_chart(rssi_values, start_freq, step)
|
||||
#time.sleep(0.5) # Refresh every 0.5 seconds
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nStopping scan...")
|
||||
|
||||
finally:
|
||||
sdr.close()
|
||||
sdr1.close()
|
||||
sdr2.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
asyncio.run(main())
|
||||
|
||||
Reference in New Issue
Block a user