"""Async LLM client for toxicity / topic analysis of Discord messages.

Wraps an OpenAI-compatible chat-completions endpoint (local or hosted) and
exposes tool-calling analysis helpers (single message and whole conversation),
plus plain streaming chat and image analysis.
"""

import asyncio
import base64
import json
import logging
import re
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

from openai import AsyncOpenAI

logger = logging.getLogger("bcs.llm")

_PROMPTS_DIR = Path(__file__).resolve().parent.parent / "prompts"
SYSTEM_PROMPT = (_PROMPTS_DIR / "analysis.txt").read_text(encoding="utf-8")

# Enum values shared by both tool schemas below (kept in one place so the two
# schemas cannot drift apart).
_TOXICITY_CATEGORIES = [
    "aggressive",
    "passive_aggressive",
    "instigating",
    "hostile",
    "manipulative",
    "sexual_vulgar",
    "none",
]
_TOPIC_CATEGORIES = [
    "gaming",
    "personal_drama",
    "relationship_issues",
    "real_life_venting",
    "gossip",
    "general_chat",
    "meta",
]
_COHERENCE_FLAGS = [
    "normal",
    "intoxicated",
    "tired",
    "angry_typing",
    "mobile_keyboard",
    "language_barrier",
]

# Tool schema for single-message analysis (forced via tool_choice).
ANALYSIS_TOOL = {
    "type": "function",
    "function": {
        "name": "report_analysis",
        "description": "Report the toxicity and topic analysis of a Discord message.",
        "parameters": {
            "type": "object",
            "properties": {
                "toxicity_score": {
                    "type": "number",
                    "description": "Toxicity rating from 0.0 (completely harmless) to 1.0 (extremely toxic).",
                },
                "categories": {
                    "type": "array",
                    "items": {"type": "string", "enum": _TOXICITY_CATEGORIES},
                    "description": "Detected toxicity behavior categories.",
                },
                "reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the toxicity analysis.",
                },
                "off_topic": {
                    "type": "boolean",
                    "description": "True if the message is off-topic personal drama rather than gaming-related conversation.",
                },
                "topic_category": {
                    "type": "string",
                    "enum": _TOPIC_CATEGORIES,
                    "description": "What topic category the message falls into.",
                },
                "topic_reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the topic classification.",
                },
                "coherence_score": {
                    "type": "number",
                    "description": "Coherence rating from 0.0 (incoherent gibberish) to 1.0 (clear and well-written). Normal texting shortcuts are fine.",
                },
                "coherence_flag": {
                    "type": "string",
                    "enum": _COHERENCE_FLAGS,
                    "description": "Best guess at why coherence is low, if applicable.",
                },
                "note_update": {
                    "type": ["string", "null"],
                    "description": "Brief new observation about this user's style/behavior for future reference, or null if nothing new.",
                },
                "detected_game": {
                    "type": ["string", "null"],
                    "description": "The game channel name this message is about (e.g. 'gta-online', 'warzone'), or null if not game-specific.",
                },
            },
            "required": [
                "toxicity_score",
                "categories",
                "reasoning",
                "off_topic",
                "topic_category",
                "topic_reasoning",
                "coherence_score",
                "coherence_flag",
            ],
        },
    },
}

# Tool schema for conversation-level scans: per-user findings plus a summary.
CONVERSATION_TOOL = {
    "type": "function",
    "function": {
        "name": "report_conversation_scan",
        "description": "Analyze a conversation block and report findings per user.",
        "parameters": {
            "type": "object",
            "properties": {
                "user_findings": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "username": {
                                "type": "string",
                                "description": "Discord display name of the user.",
                            },
                            "toxicity_score": {
                                "type": "number",
                                "description": "Weighted toxicity 0.0-1.0 across their messages in this conversation.",
                            },
                            "categories": {
                                "type": "array",
                                "items": {"type": "string", "enum": _TOXICITY_CATEGORIES},
                                "description": "Detected toxicity behavior categories.",
                            },
                            "reasoning": {
                                "type": "string",
                                "description": "Brief explanation of this user's behavior in the conversation.",
                            },
                            "worst_message": {
                                "type": ["string", "null"],
                                "description": "Most problematic snippet from this user (quoted, max 100 chars), or null if nothing notable.",
                            },
                            "off_topic": {
                                "type": "boolean",
                                "description": "True if this user's messages were primarily off-topic personal drama.",
                            },
                            "topic_category": {
                                "type": "string",
                                "enum": _TOPIC_CATEGORIES,
                                "description": "What topic category this user's messages fall into.",
                            },
                            "topic_reasoning": {
                                "type": "string",
                                "description": "Brief explanation of the topic classification for this user.",
                            },
                            "coherence_score": {
                                "type": "number",
                                "description": "Coherence rating 0.0-1.0 across this user's messages. Normal texting shortcuts are fine.",
                            },
                            "coherence_flag": {
                                "type": "string",
                                "enum": _COHERENCE_FLAGS,
                                "description": "Best guess at why coherence is low, if applicable.",
                            },
                            "note_update": {
                                "type": ["string", "null"],
                                "description": "New observation about this user's pattern, or null.",
                            },
                            "detected_game": {
                                "type": ["string", "null"],
                                "description": "The game channel name this user's messages are about, or null.",
                            },
                        },
                        "required": [
                            "username",
                            "toxicity_score",
                            "categories",
                            "reasoning",
                            "off_topic",
                            "topic_category",
                            "topic_reasoning",
                            "coherence_score",
                            "coherence_flag",
                        ],
                    },
                    "description": "Findings for each user who participated in the conversation.",
                },
                "conversation_summary": {
                    "type": "string",
                    "description": "One-sentence summary of the overall conversation tone and any escalation patterns.",
                },
            },
            "required": ["user_findings", "conversation_summary"],
        },
    },
}

# Models whose API rejects the `temperature` parameter entirely.
_NO_TEMPERATURE_MODELS = {"gpt-5-nano", "o1", "o1-mini", "o1-preview", "o3", "o3-mini", "o4-mini"}

# Pre-compiled fallback parsers for models that answer in prose instead of a
# tool call (hoisted out of the methods so they compile once).
_CODE_BLOCK_RE = re.compile(r"```(?:json)?\s*(\{.*?\})\s*```", re.DOTALL)
_SCORE_RE = re.compile(r'"toxicity_score"\s*:\s*([\d.]+)')


class LLMClient:
    """Async client for an OpenAI-compatible endpoint with tool-calling analysis.

    All requests share a semaphore to cap concurrency, and every call is
    logged (fire-and-forget) to the optional ``db`` backend via
    ``save_llm_log``.
    """

    def __init__(self, base_url: str, model: str, api_key: str = "not-needed", db=None, no_think: bool = False, concurrency: int = 4):
        """Create the client.

        Args:
            base_url: Endpoint base URL; "" means the default OpenAI host.
            model: Model identifier to send with every request.
            api_key: API key; defaults to a placeholder for local servers.
            db: Optional logging backend exposing async ``save_llm_log``.
            no_think: If True, append "/no_think" to analysis prompts.
            concurrency: Max in-flight requests.
        """
        self.model = model
        self.host = base_url.rstrip("/")
        self._db = db
        self._no_think = no_think
        self._supports_temperature = model not in _NO_TEMPERATURE_MODELS
        timeout = 600.0 if self.host else 120.0  # local models need longer for VRAM load
        client_kwargs = {"api_key": api_key, "timeout": timeout}
        if self.host:
            client_kwargs["base_url"] = f"{self.host}/v1"
        self._client = AsyncOpenAI(**client_kwargs)
        self._semaphore = asyncio.Semaphore(concurrency)
        # Strong references to fire-and-forget log tasks: the event loop only
        # keeps weak references to tasks, so without this a pending log write
        # could be garbage-collected mid-flight and silently dropped.
        self._log_tasks: set[asyncio.Task] = set()

    def _log_llm(self, request_type: str, duration_ms: int, success: bool, request: str, response: str | None = None, error: str | None = None, input_tokens: int | None = None, output_tokens: int | None = None):
        """Fire-and-forget LLM log entry to the database."""
        if not self._db:
            return
        task = asyncio.create_task(self._db.save_llm_log(
            request_type=request_type,
            model=self.model,
            duration_ms=duration_ms,
            success=success,
            request=request,
            response=response,
            error=error,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
        ))
        # Keep the task alive until it finishes, then drop the reference.
        self._log_tasks.add(task)
        task.add_done_callback(self._log_tasks.discard)

    def _append_no_think(self, text: str) -> str:
        """Append the "/no_think" directive when configured to do so."""
        return (text + "\n/no_think") if self._no_think else text

    async def close(self):
        """Close the underlying HTTP client."""
        await self._client.close()

    async def analyze_message(
        self,
        message: str,
        context: str = "",
        user_notes: str = "",
        channel_context: str = "",
        mention_context: str = "",
    ) -> dict | None:
        """Analyze a single message via forced tool calling.

        Returns a validated analysis dict (see ``_validate_result``) or None
        on failure / unparseable output.
        """
        user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n"
        if user_notes:
            user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
        if channel_context:
            user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
        if mention_context:
            user_content += f"=== USER REPORT (a user flagged this conversation — focus on this concern) ===\n{mention_context}\n\n"
        user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}"
        user_content = self._append_no_think(user_content)
        req_json = json.dumps([
            {"role": "system", "content": SYSTEM_PROMPT[:500]},
            {"role": "user", "content": user_content},
        ], default=str)
        t0 = time.monotonic()
        async with self._semaphore:
            try:
                temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
                response = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {"role": "user", "content": user_content},
                    ],
                    tools=[ANALYSIS_TOOL],
                    tool_choice={"type": "function", "function": {"name": "report_analysis"}},
                    **temp_kwargs,
                    max_completion_tokens=2048,
                )
                elapsed = int((time.monotonic() - t0) * 1000)
                choice = response.choices[0]
                usage = response.usage
                # Preferred path: structured arguments from the forced tool call.
                if choice.message.tool_calls:
                    tool_call = choice.message.tool_calls[0]
                    resp_text = tool_call.function.arguments
                    args = json.loads(resp_text)
                    self._log_llm("analysis", elapsed, True, req_json, resp_text,
                                  input_tokens=usage.prompt_tokens if usage else None,
                                  output_tokens=usage.completion_tokens if usage else None)
                    return self._validate_result(args)
                # Fallback: try parsing the message content as JSON
                if choice.message.content:
                    self._log_llm("analysis", elapsed, True, req_json, choice.message.content,
                                  input_tokens=usage.prompt_tokens if usage else None,
                                  output_tokens=usage.completion_tokens if usage else None)
                    return self._parse_content_fallback(choice.message.content)
                logger.warning("No tool call or content in LLM response.")
                self._log_llm("analysis", elapsed, False, req_json, error="Empty response")
                return None
            except Exception as e:
                elapsed = int((time.monotonic() - t0) * 1000)
                logger.error("LLM analysis error: %s", e)
                self._log_llm("analysis", elapsed, False, req_json, error=str(e))
                return None

    @staticmethod
    def _coerce_float(value, default: float) -> float:
        """Best-effort float conversion for untrusted LLM output."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def _validate_result(self, result: dict) -> dict:
        """Clamp and normalize a single-message analysis dict in place.

        Guarantees every consumer-facing key exists with a sane type; scores
        are clamped to [0.0, 1.0]. Non-numeric scores fall back to defaults
        rather than discarding the whole analysis.
        """
        score = self._coerce_float(result.get("toxicity_score", 0.0), 0.0)
        result["toxicity_score"] = min(max(score, 0.0), 1.0)
        if not isinstance(result.get("categories"), list):
            result["categories"] = ["none"]
        if not isinstance(result.get("reasoning"), str):
            result["reasoning"] = ""
        result["off_topic"] = bool(result.get("off_topic", False))
        result.setdefault("topic_category", "general_chat")
        result.setdefault("topic_reasoning", "")
        coherence = self._coerce_float(result.get("coherence_score", 0.85), 0.85)
        result["coherence_score"] = min(max(coherence, 0.0), 1.0)
        result.setdefault("coherence_flag", "normal")
        result.setdefault("note_update", None)
        result.setdefault("detected_game", None)
        return result

    def _parse_content_fallback(self, text: str) -> dict | None:
        """Try to parse plain-text content as JSON if tool calling didn't work."""
        # Try direct JSON
        try:
            result = json.loads(text.strip())
            return self._validate_result(result)
        except (json.JSONDecodeError, ValueError):
            pass
        # Try extracting from code block
        match = _CODE_BLOCK_RE.search(text)
        if match:
            try:
                result = json.loads(match.group(1))
                return self._validate_result(result)
            except (json.JSONDecodeError, ValueError):
                pass
        # Regex fallback for toxicity_score
        score_match = _SCORE_RE.search(text)
        if score_match:
            return {
                "toxicity_score": min(max(self._coerce_float(score_match.group(1), 0.0), 0.0), 1.0),
                "categories": ["unknown"],
                "reasoning": "Parsed via fallback regex",
            }
        logger.warning("Could not parse LLM content fallback: %s", text[:200])
        return None

    # -- Conversation-level analysis (mention scan) --

    @staticmethod
    def _format_relative_time(delta: timedelta) -> str:
        """Render a timedelta as a compact "~Ns/Nm/Nh ago" string."""
        total_seconds = int(delta.total_seconds())
        if total_seconds < 60:
            return f"~{total_seconds}s ago"
        minutes = total_seconds // 60
        if minutes < 60:
            return f"~{minutes}m ago"
        hours = minutes // 60
        return f"~{hours}h ago"

    @staticmethod
    def _format_conversation_block(
        messages: list[tuple[str, str, datetime, str | None]],
        now: datetime | None = None,
        new_message_start: int | None = None,
    ) -> str:
        """Format messages as a compact timestamped chat block.

        Each tuple is (username, content, timestamp, reply_to_username).
        Consecutive messages from the same user collapse to indented lines.
        Replies shown as ``username → replied_to:``. If *new_message_start*
        is given, a separator is inserted before that index so the LLM can
        distinguish context from new messages.
        """
        if now is None:
            now = datetime.now(timezone.utc)
        lines = [f"[Current time: {now.strftime('%I:%M %p')}]", ""]
        last_user = None
        in_new_section = new_message_start is None or new_message_start == 0
        for idx, (username, content, ts, reply_to) in enumerate(messages):
            if new_message_start is not None and idx == new_message_start:
                lines.append("")
                lines.append("--- NEW MESSAGES (score only these) ---")
                lines.append("")
                last_user = None  # reset collapse so first new msg gets full header
                in_new_section = True
            # Naive timestamps are assumed to be UTC.
            delta = now - ts.replace(tzinfo=timezone.utc) if ts.tzinfo is None else now - ts
            rel = LLMClient._format_relative_time(delta)
            tag = "" if in_new_section else " [CONTEXT]"
            if username == last_user:
                # Continuation from same user — indent
                for line in content.split("\n"):
                    lines.append(f"    {line}")
            else:
                # New user block
                if reply_to:
                    prefix = f"[{rel}] {username} → {reply_to}:{tag} "
                else:
                    prefix = f"[{rel}] {username}:{tag} "
                msg_lines = content.split("\n")
                lines.append(prefix + msg_lines[0])
                for line in msg_lines[1:]:
                    lines.append(f"    {line}")
                last_user = username
        return "\n".join(lines)

    async def analyze_conversation(
        self,
        messages: list[tuple[str, str, datetime, str | None]],
        mention_context: str = "",
        channel_context: str = "",
        user_notes_map: dict[str, str] | None = None,
        new_message_start: int | None = None,
    ) -> dict | None:
        """Analyze a conversation block in one call, returning per-user findings."""
        if not messages:
            return None
        convo_block = self._format_conversation_block(messages, new_message_start=new_message_start)
        user_content = f"=== CONVERSATION BLOCK ===\n{convo_block}\n\n"
        if user_notes_map:
            notes_lines = [f"  {u}: {n}" for u, n in user_notes_map.items() if n]
            if notes_lines:
                user_content += "=== USER NOTES (from prior analysis) ===\n" + "\n".join(notes_lines) + "\n\n"
        if channel_context:
            user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
        if mention_context:
            user_content += f"=== USER REPORT (a user flagged this conversation — focus on this concern) ===\n{mention_context}\n\n"
        user_content += "Analyze the conversation block above and report findings for each user."
        user_content = self._append_no_think(user_content)
        req_json = json.dumps([
            {"role": "system", "content": SYSTEM_PROMPT[:500]},
            {"role": "user", "content": user_content[:500]},
        ], default=str)
        t0 = time.monotonic()
        async with self._semaphore:
            try:
                temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
                response = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {"role": "user", "content": user_content},
                    ],
                    tools=[CONVERSATION_TOOL],
                    tool_choice={"type": "function", "function": {"name": "report_conversation_scan"}},
                    **temp_kwargs,
                    max_completion_tokens=4096,
                )
                elapsed = int((time.monotonic() - t0) * 1000)
                choice = response.choices[0]
                usage = response.usage
                if choice.message.tool_calls:
                    tool_call = choice.message.tool_calls[0]
                    resp_text = tool_call.function.arguments
                    args = json.loads(resp_text)
                    self._log_llm("conversation", elapsed, True, req_json, resp_text,
                                  input_tokens=usage.prompt_tokens if usage else None,
                                  output_tokens=usage.completion_tokens if usage else None)
                    return self._validate_conversation_result(args)
                # Fallback: try parsing the message content as JSON
                if choice.message.content:
                    logger.warning("No tool call in conversation analysis — trying content fallback. Content: %s", choice.message.content[:300])
                    fallback = self._parse_conversation_content_fallback(choice.message.content)
                    if fallback:
                        self._log_llm("conversation", elapsed, True, req_json, choice.message.content,
                                      input_tokens=usage.prompt_tokens if usage else None,
                                      output_tokens=usage.completion_tokens if usage else None)
                        return fallback
                logger.warning("No tool call or parseable content in conversation analysis response.")
                self._log_llm("conversation", elapsed, False, req_json, error="Empty response")
                return None
            except Exception as e:
                elapsed = int((time.monotonic() - t0) * 1000)
                logger.error("LLM conversation analysis error: %s", e)
                self._log_llm("conversation", elapsed, False, req_json, error=str(e))
                return None

    @staticmethod
    def _validate_conversation_result(result: dict) -> dict:
        """Validate and normalize conversation analysis result."""
        if not isinstance(result, dict):
            return {"user_findings": [], "conversation_summary": ""}
        findings = [f for f in result.get("user_findings", []) if isinstance(f, dict)]
        for finding in findings:
            finding.setdefault("username", "unknown")
            score = LLMClient._coerce_float(finding.get("toxicity_score", 0.0), 0.0)
            finding["toxicity_score"] = min(max(score, 0.0), 1.0)
            if not isinstance(finding.get("categories"), list):
                finding["categories"] = ["none"]
            finding.setdefault("reasoning", "")
            finding.setdefault("worst_message", None)
            finding["off_topic"] = bool(finding.get("off_topic", False))
            finding.setdefault("topic_category", "general_chat")
            finding.setdefault("topic_reasoning", "")
            coherence = LLMClient._coerce_float(finding.get("coherence_score", 0.85), 0.85)
            finding["coherence_score"] = min(max(coherence, 0.0), 1.0)
            finding.setdefault("coherence_flag", "normal")
            finding.setdefault("note_update", None)
            finding.setdefault("detected_game", None)
        result["user_findings"] = findings
        result.setdefault("conversation_summary", "")
        return result

    def _parse_conversation_content_fallback(self, text: str) -> dict | None:
        """Try to parse plain-text content as JSON for conversation analysis."""
        # Try direct JSON parse
        try:
            result = json.loads(text.strip())
            if "user_findings" in result:
                return self._validate_conversation_result(result)
        except (json.JSONDecodeError, ValueError):
            pass
        # Try extracting from code block
        match = _CODE_BLOCK_RE.search(text)
        if match:
            try:
                result = json.loads(match.group(1))
                if "user_findings" in result:
                    return self._validate_conversation_result(result)
            except (json.JSONDecodeError, ValueError):
                pass
        logger.warning("Could not parse conversation content fallback: %s", text[:200])
        return None

    async def chat(
        self,
        messages: list[dict[str, str]],
        system_prompt: str,
        on_first_token=None,
        recent_bot_replies: list[str] | None = None,
    ) -> str | None:
        """Send a conversational chat request (no tools).

        If *on_first_token* is an async callable it will be awaited once the
        first content token arrives (useful for triggering the typing
        indicator only after the model starts generating).
        """
        patched = list(messages)
        # Append recent bot replies to the system prompt so the model avoids
        # repeating the same phrases / joke structures.
        effective_prompt = system_prompt
        if recent_bot_replies:
            avoid_block = "\n".join(f"- {r}" for r in recent_bot_replies)
            effective_prompt += (
                "\n\nIMPORTANT — you recently said the following. "
                "Do NOT reuse any of these phrases, sentence structures, or joke patterns. "
                "Come up with something completely different.\n" + avoid_block
            )
        req_json = json.dumps([
            {"role": "system", "content": effective_prompt[:500]},
            *[{"role": m["role"], "content": str(m.get("content", ""))[:200]} for m in patched],
        ], default=str)
        t0 = time.monotonic()
        async with self._semaphore:
            try:
                temp_kwargs = {"temperature": 0.9, "frequency_penalty": 0.8, "presence_penalty": 0.6} if self._supports_temperature else {}
                stream = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": effective_prompt},
                        *patched,
                    ],
                    **temp_kwargs,
                    max_completion_tokens=2048,
                    stream=True,
                )
                chunks: list[str] = []
                notified = False
                async for chunk in stream:
                    delta = chunk.choices[0].delta if chunk.choices else None
                    if delta and delta.content:
                        if not notified and on_first_token:
                            await on_first_token()
                            notified = True
                        chunks.append(delta.content)
                content = "".join(chunks).strip()
                elapsed = int((time.monotonic() - t0) * 1000)
                self._log_llm("chat", elapsed, bool(content), req_json, content or None)
                return content if content else None
            except Exception as e:
                elapsed = int((time.monotonic() - t0) * 1000)
                logger.error("LLM chat error: %s", e)
                self._log_llm("chat", elapsed, False, req_json, error=str(e))
                return None

    async def analyze_image(
        self,
        image_bytes: bytes,
        system_prompt: str,
        user_text: str = "",
        on_first_token=None,
    ) -> str | None:
        """Send an image to the vision model with a system prompt.

        Returns the generated text response, or None on failure.
        """
        b64 = base64.b64encode(image_bytes).decode()
        data_url = f"data:image/png;base64,{b64}"
        user_content: list[dict] = [
            {"type": "image_url", "image_url": {"url": data_url}},
        ]
        text_part = user_text or ""
        if self._no_think:
            text_part = (text_part + "\n/no_think").strip()
        if text_part:
            user_content.append({"type": "text", "text": text_part})
        # Log a placeholder instead of the base64 payload.
        req_json = json.dumps([
            {"role": "system", "content": system_prompt[:500]},
            {"role": "user", "content": f"[image {len(image_bytes)} bytes] {user_text or ''}"},
        ], default=str)
        t0 = time.monotonic()
        async with self._semaphore:
            try:
                async def _stream_image():
                    # Stream so on_first_token can fire as soon as output starts.
                    temp_kwargs = {"temperature": 0.8} if self._supports_temperature else {}
                    stream = await self._client.chat.completions.create(
                        model=self.model,
                        messages=[
                            {"role": "system", "content": system_prompt},
                            {"role": "user", "content": user_content},
                        ],
                        **temp_kwargs,
                        max_completion_tokens=2048,
                        stream=True,
                    )
                    chunks: list[str] = []
                    notified = False
                    async for chunk in stream:
                        delta = chunk.choices[0].delta if chunk.choices else None
                        if delta and delta.content:
                            if not notified and on_first_token:
                                await on_first_token()
                                notified = True
                            chunks.append(delta.content)
                    return "".join(chunks).strip()

                content = await asyncio.wait_for(_stream_image(), timeout=120)
                elapsed = int((time.monotonic() - t0) * 1000)
                self._log_llm("image", elapsed, bool(content), req_json, content or None)
                return content if content else None
            except asyncio.TimeoutError:
                elapsed = int((time.monotonic() - t0) * 1000)
                logger.error("LLM image analysis timed out after %ds", elapsed // 1000)
                self._log_llm("image", elapsed, False, req_json, error="Timeout")
                return None
            except Exception as e:
                elapsed = int((time.monotonic() - t0) * 1000)
                logger.error("LLM image analysis error: %s", e)
                self._log_llm("image", elapsed, False, req_json, error=str(e))
                return None

    async def raw_analyze(
        self,
        message: str,
        context: str = "",
        user_notes: str = "",
        channel_context: str = "",
    ) -> tuple[str, dict | None]:
        """Return the raw LLM response string AND parsed result for /bcs-test (single LLM call)."""
        user_content = f"=== RECENT CHANNEL MESSAGES (for background context only) ===\n{context}\n\n"
        if user_notes:
            user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
        if channel_context:
            user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
        user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}"
        user_content = self._append_no_think(user_content)
        req_json = json.dumps([
            {"role": "system", "content": SYSTEM_PROMPT[:500]},
            {"role": "user", "content": user_content},
        ], default=str)
        t0 = time.monotonic()
        async with self._semaphore:
            try:
                temp_kwargs = {"temperature": 0.1} if self._supports_temperature else {}
                response = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {"role": "user", "content": user_content},
                    ],
                    tools=[ANALYSIS_TOOL],
                    tool_choice={"type": "function", "function": {"name": "report_analysis"}},
                    **temp_kwargs,
                    max_completion_tokens=2048,
                )
                elapsed = int((time.monotonic() - t0) * 1000)
                choice = response.choices[0]
                usage = response.usage
                parts = []
                parsed = None
                if choice.message.content:
                    parts.append(f"Content: {choice.message.content}")
                if choice.message.tool_calls:
                    for tc in choice.message.tool_calls:
                        parts.append(
                            f"Tool call: {tc.function.name}({tc.function.arguments})"
                        )
                    # Parse the first tool call
                    args = json.loads(choice.message.tool_calls[0].function.arguments)
                    parsed = self._validate_result(args)
                elif choice.message.content:
                    parsed = self._parse_content_fallback(choice.message.content)
                raw = "\n".join(parts) or "(empty response)"
                self._log_llm("raw_analyze", elapsed, parsed is not None, req_json, raw,
                              input_tokens=usage.prompt_tokens if usage else None,
                              output_tokens=usage.completion_tokens if usage else None)
                return raw, parsed
            except Exception as e:
                elapsed = int((time.monotonic() - t0) * 1000)
                # Log locally too, for parity with the other request methods.
                logger.error("LLM raw analyze error: %s", e)
                self._log_llm("raw_analyze", elapsed, False, req_json, error=str(e))
                return f"Error: {e}", None