Compare commits

..

1 Commits

Author SHA1 Message Date
agent-company 96d5d27b17 feat(jobs): persist async batch job state in PostgreSQL
- Add jobs table to database schema (job_id, status, progress, result_json, etc.)
- Add DatabaseClient methods: create_job, update_job, get_job, list_jobs
- Add mark_stale_jobs_failed() called at startup to handle interrupted jobs
- Refactor _run_batch_job and job endpoints to read/write from PostgreSQL
- Remove in-memory _jobs dict; job state now survives API restarts
- Update init_database.py to list all tables in output

Closes leeworks-agents/SPARC#8

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 04:22:57 +00:00
8 changed files with 245 additions and 81 deletions
+17 -21
View File
@@ -5,13 +5,10 @@ to provide company performance estimation based on patent portfolios.
"""
import hashlib
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable
from SPARC import config
logger = logging.getLogger(__name__)
from SPARC.database import DatabaseClient
from SPARC.serp_api import SERP
from SPARC.llm import LLMAnalyzer
@@ -55,13 +52,13 @@ class CompanyAnalyzer:
query_hash = hashlib.sha256(company_name.lower().encode()).hexdigest()
cached_ids = self.db.get_cached_serp_query(query_hash)
if cached_ids is not None:
logger.info("Using cached SERP results for %s (%d patents)", company_name, len(cached_ids))
print(f"Using cached SERP results for {company_name} ({len(cached_ids)} patents)")
patents = Patents(patents=[
Patent(patent_id=pid, pdf_link="")
for pid in cached_ids
])
else:
logger.info("Retrieving patents for %s...", company_name)
print(f"Retrieving patents for {company_name}...")
patents = SERP.query(company_name)
# Cache the SERP results
if patents.patents:
@@ -69,13 +66,12 @@ class CompanyAnalyzer:
company_name=company_name,
query_hash=query_hash,
patent_ids=[p.patent_id for p in patents.patents],
ttl_hours=config.serp_cache_ttl_hours,
)
if not patents.patents:
return f"No patents found for {company_name}"
logger.info("Found %d patents. Processing...", len(patents.patents))
print(f"Found {len(patents.patents)} patents. Processing...")
# Download, parse, and minimize patents in parallel
processed_patents = []
@@ -91,12 +87,12 @@ class CompanyAnalyzer:
if result:
processed_patents.append(result)
except Exception as e:
logger.warning("Failed to process %s: %s", patent.patent_id, e)
print(f"Warning: Failed to process {patent.patent_id}: {e}")
if not processed_patents:
return f"Failed to process any patents for {company_name}"
logger.info("Analyzing portfolio with LLM...")
print(f"Analyzing portfolio with LLM...")
# Analyze the full portfolio with LLM
analysis = self.llm_analyzer.analyze_patent_portfolio(
@@ -119,7 +115,7 @@ class CompanyAnalyzer:
"""
# Note: This simplified version assumes the patent PDF is already downloaded
# A more complete implementation would support direct patent ID lookup
logger.info("Analyzing patent %s for %s...", patent_id, company_name)
print(f"Analyzing patent {patent_id} for {company_name}...")
patent_path = f"patents/{patent_id}.pdf"
@@ -173,7 +169,7 @@ class CompanyAnalyzer:
return {"patent_id": patent.patent_id, "content": minimized_content}
except Exception as e:
logger.warning("Failed to process %s: %s", patent.patent_id, e)
print(f"Warning: Failed to process {patent.patent_id}: {e}")
return None
def _analyze_company_safe(self, company_name: str) -> CompanyAnalysisResult:
@@ -244,7 +240,7 @@ class CompanyAnalyzer:
results: list[CompanyAnalysisResult] = []
total = len(companies)
logger.info("Starting batch analysis of %d companies...", total)
print(f"Starting batch analysis of {total} companies...")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_company = {
@@ -261,8 +257,8 @@ class CompanyAnalyzer:
result = future.result()
results.append(result)
status = "OK" if result.success else "FAIL"
logger.info("[%d/%d] %s %s", completed, total, status, company)
status = "" if result.success else ""
print(f"[{completed}/{total}] {status} {company}")
if progress_callback:
progress_callback(company, completed, total)
@@ -277,12 +273,12 @@ class CompanyAnalyzer:
error=str(e),
)
)
logger.error("[%d/%d] FAIL %s: %s", completed, total, company, e)
print(f"[{completed}/{total}] ✗ {company}: {e}")
successful = sum(1 for r in results if r.success)
failed = total - successful
logger.info("Batch complete: %d succeeded, %d failed", successful, failed)
print(f"\nBatch complete: {successful} succeeded, {failed} failed")
return BatchAnalysisResult(
results=results,
@@ -308,20 +304,20 @@ class CompanyAnalyzer:
results: list[CompanyAnalysisResult] = []
total = len(companies)
logger.info("Starting sequential analysis of %d companies...", total)
print(f"Starting sequential analysis of {total} companies...")
for idx, company in enumerate(companies, 1):
logger.info("[%d/%d] Analyzing %s...", idx, total, company)
print(f"\n[{idx}/{total}] Analyzing {company}...")
result = self._analyze_company_safe(company)
results.append(result)
status = "OK" if result.success else "FAIL"
logger.info("[%d/%d] %s %s", idx, total, status, company)
status = "" if result.success else ""
print(f"[{idx}/{total}] {status} {company}")
successful = sum(1 for r in results if r.success)
failed = total - successful
logger.info("Batch complete: %d succeeded, %d failed", successful, failed)
print(f"\nBatch complete: {successful} succeeded, {failed} failed")
return BatchAnalysisResult(
results=results,
+69 -33
View File
@@ -114,8 +114,7 @@ class AnalyticsResponse(BaseModel):
period_days: int
# In-memory job storage (for demo; production would use Redis/DB)
_jobs: dict[str, JobStatus] = {}
# Job counter for generating unique IDs (the actual state is in PostgreSQL)
_job_counter = 0
@@ -148,9 +147,19 @@ _analyzer: CompanyAnalyzer | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize resources on startup."""
"""Initialize resources on startup, clean up on shutdown."""
global _analyzer
_analyzer = CompanyAnalyzer()
# Mark any jobs that were running/pending before the restart as failed
from SPARC.database import DatabaseClient
_db = DatabaseClient(config.database_url)
_db.connect()
_db.initialize_schema()
stale = _db.mark_stale_jobs_failed()
if stale:
import logging
logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale)
_db.close()
yield
# Cleanup if needed
_analyzer = None
@@ -422,20 +431,52 @@ async def analyze_companies_batch(
return _convert_batch_result(result)
def _get_job_db() -> "DatabaseClient":
"""Get a DatabaseClient for job persistence."""
from SPARC.database import DatabaseClient
db = DatabaseClient(config.database_url)
return db
def _job_row_to_status(row: dict) -> JobStatus:
"""Convert a database job row to a JobStatus model."""
import json as _json
result = None
if row.get("result_json"):
result_data = row["result_json"]
if isinstance(result_data, str):
result_data = _json.loads(result_data)
result = BatchAnalysisResponse(**result_data)
return JobStatus(
job_id=row["job_id"],
status=row["status"],
progress=row["progress"],
total_companies=row["total_companies"],
completed_companies=row["completed_companies"],
result=result,
error=row.get("error"),
)
def _run_batch_job(job_id: str, companies: list[str], max_workers: int):
"""Background task for batch analysis."""
global _jobs, _analyzer
import json as _json
global _analyzer
db = _get_job_db()
if not _analyzer:
_jobs[job_id].status = "failed"
_jobs[job_id].error = "Analyzer not initialized"
db.update_job(job_id, status="failed", error="Analyzer not initialized")
return
_jobs[job_id].status = "running"
db.update_job(job_id, status="running")
def progress_callback(company: str, completed: int, total: int):
_jobs[job_id].completed_companies = completed
_jobs[job_id].progress = int((completed / total) * 100)
db.update_job(
job_id,
completed_companies=completed,
progress=int((completed / total) * 100),
)
try:
result = _analyzer.analyze_companies(
@@ -443,12 +484,15 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int):
max_workers=max_workers,
progress_callback=progress_callback,
)
_jobs[job_id].status = "completed"
_jobs[job_id].progress = 100
_jobs[job_id].result = _convert_batch_result(result)
batch_response = _convert_batch_result(result)
db.update_job(
job_id,
status="completed",
progress=100,
result_json=_json.dumps(batch_response.model_dump(), default=str),
)
except Exception as e:
_jobs[job_id].status = "failed"
_jobs[job_id].error = str(e)
db.update_job(job_id, status="failed", error=str(e))
@app.post("/analyze/batch/async", response_model=JobStatus, tags=["Analysis"])
@@ -473,19 +517,14 @@ async def analyze_companies_async(
_job_counter += 1
job_id = f"job_{_job_counter}_{datetime.now().strftime('%Y%m%d%H%M%S')}"
_jobs[job_id] = JobStatus(
job_id=job_id,
status="pending",
progress=0,
total_companies=len(request.companies),
completed_companies=0,
)
db = _get_job_db()
job_row = db.create_job(job_id=job_id, total_companies=len(request.companies))
background_tasks.add_task(
_run_batch_job, job_id, request.companies, request.max_workers
)
return _jobs[job_id]
return _job_row_to_status(job_row)
@app.get("/jobs/{job_id}", response_model=JobStatus, tags=["Jobs"])
@@ -501,10 +540,13 @@ async def get_job_status(
Returns:
Current job status including progress and results when complete
"""
if job_id not in _jobs:
db = _get_job_db()
job_row = db.get_job(job_id)
if not job_row:
raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
return _jobs[job_id]
return _job_row_to_status(job_row)
@app.get("/jobs", response_model=list[JobStatus], tags=["Jobs"])
@@ -525,12 +567,6 @@ async def list_jobs(
Returns:
List of job statuses
"""
jobs = list(_jobs.values())
if status:
jobs = [j for j in jobs if j.status == status]
# Return most recent first
jobs.sort(key=lambda j: j.job_id, reverse=True)
return jobs[:limit]
db = _get_job_db()
job_rows = db.list_jobs(status=status, limit=limit)
return [_job_row_to_status(row) for row in job_rows]
+1 -16
View File
@@ -2,20 +2,11 @@
Loads environment variables from .env file for API keys and other secrets.
"""
import logging
from dotenv import load_dotenv
import os
from dotenv import load_dotenv
load_dotenv()
# Logging configuration
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=getattr(logging, log_level, logging.INFO),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
# SerpAPI key for patent search
api_key = os.getenv("API_KEY")
@@ -39,12 +30,6 @@ use_database = os.getenv("USE_DATABASE", "false").lower() in ("true", "1", "yes"
patent_search_days = int(os.getenv("PATENT_SEARCH_DAYS", "90"))
patent_thread_workers = int(os.getenv("PATENT_THREAD_WORKERS", "5"))
# LLM model to use via OpenRouter (e.g. "anthropic/claude-3.5-sonnet", "openai/gpt-4o")
model = os.getenv("MODEL", "anthropic/claude-3.5-sonnet")
# SERP cache TTL in hours (how long cached search results are considered fresh)
serp_cache_ttl_hours = int(os.getenv("SERP_CACHE_TTL_HOURS", "24"))
# Root path for running behind a reverse proxy (e.g., "/api" when served at /api/)
# This ensures OpenAPI docs work correctly when accessed via the proxy
root_path = os.getenv("ROOT_PATH", "")
+145
View File
@@ -171,6 +171,26 @@ class DatabaseClient:
ON serp_queries(query_hash)
""")
# Create jobs table for persisting async batch job state
cursor.execute("""
CREATE TABLE IF NOT EXISTS jobs (
job_id VARCHAR(128) PRIMARY KEY,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
progress INTEGER NOT NULL DEFAULT 0,
total_companies INTEGER NOT NULL DEFAULT 0,
completed_companies INTEGER NOT NULL DEFAULT 0,
result_json JSONB,
error TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_jobs_status
ON jobs(status)
""")
self.conn.commit()
@staticmethod
@@ -462,6 +482,131 @@ class DatabaseClient:
)
conn.commit()
# Job Persistence Methods
def create_job(
self,
job_id: str,
total_companies: int,
) -> Dict:
"""Create a new job record.
Args:
job_id: Unique job identifier
total_companies: Number of companies in the batch
Returns:
Job dict
"""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
INSERT INTO jobs (job_id, status, progress, total_companies, completed_companies)
VALUES (%s, 'pending', 0, %s, 0)
RETURNING *
""",
(job_id, total_companies),
)
job = cursor.fetchone()
conn.commit()
return dict(job)
def update_job(
self,
job_id: str,
status: Optional[str] = None,
progress: Optional[int] = None,
completed_companies: Optional[int] = None,
result_json: Optional[str] = None,
error: Optional[str] = None,
) -> Optional[Dict]:
"""Update a job's state.
Only non-None fields are updated.
"""
updates = []
params = []
if status is not None:
updates.append("status = %s")
params.append(status)
if progress is not None:
updates.append("progress = %s")
params.append(progress)
if completed_companies is not None:
updates.append("completed_companies = %s")
params.append(completed_companies)
if result_json is not None:
updates.append("result_json = %s")
params.append(result_json)
if error is not None:
updates.append("error = %s")
params.append(error)
if not updates:
return self.get_job(job_id)
updates.append("updated_at = CURRENT_TIMESTAMP")
params.append(job_id)
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
f"UPDATE jobs SET {', '.join(updates)} WHERE job_id = %s RETURNING *",
params,
)
job = cursor.fetchone()
conn.commit()
return dict(job) if job else None
def get_job(self, job_id: str) -> Optional[Dict]:
"""Get a job by ID."""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute("SELECT * FROM jobs WHERE job_id = %s", (job_id,))
job = cursor.fetchone()
return dict(job) if job else None
def list_jobs(
self,
status: Optional[str] = None,
limit: int = 10,
) -> List[Dict]:
"""List jobs, optionally filtered by status."""
query = "SELECT * FROM jobs"
params: list = []
if status:
query += " WHERE status = %s"
params.append(status)
query += " ORDER BY created_at DESC LIMIT %s"
params.append(limit)
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def mark_stale_jobs_failed(self) -> int:
"""Mark any jobs in 'running' or 'pending' state as 'failed'.
Called at startup to clean up jobs that were interrupted by a restart.
Returns:
Number of jobs marked as failed.
"""
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
UPDATE jobs SET status = 'failed', error = 'Interrupted by server restart',
updated_at = CURRENT_TIMESTAMP
WHERE status IN ('running', 'pending')
"""
)
count = cursor.rowcount
conn.commit()
return count
# User Authentication Methods
@staticmethod
+8 -9
View File
@@ -1,14 +1,9 @@
"""LLM integration for patent analysis using OpenRouter."""
import logging
from typing import Dict
from openai import OpenAI
from SPARC import config
from SPARC.database import DatabaseClient
logger = logging.getLogger(__name__)
from typing import Dict
class LLMAnalyzer:
@@ -25,7 +20,7 @@ class LLMAnalyzer:
"""
self.test_mode = test_mode
self.use_cache = use_cache if use_cache is not None else config.use_cache
self.model = config.model
self.model = "anthropic/claude-3.5-sonnet"
# Always initialize database client for storage and caching
self.db_client = DatabaseClient(config.database_url)
@@ -64,7 +59,11 @@ Patent Content:
Provide a concise analysis (2-3 paragraphs) focusing on what this patent reveals about the company's technical direction and competitive advantage."""
if self.test_mode:
logger.debug("TEST MODE - Prompt that would be sent to LLM:\n%s", prompt)
print("=" * 80)
print("TEST MODE - Prompt that would be sent to LLM:")
print("=" * 80)
print(prompt)
print("=" * 80)
return "[TEST MODE - No API call made]"
# Check cache first
@@ -166,7 +165,7 @@ Patent Portfolio:
Provide a comprehensive analysis (4-5 paragraphs) with a final verdict on the company's innovation strength and performance outlook."""
if self.test_mode:
logger.debug("TEST MODE - Portfolio prompt:\n%s", prompt)
print(prompt)
return "[TEST MODE]"
metadata = {
+1 -1
View File
@@ -4,7 +4,7 @@ from datetime import datetime
@dataclass
class Patent:
patent_id: str
patent_id: int
pdf_link: str
pdf_path: str | None = None
summary: dict | None = None
+3
View File
@@ -40,6 +40,9 @@ def main():
print("\nTables created:")
print(" - llm_messages: Stores all LLM prompts and responses")
print(" - users: Stores user accounts")
print(" - jobs: Stores async batch job state")
print(" - patents: Patent PDF cache")
print(" - serp_queries: SERP query result cache")
print("\nIndexes created:")
print(" - idx_messages_timestamp: For time-based queries")
print(" - idx_messages_company: For company-specific queries")
+1 -1
View File
@@ -5,7 +5,7 @@ from datetime import datetime
from unittest.mock import Mock, patch
from fastapi.testclient import TestClient
from SPARC.api import app, _analyzer, _jobs
from SPARC.api import app
from SPARC.types import CompanyAnalysisResult, BatchAnalysisResult