"""Tests for JWT authentication flow: register, login, protected routes, refresh, admin access."""

from datetime import datetime, timezone
from unittest.mock import MagicMock, patch

import pytest
from fastapi.testclient import TestClient

from SPARC.api import app
from SPARC.auth import create_access_token, create_refresh_token


@pytest.fixture
def client():
    """Create a test client bound to the application under test."""
    return TestClient(app)


@pytest.fixture(autouse=True)
def mock_db():
    """Mock the database client used by the API and auth modules.

    Yields a MagicMock whose DB methods are pre-configured to behave as an
    empty user table. ``get_db_client`` is patched in both ``SPARC.api`` and
    ``SPARC.auth`` so that every lookup made while a test runs receives the
    same mock; the patches are undone when the test finishes.
    """
    db = MagicMock()
    # Default: no users exist, every lookup misses, every mutation is a no-op.
    db.get_user_count.return_value = 0
    db.get_user_by_id.return_value = None
    db.get_user_by_email.return_value = None
    db.authenticate_user.return_value = None
    db.create_user.return_value = None
    db.get_all_users.return_value = []
    db.update_user_role.return_value = None
    db.delete_user.return_value = False

    with patch("SPARC.api.get_db_client", return_value=db), \
            patch("SPARC.auth.get_db_client", return_value=db):
        yield db


def _make_admin_user():
    """Return a user record dict with id 1 and the ``admin`` role."""
    return {
        "id": 1,
        "email": "admin@test.com",
        "role": "admin",
        "created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
    }


def _make_regular_user():
    """Return a user record dict with id 2 and the ``user`` role."""
    return {
        "id": 2,
        "email": "user@test.com",
        "role": "user",
        "created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
    }


def _auth_header(user_dict):
    """Create an Authorization header with a valid access token for the given user."""
    token = create_access_token(
        user_dict["id"], user_dict["email"], user_dict["role"]
    )
    return {"Authorization": f"Bearer {token}"}


class TestRegister:
    """POST /auth/register"""

    def test_register_first_user_becomes_admin(self, client, mock_db):
        """First registered user should get admin role."""
        mock_db.get_user_count.return_value = 0
        mock_db.create_user.return_value = _make_admin_user()

        response = client.post(
            "/auth/register",
            json={"email": "admin@test.com", "password": "securepass123"},
        )

        assert response.status_code == 200
        data = response.json()
        assert data["email"] == "admin@test.com"
        assert data["role"] == "admin"
        mock_db.create_user.assert_called_once_with(
            email="admin@test.com", password="securepass123", role="admin"
        )

    def test_register_subsequent_user_gets_user_role(self, client, mock_db):
        """Non-first user should get regular user role."""
        mock_db.get_user_count.return_value = 1
        mock_db.create_user.return_value = _make_regular_user()

        response = client.post(
            "/auth/register",
            json={"email": "user@test.com", "password": "securepass123"},
        )

        assert response.status_code == 200
        data = response.json()
        assert data["role"] == "user"
        # The response role only echoes the mocked return value, so also pin
        # the role the endpoint actually asked the database to store.
        mock_db.create_user.assert_called_once_with(
            email="user@test.com", password="securepass123", role="user"
        )

    def test_register_duplicate_email_returns_400(self, client, mock_db):
        """Registering with an existing email should return 400."""
        mock_db.get_user_count.return_value = 1
        mock_db.create_user.return_value = None  # indicates duplicate

        response = client.post(
            "/auth/register",
            json={"email": "existing@test.com", "password": "securepass123"},
        )

        assert response.status_code == 400
        assert "already registered" in response.json()["detail"].lower()


class TestLogin:
    """POST /auth/login"""

    def test_login_valid_credentials_returns_tokens(self, client, mock_db):
        """Valid credentials should return access and refresh tokens."""
        user = _make_regular_user()
        mock_db.authenticate_user.return_value = user

        response = client.post(
            "/auth/login",
            json={"email": "user@test.com", "password": "correctpassword"},
        )

        assert response.status_code == 200
        data = response.json()
        assert "access_token" in data
        assert "refresh_token" in data
        assert data["token_type"] == "bearer"

    def test_login_invalid_credentials_returns_401(self, client, mock_db):
        """Invalid credentials should return 401."""
        mock_db.authenticate_user.return_value = None

        response = client.post(
            "/auth/login",
            json={"email": "user@test.com", "password": "wrongpassword"},
        )

        assert response.status_code == 401
        assert "invalid" in response.json()["detail"].lower()


class TestGetMe:
    """Tests for GET /auth/me."""

    def test_valid_access_token_returns_user(self, client, mock_db):
        """A request carrying a valid access token gets the user's record back."""
        account = _make_regular_user()
        mock_db.get_user_by_id.return_value = account

        resp = client.get("/auth/me", headers=_auth_header(account))

        assert resp.status_code == 200
        body = resp.json()
        assert body["email"] == "user@test.com"
        assert body["id"] == 2

    def test_missing_token_returns_401(self, client):
        """A request without a token is rejected; 401 or 403 is accepted."""
        resp = client.get("/auth/me")

        assert resp.status_code in (401, 403)

    def test_expired_token_returns_401(self, client, mock_db):
        """An access token whose expiry lies in the past yields 401."""
        from datetime import timedelta

        import jwt as pyjwt

        from SPARC.auth import JWT_ALGORITHM, JWT_SECRET

        # Hand-build a token that expired one hour ago.
        one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
        claims = {
            "sub": "1",
            "email": "user@test.com",
            "role": "user",
            "exp": one_hour_ago,
            "type": "access",
        }
        stale_token = pyjwt.encode(claims, JWT_SECRET, algorithm=JWT_ALGORITHM)
        headers = {"Authorization": f"Bearer {stale_token}"}

        resp = client.get("/auth/me", headers=headers)

        assert resp.status_code == 401

    def test_refresh_token_as_access_returns_401(self, client, mock_db):
        """Presenting a refresh token where an access token is expected yields 401."""
        account = _make_regular_user()
        wrong_kind = create_refresh_token(
            account["id"], account["email"], account["role"]
        )
        headers = {"Authorization": f"Bearer {wrong_kind}"}

        resp = client.get("/auth/me", headers=headers)

        assert resp.status_code == 401


class TestRefreshToken:
    """Tests for POST /auth/refresh."""

    def test_valid_refresh_token_returns_new_tokens(self, client, mock_db):
        """A valid refresh token is exchanged for an access and a refresh token."""
        account = _make_regular_user()
        mock_db.get_user_by_id.return_value = account
        token = create_refresh_token(
            account["id"], account["email"], account["role"]
        )

        resp = client.post("/auth/refresh", json={"refresh_token": token})

        assert resp.status_code == 200
        body = resp.json()
        assert "access_token" in body
        assert "refresh_token" in body

    def test_invalid_refresh_token_returns_401(self, client, mock_db):
        """A string that is not a valid token yields 401."""
        payload = {"refresh_token": "invalid-token-string"}

        resp = client.post("/auth/refresh", json=payload)

        assert resp.status_code == 401

    def test_access_token_as_refresh_returns_401(self, client, mock_db):
        """Presenting an access token where a refresh token is expected yields 401."""
        account = _make_regular_user()
        wrong_kind = create_access_token(
            account["id"], account["email"], account["role"]
        )

        resp = client.post("/auth/refresh", json={"refresh_token": wrong_kind})

        assert resp.status_code == 401


class TestAdminUsers:
    """Tests for GET /admin/users and PATCH /admin/users/{id}/role."""

    def test_admin_can_list_users(self, client, mock_db):
        """An admin token is allowed to list all users."""
        root = _make_admin_user()
        mock_db.get_user_by_id.return_value = root
        mock_db.get_all_users.return_value = [root, _make_regular_user()]

        resp = client.get("/admin/users", headers=_auth_header(root))

        assert resp.status_code == 200
        listing = resp.json()
        assert len(listing) == 2

    def test_regular_user_cannot_list_users(self, client, mock_db):
        """A regular user's token is rejected with 403."""
        account = _make_regular_user()
        mock_db.get_user_by_id.return_value = account

        resp = client.get("/admin/users", headers=_auth_header(account))

        assert resp.status_code == 403

    def test_no_token_cannot_list_users(self, client):
        """A request without a token is rejected; 401 or 403 is accepted."""
        resp = client.get("/admin/users")

        assert resp.status_code in (401, 403)

    def test_admin_can_change_user_role(self, client, mock_db):
        """An admin can change another user's role."""
        root = _make_admin_user()
        mock_db.get_user_by_id.return_value = root
        # Same record as the regular user, but with the role promoted.
        promoted = {**_make_regular_user(), "role": "admin"}
        mock_db.update_user_role.return_value = promoted

        resp = client.patch(
            "/admin/users/2/role",
            json={"role": "admin"},
            headers=_auth_header(root),
        )

        assert resp.status_code == 200
        assert resp.json()["role"] == "admin"

    def test_admin_cannot_change_own_role(self, client, mock_db):
        """An admin changing their own role is refused with 400."""
        root = _make_admin_user()
        mock_db.get_user_by_id.return_value = root

        # The target id in the path (1) is the admin's own id.
        resp = client.patch(
            "/admin/users/1/role",
            json={"role": "user"},
            headers=_auth_header(root),
        )

        assert resp.status_code == 400
        assert "own role" in resp.json()["detail"].lower()