Update icad_tone.py

This commit is contained in:
SpudGunMan
2025-11-12 12:10:18 -08:00
parent 0aa8bccd04
commit 665acaa904

View File

@@ -5,47 +5,46 @@
# output to alert.txt for meshing-around bot
# 2025 K7MHI Kelly Keeton
ALERT_FILE_PATH = "alert.txt"
AUDIO_SOURCE = "http" # "http" for stream, "soundcard" for microphone or line-in
HTTP_STREAM_URL = "" # Set to your stream URL, e.g., "http://your-stream-url-here/stream.mp3"
SAMPLE_RATE = 44100
INPUT_CHANNELS = 1
CHUNK_DURATION = 2 # seconds
# ---------------------------
# User Configuration Section
# ---------------------------
ALERT_FILE_PATH = "alert.txt" # Path to alert log file, or None to disable logging
AUDIO_SOURCE = "soundcard" # "soundcard" for mic/line-in, "http" for stream
HTTP_STREAM_URL = "" # Set to your stream URL if using "http"
SAMPLE_RATE = 16000 # Audio sample rate (Hz)
INPUT_CHANNELS = 1 # Number of input channels (1=mono)
MIN_SAMPLES = 4096 # Minimum samples per detection window (increase for better accuracy)
# ---------------------------
try:
import sys
import os
import time
from icad_tone_detection import tone_detect
import io
from pydub import AudioSegment
import requests
import sounddevice as sd
import numpy as np
import argparse
except ImportError as e:
print(f"Missing required module: {e.name}. Please review the comments in program, and try again.", file=sys.stderr)
sys.exit(1)
import sys
import time
from icad_tone_detection import tone_detect
from pydub import AudioSegment
import requests
import sounddevice as sd
import numpy as np
import argparse
import io
def write_alert(message):
if ALERT_FILE_PATH:
try:
with open(ALERT_FILE_PATH, "w") as f: # overwrite existing file for each alert
with open(ALERT_FILE_PATH, "w") as f: # overwrite each time
f.write(message + "\n")
except Exception as e:
print(f"Error writing to alert file: {e}", file=sys.stderr)
def detect_and_alert(audio_data, sample_rate):
result = tone_detect(audio_data, sample_rate)
print("Raw detection result:", result) # Debugging line
if result:
# Print all detected tone types for debugging
for tone_type in ["two_tone_result", "long_result", "hi_low_result", "pulsed_result", "mdc_result", "dtmf_result"]:
tone_list = getattr(result, tone_type, [])
if tone_list:
print(f"{tone_type}:")
for tone in tone_list:
print(tone)
try:
result = tone_detect(audio_data, sample_rate)
except Exception as e:
print(f"Detection error: {e}", file=sys.stderr)
return
# Only print if something is detected
if result and any(getattr(result, t, []) for t in [
"two_tone_result", "long_result", "hi_low_result", "pulsed_result", "mdc_result", "dtmf_result"
]):
print("Raw detection result:", result)
# Prepare alert summary for all relevant tone types
summary = []
if hasattr(result, "dtmf_result") and result.dtmf_result:
@@ -78,8 +77,6 @@ def detect_and_alert(audio_data, sample_rate):
)
if summary:
write_alert("\n".join(summary))
else:
print("No tone detected.")
def main():
parser = argparse.ArgumentParser(description="ICAD Tone Detection")
@@ -90,7 +87,6 @@ def main():
print(f"Processing WAV file: {args.wav}")
try:
audio = AudioSegment.from_file(args.wav)
# Convert to mono if necessary
if audio.channels > 1:
audio = audio.set_channels(1)
print(f"AudioSegment: channels={audio.channels}, frame_rate={audio.frame_rate}, duration={len(audio)}ms")
@@ -114,8 +110,9 @@ def main():
if buffer.tell() > SAMPLE_RATE * CHUNK_DURATION * 2:
buffer.seek(0)
audio = AudioSegment.from_file(buffer, format="mp3")
samples = np.array(audio.get_array_of_samples())
detect_and_alert(samples, audio.frame_rate)
if audio.channels > 1:
audio = audio.set_channels(1)
detect_and_alert(audio, audio.frame_rate)
buffer = io.BytesIO()
except requests.exceptions.RequestException as e:
print(f"Connection error: {e}", file=sys.stderr)
@@ -125,13 +122,34 @@ def main():
sys.exit(4)
elif AUDIO_SOURCE == "soundcard":
print("Listening to audio device:")
def callback(indata, frames, time, status):
samples = indata[:, 0]
detect_and_alert(samples, SAMPLE_RATE)
buffer = np.array([], dtype=np.float32)
min_samples = MIN_SAMPLES # Use configured minimum samples
def callback(indata, frames, time_info, status):
nonlocal buffer
try:
samples = indata[:, 0]
buffer = np.concatenate((buffer, samples))
# Only process when buffer is large enough
while buffer.size >= min_samples:
int_samples = np.int16(buffer[:min_samples] * 32767)
audio = AudioSegment(
data=int_samples.tobytes(),
sample_width=2,
frame_rate=SAMPLE_RATE,
channels=1
)
detect_and_alert(audio, SAMPLE_RATE)
buffer = buffer[min_samples:] # keep remainder for next window
except Exception as e:
print(f"Callback error: {e}", file=sys.stderr)
try:
with sd.InputStream(samplerate=SAMPLE_RATE, channels=INPUT_CHANNELS, callback=callback):
while True:
time.sleep(CHUNK_DURATION)
with sd.InputStream(samplerate=SAMPLE_RATE, channels=INPUT_CHANNELS, dtype='float32', callback=callback):
print("Press Ctrl+C to stop.")
import signal
signal.pause() # Wait for Ctrl+C, keeps CPU usage minimal
except KeyboardInterrupt:
print("Stopped by user.")
except Exception as e:
print(f"Error accessing soundcard: {e}", file=sys.stderr)
sys.exit(5)