Replace config.py with .env for Docker-standard configuration

Config was a Python file baked into the image or bind-mounted, so any
settings change required a rebuild or manual file management. The service
now reads its settings with os.environ.get() from an env_file declared in
docker-compose, so config changes only need a container restart. Also
filters Gitea traffic out of LLM analysis to prevent false-positive
reconnaissance alerts on normal repository browsing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 19:29:09 -05:00
commit e3ee9fc193
10 changed files with 1222 additions and 0 deletions
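The configuration pattern this commit adopts is plain environment lookups with defaults. A minimal sketch (the `load_settings` helper is illustrative, not part of the codebase; the defaults mirror the monitor's):

```python
import os

def load_settings(env=None):
    """Read configuration from the environment with safe fallbacks."""
    env = os.environ if env is None else env
    return {
        "llama_url": env.get("LLAMA_URL", "http://localhost:11434/v1/chat/completions"),
        "batch_size": int(env.get("BATCH_SIZE", "100")),
        "llm_interval": int(env.get("LLM_INTERVAL", "25")),
    }
```

With `env_file: .env` in docker-compose, the variables land in the container's environment, so the same lookups work in Docker and standalone alike.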

39
.env.example Normal file

@@ -0,0 +1,39 @@
# Web Log Monitor Configuration
# Copy this file to .env and fill in your values.
# LLM Configuration
LLAMA_URL=http://athena.lan:11434/v1/chat/completions
MODEL=Qwen3-8B-Q6_K
# Gotify Configuration
GOTIFY_URL=https://notify.thecozycat.net/message
GOTIFY_TOKEN=YOUR_TOKEN_HERE
# Log Source Configuration
# LOG_MODE: "local" for direct file access (Docker), "ssh" for remote access
LOG_PATH=/logs/access.log
LOG_MODE=local
# SSH settings (only used if LOG_MODE=ssh)
BARGE_HOST=barge.lan
# Abuse log for fail2ban
ABUSE_LOG=/data/abuse.log
# State file directory
STATE_DIR=/data
# Processing Settings
BATCH_SIZE=100
MAX_LINES_PER_RUN=1000
# Daemon Settings
LLM_INTERVAL=25
TAIL_POLL_INTERVAL=1
# Threat Database (SQL Server)
# Set all four values to enable historical threat tracking.
#DB_SERVER=barge.lan,1433
#DB_NAME=ThreatDB
#DB_USER=weblogmonitor
#DB_PASSWORD=your_password

5
.gitignore vendored Normal file

@@ -0,0 +1,5 @@
.env
config.py
__pycache__/
data/
cron.log

27
Dockerfile Normal file

@@ -0,0 +1,27 @@
FROM python:3.12-slim-bookworm
WORKDIR /app
# Install Microsoft ODBC Driver 18 for SQL Server
RUN apt-get update \
&& apt-get install -y --no-install-recommends curl gnupg2 apt-transport-https \
&& curl -fsSL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor -o /usr/share/keyrings/microsoft-prod.gpg \
&& echo "deb [arch=amd64 signed-by=/usr/share/keyrings/microsoft-prod.gpg] https://packages.microsoft.com/debian/12/prod bookworm main" > /etc/apt/sources.list.d/mssql-release.list \
&& apt-get update \
&& ACCEPT_EULA=Y apt-get install -y --no-install-recommends msodbcsql18 unixodbc-dev \
&& apt-get purge -y curl gnupg2 apt-transport-https \
&& apt-get autoremove -y \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY web-log-monitor.py .
COPY threat_db.py .
# Create state directory
RUN mkdir -p /data
CMD ["python3", "-u", "web-log-monitor.py"]

98
README.md Normal file

@@ -0,0 +1,98 @@
# Web Log Security Monitor
Analyzes Traefik access logs using a local LLM (llama.cpp) and sends alerts via Gotify when suspicious activity is detected.
## Docker Setup (Recommended)
Run alongside Traefik on barge.lan:
1. Copy the project to barge:
```bash
scp -r . barge.lan:/mnt/docker/web-log-monitor/
```
2. Create config from the template:
```bash
ssh barge.lan
cd /mnt/docker/web-log-monitor
cp .env.example .env
nano .env  # Add your GOTIFY_TOKEN
```
3. Start the container:
```bash
docker compose up -d
```
4. View logs:
```bash
docker logs -f web-log-monitor
```
## Standalone Setup
For running on athena.lan (via SSH to barge):
1. Copy the env template, add your Gotify token, and export the variables:
```bash
cp .env.example .env
nano .env  # Add your GOTIFY_TOKEN
set -a; . ./.env; set +a  # export the variables into the shell
```
2. Test manually:
```bash
python3 web-log-monitor.py --once --verbose --dry-run
```
3. Add to cron (hourly):
```bash
crontab -e
# Add: 0 * * * * cd /path/to/web-log-monitor && python3 web-log-monitor.py --once
```
## Configuration
Set these in `.env` (see `.env.example`):
| Setting | Description |
|---------|-------------|
| `LLAMA_URL` | llama.cpp server endpoint |
| `MODEL` | Model name to use |
| `GOTIFY_URL` | Gotify server URL |
| `GOTIFY_TOKEN` | Gotify app token |
| `LOG_MODE` | `"local"` or `"ssh"` |
| `LOG_PATH` | Path to access.log |
| `BARGE_HOST` | SSH host (only for ssh mode) |
| `ABUSE_LOG` | Abuse log written for fail2ban |
| `STATE_DIR` | Directory for state file |
| `BATCH_SIZE` | Lines per LLM call |
| `MAX_LINES_PER_RUN` | Max lines per `--once` run |
| `LLM_INTERVAL` | Seconds between LLM flushes (daemon mode) |
| `TAIL_POLL_INTERVAL` | Log file poll interval in seconds |
| `DB_SERVER`, `DB_NAME`, `DB_USER`, `DB_PASSWORD` | Optional SQL Server threat database |
## Command Line Options
```
python3 web-log-monitor.py [OPTIONS]
-v, --verbose Show detailed log statistics
--dry-run Analyze without sending alerts or updating state
--once Run a single analysis pass and exit (cron mode)
```
## How It Works
1. Reads new logs (local file or via SSH)
2. Checks for obvious attack patterns (immediate alerts)
3. Filters noise (health checks, static assets)
4. Sends remaining logs to LLM for analysis
5. Consolidates findings and alerts via Gotify
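The pre-filter in step 2 can be sketched as follows (a simplified illustration with a small subset of patterns; the monitor's actual list is longer and the `quick_check` helper name is hypothetical):

```python
import re

# Illustrative subset of the regex pre-filter patterns
ALERT_PATTERNS = [
    (r"union\s+select", "SQL Injection"),
    (r"\.\./", "Path Traversal"),
    (r"/etc/passwd", "LFI Attempt"),
]

def quick_check(line):
    """Return the attack type if a log line matches an obvious attack pattern."""
    for pattern, attack_type in ALERT_PATTERNS:
        if re.search(pattern, line, re.IGNORECASE):
            return attack_type
    return None
```

Lines that match are alerted on immediately; everything that survives the noise filter in step 3 goes to the LLM.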
## Files
```
├── Dockerfile
├── docker-compose.yml
├── .env              # Your config (gitignored)
├── .env.example      # Configuration template
├── requirements.txt
├── web-log-monitor.py
├── threat_db.py
└── systemd/          # Optional systemd units
```

15
docker-compose.yml Normal file

@@ -0,0 +1,15 @@
services:
web-log-monitor:
build: .
container_name: web-log-monitor
restart: unless-stopped
env_file: .env
volumes:
# Mount Traefik logs (read-only)
- /mnt/docker/traefik/logs:/logs:ro
# Persist state between restarts
- ./data:/data
environment:
- TZ=America/New_York
# Runs as a long-lived daemon by default; use --once for single-run mode
command: ["python3", "-u", "web-log-monitor.py"]

2
requirements.txt Normal file

@@ -0,0 +1,2 @@
requests>=2.28.0
pyodbc>=5.0.0


@@ -0,0 +1,13 @@
[Unit]
Description=Web Log Security Monitor
After=network.target
[Service]
Type=oneshot
User=aj
# Config now comes from environment variables (.env replaces config.py)
EnvironmentFile=/home/aj/web-log-monitor/.env
ExecStart=/usr/bin/python3 /home/aj/web-log-monitor/web-log-monitor.py --once
WorkingDirectory=/home/aj/web-log-monitor
# Logging
StandardOutput=append:/var/log/web-log-monitor.log
StandardError=append:/var/log/web-log-monitor.log


@@ -0,0 +1,10 @@
[Unit]
Description=Run Web Log Security Monitor every 10 minutes
[Timer]
OnBootSec=5min
OnUnitActiveSec=10min
Persistent=true
[Install]
WantedBy=timers.target

223
threat_db.py Normal file

@@ -0,0 +1,223 @@
"""
Threat Database Module
Stores and queries historical abuse events in SQL Server on barge.lan.
Maintains a per-IP known_threats profile updated automatically on each event.
"""
import pyodbc
from datetime import datetime, timedelta
# Module-level connection string, set by init_db()
_conn_str = None
def init_db(server, database, username, password):
"""Initialize the database connection string and ensure schema exists."""
global _conn_str
_conn_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={server};"
f"DATABASE={database};"
f"UID={username};"
f"PWD={password};"
f"TrustServerCertificate=yes;"
)
_ensure_schema()
def _get_connection():
"""Get a new database connection."""
if _conn_str is None:
raise RuntimeError("Database not initialized. Call init_db() first.")
return pyodbc.connect(_conn_str)
def _ensure_schema():
"""Create tables if they don't exist."""
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'abuse_events')
BEGIN
CREATE TABLE abuse_events (
id INT IDENTITY(1,1) PRIMARY KEY,
timestamp DATETIME2 NOT NULL,
ip VARCHAR(45) NOT NULL,
attack_type VARCHAR(100) NOT NULL,
source VARCHAR(10) NOT NULL,
severity VARCHAR(10),
evidence VARCHAR(500)
);
CREATE INDEX idx_abuse_events_ip ON abuse_events(ip);
CREATE INDEX idx_abuse_events_timestamp ON abuse_events(timestamp);
END
IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'known_threats')
BEGIN
CREATE TABLE known_threats (
ip VARCHAR(45) PRIMARY KEY,
first_seen DATETIME2 NOT NULL,
last_seen DATETIME2 NOT NULL,
total_events INT NOT NULL DEFAULT 1,
attack_types VARCHAR(500) NOT NULL,
threat_level VARCHAR(10) NOT NULL DEFAULT 'low',
banned BIT NOT NULL DEFAULT 0,
notes VARCHAR(1000)
);
END
""")
conn.commit()
def _compute_threat_level(total_events, attack_types_list):
"""Determine threat level based on event count and attack diversity."""
unique_types = len(attack_types_list)
if total_events >= 20 or unique_types >= 4:
return "critical"
if total_events >= 10 or unique_types >= 3:
return "high"
if total_events >= 3 or unique_types >= 2:
return "medium"
return "low"
def _update_known_threat(cursor, ip, attack_type, now):
"""Insert or update the known_threats record for an IP."""
cursor.execute("SELECT total_events, attack_types FROM known_threats WHERE ip = ?", ip)
row = cursor.fetchone()
if row is None:
level = _compute_threat_level(1, [attack_type])
cursor.execute(
"INSERT INTO known_threats (ip, first_seen, last_seen, total_events, attack_types, threat_level) "
"VALUES (?, ?, ?, 1, ?, ?)",
ip, now, now, attack_type, level
)
else:
new_count = row.total_events + 1
existing_types = [t.strip() for t in row.attack_types.split(",") if t.strip()]
if attack_type not in existing_types:
existing_types.append(attack_type)
types_str = ", ".join(existing_types)
# Truncate to fit column
if len(types_str) > 500:
types_str = types_str[:497] + "..."
level = _compute_threat_level(new_count, existing_types)
cursor.execute(
"UPDATE known_threats SET last_seen = ?, total_events = ?, attack_types = ?, threat_level = ? "
"WHERE ip = ?",
now, new_count, types_str, level, ip
)
def record_event(ip, attack_type, source, severity=None, evidence=None):
"""Insert an abuse event and update the known_threats profile for the IP."""
now = datetime.utcnow()
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO abuse_events (timestamp, ip, attack_type, source, severity, evidence) "
"VALUES (?, ?, ?, ?, ?, ?)",
now, ip, attack_type, source, severity, evidence[:500] if evidence else None
)
_update_known_threat(cursor, ip, attack_type, now)
conn.commit()
def lookup_ip(ip):
"""Look up historical threat data for an IP address.
Returns the known_threats profile if it exists, otherwise indicates unknown.
"""
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT first_seen, last_seen, total_events, attack_types, threat_level, banned, notes "
"FROM known_threats WHERE ip = ?", ip
)
row = cursor.fetchone()
if row is None:
return {"ip": ip, "known_threat": False, "total_events": 0}
return {
"ip": ip,
"known_threat": True,
"first_seen": row.first_seen.isoformat(),
"last_seen": row.last_seen.isoformat(),
"total_events": row.total_events,
"attack_types": row.attack_types,
"threat_level": row.threat_level,
"banned": bool(row.banned),
"notes": row.notes,
}
def get_threat_summary(hours=24):
"""Get aggregate threat statistics for the last N hours."""
since = datetime.utcnow() - timedelta(hours=hours)
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT COUNT(*) FROM abuse_events WHERE timestamp >= ?", since
)
total = cursor.fetchone()[0]
cursor.execute(
"SELECT COUNT(DISTINCT ip) FROM abuse_events WHERE timestamp >= ?", since
)
unique_ips = cursor.fetchone()[0]
cursor.execute(
"SELECT TOP 10 attack_type, COUNT(*) as cnt FROM abuse_events "
"WHERE timestamp >= ? GROUP BY attack_type ORDER BY cnt DESC", since
)
top_attacks = {row.attack_type: row.cnt for row in cursor.fetchall()}
cursor.execute(
"SELECT TOP 10 ip, COUNT(*) as cnt FROM abuse_events "
"WHERE timestamp >= ? GROUP BY ip ORDER BY cnt DESC", since
)
top_ips = {row.ip: row.cnt for row in cursor.fetchall()}
# Include repeat offenders from known_threats
cursor.execute(
"SELECT TOP 5 ip, total_events, threat_level, attack_types "
"FROM known_threats ORDER BY total_events DESC"
)
top_offenders = [
{"ip": r.ip, "total_events": r.total_events, "threat_level": r.threat_level, "attack_types": r.attack_types}
for r in cursor.fetchall()
]
return {
"hours": hours,
"total_events": total,
"unique_ips": unique_ips,
"top_attack_types": top_attacks,
"top_source_ips": top_ips,
"top_known_offenders": top_offenders,
}
def get_recent_attacks(hours=1):
"""Get list of recent attack events from the last N hours."""
since = datetime.utcnow() - timedelta(hours=hours)
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT TOP 50 timestamp, ip, attack_type, source, severity, evidence "
"FROM abuse_events WHERE timestamp >= ? ORDER BY timestamp DESC", since
)
return [
{
"timestamp": row.timestamp.isoformat(),
"ip": row.ip,
"attack_type": row.attack_type,
"source": row.source,
"severity": row.severity,
"evidence": row.evidence,
}
for row in cursor.fetchall()
]

790
web-log-monitor.py Normal file

@@ -0,0 +1,790 @@
#!/usr/bin/env python3
"""
Web Log Security Monitor
Real-time daemon that analyzes Traefik access logs using local LLM on athena.lan.
Sends alerts via Gotify when suspicious activity is detected.
Uses LLM tool calling to query historical threat data for context-aware analysis.
"""
import argparse
import os
import signal
import subprocess
import requests
import json
import re
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
import ipaddress
import threat_db
# Configuration from environment variables
LLAMA_URL = os.environ.get("LLAMA_URL", "http://localhost:11434/v1/chat/completions")
MODEL = os.environ.get("MODEL", "Qwen3-8B-Q6_K")
GOTIFY_URL = os.environ.get("GOTIFY_URL", "")
GOTIFY_TOKEN = os.environ.get("GOTIFY_TOKEN", "")
LOG_PATH = os.environ.get("LOG_PATH", "/logs/access.log")
LOG_MODE = os.environ.get("LOG_MODE", "local")
BARGE_HOST = os.environ.get("BARGE_HOST", "barge.lan")
ABUSE_LOG = os.environ.get("ABUSE_LOG", "/data/abuse.log")
STATE_DIR = os.environ.get("STATE_DIR", "/data")
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100"))
MAX_LINES_PER_RUN = int(os.environ.get("MAX_LINES_PER_RUN", "1000"))
LLM_INTERVAL = int(os.environ.get("LLM_INTERVAL", "25"))
TAIL_POLL_INTERVAL = int(os.environ.get("TAIL_POLL_INTERVAL", "1"))
DB_SERVER = os.environ.get("DB_SERVER")
DB_NAME = os.environ.get("DB_NAME")
DB_USER = os.environ.get("DB_USER")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
# State file location
if STATE_DIR:
STATE_FILE = Path(STATE_DIR) / ".web-log-monitor-state"
else:
SCRIPT_DIR = Path(__file__).parent
STATE_FILE = SCRIPT_DIR / ".web-log-monitor-state"
# Pre-filter patterns to reduce LLM load (obvious attacks get flagged immediately)
IMMEDIATE_ALERT_PATTERNS = [
(r"(?:union\s+select|select\s+.*\s+from|insert\s+into|delete\s+from|drop\s+table)", "SQL Injection"),
(r"(?:\.\./|\.\.\\|%2e%2e%2f|%2e%2e/|\.\.%2f)", "Path Traversal"),
(r"(?:<script|javascript:|onerror=|onload=|%3cscript)", "XSS Attempt"),
(r"(?:/etc/passwd|/etc/shadow|/proc/self)", "LFI Attempt"),
(r"(?:;\s*(?:ls|cat|wget|curl|bash|sh|nc)\s)", "Command Injection"),
(r"(?:wp-login|wp-admin|xmlrpc\.php).*(?:POST|HEAD)", "WordPress Scan"),
(r"(?:phpmyadmin|pma|mysqladmin)", "DB Admin Scan"),
(r"(?:\.env|\.git/|\.aws/|\.ssh/)", "Sensitive File Probe"),
]
SYSTEM_PROMPT = """/no_think
You are a security analyst reviewing web server access logs. You have access to a historical threat database that you can query using tool calls.
Analyze the logs for suspicious patterns including:
1. Reconnaissance: Port scans, directory enumeration, technology fingerprinting
2. Brute force: Repeated auth failures, credential stuffing patterns
3. Injection attempts: SQL, command, LDAP, XPath injection
4. Path traversal: Attempts to access files outside webroot
5. Scanner signatures: Known vulnerability scanners, malicious bots
6. Anomalies: Unusual request patterns, suspicious user agents, weird timing
IMPORTANT: When you see suspicious IPs, use the lookup_ip tool to check if they are repeat offenders. Use get_threat_summary to understand the current threat landscape. Use get_recent_attacks to check for ongoing attack campaigns. This historical context should inform your severity ratings — repeat offenders and IPs involved in multiple attack types should be rated more severely.
After your analysis (and any tool calls), respond with JSON only (no markdown, no explanation):
{
"suspicious": true/false,
"findings": [
{
"severity": "low|medium|high|critical",
"type": "category of attack",
"ip": "source IP if identifiable",
"evidence": "specific log line or pattern",
"recommendation": "brief action to take"
}
],
"summary": "one sentence overview"
}
If nothing suspicious, return: {"suspicious": false, "findings": [], "summary": "No suspicious activity detected"}
Be conservative - normal traffic like health checks, static assets, and authenticated user activity is not suspicious."""
# Tool definitions for the LLM (OpenAI function-calling format)
LLM_TOOLS = [
{
"type": "function",
"function": {
"name": "lookup_ip",
"description": "Look up historical threat data for an IP address. Returns past event count, attack types, and first/last seen timestamps. Use this to check if an IP is a repeat offender.",
"parameters": {
"type": "object",
"properties": {
"ip": {
"type": "string",
"description": "The IP address to look up"
}
},
"required": ["ip"]
}
}
},
{
"type": "function",
"function": {
"name": "get_threat_summary",
"description": "Get aggregate threat statistics for the last N hours. Returns total events, unique IPs, top attack types, and top source IPs.",
"parameters": {
"type": "object",
"properties": {
"hours": {
"type": "integer",
"description": "Number of hours to look back (default 24)"
}
},
"required": []
}
}
},
{
"type": "function",
"function": {
"name": "get_recent_attacks",
"description": "Get list of recent attack events. Use to check for ongoing attack campaigns or patterns.",
"parameters": {
"type": "object",
"properties": {
"hours": {
"type": "integer",
"description": "Number of hours to look back (default 1)"
}
},
"required": []
}
}
},
]
# Graceful shutdown flag
_shutdown = False
def _handle_signal(signum, frame):
global _shutdown
print(f"\n[{datetime.now().isoformat()}] Received signal {signum}, shutting down...")
_shutdown = True
def get_last_position():
"""Get the last processed byte position from state file."""
if STATE_FILE.exists():
try:
return int(STATE_FILE.read_text().strip())
except (ValueError, IOError):
pass
return 0
def save_position(position):
"""Save current byte position to state file."""
STATE_FILE.write_text(str(position))
def tail_log(path):
"""Generator that yields new lines from a log file, handling rotation.
Uses byte-position tracking. On each poll:
- If file shrank (rotation), reset to beginning.
- Read any new bytes, yield complete lines.
"""
pos = get_last_position()
while not _shutdown:
try:
size = os.path.getsize(path)
except OSError:
time.sleep(TAIL_POLL_INTERVAL)
continue
# File shrank — log was rotated
if size < pos:
print(f"[{datetime.now().isoformat()}] Log rotation detected, resetting to beginning")
pos = 0
if size == pos:
time.sleep(TAIL_POLL_INTERVAL)
continue
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
f.seek(pos)
data = f.read()
new_pos = f.tell()
except OSError as e:
print(f"Error reading log file: {e}")
time.sleep(TAIL_POLL_INTERVAL)
continue
# Split into lines, keeping any incomplete trailing line for next read
lines = data.split("\n")
if data.endswith("\n"):
# All lines complete
complete_lines = [l for l in lines if l]
pos = new_pos
else:
# Last line is incomplete — don't advance past it
complete_lines = [l for l in lines[:-1] if l]
pos = new_pos - len(lines[-1].encode("utf-8"))
for line in complete_lines:
yield line, pos
# Generator exits on shutdown
def fetch_logs(start_line, max_lines, total_lines=None):
"""Fetch logs from log file (local or via SSH). Used in --once mode."""
if LOG_MODE == "local":
try:
# Use tail for efficient reading from end of large files
if total_lines is None:
total_lines = get_total_lines()
lines_from_end = total_lines - start_line
if lines_from_end <= 0:
return []
cmd = f"tail -n {lines_from_end} '{LOG_PATH}' | head -n {max_lines}"
result = subprocess.run(cmd, shell=True, capture_output=True, encoding="utf-8", errors="replace", timeout=60)
if result.returncode != 0:
raise RuntimeError(f"Failed to read logs: {result.stderr}")
return result.stdout.strip().split("\n") if result.stdout.strip() else []
except Exception as e:
raise RuntimeError(f"Failed to read logs: {e}")
else:
cmd = f"ssh {BARGE_HOST} \"tail -n +{start_line + 1} {LOG_PATH} | head -n {max_lines}\""
result = subprocess.run(cmd, shell=True, capture_output=True, encoding="utf-8", errors="replace", timeout=60)
if result.returncode != 0:
raise RuntimeError(f"Failed to fetch logs: {result.stderr}")
return result.stdout.strip().split("\n") if result.stdout.strip() else []
def get_total_lines():
"""Get total line count of the log file."""
if LOG_MODE == "local":
cmd = f"wc -l < '{LOG_PATH}'"
result = subprocess.run(cmd, shell=True, capture_output=True, encoding="utf-8", errors="replace", timeout=30)
if result.returncode != 0:
raise RuntimeError(f"Failed to count lines: {result.stderr}")
return int(result.stdout.strip())
else:
cmd = f"ssh {BARGE_HOST} \"wc -l < {LOG_PATH}\""
result = subprocess.run(cmd, shell=True, capture_output=True, encoding="utf-8", errors="replace", timeout=30)
if result.returncode != 0:
raise RuntimeError(f"Failed to count lines: {result.stderr}")
return int(result.stdout.strip())
def parse_log_line(line):
"""Parse a Traefik common log format line."""
pattern = r'^(\S+) - - \[([^\]]+)\] "([^"]+)" (\d+) (\d+) "([^"]*)" "([^"]*)" (\d+) "([^"]*)" "([^"]*)" (\d+)ms$'
match = re.match(pattern, line)
if match:
return {
"ip": match.group(1),
"timestamp": match.group(2),
"request": match.group(3),
"status": int(match.group(4)),
"bytes": int(match.group(5)),
"referer": match.group(6),
"user_agent": match.group(7),
"request_id": match.group(8),
"service": match.group(9),
"backend": match.group(10),
"latency_ms": int(match.group(11)),
}
return None
def is_private_ip(ip_str):
"""Check if an IP address is private/internal."""
try:
return ipaddress.ip_address(ip_str).is_private
except ValueError:
return False
def log_abuse(ip, attack_type):
"""Write abuse detection to log file for fail2ban consumption."""
if not ABUSE_LOG:
return
if is_private_ip(ip):
return
timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
entry = f"{timestamp} web-log-monitor: abuse from {ip} - {attack_type}\n"
try:
with open(ABUSE_LOG, "a") as f:
f.write(entry)
except OSError as e:
print(f"Failed to write abuse log: {e}")
def check_immediate_alerts(logs, record=True):
"""Check for patterns that warrant immediate alerting without LLM.
When record=True (daemon mode), also records events to the threat DB.
"""
alerts = []
for line in logs:
for pattern, attack_type in IMMEDIATE_ALERT_PATTERNS:
if re.search(pattern, line, re.IGNORECASE):
parsed = parse_log_line(line)
if parsed:
log_abuse(parsed["ip"], attack_type)
if record and DB_SERVER:
try:
threat_db.record_event(
parsed["ip"], attack_type, "regex",
severity="high", evidence=line[:500]
)
except Exception as e:
print(f"Failed to record event to DB: {e}")
alerts.append({
"severity": "high",
"type": attack_type,
"evidence": line[:200],
"recommendation": "Review and consider blocking source IP"
})
break # One alert per line
return alerts
def filter_noise(logs):
"""Filter out known-good traffic to reduce LLM processing."""
filtered = []
for line in logs:
parsed = parse_log_line(line)
if not parsed:
continue
# Skip internal health checks from Firewalla/monitoring
if parsed["ip"] == "192.168.10.1" and "/health" in parsed["request"]:
continue
# Skip successful static asset requests
if parsed["status"] == 200 and re.search(r"\.(css|js|png|jpg|ico|woff|svg)(\?|$)", parsed["request"]):
continue
# Skip public Gitea repository browsing (actual attacks still caught by regex alerts)
if "gitea" in parsed["service"].lower():
continue
# Keep everything else for analysis
filtered.append(line)
return filtered
def execute_tool_call(name, arguments):
"""Execute an LLM tool call against the threat database."""
try:
if name == "lookup_ip":
return threat_db.lookup_ip(arguments["ip"])
elif name == "get_threat_summary":
hours = arguments.get("hours", 24)
return threat_db.get_threat_summary(hours)
elif name == "get_recent_attacks":
hours = arguments.get("hours", 1)
return threat_db.get_recent_attacks(hours)
else:
return {"error": f"Unknown tool: {name}"}
except Exception as e:
return {"error": str(e)}
def analyze_with_llm(logs):
"""Send logs to llama.cpp for analysis with tool-calling support."""
if not logs:
return {"suspicious": False, "findings": [], "summary": "No logs to analyze"}
log_text = "\n".join(logs[:BATCH_SIZE])
messages = [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": f"Analyze these access logs:\n\n{log_text}"}
]
# Include tools only if the threat DB is available
use_tools = DB_SERVER is not None
max_rounds = 3
for round_num in range(max_rounds + 1):
payload = {
"model": MODEL,
"messages": messages,
"temperature": 0.1,
"max_tokens": 1024,
}
if use_tools and round_num < max_rounds:
payload["tools"] = LLM_TOOLS
try:
response = requests.post(LLAMA_URL, json=payload, timeout=120)
response.raise_for_status()
choice = response.json()["choices"][0]
message = choice["message"]
except requests.exceptions.RequestException as e:
print(f"LLM request failed: {e}")
return None
except (KeyError, IndexError) as e:
print(f"Unexpected LLM response format: {e}")
return None
# Check if the LLM wants to call tools
tool_calls = message.get("tool_calls")
if tool_calls and round_num < max_rounds:
# Add the assistant message with tool calls to conversation
messages.append(message)
for tc in tool_calls:
func = tc["function"]
try:
args = json.loads(func["arguments"]) if isinstance(func["arguments"], str) else func["arguments"]
except json.JSONDecodeError:
args = {}
result = execute_tool_call(func["name"], args)
result_json = json.dumps(result)
print(f" Tool call: {func['name']}({args}) -> {result_json[:200]}")
messages.append({
"role": "tool",
"tool_call_id": tc["id"],
"content": result_json
})
# Continue loop — send tool results back to LLM
continue
# Final text response — parse JSON
# content may be null in tool-call-only responses; normalize to ""
content = message.get("content") or ""
try:
json_match = re.search(r'\{[\s\S]*\}', content)
if json_match:
return json.loads(json_match.group())
return {"suspicious": False, "findings": [], "summary": "Failed to parse LLM response"}
except json.JSONDecodeError as e:
print(f"Failed to parse LLM response: {e}")
print(f"Raw response: {content[:500]}")
return None
# Exhausted tool-call rounds without a final answer
print("Warning: LLM exhausted tool-call rounds without a final response")
return {"suspicious": False, "findings": [], "summary": "Analysis incomplete (tool-call limit)"}
def send_alert(title, message, priority=5):
"""Send alert via Gotify."""
if not GOTIFY_URL or not GOTIFY_TOKEN or GOTIFY_TOKEN == "YOUR_TOKEN_HERE":
print(f"[ALERT - Gotify not configured] {title}: {message}")
return
try:
response = requests.post(
GOTIFY_URL,
params={"token": GOTIFY_TOKEN},
json={
"title": title,
"message": message,
"priority": priority,
},
timeout=10
)
response.raise_for_status()
print(f"Alert sent: {title}")
except requests.exceptions.RequestException as e:
print(f"Failed to send Gotify alert: {e}")
def format_findings(findings):
"""Format findings for alert message."""
lines = []
for f in findings[:5]: # Limit to 5 findings per alert
# Use .get() throughout: LLM-produced findings may omit fields
lines.append(f"[{f.get('severity', 'low').upper()}] {f.get('type', 'unknown')}")
if f.get("ip"):
lines.append(f" IP: {f['ip']}")
lines.append(f" Evidence: {f.get('evidence', '')[:100]}...")
lines.append(f" Action: {f.get('recommendation', 'review source IP')}")
lines.append("")
return "\n".join(lines)
def print_log_stats(logs):
"""Print verbose statistics about the logs."""
if not logs:
print("\n No logs to analyze")
return
parsed_logs = [parse_log_line(line) for line in logs]
parsed_logs = [p for p in parsed_logs if p] # Filter None
if not parsed_logs:
print("\n No parseable logs found")
return
# Time range
timestamps = [p["timestamp"] for p in parsed_logs]
print(f"\n{'=' * 60}")
print("LOG STATISTICS")
print(f"{'=' * 60}")
print(f" Time range: {timestamps[0]}")
print(f" {timestamps[-1]}")
print(f" Total entries: {len(parsed_logs)}")
# Status codes
status_counts = Counter(p["status"] for p in parsed_logs)
print(f"\n Status Codes:")
for status, count in sorted(status_counts.items()):
bar = "█" * min(count // 5, 30)
print(f" {status}: {count:>5} {bar}")
# Top IPs
ip_counts = Counter(p["ip"] for p in parsed_logs)
print(f"\n Top Source IPs:")
for ip, count in ip_counts.most_common(5):
pct = (count / len(parsed_logs)) * 100
print(f" {ip:<18} {count:>5} ({pct:.1f}%)")
# Services
service_counts = Counter(p["service"] for p in parsed_logs)
print(f"\n Requests by Service:")
for service, count in service_counts.most_common(10):
pct = (count / len(parsed_logs)) * 100
print(f" {service:<25} {count:>5} ({pct:.1f}%)")
# HTTP Methods
method_counts = Counter()
for p in parsed_logs:
method = p["request"].split()[0] if p["request"] else "UNKNOWN"
method_counts[method] += 1
print(f"\n HTTP Methods:")
for method, count in method_counts.most_common():
print(f" {method:<8} {count:>5}")
# Latency stats
latencies = [p["latency_ms"] for p in parsed_logs]
avg_latency = sum(latencies) / len(latencies)
max_latency = max(latencies)
print(f"\n Latency:")
print(f" Average: {avg_latency:.1f}ms")
print(f" Max: {max_latency}ms")
# Error rate
errors = sum(1 for p in parsed_logs if p["status"] >= 400)
error_rate = (errors / len(parsed_logs)) * 100
print(f"\n Error rate: {error_rate:.1f}% ({errors} requests)")
print(f"{'=' * 60}\n")
def process_batch(logs, args):
"""Process a batch of log lines: immediate alerts + LLM analysis + alerting."""
all_findings = []
# Check for immediate alert patterns
immediate_alerts = check_immediate_alerts(logs)
if immediate_alerts:
all_findings.extend(immediate_alerts)
print(f" Found {len(immediate_alerts)} immediate alert patterns")
# Filter noise and analyze with LLM
filtered_logs = filter_noise(logs)
print(f" Filtered {len(logs)} -> {len(filtered_logs)} logs for LLM analysis")
if filtered_logs:
llm_available = True
for i in range(0, len(filtered_logs), BATCH_SIZE):
if not llm_available:
print(" Skipping remaining batches (LLM unavailable)")
break
batch = filtered_logs[i:i + BATCH_SIZE]
print(f" Analyzing batch {i // BATCH_SIZE + 1} ({len(batch)} logs)...")
result = analyze_with_llm(batch)
if result is None:
llm_available = False
continue
if result.get("suspicious"):
findings = result.get("findings", [])
# Record LLM findings to threat DB
if DB_SERVER:
for f in findings:
ip = f.get("ip")
if ip and not is_private_ip(ip):
try:
threat_db.record_event(
ip, f.get("type", "unknown"), "llm",
severity=f.get("severity"),
evidence=f.get("evidence", "")[:500]
)
except Exception as e:
print(f" Failed to record LLM finding to DB: {e}")
log_abuse(ip, f.get("type", "unknown"))
all_findings.extend(findings)
print(f" LLM found suspicious activity: {result.get('summary')}")
# Send consolidated alert if findings exist
if all_findings:
severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
all_findings.sort(key=lambda x: severity_order.get(x.get("severity", "low"), 3))
max_severity = all_findings[0].get("severity", "low")
priority = {"critical": 10, "high": 8, "medium": 5, "low": 3}.get(max_severity, 5)
alert_msg = format_findings(all_findings)
if args.verbose:
print(f"\n{'=' * 60}")
print("FINDINGS")
print(f"{'=' * 60}")
print(alert_msg)
if not args.dry_run:
send_alert(
f"Web Security Alert ({len(all_findings)} findings)",
alert_msg,
priority=priority
)
else:
print(f" [Dry run] Would send alert with {len(all_findings)} findings")
else:
print(" No suspicious activity detected")
def run_daemon(args):
"""Run as a long-lived daemon, tailing the log file in real time."""
print(f"[{datetime.now().isoformat()}] Starting daemon mode")
print(f" Log file: {LOG_PATH}")
print(f" LLM interval: {LLM_INTERVAL}s")
print(f" Poll interval: {TAIL_POLL_INTERVAL}s")
if DB_SERVER:
print(f" Threat DB: {DB_SERVER}/{DB_NAME}")
else:
print(" Threat DB: disabled (no DB_SERVER configured)")
buffer = []
last_flush = time.monotonic()
last_save = time.monotonic()
current_pos = get_last_position()
for line, pos in tail_log(LOG_PATH):
current_pos = pos
# Immediate regex check on every line
immediate = check_immediate_alerts([line])
if immediate:
print(f"[{datetime.now().isoformat()}] Immediate alert: {immediate[0]['type']}")
buffer.append(line)
now = time.monotonic()
# Flush buffer to LLM at interval
if now - last_flush >= LLM_INTERVAL and buffer:
# Cap to BATCH_SIZE to avoid overwhelming the LLM with huge backlogs.
# Remaining lines carry forward to the next interval.
flush_batch = buffer[:BATCH_SIZE]
buffer = buffer[BATCH_SIZE:]
if buffer:
print(f"\n[{datetime.now().isoformat()}] Flushing {len(flush_batch)} lines to LLM ({len(buffer)} deferred)...")
else:
print(f"\n[{datetime.now().isoformat()}] Flushing {len(flush_batch)} lines to LLM...")
if args.verbose:
print_log_stats(flush_batch)
process_batch(flush_batch, args)
# Reset timer AFTER processing so we don't immediately flush again
last_flush = time.monotonic()
# Save position periodically
if now - last_save >= 30:
if not args.dry_run:
save_position(current_pos)
last_save = now
# Shutdown: flush remaining buffer and save state
if buffer:
print(f"\n[{datetime.now().isoformat()}] Shutdown: flushing {len(buffer)} remaining lines...")
process_batch(buffer, args)
if not args.dry_run:
save_position(current_pos)
print(f"Final position saved: {current_pos}")
def run_once(args):
"""Run a single analysis pass (original cron-job behavior)."""
print(f"[{datetime.now().isoformat()}] Starting single-run analysis...")
if args.dry_run:
print("(Dry run - no alerts will be sent, state will not be updated)")
# Get current position
last_pos = get_last_position()
total_lines = get_total_lines()
# If first run or log rotated, start from recent logs
if last_pos == 0 or last_pos > total_lines:
last_pos = max(0, total_lines - MAX_LINES_PER_RUN)
print(f"Starting from line {last_pos} (total: {total_lines})")
lines_to_process = min(total_lines - last_pos, MAX_LINES_PER_RUN)
if lines_to_process <= 0:
print("No new logs to process")
return
print(f"Processing {lines_to_process} new log lines...")
# Fetch logs
logs = fetch_logs(last_pos, lines_to_process, total_lines)
if not logs:
print("No logs fetched")
return
# Show verbose stats if requested
if args.verbose:
print_log_stats(logs)
process_batch(logs, args)
# Save position
if not args.dry_run:
save_position(last_pos + len(logs))
print(f"Processed up to line {last_pos + len(logs)}")
else:
print(f"[Dry run] Would save position: {last_pos + len(logs)}")
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="Analyze Traefik access logs for suspicious activity using LLM"
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Show detailed log statistics"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Analyze logs but don't send alerts or update state"
)
parser.add_argument(
"--once",
action="store_true",
help="Run a single analysis pass and exit (original cron-job mode)"
)
return parser.parse_args()
def main():
args = parse_args()
# Initialize threat database if configured
if DB_SERVER and DB_NAME and DB_USER and DB_PASSWORD:
try:
threat_db.init_db(DB_SERVER, DB_NAME, DB_USER, DB_PASSWORD)
print(f"Threat database initialized: {DB_SERVER}/{DB_NAME}")
except Exception as e:
print(f"Warning: Failed to initialize threat database: {e}")
print("Continuing without historical threat data.")
else:
print("Threat database not configured — running without historical data.")
if args.once:
run_once(args)
else:
# Daemon mode — register signal handlers for graceful shutdown
signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)
run_daemon(args)
if __name__ == "__main__":
main()