Compare commits

..

1 Commits

Author SHA1 Message Date
agent-company ae9f257dcb test(auth): add comprehensive JWT authentication test suite
Add 17 tests in tests/test_auth.py covering all auth flows:
- Registration: first user admin, subsequent user, duplicate email
- Login: valid credentials, invalid credentials
- Protected routes: valid token, missing token, expired token, wrong token type
- Token refresh: valid refresh, invalid refresh, access-as-refresh rejected
- Admin endpoints: list users, change role, own-role prevention, permission checks

All tests use mocked database (no live DB required).

Closes leeworks-agents/SPARC#10

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 04:24:12 +00:00
4 changed files with 473 additions and 187 deletions
+2 -6
View File
@@ -16,13 +16,11 @@ from SPARC.analyzer import CompanyAnalyzer
from SPARC.auth import (
TokenResponse,
UserResponse,
close_db_client,
create_tokens,
decode_token,
get_current_admin,
get_current_user,
get_db_client,
init_db_client,
)
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult
@@ -150,14 +148,12 @@ _analyzer: CompanyAnalyzer | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize resources on startup, clean up on shutdown."""
"""Initialize resources on startup."""
global _analyzer
init_db_client()
_analyzer = CompanyAnalyzer()
yield
# Cleanup
# Cleanup if needed
_analyzer = None
close_db_client()
app = FastAPI(
+4 -29
View File
@@ -132,36 +132,11 @@ def decode_token(token: str) -> Optional[TokenPayload]:
return None
# Shared database client singleton, initialized at startup via init_db_client()
_db_client: DatabaseClient | None = None
def init_db_client() -> None:
"""Initialize the shared database client. Call once at app startup."""
global _db_client
_db_client = DatabaseClient(config.database_url)
_db_client.connect()
def close_db_client() -> None:
"""Close the shared database client. Call at app shutdown."""
global _db_client
if _db_client:
_db_client.close()
_db_client = None
def get_db_client() -> DatabaseClient:
"""Get the shared pooled database client for auth operations.
Returns the module-level singleton DatabaseClient. If not yet initialized
(e.g., during tests), creates a new instance as a fallback.
"""
global _db_client
if _db_client is None:
_db_client = DatabaseClient(config.database_url)
_db_client.connect()
return _db_client
"""Get database client for auth operations."""
client = DatabaseClient(config.database_url)
client.connect()
return client
async def get_current_user(
+165 -152
View File
@@ -201,6 +201,8 @@ class DatabaseClient:
Returns:
Cached message dict if found, None otherwise
"""
self.connect()
prompt_hash = self.hash_prompt(prompt)
query = """
@@ -223,11 +225,10 @@ class DatabaseClient:
query += " ORDER BY timestamp DESC LIMIT 1"
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
result = cursor.fetchone()
return dict(result) if result else None
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
result = cursor.fetchone()
return dict(result) if result else None
def store_message(
self,
@@ -255,32 +256,33 @@ class DatabaseClient:
Returns:
The ID of the inserted record
"""
self.connect()
prompt_hash = self.hash_prompt(prompt)
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO llm_messages
(prompt, prompt_hash, response, company_name, analysis_type, model, metadata, token_usage, is_cached)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
(
prompt,
prompt_hash,
response,
company_name,
analysis_type,
model,
json.dumps(metadata) if metadata else None,
json.dumps(token_usage) if token_usage else None,
is_cached,
),
)
with self.conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO llm_messages
(prompt, prompt_hash, response, company_name, analysis_type, model, metadata, token_usage, is_cached)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
(
prompt,
prompt_hash,
response,
company_name,
analysis_type,
model,
json.dumps(metadata) if metadata else None,
json.dumps(token_usage) if token_usage else None,
is_cached,
),
)
message_id = cursor.fetchone()[0]
conn.commit()
message_id = cursor.fetchone()[0]
self.conn.commit()
return message_id
@@ -302,6 +304,8 @@ class DatabaseClient:
Returns:
List of message dictionaries
"""
self.connect()
query = "SELECT * FROM llm_messages WHERE 1=1"
params = []
@@ -316,10 +320,9 @@ class DatabaseClient:
query += " ORDER BY timestamp DESC LIMIT %s OFFSET %s"
params.extend([limit, offset])
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def get_analytics(self, days: int = 30) -> Dict:
"""Get analytics on message usage.
@@ -330,52 +333,53 @@ class DatabaseClient:
Returns:
Dictionary with analytics data
"""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
# Total messages
cursor.execute(
"""
SELECT COUNT(*) as total_messages
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
""",
(days,),
)
total = cursor.fetchone()["total_messages"]
self.connect()
# Messages by company
cursor.execute(
"""
SELECT company_name, COUNT(*) as count
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
GROUP BY company_name
ORDER BY count DESC
LIMIT 10
""",
(days,),
)
by_company = cursor.fetchall()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
# Total messages
cursor.execute(
"""
SELECT COUNT(*) as total_messages
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
""",
(days,),
)
total = cursor.fetchone()["total_messages"]
# Messages by type
cursor.execute(
"""
SELECT analysis_type, COUNT(*) as count
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
GROUP BY analysis_type
ORDER BY count DESC
""",
(days,),
)
by_type = cursor.fetchall()
# Messages by company
cursor.execute(
"""
SELECT company_name, COUNT(*) as count
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
GROUP BY company_name
ORDER BY count DESC
LIMIT 10
""",
(days,),
)
by_company = cursor.fetchall()
return {
"total_messages": total,
"by_company": [dict(row) for row in by_company],
"by_type": [dict(row) for row in by_type],
"period_days": days,
}
# Messages by type
cursor.execute(
"""
SELECT analysis_type, COUNT(*) as count
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
GROUP BY analysis_type
ORDER BY count DESC
""",
(days,),
)
by_type = cursor.fetchall()
return {
"total_messages": total,
"by_company": [dict(row) for row in by_company],
"by_type": [dict(row) for row in by_type],
"period_days": days,
}
# Patent Cache Methods
@@ -501,23 +505,25 @@ class DatabaseClient:
Returns:
Created user dict or None if email exists
"""
self.connect()
password_hash = self.hash_password(password)
try:
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
INSERT INTO users (email, password_hash, role)
VALUES (%s, %s, %s)
RETURNING id, email, role, created_at
""",
(email, password_hash, role),
)
user = cursor.fetchone()
conn.commit()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
INSERT INTO users (email, password_hash, role)
VALUES (%s, %s, %s)
RETURNING id, email, role, created_at
""",
(email, password_hash, role),
)
user = cursor.fetchone()
self.conn.commit()
return dict(user) if user else None
except psycopg2.errors.UniqueViolation:
self.conn.rollback()
return None
def authenticate_user(self, email: str, password: str) -> Optional[Dict]:
@@ -530,22 +536,23 @@ class DatabaseClient:
Returns:
User dict if authenticated, None otherwise
"""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT * FROM users WHERE email = %s",
(email,),
)
user = cursor.fetchone()
self.connect()
if user and self.verify_password(password, user["password_hash"]):
return {
"id": user["id"],
"email": user["email"],
"role": user["role"],
"created_at": user["created_at"],
}
return None
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT * FROM users WHERE email = %s",
(email,),
)
user = cursor.fetchone()
if user and self.verify_password(password, user["password_hash"]):
return {
"id": user["id"],
"email": user["email"],
"role": user["role"],
"created_at": user["created_at"],
}
return None
def get_user_by_id(self, user_id: int) -> Optional[Dict]:
"""Get a user by ID.
@@ -556,14 +563,15 @@ class DatabaseClient:
Returns:
User dict or None
"""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT id, email, role, created_at FROM users WHERE id = %s",
(user_id,),
)
user = cursor.fetchone()
return dict(user) if user else None
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT id, email, role, created_at FROM users WHERE id = %s",
(user_id,),
)
user = cursor.fetchone()
return dict(user) if user else None
def get_user_by_email(self, email: str) -> Optional[Dict]:
"""Get a user by email.
@@ -574,14 +582,15 @@ class DatabaseClient:
Returns:
User dict or None
"""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT id, email, role, created_at FROM users WHERE email = %s",
(email,),
)
user = cursor.fetchone()
return dict(user) if user else None
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT id, email, role, created_at FROM users WHERE email = %s",
(email,),
)
user = cursor.fetchone()
return dict(user) if user else None
def get_all_users(self, limit: int = 100, offset: int = 0) -> List[Dict]:
"""Get all users (admin only).
@@ -593,18 +602,19 @@ class DatabaseClient:
Returns:
List of user dicts
"""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
SELECT id, email, role, created_at
FROM users
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""",
(limit, offset),
)
return [dict(row) for row in cursor.fetchall()]
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
SELECT id, email, role, created_at
FROM users
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""",
(limit, offset),
)
return [dict(row) for row in cursor.fetchall()]
def update_user_role(self, user_id: int, role: str) -> Optional[Dict]:
"""Update a user's role (admin only).
@@ -616,19 +626,20 @@ class DatabaseClient:
Returns:
Updated user dict or None
"""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
UPDATE users
SET role = %s, updated_at = CURRENT_TIMESTAMP
WHERE id = %s
RETURNING id, email, role, created_at
""",
(role, user_id),
)
user = cursor.fetchone()
conn.commit()
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
UPDATE users
SET role = %s, updated_at = CURRENT_TIMESTAMP
WHERE id = %s
RETURNING id, email, role, created_at
""",
(role, user_id),
)
user = cursor.fetchone()
self.conn.commit()
return dict(user) if user else None
def delete_user(self, user_id: int) -> bool:
@@ -640,11 +651,12 @@ class DatabaseClient:
Returns:
True if deleted
"""
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
deleted = cursor.rowcount > 0
conn.commit()
self.connect()
with self.conn.cursor() as cursor:
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
deleted = cursor.rowcount > 0
self.conn.commit()
return deleted
def get_user_count(self) -> int:
@@ -653,7 +665,8 @@ class DatabaseClient:
Returns:
Number of users
"""
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM users")
return cursor.fetchone()[0]
self.connect()
with self.conn.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM users")
return cursor.fetchone()[0]
+302
View File
@@ -0,0 +1,302 @@
"""Tests for JWT authentication flow: register, login, protected routes, refresh, admin access."""
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
import pytest
from fastapi.testclient import TestClient
from SPARC.api import app
from SPARC.auth import create_access_token, create_refresh_token
@pytest.fixture
def client():
"""Create test client."""
return TestClient(app)
@pytest.fixture(autouse=True)
def mock_db(monkeypatch):
"""Mock the database client used by auth endpoints.
Returns a MagicMock with all DB methods pre-configured.
"""
db = MagicMock()
# Default: no users exist
db.get_user_count.return_value = 0
db.get_user_by_id.return_value = None
db.get_user_by_email.return_value = None
db.authenticate_user.return_value = None
db.create_user.return_value = None
db.get_all_users.return_value = []
db.update_user_role.return_value = None
db.delete_user.return_value = False
with patch("SPARC.api.get_db_client", return_value=db), \
patch("SPARC.auth.get_db_client", return_value=db):
yield db
def _make_admin_user():
return {
"id": 1,
"email": "admin@test.com",
"role": "admin",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
def _make_regular_user():
return {
"id": 2,
"email": "user@test.com",
"role": "user",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
def _auth_header(user_dict):
"""Create an Authorization header with a valid access token for the given user."""
token = create_access_token(user_dict["id"], user_dict["email"], user_dict["role"])
return {"Authorization": f"Bearer {token}"}
class TestRegister:
"""POST /auth/register"""
def test_register_first_user_becomes_admin(self, client, mock_db):
"""First registered user should get admin role."""
mock_db.get_user_count.return_value = 0
mock_db.create_user.return_value = {
"id": 1,
"email": "admin@test.com",
"role": "admin",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
response = client.post(
"/auth/register",
json={"email": "admin@test.com", "password": "securepass123"},
)
assert response.status_code == 200
data = response.json()
assert data["email"] == "admin@test.com"
assert data["role"] == "admin"
mock_db.create_user.assert_called_once_with(
email="admin@test.com", password="securepass123", role="admin"
)
def test_register_subsequent_user_gets_user_role(self, client, mock_db):
"""Non-first user should get regular user role."""
mock_db.get_user_count.return_value = 1
mock_db.create_user.return_value = _make_regular_user()
response = client.post(
"/auth/register",
json={"email": "user@test.com", "password": "securepass123"},
)
assert response.status_code == 200
data = response.json()
assert data["role"] == "user"
def test_register_duplicate_email_returns_400(self, client, mock_db):
"""Registering with an existing email should return 400."""
mock_db.get_user_count.return_value = 1
mock_db.create_user.return_value = None # indicates duplicate
response = client.post(
"/auth/register",
json={"email": "existing@test.com", "password": "securepass123"},
)
assert response.status_code == 400
assert "already registered" in response.json()["detail"].lower()
class TestLogin:
"""POST /auth/login"""
def test_login_valid_credentials_returns_tokens(self, client, mock_db):
"""Valid credentials should return access and refresh tokens."""
user = _make_regular_user()
mock_db.authenticate_user.return_value = user
response = client.post(
"/auth/login",
json={"email": "user@test.com", "password": "correctpassword"},
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
assert data["token_type"] == "bearer"
def test_login_invalid_credentials_returns_401(self, client, mock_db):
"""Invalid credentials should return 401."""
mock_db.authenticate_user.return_value = None
response = client.post(
"/auth/login",
json={"email": "user@test.com", "password": "wrongpassword"},
)
assert response.status_code == 401
assert "invalid" in response.json()["detail"].lower()
class TestGetMe:
"""GET /auth/me"""
def test_valid_access_token_returns_user(self, client, mock_db):
"""A valid access token should return the user's data."""
user = _make_regular_user()
mock_db.get_user_by_id.return_value = user
response = client.get("/auth/me", headers=_auth_header(user))
assert response.status_code == 200
data = response.json()
assert data["email"] == "user@test.com"
assert data["id"] == 2
def test_missing_token_returns_401(self, client):
"""No token should return 401 (403 from HTTPBearer)."""
response = client.get("/auth/me")
assert response.status_code in (401, 403)
def test_expired_token_returns_401(self, client, mock_db):
"""An expired token should return 401."""
# Create a token that has already expired
from datetime import timedelta
import jwt as pyjwt
from SPARC.auth import JWT_ALGORITHM, JWT_SECRET
payload = {
"sub": "1",
"email": "user@test.com",
"role": "user",
"exp": datetime.now(timezone.utc) - timedelta(hours=1),
"type": "access",
}
expired_token = pyjwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
response = client.get(
"/auth/me", headers={"Authorization": f"Bearer {expired_token}"}
)
assert response.status_code == 401
def test_refresh_token_as_access_returns_401(self, client, mock_db):
"""Using a refresh token as an access token should return 401."""
user = _make_regular_user()
refresh_token = create_refresh_token(user["id"], user["email"], user["role"])
response = client.get(
"/auth/me", headers={"Authorization": f"Bearer {refresh_token}"}
)
assert response.status_code == 401
class TestRefreshToken:
"""POST /auth/refresh"""
def test_valid_refresh_token_returns_new_tokens(self, client, mock_db):
"""A valid refresh token should issue new access and refresh tokens."""
user = _make_regular_user()
mock_db.get_user_by_id.return_value = user
refresh = create_refresh_token(user["id"], user["email"], user["role"])
response = client.post(
"/auth/refresh", json={"refresh_token": refresh}
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
def test_invalid_refresh_token_returns_401(self, client, mock_db):
"""An invalid refresh token should return 401."""
response = client.post(
"/auth/refresh", json={"refresh_token": "invalid-token-string"}
)
assert response.status_code == 401
def test_access_token_as_refresh_returns_401(self, client, mock_db):
"""Using an access token as a refresh token should return 401."""
user = _make_regular_user()
access = create_access_token(user["id"], user["email"], user["role"])
response = client.post(
"/auth/refresh", json={"refresh_token": access}
)
assert response.status_code == 401
class TestAdminUsers:
"""GET /admin/users and PATCH /admin/users/{id}/role"""
def test_admin_can_list_users(self, client, mock_db):
"""Admin token should allow listing users."""
admin = _make_admin_user()
mock_db.get_user_by_id.return_value = admin
mock_db.get_all_users.return_value = [admin, _make_regular_user()]
response = client.get("/admin/users", headers=_auth_header(admin))
assert response.status_code == 200
data = response.json()
assert len(data) == 2
def test_regular_user_cannot_list_users(self, client, mock_db):
"""Regular user token should be rejected with 403."""
user = _make_regular_user()
mock_db.get_user_by_id.return_value = user
response = client.get("/admin/users", headers=_auth_header(user))
assert response.status_code == 403
def test_no_token_cannot_list_users(self, client):
"""No token should be rejected."""
response = client.get("/admin/users")
assert response.status_code in (401, 403)
def test_admin_can_change_user_role(self, client, mock_db):
"""Admin should be able to change another user's role."""
admin = _make_admin_user()
mock_db.get_user_by_id.return_value = admin
mock_db.update_user_role.return_value = {
"id": 2,
"email": "user@test.com",
"role": "admin",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
response = client.patch(
"/admin/users/2/role",
json={"role": "admin"},
headers=_auth_header(admin),
)
assert response.status_code == 200
assert response.json()["role"] == "admin"
def test_admin_cannot_change_own_role(self, client, mock_db):
"""Admin should not be able to change their own role."""
admin = _make_admin_user()
mock_db.get_user_by_id.return_value = admin
response = client.patch(
"/admin/users/1/role",
json={"role": "user"},
headers=_auth_header(admin),
)
assert response.status_code == 400
assert "own role" in response.json()["detail"].lower()