Compare commits

..

1 Commits

Author SHA1 Message Date
agent-company 3b6411869d feat: add cursor-based pagination to /jobs endpoint
Add a cursor query parameter to GET /jobs and return a next_cursor
field in the response envelope. Existing clients using only limit
continue to work without modification. The cursor is an opaque token
encoding created_at and job_id for stable keyset pagination.

Closes leeworks-agents/SPARC#25

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:19:01 +00:00
4 changed files with 71 additions and 174 deletions
-6
View File
@@ -40,9 +40,3 @@ JWT_SECRET=your-secure-jwt-secret-change-in-production
# When USE_CACHE=true: check database for cached responses before making API calls # When USE_CACHE=true: check database for cached responses before making API calls
# When USE_CACHE=false: always make fresh API calls (still stores results in database) # When USE_CACHE=false: always make fresh API calls (still stores results in database)
USE_CACHE=true USE_CACHE=true
# ---- Webhooks ----
# Comma-separated list of webhook URLs for job completion and alert notifications
# Supports generic HTTP POST and Slack/Discord incoming webhooks
# WEBHOOK_URLS=https://hooks.slack.com/services/XXX,https://example.com/webhook
+39 -22
View File
@@ -77,6 +77,13 @@ class JobStatus(BaseModel):
error: str | None = None error: str | None = None
class PaginatedJobsResponse(BaseModel):
"""Paginated response for job listings."""
items: list["JobStatus"]
next_cursor: str | None = None
class HealthResponse(BaseModel): class HealthResponse(BaseModel):
"""Health check response.""" """Health check response."""
@@ -519,25 +526,8 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int):
progress=100, progress=100,
result_json=_json.dumps(batch_response.model_dump(), default=str), result_json=_json.dumps(batch_response.model_dump(), default=str),
) )
# Fire webhook notification
from SPARC.webhooks import notify_job_completed
notify_job_completed(
job_id=job_id,
status="completed",
total_companies=result.total_companies,
successful=result.successful,
failed=result.failed,
)
except Exception as e: except Exception as e:
db.update_job(job_id, status="failed", error=str(e)) db.update_job(job_id, status="failed", error=str(e))
from SPARC.webhooks import notify_job_completed
notify_job_completed(
job_id=job_id,
status="failed",
total_companies=len(companies),
successful=0,
failed=len(companies),
)
@app.post("/analyze/batch/async", response_model=JobStatus, tags=["Analysis"]) @app.post("/analyze/batch/async", response_model=JobStatus, tags=["Analysis"])
@@ -594,24 +584,51 @@ async def get_job_status(
return _job_row_to_status(job_row) return _job_row_to_status(job_row)
@app.get("/jobs", response_model=list[JobStatus], tags=["Jobs"]) @app.get("/jobs", response_model=PaginatedJobsResponse, tags=["Jobs"])
async def list_jobs( async def list_jobs(
status: Annotated[ status: Annotated[
str | None, str | None,
Query(description="Filter by status: pending, running, completed, failed"), Query(description="Filter by status: pending, running, completed, failed"),
] = None, ] = None,
limit: Annotated[int, Query(ge=1, le=100)] = 10, limit: Annotated[int, Query(ge=1, le=100)] = 10,
cursor: Annotated[
str | None,
Query(description="Opaque cursor from a previous response's next_cursor field"),
] = None,
_: UserResponse = Depends(get_current_user), _: UserResponse = Depends(get_current_user),
): ):
"""List all analysis jobs. """List analysis jobs with cursor-based pagination.
Pass ``limit`` to control page size. The response includes a ``next_cursor``
field; pass it back as the ``cursor`` query parameter to fetch the next page.
When ``next_cursor`` is ``null``, there are no more results.
Existing clients that use only ``limit`` (without ``cursor``) continue to
work without modification.
Args: Args:
status: Optional filter by job status status: Optional filter by job status
limit: Maximum number of jobs to return (default 10, max 100) limit: Maximum number of jobs to return (default 10, max 100)
cursor: Opaque pagination cursor from a previous response
Returns: Returns:
List of job statuses Paginated list of job statuses
""" """
db = _get_job_db() db = _get_job_db()
job_rows = db.list_jobs(status=status, limit=limit) # Fetch one extra to determine if there is a next page
return [_job_row_to_status(row) for row in job_rows] job_rows = db.list_jobs(status=status, limit=limit + 1, cursor=cursor)
has_next = len(job_rows) > limit
if has_next:
job_rows = job_rows[:limit]
items = [_job_row_to_status(row) for row in job_rows]
next_cursor = None
if has_next and job_rows:
last = job_rows[-1]
created = last["created_at"]
ts = created.isoformat() if hasattr(created, "isoformat") else str(created)
next_cursor = f"{ts}|{last['job_id']}"
return PaginatedJobsResponse(items=items, next_cursor=next_cursor)
+32 -7
View File
@@ -568,20 +568,45 @@ class DatabaseClient:
self, self,
status: Optional[str] = None, status: Optional[str] = None,
limit: int = 10, limit: int = 10,
cursor: Optional[str] = None,
) -> List[Dict]: ) -> List[Dict]:
"""List jobs, optionally filtered by status.""" """List jobs with optional status filter and cursor-based pagination.
query = "SELECT * FROM jobs"
Args:
status: Optional status filter (pending, running, completed, failed).
limit: Maximum number of jobs to return.
cursor: Opaque cursor (``created_at|job_id``) from a previous
response. When provided, only jobs older than the cursor are
returned.
Returns:
List of job dicts ordered by created_at descending.
"""
conditions: list[str] = []
params: list = [] params: list = []
if status: if status:
query += " WHERE status = %s" conditions.append("status = %s")
params.append(status) params.append(status)
query += " ORDER BY created_at DESC LIMIT %s"
if cursor:
try:
ts_str, cursor_job_id = cursor.rsplit("|", 1)
conditions.append("(created_at, job_id) < (%s, %s)")
params.extend([ts_str, cursor_job_id])
except ValueError:
pass # Ignore malformed cursors; return from start
query = "SELECT * FROM jobs"
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY created_at DESC, job_id DESC LIMIT %s"
params.append(limit) params.append(limit)
with self.get_conn() as conn: with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor: with conn.cursor(cursor_factory=RealDictCursor) as cur:
cursor.execute(query, params) cur.execute(query, params)
return [dict(row) for row in cursor.fetchall()] return [dict(row) for row in cur.fetchall()]
def mark_stale_jobs_failed(self) -> int: def mark_stale_jobs_failed(self) -> int:
"""Mark any jobs in 'running' or 'pending' state as 'failed'. """Mark any jobs in 'running' or 'pending' state as 'failed'.
-139
View File
@@ -1,139 +0,0 @@
"""Webhook notifications for job completion and alert events.
Sends JSON payloads to configured webhook URLs with retry logic.
Supports generic HTTP POST and Slack-compatible text payloads.
"""
import logging
import os
import time
from datetime import datetime
from typing import Any
import requests
logger = logging.getLogger(__name__)
# Comma-separated list of webhook URLs (env var based config)
_WEBHOOK_URLS_RAW = os.getenv("WEBHOOK_URLS", "")
WEBHOOK_URLS: list[str] = [
url.strip() for url in _WEBHOOK_URLS_RAW.split(",") if url.strip()
]
MAX_RETRIES = 3
BACKOFF_BASE = 2 # seconds
def _is_slack_url(url: str) -> bool:
"""Check if a URL looks like a Slack incoming webhook."""
return "hooks.slack.com" in url or "discord.com/api/webhooks" in url
def _build_payload(event_type: str, data: dict[str, Any], slack: bool = False) -> dict:
"""Build the webhook payload.
Args:
event_type: Type of event (e.g., "job_completed", "alert")
data: Event-specific data
slack: If True, wrap in Slack-compatible ``text`` format
Returns:
JSON-serializable payload dict
"""
payload = {
"event": event_type,
"timestamp": datetime.utcnow().isoformat() + "Z",
**data,
}
if slack:
# Build a human-readable summary for Slack/Discord
lines = [f"*[SPARC] {event_type}*"]
for key, value in data.items():
lines.append(f" {key}: {value}")
return {"text": "\n".join(lines)}
return payload
def _send_with_retry(url: str, payload: dict) -> bool:
"""Send a POST request with exponential backoff retry.
Args:
url: Webhook URL
payload: JSON payload to send
Returns:
True if delivered successfully, False after all retries exhausted
"""
for attempt in range(1, MAX_RETRIES + 1):
try:
response = requests.post(url, json=payload, timeout=10)
if response.status_code < 300:
logger.debug("Webhook delivered to %s (attempt %d)", url, attempt)
return True
logger.warning(
"Webhook %s returned %d (attempt %d/%d)",
url, response.status_code, attempt, MAX_RETRIES,
)
except requests.RequestException as e:
logger.warning(
"Webhook delivery failed for %s (attempt %d/%d): %s",
url, attempt, MAX_RETRIES, e,
)
if attempt < MAX_RETRIES:
wait = BACKOFF_BASE ** attempt
time.sleep(wait)
logger.error("Webhook permanently failed for %s after %d attempts", url, MAX_RETRIES)
return False
def notify(event_type: str, data: dict[str, Any]) -> None:
"""Fire all configured webhooks for an event.
Safe to call even when no webhooks are configured (returns immediately).
Args:
event_type: Event identifier (e.g., "job_completed", "patent_alert")
data: Event data to include in the payload
"""
if not WEBHOOK_URLS:
return
for url in WEBHOOK_URLS:
slack = _is_slack_url(url)
payload = _build_payload(event_type, data, slack=slack)
_send_with_retry(url, payload)
def notify_job_completed(
job_id: str,
status: str,
total_companies: int,
successful: int,
failed: int,
) -> None:
"""Send notification when a batch job completes."""
notify("job_completed", {
"job_id": job_id,
"status": status,
"total_companies": total_companies,
"successful": successful,
"failed": failed,
"summary": f"Batch job {job_id}: {successful}/{total_companies} succeeded",
})
def notify_alert(
company_name: str,
alert_type: str,
message: str,
) -> None:
"""Send notification for a tracked company alert."""
notify("patent_alert", {
"company_name": company_name,
"alert_type": alert_type,
"message": message,
})