Files
Breehavior-Monitor/cogs/chat.py
AJ Isaacs 365907a7a0 feat: extract and save memories after chat conversations
Merge worktree: adds _extract_and_save_memories() method and fire-and-forget
extraction call after each chat reply. Combined with Task 4's memory
retrieval and injection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 13:04:12 -05:00

501 lines
19 KiB
Python

import asyncio
import logging
import random
import re
from collections import deque
from datetime import datetime, timedelta, timezone
from pathlib import Path
import discord
from discord.ext import commands
logger = logging.getLogger("bcs.chat")
_PROMPTS_DIR = Path(__file__).resolve().parent.parent / "prompts"
IMAGE_ROAST = (_PROMPTS_DIR / "scoreboard_roast.txt").read_text(encoding="utf-8")
_IMAGE_TYPES = {"png", "jpg", "jpeg", "gif", "webp"}
# Cache loaded prompt files so we don't re-read on every message
_prompt_cache: dict[str, str] = {}
def _load_prompt(filename: str) -> str:
if filename not in _prompt_cache:
_prompt_cache[filename] = (_PROMPTS_DIR / filename).read_text(encoding="utf-8")
return _prompt_cache[filename]
_TOPIC_KEYWORDS = {
"gta", "warzone", "cod", "battlefield", "fortnite", "apex", "valorant",
"minecraft", "roblox", "league", "dota", "overwatch", "destiny", "halo",
"work", "job", "school", "college", "girlfriend", "boyfriend", "wife",
"husband", "dog", "cat", "pet", "car", "music", "movie", "food",
}
_GENERIC_CHANNELS = {"general", "off-topic", "memes"}
def _extract_topic_keywords(text: str, channel_name: str) -> list[str]:
"""Extract topic keywords from message text and channel name."""
words = set(text.lower().split()) & _TOPIC_KEYWORDS
if channel_name.lower() not in _GENERIC_CHANNELS:
words.add(channel_name.lower())
return list(words)[:5]
def _format_relative_time(dt: datetime) -> str:
"""Return a human-readable relative time string."""
now = datetime.now(timezone.utc)
# Ensure dt is timezone-aware
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
delta = now - dt
seconds = int(delta.total_seconds())
if seconds < 60:
return "just now"
minutes = seconds // 60
if minutes < 60:
return f"{minutes}m ago"
hours = minutes // 60
if hours < 24:
return f"{hours}h ago"
days = hours // 24
if days == 1:
return "yesterday"
if days < 7:
return f"{days} days ago"
weeks = days // 7
if weeks < 5:
return f"{weeks}w ago"
months = days // 30
return f"{months}mo ago"
class ChatCog(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot
# Per-channel conversation history for the bot: {channel_id: deque of {role, content}}
self._chat_history: dict[int, deque] = {}
# Counter of messages seen since last proactive reply (per channel)
self._messages_since_reply: dict[int, int] = {}
# Users whose profile has been updated and needs DB flush
self._dirty_users: set[int] = set()
def _get_active_prompt(self) -> str:
"""Load the chat prompt for the current mode."""
mode_config = self.bot.get_mode_config()
prompt_file = mode_config.get("prompt_file", "chat_personality.txt")
return _load_prompt(prompt_file)
async def _build_memory_context(self, user_id: int, message_text: str, channel_name: str) -> str:
"""Build a layered memory context block for the chat prompt."""
lines = []
# Layer 1: Profile (always)
profile = self.bot.drama_tracker.get_user_notes(user_id)
if profile:
lines.append(f"Profile: {profile}")
# Layer 2: Recent memories (last 5)
recent_memories = await self.bot.db.get_recent_memories(user_id, limit=5)
if recent_memories:
parts = []
for mem in recent_memories:
time_str = _format_relative_time(mem["created_at"])
parts.append(f"{mem['memory']} ({time_str})")
lines.append("Recent: " + " | ".join(parts))
# Layer 3: Topic-matched memories (deduplicated against recent)
keywords = _extract_topic_keywords(message_text, channel_name)
if keywords:
topic_memories = await self.bot.db.get_memories_by_topics(user_id, keywords, limit=5)
# Deduplicate against recent memories
recent_texts = {mem["memory"] for mem in recent_memories} if recent_memories else set()
unique_topic = [mem for mem in topic_memories if mem["memory"] not in recent_texts]
if unique_topic:
parts = []
for mem in unique_topic:
time_str = _format_relative_time(mem["created_at"])
parts.append(f"{mem['memory']} ({time_str})")
lines.append("Relevant: " + " | ".join(parts))
if not lines:
return ""
return "[What you know about this person:]\n" + "\n".join(lines)
async def _extract_and_save_memories(
self, user_id: int, username: str, conversation: list[dict[str, str]],
) -> None:
"""Background task: extract memories from conversation and save them."""
try:
current_profile = self.bot.drama_tracker.get_user_notes(user_id)
result = await self.bot.llm.extract_memories(
conversation, username, current_profile,
)
if not result:
return
# Save expiring memories
for mem in result.get("memories", []):
if mem["expiration"] == "permanent":
continue # permanent facts go into profile_update
exp_days = {"1d": 1, "3d": 3, "7d": 7, "30d": 30}
days = exp_days.get(mem["expiration"], 7)
expires_at = datetime.now(timezone.utc) + timedelta(days=days)
await self.bot.db.save_memory(
user_id=user_id,
memory=mem["memory"],
topics=",".join(mem["topics"]),
importance=mem["importance"],
expires_at=expires_at,
source="chat",
)
# Prune if over cap
await self.bot.db.prune_excess_memories(user_id)
# Update profile if warranted
profile_update = result.get("profile_update")
if profile_update:
self.bot.drama_tracker.set_user_profile(user_id, profile_update)
self._dirty_users.add(user_id)
logger.info(
"Extracted %d memories for %s (profile_update=%s)",
len(result.get("memories", [])),
username,
bool(profile_update),
)
except Exception:
logger.exception("Failed to extract memories for %s", username)
@commands.Cog.listener()
async def on_message(self, message: discord.Message):
if message.author.bot:
return
if not message.guild:
return
should_reply = False
is_proactive = False
reply_context = "" # Text of the message being replied to
# Check if bot is @mentioned
if self.bot.user in message.mentions:
should_reply = True
# Check if replying to a message
if message.reference and message.reference.message_id:
try:
ref_msg = message.reference.cached_message
if ref_msg is None:
ref_msg = await message.channel.fetch_message(
message.reference.message_id
)
if ref_msg.author.id == self.bot.user.id:
# Replying to the bot's own message — continue conversation
should_reply = True
if ref_msg.content:
reply_context = f"[Replying to bot's message: {ref_msg.content[:300]}]\n"
elif should_reply:
# @mentioned the bot while replying to someone else — include that message
ref_text = ref_msg.content[:500] if ref_msg.content else "(no text)"
reply_context = f"[{ref_msg.author.display_name} said: {ref_text}]\n"
except discord.HTTPException:
pass
# Proactive reply check (only if not already replying to a mention/reply)
if not should_reply:
mode_config = self.bot.get_mode_config()
if mode_config.get("proactive_replies", False):
ch_id = message.channel.id
self._messages_since_reply[ch_id] = self._messages_since_reply.get(ch_id, 0) + 1
cooldown = self.bot.config.get("modes", {}).get("proactive_cooldown_messages", 5)
reply_chance = mode_config.get("reply_chance", 0.0)
if (
self._messages_since_reply[ch_id] >= cooldown
and reply_chance > 0
and random.random() < reply_chance
and message.content and message.content.strip()
):
should_reply = True
is_proactive = True
if not should_reply:
return
# Build conversation context
ch_id = message.channel.id
if ch_id not in self._chat_history:
self._chat_history[ch_id] = deque(maxlen=10)
# Clean the mention out of the message content
content = message.content.replace(f"<@{self.bot.user.id}>", "").strip()
# Check for image attachments (on this message or the referenced message)
image_attachment = None
for att in message.attachments:
ext = att.filename.rsplit(".", 1)[-1].lower() if "." in att.filename else ""
if ext in _IMAGE_TYPES:
image_attachment = att
break
if not image_attachment and message.reference:
try:
ref = message.reference.cached_message or await message.channel.fetch_message(
message.reference.message_id
)
for att in ref.attachments:
ext = att.filename.rsplit(".", 1)[-1].lower() if "." in att.filename else ""
if ext in _IMAGE_TYPES:
image_attachment = att
break
except discord.HTTPException:
pass
typing_ctx = None
async def start_typing():
nonlocal typing_ctx
typing_ctx = message.channel.typing()
await typing_ctx.__aenter__()
if image_attachment:
# --- Image path: roast the image ---
image_bytes = await image_attachment.read()
user_text = content if content else ""
logger.info(
"Image roast request in #%s from %s (%s, %s)",
message.channel.name,
message.author.display_name,
image_attachment.filename,
user_text[:80],
)
response = await self.bot.llm_heavy.analyze_image(
image_bytes,
IMAGE_ROAST,
user_text=user_text,
on_first_token=start_typing,
)
else:
# --- Text-only path: normal chat ---
if not content:
content = "(just pinged me)" if not is_proactive else message.content
# If a mention scan is running, await it so we can include findings
scan_summary = ""
if self.bot.user in message.mentions:
sentiment_cog = self.bot.get_cog("SentimentCog")
if sentiment_cog:
task = sentiment_cog._mention_scan_tasks.get(message.channel.id)
if task and not task.done():
try:
await asyncio.wait_for(asyncio.shield(task), timeout=45)
except (asyncio.TimeoutError, asyncio.CancelledError):
pass
scan_summary = sentiment_cog._mention_scan_results.pop(message.id, "")
# Add drama score context only when noteworthy
drama_score = self.bot.drama_tracker.get_drama_score(message.author.id)
user_data = self.bot.drama_tracker.get_user(message.author.id)
context_parts = [f"#{message.channel.name}"]
if drama_score >= 0.2:
context_parts.append(f"drama score {drama_score:.2f}/1.0")
if user_data.offense_count > 0:
context_parts.append(f"{user_data.offense_count} offense(s)")
score_context = f"[Server context: {message.author.display_name}{', '.join(context_parts)}]"
# Gather memory context and recent messages for richer context
extra_context = ""
memory_context = await self._build_memory_context(
message.author.id, content, message.channel.name,
)
if memory_context:
extra_context += memory_context + "\n"
# Include mention scan findings if available
if scan_summary:
extra_context += f"[You just scanned recent chat. Results: {scan_summary}]\n"
recent_user_msgs = []
try:
async for msg in message.channel.history(limit=50, before=message):
if msg.author.id == message.author.id and msg.content and msg.content.strip():
recent_user_msgs.append(msg.content[:200])
if len(recent_user_msgs) >= 10:
break
except discord.HTTPException:
pass
if recent_user_msgs:
recent_lines = "\n".join(f"- {m}" for m in reversed(recent_user_msgs))
extra_context += f"[{message.author.display_name}'s recent messages:\n{recent_lines}]\n"
self._chat_history[ch_id].append(
{"role": "user", "content": f"{score_context}\n{extra_context}{reply_context}{message.author.display_name}: {content}"}
)
active_prompt = self._get_active_prompt()
# Collect recent bot replies so the LLM can avoid repeating itself
recent_bot_replies = [
m["content"][:150] for m in self._chat_history[ch_id]
if m["role"] == "assistant"
][-5:]
response = await self.bot.llm_chat.chat(
list(self._chat_history[ch_id]),
active_prompt,
on_first_token=start_typing,
recent_bot_replies=recent_bot_replies,
)
if typing_ctx:
await typing_ctx.__aexit__(None, None, None)
# Strip leaked metadata the LLM may echo back.
# The LLM often dumps paraphrased context and style labels in [brackets]
# before/between its actual answer. Split on those bracket lines and
# keep only the last non-empty segment — the real roast is always last.
if response:
segments = re.split(r"^\s*\[[^\]]*\]\s*$", response, flags=re.MULTILINE)
segments = [s.strip() for s in segments if s.strip()]
response = segments[-1] if segments else ""
if not response:
log_channel = discord.utils.get(message.guild.text_channels, name="bcs-log")
if log_channel:
try:
await log_channel.send(
f"**LLM OFFLINE** | Failed to generate reply to "
f"{message.author.mention} in #{message.channel.name}"
)
except discord.HTTPException:
pass
logger.warning("LLM returned no response for %s in #%s", message.author, message.channel.name)
return
if not image_attachment:
self._chat_history[ch_id].append(
{"role": "assistant", "content": response}
)
# Reset proactive cooldown counter for this channel
if is_proactive:
self._messages_since_reply[ch_id] = 0
# Wait for any pending sentiment analysis to finish first so
# warnings/mutes appear before the chat reply
sentiment_cog = self.bot.get_cog("SentimentCog")
if sentiment_cog:
task = sentiment_cog._debounce_tasks.get(message.channel.id)
if task and not task.done():
try:
await asyncio.wait_for(asyncio.shield(task), timeout=15)
except (asyncio.TimeoutError, asyncio.CancelledError):
pass
await message.reply(response, mention_author=False)
# Fire-and-forget memory extraction
if not image_attachment:
asyncio.create_task(self._extract_and_save_memories(
message.author.id,
message.author.display_name,
list(self._chat_history[ch_id]),
))
reply_type = "proactive" if is_proactive else "chat"
logger.info(
"%s reply in #%s to %s: %s",
reply_type.capitalize(),
message.channel.name,
message.author.display_name,
response[:100],
)
@commands.Cog.listener()
async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent):
# Ignore bot's own reactions
if payload.user_id == self.bot.user.id:
return
# 50% chance to reply
if random.random() > 0.50:
return
# Only react to reactions on the bot's own messages
channel = self.bot.get_channel(payload.channel_id)
if channel is None:
return
try:
message = await channel.fetch_message(payload.message_id)
except discord.HTTPException:
return
if message.author.id != self.bot.user.id:
return
# Get the user who reacted
guild = self.bot.get_guild(payload.guild_id) if payload.guild_id else None
if guild is None:
return
member = guild.get_member(payload.user_id)
if member is None:
return
emoji = str(payload.emoji)
# Build a one-shot prompt for the LLM
ch_id = channel.id
if ch_id not in self._chat_history:
self._chat_history[ch_id] = deque(maxlen=10)
context = (
f"[Server context: {member.display_name} — #{channel.name}]\n"
f"[{member.display_name} reacted to your message with {emoji}]\n"
f"[Your message was: {message.content[:300]}]\n"
f"{member.display_name}: *reacted {emoji}*"
)
self._chat_history[ch_id].append({"role": "user", "content": context})
active_prompt = self._get_active_prompt()
recent_bot_replies = [
m["content"][:150] for m in self._chat_history[ch_id]
if m["role"] == "assistant"
][-5:]
response = await self.bot.llm_chat.chat(
list(self._chat_history[ch_id]),
active_prompt,
recent_bot_replies=recent_bot_replies,
)
# Strip leaked metadata (same approach as main chat path)
if response:
segments = re.split(r"^\s*\[[^\]]*\]\s*$", response, flags=re.MULTILINE)
segments = [s.strip() for s in segments if s.strip()]
response = segments[-1] if segments else ""
if not response:
return
self._chat_history[ch_id].append({"role": "assistant", "content": response})
await channel.send(response)
logger.info(
"Reaction reply in #%s to %s (%s): %s",
channel.name,
member.display_name,
emoji,
response[:100],
)
async def setup(bot: commands.Bot):
await bot.add_cog(ChatCog(bot))