Files
Behavior-Monitor/utils/llm_client.py
AJ Isaacs bf32a9536a feat: add server rule violation detection and compress prompts
- LLM now evaluates messages against numbered server rules and reports
  violated_rules in analysis output
- Warnings and mutes cite the specific rule(s) broken
- Rules extracted to prompts/rules.txt for prompt injection
- Personality prompts moved to prompts/personalities/ and compressed
  (~63% reduction across all prompt files)
- All prompt files tightened: removed redundancy, consolidated Do NOT
  sections, trimmed examples while preserving behavioral instructions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 22:14:35 -05:00

997 lines
44 KiB
Python

import asyncio
import base64
import json
import logging
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from openai import AsyncOpenAI
# Module logger under the shared "bcs." namespace.
logger = logging.getLogger("bcs.llm")
# Prompt files live in <repo>/prompts, one directory above this utils/ module.
_PROMPTS_DIR = Path(__file__).resolve().parent.parent / "prompts"
# Loaded once at import time; a missing prompt file fails fast here.
SYSTEM_PROMPT = (_PROMPTS_DIR / "analysis.txt").read_text(encoding="utf-8")
# Function-calling schema for single-message analysis. tool_choice forces the
# model to call report_analysis, so output arrives as structured arguments
# instead of free text.
ANALYSIS_TOOL = {
    "type": "function",
    "function": {
        "name": "report_analysis",
        "description": "Report the toxicity and topic analysis of a Discord message.",
        "parameters": {
            "type": "object",
            "properties": {
                "toxicity_score": {
                    "type": "number",
                    "description": "Toxicity rating from 0.0 (completely harmless) to 1.0 (extremely toxic).",
                },
                "categories": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": [
                            "aggressive",
                            "passive_aggressive",
                            "instigating",
                            "hostile",
                            "manipulative",
                            "sexual_vulgar",
                            "jealousy",
                            "none",
                        ],
                    },
                    "description": "Detected toxicity behavior categories.",
                },
                "reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the toxicity analysis.",
                },
                "off_topic": {
                    "type": "boolean",
                    "description": "True if the message is off-topic personal drama rather than gaming-related conversation.",
                },
                "topic_category": {
                    "type": "string",
                    "enum": [
                        "gaming",
                        "personal_drama",
                        "relationship_issues",
                        "real_life_venting",
                        "gossip",
                        "general_chat",
                        "meta",
                    ],
                    "description": "What topic category the message falls into.",
                },
                "topic_reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the topic classification.",
                },
                "coherence_score": {
                    "type": "number",
                    "description": "Coherence rating from 0.0 (incoherent gibberish) to 1.0 (clear and well-written). Normal texting shortcuts are fine.",
                },
                "coherence_flag": {
                    "type": "string",
                    "enum": [
                        "normal",
                        "intoxicated",
                        "tired",
                        "angry_typing",
                        "mobile_keyboard",
                        "language_barrier",
                    ],
                    "description": "Best guess at why coherence is low, if applicable.",
                },
                "note_update": {
                    "type": ["string", "null"],
                    "description": "Brief new observation about this user's style/behavior for future reference, or null if nothing new.",
                },
                "detected_game": {
                    "type": ["string", "null"],
                    "description": "The game channel name this message is about (e.g. 'gta-online', 'warzone'), or null if not game-specific.",
                },
                "violated_rules": {
                    "type": "array",
                    "items": {"type": "integer"},
                    "description": "Rule numbers violated (empty array if none).",
                },
            },
            # note_update, detected_game, and violated_rules are intentionally
            # optional — validation code in this file fills defaults when absent.
            "required": ["toxicity_score", "categories", "reasoning", "off_topic", "topic_category", "topic_reasoning", "coherence_score", "coherence_flag"],
        },
    },
}
# Function-calling schema for whole-conversation scans: one findings object
# per participating user plus an overall summary. Per-user fields mirror the
# single-message fields in ANALYSIS_TOOL.
CONVERSATION_TOOL = {
    "type": "function",
    "function": {
        "name": "report_conversation_scan",
        "description": "Analyze a conversation block and report findings per user.",
        "parameters": {
            "type": "object",
            "properties": {
                "user_findings": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "username": {
                                "type": "string",
                                "description": "Discord display name of the user.",
                            },
                            "toxicity_score": {
                                "type": "number",
                                "description": "Weighted toxicity 0.0-1.0 across their messages in this conversation.",
                            },
                            "categories": {
                                "type": "array",
                                "items": {
                                    "type": "string",
                                    "enum": [
                                        "aggressive",
                                        "passive_aggressive",
                                        "instigating",
                                        "hostile",
                                        "manipulative",
                                        "sexual_vulgar",
                                        "jealousy",
                                        "none",
                                    ],
                                },
                                "description": "Detected toxicity behavior categories.",
                            },
                            "reasoning": {
                                "type": "string",
                                "description": "Brief explanation of this user's behavior in the conversation.",
                            },
                            "worst_message": {
                                "type": ["string", "null"],
                                "description": "Most problematic snippet from this user (quoted, max 100 chars), or null if nothing notable.",
                            },
                            "off_topic": {
                                "type": "boolean",
                                "description": "True if this user's messages were primarily off-topic personal drama.",
                            },
                            "topic_category": {
                                "type": "string",
                                "enum": [
                                    "gaming",
                                    "personal_drama",
                                    "relationship_issues",
                                    "real_life_venting",
                                    "gossip",
                                    "general_chat",
                                    "meta",
                                ],
                                "description": "What topic category this user's messages fall into.",
                            },
                            "topic_reasoning": {
                                "type": "string",
                                "description": "Brief explanation of the topic classification for this user.",
                            },
                            "coherence_score": {
                                "type": "number",
                                "description": "Coherence rating 0.0-1.0 across this user's messages. Normal texting shortcuts are fine.",
                            },
                            "coherence_flag": {
                                "type": "string",
                                "enum": [
                                    "normal",
                                    "intoxicated",
                                    "tired",
                                    "angry_typing",
                                    "mobile_keyboard",
                                    "language_barrier",
                                ],
                                "description": "Best guess at why coherence is low, if applicable.",
                            },
                            "note_update": {
                                "type": ["string", "null"],
                                "description": "New observation about this user's pattern, or null.",
                            },
                            "detected_game": {
                                "type": ["string", "null"],
                                "description": "The game channel name this user's messages are about, or null.",
                            },
                            "violated_rules": {
                                "type": "array",
                                "items": {"type": "integer"},
                                "description": "Rule numbers violated (empty array if none).",
                            },
                        },
                        "required": ["username", "toxicity_score", "categories", "reasoning", "off_topic", "topic_category", "topic_reasoning", "coherence_score", "coherence_flag"],
                    },
                    "description": "Findings for each user who participated in the conversation.",
                },
                "conversation_summary": {
                    "type": "string",
                    "description": "One-sentence summary of the overall conversation tone and any escalation patterns.",
                },
            },
            "required": ["user_findings", "conversation_summary"],
        },
    },
}
# Function-calling schema for the post-conversation memory extraction pass.
MEMORY_EXTRACTION_TOOL = {
    "type": "function",
    "function": {
        "name": "extract_memories",
        "description": "Extract noteworthy memories from a conversation for future reference.",
        "parameters": {
            "type": "object",
            "properties": {
                "memories": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "memory": {
                                "type": "string",
                                "description": "A concise fact or observation worth remembering.",
                            },
                            "topics": {
                                "type": "array",
                                "items": {"type": "string"},
                                "description": "Topic tags for retrieval (e.g., 'gta', 'personal', 'warzone').",
                            },
                            "expiration": {
                                "type": "string",
                                "enum": ["1d", "3d", "7d", "30d", "permanent"],
                                "description": "How long this memory stays relevant.",
                            },
                            "importance": {
                                "type": "string",
                                "enum": ["low", "medium", "high"],
                                "description": "How important this memory is for future interactions.",
                            },
                        },
                        "required": ["memory", "topics", "expiration", "importance"],
                    },
                    "description": "Memories to store. Only include genuinely new or noteworthy information.",
                },
                "profile_update": {
                    "type": ["string", "null"],
                    "description": "Full updated profile summary incorporating new permanent facts, or null if no profile changes.",
                },
            },
            "required": ["memories"],
        },
    },
}
# System prompt for the memory-extraction pass; loaded once at import time.
MEMORY_EXTRACTION_PROMPT = (_PROMPTS_DIR / "memory_extraction.txt").read_text(encoding="utf-8")
# Models that reject the `temperature` request parameter; requests to these
# models omit temperature (and the penalty knobs) entirely.
_NO_TEMPERATURE_MODELS = {"gpt-5-nano", "o1", "o1-mini", "o1-preview", "o3", "o3-mini", "o4-mini"}
class LLMClient:
    """Async client for an OpenAI-compatible chat endpoint.

    Provides toxicity/topic analysis (single message and whole conversation),
    free-form chat, mention-intent classification, memory extraction, and
    image analysis — with request concurrency limiting and optional DB logging.
    """

    def __init__(self, base_url: str, model: str, api_key: str = "not-needed",
                 db=None, no_think: bool = False, concurrency: int = 4):
        """Create the client.

        base_url: server root; empty string selects the default OpenAI endpoint.
        model: model name sent with every request.
        api_key: API key (placeholder is fine for local servers).
        db: optional handle exposing async ``save_llm_log(...)`` for request logs.
        no_think: when True, "/no_think" is appended to user prompts.
        concurrency: maximum simultaneous in-flight requests.
        """
        self.model = model
        self.host = base_url.rstrip("/")
        self._db = db
        self._no_think = no_think
        # Reasoning-series models reject the temperature parameter entirely.
        self._supports_temperature = model not in _NO_TEMPERATURE_MODELS
        timeout = 600.0 if self.host else 120.0  # local models need longer for VRAM load
        client_kwargs = {"api_key": api_key, "timeout": timeout}
        if self.host:
            # Custom host: point the SDK at its /v1 OpenAI-compatible API.
            client_kwargs["base_url"] = f"{self.host}/v1"
        self._client = AsyncOpenAI(**client_kwargs)
        self._semaphore = asyncio.Semaphore(concurrency)
def _log_llm(self, request_type: str, duration_ms: int, success: bool,
request: str, response: str | None = None, error: str | None = None,
input_tokens: int | None = None, output_tokens: int | None = None):
"""Fire-and-forget LLM log entry to the database."""
if not self._db:
return
asyncio.create_task(self._db.save_llm_log(
request_type=request_type,
model=self.model,
duration_ms=duration_ms,
success=success,
request=request,
response=response,
error=error,
input_tokens=input_tokens,
output_tokens=output_tokens,
))
def _append_no_think(self, text: str) -> str:
return text + "\n/no_think" if self._no_think else text
async def close(self):
    """Close the underlying AsyncOpenAI client and release its connections."""
    await self._client.close()
async def analyze_message(
    self, message: str, context: str = "", user_notes: str = "",
    channel_context: str = "", mention_context: str = "",
    rules_context: str = "",
) -> dict | None:
    """Analyze one Discord message for toxicity/topic via a forced tool call.

    Builds a sectioned prompt (recent channel messages, user notes, channel
    info, server rules, optional user report, then the target message),
    forces the model to call ``report_analysis``, and returns the validated
    argument dict. Falls back to parsing plain content as JSON when no tool
    call is returned. Returns None on empty responses or errors (logged).
    """
    user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n"
    if user_notes:
        user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
    if channel_context:
        user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
    if rules_context:
        user_content += f"=== SERVER RULES ===\n{rules_context}\n\n"
    if mention_context:
        user_content += f"=== USER REPORT (a user flagged this conversation — focus on this concern) ===\n{mention_context}\n\n"
    user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}"
    user_content = self._append_no_think(user_content)
    # Request snapshot for the DB log (system prompt truncated to 500 chars).
    # NOTE(review): unlike analyze_conversation, the full user_content is
    # logged here — confirm whether truncation was intended.
    req_json = json.dumps([
        {"role": "system", "content": SYSTEM_PROMPT[:500]},
        {"role": "user", "content": user_content},
    ], default=str)
    t0 = time.monotonic()
    async with self._semaphore:  # cap concurrent in-flight LLM requests
        try:
            # Some models reject the temperature parameter; omit it there.
            temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_content},
                ],
                tools=[ANALYSIS_TOOL],
                tool_choice={"type": "function", "function": {"name": "report_analysis"}},
                **temp_kwargs,
                max_completion_tokens=2048,
            )
            elapsed = int((time.monotonic() - t0) * 1000)
            choice = response.choices[0]
            usage = response.usage
            # Extract tool call arguments
            if choice.message.tool_calls:
                tool_call = choice.message.tool_calls[0]
                resp_text = tool_call.function.arguments
                args = json.loads(resp_text)
                self._log_llm("analysis", elapsed, True, req_json, resp_text,
                              input_tokens=usage.prompt_tokens if usage else None,
                              output_tokens=usage.completion_tokens if usage else None)
                return self._validate_result(args)
            # Fallback: try parsing the message content as JSON
            if choice.message.content:
                self._log_llm("analysis", elapsed, True, req_json, choice.message.content,
                              input_tokens=usage.prompt_tokens if usage else None,
                              output_tokens=usage.completion_tokens if usage else None)
                return self._parse_content_fallback(choice.message.content)
            logger.warning("No tool call or content in LLM response.")
            self._log_llm("analysis", elapsed, False, req_json, error="Empty response")
            return None
        except Exception as e:
            # Broad catch at the network boundary: log and degrade to None.
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM analysis error: %s", e)
            self._log_llm("analysis", elapsed, False, req_json, error=str(e))
            return None
def _validate_result(self, result: dict) -> dict:
score = float(result.get("toxicity_score", 0.0))
result["toxicity_score"] = min(max(score, 0.0), 1.0)
if not isinstance(result.get("categories"), list):
result["categories"] = ["none"]
if not isinstance(result.get("reasoning"), str):
result["reasoning"] = ""
result["off_topic"] = bool(result.get("off_topic", False))
result.setdefault("topic_category", "general_chat")
result.setdefault("topic_reasoning", "")
coherence = float(result.get("coherence_score", 0.85))
result["coherence_score"] = min(max(coherence, 0.0), 1.0)
result.setdefault("coherence_flag", "normal")
result.setdefault("note_update", None)
result.setdefault("detected_game", None)
if not isinstance(result.get("violated_rules"), list):
result["violated_rules"] = []
return result
def _parse_content_fallback(self, text: str) -> dict | None:
"""Try to parse plain-text content as JSON if tool calling didn't work."""
import re
# Try direct JSON
try:
result = json.loads(text.strip())
return self._validate_result(result)
except (json.JSONDecodeError, ValueError):
pass
# Try extracting from code block
match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if match:
try:
result = json.loads(match.group(1))
return self._validate_result(result)
except (json.JSONDecodeError, ValueError):
pass
# Regex fallback for toxicity_score
score_match = re.search(r'"toxicity_score"\s*:\s*([\d.]+)', text)
if score_match:
return {
"toxicity_score": min(max(float(score_match.group(1)), 0.0), 1.0),
"categories": ["unknown"],
"reasoning": "Parsed via fallback regex",
}
logger.warning("Could not parse LLM content fallback: %s", text[:200])
return None
# -- Conversation-level analysis (mention scan) --
@staticmethod
def _format_relative_time(delta: timedelta) -> str:
total_seconds = int(delta.total_seconds())
if total_seconds < 60:
return f"~{total_seconds}s ago"
minutes = total_seconds // 60
if minutes < 60:
return f"~{minutes}m ago"
hours = minutes // 60
return f"~{hours}h ago"
@staticmethod
def _format_conversation_block(
messages: list[tuple[str, str, datetime, str | None]],
now: datetime | None = None,
new_message_start: int | None = None,
) -> str:
"""Format messages as a compact timestamped chat block.
Each tuple is (username, content, timestamp, reply_to_username).
Consecutive messages from the same user collapse to indented lines.
Replies shown as ``username → replied_to:``.
If *new_message_start* is given, a separator is inserted before that
index so the LLM can distinguish context from new messages.
"""
if now is None:
now = datetime.now(timezone.utc)
lines = [f"[Current time: {now.strftime('%I:%M %p')}]", ""]
last_user = None
in_new_section = new_message_start is None or new_message_start == 0
for idx, (username, content, ts, reply_to) in enumerate(messages):
if new_message_start is not None and idx == new_message_start:
lines.append("")
lines.append("--- NEW MESSAGES (score only these) ---")
lines.append("")
last_user = None # reset collapse so first new msg gets full header
in_new_section = True
delta = now - ts.replace(tzinfo=timezone.utc) if ts.tzinfo is None else now - ts
rel = LLMClient._format_relative_time(delta)
tag = "" if in_new_section else " [CONTEXT]"
if username == last_user:
# Continuation from same user — indent
for line in content.split("\n"):
lines.append(f" {line}")
else:
# New user block
if reply_to:
prefix = f"[{rel}] {username}{reply_to}:{tag} "
else:
prefix = f"[{rel}] {username}:{tag} "
msg_lines = content.split("\n")
lines.append(prefix + msg_lines[0])
for line in msg_lines[1:]:
lines.append(f" {line}")
last_user = username
return "\n".join(lines)
async def analyze_conversation(
    self,
    messages: list[tuple[str, str, datetime, str | None]],
    mention_context: str = "",
    channel_context: str = "",
    user_notes_map: dict[str, str] | None = None,
    new_message_start: int | None = None,
    user_aliases: str = "",
    rules_context: str = "",
) -> dict | None:
    """Analyze a conversation block in one call, returning per-user findings.

    *messages* are (username, content, timestamp, reply_to_username) tuples
    rendered into a timestamped chat block. Forces the model to call
    ``report_conversation_scan`` and returns the validated result dict, or
    None for empty input, empty responses, or errors (logged).
    """
    if not messages:
        return None
    convo_block = self._format_conversation_block(messages, new_message_start=new_message_start)
    user_content = f"=== CONVERSATION BLOCK ===\n{convo_block}\n\n"
    if user_aliases:
        user_content += f"=== KNOWN MEMBER ALIASES (names other members use to refer to each other) ===\n{user_aliases}\n\n"
    if user_notes_map:
        # Only users with a non-empty note get a line.
        notes_lines = [f"  {u}: {n}" for u, n in user_notes_map.items() if n]
        if notes_lines:
            user_content += "=== USER NOTES (from prior analysis) ===\n" + "\n".join(notes_lines) + "\n\n"
    if channel_context:
        user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
    if rules_context:
        user_content += f"=== SERVER RULES ===\n{rules_context}\n\n"
    if mention_context:
        user_content += f"=== USER REPORT (a user flagged this conversation — focus on this concern) ===\n{mention_context}\n\n"
    user_content += "Analyze the conversation block above and report findings for each user."
    user_content = self._append_no_think(user_content)
    # Truncated request snapshot for the DB log.
    req_json = json.dumps([
        {"role": "system", "content": SYSTEM_PROMPT[:500]},
        {"role": "user", "content": user_content[:500]},
    ], default=str)
    t0 = time.monotonic()
    async with self._semaphore:  # cap concurrent in-flight LLM requests
        try:
            temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_content},
                ],
                tools=[CONVERSATION_TOOL],
                tool_choice={"type": "function", "function": {"name": "report_conversation_scan"}},
                **temp_kwargs,
                max_completion_tokens=4096,
            )
            elapsed = int((time.monotonic() - t0) * 1000)
            choice = response.choices[0]
            usage = response.usage
            if choice.message.tool_calls:
                tool_call = choice.message.tool_calls[0]
                resp_text = tool_call.function.arguments
                args = json.loads(resp_text)
                self._log_llm("conversation", elapsed, True, req_json, resp_text,
                              input_tokens=usage.prompt_tokens if usage else None,
                              output_tokens=usage.completion_tokens if usage else None)
                return self._validate_conversation_result(args)
            # Fallback: try parsing the message content as JSON
            if choice.message.content:
                logger.warning("No tool call in conversation analysis — trying content fallback. Content: %s", choice.message.content[:300])
                fallback = self._parse_conversation_content_fallback(choice.message.content)
                if fallback:
                    self._log_llm("conversation", elapsed, True, req_json, choice.message.content,
                                  input_tokens=usage.prompt_tokens if usage else None,
                                  output_tokens=usage.completion_tokens if usage else None)
                    return fallback
            logger.warning("No tool call or parseable content in conversation analysis response.")
            self._log_llm("conversation", elapsed, False, req_json, error="Empty response")
            return None
        except Exception as e:
            # Broad catch at the network boundary: log and degrade to None.
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM conversation analysis error: %s", e)
            self._log_llm("conversation", elapsed, False, req_json, error=str(e))
            return None
@staticmethod
def _validate_conversation_result(result: dict) -> dict:
"""Validate and normalize conversation analysis result."""
if not isinstance(result, dict):
return {"user_findings": [], "conversation_summary": ""}
findings = [f for f in result.get("user_findings", []) if isinstance(f, dict)]
for finding in findings:
finding.setdefault("username", "unknown")
score = float(finding.get("toxicity_score", 0.0))
finding["toxicity_score"] = min(max(score, 0.0), 1.0)
if not isinstance(finding.get("categories"), list):
finding["categories"] = ["none"]
finding.setdefault("reasoning", "")
finding.setdefault("worst_message", None)
finding["off_topic"] = bool(finding.get("off_topic", False))
finding.setdefault("topic_category", "general_chat")
finding.setdefault("topic_reasoning", "")
coherence = float(finding.get("coherence_score", 0.85))
finding["coherence_score"] = min(max(coherence, 0.0), 1.0)
finding.setdefault("coherence_flag", "normal")
finding.setdefault("note_update", None)
finding.setdefault("detected_game", None)
if not isinstance(finding.get("violated_rules"), list):
finding["violated_rules"] = []
result["user_findings"] = findings
result.setdefault("conversation_summary", "")
return result
def _parse_conversation_content_fallback(self, text: str) -> dict | None:
"""Try to parse plain-text content as JSON for conversation analysis."""
import re
# Try direct JSON parse
try:
result = json.loads(text.strip())
if "user_findings" in result:
return self._validate_conversation_result(result)
except (json.JSONDecodeError, ValueError):
pass
# Try extracting from code block
match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if match:
try:
result = json.loads(match.group(1))
if "user_findings" in result:
return self._validate_conversation_result(result)
except (json.JSONDecodeError, ValueError):
pass
logger.warning("Could not parse conversation content fallback: %s", text[:200])
return None
async def chat(
    self, messages: list[dict[str, str]], system_prompt: str,
    on_first_token=None, recent_bot_replies: list[str] | None = None,
) -> str | None:
    """Send a conversational chat request (no tools).

    If *on_first_token* is an async callable it will be awaited once the
    first content token arrives (useful for triggering the typing indicator
    only after the model starts generating).

    *recent_bot_replies* are folded into the system prompt as an
    anti-repetition block. Returns the full streamed reply (stripped), or
    None when the stream produced no content or an error occurred (logged).
    """
    patched = list(messages)
    # Append recent bot replies to the system prompt so the model avoids
    # repeating the same phrases / joke structures.
    effective_prompt = system_prompt
    if recent_bot_replies:
        avoid_block = "\n".join(f"- {r}" for r in recent_bot_replies)
        effective_prompt += (
            "\n\nIMPORTANT — you recently said the following. "
            "Do NOT reuse any of these phrases, sentence structures, or joke patterns. "
            "Come up with something completely different.\n"
            + avoid_block
        )
    # Truncated request snapshot for the DB log.
    req_json = json.dumps([
        {"role": "system", "content": effective_prompt[:500]},
        *[{"role": m["role"], "content": str(m.get("content", ""))[:200]} for m in patched],
    ], default=str)
    t0 = time.monotonic()
    async with self._semaphore:  # cap concurrent in-flight LLM requests
        try:
            # High temperature plus repetition penalties for varied banter
            # (omitted for models that reject these parameters).
            temp_kwargs = {"temperature": 0.9, "frequency_penalty": 0.8, "presence_penalty": 0.6} if self._supports_temperature else {}
            stream = await self._client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": effective_prompt},
                    *patched,
                ],
                **temp_kwargs,
                max_completion_tokens=2048,
                stream=True,
            )
            chunks: list[str] = []
            notified = False
            async for chunk in stream:
                delta = chunk.choices[0].delta if chunk.choices else None
                if delta and delta.content:
                    # Notify exactly once, on the first real content token.
                    if not notified and on_first_token:
                        await on_first_token()
                        notified = True
                    chunks.append(delta.content)
            content = "".join(chunks).strip()
            elapsed = int((time.monotonic() - t0) * 1000)
            self._log_llm("chat", elapsed, bool(content), req_json, content or None)
            return content if content else None
        except Exception as e:
            # Broad catch at the network boundary: log and degrade to None.
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM chat error: %s", e)
            self._log_llm("chat", elapsed, False, req_json, error=str(e))
            return None
async def classify_mention_intent(self, message_text: str) -> str:
    """Classify whether a bot @mention is a chat/question or a moderation report.

    Returns 'chat' or 'report'. Defaults to 'chat' on failure.
    """
    # Inline classifier prompt — deliberately small, answered in one word.
    prompt = (
        "You are classifying the intent of a Discord message that @mentioned a bot.\n"
        "Reply with EXACTLY one word: 'chat' or 'report'.\n\n"
        "- 'chat' = the user is talking to the bot, asking a question, joking, greeting, "
        "or having a conversation. This includes things like 'what do you think?', "
        "'hey bot', 'do you know...', or any general interaction.\n"
        "- 'report' = the user is flagging bad behavior, asking the bot to check/scan "
        "the chat, reporting toxicity, or pointing out someone being problematic. "
        "This includes things like 'check this', 'they're being toxic', 'look at what "
        "they said', 'scan the chat', or concerns about other users.\n\n"
        "If unsure, say 'chat'."
    )
    t0 = time.monotonic()
    async with self._semaphore:  # cap concurrent in-flight LLM requests
        try:
            # Temperature 0 for a deterministic one-word classification.
            temp_kwargs = {"temperature": 0.0} if self._supports_temperature else {}
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": prompt},
                    {"role": "user", "content": message_text},
                ],
                **temp_kwargs,
                max_completion_tokens=16,
            )
            elapsed = int((time.monotonic() - t0) * 1000)
            content = (response.choices[0].message.content or "").strip().lower()
            # Substring check tolerates replies like "report." or quoted words.
            intent = "report" if "report" in content else "chat"
            self._log_llm("classify_intent", elapsed, True, message_text[:200], intent)
            logger.info("Mention intent classified as '%s' for: %s", intent, message_text[:80])
            return intent
        except Exception as e:
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("Intent classification error: %s", e)
            self._log_llm("classify_intent", elapsed, False, message_text[:200], error=str(e))
            return "chat"
async def extract_memories(
    self,
    conversation: list[dict[str, str]],
    username: str,
    current_profile: str = "",
) -> dict | None:
    """Extract memories from a conversation for a specific user.

    Returns dict with "memories" list and optional "profile_update", or None on failure.
    *conversation* entries are {"role", "content"} dicts; assistant turns
    are rendered as "Bot:", all others as "<username>:".
    """
    # Format conversation as readable lines
    convo_lines = []
    for msg in conversation:
        role = msg.get("role", "")
        content = msg.get("content", "")
        if role == "assistant":
            convo_lines.append(f"Bot: {content}")
        else:
            convo_lines.append(f"{username}: {content}")
    convo_text = "\n".join(convo_lines)
    user_content = ""
    if current_profile:
        user_content += f"=== CURRENT PROFILE FOR {username} ===\n{current_profile}\n\n"
    else:
        user_content += f"=== CURRENT PROFILE FOR {username} ===\n(no profile yet)\n\n"
    user_content += f"=== CONVERSATION ===\n{convo_text}\n\n"
    user_content += f"Extract any noteworthy memories from this conversation with {username}."
    user_content = self._append_no_think(user_content)
    # Truncated request snapshot for the DB log.
    req_json = json.dumps([
        {"role": "system", "content": MEMORY_EXTRACTION_PROMPT[:500]},
        {"role": "user", "content": user_content[:500]},
    ], default=str)
    t0 = time.monotonic()
    async with self._semaphore:  # cap concurrent in-flight LLM requests
        try:
            temp_kwargs = {"temperature": 0.3} if self._supports_temperature else {}
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": MEMORY_EXTRACTION_PROMPT},
                    {"role": "user", "content": user_content},
                ],
                tools=[MEMORY_EXTRACTION_TOOL],
                tool_choice={"type": "function", "function": {"name": "extract_memories"}},
                **temp_kwargs,
                max_completion_tokens=1024,
            )
            elapsed = int((time.monotonic() - t0) * 1000)
            choice = response.choices[0]
            usage = response.usage
            if choice.message.tool_calls:
                tool_call = choice.message.tool_calls[0]
                resp_text = tool_call.function.arguments
                args = json.loads(resp_text)
                self._log_llm("memory_extraction", elapsed, True, req_json, resp_text,
                              input_tokens=usage.prompt_tokens if usage else None,
                              output_tokens=usage.completion_tokens if usage else None)
                return self._validate_memory_result(args)
            # No plain-content fallback here: extraction requires the tool call.
            logger.warning("No tool call in memory extraction response.")
            self._log_llm("memory_extraction", elapsed, False, req_json, error="No tool call")
            return None
        except Exception as e:
            # Broad catch at the network boundary: log and degrade to None.
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM memory extraction error: %s", e)
            self._log_llm("memory_extraction", elapsed, False, req_json, error=str(e))
            return None
@staticmethod
def _validate_memory_result(result: dict) -> dict:
"""Validate and normalize memory extraction result."""
valid_expirations = {"1d", "3d", "7d", "30d", "permanent"}
valid_importances = {"low", "medium", "high"}
memories = []
for mem in result.get("memories", []):
if not isinstance(mem, dict):
continue
memory_text = str(mem.get("memory", ""))[:500]
if not memory_text:
continue
topics = mem.get("topics", [])
if not isinstance(topics, list):
topics = []
topics = [str(t).lower() for t in topics]
expiration = str(mem.get("expiration", "7d"))
if expiration not in valid_expirations:
expiration = "7d"
importance = str(mem.get("importance", "medium"))
if importance not in valid_importances:
importance = "medium"
memories.append({
"memory": memory_text,
"topics": topics,
"expiration": expiration,
"importance": importance,
})
profile_update = result.get("profile_update")
if profile_update is not None:
profile_update = str(profile_update)[:500]
return {
"memories": memories,
"profile_update": profile_update,
}
async def analyze_image(
    self,
    image_bytes: bytes,
    system_prompt: str,
    user_text: str = "",
    on_first_token=None,
) -> str | None:
    """Send an image to the vision model with a system prompt.

    Returns the generated text response, or None on failure.
    The image is inlined as a base64 PNG data URL; the whole stream is
    bounded by a 120s timeout in addition to the client-level timeout.
    """
    b64 = base64.b64encode(image_bytes).decode()
    data_url = f"data:image/png;base64,{b64}"
    user_content: list[dict] = [
        {"type": "image_url", "image_url": {"url": data_url}},
    ]
    text_part = user_text or ""
    if self._no_think:
        text_part = (text_part + "\n/no_think").strip()
    if text_part:
        user_content.append({"type": "text", "text": text_part})
    # Log a short placeholder instead of the (potentially huge) base64 body.
    req_json = json.dumps([
        {"role": "system", "content": system_prompt[:500]},
        {"role": "user", "content": f"[image {len(image_bytes)} bytes] {user_text or ''}"},
    ], default=str)
    t0 = time.monotonic()
    async with self._semaphore:  # cap concurrent in-flight LLM requests
        try:
            async def _stream_image():
                # Inner coroutine so the entire stream (creation + consumption)
                # can be wrapped in a single asyncio.wait_for below.
                temp_kwargs = {"temperature": 0.8} if self._supports_temperature else {}
                stream = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": user_content},
                    ],
                    **temp_kwargs,
                    max_completion_tokens=2048,
                    stream=True,
                )
                chunks: list[str] = []
                notified = False
                async for chunk in stream:
                    delta = chunk.choices[0].delta if chunk.choices else None
                    if delta and delta.content:
                        # Notify exactly once, on the first real content token.
                        if not notified and on_first_token:
                            await on_first_token()
                            notified = True
                        chunks.append(delta.content)
                return "".join(chunks).strip()
            content = await asyncio.wait_for(_stream_image(), timeout=120)
            elapsed = int((time.monotonic() - t0) * 1000)
            self._log_llm("image", elapsed, bool(content), req_json, content or None)
            return content if content else None
        except asyncio.TimeoutError:
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM image analysis timed out after %ds", elapsed // 1000)
            self._log_llm("image", elapsed, False, req_json, error="Timeout")
            return None
        except Exception as e:
            # Broad catch at the network boundary: log and degrade to None.
            elapsed = int((time.monotonic() - t0) * 1000)
            logger.error("LLM image analysis error: %s", e)
            self._log_llm("image", elapsed, False, req_json, error=str(e))
            return None
async def raw_analyze(
    self, message: str, context: str = "", user_notes: str = "",
    channel_context: str = "",
) -> tuple[str, dict | None]:
    """Return the raw LLM response string AND parsed result for /bcs-test (single LLM call).

    Mirrors analyze_message but also surfaces the raw content/tool-call text
    for debugging; on error returns ("Error: ...", None) instead of raising.
    """
    user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n"
    if user_notes:
        user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
    if channel_context:
        user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
    user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}"
    user_content = self._append_no_think(user_content)
    # Request snapshot for the DB log (system prompt truncated to 500 chars).
    req_json = json.dumps([
        {"role": "system", "content": SYSTEM_PROMPT[:500]},
        {"role": "user", "content": user_content},
    ], default=str)
    t0 = time.monotonic()
    async with self._semaphore:  # cap concurrent in-flight LLM requests
        try:
            temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_content},
                ],
                tools=[ANALYSIS_TOOL],
                tool_choice={"type": "function", "function": {"name": "report_analysis"}},
                **temp_kwargs,
                max_completion_tokens=2048,
            )
            elapsed = int((time.monotonic() - t0) * 1000)
            choice = response.choices[0]
            usage = response.usage
            parts = []
            parsed = None
            if choice.message.content:
                parts.append(f"Content: {choice.message.content}")
            if choice.message.tool_calls:
                # Show every tool call in the raw dump, but only parse the first.
                for tc in choice.message.tool_calls:
                    parts.append(
                        f"Tool call: {tc.function.name}({tc.function.arguments})"
                    )
                # Parse the first tool call
                args = json.loads(choice.message.tool_calls[0].function.arguments)
                parsed = self._validate_result(args)
            elif choice.message.content:
                parsed = self._parse_content_fallback(choice.message.content)
            raw = "\n".join(parts) or "(empty response)"
            self._log_llm("raw_analyze", elapsed, parsed is not None, req_json, raw,
                          input_tokens=usage.prompt_tokens if usage else None,
                          output_tokens=usage.completion_tokens if usage else None)
            return raw, parsed
        except Exception as e:
            # Surface the error text to the caller instead of raising.
            elapsed = int((time.monotonic() - t0) * 1000)
            self._log_llm("raw_analyze", elapsed, False, req_json, error=str(e))
            return f"Error: {e}", None