Files
SPARC/SPARC/database.py
T
0xWheatyz 44456cb073 feat: add database mode for LLM message storage and analytics
Implements a database mode that stores LLM prompts and responses in PostgreSQL
instead of making API calls. This enables:
- Testing without consuming API credits
- Collecting analytics on usage patterns
- Development and debugging workflows

Changes:
- Added DatabaseClient class for PostgreSQL operations
- Modified LLMAnalyzer to support database/API mode toggle
- Added USE_DATABASE config flag to switch between modes
- Included Docker Compose setup for PostgreSQL
- Added utility scripts for database init and analytics viewing
- Comprehensive documentation in DATABASE_MODE.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-03-10 21:13:13 -04:00

211 lines
6.5 KiB
Python

"""Database client for storing and retrieving LLM messages."""
import psycopg2
from psycopg2.extras import RealDictCursor
from typing import Dict, List, Optional
from datetime import datetime
import json
class DatabaseClient:
    """Store and query LLM prompt/response exchanges in PostgreSQL.

    The connection is opened lazily: every public method calls
    :meth:`connect` first, so a client can be constructed without touching
    the database. Each method runs its statements inside ``with self.conn``,
    which (per psycopg2) commits when the block succeeds and rolls back when
    it raises, so a failed statement never leaves the shared connection
    stuck in an aborted transaction.

    The client can also be used as a context manager::

        with DatabaseClient(url) as db:
            db.store_message(prompt, response)
    """

    def __init__(self, database_url: str):
        """Initialize the database client.

        No connection is made here; see :meth:`connect`.

        Args:
            database_url: PostgreSQL connection string.
        """
        self.database_url = database_url
        self.conn = None

    def __enter__(self):
        """Open the connection (if needed) and return this client."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the block propagate."""
        self.close()

    def connect(self):
        """Establish the database connection if there is no open one.

        Safe to call repeatedly: an existing open connection is reused.
        """
        if not self.conn or self.conn.closed:
            self.conn = psycopg2.connect(self.database_url)

    def close(self):
        """Close the database connection if it is currently open."""
        if self.conn and not self.conn.closed:
            self.conn.close()

    def initialize_schema(self):
        """Create the ``llm_messages`` table and its indexes if missing.

        All statements use ``IF NOT EXISTS``, so calling this repeatedly is
        harmless. The three statements are committed together, or rolled
        back together if any of them fails.
        """
        self.connect()
        with self.conn, self.conn.cursor() as cursor:
            # Create messages table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS llm_messages (
                    id SERIAL PRIMARY KEY,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    company_name VARCHAR(255),
                    analysis_type VARCHAR(50),
                    model VARCHAR(100),
                    prompt TEXT NOT NULL,
                    response TEXT,
                    metadata JSONB,
                    token_usage JSONB,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            # Create index on timestamp for analytics queries
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_messages_timestamp
                ON llm_messages(timestamp)
            """)
            # Create index on company_name for filtering
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_messages_company
                ON llm_messages(company_name)
            """)

    def store_message(
        self,
        prompt: str,
        response: str,
        company_name: Optional[str] = None,
        analysis_type: Optional[str] = None,
        model: Optional[str] = None,
        metadata: Optional[Dict] = None,
        token_usage: Optional[Dict] = None,
    ) -> int:
        """Store an LLM message exchange in the database.

        Args:
            prompt: The prompt sent to the LLM.
            response: The response from the LLM.
            company_name: Name of company being analyzed.
            analysis_type: Type of analysis (e.g., 'single_patent',
                'portfolio').
            model: Model identifier used.
            metadata: Additional metadata as dict. ``None`` or an empty dict
                is stored as SQL NULL.
            token_usage: Token usage information. ``None`` or an empty dict
                is stored as SQL NULL.

        Returns:
            The ID of the inserted record.

        Raises:
            Exception: Whatever the database driver raises; the transaction
                is rolled back before the error propagates.
        """
        self.connect()
        values = (
            prompt,
            response,
            company_name,
            analysis_type,
            model,
            # JSONB columns receive JSON text; falsy values become NULL.
            json.dumps(metadata) if metadata else None,
            json.dumps(token_usage) if token_usage else None,
        )
        with self.conn, self.conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO llm_messages
                (prompt, response, company_name, analysis_type, model, metadata, token_usage)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                RETURNING id
                """,
                values,
            )
            message_id = cursor.fetchone()[0]
        return message_id

    def get_messages(
        self,
        company_name: Optional[str] = None,
        analysis_type: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[Dict]:
        """Retrieve messages from the database, newest first.

        Args:
            company_name: Filter by company name. Falsy values (``None`` or
                an empty string) disable the filter.
            analysis_type: Filter by analysis type. Falsy values disable the
                filter.
            limit: Maximum number of records to return.
            offset: Number of records to skip.

        Returns:
            List of message dictionaries keyed by column name.
        """
        self.connect()
        # "WHERE 1=1" lets every optional filter be appended as "AND ...".
        query = "SELECT * FROM llm_messages WHERE 1=1"
        params = []
        if company_name:
            query += " AND company_name = %s"
            params.append(company_name)
        if analysis_type:
            query += " AND analysis_type = %s"
            params.append(analysis_type)
        # id is a tie-breaker so LIMIT/OFFSET pages stay stable when several
        # rows share the same timestamp.
        query += " ORDER BY timestamp DESC, id DESC LIMIT %s OFFSET %s"
        params.extend([limit, offset])
        # Running the SELECT under "with self.conn" ends the implicit
        # transaction afterwards instead of leaving the session idle in it.
        with self.conn, self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(query, params)
            return [dict(row) for row in cursor.fetchall()]

    def get_analytics(self, days: int = 30) -> Dict:
        """Get analytics on message usage.

        Args:
            days: Number of days to look back.

        Returns:
            Dictionary with the keys ``total_messages`` (count in the
            window), ``by_company`` (up to 10 ``company_name``/``count``
            rows, busiest first), ``by_type`` (``analysis_type``/``count``
            rows, busiest first) and ``period_days`` (the ``days`` argument).
        """
        self.connect()
        # The look-back window is written as "%s * INTERVAL '1 day'" so the
        # placeholder stays outside the quoted literal; a placeholder inside
        # quotes only works by accident for values rendered without quotes.
        with self.conn, self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            # Total messages
            cursor.execute(
                """
                SELECT COUNT(*) as total_messages
                FROM llm_messages
                WHERE timestamp >= NOW() - %s * INTERVAL '1 day'
                """,
                (days,),
            )
            total = cursor.fetchone()["total_messages"]
            # Messages by company
            cursor.execute(
                """
                SELECT company_name, COUNT(*) as count
                FROM llm_messages
                WHERE timestamp >= NOW() - %s * INTERVAL '1 day'
                GROUP BY company_name
                ORDER BY count DESC
                LIMIT 10
                """,
                (days,),
            )
            by_company = cursor.fetchall()
            # Messages by type
            cursor.execute(
                """
                SELECT analysis_type, COUNT(*) as count
                FROM llm_messages
                WHERE timestamp >= NOW() - %s * INTERVAL '1 day'
                GROUP BY analysis_type
                ORDER BY count DESC
                """,
                (days,),
            )
            by_type = cursor.fetchall()
        return {
            "total_messages": total,
            "by_company": [dict(row) for row in by_company],
            "by_type": [dict(row) for row in by_type],
            "period_days": days,
        }