diff --git a/.env.example b/.env.example index 4e78c43..bdb08f3 100644 --- a/.env.example +++ b/.env.example @@ -35,8 +35,26 @@ JWT_SECRET=your-secure-jwt-secret-change-in-production # Defaults to http://localhost:3000,http://localhost:5173 when unset # CORS_ORIGINS=https://sparc.example.com,https://app.example.com +# ---- Storage ---- + +# Backend for patent PDF storage: "local" (default) or "s3" +STORAGE_BACKEND=local + +# S3/MinIO settings (only used when STORAGE_BACKEND=s3) +# S3_BUCKET=sparc-patents +# S3_ENDPOINT_URL=http://localhost:9000 +# AWS_ACCESS_KEY_ID=minioadmin +# AWS_SECRET_ACCESS_KEY=minioadmin +# To start MinIO locally: docker compose --profile s3 up -d minio + # ---- Cache ---- # When USE_CACHE=true: check database for cached responses before making API calls # When USE_CACHE=false: always make fresh API calls (still stores results in database) USE_CACHE=true + +# ---- Webhooks ---- + +# Comma-separated list of webhook URLs for job completion and alert notifications +# Supports generic HTTP POST and Slack/Discord incoming webhooks +# WEBHOOK_URLS=https://hooks.slack.com/services/XXX,https://example.com/webhook diff --git a/.gitea/workflows/test.yaml b/.gitea/workflows/test.yaml index 9f452fb..49db9b9 100644 --- a/.gitea/workflows/test.yaml +++ b/.gitea/workflows/test.yaml @@ -34,6 +34,17 @@ jobs: run: | ruff check SPARC/ tests/ + - name: Install Node.js and frontend dependencies + shell: sh + run: | + apk add --no-cache nodejs npm + cd frontend && npm ci + + - name: Run TypeScript type check + shell: sh + run: | + cd frontend && npx tsc --noEmit + - name: Run pytest shell: sh env: diff --git a/SPARC/analyzer.py b/SPARC/analyzer.py index 996558a..c55803b 100644 --- a/SPARC/analyzer.py +++ b/SPARC/analyzer.py @@ -108,12 +108,10 @@ class CompanyAnalyzer: def analyze_single_patent(self, patent_id: str, company_name: str) -> str: """Analyze a single patent by ID. 
- Prerequisite: - The patent PDF must already exist at ``patents/{patent_id}.pdf`` - before calling this method. PDFs are downloaded automatically when - using the batch analysis pipeline (``analyze_company`` or the - ``/analyze/batch`` API endpoint). For standalone usage, download - the PDF manually or call ``SERP.save_patents()`` first. + If the patent PDF is not already on disk, this method attempts to + download it automatically by looking up the PDF link in the database + cache. If the link is not cached either, a ``FileNotFoundError`` is + raised with instructions on how to obtain the PDF. Args: patent_id: Publication ID of the patent (e.g. "US-11234567-B2") @@ -123,7 +121,7 @@ class CompanyAnalyzer: Analysis of the specific patent's innovation quality Raises: - FileNotFoundError: If the patent PDF is not found at the expected path. + FileNotFoundError: If the patent PDF cannot be found or downloaded. """ import os logger.info("Analyzing patent %s for %s...", patent_id, company_name) @@ -131,10 +129,22 @@ class CompanyAnalyzer: patent_path = f"patents/{patent_id}.pdf" if not os.path.exists(patent_path): - raise FileNotFoundError( - f"Patent PDF not found at '{patent_path}'. " - f"Download the PDF first using SERP.save_patents() or the batch analysis pipeline." - ) + # Attempt to download the PDF automatically from cached metadata + cached = self.db.get_cached_patent(patent_id) + pdf_link = cached.get("pdf_link") if cached else None + + if pdf_link: + logger.info("PDF not on disk; downloading %s from cached link", patent_id) + patent = SERP.save_patents( + Patent(patent_id=patent_id, pdf_link=pdf_link) + ) + patent_path = patent.pdf_path + else: + raise FileNotFoundError( + f"Patent PDF not found at '{patent_path}' and no download link is " + f"cached for '{patent_id}'. Run a company analysis first to populate " + f"the cache, or call SERP.save_patents() with the patent's PDF link." 
+ ) try: sections = SERP.parse_patent_pdf(patent_path) diff --git a/SPARC/api.py b/SPARC/api.py index bc58fd0..932aa33 100644 --- a/SPARC/api.py +++ b/SPARC/api.py @@ -9,7 +9,7 @@ from typing import Annotated, List from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Query, Request from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel, EmailStr, Field from slowapi import Limiter from slowapi.errors import RateLimitExceeded @@ -77,6 +77,13 @@ class JobStatus(BaseModel): error: str | None = None +class PaginatedJobsResponse(BaseModel): + """Paginated response for job listings.""" + + items: list["JobStatus"] + next_cursor: str | None = None + + class HealthResponse(BaseModel): """Health check response.""" @@ -169,6 +176,9 @@ async def lifespan(app: FastAPI): import logging logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale) _db.close() + # Start scheduled analysis if tracked companies are configured + from SPARC.scheduler import start_scheduler + start_scheduler() yield # Cleanup _analyzer = None @@ -369,6 +379,60 @@ async def delete_user( return {"message": "User deleted"} +# ============== Tracked Companies Endpoints ============== + + +class TrackCompanyRequest(BaseModel): + """Request to add a company to tracking.""" + + company_name: str = Field(..., min_length=1, max_length=255) + + +@app.get("/admin/tracked", tags=["Admin"]) +async def list_tracked_companies( + _: UserResponse = Depends(get_current_admin), +): + """List all tracked companies (admin only).""" + db = get_db_client() + return db.list_tracked_companies() + + +@app.post("/admin/tracked", tags=["Admin"]) +async def add_tracked_company( + request: TrackCompanyRequest, + _: UserResponse = Depends(get_current_admin), +): + """Add a company to the tracked list (admin only).""" + db = get_db_client() + result = 
db.add_tracked_company(request.company_name) + if not result: + raise HTTPException(status_code=409, detail="Company already tracked") + return result + + +@app.delete("/admin/tracked/{company_name}", tags=["Admin"]) +async def remove_tracked_company( + company_name: str, + _: UserResponse = Depends(get_current_admin), +): + """Remove a company from the tracked list (admin only).""" + db = get_db_client() + removed = db.remove_tracked_company(company_name) + if not removed: + raise HTTPException(status_code=404, detail="Company not found in tracking list") + return {"message": f"Stopped tracking {company_name}"} + + +@app.get("/admin/alerts", tags=["Admin"]) +async def list_alerts( + limit: int = Query(default=50, ge=1, le=200), + _: UserResponse = Depends(get_current_admin), +): + """List recent alerts from scheduled analysis (admin only).""" + db = get_db_client() + return db.list_alerts(limit=limit) + + # ============== Analytics Endpoint ============== @@ -461,6 +525,61 @@ async def get_analytics_trends( } +# ============== Export Endpoints ============== + + +@app.get("/export/{company_name}", tags=["Export"]) +async def export_company_csv( + company_name: str, + _: UserResponse = Depends(get_current_user), +): + """Export analysis results for a company as a CSV file. + + Returns all stored analysis records for the given company, including + analysis type, model used, response text, and timestamp. 
+ + Args: + company_name: Company name to export results for + + Returns: + CSV file download + """ + import csv + import io + + db = get_db_client() + # Query all non-cached analysis results for this company + with db.get_conn() as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT company_name, analysis_type, model, response, timestamp + FROM llm_messages + WHERE LOWER(company_name) = LOWER(%s) AND is_cached = FALSE + ORDER BY timestamp DESC + """, + (company_name,), + ) + rows = cur.fetchall() + + if not rows: + raise HTTPException(status_code=404, detail=f"No analysis results found for '{company_name}'") + + output = io.StringIO() + writer = csv.writer(output) + writer.writerow(["company_name", "analysis_type", "model", "analysis", "timestamp"]) + for row in rows: + writer.writerow(row) + + output.seek(0) + safe_name = company_name.replace(" ", "_").lower() + return StreamingResponse( + iter([output.getvalue()]), + media_type="text/csv", + headers={"Content-Disposition": f'attachment; filename="sparc_{safe_name}_export.csv"'}, + ) + + # ============== System Endpoints ============== @@ -501,6 +620,38 @@ async def analyze_company( return _convert_result(result) +@app.get( + "/analyze/patent/{patent_id}", + tags=["Analysis"], +) +async def analyze_single_patent( + patent_id: str, + company_name: str = Query(description="Company name for analysis context"), + _: UserResponse = Depends(get_current_user), +): + """Analyze a single patent by its publication ID. + + If the patent PDF is not already cached locally, the system will attempt + to download it automatically from a previously cached link. If no link + is available, a 404 error is returned. + + Args: + patent_id: Patent publication ID (e.g. 
"US-11234567-B2") + company_name: Company name for analysis context + + Returns: + Analysis text for the patent + """ + if not _analyzer: + raise HTTPException(status_code=503, detail="Analyzer not initialized") + + try: + analysis = _analyzer.analyze_single_patent(patent_id, company_name) + return {"patent_id": patent_id, "company_name": company_name, "analysis": analysis} + except FileNotFoundError as e: + raise HTTPException(status_code=404, detail=str(e)) + + @app.post( "/analyze/batch", response_model=BatchAnalysisResponse, @@ -591,8 +742,25 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int): progress=100, result_json=_json.dumps(batch_response.model_dump(), default=str), ) + # Fire webhook notification + from SPARC.webhooks import notify_job_completed + notify_job_completed( + job_id=job_id, + status="completed", + total_companies=result.total_companies, + successful=result.successful, + failed=result.failed, + ) except Exception as e: db.update_job(job_id, status="failed", error=str(e)) + from SPARC.webhooks import notify_job_completed + notify_job_completed( + job_id=job_id, + status="failed", + total_companies=len(companies), + successful=0, + failed=len(companies), + ) @app.post("/analyze/batch/async", response_model=JobStatus, tags=["Analysis"]) @@ -649,24 +817,51 @@ async def get_job_status( return _job_row_to_status(job_row) -@app.get("/jobs", response_model=list[JobStatus], tags=["Jobs"]) +@app.get("/jobs", response_model=PaginatedJobsResponse, tags=["Jobs"]) async def list_jobs( status: Annotated[ str | None, Query(description="Filter by status: pending, running, completed, failed"), ] = None, limit: Annotated[int, Query(ge=1, le=100)] = 10, + cursor: Annotated[ + str | None, + Query(description="Opaque cursor from a previous response's next_cursor field"), + ] = None, _: UserResponse = Depends(get_current_user), ): - """List all analysis jobs. + """List analysis jobs with cursor-based pagination. 
+ + Pass ``limit`` to control page size. The response includes a ``next_cursor`` + field; pass it back as the ``cursor`` query parameter to fetch the next page. + When ``next_cursor`` is ``null``, there are no more results. + + Existing clients that use only ``limit`` (without ``cursor``) continue to + work without modification. Args: status: Optional filter by job status limit: Maximum number of jobs to return (default 10, max 100) + cursor: Opaque pagination cursor from a previous response Returns: - List of job statuses + Paginated list of job statuses """ db = _get_job_db() - job_rows = db.list_jobs(status=status, limit=limit) - return [_job_row_to_status(row) for row in job_rows] + # Fetch one extra to determine if there is a next page + job_rows = db.list_jobs(status=status, limit=limit + 1, cursor=cursor) + + has_next = len(job_rows) > limit + if has_next: + job_rows = job_rows[:limit] + + items = [_job_row_to_status(row) for row in job_rows] + + next_cursor = None + if has_next and job_rows: + last = job_rows[-1] + created = last["created_at"] + ts = created.isoformat() if hasattr(created, "isoformat") else str(created) + next_cursor = f"{ts}|{last['job_id']}" + + return PaginatedJobsResponse(items=items, next_cursor=next_cursor) diff --git a/SPARC/config.py b/SPARC/config.py index e6f6173..4d89742 100644 --- a/SPARC/config.py +++ b/SPARC/config.py @@ -53,6 +53,13 @@ root_path = os.getenv("ROOT_PATH", "") # Used for safety checks (e.g., refusing default JWT secret in production) app_env = os.getenv("APP_ENV", "development") +# Storage backend: "local" (default) or "s3" for S3/MinIO object storage +storage_backend = os.getenv("STORAGE_BACKEND", "local") +s3_bucket = os.getenv("S3_BUCKET", "sparc-patents") +s3_endpoint_url = os.getenv("S3_ENDPOINT_URL", "") +s3_access_key = os.getenv("AWS_ACCESS_KEY_ID", "") +s3_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY", "") + # CORS allowed origins (comma-separated) # Defaults to localhost dev origins when unset 
_cors_origins_raw = os.getenv("CORS_ORIGINS", "") diff --git a/SPARC/database.py b/SPARC/database.py index 4492311..24c7081 100644 --- a/SPARC/database.py +++ b/SPARC/database.py @@ -192,6 +192,35 @@ class DatabaseClient: ON jobs(status) """) + # Create tracked companies table for scheduled analysis + cursor.execute(""" + CREATE TABLE IF NOT EXISTS tracked_companies ( + id SERIAL PRIMARY KEY, + company_name VARCHAR(255) UNIQUE NOT NULL, + last_patent_count INTEGER DEFAULT 0, + last_analysis_at TIMESTAMP, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + + # Create alerts table for significant changes + cursor.execute(""" + CREATE TABLE IF NOT EXISTS alerts ( + id SERIAL PRIMARY KEY, + company_name VARCHAR(255) NOT NULL, + alert_type VARCHAR(50) NOT NULL, + message TEXT NOT NULL, + old_value NUMERIC, + new_value NUMERIC, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_alerts_company + ON alerts(company_name) + """) + self.conn.commit() @staticmethod @@ -568,20 +597,45 @@ class DatabaseClient: self, status: Optional[str] = None, limit: int = 10, + cursor: Optional[str] = None, ) -> List[Dict]: - """List jobs, optionally filtered by status.""" - query = "SELECT * FROM jobs" + """List jobs with optional status filter and cursor-based pagination. + + Args: + status: Optional status filter (pending, running, completed, failed). + limit: Maximum number of jobs to return. + cursor: Opaque cursor (``created_at|job_id``) from a previous + response. When provided, only jobs older than the cursor are + returned. + + Returns: + List of job dicts ordered by created_at descending. 
+ """ + conditions: list[str] = [] params: list = [] + if status: - query += " WHERE status = %s" + conditions.append("status = %s") params.append(status) - query += " ORDER BY created_at DESC LIMIT %s" + + if cursor: + try: + ts_str, cursor_job_id = cursor.rsplit("|", 1) + conditions.append("(created_at, job_id) < (%s, %s)") + params.extend([ts_str, cursor_job_id]) + except ValueError: + pass # Ignore malformed cursors; return from start + + query = "SELECT * FROM jobs" + if conditions: + query += " WHERE " + " AND ".join(conditions) + query += " ORDER BY created_at DESC, job_id DESC LIMIT %s" params.append(limit) with self.get_conn() as conn: - with conn.cursor(cursor_factory=RealDictCursor) as cursor: - cursor.execute(query, params) - return [dict(row) for row in cursor.fetchall()] + with conn.cursor(cursor_factory=RealDictCursor) as cur: + cur.execute(query, params) + return [dict(row) for row in cur.fetchall()] def mark_stale_jobs_failed(self) -> int: """Mark any jobs in 'running' or 'pending' state as 'failed'. 
@@ -803,3 +857,97 @@ class DatabaseClient:
             with conn.cursor() as cursor:
                 cursor.execute("SELECT COUNT(*) FROM users")
                 return cursor.fetchone()[0]
+
+    # Tracked Companies Methods
+
+    def add_tracked_company(self, company_name: str) -> Optional[Dict]:
+        """Add a company to the tracking list.
+
+        Returns:
+            The inserted row as a dict, or None when the insert violates an
+            integrity constraint (the company is already tracked).
+
+        Raises:
+            Exception: Any other database error, re-raised after rollback.
+        """
+        import psycopg2
+
+        with self.get_conn() as conn:
+            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                try:
+                    cursor.execute(
+                        "INSERT INTO tracked_companies (company_name) VALUES (%s) RETURNING *",
+                        (company_name,),
+                    )
+                    row = cursor.fetchone()
+                    conn.commit()
+                    return dict(row) if row else None
+                except psycopg2.IntegrityError:
+                    # UNIQUE(company_name) hit: report "already tracked".
+                    conn.rollback()
+                    return None
+                except Exception:
+                    # A real failure (connection, SQL, ...) must not be
+                    # reported to the caller as a duplicate.
+                    conn.rollback()
+                    raise
+
+    def remove_tracked_company(self, company_name: str) -> bool:
+        """Remove a company from the tracking list."""
+        with self.get_conn() as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(
+                    "DELETE FROM tracked_companies WHERE LOWER(company_name) = LOWER(%s)",
+                    (company_name,),
+                )
+                conn.commit()
+                return cursor.rowcount > 0
+
+    def list_tracked_companies(self) -> List[Dict]:
+        """List all tracked companies."""
+        with self.get_conn() as conn:
+            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                cursor.execute("SELECT * FROM tracked_companies ORDER BY company_name")
+                return [dict(row) for row in cursor.fetchall()]
+
+    def update_tracked_company(
+        self, company_name: str, patent_count: int
+    ) -> None:
+        """Update the last analysis stats for a tracked company."""
+        with self.get_conn() as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(
+                    """UPDATE tracked_companies
+                       SET last_patent_count = %s, last_analysis_at = CURRENT_TIMESTAMP
+                       WHERE LOWER(company_name) = LOWER(%s)""",
+                    (patent_count, company_name),
+                )
+                conn.commit()
+
+    def store_alert(
+        self,
+        company_name: str,
+        alert_type: str,
+        message: str,
+        old_value: float | None = None,
+        new_value: float | None = None,
+    ) -> None:
+        """Record an alert for a significant change."""
+        with self.get_conn() as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(
+                    """INSERT INTO alerts (company_name, alert_type, message, old_value, new_value)
+                       VALUES (%s, %s, %s, %s, %s)""",
+                    (company_name, alert_type, message, old_value, new_value),
+                )
+                conn.commit()
+
+    def list_alerts(self, limit: int = 50) -> List[Dict]:
+        """List recent alerts."""
+        with self.get_conn() as conn:
+            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                cursor.execute(
+                    "SELECT * FROM alerts ORDER BY created_at DESC LIMIT %s",
+                    (limit,),
+                )
+                return [dict(row) for row in cursor.fetchall()]
diff --git a/SPARC/scheduler.py b/SPARC/scheduler.py
new file mode 100644
index 0000000..5af3940
--- /dev/null
+++ b/SPARC/scheduler.py
@@ -0,0 +1,129 @@
+"""Scheduled patent analysis for tracked companies.
+
+Uses APScheduler to periodically re-analyze tracked companies and
+detect significant changes in patent counts.
+"""
+
+import logging
+import os
+
+from SPARC import config
+from SPARC.analyzer import CompanyAnalyzer
+from SPARC.database import DatabaseClient
+
+logger = logging.getLogger(__name__)
+
+# Configurable via environment variable (in hours, default 24)
+SCHEDULE_INTERVAL_HOURS = int(os.getenv("SCHEDULE_INTERVAL_HOURS", "24"))
+
+# Patent count change threshold (percentage) to trigger an alert
+CHANGE_THRESHOLD_PERCENT = int(os.getenv("CHANGE_THRESHOLD_PERCENT", "20"))
+
+
+def run_scheduled_analysis() -> None:
+    """Re-analyze all tracked companies and check for significant changes.
+
+    Each successful analysis refreshes the stored patent count. When the
+    count moved by at least ``CHANGE_THRESHOLD_PERCENT`` percent since the
+    previous run, an alert is stored and sent to the configured webhooks.
+    The database connection is closed on every exit path.
+    """
+    # Local import, as SPARC.api does for the webhook helpers.
+    from SPARC.webhooks import notify_alert
+
+    db = DatabaseClient(config.database_url)
+    db.connect()
+    try:
+        db.initialize_schema()
+
+        tracked = db.list_tracked_companies()
+        if not tracked:
+            logger.info("No tracked companies configured; skipping scheduled analysis")
+            return
+
+        logger.info("Running scheduled analysis for %d tracked companies", len(tracked))
+
+        analyzer = CompanyAnalyzer(db_client=db)
+
+        for company_row in tracked:
+            name = company_row["company_name"]
+            old_count = company_row.get("last_patent_count", 0) or 0
+
+            try:
+                result = analyzer._analyze_company_safe(name)
+
+                if result.success:
+                    new_count = result.patent_count
+
+                    # Update tracking record
+                    db.update_tracked_company(name, new_count)
+
+                    # Check for significant change
+                    if old_count > 0:
+                        delta_pct = abs(new_count - old_count) / old_count * 100
+                        if delta_pct >= CHANGE_THRESHOLD_PERCENT:
+                            direction = "increased" if new_count > old_count else "decreased"
+                            message = (
+                                f"Patent count for {name} {direction} by {delta_pct:.0f}% "
+                                f"({old_count} -> {new_count})"
+                            )
+                            logger.warning("ALERT: %s", message)
+                            db.store_alert(
+                                company_name=name,
+                                alert_type="patent_count_change",
+                                message=message,
+                                old_value=old_count,
+                                new_value=new_count,
+                            )
+                            notify_alert(
+                                company_name=name,
+                                alert_type="patent_count_change",
+                                message=message,
+                            )
+                    elif new_count > 0:
+                        # First analysis -- record baseline
+                        logger.info("Baseline for %s: %d patents", name, new_count)
+                else:
+                    logger.warning("Scheduled analysis failed for %s: %s", name, result.error)
+
+            except Exception as e:
+                # Keep going with the remaining companies, but record the
+                # traceback so the failure can be diagnosed.
+                logger.exception("Error analyzing tracked company %s: %s", name, e)
+    finally:
+        # Also reached on the early return above and on errors outside the
+        # per-company try, both of which used to leak the connection.
+        db.close()
+
+    logger.info("Scheduled analysis complete")
+
+
+def start_scheduler() -> None:
+    """Start the APScheduler background scheduler.
+
+    Safe to call at application startup. If apscheduler is not installed,
+    the function logs a warning and returns without starting anything.
+    """
+    try:
+        from apscheduler.schedulers.background import BackgroundScheduler
+    except ImportError:
+        logger.warning(
+            "apscheduler not installed; scheduled analysis disabled. "
+            "Install with: pip install apscheduler"
+        )
+        return
+
+    scheduler = BackgroundScheduler()
+    scheduler.add_job(
+        run_scheduled_analysis,
+        "interval",
+        hours=SCHEDULE_INTERVAL_HOURS,
+        id="scheduled_patent_analysis",
+        replace_existing=True,
+    )
+    scheduler.start()
+    logger.info(
+        "Scheduled patent analysis started (every %d hours, threshold %d%%)",
+        SCHEDULE_INTERVAL_HOURS,
+        CHANGE_THRESHOLD_PERCENT,
+    )
diff --git a/SPARC/serp_api.py b/SPARC/serp_api.py
index cb6a8af..2c89a2a 100644
--- a/SPARC/serp_api.py
+++ b/SPARC/serp_api.py
@@ -1,4 +1,5 @@
-import os
+import io
+import logging
 import re
 from datetime import datetime, timedelta
 from typing import Dict
@@ -8,8 +9,21 @@ import requests
 import serpapi
 
 from SPARC import config
+from SPARC.storage import StorageBackend, get_storage_backend
 from SPARC.types import Patent, Patents
 
+logger = logging.getLogger(__name__)
+
+# Module-level storage instance (lazy-initialized)
+_storage: StorageBackend | None = None
+
+
+def _get_storage() -> StorageBackend:
+    global _storage
+    if _storage is None:
+        _storage = get_storage_backend()
+    return _storage
+
 
 class SERP:
     def query(company: str, days_back: int = None) -> Patents:
@@ -44,6 +58,7 @@ class SERP:
             "tbs": date_filter,
             "api_key": config.api_key,
         }
+        logger.info("Querying Google Patents for '%s' (last %d days)", company, days_back)
         search = serpapi.search(params)
         # Convert results to Patent objects, skipping any without PDF links
         patent_ids = []
@@ -52,13 +67,16 @@ class SERP:
             pdf_link = patent.get("pdf")
             if pdf_link:
                 patent_ids.append(Patent(patent_id=patent["publication_number"], pdf_link=pdf_link, summary=None))
-            # Patents without PDF links are skipped (see docstring for details)
+            else:
+                logger.debug("Skipping patent %s (no PDF link)", patent.get("publication_number", "unknown"))
 
+        logger.info("Found %d patents with PDF links for '%s'", len(patent_ids), company)
         return Patents(patents=patent_ids)
 
     def save_patents(patent: Patent) -> Patent:
-        """
-        Save the patent PDF to the patents folder, skipping download if already cached.
+        """Save the patent PDF to storage, skipping download if already cached.
+
+        Uses the configured storage backend (local filesystem or S3).
 
         Args:
             patent: Patent object
@@ -66,35 +84,57 @@ class SERP:
         Returns:
             Patent object with updated PDF path
+
+        Raises:
+            requests.RequestException: On network failure, timeout or HTTP error.
         """
-        pdf_path = f"patents/{patent.patent_id}.pdf"
-        os.makedirs("patents", exist_ok=True)
+        storage = _get_storage()
+        key = f"{patent.patent_id}.pdf"
 
-        if not (os.path.exists(pdf_path) and os.path.getsize(pdf_path) > 0):
+        if not storage.exists(key):
+            logger.info("Downloading PDF for %s", patent.patent_id)
-            response = requests.get(patent.pdf_link)
+            # Bound the request and fail on HTTP errors so an error page is
+            # never stored (and later cached) as if it were the patent PDF.
+            response = requests.get(patent.pdf_link, timeout=30)
+            response.raise_for_status()
-            with open(pdf_path, "wb") as f:
-                f.write(response.content)
+            storage.write(key, response.content)
+            logger.debug("Saved %d bytes for %s", len(response.content), patent.patent_id)
+        else:
+            logger.debug("Using cached PDF for %s", patent.patent_id)
 
-        patent.pdf_path = pdf_path
+        patent.pdf_path = storage.path_for(key)
         return patent
 
     def parse_patent_pdf(pdf_path: str) -> Dict:
         """Extract structured sections from patent PDF.
 
         Extracts all major sections from a patent PDF including abstract,
-        claims, summary, and detailed description.
+        claims, summary, and detailed description. Supports both local file
+        paths and S3 URIs (s3://bucket/key).
Args: - pdf_path: Path to the patent PDF file + pdf_path: Local path or S3 URI to the patent PDF file Returns: Dictionary containing all extracted sections """ + logger.debug("Parsing patent PDF: %s", pdf_path) - with pdfplumber.open(pdf_path) as pdf: + if pdf_path.startswith("s3://"): + # Read from S3 via storage backend + storage = _get_storage() + # Extract key from "s3://bucket/key" + key = pdf_path.split("/", 3)[-1] + data = storage.read(key) + pdf_file: io.BytesIO | str = io.BytesIO(data) + else: + pdf_file = pdf_path + + with pdfplumber.open(pdf_file) as pdf: # Extract all text full_text = "" for page in pdf.pages: full_text += page.extract_text() + "\n" + logger.debug("Extracted text from %d pages (%d chars)", len(pdf.pages), len(full_text)) # Define section patterns (common in patents) sections = { diff --git a/SPARC/storage.py b/SPARC/storage.py new file mode 100644 index 0000000..5159dd6 --- /dev/null +++ b/SPARC/storage.py @@ -0,0 +1,171 @@ +"""Patent PDF storage abstraction. + +Provides a unified interface for reading and writing patent PDF files, +with pluggable backends for local filesystem and S3-compatible object +storage (e.g., MinIO, AWS S3). +""" + +import logging +import os +from abc import ABC, abstractmethod + +from SPARC import config + +logger = logging.getLogger(__name__) + + +class StorageBackend(ABC): + """Abstract base class for patent PDF storage.""" + + @abstractmethod + def read(self, key: str) -> bytes: + """Read a file by key. + + Args: + key: Storage key (e.g., "US-12345678-B2.pdf") + + Returns: + File contents as bytes. + + Raises: + FileNotFoundError: If the file does not exist. + """ + + @abstractmethod + def write(self, key: str, data: bytes) -> None: + """Write data to storage. + + Args: + key: Storage key (e.g., "US-12345678-B2.pdf") + data: File contents as bytes. + """ + + @abstractmethod + def exists(self, key: str) -> bool: + """Check if a file exists in storage. + + Args: + key: Storage key. 
+ + Returns: + True if the file exists and has non-zero size. + """ + + @abstractmethod + def path_for(self, key: str) -> str: + """Return a path or URI suitable for downstream consumers. + + For local storage this is a filesystem path; for S3 it is the + object key (callers that need a local file should use read() + and write to a temporary location). + """ + + +class LocalStorageBackend(StorageBackend): + """Store patent PDFs on the local filesystem under a directory.""" + + def __init__(self, base_dir: str = "patents"): + self.base_dir = base_dir + os.makedirs(self.base_dir, exist_ok=True) + + def _full_path(self, key: str) -> str: + return os.path.join(self.base_dir, key) + + def read(self, key: str) -> bytes: + path = self._full_path(key) + if not os.path.exists(path): + raise FileNotFoundError(f"File not found: {path}") + with open(path, "rb") as f: + return f.read() + + def write(self, key: str, data: bytes) -> None: + path = self._full_path(key) + os.makedirs(os.path.dirname(path) or self.base_dir, exist_ok=True) + with open(path, "wb") as f: + f.write(data) + logger.debug("Wrote %d bytes to %s", len(data), path) + + def exists(self, key: str) -> bool: + path = self._full_path(key) + return os.path.exists(path) and os.path.getsize(path) > 0 + + def path_for(self, key: str) -> str: + return self._full_path(key) + + +class S3StorageBackend(StorageBackend): + """Store patent PDFs in an S3-compatible bucket.""" + + def __init__( + self, + bucket: str, + endpoint_url: str = "", + access_key: str = "", + secret_key: str = "", + ): + import boto3 + + kwargs: dict = {} + if endpoint_url: + kwargs["endpoint_url"] = endpoint_url + if access_key and secret_key: + kwargs["aws_access_key_id"] = access_key + kwargs["aws_secret_access_key"] = secret_key + + self.s3 = boto3.client("s3", **kwargs) + self.bucket = bucket + + # Ensure bucket exists (useful for MinIO local dev) + try: + self.s3.head_bucket(Bucket=self.bucket) + except Exception: + try: + 
self.s3.create_bucket(Bucket=self.bucket)
+                logger.info("Created S3 bucket: %s", self.bucket)
+            except Exception as e:
+                logger.warning("Could not create bucket %s: %s", self.bucket, e)
+
+    def read(self, key: str) -> bytes:
+        try:
+            response = self.s3.get_object(Bucket=self.bucket, Key=key)
+            return response["Body"].read()
+        except self.s3.exceptions.NoSuchKey:
+            raise FileNotFoundError(f"S3 object not found: s3://{self.bucket}/{key}")
+        except Exception as e:
+            if "NoSuchKey" in str(e) or "404" in str(e):
+                raise FileNotFoundError(f"S3 object not found: s3://{self.bucket}/{key}")
+            raise
+
+    def write(self, key: str, data: bytes) -> None:
+        self.s3.put_object(
+            Bucket=self.bucket,
+            Key=key,
+            Body=data,
+            ContentType="application/pdf",
+        )
+        logger.debug("Wrote %d bytes to s3://%s/%s", len(data), self.bucket, key)
+
+    def exists(self, key: str) -> bool:
+        try:
+            response = self.s3.head_object(Bucket=self.bucket, Key=key)
+            return response["ContentLength"] > 0
+        except Exception:
+            return False
+
+    def path_for(self, key: str) -> str:
+        return f"s3://{self.bucket}/{key}"
+
+
+def get_storage_backend() -> StorageBackend:
+    """Factory: return the configured storage backend instance."""
+    backend = config.storage_backend.lower()
+    if backend == "s3":
+        logger.info("Using S3 storage backend (bucket=%s)", config.s3_bucket)
+        return S3StorageBackend(
+            bucket=config.s3_bucket,
+            endpoint_url=config.s3_endpoint_url,
+            access_key=config.s3_access_key,
+            secret_key=config.s3_secret_key,
+        )
+    logger.info("Using local storage backend")
+    return LocalStorageBackend()
diff --git a/SPARC/webhooks.py b/SPARC/webhooks.py
new file mode 100644
index 0000000..08760fe
--- /dev/null
+++ b/SPARC/webhooks.py
@@ -0,0 +1,169 @@
+"""Webhook notifications for job completion and alert events.
+
+Sends JSON payloads to configured webhook URLs with retry logic.
+Supports generic HTTP POST and Slack-compatible text payloads.
+"""
+
+import logging
+import os
+import time
+from datetime import datetime, timezone
+from typing import Any
+from urllib.parse import urlsplit
+
+import requests
+
+logger = logging.getLogger(__name__)
+
+# Comma-separated list of webhook URLs (env var based config)
+_WEBHOOK_URLS_RAW = os.getenv("WEBHOOK_URLS", "")
+WEBHOOK_URLS: list[str] = [
+    url.strip() for url in _WEBHOOK_URLS_RAW.split(",") if url.strip()
+]
+
+MAX_RETRIES = 3
+BACKOFF_BASE = 2 # seconds
+
+
+def _is_slack_url(url: str) -> bool:
+    """Check if a URL looks like a Slack or Discord incoming webhook."""
+    return "hooks.slack.com" in url or "discord.com/api/webhooks" in url
+
+
+def _is_discord_url(url: str) -> bool:
+    """Check if a URL is a native Discord webhook.
+
+    Discord also exposes a Slack-compatible endpoint (path ending in
+    ``/slack``) that accepts Slack payloads; it is not matched here.
+    """
+    if "discord.com/api/webhooks" not in url:
+        return False
+    return not urlsplit(url).path.rstrip("/").endswith("/slack")
+
+
+def _build_payload(
+    event_type: str,
+    data: dict[str, Any],
+    slack: bool = False,
+    text_key: str = "text",
+) -> dict:
+    """Build the webhook payload.
+
+    Args:
+        event_type: Type of event (e.g., "job_completed", "alert")
+        data: Event-specific data
+        slack: If True, wrap in Slack-compatible ``text`` format
+        text_key: Key that carries the summary when ``slack`` is True. Slack
+            expects ``text``; native Discord webhooks expect ``content``.
+
+    Returns:
+        JSON-serializable payload dict
+    """
+    payload = {
+        "event": event_type,
+        # utcnow() is deprecated since Python 3.12; an aware UTC datetime
+        # rendered this way yields the same "...Z" timestamp.
+        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
+        **data,
+    }
+
+    if slack:
+        # Build a human-readable summary for Slack/Discord
+        lines = [f"*[SPARC] {event_type}*"]
+        for key, value in data.items():
+            lines.append(f" {key}: {value}")
+        return {text_key: "\n".join(lines)}
+
+    return payload
+
+
+def _send_with_retry(url: str, payload: dict) -> bool:
+    """Send a POST request with exponential backoff retry.
+
+    Network errors and 5xx, 408 and 429 responses are retried. Any other
+    4xx response is permanent, so the loop stops without further attempts.
+
+    Args:
+        url: Webhook URL
+        payload: JSON payload to send
+
+    Returns:
+        True if delivered successfully, False after all retries exhausted
+    """
+    attempt = 0
+    for attempt in range(1, MAX_RETRIES + 1):
+        try:
+            response = requests.post(url, json=payload, timeout=10)
+            if response.status_code < 300:
+                logger.debug("Webhook delivered to %s (attempt %d)", url, attempt)
+                return True
+            logger.warning(
+                "Webhook %s returned %d (attempt %d/%d)",
+                url, response.status_code, attempt, MAX_RETRIES,
+            )
+            if 400 <= response.status_code < 500 and response.status_code not in (408, 429):
+                # The request itself was rejected; retrying cannot succeed.
+                break
+        except requests.RequestException as e:
+            logger.warning(
+                "Webhook delivery failed for %s (attempt %d/%d): %s",
+                url, attempt, MAX_RETRIES, e,
+            )
+
+        if attempt < MAX_RETRIES:
+            wait = BACKOFF_BASE ** attempt
+            time.sleep(wait)
+
+    logger.error("Webhook permanently failed for %s after %d attempts", url, attempt)
+    return False
+
+
+def notify(event_type: str, data: dict[str, Any]) -> None:
+    """Fire all configured webhooks for an event.
+
+    Safe to call even when no webhooks are configured (returns immediately).
+
+    Args:
+        event_type: Event identifier (e.g., "job_completed", "patent_alert")
+        data: Event data to include in the payload
+    """
+    if not WEBHOOK_URLS:
+        return
+
+    for url in WEBHOOK_URLS:
+        slack = _is_slack_url(url)
+        # Native Discord webhooks reject Slack's "text" key and need "content".
+        text_key = "content" if _is_discord_url(url) else "text"
+        payload = _build_payload(event_type, data, slack=slack, text_key=text_key)
+        _send_with_retry(url, payload)
+
+
+def notify_job_completed(
+    job_id: str,
+    status: str,
+    total_companies: int,
+    successful: int,
+    failed: int,
+) -> None:
+    """Send notification when a batch job completes."""
+    notify("job_completed", {
+        "job_id": job_id,
+        "status": status,
+        "total_companies": total_companies,
+        "successful": successful,
+        "failed": failed,
+        "summary": f"Batch job {job_id}: {successful}/{total_companies} succeeded",
+    })
+
+
+def notify_alert(
+    company_name: str,
+    alert_type: str,
+    message: str,
+) -> None:
+    """Send notification for a tracked company alert."""
+    notify("patent_alert", {
+        "company_name": company_name,
+        "alert_type": alert_type,
+        "message": message,
+    })
diff --git a/docker-compose.yml b/docker-compose.yml
index fa42f8c..95cc313 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -52,6 +52,30 @@ services:
       - ./patents:/app/patents
     restart: unless-stopped
 
+  # Optional: MinIO for S3-compatible local object storage.
+  # Start with `docker compose --profile s3 up -d minio` and set
+  # STORAGE_BACKEND=s3 in .env; a plain `up` does not start this service.
+  minio:
+    image: minio/minio:latest
+    container_name: sparc-minio
+    command: server /data --console-address ":9001"
+    environment:
+      MINIO_ROOT_USER: ${AWS_ACCESS_KEY_ID:-minioadmin}
+      MINIO_ROOT_PASSWORD: ${AWS_SECRET_ACCESS_KEY:-minioadmin}
+    ports:
+      - "9000:9000"
+      - "9001:9001"
+    volumes:
+      - minio_data:/data
+    healthcheck:
+      test: ["CMD", "mc", "ready", "local"]
+      interval: 10s
+      timeout: 5s
+      retries: 3
+    restart: unless-stopped
+    profiles:
+      - s3
+
   dashboard:
     build: ./frontend
     container_name: sparc-dashboard
@@ -63,3 +87,4 @@ services:
 
 volumes:
   postgres_data:
+  minio_data:
diff --git a/frontend/index.html b/frontend/index.html
index 631e457..0ff0633 100644
--- a/frontend/index.html
+++ b/frontend/index.html
@@ -7,6 +7,15 @@
Loading analytics data...
+
- Set USE_DATABASE=true in your .env file to enable analytics tracking.
+ Could not connect to the analytics database. Ensure PostgreSQL is running and
+ DATABASE_URL is configured correctly.
+ {mutation.error instanceof Error ? mutation.error.message : 'An unexpected error occurred.'} + {' '}Check your connection and try again. +
++ Compare patent portfolios of two companies side by side. +
+