Files
LoraSA/sdr/scanner.py

173 lines
6.1 KiB
Python

import sys
import SoapySDR
from SoapySDR import * # SOAPY_SDR_* constants
import numpy as np
import time
import signal
# set log level to error
SoapySDR.setLogLevel(SoapySDR.SOAPY_SDR_ERROR)
# Configuration parameters
start_freq = 800.1e6 # Start frequency in Hz (e.g., 900.1 MHz for S1G radio)
stop_freq = 950e6 # Stop frequency in Hz (e.g., 2.49 GHz for HiF radio)
step_size_default = 0.25e6 # Default step size in Hz
sample_rate_hif = 4e6 # Adjusted sample rate in Hz for HiF
gain = 69 # Gain in dB
freq_resolution = 10e3 # Desired frequency resolution in Hz
# Calculate the minimum number of samples needed
min_samples_hif = int(sample_rate_hif / freq_resolution)
duration_hif = min_samples_hif / sample_rate_hif # Adjust duration based on minimum samples
# Frequency slice size for extraction (0.25 MHz)
slice_size = 0.25e6
def calculate_rssi(signal, num_of_samples):
if num_of_samples == 0:
return -120.0 # Default low value when no samples are available
sum_of_squares = np.sum(np.abs(signal) ** 2)
mean_of_squares = sum_of_squares / num_of_samples
return 10 * np.log10(mean_of_squares + 1e-10) # Convert to dB with safety for log(0)
def render_ascii_chart(data):
if not data:
print("No data to render.")
return
max_rssi = max(data.values())
min_rssi = min(data.values())
chart_width = 50 # Width of the chart in characters
print("\nFrequency Sweep Results (ASCII Chart):")
for freq, rssi in data.items():
# Scale RSSI values to fit within chart width
scaled_rssi = int(((rssi - min_rssi) / (max_rssi - min_rssi)) * chart_width) if max_rssi != min_rssi else chart_width // 2
bar = "#" * scaled_rssi
print(f"{freq:.3f} MHz | {rssi:.2f} dB | {bar}")
def render_graphical_chart(data):
import matplotlib.pyplot as plt
if not data:
print("No data to render.")
return
freqs = list(data.keys())
rssis = list(data.values())
plt.figure(figsize=(10, 6))
plt.plot(freqs, rssis, marker="o", linestyle="-", color="b")
plt.title("Frequency Sweep Results (Graphical Chart)")
plt.xlabel("Frequency (MHz)")
plt.ylabel("RSSI (dB)")
plt.grid(True)
plt.show()
def frequency_sweep(output_type="ascii", step_size=step_size_default):
try:
device = "Cariboulite"
# Create and configure the SDR device
sdr = SoapySDR.Device({"driver": device, "channel": "HiF"}) # Replace 'rtlsdr' with your SDR driver
print("SDR device initialized.")
# Check if FPGA is already programmed
if sdr.hasHardwareTime():
print("FPGA is already programmed and operational.")
else:
print("FPGA is not programmed. Initializing...")
# Add any necessary initialization logic here
except Exception as e:
print(f"Error initializing SDR device: {e}")
# Set up HiF radio
sdr.setSampleRate(SOAPY_SDR_RX, 0, sample_rate_hif)
sdr.setGain(SOAPY_SDR_RX, 0, gain)
# Allocate a buffer for received samples
buffer_size_hif = min_samples_hif
rx_buffer_hif = np.zeros(buffer_size_hif, dtype=np.complex64)
print(f"Starting infinite frequency sweep from {start_freq / 1e6} MHz to {stop_freq / 1e6} MHz...")
print(f"Frequency resolution: {freq_resolution / 1e3} kHz")
freq_rssi_map = {}
while True:
current_freq = start_freq
while current_freq <= stop_freq:
start_time = time.time()
print(f"Tuning to center frequency {current_freq / 1e6} MHz")
try:
sdr.setFrequency(SOAPY_SDR_RX, 0, current_freq)
time.sleep(0.01) # Add a 10 ms delay after setting the frequency
except Exception as e:
print(f"Error setting frequency: {e}")
current_freq += step_size
continue
# Start streaming
try:
stream = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32)
sdr.activateStream(stream)
# Collect samples
sr = sdr.readStream(stream, [rx_buffer_hif], len(rx_buffer_hif))
if sr.ret > 0:
# Calculate RSSI (Received Signal Strength Indicator)
rssi = calculate_rssi(rx_buffer_hif, sr.ret)
calibration_offset = -50 # Example offset for realistic RSSI values
rssi += gain + calibration_offset # Adjust RSSI for gain and calibration
print(f"Center Frequency: {current_freq / 1e6:.3f} MHz, RSSI: {rssi:.2f} dB")
freq_rssi_map[current_freq / 1e6] = rssi # Store frequency in MHz as key and RSSI as value
iteration_time = time.time() - start_time
print(f"Iteration completed in {iteration_time:.3f} seconds")
except Exception as e:
print(f"Error during stream or processing: {e}")
finally:
# Stop streaming and clean up
try:
sdr.deactivateStream(stream)
sdr.closeStream(stream)
except Exception as e:
print(f"Error cleaning up stream: {e}")
current_freq += step_size
# Render the chart after the full sweep
if freq_rssi_map:
if output_type == "ascii":
render_ascii_chart(freq_rssi_map)
elif output_type == "graphical":
render_graphical_chart(freq_rssi_map)
else:
print("Invalid output type. Choose 'ascii' or 'graphical'.")
freq_rssi_map.clear()
if __name__ == "__main__":
def catch_quit(sig, frame):
sdr.deactivateStream(stream)
sdr.closeStream(stream)
sys.exit(0)
signal.signal(signal.SIGINT, catch_quit)
output_type = "ascii" # Default to ASCII output
step_size = step_size_default # Default step size
if len(sys.argv) > 1:
output_type = sys.argv[1].strip().lower()
if len(sys.argv) > 2:
try:
step_size = float(sys.argv[2])
except ValueError:
print("Invalid step size provided. Using default.")
frequency_sweep(output_type=output_type, step_size=step_size)