"""Chat cog: replies to mentions, replies-to-bot, proactive chatter and reactions.

The cog keeps a short per-channel conversation history, enriches each user
turn with server/memory context, sends it to the bot's LLM clients and posts
the answer back to Discord.
"""

import asyncio
import logging
import random
import re
from collections import deque
from datetime import datetime, timedelta, timezone
from pathlib import Path

import discord
from discord.ext import commands

logger = logging.getLogger("bcs.chat")

_PROMPTS_DIR = Path(__file__).resolve().parent.parent / "prompts"
IMAGE_ROAST = (_PROMPTS_DIR / "scoreboard_roast.txt").read_text(encoding="utf-8")

_IMAGE_TYPES = {"png", "jpg", "jpeg", "gif", "webp"}

# Cache loaded prompt files so we don't re-read on every message
_prompt_cache: dict[str, str] = {}

# Lines consisting solely of a single [bracketed] chunk; used to split the
# LLM output so echoed context/style labels can be discarded.
_BRACKET_LINE_RE = re.compile(r"^\s*\[[^\]]*\]\s*$", flags=re.MULTILINE)

# Word tokenizer for topic extraction (applied to lower-cased text).
_WORD_RE = re.compile(r"[a-z0-9]+")

# Expiration labels produced by memory extraction -> lifetime in days.
_EXPIRATION_DAYS = {"1d": 1, "3d": 3, "7d": 7, "30d": 30}
_DEFAULT_EXPIRATION_DAYS = 7


def _load_prompt(filename: str) -> str:
    """Return the text of ``prompts/<filename>``, reading it from disk only once."""
    if filename not in _prompt_cache:
        _prompt_cache[filename] = (_PROMPTS_DIR / filename).read_text(encoding="utf-8")
    return _prompt_cache[filename]


_TOPIC_KEYWORDS = {
    "gta", "warzone", "cod", "battlefield", "fortnite", "apex", "valorant",
    "minecraft", "roblox", "league", "dota", "overwatch", "destiny", "halo",
    "work", "job", "school", "college", "girlfriend", "boyfriend", "wife",
    "husband", "dog", "cat", "pet", "car", "music", "movie", "food",
}

_GENERIC_CHANNELS = {"general", "off-topic", "memes"}


def _extract_topic_keywords(text: str, channel_name: str) -> list[str]:
    """Extract up to five topic keywords from message text and channel name.

    The text is lower-cased and split into alphanumeric words, so keywords
    adjacent to punctuation ("warzone?") are still recognised. The channel
    name is added as a topic unless it is one of the generic channels. The
    result is sorted so the selection is deterministic.
    """
    words = set(_WORD_RE.findall(text.lower())) & _TOPIC_KEYWORDS
    channel = channel_name.lower()
    if channel not in _GENERIC_CHANNELS:
        words.add(channel)
    return sorted(words)[:5]


def _format_relative_time(dt: datetime) -> str:
    """Return a human-readable relative time string for *dt*.

    Naive datetimes are treated as UTC. Anything less than a minute old
    (including timestamps in the future) is reported as "just now".
    """
    now = datetime.now(timezone.utc)
    # Ensure dt is timezone-aware
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    seconds = int((now - dt).total_seconds())
    if seconds < 60:
        return "just now"
    minutes = seconds // 60
    if minutes < 60:
        return f"{minutes}m ago"
    hours = minutes // 60
    if hours < 24:
        return f"{hours}h ago"
    days = hours // 24
    if days == 1:
        return "yesterday"
    if days < 7:
        return f"{days} days ago"
    weeks = days // 7
    if weeks < 5:
        return f"{weeks}w ago"
    months = days // 30
    return f"{months}mo ago"


def _format_memories(memories) -> str:
    """Join memory rows as ``"<memory> (<relative time>)"`` separated by `` | ``."""
    return " | ".join(
        f"{mem['memory']} ({_format_relative_time(mem['created_at'])})"
        for mem in memories
    )


def _first_image_attachment(attachments):
    """Return the first attachment whose file extension is a known image type, or None."""
    for att in attachments:
        name = att.filename
        ext = name.rsplit(".", 1)[-1].lower() if "." in name else ""
        if ext in _IMAGE_TYPES:
            return att
    return None


def _strip_leaked_metadata(response):
    """Drop bracket-only lines the LLM may echo back and keep the last segment.

    The response is split on lines that consist solely of ``[...]``; the last
    non-empty segment is returned, or ``""`` if nothing remains. Falsy input
    (``None`` / ``""``) is returned unchanged.
    """
    if not response:
        return response
    segments = [seg.strip() for seg in _BRACKET_LINE_RE.split(response)]
    segments = [seg for seg in segments if seg]
    return segments[-1] if segments else ""


async def _wait_for_task(task, timeout: float) -> None:
    """Wait up to *timeout* seconds for *task* to finish.

    The task is shielded so a timeout here does not cancel it. Timeouts and
    cancellation are swallowed; any other exception raised by the task
    propagates to the caller.
    """
    if not task or task.done():
        return
    try:
        await asyncio.wait_for(asyncio.shield(task), timeout=timeout)
    except (asyncio.TimeoutError, asyncio.CancelledError):
        pass


class ChatCog(commands.Cog):
    """Cog that generates conversational replies via the bot's LLM clients."""

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        # Per-channel conversation history for the bot: {channel_id: deque of {role, content}}
        self._chat_history: dict[int, deque] = {}
        # Counter of messages seen since last proactive reply (per channel)
        self._messages_since_reply: dict[int, int] = {}
        # Users whose profile has been updated and needs DB flush
        self._dirty_users: set[int] = set()
        # Strong references to fire-and-forget tasks; the event loop only keeps
        # weak references, so unreferenced tasks may be garbage-collected mid-run.
        self._background_tasks: set[asyncio.Task] = set()

    def _get_active_prompt(self) -> str:
        """Load the chat prompt for the current mode."""
        mode_config = self.bot.get_mode_config()
        prompt_file = mode_config.get("prompt_file", "personalities/chat_personality.txt")
        return _load_prompt(prompt_file)

    def _get_history(self, channel_id: int) -> deque:
        """Return the (lazily created, max 10 entries) history deque for a channel."""
        history = self._chat_history.get(channel_id)
        if history is None:
            history = deque(maxlen=10)
            self._chat_history[channel_id] = history
        return history

    async def _build_memory_context(self, user_id: int, message_text: str, channel_name: str) -> str:
        """Build a layered memory context block for the chat prompt.

        Layers: the user's profile notes, the five most recent memories, and
        memories matching topic keywords from the message/channel (minus any
        already listed as recent). Returns ``""`` when nothing is known.
        """
        lines = []

        # Layer 1: Profile (always)
        profile = self.bot.drama_tracker.get_user_notes(user_id)
        if profile:
            lines.append(f"Profile: {profile}")

        # Layer 2: Recent memories (last 5)
        recent_memories = await self.bot.db.get_recent_memories(user_id, limit=5)
        if recent_memories:
            lines.append("Recent: " + _format_memories(recent_memories))

        # Layer 3: Topic-matched memories (deduplicated against recent)
        keywords = _extract_topic_keywords(message_text, channel_name)
        if keywords:
            topic_memories = await self.bot.db.get_memories_by_topics(user_id, keywords, limit=5)
            recent_texts = {mem["memory"] for mem in recent_memories} if recent_memories else set()
            unique_topic = [mem for mem in topic_memories if mem["memory"] not in recent_texts]
            if unique_topic:
                lines.append("Relevant: " + _format_memories(unique_topic))

        if not lines:
            return ""
        return "[What you know about this person:]\n" + "\n".join(lines)

    async def _extract_and_save_memories(
        self,
        user_id: int,
        username: str,
        conversation: list[dict[str, str]],
    ) -> None:
        """Background task: extract memories from conversation and save them.

        Entries marked "permanent" are not stored as expiring memories (they
        are expected to arrive via ``profile_update``). Entries missing a
        required field are skipped with a warning instead of aborting the
        whole batch. Any other failure is logged and swallowed, since this
        runs as a fire-and-forget task.
        """
        try:
            current_profile = self.bot.drama_tracker.get_user_notes(user_id)
            result = await self.bot.llm.extract_memories(
                conversation, username, current_profile,
            )
            if not result:
                return

            memories = result.get("memories") or []

            # Save expiring memories
            for mem in memories:
                try:
                    expiration = mem["expiration"]
                    if expiration == "permanent":
                        continue  # permanent facts go into profile_update
                    days = _EXPIRATION_DAYS.get(expiration, _DEFAULT_EXPIRATION_DAYS)
                    memory_text = mem["memory"]
                    topics = ",".join(mem["topics"])
                    importance = mem["importance"]
                except (KeyError, TypeError):
                    # The entry comes from LLM output; don't let one bad
                    # record prevent the remaining ones from being saved.
                    logger.warning("Skipping malformed memory for %s: %r", username, mem)
                    continue

                expires_at = datetime.now(timezone.utc) + timedelta(days=days)
                await self.bot.db.save_memory(
                    user_id=user_id,
                    memory=memory_text,
                    topics=topics,
                    importance=importance,
                    expires_at=expires_at,
                    source="chat",
                )

            # Prune if over cap
            await self.bot.db.prune_excess_memories(user_id)

            # Update profile if warranted
            profile_update = result.get("profile_update")
            if profile_update:
                self.bot.drama_tracker.set_user_profile(user_id, profile_update)
                self._dirty_users.add(user_id)

            logger.info(
                "Extracted %d memories for %s (profile_update=%s)",
                len(memories),
                username,
                bool(profile_update),
            )
        except Exception:
            logger.exception("Failed to extract memories for %s", username)

    async def _collect_recent_messages(
        self, message: discord.Message, fetch_all_users: bool,
    ) -> list[tuple[str, str]]:
        """Collect up to 15 recent (author name, text[:200]) pairs, newest first.

        Scans at most 50 messages before *message*. The bot's own messages are
        always included; other bots are skipped; human messages are included
        for everyone when *fetch_all_users* is true, otherwise only for the
        author of *message*. HTTP errors end the scan and return what was
        gathered so far.
        """
        collected: list[tuple[str, str]] = []
        try:
            async for msg in message.channel.history(limit=50, before=message):
                if not msg.content or not msg.content.strip():
                    continue
                if msg.author.bot:
                    # Include bot's own replies for conversational continuity
                    wanted = msg.author.id == self.bot.user.id
                else:
                    wanted = fetch_all_users or msg.author.id == message.author.id
                if not wanted:
                    continue
                collected.append((msg.author.display_name, msg.content[:200]))
                if len(collected) >= 15:
                    break
        except discord.HTTPException:
            pass
        return collected

    @commands.Cog.listener()
    async def on_message(self, message: discord.Message):
        """Reply to mentions, replies to the bot, and (optionally) proactively.

        Bot authors and non-guild messages are ignored. When an image is
        attached (to the message or to the message it replies to) the image
        is sent to the heavy LLM with the roast prompt; otherwise a text chat
        turn is built with server, memory and recent-conversation context.
        """
        if message.author.bot:
            return
        if not message.guild:
            return

        should_reply = False
        is_proactive = False
        reply_context = ""  # Text of the message being replied to
        ref_msg = None  # Referenced message, if it could be resolved

        # Check if bot is @mentioned
        if self.bot.user in message.mentions:
            should_reply = True

        # Check if replying to a message
        if message.reference and message.reference.message_id:
            try:
                ref_msg = message.reference.cached_message
                if ref_msg is None:
                    ref_msg = await message.channel.fetch_message(
                        message.reference.message_id
                    )
                if ref_msg.author.id == self.bot.user.id:
                    # Replying to the bot's own message — continue conversation
                    should_reply = True
                    if ref_msg.content:
                        reply_context = f"[Replying to bot's message: {ref_msg.content[:300]}]\n"
                elif should_reply:
                    # @mentioned the bot while replying to someone else — include that message
                    ref_text = ref_msg.content[:500] if ref_msg.content else "(no text)"
                    reply_context = f"[{ref_msg.author.display_name} said: {ref_text}]\n"
            except discord.HTTPException:
                pass

        # Proactive reply check (only if not already replying to a mention/reply)
        if not should_reply:
            mode_config = self.bot.get_mode_config()
            if mode_config.get("proactive_replies", False):
                ch_id = message.channel.id
                self._messages_since_reply[ch_id] = self._messages_since_reply.get(ch_id, 0) + 1
                cooldown = self.bot.config.get("modes", {}).get("proactive_cooldown_messages", 5)
                reply_chance = mode_config.get("reply_chance", 0.0)
                if (
                    self._messages_since_reply[ch_id] >= cooldown
                    and reply_chance > 0
                    and random.random() < reply_chance
                    and message.content
                    and message.content.strip()
                ):
                    should_reply = True
                    is_proactive = True

        if not should_reply:
            return

        # Build conversation context
        ch_id = message.channel.id
        history = self._get_history(ch_id)

        # Clean the mention out of the message content. Discord may encode a
        # user mention either as <@id> or as the nickname form <@!id>.
        bot_id = self.bot.user.id
        content = (
            message.content
            .replace(f"<@{bot_id}>", "")
            .replace(f"<@!{bot_id}>", "")
            .strip()
        )

        # Check for image attachments (on this message or the referenced message).
        # The referenced message resolved above is reused rather than fetched
        # again; if that lookup failed it is treated as unavailable.
        image_attachment = _first_image_attachment(message.attachments)
        if image_attachment is None and ref_msg is not None:
            image_attachment = _first_image_attachment(ref_msg.attachments)

        typing_ctx = None

        async def start_typing():
            """Show the typing indicator; passed to the LLM as on_first_token."""
            nonlocal typing_ctx
            if typing_ctx is not None:
                return  # already typing
            ctx = message.channel.typing()
            await ctx.__aenter__()
            # Only remember the context once it has actually been entered,
            # so the cleanup below never exits a half-initialised context.
            typing_ctx = ctx

        try:
            if image_attachment:
                # --- Image path: roast the image ---
                image_bytes = await image_attachment.read()
                user_text = content if content else ""
                logger.info(
                    "Image roast request in #%s from %s (%s, %s)",
                    message.channel.name,
                    message.author.display_name,
                    image_attachment.filename,
                    user_text[:80],
                )
                response = await self.bot.llm_heavy.analyze_image(
                    image_bytes,
                    IMAGE_ROAST,
                    user_text=user_text,
                    on_first_token=start_typing,
                )
            else:
                # --- Text-only path: normal chat ---
                if not content:
                    content = "(just pinged me)" if not is_proactive else message.content

                # If a mention scan is running, await it so we can include findings
                scan_summary = ""
                if self.bot.user in message.mentions:
                    sentiment_cog = self.bot.get_cog("SentimentCog")
                    if sentiment_cog:
                        await _wait_for_task(
                            sentiment_cog._mention_scan_tasks.get(message.channel.id),
                            timeout=45,
                        )
                        scan_summary = sentiment_cog._mention_scan_results.pop(message.id, "")

                # Add drama score context only when noteworthy
                drama_score = self.bot.drama_tracker.get_drama_score(message.author.id)
                user_data = self.bot.drama_tracker.get_user(message.author.id)
                context_parts = [f"#{message.channel.name}"]
                if drama_score >= 0.2:
                    context_parts.append(f"drama score {drama_score:.2f}/1.0")
                if user_data.offense_count > 0:
                    context_parts.append(f"{user_data.offense_count} offense(s)")
                score_context = f"[Server context: {message.author.display_name} — {', '.join(context_parts)}]"

                # Gather memory context and recent messages for richer context
                extra_context = ""
                memory_context = await self._build_memory_context(
                    message.author.id, content, message.channel.name,
                )
                if memory_context:
                    extra_context += memory_context + "\n"

                # Include mention scan findings if available
                if scan_summary:
                    extra_context += f"[You just scanned recent chat. Results: {scan_summary}]\n"

                # When @mentioned, fetch recent channel conversation (all users)
                # so the bot has full context of what's being discussed.
                # For proactive/reply-to-bot, just fetch the mentioner's messages.
                fetch_all_users = self.bot.user in message.mentions
                recent_msgs = await self._collect_recent_messages(message, fetch_all_users)

                if recent_msgs:
                    # History is newest-first; present it oldest-first.
                    recent_lines = "\n".join(
                        f"- {name}: {text}" for name, text in reversed(recent_msgs)
                    )
                    label = "Recent conversation" if fetch_all_users else f"{message.author.display_name}'s recent messages"
                    extra_context += f"[{label}:\n{recent_lines}]\n"

                history.append(
                    {"role": "user", "content": f"{score_context}\n{extra_context}{reply_context}{message.author.display_name}: {content}"}
                )

                active_prompt = self._get_active_prompt()

                # Collect recent bot replies so the LLM can avoid repeating itself
                recent_bot_replies = [
                    m["content"][:150] for m in history
                    if m["role"] == "assistant"
                ][-5:]

                response = await self.bot.llm_chat.chat(
                    list(history),
                    active_prompt,
                    on_first_token=start_typing,
                    recent_bot_replies=recent_bot_replies,
                )
        finally:
            # Always stop the typing indicator, even if the LLM call raised.
            if typing_ctx:
                await typing_ctx.__aexit__(None, None, None)

        # Strip leaked metadata the LLM may echo back.
        # The LLM often dumps paraphrased context and style labels in [brackets]
        # before/between its actual answer. Split on those bracket lines and
        # keep only the last non-empty segment — the real roast is always last.
        response = _strip_leaked_metadata(response)

        if not response:
            log_channel = discord.utils.get(message.guild.text_channels, name="bcs-log")
            if log_channel:
                try:
                    await log_channel.send(
                        f"**LLM OFFLINE** | Failed to generate reply to "
                        f"{message.author.mention} in #{message.channel.name}"
                    )
                except discord.HTTPException:
                    pass
            logger.warning("LLM returned no response for %s in #%s", message.author, message.channel.name)
            return

        if not image_attachment:
            history.append({"role": "assistant", "content": response})

        # Reset proactive cooldown counter for this channel
        if is_proactive:
            self._messages_since_reply[ch_id] = 0

        # Wait for any pending sentiment analysis to finish first so
        # warnings/mutes appear before the chat reply
        sentiment_cog = self.bot.get_cog("SentimentCog")
        if sentiment_cog:
            await _wait_for_task(
                sentiment_cog._debounce_tasks.get(message.channel.id),
                timeout=15,
            )

        await message.reply(response, mention_author=False)

        # Fire-and-forget memory extraction. Keep a strong reference until the
        # task completes so it cannot be garbage-collected while running.
        if not image_attachment:
            extraction = asyncio.create_task(self._extract_and_save_memories(
                message.author.id,
                message.author.display_name,
                list(history),
            ))
            self._background_tasks.add(extraction)
            extraction.add_done_callback(self._background_tasks.discard)

        reply_type = "proactive" if is_proactive else "chat"
        logger.info(
            "%s reply in #%s to %s: %s",
            reply_type.capitalize(),
            message.channel.name,
            message.author.display_name,
            response[:100],
        )

    @commands.Cog.listener()
    async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent):
        """Sometimes answer when a guild member reacts to one of the bot's messages.

        Roughly half of the reactions are ignored at random. Reactions by the
        bot itself, outside a guild, from members not in the cache, or on
        messages not authored by the bot are ignored as well.
        """
        # Ignore bot's own reactions
        if payload.user_id == self.bot.user.id:
            return

        # 50% chance to reply
        if random.random() > 0.50:
            return

        channel = self.bot.get_channel(payload.channel_id)
        if channel is None:
            return

        # Resolve the reacting member from cache first; these checks need no
        # API call, so do them before fetching the message.
        guild = self.bot.get_guild(payload.guild_id) if payload.guild_id else None
        if guild is None:
            return
        member = guild.get_member(payload.user_id)
        if member is None:
            return

        # Only react to reactions on the bot's own messages
        try:
            message = await channel.fetch_message(payload.message_id)
        except discord.HTTPException:
            return
        if message.author.id != self.bot.user.id:
            return

        emoji = str(payload.emoji)

        # Build a one-shot prompt for the LLM
        history = self._get_history(channel.id)

        context = (
            f"[Server context: {member.display_name} — #{channel.name}]\n"
            f"[{member.display_name} reacted to your message with {emoji}]\n"
            f"[Your message was: {message.content[:300]}]\n"
            f"{member.display_name}: *reacted {emoji}*"
        )

        history.append({"role": "user", "content": context})
        active_prompt = self._get_active_prompt()

        recent_bot_replies = [
            m["content"][:150] for m in history
            if m["role"] == "assistant"
        ][-5:]

        response = await self.bot.llm_chat.chat(
            list(history),
            active_prompt,
            recent_bot_replies=recent_bot_replies,
        )

        # Strip leaked metadata (same approach as main chat path)
        response = _strip_leaked_metadata(response)
        if not response:
            return

        history.append({"role": "assistant", "content": response})

        await channel.send(response)
        logger.info(
            "Reaction reply in #%s to %s (%s): %s",
            channel.name,
            member.display_name,
            emoji,
            response[:100],
        )


async def setup(bot: commands.Bot):
    """Extension entry point: register :class:`ChatCog` on *bot*."""
    await bot.add_cog(ChatCog(bot))