Files
Behavior-Monitor/utils/llm_client.py
AJ Isaacs f46caf9ac5 fix: tag context messages with [CONTEXT] to prevent LLM from scoring them
The triage LLM was blending context message content into its reasoning
for new messages (e.g., citing profanity from context when the new
message was just "I'll be here"). Added per-message [CONTEXT] tags
inline and strengthened the prompt to explicitly forbid referencing
context content in reasoning/scores.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 20:08:23 -05:00

762 lines
34 KiB
Python

import asyncio
import base64
import json
import logging
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from openai import AsyncOpenAI
# Module-level logger under the project's "bcs" namespace.
logger = logging.getLogger("bcs.llm")

# Prompt templates live in <repo_root>/prompts, one level above this utils/ package.
_PROMPTS_DIR = Path(__file__).resolve().parent.parent / "prompts"

# Loaded once at import time; a missing prompts/analysis.txt raises
# FileNotFoundError the first time this module is imported.
SYSTEM_PROMPT = (_PROMPTS_DIR / "analysis.txt").read_text(encoding="utf-8")
# OpenAI function-calling schema for single-message analysis.  analyze_message()
# and raw_analyze() force the model to call `report_analysis`, so results always
# arrive as a structured argument dict matching these properties.
ANALYSIS_TOOL = {
    "type": "function",
    "function": {
        "name": "report_analysis",
        "description": "Report the toxicity and topic analysis of a Discord message.",
        "parameters": {
            "type": "object",
            "properties": {
                "toxicity_score": {
                    "type": "number",
                    "description": "Toxicity rating from 0.0 (completely harmless) to 1.0 (extremely toxic).",
                },
                "categories": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": [
                            "aggressive",
                            "passive_aggressive",
                            "instigating",
                            "hostile",
                            "manipulative",
                            "sexual_vulgar",
                            "none",
                        ],
                    },
                    "description": "Detected toxicity behavior categories.",
                },
                "reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the toxicity analysis.",
                },
                "off_topic": {
                    "type": "boolean",
                    "description": "True if the message is off-topic personal drama rather than gaming-related conversation.",
                },
                "topic_category": {
                    "type": "string",
                    "enum": [
                        "gaming",
                        "personal_drama",
                        "relationship_issues",
                        "real_life_venting",
                        "gossip",
                        "general_chat",
                        "meta",
                    ],
                    "description": "What topic category the message falls into.",
                },
                "topic_reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the topic classification.",
                },
                "coherence_score": {
                    "type": "number",
                    "description": "Coherence rating from 0.0 (incoherent gibberish) to 1.0 (clear and well-written). Normal texting shortcuts are fine.",
                },
                "coherence_flag": {
                    "type": "string",
                    "enum": [
                        "normal",
                        "intoxicated",
                        "tired",
                        "angry_typing",
                        "mobile_keyboard",
                        "language_barrier",
                    ],
                    "description": "Best guess at why coherence is low, if applicable.",
                },
                # Nullable fields below are optional (absent from "required");
                # _validate_result() defaults them to None when missing.
                "note_update": {
                    "type": ["string", "null"],
                    "description": "Brief new observation about this user's style/behavior for future reference, or null if nothing new.",
                },
                "detected_game": {
                    "type": ["string", "null"],
                    "description": "The game channel name this message is about (e.g. 'gta-online', 'warzone'), or null if not game-specific.",
                },
            },
            "required": ["toxicity_score", "categories", "reasoning", "off_topic", "topic_category", "topic_reasoning", "coherence_score", "coherence_flag"],
        },
    },
}
# OpenAI function-calling schema for whole-conversation scans.  Unlike
# ANALYSIS_TOOL this reports one findings object PER USER plus an overall
# summary; analyze_conversation() forces the `report_conversation_scan` call.
CONVERSATION_TOOL = {
    "type": "function",
    "function": {
        "name": "report_conversation_scan",
        "description": "Analyze a conversation block and report findings per user.",
        "parameters": {
            "type": "object",
            "properties": {
                "user_findings": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "username": {
                                "type": "string",
                                "description": "Discord display name of the user.",
                            },
                            "toxicity_score": {
                                "type": "number",
                                "description": "Weighted toxicity 0.0-1.0 across their messages in this conversation.",
                            },
                            "categories": {
                                "type": "array",
                                "items": {
                                    "type": "string",
                                    "enum": [
                                        "aggressive",
                                        "passive_aggressive",
                                        "instigating",
                                        "hostile",
                                        "manipulative",
                                        "sexual_vulgar",
                                        "none",
                                    ],
                                },
                                "description": "Detected toxicity behavior categories.",
                            },
                            "reasoning": {
                                "type": "string",
                                "description": "Brief explanation of this user's behavior in the conversation.",
                            },
                            "worst_message": {
                                "type": ["string", "null"],
                                "description": "Most problematic snippet from this user (quoted, max 100 chars), or null if nothing notable.",
                            },
                            "off_topic": {
                                "type": "boolean",
                                "description": "True if this user's messages were primarily off-topic personal drama.",
                            },
                            "topic_category": {
                                "type": "string",
                                "enum": [
                                    "gaming",
                                    "personal_drama",
                                    "relationship_issues",
                                    "real_life_venting",
                                    "gossip",
                                    "general_chat",
                                    "meta",
                                ],
                                "description": "What topic category this user's messages fall into.",
                            },
                            "topic_reasoning": {
                                "type": "string",
                                "description": "Brief explanation of the topic classification for this user.",
                            },
                            "coherence_score": {
                                "type": "number",
                                "description": "Coherence rating 0.0-1.0 across this user's messages. Normal texting shortcuts are fine.",
                            },
                            "coherence_flag": {
                                "type": "string",
                                "enum": [
                                    "normal",
                                    "intoxicated",
                                    "tired",
                                    "angry_typing",
                                    "mobile_keyboard",
                                    "language_barrier",
                                ],
                                "description": "Best guess at why coherence is low, if applicable.",
                            },
                            # Optional nullable fields; defaulted to None by
                            # _validate_conversation_result() when absent.
                            "note_update": {
                                "type": ["string", "null"],
                                "description": "New observation about this user's pattern, or null.",
                            },
                            "detected_game": {
                                "type": ["string", "null"],
                                "description": "The game channel name this user's messages are about, or null.",
                            },
                        },
                        "required": ["username", "toxicity_score", "categories", "reasoning", "off_topic", "topic_category", "topic_reasoning", "coherence_score", "coherence_flag"],
                    },
                    "description": "Findings for each user who participated in the conversation.",
                },
                "conversation_summary": {
                    "type": "string",
                    "description": "One-sentence summary of the overall conversation tone and any escalation patterns.",
                },
            },
            "required": ["user_findings", "conversation_summary"],
        },
    },
}
# Models that reject the `temperature` sampling parameter (OpenAI reasoning
# family); every request path checks this set and omits temperature for them.
_NO_TEMPERATURE_MODELS = {"gpt-5-nano", "o1", "o1-mini", "o1-preview", "o3", "o3-mini", "o4-mini"}
class LLMClient:
    """Async client for an OpenAI-compatible chat endpoint.

    Wraps ``AsyncOpenAI`` with a request semaphore, optional database
    logging, and per-model quirks (temperature support, /no_think suffix).
    """

    def __init__(self, base_url: str, model: str, api_key: str = "not-needed",
                 db=None, no_think: bool = False, concurrency: int = 4):
        self.model = model
        self.host = base_url.rstrip("/")
        self._db = db
        self._no_think = no_think
        self._supports_temperature = model not in _NO_TEMPERATURE_MODELS
        # A non-empty host means a local/self-hosted backend, which may need
        # to page the model into VRAM — give it a far longer request timeout.
        if self.host:
            self._client = AsyncOpenAI(api_key=api_key, timeout=600.0,
                                       base_url=f"{self.host}/v1")
        else:
            self._client = AsyncOpenAI(api_key=api_key, timeout=120.0)
        # Caps the number of simultaneous in-flight LLM requests.
        self._semaphore = asyncio.Semaphore(concurrency)
def _log_llm(self, request_type: str, duration_ms: int, success: bool,
request: str, response: str | None = None, error: str | None = None,
input_tokens: int | None = None, output_tokens: int | None = None):
"""Fire-and-forget LLM log entry to the database."""
if not self._db:
return
asyncio.create_task(self._db.save_llm_log(
request_type=request_type,
model=self.model,
duration_ms=duration_ms,
success=success,
request=request,
response=response,
error=error,
input_tokens=input_tokens,
output_tokens=output_tokens,
))
def _append_no_think(self, text: str) -> str:
return text + "\n/no_think" if self._no_think else text
async def close(self):
await self._client.close()
async def analyze_message(
    self, message: str, context: str = "", user_notes: str = "",
    channel_context: str = "", mention_context: str = "",
) -> dict | None:
    """Run single-message toxicity/topic analysis via a forced tool call.

    Builds a sectioned prompt (channel context, user notes, channel info,
    optional user report, then the target message), forces the
    ``report_analysis`` tool, and returns the validated argument dict.
    Returns None on an empty response or any request error.
    """
    # Context is explicitly labeled "background context only" so the model
    # does not score those messages alongside the target message.
    user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n"
    if user_notes:
        user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
    if channel_context:
        user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
    if mention_context:
        user_content += f"=== USER REPORT (a user flagged this conversation — focus on this concern) ===\n{mention_context}\n\n"
    user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}"
    user_content = self._append_no_think(user_content)
    # The logged request truncates the system prompt to keep DB rows small.
    req_json = json.dumps([
        {"role": "system", "content": SYSTEM_PROMPT[:500]},
        {"role": "user", "content": user_content},
    ], default=str)
    t0 = time.monotonic()
    async with self._semaphore:
        try:
            # Reasoning-family models reject `temperature`; omit it for them.
            temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_content},
                ],
                tools=[ANALYSIS_TOOL],
                tool_choice={"type": "function", "function": {"name": "report_analysis"}},
                **temp_kwargs,
                max_completion_tokens=2048,
            )
            elapsed = int((time.monotonic() - t0) * 1000)
            choice = response.choices[0]
            usage = response.usage
            # Extract tool call arguments
            if choice.message.tool_calls:
                tool_call = choice.message.tool_calls[0]
                resp_text = tool_call.function.arguments
                args = json.loads(resp_text)
                self._log_llm("analysis", elapsed, True, req_json, resp_text,
                              input_tokens=usage.prompt_tokens if usage else None,
                              output_tokens=usage.completion_tokens if usage else None)
                return self._validate_result(args)
            # Fallback: try parsing the message content as JSON
            if choice.message.content:
                self._log_llm("analysis", elapsed, True, req_json, choice.message.content,
                              input_tokens=usage.prompt_tokens if usage else None,
                              output_tokens=usage.completion_tokens if usage else None)
                return self._parse_content_fallback(choice.message.content)
            logger.warning("No tool call or content in LLM response.")
            self._log_llm("analysis", elapsed, False, req_json, error="Empty response")
            return None
        except Exception as e:
            # Broad catch: any transport/parse failure degrades to "no result"
            # rather than crashing the caller; the error is logged to the DB.
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM analysis error: %s", e)
            self._log_llm("analysis", elapsed, False, req_json, error=str(e))
            return None
def _validate_result(self, result: dict) -> dict:
score = float(result.get("toxicity_score", 0.0))
result["toxicity_score"] = min(max(score, 0.0), 1.0)
if not isinstance(result.get("categories"), list):
result["categories"] = ["none"]
if not isinstance(result.get("reasoning"), str):
result["reasoning"] = ""
result["off_topic"] = bool(result.get("off_topic", False))
result.setdefault("topic_category", "general_chat")
result.setdefault("topic_reasoning", "")
coherence = float(result.get("coherence_score", 0.85))
result["coherence_score"] = min(max(coherence, 0.0), 1.0)
result.setdefault("coherence_flag", "normal")
result.setdefault("note_update", None)
result.setdefault("detected_game", None)
return result
def _parse_content_fallback(self, text: str) -> dict | None:
"""Try to parse plain-text content as JSON if tool calling didn't work."""
import re
# Try direct JSON
try:
result = json.loads(text.strip())
return self._validate_result(result)
except (json.JSONDecodeError, ValueError):
pass
# Try extracting from code block
match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if match:
try:
result = json.loads(match.group(1))
return self._validate_result(result)
except (json.JSONDecodeError, ValueError):
pass
# Regex fallback for toxicity_score
score_match = re.search(r'"toxicity_score"\s*:\s*([\d.]+)', text)
if score_match:
return {
"toxicity_score": min(max(float(score_match.group(1)), 0.0), 1.0),
"categories": ["unknown"],
"reasoning": "Parsed via fallback regex",
}
logger.warning("Could not parse LLM content fallback: %s", text[:200])
return None
# -- Conversation-level analysis (mention scan) --
@staticmethod
def _format_relative_time(delta: timedelta) -> str:
total_seconds = int(delta.total_seconds())
if total_seconds < 60:
return f"~{total_seconds}s ago"
minutes = total_seconds // 60
if minutes < 60:
return f"~{minutes}m ago"
hours = minutes // 60
return f"~{hours}h ago"
@staticmethod
def _format_conversation_block(
messages: list[tuple[str, str, datetime, str | None]],
now: datetime | None = None,
new_message_start: int | None = None,
) -> str:
"""Format messages as a compact timestamped chat block.
Each tuple is (username, content, timestamp, reply_to_username).
Consecutive messages from the same user collapse to indented lines.
Replies shown as ``username → replied_to:``.
If *new_message_start* is given, a separator is inserted before that
index so the LLM can distinguish context from new messages.
"""
if now is None:
now = datetime.now(timezone.utc)
lines = [f"[Current time: {now.strftime('%I:%M %p')}]", ""]
last_user = None
in_new_section = new_message_start is None or new_message_start == 0
for idx, (username, content, ts, reply_to) in enumerate(messages):
if new_message_start is not None and idx == new_message_start:
lines.append("")
lines.append("--- NEW MESSAGES (score only these) ---")
lines.append("")
last_user = None # reset collapse so first new msg gets full header
in_new_section = True
delta = now - ts.replace(tzinfo=timezone.utc) if ts.tzinfo is None else now - ts
rel = LLMClient._format_relative_time(delta)
tag = "" if in_new_section else " [CONTEXT]"
if username == last_user:
# Continuation from same user — indent
for line in content.split("\n"):
lines.append(f" {line}")
else:
# New user block
if reply_to:
prefix = f"[{rel}] {username}{reply_to}:{tag} "
else:
prefix = f"[{rel}] {username}:{tag} "
msg_lines = content.split("\n")
lines.append(prefix + msg_lines[0])
for line in msg_lines[1:]:
lines.append(f" {line}")
last_user = username
return "\n".join(lines)
async def analyze_conversation(
    self,
    messages: list[tuple[str, str, datetime, str | None]],
    mention_context: str = "",
    channel_context: str = "",
    user_notes_map: dict[str, str] | None = None,
    new_message_start: int | None = None,
) -> dict | None:
    """Analyze a conversation block in one call, returning per-user findings.

    *messages* are (username, content, timestamp, reply_to_username) tuples
    rendered by _format_conversation_block; entries before
    *new_message_start* are marked as context-only.  Forces the
    ``report_conversation_scan`` tool and returns its validated arguments,
    or None when *messages* is empty, the response is unusable, or the
    request fails.
    """
    if not messages:
        return None
    convo_block = self._format_conversation_block(messages, new_message_start=new_message_start)
    user_content = f"=== CONVERSATION BLOCK ===\n{convo_block}\n\n"
    if user_notes_map:
        # Only users that actually have notes get a line.
        notes_lines = [f" {u}: {n}" for u, n in user_notes_map.items() if n]
        if notes_lines:
            user_content += "=== USER NOTES (from prior analysis) ===\n" + "\n".join(notes_lines) + "\n\n"
    if channel_context:
        user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
    if mention_context:
        user_content += f"=== USER REPORT (a user flagged this conversation — focus on this concern) ===\n{mention_context}\n\n"
    user_content += "Analyze the conversation block above and report findings for each user."
    user_content = self._append_no_think(user_content)
    # Both halves of the logged request are truncated to keep DB rows small.
    req_json = json.dumps([
        {"role": "system", "content": SYSTEM_PROMPT[:500]},
        {"role": "user", "content": user_content[:500]},
    ], default=str)
    t0 = time.monotonic()
    async with self._semaphore:
        try:
            # Reasoning-family models reject `temperature`; omit it for them.
            temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_content},
                ],
                tools=[CONVERSATION_TOOL],
                tool_choice={"type": "function", "function": {"name": "report_conversation_scan"}},
                **temp_kwargs,
                max_completion_tokens=4096,
            )
            elapsed = int((time.monotonic() - t0) * 1000)
            choice = response.choices[0]
            usage = response.usage
            if choice.message.tool_calls:
                tool_call = choice.message.tool_calls[0]
                resp_text = tool_call.function.arguments
                args = json.loads(resp_text)
                self._log_llm("conversation", elapsed, True, req_json, resp_text,
                              input_tokens=usage.prompt_tokens if usage else None,
                              output_tokens=usage.completion_tokens if usage else None)
                return self._validate_conversation_result(args)
            # Fallback: try parsing the message content as JSON
            if choice.message.content:
                logger.warning("No tool call in conversation analysis — trying content fallback. Content: %s", choice.message.content[:300])
                fallback = self._parse_conversation_content_fallback(choice.message.content)
                if fallback:
                    self._log_llm("conversation", elapsed, True, req_json, choice.message.content,
                                  input_tokens=usage.prompt_tokens if usage else None,
                                  output_tokens=usage.completion_tokens if usage else None)
                    return fallback
            logger.warning("No tool call or parseable content in conversation analysis response.")
            self._log_llm("conversation", elapsed, False, req_json, error="Empty response")
            return None
        except Exception as e:
            # Degrade to "no result" on any transport/parse failure; the
            # error text is persisted via the DB log.
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM conversation analysis error: %s", e)
            self._log_llm("conversation", elapsed, False, req_json, error=str(e))
            return None
@staticmethod
def _validate_conversation_result(result: dict) -> dict:
"""Validate and normalize conversation analysis result."""
findings = result.get("user_findings", [])
for finding in findings:
finding.setdefault("username", "unknown")
score = float(finding.get("toxicity_score", 0.0))
finding["toxicity_score"] = min(max(score, 0.0), 1.0)
if not isinstance(finding.get("categories"), list):
finding["categories"] = ["none"]
finding.setdefault("reasoning", "")
finding.setdefault("worst_message", None)
finding["off_topic"] = bool(finding.get("off_topic", False))
finding.setdefault("topic_category", "general_chat")
finding.setdefault("topic_reasoning", "")
coherence = float(finding.get("coherence_score", 0.85))
finding["coherence_score"] = min(max(coherence, 0.0), 1.0)
finding.setdefault("coherence_flag", "normal")
finding.setdefault("note_update", None)
finding.setdefault("detected_game", None)
result["user_findings"] = findings
result.setdefault("conversation_summary", "")
return result
def _parse_conversation_content_fallback(self, text: str) -> dict | None:
"""Try to parse plain-text content as JSON for conversation analysis."""
import re
# Try direct JSON parse
try:
result = json.loads(text.strip())
if "user_findings" in result:
return self._validate_conversation_result(result)
except (json.JSONDecodeError, ValueError):
pass
# Try extracting from code block
match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if match:
try:
result = json.loads(match.group(1))
if "user_findings" in result:
return self._validate_conversation_result(result)
except (json.JSONDecodeError, ValueError):
pass
logger.warning("Could not parse conversation content fallback: %s", text[:200])
return None
async def chat(
    self, messages: list[dict[str, str]], system_prompt: str,
    on_first_token=None, recent_bot_replies: list[str] | None = None,
) -> str | None:
    """Send a conversational chat request (no tools).

    If *on_first_token* is an async callable it will be awaited once the
    first content token arrives (useful for triggering the typing indicator
    only after the model starts generating).

    *recent_bot_replies*, when given, are appended to the system prompt as
    an explicit do-not-repeat list.  Returns the full streamed reply, or
    None when the stream yields no content or the request fails.
    """
    patched = list(messages)  # shallow copy; the caller's list is not mutated
    # Append recent bot replies to the system prompt so the model avoids
    # repeating the same phrases / joke structures.
    effective_prompt = system_prompt
    if recent_bot_replies:
        avoid_block = "\n".join(f"- {r}" for r in recent_bot_replies)
        effective_prompt += (
            "\n\nIMPORTANT — you recently said the following. "
            "Do NOT reuse any of these phrases, sentence structures, or joke patterns. "
            "Come up with something completely different.\n"
            + avoid_block
        )
    # Logged request truncates the prompt and each message body for the DB.
    req_json = json.dumps([
        {"role": "system", "content": effective_prompt[:500]},
        *[{"role": m["role"], "content": str(m.get("content", ""))[:200]} for m in patched],
    ], default=str)
    t0 = time.monotonic()
    async with self._semaphore:
        try:
            # High temperature plus repetition penalties for varied replies;
            # omitted entirely for models that reject sampling parameters.
            temp_kwargs = {"temperature": 0.9, "frequency_penalty": 0.8, "presence_penalty": 0.6} if self._supports_temperature else {}
            stream = await self._client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": effective_prompt},
                    *patched,
                ],
                **temp_kwargs,
                max_completion_tokens=2048,
                stream=True,
            )
            chunks: list[str] = []
            notified = False
            async for chunk in stream:
                delta = chunk.choices[0].delta if chunk.choices else None
                if delta and delta.content:
                    # Notify exactly once, on the first non-empty token.
                    if not notified and on_first_token:
                        await on_first_token()
                        notified = True
                    chunks.append(delta.content)
            content = "".join(chunks).strip()
            elapsed = int((time.monotonic() - t0) * 1000)
            # success flag is "produced any content", not transport success.
            self._log_llm("chat", elapsed, bool(content), req_json, content or None)
            return content if content else None
        except Exception as e:
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM chat error: %s", e)
            self._log_llm("chat", elapsed, False, req_json, error=str(e))
            return None
async def analyze_image(
    self,
    image_bytes: bytes,
    system_prompt: str,
    user_text: str = "",
    on_first_token=None,
) -> str | None:
    """Send an image to the vision model with a system prompt.

    The image is inlined as a base64 PNG data URL.  The reply is streamed;
    *on_first_token* (if given) is awaited once on the first content token.
    Returns the generated text response, or None on failure/timeout.
    """
    b64 = base64.b64encode(image_bytes).decode()
    data_url = f"data:image/png;base64,{b64}"
    user_content: list[dict] = [
        {"type": "image_url", "image_url": {"url": data_url}},
    ]
    text_part = user_text or ""
    if self._no_think:
        # strip() so a bare "/no_think" survives even with no user text.
        text_part = (text_part + "\n/no_think").strip()
    if text_part:
        user_content.append({"type": "text", "text": text_part})
    # Log a size placeholder rather than the (huge) base64 payload.
    req_json = json.dumps([
        {"role": "system", "content": system_prompt[:500]},
        {"role": "user", "content": f"[image {len(image_bytes)} bytes] {user_text or ''}"},
    ], default=str)
    t0 = time.monotonic()
    async with self._semaphore:
        try:
            async def _stream_image():
                # Reasoning-family models reject `temperature`; omit for them.
                temp_kwargs = {"temperature": 0.8} if self._supports_temperature else {}
                stream = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": user_content},
                    ],
                    **temp_kwargs,
                    max_completion_tokens=2048,
                    stream=True,
                )
                chunks: list[str] = []
                notified = False
                async for chunk in stream:
                    delta = chunk.choices[0].delta if chunk.choices else None
                    if delta and delta.content:
                        if not notified and on_first_token:
                            await on_first_token()
                            notified = True
                        chunks.append(delta.content)
                return "".join(chunks).strip()
            # Hard cap on the whole stream, independent of the client timeout.
            content = await asyncio.wait_for(_stream_image(), timeout=120)
            elapsed = int((time.monotonic() - t0) * 1000)
            self._log_llm("image", elapsed, bool(content), req_json, content or None)
            return content if content else None
        except asyncio.TimeoutError:
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM image analysis timed out after %ds", elapsed // 1000)
            self._log_llm("image", elapsed, False, req_json, error="Timeout")
            return None
        except Exception as e:
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM image analysis error: %s", e)
            self._log_llm("image", elapsed, False, req_json, error=str(e))
            return None
async def raw_analyze(
    self, message: str, context: str = "", user_notes: str = "",
    channel_context: str = "",
) -> tuple[str, dict | None]:
    """Return the raw LLM response string AND parsed result for /bcs-test (single LLM call).

    Mirrors analyze_message's prompt construction and forced tool call, but
    additionally returns a human-readable dump of the model's content and
    tool calls for debugging.  On error returns ("Error: ...", None)
    instead of raising.
    """
    user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n"
    if user_notes:
        user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
    if channel_context:
        user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
    user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}"
    user_content = self._append_no_think(user_content)
    # Logged request truncates the system prompt to keep DB rows small.
    req_json = json.dumps([
        {"role": "system", "content": SYSTEM_PROMPT[:500]},
        {"role": "user", "content": user_content},
    ], default=str)
    t0 = time.monotonic()
    async with self._semaphore:
        try:
            temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_content},
                ],
                tools=[ANALYSIS_TOOL],
                tool_choice={"type": "function", "function": {"name": "report_analysis"}},
                **temp_kwargs,
                max_completion_tokens=2048,
            )
            elapsed = int((time.monotonic() - t0) * 1000)
            choice = response.choices[0]
            usage = response.usage
            parts = []
            parsed = None
            # The raw dump includes BOTH content and every tool call, but only
            # the first tool call (or, failing that, the content) is parsed.
            if choice.message.content:
                parts.append(f"Content: {choice.message.content}")
            if choice.message.tool_calls:
                for tc in choice.message.tool_calls:
                    parts.append(
                        f"Tool call: {tc.function.name}({tc.function.arguments})"
                    )
                # Parse the first tool call
                args = json.loads(choice.message.tool_calls[0].function.arguments)
                parsed = self._validate_result(args)
            elif choice.message.content:
                parsed = self._parse_content_fallback(choice.message.content)
            raw = "\n".join(parts) or "(empty response)"
            self._log_llm("raw_analyze", elapsed, parsed is not None, req_json, raw,
                          input_tokens=usage.prompt_tokens if usage else None,
                          output_tokens=usage.completion_tokens if usage else None)
            return raw, parsed
        except Exception as e:
            elapsed = int((time.monotonic() - t0) * 1000)
            self._log_llm("raw_analyze", elapsed, False, req_json, error=str(e))
            return f"Error: {e}", None