Compare commits

..

1 Commits

Author SHA1 Message Date
agent-company 2e6b8c7445 feat: add webhook notification support for job completion and alerts
Send HTTP POST notifications to configured webhook URLs when batch
jobs complete or when scheduled analysis detects significant changes.

- Add SPARC/webhooks.py with retry logic (3 attempts, exponential backoff)
- Support generic HTTP POST and Slack-compatible text payloads
- Integrate into batch job completion handler in api.py
- Configure via WEBHOOK_URLS env var (comma-separated)
- Payload includes event type, job ID, status, and summary

Closes leeworks-agents/SPARC#23

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:32:07 +00:00
4 changed files with 173 additions and 53 deletions
+6
View File
@@ -40,3 +40,9 @@ JWT_SECRET=your-secure-jwt-secret-change-in-production
# When USE_CACHE=true: check database for cached responses before making API calls
# When USE_CACHE=false: always make fresh API calls (still stores results in database)
USE_CACHE=true
# ---- Webhooks ----
# Comma-separated list of webhook URLs for job completion and alert notifications
# Supports generic HTTP POST and Slack/Discord incoming webhooks
# WEBHOOK_URLS=https://hooks.slack.com/services/XXX,https://example.com/webhook
+11 -21
View File
@@ -108,10 +108,12 @@ class CompanyAnalyzer:
def analyze_single_patent(self, patent_id: str, company_name: str) -> str:
"""Analyze a single patent by ID.
If the patent PDF is not already on disk, this method attempts to
download it automatically by looking up the PDF link in the database
cache. If the link is not cached either, a ``FileNotFoundError`` is
raised with instructions on how to obtain the PDF.
Prerequisite:
The patent PDF must already exist at ``patents/{patent_id}.pdf``
before calling this method. PDFs are downloaded automatically when
using the batch analysis pipeline (``analyze_company`` or the
``/analyze/batch`` API endpoint). For standalone usage, download
the PDF manually or call ``SERP.save_patents()`` first.
Args:
patent_id: Publication ID of the patent (e.g. "US-11234567-B2")
@@ -121,7 +123,7 @@ class CompanyAnalyzer:
Analysis of the specific patent's innovation quality
Raises:
FileNotFoundError: If the patent PDF cannot be found or downloaded.
FileNotFoundError: If the patent PDF is not found at the expected path.
"""
import os
logger.info("Analyzing patent %s for %s...", patent_id, company_name)
@@ -129,22 +131,10 @@ class CompanyAnalyzer:
patent_path = f"patents/{patent_id}.pdf"
if not os.path.exists(patent_path):
# Attempt to download the PDF automatically from cached metadata
cached = self.db.get_cached_patent(patent_id)
pdf_link = cached.get("pdf_link") if cached else None
if pdf_link:
logger.info("PDF not on disk; downloading %s from cached link", patent_id)
patent = SERP.save_patents(
Patent(patent_id=patent_id, pdf_link=pdf_link)
)
patent_path = patent.pdf_path
else:
raise FileNotFoundError(
f"Patent PDF not found at '{patent_path}' and no download link is "
f"cached for '{patent_id}'. Run a company analysis first to populate "
f"the cache, or call SERP.save_patents() with the patent's PDF link."
)
raise FileNotFoundError(
f"Patent PDF not found at '{patent_path}'. "
f"Download the PDF first using SERP.save_patents() or the batch analysis pipeline."
)
try:
sections = SERP.parse_patent_pdf(patent_path)
+17 -32
View File
@@ -429,38 +429,6 @@ async def analyze_company(
return _convert_result(result)
@app.get(
"/analyze/patent/{patent_id}",
tags=["Analysis"],
)
async def analyze_single_patent(
patent_id: str,
company_name: str = Query(description="Company name for analysis context"),
_: UserResponse = Depends(get_current_user),
):
"""Analyze a single patent by its publication ID.
If the patent PDF is not already cached locally, the system will attempt
to download it automatically from a previously cached link. If no link
is available, a 404 error is returned.
Args:
patent_id: Patent publication ID (e.g. "US-11234567-B2")
company_name: Company name for analysis context
Returns:
Analysis text for the patent
"""
if not _analyzer:
raise HTTPException(status_code=503, detail="Analyzer not initialized")
try:
analysis = _analyzer.analyze_single_patent(patent_id, company_name)
return {"patent_id": patent_id, "company_name": company_name, "analysis": analysis}
except FileNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
@app.post(
"/analyze/batch",
response_model=BatchAnalysisResponse,
@@ -551,8 +519,25 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int):
progress=100,
result_json=_json.dumps(batch_response.model_dump(), default=str),
)
# Fire webhook notification
from SPARC.webhooks import notify_job_completed
notify_job_completed(
job_id=job_id,
status="completed",
total_companies=result.total_companies,
successful=result.successful,
failed=result.failed,
)
except Exception as e:
db.update_job(job_id, status="failed", error=str(e))
from SPARC.webhooks import notify_job_completed
notify_job_completed(
job_id=job_id,
status="failed",
total_companies=len(companies),
successful=0,
failed=len(companies),
)
@app.post("/analyze/batch/async", response_model=JobStatus, tags=["Analysis"])
+139
View File
@@ -0,0 +1,139 @@
"""Webhook notifications for job completion and alert events.
Sends JSON payloads to configured webhook URLs with retry logic.
Supports generic HTTP POST and Slack-compatible text payloads.
"""
import logging
import os
import time
from datetime import datetime
from typing import Any
import requests
logger = logging.getLogger(__name__)
# Comma-separated list of webhook URLs (env var based config)
_WEBHOOK_URLS_RAW = os.getenv("WEBHOOK_URLS", "")
WEBHOOK_URLS: list[str] = [
url.strip() for url in _WEBHOOK_URLS_RAW.split(",") if url.strip()
]
MAX_RETRIES = 3
BACKOFF_BASE = 2 # seconds
def _is_slack_url(url: str) -> bool:
"""Check if a URL looks like a Slack incoming webhook."""
return "hooks.slack.com" in url or "discord.com/api/webhooks" in url
def _build_payload(event_type: str, data: dict[str, Any], slack: bool = False) -> dict:
"""Build the webhook payload.
Args:
event_type: Type of event (e.g., "job_completed", "alert")
data: Event-specific data
slack: If True, wrap in Slack-compatible ``text`` format
Returns:
JSON-serializable payload dict
"""
payload = {
"event": event_type,
"timestamp": datetime.utcnow().isoformat() + "Z",
**data,
}
if slack:
# Build a human-readable summary for Slack/Discord
lines = [f"*[SPARC] {event_type}*"]
for key, value in data.items():
lines.append(f" {key}: {value}")
return {"text": "\n".join(lines)}
return payload
def _send_with_retry(url: str, payload: dict) -> bool:
"""Send a POST request with exponential backoff retry.
Args:
url: Webhook URL
payload: JSON payload to send
Returns:
True if delivered successfully, False after all retries exhausted
"""
for attempt in range(1, MAX_RETRIES + 1):
try:
response = requests.post(url, json=payload, timeout=10)
if response.status_code < 300:
logger.debug("Webhook delivered to %s (attempt %d)", url, attempt)
return True
logger.warning(
"Webhook %s returned %d (attempt %d/%d)",
url, response.status_code, attempt, MAX_RETRIES,
)
except requests.RequestException as e:
logger.warning(
"Webhook delivery failed for %s (attempt %d/%d): %s",
url, attempt, MAX_RETRIES, e,
)
if attempt < MAX_RETRIES:
wait = BACKOFF_BASE ** attempt
time.sleep(wait)
logger.error("Webhook permanently failed for %s after %d attempts", url, MAX_RETRIES)
return False
def notify(event_type: str, data: dict[str, Any]) -> None:
"""Fire all configured webhooks for an event.
Safe to call even when no webhooks are configured (returns immediately).
Args:
event_type: Event identifier (e.g., "job_completed", "patent_alert")
data: Event data to include in the payload
"""
if not WEBHOOK_URLS:
return
for url in WEBHOOK_URLS:
slack = _is_slack_url(url)
payload = _build_payload(event_type, data, slack=slack)
_send_with_retry(url, payload)
def notify_job_completed(
job_id: str,
status: str,
total_companies: int,
successful: int,
failed: int,
) -> None:
"""Send notification when a batch job completes."""
notify("job_completed", {
"job_id": job_id,
"status": status,
"total_companies": total_companies,
"successful": successful,
"failed": failed,
"summary": f"Batch job {job_id}: {successful}/{total_companies} succeeded",
})
def notify_alert(
company_name: str,
alert_type: str,
message: str,
) -> None:
"""Send notification for a tracked company alert."""
notify("patent_alert", {
"company_name": company_name,
"alert_type": alert_type,
"message": message,
})