Compare commits

...

11 Commits

Author SHA1 Message Date
agent-company 47cddcbeaf feat(security): add JWT startup guard, configurable CORS, and externalize DB credentials
- Add check_jwt_secret() that refuses default JWT secret when APP_ENV != development
- Make CORS origins configurable via CORS_ORIGINS env var (comma-separated)
- Replace hardcoded postgres credentials in docker-compose.yml with env var references
- Add APP_ENV and cors_origins to config.py
- Update .env.example with all required variables and documentation
- Add tests for JWT startup guard and CORS configuration

Closes leeworks-agents/SPARC#4
Closes leeworks-agents/SPARC#5
Closes leeworks-agents/SPARC#6

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 04:06:31 +00:00
AI-Manager 6105ba7793 Merge pull request 'chore: add ROADMAP.md for SPARC application development' (#3) from chore/add-roadmap into main 2026-03-26 02:47:54 +00:00
agent-company e8cdc089fa chore: add ROADMAP.md for SPARC application development
- Document current project state and architecture
- Identify P1 priorities: security hardening, error handling, test coverage
- Identify P2 priorities: structured logging, configurable LLM, frontend polish, CI tests
- Identify P3 priorities: export, comparison, scheduled analysis, notifications
- Reference Talos repo for infrastructure/deployment concerns

Closes leeworks-agents/SPARC#2

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 00:06:56 +00:00
0xWheatyz 9c971dac72 fix(analyzer): route _analyze_company_safe through cache-aware path
_analyze_company_safe was calling SERP.query directly, bypassing the
SERP query cache in analyze_company. Now delegates fully to
analyze_company() and reads patent_count from the serp_queries cache.
2026-03-24 15:02:19 -04:00
0xWheatyz 6f0b448044 test(analyzer,serp): add tests for caching, single query, and parallel processing
- Add TestSingleQueryBugFix: verify SERP.query called once per analysis
- Add TestPatentCaching: DB cache hit/miss, SERP query cache hit/miss
- Add TestDynamicDateRange: rolling window, days_back param
- Add TestFilesystemPDFCaching: skip download, redownload empty files
- Add autouse mock_db fixture to prevent real DB connections in all tests
2026-03-24 14:39:09 -04:00
0xWheatyz 1a297eb60b feat(analyzer): integrate DB patent and SERP query caching
Before querying SERP API, check serp_queries cache (24h TTL). Before
downloading/parsing each patent, check patents table for cached
minimized_content. Store results after processing so repeated analyses
skip all network I/O and PDF parsing entirely.
2026-03-24 14:35:24 -04:00
0xWheatyz 3154f6b732 feat(database): add patent/serp caching tables and connection pooling
- Add patents table (patent_id PK, raw_sections JSONB, minimized_content)
- Add serp_queries table (query_hash unique, result_patent_ids, expires_at)
- Add cache methods: get/store_patent, get/store_serp_query
- Replace single connection with ThreadedConnectionPool (min=2, max=10)
- Add get_conn() context manager for thread-safe connection checkout
- Legacy single-connection path preserved for backwards compatibility
2026-03-24 14:34:33 -04:00
0xWheatyz b9bb3dc1cd perf(analyzer): parallelize patent download/parse/minimize with threads
Replace the sequential per-patent loop with a ThreadPoolExecutor
(workers controlled by PATENT_THREAD_WORKERS config). Each patent is
processed independently in _process_single_patent, which is thread-safe
since SERP methods are stateless and operate on separate files.
2026-03-24 14:32:23 -04:00
0xWheatyz 90f9cfc826 fix(serp): replace hardcoded date range with rolling window
The SERP query had a frozen date range (Oct-Nov 2025) that returned
stale patents. Now computes a rolling window from config
(PATENT_SEARCH_DAYS, default 90 days). Also adds filesystem-level PDF
caching to skip re-downloading existing patent PDFs, and adds
PATENT_THREAD_WORKERS config for upcoming parallel processing.
2026-03-24 14:31:43 -04:00
0xWheatyz d387bbbdf3 fix(analyzer): eliminate double SERP.query() call per company analysis
_analyze_company_safe called SERP.query() then passed the company name
to analyze_company() which called SERP.query() again — doubling API
usage. Now analyze_company() accepts an optional patents param so callers
can pass pre-fetched results through.
2026-03-24 14:16:49 -04:00
0xWheatyz fa564e5e1e chore: forcing new git commit 2026-03-23 17:45:42 -04:00
13 changed files with 851 additions and 63 deletions
+30 -9
View File
@@ -1,21 +1,42 @@
# SPARC Configuration
# ---- Application Environment ----
# Set to "production" or "staging" in deployed environments.
# The API will refuse to start with the default JWT secret unless APP_ENV=development.
APP_ENV=development
# ---- API Keys ----
# SerpAPI key for patent search
API_KEY=your_serpapi_key_here
# OpenRouter API key for LLM analysis
OPENROUTER_API_KEY=your_openrouter_key_here
# Database configuration
# All messages are stored in the database for persistence and caching
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/sparc
# ---- Database ----
# Cache configuration
# When USE_CACHE=true: check database for cached responses before making API calls
# When USE_CACHE=false: always make fresh API calls (still stores results in database)
# Default: true
USE_CACHE=true
# PostgreSQL credentials (used by docker-compose)
POSTGRES_USER=postgres
POSTGRES_PASSWORD=change-me-to-a-secure-password
POSTGRES_DB=sparc
# JWT Secret for authentication
# Full database URL (must match the credentials above)
DATABASE_URL=postgresql://postgres:change-me-to-a-secure-password@localhost:5432/sparc
# ---- Authentication ----
# JWT Secret for signing tokens
# IMPORTANT: Change this to a secure random string in production
JWT_SECRET=your-secure-jwt-secret-change-in-production
# ---- CORS ----
# Comma-separated list of allowed origins for CORS
# Defaults to http://localhost:3000,http://localhost:5173 when unset
# CORS_ORIGINS=https://sparc.example.com,https://app.example.com
# ---- Cache ----
# When USE_CACHE=true: check database for cached responses before making API calls
# When USE_CACHE=false: always make fresh API calls (still stores results in database)
USE_CACHE=true
View File
+122
View File
@@ -0,0 +1,122 @@
# SPARC Roadmap
Semiconductor Patent & Analytics Report Core -- development priorities.
## Current State
SPARC is a patent analysis platform with a working end-to-end pipeline:
Python/FastAPI backend, React/TypeScript frontend, PostgreSQL for persistence
and caching, Docker Compose for local development, and Gitea Actions CI/CD for
image builds. Core features (patent retrieval via SerpAPI, PDF parsing, LLM
analysis via OpenRouter/Claude, batch processing, JWT authentication, analytics
dashboard) are all implemented and functional.
---
## P1 -- High Priority
These items address correctness, security, and reliability gaps that should be
resolved before broader production use.
### Security hardening
- **Rotate default JWT secret.** `auth.py` ships a fallback
`sparc-secret-key-change-in-production` that will be used if `JWT_SECRET` is
unset. Add a startup check that refuses to start with the default secret in
non-development environments.
- **CORS allow-origins are hardcoded.** `api.py` only permits
`localhost:3000` and `localhost:5173`. Make the allowed origins configurable
via environment variable so the dashboard works when deployed behind a real
domain.
- **Database credentials in docker-compose.yml.** The compose file embeds
`postgres:postgres` in plain text. Reference a `.env` file or Docker secrets
instead.
### Error handling and resilience
- **`get_db_client()` in `auth.py` creates a new `DatabaseClient` on every
call.** This bypasses the connection pool and can exhaust database
connections under load. Refactor to share a single pooled client.
- **`_jobs` dict is in-memory only.** Job state is lost on API restart. Persist
job status in PostgreSQL or Redis so async batch results survive restarts.
- **No rate limiting on auth endpoints.** `/auth/login` and `/auth/register`
are unprotected against brute-force or abuse. Add rate limiting middleware.
### Test coverage for auth and admin
- The existing API tests (`tests/test_api.py`) bypass authentication entirely.
Add tests that exercise the JWT flow: registration, login, protected-route
access, token refresh, and admin-only endpoints.
---
## P2 -- Medium Priority
Improvements to usability, performance, and developer experience.
### Backend
- **Add structured logging.** Replace `print()` calls throughout `analyzer.py`,
`serp_api.py`, and `llm.py` with Python `logging` so log levels and
formatting are consistent.
- **Make LLM model configurable.** `llm.py` hardcodes
`anthropic/claude-3.5-sonnet`. Accept a `MODEL` environment variable to allow
switching models without code changes.
- **SERP cache TTL is hardcoded to 24 hours.** Expose `SERP_CACHE_TTL_HOURS`
as an environment variable in `config.py`.
- **Patent PDF storage.** PDFs are saved to a local `patents/` directory. For
containerized deployments, consider object storage (S3/MinIO) or at minimum
document the volume mount requirement more prominently.
- **`analyze_single_patent` assumes local file path.** The method constructs
`patents/{patent_id}.pdf` and reads from disk, but does not download the PDF
first. Either integrate the download step or document the prerequisite.
- **`Patent.patent_id` typed as `int` in `types.py` but used as `str`
everywhere.** Fix the type annotation to `str`.
### Frontend
- **No loading/error states on several pages.** The Batch and Analytics pages
would benefit from skeleton loaders and user-friendly error messages.
- **No dark mode.** Tailwind is configured but no dark variant is applied.
- **Missing `package-lock.json` or `pnpm-lock.yaml`.** The frontend has no
lockfile committed, leading to non-reproducible builds.
### CI/CD
- **No test stage in the Gitea Actions workflow.** `build.yaml` builds and
pushes images but never runs `pytest`. Add a test job that gates the build.
- **No linting or type checking.** Add `ruff` (Python) and `tsc --noEmit`
(TypeScript) to CI.
---
## P3 -- Nice to Have
Lower-urgency enhancements and future features.
- **Export analysis reports.** Allow users to download analysis results as PDF
or CSV from the dashboard.
- **Comparison view.** Side-by-side comparison of two companies' patent
portfolios.
- **Scheduled/recurring analysis.** Periodically re-analyze tracked companies
and alert on significant changes.
- **Webhook/notification support.** Send alerts (Slack, Discord, email) when
batch jobs complete or when a company's innovation score changes
significantly.
- **Multi-model support.** Let users choose between LLM providers per analysis
(e.g., GPT-4o, Gemini, Claude) and compare outputs.
- **Patent trend charts.** Visualize patent filing frequency and technology
category distribution over time in the Analytics page.
- **API pagination.** The `/analyze/batch` and `/jobs` endpoints could benefit
from cursor-based pagination for large result sets.
- **OpenAPI client generation.** Auto-generate the TypeScript API client from
the FastAPI OpenAPI spec to keep frontend types in sync.
---
## Infrastructure and Deployment
Kubernetes manifests, Helm charts, and cluster-level concerns (MetalLB,
storage, FluxCD sync) are tracked in the
[Talos](https://10.0.1.10/leeworks-agents/Talos) repository. File
infrastructure-related issues there, not here.
+91 -29
View File
@@ -4,26 +4,33 @@ This module ties together patent retrieval, parsing, and LLM analysis
to provide company performance estimation based on patent portfolios.
"""
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable
from SPARC import config
from SPARC.database import DatabaseClient
from SPARC.serp_api import SERP
from SPARC.llm import LLMAnalyzer
from SPARC.types import Patent, CompanyAnalysisResult, BatchAnalysisResult
from SPARC.types import Patent, Patents, CompanyAnalysisResult, BatchAnalysisResult
class CompanyAnalyzer:
"""Orchestrates end-to-end company performance analysis via patents."""
def __init__(self, openrouter_api_key: str | None = None):
def __init__(self, openrouter_api_key: str | None = None, db_client: DatabaseClient | None = None):
"""Initialize the company analyzer.
Args:
openrouter_api_key: Optional OpenRouter API key. If None, loads from config.
db_client: Optional DatabaseClient for patent caching. Created automatically if None.
"""
self.llm_analyzer = LLMAnalyzer(api_key=openrouter_api_key)
self.db = db_client or DatabaseClient(config.database_url)
self.db.connect()
self.db.initialize_schema()
def analyze_company(self, company_name: str) -> str:
def analyze_company(self, company_name: str, patents: "Patents | None" = None) -> str:
"""Analyze a company's performance based on their patent portfolio.
This is the main entry point that orchestrates the full pipeline:
@@ -35,40 +42,52 @@ class CompanyAnalyzer:
Args:
company_name: Name of the company to analyze
patents: Optional pre-fetched Patents result to avoid duplicate API calls
Returns:
Comprehensive analysis of company's innovation and performance outlook
"""
print(f"Retrieving patents for {company_name}...")
patents = SERP.query(company_name)
if patents is None:
# Check SERP query cache first
query_hash = hashlib.sha256(company_name.lower().encode()).hexdigest()
cached_ids = self.db.get_cached_serp_query(query_hash)
if cached_ids is not None:
print(f"Using cached SERP results for {company_name} ({len(cached_ids)} patents)")
patents = Patents(patents=[
Patent(patent_id=pid, pdf_link="")
for pid in cached_ids
])
else:
print(f"Retrieving patents for {company_name}...")
patents = SERP.query(company_name)
# Cache the SERP results
if patents.patents:
self.db.store_serp_query(
company_name=company_name,
query_hash=query_hash,
patent_ids=[p.patent_id for p in patents.patents],
)
if not patents.patents:
return f"No patents found for {company_name}"
print(f"Found {len(patents.patents)} patents. Processing...")
# Download and parse each patent
# Download, parse, and minimize patents in parallel
processed_patents = []
for idx, patent in enumerate(patents.patents, 1):
print(f"Processing patent {idx}/{len(patents.patents)}: {patent.patent_id}")
try:
# Download PDF
patent = SERP.save_patents(patent)
# Parse sections from PDF
sections = SERP.parse_patent_pdf(patent.pdf_path)
# Minimize for LLM (remove bloat)
minimized_content = SERP.minimize_patent_for_llm(sections)
processed_patents.append(
{"patent_id": patent.patent_id, "content": minimized_content}
)
except Exception as e:
print(f"Warning: Failed to process {patent.patent_id}: {e}")
continue
with ThreadPoolExecutor(max_workers=config.patent_thread_workers) as executor:
future_to_patent = {
executor.submit(self._process_single_patent, patent, company_name, self.db): patent
for patent in patents.patents
}
for future in as_completed(future_to_patent):
patent = future_to_patent[future]
try:
result = future.result()
if result:
processed_patents.append(result)
except Exception as e:
print(f"Warning: Failed to process {patent.patent_id}: {e}")
if not processed_patents:
return f"Failed to process any patents for {company_name}"
@@ -113,6 +132,46 @@ class CompanyAnalyzer:
except Exception as e:
return f"Failed to analyze patent {patent_id}: {e}"
@staticmethod
def _process_single_patent(
patent: Patent,
company_name: str = "",
db: DatabaseClient | None = None,
) -> dict | None:
"""Download, parse, and minimize a single patent. Thread-safe.
Checks DB cache before downloading. Stores results after processing.
Returns:
Dict with patent_id and minimized content, or None on failure.
"""
try:
# Check DB cache first
if db:
cached = db.get_cached_patent(patent.patent_id)
if cached and cached.get("minimized_content"):
return {"patent_id": patent.patent_id, "content": cached["minimized_content"]}
# Full processing: download, parse, minimize
patent = SERP.save_patents(patent)
sections = SERP.parse_patent_pdf(patent.pdf_path)
minimized_content = SERP.minimize_patent_for_llm(sections)
# Store in DB cache
if db:
db.store_patent(
patent_id=patent.patent_id,
company_name=company_name,
pdf_link=patent.pdf_link,
raw_sections=sections,
minimized_content=minimized_content,
)
return {"patent_id": patent.patent_id, "content": minimized_content}
except Exception as e:
print(f"Warning: Failed to process {patent.patent_id}: {e}")
return None
def _analyze_company_safe(self, company_name: str) -> CompanyAnalysisResult:
"""Internal wrapper that catches exceptions and returns structured result.
@@ -123,11 +182,14 @@ class CompanyAnalyzer:
CompanyAnalysisResult with success/failure status
"""
try:
patents = SERP.query(company_name)
patent_count = len(patents.patents) if patents.patents else 0
# Delegate to analyze_company which handles SERP/patent caching
analysis = self.analyze_company(company_name)
# Determine patent count from cached SERP query
query_hash = hashlib.sha256(company_name.lower().encode()).hexdigest()
cached_ids = self.db.get_cached_serp_query(query_hash)
patent_count = len(cached_ids) if cached_ids else 0
# Check if analysis indicates failure
if analysis.startswith("No patents found") or analysis.startswith(
"Failed to process"
+3 -1
View File
@@ -16,6 +16,7 @@ from SPARC.analyzer import CompanyAnalyzer
from SPARC.auth import (
TokenResponse,
UserResponse,
check_jwt_secret,
create_tokens,
decode_token,
get_current_admin,
@@ -150,6 +151,7 @@ _analyzer: CompanyAnalyzer | None = None
async def lifespan(app: FastAPI):
"""Initialize resources on startup."""
global _analyzer
check_jwt_secret()
_analyzer = CompanyAnalyzer()
yield
# Cleanup if needed
@@ -167,7 +169,7 @@ app = FastAPI(
# Add CORS middleware for React frontend
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000", "http://localhost:5173"],
allow_origins=config.cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
+15 -1
View File
@@ -13,11 +13,25 @@ from SPARC import config
from SPARC.database import DatabaseClient
# JWT Configuration
JWT_SECRET = os.getenv("JWT_SECRET", "sparc-secret-key-change-in-production")
_DEFAULT_JWT_SECRET = "sparc-secret-key-change-in-production"
JWT_SECRET = os.getenv("JWT_SECRET", _DEFAULT_JWT_SECRET)
JWT_ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
def check_jwt_secret() -> None:
"""Refuse to start with the default JWT secret in non-development environments.
Raises:
RuntimeError: If JWT_SECRET is the default value and APP_ENV is not 'development'.
"""
if JWT_SECRET == _DEFAULT_JWT_SECRET and config.app_env != "development":
raise RuntimeError(
f"FATAL: JWT_SECRET is set to the default value and APP_ENV={config.app_env!r}. "
"Set a secure JWT_SECRET environment variable before running in non-development environments."
)
security = HTTPBearer()
+17
View File
@@ -26,6 +26,23 @@ use_cache = os.getenv("USE_CACHE", "true").lower() in ("true", "1", "yes")
# This variable is kept for backwards compatibility but has no effect
use_database = os.getenv("USE_DATABASE", "false").lower() in ("true", "1", "yes")
# Patent search configuration
patent_search_days = int(os.getenv("PATENT_SEARCH_DAYS", "90"))
patent_thread_workers = int(os.getenv("PATENT_THREAD_WORKERS", "5"))
# Root path for running behind a reverse proxy (e.g., "/api" when served at /api/)
# This ensures OpenAPI docs work correctly when accessed via the proxy
root_path = os.getenv("ROOT_PATH", "")
# Application environment: "development", "staging", or "production"
# Used for safety checks (e.g., refusing default JWT secret in production)
app_env = os.getenv("APP_ENV", "development")
# CORS allowed origins (comma-separated)
# Defaults to localhost dev origins when unset
_cors_origins_raw = os.getenv("CORS_ORIGINS", "")
cors_origins: list[str] = (
[o.strip() for o in _cors_origins_raw.split(",") if o.strip()]
if _cors_origins_raw
else ["http://localhost:3000", "http://localhost:5173"]
)
+146 -4
View File
@@ -1,9 +1,11 @@
"""Database client for storing and retrieving LLM messages and user authentication."""
import contextlib
import psycopg2
from psycopg2.pool import ThreadedConnectionPool
from psycopg2.extras import RealDictCursor
from typing import Dict, List, Optional
from datetime import datetime
from datetime import datetime, timedelta
import json
import hashlib
import bcrypt
@@ -12,24 +14,49 @@ import bcrypt
class DatabaseClient:
"""Handles database operations for message storage and retrieval."""
def __init__(self, database_url: str):
def __init__(self, database_url: str, minconn: int = 2, maxconn: int = 10):
"""Initialize the database client.
Args:
database_url: PostgreSQL connection string
minconn: Minimum connections in the pool
maxconn: Maximum connections in the pool
"""
self.database_url = database_url
self._pool: ThreadedConnectionPool | None = None
self._minconn = minconn
self._maxconn = maxconn
# Legacy single connection kept for backwards compatibility
self.conn = None
def _ensure_pool(self):
"""Create the connection pool if it doesn't exist yet."""
if self._pool is None or self._pool.closed:
self._pool = ThreadedConnectionPool(
self._minconn, self._maxconn, self.database_url
)
@contextlib.contextmanager
def get_conn(self):
"""Check out a connection from the pool. Returns it on exit."""
self._ensure_pool()
conn = self._pool.getconn()
try:
yield conn
finally:
self._pool.putconn(conn)
def connect(self):
"""Establish database connection."""
"""Establish database connection (legacy single-connection path)."""
if not self.conn or self.conn.closed:
self.conn = psycopg2.connect(self.database_url)
def close(self):
"""Close database connection."""
"""Close database connection and pool."""
if self.conn and not self.conn.closed:
self.conn.close()
if self._pool and not self._pool.closed:
self._pool.closeall()
def initialize_schema(self):
"""Create database tables if they don't exist."""
@@ -110,6 +137,40 @@ class DatabaseClient:
ON users(email)
""")
# Create patents cache table
cursor.execute("""
CREATE TABLE IF NOT EXISTS patents (
patent_id VARCHAR(64) PRIMARY KEY,
company_name VARCHAR(255),
pdf_link TEXT,
raw_sections JSONB,
minimized_content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_patents_company
ON patents(company_name)
""")
# Create SERP query cache table
cursor.execute("""
CREATE TABLE IF NOT EXISTS serp_queries (
id SERIAL PRIMARY KEY,
company_name VARCHAR(255),
query_hash VARCHAR(64) UNIQUE,
result_patent_ids TEXT[],
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_serp_queries_hash
ON serp_queries(query_hash)
""")
self.conn.commit()
@staticmethod
@@ -320,6 +381,87 @@ class DatabaseClient:
"period_days": days,
}
# Patent Cache Methods
def get_cached_patent(self, patent_id: str) -> Optional[Dict]:
"""Look up a cached patent by ID.
Returns:
Dict with raw_sections and minimized_content, or None.
"""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT * FROM patents WHERE patent_id = %s",
(patent_id,),
)
row = cursor.fetchone()
return dict(row) if row else None
def store_patent(
self,
patent_id: str,
company_name: str,
pdf_link: str,
raw_sections: Dict,
minimized_content: str,
) -> None:
"""Store a processed patent in the cache."""
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO patents (patent_id, company_name, pdf_link, raw_sections, minimized_content)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (patent_id) DO UPDATE SET
raw_sections = EXCLUDED.raw_sections,
minimized_content = EXCLUDED.minimized_content
""",
(patent_id, company_name, pdf_link, json.dumps(raw_sections), minimized_content),
)
conn.commit()
def get_cached_serp_query(self, query_hash: str) -> Optional[List[str]]:
"""Look up cached SERP query results.
Returns:
List of patent IDs if cache hit and not expired, None otherwise.
"""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
SELECT result_patent_ids FROM serp_queries
WHERE query_hash = %s AND expires_at > NOW()
""",
(query_hash,),
)
row = cursor.fetchone()
return row["result_patent_ids"] if row else None
def store_serp_query(
self,
company_name: str,
query_hash: str,
patent_ids: List[str],
ttl_hours: int = 24,
) -> None:
"""Store SERP query results in the cache."""
expires_at = datetime.now() + timedelta(hours=ttl_hours)
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO serp_queries (company_name, query_hash, result_patent_ids, expires_at)
VALUES (%s, %s, %s, %s)
ON CONFLICT (query_hash) DO UPDATE SET
result_patent_ids = EXCLUDED.result_patent_ids,
expires_at = EXCLUDED.expires_at
""",
(company_name, query_hash, patent_ids, expires_at),
)
conn.commit()
# User Authentication Methods
@staticmethod
+20 -8
View File
@@ -1,17 +1,20 @@
import os
import serpapi
from SPARC import config
import re
import pdfplumber # pip install pdfplumber
import requests
from datetime import datetime, timedelta
from typing import Dict
from SPARC.types import Patents, Patent
class SERP:
def query(company: str) -> Patents:
def query(company: str, days_back: int = None) -> Patents:
"""Query Google Patents for a company's recent patents.
Args:
company: Name of the company to search for
days_back: Number of days to look back for patents (default from config)
Returns:
Patents object containing list of patents with PDF links
@@ -23,13 +26,19 @@ class SERP:
patents with restricted access). The returned count may be lower
than the requested number of results.
"""
if days_back is None:
days_back = config.patent_search_days
end_date = datetime.now()
start_date = end_date - timedelta(days=days_back)
date_filter = f"cdr:1,cd_min:{start_date.strftime('%-m/%-d/%Y')},cd_max:{end_date.strftime('%-m/%-d/%Y')}"
# Make API call
params = {
"engine": "google_patents",
"q": company,
"num": 10,
"filter": 1,
"tbs": "cdr:1,cd_min:10/28/2025,cd_max:11/4/2025",
"tbs": date_filter,
"api_key": config.api_key,
}
search = serpapi.search(params)
@@ -46,7 +55,7 @@ class SERP:
def save_patents(patent: Patent) -> Patent:
"""
Save the patent PDF to the patents folder
Save the patent PDF to the patents folder, skipping download if already cached.
Args:
patent: Patent object
@@ -54,12 +63,15 @@ class SERP:
Returns:
Patent object with updated PDF path
"""
response = requests.get(patent.pdf_link)
print(patent.pdf_link)
with open(f"patents/{patent.patent_id}.pdf", "wb") as f:
f.write(response.content)
pdf_path = f"patents/{patent.patent_id}.pdf"
os.makedirs("patents", exist_ok=True)
patent.pdf_path = f"patents/{patent.patent_id}.pdf"
if not (os.path.exists(pdf_path) and os.path.getsize(pdf_path) > 0):
response = requests.get(patent.pdf_link)
with open(pdf_path, "wb") as f:
f.write(response.content)
patent.pdf_path = pdf_path
return patent
def parse_patent_pdf(pdf_path: str) -> Dict:
+8 -6
View File
@@ -3,15 +3,15 @@ services:
image: postgres:16-alpine
container_name: sparc-postgres
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: sparc
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: ${POSTGRES_DB}
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER}"]
interval: 5s
timeout: 5s
retries: 5
@@ -22,7 +22,7 @@ services:
container_name: sparc-init-db
command: python scripts/init_database.py
environment:
DATABASE_URL: postgresql://postgres:postgres@postgres:5432/sparc
DATABASE_URL: postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB}
depends_on:
postgres:
condition: service_healthy
@@ -35,9 +35,11 @@ services:
environment:
API_KEY: ${API_KEY}
OPENROUTER_API_KEY: ${OPENROUTER_API_KEY}
DATABASE_URL: postgresql://postgres:postgres@postgres:5432/sparc
DATABASE_URL: postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB}
USE_CACHE: "true"
JWT_SECRET: ${JWT_SECRET:-sparc-secret-key-change-in-production}
CORS_ORIGINS: ${CORS_ORIGINS:-}
APP_ENV: ${APP_ENV:-development}
ROOT_PATH: /api
ports:
- "8000:8000"
+191 -3
View File
@@ -1,11 +1,22 @@
"""Tests for the high-level company analyzer orchestration."""
import pytest
from unittest.mock import Mock, patch, call
from unittest.mock import Mock, patch, call, MagicMock
from SPARC.analyzer import CompanyAnalyzer
from SPARC.types import Patent, Patents, CompanyAnalysisResult, BatchAnalysisResult
@pytest.fixture(autouse=True)
def mock_db(mocker):
"""Mock DatabaseClient for all tests so no real DB connection is needed."""
mock_db_cls = mocker.patch("SPARC.analyzer.DatabaseClient")
mock_db_instance = MagicMock()
mock_db_instance.get_cached_patent.return_value = None
mock_db_instance.get_cached_serp_query.return_value = None
mock_db_cls.return_value = mock_db_instance
return mock_db_instance
class TestCompanyAnalyzer:
"""Test the CompanyAnalyzer orchestration logic."""
@@ -17,7 +28,7 @@ class TestCompanyAnalyzer:
mock_llm.assert_called_once_with(api_key="test-key")
def test_analyze_company_full_pipeline(self, mocker):
def test_analyze_company_full_pipeline(self, mocker, mock_db):
"""Test complete company analysis pipeline."""
# Mock all the dependencies
mock_query = mocker.patch("SPARC.analyzer.SERP.query")
@@ -178,6 +189,180 @@ class TestCompanyAnalyzer:
assert "PDF not found" in result
class TestSingleQueryBugFix:
"""Test that SERP.query is only called once per company analysis."""
def test_analyze_company_safe_calls_query_once(self, mocker, mock_db):
"""_analyze_company_safe should call SERP.query exactly once."""
mock_query = mocker.patch("SPARC.analyzer.SERP.query")
mock_save = mocker.patch("SPARC.analyzer.SERP.save_patents")
mock_parse = mocker.patch("SPARC.analyzer.SERP.parse_patent_pdf")
mock_minimize = mocker.patch("SPARC.analyzer.SERP.minimize_patent_for_llm")
mock_llm = mocker.patch("SPARC.analyzer.LLMAnalyzer")
patent = Patent(patent_id="US123", pdf_link="http://example.com/test.pdf")
mock_query.return_value = Patents(patents=[patent])
def save_side_effect(p):
p.pdf_path = "patents/US123.pdf"
return p
mock_save.side_effect = save_side_effect
mock_parse.return_value = {"abstract": "Test"}
mock_minimize.return_value = "Content"
mock_llm_instance = Mock()
mock_llm_instance.analyze_patent_portfolio.return_value = "Analysis"
mock_llm.return_value = mock_llm_instance
analyzer = CompanyAnalyzer()
analyzer._analyze_company_safe("TestCorp")
# The key assertion: SERP.query called exactly once, not twice
mock_query.assert_called_once_with("TestCorp")
def test_analyze_company_with_prefetched_patents_skips_query(self, mocker):
"""analyze_company should not call SERP.query when patents are provided."""
mock_query = mocker.patch("SPARC.analyzer.SERP.query")
mock_save = mocker.patch("SPARC.analyzer.SERP.save_patents")
mock_parse = mocker.patch("SPARC.analyzer.SERP.parse_patent_pdf")
mock_minimize = mocker.patch("SPARC.analyzer.SERP.minimize_patent_for_llm")
mock_llm = mocker.patch("SPARC.analyzer.LLMAnalyzer")
patent = Patent(patent_id="US123", pdf_link="http://example.com/test.pdf")
prefetched = Patents(patents=[patent])
def save_side_effect(p):
p.pdf_path = "patents/US123.pdf"
return p
mock_save.side_effect = save_side_effect
mock_parse.return_value = {"abstract": "Test"}
mock_minimize.return_value = "Content"
mock_llm_instance = Mock()
mock_llm_instance.analyze_patent_portfolio.return_value = "Analysis"
mock_llm.return_value = mock_llm_instance
analyzer = CompanyAnalyzer()
analyzer.analyze_company("TestCorp", patents=prefetched)
# SERP.query should never be called
mock_query.assert_not_called()
class TestPatentCaching:
"""Test patent-level DB caching in the pipeline."""
def test_process_single_patent_uses_db_cache(self, mocker, mock_db):
"""_process_single_patent returns cached content when available."""
mock_save = mocker.patch("SPARC.analyzer.SERP.save_patents")
mock_db.get_cached_patent.return_value = {
"patent_id": "US123",
"minimized_content": "Cached minimized content",
}
patent = Patent(patent_id="US123", pdf_link="http://example.com/test.pdf")
result = CompanyAnalyzer._process_single_patent(patent, "TestCorp", mock_db)
assert result == {"patent_id": "US123", "content": "Cached minimized content"}
# Should NOT download since cache hit
mock_save.assert_not_called()
def test_process_single_patent_stores_to_db_cache(self, mocker, mock_db):
"""_process_single_patent stores result in DB after processing."""
mock_save = mocker.patch("SPARC.analyzer.SERP.save_patents")
mock_parse = mocker.patch("SPARC.analyzer.SERP.parse_patent_pdf")
mock_minimize = mocker.patch("SPARC.analyzer.SERP.minimize_patent_for_llm")
# No cache hit
mock_db.get_cached_patent.return_value = None
patent = Patent(patent_id="US123", pdf_link="http://example.com/test.pdf")
def save_side_effect(p):
p.pdf_path = "patents/US123.pdf"
return p
mock_save.side_effect = save_side_effect
mock_parse.return_value = {"abstract": "Test abstract"}
mock_minimize.return_value = "Minimized content"
result = CompanyAnalyzer._process_single_patent(patent, "TestCorp", mock_db)
assert result == {"patent_id": "US123", "content": "Minimized content"}
mock_db.store_patent.assert_called_once_with(
patent_id="US123",
company_name="TestCorp",
pdf_link="http://example.com/test.pdf",
raw_sections={"abstract": "Test abstract"},
minimized_content="Minimized content",
)
def test_serp_query_cache_hit_skips_api(self, mocker, mock_db):
"""When SERP query is cached, API call is skipped."""
mock_query = mocker.patch("SPARC.analyzer.SERP.query")
mock_save = mocker.patch("SPARC.analyzer.SERP.save_patents")
mock_parse = mocker.patch("SPARC.analyzer.SERP.parse_patent_pdf")
mock_minimize = mocker.patch("SPARC.analyzer.SERP.minimize_patent_for_llm")
mock_llm = mocker.patch("SPARC.analyzer.LLMAnalyzer")
# Simulate SERP cache hit
mock_db.get_cached_serp_query.return_value = ["US123"]
# Simulate patent cache hit too
mock_db.get_cached_patent.return_value = {
"patent_id": "US123",
"minimized_content": "Cached content",
}
mock_llm_instance = Mock()
mock_llm_instance.analyze_patent_portfolio.return_value = "Analysis"
mock_llm.return_value = mock_llm_instance
analyzer = CompanyAnalyzer()
result = analyzer.analyze_company("TestCorp")
assert result == "Analysis"
# SERP.query should NOT be called
mock_query.assert_not_called()
# No downloads should happen
mock_save.assert_not_called()
def test_serp_query_cache_miss_stores_result(self, mocker, mock_db):
"""When SERP query cache misses, result is stored after API call."""
mock_query = mocker.patch("SPARC.analyzer.SERP.query")
mock_save = mocker.patch("SPARC.analyzer.SERP.save_patents")
mock_parse = mocker.patch("SPARC.analyzer.SERP.parse_patent_pdf")
mock_minimize = mocker.patch("SPARC.analyzer.SERP.minimize_patent_for_llm")
mock_llm = mocker.patch("SPARC.analyzer.LLMAnalyzer")
mock_db.get_cached_serp_query.return_value = None
patent = Patent(patent_id="US123", pdf_link="http://example.com/test.pdf")
mock_query.return_value = Patents(patents=[patent])
def save_side_effect(p):
p.pdf_path = "patents/US123.pdf"
return p
mock_save.side_effect = save_side_effect
mock_parse.return_value = {"abstract": "Test"}
mock_minimize.return_value = "Content"
mock_llm_instance = Mock()
mock_llm_instance.analyze_patent_portfolio.return_value = "Analysis"
mock_llm.return_value = mock_llm_instance
analyzer = CompanyAnalyzer()
analyzer.analyze_company("TestCorp")
mock_db.store_serp_query.assert_called_once()
call_kwargs = mock_db.store_serp_query.call_args[1]
assert call_kwargs["company_name"] == "TestCorp"
assert call_kwargs["patent_ids"] == ["US123"]
class TestBatchProcessing:
"""Test multi-company batch processing functionality."""
@@ -316,7 +501,7 @@ class TestBatchProcessing:
assert callback.call_count == 2
def test_company_analysis_result_structure(self, mocker):
def test_company_analysis_result_structure(self, mocker, mock_db):
"""Test CompanyAnalysisResult has correct structure."""
mock_query = mocker.patch("SPARC.analyzer.SERP.query")
mock_save = mocker.patch("SPARC.analyzer.SERP.save_patents")
@@ -327,6 +512,9 @@ class TestBatchProcessing:
patent = Patent(patent_id="US123", pdf_link="http://example.com/test.pdf")
mock_query.return_value = Patents(patents=[patent])
# Simulate DB caching: after store, subsequent get returns the IDs
mock_db.get_cached_serp_query.side_effect = [None, ["US123"]]
def save_side_effect(p):
p.pdf_path = "patents/US123.pdf"
return p
+116
View File
@@ -0,0 +1,116 @@
"""Tests for security hardening: JWT secret startup check, CORS config, credential handling."""
import os
from unittest.mock import patch
import pytest
class TestJWTSecretStartupCheck:
"""Test the startup guard that refuses default JWT secret in non-dev environments."""
def test_default_secret_in_production_raises(self):
"""Starting with default secret and APP_ENV=production must raise RuntimeError."""
with patch.dict(os.environ, {"APP_ENV": "production"}):
# Reload config to pick up the new APP_ENV
import importlib
import SPARC.config
importlib.reload(SPARC.config)
from SPARC.auth import _DEFAULT_JWT_SECRET, check_jwt_secret
# Patch JWT_SECRET to the default
with patch("SPARC.auth.JWT_SECRET", _DEFAULT_JWT_SECRET):
with pytest.raises(RuntimeError, match="FATAL.*JWT_SECRET"):
check_jwt_secret()
# Restore config
with patch.dict(os.environ, {"APP_ENV": "development"}):
importlib.reload(SPARC.config)
def test_default_secret_in_development_succeeds(self):
"""Starting with default secret and APP_ENV=development must not raise."""
with patch.dict(os.environ, {"APP_ENV": "development"}):
import importlib
import SPARC.config
importlib.reload(SPARC.config)
from SPARC.auth import _DEFAULT_JWT_SECRET, check_jwt_secret
with patch("SPARC.auth.JWT_SECRET", _DEFAULT_JWT_SECRET):
# Should not raise
check_jwt_secret()
# Restore
importlib.reload(SPARC.config)
def test_custom_secret_in_production_succeeds(self):
"""Starting with a custom secret in production must not raise."""
with patch.dict(os.environ, {"APP_ENV": "production"}):
import importlib
import SPARC.config
importlib.reload(SPARC.config)
from SPARC.auth import check_jwt_secret
with patch("SPARC.auth.JWT_SECRET", "my-secure-random-secret-abc123"):
# Should not raise
check_jwt_secret()
with patch.dict(os.environ, {"APP_ENV": "development"}):
importlib.reload(SPARC.config)
def test_default_secret_unset_env_succeeds(self):
"""When APP_ENV is unset (defaults to development), default secret is allowed."""
with patch.dict(os.environ, {}, clear=False):
# Remove APP_ENV if present
env = os.environ.copy()
env.pop("APP_ENV", None)
with patch.dict(os.environ, env, clear=True):
import importlib
import SPARC.config
importlib.reload(SPARC.config)
from SPARC.auth import _DEFAULT_JWT_SECRET, check_jwt_secret
with patch("SPARC.auth.JWT_SECRET", _DEFAULT_JWT_SECRET):
# Should not raise (defaults to development)
check_jwt_secret()
with patch.dict(os.environ, {"APP_ENV": "development"}):
importlib.reload(SPARC.config)
class TestCORSConfig:
"""Test that CORS origins are configurable via environment variable."""
def test_default_cors_origins(self):
"""When CORS_ORIGINS is unset, defaults to localhost origins."""
with patch.dict(os.environ, {"CORS_ORIGINS": ""}):
import importlib
import SPARC.config
importlib.reload(SPARC.config)
assert SPARC.config.cors_origins == [
"http://localhost:3000",
"http://localhost:5173",
]
def test_custom_cors_origins(self):
"""Setting CORS_ORIGINS configures allowed origins."""
with patch.dict(os.environ, {"CORS_ORIGINS": "https://sparc.example.com,https://app.example.com"}):
import importlib
import SPARC.config
importlib.reload(SPARC.config)
assert SPARC.config.cors_origins == [
"https://sparc.example.com",
"https://app.example.com",
]
# Restore
with patch.dict(os.environ, {"CORS_ORIGINS": ""}):
importlib.reload(SPARC.config)
def test_single_cors_origin(self):
"""A single origin without comma works correctly."""
with patch.dict(os.environ, {"CORS_ORIGINS": "https://sparc.example.com"}):
import importlib
import SPARC.config
importlib.reload(SPARC.config)
assert SPARC.config.cors_origins == ["https://sparc.example.com"]
with patch.dict(os.environ, {"CORS_ORIGINS": ""}):
importlib.reload(SPARC.config)
+90
View File
@@ -1,7 +1,11 @@
"""Tests for SERP API patent retrieval and parsing functionality."""
import os
import pytest
from unittest.mock import patch, Mock
from datetime import datetime, timedelta
from SPARC.serp_api import SERP
from SPARC.types import Patent
class TestTextCleaning:
@@ -176,3 +180,89 @@ class TestPatentMinimization:
# Sections should be separated by double newlines
assert "\n\n" in result
class TestDynamicDateRange:
"""Test dynamic date range computation in SERP.query."""
def test_query_uses_rolling_date_window(self, mocker):
"""Verify the date filter uses a rolling window, not hardcoded dates."""
mock_search = mocker.patch("SPARC.serp_api.serpapi.search")
mock_search.return_value = {"organic_results": []}
mocker.patch("SPARC.serp_api.config.api_key", "fake-key")
mocker.patch("SPARC.serp_api.config.patent_search_days", 90)
SERP.query("TestCorp")
call_params = mock_search.call_args[0][0]
tbs = call_params["tbs"]
# Should contain "cdr:1,cd_min:" with a date, not the old hardcoded one
assert "cdr:1,cd_min:" in tbs
assert "10/28/2025" not in tbs # old hardcoded date gone
def test_query_respects_days_back_param(self, mocker):
"""Verify days_back parameter controls the date window."""
mock_search = mocker.patch("SPARC.serp_api.serpapi.search")
mock_search.return_value = {"organic_results": []}
mocker.patch("SPARC.serp_api.config.api_key", "fake-key")
mocker.patch("SPARC.serp_api.config.patent_search_days", 90)
now = datetime.now()
SERP.query("TestCorp", days_back=30)
call_params = mock_search.call_args[0][0]
tbs = call_params["tbs"]
expected_start = (now - timedelta(days=30)).strftime("%-m/%-d/%Y")
assert expected_start in tbs
class TestFilesystemPDFCaching:
"""Test that save_patents skips download for existing files."""
def test_save_patents_skips_download_when_cached(self, mocker, tmp_path):
"""Already-downloaded PDFs should not be re-downloaded."""
mock_get = mocker.patch("SPARC.serp_api.requests.get")
mocker.patch("SPARC.serp_api.os.makedirs")
pdf_path = tmp_path / "US123.pdf"
pdf_path.write_bytes(b"%PDF-1.4 fake content")
mocker.patch("SPARC.serp_api.os.path.exists", return_value=True)
mocker.patch("SPARC.serp_api.os.path.getsize", return_value=100)
patent = Patent(patent_id="US123", pdf_link="http://example.com/test.pdf")
result = SERP.save_patents(patent)
mock_get.assert_not_called()
assert result.pdf_path == "patents/US123.pdf"
def test_save_patents_downloads_when_not_cached(self, mocker):
"""Missing PDFs should be downloaded."""
mock_response = Mock()
mock_response.content = b"%PDF-1.4 content"
mock_get = mocker.patch("SPARC.serp_api.requests.get", return_value=mock_response)
mocker.patch("SPARC.serp_api.os.makedirs")
mocker.patch("SPARC.serp_api.os.path.exists", return_value=False)
mock_open = mocker.patch("builtins.open", mocker.mock_open())
patent = Patent(patent_id="US456", pdf_link="http://example.com/test.pdf")
result = SERP.save_patents(patent)
mock_get.assert_called_once_with("http://example.com/test.pdf")
assert result.pdf_path == "patents/US456.pdf"
def test_save_patents_redownloads_empty_files(self, mocker):
"""Empty/corrupt PDFs (0 bytes) should be re-downloaded."""
mock_response = Mock()
mock_response.content = b"%PDF-1.4 content"
mock_get = mocker.patch("SPARC.serp_api.requests.get", return_value=mock_response)
mocker.patch("SPARC.serp_api.os.makedirs")
mocker.patch("SPARC.serp_api.os.path.exists", return_value=True)
mocker.patch("SPARC.serp_api.os.path.getsize", return_value=0)
mock_open = mocker.patch("builtins.open", mocker.mock_open())
patent = Patent(patent_id="US789", pdf_link="http://example.com/test.pdf")
result = SERP.save_patents(patent)
mock_get.assert_called_once()
assert result.pdf_path == "patents/US789.pdf"