Files
SPARC/SPARC/analyzer.py
T
agent-company fbb72fe2a5 ci: add pytest and ruff linting to CI, fix all lint errors
- Add test job to build.yaml that runs pytest and ruff before building images
- Add standalone test.yaml workflow for PRs
- Add ruff.toml with E/F/I rules configured
- Fix all ruff lint errors: sort imports, remove unused imports, fix re-exports
- Build jobs now depend on test job passing (needs: test)

Closes leeworks-agents/SPARC#18
Closes leeworks-agents/SPARC#19

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 07:04:00 +00:00

347 lines
13 KiB
Python

"""High-level patent analysis orchestration.
This module ties together patent retrieval, parsing, and LLM analysis
to provide company performance estimation based on patent portfolios.
"""
import hashlib
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable
from SPARC import config
logger = logging.getLogger(__name__)
from SPARC.database import DatabaseClient
from SPARC.llm import LLMAnalyzer
from SPARC.serp_api import SERP
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult, Patent, Patents
class CompanyAnalyzer:
    """Orchestrates end-to-end company performance analysis via patents.

    The analyzer combines three collaborators: ``SERP`` (patent search,
    PDF download and parsing), a ``DatabaseClient`` (caching of SERP queries
    and processed patents) and an ``LLMAnalyzer`` (the actual analysis).
    """

    # Prefixes of the plain-text messages ``analyze_company`` returns when it
    # cannot produce an analysis. ``_analyze_company_safe`` relies on them to
    # tell a failure message apart from a real analysis.
    # NOTE(review): this is prefix matching on free text, so an LLM answer that
    # happens to start with one of these prefixes would be reported as a failure.
    _FAILURE_PREFIXES = ("No patents found", "Failed to process")

    def __init__(self, openrouter_api_key: str | None = None, db_client: DatabaseClient | None = None):
        """Initialize the company analyzer.

        Connects the database client and initializes its schema, whether the
        client was supplied by the caller or created here.

        Args:
            openrouter_api_key: Optional OpenRouter API key, forwarded to
                ``LLMAnalyzer``.
            db_client: Optional DatabaseClient for patent caching. When None,
                one is created from ``config.database_url``.
        """
        self.llm_analyzer = LLMAnalyzer(api_key=openrouter_api_key)
        # Compare against None explicitly: a supplied client must never be
        # replaced just because it happens to evaluate as falsy.
        if db_client is not None:
            self.db = db_client
        else:
            self.db = DatabaseClient(config.database_url)
        self.db.connect()
        self.db.initialize_schema()

    @staticmethod
    def _query_hash(company_name: str) -> str:
        """Return the SERP cache key: SHA-256 hex digest of the lower-cased name."""
        return hashlib.sha256(company_name.lower().encode()).hexdigest()

    def _fetch_patents(self, company_name: str) -> Patents:
        """Return the patents for a company, using the SERP query cache if possible.

        On a cache hit the patents are rebuilt from the cached IDs only. On a
        miss the SERP API is queried and a non-empty result is stored in the
        cache with the configured TTL.
        """
        query_hash = self._query_hash(company_name)
        cached_ids = self.db.get_cached_serp_query(query_hash)
        if cached_ids is not None:
            logger.info("Using cached SERP results for %s (%d patents)", company_name, len(cached_ids))
            # NOTE(review): cached entries carry an empty pdf_link, so a patent
            # whose processed content is not also in the patent cache has no
            # link to download from - confirm how SERP.save_patents handles it.
            return Patents(patents=[
                Patent(patent_id=pid, pdf_link="")
                for pid in cached_ids
            ])

        logger.info("Retrieving patents for %s...", company_name)
        patents = SERP.query(company_name)
        # Only non-empty results are cached, so an empty answer is retried
        # on the next call instead of being remembered.
        if patents.patents:
            self.db.store_serp_query(
                company_name=company_name,
                query_hash=query_hash,
                patent_ids=[p.patent_id for p in patents.patents],
                ttl_hours=config.serp_cache_ttl_hours,
            )
        return patents

    def _process_patents(self, patent_list: list, company_name: str) -> list[dict]:
        """Download, parse and minimize the given patents on a thread pool.

        Patents that fail to process are logged and skipped. Results are
        collected in completion order, not input order.
        """
        processed_patents = []
        with ThreadPoolExecutor(max_workers=config.patent_thread_workers) as executor:
            future_to_patent = {
                executor.submit(self._process_single_patent, patent, company_name, self.db): patent
                for patent in patent_list
            }
            for future in as_completed(future_to_patent):
                patent = future_to_patent[future]
                try:
                    result = future.result()
                except Exception as e:
                    logger.warning("Failed to process %s: %s", patent.patent_id, e)
                else:
                    if result:
                        processed_patents.append(result)
        return processed_patents

    def analyze_company(self, company_name: str, patents: "Patents | None" = None) -> str:
        """Analyze a company's performance based on their patent portfolio.

        This is the main entry point that orchestrates the full pipeline:
        1. Retrieve patents (SERP query cache first, then the SERP API)
        2. Download and parse each patent PDF
        3. Minimize patent content (remove bloat)
        4. Analyze portfolio with LLM
        5. Return performance estimation

        Args:
            company_name: Name of the company to analyze
            patents: Optional pre-fetched Patents result to avoid duplicate API calls

        Returns:
            The LLM's analysis of the portfolio, or a plain-text message
            starting with "No patents found" / "Failed to process" when no
            patent could be retrieved or processed.
        """
        if patents is None:
            patents = self._fetch_patents(company_name)

        if not patents.patents:
            return f"No patents found for {company_name}"

        logger.info("Found %d patents. Processing...", len(patents.patents))
        processed_patents = self._process_patents(patents.patents, company_name)
        if not processed_patents:
            return f"Failed to process any patents for {company_name}"

        logger.info("Analyzing portfolio with LLM...")
        return self.llm_analyzer.analyze_patent_portfolio(
            patents_data=processed_patents, company_name=company_name
        )

    def analyze_single_patent(self, patent_id: str, company_name: str) -> str:
        """Analyze a single patent by ID.

        Prerequisite:
            The patent PDF must already exist at ``patents/{patent_id}.pdf``
            before calling this method. PDFs are downloaded automatically when
            using the batch analysis pipeline (``analyze_company`` or the
            ``/analyze/batch`` API endpoint). For standalone usage, download
            the PDF manually or call ``SERP.save_patents()`` first.

        Args:
            patent_id: Publication ID of the patent (e.g. "US-11234567-B2")
            company_name: Name of the company (for context)

        Returns:
            Analysis of the specific patent's innovation quality, or a
            "Failed to analyze patent ..." message if parsing or the LLM call
            raised anything other than FileNotFoundError.

        Raises:
            FileNotFoundError: If the patent PDF is not found at the expected path.
        """
        import os

        logger.info("Analyzing patent %s for %s...", patent_id, company_name)
        # NOTE(review): patent_id is interpolated into the path unchecked; if it
        # can come from an external request, validate it against path traversal.
        patent_path = f"patents/{patent_id}.pdf"
        if not os.path.exists(patent_path):
            raise FileNotFoundError(
                f"Patent PDF not found at '{patent_path}'. "
                f"Download the PDF first using SERP.save_patents() or the batch analysis pipeline."
            )
        try:
            sections = SERP.parse_patent_pdf(patent_path)
            minimized_content = SERP.minimize_patent_for_llm(sections)
            return self.llm_analyzer.analyze_patent_content(
                patent_content=minimized_content, company_name=company_name
            )
        except FileNotFoundError:
            # Keep the documented contract even if the file vanishes between
            # the existence check and parsing.
            raise
        except Exception as e:
            # Best-effort API: report the failure as text, but leave a trace
            # in the log instead of swallowing it silently.
            logger.warning("Failed to analyze patent %s: %s", patent_id, e)
            return f"Failed to analyze patent {patent_id}: {e}"

    @staticmethod
    def _process_single_patent(
        patent: Patent,
        company_name: str = "",
        db: DatabaseClient | None = None,
    ) -> dict | None:
        """Download, parse, and minimize a single patent.

        Checks the DB cache before downloading and stores the result after
        processing. Called from worker threads by ``analyze_company``.
        NOTE(review): the same ``db`` client is shared by all of those threads -
        confirm DatabaseClient is safe for concurrent use.

        Args:
            patent: Patent to process.
            company_name: Company name stored alongside the cached patent.
            db: Optional cache; when None, nothing is read from or written to it.

        Returns:
            Dict with ``patent_id`` and minimized ``content``, or None on failure.
        """
        try:
            if db is not None:
                cached = db.get_cached_patent(patent.patent_id)
                if cached and cached.get("minimized_content"):
                    return {"patent_id": patent.patent_id, "content": cached["minimized_content"]}

            # Cache miss: full processing - download, parse, minimize.
            patent = SERP.save_patents(patent)
            sections = SERP.parse_patent_pdf(patent.pdf_path)
            minimized_content = SERP.minimize_patent_for_llm(sections)

            if db is not None:
                db.store_patent(
                    patent_id=patent.patent_id,
                    company_name=company_name,
                    pdf_link=patent.pdf_link,
                    raw_sections=sections,
                    minimized_content=minimized_content,
                )
            return {"patent_id": patent.patent_id, "content": minimized_content}
        except Exception as e:
            # One bad patent must not abort the whole portfolio.
            logger.warning("Failed to process %s: %s", patent.patent_id, e)
            return None

    def _analyze_company_safe(self, company_name: str) -> CompanyAnalysisResult:
        """Internal wrapper that catches exceptions and returns structured result.

        Args:
            company_name: Name of the company to analyze

        Returns:
            CompanyAnalysisResult with success/failure status. ``patent_count``
            is the number of patent IDs in the cached SERP query for the
            company (0 when nothing is cached), not the number of patents
            that were successfully processed.
        """
        try:
            # Delegate to analyze_company which handles SERP/patent caching
            analysis = self.analyze_company(company_name)

            # Determine patent count from cached SERP query
            cached_ids = self.db.get_cached_serp_query(self._query_hash(company_name))
            patent_count = len(cached_ids) if cached_ids else 0

            # analyze_company signals failure through its message text.
            if analysis.startswith(self._FAILURE_PREFIXES):
                return CompanyAnalysisResult(
                    company_name=company_name,
                    analysis=analysis,
                    patent_count=patent_count,
                    success=False,
                    error=analysis,
                )
            return CompanyAnalysisResult(
                company_name=company_name,
                analysis=analysis,
                patent_count=patent_count,
                success=True,
            )
        except Exception as e:
            return CompanyAnalysisResult(
                company_name=company_name,
                analysis="",
                patent_count=0,
                success=False,
                error=str(e),
            )

    @staticmethod
    def _notify_progress(
        progress_callback: Callable[[str, int, int], None] | None,
        company: str,
        completed: int,
        total: int,
    ) -> None:
        """Invoke the progress callback, if any, without letting it break the batch."""
        if progress_callback is None:
            return
        try:
            progress_callback(company, completed, total)
        except Exception as e:
            logger.warning("Progress callback failed for %s: %s", company, e)

    @staticmethod
    def _build_batch_result(results: list[CompanyAnalysisResult], total: int) -> BatchAnalysisResult:
        """Log the success/failure counts and wrap the results in a BatchAnalysisResult."""
        successful = sum(1 for r in results if r.success)
        failed = total - successful
        logger.info("Batch complete: %d succeeded, %d failed", successful, failed)
        return BatchAnalysisResult(
            results=results,
            total_companies=total,
            successful=successful,
            failed=failed,
        )

    def analyze_companies(
        self,
        companies: list[str],
        max_workers: int = 3,
        progress_callback: Callable[[str, int, int], None] | None = None,
    ) -> BatchAnalysisResult:
        """Analyze multiple companies' patent portfolios in batch.

        Companies are analyzed concurrently on a thread pool. Exactly one
        result is recorded per company, in completion order.

        Args:
            companies: List of company names to analyze
            max_workers: Maximum concurrent analyses (default 3 to avoid rate limits)
            progress_callback: Optional callback(company_name, completed, total),
                invoked once per finished company. Exceptions it raises are
                logged and do not affect the results.

        Returns:
            BatchAnalysisResult containing all individual results and summary stats
        """
        results: list[CompanyAnalysisResult] = []
        total = len(companies)
        logger.info("Starting batch analysis of %d companies...", total)

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_company = {
                executor.submit(self._analyze_company_safe, company): company
                for company in companies
            }
            completed = 0
            for future in as_completed(future_to_company):
                company = future_to_company[future]
                completed += 1
                try:
                    result = future.result()
                except Exception as e:
                    result = CompanyAnalysisResult(
                        company_name=company,
                        analysis="",
                        patent_count=0,
                        success=False,
                        error=str(e),
                    )
                    logger.error("[%d/%d] FAIL %s: %s", completed, total, company, e)
                else:
                    status = "OK" if result.success else "FAIL"
                    logger.info("[%d/%d] %s %s", completed, total, status, company)
                results.append(result)
                # Kept outside the try above so a raising callback cannot add
                # a second, bogus failure entry for the same company.
                self._notify_progress(progress_callback, company, completed, total)

        return self._build_batch_result(results, total)

    def analyze_companies_sequential(
        self, companies: list[str]
    ) -> BatchAnalysisResult:
        """Analyze multiple companies sequentially (safer for rate limits).

        Use this when you want more control over API rate limiting or
        when debugging issues. Results are in the same order as ``companies``.

        Args:
            companies: List of company names to analyze

        Returns:
            BatchAnalysisResult containing all individual results
        """
        results: list[CompanyAnalysisResult] = []
        total = len(companies)
        logger.info("Starting sequential analysis of %d companies...", total)

        for idx, company in enumerate(companies, 1):
            logger.info("[%d/%d] Analyzing %s...", idx, total, company)
            result = self._analyze_company_safe(company)
            results.append(result)
            status = "OK" if result.success else "FAIL"
            logger.info("[%d/%d] %s %s", idx, total, status, company)

        return self._build_batch_result(results, total)