commit e3ee9fc1936b9f346d42b0ffa72f1e9e8855014c Author: AJ Isaacs Date: Sun Feb 8 19:29:09 2026 -0500 Replace config.py with .env for Docker-standard configuration Config was a Python file baked into the image or bind-mounted, requiring a rebuild or manual file management for any settings change. Now uses env_file in docker-compose with os.environ.get() calls, so config changes only need a container restart. Also filters Gitea traffic from LLM analysis to prevent false positive reconnaissance alerts on normal repository browsing. Co-Authored-By: Claude Opus 4.6 diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..5fe80ec --- /dev/null +++ b/.env.example @@ -0,0 +1,39 @@ +# Web Log Monitor Configuration +# Copy this file to .env and fill in your values. + +# LLM Configuration +LLAMA_URL=http://athena.lan:11434/v1/chat/completions +MODEL=Qwen3-8B-Q6_K + +# Gotify Configuration +GOTIFY_URL=https://notify.thecozycat.net/message +GOTIFY_TOKEN=YOUR_TOKEN_HERE + +# Log Source Configuration +# LOG_MODE: "local" for direct file access (Docker), "ssh" for remote access +LOG_PATH=/logs/access.log +LOG_MODE=local + +# SSH settings (only used if LOG_MODE=ssh) +BARGE_HOST=barge.lan + +# Abuse log for fail2ban +ABUSE_LOG=/data/abuse.log + +# State file directory +STATE_DIR=/data + +# Processing Settings +BATCH_SIZE=100 +MAX_LINES_PER_RUN=1000 + +# Daemon Settings +LLM_INTERVAL=25 +TAIL_POLL_INTERVAL=1 + +# Threat Database (SQL Server) +# Set all four values to enable historical threat tracking. 
+#DB_SERVER=barge.lan,1433 +#DB_NAME=ThreatDB +#DB_USER=weblogmonitor +#DB_PASSWORD=your_password diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e0aa205 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.env +config.py +__pycache__/ +data/ +cron.log diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8a87bc6 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,27 @@ +FROM python:3.12-slim-bookworm + +WORKDIR /app + +# Install Microsoft ODBC Driver 18 for SQL Server +RUN apt-get update \ + && apt-get install -y --no-install-recommends curl gnupg2 apt-transport-https \ + && curl -fsSL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor -o /usr/share/keyrings/microsoft-prod.gpg \ + && echo "deb [arch=amd64 signed-by=/usr/share/keyrings/microsoft-prod.gpg] https://packages.microsoft.com/debian/12/prod bookworm main" > /etc/apt/sources.list.d/mssql-release.list \ + && apt-get update \ + && ACCEPT_EULA=Y apt-get install -y --no-install-recommends msodbcsql18 unixodbc-dev \ + && apt-get purge -y curl gnupg2 apt-transport-https \ + && apt-get autoremove -y \ + && rm -rf /var/lib/apt/lists/* + +# Install Python dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application +COPY web-log-monitor.py . +COPY threat_db.py . + +# Create state directory +RUN mkdir -p /data + +CMD ["python3", "-u", "web-log-monitor.py"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..53e344e --- /dev/null +++ b/README.md @@ -0,0 +1,98 @@ +# Web Log Security Monitor + +Analyzes Traefik access logs using a local LLM (llama.cpp) and sends alerts via Gotify when suspicious activity is detected. + +## Docker Setup (Recommended) + +Run alongside Traefik on barge.lan: + +1. Copy the project to barge: + ```bash + scp -r . barge.lan:/mnt/docker/web-log-monitor/ + ``` + +2. 
Create config from template:
+   ```bash
+   ssh barge.lan
+   cd /mnt/docker/web-log-monitor
+   cp .env.example .env
+   nano .env  # Add your GOTIFY_TOKEN
+   ```
+
+3. Start the container:
+   ```bash
+   docker compose up -d
+   ```
+
+4. View logs:
+   ```bash
+   docker logs -f web-log-monitor
+   ```
+
+## Standalone Setup
+
+For running on athena.lan (via SSH to barge):
+
+1. Copy the env template and add your Gotify token (export the variables before running):
+   ```bash
+   cp .env.example .env
+   nano .env  # Add your GOTIFY_TOKEN
+   ```
+
+2. Test manually:
+   ```bash
+   python3 web-log-monitor.py --once --verbose --dry-run
+   ```
+
+3. Add to cron (hourly):
+   ```bash
+   crontab -e
+   # Add: 0 * * * * cd /path/to/web-log-monitor && python3 web-log-monitor.py --once
+   ```
+
+## Configuration
+
+Edit `.env`:
+
+| Setting | Description |
+|---------|-------------|
+| `LLAMA_URL` | llama.cpp server endpoint |
+| `MODEL` | Model name to use |
+| `GOTIFY_URL` | Gotify server URL |
+| `GOTIFY_TOKEN` | Gotify app token |
+| `LOG_MODE` | `"local"` or `"ssh"` |
+| `LOG_PATH` | Path to access.log |
+| `BARGE_HOST` | SSH host (only for ssh mode) |
+| `STATE_DIR` | Directory for state file |
+| `BATCH_SIZE` | Lines per LLM call |
+| `MAX_LINES_PER_RUN` | Max lines per execution |
+
+## Command Line Options
+
+```
+python3 web-log-monitor.py [OPTIONS]
+
+  -v, --verbose    Show detailed log statistics
+  --dry-run        Analyze without sending alerts or updating state
+  --once           Run a single analysis pass and exit (default: daemon mode)
+```
+
+## How It Works
+
+1. Reads new logs (local file or via SSH)
+2. Checks for obvious attack patterns (immediate alerts)
+3. Filters noise (health checks, static assets)
+4. Sends remaining logs to LLM for analysis
+5. 
Consolidates findings and alerts via Gotify
+
+## Files
+
+```
+├── Dockerfile
+├── docker-compose.yml
+├── .env              # Your config (gitignored)
+├── .env.example      # Template — copy to .env
+├── requirements.txt
+├── web-log-monitor.py
+├── threat_db.py
+└── systemd/          # Optional systemd units
+```
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..05bc988
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,15 @@
+services:
+  web-log-monitor:
+    build: .
+    container_name: web-log-monitor
+    restart: unless-stopped
+    env_file: .env
+    volumes:
+      # Mount Traefik logs (read-only)
+      - /mnt/docker/traefik/logs:/logs:ro
+      # Persist state between restarts
+      - ./data:/data
+    environment:
+      - TZ=America/New_York
+    # Runs as a long-lived daemon by default; use --once for single-run mode
+    command: ["python3", "-u", "web-log-monitor.py"]
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..0b4cb1c
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+requests>=2.28.0
+pyodbc>=5.0.0
diff --git a/systemd/web-log-monitor.service b/systemd/web-log-monitor.service
new file mode 100644
index 0000000..03e4c15
--- /dev/null
+++ b/systemd/web-log-monitor.service
@@ -0,0 +1,13 @@
+[Unit]
+Description=Web Log Security Monitor
+After=network.target
+
+[Service]
+Type=oneshot
+User=aj
+ExecStart=/usr/bin/python3 /home/aj/web-log-monitor/web-log-monitor.py --once
+WorkingDirectory=/home/aj/web-log-monitor
+
+# Logging
+StandardOutput=append:/var/log/web-log-monitor.log
+StandardError=append:/var/log/web-log-monitor.log
diff --git a/systemd/web-log-monitor.timer b/systemd/web-log-monitor.timer
new file mode 100644
index 0000000..0cd1c17
--- /dev/null
+++ b/systemd/web-log-monitor.timer
@@ -0,0 +1,10 @@
+[Unit]
+Description=Run Web Log Security Monitor every 10 minutes
+
+[Timer]
+OnBootSec=5min
+OnUnitActiveSec=10min
+Persistent=true
+
+[Install]
+WantedBy=timers.target
diff --git a/threat_db.py 
b/threat_db.py new file mode 100644 index 0000000..110e584 --- /dev/null +++ b/threat_db.py @@ -0,0 +1,223 @@ +""" +Threat Database Module +Stores and queries historical abuse events in SQL Server on barge.lan. +Maintains a per-IP known_threats profile updated automatically on each event. +""" + +import pyodbc +from datetime import datetime, timedelta + +# Module-level connection string, set by init_db() +_conn_str = None + + +def init_db(server, database, username, password): + """Initialize the database connection string and ensure schema exists.""" + global _conn_str + _conn_str = ( + f"DRIVER={{ODBC Driver 18 for SQL Server}};" + f"SERVER={server};" + f"DATABASE={database};" + f"UID={username};" + f"PWD={password};" + f"TrustServerCertificate=yes;" + ) + _ensure_schema() + + +def _get_connection(): + """Get a new database connection.""" + if _conn_str is None: + raise RuntimeError("Database not initialized. Call init_db() first.") + return pyodbc.connect(_conn_str) + + +def _ensure_schema(): + """Create tables if they don't exist.""" + with _get_connection() as conn: + cursor = conn.cursor() + cursor.execute(""" + IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'abuse_events') + BEGIN + CREATE TABLE abuse_events ( + id INT IDENTITY(1,1) PRIMARY KEY, + timestamp DATETIME2 NOT NULL, + ip VARCHAR(45) NOT NULL, + attack_type VARCHAR(100) NOT NULL, + source VARCHAR(10) NOT NULL, + severity VARCHAR(10), + evidence VARCHAR(500) + ); + CREATE INDEX idx_abuse_events_ip ON abuse_events(ip); + CREATE INDEX idx_abuse_events_timestamp ON abuse_events(timestamp); + END + + IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'known_threats') + BEGIN + CREATE TABLE known_threats ( + ip VARCHAR(45) PRIMARY KEY, + first_seen DATETIME2 NOT NULL, + last_seen DATETIME2 NOT NULL, + total_events INT NOT NULL DEFAULT 1, + attack_types VARCHAR(500) NOT NULL, + threat_level VARCHAR(10) NOT NULL DEFAULT 'low', + banned BIT NOT NULL DEFAULT 0, + notes VARCHAR(1000) + ); + END + """) 
+ conn.commit() + + +def _compute_threat_level(total_events, attack_types_list): + """Determine threat level based on event count and attack diversity.""" + unique_types = len(attack_types_list) + if total_events >= 20 or unique_types >= 4: + return "critical" + if total_events >= 10 or unique_types >= 3: + return "high" + if total_events >= 3 or unique_types >= 2: + return "medium" + return "low" + + +def _update_known_threat(cursor, ip, attack_type, now): + """Insert or update the known_threats record for an IP.""" + cursor.execute("SELECT total_events, attack_types FROM known_threats WHERE ip = ?", ip) + row = cursor.fetchone() + + if row is None: + level = _compute_threat_level(1, [attack_type]) + cursor.execute( + "INSERT INTO known_threats (ip, first_seen, last_seen, total_events, attack_types, threat_level) " + "VALUES (?, ?, ?, 1, ?, ?)", + ip, now, now, attack_type, level + ) + else: + new_count = row.total_events + 1 + existing_types = [t.strip() for t in row.attack_types.split(",") if t.strip()] + if attack_type not in existing_types: + existing_types.append(attack_type) + types_str = ", ".join(existing_types) + # Truncate to fit column + if len(types_str) > 500: + types_str = types_str[:497] + "..." + level = _compute_threat_level(new_count, existing_types) + cursor.execute( + "UPDATE known_threats SET last_seen = ?, total_events = ?, attack_types = ?, threat_level = ? 
" + "WHERE ip = ?", + now, new_count, types_str, level, ip + ) + + +def record_event(ip, attack_type, source, severity=None, evidence=None): + """Insert an abuse event and update the known_threats profile for the IP.""" + now = datetime.utcnow() + with _get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + "INSERT INTO abuse_events (timestamp, ip, attack_type, source, severity, evidence) " + "VALUES (?, ?, ?, ?, ?, ?)", + now, ip, attack_type, source, severity, evidence[:500] if evidence else None + ) + _update_known_threat(cursor, ip, attack_type, now) + conn.commit() + + +def lookup_ip(ip): + """Look up historical threat data for an IP address. + + Returns the known_threats profile if it exists, otherwise indicates unknown. + """ + with _get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT first_seen, last_seen, total_events, attack_types, threat_level, banned, notes " + "FROM known_threats WHERE ip = ?", ip + ) + row = cursor.fetchone() + + if row is None: + return {"ip": ip, "known_threat": False, "total_events": 0} + + return { + "ip": ip, + "known_threat": True, + "first_seen": row.first_seen.isoformat(), + "last_seen": row.last_seen.isoformat(), + "total_events": row.total_events, + "attack_types": row.attack_types, + "threat_level": row.threat_level, + "banned": bool(row.banned), + "notes": row.notes, + } + + +def get_threat_summary(hours=24): + """Get aggregate threat statistics for the last N hours.""" + since = datetime.utcnow() - timedelta(hours=hours) + with _get_connection() as conn: + cursor = conn.cursor() + + cursor.execute( + "SELECT COUNT(*) FROM abuse_events WHERE timestamp >= ?", since + ) + total = cursor.fetchone()[0] + + cursor.execute( + "SELECT COUNT(DISTINCT ip) FROM abuse_events WHERE timestamp >= ?", since + ) + unique_ips = cursor.fetchone()[0] + + cursor.execute( + "SELECT TOP 10 attack_type, COUNT(*) as cnt FROM abuse_events " + "WHERE timestamp >= ? 
GROUP BY attack_type ORDER BY cnt DESC", since + ) + top_attacks = {row.attack_type: row.cnt for row in cursor.fetchall()} + + cursor.execute( + "SELECT TOP 10 ip, COUNT(*) as cnt FROM abuse_events " + "WHERE timestamp >= ? GROUP BY ip ORDER BY cnt DESC", since + ) + top_ips = {row.ip: row.cnt for row in cursor.fetchall()} + + # Include repeat offenders from known_threats + cursor.execute( + "SELECT TOP 5 ip, total_events, threat_level, attack_types " + "FROM known_threats ORDER BY total_events DESC" + ) + top_offenders = [ + {"ip": r.ip, "total_events": r.total_events, "threat_level": r.threat_level, "attack_types": r.attack_types} + for r in cursor.fetchall() + ] + + return { + "hours": hours, + "total_events": total, + "unique_ips": unique_ips, + "top_attack_types": top_attacks, + "top_source_ips": top_ips, + "top_known_offenders": top_offenders, + } + + +def get_recent_attacks(hours=1): + """Get list of recent attack events from the last N hours.""" + since = datetime.utcnow() - timedelta(hours=hours) + with _get_connection() as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT TOP 50 timestamp, ip, attack_type, source, severity, evidence " + "FROM abuse_events WHERE timestamp >= ? ORDER BY timestamp DESC", since + ) + return [ + { + "timestamp": row.timestamp.isoformat(), + "ip": row.ip, + "attack_type": row.attack_type, + "source": row.source, + "severity": row.severity, + "evidence": row.evidence, + } + for row in cursor.fetchall() + ] diff --git a/web-log-monitor.py b/web-log-monitor.py new file mode 100644 index 0000000..dc15e63 --- /dev/null +++ b/web-log-monitor.py @@ -0,0 +1,790 @@ +#!/usr/bin/env python3 +""" +Web Log Security Monitor +Real-time daemon that analyzes Traefik access logs using local LLM on athena.lan. +Sends alerts via Gotify when suspicious activity is detected. +Uses LLM tool calling to query historical threat data for context-aware analysis. 
+""" + +import argparse +import os +import signal +import subprocess +import requests +import json +import re +import time +from collections import Counter +from datetime import datetime +from pathlib import Path +import ipaddress + +import threat_db + +# Configuration from environment variables +LLAMA_URL = os.environ.get("LLAMA_URL", "http://localhost:11434/v1/chat/completions") +MODEL = os.environ.get("MODEL", "Qwen3-8B-Q6_K") +GOTIFY_URL = os.environ.get("GOTIFY_URL", "") +GOTIFY_TOKEN = os.environ.get("GOTIFY_TOKEN", "") +LOG_PATH = os.environ.get("LOG_PATH", "/logs/access.log") +LOG_MODE = os.environ.get("LOG_MODE", "local") +BARGE_HOST = os.environ.get("BARGE_HOST", "barge.lan") +ABUSE_LOG = os.environ.get("ABUSE_LOG", "/data/abuse.log") +STATE_DIR = os.environ.get("STATE_DIR", "/data") +BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "100")) +MAX_LINES_PER_RUN = int(os.environ.get("MAX_LINES_PER_RUN", "1000")) +LLM_INTERVAL = int(os.environ.get("LLM_INTERVAL", "25")) +TAIL_POLL_INTERVAL = int(os.environ.get("TAIL_POLL_INTERVAL", "1")) +DB_SERVER = os.environ.get("DB_SERVER") +DB_NAME = os.environ.get("DB_NAME") +DB_USER = os.environ.get("DB_USER") +DB_PASSWORD = os.environ.get("DB_PASSWORD") + +# State file location +if STATE_DIR: + STATE_FILE = Path(STATE_DIR) / ".web-log-monitor-state" +else: + SCRIPT_DIR = Path(__file__).parent + STATE_FILE = SCRIPT_DIR / ".web-log-monitor-state" + +# Pre-filter patterns to reduce LLM load (obvious attacks get flagged immediately) +IMMEDIATE_ALERT_PATTERNS = [ + (r"(?:union\s+select|select\s+.*\s+from|insert\s+into|delete\s+from|drop\s+table)", "SQL Injection"), + (r"(?:\.\./|\.\.\\|%2e%2e%2f|%2e%2e/|\.\.%2f)", "Path Traversal"), + (r"(?: {result_json[:200]}") + + messages.append({ + "role": "tool", + "tool_call_id": tc["id"], + "content": result_json + }) + + # Continue loop — send tool results back to LLM + continue + + # Final text response — parse JSON + content = message.get("content", "") + try: + json_match = 
re.search(r'\{[\s\S]*\}', content) + if json_match: + return json.loads(json_match.group()) + return {"suspicious": False, "findings": [], "summary": "Failed to parse LLM response"} + except json.JSONDecodeError as e: + print(f"Failed to parse LLM response: {e}") + print(f"Raw response: {content[:500]}") + return None + + # Exhausted tool-call rounds without a final answer + print("Warning: LLM exhausted tool-call rounds without a final response") + return {"suspicious": False, "findings": [], "summary": "Analysis incomplete (tool-call limit)"} + + +def send_alert(title, message, priority=5): + """Send alert via Gotify.""" + if GOTIFY_TOKEN == "YOUR_TOKEN_HERE": + print(f"[ALERT - Gotify not configured] {title}: {message}") + return + + try: + response = requests.post( + GOTIFY_URL, + params={"token": GOTIFY_TOKEN}, + json={ + "title": title, + "message": message, + "priority": priority, + }, + timeout=10 + ) + response.raise_for_status() + print(f"Alert sent: {title}") + except requests.exceptions.RequestException as e: + print(f"Failed to send Gotify alert: {e}") + + +def format_findings(findings): + """Format findings for alert message.""" + lines = [] + for f in findings[:5]: # Limit to 5 findings per alert + lines.append(f"[{f['severity'].upper()}] {f['type']}") + if f.get("ip"): + lines.append(f" IP: {f['ip']}") + lines.append(f" Evidence: {f['evidence'][:100]}...") + lines.append(f" Action: {f['recommendation']}") + lines.append("") + return "\n".join(lines) + + +def print_log_stats(logs): + """Print verbose statistics about the logs.""" + if not logs: + print("\n No logs to analyze") + return + + parsed_logs = [parse_log_line(line) for line in logs] + parsed_logs = [p for p in parsed_logs if p] # Filter None + + if not parsed_logs: + print("\n No parseable logs found") + return + + # Time range + timestamps = [p["timestamp"] for p in parsed_logs] + print(f"\n{'─' * 60}") + print("LOG STATISTICS") + print(f"{'─' * 60}") + print(f" Time range: 
{timestamps[0]}") + print(f" {timestamps[-1]}") + print(f" Total entries: {len(parsed_logs)}") + + # Status codes + status_counts = Counter(p["status"] for p in parsed_logs) + print(f"\n Status Codes:") + for status, count in sorted(status_counts.items()): + bar = "█" * min(count // 5, 30) + print(f" {status}: {count:>5} {bar}") + + # Top IPs + ip_counts = Counter(p["ip"] for p in parsed_logs) + print(f"\n Top Source IPs:") + for ip, count in ip_counts.most_common(5): + pct = (count / len(parsed_logs)) * 100 + print(f" {ip:<18} {count:>5} ({pct:.1f}%)") + + # Services + service_counts = Counter(p["service"] for p in parsed_logs) + print(f"\n Requests by Service:") + for service, count in service_counts.most_common(10): + pct = (count / len(parsed_logs)) * 100 + print(f" {service:<25} {count:>5} ({pct:.1f}%)") + + # HTTP Methods + method_counts = Counter() + for p in parsed_logs: + method = p["request"].split()[0] if p["request"] else "UNKNOWN" + method_counts[method] += 1 + print(f"\n HTTP Methods:") + for method, count in method_counts.most_common(): + print(f" {method:<8} {count:>5}") + + # Latency stats + latencies = [p["latency_ms"] for p in parsed_logs] + avg_latency = sum(latencies) / len(latencies) + max_latency = max(latencies) + print(f"\n Latency:") + print(f" Average: {avg_latency:.1f}ms") + print(f" Max: {max_latency}ms") + + # Error rate + errors = sum(1 for p in parsed_logs if p["status"] >= 400) + error_rate = (errors / len(parsed_logs)) * 100 + print(f"\n Error rate: {error_rate:.1f}% ({errors} requests)") + print(f"{'─' * 60}\n") + + +def process_batch(logs, args): + """Process a batch of log lines: immediate alerts + LLM analysis + alerting.""" + all_findings = [] + + # Check for immediate alert patterns + immediate_alerts = check_immediate_alerts(logs) + if immediate_alerts: + all_findings.extend(immediate_alerts) + print(f" Found {len(immediate_alerts)} immediate alert patterns") + + # Filter noise and analyze with LLM + filtered_logs = 
filter_noise(logs) + print(f" Filtered {len(logs)} -> {len(filtered_logs)} logs for LLM analysis") + + if filtered_logs: + llm_available = True + for i in range(0, len(filtered_logs), BATCH_SIZE): + if not llm_available: + print(" Skipping remaining batches (LLM unavailable)") + break + batch = filtered_logs[i:i + BATCH_SIZE] + print(f" Analyzing batch {i // BATCH_SIZE + 1} ({len(batch)} logs)...") + + result = analyze_with_llm(batch) + if result is None: + llm_available = False + continue + if result.get("suspicious"): + findings = result.get("findings", []) + # Record LLM findings to threat DB + if DB_SERVER: + for f in findings: + ip = f.get("ip") + if ip and not is_private_ip(ip): + try: + threat_db.record_event( + ip, f.get("type", "unknown"), "llm", + severity=f.get("severity"), + evidence=f.get("evidence", "")[:500] + ) + except Exception as e: + print(f" Failed to record LLM finding to DB: {e}") + log_abuse(ip, f.get("type", "unknown")) + all_findings.extend(findings) + print(f" LLM found suspicious activity: {result.get('summary')}") + + # Send consolidated alert if findings exist + if all_findings: + severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3} + all_findings.sort(key=lambda x: severity_order.get(x.get("severity", "low"), 3)) + + max_severity = all_findings[0].get("severity", "low") + priority = {"critical": 10, "high": 8, "medium": 5, "low": 3}.get(max_severity, 5) + + alert_msg = format_findings(all_findings) + + if args.verbose: + print(f"\n{'─' * 60}") + print("FINDINGS") + print(f"{'─' * 60}") + print(alert_msg) + + if not args.dry_run: + send_alert( + f"Web Security Alert ({len(all_findings)} findings)", + alert_msg, + priority=priority + ) + else: + print(f" [Dry run] Would send alert with {len(all_findings)} findings") + else: + print(" No suspicious activity detected") + + +def run_daemon(args): + """Run as a long-lived daemon, tailing the log file in real time.""" + print(f"[{datetime.now().isoformat()}] Starting daemon 
mode") + print(f" Log file: {LOG_PATH}") + print(f" LLM interval: {LLM_INTERVAL}s") + print(f" Poll interval: {TAIL_POLL_INTERVAL}s") + if DB_SERVER: + print(f" Threat DB: {DB_SERVER}/{DB_NAME}") + else: + print(" Threat DB: disabled (no DB_SERVER configured)") + + buffer = [] + last_flush = time.monotonic() + last_save = time.monotonic() + current_pos = get_last_position() + + for line, pos in tail_log(LOG_PATH): + current_pos = pos + + # Immediate regex check on every line + immediate = check_immediate_alerts([line]) + if immediate: + print(f"[{datetime.now().isoformat()}] Immediate alert: {immediate[0]['type']}") + + buffer.append(line) + + now = time.monotonic() + + # Flush buffer to LLM at interval + if now - last_flush >= LLM_INTERVAL and buffer: + # Cap to BATCH_SIZE to avoid overwhelming the LLM with huge backlogs. + # Remaining lines carry forward to the next interval. + flush_batch = buffer[:BATCH_SIZE] + buffer = buffer[BATCH_SIZE:] + if buffer: + print(f"\n[{datetime.now().isoformat()}] Flushing {len(flush_batch)} lines to LLM ({len(buffer)} deferred)...") + else: + print(f"\n[{datetime.now().isoformat()}] Flushing {len(flush_batch)} lines to LLM...") + if args.verbose: + print_log_stats(flush_batch) + process_batch(flush_batch, args) + # Reset timer AFTER processing so we don't immediately flush again + last_flush = time.monotonic() + + # Save position periodically + if now - last_save >= 30: + if not args.dry_run: + save_position(current_pos) + last_save = now + + # Shutdown: flush remaining buffer and save state + if buffer: + print(f"\n[{datetime.now().isoformat()}] Shutdown: flushing {len(buffer)} remaining lines...") + process_batch(buffer, args) + + if not args.dry_run: + save_position(current_pos) + print(f"Final position saved: {current_pos}") + + +def run_once(args): + """Run a single analysis pass (original cron-job behavior).""" + print(f"[{datetime.now().isoformat()}] Starting single-run analysis...") + if args.dry_run: + print("(Dry run - 
no alerts will be sent, state will not be updated)") + + # Get current position + last_pos = get_last_position() + total_lines = get_total_lines() + + # If first run or log rotated, start from recent logs + if last_pos == 0 or last_pos > total_lines: + last_pos = max(0, total_lines - MAX_LINES_PER_RUN) + print(f"Starting from line {last_pos} (total: {total_lines})") + + lines_to_process = min(total_lines - last_pos, MAX_LINES_PER_RUN) + if lines_to_process <= 0: + print("No new logs to process") + return + + print(f"Processing {lines_to_process} new log lines...") + + # Fetch logs + logs = fetch_logs(last_pos, lines_to_process, total_lines) + if not logs: + print("No logs fetched") + return + + # Show verbose stats if requested + if args.verbose: + print_log_stats(logs) + + process_batch(logs, args) + + # Save position + if not args.dry_run: + save_position(last_pos + len(logs)) + print(f"Processed up to line {last_pos + len(logs)}") + else: + print(f"[Dry run] Would save position: {last_pos + len(logs)}") + + +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description="Analyze Traefik access logs for suspicious activity using LLM" + ) + parser.add_argument( + "-v", "--verbose", + action="store_true", + help="Show detailed log statistics" + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Analyze logs but don't send alerts or update state" + ) + parser.add_argument( + "--once", + action="store_true", + help="Run a single analysis pass and exit (original cron-job mode)" + ) + return parser.parse_args() + + +def main(): + args = parse_args() + + # Initialize threat database if configured + if DB_SERVER and DB_NAME and DB_USER and DB_PASSWORD: + try: + threat_db.init_db(DB_SERVER, DB_NAME, DB_USER, DB_PASSWORD) + print(f"Threat database initialized: {DB_SERVER}/{DB_NAME}") + except Exception as e: + print(f"Warning: Failed to initialize threat database: {e}") + print("Continuing without historical 
threat data.") + else: + print("Threat database not configured — running without historical data.") + + if args.once: + run_once(args) + else: + # Daemon mode — register signal handlers for graceful shutdown + signal.signal(signal.SIGTERM, _handle_signal) + signal.signal(signal.SIGINT, _handle_signal) + run_daemon(args) + + +if __name__ == "__main__": + main()