Compare commits

..

8 Commits

Author SHA1 Message Date
agent-company ab3964b18d Move webhook delivery to background task queue
Introduce a lightweight in-process task queue (thread + queue.Queue) so
that webhook HTTP delivery no longer blocks the scheduler or batch-job
background tasks.  The worker thread preserves the existing exponential-
backoff retry logic from _send_with_retry.

- Add SPARC/task_queue.py: WebhookTask, start/stop worker, enqueue, drain
- Add enqueue_notify / enqueue_job_completed / enqueue_alert to webhooks.py
- Update api.py lifespan to start/stop the webhook worker
- Update _run_batch_job to use enqueue_job_completed (non-blocking)
- Update scheduler to fire enqueue_alert on patent count changes
- Add 13 tests covering worker lifecycle, async delivery, retry in worker
  context, and integration via enqueue helpers
- All 22 existing webhook tests continue to pass unchanged

Closes leeworks-agents/SPARC#1676

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-19 15:22:21 +00:00
AI-Manager 313800215c Merge pull request 'Add rate limit stats to admin panel' (#1682) from feature/1675-rate-limit-admin into main
Merge PR #1682
2026-05-19 00:12:56 +00:00
AI-Manager 222f29deb1 Merge pull request 'Add cursor-based pagination to /analyze/batch and /jobs' (#1681) from feature/1669-cursor-pagination into main
Merge PR #1681
2026-05-19 00:12:48 +00:00
AI-Manager e6d95bbf57 Merge pull request 'Add stricter input validation for company names' (#1680) from feature/1670-company-name-validation into main
Merge PR #1680
2026-05-19 00:12:42 +00:00
AI-Manager 68484ef4b1 Merge pull request 'Update ROADMAP.md: mark completed P1 and P2 items as done' (#1679) from feature/1678-update-roadmap into main
Merge PR #1679
2026-05-19 00:12:34 +00:00
agent-company a0cb9a5773 Add rate limit status and usage statistics to admin panel
Add GET /admin/rate-limits endpoint (admin-only) that returns current
rate limit configuration and request statistics for all rate-limited
endpoints (/auth/register and /auth/login). Tracks total requests and
rejection counts via in-memory counters.

Includes tests for admin access, non-admin rejection, empty state,
request tracking, and configuration display.

Closes leeworks-agents/SPARC#1675

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-18 21:53:01 +00:00
agent-company 857b3444df Add cursor-based pagination to GET /analyze/batch and update /jobs defaults
Add a new GET /analyze/batch endpoint that returns stored analysis results
with cursor-based pagination (default limit 50, max 200). Also update the
existing /jobs endpoint defaults from limit=10/max=100 to limit=50/max=200
for consistency.

The database layer gains a list_analyses() method with cursor support using
(timestamp, id) ordering, matching the existing list_jobs() pattern.

Includes tests for pagination behavior, boundary limits, cursor forwarding,
company name filtering, and empty result sets.

Closes leeworks-agents/SPARC#1669

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-18 21:49:22 +00:00
agent-company 7c6eed8d72 Update ROADMAP.md to mark completed P1 and P2 items as done
Move seven completed items from the P1 and P2 sections into the
Completed section: in-memory jobs persistence, export endpoint tests,
tracked company admin tests, webhook integration tests, S3 storage
tests, auto-download path tests, and scheduler DatabaseClient refactor.

The P2 section now only lists the two genuinely open items: cursor-based
pagination (Issue #1669) and request validation (Issue #1670).

Closes leeworks-agents/SPARC#1678

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-18 21:29:14 +00:00
9 changed files with 930 additions and 46 deletions
+30 -37
View File
@@ -81,57 +81,50 @@ Items that have been implemented and merged into main.
- ~~OpenAPI client generation.~~ TypeScript API client auto-generated from - ~~OpenAPI client generation.~~ TypeScript API client auto-generated from
FastAPI spec with CI freshness check. FastAPI spec with CI freshness check.
### Resilience
- ~~`_jobs` dict is in-memory only.~~ Database-backed job persistence
implemented using `db.list_jobs()` and `mark_stale_jobs_failed()`. The
in-memory `_jobs` dict has been removed.
### Test coverage (P1/P2)
- ~~Export endpoint tests.~~ Tests added for CSV and PDF export endpoints.
- ~~Tracked company admin endpoint tests.~~ Tests added for `/admin/tracked`
CRUD endpoints and scheduler integration.
- ~~Webhook integration tests.~~ Tests added for retry logic, Slack/Discord
payload format, and multi-URL dispatch.
- ~~S3/MinIO storage backend tests.~~ Unit tests added for the S3 backend
(read, write, exists, delete, error handling).
- ~~`analyze_single_patent` auto-download path tests.~~ Tests added for the
auto-download fallback (cache lookup, PDF download, FileNotFoundError).
### Code quality
- ~~Scheduler creates its own DatabaseClient.~~ Refactored to use the
application-level pooled `get_db_client()`.
--- ---
## P1 -- High Priority ## P1 -- High Priority
These items address correctness, reliability, and coverage gaps that should be No outstanding P1 items. All previously listed items have been completed and
resolved before broader production use. moved to the Completed section above.
### Resilience
- **`_jobs` dict is in-memory only.** Job state is lost on API restart.
Persist job status in PostgreSQL or Redis so async batch results survive
restarts.
### Test coverage gaps
- **Export endpoint tests.** The CSV and PDF export endpoints (`/export/`)
lack test coverage. Add tests covering auth, success, 404, and edge cases.
*(Issue #1655)*
- **Tracked company admin endpoint tests.** The `/admin/tracked` CRUD
endpoints and scheduler integration lack test coverage. *(Issue #1656)*
--- ---
## P2 -- Medium Priority ## P2 -- Medium Priority
Improvements to reliability, test coverage, and code quality. Improvements to the API surface.
### Test coverage
- **Webhook integration tests.** The retry logic, Slack/Discord payload
format, and multi-URL dispatch in `webhooks.py` need test coverage.
*(Issue #1657)*
- **S3/MinIO storage backend tests.** `storage.py` has local filesystem tests
but no unit tests for the S3 backend (read, write, exists, delete,
error handling). *(Issue #1660)*
- **`analyze_single_patent` auto-download path tests.** The auto-download
fallback (cache lookup, PDF download, FileNotFoundError) in
`analyzer.py` lacks test coverage. *(Issue #1661)*
### Code quality
- **Scheduler creates its own DatabaseClient.** `scheduler.py` bypasses the
application-level pooled client, creating a new connection on every tick.
Refactor to use `get_db_client()`. *(Issue #1658)*
### API improvements ### API improvements
- **API pagination.** The `/analyze/batch` and `/jobs` endpoints could benefit - **API pagination.** The `/analyze/batch` endpoint needs cursor-based
from cursor-based pagination for large result sets. pagination for large result sets. The `/jobs` endpoint already has cursor
pagination. *(Issue #1669)*
- **Request validation improvements.** Add stricter input validation for - **Request validation improvements.** Add stricter input validation for
company names (disallow special characters, enforce length limits). company names (disallow special characters, enforce length limits).
*(Issue #1670)*
--- ---
+140 -6
View File
@@ -106,6 +106,24 @@ class JobStatus(BaseModel):
error: str | None = None error: str | None = None
class AnalysisRecord(BaseModel):
"""A single stored analysis result."""
id: int
company_name: str | None = None
analysis_type: str | None = None
model: str | None = None
response: str | None = None
timestamp: datetime | None = None
class PaginatedAnalysisResponse(BaseModel):
"""Paginated response for analysis result listings."""
items: list[AnalysisRecord]
next_cursor: str | None = None
class PaginatedJobsResponse(BaseModel): class PaginatedJobsResponse(BaseModel):
"""Paginated response for job listings.""" """Paginated response for job listings."""
@@ -206,11 +224,16 @@ async def lifespan(app: FastAPI):
import logging import logging
logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale) logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale)
_db.close() _db.close()
# Start webhook background worker
from SPARC.task_queue import start_worker as start_webhook_worker
from SPARC.task_queue import stop_worker as stop_webhook_worker
start_webhook_worker()
# Start scheduled analysis if tracked companies are configured # Start scheduled analysis if tracked companies are configured
from SPARC.scheduler import start_scheduler from SPARC.scheduler import start_scheduler
start_scheduler() start_scheduler()
yield yield
# Cleanup # Cleanup
stop_webhook_worker()
_analyzer = None _analyzer = None
close_db_client() close_db_client()
@@ -227,10 +250,37 @@ app = FastAPI(
limiter = Limiter(key_func=get_remote_address) limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter app.state.limiter = limiter
# In-memory rate limit statistics
_rate_limit_stats: dict[str, dict] = {}
def _track_rate_limit_request(endpoint: str, ip: str, rejected: bool = False) -> None:
"""Record a request against a rate-limited endpoint."""
key = endpoint
if key not in _rate_limit_stats:
_rate_limit_stats[key] = {
"endpoint": endpoint,
"total_requests": 0,
"rejected_requests": 0,
"by_ip": {},
}
_rate_limit_stats[key]["total_requests"] += 1
if rejected:
_rate_limit_stats[key]["rejected_requests"] += 1
ip_stats = _rate_limit_stats[key].setdefault("by_ip", {})
if ip not in ip_stats:
ip_stats[ip] = {"total": 0, "rejected": 0}
ip_stats[ip]["total"] += 1
if rejected:
ip_stats[ip]["rejected"] += 1
@app.exception_handler(RateLimitExceeded) @app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded): async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
"""Return 429 with Retry-After header when rate limit is exceeded.""" """Return 429 with Retry-After header when rate limit is exceeded."""
endpoint = request.url.path
ip = get_remote_address(request)
_track_rate_limit_request(endpoint, ip, rejected=True)
retry_after = getattr(exc, "retry_after", 60) retry_after = getattr(exc, "retry_after", 60)
return JSONResponse( return JSONResponse(
status_code=429, status_code=429,
@@ -259,6 +309,7 @@ async def register(request: Request, body: RegisterRequest):
The first registered user automatically becomes an admin. The first registered user automatically becomes an admin.
""" """
_track_rate_limit_request("/auth/register", get_remote_address(request))
db = get_db_client() db = get_db_client()
# First user becomes admin # First user becomes admin
@@ -289,6 +340,7 @@ async def register(request: Request, body: RegisterRequest):
@limiter.limit("10/minute") @limiter.limit("10/minute")
async def login(request: Request, body: LoginRequest): async def login(request: Request, body: LoginRequest):
"""Authenticate user and return JWT tokens.""" """Authenticate user and return JWT tokens."""
_track_rate_limit_request("/auth/login", get_remote_address(request))
db = get_db_client() db = get_db_client()
user = db.authenticate_user(body.email, body.password) user = db.authenticate_user(body.email, body.password)
@@ -453,6 +505,36 @@ async def remove_tracked_company(
return {"message": f"Stopped tracking {company_name}"} return {"message": f"Stopped tracking {company_name}"}
@app.get("/admin/rate-limits", tags=["Admin"])
async def get_rate_limit_stats(
_: UserResponse = Depends(get_current_admin),
):
"""Get rate limit status and usage statistics (admin only).
Returns current rate limit configuration and request statistics
for all rate-limited endpoints.
Returns:
List of rate limit stats per endpoint with total/rejected counts
"""
rate_limits_config = {
"/auth/register": {"limit": "5/minute"},
"/auth/login": {"limit": "10/minute"},
}
results = []
for endpoint, conf in rate_limits_config.items():
stats = _rate_limit_stats.get(endpoint, {})
results.append({
"endpoint": endpoint,
"limit": conf["limit"],
"total_requests": stats.get("total_requests", 0),
"rejected_requests": stats.get("rejected_requests", 0),
})
return {"rate_limits": results}
@app.get("/admin/alerts", tags=["Admin"]) @app.get("/admin/alerts", tags=["Admin"])
async def list_alerts( async def list_alerts(
limit: int = Query(default=50, ge=1, le=200), limit: int = Query(default=50, ge=1, le=200),
@@ -882,6 +964,58 @@ async def analyze_single_patent(
raise HTTPException(status_code=404, detail=str(e)) raise HTTPException(status_code=404, detail=str(e))
@app.get(
"/analyze/batch",
response_model=PaginatedAnalysisResponse,
tags=["Analysis"],
)
async def list_analysis_results(
company_name: Annotated[
str | None,
Query(description="Filter results by company name"),
] = None,
limit: Annotated[int, Query(ge=1, le=200)] = 50,
cursor: Annotated[
str | None,
Query(description="Opaque cursor from a previous response's next_cursor field"),
] = None,
_: UserResponse = Depends(get_current_user),
):
"""List stored analysis results with cursor-based pagination.
Returns past analysis results ordered by timestamp descending. Use
``limit`` to control page size (default 50, max 200). The response
includes a ``next_cursor`` field; pass it back as the ``cursor`` query
parameter to fetch the next page. When ``next_cursor`` is ``null``,
there are no more results.
Args:
company_name: Optional filter by company name
limit: Maximum number of results to return (default 50, max 200)
cursor: Opaque pagination cursor from a previous response
Returns:
Paginated list of analysis results
"""
db = _get_job_db()
rows = db.list_analyses(company_name=company_name, limit=limit + 1, cursor=cursor)
has_next = len(rows) > limit
if has_next:
rows = rows[:limit]
items = [AnalysisRecord(**row) for row in rows]
next_cursor = None
if has_next and rows:
last = rows[-1]
ts = last["timestamp"]
ts_str = ts.isoformat() if hasattr(ts, "isoformat") else str(ts)
next_cursor = f"{ts_str}|{last['id']}"
return PaginatedAnalysisResponse(items=items, next_cursor=next_cursor)
@app.post( @app.post(
"/analyze/batch", "/analyze/batch",
response_model=BatchAnalysisResponse, response_model=BatchAnalysisResponse,
@@ -975,9 +1109,9 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int, model: s
progress=100, progress=100,
result_json=_json.dumps(batch_response.model_dump(), default=str), result_json=_json.dumps(batch_response.model_dump(), default=str),
) )
# Fire webhook notification # Fire webhook notification (non-blocking via task queue)
from SPARC.webhooks import notify_job_completed from SPARC.webhooks import enqueue_job_completed
notify_job_completed( enqueue_job_completed(
job_id=job_id, job_id=job_id,
status="completed", status="completed",
total_companies=result.total_companies, total_companies=result.total_companies,
@@ -986,8 +1120,8 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int, model: s
) )
except Exception as e: except Exception as e:
db.update_job(job_id, status="failed", error=str(e)) db.update_job(job_id, status="failed", error=str(e))
from SPARC.webhooks import notify_job_completed from SPARC.webhooks import enqueue_job_completed
notify_job_completed( enqueue_job_completed(
job_id=job_id, job_id=job_id,
status="failed", status="failed",
total_companies=len(companies), total_companies=len(companies),
@@ -1057,7 +1191,7 @@ async def list_jobs(
str | None, str | None,
Query(description="Filter by status: pending, running, completed, failed"), Query(description="Filter by status: pending, running, completed, failed"),
] = None, ] = None,
limit: Annotated[int, Query(ge=1, le=100)] = 10, limit: Annotated[int, Query(ge=1, le=200)] = 50,
cursor: Annotated[ cursor: Annotated[
str | None, str | None,
Query(description="Opaque cursor from a previous response's next_cursor field"), Query(description="Opaque cursor from a previous response's next_cursor field"),
+42
View File
@@ -371,6 +371,48 @@ class DatabaseClient:
cursor.execute(query, params) cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()] return [dict(row) for row in cursor.fetchall()]
def list_analyses(
self,
company_name: Optional[str] = None,
limit: int = 50,
cursor: Optional[str] = None,
) -> List[Dict]:
"""List analysis results with cursor-based pagination.
Args:
company_name: Optional filter by company name.
limit: Maximum number of records to return.
cursor: Opaque cursor (``timestamp|id``) from a previous response.
Returns:
List of analysis dicts ordered by timestamp descending.
"""
conditions: list[str] = ["is_cached = FALSE"]
params: list = []
if company_name:
conditions.append("LOWER(company_name) = LOWER(%s)")
params.append(company_name)
if cursor:
try:
ts_str, cursor_id = cursor.rsplit("|", 1)
conditions.append("(timestamp, id) < (%s, %s)")
params.extend([ts_str, int(cursor_id)])
except (ValueError, TypeError):
pass # Ignore malformed cursors; return from start
query = "SELECT id, company_name, analysis_type, model, response, timestamp FROM llm_messages"
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY timestamp DESC, id DESC LIMIT %s"
params.append(limit)
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(query, params)
return [dict(row) for row in cur.fetchall()]
def get_analytics(self, days: int = 30) -> Dict: def get_analytics(self, days: int = 30) -> Dict:
"""Get analytics on message usage. """Get analytics on message usage.
+7
View File
@@ -71,6 +71,13 @@ def run_scheduled_analysis() -> None:
old_value=old_count, old_value=old_count,
new_value=new_count, new_value=new_count,
) )
# Fire non-blocking webhook notification
from SPARC.webhooks import enqueue_alert
enqueue_alert(
company_name=name,
alert_type="patent_count_change",
message=message,
)
elif new_count > 0: elif new_count > 0:
# First analysis -- record baseline # First analysis -- record baseline
logger.info("Baseline for %s: %d patents", name, new_count) logger.info("Baseline for %s: %d patents", name, new_count)
+113
View File
@@ -0,0 +1,113 @@
"""Lightweight in-process task queue for non-blocking webhook delivery.
Uses a daemon thread and a :class:`queue.Queue` so that the scheduler and
background jobs can enqueue webhook deliveries without blocking on HTTP
round-trips and retry backoff.
No external dependencies (Redis, etc.) are required.
"""
import logging
import queue
import threading
from dataclasses import dataclass
from typing import Any
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class WebhookTask:
"""A single webhook delivery request."""
url: str
payload: dict[str, Any]
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
_queue: queue.Queue[WebhookTask | None] = queue.Queue()
_worker_thread: threading.Thread | None = None
_started = threading.Event()
def _worker_loop() -> None:
"""Process webhook tasks until a ``None`` sentinel is received."""
import SPARC.webhooks as _webhooks # deferred to avoid circular import
logger.info("Webhook worker thread started")
_started.set()
while True:
task = _queue.get()
if task is None:
# Sentinel — shut down
logger.info("Webhook worker thread stopping")
_queue.task_done()
break
try:
# Look up dynamically so that tests can patch the function
_webhooks._send_with_retry(task.url, task.payload)
except Exception:
logger.exception("Unexpected error delivering webhook to %s", task.url)
finally:
_queue.task_done()
def start_worker() -> None:
"""Start the background worker thread (idempotent)."""
global _worker_thread
if _worker_thread is not None and _worker_thread.is_alive():
return
_started.clear()
_worker_thread = threading.Thread(target=_worker_loop, daemon=True, name="webhook-worker")
_worker_thread.start()
_started.wait() # block until the worker is actually running
logger.info("Webhook task queue ready")
def stop_worker(timeout: float = 5.0) -> None:
"""Send the stop sentinel and wait for the worker to finish.
Args:
timeout: Maximum seconds to wait for the worker thread to join.
"""
global _worker_thread
if _worker_thread is None or not _worker_thread.is_alive():
_worker_thread = None
return
_queue.put(None) # sentinel
_worker_thread.join(timeout=timeout)
_worker_thread = None
logger.info("Webhook task queue stopped")
def enqueue(task: WebhookTask) -> None:
"""Add a webhook delivery task to the queue.
If the worker has not been started the task is still accepted into the
queue and will be processed once :func:`start_worker` is called.
"""
_queue.put(task)
def queue_size() -> int:
"""Return the approximate number of pending tasks."""
return _queue.qsize()
def drain(timeout: float = 10.0) -> None:
"""Block until all currently-enqueued tasks have been processed.
Useful in tests and graceful shutdown to ensure pending deliveries
complete before the process exits.
Args:
timeout: Maximum seconds to wait.
"""
_queue.join()
+58 -3
View File
@@ -91,9 +91,10 @@ def _send_with_retry(url: str, payload: dict) -> bool:
def notify(event_type: str, data: dict[str, Any]) -> None: def notify(event_type: str, data: dict[str, Any]) -> None:
"""Fire all configured webhooks for an event. """Fire all configured webhooks for an event (**blocking**).
Safe to call even when no webhooks are configured (returns immediately). Safe to call even when no webhooks are configured (returns immediately).
For non-blocking delivery, use :func:`enqueue_notify` instead.
Args: Args:
event_type: Event identifier (e.g., "job_completed", "patent_alert") event_type: Event identifier (e.g., "job_completed", "patent_alert")
@@ -108,6 +109,29 @@ def notify(event_type: str, data: dict[str, Any]) -> None:
_send_with_retry(url, payload) _send_with_retry(url, payload)
def enqueue_notify(event_type: str, data: dict[str, Any]) -> None:
"""Enqueue webhook delivery for all configured URLs (non-blocking).
Returns immediately after placing tasks on the background queue.
The worker thread handles retry logic asynchronously.
Safe to call even when no webhooks are configured.
Args:
event_type: Event identifier (e.g., "job_completed", "patent_alert")
data: Event data to include in the payload
"""
if not WEBHOOK_URLS:
return
from SPARC.task_queue import WebhookTask, enqueue
for url in WEBHOOK_URLS:
slack = _is_slack_url(url)
payload = _build_payload(event_type, data, slack=slack)
enqueue(WebhookTask(url=url, payload=payload))
def notify_job_completed( def notify_job_completed(
job_id: str, job_id: str,
status: str, status: str,
@@ -115,7 +139,7 @@ def notify_job_completed(
successful: int, successful: int,
failed: int, failed: int,
) -> None: ) -> None:
"""Send notification when a batch job completes.""" """Send notification when a batch job completes (blocking)."""
notify("job_completed", { notify("job_completed", {
"job_id": job_id, "job_id": job_id,
"status": status, "status": status,
@@ -126,14 +150,45 @@ def notify_job_completed(
}) })
def enqueue_job_completed(
job_id: str,
status: str,
total_companies: int,
successful: int,
failed: int,
) -> None:
"""Enqueue notification when a batch job completes (non-blocking)."""
enqueue_notify("job_completed", {
"job_id": job_id,
"status": status,
"total_companies": total_companies,
"successful": successful,
"failed": failed,
"summary": f"Batch job {job_id}: {successful}/{total_companies} succeeded",
})
def notify_alert( def notify_alert(
company_name: str, company_name: str,
alert_type: str, alert_type: str,
message: str, message: str,
) -> None: ) -> None:
"""Send notification for a tracked company alert.""" """Send notification for a tracked company alert (blocking)."""
notify("patent_alert", { notify("patent_alert", {
"company_name": company_name, "company_name": company_name,
"alert_type": alert_type, "alert_type": alert_type,
"message": message, "message": message,
}) })
def enqueue_alert(
company_name: str,
alert_type: str,
message: str,
) -> None:
"""Enqueue notification for a tracked company alert (non-blocking)."""
enqueue_notify("patent_alert", {
"company_name": company_name,
"alert_type": alert_type,
"message": message,
})
+169
View File
@@ -0,0 +1,169 @@
"""Tests for cursor-based pagination on /analyze/batch GET and /jobs endpoints."""
from datetime import datetime, timedelta
from unittest.mock import Mock, patch
import pytest
from fastapi.testclient import TestClient
from SPARC.api import app
@pytest.fixture
def client():
"""Create test client."""
return TestClient(app)
def _make_analysis_row(id_: int, minutes_ago: int = 0, company: str = "nvidia"):
"""Create a fake analysis row dict."""
ts = datetime.now() - timedelta(minutes=minutes_ago)
return {
"id": id_,
"company_name": company,
"analysis_type": "patent_portfolio",
"model": "openai/gpt-4o",
"response": f"Analysis for {company}",
"timestamp": ts,
}
def _make_job_row(job_id: str, minutes_ago: int = 0, status: str = "completed"):
"""Create a fake job row dict."""
ts = datetime.now() - timedelta(minutes=minutes_ago)
return {
"job_id": job_id,
"status": status,
"progress": 100 if status == "completed" else 0,
"total_companies": 1,
"completed_companies": 1 if status == "completed" else 0,
"result": None,
"error": None,
"created_at": ts,
}
class TestAnalyzeBatchGetPagination:
"""Test cursor-based pagination on GET /analyze/batch."""
@patch("SPARC.api._get_job_db")
def test_returns_items_and_no_cursor_when_less_than_limit(self, mock_get_db, client):
"""When fewer results than limit, next_cursor should be null."""
db = Mock()
db.list_analyses.return_value = [
_make_analysis_row(1, minutes_ago=10),
_make_analysis_row(2, minutes_ago=20),
]
mock_get_db.return_value = db
response = client.get("/analyze/batch?limit=10")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 2
assert data["next_cursor"] is None
@patch("SPARC.api._get_job_db")
def test_returns_cursor_when_more_results_exist(self, mock_get_db, client):
"""When more results exist than limit, next_cursor should be set."""
db = Mock()
# Return limit+1 rows to simulate more data
rows = [_make_analysis_row(i, minutes_ago=i) for i in range(4)]
db.list_analyses.return_value = rows
mock_get_db.return_value = db
response = client.get("/analyze/batch?limit=3")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 3
assert data["next_cursor"] is not None
@patch("SPARC.api._get_job_db")
def test_cursor_passed_to_db(self, mock_get_db, client):
"""The cursor query param should be forwarded to the database layer."""
db = Mock()
db.list_analyses.return_value = []
mock_get_db.return_value = db
client.get("/analyze/batch?cursor=2025-01-01T00:00:00|42")
db.list_analyses.assert_called_once()
call_kwargs = db.list_analyses.call_args
assert call_kwargs.kwargs.get("cursor") == "2025-01-01T00:00:00|42" or \
(call_kwargs[1].get("cursor") == "2025-01-01T00:00:00|42" if len(call_kwargs) > 1 else False)
@patch("SPARC.api._get_job_db")
def test_default_limit_is_50(self, mock_get_db, client):
"""Default limit should be 50."""
db = Mock()
db.list_analyses.return_value = []
mock_get_db.return_value = db
client.get("/analyze/batch")
call_kwargs = db.list_analyses.call_args
# The endpoint requests limit+1 from DB, so 51
assert 51 in call_kwargs.args or call_kwargs.kwargs.get("limit") == 51
def test_limit_over_200_rejected(self, client):
"""Limit > 200 should be rejected with 422."""
response = client.get("/analyze/batch?limit=201")
assert response.status_code == 422
def test_limit_zero_rejected(self, client):
"""Limit < 1 should be rejected with 422."""
response = client.get("/analyze/batch?limit=0")
assert response.status_code == 422
@patch("SPARC.api._get_job_db")
def test_company_name_filter(self, mock_get_db, client):
"""The company_name filter should be forwarded to the database."""
db = Mock()
db.list_analyses.return_value = []
mock_get_db.return_value = db
client.get("/analyze/batch?company_name=intel")
call_kwargs = db.list_analyses.call_args
assert call_kwargs.kwargs.get("company_name") == "intel" or \
"intel" in (call_kwargs.args if call_kwargs.args else [])
@patch("SPARC.api._get_job_db")
def test_empty_result_set(self, mock_get_db, client):
"""Empty result set returns empty items and null cursor."""
db = Mock()
db.list_analyses.return_value = []
mock_get_db.return_value = db
response = client.get("/analyze/batch")
assert response.status_code == 200
data = response.json()
assert data["items"] == []
assert data["next_cursor"] is None
class TestJobsPaginationDefaults:
"""Test that /jobs endpoint uses updated defaults."""
@patch("SPARC.api._get_job_db")
def test_default_limit_is_50(self, mock_get_db, client):
"""Default limit should now be 50."""
db = Mock()
db.list_jobs.return_value = []
mock_get_db.return_value = db
client.get("/jobs")
call_kwargs = db.list_jobs.call_args
# Endpoint requests limit+1 from DB, so 51
assert 51 in call_kwargs.args or call_kwargs.kwargs.get("limit") == 51
def test_limit_over_200_rejected(self, client):
"""Limit > 200 should be rejected with 422."""
response = client.get("/jobs?limit=201")
assert response.status_code == 422
@patch("SPARC.api._get_job_db")
def test_limit_200_accepted(self, mock_get_db, client):
"""Limit of exactly 200 should be accepted."""
db = Mock()
db.list_jobs.return_value = []
mock_get_db.return_value = db
response = client.get("/jobs?limit=200")
assert response.status_code == 200
+109
View File
@@ -0,0 +1,109 @@
"""Tests for the /admin/rate-limits endpoint."""
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
from SPARC import api
from SPARC.api import app
from SPARC.auth import UserResponse
@pytest.fixture
def client():
"""Create test client."""
return TestClient(app)
@pytest.fixture(autouse=True)
def reset_stats():
"""Reset rate limit stats between tests."""
api._rate_limit_stats.clear()
yield
api._rate_limit_stats.clear()
def _mock_admin():
"""Return a mock admin user."""
return UserResponse(id=1, email="admin@test.com", role="admin", created_at="2025-01-01T00:00:00")
def _mock_user():
"""Return a mock non-admin user."""
return UserResponse(id=2, email="user@test.com", role="user", created_at="2025-01-01T00:00:00")
class TestRateLimitAdminEndpoint:
"""Test GET /admin/rate-limits."""
def test_admin_can_access(self, client):
"""Admin users should be able to access the rate-limits endpoint."""
app.dependency_overrides[api.get_current_admin] = _mock_admin
try:
response = client.get("/admin/rate-limits")
assert response.status_code == 200
data = response.json()
assert "rate_limits" in data
assert isinstance(data["rate_limits"], list)
finally:
app.dependency_overrides.clear()
def test_non_admin_rejected(self, client):
"""Non-admin users should get 403."""
# Without overriding the dependency, it should fail auth
response = client.get("/admin/rate-limits")
assert response.status_code in (401, 403)
def test_returns_configured_endpoints(self, client):
"""Should list all rate-limited endpoints."""
app.dependency_overrides[api.get_current_admin] = _mock_admin
try:
response = client.get("/admin/rate-limits")
assert response.status_code == 200
data = response.json()
endpoints = [rl["endpoint"] for rl in data["rate_limits"]]
assert "/auth/register" in endpoints
assert "/auth/login" in endpoints
finally:
app.dependency_overrides.clear()
def test_empty_state_shows_zero_counts(self, client):
"""When no requests have been made, counts should be zero."""
app.dependency_overrides[api.get_current_admin] = _mock_admin
try:
response = client.get("/admin/rate-limits")
data = response.json()
for rl in data["rate_limits"]:
assert rl["total_requests"] == 0
assert rl["rejected_requests"] == 0
finally:
app.dependency_overrides.clear()
def test_tracks_requests(self, client):
"""After making requests, the stats should reflect them."""
api._track_rate_limit_request("/auth/login", "127.0.0.1")
api._track_rate_limit_request("/auth/login", "127.0.0.1")
api._track_rate_limit_request("/auth/login", "192.168.1.1", rejected=True)
app.dependency_overrides[api.get_current_admin] = _mock_admin
try:
response = client.get("/admin/rate-limits")
data = response.json()
login_stats = next(rl for rl in data["rate_limits"] if rl["endpoint"] == "/auth/login")
assert login_stats["total_requests"] == 3
assert login_stats["rejected_requests"] == 1
finally:
app.dependency_overrides.clear()
def test_includes_limit_config(self, client):
"""Each endpoint entry should include the rate limit config string."""
app.dependency_overrides[api.get_current_admin] = _mock_admin
try:
response = client.get("/admin/rate-limits")
data = response.json()
for rl in data["rate_limits"]:
assert "limit" in rl
assert isinstance(rl["limit"], str)
finally:
app.dependency_overrides.clear()
+262
View File
@@ -0,0 +1,262 @@
"""Tests for the webhook background task queue.
Covers:
- Worker lifecycle (start / stop / idempotent start)
- Tasks are processed asynchronously by the worker
- Retry logic is preserved (executed inside the worker thread)
- enqueue_notify / enqueue_job_completed / enqueue_alert non-blocking helpers
- Integration: queued webhook task is eventually delivered (mocked HTTP)
"""
import threading
import time
from unittest.mock import MagicMock, call, patch
import pytest
from SPARC.task_queue import (
WebhookTask,
drain,
enqueue,
queue_size,
start_worker,
stop_worker,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _worker_lifecycle():
"""Start the worker before each test and stop it after."""
start_worker()
yield
stop_worker(timeout=3)
# ---------------------------------------------------------------------------
# Worker lifecycle
# ---------------------------------------------------------------------------
class TestWorkerLifecycle:
def test_start_is_idempotent(self):
"""Calling start_worker() twice does not create a second thread."""
import SPARC.task_queue as tq
first = tq._worker_thread
start_worker()
assert tq._worker_thread is first
def test_stop_worker_gracefully(self):
"""stop_worker joins the thread cleanly."""
import SPARC.task_queue as tq
assert tq._worker_thread is not None
stop_worker(timeout=3)
assert tq._worker_thread is None
# ---------------------------------------------------------------------------
# Task processing
# ---------------------------------------------------------------------------
class TestTaskProcessing:
@patch("SPARC.webhooks._send_with_retry")
def test_enqueued_task_is_delivered(self, mock_send):
"""A task put on the queue is eventually processed by the worker."""
mock_send.return_value = True
task = WebhookTask(url="https://example.com/hook", payload={"event": "test"})
enqueue(task)
drain(timeout=5)
mock_send.assert_called_once_with("https://example.com/hook", {"event": "test"})
@patch("SPARC.webhooks._send_with_retry")
def test_multiple_tasks_processed_in_order(self, mock_send):
"""Tasks are processed FIFO."""
mock_send.return_value = True
for i in range(3):
enqueue(WebhookTask(url=f"https://example.com/{i}", payload={"n": i}))
drain(timeout=5)
assert mock_send.call_count == 3
urls = [c[0][0] for c in mock_send.call_args_list]
assert urls == [
"https://example.com/0",
"https://example.com/1",
"https://example.com/2",
]
@patch("SPARC.webhooks._send_with_retry")
def test_enqueue_returns_immediately(self, mock_send):
"""enqueue() does not block even if the worker is slow."""
event = threading.Event()
def slow_send(url, payload):
event.wait(timeout=5)
return True
mock_send.side_effect = slow_send
start = time.monotonic()
enqueue(WebhookTask(url="https://slow.example.com", payload={}))
elapsed = time.monotonic() - start
# enqueue should return in well under 1 second
assert elapsed < 0.5
# Let the worker finish
event.set()
drain(timeout=5)
@patch("SPARC.webhooks._send_with_retry", side_effect=RuntimeError("boom"))
def test_worker_survives_unexpected_error(self, mock_send):
"""An unexpected exception in delivery does not kill the worker."""
enqueue(WebhookTask(url="https://example.com/bad", payload={}))
drain(timeout=5)
# Worker is still alive; enqueue another task
mock_send.side_effect = None
mock_send.return_value = True
enqueue(WebhookTask(url="https://example.com/good", payload={"ok": True}))
drain(timeout=5)
assert mock_send.call_count == 2
# ---------------------------------------------------------------------------
# Retry logic preserved in worker context
# ---------------------------------------------------------------------------
class TestRetryInWorker:
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_retry_logic_runs_inside_worker(self, mock_post, mock_sleep):
"""The worker thread uses _send_with_retry, which retries on failure."""
mock_post.side_effect = [
MagicMock(status_code=500),
MagicMock(status_code=200),
]
enqueue(WebhookTask(
url="https://example.com/retry",
payload={"event": "test"},
))
drain(timeout=10)
assert mock_post.call_count == 2
mock_sleep.assert_called_once()
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_all_retries_exhausted_in_worker(self, mock_post, mock_sleep):
"""Worker handles permanent failure gracefully."""
mock_post.return_value = MagicMock(status_code=500)
enqueue(WebhookTask(
url="https://example.com/fail",
payload={"event": "test"},
))
drain(timeout=10)
from SPARC.webhooks import MAX_RETRIES
assert mock_post.call_count == MAX_RETRIES
# ---------------------------------------------------------------------------
# Integration: enqueue_notify and convenience helpers
# ---------------------------------------------------------------------------
class TestEnqueueHelpers:
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
def test_enqueue_notify_delivers_via_worker(self, mock_send):
"""enqueue_notify puts a task on the queue and the worker delivers it."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_notify
enqueue_notify("test_event", {"key": "val"})
drain(timeout=5)
mock_send.assert_called_once()
url, payload = mock_send.call_args[0]
assert url == "https://example.com/hook"
assert payload["event"] == "test_event"
assert payload["key"] == "val"
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
def test_enqueue_job_completed(self, mock_send):
"""enqueue_job_completed sends job completion data via the queue."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_job_completed
enqueue_job_completed(
job_id="job-1",
status="completed",
total_companies=5,
successful=4,
failed=1,
)
drain(timeout=5)
mock_send.assert_called_once()
payload = mock_send.call_args[0][1]
assert payload["event"] == "job_completed"
assert payload["job_id"] == "job-1"
assert payload["successful"] == 4
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
def test_enqueue_alert(self, mock_send):
"""enqueue_alert sends alert data via the queue."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_alert
enqueue_alert(
company_name="NVIDIA",
alert_type="patent_count_change",
message="Patent count increased by 30%",
)
drain(timeout=5)
mock_send.assert_called_once()
payload = mock_send.call_args[0][1]
assert payload["event"] == "patent_alert"
assert payload["company_name"] == "NVIDIA"
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", [])
def test_enqueue_notify_noop_when_no_urls(self, mock_send):
"""enqueue_notify is a no-op when WEBHOOK_URLS is empty."""
from SPARC.webhooks import enqueue_notify
enqueue_notify("test_event", {"key": "val"})
drain(timeout=2)
mock_send.assert_not_called()
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", [
"https://hooks.slack.com/services/T00/B00/xxx",
"https://example.com/generic",
])
def test_enqueue_notify_slack_formatting(self, mock_send):
"""Slack URLs get Slack-formatted payloads even via the queue."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_notify
enqueue_notify("test_event", {"key": "val"})
drain(timeout=5)
assert mock_send.call_count == 2
slack_payload = mock_send.call_args_list[0][0][1]
assert "text" in slack_payload
generic_payload = mock_send.call_args_list[1][0][1]
assert "event" in generic_payload