Files
web-log-monitor/threat_db.py
AJ Isaacs 5b86573b62 Normalize attack_type to lowercase in record_event
Ensures consistent casing for attack type labels stored in the
threat database by applying .lower() on input.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 15:08:20 -05:00

225 lines
7.9 KiB
Python

"""
Threat Database Module
Stores and queries historical abuse events in SQL Server on barge.lan.
Maintains a per-IP known_threats profile updated automatically on each event.
"""
import pyodbc
from datetime import datetime, timedelta
# Module-level connection string, set by init_db()
_conn_str = None
def init_db(server, database, username, password):
"""Initialize the database connection string and ensure schema exists."""
global _conn_str
_conn_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={server};"
f"DATABASE={database};"
f"UID={username};"
f"PWD={password};"
f"TrustServerCertificate=yes;"
)
_ensure_schema()
def _get_connection():
"""Get a new database connection."""
if _conn_str is None:
raise RuntimeError("Database not initialized. Call init_db() first.")
return pyodbc.connect(_conn_str)
def _ensure_schema():
"""Create tables if they don't exist."""
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'abuse_events')
BEGIN
CREATE TABLE abuse_events (
id INT IDENTITY(1,1) PRIMARY KEY,
timestamp DATETIME2 NOT NULL,
ip VARCHAR(45) NOT NULL,
attack_type VARCHAR(100) NOT NULL,
source VARCHAR(10) NOT NULL,
severity VARCHAR(10),
evidence VARCHAR(500)
);
CREATE INDEX idx_abuse_events_ip ON abuse_events(ip);
CREATE INDEX idx_abuse_events_timestamp ON abuse_events(timestamp);
END
IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'known_threats')
BEGIN
CREATE TABLE known_threats (
ip VARCHAR(45) PRIMARY KEY,
first_seen DATETIME2 NOT NULL,
last_seen DATETIME2 NOT NULL,
total_events INT NOT NULL DEFAULT 1,
attack_types VARCHAR(500) NOT NULL,
threat_level VARCHAR(10) NOT NULL DEFAULT 'low',
banned BIT NOT NULL DEFAULT 0,
notes VARCHAR(1000)
);
END
""")
conn.commit()
def _compute_threat_level(total_events, attack_types_list):
"""Determine threat level based on event count and attack diversity."""
unique_types = len(attack_types_list)
if total_events >= 20 or unique_types >= 4:
return "critical"
if total_events >= 10 or unique_types >= 3:
return "high"
if total_events >= 3 or unique_types >= 2:
return "medium"
return "low"
def _update_known_threat(cursor, ip, attack_type, now):
"""Insert or update the known_threats record for an IP."""
cursor.execute("SELECT total_events, attack_types FROM known_threats WHERE ip = ?", ip)
row = cursor.fetchone()
if row is None:
level = _compute_threat_level(1, [attack_type])
cursor.execute(
"INSERT INTO known_threats (ip, first_seen, last_seen, total_events, attack_types, threat_level) "
"VALUES (?, ?, ?, 1, ?, ?)",
ip, now, now, attack_type, level
)
else:
new_count = row.total_events + 1
existing_types = [t.strip() for t in row.attack_types.split(",") if t.strip()]
if attack_type not in existing_types:
existing_types.append(attack_type)
types_str = ", ".join(existing_types)
# Truncate to fit column
if len(types_str) > 500:
types_str = types_str[:497] + "..."
level = _compute_threat_level(new_count, existing_types)
cursor.execute(
"UPDATE known_threats SET last_seen = ?, total_events = ?, attack_types = ?, threat_level = ? "
"WHERE ip = ?",
now, new_count, types_str, level, ip
)
def record_event(ip, attack_type, source, severity=None, evidence=None):
"""Insert an abuse event and update the known_threats profile for the IP."""
attack_type = attack_type.lower()
now = datetime.utcnow()
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO abuse_events (timestamp, ip, attack_type, source, severity, evidence) "
"VALUES (?, ?, ?, ?, ?, ?)",
now, ip, attack_type, source, severity, evidence[:500] if evidence else None
)
_update_known_threat(cursor, ip, attack_type, now)
conn.commit()
def lookup_ip(ip):
"""Look up historical threat data for an IP address.
Returns the known_threats profile if it exists, otherwise indicates unknown.
"""
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT first_seen, last_seen, total_events, attack_types, threat_level, banned, notes "
"FROM known_threats WHERE ip = ?", ip
)
row = cursor.fetchone()
if row is None:
return {"ip": ip, "known_threat": False, "total_events": 0}
return {
"ip": ip,
"known_threat": True,
"first_seen": row.first_seen.isoformat(),
"last_seen": row.last_seen.isoformat(),
"total_events": row.total_events,
"attack_types": row.attack_types,
"threat_level": row.threat_level,
"banned": bool(row.banned),
"notes": row.notes,
}
def get_threat_summary(hours=24):
"""Get aggregate threat statistics for the last N hours."""
since = datetime.utcnow() - timedelta(hours=hours)
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT COUNT(*) FROM abuse_events WHERE timestamp >= ?", since
)
total = cursor.fetchone()[0]
cursor.execute(
"SELECT COUNT(DISTINCT ip) FROM abuse_events WHERE timestamp >= ?", since
)
unique_ips = cursor.fetchone()[0]
cursor.execute(
"SELECT TOP 10 attack_type, COUNT(*) as cnt FROM abuse_events "
"WHERE timestamp >= ? GROUP BY attack_type ORDER BY cnt DESC", since
)
top_attacks = {row.attack_type: row.cnt for row in cursor.fetchall()}
cursor.execute(
"SELECT TOP 10 ip, COUNT(*) as cnt FROM abuse_events "
"WHERE timestamp >= ? GROUP BY ip ORDER BY cnt DESC", since
)
top_ips = {row.ip: row.cnt for row in cursor.fetchall()}
# Include repeat offenders from known_threats
cursor.execute(
"SELECT TOP 5 ip, total_events, threat_level, attack_types "
"FROM known_threats ORDER BY total_events DESC"
)
top_offenders = [
{"ip": r.ip, "total_events": r.total_events, "threat_level": r.threat_level, "attack_types": r.attack_types}
for r in cursor.fetchall()
]
return {
"hours": hours,
"total_events": total,
"unique_ips": unique_ips,
"top_attack_types": top_attacks,
"top_source_ips": top_ips,
"top_known_offenders": top_offenders,
}
def get_recent_attacks(hours=1):
"""Get list of recent attack events from the last N hours."""
since = datetime.utcnow() - timedelta(hours=hours)
with _get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT TOP 50 timestamp, ip, attack_type, source, severity, evidence "
"FROM abuse_events WHERE timestamp >= ? ORDER BY timestamp DESC", since
)
return [
{
"timestamp": row.timestamp.isoformat(),
"ip": row.ip,
"attack_type": row.attack_type,
"source": row.source,
"severity": row.severity,
"evidence": row.evidence,
}
for row in cursor.fetchall()
]