b13e69a44f
Fix .git/ regex pattern to require leading slash, preventing Gitea git-protocol URLs from triggering "Sensitive File Probe" alerts. Add infrastructure context to the LLM system prompt describing Gitea, Nextcloud, Immich, and Gotify traffic patterns so the LLM does not flag normal self-hosted service activity. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
804 lines
29 KiB
Python
804 lines
29 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Web Log Security Monitor
|
|
Real-time daemon that analyzes Traefik access logs using local LLM on athena.lan.
|
|
Sends alerts via Gotify when suspicious activity is detected.
|
|
Uses LLM tool calling to query historical threat data for context-aware analysis.
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import requests
|
|
import json
|
|
import re
|
|
import time
|
|
from collections import Counter
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import ipaddress
|
|
|
|
import threat_db
|
|
|
|
# Configuration from environment variables
# LLM endpoint (OpenAI-compatible chat completions API) and model name.
LLAMA_URL = os.environ.get("LLAMA_URL", "http://localhost:11434/v1/chat/completions")
MODEL = os.environ.get("MODEL", "Qwen3-8B-Q6_K")
# Gotify push-notification target; alerts fall back to stdout when unset.
GOTIFY_URL = os.environ.get("GOTIFY_URL", "")
GOTIFY_TOKEN = os.environ.get("GOTIFY_TOKEN", "")
# Traefik access log location; LOG_MODE selects local reads vs SSH to BARGE_HOST.
LOG_PATH = os.environ.get("LOG_PATH", "/logs/access.log")
LOG_MODE = os.environ.get("LOG_MODE", "local")
BARGE_HOST = os.environ.get("BARGE_HOST", "barge.lan")
# Abuse log consumed by fail2ban; STATE_DIR holds the resume-position state file.
ABUSE_LOG = os.environ.get("ABUSE_LOG", "/data/abuse.log")
STATE_DIR = os.environ.get("STATE_DIR", "/data")
# Throughput tuning: lines per LLM request, per --once run, and polling cadence.
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100"))
MAX_LINES_PER_RUN = int(os.environ.get("MAX_LINES_PER_RUN", "1000"))
LLM_INTERVAL = int(os.environ.get("LLM_INTERVAL", "25"))
TAIL_POLL_INTERVAL = int(os.environ.get("TAIL_POLL_INTERVAL", "1"))
# Optional historical threat database; main() requires all four to enable it.
DB_SERVER = os.environ.get("DB_SERVER")
DB_NAME = os.environ.get("DB_NAME")
DB_USER = os.environ.get("DB_USER")
DB_PASSWORD = os.environ.get("DB_PASSWORD")

# State file location
if STATE_DIR:
    STATE_FILE = Path(STATE_DIR) / ".web-log-monitor-state"
else:
    # Fall back to a state file placed next to this script.
    SCRIPT_DIR = Path(__file__).parent
    STATE_FILE = SCRIPT_DIR / ".web-log-monitor-state"
|
|
# Pre-filter patterns to reduce LLM load (obvious attacks get flagged immediately)
# Each (regex, label) pair is matched case-insensitively against the raw log
# line by check_immediate_alerts(); first match wins per line.
IMMEDIATE_ALERT_PATTERNS = [
    (r"(?:union\s+select|select\s+.*\s+from|insert\s+into|delete\s+from|drop\s+table)", "SQL Injection"),
    (r"(?:\.\./|\.\.\\|%2e%2e%2f|%2e%2e/|\.\.%2f)", "Path Traversal"),
    (r"(?:<script|javascript:|onerror=|onload=|%3cscript)", "XSS Attempt"),
    (r"(?:/etc/passwd|/etc/shadow|/proc/self)", "LFI Attempt"),
    (r"(?:;\s*(?:ls|cat|wget|curl|bash|sh|nc)\s)", "Command Injection"),
    (r"(?:wp-login|wp-admin|xmlrpc\.php).*(?:POST|HEAD)", "WordPress Scan"),
    (r"(?:phpmyadmin|pma|mysqladmin)", "DB Admin Scan"),
    # "/\.git/" requires a leading slash so Gitea git-protocol URLs like
    # /user/repo.git/info/refs do not trigger a false "Sensitive File Probe".
    (r"(?:\.env|/\.git/|\.aws/|\.ssh/)", "Sensitive File Probe"),
]
|
# LLM system prompt. The leading /no_think disables Qwen's thinking mode; the
# infrastructure context section teaches the model which self-hosted traffic
# (Gitea, Nextcloud, Immich, Gotify) is benign so it is not flagged.
SYSTEM_PROMPT = """/no_think
You are a security analyst reviewing web server access logs. You have access to a historical threat database that you can query using tool calls.

Analyze the logs for suspicious patterns including:
1. Reconnaissance: Port scans, directory enumeration, technology fingerprinting
2. Brute force: Repeated auth failures, credential stuffing patterns
3. Injection attempts: SQL, command, LDAP, XPath injection
4. Path traversal: Attempts to access files outside webroot
5. Scanner signatures: Known vulnerability scanners, malicious bots
6. Anomalies: Unusual request patterns, suspicious user agents, weird timing

IMPORTANT: When you see suspicious IPs, use the lookup_ip tool to check if they are repeat offenders. Use get_threat_summary to understand the current threat landscape. Use get_recent_attacks to check for ongoing attack campaigns. This historical context should inform your severity ratings — repeat offenders and IPs involved in multiple attack types should be rated more severely.

After your analysis (and any tool calls), respond with JSON only (no markdown, no explanation):
{
"suspicious": true/false,
"findings": [
{
"severity": "low|medium|high|critical",
"type": "category of attack",
"ip": "source IP if identifiable",
"evidence": "specific log line or pattern",
"recommendation": "brief action to take"
}
],
"summary": "one sentence overview"
}

If nothing suspicious, return: {"suspicious": false, "findings": [], "summary": "No suspicious activity detected"}

IMPORTANT CONTEXT about the infrastructure behind this reverse proxy (Traefik):
- Gitea (git server): Expect git clone/push/pull traffic with URLs like /user/repo.git/info/refs, /user/repo.git/git-upload-pack, etc. This is NORMAL git protocol traffic, not sensitive file probing. Gitea also serves web UI pages for browsing repositories.
- Nextcloud (cloud storage): Expect heavy API traffic including WebDAV, OCS API calls, app-specific endpoints (bookmarks, calendar, contacts sync). Burst of requests from a single IP to Nextcloud is normal client sync behavior, NOT reconnaissance.
- Immich (photo management): Expect API calls for photo sync and uploads from mobile/desktop clients.
- Gotify (push notifications): Expect WebSocket connections and message API calls.
- Other self-hosted services behind this proxy generate legitimate automated traffic patterns.

Do NOT flag the following as suspicious:
- Git protocol operations to Gitea repositories (even with 401 responses — git auth negotiation starts with a 401)
- Nextcloud sync bursts (many rapid requests from one IP to Nextcloud endpoints)
- Authenticated API traffic to any of the above services
- Standard browser navigation patterns to any hosted service

Be conservative - normal traffic like health checks, static assets, and authenticated user activity is not suspicious."""
|
# Tool definitions for the LLM (OpenAI function-calling format)
# Each tool name here maps to a branch in execute_tool_call().
LLM_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "lookup_ip",
            "description": "Look up historical threat data for an IP address. Returns past event count, attack types, and first/last seen timestamps. Use this to check if an IP is a repeat offender.",
            "parameters": {
                "type": "object",
                "properties": {
                    "ip": {
                        "type": "string",
                        "description": "The IP address to look up"
                    }
                },
                "required": ["ip"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_threat_summary",
            "description": "Get aggregate threat statistics for the last N hours. Returns total events, unique IPs, top attack types, and top source IPs.",
            "parameters": {
                "type": "object",
                "properties": {
                    "hours": {
                        "type": "integer",
                        "description": "Number of hours to look back (default 24)"
                    }
                },
                "required": []
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_recent_attacks",
            "description": "Get list of recent attack events. Use to check for ongoing attack campaigns or patterns.",
            "parameters": {
                "type": "object",
                "properties": {
                    "hours": {
                        "type": "integer",
                        "description": "Number of hours to look back (default 1)"
                    }
                },
                "required": []
            }
        }
    },
]
|
# Graceful shutdown flag
# Set True by _handle_signal on SIGTERM/SIGINT; tail_log's poll loop checks it
# and exits cleanly so run_daemon can flush its buffer and save state.
_shutdown = False
|
|
|
|
def _handle_signal(signum, frame):
|
|
global _shutdown
|
|
print(f"\n[{datetime.now().isoformat()}] Received signal {signum}, shutting down...")
|
|
_shutdown = True
|
|
|
|
|
|
def get_last_position():
    """Return the byte position recorded in the state file (0 if absent or unreadable)."""
    if not STATE_FILE.exists():
        return 0
    try:
        return int(STATE_FILE.read_text().strip())
    except (ValueError, IOError):
        # Corrupt or unreadable state — start over from the beginning.
        return 0
|
|
|
|
def save_position(position):
    """Persist the current byte position so a restart resumes where we left off."""
    serialized = str(position)
    STATE_FILE.write_text(serialized)
|
|
|
|
def tail_log(path):
    """Generator that yields new lines from a log file, handling rotation.

    Yields (line, pos) tuples where `line` is a decoded log line and `pos` is
    the byte offset reached after the current batch of complete lines
    (suitable for save_position()).

    Uses byte-position tracking. On each poll:
    - If file shrank (rotation), reset to beginning.
    - Read any new bytes, yield complete lines.

    Reads in binary mode so byte offsets stay exact. The previous text-mode
    implementation computed the rollback offset by re-encoding the decoded
    partial line; with errors="replace", invalid UTF-8 bytes become U+FFFD
    (3 bytes each) and the computed offset drifted from the real file offset.
    """
    pos = get_last_position()

    while not _shutdown:
        try:
            size = os.path.getsize(path)
        except OSError:
            # File missing or mid-rotation; wait and retry.
            time.sleep(TAIL_POLL_INTERVAL)
            continue

        # File shrank — log was rotated
        if size < pos:
            print(f"[{datetime.now().isoformat()}] Log rotation detected, resetting to beginning")
            pos = 0

        if size == pos:
            # No new data yet.
            time.sleep(TAIL_POLL_INTERVAL)
            continue

        try:
            with open(path, "rb") as f:
                f.seek(pos)
                data = f.read()
                new_pos = f.tell()
        except OSError as e:
            print(f"Error reading log file: {e}")
            time.sleep(TAIL_POLL_INTERVAL)
            continue

        # Split into lines, keeping any incomplete trailing line for next read
        raw_lines = data.split(b"\n")
        if data.endswith(b"\n"):
            # All lines complete
            complete_lines = [l for l in raw_lines if l]
            pos = new_pos
        else:
            # Last line is incomplete — don't advance past it. Byte math is
            # exact because we stay in bytes until each line is yielded.
            complete_lines = [l for l in raw_lines[:-1] if l]
            pos = new_pos - len(raw_lines[-1])

        for raw in complete_lines:
            yield raw.decode("utf-8", errors="replace"), pos

    # Generator exits on shutdown
|
|
|
def fetch_logs(start_line, max_lines, total_lines=None):
    """Fetch logs from log file (local or via SSH). Used in --once mode.

    Args:
        start_line: 0-based line number to start reading from.
        max_lines: maximum number of lines to return.
        total_lines: optional precomputed line count (avoids a second wc -l).

    Returns:
        List of log lines (possibly empty).

    Raises:
        RuntimeError: if the underlying shell command fails.

    NOTE(review): commands are built by f-string interpolation into a shell;
    assumes LOG_PATH/BARGE_HOST come from trusted env config — confirm.
    """
    if LOG_MODE == "local":
        try:
            # Use tail for efficient reading from end of large files
            if total_lines is None:
                total_lines = get_total_lines()
            lines_from_end = total_lines - start_line
            if lines_from_end <= 0:
                # Nothing new past start_line.
                return []

            cmd = f"tail -n {lines_from_end} '{LOG_PATH}' | head -n {max_lines}"
            result = subprocess.run(cmd, shell=True, capture_output=True, encoding="utf-8", errors="replace", timeout=60)
            if result.returncode != 0:
                raise RuntimeError(f"Failed to read logs: {result.stderr}")
            return result.stdout.strip().split("\n") if result.stdout.strip() else []
        except Exception as e:
            raise RuntimeError(f"Failed to read logs: {e}")
    else:
        # Remote mode: read the slice over SSH starting at start_line + 1 (tail is 1-based).
        cmd = f"ssh {BARGE_HOST} \"tail -n +{start_line + 1} {LOG_PATH} | head -n {max_lines}\""
        result = subprocess.run(cmd, shell=True, capture_output=True, encoding="utf-8", errors="replace", timeout=60)
        if result.returncode != 0:
            raise RuntimeError(f"Failed to fetch logs: {result.stderr}")
        return result.stdout.strip().split("\n") if result.stdout.strip() else []
|
|
|
|
def get_total_lines():
    """Get total line count of the log file.

    Runs `wc -l` locally, or over SSH to BARGE_HOST when LOG_MODE != "local".

    Returns:
        int: number of lines in the log file.

    Raises:
        RuntimeError: if the count command exits non-zero.
    """
    if LOG_MODE == "local":
        cmd = f"wc -l < '{LOG_PATH}'"
    else:
        cmd = f"ssh {BARGE_HOST} \"wc -l < {LOG_PATH}\""
    result = subprocess.run(cmd, shell=True, capture_output=True, encoding="utf-8", errors="replace", timeout=30)
    # Check the exit status on BOTH paths. Previously only the local branch
    # checked it; a failed SSH invocation fell through to int("") and crashed
    # with a confusing ValueError instead of a clear RuntimeError.
    if result.returncode != 0:
        raise RuntimeError(f"Failed to count lines: {result.stderr}")
    return int(result.stdout.strip())
|
|
|
|
# Traefik CLF-style access-log line, e.g.:
# 1.2.3.4 - - [10/Oct/2024:13:55:36 +0000] "GET / HTTP/1.1" 200 512 "-" "UA" 7 "svc@docker" "http://10.0.0.2:80" 12ms
# Compiled once at module load instead of on every call.
_LOG_LINE_RE = re.compile(
    r'^(\S+) - - \[([^\]]+)\] "([^"]+)" (\d+) (\d+) "([^"]*)" "([^"]*)" (\d+) "([^"]*)" "([^"]*)" (\d+)ms$'
)


def parse_log_line(line):
    """Parse a Traefik common log format line.

    Args:
        line: one raw access-log line.

    Returns:
        dict with named fields (status/bytes/latency_ms as ints), or None
        when the line does not match the expected format.
    """
    match = _LOG_LINE_RE.match(line)
    if not match:
        return None
    return {
        "ip": match.group(1),
        "timestamp": match.group(2),
        "request": match.group(3),
        "status": int(match.group(4)),
        "bytes": int(match.group(5)),
        "referer": match.group(6),
        "user_agent": match.group(7),
        "request_id": match.group(8),
        "service": match.group(9),
        "backend": match.group(10),
        "latency_ms": int(match.group(11)),
    }
|
|
|
|
def is_private_ip(ip_str):
    """Return True when ip_str parses as a private/internal IP address.

    Unparseable strings are treated as not private (False).
    """
    try:
        addr = ipaddress.ip_address(ip_str)
    except ValueError:
        # Not a valid IPv4/IPv6 literal.
        return False
    return addr.is_private
|
|
|
|
def log_abuse(ip, attack_type):
    """Write abuse detection to log file for fail2ban consumption.

    No-op when ABUSE_LOG is unset or the source address is private/internal.
    """
    if not ABUSE_LOG or is_private_ip(ip):
        return
    stamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    try:
        with open(ABUSE_LOG, "a") as f:
            f.write(f"{stamp} web-log-monitor: abuse from {ip} - {attack_type}\n")
    except OSError as e:
        # Best-effort: a failed write must not take down the monitor.
        print(f"Failed to write abuse log: {e}")
|
|
|
|
def check_immediate_alerts(logs, record=True):
    """Check for patterns that warrant immediate alerting without LLM.

    When record=True (daemon mode), also records events to the threat DB.
    At most one finding is emitted per log line (first matching pattern wins).
    """
    alerts = []
    for line in logs:
        # First pattern that matches this line, or None.
        attack_type = next(
            (label for pattern, label in IMMEDIATE_ALERT_PATTERNS
             if re.search(pattern, line, re.IGNORECASE)),
            None,
        )
        if attack_type is None:
            continue

        parsed = parse_log_line(line)
        if parsed:
            log_abuse(parsed["ip"], attack_type)
            if record and DB_SERVER:
                try:
                    threat_db.record_event(
                        parsed["ip"], attack_type, "regex",
                        severity="high", evidence=line[:500]
                    )
                except Exception as e:
                    print(f"Failed to record event to DB: {e}")

        alerts.append({
            "severity": "high",
            "type": attack_type,
            "evidence": line[:200],
            "recommendation": "Review and consider blocking source IP"
        })
    return alerts
|
|
|
|
def filter_noise(logs):
    """Filter out known-good traffic to reduce LLM processing.

    Drops unparseable lines, internal health checks, successful static-asset
    requests, and Gitea traffic; everything else is kept for analysis.
    """
    static_asset_re = re.compile(r"\.(css|js|png|jpg|ico|woff|svg)(\?|$)")
    kept = []
    for line in logs:
        entry = parse_log_line(line)
        if entry is None:
            continue
        # Internal health checks from Firewalla/monitoring are noise.
        if entry["ip"] == "192.168.10.1" and "/health" in entry["request"]:
            continue
        # Successful static asset requests are noise.
        if entry["status"] == 200 and static_asset_re.search(entry["request"]):
            continue
        # Public Gitea repository browsing (actual attacks still caught by regex alerts).
        if "gitea" in entry["service"].lower():
            continue
        kept.append(line)
    return kept
|
|
|
|
def execute_tool_call(name, arguments):
    """Execute an LLM tool call against the threat database.

    Returns the tool's result, or an {"error": ...} dict for unknown tools
    or any exception raised by the underlying query.
    """
    try:
        if name == "lookup_ip":
            return threat_db.lookup_ip(arguments["ip"])
        if name == "get_threat_summary":
            return threat_db.get_threat_summary(arguments.get("hours", 24))
        if name == "get_recent_attacks":
            return threat_db.get_recent_attacks(arguments.get("hours", 1))
        return {"error": f"Unknown tool: {name}"}
    except Exception as e:
        # Surface the failure to the LLM rather than crashing the analysis loop.
        return {"error": str(e)}
|
|
|
|
def analyze_with_llm(logs):
    """Send logs to llama.cpp for analysis with tool-calling support.

    Runs up to `max_rounds` rounds of tool calls; on the final round tools
    are withheld from the payload so the model must produce a text answer.

    Args:
        logs: list of raw log lines (only the first BATCH_SIZE are sent).

    Returns:
        dict parsed from the model's JSON reply, or None on transport/parse
        failure (callers treat None as "LLM unavailable").
    """
    if not logs:
        return {"suspicious": False, "findings": [], "summary": "No logs to analyze"}

    # Cap the prompt to one batch worth of lines.
    log_text = "\n".join(logs[:BATCH_SIZE])

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Analyze these access logs:\n\n{log_text}"}
    ]

    # Include tools only if the threat DB is available
    use_tools = DB_SERVER is not None

    max_rounds = 3
    for round_num in range(max_rounds + 1):
        payload = {
            "model": MODEL,
            "messages": messages,
            "temperature": 0.1,
            "max_tokens": 1024,
        }
        # Offer tools on all but the last round to force a final text response.
        if use_tools and round_num < max_rounds:
            payload["tools"] = LLM_TOOLS

        try:
            response = requests.post(LLAMA_URL, json=payload, timeout=120)
            response.raise_for_status()
            choice = response.json()["choices"][0]
            message = choice["message"]
        except requests.exceptions.RequestException as e:
            print(f"LLM request failed: {e}")
            return None
        except (KeyError, IndexError) as e:
            print(f"Unexpected LLM response format: {e}")
            return None

        # Check if the LLM wants to call tools
        tool_calls = message.get("tool_calls")
        if tool_calls and round_num < max_rounds:
            # Add the assistant message with tool calls to conversation
            messages.append(message)

            for tc in tool_calls:
                func = tc["function"]
                try:
                    # Arguments may arrive as a JSON string or an already-decoded dict.
                    args = json.loads(func["arguments"]) if isinstance(func["arguments"], str) else func["arguments"]
                except json.JSONDecodeError:
                    args = {}

                result = execute_tool_call(func["name"], args)
                result_json = json.dumps(result)
                print(f"  Tool call: {func['name']}({args}) -> {result_json[:200]}")

                messages.append({
                    "role": "tool",
                    "tool_call_id": tc["id"],
                    "content": result_json
                })

            # Continue loop — send tool results back to LLM
            continue

        # Final text response — parse JSON
        # NOTE(review): some servers return content=None on tool-call-only
        # messages; re.search(None) would raise TypeError — confirm upstream.
        content = message.get("content", "")
        try:
            # Grab the outermost {...} span to tolerate stray prose around the JSON.
            json_match = re.search(r'\{[\s\S]*\}', content)
            if json_match:
                return json.loads(json_match.group())
            return {"suspicious": False, "findings": [], "summary": "Failed to parse LLM response"}
        except json.JSONDecodeError as e:
            print(f"Failed to parse LLM response: {e}")
            print(f"Raw response: {content[:500]}")
            return None

    # Exhausted tool-call rounds without a final answer
    print("Warning: LLM exhausted tool-call rounds without a final response")
    return {"suspicious": False, "findings": [], "summary": "Analysis incomplete (tool-call limit)"}
|
|
|
|
def send_alert(title, message, priority=5):
    """Send alert via Gotify.

    Falls back to printing when Gotify is not configured. The configuration
    check now treats an empty GOTIFY_URL/GOTIFY_TOKEN (the env defaults) as
    unconfigured too — previously only the literal placeholder token was
    detected, so empty defaults caused a doomed HTTP request to "" instead
    of the informative fallback print.

    Args:
        title: alert title.
        message: alert body.
        priority: Gotify priority (higher = more urgent).
    """
    if not GOTIFY_URL or not GOTIFY_TOKEN or GOTIFY_TOKEN == "YOUR_TOKEN_HERE":
        print(f"[ALERT - Gotify not configured] {title}: {message}")
        return

    try:
        response = requests.post(
            GOTIFY_URL,
            params={"token": GOTIFY_TOKEN},
            json={
                "title": title,
                "message": message,
                "priority": priority,
            },
            timeout=10
        )
        response.raise_for_status()
        print(f"Alert sent: {title}")
    except requests.exceptions.RequestException as e:
        # Alerting is best-effort; never crash the monitor on delivery failure.
        print(f"Failed to send Gotify alert: {e}")
|
|
def format_findings(findings):
    """Format findings for alert message.

    Renders at most the first five findings, each as a severity/type header
    followed by optional IP, truncated evidence, and recommended action.
    """
    rendered = []
    # Limit to 5 findings per alert
    for finding in findings[:5]:
        rendered.append(f"[{finding['severity'].upper()}] {finding['type']}")
        if finding.get("ip"):
            rendered.append(f" IP: {finding['ip']}")
        rendered.append(f" Evidence: {finding['evidence'][:100]}...")
        rendered.append(f" Action: {finding['recommendation']}")
        rendered.append("")
    return "\n".join(rendered)
|
|
|
|
def print_log_stats(logs):
    """Print verbose statistics about the logs.

    Informational only (writes to stdout); invoked when --verbose is set.
    Shows time range, status-code histogram, top IPs/services, HTTP method
    counts, latency, and error rate for the parseable subset of `logs`.
    """
    if not logs:
        print("\n  No logs to analyze")
        return

    parsed_logs = [parse_log_line(line) for line in logs]
    parsed_logs = [p for p in parsed_logs if p]  # Filter None

    if not parsed_logs:
        print("\n  No parseable logs found")
        return

    # Time range (relies on logs being in file order: first/last entry).
    timestamps = [p["timestamp"] for p in parsed_logs]
    print(f"\n{'─' * 60}")
    print("LOG STATISTICS")
    print(f"{'─' * 60}")
    print(f"  Time range: {timestamps[0]}")
    print(f"              {timestamps[-1]}")
    print(f"  Total entries: {len(parsed_logs)}")

    # Status codes
    status_counts = Counter(p["status"] for p in parsed_logs)
    print(f"\n  Status Codes:")
    for status, count in sorted(status_counts.items()):
        # Histogram bar: one block per 5 requests, capped at 30 chars.
        bar = "█" * min(count // 5, 30)
        print(f"    {status}: {count:>5}  {bar}")

    # Top IPs
    ip_counts = Counter(p["ip"] for p in parsed_logs)
    print(f"\n  Top Source IPs:")
    for ip, count in ip_counts.most_common(5):
        pct = (count / len(parsed_logs)) * 100
        print(f"    {ip:<18} {count:>5}  ({pct:.1f}%)")

    # Services
    service_counts = Counter(p["service"] for p in parsed_logs)
    print(f"\n  Requests by Service:")
    for service, count in service_counts.most_common(10):
        pct = (count / len(parsed_logs)) * 100
        print(f"    {service:<25} {count:>5}  ({pct:.1f}%)")

    # HTTP Methods (first token of the request line, e.g. "GET")
    method_counts = Counter()
    for p in parsed_logs:
        method = p["request"].split()[0] if p["request"] else "UNKNOWN"
        method_counts[method] += 1
    print(f"\n  HTTP Methods:")
    for method, count in method_counts.most_common():
        print(f"    {method:<8} {count:>5}")

    # Latency stats
    latencies = [p["latency_ms"] for p in parsed_logs]
    avg_latency = sum(latencies) / len(latencies)
    max_latency = max(latencies)
    print(f"\n  Latency:")
    print(f"    Average: {avg_latency:.1f}ms")
    print(f"    Max: {max_latency}ms")

    # Error rate (HTTP status >= 400)
    errors = sum(1 for p in parsed_logs if p["status"] >= 400)
    error_rate = (errors / len(parsed_logs)) * 100
    print(f"\n  Error rate: {error_rate:.1f}% ({errors} requests)")
    print(f"{'─' * 60}\n")
|
|
|
|
def process_batch(logs, args):
    """Process a batch of log lines: immediate alerts + LLM analysis + alerting.

    Args:
        logs: raw log lines to analyze.
        args: parsed CLI args (uses args.verbose and args.dry_run).

    NOTE(review): in daemon mode each line was already run through
    check_immediate_alerts() once by run_daemon, so DB events/abuse-log
    entries for regex hits may be recorded twice — confirm intended.
    """
    all_findings = []

    # Check for immediate alert patterns
    immediate_alerts = check_immediate_alerts(logs)
    if immediate_alerts:
        all_findings.extend(immediate_alerts)
        print(f"  Found {len(immediate_alerts)} immediate alert patterns")

    # Filter noise and analyze with LLM
    filtered_logs = filter_noise(logs)
    print(f"  Filtered {len(logs)} -> {len(filtered_logs)} logs for LLM analysis")

    if filtered_logs:
        llm_available = True
        for i in range(0, len(filtered_logs), BATCH_SIZE):
            # Once the LLM fails, don't hammer it with the remaining batches.
            if not llm_available:
                print("  Skipping remaining batches (LLM unavailable)")
                break
            batch = filtered_logs[i:i + BATCH_SIZE]
            print(f"  Analyzing batch {i // BATCH_SIZE + 1} ({len(batch)} logs)...")

            result = analyze_with_llm(batch)
            if result is None:
                llm_available = False
                continue
            if result.get("suspicious"):
                findings = result.get("findings", [])
                # Record LLM findings to threat DB
                if DB_SERVER:
                    for f in findings:
                        ip = f.get("ip")
                        # Only public IPs are recorded / reported for blocking.
                        if ip and not is_private_ip(ip):
                            try:
                                threat_db.record_event(
                                    ip, f.get("type", "unknown"), "llm",
                                    severity=f.get("severity"),
                                    evidence=f.get("evidence", "")[:500]
                                )
                            except Exception as e:
                                print(f"  Failed to record LLM finding to DB: {e}")
                            log_abuse(ip, f.get("type", "unknown"))
                all_findings.extend(findings)
                print(f"  LLM found suspicious activity: {result.get('summary')}")

    # Send consolidated alert if findings exist
    if all_findings:
        # Sort by severity so the most serious finding leads the alert.
        severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
        all_findings.sort(key=lambda x: severity_order.get(x.get("severity", "low"), 3))

        # Gotify priority derives from the single worst finding.
        max_severity = all_findings[0].get("severity", "low")
        priority = {"critical": 10, "high": 8, "medium": 5, "low": 3}.get(max_severity, 5)

        alert_msg = format_findings(all_findings)

        if args.verbose:
            print(f"\n{'─' * 60}")
            print("FINDINGS")
            print(f"{'─' * 60}")
            print(alert_msg)

        if not args.dry_run:
            send_alert(
                f"Web Security Alert ({len(all_findings)} findings)",
                alert_msg,
                priority=priority
            )
        else:
            print(f"  [Dry run] Would send alert with {len(all_findings)} findings")
    else:
        print("  No suspicious activity detected")
|
|
|
|
def run_daemon(args):
    """Run as a long-lived daemon, tailing the log file in real time.

    Every line gets an immediate regex check; lines accumulate in a buffer
    that is flushed to the LLM every LLM_INTERVAL seconds (capped at
    BATCH_SIZE per flush). The byte position is persisted every ~30s and on
    shutdown so a restart resumes where it left off.
    """
    print(f"[{datetime.now().isoformat()}] Starting daemon mode")
    print(f"  Log file: {LOG_PATH}")
    print(f"  LLM interval: {LLM_INTERVAL}s")
    print(f"  Poll interval: {TAIL_POLL_INTERVAL}s")
    if DB_SERVER:
        print(f"  Threat DB: {DB_SERVER}/{DB_NAME}")
    else:
        print("  Threat DB: disabled (no DB_SERVER configured)")

    buffer = []
    last_flush = time.monotonic()
    last_save = time.monotonic()
    current_pos = get_last_position()

    for line, pos in tail_log(LOG_PATH):
        current_pos = pos

        # Immediate regex check on every line
        # NOTE(review): process_batch() re-runs this check on the buffered
        # copy of the same line, so regex hits can be recorded twice — confirm.
        immediate = check_immediate_alerts([line])
        if immediate:
            print(f"[{datetime.now().isoformat()}] Immediate alert: {immediate[0]['type']}")

        buffer.append(line)

        now = time.monotonic()

        # Flush buffer to LLM at interval
        if now - last_flush >= LLM_INTERVAL and buffer:
            # Cap to BATCH_SIZE to avoid overwhelming the LLM with huge backlogs.
            # Remaining lines carry forward to the next interval.
            flush_batch = buffer[:BATCH_SIZE]
            buffer = buffer[BATCH_SIZE:]
            if buffer:
                print(f"\n[{datetime.now().isoformat()}] Flushing {len(flush_batch)} lines to LLM ({len(buffer)} deferred)...")
            else:
                print(f"\n[{datetime.now().isoformat()}] Flushing {len(flush_batch)} lines to LLM...")
            if args.verbose:
                print_log_stats(flush_batch)
            process_batch(flush_batch, args)
            # Reset timer AFTER processing so we don't immediately flush again
            last_flush = time.monotonic()

        # Save position periodically
        if now - last_save >= 30:
            if not args.dry_run:
                save_position(current_pos)
            last_save = now

    # Shutdown: flush remaining buffer and save state
    if buffer:
        print(f"\n[{datetime.now().isoformat()}] Shutdown: flushing {len(buffer)} remaining lines...")
        process_batch(buffer, args)

    if not args.dry_run:
        save_position(current_pos)
        print(f"Final position saved: {current_pos}")
|
|
|
|
def run_once(args):
    """Run a single analysis pass (original cron-job behavior).

    Reads up to MAX_LINES_PER_RUN new lines past the saved position,
    analyzes them, and (unless --dry-run) advances the saved position.

    NOTE(review): this mode stores a LINE number in the state file while
    daemon mode stores a BYTE offset — switching modes will misread the
    saved state; confirm the two modes are never mixed on one state file.
    """
    print(f"[{datetime.now().isoformat()}] Starting single-run analysis...")
    if args.dry_run:
        print("(Dry run - no alerts will be sent, state will not be updated)")

    # Get current position
    last_pos = get_last_position()
    total_lines = get_total_lines()

    # If first run or log rotated, start from recent logs
    if last_pos == 0 or last_pos > total_lines:
        last_pos = max(0, total_lines - MAX_LINES_PER_RUN)
        print(f"Starting from line {last_pos} (total: {total_lines})")

    lines_to_process = min(total_lines - last_pos, MAX_LINES_PER_RUN)
    if lines_to_process <= 0:
        print("No new logs to process")
        return

    print(f"Processing {lines_to_process} new log lines...")

    # Fetch logs
    logs = fetch_logs(last_pos, lines_to_process, total_lines)
    if not logs:
        print("No logs fetched")
        return

    # Show verbose stats if requested
    if args.verbose:
        print_log_stats(logs)

    process_batch(logs, args)

    # Save position
    if not args.dry_run:
        save_position(last_pos + len(logs))
        print(f"Processed up to line {last_pos + len(logs)}")
    else:
        print(f"[Dry run] Would save position: {last_pos + len(logs)}")
|
|
|
|
def parse_args():
    """Parse command line arguments.

    All three options are boolean flags: -v/--verbose, --dry-run, --once.
    """
    parser = argparse.ArgumentParser(
        description="Analyze Traefik access logs for suspicious activity using LLM"
    )
    flag_specs = [
        (("-v", "--verbose"), "Show detailed log statistics"),
        (("--dry-run",), "Analyze logs but don't send alerts or update state"),
        (("--once",), "Run a single analysis pass and exit (original cron-job mode)"),
    ]
    for names, help_text in flag_specs:
        parser.add_argument(*names, action="store_true", help=help_text)
    return parser.parse_args()
|
|
|
|
def main():
    """Entry point: initialize the optional threat DB, then dispatch to
    single-pass (--once) or daemon mode."""
    args = parse_args()

    # Initialize threat database if configured (all four settings required).
    if DB_SERVER and DB_NAME and DB_USER and DB_PASSWORD:
        try:
            threat_db.init_db(DB_SERVER, DB_NAME, DB_USER, DB_PASSWORD)
            print(f"Threat database initialized: {DB_SERVER}/{DB_NAME}")
        except Exception as e:
            # DB is optional — degrade gracefully rather than abort.
            print(f"Warning: Failed to initialize threat database: {e}")
            print("Continuing without historical threat data.")
    else:
        print("Threat database not configured — running without historical data.")

    if args.once:
        run_once(args)
    else:
        # Daemon mode — register signal handlers for graceful shutdown
        signal.signal(signal.SIGTERM, _handle_signal)
        signal.signal(signal.SIGINT, _handle_signal)
        run_daemon(args)
|
|
# Script entry point.
if __name__ == "__main__":
    main()
|