""" Threat Database Module Stores and queries historical abuse events in SQL Server on barge.lan. Maintains a per-IP known_threats profile updated automatically on each event. """ import pyodbc from datetime import datetime, timedelta # Module-level connection string, set by init_db() _conn_str = None def init_db(server, database, username, password): """Initialize the database connection string and ensure schema exists.""" global _conn_str _conn_str = ( f"DRIVER={{ODBC Driver 18 for SQL Server}};" f"SERVER={server};" f"DATABASE={database};" f"UID={username};" f"PWD={password};" f"TrustServerCertificate=yes;" ) _ensure_schema() def _get_connection(): """Get a new database connection.""" if _conn_str is None: raise RuntimeError("Database not initialized. Call init_db() first.") return pyodbc.connect(_conn_str) def _ensure_schema(): """Create tables if they don't exist.""" with _get_connection() as conn: cursor = conn.cursor() cursor.execute(""" IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'abuse_events') BEGIN CREATE TABLE abuse_events ( id INT IDENTITY(1,1) PRIMARY KEY, timestamp DATETIME2 NOT NULL, ip VARCHAR(45) NOT NULL, attack_type VARCHAR(100) NOT NULL, source VARCHAR(10) NOT NULL, severity VARCHAR(10), evidence VARCHAR(500) ); CREATE INDEX idx_abuse_events_ip ON abuse_events(ip); CREATE INDEX idx_abuse_events_timestamp ON abuse_events(timestamp); END IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'known_threats') BEGIN CREATE TABLE known_threats ( ip VARCHAR(45) PRIMARY KEY, first_seen DATETIME2 NOT NULL, last_seen DATETIME2 NOT NULL, total_events INT NOT NULL DEFAULT 1, attack_types VARCHAR(500) NOT NULL, threat_level VARCHAR(10) NOT NULL DEFAULT 'low', banned BIT NOT NULL DEFAULT 0, notes VARCHAR(1000) ); END """) conn.commit() def _compute_threat_level(total_events, attack_types_list): """Determine threat level based on event count and attack diversity.""" unique_types = len(attack_types_list) if total_events >= 20 or unique_types >= 4: return "critical" if total_events >= 10 or unique_types >= 3: return "high" if total_events >= 3 or unique_types >= 2: return "medium" return "low" def _update_known_threat(cursor, ip, attack_type, now): """Insert or update the known_threats record for an IP.""" cursor.execute("SELECT total_events, attack_types FROM known_threats WHERE ip = ?", ip) row = cursor.fetchone() if row is None: level = _compute_threat_level(1, [attack_type]) cursor.execute( "INSERT INTO known_threats (ip, first_seen, last_seen, total_events, attack_types, threat_level) " "VALUES (?, ?, ?, 1, ?, ?)", ip, now, now, attack_type, level ) else: new_count = row.total_events + 1 existing_types = [t.strip() for t in row.attack_types.split(",") if t.strip()] if attack_type not in existing_types: existing_types.append(attack_type) types_str = ", ".join(existing_types) # Truncate to fit column if len(types_str) > 500: types_str = types_str[:497] + "..." level = _compute_threat_level(new_count, existing_types) cursor.execute( "UPDATE known_threats SET last_seen = ?, total_events = ?, attack_types = ?, threat_level = ? " "WHERE ip = ?", now, new_count, types_str, level, ip ) def record_event(ip, attack_type, source, severity=None, evidence=None): """Insert an abuse event and update the known_threats profile for the IP.""" attack_type = attack_type.lower() now = datetime.utcnow() with _get_connection() as conn: cursor = conn.cursor() cursor.execute( "INSERT INTO abuse_events (timestamp, ip, attack_type, source, severity, evidence) " "VALUES (?, ?, ?, ?, ?, ?)", now, ip, attack_type, source, severity, evidence[:500] if evidence else None ) _update_known_threat(cursor, ip, attack_type, now) conn.commit() def lookup_ip(ip): """Look up historical threat data for an IP address. Returns the known_threats profile if it exists, otherwise indicates unknown. """ with _get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT first_seen, last_seen, total_events, attack_types, threat_level, banned, notes " "FROM known_threats WHERE ip = ?", ip ) row = cursor.fetchone() if row is None: return {"ip": ip, "known_threat": False, "total_events": 0} return { "ip": ip, "known_threat": True, "first_seen": row.first_seen.isoformat(), "last_seen": row.last_seen.isoformat(), "total_events": row.total_events, "attack_types": row.attack_types, "threat_level": row.threat_level, "banned": bool(row.banned), "notes": row.notes, } def get_threat_summary(hours=24): """Get aggregate threat statistics for the last N hours.""" since = datetime.utcnow() - timedelta(hours=hours) with _get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT COUNT(*) FROM abuse_events WHERE timestamp >= ?", since ) total = cursor.fetchone()[0] cursor.execute( "SELECT COUNT(DISTINCT ip) FROM abuse_events WHERE timestamp >= ?", since ) unique_ips = cursor.fetchone()[0] cursor.execute( "SELECT TOP 10 attack_type, COUNT(*) as cnt FROM abuse_events " "WHERE timestamp >= ? GROUP BY attack_type ORDER BY cnt DESC", since ) top_attacks = {row.attack_type: row.cnt for row in cursor.fetchall()} cursor.execute( "SELECT TOP 10 ip, COUNT(*) as cnt FROM abuse_events " "WHERE timestamp >= ? GROUP BY ip ORDER BY cnt DESC", since ) top_ips = {row.ip: row.cnt for row in cursor.fetchall()} # Include repeat offenders from known_threats cursor.execute( "SELECT TOP 5 ip, total_events, threat_level, attack_types " "FROM known_threats ORDER BY total_events DESC" ) top_offenders = [ {"ip": r.ip, "total_events": r.total_events, "threat_level": r.threat_level, "attack_types": r.attack_types} for r in cursor.fetchall() ] return { "hours": hours, "total_events": total, "unique_ips": unique_ips, "top_attack_types": top_attacks, "top_source_ips": top_ips, "top_known_offenders": top_offenders, } def get_recent_attacks(hours=1): """Get list of recent attack events from the last N hours.""" since = datetime.utcnow() - timedelta(hours=hours) with _get_connection() as conn: cursor = conn.cursor() cursor.execute( "SELECT TOP 50 timestamp, ip, attack_type, source, severity, evidence " "FROM abuse_events WHERE timestamp >= ? ORDER BY timestamp DESC", since ) return [ { "timestamp": row.timestamp.isoformat(), "ip": row.ip, "attack_type": row.attack_type, "source": row.source, "severity": row.severity, "evidence": row.evidence, } for row in cursor.fetchall() ]