import asyncio
import logging
from datetime import datetime, timedelta, timezone

import discord
from discord.ext import commands, tasks

from cogs.sentiment.actions import mute_user, warn_user
from cogs.sentiment.channel_redirect import build_channel_context, handle_channel_redirect
from cogs.sentiment.coherence import handle_coherence_alert
from cogs.sentiment.log_utils import log_analysis
from cogs.sentiment.state import flush_dirty_states
from cogs.sentiment.topic_drift import handle_topic_drift

logger = logging.getLogger("bcs.sentiment")

# How often to flush dirty user states to DB (seconds)
STATE_FLUSH_INTERVAL = 300  # 5 minutes


class SentimentCog(commands.Cog):
    """Passive sentiment/toxicity monitor.

    Buffers monitored-channel messages per channel, debounces them, and sends
    the conversation to an LLM for per-user findings (toxicity, topic drift,
    coherence).  Findings drive warnings/mutes via ``actions`` helpers.  An
    explicit @mention of the bot with "report" intent triggers a one-shot
    mention scan of recent history instead of normal scoring.
    """

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        # Track which user IDs have unsaved in-memory changes
        self._dirty_users: set[int] = set()
        # Per-user redirect cooldown: {user_id: last_redirect_datetime}
        self._redirect_cooldowns: dict[int, datetime] = {}
        # Debounce buffer: keyed by channel_id, stores list of messages from ALL users
        self._message_buffer: dict[int, list[discord.Message]] = {}
        # Pending debounce timer tasks (per-channel)
        self._debounce_tasks: dict[int, asyncio.Task] = {}
        # Mention scan tasks (separate from debounce)
        self._mention_scan_tasks: dict[int, asyncio.Task] = {}
        # Mention scan state
        self._mention_scan_cooldowns: dict[int, datetime] = {}  # {channel_id: last_scan_time}
        self._mention_scan_results: dict[int, str] = {}  # {trigger_message_id: findings_summary}
        self._analyzed_message_ids: set[int] = set()  # Discord message IDs already analyzed
        self._max_analyzed_ids = 500
        # Strong references to fire-and-forget background tasks (e.g. memory
        # saves).  Without a reference, asyncio may garbage-collect a pending
        # task before it runs — see asyncio.create_task documentation.
        self._background_tasks: set[asyncio.Task] = set()

    async def cog_load(self):
        self._flush_states.start()

    async def cog_unload(self):
        """Stop timers, drain remaining buffers, and flush state to DB."""
        self._flush_states.cancel()
        # Cancel all pending debounce timers and process remaining buffers
        for task in self._debounce_tasks.values():
            task.cancel()
        self._debounce_tasks.clear()
        for task in self._mention_scan_tasks.values():
            task.cancel()
        self._mention_scan_tasks.clear()
        for channel_id in list(self._message_buffer):
            await self._process_buffered(channel_id)
        # Final flush on shutdown
        await flush_dirty_states(self.bot, self._dirty_users)

    @commands.Cog.listener()
    async def on_message(self, message: discord.Message):
        """Filter, then either start a mention scan or buffer for analysis."""
        logger.info(
            "MSG from %s in #%s: %s",
            message.author,
            getattr(message.channel, 'name', 'DM'),
            message.content[:80] if message.content else "(empty)",
        )
        # Ignore bots (including ourselves)
        if message.author.bot:
            return
        # Ignore DMs
        if not message.guild:
            return
        config = self.bot.config
        monitoring = config.get("monitoring", {})
        if not monitoring.get("enabled", True):
            return
        # Check if channel is monitored
        monitored_channels = monitoring.get("channels", [])
        if monitored_channels and message.channel.id not in monitored_channels:
            return
        # Check ignored users
        if message.author.id in monitoring.get("ignored_users", []):
            return
        # Check immune roles
        immune_roles = set(monitoring.get("immune_roles", []))
        if immune_roles and any(r.id in immune_roles for r in message.author.roles):
            return
        # Check per-user immunity
        if self.bot.drama_tracker.is_immune(message.author.id):
            return

        # Explicit @mention of the bot triggers a mention scan instead of scoring.
        # Reply-pings (Discord auto-adds replied-to user to mentions) should NOT
        # trigger scans — and reply-to-bot messages should still be scored normally
        # so toxic replies to bot warnings aren't silently skipped.
        bot_mentioned_in_text = (
            f"<@{self.bot.user.id}>" in (message.content or "")
            or f"<@!{self.bot.user.id}>" in (message.content or "")
        )
        if bot_mentioned_in_text:
            # Classify intent: only run expensive mention scan for reports,
            # let ChatCog handle casual chat/questions
            intent = await self.bot.llm.classify_mention_intent(
                message.content or ""
            )
            logger.info("Mention intent for %s: %s", message.author, intent)
            if intent == "report":
                mention_config = config.get("mention_scan", {})
                if mention_config.get("enabled", True):
                    await self._maybe_start_mention_scan(message, mention_config)
            return

        # Skip if empty
        if not message.content or not message.content.strip():
            return

        # Buffer the message and start/reset debounce timer (per-channel)
        channel_id = message.channel.id
        if channel_id not in self._message_buffer:
            self._message_buffer[channel_id] = []
        self._message_buffer[channel_id].append(message)

        # Cancel existing debounce timer for this channel
        existing_task = self._debounce_tasks.get(channel_id)
        if existing_task and not existing_task.done():
            existing_task.cancel()
        batch_window = config.get("sentiment", {}).get("batch_window_seconds", 3)
        self._debounce_tasks[channel_id] = asyncio.create_task(
            self._debounce_then_process(channel_id, batch_window)
        )

    async def _debounce_then_process(self, channel_id: int, delay: float):
        """Sleep for the debounce window, then process the buffered messages."""
        try:
            await asyncio.sleep(delay)
            await self._process_buffered(channel_id)
        except asyncio.CancelledError:
            pass  # Timer was reset by a new message — expected

    def _resolve_thresholds(self) -> dict:
        """Resolve effective moderation thresholds based on current mode."""
        config = self.bot.config
        sentiment_config = config.get("sentiment", {})
        mode_config = self.bot.get_mode_config()
        moderation_level = mode_config.get("moderation", "full")
        if moderation_level == "relaxed" and "relaxed_thresholds" in mode_config:
            rt = mode_config["relaxed_thresholds"]
            return {
                "warning": rt.get("warning_threshold", 0.80),
                "mute": rt.get("mute_threshold", 0.85),
                "spike_warn": rt.get("spike_warning_threshold", 0.70),
                "spike_mute": rt.get("spike_mute_threshold", 0.85),
            }
        return {
            "warning": sentiment_config.get("warning_threshold", 0.6),
            "mute": sentiment_config.get("mute_threshold", 0.75),
            "spike_warn": sentiment_config.get("spike_warning_threshold", 0.5),
            "spike_mute": sentiment_config.get("spike_mute_threshold", 0.8),
        }

    async def _apply_moderation(
        self,
        message: discord.Message,
        user_id: int,
        score: float,
        drama_score: float,
        categories: list[str],
        thresholds: dict,
        db_message_id: int | None,
    ) -> None:
        """Issue a warning or mute based on scores and thresholds.

        A mute is downgraded to a warning if the user has not been warned
        since their last reset (``warned_since_reset``).
        """
        mute_threshold = self.bot.drama_tracker.get_mute_threshold(user_id, thresholds["mute"])
        user_data = self.bot.drama_tracker.get_user(user_id)
        if drama_score >= mute_threshold or score >= thresholds["spike_mute"]:
            effective_score = max(drama_score, score)
            if user_data.warned_since_reset:
                await mute_user(self.bot, message, effective_score, categories, db_message_id, self._dirty_users)
            else:
                logger.info("Downgrading mute to warning for %s (no prior warning)", message.author)
                await warn_user(self.bot, message, effective_score, db_message_id, self._dirty_users)
        elif drama_score >= thresholds["warning"] or score >= thresholds["spike_warn"]:
            effective_score = max(drama_score, score)
            await warn_user(self.bot, message, effective_score, db_message_id, self._dirty_users)

    @staticmethod
    def _build_user_lookup(messages: list[discord.Message]) -> dict[str, tuple[int, discord.Message, list[discord.Message]]]:
        """Build username -> (user_id, ref_msg, [messages]) mapping."""
        lookup: dict[str, tuple[int, discord.Message, list[discord.Message]]] = {}
        for msg in messages:
            name = msg.author.display_name
            if name not in lookup:
                lookup[name] = (msg.author.id, msg, [])
            lookup[name][2].append(msg)
        return lookup

    def _build_user_notes_map(self, messages: list[discord.Message]) -> dict[str, str]:
        """Build username -> LLM notes mapping for users in the message list."""
        notes_map: dict[str, str] = {}
        for msg in messages:
            name = msg.author.display_name
            if name not in notes_map:
                notes = self.bot.drama_tracker.get_user_notes(msg.author.id)
                if notes:
                    notes_map[name] = notes
        return notes_map

    @staticmethod
    def _build_anon_map(
        conversation: list[tuple[str, str, datetime, str | None]],
    ) -> dict[str, str]:
        """Build display_name -> 'User1', 'User2', ... mapping for all participants."""
        seen: dict[str, str] = {}
        counter = 1
        for username, _, _, reply_to in conversation:
            if username not in seen:
                seen[username] = f"User{counter}"
                counter += 1
            if reply_to and reply_to not in seen:
                seen[reply_to] = f"User{counter}"
                counter += 1
        return seen

    @staticmethod
    def _anonymize_conversation(
        conversation: list[tuple[str, str, datetime, str | None]],
        anon_map: dict[str, str],
    ) -> list[tuple[str, str, datetime, str | None]]:
        """Replace display names with anonymous keys in conversation tuples."""
        return [
            (
                anon_map.get(username, username),
                content,
                ts,
                anon_map.get(reply_to, reply_to) if reply_to else None,
            )
            for username, content, ts, reply_to in conversation
        ]

    @staticmethod
    def _anonymize_notes(
        user_notes_map: dict[str, str],
        anon_map: dict[str, str],
    ) -> dict[str, str]:
        """Replace display name keys with anonymous keys in user notes map."""
        return {anon_map.get(name, name): notes for name, notes in user_notes_map.items()}

    @staticmethod
    def _build_alias_context(
        messages: list[discord.Message],
        anon_map: dict[str, str],
        alias_config: dict,
    ) -> str:
        """Build anonymized alias context string for the LLM.

        Maps user IDs from messages to their known nicknames using the config,
        then replaces display names with anonymous keys.
        """
        if not alias_config:
            return ""
        # FIX: config keys may be str or int (the original converted str->int
        # only in the absent-member loop, so string keys never matched present
        # members).  Normalize once so both loops agree.
        normalized: dict[int, list] = {
            (int(k) if isinstance(k, str) else k): v for k, v in alias_config.items()
        }
        lines = []
        seen_ids: set[int] = set()
        for msg in messages:
            uid = msg.author.id
            if uid in seen_ids:
                continue
            seen_ids.add(uid)
            aliases = normalized.get(uid)
            if aliases:
                anon_key = anon_map.get(msg.author.display_name, msg.author.display_name)
                lines.append(f"  {anon_key} is also known as: {', '.join(aliases)}")
        # Also include aliases for members NOT in the conversation (so the LLM
        # can recognize name-drops of absent members)
        for uid, aliases in normalized.items():
            if uid not in seen_ids:
                lines.append(f"  (not in chat) also known as: {', '.join(aliases)}")
        return "\n".join(lines) if lines else ""

    @staticmethod
    def _deanonymize_findings(result: dict, anon_map: dict[str, str]) -> None:
        """Replace anonymous keys back to display names in LLM findings (in-place)."""
        reverse_map = {v: k for k, v in anon_map.items()}
        for finding in result.get("user_findings", []):
            anon_name = finding.get("username", "")
            if anon_name in reverse_map:
                finding["username"] = reverse_map[anon_name]

    @staticmethod
    def _build_conversation(
        messages: list[discord.Message],
    ) -> list[tuple[str, str, datetime, str | None]]:
        """Convert a list of Discord messages to conversation tuples with reply resolution."""
        msg_id_to_author = {m.id: m.author.display_name for m in messages}
        conversation = []
        for msg in messages:
            reply_to = None
            if msg.reference and msg.reference.message_id:
                reply_to = msg_id_to_author.get(msg.reference.message_id)
                if not reply_to:
                    ref = msg.reference.cached_message
                    if ref:
                        reply_to = ref.author.display_name
            conversation.append((
                msg.author.display_name,
                msg.content,
                msg.created_at,
                reply_to,
            ))
        return conversation

    # -- Shared finding processor --

    async def _process_finding(
        self,
        finding: dict,
        user_lookup: dict,
        *,
        sentiment_config: dict,
        dry_run: bool,
        thresholds: dict,
        db_content: str,
        db_topic_category: str,
        db_topic_reasoning: str,
        db_coherence_score: float | None,
        db_coherence_flag: str | None,
        game_channels: dict | None = None,
        coherence_config: dict | None = None,
    ) -> tuple[str, float, float, list[str]] | None:
        """Process a single user finding.

        Returns (username, score, drama_score, categories) or None if skipped.
        When game_channels is not None, topic drift, game redirect, and coherence
        handlers are active (buffered analysis mode). When None, they are skipped
        (mention scan mode).
        """
        username = finding["username"]
        lookup = user_lookup.get(username)
        if not lookup:
            return None
        user_id, user_ref_msg, user_msgs = lookup
        score = finding["toxicity_score"]
        categories = finding["categories"]
        reasoning = finding["reasoning"]
        off_topic = finding.get("off_topic", False)
        note_update = finding.get("note_update")

        # Track in DramaTracker
        self.bot.drama_tracker.add_entry(user_id, score, categories, reasoning)
        escalation_boost = sentiment_config.get("escalation_boost", 0.04)
        drama_score = self.bot.drama_tracker.get_drama_score(user_id, escalation_boost=escalation_boost)
        logger.info(
            "User %s (%d) | msg_score=%.2f | drama_score=%.2f | categories=%s | %s",
            username, user_id, score, drama_score, categories, reasoning,
        )

        # Save to DB
        db_message_id = await self.bot.db.save_message_and_analysis(
            guild_id=user_ref_msg.guild.id,
            channel_id=user_ref_msg.channel.id,
            user_id=user_id,
            username=username,
            content=db_content,
            message_ts=user_ref_msg.created_at.replace(tzinfo=timezone.utc),
            toxicity_score=score,
            drama_score=drama_score,
            categories=categories,
            reasoning=reasoning,
            off_topic=off_topic,
            topic_category=db_topic_category,
            topic_reasoning=db_topic_reasoning,
            coherence_score=db_coherence_score,
            coherence_flag=db_coherence_flag,
        )

        # Feature handlers — only active during buffered analysis (game_channels set)
        if game_channels is not None:
            if off_topic:
                await handle_topic_drift(
                    self.bot, user_ref_msg, db_topic_category,
                    db_topic_reasoning, db_message_id, self._dirty_users,
                )
            detected_game = finding.get("detected_game")
            if detected_game and game_channels and not dry_run:
                await handle_channel_redirect(
                    self.bot, user_ref_msg, detected_game, game_channels,
                    db_message_id, self._redirect_cooldowns,
                )
            if coherence_config is not None and coherence_config.get("enabled", True):
                coherence_score = finding.get("coherence_score", 0.85)
                coherence_flag = finding.get("coherence_flag", "normal")
                degradation = self.bot.drama_tracker.update_coherence(
                    user_id=user_id,
                    score=coherence_score,
                    flag=coherence_flag,
                    drop_threshold=coherence_config.get("drop_threshold", 0.3),
                    absolute_floor=coherence_config.get("absolute_floor", 0.5),
                    cooldown_minutes=coherence_config.get("cooldown_minutes", 30),
                )
                if degradation and not dry_run:
                    await handle_coherence_alert(
                        self.bot, user_ref_msg, degradation, coherence_config,
                        db_message_id, self._dirty_users,
                    )

        # Note update — route to memory system
        if note_update:
            # Still update the legacy notes for backward compat with analysis prompt
            self.bot.drama_tracker.update_user_notes(user_id, note_update)
            self._dirty_users.add(user_id)
            # Also save as an expiring memory (7d default for passive observations).
            # FIX: keep a strong reference so the task isn't GC'd before running.
            task = asyncio.create_task(self.bot.db.save_memory(
                user_id=user_id,
                memory=note_update[:500],
                topics=db_topic_category or "general",
                importance="medium",
                expires_at=datetime.now(timezone.utc) + timedelta(days=7),
                source="passive",
            ))
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

        # Log analysis
        await log_analysis(
            user_ref_msg, score, drama_score, categories, reasoning,
            off_topic, db_topic_category,
        )

        # Moderation
        if not dry_run:
            await self._apply_moderation(
                user_ref_msg, user_id, score, drama_score, categories,
                thresholds, db_message_id,
            )
        return (username, score, drama_score, categories)

    # -- Buffered analysis --

    async def _process_buffered(self, channel_id: int):
        """Collect buffered messages, build conversation block, and run analysis."""
        messages = self._message_buffer.pop(channel_id, [])
        self._debounce_tasks.pop(channel_id, None)
        if not messages:
            return
        # Use the last message as reference for channel/guild
        ref_message = messages[-1]
        channel = ref_message.channel
        config = self.bot.config
        sentiment_config = config.get("sentiment", {})
        game_channels = config.get("game_channels", {})

        # Fetch some history before the buffered messages for leading context
        context_count = sentiment_config.get("context_messages", 8)
        oldest_buffered = messages[0]
        history_messages: list[discord.Message] = []
        try:
            async for msg in channel.history(limit=context_count + 5, before=oldest_buffered):
                if msg.author.bot:
                    continue
                if not msg.content or not msg.content.strip():
                    continue
                history_messages.append(msg)
                if len(history_messages) >= context_count:
                    break
        except discord.HTTPException:
            pass
        history_messages.reverse()  # chronological order

        # Combine: history (context) + buffered (new messages to analyze)
        new_message_start = len(history_messages)
        all_messages = history_messages + messages
        conversation = self._build_conversation(all_messages)
        if not conversation:
            return
        user_notes_map = self._build_user_notes_map(messages)

        # Anonymize usernames before sending to LLM to prevent name-based bias
        anon_map = self._build_anon_map(conversation)
        anon_conversation = self._anonymize_conversation(conversation, anon_map)
        anon_notes = self._anonymize_notes(user_notes_map, anon_map) if user_notes_map else user_notes_map
        alias_config = config.get("user_aliases", {})
        alias_context = self._build_alias_context(all_messages, anon_map, alias_config)
        channel_context = build_channel_context(ref_message, game_channels)

        logger.info(
            "Channel analysis: %d new messages (+%d context) in #%s",
            len(messages), len(history_messages), getattr(channel, 'name', 'unknown'),
        )

        # TRIAGE: Lightweight model — conversation-level analysis
        result = await self.bot.llm.analyze_conversation(
            anon_conversation,
            channel_context=channel_context,
            user_notes_map=anon_notes,
            new_message_start=new_message_start,
            user_aliases=alias_context,
        )
        if result is None:
            return

        # ESCALATION: Re-analyze with heavy model if any finding warrants it
        escalation_threshold = sentiment_config.get("escalation_threshold", 0.25)
        needs_escalation = any(
            f["toxicity_score"] >= escalation_threshold
            or f.get("off_topic", False)
            or f.get("coherence_score", 1.0) < 0.6
            for f in result.get("user_findings", [])
        )
        if needs_escalation:
            heavy_result = await self.bot.llm_heavy.analyze_conversation(
                anon_conversation,
                channel_context=channel_context,
                user_notes_map=anon_notes,
                new_message_start=new_message_start,
                user_aliases=alias_context,
            )
            if heavy_result is not None:
                logger.info(
                    "Escalated to heavy model for #%s",
                    getattr(channel, 'name', 'unknown'),
                )
                result = heavy_result

        # De-anonymize findings back to real display names
        self._deanonymize_findings(result, anon_map)
        user_lookup = self._build_user_lookup(messages)

        # Mark all buffered messages as analyzed (for mention scan dedup)
        for m in messages:
            self._mark_analyzed(m.id)

        dry_run = config.get("monitoring", {}).get("dry_run", False)
        thresholds = self._resolve_thresholds()
        coherence_config = config.get("coherence", {})

        # Process per-user findings
        for finding in result.get("user_findings", []):
            username = finding["username"]
            lookup = user_lookup.get(username)
            if not lookup:
                continue
            _, _, user_msgs = lookup
            combined_content = "\n".join(
                m.content for m in user_msgs if m.content and m.content.strip()
            )[:4000]
            await self._process_finding(
                finding,
                user_lookup,
                sentiment_config=sentiment_config,
                dry_run=dry_run,
                thresholds=thresholds,
                db_content=combined_content,
                db_topic_category=finding.get("topic_category", "general_chat"),
                db_topic_reasoning=finding.get("topic_reasoning", ""),
                db_coherence_score=finding.get("coherence_score", 0.85),
                db_coherence_flag=finding.get("coherence_flag", "normal"),
                game_channels=game_channels,
                coherence_config=coherence_config,
            )

    # -- Mention scan methods --

    def _mark_analyzed(self, discord_message_id: int):
        """Track a Discord message ID as already analyzed."""
        self._analyzed_message_ids.add(discord_message_id)
        if len(self._analyzed_message_ids) > self._max_analyzed_ids:
            # Keep the newer half (snowflake IDs increase over time)
            sorted_ids = sorted(self._analyzed_message_ids)
            self._analyzed_message_ids = set(sorted_ids[len(sorted_ids) // 2:])

    async def _maybe_start_mention_scan(
        self, trigger_message: discord.Message, mention_config: dict
    ):
        """Check cooldown and kick off a mention-triggered scan of recent messages."""
        channel_id = trigger_message.channel.id
        cooldown_seconds = mention_config.get("cooldown_seconds", 60)
        now = datetime.now(timezone.utc)
        last_scan = self._mention_scan_cooldowns.get(channel_id)
        if last_scan and (now - last_scan).total_seconds() < cooldown_seconds:
            logger.info(
                "Mention scan cooldown active for #%s, skipping.",
                getattr(trigger_message.channel, "name", "unknown"),
            )
            return
        self._mention_scan_cooldowns[channel_id] = now

        # Extract the user's concern (strip the bot ping from the message)
        mention_text = trigger_message.content
        for fmt in (f"<@{self.bot.user.id}>", f"<@!{self.bot.user.id}>"):
            mention_text = mention_text.replace(fmt, "")
        mention_text = mention_text.strip() or "(user pinged bot without specific concern)"

        # Store as a mention scan task (separate from debounce)
        existing_task = self._mention_scan_tasks.get(channel_id)
        if existing_task and not existing_task.done():
            existing_task.cancel()
        self._mention_scan_tasks[channel_id] = asyncio.create_task(
            self._run_mention_scan(trigger_message, mention_text, mention_config)
        )

    async def _run_mention_scan(
        self,
        trigger_message: discord.Message,
        mention_text: str,
        mention_config: dict,
    ):
        """Scan recent channel messages with ONE conversation-level LLM call."""
        channel = trigger_message.channel
        scan_count = mention_config.get("scan_messages", 30)
        config = self.bot.config
        sentiment_config = config.get("sentiment", {})
        game_channels = config.get("game_channels", {})

        # Fetch recent messages (before the trigger, skip bots/empty)
        raw_messages: list[discord.Message] = []
        try:
            async for msg in channel.history(limit=scan_count + 10, before=trigger_message):
                if msg.author.bot:
                    continue
                if not msg.content or not msg.content.strip():
                    continue
                raw_messages.append(msg)
                if len(raw_messages) >= scan_count:
                    break
        except discord.HTTPException:
            logger.warning("Failed to fetch history for mention scan in #%s", getattr(channel, "name", "unknown"))
            return
        raw_messages.reverse()  # chronological order
        if not raw_messages:
            self._mention_scan_results[trigger_message.id] = "No recent messages found to analyze."
            return

        logger.info(
            "Mention scan triggered by %s in #%s: %d messages (single LLM call). Focus: %s",
            trigger_message.author.display_name,
            getattr(channel, "name", "unknown"),
            len(raw_messages),
            mention_text[:80],
        )

        conversation = self._build_conversation(raw_messages)
        user_notes_map = self._build_user_notes_map(raw_messages)

        # Anonymize usernames before sending to LLM
        anon_map = self._build_anon_map(conversation)
        anon_conversation = self._anonymize_conversation(conversation, anon_map)
        anon_notes = self._anonymize_notes(user_notes_map, anon_map) if user_notes_map else user_notes_map
        alias_config = config.get("user_aliases", {})
        alias_context = self._build_alias_context(raw_messages, anon_map, alias_config)
        channel_context = build_channel_context(raw_messages[0], game_channels)
        mention_context = (
            f"A user flagged this conversation and said: \"{mention_text}\"\n"
            f"Pay special attention to whether this concern is valid."
        )

        # Single LLM call
        result = await self.bot.llm.analyze_conversation(
            anon_conversation,
            mention_context=mention_context,
            channel_context=channel_context,
            user_notes_map=anon_notes,
            user_aliases=alias_context,
        )
        if result is None:
            logger.warning("Conversation analysis failed for mention scan.")
            self._mention_scan_results[trigger_message.id] = "Analysis failed."
            return

        # De-anonymize findings back to real display names
        self._deanonymize_findings(result, anon_map)
        user_lookup = self._build_user_lookup(raw_messages)
        findings: list[str] = []
        dry_run = config.get("monitoring", {}).get("dry_run", False)
        thresholds = self._resolve_thresholds()

        for finding in result.get("user_findings", []):
            username = finding["username"]
            lookup = user_lookup.get(username)
            if not lookup:
                logger.warning("Mention scan: LLM returned unknown user '%s', skipping.", username)
                continue
            user_id, ref_msg, user_msgs = lookup
            # Skip if all their messages were already analyzed
            if all(m.id in self._analyzed_message_ids for m in user_msgs):
                continue
            # Mark their messages as analyzed
            for m in user_msgs:
                self._mark_analyzed(m.id)
            worst_msg = finding.get("worst_message")
            content = f"[Mention scan] {worst_msg}" if worst_msg else "[Mention scan] See conversation"
            off_topic = finding.get("off_topic", False)
            result_tuple = await self._process_finding(
                finding,
                user_lookup,
                sentiment_config=sentiment_config,
                dry_run=dry_run,
                thresholds=thresholds,
                db_content=content,
                db_topic_category="personal_drama" if off_topic else "gaming",
                db_topic_reasoning=finding.get("reasoning", ""),
                db_coherence_score=None,
                db_coherence_flag=None,
            )
            if result_tuple:
                _, score, _, categories = result_tuple
                if score >= 0.3:
                    cat_str = ", ".join(c for c in categories if c != "none") or "none"
                    findings.append(f"{username}: {score:.2f} ({cat_str})")

        # Build summary for ChatCog
        convo_summary = result.get("conversation_summary", "")
        if findings:
            summary = f"Scanned {len(raw_messages)} msgs. {convo_summary} Notable: " + "; ".join(findings[:5])
        else:
            summary = f"Scanned {len(raw_messages)} msgs. {convo_summary}"
        # Prune old scan results
        if len(self._mention_scan_results) > 20:
            oldest = sorted(self._mention_scan_results.keys())[:len(self._mention_scan_results) - 10]
            for k in oldest:
                del self._mention_scan_results[k]
        self._mention_scan_results[trigger_message.id] = summary
        logger.info(
            "Mention scan complete in #%s: 1 LLM call, %d messages, %d users flagged",
            getattr(channel, "name", "unknown"),
            len(raw_messages),
            len(findings),
        )

    # -- State flush loop --

    @tasks.loop(seconds=STATE_FLUSH_INTERVAL)
    async def _flush_states(self):
        await flush_dirty_states(self.bot, self._dirty_users)

    @_flush_states.before_loop
    async def _before_flush(self):
        await self.bot.wait_until_ready()


async def setup(bot: commands.Bot):
    await bot.add_cog(SentimentCog(bot))