feat: add database mode for LLM message storage and analytics

Implements a database mode that stores LLM prompts and responses in PostgreSQL
instead of making API calls. This enables:
- Testing without consuming API credits
- Collecting analytics on usage patterns
- Development and debugging workflows

Changes:
- Added DatabaseClient class for PostgreSQL operations
- Modified LLMAnalyzer to support database/API mode toggle
- Added USE_DATABASE config flag to switch between modes
- Included Docker Compose setup for PostgreSQL
- Added utility scripts for database init and analytics viewing
- Comprehensive documentation in DATABASE_MODE.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2026-03-10 21:13:13 -04:00
parent 11a4aba46f
commit 44456cb073
11 changed files with 952 additions and 4 deletions
+8
View File
@@ -12,3 +12,11 @@ api_key = os.getenv("API_KEY")
# OpenRouter API key for LLM analysis
openrouter_api_key = os.getenv("OPENROUTER_API_KEY")
# Database configuration
database_url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/sparc")
# Toggle between database mode and API mode
# When True: stores all messages in database instead of sending to OpenRouter
# When False: sends messages to OpenRouter API as normal
use_database = os.getenv("USE_DATABASE", "false").lower() in ("true", "1", "yes")
+210
View File
@@ -0,0 +1,210 @@
"""Database client for storing and retrieving LLM messages."""
import psycopg2
from psycopg2.extras import RealDictCursor
from typing import Dict, List, Optional
from datetime import datetime
import json
class DatabaseClient:
"""Handles database operations for message storage and retrieval."""
def __init__(self, database_url: str):
"""Initialize the database client.
Args:
database_url: PostgreSQL connection string
"""
self.database_url = database_url
self.conn = None
def connect(self):
"""Establish database connection."""
if not self.conn or self.conn.closed:
self.conn = psycopg2.connect(self.database_url)
def close(self):
"""Close database connection."""
if self.conn and not self.conn.closed:
self.conn.close()
def initialize_schema(self):
"""Create database tables if they don't exist."""
self.connect()
with self.conn.cursor() as cursor:
# Create messages table
cursor.execute("""
CREATE TABLE IF NOT EXISTS llm_messages (
id SERIAL PRIMARY KEY,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
company_name VARCHAR(255),
analysis_type VARCHAR(50),
model VARCHAR(100),
prompt TEXT NOT NULL,
response TEXT,
metadata JSONB,
token_usage JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create index on timestamp for analytics queries
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_messages_timestamp
ON llm_messages(timestamp)
""")
# Create index on company_name for filtering
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_messages_company
ON llm_messages(company_name)
""")
self.conn.commit()
def store_message(
self,
prompt: str,
response: str,
company_name: Optional[str] = None,
analysis_type: Optional[str] = None,
model: Optional[str] = None,
metadata: Optional[Dict] = None,
token_usage: Optional[Dict] = None,
) -> int:
"""Store an LLM message exchange in the database.
Args:
prompt: The prompt sent to the LLM
response: The response from the LLM
company_name: Name of company being analyzed
analysis_type: Type of analysis (e.g., 'single_patent', 'portfolio')
model: Model identifier used
metadata: Additional metadata as dict
token_usage: Token usage information
Returns:
The ID of the inserted record
"""
self.connect()
with self.conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO llm_messages
(prompt, response, company_name, analysis_type, model, metadata, token_usage)
VALUES (%s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
(
prompt,
response,
company_name,
analysis_type,
model,
json.dumps(metadata) if metadata else None,
json.dumps(token_usage) if token_usage else None,
),
)
message_id = cursor.fetchone()[0]
self.conn.commit()
return message_id
def get_messages(
self,
company_name: Optional[str] = None,
analysis_type: Optional[str] = None,
limit: int = 100,
offset: int = 0,
) -> List[Dict]:
"""Retrieve messages from the database.
Args:
company_name: Filter by company name
analysis_type: Filter by analysis type
limit: Maximum number of records to return
offset: Number of records to skip
Returns:
List of message dictionaries
"""
self.connect()
query = "SELECT * FROM llm_messages WHERE 1=1"
params = []
if company_name:
query += " AND company_name = %s"
params.append(company_name)
if analysis_type:
query += " AND analysis_type = %s"
params.append(analysis_type)
query += " ORDER BY timestamp DESC LIMIT %s OFFSET %s"
params.extend([limit, offset])
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def get_analytics(self, days: int = 30) -> Dict:
"""Get analytics on message usage.
Args:
days: Number of days to look back
Returns:
Dictionary with analytics data
"""
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
# Total messages
cursor.execute(
"""
SELECT COUNT(*) as total_messages
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
""",
(days,),
)
total = cursor.fetchone()["total_messages"]
# Messages by company
cursor.execute(
"""
SELECT company_name, COUNT(*) as count
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
GROUP BY company_name
ORDER BY count DESC
LIMIT 10
""",
(days,),
)
by_company = cursor.fetchall()
# Messages by type
cursor.execute(
"""
SELECT analysis_type, COUNT(*) as count
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
GROUP BY analysis_type
ORDER BY count DESC
""",
(days,),
)
by_type = cursor.fetchall()
return {
"total_messages": total,
"by_company": [dict(row) for row in by_company],
"by_type": [dict(row) for row in by_type],
"period_days": days,
}
+89 -4
View File
@@ -2,22 +2,33 @@
from openai import OpenAI
from SPARC import config
from SPARC.database import DatabaseClient
from typing import Dict
class LLMAnalyzer:
"""Handles LLM-based analysis of patent content."""
def __init__(self, api_key: str | None = None, test_mode: bool = False):
def __init__(self, api_key: str | None = None, test_mode: bool = False, use_database: bool | None = None):
"""Initialize the LLM analyzer.
Args:
api_key: OpenRouter API key. If None, will attempt to load from config.
test_mode: If True, print prompts instead of making API calls
use_database: If True, store messages in database instead of calling API.
If None, will use config.use_database
"""
self.test_mode = test_mode
self.use_database = use_database if use_database is not None else config.use_database
self.db_client = None
if (api_key or config.openrouter_api_key) and not test_mode:
# Initialize database client if in database mode
if self.use_database:
self.db_client = DatabaseClient(config.database_url)
self.db_client.initialize_schema()
# Initialize OpenRouter client if not in database mode
if (api_key or config.openrouter_api_key) and not test_mode and not self.use_database:
self.client = OpenAI(
api_key=api_key or config.openrouter_api_key,
base_url="https://openrouter.ai/api/v1"
@@ -57,13 +68,47 @@ Provide a concise analysis (2-3 paragraphs) focusing on what this patent reveals
print("=" * 80)
return "[TEST MODE - No API call made]"
# Database mode: store the prompt and return a placeholder response
if self.use_database:
response_text = "[DATABASE MODE] Message stored for testing/analytics. Enable API mode to get actual analysis."
self.db_client.store_message(
prompt=prompt,
response=response_text,
company_name=company_name,
analysis_type="single_patent",
model=self.model if hasattr(self, 'model') else None,
metadata={"patent_content_length": len(patent_content)}
)
return response_text
# API mode: send to OpenRouter
if self.client:
response = self.client.chat.completions.create(
model=self.model,
max_tokens=1024,
messages=[{"role": "user", "content": prompt}],
)
return response.choices[0].message.content
response_text = response.choices[0].message.content
# Store in database if db_client is available (for logging even in API mode)
if self.db_client:
self.db_client.store_message(
prompt=prompt,
response=response_text,
company_name=company_name,
analysis_type="single_patent",
model=self.model,
metadata={"patent_content_length": len(patent_content)},
token_usage={
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
} if hasattr(response, 'usage') else None
)
return response_text
def analyze_patent_portfolio(
self, patents_data: list[Dict[str, str]], company_name: str
@@ -105,6 +150,25 @@ Provide a comprehensive analysis (4-5 paragraphs) with a final verdict on the co
print(prompt)
return "[TEST MODE]"
# Database mode: store the prompt and return a placeholder response
if self.use_database:
response_text = "[DATABASE MODE] Message stored for testing/analytics. Enable API mode to get actual analysis."
self.db_client.store_message(
prompt=prompt,
response=response_text,
company_name=company_name,
analysis_type="portfolio",
model=self.model if hasattr(self, 'model') else None,
metadata={
"patent_count": len(patents_data),
"patent_ids": [p['patent_id'] for p in patents_data]
}
)
return response_text
# API mode: send to OpenRouter
try:
response = self.client.chat.completions.create(
model=self.model,
@@ -112,7 +176,28 @@ Provide a comprehensive analysis (4-5 paragraphs) with a final verdict on the co
messages=[{"role": "user", "content": prompt}],
)
return response.choices[0].message.content
response_text = response.choices[0].message.content
# Store in database if db_client is available (for logging even in API mode)
if self.db_client:
self.db_client.store_message(
prompt=prompt,
response=response_text,
company_name=company_name,
analysis_type="portfolio",
model=self.model,
metadata={
"patent_count": len(patents_data),
"patent_ids": [p['patent_id'] for p in patents_data]
},
token_usage={
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
} if hasattr(response, 'usage') else None
)
return response_text
except AttributeError:
return prompt