feat: channel-level conversation analysis with compact formatting

Switch from per-user message batching to per-channel conversation
analysis. The LLM now sees the full interleaved conversation with
relative timestamps, reply chains, and consecutive message collapsing
instead of isolated flat text per user.

Key changes:
- Fix gpt-5-nano temperature incompatibility (conditional temp param)
- Add mention-triggered scan: users @mention bot to analyze recent chat
- Refactor debounce buffer from (channel_id, user_id) to channel_id
- Replace per-message analyze_message() with analyze_conversation()
  returning per-user findings from a single LLM call
- Add CONVERSATION_TOOL schema with coherence, topic, and game fields
- Compact message format: relative timestamps, reply arrows (→),
  consecutive same-user message collapsing
- Separate mention scan tasks from debounce tasks
- Remove _store_context/_get_context (conversation block IS the context)
- Escalation timeout config: [30, 60, 120, 240] minutes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-24 23:13:07 -05:00
parent 943c67cc87
commit 90b70cad69
5 changed files with 793 additions and 227 deletions
+18 -2
View File
@@ -153,6 +153,19 @@ class ChatCog(commands.Cog):
if not content: if not content:
content = "(just pinged me)" if not is_proactive else message.content content = "(just pinged me)" if not is_proactive else message.content
# If a mention scan is running, await it so we can include findings
scan_summary = ""
if self.bot.user in message.mentions:
sentiment_cog = self.bot.get_cog("SentimentCog")
if sentiment_cog:
task = sentiment_cog._mention_scan_tasks.get(message.channel.id)
if task and not task.done():
try:
await asyncio.wait_for(asyncio.shield(task), timeout=45)
except (asyncio.TimeoutError, asyncio.CancelledError):
pass
scan_summary = sentiment_cog._mention_scan_results.pop(message.id, "")
# Add drama score context only when noteworthy # Add drama score context only when noteworthy
drama_score = self.bot.drama_tracker.get_drama_score(message.author.id) drama_score = self.bot.drama_tracker.get_drama_score(message.author.id)
user_data = self.bot.drama_tracker.get_user(message.author.id) user_data = self.bot.drama_tracker.get_user(message.author.id)
@@ -169,6 +182,10 @@ class ChatCog(commands.Cog):
if user_notes: if user_notes:
extra_context += f"[Notes about {message.author.display_name}: {user_notes}]\n" extra_context += f"[Notes about {message.author.display_name}: {user_notes}]\n"
# Include mention scan findings if available
if scan_summary:
extra_context += f"[You just scanned recent chat. Results: {scan_summary}]\n"
recent_user_msgs = [] recent_user_msgs = []
try: try:
async for msg in message.channel.history(limit=50, before=message): async for msg in message.channel.history(limit=50, before=message):
@@ -239,8 +256,7 @@ class ChatCog(commands.Cog):
# warnings/mutes appear before the chat reply # warnings/mutes appear before the chat reply
sentiment_cog = self.bot.get_cog("SentimentCog") sentiment_cog = self.bot.get_cog("SentimentCog")
if sentiment_cog: if sentiment_cog:
key = (message.channel.id, message.author.id) task = sentiment_cog._debounce_tasks.get(message.channel.id)
task = sentiment_cog._debounce_tasks.get(key)
if task and not task.done(): if task and not task.done():
try: try:
await asyncio.wait_for(asyncio.shield(task), timeout=15) await asyncio.wait_for(asyncio.shield(task), timeout=15)
+489 -216
View File
@@ -1,8 +1,8 @@
import asyncio import asyncio
import logging import logging
from collections import deque
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
import discord import discord
from discord.ext import commands, tasks from discord.ext import commands, tasks
@@ -15,17 +15,21 @@ STATE_FLUSH_INTERVAL = 300 # 5 minutes
class SentimentCog(commands.Cog): class SentimentCog(commands.Cog):
def __init__(self, bot: commands.Bot): def __init__(self, bot: commands.Bot):
self.bot = bot self.bot = bot
# Per-channel message history for context: {channel_id: deque of (author, content)}
self._channel_history: dict[int, deque] = {}
# Track which user IDs have unsaved in-memory changes # Track which user IDs have unsaved in-memory changes
self._dirty_users: set[int] = set() self._dirty_users: set[int] = set()
# Per-user redirect cooldown: {user_id: last_redirect_datetime} # Per-user redirect cooldown: {user_id: last_redirect_datetime}
self._redirect_cooldowns: dict[int, datetime] = {} self._redirect_cooldowns: dict[int, datetime] = {}
# Debounce buffer: keyed by (channel_id, user_id), stores list of messages # Debounce buffer: keyed by channel_id, stores list of messages from ALL users
self._message_buffer: dict[tuple[int, int], list[discord.Message]] = {} self._message_buffer: dict[int, list[discord.Message]] = {}
# Pending debounce timer tasks # Pending debounce timer tasks (per-channel)
self._debounce_tasks: dict[tuple[int, int], asyncio.Task] = {} self._debounce_tasks: dict[int, asyncio.Task] = {}
# Per-channel poll cooldown: {channel_id: last_poll_datetime} # Mention scan tasks (separate from debounce)
self._mention_scan_tasks: dict[int, asyncio.Task] = {}
# Mention scan state
self._mention_scan_cooldowns: dict[int, datetime] = {} # {channel_id: last_scan_time}
self._mention_scan_results: dict[int, str] = {} # {trigger_message_id: findings_summary}
self._analyzed_message_ids: set[int] = set() # Discord message IDs already analyzed
self._max_analyzed_ids = 500
async def cog_load(self): async def cog_load(self):
@@ -37,8 +41,11 @@ class SentimentCog(commands.Cog):
for task in self._debounce_tasks.values(): for task in self._debounce_tasks.values():
task.cancel() task.cancel()
self._debounce_tasks.clear() self._debounce_tasks.clear()
for key in list(self._message_buffer): for task in self._mention_scan_tasks.values():
await self._process_buffered(key) task.cancel()
self._mention_scan_tasks.clear()
for channel_id in list(self._message_buffer):
await self._process_buffered(channel_id)
# Final flush on shutdown # Final flush on shutdown
await self._flush_dirty_states() await self._flush_dirty_states()
@@ -80,202 +87,174 @@ class SentimentCog(commands.Cog):
if self.bot.drama_tracker.is_immune(message.author.id): if self.bot.drama_tracker.is_immune(message.author.id):
return return
# Skip sentiment analysis for messages directed at the bot # Messages directed at the bot (mentions, replies) shouldn't be scored
# (mentions, replies to bot) — users interacting with the bot # for toxicity — but @mentions can trigger a scan of recent chat
# in roast/chat modes shouldn't have those messages scored as toxic
directed_at_bot = self.bot.user in message.mentions directed_at_bot = self.bot.user in message.mentions
if not directed_at_bot and message.reference and message.reference.message_id: if not directed_at_bot and message.reference and message.reference.message_id:
ref = message.reference.cached_message ref = message.reference.cached_message
if ref and ref.author.id == self.bot.user.id: if ref and ref.author.id == self.bot.user.id:
directed_at_bot = True directed_at_bot = True
if directed_at_bot: if directed_at_bot:
# @mention (not just reply-to-bot) triggers a mention scan
if self.bot.user in message.mentions:
mention_config = config.get("mention_scan", {})
if mention_config.get("enabled", True):
await self._maybe_start_mention_scan(message, mention_config)
return return
# Store message in channel history for context
self._store_context(message)
# Skip if empty # Skip if empty
if not message.content or not message.content.strip(): if not message.content or not message.content.strip():
return return
# Buffer the message and start/reset debounce timer # Buffer the message and start/reset debounce timer (per-channel)
key = (message.channel.id, message.author.id) channel_id = message.channel.id
if key not in self._message_buffer: if channel_id not in self._message_buffer:
self._message_buffer[key] = [] self._message_buffer[channel_id] = []
self._message_buffer[key].append(message) self._message_buffer[channel_id].append(message)
# Cancel existing debounce timer for this user+channel # Cancel existing debounce timer for this channel
existing_task = self._debounce_tasks.get(key) existing_task = self._debounce_tasks.get(channel_id)
if existing_task and not existing_task.done(): if existing_task and not existing_task.done():
existing_task.cancel() existing_task.cancel()
# Skip debounce when bot is @mentioned so warnings fire before chat replies batch_window = config.get("sentiment", {}).get("batch_window_seconds", 3)
if self.bot.user in message.mentions:
batch_window = 0
else:
batch_window = config.get("sentiment", {}).get("batch_window_seconds", 3)
self._debounce_tasks[key] = asyncio.create_task( self._debounce_tasks[channel_id] = asyncio.create_task(
self._debounce_then_process(key, batch_window) self._debounce_then_process(channel_id, batch_window)
) )
async def _debounce_then_process(self, key: tuple[int, int], delay: float): async def _debounce_then_process(self, channel_id: int, delay: float):
"""Sleep for the debounce window, then process the buffered messages.""" """Sleep for the debounce window, then process the buffered messages."""
try: try:
await asyncio.sleep(delay) await asyncio.sleep(delay)
await self._process_buffered(key) await self._process_buffered(channel_id)
except asyncio.CancelledError: except asyncio.CancelledError:
pass # Timer was reset by a new message — expected pass # Timer was reset by a new message — expected
async def _process_buffered(self, key: tuple[int, int]): async def _process_buffered(self, channel_id: int):
"""Combine buffered messages and run the analysis pipeline once.""" """Collect buffered messages, build conversation block, and run analysis."""
messages = self._message_buffer.pop(key, []) messages = self._message_buffer.pop(channel_id, [])
self._debounce_tasks.pop(key, None) self._debounce_tasks.pop(channel_id, None)
if not messages: if not messages:
return return
# Use the last message as the reference for channel, author, guild, etc. # Use the last message as reference for channel/guild
message = messages[-1] ref_message = messages[-1]
combined_content = "\n".join(m.content for m in messages if m.content and m.content.strip()) channel = ref_message.channel
if not combined_content.strip():
return
batch_count = len(messages)
if batch_count > 1:
logger.info(
"Batched %d messages from %s in #%s",
batch_count, message.author.display_name,
getattr(message.channel, 'name', 'unknown'),
)
config = self.bot.config config = self.bot.config
monitoring = config.get("monitoring", {})
sentiment_config = config.get("sentiment", {}) sentiment_config = config.get("sentiment", {})
# Build channel context for game detection
game_channels = config.get("game_channels", {}) game_channels = config.get("game_channels", {})
channel_context = self._build_channel_context(message, game_channels)
# Analyze the combined message (triage with lightweight model) # Fetch some history before the buffered messages for leading context
context = self._get_context(message) context_count = sentiment_config.get("context_messages", 8)
user_notes = self.bot.drama_tracker.get_user_notes(message.author.id) oldest_buffered = messages[0]
result = await self.bot.llm.analyze_message( history_messages: list[discord.Message] = []
combined_content, context, user_notes=user_notes, try:
async for msg in channel.history(limit=context_count + 5, before=oldest_buffered):
if msg.author.bot:
continue
if not msg.content or not msg.content.strip():
continue
history_messages.append(msg)
if len(history_messages) >= context_count:
break
except discord.HTTPException:
pass
history_messages.reverse() # chronological order
# Combine: history (context) + buffered (new messages to analyze)
all_messages = history_messages + messages
# Build msg_id_to_author lookup for reply resolution
msg_id_to_author: dict[int, str] = {
m.id: m.author.display_name for m in all_messages
}
# Convert to conversation tuples: (username, content, timestamp, reply_to_username)
conversation: list[tuple[str, str, datetime, str | None]] = []
for msg in all_messages:
reply_to = None
if msg.reference and msg.reference.message_id:
reply_to = msg_id_to_author.get(msg.reference.message_id)
if not reply_to:
ref = msg.reference.cached_message
if ref:
reply_to = ref.author.display_name
conversation.append((
msg.author.display_name,
msg.content,
msg.created_at,
reply_to,
))
if not conversation:
return
# Build user notes map (only for users in the buffer, not history-only)
user_notes_map: dict[str, str] = {}
for msg in messages:
name = msg.author.display_name
if name not in user_notes_map:
notes = self.bot.drama_tracker.get_user_notes(msg.author.id)
if notes:
user_notes_map[name] = notes
channel_context = self._build_channel_context(ref_message, game_channels)
logger.info(
"Channel analysis: %d new messages (+%d context) in #%s",
len(messages), len(history_messages),
getattr(channel, 'name', 'unknown'),
)
# TRIAGE: Lightweight model — conversation-level analysis
result = await self.bot.llm.analyze_conversation(
conversation,
channel_context=channel_context, channel_context=channel_context,
user_notes_map=user_notes_map,
) )
if result is None: if result is None:
return return
# Escalation: re-analyze with heavy model if triage flags something # ESCALATION: Re-analyze with heavy model if any finding warrants it
escalation_threshold = sentiment_config.get("escalation_threshold", 0.25) escalation_threshold = sentiment_config.get("escalation_threshold", 0.25)
needs_escalation = ( needs_escalation = any(
result["toxicity_score"] >= escalation_threshold f["toxicity_score"] >= escalation_threshold
or result.get("off_topic", False) or f.get("off_topic", False)
or result.get("coherence_score", 1.0) < 0.6 or f.get("coherence_score", 1.0) < 0.6
for f in result.get("user_findings", [])
) )
if needs_escalation: if needs_escalation:
triage_score = result["toxicity_score"] heavy_result = await self.bot.llm_heavy.analyze_conversation(
heavy_result = await self.bot.llm_heavy.analyze_message( conversation,
combined_content, context, user_notes=user_notes,
channel_context=channel_context, channel_context=channel_context,
user_notes_map=user_notes_map,
) )
if heavy_result is not None: if heavy_result is not None:
logger.info( logger.info(
"Escalated to heavy model (triage_score=%.2f) for %s", "Escalated to heavy model for #%s",
triage_score, message.author.display_name, getattr(channel, 'name', 'unknown'),
) )
result = heavy_result result = heavy_result
score = result["toxicity_score"] # Build username -> (user_id, ref_msg, [messages]) for buffered users only
categories = result["categories"] user_lookup: dict[str, tuple[int, discord.Message, list[discord.Message]]] = {}
reasoning = result["reasoning"] for msg in messages:
name = msg.author.display_name
if name not in user_lookup:
user_lookup[name] = (msg.author.id, msg, [])
user_lookup[name][2].append(msg)
# Track the result # Mark all buffered messages as analyzed (for mention scan dedup)
self.bot.drama_tracker.add_entry( for m in messages:
message.author.id, score, categories, reasoning self._mark_analyzed(m.id)
)
drama_score = self.bot.drama_tracker.get_drama_score(message.author.id) # Resolve thresholds once
logger.info(
"User %s (%d) | msg_score=%.2f | drama_score=%.2f | categories=%s | %s",
message.author.display_name,
message.author.id,
score,
drama_score,
categories,
reasoning,
)
# Topic drift detection
off_topic = result.get("off_topic", False)
topic_category = result.get("topic_category", "general_chat")
topic_reasoning = result.get("topic_reasoning", "")
# Save message + analysis to DB (awaited — need message_id for action links)
db_message_id = await self.bot.db.save_message_and_analysis(
guild_id=message.guild.id,
channel_id=message.channel.id,
user_id=message.author.id,
username=message.author.display_name,
content=combined_content,
message_ts=message.created_at.replace(tzinfo=timezone.utc),
toxicity_score=score,
drama_score=drama_score,
categories=categories,
reasoning=reasoning,
off_topic=off_topic,
topic_category=topic_category,
topic_reasoning=topic_reasoning,
coherence_score=result.get("coherence_score"),
coherence_flag=result.get("coherence_flag"),
)
if off_topic:
await self._handle_topic_drift(message, topic_category, topic_reasoning, db_message_id)
# Game channel redirect detection
detected_game = result.get("detected_game")
if detected_game and game_channels and not monitoring.get("dry_run", False):
await self._handle_channel_redirect(message, detected_game, game_channels, db_message_id)
# Coherence / intoxication detection
coherence_score = result.get("coherence_score", 0.85)
coherence_flag = result.get("coherence_flag", "normal")
coherence_config = config.get("coherence", {})
if coherence_config.get("enabled", True):
degradation = self.bot.drama_tracker.update_coherence(
user_id=message.author.id,
score=coherence_score,
flag=coherence_flag,
drop_threshold=coherence_config.get("drop_threshold", 0.3),
absolute_floor=coherence_config.get("absolute_floor", 0.5),
cooldown_minutes=coherence_config.get("cooldown_minutes", 30),
)
if degradation and not config.get("monitoring", {}).get("dry_run", False):
await self._handle_coherence_alert(message, degradation, coherence_config, db_message_id)
# Capture LLM note updates about this user
note_update = result.get("note_update")
if note_update:
self.bot.drama_tracker.update_user_notes(message.author.id, note_update)
self._dirty_users.add(message.author.id)
# Mark dirty for coherence baseline drift even without actions
self._dirty_users.add(message.author.id)
# Always log analysis to #bcs-log if it exists
await self._log_analysis(message, score, drama_score, categories, reasoning, off_topic, topic_category)
# Dry-run mode: skip warnings/mutes
dry_run = config.get("monitoring", {}).get("dry_run", False) dry_run = config.get("monitoring", {}).get("dry_run", False)
if dry_run:
return
# Check thresholds — use relaxed thresholds if the active mode says so
mode_config = self.bot.get_mode_config() mode_config = self.bot.get_mode_config()
moderation_level = mode_config.get("moderation", "full") moderation_level = mode_config.get("moderation", "full")
if moderation_level == "relaxed" and "relaxed_thresholds" in mode_config: if moderation_level == "relaxed" and "relaxed_thresholds" in mode_config:
@@ -289,18 +268,365 @@ class SentimentCog(commands.Cog):
base_mute_threshold = sentiment_config.get("mute_threshold", 0.75) base_mute_threshold = sentiment_config.get("mute_threshold", 0.75)
spike_warn = sentiment_config.get("spike_warning_threshold", 0.5) spike_warn = sentiment_config.get("spike_warning_threshold", 0.5)
spike_mute = sentiment_config.get("spike_mute_threshold", 0.8) spike_mute = sentiment_config.get("spike_mute_threshold", 0.8)
mute_threshold = self.bot.drama_tracker.get_mute_threshold( coherence_config = config.get("coherence", {})
message.author.id, base_mute_threshold
# Process per-user findings
for finding in result.get("user_findings", []):
username = finding["username"]
lookup = user_lookup.get(username)
if not lookup:
# LLM returned a finding for a history-only user or unknown name; skip
continue
user_id, user_ref_msg, user_msgs = lookup
score = finding["toxicity_score"]
categories = finding["categories"]
reasoning = finding["reasoning"]
off_topic = finding.get("off_topic", False)
topic_category = finding.get("topic_category", "general_chat")
topic_reasoning = finding.get("topic_reasoning", "")
coherence_score = finding.get("coherence_score", 0.85)
coherence_flag = finding.get("coherence_flag", "normal")
note_update = finding.get("note_update")
detected_game = finding.get("detected_game")
# Track the result in DramaTracker
self.bot.drama_tracker.add_entry(user_id, score, categories, reasoning)
drama_score = self.bot.drama_tracker.get_drama_score(user_id)
logger.info(
"User %s (%d) | msg_score=%.2f | drama_score=%.2f | categories=%s | %s",
username, user_id, score, drama_score, categories, reasoning,
)
# Save message + analysis to DB
combined_content = "\n".join(
m.content for m in user_msgs if m.content and m.content.strip()
)
db_message_id = await self.bot.db.save_message_and_analysis(
guild_id=user_ref_msg.guild.id,
channel_id=user_ref_msg.channel.id,
user_id=user_id,
username=username,
content=combined_content[:4000],
message_ts=user_ref_msg.created_at.replace(tzinfo=timezone.utc),
toxicity_score=score,
drama_score=drama_score,
categories=categories,
reasoning=reasoning,
off_topic=off_topic,
topic_category=topic_category,
topic_reasoning=topic_reasoning,
coherence_score=coherence_score,
coherence_flag=coherence_flag,
)
# Topic drift handling
if off_topic:
await self._handle_topic_drift(user_ref_msg, topic_category, topic_reasoning, db_message_id)
# Game channel redirect detection
if detected_game and game_channels and not dry_run:
await self._handle_channel_redirect(user_ref_msg, detected_game, game_channels, db_message_id)
# Coherence / intoxication detection
if coherence_config.get("enabled", True):
degradation = self.bot.drama_tracker.update_coherence(
user_id=user_id,
score=coherence_score,
flag=coherence_flag,
drop_threshold=coherence_config.get("drop_threshold", 0.3),
absolute_floor=coherence_config.get("absolute_floor", 0.5),
cooldown_minutes=coherence_config.get("cooldown_minutes", 30),
)
if degradation and not dry_run:
await self._handle_coherence_alert(user_ref_msg, degradation, coherence_config, db_message_id)
# Capture LLM note updates about this user
if note_update:
self.bot.drama_tracker.update_user_notes(user_id, note_update)
self._dirty_users.add(user_id)
# Mark dirty for coherence baseline drift even without actions
self._dirty_users.add(user_id)
# Always log analysis to #bcs-log if it exists
await self._log_analysis(user_ref_msg, score, drama_score, categories, reasoning, off_topic, topic_category)
# Moderation actions (skip in dry-run mode)
if not dry_run:
mute_threshold = self.bot.drama_tracker.get_mute_threshold(
user_id, base_mute_threshold
)
# Mute: rolling average OR single message spike
if drama_score >= mute_threshold or score >= spike_mute:
effective_score = max(drama_score, score)
await self._mute_user(user_ref_msg, effective_score, categories, db_message_id)
# Warn: rolling average OR single message spike
elif drama_score >= warning_threshold or score >= spike_warn:
effective_score = max(drama_score, score)
await self._warn_user(user_ref_msg, effective_score, db_message_id)
# -- Mention scan methods --
def _mark_analyzed(self, discord_message_id: int):
"""Track a Discord message ID as already analyzed."""
self._analyzed_message_ids.add(discord_message_id)
if len(self._analyzed_message_ids) > self._max_analyzed_ids:
sorted_ids = sorted(self._analyzed_message_ids)
self._analyzed_message_ids = set(sorted_ids[len(sorted_ids) // 2:])
async def _maybe_start_mention_scan(
self, trigger_message: discord.Message, mention_config: dict
):
"""Check cooldown and kick off a mention-triggered scan of recent messages."""
channel_id = trigger_message.channel.id
cooldown_seconds = mention_config.get("cooldown_seconds", 60)
now = datetime.now(timezone.utc)
last_scan = self._mention_scan_cooldowns.get(channel_id)
if last_scan and (now - last_scan).total_seconds() < cooldown_seconds:
logger.info(
"Mention scan cooldown active for #%s, skipping.",
getattr(trigger_message.channel, "name", "unknown"),
)
return
self._mention_scan_cooldowns[channel_id] = now
# Extract the user's concern (strip the bot ping from the message)
mention_text = trigger_message.content
for fmt in (f"<@{self.bot.user.id}>", f"<@!{self.bot.user.id}>"):
mention_text = mention_text.replace(fmt, "")
mention_text = mention_text.strip() or "(user pinged bot without specific concern)"
# Store as a mention scan task (separate from debounce)
existing_task = self._mention_scan_tasks.get(channel_id)
if existing_task and not existing_task.done():
existing_task.cancel()
self._mention_scan_tasks[channel_id] = asyncio.create_task(
self._run_mention_scan(trigger_message, mention_text, mention_config)
) )
# Mute: rolling average OR single message spike async def _run_mention_scan(
if drama_score >= mute_threshold or score >= spike_mute: self,
effective_score = max(drama_score, score) trigger_message: discord.Message,
await self._mute_user(message, effective_score, categories, db_message_id) mention_text: str,
# Warn: rolling average OR single message spike mention_config: dict,
elif drama_score >= warning_threshold or score >= spike_warn: ):
effective_score = max(drama_score, score) """Scan recent channel messages with ONE conversation-level LLM call."""
await self._warn_user(message, effective_score, db_message_id) channel = trigger_message.channel
scan_count = mention_config.get("scan_messages", 30)
config = self.bot.config
sentiment_config = config.get("sentiment", {})
game_channels = config.get("game_channels", {})
# Fetch recent messages (before the trigger, skip bots/empty)
raw_messages: list[discord.Message] = []
try:
async for msg in channel.history(limit=scan_count + 10, before=trigger_message):
if msg.author.bot:
continue
if not msg.content or not msg.content.strip():
continue
raw_messages.append(msg)
if len(raw_messages) >= scan_count:
break
except discord.HTTPException:
logger.warning("Failed to fetch history for mention scan in #%s",
getattr(channel, "name", "unknown"))
return
raw_messages.reverse() # chronological order
if not raw_messages:
self._mention_scan_results[trigger_message.id] = "No recent messages found to analyze."
return
logger.info(
"Mention scan triggered by %s in #%s: %d messages (single LLM call). Focus: %s",
trigger_message.author.display_name,
getattr(channel, "name", "unknown"),
len(raw_messages),
mention_text[:80],
)
# Build a lookup of message IDs to authors for resolving replies
msg_id_to_author: dict[int, str] = {
m.id: m.author.display_name for m in raw_messages
}
# Convert to conversation tuples: (username, content, timestamp, reply_to)
conversation: list[tuple[str, str, datetime, str | None]] = []
for msg in raw_messages:
reply_to = None
if msg.reference and msg.reference.message_id:
# Try our local lookup first
reply_to = msg_id_to_author.get(msg.reference.message_id)
if not reply_to:
# Try the cached message
ref = msg.reference.cached_message
if ref:
reply_to = ref.author.display_name
conversation.append((
msg.author.display_name,
msg.content,
msg.created_at,
reply_to,
))
# Build user notes map
user_notes_map: dict[str, str] = {}
for msg in raw_messages:
name = msg.author.display_name
if name not in user_notes_map:
notes = self.bot.drama_tracker.get_user_notes(msg.author.id)
if notes:
user_notes_map[name] = notes
channel_context = self._build_channel_context(raw_messages[0], game_channels)
mention_context = (
f"A user flagged this conversation and said: \"{mention_text}\"\n"
f"Pay special attention to whether this concern is valid."
)
# Single LLM call
result = await self.bot.llm.analyze_conversation(
conversation,
mention_context=mention_context,
channel_context=channel_context,
user_notes_map=user_notes_map,
)
if result is None:
logger.warning("Conversation analysis failed for mention scan.")
self._mention_scan_results[trigger_message.id] = "Analysis failed."
return
# Build username → user_id + ref_message mapping
user_lookup: dict[str, tuple[int, discord.Message, list[discord.Message]]] = {}
for msg in raw_messages:
name = msg.author.display_name
if name not in user_lookup:
user_lookup[name] = (msg.author.id, msg, [])
user_lookup[name][2].append(msg)
findings: list[str] = []
# Resolve thresholds once (outside the loop)
dry_run = config.get("monitoring", {}).get("dry_run", False)
mode_config = self.bot.get_mode_config()
moderation_level = mode_config.get("moderation", "full")
if moderation_level == "relaxed" and "relaxed_thresholds" in mode_config:
rt = mode_config["relaxed_thresholds"]
warning_threshold = rt.get("warning_threshold", 0.80)
base_mute_threshold = rt.get("mute_threshold", 0.85)
spike_warn = rt.get("spike_warning_threshold", 0.70)
spike_mute = rt.get("spike_mute_threshold", 0.85)
else:
warning_threshold = sentiment_config.get("warning_threshold", 0.6)
base_mute_threshold = sentiment_config.get("mute_threshold", 0.75)
spike_warn = sentiment_config.get("spike_warning_threshold", 0.5)
spike_mute = sentiment_config.get("spike_mute_threshold", 0.8)
for finding in result.get("user_findings", []):
username = finding["username"]
score = finding["toxicity_score"]
categories = finding["categories"]
reasoning = finding["reasoning"]
worst_msg = finding.get("worst_message")
off_topic = finding.get("off_topic", False)
note_update = finding.get("note_update")
lookup = user_lookup.get(username)
if not lookup:
logger.warning("Mention scan: LLM returned unknown user '%s', skipping.", username)
continue
user_id, ref_msg, user_msgs = lookup
# Skip if all their messages were already analyzed
if all(m.id in self._analyzed_message_ids for m in user_msgs):
continue
# Mark their messages as analyzed
for m in user_msgs:
self._mark_analyzed(m.id)
self.bot.drama_tracker.add_entry(user_id, score, categories, reasoning)
drama_score = self.bot.drama_tracker.get_drama_score(user_id)
# Save to DB
content_summary = f"[Mention scan] {worst_msg}" if worst_msg else "[Mention scan] See conversation"
db_message_id = await self.bot.db.save_message_and_analysis(
guild_id=ref_msg.guild.id,
channel_id=ref_msg.channel.id,
user_id=user_id,
username=username,
content=content_summary,
message_ts=ref_msg.created_at.replace(tzinfo=timezone.utc),
toxicity_score=score,
drama_score=drama_score,
categories=categories,
reasoning=reasoning,
off_topic=off_topic,
topic_category="personal_drama" if off_topic else "gaming",
topic_reasoning=reasoning,
coherence_score=None,
coherence_flag=None,
)
# Log to #bcs-log
await self._log_analysis(
ref_msg, score, drama_score, categories, reasoning,
off_topic, "personal_drama" if off_topic else "gaming",
)
# Collect notable findings for summary
if score >= 0.3:
cat_str = ", ".join(c for c in categories if c != "none") or "none"
findings.append(f"{username}: {score:.2f} ({cat_str})")
# Update user notes
if note_update:
self.bot.drama_tracker.update_user_notes(user_id, note_update)
self._dirty_users.add(user_id)
# Moderation actions
if not dry_run:
mute_threshold = self.bot.drama_tracker.get_mute_threshold(
user_id, base_mute_threshold
)
if drama_score >= mute_threshold or score >= spike_mute:
effective_score = max(drama_score, score)
await self._mute_user(ref_msg, effective_score, categories, db_message_id)
elif drama_score >= warning_threshold or score >= spike_warn:
effective_score = max(drama_score, score)
await self._warn_user(ref_msg, effective_score, db_message_id)
# Build summary for ChatCog
convo_summary = result.get("conversation_summary", "")
if findings:
summary = f"Scanned {len(raw_messages)} msgs. {convo_summary} Notable: " + "; ".join(findings[:5])
else:
summary = f"Scanned {len(raw_messages)} msgs. {convo_summary}"
# Prune old scan results
if len(self._mention_scan_results) > 20:
oldest = sorted(self._mention_scan_results.keys())[:len(self._mention_scan_results) - 10]
for k in oldest:
del self._mention_scan_results[k]
self._mention_scan_results[trigger_message.id] = summary
logger.info(
"Mention scan complete in #%s: 1 LLM call, %d messages, %d users flagged",
getattr(channel, "name", "unknown"),
len(raw_messages),
len(findings),
)
async def _mute_user( async def _mute_user(
self, self,
@@ -684,59 +1010,6 @@ class SentimentCog(commands.Cog):
details=f"from=#{channel_name} to=#{detected_game} game={game_name}", details=f"from=#{channel_name} to=#{detected_game} game={game_name}",
)) ))
def _store_context(self, message: discord.Message):
ch_id = message.channel.id
if ch_id not in self._channel_history:
max_ctx = self.bot.config.get("sentiment", {}).get(
"context_messages", 8
)
self._channel_history[ch_id] = deque(maxlen=max_ctx)
self._channel_history[ch_id].append(
(message.author.display_name, message.content, datetime.now(timezone.utc))
)
def _get_context(self, message: discord.Message) -> str:
    """Render recent channel history as a timestamped chat log string.

    Messages sitting in this user+channel's debounce buffer are skipped
    (matched by (display_name, content)) because they are presented to the
    LLM in the TARGET MESSAGE section instead of as background context.
    Returns "(no prior context)" when nothing remains to show.
    """
    channel_id = message.channel.id
    history = self._channel_history.get(channel_id, deque())
    if not history:
        return "(no prior context)"
    now = datetime.now(timezone.utc)
    # (author, content) pairs currently buffered for this user in this channel.
    pending = self._message_buffer.get((channel_id, message.author.id), [])
    buffered = {(m.author.display_name, m.content) for m in pending}
    rendered = [
        f"[{self._format_relative_time(now - ts)}] {name}: {content}"
        for name, content, ts in history
        if (name, content) not in buffered
    ]
    return "\n".join(rendered) if rendered else "(no prior context)"
@staticmethod
def _format_relative_time(delta: timedelta) -> str:
total_seconds = int(delta.total_seconds())
if total_seconds < 60:
return f"~{total_seconds}s ago"
minutes = total_seconds // 60
if minutes < 60:
return f"~{minutes}m ago"
hours = minutes // 60
return f"~{hours}h ago"
async def _log_analysis( async def _log_analysis(
self, message: discord.Message, score: float, drama_score: float, self, message: discord.Message, score: float, drama_score: float,
categories: list[str], reasoning: str, off_topic: bool, topic_category: str, categories: list[str], reasoning: str, off_topic: bool, topic_category: str,
+6 -1
View File
@@ -33,8 +33,13 @@ topic_drift:
escalation_count: 3 # After this many reminds, DM the server owner escalation_count: 3 # After this many reminds, DM the server owner
reset_minutes: 60 # Reset off-topic count after this much on-topic behavior reset_minutes: 60 # Reset off-topic count after this much on-topic behavior
mention_scan:
enabled: true
scan_messages: 30 # Messages to scan per mention trigger
cooldown_seconds: 60 # Per-channel cooldown between scans
timeouts: timeouts:
escalation_minutes: [5, 15, 30, 60] # Escalating timeout durations escalation_minutes: [30, 60, 120, 240] # Escalating timeout durations
offense_reset_minutes: 120 # Reset offense counter after this much good behavior offense_reset_minutes: 120 # Reset offense counter after this much good behavior
warning_cooldown_minutes: 5 # Don't warn same user more than once per this window warning_cooldown_minutes: 5 # Don't warn same user more than once per this window
+13 -1
View File
@@ -36,4 +36,16 @@ If you notice something noteworthy about this user's communication style, behavi
GAME DETECTION — If CHANNEL INFO is provided, identify which specific game the message is discussing. Set detected_game to the channel name that best matches (e.g. "gta-online", "warzone", "battlefield", "cod-zombies") using ONLY the channel names listed in the channel info. If the message isn't about a specific game or you're unsure, set detected_game to null. GAME DETECTION — If CHANNEL INFO is provided, identify which specific game the message is discussing. Set detected_game to the channel name that best matches (e.g. "gta-online", "warzone", "battlefield", "cod-zombies") using ONLY the channel names listed in the channel info. If the message isn't about a specific game or you're unsure, set detected_game to null.
Use the report_analysis tool to report your analysis of the TARGET MESSAGE only. Use the report_analysis tool to report your analysis of the TARGET MESSAGE only.
CONVERSATION-LEVEL ANALYSIS (when given a CONVERSATION BLOCK instead of a single TARGET MESSAGE):
When you receive a full conversation block with multiple users, use the report_conversation_scan tool instead:
- Provide ONE finding per user (not per message) — aggregate their behavior across the conversation.
- Weight their average tone and worst message equally when determining the toxicity_score.
- Use the same scoring bands (0.0-1.0) as for single messages.
- Quote the worst/most problematic snippet in worst_message (max 100 chars, exact quote).
- Flag off_topic if user's messages are primarily personal drama, not gaming.
- For each user, assess coherence_score (0.0-1.0) and coherence_flag using the same criteria as single-message analysis. Normal texting shortcuts and abbreviations are fine (score ~0.85+).
- For each user, determine topic_category and provide brief topic_reasoning for their messages.
- For each user, check detected_game against the CHANNEL INFO section (if provided). Set to the game channel name if their messages are about a specific game, or null otherwise.
- If a USER REPORT section is present, pay close attention to whether that specific concern is valid.
+267 -7
View File
@@ -3,6 +3,7 @@ import base64
import json import json
import logging import logging
import time import time
from datetime import datetime, timedelta, timezone
from pathlib import Path from pathlib import Path
from openai import AsyncOpenAI from openai import AsyncOpenAI
@@ -97,6 +98,114 @@ ANALYSIS_TOOL = {
} }
# OpenAI function-calling schema for the channel-level "mention scan":
# a single LLM call over a whole conversation block returns one finding
# per participating user plus a one-sentence summary, replacing the old
# one-call-per-message flow.
CONVERSATION_TOOL = {
    "type": "function",
    "function": {
        "name": "report_conversation_scan",
        "description": "Analyze a conversation block and report findings per user.",
        "parameters": {
            "type": "object",
            "properties": {
                "user_findings": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "username": {
                                "type": "string",
                                "description": "Discord display name of the user.",
                            },
                            "toxicity_score": {
                                "type": "number",
                                "description": "Weighted toxicity 0.0-1.0 across their messages in this conversation.",
                            },
                            "categories": {
                                "type": "array",
                                "items": {
                                    "type": "string",
                                    "enum": [
                                        "aggressive",
                                        "passive_aggressive",
                                        "instigating",
                                        "hostile",
                                        "manipulative",
                                        "sexual_vulgar",
                                        "none",
                                    ],
                                },
                                "description": "Detected toxicity behavior categories.",
                            },
                            "reasoning": {
                                "type": "string",
                                "description": "Brief explanation of this user's behavior in the conversation.",
                            },
                            "worst_message": {
                                # Nullable: the model quotes the worst snippet only when notable.
                                "type": ["string", "null"],
                                "description": "Most problematic snippet from this user (quoted, max 100 chars), or null if nothing notable.",
                            },
                            "off_topic": {
                                "type": "boolean",
                                "description": "True if this user's messages were primarily off-topic personal drama.",
                            },
                            "topic_category": {
                                "type": "string",
                                "enum": [
                                    "gaming",
                                    "personal_drama",
                                    "relationship_issues",
                                    "real_life_venting",
                                    "gossip",
                                    "general_chat",
                                    "meta",
                                ],
                                "description": "What topic category this user's messages fall into.",
                            },
                            "topic_reasoning": {
                                "type": "string",
                                "description": "Brief explanation of the topic classification for this user.",
                            },
                            "coherence_score": {
                                "type": "number",
                                "description": "Coherence rating 0.0-1.0 across this user's messages. Normal texting shortcuts are fine.",
                            },
                            "coherence_flag": {
                                "type": "string",
                                "enum": [
                                    "normal",
                                    "intoxicated",
                                    "tired",
                                    "angry_typing",
                                    "mobile_keyboard",
                                    "language_barrier",
                                ],
                                "description": "Best guess at why coherence is low, if applicable.",
                            },
                            "note_update": {
                                "type": ["string", "null"],
                                "description": "New observation about this user's pattern, or null.",
                            },
                            "detected_game": {
                                "type": ["string", "null"],
                                "description": "The game channel name this user's messages are about, or null.",
                            },
                        },
                        # note_update / worst_message / detected_game are deliberately
                        # optional — downstream validation fills their defaults.
                        "required": ["username", "toxicity_score", "categories", "reasoning", "off_topic", "topic_category", "topic_reasoning", "coherence_score", "coherence_flag"],
                    },
                    "description": "Findings for each user who participated in the conversation.",
                },
                "conversation_summary": {
                    "type": "string",
                    "description": "One-sentence summary of the overall conversation tone and any escalation patterns.",
                },
            },
            "required": ["user_findings", "conversation_summary"],
        },
    },
}
# Models that reject a custom `temperature` parameter (e.g. gpt-5-nano and the
# o-series reasoning models); requests to these must omit it entirely.
_NO_TEMPERATURE_MODELS = {"gpt-5-nano", "o1", "o1-mini", "o1-preview", "o3", "o3-mini", "o4-mini"}
class LLMClient: class LLMClient:
def __init__(self, base_url: str, model: str, api_key: str = "not-needed", def __init__(self, base_url: str, model: str, api_key: str = "not-needed",
db=None, no_think: bool = False, concurrency: int = 4): db=None, no_think: bool = False, concurrency: int = 4):
@@ -104,6 +213,7 @@ class LLMClient:
self.host = base_url.rstrip("/") self.host = base_url.rstrip("/")
self._db = db self._db = db
self._no_think = no_think self._no_think = no_think
self._supports_temperature = model not in _NO_TEMPERATURE_MODELS
timeout = 600.0 if self.host else 120.0 # local models need longer for VRAM load timeout = 600.0 if self.host else 120.0 # local models need longer for VRAM load
client_kwargs = {"api_key": api_key, "timeout": timeout} client_kwargs = {"api_key": api_key, "timeout": timeout}
if self.host: if self.host:
@@ -137,13 +247,15 @@ class LLMClient:
async def analyze_message( async def analyze_message(
self, message: str, context: str = "", user_notes: str = "", self, message: str, context: str = "", user_notes: str = "",
channel_context: str = "", channel_context: str = "", mention_context: str = "",
) -> dict | None: ) -> dict | None:
user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n" user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n"
if user_notes: if user_notes:
user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n" user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
if channel_context: if channel_context:
user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n" user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
if mention_context:
user_content += f"=== USER REPORT (a user flagged this conversation — focus on this concern) ===\n{mention_context}\n\n"
user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}" user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}"
user_content = self._append_no_think(user_content) user_content = self._append_no_think(user_content)
@@ -155,6 +267,7 @@ class LLMClient:
async with self._semaphore: async with self._semaphore:
try: try:
temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
response = await self._client.chat.completions.create( response = await self._client.chat.completions.create(
model=self.model, model=self.model,
messages=[ messages=[
@@ -163,7 +276,7 @@ class LLMClient:
], ],
tools=[ANALYSIS_TOOL], tools=[ANALYSIS_TOOL],
tool_choice={"type": "function", "function": {"name": "report_analysis"}}, tool_choice={"type": "function", "function": {"name": "report_analysis"}},
temperature=0.1, **temp_kwargs,
max_completion_tokens=2048, max_completion_tokens=2048,
) )
@@ -253,6 +366,152 @@ class LLMClient:
logger.warning("Could not parse LLM content fallback: %s", text[:200]) logger.warning("Could not parse LLM content fallback: %s", text[:200])
return None return None
# -- Conversation-level analysis (mention scan) --
@staticmethod
def _format_relative_time(delta: timedelta) -> str:
total_seconds = int(delta.total_seconds())
if total_seconds < 60:
return f"~{total_seconds}s ago"
minutes = total_seconds // 60
if minutes < 60:
return f"~{minutes}m ago"
hours = minutes // 60
return f"~{hours}h ago"
@staticmethod
def _format_conversation_block(
messages: list[tuple[str, str, datetime, str | None]],
now: datetime | None = None,
) -> str:
"""Format messages as a compact timestamped chat block.
Each tuple is (username, content, timestamp, reply_to_username).
Consecutive messages from the same user collapse to indented lines.
Replies shown as ``username → replied_to:``.
"""
if now is None:
now = datetime.now(timezone.utc)
lines = [f"[Current time: {now.strftime('%I:%M %p')}]", ""]
last_user = None
for username, content, ts, reply_to in messages:
delta = now - ts.replace(tzinfo=timezone.utc) if ts.tzinfo is None else now - ts
rel = LLMClient._format_relative_time(delta)
if username == last_user:
# Continuation from same user — indent
for line in content.split("\n"):
lines.append(f" {line}")
else:
# New user block
if reply_to:
prefix = f"[{rel}] {username}{reply_to}: "
else:
prefix = f"[{rel}] {username}: "
msg_lines = content.split("\n")
lines.append(prefix + msg_lines[0])
for line in msg_lines[1:]:
lines.append(f" {line}")
last_user = username
return "\n".join(lines)
async def analyze_conversation(
    self,
    messages: list[tuple[str, str, datetime, str | None]],
    mention_context: str = "",
    channel_context: str = "",
    user_notes_map: dict[str, str] | None = None,
) -> dict | None:
    """Analyze a whole conversation block in one LLM call, per-user findings.

    Args:
        messages: Ordered (username, content, timestamp, reply_to_username)
            tuples, rendered into a compact chat block for the prompt.
        mention_context: Optional user-report text; when set it is included
            as a USER REPORT section so the model focuses on that concern.
        channel_context: Optional CHANNEL INFO section (used for game
            detection by the prompt).
        user_notes_map: Optional username -> prior-analysis-notes mapping;
            empty notes are omitted.

    Returns:
        The normalized tool payload from _validate_conversation_result
        (``user_findings`` list + ``conversation_summary``), or None when
        ``messages`` is empty, the model returns no tool call, or the API
        call raises.
    """
    if not messages:
        return None
    convo_block = self._format_conversation_block(messages)
    user_content = f"=== CONVERSATION BLOCK ===\n{convo_block}\n\n"
    if user_notes_map:
        notes_lines = [f"  {u}: {n}" for u, n in user_notes_map.items() if n]
        if notes_lines:
            user_content += "=== USER NOTES (from prior analysis) ===\n" + "\n".join(notes_lines) + "\n\n"
    if channel_context:
        user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
    if mention_context:
        user_content += f"=== USER REPORT (a user flagged this conversation — focus on this concern) ===\n{mention_context}\n\n"
    user_content += "Analyze the conversation block above and report findings for each user."
    user_content = self._append_no_think(user_content)
    # Truncated (500-char) request snapshot kept only for the LLM audit log.
    req_json = json.dumps([
        {"role": "system", "content": SYSTEM_PROMPT[:500]},
        {"role": "user", "content": user_content[:500]},
    ], default=str)
    t0 = time.monotonic()
    async with self._semaphore:
        try:
            # Some models reject a custom temperature (see _NO_TEMPERATURE_MODELS),
            # so the parameter is only passed when supported.
            temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_content},
                ],
                tools=[CONVERSATION_TOOL],
                # Force the model to answer via the conversation-scan tool.
                tool_choice={"type": "function", "function": {"name": "report_conversation_scan"}},
                **temp_kwargs,
                max_completion_tokens=4096,
            )
            elapsed = int((time.monotonic() - t0) * 1000)
            choice = response.choices[0]
            usage = response.usage
            if choice.message.tool_calls:
                tool_call = choice.message.tool_calls[0]
                resp_text = tool_call.function.arguments
                args = json.loads(resp_text)
                self._log_llm("conversation", elapsed, True, req_json, resp_text,
                              input_tokens=usage.prompt_tokens if usage else None,
                              output_tokens=usage.completion_tokens if usage else None)
                return self._validate_conversation_result(args)
            # Model answered without calling the tool — treat as a failed scan.
            logger.warning("No tool call in conversation analysis response.")
            self._log_llm("conversation", elapsed, False, req_json, error="Empty response")
            return None
        except Exception as e:
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM conversation analysis error: %s", e)
            self._log_llm("conversation", elapsed, False, req_json, error=str(e))
            return None
@staticmethod
def _validate_conversation_result(result: dict) -> dict:
"""Validate and normalize conversation analysis result."""
findings = result.get("user_findings", [])
for finding in findings:
finding.setdefault("username", "unknown")
score = float(finding.get("toxicity_score", 0.0))
finding["toxicity_score"] = min(max(score, 0.0), 1.0)
if not isinstance(finding.get("categories"), list):
finding["categories"] = ["none"]
finding.setdefault("reasoning", "")
finding.setdefault("worst_message", None)
finding["off_topic"] = bool(finding.get("off_topic", False))
finding.setdefault("topic_category", "general_chat")
finding.setdefault("topic_reasoning", "")
coherence = float(finding.get("coherence_score", 0.85))
finding["coherence_score"] = min(max(coherence, 0.0), 1.0)
finding.setdefault("coherence_flag", "normal")
finding.setdefault("note_update", None)
finding.setdefault("detected_game", None)
result["user_findings"] = findings
result.setdefault("conversation_summary", "")
return result
async def chat( async def chat(
self, messages: list[dict[str, str]], system_prompt: str, self, messages: list[dict[str, str]], system_prompt: str,
on_first_token=None, recent_bot_replies: list[str] | None = None, on_first_token=None, recent_bot_replies: list[str] | None = None,
@@ -285,16 +544,15 @@ class LLMClient:
async with self._semaphore: async with self._semaphore:
try: try:
temp_kwargs = {"temperature": 0.9, "frequency_penalty": 0.8, "presence_penalty": 0.6} if self._supports_temperature else {}
stream = await self._client.chat.completions.create( stream = await self._client.chat.completions.create(
model=self.model, model=self.model,
messages=[ messages=[
{"role": "system", "content": effective_prompt}, {"role": "system", "content": effective_prompt},
*patched, *patched,
], ],
temperature=0.9, **temp_kwargs,
max_completion_tokens=2048, max_completion_tokens=2048,
frequency_penalty=0.8,
presence_penalty=0.6,
stream=True, stream=True,
) )
@@ -350,13 +608,14 @@ class LLMClient:
async with self._semaphore: async with self._semaphore:
try: try:
async def _stream_image(): async def _stream_image():
temp_kwargs = {"temperature": 0.8} if self._supports_temperature else {}
stream = await self._client.chat.completions.create( stream = await self._client.chat.completions.create(
model=self.model, model=self.model,
messages=[ messages=[
{"role": "system", "content": system_prompt}, {"role": "system", "content": system_prompt},
{"role": "user", "content": user_content}, {"role": "user", "content": user_content},
], ],
temperature=0.8, **temp_kwargs,
max_completion_tokens=2048, max_completion_tokens=2048,
stream=True, stream=True,
) )
@@ -409,6 +668,7 @@ class LLMClient:
async with self._semaphore: async with self._semaphore:
try: try:
temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
response = await self._client.chat.completions.create( response = await self._client.chat.completions.create(
model=self.model, model=self.model,
messages=[ messages=[
@@ -417,7 +677,7 @@ class LLMClient:
], ],
tools=[ANALYSIS_TOOL], tools=[ANALYSIS_TOOL],
tool_choice={"type": "function", "function": {"name": "report_analysis"}}, tool_choice={"type": "function", "function": {"name": "report_analysis"}},
temperature=0.1, **temp_kwargs,
max_completion_tokens=2048, max_completion_tokens=2048,
) )