diff --git a/CLAUDE.md b/CLAUDE.md index 83c86ea..bac3d92 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,47 +1,95 @@ -# Breehavior Monitor +# CLAUDE.md -Discord bot for monitoring chat toxicity, topic drift, coherence, and game channel routing. Runs as a Docker container on `barge.lan`. +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +Breehavior Monitor (BCS) — a Python Discord bot that uses LLM-powered analysis to monitor chat toxicity, topic drift, coherence degradation, and game channel routing. It runs as a Docker container on `barge.lan`. + +## Development Commands + +```bash +# Local dev (requires .env with DISCORD_BOT_TOKEN, DB_CONNECTION_STRING, LLM vars) +python bot.py + +# Local dev with Docker (bot + MSSQL) +docker compose up --build + +# View logs +docker logs bcs-bot --tail 50 +``` + +There are no tests or linting configured. ## Deployment -Production runs at `barge.lan:/mnt/docker/breehavior-monitor/`. +Production runs at `barge.lan:/mnt/docker/breehavior-monitor/`. Image hosted on Gitea registry. -The Docker image is hosted on the Gitea registry at `git.thecozycat.net/aj/breehavior-monitor:latest`. The `config.yaml` is volume-mounted (not baked into the image). +```bash +# Full deploy (code + config) +git push origin master +docker build -t git.thecozycat.net/aj/breehavior-monitor:latest . +docker push git.thecozycat.net/aj/breehavior-monitor:latest +scp config.yaml aj@barge.lan:/mnt/docker/breehavior-monitor/config.yaml +ssh aj@barge.lan "cd /mnt/docker/breehavior-monitor && docker compose pull && docker compose up -d" -### Steps - -1. **Push code** to Gitea (origin): - ```bash - git push origin master - ``` - -2. **Build and push** the Docker image: - ```bash - docker build -t git.thecozycat.net/aj/breehavior-monitor:latest . - docker push git.thecozycat.net/aj/breehavior-monitor:latest - ``` - -3. **Copy config.yaml** to barge (it's volume-mounted, not in the image): - ```bash - scp config.yaml aj@barge.lan:/mnt/docker/breehavior-monitor/config.yaml - ``` - -4. **Pull and restart** on barge: - ```bash - ssh aj@barge.lan "cd /mnt/docker/breehavior-monitor && docker compose pull && docker compose up -d" - ``` - -5. **Verify** the bot started: - ```bash - ssh aj@barge.lan "docker logs bcs-bot --tail 10" - ``` - -### Config-only changes - -If only `config.yaml` changed (no code changes), skip steps 1-2 and just do steps 3-4. The config is mounted as a volume so a container restart picks it up. +# Config-only deploy (no code changes) +scp config.yaml aj@barge.lan:/mnt/docker/breehavior-monitor/config.yaml +ssh aj@barge.lan "cd /mnt/docker/breehavior-monitor && docker compose restart bcs-bot" +``` ## Architecture -- **LLM backend**: llama.cpp on `athena.lan:11434` (text mode must be active) -- **Database**: MSSQL Express in a separate container (`bcs-mssql`) on barge — only used for local dev. Production DB is already running on barge. -- **Prompts**: `prompts/*.txt` — loaded at import time, so code changes require a container rebuild +### LLM Tier System + +The bot uses three LLM client instances (`LLMClient` wrapping OpenAI-compatible API): + +- **`bot.llm` (triage)**: Cheap local model on athena.lan for first-pass sentiment analysis. Configured via `LLM_BASE_URL`, `LLM_MODEL`. +- **`bot.llm_heavy` (escalation)**: More capable model for re-analysis when triage scores above `escalation_threshold` (0.25), admin commands (`/bcs-scan`, `/bcs-test`). Configured via `LLM_ESCALATION_*` env vars. +- **`bot.llm_chat` (chat/roast)**: Dedicated model for conversational replies and image roasts. Falls back to `llm_heavy` if `LLM_CHAT_MODEL` not set. + +LLM calls use OpenAI tool-calling for structured output (`ANALYSIS_TOOL`, `CONVERSATION_TOOL` in `utils/llm_client.py`). Chat uses streaming. All calls go through a semaphore for concurrency control. + +### Cog Structure + +- **`cogs/sentiment.py` (SentimentCog)**: Core moderation engine. Listens to all messages, debounces per-channel (batches messages within `batch_window_seconds`), runs triage → escalation analysis, issues warnings/mutes. Also handles mention-triggered conversation scans and game channel redirects. Flushes dirty user states to DB every 5 minutes. +- **`cogs/chat.py` (ChatCog)**: Conversational AI. Responds to @mentions, replies to bot messages, proactive replies based on mode config. Handles image roasts via vision model. Strips leaked LLM metadata brackets from responses. +- **`cogs/commands.py` (CommandsCog)**: Slash commands — `/dramareport`, `/dramascore`, `/bcs-status`, `/bcs-threshold`, `/bcs-reset`, `/bcs-immune`, `/bcs-history`, `/bcs-scan`, `/bcs-test`, `/bcs-notes`, `/bcs-mode`. +- **`cogs/wordle.py` (WordleCog)**: Watches for Wordle bot messages and generates fun commentary on results. + +### Key Utilities + +- **`utils/drama_tracker.py`**: In-memory per-user state (toxicity entries, offense counts, coherence baselines, LLM notes). Rolling window with time + size pruning. Weighted scoring with post-warning escalation boost. Hydrated from DB on startup. +- **`utils/database.py`**: MSSQL via pyodbc. Schema auto-creates/migrates on init. Per-operation connections (no pool). Tables: `Messages`, `AnalysisResults`, `Actions`, `UserState`, `BotSettings`, `LlmLog`. Gracefully degrades to memory-only mode if DB unavailable. +- **`utils/llm_client.py`**: OpenAI-compatible client. Methods: `analyze_message` (single), `analyze_conversation` (batch/mention scan), `chat` (streaming), `analyze_image` (vision), `raw_analyze` (debug). All calls logged to `LlmLog` table. + +### Mode System + +Modes are defined in `config.yaml` under `modes:` and control personality, moderation level, and proactive reply behavior. Each mode specifies a `prompt_file` from `prompts/`, moderation level (`full` or `relaxed` with custom thresholds), and reply chance. Modes persist across restarts via `BotSettings` table. Changed via `/bcs-mode` command. + +### Moderation Flow + +1. Message arrives → SentimentCog buffers it (debounce per channel) +2. After `batch_window_seconds`, buffered messages analyzed as conversation block +3. Triage model scores each user → if any score >= `escalation_threshold`, re-analyze with heavy model +4. Results feed into DramaTracker rolling window → weighted drama score calculated +5. Warning if score >= threshold AND user hasn't been warned recently +6. Mute (timeout) if score >= mute threshold AND user was already warned (requires warning first) +7. Post-warning escalation: each subsequent high-scoring message adds `escalation_boost` to drama score + +### Prompts + +`prompts/*.txt` are loaded at import time and cached. The analysis system prompt (`analysis.txt`) defines scoring bands and rules. Chat personality prompts are per-mode. Changes to prompt files require container rebuild. + +### Environment Variables + +Key vars in `.env`: `DISCORD_BOT_TOKEN`, `DB_CONNECTION_STRING`, `LLM_BASE_URL`, `LLM_MODEL`, `LLM_API_KEY`, `LLM_ESCALATION_BASE_URL`, `LLM_ESCALATION_MODEL`, `LLM_ESCALATION_API_KEY`, `LLM_CHAT_BASE_URL`, `LLM_CHAT_MODEL`, `LLM_CHAT_API_KEY`, `MSSQL_SA_PASSWORD`. + +### Important Patterns + +- DB operations use `asyncio.to_thread()` wrapping synchronous pyodbc calls +- Fire-and-forget DB writes use `asyncio.create_task()` +- Single-instance guard via TCP port binding (`BCS_LOCK_PORT`, default 39821) +- `config.yaml` is volume-mounted in production, not baked into the image +- Bot uses `network_mode: host` in Docker to reach LAN services +- Models that don't support temperature (reasoning models like o1/o3/o4-mini) are handled via `_NO_TEMPERATURE_MODELS` set diff --git a/cogs/sentiment.py b/cogs/sentiment.py deleted file mode 100644 index 8612f51..0000000 --- a/cogs/sentiment.py +++ /dev/null @@ -1,1083 +0,0 @@ -import asyncio -import logging -from datetime import datetime, timedelta, timezone - - -import discord -from discord.ext import commands, tasks - -logger = logging.getLogger("bcs.sentiment") - -# How often to flush dirty user states to DB (seconds) -STATE_FLUSH_INTERVAL = 300 # 5 minutes - - -class SentimentCog(commands.Cog): - def __init__(self, bot: commands.Bot): - self.bot = bot - # Track which user IDs have unsaved in-memory changes - self._dirty_users: set[int] = set() - # Per-user redirect cooldown: {user_id: last_redirect_datetime} - self._redirect_cooldowns: dict[int, datetime] = {} - # Debounce buffer: keyed by channel_id, stores list of messages from ALL users - self._message_buffer: dict[int, list[discord.Message]] = {} - # Pending debounce timer tasks (per-channel) - self._debounce_tasks: dict[int, asyncio.Task] = {} - # Mention scan tasks (separate from debounce) - self._mention_scan_tasks: dict[int, asyncio.Task] = {} - # Mention scan state - self._mention_scan_cooldowns: dict[int, datetime] = {} # {channel_id: last_scan_time} - self._mention_scan_results: dict[int, str] = {} # {trigger_message_id: findings_summary} - self._analyzed_message_ids: set[int] = set() # Discord message IDs already analyzed - self._max_analyzed_ids = 500 - - - async def cog_load(self): - self._flush_states.start() - - async def cog_unload(self): - self._flush_states.cancel() - # Cancel all pending debounce timers and process remaining buffers - for task in self._debounce_tasks.values(): - task.cancel() - self._debounce_tasks.clear() - for task in self._mention_scan_tasks.values(): - task.cancel() - self._mention_scan_tasks.clear() - for channel_id in list(self._message_buffer): - await self._process_buffered(channel_id) - # Final flush on shutdown - await self._flush_dirty_states() - - @commands.Cog.listener() - async def on_message(self, message: discord.Message): - logger.info("MSG from %s in #%s: %s", message.author, getattr(message.channel, 'name', 'DM'), message.content[:80] if message.content else "(empty)") - - # Ignore bots (including ourselves) - if message.author.bot: - return - - # Ignore DMs - if not message.guild: - return - - config = self.bot.config - monitoring = config.get("monitoring", {}) - - if not monitoring.get("enabled", True): - return - - # Check if channel is monitored - monitored_channels = monitoring.get("channels", []) - if monitored_channels and message.channel.id not in monitored_channels: - return - - # Check ignored users - if message.author.id in monitoring.get("ignored_users", []): - return - - # Check immune roles - immune_roles = set(monitoring.get("immune_roles", [])) - if immune_roles and any( - r.id in immune_roles for r in message.author.roles - ): - return - - # Check per-user immunity - if self.bot.drama_tracker.is_immune(message.author.id): - return - - # Explicit @mention of the bot triggers a mention scan instead of scoring. - # Reply-pings (Discord auto-adds replied-to user to mentions) should NOT - # trigger scans — and reply-to-bot messages should still be scored normally - # so toxic replies to bot warnings aren't silently skipped. - bot_mentioned_in_text = ( - f"<@{self.bot.user.id}>" in (message.content or "") - or f"<@!{self.bot.user.id}>" in (message.content or "") - ) - if bot_mentioned_in_text: - mention_config = config.get("mention_scan", {}) - if mention_config.get("enabled", True): - await self._maybe_start_mention_scan(message, mention_config) - return - - # Skip if empty - if not message.content or not message.content.strip(): - return - - # Buffer the message and start/reset debounce timer (per-channel) - channel_id = message.channel.id - if channel_id not in self._message_buffer: - self._message_buffer[channel_id] = [] - self._message_buffer[channel_id].append(message) - - # Cancel existing debounce timer for this channel - existing_task = self._debounce_tasks.get(channel_id) - if existing_task and not existing_task.done(): - existing_task.cancel() - - batch_window = config.get("sentiment", {}).get("batch_window_seconds", 3) - - self._debounce_tasks[channel_id] = asyncio.create_task( - self._debounce_then_process(channel_id, batch_window) - ) - - async def _debounce_then_process(self, channel_id: int, delay: float): - """Sleep for the debounce window, then process the buffered messages.""" - try: - await asyncio.sleep(delay) - await self._process_buffered(channel_id) - except asyncio.CancelledError: - pass # Timer was reset by a new message — expected - - async def _process_buffered(self, channel_id: int): - """Collect buffered messages, build conversation block, and run analysis.""" - messages = self._message_buffer.pop(channel_id, []) - self._debounce_tasks.pop(channel_id, None) - - if not messages: - return - - # Use the last message as reference for channel/guild - ref_message = messages[-1] - channel = ref_message.channel - - config = self.bot.config - sentiment_config = config.get("sentiment", {}) - game_channels = config.get("game_channels", {}) - - # Fetch some history before the buffered messages for leading context - context_count = sentiment_config.get("context_messages", 8) - oldest_buffered = messages[0] - history_messages: list[discord.Message] = [] - try: - async for msg in channel.history(limit=context_count + 5, before=oldest_buffered): - if msg.author.bot: - continue - if not msg.content or not msg.content.strip(): - continue - history_messages.append(msg) - if len(history_messages) >= context_count: - break - except discord.HTTPException: - pass - - history_messages.reverse() # chronological order - - # Combine: history (context) + buffered (new messages to analyze) - new_message_start = len(history_messages) - all_messages = history_messages + messages - - # Build msg_id_to_author lookup for reply resolution - msg_id_to_author: dict[int, str] = { - m.id: m.author.display_name for m in all_messages - } - - # Convert to conversation tuples: (username, content, timestamp, reply_to_username) - conversation: list[tuple[str, str, datetime, str | None]] = [] - for msg in all_messages: - reply_to = None - if msg.reference and msg.reference.message_id: - reply_to = msg_id_to_author.get(msg.reference.message_id) - if not reply_to: - ref = msg.reference.cached_message - if ref: - reply_to = ref.author.display_name - conversation.append(( - msg.author.display_name, - msg.content, - msg.created_at, - reply_to, - )) - - if not conversation: - return - - # Build user notes map (only for users in the buffer, not history-only) - user_notes_map: dict[str, str] = {} - for msg in messages: - name = msg.author.display_name - if name not in user_notes_map: - notes = self.bot.drama_tracker.get_user_notes(msg.author.id) - if notes: - user_notes_map[name] = notes - - channel_context = self._build_channel_context(ref_message, game_channels) - - logger.info( - "Channel analysis: %d new messages (+%d context) in #%s", - len(messages), len(history_messages), - getattr(channel, 'name', 'unknown'), - ) - - # TRIAGE: Lightweight model — conversation-level analysis - result = await self.bot.llm.analyze_conversation( - conversation, - channel_context=channel_context, - user_notes_map=user_notes_map, - new_message_start=new_message_start, - ) - - if result is None: - return - - # ESCALATION: Re-analyze with heavy model if any finding warrants it - escalation_threshold = sentiment_config.get("escalation_threshold", 0.25) - needs_escalation = any( - f["toxicity_score"] >= escalation_threshold - or f.get("off_topic", False) - or f.get("coherence_score", 1.0) < 0.6 - for f in result.get("user_findings", []) - ) - if needs_escalation: - heavy_result = await self.bot.llm_heavy.analyze_conversation( - conversation, - channel_context=channel_context, - user_notes_map=user_notes_map, - new_message_start=new_message_start, - ) - if heavy_result is not None: - logger.info( - "Escalated to heavy model for #%s", - getattr(channel, 'name', 'unknown'), - ) - result = heavy_result - - # Build username -> (user_id, ref_msg, [messages]) for buffered users only - user_lookup: dict[str, tuple[int, discord.Message, list[discord.Message]]] = {} - for msg in messages: - name = msg.author.display_name - if name not in user_lookup: - user_lookup[name] = (msg.author.id, msg, []) - user_lookup[name][2].append(msg) - - # Mark all buffered messages as analyzed (for mention scan dedup) - for m in messages: - self._mark_analyzed(m.id) - - # Resolve thresholds once - dry_run = config.get("monitoring", {}).get("dry_run", False) - mode_config = self.bot.get_mode_config() - moderation_level = mode_config.get("moderation", "full") - if moderation_level == "relaxed" and "relaxed_thresholds" in mode_config: - rt = mode_config["relaxed_thresholds"] - warning_threshold = rt.get("warning_threshold", 0.80) - base_mute_threshold = rt.get("mute_threshold", 0.85) - spike_warn = rt.get("spike_warning_threshold", 0.70) - spike_mute = rt.get("spike_mute_threshold", 0.85) - else: - warning_threshold = sentiment_config.get("warning_threshold", 0.6) - base_mute_threshold = sentiment_config.get("mute_threshold", 0.75) - spike_warn = sentiment_config.get("spike_warning_threshold", 0.5) - spike_mute = sentiment_config.get("spike_mute_threshold", 0.8) - coherence_config = config.get("coherence", {}) - - # Process per-user findings - for finding in result.get("user_findings", []): - username = finding["username"] - lookup = user_lookup.get(username) - if not lookup: - # LLM returned a finding for a history-only user or unknown name; skip - continue - - user_id, user_ref_msg, user_msgs = lookup - score = finding["toxicity_score"] - categories = finding["categories"] - reasoning = finding["reasoning"] - off_topic = finding.get("off_topic", False) - topic_category = finding.get("topic_category", "general_chat") - topic_reasoning = finding.get("topic_reasoning", "") - coherence_score = finding.get("coherence_score", 0.85) - coherence_flag = finding.get("coherence_flag", "normal") - note_update = finding.get("note_update") - detected_game = finding.get("detected_game") - - # Track the result in DramaTracker - self.bot.drama_tracker.add_entry(user_id, score, categories, reasoning) - escalation_boost = sentiment_config.get("escalation_boost", 0.04) - drama_score = self.bot.drama_tracker.get_drama_score(user_id, escalation_boost=escalation_boost) - - logger.info( - "User %s (%d) | msg_score=%.2f | drama_score=%.2f | categories=%s | %s", - username, user_id, score, drama_score, categories, reasoning, - ) - - # Save message + analysis to DB - combined_content = "\n".join( - m.content for m in user_msgs if m.content and m.content.strip() - ) - db_message_id = await self.bot.db.save_message_and_analysis( - guild_id=user_ref_msg.guild.id, - channel_id=user_ref_msg.channel.id, - user_id=user_id, - username=username, - content=combined_content[:4000], - message_ts=user_ref_msg.created_at.replace(tzinfo=timezone.utc), - toxicity_score=score, - drama_score=drama_score, - categories=categories, - reasoning=reasoning, - off_topic=off_topic, - topic_category=topic_category, - topic_reasoning=topic_reasoning, - coherence_score=coherence_score, - coherence_flag=coherence_flag, - ) - - # Topic drift handling - if off_topic: - await self._handle_topic_drift(user_ref_msg, topic_category, topic_reasoning, db_message_id) - - # Game channel redirect detection - if detected_game and game_channels and not dry_run: - await self._handle_channel_redirect(user_ref_msg, detected_game, game_channels, db_message_id) - - # Coherence / intoxication detection - if coherence_config.get("enabled", True): - degradation = self.bot.drama_tracker.update_coherence( - user_id=user_id, - score=coherence_score, - flag=coherence_flag, - drop_threshold=coherence_config.get("drop_threshold", 0.3), - absolute_floor=coherence_config.get("absolute_floor", 0.5), - cooldown_minutes=coherence_config.get("cooldown_minutes", 30), - ) - if degradation and not dry_run: - await self._handle_coherence_alert(user_ref_msg, degradation, coherence_config, db_message_id) - - # Capture LLM note updates about this user - if note_update: - self.bot.drama_tracker.update_user_notes(user_id, note_update) - self._dirty_users.add(user_id) - - # Mark dirty for coherence baseline drift even without actions - self._dirty_users.add(user_id) - - # Always log analysis to #bcs-log if it exists - await self._log_analysis(user_ref_msg, score, drama_score, categories, reasoning, off_topic, topic_category) - - # Moderation actions (skip in dry-run mode) - if not dry_run: - mute_threshold = self.bot.drama_tracker.get_mute_threshold( - user_id, base_mute_threshold - ) - user_data = self.bot.drama_tracker.get_user(user_id) - # Mute: rolling average OR single message spike - if drama_score >= mute_threshold or score >= spike_mute: - effective_score = max(drama_score, score) - if user_data.warned_since_reset: - await self._mute_user(user_ref_msg, effective_score, categories, db_message_id) - else: - # Downgrade to warning — require a warning before muting - logger.info("Downgrading mute to warning for %s (no prior warning)", user_ref_msg.author) - await self._warn_user(user_ref_msg, effective_score, db_message_id) - # Warn: rolling average OR single message spike - elif drama_score >= warning_threshold or score >= spike_warn: - effective_score = max(drama_score, score) - await self._warn_user(user_ref_msg, effective_score, db_message_id) - - # -- Mention scan methods -- - - def _mark_analyzed(self, discord_message_id: int): - """Track a Discord message ID as already analyzed.""" - self._analyzed_message_ids.add(discord_message_id) - if len(self._analyzed_message_ids) > self._max_analyzed_ids: - sorted_ids = sorted(self._analyzed_message_ids) - self._analyzed_message_ids = set(sorted_ids[len(sorted_ids) // 2:]) - - async def _maybe_start_mention_scan( - self, trigger_message: discord.Message, mention_config: dict - ): - """Check cooldown and kick off a mention-triggered scan of recent messages.""" - channel_id = trigger_message.channel.id - cooldown_seconds = mention_config.get("cooldown_seconds", 60) - - now = datetime.now(timezone.utc) - last_scan = self._mention_scan_cooldowns.get(channel_id) - if last_scan and (now - last_scan).total_seconds() < cooldown_seconds: - logger.info( - "Mention scan cooldown active for #%s, skipping.", - getattr(trigger_message.channel, "name", "unknown"), - ) - return - - self._mention_scan_cooldowns[channel_id] = now - - # Extract the user's concern (strip the bot ping from the message) - mention_text = trigger_message.content - for fmt in (f"<@{self.bot.user.id}>", f"<@!{self.bot.user.id}>"): - mention_text = mention_text.replace(fmt, "") - mention_text = mention_text.strip() or "(user pinged bot without specific concern)" - - # Store as a mention scan task (separate from debounce) - existing_task = self._mention_scan_tasks.get(channel_id) - if existing_task and not existing_task.done(): - existing_task.cancel() - - self._mention_scan_tasks[channel_id] = asyncio.create_task( - self._run_mention_scan(trigger_message, mention_text, mention_config) - ) - - async def _run_mention_scan( - self, - trigger_message: discord.Message, - mention_text: str, - mention_config: dict, - ): - """Scan recent channel messages with ONE conversation-level LLM call.""" - channel = trigger_message.channel - scan_count = mention_config.get("scan_messages", 30) - - config = self.bot.config - sentiment_config = config.get("sentiment", {}) - game_channels = config.get("game_channels", {}) - - # Fetch recent messages (before the trigger, skip bots/empty) - raw_messages: list[discord.Message] = [] - try: - async for msg in channel.history(limit=scan_count + 10, before=trigger_message): - if msg.author.bot: - continue - if not msg.content or not msg.content.strip(): - continue - raw_messages.append(msg) - if len(raw_messages) >= scan_count: - break - except discord.HTTPException: - logger.warning("Failed to fetch history for mention scan in #%s", - getattr(channel, "name", "unknown")) - return - - raw_messages.reverse() # chronological order - - if not raw_messages: - self._mention_scan_results[trigger_message.id] = "No recent messages found to analyze." - return - - logger.info( - "Mention scan triggered by %s in #%s: %d messages (single LLM call). Focus: %s", - trigger_message.author.display_name, - getattr(channel, "name", "unknown"), - len(raw_messages), - mention_text[:80], - ) - - # Build a lookup of message IDs to authors for resolving replies - msg_id_to_author: dict[int, str] = { - m.id: m.author.display_name for m in raw_messages - } - - # Convert to conversation tuples: (username, content, timestamp, reply_to) - conversation: list[tuple[str, str, datetime, str | None]] = [] - for msg in raw_messages: - reply_to = None - if msg.reference and msg.reference.message_id: - # Try our local lookup first - reply_to = msg_id_to_author.get(msg.reference.message_id) - if not reply_to: - # Try the cached message - ref = msg.reference.cached_message - if ref: - reply_to = ref.author.display_name - conversation.append(( - msg.author.display_name, - msg.content, - msg.created_at, - reply_to, - )) - - # Build user notes map - user_notes_map: dict[str, str] = {} - for msg in raw_messages: - name = msg.author.display_name - if name not in user_notes_map: - notes = self.bot.drama_tracker.get_user_notes(msg.author.id) - if notes: - user_notes_map[name] = notes - - channel_context = self._build_channel_context(raw_messages[0], game_channels) - mention_context = ( - f"A user flagged this conversation and said: \"{mention_text}\"\n" - f"Pay special attention to whether this concern is valid." - ) - - # Single LLM call - result = await self.bot.llm.analyze_conversation( - conversation, - mention_context=mention_context, - channel_context=channel_context, - user_notes_map=user_notes_map, - ) - - if result is None: - logger.warning("Conversation analysis failed for mention scan.") - self._mention_scan_results[trigger_message.id] = "Analysis failed." - return - - # Build username → user_id + ref_message mapping - user_lookup: dict[str, tuple[int, discord.Message, list[discord.Message]]] = {} - for msg in raw_messages: - name = msg.author.display_name - if name not in user_lookup: - user_lookup[name] = (msg.author.id, msg, []) - user_lookup[name][2].append(msg) - - findings: list[str] = [] - - # Resolve thresholds once (outside the loop) - dry_run = config.get("monitoring", {}).get("dry_run", False) - mode_config = self.bot.get_mode_config() - moderation_level = mode_config.get("moderation", "full") - if moderation_level == "relaxed" and "relaxed_thresholds" in mode_config: - rt = mode_config["relaxed_thresholds"] - warning_threshold = rt.get("warning_threshold", 0.80) - base_mute_threshold = rt.get("mute_threshold", 0.85) - spike_warn = rt.get("spike_warning_threshold", 0.70) - spike_mute = rt.get("spike_mute_threshold", 0.85) - else: - warning_threshold = sentiment_config.get("warning_threshold", 0.6) - base_mute_threshold = sentiment_config.get("mute_threshold", 0.75) - spike_warn = sentiment_config.get("spike_warning_threshold", 0.5) - spike_mute = sentiment_config.get("spike_mute_threshold", 0.8) - - for finding in result.get("user_findings", []): - username = finding["username"] - score = finding["toxicity_score"] - categories = finding["categories"] - reasoning = finding["reasoning"] - worst_msg = finding.get("worst_message") - off_topic = finding.get("off_topic", False) - note_update = finding.get("note_update") - - lookup = user_lookup.get(username) - if not lookup: - logger.warning("Mention scan: LLM returned unknown user '%s', skipping.", username) - continue - - user_id, ref_msg, user_msgs = lookup - - # Skip if all their messages were already analyzed - if all(m.id in self._analyzed_message_ids for m in user_msgs): - continue - - # Mark their messages as analyzed - for m in user_msgs: - self._mark_analyzed(m.id) - - self.bot.drama_tracker.add_entry(user_id, score, categories, reasoning) - escalation_boost = sentiment_config.get("escalation_boost", 0.04) - drama_score = self.bot.drama_tracker.get_drama_score(user_id, escalation_boost=escalation_boost) - - # Save to DB - content_summary = f"[Mention scan] {worst_msg}" if worst_msg else "[Mention scan] See conversation" - db_message_id = await self.bot.db.save_message_and_analysis( - guild_id=ref_msg.guild.id, - channel_id=ref_msg.channel.id, - user_id=user_id, - username=username, - content=content_summary, - message_ts=ref_msg.created_at.replace(tzinfo=timezone.utc), - toxicity_score=score, - drama_score=drama_score, - categories=categories, - reasoning=reasoning, - off_topic=off_topic, - topic_category="personal_drama" if off_topic else "gaming", - topic_reasoning=reasoning, - coherence_score=None, - coherence_flag=None, - ) - - # Log to #bcs-log - await self._log_analysis( - ref_msg, score, drama_score, categories, reasoning, - off_topic, "personal_drama" if off_topic else "gaming", - ) - - # Collect notable findings for summary - if score >= 0.3: - cat_str = ", ".join(c for c in categories if c != "none") or "none" - findings.append(f"{username}: {score:.2f} ({cat_str})") - - # Update user notes - if note_update: - self.bot.drama_tracker.update_user_notes(user_id, note_update) - self._dirty_users.add(user_id) - - # Moderation actions - if not dry_run: - mute_threshold = self.bot.drama_tracker.get_mute_threshold( - user_id, base_mute_threshold - ) - user_data = self.bot.drama_tracker.get_user(user_id) - if drama_score >= mute_threshold or score >= spike_mute: - effective_score = max(drama_score, score) - if user_data.warned_since_reset: - await self._mute_user(ref_msg, effective_score, categories, db_message_id) - else: - logger.info("Downgrading mute to warning for %s (no prior warning)", ref_msg.author) - await self._warn_user(ref_msg, effective_score, db_message_id) - elif drama_score >= warning_threshold or score >= spike_warn: - effective_score = max(drama_score, score) - await self._warn_user(ref_msg, effective_score, db_message_id) - - # Build summary for ChatCog - convo_summary = result.get("conversation_summary", "") - if findings: - summary = f"Scanned {len(raw_messages)} msgs. {convo_summary} Notable: " + "; ".join(findings[:5]) - else: - summary = f"Scanned {len(raw_messages)} msgs. {convo_summary}" - - # Prune old scan results - if len(self._mention_scan_results) > 20: - oldest = sorted(self._mention_scan_results.keys())[:len(self._mention_scan_results) - 10] - for k in oldest: - del self._mention_scan_results[k] - - self._mention_scan_results[trigger_message.id] = summary - - logger.info( - "Mention scan complete in #%s: 1 LLM call, %d messages, %d users flagged", - getattr(channel, "name", "unknown"), - len(raw_messages), - len(findings), - ) - - async def _mute_user( - self, - message: discord.Message, - score: float, - categories: list[str], - db_message_id: int | None = None, - ): - member = message.author - if not isinstance(member, discord.Member): - return - - # Check bot permissions - if not message.guild.me.guild_permissions.moderate_members: - logger.warning("Missing moderate_members permission, cannot mute.") - return - - # Record offense and get escalating timeout - offense_num = self.bot.drama_tracker.record_offense(member.id) - timeout_config = self.bot.config.get("timeouts", {}) - escalation = timeout_config.get("escalation_minutes", [5, 15, 30, 60]) - idx = min(offense_num - 1, len(escalation) - 1) - duration_minutes = escalation[idx] - - try: - await member.timeout( - timedelta(minutes=duration_minutes), - reason=f"BCS auto-mute: drama score {score:.2f}", - ) - except discord.Forbidden: - logger.warning("Cannot timeout %s — role hierarchy issue.", member) - return - except discord.HTTPException as e: - logger.error("Failed to timeout %s: %s", member, e) - return - - # Build embed - messages_config = self.bot.config.get("messages", {}) - cat_str = ", ".join(c for c in categories if c != "none") or "general negativity" - - embed = discord.Embed( - title=messages_config.get("mute_title", "BREEHAVIOR ALERT"), - description=messages_config.get("mute_description", "").format( - username=member.display_name, - duration=f"{duration_minutes} minutes", - score=f"{score:.2f}", - categories=cat_str, - ), - color=discord.Color.red(), - ) - embed.set_footer( - text=f"Offense #{offense_num} | Timeout: {duration_minutes}m" - ) - - await message.channel.send(embed=embed) - await self._log_action( - message.guild, - f"**MUTE** | {member.mention} | Score: {score:.2f} | " - f"Duration: {duration_minutes}m | Offense #{offense_num} | " - f"Categories: {cat_str}", - ) - - logger.info( - "Muted %s for %d minutes (offense #%d, score %.2f)", - member, - duration_minutes, - offense_num, - score, - ) - - # Persist mute action and updated user state (fire-and-forget) - asyncio.create_task(self.bot.db.save_action( - guild_id=message.guild.id, - user_id=member.id, - username=member.display_name, - action_type="mute", - message_id=db_message_id, - details=f"duration={duration_minutes}m offense={offense_num} score={score:.2f} categories={cat_str}", - )) - self._save_user_state(member.id) - - async def _warn_user(self, message: discord.Message, score: float, db_message_id: int | None = None): - timeout_config = self.bot.config.get("timeouts", {}) - cooldown = timeout_config.get("warning_cooldown_minutes", 5) - - if not self.bot.drama_tracker.can_warn(message.author.id, cooldown): - return - - self.bot.drama_tracker.record_warning(message.author.id) - - # React with warning emoji - try: - await message.add_reaction("\u26a0\ufe0f") - except discord.HTTPException: - pass - - # Send warning message - messages_config = self.bot.config.get("messages", {}) - warning_text = messages_config.get( - "warning", - "Easy there, {username}. The Breehavior Monitor is watching.", - ).format(username=message.author.display_name) - - await message.channel.send(warning_text) - await self._log_action( - message.guild, - f"**WARNING** | {message.author.mention} | Score: {score:.2f}", - ) - - logger.info("Warned %s (score %.2f)", message.author, score) - - # Persist warning action (fire-and-forget) - asyncio.create_task(self.bot.db.save_action( - guild_id=message.guild.id, - user_id=message.author.id, - username=message.author.display_name, - action_type="warning", - message_id=db_message_id, - details=f"score={score:.2f}", - )) - # Persist warned flag immediately so it survives restarts - self._save_user_state(message.author.id) - - async def _handle_topic_drift( - self, message: discord.Message, topic_category: str, topic_reasoning: str, - db_message_id: int | None = None, - ): - config = self.bot.config.get("topic_drift", {}) - if not config.get("enabled", True): - return - - # Skip channels excluded from topic drift monitoring - ignored = config.get("ignored_channels", []) - if message.channel.id in ignored or getattr(message.channel, "name", "") in ignored: - return - - # Check if we're in dry-run mode — still track but don't act - dry_run = self.bot.config.get("monitoring", {}).get("dry_run", False) - if dry_run: - return - - tracker = self.bot.drama_tracker - user_id = message.author.id - cooldown = config.get("remind_cooldown_minutes", 10) - - if not tracker.can_topic_remind(user_id, cooldown): - return - - count = tracker.record_off_topic(user_id) - escalation_threshold = config.get("escalation_count", 3) - messages_config = self.bot.config.get("messages", {}) - - if count >= escalation_threshold and not tracker.was_owner_notified(user_id): - # DM the server owner - tracker.mark_owner_notified(user_id) - owner = message.guild.owner - if owner: - dm_text = messages_config.get( - "topic_owner_dm", - "Heads up: {username} keeps going off-topic in #{channel}. Reminded {count} times.", - ).format( - username=message.author.display_name, - channel=message.channel.name, - count=count, - ) - try: - await owner.send(dm_text) - except discord.HTTPException: - logger.warning("Could not DM server owner about topic drift.") - - await self._log_action( - message.guild, - f"**TOPIC DRIFT — OWNER NOTIFIED** | {message.author.mention} | " - f"Off-topic count: {count} | Category: {topic_category}", - ) - logger.info("Notified owner about %s topic drift (count %d)", message.author, count) - - asyncio.create_task(self.bot.db.save_action( - guild_id=message.guild.id, user_id=user_id, - username=message.author.display_name, - action_type="topic_escalation", message_id=db_message_id, - details=f"off_topic_count={count} category={topic_category}", - )) - self._save_user_state(user_id) - - elif count >= 2: - # Firmer nudge - nudge_text = messages_config.get( - "topic_nudge", - "{username}, let's keep it to gaming talk in here.", - ).format(username=message.author.display_name) - await message.channel.send(nudge_text) - await self._log_action( - message.guild, - f"**TOPIC NUDGE** | {message.author.mention} | " - f"Off-topic count: {count} | Category: {topic_category}", - ) - logger.info("Topic nudge for %s (count %d)", message.author, count) - - asyncio.create_task(self.bot.db.save_action( - guild_id=message.guild.id, user_id=user_id, - username=message.author.display_name, - action_type="topic_nudge", message_id=db_message_id, - details=f"off_topic_count={count} category={topic_category}", - )) - self._save_user_state(user_id) - - else: - # Friendly first reminder - remind_text = messages_config.get( - "topic_remind", - "Hey {username}, this is a gaming server — maybe take the personal stuff to DMs?", - ).format(username=message.author.display_name) - await message.channel.send(remind_text) - await self._log_action( - message.guild, - f"**TOPIC REMIND** | {message.author.mention} | " - f"Category: {topic_category} | {topic_reasoning}", - ) - logger.info("Topic remind for %s (count %d)", message.author, count) - - asyncio.create_task(self.bot.db.save_action( - guild_id=message.guild.id, user_id=user_id, - username=message.author.display_name, - action_type="topic_remind", message_id=db_message_id, - details=f"off_topic_count={count} category={topic_category} reasoning={topic_reasoning}", - )) - self._save_user_state(user_id) - - async def _handle_coherence_alert( - self, message: discord.Message, degradation: dict, coherence_config: dict, - db_message_id: int | None = None, - ): - flag = degradation["flag"] - messages_map = coherence_config.get("messages", {}) - alert_text = messages_map.get(flag, messages_map.get( - "default", "You okay there, {username}? That message was... something." - )).format(username=message.author.display_name) - - await message.channel.send(alert_text) - await self._log_action( - message.guild, - f"**COHERENCE ALERT** | {message.author.mention} | " - f"Score: {degradation['current']:.2f} | Baseline: {degradation['baseline']:.2f} | " - f"Drop: {degradation['drop']:.2f} | Flag: {flag}", - ) - logger.info( - "Coherence alert for %s: score=%.2f baseline=%.2f drop=%.2f flag=%s", - message.author, degradation["current"], degradation["baseline"], - degradation["drop"], flag, - ) - - asyncio.create_task(self.bot.db.save_action( - guild_id=message.guild.id, - user_id=message.author.id, - username=message.author.display_name, - action_type="coherence_alert", - message_id=db_message_id, - details=f"score={degradation['current']:.2f} baseline={degradation['baseline']:.2f} drop={degradation['drop']:.2f} flag={flag}", - )) - self._save_user_state(message.author.id) - - def _save_user_state(self, user_id: int) -> None: - """Fire-and-forget save of a user's current state to DB.""" - user_data = self.bot.drama_tracker.get_user(user_id) - asyncio.create_task(self.bot.db.save_user_state( - user_id=user_id, - offense_count=user_data.offense_count, - immune=user_data.immune, - off_topic_count=user_data.off_topic_count, - baseline_coherence=user_data.baseline_coherence, - user_notes=user_data.notes or None, - warned=user_data.warned_since_reset, - last_offense_at=user_data.last_offense_time or None, - )) - self._dirty_users.discard(user_id) - - @tasks.loop(seconds=STATE_FLUSH_INTERVAL) - async def _flush_states(self): - await self._flush_dirty_states() - - @_flush_states.before_loop - async def _before_flush(self): - await self.bot.wait_until_ready() - - async def _flush_dirty_states(self) -> None: - """Save all dirty user states to DB.""" - if not self._dirty_users: - return - dirty = list(self._dirty_users) - self._dirty_users.clear() - for user_id in dirty: - user_data = self.bot.drama_tracker.get_user(user_id) - await self.bot.db.save_user_state( - user_id=user_id, - offense_count=user_data.offense_count, - immune=user_data.immune, - off_topic_count=user_data.off_topic_count, - baseline_coherence=user_data.baseline_coherence, - user_notes=user_data.notes or None, - warned=user_data.warned_since_reset, - last_offense_at=user_data.last_offense_time or None, - ) - logger.info("Flushed %d dirty user states to DB.", len(dirty)) - - def _build_channel_context(self, message: discord.Message, game_channels: dict) -> str: - """Build a channel context string for LLM game detection.""" - if not game_channels: - return "" - channel_name = getattr(message.channel, "name", "") - current_game = game_channels.get(channel_name) - lines = [] - if current_game: - lines.append(f"Current channel: #{channel_name} ({current_game})") - else: - lines.append(f"Current channel: #{channel_name}") - channel_list = ", ".join(f"#{ch} ({game})" for ch, game in game_channels.items()) - lines.append(f"Game channels: {channel_list}") - return "\n".join(lines) - - async def _handle_channel_redirect( - self, message: discord.Message, detected_game: str, - game_channels: dict, db_message_id: int | None = None, - ): - """Send a redirect message if the user is talking about a different game.""" - channel_name = getattr(message.channel, "name", "") - - # Only redirect if message is in a game channel - if channel_name not in game_channels: - return - - # No redirect needed if detected game matches current channel - if detected_game == channel_name: - return - - # Detected game must be a valid game channel - if detected_game not in game_channels: - return - - # Find the target channel in the guild - target_channel = discord.utils.get( - message.guild.text_channels, name=detected_game - ) - if not target_channel: - return - - # Check per-user cooldown (reuse topic_drift remind_cooldown_minutes) - user_id = message.author.id - cooldown_minutes = self.bot.config.get("topic_drift", {}).get("remind_cooldown_minutes", 10) - now = datetime.now(timezone.utc) - last_redirect = self._redirect_cooldowns.get(user_id) - if last_redirect and (now - last_redirect) < timedelta(minutes=cooldown_minutes): - return - - self._redirect_cooldowns[user_id] = now - - # Send redirect message - messages_config = self.bot.config.get("messages", {}) - game_name = game_channels[detected_game] - redirect_text = messages_config.get( - "channel_redirect", - "Hey {username}, that sounds like {game} talk — head over to {channel} for that!", - ).format( - username=message.author.display_name, - game=game_name, - channel=target_channel.mention, - ) - - await message.channel.send(redirect_text) - - await self._log_action( - message.guild, - f"**CHANNEL REDIRECT** | {message.author.mention} | " - f"#{channel_name} → #{detected_game} ({game_name})", - ) - logger.info( - "Redirected %s from #%s to #%s (%s)", - message.author, channel_name, detected_game, game_name, - ) - - asyncio.create_task(self.bot.db.save_action( - guild_id=message.guild.id, - user_id=user_id, - username=message.author.display_name, - action_type="channel_redirect", - message_id=db_message_id, - details=f"from=#{channel_name} to=#{detected_game} game={game_name}", - )) - - async def _log_analysis( - self, message: discord.Message, score: float, drama_score: float, - categories: list[str], reasoning: str, off_topic: bool, topic_category: str, - ): - log_channel = discord.utils.get( - message.guild.text_channels, name="bcs-log" - ) - if not log_channel: - return - - # Only log notable messages (score > 0.1) to avoid spam - if score <= 0.1: - return - - cat_str = ", ".join(c for c in categories if c != "none") or "none" - embed = discord.Embed( - title=f"Analysis: {message.author.display_name}", - description=f"#{message.channel.name}: {message.content[:200]}", - color=self._score_color(score), - ) - embed.add_field(name="Message Score", value=f"{score:.2f}", inline=True) - embed.add_field(name="Rolling Drama", value=f"{drama_score:.2f}", inline=True) - embed.add_field(name="Categories", value=cat_str, inline=True) - embed.add_field(name="Reasoning", value=reasoning[:1024] or "n/a", inline=False) - try: - await log_channel.send(embed=embed) - except discord.HTTPException: - pass - - @staticmethod - def _score_color(score: float) -> discord.Color: - if score >= 0.75: - return discord.Color.red() - if score >= 0.6: - return discord.Color.orange() - if score >= 0.3: - return discord.Color.yellow() - return discord.Color.green() - - async def _log_action(self, guild: discord.Guild, text: str): - log_channel = discord.utils.get(guild.text_channels, name="bcs-log") - if log_channel: - try: - await log_channel.send(text) - except discord.HTTPException: - pass - - -async def setup(bot: commands.Bot): - await bot.add_cog(SentimentCog(bot)) diff --git a/cogs/sentiment/__init__.py b/cogs/sentiment/__init__.py new file mode 100644 index 0000000..ef45dcf --- /dev/null +++ b/cogs/sentiment/__init__.py @@ -0,0 +1,656 @@ +import asyncio +import logging +from datetime import datetime, timezone + + +import discord +from discord.ext import commands, tasks + +from cogs.sentiment.actions import mute_user, warn_user +from cogs.sentiment.channel_redirect import build_channel_context, handle_channel_redirect +from cogs.sentiment.coherence import handle_coherence_alert +from cogs.sentiment.log_utils import log_analysis +from cogs.sentiment.state import flush_dirty_states +from cogs.sentiment.topic_drift import handle_topic_drift + +logger = logging.getLogger("bcs.sentiment") + +# How often to flush dirty user states to DB (seconds) +STATE_FLUSH_INTERVAL = 300 # 5 minutes + + +class SentimentCog(commands.Cog): + def __init__(self, bot: commands.Bot): + self.bot = bot + # Track which user IDs have unsaved in-memory changes + self._dirty_users: set[int] = set() + # Per-user redirect cooldown: {user_id: last_redirect_datetime} + self._redirect_cooldowns: dict[int, datetime] = {} + # Debounce buffer: keyed by channel_id, stores list of messages from ALL users + self._message_buffer: dict[int, list[discord.Message]] = {} + # Pending debounce timer tasks (per-channel) + self._debounce_tasks: dict[int, asyncio.Task] = {} + # Mention scan tasks (separate from debounce) + self._mention_scan_tasks: dict[int, asyncio.Task] = {} + # Mention scan state + self._mention_scan_cooldowns: dict[int, datetime] = {} # {channel_id: last_scan_time} + self._mention_scan_results: dict[int, str] = {} # {trigger_message_id: findings_summary} + self._analyzed_message_ids: set[int] = set() # Discord message IDs already analyzed + self._max_analyzed_ids = 500 + + + async def cog_load(self): + self._flush_states.start() + + async def cog_unload(self): + self._flush_states.cancel() + # Cancel all pending debounce timers and process remaining buffers + for task in self._debounce_tasks.values(): + task.cancel() + self._debounce_tasks.clear() + for task in self._mention_scan_tasks.values(): + task.cancel() + self._mention_scan_tasks.clear() + for channel_id in list(self._message_buffer): + await self._process_buffered(channel_id) + # Final flush on shutdown + await flush_dirty_states(self.bot, self._dirty_users) + + @commands.Cog.listener() + async def on_message(self, message: discord.Message): + logger.info("MSG from %s in #%s: %s", message.author, getattr(message.channel, 'name', 'DM'), message.content[:80] if message.content else "(empty)") + + # Ignore bots (including ourselves) + if message.author.bot: + return + + # Ignore DMs + if not message.guild: + return + + config = self.bot.config + monitoring = config.get("monitoring", {}) + + if not monitoring.get("enabled", True): + return + + # Check if channel is monitored + monitored_channels = monitoring.get("channels", []) + if monitored_channels and message.channel.id not in monitored_channels: + return + + # Check ignored users + if message.author.id in monitoring.get("ignored_users", []): + return + + # Check immune roles + immune_roles = set(monitoring.get("immune_roles", [])) + if immune_roles and any( + r.id in immune_roles for r in message.author.roles + ): + return + + # Check per-user immunity + if self.bot.drama_tracker.is_immune(message.author.id): + return + + # Explicit @mention of the bot triggers a mention scan instead of scoring. + # Reply-pings (Discord auto-adds replied-to user to mentions) should NOT + # trigger scans — and reply-to-bot messages should still be scored normally + # so toxic replies to bot warnings aren't silently skipped. + bot_mentioned_in_text = ( + f"<@{self.bot.user.id}>" in (message.content or "") + or f"<@!{self.bot.user.id}>" in (message.content or "") + ) + if bot_mentioned_in_text: + mention_config = config.get("mention_scan", {}) + if mention_config.get("enabled", True): + await self._maybe_start_mention_scan(message, mention_config) + return + + # Skip if empty + if not message.content or not message.content.strip(): + return + + # Buffer the message and start/reset debounce timer (per-channel) + channel_id = message.channel.id + if channel_id not in self._message_buffer: + self._message_buffer[channel_id] = [] + self._message_buffer[channel_id].append(message) + + # Cancel existing debounce timer for this channel + existing_task = self._debounce_tasks.get(channel_id) + if existing_task and not existing_task.done(): + existing_task.cancel() + + batch_window = config.get("sentiment", {}).get("batch_window_seconds", 3) + + self._debounce_tasks[channel_id] = asyncio.create_task( + self._debounce_then_process(channel_id, batch_window) + ) + + async def _debounce_then_process(self, channel_id: int, delay: float): + """Sleep for the debounce window, then process the buffered messages.""" + try: + await asyncio.sleep(delay) + await self._process_buffered(channel_id) + except asyncio.CancelledError: + pass # Timer was reset by a new message — expected + + def _resolve_thresholds(self) -> dict: + """Resolve effective moderation thresholds based on current mode.""" + config = self.bot.config + sentiment_config = config.get("sentiment", {}) + mode_config = self.bot.get_mode_config() + moderation_level = mode_config.get("moderation", "full") + if moderation_level == "relaxed" and "relaxed_thresholds" in mode_config: + rt = mode_config["relaxed_thresholds"] + return { + "warning": rt.get("warning_threshold", 0.80), + "mute": rt.get("mute_threshold", 0.85), + "spike_warn": rt.get("spike_warning_threshold", 0.70), + "spike_mute": rt.get("spike_mute_threshold", 0.85), + } + return { + "warning": sentiment_config.get("warning_threshold", 0.6), + "mute": sentiment_config.get("mute_threshold", 0.75), + "spike_warn": sentiment_config.get("spike_warning_threshold", 0.5), + "spike_mute": sentiment_config.get("spike_mute_threshold", 0.8), + } + + async def _apply_moderation( + self, + message: discord.Message, + user_id: int, + score: float, + drama_score: float, + categories: list[str], + thresholds: dict, + db_message_id: int | None, + ) -> None: + """Issue a warning or mute based on scores and thresholds.""" + mute_threshold = self.bot.drama_tracker.get_mute_threshold(user_id, thresholds["mute"]) + user_data = self.bot.drama_tracker.get_user(user_id) + if drama_score >= mute_threshold or score >= thresholds["spike_mute"]: + effective_score = max(drama_score, score) + if user_data.warned_since_reset: + await mute_user(self.bot, message, effective_score, categories, db_message_id, self._dirty_users) + else: + logger.info("Downgrading mute to warning for %s (no prior warning)", message.author) + await warn_user(self.bot, message, effective_score, db_message_id, self._dirty_users) + elif drama_score >= thresholds["warning"] or score >= thresholds["spike_warn"]: + effective_score = max(drama_score, score) + await warn_user(self.bot, message, effective_score, db_message_id, self._dirty_users) + + @staticmethod + def _build_user_lookup(messages: list[discord.Message]) -> dict[str, tuple[int, discord.Message, list[discord.Message]]]: + """Build username -> (user_id, ref_msg, [messages]) mapping.""" + lookup: dict[str, tuple[int, discord.Message, list[discord.Message]]] = {} + for msg in messages: + name = msg.author.display_name + if name not in lookup: + lookup[name] = (msg.author.id, msg, []) + lookup[name][2].append(msg) + return lookup + + def _build_user_notes_map(self, messages: list[discord.Message]) -> dict[str, str]: + """Build username -> LLM notes mapping for users in the message list.""" + notes_map: dict[str, str] = {} + for msg in messages: + name = msg.author.display_name + if name not in notes_map: + notes = self.bot.drama_tracker.get_user_notes(msg.author.id) + if notes: + notes_map[name] = notes + return notes_map + + @staticmethod + def _build_conversation( + messages: list[discord.Message], + ) -> list[tuple[str, str, datetime, str | None]]: + """Convert a list of Discord messages to conversation tuples with reply resolution.""" + msg_id_to_author = {m.id: m.author.display_name for m in messages} + conversation = [] + for msg in messages: + reply_to = None + if msg.reference and msg.reference.message_id: + reply_to = msg_id_to_author.get(msg.reference.message_id) + if not reply_to: + ref = msg.reference.cached_message + if ref: + reply_to = ref.author.display_name + conversation.append(( + msg.author.display_name, + msg.content, + msg.created_at, + reply_to, + )) + return conversation + + # -- Shared finding processor -- + + async def _process_finding( + self, + finding: dict, + user_lookup: dict, + *, + sentiment_config: dict, + dry_run: bool, + thresholds: dict, + db_content: str, + db_topic_category: str, + db_topic_reasoning: str, + db_coherence_score: float | None, + db_coherence_flag: str | None, + game_channels: dict | None = None, + coherence_config: dict | None = None, + ) -> tuple[str, float, float, list[str]] | None: + """Process a single user finding. + + Returns (username, score, drama_score, categories) or None if skipped. + + When game_channels is not None, topic drift, game redirect, and coherence + handlers are active (buffered analysis mode). When None, they are skipped + (mention scan mode). + """ + username = finding["username"] + lookup = user_lookup.get(username) + if not lookup: + return None + + user_id, user_ref_msg, user_msgs = lookup + score = finding["toxicity_score"] + categories = finding["categories"] + reasoning = finding["reasoning"] + off_topic = finding.get("off_topic", False) + note_update = finding.get("note_update") + + # Track in DramaTracker + self.bot.drama_tracker.add_entry(user_id, score, categories, reasoning) + escalation_boost = sentiment_config.get("escalation_boost", 0.04) + drama_score = self.bot.drama_tracker.get_drama_score(user_id, escalation_boost=escalation_boost) + + logger.info( + "User %s (%d) | msg_score=%.2f | drama_score=%.2f | categories=%s | %s", + username, user_id, score, drama_score, categories, reasoning, + ) + + # Save to DB + db_message_id = await self.bot.db.save_message_and_analysis( + guild_id=user_ref_msg.guild.id, + channel_id=user_ref_msg.channel.id, + user_id=user_id, + username=username, + content=db_content, + message_ts=user_ref_msg.created_at.replace(tzinfo=timezone.utc), + toxicity_score=score, + drama_score=drama_score, + categories=categories, + reasoning=reasoning, + off_topic=off_topic, + topic_category=db_topic_category, + topic_reasoning=db_topic_reasoning, + coherence_score=db_coherence_score, + coherence_flag=db_coherence_flag, + ) + + # Feature handlers — only active during buffered analysis (game_channels set) + if game_channels is not None: + if off_topic: + await handle_topic_drift( + self.bot, user_ref_msg, db_topic_category, db_topic_reasoning, + db_message_id, self._dirty_users, + ) + + detected_game = finding.get("detected_game") + if detected_game and game_channels and not dry_run: + await handle_channel_redirect( + self.bot, user_ref_msg, detected_game, game_channels, + db_message_id, self._redirect_cooldowns, + ) + + if coherence_config is not None and coherence_config.get("enabled", True): + coherence_score = finding.get("coherence_score", 0.85) + coherence_flag = finding.get("coherence_flag", "normal") + degradation = self.bot.drama_tracker.update_coherence( + user_id=user_id, + score=coherence_score, + flag=coherence_flag, + drop_threshold=coherence_config.get("drop_threshold", 0.3), + absolute_floor=coherence_config.get("absolute_floor", 0.5), + cooldown_minutes=coherence_config.get("cooldown_minutes", 30), + ) + if degradation and not dry_run: + await handle_coherence_alert( + self.bot, user_ref_msg, degradation, coherence_config, + db_message_id, self._dirty_users, + ) + + # Note update + if note_update: + self.bot.drama_tracker.update_user_notes(user_id, note_update) + self._dirty_users.add(user_id) + + self._dirty_users.add(user_id) + + # Log analysis + await log_analysis( + user_ref_msg, score, drama_score, categories, reasoning, + off_topic, db_topic_category, + ) + + # Moderation + if not dry_run: + await self._apply_moderation( + user_ref_msg, user_id, score, drama_score, categories, thresholds, db_message_id, + ) + + return (username, score, drama_score, categories) + + # -- Buffered analysis -- + + async def _process_buffered(self, channel_id: int): + """Collect buffered messages, build conversation block, and run analysis.""" + messages = self._message_buffer.pop(channel_id, []) + self._debounce_tasks.pop(channel_id, None) + + if not messages: + return + + # Use the last message as reference for channel/guild + ref_message = messages[-1] + channel = ref_message.channel + + config = self.bot.config + sentiment_config = config.get("sentiment", {}) + game_channels = config.get("game_channels", {}) + + # Fetch some history before the buffered messages for leading context + context_count = sentiment_config.get("context_messages", 8) + oldest_buffered = messages[0] + history_messages: list[discord.Message] = [] + try: + async for msg in channel.history(limit=context_count + 5, before=oldest_buffered): + if msg.author.bot: + continue + if not msg.content or not msg.content.strip(): + continue + history_messages.append(msg) + if len(history_messages) >= context_count: + break + except discord.HTTPException: + pass + + history_messages.reverse() # chronological order + + # Combine: history (context) + buffered (new messages to analyze) + new_message_start = len(history_messages) + all_messages = history_messages + messages + + conversation = self._build_conversation(all_messages) + if not conversation: + return + + user_notes_map = self._build_user_notes_map(messages) + + channel_context = build_channel_context(ref_message, game_channels) + + logger.info( + "Channel analysis: %d new messages (+%d context) in #%s", + len(messages), len(history_messages), + getattr(channel, 'name', 'unknown'), + ) + + # TRIAGE: Lightweight model — conversation-level analysis + result = await self.bot.llm.analyze_conversation( + conversation, + channel_context=channel_context, + user_notes_map=user_notes_map, + new_message_start=new_message_start, + ) + + if result is None: + return + + # ESCALATION: Re-analyze with heavy model if any finding warrants it + escalation_threshold = sentiment_config.get("escalation_threshold", 0.25) + needs_escalation = any( + f["toxicity_score"] >= escalation_threshold + or f.get("off_topic", False) + or f.get("coherence_score", 1.0) < 0.6 + for f in result.get("user_findings", []) + ) + if needs_escalation: + heavy_result = await self.bot.llm_heavy.analyze_conversation( + conversation, + channel_context=channel_context, + user_notes_map=user_notes_map, + new_message_start=new_message_start, + ) + if heavy_result is not None: + logger.info( + "Escalated to heavy model for #%s", + getattr(channel, 'name', 'unknown'), + ) + result = heavy_result + + user_lookup = self._build_user_lookup(messages) + + # Mark all buffered messages as analyzed (for mention scan dedup) + for m in messages: + self._mark_analyzed(m.id) + + dry_run = config.get("monitoring", {}).get("dry_run", False) + thresholds = self._resolve_thresholds() + coherence_config = config.get("coherence", {}) + + # Process per-user findings + for finding in result.get("user_findings", []): + username = finding["username"] + lookup = user_lookup.get(username) + if not lookup: + continue + _, _, user_msgs = lookup + combined_content = "\n".join( + m.content for m in user_msgs if m.content and m.content.strip() + )[:4000] + await self._process_finding( + finding, user_lookup, + sentiment_config=sentiment_config, + dry_run=dry_run, + thresholds=thresholds, + db_content=combined_content, + db_topic_category=finding.get("topic_category", "general_chat"), + db_topic_reasoning=finding.get("topic_reasoning", ""), + db_coherence_score=finding.get("coherence_score", 0.85), + db_coherence_flag=finding.get("coherence_flag", "normal"), + game_channels=game_channels, + coherence_config=coherence_config, + ) + + # -- Mention scan methods -- + + def _mark_analyzed(self, discord_message_id: int): + """Track a Discord message ID as already analyzed.""" + self._analyzed_message_ids.add(discord_message_id) + if len(self._analyzed_message_ids) > self._max_analyzed_ids: + sorted_ids = sorted(self._analyzed_message_ids) + self._analyzed_message_ids = set(sorted_ids[len(sorted_ids) // 2:]) + + async def _maybe_start_mention_scan( + self, trigger_message: discord.Message, mention_config: dict + ): + """Check cooldown and kick off a mention-triggered scan of recent messages.""" + channel_id = trigger_message.channel.id + cooldown_seconds = mention_config.get("cooldown_seconds", 60) + + now = datetime.now(timezone.utc) + last_scan = self._mention_scan_cooldowns.get(channel_id) + if last_scan and (now - last_scan).total_seconds() < cooldown_seconds: + logger.info( + "Mention scan cooldown active for #%s, skipping.", + getattr(trigger_message.channel, "name", "unknown"), + ) + return + + self._mention_scan_cooldowns[channel_id] = now + + # Extract the user's concern (strip the bot ping from the message) + mention_text = trigger_message.content + for fmt in (f"<@{self.bot.user.id}>", f"<@!{self.bot.user.id}>"): + mention_text = mention_text.replace(fmt, "") + mention_text = mention_text.strip() or "(user pinged bot without specific concern)" + + # Store as a mention scan task (separate from debounce) + existing_task = self._mention_scan_tasks.get(channel_id) + if existing_task and not existing_task.done(): + existing_task.cancel() + + self._mention_scan_tasks[channel_id] = asyncio.create_task( + self._run_mention_scan(trigger_message, mention_text, mention_config) + ) + + async def _run_mention_scan( + self, + trigger_message: discord.Message, + mention_text: str, + mention_config: dict, + ): + """Scan recent channel messages with ONE conversation-level LLM call.""" + channel = trigger_message.channel + scan_count = mention_config.get("scan_messages", 30) + + config = self.bot.config + sentiment_config = config.get("sentiment", {}) + game_channels = config.get("game_channels", {}) + + # Fetch recent messages (before the trigger, skip bots/empty) + raw_messages: list[discord.Message] = [] + try: + async for msg in channel.history(limit=scan_count + 10, before=trigger_message): + if msg.author.bot: + continue + if not msg.content or not msg.content.strip(): + continue + raw_messages.append(msg) + if len(raw_messages) >= scan_count: + break + except discord.HTTPException: + logger.warning("Failed to fetch history for mention scan in #%s", + getattr(channel, "name", "unknown")) + return + + raw_messages.reverse() # chronological order + + if not raw_messages: + self._mention_scan_results[trigger_message.id] = "No recent messages found to analyze." + return + + logger.info( + "Mention scan triggered by %s in #%s: %d messages (single LLM call). Focus: %s", + trigger_message.author.display_name, + getattr(channel, "name", "unknown"), + len(raw_messages), + mention_text[:80], + ) + + conversation = self._build_conversation(raw_messages) + user_notes_map = self._build_user_notes_map(raw_messages) + + channel_context = build_channel_context(raw_messages[0], game_channels) + mention_context = ( + f"A user flagged this conversation and said: \"{mention_text}\"\n" + f"Pay special attention to whether this concern is valid." + ) + + # Single LLM call + result = await self.bot.llm.analyze_conversation( + conversation, + mention_context=mention_context, + channel_context=channel_context, + user_notes_map=user_notes_map, + ) + + if result is None: + logger.warning("Conversation analysis failed for mention scan.") + self._mention_scan_results[trigger_message.id] = "Analysis failed." + return + + user_lookup = self._build_user_lookup(raw_messages) + findings: list[str] = [] + dry_run = config.get("monitoring", {}).get("dry_run", False) + thresholds = self._resolve_thresholds() + + for finding in result.get("user_findings", []): + username = finding["username"] + lookup = user_lookup.get(username) + if not lookup: + logger.warning("Mention scan: LLM returned unknown user '%s', skipping.", username) + continue + + user_id, ref_msg, user_msgs = lookup + + # Skip if all their messages were already analyzed + if all(m.id in self._analyzed_message_ids for m in user_msgs): + continue + + # Mark their messages as analyzed + for m in user_msgs: + self._mark_analyzed(m.id) + + worst_msg = finding.get("worst_message") + content = f"[Mention scan] {worst_msg}" if worst_msg else "[Mention scan] See conversation" + off_topic = finding.get("off_topic", False) + + result_tuple = await self._process_finding( + finding, user_lookup, + sentiment_config=sentiment_config, + dry_run=dry_run, + thresholds=thresholds, + db_content=content, + db_topic_category="personal_drama" if off_topic else "gaming", + db_topic_reasoning=finding.get("reasoning", ""), + db_coherence_score=None, + db_coherence_flag=None, + ) + if result_tuple: + _, score, _, categories = result_tuple + if score >= 0.3: + cat_str = ", ".join(c for c in categories if c != "none") or "none" + findings.append(f"{username}: {score:.2f} ({cat_str})") + + # Build summary for ChatCog + convo_summary = result.get("conversation_summary", "") + if findings: + summary = f"Scanned {len(raw_messages)} msgs. {convo_summary} Notable: " + "; ".join(findings[:5]) + else: + summary = f"Scanned {len(raw_messages)} msgs. {convo_summary}" + + # Prune old scan results + if len(self._mention_scan_results) > 20: + oldest = sorted(self._mention_scan_results.keys())[:len(self._mention_scan_results) - 10] + for k in oldest: + del self._mention_scan_results[k] + + self._mention_scan_results[trigger_message.id] = summary + + logger.info( + "Mention scan complete in #%s: 1 LLM call, %d messages, %d users flagged", + getattr(channel, "name", "unknown"), + len(raw_messages), + len(findings), + ) + + # -- State flush loop -- + + @tasks.loop(seconds=STATE_FLUSH_INTERVAL) + async def _flush_states(self): + await flush_dirty_states(self.bot, self._dirty_users) + + @_flush_states.before_loop + async def _before_flush(self): + await self.bot.wait_until_ready() + + +async def setup(bot: commands.Bot): + await bot.add_cog(SentimentCog(bot)) diff --git a/cogs/sentiment/actions.py b/cogs/sentiment/actions.py new file mode 100644 index 0000000..336fdba --- /dev/null +++ b/cogs/sentiment/actions.py @@ -0,0 +1,123 @@ +import asyncio +import logging +from datetime import timedelta + +import discord + +from cogs.sentiment.log_utils import log_action +from cogs.sentiment.state import save_user_state + +logger = logging.getLogger("bcs.sentiment") + + +async def mute_user( + bot, message: discord.Message, score: float, + categories: list[str], db_message_id: int | None, dirty_users: set[int], +): + member = message.author + if not isinstance(member, discord.Member): + return + + if not message.guild.me.guild_permissions.moderate_members: + logger.warning("Missing moderate_members permission, cannot mute.") + return + + offense_num = bot.drama_tracker.record_offense(member.id) + timeout_config = bot.config.get("timeouts", {}) + escalation = timeout_config.get("escalation_minutes", [5, 15, 30, 60]) + idx = min(offense_num - 1, len(escalation) - 1) + duration_minutes = escalation[idx] + + try: + await member.timeout( + timedelta(minutes=duration_minutes), + reason=f"BCS auto-mute: drama score {score:.2f}", + ) + except discord.Forbidden: + logger.warning("Cannot timeout %s — role hierarchy issue.", member) + return + except discord.HTTPException as e: + logger.error("Failed to timeout %s: %s", member, e) + return + + messages_config = bot.config.get("messages", {}) + cat_str = ", ".join(c for c in categories if c != "none") or "general negativity" + + embed = discord.Embed( + title=messages_config.get("mute_title", "BREEHAVIOR ALERT"), + description=messages_config.get("mute_description", "").format( + username=member.display_name, + duration=f"{duration_minutes} minutes", + score=f"{score:.2f}", + categories=cat_str, + ), + color=discord.Color.red(), + ) + embed.set_footer( + text=f"Offense #{offense_num} | Timeout: {duration_minutes}m" + ) + + await message.channel.send(embed=embed) + await log_action( + message.guild, + f"**MUTE** | {member.mention} | Score: {score:.2f} | " + f"Duration: {duration_minutes}m | Offense #{offense_num} | " + f"Categories: {cat_str}", + ) + + logger.info( + "Muted %s for %d minutes (offense #%d, score %.2f)", + member, duration_minutes, offense_num, score, + ) + + asyncio.create_task(bot.db.save_action( + guild_id=message.guild.id, + user_id=member.id, + username=member.display_name, + action_type="mute", + message_id=db_message_id, + details=f"duration={duration_minutes}m offense={offense_num} score={score:.2f} categories={cat_str}", + )) + save_user_state(bot, dirty_users, member.id) + + +async def warn_user( + bot, message: discord.Message, score: float, + db_message_id: int | None, dirty_users: set[int], +): + timeout_config = bot.config.get("timeouts", {}) + cooldown = timeout_config.get("warning_cooldown_minutes", 5) + + if not bot.drama_tracker.can_warn(message.author.id, cooldown): + return + + bot.drama_tracker.record_warning(message.author.id) + + try: + await message.add_reaction("\u26a0\ufe0f") + except discord.HTTPException: + pass + + messages_config = bot.config.get("messages", {}) + warning_text = messages_config.get( + "warning", + "Easy there, {username}. The Breehavior Monitor is watching.", + ).format(username=message.author.display_name) + + await message.channel.send(warning_text) + await log_action( + message.guild, + f"**WARNING** | {message.author.mention} | Score: {score:.2f}", + ) + + logger.info("Warned %s (score %.2f)", message.author, score) + + asyncio.create_task(bot.db.save_action( + guild_id=message.guild.id, + user_id=message.author.id, + username=message.author.display_name, + action_type="warning", + message_id=db_message_id, + details=f"score={score:.2f}", + )) + save_user_state(bot, dirty_users, message.author.id) diff --git a/cogs/sentiment/channel_redirect.py b/cogs/sentiment/channel_redirect.py new file mode 100644 index 0000000..e0d5f07 --- /dev/null +++ b/cogs/sentiment/channel_redirect.py @@ -0,0 +1,95 @@ +import asyncio +import logging +from datetime import datetime, timedelta, timezone + +import discord + +from cogs.sentiment.log_utils import log_action + +logger = logging.getLogger("bcs.sentiment") + + +def build_channel_context(message: discord.Message, game_channels: dict) -> str: + """Build a channel context string for LLM game detection.""" + if not game_channels: + return "" + channel_name = getattr(message.channel, "name", "") + current_game = game_channels.get(channel_name) + lines = [] + if current_game: + lines.append(f"Current channel: #{channel_name} ({current_game})") + else: + lines.append(f"Current channel: #{channel_name}") + channel_list = ", ".join(f"#{ch} ({game})" for ch, game in game_channels.items()) + lines.append(f"Game channels: {channel_list}") + return "\n".join(lines) + + +async def handle_channel_redirect( + bot, message: discord.Message, detected_game: str, + game_channels: dict, db_message_id: int | None, + redirect_cooldowns: dict[int, datetime], +): + """Send a redirect message if the user is talking about a different game.""" + channel_name = getattr(message.channel, "name", "") + + # Only redirect if message is in a game channel + if channel_name not in game_channels: + return + + # No redirect needed if detected game matches current channel + if detected_game == channel_name: + return + + # Detected game must be a valid game channel + if detected_game not in game_channels: + return + + # Find the target channel in the guild + target_channel = discord.utils.get( + message.guild.text_channels, name=detected_game + ) + if not target_channel: + return + + # Check per-user cooldown + user_id = message.author.id + cooldown_minutes = bot.config.get("topic_drift", {}).get("remind_cooldown_minutes", 10) + now = datetime.now(timezone.utc) + last_redirect = redirect_cooldowns.get(user_id) + if last_redirect and (now - last_redirect) < timedelta(minutes=cooldown_minutes): + return + + redirect_cooldowns[user_id] = now + + messages_config = bot.config.get("messages", {}) + game_name = game_channels[detected_game] + redirect_text = messages_config.get( + "channel_redirect", + "Hey {username}, that sounds like {game} talk \u2014 head over to {channel} for that!", + ).format( + username=message.author.display_name, + game=game_name, + channel=target_channel.mention, + ) + + await message.channel.send(redirect_text) + + await log_action( + message.guild, + f"**CHANNEL REDIRECT** | {message.author.mention} | " + f"#{channel_name} \u2192 #{detected_game} ({game_name})", + ) + logger.info( + "Redirected %s from #%s to #%s (%s)", + message.author, channel_name, detected_game, game_name, + ) + + asyncio.create_task(bot.db.save_action( + guild_id=message.guild.id, + user_id=user_id, + username=message.author.display_name, + action_type="channel_redirect", + message_id=db_message_id, + details=f"from=#{channel_name} to=#{detected_game} game={game_name}", + )) diff --git a/cogs/sentiment/coherence.py b/cogs/sentiment/coherence.py new file mode 100644 index 0000000..722de5a --- /dev/null +++ b/cogs/sentiment/coherence.py @@ -0,0 +1,43 @@ +import asyncio +import logging + +import discord + +from cogs.sentiment.log_utils import log_action +from cogs.sentiment.state import save_user_state + +logger = logging.getLogger("bcs.sentiment") + + +async def handle_coherence_alert( + bot, message: discord.Message, degradation: dict, coherence_config: dict, + db_message_id: int | None, dirty_users: set[int], +): + flag = degradation["flag"] + messages_map = coherence_config.get("messages", {}) + alert_text = messages_map.get(flag, messages_map.get( + "default", "You okay there, {username}? That message was... something." + )).format(username=message.author.display_name) + + await message.channel.send(alert_text) + await log_action( + message.guild, + f"**COHERENCE ALERT** | {message.author.mention} | " + f"Score: {degradation['current']:.2f} | Baseline: {degradation['baseline']:.2f} | " + f"Drop: {degradation['drop']:.2f} | Flag: {flag}", + ) + logger.info( + "Coherence alert for %s: score=%.2f baseline=%.2f drop=%.2f flag=%s", + message.author, degradation["current"], degradation["baseline"], + degradation["drop"], flag, + ) + + asyncio.create_task(bot.db.save_action( + guild_id=message.guild.id, + user_id=message.author.id, + username=message.author.display_name, + action_type="coherence_alert", + message_id=db_message_id, + details=f"score={degradation['current']:.2f} baseline={degradation['baseline']:.2f} drop={degradation['drop']:.2f} flag={flag}", + )) + save_user_state(bot, dirty_users, message.author.id) diff --git a/cogs/sentiment/log_utils.py b/cogs/sentiment/log_utils.py new file mode 100644 index 0000000..b64fdc8 --- /dev/null +++ b/cogs/sentiment/log_utils.py @@ -0,0 +1,54 @@ +import logging + +import discord + +logger = logging.getLogger("bcs.sentiment") + + +def score_color(score: float) -> discord.Color: + if score >= 0.75: + return discord.Color.red() + if score >= 0.6: + return discord.Color.orange() + if score >= 0.3: + return discord.Color.yellow() + return discord.Color.green() + + +async def log_analysis( + message: discord.Message, score: float, drama_score: float, + categories: list[str], reasoning: str, off_topic: bool, topic_category: str, +): + log_channel = discord.utils.get( + message.guild.text_channels, name="bcs-log" + ) + if not log_channel: + return + + # Only log notable messages (score > 0.1) to avoid spam + if score <= 0.1: + return + + cat_str = ", ".join(c for c in categories if c != "none") or "none" + embed = discord.Embed( + title=f"Analysis: {message.author.display_name}", + description=f"#{message.channel.name}: {message.content[:200]}", + color=score_color(score), + ) + embed.add_field(name="Message Score", value=f"{score:.2f}", inline=True) + embed.add_field(name="Rolling Drama", value=f"{drama_score:.2f}", inline=True) + embed.add_field(name="Categories", value=cat_str, inline=True) + embed.add_field(name="Reasoning", value=reasoning[:1024] or "n/a", inline=False) + try: + await log_channel.send(embed=embed) + except discord.HTTPException: + pass + + +async def log_action(guild: discord.Guild, text: str): + log_channel = discord.utils.get(guild.text_channels, name="bcs-log") + if log_channel: + try: + await log_channel.send(text) + except discord.HTTPException: + pass diff --git a/cogs/sentiment/state.py b/cogs/sentiment/state.py new file mode 100644 index 0000000..81443a9 --- /dev/null +++ b/cogs/sentiment/state.py @@ -0,0 +1,41 @@ +import asyncio +import logging + +logger = logging.getLogger("bcs.sentiment") + + +def save_user_state(bot, dirty_users: set[int], user_id: int) -> None: + """Fire-and-forget save of a user's current state to DB.""" + user_data = bot.drama_tracker.get_user(user_id) + asyncio.create_task(bot.db.save_user_state( + user_id=user_id, + offense_count=user_data.offense_count, + immune=user_data.immune, + off_topic_count=user_data.off_topic_count, + baseline_coherence=user_data.baseline_coherence, + user_notes=user_data.notes or None, + warned=user_data.warned_since_reset, + last_offense_at=user_data.last_offense_time or None, + )) + dirty_users.discard(user_id) + + +async def flush_dirty_states(bot, dirty_users: set[int]) -> None: + """Save all dirty user states to DB.""" + if not dirty_users: + return + dirty = list(dirty_users) + dirty_users.clear() + for user_id in dirty: + user_data = bot.drama_tracker.get_user(user_id) + await bot.db.save_user_state( + user_id=user_id, + offense_count=user_data.offense_count, + immune=user_data.immune, + off_topic_count=user_data.off_topic_count, + baseline_coherence=user_data.baseline_coherence, + user_notes=user_data.notes or None, + warned=user_data.warned_since_reset, + last_offense_at=user_data.last_offense_time or None, + ) + logger.info("Flushed %d dirty user states to DB.", len(dirty)) diff --git a/cogs/sentiment/topic_drift.py b/cogs/sentiment/topic_drift.py new file mode 100644 index 0000000..95cbad7 --- /dev/null +++ b/cogs/sentiment/topic_drift.py @@ -0,0 +1,111 @@ +import asyncio +import logging + +import discord + +from cogs.sentiment.log_utils import log_action +from cogs.sentiment.state import save_user_state + +logger = logging.getLogger("bcs.sentiment") + + +async def handle_topic_drift( + bot, message: discord.Message, topic_category: str, topic_reasoning: str, + db_message_id: int | None, dirty_users: set[int], +): + config = bot.config.get("topic_drift", {}) + if not config.get("enabled", True): + return + + ignored = config.get("ignored_channels", []) + if message.channel.id in ignored or getattr(message.channel, "name", "") in ignored: + return + + dry_run = bot.config.get("monitoring", {}).get("dry_run", False) + if dry_run: + return + + tracker = bot.drama_tracker + user_id = message.author.id + cooldown = config.get("remind_cooldown_minutes", 10) + + if not tracker.can_topic_remind(user_id, cooldown): + return + + count = tracker.record_off_topic(user_id) + escalation_threshold = config.get("escalation_count", 3) + messages_config = bot.config.get("messages", {}) + + if count >= escalation_threshold and not tracker.was_owner_notified(user_id): + tracker.mark_owner_notified(user_id) + owner = message.guild.owner + if owner: + dm_text = messages_config.get( + "topic_owner_dm", + "Heads up: {username} keeps going off-topic in #{channel}. Reminded {count} times.", + ).format( + username=message.author.display_name, + channel=message.channel.name, + count=count, + ) + try: + await owner.send(dm_text) + except discord.HTTPException: + logger.warning("Could not DM server owner about topic drift.") + + await log_action( + message.guild, + f"**TOPIC DRIFT \u2014 OWNER NOTIFIED** | {message.author.mention} | " + f"Off-topic count: {count} | Category: {topic_category}", + ) + logger.info("Notified owner about %s topic drift (count %d)", message.author, count) + + asyncio.create_task(bot.db.save_action( + guild_id=message.guild.id, user_id=user_id, + username=message.author.display_name, + action_type="topic_escalation", message_id=db_message_id, + details=f"off_topic_count={count} category={topic_category}", + )) + save_user_state(bot, dirty_users, user_id) + + elif count >= 2: + nudge_text = messages_config.get( + "topic_nudge", + "{username}, let's keep it to gaming talk in here.", + ).format(username=message.author.display_name) + await message.channel.send(nudge_text) + await log_action( + message.guild, + f"**TOPIC NUDGE** | {message.author.mention} | " + f"Off-topic count: {count} | Category: {topic_category}", + ) + logger.info("Topic nudge for %s (count %d)", message.author, count) + + asyncio.create_task(bot.db.save_action( + guild_id=message.guild.id, user_id=user_id, + username=message.author.display_name, + action_type="topic_nudge", message_id=db_message_id, + details=f"off_topic_count={count} category={topic_category}", + )) + save_user_state(bot, dirty_users, user_id) + + else: + remind_text = messages_config.get( + "topic_remind", + "Hey {username}, this is a gaming server \u2014 maybe take the personal stuff to DMs?", + ).format(username=message.author.display_name) + await message.channel.send(remind_text) + await log_action( + message.guild, + f"**TOPIC REMIND** | {message.author.mention} | " + f"Category: {topic_category} | {topic_reasoning}", + ) + logger.info("Topic remind for %s (count %d)", message.author, count) + + asyncio.create_task(bot.db.save_action( + guild_id=message.guild.id, user_id=user_id, + username=message.author.display_name, + action_type="topic_remind", message_id=db_message_id, + details=f"off_topic_count={count} category={topic_category} reasoning={topic_reasoning}", + )) + save_user_state(bot, dirty_users, user_id)