321a43ac81
Monitors llama-server health with multi-phase checks (zombie detection, health endpoint, loaded model probing) and auto-restarts via systemd or manual relaunch on consecutive failures. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
217 lines
6.6 KiB
Python
217 lines
6.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
llama.cpp Watchdog Service
|
|
Monitors llama-server health and restarts on failure.
|
|
|
|
Detects:
|
|
- Router health endpoint failures
|
|
- Zombie child model-server processes
|
|
- Loaded models that are unreachable through the router
|
|
"""
|
|
|
|
import subprocess
|
|
import requests
|
|
import time
|
|
import signal
|
|
from datetime import datetime
|
|
|
|
# Configuration
#
# One entry per llama-server instance to watch. "service" is the systemd unit
# passed to `systemctl restart`; None means the watchdog kills and relaunches
# the binary itself.
SERVERS = [
    {
        "name": "llama-main",
        "port": 11434,
        "service": "llama-cpp",
    },
    {
        "name": "llama-alt",
        "port": 8082,
        "service": None,
    },
]

# Binary and model locations used when relaunching a server manually.
LLAMA_SERVER_BIN = "/home/aj/llama.cpp/build/bin/llama-server"
MODELS_DIR = "/home/aj/models"
MODELS_PRESET = "/home/aj/models/models.ini"

# Timing and thresholds (all durations in seconds).
CHECK_INTERVAL = 30            # pause between health-check cycles
HEALTH_TIMEOUT = 10            # max wait for a /health or /v1/models reply
DEEP_CHECK_TIMEOUT = 30        # max wait for a model probe reply
MAX_CONSECUTIVE_FAILURES = 2   # restart once this many checks fail in a row

# Consecutive failure count per server, keyed by port.
failure_counts = {}
|
|
|
|
|
|
def log(message):
    """Print *message* to stdout with a local-time prefix, flushing at once."""
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{stamp}] {message}", flush=True)
|
|
|
|
|
|
def check_health(port):
    """Return True when the router on *port* answers GET /health with HTTP 200.

    Any requests-level failure (connection refused, timeout, ...) counts as
    unhealthy and yields False.
    """
    url = f"http://localhost:{port}/health"
    try:
        reply = requests.get(url, timeout=HEALTH_TIMEOUT)
    except requests.exceptions.RequestException:
        return False
    return reply.status_code == 200
|
|
|
|
|
|
def get_loaded_models(port):
    """Return the ids of models the router on *port* reports as loaded.

    Queries ``GET /v1/models`` and keeps every entry whose ``status.value``
    equals ``"loaded"``.

    The lookup is best-effort and always returns a list: a transport error,
    a non-200 reply, or an unparseable body yields ``[]`` so the caller
    simply skips the deep check. Transport and parse errors are logged so a
    silently disabled deep check is visible. Malformed individual entries
    are skipped instead of discarding the whole listing.
    """
    try:
        response = requests.get(f"http://localhost:{port}/v1/models", timeout=HEALTH_TIMEOUT)
        if response.status_code != 200:
            return []
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        # ValueError covers JSON decode failures on requests versions where
        # the decode error is not a RequestException subclass.
        log(f"Could not list models on port {port}: {exc}")
        return []

    entries = data.get("data", []) if isinstance(data, dict) else []
    if not isinstance(entries, list):
        return []

    loaded = []
    for entry in entries:
        if not isinstance(entry, dict) or "id" not in entry:
            continue
        status = entry.get("status")
        if isinstance(status, dict) and status.get("value") == "loaded":
            loaded.append(entry["id"])
    return loaded
|
|
|
|
|
|
def probe_model(port, model_name):
    """Return True when a one-token chat completion for *model_name* gets HTTP 200.

    The request goes through the router on *port*; any requests-level
    failure (including exceeding DEEP_CHECK_TIMEOUT) yields False.
    """
    endpoint = f"http://localhost:{port}/v1/chat/completions"
    # Smallest possible request: one short user message, one generated token.
    payload = {
        "model": model_name,
        "messages": [{"role": "user", "content": "hi"}],
        "max_tokens": 1,
    }
    try:
        reply = requests.post(endpoint, json=payload, timeout=DEEP_CHECK_TIMEOUT)
    except requests.exceptions.RequestException:
        return False
    return reply.status_code == 200
|
|
|
|
|
|
def check_zombies():
    """Return the PIDs (as strings) of defunct llama-server processes.

    Scans the output of ``ps aux`` for lines that mention both
    "llama-server" and "<defunct>" and collects the second column of each.
    """
    listing = subprocess.run(["ps", "aux"], capture_output=True, text=True).stdout
    pids = []
    for row in listing.split("\n"):
        if "llama-server" not in row or "<defunct>" not in row:
            continue
        columns = row.split()
        if len(columns) < 2:
            continue
        # Second column of `ps aux` output is the PID.
        pids.append(columns[1])
    return pids
|
|
|
|
|
|
def restart_via_systemd(service_name):
    """Run ``sudo systemctl restart <service_name>`` and log the outcome.

    Blocks for a further five seconds afterwards, whether or not the command
    succeeded, so the service has time to come up before the next check.
    """
    log(f"Restarting {service_name} via systemd...")

    command = ["sudo", "systemctl", "restart", service_name]
    outcome = subprocess.run(command, capture_output=True, text=True)

    if outcome.returncode != 0:
        log(f"{service_name} restart failed: {outcome.stderr.strip()}")
    else:
        log(f"{service_name} restart command succeeded")

    # Give it time to come up
    time.sleep(5)
|
|
|
|
|
|
def restart_manual(server):
    """Restart a server that has no systemd service by killing and re-launching.

    Args:
        server: Entry from SERVERS. Reads "port" and "name"; an optional
            "host" key selects the bind address (defaults to "0.0.0.0").

    Every llama-server process whose command line carries exactly this port
    is sent SIGKILL, then a new instance is spawned in its own session with
    its output discarded. After a short grace period a single health check
    decides which outcome is logged. Nothing is returned.
    """
    port = server["port"]
    host = server.get("host", "0.0.0.0")
    name = server["name"]

    log(f"Restarting {name} (manual, port {port})...")

    # Kill existing. The port is anchored directly after "--port" (space or
    # "=" separated) and must end at whitespace or end of line; otherwise
    # port 8082 would also match "--port 18082" or any later "8082" on the
    # command line and kill the wrong llama-server instance.
    kill_pattern = f"llama-server.*--port[ =]{port}([[:space:]]|$)"
    subprocess.run(["pkill", "-9", "-f", kill_pattern], capture_output=True)
    time.sleep(2)

    cmd = [
        LLAMA_SERVER_BIN,
        "--host", host,
        "--port", str(port),
        "--models-dir", MODELS_DIR,
        "--models-preset", MODELS_PRESET,
    ]
    # start_new_session detaches the server from the watchdog's session.
    subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True)
    time.sleep(5)

    if check_health(port):
        log(f"{name} started successfully")
    else:
        log(f"{name} failed to start")
|
|
|
|
|
|
def restart_server(server):
    """Restart *server* via systemd when it names a service, else manually."""
    unit = server.get("service")
    if not unit:
        # No systemd unit configured: kill and relaunch the binary ourselves.
        restart_manual(server)
        return
    restart_via_systemd(unit)
|
|
|
|
|
|
def run_watchdog():
    """Run the watchdog loop until a KeyboardInterrupt arrives.

    Each cycle performs, in order:

    1. A zombie scan. Any defunct llama-server process causes every
       configured server to be restarted and the rest of the cycle skipped.
    2. A /health request per server.
    3. For servers that pass, a probe of each model reported as loaded,
       stopping at the first model that fails.

    A server is restarted once its consecutive failure count reaches
    MAX_CONSECUTIVE_FAILURES; the count is cleared after a restart or a
    fully passing cycle. Any other exception is logged and the loop resumes
    after CHECK_INTERVAL seconds.
    """
    log("llama.cpp watchdog starting...")

    failure_counts.update({entry["port"]: 0 for entry in SERVERS})

    def restart_if_exhausted(entry):
        """Restart *entry* and clear its counter once the failure limit is hit."""
        key = entry["port"]
        if failure_counts[key] < MAX_CONSECUTIVE_FAILURES:
            return
        restart_server(entry)
        failure_counts[key] = 0

    while True:
        try:
            # --- Phase 1: zombie processes trigger a restart of everything ---
            zombies = check_zombies()
            if zombies:
                log(f"Found {len(zombies)} zombie llama-server process(es): {zombies}")
                for entry in SERVERS:
                    restart_server(entry)
                    failure_counts[entry["port"]] = 0
                time.sleep(CHECK_INTERVAL)
                continue

            for entry in SERVERS:
                port, name = entry["port"], entry["name"]

                # --- Phase 2: basic health endpoint ---
                if not check_health(port):
                    failure_counts[port] += 1
                    log(f"{name} health check failed ({failure_counts[port]}/{MAX_CONSECUTIVE_FAILURES})")
                    restart_if_exhausted(entry)
                    continue

                # --- Phase 3: probe loaded models, stop at the first failure ---
                for model in get_loaded_models(port):
                    if probe_model(port, model):
                        continue
                    log(f"{name}: loaded model '{model}' is unreachable!")
                    failure_counts[port] += 1
                    log(f"{name} deep check failed ({failure_counts[port]}/{MAX_CONSECUTIVE_FAILURES})")
                    restart_if_exhausted(entry)
                    break
                else:
                    # Every probe passed (or nothing is loaded): server is healthy.
                    if failure_counts[port] > 0:
                        log(f"{name} recovered")
                    failure_counts[port] = 0

            time.sleep(CHECK_INTERVAL)

        except KeyboardInterrupt:
            log("Watchdog stopping...")
            break
        except Exception as e:
            # Keep the watchdog alive on unexpected errors; retry next cycle.
            log(f"Watchdog error: {e}")
            time.sleep(CHECK_INTERVAL)
|
|
|
|
|
|
# Script entry point: start the watchdog loop only when run directly,
# not when this module is imported.
if __name__ == "__main__":
    run_watchdog()
|