#!/usr/bin/env python3
"""
Web Log Security Monitor

Real-time daemon that analyzes Traefik access logs using local LLM on athena.lan.
Sends alerts via Gotify when suspicious activity is detected.
Uses LLM tool calling to query historical threat data for context-aware analysis.
"""
import argparse
import os
import signal
import subprocess
import requests
import json
import re
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
import ipaddress

import threat_db

# Configuration from environment variables
LLAMA_URL = os.environ.get("LLAMA_URL", "http://localhost:11434/v1/chat/completions")
MODEL = os.environ.get("MODEL", "Qwen3-8B-Q6_K")
GOTIFY_URL = os.environ.get("GOTIFY_URL", "")
GOTIFY_TOKEN = os.environ.get("GOTIFY_TOKEN", "")
LOG_PATH = os.environ.get("LOG_PATH", "/logs/access.log")
LOG_MODE = os.environ.get("LOG_MODE", "local")
BARGE_HOST = os.environ.get("BARGE_HOST", "barge.lan")
ABUSE_LOG = os.environ.get("ABUSE_LOG", "/data/abuse.log")
STATE_DIR = os.environ.get("STATE_DIR", "/data")
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100"))
MAX_LINES_PER_RUN = int(os.environ.get("MAX_LINES_PER_RUN", "1000"))
LLM_INTERVAL = int(os.environ.get("LLM_INTERVAL", "25"))
TAIL_POLL_INTERVAL = int(os.environ.get("TAIL_POLL_INTERVAL", "1"))
DB_SERVER = os.environ.get("DB_SERVER")
DB_NAME = os.environ.get("DB_NAME")
DB_USER = os.environ.get("DB_USER")
DB_PASSWORD = os.environ.get("DB_PASSWORD")

# State file location: prefer the configured state dir, fall back to the
# directory this script lives in.
if STATE_DIR:
    STATE_FILE = Path(STATE_DIR) / ".web-log-monitor-state"
else:
    SCRIPT_DIR = Path(__file__).parent
    STATE_FILE = SCRIPT_DIR / ".web-log-monitor-state"

# Pre-filter patterns to reduce LLM load (obvious attacks get flagged immediately)
IMMEDIATE_ALERT_PATTERNS = [
    (r"(?:union\s+select|select\s+.*\s+from|insert\s+into|delete\s+from|drop\s+table)", "SQL Injection"),
    (r"(?:\.\./|\.\.\\|%2e%2e%2f|%2e%2e/|\.\.%2f)", "Path Traversal"),
    # NOTE(review): at least one further pattern (beginning `(?:`) was lost
    # when this copy of the file was mangled. This list is security-relevant
    # — restore the missing entries from version control before deploying.
]
def analyze_with_llm(logs):
    """Ask the local LLM to classify a batch of access-log lines.

    Returns the parsed verdict dict ({"suspicious": bool, "findings": [...],
    "summary": str}), or None when the endpoint is unreachable or replies with
    unparseable JSON (callers treat None as "LLM unavailable").

    NOTE(review): everything above the tool-result handling was lost when this
    copy of the file was mangled; the prompt construction, HTTP call and tool
    dispatch below are a conservative reconstruction — diff this function
    against version control before deploying.
    """
    messages = [
        {
            "role": "system",
            "content": (
                "You are a web security analyst. Review the Traefik access "
                "log lines provided by the user and reply with a single JSON "
                'object: {"suspicious": bool, "findings": [...], "summary": str}.'
            ),
        },
        {"role": "user", "content": "\n".join(logs)},
    ]
    # Tool definitions come from threat_db when available (historical lookups).
    tools = getattr(threat_db, "TOOL_DEFINITIONS", None)
    max_rounds = 5  # bound on tool-call round-trips so we can never loop forever

    for _ in range(max_rounds):
        payload = {"model": MODEL, "messages": messages}
        if tools:
            payload["tools"] = tools
        try:
            response = requests.post(LLAMA_URL, json=payload, timeout=300)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"LLM request failed: {e}")
            return None

        message = response.json()["choices"][0]["message"]
        messages.append(message)

        tool_calls = message.get("tool_calls") or []
        if tool_calls:
            for tc in tool_calls:
                name = tc["function"]["name"]
                try:
                    fn_args = json.loads(tc["function"].get("arguments") or "{}")
                    handler = getattr(threat_db, name)
                    result = handler(**fn_args)
                except Exception as e:
                    # Tool errors are reported back to the model, not fatal.
                    result = {"error": str(e)}
                result_json = json.dumps(result, default=str)
                print(f"    Tool {name} -> {result_json[:200]}")
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc["id"],
                    "content": result_json
                })
            # Continue loop — send tool results back to LLM
            continue

        # Final text response — parse JSON
        content = message.get("content", "")
        try:
            json_match = re.search(r'\{[\s\S]*\}', content)
            if json_match:
                return json.loads(json_match.group())
            return {"suspicious": False, "findings": [], "summary": "Failed to parse LLM response"}
        except json.JSONDecodeError as e:
            print(f"Failed to parse LLM response: {e}")
            print(f"Raw response: {content[:500]}")
            return None

    # Exhausted tool-call rounds without a final answer
    print("Warning: LLM exhausted tool-call rounds without a final response")
    return {"suspicious": False, "findings": [], "summary": "Analysis incomplete (tool-call limit)"}


def send_alert(title, message, priority=5):
    """Send alert via Gotify.

    Degrades to a console print when Gotify is not configured. BUG FIX: the
    original only checked for the placeholder token "YOUR_TOKEN_HERE", but the
    environment default for GOTIFY_TOKEN/GOTIFY_URL is "" — an unconfigured
    deployment would POST to an empty URL. Treat empty values as unconfigured.
    """
    if not GOTIFY_TOKEN or not GOTIFY_URL or GOTIFY_TOKEN == "YOUR_TOKEN_HERE":
        print(f"[ALERT - Gotify not configured] {title}: {message}")
        return
    try:
        response = requests.post(
            GOTIFY_URL,
            params={"token": GOTIFY_TOKEN},
            json={
                "title": title,
                "message": message,
                "priority": priority,
            },
            timeout=10
        )
        response.raise_for_status()
        print(f"Alert sent: {title}")
    except requests.exceptions.RequestException as e:
        print(f"Failed to send Gotify alert: {e}")


def format_findings(findings):
    """Format findings into a human-readable alert body.

    Robustness fix: findings come from LLM output, which is not guaranteed to
    contain every key — use .get() with defaults instead of bare indexing
    (matching the style used elsewhere in this file), and only append "..."
    when the evidence string was actually truncated.
    """
    lines = []
    for f in findings[:5]:  # Limit to 5 findings per alert
        lines.append(f"[{f.get('severity', 'low').upper()}] {f.get('type', 'unknown')}")
        if f.get("ip"):
            lines.append(f" IP: {f['ip']}")
        evidence = f.get("evidence", "")
        suffix = "..." if len(evidence) > 100 else ""
        lines.append(f" Evidence: {evidence[:100]}{suffix}")
        lines.append(f" Action: {f.get('recommendation', 'review manually')}")
        lines.append("")
    return "\n".join(lines)


def print_log_stats(logs):
    """Print verbose statistics about the logs."""
    if not logs:
        print("\n No logs to analyze")
        return
    parsed_logs = [parse_log_line(line) for line in logs]
    parsed_logs = [p for p in parsed_logs if p]  # Filter None
    if not parsed_logs:
        print("\n No parseable logs found")
        return

    # Time range (assumes log lines arrive in chronological order — TODO confirm)
    timestamps = [p["timestamp"] for p in parsed_logs]
    print(f"\n{'─' * 60}")
    print("LOG STATISTICS")
    print(f"{'─' * 60}")
    print(f" Time range: {timestamps[0]}")
    print(f" {timestamps[-1]}")
    print(f" Total entries: {len(parsed_logs)}")

    # Status codes
    status_counts = Counter(p["status"] for p in parsed_logs)
    print(f"\n Status Codes:")
    for status, count in sorted(status_counts.items()):
        bar = "█" * min(count // 5, 30)
        print(f" {status}: {count:>5} {bar}")

    # Top IPs
    ip_counts = Counter(p["ip"] for p in parsed_logs)
    print(f"\n Top Source IPs:")
    for ip, count in ip_counts.most_common(5):
        pct = (count / len(parsed_logs)) * 100
        print(f" {ip:<18} {count:>5} ({pct:.1f}%)")

    # Services
    service_counts = Counter(p["service"] for p in parsed_logs)
    print(f"\n Requests by Service:")
    for service, count in service_counts.most_common(10):
        pct = (count / len(parsed_logs)) * 100
        print(f" {service:<25} {count:>5} ({pct:.1f}%)")

    # HTTP Methods. Edge-case fix: a whitespace-only request field is truthy
    # but splits to [], so the original `split()[0]` raised IndexError.
    method_counts = Counter()
    for p in parsed_logs:
        parts = p["request"].split() if p["request"] else []
        method = parts[0] if parts else "UNKNOWN"
        method_counts[method] += 1
    print(f"\n HTTP Methods:")
    for method, count in method_counts.most_common():
        print(f" {method:<8} {count:>5}")

    # Latency stats
    latencies = [p["latency_ms"] for p in parsed_logs]
    avg_latency = sum(latencies) / len(latencies)
    max_latency = max(latencies)
    print(f"\n Latency:")
    print(f" Average: {avg_latency:.1f}ms")
    print(f" Max: {max_latency}ms")

    # Error rate
    errors = sum(1 for p in parsed_logs if p["status"] >= 400)
    error_rate = (errors / len(parsed_logs)) * 100
    print(f"\n Error rate: {error_rate:.1f}% ({errors} requests)")
    print(f"{'─' * 60}\n")
def process_batch(logs, args):
    """Run one analysis pass over `logs`: regex pre-screen, batched LLM
    analysis, threat-DB recording, and a single consolidated alert."""
    findings_total = []

    # Stage 1: cheap regex screen for unambiguous attack signatures.
    regex_hits = check_immediate_alerts(logs)
    if regex_hits:
        findings_total.extend(regex_hits)
        print(f" Found {len(regex_hits)} immediate alert patterns")

    # Stage 2: drop known-benign noise, then ship the remainder to the LLM
    # in BATCH_SIZE chunks.
    candidates = filter_noise(logs)
    print(f" Filtered {len(logs)} -> {len(candidates)} logs for LLM analysis")
    if candidates:
        llm_up = True
        for start in range(0, len(candidates), BATCH_SIZE):
            if not llm_up:
                print(" Skipping remaining batches (LLM unavailable)")
                break
            chunk = candidates[start:start + BATCH_SIZE]
            print(f" Analyzing batch {start // BATCH_SIZE + 1} ({len(chunk)} logs)...")
            verdict = analyze_with_llm(chunk)
            if verdict is None:
                llm_up = False
                continue
            if not verdict.get("suspicious"):
                continue
            llm_findings = verdict.get("findings", [])
            # Persist public-IP findings to the threat DB (best effort).
            if DB_SERVER:
                for item in llm_findings:
                    addr = item.get("ip")
                    if not addr or is_private_ip(addr):
                        continue
                    try:
                        threat_db.record_event(
                            addr, item.get("type", "unknown"), "llm",
                            severity=item.get("severity"),
                            evidence=item.get("evidence", "")[:500]
                        )
                    except Exception as e:
                        print(f" Failed to record LLM finding to DB: {e}")
                    if item.get("severity") in ("high", "critical"):
                        log_abuse(addr, item.get("type", "unknown"))
            findings_total.extend(llm_findings)
            print(f" LLM found suspicious activity: {verdict.get('summary')}")

    if not findings_total:
        print(" No suspicious activity detected")
        return

    # Rank by severity; the alert priority follows the worst finding.
    rank = {"critical": 0, "high": 1, "medium": 2, "low": 3}
    findings_total.sort(key=lambda f: rank.get(f.get("severity", "low"), 3))
    worst = findings_total[0].get("severity", "low")
    priority = {"critical": 10, "high": 8, "medium": 5, "low": 3}.get(worst, 5)
    body = format_findings(findings_total)

    if args.verbose:
        print(f"\n{'─' * 60}")
        print("FINDINGS")
        print(f"{'─' * 60}")
        print(body)

    if args.dry_run:
        print(f" [Dry run] Would send alert with {len(findings_total)} findings")
    else:
        send_alert(
            f"Web Security Alert ({len(findings_total)} findings)",
            body,
            priority=priority
        )
def run_daemon(args):
    """Run as a long-lived daemon, tailing the log file in real time.

    Loop structure: every line yielded by tail_log() is regex-checked
    immediately, buffered, and flushed to the LLM in BATCH_SIZE chunks at
    most every LLM_INTERVAL seconds. The file position is checkpointed via
    save_position() roughly every 30s (skipped under --dry-run). When
    tail_log() stops yielding (shutdown), any buffered lines get one final
    pass and the position is persisted.
    """
    print(f"[{datetime.now().isoformat()}] Starting daemon mode")
    print(f" Log file: {LOG_PATH}")
    print(f" LLM interval: {LLM_INTERVAL}s")
    print(f" Poll interval: {TAIL_POLL_INTERVAL}s")
    if DB_SERVER:
        print(f" Threat DB: {DB_SERVER}/{DB_NAME}")
    else:
        print(" Threat DB: disabled (no DB_SERVER configured)")

    buffer = []                       # lines awaiting LLM analysis
    last_flush = time.monotonic()     # monotonic clocks: immune to wall-clock jumps
    last_save = time.monotonic()
    current_pos = get_last_position()

    # tail_log yields (line, position) pairs; presumably it blocks/polls at
    # TAIL_POLL_INTERVAL — definition not visible here, confirm in source.
    for line, pos in tail_log(LOG_PATH):
        current_pos = pos
        # Immediate regex check on every line
        immediate = check_immediate_alerts([line])
        if immediate:
            # Early console notice only; the line is still buffered below, so
            # process_batch re-detects it and handles alerting/DB recording.
            print(f"[{datetime.now().isoformat()}] Immediate alert: {immediate[0]['type']}")
        buffer.append(line)
        now = time.monotonic()
        # Flush buffer to LLM at interval
        if now - last_flush >= LLM_INTERVAL and buffer:
            # Cap to BATCH_SIZE to avoid overwhelming the LLM with huge backlogs.
            # Remaining lines carry forward to the next interval.
            flush_batch = buffer[:BATCH_SIZE]
            buffer = buffer[BATCH_SIZE:]
            if buffer:
                print(f"\n[{datetime.now().isoformat()}] Flushing {len(flush_batch)} lines to LLM ({len(buffer)} deferred)...")
            else:
                print(f"\n[{datetime.now().isoformat()}] Flushing {len(flush_batch)} lines to LLM...")
            if args.verbose:
                print_log_stats(flush_batch)
            process_batch(flush_batch, args)
            # Reset timer AFTER processing so we don't immediately flush again
            last_flush = time.monotonic()
        # Save position periodically (timer advances even under --dry-run so
        # we don't re-enter this branch on every line)
        if now - last_save >= 30:
            if not args.dry_run:
                save_position(current_pos)
            last_save = now

    # Shutdown: flush remaining buffer and save state
    if buffer:
        print(f"\n[{datetime.now().isoformat()}] Shutdown: flushing {len(buffer)} remaining lines...")
        process_batch(buffer, args)
    if not args.dry_run:
        save_position(current_pos)
        print(f"Final position saved: {current_pos}")
def run_once(args):
    """Run a single analysis pass (original cron-job behavior)."""
    print(f"[{datetime.now().isoformat()}] Starting single-run analysis...")
    if args.dry_run:
        print("(Dry run - no alerts will be sent, state will not be updated)")

    # Where did the previous run stop, and how long is the log now?
    cursor = get_last_position()
    line_count = get_total_lines()

    # First run ever, or the log rotated under us: jump to the recent tail
    # rather than replaying the whole file.
    if cursor == 0 or cursor > line_count:
        cursor = max(0, line_count - MAX_LINES_PER_RUN)
        print(f"Starting from line {cursor} (total: {line_count})")

    pending = min(line_count - cursor, MAX_LINES_PER_RUN)
    if pending <= 0:
        print("No new logs to process")
        return

    print(f"Processing {pending} new log lines...")
    logs = fetch_logs(cursor, pending, line_count)
    if not logs:
        print("No logs fetched")
        return

    if args.verbose:
        print_log_stats(logs)
    process_batch(logs, args)

    # Advance the checkpoint by what we actually fetched, not what we asked for.
    advanced = cursor + len(logs)
    if args.dry_run:
        print(f"[Dry run] Would save position: {advanced}")
    else:
        save_position(advanced)
        print(f"Processed up to line {advanced}")
def parse_args():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Analyze Traefik access logs for suspicious activity using LLM"
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Show detailed log statistics",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Analyze logs but don't send alerts or update state",
    )
    parser.add_argument(
        "--once",
        action="store_true",
        help="Run a single analysis pass and exit (original cron-job mode)",
    )
    return parser.parse_args()


def main():
    args = parse_args()

    # Bring up the historical-threat database only when fully configured;
    # otherwise degrade gracefully to stateless analysis.
    if all((DB_SERVER, DB_NAME, DB_USER, DB_PASSWORD)):
        try:
            threat_db.init_db(DB_SERVER, DB_NAME, DB_USER, DB_PASSWORD)
            print(f"Threat database initialized: {DB_SERVER}/{DB_NAME}")
        except Exception as e:
            print(f"Warning: Failed to initialize threat database: {e}")
            print("Continuing without historical threat data.")
    else:
        print("Threat database not configured — running without historical data.")

    if args.once:
        run_once(args)
    else:
        # Daemon mode — register signal handlers for graceful shutdown
        signal.signal(signal.SIGTERM, _handle_signal)
        signal.signal(signal.SIGINT, _handle_signal)
        run_daemon(args)


if __name__ == "__main__":
    main()