Files
web-log-monitor/web-log-monitor.py
AJ Isaacs 1a06eeb601 Only log high/critical findings to abuse log for fail2ban
Previously all LLM-flagged findings were written to the abuse log,
causing fail2ban to potentially ban IPs for low-severity activity.
Now only high and critical severity findings trigger abuse logging.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 21:07:59 -05:00

805 lines
29 KiB
Python

#!/usr/bin/env python3
"""
Web Log Security Monitor
Real-time daemon that analyzes Traefik access logs using local LLM on athena.lan.
Sends alerts via Gotify when suspicious activity is detected.
Uses LLM tool calling to query historical threat data for context-aware analysis.
"""
import argparse
import os
import signal
import subprocess
import requests
import json
import re
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
import ipaddress
import threat_db
# Configuration from environment variables
# LLM endpoint (OpenAI-compatible chat-completions API) and model name.
LLAMA_URL = os.environ.get("LLAMA_URL", "http://localhost:11434/v1/chat/completions")
MODEL = os.environ.get("MODEL", "Qwen3-8B-Q6_K")
# Gotify push-notification endpoint; alerts fall back to stdout if unconfigured.
GOTIFY_URL = os.environ.get("GOTIFY_URL", "")
GOTIFY_TOKEN = os.environ.get("GOTIFY_TOKEN", "")
# Traefik access log location; LOG_MODE "local" reads directly, anything else
# reads over SSH from BARGE_HOST (used by --once mode).
LOG_PATH = os.environ.get("LOG_PATH", "/logs/access.log")
LOG_MODE = os.environ.get("LOG_MODE", "local")
BARGE_HOST = os.environ.get("BARGE_HOST", "barge.lan")
# Abuse log consumed by fail2ban; empty string disables abuse logging.
ABUSE_LOG = os.environ.get("ABUSE_LOG", "/data/abuse.log")
STATE_DIR = os.environ.get("STATE_DIR", "/data")
# Batch/pacing knobs.
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100"))  # lines per LLM request
MAX_LINES_PER_RUN = int(os.environ.get("MAX_LINES_PER_RUN", "1000"))  # cap for --once mode
LLM_INTERVAL = int(os.environ.get("LLM_INTERVAL", "25"))  # seconds between daemon LLM flushes
TAIL_POLL_INTERVAL = int(os.environ.get("TAIL_POLL_INTERVAL", "1"))  # seconds between file polls
# Threat database connection; all four must be set for the DB to be enabled (see main()).
DB_SERVER = os.environ.get("DB_SERVER")
DB_NAME = os.environ.get("DB_NAME")
DB_USER = os.environ.get("DB_USER")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
# State file location (stores the last-processed byte offset / line count)
if STATE_DIR:
    STATE_FILE = Path(STATE_DIR) / ".web-log-monitor-state"
else:
    SCRIPT_DIR = Path(__file__).parent
    STATE_FILE = SCRIPT_DIR / ".web-log-monitor-state"
# Pre-filter patterns to reduce LLM load (obvious attacks get flagged immediately)
# Each entry is (regex, attack-type label), matched case-insensitively against
# the raw log line by check_immediate_alerts().
IMMEDIATE_ALERT_PATTERNS = [
    (r"(?:union\s+select|select\s+.*\s+from|insert\s+into|delete\s+from|drop\s+table)", "SQL Injection"),
    (r"(?:\.\./|\.\.\\|%2e%2e%2f|%2e%2e/|\.\.%2f)", "Path Traversal"),
    (r"(?:<script|javascript:|onerror=|onload=|%3cscript)", "XSS Attempt"),
    (r"(?:/etc/passwd|/etc/shadow|/proc/self)", "LFI Attempt"),
    (r"(?:;\s*(?:ls|cat|wget|curl|bash|sh|nc)\s)", "Command Injection"),
    # NOTE(review): in common log format the method precedes the path inside the
    # quoted request ("POST /wp-login.php ..."), so ".*(?:POST|HEAD)" AFTER the
    # wp-login token may never match on these lines — verify intent.
    (r"(?:wp-login|wp-admin|xmlrpc\.php).*(?:POST|HEAD)", "WordPress Scan"),
    (r"(?:phpmyadmin|pma|mysqladmin)", "DB Admin Scan"),
    (r"(?:\.env|/\.git/|\.aws/|\.ssh/)", "Sensitive File Probe"),
]
# System prompt sent with every LLM request. Leading "/no_think" disables the
# model's thinking mode; the prompt pins an exact JSON response schema that
# analyze_with_llm() parses out of the reply.
SYSTEM_PROMPT = """/no_think
You are a security analyst reviewing web server access logs. You have access to a historical threat database that you can query using tool calls.
Analyze the logs for suspicious patterns including:
1. Reconnaissance: Port scans, directory enumeration, technology fingerprinting
2. Brute force: Repeated auth failures, credential stuffing patterns
3. Injection attempts: SQL, command, LDAP, XPath injection
4. Path traversal: Attempts to access files outside webroot
5. Scanner signatures: Known vulnerability scanners, malicious bots
6. Anomalies: Unusual request patterns, suspicious user agents, weird timing
IMPORTANT: When you see suspicious IPs, use the lookup_ip tool to check if they are repeat offenders. Use get_threat_summary to understand the current threat landscape. Use get_recent_attacks to check for ongoing attack campaigns. This historical context should inform your severity ratings — repeat offenders and IPs involved in multiple attack types should be rated more severely.
After your analysis (and any tool calls), respond with JSON only (no markdown, no explanation):
{
"suspicious": true/false,
"findings": [
{
"severity": "low|medium|high|critical",
"type": "category of attack",
"ip": "source IP if identifiable",
"evidence": "specific log line or pattern",
"recommendation": "brief action to take"
}
],
"summary": "one sentence overview"
}
If nothing suspicious, return: {"suspicious": false, "findings": [], "summary": "No suspicious activity detected"}
IMPORTANT CONTEXT about the infrastructure behind this reverse proxy (Traefik):
- Gitea (git server): Expect git clone/push/pull traffic with URLs like /user/repo.git/info/refs, /user/repo.git/git-upload-pack, etc. This is NORMAL git protocol traffic, not sensitive file probing. Gitea also serves web UI pages for browsing repositories.
- Nextcloud (cloud storage): Expect heavy API traffic including WebDAV, OCS API calls, app-specific endpoints (bookmarks, calendar, contacts sync). Burst of requests from a single IP to Nextcloud is normal client sync behavior, NOT reconnaissance.
- Immich (photo management): Expect API calls for photo sync and uploads from mobile/desktop clients.
- Gotify (push notifications): Expect WebSocket connections and message API calls.
- Other self-hosted services behind this proxy generate legitimate automated traffic patterns.
Do NOT flag the following as suspicious:
- Git protocol operations to Gitea repositories (even with 401 responses — git auth negotiation starts with a 401)
- Nextcloud sync bursts (many rapid requests from one IP to Nextcloud endpoints)
- Authenticated API traffic to any of the above services
- Standard browser navigation patterns to any hosted service
Be conservative - normal traffic like health checks, static assets, and authenticated user activity is not suspicious."""
# Tool definitions for the LLM (OpenAI function-calling format)
# Names map 1:1 onto threat_db helpers via execute_tool_call().
LLM_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "lookup_ip",
            "description": "Look up historical threat data for an IP address. Returns past event count, attack types, and first/last seen timestamps. Use this to check if an IP is a repeat offender.",
            "parameters": {
                "type": "object",
                "properties": {
                    "ip": {
                        "type": "string",
                        "description": "The IP address to look up"
                    }
                },
                "required": ["ip"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_threat_summary",
            "description": "Get aggregate threat statistics for the last N hours. Returns total events, unique IPs, top attack types, and top source IPs.",
            "parameters": {
                "type": "object",
                "properties": {
                    "hours": {
                        "type": "integer",
                        "description": "Number of hours to look back (default 24)"
                    }
                },
                "required": []
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_recent_attacks",
            "description": "Get list of recent attack events. Use to check for ongoing attack campaigns or patterns.",
            "parameters": {
                "type": "object",
                "properties": {
                    "hours": {
                        "type": "integer",
                        "description": "Number of hours to look back (default 1)"
                    }
                },
                "required": []
            }
        }
    },
]
# Graceful shutdown flag
_shutdown = False
def _handle_signal(signum, frame):
global _shutdown
print(f"\n[{datetime.now().isoformat()}] Received signal {signum}, shutting down...")
_shutdown = True
def get_last_position():
    """Return the saved byte position from the state file, or 0 if missing/unreadable."""
    if not STATE_FILE.exists():
        return 0
    try:
        return int(STATE_FILE.read_text().strip())
    except (ValueError, IOError):
        return 0
def save_position(position):
    """Persist the current byte position so processing resumes here after restart."""
    STATE_FILE.write_text(str(position))
def tail_log(path):
    """Generator that yields new lines from a log file, handling rotation.

    Uses byte-position tracking. On each poll:
    - If file shrank (rotation), reset to beginning.
    - Read any new bytes, yield complete lines.

    Yields:
        (line, pos) tuples where pos is the byte offset after the last
        fully-consumed line (safe to persist via save_position).

    Fix: the file is now read in BINARY mode. The previous text-mode version
    mixed os.path.getsize() byte counts with TextIOWrapper.tell() cookies —
    text-mode tell() returns an opaque cookie, seek() with an arbitrary int is
    undefined for text streams, and re-encoding after errors="replace" does
    not round-trip byte lengths. Splitting raw bytes keeps all offsets exact.
    """
    pos = get_last_position()
    while not _shutdown:
        try:
            size = os.path.getsize(path)
        except OSError:
            time.sleep(TAIL_POLL_INTERVAL)
            continue
        # File shrank — log was rotated
        if size < pos:
            print(f"[{datetime.now().isoformat()}] Log rotation detected, resetting to beginning")
            pos = 0
        if size == pos:
            time.sleep(TAIL_POLL_INTERVAL)
            continue
        try:
            with open(path, "rb") as f:
                f.seek(pos)
                raw = f.read()
                new_pos = f.tell()
        except OSError as e:
            print(f"Error reading log file: {e}")
            time.sleep(TAIL_POLL_INTERVAL)
            continue
        # Split into lines, keeping any incomplete trailing chunk for next read.
        chunks = raw.split(b"\n")
        if raw.endswith(b"\n"):
            # All lines complete (last chunk is the empty split remainder).
            pos = new_pos
        else:
            # Last chunk is a partial line — don't advance past it.
            pos = new_pos - len(chunks[-1])
        for chunk in chunks[:-1]:
            if chunk:
                yield chunk.decode("utf-8", errors="replace"), pos
    # Generator exits on shutdown
def fetch_logs(start_line, max_lines, total_lines=None):
    """Fetch logs from log file (local or via SSH). Used in --once mode.

    Args:
        start_line: number of lines already processed; reading resumes after it.
        max_lines: maximum number of lines to return.
        total_lines: optional precomputed line count (saves a wc -l pass);
            only consulted in local mode.

    Returns:
        List of log lines (possibly empty).

    Raises:
        RuntimeError: if the underlying shell command fails.
    """
    if LOG_MODE == "local":
        try:
            # Use tail for efficient reading from end of large files:
            # tail -n K prints the last K lines, so K = total - start_line
            # starts output at line start_line + 1.
            if total_lines is None:
                total_lines = get_total_lines()
            lines_from_end = total_lines - start_line
            if lines_from_end <= 0:
                return []
            cmd = f"tail -n {lines_from_end} '{LOG_PATH}' | head -n {max_lines}"
            result = subprocess.run(cmd, shell=True, capture_output=True, encoding="utf-8", errors="replace", timeout=60)
            if result.returncode != 0:
                raise RuntimeError(f"Failed to read logs: {result.stderr}")
            return result.stdout.strip().split("\n") if result.stdout.strip() else []
        except Exception as e:
            raise RuntimeError(f"Failed to read logs: {e}")
    else:
        # Remote mode: tail -n +K prints from line K onward (1-based).
        cmd = f"ssh {BARGE_HOST} \"tail -n +{start_line + 1} {LOG_PATH} | head -n {max_lines}\""
        result = subprocess.run(cmd, shell=True, capture_output=True, encoding="utf-8", errors="replace", timeout=60)
        if result.returncode != 0:
            raise RuntimeError(f"Failed to fetch logs: {result.stderr}")
        return result.stdout.strip().split("\n") if result.stdout.strip() else []
def get_total_lines():
    """Get total line count of the log file via wc -l (locally or over SSH).

    Returns:
        Total number of lines in LOG_PATH.

    Raises:
        RuntimeError: if the count command fails.

    Fix: the remote (SSH) branch previously skipped the returncode check that
    the local branch performed, so a failed SSH produced a confusing
    int-parsing error instead of a clear RuntimeError. Both paths now share
    one consistent execution/error-handling flow.
    """
    if LOG_MODE == "local":
        cmd = f"wc -l < '{LOG_PATH}'"
    else:
        cmd = f"ssh {BARGE_HOST} \"wc -l < {LOG_PATH}\""
    result = subprocess.run(cmd, shell=True, capture_output=True, encoding="utf-8", errors="replace", timeout=30)
    if result.returncode != 0:
        raise RuntimeError(f"Failed to count lines: {result.stderr}")
    return int(result.stdout.strip())
def parse_log_line(line):
    """Parse one Traefik common-log-format line into a field dict.

    Returns None when the line does not match the expected format.
    """
    pattern = r'^(\S+) - - \[([^\]]+)\] "([^"]+)" (\d+) (\d+) "([^"]*)" "([^"]*)" (\d+) "([^"]*)" "([^"]*)" (\d+)ms$'
    m = re.match(pattern, line)
    if m is None:
        return None
    fields = m.groups()
    return {
        "ip": fields[0],
        "timestamp": fields[1],
        "request": fields[2],
        "status": int(fields[3]),
        "bytes": int(fields[4]),
        "referer": fields[5],
        "user_agent": fields[6],
        "request_id": fields[7],
        "service": fields[8],
        "backend": fields[9],
        "latency_ms": int(fields[10]),
    }
def is_private_ip(ip_str):
    """Return True if ip_str parses as a private/internal IP address."""
    try:
        addr = ipaddress.ip_address(ip_str)
    except ValueError:
        # Not a valid IPv4/IPv6 literal.
        return False
    return addr.is_private
def log_abuse(ip, attack_type):
    """Append an abuse entry for fail2ban consumption.

    No-op when ABUSE_LOG is unset or the source IP is private/internal.
    """
    if not ABUSE_LOG or is_private_ip(ip):
        return
    stamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    try:
        with open(ABUSE_LOG, "a") as fh:
            fh.write(f"{stamp} web-log-monitor: abuse from {ip} - {attack_type}\n")
    except OSError as e:
        # Best-effort: a failed write shouldn't kill the monitor.
        print(f"Failed to write abuse log: {e}")
def check_immediate_alerts(logs, record=True):
    """Check for patterns that warrant immediate alerting without LLM.

    When record=True (daemon mode), also records events to the threat DB.
    Side effects per matched line: abuse-log write (if the line parses) and
    an optional threat-DB event; at most one finding is emitted per line.
    """
    hits = []
    for raw in logs:
        for pattern, attack_type in IMMEDIATE_ALERT_PATTERNS:
            if not re.search(pattern, raw, re.IGNORECASE):
                continue
            entry = parse_log_line(raw)
            if entry:
                log_abuse(entry["ip"], attack_type)
                if record and DB_SERVER:
                    try:
                        threat_db.record_event(
                            entry["ip"], attack_type, "regex",
                            severity="high", evidence=raw[:500]
                        )
                    except Exception as e:
                        print(f"Failed to record event to DB: {e}")
            hits.append({
                "severity": "high",
                "type": attack_type,
                "evidence": raw[:200],
                "recommendation": "Review and consider blocking source IP"
            })
            break  # One alert per line
    return hits
def filter_noise(logs):
    """Drop known-good traffic so the LLM only analyzes interesting lines.

    Unparseable lines are dropped too; regex alerts run on the raw lines
    elsewhere, so nothing security-relevant is lost here.
    """
    kept = []
    for raw in logs:
        entry = parse_log_line(raw)
        if entry is None:
            continue
        # Internal health checks from Firewalla/monitoring.
        if entry["ip"] == "192.168.10.1" and "/health" in entry["request"]:
            continue
        # Successful static asset requests.
        if entry["status"] == 200 and re.search(r"\.(css|js|png|jpg|ico|woff|svg)(\?|$)", entry["request"]):
            continue
        # Public Gitea repository browsing (actual attacks still caught by regex alerts).
        if "gitea" in entry["service"].lower():
            continue
        kept.append(raw)
    return kept
def execute_tool_call(name, arguments):
    """Execute an LLM tool call against the threat database.

    Returns the tool's result, or an {"error": ...} dict for unknown tools
    or any exception raised by the DB helper.
    """
    try:
        if name == "lookup_ip":
            return threat_db.lookup_ip(arguments["ip"])
        if name == "get_threat_summary":
            return threat_db.get_threat_summary(arguments.get("hours", 24))
        if name == "get_recent_attacks":
            return threat_db.get_recent_attacks(arguments.get("hours", 1))
        return {"error": f"Unknown tool: {name}"}
    except Exception as e:
        return {"error": str(e)}
def analyze_with_llm(logs):
    """Send logs to llama.cpp for analysis with tool-calling support.

    Runs up to max_rounds tool-calling rounds plus one final text-only round.
    Returns the parsed findings dict, or None on transport/parse failure
    (callers treat None as "LLM unavailable").
    """
    if not logs:
        return {"suspicious": False, "findings": [], "summary": "No logs to analyze"}
    log_text = "\n".join(logs[:BATCH_SIZE])
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"Analyze these access logs:\n\n{log_text}"}
    ]
    # Include tools only if the threat DB is available
    use_tools = DB_SERVER is not None
    max_rounds = 3
    # range(max_rounds + 1): the last iteration offers no tools, forcing the
    # model to produce a final text answer.
    for round_num in range(max_rounds + 1):
        payload = {
            "model": MODEL,
            "messages": messages,
            "temperature": 0.1,
            "max_tokens": 1024,
        }
        if use_tools and round_num < max_rounds:
            payload["tools"] = LLM_TOOLS
        try:
            response = requests.post(LLAMA_URL, json=payload, timeout=120)
            response.raise_for_status()
            choice = response.json()["choices"][0]
            message = choice["message"]
        except requests.exceptions.RequestException as e:
            print(f"LLM request failed: {e}")
            return None
        except (KeyError, IndexError) as e:
            print(f"Unexpected LLM response format: {e}")
            return None
        # Check if the LLM wants to call tools
        tool_calls = message.get("tool_calls")
        if tool_calls and round_num < max_rounds:
            # Add the assistant message with tool calls to conversation
            messages.append(message)
            for tc in tool_calls:
                func = tc["function"]
                # Arguments may arrive as a JSON string or an already-decoded dict.
                try:
                    args = json.loads(func["arguments"]) if isinstance(func["arguments"], str) else func["arguments"]
                except json.JSONDecodeError:
                    args = {}
                result = execute_tool_call(func["name"], args)
                result_json = json.dumps(result)
                print(f" Tool call: {func['name']}({args}) -> {result_json[:200]}")
                messages.append({
                    "role": "tool",
                    "tool_call_id": tc["id"],
                    "content": result_json
                })
            # Continue loop — send tool results back to LLM
            continue
        # Final text response — parse JSON
        content = message.get("content", "")
        try:
            # Greedy {...} extraction: the model may wrap its JSON in prose.
            json_match = re.search(r'\{[\s\S]*\}', content)
            if json_match:
                return json.loads(json_match.group())
            return {"suspicious": False, "findings": [], "summary": "Failed to parse LLM response"}
        except json.JSONDecodeError as e:
            print(f"Failed to parse LLM response: {e}")
            print(f"Raw response: {content[:500]}")
            return None
    # Exhausted tool-call rounds without a final answer
    print("Warning: LLM exhausted tool-call rounds without a final response")
    return {"suspicious": False, "findings": [], "summary": "Analysis incomplete (tool-call limit)"}
def send_alert(title, message, priority=5):
    """Send alert via Gotify; falls back to stdout when Gotify is unconfigured.

    Args:
        title: alert title.
        message: alert body.
        priority: Gotify priority (higher is more urgent).

    Fix: the unconfigured check only matched the placeholder
    "YOUR_TOKEN_HERE", but the module defaults are empty strings — so an
    unconfigured deployment slipped past the guard and POSTed to an empty
    URL. Empty URL/token now also route to the stdout fallback.
    """
    if not GOTIFY_URL or not GOTIFY_TOKEN or GOTIFY_TOKEN == "YOUR_TOKEN_HERE":
        print(f"[ALERT - Gotify not configured] {title}: {message}")
        return
    try:
        response = requests.post(
            GOTIFY_URL,
            params={"token": GOTIFY_TOKEN},
            json={
                "title": title,
                "message": message,
                "priority": priority,
            },
            timeout=10
        )
        response.raise_for_status()
        print(f"Alert sent: {title}")
    except requests.exceptions.RequestException as e:
        print(f"Failed to send Gotify alert: {e}")
def format_findings(findings):
    """Render up to five findings as a human-readable alert body.

    Each finding contributes a severity/type header line plus evidence and
    recommended action; the IP line is included only when present.
    """
    out = []
    for finding in findings[:5]:  # Limit to 5 findings per alert
        out.append(f"[{finding['severity'].upper()}] {finding['type']}")
        ip = finding.get("ip")
        if ip:
            out.append(f" IP: {ip}")
        out.append(f" Evidence: {finding['evidence'][:100]}...")
        out.append(f" Action: {finding['recommendation']}")
        out.append("")
    return "\n".join(out)
def print_log_stats(logs):
    """Print verbose statistics about the logs.

    Covers time range, status-code histogram, top source IPs, per-service
    request counts, HTTP methods, latency, and error rate. Output only —
    no return value.

    Fix: the section-separator and histogram-bar characters had been lost to
    an encoding mangle ('' * 60 rendered nothing at all); restored with
    plain ASCII so the sections are visibly delimited again.
    """
    if not logs:
        print("\n No logs to analyze")
        return
    parsed_logs = [parse_log_line(line) for line in logs]
    parsed_logs = [p for p in parsed_logs if p]  # Filter None
    if not parsed_logs:
        print("\n No parseable logs found")
        return
    sep = "=" * 60
    # Time range (log order assumed chronological — Traefik appends in order)
    timestamps = [p["timestamp"] for p in parsed_logs]
    print(f"\n{sep}")
    print("LOG STATISTICS")
    print(f"{sep}")
    print(f" Time range: {timestamps[0]}")
    print(f" {timestamps[-1]}")
    print(f" Total entries: {len(parsed_logs)}")
    # Status codes
    status_counts = Counter(p["status"] for p in parsed_logs)
    print(f"\n Status Codes:")
    for status, count in sorted(status_counts.items()):
        bar = "#" * min(count // 5, 30)  # capped histogram bar
        print(f" {status}: {count:>5} {bar}")
    # Top IPs
    ip_counts = Counter(p["ip"] for p in parsed_logs)
    print(f"\n Top Source IPs:")
    for ip, count in ip_counts.most_common(5):
        pct = (count / len(parsed_logs)) * 100
        print(f" {ip:<18} {count:>5} ({pct:.1f}%)")
    # Services
    service_counts = Counter(p["service"] for p in parsed_logs)
    print(f"\n Requests by Service:")
    for service, count in service_counts.most_common(10):
        pct = (count / len(parsed_logs)) * 100
        print(f" {service:<25} {count:>5} ({pct:.1f}%)")
    # HTTP Methods
    method_counts = Counter()
    for p in parsed_logs:
        method = p["request"].split()[0] if p["request"] else "UNKNOWN"
        method_counts[method] += 1
    print(f"\n HTTP Methods:")
    for method, count in method_counts.most_common():
        print(f" {method:<8} {count:>5}")
    # Latency stats
    latencies = [p["latency_ms"] for p in parsed_logs]
    avg_latency = sum(latencies) / len(latencies)
    max_latency = max(latencies)
    print(f"\n Latency:")
    print(f" Average: {avg_latency:.1f}ms")
    print(f" Max: {max_latency}ms")
    # Error rate
    errors = sum(1 for p in parsed_logs if p["status"] >= 400)
    error_rate = (errors / len(parsed_logs)) * 100
    print(f"\n Error rate: {error_rate:.1f}% ({errors} requests)")
    print(f"{sep}\n")
def process_batch(logs, args):
    """Process a batch of log lines: immediate alerts + LLM analysis + alerting.

    Args:
        logs: raw log lines to analyze.
        args: parsed CLI namespace (reads args.verbose and args.dry_run).
    """
    all_findings = []
    # Check for immediate alert patterns
    immediate_alerts = check_immediate_alerts(logs)
    if immediate_alerts:
        all_findings.extend(immediate_alerts)
        print(f" Found {len(immediate_alerts)} immediate alert patterns")
    # Filter noise and analyze with LLM
    filtered_logs = filter_noise(logs)
    print(f" Filtered {len(logs)} -> {len(filtered_logs)} logs for LLM analysis")
    if filtered_logs:
        llm_available = True
        for i in range(0, len(filtered_logs), BATCH_SIZE):
            if not llm_available:
                print(" Skipping remaining batches (LLM unavailable)")
                break
            batch = filtered_logs[i:i + BATCH_SIZE]
            print(f" Analyzing batch {i // BATCH_SIZE + 1} ({len(batch)} logs)...")
            result = analyze_with_llm(batch)
            if result is None:
                # None signals transport/parse failure — stop hammering the LLM.
                llm_available = False
                continue
            if result.get("suspicious"):
                findings = result.get("findings", [])
                # Record LLM findings to threat DB
                if DB_SERVER:
                    for f in findings:
                        ip = f.get("ip")
                        if ip and not is_private_ip(ip):
                            try:
                                threat_db.record_event(
                                    ip, f.get("type", "unknown"), "llm",
                                    severity=f.get("severity"),
                                    evidence=f.get("evidence", "")[:500]
                                )
                            except Exception as e:
                                print(f" Failed to record LLM finding to DB: {e}")
                            # Only high/critical findings reach the abuse log,
                            # so fail2ban doesn't ban on low-severity activity.
                            if f.get("severity") in ("high", "critical"):
                                log_abuse(ip, f.get("type", "unknown"))
                all_findings.extend(findings)
                print(f" LLM found suspicious activity: {result.get('summary')}")
    # Send consolidated alert if findings exist
    if all_findings:
        # Sort critical-first so the alert leads with the worst finding and
        # the Gotify priority reflects the maximum severity.
        severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
        all_findings.sort(key=lambda x: severity_order.get(x.get("severity", "low"), 3))
        max_severity = all_findings[0].get("severity", "low")
        priority = {"critical": 10, "high": 8, "medium": 5, "low": 3}.get(max_severity, 5)
        alert_msg = format_findings(all_findings)
        if args.verbose:
            print(f"\n{'' * 60}")
            print("FINDINGS")
            print(f"{'' * 60}")
            print(alert_msg)
        if not args.dry_run:
            send_alert(
                f"Web Security Alert ({len(all_findings)} findings)",
                alert_msg,
                priority=priority
            )
        else:
            print(f" [Dry run] Would send alert with {len(all_findings)} findings")
    else:
        print(" No suspicious activity detected")
def run_daemon(args):
    """Run as a long-lived daemon, tailing the log file in real time.

    Every line gets an immediate regex check; buffered lines are flushed to
    the LLM in BATCH_SIZE chunks at most every LLM_INTERVAL seconds. The
    byte position is persisted roughly every 30s (skipped under --dry-run).
    """
    print(f"[{datetime.now().isoformat()}] Starting daemon mode")
    print(f" Log file: {LOG_PATH}")
    print(f" LLM interval: {LLM_INTERVAL}s")
    print(f" Poll interval: {TAIL_POLL_INTERVAL}s")
    if DB_SERVER:
        print(f" Threat DB: {DB_SERVER}/{DB_NAME}")
    else:
        print(" Threat DB: disabled (no DB_SERVER configured)")
    buffer = []
    last_flush = time.monotonic()
    last_save = time.monotonic()
    current_pos = get_last_position()
    # tail_log() yields until the shutdown flag is set by the signal handler.
    for line, pos in tail_log(LOG_PATH):
        current_pos = pos
        # Immediate regex check on every line
        immediate = check_immediate_alerts([line])
        if immediate:
            print(f"[{datetime.now().isoformat()}] Immediate alert: {immediate[0]['type']}")
        buffer.append(line)
        now = time.monotonic()
        # Flush buffer to LLM at interval
        if now - last_flush >= LLM_INTERVAL and buffer:
            # Cap to BATCH_SIZE to avoid overwhelming the LLM with huge backlogs.
            # Remaining lines carry forward to the next interval.
            flush_batch = buffer[:BATCH_SIZE]
            buffer = buffer[BATCH_SIZE:]
            if buffer:
                print(f"\n[{datetime.now().isoformat()}] Flushing {len(flush_batch)} lines to LLM ({len(buffer)} deferred)...")
            else:
                print(f"\n[{datetime.now().isoformat()}] Flushing {len(flush_batch)} lines to LLM...")
            if args.verbose:
                print_log_stats(flush_batch)
            process_batch(flush_batch, args)
            # Reset timer AFTER processing so we don't immediately flush again
            last_flush = time.monotonic()
        # Save position periodically
        if now - last_save >= 30:
            if not args.dry_run:
                save_position(current_pos)
            last_save = now
    # Shutdown: flush remaining buffer and save state
    if buffer:
        print(f"\n[{datetime.now().isoformat()}] Shutdown: flushing {len(buffer)} remaining lines...")
        process_batch(buffer, args)
    if not args.dry_run:
        save_position(current_pos)
        print(f"Final position saved: {current_pos}")
def run_once(args):
    """Run a single analysis pass (original cron-job behavior).

    Reads up to MAX_LINES_PER_RUN lines past the saved position, processes
    them, and advances the saved position (unless --dry-run).
    """
    print(f"[{datetime.now().isoformat()}] Starting single-run analysis...")
    if args.dry_run:
        print("(Dry run - no alerts will be sent, state will not be updated)")
    # Get current position (line count in --once mode, unlike daemon byte offsets)
    last_pos = get_last_position()
    total_lines = get_total_lines()
    # If first run or log rotated, start from recent logs
    if last_pos == 0 or last_pos > total_lines:
        last_pos = max(0, total_lines - MAX_LINES_PER_RUN)
    print(f"Starting from line {last_pos} (total: {total_lines})")
    lines_to_process = min(total_lines - last_pos, MAX_LINES_PER_RUN)
    if lines_to_process <= 0:
        print("No new logs to process")
        return
    print(f"Processing {lines_to_process} new log lines...")
    # Fetch logs
    logs = fetch_logs(last_pos, lines_to_process, total_lines)
    if not logs:
        print("No logs fetched")
        return
    # Show verbose stats if requested
    if args.verbose:
        print_log_stats(logs)
    process_batch(logs, args)
    # Save position
    if not args.dry_run:
        save_position(last_pos + len(logs))
        print(f"Processed up to line {last_pos + len(logs)}")
    else:
        print(f"[Dry run] Would save position: {last_pos + len(logs)}")
def parse_args(argv=None):
    """Parse command line arguments.

    Args:
        argv: Optional list of argument strings. Defaults to None, which
            makes argparse read sys.argv[1:] — existing call sites keep
            working unchanged, and tests can pass an explicit list.

    Returns:
        argparse.Namespace with boolean attributes verbose, dry_run, once.
    """
    parser = argparse.ArgumentParser(
        description="Analyze Traefik access logs for suspicious activity using LLM"
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Show detailed log statistics"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Analyze logs but don't send alerts or update state"
    )
    parser.add_argument(
        "--once",
        action="store_true",
        help="Run a single analysis pass and exit (original cron-job mode)"
    )
    return parser.parse_args(argv)
def main():
    """Entry point: initialize the threat DB if configured, then dispatch to
    --once mode or the long-running daemon."""
    args = parse_args()
    # Initialize threat database if configured (all four env vars required)
    if DB_SERVER and DB_NAME and DB_USER and DB_PASSWORD:
        try:
            threat_db.init_db(DB_SERVER, DB_NAME, DB_USER, DB_PASSWORD)
            print(f"Threat database initialized: {DB_SERVER}/{DB_NAME}")
        except Exception as e:
            # Non-fatal: analysis still runs without historical context.
            print(f"Warning: Failed to initialize threat database: {e}")
            print("Continuing without historical threat data.")
    else:
        print("Threat database not configured — running without historical data.")
    if args.once:
        run_once(args)
    else:
        # Daemon mode — register signal handlers for graceful shutdown
        signal.signal(signal.SIGTERM, _handle_signal)
        signal.signal(signal.SIGINT, _handle_signal)
        run_daemon(args)
if __name__ == "__main__":
    main()