refactor: extract sentiment cog into package with shared _process_finding

Convert cogs/sentiment.py (1050 lines) into cogs/sentiment/ package:
- __init__.py (656 lines): core SentimentCog with new _process_finding()
  that deduplicates the per-user finding loop from _process_buffered and
  _run_mention_scan (~90 lines each → single shared method)
- actions.py: mute_user, warn_user
- topic_drift.py: handle_topic_drift
- channel_redirect.py: handle_channel_redirect, build_channel_context
- coherence.py: handle_coherence_alert
- log_utils.py: log_analysis, log_action, score_color
- state.py: save_user_state, flush_dirty_states

All extracted modules use plain async functions (not methods) receiving
bot/config as parameters. Named log_utils.py to avoid shadowing stdlib
logging. Also update CLAUDE.md with comprehensive project documentation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-25 17:06:27 -05:00
parent 188370b1fd
commit 660086a500
9 changed files with 1209 additions and 1121 deletions

File diff suppressed because it is too large Load Diff

656
cogs/sentiment/__init__.py Normal file
View File

@@ -0,0 +1,656 @@
import asyncio
import logging
from datetime import datetime, timezone
import discord
from discord.ext import commands, tasks
from cogs.sentiment.actions import mute_user, warn_user
from cogs.sentiment.channel_redirect import build_channel_context, handle_channel_redirect
from cogs.sentiment.coherence import handle_coherence_alert
from cogs.sentiment.log_utils import log_analysis
from cogs.sentiment.state import flush_dirty_states
from cogs.sentiment.topic_drift import handle_topic_drift
logger = logging.getLogger("bcs.sentiment")
# How often to flush dirty user states to DB (seconds)
STATE_FLUSH_INTERVAL = 300 # 5 minutes
class SentimentCog(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot
# Track which user IDs have unsaved in-memory changes
self._dirty_users: set[int] = set()
# Per-user redirect cooldown: {user_id: last_redirect_datetime}
self._redirect_cooldowns: dict[int, datetime] = {}
# Debounce buffer: keyed by channel_id, stores list of messages from ALL users
self._message_buffer: dict[int, list[discord.Message]] = {}
# Pending debounce timer tasks (per-channel)
self._debounce_tasks: dict[int, asyncio.Task] = {}
# Mention scan tasks (separate from debounce)
self._mention_scan_tasks: dict[int, asyncio.Task] = {}
# Mention scan state
self._mention_scan_cooldowns: dict[int, datetime] = {} # {channel_id: last_scan_time}
self._mention_scan_results: dict[int, str] = {} # {trigger_message_id: findings_summary}
self._analyzed_message_ids: set[int] = set() # Discord message IDs already analyzed
self._max_analyzed_ids = 500
async def cog_load(self):
self._flush_states.start()
async def cog_unload(self):
self._flush_states.cancel()
# Cancel all pending debounce timers and process remaining buffers
for task in self._debounce_tasks.values():
task.cancel()
self._debounce_tasks.clear()
for task in self._mention_scan_tasks.values():
task.cancel()
self._mention_scan_tasks.clear()
for channel_id in list(self._message_buffer):
await self._process_buffered(channel_id)
# Final flush on shutdown
await flush_dirty_states(self.bot, self._dirty_users)
@commands.Cog.listener()
async def on_message(self, message: discord.Message):
logger.info("MSG from %s in #%s: %s", message.author, getattr(message.channel, 'name', 'DM'), message.content[:80] if message.content else "(empty)")
# Ignore bots (including ourselves)
if message.author.bot:
return
# Ignore DMs
if not message.guild:
return
config = self.bot.config
monitoring = config.get("monitoring", {})
if not monitoring.get("enabled", True):
return
# Check if channel is monitored
monitored_channels = monitoring.get("channels", [])
if monitored_channels and message.channel.id not in monitored_channels:
return
# Check ignored users
if message.author.id in monitoring.get("ignored_users", []):
return
# Check immune roles
immune_roles = set(monitoring.get("immune_roles", []))
if immune_roles and any(
r.id in immune_roles for r in message.author.roles
):
return
# Check per-user immunity
if self.bot.drama_tracker.is_immune(message.author.id):
return
# Explicit @mention of the bot triggers a mention scan instead of scoring.
# Reply-pings (Discord auto-adds replied-to user to mentions) should NOT
# trigger scans — and reply-to-bot messages should still be scored normally
# so toxic replies to bot warnings aren't silently skipped.
bot_mentioned_in_text = (
f"<@{self.bot.user.id}>" in (message.content or "")
or f"<@!{self.bot.user.id}>" in (message.content or "")
)
if bot_mentioned_in_text:
mention_config = config.get("mention_scan", {})
if mention_config.get("enabled", True):
await self._maybe_start_mention_scan(message, mention_config)
return
# Skip if empty
if not message.content or not message.content.strip():
return
# Buffer the message and start/reset debounce timer (per-channel)
channel_id = message.channel.id
if channel_id not in self._message_buffer:
self._message_buffer[channel_id] = []
self._message_buffer[channel_id].append(message)
# Cancel existing debounce timer for this channel
existing_task = self._debounce_tasks.get(channel_id)
if existing_task and not existing_task.done():
existing_task.cancel()
batch_window = config.get("sentiment", {}).get("batch_window_seconds", 3)
self._debounce_tasks[channel_id] = asyncio.create_task(
self._debounce_then_process(channel_id, batch_window)
)
async def _debounce_then_process(self, channel_id: int, delay: float):
"""Sleep for the debounce window, then process the buffered messages."""
try:
await asyncio.sleep(delay)
await self._process_buffered(channel_id)
except asyncio.CancelledError:
pass # Timer was reset by a new message — expected
def _resolve_thresholds(self) -> dict:
"""Resolve effective moderation thresholds based on current mode."""
config = self.bot.config
sentiment_config = config.get("sentiment", {})
mode_config = self.bot.get_mode_config()
moderation_level = mode_config.get("moderation", "full")
if moderation_level == "relaxed" and "relaxed_thresholds" in mode_config:
rt = mode_config["relaxed_thresholds"]
return {
"warning": rt.get("warning_threshold", 0.80),
"mute": rt.get("mute_threshold", 0.85),
"spike_warn": rt.get("spike_warning_threshold", 0.70),
"spike_mute": rt.get("spike_mute_threshold", 0.85),
}
return {
"warning": sentiment_config.get("warning_threshold", 0.6),
"mute": sentiment_config.get("mute_threshold", 0.75),
"spike_warn": sentiment_config.get("spike_warning_threshold", 0.5),
"spike_mute": sentiment_config.get("spike_mute_threshold", 0.8),
}
async def _apply_moderation(
self,
message: discord.Message,
user_id: int,
score: float,
drama_score: float,
categories: list[str],
thresholds: dict,
db_message_id: int | None,
) -> None:
"""Issue a warning or mute based on scores and thresholds."""
mute_threshold = self.bot.drama_tracker.get_mute_threshold(user_id, thresholds["mute"])
user_data = self.bot.drama_tracker.get_user(user_id)
if drama_score >= mute_threshold or score >= thresholds["spike_mute"]:
effective_score = max(drama_score, score)
if user_data.warned_since_reset:
await mute_user(self.bot, message, effective_score, categories, db_message_id, self._dirty_users)
else:
logger.info("Downgrading mute to warning for %s (no prior warning)", message.author)
await warn_user(self.bot, message, effective_score, db_message_id, self._dirty_users)
elif drama_score >= thresholds["warning"] or score >= thresholds["spike_warn"]:
effective_score = max(drama_score, score)
await warn_user(self.bot, message, effective_score, db_message_id, self._dirty_users)
@staticmethod
def _build_user_lookup(messages: list[discord.Message]) -> dict[str, tuple[int, discord.Message, list[discord.Message]]]:
"""Build username -> (user_id, ref_msg, [messages]) mapping."""
lookup: dict[str, tuple[int, discord.Message, list[discord.Message]]] = {}
for msg in messages:
name = msg.author.display_name
if name not in lookup:
lookup[name] = (msg.author.id, msg, [])
lookup[name][2].append(msg)
return lookup
def _build_user_notes_map(self, messages: list[discord.Message]) -> dict[str, str]:
"""Build username -> LLM notes mapping for users in the message list."""
notes_map: dict[str, str] = {}
for msg in messages:
name = msg.author.display_name
if name not in notes_map:
notes = self.bot.drama_tracker.get_user_notes(msg.author.id)
if notes:
notes_map[name] = notes
return notes_map
@staticmethod
def _build_conversation(
messages: list[discord.Message],
) -> list[tuple[str, str, datetime, str | None]]:
"""Convert a list of Discord messages to conversation tuples with reply resolution."""
msg_id_to_author = {m.id: m.author.display_name for m in messages}
conversation = []
for msg in messages:
reply_to = None
if msg.reference and msg.reference.message_id:
reply_to = msg_id_to_author.get(msg.reference.message_id)
if not reply_to:
ref = msg.reference.cached_message
if ref:
reply_to = ref.author.display_name
conversation.append((
msg.author.display_name,
msg.content,
msg.created_at,
reply_to,
))
return conversation
# -- Shared finding processor --
async def _process_finding(
self,
finding: dict,
user_lookup: dict,
*,
sentiment_config: dict,
dry_run: bool,
thresholds: dict,
db_content: str,
db_topic_category: str,
db_topic_reasoning: str,
db_coherence_score: float | None,
db_coherence_flag: str | None,
game_channels: dict | None = None,
coherence_config: dict | None = None,
) -> tuple[str, float, float, list[str]] | None:
"""Process a single user finding.
Returns (username, score, drama_score, categories) or None if skipped.
When game_channels is not None, topic drift, game redirect, and coherence
handlers are active (buffered analysis mode). When None, they are skipped
(mention scan mode).
"""
username = finding["username"]
lookup = user_lookup.get(username)
if not lookup:
return None
user_id, user_ref_msg, user_msgs = lookup
score = finding["toxicity_score"]
categories = finding["categories"]
reasoning = finding["reasoning"]
off_topic = finding.get("off_topic", False)
note_update = finding.get("note_update")
# Track in DramaTracker
self.bot.drama_tracker.add_entry(user_id, score, categories, reasoning)
escalation_boost = sentiment_config.get("escalation_boost", 0.04)
drama_score = self.bot.drama_tracker.get_drama_score(user_id, escalation_boost=escalation_boost)
logger.info(
"User %s (%d) | msg_score=%.2f | drama_score=%.2f | categories=%s | %s",
username, user_id, score, drama_score, categories, reasoning,
)
# Save to DB
db_message_id = await self.bot.db.save_message_and_analysis(
guild_id=user_ref_msg.guild.id,
channel_id=user_ref_msg.channel.id,
user_id=user_id,
username=username,
content=db_content,
message_ts=user_ref_msg.created_at.replace(tzinfo=timezone.utc),
toxicity_score=score,
drama_score=drama_score,
categories=categories,
reasoning=reasoning,
off_topic=off_topic,
topic_category=db_topic_category,
topic_reasoning=db_topic_reasoning,
coherence_score=db_coherence_score,
coherence_flag=db_coherence_flag,
)
# Feature handlers — only active during buffered analysis (game_channels set)
if game_channels is not None:
if off_topic:
await handle_topic_drift(
self.bot, user_ref_msg, db_topic_category, db_topic_reasoning,
db_message_id, self._dirty_users,
)
detected_game = finding.get("detected_game")
if detected_game and game_channels and not dry_run:
await handle_channel_redirect(
self.bot, user_ref_msg, detected_game, game_channels,
db_message_id, self._redirect_cooldowns,
)
if coherence_config is not None and coherence_config.get("enabled", True):
coherence_score = finding.get("coherence_score", 0.85)
coherence_flag = finding.get("coherence_flag", "normal")
degradation = self.bot.drama_tracker.update_coherence(
user_id=user_id,
score=coherence_score,
flag=coherence_flag,
drop_threshold=coherence_config.get("drop_threshold", 0.3),
absolute_floor=coherence_config.get("absolute_floor", 0.5),
cooldown_minutes=coherence_config.get("cooldown_minutes", 30),
)
if degradation and not dry_run:
await handle_coherence_alert(
self.bot, user_ref_msg, degradation, coherence_config,
db_message_id, self._dirty_users,
)
# Note update
if note_update:
self.bot.drama_tracker.update_user_notes(user_id, note_update)
self._dirty_users.add(user_id)
self._dirty_users.add(user_id)
# Log analysis
await log_analysis(
user_ref_msg, score, drama_score, categories, reasoning,
off_topic, db_topic_category,
)
# Moderation
if not dry_run:
await self._apply_moderation(
user_ref_msg, user_id, score, drama_score, categories, thresholds, db_message_id,
)
return (username, score, drama_score, categories)
# -- Buffered analysis --
async def _process_buffered(self, channel_id: int):
"""Collect buffered messages, build conversation block, and run analysis."""
messages = self._message_buffer.pop(channel_id, [])
self._debounce_tasks.pop(channel_id, None)
if not messages:
return
# Use the last message as reference for channel/guild
ref_message = messages[-1]
channel = ref_message.channel
config = self.bot.config
sentiment_config = config.get("sentiment", {})
game_channels = config.get("game_channels", {})
# Fetch some history before the buffered messages for leading context
context_count = sentiment_config.get("context_messages", 8)
oldest_buffered = messages[0]
history_messages: list[discord.Message] = []
try:
async for msg in channel.history(limit=context_count + 5, before=oldest_buffered):
if msg.author.bot:
continue
if not msg.content or not msg.content.strip():
continue
history_messages.append(msg)
if len(history_messages) >= context_count:
break
except discord.HTTPException:
pass
history_messages.reverse() # chronological order
# Combine: history (context) + buffered (new messages to analyze)
new_message_start = len(history_messages)
all_messages = history_messages + messages
conversation = self._build_conversation(all_messages)
if not conversation:
return
user_notes_map = self._build_user_notes_map(messages)
channel_context = build_channel_context(ref_message, game_channels)
logger.info(
"Channel analysis: %d new messages (+%d context) in #%s",
len(messages), len(history_messages),
getattr(channel, 'name', 'unknown'),
)
# TRIAGE: Lightweight model — conversation-level analysis
result = await self.bot.llm.analyze_conversation(
conversation,
channel_context=channel_context,
user_notes_map=user_notes_map,
new_message_start=new_message_start,
)
if result is None:
return
# ESCALATION: Re-analyze with heavy model if any finding warrants it
escalation_threshold = sentiment_config.get("escalation_threshold", 0.25)
needs_escalation = any(
f["toxicity_score"] >= escalation_threshold
or f.get("off_topic", False)
or f.get("coherence_score", 1.0) < 0.6
for f in result.get("user_findings", [])
)
if needs_escalation:
heavy_result = await self.bot.llm_heavy.analyze_conversation(
conversation,
channel_context=channel_context,
user_notes_map=user_notes_map,
new_message_start=new_message_start,
)
if heavy_result is not None:
logger.info(
"Escalated to heavy model for #%s",
getattr(channel, 'name', 'unknown'),
)
result = heavy_result
user_lookup = self._build_user_lookup(messages)
# Mark all buffered messages as analyzed (for mention scan dedup)
for m in messages:
self._mark_analyzed(m.id)
dry_run = config.get("monitoring", {}).get("dry_run", False)
thresholds = self._resolve_thresholds()
coherence_config = config.get("coherence", {})
# Process per-user findings
for finding in result.get("user_findings", []):
username = finding["username"]
lookup = user_lookup.get(username)
if not lookup:
continue
_, _, user_msgs = lookup
combined_content = "\n".join(
m.content for m in user_msgs if m.content and m.content.strip()
)[:4000]
await self._process_finding(
finding, user_lookup,
sentiment_config=sentiment_config,
dry_run=dry_run,
thresholds=thresholds,
db_content=combined_content,
db_topic_category=finding.get("topic_category", "general_chat"),
db_topic_reasoning=finding.get("topic_reasoning", ""),
db_coherence_score=finding.get("coherence_score", 0.85),
db_coherence_flag=finding.get("coherence_flag", "normal"),
game_channels=game_channels,
coherence_config=coherence_config,
)
# -- Mention scan methods --
def _mark_analyzed(self, discord_message_id: int):
"""Track a Discord message ID as already analyzed."""
self._analyzed_message_ids.add(discord_message_id)
if len(self._analyzed_message_ids) > self._max_analyzed_ids:
sorted_ids = sorted(self._analyzed_message_ids)
self._analyzed_message_ids = set(sorted_ids[len(sorted_ids) // 2:])
async def _maybe_start_mention_scan(
self, trigger_message: discord.Message, mention_config: dict
):
"""Check cooldown and kick off a mention-triggered scan of recent messages."""
channel_id = trigger_message.channel.id
cooldown_seconds = mention_config.get("cooldown_seconds", 60)
now = datetime.now(timezone.utc)
last_scan = self._mention_scan_cooldowns.get(channel_id)
if last_scan and (now - last_scan).total_seconds() < cooldown_seconds:
logger.info(
"Mention scan cooldown active for #%s, skipping.",
getattr(trigger_message.channel, "name", "unknown"),
)
return
self._mention_scan_cooldowns[channel_id] = now
# Extract the user's concern (strip the bot ping from the message)
mention_text = trigger_message.content
for fmt in (f"<@{self.bot.user.id}>", f"<@!{self.bot.user.id}>"):
mention_text = mention_text.replace(fmt, "")
mention_text = mention_text.strip() or "(user pinged bot without specific concern)"
# Store as a mention scan task (separate from debounce)
existing_task = self._mention_scan_tasks.get(channel_id)
if existing_task and not existing_task.done():
existing_task.cancel()
self._mention_scan_tasks[channel_id] = asyncio.create_task(
self._run_mention_scan(trigger_message, mention_text, mention_config)
)
async def _run_mention_scan(
self,
trigger_message: discord.Message,
mention_text: str,
mention_config: dict,
):
"""Scan recent channel messages with ONE conversation-level LLM call."""
channel = trigger_message.channel
scan_count = mention_config.get("scan_messages", 30)
config = self.bot.config
sentiment_config = config.get("sentiment", {})
game_channels = config.get("game_channels", {})
# Fetch recent messages (before the trigger, skip bots/empty)
raw_messages: list[discord.Message] = []
try:
async for msg in channel.history(limit=scan_count + 10, before=trigger_message):
if msg.author.bot:
continue
if not msg.content or not msg.content.strip():
continue
raw_messages.append(msg)
if len(raw_messages) >= scan_count:
break
except discord.HTTPException:
logger.warning("Failed to fetch history for mention scan in #%s",
getattr(channel, "name", "unknown"))
return
raw_messages.reverse() # chronological order
if not raw_messages:
self._mention_scan_results[trigger_message.id] = "No recent messages found to analyze."
return
logger.info(
"Mention scan triggered by %s in #%s: %d messages (single LLM call). Focus: %s",
trigger_message.author.display_name,
getattr(channel, "name", "unknown"),
len(raw_messages),
mention_text[:80],
)
conversation = self._build_conversation(raw_messages)
user_notes_map = self._build_user_notes_map(raw_messages)
channel_context = build_channel_context(raw_messages[0], game_channels)
mention_context = (
f"A user flagged this conversation and said: \"{mention_text}\"\n"
f"Pay special attention to whether this concern is valid."
)
# Single LLM call
result = await self.bot.llm.analyze_conversation(
conversation,
mention_context=mention_context,
channel_context=channel_context,
user_notes_map=user_notes_map,
)
if result is None:
logger.warning("Conversation analysis failed for mention scan.")
self._mention_scan_results[trigger_message.id] = "Analysis failed."
return
user_lookup = self._build_user_lookup(raw_messages)
findings: list[str] = []
dry_run = config.get("monitoring", {}).get("dry_run", False)
thresholds = self._resolve_thresholds()
for finding in result.get("user_findings", []):
username = finding["username"]
lookup = user_lookup.get(username)
if not lookup:
logger.warning("Mention scan: LLM returned unknown user '%s', skipping.", username)
continue
user_id, ref_msg, user_msgs = lookup
# Skip if all their messages were already analyzed
if all(m.id in self._analyzed_message_ids for m in user_msgs):
continue
# Mark their messages as analyzed
for m in user_msgs:
self._mark_analyzed(m.id)
worst_msg = finding.get("worst_message")
content = f"[Mention scan] {worst_msg}" if worst_msg else "[Mention scan] See conversation"
off_topic = finding.get("off_topic", False)
result_tuple = await self._process_finding(
finding, user_lookup,
sentiment_config=sentiment_config,
dry_run=dry_run,
thresholds=thresholds,
db_content=content,
db_topic_category="personal_drama" if off_topic else "gaming",
db_topic_reasoning=finding.get("reasoning", ""),
db_coherence_score=None,
db_coherence_flag=None,
)
if result_tuple:
_, score, _, categories = result_tuple
if score >= 0.3:
cat_str = ", ".join(c for c in categories if c != "none") or "none"
findings.append(f"{username}: {score:.2f} ({cat_str})")
# Build summary for ChatCog
convo_summary = result.get("conversation_summary", "")
if findings:
summary = f"Scanned {len(raw_messages)} msgs. {convo_summary} Notable: " + "; ".join(findings[:5])
else:
summary = f"Scanned {len(raw_messages)} msgs. {convo_summary}"
# Prune old scan results
if len(self._mention_scan_results) > 20:
oldest = sorted(self._mention_scan_results.keys())[:len(self._mention_scan_results) - 10]
for k in oldest:
del self._mention_scan_results[k]
self._mention_scan_results[trigger_message.id] = summary
logger.info(
"Mention scan complete in #%s: 1 LLM call, %d messages, %d users flagged",
getattr(channel, "name", "unknown"),
len(raw_messages),
len(findings),
)
# -- State flush loop --
@tasks.loop(seconds=STATE_FLUSH_INTERVAL)
async def _flush_states(self):
await flush_dirty_states(self.bot, self._dirty_users)
@_flush_states.before_loop
async def _before_flush(self):
await self.bot.wait_until_ready()
async def setup(bot: commands.Bot):
await bot.add_cog(SentimentCog(bot))

123
cogs/sentiment/actions.py Normal file
View File

@@ -0,0 +1,123 @@
import asyncio
import logging
from datetime import timedelta
import discord
from cogs.sentiment.log_utils import log_action
from cogs.sentiment.state import save_user_state
logger = logging.getLogger("bcs.sentiment")
async def mute_user(
bot, message: discord.Message, score: float,
categories: list[str], db_message_id: int | None, dirty_users: set[int],
):
member = message.author
if not isinstance(member, discord.Member):
return
if not message.guild.me.guild_permissions.moderate_members:
logger.warning("Missing moderate_members permission, cannot mute.")
return
offense_num = bot.drama_tracker.record_offense(member.id)
timeout_config = bot.config.get("timeouts", {})
escalation = timeout_config.get("escalation_minutes", [5, 15, 30, 60])
idx = min(offense_num - 1, len(escalation) - 1)
duration_minutes = escalation[idx]
try:
await member.timeout(
timedelta(minutes=duration_minutes),
reason=f"BCS auto-mute: drama score {score:.2f}",
)
except discord.Forbidden:
logger.warning("Cannot timeout %s — role hierarchy issue.", member)
return
except discord.HTTPException as e:
logger.error("Failed to timeout %s: %s", member, e)
return
messages_config = bot.config.get("messages", {})
cat_str = ", ".join(c for c in categories if c != "none") or "general negativity"
embed = discord.Embed(
title=messages_config.get("mute_title", "BREEHAVIOR ALERT"),
description=messages_config.get("mute_description", "").format(
username=member.display_name,
duration=f"{duration_minutes} minutes",
score=f"{score:.2f}",
categories=cat_str,
),
color=discord.Color.red(),
)
embed.set_footer(
text=f"Offense #{offense_num} | Timeout: {duration_minutes}m"
)
await message.channel.send(embed=embed)
await log_action(
message.guild,
f"**MUTE** | {member.mention} | Score: {score:.2f} | "
f"Duration: {duration_minutes}m | Offense #{offense_num} | "
f"Categories: {cat_str}",
)
logger.info(
"Muted %s for %d minutes (offense #%d, score %.2f)",
member, duration_minutes, offense_num, score,
)
asyncio.create_task(bot.db.save_action(
guild_id=message.guild.id,
user_id=member.id,
username=member.display_name,
action_type="mute",
message_id=db_message_id,
details=f"duration={duration_minutes}m offense={offense_num} score={score:.2f} categories={cat_str}",
))
save_user_state(bot, dirty_users, member.id)
async def warn_user(
bot, message: discord.Message, score: float,
db_message_id: int | None, dirty_users: set[int],
):
timeout_config = bot.config.get("timeouts", {})
cooldown = timeout_config.get("warning_cooldown_minutes", 5)
if not bot.drama_tracker.can_warn(message.author.id, cooldown):
return
bot.drama_tracker.record_warning(message.author.id)
try:
await message.add_reaction("\u26a0\ufe0f")
except discord.HTTPException:
pass
messages_config = bot.config.get("messages", {})
warning_text = messages_config.get(
"warning",
"Easy there, {username}. The Breehavior Monitor is watching.",
).format(username=message.author.display_name)
await message.channel.send(warning_text)
await log_action(
message.guild,
f"**WARNING** | {message.author.mention} | Score: {score:.2f}",
)
logger.info("Warned %s (score %.2f)", message.author, score)
asyncio.create_task(bot.db.save_action(
guild_id=message.guild.id,
user_id=message.author.id,
username=message.author.display_name,
action_type="warning",
message_id=db_message_id,
details=f"score={score:.2f}",
))
save_user_state(bot, dirty_users, message.author.id)

View File

@@ -0,0 +1,95 @@
import asyncio
import logging
from datetime import datetime, timedelta, timezone
import discord
from cogs.sentiment.log_utils import log_action
logger = logging.getLogger("bcs.sentiment")
def build_channel_context(message: discord.Message, game_channels: dict) -> str:
"""Build a channel context string for LLM game detection."""
if not game_channels:
return ""
channel_name = getattr(message.channel, "name", "")
current_game = game_channels.get(channel_name)
lines = []
if current_game:
lines.append(f"Current channel: #{channel_name} ({current_game})")
else:
lines.append(f"Current channel: #{channel_name}")
channel_list = ", ".join(f"#{ch} ({game})" for ch, game in game_channels.items())
lines.append(f"Game channels: {channel_list}")
return "\n".join(lines)
async def handle_channel_redirect(
bot, message: discord.Message, detected_game: str,
game_channels: dict, db_message_id: int | None,
redirect_cooldowns: dict[int, datetime],
):
"""Send a redirect message if the user is talking about a different game."""
channel_name = getattr(message.channel, "name", "")
# Only redirect if message is in a game channel
if channel_name not in game_channels:
return
# No redirect needed if detected game matches current channel
if detected_game == channel_name:
return
# Detected game must be a valid game channel
if detected_game not in game_channels:
return
# Find the target channel in the guild
target_channel = discord.utils.get(
message.guild.text_channels, name=detected_game
)
if not target_channel:
return
# Check per-user cooldown
user_id = message.author.id
cooldown_minutes = bot.config.get("topic_drift", {}).get("remind_cooldown_minutes", 10)
now = datetime.now(timezone.utc)
last_redirect = redirect_cooldowns.get(user_id)
if last_redirect and (now - last_redirect) < timedelta(minutes=cooldown_minutes):
return
redirect_cooldowns[user_id] = now
messages_config = bot.config.get("messages", {})
game_name = game_channels[detected_game]
redirect_text = messages_config.get(
"channel_redirect",
"Hey {username}, that sounds like {game} talk \u2014 head over to {channel} for that!",
).format(
username=message.author.display_name,
game=game_name,
channel=target_channel.mention,
)
await message.channel.send(redirect_text)
await log_action(
message.guild,
f"**CHANNEL REDIRECT** | {message.author.mention} | "
f"#{channel_name} \u2192 #{detected_game} ({game_name})",
)
logger.info(
"Redirected %s from #%s to #%s (%s)",
message.author, channel_name, detected_game, game_name,
)
asyncio.create_task(bot.db.save_action(
guild_id=message.guild.id,
user_id=user_id,
username=message.author.display_name,
action_type="channel_redirect",
message_id=db_message_id,
details=f"from=#{channel_name} to=#{detected_game} game={game_name}",
))

View File

@@ -0,0 +1,43 @@
import asyncio
import logging
import discord
from cogs.sentiment.log_utils import log_action
from cogs.sentiment.state import save_user_state
logger = logging.getLogger("bcs.sentiment")
async def handle_coherence_alert(
bot, message: discord.Message, degradation: dict, coherence_config: dict,
db_message_id: int | None, dirty_users: set[int],
):
flag = degradation["flag"]
messages_map = coherence_config.get("messages", {})
alert_text = messages_map.get(flag, messages_map.get(
"default", "You okay there, {username}? That message was... something."
)).format(username=message.author.display_name)
await message.channel.send(alert_text)
await log_action(
message.guild,
f"**COHERENCE ALERT** | {message.author.mention} | "
f"Score: {degradation['current']:.2f} | Baseline: {degradation['baseline']:.2f} | "
f"Drop: {degradation['drop']:.2f} | Flag: {flag}",
)
logger.info(
"Coherence alert for %s: score=%.2f baseline=%.2f drop=%.2f flag=%s",
message.author, degradation["current"], degradation["baseline"],
degradation["drop"], flag,
)
asyncio.create_task(bot.db.save_action(
guild_id=message.guild.id,
user_id=message.author.id,
username=message.author.display_name,
action_type="coherence_alert",
message_id=db_message_id,
details=f"score={degradation['current']:.2f} baseline={degradation['baseline']:.2f} drop={degradation['drop']:.2f} flag={flag}",
))
save_user_state(bot, dirty_users, message.author.id)

View File

@@ -0,0 +1,54 @@
import logging
import discord
logger = logging.getLogger("bcs.sentiment")
def score_color(score: float) -> discord.Color:
if score >= 0.75:
return discord.Color.red()
if score >= 0.6:
return discord.Color.orange()
if score >= 0.3:
return discord.Color.yellow()
return discord.Color.green()
async def log_analysis(
message: discord.Message, score: float, drama_score: float,
categories: list[str], reasoning: str, off_topic: bool, topic_category: str,
):
log_channel = discord.utils.get(
message.guild.text_channels, name="bcs-log"
)
if not log_channel:
return
# Only log notable messages (score > 0.1) to avoid spam
if score <= 0.1:
return
cat_str = ", ".join(c for c in categories if c != "none") or "none"
embed = discord.Embed(
title=f"Analysis: {message.author.display_name}",
description=f"#{message.channel.name}: {message.content[:200]}",
color=score_color(score),
)
embed.add_field(name="Message Score", value=f"{score:.2f}", inline=True)
embed.add_field(name="Rolling Drama", value=f"{drama_score:.2f}", inline=True)
embed.add_field(name="Categories", value=cat_str, inline=True)
embed.add_field(name="Reasoning", value=reasoning[:1024] or "n/a", inline=False)
try:
await log_channel.send(embed=embed)
except discord.HTTPException:
pass
async def log_action(guild: discord.Guild, text: str):
log_channel = discord.utils.get(guild.text_channels, name="bcs-log")
if log_channel:
try:
await log_channel.send(text)
except discord.HTTPException:
pass

41
cogs/sentiment/state.py Normal file
View File

@@ -0,0 +1,41 @@
import asyncio
import logging
logger = logging.getLogger("bcs.sentiment")
def save_user_state(bot, dirty_users: set[int], user_id: int) -> None:
"""Fire-and-forget save of a user's current state to DB."""
user_data = bot.drama_tracker.get_user(user_id)
asyncio.create_task(bot.db.save_user_state(
user_id=user_id,
offense_count=user_data.offense_count,
immune=user_data.immune,
off_topic_count=user_data.off_topic_count,
baseline_coherence=user_data.baseline_coherence,
user_notes=user_data.notes or None,
warned=user_data.warned_since_reset,
last_offense_at=user_data.last_offense_time or None,
))
dirty_users.discard(user_id)
async def flush_dirty_states(bot, dirty_users: set[int]) -> None:
"""Save all dirty user states to DB."""
if not dirty_users:
return
dirty = list(dirty_users)
dirty_users.clear()
for user_id in dirty:
user_data = bot.drama_tracker.get_user(user_id)
await bot.db.save_user_state(
user_id=user_id,
offense_count=user_data.offense_count,
immune=user_data.immune,
off_topic_count=user_data.off_topic_count,
baseline_coherence=user_data.baseline_coherence,
user_notes=user_data.notes or None,
warned=user_data.warned_since_reset,
last_offense_at=user_data.last_offense_time or None,
)
logger.info("Flushed %d dirty user states to DB.", len(dirty))

View File

@@ -0,0 +1,111 @@
import asyncio
import logging
import discord
from cogs.sentiment.log_utils import log_action
from cogs.sentiment.state import save_user_state
logger = logging.getLogger("bcs.sentiment")
async def handle_topic_drift(
bot, message: discord.Message, topic_category: str, topic_reasoning: str,
db_message_id: int | None, dirty_users: set[int],
):
config = bot.config.get("topic_drift", {})
if not config.get("enabled", True):
return
ignored = config.get("ignored_channels", [])
if message.channel.id in ignored or getattr(message.channel, "name", "") in ignored:
return
dry_run = bot.config.get("monitoring", {}).get("dry_run", False)
if dry_run:
return
tracker = bot.drama_tracker
user_id = message.author.id
cooldown = config.get("remind_cooldown_minutes", 10)
if not tracker.can_topic_remind(user_id, cooldown):
return
count = tracker.record_off_topic(user_id)
escalation_threshold = config.get("escalation_count", 3)
messages_config = bot.config.get("messages", {})
if count >= escalation_threshold and not tracker.was_owner_notified(user_id):
tracker.mark_owner_notified(user_id)
owner = message.guild.owner
if owner:
dm_text = messages_config.get(
"topic_owner_dm",
"Heads up: {username} keeps going off-topic in #{channel}. Reminded {count} times.",
).format(
username=message.author.display_name,
channel=message.channel.name,
count=count,
)
try:
await owner.send(dm_text)
except discord.HTTPException:
logger.warning("Could not DM server owner about topic drift.")
await log_action(
message.guild,
f"**TOPIC DRIFT \u2014 OWNER NOTIFIED** | {message.author.mention} | "
f"Off-topic count: {count} | Category: {topic_category}",
)
logger.info("Notified owner about %s topic drift (count %d)", message.author, count)
asyncio.create_task(bot.db.save_action(
guild_id=message.guild.id, user_id=user_id,
username=message.author.display_name,
action_type="topic_escalation", message_id=db_message_id,
details=f"off_topic_count={count} category={topic_category}",
))
save_user_state(bot, dirty_users, user_id)
elif count >= 2:
nudge_text = messages_config.get(
"topic_nudge",
"{username}, let's keep it to gaming talk in here.",
).format(username=message.author.display_name)
await message.channel.send(nudge_text)
await log_action(
message.guild,
f"**TOPIC NUDGE** | {message.author.mention} | "
f"Off-topic count: {count} | Category: {topic_category}",
)
logger.info("Topic nudge for %s (count %d)", message.author, count)
asyncio.create_task(bot.db.save_action(
guild_id=message.guild.id, user_id=user_id,
username=message.author.display_name,
action_type="topic_nudge", message_id=db_message_id,
details=f"off_topic_count={count} category={topic_category}",
))
save_user_state(bot, dirty_users, user_id)
else:
remind_text = messages_config.get(
"topic_remind",
"Hey {username}, this is a gaming server \u2014 maybe take the personal stuff to DMs?",
).format(username=message.author.display_name)
await message.channel.send(remind_text)
await log_action(
message.guild,
f"**TOPIC REMIND** | {message.author.mention} | "
f"Category: {topic_category} | {topic_reasoning}",
)
logger.info("Topic remind for %s (count %d)", message.author, count)
asyncio.create_task(bot.db.save_action(
guild_id=message.guild.id, user_id=user_id,
username=message.author.display_name,
action_type="topic_remind", message_id=db_message_id,
details=f"off_topic_count={count} category={topic_category} reasoning={topic_reasoning}",
))
save_user_state(bot, dirty_users, user_id)