Add LLM request/response logging to database

Log every LLM call (analysis, chat, image, raw_analyze) to a new
LlmLog table with request type, model, token counts, duration,
success/failure, and truncated request/response payloads. Enables
debugging prompt issues and tracking usage.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-22 22:55:19 -05:00
parent fd798ce027
commit b04d3da2bf
3 changed files with 159 additions and 16 deletions

View File

@@ -126,6 +126,23 @@ class Database:
ALTER TABLE UserState ADD UserNotes NVARCHAR(MAX) NULL
""")
cursor.execute("""
IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'LlmLog')
CREATE TABLE LlmLog (
Id BIGINT IDENTITY(1,1) PRIMARY KEY,
RequestType NVARCHAR(50) NOT NULL,
Model NVARCHAR(100) NOT NULL,
InputTokens INT NULL,
OutputTokens INT NULL,
DurationMs INT NOT NULL,
Success BIT NOT NULL,
Request NVARCHAR(MAX) NOT NULL,
Response NVARCHAR(MAX) NULL,
Error NVARCHAR(MAX) NULL,
CreatedAt DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
)
""")
cursor.close()
def _parse_database_name(self) -> str:
@@ -348,6 +365,55 @@ class Database:
finally:
conn.close()
# ------------------------------------------------------------------
# LLM Log (fire-and-forget via asyncio.create_task)
# ------------------------------------------------------------------
async def save_llm_log(
self,
request_type: str,
model: str,
duration_ms: int,
success: bool,
request: str,
response: str | None = None,
error: str | None = None,
input_tokens: int | None = None,
output_tokens: int | None = None,
) -> None:
"""Save an LLM request/response log entry."""
if not self._available:
return
try:
await asyncio.to_thread(
self._save_llm_log_sync,
request_type, model, duration_ms, success, request,
response, error, input_tokens, output_tokens,
)
except Exception:
logger.exception("Failed to save LLM log")
def _save_llm_log_sync(
self, request_type, model, duration_ms, success, request,
response, error, input_tokens, output_tokens,
):
conn = self._connect()
try:
cursor = conn.cursor()
cursor.execute(
"""INSERT INTO LlmLog
(RequestType, Model, InputTokens, OutputTokens, DurationMs,
Success, Request, Response, Error)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
request_type, model, input_tokens, output_tokens, duration_ms,
1 if success else 0,
request[:4000] if request else "",
response[:4000] if response else None,
error[:4000] if error else None,
)
cursor.close()
finally:
conn.close()
async def close(self):
"""No persistent connection to close (connections are per-operation)."""
pass

View File

@@ -2,6 +2,7 @@ import asyncio
import base64
import json
import logging
import time
from pathlib import Path
from openai import AsyncOpenAI
@@ -97,9 +98,10 @@ ANALYSIS_TOOL = {
class LLMClient:
def __init__(self, base_url: str, model: str, api_key: str = "not-needed"):
def __init__(self, base_url: str, model: str, api_key: str = "not-needed", db=None):
self.model = model
self.host = base_url.rstrip("/")
self._db = db
self._client = AsyncOpenAI(
base_url=f"{self.host}/v1",
api_key=api_key,
@@ -107,6 +109,24 @@ class LLMClient:
)
self._semaphore = asyncio.Semaphore(1) # serialize requests to avoid overloading
def _log_llm(self, request_type: str, duration_ms: int, success: bool,
request: str, response: str | None = None, error: str | None = None,
input_tokens: int | None = None, output_tokens: int | None = None):
"""Fire-and-forget LLM log entry to the database."""
if not self._db:
return
asyncio.create_task(self._db.save_llm_log(
request_type=request_type,
model=self.model,
duration_ms=duration_ms,
success=success,
request=request,
response=response,
error=error,
input_tokens=input_tokens,
output_tokens=output_tokens,
))
async def close(self):
await self._client.close()
@@ -119,7 +139,13 @@ class LLMClient:
user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
if channel_context:
user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}"
user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}\n/no_think"
req_json = json.dumps([
{"role": "system", "content": SYSTEM_PROMPT[:500]},
{"role": "user", "content": user_content},
], default=str)
t0 = time.monotonic()
async with self._semaphore:
try:
@@ -132,26 +158,38 @@ class LLMClient:
tools=[ANALYSIS_TOOL],
tool_choice={"type": "function", "function": {"name": "report_analysis"}},
temperature=0.1,
max_tokens=1024,
max_tokens=2048,
)
elapsed = int((time.monotonic() - t0) * 1000)
choice = response.choices[0]
usage = response.usage
# Extract tool call arguments
if choice.message.tool_calls:
tool_call = choice.message.tool_calls[0]
args = json.loads(tool_call.function.arguments)
resp_text = tool_call.function.arguments
args = json.loads(resp_text)
self._log_llm("analysis", elapsed, True, req_json, resp_text,
input_tokens=usage.prompt_tokens if usage else None,
output_tokens=usage.completion_tokens if usage else None)
return self._validate_result(args)
# Fallback: try parsing the message content as JSON
if choice.message.content:
self._log_llm("analysis", elapsed, True, req_json, choice.message.content,
input_tokens=usage.prompt_tokens if usage else None,
output_tokens=usage.completion_tokens if usage else None)
return self._parse_content_fallback(choice.message.content)
logger.warning("No tool call or content in LLM response.")
self._log_llm("analysis", elapsed, False, req_json, error="Empty response")
return None
except Exception as e:
elapsed = int((time.monotonic() - t0) * 1000)
logger.error("LLM analysis error: %s", e)
self._log_llm("analysis", elapsed, False, req_json, error=str(e))
return None
def _validate_result(self, result: dict) -> dict:
@@ -219,16 +257,29 @@ class LLMClient:
first content token arrives (useful for triggering the typing indicator
only after the model starts generating).
"""
# Append /no_think to the last user message to disable thinking on Qwen3
patched = []
for m in messages:
patched.append(m)
if patched and patched[-1].get("role") == "user":
patched[-1] = {**patched[-1], "content": patched[-1]["content"] + "\n/no_think"}
req_json = json.dumps([
{"role": "system", "content": system_prompt[:500]},
*[{"role": m["role"], "content": str(m.get("content", ""))[:200]} for m in patched],
], default=str)
t0 = time.monotonic()
async with self._semaphore:
try:
stream = await self._client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
*messages,
*patched,
],
temperature=0.8,
max_tokens=300,
max_tokens=2048,
stream=True,
)
@@ -243,9 +294,13 @@ class LLMClient:
chunks.append(delta.content)
content = "".join(chunks).strip()
elapsed = int((time.monotonic() - t0) * 1000)
self._log_llm("chat", elapsed, bool(content), req_json, content or None)
return content if content else None
except Exception as e:
elapsed = int((time.monotonic() - t0) * 1000)
logger.error("LLM chat error: %s", e)
self._log_llm("chat", elapsed, False, req_json, error=str(e))
return None
async def analyze_image(
@@ -265,8 +320,13 @@ class LLMClient:
user_content: list[dict] = [
{"type": "image_url", "image_url": {"url": data_url}},
]
if user_text:
user_content.append({"type": "text", "text": user_text})
user_content.append({"type": "text", "text": (user_text or "") + "\n/no_think"})
req_json = json.dumps([
{"role": "system", "content": system_prompt[:500]},
{"role": "user", "content": f"[image {len(image_bytes)} bytes] {user_text or ''}"},
], default=str)
t0 = time.monotonic()
async with self._semaphore:
try:
@@ -277,7 +337,7 @@ class LLMClient:
{"role": "user", "content": user_content},
],
temperature=0.8,
max_tokens=500,
max_tokens=2048,
stream=True,
)
@@ -292,9 +352,13 @@ class LLMClient:
chunks.append(delta.content)
content = "".join(chunks).strip()
elapsed = int((time.monotonic() - t0) * 1000)
self._log_llm("image", elapsed, bool(content), req_json, content or None)
return content if content else None
except Exception as e:
elapsed = int((time.monotonic() - t0) * 1000)
logger.error("LLM image analysis error: %s", e)
self._log_llm("image", elapsed, False, req_json, error=str(e))
return None
async def raw_analyze(
@@ -307,7 +371,13 @@ class LLMClient:
user_content += f"=== NOTES ABOUT THIS USER (from prior analysis) ===\n{user_notes}\n\n"
if channel_context:
user_content += f"=== CHANNEL INFO ===\n{channel_context}\n\n"
user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}"
user_content += f"=== TARGET MESSAGE (analyze THIS message only) ===\n{message}\n/no_think"
req_json = json.dumps([
{"role": "system", "content": SYSTEM_PROMPT[:500]},
{"role": "user", "content": user_content},
], default=str)
t0 = time.monotonic()
async with self._semaphore:
try:
@@ -320,10 +390,12 @@ class LLMClient:
tools=[ANALYSIS_TOOL],
tool_choice={"type": "function", "function": {"name": "report_analysis"}},
temperature=0.1,
max_tokens=1024,
max_tokens=2048,
)
elapsed = int((time.monotonic() - t0) * 1000)
choice = response.choices[0]
usage = response.usage
parts = []
parsed = None
@@ -342,7 +414,12 @@ class LLMClient:
parsed = self._parse_content_fallback(choice.message.content)
raw = "\n".join(parts) or "(empty response)"
self._log_llm("raw_analyze", elapsed, parsed is not None, req_json, raw,
input_tokens=usage.prompt_tokens if usage else None,
output_tokens=usage.completion_tokens if usage else None)
return raw, parsed
except Exception as e:
elapsed = int((time.monotonic() - t0) * 1000)
self._log_llm("raw_analyze", elapsed, False, req_json, error=str(e))
return f"Error: {e}", None