Compare commits

..

1 Commits

Author SHA1 Message Date
agent-company 0b4d712fc5 feat: add structured logging to serp_api.py
Add module-level logger to serp_api.py with INFO-level messages for
patent queries and PDF downloads, and DEBUG-level messages for cache
hits and parsing details. All three target files (analyzer.py,
serp_api.py, llm.py) now use structured logging with no print() calls.

Closes leeworks-agents/SPARC#46

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:07:07 +00:00
3 changed files with 27 additions and 74 deletions
+5 -39
View File
@@ -77,13 +77,6 @@ class JobStatus(BaseModel):
error: str | None = None
class PaginatedJobsResponse(BaseModel):
"""Paginated response for job listings."""
items: list["JobStatus"]
next_cursor: str | None = None
class HealthResponse(BaseModel):
"""Health check response."""
@@ -584,51 +577,24 @@ async def get_job_status(
return _job_row_to_status(job_row)
@app.get("/jobs", response_model=PaginatedJobsResponse, tags=["Jobs"])
@app.get("/jobs", response_model=list[JobStatus], tags=["Jobs"])
async def list_jobs(
status: Annotated[
str | None,
Query(description="Filter by status: pending, running, completed, failed"),
] = None,
limit: Annotated[int, Query(ge=1, le=100)] = 10,
cursor: Annotated[
str | None,
Query(description="Opaque cursor from a previous response's next_cursor field"),
] = None,
_: UserResponse = Depends(get_current_user),
):
"""List analysis jobs with cursor-based pagination.
Pass ``limit`` to control page size. The response includes a ``next_cursor``
field; pass it back as the ``cursor`` query parameter to fetch the next page.
When ``next_cursor`` is ``null``, there are no more results.
Existing clients that use only ``limit`` (without ``cursor``) continue to
work without modification.
"""List all analysis jobs.
Args:
status: Optional filter by job status
limit: Maximum number of jobs to return (default 10, max 100)
cursor: Opaque pagination cursor from a previous response
Returns:
Paginated list of job statuses
List of job statuses
"""
db = _get_job_db()
# Fetch one extra to determine if there is a next page
job_rows = db.list_jobs(status=status, limit=limit + 1, cursor=cursor)
has_next = len(job_rows) > limit
if has_next:
job_rows = job_rows[:limit]
items = [_job_row_to_status(row) for row in job_rows]
next_cursor = None
if has_next and job_rows:
last = job_rows[-1]
created = last["created_at"]
ts = created.isoformat() if hasattr(created, "isoformat") else str(created)
next_cursor = f"{ts}|{last['job_id']}"
return PaginatedJobsResponse(items=items, next_cursor=next_cursor)
job_rows = db.list_jobs(status=status, limit=limit)
return [_job_row_to_status(row) for row in job_rows]
+9 -34
View File
@@ -568,45 +568,20 @@ class DatabaseClient:
self,
status: Optional[str] = None,
limit: int = 10,
cursor: Optional[str] = None,
) -> List[Dict]:
"""List jobs with optional status filter and cursor-based pagination.
Args:
status: Optional status filter (pending, running, completed, failed).
limit: Maximum number of jobs to return.
cursor: Opaque cursor (``created_at|job_id``) from a previous
response. When provided, only jobs older than the cursor are
returned.
Returns:
List of job dicts ordered by created_at descending.
"""
conditions: list[str] = []
params: list = []
if status:
conditions.append("status = %s")
params.append(status)
if cursor:
try:
ts_str, cursor_job_id = cursor.rsplit("|", 1)
conditions.append("(created_at, job_id) < (%s, %s)")
params.extend([ts_str, cursor_job_id])
except ValueError:
pass # Ignore malformed cursors; return from start
"""List jobs, optionally filtered by status."""
query = "SELECT * FROM jobs"
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY created_at DESC, job_id DESC LIMIT %s"
params: list = []
if status:
query += " WHERE status = %s"
params.append(status)
query += " ORDER BY created_at DESC LIMIT %s"
params.append(limit)
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(query, params)
return [dict(row) for row in cur.fetchall()]
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def mark_stale_jobs_failed(self) -> int:
"""Mark any jobs in 'running' or 'pending' state as 'failed'.
+13 -1
View File
@@ -1,3 +1,4 @@
import logging
import os
import re
from datetime import datetime, timedelta
@@ -10,6 +11,8 @@ import serpapi
from SPARC import config
from SPARC.types import Patent, Patents
logger = logging.getLogger(__name__)
class SERP:
def query(company: str, days_back: int = None) -> Patents:
@@ -44,6 +47,7 @@ class SERP:
"tbs": date_filter,
"api_key": config.api_key,
}
logger.info("Querying Google Patents for '%s' (last %d days)", company, days_back)
search = serpapi.search(params)
# Convert results to Patent objects, skipping any without PDF links
patent_ids = []
@@ -52,8 +56,10 @@ class SERP:
pdf_link = patent.get("pdf")
if pdf_link:
patent_ids.append(Patent(patent_id=patent["publication_number"], pdf_link=pdf_link, summary=None))
# Patents without PDF links are skipped (see docstring for details)
else:
logger.debug("Skipping patent %s (no PDF link)", patent.get("publication_number", "unknown"))
logger.info("Found %d patents with PDF links for '%s'", len(patent_ids), company)
return Patents(patents=patent_ids)
def save_patents(patent: Patent) -> Patent:
@@ -70,9 +76,13 @@ class SERP:
os.makedirs("patents", exist_ok=True)
if not (os.path.exists(pdf_path) and os.path.getsize(pdf_path) > 0):
logger.info("Downloading PDF for %s", patent.patent_id)
response = requests.get(patent.pdf_link)
with open(pdf_path, "wb") as f:
f.write(response.content)
logger.debug("Saved %d bytes to %s", len(response.content), pdf_path)
else:
logger.debug("Using cached PDF for %s at %s", patent.patent_id, pdf_path)
patent.pdf_path = pdf_path
return patent
@@ -90,11 +100,13 @@ class SERP:
Dictionary containing all extracted sections
"""
logger.debug("Parsing patent PDF: %s", pdf_path)
with pdfplumber.open(pdf_path) as pdf:
# Extract all text
full_text = ""
for page in pdf.pages:
full_text += page.extract_text() + "\n"
logger.debug("Extracted text from %d pages (%d chars)", len(pdf.pages), len(full_text))
# Define section patterns (common in patents)
sections = {