diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..de79259 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,302 @@ +"""Tests for JWT authentication flow: register, login, protected routes, refresh, admin access.""" + +from datetime import datetime, timezone +from unittest.mock import MagicMock, patch + +import pytest +from fastapi.testclient import TestClient + +from SPARC.api import app +from SPARC.auth import create_access_token, create_refresh_token + + +@pytest.fixture +def client(): + """Create test client.""" + return TestClient(app) + + +@pytest.fixture(autouse=True) +def mock_db(monkeypatch): + """Mock the database client used by auth endpoints. + + Returns a MagicMock with all DB methods pre-configured. + """ + db = MagicMock() + + # Default: no users exist + db.get_user_count.return_value = 0 + db.get_user_by_id.return_value = None + db.get_user_by_email.return_value = None + db.authenticate_user.return_value = None + db.create_user.return_value = None + db.get_all_users.return_value = [] + db.update_user_role.return_value = None + db.delete_user.return_value = False + + with patch("SPARC.api.get_db_client", return_value=db), \ + patch("SPARC.auth.get_db_client", return_value=db): + yield db + + +def _make_admin_user(): + return { + "id": 1, + "email": "admin@test.com", + "role": "admin", + "created_at": datetime(2025, 1, 1, tzinfo=timezone.utc), + } + + +def _make_regular_user(): + return { + "id": 2, + "email": "user@test.com", + "role": "user", + "created_at": datetime(2025, 1, 1, tzinfo=timezone.utc), + } + + +def _auth_header(user_dict): + """Create an Authorization header with a valid access token for the given user.""" + token = create_access_token(user_dict["id"], user_dict["email"], user_dict["role"]) + return {"Authorization": f"Bearer {token}"} + + +class TestRegister: + """POST /auth/register""" + + def test_register_first_user_becomes_admin(self, client, mock_db): + """First registered user should get admin role.""" + mock_db.get_user_count.return_value = 0 + mock_db.create_user.return_value = { + "id": 1, + "email": "admin@test.com", + "role": "admin", + "created_at": datetime(2025, 1, 1, tzinfo=timezone.utc), + } + + response = client.post( + "/auth/register", + json={"email": "admin@test.com", "password": "securepass123"}, + ) + + assert response.status_code == 200 + data = response.json() + assert data["email"] == "admin@test.com" + assert data["role"] == "admin" + mock_db.create_user.assert_called_once_with( + email="admin@test.com", password="securepass123", role="admin" + ) + + def test_register_subsequent_user_gets_user_role(self, client, mock_db): + """Non-first user should get regular user role.""" + mock_db.get_user_count.return_value = 1 + mock_db.create_user.return_value = _make_regular_user() + + response = client.post( + "/auth/register", + json={"email": "user@test.com", "password": "securepass123"}, + ) + + assert response.status_code == 200 + data = response.json() + assert data["role"] == "user" + + def test_register_duplicate_email_returns_400(self, client, mock_db): + """Registering with an existing email should return 400.""" + mock_db.get_user_count.return_value = 1 + mock_db.create_user.return_value = None # indicates duplicate + + response = client.post( + "/auth/register", + json={"email": "existing@test.com", "password": "securepass123"}, + ) + + assert response.status_code == 400 + assert "already registered" in response.json()["detail"].lower() + + +class TestLogin: + """POST /auth/login""" + + def test_login_valid_credentials_returns_tokens(self, client, mock_db): + """Valid credentials should return access and refresh tokens.""" + user = _make_regular_user() + mock_db.authenticate_user.return_value = user + + response = client.post( + "/auth/login", + json={"email": "user@test.com", "password": "correctpassword"}, + ) + + assert response.status_code == 200 + data = response.json() + assert "access_token" in data + assert "refresh_token" in data + assert data["token_type"] == "bearer" + + def test_login_invalid_credentials_returns_401(self, client, mock_db): + """Invalid credentials should return 401.""" + mock_db.authenticate_user.return_value = None + + response = client.post( + "/auth/login", + json={"email": "user@test.com", "password": "wrongpassword"}, + ) + + assert response.status_code == 401 + assert "invalid" in response.json()["detail"].lower() + + +class TestGetMe: + """GET /auth/me""" + + def test_valid_access_token_returns_user(self, client, mock_db): + """A valid access token should return the user's data.""" + user = _make_regular_user() + mock_db.get_user_by_id.return_value = user + + response = client.get("/auth/me", headers=_auth_header(user)) + + assert response.status_code == 200 + data = response.json() + assert data["email"] == "user@test.com" + assert data["id"] == 2 + + def test_missing_token_returns_401(self, client): + """No token should return 401 (403 from HTTPBearer).""" + response = client.get("/auth/me") + assert response.status_code in (401, 403) + + def test_expired_token_returns_401(self, client, mock_db): + """An expired token should return 401.""" + # Create a token that has already expired + from datetime import timedelta + + import jwt as pyjwt + from SPARC.auth import JWT_ALGORITHM, JWT_SECRET + + payload = { + "sub": "1", + "email": "user@test.com", + "role": "user", + "exp": datetime.now(timezone.utc) - timedelta(hours=1), + "type": "access", + } + expired_token = pyjwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM) + + response = client.get( + "/auth/me", headers={"Authorization": f"Bearer {expired_token}"} + ) + assert response.status_code == 401 + + def test_refresh_token_as_access_returns_401(self, client, mock_db): + """Using a refresh token as an access token should return 401.""" + user = _make_regular_user() + refresh_token = create_refresh_token(user["id"], user["email"], user["role"]) + + response = client.get( + "/auth/me", headers={"Authorization": f"Bearer {refresh_token}"} + ) + assert response.status_code == 401 + + +class TestRefreshToken: + """POST /auth/refresh""" + + def test_valid_refresh_token_returns_new_tokens(self, client, mock_db): + """A valid refresh token should issue new access and refresh tokens.""" + user = _make_regular_user() + mock_db.get_user_by_id.return_value = user + refresh = create_refresh_token(user["id"], user["email"], user["role"]) + + response = client.post( + "/auth/refresh", json={"refresh_token": refresh} + ) + + assert response.status_code == 200 + data = response.json() + assert "access_token" in data + assert "refresh_token" in data + + def test_invalid_refresh_token_returns_401(self, client, mock_db): + """An invalid refresh token should return 401.""" + response = client.post( + "/auth/refresh", json={"refresh_token": "invalid-token-string"} + ) + assert response.status_code == 401 + + def test_access_token_as_refresh_returns_401(self, client, mock_db): + """Using an access token as a refresh token should return 401.""" + user = _make_regular_user() + access = create_access_token(user["id"], user["email"], user["role"]) + + response = client.post( + "/auth/refresh", json={"refresh_token": access} + ) + assert response.status_code == 401 + + +class TestAdminUsers: + """GET /admin/users and PATCH /admin/users/{id}/role""" + + def test_admin_can_list_users(self, client, mock_db): + """Admin token should allow listing users.""" + admin = _make_admin_user() + mock_db.get_user_by_id.return_value = admin + mock_db.get_all_users.return_value = [admin, _make_regular_user()] + + response = client.get("/admin/users", headers=_auth_header(admin)) + + assert response.status_code == 200 + data = response.json() + assert len(data) == 2 + + def test_regular_user_cannot_list_users(self, client, mock_db): + """Regular user token should be rejected with 403.""" + user = _make_regular_user() + mock_db.get_user_by_id.return_value = user + + response = client.get("/admin/users", headers=_auth_header(user)) + + assert response.status_code == 403 + + def test_no_token_cannot_list_users(self, client): + """No token should be rejected.""" + response = client.get("/admin/users") + assert response.status_code in (401, 403) + + def test_admin_can_change_user_role(self, client, mock_db): + """Admin should be able to change another user's role.""" + admin = _make_admin_user() + mock_db.get_user_by_id.return_value = admin + mock_db.update_user_role.return_value = { + "id": 2, + "email": "user@test.com", + "role": "admin", + "created_at": datetime(2025, 1, 1, tzinfo=timezone.utc), + } + + response = client.patch( + "/admin/users/2/role", + json={"role": "admin"}, + headers=_auth_header(admin), + ) + + assert response.status_code == 200 + assert response.json()["role"] == "admin" + + def test_admin_cannot_change_own_role(self, client, mock_db): + """Admin should not be able to change their own role.""" + admin = _make_admin_user() + mock_db.get_user_by_id.return_value = admin + + response = client.patch( + "/admin/users/1/role", + json={"role": "user"}, + headers=_auth_header(admin), + ) + + assert response.status_code == 400 + assert "own role" in response.json()["detail"].lower()