diff --git a/scripts/watchdog/watchdog.py b/scripts/watchdog/watchdog.py index 9a09caf..8947df9 100755 --- a/scripts/watchdog/watchdog.py +++ b/scripts/watchdog/watchdog.py @@ -63,8 +63,8 @@ def log(message: str, level: str = 'INFO'): # USB Device Reset Constant USBDEVFS_RESET = 21780 # 0x5514 -def auto_detect_usb_device() -> str: - """Attempt to auto-detect the physical USB device path (e.g., /dev/bus/usb/001/002) for LoRa.""" +def auto_detect_serial_port() -> str: + """Detect the serial port (e.g., /dev/ttyACM0) from environment or by-id.""" env_file = os.path.join(MCWEBUI_DIR, '.env') serial_port = 'auto' @@ -85,7 +85,7 @@ def auto_detect_usb_device() -> str: if len(devices) == 1: serial_port = str(devices[0]) elif len(devices) > 1: - log("Multiple serial devices found, cannot auto-detect USB device for reset", "WARN") + log("Multiple serial devices found, cannot auto-detect single port", "WARN") return None else: log("No serial devices found in /dev/serial/by-id", "WARN") @@ -97,12 +97,22 @@ def auto_detect_usb_device() -> str: if not serial_port or not os.path.exists(serial_port): log(f"Serial port {serial_port} not found", "WARN") return None + + try: + real_tty = os.path.realpath(serial_port) + return real_tty + except Exception as e: + log(f"Error resolving serial port {serial_port}: {e}", "ERROR") + return None + +def auto_detect_usb_device() -> str: + """Attempt to auto-detect the physical USB device path (e.g., /dev/bus/usb/001/002) for LoRa.""" + real_tty = auto_detect_serial_port() + if not real_tty: + return None try: - # Resolve symlink to get actual tty device (e.g., /dev/ttyACM0) - real_tty = os.path.realpath(serial_port) tty_name = os.path.basename(real_tty) - # Find USB bus and dev number via sysfs sysfs_path = f"/sys/class/tty/{tty_name}/device" if not os.path.exists(sysfs_path): @@ -126,6 +136,42 @@ def auto_detect_usb_device() -> str: log(f"Error during USB device auto-detection: {e}", "ERROR") return None +def reset_esp32_device(): + """Perform a hardware reset on ESP32/LoRa device using DTR/RTS lines via ioctl.""" + serial_port = auto_detect_serial_port() + if not serial_port: + log("Cannot perform ESP32 reset: serial port could not be determined", "WARN") + return False + + log(f"Performing ESP32 hard reset via DTR/RTS on {serial_port}", "WARN") + try: + import struct + import termios + + fd = os.open(serial_port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) + + TIOCM_DTR = 0x002 + TIOCM_RTS = 0x004 + TIOCMBIS = getattr(termios, 'TIOCMBIS', 0x5416) + TIOCMBIC = getattr(termios, 'TIOCMBIC', 0x5417) + + # Reset sequence used by esptool + # DTR=False (Clear), RTS=True (Set) -> EN=Low + fcntl.ioctl(fd, TIOCMBIC, struct.pack('i', TIOCM_DTR)) + fcntl.ioctl(fd, TIOCMBIS, struct.pack('i', TIOCM_RTS)) + time.sleep(0.1) + + # DTR=False (Clear), RTS=False (Clear) -> EN=High + fcntl.ioctl(fd, TIOCMBIC, struct.pack('i', TIOCM_DTR | TIOCM_RTS)) + time.sleep(0.05) + + os.close(fd) + log("ESP32 DTR/RTS reset sent successfully", "INFO") + return True + except Exception as e: + log(f"ESP32 DTR/RTS reset failed: {e}", "ERROR") + return False + def reset_usb_device(): """Perform a hardware USB bus reset on the LoRa device.""" device_path = os.environ.get('USB_DEVICE_PATH') @@ -326,6 +372,7 @@ def handle_unhealthy_container(container_name: str, status: dict): log(f"{container_name} has been restarted {recent_restarts} times in the last 8 minutes. Attempting hardware USB reset.", "WARN") # Stop the container first so it releases the serial port run_compose_command(['stop', container_name]) + reset_esp32_device() if reset_usb_device(): time.sleep(5) # Give OS time to re-enumerate the device restart_success = start_container(container_name) @@ -402,6 +449,7 @@ def handle_unresponsive_device(container_name: str, status: dict): log(f"{container_name} has been restarted {recent_restarts} times in the last 8 minutes. Attempting hardware USB reset.", "WARN") # Stop the container first so it releases the serial port run_compose_command(['stop', container_name]) + reset_esp32_device() if reset_usb_device(): time.sleep(5) # Give OS time to re-enumerate the device restart_success = start_container(container_name)