Compare commits

..

1 Commits

Author SHA1 Message Date
agent-company ab3964b18d Move webhook delivery to background task queue
Introduce a lightweight in-process task queue (thread + queue.Queue) so
that webhook HTTP delivery no longer blocks the scheduler or batch-job
background tasks.  The worker thread preserves the existing exponential-
backoff retry logic from _send_with_retry.

- Add SPARC/task_queue.py: WebhookTask, start/stop worker, enqueue, drain
- Add enqueue_notify / enqueue_job_completed / enqueue_alert to webhooks.py
- Update api.py lifespan to start/stop the webhook worker
- Update _run_batch_job to use enqueue_job_completed (non-blocking)
- Update scheduler to fire enqueue_alert on patent count changes
- Add 13 tests covering worker lifecycle, async delivery, retry in worker
  context, and integration via enqueue helpers
- All 22 existing webhook tests continue to pass unchanged

Closes leeworks-agents/SPARC#1676

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-19 15:22:21 +00:00
9 changed files with 529 additions and 411 deletions
+62 -57
View File
@@ -224,11 +224,16 @@ async def lifespan(app: FastAPI):
import logging
logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale)
_db.close()
# Start webhook background worker
from SPARC.task_queue import start_worker as start_webhook_worker
from SPARC.task_queue import stop_worker as stop_webhook_worker
start_webhook_worker()
# Start scheduled analysis if tracked companies are configured
from SPARC.scheduler import start_scheduler
start_scheduler()
yield
# Cleanup
stop_webhook_worker()
_analyzer = None
close_db_client()
@@ -897,58 +902,6 @@ async def health_check():
)
@app.get(
"/analyze/batch",
response_model=PaginatedAnalysisResponse,
tags=["Analysis"],
)
async def list_analysis_results(
company_name: Annotated[
str | None,
Query(description="Filter results by company name"),
] = None,
limit: Annotated[int, Query(ge=1, le=200)] = 50,
cursor: Annotated[
str | None,
Query(description="Opaque cursor from a previous response's next_cursor field"),
] = None,
_: UserResponse = Depends(get_current_user),
):
"""List stored analysis results with cursor-based pagination.
Returns past analysis results ordered by timestamp descending. Use
``limit`` to control page size (default 50, max 200). The response
includes a ``next_cursor`` field; pass it back as the ``cursor`` query
parameter to fetch the next page. When ``next_cursor`` is ``null``,
there are no more results.
Args:
company_name: Optional filter by company name
limit: Maximum number of results to return (default 50, max 200)
cursor: Opaque pagination cursor from a previous response
Returns:
Paginated list of analysis results
"""
db = _get_job_db()
rows = db.list_analyses(company_name=company_name, limit=limit + 1, cursor=cursor)
has_next = len(rows) > limit
if has_next:
rows = rows[:limit]
items = [AnalysisRecord(**row) for row in rows]
next_cursor = None
if has_next and rows:
last = rows[-1]
ts = last["timestamp"]
ts_str = ts.isoformat() if hasattr(ts, "isoformat") else str(ts)
next_cursor = f"{ts_str}|{last['id']}"
return PaginatedAnalysisResponse(items=items, next_cursor=next_cursor)
@app.get(
"/analyze/{company_name}",
response_model=CompanyAnalysisResponse,
@@ -1011,6 +964,58 @@ async def analyze_single_patent(
raise HTTPException(status_code=404, detail=str(e))
@app.get(
"/analyze/batch",
response_model=PaginatedAnalysisResponse,
tags=["Analysis"],
)
async def list_analysis_results(
company_name: Annotated[
str | None,
Query(description="Filter results by company name"),
] = None,
limit: Annotated[int, Query(ge=1, le=200)] = 50,
cursor: Annotated[
str | None,
Query(description="Opaque cursor from a previous response's next_cursor field"),
] = None,
_: UserResponse = Depends(get_current_user),
):
"""List stored analysis results with cursor-based pagination.
Returns past analysis results ordered by timestamp descending. Use
``limit`` to control page size (default 50, max 200). The response
includes a ``next_cursor`` field; pass it back as the ``cursor`` query
parameter to fetch the next page. When ``next_cursor`` is ``null``,
there are no more results.
Args:
company_name: Optional filter by company name
limit: Maximum number of results to return (default 50, max 200)
cursor: Opaque pagination cursor from a previous response
Returns:
Paginated list of analysis results
"""
db = _get_job_db()
rows = db.list_analyses(company_name=company_name, limit=limit + 1, cursor=cursor)
has_next = len(rows) > limit
if has_next:
rows = rows[:limit]
items = [AnalysisRecord(**row) for row in rows]
next_cursor = None
if has_next and rows:
last = rows[-1]
ts = last["timestamp"]
ts_str = ts.isoformat() if hasattr(ts, "isoformat") else str(ts)
next_cursor = f"{ts_str}|{last['id']}"
return PaginatedAnalysisResponse(items=items, next_cursor=next_cursor)
@app.post(
"/analyze/batch",
response_model=BatchAnalysisResponse,
@@ -1104,9 +1109,9 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int, model: s
progress=100,
result_json=_json.dumps(batch_response.model_dump(), default=str),
)
# Fire webhook notification
from SPARC.webhooks import notify_job_completed
notify_job_completed(
# Fire webhook notification (non-blocking via task queue)
from SPARC.webhooks import enqueue_job_completed
enqueue_job_completed(
job_id=job_id,
status="completed",
total_companies=result.total_companies,
@@ -1115,8 +1120,8 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int, model: s
)
except Exception as e:
db.update_job(job_id, status="failed", error=str(e))
from SPARC.webhooks import notify_job_completed
notify_job_completed(
from SPARC.webhooks import enqueue_job_completed
enqueue_job_completed(
job_id=job_id,
status="failed",
total_companies=len(companies),
+7
View File
@@ -71,6 +71,13 @@ def run_scheduled_analysis() -> None:
old_value=old_count,
new_value=new_count,
)
# Fire non-blocking webhook notification
from SPARC.webhooks import enqueue_alert
enqueue_alert(
company_name=name,
alert_type="patent_count_change",
message=message,
)
elif new_count > 0:
# First analysis -- record baseline
logger.info("Baseline for %s: %d patents", name, new_count)
+113
View File
@@ -0,0 +1,113 @@
"""Lightweight in-process task queue for non-blocking webhook delivery.
Uses a daemon thread and a :class:`queue.Queue` so that the scheduler and
background jobs can enqueue webhook deliveries without blocking on HTTP
round-trips and retry backoff.
No external dependencies (Redis, etc.) are required.
"""
import logging
import queue
import threading
from dataclasses import dataclass
from typing import Any
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class WebhookTask:
"""A single webhook delivery request."""
url: str
payload: dict[str, Any]
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
_queue: queue.Queue[WebhookTask | None] = queue.Queue()
_worker_thread: threading.Thread | None = None
_started = threading.Event()
def _worker_loop() -> None:
"""Process webhook tasks until a ``None`` sentinel is received."""
import SPARC.webhooks as _webhooks # deferred to avoid circular import
logger.info("Webhook worker thread started")
_started.set()
while True:
task = _queue.get()
if task is None:
# Sentinel — shut down
logger.info("Webhook worker thread stopping")
_queue.task_done()
break
try:
# Look up dynamically so that tests can patch the function
_webhooks._send_with_retry(task.url, task.payload)
except Exception:
logger.exception("Unexpected error delivering webhook to %s", task.url)
finally:
_queue.task_done()
def start_worker() -> None:
"""Start the background worker thread (idempotent)."""
global _worker_thread
if _worker_thread is not None and _worker_thread.is_alive():
return
_started.clear()
_worker_thread = threading.Thread(target=_worker_loop, daemon=True, name="webhook-worker")
_worker_thread.start()
_started.wait() # block until the worker is actually running
logger.info("Webhook task queue ready")
def stop_worker(timeout: float = 5.0) -> None:
"""Send the stop sentinel and wait for the worker to finish.
Args:
timeout: Maximum seconds to wait for the worker thread to join.
"""
global _worker_thread
if _worker_thread is None or not _worker_thread.is_alive():
_worker_thread = None
return
_queue.put(None) # sentinel
_worker_thread.join(timeout=timeout)
_worker_thread = None
logger.info("Webhook task queue stopped")
def enqueue(task: WebhookTask) -> None:
"""Add a webhook delivery task to the queue.
If the worker has not been started the task is still accepted into the
queue and will be processed once :func:`start_worker` is called.
"""
_queue.put(task)
def queue_size() -> int:
"""Return the approximate number of pending tasks."""
return _queue.qsize()
def drain(timeout: float = 10.0) -> None:
"""Block until all currently-enqueued tasks have been processed.
Useful in tests and graceful shutdown to ensure pending deliveries
complete before the process exits.
Args:
timeout: Maximum seconds to wait.
"""
_queue.join()
+58 -3
View File
@@ -91,9 +91,10 @@ def _send_with_retry(url: str, payload: dict) -> bool:
def notify(event_type: str, data: dict[str, Any]) -> None:
"""Fire all configured webhooks for an event.
"""Fire all configured webhooks for an event (**blocking**).
Safe to call even when no webhooks are configured (returns immediately).
For non-blocking delivery, use :func:`enqueue_notify` instead.
Args:
event_type: Event identifier (e.g., "job_completed", "patent_alert")
@@ -108,6 +109,29 @@ def notify(event_type: str, data: dict[str, Any]) -> None:
_send_with_retry(url, payload)
def enqueue_notify(event_type: str, data: dict[str, Any]) -> None:
"""Enqueue webhook delivery for all configured URLs (non-blocking).
Returns immediately after placing tasks on the background queue.
The worker thread handles retry logic asynchronously.
Safe to call even when no webhooks are configured.
Args:
event_type: Event identifier (e.g., "job_completed", "patent_alert")
data: Event data to include in the payload
"""
if not WEBHOOK_URLS:
return
from SPARC.task_queue import WebhookTask, enqueue
for url in WEBHOOK_URLS:
slack = _is_slack_url(url)
payload = _build_payload(event_type, data, slack=slack)
enqueue(WebhookTask(url=url, payload=payload))
def notify_job_completed(
job_id: str,
status: str,
@@ -115,7 +139,7 @@ def notify_job_completed(
successful: int,
failed: int,
) -> None:
"""Send notification when a batch job completes."""
"""Send notification when a batch job completes (blocking)."""
notify("job_completed", {
"job_id": job_id,
"status": status,
@@ -126,14 +150,45 @@ def notify_job_completed(
})
def enqueue_job_completed(
job_id: str,
status: str,
total_companies: int,
successful: int,
failed: int,
) -> None:
"""Enqueue notification when a batch job completes (non-blocking)."""
enqueue_notify("job_completed", {
"job_id": job_id,
"status": status,
"total_companies": total_companies,
"successful": successful,
"failed": failed,
"summary": f"Batch job {job_id}: {successful}/{total_companies} succeeded",
})
def notify_alert(
company_name: str,
alert_type: str,
message: str,
) -> None:
"""Send notification for a tracked company alert."""
"""Send notification for a tracked company alert (blocking)."""
notify("patent_alert", {
"company_name": company_name,
"alert_type": alert_type,
"message": message,
})
def enqueue_alert(
company_name: str,
alert_type: str,
message: str,
) -> None:
"""Enqueue notification for a tracked company alert (non-blocking)."""
enqueue_notify("patent_alert", {
"company_name": company_name,
"alert_type": alert_type,
"message": message,
})
+3 -79
View File
@@ -1,5 +1,5 @@
import axios, { AxiosError, InternalAxiosRequestConfig } from 'axios';
import type { TokenResponse, User, CompanyAnalysis, BatchAnalysisResult, JobStatus, Analytics, PaginatedJobsResponse, PaginatedAnalysisResponse } from '../types';
import type { TokenResponse, User, CompanyAnalysis, BatchAnalysisResult, JobStatus, Analytics } from '../types';
const API_BASE_URL = import.meta.env.VITE_API_URL || '/api';
@@ -141,60 +141,15 @@ export const analysisApi = {
return response.data;
},
listJobs: async (status?: string, limit = 50, cursor?: string): Promise<PaginatedJobsResponse> => {
listJobs: async (status?: string, limit = 10): Promise<JobStatus[]> => {
const params = new URLSearchParams();
if (status) params.append('status', status);
params.append('limit', limit.toString());
if (cursor) params.append('cursor', cursor);
const response = await api.get<PaginatedJobsResponse>(`/jobs?${params}`);
return response.data;
},
listBatchAnalyses: async (companyName?: string, limit = 50, cursor?: string): Promise<PaginatedAnalysisResponse> => {
const params = new URLSearchParams();
if (companyName) params.append('company_name', companyName);
params.append('limit', limit.toString());
if (cursor) params.append('cursor', cursor);
const response = await api.get<PaginatedAnalysisResponse>(`/analyze/batch?${params}`);
return response.data;
},
getCompanyHistory: async (companyName: string, limit = 20): Promise<AnalysisHistoryItem[]> => {
const response = await api.get<AnalysisHistoryItem[]>(
`/analyze/${encodeURIComponent(companyName)}/history?limit=${limit}`
);
return response.data;
},
diffAnalyses: async (companyName: string, fromId: number, toId: number): Promise<AnalysisDiff> => {
const response = await api.get<AnalysisDiff>(
`/analyze/${encodeURIComponent(companyName)}/diff?from=${fromId}&to=${toId}`
);
const response = await api.get<JobStatus[]>(`/jobs?${params}`);
return response.data;
},
};
// Analysis diff types
export interface AnalysisHistoryItem {
id: number;
analysis_type: string | null;
model: string | null;
timestamp: string;
}
export interface AnalysisDiff {
company_name: string;
from_id: number;
to_id: number;
from_timestamp: string;
to_timestamp: string;
patent_count_delta: number;
added_patents: string[];
removed_patents: string[];
changed_fields: Record<string, { from: string | null; to: string | null }>;
summary: string;
}
// Export API
export const exportApi = {
exportCsv: async (companyName: string): Promise<void> => {
@@ -246,32 +201,6 @@ export const analyticsApi = {
},
};
// Rate limit types
export interface RateLimitIpEntry {
ip: string;
total: number;
rejected: number;
}
export interface RateLimitEndpointStats {
endpoint: string;
limit: string;
total_requests: number;
rejected_requests: number;
by_ip: RateLimitIpEntry[];
}
export interface ThrottledBucket {
timestamp: string;
count: number;
}
export interface RateLimitStatsResponse {
rate_limits: RateLimitEndpointStats[];
throttled_24h: number;
throttled_over_time: ThrottledBucket[];
}
// Admin API
export const adminApi = {
listUsers: async (limit = 100, offset = 0): Promise<User[]> => {
@@ -287,11 +216,6 @@ export const adminApi = {
deleteUser: async (userId: number): Promise<void> => {
await api.delete(`/admin/users/${userId}`);
},
getRateLimits: async (): Promise<RateLimitStatsResponse> => {
const response = await api.get<RateLimitStatsResponse>('/admin/rate-limits');
return response.data;
},
};
export default api;
+5 -96
View File
@@ -222,17 +222,7 @@ export interface paths {
path?: never;
cookie?: never;
};
/**
* List Batch Analyses
* @description List stored analysis results with cursor-based pagination.
*
* Returns past analysis results ordered by timestamp descending. Use
* ``limit`` to control page size (default 50, max 200). The response
* includes a ``next_cursor`` field; pass it back as the ``cursor`` query
* parameter to fetch the next page. When ``next_cursor`` is ``null``,
* there are no more results.
*/
get: operations["list_batch_analyses_analyze_batch_get"];
get?: never;
put?: never;
/**
* Analyze Companies Batch
@@ -318,15 +308,14 @@ export interface paths {
};
/**
* List Jobs
* @description List analysis jobs with cursor-based pagination.
* @description List all analysis jobs.
*
* Args:
* status: Optional filter by job status
* limit: Maximum number of jobs to return (default 50, max 200)
* cursor: Opaque cursor from a previous response's next_cursor field
* limit: Maximum number of jobs to return (default 10, max 100)
*
* Returns:
* Paginated list of job statuses with next_cursor for subsequent pages
* List of job statuses
*/
get: operations["list_jobs_jobs_get"];
put?: never;
@@ -341,27 +330,6 @@ export interface paths {
export type webhooks = Record<string, never>;
export interface components {
schemas: {
/**
* AnalysisRecord
* @description A single stored analysis result.
*/
AnalysisRecord: {
/** Id */
id: number;
/** Company Name */
company_name?: string | null;
/** Analysis Type */
analysis_type?: string | null;
/** Model */
model?: string | null;
/** Response */
response?: string | null;
/**
* Timestamp
* Format: date-time
*/
timestamp?: string | null;
};
/**
* AnalyticsResponse
* @description Analytics response model.
@@ -457,26 +425,6 @@ export interface components {
*/
timestamp: string;
};
/**
* PaginatedAnalysisResponse
* @description Paginated response for analysis result listings.
*/
PaginatedAnalysisResponse: {
/** Items */
items: components["schemas"]["AnalysisRecord"][];
/** Next Cursor */
next_cursor?: string | null;
};
/**
* PaginatedJobsResponse
* @description Paginated response for job listings.
*/
PaginatedJobsResponse: {
/** Items */
items: components["schemas"]["JobStatus"][];
/** Next Cursor */
next_cursor?: string | null;
};
/**
* JobStatus
* @description Status of a background analysis job.
@@ -996,10 +944,7 @@ export interface operations {
query?: {
/** @description Filter by status: pending, running, completed, failed */
status?: string | null;
/** @description Maximum number of jobs to return (default 50, max 200) */
limit?: number;
/** @description Opaque cursor from a previous response's next_cursor field */
cursor?: string | null;
};
header?: never;
path?: never;
@@ -1013,43 +958,7 @@ export interface operations {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["PaginatedJobsResponse"];
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
list_batch_analyses_analyze_batch_get: {
parameters: {
query?: {
/** @description Filter results by company name */
company_name?: string | null;
/** @description Maximum number of results to return (default 50, max 200) */
limit?: number;
/** @description Opaque cursor from a previous response's next_cursor field */
cursor?: string | null;
};
header?: never;
path?: never;
cookie?: never;
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["PaginatedAnalysisResponse"];
"application/json": components["schemas"]["JobStatus"][];
};
};
/** @description Validation Error */
-5
View File
@@ -30,8 +30,3 @@ export type HealthResponse = components['schemas']['HealthResponse'];
export type BatchAnalysisRequest = components['schemas']['BatchAnalysisRequest'];
export type ValidationError = components['schemas']['ValidationError'];
export type HTTPValidationError = components['schemas']['HTTPValidationError'];
// Pagination types
export type AnalysisRecord = components['schemas']['AnalysisRecord'];
export type PaginatedAnalysisResponse = components['schemas']['PaginatedAnalysisResponse'];
export type PaginatedJobsResponse = components['schemas']['PaginatedJobsResponse'];
+19 -171
View File
@@ -1,13 +1,12 @@
"""Tests for cursor-based pagination on /analyze/batch GET and /jobs endpoints."""
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, Mock, patch
from datetime import datetime, timedelta
from unittest.mock import Mock, patch
import pytest
from fastapi.testclient import TestClient
from SPARC.api import app
from SPARC.auth import create_access_token
@pytest.fixture
@@ -16,27 +15,6 @@ def client():
return TestClient(app)
@pytest.fixture(autouse=True)
def mock_auth_db():
"""Mock the auth DB so JWT token validation succeeds without a real database."""
db = MagicMock()
db.get_user_by_id.return_value = {
"id": 1,
"email": "user@test.com",
"role": "user",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
with patch("SPARC.api.get_db_client", return_value=db), \
patch("SPARC.auth.get_db_client", return_value=db):
yield db
def _auth_header():
"""Create a Bearer auth header for a regular user."""
token = create_access_token(1, "user@test.com", "user")
return {"Authorization": f"Bearer {token}"}
def _make_analysis_row(id_: int, minutes_ago: int = 0, company: str = "nvidia"):
"""Create a fake analysis row dict."""
ts = datetime.now() - timedelta(minutes=minutes_ago)
@@ -78,7 +56,7 @@ class TestAnalyzeBatchGetPagination:
]
mock_get_db.return_value = db
response = client.get("/analyze/batch?limit=10", headers=_auth_header())
response = client.get("/analyze/batch?limit=10")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 2
@@ -93,7 +71,7 @@ class TestAnalyzeBatchGetPagination:
db.list_analyses.return_value = rows
mock_get_db.return_value = db
response = client.get("/analyze/batch?limit=3", headers=_auth_header())
response = client.get("/analyze/batch?limit=3")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 3
@@ -106,14 +84,11 @@ class TestAnalyzeBatchGetPagination:
db.list_analyses.return_value = []
mock_get_db.return_value = db
client.get("/analyze/batch?cursor=2025-01-01T00:00:00|42", headers=_auth_header())
client.get("/analyze/batch?cursor=2025-01-01T00:00:00|42")
db.list_analyses.assert_called_once()
call_kwargs = db.list_analyses.call_args
cursor_val = (
call_kwargs.kwargs.get("cursor")
or (call_kwargs[1].get("cursor") if len(call_kwargs) > 1 else None)
)
assert cursor_val == "2025-01-01T00:00:00|42"
assert call_kwargs.kwargs.get("cursor") == "2025-01-01T00:00:00|42" or \
(call_kwargs[1].get("cursor") == "2025-01-01T00:00:00|42" if len(call_kwargs) > 1 else False)
@patch("SPARC.api._get_job_db")
def test_default_limit_is_50(self, mock_get_db, client):
@@ -122,19 +97,19 @@ class TestAnalyzeBatchGetPagination:
db.list_analyses.return_value = []
mock_get_db.return_value = db
client.get("/analyze/batch", headers=_auth_header())
client.get("/analyze/batch")
call_kwargs = db.list_analyses.call_args
# The endpoint requests limit+1 from DB, so 51
assert 51 in call_kwargs.args or call_kwargs.kwargs.get("limit") == 51
def test_limit_over_200_rejected(self, client):
"""Limit > 200 should be rejected with 422."""
response = client.get("/analyze/batch?limit=201", headers=_auth_header())
response = client.get("/analyze/batch?limit=201")
assert response.status_code == 422
def test_limit_zero_rejected(self, client):
"""Limit < 1 should be rejected with 422."""
response = client.get("/analyze/batch?limit=0", headers=_auth_header())
response = client.get("/analyze/batch?limit=0")
assert response.status_code == 422
@patch("SPARC.api._get_job_db")
@@ -144,13 +119,10 @@ class TestAnalyzeBatchGetPagination:
db.list_analyses.return_value = []
mock_get_db.return_value = db
client.get("/analyze/batch?company_name=intel", headers=_auth_header())
client.get("/analyze/batch?company_name=intel")
call_kwargs = db.list_analyses.call_args
company_val = (
call_kwargs.kwargs.get("company_name")
or (call_kwargs[1].get("company_name") if len(call_kwargs) > 1 else None)
)
assert company_val == "intel"
assert call_kwargs.kwargs.get("company_name") == "intel" or \
"intel" in (call_kwargs.args if call_kwargs.args else [])
@patch("SPARC.api._get_job_db")
def test_empty_result_set(self, mock_get_db, client):
@@ -159,30 +131,15 @@ class TestAnalyzeBatchGetPagination:
db.list_analyses.return_value = []
mock_get_db.return_value = db
response = client.get("/analyze/batch", headers=_auth_header())
response = client.get("/analyze/batch")
assert response.status_code == 200
data = response.json()
assert data["items"] == []
assert data["next_cursor"] is None
@patch("SPARC.api._get_job_db")
def test_subsequent_page_uses_cursor(self, mock_get_db, client):
"""Passing a cursor should retrieve the next page of results."""
db = Mock()
db.list_analyses.return_value = [_make_analysis_row(99, minutes_ago=100)]
mock_get_db.return_value = db
cursor = "2025-06-01T12:00:00|50"
response = client.get(f"/analyze/batch?limit=10&cursor={cursor}", headers=_auth_header())
assert response.status_code == 200
data = response.json()
# Only one item returned → last page → no next cursor
assert len(data["items"]) == 1
assert data["next_cursor"] is None
class TestJobsPagination:
"""Test cursor-based pagination on GET /jobs."""
class TestJobsPaginationDefaults:
"""Test that /jobs endpoint uses updated defaults."""
@patch("SPARC.api._get_job_db")
def test_default_limit_is_50(self, mock_get_db, client):
@@ -191,19 +148,14 @@ class TestJobsPagination:
db.list_jobs.return_value = []
mock_get_db.return_value = db
client.get("/jobs", headers=_auth_header())
client.get("/jobs")
call_kwargs = db.list_jobs.call_args
# Endpoint requests limit+1 from DB, so 51
assert 51 in call_kwargs.args or call_kwargs.kwargs.get("limit") == 51
def test_limit_over_200_rejected(self, client):
"""Limit > 200 should be rejected with 422."""
response = client.get("/jobs?limit=201", headers=_auth_header())
assert response.status_code == 422
def test_limit_zero_rejected(self, client):
"""Limit < 1 should be rejected with 422."""
response = client.get("/jobs?limit=0", headers=_auth_header())
response = client.get("/jobs?limit=201")
assert response.status_code == 422
@patch("SPARC.api._get_job_db")
@@ -213,109 +165,5 @@ class TestJobsPagination:
db.list_jobs.return_value = []
mock_get_db.return_value = db
response = client.get("/jobs?limit=200", headers=_auth_header())
response = client.get("/jobs?limit=200")
assert response.status_code == 200
@patch("SPARC.api._get_job_db")
def test_first_page_returns_items_and_cursor(self, mock_get_db, client):
"""First page with more results than limit should return next_cursor."""
db = Mock()
# Return limit+1 rows to simulate more data available
rows = [_make_job_row(f"job-{i}", minutes_ago=i) for i in range(4)]
db.list_jobs.return_value = rows
mock_get_db.return_value = db
response = client.get("/jobs?limit=3", headers=_auth_header())
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 3
assert data["next_cursor"] is not None
@patch("SPARC.api._get_job_db")
def test_last_page_returns_no_cursor(self, mock_get_db, client):
"""When fewer results than limit, next_cursor should be null (last page)."""
db = Mock()
rows = [
_make_job_row("job-a", minutes_ago=5),
_make_job_row("job-b", minutes_ago=10),
]
db.list_jobs.return_value = rows
mock_get_db.return_value = db
response = client.get("/jobs?limit=10", headers=_auth_header())
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 2
assert data["next_cursor"] is None
@patch("SPARC.api._get_job_db")
def test_cursor_forwarded_to_db(self, mock_get_db, client):
"""The cursor query param should be forwarded to the database layer."""
db = Mock()
db.list_jobs.return_value = []
mock_get_db.return_value = db
client.get("/jobs?cursor=2025-01-01T00:00:00|job-99", headers=_auth_header())
db.list_jobs.assert_called_once()
call_kwargs = db.list_jobs.call_args
cursor_val = (
call_kwargs.kwargs.get("cursor")
or (call_kwargs[1].get("cursor") if len(call_kwargs) > 1 else None)
)
assert cursor_val == "2025-01-01T00:00:00|job-99"
@patch("SPARC.api._get_job_db")
def test_empty_result_set(self, mock_get_db, client):
"""Empty result set returns empty items list and null next_cursor."""
db = Mock()
db.list_jobs.return_value = []
mock_get_db.return_value = db
response = client.get("/jobs", headers=_auth_header())
assert response.status_code == 200
data = response.json()
assert data["items"] == []
assert data["next_cursor"] is None
@patch("SPARC.api._get_job_db")
def test_status_filter_forwarded(self, mock_get_db, client):
"""The status filter should be forwarded to the database layer."""
db = Mock()
db.list_jobs.return_value = []
mock_get_db.return_value = db
client.get("/jobs?status=completed", headers=_auth_header())
db.list_jobs.assert_called_once()
call_kwargs = db.list_jobs.call_args
status_val = (
call_kwargs.kwargs.get("status")
or (call_kwargs[1].get("status") if len(call_kwargs) > 1 else None)
)
assert status_val == "completed"
@patch("SPARC.api._get_job_db")
def test_response_has_paginated_shape(self, mock_get_db, client):
"""Response must have 'items' and 'next_cursor' fields (paginated shape)."""
db = Mock()
db.list_jobs.return_value = [_make_job_row("job-x")]
mock_get_db.return_value = db
response = client.get("/jobs?limit=10", headers=_auth_header())
assert response.status_code == 200
data = response.json()
assert "items" in data
assert "next_cursor" in data
@patch("SPARC.api._get_job_db")
def test_subsequent_page_uses_cursor(self, mock_get_db, client):
"""Passing cursor returns the next page; last page has null next_cursor."""
db = Mock()
db.list_jobs.return_value = [_make_job_row("job-last", minutes_ago=200)]
mock_get_db.return_value = db
cursor = "2025-06-01T12:00:00|job-50"
response = client.get(f"/jobs?limit=10&cursor={cursor}", headers=_auth_header())
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
assert data["next_cursor"] is None
+262
View File
@@ -0,0 +1,262 @@
"""Tests for the webhook background task queue.
Covers:
- Worker lifecycle (start / stop / idempotent start)
- Tasks are processed asynchronously by the worker
- Retry logic is preserved (executed inside the worker thread)
- enqueue_notify / enqueue_job_completed / enqueue_alert non-blocking helpers
- Integration: queued webhook task is eventually delivered (mocked HTTP)
"""
import threading
import time
from unittest.mock import MagicMock, call, patch
import pytest
from SPARC.task_queue import (
WebhookTask,
drain,
enqueue,
queue_size,
start_worker,
stop_worker,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _worker_lifecycle():
"""Start the worker before each test and stop it after."""
start_worker()
yield
stop_worker(timeout=3)
# ---------------------------------------------------------------------------
# Worker lifecycle
# ---------------------------------------------------------------------------
class TestWorkerLifecycle:
def test_start_is_idempotent(self):
"""Calling start_worker() twice does not create a second thread."""
import SPARC.task_queue as tq
first = tq._worker_thread
start_worker()
assert tq._worker_thread is first
def test_stop_worker_gracefully(self):
"""stop_worker joins the thread cleanly."""
import SPARC.task_queue as tq
assert tq._worker_thread is not None
stop_worker(timeout=3)
assert tq._worker_thread is None
# ---------------------------------------------------------------------------
# Task processing
# ---------------------------------------------------------------------------
class TestTaskProcessing:
@patch("SPARC.webhooks._send_with_retry")
def test_enqueued_task_is_delivered(self, mock_send):
"""A task put on the queue is eventually processed by the worker."""
mock_send.return_value = True
task = WebhookTask(url="https://example.com/hook", payload={"event": "test"})
enqueue(task)
drain(timeout=5)
mock_send.assert_called_once_with("https://example.com/hook", {"event": "test"})
@patch("SPARC.webhooks._send_with_retry")
def test_multiple_tasks_processed_in_order(self, mock_send):
"""Tasks are processed FIFO."""
mock_send.return_value = True
for i in range(3):
enqueue(WebhookTask(url=f"https://example.com/{i}", payload={"n": i}))
drain(timeout=5)
assert mock_send.call_count == 3
urls = [c[0][0] for c in mock_send.call_args_list]
assert urls == [
"https://example.com/0",
"https://example.com/1",
"https://example.com/2",
]
@patch("SPARC.webhooks._send_with_retry")
def test_enqueue_returns_immediately(self, mock_send):
"""enqueue() does not block even if the worker is slow."""
event = threading.Event()
def slow_send(url, payload):
event.wait(timeout=5)
return True
mock_send.side_effect = slow_send
start = time.monotonic()
enqueue(WebhookTask(url="https://slow.example.com", payload={}))
elapsed = time.monotonic() - start
# enqueue should return in well under 1 second
assert elapsed < 0.5
# Let the worker finish
event.set()
drain(timeout=5)
@patch("SPARC.webhooks._send_with_retry", side_effect=RuntimeError("boom"))
def test_worker_survives_unexpected_error(self, mock_send):
"""An unexpected exception in delivery does not kill the worker."""
enqueue(WebhookTask(url="https://example.com/bad", payload={}))
drain(timeout=5)
# Worker is still alive; enqueue another task
mock_send.side_effect = None
mock_send.return_value = True
enqueue(WebhookTask(url="https://example.com/good", payload={"ok": True}))
drain(timeout=5)
assert mock_send.call_count == 2
# ---------------------------------------------------------------------------
# Retry logic preserved in worker context
# ---------------------------------------------------------------------------
class TestRetryInWorker:
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_retry_logic_runs_inside_worker(self, mock_post, mock_sleep):
"""The worker thread uses _send_with_retry, which retries on failure."""
mock_post.side_effect = [
MagicMock(status_code=500),
MagicMock(status_code=200),
]
enqueue(WebhookTask(
url="https://example.com/retry",
payload={"event": "test"},
))
drain(timeout=10)
assert mock_post.call_count == 2
mock_sleep.assert_called_once()
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_all_retries_exhausted_in_worker(self, mock_post, mock_sleep):
"""Worker handles permanent failure gracefully."""
mock_post.return_value = MagicMock(status_code=500)
enqueue(WebhookTask(
url="https://example.com/fail",
payload={"event": "test"},
))
drain(timeout=10)
from SPARC.webhooks import MAX_RETRIES
assert mock_post.call_count == MAX_RETRIES
# ---------------------------------------------------------------------------
# Integration: enqueue_notify and convenience helpers
# ---------------------------------------------------------------------------
class TestEnqueueHelpers:
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
def test_enqueue_notify_delivers_via_worker(self, mock_send):
"""enqueue_notify puts a task on the queue and the worker delivers it."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_notify
enqueue_notify("test_event", {"key": "val"})
drain(timeout=5)
mock_send.assert_called_once()
url, payload = mock_send.call_args[0]
assert url == "https://example.com/hook"
assert payload["event"] == "test_event"
assert payload["key"] == "val"
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
def test_enqueue_job_completed(self, mock_send):
"""enqueue_job_completed sends job completion data via the queue."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_job_completed
enqueue_job_completed(
job_id="job-1",
status="completed",
total_companies=5,
successful=4,
failed=1,
)
drain(timeout=5)
mock_send.assert_called_once()
payload = mock_send.call_args[0][1]
assert payload["event"] == "job_completed"
assert payload["job_id"] == "job-1"
assert payload["successful"] == 4
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
def test_enqueue_alert(self, mock_send):
"""enqueue_alert sends alert data via the queue."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_alert
enqueue_alert(
company_name="NVIDIA",
alert_type="patent_count_change",
message="Patent count increased by 30%",
)
drain(timeout=5)
mock_send.assert_called_once()
payload = mock_send.call_args[0][1]
assert payload["event"] == "patent_alert"
assert payload["company_name"] == "NVIDIA"
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", [])
def test_enqueue_notify_noop_when_no_urls(self, mock_send):
"""enqueue_notify is a no-op when WEBHOOK_URLS is empty."""
from SPARC.webhooks import enqueue_notify
enqueue_notify("test_event", {"key": "val"})
drain(timeout=2)
mock_send.assert_not_called()
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", [
"https://hooks.slack.com/services/T00/B00/xxx",
"https://example.com/generic",
])
def test_enqueue_notify_slack_formatting(self, mock_send):
"""Slack URLs get Slack-formatted payloads even via the queue."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_notify
enqueue_notify("test_event", {"key": "val"})
drain(timeout=5)
assert mock_send.call_count == 2
slack_payload = mock_send.call_args_list[0][0][1]
assert "text" in slack_payload
generic_payload = mock_send.call_args_list[1][0][1]
assert "event" in generic_payload