#!/usr/bin/env python3 """ llama.cpp Watchdog Service Monitors llama-server health and restarts on failure. Detects: - Router health endpoint failures - Zombie child model-server processes - Loaded models that are unreachable through the router Per-model tracking: - Individual model failures never trigger a full service restart - Newly loaded models get a grace period before probing - Persistently failing models are unloaded and put in cooldown instead of causing restarts - Only router-level health failures trigger service restarts """ import subprocess import requests import time import signal from datetime import datetime from collections import defaultdict # Configuration SERVERS = [ {"name": "llama-main", "port": 11434, "service": "llama-cpp"}, {"name": "llama-alt", "port": 8082, "service": None}, ] LLAMA_SERVER_BIN = "/home/aj/llama.cpp/build/bin/llama-server" MODELS_DIR = "/home/aj/models" MODELS_PRESET = "/home/aj/models/models.ini" CHECK_INTERVAL = 30 # seconds between checks HEALTH_TIMEOUT = 10 # router health check timeout DEEP_CHECK_TIMEOUT = 30 # model probe timeout MAX_HEALTH_FAILURES = 2 # restart after N router health failures MAX_MODEL_PROBE_FAILURES = 5 # ignore model after N probe failures MODEL_LOAD_GRACE_PERIOD = 300 # skip probing models loaded within last 5 min MODEL_FAILURE_COOLDOWN = 600 # stop probing a failed model for 10 min # Per-server health failure tracking (router-level) health_failures = {} # Per-model failure tracking: {port: {model_name: count}} model_failures = defaultdict(lambda: defaultdict(int)) # Per-model first-seen timestamps: {port: {model_name: timestamp}} model_first_seen = defaultdict(dict) # Per-model cooldown timestamps: {port: {model_name: timestamp}} model_cooldowns = defaultdict(dict) def log(message): """Log with timestamp.""" print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {message}", flush=True) def check_health(port): """Check if the router process is responding.""" try: response = requests.get(f"http://localhost:{port}/health", timeout=HEALTH_TIMEOUT) return response.status_code == 200 except requests.exceptions.RequestException: return False def get_loaded_models(port): """Get list of models the router reports as loaded, with status info. Returns list of dicts: [{"id": "model-name", "status": "loaded"}, ...] """ try: response = requests.get(f"http://localhost:{port}/v1/models", timeout=HEALTH_TIMEOUT) if response.status_code != 200: return [] data = response.json() models = [] for m in data.get("data", []): status_value = m.get("status", {}).get("value", "unknown") models.append({"id": m["id"], "status": status_value}) return models except Exception: return [] def probe_model(port, model_name): """Send a minimal completions request to verify a loaded model actually works.""" try: response = requests.post( f"http://localhost:{port}/v1/chat/completions", json={ "model": model_name, "messages": [{"role": "user", "content": "hi"}], "max_tokens": 1, }, timeout=DEEP_CHECK_TIMEOUT, ) return response.status_code == 200 except requests.exceptions.RequestException: return False def unload_model(port, model_name): """Ask the router to unload a specific model.""" try: response = requests.post( f"http://localhost:{port}/models/unload", json={"model": model_name}, timeout=HEALTH_TIMEOUT, ) return response.status_code == 200 except requests.exceptions.RequestException: return False def check_zombies(): """Check for zombie llama-server processes.""" result = subprocess.run(["ps", "aux"], capture_output=True, text=True) zombies = [] for line in result.stdout.split("\n"): if "llama-server" in line and "" in line: parts = line.split() if len(parts) >= 2: zombies.append(parts[1]) return zombies def restart_via_systemd(service_name): """Restart a server using systemd.""" log(f"Restarting {service_name} via systemd...") result = subprocess.run( ["sudo", "systemctl", "restart", service_name], capture_output=True, text=True, ) if result.returncode == 0: log(f"{service_name} restart command succeeded") else: log(f"{service_name} restart failed: {result.stderr.strip()}") # Give it time to come up time.sleep(5) def restart_manual(server): """Restart a server that has no systemd service by killing and re-launching.""" port = server["port"] host = server["host"] if "host" in server else "0.0.0.0" name = server["name"] log(f"Restarting {name} (manual, port {port})...") # Kill existing subprocess.run(["pkill", "-9", "-f", f"llama-server.*--port.*{port}"], capture_output=True) time.sleep(2) cmd = [ LLAMA_SERVER_BIN, "--host", host, "--port", str(port), "--models-dir", MODELS_DIR, "--models-preset", MODELS_PRESET, ] subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True) time.sleep(5) if check_health(port): log(f"{name} started successfully") else: log(f"{name} failed to start") def restart_server(server): """Restart a server using the appropriate method.""" service = server.get("service") if service: restart_via_systemd(service) else: restart_manual(server) def clear_model_tracking(port): """Clear all per-model tracking state for a server after restart.""" model_failures[port].clear() model_first_seen[port].clear() model_cooldowns[port].clear() def run_watchdog(): """Main watchdog loop.""" log("llama.cpp watchdog starting...") log(f"Config: health_failures_threshold={MAX_HEALTH_FAILURES}, " f"model_probe_failures_threshold={MAX_MODEL_PROBE_FAILURES}, " f"grace_period={MODEL_LOAD_GRACE_PERIOD}s, " f"cooldown={MODEL_FAILURE_COOLDOWN}s") for server in SERVERS: health_failures[server["port"]] = 0 while True: try: now = time.time() # --- Phase 1: Check for zombie child processes --- zombies = check_zombies() if zombies: log(f"Found {len(zombies)} zombie llama-server process(es): {zombies}") for server in SERVERS: restart_server(server) health_failures[server["port"]] = 0 clear_model_tracking(server["port"]) time.sleep(CHECK_INTERVAL) continue # --- Phase 2: Router health checks --- for server in SERVERS: port = server["port"] name = server["name"] if not check_health(port): health_failures[port] += 1 log(f"{name} health check failed ({health_failures[port]}/{MAX_HEALTH_FAILURES})") if health_failures[port] >= MAX_HEALTH_FAILURES: restart_server(server) health_failures[port] = 0 clear_model_tracking(port) continue # Health check passed - reset health failure counter if health_failures[port] > 0: log(f"{name} router recovered after {health_failures[port]} failure(s)") health_failures[port] = 0 # --- Phase 3: Deep check - probe loaded models individually --- models = get_loaded_models(port) if not models: continue # Track which models are currently loaded so we can clean up stale entries current_model_ids = {m["id"] for m in models} # Clean up tracking for models that are no longer loaded for tracking_dict in [model_first_seen, model_failures, model_cooldowns]: if port in tracking_dict: stale = [m for m in tracking_dict[port] if m not in current_model_ids] for m in stale: del tracking_dict[port][m] for model_info in models: model_name = model_info["id"] model_status = model_info["status"] # Skip models that are still loading if model_status == "loading": log(f"{name}: model '{model_name}' is still loading, skipping probe") continue # Skip models that aren't fully loaded if model_status != "loaded": continue # Track first-seen time for grace period if model_name not in model_first_seen[port]: model_first_seen[port][model_name] = now log(f"{name}: new model '{model_name}' detected, " f"grace period {MODEL_LOAD_GRACE_PERIOD}s before probing") continue # Skip if within grace period age = now - model_first_seen[port][model_name] if age < MODEL_LOAD_GRACE_PERIOD: remaining = int(MODEL_LOAD_GRACE_PERIOD - age) # Only log occasionally to avoid spam if int(age) % 60 < CHECK_INTERVAL: log(f"{name}: model '{model_name}' in grace period ({remaining}s remaining)") continue # Skip if in cooldown from previous failures if model_name in model_cooldowns[port]: cooldown_elapsed = now - model_cooldowns[port][model_name] if cooldown_elapsed < MODEL_FAILURE_COOLDOWN: remaining = int(MODEL_FAILURE_COOLDOWN - cooldown_elapsed) if int(cooldown_elapsed) % 120 < CHECK_INTERVAL: log(f"{name}: model '{model_name}' in cooldown ({remaining}s remaining)") continue else: # Cooldown expired, give it another chance log(f"{name}: model '{model_name}' cooldown expired, resuming probes") del model_cooldowns[port][model_name] model_failures[port][model_name] = 0 # Probe the model if probe_model(port, model_name): # Probe succeeded - reset failure counter if model_failures[port][model_name] > 0: log(f"{name}: model '{model_name}' recovered after " f"{model_failures[port][model_name]} failure(s)") model_failures[port][model_name] = 0 else: # Probe failed model_failures[port][model_name] += 1 fail_count = model_failures[port][model_name] log(f"{name}: model '{model_name}' probe failed " f"({fail_count}/{MAX_MODEL_PROBE_FAILURES})") if fail_count >= MAX_MODEL_PROBE_FAILURES: # Try to unload the bad model to free resources if unload_model(port, model_name): log(f"{name}: model '{model_name}' persistently unreachable, " f"unloaded successfully, cooldown {MODEL_FAILURE_COOLDOWN}s (NO restart)") else: log(f"{name}: model '{model_name}' persistently unreachable, " f"unload failed, cooldown {MODEL_FAILURE_COOLDOWN}s (NO restart)") model_cooldowns[port][model_name] = now time.sleep(CHECK_INTERVAL) except KeyboardInterrupt: log("Watchdog stopping...") break except Exception as e: log(f"Watchdog error: {e}") time.sleep(CHECK_INTERVAL) if __name__ == "__main__": run_watchdog()