Replaces the entire notes field with an LLM-generated profile summary, used by the memory extraction system for permanent facts. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
309 lines · 10 KiB · Python
import time
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
@dataclass
class AnalysisEntry:
    """One recorded toxicity analysis of a single user message."""

    timestamp: float  # time.time() when the entry was recorded (set in add_entry)
    toxicity_score: float  # analyzer's toxicity rating; compared against 0.3/0.5 thresholds, presumably in [0, 1]
    categories: list[str]  # category labels attached by the analyzer
    reasoning: str  # analyzer's free-text justification
|
|
|
|
|
|
@dataclass
class UserDrama:
    """Mutable per-user moderation state, managed by DramaTracker."""

    # Rolling window of recent analyses (pruned by age and count in DramaTracker).
    entries: list[AnalysisEntry] = field(default_factory=list)
    offense_count: int = 0  # offenses since the last time-based reset
    last_offense_time: float = 0.0  # time.time() of most recent offense; 0.0 = never
    last_warning_time: float = 0.0  # time.time() of most recent warning; 0.0 = never
    last_analysis_time: float = 0.0  # time.time() of most recent analysis; 0.0 = never
    warned_since_reset: bool = False  # set on warning, cleared again by the next offense
    immune: bool = False  # manually toggled exemption flag (see toggle_immunity)
    # Topic drift tracking
    off_topic_count: int = 0  # off-topic strikes since last reset_off_topic
    last_topic_remind_time: float = 0.0  # time.time() of last topic reminder; 0.0 = never
    owner_notified: bool = False  # owner already alerted about this user
    # Coherence tracking
    coherence_scores: list[float] = field(default_factory=list)  # raw scores, capped at last 20
    baseline_coherence: float = 0.85  # slow-moving EMA of coherence scores
    last_coherence_alert_time: float = 0.0  # time.time() of last degradation alert; 0.0 = never
    # Per-user LLM notes
    notes: str = ""  # timestamped note lines, or a profile summary (max 500 chars)
|
|
|
|
|
|
class DramaTracker:
|
|
def __init__(
|
|
self,
|
|
window_size: int = 10,
|
|
window_minutes: int = 15,
|
|
offense_reset_minutes: int = 120,
|
|
):
|
|
self.window_size = window_size
|
|
self.window_seconds = window_minutes * 60
|
|
self.offense_reset_seconds = offense_reset_minutes * 60
|
|
self._users: dict[int, UserDrama] = {}
|
|
|
|
def get_user(self, user_id: int) -> UserDrama:
|
|
if user_id not in self._users:
|
|
self._users[user_id] = UserDrama()
|
|
return self._users[user_id]
|
|
|
|
def add_entry(
|
|
self,
|
|
user_id: int,
|
|
toxicity_score: float,
|
|
categories: list[str],
|
|
reasoning: str,
|
|
) -> None:
|
|
user = self.get_user(user_id)
|
|
now = time.time()
|
|
|
|
user.entries.append(
|
|
AnalysisEntry(
|
|
timestamp=now,
|
|
toxicity_score=toxicity_score,
|
|
categories=categories,
|
|
reasoning=reasoning,
|
|
)
|
|
)
|
|
user.last_analysis_time = now
|
|
self._prune_entries(user, now)
|
|
|
|
def get_drama_score(self, user_id: int, escalation_boost: float = 0.04) -> float:
|
|
user = self.get_user(user_id)
|
|
now = time.time()
|
|
self._prune_entries(user, now)
|
|
|
|
if not user.entries:
|
|
return 0.0
|
|
|
|
# Weighted average: more recent messages weighted higher
|
|
total_weight = 0.0
|
|
weighted_sum = 0.0
|
|
for i, entry in enumerate(user.entries):
|
|
weight = (i + 1) # linear weight, later entries = higher
|
|
weighted_sum += entry.toxicity_score * weight
|
|
total_weight += weight
|
|
|
|
base_score = weighted_sum / total_weight if total_weight > 0 else 0.0
|
|
|
|
# Escalation: if warned, each high-scoring message AFTER the warning
|
|
# adds a boost so sustained bad behavior ramps toward mute threshold
|
|
if user.warned_since_reset and user.last_warning_time > 0:
|
|
post_warn_high = sum(
|
|
1 for e in user.entries
|
|
if e.timestamp > user.last_warning_time and e.toxicity_score >= 0.5
|
|
)
|
|
if post_warn_high > 0:
|
|
base_score += escalation_boost * post_warn_high
|
|
|
|
return min(base_score, 1.0)
|
|
|
|
def get_mute_threshold(self, user_id: int, base_threshold: float) -> float:
|
|
"""Lower the mute threshold if user was already warned."""
|
|
user = self.get_user(user_id)
|
|
if user.warned_since_reset:
|
|
return base_threshold - 0.05
|
|
return base_threshold
|
|
|
|
def record_offense(self, user_id: int) -> int:
|
|
user = self.get_user(user_id)
|
|
now = time.time()
|
|
|
|
# Reset offense count if enough time has passed
|
|
if (
|
|
user.last_offense_time > 0
|
|
and now - user.last_offense_time > self.offense_reset_seconds
|
|
):
|
|
user.offense_count = 0
|
|
|
|
user.offense_count += 1
|
|
user.last_offense_time = now
|
|
user.warned_since_reset = False
|
|
return user.offense_count
|
|
|
|
def record_warning(self, user_id: int) -> None:
|
|
user = self.get_user(user_id)
|
|
user.last_warning_time = time.time()
|
|
user.warned_since_reset = True
|
|
|
|
def can_warn(self, user_id: int, cooldown_minutes: int) -> bool:
|
|
user = self.get_user(user_id)
|
|
if user.last_warning_time == 0.0:
|
|
return True
|
|
return time.time() - user.last_warning_time > cooldown_minutes * 60
|
|
|
|
def can_analyze(self, user_id: int, cooldown_seconds: int) -> bool:
|
|
user = self.get_user(user_id)
|
|
if user.last_analysis_time == 0.0:
|
|
return True
|
|
return time.time() - user.last_analysis_time > cooldown_seconds
|
|
|
|
def reset_user(self, user_id: int) -> None:
|
|
if user_id in self._users:
|
|
del self._users[user_id]
|
|
|
|
def toggle_immunity(self, user_id: int) -> bool:
|
|
user = self.get_user(user_id)
|
|
user.immune = not user.immune
|
|
return user.immune
|
|
|
|
def is_immune(self, user_id: int) -> bool:
|
|
if user_id not in self._users:
|
|
return False
|
|
return self._users[user_id].immune
|
|
|
|
def get_all_scores(self) -> dict[int, float]:
|
|
scores = {}
|
|
for user_id in list(self._users.keys()):
|
|
score = self.get_drama_score(user_id)
|
|
if score > 0.0:
|
|
scores[user_id] = score
|
|
return scores
|
|
|
|
def get_recent_incidents(
|
|
self, user_id: int, count: int = 5
|
|
) -> list[AnalysisEntry]:
|
|
user = self.get_user(user_id)
|
|
now = time.time()
|
|
self._prune_entries(user, now)
|
|
# Return entries with score > 0.3 (non-trivial)
|
|
incidents = [e for e in user.entries if e.toxicity_score > 0.3]
|
|
return incidents[-count:]
|
|
|
|
def record_off_topic(self, user_id: int) -> int:
|
|
user = self.get_user(user_id)
|
|
user.off_topic_count += 1
|
|
user.last_topic_remind_time = time.time()
|
|
return user.off_topic_count
|
|
|
|
def can_topic_remind(self, user_id: int, cooldown_minutes: int) -> bool:
|
|
user = self.get_user(user_id)
|
|
if user.last_topic_remind_time == 0.0:
|
|
return True
|
|
return time.time() - user.last_topic_remind_time > cooldown_minutes * 60
|
|
|
|
def get_off_topic_count(self, user_id: int) -> int:
|
|
return self.get_user(user_id).off_topic_count
|
|
|
|
def mark_owner_notified(self, user_id: int) -> None:
|
|
self.get_user(user_id).owner_notified = True
|
|
|
|
def was_owner_notified(self, user_id: int) -> bool:
|
|
return self.get_user(user_id).owner_notified
|
|
|
|
def get_user_notes(self, user_id: int) -> str:
|
|
return self.get_user(user_id).notes
|
|
|
|
def update_user_notes(self, user_id: int, note_update: str) -> None:
|
|
user = self.get_user(user_id)
|
|
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
|
|
new_line = f"[{ts}] {note_update}"
|
|
if user.notes:
|
|
user.notes = f"{user.notes}\n{new_line}"
|
|
else:
|
|
user.notes = new_line
|
|
# Keep only the 10 most recent lines
|
|
lines = user.notes.split("\n")
|
|
if len(lines) > 10:
|
|
user.notes = "\n".join(lines[-10:])
|
|
|
|
def set_user_profile(self, user_id: int, profile: str) -> None:
|
|
"""Replace the user's profile summary (permanent memory)."""
|
|
user = self.get_user(user_id)
|
|
user.notes = profile[:500]
|
|
|
|
def clear_user_notes(self, user_id: int) -> None:
|
|
self.get_user(user_id).notes = ""
|
|
|
|
def reset_off_topic(self, user_id: int) -> None:
|
|
user = self.get_user(user_id)
|
|
user.off_topic_count = 0
|
|
user.last_topic_remind_time = 0.0
|
|
user.owner_notified = False
|
|
|
|
def update_coherence(
|
|
self,
|
|
user_id: int,
|
|
score: float,
|
|
flag: str,
|
|
drop_threshold: float = 0.3,
|
|
absolute_floor: float = 0.5,
|
|
cooldown_minutes: int = 30,
|
|
) -> dict | None:
|
|
"""Update user's coherence baseline and detect degradation.
|
|
Returns info dict if degradation detected, else None."""
|
|
user = self.get_user(user_id)
|
|
alpha = 0.1 # Slow-moving EMA — ~20 messages to shift significantly
|
|
|
|
# Keep a rolling window of recent scores (last 20)
|
|
user.coherence_scores.append(score)
|
|
if len(user.coherence_scores) > 20:
|
|
user.coherence_scores = user.coherence_scores[-20:]
|
|
|
|
baseline_before = user.baseline_coherence
|
|
drop = baseline_before - score
|
|
|
|
# Check for degradation BEFORE updating baseline
|
|
degraded = (
|
|
score < baseline_before - drop_threshold
|
|
and score < absolute_floor
|
|
)
|
|
|
|
# Update baseline with EMA
|
|
user.baseline_coherence = alpha * score + (1 - alpha) * user.baseline_coherence
|
|
|
|
if not degraded:
|
|
return None
|
|
|
|
# Check cooldown
|
|
now = time.time()
|
|
if (
|
|
user.last_coherence_alert_time > 0
|
|
and now - user.last_coherence_alert_time < cooldown_minutes * 60
|
|
):
|
|
return None
|
|
|
|
user.last_coherence_alert_time = now
|
|
return {
|
|
"baseline": baseline_before,
|
|
"current": score,
|
|
"drop": drop,
|
|
"flag": flag,
|
|
}
|
|
|
|
def load_user_states(self, states: list[dict]) -> int:
|
|
"""Hydrate user state from database rows.
|
|
Each dict must have: user_id, offense_count, immune, off_topic_count.
|
|
Optionally includes baseline_coherence.
|
|
Returns number of users loaded."""
|
|
count = 0
|
|
for state in states:
|
|
user_id = state["user_id"]
|
|
user = self.get_user(user_id)
|
|
user.offense_count = state["offense_count"]
|
|
user.immune = state["immune"]
|
|
user.off_topic_count = state["off_topic_count"]
|
|
if "baseline_coherence" in state:
|
|
user.baseline_coherence = state["baseline_coherence"]
|
|
if "user_notes" in state and state["user_notes"]:
|
|
user.notes = state["user_notes"]
|
|
if state.get("warned"):
|
|
user.warned_since_reset = True
|
|
if state.get("last_offense_at"):
|
|
user.last_offense_time = state["last_offense_at"]
|
|
# Apply time-based offense reset at load time
|
|
if time.time() - user.last_offense_time > self.offense_reset_seconds:
|
|
user.offense_count = 0
|
|
user.warned_since_reset = False
|
|
user.last_offense_time = 0.0
|
|
count += 1
|
|
return count
|
|
|
|
def _prune_entries(self, user: UserDrama, now: float) -> None:
|
|
cutoff = now - self.window_seconds
|
|
user.entries = [e for e in user.entries if e.timestamp > cutoff]
|
|
if len(user.entries) > self.window_size:
|
|
user.entries = user.entries[-self.window_size :]
|