Compare commits

..

1 Commits

Author SHA1 Message Date
agent-company 3b6411869d feat: add cursor-based pagination to /jobs endpoint
Add a cursor query parameter to GET /jobs and return a next_cursor
field in the response envelope. Existing clients using only limit
continue to work without modification. The cursor is an opaque token
encoding created_at and job_id for stable keyset pagination.

Closes leeworks-agents/SPARC#25

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:19:01 +00:00
3 changed files with 72 additions and 25 deletions
+39 -5
View File
@@ -77,6 +77,13 @@ class JobStatus(BaseModel):
error: str | None = None
class PaginatedJobsResponse(BaseModel):
"""Paginated response for job listings."""
items: list["JobStatus"]
next_cursor: str | None = None
class HealthResponse(BaseModel):
"""Health check response."""
@@ -577,24 +584,51 @@ async def get_job_status(
return _job_row_to_status(job_row)
@app.get("/jobs", response_model=list[JobStatus], tags=["Jobs"])
@app.get("/jobs", response_model=PaginatedJobsResponse, tags=["Jobs"])
async def list_jobs(
status: Annotated[
str | None,
Query(description="Filter by status: pending, running, completed, failed"),
] = None,
limit: Annotated[int, Query(ge=1, le=100)] = 10,
cursor: Annotated[
str | None,
Query(description="Opaque cursor from a previous response's next_cursor field"),
] = None,
_: UserResponse = Depends(get_current_user),
):
"""List all analysis jobs.
"""List analysis jobs with cursor-based pagination.
Pass ``limit`` to control page size. The response includes a ``next_cursor``
field; pass it back as the ``cursor`` query parameter to fetch the next page.
When ``next_cursor`` is ``null``, there are no more results.
Existing clients that use only ``limit`` (without ``cursor``) continue to
work without modification.
Args:
status: Optional filter by job status
limit: Maximum number of jobs to return (default 10, max 100)
cursor: Opaque pagination cursor from a previous response
Returns:
List of job statuses
Paginated list of job statuses
"""
db = _get_job_db()
job_rows = db.list_jobs(status=status, limit=limit)
return [_job_row_to_status(row) for row in job_rows]
# Fetch one extra to determine if there is a next page
job_rows = db.list_jobs(status=status, limit=limit + 1, cursor=cursor)
has_next = len(job_rows) > limit
if has_next:
job_rows = job_rows[:limit]
items = [_job_row_to_status(row) for row in job_rows]
next_cursor = None
if has_next and job_rows:
last = job_rows[-1]
created = last["created_at"]
ts = created.isoformat() if hasattr(created, "isoformat") else str(created)
next_cursor = f"{ts}|{last['job_id']}"
return PaginatedJobsResponse(items=items, next_cursor=next_cursor)
+32 -7
View File
@@ -568,20 +568,45 @@ class DatabaseClient:
self,
status: Optional[str] = None,
limit: int = 10,
cursor: Optional[str] = None,
) -> List[Dict]:
"""List jobs, optionally filtered by status."""
query = "SELECT * FROM jobs"
"""List jobs with optional status filter and cursor-based pagination.
Args:
status: Optional status filter (pending, running, completed, failed).
limit: Maximum number of jobs to return.
cursor: Opaque cursor (``created_at|job_id``) from a previous
response. When provided, only jobs older than the cursor are
returned.
Returns:
List of job dicts ordered by created_at descending.
"""
conditions: list[str] = []
params: list = []
if status:
query += " WHERE status = %s"
conditions.append("status = %s")
params.append(status)
query += " ORDER BY created_at DESC LIMIT %s"
if cursor:
try:
ts_str, cursor_job_id = cursor.rsplit("|", 1)
conditions.append("(created_at, job_id) < (%s, %s)")
params.extend([ts_str, cursor_job_id])
except ValueError:
pass # Ignore malformed cursors; return from start
query = "SELECT * FROM jobs"
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY created_at DESC, job_id DESC LIMIT %s"
params.append(limit)
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(query, params)
return [dict(row) for row in cur.fetchall()]
def mark_stale_jobs_failed(self) -> int:
"""Mark any jobs in 'running' or 'pending' state as 'failed'.
+1 -13
View File
@@ -1,4 +1,3 @@
import logging
import os
import re
from datetime import datetime, timedelta
@@ -11,8 +10,6 @@ import serpapi
from SPARC import config
from SPARC.types import Patent, Patents
logger = logging.getLogger(__name__)
class SERP:
def query(company: str, days_back: int = None) -> Patents:
@@ -47,7 +44,6 @@ class SERP:
"tbs": date_filter,
"api_key": config.api_key,
}
logger.info("Querying Google Patents for '%s' (last %d days)", company, days_back)
search = serpapi.search(params)
# Convert results to Patent objects, skipping any without PDF links
patent_ids = []
@@ -56,10 +52,8 @@ class SERP:
pdf_link = patent.get("pdf")
if pdf_link:
patent_ids.append(Patent(patent_id=patent["publication_number"], pdf_link=pdf_link, summary=None))
else:
logger.debug("Skipping patent %s (no PDF link)", patent.get("publication_number", "unknown"))
# Patents without PDF links are skipped (see docstring for details)
logger.info("Found %d patents with PDF links for '%s'", len(patent_ids), company)
return Patents(patents=patent_ids)
def save_patents(patent: Patent) -> Patent:
@@ -76,13 +70,9 @@ class SERP:
os.makedirs("patents", exist_ok=True)
if not (os.path.exists(pdf_path) and os.path.getsize(pdf_path) > 0):
logger.info("Downloading PDF for %s", patent.patent_id)
response = requests.get(patent.pdf_link)
with open(pdf_path, "wb") as f:
f.write(response.content)
logger.debug("Saved %d bytes to %s", len(response.content), pdf_path)
else:
logger.debug("Using cached PDF for %s at %s", patent.patent_id, pdf_path)
patent.pdf_path = pdf_path
return patent
@@ -100,13 +90,11 @@ class SERP:
Dictionary containing all extracted sections
"""
logger.debug("Parsing patent PDF: %s", pdf_path)
with pdfplumber.open(pdf_path) as pdf:
# Extract all text
full_text = ""
for page in pdf.pages:
full_text += page.extract_text() + "\n"
logger.debug("Extracted text from %d pages (%d chars)", len(pdf.pages), len(full_text))
# Define section patterns (common in patents)
sections = {