forked from 0xWheatyz/SPARC
merge: resolve multi-model conflicts with trends and export endpoints
Keeps model selection, analytics trends, and CSV export endpoints.
This commit is contained in:
+21
-11
@@ -108,12 +108,10 @@ class CompanyAnalyzer:
|
||||
def analyze_single_patent(self, patent_id: str, company_name: str) -> str:
|
||||
"""Analyze a single patent by ID.
|
||||
|
||||
Prerequisite:
|
||||
The patent PDF must already exist at ``patents/{patent_id}.pdf``
|
||||
before calling this method. PDFs are downloaded automatically when
|
||||
using the batch analysis pipeline (``analyze_company`` or the
|
||||
``/analyze/batch`` API endpoint). For standalone usage, download
|
||||
the PDF manually or call ``SERP.save_patents()`` first.
|
||||
If the patent PDF is not already on disk, this method attempts to
|
||||
download it automatically by looking up the PDF link in the database
|
||||
cache. If the link is not cached either, a ``FileNotFoundError`` is
|
||||
raised with instructions on how to obtain the PDF.
|
||||
|
||||
Args:
|
||||
patent_id: Publication ID of the patent (e.g. "US-11234567-B2")
|
||||
@@ -123,7 +121,7 @@ class CompanyAnalyzer:
|
||||
Analysis of the specific patent's innovation quality
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the patent PDF is not found at the expected path.
|
||||
FileNotFoundError: If the patent PDF cannot be found or downloaded.
|
||||
"""
|
||||
import os
|
||||
logger.info("Analyzing patent %s for %s...", patent_id, company_name)
|
||||
@@ -131,10 +129,22 @@ class CompanyAnalyzer:
|
||||
patent_path = f"patents/{patent_id}.pdf"
|
||||
|
||||
if not os.path.exists(patent_path):
|
||||
raise FileNotFoundError(
|
||||
f"Patent PDF not found at '{patent_path}'. "
|
||||
f"Download the PDF first using SERP.save_patents() or the batch analysis pipeline."
|
||||
)
|
||||
# Attempt to download the PDF automatically from cached metadata
|
||||
cached = self.db.get_cached_patent(patent_id)
|
||||
pdf_link = cached.get("pdf_link") if cached else None
|
||||
|
||||
if pdf_link:
|
||||
logger.info("PDF not on disk; downloading %s from cached link", patent_id)
|
||||
patent = SERP.save_patents(
|
||||
Patent(patent_id=patent_id, pdf_link=pdf_link)
|
||||
)
|
||||
patent_path = patent.pdf_path
|
||||
else:
|
||||
raise FileNotFoundError(
|
||||
f"Patent PDF not found at '{patent_path}' and no download link is "
|
||||
f"cached for '{patent_id}'. Run a company analysis first to populate "
|
||||
f"the cache, or call SERP.save_patents() with the patent's PDF link."
|
||||
)
|
||||
|
||||
try:
|
||||
sections = SERP.parse_patent_pdf(patent_path)
|
||||
|
||||
+273
-6
@@ -9,7 +9,7 @@ from typing import Annotated, List
|
||||
|
||||
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Query, Request
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
from pydantic import BaseModel, EmailStr, Field
|
||||
from slowapi import Limiter
|
||||
from slowapi.errors import RateLimitExceeded
|
||||
@@ -91,6 +91,13 @@ class JobStatus(BaseModel):
|
||||
error: str | None = None
|
||||
|
||||
|
||||
class PaginatedJobsResponse(BaseModel):
|
||||
"""Paginated response for job listings."""
|
||||
|
||||
items: list["JobStatus"]
|
||||
next_cursor: str | None = None
|
||||
|
||||
|
||||
class HealthResponse(BaseModel):
|
||||
"""Health check response."""
|
||||
|
||||
@@ -184,6 +191,9 @@ async def lifespan(app: FastAPI):
|
||||
import logging
|
||||
logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale)
|
||||
_db.close()
|
||||
# Start scheduled analysis if tracked companies are configured
|
||||
from SPARC.scheduler import start_scheduler
|
||||
start_scheduler()
|
||||
yield
|
||||
# Cleanup
|
||||
_analyzer = None
|
||||
@@ -384,6 +394,60 @@ async def delete_user(
|
||||
return {"message": "User deleted"}
|
||||
|
||||
|
||||
# ============== Tracked Companies Endpoints ==============
|
||||
|
||||
|
||||
class TrackCompanyRequest(BaseModel):
|
||||
"""Request to add a company to tracking."""
|
||||
|
||||
company_name: str = Field(..., min_length=1, max_length=255)
|
||||
|
||||
|
||||
@app.get("/admin/tracked", tags=["Admin"])
|
||||
async def list_tracked_companies(
|
||||
_: UserResponse = Depends(get_current_admin),
|
||||
):
|
||||
"""List all tracked companies (admin only)."""
|
||||
db = get_db_client()
|
||||
return db.list_tracked_companies()
|
||||
|
||||
|
||||
@app.post("/admin/tracked", tags=["Admin"])
|
||||
async def add_tracked_company(
|
||||
request: TrackCompanyRequest,
|
||||
_: UserResponse = Depends(get_current_admin),
|
||||
):
|
||||
"""Add a company to the tracked list (admin only)."""
|
||||
db = get_db_client()
|
||||
result = db.add_tracked_company(request.company_name)
|
||||
if not result:
|
||||
raise HTTPException(status_code=409, detail="Company already tracked")
|
||||
return result
|
||||
|
||||
|
||||
@app.delete("/admin/tracked/{company_name}", tags=["Admin"])
|
||||
async def remove_tracked_company(
|
||||
company_name: str,
|
||||
_: UserResponse = Depends(get_current_admin),
|
||||
):
|
||||
"""Remove a company from the tracked list (admin only)."""
|
||||
db = get_db_client()
|
||||
removed = db.remove_tracked_company(company_name)
|
||||
if not removed:
|
||||
raise HTTPException(status_code=404, detail="Company not found in tracking list")
|
||||
return {"message": f"Stopped tracking {company_name}"}
|
||||
|
||||
|
||||
@app.get("/admin/alerts", tags=["Admin"])
|
||||
async def list_alerts(
|
||||
limit: int = Query(default=50, ge=1, le=200),
|
||||
_: UserResponse = Depends(get_current_admin),
|
||||
):
|
||||
"""List recent alerts from scheduled analysis (admin only)."""
|
||||
db = get_db_client()
|
||||
return db.list_alerts(limit=limit)
|
||||
|
||||
|
||||
# ============== Analytics Endpoint ==============
|
||||
|
||||
|
||||
@@ -430,6 +494,133 @@ async def list_models():
|
||||
}
|
||||
|
||||
|
||||
@app.get("/analytics/trends", tags=["Analytics"])
|
||||
async def get_analytics_trends(
|
||||
days: int = Query(default=90, ge=7, le=365),
|
||||
_: UserResponse = Depends(get_current_user),
|
||||
):
|
||||
"""Get trend data for patent analysis over time.
|
||||
|
||||
Returns two datasets:
|
||||
- ``by_month``: analysis count per company per month
|
||||
- ``by_type_over_time``: analysis type distribution per month
|
||||
|
||||
Args:
|
||||
days: Number of days to look back (default 90)
|
||||
|
||||
Returns:
|
||||
Trend data suitable for time-series and distribution charts
|
||||
"""
|
||||
db = get_db_client()
|
||||
|
||||
with db.get_conn() as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Analyses per company per month
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
TO_CHAR(timestamp, 'YYYY-MM') AS month,
|
||||
company_name,
|
||||
COUNT(*) AS count
|
||||
FROM llm_messages
|
||||
WHERE timestamp >= NOW() - INTERVAL '%s days'
|
||||
AND is_cached = FALSE
|
||||
AND company_name IS NOT NULL
|
||||
GROUP BY month, company_name
|
||||
ORDER BY month
|
||||
""",
|
||||
(days,),
|
||||
)
|
||||
by_month_rows = cur.fetchall()
|
||||
|
||||
# Analysis type distribution per month
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
TO_CHAR(timestamp, 'YYYY-MM') AS month,
|
||||
analysis_type,
|
||||
COUNT(*) AS count
|
||||
FROM llm_messages
|
||||
WHERE timestamp >= NOW() - INTERVAL '%s days'
|
||||
AND is_cached = FALSE
|
||||
GROUP BY month, analysis_type
|
||||
ORDER BY month
|
||||
""",
|
||||
(days,),
|
||||
)
|
||||
by_type_rows = cur.fetchall()
|
||||
|
||||
by_month = [
|
||||
{"month": row[0], "company_name": row[1], "count": row[2]}
|
||||
for row in by_month_rows
|
||||
]
|
||||
by_type_over_time = [
|
||||
{"month": row[0], "analysis_type": row[1], "count": row[2]}
|
||||
for row in by_type_rows
|
||||
]
|
||||
|
||||
return {
|
||||
"by_month": by_month,
|
||||
"by_type_over_time": by_type_over_time,
|
||||
"period_days": days,
|
||||
}
|
||||
|
||||
|
||||
# ============== Export Endpoints ==============
|
||||
|
||||
|
||||
@app.get("/export/{company_name}", tags=["Export"])
|
||||
async def export_company_csv(
|
||||
company_name: str,
|
||||
_: UserResponse = Depends(get_current_user),
|
||||
):
|
||||
"""Export analysis results for a company as a CSV file.
|
||||
|
||||
Returns all stored analysis records for the given company, including
|
||||
analysis type, model used, response text, and timestamp.
|
||||
|
||||
Args:
|
||||
company_name: Company name to export results for
|
||||
|
||||
Returns:
|
||||
CSV file download
|
||||
"""
|
||||
import csv
|
||||
import io
|
||||
|
||||
db = get_db_client()
|
||||
# Query all non-cached analysis results for this company
|
||||
with db.get_conn() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT company_name, analysis_type, model, response, timestamp
|
||||
FROM llm_messages
|
||||
WHERE LOWER(company_name) = LOWER(%s) AND is_cached = FALSE
|
||||
ORDER BY timestamp DESC
|
||||
""",
|
||||
(company_name,),
|
||||
)
|
||||
rows = cur.fetchall()
|
||||
|
||||
if not rows:
|
||||
raise HTTPException(status_code=404, detail=f"No analysis results found for '{company_name}'")
|
||||
|
||||
output = io.StringIO()
|
||||
writer = csv.writer(output)
|
||||
writer.writerow(["company_name", "analysis_type", "model", "analysis", "timestamp"])
|
||||
for row in rows:
|
||||
writer.writerow(row)
|
||||
|
||||
output.seek(0)
|
||||
safe_name = company_name.replace(" ", "_").lower()
|
||||
return StreamingResponse(
|
||||
iter([output.getvalue()]),
|
||||
media_type="text/csv",
|
||||
headers={"Content-Disposition": f'attachment; filename="sparc_{safe_name}_export.csv"'},
|
||||
)
|
||||
|
||||
|
||||
# ============== System Endpoints ==============
|
||||
|
||||
|
||||
@@ -470,6 +661,38 @@ async def analyze_company(
|
||||
return _convert_result(result)
|
||||
|
||||
|
||||
@app.get(
|
||||
"/analyze/patent/{patent_id}",
|
||||
tags=["Analysis"],
|
||||
)
|
||||
async def analyze_single_patent(
|
||||
patent_id: str,
|
||||
company_name: str = Query(description="Company name for analysis context"),
|
||||
_: UserResponse = Depends(get_current_user),
|
||||
):
|
||||
"""Analyze a single patent by its publication ID.
|
||||
|
||||
If the patent PDF is not already cached locally, the system will attempt
|
||||
to download it automatically from a previously cached link. If no link
|
||||
is available, a 404 error is returned.
|
||||
|
||||
Args:
|
||||
patent_id: Patent publication ID (e.g. "US-11234567-B2")
|
||||
company_name: Company name for analysis context
|
||||
|
||||
Returns:
|
||||
Analysis text for the patent
|
||||
"""
|
||||
if not _analyzer:
|
||||
raise HTTPException(status_code=503, detail="Analyzer not initialized")
|
||||
|
||||
try:
|
||||
analysis = _analyzer.analyze_single_patent(patent_id, company_name)
|
||||
return {"patent_id": patent_id, "company_name": company_name, "analysis": analysis}
|
||||
except FileNotFoundError as e:
|
||||
raise HTTPException(status_code=404, detail=str(e))
|
||||
|
||||
|
||||
@app.post(
|
||||
"/analyze/batch",
|
||||
response_model=BatchAnalysisResponse,
|
||||
@@ -560,8 +783,25 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int):
|
||||
progress=100,
|
||||
result_json=_json.dumps(batch_response.model_dump(), default=str),
|
||||
)
|
||||
# Fire webhook notification
|
||||
from SPARC.webhooks import notify_job_completed
|
||||
notify_job_completed(
|
||||
job_id=job_id,
|
||||
status="completed",
|
||||
total_companies=result.total_companies,
|
||||
successful=result.successful,
|
||||
failed=result.failed,
|
||||
)
|
||||
except Exception as e:
|
||||
db.update_job(job_id, status="failed", error=str(e))
|
||||
from SPARC.webhooks import notify_job_completed
|
||||
notify_job_completed(
|
||||
job_id=job_id,
|
||||
status="failed",
|
||||
total_companies=len(companies),
|
||||
successful=0,
|
||||
failed=len(companies),
|
||||
)
|
||||
|
||||
|
||||
@app.post("/analyze/batch/async", response_model=JobStatus, tags=["Analysis"])
|
||||
@@ -618,24 +858,51 @@ async def get_job_status(
|
||||
return _job_row_to_status(job_row)
|
||||
|
||||
|
||||
@app.get("/jobs", response_model=list[JobStatus], tags=["Jobs"])
|
||||
@app.get("/jobs", response_model=PaginatedJobsResponse, tags=["Jobs"])
|
||||
async def list_jobs(
|
||||
status: Annotated[
|
||||
str | None,
|
||||
Query(description="Filter by status: pending, running, completed, failed"),
|
||||
] = None,
|
||||
limit: Annotated[int, Query(ge=1, le=100)] = 10,
|
||||
cursor: Annotated[
|
||||
str | None,
|
||||
Query(description="Opaque cursor from a previous response's next_cursor field"),
|
||||
] = None,
|
||||
_: UserResponse = Depends(get_current_user),
|
||||
):
|
||||
"""List all analysis jobs.
|
||||
"""List analysis jobs with cursor-based pagination.
|
||||
|
||||
Pass ``limit`` to control page size. The response includes a ``next_cursor``
|
||||
field; pass it back as the ``cursor`` query parameter to fetch the next page.
|
||||
When ``next_cursor`` is ``null``, there are no more results.
|
||||
|
||||
Existing clients that use only ``limit`` (without ``cursor``) continue to
|
||||
work without modification.
|
||||
|
||||
Args:
|
||||
status: Optional filter by job status
|
||||
limit: Maximum number of jobs to return (default 10, max 100)
|
||||
cursor: Opaque pagination cursor from a previous response
|
||||
|
||||
Returns:
|
||||
List of job statuses
|
||||
Paginated list of job statuses
|
||||
"""
|
||||
db = _get_job_db()
|
||||
job_rows = db.list_jobs(status=status, limit=limit)
|
||||
return [_job_row_to_status(row) for row in job_rows]
|
||||
# Fetch one extra to determine if there is a next page
|
||||
job_rows = db.list_jobs(status=status, limit=limit + 1, cursor=cursor)
|
||||
|
||||
has_next = len(job_rows) > limit
|
||||
if has_next:
|
||||
job_rows = job_rows[:limit]
|
||||
|
||||
items = [_job_row_to_status(row) for row in job_rows]
|
||||
|
||||
next_cursor = None
|
||||
if has_next and job_rows:
|
||||
last = job_rows[-1]
|
||||
created = last["created_at"]
|
||||
ts = created.isoformat() if hasattr(created, "isoformat") else str(created)
|
||||
next_cursor = f"{ts}|{last['job_id']}"
|
||||
|
||||
return PaginatedJobsResponse(items=items, next_cursor=next_cursor)
|
||||
|
||||
@@ -53,6 +53,13 @@ root_path = os.getenv("ROOT_PATH", "")
|
||||
# Used for safety checks (e.g., refusing default JWT secret in production)
|
||||
app_env = os.getenv("APP_ENV", "development")
|
||||
|
||||
# Storage backend: "local" (default) or "s3" for S3/MinIO object storage
|
||||
storage_backend = os.getenv("STORAGE_BACKEND", "local")
|
||||
s3_bucket = os.getenv("S3_BUCKET", "sparc-patents")
|
||||
s3_endpoint_url = os.getenv("S3_ENDPOINT_URL", "")
|
||||
s3_access_key = os.getenv("AWS_ACCESS_KEY_ID", "")
|
||||
s3_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY", "")
|
||||
|
||||
# CORS allowed origins (comma-separated)
|
||||
# Defaults to localhost dev origins when unset
|
||||
_cors_origins_raw = os.getenv("CORS_ORIGINS", "")
|
||||
|
||||
+139
-7
@@ -192,6 +192,35 @@ class DatabaseClient:
|
||||
ON jobs(status)
|
||||
""")
|
||||
|
||||
# Create tracked companies table for scheduled analysis
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS tracked_companies (
|
||||
id SERIAL PRIMARY KEY,
|
||||
company_name VARCHAR(255) UNIQUE NOT NULL,
|
||||
last_patent_count INTEGER DEFAULT 0,
|
||||
last_analysis_at TIMESTAMP,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
""")
|
||||
|
||||
# Create alerts table for significant changes
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS alerts (
|
||||
id SERIAL PRIMARY KEY,
|
||||
company_name VARCHAR(255) NOT NULL,
|
||||
alert_type VARCHAR(50) NOT NULL,
|
||||
message TEXT NOT NULL,
|
||||
old_value NUMERIC,
|
||||
new_value NUMERIC,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
""")
|
||||
|
||||
cursor.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_alerts_company
|
||||
ON alerts(company_name)
|
||||
""")
|
||||
|
||||
self.conn.commit()
|
||||
|
||||
@staticmethod
|
||||
@@ -568,20 +597,45 @@ class DatabaseClient:
|
||||
self,
|
||||
status: Optional[str] = None,
|
||||
limit: int = 10,
|
||||
cursor: Optional[str] = None,
|
||||
) -> List[Dict]:
|
||||
"""List jobs, optionally filtered by status."""
|
||||
query = "SELECT * FROM jobs"
|
||||
"""List jobs with optional status filter and cursor-based pagination.
|
||||
|
||||
Args:
|
||||
status: Optional status filter (pending, running, completed, failed).
|
||||
limit: Maximum number of jobs to return.
|
||||
cursor: Opaque cursor (``created_at|job_id``) from a previous
|
||||
response. When provided, only jobs older than the cursor are
|
||||
returned.
|
||||
|
||||
Returns:
|
||||
List of job dicts ordered by created_at descending.
|
||||
"""
|
||||
conditions: list[str] = []
|
||||
params: list = []
|
||||
|
||||
if status:
|
||||
query += " WHERE status = %s"
|
||||
conditions.append("status = %s")
|
||||
params.append(status)
|
||||
query += " ORDER BY created_at DESC LIMIT %s"
|
||||
|
||||
if cursor:
|
||||
try:
|
||||
ts_str, cursor_job_id = cursor.rsplit("|", 1)
|
||||
conditions.append("(created_at, job_id) < (%s, %s)")
|
||||
params.extend([ts_str, cursor_job_id])
|
||||
except ValueError:
|
||||
pass # Ignore malformed cursors; return from start
|
||||
|
||||
query = "SELECT * FROM jobs"
|
||||
if conditions:
|
||||
query += " WHERE " + " AND ".join(conditions)
|
||||
query += " ORDER BY created_at DESC, job_id DESC LIMIT %s"
|
||||
params.append(limit)
|
||||
|
||||
with self.get_conn() as conn:
|
||||
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
|
||||
cursor.execute(query, params)
|
||||
return [dict(row) for row in cursor.fetchall()]
|
||||
with conn.cursor(cursor_factory=RealDictCursor) as cur:
|
||||
cur.execute(query, params)
|
||||
return [dict(row) for row in cur.fetchall()]
|
||||
|
||||
def mark_stale_jobs_failed(self) -> int:
|
||||
"""Mark any jobs in 'running' or 'pending' state as 'failed'.
|
||||
@@ -803,3 +857,81 @@ class DatabaseClient:
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute("SELECT COUNT(*) FROM users")
|
||||
return cursor.fetchone()[0]
|
||||
|
||||
# Tracked Companies Methods
|
||||
|
||||
def add_tracked_company(self, company_name: str) -> Optional[Dict]:
|
||||
"""Add a company to the tracking list."""
|
||||
with self.get_conn() as conn:
|
||||
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
|
||||
try:
|
||||
cursor.execute(
|
||||
"INSERT INTO tracked_companies (company_name) VALUES (%s) RETURNING *",
|
||||
(company_name,),
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
conn.commit()
|
||||
return dict(row) if row else None
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
return None
|
||||
|
||||
def remove_tracked_company(self, company_name: str) -> bool:
|
||||
"""Remove a company from the tracking list."""
|
||||
with self.get_conn() as conn:
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"DELETE FROM tracked_companies WHERE LOWER(company_name) = LOWER(%s)",
|
||||
(company_name,),
|
||||
)
|
||||
conn.commit()
|
||||
return cursor.rowcount > 0
|
||||
|
||||
def list_tracked_companies(self) -> List[Dict]:
|
||||
"""List all tracked companies."""
|
||||
with self.get_conn() as conn:
|
||||
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
|
||||
cursor.execute("SELECT * FROM tracked_companies ORDER BY company_name")
|
||||
return [dict(row) for row in cursor.fetchall()]
|
||||
|
||||
def update_tracked_company(
|
||||
self, company_name: str, patent_count: int
|
||||
) -> None:
|
||||
"""Update the last analysis stats for a tracked company."""
|
||||
with self.get_conn() as conn:
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"""UPDATE tracked_companies
|
||||
SET last_patent_count = %s, last_analysis_at = CURRENT_TIMESTAMP
|
||||
WHERE LOWER(company_name) = LOWER(%s)""",
|
||||
(patent_count, company_name),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def store_alert(
|
||||
self,
|
||||
company_name: str,
|
||||
alert_type: str,
|
||||
message: str,
|
||||
old_value: float | None = None,
|
||||
new_value: float | None = None,
|
||||
) -> None:
|
||||
"""Record an alert for a significant change."""
|
||||
with self.get_conn() as conn:
|
||||
with conn.cursor() as cursor:
|
||||
cursor.execute(
|
||||
"""INSERT INTO alerts (company_name, alert_type, message, old_value, new_value)
|
||||
VALUES (%s, %s, %s, %s, %s)""",
|
||||
(company_name, alert_type, message, old_value, new_value),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def list_alerts(self, limit: int = 50) -> List[Dict]:
|
||||
"""List recent alerts."""
|
||||
with self.get_conn() as conn:
|
||||
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
|
||||
cursor.execute(
|
||||
"SELECT * FROM alerts ORDER BY created_at DESC LIMIT %s",
|
||||
(limit,),
|
||||
)
|
||||
return [dict(row) for row in cursor.fetchall()]
|
||||
|
||||
@@ -0,0 +1,109 @@
|
||||
"""Scheduled patent analysis for tracked companies.
|
||||
|
||||
Uses APScheduler to periodically re-analyze tracked companies and
|
||||
detect significant changes in patent counts.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
from SPARC import config
|
||||
from SPARC.analyzer import CompanyAnalyzer
|
||||
from SPARC.database import DatabaseClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Configurable via environment variable (in hours, default 24)
|
||||
SCHEDULE_INTERVAL_HOURS = int(os.getenv("SCHEDULE_INTERVAL_HOURS", "24"))
|
||||
|
||||
# Patent count change threshold (percentage) to trigger an alert
|
||||
CHANGE_THRESHOLD_PERCENT = int(os.getenv("CHANGE_THRESHOLD_PERCENT", "20"))
|
||||
|
||||
|
||||
def run_scheduled_analysis() -> None:
|
||||
"""Re-analyze all tracked companies and check for significant changes."""
|
||||
db = DatabaseClient(config.database_url)
|
||||
db.connect()
|
||||
db.initialize_schema()
|
||||
|
||||
tracked = db.list_tracked_companies()
|
||||
if not tracked:
|
||||
logger.info("No tracked companies configured; skipping scheduled analysis")
|
||||
return
|
||||
|
||||
logger.info("Running scheduled analysis for %d tracked companies", len(tracked))
|
||||
|
||||
analyzer = CompanyAnalyzer(db_client=db)
|
||||
|
||||
for company_row in tracked:
|
||||
name = company_row["company_name"]
|
||||
old_count = company_row.get("last_patent_count", 0) or 0
|
||||
|
||||
try:
|
||||
result = analyzer._analyze_company_safe(name)
|
||||
|
||||
if result.success:
|
||||
new_count = result.patent_count
|
||||
|
||||
# Update tracking record
|
||||
db.update_tracked_company(name, new_count)
|
||||
|
||||
# Check for significant change
|
||||
if old_count > 0:
|
||||
delta_pct = abs(new_count - old_count) / old_count * 100
|
||||
if delta_pct >= CHANGE_THRESHOLD_PERCENT:
|
||||
direction = "increased" if new_count > old_count else "decreased"
|
||||
message = (
|
||||
f"Patent count for {name} {direction} by {delta_pct:.0f}% "
|
||||
f"({old_count} -> {new_count})"
|
||||
)
|
||||
logger.warning("ALERT: %s", message)
|
||||
db.store_alert(
|
||||
company_name=name,
|
||||
alert_type="patent_count_change",
|
||||
message=message,
|
||||
old_value=old_count,
|
||||
new_value=new_count,
|
||||
)
|
||||
elif new_count > 0:
|
||||
# First analysis -- record baseline
|
||||
logger.info("Baseline for %s: %d patents", name, new_count)
|
||||
else:
|
||||
logger.warning("Scheduled analysis failed for %s: %s", name, result.error)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error analyzing tracked company %s: %s", name, e)
|
||||
|
||||
db.close()
|
||||
logger.info("Scheduled analysis complete")
|
||||
|
||||
|
||||
def start_scheduler() -> None:
|
||||
"""Start the APScheduler background scheduler.
|
||||
|
||||
Safe to call at application startup. If apscheduler is not installed,
|
||||
the function logs a warning and returns without starting anything.
|
||||
"""
|
||||
try:
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
except ImportError:
|
||||
logger.warning(
|
||||
"apscheduler not installed; scheduled analysis disabled. "
|
||||
"Install with: pip install apscheduler"
|
||||
)
|
||||
return
|
||||
|
||||
scheduler = BackgroundScheduler()
|
||||
scheduler.add_job(
|
||||
run_scheduled_analysis,
|
||||
"interval",
|
||||
hours=SCHEDULE_INTERVAL_HOURS,
|
||||
id="scheduled_patent_analysis",
|
||||
replace_existing=True,
|
||||
)
|
||||
scheduler.start()
|
||||
logger.info(
|
||||
"Scheduled patent analysis started (every %d hours, threshold %d%%)",
|
||||
SCHEDULE_INTERVAL_HOURS,
|
||||
CHANGE_THRESHOLD_PERCENT,
|
||||
)
|
||||
+47
-13
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
import io
|
||||
import logging
|
||||
import re
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict
|
||||
@@ -8,8 +9,21 @@ import requests
|
||||
import serpapi
|
||||
|
||||
from SPARC import config
|
||||
from SPARC.storage import StorageBackend, get_storage_backend
|
||||
from SPARC.types import Patent, Patents
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Module-level storage instance (lazy-initialized)
|
||||
_storage: StorageBackend | None = None
|
||||
|
||||
|
||||
def _get_storage() -> StorageBackend:
|
||||
global _storage
|
||||
if _storage is None:
|
||||
_storage = get_storage_backend()
|
||||
return _storage
|
||||
|
||||
|
||||
class SERP:
|
||||
def query(company: str, days_back: int = None) -> Patents:
|
||||
@@ -44,6 +58,7 @@ class SERP:
|
||||
"tbs": date_filter,
|
||||
"api_key": config.api_key,
|
||||
}
|
||||
logger.info("Querying Google Patents for '%s' (last %d days)", company, days_back)
|
||||
search = serpapi.search(params)
|
||||
# Convert results to Patent objects, skipping any without PDF links
|
||||
patent_ids = []
|
||||
@@ -52,13 +67,16 @@ class SERP:
|
||||
pdf_link = patent.get("pdf")
|
||||
if pdf_link:
|
||||
patent_ids.append(Patent(patent_id=patent["publication_number"], pdf_link=pdf_link, summary=None))
|
||||
# Patents without PDF links are skipped (see docstring for details)
|
||||
else:
|
||||
logger.debug("Skipping patent %s (no PDF link)", patent.get("publication_number", "unknown"))
|
||||
|
||||
logger.info("Found %d patents with PDF links for '%s'", len(patent_ids), company)
|
||||
return Patents(patents=patent_ids)
|
||||
|
||||
def save_patents(patent: Patent) -> Patent:
|
||||
"""
|
||||
Save the patent PDF to the patents folder, skipping download if already cached.
|
||||
"""Save the patent PDF to storage, skipping download if already cached.
|
||||
|
||||
Uses the configured storage backend (local filesystem or S3).
|
||||
|
||||
Args:
|
||||
patent: Patent object
|
||||
@@ -66,35 +84,51 @@ class SERP:
|
||||
Returns:
|
||||
Patent object with updated PDF path
|
||||
"""
|
||||
pdf_path = f"patents/{patent.patent_id}.pdf"
|
||||
os.makedirs("patents", exist_ok=True)
|
||||
storage = _get_storage()
|
||||
key = f"{patent.patent_id}.pdf"
|
||||
|
||||
if not (os.path.exists(pdf_path) and os.path.getsize(pdf_path) > 0):
|
||||
if not storage.exists(key):
|
||||
logger.info("Downloading PDF for %s", patent.patent_id)
|
||||
response = requests.get(patent.pdf_link)
|
||||
with open(pdf_path, "wb") as f:
|
||||
f.write(response.content)
|
||||
storage.write(key, response.content)
|
||||
logger.debug("Saved %d bytes for %s", len(response.content), patent.patent_id)
|
||||
else:
|
||||
logger.debug("Using cached PDF for %s", patent.patent_id)
|
||||
|
||||
patent.pdf_path = pdf_path
|
||||
patent.pdf_path = storage.path_for(key)
|
||||
return patent
|
||||
|
||||
def parse_patent_pdf(pdf_path: str) -> Dict:
|
||||
"""Extract structured sections from patent PDF.
|
||||
|
||||
Extracts all major sections from a patent PDF including abstract,
|
||||
claims, summary, and detailed description.
|
||||
claims, summary, and detailed description. Supports both local file
|
||||
paths and S3 URIs (s3://bucket/key).
|
||||
|
||||
Args:
|
||||
pdf_path: Path to the patent PDF file
|
||||
pdf_path: Local path or S3 URI to the patent PDF file
|
||||
|
||||
Returns:
|
||||
Dictionary containing all extracted sections
|
||||
"""
|
||||
logger.debug("Parsing patent PDF: %s", pdf_path)
|
||||
|
||||
with pdfplumber.open(pdf_path) as pdf:
|
||||
if pdf_path.startswith("s3://"):
|
||||
# Read from S3 via storage backend
|
||||
storage = _get_storage()
|
||||
# Extract key from "s3://bucket/key"
|
||||
key = pdf_path.split("/", 3)[-1]
|
||||
data = storage.read(key)
|
||||
pdf_file: io.BytesIO | str = io.BytesIO(data)
|
||||
else:
|
||||
pdf_file = pdf_path
|
||||
|
||||
with pdfplumber.open(pdf_file) as pdf:
|
||||
# Extract all text
|
||||
full_text = ""
|
||||
for page in pdf.pages:
|
||||
full_text += page.extract_text() + "\n"
|
||||
logger.debug("Extracted text from %d pages (%d chars)", len(pdf.pages), len(full_text))
|
||||
|
||||
# Define section patterns (common in patents)
|
||||
sections = {
|
||||
|
||||
@@ -0,0 +1,171 @@
|
||||
"""Patent PDF storage abstraction.
|
||||
|
||||
Provides a unified interface for reading and writing patent PDF files,
|
||||
with pluggable backends for local filesystem and S3-compatible object
|
||||
storage (e.g., MinIO, AWS S3).
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from SPARC import config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StorageBackend(ABC):
|
||||
"""Abstract base class for patent PDF storage."""
|
||||
|
||||
@abstractmethod
|
||||
def read(self, key: str) -> bytes:
|
||||
"""Read a file by key.
|
||||
|
||||
Args:
|
||||
key: Storage key (e.g., "US-12345678-B2.pdf")
|
||||
|
||||
Returns:
|
||||
File contents as bytes.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If the file does not exist.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def write(self, key: str, data: bytes) -> None:
|
||||
"""Write data to storage.
|
||||
|
||||
Args:
|
||||
key: Storage key (e.g., "US-12345678-B2.pdf")
|
||||
data: File contents as bytes.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def exists(self, key: str) -> bool:
|
||||
"""Check if a file exists in storage.
|
||||
|
||||
Args:
|
||||
key: Storage key.
|
||||
|
||||
Returns:
|
||||
True if the file exists and has non-zero size.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def path_for(self, key: str) -> str:
|
||||
"""Return a path or URI suitable for downstream consumers.
|
||||
|
||||
For local storage this is a filesystem path; for S3 it is the
|
||||
object key (callers that need a local file should use read()
|
||||
and write to a temporary location).
|
||||
"""
|
||||
|
||||
|
||||
class LocalStorageBackend(StorageBackend):
|
||||
"""Store patent PDFs on the local filesystem under a directory."""
|
||||
|
||||
def __init__(self, base_dir: str = "patents"):
|
||||
self.base_dir = base_dir
|
||||
os.makedirs(self.base_dir, exist_ok=True)
|
||||
|
||||
def _full_path(self, key: str) -> str:
|
||||
return os.path.join(self.base_dir, key)
|
||||
|
||||
def read(self, key: str) -> bytes:
|
||||
path = self._full_path(key)
|
||||
if not os.path.exists(path):
|
||||
raise FileNotFoundError(f"File not found: {path}")
|
||||
with open(path, "rb") as f:
|
||||
return f.read()
|
||||
|
||||
def write(self, key: str, data: bytes) -> None:
|
||||
path = self._full_path(key)
|
||||
os.makedirs(os.path.dirname(path) or self.base_dir, exist_ok=True)
|
||||
with open(path, "wb") as f:
|
||||
f.write(data)
|
||||
logger.debug("Wrote %d bytes to %s", len(data), path)
|
||||
|
||||
def exists(self, key: str) -> bool:
|
||||
path = self._full_path(key)
|
||||
return os.path.exists(path) and os.path.getsize(path) > 0
|
||||
|
||||
def path_for(self, key: str) -> str:
|
||||
return self._full_path(key)
|
||||
|
||||
|
||||
class S3StorageBackend(StorageBackend):
|
||||
"""Store patent PDFs in an S3-compatible bucket."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
bucket: str,
|
||||
endpoint_url: str = "",
|
||||
access_key: str = "",
|
||||
secret_key: str = "",
|
||||
):
|
||||
import boto3
|
||||
|
||||
kwargs: dict = {}
|
||||
if endpoint_url:
|
||||
kwargs["endpoint_url"] = endpoint_url
|
||||
if access_key and secret_key:
|
||||
kwargs["aws_access_key_id"] = access_key
|
||||
kwargs["aws_secret_access_key"] = secret_key
|
||||
|
||||
self.s3 = boto3.client("s3", **kwargs)
|
||||
self.bucket = bucket
|
||||
|
||||
# Ensure bucket exists (useful for MinIO local dev)
|
||||
try:
|
||||
self.s3.head_bucket(Bucket=self.bucket)
|
||||
except Exception:
|
||||
try:
|
||||
self.s3.create_bucket(Bucket=self.bucket)
|
||||
logger.info("Created S3 bucket: %s", self.bucket)
|
||||
except Exception as e:
|
||||
logger.warning("Could not create bucket %s: %s", self.bucket, e)
|
||||
|
||||
def read(self, key: str) -> bytes:
|
||||
try:
|
||||
response = self.s3.get_object(Bucket=self.bucket, Key=key)
|
||||
return response["Body"].read()
|
||||
except self.s3.exceptions.NoSuchKey:
|
||||
raise FileNotFoundError(f"S3 object not found: s3://{self.bucket}/{key}")
|
||||
except Exception as e:
|
||||
if "NoSuchKey" in str(e) or "404" in str(e):
|
||||
raise FileNotFoundError(f"S3 object not found: s3://{self.bucket}/{key}")
|
||||
raise
|
||||
|
||||
def write(self, key: str, data: bytes) -> None:
|
||||
self.s3.put_object(
|
||||
Bucket=self.bucket,
|
||||
Key=key,
|
||||
Body=data,
|
||||
ContentType="application/pdf",
|
||||
)
|
||||
logger.debug("Wrote %d bytes to s3://%s/%s", len(data), self.bucket, key)
|
||||
|
||||
def exists(self, key: str) -> bool:
|
||||
try:
|
||||
response = self.s3.head_object(Bucket=self.bucket, Key=key)
|
||||
return response["ContentLength"] > 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def path_for(self, key: str) -> str:
|
||||
return f"s3://{self.bucket}/{key}"
|
||||
|
||||
|
||||
def get_storage_backend() -> StorageBackend:
|
||||
"""Factory: return the configured storage backend instance."""
|
||||
backend = config.storage_backend.lower()
|
||||
if backend == "s3":
|
||||
logger.info("Using S3 storage backend (bucket=%s)", config.s3_bucket)
|
||||
return S3StorageBackend(
|
||||
bucket=config.s3_bucket,
|
||||
endpoint_url=config.s3_endpoint_url,
|
||||
access_key=config.s3_access_key,
|
||||
secret_key=config.s3_secret_key,
|
||||
)
|
||||
logger.info("Using local storage backend")
|
||||
return LocalStorageBackend()
|
||||
@@ -0,0 +1,139 @@
|
||||
"""Webhook notifications for job completion and alert events.
|
||||
|
||||
Sends JSON payloads to configured webhook URLs with retry logic.
|
||||
Supports generic HTTP POST and Slack-compatible text payloads.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Comma-separated list of webhook URLs (env var based config)
|
||||
_WEBHOOK_URLS_RAW = os.getenv("WEBHOOK_URLS", "")
|
||||
WEBHOOK_URLS: list[str] = [
|
||||
url.strip() for url in _WEBHOOK_URLS_RAW.split(",") if url.strip()
|
||||
]
|
||||
|
||||
MAX_RETRIES = 3
|
||||
BACKOFF_BASE = 2 # seconds
|
||||
|
||||
|
||||
def _is_slack_url(url: str) -> bool:
|
||||
"""Check if a URL looks like a Slack incoming webhook."""
|
||||
return "hooks.slack.com" in url or "discord.com/api/webhooks" in url
|
||||
|
||||
|
||||
def _build_payload(event_type: str, data: dict[str, Any], slack: bool = False) -> dict:
|
||||
"""Build the webhook payload.
|
||||
|
||||
Args:
|
||||
event_type: Type of event (e.g., "job_completed", "alert")
|
||||
data: Event-specific data
|
||||
slack: If True, wrap in Slack-compatible ``text`` format
|
||||
|
||||
Returns:
|
||||
JSON-serializable payload dict
|
||||
"""
|
||||
payload = {
|
||||
"event": event_type,
|
||||
"timestamp": datetime.utcnow().isoformat() + "Z",
|
||||
**data,
|
||||
}
|
||||
|
||||
if slack:
|
||||
# Build a human-readable summary for Slack/Discord
|
||||
lines = [f"*[SPARC] {event_type}*"]
|
||||
for key, value in data.items():
|
||||
lines.append(f" {key}: {value}")
|
||||
return {"text": "\n".join(lines)}
|
||||
|
||||
return payload
|
||||
|
||||
|
||||
def _send_with_retry(url: str, payload: dict) -> bool:
|
||||
"""Send a POST request with exponential backoff retry.
|
||||
|
||||
Args:
|
||||
url: Webhook URL
|
||||
payload: JSON payload to send
|
||||
|
||||
Returns:
|
||||
True if delivered successfully, False after all retries exhausted
|
||||
"""
|
||||
for attempt in range(1, MAX_RETRIES + 1):
|
||||
try:
|
||||
response = requests.post(url, json=payload, timeout=10)
|
||||
if response.status_code < 300:
|
||||
logger.debug("Webhook delivered to %s (attempt %d)", url, attempt)
|
||||
return True
|
||||
logger.warning(
|
||||
"Webhook %s returned %d (attempt %d/%d)",
|
||||
url, response.status_code, attempt, MAX_RETRIES,
|
||||
)
|
||||
except requests.RequestException as e:
|
||||
logger.warning(
|
||||
"Webhook delivery failed for %s (attempt %d/%d): %s",
|
||||
url, attempt, MAX_RETRIES, e,
|
||||
)
|
||||
|
||||
if attempt < MAX_RETRIES:
|
||||
wait = BACKOFF_BASE ** attempt
|
||||
time.sleep(wait)
|
||||
|
||||
logger.error("Webhook permanently failed for %s after %d attempts", url, MAX_RETRIES)
|
||||
return False
|
||||
|
||||
|
||||
def notify(event_type: str, data: dict[str, Any]) -> None:
|
||||
"""Fire all configured webhooks for an event.
|
||||
|
||||
Safe to call even when no webhooks are configured (returns immediately).
|
||||
|
||||
Args:
|
||||
event_type: Event identifier (e.g., "job_completed", "patent_alert")
|
||||
data: Event data to include in the payload
|
||||
"""
|
||||
if not WEBHOOK_URLS:
|
||||
return
|
||||
|
||||
for url in WEBHOOK_URLS:
|
||||
slack = _is_slack_url(url)
|
||||
payload = _build_payload(event_type, data, slack=slack)
|
||||
_send_with_retry(url, payload)
|
||||
|
||||
|
||||
def notify_job_completed(
|
||||
job_id: str,
|
||||
status: str,
|
||||
total_companies: int,
|
||||
successful: int,
|
||||
failed: int,
|
||||
) -> None:
|
||||
"""Send notification when a batch job completes."""
|
||||
notify("job_completed", {
|
||||
"job_id": job_id,
|
||||
"status": status,
|
||||
"total_companies": total_companies,
|
||||
"successful": successful,
|
||||
"failed": failed,
|
||||
"summary": f"Batch job {job_id}: {successful}/{total_companies} succeeded",
|
||||
})
|
||||
|
||||
|
||||
def notify_alert(
|
||||
company_name: str,
|
||||
alert_type: str,
|
||||
message: str,
|
||||
) -> None:
|
||||
"""Send notification for a tracked company alert."""
|
||||
notify("patent_alert", {
|
||||
"company_name": company_name,
|
||||
"alert_type": alert_type,
|
||||
"message": message,
|
||||
})
|
||||
Reference in New Issue
Block a user