From a0cb9a57730008767f420053097560ac94e60d67 Mon Sep 17 00:00:00 2001 From: agent-company Date: Mon, 18 May 2026 21:53:01 +0000 Subject: [PATCH] Add rate limit status and usage statistics to admin panel Add GET /admin/rate-limits endpoint (admin-only) that returns current rate limit configuration and request statistics for all rate-limited endpoints (/auth/register and /auth/login). Tracks total requests and rejection counts via in-memory counters. Includes tests for admin access, non-admin rejection, empty state, request tracking, and configuration display. Closes leeworks-agents/SPARC#1675 Co-Authored-By: Claude Opus 4.6 (1M context) --- SPARC/api.py | 59 ++++++++++++++++++ tests/test_rate_limit_admin.py | 109 +++++++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 tests/test_rate_limit_admin.py diff --git a/SPARC/api.py b/SPARC/api.py index a42ddd7..789ba70 100644 --- a/SPARC/api.py +++ b/SPARC/api.py @@ -217,10 +217,37 @@ app = FastAPI( limiter = Limiter(key_func=get_remote_address) app.state.limiter = limiter +# In-memory rate limit statistics +_rate_limit_stats: dict[str, dict] = {} + + +def _track_rate_limit_request(endpoint: str, ip: str, rejected: bool = False) -> None: + """Record a request against a rate-limited endpoint.""" + key = endpoint + if key not in _rate_limit_stats: + _rate_limit_stats[key] = { + "endpoint": endpoint, + "total_requests": 0, + "rejected_requests": 0, + "by_ip": {}, + } + _rate_limit_stats[key]["total_requests"] += 1 + if rejected: + _rate_limit_stats[key]["rejected_requests"] += 1 + ip_stats = _rate_limit_stats[key].setdefault("by_ip", {}) + if ip not in ip_stats: + ip_stats[ip] = {"total": 0, "rejected": 0} + ip_stats[ip]["total"] += 1 + if rejected: + ip_stats[ip]["rejected"] += 1 + @app.exception_handler(RateLimitExceeded) async def rate_limit_handler(request: Request, exc: RateLimitExceeded): """Return 429 with Retry-After header when rate limit is exceeded.""" + endpoint = request.url.path + ip = get_remote_address(request) + _track_rate_limit_request(endpoint, ip, rejected=True) retry_after = getattr(exc, "retry_after", 60) return JSONResponse( status_code=429, @@ -249,6 +276,7 @@ async def register(request: Request, body: RegisterRequest): The first registered user automatically becomes an admin. """ + _track_rate_limit_request("/auth/register", get_remote_address(request)) db = get_db_client() # First user becomes admin @@ -279,6 +307,7 @@ async def register(request: Request, body: RegisterRequest): @limiter.limit("10/minute") async def login(request: Request, body: LoginRequest): """Authenticate user and return JWT tokens.""" + _track_rate_limit_request("/auth/login", get_remote_address(request)) db = get_db_client() user = db.authenticate_user(body.email, body.password) @@ -443,6 +472,36 @@ async def remove_tracked_company( return {"message": f"Stopped tracking {company_name}"} +@app.get("/admin/rate-limits", tags=["Admin"]) +async def get_rate_limit_stats( + _: UserResponse = Depends(get_current_admin), +): + """Get rate limit status and usage statistics (admin only). + + Returns current rate limit configuration and request statistics + for all rate-limited endpoints. + + Returns: + List of rate limit stats per endpoint with total/rejected counts + """ + rate_limits_config = { + "/auth/register": {"limit": "5/minute"}, + "/auth/login": {"limit": "10/minute"}, + } + + results = [] + for endpoint, conf in rate_limits_config.items(): + stats = _rate_limit_stats.get(endpoint, {}) + results.append({ + "endpoint": endpoint, + "limit": conf["limit"], + "total_requests": stats.get("total_requests", 0), + "rejected_requests": stats.get("rejected_requests", 0), + }) + + return {"rate_limits": results} + + @app.get("/admin/alerts", tags=["Admin"]) async def list_alerts( limit: int = Query(default=50, ge=1, le=200), diff --git a/tests/test_rate_limit_admin.py b/tests/test_rate_limit_admin.py new file mode 100644 index 0000000..bc63a5a --- /dev/null +++ b/tests/test_rate_limit_admin.py @@ -0,0 +1,109 @@ +"""Tests for the /admin/rate-limits endpoint.""" + +from unittest.mock import patch + +import pytest +from fastapi.testclient import TestClient + +from SPARC import api +from SPARC.api import app +from SPARC.auth import UserResponse + + +@pytest.fixture +def client(): + """Create test client.""" + return TestClient(app) + + +@pytest.fixture(autouse=True) +def reset_stats(): + """Reset rate limit stats between tests.""" + api._rate_limit_stats.clear() + yield + api._rate_limit_stats.clear() + + +def _mock_admin(): + """Return a mock admin user.""" + return UserResponse(id=1, email="admin@test.com", role="admin", created_at="2025-01-01T00:00:00") + + +def _mock_user(): + """Return a mock non-admin user.""" + return UserResponse(id=2, email="user@test.com", role="user", created_at="2025-01-01T00:00:00") + + +class TestRateLimitAdminEndpoint: + """Test GET /admin/rate-limits.""" + + def test_admin_can_access(self, client): + """Admin users should be able to access the rate-limits endpoint.""" + app.dependency_overrides[api.get_current_admin] = _mock_admin + try: + response = client.get("/admin/rate-limits") + assert response.status_code == 200 + data = response.json() + assert "rate_limits" in data + assert isinstance(data["rate_limits"], list) + finally: + app.dependency_overrides.clear() + + def test_non_admin_rejected(self, client): + """Non-admin users should get 403.""" + # Without overriding the dependency, it should fail auth + response = client.get("/admin/rate-limits") + assert response.status_code in (401, 403) + + def test_returns_configured_endpoints(self, client): + """Should list all rate-limited endpoints.""" + app.dependency_overrides[api.get_current_admin] = _mock_admin + try: + response = client.get("/admin/rate-limits") + assert response.status_code == 200 + data = response.json() + endpoints = [rl["endpoint"] for rl in data["rate_limits"]] + assert "/auth/register" in endpoints + assert "/auth/login" in endpoints + finally: + app.dependency_overrides.clear() + + def test_empty_state_shows_zero_counts(self, client): + """When no requests have been made, counts should be zero.""" + app.dependency_overrides[api.get_current_admin] = _mock_admin + try: + response = client.get("/admin/rate-limits") + data = response.json() + for rl in data["rate_limits"]: + assert rl["total_requests"] == 0 + assert rl["rejected_requests"] == 0 + finally: + app.dependency_overrides.clear() + + def test_tracks_requests(self, client): + """After making requests, the stats should reflect them.""" + api._track_rate_limit_request("/auth/login", "127.0.0.1") + api._track_rate_limit_request("/auth/login", "127.0.0.1") + api._track_rate_limit_request("/auth/login", "192.168.1.1", rejected=True) + + app.dependency_overrides[api.get_current_admin] = _mock_admin + try: + response = client.get("/admin/rate-limits") + data = response.json() + login_stats = next(rl for rl in data["rate_limits"] if rl["endpoint"] == "/auth/login") + assert login_stats["total_requests"] == 3 + assert login_stats["rejected_requests"] == 1 + finally: + app.dependency_overrides.clear() + + def test_includes_limit_config(self, client): + """Each endpoint entry should include the rate limit config string.""" + app.dependency_overrides[api.get_current_admin] = _mock_admin + try: + response = client.get("/admin/rate-limits") + data = response.json() + for rl in data["rate_limits"]: + assert "limit" in rl + assert isinstance(rl["limit"], str) + finally: + app.dependency_overrides.clear()