# NOTE(review): removed page-scrape residue ("1161 lines / 52 KiB / Python")
# that is not valid Python source.
import asyncio
|
|
import base64
|
|
import json
|
|
import logging
|
|
import time
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
|
|
from openai import AsyncOpenAI
|
|
|
|
# Module-level logger for all LLM client activity.
logger = logging.getLogger("bcs.llm")

# Prompt templates live in <repo>/prompts, one level above this module's package.
_PROMPTS_DIR = Path(__file__).resolve().parent.parent / "prompts"

# System prompt shared by single-message and conversation analysis.
# Read once at import time; a missing prompt file fails fast on import.
SYSTEM_PROMPT = (_PROMPTS_DIR / "analysis.txt").read_text(encoding="utf-8")
|
|
|
|
# OpenAI function-calling schema the model is forced to invoke when scoring a
# single message (see analyze_message). Field names here are load-bearing:
# _validate_result normalizes exactly these keys.
ANALYSIS_TOOL = {
    "type": "function",
    "function": {
        "name": "report_analysis",
        "description": "Report the toxicity and topic analysis of a Discord message.",
        "parameters": {
            "type": "object",
            "properties": {
                "toxicity_score": {
                    "type": "number",
                    "description": "Toxicity rating from 0.0 (completely harmless) to 1.0 (extremely toxic).",
                },
                "categories": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": [
                            "aggressive",
                            "passive_aggressive",
                            "instigating",
                            "hostile",
                            "manipulative",
                            "sexual_vulgar",
                            "jealousy",
                            "none",
                        ],
                    },
                    "description": "Detected toxicity behavior categories.",
                },
                "reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the toxicity analysis.",
                },
                "off_topic": {
                    "type": "boolean",
                    "description": "True if the message is off-topic personal drama rather than gaming-related conversation.",
                },
                "topic_category": {
                    "type": "string",
                    "enum": [
                        "gaming",
                        "personal_drama",
                        "relationship_issues",
                        "real_life_venting",
                        "gossip",
                        "general_chat",
                        "meta",
                    ],
                    "description": "What topic category the message falls into.",
                },
                "topic_reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the topic classification.",
                },
                "coherence_score": {
                    "type": "number",
                    "description": "Coherence rating from 0.0 (incoherent gibberish) to 1.0 (clear and well-written). Normal texting shortcuts are fine.",
                },
                "coherence_flag": {
                    "type": "string",
                    "enum": [
                        "normal",
                        "intoxicated",
                        "tired",
                        "angry_typing",
                        "mobile_keyboard",
                        "language_barrier",
                    ],
                    "description": "Best guess at why coherence is low, if applicable.",
                },
                "note_update": {
                    "type": ["string", "null"],
                    "description": "Brief new observation about this user's style/behavior for future reference, or null if nothing new. NEVER quote toxic language — describe patterns abstractly (e.g. 'uses personal insults when frustrated').",
                },
                "detected_game": {
                    "type": ["string", "null"],
                    "description": "The game channel name this message is about (e.g. 'gta-online', 'warzone'), or null if not game-specific.",
                },
                "violated_rules": {
                    "type": "array",
                    "items": {"type": "integer"},
                    "description": "Rule numbers violated (empty array if none).",
                },
            },
            # note_update / detected_game / violated_rules are optional;
            # _validate_result fills defaults when they are absent.
            "required": ["toxicity_score", "categories", "reasoning", "off_topic", "topic_category", "topic_reasoning", "coherence_score", "coherence_flag"],
        },
    },
}
|
|
|
|
|
|
# Function-calling schema forced during conversation-level scans
# (see analyze_conversation): one findings entry per participating user.
# Per-user fields mirror ANALYSIS_TOOL, plus worst_message.
CONVERSATION_TOOL = {
    "type": "function",
    "function": {
        "name": "report_conversation_scan",
        "description": "Analyze a conversation block and report findings per user.",
        "parameters": {
            "type": "object",
            "properties": {
                "user_findings": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "username": {
                                "type": "string",
                                "description": "Discord display name of the user.",
                            },
                            "toxicity_score": {
                                "type": "number",
                                "description": "Weighted toxicity 0.0-1.0 across their messages in this conversation.",
                            },
                            "categories": {
                                "type": "array",
                                "items": {
                                    "type": "string",
                                    "enum": [
                                        "aggressive",
                                        "passive_aggressive",
                                        "instigating",
                                        "hostile",
                                        "manipulative",
                                        "sexual_vulgar",
                                        "jealousy",
                                        "none",
                                    ],
                                },
                                "description": "Detected toxicity behavior categories.",
                            },
                            "reasoning": {
                                "type": "string",
                                "description": "Brief explanation of this user's behavior in the conversation.",
                            },
                            "worst_message": {
                                "type": ["string", "null"],
                                "description": "Most problematic snippet from this user (quoted, max 100 chars), or null if nothing notable.",
                            },
                            "off_topic": {
                                "type": "boolean",
                                "description": "True if this user's messages were primarily off-topic personal drama.",
                            },
                            "topic_category": {
                                "type": "string",
                                "enum": [
                                    "gaming",
                                    "personal_drama",
                                    "relationship_issues",
                                    "real_life_venting",
                                    "gossip",
                                    "general_chat",
                                    "meta",
                                ],
                                "description": "What topic category this user's messages fall into.",
                            },
                            "topic_reasoning": {
                                "type": "string",
                                "description": "Brief explanation of the topic classification for this user.",
                            },
                            "coherence_score": {
                                "type": "number",
                                "description": "Coherence rating 0.0-1.0 across this user's messages. Normal texting shortcuts are fine.",
                            },
                            "coherence_flag": {
                                "type": "string",
                                "enum": [
                                    "normal",
                                    "intoxicated",
                                    "tired",
                                    "angry_typing",
                                    "mobile_keyboard",
                                    "language_barrier",
                                ],
                                "description": "Best guess at why coherence is low, if applicable.",
                            },
                            "note_update": {
                                "type": ["string", "null"],
                                "description": "New observation about this user's pattern, or null. NEVER quote toxic language — describe patterns abstractly.",
                            },
                            "detected_game": {
                                "type": ["string", "null"],
                                "description": "The game channel name this user's messages are about, or null.",
                            },
                            "violated_rules": {
                                "type": "array",
                                "items": {"type": "integer"},
                                "description": "Rule numbers violated (empty array if none).",
                            },
                        },
                        "required": ["username", "toxicity_score", "categories", "reasoning", "off_topic", "topic_category", "topic_reasoning", "coherence_score", "coherence_flag"],
                    },
                    "description": "Findings for each user who participated in the conversation.",
                },
                "conversation_summary": {
                    "type": "string",
                    "description": "One-sentence summary of the overall conversation tone and any escalation patterns.",
                },
            },
            "required": ["user_findings", "conversation_summary"],
        },
    },
}
|
|
|
|
# Function-calling schema forced during memory extraction (see extract_memories).
# _validate_memory_result enforces the expiration/importance enums again on the
# way out, so bad enum values from the model degrade rather than propagate.
MEMORY_EXTRACTION_TOOL = {
    "type": "function",
    "function": {
        "name": "extract_memories",
        "description": "Extract noteworthy memories from a conversation for future reference.",
        "parameters": {
            "type": "object",
            "properties": {
                "memories": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "memory": {
                                "type": "string",
                                "description": "A concise fact or observation worth remembering.",
                            },
                            "topics": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "Topic tags for retrieval (e.g., 'gta', 'personal', 'warzone').",
                            },
                            "expiration": {
                                "type": "string",
                                "enum": ["1d", "3d", "7d", "30d", "permanent"],
                                "description": "How long this memory stays relevant.",
                            },
                            "importance": {
                                "type": "string",
                                "enum": ["low", "medium", "high"],
                                "description": "How important this memory is for future interactions.",
                            },
                        },
                        "required": ["memory", "topics", "expiration", "importance"],
                    },
                    "description": "Memories to store. Only include genuinely new or noteworthy information.",
                },
                "profile_update": {
                    "type": ["string", "null"],
                    "description": "Full updated profile summary incorporating new permanent facts, or null if no profile changes.",
                },
            },
            "required": ["memories"],
        },
    },
}
|
|
|
|
# System prompt for the memory-extraction tool call; read once at import time.
MEMORY_EXTRACTION_PROMPT = (_PROMPTS_DIR / "memory_extraction.txt").read_text(encoding="utf-8")

# Models that reject a `temperature` parameter (reasoning-family models).
# Requests to these omit temperature / penalty kwargs (see _supports_temperature).
_NO_TEMPERATURE_MODELS = {"gpt-5-nano", "o1", "o1-mini", "o1-preview", "o3", "o3-mini", "o4-mini"}
|
|
|
|
|
|
class LLMClient:
|
|
def __init__(self, base_url: str, model: str, api_key: str = "not-needed",
|
|
db=None, no_think: bool = False, concurrency: int = 4):
|
|
self.model = model
|
|
self.host = base_url.rstrip("/")
|
|
self._db = db
|
|
self._no_think = no_think
|
|
self._supports_temperature = model not in _NO_TEMPERATURE_MODELS
|
|
timeout = 600.0 if self.host else 120.0 # local models need longer for VRAM load
|
|
client_kwargs = {"api_key": api_key, "timeout": timeout}
|
|
if self.host:
|
|
client_kwargs["base_url"] = f"{self.host}/v1"
|
|
self._client = AsyncOpenAI(**client_kwargs)
|
|
self._semaphore = asyncio.Semaphore(concurrency)
|
|
|
|
def _log_llm(self, request_type: str, duration_ms: int, success: bool,
|
|
request: str, response: str | None = None, error: str | None = None,
|
|
input_tokens: int | None = None, output_tokens: int | None = None):
|
|
"""Fire-and-forget LLM log entry to the database."""
|
|
if not self._db:
|
|
return
|
|
asyncio.create_task(self._db.save_llm_log(
|
|
request_type=request_type,
|
|
model=self.model,
|
|
duration_ms=duration_ms,
|
|
success=success,
|
|
request=request,
|
|
response=response,
|
|
error=error,
|
|
input_tokens=input_tokens,
|
|
output_tokens=output_tokens,
|
|
))
|
|
|
|
def _append_no_think(self, text: str) -> str:
|
|
return text + "\n/no_think" if self._no_think else text
|
|
|
|
    async def close(self):
        """Release the underlying HTTP connection pool. Call once at shutdown."""
        await self._client.close()
|
|
|
|
    async def analyze_message(
        self, message: str, context: str = "", user_notes: str = "",
        channel_context: str = "", mention_context: str = "",
        rules_context: str = "",
    ) -> dict | None:
        """Score a single Discord message for toxicity/topic via a forced tool call.

        Builds a sectioned prompt (recent channel messages, user notes, channel
        info, rules, optional user report) around the target message, then
        forces the ``report_analysis`` tool. Returns the validated argument
        dict, a best-effort parse of plain content, or None on failure.
        """
        user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n"
        if user_notes:
            user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
        if channel_context:
            user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
        if rules_context:
            user_content += f"=== SERVER RULES ===\n{rules_context}\n\n"
        if mention_context:
            user_content += f"=== USER REPORT (a user flagged this conversation — focus on this concern) ===\n{mention_context}\n\n"
        user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}"
        user_content = self._append_no_think(user_content)

        # Request snapshot for the DB log (system prompt truncated to keep rows small).
        req_json = json.dumps([
            {"role": "system", "content": SYSTEM_PROMPT[:500]},
            {"role": "user", "content": user_content},
        ], default=str)
        t0 = time.monotonic()

        # Semaphore caps concurrent in-flight LLM calls across the client.
        async with self._semaphore:
            try:
                # Reasoning-family models reject a temperature parameter.
                temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
                response = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {"role": "user", "content": user_content},
                    ],
                    tools=[ANALYSIS_TOOL],
                    # Force the tool so we always get structured output.
                    tool_choice={"type": "function", "function": {"name": "report_analysis"}},
                    **temp_kwargs,
                    max_completion_tokens=2048,
                )

                elapsed = int((time.monotonic() - t0) * 1000)
                choice = response.choices[0]
                usage = response.usage

                # Extract tool call arguments
                if choice.message.tool_calls:
                    tool_call = choice.message.tool_calls[0]
                    resp_text = tool_call.function.arguments
                    args = json.loads(resp_text)
                    self._log_llm("analysis", elapsed, True, req_json, resp_text,
                                  input_tokens=usage.prompt_tokens if usage else None,
                                  output_tokens=usage.completion_tokens if usage else None)
                    return self._validate_result(args)

                # Fallback: try parsing the message content as JSON
                if choice.message.content:
                    self._log_llm("analysis", elapsed, True, req_json, choice.message.content,
                                  input_tokens=usage.prompt_tokens if usage else None,
                                  output_tokens=usage.completion_tokens if usage else None)
                    return self._parse_content_fallback(choice.message.content)

                logger.warning("No tool call or content in LLM response.")
                self._log_llm("analysis", elapsed, False, req_json, error="Empty response")
                return None

            except Exception as e:
                # Network errors, timeouts and malformed tool JSON all land here;
                # callers treat None as "no analysis available".
                elapsed = int((time.monotonic() - t0) * 1000)
                logger.error("LLM analysis error: %s", e)
                self._log_llm("analysis", elapsed, False, req_json, error=str(e))
                return None
|
|
|
|
def _validate_result(self, result: dict) -> dict:
|
|
score = float(result.get("toxicity_score", 0.0))
|
|
result["toxicity_score"] = min(max(score, 0.0), 1.0)
|
|
|
|
if not isinstance(result.get("categories"), list):
|
|
result["categories"] = ["none"]
|
|
|
|
if not isinstance(result.get("reasoning"), str):
|
|
result["reasoning"] = ""
|
|
|
|
result["off_topic"] = bool(result.get("off_topic", False))
|
|
result.setdefault("topic_category", "general_chat")
|
|
result.setdefault("topic_reasoning", "")
|
|
|
|
coherence = float(result.get("coherence_score", 0.85))
|
|
result["coherence_score"] = min(max(coherence, 0.0), 1.0)
|
|
result.setdefault("coherence_flag", "normal")
|
|
|
|
result.setdefault("note_update", None)
|
|
result.setdefault("detected_game", None)
|
|
if not isinstance(result.get("violated_rules"), list):
|
|
result["violated_rules"] = []
|
|
|
|
return result
|
|
|
|
def _parse_content_fallback(self, text: str) -> dict | None:
|
|
"""Try to parse plain-text content as JSON if tool calling didn't work."""
|
|
import re
|
|
|
|
# Try direct JSON
|
|
try:
|
|
result = json.loads(text.strip())
|
|
return self._validate_result(result)
|
|
except (json.JSONDecodeError, ValueError):
|
|
pass
|
|
|
|
# Try extracting from code block
|
|
match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
|
|
if match:
|
|
try:
|
|
result = json.loads(match.group(1))
|
|
return self._validate_result(result)
|
|
except (json.JSONDecodeError, ValueError):
|
|
pass
|
|
|
|
# Regex fallback for toxicity_score
|
|
score_match = re.search(r'"toxicity_score"\s*:\s*([\d.]+)', text)
|
|
if score_match:
|
|
return {
|
|
"toxicity_score": min(max(float(score_match.group(1)), 0.0), 1.0),
|
|
"categories": ["unknown"],
|
|
"reasoning": "Parsed via fallback regex",
|
|
}
|
|
|
|
logger.warning("Could not parse LLM content fallback: %s", text[:200])
|
|
return None
|
|
|
|
# -- Conversation-level analysis (mention scan) --
|
|
|
|
@staticmethod
|
|
def _format_relative_time(delta: timedelta) -> str:
|
|
total_seconds = int(delta.total_seconds())
|
|
if total_seconds < 60:
|
|
return f"~{total_seconds}s ago"
|
|
minutes = total_seconds // 60
|
|
if minutes < 60:
|
|
return f"~{minutes}m ago"
|
|
hours = minutes // 60
|
|
return f"~{hours}h ago"
|
|
|
|
@staticmethod
|
|
def _format_conversation_block(
|
|
messages: list[tuple[str, str, datetime, str | None]],
|
|
now: datetime | None = None,
|
|
new_message_start: int | None = None,
|
|
) -> str:
|
|
"""Format messages as a compact timestamped chat block.
|
|
|
|
Each tuple is (username, content, timestamp, reply_to_username).
|
|
Consecutive messages from the same user collapse to indented lines.
|
|
Replies shown as ``username → replied_to:``.
|
|
|
|
If *new_message_start* is given, a separator is inserted before that
|
|
index so the LLM can distinguish context from new messages.
|
|
"""
|
|
if now is None:
|
|
now = datetime.now(timezone.utc)
|
|
|
|
lines = [f"[Current time: {now.strftime('%I:%M %p')}]", ""]
|
|
last_user = None
|
|
in_new_section = new_message_start is None or new_message_start == 0
|
|
|
|
for idx, (username, content, ts, reply_to) in enumerate(messages):
|
|
if new_message_start is not None and idx == new_message_start:
|
|
lines.append("")
|
|
lines.append("--- NEW MESSAGES (score only these) ---")
|
|
lines.append("")
|
|
last_user = None # reset collapse so first new msg gets full header
|
|
in_new_section = True
|
|
delta = now - ts.replace(tzinfo=timezone.utc) if ts.tzinfo is None else now - ts
|
|
rel = LLMClient._format_relative_time(delta)
|
|
tag = "" if in_new_section else " [CONTEXT]"
|
|
|
|
if username == last_user:
|
|
# Continuation from same user — indent
|
|
for line in content.split("\n"):
|
|
lines.append(f" {line}")
|
|
else:
|
|
# New user block
|
|
if reply_to:
|
|
prefix = f"[{rel}] {username} → {reply_to}:{tag} "
|
|
else:
|
|
prefix = f"[{rel}] {username}:{tag} "
|
|
msg_lines = content.split("\n")
|
|
lines.append(prefix + msg_lines[0])
|
|
for line in msg_lines[1:]:
|
|
lines.append(f" {line}")
|
|
|
|
last_user = username
|
|
|
|
return "\n".join(lines)
|
|
|
|
    async def analyze_conversation(
        self,
        messages: list[tuple[str, str, datetime, str | None]],
        mention_context: str = "",
        channel_context: str = "",
        user_notes_map: dict[str, str] | None = None,
        new_message_start: int | None = None,
        user_aliases: str = "",
        rules_context: str = "",
    ) -> dict | None:
        """Analyze a conversation block in one call, returning per-user findings.

        Formats *messages* via _format_conversation_block, wraps them with
        optional alias / notes / channel / rules / user-report sections, and
        forces the ``report_conversation_scan`` tool. Returns the validated
        result dict ({"user_findings": [...], "conversation_summary": ...}),
        or None on failure or empty input.
        """
        if not messages:
            return None

        convo_block = self._format_conversation_block(messages, new_message_start=new_message_start)

        user_content = f"=== CONVERSATION BLOCK ===\n{convo_block}\n\n"
        if user_aliases:
            user_content += f"=== KNOWN MEMBER ALIASES (names other members use to refer to each other) ===\n{user_aliases}\n\n"
        if user_notes_map:
            notes_lines = [f"  {u}: {n}" for u, n in user_notes_map.items() if n]
            if notes_lines:
                user_content += "=== USER NOTES (from prior analysis) ===\n" + "\n".join(notes_lines) + "\n\n"
        if channel_context:
            user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
        if rules_context:
            user_content += f"=== SERVER RULES ===\n{rules_context}\n\n"
        if mention_context:
            user_content += f"=== USER REPORT (a user flagged this conversation — focus on this concern) ===\n{mention_context}\n\n"
        user_content += "Analyze the conversation block above and report findings for each user."
        user_content = self._append_no_think(user_content)

        # DB log snapshot; both sides truncated here (unlike analyze_message)
        # because conversation blocks can be large.
        req_json = json.dumps([
            {"role": "system", "content": SYSTEM_PROMPT[:500]},
            {"role": "user", "content": user_content[:500]},
        ], default=str)
        t0 = time.monotonic()

        async with self._semaphore:
            try:
                # Reasoning-family models reject a temperature parameter.
                temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
                response = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {"role": "user", "content": user_content},
                    ],
                    tools=[CONVERSATION_TOOL],
                    tool_choice={"type": "function", "function": {"name": "report_conversation_scan"}},
                    **temp_kwargs,
                    # Larger budget than single-message analysis: one finding per user.
                    max_completion_tokens=4096,
                )

                elapsed = int((time.monotonic() - t0) * 1000)
                choice = response.choices[0]
                usage = response.usage

                if choice.message.tool_calls:
                    tool_call = choice.message.tool_calls[0]
                    resp_text = tool_call.function.arguments
                    args = json.loads(resp_text)
                    self._log_llm("conversation", elapsed, True, req_json, resp_text,
                                  input_tokens=usage.prompt_tokens if usage else None,
                                  output_tokens=usage.completion_tokens if usage else None)
                    return self._validate_conversation_result(args)

                # Fallback: try parsing the message content as JSON
                if choice.message.content:
                    logger.warning("No tool call in conversation analysis — trying content fallback. Content: %s", choice.message.content[:300])
                    fallback = self._parse_conversation_content_fallback(choice.message.content)
                    if fallback:
                        self._log_llm("conversation", elapsed, True, req_json, choice.message.content,
                                      input_tokens=usage.prompt_tokens if usage else None,
                                      output_tokens=usage.completion_tokens if usage else None)
                        return fallback

                logger.warning("No tool call or parseable content in conversation analysis response.")
                self._log_llm("conversation", elapsed, False, req_json, error="Empty response")
                return None

            except Exception as e:
                # Network errors, timeouts and malformed tool JSON all land here.
                elapsed = int((time.monotonic() - t0) * 1000)
                logger.error("LLM conversation analysis error: %s", e)
                self._log_llm("conversation", elapsed, False, req_json, error=str(e))
                return None
|
|
|
|
@staticmethod
|
|
def _validate_conversation_result(result: dict) -> dict:
|
|
"""Validate and normalize conversation analysis result."""
|
|
if not isinstance(result, dict):
|
|
return {"user_findings": [], "conversation_summary": ""}
|
|
findings = [f for f in result.get("user_findings", []) if isinstance(f, dict)]
|
|
for finding in findings:
|
|
finding.setdefault("username", "unknown")
|
|
score = float(finding.get("toxicity_score", 0.0))
|
|
finding["toxicity_score"] = min(max(score, 0.0), 1.0)
|
|
if not isinstance(finding.get("categories"), list):
|
|
finding["categories"] = ["none"]
|
|
finding.setdefault("reasoning", "")
|
|
finding.setdefault("worst_message", None)
|
|
finding["off_topic"] = bool(finding.get("off_topic", False))
|
|
finding.setdefault("topic_category", "general_chat")
|
|
finding.setdefault("topic_reasoning", "")
|
|
coherence = float(finding.get("coherence_score", 0.85))
|
|
finding["coherence_score"] = min(max(coherence, 0.0), 1.0)
|
|
finding.setdefault("coherence_flag", "normal")
|
|
finding.setdefault("note_update", None)
|
|
finding.setdefault("detected_game", None)
|
|
if not isinstance(finding.get("violated_rules"), list):
|
|
finding["violated_rules"] = []
|
|
result["user_findings"] = findings
|
|
result.setdefault("conversation_summary", "")
|
|
return result
|
|
|
|
def _parse_conversation_content_fallback(self, text: str) -> dict | None:
|
|
"""Try to parse plain-text content as JSON for conversation analysis."""
|
|
import re
|
|
|
|
# Try direct JSON parse
|
|
try:
|
|
result = json.loads(text.strip())
|
|
if "user_findings" in result:
|
|
return self._validate_conversation_result(result)
|
|
except (json.JSONDecodeError, ValueError):
|
|
pass
|
|
|
|
# Try extracting from code block
|
|
match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
|
|
if match:
|
|
try:
|
|
result = json.loads(match.group(1))
|
|
if "user_findings" in result:
|
|
return self._validate_conversation_result(result)
|
|
except (json.JSONDecodeError, ValueError):
|
|
pass
|
|
|
|
logger.warning("Could not parse conversation content fallback: %s", text[:200])
|
|
return None
|
|
|
|
    async def chat(
        self, messages: list[dict[str, str]], system_prompt: str,
        on_first_token=None, recent_bot_replies: list[str] | None = None,
    ) -> str | None:
        """Send a conversational chat request (no tools).

        If *on_first_token* is an async callable it will be awaited once the
        first content token arrives (useful for triggering the typing indicator
        only after the model starts generating). Streams the response and
        returns the assembled text, or None when empty/failed.
        """
        patched = list(messages)  # defensive copy; caller's list is untouched

        # Append recent bot replies to the system prompt so the model avoids
        # repeating the same phrases / joke structures.
        effective_prompt = system_prompt
        if recent_bot_replies:
            avoid_block = "\n".join(f"- {r}" for r in recent_bot_replies)
            effective_prompt += (
                "\n\nIMPORTANT — you recently said the following. "
                "Do NOT reuse any of these phrases, sentence structures, or joke patterns. "
                "Come up with something completely different.\n"
                + avoid_block
            )

        # Truncated snapshot of the request for the DB log.
        req_json = json.dumps([
            {"role": "system", "content": effective_prompt[:500]},
            *[{"role": m["role"], "content": str(m.get("content", ""))[:200]} for m in patched],
        ], default=str)
        t0 = time.monotonic()

        async with self._semaphore:
            try:
                # High temperature + penalties: conversational variety, not analysis.
                temp_kwargs = {"temperature": 0.9, "frequency_penalty": 0.8, "presence_penalty": 0.6} if self._supports_temperature else {}
                stream = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": effective_prompt},
                        *patched,
                    ],
                    **temp_kwargs,
                    max_completion_tokens=2048,
                    stream=True,
                )

                chunks: list[str] = []
                notified = False  # ensure on_first_token fires at most once
                async for chunk in stream:
                    delta = chunk.choices[0].delta if chunk.choices else None
                    if delta and delta.content:
                        if not notified and on_first_token:
                            await on_first_token()
                            notified = True
                        chunks.append(delta.content)

                content = "".join(chunks).strip()
                elapsed = int((time.monotonic() - t0) * 1000)
                # Empty output is logged as failure (success flag = falsy content).
                self._log_llm("chat", elapsed, bool(content), req_json, content or None)
                return content if content else None
            except Exception as e:
                elapsed = int((time.monotonic() - t0) * 1000)
                logger.error("LLM chat error: %s", e)
                self._log_llm("chat", elapsed, False, req_json, error=str(e))
                return None
|
|
|
|
    async def classify_mention_intent(self, message_text: str) -> str:
        """Classify whether a bot @mention is a chat/question or a moderation report.

        Returns 'chat' or 'report'. Defaults to 'chat' on failure.
        """
        prompt = (
            "You are classifying the intent of a Discord message that @mentioned a bot.\n"
            "Reply with EXACTLY one word: 'chat' or 'report'.\n\n"
            "- 'chat' = the user is talking to the bot, asking a question, joking, greeting, "
            "or having a conversation. This includes things like 'what do you think?', "
            "'hey bot', 'do you know...', or any general interaction.\n"
            "- 'report' = the user is flagging bad behavior, asking the bot to check/scan "
            "the chat, reporting toxicity, or pointing out someone being problematic. "
            "This includes things like 'check this', 'they're being toxic', 'look at what "
            "they said', 'scan the chat', or concerns about other users.\n\n"
            "If unsure, say 'chat'."
        )
        t0 = time.monotonic()

        async with self._semaphore:
            try:
                # Deterministic classification; reasoning-family models reject temperature.
                temp_kwargs = {"temperature": 0.0} if self._supports_temperature else {}
                response = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": prompt},
                        {"role": "user", "content": message_text},
                    ],
                    **temp_kwargs,
                    # One-word answer expected.
                    max_completion_tokens=16,
                )
                elapsed = int((time.monotonic() - t0) * 1000)
                content = (response.choices[0].message.content or "").strip().lower()
                # Substring check tolerates replies like "'report'." or "Report".
                intent = "report" if "report" in content else "chat"
                self._log_llm("classify_intent", elapsed, True, message_text[:200], intent)
                logger.info("Mention intent classified as '%s' for: %s", intent, message_text[:80])
                return intent
            except Exception as e:
                elapsed = int((time.monotonic() - t0) * 1000)
                logger.error("Intent classification error: %s", e)
                self._log_llm("classify_intent", elapsed, False, message_text[:200], error=str(e))
                return "chat"  # safe default: treat the mention as conversation
|
|
|
|
    # Whitelist of emoji the model may react with (see pick_reaction); any
    # other model output is discarded. Escapes are, in order:
    # 💀 😂 👀 🔥 / 💯 😭 🤡 ❤️ / 🫡 🤓 🥴 🎯
    _REACTION_EMOJIS = {
        "\U0001f480", "\U0001f602", "\U0001f440", "\U0001f525",
        "\U0001f4af", "\U0001f62d", "\U0001f921", "\u2764\ufe0f",
        "\U0001fae1", "\U0001f913", "\U0001f974", "\U0001f3af",
    }
|
|
|
|
    async def pick_reaction(self, message_text: str, channel_name: str) -> str | None:
        """Pick a contextual emoji reaction for a Discord message.

        Returns an emoji string from _REACTION_EMOJIS, or None if no reaction
        is appropriate (including any model output not in the whitelist).
        """
        prompt = (
            "You are a lurker in a Discord gaming server. "
            "Given a message and its channel, decide if it deserves a reaction emoji.\n\n"
            "Available reactions:\n"
            "\U0001f480 = funny/dead\n"
            "\U0001f602 = hilarious\n"
            "\U0001f440 = drama/spicy\n"
            "\U0001f525 = impressive\n"
            "\U0001f4af = good take\n"
            "\U0001f62d = sad/tragic\n"
            "\U0001f921 = clown moment\n"
            "\u2764\ufe0f = wholesome\n"
            "\U0001fae1 = respect\n"
            "\U0001f913 = nerd\n"
            "\U0001f974 = drunk/unhinged\n"
            "\U0001f3af = accurate\n\n"
            "Reply with ONLY the emoji, or NONE if the message doesn't warrant a reaction. "
            "Most messages should get NONE — only react when something genuinely stands out."
        )
        t0 = time.monotonic()

        async with self._semaphore:
            try:
                # High temperature for variety; reasoning models reject the kwarg.
                temp_kwargs = {"temperature": 0.9} if self._supports_temperature else {}
                response = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": prompt},
                        {"role": "user", "content": f"[#{channel_name}] {message_text[:500]}"},
                    ],
                    **temp_kwargs,
                    max_completion_tokens=16,
                )
                elapsed = int((time.monotonic() - t0) * 1000)
                raw = (response.choices[0].message.content or "").strip()
                # First whitespace-separated token only; ignore any trailing chatter.
                token = raw.split()[0] if raw.split() else ""

                # Whitelist check also rejects hallucinated emoji.
                if not token or token.lower() == "none" or token not in self._REACTION_EMOJIS:
                    self._log_llm("pick_reaction", elapsed, True, message_text[:200], "NONE")
                    return None

                self._log_llm("pick_reaction", elapsed, True, message_text[:200], token)
                logger.debug("Picked reaction %s for: %s", token, message_text[:80])
                return token
            except Exception as e:
                elapsed = int((time.monotonic() - t0) * 1000)
                logger.error("Reaction pick error: %s", e)
                self._log_llm("pick_reaction", elapsed, False, message_text[:200], error=str(e))
                return None
|
|
|
|
    async def check_reply_relevance(
        self, recent_messages: list[str], memory_context: str = "",
    ) -> bool:
        """Check if the bot would naturally want to jump into a conversation.

        Only the last five entries of *recent_messages* are shown to the model.
        Returns True if the conversation is something worth replying to;
        False on failure (fail-quiet: no unsolicited reply).
        """
        prompt = (
            "You're a regular member of a Discord gaming server. You're reading chat and deciding "
            "whether you'd naturally want to jump in and say something.\n\n"
            "Say YES if:\n"
            "- Someone said something you'd have a strong reaction to\n"
            "- You know something relevant about these people (see memory context)\n"
            "- Someone is wrong or has a hot take you'd want to respond to\n"
            "- The conversation is funny or interesting enough to comment on\n"
            "- Someone mentioned something you have an opinion on\n\n"
            "Say NO if:\n"
            "- It's mundane/boring small talk\n"
            "- You'd have nothing interesting to add\n"
            "- People are just chatting normally and don't need interruption\n\n"
            "Reply with EXACTLY one word: YES or NO."
        )
        convo_text = "\n".join(recent_messages[-5:])
        user_content = ""
        if memory_context:
            user_content += f"{memory_context}\n\n"
        user_content += f"Recent chat:\n{convo_text}"

        t0 = time.monotonic()

        async with self._semaphore:
            try:
                temp_kwargs = {"temperature": 0.3} if self._supports_temperature else {}
                response = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": prompt},
                        {"role": "user", "content": user_content[:1000]},
                    ],
                    **temp_kwargs,
                    # One-word answer expected.
                    max_completion_tokens=16,
                )
                elapsed = int((time.monotonic() - t0) * 1000)
                content = (response.choices[0].message.content or "").strip().lower()
                # Substring check tolerates replies like "yes." or "Yes!".
                is_relevant = "yes" in content
                self._log_llm(
                    "check_relevance", elapsed, True,
                    user_content[:300], content,
                )
                logger.debug("Relevance check: %s", content)
                return is_relevant
            except Exception as e:
                elapsed = int((time.monotonic() - t0) * 1000)
                logger.error("Relevance check error: %s", e)
                self._log_llm("check_relevance", elapsed, False, user_content[:300], error=str(e))
                return False
|
|
|
|
    async def extract_memories(
        self,
        conversation: list[dict[str, str]],
        username: str,
        current_profile: str = "",
    ) -> dict | None:
        """Extract memories from a conversation for a specific user.

        Renders the chat-format *conversation* as "Bot:" / "<username>:" lines,
        shows the user's current profile (if any), and forces the
        ``extract_memories`` tool. Returns dict with "memories" list and
        optional "profile_update", or None on failure.
        """
        # Format conversation as readable lines; any non-assistant role is
        # attributed to the user.
        convo_lines = []
        for msg in conversation:
            role = msg.get("role", "")
            content = msg.get("content", "")
            if role == "assistant":
                convo_lines.append(f"Bot: {content}")
            else:
                convo_lines.append(f"{username}: {content}")
        convo_text = "\n".join(convo_lines)

        user_content = ""
        if current_profile:
            user_content += f"=== CURRENT PROFILE FOR {username} ===\n{current_profile}\n\n"
        else:
            user_content += f"=== CURRENT PROFILE FOR {username} ===\n(no profile yet)\n\n"
        user_content += f"=== CONVERSATION ===\n{convo_text}\n\n"
        user_content += f"Extract any noteworthy memories from this conversation with {username}."
        user_content = self._append_no_think(user_content)

        # Truncated snapshot for the DB log.
        req_json = json.dumps([
            {"role": "system", "content": MEMORY_EXTRACTION_PROMPT[:500]},
            {"role": "user", "content": user_content[:500]},
        ], default=str)
        t0 = time.monotonic()

        async with self._semaphore:
            try:
                temp_kwargs = {"temperature": 0.3} if self._supports_temperature else {}
                response = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": MEMORY_EXTRACTION_PROMPT},
                        {"role": "user", "content": user_content},
                    ],
                    tools=[MEMORY_EXTRACTION_TOOL],
                    # Force the tool so we always get structured output.
                    tool_choice={"type": "function", "function": {"name": "extract_memories"}},
                    **temp_kwargs,
                    max_completion_tokens=1024,
                )

                elapsed = int((time.monotonic() - t0) * 1000)
                choice = response.choices[0]
                usage = response.usage

                if choice.message.tool_calls:
                    tool_call = choice.message.tool_calls[0]
                    resp_text = tool_call.function.arguments
                    args = json.loads(resp_text)
                    self._log_llm("memory_extraction", elapsed, True, req_json, resp_text,
                                  input_tokens=usage.prompt_tokens if usage else None,
                                  output_tokens=usage.completion_tokens if usage else None)
                    return self._validate_memory_result(args)

                # No plain-content fallback here: memories must be structured.
                logger.warning("No tool call in memory extraction response.")
                self._log_llm("memory_extraction", elapsed, False, req_json, error="No tool call")
                return None

            except Exception as e:
                elapsed = int((time.monotonic() - t0) * 1000)
                logger.error("LLM memory extraction error: %s", e)
                self._log_llm("memory_extraction", elapsed, False, req_json, error=str(e))
                return None
|
|
|
|
@staticmethod
|
|
def _validate_memory_result(result: dict) -> dict:
|
|
"""Validate and normalize memory extraction result."""
|
|
valid_expirations = {"1d", "3d", "7d", "30d", "permanent"}
|
|
valid_importances = {"low", "medium", "high"}
|
|
|
|
memories = []
|
|
for mem in result.get("memories", []):
|
|
if not isinstance(mem, dict):
|
|
continue
|
|
memory_text = str(mem.get("memory", ""))[:500]
|
|
if not memory_text:
|
|
continue
|
|
topics = mem.get("topics", [])
|
|
if not isinstance(topics, list):
|
|
topics = []
|
|
topics = [str(t).lower() for t in topics]
|
|
|
|
expiration = str(mem.get("expiration", "7d"))
|
|
if expiration not in valid_expirations:
|
|
expiration = "7d"
|
|
|
|
importance = str(mem.get("importance", "medium"))
|
|
if importance not in valid_importances:
|
|
importance = "medium"
|
|
|
|
memories.append({
|
|
"memory": memory_text,
|
|
"topics": topics,
|
|
"expiration": expiration,
|
|
"importance": importance,
|
|
})
|
|
|
|
profile_update = result.get("profile_update")
|
|
if profile_update is not None:
|
|
profile_update = str(profile_update)[:500]
|
|
|
|
return {
|
|
"memories": memories,
|
|
"profile_update": profile_update,
|
|
}
|
|
|
|
async def sanitize_notes(self, notes: str) -> str:
    """Rewrite user notes to remove any quoted toxic/offensive language.

    Toxic quotes are replaced with abstract behavioral descriptions while all
    non-toxic observations are meant to be preserved verbatim.

    Args:
        notes: The raw behavior notes to sanitize.

    Returns:
        The sanitized notes string, or the original notes on any failure
        (empty model response, API error) so callers never lose data.
    """
    # Fast path: empty or whitespace-only notes pass through unchanged
    # (idiom fix: truthiness instead of `len(notes.strip()) == 0`).
    if not notes or not notes.strip():
        return notes

    system_prompt = (
        "Rewrite the following user behavior notes. Remove any quoted offensive language, "
        "slurs, or profanity. Replace toxic quotes with abstract descriptions of the behavior "
        "(e.g. 'directed a personal insult at another user' instead of quoting the insult). "
        "Preserve all non-toxic observations, timestamps, and behavioral patterns exactly. "
        "Return ONLY the rewritten notes, nothing else."
    )
    # NOTE(review): this inlines the /no_think suffix; other methods use
    # self._append_no_think — confirm the helper's formatting matches before unifying.
    user_content = notes
    if self._no_think:
        user_content += "\n/no_think"

    t0 = time.monotonic()
    async with self._semaphore:
        try:
            # Low temperature: this is a mechanical rewrite, not creative text.
            temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_content},
                ],
                **temp_kwargs,
                max_completion_tokens=1024,
            )
            elapsed = int((time.monotonic() - t0) * 1000)
            result = response.choices[0].message.content
            if result and result.strip():
                self._log_llm("sanitize_notes", elapsed, True, notes[:300], result[:300])
                return result.strip()
            # Empty completion: keep the original rather than wiping the notes.
            self._log_llm("sanitize_notes", elapsed, False, notes[:300], error="Empty response")
            return notes
        except Exception as e:
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM sanitize_notes error: %s", e)
            self._log_llm("sanitize_notes", elapsed, False, notes[:300], error=str(e))
            return notes
|
async def analyze_image(
    self,
    image_bytes: bytes,
    system_prompt: str,
    user_text: str = "",
    on_first_token=None,
    media_type: str = "image/png",
) -> str | None:
    """Send an image to the vision model with a system prompt.

    Returns the generated text response, or None on failure.
    """
    encoded = base64.b64encode(image_bytes).decode()
    message_parts: list[dict] = [
        {"type": "image_url", "image_url": {"url": f"data:{media_type};base64,{encoded}"}},
    ]
    caption = user_text or ""
    if self._no_think:
        caption = (caption + "\n/no_think").strip()
    if caption:
        message_parts.append({"type": "text", "text": caption})

    # Log a size placeholder instead of the raw base64 payload.
    req_json = json.dumps([
        {"role": "system", "content": system_prompt[:500]},
        {"role": "user", "content": f"[image {len(image_bytes)} bytes] {user_text or ''}"},
    ], default=str)
    t0 = time.monotonic()

    async with self._semaphore:
        try:
            async def _consume_stream() -> str:
                temp_kwargs = {"temperature": 0.8} if self._supports_temperature else {}
                stream = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": message_parts},
                    ],
                    **temp_kwargs,
                    max_completion_tokens=2048,
                    stream=True,
                )
                pieces: list[str] = []
                fired = False
                async for chunk in stream:
                    delta = chunk.choices[0].delta if chunk.choices else None
                    if not (delta and delta.content):
                        continue
                    # Fire the callback exactly once, on the first content token.
                    if on_first_token and not fired:
                        await on_first_token()
                        fired = True
                    pieces.append(delta.content)
                return "".join(pieces).strip()

            content = await asyncio.wait_for(_consume_stream(), timeout=120)
            elapsed = int((time.monotonic() - t0) * 1000)
            self._log_llm("image", elapsed, bool(content), req_json, content or None)
            return content or None
        except asyncio.TimeoutError:
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM image analysis timed out after %ds", elapsed // 1000)
            self._log_llm("image", elapsed, False, req_json, error="Timeout")
            return None
        except Exception as e:
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM image analysis error: %s", e)
            self._log_llm("image", elapsed, False, req_json, error=str(e))
            return None
|
async def raw_analyze(
    self, message: str, context: str = "", user_notes: str = "",
    channel_context: str = "",
) -> tuple[str, dict | None]:
    """Return the raw LLM response string AND parsed result for /bcs-test (single LLM call)."""
    pieces = [f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n"]
    if user_notes:
        pieces.append(f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n")
    if channel_context:
        pieces.append(f"=== CHANNEL INFO ===\n{channel_context}\n\n")
    pieces.append(f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}")
    user_content = self._append_no_think("".join(pieces))

    req_json = json.dumps([
        {"role": "system", "content": SYSTEM_PROMPT[:500]},
        {"role": "user", "content": user_content},
    ], default=str)
    t0 = time.monotonic()

    async with self._semaphore:
        try:
            temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_content},
                ],
                tools=[ANALYSIS_TOOL],
                tool_choice={"type": "function", "function": {"name": "report_analysis"}},
                **temp_kwargs,
                max_completion_tokens=2048,
            )

            elapsed = int((time.monotonic() - t0) * 1000)
            choice = response.choices[0]
            usage = response.usage
            reply = choice.message

            raw_parts: list[str] = []
            parsed: dict | None = None

            if reply.content:
                raw_parts.append(f"Content: {reply.content}")

            if reply.tool_calls:
                raw_parts.extend(
                    f"Tool call: {tc.function.name}({tc.function.arguments})"
                    for tc in reply.tool_calls
                )
                # Only the first tool call carries the structured analysis;
                # a malformed payload falls through to the outer except.
                parsed = self._validate_result(
                    json.loads(reply.tool_calls[0].function.arguments)
                )
            elif reply.content:
                parsed = self._parse_content_fallback(reply.content)

            raw = "\n".join(raw_parts) or "(empty response)"
            self._log_llm(
                "raw_analyze", elapsed, parsed is not None, req_json, raw,
                input_tokens=usage.prompt_tokens if usage else None,
                output_tokens=usage.completion_tokens if usage else None,
            )
            return raw, parsed

        except Exception as e:
            elapsed = int((time.monotonic() - t0) * 1000)
            self._log_llm("raw_analyze", elapsed, False, req_json, error=str(e))
            return f"Error: {e}", None