"""Tests for JWT authentication flow: register, login, protected routes, refresh, admin access. Covers all five scenarios required by issue #1624: 1. Registration (POST /auth/register) 2. Login (POST /auth/login) 3. Protected route access (GET /auth/me) -- valid, missing, expired, wrong-type tokens 4. Token refresh (POST /auth/refresh) 5. Admin-only endpoints (GET /admin/users, PATCH role, DELETE user) All tests use mocked DB fixtures and require no live database. """ from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock, patch import jwt as pyjwt import pytest from fastapi.testclient import TestClient from SPARC.api import app from SPARC.auth import ( JWT_ALGORITHM, JWT_SECRET, create_access_token, create_refresh_token, ) @pytest.fixture def client(): """Create test client.""" return TestClient(app) @pytest.fixture(autouse=True) def mock_db(monkeypatch): """Mock the database client used by auth endpoints. Returns a MagicMock with all DB methods pre-configured. """ db = MagicMock() # Default: no users exist db.get_user_count.return_value = 0 db.get_user_by_id.return_value = None db.get_user_by_email.return_value = None db.authenticate_user.return_value = None db.create_user.return_value = None db.get_all_users.return_value = [] db.update_user_role.return_value = None db.delete_user.return_value = False with patch("SPARC.api.get_db_client", return_value=db), \ patch("SPARC.auth.get_db_client", return_value=db): yield db def _make_admin_user(): return { "id": 1, "email": "admin@test.com", "role": "admin", "created_at": datetime(2025, 1, 1, tzinfo=timezone.utc), } def _make_regular_user(): return { "id": 2, "email": "user@test.com", "role": "user", "created_at": datetime(2025, 1, 1, tzinfo=timezone.utc), } def _auth_header(user_dict): """Create an Authorization header with a valid access token for the given user.""" token = create_access_token(user_dict["id"], user_dict["email"], user_dict["role"]) return {"Authorization": f"Bearer {token}"} class TestRegister: """POST /auth/register""" def test_register_first_user_becomes_admin(self, client, mock_db): """First registered user should get admin role.""" mock_db.get_user_count.return_value = 0 mock_db.create_user.return_value = { "id": 1, "email": "admin@test.com", "role": "admin", "created_at": datetime(2025, 1, 1, tzinfo=timezone.utc), } response = client.post( "/auth/register", json={"email": "admin@test.com", "password": "securepass123"}, ) assert response.status_code == 200 data = response.json() assert data["email"] == "admin@test.com" assert data["role"] == "admin" mock_db.create_user.assert_called_once_with( email="admin@test.com", password="securepass123", role="admin" ) def test_register_subsequent_user_gets_user_role(self, client, mock_db): """Non-first user should get regular user role.""" mock_db.get_user_count.return_value = 1 mock_db.create_user.return_value = _make_regular_user() response = client.post( "/auth/register", json={"email": "user@test.com", "password": "securepass123"}, ) assert response.status_code == 200 data = response.json() assert data["role"] == "user" def test_register_duplicate_email_returns_400(self, client, mock_db): """Registering with an existing email should return 400.""" mock_db.get_user_count.return_value = 1 mock_db.create_user.return_value = None # indicates duplicate response = client.post( "/auth/register", json={"email": "existing@test.com", "password": "securepass123"}, ) assert response.status_code == 400 assert "already registered" in response.json()["detail"].lower() class TestLogin: """POST /auth/login""" def test_login_valid_credentials_returns_tokens(self, client, mock_db): """Valid credentials should return access and refresh tokens.""" user = _make_regular_user() mock_db.authenticate_user.return_value = user response = client.post( "/auth/login", json={"email": "user@test.com", "password": "correctpassword"}, ) assert response.status_code == 200 data = response.json() assert "access_token" in data assert "refresh_token" in data assert data["token_type"] == "bearer" def test_login_invalid_credentials_returns_401(self, client, mock_db): """Invalid credentials should return 401.""" mock_db.authenticate_user.return_value = None response = client.post( "/auth/login", json={"email": "user@test.com", "password": "wrongpassword"}, ) assert response.status_code == 401 assert "invalid" in response.json()["detail"].lower() class TestGetMe: """GET /auth/me""" def test_valid_access_token_returns_user(self, client, mock_db): """A valid access token should return the user's data.""" user = _make_regular_user() mock_db.get_user_by_id.return_value = user response = client.get("/auth/me", headers=_auth_header(user)) assert response.status_code == 200 data = response.json() assert data["email"] == "user@test.com" assert data["id"] == 2 def test_missing_token_returns_401(self, client): """No token should return 401 (403 from HTTPBearer).""" response = client.get("/auth/me") assert response.status_code in (401, 403) def test_expired_token_returns_401(self, client, mock_db): """An expired token should return 401.""" payload = { "sub": "1", "email": "user@test.com", "role": "user", "exp": datetime.now(timezone.utc) - timedelta(hours=1), "type": "access", } expired_token = pyjwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM) response = client.get( "/auth/me", headers={"Authorization": f"Bearer {expired_token}"} ) assert response.status_code == 401 def test_refresh_token_as_access_returns_401(self, client, mock_db): """Using a refresh token as an access token should return 401.""" user = _make_regular_user() refresh_token = create_refresh_token(user["id"], user["email"], user["role"]) response = client.get( "/auth/me", headers={"Authorization": f"Bearer {refresh_token}"} ) assert response.status_code == 401 class TestRefreshToken: """POST /auth/refresh""" def test_valid_refresh_token_returns_new_tokens(self, client, mock_db): """A valid refresh token should issue new access and refresh tokens.""" user = _make_regular_user() mock_db.get_user_by_id.return_value = user refresh = create_refresh_token(user["id"], user["email"], user["role"]) response = client.post( "/auth/refresh", json={"refresh_token": refresh} ) assert response.status_code == 200 data = response.json() assert "access_token" in data assert "refresh_token" in data def test_invalid_refresh_token_returns_401(self, client, mock_db): """An invalid refresh token should return 401.""" response = client.post( "/auth/refresh", json={"refresh_token": "invalid-token-string"} ) assert response.status_code == 401 def test_access_token_as_refresh_returns_401(self, client, mock_db): """Using an access token as a refresh token should return 401.""" user = _make_regular_user() access = create_access_token(user["id"], user["email"], user["role"]) response = client.post( "/auth/refresh", json={"refresh_token": access} ) assert response.status_code == 401 class TestAdminUsers: """GET /admin/users and PATCH /admin/users/{id}/role""" def test_admin_can_list_users(self, client, mock_db): """Admin token should allow listing users.""" admin = _make_admin_user() mock_db.get_user_by_id.return_value = admin mock_db.get_all_users.return_value = [admin, _make_regular_user()] response = client.get("/admin/users", headers=_auth_header(admin)) assert response.status_code == 200 data = response.json() assert len(data) == 2 def test_regular_user_cannot_list_users(self, client, mock_db): """Regular user token should be rejected with 403.""" user = _make_regular_user() mock_db.get_user_by_id.return_value = user response = client.get("/admin/users", headers=_auth_header(user)) assert response.status_code == 403 def test_no_token_cannot_list_users(self, client): """No token should be rejected.""" response = client.get("/admin/users") assert response.status_code in (401, 403) def test_admin_can_change_user_role(self, client, mock_db): """Admin should be able to change another user's role.""" admin = _make_admin_user() mock_db.get_user_by_id.return_value = admin mock_db.update_user_role.return_value = { "id": 2, "email": "user@test.com", "role": "admin", "created_at": datetime(2025, 1, 1, tzinfo=timezone.utc), } response = client.patch( "/admin/users/2/role", json={"role": "admin"}, headers=_auth_header(admin), ) assert response.status_code == 200 assert response.json()["role"] == "admin" def test_admin_cannot_change_own_role(self, client, mock_db): """Admin should not be able to change their own role.""" admin = _make_admin_user() mock_db.get_user_by_id.return_value = admin response = client.patch( "/admin/users/1/role", json={"role": "user"}, headers=_auth_header(admin), ) assert response.status_code == 400 assert "own role" in response.json()["detail"].lower() def test_role_change_nonexistent_user_returns_404(self, client, mock_db): """Changing role for a user that does not exist should return 404.""" admin = _make_admin_user() mock_db.get_user_by_id.return_value = admin mock_db.update_user_role.return_value = None response = client.patch( "/admin/users/999/role", json={"role": "admin"}, headers=_auth_header(admin), ) assert response.status_code == 404 assert "not found" in response.json()["detail"].lower() def test_regular_user_cannot_change_role(self, client, mock_db): """Non-admin user should receive 403 when trying to change roles.""" user = _make_regular_user() mock_db.get_user_by_id.return_value = user response = client.patch( "/admin/users/1/role", json={"role": "admin"}, headers=_auth_header(user), ) assert response.status_code == 403 class TestAdminDeleteUser: """DELETE /admin/users/{user_id}""" def test_admin_can_delete_user(self, client, mock_db): """Admin should be able to delete another user.""" admin = _make_admin_user() mock_db.get_user_by_id.return_value = admin mock_db.delete_user.return_value = True response = client.delete( "/admin/users/2", headers=_auth_header(admin), ) assert response.status_code == 200 assert "deleted" in response.json()["message"].lower() mock_db.delete_user.assert_called_once_with(2) def test_admin_cannot_delete_self(self, client, mock_db): """Admin should not be able to delete themselves.""" admin = _make_admin_user() mock_db.get_user_by_id.return_value = admin response = client.delete( "/admin/users/1", headers=_auth_header(admin), ) assert response.status_code == 400 assert "yourself" in response.json()["detail"].lower() def test_delete_nonexistent_user_returns_404(self, client, mock_db): """Deleting a user that does not exist should return 404.""" admin = _make_admin_user() mock_db.get_user_by_id.return_value = admin mock_db.delete_user.return_value = False response = client.delete( "/admin/users/999", headers=_auth_header(admin), ) assert response.status_code == 404 assert "not found" in response.json()["detail"].lower() def test_regular_user_cannot_delete_user(self, client, mock_db): """Non-admin user should receive 403 when trying to delete users.""" user = _make_regular_user() mock_db.get_user_by_id.return_value = user response = client.delete( "/admin/users/1", headers=_auth_header(user), ) assert response.status_code == 403 def test_no_token_cannot_delete_user(self, client): """Missing token should be rejected for delete endpoint.""" response = client.delete("/admin/users/1") assert response.status_code in (401, 403) class TestEdgeCases: """Additional edge-case tests for auth robustness.""" def test_register_invalid_email_returns_422(self, client, mock_db): """Registration with an invalid email format should return 422.""" response = client.post( "/auth/register", json={"email": "not-an-email", "password": "securepass123"}, ) assert response.status_code == 422 def test_register_short_password_returns_422(self, client, mock_db): """Registration with a password shorter than 8 chars should return 422.""" response = client.post( "/auth/register", json={"email": "user@test.com", "password": "short"}, ) assert response.status_code == 422 def test_register_missing_fields_returns_422(self, client, mock_db): """Registration with missing fields should return 422.""" response = client.post("/auth/register", json={}) assert response.status_code == 422 def test_login_missing_fields_returns_422(self, client, mock_db): """Login with missing fields should return 422.""" response = client.post("/auth/login", json={"email": "user@test.com"}) assert response.status_code == 422 def test_malformed_token_returns_401(self, client, mock_db): """A completely malformed token string should return 401.""" response = client.get( "/auth/me", headers={"Authorization": "Bearer not.a.valid.jwt.token"}, ) assert response.status_code == 401 def test_token_with_wrong_secret_returns_401(self, client, mock_db): """A token signed with a different secret should return 401.""" payload = { "sub": "1", "email": "user@test.com", "role": "user", "exp": datetime.now(timezone.utc) + timedelta(hours=1), "type": "access", } wrong_secret_token = pyjwt.encode(payload, "wrong-secret", algorithm=JWT_ALGORITHM) response = client.get( "/auth/me", headers={"Authorization": f"Bearer {wrong_secret_token}"}, ) assert response.status_code == 401 def test_token_for_deleted_user_returns_401(self, client, mock_db): """A valid token for a user no longer in the DB should return 401.""" user = _make_regular_user() mock_db.get_user_by_id.return_value = None # user was deleted response = client.get("/auth/me", headers=_auth_header(user)) assert response.status_code == 401 def test_refresh_for_deleted_user_returns_401(self, client, mock_db): """Refreshing a token for a deleted user should return 401.""" user = _make_regular_user() mock_db.get_user_by_id.return_value = None refresh = create_refresh_token(user["id"], user["email"], user["role"]) response = client.post( "/auth/refresh", json={"refresh_token": refresh} ) assert response.status_code == 401 def test_login_returns_decodable_tokens(self, client, mock_db): """Tokens returned by login should be decodable and contain expected claims.""" user = _make_regular_user() mock_db.authenticate_user.return_value = user response = client.post( "/auth/login", json={"email": "user@test.com", "password": "correctpassword"}, ) data = response.json() access_payload = pyjwt.decode( data["access_token"], JWT_SECRET, algorithms=[JWT_ALGORITHM] ) assert access_payload["sub"] == str(user["id"]) assert access_payload["email"] == user["email"] assert access_payload["type"] == "access" refresh_payload = pyjwt.decode( data["refresh_token"], JWT_SECRET, algorithms=[JWT_ALGORITHM] ) assert refresh_payload["type"] == "refresh"