diff --git a/etc/icad_tone.py b/etc/icad_tone.py index f7d6515..8c03ee1 100644 --- a/etc/icad_tone.py +++ b/etc/icad_tone.py @@ -5,47 +5,46 @@ # output to alert.txt for meshing-around bot # 2025 K7MHI Kelly Keeton -ALERT_FILE_PATH = "alert.txt" -AUDIO_SOURCE = "http" # "http" for stream, "soundcard" for microphone or line-in -HTTP_STREAM_URL = "" # Set to your stream URL, e.g., "http://your-stream-url-here/stream.mp3" -SAMPLE_RATE = 44100 -INPUT_CHANNELS = 1 -CHUNK_DURATION = 2 # seconds +# --------------------------- +# User Configuration Section +# --------------------------- +ALERT_FILE_PATH = "alert.txt" # Path to alert log file, or None to disable logging +AUDIO_SOURCE = "soundcard" # "soundcard" for mic/line-in, "http" for stream +HTTP_STREAM_URL = "" # Set to your stream URL if using "http" +SAMPLE_RATE = 16000 # Audio sample rate (Hz) +INPUT_CHANNELS = 1 # Number of input channels (1=mono) +MIN_SAMPLES = 4096 # Minimum samples per detection window (increase for better accuracy) +# --------------------------- -try: - import sys - import os - import time - from icad_tone_detection import tone_detect - import io - from pydub import AudioSegment - import requests - import sounddevice as sd - import numpy as np - import argparse -except ImportError as e: - print(f"Missing required module: {e.name}. Please review the comments in program, and try again.", file=sys.stderr) - sys.exit(1) +import sys +import time +from icad_tone_detection import tone_detect +from pydub import AudioSegment +import requests +import sounddevice as sd +import numpy as np +import argparse +import io def write_alert(message): if ALERT_FILE_PATH: try: - with open(ALERT_FILE_PATH, "w") as f: # overwrite existing file for each alert + with open(ALERT_FILE_PATH, "w") as f: # overwrite each time f.write(message + "\n") except Exception as e: print(f"Error writing to alert file: {e}", file=sys.stderr) def detect_and_alert(audio_data, sample_rate): - result = tone_detect(audio_data, sample_rate) - print("Raw detection result:", result) # Debugging line - if result: - # Print all detected tone types for debugging - for tone_type in ["two_tone_result", "long_result", "hi_low_result", "pulsed_result", "mdc_result", "dtmf_result"]: - tone_list = getattr(result, tone_type, []) - if tone_list: - print(f"{tone_type}:") - for tone in tone_list: - print(tone) + try: + result = tone_detect(audio_data, sample_rate) + except Exception as e: + print(f"Detection error: {e}", file=sys.stderr) + return + # Only print if something is detected + if result and any(getattr(result, t, []) for t in [ + "two_tone_result", "long_result", "hi_low_result", "pulsed_result", "mdc_result", "dtmf_result" + ]): + print("Raw detection result:", result) # Prepare alert summary for all relevant tone types summary = [] if hasattr(result, "dtmf_result") and result.dtmf_result: @@ -78,8 +77,6 @@ def detect_and_alert(audio_data, sample_rate): ) if summary: write_alert("\n".join(summary)) - else: - print("No tone detected.") def main(): parser = argparse.ArgumentParser(description="ICAD Tone Detection") @@ -90,7 +87,6 @@ def main(): print(f"Processing WAV file: {args.wav}") try: audio = AudioSegment.from_file(args.wav) - # Convert to mono if necessary if audio.channels > 1: audio = audio.set_channels(1) print(f"AudioSegment: channels={audio.channels}, frame_rate={audio.frame_rate}, duration={len(audio)}ms") @@ -114,8 +110,9 @@ def main(): if buffer.tell() > SAMPLE_RATE * CHUNK_DURATION * 2: buffer.seek(0) audio = AudioSegment.from_file(buffer, format="mp3") - samples = np.array(audio.get_array_of_samples()) - detect_and_alert(samples, audio.frame_rate) + if audio.channels > 1: + audio = audio.set_channels(1) + detect_and_alert(audio, audio.frame_rate) buffer = io.BytesIO() except requests.exceptions.RequestException as e: print(f"Connection error: {e}", file=sys.stderr) @@ -125,13 +122,34 @@ def main(): sys.exit(4) elif AUDIO_SOURCE == "soundcard": print("Listening to audio device:") - def callback(indata, frames, time, status): - samples = indata[:, 0] - detect_and_alert(samples, SAMPLE_RATE) + buffer = np.array([], dtype=np.float32) + min_samples = MIN_SAMPLES # Use configured minimum samples + + def callback(indata, frames, time_info, status): + nonlocal buffer + try: + samples = indata[:, 0] + buffer = np.concatenate((buffer, samples)) + # Only process when buffer is large enough + while buffer.size >= min_samples: + int_samples = np.int16(buffer[:min_samples] * 32767) + audio = AudioSegment( + data=int_samples.tobytes(), + sample_width=2, + frame_rate=SAMPLE_RATE, + channels=1 + ) + detect_and_alert(audio, SAMPLE_RATE) + buffer = buffer[min_samples:] # keep remainder for next window + except Exception as e: + print(f"Callback error: {e}", file=sys.stderr) try: - with sd.InputStream(samplerate=SAMPLE_RATE, channels=INPUT_CHANNELS, callback=callback): - while True: - time.sleep(CHUNK_DURATION) + with sd.InputStream(samplerate=SAMPLE_RATE, channels=INPUT_CHANNELS, dtype='float32', callback=callback): + print("Press Ctrl+C to stop.") + import signal + signal.pause() # Wait for Ctrl+C, keeps CPU usage minimal + except KeyboardInterrupt: + print("Stopped by user.") except Exception as e: print(f"Error accessing soundcard: {e}", file=sys.stderr) sys.exit(5)