Files
Breehavior-Monitor/cogs/sentiment/__init__.py
AJ Isaacs c63913cf14 fix: anonymize usernames before LLM analysis to prevent name-based scoring bias
Display names like "Calm your tits" were causing the LLM to inflate toxicity
scores on completely benign messages. Usernames are now replaced with User1,
User2, etc. before sending to the LLM, then mapped back to real names in the
results.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 22:20:53 -05:00

722 lines
29 KiB
Python

import asyncio
import logging
import re
from datetime import datetime, timezone

import discord
from discord.ext import commands, tasks

from cogs.sentiment.actions import mute_user, warn_user
from cogs.sentiment.channel_redirect import build_channel_context, handle_channel_redirect
from cogs.sentiment.coherence import handle_coherence_alert
from cogs.sentiment.log_utils import log_analysis
from cogs.sentiment.state import flush_dirty_states
from cogs.sentiment.topic_drift import handle_topic_drift
logger = logging.getLogger("bcs.sentiment")
# How often to flush dirty user states to DB (seconds)
STATE_FLUSH_INTERVAL = 300 # 5 minutes
class SentimentCog(commands.Cog):
    """Monitors guild messages, batches them per channel, and runs LLM-based
    sentiment/toxicity analysis with anonymized usernames."""

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        # Track which user IDs have unsaved in-memory changes
        self._dirty_users: set[int] = set()
        # Per-user redirect cooldown: {user_id: last_redirect_datetime}
        self._redirect_cooldowns: dict[int, datetime] = {}
        # Debounce buffer: keyed by channel_id, stores list of messages from ALL users
        self._message_buffer: dict[int, list[discord.Message]] = {}
        # Pending debounce timer tasks (per-channel)
        self._debounce_tasks: dict[int, asyncio.Task] = {}
        # Mention scan tasks (separate from debounce)
        self._mention_scan_tasks: dict[int, asyncio.Task] = {}
        # Mention scan state
        self._mention_scan_cooldowns: dict[int, datetime] = {}  # {channel_id: last_scan_time}
        self._mention_scan_results: dict[int, str] = {}  # {trigger_message_id: findings_summary}
        self._analyzed_message_ids: set[int] = set()  # Discord message IDs already analyzed
        # Cap on the dedup set above; when exceeded, the older half is pruned
        self._max_analyzed_ids = 500
    async def cog_load(self):
        """Start the periodic dirty-state flush loop when the cog loads."""
        self._flush_states.start()
    async def cog_unload(self):
        """Shut down cleanly: stop timers, drain message buffers, flush state.

        Order matters: cancel the flush loop and all pending tasks first,
        then process whatever is still buffered (which may dirty more users),
        and flush to the DB last so those final changes are persisted.
        """
        self._flush_states.cancel()
        # Cancel all pending debounce timers and process remaining buffers
        for task in self._debounce_tasks.values():
            task.cancel()
        self._debounce_tasks.clear()
        for task in self._mention_scan_tasks.values():
            task.cancel()
        self._mention_scan_tasks.clear()
        # Drain messages whose debounce timer was just cancelled
        for channel_id in list(self._message_buffer):
            await self._process_buffered(channel_id)
        # Final flush on shutdown
        await flush_dirty_states(self.bot, self._dirty_users)
    @commands.Cog.listener()
    async def on_message(self, message: discord.Message):
        """Gate every incoming message, then buffer it for debounced analysis.

        The guard chain (bot/DM/config/channel/user/role/immunity) runs in
        order; a message that clears all gates either triggers a mention scan
        (when the bot is pinged in the text) or lands in the per-channel
        debounce buffer for batched LLM analysis.
        """
        logger.info("MSG from %s in #%s: %s", message.author, getattr(message.channel, 'name', 'DM'), message.content[:80] if message.content else "(empty)")
        # Ignore bots (including ourselves)
        if message.author.bot:
            return
        # Ignore DMs
        if not message.guild:
            return
        config = self.bot.config
        monitoring = config.get("monitoring", {})
        if not monitoring.get("enabled", True):
            return
        # Check if channel is monitored (an empty list means monitor all)
        monitored_channels = monitoring.get("channels", [])
        if monitored_channels and message.channel.id not in monitored_channels:
            return
        # Check ignored users
        if message.author.id in monitoring.get("ignored_users", []):
            return
        # Check immune roles
        immune_roles = set(monitoring.get("immune_roles", []))
        if immune_roles and any(
            r.id in immune_roles for r in message.author.roles
        ):
            return
        # Check per-user immunity
        if self.bot.drama_tracker.is_immune(message.author.id):
            return
        # Explicit @mention of the bot triggers a mention scan instead of scoring.
        # Reply-pings (Discord auto-adds replied-to user to mentions) should NOT
        # trigger scans — and reply-to-bot messages should still be scored normally
        # so toxic replies to bot warnings aren't silently skipped. That is why
        # we look for the literal mention token in the text rather than using
        # message.mentions.
        bot_mentioned_in_text = (
            f"<@{self.bot.user.id}>" in (message.content or "")
            or f"<@!{self.bot.user.id}>" in (message.content or "")
        )
        if bot_mentioned_in_text:
            mention_config = config.get("mention_scan", {})
            if mention_config.get("enabled", True):
                await self._maybe_start_mention_scan(message, mention_config)
            return
        # Skip if empty (embeds/attachments with no text)
        if not message.content or not message.content.strip():
            return
        # Buffer the message and start/reset debounce timer (per-channel)
        channel_id = message.channel.id
        if channel_id not in self._message_buffer:
            self._message_buffer[channel_id] = []
        self._message_buffer[channel_id].append(message)
        # Cancel existing debounce timer for this channel
        existing_task = self._debounce_tasks.get(channel_id)
        if existing_task and not existing_task.done():
            existing_task.cancel()
        batch_window = config.get("sentiment", {}).get("batch_window_seconds", 3)
        self._debounce_tasks[channel_id] = asyncio.create_task(
            self._debounce_then_process(channel_id, batch_window)
        )
    async def _debounce_then_process(self, channel_id: int, delay: float):
        """Sleep for the debounce window, then process the buffered messages.

        The try covers both awaits on purpose: cancellation (a new message
        resetting the timer, or cog unload) is expected and must not surface.
        """
        try:
            await asyncio.sleep(delay)
            await self._process_buffered(channel_id)
        except asyncio.CancelledError:
            pass  # Timer was reset by a new message — expected
def _resolve_thresholds(self) -> dict:
"""Resolve effective moderation thresholds based on current mode."""
config = self.bot.config
sentiment_config = config.get("sentiment", {})
mode_config = self.bot.get_mode_config()
moderation_level = mode_config.get("moderation", "full")
if moderation_level == "relaxed" and "relaxed_thresholds" in mode_config:
rt = mode_config["relaxed_thresholds"]
return {
"warning": rt.get("warning_threshold", 0.80),
"mute": rt.get("mute_threshold", 0.85),
"spike_warn": rt.get("spike_warning_threshold", 0.70),
"spike_mute": rt.get("spike_mute_threshold", 0.85),
}
return {
"warning": sentiment_config.get("warning_threshold", 0.6),
"mute": sentiment_config.get("mute_threshold", 0.75),
"spike_warn": sentiment_config.get("spike_warning_threshold", 0.5),
"spike_mute": sentiment_config.get("spike_mute_threshold", 0.8),
}
async def _apply_moderation(
self,
message: discord.Message,
user_id: int,
score: float,
drama_score: float,
categories: list[str],
thresholds: dict,
db_message_id: int | None,
) -> None:
"""Issue a warning or mute based on scores and thresholds."""
mute_threshold = self.bot.drama_tracker.get_mute_threshold(user_id, thresholds["mute"])
user_data = self.bot.drama_tracker.get_user(user_id)
if drama_score >= mute_threshold or score >= thresholds["spike_mute"]:
effective_score = max(drama_score, score)
if user_data.warned_since_reset:
await mute_user(self.bot, message, effective_score, categories, db_message_id, self._dirty_users)
else:
logger.info("Downgrading mute to warning for %s (no prior warning)", message.author)
await warn_user(self.bot, message, effective_score, db_message_id, self._dirty_users)
elif drama_score >= thresholds["warning"] or score >= thresholds["spike_warn"]:
effective_score = max(drama_score, score)
await warn_user(self.bot, message, effective_score, db_message_id, self._dirty_users)
@staticmethod
def _build_user_lookup(messages: list[discord.Message]) -> dict[str, tuple[int, discord.Message, list[discord.Message]]]:
"""Build username -> (user_id, ref_msg, [messages]) mapping."""
lookup: dict[str, tuple[int, discord.Message, list[discord.Message]]] = {}
for msg in messages:
name = msg.author.display_name
if name not in lookup:
lookup[name] = (msg.author.id, msg, [])
lookup[name][2].append(msg)
return lookup
def _build_user_notes_map(self, messages: list[discord.Message]) -> dict[str, str]:
"""Build username -> LLM notes mapping for users in the message list."""
notes_map: dict[str, str] = {}
for msg in messages:
name = msg.author.display_name
if name not in notes_map:
notes = self.bot.drama_tracker.get_user_notes(msg.author.id)
if notes:
notes_map[name] = notes
return notes_map
@staticmethod
def _build_anon_map(
conversation: list[tuple[str, str, datetime, str | None]],
) -> dict[str, str]:
"""Build display_name -> 'User1', 'User2', ... mapping for all participants."""
seen: dict[str, str] = {}
counter = 1
for username, _, _, reply_to in conversation:
if username not in seen:
seen[username] = f"User{counter}"
counter += 1
if reply_to and reply_to not in seen:
seen[reply_to] = f"User{counter}"
counter += 1
return seen
@staticmethod
def _anonymize_conversation(
conversation: list[tuple[str, str, datetime, str | None]],
anon_map: dict[str, str],
) -> list[tuple[str, str, datetime, str | None]]:
"""Replace display names with anonymous keys in conversation tuples."""
return [
(
anon_map.get(username, username),
content,
ts,
anon_map.get(reply_to, reply_to) if reply_to else None,
)
for username, content, ts, reply_to in conversation
]
@staticmethod
def _anonymize_notes(
user_notes_map: dict[str, str],
anon_map: dict[str, str],
) -> dict[str, str]:
"""Replace display name keys with anonymous keys in user notes map."""
return {anon_map.get(name, name): notes for name, notes in user_notes_map.items()}
@staticmethod
def _deanonymize_findings(result: dict, anon_map: dict[str, str]) -> None:
"""Replace anonymous keys back to display names in LLM findings (in-place)."""
reverse_map = {v: k for k, v in anon_map.items()}
for finding in result.get("user_findings", []):
anon_name = finding.get("username", "")
if anon_name in reverse_map:
finding["username"] = reverse_map[anon_name]
@staticmethod
def _build_conversation(
messages: list[discord.Message],
) -> list[tuple[str, str, datetime, str | None]]:
"""Convert a list of Discord messages to conversation tuples with reply resolution."""
msg_id_to_author = {m.id: m.author.display_name for m in messages}
conversation = []
for msg in messages:
reply_to = None
if msg.reference and msg.reference.message_id:
reply_to = msg_id_to_author.get(msg.reference.message_id)
if not reply_to:
ref = msg.reference.cached_message
if ref:
reply_to = ref.author.display_name
conversation.append((
msg.author.display_name,
msg.content,
msg.created_at,
reply_to,
))
return conversation
# -- Shared finding processor --
async def _process_finding(
self,
finding: dict,
user_lookup: dict,
*,
sentiment_config: dict,
dry_run: bool,
thresholds: dict,
db_content: str,
db_topic_category: str,
db_topic_reasoning: str,
db_coherence_score: float | None,
db_coherence_flag: str | None,
game_channels: dict | None = None,
coherence_config: dict | None = None,
) -> tuple[str, float, float, list[str]] | None:
"""Process a single user finding.
Returns (username, score, drama_score, categories) or None if skipped.
When game_channels is not None, topic drift, game redirect, and coherence
handlers are active (buffered analysis mode). When None, they are skipped
(mention scan mode).
"""
username = finding["username"]
lookup = user_lookup.get(username)
if not lookup:
return None
user_id, user_ref_msg, user_msgs = lookup
score = finding["toxicity_score"]
categories = finding["categories"]
reasoning = finding["reasoning"]
off_topic = finding.get("off_topic", False)
note_update = finding.get("note_update")
# Track in DramaTracker
self.bot.drama_tracker.add_entry(user_id, score, categories, reasoning)
escalation_boost = sentiment_config.get("escalation_boost", 0.04)
drama_score = self.bot.drama_tracker.get_drama_score(user_id, escalation_boost=escalation_boost)
logger.info(
"User %s (%d) | msg_score=%.2f | drama_score=%.2f | categories=%s | %s",
username, user_id, score, drama_score, categories, reasoning,
)
# Save to DB
db_message_id = await self.bot.db.save_message_and_analysis(
guild_id=user_ref_msg.guild.id,
channel_id=user_ref_msg.channel.id,
user_id=user_id,
username=username,
content=db_content,
message_ts=user_ref_msg.created_at.replace(tzinfo=timezone.utc),
toxicity_score=score,
drama_score=drama_score,
categories=categories,
reasoning=reasoning,
off_topic=off_topic,
topic_category=db_topic_category,
topic_reasoning=db_topic_reasoning,
coherence_score=db_coherence_score,
coherence_flag=db_coherence_flag,
)
# Feature handlers — only active during buffered analysis (game_channels set)
if game_channels is not None:
if off_topic:
await handle_topic_drift(
self.bot, user_ref_msg, db_topic_category, db_topic_reasoning,
db_message_id, self._dirty_users,
)
detected_game = finding.get("detected_game")
if detected_game and game_channels and not dry_run:
await handle_channel_redirect(
self.bot, user_ref_msg, detected_game, game_channels,
db_message_id, self._redirect_cooldowns,
)
if coherence_config is not None and coherence_config.get("enabled", True):
coherence_score = finding.get("coherence_score", 0.85)
coherence_flag = finding.get("coherence_flag", "normal")
degradation = self.bot.drama_tracker.update_coherence(
user_id=user_id,
score=coherence_score,
flag=coherence_flag,
drop_threshold=coherence_config.get("drop_threshold", 0.3),
absolute_floor=coherence_config.get("absolute_floor", 0.5),
cooldown_minutes=coherence_config.get("cooldown_minutes", 30),
)
if degradation and not dry_run:
await handle_coherence_alert(
self.bot, user_ref_msg, degradation, coherence_config,
db_message_id, self._dirty_users,
)
# Note update
if note_update:
self.bot.drama_tracker.update_user_notes(user_id, note_update)
self._dirty_users.add(user_id)
self._dirty_users.add(user_id)
# Log analysis
await log_analysis(
user_ref_msg, score, drama_score, categories, reasoning,
off_topic, db_topic_category,
)
# Moderation
if not dry_run:
await self._apply_moderation(
user_ref_msg, user_id, score, drama_score, categories, thresholds, db_message_id,
)
return (username, score, drama_score, categories)
    # -- Buffered analysis --
    async def _process_buffered(self, channel_id: int):
        """Collect buffered messages, build conversation block, and run analysis.

        Pipeline: pop the channel buffer -> fetch leading context from channel
        history -> anonymize names -> triage LLM pass -> optional heavy-model
        escalation -> de-anonymize -> per-user finding processing.
        """
        messages = self._message_buffer.pop(channel_id, [])
        self._debounce_tasks.pop(channel_id, None)
        if not messages:
            return
        # Use the last message as reference for channel/guild
        ref_message = messages[-1]
        channel = ref_message.channel
        config = self.bot.config
        sentiment_config = config.get("sentiment", {})
        game_channels = config.get("game_channels", {})
        # Fetch some history before the buffered messages for leading context
        context_count = sentiment_config.get("context_messages", 8)
        oldest_buffered = messages[0]
        history_messages: list[discord.Message] = []
        try:
            # Over-fetch slightly (+5) since bot/empty messages are skipped
            async for msg in channel.history(limit=context_count + 5, before=oldest_buffered):
                if msg.author.bot:
                    continue
                if not msg.content or not msg.content.strip():
                    continue
                history_messages.append(msg)
                if len(history_messages) >= context_count:
                    break
        except discord.HTTPException:
            # Context fetch is best-effort; analyze without it on API failure
            pass
        history_messages.reverse()  # chronological order
        # Combine: history (context) + buffered (new messages to analyze)
        new_message_start = len(history_messages)
        all_messages = history_messages + messages
        conversation = self._build_conversation(all_messages)
        if not conversation:
            return
        user_notes_map = self._build_user_notes_map(messages)
        # Anonymize usernames before sending to LLM to prevent name-based bias
        anon_map = self._build_anon_map(conversation)
        anon_conversation = self._anonymize_conversation(conversation, anon_map)
        anon_notes = self._anonymize_notes(user_notes_map, anon_map) if user_notes_map else user_notes_map
        channel_context = build_channel_context(ref_message, game_channels)
        logger.info(
            "Channel analysis: %d new messages (+%d context) in #%s",
            len(messages), len(history_messages),
            getattr(channel, 'name', 'unknown'),
        )
        # TRIAGE: Lightweight model — conversation-level analysis
        result = await self.bot.llm.analyze_conversation(
            anon_conversation,
            channel_context=channel_context,
            user_notes_map=anon_notes,
            new_message_start=new_message_start,
        )
        if result is None:
            return
        # ESCALATION: Re-analyze with heavy model if any finding warrants it
        escalation_threshold = sentiment_config.get("escalation_threshold", 0.25)
        needs_escalation = any(
            f["toxicity_score"] >= escalation_threshold
            or f.get("off_topic", False)
            or f.get("coherence_score", 1.0) < 0.6
            for f in result.get("user_findings", [])
        )
        if needs_escalation:
            heavy_result = await self.bot.llm_heavy.analyze_conversation(
                anon_conversation,
                channel_context=channel_context,
                user_notes_map=anon_notes,
                new_message_start=new_message_start,
            )
            # Fall back to the triage result if the heavy model fails
            if heavy_result is not None:
                logger.info(
                    "Escalated to heavy model for #%s",
                    getattr(channel, 'name', 'unknown'),
                )
                result = heavy_result
        # De-anonymize findings back to real display names
        self._deanonymize_findings(result, anon_map)
        user_lookup = self._build_user_lookup(messages)
        # Mark all buffered messages as analyzed (for mention scan dedup)
        for m in messages:
            self._mark_analyzed(m.id)
        dry_run = config.get("monitoring", {}).get("dry_run", False)
        thresholds = self._resolve_thresholds()
        coherence_config = config.get("coherence", {})
        # Process per-user findings
        for finding in result.get("user_findings", []):
            username = finding["username"]
            lookup = user_lookup.get(username)
            if not lookup:
                # LLM referenced someone outside the buffered batch — skip
                continue
            _, _, user_msgs = lookup
            # All of this user's buffered messages, capped for DB storage
            combined_content = "\n".join(
                m.content for m in user_msgs if m.content and m.content.strip()
            )[:4000]
            await self._process_finding(
                finding, user_lookup,
                sentiment_config=sentiment_config,
                dry_run=dry_run,
                thresholds=thresholds,
                db_content=combined_content,
                db_topic_category=finding.get("topic_category", "general_chat"),
                db_topic_reasoning=finding.get("topic_reasoning", ""),
                db_coherence_score=finding.get("coherence_score", 0.85),
                db_coherence_flag=finding.get("coherence_flag", "normal"),
                game_channels=game_channels,
                coherence_config=coherence_config,
            )
# -- Mention scan methods --
def _mark_analyzed(self, discord_message_id: int):
"""Track a Discord message ID as already analyzed."""
self._analyzed_message_ids.add(discord_message_id)
if len(self._analyzed_message_ids) > self._max_analyzed_ids:
sorted_ids = sorted(self._analyzed_message_ids)
self._analyzed_message_ids = set(sorted_ids[len(sorted_ids) // 2:])
async def _maybe_start_mention_scan(
self, trigger_message: discord.Message, mention_config: dict
):
"""Check cooldown and kick off a mention-triggered scan of recent messages."""
channel_id = trigger_message.channel.id
cooldown_seconds = mention_config.get("cooldown_seconds", 60)
now = datetime.now(timezone.utc)
last_scan = self._mention_scan_cooldowns.get(channel_id)
if last_scan and (now - last_scan).total_seconds() < cooldown_seconds:
logger.info(
"Mention scan cooldown active for #%s, skipping.",
getattr(trigger_message.channel, "name", "unknown"),
)
return
self._mention_scan_cooldowns[channel_id] = now
# Extract the user's concern (strip the bot ping from the message)
mention_text = trigger_message.content
for fmt in (f"<@{self.bot.user.id}>", f"<@!{self.bot.user.id}>"):
mention_text = mention_text.replace(fmt, "")
mention_text = mention_text.strip() or "(user pinged bot without specific concern)"
# Store as a mention scan task (separate from debounce)
existing_task = self._mention_scan_tasks.get(channel_id)
if existing_task and not existing_task.done():
existing_task.cancel()
self._mention_scan_tasks[channel_id] = asyncio.create_task(
self._run_mention_scan(trigger_message, mention_text, mention_config)
)
    async def _run_mention_scan(
        self,
        trigger_message: discord.Message,
        mention_text: str,
        mention_config: dict,
    ):
        """Scan recent channel messages with ONE conversation-level LLM call.

        Fetches recent history, anonymizes it, asks the LLM to evaluate the
        pinging user's concern, moderates any findings, and stashes a text
        summary in _mention_scan_results keyed by the trigger message ID
        (consumed elsewhere — presumably by a chat cog; see summary comment).
        """
        channel = trigger_message.channel
        scan_count = mention_config.get("scan_messages", 30)
        config = self.bot.config
        sentiment_config = config.get("sentiment", {})
        game_channels = config.get("game_channels", {})
        # Fetch recent messages (before the trigger, skip bots/empty)
        raw_messages: list[discord.Message] = []
        try:
            # Over-fetch (+10) since bot/empty messages are skipped
            async for msg in channel.history(limit=scan_count + 10, before=trigger_message):
                if msg.author.bot:
                    continue
                if not msg.content or not msg.content.strip():
                    continue
                raw_messages.append(msg)
                if len(raw_messages) >= scan_count:
                    break
        except discord.HTTPException:
            logger.warning("Failed to fetch history for mention scan in #%s",
                           getattr(channel, "name", "unknown"))
            return
        raw_messages.reverse()  # chronological order
        if not raw_messages:
            self._mention_scan_results[trigger_message.id] = "No recent messages found to analyze."
            return
        logger.info(
            "Mention scan triggered by %s in #%s: %d messages (single LLM call). Focus: %s",
            trigger_message.author.display_name,
            getattr(channel, "name", "unknown"),
            len(raw_messages),
            mention_text[:80],
        )
        conversation = self._build_conversation(raw_messages)
        user_notes_map = self._build_user_notes_map(raw_messages)
        # Anonymize usernames before sending to LLM
        anon_map = self._build_anon_map(conversation)
        anon_conversation = self._anonymize_conversation(conversation, anon_map)
        anon_notes = self._anonymize_notes(user_notes_map, anon_map) if user_notes_map else user_notes_map
        channel_context = build_channel_context(raw_messages[0], game_channels)
        mention_context = (
            f"A user flagged this conversation and said: \"{mention_text}\"\n"
            f"Pay special attention to whether this concern is valid."
        )
        # Single LLM call
        result = await self.bot.llm.analyze_conversation(
            anon_conversation,
            mention_context=mention_context,
            channel_context=channel_context,
            user_notes_map=anon_notes,
        )
        if result is None:
            logger.warning("Conversation analysis failed for mention scan.")
            self._mention_scan_results[trigger_message.id] = "Analysis failed."
            return
        # De-anonymize findings back to real display names
        self._deanonymize_findings(result, anon_map)
        user_lookup = self._build_user_lookup(raw_messages)
        findings: list[str] = []
        dry_run = config.get("monitoring", {}).get("dry_run", False)
        thresholds = self._resolve_thresholds()
        for finding in result.get("user_findings", []):
            username = finding["username"]
            lookup = user_lookup.get(username)
            if not lookup:
                logger.warning("Mention scan: LLM returned unknown user '%s', skipping.", username)
                continue
            user_id, ref_msg, user_msgs = lookup
            # Skip if all their messages were already analyzed
            if all(m.id in self._analyzed_message_ids for m in user_msgs):
                continue
            # Mark their messages as analyzed
            for m in user_msgs:
                self._mark_analyzed(m.id)
            worst_msg = finding.get("worst_message")
            content = f"[Mention scan] {worst_msg}" if worst_msg else "[Mention scan] See conversation"
            off_topic = finding.get("off_topic", False)
            # game_channels/coherence_config omitted: feature handlers are
            # disabled in mention-scan mode (see _process_finding docstring)
            result_tuple = await self._process_finding(
                finding, user_lookup,
                sentiment_config=sentiment_config,
                dry_run=dry_run,
                thresholds=thresholds,
                db_content=content,
                db_topic_category="personal_drama" if off_topic else "gaming",
                db_topic_reasoning=finding.get("reasoning", ""),
                db_coherence_score=None,
                db_coherence_flag=None,
            )
            if result_tuple:
                _, score, _, categories = result_tuple
                # Only surface notable users (score >= 0.3) in the summary
                if score >= 0.3:
                    cat_str = ", ".join(c for c in categories if c != "none") or "none"
                    findings.append(f"{username}: {score:.2f} ({cat_str})")
        # Build summary for ChatCog
        convo_summary = result.get("conversation_summary", "")
        if findings:
            summary = f"Scanned {len(raw_messages)} msgs. {convo_summary} Notable: " + "; ".join(findings[:5])
        else:
            summary = f"Scanned {len(raw_messages)} msgs. {convo_summary}"
        # Prune old scan results (keep roughly the 10 most recent by ID)
        if len(self._mention_scan_results) > 20:
            oldest = sorted(self._mention_scan_results.keys())[:len(self._mention_scan_results) - 10]
            for k in oldest:
                del self._mention_scan_results[k]
        self._mention_scan_results[trigger_message.id] = summary
        logger.info(
            "Mention scan complete in #%s: 1 LLM call, %d messages, %d users flagged",
            getattr(channel, "name", "unknown"),
            len(raw_messages),
            len(findings),
        )
    # -- State flush loop --
    @tasks.loop(seconds=STATE_FLUSH_INTERVAL)
    async def _flush_states(self):
        """Periodically persist users with unsaved in-memory state."""
        await flush_dirty_states(self.bot, self._dirty_users)
    @_flush_states.before_loop
    async def _before_flush(self):
        # Don't start the flush loop until the bot's session/cache is ready
        await self.bot.wait_until_ready()
async def setup(bot: commands.Bot):
    """Standard discord.py extension entry point: register the cog."""
    await bot.add_cog(SentimentCog(bot))