# Changelog (from the introducing commit):
# - Default models: gpt-4o-mini (triage), gpt-4o (escalation)
# - Remove Qwen-specific /no_think hacks
# - Reduce timeout from 600s to 120s, increase concurrency semaphore to 4
# - Support empty LLM_BASE_URL to use OpenAI directly
import asyncio
|
|
import base64
|
|
import json
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
|
|
from openai import AsyncOpenAI
|
|
|
|
logger = logging.getLogger("bcs.llm")
|
|
|
|
_PROMPTS_DIR = Path(__file__).resolve().parent.parent / "prompts"
|
|
|
|
SYSTEM_PROMPT = (_PROMPTS_DIR / "analysis.txt").read_text(encoding="utf-8")
|
|
|
|
ANALYSIS_TOOL = {
|
|
"type": "function",
|
|
"function": {
|
|
"name": "report_analysis",
|
|
"description": "Report the toxicity and topic analysis of a Discord message.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"toxicity_score": {
|
|
"type": "number",
|
|
"description": "Toxicity rating from 0.0 (completely harmless) to 1.0 (extremely toxic).",
|
|
},
|
|
"categories": {
|
|
"type": "array",
|
|
"items": {
|
|
"type": "string",
|
|
"enum": [
|
|
"aggressive",
|
|
"passive_aggressive",
|
|
"instigating",
|
|
"hostile",
|
|
"manipulative",
|
|
"sexual_vulgar",
|
|
"none",
|
|
],
|
|
},
|
|
"description": "Detected toxicity behavior categories.",
|
|
},
|
|
"reasoning": {
|
|
"type": "string",
|
|
"description": "Brief explanation of the toxicity analysis.",
|
|
},
|
|
"off_topic": {
|
|
"type": "boolean",
|
|
"description": "True if the message is off-topic personal drama rather than gaming-related conversation.",
|
|
},
|
|
"topic_category": {
|
|
"type": "string",
|
|
"enum": [
|
|
"gaming",
|
|
"personal_drama",
|
|
"relationship_issues",
|
|
"real_life_venting",
|
|
"gossip",
|
|
"general_chat",
|
|
"meta",
|
|
],
|
|
"description": "What topic category the message falls into.",
|
|
},
|
|
"topic_reasoning": {
|
|
"type": "string",
|
|
"description": "Brief explanation of the topic classification.",
|
|
},
|
|
"coherence_score": {
|
|
"type": "number",
|
|
"description": "Coherence rating from 0.0 (incoherent gibberish) to 1.0 (clear and well-written). Normal texting shortcuts are fine.",
|
|
},
|
|
"coherence_flag": {
|
|
"type": "string",
|
|
"enum": [
|
|
"normal",
|
|
"intoxicated",
|
|
"tired",
|
|
"angry_typing",
|
|
"mobile_keyboard",
|
|
"language_barrier",
|
|
],
|
|
"description": "Best guess at why coherence is low, if applicable.",
|
|
},
|
|
"note_update": {
|
|
"type": ["string", "null"],
|
|
"description": "Brief new observation about this user's style/behavior for future reference, or null if nothing new.",
|
|
},
|
|
"detected_game": {
|
|
"type": ["string", "null"],
|
|
"description": "The game channel name this message is about (e.g. 'gta-online', 'warzone'), or null if not game-specific.",
|
|
},
|
|
"disagreement_detected": {
|
|
"type": "boolean",
|
|
"description": "True if the target message is part of a clear disagreement between two users in the recent context. Only flag genuine back-and-forth debates, not one-off opinions.",
|
|
},
|
|
"disagreement_summary": {
|
|
"type": ["object", "null"],
|
|
"description": "If disagreement_detected is true, summarize the disagreement. Null otherwise.",
|
|
"properties": {
|
|
"topic": {
|
|
"type": "string",
|
|
"description": "Short topic of the disagreement (max 60 chars, e.g. 'Are snipers OP in Warzone?').",
|
|
},
|
|
"side_a": {
|
|
"type": "string",
|
|
"description": "First user's position (max 50 chars, e.g. 'Snipers are overpowered').",
|
|
},
|
|
"side_b": {
|
|
"type": "string",
|
|
"description": "Second user's position (max 50 chars, e.g. 'Snipers are balanced').",
|
|
},
|
|
"user_a": {
|
|
"type": "string",
|
|
"description": "Display name of the first user.",
|
|
},
|
|
"user_b": {
|
|
"type": "string",
|
|
"description": "Display name of the second user.",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
"required": ["toxicity_score", "categories", "reasoning", "off_topic", "topic_category", "topic_reasoning", "coherence_score", "coherence_flag"],
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
class LLMClient:
|
|
def __init__(self, base_url: str, model: str, api_key: str = "not-needed", db=None):
|
|
self.model = model
|
|
self.host = base_url.rstrip("/")
|
|
self._db = db
|
|
client_kwargs = {"api_key": api_key, "timeout": 120.0}
|
|
if self.host:
|
|
client_kwargs["base_url"] = f"{self.host}/v1"
|
|
self._client = AsyncOpenAI(**client_kwargs)
|
|
self._semaphore = asyncio.Semaphore(4)
|
|
|
|
def _log_llm(self, request_type: str, duration_ms: int, success: bool,
|
|
request: str, response: str | None = None, error: str | None = None,
|
|
input_tokens: int | None = None, output_tokens: int | None = None):
|
|
"""Fire-and-forget LLM log entry to the database."""
|
|
if not self._db:
|
|
return
|
|
asyncio.create_task(self._db.save_llm_log(
|
|
request_type=request_type,
|
|
model=self.model,
|
|
duration_ms=duration_ms,
|
|
success=success,
|
|
request=request,
|
|
response=response,
|
|
error=error,
|
|
input_tokens=input_tokens,
|
|
output_tokens=output_tokens,
|
|
))
|
|
|
|
async def close(self):
|
|
await self._client.close()
|
|
|
|
async def analyze_message(
|
|
self, message: str, context: str = "", user_notes: str = "",
|
|
channel_context: str = "",
|
|
) -> dict | None:
|
|
user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n"
|
|
if user_notes:
|
|
user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
|
|
if channel_context:
|
|
user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
|
|
user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}\n"
|
|
|
|
req_json = json.dumps([
|
|
{"role": "system", "content": SYSTEM_PROMPT[:500]},
|
|
{"role": "user", "content": user_content},
|
|
], default=str)
|
|
t0 = time.monotonic()
|
|
|
|
async with self._semaphore:
|
|
try:
|
|
response = await self._client.chat.completions.create(
|
|
model=self.model,
|
|
messages=[
|
|
{"role": "system", "content": SYSTEM_PROMPT},
|
|
{"role": "user", "content": user_content},
|
|
],
|
|
tools=[ANALYSIS_TOOL],
|
|
tool_choice={"type": "function", "function": {"name": "report_analysis"}},
|
|
temperature=0.1,
|
|
max_tokens=2048,
|
|
)
|
|
|
|
elapsed = int((time.monotonic() - t0) * 1000)
|
|
choice = response.choices[0]
|
|
usage = response.usage
|
|
|
|
# Extract tool call arguments
|
|
if choice.message.tool_calls:
|
|
tool_call = choice.message.tool_calls[0]
|
|
resp_text = tool_call.function.arguments
|
|
args = json.loads(resp_text)
|
|
self._log_llm("analysis", elapsed, True, req_json, resp_text,
|
|
input_tokens=usage.prompt_tokens if usage else None,
|
|
output_tokens=usage.completion_tokens if usage else None)
|
|
return self._validate_result(args)
|
|
|
|
# Fallback: try parsing the message content as JSON
|
|
if choice.message.content:
|
|
self._log_llm("analysis", elapsed, True, req_json, choice.message.content,
|
|
input_tokens=usage.prompt_tokens if usage else None,
|
|
output_tokens=usage.completion_tokens if usage else None)
|
|
return self._parse_content_fallback(choice.message.content)
|
|
|
|
logger.warning("No tool call or content in LLM response.")
|
|
self._log_llm("analysis", elapsed, False, req_json, error="Empty response")
|
|
return None
|
|
|
|
except Exception as e:
|
|
elapsed = int((time.monotonic() - t0) * 1000)
|
|
logger.error("LLM analysis error: %s", e)
|
|
self._log_llm("analysis", elapsed, False, req_json, error=str(e))
|
|
return None
|
|
|
|
def _validate_result(self, result: dict) -> dict:
|
|
score = float(result.get("toxicity_score", 0.0))
|
|
result["toxicity_score"] = min(max(score, 0.0), 1.0)
|
|
|
|
if not isinstance(result.get("categories"), list):
|
|
result["categories"] = ["none"]
|
|
|
|
if not isinstance(result.get("reasoning"), str):
|
|
result["reasoning"] = ""
|
|
|
|
result["off_topic"] = bool(result.get("off_topic", False))
|
|
result.setdefault("topic_category", "general_chat")
|
|
result.setdefault("topic_reasoning", "")
|
|
|
|
coherence = float(result.get("coherence_score", 0.85))
|
|
result["coherence_score"] = min(max(coherence, 0.0), 1.0)
|
|
result.setdefault("coherence_flag", "normal")
|
|
|
|
result.setdefault("note_update", None)
|
|
result.setdefault("detected_game", None)
|
|
|
|
result["disagreement_detected"] = bool(result.get("disagreement_detected", False))
|
|
summary = result.get("disagreement_summary")
|
|
if result["disagreement_detected"] and isinstance(summary, dict):
|
|
# Truncate fields to Discord poll limits
|
|
summary["topic"] = str(summary.get("topic", ""))[:60]
|
|
summary["side_a"] = str(summary.get("side_a", ""))[:50]
|
|
summary["side_b"] = str(summary.get("side_b", ""))[:50]
|
|
summary.setdefault("user_a", "")
|
|
summary.setdefault("user_b", "")
|
|
result["disagreement_summary"] = summary
|
|
else:
|
|
result["disagreement_summary"] = None
|
|
|
|
return result
|
|
|
|
def _parse_content_fallback(self, text: str) -> dict | None:
|
|
"""Try to parse plain-text content as JSON if tool calling didn't work."""
|
|
import re
|
|
|
|
# Try direct JSON
|
|
try:
|
|
result = json.loads(text.strip())
|
|
return self._validate_result(result)
|
|
except (json.JSONDecodeError, ValueError):
|
|
pass
|
|
|
|
# Try extracting from code block
|
|
match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
|
|
if match:
|
|
try:
|
|
result = json.loads(match.group(1))
|
|
return self._validate_result(result)
|
|
except (json.JSONDecodeError, ValueError):
|
|
pass
|
|
|
|
# Regex fallback for toxicity_score
|
|
score_match = re.search(r'"toxicity_score"\s*:\s*([\d.]+)', text)
|
|
if score_match:
|
|
return {
|
|
"toxicity_score": min(max(float(score_match.group(1)), 0.0), 1.0),
|
|
"categories": ["unknown"],
|
|
"reasoning": "Parsed via fallback regex",
|
|
}
|
|
|
|
logger.warning("Could not parse LLM content fallback: %s", text[:200])
|
|
return None
|
|
|
|
async def chat(
|
|
self, messages: list[dict[str, str]], system_prompt: str,
|
|
on_first_token=None,
|
|
) -> str | None:
|
|
"""Send a conversational chat request (no tools).
|
|
|
|
If *on_first_token* is an async callable it will be awaited once the
|
|
first content token arrives (useful for triggering the typing indicator
|
|
only after the model starts generating).
|
|
"""
|
|
req_json = json.dumps([
|
|
{"role": "system", "content": system_prompt[:500]},
|
|
*[{"role": m["role"], "content": str(m.get("content", ""))[:200]} for m in messages],
|
|
], default=str)
|
|
t0 = time.monotonic()
|
|
|
|
async with self._semaphore:
|
|
try:
|
|
stream = await self._client.chat.completions.create(
|
|
model=self.model,
|
|
messages=[
|
|
{"role": "system", "content": system_prompt},
|
|
*messages,
|
|
],
|
|
temperature=0.8,
|
|
max_tokens=2048,
|
|
stream=True,
|
|
)
|
|
|
|
chunks: list[str] = []
|
|
notified = False
|
|
async for chunk in stream:
|
|
delta = chunk.choices[0].delta if chunk.choices else None
|
|
if delta and delta.content:
|
|
if not notified and on_first_token:
|
|
await on_first_token()
|
|
notified = True
|
|
chunks.append(delta.content)
|
|
|
|
content = "".join(chunks).strip()
|
|
elapsed = int((time.monotonic() - t0) * 1000)
|
|
self._log_llm("chat", elapsed, bool(content), req_json, content or None)
|
|
return content if content else None
|
|
except Exception as e:
|
|
elapsed = int((time.monotonic() - t0) * 1000)
|
|
logger.error("LLM chat error: %s", e)
|
|
self._log_llm("chat", elapsed, False, req_json, error=str(e))
|
|
return None
|
|
|
|
async def analyze_image(
|
|
self,
|
|
image_bytes: bytes,
|
|
system_prompt: str,
|
|
user_text: str = "",
|
|
on_first_token=None,
|
|
) -> str | None:
|
|
"""Send an image to the vision model with a system prompt.
|
|
|
|
Returns the generated text response, or None on failure.
|
|
"""
|
|
b64 = base64.b64encode(image_bytes).decode()
|
|
data_url = f"data:image/png;base64,{b64}"
|
|
|
|
user_content: list[dict] = [
|
|
{"type": "image_url", "image_url": {"url": data_url}},
|
|
]
|
|
if user_text:
|
|
user_content.append({"type": "text", "text": user_text})
|
|
|
|
req_json = json.dumps([
|
|
{"role": "system", "content": system_prompt[:500]},
|
|
{"role": "user", "content": f"[image {len(image_bytes)} bytes] {user_text or ''}"},
|
|
], default=str)
|
|
t0 = time.monotonic()
|
|
|
|
async with self._semaphore:
|
|
try:
|
|
async def _stream_image():
|
|
stream = await self._client.chat.completions.create(
|
|
model=self.model,
|
|
messages=[
|
|
{"role": "system", "content": system_prompt},
|
|
{"role": "user", "content": user_content},
|
|
],
|
|
temperature=0.8,
|
|
max_tokens=2048,
|
|
stream=True,
|
|
)
|
|
|
|
chunks: list[str] = []
|
|
notified = False
|
|
async for chunk in stream:
|
|
delta = chunk.choices[0].delta if chunk.choices else None
|
|
if delta and delta.content:
|
|
if not notified and on_first_token:
|
|
await on_first_token()
|
|
notified = True
|
|
chunks.append(delta.content)
|
|
|
|
return "".join(chunks).strip()
|
|
|
|
content = await asyncio.wait_for(_stream_image(), timeout=120)
|
|
elapsed = int((time.monotonic() - t0) * 1000)
|
|
self._log_llm("image", elapsed, bool(content), req_json, content or None)
|
|
return content if content else None
|
|
except asyncio.TimeoutError:
|
|
elapsed = int((time.monotonic() - t0) * 1000)
|
|
logger.error("LLM image analysis timed out after %ds", elapsed // 1000)
|
|
self._log_llm("image", elapsed, False, req_json, error="Timeout")
|
|
return None
|
|
except Exception as e:
|
|
elapsed = int((time.monotonic() - t0) * 1000)
|
|
logger.error("LLM image analysis error: %s", e)
|
|
self._log_llm("image", elapsed, False, req_json, error=str(e))
|
|
return None
|
|
|
|
async def raw_analyze(
|
|
self, message: str, context: str = "", user_notes: str = "",
|
|
channel_context: str = "",
|
|
) -> tuple[str, dict | None]:
|
|
"""Return the raw LLM response string AND parsed result for /bcs-test (single LLM call)."""
|
|
user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n"
|
|
if user_notes:
|
|
user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
|
|
if channel_context:
|
|
user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
|
|
user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}\n"
|
|
|
|
req_json = json.dumps([
|
|
{"role": "system", "content": SYSTEM_PROMPT[:500]},
|
|
{"role": "user", "content": user_content},
|
|
], default=str)
|
|
t0 = time.monotonic()
|
|
|
|
async with self._semaphore:
|
|
try:
|
|
response = await self._client.chat.completions.create(
|
|
model=self.model,
|
|
messages=[
|
|
{"role": "system", "content": SYSTEM_PROMPT},
|
|
{"role": "user", "content": user_content},
|
|
],
|
|
tools=[ANALYSIS_TOOL],
|
|
tool_choice={"type": "function", "function": {"name": "report_analysis"}},
|
|
temperature=0.1,
|
|
max_tokens=2048,
|
|
)
|
|
|
|
elapsed = int((time.monotonic() - t0) * 1000)
|
|
choice = response.choices[0]
|
|
usage = response.usage
|
|
parts = []
|
|
parsed = None
|
|
|
|
if choice.message.content:
|
|
parts.append(f"Content: {choice.message.content}")
|
|
|
|
if choice.message.tool_calls:
|
|
for tc in choice.message.tool_calls:
|
|
parts.append(
|
|
f"Tool call: {tc.function.name}({tc.function.arguments})"
|
|
)
|
|
# Parse the first tool call
|
|
args = json.loads(choice.message.tool_calls[0].function.arguments)
|
|
parsed = self._validate_result(args)
|
|
elif choice.message.content:
|
|
parsed = self._parse_content_fallback(choice.message.content)
|
|
|
|
raw = "\n".join(parts) or "(empty response)"
|
|
self._log_llm("raw_analyze", elapsed, parsed is not None, req_json, raw,
|
|
input_tokens=usage.prompt_tokens if usage else None,
|
|
output_tokens=usage.completion_tokens if usage else None)
|
|
return raw, parsed
|
|
|
|
except Exception as e:
|
|
elapsed = int((time.monotonic() - t0) * 1000)
|
|
self._log_llm("raw_analyze", elapsed, False, req_json, error=str(e))
|
|
return f"Error: {e}", None
|