Files
Lora-Scanner/lorawan.py
2024-01-31 11:29:52 -07:00

122 lines
3.6 KiB
Python

##!/usr/bin/env python3
import io
import sys
import time
import datetime
import argparse
from enum import IntEnum
import serial
from serial.threaded import LineReader, ReaderThread
parser = argparse.ArgumentParser(description='Connect to LoRaWAN network')
parser.add_argument('port', help="Serial port of LoStik")
parser.add_argument('--joinmode', '-j', help="otaa, abp", default="otaa")
# ABP credentials
parser.add_argument('--appskey', help="App Session Key", default="")
parser.add_argument('--nwkskey', help="Network Session Key", default="")
parser.add_argument('--devaddr', help="Device Address", default="")
# OTAA credentials
parser.add_argument('--appeui', help="App EUI", default="")
parser.add_argument('--appkey', help="App Key", default="")
parser.add_argument('--deveui', help="Device EUI", default="")
args = parser.parse_args()
OTAA_RETRIES = 5
class MaxRetriesError(Exception):
pass
class ConnectionState(IntEnum):
SUCCESS = 0
CONNECTING = 100
CONNECTED = 200
FAILED = 500
TO_MANY_RETRIES = 520
class PrintLines(LineReader):
retries = 0
state = ConnectionState.CONNECTING
def retry(self, action):
if(self.retries >= OTAA_RETRIES):
print("Too many retries, exiting")
self.state = ConnectionState.TO_MANY_RETRIES
return
self.retries = self.retries + 1
action()
def get_var(self, cmd):
self.send_cmd(cmd)
return self.transport.serial.readline()
def join(self):
if args.joinmode == "abp":
self.join_abp()
else:
self.join_otaa()
def join_otaa(self):
if len(args.appeui):
self.send_cmd('mac set appeui %s' % args.appeui)
if len(args.appkey):
self.send_cmd('mac set appkey %s' % args.appkey)
if len(args.deveui):
self.send_cmd('mac set deveui %s' % args.deveui)
self.send_cmd('mac join otaa')
def join_abp(self):
if len(args.devaddr):
self.send_cmd('mac set devaddr %s' % args.devaddr)
if len(args.appskey):
self.send_cmd('mac set appskey %s' % args.appskey)
if len(args.nwkskey):
self.send_cmd('mac set nwkskey %s' % args.nwkskey)
self.send_cmd('mac join abp')
def connection_made(self, transport):
"""
Fires when connection is made to serial port device
"""
print("Connection to LoStik established")
self.transport = transport
self.retry(self.join)
def handle_line(self, data):
# if data == "ok" or data == 'busy':
# return
print("STATUS: %s" % data)
if data.strip() == "denied" or data.strip() == "no_free_ch":
print("Retrying OTAA connection")
self.retry(self.join)
elif data.strip() == "accepted":
print("UPDATING STATE to connected")
self.state = ConnectionState.CONNECTED
def connection_lost(self, exc):
"""
Called when serial connection is severed to device
"""
if exc:
print(exc)
print("Lost connection to serial device")
def send_cmd(self, cmd, delay=.5):
print(cmd)
self.transport.write(('%s\r\n' % cmd).encode('UTF-8'))
time.sleep(delay)
ser = serial.Serial(args.port, baudrate=57600)
with ReaderThread(ser, PrintLines) as protocol:
while protocol.state < ConnectionState.FAILED:
if protocol.state != ConnectionState.CONNECTED:
time.sleep(1)
continue
protocol.send_cmd("mac tx uncnf 1 %d" % int(time.time()))
time.sleep(10)
exit(protocol.state)