perf(analyzer): parallelize patent download/parse/minimize with threads

Replace the sequential per-patent loop with a ThreadPoolExecutor
(workers controlled by PATENT_THREAD_WORKERS config). Each patent is
processed independently in _process_single_patent, which is thread-safe
since SERP methods are stateless and operate on separate files.
This commit is contained in:
2026-03-24 14:32:23 -04:00
parent 90f9cfc826
commit b9bb3dc1cd
+31 -21
View File
@@ -7,6 +7,7 @@ to provide company performance estimation based on patent portfolios.
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable from typing import Callable
from SPARC import config
from SPARC.serp_api import SERP from SPARC.serp_api import SERP
from SPARC.llm import LLMAnalyzer from SPARC.llm import LLMAnalyzer
from SPARC.types import Patent, CompanyAnalysisResult, BatchAnalysisResult from SPARC.types import Patent, CompanyAnalysisResult, BatchAnalysisResult
@@ -49,28 +50,21 @@ class CompanyAnalyzer:
print(f"Found {len(patents.patents)} patents. Processing...") print(f"Found {len(patents.patents)} patents. Processing...")
# Download and parse each patent # Download, parse, and minimize patents in parallel
processed_patents = [] processed_patents = []
for idx, patent in enumerate(patents.patents, 1): with ThreadPoolExecutor(max_workers=config.patent_thread_workers) as executor:
print(f"Processing patent {idx}/{len(patents.patents)}: {patent.patent_id}") future_to_patent = {
executor.submit(self._process_single_patent, patent): patent
try: for patent in patents.patents
# Download PDF }
patent = SERP.save_patents(patent) for future in as_completed(future_to_patent):
patent = future_to_patent[future]
# Parse sections from PDF try:
sections = SERP.parse_patent_pdf(patent.pdf_path) result = future.result()
if result:
# Minimize for LLM (remove bloat) processed_patents.append(result)
minimized_content = SERP.minimize_patent_for_llm(sections) except Exception as e:
print(f"Warning: Failed to process {patent.patent_id}: {e}")
processed_patents.append(
{"patent_id": patent.patent_id, "content": minimized_content}
)
except Exception as e:
print(f"Warning: Failed to process {patent.patent_id}: {e}")
continue
if not processed_patents: if not processed_patents:
return f"Failed to process any patents for {company_name}" return f"Failed to process any patents for {company_name}"
@@ -115,6 +109,22 @@ class CompanyAnalyzer:
except Exception as e: except Exception as e:
return f"Failed to analyze patent {patent_id}: {e}" return f"Failed to analyze patent {patent_id}: {e}"
@staticmethod
def _process_single_patent(patent: Patent) -> dict | None:
"""Download, parse, and minimize a single patent. Thread-safe.
Returns:
Dict with patent_id and minimized content, or None on failure.
"""
try:
patent = SERP.save_patents(patent)
sections = SERP.parse_patent_pdf(patent.pdf_path)
minimized_content = SERP.minimize_patent_for_llm(sections)
return {"patent_id": patent.patent_id, "content": minimized_content}
except Exception as e:
print(f"Warning: Failed to process {patent.patent_id}: {e}")
return None
def _analyze_company_safe(self, company_name: str) -> CompanyAnalysisResult: def _analyze_company_safe(self, company_name: str) -> CompanyAnalysisResult:
"""Internal wrapper that catches exceptions and returns structured result. """Internal wrapper that catches exceptions and returns structured result.