a0cb9a5773
Add GET /admin/rate-limits endpoint (admin-only) that returns current rate limit configuration and request statistics for all rate-limited endpoints (/auth/register and /auth/login). Tracks total requests and rejection counts via in-memory counters. Includes tests for admin access, non-admin rejection, empty state, request tracking, and configuration display. Closes leeworks-agents/SPARC#1675 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
110 lines
3.9 KiB
Python
110 lines
3.9 KiB
Python
"""Tests for the /admin/rate-limits endpoint."""
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
from SPARC import api
|
|
from SPARC.api import app
|
|
from SPARC.auth import UserResponse
|
|
|
|
|
|
@pytest.fixture
|
|
def client():
|
|
"""Create test client."""
|
|
return TestClient(app)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_stats():
|
|
"""Reset rate limit stats between tests."""
|
|
api._rate_limit_stats.clear()
|
|
yield
|
|
api._rate_limit_stats.clear()
|
|
|
|
|
|
def _mock_admin():
|
|
"""Return a mock admin user."""
|
|
return UserResponse(id=1, email="admin@test.com", role="admin", created_at="2025-01-01T00:00:00")
|
|
|
|
|
|
def _mock_user():
|
|
"""Return a mock non-admin user."""
|
|
return UserResponse(id=2, email="user@test.com", role="user", created_at="2025-01-01T00:00:00")
|
|
|
|
|
|
class TestRateLimitAdminEndpoint:
|
|
"""Test GET /admin/rate-limits."""
|
|
|
|
def test_admin_can_access(self, client):
|
|
"""Admin users should be able to access the rate-limits endpoint."""
|
|
app.dependency_overrides[api.get_current_admin] = _mock_admin
|
|
try:
|
|
response = client.get("/admin/rate-limits")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "rate_limits" in data
|
|
assert isinstance(data["rate_limits"], list)
|
|
finally:
|
|
app.dependency_overrides.clear()
|
|
|
|
def test_non_admin_rejected(self, client):
|
|
"""Non-admin users should get 403."""
|
|
# Without overriding the dependency, it should fail auth
|
|
response = client.get("/admin/rate-limits")
|
|
assert response.status_code in (401, 403)
|
|
|
|
def test_returns_configured_endpoints(self, client):
|
|
"""Should list all rate-limited endpoints."""
|
|
app.dependency_overrides[api.get_current_admin] = _mock_admin
|
|
try:
|
|
response = client.get("/admin/rate-limits")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
endpoints = [rl["endpoint"] for rl in data["rate_limits"]]
|
|
assert "/auth/register" in endpoints
|
|
assert "/auth/login" in endpoints
|
|
finally:
|
|
app.dependency_overrides.clear()
|
|
|
|
def test_empty_state_shows_zero_counts(self, client):
|
|
"""When no requests have been made, counts should be zero."""
|
|
app.dependency_overrides[api.get_current_admin] = _mock_admin
|
|
try:
|
|
response = client.get("/admin/rate-limits")
|
|
data = response.json()
|
|
for rl in data["rate_limits"]:
|
|
assert rl["total_requests"] == 0
|
|
assert rl["rejected_requests"] == 0
|
|
finally:
|
|
app.dependency_overrides.clear()
|
|
|
|
def test_tracks_requests(self, client):
|
|
"""After making requests, the stats should reflect them."""
|
|
api._track_rate_limit_request("/auth/login", "127.0.0.1")
|
|
api._track_rate_limit_request("/auth/login", "127.0.0.1")
|
|
api._track_rate_limit_request("/auth/login", "192.168.1.1", rejected=True)
|
|
|
|
app.dependency_overrides[api.get_current_admin] = _mock_admin
|
|
try:
|
|
response = client.get("/admin/rate-limits")
|
|
data = response.json()
|
|
login_stats = next(rl for rl in data["rate_limits"] if rl["endpoint"] == "/auth/login")
|
|
assert login_stats["total_requests"] == 3
|
|
assert login_stats["rejected_requests"] == 1
|
|
finally:
|
|
app.dependency_overrides.clear()
|
|
|
|
def test_includes_limit_config(self, client):
|
|
"""Each endpoint entry should include the rate limit config string."""
|
|
app.dependency_overrides[api.get_current_admin] = _mock_admin
|
|
try:
|
|
response = client.get("/admin/rate-limits")
|
|
data = response.json()
|
|
for rl in data["rate_limits"]:
|
|
assert "limit" in rl
|
|
assert isinstance(rl["limit"], str)
|
|
finally:
|
|
app.dependency_overrides.clear()
|