Files
Breehavior-Monitor/utils/llm_client.py
AJ Isaacs 86aacfb84f Add 120s timeout to image analysis streaming
The vision model request was hanging indefinitely, freezing the bot.
The streaming loop had no timeout so if the model never returned
chunks, the bot would wait forever. Now times out after 2 minutes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 10:37:37 -05:00

477 lines
20 KiB
Python

import asyncio
import base64
import json
import logging
import time
from pathlib import Path
from openai import AsyncOpenAI
# Module logger; the channel name is fixed to "bcs.llm" rather than derived
# from __name__.
logger = logging.getLogger("bcs.llm")

# "prompts" directory located next to this file's parent directory
# (i.e. <this file>/../../prompts after resolving symlinks).
_PROMPTS_DIR = Path(__file__).resolve().parent.parent.joinpath("prompts")

# System prompt used for message analysis. Read once, at import time, so a
# missing prompts/analysis.txt fails module import.
SYSTEM_PROMPT = _PROMPTS_DIR.joinpath("analysis.txt").read_text(encoding="utf-8")
# Tool (function-calling) schema passed to the chat-completions API.
# The model is asked to answer by "calling" report_analysis with arguments
# matching the JSON Schema under "parameters".
ANALYSIS_TOOL = {
    "type": "function",
    "function": {
        "name": "report_analysis",
        "description": "Report the toxicity and topic analysis of a Discord message.",
        "parameters": {
            "type": "object",
            "properties": {
                # --- Toxicity -------------------------------------------------
                "toxicity_score": {
                    "type": "number",
                    "description": "Toxicity rating from 0.0 (completely harmless) to 1.0 (extremely toxic).",
                },
                "categories": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": [
                            "aggressive",
                            "passive_aggressive",
                            "instigating",
                            "hostile",
                            "manipulative",
                            "sexual_vulgar",
                            "none",
                        ],
                    },
                    "description": "Detected toxicity behavior categories.",
                },
                "reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the toxicity analysis.",
                },
                # --- Topic ----------------------------------------------------
                "off_topic": {
                    "type": "boolean",
                    "description": "True if the message is off-topic personal drama rather than gaming-related conversation.",
                },
                "topic_category": {
                    "type": "string",
                    "enum": [
                        "gaming",
                        "personal_drama",
                        "relationship_issues",
                        "real_life_venting",
                        "gossip",
                        "general_chat",
                        "meta",
                    ],
                    "description": "What topic category the message falls into.",
                },
                "topic_reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the topic classification.",
                },
                # --- Coherence ------------------------------------------------
                "coherence_score": {
                    "type": "number",
                    "description": "Coherence rating from 0.0 (incoherent gibberish) to 1.0 (clear and well-written). Normal texting shortcuts are fine.",
                },
                "coherence_flag": {
                    "type": "string",
                    "enum": [
                        "normal",
                        "intoxicated",
                        "tired",
                        "angry_typing",
                        "mobile_keyboard",
                        "language_barrier",
                    ],
                    "description": "Best guess at why coherence is low, if applicable.",
                },
                # --- Optional extras (not listed in "required") ---------------
                "note_update": {
                    "type": ["string", "null"],
                    "description": "Brief new observation about this user's style/behavior for future reference, or null if nothing new.",
                },
                "detected_game": {
                    "type": ["string", "null"],
                    "description": "The game channel name this message is about (e.g. 'gta-online', 'warzone'), or null if not game-specific.",
                },
                "disagreement_detected": {
                    "type": "boolean",
                    "description": "True if the target message is part of a clear disagreement between two users in the recent context. Only flag genuine back-and-forth debates, not one-off opinions.",
                },
                "disagreement_summary": {
                    "type": ["object", "null"],
                    "description": "If disagreement_detected is true, summarize the disagreement. Null otherwise.",
                    "properties": {
                        "topic": {
                            "type": "string",
                            "description": "Short topic of the disagreement (max 60 chars, e.g. 'Are snipers OP in Warzone?').",
                        },
                        "side_a": {
                            "type": "string",
                            "description": "First user's position (max 50 chars, e.g. 'Snipers are overpowered').",
                        },
                        "side_b": {
                            "type": "string",
                            "description": "Second user's position (max 50 chars, e.g. 'Snipers are balanced').",
                        },
                        "user_a": {
                            "type": "string",
                            "description": "Display name of the first user.",
                        },
                        "user_b": {
                            "type": "string",
                            "description": "Display name of the second user.",
                        },
                    },
                },
            },
            # Fields the model must always supply; the remaining ones are optional.
            "required": [
                "toxicity_score",
                "categories",
                "reasoning",
                "off_topic",
                "topic_category",
                "topic_reasoning",
                "coherence_score",
                "coherence_flag",
            ],
        },
    },
}
class LLMClient:
def __init__(self, base_url: str, model: str, api_key: str = "not-needed", db=None):
self.model = model
self.host = base_url.rstrip("/")
self._db = db
self._client = AsyncOpenAI(
base_url=f"{self.host}/v1",
api_key=api_key,
timeout=600.0, # 10 min — first request loads model into VRAM
)
self._semaphore = asyncio.Semaphore(1) # serialize requests to avoid overloading
def _log_llm(self, request_type: str, duration_ms: int, success: bool,
request: str, response: str | None = None, error: str | None = None,
input_tokens: int | None = None, output_tokens: int | None = None):
"""Fire-and-forget LLM log entry to the database."""
if not self._db:
return
asyncio.create_task(self._db.save_llm_log(
request_type=request_type,
model=self.model,
duration_ms=duration_ms,
success=success,
request=request,
response=response,
error=error,
input_tokens=input_tokens,
output_tokens=output_tokens,
))
async def close(self):
await self._client.close()
async def analyze_message(
self, message: str, context: str = "", user_notes: str = "",
channel_context: str = "",
) -> dict | None:
user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n"
if user_notes:
user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
if channel_context:
user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}\n/no_think"
req_json = json.dumps([
{"role": "system", "content": SYSTEM_PROMPT[:500]},
{"role": "user", "content": user_content},
], default=str)
t0 = time.monotonic()
async with self._semaphore:
try:
response = await self._client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_content},
],
tools=[ANALYSIS_TOOL],
tool_choice={"type": "function", "function": {"name": "report_analysis"}},
temperature=0.1,
max_tokens=2048,
)
elapsed = int((time.monotonic() - t0) * 1000)
choice = response.choices[0]
usage = response.usage
# Extract tool call arguments
if choice.message.tool_calls:
tool_call = choice.message.tool_calls[0]
resp_text = tool_call.function.arguments
args = json.loads(resp_text)
self._log_llm("analysis", elapsed, True, req_json, resp_text,
input_tokens=usage.prompt_tokens if usage else None,
output_tokens=usage.completion_tokens if usage else None)
return self._validate_result(args)
# Fallback: try parsing the message content as JSON
if choice.message.content:
self._log_llm("analysis", elapsed, True, req_json, choice.message.content,
input_tokens=usage.prompt_tokens if usage else None,
output_tokens=usage.completion_tokens if usage else None)
return self._parse_content_fallback(choice.message.content)
logger.warning("No tool call or content in LLM response.")
self._log_llm("analysis", elapsed, False, req_json, error="Empty response")
return None
except Exception as e:
elapsed = int((time.monotonic() - t0) * 1000)
logger.error("LLM analysis error: %s", e)
self._log_llm("analysis", elapsed, False, req_json, error=str(e))
return None
def _validate_result(self, result: dict) -> dict:
score = float(result.get("toxicity_score", 0.0))
result["toxicity_score"] = min(max(score, 0.0), 1.0)
if not isinstance(result.get("categories"), list):
result["categories"] = ["none"]
if not isinstance(result.get("reasoning"), str):
result["reasoning"] = ""
result["off_topic"] = bool(result.get("off_topic", False))
result.setdefault("topic_category", "general_chat")
result.setdefault("topic_reasoning", "")
coherence = float(result.get("coherence_score", 0.85))
result["coherence_score"] = min(max(coherence, 0.0), 1.0)
result.setdefault("coherence_flag", "normal")
result.setdefault("note_update", None)
result.setdefault("detected_game", None)
result["disagreement_detected"] = bool(result.get("disagreement_detected", False))
summary = result.get("disagreement_summary")
if result["disagreement_detected"] and isinstance(summary, dict):
# Truncate fields to Discord poll limits
summary["topic"] = str(summary.get("topic", ""))[:60]
summary["side_a"] = str(summary.get("side_a", ""))[:50]
summary["side_b"] = str(summary.get("side_b", ""))[:50]
summary.setdefault("user_a", "")
summary.setdefault("user_b", "")
result["disagreement_summary"] = summary
else:
result["disagreement_summary"] = None
return result
def _parse_content_fallback(self, text: str) -> dict | None:
"""Try to parse plain-text content as JSON if tool calling didn't work."""
import re
# Try direct JSON
try:
result = json.loads(text.strip())
return self._validate_result(result)
except (json.JSONDecodeError, ValueError):
pass
# Try extracting from code block
match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if match:
try:
result = json.loads(match.group(1))
return self._validate_result(result)
except (json.JSONDecodeError, ValueError):
pass
# Regex fallback for toxicity_score
score_match = re.search(r'"toxicity_score"\s*:\s*([\d.]+)', text)
if score_match:
return {
"toxicity_score": min(max(float(score_match.group(1)), 0.0), 1.0),
"categories": ["unknown"],
"reasoning": "Parsed via fallback regex",
}
logger.warning("Could not parse LLM content fallback: %s", text[:200])
return None
async def chat(
self, messages: list[dict[str, str]], system_prompt: str,
on_first_token=None,
) -> str | None:
"""Send a conversational chat request (no tools).
If *on_first_token* is an async callable it will be awaited once the
first content token arrives (useful for triggering the typing indicator
only after the model starts generating).
"""
# Append /no_think to the last user message to disable thinking on Qwen3
patched = []
for m in messages:
patched.append(m)
if patched and patched[-1].get("role") == "user":
patched[-1] = {**patched[-1], "content": patched[-1]["content"] + "\n/no_think"}
req_json = json.dumps([
{"role": "system", "content": system_prompt[:500]},
*[{"role": m["role"], "content": str(m.get("content", ""))[:200]} for m in patched],
], default=str)
t0 = time.monotonic()
async with self._semaphore:
try:
stream = await self._client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
*patched,
],
temperature=0.8,
max_tokens=2048,
stream=True,
)
chunks: list[str] = []
notified = False
async for chunk in stream:
delta = chunk.choices[0].delta if chunk.choices else None
if delta and delta.content:
if not notified and on_first_token:
await on_first_token()
notified = True
chunks.append(delta.content)
content = "".join(chunks).strip()
elapsed = int((time.monotonic() - t0) * 1000)
self._log_llm("chat", elapsed, bool(content), req_json, content or None)
return content if content else None
except Exception as e:
elapsed = int((time.monotonic() - t0) * 1000)
logger.error("LLM chat error: %s", e)
self._log_llm("chat", elapsed, False, req_json, error=str(e))
return None
async def analyze_image(
self,
image_bytes: bytes,
system_prompt: str,
user_text: str = "",
on_first_token=None,
) -> str | None:
"""Send an image to the vision model with a system prompt.
Returns the generated text response, or None on failure.
"""
b64 = base64.b64encode(image_bytes).decode()
data_url = f"data:image/png;base64,{b64}"
user_content: list[dict] = [
{"type": "image_url", "image_url": {"url": data_url}},
]
user_content.append({"type": "text", "text": (user_text or "") + "\n/no_think"})
req_json = json.dumps([
{"role": "system", "content": system_prompt[:500]},
{"role": "user", "content": f"[image {len(image_bytes)} bytes] {user_text or ''}"},
], default=str)
t0 = time.monotonic()
async with self._semaphore:
try:
async def _stream_image():
stream = await self._client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_content},
],
temperature=0.8,
max_tokens=2048,
stream=True,
)
chunks: list[str] = []
notified = False
async for chunk in stream:
delta = chunk.choices[0].delta if chunk.choices else None
if delta and delta.content:
if not notified and on_first_token:
await on_first_token()
notified = True
chunks.append(delta.content)
return "".join(chunks).strip()
content = await asyncio.wait_for(_stream_image(), timeout=120)
elapsed = int((time.monotonic() - t0) * 1000)
self._log_llm("image", elapsed, bool(content), req_json, content or None)
return content if content else None
except asyncio.TimeoutError:
elapsed = int((time.monotonic() - t0) * 1000)
logger.error("LLM image analysis timed out after %ds", elapsed // 1000)
self._log_llm("image", elapsed, False, req_json, error="Timeout")
return None
except Exception as e:
elapsed = int((time.monotonic() - t0) * 1000)
logger.error("LLM image analysis error: %s", e)
self._log_llm("image", elapsed, False, req_json, error=str(e))
return None
async def raw_analyze(
self, message: str, context: str = "", user_notes: str = "",
channel_context: str = "",
) -> tuple[str, dict | None]:
"""Return the raw LLM response string AND parsed result for /bcs-test (single LLM call)."""
user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n"
if user_notes:
user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
if channel_context:
user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}\n/no_think"
req_json = json.dumps([
{"role": "system", "content": SYSTEM_PROMPT[:500]},
{"role": "user", "content": user_content},
], default=str)
t0 = time.monotonic()
async with self._semaphore:
try:
response = await self._client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_content},
],
tools=[ANALYSIS_TOOL],
tool_choice={"type": "function", "function": {"name": "report_analysis"}},
temperature=0.1,
max_tokens=2048,
)
elapsed = int((time.monotonic() - t0) * 1000)
choice = response.choices[0]
usage = response.usage
parts = []
parsed = None
if choice.message.content:
parts.append(f"Content: {choice.message.content}")
if choice.message.tool_calls:
for tc in choice.message.tool_calls:
parts.append(
f"Tool call: {tc.function.name}({tc.function.arguments})"
)
# Parse the first tool call
args = json.loads(choice.message.tool_calls[0].function.arguments)
parsed = self._validate_result(args)
elif choice.message.content:
parsed = self._parse_content_fallback(choice.message.content)
raw = "\n".join(parts) or "(empty response)"
self._log_llm("raw_analyze", elapsed, parsed is not None, req_json, raw,
input_tokens=usage.prompt_tokens if usage else None,
output_tokens=usage.completion_tokens if usage else None)
return raw, parsed
except Exception as e:
elapsed = int((time.monotonic() - t0) * 1000)
self._log_llm("raw_analyze", elapsed, False, req_json, error=str(e))
return f"Error: {e}", None