"""Async client for a local OpenAI-compatible LLM endpoint.

Provides forced-tool-call toxicity/topic analysis of Discord messages,
a plain conversational chat helper, and a raw-debug variant of the
analysis call (used by /bcs-test).
"""

import asyncio
import json
import logging
import re
from pathlib import Path

from openai import AsyncOpenAI

logger = logging.getLogger("bcs.llm")

_PROMPTS_DIR = Path(__file__).resolve().parent.parent / "prompts"
# System prompt is loaded once at import time; a missing prompts/analysis.txt
# fails fast here rather than on the first request.
SYSTEM_PROMPT = (_PROMPTS_DIR / "analysis.txt").read_text(encoding="utf-8")

# JSON-schema tool the model is forced to call when analyzing a message.
ANALYSIS_TOOL = {
    "type": "function",
    "function": {
        "name": "report_analysis",
        "description": "Report the toxicity and topic analysis of a Discord message.",
        "parameters": {
            "type": "object",
            "properties": {
                "toxicity_score": {
                    "type": "number",
                    "description": "Toxicity rating from 0.0 (completely harmless) to 1.0 (extremely toxic).",
                },
                "categories": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": [
                            "aggressive",
                            "passive_aggressive",
                            "instigating",
                            "hostile",
                            "manipulative",
                            "none",
                        ],
                    },
                    "description": "Detected toxicity behavior categories.",
                },
                "reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the toxicity analysis.",
                },
                "off_topic": {
                    "type": "boolean",
                    "description": "True if the message is off-topic personal drama rather than gaming-related conversation.",
                },
                "topic_category": {
                    "type": "string",
                    "enum": [
                        "gaming",
                        "personal_drama",
                        "relationship_issues",
                        "real_life_venting",
                        "gossip",
                        "general_chat",
                        "meta",
                    ],
                    "description": "What topic category the message falls into.",
                },
                "topic_reasoning": {
                    "type": "string",
                    "description": "Brief explanation of the topic classification.",
                },
                "coherence_score": {
                    "type": "number",
                    "description": "Coherence rating from 0.0 (incoherent gibberish) to 1.0 (clear and well-written). Normal texting shortcuts are fine.",
                },
                "coherence_flag": {
                    "type": "string",
                    "enum": [
                        "normal",
                        "intoxicated",
                        "tired",
                        "angry_typing",
                        "mobile_keyboard",
                        "language_barrier",
                    ],
                    "description": "Best guess at why coherence is low, if applicable.",
                },
                "note_update": {
                    "type": ["string", "null"],
                    "description": "Brief new observation about this user's style/behavior for future reference, or null if nothing new.",
                },
            },
            "required": [
                "toxicity_score",
                "categories",
                "reasoning",
                "off_topic",
                "topic_category",
                "topic_reasoning",
                "coherence_score",
                "coherence_flag",
            ],
        },
    },
}

# Forces the model to answer via the analysis tool rather than free text.
# Shared by analyze_message() and raw_analyze().
_ANALYSIS_TOOL_CHOICE = {"type": "function", "function": {"name": "report_analysis"}}


def _clamped_float(value, default: float) -> float:
    """Coerce *value* to a float clamped to [0.0, 1.0].

    The model may emit JSON ``null`` or a non-numeric string for a score
    field; ``dict.get(key, default)`` returns ``None`` when the key is
    present-but-null, so a bare ``float()`` would raise and sink the whole
    analysis. Fall back to *default* instead.
    """
    try:
        score = float(value)
    except (TypeError, ValueError):
        score = default
    return min(max(score, 0.0), 1.0)


class LLMClient:
    """Thin async wrapper around an OpenAI-compatible ``/v1`` endpoint.

    All requests are serialized through a single-permit semaphore so a
    local single-model server is never hit concurrently.
    """

    def __init__(self, base_url: str, model: str, api_key: str = "not-needed"):
        self.model = model
        self.host = base_url.rstrip("/")
        self._client = AsyncOpenAI(
            base_url=f"{self.host}/v1",
            api_key=api_key,
            timeout=600.0,  # 10 min — first request loads model into VRAM
        )
        self._semaphore = asyncio.Semaphore(1)  # serialize requests to avoid overloading

    async def close(self):
        """Release the underlying HTTP client."""
        await self._client.close()

    @staticmethod
    def _build_user_content(message: str, context: str, user_notes: str) -> str:
        """Assemble the sectioned user prompt shared by the analysis calls."""
        user_content = f"=== CONTEXT (other users' recent messages, for background only) ===\n{context}\n\n"
        if user_notes:
            user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
        user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}"
        return user_content

    async def analyze_message(
        self, message: str, context: str = "", user_notes: str = ""
    ) -> dict | None:
        """Analyze one message for toxicity/topic via a forced tool call.

        Returns the validated result dict, or ``None`` on any failure
        (transport error, unparseable response, empty response).
        """
        user_content = self._build_user_content(message, context, user_notes)
        async with self._semaphore:
            try:
                response = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {"role": "user", "content": user_content},
                    ],
                    tools=[ANALYSIS_TOOL],
                    tool_choice=_ANALYSIS_TOOL_CHOICE,
                    temperature=0.1,
                )
                choice = response.choices[0]
                # Preferred path: structured tool-call arguments.
                if choice.message.tool_calls:
                    tool_call = choice.message.tool_calls[0]
                    args = json.loads(tool_call.function.arguments)
                    return self._validate_result(args)
                # Fallback: some backends ignore tool_choice and answer in
                # plain content — try to parse that as JSON.
                if choice.message.content:
                    return self._parse_content_fallback(choice.message.content)
                logger.warning("No tool call or content in LLM response.")
                return None
            except Exception as e:
                logger.error("LLM analysis error: %s", e)
                return None

    def _validate_result(self, result: dict) -> dict:
        """Normalize a parsed tool-call payload in place and return it.

        Scores are clamped to [0.0, 1.0], wrong-typed fields replaced with
        safe defaults, and optional fields back-filled so downstream code
        can rely on every key being present.
        """
        result["toxicity_score"] = _clamped_float(result.get("toxicity_score"), 0.0)
        if not isinstance(result.get("categories"), list):
            result["categories"] = ["none"]
        if not isinstance(result.get("reasoning"), str):
            result["reasoning"] = ""
        result["off_topic"] = bool(result.get("off_topic", False))
        result.setdefault("topic_category", "general_chat")
        result.setdefault("topic_reasoning", "")
        result["coherence_score"] = _clamped_float(result.get("coherence_score"), 0.85)
        result.setdefault("coherence_flag", "normal")
        result.setdefault("note_update", None)
        return result

    def _parse_content_fallback(self, text: str) -> dict | None:
        """Try to parse plain-text content as JSON if tool calling didn't work."""
        # Try direct JSON
        try:
            result = json.loads(text.strip())
            return self._validate_result(result)
        except (json.JSONDecodeError, ValueError):
            pass
        # Try extracting from code block
        match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
        if match:
            try:
                result = json.loads(match.group(1))
                return self._validate_result(result)
            except (json.JSONDecodeError, ValueError):
                pass
        # Regex fallback for toxicity_score
        score_match = re.search(r'"toxicity_score"\s*:\s*([\d.]+)', text)
        if score_match:
            return {
                "toxicity_score": min(max(float(score_match.group(1)), 0.0), 1.0),
                "categories": ["unknown"],
                "reasoning": "Parsed via fallback regex",
            }
        logger.warning("Could not parse LLM content fallback: %s", text[:200])
        return None

    async def chat(
        self,
        messages: list[dict[str, str]],
        system_prompt: str,
        on_first_token=None,
    ) -> str | None:
        """Send a conversational chat request (no tools).

        If *on_first_token* is an async callable it will be awaited once the
        first content token arrives (useful for triggering the typing
        indicator only after the model starts generating).

        Returns the stripped response text, or ``None`` when the model
        produced nothing or the request failed.
        """
        async with self._semaphore:
            try:
                stream = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": system_prompt},
                        *messages,
                    ],
                    temperature=0.8,
                    max_tokens=300,
                    stream=True,
                )
                chunks: list[str] = []
                notified = False
                async for chunk in stream:
                    delta = chunk.choices[0].delta if chunk.choices else None
                    if delta and delta.content:
                        if not notified and on_first_token:
                            await on_first_token()
                            notified = True
                        chunks.append(delta.content)
                content = "".join(chunks).strip()
                return content if content else None
            except Exception as e:
                logger.error("LLM chat error: %s", e)
                return None

    async def raw_analyze(
        self, message: str, context: str = "", user_notes: str = ""
    ) -> tuple[str, dict | None]:
        """Return the raw LLM response string AND parsed result for /bcs-test (single LLM call)."""
        user_content = self._build_user_content(message, context, user_notes)
        async with self._semaphore:
            try:
                response = await self._client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": SYSTEM_PROMPT},
                        {"role": "user", "content": user_content},
                    ],
                    tools=[ANALYSIS_TOOL],
                    tool_choice=_ANALYSIS_TOOL_CHOICE,
                    temperature=0.1,
                )
                choice = response.choices[0]
                parts = []
                parsed = None
                if choice.message.content:
                    parts.append(f"Content: {choice.message.content}")
                if choice.message.tool_calls:
                    for tc in choice.message.tool_calls:
                        parts.append(
                            f"Tool call: {tc.function.name}({tc.function.arguments})"
                        )
                    # Parse the first tool call
                    args = json.loads(choice.message.tool_calls[0].function.arguments)
                    parsed = self._validate_result(args)
                elif choice.message.content:
                    parsed = self._parse_content_fallback(choice.message.content)
                raw = "\n".join(parts) or "(empty response)"
                return raw, parsed
            except Exception as e:
                return f"Error: {e}", None