Compare commits

..

1 Commits

Author SHA1 Message Date
agent-company 5c25a0f589 fix: enforce max_length=128 and validate GET /analyze/batch filter
Closes leeworks-agents/SPARC#1685

- Increase CompanyName max_length from 100 to 128 everywhere (Pydantic
  type, Path constraints, and the inline Query on analyze/patent).
- Add _COMPANY_NAME_FILTER_QUERY reusable Query annotation and apply it
  to the optional company_name filter on GET /analyze/batch so it is
  validated with the same rules as all other endpoints.
- Update tests: rename test_over_100_chars_rejected → 128, add
  test_exactly_128_chars_accepted at the new boundary, fix batch
  too-long test to use 129 chars, update valid-name parametrize to use
  "A"*128, and add five new tests covering GET /analyze/batch filter
  validation (special chars, too-short, too-long, valid, omitted).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 15:18:09 +00:00
6 changed files with 66 additions and 463 deletions
+25 -18
View File
@@ -36,16 +36,28 @@ from SPARC.auth import (
) )
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult
# Validated company name type: 2-100 chars, alphanumeric + spaces/hyphens/ampersands/periods only. # Validated company name type: 2-128 chars, alphanumeric + spaces/hyphens/ampersands/periods only.
CompanyName = Annotated[ CompanyName = Annotated[
str, str,
StringConstraints( StringConstraints(
min_length=2, min_length=2,
max_length=100, max_length=128,
pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$", pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$",
), ),
] ]
# Reusable Query constraint for optional company_name filter parameters.
_COMPANY_NAME_FILTER_QUERY = Query(
default=None,
min_length=2,
max_length=128,
pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$",
description=(
"Company name filter (2-128 chars; alphanumeric, spaces, hyphens, "
"periods, and ampersands only)"
),
)
# Pydantic models for API # Pydantic models for API
class CompanyAnalysisResponse(BaseModel): class CompanyAnalysisResponse(BaseModel):
@@ -224,16 +236,11 @@ async def lifespan(app: FastAPI):
import logging import logging
logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale) logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale)
_db.close() _db.close()
# Start webhook background worker
from SPARC.task_queue import start_worker as start_webhook_worker
from SPARC.task_queue import stop_worker as stop_webhook_worker
start_webhook_worker()
# Start scheduled analysis if tracked companies are configured # Start scheduled analysis if tracked companies are configured
from SPARC.scheduler import start_scheduler from SPARC.scheduler import start_scheduler
start_scheduler() start_scheduler()
yield yield
# Cleanup # Cleanup
stop_webhook_worker()
_analyzer = None _analyzer = None
close_db_client() close_db_client()
@@ -494,7 +501,7 @@ async def add_tracked_company(
@app.delete("/admin/tracked/{company_name}", tags=["Admin"]) @app.delete("/admin/tracked/{company_name}", tags=["Admin"])
async def remove_tracked_company( async def remove_tracked_company(
company_name: Annotated[str, Path(min_length=2, max_length=100, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$")], company_name: Annotated[str, Path(min_length=2, max_length=128, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$")],
_: UserResponse = Depends(get_current_admin), _: UserResponse = Depends(get_current_admin),
): ):
"""Remove a company from the tracked list (admin only).""" """Remove a company from the tracked list (admin only)."""
@@ -682,7 +689,7 @@ async def get_analytics_trends(
@app.get("/export/{company_name}", tags=["Export"]) @app.get("/export/{company_name}", tags=["Export"])
async def export_company_csv( async def export_company_csv(
company_name: Annotated[str, Path(min_length=2, max_length=100, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$")], company_name: Annotated[str, Path(min_length=2, max_length=128, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$")],
_: UserResponse = Depends(get_current_user), _: UserResponse = Depends(get_current_user),
): ):
"""Export analysis results for a company as a CSV file. """Export analysis results for a company as a CSV file.
@@ -734,7 +741,7 @@ async def export_company_csv(
@app.get("/export/{company_name}/pdf", tags=["Export"]) @app.get("/export/{company_name}/pdf", tags=["Export"])
async def export_company_pdf( async def export_company_pdf(
company_name: Annotated[str, Path(min_length=2, max_length=100, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$")], company_name: Annotated[str, Path(min_length=2, max_length=128, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$")],
_: UserResponse = Depends(get_current_user), _: UserResponse = Depends(get_current_user),
): ):
"""Export analysis results for a company as a formatted PDF report. """Export analysis results for a company as a formatted PDF report.
@@ -908,7 +915,7 @@ async def health_check():
tags=["Analysis"], tags=["Analysis"],
) )
async def analyze_company( async def analyze_company(
company_name: Annotated[str, Path(min_length=2, max_length=100, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$")], company_name: Annotated[str, Path(min_length=2, max_length=128, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$")],
model: str | None = Query(default=None, description="LLM model to use (e.g. 'openai/gpt-4o'). Defaults to server config."), model: str | None = Query(default=None, description="LLM model to use (e.g. 'openai/gpt-4o'). Defaults to server config."),
_: UserResponse = Depends(get_current_user), _: UserResponse = Depends(get_current_user),
): ):
@@ -938,7 +945,7 @@ async def analyze_company(
) )
async def analyze_single_patent( async def analyze_single_patent(
patent_id: str, patent_id: str,
company_name: Annotated[str, Query(min_length=2, max_length=100, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$", description="Company name for analysis context")], company_name: Annotated[str, Query(min_length=2, max_length=128, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$", description="Company name for analysis context")],
_: UserResponse = Depends(get_current_user), _: UserResponse = Depends(get_current_user),
): ):
"""Analyze a single patent by its publication ID. """Analyze a single patent by its publication ID.
@@ -972,7 +979,7 @@ async def analyze_single_patent(
async def list_analysis_results( async def list_analysis_results(
company_name: Annotated[ company_name: Annotated[
str | None, str | None,
Query(description="Filter results by company name"), _COMPANY_NAME_FILTER_QUERY,
] = None, ] = None,
limit: Annotated[int, Query(ge=1, le=200)] = 50, limit: Annotated[int, Query(ge=1, le=200)] = 50,
cursor: Annotated[ cursor: Annotated[
@@ -1109,9 +1116,9 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int, model: s
progress=100, progress=100,
result_json=_json.dumps(batch_response.model_dump(), default=str), result_json=_json.dumps(batch_response.model_dump(), default=str),
) )
# Fire webhook notification (non-blocking via task queue) # Fire webhook notification
from SPARC.webhooks import enqueue_job_completed from SPARC.webhooks import notify_job_completed
enqueue_job_completed( notify_job_completed(
job_id=job_id, job_id=job_id,
status="completed", status="completed",
total_companies=result.total_companies, total_companies=result.total_companies,
@@ -1120,8 +1127,8 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int, model: s
) )
except Exception as e: except Exception as e:
db.update_job(job_id, status="failed", error=str(e)) db.update_job(job_id, status="failed", error=str(e))
from SPARC.webhooks import enqueue_job_completed from SPARC.webhooks import notify_job_completed
enqueue_job_completed( notify_job_completed(
job_id=job_id, job_id=job_id,
status="failed", status="failed",
total_companies=len(companies), total_companies=len(companies),
-7
View File
@@ -71,13 +71,6 @@ def run_scheduled_analysis() -> None:
old_value=old_count, old_value=old_count,
new_value=new_count, new_value=new_count,
) )
# Fire non-blocking webhook notification
from SPARC.webhooks import enqueue_alert
enqueue_alert(
company_name=name,
alert_type="patent_count_change",
message=message,
)
elif new_count > 0: elif new_count > 0:
# First analysis -- record baseline # First analysis -- record baseline
logger.info("Baseline for %s: %d patents", name, new_count) logger.info("Baseline for %s: %d patents", name, new_count)
-113
View File
@@ -1,113 +0,0 @@
"""Lightweight in-process task queue for non-blocking webhook delivery.
Uses a daemon thread and a :class:`queue.Queue` so that the scheduler and
background jobs can enqueue webhook deliveries without blocking on HTTP
round-trips and retry backoff.
No external dependencies (Redis, etc.) are required.
"""
import logging
import queue
import threading
from dataclasses import dataclass
from typing import Any
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class WebhookTask:
"""A single webhook delivery request."""
url: str
payload: dict[str, Any]
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
_queue: queue.Queue[WebhookTask | None] = queue.Queue()
_worker_thread: threading.Thread | None = None
_started = threading.Event()
def _worker_loop() -> None:
"""Process webhook tasks until a ``None`` sentinel is received."""
import SPARC.webhooks as _webhooks # deferred to avoid circular import
logger.info("Webhook worker thread started")
_started.set()
while True:
task = _queue.get()
if task is None:
# Sentinel — shut down
logger.info("Webhook worker thread stopping")
_queue.task_done()
break
try:
# Look up dynamically so that tests can patch the function
_webhooks._send_with_retry(task.url, task.payload)
except Exception:
logger.exception("Unexpected error delivering webhook to %s", task.url)
finally:
_queue.task_done()
def start_worker() -> None:
"""Start the background worker thread (idempotent)."""
global _worker_thread
if _worker_thread is not None and _worker_thread.is_alive():
return
_started.clear()
_worker_thread = threading.Thread(target=_worker_loop, daemon=True, name="webhook-worker")
_worker_thread.start()
_started.wait() # block until the worker is actually running
logger.info("Webhook task queue ready")
def stop_worker(timeout: float = 5.0) -> None:
"""Send the stop sentinel and wait for the worker to finish.
Args:
timeout: Maximum seconds to wait for the worker thread to join.
"""
global _worker_thread
if _worker_thread is None or not _worker_thread.is_alive():
_worker_thread = None
return
_queue.put(None) # sentinel
_worker_thread.join(timeout=timeout)
_worker_thread = None
logger.info("Webhook task queue stopped")
def enqueue(task: WebhookTask) -> None:
"""Add a webhook delivery task to the queue.
If the worker has not been started the task is still accepted into the
queue and will be processed once :func:`start_worker` is called.
"""
_queue.put(task)
def queue_size() -> int:
"""Return the approximate number of pending tasks."""
return _queue.qsize()
def drain(timeout: float = 10.0) -> None:
"""Block until all currently-enqueued tasks have been processed.
Useful in tests and graceful shutdown to ensure pending deliveries
complete before the process exits.
Args:
timeout: Maximum seconds to wait.
"""
_queue.join()
+3 -58
View File
@@ -91,10 +91,9 @@ def _send_with_retry(url: str, payload: dict) -> bool:
def notify(event_type: str, data: dict[str, Any]) -> None: def notify(event_type: str, data: dict[str, Any]) -> None:
"""Fire all configured webhooks for an event (**blocking**). """Fire all configured webhooks for an event.
Safe to call even when no webhooks are configured (returns immediately). Safe to call even when no webhooks are configured (returns immediately).
For non-blocking delivery, use :func:`enqueue_notify` instead.
Args: Args:
event_type: Event identifier (e.g., "job_completed", "patent_alert") event_type: Event identifier (e.g., "job_completed", "patent_alert")
@@ -109,29 +108,6 @@ def notify(event_type: str, data: dict[str, Any]) -> None:
_send_with_retry(url, payload) _send_with_retry(url, payload)
def enqueue_notify(event_type: str, data: dict[str, Any]) -> None:
"""Enqueue webhook delivery for all configured URLs (non-blocking).
Returns immediately after placing tasks on the background queue.
The worker thread handles retry logic asynchronously.
Safe to call even when no webhooks are configured.
Args:
event_type: Event identifier (e.g., "job_completed", "patent_alert")
data: Event data to include in the payload
"""
if not WEBHOOK_URLS:
return
from SPARC.task_queue import WebhookTask, enqueue
for url in WEBHOOK_URLS:
slack = _is_slack_url(url)
payload = _build_payload(event_type, data, slack=slack)
enqueue(WebhookTask(url=url, payload=payload))
def notify_job_completed( def notify_job_completed(
job_id: str, job_id: str,
status: str, status: str,
@@ -139,7 +115,7 @@ def notify_job_completed(
successful: int, successful: int,
failed: int, failed: int,
) -> None: ) -> None:
"""Send notification when a batch job completes (blocking).""" """Send notification when a batch job completes."""
notify("job_completed", { notify("job_completed", {
"job_id": job_id, "job_id": job_id,
"status": status, "status": status,
@@ -150,45 +126,14 @@ def notify_job_completed(
}) })
def enqueue_job_completed(
job_id: str,
status: str,
total_companies: int,
successful: int,
failed: int,
) -> None:
"""Enqueue notification when a batch job completes (non-blocking)."""
enqueue_notify("job_completed", {
"job_id": job_id,
"status": status,
"total_companies": total_companies,
"successful": successful,
"failed": failed,
"summary": f"Batch job {job_id}: {successful}/{total_companies} succeeded",
})
def notify_alert( def notify_alert(
company_name: str, company_name: str,
alert_type: str, alert_type: str,
message: str, message: str,
) -> None: ) -> None:
"""Send notification for a tracked company alert (blocking).""" """Send notification for a tracked company alert."""
notify("patent_alert", { notify("patent_alert", {
"company_name": company_name, "company_name": company_name,
"alert_type": alert_type, "alert_type": alert_type,
"message": message, "message": message,
}) })
def enqueue_alert(
company_name: str,
alert_type: str,
message: str,
) -> None:
"""Enqueue notification for a tracked company alert (non-blocking)."""
enqueue_notify("patent_alert", {
"company_name": company_name,
"alert_type": alert_type,
"message": message,
})
+38 -5
View File
@@ -43,12 +43,18 @@ class TestCompanyNameValidation:
# --- Too long --- # --- Too long ---
def test_over_100_chars_rejected(self, client, mock_analyzer): def test_over_128_chars_rejected(self, client, mock_analyzer):
"""A company name longer than 100 characters should be rejected.""" """A company name longer than 128 characters should be rejected."""
long_name = "A" * 101 long_name = "A" * 129
response = client.get(f"/analyze/{long_name}") response = client.get(f"/analyze/{long_name}")
assert response.status_code == 422 assert response.status_code == 422
def test_exactly_128_chars_accepted(self, client, mock_analyzer):
"""A company name of exactly 128 characters should be accepted."""
max_name = "A" * 128
response = client.get(f"/analyze/{max_name}")
assert response.status_code != 422
# --- Special characters --- # --- Special characters ---
@pytest.mark.parametrize( @pytest.mark.parametrize(
@@ -95,7 +101,7 @@ class TestCompanyNameValidation:
"3M", "3M",
"21st Century Fox", "21st Century Fox",
"ab", # minimum length "ab", # minimum length
"A" * 100, # maximum length "A" * 128, # maximum length
], ],
) )
def test_valid_names_accepted(self, client, mock_analyzer, valid_name): def test_valid_names_accepted(self, client, mock_analyzer, valid_name):
@@ -118,7 +124,7 @@ class TestCompanyNameValidation:
"""Batch endpoint should reject company names that are too long.""" """Batch endpoint should reject company names that are too long."""
response = client.post( response = client.post(
"/analyze/batch", "/analyze/batch",
json={"companies": ["A" * 101]}, json={"companies": ["A" * 129]},
) )
assert response.status_code == 422 assert response.status_code == 422
@@ -155,3 +161,30 @@ class TestCompanyNameValidation:
json={"companies": ["-nvidia"]}, json={"companies": ["-nvidia"]},
) )
assert response.status_code == 422 assert response.status_code == 422
# --- GET /analyze/batch company_name filter validation ---
def test_batch_filter_special_chars_rejected(self, client, mock_analyzer):
"""GET /analyze/batch company_name filter rejects disallowed chars."""
response = client.get("/analyze/batch", params={"company_name": "nvidia!"})
assert response.status_code == 422
def test_batch_filter_too_short_rejected(self, client, mock_analyzer):
"""GET /analyze/batch company_name filter rejects names under 2 chars."""
response = client.get("/analyze/batch", params={"company_name": "X"})
assert response.status_code == 422
def test_batch_filter_too_long_rejected(self, client, mock_analyzer):
"""GET /analyze/batch company_name filter rejects names over 128 chars."""
response = client.get("/analyze/batch", params={"company_name": "A" * 129})
assert response.status_code == 422
def test_batch_filter_valid_name_accepted(self, client, mock_analyzer):
"""GET /analyze/batch company_name filter accepts a valid name."""
response = client.get("/analyze/batch", params={"company_name": "nvidia"})
assert response.status_code != 422
def test_batch_filter_omitted_accepted(self, client, mock_analyzer):
"""GET /analyze/batch without company_name filter should work fine."""
response = client.get("/analyze/batch")
assert response.status_code != 422
-262
View File
@@ -1,262 +0,0 @@
"""Tests for the webhook background task queue.
Covers:
- Worker lifecycle (start / stop / idempotent start)
- Tasks are processed asynchronously by the worker
- Retry logic is preserved (executed inside the worker thread)
- enqueue_notify / enqueue_job_completed / enqueue_alert non-blocking helpers
- Integration: queued webhook task is eventually delivered (mocked HTTP)
"""
import threading
import time
from unittest.mock import MagicMock, call, patch
import pytest
from SPARC.task_queue import (
WebhookTask,
drain,
enqueue,
queue_size,
start_worker,
stop_worker,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _worker_lifecycle():
"""Start the worker before each test and stop it after."""
start_worker()
yield
stop_worker(timeout=3)
# ---------------------------------------------------------------------------
# Worker lifecycle
# ---------------------------------------------------------------------------
class TestWorkerLifecycle:
def test_start_is_idempotent(self):
"""Calling start_worker() twice does not create a second thread."""
import SPARC.task_queue as tq
first = tq._worker_thread
start_worker()
assert tq._worker_thread is first
def test_stop_worker_gracefully(self):
"""stop_worker joins the thread cleanly."""
import SPARC.task_queue as tq
assert tq._worker_thread is not None
stop_worker(timeout=3)
assert tq._worker_thread is None
# ---------------------------------------------------------------------------
# Task processing
# ---------------------------------------------------------------------------
class TestTaskProcessing:
@patch("SPARC.webhooks._send_with_retry")
def test_enqueued_task_is_delivered(self, mock_send):
"""A task put on the queue is eventually processed by the worker."""
mock_send.return_value = True
task = WebhookTask(url="https://example.com/hook", payload={"event": "test"})
enqueue(task)
drain(timeout=5)
mock_send.assert_called_once_with("https://example.com/hook", {"event": "test"})
@patch("SPARC.webhooks._send_with_retry")
def test_multiple_tasks_processed_in_order(self, mock_send):
"""Tasks are processed FIFO."""
mock_send.return_value = True
for i in range(3):
enqueue(WebhookTask(url=f"https://example.com/{i}", payload={"n": i}))
drain(timeout=5)
assert mock_send.call_count == 3
urls = [c[0][0] for c in mock_send.call_args_list]
assert urls == [
"https://example.com/0",
"https://example.com/1",
"https://example.com/2",
]
@patch("SPARC.webhooks._send_with_retry")
def test_enqueue_returns_immediately(self, mock_send):
"""enqueue() does not block even if the worker is slow."""
event = threading.Event()
def slow_send(url, payload):
event.wait(timeout=5)
return True
mock_send.side_effect = slow_send
start = time.monotonic()
enqueue(WebhookTask(url="https://slow.example.com", payload={}))
elapsed = time.monotonic() - start
# enqueue should return in well under 1 second
assert elapsed < 0.5
# Let the worker finish
event.set()
drain(timeout=5)
@patch("SPARC.webhooks._send_with_retry", side_effect=RuntimeError("boom"))
def test_worker_survives_unexpected_error(self, mock_send):
"""An unexpected exception in delivery does not kill the worker."""
enqueue(WebhookTask(url="https://example.com/bad", payload={}))
drain(timeout=5)
# Worker is still alive; enqueue another task
mock_send.side_effect = None
mock_send.return_value = True
enqueue(WebhookTask(url="https://example.com/good", payload={"ok": True}))
drain(timeout=5)
assert mock_send.call_count == 2
# ---------------------------------------------------------------------------
# Retry logic preserved in worker context
# ---------------------------------------------------------------------------
class TestRetryInWorker:
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_retry_logic_runs_inside_worker(self, mock_post, mock_sleep):
"""The worker thread uses _send_with_retry, which retries on failure."""
mock_post.side_effect = [
MagicMock(status_code=500),
MagicMock(status_code=200),
]
enqueue(WebhookTask(
url="https://example.com/retry",
payload={"event": "test"},
))
drain(timeout=10)
assert mock_post.call_count == 2
mock_sleep.assert_called_once()
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_all_retries_exhausted_in_worker(self, mock_post, mock_sleep):
"""Worker handles permanent failure gracefully."""
mock_post.return_value = MagicMock(status_code=500)
enqueue(WebhookTask(
url="https://example.com/fail",
payload={"event": "test"},
))
drain(timeout=10)
from SPARC.webhooks import MAX_RETRIES
assert mock_post.call_count == MAX_RETRIES
# ---------------------------------------------------------------------------
# Integration: enqueue_notify and convenience helpers
# ---------------------------------------------------------------------------
class TestEnqueueHelpers:
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
def test_enqueue_notify_delivers_via_worker(self, mock_send):
"""enqueue_notify puts a task on the queue and the worker delivers it."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_notify
enqueue_notify("test_event", {"key": "val"})
drain(timeout=5)
mock_send.assert_called_once()
url, payload = mock_send.call_args[0]
assert url == "https://example.com/hook"
assert payload["event"] == "test_event"
assert payload["key"] == "val"
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
def test_enqueue_job_completed(self, mock_send):
"""enqueue_job_completed sends job completion data via the queue."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_job_completed
enqueue_job_completed(
job_id="job-1",
status="completed",
total_companies=5,
successful=4,
failed=1,
)
drain(timeout=5)
mock_send.assert_called_once()
payload = mock_send.call_args[0][1]
assert payload["event"] == "job_completed"
assert payload["job_id"] == "job-1"
assert payload["successful"] == 4
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
def test_enqueue_alert(self, mock_send):
"""enqueue_alert sends alert data via the queue."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_alert
enqueue_alert(
company_name="NVIDIA",
alert_type="patent_count_change",
message="Patent count increased by 30%",
)
drain(timeout=5)
mock_send.assert_called_once()
payload = mock_send.call_args[0][1]
assert payload["event"] == "patent_alert"
assert payload["company_name"] == "NVIDIA"
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", [])
def test_enqueue_notify_noop_when_no_urls(self, mock_send):
"""enqueue_notify is a no-op when WEBHOOK_URLS is empty."""
from SPARC.webhooks import enqueue_notify
enqueue_notify("test_event", {"key": "val"})
drain(timeout=2)
mock_send.assert_not_called()
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", [
"https://hooks.slack.com/services/T00/B00/xxx",
"https://example.com/generic",
])
def test_enqueue_notify_slack_formatting(self, mock_send):
"""Slack URLs get Slack-formatted payloads even via the queue."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_notify
enqueue_notify("test_event", {"key": "val"})
drain(timeout=5)
assert mock_send.call_count == 2
slack_payload = mock_send.call_args_list[0][0][1]
assert "text" in slack_payload
generic_payload = mock_send.call_args_list[1][0][1]
assert "event" in generic_payload