Compare commits

..

1 Commits

Author SHA1 Message Date
agent-company ae9f257dcb test(auth): add comprehensive JWT authentication test suite
Add 17 tests in tests/test_auth.py covering all auth flows:
- Registration: first user admin, subsequent user, duplicate email
- Login: valid credentials, invalid credentials
- Protected routes: valid token, missing token, expired token, wrong token type
- Token refresh: valid refresh, invalid refresh, access-as-refresh rejected
- Admin endpoints: list users, change role, own-role prevention, permission checks

All tests use mocked database (no live DB required).

Closes leeworks-agents/SPARC#10

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 04:24:12 +00:00
5 changed files with 329 additions and 47 deletions
+17 -21
View File
@@ -5,13 +5,10 @@ to provide company performance estimation based on patent portfolios.
""" """
import hashlib import hashlib
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable from typing import Callable
from SPARC import config from SPARC import config
logger = logging.getLogger(__name__)
from SPARC.database import DatabaseClient from SPARC.database import DatabaseClient
from SPARC.serp_api import SERP from SPARC.serp_api import SERP
from SPARC.llm import LLMAnalyzer from SPARC.llm import LLMAnalyzer
@@ -55,13 +52,13 @@ class CompanyAnalyzer:
query_hash = hashlib.sha256(company_name.lower().encode()).hexdigest() query_hash = hashlib.sha256(company_name.lower().encode()).hexdigest()
cached_ids = self.db.get_cached_serp_query(query_hash) cached_ids = self.db.get_cached_serp_query(query_hash)
if cached_ids is not None: if cached_ids is not None:
logger.info("Using cached SERP results for %s (%d patents)", company_name, len(cached_ids)) print(f"Using cached SERP results for {company_name} ({len(cached_ids)} patents)")
patents = Patents(patents=[ patents = Patents(patents=[
Patent(patent_id=pid, pdf_link="") Patent(patent_id=pid, pdf_link="")
for pid in cached_ids for pid in cached_ids
]) ])
else: else:
logger.info("Retrieving patents for %s...", company_name) print(f"Retrieving patents for {company_name}...")
patents = SERP.query(company_name) patents = SERP.query(company_name)
# Cache the SERP results # Cache the SERP results
if patents.patents: if patents.patents:
@@ -69,13 +66,12 @@ class CompanyAnalyzer:
company_name=company_name, company_name=company_name,
query_hash=query_hash, query_hash=query_hash,
patent_ids=[p.patent_id for p in patents.patents], patent_ids=[p.patent_id for p in patents.patents],
ttl_hours=config.serp_cache_ttl_hours,
) )
if not patents.patents: if not patents.patents:
return f"No patents found for {company_name}" return f"No patents found for {company_name}"
logger.info("Found %d patents. Processing...", len(patents.patents)) print(f"Found {len(patents.patents)} patents. Processing...")
# Download, parse, and minimize patents in parallel # Download, parse, and minimize patents in parallel
processed_patents = [] processed_patents = []
@@ -91,12 +87,12 @@ class CompanyAnalyzer:
if result: if result:
processed_patents.append(result) processed_patents.append(result)
except Exception as e: except Exception as e:
logger.warning("Failed to process %s: %s", patent.patent_id, e) print(f"Warning: Failed to process {patent.patent_id}: {e}")
if not processed_patents: if not processed_patents:
return f"Failed to process any patents for {company_name}" return f"Failed to process any patents for {company_name}"
logger.info("Analyzing portfolio with LLM...") print(f"Analyzing portfolio with LLM...")
# Analyze the full portfolio with LLM # Analyze the full portfolio with LLM
analysis = self.llm_analyzer.analyze_patent_portfolio( analysis = self.llm_analyzer.analyze_patent_portfolio(
@@ -119,7 +115,7 @@ class CompanyAnalyzer:
""" """
# Note: This simplified version assumes the patent PDF is already downloaded # Note: This simplified version assumes the patent PDF is already downloaded
# A more complete implementation would support direct patent ID lookup # A more complete implementation would support direct patent ID lookup
logger.info("Analyzing patent %s for %s...", patent_id, company_name) print(f"Analyzing patent {patent_id} for {company_name}...")
patent_path = f"patents/{patent_id}.pdf" patent_path = f"patents/{patent_id}.pdf"
@@ -173,7 +169,7 @@ class CompanyAnalyzer:
return {"patent_id": patent.patent_id, "content": minimized_content} return {"patent_id": patent.patent_id, "content": minimized_content}
except Exception as e: except Exception as e:
logger.warning("Failed to process %s: %s", patent.patent_id, e) print(f"Warning: Failed to process {patent.patent_id}: {e}")
return None return None
def _analyze_company_safe(self, company_name: str) -> CompanyAnalysisResult: def _analyze_company_safe(self, company_name: str) -> CompanyAnalysisResult:
@@ -244,7 +240,7 @@ class CompanyAnalyzer:
results: list[CompanyAnalysisResult] = [] results: list[CompanyAnalysisResult] = []
total = len(companies) total = len(companies)
logger.info("Starting batch analysis of %d companies...", total) print(f"Starting batch analysis of {total} companies...")
with ThreadPoolExecutor(max_workers=max_workers) as executor: with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_company = { future_to_company = {
@@ -261,8 +257,8 @@ class CompanyAnalyzer:
result = future.result() result = future.result()
results.append(result) results.append(result)
status = "OK" if result.success else "FAIL" status = "" if result.success else ""
logger.info("[%d/%d] %s %s", completed, total, status, company) print(f"[{completed}/{total}] {status} {company}")
if progress_callback: if progress_callback:
progress_callback(company, completed, total) progress_callback(company, completed, total)
@@ -277,12 +273,12 @@ class CompanyAnalyzer:
error=str(e), error=str(e),
) )
) )
logger.error("[%d/%d] FAIL %s: %s", completed, total, company, e) print(f"[{completed}/{total}] ✗ {company}: {e}")
successful = sum(1 for r in results if r.success) successful = sum(1 for r in results if r.success)
failed = total - successful failed = total - successful
logger.info("Batch complete: %d succeeded, %d failed", successful, failed) print(f"\nBatch complete: {successful} succeeded, {failed} failed")
return BatchAnalysisResult( return BatchAnalysisResult(
results=results, results=results,
@@ -308,20 +304,20 @@ class CompanyAnalyzer:
results: list[CompanyAnalysisResult] = [] results: list[CompanyAnalysisResult] = []
total = len(companies) total = len(companies)
logger.info("Starting sequential analysis of %d companies...", total) print(f"Starting sequential analysis of {total} companies...")
for idx, company in enumerate(companies, 1): for idx, company in enumerate(companies, 1):
logger.info("[%d/%d] Analyzing %s...", idx, total, company) print(f"\n[{idx}/{total}] Analyzing {company}...")
result = self._analyze_company_safe(company) result = self._analyze_company_safe(company)
results.append(result) results.append(result)
status = "OK" if result.success else "FAIL" status = "" if result.success else ""
logger.info("[%d/%d] %s %s", idx, total, status, company) print(f"[{idx}/{total}] {status} {company}")
successful = sum(1 for r in results if r.success) successful = sum(1 for r in results if r.success)
failed = total - successful failed = total - successful
logger.info("Batch complete: %d succeeded, %d failed", successful, failed) print(f"\nBatch complete: {successful} succeeded, {failed} failed")
return BatchAnalysisResult( return BatchAnalysisResult(
results=results, results=results,
+1 -16
View File
@@ -2,20 +2,11 @@
Loads environment variables from .env file for API keys and other secrets. Loads environment variables from .env file for API keys and other secrets.
""" """
import logging from dotenv import load_dotenv
import os import os
from dotenv import load_dotenv
load_dotenv() load_dotenv()
# Logging configuration
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=getattr(logging, log_level, logging.INFO),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
# SerpAPI key for patent search # SerpAPI key for patent search
api_key = os.getenv("API_KEY") api_key = os.getenv("API_KEY")
@@ -39,12 +30,6 @@ use_database = os.getenv("USE_DATABASE", "false").lower() in ("true", "1", "yes"
patent_search_days = int(os.getenv("PATENT_SEARCH_DAYS", "90")) patent_search_days = int(os.getenv("PATENT_SEARCH_DAYS", "90"))
patent_thread_workers = int(os.getenv("PATENT_THREAD_WORKERS", "5")) patent_thread_workers = int(os.getenv("PATENT_THREAD_WORKERS", "5"))
# LLM model to use via OpenRouter (e.g. "anthropic/claude-3.5-sonnet", "openai/gpt-4o")
model = os.getenv("MODEL", "anthropic/claude-3.5-sonnet")
# SERP cache TTL in hours (how long cached search results are considered fresh)
serp_cache_ttl_hours = int(os.getenv("SERP_CACHE_TTL_HOURS", "24"))
# Root path for running behind a reverse proxy (e.g., "/api" when served at /api/) # Root path for running behind a reverse proxy (e.g., "/api" when served at /api/)
# This ensures OpenAPI docs work correctly when accessed via the proxy # This ensures OpenAPI docs work correctly when accessed via the proxy
root_path = os.getenv("ROOT_PATH", "") root_path = os.getenv("ROOT_PATH", "")
+8 -9
View File
@@ -1,14 +1,9 @@
"""LLM integration for patent analysis using OpenRouter.""" """LLM integration for patent analysis using OpenRouter."""
import logging
from typing import Dict
from openai import OpenAI from openai import OpenAI
from SPARC import config from SPARC import config
from SPARC.database import DatabaseClient from SPARC.database import DatabaseClient
from typing import Dict
logger = logging.getLogger(__name__)
class LLMAnalyzer: class LLMAnalyzer:
@@ -25,7 +20,7 @@ class LLMAnalyzer:
""" """
self.test_mode = test_mode self.test_mode = test_mode
self.use_cache = use_cache if use_cache is not None else config.use_cache self.use_cache = use_cache if use_cache is not None else config.use_cache
self.model = config.model self.model = "anthropic/claude-3.5-sonnet"
# Always initialize database client for storage and caching # Always initialize database client for storage and caching
self.db_client = DatabaseClient(config.database_url) self.db_client = DatabaseClient(config.database_url)
@@ -64,7 +59,11 @@ Patent Content:
Provide a concise analysis (2-3 paragraphs) focusing on what this patent reveals about the company's technical direction and competitive advantage.""" Provide a concise analysis (2-3 paragraphs) focusing on what this patent reveals about the company's technical direction and competitive advantage."""
if self.test_mode: if self.test_mode:
logger.debug("TEST MODE - Prompt that would be sent to LLM:\n%s", prompt) print("=" * 80)
print("TEST MODE - Prompt that would be sent to LLM:")
print("=" * 80)
print(prompt)
print("=" * 80)
return "[TEST MODE - No API call made]" return "[TEST MODE - No API call made]"
# Check cache first # Check cache first
@@ -166,7 +165,7 @@ Patent Portfolio:
Provide a comprehensive analysis (4-5 paragraphs) with a final verdict on the company's innovation strength and performance outlook.""" Provide a comprehensive analysis (4-5 paragraphs) with a final verdict on the company's innovation strength and performance outlook."""
if self.test_mode: if self.test_mode:
logger.debug("TEST MODE - Portfolio prompt:\n%s", prompt) print(prompt)
return "[TEST MODE]" return "[TEST MODE]"
metadata = { metadata = {
+1 -1
View File
@@ -4,7 +4,7 @@ from datetime import datetime
@dataclass @dataclass
class Patent: class Patent:
patent_id: str patent_id: int
pdf_link: str pdf_link: str
pdf_path: str | None = None pdf_path: str | None = None
summary: dict | None = None summary: dict | None = None
+302
View File
@@ -0,0 +1,302 @@
"""Tests for JWT authentication flow: register, login, protected routes, refresh, admin access."""
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
import pytest
from fastapi.testclient import TestClient
from SPARC.api import app
from SPARC.auth import create_access_token, create_refresh_token
@pytest.fixture
def client():
"""Create test client."""
return TestClient(app)
@pytest.fixture(autouse=True)
def mock_db(monkeypatch):
"""Mock the database client used by auth endpoints.
Returns a MagicMock with all DB methods pre-configured.
"""
db = MagicMock()
# Default: no users exist
db.get_user_count.return_value = 0
db.get_user_by_id.return_value = None
db.get_user_by_email.return_value = None
db.authenticate_user.return_value = None
db.create_user.return_value = None
db.get_all_users.return_value = []
db.update_user_role.return_value = None
db.delete_user.return_value = False
with patch("SPARC.api.get_db_client", return_value=db), \
patch("SPARC.auth.get_db_client", return_value=db):
yield db
def _make_admin_user():
return {
"id": 1,
"email": "admin@test.com",
"role": "admin",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
def _make_regular_user():
return {
"id": 2,
"email": "user@test.com",
"role": "user",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
def _auth_header(user_dict):
"""Create an Authorization header with a valid access token for the given user."""
token = create_access_token(user_dict["id"], user_dict["email"], user_dict["role"])
return {"Authorization": f"Bearer {token}"}
class TestRegister:
"""POST /auth/register"""
def test_register_first_user_becomes_admin(self, client, mock_db):
"""First registered user should get admin role."""
mock_db.get_user_count.return_value = 0
mock_db.create_user.return_value = {
"id": 1,
"email": "admin@test.com",
"role": "admin",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
response = client.post(
"/auth/register",
json={"email": "admin@test.com", "password": "securepass123"},
)
assert response.status_code == 200
data = response.json()
assert data["email"] == "admin@test.com"
assert data["role"] == "admin"
mock_db.create_user.assert_called_once_with(
email="admin@test.com", password="securepass123", role="admin"
)
def test_register_subsequent_user_gets_user_role(self, client, mock_db):
"""Non-first user should get regular user role."""
mock_db.get_user_count.return_value = 1
mock_db.create_user.return_value = _make_regular_user()
response = client.post(
"/auth/register",
json={"email": "user@test.com", "password": "securepass123"},
)
assert response.status_code == 200
data = response.json()
assert data["role"] == "user"
def test_register_duplicate_email_returns_400(self, client, mock_db):
"""Registering with an existing email should return 400."""
mock_db.get_user_count.return_value = 1
mock_db.create_user.return_value = None # indicates duplicate
response = client.post(
"/auth/register",
json={"email": "existing@test.com", "password": "securepass123"},
)
assert response.status_code == 400
assert "already registered" in response.json()["detail"].lower()
class TestLogin:
"""POST /auth/login"""
def test_login_valid_credentials_returns_tokens(self, client, mock_db):
"""Valid credentials should return access and refresh tokens."""
user = _make_regular_user()
mock_db.authenticate_user.return_value = user
response = client.post(
"/auth/login",
json={"email": "user@test.com", "password": "correctpassword"},
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
assert data["token_type"] == "bearer"
def test_login_invalid_credentials_returns_401(self, client, mock_db):
"""Invalid credentials should return 401."""
mock_db.authenticate_user.return_value = None
response = client.post(
"/auth/login",
json={"email": "user@test.com", "password": "wrongpassword"},
)
assert response.status_code == 401
assert "invalid" in response.json()["detail"].lower()
class TestGetMe:
"""GET /auth/me"""
def test_valid_access_token_returns_user(self, client, mock_db):
"""A valid access token should return the user's data."""
user = _make_regular_user()
mock_db.get_user_by_id.return_value = user
response = client.get("/auth/me", headers=_auth_header(user))
assert response.status_code == 200
data = response.json()
assert data["email"] == "user@test.com"
assert data["id"] == 2
def test_missing_token_returns_401(self, client):
"""No token should return 401 (403 from HTTPBearer)."""
response = client.get("/auth/me")
assert response.status_code in (401, 403)
def test_expired_token_returns_401(self, client, mock_db):
"""An expired token should return 401."""
# Create a token that has already expired
from datetime import timedelta
import jwt as pyjwt
from SPARC.auth import JWT_ALGORITHM, JWT_SECRET
payload = {
"sub": "1",
"email": "user@test.com",
"role": "user",
"exp": datetime.now(timezone.utc) - timedelta(hours=1),
"type": "access",
}
expired_token = pyjwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
response = client.get(
"/auth/me", headers={"Authorization": f"Bearer {expired_token}"}
)
assert response.status_code == 401
def test_refresh_token_as_access_returns_401(self, client, mock_db):
"""Using a refresh token as an access token should return 401."""
user = _make_regular_user()
refresh_token = create_refresh_token(user["id"], user["email"], user["role"])
response = client.get(
"/auth/me", headers={"Authorization": f"Bearer {refresh_token}"}
)
assert response.status_code == 401
class TestRefreshToken:
"""POST /auth/refresh"""
def test_valid_refresh_token_returns_new_tokens(self, client, mock_db):
"""A valid refresh token should issue new access and refresh tokens."""
user = _make_regular_user()
mock_db.get_user_by_id.return_value = user
refresh = create_refresh_token(user["id"], user["email"], user["role"])
response = client.post(
"/auth/refresh", json={"refresh_token": refresh}
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
def test_invalid_refresh_token_returns_401(self, client, mock_db):
"""An invalid refresh token should return 401."""
response = client.post(
"/auth/refresh", json={"refresh_token": "invalid-token-string"}
)
assert response.status_code == 401
def test_access_token_as_refresh_returns_401(self, client, mock_db):
"""Using an access token as a refresh token should return 401."""
user = _make_regular_user()
access = create_access_token(user["id"], user["email"], user["role"])
response = client.post(
"/auth/refresh", json={"refresh_token": access}
)
assert response.status_code == 401
class TestAdminUsers:
"""GET /admin/users and PATCH /admin/users/{id}/role"""
def test_admin_can_list_users(self, client, mock_db):
"""Admin token should allow listing users."""
admin = _make_admin_user()
mock_db.get_user_by_id.return_value = admin
mock_db.get_all_users.return_value = [admin, _make_regular_user()]
response = client.get("/admin/users", headers=_auth_header(admin))
assert response.status_code == 200
data = response.json()
assert len(data) == 2
def test_regular_user_cannot_list_users(self, client, mock_db):
"""Regular user token should be rejected with 403."""
user = _make_regular_user()
mock_db.get_user_by_id.return_value = user
response = client.get("/admin/users", headers=_auth_header(user))
assert response.status_code == 403
def test_no_token_cannot_list_users(self, client):
"""No token should be rejected."""
response = client.get("/admin/users")
assert response.status_code in (401, 403)
def test_admin_can_change_user_role(self, client, mock_db):
"""Admin should be able to change another user's role."""
admin = _make_admin_user()
mock_db.get_user_by_id.return_value = admin
mock_db.update_user_role.return_value = {
"id": 2,
"email": "user@test.com",
"role": "admin",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
response = client.patch(
"/admin/users/2/role",
json={"role": "admin"},
headers=_auth_header(admin),
)
assert response.status_code == 200
assert response.json()["role"] == "admin"
def test_admin_cannot_change_own_role(self, client, mock_db):
"""Admin should not be able to change their own role."""
admin = _make_admin_user()
mock_db.get_user_by_id.return_value = admin
response = client.patch(
"/admin/users/1/role",
json={"role": "user"},
headers=_auth_header(admin),
)
assert response.status_code == 400
assert "own role" in response.json()["detail"].lower()