Initial commit: Breehavior Monitor Discord bot

Discord bot for monitoring chat sentiment and tracking drama using
Ollama LLM on athena.lan. Includes sentiment analysis, slash commands,
drama tracking, and SQL Server persistence via Docker Compose.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-20 22:39:40 -05:00
commit a35705d3f1
15 changed files with 2425 additions and 0 deletions

0
utils/__init__.py Normal file
View File

353
utils/database.py Normal file
View File

@@ -0,0 +1,353 @@
import asyncio
import json
import logging
import os
from datetime import datetime, timezone
logger = logging.getLogger("bcs.database")
class Database:
    """SQL Server persistence layer for messages, analyses, actions, and user state.

    Each operation opens its own short-lived pyodbc connection (autocommit on)
    and runs on a worker thread via asyncio.to_thread so the event loop never
    blocks on database I/O.  If no connection string is configured, pyodbc is
    missing, or the first connection fails, the instance stays in memory-only
    mode: every save method becomes a no-op and loads return empty results.
    """

    def __init__(self):
        # Full ODBC connection string; empty string means memory-only mode.
        self._conn_str = os.getenv("DB_CONNECTION_STRING", "")
        # True only after init() has connected and created/migrated the schema.
        self._available = False

    async def init(self) -> bool:
        """Initialize the database connection and create schema.
        Returns True if DB is available, False for memory-only mode."""
        if not self._conn_str:
            logger.warning("DB_CONNECTION_STRING not set — running in memory-only mode.")
            return False
        try:
            # Lazy import so the bot still runs where pyodbc isn't installed.
            import pyodbc
            self._pyodbc = pyodbc
        except ImportError:
            logger.warning("pyodbc not installed — running in memory-only mode.")
            return False
        try:
            conn = await asyncio.to_thread(self._connect)
            await asyncio.to_thread(self._create_schema, conn)
            conn.close()
            self._available = True
            logger.info("Database initialized successfully.")
            return True
        except Exception:
            # Any connect/DDL failure is non-fatal; degrade to memory-only mode.
            logger.exception("Database initialization failed — running in memory-only mode.")
            return False

    def _connect(self):
        """Open a new autocommit connection (blocking — always call via to_thread)."""
        return self._pyodbc.connect(self._conn_str, autocommit=True)

    def _create_schema(self, conn):
        """Create the database/tables if absent and apply idempotent column
        migrations (blocking — always call via to_thread)."""
        cursor = conn.cursor()
        # Create database if it doesn't exist
        db_name = self._parse_database_name()
        if db_name:
            # NOTE(review): db_name is interpolated into the DDL string; it comes
            # from our own trusted connection string, not user input.
            cursor.execute(
                f"IF DB_ID('{db_name}') IS NULL CREATE DATABASE [{db_name}]"
            )
            cursor.execute(f"USE [{db_name}]")
        # Raw messages observed in monitored channels.
        cursor.execute("""
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'Messages')
            CREATE TABLE Messages (
                Id BIGINT IDENTITY(1,1) PRIMARY KEY,
                GuildId BIGINT NOT NULL,
                ChannelId BIGINT NOT NULL,
                UserId BIGINT NOT NULL,
                Username NVARCHAR(100) NOT NULL,
                Content NVARCHAR(MAX) NOT NULL,
                MessageTs DATETIME2 NOT NULL,
                CreatedAt DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
            )
        """)
        # One LLM analysis row per saved message.
        cursor.execute("""
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'AnalysisResults')
            CREATE TABLE AnalysisResults (
                Id BIGINT IDENTITY(1,1) PRIMARY KEY,
                MessageId BIGINT NOT NULL REFERENCES Messages(Id),
                ToxicityScore FLOAT NOT NULL,
                DramaScore FLOAT NOT NULL,
                Categories NVARCHAR(500) NOT NULL,
                Reasoning NVARCHAR(MAX) NOT NULL,
                OffTopic BIT NOT NULL DEFAULT 0,
                TopicCategory NVARCHAR(100) NULL,
                TopicReasoning NVARCHAR(MAX) NULL,
                CreatedAt DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
            )
        """)
        # Moderation actions taken by the bot (warnings, mutes, reminders, ...).
        cursor.execute("""
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'Actions')
            CREATE TABLE Actions (
                Id BIGINT IDENTITY(1,1) PRIMARY KEY,
                GuildId BIGINT NOT NULL,
                UserId BIGINT NOT NULL,
                Username NVARCHAR(100) NOT NULL,
                ActionType NVARCHAR(50) NOT NULL,
                MessageId BIGINT NULL REFERENCES Messages(Id),
                Details NVARCHAR(MAX) NULL,
                CreatedAt DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
            )
        """)
        # Durable per-user counters so state survives bot restarts.
        cursor.execute("""
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'UserState')
            CREATE TABLE UserState (
                UserId BIGINT NOT NULL PRIMARY KEY,
                OffenseCount INT NOT NULL DEFAULT 0,
                Immune BIT NOT NULL DEFAULT 0,
                OffTopicCount INT NOT NULL DEFAULT 0,
                UpdatedAt DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
            )
        """)
        # --- Schema migrations for coherence feature ---
        # Each migration is guarded by COL_LENGTH so re-running is harmless.
        cursor.execute("""
            IF COL_LENGTH('AnalysisResults', 'CoherenceScore') IS NULL
            ALTER TABLE AnalysisResults ADD CoherenceScore FLOAT NULL
        """)
        cursor.execute("""
            IF COL_LENGTH('AnalysisResults', 'CoherenceFlag') IS NULL
            ALTER TABLE AnalysisResults ADD CoherenceFlag NVARCHAR(50) NULL
        """)
        cursor.execute("""
            IF COL_LENGTH('UserState', 'BaselineCoherence') IS NULL
            ALTER TABLE UserState ADD BaselineCoherence FLOAT NOT NULL DEFAULT 0.85
        """)
        # --- Schema migration for per-user LLM notes ---
        cursor.execute("""
            IF COL_LENGTH('UserState', 'UserNotes') IS NULL
            ALTER TABLE UserState ADD UserNotes NVARCHAR(MAX) NULL
        """)
        cursor.close()

    def _parse_database_name(self) -> str:
        """Extract DATABASE= value from the connection string."""
        for part in self._conn_str.split(";"):
            if part.strip().upper().startswith("DATABASE="):
                return part.split("=", 1)[1].strip()
        return ""

    # ------------------------------------------------------------------
    # Message + Analysis (awaited — we need the returned message ID)
    # ------------------------------------------------------------------
    async def save_message_and_analysis(
        self,
        guild_id: int,
        channel_id: int,
        user_id: int,
        username: str,
        content: str,
        message_ts: datetime,
        toxicity_score: float,
        drama_score: float,
        categories: list[str],
        reasoning: str,
        off_topic: bool = False,
        topic_category: str | None = None,
        topic_reasoning: str | None = None,
        coherence_score: float | None = None,
        coherence_flag: str | None = None,
    ) -> int | None:
        """Save a message and its analysis result. Returns the message row ID.

        Returns None in memory-only mode or when the insert fails; errors are
        logged, never raised to the caller.
        """
        if not self._available:
            return None
        try:
            return await asyncio.to_thread(
                self._save_message_and_analysis_sync,
                guild_id, channel_id, user_id, username, content, message_ts,
                toxicity_score, drama_score, categories, reasoning,
                off_topic, topic_category, topic_reasoning,
                coherence_score, coherence_flag,
            )
        except Exception:
            logger.exception("Failed to save message and analysis")
            return None

    def _save_message_and_analysis_sync(
        self,
        guild_id, channel_id, user_id, username, content, message_ts,
        toxicity_score, drama_score, categories, reasoning,
        off_topic, topic_category, topic_reasoning,
        coherence_score, coherence_flag,
    ) -> int:
        """Blocking insert of one Messages row plus its AnalysisResults row.

        Uses OUTPUT INSERTED.Id to get the new message ID in the same round trip.
        """
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute(
                """INSERT INTO Messages (GuildId, ChannelId, UserId, Username, Content, MessageTs)
                   OUTPUT INSERTED.Id
                   VALUES (?, ?, ?, ?, ?, ?)""",
                guild_id, channel_id, user_id, username,
                content[:4000],  # Truncate very long messages
                message_ts,
            )
            msg_id = cursor.fetchone()[0]
            cursor.execute(
                """INSERT INTO AnalysisResults
                   (MessageId, ToxicityScore, DramaScore, Categories, Reasoning,
                    OffTopic, TopicCategory, TopicReasoning,
                    CoherenceScore, CoherenceFlag)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                msg_id, toxicity_score, drama_score,
                # Categories are stored as a JSON array string (NVARCHAR(500)).
                json.dumps(categories), reasoning[:4000],
                1 if off_topic else 0,
                topic_category, topic_reasoning[:4000] if topic_reasoning else None,
                coherence_score, coherence_flag,
            )
            cursor.close()
            return msg_id
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Actions (fire-and-forget via asyncio.create_task)
    # ------------------------------------------------------------------
    async def save_action(
        self,
        guild_id: int,
        user_id: int,
        username: str,
        action_type: str,
        message_id: int | None = None,
        details: str | None = None,
    ) -> None:
        """Save a moderation action (warning, mute, topic_remind, etc.)."""
        if not self._available:
            return
        try:
            await asyncio.to_thread(
                self._save_action_sync,
                guild_id, user_id, username, action_type, message_id, details,
            )
        except Exception:
            logger.exception("Failed to save action")

    def _save_action_sync(self, guild_id, user_id, username, action_type, message_id, details):
        """Blocking insert of one Actions row."""
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute(
                """INSERT INTO Actions (GuildId, UserId, Username, ActionType, MessageId, Details)
                   VALUES (?, ?, ?, ?, ?, ?)""",
                guild_id, user_id, username, action_type, message_id,
                details[:4000] if details else None,
            )
            cursor.close()
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # UserState (upsert via MERGE)
    # ------------------------------------------------------------------
    async def save_user_state(
        self,
        user_id: int,
        offense_count: int,
        immune: bool,
        off_topic_count: int,
        baseline_coherence: float = 0.85,
        user_notes: str | None = None,
    ) -> None:
        """Upsert user state (offense count, immunity, off-topic count, coherence baseline, notes)."""
        if not self._available:
            return
        try:
            await asyncio.to_thread(
                self._save_user_state_sync,
                user_id, offense_count, immune, off_topic_count, baseline_coherence, user_notes,
            )
        except Exception:
            logger.exception("Failed to save user state")

    def _save_user_state_sync(self, user_id, offense_count, immune, off_topic_count, baseline_coherence, user_notes):
        """Blocking MERGE upsert of one UserState row.

        The parameter list is passed twice: once for the UPDATE branch and once
        for the INSERT branch (plus the leading user_id for the source row).
        """
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute(
                """MERGE UserState AS target
                   USING (SELECT ? AS UserId) AS source
                   ON target.UserId = source.UserId
                   WHEN MATCHED THEN
                       UPDATE SET OffenseCount = ?, Immune = ?, OffTopicCount = ?,
                                  BaselineCoherence = ?, UserNotes = ?,
                                  UpdatedAt = SYSUTCDATETIME()
                   WHEN NOT MATCHED THEN
                       INSERT (UserId, OffenseCount, Immune, OffTopicCount, BaselineCoherence, UserNotes)
                       VALUES (?, ?, ?, ?, ?, ?);""",
                user_id,
                offense_count, 1 if immune else 0, off_topic_count, baseline_coherence, user_notes,
                user_id, offense_count, 1 if immune else 0, off_topic_count, baseline_coherence, user_notes,
            )
            cursor.close()
        finally:
            conn.close()

    async def delete_user_state(self, user_id: int) -> None:
        """Remove a user's persisted state (used by /bcs-reset)."""
        if not self._available:
            return
        try:
            await asyncio.to_thread(self._delete_user_state_sync, user_id)
        except Exception:
            logger.exception("Failed to delete user state")

    def _delete_user_state_sync(self, user_id):
        """Blocking delete of one UserState row."""
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM UserState WHERE UserId = ?", user_id)
            cursor.close()
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Hydration (load all user states on startup)
    # ------------------------------------------------------------------
    async def load_all_user_states(self) -> list[dict]:
        """Load all user states from the database for startup hydration.
        Returns list of dicts with user_id, offense_count, immune, off_topic_count."""
        if not self._available:
            return []
        try:
            return await asyncio.to_thread(self._load_all_user_states_sync)
        except Exception:
            logger.exception("Failed to load user states")
            return []

    def _load_all_user_states_sync(self) -> list[dict]:
        """Blocking full-table read of UserState, mapped to plain dicts."""
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT UserId, OffenseCount, Immune, OffTopicCount, BaselineCoherence, UserNotes FROM UserState"
            )
            rows = cursor.fetchall()
            cursor.close()
            return [
                {
                    "user_id": row[0],
                    "offense_count": row[1],
                    "immune": bool(row[2]),
                    "off_topic_count": row[3],
                    "baseline_coherence": float(row[4]),
                    # NULL notes become "" so callers never see None here.
                    "user_notes": row[5] or "",
                }
                for row in rows
            ]
        finally:
            conn.close()

    async def close(self):
        """No persistent connection to close (connections are per-operation)."""
        pass

284
utils/drama_tracker.py Normal file
View File

@@ -0,0 +1,284 @@
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
@dataclass
class AnalysisEntry:
    """One LLM toxicity verdict for a single message, kept in a user's rolling window."""

    # Unix timestamp (time.time()) when the entry was recorded.
    timestamp: float
    # LLM toxicity rating, 0.0 (harmless) to 1.0 (severely toxic).
    toxicity_score: float
    # Behavior categories the LLM reported (e.g. "aggressive", "none").
    categories: list[str]
    # Brief LLM explanation of the score.
    reasoning: str
@dataclass
class UserDrama:
    """Mutable per-user tracking state held in memory by DramaTracker.

    All *_time fields are Unix timestamps from time.time(); 0.0 means "never".
    """

    # Rolling window of recent analyses (pruned by age and by window_size).
    entries: list[AnalysisEntry] = field(default_factory=list)
    offense_count: int = 0
    last_offense_time: float = 0.0
    last_warning_time: float = 0.0
    last_analysis_time: float = 0.0
    # True once warned; lowers the mute threshold until the next offense.
    warned_since_reset: bool = False
    # Immune users are exempt from moderation (toggled via command).
    immune: bool = False
    # Topic drift tracking
    off_topic_count: int = 0
    last_topic_remind_time: float = 0.0
    owner_notified: bool = False
    # Coherence tracking
    coherence_scores: list[float] = field(default_factory=list)
    # Slow-moving EMA of coherence; 0.85 matches the DB column default.
    baseline_coherence: float = 0.85
    last_coherence_alert_time: float = 0.0
    # Per-user LLM notes
    notes: str = ""
class DramaTracker:
    """In-memory tracker of per-user toxicity, topic drift, and coherence.

    Keeps a rolling window of recent LLM analyses per user (bounded both by
    window_minutes and window_size) plus offense/warning/cooldown state.
    All timing uses time.time(); persistence is handled by the caller via
    load_user_states() and the Database class.
    """

    def __init__(
        self,
        window_size: int = 10,
        window_minutes: int = 15,
        offense_reset_minutes: int = 120,
    ):
        # Max entries kept per user, and max age of an entry.
        self.window_size = window_size
        self.window_seconds = window_minutes * 60
        # Quiet period after which a user's offense count resets.
        self.offense_reset_seconds = offense_reset_minutes * 60
        self._users: dict[int, UserDrama] = {}

    def get_user(self, user_id: int) -> UserDrama:
        """Return the user's state, creating a fresh record on first sight."""
        if user_id not in self._users:
            self._users[user_id] = UserDrama()
        return self._users[user_id]

    def add_entry(
        self,
        user_id: int,
        toxicity_score: float,
        categories: list[str],
        reasoning: str,
    ) -> None:
        """Record a new analysis for the user and prune the rolling window."""
        user = self.get_user(user_id)
        now = time.time()
        user.entries.append(
            AnalysisEntry(
                timestamp=now,
                toxicity_score=toxicity_score,
                categories=categories,
                reasoning=reasoning,
            )
        )
        user.last_analysis_time = now
        self._prune_entries(user, now)

    def get_drama_score(self, user_id: int) -> float:
        """Return the user's current drama score in [0, 1] (0.0 if no entries)."""
        user = self.get_user(user_id)
        now = time.time()
        self._prune_entries(user, now)
        if not user.entries:
            return 0.0
        # Weighted average: more recent messages weighted higher
        total_weight = 0.0
        weighted_sum = 0.0
        for i, entry in enumerate(user.entries):
            weight = (i + 1)  # linear weight, later entries = higher
            weighted_sum += entry.toxicity_score * weight
            total_weight += weight
        return weighted_sum / total_weight if total_weight > 0 else 0.0

    def get_mute_threshold(self, user_id: int, base_threshold: float) -> float:
        """Lower the mute threshold if user was already warned."""
        user = self.get_user(user_id)
        if user.warned_since_reset:
            return base_threshold - 0.05
        return base_threshold

    def record_offense(self, user_id: int) -> int:
        """Increment and return the user's offense count.

        The count restarts from zero after offense_reset_seconds of quiet, and
        each new offense re-arms the "warned" flag (so the threshold discount
        from get_mute_threshold applies only until the next offense).
        """
        user = self.get_user(user_id)
        now = time.time()
        # Reset offense count if enough time has passed
        if (
            user.last_offense_time > 0
            and now - user.last_offense_time > self.offense_reset_seconds
        ):
            user.offense_count = 0
        user.offense_count += 1
        user.last_offense_time = now
        user.warned_since_reset = False
        return user.offense_count

    def record_warning(self, user_id: int) -> None:
        """Mark that the user has just been warned (starts the warn cooldown)."""
        user = self.get_user(user_id)
        user.last_warning_time = time.time()
        user.warned_since_reset = True

    def can_warn(self, user_id: int, cooldown_minutes: int) -> bool:
        """True if the warn cooldown has elapsed (or the user was never warned)."""
        user = self.get_user(user_id)
        if user.last_warning_time == 0.0:
            return True
        return time.time() - user.last_warning_time > cooldown_minutes * 60

    def can_analyze(self, user_id: int, cooldown_seconds: int) -> bool:
        """True if enough time has passed since the user's last analysis."""
        user = self.get_user(user_id)
        if user.last_analysis_time == 0.0:
            return True
        return time.time() - user.last_analysis_time > cooldown_seconds

    def reset_user(self, user_id: int) -> None:
        """Drop all in-memory state for the user (next access starts fresh)."""
        if user_id in self._users:
            del self._users[user_id]

    def toggle_immunity(self, user_id: int) -> bool:
        """Flip the user's immunity flag and return the new value."""
        user = self.get_user(user_id)
        user.immune = not user.immune
        return user.immune

    def is_immune(self, user_id: int) -> bool:
        """True if the user is currently immune (without creating a record)."""
        if user_id not in self._users:
            return False
        return self._users[user_id].immune

    def get_all_scores(self) -> dict[int, float]:
        """Return {user_id: drama_score} for every tracked user with score > 0."""
        scores = {}
        # Iterate over a copy of the keys: get_drama_score prunes entries.
        for user_id in list(self._users.keys()):
            score = self.get_drama_score(user_id)
            if score > 0.0:
                scores[user_id] = score
        return scores

    def get_recent_incidents(
        self, user_id: int, count: int = 5
    ) -> list[AnalysisEntry]:
        """Return up to `count` of the user's most recent non-trivial entries."""
        user = self.get_user(user_id)
        now = time.time()
        self._prune_entries(user, now)
        # Return entries with score > 0.3 (non-trivial)
        incidents = [e for e in user.entries if e.toxicity_score > 0.3]
        return incidents[-count:]

    def record_off_topic(self, user_id: int) -> int:
        """Count an off-topic message and start the topic-remind cooldown."""
        user = self.get_user(user_id)
        user.off_topic_count += 1
        user.last_topic_remind_time = time.time()
        return user.off_topic_count

    def can_topic_remind(self, user_id: int, cooldown_minutes: int) -> bool:
        """True if the topic-reminder cooldown has elapsed for this user."""
        user = self.get_user(user_id)
        if user.last_topic_remind_time == 0.0:
            return True
        return time.time() - user.last_topic_remind_time > cooldown_minutes * 60

    def get_off_topic_count(self, user_id: int) -> int:
        """Return the user's current off-topic message count."""
        return self.get_user(user_id).off_topic_count

    def mark_owner_notified(self, user_id: int) -> None:
        """Remember that the server owner was already pinged about this user."""
        self.get_user(user_id).owner_notified = True

    def was_owner_notified(self, user_id: int) -> bool:
        """True if the owner has already been notified about this user."""
        return self.get_user(user_id).owner_notified

    def get_user_notes(self, user_id: int) -> str:
        """Return the accumulated LLM notes for this user ("" if none)."""
        return self.get_user(user_id).notes

    def update_user_notes(self, user_id: int, note_update: str) -> None:
        """Append a timestamped note line, trimming oldest lines past ~2000 chars."""
        user = self.get_user(user_id)
        ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
        new_line = f"[{ts}] {note_update}"
        if user.notes:
            user.notes = f"{user.notes}\n{new_line}"
        else:
            user.notes = new_line
        # Trim oldest lines if over ~2000 chars
        while len(user.notes) > 2000:
            lines = user.notes.split("\n")
            if len(lines) <= 1:
                # A single oversized line is kept as-is rather than truncated.
                break
            user.notes = "\n".join(lines[1:])

    def clear_user_notes(self, user_id: int) -> None:
        """Erase all accumulated notes for this user."""
        self.get_user(user_id).notes = ""

    def reset_off_topic(self, user_id: int) -> None:
        """Reset all topic-drift state (count, cooldown, owner notification)."""
        user = self.get_user(user_id)
        user.off_topic_count = 0
        user.last_topic_remind_time = 0.0
        user.owner_notified = False

    def update_coherence(
        self,
        user_id: int,
        score: float,
        flag: str,
        drop_threshold: float = 0.3,
        absolute_floor: float = 0.5,
        cooldown_minutes: int = 30,
    ) -> dict | None:
        """Update user's coherence baseline and detect degradation.
        Returns info dict if degradation detected, else None.

        Ordering matters here: degradation is judged against the baseline
        BEFORE the EMA update, and the baseline is updated even when the
        alert is suppressed by the cooldown.
        """
        user = self.get_user(user_id)
        alpha = 0.1  # Slow-moving EMA — ~20 messages to shift significantly
        # Keep a rolling window of recent scores (last 20)
        user.coherence_scores.append(score)
        if len(user.coherence_scores) > 20:
            user.coherence_scores = user.coherence_scores[-20:]
        baseline_before = user.baseline_coherence
        drop = baseline_before - score
        # Check for degradation BEFORE updating baseline
        # Requires both a big relative drop AND an absolute low score.
        degraded = (
            score < baseline_before - drop_threshold
            and score < absolute_floor
        )
        # Update baseline with EMA
        user.baseline_coherence = alpha * score + (1 - alpha) * user.baseline_coherence
        if not degraded:
            return None
        # Check cooldown
        now = time.time()
        if (
            user.last_coherence_alert_time > 0
            and now - user.last_coherence_alert_time < cooldown_minutes * 60
        ):
            return None
        user.last_coherence_alert_time = now
        return {
            "baseline": baseline_before,
            "current": score,
            "drop": drop,
            "flag": flag,
        }

    def load_user_states(self, states: list[dict]) -> int:
        """Hydrate user state from database rows.
        Each dict must have: user_id, offense_count, immune, off_topic_count.
        Optionally includes baseline_coherence.
        Returns number of users loaded."""
        count = 0
        for state in states:
            user_id = state["user_id"]
            user = self.get_user(user_id)
            user.offense_count = state["offense_count"]
            user.immune = state["immune"]
            user.off_topic_count = state["off_topic_count"]
            if "baseline_coherence" in state:
                user.baseline_coherence = state["baseline_coherence"]
            # Empty-string notes are skipped so the dataclass default stands.
            if "user_notes" in state and state["user_notes"]:
                user.notes = state["user_notes"]
            count += 1
        return count

    def _prune_entries(self, user: UserDrama, now: float) -> None:
        """Drop entries older than the time window, then cap to window_size."""
        cutoff = now - self.window_seconds
        user.entries = [e for e in user.entries if e.timestamp > cutoff]
        if len(user.entries) > self.window_size:
            user.entries = user.entries[-self.window_size :]

289
utils/ollama_client.py Normal file
View File

@@ -0,0 +1,289 @@
import json
import logging
from openai import AsyncOpenAI
logger = logging.getLogger("bcs.llm")
# System prompt for the analysis call: instructs the model to score ONLY the
# target message (context is background), covering toxicity, topic drift,
# coherence, and optional per-user note updates, and to report its result via
# the report_analysis tool defined in ANALYSIS_TOOL.
SYSTEM_PROMPT = """You are a Discord chat moderator AI for a gaming server. You will be given a TARGET MESSAGE to analyze, plus recent channel context for background.
CRITICAL: Only score the TARGET MESSAGE. The context is ONLY for understanding tone and conversation flow. Do NOT score the context messages — they belong to other users and are already being analyzed separately.
CONTEXT — This is a friend group who use crude nicknames (e.g. "tits" is someone's nickname). A nickname alone is NOT toxic. However, you must still flag genuinely aggressive language.
SCORING GUIDE — Be precise with scores:
- 0.0-0.1: Completely harmless. Casual chat, jokes, "lmao", greetings, game talk, nicknames.
- 0.2-0.3: Mildly edgy. Light trash-talk that's clearly playful ("you suck at this game lol").
- 0.4-0.5: Moderate. Frustrated or heated language that could go either way. Insults without "lol/lmao" softening them.
- 0.6-0.7: Aggressive. Direct insults, hostility, telling someone off. Even among friends, "fuck you, you piece of shit" is aggressive language that should score here regardless of context.
- 0.8-1.0: Severely toxic. Threats, targeted harassment, telling someone to leave, attacking insecurities, sustained personal attacks.
IMPORTANT RULES:
- "Tits" as a nickname = 0.0, not toxic.
- Profanity ALONE (just "fuck" or "shit" with no target) = low score (0.0-0.1).
- Profanity DIRECTED AT someone ("fuck you", "you piece of shit") = moderate-to-high score (0.5-0.7) even among friends.
- Do NOT let friendly context excuse clearly aggressive language. Friends can still cross lines.
- If a message contains BOTH a nickname AND an insult ("fuck you tits you piece of shit"), score the insult, not the nickname.
- If the target message is just "lmao", "lol", an emoji, or a short neutral reaction, it is ALWAYS 0.0 regardless of what other people said before it.
Also determine if the message is on-topic (gaming, games, matches, strategy, LFG, etc.) or off-topic personal drama (relationship issues, personal feuds, venting about real-life problems, gossip about people outside the server).
Also assess the message's coherence — how well-formed, readable, and grammatically correct it is.
- 0.9-1.0: Clear, well-written, normal for this user
- 0.6-0.8: Some errors but still understandable (normal texting shortcuts like "u" and "ur" are fine — don't penalize those)
- 0.3-0.5: Noticeably degraded — garbled words, missing letters, broken sentences beyond normal shorthand
- 0.0-0.2: Nearly incoherent — can barely understand what they're trying to say
You may also be given NOTES about this user from prior interactions. Use these to calibrate your scoring — for example, if notes say "uses heavy profanity casually" then profanity alone should score lower for this user.
If you notice something noteworthy about this user's communication style, behavior, or patterns that would help future analysis, include it as a note_update. Only add genuinely useful observations — don't repeat what's already in the notes. If nothing new, leave note_update as null.
Use the report_analysis tool to report your analysis of the TARGET MESSAGE only."""
# OpenAI-style function-calling schema the model uses to report its analysis.
# Note: "note_update" is deliberately absent from "required" — it is nullable
# and only supplied when the model has a genuinely new observation.
ANALYSIS_TOOL = {
    "type": "function",
    "function": {
        "name": "report_analysis",
        "description": "Report the toxicity and topic analysis of a Discord message.",
        "parameters": {
            "type": "object",
            "properties": {
                "toxicity_score": {
                    "type": "number",
                    "description": "Toxicity rating from 0.0 (completely harmless) to 1.0 (extremely toxic).",
                },
                "categories": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": [
                            "aggressive",
                            "passive_aggressive",
                            "instigating",
                            "hostile",
                            "manipulative",
                            "none",
                        ],
                    },
                    "description": "Detected toxicity behavior categories.",
                },
                "reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the toxicity analysis.",
                },
                "off_topic": {
                    "type": "boolean",
                    "description": "True if the message is off-topic personal drama rather than gaming-related conversation.",
                },
                "topic_category": {
                    "type": "string",
                    "enum": [
                        "gaming",
                        "personal_drama",
                        "relationship_issues",
                        "real_life_venting",
                        "gossip",
                        "general_chat",
                        "meta",
                    ],
                    "description": "What topic category the message falls into.",
                },
                "topic_reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the topic classification.",
                },
                "coherence_score": {
                    "type": "number",
                    "description": "Coherence rating from 0.0 (incoherent gibberish) to 1.0 (clear and well-written). Normal texting shortcuts are fine.",
                },
                "coherence_flag": {
                    "type": "string",
                    "enum": [
                        "normal",
                        "intoxicated",
                        "tired",
                        "angry_typing",
                        "mobile_keyboard",
                        "language_barrier",
                    ],
                    "description": "Best guess at why coherence is low, if applicable.",
                },
                "note_update": {
                    "type": ["string", "null"],
                    "description": "Brief new observation about this user's style/behavior for future reference, or null if nothing new.",
                },
            },
            "required": ["toxicity_score", "categories", "reasoning", "off_topic", "topic_category", "topic_reasoning", "coherence_score", "coherence_flag"],
        },
    },
}
class LLMClient:
def __init__(self, base_url: str, model: str, api_key: str = "not-needed"):
self.model = model
self.host = base_url.rstrip("/")
self._client = AsyncOpenAI(
base_url=f"{self.host}/v1",
api_key=api_key,
timeout=300.0, # 5 min — first request loads model into VRAM
)
async def close(self):
await self._client.close()
async def analyze_message(
self, message: str, context: str = "", user_notes: str = ""
) -> dict | None:
user_content = f"=== CONTEXT (other users' recent messages, for background only) ===\n{context}\n\n"
if user_notes:
user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}"
try:
response = await self._client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_content},
],
tools=[ANALYSIS_TOOL],
tool_choice={"type": "function", "function": {"name": "report_analysis"}},
temperature=0.1,
)
choice = response.choices[0]
# Extract tool call arguments
if choice.message.tool_calls:
tool_call = choice.message.tool_calls[0]
args = json.loads(tool_call.function.arguments)
return self._validate_result(args)
# Fallback: try parsing the message content as JSON
if choice.message.content:
return self._parse_content_fallback(choice.message.content)
logger.warning("No tool call or content in LLM response.")
return None
except Exception as e:
logger.error("LLM analysis error: %s", e)
return None
def _validate_result(self, result: dict) -> dict:
score = float(result.get("toxicity_score", 0.0))
result["toxicity_score"] = min(max(score, 0.0), 1.0)
if not isinstance(result.get("categories"), list):
result["categories"] = ["none"]
if not isinstance(result.get("reasoning"), str):
result["reasoning"] = ""
result["off_topic"] = bool(result.get("off_topic", False))
result.setdefault("topic_category", "general_chat")
result.setdefault("topic_reasoning", "")
coherence = float(result.get("coherence_score", 0.85))
result["coherence_score"] = min(max(coherence, 0.0), 1.0)
result.setdefault("coherence_flag", "normal")
result.setdefault("note_update", None)
return result
def _parse_content_fallback(self, text: str) -> dict | None:
"""Try to parse plain-text content as JSON if tool calling didn't work."""
import re
# Try direct JSON
try:
result = json.loads(text.strip())
return self._validate_result(result)
except (json.JSONDecodeError, ValueError):
pass
# Try extracting from code block
match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if match:
try:
result = json.loads(match.group(1))
return self._validate_result(result)
except (json.JSONDecodeError, ValueError):
pass
# Regex fallback for toxicity_score
score_match = re.search(r'"toxicity_score"\s*:\s*([\d.]+)', text)
if score_match:
return {
"toxicity_score": min(max(float(score_match.group(1)), 0.0), 1.0),
"categories": ["unknown"],
"reasoning": "Parsed via fallback regex",
}
logger.warning("Could not parse LLM content fallback: %s", text[:200])
return None
async def chat(
self, messages: list[dict[str, str]], system_prompt: str
) -> str | None:
"""Send a conversational chat request (no tools)."""
try:
response = await self._client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
*messages,
],
temperature=0.8,
max_tokens=300,
)
content = response.choices[0].message.content
return content.strip() if content else None
except Exception as e:
logger.error("LLM chat error: %s", e)
return None
async def raw_analyze(self, message: str, context: str = "", user_notes: str = "") -> tuple[str, dict | None]:
"""Return the raw LLM response string AND parsed result for /bcs-test (single LLM call)."""
user_content = f"=== CONTEXT (other users' recent messages, for background only) ===\n{context}\n\n"
if user_notes:
user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}"
try:
response = await self._client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_content},
],
tools=[ANALYSIS_TOOL],
tool_choice={"type": "function", "function": {"name": "report_analysis"}},
temperature=0.1,
)
choice = response.choices[0]
parts = []
parsed = None
if choice.message.content:
parts.append(f"Content: {choice.message.content}")
if choice.message.tool_calls:
for tc in choice.message.tool_calls:
parts.append(
f"Tool call: {tc.function.name}({tc.function.arguments})"
)
# Parse the first tool call
args = json.loads(choice.message.tool_calls[0].function.arguments)
parsed = self._validate_result(args)
elif choice.message.content:
parsed = self._parse_content_fallback(choice.message.content)
raw = "\n".join(parts) or "(empty response)"
return raw, parsed
except Exception as e:
return f"Error: {e}", None