Compare commits

..

1 Commits

Author SHA1 Message Date
agent-company 349bb4d073 refactor(db): use shared pooled DatabaseClient singleton instead of per-call instances
- Replace get_db_client() creating new DatabaseClient on every call with a
  module-level singleton initialized once at startup via init_db_client()
- Add init_db_client() and close_db_client() lifecycle functions called
  from FastAPI lifespan handler
- Migrate all DatabaseClient methods from legacy self.connect()/self.conn
  to pooled self.get_conn() context manager for thread-safe connection reuse
- Pool is properly torn down on application shutdown

Closes leeworks-agents/SPARC#7

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 04:15:03 +00:00
4 changed files with 187 additions and 473 deletions
+6 -2
View File
@@ -16,11 +16,13 @@ from SPARC.analyzer import CompanyAnalyzer
from SPARC.auth import ( from SPARC.auth import (
TokenResponse, TokenResponse,
UserResponse, UserResponse,
close_db_client,
create_tokens, create_tokens,
decode_token, decode_token,
get_current_admin, get_current_admin,
get_current_user, get_current_user,
get_db_client, get_db_client,
init_db_client,
) )
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult
@@ -148,12 +150,14 @@ _analyzer: CompanyAnalyzer | None = None
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
"""Initialize resources on startup.""" """Initialize resources on startup, clean up on shutdown."""
global _analyzer global _analyzer
init_db_client()
_analyzer = CompanyAnalyzer() _analyzer = CompanyAnalyzer()
yield yield
# Cleanup if needed # Cleanup
_analyzer = None _analyzer = None
close_db_client()
app = FastAPI( app = FastAPI(
+29 -4
View File
@@ -132,11 +132,36 @@ def decode_token(token: str) -> Optional[TokenPayload]:
return None return None
# Shared database client singleton, initialized at startup via init_db_client()
_db_client: DatabaseClient | None = None
def init_db_client() -> None:
"""Initialize the shared database client. Call once at app startup."""
global _db_client
_db_client = DatabaseClient(config.database_url)
_db_client.connect()
def close_db_client() -> None:
"""Close the shared database client. Call at app shutdown."""
global _db_client
if _db_client:
_db_client.close()
_db_client = None
def get_db_client() -> DatabaseClient: def get_db_client() -> DatabaseClient:
"""Get database client for auth operations.""" """Get the shared pooled database client for auth operations.
client = DatabaseClient(config.database_url)
client.connect() Returns the module-level singleton DatabaseClient. If not yet initialized
return client (e.g., during tests), creates a new instance as a fallback.
"""
global _db_client
if _db_client is None:
_db_client = DatabaseClient(config.database_url)
_db_client.connect()
return _db_client
async def get_current_user( async def get_current_user(
+152 -165
View File
@@ -201,8 +201,6 @@ class DatabaseClient:
Returns: Returns:
Cached message dict if found, None otherwise Cached message dict if found, None otherwise
""" """
self.connect()
prompt_hash = self.hash_prompt(prompt) prompt_hash = self.hash_prompt(prompt)
query = """ query = """
@@ -225,10 +223,11 @@ class DatabaseClient:
query += " ORDER BY timestamp DESC LIMIT 1" query += " ORDER BY timestamp DESC LIMIT 1"
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor: with self.get_conn() as conn:
cursor.execute(query, params) with conn.cursor(cursor_factory=RealDictCursor) as cursor:
result = cursor.fetchone() cursor.execute(query, params)
return dict(result) if result else None result = cursor.fetchone()
return dict(result) if result else None
def store_message( def store_message(
self, self,
@@ -256,33 +255,32 @@ class DatabaseClient:
Returns: Returns:
The ID of the inserted record The ID of the inserted record
""" """
self.connect()
prompt_hash = self.hash_prompt(prompt) prompt_hash = self.hash_prompt(prompt)
with self.conn.cursor() as cursor: with self.get_conn() as conn:
cursor.execute( with conn.cursor() as cursor:
""" cursor.execute(
INSERT INTO llm_messages """
(prompt, prompt_hash, response, company_name, analysis_type, model, metadata, token_usage, is_cached) INSERT INTO llm_messages
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) (prompt, prompt_hash, response, company_name, analysis_type, model, metadata, token_usage, is_cached)
RETURNING id VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""", RETURNING id
( """,
prompt, (
prompt_hash, prompt,
response, prompt_hash,
company_name, response,
analysis_type, company_name,
model, analysis_type,
json.dumps(metadata) if metadata else None, model,
json.dumps(token_usage) if token_usage else None, json.dumps(metadata) if metadata else None,
is_cached, json.dumps(token_usage) if token_usage else None,
), is_cached,
) ),
)
message_id = cursor.fetchone()[0] message_id = cursor.fetchone()[0]
self.conn.commit() conn.commit()
return message_id return message_id
@@ -304,8 +302,6 @@ class DatabaseClient:
Returns: Returns:
List of message dictionaries List of message dictionaries
""" """
self.connect()
query = "SELECT * FROM llm_messages WHERE 1=1" query = "SELECT * FROM llm_messages WHERE 1=1"
params = [] params = []
@@ -320,9 +316,10 @@ class DatabaseClient:
query += " ORDER BY timestamp DESC LIMIT %s OFFSET %s" query += " ORDER BY timestamp DESC LIMIT %s OFFSET %s"
params.extend([limit, offset]) params.extend([limit, offset])
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor: with self.get_conn() as conn:
cursor.execute(query, params) with conn.cursor(cursor_factory=RealDictCursor) as cursor:
return [dict(row) for row in cursor.fetchall()] cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def get_analytics(self, days: int = 30) -> Dict: def get_analytics(self, days: int = 30) -> Dict:
"""Get analytics on message usage. """Get analytics on message usage.
@@ -333,53 +330,52 @@ class DatabaseClient:
Returns: Returns:
Dictionary with analytics data Dictionary with analytics data
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
# Total messages
cursor.execute(
"""
SELECT COUNT(*) as total_messages
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
""",
(days,),
)
total = cursor.fetchone()["total_messages"]
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor: # Messages by company
# Total messages cursor.execute(
cursor.execute( """
""" SELECT company_name, COUNT(*) as count
SELECT COUNT(*) as total_messages FROM llm_messages
FROM llm_messages WHERE timestamp >= NOW() - INTERVAL '%s days'
WHERE timestamp >= NOW() - INTERVAL '%s days' GROUP BY company_name
""", ORDER BY count DESC
(days,), LIMIT 10
) """,
total = cursor.fetchone()["total_messages"] (days,),
)
by_company = cursor.fetchall()
# Messages by company # Messages by type
cursor.execute( cursor.execute(
""" """
SELECT company_name, COUNT(*) as count SELECT analysis_type, COUNT(*) as count
FROM llm_messages FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days' WHERE timestamp >= NOW() - INTERVAL '%s days'
GROUP BY company_name GROUP BY analysis_type
ORDER BY count DESC ORDER BY count DESC
LIMIT 10 """,
""", (days,),
(days,), )
) by_type = cursor.fetchall()
by_company = cursor.fetchall()
# Messages by type return {
cursor.execute( "total_messages": total,
""" "by_company": [dict(row) for row in by_company],
SELECT analysis_type, COUNT(*) as count "by_type": [dict(row) for row in by_type],
FROM llm_messages "period_days": days,
WHERE timestamp >= NOW() - INTERVAL '%s days' }
GROUP BY analysis_type
ORDER BY count DESC
""",
(days,),
)
by_type = cursor.fetchall()
return {
"total_messages": total,
"by_company": [dict(row) for row in by_company],
"by_type": [dict(row) for row in by_type],
"period_days": days,
}
# Patent Cache Methods # Patent Cache Methods
@@ -505,25 +501,23 @@ class DatabaseClient:
Returns: Returns:
Created user dict or None if email exists Created user dict or None if email exists
""" """
self.connect()
password_hash = self.hash_password(password) password_hash = self.hash_password(password)
try: try:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor: with self.get_conn() as conn:
cursor.execute( with conn.cursor(cursor_factory=RealDictCursor) as cursor:
""" cursor.execute(
INSERT INTO users (email, password_hash, role) """
VALUES (%s, %s, %s) INSERT INTO users (email, password_hash, role)
RETURNING id, email, role, created_at VALUES (%s, %s, %s)
""", RETURNING id, email, role, created_at
(email, password_hash, role), """,
) (email, password_hash, role),
user = cursor.fetchone() )
self.conn.commit() user = cursor.fetchone()
conn.commit()
return dict(user) if user else None return dict(user) if user else None
except psycopg2.errors.UniqueViolation: except psycopg2.errors.UniqueViolation:
self.conn.rollback()
return None return None
def authenticate_user(self, email: str, password: str) -> Optional[Dict]: def authenticate_user(self, email: str, password: str) -> Optional[Dict]:
@@ -536,23 +530,22 @@ class DatabaseClient:
Returns: Returns:
User dict if authenticated, None otherwise User dict if authenticated, None otherwise
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT * FROM users WHERE email = %s",
(email,),
)
user = cursor.fetchone()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor: if user and self.verify_password(password, user["password_hash"]):
cursor.execute( return {
"SELECT * FROM users WHERE email = %s", "id": user["id"],
(email,), "email": user["email"],
) "role": user["role"],
user = cursor.fetchone() "created_at": user["created_at"],
}
if user and self.verify_password(password, user["password_hash"]): return None
return {
"id": user["id"],
"email": user["email"],
"role": user["role"],
"created_at": user["created_at"],
}
return None
def get_user_by_id(self, user_id: int) -> Optional[Dict]: def get_user_by_id(self, user_id: int) -> Optional[Dict]:
"""Get a user by ID. """Get a user by ID.
@@ -563,15 +556,14 @@ class DatabaseClient:
Returns: Returns:
User dict or None User dict or None
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor: cursor.execute(
cursor.execute( "SELECT id, email, role, created_at FROM users WHERE id = %s",
"SELECT id, email, role, created_at FROM users WHERE id = %s", (user_id,),
(user_id,), )
) user = cursor.fetchone()
user = cursor.fetchone() return dict(user) if user else None
return dict(user) if user else None
def get_user_by_email(self, email: str) -> Optional[Dict]: def get_user_by_email(self, email: str) -> Optional[Dict]:
"""Get a user by email. """Get a user by email.
@@ -582,15 +574,14 @@ class DatabaseClient:
Returns: Returns:
User dict or None User dict or None
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor: cursor.execute(
cursor.execute( "SELECT id, email, role, created_at FROM users WHERE email = %s",
"SELECT id, email, role, created_at FROM users WHERE email = %s", (email,),
(email,), )
) user = cursor.fetchone()
user = cursor.fetchone() return dict(user) if user else None
return dict(user) if user else None
def get_all_users(self, limit: int = 100, offset: int = 0) -> List[Dict]: def get_all_users(self, limit: int = 100, offset: int = 0) -> List[Dict]:
"""Get all users (admin only). """Get all users (admin only).
@@ -602,19 +593,18 @@ class DatabaseClient:
Returns: Returns:
List of user dicts List of user dicts
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor: cursor.execute(
cursor.execute( """
""" SELECT id, email, role, created_at
SELECT id, email, role, created_at FROM users
FROM users ORDER BY created_at DESC
ORDER BY created_at DESC LIMIT %s OFFSET %s
LIMIT %s OFFSET %s """,
""", (limit, offset),
(limit, offset), )
) return [dict(row) for row in cursor.fetchall()]
return [dict(row) for row in cursor.fetchall()]
def update_user_role(self, user_id: int, role: str) -> Optional[Dict]: def update_user_role(self, user_id: int, role: str) -> Optional[Dict]:
"""Update a user's role (admin only). """Update a user's role (admin only).
@@ -626,20 +616,19 @@ class DatabaseClient:
Returns: Returns:
Updated user dict or None Updated user dict or None
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor: cursor.execute(
cursor.execute( """
""" UPDATE users
UPDATE users SET role = %s, updated_at = CURRENT_TIMESTAMP
SET role = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s
WHERE id = %s RETURNING id, email, role, created_at
RETURNING id, email, role, created_at """,
""", (role, user_id),
(role, user_id), )
) user = cursor.fetchone()
user = cursor.fetchone() conn.commit()
self.conn.commit()
return dict(user) if user else None return dict(user) if user else None
def delete_user(self, user_id: int) -> bool: def delete_user(self, user_id: int) -> bool:
@@ -651,12 +640,11 @@ class DatabaseClient:
Returns: Returns:
True if deleted True if deleted
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor() as cursor:
with self.conn.cursor() as cursor: cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,)) deleted = cursor.rowcount > 0
deleted = cursor.rowcount > 0 conn.commit()
self.conn.commit()
return deleted return deleted
def get_user_count(self) -> int: def get_user_count(self) -> int:
@@ -665,8 +653,7 @@ class DatabaseClient:
Returns: Returns:
Number of users Number of users
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor() as cursor:
with self.conn.cursor() as cursor: cursor.execute("SELECT COUNT(*) FROM users")
cursor.execute("SELECT COUNT(*) FROM users") return cursor.fetchone()[0]
return cursor.fetchone()[0]
-302
View File
@@ -1,302 +0,0 @@
"""Tests for JWT authentication flow: register, login, protected routes, refresh, admin access."""
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
import pytest
from fastapi.testclient import TestClient
from SPARC.api import app
from SPARC.auth import create_access_token, create_refresh_token
@pytest.fixture
def client():
"""Create test client."""
return TestClient(app)
@pytest.fixture(autouse=True)
def mock_db(monkeypatch):
"""Mock the database client used by auth endpoints.
Returns a MagicMock with all DB methods pre-configured.
"""
db = MagicMock()
# Default: no users exist
db.get_user_count.return_value = 0
db.get_user_by_id.return_value = None
db.get_user_by_email.return_value = None
db.authenticate_user.return_value = None
db.create_user.return_value = None
db.get_all_users.return_value = []
db.update_user_role.return_value = None
db.delete_user.return_value = False
with patch("SPARC.api.get_db_client", return_value=db), \
patch("SPARC.auth.get_db_client", return_value=db):
yield db
def _make_admin_user():
return {
"id": 1,
"email": "admin@test.com",
"role": "admin",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
def _make_regular_user():
return {
"id": 2,
"email": "user@test.com",
"role": "user",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
def _auth_header(user_dict):
"""Create an Authorization header with a valid access token for the given user."""
token = create_access_token(user_dict["id"], user_dict["email"], user_dict["role"])
return {"Authorization": f"Bearer {token}"}
class TestRegister:
"""POST /auth/register"""
def test_register_first_user_becomes_admin(self, client, mock_db):
"""First registered user should get admin role."""
mock_db.get_user_count.return_value = 0
mock_db.create_user.return_value = {
"id": 1,
"email": "admin@test.com",
"role": "admin",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
response = client.post(
"/auth/register",
json={"email": "admin@test.com", "password": "securepass123"},
)
assert response.status_code == 200
data = response.json()
assert data["email"] == "admin@test.com"
assert data["role"] == "admin"
mock_db.create_user.assert_called_once_with(
email="admin@test.com", password="securepass123", role="admin"
)
def test_register_subsequent_user_gets_user_role(self, client, mock_db):
"""Non-first user should get regular user role."""
mock_db.get_user_count.return_value = 1
mock_db.create_user.return_value = _make_regular_user()
response = client.post(
"/auth/register",
json={"email": "user@test.com", "password": "securepass123"},
)
assert response.status_code == 200
data = response.json()
assert data["role"] == "user"
def test_register_duplicate_email_returns_400(self, client, mock_db):
"""Registering with an existing email should return 400."""
mock_db.get_user_count.return_value = 1
mock_db.create_user.return_value = None # indicates duplicate
response = client.post(
"/auth/register",
json={"email": "existing@test.com", "password": "securepass123"},
)
assert response.status_code == 400
assert "already registered" in response.json()["detail"].lower()
class TestLogin:
"""POST /auth/login"""
def test_login_valid_credentials_returns_tokens(self, client, mock_db):
"""Valid credentials should return access and refresh tokens."""
user = _make_regular_user()
mock_db.authenticate_user.return_value = user
response = client.post(
"/auth/login",
json={"email": "user@test.com", "password": "correctpassword"},
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
assert data["token_type"] == "bearer"
def test_login_invalid_credentials_returns_401(self, client, mock_db):
"""Invalid credentials should return 401."""
mock_db.authenticate_user.return_value = None
response = client.post(
"/auth/login",
json={"email": "user@test.com", "password": "wrongpassword"},
)
assert response.status_code == 401
assert "invalid" in response.json()["detail"].lower()
class TestGetMe:
"""GET /auth/me"""
def test_valid_access_token_returns_user(self, client, mock_db):
"""A valid access token should return the user's data."""
user = _make_regular_user()
mock_db.get_user_by_id.return_value = user
response = client.get("/auth/me", headers=_auth_header(user))
assert response.status_code == 200
data = response.json()
assert data["email"] == "user@test.com"
assert data["id"] == 2
def test_missing_token_returns_401(self, client):
"""No token should return 401 (403 from HTTPBearer)."""
response = client.get("/auth/me")
assert response.status_code in (401, 403)
def test_expired_token_returns_401(self, client, mock_db):
"""An expired token should return 401."""
# Create a token that has already expired
from datetime import timedelta
import jwt as pyjwt
from SPARC.auth import JWT_ALGORITHM, JWT_SECRET
payload = {
"sub": "1",
"email": "user@test.com",
"role": "user",
"exp": datetime.now(timezone.utc) - timedelta(hours=1),
"type": "access",
}
expired_token = pyjwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
response = client.get(
"/auth/me", headers={"Authorization": f"Bearer {expired_token}"}
)
assert response.status_code == 401
def test_refresh_token_as_access_returns_401(self, client, mock_db):
"""Using a refresh token as an access token should return 401."""
user = _make_regular_user()
refresh_token = create_refresh_token(user["id"], user["email"], user["role"])
response = client.get(
"/auth/me", headers={"Authorization": f"Bearer {refresh_token}"}
)
assert response.status_code == 401
class TestRefreshToken:
"""POST /auth/refresh"""
def test_valid_refresh_token_returns_new_tokens(self, client, mock_db):
"""A valid refresh token should issue new access and refresh tokens."""
user = _make_regular_user()
mock_db.get_user_by_id.return_value = user
refresh = create_refresh_token(user["id"], user["email"], user["role"])
response = client.post(
"/auth/refresh", json={"refresh_token": refresh}
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
def test_invalid_refresh_token_returns_401(self, client, mock_db):
"""An invalid refresh token should return 401."""
response = client.post(
"/auth/refresh", json={"refresh_token": "invalid-token-string"}
)
assert response.status_code == 401
def test_access_token_as_refresh_returns_401(self, client, mock_db):
"""Using an access token as a refresh token should return 401."""
user = _make_regular_user()
access = create_access_token(user["id"], user["email"], user["role"])
response = client.post(
"/auth/refresh", json={"refresh_token": access}
)
assert response.status_code == 401
class TestAdminUsers:
"""GET /admin/users and PATCH /admin/users/{id}/role"""
def test_admin_can_list_users(self, client, mock_db):
"""Admin token should allow listing users."""
admin = _make_admin_user()
mock_db.get_user_by_id.return_value = admin
mock_db.get_all_users.return_value = [admin, _make_regular_user()]
response = client.get("/admin/users", headers=_auth_header(admin))
assert response.status_code == 200
data = response.json()
assert len(data) == 2
def test_regular_user_cannot_list_users(self, client, mock_db):
"""Regular user token should be rejected with 403."""
user = _make_regular_user()
mock_db.get_user_by_id.return_value = user
response = client.get("/admin/users", headers=_auth_header(user))
assert response.status_code == 403
def test_no_token_cannot_list_users(self, client):
"""No token should be rejected."""
response = client.get("/admin/users")
assert response.status_code in (401, 403)
def test_admin_can_change_user_role(self, client, mock_db):
"""Admin should be able to change another user's role."""
admin = _make_admin_user()
mock_db.get_user_by_id.return_value = admin
mock_db.update_user_role.return_value = {
"id": 2,
"email": "user@test.com",
"role": "admin",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
response = client.patch(
"/admin/users/2/role",
json={"role": "admin"},
headers=_auth_header(admin),
)
assert response.status_code == 200
assert response.json()["role"] == "admin"
def test_admin_cannot_change_own_role(self, client, mock_db):
"""Admin should not be able to change their own role."""
admin = _make_admin_user()
mock_db.get_user_by_id.return_value = admin
response = client.patch(
"/admin/users/1/role",
json={"role": "user"},
headers=_auth_header(admin),
)
assert response.status_code == 400
assert "own role" in response.json()["detail"].lower()