"""Sentiment cog: batches channel messages, runs LLM analysis, applies moderation."""

import asyncio
import logging
from collections.abc import Coroutine
from datetime import datetime, timedelta, timezone

import discord
from discord.ext import commands, tasks

logger = logging.getLogger("bcs.sentiment")

# How often to flush dirty user states to DB (seconds)
STATE_FLUSH_INTERVAL = 300  # 5 minutes


class SentimentCog(commands.Cog):
    """Debounce guild messages per channel, analyze them, and act on the findings.

    Messages are buffered per channel and analyzed as one conversation after a
    short quiet window. An explicit @mention of the bot instead triggers a
    one-off scan of recent channel history.
    """

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        # Track which user IDs have unsaved in-memory changes
        self._dirty_users: set[int] = set()
        # Per-user redirect cooldown: {user_id: last_redirect_datetime}
        self._redirect_cooldowns: dict[int, datetime] = {}
        # Debounce buffer: keyed by channel_id, stores list of messages from ALL users
        self._message_buffer: dict[int, list[discord.Message]] = {}
        # Pending debounce timer tasks (per-channel)
        self._debounce_tasks: dict[int, asyncio.Task] = {}
        # Mention scan tasks (separate from debounce)
        self._mention_scan_tasks: dict[int, asyncio.Task] = {}
        # Mention scan state
        self._mention_scan_cooldowns: dict[int, datetime] = {}  # {channel_id: last_scan_time}
        self._mention_scan_results: dict[int, str] = {}  # {trigger_message_id: findings_summary}
        self._analyzed_message_ids: set[int] = set()  # Discord message IDs already analyzed
        self._max_analyzed_ids = 500
        # Strong references to in-flight tasks: the event loop only keeps weak
        # references, so an unreferenced task may be collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    # -- Lifecycle --

    async def cog_load(self):
        """Start the periodic dirty-state flush loop."""
        self._flush_states.start()

    async def cog_unload(self):
        """Stop timers, analyze whatever is still buffered, and persist state."""
        self._flush_states.cancel()
        # Cancel all pending debounce timers and process remaining buffers
        for task in self._debounce_tasks.values():
            task.cancel()
        self._debounce_tasks.clear()
        for task in self._mention_scan_tasks.values():
            task.cancel()
        self._mention_scan_tasks.clear()
        for channel_id in list(self._message_buffer):
            await self._process_buffered(channel_id)
        # Final flush on shutdown
        await self._flush_dirty_states()
        # Let in-flight fire-and-forget DB writes finish before the cog goes away
        if self._background_tasks:
            await asyncio.gather(*self._background_tasks, return_exceptions=True)

    # -- Small shared helpers --

    def _spawn(self, coro: Coroutine) -> asyncio.Task:
        """Schedule *coro* as a task, keep a reference to it, and log its failure."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_done)
        return task

    def _on_background_done(self, task: asyncio.Task) -> None:
        """Drop the finished task's reference and surface any exception it raised."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Background task failed: %r", exc, exc_info=exc)

    async def _send_best_effort(self, channel, content: str | None = None, *, embed=None) -> None:
        """Send to *channel*; log instead of raising when Discord rejects the send."""
        try:
            if embed is not None:
                await channel.send(embed=embed)
            else:
                await channel.send(content)
        except discord.HTTPException as exc:
            logger.warning(
                "Could not send message in #%s: %s",
                getattr(channel, "name", "unknown"), exc,
            )

    def _record_action(
        self,
        message: discord.Message,
        action_type: str,
        db_message_id: int | None,
        details: str,
    ) -> None:
        """Persist a moderation action for the author of *message* (fire-and-forget)."""
        self._spawn(self.bot.db.save_action(
            guild_id=message.guild.id,
            user_id=message.author.id,
            username=message.author.display_name,
            action_type=action_type,
            message_id=db_message_id,
            details=details,
        ))

    def _resolve_thresholds(self, sentiment_config: dict) -> tuple[float, float, float, float]:
        """Return (warning, base_mute, spike_warn, spike_mute) for the active mode."""
        mode_config = self.bot.get_mode_config()
        moderation_level = mode_config.get("moderation", "full")
        if moderation_level == "relaxed" and "relaxed_thresholds" in mode_config:
            rt = mode_config["relaxed_thresholds"]
            return (
                rt.get("warning_threshold", 0.80),
                rt.get("mute_threshold", 0.85),
                rt.get("spike_warning_threshold", 0.70),
                rt.get("spike_mute_threshold", 0.85),
            )
        return (
            sentiment_config.get("warning_threshold", 0.6),
            sentiment_config.get("mute_threshold", 0.75),
            sentiment_config.get("spike_warning_threshold", 0.5),
            sentiment_config.get("spike_mute_threshold", 0.8),
        )

    @staticmethod
    def _build_conversation(
        messages: list[discord.Message],
    ) -> list[tuple[str, str, datetime, str | None]]:
        """Convert messages to (username, content, timestamp, reply_to_username) tuples."""
        # Lookup of message IDs to authors for resolving replies
        msg_id_to_author: dict[int, str] = {
            m.id: m.author.display_name for m in messages
        }
        conversation: list[tuple[str, str, datetime, str | None]] = []
        for msg in messages:
            reply_to = None
            if msg.reference and msg.reference.message_id:
                # Try our local lookup first, then the cached message
                reply_to = msg_id_to_author.get(msg.reference.message_id)
                if not reply_to:
                    ref = msg.reference.cached_message
                    if ref:
                        reply_to = ref.author.display_name
            conversation.append((
                msg.author.display_name,
                msg.content,
                msg.created_at,
                reply_to,
            ))
        return conversation

    def _collect_user_notes(self, messages: list[discord.Message]) -> dict[str, str]:
        """Map display name -> stored notes for each author that has notes."""
        user_notes_map: dict[str, str] = {}
        for msg in messages:
            name = msg.author.display_name
            if name not in user_notes_map:
                notes = self.bot.drama_tracker.get_user_notes(msg.author.id)
                if notes:
                    user_notes_map[name] = notes
        return user_notes_map

    @staticmethod
    def _group_by_author(
        messages: list[discord.Message],
    ) -> dict[str, tuple[int, discord.Message, list[discord.Message]]]:
        """Map display name -> (user_id, first message, all messages) for *messages*."""
        user_lookup: dict[str, tuple[int, discord.Message, list[discord.Message]]] = {}
        for msg in messages:
            name = msg.author.display_name
            if name not in user_lookup:
                user_lookup[name] = (msg.author.id, msg, [])
            user_lookup[name][2].append(msg)
        return user_lookup

    async def _apply_moderation(
        self,
        ref_msg: discord.Message,
        user_id: int,
        score: float,
        drama_score: float,
        categories: list[str],
        db_message_id: int | None,
        thresholds: tuple[float, float, float, float],
    ) -> None:
        """Warn or mute the author of *ref_msg* based on rolling and spike scores."""
        warning_threshold, base_mute_threshold, spike_warn, spike_mute = thresholds
        mute_threshold = self.bot.drama_tracker.get_mute_threshold(
            user_id, base_mute_threshold
        )
        user_data = self.bot.drama_tracker.get_user(user_id)
        effective_score = max(drama_score, score)
        # Mute: rolling average OR single message spike
        if drama_score >= mute_threshold or score >= spike_mute:
            if user_data.warned_since_reset:
                await self._mute_user(ref_msg, effective_score, categories, db_message_id)
            else:
                # Downgrade to warning — require a warning before muting
                logger.info("Downgrading mute to warning for %s (no prior warning)", ref_msg.author)
                await self._warn_user(ref_msg, effective_score, db_message_id)
        # Warn: rolling average OR single message spike
        elif drama_score >= warning_threshold or score >= spike_warn:
            await self._warn_user(ref_msg, effective_score, db_message_id)

    # -- Message intake --

    @commands.Cog.listener()
    async def on_message(self, message: discord.Message):
        """Filter an incoming message and either start a mention scan or buffer it."""
        logger.info("MSG from %s in #%s: %s",
                    message.author,
                    getattr(message.channel, 'name', 'DM'),
                    message.content[:80] if message.content else "(empty)")

        # Ignore bots (including ourselves)
        if message.author.bot:
            return

        # Ignore DMs
        if not message.guild:
            return

        config = self.bot.config
        monitoring = config.get("monitoring", {})

        if not monitoring.get("enabled", True):
            return

        # Check if channel is monitored
        monitored_channels = monitoring.get("channels", [])
        if monitored_channels and message.channel.id not in monitored_channels:
            return

        # Check ignored users
        if message.author.id in monitoring.get("ignored_users", []):
            return

        # Check immune roles. The author can be a plain User (no roles) when
        # the member has already left the guild, so read the attribute defensively.
        immune_roles = set(monitoring.get("immune_roles", []))
        author_roles = getattr(message.author, "roles", ())
        if immune_roles and any(r.id in immune_roles for r in author_roles):
            return

        # Check per-user immunity
        if self.bot.drama_tracker.is_immune(message.author.id):
            return

        # Explicit @mention of the bot triggers a mention scan instead of scoring.
        # Reply-pings (Discord auto-adds replied-to user to mentions) should NOT
        # trigger scans — and reply-to-bot messages should still be scored normally
        # so toxic replies to bot warnings aren't silently skipped.
        content = message.content or ""
        bot_mentioned_in_text = (
            f"<@{self.bot.user.id}>" in content
            or f"<@!{self.bot.user.id}>" in content
        )
        if bot_mentioned_in_text:
            mention_config = config.get("mention_scan", {})
            if mention_config.get("enabled", True):
                await self._maybe_start_mention_scan(message, mention_config)
            return

        # Skip if empty
        if not message.content or not message.content.strip():
            return

        # Buffer the message and start/reset debounce timer (per-channel)
        channel_id = message.channel.id
        self._message_buffer.setdefault(channel_id, []).append(message)

        # Cancel existing debounce timer for this channel
        existing_task = self._debounce_tasks.get(channel_id)
        if existing_task and not existing_task.done():
            existing_task.cancel()

        batch_window = config.get("sentiment", {}).get("batch_window_seconds", 3)
        self._debounce_tasks[channel_id] = self._spawn(
            self._debounce_then_process(channel_id, batch_window)
        )

    async def _debounce_then_process(self, channel_id: int, delay: float):
        """Sleep for the debounce window, then process the buffered messages."""
        try:
            await asyncio.sleep(delay)
            await self._process_buffered(channel_id)
        except asyncio.CancelledError:
            pass  # Timer was reset by a new message — expected

    async def _process_buffered(self, channel_id: int):
        """Collect buffered messages, build conversation block, and run analysis."""
        # Both pops happen before the first await, so a message arriving while
        # analysis runs starts a fresh buffer/timer instead of cancelling this one.
        messages = self._message_buffer.pop(channel_id, [])
        self._debounce_tasks.pop(channel_id, None)

        if not messages:
            return

        # Use the last message as reference for channel/guild
        ref_message = messages[-1]
        channel = ref_message.channel

        config = self.bot.config
        sentiment_config = config.get("sentiment", {})
        game_channels = config.get("game_channels", {})

        # Fetch some history before the buffered messages for leading context
        context_count = sentiment_config.get("context_messages", 8)
        oldest_buffered = messages[0]
        history_messages: list[discord.Message] = []
        try:
            async for msg in channel.history(limit=context_count + 5, before=oldest_buffered):
                if msg.author.bot:
                    continue
                if not msg.content or not msg.content.strip():
                    continue
                history_messages.append(msg)
                if len(history_messages) >= context_count:
                    break
        except discord.HTTPException as exc:
            # Context is optional: analyze with whatever was fetched so far
            logger.warning(
                "Failed to fetch context history in #%s: %s",
                getattr(channel, 'name', 'unknown'), exc,
            )

        history_messages.reverse()  # chronological order

        # Combine: history (context) + buffered (new messages to analyze)
        new_message_start = len(history_messages)
        all_messages = history_messages + messages

        conversation = self._build_conversation(all_messages)
        if not conversation:
            return

        # Build user notes map (only for users in the buffer, not history-only)
        user_notes_map = self._collect_user_notes(messages)

        channel_context = self._build_channel_context(ref_message, game_channels)

        logger.info(
            "Channel analysis: %d new messages (+%d context) in #%s",
            len(messages), len(history_messages),
            getattr(channel, 'name', 'unknown'),
        )

        # TRIAGE: Lightweight model — conversation-level analysis
        result = await self.bot.llm.analyze_conversation(
            conversation,
            channel_context=channel_context,
            user_notes_map=user_notes_map,
            new_message_start=new_message_start,
        )

        if result is None:
            return

        # ESCALATION: Re-analyze with heavy model if any finding warrants it
        escalation_threshold = sentiment_config.get("escalation_threshold", 0.25)
        needs_escalation = any(
            f["toxicity_score"] >= escalation_threshold
            or f.get("off_topic", False)
            or f.get("coherence_score", 1.0) < 0.6
            for f in result.get("user_findings", [])
        )
        if needs_escalation:
            heavy_result = await self.bot.llm_heavy.analyze_conversation(
                conversation,
                channel_context=channel_context,
                user_notes_map=user_notes_map,
                new_message_start=new_message_start,
            )
            if heavy_result is not None:
                logger.info(
                    "Escalated to heavy model for #%s",
                    getattr(channel, 'name', 'unknown'),
                )
                result = heavy_result

        # Build username -> (user_id, ref_msg, [messages]) for buffered users only
        user_lookup = self._group_by_author(messages)

        # Mark all buffered messages as analyzed (for mention scan dedup)
        for m in messages:
            self._mark_analyzed(m.id)

        # Resolve thresholds once
        dry_run = config.get("monitoring", {}).get("dry_run", False)
        thresholds = self._resolve_thresholds(sentiment_config)
        coherence_config = config.get("coherence", {})
        escalation_boost = sentiment_config.get("escalation_boost", 0.04)

        # Process per-user findings
        for finding in result.get("user_findings", []):
            username = finding["username"]
            lookup = user_lookup.get(username)
            if not lookup:
                # LLM returned a finding for a history-only user or unknown name; skip
                continue

            user_id, user_ref_msg, user_msgs = lookup
            score = finding["toxicity_score"]
            categories = finding["categories"]
            reasoning = finding["reasoning"]
            off_topic = finding.get("off_topic", False)
            topic_category = finding.get("topic_category", "general_chat")
            topic_reasoning = finding.get("topic_reasoning", "")
            coherence_score = finding.get("coherence_score", 0.85)
            coherence_flag = finding.get("coherence_flag", "normal")
            note_update = finding.get("note_update")
            detected_game = finding.get("detected_game")

            # Track the result in DramaTracker
            self.bot.drama_tracker.add_entry(user_id, score, categories, reasoning)
            drama_score = self.bot.drama_tracker.get_drama_score(
                user_id, escalation_boost=escalation_boost
            )

            logger.info(
                "User %s (%d) | msg_score=%.2f | drama_score=%.2f | categories=%s | %s",
                username, user_id, score, drama_score, categories, reasoning,
            )

            # Save message + analysis to DB
            combined_content = "\n".join(
                m.content for m in user_msgs if m.content and m.content.strip()
            )
            db_message_id = await self.bot.db.save_message_and_analysis(
                guild_id=user_ref_msg.guild.id,
                channel_id=user_ref_msg.channel.id,
                user_id=user_id,
                username=username,
                content=combined_content[:4000],
                message_ts=user_ref_msg.created_at.replace(tzinfo=timezone.utc),
                toxicity_score=score,
                drama_score=drama_score,
                categories=categories,
                reasoning=reasoning,
                off_topic=off_topic,
                topic_category=topic_category,
                topic_reasoning=topic_reasoning,
                coherence_score=coherence_score,
                coherence_flag=coherence_flag,
            )

            # Topic drift handling
            if off_topic:
                await self._handle_topic_drift(
                    user_ref_msg, topic_category, topic_reasoning, db_message_id
                )

            # Game channel redirect detection
            if detected_game and game_channels and not dry_run:
                await self._handle_channel_redirect(
                    user_ref_msg, detected_game, game_channels, db_message_id
                )

            # Coherence / intoxication detection
            if coherence_config.get("enabled", True):
                degradation = self.bot.drama_tracker.update_coherence(
                    user_id=user_id,
                    score=coherence_score,
                    flag=coherence_flag,
                    drop_threshold=coherence_config.get("drop_threshold", 0.3),
                    absolute_floor=coherence_config.get("absolute_floor", 0.5),
                    cooldown_minutes=coherence_config.get("cooldown_minutes", 30),
                )
                if degradation and not dry_run:
                    await self._handle_coherence_alert(
                        user_ref_msg, degradation, coherence_config, db_message_id
                    )

            # Capture LLM note updates about this user
            if note_update:
                self.bot.drama_tracker.update_user_notes(user_id, note_update)

            # Mark dirty for coherence baseline drift even without actions
            # (this also covers the note update above)
            self._dirty_users.add(user_id)

            # Always log analysis to #bcs-log if it exists
            await self._log_analysis(
                user_ref_msg, score, drama_score, categories,
                reasoning, off_topic, topic_category,
            )

            # Moderation actions (skip in dry-run mode)
            if not dry_run:
                await self._apply_moderation(
                    user_ref_msg, user_id, score, drama_score,
                    categories, db_message_id, thresholds,
                )

    # -- Mention scan methods --

    def _mark_analyzed(self, discord_message_id: int):
        """Track a Discord message ID as already analyzed."""
        self._analyzed_message_ids.add(discord_message_id)
        if len(self._analyzed_message_ids) > self._max_analyzed_ids:
            # Over the cap: keep only the upper half of the IDs in sorted order
            sorted_ids = sorted(self._analyzed_message_ids)
            self._analyzed_message_ids = set(sorted_ids[len(sorted_ids) // 2:])

    async def _maybe_start_mention_scan(
        self, trigger_message: discord.Message, mention_config: dict
    ):
        """Check cooldown and kick off a mention-triggered scan of recent messages."""
        channel_id = trigger_message.channel.id
        cooldown_seconds = mention_config.get("cooldown_seconds", 60)

        now = datetime.now(timezone.utc)
        last_scan = self._mention_scan_cooldowns.get(channel_id)
        if last_scan and (now - last_scan).total_seconds() < cooldown_seconds:
            logger.info(
                "Mention scan cooldown active for #%s, skipping.",
                getattr(trigger_message.channel, "name", "unknown"),
            )
            return

        self._mention_scan_cooldowns[channel_id] = now

        # Extract the user's concern (strip the bot ping from the message)
        mention_text = trigger_message.content
        for fmt in (f"<@{self.bot.user.id}>", f"<@!{self.bot.user.id}>"):
            mention_text = mention_text.replace(fmt, "")
        mention_text = mention_text.strip() or "(user pinged bot without specific concern)"

        # Store as a mention scan task (separate from debounce)
        existing_task = self._mention_scan_tasks.get(channel_id)
        if existing_task and not existing_task.done():
            existing_task.cancel()

        self._mention_scan_tasks[channel_id] = self._spawn(
            self._run_mention_scan(trigger_message, mention_text, mention_config)
        )

    async def _run_mention_scan(
        self,
        trigger_message: discord.Message,
        mention_text: str,
        mention_config: dict,
    ):
        """Scan recent channel messages with ONE conversation-level LLM call."""
        channel = trigger_message.channel
        scan_count = mention_config.get("scan_messages", 30)

        config = self.bot.config
        sentiment_config = config.get("sentiment", {})
        game_channels = config.get("game_channels", {})

        # Fetch recent messages (before the trigger, skip bots/empty)
        raw_messages: list[discord.Message] = []
        try:
            async for msg in channel.history(limit=scan_count + 10, before=trigger_message):
                if msg.author.bot:
                    continue
                if not msg.content or not msg.content.strip():
                    continue
                raw_messages.append(msg)
                if len(raw_messages) >= scan_count:
                    break
        except discord.HTTPException:
            logger.warning("Failed to fetch history for mention scan in #%s",
                           getattr(channel, "name", "unknown"))
            return

        raw_messages.reverse()  # chronological order

        if not raw_messages:
            self._mention_scan_results[trigger_message.id] = "No recent messages found to analyze."
            return

        logger.info(
            "Mention scan triggered by %s in #%s: %d messages (single LLM call). Focus: %s",
            trigger_message.author.display_name,
            getattr(channel, "name", "unknown"),
            len(raw_messages),
            mention_text[:80],
        )

        conversation = self._build_conversation(raw_messages)
        user_notes_map = self._collect_user_notes(raw_messages)

        channel_context = self._build_channel_context(raw_messages[0], game_channels)
        mention_context = (
            f"A user flagged this conversation and said: \"{mention_text}\"\n"
            f"Pay special attention to whether this concern is valid."
        )

        # Single LLM call
        result = await self.bot.llm.analyze_conversation(
            conversation,
            mention_context=mention_context,
            channel_context=channel_context,
            user_notes_map=user_notes_map,
        )

        if result is None:
            logger.warning("Conversation analysis failed for mention scan.")
            self._mention_scan_results[trigger_message.id] = "Analysis failed."
            return

        # Build username → user_id + ref_message mapping
        user_lookup = self._group_by_author(raw_messages)

        findings: list[str] = []

        # Resolve thresholds once (outside the loop)
        dry_run = config.get("monitoring", {}).get("dry_run", False)
        thresholds = self._resolve_thresholds(sentiment_config)
        escalation_boost = sentiment_config.get("escalation_boost", 0.04)

        for finding in result.get("user_findings", []):
            username = finding["username"]
            score = finding["toxicity_score"]
            categories = finding["categories"]
            reasoning = finding["reasoning"]
            worst_msg = finding.get("worst_message")
            off_topic = finding.get("off_topic", False)
            note_update = finding.get("note_update")

            lookup = user_lookup.get(username)
            if not lookup:
                logger.warning("Mention scan: LLM returned unknown user '%s', skipping.", username)
                continue

            user_id, ref_msg, user_msgs = lookup

            # Skip if all their messages were already analyzed
            if all(m.id in self._analyzed_message_ids for m in user_msgs):
                continue

            # Mark their messages as analyzed
            for m in user_msgs:
                self._mark_analyzed(m.id)

            self.bot.drama_tracker.add_entry(user_id, score, categories, reasoning)
            drama_score = self.bot.drama_tracker.get_drama_score(
                user_id, escalation_boost=escalation_boost
            )

            # The mention scan has no per-finding topic category; derive one
            topic_category = "personal_drama" if off_topic else "gaming"

            # Save to DB
            content_summary = (
                f"[Mention scan] {worst_msg}" if worst_msg else "[Mention scan] See conversation"
            )
            db_message_id = await self.bot.db.save_message_and_analysis(
                guild_id=ref_msg.guild.id,
                channel_id=ref_msg.channel.id,
                user_id=user_id,
                username=username,
                content=content_summary,
                message_ts=ref_msg.created_at.replace(tzinfo=timezone.utc),
                toxicity_score=score,
                drama_score=drama_score,
                categories=categories,
                reasoning=reasoning,
                off_topic=off_topic,
                topic_category=topic_category,
                topic_reasoning=reasoning,
                coherence_score=None,
                coherence_flag=None,
            )

            # Log to #bcs-log
            await self._log_analysis(
                ref_msg, score, drama_score, categories, reasoning,
                off_topic, topic_category,
            )

            # Collect notable findings for summary
            if score >= 0.3:
                cat_str = ", ".join(c for c in categories if c != "none") or "none"
                findings.append(f"{username}: {score:.2f} ({cat_str})")

            # Update user notes
            if note_update:
                self.bot.drama_tracker.update_user_notes(user_id, note_update)
                self._dirty_users.add(user_id)

            # Moderation actions
            if not dry_run:
                await self._apply_moderation(
                    ref_msg, user_id, score, drama_score,
                    categories, db_message_id, thresholds,
                )

        # Build summary for ChatCog
        convo_summary = result.get("conversation_summary", "")
        if findings:
            summary = (
                f"Scanned {len(raw_messages)} msgs. {convo_summary} Notable: "
                + "; ".join(findings[:5])
            )
        else:
            summary = f"Scanned {len(raw_messages)} msgs. {convo_summary}"

        # Prune old scan results
        if len(self._mention_scan_results) > 20:
            oldest = sorted(self._mention_scan_results.keys())[:len(self._mention_scan_results) - 10]
            for k in oldest:
                del self._mention_scan_results[k]

        self._mention_scan_results[trigger_message.id] = summary

        logger.info(
            "Mention scan complete in #%s: 1 LLM call, %d messages, %d users flagged",
            getattr(channel, "name", "unknown"),
            len(raw_messages),
            len(findings),
        )

    # -- Moderation actions --

    async def _mute_user(
        self,
        message: discord.Message,
        score: float,
        categories: list[str],
        db_message_id: int | None = None,
    ):
        """Time out the author of *message* with an escalating duration and announce it."""
        member = message.author
        if not isinstance(member, discord.Member):
            return

        # Check bot permissions
        if not message.guild.me.guild_permissions.moderate_members:
            logger.warning("Missing moderate_members permission, cannot mute.")
            return

        # Record offense and get escalating timeout
        offense_num = self.bot.drama_tracker.record_offense(member.id)
        timeout_config = self.bot.config.get("timeouts", {})
        escalation = timeout_config.get("escalation_minutes", [5, 15, 30, 60])
        idx = min(offense_num - 1, len(escalation) - 1)
        duration_minutes = escalation[idx]

        try:
            await member.timeout(
                timedelta(minutes=duration_minutes),
                reason=f"BCS auto-mute: drama score {score:.2f}",
            )
        except discord.Forbidden:
            logger.warning("Cannot timeout %s — role hierarchy issue.", member)
            return
        except discord.HTTPException as e:
            logger.error("Failed to timeout %s: %s", member, e)
            return

        # Build embed
        messages_config = self.bot.config.get("messages", {})
        cat_str = ", ".join(c for c in categories if c != "none") or "general negativity"

        embed = discord.Embed(
            title=messages_config.get("mute_title", "BREEHAVIOR ALERT"),
            description=messages_config.get("mute_description", "").format(
                username=member.display_name,
                duration=f"{duration_minutes} minutes",
                score=f"{score:.2f}",
                categories=cat_str,
            ),
            color=discord.Color.red(),
        )
        embed.set_footer(
            text=f"Offense #{offense_num} | Timeout: {duration_minutes}m"
        )

        # The timeout is already applied; a failed announcement must not
        # prevent the action from being logged and persisted below.
        await self._send_best_effort(message.channel, embed=embed)
        await self._log_action(
            message.guild,
            f"**MUTE** | {member.mention} | Score: {score:.2f} | "
            f"Duration: {duration_minutes}m | Offense #{offense_num} | "
            f"Categories: {cat_str}",
        )
        logger.info(
            "Muted %s for %d minutes (offense #%d, score %.2f)",
            member, duration_minutes, offense_num, score,
        )

        # Persist mute action and updated user state (fire-and-forget)
        self._record_action(
            message,
            "mute",
            db_message_id,
            f"duration={duration_minutes}m offense={offense_num} score={score:.2f} categories={cat_str}",
        )
        self._save_user_state(member.id)

    async def _warn_user(self, message: discord.Message, score: float, db_message_id: int | None = None):
        """Warn the author of *message* unless they are still inside the warning cooldown."""
        timeout_config = self.bot.config.get("timeouts", {})
        cooldown = timeout_config.get("warning_cooldown_minutes", 5)

        if not self.bot.drama_tracker.can_warn(message.author.id, cooldown):
            return

        self.bot.drama_tracker.record_warning(message.author.id)

        # React with warning emoji
        try:
            await message.add_reaction("\u26a0\ufe0f")
        except discord.HTTPException as exc:
            logger.debug("Could not add warning reaction: %s", exc)

        # Send warning message
        messages_config = self.bot.config.get("messages", {})
        warning_text = messages_config.get(
            "warning",
            "Easy there, {username}. The Breehavior Monitor is watching.",
        ).format(username=message.author.display_name)

        await self._send_best_effort(message.channel, warning_text)

        await self._log_action(
            message.guild,
            f"**WARNING** | {message.author.mention} | Score: {score:.2f}",
        )

        logger.info("Warned %s (score %.2f)", message.author, score)

        # Persist warning action (fire-and-forget)
        self._record_action(message, "warning", db_message_id, f"score={score:.2f}")
        # Persist warned flag immediately so it survives restarts
        self._save_user_state(message.author.id)

    async def _handle_topic_drift(
        self,
        message: discord.Message,
        topic_category: str,
        topic_reasoning: str,
        db_message_id: int | None = None,
    ):
        """Remind, nudge, or escalate to the server owner for an off-topic author."""
        config = self.bot.config.get("topic_drift", {})
        if not config.get("enabled", True):
            return

        # Skip channels excluded from topic drift monitoring
        ignored = config.get("ignored_channels", [])
        if message.channel.id in ignored or getattr(message.channel, "name", "") in ignored:
            return

        # In dry-run mode nothing below runs: no counting and no messages
        dry_run = self.bot.config.get("monitoring", {}).get("dry_run", False)
        if dry_run:
            return

        tracker = self.bot.drama_tracker
        user_id = message.author.id
        cooldown = config.get("remind_cooldown_minutes", 10)

        if not tracker.can_topic_remind(user_id, cooldown):
            return

        count = tracker.record_off_topic(user_id)
        escalation_threshold = config.get("escalation_count", 3)
        messages_config = self.bot.config.get("messages", {})

        if count >= escalation_threshold and not tracker.was_owner_notified(user_id):
            # DM the server owner
            tracker.mark_owner_notified(user_id)
            owner = message.guild.owner
            if owner:
                dm_text = messages_config.get(
                    "topic_owner_dm",
                    "Heads up: {username} keeps going off-topic in #{channel}. Reminded {count} times.",
                ).format(
                    username=message.author.display_name,
                    channel=message.channel.name,
                    count=count,
                )
                try:
                    await owner.send(dm_text)
                except discord.HTTPException:
                    logger.warning("Could not DM server owner about topic drift.")

            await self._log_action(
                message.guild,
                f"**TOPIC DRIFT — OWNER NOTIFIED** | {message.author.mention} | "
                f"Off-topic count: {count} | Category: {topic_category}",
            )
            logger.info("Notified owner about %s topic drift (count %d)", message.author, count)

            self._record_action(
                message,
                "topic_escalation",
                db_message_id,
                f"off_topic_count={count} category={topic_category}",
            )
            self._save_user_state(user_id)

        elif count >= 2:
            # Firmer nudge
            nudge_text = messages_config.get(
                "topic_nudge",
                "{username}, let's keep it to gaming talk in here.",
            ).format(username=message.author.display_name)
            await self._send_best_effort(message.channel, nudge_text)
            await self._log_action(
                message.guild,
                f"**TOPIC NUDGE** | {message.author.mention} | "
                f"Off-topic count: {count} | Category: {topic_category}",
            )
            logger.info("Topic nudge for %s (count %d)", message.author, count)

            self._record_action(
                message,
                "topic_nudge",
                db_message_id,
                f"off_topic_count={count} category={topic_category}",
            )
            self._save_user_state(user_id)

        else:
            # Friendly first reminder
            remind_text = messages_config.get(
                "topic_remind",
                "Hey {username}, this is a gaming server — maybe take the personal stuff to DMs?",
            ).format(username=message.author.display_name)
            await self._send_best_effort(message.channel, remind_text)
            await self._log_action(
                message.guild,
                f"**TOPIC REMIND** | {message.author.mention} | "
                f"Category: {topic_category} | {topic_reasoning}",
            )
            logger.info("Topic remind for %s (count %d)", message.author, count)

            self._record_action(
                message,
                "topic_remind",
                db_message_id,
                f"off_topic_count={count} category={topic_category} reasoning={topic_reasoning}",
            )
            self._save_user_state(user_id)

    async def _handle_coherence_alert(
        self,
        message: discord.Message,
        degradation: dict,
        coherence_config: dict,
        db_message_id: int | None = None,
    ):
        """Post the configured coherence alert for *message*'s author and record it."""
        flag = degradation["flag"]
        messages_map = coherence_config.get("messages", {})
        alert_text = messages_map.get(flag, messages_map.get(
            "default", "You okay there, {username}? That message was... something."
        )).format(username=message.author.display_name)

        await self._send_best_effort(message.channel, alert_text)
        await self._log_action(
            message.guild,
            f"**COHERENCE ALERT** | {message.author.mention} | "
            f"Score: {degradation['current']:.2f} | Baseline: {degradation['baseline']:.2f} | "
            f"Drop: {degradation['drop']:.2f} | Flag: {flag}",
        )
        logger.info(
            "Coherence alert for %s: score=%.2f baseline=%.2f drop=%.2f flag=%s",
            message.author, degradation["current"], degradation["baseline"],
            degradation["drop"], flag,
        )

        self._record_action(
            message,
            "coherence_alert",
            db_message_id,
            f"score={degradation['current']:.2f} baseline={degradation['baseline']:.2f} drop={degradation['drop']:.2f} flag={flag}",
        )
        self._save_user_state(message.author.id)

    # -- State persistence --

    def _user_state_kwargs(self, user_id: int) -> dict:
        """Snapshot the tracker's current state for *user_id* as save_user_state kwargs."""
        user_data = self.bot.drama_tracker.get_user(user_id)
        return {
            "user_id": user_id,
            "offense_count": user_data.offense_count,
            "immune": user_data.immune,
            "off_topic_count": user_data.off_topic_count,
            "baseline_coherence": user_data.baseline_coherence,
            "user_notes": user_data.notes or None,
            "warned": user_data.warned_since_reset,
            "last_offense_at": user_data.last_offense_time or None,
        }

    def _save_user_state(self, user_id: int) -> None:
        """Fire-and-forget save of a user's current state to DB."""
        self._spawn(self.bot.db.save_user_state(**self._user_state_kwargs(user_id)))
        self._dirty_users.discard(user_id)

    @tasks.loop(seconds=STATE_FLUSH_INTERVAL)
    async def _flush_states(self):
        await self._flush_dirty_states()

    @_flush_states.before_loop
    async def _before_flush(self):
        await self.bot.wait_until_ready()

    async def _flush_dirty_states(self) -> None:
        """Save all dirty user states to DB.

        A user whose save fails is marked dirty again so the next flush
        retries it; the failure is logged rather than raised.
        """
        if not self._dirty_users:
            return
        dirty = list(self._dirty_users)
        self._dirty_users.clear()
        saved = 0
        for user_id in dirty:
            try:
                await self.bot.db.save_user_state(**self._user_state_kwargs(user_id))
            except Exception:
                # Boundary handler: letting this escape would end the periodic
                # flush loop and drop the state of every remaining user.
                logger.exception("Failed to flush state for user %d; will retry.", user_id)
                self._dirty_users.add(user_id)
            else:
                saved += 1
        logger.info("Flushed %d dirty user states to DB.", saved)

    # -- Channel context / redirects --

    def _build_channel_context(self, message: discord.Message, game_channels: dict) -> str:
        """Build a channel context string for LLM game detection."""
        if not game_channels:
            return ""
        channel_name = getattr(message.channel, "name", "")
        current_game = game_channels.get(channel_name)
        lines = []
        if current_game:
            lines.append(f"Current channel: #{channel_name} ({current_game})")
        else:
            lines.append(f"Current channel: #{channel_name}")
        channel_list = ", ".join(f"#{ch} ({game})" for ch, game in game_channels.items())
        lines.append(f"Game channels: {channel_list}")
        return "\n".join(lines)

    async def _handle_channel_redirect(
        self,
        message: discord.Message,
        detected_game: str,
        game_channels: dict,
        db_message_id: int | None = None,
    ):
        """Send a redirect message if the user is talking about a different game."""
        channel_name = getattr(message.channel, "name", "")

        # Only redirect if message is in a game channel
        if channel_name not in game_channels:
            return

        # No redirect needed if detected game matches current channel
        if detected_game == channel_name:
            return

        # Detected game must be a valid game channel
        if detected_game not in game_channels:
            return

        # Find the target channel in the guild
        target_channel = discord.utils.get(
            message.guild.text_channels, name=detected_game
        )
        if not target_channel:
            return

        # Check per-user cooldown (reuse topic_drift remind_cooldown_minutes)
        user_id = message.author.id
        cooldown_minutes = self.bot.config.get("topic_drift", {}).get("remind_cooldown_minutes", 10)
        now = datetime.now(timezone.utc)
        last_redirect = self._redirect_cooldowns.get(user_id)
        if last_redirect and (now - last_redirect) < timedelta(minutes=cooldown_minutes):
            return

        self._redirect_cooldowns[user_id] = now

        # Send redirect message
        messages_config = self.bot.config.get("messages", {})
        game_name = game_channels[detected_game]
        redirect_text = messages_config.get(
            "channel_redirect",
            "Hey {username}, that sounds like {game} talk — head over to {channel} for that!",
        ).format(
            username=message.author.display_name,
            game=game_name,
            channel=target_channel.mention,
        )

        await self._send_best_effort(message.channel, redirect_text)

        await self._log_action(
            message.guild,
            f"**CHANNEL REDIRECT** | {message.author.mention} | "
            f"#{channel_name} → #{detected_game} ({game_name})",
        )
        logger.info(
            "Redirected %s from #%s to #%s (%s)",
            message.author, channel_name, detected_game, game_name,
        )

        self._record_action(
            message,
            "channel_redirect",
            db_message_id,
            f"from=#{channel_name} to=#{detected_game} game={game_name}",
        )

    # -- #bcs-log output --

    async def _log_analysis(
        self,
        message: discord.Message,
        score: float,
        drama_score: float,
        categories: list[str],
        reasoning: str,
        off_topic: bool,
        topic_category: str,
    ):
        """Post an analysis embed to the guild's #bcs-log channel, if one exists."""
        log_channel = discord.utils.get(
            message.guild.text_channels, name="bcs-log"
        )
        if not log_channel:
            return

        # Only log notable messages (score > 0.1) to avoid spam
        if score <= 0.1:
            return

        cat_str = ", ".join(c for c in categories if c != "none") or "none"
        embed = discord.Embed(
            title=f"Analysis: {message.author.display_name}",
            description=f"#{message.channel.name}: {message.content[:200]}",
            color=self._score_color(score),
        )
        embed.add_field(name="Message Score", value=f"{score:.2f}", inline=True)
        embed.add_field(name="Rolling Drama", value=f"{drama_score:.2f}", inline=True)
        embed.add_field(name="Categories", value=cat_str, inline=True)
        embed.add_field(name="Reasoning", value=reasoning[:1024] or "n/a", inline=False)

        await self._send_best_effort(log_channel, embed=embed)

    @staticmethod
    def _score_color(score: float) -> discord.Color:
        """Map a score to an embed color: red >= 0.75, orange >= 0.6, yellow >= 0.3, else green."""
        if score >= 0.75:
            return discord.Color.red()
        if score >= 0.6:
            return discord.Color.orange()
        if score >= 0.3:
            return discord.Color.yellow()
        return discord.Color.green()

    async def _log_action(self, guild: discord.Guild, text: str):
        """Post *text* to the guild's #bcs-log channel, if one exists."""
        log_channel = discord.utils.get(guild.text_channels, name="bcs-log")
        if log_channel:
            await self._send_best_effort(log_channel, text)


async def setup(bot: commands.Bot):
    """Extension entry point: register the cog on *bot*."""
    await bot.add_cog(SentimentCog(bot))