# Mirror of https://github.com/skinnyrad/Lora-Scanner.git
# Synced 2026-03-28 17:43:00 +01:00 (253 lines, 8.0 KiB, Python)
from flask import Flask, render_template, request, jsonify
|
|
from markupsafe import escape
|
|
from flask_socketio import SocketIO, emit
|
|
import serial
|
|
import threading
|
|
import time
|
|
from collections import deque
|
|
import pandas as pd
|
|
import re
|
|
|
|
# Flask application and the Socket.IO server used to push serial lines to
# connected browsers.
app = Flask(__name__)
socketio = SocketIO(app)

# Reader threads and their bounded line buffers, keyed by 'port1'/'port2'/'port3'.
serial_threads = {}
serial_buffers = {}

# Per-port run flags polled by read_serial_data(); a reader returns once its
# flag is False.
port1_status = True
port2_status = True
port3_status = True

# Serial handles for the 433 / 868 / 915 MHz radios.  A module-level
# ``global`` statement does not create a binding, so the names used to be
# undefined until the first successful connect; bind them to None instead.
ser1 = None
ser2 = None
ser3 = None

global_dataframe = pd.DataFrame(
    columns=['Device Name', 'Frequency', 'Signal Strength', 'Plaintext']
)

# Latest survey row per device label; served as JSON by /get_table_data.
surveydata = {}


def frequency(port):
    """Return the band in MHz for a logical port key, or None if unknown."""
    return {'port1': 433, 'port2': 868, 'port3': 915}.get(port)
|
|
|
|
|
|
def read_serial_data(port, ser, buffer):
    """Poll one serial port and publish every line it receives.

    Intended to run as the target of a reader thread (see connect_serial).
    Each decoded line is appended to ``buffer`` and emitted to Socket.IO
    clients as ``serial_data_<port>``.  When a line contains ``RSSI: <int>``
    the matching ``surveydata`` row is refreshed with that value.

    The loop ends when the handle is closed, when the port's module-level
    ``portN_status`` flag is False, or when the serial layer raises.  The
    previous bare ``except: pass`` skipped the sleep on every failure, so the
    thread spun at full speed once the port had been closed.

    Args:
        port: Logical port key: 'port1', 'port2' or 'port3'.
        ser: Open serial handle providing ``is_open``, ``in_waiting`` and
            ``readline``.
        buffer: Container with ``append`` that collects the received lines.
    """
    rssi_re = re.compile(r"RSSI: (-?\d+)")
    # NOTE(review): the 433 MHz label reads "443 MHz".  It is kept unchanged
    # because it is a runtime key returned by /get_table_data - confirm with
    # the front end before renaming.
    labels = {
        'port1': 'Raw LoRa Device 443 MHz',
        'port2': 'Raw LoRa Device 868 MHz',
        'port3': 'Raw LoRa Device 915 MHz',
    }
    label = labels.get(port)
    band = frequency(port)
    rssi = ''

    while ser.is_open:
        try:
            if ser.in_waiting > 0:
                data = ser.readline().decode('utf-8').strip()
                match = rssi_re.search(data)

                # Refresh the survey row whenever the line carries an RSSI.
                if match and label is not None:
                    rssi = int(match.group(1))
                    surveydata[label] = [band, rssi, '']

                buffer.append(data)
                socketio.emit(f'serial_data_{port}', {'data': data})

                # Make sure the device is listed even before any RSSI is seen.
                if label is not None and surveydata.get(label) is None:
                    surveydata[label] = [band, 0, rssi]

            # Re-read the module-level flags on every pass so a change made
            # by another thread is noticed.
            run_flags = {
                'port1': port1_status,
                'port2': port2_status,
                'port3': port3_status,
            }
            if not run_flags.get(port, True):
                return
            time.sleep(0.1)
        except UnicodeDecodeError:
            # Undecodable bytes: drop this line and keep reading.
            continue
        except (serial.SerialException, OSError) as exc:
            # The port was closed or the device disappeared; retrying cannot
            # succeed, so end the thread instead of spinning.
            print(f"\n\nSerial reader for {port} stopped: {exc}\n\n")
            return
        except Exception as exc:
            # Stay best-effort for anything else, but back off between tries.
            print(f"\n\nSerial reader for {port} error: {exc}\n\n")
            time.sleep(0.1)
|
|
|
|
|
|
def connect_serial(port, frequency):
    """Open a serial device for one band and start its reader thread.

    On success the handle is stored in the module-level ``ser1``/``ser2``/
    ``ser3`` slot for the band, a 10-line buffer is created in
    ``serial_buffers`` and a daemon thread running read_serial_data is
    started and recorded in ``serial_threads``.  If the device cannot be
    opened a message is printed and nothing else changes.  Frequencies other
    than 433, 868 and 915 are ignored.

    Args:
        port: Device name passed to ``serial.Serial`` (e.g. a COM port or
            /dev path).
        frequency: Band in MHz: 433, 868 or 915.
    """
    # One table replaces three copy-pasted branches that differed only in
    # these names.
    slots = {
        433: ('port1', 'ser1'),
        868: ('port2', 'ser2'),
        915: ('port3', 'ser3'),
    }
    slot = slots.get(frequency)
    if slot is None:
        return
    key, handle_name = slot

    try:
        handle = serial.Serial(port, baudrate=9600)
    except (serial.SerialException, OSError, ValueError):
        # Only failures to open the device are expected here; anything else
        # is a programming error and should surface.
        print(f"\n\nPort for {frequency} MHz not available\n\n")
        return

    globals()[handle_name] = handle
    serial_buffers[key] = deque(maxlen=10)
    reader = threading.Thread(
        target=read_serial_data,
        args=(key, handle, serial_buffers[key]),
    )
    reader.daemon = True
    serial_threads[key] = reader
    reader.start()
|
|
|
|
|
|
def disconnect_serial(port):
    """Close the serial handle for a logical port and forget its bookkeeping.

    Removes the port's entries from ``serial_threads`` and ``serial_buffers``
    and closes the matching ``ser1``/``ser2``/``ser3`` handle when one exists.
    Disconnecting a port that was never connected is a no-op.

    Args:
        port: Logical port key: 'port1', 'port2' or 'port3'.  Any other value
            only prints a diagnostic.
    """
    # threading.Thread has no stop() method: the old call always raised, and
    # the swallowed error skipped both deletions.  Drop the entries directly.
    serial_threads.pop(port, None)
    serial_buffers.pop(port, None)

    handle_names = {'port1': 'ser1', 'port2': 'ser2', 'port3': 'ser3'}
    handle_name = handle_names.get(port)
    if handle_name is None:
        print("Unkown port, something went wrong...")
        return

    # The handle may be missing or None when the port was never opened.
    handle = globals().get(handle_name)
    if handle is not None:
        handle.close()
|
|
|
|
|
|
@app.route('/')
def index():
    """Render the landing page template."""
    page = render_template('index.html')
    return page
|
|
|
|
@app.route('/analysis')
def analysis():
    """Render the analysis page, seeded with each port's buffered lines."""
    snapshot = {name: list(lines) for name, lines in serial_buffers.items()}
    return render_template('analysis.html', initial_data=snapshot)
|
|
|
|
@app.route('/survey')
def survey():
    """Render the survey page, passing the module-level dataframe as ``data``."""
    page = render_template('survey.html', data=global_dataframe)
    return page
|
|
|
|
@app.route('/tracking')
def tracking():
    """Render the tracking page, seeded with each port's buffered lines."""
    snapshot = {name: list(lines) for name, lines in serial_buffers.items()}
    return render_template('tracking.html', initial_data=snapshot)
|
|
|
|
@app.route('/attach_serial_433', methods=['GET'])
def attach_serial_433():
    """Connect the 433 MHz radio to the device named by ``user_input``.

    Reads the ``user_input`` query parameter, re-arms the port1 run flag and
    calls connect_serial.  The JSON reply only acknowledges the request; it
    does not report whether the device could be opened.
    """
    # Without this declaration the assignment below only created an unused
    # local and the module-level flag was never touched.
    global port1_status

    # NOTE(review): escape() HTML-escapes the device name and turns a missing
    # parameter into the string 'None' - confirm that is intended.
    user_input = escape(request.args.get('user_input'))
    port1_status = True
    connect_serial(str(user_input), 433)
    return jsonify(result='Serial Port Requested for 433 MHz')
|
|
|
|
@app.route('/delete_serial_433', methods=['GET'])
def delete433():
    """Disconnect the 433 MHz port ('port1') and acknowledge with JSON."""
    disconnect_serial("port1")
    return jsonify(result="Port Disconnected!")
|
|
|
|
@app.route('/attach_serial_868', methods=['GET'])
def attach_serial_868():
    """Connect the 868 MHz radio to the device named by ``user_input``.

    Reads the ``user_input`` query parameter, re-arms the port2 run flag and
    calls connect_serial.  The JSON reply only acknowledges the request; it
    does not report whether the device could be opened.
    """
    # Without this declaration the assignment below only created an unused
    # local and the module-level flag was never touched.
    global port2_status

    # NOTE(review): escape() HTML-escapes the device name and turns a missing
    # parameter into the string 'None' - confirm that is intended.
    user_input = escape(request.args.get('user_input'))
    port2_status = True
    connect_serial(str(user_input), 868)
    return jsonify(result='Serial Port Requested for 868 MHz')
|
|
|
|
@app.route('/delete_serial_868', methods=['GET'])
def delete868():
    """Disconnect the 868 MHz port ('port2') and acknowledge with JSON."""
    disconnect_serial("port2")
    return jsonify(result="Port Disconnected!")
|
|
|
|
@app.route('/attach_serial_915', methods=['GET'])
def attach_serial_915():
    """Connect the 915 MHz radio to the device named by ``user_input``.

    Reads the ``user_input`` query parameter, re-arms the port3 run flag and
    calls connect_serial.  The JSON reply only acknowledges the request; it
    does not report whether the device could be opened.
    """
    # The 433 and 868 MHz handlers re-arm their run flag before connecting;
    # this one did not.  Do the same here so all three bands behave alike.
    global port3_status

    # NOTE(review): escape() HTML-escapes the device name and turns a missing
    # parameter into the string 'None' - confirm that is intended.
    user_input = escape(request.args.get('user_input'))
    port3_status = True
    connect_serial(str(user_input), 915)
    return jsonify(result='Serial Port Requested for 915 MHz')
|
|
|
|
@app.route('/delete_serial_915', methods=['GET'])
def delete915():
    """Disconnect the 915 MHz port ('port3') and acknowledge with JSON."""
    disconnect_serial("port3")
    return jsonify(result="Port Disconnected!")
|
|
|
|
@socketio.on('connect')
def handle_connect():
    """Send each port's buffered lines to the client on the 'connect' event."""
    for name, lines in serial_buffers.items():
        event = f'initial_serial_data_{name}'
        payload = {'data': list(lines)}
        emit(event, payload)
|
|
|
|
@app.route('/transmit433', methods=['POST'])
def transmit433():
    """Write a user-supplied message to the 433 MHz serial port.

    Expects a JSON object with a string ``user_input`` field and writes
    ``TX:<user_input> `` to the port.

    Returns:
        ``{"result": "Ok"}`` on success; HTTP 400 when ``user_input`` is
        missing or not a string; HTTP 503 when the port is not connected.
        These cases previously surfaced as unhandled exceptions (HTTP 500).
    """
    payload = request.get_json(silent=True)
    user_input = payload.get('user_input') if isinstance(payload, dict) else None
    if not isinstance(user_input, str):
        return jsonify(result="Missing user_input"), 400

    # The handle is unbound or None until the port has been attached.
    handle = globals().get('ser1')
    if handle is None or not handle.is_open:
        return jsonify(result="Serial port not connected"), 503

    handle.write(f"TX:{user_input} ".encode())
    return jsonify(result="Ok")
|
|
|
|
@app.route('/transmit868', methods=['POST'])
def transmit868():
    """Write a user-supplied message to the 868 MHz serial port.

    Expects a JSON object with a string ``user_input`` field and writes
    ``TX:<user_input> `` to the port.

    Returns:
        ``{"result": "Ok"}`` on success; HTTP 400 when ``user_input`` is
        missing or not a string; HTTP 503 when the port is not connected.
        These cases previously surfaced as unhandled exceptions (HTTP 500).
    """
    payload = request.get_json(silent=True)
    user_input = payload.get('user_input') if isinstance(payload, dict) else None
    if not isinstance(user_input, str):
        return jsonify(result="Missing user_input"), 400

    # The handle is unbound or None until the port has been attached.
    handle = globals().get('ser2')
    if handle is None or not handle.is_open:
        return jsonify(result="Serial port not connected"), 503

    handle.write(f"TX:{user_input} ".encode())
    return jsonify(result="Ok")
|
|
|
|
@app.route('/transmit915', methods=['POST'])
def transmit915():
    """Write a user-supplied message to the 915 MHz serial port.

    Expects a JSON object with a string ``user_input`` field and writes
    ``TX:<user_input> `` to the port.

    Returns:
        ``{"result": "Ok"}`` on success; HTTP 400 when ``user_input`` is
        missing or not a string; HTTP 503 when the port is not connected.
        These cases previously surfaced as unhandled exceptions (HTTP 500).
    """
    payload = request.get_json(silent=True)
    user_input = payload.get('user_input') if isinstance(payload, dict) else None
    if not isinstance(user_input, str):
        return jsonify(result="Missing user_input"), 400

    # The handle is unbound or None until the port has been attached.
    handle = globals().get('ser3')
    if handle is None or not handle.is_open:
        return jsonify(result="Serial port not connected"), 503

    handle.write(f"TX:{user_input} ".encode())
    return jsonify(result="Ok")
|
|
|
|
@app.route('/checkSer', methods=['GET'])
def checkSer():
    """Report whether the serial handle for the requested port is open.

    Reads the ``port`` query parameter ('port1', 'port2' or 'port3') and
    replies with ``{"result": "True"}`` when the matching handle exists and
    is open, otherwise ``{"result": "False"}``.  Unknown or missing port
    names also yield "False".
    """
    handle_names = {'port1': 'ser1', 'port2': 'ser2', 'port3': 'ser3'}
    handle_name = handle_names.get(request.args.get('port'))

    # Look the handle up explicitly instead of relying on a bare ``except``
    # to hide the NameError raised for a port that was never attached.
    handle = globals().get(handle_name) if handle_name else None
    is_open = bool(getattr(handle, 'is_open', False))
    return jsonify(result="True" if is_open else "False")
|
|
|
|
@app.route('/get_table_data')
def get_table_data():
    """Return the current ``surveydata`` mapping as JSON, echoing it to stdout."""
    rows = surveydata
    print(rows)
    return jsonify(rows)
|
|
|
|
if __name__ == "__main__":
    # Start the Socket.IO development server when run as a script.
    # NOTE(review): debug=True is passed through unchanged - confirm it is
    # not used for deployments reachable by untrusted clients.
    socketio.run(app, debug=True)
|