forked from 0xWheatyz/SPARC
ab3964b18d
Introduce a lightweight in-process task queue (thread + queue.Queue) so that webhook HTTP delivery no longer blocks the scheduler or batch-job background tasks. The worker thread preserves the existing exponential- backoff retry logic from _send_with_retry. - Add SPARC/task_queue.py: WebhookTask, start/stop worker, enqueue, drain - Add enqueue_notify / enqueue_job_completed / enqueue_alert to webhooks.py - Update api.py lifespan to start/stop the webhook worker - Update _run_batch_job to use enqueue_job_completed (non-blocking) - Update scheduler to fire enqueue_alert on patent count changes - Add 13 tests covering worker lifecycle, async delivery, retry in worker context, and integration via enqueue helpers - All 22 existing webhook tests continue to pass unchanged Closes leeworks-agents/SPARC#1676 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
195 lines
5.5 KiB
Python
195 lines
5.5 KiB
Python
"""Webhook notifications for job completion and alert events.
|
|
|
|
Sends JSON payloads to configured webhook URLs with retry logic.
|
|
Supports generic HTTP POST and Slack-compatible text payloads.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Comma-separated list of webhook URLs (env var based config)
|
|
_WEBHOOK_URLS_RAW = os.getenv("WEBHOOK_URLS", "")
|
|
WEBHOOK_URLS: list[str] = [
|
|
url.strip() for url in _WEBHOOK_URLS_RAW.split(",") if url.strip()
|
|
]
|
|
|
|
MAX_RETRIES = 3
|
|
BACKOFF_BASE = 2 # seconds
|
|
|
|
|
|
def _is_slack_url(url: str) -> bool:
|
|
"""Check if a URL looks like a Slack incoming webhook."""
|
|
return "hooks.slack.com" in url or "discord.com/api/webhooks" in url
|
|
|
|
|
|
def _build_payload(event_type: str, data: dict[str, Any], slack: bool = False) -> dict:
|
|
"""Build the webhook payload.
|
|
|
|
Args:
|
|
event_type: Type of event (e.g., "job_completed", "alert")
|
|
data: Event-specific data
|
|
slack: If True, wrap in Slack-compatible ``text`` format
|
|
|
|
Returns:
|
|
JSON-serializable payload dict
|
|
"""
|
|
payload = {
|
|
"event": event_type,
|
|
"timestamp": datetime.utcnow().isoformat() + "Z",
|
|
**data,
|
|
}
|
|
|
|
if slack:
|
|
# Build a human-readable summary for Slack/Discord
|
|
lines = [f"*[SPARC] {event_type}*"]
|
|
for key, value in data.items():
|
|
lines.append(f" {key}: {value}")
|
|
return {"text": "\n".join(lines)}
|
|
|
|
return payload
|
|
|
|
|
|
def _send_with_retry(url: str, payload: dict) -> bool:
|
|
"""Send a POST request with exponential backoff retry.
|
|
|
|
Args:
|
|
url: Webhook URL
|
|
payload: JSON payload to send
|
|
|
|
Returns:
|
|
True if delivered successfully, False after all retries exhausted
|
|
"""
|
|
for attempt in range(1, MAX_RETRIES + 1):
|
|
try:
|
|
response = requests.post(url, json=payload, timeout=10)
|
|
if response.status_code < 300:
|
|
logger.debug("Webhook delivered to %s (attempt %d)", url, attempt)
|
|
return True
|
|
logger.warning(
|
|
"Webhook %s returned %d (attempt %d/%d)",
|
|
url, response.status_code, attempt, MAX_RETRIES,
|
|
)
|
|
except requests.RequestException as e:
|
|
logger.warning(
|
|
"Webhook delivery failed for %s (attempt %d/%d): %s",
|
|
url, attempt, MAX_RETRIES, e,
|
|
)
|
|
|
|
if attempt < MAX_RETRIES:
|
|
wait = BACKOFF_BASE ** attempt
|
|
time.sleep(wait)
|
|
|
|
logger.error("Webhook permanently failed for %s after %d attempts", url, MAX_RETRIES)
|
|
return False
|
|
|
|
|
|
def notify(event_type: str, data: dict[str, Any]) -> None:
|
|
"""Fire all configured webhooks for an event (**blocking**).
|
|
|
|
Safe to call even when no webhooks are configured (returns immediately).
|
|
For non-blocking delivery, use :func:`enqueue_notify` instead.
|
|
|
|
Args:
|
|
event_type: Event identifier (e.g., "job_completed", "patent_alert")
|
|
data: Event data to include in the payload
|
|
"""
|
|
if not WEBHOOK_URLS:
|
|
return
|
|
|
|
for url in WEBHOOK_URLS:
|
|
slack = _is_slack_url(url)
|
|
payload = _build_payload(event_type, data, slack=slack)
|
|
_send_with_retry(url, payload)
|
|
|
|
|
|
def enqueue_notify(event_type: str, data: dict[str, Any]) -> None:
|
|
"""Enqueue webhook delivery for all configured URLs (non-blocking).
|
|
|
|
Returns immediately after placing tasks on the background queue.
|
|
The worker thread handles retry logic asynchronously.
|
|
|
|
Safe to call even when no webhooks are configured.
|
|
|
|
Args:
|
|
event_type: Event identifier (e.g., "job_completed", "patent_alert")
|
|
data: Event data to include in the payload
|
|
"""
|
|
if not WEBHOOK_URLS:
|
|
return
|
|
|
|
from SPARC.task_queue import WebhookTask, enqueue
|
|
|
|
for url in WEBHOOK_URLS:
|
|
slack = _is_slack_url(url)
|
|
payload = _build_payload(event_type, data, slack=slack)
|
|
enqueue(WebhookTask(url=url, payload=payload))
|
|
|
|
|
|
def notify_job_completed(
|
|
job_id: str,
|
|
status: str,
|
|
total_companies: int,
|
|
successful: int,
|
|
failed: int,
|
|
) -> None:
|
|
"""Send notification when a batch job completes (blocking)."""
|
|
notify("job_completed", {
|
|
"job_id": job_id,
|
|
"status": status,
|
|
"total_companies": total_companies,
|
|
"successful": successful,
|
|
"failed": failed,
|
|
"summary": f"Batch job {job_id}: {successful}/{total_companies} succeeded",
|
|
})
|
|
|
|
|
|
def enqueue_job_completed(
|
|
job_id: str,
|
|
status: str,
|
|
total_companies: int,
|
|
successful: int,
|
|
failed: int,
|
|
) -> None:
|
|
"""Enqueue notification when a batch job completes (non-blocking)."""
|
|
enqueue_notify("job_completed", {
|
|
"job_id": job_id,
|
|
"status": status,
|
|
"total_companies": total_companies,
|
|
"successful": successful,
|
|
"failed": failed,
|
|
"summary": f"Batch job {job_id}: {successful}/{total_companies} succeeded",
|
|
})
|
|
|
|
|
|
def notify_alert(
|
|
company_name: str,
|
|
alert_type: str,
|
|
message: str,
|
|
) -> None:
|
|
"""Send notification for a tracked company alert (blocking)."""
|
|
notify("patent_alert", {
|
|
"company_name": company_name,
|
|
"alert_type": alert_type,
|
|
"message": message,
|
|
})
|
|
|
|
|
|
def enqueue_alert(
|
|
company_name: str,
|
|
alert_type: str,
|
|
message: str,
|
|
) -> None:
|
|
"""Enqueue notification for a tracked company alert (non-blocking)."""
|
|
enqueue_notify("patent_alert", {
|
|
"company_name": company_name,
|
|
"alert_type": alert_type,
|
|
"message": message,
|
|
})
|