Send as a channel message instead of message.reply() so it doesn't look like the bot is talking to itself. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
317 lines
12 KiB
Python
317 lines
12 KiB
Python
import asyncio
|
|
import logging
|
|
import random
|
|
import re
|
|
from collections import deque
|
|
from pathlib import Path
|
|
|
|
import discord
|
|
from discord.ext import commands
|
|
|
|
# Module logger; the name is fixed rather than derived from __name__.
logger = logging.getLogger("bcs.chat")

# Prompt files live in a "prompts" directory two levels above this file.
_PROMPTS_DIR = Path(__file__).resolve().parent.parent / "prompts"

# Prompt text handed to the image-analysis call. Read once, at import time,
# so a missing file fails the import rather than the first image request.
IMAGE_ROAST = (_PROMPTS_DIR / "scoreboard_roast.txt").read_text(encoding="utf-8")

# Attachment filename extensions (compared lower-cased) treated as images.
_IMAGE_TYPES = {
    "png",
    "jpg",
    "jpeg",
    "gif",
    "webp",
}

# Cache loaded prompt files so we don't re-read on every message
_prompt_cache: dict[str, str] = {}
|
|
|
|
|
|
def _load_prompt(filename: str) -> str:
    """Return the text of a prompt file, reading it from disk at most once.

    Args:
        filename: File name relative to ``_PROMPTS_DIR``.

    Returns:
        The file's contents decoded as UTF-8. The text is memoised in
        ``_prompt_cache``, so later edits to the file are not picked up.

    Raises:
        OSError: If the file cannot be read on the first (uncached) call.
    """
    cached_text = _prompt_cache.get(filename)
    if cached_text is not None:
        return cached_text

    prompt_path = _PROMPTS_DIR / filename
    prompt_text = prompt_path.read_text(encoding="utf-8")
    _prompt_cache[filename] = prompt_text
    return prompt_text
|
|
|
|
|
|
class ChatCog(commands.Cog):
    """Chat cog: answers mentions, replies, proactive rolls and reactions.

    Both listeners share one bounded conversation history per channel, which
    is sent to the LLM together with the prompt of the currently active mode.
    """

    # Discord rejects message content longer than 2000 characters, so replies
    # are clipped to this length before they are stored or sent.
    _MAX_REPLY_CHARS = 2000

    # Number of history entries (user and assistant turns) kept per channel.
    _HISTORY_LEN = 10

    # Bracketed context markers this cog injects into the history. The LLM
    # sometimes echoes them back, so they are removed from every reply. The
    # history is shared by both listeners, hence one combined pattern list.
    _LEAKED_METADATA_PATTERNS = (
        re.compile(r"\[Server context:[^\]]*\]\n?"),
        re.compile(r"\[Replying to bot's message:[^\]]*\]\n?"),
        re.compile(r"\[\w[\w ]* said:[^\]]*\]\n?"),
        re.compile(r"\[.*?reacted to your message.*?\]\n?"),
        re.compile(r"\[Your message was:.*?\]\n?"),
        # Catch reformatted metadata (LLM drops prefix but keeps content)
        re.compile(
            r"\[[^\]]*#[a-z-]+[^\]]*(?:drama score|offense)[^\]]*\]\n?",
            re.IGNORECASE,
        ),
    )

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        # Per-channel conversation history for the bot: {channel_id: deque of {role, content}}
        self._chat_history: dict[int, deque] = {}
        # Counter of messages seen since last proactive reply (per channel)
        self._messages_since_reply: dict[int, int] = {}

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _get_active_prompt(self) -> str:
        """Load the chat prompt for the current mode."""
        mode_config = self.bot.get_mode_config()
        prompt_file = mode_config.get("prompt_file", "chat_personality.txt")
        return _load_prompt(prompt_file)

    def _history_for(self, channel_id: int) -> deque:
        """Return the history deque for a channel, creating it on first use."""
        history = self._chat_history.get(channel_id)
        if history is None:
            history = deque(maxlen=self._HISTORY_LEN)
            self._chat_history[channel_id] = history
        return history

    @classmethod
    def _strip_leaked_metadata(cls, text):
        """Remove echoed context markers from an LLM reply and trim whitespace.

        Falsy input (``None`` or ``""``) is returned unchanged so callers can
        keep treating it as "no response".
        """
        if not text:
            return text
        for pattern in cls._LEAKED_METADATA_PATTERNS:
            text = pattern.sub("", text)
        return text.strip()

    @staticmethod
    def _first_image(attachments):
        """Return the first attachment whose extension is in ``_IMAGE_TYPES``, else None."""
        for attachment in attachments:
            _, dot, extension = attachment.filename.rpartition(".")
            if dot and extension.lower() in _IMAGE_TYPES:
                return attachment
        return None

    def _roll_proactive_reply(self, message: discord.Message) -> bool:
        """Count this message and decide whether to reply to it unprompted.

        Returns False when the active mode disables proactive replies.
        Otherwise the channel's counter is incremented, and True is returned
        only if the cooldown has elapsed, the random roll succeeds and the
        message has non-blank text.
        """
        mode_config = self.bot.get_mode_config()
        if not mode_config.get("proactive_replies", False):
            return False

        ch_id = message.channel.id
        seen = self._messages_since_reply.get(ch_id, 0) + 1
        self._messages_since_reply[ch_id] = seen

        cooldown = self.bot.config.get("modes", {}).get("proactive_cooldown_messages", 5)
        reply_chance = mode_config.get("reply_chance", 0.0)
        return bool(
            seen >= cooldown
            and reply_chance > 0
            and random.random() < reply_chance
            and message.content
            and message.content.strip()
        )

    def _server_context(self, message: discord.Message) -> str:
        """Build the ``[Server context: ...]`` line for the message author.

        The drama score and offense count are only included when noteworthy
        (score >= 0.2, at least one offense).
        """
        author = message.author
        drama_score = self.bot.drama_tracker.get_drama_score(author.id)
        user_data = self.bot.drama_tracker.get_user(author.id)

        context_parts = [f"#{message.channel.name}"]
        if drama_score >= 0.2:
            context_parts.append(f"drama score {drama_score:.2f}/1.0")
        if user_data.offense_count > 0:
            context_parts.append(f"{user_data.offense_count} offense(s)")
        return f"[Server context: {author.display_name} — {', '.join(context_parts)}]"

    async def _report_llm_failure(self, message: discord.Message) -> None:
        """Note a missing LLM reply in the ``bcs-log`` channel (if any) and the log."""
        log_channel = discord.utils.get(message.guild.text_channels, name="bcs-log")
        if log_channel:
            try:
                await log_channel.send(
                    f"**LLM OFFLINE** | Failed to generate reply to "
                    f"{message.author.mention} in #{message.channel.name}"
                )
            except discord.HTTPException:
                # Best effort only: the warning below still records the failure.
                pass
        logger.warning("LLM returned no response for %s in #%s", message.author, message.channel.name)

    async def _await_pending_sentiment(self, message: discord.Message) -> None:
        """Give a pending sentiment task for this author up to 15 seconds to finish.

        Waiting lets warnings/mutes appear before the chat reply. The task is
        shielded so that the timeout here does not cancel it.
        """
        sentiment_cog = self.bot.get_cog("SentimentCog")
        if not sentiment_cog:
            return
        key = (message.channel.id, message.author.id)
        task = sentiment_cog._debounce_tasks.get(key)
        if task and not task.done():
            try:
                await asyncio.wait_for(asyncio.shield(task), timeout=15)
            except (asyncio.TimeoutError, asyncio.CancelledError):
                pass

    # ------------------------------------------------------------------
    # Listeners
    # ------------------------------------------------------------------

    @commands.Cog.listener()
    async def on_message(self, message: discord.Message):
        """Reply to a guild message when the bot is addressed or rolls a proactive reply.

        A reply is produced when the bot is @mentioned, when the message is a
        reply to one of the bot's own messages, or when the proactive roll
        succeeds. An image on the message (or on the message it replies to)
        switches to the image-roast path; otherwise the text chat path is used.
        """
        if message.author.bot or not message.guild:
            return

        bot_id = self.bot.user.id
        should_reply = self.bot.user in message.mentions
        is_proactive = False
        reply_context = ""  # Text of the message being replied to

        # Resolve the referenced message once; it is reused for the image lookup.
        ref_msg = None
        reference = message.reference
        if reference and reference.message_id:
            try:
                ref_msg = reference.cached_message
                if ref_msg is None:
                    ref_msg = await message.channel.fetch_message(reference.message_id)
            except discord.HTTPException:
                ref_msg = None

        if ref_msg is not None:
            if ref_msg.author.id == bot_id:
                # Replying to the bot's own message — continue conversation
                should_reply = True
                if ref_msg.content:
                    reply_context = f"[Replying to bot's message: {ref_msg.content[:300]}]\n"
            elif should_reply:
                # @mentioned the bot while replying to someone else — include that message
                ref_text = ref_msg.content[:500] if ref_msg.content else "(no text)"
                reply_context = f"[{ref_msg.author.display_name} said: {ref_text}]\n"

        # Proactive reply check (only if not already replying to a mention/reply)
        if not should_reply:
            is_proactive = self._roll_proactive_reply(message)
            should_reply = is_proactive

        if not should_reply:
            return

        # Build conversation context
        ch_id = message.channel.id
        history = self._history_for(ch_id)

        # Clean the mention out of the message content. Both the plain and the
        # nickname ("<@!id>") mention forms are removed.
        content = message.content
        for mention in (f"<@{bot_id}>", f"<@!{bot_id}>"):
            content = content.replace(mention, "")
        content = content.strip()

        # Check for image attachments (on this message or the referenced message)
        image_attachment = self._first_image(message.attachments)
        if image_attachment is None and ref_msg is not None:
            image_attachment = self._first_image(ref_msg.attachments)

        typing_ctx = None

        async def start_typing():
            # Called by the LLM client on its first token; enter at most once and
            # only remember the context once it has actually been entered.
            nonlocal typing_ctx
            if typing_ctx is None:
                ctx = message.channel.typing()
                await ctx.__aenter__()
                typing_ctx = ctx

        try:
            if image_attachment:
                # --- Image path: roast the image ---
                image_bytes = await image_attachment.read()
                user_text = content or ""
                logger.info(
                    "Image roast request in #%s from %s (%s, %s)",
                    message.channel.name,
                    message.author.display_name,
                    image_attachment.filename,
                    user_text[:80],
                )
                response = await self.bot.llm_heavy.analyze_image(
                    image_bytes,
                    IMAGE_ROAST,
                    user_text=user_text,
                    on_first_token=start_typing,
                )
            else:
                # --- Text-only path: normal chat ---
                if not content:
                    content = message.content if is_proactive else "(just pinged me)"

                score_context = self._server_context(message)
                history.append(
                    {
                        "role": "user",
                        "content": f"{score_context}\n{reply_context}{message.author.display_name}: {content}",
                    }
                )

                active_prompt = self._get_active_prompt()
                response = await self.bot.llm.chat(
                    list(history),
                    active_prompt,
                    on_first_token=start_typing,
                )
        finally:
            # Always stop the typing indicator, even when the LLM call raised;
            # otherwise it would keep running after the handler has failed.
            if typing_ctx is not None:
                await typing_ctx.__aexit__(None, None, None)

        # Strip leaked metadata the LLM may echo back
        response = self._strip_leaked_metadata(response)

        if not response:
            await self._report_llm_failure(message)
            return

        # Clip to Discord's content limit so the send below cannot be rejected
        # for length after the reply has already been written to the history.
        response = response[: self._MAX_REPLY_CHARS]

        if not image_attachment:
            history.append({"role": "assistant", "content": response})

        # Reset proactive cooldown counter for this channel
        if is_proactive:
            self._messages_since_reply[ch_id] = 0

        # Wait for any pending sentiment analysis to finish first so
        # warnings/mutes appear before the chat reply
        await self._await_pending_sentiment(message)

        if is_proactive:
            await message.channel.send(response)
        else:
            await message.reply(response, mention_author=False)

        logger.info(
            "%s reply in #%s to %s: %s",
            "Proactive" if is_proactive else "Chat",
            message.channel.name,
            message.author.display_name,
            response[:100],
        )

    @commands.Cog.listener()
    async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent):
        """Sometimes answer when a human reacts to one of the bot's messages.

        Roughly half of all eligible reactions are ignored at random. The reply
        is sent as a plain channel message (not a Discord reply), and the
        exchange is recorded in the channel's shared history.
        """
        # Ignore bot's own reactions
        if payload.user_id == self.bot.user.id:
            return

        # 50% chance to reply
        if random.random() > 0.50:
            return

        channel = self.bot.get_channel(payload.channel_id)
        if channel is None:
            return

        # Resolve the reacting member from cache before the network fetch below.
        guild = self.bot.get_guild(payload.guild_id) if payload.guild_id else None
        if guild is None:
            return
        # payload.member is supplied for guild reaction-add events; fall back to
        # the member cache when it is absent.
        member = payload.member or guild.get_member(payload.user_id)
        if member is None or member.bot:
            # Unknown user, or another bot: on_message ignores bots as well.
            return

        # Only react to reactions on the bot's own messages
        try:
            message = await channel.fetch_message(payload.message_id)
        except discord.HTTPException:
            return
        if message.author.id != self.bot.user.id:
            return

        emoji = str(payload.emoji)

        # Build a one-shot prompt for the LLM
        history = self._history_for(channel.id)
        context = (
            f"[Server context: {member.display_name} — #{channel.name}]\n"
            f"[{member.display_name} reacted to your message with {emoji}]\n"
            f"[Your message was: {message.content[:300]}]\n"
            f"{member.display_name}: *reacted {emoji}*"
        )
        history.append({"role": "user", "content": context})

        active_prompt = self._get_active_prompt()
        response = await self.bot.llm.chat(
            list(history),
            active_prompt,
        )

        # Strip leaked metadata
        response = self._strip_leaked_metadata(response)
        if not response:
            return

        # Clip to Discord's content limit before storing and sending.
        response = response[: self._MAX_REPLY_CHARS]
        history.append({"role": "assistant", "content": response})

        await channel.send(response)
        logger.info(
            "Reaction reply in #%s to %s (%s): %s",
            channel.name,
            member.display_name,
            emoji,
            response[:100],
        )
|
|
|
|
|
|
async def setup(bot: commands.Bot):
    """Create a ``ChatCog`` bound to ``bot`` and register it on the bot.

    NOTE(review): presumably invoked by discord.py's extension loader —
    confirm against how this module is loaded.
    """
    chat_cog = ChatCog(bot)
    await bot.add_cog(chat_cog)
|