Files
meshing-around/modules/radio.py
2025-10-13 08:40:21 -07:00

210 lines
7.7 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# meshing around with hamlib as a source for info to send to mesh network
# detect signal strength and frequency of active channel if appears to be in use send to mesh network
# depends on rigctld running externally as a network service
# also can use VOX detection with a microphone and vosk speech to text to send voice messages to mesh network
# requires vosk and sounddevice python modules. will auto download needed. more from https://alphacephei.com/vosk/models and unpack
# 2024 Kelly Keeton K7MHI
from modules.log import *
import asyncio
# verbose debug logging for trap words function
debugVoxTmsg = False
if radio_detection_enabled:
# used by hamlib detection
import socket
if voxDetectionEnabled:
# module global variables
previousVoxState = False
voxHoldTime = signalHoldTime
try:
import sounddevice as sd # pip install sounddevice sudo apt install portaudio19-dev
from vosk import Model, KaldiRecognizer # pip install vosk
import json
q = asyncio.Queue(maxsize=10) # what is a reasonable limit?
if useLocalVoxModel:
voxModel = Model(lang=localVoxModelPath) # use built in model for specified language
else:
voxModel = Model(lang=voxLanguage) # use built in model for specified language
except Exception as e:
print(f"RadioMon: Error importing VOX dependencies: {e}")
print(f"To use VOX detection please install the vosk and sounddevice python modules")
print(f"pip install vosk sounddevice")
print(f"sounddevice needs pulseaudio, apt-get install portaudio19-dev")
voxDetectionEnabled = False
logger.error(f"RadioMon: VOX detection disabled due to import error")
FREQ_NAME_MAP = {
462562500: "GRMS CH1",
462587500: "GRMS CH2",
462612500: "GRMS CH3",
462637500: "GRMS CH4",
462662500: "GRMS CH5",
462687500: "GRMS CH6",
462712500: "GRMS CH7",
467562500: "GRMS CH8",
467587500: "GRMS CH9",
467612500: "GRMS CH10",
467637500: "GRMS CH11",
467662500: "GRMS CH12",
467687500: "GRMS CH13",
467712500: "GRMS CH14",
467737500: "GRMS CH15",
462550000: "GRMS CH16",
462575000: "GMRS CH17",
462600000: "GMRS CH18",
462625000: "GMRS CH19",
462675000: "GMRS CH20",
462670000: "GMRS CH21",
462725000: "GMRS CH22",
462725500: "GMRS CH23",
467575000: "GMRS CH24",
467600000: "GMRS CH25",
467625000: "GMRS CH26",
467650000: "GMRS CH27",
467675000: "GMRS CH28",
467700000: "FRS CH1",
462650000: "FRS CH5",
462700000: "FRS CH7",
462737500: "FRS CH16",
146520000: "2M Simplex Calling",
446000000: "70cm Simplex Calling",
156800000: "Marine CH16",
# Add more as needed
}
def get_freq_common_name(freq):
freq = int(freq)
name = FREQ_NAME_MAP.get(freq)
if name:
return name
else:
# Return MHz if not found
return f"{freq/1000000} Mhz"
def get_hamlib(msg="f"):
# get data from rigctld server
try:
rigControlSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
rigControlSocket.settimeout(2)
rigControlSocket.connect((rigControlServerAddress.split(":")[0],int(rigControlServerAddress.split(":")[1])))
except Exception as e:
logger.error(f"RadioMon: Error connecting to rigctld: {e}")
return ERROR_FETCHING_DATA
try:
build_msg = f"{msg}\n"
MESSAGE = bytes(build_msg, "utf-8")
rigControlSocket.sendall(MESSAGE)
# Look for the response
data = rigControlSocket.recv(16)
rigControlSocket.close()
# strip newline and return
data = data.replace(b'\n',b'')
return data.decode("utf-8").rstrip()
except Exception as e:
logger.error(f"RadioMon: Error fetching data from rigctld: {e}")
return ERROR_FETCHING_DATA
def get_sig_strength():
strength = get_hamlib('l STRENGTH')
return strength
def vox_callback(indata, frames, time, status):
if status:
logger.warning(f"RadioMon: VOX input status: {status}")
q.put(bytes(indata))
async def signalWatcher():
global previousStrength
global signalCycle
try:
signalStrength = int(get_sig_strength())
if signalStrength >= previousStrength and signalStrength > signalDetectionThreshold:
message = f"Detected {get_freq_common_name(get_hamlib('f'))} active. S-Meter:{signalStrength}dBm"
logger.debug(f"RadioMon: {message}. Waiting for {signalHoldTime} seconds")
previousStrength = signalStrength
signalCycle = 0
await asyncio.sleep(signalHoldTime)
return message
else:
signalCycle += 1
if signalCycle >= signalCycleLimit:
signalCycle = 0
previousStrength = -40
await asyncio.sleep(signalCooldown)
return None
except Exception as e:
signalStrength = -40
signalCycle = 0
previousStrength = -40
def make_vox_callback(loop, q):
def vox_callback(indata, frames, time, status):
if status:
logger.warning(f"RadioMon: VOX input status: {status}")
try:
loop.call_soon_threadsafe(q.put_nowait, bytes(indata))
except asyncio.QueueFull:
# Optionally log or just drop the oldest
logger.debug("RadioMon: VOX queue full, dropping audio frame")
except RuntimeError:
# Loop may be closed
pass
return vox_callback
async def voxMonitor():
global previousVoxState, voxMsgQueue
try:
model = voxModel
device_info = sd.query_devices(voxInputDevice, 'input')
samplerate = 16000
logger.debug(f"RadioMon: VOX monitor started on device {device_info['name']} with samplerate {samplerate} using trap words: {voxTrapList if voxOnTrapList else 'none'}")
rec = KaldiRecognizer(model, samplerate)
loop = asyncio.get_running_loop()
callback = make_vox_callback(loop, q)
with sd.RawInputStream(
device=voxInputDevice,
samplerate=samplerate,
blocksize=8000,
dtype='int16',
channels=1,
callback=callback
):
while True:
data = await q.get()
if rec.AcceptWaveform(data):
result = rec.Result()
text = json.loads(result).get("text", "")
# check for trap words
if text and text != 'huh':
if voxOnTrapList:
if isinstance(voxTrapList, str):
traps = [voxTrapList]
else:
traps = voxTrapList
if any(trap.lower() in text.lower() for trap in traps):
#remove the trap words from the text
for trap in traps:
text = text.replace(trap, '')
text = text.strip()
if text:
logger.debug(f"RadioMon: VOX 🎙Trapped {voxTrapList} in: {text}")
voxMsgQueue.append(f"🎙Trapped {voxDescription}: {text}")
else:
if debugVoxTmsg:
logger.debug(f"RadioMon: VOX ignored text not on trap list: {text}")
else:
voxMsgQueue.append(f"🎙Detected {voxDescription}: {text}")
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"RadioMon: Error in VOX monitor: {e}")
# end of file