Previously all LLM-flagged findings were written to the abuse log, causing fail2ban to potentially ban IPs for low-severity activity. Now only high and critical severity findings trigger abuse logging. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
805 lines
29 KiB
Python
805 lines
29 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Web Log Security Monitor
|
|
Real-time daemon that analyzes Traefik access logs using local LLM on athena.lan.
|
|
Sends alerts via Gotify when suspicious activity is detected.
|
|
Uses LLM tool calling to query historical threat data for context-aware analysis.
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import requests
|
|
import json
|
|
import re
|
|
import time
|
|
from collections import Counter
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import ipaddress
|
|
|
|
import threat_db
|
|
|
|
# Configuration from environment variables.
# All settings are read once at import time; restart the process to pick up changes.
LLAMA_URL = os.environ.get("LLAMA_URL", "http://localhost:11434/v1/chat/completions")  # OpenAI-compatible chat-completions endpoint
MODEL = os.environ.get("MODEL", "Qwen3-8B-Q6_K")  # model name sent in the request payload
GOTIFY_URL = os.environ.get("GOTIFY_URL", "")  # push-notification endpoint; empty means alerts are only printed
GOTIFY_TOKEN = os.environ.get("GOTIFY_TOKEN", "")  # Gotify application token
LOG_PATH = os.environ.get("LOG_PATH", "/logs/access.log")  # Traefik access log to analyze
LOG_MODE = os.environ.get("LOG_MODE", "local")  # "local" reads LOG_PATH directly; anything else reads it over SSH
BARGE_HOST = os.environ.get("BARGE_HOST", "barge.lan")  # SSH host used when LOG_MODE != "local"
ABUSE_LOG = os.environ.get("ABUSE_LOG", "/data/abuse.log")  # file consumed by fail2ban; empty disables abuse logging
STATE_DIR = os.environ.get("STATE_DIR", "/data")  # directory holding the resume-position state file
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100"))  # max log lines per LLM request
MAX_LINES_PER_RUN = int(os.environ.get("MAX_LINES_PER_RUN", "1000"))  # cap on lines processed by a --once pass
LLM_INTERVAL = int(os.environ.get("LLM_INTERVAL", "25"))  # seconds between LLM buffer flushes in daemon mode
TAIL_POLL_INTERVAL = int(os.environ.get("TAIL_POLL_INTERVAL", "1"))  # seconds between log-file size polls
# Threat-database connection settings; any missing value disables DB features.
DB_SERVER = os.environ.get("DB_SERVER")
DB_NAME = os.environ.get("DB_NAME")
DB_USER = os.environ.get("DB_USER")
DB_PASSWORD = os.environ.get("DB_PASSWORD")

# State file location: prefer STATE_DIR, otherwise fall back to the script's own directory.
if STATE_DIR:
    STATE_FILE = Path(STATE_DIR) / ".web-log-monitor-state"
else:
    SCRIPT_DIR = Path(__file__).parent
    STATE_FILE = SCRIPT_DIR / ".web-log-monitor-state"
|
|
|
|
# Pre-filter patterns to reduce LLM load (obvious attacks get flagged immediately).
# Each entry is (regex, attack-type label); matched case-insensitively against the
# raw log line by check_immediate_alerts(), which alerts without waiting for the LLM.
IMMEDIATE_ALERT_PATTERNS = [
    (r"(?:union\s+select|select\s+.*\s+from|insert\s+into|delete\s+from|drop\s+table)", "SQL Injection"),
    (r"(?:\.\./|\.\.\\|%2e%2e%2f|%2e%2e/|\.\.%2f)", "Path Traversal"),  # plain and URL-encoded dot-dot variants
    (r"(?:<script|javascript:|onerror=|onload=|%3cscript)", "XSS Attempt"),
    (r"(?:/etc/passwd|/etc/shadow|/proc/self)", "LFI Attempt"),
    (r"(?:;\s*(?:ls|cat|wget|curl|bash|sh|nc)\s)", "Command Injection"),  # shell separator followed by a common binary
    (r"(?:wp-login|wp-admin|xmlrpc\.php).*(?:POST|HEAD)", "WordPress Scan"),
    (r"(?:phpmyadmin|pma|mysqladmin)", "DB Admin Scan"),
    (r"(?:\.env|/\.git/|\.aws/|\.ssh/)", "Sensitive File Probe"),
]
|
|
|
|
SYSTEM_PROMPT = """/no_think
|
|
You are a security analyst reviewing web server access logs. You have access to a historical threat database that you can query using tool calls.
|
|
|
|
Analyze the logs for suspicious patterns including:
|
|
1. Reconnaissance: Port scans, directory enumeration, technology fingerprinting
|
|
2. Brute force: Repeated auth failures, credential stuffing patterns
|
|
3. Injection attempts: SQL, command, LDAP, XPath injection
|
|
4. Path traversal: Attempts to access files outside webroot
|
|
5. Scanner signatures: Known vulnerability scanners, malicious bots
|
|
6. Anomalies: Unusual request patterns, suspicious user agents, weird timing
|
|
|
|
IMPORTANT: When you see suspicious IPs, use the lookup_ip tool to check if they are repeat offenders. Use get_threat_summary to understand the current threat landscape. Use get_recent_attacks to check for ongoing attack campaigns. This historical context should inform your severity ratings — repeat offenders and IPs involved in multiple attack types should be rated more severely.
|
|
|
|
After your analysis (and any tool calls), respond with JSON only (no markdown, no explanation):
|
|
{
|
|
"suspicious": true/false,
|
|
"findings": [
|
|
{
|
|
"severity": "low|medium|high|critical",
|
|
"type": "category of attack",
|
|
"ip": "source IP if identifiable",
|
|
"evidence": "specific log line or pattern",
|
|
"recommendation": "brief action to take"
|
|
}
|
|
],
|
|
"summary": "one sentence overview"
|
|
}
|
|
|
|
If nothing suspicious, return: {"suspicious": false, "findings": [], "summary": "No suspicious activity detected"}
|
|
|
|
IMPORTANT CONTEXT about the infrastructure behind this reverse proxy (Traefik):
|
|
- Gitea (git server): Expect git clone/push/pull traffic with URLs like /user/repo.git/info/refs, /user/repo.git/git-upload-pack, etc. This is NORMAL git protocol traffic, not sensitive file probing. Gitea also serves web UI pages for browsing repositories.
|
|
- Nextcloud (cloud storage): Expect heavy API traffic including WebDAV, OCS API calls, app-specific endpoints (bookmarks, calendar, contacts sync). Burst of requests from a single IP to Nextcloud is normal client sync behavior, NOT reconnaissance.
|
|
- Immich (photo management): Expect API calls for photo sync and uploads from mobile/desktop clients.
|
|
- Gotify (push notifications): Expect WebSocket connections and message API calls.
|
|
- Other self-hosted services behind this proxy generate legitimate automated traffic patterns.
|
|
|
|
Do NOT flag the following as suspicious:
|
|
- Git protocol operations to Gitea repositories (even with 401 responses — git auth negotiation starts with a 401)
|
|
- Nextcloud sync bursts (many rapid requests from one IP to Nextcloud endpoints)
|
|
- Authenticated API traffic to any of the above services
|
|
- Standard browser navigation patterns to any hosted service
|
|
|
|
Be conservative - normal traffic like health checks, static assets, and authenticated user activity is not suspicious."""
|
|
|
|
# Tool definitions for the LLM (OpenAI function-calling format).
# Each tool name maps 1:1 to a threat_db function via execute_tool_call();
# the list is attached to the request payload only when DB_SERVER is configured.
LLM_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "lookup_ip",
            "description": "Look up historical threat data for an IP address. Returns past event count, attack types, and first/last seen timestamps. Use this to check if an IP is a repeat offender.",
            "parameters": {
                "type": "object",
                "properties": {
                    "ip": {
                        "type": "string",
                        "description": "The IP address to look up"
                    }
                },
                "required": ["ip"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_threat_summary",
            "description": "Get aggregate threat statistics for the last N hours. Returns total events, unique IPs, top attack types, and top source IPs.",
            "parameters": {
                "type": "object",
                "properties": {
                    "hours": {
                        "type": "integer",
                        "description": "Number of hours to look back (default 24)"
                    }
                },
                "required": []
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_recent_attacks",
            "description": "Get list of recent attack events. Use to check for ongoing attack campaigns or patterns.",
            "parameters": {
                "type": "object",
                "properties": {
                    "hours": {
                        "type": "integer",
                        "description": "Number of hours to look back (default 1)"
                    }
                },
                "required": []
            }
        }
    },
]
|
|
|
|
# Graceful shutdown flag
|
|
_shutdown = False
|
|
|
|
|
|
def _handle_signal(signum, frame):
|
|
global _shutdown
|
|
print(f"\n[{datetime.now().isoformat()}] Received signal {signum}, shutting down...")
|
|
_shutdown = True
|
|
|
|
|
|
def get_last_position():
    """Return the last processed byte position recorded in the state file.

    Falls back to 0 when the state file is absent, unreadable, or holds
    a non-integer value.
    """
    if not STATE_FILE.exists():
        return 0
    try:
        return int(STATE_FILE.read_text().strip())
    except (ValueError, IOError):
        return 0
|
|
|
|
|
|
def save_position(position):
    """Persist *position* to the state file so a restart can resume there."""
    STATE_FILE.write_text(f"{position}")
|
|
|
|
|
|
def tail_log(path):
    """Generator that yields new lines from a log file, handling rotation.

    Uses byte-position tracking, resuming from the offset persisted by
    save_position(). On each poll (every TAIL_POLL_INTERVAL seconds):
    - If the file shrank (rotation), reset to the beginning.
    - Read any new bytes and yield complete lines.
    Runs until the module-level _shutdown flag is set by the signal handler.

    Args:
        path: log file to follow.

    Yields:
        (line, pos): a complete log line (no trailing newline) and the
        offset to persist after this read. All lines from a single read
        share the same pos value.
    """
    # Resume from the last persisted offset (0 on first run).
    pos = get_last_position()

    while not _shutdown:
        try:
            size = os.path.getsize(path)
        except OSError:
            # File temporarily missing (e.g. mid-rotation) — retry later.
            time.sleep(TAIL_POLL_INTERVAL)
            continue

        # File shrank — log was rotated; re-read the replacement from the start.
        if size < pos:
            print(f"[{datetime.now().isoformat()}] Log rotation detected, resetting to beginning")
            pos = 0

        # No new data yet.
        if size == pos:
            time.sleep(TAIL_POLL_INTERVAL)
            continue

        try:
            with open(path, "r", encoding="utf-8", errors="replace") as f:
                f.seek(pos)
                data = f.read()
                new_pos = f.tell()
        except OSError as e:
            print(f"Error reading log file: {e}")
            time.sleep(TAIL_POLL_INTERVAL)
            continue

        # Split into lines, keeping any incomplete trailing line for next read
        lines = data.split("\n")
        if data.endswith("\n"):
            # All lines complete
            complete_lines = [l for l in lines if l]
            pos = new_pos
        else:
            # Last line is incomplete — don't advance past it.
            # NOTE(review): this subtracts a UTF-8 byte length from a
            # text-mode tell() value; with errors="replace" the re-encoded
            # length may differ from the on-disk bytes for non-UTF-8 input.
            # Confirm the log is strictly UTF-8, or switch to binary reads.
            complete_lines = [l for l in lines[:-1] if l]
            pos = new_pos - len(lines[-1].encode("utf-8"))

        for line in complete_lines:
            yield line, pos

    # Generator exits when _shutdown is set.
|
|
|
|
# Generator exits on shutdown
|
|
|
|
|
|
def fetch_logs(start_line, max_lines, total_lines=None):
    """Fetch logs from log file (local or via SSH). Used in --once mode.

    Args:
        start_line: number of lines already processed (0-based offset).
        max_lines: maximum number of lines to return.
        total_lines: precomputed line count (used by the local branch
            only); computed via get_total_lines() when omitted.

    Returns:
        list[str]: fetched lines, possibly empty.

    Raises:
        RuntimeError: when the underlying shell command fails or times out.
    """
    if LOG_MODE == "local":
        try:
            # Use tail for efficient reading from end of large files
            if total_lines is None:
                total_lines = get_total_lines()
            lines_from_end = total_lines - start_line
            if lines_from_end <= 0:
                return []

            # NOTE(review): LOG_PATH is interpolated into a shell string
            # (shell=True is needed for the pipe); acceptable only because
            # it comes from trusted env config, never from user input.
            cmd = f"tail -n {lines_from_end} '{LOG_PATH}' | head -n {max_lines}"
            result = subprocess.run(cmd, shell=True, capture_output=True, encoding="utf-8", errors="replace", timeout=60)
            if result.returncode != 0:
                raise RuntimeError(f"Failed to read logs: {result.stderr}")
            return result.stdout.strip().split("\n") if result.stdout.strip() else []
        except Exception as e:
            # Re-wrap any failure (including the RuntimeError above) with context.
            raise RuntimeError(f"Failed to read logs: {e}")
    else:
        # Remote mode: `tail -n +K` prints from line K onward (1-based), hence +1.
        cmd = f"ssh {BARGE_HOST} \"tail -n +{start_line + 1} {LOG_PATH} | head -n {max_lines}\""
        result = subprocess.run(cmd, shell=True, capture_output=True, encoding="utf-8", errors="replace", timeout=60)
        if result.returncode != 0:
            raise RuntimeError(f"Failed to fetch logs: {result.stderr}")
        return result.stdout.strip().split("\n") if result.stdout.strip() else []
|
|
|
|
|
|
def get_total_lines():
    """Get total line count of the log file.

    Runs `wc -l` either locally or over SSH to BARGE_HOST depending on
    LOG_MODE.

    Returns:
        int: number of lines currently in LOG_PATH.

    Raises:
        RuntimeError: if the count command exits non-zero in either mode.
    """
    if LOG_MODE == "local":
        cmd = f"wc -l < '{LOG_PATH}'"
    else:
        cmd = f"ssh {BARGE_HOST} \"wc -l < {LOG_PATH}\""
    result = subprocess.run(cmd, shell=True, capture_output=True, encoding="utf-8", errors="replace", timeout=30)
    # Bug fix: previously only the local branch checked the exit status; a
    # failed SSH invocation produced empty stdout and crashed with
    # ValueError on int("") instead of reporting the real error.
    if result.returncode != 0:
        raise RuntimeError(f"Failed to count lines: {result.stderr}")
    return int(result.stdout.strip())
|
|
|
|
|
|
def parse_log_line(line):
    """Parse one Traefik common-log-format line into a dict.

    Returns a dict with keys ip, timestamp, request, status, bytes,
    referer, user_agent, request_id, service, backend and latency_ms
    (status/bytes/latency_ms as ints, everything else as strings), or
    None when the line does not match the expected format.
    """
    traefik_pattern = (
        r'^(?P<ip>\S+) - - \[(?P<timestamp>[^\]]+)\] "(?P<request>[^"]+)" '
        r'(?P<status>\d+) (?P<bytes>\d+) "(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)" '
        r'(?P<request_id>\d+) "(?P<service>[^"]*)" "(?P<backend>[^"]*)" (?P<latency_ms>\d+)ms$'
    )
    m = re.match(traefik_pattern, line)
    if m is None:
        return None
    fields = m.groupdict()
    # Numeric fields become ints; request_id deliberately stays a string.
    for key in ("status", "bytes", "latency_ms"):
        fields[key] = int(fields[key])
    return fields
|
|
|
|
|
|
def is_private_ip(ip_str):
    """Return True when *ip_str* parses as a private/internal IP address.

    Strings that are not valid IPv4/IPv6 addresses (hostnames, garbage)
    are treated as non-private.
    """
    try:
        addr = ipaddress.ip_address(ip_str)
    except ValueError:
        return False
    return addr.is_private
|
|
|
|
|
|
def log_abuse(ip, attack_type):
    """Append an abuse entry for fail2ban to act on.

    Does nothing when abuse logging is disabled (empty ABUSE_LOG) or the
    offending IP is private/internal, so LAN clients are never banned.
    Write failures are reported on stdout but never raised.
    """
    if not ABUSE_LOG or is_private_ip(ip):
        return
    stamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    record = f"{stamp} web-log-monitor: abuse from {ip} - {attack_type}\n"
    try:
        with open(ABUSE_LOG, "a") as abuse_file:
            abuse_file.write(record)
    except OSError as e:
        print(f"Failed to write abuse log: {e}")
|
|
|
|
|
|
def check_immediate_alerts(logs, record=True):
    """Check for patterns that warrant immediate alerting without LLM.

    Each line is matched (case-insensitively) against
    IMMEDIATE_ALERT_PATTERNS; the first matching pattern wins. For
    parseable lines the source IP is also written to the abuse log and,
    when record=True and a threat DB is configured, recorded as a
    severity-"high" "regex" event.

    Args:
        logs: iterable of raw log lines.
        record: when True (daemon mode), also records events to the
            threat DB.

    Returns:
        list[dict]: finding dicts (severity/type/evidence/recommendation)
        in the same shape the LLM produces; note no "ip" key is included.
    """
    alerts = []
    for line in logs:
        for pattern, attack_type in IMMEDIATE_ALERT_PATTERNS:
            if re.search(pattern, line, re.IGNORECASE):
                parsed = parse_log_line(line)
                # Abuse logging / DB recording need the source IP, so they
                # only happen for lines that parse; the alert is emitted
                # either way.
                if parsed:
                    log_abuse(parsed["ip"], attack_type)
                    if record and DB_SERVER:
                        try:
                            threat_db.record_event(
                                parsed["ip"], attack_type, "regex",
                                severity="high", evidence=line[:500]
                            )
                        except Exception as e:
                            # DB is best-effort; never block alerting on it.
                            print(f"Failed to record event to DB: {e}")
                alerts.append({
                    "severity": "high",
                    "type": attack_type,
                    "evidence": line[:200],
                    "recommendation": "Review and consider blocking source IP"
                })
                break  # One alert per line
    return alerts
|
|
|
|
|
|
def filter_noise(logs):
    """Drop known-good traffic so the LLM only sees interesting lines.

    Removes: lines that don't parse, internal health checks from the
    monitoring host, successful static-asset fetches, and all Gitea
    traffic (git protocol noise; real attacks are still caught by the
    regex pre-filter). Everything else is kept for analysis.
    """
    static_asset_re = re.compile(r"\.(css|js|png|jpg|ico|woff|svg)(\?|$)")
    kept = []
    for raw_line in logs:
        entry = parse_log_line(raw_line)
        if entry is None:
            continue

        # Skip internal health checks from Firewalla/monitoring
        from_monitor = entry["ip"] == "192.168.10.1" and "/health" in entry["request"]
        # Skip successful static asset requests
        static_hit = entry["status"] == 200 and static_asset_re.search(entry["request"])
        # Skip public Gitea repository browsing (actual attacks still caught by regex alerts)
        gitea_traffic = "gitea" in entry["service"].lower()

        if from_monitor or static_hit or gitea_traffic:
            continue
        kept.append(raw_line)
    return kept
|
|
|
|
|
|
def execute_tool_call(name, arguments):
    """Dispatch one LLM tool call to the threat-database layer.

    Args:
        name: tool name as declared in LLM_TOOLS.
        arguments: decoded arguments dict from the tool call.

    Returns:
        The tool's result object, or {"error": ...} for unknown tool
        names or any exception raised while executing the tool.
    """
    try:
        if name == "lookup_ip":
            return threat_db.lookup_ip(arguments["ip"])
        if name == "get_threat_summary":
            return threat_db.get_threat_summary(arguments.get("hours", 24))
        if name == "get_recent_attacks":
            return threat_db.get_recent_attacks(arguments.get("hours", 1))
        return {"error": f"Unknown tool: {name}"}
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
def analyze_with_llm(logs):
    """Send logs to llama.cpp for analysis with tool-calling support.

    Runs a bounded tool-calling conversation: the model may call the
    threat-DB tools for up to max_rounds rounds; on the final round
    tools are withheld from the payload, pushing the model toward a
    plain-text JSON verdict.

    Args:
        logs: raw log lines; only the first BATCH_SIZE are sent.

    Returns:
        dict: parsed verdict ({"suspicious": ..., "findings": [...],
            "summary": ...}), or a benign fallback dict when no JSON can
            be extracted / the tool-call budget is exhausted.
        None: on transport errors, malformed API responses, or invalid
            JSON in the final answer — callers treat this as "LLM
            unavailable".
    """
    if not logs:
        return {"suspicious": False, "findings": [], "summary": "No logs to analyze"}

    log_text = "\n".join(logs[:BATCH_SIZE])

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Analyze these access logs:\n\n{log_text}"}
    ]

    # Include tools only if the threat DB is available
    use_tools = DB_SERVER is not None

    # max_rounds tool rounds + one extra iteration that forces a final answer.
    max_rounds = 3
    for round_num in range(max_rounds + 1):
        payload = {
            "model": MODEL,
            "messages": messages,
            "temperature": 0.1,
            "max_tokens": 1024,
        }
        # Withhold tools on the last round so the model must answer in text.
        if use_tools and round_num < max_rounds:
            payload["tools"] = LLM_TOOLS

        try:
            response = requests.post(LLAMA_URL, json=payload, timeout=120)
            response.raise_for_status()
            choice = response.json()["choices"][0]
            message = choice["message"]
        except requests.exceptions.RequestException as e:
            print(f"LLM request failed: {e}")
            return None
        except (KeyError, IndexError) as e:
            print(f"Unexpected LLM response format: {e}")
            return None

        # Check if the LLM wants to call tools
        tool_calls = message.get("tool_calls")
        if tool_calls and round_num < max_rounds:
            # Add the assistant message with tool calls to conversation
            messages.append(message)

            for tc in tool_calls:
                func = tc["function"]
                # Arguments may arrive as a JSON string or an already-decoded dict.
                try:
                    args = json.loads(func["arguments"]) if isinstance(func["arguments"], str) else func["arguments"]
                except json.JSONDecodeError:
                    args = {}

                result = execute_tool_call(func["name"], args)
                result_json = json.dumps(result)
                print(f" Tool call: {func['name']}({args}) -> {result_json[:200]}")

                messages.append({
                    "role": "tool",
                    "tool_call_id": tc["id"],
                    "content": result_json
                })

            # Continue loop — send tool results back to LLM
            continue

        # Final text response — extract the outermost {...} span and parse it.
        content = message.get("content", "")
        try:
            json_match = re.search(r'\{[\s\S]*\}', content)
            if json_match:
                return json.loads(json_match.group())
            return {"suspicious": False, "findings": [], "summary": "Failed to parse LLM response"}
        except json.JSONDecodeError as e:
            print(f"Failed to parse LLM response: {e}")
            print(f"Raw response: {content[:500]}")
            return None

    # Exhausted tool-call rounds without a final answer
    print("Warning: LLM exhausted tool-call rounds without a final response")
    return {"suspicious": False, "findings": [], "summary": "Analysis incomplete (tool-call limit)"}
|
|
|
|
|
|
def send_alert(title, message, priority=5):
    """Send alert via Gotify, or print it when Gotify is not configured.

    Args:
        title: alert title.
        message: alert body (formatted findings).
        priority: Gotify priority (higher = more urgent), default 5.

    Send failures are printed, never raised — alerting is best-effort.
    """
    # Bug fix: the env default for GOTIFY_TOKEN is "" (see config block), so
    # the old placeholder-only check never matched an unconfigured install;
    # every alert then attempted a POST to an empty URL and surfaced a
    # confusing transport error. Treat a missing URL or token (or the
    # template placeholder) as "not configured" and fall back to stdout.
    if not GOTIFY_URL or not GOTIFY_TOKEN or GOTIFY_TOKEN == "YOUR_TOKEN_HERE":
        print(f"[ALERT - Gotify not configured] {title}: {message}")
        return

    try:
        response = requests.post(
            GOTIFY_URL,
            params={"token": GOTIFY_TOKEN},
            json={
                "title": title,
                "message": message,
                "priority": priority,
            },
            timeout=10
        )
        response.raise_for_status()
        print(f"Alert sent: {title}")
    except requests.exceptions.RequestException as e:
        print(f"Failed to send Gotify alert: {e}")
|
|
|
|
|
def format_findings(findings):
    """Render findings as a human-readable alert body.

    At most five findings are included per alert; evidence is truncated
    to 100 characters. Each finding becomes a small indented stanza
    followed by a blank line.
    """
    out = []
    for finding in findings[:5]:  # Limit to 5 findings per alert
        out.append(f"[{finding['severity'].upper()}] {finding['type']}")
        source_ip = finding.get("ip")
        if source_ip:
            out.append(f" IP: {source_ip}")
        out.append(f" Evidence: {finding['evidence'][:100]}...")
        out.append(f" Action: {finding['recommendation']}")
        out.append("")
    return "\n".join(out)
|
|
|
|
|
|
def print_log_stats(logs):
    """Print verbose statistics about the logs (used with --verbose).

    Reports time range, status-code histogram, top source IPs, requests
    per service, HTTP method counts, latency, and error rate. Purely
    informational — writes to stdout only, returns nothing.
    """
    if not logs:
        print("\n No logs to analyze")
        return

    parsed_logs = [parse_log_line(line) for line in logs]
    parsed_logs = [p for p in parsed_logs if p]  # Filter None

    if not parsed_logs:
        print("\n No parseable logs found")
        return

    # Time range — assumes logs arrive in chronological order, so the
    # first and last timestamps bound the batch.
    timestamps = [p["timestamp"] for p in parsed_logs]
    print(f"\n{'─' * 60}")
    print("LOG STATISTICS")
    print(f"{'─' * 60}")
    print(f" Time range: {timestamps[0]}")
    print(f" {timestamps[-1]}")
    print(f" Total entries: {len(parsed_logs)}")

    # Status codes with a bar chart: one block per 5 requests, capped at 30.
    status_counts = Counter(p["status"] for p in parsed_logs)
    print(f"\n Status Codes:")
    for status, count in sorted(status_counts.items()):
        bar = "█" * min(count // 5, 30)
        print(f" {status}: {count:>5} {bar}")

    # Top IPs
    ip_counts = Counter(p["ip"] for p in parsed_logs)
    print(f"\n Top Source IPs:")
    for ip, count in ip_counts.most_common(5):
        pct = (count / len(parsed_logs)) * 100
        print(f" {ip:<18} {count:>5} ({pct:.1f}%)")

    # Services
    service_counts = Counter(p["service"] for p in parsed_logs)
    print(f"\n Requests by Service:")
    for service, count in service_counts.most_common(10):
        pct = (count / len(parsed_logs)) * 100
        print(f" {service:<25} {count:>5} ({pct:.1f}%)")

    # HTTP Methods — the method is the first token of the request line.
    method_counts = Counter()
    for p in parsed_logs:
        method = p["request"].split()[0] if p["request"] else "UNKNOWN"
        method_counts[method] += 1
    print(f"\n HTTP Methods:")
    for method, count in method_counts.most_common():
        print(f" {method:<8} {count:>5}")

    # Latency stats
    latencies = [p["latency_ms"] for p in parsed_logs]
    avg_latency = sum(latencies) / len(latencies)
    max_latency = max(latencies)
    print(f"\n Latency:")
    print(f" Average: {avg_latency:.1f}ms")
    print(f" Max: {max_latency}ms")

    # Error rate — all 4xx and 5xx responses.
    errors = sum(1 for p in parsed_logs if p["status"] >= 400)
    error_rate = (errors / len(parsed_logs)) * 100
    print(f"\n Error rate: {error_rate:.1f}% ({errors} requests)")
    print(f"{'─' * 60}\n")
|
|
|
|
|
|
def process_batch(logs, args):
    """Process a batch of log lines: immediate alerts + LLM analysis + alerting.

    Pipeline:
      1. Regex pre-filter (check_immediate_alerts) for obvious attacks.
      2. Noise filtering, then LLM analysis in BATCH_SIZE chunks.
      3. LLM findings are persisted to the threat DB; only high/critical
         severities are written to the abuse log (the fail2ban input),
         so low-severity observations cannot get an IP banned.
      4. One consolidated Gotify alert, prioritized by the most severe
         finding; suppressed by --dry-run.

    Args:
        logs: raw log lines for this batch.
        args: parsed CLI namespace (uses .verbose and .dry_run).
    """
    all_findings = []

    # Check for immediate alert patterns
    immediate_alerts = check_immediate_alerts(logs)
    if immediate_alerts:
        all_findings.extend(immediate_alerts)
        print(f" Found {len(immediate_alerts)} immediate alert patterns")

    # Filter noise and analyze with LLM
    filtered_logs = filter_noise(logs)
    print(f" Filtered {len(logs)} -> {len(filtered_logs)} logs for LLM analysis")

    if filtered_logs:
        llm_available = True
        for i in range(0, len(filtered_logs), BATCH_SIZE):
            # A transport-level LLM failure disables the rest of this batch.
            if not llm_available:
                print(" Skipping remaining batches (LLM unavailable)")
                break
            batch = filtered_logs[i:i + BATCH_SIZE]
            print(f" Analyzing batch {i // BATCH_SIZE + 1} ({len(batch)} logs)...")

            result = analyze_with_llm(batch)
            if result is None:
                llm_available = False
                continue
            if result.get("suspicious"):
                findings = result.get("findings", [])
                # Record LLM findings to threat DB (public IPs only).
                if DB_SERVER:
                    for f in findings:
                        ip = f.get("ip")
                        if ip and not is_private_ip(ip):
                            try:
                                threat_db.record_event(
                                    ip, f.get("type", "unknown"), "llm",
                                    severity=f.get("severity"),
                                    evidence=f.get("evidence", "")[:500]
                                )
                            except Exception as e:
                                print(f" Failed to record LLM finding to DB: {e}")
                            # Only high/critical LLM findings feed the abuse
                            # log — lower severities are recorded in the DB
                            # but never trigger fail2ban.
                            if f.get("severity") in ("high", "critical"):
                                log_abuse(ip, f.get("type", "unknown"))
                all_findings.extend(findings)
                print(f" LLM found suspicious activity: {result.get('summary')}")

    # Send consolidated alert if findings exist
    if all_findings:
        # Sort most-severe-first so the alert (and priority) leads with the worst.
        severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
        all_findings.sort(key=lambda x: severity_order.get(x.get("severity", "low"), 3))

        max_severity = all_findings[0].get("severity", "low")
        priority = {"critical": 10, "high": 8, "medium": 5, "low": 3}.get(max_severity, 5)

        alert_msg = format_findings(all_findings)

        if args.verbose:
            print(f"\n{'─' * 60}")
            print("FINDINGS")
            print(f"{'─' * 60}")
            print(alert_msg)

        if not args.dry_run:
            send_alert(
                f"Web Security Alert ({len(all_findings)} findings)",
                alert_msg,
                priority=priority
            )
        else:
            print(f" [Dry run] Would send alert with {len(all_findings)} findings")
    else:
        print(" No suspicious activity detected")
|
|
|
|
|
|
def run_daemon(args):
    """Run as a long-lived daemon, tailing the log file in real time.

    Every new line is regex-checked immediately; lines also accumulate
    in a buffer that is flushed through process_batch() every
    LLM_INTERVAL seconds (capped at BATCH_SIZE per flush, remainder
    deferred). The byte position is persisted every ~30 seconds and
    once more on shutdown, unless --dry-run is set. Exits when the
    signal handler sets _shutdown (which ends the tail_log generator).

    Args:
        args: parsed CLI namespace (uses .verbose and .dry_run).
    """
    print(f"[{datetime.now().isoformat()}] Starting daemon mode")
    print(f" Log file: {LOG_PATH}")
    print(f" LLM interval: {LLM_INTERVAL}s")
    print(f" Poll interval: {TAIL_POLL_INTERVAL}s")
    if DB_SERVER:
        print(f" Threat DB: {DB_SERVER}/{DB_NAME}")
    else:
        print(" Threat DB: disabled (no DB_SERVER configured)")

    buffer = []
    last_flush = time.monotonic()
    last_save = time.monotonic()
    current_pos = get_last_position()

    for line, pos in tail_log(LOG_PATH):
        current_pos = pos

        # Immediate regex check on every line
        immediate = check_immediate_alerts([line])
        if immediate:
            print(f"[{datetime.now().isoformat()}] Immediate alert: {immediate[0]['type']}")

        buffer.append(line)

        now = time.monotonic()

        # Flush buffer to LLM at interval
        if now - last_flush >= LLM_INTERVAL and buffer:
            # Cap to BATCH_SIZE to avoid overwhelming the LLM with huge backlogs.
            # Remaining lines carry forward to the next interval.
            flush_batch = buffer[:BATCH_SIZE]
            buffer = buffer[BATCH_SIZE:]
            if buffer:
                print(f"\n[{datetime.now().isoformat()}] Flushing {len(flush_batch)} lines to LLM ({len(buffer)} deferred)...")
            else:
                print(f"\n[{datetime.now().isoformat()}] Flushing {len(flush_batch)} lines to LLM...")
            if args.verbose:
                print_log_stats(flush_batch)
            process_batch(flush_batch, args)
            # Reset timer AFTER processing so we don't immediately flush again
            last_flush = time.monotonic()

        # Save position periodically
        if now - last_save >= 30:
            if not args.dry_run:
                save_position(current_pos)
            last_save = now

    # Shutdown: flush remaining buffer and save state
    if buffer:
        print(f"\n[{datetime.now().isoformat()}] Shutdown: flushing {len(buffer)} remaining lines...")
        process_batch(buffer, args)

    if not args.dry_run:
        save_position(current_pos)
        print(f"Final position saved: {current_pos}")
|
|
|
|
|
|
def run_once(args):
    """Run a single analysis pass (original cron-job behavior).

    Resumes from the line number recorded in the state file, processes
    at most MAX_LINES_PER_RUN new lines through the same pipeline as the
    daemon, then persists the new line position (unless --dry-run).

    NOTE(review): this mode stores a LINE number in the state file while
    daemon mode stores a BYTE offset — switching modes against the same
    state file misinterprets the saved value. Confirm the deployment
    never mixes the two modes, or use separate state files.

    Args:
        args: parsed CLI namespace (uses .verbose and .dry_run).
    """
    print(f"[{datetime.now().isoformat()}] Starting single-run analysis...")
    if args.dry_run:
        print("(Dry run - no alerts will be sent, state will not be updated)")

    # Get current position
    last_pos = get_last_position()
    total_lines = get_total_lines()

    # If first run or log rotated, start from recent logs
    if last_pos == 0 or last_pos > total_lines:
        last_pos = max(0, total_lines - MAX_LINES_PER_RUN)
        print(f"Starting from line {last_pos} (total: {total_lines})")

    lines_to_process = min(total_lines - last_pos, MAX_LINES_PER_RUN)
    if lines_to_process <= 0:
        print("No new logs to process")
        return

    print(f"Processing {lines_to_process} new log lines...")

    # Fetch logs
    logs = fetch_logs(last_pos, lines_to_process, total_lines)
    if not logs:
        print("No logs fetched")
        return

    # Show verbose stats if requested
    if args.verbose:
        print_log_stats(logs)

    process_batch(logs, args)

    # Save position (line-based; advanced by however many lines were fetched)
    if not args.dry_run:
        save_position(last_pos + len(logs))
        print(f"Processed up to line {last_pos + len(logs)}")
    else:
        print(f"[Dry run] Would save position: {last_pos + len(logs)}")
|
|
|
|
|
|
def parse_args():
    """Define and evaluate the command-line interface.

    Flags:
        -v / --verbose : print detailed log statistics
        --dry-run      : analyze without sending alerts or updating state
        --once         : single analysis pass, then exit (cron-job mode)
    """
    parser = argparse.ArgumentParser(
        description="Analyze Traefik access logs for suspicious activity using LLM"
    )
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Show detailed log statistics")
    parser.add_argument("--dry-run", action="store_true",
                        help="Analyze logs but don't send alerts or update state")
    parser.add_argument("--once", action="store_true",
                        help="Run a single analysis pass and exit (original cron-job mode)")
    return parser.parse_args()
|
|
|
|
|
|
def main():
    """Entry point: set up the optional threat DB, then dispatch by mode.

    --once runs a single cron-style pass; otherwise the process becomes
    a daemon with SIGTERM/SIGINT handled for graceful shutdown.
    """
    args = parse_args()

    # Initialize threat database if configured (all four settings required).
    if DB_SERVER and DB_NAME and DB_USER and DB_PASSWORD:
        try:
            threat_db.init_db(DB_SERVER, DB_NAME, DB_USER, DB_PASSWORD)
            print(f"Threat database initialized: {DB_SERVER}/{DB_NAME}")
        except Exception as e:
            # The DB is optional context — degrade gracefully instead of exiting.
            print(f"Warning: Failed to initialize threat database: {e}")
            print("Continuing without historical threat data.")
    else:
        print("Threat database not configured — running without historical data.")

    if args.once:
        run_once(args)
    else:
        # Daemon mode — register signal handlers for graceful shutdown
        signal.signal(signal.SIGTERM, _handle_signal)
        signal.signal(signal.SIGINT, _handle_signal)
        run_daemon(args)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|