feat(analyzer): integrate DB patent and SERP query caching

Before querying the SERP API, check the serp_queries cache (24h TTL). Before
downloading/parsing each patent, check the patents table for cached
minimized_content. Store results after processing so that repeated analyses
can skip the SERP query, the patent download, and PDF parsing entirely.
This commit is contained in:
2026-03-24 14:35:24 -04:00
parent 3154f6b732
commit 1a297eb60b
+51 -4
View File
@@ -4,25 +4,31 @@ This module ties together patent retrieval, parsing, and LLM analysis
to provide company performance estimation based on patent portfolios. to provide company performance estimation based on patent portfolios.
""" """
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable from typing import Callable
from SPARC import config from SPARC import config
from SPARC.database import DatabaseClient
from SPARC.serp_api import SERP from SPARC.serp_api import SERP
from SPARC.llm import LLMAnalyzer from SPARC.llm import LLMAnalyzer
from SPARC.types import Patent, CompanyAnalysisResult, BatchAnalysisResult from SPARC.types import Patent, Patents, CompanyAnalysisResult, BatchAnalysisResult
class CompanyAnalyzer: class CompanyAnalyzer:
"""Orchestrates end-to-end company performance analysis via patents.""" """Orchestrates end-to-end company performance analysis via patents."""
def __init__(self, openrouter_api_key: str | None = None): def __init__(self, openrouter_api_key: str | None = None, db_client: DatabaseClient | None = None):
"""Initialize the company analyzer. """Initialize the company analyzer.
Args: Args:
openrouter_api_key: Optional OpenRouter API key. If None, loads from config. openrouter_api_key: Optional OpenRouter API key. If None, loads from config.
db_client: Optional DatabaseClient for patent caching. Created automatically if None.
""" """
self.llm_analyzer = LLMAnalyzer(api_key=openrouter_api_key) self.llm_analyzer = LLMAnalyzer(api_key=openrouter_api_key)
self.db = db_client or DatabaseClient(config.database_url)
self.db.connect()
self.db.initialize_schema()
def analyze_company(self, company_name: str, patents: "Patents | None" = None) -> str: def analyze_company(self, company_name: str, patents: "Patents | None" = None) -> str:
"""Analyze a company's performance based on their patent portfolio. """Analyze a company's performance based on their patent portfolio.
@@ -42,8 +48,25 @@ class CompanyAnalyzer:
Comprehensive analysis of company's innovation and performance outlook Comprehensive analysis of company's innovation and performance outlook
""" """
if patents is None: if patents is None:
# Check SERP query cache first
query_hash = hashlib.sha256(company_name.lower().encode()).hexdigest()
cached_ids = self.db.get_cached_serp_query(query_hash)
if cached_ids is not None:
print(f"Using cached SERP results for {company_name} ({len(cached_ids)} patents)")
patents = Patents(patents=[
Patent(patent_id=pid, pdf_link="")
for pid in cached_ids
])
else:
print(f"Retrieving patents for {company_name}...") print(f"Retrieving patents for {company_name}...")
patents = SERP.query(company_name) patents = SERP.query(company_name)
# Cache the SERP results
if patents.patents:
self.db.store_serp_query(
company_name=company_name,
query_hash=query_hash,
patent_ids=[p.patent_id for p in patents.patents],
)
if not patents.patents: if not patents.patents:
return f"No patents found for {company_name}" return f"No patents found for {company_name}"
@@ -54,7 +77,7 @@ class CompanyAnalyzer:
processed_patents = [] processed_patents = []
with ThreadPoolExecutor(max_workers=config.patent_thread_workers) as executor: with ThreadPoolExecutor(max_workers=config.patent_thread_workers) as executor:
future_to_patent = { future_to_patent = {
executor.submit(self._process_single_patent, patent): patent executor.submit(self._process_single_patent, patent, company_name, self.db): patent
for patent in patents.patents for patent in patents.patents
} }
for future in as_completed(future_to_patent): for future in as_completed(future_to_patent):
@@ -110,16 +133,40 @@ class CompanyAnalyzer:
return f"Failed to analyze patent {patent_id}: {e}" return f"Failed to analyze patent {patent_id}: {e}"
@staticmethod @staticmethod
def _process_single_patent(patent: Patent) -> dict | None: def _process_single_patent(
patent: Patent,
company_name: str = "",
db: DatabaseClient | None = None,
) -> dict | None:
"""Download, parse, and minimize a single patent. Thread-safe. """Download, parse, and minimize a single patent. Thread-safe.
Checks DB cache before downloading. Stores results after processing.
Returns: Returns:
Dict with patent_id and minimized content, or None on failure. Dict with patent_id and minimized content, or None on failure.
""" """
try: try:
# Check DB cache first
if db:
cached = db.get_cached_patent(patent.patent_id)
if cached and cached.get("minimized_content"):
return {"patent_id": patent.patent_id, "content": cached["minimized_content"]}
# Full processing: download, parse, minimize
patent = SERP.save_patents(patent) patent = SERP.save_patents(patent)
sections = SERP.parse_patent_pdf(patent.pdf_path) sections = SERP.parse_patent_pdf(patent.pdf_path)
minimized_content = SERP.minimize_patent_for_llm(sections) minimized_content = SERP.minimize_patent_for_llm(sections)
# Store in DB cache
if db:
db.store_patent(
patent_id=patent.patent_id,
company_name=company_name,
pdf_link=patent.pdf_link,
raw_sections=sections,
minimized_content=minimized_content,
)
return {"patent_id": patent.patent_id, "content": minimized_content} return {"patent_id": patent.patent_id, "content": minimized_content}
except Exception as e: except Exception as e:
print(f"Warning: Failed to process {patent.patent_id}: {e}") print(f"Warning: Failed to process {patent.patent_id}: {e}")