# Changelog:
# - Parse display names with ': ' split to handle colons in names
# - Reset cooldown to half instead of subtract-3 to reduce LLM call frequency
# - Remove redundant message.guild check
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
import asyncio
import logging
import random
import re
from collections import deque
from datetime import datetime, timedelta, timezone
from pathlib import Path

import discord
from discord.ext import commands
|
|
|
|
# Module-level logger for the chat cog.
logger = logging.getLogger("bcs.chat")

# Directory of prompt text files, resolved relative to this package.
_PROMPTS_DIR = Path(__file__).resolve().parent.parent / "prompts"

# Image-roast prompt; read eagerly at import time (raises if the file is missing).
IMAGE_ROAST = (_PROMPTS_DIR / "scoreboard_roast.txt").read_text(encoding="utf-8")

# Attachment file extensions treated as roastable images.
_IMAGE_TYPES = {"png", "jpg", "jpeg", "gif", "webp"}

# Cache loaded prompt files so we don't re-read on every message
_prompt_cache: dict[str, str] = {}
|
|
|
|
|
|
def _load_prompt(filename: str) -> str:
    """Return the contents of a prompt file, reading it from disk at most once."""
    try:
        return _prompt_cache[filename]
    except KeyError:
        text = (_PROMPTS_DIR / filename).read_text(encoding="utf-8")
        _prompt_cache[filename] = text
        return text
|
|
|
|
|
|
# Words that mark a message as being "about" a trackable topic (games and
# common life subjects); used to match stored memories against conversation.
_TOPIC_KEYWORDS = {
    "gta", "warzone", "cod", "battlefield", "fortnite", "apex", "valorant",
    "minecraft", "roblox", "league", "dota", "overwatch", "destiny", "halo",
    "work", "job", "school", "college", "girlfriend", "boyfriend", "wife",
    "husband", "dog", "cat", "pet", "car", "music", "movie", "food",
}

# Channel names too generic to be useful as topic keywords on their own.
_GENERIC_CHANNELS = {"general", "off-topic", "memes"}
|
|
|
|
|
|
def _extract_topic_keywords(text: str, channel_name: str) -> list[str]:
    """Extract up to five topic keywords from message text and channel name."""
    channel = channel_name.lower()
    matched = {word for word in text.lower().split() if word in _TOPIC_KEYWORDS}
    # A specific channel name is itself a useful topic hint.
    if channel not in _GENERIC_CHANNELS:
        matched.add(channel)
    return list(matched)[:5]
|
|
|
|
|
|
def _format_relative_time(dt: datetime) -> str:
|
|
"""Return a human-readable relative time string."""
|
|
now = datetime.now(timezone.utc)
|
|
# Ensure dt is timezone-aware
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
delta = now - dt
|
|
seconds = int(delta.total_seconds())
|
|
if seconds < 60:
|
|
return "just now"
|
|
minutes = seconds // 60
|
|
if minutes < 60:
|
|
return f"{minutes}m ago"
|
|
hours = minutes // 60
|
|
if hours < 24:
|
|
return f"{hours}h ago"
|
|
days = hours // 24
|
|
if days == 1:
|
|
return "yesterday"
|
|
if days < 7:
|
|
return f"{days} days ago"
|
|
weeks = days // 7
|
|
if weeks < 5:
|
|
return f"{weeks}w ago"
|
|
months = days // 30
|
|
return f"{months}mo ago"
|
|
|
|
|
|
class ChatCog(commands.Cog):
    """Conversational cog: replies to mentions, replies-to-bot, reactions, and
    (optionally) proactively joins relevant conversations."""

    @staticmethod
    def _split_afterthought(response: str) -> tuple[str, str | None]:
        """Split a response on ||| into (main_reply, afterthought)."""
        head, sep, tail = response.partition("|||")
        # No delimiter at all: the whole response is the main reply.
        if not sep:
            return response, None
        head = head.strip()
        # Delimiter at the very start: treat the raw response as the reply.
        if not head:
            return response, None
        return head, (tail.strip() or None)
|
|
|
|
def __init__(self, bot: commands.Bot):
|
|
self.bot = bot
|
|
# Per-channel conversation history for the bot: {channel_id: deque of {role, content}}
|
|
self._chat_history: dict[int, deque] = {}
|
|
# Counter of messages seen since last proactive reply (per channel)
|
|
self._messages_since_reply: dict[int, int] = {}
|
|
# Users whose profile has been updated and needs DB flush
|
|
self._dirty_users: set[int] = set()
|
|
|
|
def _get_active_prompt(self) -> str:
|
|
"""Load the chat prompt for the current mode."""
|
|
mode_config = self.bot.get_mode_config()
|
|
prompt_file = mode_config.get("prompt_file", "personalities/chat_personality.txt")
|
|
return _load_prompt(prompt_file)
|
|
|
|
async def _build_memory_context(self, user_id: int, message_text: str, channel_name: str) -> str:
|
|
"""Build a layered memory context block for the chat prompt."""
|
|
lines = []
|
|
|
|
# Layer 1: Profile (always)
|
|
profile = self.bot.drama_tracker.get_user_notes(user_id)
|
|
if profile:
|
|
lines.append(f"Profile: {profile}")
|
|
|
|
# Layer 2: Recent memories (last 5)
|
|
recent_memories = await self.bot.db.get_recent_memories(user_id, limit=5)
|
|
if recent_memories:
|
|
parts = []
|
|
for mem in recent_memories:
|
|
time_str = _format_relative_time(mem["created_at"])
|
|
parts.append(f"{mem['memory']} ({time_str})")
|
|
lines.append("Recent: " + " | ".join(parts))
|
|
|
|
# Layer 3: Topic-matched memories (deduplicated against recent)
|
|
keywords = _extract_topic_keywords(message_text, channel_name)
|
|
if keywords:
|
|
topic_memories = await self.bot.db.get_memories_by_topics(user_id, keywords, limit=5)
|
|
# Deduplicate against recent memories
|
|
recent_texts = {mem["memory"] for mem in recent_memories} if recent_memories else set()
|
|
unique_topic = [mem for mem in topic_memories if mem["memory"] not in recent_texts]
|
|
if unique_topic:
|
|
parts = []
|
|
for mem in unique_topic:
|
|
time_str = _format_relative_time(mem["created_at"])
|
|
parts.append(f"{mem['memory']} ({time_str})")
|
|
lines.append("Relevant: " + " | ".join(parts))
|
|
|
|
if not lines:
|
|
return ""
|
|
|
|
return "[What you know about this person:]\n" + "\n".join(lines)
|
|
|
|
async def _extract_and_save_memories(
|
|
self, user_id: int, username: str, conversation: list[dict[str, str]],
|
|
) -> None:
|
|
"""Background task: extract memories from conversation and save them."""
|
|
try:
|
|
current_profile = self.bot.drama_tracker.get_user_notes(user_id)
|
|
result = await self.bot.llm.extract_memories(
|
|
conversation, username, current_profile,
|
|
)
|
|
if not result:
|
|
return
|
|
|
|
# Save expiring memories
|
|
for mem in result.get("memories", []):
|
|
if mem["expiration"] == "permanent":
|
|
continue # permanent facts go into profile_update
|
|
exp_days = {"1d": 1, "3d": 3, "7d": 7, "30d": 30}
|
|
days = exp_days.get(mem["expiration"], 7)
|
|
expires_at = datetime.now(timezone.utc) + timedelta(days=days)
|
|
await self.bot.db.save_memory(
|
|
user_id=user_id,
|
|
memory=mem["memory"],
|
|
topics=",".join(mem["topics"]),
|
|
importance=mem["importance"],
|
|
expires_at=expires_at,
|
|
source="chat",
|
|
)
|
|
# Prune if over cap
|
|
await self.bot.db.prune_excess_memories(user_id)
|
|
|
|
# Update profile if warranted
|
|
profile_update = result.get("profile_update")
|
|
if profile_update:
|
|
self.bot.drama_tracker.set_user_profile(user_id, profile_update)
|
|
self._dirty_users.add(user_id)
|
|
|
|
logger.info(
|
|
"Extracted %d memories for %s (profile_update=%s)",
|
|
len(result.get("memories", [])),
|
|
username,
|
|
bool(profile_update),
|
|
)
|
|
except Exception:
|
|
logger.exception("Failed to extract memories for %s", username)
|
|
|
|
    @commands.Cog.listener()
    async def on_message(self, message: discord.Message):
        """Decide whether and how to reply to a guild message.

        Reply triggers: an @mention of the bot, a reply to one of the bot's
        own messages, or (when the mode enables proactive replies) an LLM
        relevance check gated by a random chance. Replies go through either
        an image-roast path or a text chat path with layered memory context.
        """
        # Ignore other bots and DMs entirely.
        if message.author.bot:
            return

        if not message.guild:
            return

        should_reply = False
        is_proactive = False
        reply_context = ""  # Text of the message being replied to

        # Check if bot is @mentioned
        if self.bot.user in message.mentions:
            should_reply = True

        # Check if replying to a message
        if message.reference and message.reference.message_id:
            try:
                # Prefer the cache; fall back to an API fetch.
                ref_msg = message.reference.cached_message
                if ref_msg is None:
                    ref_msg = await message.channel.fetch_message(
                        message.reference.message_id
                    )
                if ref_msg.author.id == self.bot.user.id:
                    # Replying to the bot's own message — continue conversation
                    should_reply = True
                    if ref_msg.content:
                        reply_context = f"[Replying to bot's message: {ref_msg.content[:300]}]\n"
                elif should_reply:
                    # @mentioned the bot while replying to someone else — include that message
                    ref_text = ref_msg.content[:500] if ref_msg.content else "(no text)"
                    reply_context = f"[{ref_msg.author.display_name} said: {ref_text}]\n"
            except discord.HTTPException:
                # Referenced message deleted/inaccessible — proceed without it.
                pass

        # Proactive reply check (only if not already replying to a mention/reply)
        if not should_reply:
            mode_config = self.bot.get_mode_config()
            if mode_config.get("proactive_replies", False):
                ch_id = message.channel.id
                self._messages_since_reply[ch_id] = self._messages_since_reply.get(ch_id, 0) + 1
                cooldown = self.bot.config.get("modes", {}).get("proactive_cooldown_messages", 5)

                # Only consider replying after the per-channel cooldown elapses
                # and the message actually has text.
                if (
                    self._messages_since_reply[ch_id] >= cooldown
                    and message.content and message.content.strip()
                ):
                    # Gather recent messages for relevance check
                    recent_for_check = []
                    try:
                        async for msg in message.channel.history(limit=5, before=message):
                            if msg.content and msg.content.strip() and not msg.author.bot:
                                recent_for_check.append(
                                    f"{msg.author.display_name}: {msg.content[:200]}"
                                )
                    except discord.HTTPException:
                        pass
                    # History iterates newest-first; restore chronological order.
                    recent_for_check.reverse()
                    recent_for_check.append(
                        f"{message.author.display_name}: {message.content[:200]}"
                    )

                    # Build memory context for users in recent messages.
                    # Lines are "display_name: text"; split on ': ' so colons
                    # inside the display name don't truncate it.
                    memory_parts = []
                    seen_users = set()
                    for line in recent_for_check:
                        name = line.split(": ", 1)[0]
                        if name not in seen_users:
                            seen_users.add(name)
                            # NOTE(review): display names are not unique in a
                            # guild — this picks the first match; confirm OK.
                            member = discord.utils.find(
                                lambda m, n=name: m.display_name == n,
                                message.guild.members,
                            )
                            if member:
                                profile = self.bot.drama_tracker.get_user_notes(member.id)
                                if profile:
                                    memory_parts.append(f"{name}: {profile}")

                    memory_ctx = "\n".join(memory_parts) if memory_parts else ""

                    is_relevant = await self.bot.llm.check_reply_relevance(
                        recent_for_check, memory_ctx,
                    )

                    if is_relevant:
                        reply_chance = mode_config.get("reply_chance", 0.0)
                        if reply_chance > 0 and random.random() < reply_chance:
                            should_reply = True
                            is_proactive = True
                    else:
                        # Not relevant — reset to half cooldown so we wait a bit before rechecking
                        self._messages_since_reply[ch_id] = cooldown // 2

        if not should_reply:
            return

        # Build conversation context
        ch_id = message.channel.id
        if ch_id not in self._chat_history:
            self._chat_history[ch_id] = deque(maxlen=10)

        # Clean the mention out of the message content
        content = message.content.replace(f"<@{self.bot.user.id}>", "").strip()

        # Check for image attachments (on this message or the referenced message)
        image_attachment = None
        for att in message.attachments:
            ext = att.filename.rsplit(".", 1)[-1].lower() if "." in att.filename else ""
            if ext in _IMAGE_TYPES:
                image_attachment = att
                break
        if not image_attachment and message.reference:
            try:
                ref = message.reference.cached_message or await message.channel.fetch_message(
                    message.reference.message_id
                )
                for att in ref.attachments:
                    ext = att.filename.rsplit(".", 1)[-1].lower() if "." in att.filename else ""
                    if ext in _IMAGE_TYPES:
                        image_attachment = att
                        break
            except discord.HTTPException:
                pass

        # Typing indicator is started lazily on the LLM's first token so we
        # don't show "typing…" while waiting for the model to respond.
        # NOTE(review): if the LLM call below raises, __aexit__ is never
        # called on typing_ctx — confirm upstream error handling covers this.
        typing_ctx = None

        async def start_typing():
            nonlocal typing_ctx
            typing_ctx = message.channel.typing()
            await typing_ctx.__aenter__()

        if image_attachment:
            # --- Image path: roast the image ---
            image_bytes = await image_attachment.read()
            user_text = content if content else ""
            logger.info(
                "Image roast request in #%s from %s (%s, %s)",
                message.channel.name,
                message.author.display_name,
                image_attachment.filename,
                user_text[:80],
            )
            # Derive a MIME type from the extension ("jpg" maps to image/jpeg).
            ext = image_attachment.filename.rsplit(".", 1)[-1].lower() if "." in image_attachment.filename else "png"
            mime = f"image/{'jpeg' if ext == 'jpg' else ext}"
            response = await self.bot.llm_heavy.analyze_image(
                image_bytes,
                IMAGE_ROAST,
                user_text=user_text,
                on_first_token=start_typing,
                media_type=mime,
            )
        else:
            # --- Text-only path: normal chat ---
            if not content:
                content = "(just pinged me)" if not is_proactive else message.content

            # If a mention scan is running, await it so we can include findings
            scan_summary = ""
            if self.bot.user in message.mentions:
                sentiment_cog = self.bot.get_cog("SentimentCog")
                if sentiment_cog:
                    task = sentiment_cog._mention_scan_tasks.get(message.channel.id)
                    if task and not task.done():
                        try:
                            # shield() keeps the scan alive even if we time out.
                            await asyncio.wait_for(asyncio.shield(task), timeout=45)
                        except (asyncio.TimeoutError, asyncio.CancelledError):
                            pass
                    scan_summary = sentiment_cog._mention_scan_results.pop(message.id, "")

            # Add drama score context only when noteworthy
            drama_score = self.bot.drama_tracker.get_drama_score(message.author.id)
            user_data = self.bot.drama_tracker.get_user(message.author.id)
            context_parts = [f"#{message.channel.name}"]
            if drama_score >= 0.2:
                context_parts.append(f"drama score {drama_score:.2f}/1.0")
            if user_data.offense_count > 0:
                context_parts.append(f"{user_data.offense_count} offense(s)")
            score_context = f"[Server context: {message.author.display_name} — {', '.join(context_parts)}]"

            # Gather memory context and recent messages for richer context
            extra_context = ""
            memory_context = await self._build_memory_context(
                message.author.id, content, message.channel.name,
            )
            if memory_context:
                extra_context += memory_context + "\n"

            # Include mention scan findings if available
            if scan_summary:
                extra_context += f"[You just scanned recent chat. Results: {scan_summary}]\n"

            # When @mentioned, fetch recent channel conversation (all users)
            # so the bot has full context of what's being discussed.
            # For proactive/reply-to-bot, just fetch the mentioner's messages.
            recent_msgs = []
            fetch_all_users = self.bot.user in message.mentions
            try:
                async for msg in message.channel.history(limit=50, before=message):
                    if not msg.content or not msg.content.strip():
                        continue
                    if msg.author.bot:
                        # Include bot's own replies for conversational continuity
                        if msg.author.id == self.bot.user.id:
                            recent_msgs.append((msg.author.display_name, msg.content[:200]))
                            if len(recent_msgs) >= 15:
                                break
                        continue
                    if fetch_all_users or msg.author.id == message.author.id:
                        recent_msgs.append((msg.author.display_name, msg.content[:200]))
                        if len(recent_msgs) >= 15:
                            break
            except discord.HTTPException:
                pass
            if recent_msgs:
                # History came newest-first; present oldest-first to the LLM.
                recent_lines = "\n".join(
                    f"- {name}: {text}" for name, text in reversed(recent_msgs)
                )
                label = "Recent conversation" if fetch_all_users else f"{message.author.display_name}'s recent messages"
                extra_context += f"[{label}:\n{recent_lines}]\n"

            self._chat_history[ch_id].append(
                {"role": "user", "content": f"{score_context}\n{extra_context}{reply_context}{message.author.display_name}: {content}"}
            )

            active_prompt = self._get_active_prompt()

            # Collect recent bot replies so the LLM can avoid repeating itself
            recent_bot_replies = [
                m["content"][:150] for m in self._chat_history[ch_id]
                if m["role"] == "assistant"
            ][-5:]

            response = await self.bot.llm_chat.chat(
                list(self._chat_history[ch_id]),
                active_prompt,
                on_first_token=start_typing,
                recent_bot_replies=recent_bot_replies,
            )

        # Stop the typing indicator if start_typing was ever invoked.
        if typing_ctx:
            await typing_ctx.__aexit__(None, None, None)

        # Strip leaked metadata the LLM may echo back.
        # The LLM often dumps paraphrased context and style labels in [brackets]
        # before/between its actual answer. Split on those bracket lines and
        # keep only the last non-empty segment — the real roast is always last.
        if response:
            segments = re.split(r"^\s*\[[^\]]*\]\s*$", response, flags=re.MULTILINE)
            segments = [s.strip() for s in segments if s.strip()]
            response = segments[-1] if segments else ""

        # Empty response means the LLM backend failed — report and bail.
        if not response:
            log_channel = discord.utils.get(message.guild.text_channels, name="bcs-log")
            if log_channel:
                try:
                    await log_channel.send(
                        f"**LLM OFFLINE** | Failed to generate reply to "
                        f"{message.author.mention} in #{message.channel.name}"
                    )
                except discord.HTTPException:
                    pass
            logger.warning("LLM returned no response for %s in #%s", message.author, message.channel.name)
            return

        # Split afterthoughts (triple-pipe delimiter)
        main_reply, afterthought = self._split_afterthought(response)

        # Store cleaned content in history (no ||| delimiter)
        if not image_attachment:
            clean_for_history = f"{main_reply}\n{afterthought}" if afterthought else main_reply
            self._chat_history[ch_id].append(
                {"role": "assistant", "content": clean_for_history}
            )

        # Reset proactive cooldown counter for this channel
        if is_proactive:
            self._messages_since_reply[ch_id] = 0

        # Wait for any pending sentiment analysis to finish first so
        # warnings/mutes appear before the chat reply
        sentiment_cog = self.bot.get_cog("SentimentCog")
        if sentiment_cog:
            task = sentiment_cog._debounce_tasks.get(message.channel.id)
            if task and not task.done():
                try:
                    await asyncio.wait_for(asyncio.shield(task), timeout=15)
                except (asyncio.TimeoutError, asyncio.CancelledError):
                    pass

        await message.reply(main_reply, mention_author=False)

        # Send the afterthought as a separate message after a humanlike pause.
        if afterthought:
            await asyncio.sleep(random.uniform(2.0, 5.0))
            await message.channel.send(afterthought)

        # Fire-and-forget memory extraction
        # NOTE(review): the task reference is discarded; asyncio may garbage-
        # collect a pending task — consider keeping a strong reference.
        if not image_attachment:
            asyncio.create_task(self._extract_and_save_memories(
                message.author.id,
                message.author.display_name,
                list(self._chat_history[ch_id]),
            ))

        reply_type = "proactive" if is_proactive else "chat"
        logger.info(
            "%s reply in #%s to %s: %s",
            reply_type.capitalize(),
            message.channel.name,
            message.author.display_name,
            main_reply[:100],
        )
|
|
|
|
|
|
@commands.Cog.listener()
|
|
async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent):
|
|
# Ignore bot's own reactions
|
|
if payload.user_id == self.bot.user.id:
|
|
return
|
|
|
|
# 50% chance to reply
|
|
if random.random() > 0.50:
|
|
return
|
|
|
|
# Only react to reactions on the bot's own messages
|
|
channel = self.bot.get_channel(payload.channel_id)
|
|
if channel is None:
|
|
return
|
|
|
|
try:
|
|
message = await channel.fetch_message(payload.message_id)
|
|
except discord.HTTPException:
|
|
return
|
|
|
|
if message.author.id != self.bot.user.id:
|
|
return
|
|
|
|
# Get the user who reacted
|
|
guild = self.bot.get_guild(payload.guild_id) if payload.guild_id else None
|
|
if guild is None:
|
|
return
|
|
member = guild.get_member(payload.user_id)
|
|
if member is None:
|
|
return
|
|
|
|
emoji = str(payload.emoji)
|
|
|
|
# Build a one-shot prompt for the LLM
|
|
ch_id = channel.id
|
|
if ch_id not in self._chat_history:
|
|
self._chat_history[ch_id] = deque(maxlen=10)
|
|
|
|
context = (
|
|
f"[Server context: {member.display_name} — #{channel.name}]\n"
|
|
f"[{member.display_name} reacted to your message with {emoji}]\n"
|
|
f"[Your message was: {message.content[:300]}]\n"
|
|
f"{member.display_name}: *reacted {emoji}*"
|
|
)
|
|
|
|
self._chat_history[ch_id].append({"role": "user", "content": context})
|
|
active_prompt = self._get_active_prompt()
|
|
|
|
recent_bot_replies = [
|
|
m["content"][:150] for m in self._chat_history[ch_id]
|
|
if m["role"] == "assistant"
|
|
][-5:]
|
|
|
|
response = await self.bot.llm_chat.chat(
|
|
list(self._chat_history[ch_id]),
|
|
active_prompt,
|
|
recent_bot_replies=recent_bot_replies,
|
|
)
|
|
|
|
# Strip leaked metadata (same approach as main chat path)
|
|
if response:
|
|
segments = re.split(r"^\s*\[[^\]]*\]\s*$", response, flags=re.MULTILINE)
|
|
segments = [s.strip() for s in segments if s.strip()]
|
|
response = segments[-1] if segments else ""
|
|
|
|
if not response:
|
|
return
|
|
|
|
main_reply, afterthought = self._split_afterthought(response)
|
|
clean_for_history = f"{main_reply}\n{afterthought}" if afterthought else main_reply
|
|
self._chat_history[ch_id].append({"role": "assistant", "content": clean_for_history})
|
|
|
|
await channel.send(main_reply)
|
|
|
|
if afterthought:
|
|
await asyncio.sleep(random.uniform(2.0, 5.0))
|
|
await channel.send(afterthought)
|
|
|
|
logger.info(
|
|
"Reaction reply in #%s to %s (%s): %s",
|
|
channel.name,
|
|
member.display_name,
|
|
emoji,
|
|
main_reply[:100],
|
|
)
|
|
|
|
|
|
async def setup(bot: commands.Bot):
    """discord.py extension entry point: register the ChatCog."""
    cog = ChatCog(bot)
    await bot.add_cog(cog)
|