import time from dataclasses import dataclass, field from datetime import datetime, timezone @dataclass class AnalysisEntry: timestamp: float toxicity_score: float categories: list[str] reasoning: str @dataclass class UserDrama: entries: list[AnalysisEntry] = field(default_factory=list) offense_count: int = 0 last_offense_time: float = 0.0 last_warning_time: float = 0.0 last_analysis_time: float = 0.0 warned_since_reset: bool = False warning_expires_at: float = 0.0 immune: bool = False # Topic drift tracking off_topic_count: int = 0 last_topic_remind_time: float = 0.0 owner_notified: bool = False # Coherence tracking coherence_scores: list[float] = field(default_factory=list) baseline_coherence: float = 0.85 last_coherence_alert_time: float = 0.0 # Per-user LLM notes notes: str = "" # Known aliases/nicknames aliases: list[str] = field(default_factory=list) class DramaTracker: def __init__( self, window_size: int = 10, window_minutes: int = 15, offense_reset_minutes: int = 120, warning_expiration_minutes: int = 30, ): self.window_size = window_size self.window_seconds = window_minutes * 60 self.offense_reset_seconds = offense_reset_minutes * 60 self.warning_expiration_seconds = warning_expiration_minutes * 60 self._users: dict[int, UserDrama] = {} def get_user(self, user_id: int) -> UserDrama: if user_id not in self._users: self._users[user_id] = UserDrama() return self._users[user_id] def add_entry( self, user_id: int, toxicity_score: float, categories: list[str], reasoning: str, ) -> None: user = self.get_user(user_id) now = time.time() user.entries.append( AnalysisEntry( timestamp=now, toxicity_score=toxicity_score, categories=categories, reasoning=reasoning, ) ) user.last_analysis_time = now self._prune_entries(user, now) def get_drama_score(self, user_id: int, escalation_boost: float = 0.04) -> float: user = self.get_user(user_id) self._expire_warning(user) now = time.time() self._prune_entries(user, now) if not user.entries: return 0.0 # Weighted average: more recent messages weighted higher total_weight = 0.0 weighted_sum = 0.0 for i, entry in enumerate(user.entries): weight = (i + 1) # linear weight, later entries = higher weighted_sum += entry.toxicity_score * weight total_weight += weight base_score = weighted_sum / total_weight if total_weight > 0 else 0.0 # Escalation: if warned, each high-scoring message AFTER the warning # adds a boost so sustained bad behavior ramps toward mute threshold if user.warned_since_reset and user.last_warning_time > 0: post_warn_high = sum( 1 for e in user.entries if e.timestamp > user.last_warning_time and e.toxicity_score >= 0.5 ) if post_warn_high > 0: base_score += escalation_boost * post_warn_high return min(base_score, 1.0) def get_mute_threshold(self, user_id: int, base_threshold: float) -> float: """Lower the mute threshold if user was already warned.""" user = self.get_user(user_id) self._expire_warning(user) if user.warned_since_reset: return base_threshold - 0.05 return base_threshold def record_offense(self, user_id: int) -> int: user = self.get_user(user_id) now = time.time() # Reset offense count if enough time has passed if ( user.last_offense_time > 0 and now - user.last_offense_time > self.offense_reset_seconds ): user.offense_count = 0 user.offense_count += 1 user.last_offense_time = now user.warned_since_reset = False user.warning_expires_at = 0.0 return user.offense_count def record_warning(self, user_id: int) -> None: user = self.get_user(user_id) now = time.time() user.last_warning_time = now user.warned_since_reset = True if self.warning_expiration_seconds > 0: user.warning_expires_at = now + self.warning_expiration_seconds else: user.warning_expires_at = 0.0 # Never expires def _expire_warning(self, user: UserDrama) -> None: """Clear warned flag if the warning has expired.""" if ( user.warned_since_reset and user.warning_expires_at > 0 and time.time() >= user.warning_expires_at ): user.warned_since_reset = False user.warning_expires_at = 0.0 def is_warned(self, user_id: int) -> bool: """Check if user is currently warned (respects expiration).""" user = self.get_user(user_id) self._expire_warning(user) return user.warned_since_reset def can_warn(self, user_id: int, cooldown_minutes: int) -> bool: user = self.get_user(user_id) if user.last_warning_time == 0.0: return True return time.time() - user.last_warning_time > cooldown_minutes * 60 def can_analyze(self, user_id: int, cooldown_seconds: int) -> bool: user = self.get_user(user_id) if user.last_analysis_time == 0.0: return True return time.time() - user.last_analysis_time > cooldown_seconds def reset_user(self, user_id: int) -> None: if user_id in self._users: del self._users[user_id] def toggle_immunity(self, user_id: int) -> bool: user = self.get_user(user_id) user.immune = not user.immune return user.immune def is_immune(self, user_id: int) -> bool: if user_id not in self._users: return False return self._users[user_id].immune def get_all_scores(self) -> dict[int, float]: scores = {} for user_id in list(self._users.keys()): score = self.get_drama_score(user_id) if score > 0.0: scores[user_id] = score return scores def get_recent_incidents( self, user_id: int, count: int = 5 ) -> list[AnalysisEntry]: user = self.get_user(user_id) now = time.time() self._prune_entries(user, now) # Return entries with score > 0.3 (non-trivial) incidents = [e for e in user.entries if e.toxicity_score > 0.3] return incidents[-count:] def record_off_topic(self, user_id: int) -> int: user = self.get_user(user_id) user.off_topic_count += 1 user.last_topic_remind_time = time.time() return user.off_topic_count def can_topic_remind(self, user_id: int, cooldown_minutes: int) -> bool: user = self.get_user(user_id) if user.last_topic_remind_time == 0.0: return True return time.time() - user.last_topic_remind_time > cooldown_minutes * 60 def get_off_topic_count(self, user_id: int) -> int: return self.get_user(user_id).off_topic_count def mark_owner_notified(self, user_id: int) -> None: self.get_user(user_id).owner_notified = True def was_owner_notified(self, user_id: int) -> bool: return self.get_user(user_id).owner_notified def get_user_notes(self, user_id: int) -> str: return self.get_user(user_id).notes def update_user_notes(self, user_id: int, note_update: str) -> None: user = self.get_user(user_id) ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M") new_line = f"[{ts}] {note_update}" if user.notes: user.notes = f"{user.notes}\n{new_line}" else: user.notes = new_line # Keep only the 10 most recent lines lines = user.notes.split("\n") if len(lines) > 10: user.notes = "\n".join(lines[-10:]) def set_user_profile(self, user_id: int, profile: str) -> None: """Replace the user's profile summary (permanent memory).""" user = self.get_user(user_id) user.notes = profile[:500] def clear_user_notes(self, user_id: int) -> None: self.get_user(user_id).notes = "" def get_user_aliases(self, user_id: int) -> list[str]: return self.get_user(user_id).aliases def set_user_aliases(self, user_id: int, aliases: list[str]) -> None: self.get_user(user_id).aliases = aliases def get_all_aliases(self) -> dict[int, list[str]]: """Return {user_id: [aliases]} for all users that have aliases set.""" return {uid: user.aliases for uid, user in self._users.items() if user.aliases} def reset_off_topic(self, user_id: int) -> None: user = self.get_user(user_id) user.off_topic_count = 0 user.last_topic_remind_time = 0.0 user.owner_notified = False def update_coherence( self, user_id: int, score: float, flag: str, drop_threshold: float = 0.3, absolute_floor: float = 0.5, cooldown_minutes: int = 30, ) -> dict | None: """Update user's coherence baseline and detect degradation. Returns info dict if degradation detected, else None.""" user = self.get_user(user_id) alpha = 0.1 # Slow-moving EMA — ~20 messages to shift significantly # Keep a rolling window of recent scores (last 20) user.coherence_scores.append(score) if len(user.coherence_scores) > 20: user.coherence_scores = user.coherence_scores[-20:] baseline_before = user.baseline_coherence drop = baseline_before - score # Check for degradation BEFORE updating baseline degraded = ( score < baseline_before - drop_threshold and score < absolute_floor ) # Update baseline with EMA user.baseline_coherence = alpha * score + (1 - alpha) * user.baseline_coherence if not degraded: return None # Check cooldown now = time.time() if ( user.last_coherence_alert_time > 0 and now - user.last_coherence_alert_time < cooldown_minutes * 60 ): return None user.last_coherence_alert_time = now return { "baseline": baseline_before, "current": score, "drop": drop, "flag": flag, } def load_user_states(self, states: list[dict]) -> int: """Hydrate user state from database rows. Each dict must have: user_id, offense_count, immune, off_topic_count. Optionally includes baseline_coherence. Returns number of users loaded.""" count = 0 for state in states: user_id = state["user_id"] user = self.get_user(user_id) user.offense_count = state["offense_count"] user.immune = state["immune"] user.off_topic_count = state["off_topic_count"] if "baseline_coherence" in state: user.baseline_coherence = state["baseline_coherence"] if "user_notes" in state and state["user_notes"]: user.notes = state["user_notes"] if state.get("warned"): user.warned_since_reset = True user.warning_expires_at = state.get("warning_expires_at", 0.0) or 0.0 # Expire warning at load time if it's past due self._expire_warning(user) if state.get("last_offense_at"): user.last_offense_time = state["last_offense_at"] # Apply time-based offense reset at load time if time.time() - user.last_offense_time > self.offense_reset_seconds: user.offense_count = 0 user.warned_since_reset = False user.warning_expires_at = 0.0 user.last_offense_time = 0.0 if state.get("aliases"): user.aliases = [a.strip() for a in state["aliases"].split(",") if a.strip()] count += 1 return count def _prune_entries(self, user: UserDrama, now: float) -> None: cutoff = now - self.window_seconds user.entries = [e for e in user.entries if e.timestamp > cutoff] if len(user.entries) > self.window_size: user.entries = user.entries[-self.window_size :]