Files
Breehavior-Monitor/utils/drama_tracker.py
AJ Isaacs 33d56f8737 feat: move user aliases from config to DB with /bcs-alias command
Aliases now stored in UserState table instead of config.yaml. Adds
Aliases column (NVARCHAR 500), loads on startup, persists via flush.
New /bcs-alias slash command (view/set/clear) for managing nicknames.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 10:35:19 -05:00

323 lines
11 KiB
Python

import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
@dataclass
class AnalysisEntry:
    """A single scored message analysis stored in a user's rolling window."""

    # Epoch seconds at which the entry was recorded (DramaTracker.add_entry
    # stamps it with time.time()).
    timestamp: float
    # Toxicity score assigned to the analyzed message.
    toxicity_score: float
    # Category labels supplied with the analysis.
    categories: list[str]
    # Free-text explanation supplied with the analysis.
    reasoning: str
@dataclass
class UserDrama:
    """Mutable per-user state held by DramaTracker.

    All ``*_time`` fields are epoch seconds taken from ``time.time()``;
    a value of 0.0 means the event has not been recorded.
    """

    # --- Toxicity / offense bookkeeping ---------------------------------
    # Recent analyses; DramaTracker prunes this list by age and by length.
    entries: list[AnalysisEntry] = field(default_factory=list)
    # Number of offenses recorded since the last time-based reset.
    offense_count: int = 0
    last_offense_time: float = 0.0
    last_warning_time: float = 0.0
    last_analysis_time: float = 0.0
    # True after a warning is recorded; cleared when an offense is recorded.
    warned_since_reset: bool = False
    # Flipped by DramaTracker.toggle_immunity.
    # NOTE(review): presumably exempts the user from moderation — confirm
    # against the callers that read is_immune().
    immune: bool = False

    # --- Topic drift tracking -------------------------------------------
    off_topic_count: int = 0
    last_topic_remind_time: float = 0.0
    owner_notified: bool = False

    # --- Coherence tracking ---------------------------------------------
    # Rolling list of recent coherence scores (DramaTracker keeps the last 20).
    coherence_scores: list[float] = field(default_factory=list)
    # Exponential moving average of coherence scores; starts at 0.85.
    baseline_coherence: float = 0.85
    last_coherence_alert_time: float = 0.0

    # --- Per-user LLM notes ---------------------------------------------
    notes: str = ""

    # --- Known aliases/nicknames ----------------------------------------
    aliases: list[str] = field(default_factory=list)
class DramaTracker:
    """Track per-user moderation state in memory.

    For every user id the tracker keeps a rolling window of recent toxicity
    analyses plus offense/warning bookkeeping, off-topic counters, a
    coherence baseline, free-form notes and known aliases. The tracker does
    no I/O itself: ``load_user_states`` only hydrates state from rows handed
    in by the caller.
    """

    def __init__(
        self,
        window_size: int = 10,
        window_minutes: int = 15,
        offense_reset_minutes: int = 120,
    ):
        """Configure the rolling window and the offense reset interval.

        Args:
            window_size: Maximum number of analysis entries kept per user.
            window_minutes: Maximum age, in minutes, of a kept entry.
            offense_reset_minutes: Minutes after the last offense before the
                offense count starts over.
        """
        self.window_size = window_size
        self.window_seconds = window_minutes * 60
        self.offense_reset_seconds = offense_reset_minutes * 60
        self._users: dict[int, UserDrama] = {}

    def get_user(self, user_id: int) -> UserDrama:
        """Return the state for ``user_id``, creating a default one if absent."""
        user = self._users.get(user_id)
        if user is None:
            user = self._users[user_id] = UserDrama()
        return user

    def add_entry(
        self,
        user_id: int,
        toxicity_score: float,
        categories: list[str],
        reasoning: str,
    ) -> None:
        """Record one analysis result for the user and prune the window."""
        user = self.get_user(user_id)
        now = time.time()
        user.entries.append(
            AnalysisEntry(
                timestamp=now,
                toxicity_score=toxicity_score,
                categories=categories,
                reasoning=reasoning,
            )
        )
        user.last_analysis_time = now
        self._prune_entries(user, now)

    def get_drama_score(self, user_id: int, escalation_boost: float = 0.04) -> float:
        """Return the user's current drama score, capped at 1.0.

        The score is a linearly weighted average of the toxicity scores in
        the pruned window (later entries weigh more). If the user has been
        warned, every entry newer than the warning with a score >= 0.5 adds
        ``escalation_boost`` on top.

        Returns 0.0 when the window is empty.
        """
        user = self.get_user(user_id)
        self._prune_entries(user, time.time())
        if not user.entries:
            return 0.0

        # Weighted average: more recent messages weighted higher.
        total_weight = 0.0
        weighted_sum = 0.0
        for weight, entry in enumerate(user.entries, start=1):
            weighted_sum += entry.toxicity_score * weight
            total_weight += weight
        base_score = weighted_sum / total_weight if total_weight > 0 else 0.0

        # Escalation: if warned, each high-scoring message AFTER the warning
        # adds a boost so sustained bad behavior ramps toward mute threshold.
        if user.warned_since_reset and user.last_warning_time > 0:
            post_warn_high = sum(
                1
                for e in user.entries
                if e.timestamp > user.last_warning_time and e.toxicity_score >= 0.5
            )
            if post_warn_high > 0:
                base_score += escalation_boost * post_warn_high

        return min(base_score, 1.0)

    def get_mute_threshold(self, user_id: int, base_threshold: float) -> float:
        """Lower the mute threshold if user was already warned.

        Returns ``base_threshold - 0.05`` for a warned user, otherwise
        ``base_threshold`` unchanged.
        """
        if self.get_user(user_id).warned_since_reset:
            return base_threshold - 0.05
        return base_threshold

    def record_offense(self, user_id: int) -> int:
        """Register an offense and return the user's updated offense count.

        The count starts over when the previous offense is older than the
        configured reset interval. Recording an offense also clears the
        ``warned_since_reset`` flag.
        """
        user = self.get_user(user_id)
        now = time.time()
        previous = user.last_offense_time
        # Reset offense count if enough time has passed.
        if previous > 0 and now - previous > self.offense_reset_seconds:
            user.offense_count = 0
        user.offense_count += 1
        user.last_offense_time = now
        user.warned_since_reset = False
        return user.offense_count

    def record_warning(self, user_id: int) -> None:
        """Mark the user as warned and remember when it happened."""
        user = self.get_user(user_id)
        user.last_warning_time = time.time()
        user.warned_since_reset = True

    def can_warn(self, user_id: int, cooldown_minutes: int) -> bool:
        """Return True if the user was never warned or the cooldown elapsed."""
        last = self.get_user(user_id).last_warning_time
        if last == 0.0:
            return True
        return time.time() - last > cooldown_minutes * 60

    def can_analyze(self, user_id: int, cooldown_seconds: int) -> bool:
        """Return True if the user was never analyzed or the cooldown elapsed."""
        last = self.get_user(user_id).last_analysis_time
        if last == 0.0:
            return True
        return time.time() - last > cooldown_seconds

    def reset_user(self, user_id: int) -> None:
        """Forget everything tracked for the user (no-op if unknown)."""
        self._users.pop(user_id, None)

    def toggle_immunity(self, user_id: int) -> bool:
        """Flip the user's immunity flag and return the new value."""
        user = self.get_user(user_id)
        user.immune = not user.immune
        return user.immune

    def is_immune(self, user_id: int) -> bool:
        """Return the user's immunity flag without creating state for them."""
        user = self._users.get(user_id)
        if user is None:
            return False
        return user.immune

    def get_all_scores(self) -> dict[int, float]:
        """Return ``{user_id: drama_score}`` for users whose score is > 0."""
        scores = {}
        # Iterate over a snapshot of the keys, as get_drama_score goes
        # through get_user on the same dict.
        for user_id in list(self._users):
            score = self.get_drama_score(user_id)
            if score > 0.0:
                scores[user_id] = score
        return scores

    def get_recent_incidents(
        self, user_id: int, count: int = 5
    ) -> list[AnalysisEntry]:
        """Return up to ``count`` most recent entries with a score above 0.3.

        Entries are returned oldest-first. A non-positive ``count`` yields an
        empty list.
        """
        # Guard first: ``incidents[-0:]`` would return the whole list.
        if count <= 0:
            return []
        user = self.get_user(user_id)
        self._prune_entries(user, time.time())
        # Return entries with score > 0.3 (non-trivial).
        incidents = [e for e in user.entries if e.toxicity_score > 0.3]
        return incidents[-count:]

    def record_off_topic(self, user_id: int) -> int:
        """Count one off-topic event, stamp the reminder time, return the count."""
        user = self.get_user(user_id)
        user.off_topic_count += 1
        user.last_topic_remind_time = time.time()
        return user.off_topic_count

    def can_topic_remind(self, user_id: int, cooldown_minutes: int) -> bool:
        """Return True if no reminder was recorded yet or the cooldown elapsed."""
        last = self.get_user(user_id).last_topic_remind_time
        if last == 0.0:
            return True
        return time.time() - last > cooldown_minutes * 60

    def get_off_topic_count(self, user_id: int) -> int:
        """Return the user's off-topic counter."""
        return self.get_user(user_id).off_topic_count

    def mark_owner_notified(self, user_id: int) -> None:
        """Set the user's ``owner_notified`` flag."""
        self.get_user(user_id).owner_notified = True

    def was_owner_notified(self, user_id: int) -> bool:
        """Return the user's ``owner_notified`` flag."""
        return self.get_user(user_id).owner_notified

    def get_user_notes(self, user_id: int) -> str:
        """Return the user's notes ("" when there are none)."""
        return self.get_user(user_id).notes

    def update_user_notes(self, user_id: int, note_update: str) -> None:
        """Append a UTC-timestamped note line, keeping the 10 newest lines."""
        user = self.get_user(user_id)
        ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
        # A note must occupy exactly one line; otherwise the line cap below
        # could cut a multi-line note in half and leave an untimestamped tail.
        single_line = " ".join(
            part.strip() for part in note_update.splitlines() if part.strip()
        )
        new_line = f"[{ts}] {single_line}"
        if user.notes:
            user.notes = f"{user.notes}\n{new_line}"
        else:
            user.notes = new_line
        # Keep only the 10 most recent lines.
        lines = user.notes.split("\n")
        if len(lines) > 10:
            user.notes = "\n".join(lines[-10:])

    def set_user_profile(self, user_id: int, profile: str) -> None:
        """Replace the user's profile summary (permanent memory).

        The text overwrites the notes field and is truncated to 500
        characters.
        """
        self.get_user(user_id).notes = profile[:500]

    def clear_user_notes(self, user_id: int) -> None:
        """Erase the user's notes."""
        self.get_user(user_id).notes = ""

    def get_user_aliases(self, user_id: int) -> list[str]:
        """Return the user's alias list (the stored list object itself)."""
        return self.get_user(user_id).aliases

    def set_user_aliases(self, user_id: int, aliases: list[str]) -> None:
        """Replace the user's alias list."""
        self.get_user(user_id).aliases = aliases

    def get_all_aliases(self) -> dict[int, list[str]]:
        """Return {user_id: [aliases]} for all users that have aliases set."""
        return {uid: user.aliases for uid, user in self._users.items() if user.aliases}

    def reset_off_topic(self, user_id: int) -> None:
        """Clear the off-topic counter, reminder time and owner-notified flag."""
        user = self.get_user(user_id)
        user.off_topic_count = 0
        user.last_topic_remind_time = 0.0
        user.owner_notified = False

    def update_coherence(
        self,
        user_id: int,
        score: float,
        flag: str,
        drop_threshold: float = 0.3,
        absolute_floor: float = 0.5,
        cooldown_minutes: int = 30,
    ) -> dict | None:
        """Update user's coherence baseline and detect degradation.

        Degradation means ``score`` is more than ``drop_threshold`` below the
        baseline as it was before this call AND below ``absolute_floor``.
        The baseline is always updated, even when an alert is suppressed.

        Returns:
            A dict with keys ``baseline``, ``current``, ``drop`` and ``flag``
            when degradation is detected and no alert was issued within the
            last ``cooldown_minutes``; otherwise None.
        """
        user = self.get_user(user_id)
        alpha = 0.1  # Slow-moving EMA — ~20 messages to shift significantly

        # Keep a rolling window of recent scores (last 20).
        user.coherence_scores.append(score)
        if len(user.coherence_scores) > 20:
            user.coherence_scores = user.coherence_scores[-20:]

        baseline_before = user.baseline_coherence
        drop = baseline_before - score

        # Check for degradation BEFORE updating baseline.
        degraded = (
            score < baseline_before - drop_threshold
            and score < absolute_floor
        )

        # Update baseline with EMA.
        user.baseline_coherence = alpha * score + (1 - alpha) * user.baseline_coherence

        if not degraded:
            return None

        # Check cooldown.
        now = time.time()
        if (
            user.last_coherence_alert_time > 0
            and now - user.last_coherence_alert_time < cooldown_minutes * 60
        ):
            return None

        user.last_coherence_alert_time = now
        return {
            "baseline": baseline_before,
            "current": score,
            "drop": drop,
            "flag": flag,
        }

    def load_user_states(self, states: list[dict]) -> int:
        """Hydrate user state from database rows.

        Each dict must have: user_id, offense_count, immune, off_topic_count.
        Optional keys: baseline_coherence, user_notes, warned,
        last_offense_at, aliases (a comma-separated string). Optional keys
        that are missing, None or empty leave the current value untouched.

        Returns number of users loaded.
        """
        loaded = 0
        for state in states:
            user = self.get_user(state["user_id"])
            user.offense_count = state["offense_count"]
            user.immune = state["immune"]
            user.off_topic_count = state["off_topic_count"]

            # A NULL column arrives as None; storing it would make the next
            # update_coherence() call fail on ``baseline - score``.
            baseline = state.get("baseline_coherence")
            if baseline is not None:
                user.baseline_coherence = baseline

            if state.get("user_notes"):
                user.notes = state["user_notes"]

            if state.get("warned"):
                user.warned_since_reset = True

            if state.get("last_offense_at"):
                # NOTE(review): compared against time.time() below, so this is
                # assumed to be epoch seconds — confirm against the DB layer.
                user.last_offense_time = state["last_offense_at"]
                # Apply time-based offense reset at load time.
                if time.time() - user.last_offense_time > self.offense_reset_seconds:
                    user.offense_count = 0
                    user.warned_since_reset = False
                    user.last_offense_time = 0.0

            if state.get("aliases"):
                user.aliases = [a.strip() for a in state["aliases"].split(",") if a.strip()]

            loaded += 1
        return loaded

    def _prune_entries(self, user: UserDrama, now: float) -> None:
        """Drop entries older than the time window, then cap the list length."""
        cutoff = now - self.window_seconds
        user.entries = [e for e in user.entries if e.timestamp > cutoff]
        if len(user.entries) > self.window_size:
            user.entries = user.entries[-self.window_size :]