import asyncio
import logging
from datetime import datetime, timedelta, timezone

import discord
from discord.ext import commands, tasks

from cogs.sentiment.actions import mute_user, warn_user
from cogs.sentiment.channel_redirect import build_channel_context, handle_channel_redirect
from cogs.sentiment.coherence import handle_coherence_alert
from cogs.sentiment.log_utils import log_analysis
from cogs.sentiment.state import flush_dirty_states
from cogs.sentiment.topic_drift import handle_topic_drift

logger = logging.getLogger("bcs.sentiment")

# How often to flush dirty user states to DB (seconds)
STATE_FLUSH_INTERVAL = 300  # 5 minutes


class SentimentCog(commands.Cog):
    """Monitors channel messages, batches them per-channel, and runs LLM
    conversation analysis for toxicity / topic drift / coherence, applying
    warnings or mutes based on configured thresholds.

    Two analysis paths exist:
    - Buffered analysis: messages are debounced per channel and scored in batches.
    - Mention scan: an explicit bot @mention classified as a "report" triggers a
      one-shot scan of recent channel history.
    """

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        # Track which user IDs have unsaved in-memory changes
        self._dirty_users: set[int] = set()
        # Per-user redirect cooldown: {user_id: last_redirect_datetime}
        self._redirect_cooldowns: dict[int, datetime] = {}
        # Debounce buffer: keyed by channel_id, stores list of messages from ALL users
        self._message_buffer: dict[int, list[discord.Message]] = {}
        # Pending debounce timer tasks (per-channel)
        self._debounce_tasks: dict[int, asyncio.Task] = {}
        # Mention scan tasks (separate from debounce)
        self._mention_scan_tasks: dict[int, asyncio.Task] = {}
        # Mention scan state
        self._mention_scan_cooldowns: dict[int, datetime] = {}  # {channel_id: last_scan_time}
        self._mention_scan_results: dict[int, str] = {}  # {trigger_message_id: findings_summary}
        self._analyzed_message_ids: set[int] = set()  # Discord message IDs already analyzed
        self._max_analyzed_ids = 500
        # Strong references to fire-and-forget background tasks. The event loop
        # keeps only weak refs to tasks, so without this a pending task (e.g. a
        # memory save) could be garbage-collected before completing.
        self._background_tasks: set[asyncio.Task] = set()

    async def cog_load(self):
        self._flush_states.start()

    async def cog_unload(self):
        self._flush_states.cancel()
        # Cancel all pending debounce timers and process remaining buffers
        for task in self._debounce_tasks.values():
            task.cancel()
        self._debounce_tasks.clear()
        for task in self._mention_scan_tasks.values():
            task.cancel()
        self._mention_scan_tasks.clear()
        for channel_id in list(self._message_buffer):
            await self._process_buffered(channel_id)
        # Final flush on shutdown
        await flush_dirty_states(self.bot, self._dirty_users)

    @commands.Cog.listener()
    async def on_message(self, message: discord.Message):
        logger.info(
            "MSG from %s in #%s: %s",
            message.author,
            getattr(message.channel, 'name', 'DM'),
            message.content[:80] if message.content else "(empty)",
        )
        # Ignore bots (including ourselves)
        if message.author.bot:
            return
        # Ignore DMs
        if not message.guild:
            return

        config = self.bot.config
        monitoring = config.get("monitoring", {})
        if not monitoring.get("enabled", True):
            return

        # Check if channel is monitored
        monitored_channels = monitoring.get("channels", [])
        if monitored_channels and message.channel.id not in monitored_channels:
            return
        # Check ignored users
        if message.author.id in monitoring.get("ignored_users", []):
            return
        # Check immune roles
        immune_roles = set(monitoring.get("immune_roles", []))
        if immune_roles and any(r.id in immune_roles for r in message.author.roles):
            return
        # Check per-user immunity
        if self.bot.drama_tracker.is_immune(message.author.id):
            return

        # Explicit @mention of the bot triggers a mention scan instead of scoring.
        # Reply-pings (Discord auto-adds replied-to user to mentions) should NOT
        # trigger scans — and reply-to-bot messages should still be scored normally
        # so toxic replies to bot warnings aren't silently skipped.
        bot_mentioned_in_text = (
            f"<@{self.bot.user.id}>" in (message.content or "")
            or f"<@!{self.bot.user.id}>" in (message.content or "")
        )
        if bot_mentioned_in_text:
            # Classify intent: only run expensive mention scan for reports,
            # let ChatCog handle casual chat/questions
            intent = await self.bot.llm.classify_mention_intent(
                message.content or ""
            )
            logger.info("Mention intent for %s: %s", message.author, intent)
            if intent == "report":
                mention_config = config.get("mention_scan", {})
                if mention_config.get("enabled", True):
                    await self._maybe_start_mention_scan(message, mention_config)
            return

        # Skip if empty
        if not message.content or not message.content.strip():
            return

        # Buffer the message and start/reset debounce timer (per-channel)
        channel_id = message.channel.id
        self._message_buffer.setdefault(channel_id, []).append(message)

        # Cancel existing debounce timer for this channel
        existing_task = self._debounce_tasks.get(channel_id)
        if existing_task and not existing_task.done():
            existing_task.cancel()

        batch_window = config.get("sentiment", {}).get("batch_window_seconds", 3)
        self._debounce_tasks[channel_id] = asyncio.create_task(
            self._debounce_then_process(channel_id, batch_window)
        )

    async def _debounce_then_process(self, channel_id: int, delay: float):
        """Sleep for the debounce window, then process the buffered messages."""
        try:
            await asyncio.sleep(delay)
            await self._process_buffered(channel_id)
        except asyncio.CancelledError:
            pass  # Timer was reset by a new message — expected

    def _resolve_thresholds(self) -> dict:
        """Resolve effective moderation thresholds based on current mode."""
        config = self.bot.config
        sentiment_config = config.get("sentiment", {})
        mode_config = self.bot.get_mode_config()
        moderation_level = mode_config.get("moderation", "full")
        if moderation_level == "relaxed" and "relaxed_thresholds" in mode_config:
            rt = mode_config["relaxed_thresholds"]
            return {
                "warning": rt.get("warning_threshold", 0.80),
                "mute": rt.get("mute_threshold", 0.85),
                "spike_warn": rt.get("spike_warning_threshold", 0.70),
                "spike_mute": rt.get("spike_mute_threshold", 0.85),
            }
        return {
            "warning": sentiment_config.get("warning_threshold", 0.6),
            "mute": sentiment_config.get("mute_threshold", 0.75),
            "spike_warn": sentiment_config.get("spike_warning_threshold", 0.5),
            "spike_mute": sentiment_config.get("spike_mute_threshold", 0.8),
        }

    async def _apply_moderation(
        self,
        message: discord.Message,
        user_id: int,
        score: float,
        drama_score: float,
        categories: list[str],
        thresholds: dict,
        db_message_id: int | None,
    ) -> None:
        """Issue a warning or mute based on scores and thresholds.

        A mute is downgraded to a warning when the user has not been warned
        since their last reset, so first offenses escalate gradually.
        """
        mute_threshold = self.bot.drama_tracker.get_mute_threshold(user_id, thresholds["mute"])
        user_data = self.bot.drama_tracker.get_user(user_id)
        if drama_score >= mute_threshold or score >= thresholds["spike_mute"]:
            effective_score = max(drama_score, score)
            if user_data.warned_since_reset:
                await mute_user(self.bot, message, effective_score, categories, db_message_id, self._dirty_users)
            else:
                logger.info("Downgrading mute to warning for %s (no prior warning)", message.author)
                await warn_user(self.bot, message, effective_score, db_message_id, self._dirty_users)
        elif drama_score >= thresholds["warning"] or score >= thresholds["spike_warn"]:
            effective_score = max(drama_score, score)
            await warn_user(self.bot, message, effective_score, db_message_id, self._dirty_users)

    @staticmethod
    def _build_user_lookup(
        messages: list[discord.Message],
    ) -> dict[str, tuple[int, discord.Message, list[discord.Message]]]:
        """Build username -> (user_id, ref_msg, [messages]) mapping."""
        lookup: dict[str, tuple[int, discord.Message, list[discord.Message]]] = {}
        for msg in messages:
            name = msg.author.display_name
            if name not in lookup:
                lookup[name] = (msg.author.id, msg, [])
            lookup[name][2].append(msg)
        return lookup

    def _build_user_notes_map(self, messages: list[discord.Message]) -> dict[str, str]:
        """Build username -> LLM notes mapping for users in the message list."""
        notes_map: dict[str, str] = {}
        for msg in messages:
            name = msg.author.display_name
            if name not in notes_map:
                notes = self.bot.drama_tracker.get_user_notes(msg.author.id)
                if notes:
                    notes_map[name] = notes
        return notes_map

    @staticmethod
    def _build_anon_map(
        conversation: list[tuple[str, str, datetime, str | None]],
    ) -> dict[str, str]:
        """Build display_name -> 'User1', 'User2', ... mapping for all participants."""
        seen: dict[str, str] = {}
        counter = 1
        for username, _, _, reply_to in conversation:
            if username not in seen:
                seen[username] = f"User{counter}"
                counter += 1
            if reply_to and reply_to not in seen:
                seen[reply_to] = f"User{counter}"
                counter += 1
        return seen

    @staticmethod
    def _anonymize_conversation(
        conversation: list[tuple[str, str, datetime, str | None]],
        anon_map: dict[str, str],
    ) -> list[tuple[str, str, datetime, str | None]]:
        """Replace display names with anonymous keys in conversation tuples."""
        return [
            (
                anon_map.get(username, username),
                content,
                ts,
                anon_map.get(reply_to, reply_to) if reply_to else None,
            )
            for username, content, ts, reply_to in conversation
        ]

    @staticmethod
    def _anonymize_notes(
        user_notes_map: dict[str, str],
        anon_map: dict[str, str],
    ) -> dict[str, str]:
        """Replace display name keys with anonymous keys in user notes map."""
        return {anon_map.get(name, name): notes for name, notes in user_notes_map.items()}

    def _build_alias_context(
        self,
        messages: list[discord.Message],
        anon_map: dict[str, str],
    ) -> str:
        """Build anonymized alias context string for the LLM.

        Maps user IDs from messages to their known nicknames from DramaTracker,
        then replaces display names with anonymous keys.
        """
        all_aliases = self.bot.drama_tracker.get_all_aliases()
        if not all_aliases:
            return ""
        lines = []
        seen_ids: set[int] = set()
        for msg in messages:
            uid = msg.author.id
            if uid in seen_ids:
                continue
            seen_ids.add(uid)
            aliases = all_aliases.get(uid)
            if aliases:
                anon_key = anon_map.get(msg.author.display_name, msg.author.display_name)
                lines.append(f"  {anon_key} is also known as: {', '.join(aliases)}")
        # Also include aliases for members NOT in the conversation (so the LLM
        # can recognize name-drops of absent members)
        for uid, aliases in all_aliases.items():
            if uid not in seen_ids:
                lines.append(f"  (not in chat) also known as: {', '.join(aliases)}")
        return "\n".join(lines) if lines else ""

    @staticmethod
    def _deanonymize_findings(result: dict, anon_map: dict[str, str]) -> None:
        """Replace anonymous keys back to display names in LLM findings (in-place)."""
        reverse_map = {v: k for k, v in anon_map.items()}
        for finding in result.get("user_findings", []):
            anon_name = finding.get("username", "")
            if anon_name in reverse_map:
                finding["username"] = reverse_map[anon_name]
            # De-anonymize text fields that may reference other users
            for field in ("note_update", "reasoning", "worst_message"):
                text = finding.get(field)
                if text:
                    for anon, real in reverse_map.items():
                        text = text.replace(anon, real)
                    finding[field] = text

    @staticmethod
    def _build_conversation(
        messages: list[discord.Message],
    ) -> list[tuple[str, str, datetime, str | None]]:
        """Convert a list of Discord messages to conversation tuples with reply resolution."""
        msg_id_to_author = {m.id: m.author.display_name for m in messages}
        conversation = []
        for msg in messages:
            reply_to = None
            if msg.reference and msg.reference.message_id:
                reply_to = msg_id_to_author.get(msg.reference.message_id)
                if not reply_to:
                    ref = msg.reference.cached_message
                    if ref:
                        reply_to = ref.author.display_name
            conversation.append((
                msg.author.display_name,
                msg.content,
                msg.created_at,
                reply_to,
            ))
        return conversation

    # -- Shared finding processor --

    async def _process_finding(
        self,
        finding: dict,
        user_lookup: dict,
        *,
        sentiment_config: dict,
        dry_run: bool,
        thresholds: dict,
        db_content: str,
        db_topic_category: str,
        db_topic_reasoning: str,
        db_coherence_score: float | None,
        db_coherence_flag: str | None,
        game_channels: dict | None = None,
        coherence_config: dict | None = None,
    ) -> tuple[str, float, float, list[str]] | None:
        """Process a single user finding.

        Returns (username, score, drama_score, categories) or None if skipped.
        When game_channels is not None, topic drift, game redirect, and coherence
        handlers are active (buffered analysis mode). When None, they are skipped
        (mention scan mode).
        """
        username = finding["username"]
        lookup = user_lookup.get(username)
        if not lookup:
            return None
        user_id, user_ref_msg, user_msgs = lookup
        score = finding["toxicity_score"]
        categories = finding["categories"]
        reasoning = finding["reasoning"]
        off_topic = finding.get("off_topic", False)
        note_update = finding.get("note_update")

        # Track in DramaTracker
        self.bot.drama_tracker.add_entry(user_id, score, categories, reasoning)
        escalation_boost = sentiment_config.get("escalation_boost", 0.04)
        drama_score = self.bot.drama_tracker.get_drama_score(user_id, escalation_boost=escalation_boost)
        logger.info(
            "User %s (%d) | msg_score=%.2f | drama_score=%.2f | categories=%s | %s",
            username, user_id, score, drama_score, categories, reasoning,
        )

        # Save to DB
        db_message_id = await self.bot.db.save_message_and_analysis(
            guild_id=user_ref_msg.guild.id,
            channel_id=user_ref_msg.channel.id,
            user_id=user_id,
            username=username,
            content=db_content,
            message_ts=user_ref_msg.created_at.replace(tzinfo=timezone.utc),
            toxicity_score=score,
            drama_score=drama_score,
            categories=categories,
            reasoning=reasoning,
            off_topic=off_topic,
            topic_category=db_topic_category,
            topic_reasoning=db_topic_reasoning,
            coherence_score=db_coherence_score,
            coherence_flag=db_coherence_flag,
        )

        # Feature handlers — only active during buffered analysis (game_channels set)
        if game_channels is not None:
            if off_topic:
                await handle_topic_drift(
                    self.bot, user_ref_msg, db_topic_category, db_topic_reasoning,
                    db_message_id, self._dirty_users,
                )
            detected_game = finding.get("detected_game")
            if detected_game and game_channels and not dry_run:
                await handle_channel_redirect(
                    self.bot, user_ref_msg, detected_game, game_channels,
                    db_message_id, self._redirect_cooldowns,
                )
            if coherence_config is not None and coherence_config.get("enabled", True):
                coherence_score = finding.get("coherence_score", 0.85)
                coherence_flag = finding.get("coherence_flag", "normal")
                degradation = self.bot.drama_tracker.update_coherence(
                    user_id=user_id,
                    score=coherence_score,
                    flag=coherence_flag,
                    drop_threshold=coherence_config.get("drop_threshold", 0.3),
                    absolute_floor=coherence_config.get("absolute_floor", 0.5),
                    cooldown_minutes=coherence_config.get("cooldown_minutes", 30),
                )
                if degradation and not dry_run:
                    await handle_coherence_alert(
                        self.bot, user_ref_msg, degradation, coherence_config,
                        db_message_id, self._dirty_users,
                    )

        # Note update — route to memory system
        if note_update:
            # Still update the legacy notes for backward compat with analysis prompt
            self.bot.drama_tracker.update_user_notes(user_id, note_update)
            self._dirty_users.add(user_id)
            # Also save as an expiring memory (7d default for passive observations).
            # Keep a strong reference so the task isn't garbage-collected mid-flight.
            task = asyncio.create_task(self.bot.db.save_memory(
                user_id=user_id,
                memory=note_update[:500],
                topics=db_topic_category or "general",
                importance="medium",
                expires_at=datetime.now(timezone.utc) + timedelta(days=7),
                source="passive",
            ))
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

        # Log analysis
        await log_analysis(
            user_ref_msg, score, drama_score, categories, reasoning,
            off_topic, db_topic_category,
        )

        # Moderation
        if not dry_run:
            await self._apply_moderation(
                user_ref_msg, user_id, score, drama_score, categories,
                thresholds, db_message_id,
            )
        return (username, score, drama_score, categories)

    # -- Buffered analysis --

    async def _process_buffered(self, channel_id: int):
        """Collect buffered messages, build conversation block, and run analysis."""
        messages = self._message_buffer.pop(channel_id, [])
        self._debounce_tasks.pop(channel_id, None)
        if not messages:
            return

        # Use the last message as reference for channel/guild
        ref_message = messages[-1]
        channel = ref_message.channel
        config = self.bot.config
        sentiment_config = config.get("sentiment", {})
        game_channels = config.get("game_channels", {})

        # Fetch some history before the buffered messages for leading context
        context_count = sentiment_config.get("context_messages", 8)
        oldest_buffered = messages[0]
        history_messages: list[discord.Message] = []
        try:
            async for msg in channel.history(limit=context_count + 5, before=oldest_buffered):
                if msg.author.bot:
                    continue
                if not msg.content or not msg.content.strip():
                    continue
                history_messages.append(msg)
                if len(history_messages) >= context_count:
                    break
        except discord.HTTPException:
            pass
        history_messages.reverse()  # chronological order

        # Combine: history (context) + buffered (new messages to analyze)
        new_message_start = len(history_messages)
        all_messages = history_messages + messages
        conversation = self._build_conversation(all_messages)
        if not conversation:
            return

        user_notes_map = self._build_user_notes_map(messages)

        # Anonymize usernames before sending to LLM to prevent name-based bias
        anon_map = self._build_anon_map(conversation)
        anon_conversation = self._anonymize_conversation(conversation, anon_map)
        anon_notes = self._anonymize_notes(user_notes_map, anon_map) if user_notes_map else user_notes_map
        alias_context = self._build_alias_context(all_messages, anon_map)
        channel_context = build_channel_context(ref_message, game_channels)

        logger.info(
            "Channel analysis: %d new messages (+%d context) in #%s",
            len(messages), len(history_messages), getattr(channel, 'name', 'unknown'),
        )

        # TRIAGE: Lightweight model — conversation-level analysis
        result = await self.bot.llm.analyze_conversation(
            anon_conversation,
            channel_context=channel_context,
            user_notes_map=anon_notes,
            new_message_start=new_message_start,
            user_aliases=alias_context,
        )
        if result is None:
            return

        # ESCALATION: Re-analyze with heavy model if any finding warrants it
        escalation_threshold = sentiment_config.get("escalation_threshold", 0.25)
        needs_escalation = any(
            f["toxicity_score"] >= escalation_threshold
            or f.get("off_topic", False)
            or f.get("coherence_score", 1.0) < 0.6
            for f in result.get("user_findings", [])
        )
        if needs_escalation:
            heavy_result = await self.bot.llm_heavy.analyze_conversation(
                anon_conversation,
                channel_context=channel_context,
                user_notes_map=anon_notes,
                new_message_start=new_message_start,
                user_aliases=alias_context,
            )
            if heavy_result is not None:
                logger.info(
                    "Escalated to heavy model for #%s",
                    getattr(channel, 'name', 'unknown'),
                )
                result = heavy_result

        # De-anonymize findings back to real display names
        self._deanonymize_findings(result, anon_map)

        user_lookup = self._build_user_lookup(messages)

        # Mark all buffered messages as analyzed (for mention scan dedup)
        for m in messages:
            self._mark_analyzed(m.id)

        dry_run = config.get("monitoring", {}).get("dry_run", False)
        thresholds = self._resolve_thresholds()
        coherence_config = config.get("coherence", {})

        # Process per-user findings
        for finding in result.get("user_findings", []):
            username = finding["username"]
            lookup = user_lookup.get(username)
            if not lookup:
                continue
            _, _, user_msgs = lookup
            combined_content = "\n".join(
                m.content for m in user_msgs if m.content and m.content.strip()
            )[:4000]
            await self._process_finding(
                finding,
                user_lookup,
                sentiment_config=sentiment_config,
                dry_run=dry_run,
                thresholds=thresholds,
                db_content=combined_content,
                db_topic_category=finding.get("topic_category", "general_chat"),
                db_topic_reasoning=finding.get("topic_reasoning", ""),
                db_coherence_score=finding.get("coherence_score", 0.85),
                db_coherence_flag=finding.get("coherence_flag", "normal"),
                game_channels=game_channels,
                coherence_config=coherence_config,
            )

    # -- Mention scan methods --

    def _mark_analyzed(self, discord_message_id: int):
        """Track a Discord message ID as already analyzed."""
        self._analyzed_message_ids.add(discord_message_id)
        if len(self._analyzed_message_ids) > self._max_analyzed_ids:
            # Keep the newer half (snowflake IDs increase over time)
            sorted_ids = sorted(self._analyzed_message_ids)
            self._analyzed_message_ids = set(sorted_ids[len(sorted_ids) // 2:])

    async def _maybe_start_mention_scan(
        self, trigger_message: discord.Message, mention_config: dict
    ):
        """Check cooldown and kick off a mention-triggered scan of recent messages."""
        channel_id = trigger_message.channel.id
        cooldown_seconds = mention_config.get("cooldown_seconds", 60)
        now = datetime.now(timezone.utc)
        last_scan = self._mention_scan_cooldowns.get(channel_id)
        if last_scan and (now - last_scan).total_seconds() < cooldown_seconds:
            logger.info(
                "Mention scan cooldown active for #%s, skipping.",
                getattr(trigger_message.channel, "name", "unknown"),
            )
            return
        self._mention_scan_cooldowns[channel_id] = now

        # Extract the user's concern (strip the bot ping from the message)
        mention_text = trigger_message.content
        for fmt in (f"<@{self.bot.user.id}>", f"<@!{self.bot.user.id}>"):
            mention_text = mention_text.replace(fmt, "")
        mention_text = mention_text.strip() or "(user pinged bot without specific concern)"

        # Store as a mention scan task (separate from debounce)
        existing_task = self._mention_scan_tasks.get(channel_id)
        if existing_task and not existing_task.done():
            existing_task.cancel()
        self._mention_scan_tasks[channel_id] = asyncio.create_task(
            self._run_mention_scan(trigger_message, mention_text, mention_config)
        )

    async def _run_mention_scan(
        self,
        trigger_message: discord.Message,
        mention_text: str,
        mention_config: dict,
    ):
        """Scan recent channel messages with ONE conversation-level LLM call."""
        channel = trigger_message.channel
        scan_count = mention_config.get("scan_messages", 30)
        config = self.bot.config
        sentiment_config = config.get("sentiment", {})
        game_channels = config.get("game_channels", {})

        # Fetch recent messages (before the trigger, skip bots/empty)
        raw_messages: list[discord.Message] = []
        try:
            async for msg in channel.history(limit=scan_count + 10, before=trigger_message):
                if msg.author.bot:
                    continue
                if not msg.content or not msg.content.strip():
                    continue
                raw_messages.append(msg)
                if len(raw_messages) >= scan_count:
                    break
        except discord.HTTPException:
            logger.warning(
                "Failed to fetch history for mention scan in #%s",
                getattr(channel, "name", "unknown"),
            )
            return
        raw_messages.reverse()  # chronological order

        if not raw_messages:
            self._mention_scan_results[trigger_message.id] = "No recent messages found to analyze."
            return

        logger.info(
            "Mention scan triggered by %s in #%s: %d messages (single LLM call). Focus: %s",
            trigger_message.author.display_name,
            getattr(channel, "name", "unknown"),
            len(raw_messages),
            mention_text[:80],
        )

        conversation = self._build_conversation(raw_messages)
        user_notes_map = self._build_user_notes_map(raw_messages)

        # Anonymize usernames before sending to LLM
        anon_map = self._build_anon_map(conversation)
        anon_conversation = self._anonymize_conversation(conversation, anon_map)
        anon_notes = self._anonymize_notes(user_notes_map, anon_map) if user_notes_map else user_notes_map
        alias_context = self._build_alias_context(raw_messages, anon_map)
        channel_context = build_channel_context(raw_messages[0], game_channels)

        mention_context = (
            f"A user flagged this conversation and said: \"{mention_text}\"\n"
            f"Pay special attention to whether this concern is valid."
        )

        # Single LLM call
        result = await self.bot.llm.analyze_conversation(
            anon_conversation,
            mention_context=mention_context,
            channel_context=channel_context,
            user_notes_map=anon_notes,
            user_aliases=alias_context,
        )
        if result is None:
            logger.warning("Conversation analysis failed for mention scan.")
            self._mention_scan_results[trigger_message.id] = "Analysis failed."
            return

        # De-anonymize findings back to real display names
        self._deanonymize_findings(result, anon_map)

        user_lookup = self._build_user_lookup(raw_messages)
        findings: list[str] = []
        dry_run = config.get("monitoring", {}).get("dry_run", False)
        thresholds = self._resolve_thresholds()

        for finding in result.get("user_findings", []):
            username = finding["username"]
            lookup = user_lookup.get(username)
            if not lookup:
                logger.warning("Mention scan: LLM returned unknown user '%s', skipping.", username)
                continue
            user_id, ref_msg, user_msgs = lookup
            # Skip if all their messages were already analyzed
            if all(m.id in self._analyzed_message_ids for m in user_msgs):
                continue
            # Mark their messages as analyzed
            for m in user_msgs:
                self._mark_analyzed(m.id)
            worst_msg = finding.get("worst_message")
            content = f"[Mention scan] {worst_msg}" if worst_msg else "[Mention scan] See conversation"
            off_topic = finding.get("off_topic", False)
            result_tuple = await self._process_finding(
                finding,
                user_lookup,
                sentiment_config=sentiment_config,
                dry_run=dry_run,
                thresholds=thresholds,
                db_content=content,
                db_topic_category="personal_drama" if off_topic else "gaming",
                db_topic_reasoning=finding.get("reasoning", ""),
                db_coherence_score=None,
                db_coherence_flag=None,
            )
            if result_tuple:
                _, score, _, categories = result_tuple
                if score >= 0.3:
                    cat_str = ", ".join(c for c in categories if c != "none") or "none"
                    findings.append(f"{username}: {score:.2f} ({cat_str})")

        # Build summary for ChatCog
        convo_summary = result.get("conversation_summary", "")
        if findings:
            summary = f"Scanned {len(raw_messages)} msgs. {convo_summary} Notable: " + "; ".join(findings[:5])
        else:
            summary = f"Scanned {len(raw_messages)} msgs. {convo_summary}"

        # Prune old scan results
        if len(self._mention_scan_results) > 20:
            oldest = sorted(self._mention_scan_results.keys())[:len(self._mention_scan_results) - 10]
            for k in oldest:
                del self._mention_scan_results[k]
        self._mention_scan_results[trigger_message.id] = summary
        logger.info(
            "Mention scan complete in #%s: 1 LLM call, %d messages, %d users flagged",
            getattr(channel, "name", "unknown"),
            len(raw_messages),
            len(findings),
        )

    # -- State flush loop --

    @tasks.loop(seconds=STATE_FLUSH_INTERVAL)
    async def _flush_states(self):
        await flush_dirty_states(self.bot, self._dirty_users)

    @_flush_states.before_loop
    async def _before_flush(self):
        await self.bot.wait_until_ready()


async def setup(bot: commands.Bot):
    await bot.add_cog(SentimentCog(bot))