diff --git a/cogs/chat.py b/cogs/chat.py index e917548..0e5ceae 100644 --- a/cogs/chat.py +++ b/cogs/chat.py @@ -153,6 +153,19 @@ class ChatCog(commands.Cog): if not content: content = "(just pinged me)" if not is_proactive else message.content + # If a mention scan is running, await it so we can include findings + scan_summary = "" + if self.bot.user in message.mentions: + sentiment_cog = self.bot.get_cog("SentimentCog") + if sentiment_cog: + task = sentiment_cog._mention_scan_tasks.get(message.channel.id) + if task and not task.done(): + try: + await asyncio.wait_for(asyncio.shield(task), timeout=45) + except (asyncio.TimeoutError, asyncio.CancelledError): + pass + scan_summary = sentiment_cog._mention_scan_results.pop(message.id, "") + # Add drama score context only when noteworthy drama_score = self.bot.drama_tracker.get_drama_score(message.author.id) user_data = self.bot.drama_tracker.get_user(message.author.id) @@ -169,6 +182,10 @@ class ChatCog(commands.Cog): if user_notes: extra_context += f"[Notes about {message.author.display_name}: {user_notes}]\n" + # Include mention scan findings if available + if scan_summary: + extra_context += f"[You just scanned recent chat. Results: {scan_summary}]\n" + recent_user_msgs = [] try: async for msg in message.channel.history(limit=50, before=message): @@ -239,8 +256,7 @@ class ChatCog(commands.Cog): # warnings/mutes appear before the chat reply sentiment_cog = self.bot.get_cog("SentimentCog") if sentiment_cog: - key = (message.channel.id, message.author.id) - task = sentiment_cog._debounce_tasks.get(key) + task = sentiment_cog._debounce_tasks.get(message.channel.id) if task and not task.done(): try: await asyncio.wait_for(asyncio.shield(task), timeout=15) diff --git a/cogs/sentiment.py b/cogs/sentiment.py index 944daee..219ee00 100644 --- a/cogs/sentiment.py +++ b/cogs/sentiment.py @@ -1,8 +1,8 @@ import asyncio import logging -from collections import deque from datetime import datetime, timedelta, timezone + import discord from discord.ext import commands, tasks @@ -15,17 +15,21 @@ STATE_FLUSH_INTERVAL = 300 # 5 minutes class SentimentCog(commands.Cog): def __init__(self, bot: commands.Bot): self.bot = bot - # Per-channel message history for context: {channel_id: deque of (author, content)} - self._channel_history: dict[int, deque] = {} # Track which user IDs have unsaved in-memory changes self._dirty_users: set[int] = set() # Per-user redirect cooldown: {user_id: last_redirect_datetime} self._redirect_cooldowns: dict[int, datetime] = {} - # Debounce buffer: keyed by (channel_id, user_id), stores list of messages - self._message_buffer: dict[tuple[int, int], list[discord.Message]] = {} - # Pending debounce timer tasks - self._debounce_tasks: dict[tuple[int, int], asyncio.Task] = {} - # Per-channel poll cooldown: {channel_id: last_poll_datetime} + # Debounce buffer: keyed by channel_id, stores list of messages from ALL users + self._message_buffer: dict[int, list[discord.Message]] = {} + # Pending debounce timer tasks (per-channel) + self._debounce_tasks: dict[int, asyncio.Task] = {} + # Mention scan tasks (separate from debounce) + self._mention_scan_tasks: dict[int, asyncio.Task] = {} + # Mention scan state + self._mention_scan_cooldowns: dict[int, datetime] = {} # {channel_id: last_scan_time} + self._mention_scan_results: dict[int, str] = {} # {trigger_message_id: findings_summary} + self._analyzed_message_ids: set[int] = set() # Discord message IDs already analyzed + self._max_analyzed_ids = 500 async def cog_load(self): @@ -37,8 +41,11 @@ class SentimentCog(commands.Cog): for task in self._debounce_tasks.values(): task.cancel() self._debounce_tasks.clear() - for key in list(self._message_buffer): - await self._process_buffered(key) + for task in self._mention_scan_tasks.values(): + task.cancel() + self._mention_scan_tasks.clear() + for channel_id in list(self._message_buffer): + await self._process_buffered(channel_id) # Final flush on shutdown await self._flush_dirty_states() @@ -80,202 +87,174 @@ class SentimentCog(commands.Cog): if self.bot.drama_tracker.is_immune(message.author.id): return - # Skip sentiment analysis for messages directed at the bot - # (mentions, replies to bot) — users interacting with the bot - # in roast/chat modes shouldn't have those messages scored as toxic + # Messages directed at the bot (mentions, replies) shouldn't be scored + # for toxicity — but @mentions can trigger a scan of recent chat directed_at_bot = self.bot.user in message.mentions if not directed_at_bot and message.reference and message.reference.message_id: ref = message.reference.cached_message if ref and ref.author.id == self.bot.user.id: directed_at_bot = True if directed_at_bot: + # @mention (not just reply-to-bot) triggers a mention scan + if self.bot.user in message.mentions: + mention_config = config.get("mention_scan", {}) + if mention_config.get("enabled", True): + await self._maybe_start_mention_scan(message, mention_config) return - # Store message in channel history for context - self._store_context(message) - # Skip if empty if not message.content or not message.content.strip(): return - # Buffer the message and start/reset debounce timer - key = (message.channel.id, message.author.id) - if key not in self._message_buffer: - self._message_buffer[key] = [] - self._message_buffer[key].append(message) + # Buffer the message and start/reset debounce timer (per-channel) + channel_id = message.channel.id + if channel_id not in self._message_buffer: + self._message_buffer[channel_id] = [] + self._message_buffer[channel_id].append(message) - # Cancel existing debounce timer for this user+channel - existing_task = self._debounce_tasks.get(key) + # Cancel existing debounce timer for this channel + existing_task = self._debounce_tasks.get(channel_id) if existing_task and not existing_task.done(): existing_task.cancel() - # Skip debounce when bot is @mentioned so warnings fire before chat replies - if self.bot.user in message.mentions: - batch_window = 0 - else: - batch_window = config.get("sentiment", {}).get("batch_window_seconds", 3) + batch_window = config.get("sentiment", {}).get("batch_window_seconds", 3) - self._debounce_tasks[key] = asyncio.create_task( - self._debounce_then_process(key, batch_window) + self._debounce_tasks[channel_id] = asyncio.create_task( + self._debounce_then_process(channel_id, batch_window) ) - async def _debounce_then_process(self, key: tuple[int, int], delay: float): + async def _debounce_then_process(self, channel_id: int, delay: float): """Sleep for the debounce window, then process the buffered messages.""" try: await asyncio.sleep(delay) - await self._process_buffered(key) + await self._process_buffered(channel_id) except asyncio.CancelledError: pass # Timer was reset by a new message — expected - async def _process_buffered(self, key: tuple[int, int]): - """Combine buffered messages and run the analysis pipeline once.""" - messages = self._message_buffer.pop(key, []) - self._debounce_tasks.pop(key, None) + async def _process_buffered(self, channel_id: int): + """Collect buffered messages, build conversation block, and run analysis.""" + messages = self._message_buffer.pop(channel_id, []) + self._debounce_tasks.pop(channel_id, None) if not messages: return - # Use the last message as the reference for channel, author, guild, etc. - message = messages[-1] - combined_content = "\n".join(m.content for m in messages if m.content and m.content.strip()) - - if not combined_content.strip(): - return - - batch_count = len(messages) - if batch_count > 1: - logger.info( - "Batched %d messages from %s in #%s", - batch_count, message.author.display_name, - getattr(message.channel, 'name', 'unknown'), - ) + # Use the last message as reference for channel/guild + ref_message = messages[-1] + channel = ref_message.channel config = self.bot.config - monitoring = config.get("monitoring", {}) sentiment_config = config.get("sentiment", {}) - - # Build channel context for game detection game_channels = config.get("game_channels", {}) - channel_context = self._build_channel_context(message, game_channels) - # Analyze the combined message (triage with lightweight model) - context = self._get_context(message) - user_notes = self.bot.drama_tracker.get_user_notes(message.author.id) - result = await self.bot.llm.analyze_message( - combined_content, context, user_notes=user_notes, + # Fetch some history before the buffered messages for leading context + context_count = sentiment_config.get("context_messages", 8) + oldest_buffered = messages[0] + history_messages: list[discord.Message] = [] + try: + async for msg in channel.history(limit=context_count + 5, before=oldest_buffered): + if msg.author.bot: + continue + if not msg.content or not msg.content.strip(): + continue + history_messages.append(msg) + if len(history_messages) >= context_count: + break + except discord.HTTPException: + pass + + history_messages.reverse() # chronological order + + # Combine: history (context) + buffered (new messages to analyze) + all_messages = history_messages + messages + + # Build msg_id_to_author lookup for reply resolution + msg_id_to_author: dict[int, str] = { + m.id: m.author.display_name for m in all_messages + } + + # Convert to conversation tuples: (username, content, timestamp, reply_to_username) + conversation: list[tuple[str, str, datetime, str | None]] = [] + for msg in all_messages: + reply_to = None + if msg.reference and msg.reference.message_id: + reply_to = msg_id_to_author.get(msg.reference.message_id) + if not reply_to: + ref = msg.reference.cached_message + if ref: + reply_to = ref.author.display_name + conversation.append(( + msg.author.display_name, + msg.content, + msg.created_at, + reply_to, + )) + + if not conversation: + return + + # Build user notes map (only for users in the buffer, not history-only) + user_notes_map: dict[str, str] = {} + for msg in messages: + name = msg.author.display_name + if name not in user_notes_map: + notes = self.bot.drama_tracker.get_user_notes(msg.author.id) + if notes: + user_notes_map[name] = notes + + channel_context = self._build_channel_context(ref_message, game_channels) + + logger.info( + "Channel analysis: %d new messages (+%d context) in #%s", + len(messages), len(history_messages), + getattr(channel, 'name', 'unknown'), + ) + + # TRIAGE: Lightweight model — conversation-level analysis + result = await self.bot.llm.analyze_conversation( + conversation, channel_context=channel_context, + user_notes_map=user_notes_map, ) if result is None: return - # Escalation: re-analyze with heavy model if triage flags something + # ESCALATION: Re-analyze with heavy model if any finding warrants it escalation_threshold = sentiment_config.get("escalation_threshold", 0.25) - needs_escalation = ( - result["toxicity_score"] >= escalation_threshold - or result.get("off_topic", False) - or result.get("coherence_score", 1.0) < 0.6 + needs_escalation = any( + f["toxicity_score"] >= escalation_threshold + or f.get("off_topic", False) + or f.get("coherence_score", 1.0) < 0.6 + for f in result.get("user_findings", []) ) if needs_escalation: - triage_score = result["toxicity_score"] - heavy_result = await self.bot.llm_heavy.analyze_message( - combined_content, context, user_notes=user_notes, + heavy_result = await self.bot.llm_heavy.analyze_conversation( + conversation, channel_context=channel_context, + user_notes_map=user_notes_map, ) if heavy_result is not None: logger.info( - "Escalated to heavy model (triage_score=%.2f) for %s", - triage_score, message.author.display_name, + "Escalated to heavy model for #%s", + getattr(channel, 'name', 'unknown'), ) result = heavy_result - score = result["toxicity_score"] - categories = result["categories"] - reasoning = result["reasoning"] + # Build username -> (user_id, ref_msg, [messages]) for buffered users only + user_lookup: dict[str, tuple[int, discord.Message, list[discord.Message]]] = {} + for msg in messages: + name = msg.author.display_name + if name not in user_lookup: + user_lookup[name] = (msg.author.id, msg, []) + user_lookup[name][2].append(msg) - # Track the result - self.bot.drama_tracker.add_entry( - message.author.id, score, categories, reasoning - ) + # Mark all buffered messages as analyzed (for mention scan dedup) + for m in messages: + self._mark_analyzed(m.id) - drama_score = self.bot.drama_tracker.get_drama_score(message.author.id) - - logger.info( - "User %s (%d) | msg_score=%.2f | drama_score=%.2f | categories=%s | %s", - message.author.display_name, - message.author.id, - score, - drama_score, - categories, - reasoning, - ) - - # Topic drift detection - off_topic = result.get("off_topic", False) - topic_category = result.get("topic_category", "general_chat") - topic_reasoning = result.get("topic_reasoning", "") - - # Save message + analysis to DB (awaited — need message_id for action links) - db_message_id = await self.bot.db.save_message_and_analysis( - guild_id=message.guild.id, - channel_id=message.channel.id, - user_id=message.author.id, - username=message.author.display_name, - content=combined_content, - message_ts=message.created_at.replace(tzinfo=timezone.utc), - toxicity_score=score, - drama_score=drama_score, - categories=categories, - reasoning=reasoning, - off_topic=off_topic, - topic_category=topic_category, - topic_reasoning=topic_reasoning, - coherence_score=result.get("coherence_score"), - coherence_flag=result.get("coherence_flag"), - ) - - if off_topic: - await self._handle_topic_drift(message, topic_category, topic_reasoning, db_message_id) - - # Game channel redirect detection - detected_game = result.get("detected_game") - if detected_game and game_channels and not monitoring.get("dry_run", False): - await self._handle_channel_redirect(message, detected_game, game_channels, db_message_id) - - # Coherence / intoxication detection - coherence_score = result.get("coherence_score", 0.85) - coherence_flag = result.get("coherence_flag", "normal") - coherence_config = config.get("coherence", {}) - if coherence_config.get("enabled", True): - degradation = self.bot.drama_tracker.update_coherence( - user_id=message.author.id, - score=coherence_score, - flag=coherence_flag, - drop_threshold=coherence_config.get("drop_threshold", 0.3), - absolute_floor=coherence_config.get("absolute_floor", 0.5), - cooldown_minutes=coherence_config.get("cooldown_minutes", 30), - ) - if degradation and not config.get("monitoring", {}).get("dry_run", False): - await self._handle_coherence_alert(message, degradation, coherence_config, db_message_id) - - # Capture LLM note updates about this user - note_update = result.get("note_update") - if note_update: - self.bot.drama_tracker.update_user_notes(message.author.id, note_update) - self._dirty_users.add(message.author.id) - - # Mark dirty for coherence baseline drift even without actions - self._dirty_users.add(message.author.id) - - # Always log analysis to #bcs-log if it exists - await self._log_analysis(message, score, drama_score, categories, reasoning, off_topic, topic_category) - - # Dry-run mode: skip warnings/mutes + # Resolve thresholds once dry_run = config.get("monitoring", {}).get("dry_run", False) - if dry_run: - return - - # Check thresholds — use relaxed thresholds if the active mode says so mode_config = self.bot.get_mode_config() moderation_level = mode_config.get("moderation", "full") if moderation_level == "relaxed" and "relaxed_thresholds" in mode_config: @@ -289,18 +268,365 @@ class SentimentCog(commands.Cog): base_mute_threshold = sentiment_config.get("mute_threshold", 0.75) spike_warn = sentiment_config.get("spike_warning_threshold", 0.5) spike_mute = sentiment_config.get("spike_mute_threshold", 0.8) - mute_threshold = self.bot.drama_tracker.get_mute_threshold( - message.author.id, base_mute_threshold + coherence_config = config.get("coherence", {}) + + # Process per-user findings + for finding in result.get("user_findings", []): + username = finding["username"] + lookup = user_lookup.get(username) + if not lookup: + # LLM returned a finding for a history-only user or unknown name; skip + continue + + user_id, user_ref_msg, user_msgs = lookup + score = finding["toxicity_score"] + categories = finding["categories"] + reasoning = finding["reasoning"] + off_topic = finding.get("off_topic", False) + topic_category = finding.get("topic_category", "general_chat") + topic_reasoning = finding.get("topic_reasoning", "") + coherence_score = finding.get("coherence_score", 0.85) + coherence_flag = finding.get("coherence_flag", "normal") + note_update = finding.get("note_update") + detected_game = finding.get("detected_game") + + # Track the result in DramaTracker + self.bot.drama_tracker.add_entry(user_id, score, categories, reasoning) + drama_score = self.bot.drama_tracker.get_drama_score(user_id) + + logger.info( + "User %s (%d) | msg_score=%.2f | drama_score=%.2f | categories=%s | %s", + username, user_id, score, drama_score, categories, reasoning, + ) + + # Save message + analysis to DB + combined_content = "\n".join( + m.content for m in user_msgs if m.content and m.content.strip() + ) + db_message_id = await self.bot.db.save_message_and_analysis( + guild_id=user_ref_msg.guild.id, + channel_id=user_ref_msg.channel.id, + user_id=user_id, + username=username, + content=combined_content[:4000], + message_ts=user_ref_msg.created_at.replace(tzinfo=timezone.utc), + toxicity_score=score, + drama_score=drama_score, + categories=categories, + reasoning=reasoning, + off_topic=off_topic, + topic_category=topic_category, + topic_reasoning=topic_reasoning, + coherence_score=coherence_score, + coherence_flag=coherence_flag, + ) + + # Topic drift handling + if off_topic: + await self._handle_topic_drift(user_ref_msg, topic_category, topic_reasoning, db_message_id) + + # Game channel redirect detection + if detected_game and game_channels and not dry_run: + await self._handle_channel_redirect(user_ref_msg, detected_game, game_channels, db_message_id) + + # Coherence / intoxication detection + if coherence_config.get("enabled", True): + degradation = self.bot.drama_tracker.update_coherence( + user_id=user_id, + score=coherence_score, + flag=coherence_flag, + drop_threshold=coherence_config.get("drop_threshold", 0.3), + absolute_floor=coherence_config.get("absolute_floor", 0.5), + cooldown_minutes=coherence_config.get("cooldown_minutes", 30), + ) + if degradation and not dry_run: + await self._handle_coherence_alert(user_ref_msg, degradation, coherence_config, db_message_id) + + # Capture LLM note updates about this user + if note_update: + self.bot.drama_tracker.update_user_notes(user_id, note_update) + self._dirty_users.add(user_id) + + # Mark dirty for coherence baseline drift even without actions + self._dirty_users.add(user_id) + + # Always log analysis to #bcs-log if it exists + await self._log_analysis(user_ref_msg, score, drama_score, categories, reasoning, off_topic, topic_category) + + # Moderation actions (skip in dry-run mode) + if not dry_run: + mute_threshold = self.bot.drama_tracker.get_mute_threshold( + user_id, base_mute_threshold + ) + # Mute: rolling average OR single message spike + if drama_score >= mute_threshold or score >= spike_mute: + effective_score = max(drama_score, score) + await self._mute_user(user_ref_msg, effective_score, categories, db_message_id) + # Warn: rolling average OR single message spike + elif drama_score >= warning_threshold or score >= spike_warn: + effective_score = max(drama_score, score) + await self._warn_user(user_ref_msg, effective_score, db_message_id) + + # -- Mention scan methods -- + + def _mark_analyzed(self, discord_message_id: int): + """Track a Discord message ID as already analyzed.""" + self._analyzed_message_ids.add(discord_message_id) + if len(self._analyzed_message_ids) > self._max_analyzed_ids: + sorted_ids = sorted(self._analyzed_message_ids) + self._analyzed_message_ids = set(sorted_ids[len(sorted_ids) // 2:]) + + async def _maybe_start_mention_scan( + self, trigger_message: discord.Message, mention_config: dict + ): + """Check cooldown and kick off a mention-triggered scan of recent messages.""" + channel_id = trigger_message.channel.id + cooldown_seconds = mention_config.get("cooldown_seconds", 60) + + now = datetime.now(timezone.utc) + last_scan = self._mention_scan_cooldowns.get(channel_id) + if last_scan and (now - last_scan).total_seconds() < cooldown_seconds: + logger.info( + "Mention scan cooldown active for #%s, skipping.", + getattr(trigger_message.channel, "name", "unknown"), + ) + return + + self._mention_scan_cooldowns[channel_id] = now + + # Extract the user's concern (strip the bot ping from the message) + mention_text = trigger_message.content + for fmt in (f"<@{self.bot.user.id}>", f"<@!{self.bot.user.id}>"): + mention_text = mention_text.replace(fmt, "") + mention_text = mention_text.strip() or "(user pinged bot without specific concern)" + + # Store as a mention scan task (separate from debounce) + existing_task = self._mention_scan_tasks.get(channel_id) + if existing_task and not existing_task.done(): + existing_task.cancel() + + self._mention_scan_tasks[channel_id] = asyncio.create_task( + self._run_mention_scan(trigger_message, mention_text, mention_config) ) - # Mute: rolling average OR single message spike - if drama_score >= mute_threshold or score >= spike_mute: - effective_score = max(drama_score, score) - await self._mute_user(message, effective_score, categories, db_message_id) - # Warn: rolling average OR single message spike - elif drama_score >= warning_threshold or score >= spike_warn: - effective_score = max(drama_score, score) - await self._warn_user(message, effective_score, db_message_id) + async def _run_mention_scan( + self, + trigger_message: discord.Message, + mention_text: str, + mention_config: dict, + ): + """Scan recent channel messages with ONE conversation-level LLM call.""" + channel = trigger_message.channel + scan_count = mention_config.get("scan_messages", 30) + + config = self.bot.config + sentiment_config = config.get("sentiment", {}) + game_channels = config.get("game_channels", {}) + + # Fetch recent messages (before the trigger, skip bots/empty) + raw_messages: list[discord.Message] = [] + try: + async for msg in channel.history(limit=scan_count + 10, before=trigger_message): + if msg.author.bot: + continue + if not msg.content or not msg.content.strip(): + continue + raw_messages.append(msg) + if len(raw_messages) >= scan_count: + break + except discord.HTTPException: + logger.warning("Failed to fetch history for mention scan in #%s", + getattr(channel, "name", "unknown")) + return + + raw_messages.reverse() # chronological order + + if not raw_messages: + self._mention_scan_results[trigger_message.id] = "No recent messages found to analyze." + return + + logger.info( + "Mention scan triggered by %s in #%s: %d messages (single LLM call). Focus: %s", + trigger_message.author.display_name, + getattr(channel, "name", "unknown"), + len(raw_messages), + mention_text[:80], + ) + + # Build a lookup of message IDs to authors for resolving replies + msg_id_to_author: dict[int, str] = { + m.id: m.author.display_name for m in raw_messages + } + + # Convert to conversation tuples: (username, content, timestamp, reply_to) + conversation: list[tuple[str, str, datetime, str | None]] = [] + for msg in raw_messages: + reply_to = None + if msg.reference and msg.reference.message_id: + # Try our local lookup first + reply_to = msg_id_to_author.get(msg.reference.message_id) + if not reply_to: + # Try the cached message + ref = msg.reference.cached_message + if ref: + reply_to = ref.author.display_name + conversation.append(( + msg.author.display_name, + msg.content, + msg.created_at, + reply_to, + )) + + # Build user notes map + user_notes_map: dict[str, str] = {} + for msg in raw_messages: + name = msg.author.display_name + if name not in user_notes_map: + notes = self.bot.drama_tracker.get_user_notes(msg.author.id) + if notes: + user_notes_map[name] = notes + + channel_context = self._build_channel_context(raw_messages[0], game_channels) + mention_context = ( + f"A user flagged this conversation and said: \"{mention_text}\"\n" + f"Pay special attention to whether this concern is valid." + ) + + # Single LLM call + result = await self.bot.llm.analyze_conversation( + conversation, + mention_context=mention_context, + channel_context=channel_context, + user_notes_map=user_notes_map, + ) + + if result is None: + logger.warning("Conversation analysis failed for mention scan.") + self._mention_scan_results[trigger_message.id] = "Analysis failed." + return + + # Build username → user_id + ref_message mapping + user_lookup: dict[str, tuple[int, discord.Message, list[discord.Message]]] = {} + for msg in raw_messages: + name = msg.author.display_name + if name not in user_lookup: + user_lookup[name] = (msg.author.id, msg, []) + user_lookup[name][2].append(msg) + + findings: list[str] = [] + + # Resolve thresholds once (outside the loop) + dry_run = config.get("monitoring", {}).get("dry_run", False) + mode_config = self.bot.get_mode_config() + moderation_level = mode_config.get("moderation", "full") + if moderation_level == "relaxed" and "relaxed_thresholds" in mode_config: + rt = mode_config["relaxed_thresholds"] + warning_threshold = rt.get("warning_threshold", 0.80) + base_mute_threshold = rt.get("mute_threshold", 0.85) + spike_warn = rt.get("spike_warning_threshold", 0.70) + spike_mute = rt.get("spike_mute_threshold", 0.85) + else: + warning_threshold = sentiment_config.get("warning_threshold", 0.6) + base_mute_threshold = sentiment_config.get("mute_threshold", 0.75) + spike_warn = sentiment_config.get("spike_warning_threshold", 0.5) + spike_mute = sentiment_config.get("spike_mute_threshold", 0.8) + + for finding in result.get("user_findings", []): + username = finding["username"] + score = finding["toxicity_score"] + categories = finding["categories"] + reasoning = finding["reasoning"] + worst_msg = finding.get("worst_message") + off_topic = finding.get("off_topic", False) + note_update = finding.get("note_update") + + lookup = user_lookup.get(username) + if not lookup: + logger.warning("Mention scan: LLM returned unknown user '%s', skipping.", username) + continue + + user_id, ref_msg, user_msgs = lookup + + # Skip if all their messages were already analyzed + if all(m.id in self._analyzed_message_ids for m in user_msgs): + continue + + # Mark their messages as analyzed + for m in user_msgs: + self._mark_analyzed(m.id) + + self.bot.drama_tracker.add_entry(user_id, score, categories, reasoning) + drama_score = self.bot.drama_tracker.get_drama_score(user_id) + + # Save to DB + content_summary = f"[Mention scan] {worst_msg}" if worst_msg else "[Mention scan] See conversation" + db_message_id = await self.bot.db.save_message_and_analysis( + guild_id=ref_msg.guild.id, + channel_id=ref_msg.channel.id, + user_id=user_id, + username=username, + content=content_summary, + message_ts=ref_msg.created_at.replace(tzinfo=timezone.utc), + toxicity_score=score, + drama_score=drama_score, + categories=categories, + reasoning=reasoning, + off_topic=off_topic, + topic_category="personal_drama" if off_topic else "gaming", + topic_reasoning=reasoning, + coherence_score=None, + coherence_flag=None, + ) + + # Log to #bcs-log + await self._log_analysis( + ref_msg, score, drama_score, categories, reasoning, + off_topic, "personal_drama" if off_topic else "gaming", + ) + + # Collect notable findings for summary + if score >= 0.3: + cat_str = ", ".join(c for c in categories if c != "none") or "none" + findings.append(f"{username}: {score:.2f} ({cat_str})") + + # Update user notes + if note_update: + self.bot.drama_tracker.update_user_notes(user_id, note_update) + self._dirty_users.add(user_id) + + # Moderation actions + if not dry_run: + mute_threshold = self.bot.drama_tracker.get_mute_threshold( + user_id, base_mute_threshold + ) + if drama_score >= mute_threshold or score >= spike_mute: + effective_score = max(drama_score, score) + await self._mute_user(ref_msg, effective_score, categories, db_message_id) + elif drama_score >= warning_threshold or score >= spike_warn: + effective_score = max(drama_score, score) + await self._warn_user(ref_msg, effective_score, db_message_id) + + # Build summary for ChatCog + convo_summary = result.get("conversation_summary", "") + if findings: + summary = f"Scanned {len(raw_messages)} msgs. {convo_summary} Notable: " + "; ".join(findings[:5]) + else: + summary = f"Scanned {len(raw_messages)} msgs. {convo_summary}" + + # Prune old scan results + if len(self._mention_scan_results) > 20: + oldest = sorted(self._mention_scan_results.keys())[:len(self._mention_scan_results) - 10] + for k in oldest: + del self._mention_scan_results[k] + + self._mention_scan_results[trigger_message.id] = summary + + logger.info( + "Mention scan complete in #%s: 1 LLM call, %d messages, %d users flagged", + getattr(channel, "name", "unknown"), + len(raw_messages), + len(findings), + ) async def _mute_user( self, @@ -684,59 +1010,6 @@ class SentimentCog(commands.Cog): details=f"from=#{channel_name} to=#{detected_game} game={game_name}", )) - def _store_context(self, message: discord.Message): - ch_id = message.channel.id - if ch_id not in self._channel_history: - max_ctx = self.bot.config.get("sentiment", {}).get( - "context_messages", 8 - ) - self._channel_history[ch_id] = deque(maxlen=max_ctx) - self._channel_history[ch_id].append( - (message.author.display_name, message.content, datetime.now(timezone.utc)) - ) - - def _get_context(self, message: discord.Message) -> str: - """Build a timestamped chat log from recent channel messages. - - Excludes messages currently buffered for this user+channel - (those appear in the TARGET MESSAGE section instead). - """ - ch_id = message.channel.id - history = self._channel_history.get(ch_id, deque()) - if not history: - return "(no prior context)" - - now = datetime.now(timezone.utc) - - # Collect IDs of messages in the current debounce batch so we can skip them - batch_key = (ch_id, message.author.id) - batch_msgs = self._message_buffer.get(batch_key, []) - # Build a set of (author, content) from the batch for fast lookup - batch_set = {(m.author.display_name, m.content) for m in batch_msgs} - - lines = [] - for name, content, ts in history: - if (name, content) in batch_set: - continue - delta = now - ts - rel = self._format_relative_time(delta) - lines.append(f"[{rel}] {name}: {content}") - - if not lines: - return "(no prior context)" - return "\n".join(lines) - - @staticmethod - def _format_relative_time(delta: timedelta) -> str: - total_seconds = int(delta.total_seconds()) - if total_seconds < 60: - return f"~{total_seconds}s ago" - minutes = total_seconds // 60 - if minutes < 60: - return f"~{minutes}m ago" - hours = minutes // 60 - return f"~{hours}h ago" - async def _log_analysis( self, message: discord.Message, score: float, drama_score: float, categories: list[str], reasoning: str, off_topic: bool, topic_category: str, diff --git a/config.yaml b/config.yaml index b658e72..ec0aede 100644 --- a/config.yaml +++ b/config.yaml @@ -33,8 +33,13 @@ topic_drift: escalation_count: 3 # After this many reminds, DM the server owner reset_minutes: 60 # Reset off-topic count after this much on-topic behavior +mention_scan: + enabled: true + scan_messages: 30 # Messages to scan per mention trigger + cooldown_seconds: 60 # Per-channel cooldown between scans + timeouts: - escalation_minutes: [5, 15, 30, 60] # Escalating timeout durations + escalation_minutes: [30, 60, 120, 240] # Escalating timeout durations offense_reset_minutes: 120 # Reset offense counter after this much good behavior warning_cooldown_minutes: 5 # Don't warn same user more than once per this window diff --git a/prompts/analysis.txt b/prompts/analysis.txt index 2db7f34..bf8d165 100644 --- a/prompts/analysis.txt +++ b/prompts/analysis.txt @@ -36,4 +36,16 @@ If you notice something noteworthy about this user's communication style, behavi GAME DETECTION — If CHANNEL INFO is provided, identify which specific game the message is discussing. Set detected_game to the channel name that best matches (e.g. "gta-online", "warzone", "battlefield", "cod-zombies") using ONLY the channel names listed in the channel info. If the message isn't about a specific game or you're unsure, set detected_game to null. -Use the report_analysis tool to report your analysis of the TARGET MESSAGE only. \ No newline at end of file +Use the report_analysis tool to report your analysis of the TARGET MESSAGE only. + +CONVERSATION-LEVEL ANALYSIS (when given a CONVERSATION BLOCK instead of a single TARGET MESSAGE): +When you receive a full conversation block with multiple users, use the report_conversation_scan tool instead: +- Provide ONE finding per user (not per message) — aggregate their behavior across the conversation. +- Weight their average tone and worst message equally when determining the toxicity_score. +- Use the same scoring bands (0.0-1.0) as for single messages. +- Quote the worst/most problematic snippet in worst_message (max 100 chars, exact quote). +- Flag off_topic if user's messages are primarily personal drama, not gaming. +- For each user, assess coherence_score (0.0-1.0) and coherence_flag using the same criteria as single-message analysis. Normal texting shortcuts and abbreviations are fine (score ~0.85+). +- For each user, determine topic_category and provide brief topic_reasoning for their messages. +- For each user, check detected_game against the CHANNEL INFO section (if provided). Set to the game channel name if their messages are about a specific game, or null otherwise. +- If a USER REPORT section is present, pay close attention to whether that specific concern is valid. \ No newline at end of file diff --git a/utils/llm_client.py b/utils/llm_client.py index e812b3f..85103d0 100644 --- a/utils/llm_client.py +++ b/utils/llm_client.py @@ -3,6 +3,7 @@ import base64 import json import logging import time +from datetime import datetime, timedelta, timezone from pathlib import Path from openai import AsyncOpenAI @@ -97,6 +98,114 @@ ANALYSIS_TOOL = { } +CONVERSATION_TOOL = { + "type": "function", + "function": { + "name": "report_conversation_scan", + "description": "Analyze a conversation block and report findings per user.", + "parameters": { + "type": "object", + "properties": { + "user_findings": { + "type": "array", + "items": { + "type": "object", + "properties": { + "username": { + "type": "string", + "description": "Discord display name of the user.", + }, + "toxicity_score": { + "type": "number", + "description": "Weighted toxicity 0.0-1.0 across their messages in this conversation.", + }, + "categories": { + "type": "array", + "items": { + "type": "string", + "enum": [ + "aggressive", + "passive_aggressive", + "instigating", + "hostile", + "manipulative", + "sexual_vulgar", + "none", + ], + }, + "description": "Detected toxicity behavior categories.", + }, + "reasoning": { + "type": "string", + "description": "Brief explanation of this user's behavior in the conversation.", + }, + "worst_message": { + "type": ["string", "null"], + "description": "Most problematic snippet from this user (quoted, max 100 chars), or null if nothing notable.", + }, + "off_topic": { + "type": "boolean", + "description": "True if this user's messages were primarily off-topic personal drama.", + }, + "topic_category": { + "type": "string", + "enum": [ + "gaming", + "personal_drama", + "relationship_issues", + "real_life_venting", + "gossip", + "general_chat", + "meta", + ], + "description": "What topic category this user's messages fall into.", + }, + "topic_reasoning": { + "type": "string", + "description": "Brief explanation of the topic classification for this user.", + }, + "coherence_score": { + "type": "number", + "description": "Coherence rating 0.0-1.0 across this user's messages. Normal texting shortcuts are fine.", + }, + "coherence_flag": { + "type": "string", + "enum": [ + "normal", + "intoxicated", + "tired", + "angry_typing", + "mobile_keyboard", + "language_barrier", + ], + "description": "Best guess at why coherence is low, if applicable.", + }, + "note_update": { + "type": ["string", "null"], + "description": "New observation about this user's pattern, or null.", + }, + "detected_game": { + "type": ["string", "null"], + "description": "The game channel name this user's messages are about, or null.", + }, + }, + "required": ["username", "toxicity_score", "categories", "reasoning", "off_topic", "topic_category", "topic_reasoning", "coherence_score", "coherence_flag"], + }, + "description": "Findings for each user who participated in the conversation.", + }, + "conversation_summary": { + "type": "string", + "description": "One-sentence summary of the overall conversation tone and any escalation patterns.", + }, + }, + "required": ["user_findings", "conversation_summary"], + }, + }, +} + +_NO_TEMPERATURE_MODELS = {"gpt-5-nano", "o1", "o1-mini", "o1-preview", "o3", "o3-mini", "o4-mini"} + + class LLMClient: def __init__(self, base_url: str, model: str, api_key: str = "not-needed", db=None, no_think: bool = False, concurrency: int = 4): @@ -104,6 +213,7 @@ class LLMClient: self.host = base_url.rstrip("/") self._db = db self._no_think = no_think + self._supports_temperature = model not in _NO_TEMPERATURE_MODELS timeout = 600.0 if self.host else 120.0 # local models need longer for VRAM load client_kwargs = {"api_key": api_key, "timeout": timeout} if self.host: @@ -137,13 +247,15 @@ class LLMClient: async def analyze_message( self, message: str, context: str = "", user_notes: str = "", - channel_context: str = "", + channel_context: str = "", mention_context: str = "", ) -> dict | None: user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n" if user_notes: user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n" if channel_context: user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n" + if mention_context: + user_content += f"=== USER REPORT (a user flagged this conversation — focus on this concern) ===\n{mention_context}\n\n" user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}" user_content = self._append_no_think(user_content) @@ -155,6 +267,7 @@ class LLMClient: async with self._semaphore: try: + temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {} response = await self._client.chat.completions.create( model=self.model, messages=[ @@ -163,7 +276,7 @@ class LLMClient: ], tools=[ANALYSIS_TOOL], tool_choice={"type": "function", "function": {"name": "report_analysis"}}, - temperature=0.1, + **temp_kwargs, max_completion_tokens=2048, ) @@ -253,6 +366,152 @@ class LLMClient: logger.warning("Could not parse LLM content fallback: %s", text[:200]) return None + # -- Conversation-level analysis (mention scan) -- + + @staticmethod + def _format_relative_time(delta: timedelta) -> str: + total_seconds = int(delta.total_seconds()) + if total_seconds < 60: + return f"~{total_seconds}s ago" + minutes = total_seconds // 60 + if minutes < 60: + return f"~{minutes}m ago" + hours = minutes // 60 + return f"~{hours}h ago" + + @staticmethod + def _format_conversation_block( + messages: list[tuple[str, str, datetime, str | None]], + now: datetime | None = None, + ) -> str: + """Format messages as a compact timestamped chat block. + + Each tuple is (username, content, timestamp, reply_to_username). + Consecutive messages from the same user collapse to indented lines. + Replies shown as ``username → replied_to:``. + """ + if now is None: + now = datetime.now(timezone.utc) + + lines = [f"[Current time: {now.strftime('%I:%M %p')}]", ""] + last_user = None + + for username, content, ts, reply_to in messages: + delta = now - ts.replace(tzinfo=timezone.utc) if ts.tzinfo is None else now - ts + rel = LLMClient._format_relative_time(delta) + + if username == last_user: + # Continuation from same user — indent + for line in content.split("\n"): + lines.append(f" {line}") + else: + # New user block + if reply_to: + prefix = f"[{rel}] {username} → {reply_to}: " + else: + prefix = f"[{rel}] {username}: " + msg_lines = content.split("\n") + lines.append(prefix + msg_lines[0]) + for line in msg_lines[1:]: + lines.append(f" {line}") + + last_user = username + + return "\n".join(lines) + + async def analyze_conversation( + self, + messages: list[tuple[str, str, datetime, str | None]], + mention_context: str = "", + channel_context: str = "", + user_notes_map: dict[str, str] | None = None, + ) -> dict | None: + """Analyze a conversation block in one call, returning per-user findings.""" + if not messages: + return None + + convo_block = self._format_conversation_block(messages) + + user_content = f"=== CONVERSATION BLOCK ===\n{convo_block}\n\n" + if user_notes_map: + notes_lines = [f" {u}: {n}" for u, n in user_notes_map.items() if n] + if notes_lines: + user_content += "=== USER NOTES (from prior analysis) ===\n" + "\n".join(notes_lines) + "\n\n" + if channel_context: + user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n" + if mention_context: + user_content += f"=== USER REPORT (a user flagged this conversation — focus on this concern) ===\n{mention_context}\n\n" + user_content += "Analyze the conversation block above and report findings for each user." + user_content = self._append_no_think(user_content) + + req_json = json.dumps([ + {"role": "system", "content": SYSTEM_PROMPT[:500]}, + {"role": "user", "content": user_content[:500]}, + ], default=str) + t0 = time.monotonic() + + async with self._semaphore: + try: + temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {} + response = await self._client.chat.completions.create( + model=self.model, + messages=[ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": user_content}, + ], + tools=[CONVERSATION_TOOL], + tool_choice={"type": "function", "function": {"name": "report_conversation_scan"}}, + **temp_kwargs, + max_completion_tokens=4096, + ) + + elapsed = int((time.monotonic() - t0) * 1000) + choice = response.choices[0] + usage = response.usage + + if choice.message.tool_calls: + tool_call = choice.message.tool_calls[0] + resp_text = tool_call.function.arguments + args = json.loads(resp_text) + self._log_llm("conversation", elapsed, True, req_json, resp_text, + input_tokens=usage.prompt_tokens if usage else None, + output_tokens=usage.completion_tokens if usage else None) + return self._validate_conversation_result(args) + + logger.warning("No tool call in conversation analysis response.") + self._log_llm("conversation", elapsed, False, req_json, error="Empty response") + return None + + except Exception as e: + elapsed = int((time.monotonic() - t0) * 1000) + logger.error("LLM conversation analysis error: %s", e) + self._log_llm("conversation", elapsed, False, req_json, error=str(e)) + return None + + @staticmethod + def _validate_conversation_result(result: dict) -> dict: + """Validate and normalize conversation analysis result.""" + findings = result.get("user_findings", []) + for finding in findings: + finding.setdefault("username", "unknown") + score = float(finding.get("toxicity_score", 0.0)) + finding["toxicity_score"] = min(max(score, 0.0), 1.0) + if not isinstance(finding.get("categories"), list): + finding["categories"] = ["none"] + finding.setdefault("reasoning", "") + finding.setdefault("worst_message", None) + finding["off_topic"] = bool(finding.get("off_topic", False)) + finding.setdefault("topic_category", "general_chat") + finding.setdefault("topic_reasoning", "") + coherence = float(finding.get("coherence_score", 0.85)) + finding["coherence_score"] = min(max(coherence, 0.0), 1.0) + finding.setdefault("coherence_flag", "normal") + finding.setdefault("note_update", None) + finding.setdefault("detected_game", None) + result["user_findings"] = findings + result.setdefault("conversation_summary", "") + return result + async def chat( self, messages: list[dict[str, str]], system_prompt: str, on_first_token=None, recent_bot_replies: list[str] | None = None, @@ -285,16 +544,15 @@ class LLMClient: async with self._semaphore: try: + temp_kwargs = {"temperature": 0.9, "frequency_penalty": 0.8, "presence_penalty": 0.6} if self._supports_temperature else {} stream = await self._client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": effective_prompt}, *patched, ], - temperature=0.9, + **temp_kwargs, max_completion_tokens=2048, - frequency_penalty=0.8, - presence_penalty=0.6, stream=True, ) @@ -350,13 +608,14 @@ class LLMClient: async with self._semaphore: try: async def _stream_image(): + temp_kwargs = {"temperature": 0.8} if self._supports_temperature else {} stream = await self._client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}, ], - temperature=0.8, + **temp_kwargs, max_completion_tokens=2048, stream=True, ) @@ -409,6 +668,7 @@ class LLMClient: async with self._semaphore: try: + temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {} response = await self._client.chat.completions.create( model=self.model, messages=[ @@ -417,7 +677,7 @@ class LLMClient: ], tools=[ANALYSIS_TOOL], tool_choice={"type": "function", "function": {"name": "report_analysis"}}, - temperature=0.1, + **temp_kwargs, max_completion_tokens=2048, )