Files
Breehavior-Monitor/cogs/sentiment.py
AJ Isaacs a35705d3f1 Initial commit: Breehavior Monitor Discord bot
Discord bot for monitoring chat sentiment and tracking drama using
Ollama LLM on athena.lan. Includes sentiment analysis, slash commands,
drama tracking, and SQL Server persistence via Docker Compose.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 22:39:40 -05:00

556 lines
22 KiB
Python

import asyncio
import logging
from collections import deque
from datetime import datetime, timedelta, timezone
import discord
from discord.ext import commands, tasks
# Module-level logger; all records from this cog are emitted under "bcs.sentiment".
logger = logging.getLogger("bcs.sentiment")
# How often to flush dirty user states to DB (seconds).
# Consumed by the SentimentCog._flush_states background loop.
STATE_FLUSH_INTERVAL = 300 # 5 minutes
class SentimentCog(commands.Cog):
    """Score guild messages with the LLM and act on the results.

    For every eligible guild message the cog asks ``bot.ollama`` for an
    analysis, records it through ``bot.drama_tracker`` and ``bot.db``, and,
    unless dry-run mode is enabled, warns or times out the author when the
    configured thresholds are crossed. It also handles topic-drift reminders
    and coherence alerts, and periodically flushes changed user state to the
    database.
    """

    # Fallback timeout ladder (minutes) when config provides no usable list.
    _DEFAULT_ESCALATION_MINUTES = (5, 15, 30, 60)

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        # Per-channel message history for context: {channel_id: deque of (author, content)}
        self._channel_history: dict[int, deque] = {}
        # Track which user IDs have unsaved in-memory changes
        self._dirty_users: set[int] = set()
        # Strong references to fire-and-forget tasks. The event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def cog_load(self):
        """Start the periodic dirty-state flush loop."""
        self._flush_states.start()

    async def cog_unload(self):
        """Stop the flush loop and persist everything still pending."""
        self._flush_states.cancel()
        # Let in-flight fire-and-forget saves finish before the final flush.
        # Failures are reported by the tasks' done-callback, so they are only
        # collected (not raised) here.
        if self._background_tasks:
            await asyncio.gather(
                *list(self._background_tasks), return_exceptions=True
            )
        # Final flush on shutdown
        await self._flush_dirty_states()

    # ------------------------------------------------------------------
    # Background-task helpers
    # ------------------------------------------------------------------

    def _spawn(self, coro) -> asyncio.Task:
        """Schedule *coro* as a tracked fire-and-forget task.

        The task is kept in ``_background_tasks`` until it completes so it
        cannot be garbage-collected mid-execution, and any exception it raises
        is logged instead of being silently dropped.
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_task_done)
        return task

    def _on_background_task_done(self, task: asyncio.Task) -> None:
        """Forget a finished background task and log its failure, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Background task failed: %r", exc, exc_info=exc)

    def _record_action(
        self,
        message: discord.Message,
        action_type: str,
        db_message_id: int | None,
        details: str,
    ) -> None:
        """Persist a moderation action for the message author (fire-and-forget)."""
        self._spawn(self.bot.db.save_action(
            guild_id=message.guild.id,
            user_id=message.author.id,
            username=message.author.display_name,
            action_type=action_type,
            message_id=db_message_id,
            details=details,
        ))

    # ------------------------------------------------------------------
    # Message pipeline
    # ------------------------------------------------------------------

    def _passes_monitoring_filters(
        self, message: discord.Message, monitoring: dict
    ) -> bool:
        """Return True when the ``monitoring`` config allows analysing *message*."""
        if not monitoring.get("enabled", True):
            return False
        # Check if channel is monitored (an empty list means "all channels")
        monitored_channels = monitoring.get("channels", [])
        if monitored_channels and message.channel.id not in monitored_channels:
            return False
        # Check ignored users
        if message.author.id in monitoring.get("ignored_users", []):
            return False
        # Check immune roles. The author is a plain User (no ``roles``) when
        # the member is no longer in the guild, so fall back to no roles.
        immune_roles = set(monitoring.get("immune_roles", []))
        author_roles = getattr(message.author, "roles", ())
        if immune_roles and any(r.id in immune_roles for r in author_roles):
            return False
        # Check per-user immunity
        if self.bot.drama_tracker.is_immune(message.author.id):
            return False
        return True

    @commands.Cog.listener()
    async def on_message(self, message: discord.Message):
        """Analyse one incoming message and apply any resulting action.

        Bots, DMs, unmonitored channels, ignored/immune users, empty messages
        and users still in their analysis cooldown are skipped. In dry-run
        mode the analysis is still stored and logged, but no coherence alert,
        warning or mute is issued.
        """
        logger.info("MSG from %s in #%s: %s", message.author, getattr(message.channel, 'name', 'DM'), message.content[:80] if message.content else "(empty)")
        # Ignore bots (including ourselves)
        if message.author.bot:
            return
        # Ignore DMs
        if not message.guild:
            return

        config = self.bot.config
        monitoring = config.get("monitoring", {})
        if not self._passes_monitoring_filters(message, monitoring):
            return

        tracker = self.bot.drama_tracker
        author_id = message.author.id

        # Store message in channel history for context
        self._store_context(message)
        # Skip if empty
        if not message.content or not message.content.strip():
            return

        # Check per-user analysis cooldown
        sentiment_config = config.get("sentiment", {})
        cooldown = sentiment_config.get("cooldown_between_analyses", 2)
        if not tracker.can_analyze(author_id, cooldown):
            return

        # Analyze the message
        context = self._get_context(message)
        user_notes = tracker.get_user_notes(author_id)
        result = await self.bot.ollama.analyze_message(
            message.content, context, user_notes=user_notes
        )
        if result is None:
            return

        score = result["toxicity_score"]
        categories = result["categories"]
        reasoning = result["reasoning"]

        # Track the result
        tracker.add_entry(author_id, score, categories, reasoning)
        drama_score = tracker.get_drama_score(author_id)
        logger.info(
            "User %s (%d) | msg_score=%.2f | drama_score=%.2f | categories=%s | %s",
            message.author.display_name,
            author_id,
            score,
            drama_score,
            categories,
            reasoning,
        )

        # Topic drift detection
        off_topic = result.get("off_topic", False)
        topic_category = result.get("topic_category", "general_chat")
        topic_reasoning = result.get("topic_reasoning", "")

        # Save message + analysis to DB (awaited — need message_id for action links)
        db_message_id = await self.bot.db.save_message_and_analysis(
            guild_id=message.guild.id,
            channel_id=message.channel.id,
            user_id=author_id,
            username=message.author.display_name,
            content=message.content,
            message_ts=message.created_at.replace(tzinfo=timezone.utc),
            toxicity_score=score,
            drama_score=drama_score,
            categories=categories,
            reasoning=reasoning,
            off_topic=off_topic,
            topic_category=topic_category,
            topic_reasoning=topic_reasoning,
            coherence_score=result.get("coherence_score"),
            coherence_flag=result.get("coherence_flag"),
        )

        if off_topic:
            await self._handle_topic_drift(message, topic_category, topic_reasoning, db_message_id)

        dry_run = monitoring.get("dry_run", False)

        # Coherence / intoxication detection
        coherence_score = result.get("coherence_score", 0.85)
        coherence_flag = result.get("coherence_flag", "normal")
        coherence_config = config.get("coherence", {})
        if coherence_config.get("enabled", True):
            degradation = tracker.update_coherence(
                user_id=author_id,
                score=coherence_score,
                flag=coherence_flag,
                drop_threshold=coherence_config.get("drop_threshold", 0.3),
                absolute_floor=coherence_config.get("absolute_floor", 0.5),
                cooldown_minutes=coherence_config.get("cooldown_minutes", 30),
            )
            if degradation and not dry_run:
                await self._handle_coherence_alert(message, degradation, coherence_config, db_message_id)

        # Capture LLM note updates about this user
        note_update = result.get("note_update")
        if note_update:
            tracker.update_user_notes(author_id, note_update)
        # Mark dirty on every analysed message (covers note updates and
        # coherence baseline drift even without actions)
        self._dirty_users.add(author_id)

        # Always log analysis to #bcs-log if it exists
        await self._log_analysis(message, score, drama_score, categories, reasoning, off_topic, topic_category)

        # Dry-run mode: skip warnings/mutes
        if dry_run:
            return

        # Check thresholds — both rolling average AND single-message spikes
        warning_threshold = sentiment_config.get("warning_threshold", 0.6)
        base_mute_threshold = sentiment_config.get("mute_threshold", 0.75)
        mute_threshold = tracker.get_mute_threshold(author_id, base_mute_threshold)
        spike_warn = sentiment_config.get("spike_warning_threshold", 0.5)
        spike_mute = sentiment_config.get("spike_mute_threshold", 0.8)

        # Mute: rolling average OR single message spike
        if drama_score >= mute_threshold or score >= spike_mute:
            effective_score = max(drama_score, score)
            await self._mute_user(message, effective_score, categories, db_message_id)
        # Warn: rolling average OR single message spike
        elif drama_score >= warning_threshold or score >= spike_warn:
            effective_score = max(drama_score, score)
            await self._warn_user(message, effective_score, db_message_id)

    # ------------------------------------------------------------------
    # Moderation actions
    # ------------------------------------------------------------------

    async def _mute_user(
        self,
        message: discord.Message,
        score: float,
        categories: list[str],
        db_message_id: int | None = None,
    ):
        """Time out the message author with an escalating duration.

        The offense is recorded before the timeout is attempted. If the
        timeout itself fails, nothing is announced or persisted by this
        method.
        """
        member = message.author
        if not isinstance(member, discord.Member):
            return
        # Check bot permissions
        if not message.guild.me.guild_permissions.moderate_members:
            logger.warning("Missing moderate_members permission, cannot mute.")
            return

        # Record offense and get escalating timeout
        offense_num = self.bot.drama_tracker.record_offense(member.id)
        timeout_config = self.bot.config.get("timeouts", {})
        escalation = timeout_config.get("escalation_minutes", [5, 15, 30, 60])
        if not escalation:
            # An empty/None ladder in config would otherwise raise IndexError.
            escalation = self._DEFAULT_ESCALATION_MINUTES
        # Offenses beyond the end of the ladder reuse its last step.
        idx = max(0, min(offense_num - 1, len(escalation) - 1))
        duration_minutes = escalation[idx]

        try:
            await member.timeout(
                timedelta(minutes=duration_minutes),
                reason=f"BCS auto-mute: drama score {score:.2f}",
            )
        except discord.Forbidden:
            logger.warning("Cannot timeout %s — role hierarchy issue.", member)
            return
        except discord.HTTPException as e:
            logger.error("Failed to timeout %s: %s", member, e)
            return

        # Build embed
        messages_config = self.bot.config.get("messages", {})
        cat_str = ", ".join(c for c in categories if c != "none") or "general negativity"
        embed = discord.Embed(
            title=messages_config.get("mute_title", "BREEHAVIOR ALERT"),
            description=messages_config.get("mute_description", "").format(
                username=member.display_name,
                duration=f"{duration_minutes} minutes",
                score=f"{score:.2f}",
                categories=cat_str,
            ),
            color=discord.Color.red(),
        )
        embed.set_footer(
            text=f"Offense #{offense_num} | Timeout: {duration_minutes}m"
        )
        # The timeout is already in effect; a failed announcement must not
        # prevent the action and the new offense count from being persisted.
        try:
            await message.channel.send(embed=embed)
        except discord.HTTPException as e:
            logger.warning("Could not post mute notice for %s: %s", member, e)

        await self._log_action(
            message.guild,
            f"**MUTE** | {member.mention} | Score: {score:.2f} | "
            f"Duration: {duration_minutes}m | Offense #{offense_num} | "
            f"Categories: {cat_str}",
        )
        logger.info(
            "Muted %s for %d minutes (offense #%d, score %.2f)",
            member,
            duration_minutes,
            offense_num,
            score,
        )
        # Persist mute action and updated user state (fire-and-forget)
        self._record_action(
            message,
            "mute",
            db_message_id,
            f"duration={duration_minutes}m offense={offense_num} score={score:.2f} categories={cat_str}",
        )
        self._save_user_state(member.id)

    async def _warn_user(self, message: discord.Message, score: float, db_message_id: int | None = None):
        """React to and reply to the message with a warning, subject to a per-user cooldown."""
        timeout_config = self.bot.config.get("timeouts", {})
        cooldown = timeout_config.get("warning_cooldown_minutes", 5)
        if not self.bot.drama_tracker.can_warn(message.author.id, cooldown):
            return
        self.bot.drama_tracker.record_warning(message.author.id)

        # React with warning emoji (best-effort)
        try:
            await message.add_reaction("\u26a0\ufe0f")
        except discord.HTTPException:
            pass

        # Send warning message
        messages_config = self.bot.config.get("messages", {})
        warning_text = messages_config.get(
            "warning",
            "Easy there, {username}. The Breehavior Monitor is watching.",
        ).format(username=message.author.display_name)
        await message.channel.send(warning_text)

        await self._log_action(
            message.guild,
            f"**WARNING** | {message.author.mention} | Score: {score:.2f}",
        )
        logger.info("Warned %s (score %.2f)", message.author, score)
        # Persist warning action (fire-and-forget)
        self._record_action(message, "warning", db_message_id, f"score={score:.2f}")

    async def _handle_topic_drift(
        self, message: discord.Message, topic_category: str, topic_reasoning: str,
        db_message_id: int | None = None,
    ):
        """Remind, nudge, or escalate to the guild owner for an off-topic message.

        The first counted off-topic message gets a friendly reminder, later
        ones a firmer nudge, and once the count reaches ``escalation_count``
        the guild owner is DM'd (only while the tracker reports the owner has
        not been notified for this user).
        """
        config = self.bot.config.get("topic_drift", {})
        if not config.get("enabled", True):
            return
        # Dry-run mode: return before anything is recorded or sent
        dry_run = self.bot.config.get("monitoring", {}).get("dry_run", False)
        if dry_run:
            return

        tracker = self.bot.drama_tracker
        user_id = message.author.id
        cooldown = config.get("remind_cooldown_minutes", 10)
        if not tracker.can_topic_remind(user_id, cooldown):
            return

        count = tracker.record_off_topic(user_id)
        escalation_threshold = config.get("escalation_count", 3)
        messages_config = self.bot.config.get("messages", {})

        if count >= escalation_threshold and not tracker.was_owner_notified(user_id):
            # DM the server owner
            tracker.mark_owner_notified(user_id)
            owner = message.guild.owner
            if owner:
                dm_text = messages_config.get(
                    "topic_owner_dm",
                    "Heads up: {username} keeps going off-topic in #{channel}. Reminded {count} times.",
                ).format(
                    username=message.author.display_name,
                    channel=message.channel.name,
                    count=count,
                )
                try:
                    await owner.send(dm_text)
                except discord.HTTPException:
                    logger.warning("Could not DM server owner about topic drift.")
            await self._log_action(
                message.guild,
                f"**TOPIC DRIFT — OWNER NOTIFIED** | {message.author.mention} | "
                f"Off-topic count: {count} | Category: {topic_category}",
            )
            logger.info("Notified owner about %s topic drift (count %d)", message.author, count)
            self._record_action(
                message,
                "topic_escalation",
                db_message_id,
                f"off_topic_count={count} category={topic_category}",
            )
            self._save_user_state(user_id)
        elif count >= 2:
            # Firmer nudge
            nudge_text = messages_config.get(
                "topic_nudge",
                "{username}, let's keep it to gaming talk in here.",
            ).format(username=message.author.display_name)
            await message.channel.send(nudge_text)
            await self._log_action(
                message.guild,
                f"**TOPIC NUDGE** | {message.author.mention} | "
                f"Off-topic count: {count} | Category: {topic_category}",
            )
            logger.info("Topic nudge for %s (count %d)", message.author, count)
            self._record_action(
                message,
                "topic_nudge",
                db_message_id,
                f"off_topic_count={count} category={topic_category}",
            )
            self._save_user_state(user_id)
        else:
            # Friendly first reminder
            remind_text = messages_config.get(
                "topic_remind",
                "Hey {username}, this is a gaming server — maybe take the personal stuff to DMs?",
            ).format(username=message.author.display_name)
            await message.channel.send(remind_text)
            await self._log_action(
                message.guild,
                f"**TOPIC REMIND** | {message.author.mention} | "
                f"Category: {topic_category} | {topic_reasoning}",
            )
            logger.info("Topic remind for %s (count %d)", message.author, count)
            self._record_action(
                message,
                "topic_remind",
                db_message_id,
                f"off_topic_count={count} category={topic_category} reasoning={topic_reasoning}",
            )
            self._save_user_state(user_id)

    async def _handle_coherence_alert(
        self, message: discord.Message, degradation: dict, coherence_config: dict,
        db_message_id: int | None = None,
    ):
        """Post a coherence alert in the channel and record it.

        *degradation* must provide the keys ``flag``, ``current``,
        ``baseline`` and ``drop``; the alert text is looked up by flag in
        ``coherence_config["messages"]`` with a ``default`` fallback.
        """
        flag = degradation["flag"]
        messages_map = coherence_config.get("messages", {})
        alert_text = messages_map.get(flag, messages_map.get(
            "default", "You okay there, {username}? That message was... something."
        )).format(username=message.author.display_name)
        await message.channel.send(alert_text)

        await self._log_action(
            message.guild,
            f"**COHERENCE ALERT** | {message.author.mention} | "
            f"Score: {degradation['current']:.2f} | Baseline: {degradation['baseline']:.2f} | "
            f"Drop: {degradation['drop']:.2f} | Flag: {flag}",
        )
        logger.info(
            "Coherence alert for %s: score=%.2f baseline=%.2f drop=%.2f flag=%s",
            message.author, degradation["current"], degradation["baseline"],
            degradation["drop"], flag,
        )
        self._record_action(
            message,
            "coherence_alert",
            db_message_id,
            f"score={degradation['current']:.2f} baseline={degradation['baseline']:.2f} drop={degradation['drop']:.2f} flag={flag}",
        )
        self._save_user_state(message.author.id)

    # ------------------------------------------------------------------
    # User-state persistence
    # ------------------------------------------------------------------

    def _user_state_fields(self, user_id: int) -> dict:
        """Snapshot the tracker's current state for *user_id* as save_user_state kwargs."""
        user_data = self.bot.drama_tracker.get_user(user_id)
        return {
            "user_id": user_id,
            "offense_count": user_data.offense_count,
            "immune": user_data.immune,
            "off_topic_count": user_data.off_topic_count,
            "baseline_coherence": user_data.baseline_coherence,
            "user_notes": user_data.notes or None,
        }

    def _save_user_state(self, user_id: int) -> None:
        """Fire-and-forget save of a user's current state to DB."""
        self._spawn(self.bot.db.save_user_state(**self._user_state_fields(user_id)))
        self._dirty_users.discard(user_id)

    @tasks.loop(seconds=STATE_FLUSH_INTERVAL)
    async def _flush_states(self):
        """Periodic task: flush dirty user states every STATE_FLUSH_INTERVAL seconds."""
        await self._flush_dirty_states()

    @_flush_states.before_loop
    async def _before_flush(self):
        """Delay the first flush until the bot reports it is ready."""
        await self.bot.wait_until_ready()

    async def _flush_dirty_states(self) -> None:
        """Save all dirty user states to DB.

        Each user is removed from the dirty set just before its save is
        awaited, so a user marked dirty again during the save stays queued.
        A user whose save fails (or is cancelled) is put back so the next
        flush retries it, and one failure does not abort the remaining saves.
        """
        if not self._dirty_users:
            return
        saved = 0
        for user_id in list(self._dirty_users):
            self._dirty_users.discard(user_id)
            try:
                await self.bot.db.save_user_state(**self._user_state_fields(user_id))
            except asyncio.CancelledError:
                self._dirty_users.add(user_id)
                raise
            except Exception:
                self._dirty_users.add(user_id)
                logger.exception("Failed to flush state for user %d; will retry.", user_id)
            else:
                saved += 1
        logger.info("Flushed %d dirty user states to DB.", saved)

    # ------------------------------------------------------------------
    # Conversation context
    # ------------------------------------------------------------------

    def _store_context(self, message: discord.Message):
        """Append (display name, content) of *message* to its channel's history.

        The deque is created on first use for a channel with room for
        ``sentiment.context_messages`` prior messages plus the current one.
        """
        ch_id = message.channel.id
        if ch_id not in self._channel_history:
            max_ctx = self.bot.config.get("sentiment", {}).get(
                "context_messages", 3
            )
            self._channel_history[ch_id] = deque(maxlen=max_ctx + 1)
        self._channel_history[ch_id].append(
            (message.author.display_name, message.content)
        )

    def _get_context(self, message: discord.Message) -> str:
        """Return the channel's stored history, minus its newest entry, as one string.

        Entries are rendered as ``name: content`` joined by `` | ``. When
        there is no earlier entry, ``"(no prior context)"`` is returned.
        """
        ch_id = message.channel.id
        history = self._channel_history.get(ch_id, deque())
        # Exclude the current message (last item)
        context_entries = list(history)[:-1] if len(history) > 1 else []
        if not context_entries:
            return "(no prior context)"
        return " | ".join(
            f"{name}: {content}" for name, content in context_entries
        )

    # ------------------------------------------------------------------
    # #bcs-log output
    # ------------------------------------------------------------------

    @staticmethod
    def _find_log_channel(guild: discord.Guild):
        """Return the guild's text channel named ``bcs-log``, or None if absent."""
        return discord.utils.get(guild.text_channels, name="bcs-log")

    async def _log_analysis(
        self, message: discord.Message, score: float, drama_score: float,
        categories: list[str], reasoning: str, off_topic: bool, topic_category: str,
    ):
        """Post an analysis embed to #bcs-log for messages scoring above 0.1.

        Send failures are ignored; logging to the channel is best-effort.
        """
        # Only log notable messages (score > 0.1) to avoid spam
        if score <= 0.1:
            return
        log_channel = self._find_log_channel(message.guild)
        if not log_channel:
            return

        cat_str = ", ".join(c for c in categories if c != "none") or "none"
        embed = discord.Embed(
            title=f"Analysis: {message.author.display_name}",
            description=f"#{message.channel.name}: {message.content[:200]}",
            color=self._score_color(score),
        )
        embed.add_field(name="Message Score", value=f"{score:.2f}", inline=True)
        embed.add_field(name="Rolling Drama", value=f"{drama_score:.2f}", inline=True)
        embed.add_field(name="Categories", value=cat_str, inline=True)
        embed.add_field(name="Reasoning", value=reasoning[:1024] or "n/a", inline=False)
        try:
            await log_channel.send(embed=embed)
        except discord.HTTPException:
            pass

    @staticmethod
    def _score_color(score: float) -> discord.Color:
        """Map a score to an embed colour: red >= 0.75, orange >= 0.6, yellow >= 0.3, else green."""
        if score >= 0.75:
            return discord.Color.red()
        if score >= 0.6:
            return discord.Color.orange()
        if score >= 0.3:
            return discord.Color.yellow()
        return discord.Color.green()

    async def _log_action(self, guild: discord.Guild, text: str):
        """Post *text* to the guild's #bcs-log channel, if it exists (best-effort)."""
        log_channel = self._find_log_channel(guild)
        if log_channel:
            try:
                await log_channel.send(text)
            except discord.HTTPException:
                pass
async def setup(bot: commands.Bot):
    """Extension entry point: create the sentiment cog and register it on *bot*."""
    sentiment_cog = SentimentCog(bot)
    await bot.add_cog(sentiment_cog)