Merge pull request #267 from SpudGunMan/lab

Lab
This commit is contained in:
Kelly
2025-11-12 19:47:11 -08:00
committed by GitHub
6 changed files with 284 additions and 20 deletions
+3
View File
@@ -52,6 +52,7 @@ Mesh Bot is a feature-rich Python bot designed to enhance your [Meshtastic](http
- **Customizable Triggers**: Use proximity events for creative applications like "king of the hill" or 🧭 geocache games by adjusting the alert cycle.
- **High Flying Alerts**: Receive notifications when nodes with high altitude are detected on the mesh.
- **Voice/Command Triggers**: Activate bot functions using keywords or voice commands (see [Voice Commands](#voice-commands-vox) for "Hey Chirpy!" support).
- **YOLOv5 alerts**: Use camera modules to detect objects or OCR
### EAS Alerts
- **FEMA iPAWS/EAS Alerts**: Receive Emergency Alerts from FEMA via API on internet-connected nodes.
@@ -72,6 +73,7 @@ Mesh Bot is a feature-rich Python bot designed to enhance your [Meshtastic](http
- **WSJT-X Integration**: Monitor WSJT-X (FT8, FT4, WSPR, etc.) decode messages and forward them to the mesh network with optional callsign filtering.
- **JS8Call Integration**: Monitor JS8Call messages and forward them to the mesh network with optional callsign filtering.
- **Meshages TTS**: The bot can speak mesh messages aloud using [KittenTTS](https://github.com/KittenML/KittenTTS). Enable this feature to have important alerts and messages read out loud on your device—ideal for hands-free operation or accessibility. See [radio.md](modules/radio.md) for setup instructions.
- **Offline Tone out Decoder**: Decode fire Tone out and DTMF and action with alerts to mesh
### Asset Tracking, Check-In/Check-Out, and Inventory Management
Advanced check-in/check-out and asset tracking for people and equipment—ideal for accountability, safety monitoring, and logistics (e.g., Radio-Net, FEMA, trailhead groups). Admin approval workflows, GPS location capture, and overdue alerts. The integrated inventory and point-of-sale (POS) system enables item management, sales tracking, cart-based transactions, and daily reporting, for swaps, emergency supply management, and field operations, maker-places.
@@ -172,6 +174,7 @@ For testing and feature ideas on Discord and GitHub, if its stable its thanks to
- **mrpatrick1991**: For OG Docker configurations. 💻
- **A-c0rN**: Assistance with iPAWS and 🚨
- **Mike O'Connell/skrrt**: For [eas_alert_parser](etc/eas_alert_parser.py) enhanced by **sheer.cold**
- **dadud**: For idea on [etc/icad_tone.py](etc/icad_tone.py)
- **WH6GXZ nurse dude**: Volcano Alerts 🌋
- **mikecarper**: hamtest, leading to quiz etc.. 📋
- **c.merphy360**: high altitude alerts. 🚀
+33
View File
@@ -97,3 +97,36 @@ Run this script to monitor the camera feed and generate alerts for detected and
---
## icad_tone.py
**Purpose:**
`icad_tone.py` is a utility script for detecting fire and EMS radio tones using the [icad_tone_detection](https://github.com/thegreatcodeholio/icad_tone_detection) library. It analyzes audio from a live stream, soundcard, or WAV file, identifies various tone types (such as two-tone, long tone, hi/low, pulsed, MDC, and DTMF), and writes detected alerts to `alert.txt` for integration with Mesh Bot or Meshtastic.
**Usage:**
Run the script from the command line, specifying a WAV file for offline analysis or configuring it to listen to a stream or soundcard for real-time monitoring.
```sh
python etc/icad_tone.py --wav path/to/file.wav
```
Or, for live monitoring (after setting `HTTP_STREAM_URL` in the script):
```sh
python etc/icad_tone.py
```
**What it does:**
- Loads audio from a stream, soundcard, or WAV file.
- Uses `icad_tone_detection` to analyze audio for tone patterns.
- Prints raw detection results and summaries to the console.
- Writes a summary of detected tones to `alert.txt` (overwriting each time).
- Handles errors and missing dependencies gracefully.
**Configuration:**
- `ALERT_FILE_PATH`: Path to the alert output file (default: `alert.txt`).
- `AUDIO_SOURCE`: Set to `"http"` for streaming or `"soundcard"` for local audio input.
- `HTTP_STREAM_URL`: URL of the audio stream (required if using HTTP source).
- `SAMPLE_RATE`, `INPUT_CHANNELS`, `CHUNK_DURATION`: Audio processing parameters.
**Note:**
- Requires installation of dependencies (`icad_tone_detection`)
- Set `HTTP_STREAM_URL` to a valid stream if using HTTP mode.
- Intended for experimental or hobbyist use; may require customization for your workflow.
+222
View File
@@ -0,0 +1,222 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# icad_tone.py - uses icad_tone_detection, for fire and EMS tone detection
# https://github.com/thegreatcodeholio/icad_tone_detection
# output to alert.txt for meshing-around bot
# 2025 K7MHI Kelly Keeton
# ---------------------------
# User Configuration Section
# ---------------------------
ALERT_FILE_PATH = "alert.txt" # Path to alert log file, or None to disable logging
AUDIO_SOURCE = "soundcard" # "soundcard" for mic/line-in, "http" for stream
HTTP_STREAM_URL = "" # Set to your stream URL if using "http"
SAMPLE_RATE = 16000 # Audio sample rate (Hz)
INPUT_CHANNELS = 1 # Number of input channels (1=mono)
MIN_SAMPLES = 4096 # Minimum samples per detection window (increase for better accuracy)
STREAM_BUFFER = 32000 # Number of bytes to buffer before detection (for MP3 streams)
INPUT_DEVICE = 0 # Set to device index or name, or None for default
# ---------------------------
import sys
import time
from icad_tone_detection import tone_detect
from pydub import AudioSegment
import requests
import sounddevice as sd
import numpy as np
import argparse
import io
import warnings
warnings.filterwarnings("ignore", message="nperseg = .* is greater than input length")
def write_alert(message):
if ALERT_FILE_PATH:
try:
with open(ALERT_FILE_PATH, "w") as f: # overwrite each time
f.write(message + "\n")
except Exception as e:
print(f"Error writing to alert file: {e}", file=sys.stderr)
def detect_and_alert(audio_data, sample_rate):
try:
result = tone_detect(audio_data, sample_rate)
except Exception as e:
print(f"Detection error: {e}", file=sys.stderr)
return
# Only print if something is detected
if result and any(getattr(result, t, []) for t in [
"two_tone_result", "long_result", "hi_low_result", "pulsed_result", "mdc_result", "dtmf_result"
]):
print("Raw detection result:", result)
# Prepare alert summary for all relevant tone types
summary = []
if hasattr(result, "dtmf_result") and result.dtmf_result:
for dtmf in result.dtmf_result:
summary.append(f"DTMF Digit: {dtmf.get('digit', '?')} | Duration: {dtmf.get('length', '?')}s")
if hasattr(result, "hi_low_result") and result.hi_low_result:
for hl in result.hi_low_result:
summary.append(
f"Hi/Low Alternations: {hl.get('alternations', '?')} | Duration: {hl.get('length', '?')}s"
)
if hasattr(result, "mdc_result") and result.mdc_result:
for mdc in result.mdc_result:
summary.append(
f"MDC UnitID: {mdc.get('unitID', '?')} | Op: {mdc.get('op', '?')} | Duration: {mdc.get('length', '?')}s"
)
if hasattr(result, "pulsed_result") and result.pulsed_result:
for pl in result.pulsed_result:
summary.append(
f"Pulsed Tone: {pl.get('detected', '?')}Hz | Cycles: {pl.get('cycles', '?')} | Duration: {pl.get('length', '?')}s"
)
if hasattr(result, "two_tone_result") and result.two_tone_result:
for tt in result.two_tone_result:
summary.append(
f"Two-Tone: {tt.get('detected', ['?','?'])[0]}Hz/{tt.get('detected', ['?','?'])[1]}Hz | Tone A: {tt.get('tone_a_length', '?')}s | Tone B: {tt.get('tone_b_length', '?')}s"
)
if hasattr(result, "long_result") and result.long_result:
for lt in result.long_result:
summary.append(
f"Long Tone: {lt.get('detected', '?')}Hz | Duration: {lt.get('length', '?')}s"
)
if summary:
write_alert("\n".join(summary))
def get_supported_sample_rate(device, channels=1):
# Try common sample rates
for rate in [44100, 48000, 16000, 8000]:
try:
sd.check_input_settings(device=device, channels=channels, samplerate=rate)
return rate
except Exception:
continue
return None
def main():
print("="*80)
print(" iCAD Tone Decoder for Meshing-Around Booting Up!")
if AUDIO_SOURCE == "soundcard":
try:
if INPUT_DEVICE is not None:
sd.default.device = INPUT_DEVICE
device_info = sd.query_devices(INPUT_DEVICE, kind='input')
else:
device_info = sd.query_devices(sd.default.device, kind='input')
device_name = device_info['name']
# Detect supported sample rate
detected_rate = get_supported_sample_rate(sd.default.device, INPUT_CHANNELS)
if detected_rate:
SAMPLE_RATE = detected_rate
else:
print("No supported sample rate found, using default.", file=sys.stderr)
except Exception:
device_name = "Unknown"
print(f" Mode: Soundcard | Device: {device_name} | Sample Rate: {SAMPLE_RATE} Hz | Channels: {INPUT_CHANNELS}")
elif AUDIO_SOURCE == "http":
print(f" Mode: HTTP Stream | URL: {HTTP_STREAM_URL} | Buffer: {STREAM_BUFFER} bytes")
else:
print(f" Mode: {AUDIO_SOURCE}")
print("="*80)
time.sleep(1)
parser = argparse.ArgumentParser(description="ICAD Tone Detection")
parser.add_argument("--wav", type=str, help="Path to WAV file for detection")
args = parser.parse_args()
if args.wav:
print(f"Processing WAV file: {args.wav}")
try:
audio = AudioSegment.from_file(args.wav)
if audio.channels > 1:
audio = audio.set_channels(1)
print(f"AudioSegment: channels={audio.channels}, frame_rate={audio.frame_rate}, duration={len(audio)}ms")
detect_and_alert(audio, audio.frame_rate)
except Exception as e:
print(f"Error processing WAV file: {e}", file=sys.stderr)
return
print("Starting ICAD Tone Detection...")
if AUDIO_SOURCE == "http":
if not HTTP_STREAM_URL or HTTP_STREAM_URL.startswith("http://your-stream-url-here"):
print("ERROR: Please set a valid HTTP_STREAM_URL or provide a WAV file using --wav option.", file=sys.stderr)
sys.exit(2)
print(f"Listening to HTTP stream: {HTTP_STREAM_URL}")
try:
response = requests.get(HTTP_STREAM_URL, stream=True, timeout=10)
buffer = io.BytesIO()
try:
for chunk in response.iter_content(chunk_size=4096):
buffer.write(chunk)
# Use STREAM_BUFFER for detection window
if buffer.tell() > STREAM_BUFFER:
buffer.seek(0)
audio = AudioSegment.from_file(buffer, format="mp3")
if audio.channels > 1:
audio = audio.set_channels(1)
# --- Simple audio level detection ---
samples = np.array(audio.get_array_of_samples())
if samples.dtype != np.float32:
samples = samples.astype(np.float32) / 32767.0 # Normalize to -1..1
rms = np.sqrt(np.mean(samples**2))
if rms > 0.01:
print(f"Audio detected! RMS: {rms:.3f} ", end='\r')
if rms > 0.5:
print(f"WARNING: Audio too loud! RMS: {rms:.3f} ", end='\r')
# --- End audio level detection ---
detect_and_alert(audio, audio.frame_rate)
buffer = io.BytesIO()
except KeyboardInterrupt:
print("\nStopped by user.")
sys.exit(0)
except requests.exceptions.RequestException as e:
print(f"Connection error: {e}", file=sys.stderr)
sys.exit(3)
except Exception as e:
print(f"Error processing HTTP stream: {e}", file=sys.stderr)
sys.exit(4)
elif AUDIO_SOURCE == "soundcard":
print("Listening to audio device:")
buffer = np.array([], dtype=np.float32)
min_samples = MIN_SAMPLES # Use configured minimum samples
def callback(indata, frames, time_info, status):
nonlocal buffer
try:
samples = indata[:, 0]
buffer = np.concatenate((buffer, samples))
# --- Simple audio level detection ---
rms = np.sqrt(np.mean(samples**2))
if rms > 0.01:
print(f"Audio detected! RMS: {rms:.3f} ", end='\r')
if rms > 0.5:
print(f"WARNING: Audio too loud! RMS: {rms:.3f} ", end='\r')
# --- End audio level detection ---
# Only process when buffer is large enough
while buffer.size >= min_samples:
int_samples = np.int16(buffer[:min_samples] * 32767)
audio = AudioSegment(
data=int_samples.tobytes(),
sample_width=2,
frame_rate=SAMPLE_RATE,
channels=1
)
detect_and_alert(audio, SAMPLE_RATE)
buffer = buffer[min_samples:] # keep remainder for next window
except Exception as e:
print(f"Callback error: {e}", file=sys.stderr)
try:
with sd.InputStream(samplerate=SAMPLE_RATE, channels=INPUT_CHANNELS, dtype='float32', callback=callback):
print("Press Ctrl+C to stop.")
import signal
signal.pause() # Wait for Ctrl+C, keeps CPU usage minimal
except KeyboardInterrupt:
print("Stopped by user.")
except Exception as e:
print(f"Error accessing soundcard: {e}", file=sys.stderr)
sys.exit(5)
else:
print("Unknown AUDIO_SOURCE. Set to 'http' or 'soundcard'.", file=sys.stderr)
sys.exit(6)
if __name__ == "__main__":
main()
+12 -9
View File
@@ -290,6 +290,8 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann
if (float(snr) != 0 or float(rssi) != 0) and "Hops" not in hop:
msg += f"\nSNR:{snr} RSSI:{rssi}"
elif "Hops" in hop:
# janky, remove the words Gateway or MQTT if present
hop = hop.replace("Gateway", "").replace("Direct", "").replace("MQTT", "").strip()
msg += f"\n{hop}🐇 "
if "@" in message:
@@ -2029,25 +2031,26 @@ def onReceive(packet, interface):
else:
hop_count = hop_away
if hop == "" and hop_count > 0:
if hop_count > 0:
# set hop string from calculated hop count
hop = f"{hop_count} Hop" if hop_count == 1 else f"{hop_count} Hops"
if hop_start == hop_limit and "lora" in str(transport_mechanism).lower() and (snr != 0 or rssi != 0):
if hop_start == hop_limit and "lora" in str(transport_mechanism).lower() and (snr != 0 or rssi != 0) and hop_count == 0:
# 2.7+ firmware direct hop over LoRa
hop = "Direct"
if ((hop_start == 0 and hop_limit >= 0) or via_mqtt or ("mqtt" in str(transport_mechanism).lower())):
if via_mqtt or "mqtt" in str(transport_mechanism).lower():
hop = "MQTT"
elif hop == "" and hop_count == 0 and (snr != 0 or rssi != 0):
# this came from a UDP but we had signal info so gateway is used
hop = "Gateway"
elif "unknown" in str(transport_mechanism).lower() and (snr == 0 and rssi == 0):
# we for sure detected this sourced from a UDP like host
via_mqtt = True
elif "udp" in str(transport_mechanism).lower():
hop = "Gateway"
if hop in ("MQTT", "Gateway") and hop_count > 0:
hop = f"{hop_count} Hops"
hop = f" {hop_count} Hops"
# Add relay node info if present
if packet.get('relayNode') is not None:
hop += f" (Relay:{packet['relayNode']})"
if enableHopLogs:
logger.debug(f"System: Packet HopDebugger: hop_away:{hop_away} hop_limit:{hop_limit} hop_start:{hop_start} calculated_hop_count:{hop_count} final_hop_value:{hop} via_mqtt:{via_mqtt} transport_mechanism:{transport_mechanism} Hostname:{rxNodeHostName}")
+2 -2
View File
@@ -1004,7 +1004,7 @@ def stringSafeCheck(s, fromID=0):
if len(s) > 1000:
return False
# Check for single-character injections
single_injection_chars = [';', '|', '}', '>', ')']
single_injection_chars = [';', '|', '}', '>']
if any(c in s for c in single_injection_chars):
return False # injection character found
# Check for multi-character patterns
@@ -1348,7 +1348,7 @@ def handleAlertBroadcast(deviceID=1):
def onDisconnect(interface):
# Handle disconnection of the interface
logger.warning(f"System: Abrupt Disconnection of Interface detected")
logger.warning(f"System: Abrupt Disconnection of Interface detected, attempting reconnect...")
interface.close()
# Telemetry Functions
+12 -9
View File
@@ -107,6 +107,8 @@ def handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, chann
if (float(snr) != 0 or float(rssi) != 0) and "Hops" not in hop:
msg += f"\nSNR:{snr} RSSI:{rssi}"
elif "Hops" in hop:
# janky, remove the words Gateway or MQTT if present
hop = hop.replace("Gateway", "").replace("Direct", "").replace("MQTT", "").strip()
msg += f"\n{hop}🐇 "
else:
msg += "\nflood route"
@@ -384,25 +386,26 @@ def onReceive(packet, interface):
else:
hop_count = hop_away
if hop == "" and hop_count > 0:
if hop_count > 0:
# set hop string from calculated hop count
hop = f"{hop_count} Hop" if hop_count == 1 else f"{hop_count} Hops"
if hop_start == hop_limit and "lora" in str(transport_mechanism).lower() and (snr != 0 or rssi != 0):
if hop_start == hop_limit and "lora" in str(transport_mechanism).lower() and (snr != 0 or rssi != 0) and hop_count == 0:
# 2.7+ firmware direct hop over LoRa
hop = "Direct"
if ((hop_start == 0 and hop_limit >= 0) or via_mqtt or ("mqtt" in str(transport_mechanism).lower())):
if via_mqtt or "mqtt" in str(transport_mechanism).lower():
hop = "MQTT"
elif hop == "" and hop_count == 0 and (snr != 0 or rssi != 0):
# this came from a UDP but we had signal info so gateway is used
hop = "Gateway"
elif "unknown" in str(transport_mechanism).lower() and (snr == 0 and rssi == 0):
# we for sure detected this sourced from a UDP like host
via_mqtt = True
elif "udp" in str(transport_mechanism).lower():
hop = "Gateway"
if hop in ("MQTT", "Gateway") and hop_count > 0:
hop = f"{hop_count} Hops"
hop = f" {hop_count} Hops"
# Add relay node info if present
if packet.get('relayNode') is not None:
hop += f" (Relay:{packet['relayNode']})"
if my_settings.enableHopLogs:
logger.debug(f"System: Packet HopDebugger: hop_away:{hop_away} hop_limit:{hop_limit} hop_start:{hop_start} calculated_hop_count:{hop_count} final_hop_value:{hop} via_mqtt:{via_mqtt} transport_mechanism:{transport_mechanism} Hostname:{rxNodeHostName}")