mirror of
https://github.com/SpudGunMan/meshing-around.git
synced 2026-03-28 17:32:36 +01:00
@@ -5,35 +5,29 @@
|
||||
# output to alert.txt for meshing-around bot
|
||||
# 2025 K7MHI Kelly Keeton
|
||||
|
||||
import sys
|
||||
import time
|
||||
import sounddevice as sd
|
||||
from icad_tone_detection import tone_detect
|
||||
from pydub import AudioSegment
|
||||
import requests
|
||||
import numpy as np
|
||||
import argparse
|
||||
import io
|
||||
|
||||
# ---------------------------
|
||||
# User Configuration Section
|
||||
# ---------------------------
|
||||
ALERT_FILE_PATH = "alert.txt" # Path to alert log file, or None to disable logging
|
||||
AUDIO_SOURCE = "soundcard" # "soundcard" for mic/line-in, "http" for stream
|
||||
HTTP_STREAM_URL = "" # Set to your stream URL if using "http"
|
||||
SAMPLE_RATE = 16000 # Audio sample rate (Hz)
|
||||
INPUT_CHANNELS = 1 # Number of input channels (1=mono)
|
||||
MIN_SAMPLES = 4096 # Minimum samples per detection window (increase for better accuracy)
|
||||
STREAM_BUFFER = 32000 # Number of bytes to buffer before detection (for MP3 streams)
|
||||
INPUT_DEVICE = 0 # Set to device index or name, or None for default
|
||||
|
||||
# Automatically set SAMPLE_RATE to the device's default sample rate
|
||||
try:
|
||||
device_info = sd.query_devices(INPUT_DEVICE, kind='input')
|
||||
SAMPLE_RATE = int(device_info['default_samplerate'])
|
||||
except Exception as e:
|
||||
SAMPLE_RATE = 44100 # fallback to a common supported rate
|
||||
INPUT_DEVICE = 0 # Set to device index or name, or None for default
|
||||
# ---------------------------
|
||||
|
||||
import sys
|
||||
import time
|
||||
from icad_tone_detection import tone_detect
|
||||
from pydub import AudioSegment
|
||||
import requests
|
||||
import sounddevice as sd
|
||||
import numpy as np
|
||||
import argparse
|
||||
import io
|
||||
|
||||
def write_alert(message):
|
||||
if ALERT_FILE_PATH:
|
||||
try:
|
||||
@@ -164,28 +158,25 @@ def main():
|
||||
print(f"Error processing HTTP stream: {e}", file=sys.stderr)
|
||||
sys.exit(4)
|
||||
elif AUDIO_SOURCE == "soundcard":
|
||||
# print("Available audio devices:")
|
||||
# for i, dev in enumerate(sd.query_devices()):
|
||||
# print(f"{i}: {dev['name']} ({dev['max_input_channels']} in, {dev['max_output_channels']} out)")
|
||||
print("Listening to audio device:")
|
||||
buffer = np.array([], dtype=np.float32)
|
||||
# Set a larger chunk size for detection, e.g. 8192
|
||||
DETECTION_CHUNK = 8192
|
||||
min_samples = MIN_SAMPLES # Use configured minimum samples
|
||||
|
||||
def callback(indata, frames, time_info, status):
|
||||
nonlocal buffer
|
||||
try:
|
||||
samples = indata[:, 0]
|
||||
buffer = np.concatenate((buffer, samples))
|
||||
# --- Simple audio level detection ---
|
||||
rms = np.sqrt(np.mean(samples**2))
|
||||
if rms > 0.01:
|
||||
print(f"Audio detected! RMS: {rms:.3f} ", end='\r')
|
||||
if rms > 0.5:
|
||||
print(f"WARNING: Audio too loud! RMS: {rms:.3f} ", end='\r')
|
||||
# --- End audio level detection ---
|
||||
# Only process when buffer is large enough
|
||||
while buffer.size >= DETECTION_CHUNK:
|
||||
chunk = buffer[:DETECTION_CHUNK]
|
||||
int_samples = np.int16(chunk * 32767)
|
||||
while buffer.size >= min_samples:
|
||||
int_samples = np.int16(buffer[:min_samples] * 32767)
|
||||
audio = AudioSegment(
|
||||
data=int_samples.tobytes(),
|
||||
sample_width=2,
|
||||
@@ -193,17 +184,11 @@ def main():
|
||||
channels=1
|
||||
)
|
||||
detect_and_alert(audio, SAMPLE_RATE)
|
||||
buffer = buffer[DETECTION_CHUNK:] # keep remainder for next window
|
||||
buffer = buffer[min_samples:] # keep remainder for next window
|
||||
except Exception as e:
|
||||
print(f"Callback error: {e}", file=sys.stderr)
|
||||
try:
|
||||
with sd.InputStream(
|
||||
samplerate=SAMPLE_RATE,
|
||||
channels=INPUT_CHANNELS,
|
||||
dtype='float32',
|
||||
callback=callback,
|
||||
device=INPUT_DEVICE
|
||||
):
|
||||
with sd.InputStream(samplerate=SAMPLE_RATE, channels=INPUT_CHANNELS, dtype='float32', callback=callback):
|
||||
print("Press Ctrl+C to stop.")
|
||||
import signal
|
||||
signal.pause() # Wait for Ctrl+C, keeps CPU usage minimal
|
||||
@@ -218,4 +203,3 @@ def main():
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user