+ {/* Header */}
+
+
+ Historical Analysis Diff
+
+
+ Compare analysis runs for the same company to see what changed between them.
+
+
+
+ {/* Company Search */}
+
+
+ {/* History list */}
+ {company && historyQuery.isLoading && (
+
Loading analysis history...
+ )}
+
+ {company && historyQuery.isError && (
+
+
+
Failed to load history. Check the company name and try again.
+
+ )}
+
+ {company && history.length === 0 && !historyQuery.isLoading && (
+
No analysis history found for "{company}".
+ )}
+
+ {history.length >= 2 && (
+
+
+ Select Two Runs to Compare
+
+
+ {history.map((item, idx) => {
+ const next = history[idx + 1];
+ if (!next) return null;
+ const isSelected =
+ fromId === String(next.id) && toId === String(item.id);
+ return (
+
+ );
+ })}
+
+
+ )}
+
+ {/* Diff Results */}
+ {diffQuery.isLoading && (
+
Computing diff...
+ )}
+
+ {diffQuery.isError && (
+
+
+
Failed to compute diff. One or both analysis IDs may not exist.
+
+ )}
+
+ {diffQuery.data &&
}
+
+ );
+}
+
+function DiffView({ diff }: { diff: AnalysisDiff }) {
+ return (
+
+
+ Diff: #{diff.from_id} → #{diff.to_id}
+
+
+ {/* Summary */}
+
+
{diff.summary}
+
+
{new Date(diff.from_timestamp).toLocaleString()}
+
+
{new Date(diff.to_timestamp).toLocaleString()}
+
+
+
+ {/* Patent count delta */}
+
+ Patent mention delta:
+ 0
+ ? 'text-success'
+ : diff.patent_count_delta < 0
+ ? 'text-error'
+ : 'text-text-secondary'
+ }`}
+ >
+ {diff.patent_count_delta > 0 ? '+' : ''}
+ {diff.patent_count_delta}
+
+
+
+ {/* Added patents */}
+ {diff.added_patents.length > 0 && (
+
+
+
+ New Patents ({diff.added_patents.length})
+
+
+ {diff.added_patents.map((p) => (
+
+ {p}
+
+ ))}
+
+
+ )}
+
+ {/* Removed patents */}
+ {diff.removed_patents.length > 0 && (
+
+
+
+ Removed Patents ({diff.removed_patents.length})
+
+
+ {diff.removed_patents.map((p) => (
+
+ {p}
+
+ ))}
+
+
+ )}
+
+ {/* Changed fields */}
+ {Object.keys(diff.changed_fields).length > 0 && (
+
+
Changed Fields
+
+ {Object.entries(diff.changed_fields).map(([field, vals]) => (
+
+
{field}:
+
{vals.from || 'null'}
+
+
{vals.to || 'null'}
+
+ ))}
+
+
+ )}
+
+ );
+}
diff --git a/tests/test_analysis_diff.py b/tests/test_analysis_diff.py
new file mode 100644
index 0000000..c867b55
--- /dev/null
+++ b/tests/test_analysis_diff.py
@@ -0,0 +1,244 @@
+"""Tests for historical analysis diff endpoint."""
+
+from datetime import datetime
+from unittest.mock import MagicMock, patch
+
+import pytest
+from fastapi.testclient import TestClient
+
+from SPARC.api import AnalysisDiffResponse, _compute_analysis_diff, _extract_patent_ids, app
+from SPARC.auth import UserResponse, get_current_user
+
+
+# ---------- helpers ----------
+
+def _mock_user():
+ """Return a fake authenticated user for dependency override."""
+ return UserResponse(
+ id=1,
+ email="test@example.com",
+ role="user",
+ created_at=datetime(2025, 1, 1),
+ )
+
+
+@pytest.fixture
+def auth_client():
+ """TestClient with auth dependency overridden."""
+ app.dependency_overrides[get_current_user] = _mock_user
+ client = TestClient(app, raise_server_exceptions=False)
+ yield client
+ app.dependency_overrides.clear()
+
+
+# ---------- unit tests for helpers ----------
+
+class TestExtractPatentIds:
+ """Test _extract_patent_ids utility."""
+
+ def test_extracts_standard_ids(self):
+ text = "Patent US-12345678-B2 covers the device. Also see US-9876543-A1."
+ ids = _extract_patent_ids(text)
+ assert "US-12345678-B2" in ids
+ assert "US-9876543-A1" in ids
+
+ def test_empty_text(self):
+ assert _extract_patent_ids("") == set()
+ assert _extract_patent_ids(None) == set() # type: ignore[arg-type]
+
+
+class TestComputeAnalysisDiff:
+ """Test _compute_analysis_diff logic."""
+
+ def test_identical_analyses(self):
+ rec = {
+ "id": 1,
+ "company_name": "nvidia",
+ "analysis_type": "portfolio",
+ "model": "openai/gpt-4o",
+ "response": "Patent US-12345678-B2 is notable.",
+ "timestamp": datetime(2025, 5, 1),
+ }
+ diff = _compute_analysis_diff(rec, dict(rec, id=2, timestamp=datetime(2025, 5, 2)))
+ assert diff.patent_count_delta == 0
+ assert diff.added_patents == []
+ assert diff.removed_patents == []
+
+ def test_added_and_removed_patents(self):
+ from_rec = {
+ "id": 1,
+ "company_name": "nvidia",
+ "analysis_type": "portfolio",
+ "model": "openai/gpt-4o",
+ "response": "Patent US-12345678-B2 and US-11111111-A1.",
+ "timestamp": datetime(2025, 5, 1),
+ }
+ to_rec = {
+ "id": 2,
+ "company_name": "nvidia",
+ "analysis_type": "portfolio",
+ "model": "openai/gpt-4o",
+ "response": "Patent US-12345678-B2 and US-99999999-B1.",
+ "timestamp": datetime(2025, 5, 2),
+ }
+ diff = _compute_analysis_diff(from_rec, to_rec)
+ assert "US-99999999-B1" in diff.added_patents
+ assert "US-11111111-A1" in diff.removed_patents
+ assert diff.patent_count_delta == 0 # one added, one removed
+
+ def test_model_change_detected(self):
+ from_rec = {
+ "id": 1,
+ "company_name": "nvidia",
+ "analysis_type": "portfolio",
+ "model": "openai/gpt-4o",
+ "response": "",
+ "timestamp": datetime(2025, 5, 1),
+ }
+ to_rec = {
+ "id": 2,
+ "company_name": "nvidia",
+ "analysis_type": "portfolio",
+ "model": "anthropic/claude-3.5-sonnet",
+ "response": "",
+ "timestamp": datetime(2025, 5, 2),
+ }
+ diff = _compute_analysis_diff(from_rec, to_rec)
+ assert "model" in diff.changed_fields
+ assert diff.changed_fields["model"]["from"] == "openai/gpt-4o"
+ assert diff.changed_fields["model"]["to"] == "anthropic/claude-3.5-sonnet"
+
+
+# ---------- API endpoint tests ----------
+
+class TestDiffEndpoint:
+ """Test GET /analyze/{company_name}/diff."""
+
+ @patch("SPARC.api._get_job_db")
+ def test_happy_path(self, mock_get_db, auth_client):
+ """Diff returns structured response when both IDs exist."""
+ db = MagicMock()
+ mock_get_db.return_value = db
+
+ from_rec = {
+ "id": 10,
+ "company_name": "nvidia",
+ "analysis_type": "portfolio",
+ "model": "openai/gpt-4o",
+ "response": "Patent US-12345678-B2 found.",
+ "timestamp": datetime(2025, 5, 1),
+ }
+ to_rec = {
+ "id": 20,
+ "company_name": "nvidia",
+ "analysis_type": "portfolio",
+ "model": "openai/gpt-4o",
+ "response": "Patent US-12345678-B2 and US-99999999-A1 found.",
+ "timestamp": datetime(2025, 5, 10),
+ }
+ db.get_analysis_by_id.side_effect = lambda aid: from_rec if aid == 10 else to_rec
+
+ response = auth_client.get("/analyze/nvidia/diff?from=10&to=20")
+ assert response.status_code == 200
+ data = response.json()
+ assert data["company_name"] == "nvidia"
+ assert data["from_id"] == 10
+ assert data["to_id"] == 20
+ assert "US-99999999-A1" in data["added_patents"]
+ assert data["patent_count_delta"] == 1
+
+ @patch("SPARC.api._get_job_db")
+ def test_from_id_not_found(self, mock_get_db, auth_client):
+ """Returns 404 when 'from' analysis ID doesn't exist."""
+ db = MagicMock()
+ mock_get_db.return_value = db
+ db.get_analysis_by_id.return_value = None
+
+ response = auth_client.get("/analyze/nvidia/diff?from=999&to=1000")
+ assert response.status_code == 404
+ assert "999" in response.json()["detail"]
+
+ @patch("SPARC.api._get_job_db")
+ def test_to_id_not_found(self, mock_get_db, auth_client):
+ """Returns 404 when 'to' analysis ID doesn't exist."""
+ db = MagicMock()
+ mock_get_db.return_value = db
+
+ from_rec = {
+ "id": 10,
+ "company_name": "nvidia",
+ "analysis_type": "portfolio",
+ "model": "openai/gpt-4o",
+ "response": "",
+ "timestamp": datetime(2025, 5, 1),
+ }
+ db.get_analysis_by_id.side_effect = lambda aid: from_rec if aid == 10 else None
+
+ response = auth_client.get("/analyze/nvidia/diff?from=10&to=999")
+ assert response.status_code == 404
+ assert "999" in response.json()["detail"]
+
+ @patch("SPARC.api._get_job_db")
+ def test_company_mismatch(self, mock_get_db, auth_client):
+ """Returns 404 when analysis belongs to a different company."""
+ db = MagicMock()
+ mock_get_db.return_value = db
+
+ rec = {
+ "id": 10,
+ "company_name": "intel",
+ "analysis_type": "portfolio",
+ "model": "openai/gpt-4o",
+ "response": "",
+ "timestamp": datetime(2025, 5, 1),
+ }
+ db.get_analysis_by_id.return_value = rec
+
+ response = auth_client.get("/analyze/nvidia/diff?from=10&to=20")
+ assert response.status_code == 404
+
+
+class TestHistoryEndpoint:
+ """Test GET /analyze/{company_name}/history."""
+
+ @patch("SPARC.api._get_job_db")
+ def test_returns_history_list(self, mock_get_db, auth_client):
+ """History endpoint returns list of past analysis runs."""
+ db = MagicMock()
+ mock_get_db.return_value = db
+ db.list_company_analyses.return_value = [
+ {
+ "id": 20,
+ "company_name": "nvidia",
+ "analysis_type": "portfolio",
+ "model": "openai/gpt-4o",
+ "response": "...",
+ "timestamp": datetime(2025, 5, 10),
+ },
+ {
+ "id": 10,
+ "company_name": "nvidia",
+ "analysis_type": "portfolio",
+ "model": "openai/gpt-4o",
+ "response": "...",
+ "timestamp": datetime(2025, 5, 1),
+ },
+ ]
+
+ response = auth_client.get("/analyze/nvidia/history")
+ assert response.status_code == 200
+ data = response.json()
+ assert len(data) == 2
+ assert data[0]["id"] == 20
+ assert data[1]["id"] == 10
+
+ @patch("SPARC.api._get_job_db")
+ def test_empty_history(self, mock_get_db, auth_client):
+ """History endpoint returns empty list when no analyses exist."""
+ db = MagicMock()
+ mock_get_db.return_value = db
+ db.list_company_analyses.return_value = []
+
+ response = auth_client.get("/analyze/nvidia/history")
+ assert response.status_code == 200
+ assert response.json() == []
diff --git a/tests/test_api_keys.py b/tests/test_api_keys.py
new file mode 100644
index 0000000..3942f63
--- /dev/null
+++ b/tests/test_api_keys.py
@@ -0,0 +1,319 @@
+"""Tests for user-level API key generation, listing, revocation, and authentication.
+
+Covers all acceptance criteria from issue #1673:
+1. Users can create API keys (POST /auth/apikeys)
+2. Users can list their active key IDs (GET /auth/apikeys)
+3. Users can revoke keys (DELETE /auth/apikeys/{key_id})
+4. API requests authenticated with a valid API key work on protected endpoints
+5. Revoked keys are immediately rejected
+6. Plaintext key is shown only at creation time
+
+All tests use mocked DB fixtures and require no live database.
+"""
+
+from datetime import datetime, timezone
+from unittest.mock import MagicMock, patch
+
+import pytest
+from fastapi.testclient import TestClient
+
+from SPARC.api import app
+from SPARC.auth import (
+ create_access_token,
+ generate_api_key,
+ hash_api_key,
+ verify_api_key,
+)
+
+
+@pytest.fixture
+def client():
+ """Create test client."""
+ return TestClient(app)
+
+
+def _make_user():
+ return {
+ "id": 1,
+ "email": "user@test.com",
+ "role": "user",
+ "created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
+ }
+
+
+def _auth_header(user_dict):
+ """Create an Authorization header with a valid access token."""
+ token = create_access_token(user_dict["id"], user_dict["email"], user_dict["role"])
+ return {"Authorization": f"Bearer {token}"}
+
+
+@pytest.fixture(autouse=True)
+def mock_db(monkeypatch):
+ """Mock the database client used by auth and api endpoints."""
+ db = MagicMock()
+ db.get_user_count.return_value = 0
+ db.get_user_by_id.return_value = None
+ db.get_user_by_email.return_value = None
+ db.authenticate_user.return_value = None
+ db.create_user.return_value = None
+ db.get_all_users.return_value = []
+ db.update_user_role.return_value = None
+ db.delete_user.return_value = False
+ db.create_api_key.return_value = None
+ db.list_api_keys.return_value = []
+ db.delete_api_key.return_value = False
+ db.get_all_api_key_hashes.return_value = []
+
+ with patch("SPARC.api.get_db_client", return_value=db), \
+ patch("SPARC.auth.get_db_client", return_value=db):
+ yield db
+
+
+class TestCreateApiKey:
+ """POST /auth/apikeys"""
+
+ def test_create_key_returns_plaintext_and_id(self, client, mock_db):
+ """Creating a key returns the plaintext key and metadata."""
+ user = _make_user()
+ mock_db.get_user_by_id.return_value = user
+ mock_db.create_api_key.return_value = {
+ "id": 42,
+ "user_id": user["id"],
+ "label": "my-ci-key",
+ "created_at": datetime(2025, 6, 1, tzinfo=timezone.utc),
+ }
+
+ response = client.post(
+ "/auth/apikeys",
+ json={"label": "my-ci-key"},
+ headers=_auth_header(user),
+ )
+
+ assert response.status_code == 200
+ data = response.json()
+ assert data["id"] == 42
+ assert len(data["key"]) == 64 # 32 bytes hex = 64 chars
+ assert data["label"] == "my-ci-key"
+ assert "created_at" in data
+
+ # Verify the hash passed to DB is valid for the returned key
+ call_args = mock_db.create_api_key.call_args
+ stored_hash = call_args.kwargs.get("key_hash") or call_args[1].get("key_hash") or call_args[0][1]
+ assert verify_api_key(data["key"], stored_hash)
+
+ def test_create_key_without_label(self, client, mock_db):
+ """Creating a key without a label should work."""
+ user = _make_user()
+ mock_db.get_user_by_id.return_value = user
+ mock_db.create_api_key.return_value = {
+ "id": 1,
+ "user_id": user["id"],
+ "label": None,
+ "created_at": datetime(2025, 6, 1, tzinfo=timezone.utc),
+ }
+
+ response = client.post(
+ "/auth/apikeys",
+ headers=_auth_header(user),
+ )
+
+ assert response.status_code == 200
+ assert response.json()["label"] is None
+
+ def test_create_key_requires_auth(self, client):
+ """Creating a key without auth should fail."""
+ response = client.post("/auth/apikeys")
+ assert response.status_code == 401
+
+
+class TestListApiKeys:
+ """GET /auth/apikeys"""
+
+ def test_list_keys_returns_metadata_only(self, client, mock_db):
+ """Listing keys should return IDs and labels, not secrets."""
+ user = _make_user()
+ mock_db.get_user_by_id.return_value = user
+ mock_db.list_api_keys.return_value = [
+ {"id": 1, "label": "key-1", "created_at": datetime(2025, 1, 1, tzinfo=timezone.utc)},
+ {"id": 2, "label": None, "created_at": datetime(2025, 2, 1, tzinfo=timezone.utc)},
+ ]
+
+ response = client.get("/auth/apikeys", headers=_auth_header(user))
+
+ assert response.status_code == 200
+ data = response.json()
+ assert len(data) == 2
+ assert data[0]["id"] == 1
+ assert data[0]["label"] == "key-1"
+ # Ensure no secret key is exposed
+ for item in data:
+ assert "key" not in item
+ assert "key_hash" not in item
+
+ def test_list_keys_empty(self, client, mock_db):
+ """User with no keys gets an empty list."""
+ user = _make_user()
+ mock_db.get_user_by_id.return_value = user
+ mock_db.list_api_keys.return_value = []
+
+ response = client.get("/auth/apikeys", headers=_auth_header(user))
+
+ assert response.status_code == 200
+ assert response.json() == []
+
+
+class TestRevokeApiKey:
+ """DELETE /auth/apikeys/{key_id}"""
+
+ def test_revoke_existing_key(self, client, mock_db):
+ """Revoking an owned key should succeed."""
+ user = _make_user()
+ mock_db.get_user_by_id.return_value = user
+ mock_db.delete_api_key.return_value = True
+
+ response = client.delete("/auth/apikeys/42", headers=_auth_header(user))
+
+ assert response.status_code == 200
+ assert "revoked" in response.json()["message"].lower()
+ mock_db.delete_api_key.assert_called_once_with(42, user["id"])
+
+ def test_revoke_nonexistent_key_returns_404(self, client, mock_db):
+ """Revoking a key that doesn't exist (or isn't owned) returns 404."""
+ user = _make_user()
+ mock_db.get_user_by_id.return_value = user
+ mock_db.delete_api_key.return_value = False
+
+ response = client.delete("/auth/apikeys/999", headers=_auth_header(user))
+
+ assert response.status_code == 404
+
+
+class TestApiKeyAuthentication:
+ """Using X-API-Key header on protected endpoints."""
+
+ def test_valid_api_key_accesses_protected_endpoint(self, client, mock_db):
+ """A valid API key should authenticate and access /auth/me."""
+ user = _make_user()
+ plaintext = generate_api_key()
+ hashed = hash_api_key(plaintext)
+
+ mock_db.get_all_api_key_hashes.return_value = [
+ {"key_hash": hashed, "user_id": user["id"]},
+ ]
+ mock_db.get_user_by_id.return_value = user
+
+ response = client.get("/auth/me", headers={"X-API-Key": plaintext})
+
+ assert response.status_code == 200
+ data = response.json()
+ assert data["email"] == user["email"]
+ assert data["id"] == user["id"]
+
+ def test_invalid_api_key_returns_401(self, client, mock_db):
+ """An invalid API key should return 401."""
+ mock_db.get_all_api_key_hashes.return_value = []
+
+ response = client.get("/auth/me", headers={"X-API-Key": "bad-key"})
+
+ assert response.status_code == 401
+ assert "invalid api key" in response.json()["detail"].lower()
+
+ def test_revoked_key_returns_401(self, client, mock_db):
+ """After revocation, using the key should return 401."""
+ # Simulate revoked key: no matching hashes in DB
+ mock_db.get_all_api_key_hashes.return_value = []
+
+ response = client.get("/auth/me", headers={"X-API-Key": "a" * 64})
+
+ assert response.status_code == 401
+
+ def test_api_key_for_deleted_user_returns_401(self, client, mock_db):
+ """An API key whose user no longer exists should return 401."""
+ plaintext = generate_api_key()
+ hashed = hash_api_key(plaintext)
+
+ mock_db.get_all_api_key_hashes.return_value = [
+ {"key_hash": hashed, "user_id": 999},
+ ]
+ mock_db.get_user_by_id.return_value = None # user deleted
+
+ response = client.get("/auth/me", headers={"X-API-Key": plaintext})
+
+ assert response.status_code == 401
+
+ def test_no_auth_at_all_returns_401(self, client, mock_db):
+ """No auth header at all should return 401."""
+ response = client.get("/auth/me")
+ assert response.status_code == 401
+
+
+class TestApiKeyFullFlow:
+ """End-to-end flow: create key, use it, revoke it, try again."""
+
+ def test_create_use_revoke_flow(self, client, mock_db):
+ """Simulate full lifecycle of an API key."""
+ user = _make_user()
+ mock_db.get_user_by_id.return_value = user
+
+ # Step 1: Create key
+ mock_db.create_api_key.return_value = {
+ "id": 10,
+ "user_id": user["id"],
+ "label": "test",
+ "created_at": datetime(2025, 6, 1, tzinfo=timezone.utc),
+ }
+
+ create_resp = client.post(
+ "/auth/apikeys",
+ json={"label": "test"},
+ headers=_auth_header(user),
+ )
+ assert create_resp.status_code == 200
+ plaintext = create_resp.json()["key"]
+
+ # Capture the hash that was stored
+ call_args = mock_db.create_api_key.call_args
+ stored_hash = call_args.kwargs.get("key_hash") or call_args[0][1]
+
+ # Step 2: Use key on protected endpoint
+ mock_db.get_all_api_key_hashes.return_value = [
+ {"key_hash": stored_hash, "user_id": user["id"]},
+ ]
+
+ use_resp = client.get("/auth/me", headers={"X-API-Key": plaintext})
+ assert use_resp.status_code == 200
+ assert use_resp.json()["email"] == user["email"]
+
+ # Step 3: Revoke key
+ mock_db.delete_api_key.return_value = True
+ revoke_resp = client.delete("/auth/apikeys/10", headers=_auth_header(user))
+ assert revoke_resp.status_code == 200
+
+ # Step 4: Try using revoked key
+ mock_db.get_all_api_key_hashes.return_value = [] # key removed from DB
+ rejected_resp = client.get("/auth/me", headers={"X-API-Key": plaintext})
+ assert rejected_resp.status_code == 401
+
+
+class TestApiKeyHelpers:
+ """Unit tests for key generation and hashing helpers."""
+
+ def test_generate_api_key_length(self):
+ """Generated key should be 64 hex characters (32 bytes)."""
+ key = generate_api_key()
+ assert len(key) == 64
+ # Should be valid hex
+ int(key, 16)
+
+ def test_generate_api_key_uniqueness(self):
+ """Two generated keys should be different."""
+ k1 = generate_api_key()
+ k2 = generate_api_key()
+ assert k1 != k2
+
+ def test_hash_and_verify(self):
+ """hash_api_key and verify_api_key should round-trip correctly."""
+ key = generate_api_key()
+ hashed = hash_api_key(key)
+ assert verify_api_key(key, hashed)
+ assert not verify_api_key("wrong-key", hashed)
diff --git a/tests/test_rate_limit_admin.py b/tests/test_rate_limit_admin.py
index bc63a5a..f10e9da 100644
--- a/tests/test_rate_limit_admin.py
+++ b/tests/test_rate_limit_admin.py
@@ -20,8 +20,10 @@ def client():
def reset_stats():
"""Reset rate limit stats between tests."""
api._rate_limit_stats.clear()
+ api._rejected_log.clear()
yield
api._rate_limit_stats.clear()
+ api._rejected_log.clear()
def _mock_admin():
@@ -50,8 +52,7 @@ class TestRateLimitAdminEndpoint:
app.dependency_overrides.clear()
def test_non_admin_rejected(self, client):
- """Non-admin users should get 403."""
- # Without overriding the dependency, it should fail auth
+ """Non-admin users should get 401/403."""
response = client.get("/admin/rate-limits")
assert response.status_code in (401, 403)
@@ -77,6 +78,9 @@ class TestRateLimitAdminEndpoint:
for rl in data["rate_limits"]:
assert rl["total_requests"] == 0
assert rl["rejected_requests"] == 0
+ assert rl["by_ip"] == []
+ assert data["throttled_24h"] == 0
+ assert data["throttled_over_time"] == []
finally:
app.dependency_overrides.clear()
@@ -107,3 +111,68 @@ class TestRateLimitAdminEndpoint:
assert isinstance(rl["limit"], str)
finally:
app.dependency_overrides.clear()
+
+ def test_per_ip_breakdown(self, client):
+ """Stats should include per-IP breakdown with total and rejected counts."""
+ api._track_rate_limit_request("/auth/login", "10.0.0.1")
+ api._track_rate_limit_request("/auth/login", "10.0.0.1", rejected=True)
+ api._track_rate_limit_request("/auth/login", "10.0.0.2")
+
+ app.dependency_overrides[api.get_current_admin] = _mock_admin
+ try:
+ response = client.get("/admin/rate-limits")
+ data = response.json()
+ login_stats = next(rl for rl in data["rate_limits"] if rl["endpoint"] == "/auth/login")
+ by_ip = login_stats["by_ip"]
+ assert len(by_ip) == 2
+ ip1 = next(entry for entry in by_ip if entry["ip"] == "10.0.0.1")
+ assert ip1["total"] == 2
+ assert ip1["rejected"] == 1
+ ip2 = next(entry for entry in by_ip if entry["ip"] == "10.0.0.2")
+ assert ip2["total"] == 1
+ assert ip2["rejected"] == 0
+ finally:
+ app.dependency_overrides.clear()
+
+ def test_throttled_24h_count(self, client):
+ """Should report total throttled requests in the last 24 hours."""
+ api._track_rate_limit_request("/auth/login", "10.0.0.1", rejected=True)
+ api._track_rate_limit_request("/auth/register", "10.0.0.2", rejected=True)
+
+ app.dependency_overrides[api.get_current_admin] = _mock_admin
+ try:
+ response = client.get("/admin/rate-limits")
+ data = response.json()
+ assert data["throttled_24h"] == 2
+ finally:
+ app.dependency_overrides.clear()
+
+ def test_throttled_over_time_structure(self, client):
+ """Throttled-over-time should be a list of {timestamp, count} buckets."""
+ api._track_rate_limit_request("/auth/login", "10.0.0.1", rejected=True)
+
+ app.dependency_overrides[api.get_current_admin] = _mock_admin
+ try:
+ response = client.get("/admin/rate-limits")
+ data = response.json()
+ assert len(data["throttled_over_time"]) >= 1
+ entry = data["throttled_over_time"][0]
+ assert "timestamp" in entry
+ assert "count" in entry
+ assert entry["count"] >= 1
+ finally:
+ app.dependency_overrides.clear()
+
+ def test_response_shape_matches_contract(self, client):
+ """The full response should match the expected shape for the frontend."""
+ app.dependency_overrides[api.get_current_admin] = _mock_admin
+ try:
+ response = client.get("/admin/rate-limits")
+ data = response.json()
+ # Top-level keys
+ assert set(data.keys()) == {"rate_limits", "throttled_24h", "throttled_over_time"}
+ # Each rate_limit entry
+ for rl in data["rate_limits"]:
+ assert set(rl.keys()) == {"endpoint", "limit", "total_requests", "rejected_requests", "by_ip"}
+ finally:
+ app.dependency_overrides.clear()