Files
Breehavior-Monitor/utils/drama_tracker.py
AJ Isaacs a35705d3f1 Initial commit: Breehavior Monitor Discord bot
Discord bot for monitoring chat sentiment and tracking drama using
Ollama LLM on athena.lan. Includes sentiment analysis, slash commands,
drama tracking, and SQL Server persistence via Docker Compose.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 22:39:40 -05:00

285 lines
9.3 KiB
Python

import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
@dataclass
class AnalysisEntry:
    """A single scored message analysis stored in a user's rolling window."""

    # Epoch seconds at which the entry was recorded; DramaTracker.add_entry
    # fills this from time.time().
    timestamp: float
    # Toxicity score for the analysed message (presumably on a 0-1 scale --
    # confirm against the analyzer that produces it).
    toxicity_score: float
    # Category labels attached to the message by the analysis.
    categories: list[str]
    # Free-text explanation accompanying the score.
    reasoning: str
@dataclass
class UserDrama:
    """Mutable per-user state record maintained by DramaTracker.

    All ``last_*_time`` fields hold epoch seconds from ``time.time()``;
    the default ``0.0`` is treated by the tracker as "never happened".
    """

    # --- Toxicity / moderation -------------------------------------------
    # Rolling window of recent analyses (pruned by age and by count).
    entries: list[AnalysisEntry] = field(default_factory=list)
    # Number of recorded offenses since the last inactivity reset.
    offense_count: int = 0
    last_offense_time: float = 0.0
    last_warning_time: float = 0.0
    last_analysis_time: float = 0.0
    # True once a warning was recorded and no offense has followed it yet.
    warned_since_reset: bool = False
    # Flipped by DramaTracker.toggle_immunity; how callers honour it is
    # outside this module.
    immune: bool = False

    # --- Topic drift tracking --------------------------------------------
    off_topic_count: int = 0
    last_topic_remind_time: float = 0.0
    owner_notified: bool = False

    # --- Coherence tracking ----------------------------------------------
    # Most recent coherence scores (the tracker caps the list length).
    coherence_scores: list[float] = field(default_factory=list)
    # Exponential moving average of coherence; starts at 0.85.
    baseline_coherence: float = 0.85
    last_coherence_alert_time: float = 0.0

    # --- Per-user LLM notes ----------------------------------------------
    # Newline-separated, timestamp-prefixed note lines.
    notes: str = ""
class DramaTracker:
    """In-memory tracker of per-user moderation state.

    Keeps one ``UserDrama`` record per user id and exposes helpers for:

    * a rolling, recency-weighted toxicity ("drama") score,
    * offense / warning bookkeeping with cooldowns,
    * off-topic counters,
    * coherence baseline tracking with degradation alerts,
    * free-text, timestamped per-user notes.

    All timestamps are epoch seconds taken from ``time.time()``; a stored
    value of ``0.0`` means "never".
    """

    # Entries scoring above this are reported by get_recent_incidents().
    _INCIDENT_MIN_SCORE = 0.3
    # Maximum number of coherence scores remembered per user.
    _COHERENCE_WINDOW = 20
    # Soft cap on the length of a user's notes string.
    _MAX_NOTES_CHARS = 2000
    # How much the mute threshold drops once a user has been warned.
    _WARNED_THRESHOLD_DROP = 0.05
    # EMA smoothing factor for the coherence baseline.
    _COHERENCE_ALPHA = 0.1

    def __init__(
        self,
        window_size: int = 10,
        window_minutes: int = 15,
        offense_reset_minutes: int = 120,
    ):
        """Create an empty tracker.

        Args:
            window_size: Maximum number of analysis entries kept per user.
            window_minutes: Maximum age of an entry before it is pruned.
            offense_reset_minutes: Quiet period after which the offense
                counter restarts from zero on the next offense.
        """
        self.window_size = window_size
        self.window_seconds = window_minutes * 60
        self.offense_reset_seconds = offense_reset_minutes * 60
        self._users: dict[int, UserDrama] = {}

    def get_user(self, user_id: int) -> UserDrama:
        """Return the state record for *user_id*, creating it on first use."""
        if user_id not in self._users:
            self._users[user_id] = UserDrama()
        return self._users[user_id]

    def add_entry(
        self,
        user_id: int,
        toxicity_score: float,
        categories: list[str],
        reasoning: str,
    ) -> None:
        """Record a new analysis result for *user_id* and prune the window."""
        user = self.get_user(user_id)
        now = time.time()
        user.entries.append(
            AnalysisEntry(
                timestamp=now,
                toxicity_score=toxicity_score,
                # Copy so later mutation of the caller's list cannot rewrite
                # an already-recorded entry.
                categories=list(categories),
                reasoning=reasoning,
            )
        )
        user.last_analysis_time = now
        self._prune_entries(user, now)

    def get_drama_score(self, user_id: int) -> float:
        """Return the recency-weighted mean toxicity of the user's window.

        Entries are weighted linearly by position (oldest = 1, newest = n),
        so recent messages count more. Returns ``0.0`` when the window is
        empty.
        """
        user = self.get_user(user_id)
        self._prune_entries(user, time.time())
        if not user.entries:
            return 0.0
        total_weight = 0.0
        weighted_sum = 0.0
        for weight, entry in enumerate(user.entries, start=1):
            weighted_sum += entry.toxicity_score * weight
            total_weight += weight
        return weighted_sum / total_weight if total_weight > 0 else 0.0

    def get_mute_threshold(self, user_id: int, base_threshold: float) -> float:
        """Lower the mute threshold if user was already warned."""
        if self.get_user(user_id).warned_since_reset:
            return base_threshold - self._WARNED_THRESHOLD_DROP
        return base_threshold

    def record_offense(self, user_id: int) -> int:
        """Register an offense and return the user's updated offense count.

        If the previous offense is older than ``offense_reset_seconds`` the
        counter restarts before being incremented. Recording an offense also
        clears the ``warned_since_reset`` flag.
        """
        user = self.get_user(user_id)
        now = time.time()
        # Reset offense count if enough time has passed
        if (
            user.last_offense_time > 0
            and now - user.last_offense_time > self.offense_reset_seconds
        ):
            user.offense_count = 0
        user.offense_count += 1
        user.last_offense_time = now
        user.warned_since_reset = False
        return user.offense_count

    def record_warning(self, user_id: int) -> None:
        """Mark that the user was warned just now."""
        user = self.get_user(user_id)
        user.last_warning_time = time.time()
        user.warned_since_reset = True

    def can_warn(self, user_id: int, cooldown_minutes: int) -> bool:
        """Return True if the user was never warned or the cooldown elapsed."""
        user = self.get_user(user_id)
        if user.last_warning_time == 0.0:
            return True
        return time.time() - user.last_warning_time > cooldown_minutes * 60

    def can_analyze(self, user_id: int, cooldown_seconds: int) -> bool:
        """Return True if the user was never analysed or the cooldown elapsed."""
        user = self.get_user(user_id)
        if user.last_analysis_time == 0.0:
            return True
        return time.time() - user.last_analysis_time > cooldown_seconds

    def reset_user(self, user_id: int) -> None:
        """Forget all state for *user_id* (no-op if the user is unknown)."""
        self._users.pop(user_id, None)

    def toggle_immunity(self, user_id: int) -> bool:
        """Flip the user's immunity flag and return the new value."""
        user = self.get_user(user_id)
        user.immune = not user.immune
        return user.immune

    def is_immune(self, user_id: int) -> bool:
        """Return the user's immunity flag without creating a record."""
        if user_id not in self._users:
            return False
        return self._users[user_id].immune

    def get_all_scores(self) -> dict[int, float]:
        """Return ``{user_id: drama_score}`` for every user scoring above 0."""
        scores = {}
        # Snapshot the keys: get_drama_score() goes through get_user().
        for user_id in list(self._users.keys()):
            score = self.get_drama_score(user_id)
            if score > 0.0:
                scores[user_id] = score
        return scores

    def get_recent_incidents(
        self, user_id: int, count: int = 5
    ) -> list[AnalysisEntry]:
        """Return up to *count* most recent non-trivial entries, oldest first.

        An entry is non-trivial when its toxicity score exceeds 0.3. A
        non-positive *count* yields an empty list.
        """
        user = self.get_user(user_id)
        self._prune_entries(user, time.time())
        if count <= 0:
            # incidents[-0:] would be the WHOLE list, not an empty one.
            return []
        incidents = [
            e for e in user.entries if e.toxicity_score > self._INCIDENT_MIN_SCORE
        ]
        return incidents[-count:]

    def record_off_topic(self, user_id: int) -> int:
        """Count one off-topic message, stamp the reminder time, return the count."""
        user = self.get_user(user_id)
        user.off_topic_count += 1
        user.last_topic_remind_time = time.time()
        return user.off_topic_count

    def can_topic_remind(self, user_id: int, cooldown_minutes: int) -> bool:
        """Return True if no reminder was sent yet or the cooldown elapsed."""
        user = self.get_user(user_id)
        if user.last_topic_remind_time == 0.0:
            return True
        return time.time() - user.last_topic_remind_time > cooldown_minutes * 60

    def get_off_topic_count(self, user_id: int) -> int:
        """Return the user's off-topic message counter."""
        return self.get_user(user_id).off_topic_count

    def mark_owner_notified(self, user_id: int) -> None:
        """Set the user's ``owner_notified`` flag."""
        self.get_user(user_id).owner_notified = True

    def was_owner_notified(self, user_id: int) -> bool:
        """Return the user's ``owner_notified`` flag."""
        return self.get_user(user_id).owner_notified

    def get_user_notes(self, user_id: int) -> str:
        """Return the user's accumulated notes (empty string if none)."""
        return self.get_user(user_id).notes

    def update_user_notes(self, user_id: int, note_update: str) -> None:
        """Append a UTC-timestamped note line and trim the oldest lines.

        Whole lines are dropped from the front until the notes fit in about
        2000 characters. The last remaining line is never dropped, so a
        single oversized note is kept intact.
        """
        user = self.get_user(user_id)
        ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
        new_line = f"[{ts}] {note_update}"
        notes = f"{user.notes}\n{new_line}" if user.notes else new_line

        # Split once and advance a start index instead of re-splitting and
        # re-joining the full string for every dropped line.
        lines = notes.split("\n")
        total = len(notes)
        start = 0
        while total > self._MAX_NOTES_CHARS and start < len(lines) - 1:
            total -= len(lines[start]) + 1  # the line plus its newline
            start += 1
        user.notes = "\n".join(lines[start:]) if start else notes

    def clear_user_notes(self, user_id: int) -> None:
        """Erase all notes for the user."""
        self.get_user(user_id).notes = ""

    def reset_off_topic(self, user_id: int) -> None:
        """Reset the off-topic counter, reminder time and owner-notified flag."""
        user = self.get_user(user_id)
        user.off_topic_count = 0
        user.last_topic_remind_time = 0.0
        user.owner_notified = False

    def update_coherence(
        self,
        user_id: int,
        score: float,
        flag: str,
        drop_threshold: float = 0.3,
        absolute_floor: float = 0.5,
        cooldown_minutes: int = 30,
    ) -> dict | None:
        """Update user's coherence baseline and detect degradation.

        Degradation means *score* is more than *drop_threshold* below the
        baseline as it stood before this update AND below *absolute_floor*.
        The baseline is always updated, even when an alert is suppressed by
        the cooldown.

        Returns:
            A dict with keys ``baseline``, ``current``, ``drop`` and ``flag``
            when degradation is detected and no alert was raised within the
            last *cooldown_minutes*; otherwise ``None``.
        """
        user = self.get_user(user_id)
        alpha = self._COHERENCE_ALPHA  # Slow-moving EMA

        # Keep a rolling window of recent scores (last 20)
        user.coherence_scores.append(score)
        if len(user.coherence_scores) > self._COHERENCE_WINDOW:
            user.coherence_scores = user.coherence_scores[-self._COHERENCE_WINDOW:]

        baseline_before = user.baseline_coherence
        drop = baseline_before - score

        # Check for degradation BEFORE updating baseline
        degraded = (
            score < baseline_before - drop_threshold
            and score < absolute_floor
        )

        # Update baseline with EMA
        user.baseline_coherence = alpha * score + (1 - alpha) * user.baseline_coherence

        if not degraded:
            return None

        # Check cooldown
        now = time.time()
        if (
            user.last_coherence_alert_time > 0
            and now - user.last_coherence_alert_time < cooldown_minutes * 60
        ):
            return None

        user.last_coherence_alert_time = now
        return {
            "baseline": baseline_before,
            "current": score,
            "drop": drop,
            "flag": flag,
        }

    def load_user_states(self, states: list[dict]) -> int:
        """Hydrate user state from database rows.

        Each dict must have: user_id, offense_count, immune, off_topic_count.
        Optionally includes baseline_coherence and user_notes; a missing or
        ``None`` baseline and missing or empty notes leave the current
        values untouched.

        Returns number of users loaded.
        """
        count = 0
        for state in states:
            user = self.get_user(state["user_id"])
            user.offense_count = state["offense_count"]
            # Normalise in case the row carries 0/1 rather than a bool.
            user.immune = bool(state["immune"])
            user.off_topic_count = state["off_topic_count"]
            # A present-but-None value must not overwrite the float baseline:
            # update_coherence() does arithmetic on it.
            baseline = state.get("baseline_coherence")
            if baseline is not None:
                user.baseline_coherence = baseline
            notes = state.get("user_notes")
            if notes:
                user.notes = notes
            count += 1
        return count

    def _prune_entries(self, user: UserDrama, now: float) -> None:
        """Drop entries older than the time window, then cap to window_size.

        A non-positive ``window_size`` keeps no entries.
        """
        cutoff = now - self.window_seconds
        recent = [e for e in user.entries if e.timestamp > cutoff]
        if self.window_size <= 0:
            # recent[-0:] would keep everything instead of nothing.
            recent = []
        elif len(recent) > self.window_size:
            recent = recent[-self.window_size:]
        user.entries = recent