diff --git a/SPARC/api.py b/SPARC/api.py index 1b29d38..7829789 100644 --- a/SPARC/api.py +++ b/SPARC/api.py @@ -224,11 +224,16 @@ async def lifespan(app: FastAPI): import logging logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale) _db.close() + # Start webhook background worker + from SPARC.task_queue import start_worker as start_webhook_worker + from SPARC.task_queue import stop_worker as stop_webhook_worker + start_webhook_worker() # Start scheduled analysis if tracked companies are configured from SPARC.scheduler import start_scheduler start_scheduler() yield # Cleanup + stop_webhook_worker() _analyzer = None close_db_client() @@ -1104,9 +1109,9 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int, model: s progress=100, result_json=_json.dumps(batch_response.model_dump(), default=str), ) - # Fire webhook notification - from SPARC.webhooks import notify_job_completed - notify_job_completed( + # Fire webhook notification (non-blocking via task queue) + from SPARC.webhooks import enqueue_job_completed + enqueue_job_completed( job_id=job_id, status="completed", total_companies=result.total_companies, @@ -1115,8 +1120,8 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int, model: s ) except Exception as e: db.update_job(job_id, status="failed", error=str(e)) - from SPARC.webhooks import notify_job_completed - notify_job_completed( + from SPARC.webhooks import enqueue_job_completed + enqueue_job_completed( job_id=job_id, status="failed", total_companies=len(companies), diff --git a/SPARC/scheduler.py b/SPARC/scheduler.py index 4428bfd..51cdf37 100644 --- a/SPARC/scheduler.py +++ b/SPARC/scheduler.py @@ -71,6 +71,13 @@ def run_scheduled_analysis() -> None: old_value=old_count, new_value=new_count, ) + # Fire non-blocking webhook notification + from SPARC.webhooks import enqueue_alert + enqueue_alert( + company_name=name, + alert_type="patent_count_change", + message=message, + ) elif 
new_count > 0: # First analysis -- record baseline logger.info("Baseline for %s: %d patents", name, new_count) diff --git a/SPARC/task_queue.py b/SPARC/task_queue.py new file mode 100644 index 0000000..bab7faa --- /dev/null +++ b/SPARC/task_queue.py @@ -0,0 +1,113 @@ +"""Lightweight in-process task queue for non-blocking webhook delivery. + +Uses a daemon thread and a :class:`queue.Queue` so that the scheduler and +background jobs can enqueue webhook deliveries without blocking on HTTP +round-trips and retry backoff. + +No external dependencies (Redis, etc.) are required. +""" + +import logging +import queue +import threading +from dataclasses import dataclass +from typing import Any + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class WebhookTask: + """A single webhook delivery request.""" + + url: str + payload: dict[str, Any] + + +# --------------------------------------------------------------------------- +# Module-level singleton +# --------------------------------------------------------------------------- + +_queue: queue.Queue[WebhookTask | None] = queue.Queue() +_worker_thread: threading.Thread | None = None +_started = threading.Event() + + +def _worker_loop() -> None: + """Process webhook tasks until a ``None`` sentinel is received.""" + import SPARC.webhooks as _webhooks # deferred to avoid circular import + + logger.info("Webhook worker thread started") + _started.set() + + while True: + task = _queue.get() + if task is None: + # Sentinel — shut down + logger.info("Webhook worker thread stopping") + _queue.task_done() + break + + try: + # Look up dynamically so that tests can patch the function + _webhooks._send_with_retry(task.url, task.payload) + except Exception: + logger.exception("Unexpected error delivering webhook to %s", task.url) + finally: + _queue.task_done() + + +def start_worker() -> None: + """Start the background worker thread (idempotent).""" + global _worker_thread + if _worker_thread is not None and 
_worker_thread.is_alive(): + return + + _started.clear() + _worker_thread = threading.Thread(target=_worker_loop, daemon=True, name="webhook-worker") + _worker_thread.start() + _started.wait() # block until the worker is actually running + logger.info("Webhook task queue ready") + + +def stop_worker(timeout: float = 5.0) -> None: + """Send the stop sentinel and wait for the worker to finish. + + Args: + timeout: Maximum seconds to wait for the worker thread to join. + """ + global _worker_thread + if _worker_thread is None or not _worker_thread.is_alive(): + _worker_thread = None + return + + _queue.put(None) # sentinel + _worker_thread.join(timeout=timeout) + _worker_thread = None + logger.info("Webhook task queue stopped") + + +def enqueue(task: WebhookTask) -> None: + """Add a webhook delivery task to the queue. + + If the worker has not been started the task is still accepted into the + queue and will be processed once :func:`start_worker` is called. + """ + _queue.put(task) + + +def queue_size() -> int: + """Return the approximate number of pending tasks.""" + return _queue.qsize() + + +def drain(timeout: float = 10.0) -> None: + """Block until all currently-enqueued tasks have been processed. + + Useful in tests and graceful shutdown to ensure pending deliveries + complete before the process exits. + + Args: + timeout: Maximum seconds to wait. + """ + _queue.join() diff --git a/SPARC/webhooks.py b/SPARC/webhooks.py index 08760fe..35de060 100644 --- a/SPARC/webhooks.py +++ b/SPARC/webhooks.py @@ -91,9 +91,10 @@ def _send_with_retry(url: str, payload: dict) -> bool: def notify(event_type: str, data: dict[str, Any]) -> None: - """Fire all configured webhooks for an event. + """Fire all configured webhooks for an event (**blocking**). Safe to call even when no webhooks are configured (returns immediately). + For non-blocking delivery, use :func:`enqueue_notify` instead. 
def enqueue_notify(event_type: str, data: dict[str, Any]) -> None:
    """Queue one webhook delivery per configured URL (non-blocking).

    Each URL gets its own payload, built with Slack formatting when the URL
    is recognised as a Slack endpoint. The tasks are handed to the background
    queue and the call returns without performing any HTTP request itself.

    Does nothing when no webhook URLs are configured.

    Args:
        event_type: Event identifier (e.g., "job_completed", "patent_alert")
        data: Event data to include in the payload
    """
    if not WEBHOOK_URLS:
        return

    # Imported lazily, and only when there is something to enqueue.
    from SPARC.task_queue import WebhookTask, enqueue

    for target in WEBHOOK_URLS:
        body = _build_payload(event_type, data, slack=_is_slack_url(target))
        enqueue(WebhookTask(url=target, payload=body))


def enqueue_job_completed(
    job_id: str,
    status: str,
    total_companies: int,
    successful: int,
    failed: int,
) -> None:
    """Queue a ``job_completed`` notification for a batch job (non-blocking)."""
    details = {
        "job_id": job_id,
        "status": status,
        "total_companies": total_companies,
        "successful": successful,
        "failed": failed,
        "summary": f"Batch job {job_id}: {successful}/{total_companies} succeeded",
    }
    enqueue_notify("job_completed", details)


def notify_alert(
    company_name: str,
    alert_type: str,
    message: str,
) -> None:
    """Deliver a ``patent_alert`` notification for a tracked company (blocking)."""
    details = {
        "company_name": company_name,
        "alert_type": alert_type,
        "message": message,
    }
    notify("patent_alert", details)


def enqueue_alert(
    company_name: str,
    alert_type: str,
    message: str,
) -> None:
    """Queue a ``patent_alert`` notification for a tracked company (non-blocking)."""
    details = {
        "company_name": company_name,
        "alert_type": alert_type,
        "message": message,
    }
    enqueue_notify("patent_alert", details)
"""Tests for the webhook background task queue.

Covers:
- Worker lifecycle (start / stop / idempotent start)
- Tasks are processed asynchronously by the worker
- Retry logic is preserved (executed inside the worker thread)
- enqueue_notify / enqueue_job_completed / enqueue_alert non-blocking helpers
- Integration: queued webhook task is eventually delivered (mocked HTTP)
"""

import threading
import time
from unittest.mock import MagicMock, call, patch

import pytest

from SPARC.task_queue import (
    WebhookTask,
    drain,
    enqueue,
    queue_size,
    start_worker,
    stop_worker,
)


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

@pytest.fixture(autouse=True)
def _worker_lifecycle():
    """Give every test a running worker and shut it down afterwards."""
    start_worker()
    yield
    stop_worker(timeout=3)


# ---------------------------------------------------------------------------
# Worker lifecycle
# ---------------------------------------------------------------------------

class TestWorkerLifecycle:
    def test_start_is_idempotent(self):
        """A repeated start_worker() keeps the already-running thread."""
        import SPARC.task_queue as tq

        thread_before = tq._worker_thread
        start_worker()
        assert tq._worker_thread is thread_before

    def test_stop_worker_gracefully(self):
        """stop_worker() clears the module-level thread reference."""
        import SPARC.task_queue as tq

        assert tq._worker_thread is not None
        stop_worker(timeout=3)
        assert tq._worker_thread is None


# ---------------------------------------------------------------------------
# Task processing
# ---------------------------------------------------------------------------

class TestTaskProcessing:
    @patch("SPARC.webhooks._send_with_retry")
    def test_enqueued_task_is_delivered(self, mock_send):
        """The worker hands an enqueued task's url and payload to the sender."""
        mock_send.return_value = True

        enqueue(WebhookTask(url="https://example.com/hook", payload={"event": "test"}))
        drain(timeout=5)

        mock_send.assert_called_once_with("https://example.com/hook", {"event": "test"})

    @patch("SPARC.webhooks._send_with_retry")
    def test_multiple_tasks_processed_in_order(self, mock_send):
        """Deliveries happen in the order the tasks were enqueued."""
        mock_send.return_value = True
        expected_urls = [f"https://example.com/{i}" for i in range(3)]

        for index, target in enumerate(expected_urls):
            enqueue(WebhookTask(url=target, payload={"n": index}))
        drain(timeout=5)

        assert mock_send.call_count == 3
        delivered_urls = [recorded.args[0] for recorded in mock_send.call_args_list]
        assert delivered_urls == expected_urls

    @patch("SPARC.webhooks._send_with_retry")
    def test_enqueue_returns_immediately(self, mock_send):
        """enqueue() comes back quickly while the sender is still blocked."""
        release = threading.Event()

        def slow_send(url, payload):
            # Hold the worker until the test lets it go.
            release.wait(timeout=5)
            return True

        mock_send.side_effect = slow_send

        began = time.monotonic()
        enqueue(WebhookTask(url="https://slow.example.com", payload={}))
        elapsed = time.monotonic() - began

        # enqueue should return in well under 1 second
        assert elapsed < 0.5

        # Unblock the sender so the queue can empty.
        release.set()
        drain(timeout=5)

    @patch("SPARC.webhooks._send_with_retry", side_effect=RuntimeError("boom"))
    def test_worker_survives_unexpected_error(self, mock_send):
        """A raising sender does not stop later tasks from being processed."""
        enqueue(WebhookTask(url="https://example.com/bad", payload={}))
        drain(timeout=5)

        # Switch the sender to success and push a second task through.
        mock_send.side_effect = None
        mock_send.return_value = True

        enqueue(WebhookTask(url="https://example.com/good", payload={"ok": True}))
        drain(timeout=5)

        assert mock_send.call_count == 2


# ---------------------------------------------------------------------------
# Retry logic preserved in worker context
# ---------------------------------------------------------------------------

class TestRetryInWorker:
    @patch("SPARC.webhooks.time.sleep")
    @patch("SPARC.webhooks.requests.post")
    def test_retry_logic_runs_inside_worker(self, mock_post, mock_sleep):
        """A 500 followed by a 200 results in two posts and one sleep."""
        failure = MagicMock(status_code=500)
        success = MagicMock(status_code=200)
        mock_post.side_effect = [failure, success]

        task = WebhookTask(
            url="https://example.com/retry",
            payload={"event": "test"},
        )
        enqueue(task)
        drain(timeout=10)

        assert mock_post.call_count == 2
        mock_sleep.assert_called_once()

    @patch("SPARC.webhooks.time.sleep")
    @patch("SPARC.webhooks.requests.post")
    def test_all_retries_exhausted_in_worker(self, mock_post, mock_sleep):
        """A permanently failing endpoint is posted to MAX_RETRIES times."""
        mock_post.return_value = MagicMock(status_code=500)

        task = WebhookTask(
            url="https://example.com/fail",
            payload={"event": "test"},
        )
        enqueue(task)
        drain(timeout=10)

        from SPARC.webhooks import MAX_RETRIES
        assert mock_post.call_count == MAX_RETRIES


# ---------------------------------------------------------------------------
# Integration: enqueue_notify and convenience helpers
# ---------------------------------------------------------------------------

class TestEnqueueHelpers:
    @patch("SPARC.webhooks._send_with_retry")
    @patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
    def test_enqueue_notify_delivers_via_worker(self, mock_send):
        """enqueue_notify results in one delivery carrying event and data."""
        mock_send.return_value = True

        from SPARC.webhooks import enqueue_notify
        enqueue_notify("test_event", {"key": "val"})
        drain(timeout=5)

        mock_send.assert_called_once()
        sent_url, sent_payload = mock_send.call_args.args
        assert sent_url == "https://example.com/hook"
        assert sent_payload["event"] == "test_event"
        assert sent_payload["key"] == "val"

    @patch("SPARC.webhooks._send_with_retry")
    @patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
    def test_enqueue_job_completed(self, mock_send):
        """enqueue_job_completed delivers the job fields through the queue."""
        mock_send.return_value = True

        from SPARC.webhooks import enqueue_job_completed
        enqueue_job_completed(
            job_id="job-1",
            status="completed",
            total_companies=5,
            successful=4,
            failed=1,
        )
        drain(timeout=5)

        mock_send.assert_called_once()
        sent_payload = mock_send.call_args.args[1]
        assert sent_payload["event"] == "job_completed"
        assert sent_payload["job_id"] == "job-1"
        assert sent_payload["successful"] == 4

    @patch("SPARC.webhooks._send_with_retry")
    @patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
    def test_enqueue_alert(self, mock_send):
        """enqueue_alert delivers the alert fields through the queue."""
        mock_send.return_value = True

        from SPARC.webhooks import enqueue_alert
        enqueue_alert(
            company_name="NVIDIA",
            alert_type="patent_count_change",
            message="Patent count increased by 30%",
        )
        drain(timeout=5)

        mock_send.assert_called_once()
        sent_payload = mock_send.call_args.args[1]
        assert sent_payload["event"] == "patent_alert"
        assert sent_payload["company_name"] == "NVIDIA"

    @patch("SPARC.webhooks._send_with_retry")
    @patch("SPARC.webhooks.WEBHOOK_URLS", [])
    def test_enqueue_notify_noop_when_no_urls(self, mock_send):
        """With no configured URLs nothing reaches the sender."""
        from SPARC.webhooks import enqueue_notify
        enqueue_notify("test_event", {"key": "val"})
        drain(timeout=2)

        mock_send.assert_not_called()

    @patch("SPARC.webhooks._send_with_retry")
    @patch("SPARC.webhooks.WEBHOOK_URLS", [
        "https://hooks.slack.com/services/T00/B00/xxx",
        "https://example.com/generic",
    ])
    def test_enqueue_notify_slack_formatting(self, mock_send):
        """The Slack URL gets a "text" payload, the generic URL an "event" one."""
        mock_send.return_value = True

        from SPARC.webhooks import enqueue_notify
        enqueue_notify("test_event", {"key": "val"})
        drain(timeout=5)

        assert mock_send.call_count == 2
        first_call, second_call = mock_send.call_args_list

        slack_payload = first_call.args[1]
        assert "text" in slack_payload

        generic_payload = second_call.args[1]
        assert "event" in generic_payload