From b9bb3dc1cd301e62c18d61d84cc7e6490e693874 Mon Sep 17 00:00:00 2001 From: 0xWheatyz Date: Tue, 24 Mar 2026 14:32:23 -0400 Subject: [PATCH] perf(analyzer): parallelize patent download/parse/minimize with threads Replace the sequential per-patent loop with a ThreadPoolExecutor (workers controlled by PATENT_THREAD_WORKERS config). Each patent is processed independently in _process_single_patent, which is thread-safe since SERP methods are stateless and operate on separate files. --- SPARC/analyzer.py | 52 ++++++++++++++++++++++++++++------------------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/SPARC/analyzer.py b/SPARC/analyzer.py index 47ae7e2..ffbd68e 100644 --- a/SPARC/analyzer.py +++ b/SPARC/analyzer.py @@ -7,6 +7,7 @@ to provide company performance estimation based on patent portfolios. from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Callable +from SPARC import config from SPARC.serp_api import SERP from SPARC.llm import LLMAnalyzer from SPARC.types import Patent, CompanyAnalysisResult, BatchAnalysisResult @@ -49,28 +50,21 @@ class CompanyAnalyzer: print(f"Found {len(patents.patents)} patents. Processing...") - # Download and parse each patent + # Download, parse, and minimize patents in parallel processed_patents = [] - for idx, patent in enumerate(patents.patents, 1): - print(f"Processing patent {idx}/{len(patents.patents)}: {patent.patent_id}") - - try: - # Download PDF - patent = SERP.save_patents(patent) - - # Parse sections from PDF - sections = SERP.parse_patent_pdf(patent.pdf_path) - - # Minimize for LLM (remove bloat) - minimized_content = SERP.minimize_patent_for_llm(sections) - - processed_patents.append( - {"patent_id": patent.patent_id, "content": minimized_content} - ) - - except Exception as e: - print(f"Warning: Failed to process {patent.patent_id}: {e}") - continue + with ThreadPoolExecutor(max_workers=config.patent_thread_workers) as executor: + future_to_patent = { + executor.submit(self._process_single_patent, patent): patent + for patent in patents.patents + } + for future in as_completed(future_to_patent): + patent = future_to_patent[future] + try: + result = future.result() + if result: + processed_patents.append(result) + except Exception as e: + print(f"Warning: Failed to process {patent.patent_id}: {e}") if not processed_patents: return f"Failed to process any patents for {company_name}" @@ -115,6 +109,22 @@ class CompanyAnalyzer: except Exception as e: return f"Failed to analyze patent {patent_id}: {e}" + @staticmethod + def _process_single_patent(patent: Patent) -> dict | None: + """Download, parse, and minimize a single patent. Thread-safe. + + Returns: + Dict with patent_id and minimized content, or None on failure. + """ + try: + patent = SERP.save_patents(patent) + sections = SERP.parse_patent_pdf(patent.pdf_path) + minimized_content = SERP.minimize_patent_for_llm(sections) + return {"patent_id": patent.patent_id, "content": minimized_content} + except Exception as e: + print(f"Warning: Failed to process {patent.patent_id}: {e}") + return None + def _analyze_company_safe(self, company_name: str) -> CompanyAnalysisResult: """Internal wrapper that catches exceptions and returns structured result.