Add a new cog that gives the bot ambient presence by reacting to messages with contextual emoji chosen by the triage LLM. Includes RNG gating and per-channel cooldown to keep reactions sparse and natural. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
302 lines
10 KiB
Python
302 lines
10 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import signal
|
|
import socket
|
|
import sys
|
|
|
|
import discord
|
|
import yaml
|
|
from discord.ext import commands
|
|
from dotenv import load_dotenv
|
|
|
|
from utils.database import Database
|
|
from utils.drama_tracker import DramaTracker
|
|
from utils.llm_client import LLMClient
|
|
|
|
# Populate os.environ from a local .env file before any os.getenv() lookups.
load_dotenv()

# The file log handler writes into logs/, so the directory must exist first.
os.makedirs("logs", exist_ok=True)
|
|
|
|
class SafeStreamHandler(logging.StreamHandler):
    """StreamHandler that replaces unencodable characters instead of crashing."""

    def emit(self, record):
        """Format *record* and write it, substituting characters the stream cannot encode.

        The message is round-tripped through the stream's encoding with
        ``errors="replace"`` so the text handed to ``write`` only contains
        characters that encoding can represent. Streams that have no
        ``encoding`` attribute, or report ``None``, are treated as UTF-8.
        Any other failure is routed to ``handleError``.
        """
        try:
            msg = self.format(record)
            stream = self.stream
            # Resolve the encoding once; not every file-like object has the attribute.
            encoding = getattr(stream, "encoding", None) or "utf-8"
            safe = msg.encode(encoding, errors="replace").decode(encoding, errors="replace")
            stream.write(safe + self.terminator)
            self.flush()
        except RecursionError:
            # Same policy as logging.StreamHandler.emit: let recursion errors propagate.
            raise
        except Exception:
            self.handleError(record)
|
|
|
|
# Root logging setup: every record goes to stdout through the encoding-safe
# handler and is also written to logs/bcs.log as UTF-8.
_console_handler = SafeStreamHandler(sys.stdout)
_file_handler = logging.FileHandler("logs/bcs.log", encoding="utf-8")
logging.basicConfig(
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    level=logging.INFO,
    handlers=[_console_handler, _file_handler],
)

# Logger used by the rest of this module.
logger = logging.getLogger("bcs")
|
|
|
|
|
|
def load_config() -> dict:
    """Load ``config.yaml`` from the directory that contains this file.

    Returns:
        The parsed mapping. An empty dict is returned when the file is
        missing, when it parses to an empty/falsy value, or when its
        top-level YAML value is not a mapping (a warning is logged for the
        missing-file and non-mapping cases).

    YAML syntax errors are not caught here and propagate to the caller.
    """
    config_path = os.path.join(os.path.dirname(__file__), "config.yaml")
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except FileNotFoundError:
        logger.warning("config.yaml not found, using defaults.")
        return {}

    if not data:
        # Empty file (safe_load gives None) or an empty top-level value.
        return {}
    if not isinstance(data, dict):
        # Callers use config.get(...); a list or scalar would crash them later.
        logger.warning(
            "config.yaml top-level value is %s, expected a mapping; using defaults.",
            type(data).__name__,
        )
        return {}
    return data
|
|
|
|
|
|
class BCSBot(commands.Bot):
    """Bot subclass that owns the database, the LLM clients and the drama tracker.

    Cogs are loaded in ``setup_hook``; they reach shared state through the
    attributes set in ``__init__`` (``config``, ``db``, ``llm``,
    ``llm_heavy``, ``llm_chat``, ``current_mode``, ``drama_tracker``).
    """

    def __init__(self, config: dict):
        """Build the bot from the parsed config dict and environment variables.

        Args:
            config: Parsed configuration mapping. The ``bot``, ``modes``,
                ``sentiment`` and ``timeouts`` sections are read here.
        """
        intents = discord.Intents.default()
        intents.message_content = True
        intents.members = True

        super().__init__(
            command_prefix=config.get("bot", {}).get("prefix", "!"),
            intents=intents,
        )

        self.config = config

        # Handle of the background pruning task; created in on_ready.
        self._memory_prune_task: asyncio.Task | None = None
        # Flipped by close() so the database/LLM clients are released only once.
        self._resources_released = False

        # Database (initialized async in setup_hook)
        self.db = Database()

        # Triage LLM (local Qwen on athena for cheap first-pass analysis)
        llm_base_url = os.getenv("LLM_BASE_URL", "")
        llm_model = os.getenv("LLM_MODEL", "gpt-4o-mini")
        llm_api_key = os.getenv("LLM_API_KEY", "not-needed")
        # A non-empty base URL is treated as "local endpoint".
        is_local = bool(llm_base_url)
        self.llm = LLMClient(
            llm_base_url, llm_model, llm_api_key, db=self.db,
            no_think=is_local, concurrency=1 if is_local else 4,
        )

        # Heavy/escalation LLM (OpenAI for re-analysis, image roasts, commands)
        esc_base_url = os.getenv("LLM_ESCALATION_BASE_URL", "")
        esc_model = os.getenv("LLM_ESCALATION_MODEL", "gpt-4o")
        esc_api_key = os.getenv("LLM_ESCALATION_API_KEY", llm_api_key)
        esc_is_local = bool(esc_base_url)
        self.llm_heavy = LLMClient(
            esc_base_url, esc_model, esc_api_key, db=self.db,
            no_think=esc_is_local, concurrency=1 if esc_is_local else 4,
        )

        # Chat LLM (dedicated model for chat/roasts — defaults to llm_heavy)
        chat_model = os.getenv("LLM_CHAT_MODEL", "")
        chat_api_key = os.getenv("LLM_CHAT_API_KEY", esc_api_key)
        chat_base_url = os.getenv("LLM_CHAT_BASE_URL", esc_base_url)
        if chat_model:
            chat_is_local = bool(chat_base_url)
            self.llm_chat = LLMClient(
                chat_base_url, chat_model, chat_api_key, db=self.db,
                no_think=chat_is_local, concurrency=4,
            )
        else:
            # No dedicated chat model: share the escalation client object.
            self.llm_chat = self.llm_heavy

        # Active mode (server-wide)
        modes_config = config.get("modes", {})
        self.current_mode = modes_config.get("default_mode", "default")

        # Drama tracker
        sentiment = config.get("sentiment", {})
        timeouts = config.get("timeouts", {})
        self.drama_tracker = DramaTracker(
            window_size=sentiment.get("rolling_window_size", 10),
            window_minutes=sentiment.get("rolling_window_minutes", 15),
            offense_reset_minutes=timeouts.get("offense_reset_minutes", 120),
            warning_expiration_minutes=timeouts.get("warning_expiration_minutes", 30),
        )

    def get_mode_config(self) -> dict:
        """Return the config dict for the currently active mode.

        Falls back to the ``default`` entry of the ``modes`` section, then to
        an empty dict, when the active mode has no entry.
        """
        modes = self.config.get("modes", {})
        return modes.get(self.current_mode, modes.get("default", {}))

    async def setup_hook(self):
        """Initialise the database, load the cogs, sync commands and ping the LLM."""
        # Initialize database and hydrate DramaTracker
        db_ok = await self.db.init()
        if db_ok:
            states = await self.db.load_all_user_states()
            loaded = self.drama_tracker.load_user_states(states)
            logger.info("Loaded %d user states from database.", loaded)

            # Restore saved mode
            # NOTE(review): the original nesting was lost in extraction; the
            # mode is restored only when the database initialised — confirm.
            saved_mode = await self.db.load_setting("current_mode")
            if saved_mode:
                modes = self.config.get("modes", {})
                # Only accept a saved mode that still has a dict entry in the config.
                if saved_mode in modes and isinstance(modes.get(saved_mode), dict):
                    self.current_mode = saved_mode
                    logger.info("Restored saved mode: %s", saved_mode)

        await self.load_extension("cogs.sentiment")
        await self.load_extension("cogs.commands")
        await self.load_extension("cogs.chat")
        await self.load_extension("cogs.reactions")

        # Global sync as fallback; guild-specific sync happens in on_ready
        await self.tree.sync()
        logger.info("Slash commands synced (global).")

        # Quick connectivity check; a failure is logged but does not stop startup.
        try:
            await self.llm._client.chat.completions.create(
                model=self.llm.model,
                messages=[{"role": "user", "content": "hi"}],
                max_completion_tokens=16,
            )
            logger.info("LLM connectivity check passed.")
        except Exception as e:
            logger.warning("LLM connectivity check failed: %s", e)

    async def on_message(self, message: discord.Message):
        """Log a short preview of every message, then run prefix-command processing."""
        logger.info(
            "EVENT on_message from %s in #%s: %s",
            message.author,
            getattr(message.channel, "name", "DM"),
            message.content[:80] if message.content else "(empty)",
        )
        await self.process_commands(message)

    async def on_ready(self):
        """Sync guild commands, set presence, audit permissions, start pruning."""
        logger.info("Logged in as %s (ID: %d)", self.user, self.user.id)

        await self._sync_guild_commands()
        await self._apply_mode_presence()
        self._warn_missing_permissions()

        # Start memory pruning background task (on_ready can fire more than
        # once, so only start it when none is running).
        task = self._memory_prune_task
        if task is None or task.done():
            self._memory_prune_task = asyncio.create_task(self._prune_memories_loop())

    async def _sync_guild_commands(self):
        """Copy global slash commands to each guild and sync them there."""
        # Guild-specific command sync for instant propagation
        for guild in self.guilds:
            try:
                self.tree.copy_global_to(guild=guild)
                await self.tree.sync(guild=guild)
                logger.info("Slash commands synced to guild %s.", guild.name)
            except Exception:
                logger.exception("Failed to sync commands to guild %s", guild.name)

    async def _apply_mode_presence(self):
        """Set the "watching" status from the active mode's description."""
        # Set status based on active mode
        mode_config = self.get_mode_config()
        status_text = mode_config.get("description") or self.config.get("bot", {}).get(
            "status", "Monitoring vibes..."
        )
        await self.change_presence(
            activity=discord.Activity(
                type=discord.ActivityType.watching, name=status_text
            )
        )

    def _warn_missing_permissions(self):
        """Log a warning for each checked channel where the bot lacks a needed permission."""
        # Check permissions in monitored channels; with no configured list,
        # every text channel of every guild is checked.
        monitored = self.config.get("monitoring", {}).get("channels", [])
        if monitored:
            channels = [self.get_channel(ch_id) for ch_id in monitored]
        else:
            channels = [ch for guild in self.guilds for ch in guild.text_channels]

        # (Permissions attribute, label used in the log message), in report order.
        required = (
            ("send_messages", "Send Messages"),
            ("add_reactions", "Add Reactions"),
            ("moderate_members", "Moderate Members"),
            ("read_messages", "Read Messages"),
        )

        for ch in channels:
            # get_channel returns None for unknown IDs and may return channel
            # types without a guild; skip both.
            guild = getattr(ch, "guild", None)
            if guild is None:
                continue
            perms = ch.permissions_for(guild.me)
            missing = [label for attr, label in required if not getattr(perms, attr)]
            if missing:
                logger.warning(
                    "Missing permissions in #%s (%s): %s",
                    ch.name,
                    guild.name,
                    ", ".join(missing),
                )

    async def _prune_memories_loop(self):
        """Background task that prunes expired memories every 6 hours."""
        await self.wait_until_ready()
        while not self.is_closed():
            try:
                count = await self.db.prune_expired_memories()
                if count > 0:
                    logger.info("Pruned %d expired memories.", count)
            except Exception:
                # Keep the loop alive; the next attempt happens after the sleep.
                logger.exception("Memory pruning error")
            await asyncio.sleep(6 * 3600)  # Every 6 hours

    async def close(self):
        """Release the database and LLM clients once, then close the bot.

        Safe to call more than once: repeat calls skip the resource cleanup
        and only delegate to ``commands.Bot.close``. A failure while closing
        one resource is logged and does not prevent closing the others or
        the bot itself.
        """
        if not self._resources_released:
            self._resources_released = True

            # Stop the pruning loop so it does not outlive the database handle.
            task = self._memory_prune_task
            if task is not None and not task.done():
                task.cancel()

            resources = [self.db, self.llm, self.llm_heavy]
            # llm_chat may be the very same object as llm_heavy; close it once.
            if self.llm_chat is not self.llm_heavy:
                resources.append(self.llm_chat)
            for resource in resources:
                try:
                    await resource.close()
                except Exception:
                    logger.exception("Error while closing %s", type(resource).__name__)

        await super().close()
|
|
|
|
|
|
def acquire_instance_lock(port: int = 39821) -> socket.socket | None:
|
|
"""Bind a TCP port as a single-instance lock. Returns the socket or None if already locked."""
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
try:
|
|
sock.bind(("127.0.0.1", port))
|
|
sock.listen(1)
|
|
return sock
|
|
except OSError:
|
|
sock.close()
|
|
return None
|
|
|
|
|
|
async def main():
    """Acquire the single-instance lock, build the bot and run it until shutdown.

    Exits the process with status 1 when the lock port is already in use or
    when ``DISCORD_BOT_TOKEN`` is not set. The lock socket is closed on every
    exit path after it has been acquired.
    """
    # Single-instance guard — exit if another instance is already running
    lock_port = int(os.getenv("BCS_LOCK_PORT", "39821"))
    lock_sock = acquire_instance_lock(lock_port)
    if lock_sock is None:
        logger.error("Another BCS instance is already running (port %d in use). Exiting.", lock_port)
        sys.exit(1)
    logger.info("Instance lock acquired on port %d.", lock_port)

    try:
        config = load_config()
        token = os.getenv("DISCORD_BOT_TOKEN")

        if not token:
            logger.error("DISCORD_BOT_TOKEN not set. Check your .env file.")
            sys.exit(1)

        bot = BCSBot(config)

        # Graceful shutdown
        loop = asyncio.get_running_loop()
        # The event loop holds only weak references to tasks; keep the
        # shutdown task(s) alive here until they finish.
        shutdown_tasks = set()

        def _signal_handler():
            logger.info("Shutdown signal received.")
            task = loop.create_task(bot.close())
            shutdown_tasks.add(task)
            task.add_done_callback(shutdown_tasks.discard)

        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, _signal_handler)
            except NotImplementedError:
                # Windows doesn't support add_signal_handler
                pass

        async with bot:
            await bot.start(token)
    finally:
        lock_sock.close()
|
|
|
|
|
|
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Where loop.add_signal_handler is unavailable (the NotImplementedError
        # fallback in main), Ctrl+C surfaces here; exit without a traceback.
        logger.info("Interrupted; exiting.")
|