mirror of
https://github.com/Genaker/LoraSA.git
synced 2026-03-28 17:42:59 +01:00
173 lines
6.1 KiB
Python
173 lines
6.1 KiB
Python
import sys
|
|
|
|
import SoapySDR
|
|
from SoapySDR import * # SOAPY_SDR_* constants
|
|
|
|
import numpy as np
|
|
import time
|
|
import signal
|
|
|
|
# set log level to error
|
|
SoapySDR.setLogLevel(SoapySDR.SOAPY_SDR_ERROR)
|
|
|
|
# Configuration parameters
|
|
start_freq = 800.1e6 # Start frequency in Hz (e.g., 900.1 MHz for S1G radio)
|
|
stop_freq = 950e6 # Stop frequency in Hz (e.g., 2.49 GHz for HiF radio)
|
|
step_size_default = 0.25e6 # Default step size in Hz
|
|
sample_rate_hif = 4e6 # Adjusted sample rate in Hz for HiF
|
|
gain = 69 # Gain in dB
|
|
freq_resolution = 10e3 # Desired frequency resolution in Hz
|
|
|
|
# Calculate the minimum number of samples needed
|
|
min_samples_hif = int(sample_rate_hif / freq_resolution)
|
|
duration_hif = min_samples_hif / sample_rate_hif # Adjust duration based on minimum samples
|
|
|
|
# Frequency slice size for extraction (0.25 MHz)
|
|
slice_size = 0.25e6
|
|
|
|
def calculate_rssi(signal, num_of_samples):
|
|
if num_of_samples == 0:
|
|
return -120.0 # Default low value when no samples are available
|
|
|
|
sum_of_squares = np.sum(np.abs(signal) ** 2)
|
|
mean_of_squares = sum_of_squares / num_of_samples
|
|
return 10 * np.log10(mean_of_squares + 1e-10) # Convert to dB with safety for log(0)
|
|
|
|
def render_ascii_chart(data):
|
|
if not data:
|
|
print("No data to render.")
|
|
return
|
|
|
|
max_rssi = max(data.values())
|
|
min_rssi = min(data.values())
|
|
chart_width = 50 # Width of the chart in characters
|
|
|
|
print("\nFrequency Sweep Results (ASCII Chart):")
|
|
for freq, rssi in data.items():
|
|
# Scale RSSI values to fit within chart width
|
|
scaled_rssi = int(((rssi - min_rssi) / (max_rssi - min_rssi)) * chart_width) if max_rssi != min_rssi else chart_width // 2
|
|
bar = "#" * scaled_rssi
|
|
print(f"{freq:.3f} MHz | {rssi:.2f} dB | {bar}")
|
|
|
|
def render_graphical_chart(data):
|
|
import matplotlib.pyplot as plt
|
|
if not data:
|
|
print("No data to render.")
|
|
return
|
|
|
|
freqs = list(data.keys())
|
|
rssis = list(data.values())
|
|
|
|
plt.figure(figsize=(10, 6))
|
|
plt.plot(freqs, rssis, marker="o", linestyle="-", color="b")
|
|
plt.title("Frequency Sweep Results (Graphical Chart)")
|
|
plt.xlabel("Frequency (MHz)")
|
|
plt.ylabel("RSSI (dB)")
|
|
plt.grid(True)
|
|
plt.show()
|
|
|
|
def frequency_sweep(output_type="ascii", step_size=step_size_default):
|
|
try:
|
|
device = "Cariboulite"
|
|
# Create and configure the SDR device
|
|
sdr = SoapySDR.Device({"driver": device, "channel": "HiF"}) # Replace 'rtlsdr' with your SDR driver
|
|
print("SDR device initialized.")
|
|
|
|
# Check if FPGA is already programmed
|
|
if sdr.hasHardwareTime():
|
|
print("FPGA is already programmed and operational.")
|
|
else:
|
|
print("FPGA is not programmed. Initializing...")
|
|
# Add any necessary initialization logic here
|
|
|
|
except Exception as e:
|
|
print(f"Error initializing SDR device: {e}")
|
|
|
|
# Set up HiF radio
|
|
sdr.setSampleRate(SOAPY_SDR_RX, 0, sample_rate_hif)
|
|
sdr.setGain(SOAPY_SDR_RX, 0, gain)
|
|
|
|
# Allocate a buffer for received samples
|
|
buffer_size_hif = min_samples_hif
|
|
rx_buffer_hif = np.zeros(buffer_size_hif, dtype=np.complex64)
|
|
|
|
print(f"Starting infinite frequency sweep from {start_freq / 1e6} MHz to {stop_freq / 1e6} MHz...")
|
|
print(f"Frequency resolution: {freq_resolution / 1e3} kHz")
|
|
|
|
freq_rssi_map = {}
|
|
|
|
while True:
|
|
current_freq = start_freq
|
|
while current_freq <= stop_freq:
|
|
start_time = time.time()
|
|
|
|
print(f"Tuning to center frequency {current_freq / 1e6} MHz")
|
|
try:
|
|
sdr.setFrequency(SOAPY_SDR_RX, 0, current_freq)
|
|
time.sleep(0.01) # Add a 10 ms delay after setting the frequency
|
|
except Exception as e:
|
|
print(f"Error setting frequency: {e}")
|
|
current_freq += step_size
|
|
continue
|
|
|
|
# Start streaming
|
|
try:
|
|
stream = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32)
|
|
sdr.activateStream(stream)
|
|
|
|
# Collect samples
|
|
sr = sdr.readStream(stream, [rx_buffer_hif], len(rx_buffer_hif))
|
|
if sr.ret > 0:
|
|
# Calculate RSSI (Received Signal Strength Indicator)
|
|
rssi = calculate_rssi(rx_buffer_hif, sr.ret)
|
|
calibration_offset = -50 # Example offset for realistic RSSI values
|
|
rssi += gain + calibration_offset # Adjust RSSI for gain and calibration
|
|
print(f"Center Frequency: {current_freq / 1e6:.3f} MHz, RSSI: {rssi:.2f} dB")
|
|
|
|
freq_rssi_map[current_freq / 1e6] = rssi # Store frequency in MHz as key and RSSI as value
|
|
|
|
iteration_time = time.time() - start_time
|
|
print(f"Iteration completed in {iteration_time:.3f} seconds")
|
|
|
|
except Exception as e:
|
|
print(f"Error during stream or processing: {e}")
|
|
|
|
finally:
|
|
# Stop streaming and clean up
|
|
try:
|
|
sdr.deactivateStream(stream)
|
|
sdr.closeStream(stream)
|
|
except Exception as e:
|
|
print(f"Error cleaning up stream: {e}")
|
|
|
|
current_freq += step_size
|
|
|
|
# Render the chart after the full sweep
|
|
if freq_rssi_map:
|
|
if output_type == "ascii":
|
|
render_ascii_chart(freq_rssi_map)
|
|
elif output_type == "graphical":
|
|
render_graphical_chart(freq_rssi_map)
|
|
else:
|
|
print("Invalid output type. Choose 'ascii' or 'graphical'.")
|
|
freq_rssi_map.clear()
|
|
|
|
if __name__ == "__main__":
|
|
def catch_quit(sig, frame):
|
|
sdr.deactivateStream(stream)
|
|
sdr.closeStream(stream)
|
|
sys.exit(0)
|
|
|
|
signal.signal(signal.SIGINT, catch_quit)
|
|
|
|
output_type = "ascii" # Default to ASCII output
|
|
step_size = step_size_default # Default step size
|
|
if len(sys.argv) > 1:
|
|
output_type = sys.argv[1].strip().lower()
|
|
if len(sys.argv) > 2:
|
|
try:
|
|
step_size = float(sys.argv[2])
|
|
except ValueError:
|
|
print("Invalid step size provided. Using default.")
|
|
frequency_sweep(output_type=output_type, step_size=step_size)
|