From e3e6393bad224741f34e0f5e22cc2cee2429c5e3 Mon Sep 17 00:00:00 2001 From: SpudGunMan Date: Wed, 12 Nov 2025 11:50:40 -0800 Subject: [PATCH] icad_tone_alerts icad_tone_alerts --- etc/README.md | 33 +++++++++++ etc/icad_tone.py | 144 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+) create mode 100644 etc/icad_tone.py diff --git a/etc/README.md b/etc/README.md index 6f749fb..17d7314 100644 --- a/etc/README.md +++ b/etc/README.md @@ -97,3 +97,36 @@ Run this script to monitor the camera feed and generate alerts for detected and --- +## icad_tone.py + +**Purpose:** +`icad_tone.py` is a utility script for detecting fire and EMS radio tones using the [icad_tone_detection](https://github.com/thegreatcodeholio/icad_tone_detection) library. It analyzes audio from a live stream, soundcard, or WAV file, identifies various tone types (such as two-tone, long tone, hi/low, pulsed, MDC, and DTMF), and writes detected alerts to `alert.txt` for integration with Mesh Bot or Meshtastic. + +**Usage:** +Run the script from the command line, specifying a WAV file for offline analysis or configuring it to listen to a stream or soundcard for real-time monitoring. + +```sh +python etc/icad_tone.py --wav path/to/file.wav +``` +Or, for live monitoring (after setting `HTTP_STREAM_URL` in the script): +```sh +python etc/icad_tone.py +``` + +**What it does:** +- Loads audio from a stream, soundcard, or WAV file. +- Uses `icad_tone_detection` to analyze audio for tone patterns. +- Prints raw detection results and summaries to the console. +- Writes a summary of detected tones to `alert.txt` (overwriting each time). +- Handles errors and missing dependencies gracefully. + +**Configuration:** +- `ALERT_FILE_PATH`: Path to the alert output file (default: `alert.txt`). +- `AUDIO_SOURCE`: Set to `"http"` for streaming or `"soundcard"` for local audio input. +- `HTTP_STREAM_URL`: URL of the audio stream (required if using HTTP source). +- `SAMPLE_RATE`, `INPUT_CHANNELS`, `CHUNK_DURATION`: Audio processing parameters. + +**Note:** +- Requires installation of dependencies (`icad_tone_detection`) +- Set `HTTP_STREAM_URL` to a valid stream if using HTTP mode. +- Intended for experimental or hobbyist use; may require customization for your workflow. \ No newline at end of file diff --git a/etc/icad_tone.py b/etc/icad_tone.py new file mode 100644 index 0000000..f7d6515 --- /dev/null +++ b/etc/icad_tone.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# icad_tone.py - uses icad_tone_detection, for fire and EMS tone detection +# https://github.com/thegreatcodeholio/icad_tone_detection +# output to alert.txt for meshing-around bot +# 2025 K7MHI Kelly Keeton + +ALERT_FILE_PATH = "alert.txt" +AUDIO_SOURCE = "http" # "http" for stream, "soundcard" for microphone or line-in +HTTP_STREAM_URL = "" # Set to your stream URL, e.g., "http://your-stream-url-here/stream.mp3" +SAMPLE_RATE = 44100 +INPUT_CHANNELS = 1 +CHUNK_DURATION = 2 # seconds + +try: + import sys + import os + import time + from icad_tone_detection import tone_detect + import io + from pydub import AudioSegment + import requests + import sounddevice as sd + import numpy as np + import argparse +except ImportError as e: + print(f"Missing required module: {e.name}. Please review the comments in program, and try again.", file=sys.stderr) + sys.exit(1) + +def write_alert(message): + if ALERT_FILE_PATH: + try: + with open(ALERT_FILE_PATH, "w") as f: # overwrite existing file for each alert + f.write(message + "\n") + except Exception as e: + print(f"Error writing to alert file: {e}", file=sys.stderr) + +def detect_and_alert(audio_data, sample_rate): + result = tone_detect(audio_data, sample_rate) + print("Raw detection result:", result) # Debugging line + if result: + # Print all detected tone types for debugging + for tone_type in ["two_tone_result", "long_result", "hi_low_result", "pulsed_result", "mdc_result", "dtmf_result"]: + tone_list = getattr(result, tone_type, []) + if tone_list: + print(f"{tone_type}:") + for tone in tone_list: + print(tone) + # Prepare alert summary for all relevant tone types + summary = [] + if hasattr(result, "dtmf_result") and result.dtmf_result: + for dtmf in result.dtmf_result: + summary.append(f"DTMF Digit: {dtmf.get('digit', '?')} | Duration: {dtmf.get('length', '?')}s") + if hasattr(result, "hi_low_result") and result.hi_low_result: + for hl in result.hi_low_result: + summary.append( + f"Hi/Low Alternations: {hl.get('alternations', '?')} | Duration: {hl.get('length', '?')}s" + ) + if hasattr(result, "mdc_result") and result.mdc_result: + for mdc in result.mdc_result: + summary.append( + f"MDC UnitID: {mdc.get('unitID', '?')} | Op: {mdc.get('op', '?')} | Duration: {mdc.get('length', '?')}s" + ) + if hasattr(result, "pulsed_result") and result.pulsed_result: + for pl in result.pulsed_result: + summary.append( + f"Pulsed Tone: {pl.get('detected', '?')}Hz | Cycles: {pl.get('cycles', '?')} | Duration: {pl.get('length', '?')}s" + ) + if hasattr(result, "two_tone_result") and result.two_tone_result: + for tt in result.two_tone_result: + summary.append( + f"Two-Tone: {tt.get('detected', ['?','?'])[0]}Hz/{tt.get('detected', ['?','?'])[1]}Hz | Tone A: {tt.get('tone_a_length', '?')}s | Tone B: {tt.get('tone_b_length', '?')}s" + ) + if hasattr(result, "long_result") and result.long_result: + for lt in result.long_result: + summary.append( + f"Long Tone: {lt.get('detected', '?')}Hz | Duration: {lt.get('length', '?')}s" + ) + if summary: + write_alert("\n".join(summary)) + else: + print("No tone detected.") + +def main(): + parser = argparse.ArgumentParser(description="ICAD Tone Detection") + parser.add_argument("--wav", type=str, help="Path to WAV file for detection") + args = parser.parse_args() + + if args.wav: + print(f"Processing WAV file: {args.wav}") + try: + audio = AudioSegment.from_file(args.wav) + # Convert to mono if necessary + if audio.channels > 1: + audio = audio.set_channels(1) + print(f"AudioSegment: channels={audio.channels}, frame_rate={audio.frame_rate}, duration={len(audio)}ms") + detect_and_alert(audio, audio.frame_rate) + except Exception as e: + print(f"Error processing WAV file: {e}", file=sys.stderr) + return + + print("Starting ICAD Tone Detection...") + + if AUDIO_SOURCE == "http": + if not HTTP_STREAM_URL or HTTP_STREAM_URL.startswith("http://your-stream-url-here"): + print("ERROR: Please set a valid HTTP_STREAM_URL or provide a WAV file using --wav option.", file=sys.stderr) + sys.exit(2) + print(f"Listening to HTTP stream: {HTTP_STREAM_URL}") + try: + response = requests.get(HTTP_STREAM_URL, stream=True, timeout=10) + buffer = io.BytesIO() + for chunk in response.iter_content(chunk_size=4096): + buffer.write(chunk) + if buffer.tell() > SAMPLE_RATE * CHUNK_DURATION * 2: + buffer.seek(0) + audio = AudioSegment.from_file(buffer, format="mp3") + samples = np.array(audio.get_array_of_samples()) + detect_and_alert(samples, audio.frame_rate) + buffer = io.BytesIO() + except requests.exceptions.RequestException as e: + print(f"Connection error: {e}", file=sys.stderr) + sys.exit(3) + except Exception as e: + print(f"Error processing HTTP stream: {e}", file=sys.stderr) + sys.exit(4) + elif AUDIO_SOURCE == "soundcard": + print("Listening to audio device:") + def callback(indata, frames, time, status): + samples = indata[:, 0] + detect_and_alert(samples, SAMPLE_RATE) + try: + with sd.InputStream(samplerate=SAMPLE_RATE, channels=INPUT_CHANNELS, callback=callback): + while True: + time.sleep(CHUNK_DURATION) + except Exception as e: + print(f"Error accessing soundcard: {e}", file=sys.stderr) + sys.exit(5) + else: + print("Unknown AUDIO_SOURCE. Set to 'http' or 'soundcard'.", file=sys.stderr) + sys.exit(6) + +if __name__ == "__main__": + main() +