Files
SPARC/SPARC/analyzer.py
T
agent-company 3dac88ec90 docs: document patent PDF storage, add FileNotFoundError, commit lockfile
- Add docstring to analyze_single_patent explaining the PDF prerequisite
- Raise FileNotFoundError with helpful message when PDF is missing
- Add patent PDF storage section to README with Docker volume mount example
- Commit frontend/package-lock.json for reproducible builds

Closes leeworks-agents/SPARC#15
Closes leeworks-agents/SPARC#17

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 04:17:09 +00:00

342 lines
12 KiB
Python

"""High-level patent analysis orchestration.
This module ties together patent retrieval, parsing, and LLM analysis
to provide company performance estimation based on patent portfolios.
"""
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable
from SPARC import config
from SPARC.database import DatabaseClient
from SPARC.serp_api import SERP
from SPARC.llm import LLMAnalyzer
from SPARC.types import Patent, Patents, CompanyAnalysisResult, BatchAnalysisResult
class CompanyAnalyzer:
"""Orchestrates end-to-end company performance analysis via patents."""
def __init__(self, openrouter_api_key: str | None = None, db_client: DatabaseClient | None = None):
"""Initialize the company analyzer.
Args:
openrouter_api_key: Optional OpenRouter API key. If None, loads from config.
db_client: Optional DatabaseClient for patent caching. Created automatically if None.
"""
self.llm_analyzer = LLMAnalyzer(api_key=openrouter_api_key)
self.db = db_client or DatabaseClient(config.database_url)
self.db.connect()
self.db.initialize_schema()
def analyze_company(self, company_name: str, patents: "Patents | None" = None) -> str:
"""Analyze a company's performance based on their patent portfolio.
This is the main entry point that orchestrates the full pipeline:
1. Retrieve patents from SERP API
2. Download and parse each patent PDF
3. Minimize patent content (remove bloat)
4. Analyze portfolio with LLM
5. Return performance estimation
Args:
company_name: Name of the company to analyze
patents: Optional pre-fetched Patents result to avoid duplicate API calls
Returns:
Comprehensive analysis of company's innovation and performance outlook
"""
if patents is None:
# Check SERP query cache first
query_hash = hashlib.sha256(company_name.lower().encode()).hexdigest()
cached_ids = self.db.get_cached_serp_query(query_hash)
if cached_ids is not None:
print(f"Using cached SERP results for {company_name} ({len(cached_ids)} patents)")
patents = Patents(patents=[
Patent(patent_id=pid, pdf_link="")
for pid in cached_ids
])
else:
print(f"Retrieving patents for {company_name}...")
patents = SERP.query(company_name)
# Cache the SERP results
if patents.patents:
self.db.store_serp_query(
company_name=company_name,
query_hash=query_hash,
patent_ids=[p.patent_id for p in patents.patents],
)
if not patents.patents:
return f"No patents found for {company_name}"
print(f"Found {len(patents.patents)} patents. Processing...")
# Download, parse, and minimize patents in parallel
processed_patents = []
with ThreadPoolExecutor(max_workers=config.patent_thread_workers) as executor:
future_to_patent = {
executor.submit(self._process_single_patent, patent, company_name, self.db): patent
for patent in patents.patents
}
for future in as_completed(future_to_patent):
patent = future_to_patent[future]
try:
result = future.result()
if result:
processed_patents.append(result)
except Exception as e:
print(f"Warning: Failed to process {patent.patent_id}: {e}")
if not processed_patents:
return f"Failed to process any patents for {company_name}"
print(f"Analyzing portfolio with LLM...")
# Analyze the full portfolio with LLM
analysis = self.llm_analyzer.analyze_patent_portfolio(
patents_data=processed_patents, company_name=company_name
)
return analysis
def analyze_single_patent(self, patent_id: str, company_name: str) -> str:
"""Analyze a single patent by ID.
Prerequisite:
The patent PDF must already exist at ``patents/{patent_id}.pdf``
before calling this method. PDFs are downloaded automatically when
using the batch analysis pipeline (``analyze_company`` or the
``/analyze/batch`` API endpoint). For standalone usage, download
the PDF manually or call ``SERP.save_patents()`` first.
Args:
patent_id: Publication ID of the patent (e.g. "US-11234567-B2")
company_name: Name of the company (for context)
Returns:
Analysis of the specific patent's innovation quality
Raises:
FileNotFoundError: If the patent PDF is not found at the expected path.
"""
import os
patent_path = f"patents/{patent_id}.pdf"
if not os.path.exists(patent_path):
raise FileNotFoundError(
f"Patent PDF not found at '{patent_path}'. "
f"Download the PDF first using SERP.save_patents() or the batch analysis pipeline."
)
try:
sections = SERP.parse_patent_pdf(patent_path)
minimized_content = SERP.minimize_patent_for_llm(sections)
analysis = self.llm_analyzer.analyze_patent_content(
patent_content=minimized_content, company_name=company_name
)
return analysis
except FileNotFoundError:
raise
except Exception as e:
return f"Failed to analyze patent {patent_id}: {e}"
@staticmethod
def _process_single_patent(
patent: Patent,
company_name: str = "",
db: DatabaseClient | None = None,
) -> dict | None:
"""Download, parse, and minimize a single patent. Thread-safe.
Checks DB cache before downloading. Stores results after processing.
Returns:
Dict with patent_id and minimized content, or None on failure.
"""
try:
# Check DB cache first
if db:
cached = db.get_cached_patent(patent.patent_id)
if cached and cached.get("minimized_content"):
return {"patent_id": patent.patent_id, "content": cached["minimized_content"]}
# Full processing: download, parse, minimize
patent = SERP.save_patents(patent)
sections = SERP.parse_patent_pdf(patent.pdf_path)
minimized_content = SERP.minimize_patent_for_llm(sections)
# Store in DB cache
if db:
db.store_patent(
patent_id=patent.patent_id,
company_name=company_name,
pdf_link=patent.pdf_link,
raw_sections=sections,
minimized_content=minimized_content,
)
return {"patent_id": patent.patent_id, "content": minimized_content}
except Exception as e:
print(f"Warning: Failed to process {patent.patent_id}: {e}")
return None
def _analyze_company_safe(self, company_name: str) -> CompanyAnalysisResult:
"""Internal wrapper that catches exceptions and returns structured result.
Args:
company_name: Name of the company to analyze
Returns:
CompanyAnalysisResult with success/failure status
"""
try:
# Delegate to analyze_company which handles SERP/patent caching
analysis = self.analyze_company(company_name)
# Determine patent count from cached SERP query
query_hash = hashlib.sha256(company_name.lower().encode()).hexdigest()
cached_ids = self.db.get_cached_serp_query(query_hash)
patent_count = len(cached_ids) if cached_ids else 0
# Check if analysis indicates failure
if analysis.startswith("No patents found") or analysis.startswith(
"Failed to process"
):
return CompanyAnalysisResult(
company_name=company_name,
analysis=analysis,
patent_count=patent_count,
success=False,
error=analysis,
)
return CompanyAnalysisResult(
company_name=company_name,
analysis=analysis,
patent_count=patent_count,
success=True,
)
except Exception as e:
return CompanyAnalysisResult(
company_name=company_name,
analysis="",
patent_count=0,
success=False,
error=str(e),
)
def analyze_companies(
self,
companies: list[str],
max_workers: int = 3,
progress_callback: Callable[[str, int, int], None] | None = None,
) -> BatchAnalysisResult:
"""Analyze multiple companies' patent portfolios in batch.
Processes companies concurrently for improved performance while
respecting API rate limits.
Args:
companies: List of company names to analyze
max_workers: Maximum concurrent analyses (default 3 to avoid rate limits)
progress_callback: Optional callback(company_name, completed, total)
Returns:
BatchAnalysisResult containing all individual results and summary stats
"""
results: list[CompanyAnalysisResult] = []
total = len(companies)
print(f"Starting batch analysis of {total} companies...")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_company = {
executor.submit(self._analyze_company_safe, company): company
for company in companies
}
completed = 0
for future in as_completed(future_to_company):
company = future_to_company[future]
completed += 1
try:
result = future.result()
results.append(result)
status = "" if result.success else ""
print(f"[{completed}/{total}] {status} {company}")
if progress_callback:
progress_callback(company, completed, total)
except Exception as e:
results.append(
CompanyAnalysisResult(
company_name=company,
analysis="",
patent_count=0,
success=False,
error=str(e),
)
)
print(f"[{completed}/{total}] ✗ {company}: {e}")
successful = sum(1 for r in results if r.success)
failed = total - successful
print(f"\nBatch complete: {successful} succeeded, {failed} failed")
return BatchAnalysisResult(
results=results,
total_companies=total,
successful=successful,
failed=failed,
)
def analyze_companies_sequential(
self, companies: list[str]
) -> BatchAnalysisResult:
"""Analyze multiple companies sequentially (safer for rate limits).
Use this when you want more control over API rate limiting or
when debugging issues.
Args:
companies: List of company names to analyze
Returns:
BatchAnalysisResult containing all individual results
"""
results: list[CompanyAnalysisResult] = []
total = len(companies)
print(f"Starting sequential analysis of {total} companies...")
for idx, company in enumerate(companies, 1):
print(f"\n[{idx}/{total}] Analyzing {company}...")
result = self._analyze_company_safe(company)
results.append(result)
status = "" if result.success else ""
print(f"[{idx}/{total}] {status} {company}")
successful = sum(1 for r in results if r.success)
failed = total - successful
print(f"\nBatch complete: {successful} succeeded, {failed} failed")
return BatchAnalysisResult(
results=results,
total_companies=total,
successful=successful,
failed=failed,
)