Compare commits

..

24 Commits

Author SHA1 Message Date
agent-company ab3964b18d Move webhook delivery to background task queue
Introduce a lightweight in-process task queue (thread + queue.Queue) so
that webhook HTTP delivery no longer blocks the scheduler or batch-job
background tasks.  The worker thread preserves the existing exponential-
backoff retry logic from _send_with_retry.

- Add SPARC/task_queue.py: WebhookTask, start/stop worker, enqueue, drain
- Add enqueue_notify / enqueue_job_completed / enqueue_alert to webhooks.py
- Update api.py lifespan to start/stop the webhook worker
- Update _run_batch_job to use enqueue_job_completed (non-blocking)
- Update scheduler to fire enqueue_alert on patent count changes
- Add 13 tests covering worker lifecycle, async delivery, retry in worker
  context, and integration via enqueue helpers
- All 22 existing webhook tests continue to pass unchanged

Closes leeworks-agents/SPARC#1676

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-19 15:22:21 +00:00
AI-Manager 313800215c Merge pull request 'Add rate limit stats to admin panel' (#1682) from feature/1675-rate-limit-admin into main
Merge PR #1682
2026-05-19 00:12:56 +00:00
AI-Manager 222f29deb1 Merge pull request 'Add cursor-based pagination to /analyze/batch and /jobs' (#1681) from feature/1669-cursor-pagination into main
Merge PR #1681
2026-05-19 00:12:48 +00:00
AI-Manager e6d95bbf57 Merge pull request 'Add stricter input validation for company names' (#1680) from feature/1670-company-name-validation into main
Merge PR #1680
2026-05-19 00:12:42 +00:00
AI-Manager 68484ef4b1 Merge pull request 'Update ROADMAP.md: mark completed P1 and P2 items as done' (#1679) from feature/1678-update-roadmap into main
Merge PR #1679
2026-05-19 00:12:34 +00:00
agent-company a0cb9a5773 Add rate limit status and usage statistics to admin panel
Add GET /admin/rate-limits endpoint (admin-only) that returns current
rate limit configuration and request statistics for all rate-limited
endpoints (/auth/register and /auth/login). Tracks total requests and
rejection counts via in-memory counters.

Includes tests for admin access, non-admin rejection, empty state,
request tracking, and configuration display.

Closes leeworks-agents/SPARC#1675

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-18 21:53:01 +00:00
agent-company 857b3444df Add cursor-based pagination to GET /analyze/batch and update /jobs defaults
Add a new GET /analyze/batch endpoint that returns stored analysis results
with cursor-based pagination (default limit 50, max 200). Also update the
existing /jobs endpoint defaults from limit=10/max=100 to limit=50/max=200
for consistency.

The database layer gains a list_analyses() method with cursor support using
(timestamp, id) ordering, matching the existing list_jobs() pattern.

Includes tests for pagination behavior, boundary limits, cursor forwarding,
company name filtering, and empty result sets.

Closes leeworks-agents/SPARC#1669

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-18 21:49:22 +00:00
agent-company a95129904e Add stricter input validation for company names on analysis endpoints
Add a CompanyName validated type enforcing 2-100 character length and
allowing only alphanumeric characters, spaces, hyphens, ampersands, and
periods. Applied to all endpoints accepting company names: /analyze,
/analyze/patent, /analyze/batch, /admin/tracked, and /export.

Includes unit tests covering too-short, too-long, special character,
leading-character, and valid edge cases for both single and batch
endpoints.

Closes leeworks-agents/SPARC#1670

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-18 21:38:44 +00:00
agent-company 7c6eed8d72 Update ROADMAP.md to mark completed P1 and P2 items as done
Move seven completed items from the P1 and P2 sections into the
Completed section: in-memory jobs persistence, export endpoint tests,
tracked company admin tests, webhook integration tests, S3 storage
tests, auto-download path tests, and scheduler DatabaseClient refactor.

The P2 section now only lists the two genuinely open items: cursor-based
pagination (Issue #1669) and request validation (Issue #1670).

Closes leeworks-agents/SPARC#1678

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-18 21:29:14 +00:00
AI-Manager 4c411e1e0b Merge pull request 'Add tests for tracked company admin endpoints and scheduler' (#1667) from feature/1656-tracked-company-admin-tests into main
Merge: Add tests for tracked company admin endpoints and scheduler integration

Closes #1656
2026-04-20 23:05:57 +00:00
agent-company 6165d66760 Fix scheduler tests to use get_db_client after scheduler refactor
The scheduler was refactored (PR #1665) to use the pooled
get_db_client() from SPARC.auth instead of creating its own
DatabaseClient. Update test mocks accordingly and remove the
db.close() assertion since the pooled client is no longer closed
by the scheduler.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-20 23:05:42 +00:00
agent-company e610dea9a9 Merge remote-tracking branch 'origin/main' into feature/1656-tracked-company-admin-tests 2026-04-20 23:04:59 +00:00
AI-Manager b5f10d2032 Merge pull request 'Add API tests for export endpoints (CSV and PDF)' (#1668) from feature/1655-export-endpoint-tests into main
Merge: Add API tests for export endpoints (CSV and PDF)

Closes #1655
2026-04-20 23:04:23 +00:00
AI-Manager b5d8b0b344 Merge pull request 'Add webhook integration tests for retry logic and payloads' (#1666) from feature/1657-webhook-integration-tests into main
Merge: Add webhook integration tests for retry logic and payloads

Closes #1657
2026-04-20 23:04:19 +00:00
AI-Manager 1170356b2b Merge pull request 'Add S3/MinIO storage backend tests for storage.py' (#1663) from feature/1660-s3-storage-tests into main
Merge: Add S3/MinIO storage backend tests for storage.py

Closes #1660
2026-04-20 23:04:05 +00:00
AI-Manager 84341b3ec4 Merge pull request 'Add test coverage for analyze_single_patent auto-download path' (#1662) from feature/1661-analyze-single-patent-tests into main
Merge: Add test coverage for analyze_single_patent auto-download path

Closes #1661
2026-04-20 23:04:00 +00:00
AI-Manager 0639fb3649 Merge pull request 'Update ROADMAP.md to reflect completed work and add next-horizon items' (#1664) from feature/1659-update-roadmap into main
Merge: Update ROADMAP.md to reflect completed work and add next-horizon items

Closes #1659
2026-04-20 23:03:56 +00:00
AI-Manager b032bf0c90 Merge pull request 'Refactor scheduler.py to use pooled DatabaseClient' (#1665) from feature/1658-scheduler-pooled-db into main
Merge: Refactor scheduler.py to use pooled DatabaseClient

Closes #1658
2026-04-20 23:03:43 +00:00
agent-company a2f81b0396 Add test coverage for analyze_single_patent auto-download path
7 test cases covering:
- PDF on disk analyzed directly (no download)
- Auto-download from cached metadata link when PDF missing
- FileNotFoundError when no cached link available
- Cached patent without pdf_link raises FileNotFoundError
- Analysis pipeline failure returns error string gracefully
- Model override parameter forwarded to LLM
- FileNotFoundError during parsing re-raised (not swallowed)

Closes leeworks-agents/SPARC#1661

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-20 19:21:53 +00:00
agent-company 63ca18e9bf Add S3/MinIO storage backend tests for storage.py
21 test cases covering:
- S3StorageBackend: read, write, exists, path_for with mocked boto3
- Error handling: NoSuchKey exception, generic 404, non-404 re-raise
- Bucket auto-creation on init and graceful handling of creation failure
- Constructor credential/endpoint passthrough
- LocalStorageBackend: round-trip read/write, missing file, empty file
- get_storage_backend() factory: local/s3 selection, case-insensitivity

Closes leeworks-agents/SPARC#1660

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-20 19:20:06 +00:00
agent-company 4cb1a6ed21 Update ROADMAP.md to reflect completed work and add next-horizon items
Move all completed items (security hardening, structured logging, dark mode,
export, webhooks, scheduled analysis, multi-model, trend charts, CI, etc.)
into a new Completed section. Reorganize remaining P1/P2/P3 items to reflect
current priorities. Add new next-horizon items: historical diffing, patent
classification tagging, user API keys, batch export, and multi-tenant support.

Closes leeworks-agents/SPARC#1659

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-20 19:18:22 +00:00
agent-company 2eabb1d704 Add webhook integration tests covering retry logic and Slack/Discord payloads
22 test cases covering:
- Slack/Discord URL detection
- Generic vs Slack payload formatting
- Exponential backoff retry logic with network/timeout error handling
- Multi-URL dispatch with format auto-detection
- notify_job_completed() and notify_alert() helpers

Closes leeworks-agents/SPARC#1657

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-20 19:15:34 +00:00
agent-company fc942b2aa4 Add tests for tracked company admin endpoints and scheduler integration
20 test cases covering:
- GET/POST/DELETE /admin/tracked endpoints with admin auth enforcement
- GET /admin/alerts with limit parameter and auth
- scheduler.run_scheduled_analysis() for multi-company analysis, alert
  triggering on significant patent count changes, graceful failure handling

Closes leeworks-agents/SPARC#1656

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-20 19:14:29 +00:00
agent-company 44a162056d Add API tests for export endpoints (CSV and PDF)
Covers GET /export/{company_name} and /export/{company_name}/pdf with
13 test cases: successful export, 404 on missing data, auth enforcement,
filename sanitization, XML-special character handling in PDF, and
multi-row output validation.

Closes leeworks-agents/SPARC#1655

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-20 19:11:42 +00:00
15 changed files with 2561 additions and 103 deletions
+120 -85
View File
@@ -7,86 +7,124 @@ Semiconductor Patent & Analytics Report Core -- development priorities.
SPARC is a patent analysis platform with a working end-to-end pipeline:
Python/FastAPI backend, React/TypeScript frontend, PostgreSQL for persistence
and caching, Docker Compose for local development, and Gitea Actions CI/CD for
image builds. Core features (patent retrieval via SerpAPI, PDF parsing, LLM
analysis via OpenRouter/Claude, batch processing, JWT authentication, analytics
dashboard) are all implemented and functional.
image builds and testing. Core features include patent retrieval via SerpAPI,
PDF parsing, LLM analysis via OpenRouter (multi-model: Claude, GPT-4o, Gemini,
Llama), batch processing, JWT authentication, analytics dashboard with patent
trend charts, scheduled recurring analysis with alerting, webhook notifications
(Slack/Discord), CSV and PDF export, S3/MinIO storage, side-by-side company
comparison, and dark mode.
---
## Completed
Items that have been implemented and merged into main.
### Security hardening
- ~~Rotate default JWT secret.~~ Startup check refuses to start with the
default secret in non-development environments.
- ~~CORS allow-origins are hardcoded.~~ Allowed origins are now configurable
via environment variable.
- ~~Database credentials in docker-compose.yml.~~ Compose references `.env`
for sensitive values.
### Error handling and resilience
- ~~`get_db_client()` creates a new `DatabaseClient` on every call.~~ Refactored
to a shared pooled singleton initialized at startup.
- ~~No rate limiting on auth endpoints.~~ Rate limiting middleware added to
`/auth/login` and `/auth/register`.
### Test coverage
- ~~API tests bypass authentication.~~ JWT auth integration tests added (33
cases covering registration, login, protected routes, token refresh, and
admin-only endpoints).
- ~~No test stage in CI.~~ Gitea Actions workflow now runs `pytest` and gates
the build.
- ~~No linting or type checking in CI.~~ `ruff` (Python) and `tsc --noEmit`
(TypeScript) added to CI pipeline.
### Backend
- ~~Add structured logging.~~ Python `logging` module used throughout.
- ~~Make LLM model configurable.~~ `MODEL` environment variable accepted;
multi-model support with per-analysis selection (GPT-4o, Gemini, Claude,
Llama).
- ~~SERP cache TTL hardcoded.~~ `SERP_CACHE_TTL_HOURS` exposed as env var.
- ~~Patent PDF storage.~~ S3/MinIO object storage backend added alongside
local filesystem. Volume mount requirement documented.
- ~~`analyze_single_patent` assumes local file.~~ Auto-download from cached
metadata link integrated.
- ~~`Patent.patent_id` typed as `int`.~~ Fixed to `str`.
### Frontend
- ~~No loading/error states.~~ Skeleton loaders and error states added to
Batch and Analytics pages.
- ~~No dark mode.~~ Full dark mode support with theme-aware chart colors.
- ~~Missing lockfile.~~ `package-lock.json` committed.
### Features (formerly P3)
- ~~Export analysis reports.~~ CSV and PDF export endpoints implemented.
- ~~Comparison view.~~ Side-by-side company patent portfolio comparison added.
- ~~Scheduled/recurring analysis.~~ APScheduler-based periodic re-analysis
with configurable interval and change-threshold alerting.
- ~~Webhook/notification support.~~ Slack, Discord, and generic HTTP POST
webhooks with retry logic.
- ~~Multi-model support.~~ Model picker in Analysis and Batch pages; backend
allow-list validation.
- ~~Patent trend charts.~~ Filing frequency and category distribution
visualizations added to Analytics page.
- ~~OpenAPI client generation.~~ TypeScript API client auto-generated from
FastAPI spec with CI freshness check.
### Resilience
- ~~`_jobs` dict is in-memory only.~~ Database-backed job persistence
implemented using `db.list_jobs()` and `mark_stale_jobs_failed()`. The
in-memory `_jobs` dict has been removed.
### Test coverage (P1/P2)
- ~~Export endpoint tests.~~ Tests added for CSV and PDF export endpoints.
- ~~Tracked company admin endpoint tests.~~ Tests added for `/admin/tracked`
CRUD endpoints and scheduler integration.
- ~~Webhook integration tests.~~ Tests added for retry logic, Slack/Discord
payload format, and multi-URL dispatch.
- ~~S3/MinIO storage backend tests.~~ Unit tests added for the S3 backend
(read, write, exists, delete, error handling).
- ~~`analyze_single_patent` auto-download path tests.~~ Tests added for the
auto-download fallback (cache lookup, PDF download, FileNotFoundError).
### Code quality
- ~~Scheduler creates its own DatabaseClient.~~ Refactored to use the
application-level pooled `get_db_client()`.
---
## P1 -- High Priority
These items address correctness, security, and reliability gaps that should be
resolved before broader production use.
### Security hardening
- **Rotate default JWT secret.** `auth.py` ships a fallback
`sparc-secret-key-change-in-production` that will be used if `JWT_SECRET` is
unset. Add a startup check that refuses to start with the default secret in
non-development environments.
- **CORS allow-origins are hardcoded.** `api.py` only permits
`localhost:3000` and `localhost:5173`. Make the allowed origins configurable
via environment variable so the dashboard works when deployed behind a real
domain.
- **Database credentials in docker-compose.yml.** The compose file embeds
`postgres:postgres` in plain text. Reference a `.env` file or Docker secrets
instead.
### Error handling and resilience
- **`get_db_client()` in `auth.py` creates a new `DatabaseClient` on every
call.** This bypasses the connection pool and can exhaust database
connections under load. Refactor to share a single pooled client.
- **`_jobs` dict is in-memory only.** Job state is lost on API restart. Persist
job status in PostgreSQL or Redis so async batch results survive restarts.
- **No rate limiting on auth endpoints.** `/auth/login` and `/auth/register`
are unprotected against brute-force or abuse. Add rate limiting middleware.
### Test coverage for auth and admin
- The existing API tests (`tests/test_api.py`) bypass authentication entirely.
Add tests that exercise the JWT flow: registration, login, protected-route
access, token refresh, and admin-only endpoints.
No outstanding P1 items. All previously listed items have been completed and
moved to the Completed section above.
---
## P2 -- Medium Priority
Improvements to usability, performance, and developer experience.
Improvements to the API surface.
### Backend
### API improvements
- **Add structured logging.** Replace `print()` calls throughout `analyzer.py`,
`serp_api.py`, and `llm.py` with Python `logging` so log levels and
formatting are consistent.
- **Make LLM model configurable.** `llm.py` hardcodes
`anthropic/claude-3.5-sonnet`. Accept a `MODEL` environment variable to allow
switching models without code changes.
- **SERP cache TTL is hardcoded to 24 hours.** Expose `SERP_CACHE_TTL_HOURS`
as an environment variable in `config.py`.
- **Patent PDF storage.** PDFs are saved to a local `patents/` directory. For
containerized deployments, consider object storage (S3/MinIO) or at minimum
document the volume mount requirement more prominently.
- **`analyze_single_patent` assumes local file path.** The method constructs
`patents/{patent_id}.pdf` and reads from disk, but does not download the PDF
first. Either integrate the download step or document the prerequisite.
- **`Patent.patent_id` typed as `int` in `types.py` but used as `str`
everywhere.** Fix the type annotation to `str`.
### Frontend
- **No loading/error states on several pages.** The Batch and Analytics pages
would benefit from skeleton loaders and user-friendly error messages.
- **No dark mode.** Tailwind is configured but no dark variant is applied.
- **Missing `package-lock.json` or `pnpm-lock.yaml`.** The frontend has no
lockfile committed, leading to non-reproducible builds.
### CI/CD
- **No test stage in the Gitea Actions workflow.** `build.yaml` builds and
pushes images but never runs `pytest`. Add a test job that gates the build.
- **No linting or type checking.** Add `ruff` (Python) and `tsc --noEmit`
(TypeScript) to CI.
- **API pagination.** The `/analyze/batch` endpoint needs cursor-based
pagination for large result sets. The `/jobs` endpoint already has cursor
pagination. *(Issue #1669)*
- **Request validation improvements.** Add stricter input validation for
company names (disallow special characters, enforce length limits).
*(Issue #1670)*
---
@@ -94,23 +132,20 @@ Improvements to usability, performance, and developer experience.
Lower-urgency enhancements and future features.
- **Export analysis reports.** Allow users to download analysis results as PDF
or CSV from the dashboard.
- **Comparison view.** Side-by-side comparison of two companies' patent
portfolios.
- **Scheduled/recurring analysis.** Periodically re-analyze tracked companies
and alert on significant changes.
- **Webhook/notification support.** Send alerts (Slack, Discord, email) when
batch jobs complete or when a company's innovation score changes
significantly.
- **Multi-model support.** Let users choose between LLM providers per analysis
(e.g., GPT-4o, Gemini, Claude) and compare outputs.
- **Patent trend charts.** Visualize patent filing frequency and technology
category distribution over time in the Analytics page.
- **API pagination.** The `/analyze/batch` and `/jobs` endpoints could benefit
from cursor-based pagination for large result sets.
- **OpenAPI client generation.** Auto-generate the TypeScript API client from
the FastAPI OpenAPI spec to keep frontend types in sync.
- **Historical analysis diffing.** Show what changed between two analysis runs
for the same company, highlighting new patents and score shifts.
- **Patent classification tagging.** Automatically tag patents by technology
domain (AI, semiconductors, materials science) using LLM classification.
- **User-level API keys.** Allow users to generate personal API keys for
programmatic access without JWT token refresh.
- **Batch export.** Export analysis results for multiple companies at once as
a ZIP archive.
- **Rate limiting dashboard.** Surface rate limit status and usage statistics
in the admin panel.
- **Async webhook delivery.** Move webhook delivery to a background task queue
(e.g., Celery, arq) to avoid blocking the scheduler.
- **Multi-tenant support.** Scope analysis results and tracked companies per
user or organization.
---
+159 -15
View File
@@ -12,10 +12,10 @@ from typing import TYPE_CHECKING, Annotated, List
if TYPE_CHECKING:
from SPARC.database import DatabaseClient
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Query, Request
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Path, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, EmailStr, Field
from pydantic import BaseModel, EmailStr, Field, StringConstraints
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
@@ -36,6 +36,16 @@ from SPARC.auth import (
)
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult
# Validated company name type: 2-100 chars, alphanumeric + spaces/hyphens/ampersands/periods only.
CompanyName = Annotated[
str,
StringConstraints(
min_length=2,
max_length=100,
pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$",
),
]
# Pydantic models for API
class CompanyAnalysisResponse(BaseModel):
@@ -72,7 +82,7 @@ class CompanyAnalysisRequest(BaseModel):
class BatchAnalysisRequest(BaseModel):
"""Request model for batch company analysis."""
companies: list[str] = Field(
companies: list[CompanyName] = Field(
..., min_length=1, max_length=20, description="List of company names to analyze"
)
max_workers: int = Field(
@@ -96,6 +106,24 @@ class JobStatus(BaseModel):
error: str | None = None
class AnalysisRecord(BaseModel):
"""A single stored analysis result."""
id: int
company_name: str | None = None
analysis_type: str | None = None
model: str | None = None
response: str | None = None
timestamp: datetime | None = None
class PaginatedAnalysisResponse(BaseModel):
"""Paginated response for analysis result listings."""
items: list[AnalysisRecord]
next_cursor: str | None = None
class PaginatedJobsResponse(BaseModel):
"""Paginated response for job listings."""
@@ -196,11 +224,16 @@ async def lifespan(app: FastAPI):
import logging
logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale)
_db.close()
# Start webhook background worker
from SPARC.task_queue import start_worker as start_webhook_worker
from SPARC.task_queue import stop_worker as stop_webhook_worker
start_webhook_worker()
# Start scheduled analysis if tracked companies are configured
from SPARC.scheduler import start_scheduler
start_scheduler()
yield
# Cleanup
stop_webhook_worker()
_analyzer = None
close_db_client()
@@ -217,10 +250,37 @@ app = FastAPI(
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
# In-memory rate limit statistics
_rate_limit_stats: dict[str, dict] = {}
def _track_rate_limit_request(endpoint: str, ip: str, rejected: bool = False) -> None:
"""Record a request against a rate-limited endpoint."""
key = endpoint
if key not in _rate_limit_stats:
_rate_limit_stats[key] = {
"endpoint": endpoint,
"total_requests": 0,
"rejected_requests": 0,
"by_ip": {},
}
_rate_limit_stats[key]["total_requests"] += 1
if rejected:
_rate_limit_stats[key]["rejected_requests"] += 1
ip_stats = _rate_limit_stats[key].setdefault("by_ip", {})
if ip not in ip_stats:
ip_stats[ip] = {"total": 0, "rejected": 0}
ip_stats[ip]["total"] += 1
if rejected:
ip_stats[ip]["rejected"] += 1
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
"""Return 429 with Retry-After header when rate limit is exceeded."""
endpoint = request.url.path
ip = get_remote_address(request)
_track_rate_limit_request(endpoint, ip, rejected=True)
retry_after = getattr(exc, "retry_after", 60)
return JSONResponse(
status_code=429,
@@ -249,6 +309,7 @@ async def register(request: Request, body: RegisterRequest):
The first registered user automatically becomes an admin.
"""
_track_rate_limit_request("/auth/register", get_remote_address(request))
db = get_db_client()
# First user becomes admin
@@ -279,6 +340,7 @@ async def register(request: Request, body: RegisterRequest):
@limiter.limit("10/minute")
async def login(request: Request, body: LoginRequest):
"""Authenticate user and return JWT tokens."""
_track_rate_limit_request("/auth/login", get_remote_address(request))
db = get_db_client()
user = db.authenticate_user(body.email, body.password)
@@ -405,7 +467,7 @@ async def delete_user(
class TrackCompanyRequest(BaseModel):
"""Request to add a company to tracking."""
company_name: str = Field(..., min_length=1, max_length=255)
company_name: CompanyName = Field(...)
@app.get("/admin/tracked", tags=["Admin"])
@@ -432,7 +494,7 @@ async def add_tracked_company(
@app.delete("/admin/tracked/{company_name}", tags=["Admin"])
async def remove_tracked_company(
company_name: str,
company_name: Annotated[str, Path(min_length=2, max_length=100, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$")],
_: UserResponse = Depends(get_current_admin),
):
"""Remove a company from the tracked list (admin only)."""
@@ -443,6 +505,36 @@ async def remove_tracked_company(
return {"message": f"Stopped tracking {company_name}"}
@app.get("/admin/rate-limits", tags=["Admin"])
async def get_rate_limit_stats(
_: UserResponse = Depends(get_current_admin),
):
"""Get rate limit status and usage statistics (admin only).
Returns current rate limit configuration and request statistics
for all rate-limited endpoints.
Returns:
List of rate limit stats per endpoint with total/rejected counts
"""
rate_limits_config = {
"/auth/register": {"limit": "5/minute"},
"/auth/login": {"limit": "10/minute"},
}
results = []
for endpoint, conf in rate_limits_config.items():
stats = _rate_limit_stats.get(endpoint, {})
results.append({
"endpoint": endpoint,
"limit": conf["limit"],
"total_requests": stats.get("total_requests", 0),
"rejected_requests": stats.get("rejected_requests", 0),
})
return {"rate_limits": results}
@app.get("/admin/alerts", tags=["Admin"])
async def list_alerts(
limit: int = Query(default=50, ge=1, le=200),
@@ -590,7 +682,7 @@ async def get_analytics_trends(
@app.get("/export/{company_name}", tags=["Export"])
async def export_company_csv(
company_name: str,
company_name: Annotated[str, Path(min_length=2, max_length=100, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$")],
_: UserResponse = Depends(get_current_user),
):
"""Export analysis results for a company as a CSV file.
@@ -642,7 +734,7 @@ async def export_company_csv(
@app.get("/export/{company_name}/pdf", tags=["Export"])
async def export_company_pdf(
company_name: str,
company_name: Annotated[str, Path(min_length=2, max_length=100, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$")],
_: UserResponse = Depends(get_current_user),
):
"""Export analysis results for a company as a formatted PDF report.
@@ -816,7 +908,7 @@ async def health_check():
tags=["Analysis"],
)
async def analyze_company(
company_name: str,
company_name: Annotated[str, Path(min_length=2, max_length=100, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$")],
model: str | None = Query(default=None, description="LLM model to use (e.g. 'openai/gpt-4o'). Defaults to server config."),
_: UserResponse = Depends(get_current_user),
):
@@ -846,7 +938,7 @@ async def analyze_company(
)
async def analyze_single_patent(
patent_id: str,
company_name: str = Query(description="Company name for analysis context"),
company_name: Annotated[str, Query(min_length=2, max_length=100, pattern=r"^[a-zA-Z0-9][a-zA-Z0-9 \-&.]*$", description="Company name for analysis context")],
_: UserResponse = Depends(get_current_user),
):
"""Analyze a single patent by its publication ID.
@@ -872,6 +964,58 @@ async def analyze_single_patent(
raise HTTPException(status_code=404, detail=str(e))
@app.get(
"/analyze/batch",
response_model=PaginatedAnalysisResponse,
tags=["Analysis"],
)
async def list_analysis_results(
company_name: Annotated[
str | None,
Query(description="Filter results by company name"),
] = None,
limit: Annotated[int, Query(ge=1, le=200)] = 50,
cursor: Annotated[
str | None,
Query(description="Opaque cursor from a previous response's next_cursor field"),
] = None,
_: UserResponse = Depends(get_current_user),
):
"""List stored analysis results with cursor-based pagination.
Returns past analysis results ordered by timestamp descending. Use
``limit`` to control page size (default 50, max 200). The response
includes a ``next_cursor`` field; pass it back as the ``cursor`` query
parameter to fetch the next page. When ``next_cursor`` is ``null``,
there are no more results.
Args:
company_name: Optional filter by company name
limit: Maximum number of results to return (default 50, max 200)
cursor: Opaque pagination cursor from a previous response
Returns:
Paginated list of analysis results
"""
db = _get_job_db()
rows = db.list_analyses(company_name=company_name, limit=limit + 1, cursor=cursor)
has_next = len(rows) > limit
if has_next:
rows = rows[:limit]
items = [AnalysisRecord(**row) for row in rows]
next_cursor = None
if has_next and rows:
last = rows[-1]
ts = last["timestamp"]
ts_str = ts.isoformat() if hasattr(ts, "isoformat") else str(ts)
next_cursor = f"{ts_str}|{last['id']}"
return PaginatedAnalysisResponse(items=items, next_cursor=next_cursor)
@app.post(
"/analyze/batch",
response_model=BatchAnalysisResponse,
@@ -965,9 +1109,9 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int, model: s
progress=100,
result_json=_json.dumps(batch_response.model_dump(), default=str),
)
# Fire webhook notification
from SPARC.webhooks import notify_job_completed
notify_job_completed(
# Fire webhook notification (non-blocking via task queue)
from SPARC.webhooks import enqueue_job_completed
enqueue_job_completed(
job_id=job_id,
status="completed",
total_companies=result.total_companies,
@@ -976,8 +1120,8 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int, model: s
)
except Exception as e:
db.update_job(job_id, status="failed", error=str(e))
from SPARC.webhooks import notify_job_completed
notify_job_completed(
from SPARC.webhooks import enqueue_job_completed
enqueue_job_completed(
job_id=job_id,
status="failed",
total_companies=len(companies),
@@ -1047,7 +1191,7 @@ async def list_jobs(
str | None,
Query(description="Filter by status: pending, running, completed, failed"),
] = None,
limit: Annotated[int, Query(ge=1, le=100)] = 10,
limit: Annotated[int, Query(ge=1, le=200)] = 50,
cursor: Annotated[
str | None,
Query(description="Opaque cursor from a previous response's next_cursor field"),
+42
View File
@@ -371,6 +371,48 @@ class DatabaseClient:
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def list_analyses(
self,
company_name: Optional[str] = None,
limit: int = 50,
cursor: Optional[str] = None,
) -> List[Dict]:
"""List analysis results with cursor-based pagination.
Args:
company_name: Optional filter by company name.
limit: Maximum number of records to return.
cursor: Opaque cursor (``timestamp|id``) from a previous response.
Returns:
List of analysis dicts ordered by timestamp descending.
"""
conditions: list[str] = ["is_cached = FALSE"]
params: list = []
if company_name:
conditions.append("LOWER(company_name) = LOWER(%s)")
params.append(company_name)
if cursor:
try:
ts_str, cursor_id = cursor.rsplit("|", 1)
conditions.append("(timestamp, id) < (%s, %s)")
params.extend([ts_str, int(cursor_id)])
except (ValueError, TypeError):
pass # Ignore malformed cursors; return from start
query = "SELECT id, company_name, analysis_type, model, response, timestamp FROM llm_messages"
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY timestamp DESC, id DESC LIMIT %s"
params.append(limit)
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(query, params)
return [dict(row) for row in cur.fetchall()]
def get_analytics(self, days: int = 30) -> Dict:
"""Get analytics on message usage.
+7
View File
@@ -71,6 +71,13 @@ def run_scheduled_analysis() -> None:
old_value=old_count,
new_value=new_count,
)
# Fire non-blocking webhook notification
from SPARC.webhooks import enqueue_alert
enqueue_alert(
company_name=name,
alert_type="patent_count_change",
message=message,
)
elif new_count > 0:
# First analysis -- record baseline
logger.info("Baseline for %s: %d patents", name, new_count)
+113
View File
@@ -0,0 +1,113 @@
"""Lightweight in-process task queue for non-blocking webhook delivery.
Uses a daemon thread and a :class:`queue.Queue` so that the scheduler and
background jobs can enqueue webhook deliveries without blocking on HTTP
round-trips and retry backoff.
No external dependencies (Redis, etc.) are required.
"""
import logging
import queue
import threading
from dataclasses import dataclass
from typing import Any
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class WebhookTask:
"""A single webhook delivery request."""
url: str
payload: dict[str, Any]
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
_queue: queue.Queue[WebhookTask | None] = queue.Queue()
_worker_thread: threading.Thread | None = None
_started = threading.Event()
def _worker_loop() -> None:
"""Process webhook tasks until a ``None`` sentinel is received."""
import SPARC.webhooks as _webhooks # deferred to avoid circular import
logger.info("Webhook worker thread started")
_started.set()
while True:
task = _queue.get()
if task is None:
# Sentinel — shut down
logger.info("Webhook worker thread stopping")
_queue.task_done()
break
try:
# Look up dynamically so that tests can patch the function
_webhooks._send_with_retry(task.url, task.payload)
except Exception:
logger.exception("Unexpected error delivering webhook to %s", task.url)
finally:
_queue.task_done()
def start_worker() -> None:
"""Start the background worker thread (idempotent)."""
global _worker_thread
if _worker_thread is not None and _worker_thread.is_alive():
return
_started.clear()
_worker_thread = threading.Thread(target=_worker_loop, daemon=True, name="webhook-worker")
_worker_thread.start()
_started.wait() # block until the worker is actually running
logger.info("Webhook task queue ready")
def stop_worker(timeout: float = 5.0) -> None:
"""Send the stop sentinel and wait for the worker to finish.
Args:
timeout: Maximum seconds to wait for the worker thread to join.
"""
global _worker_thread
if _worker_thread is None or not _worker_thread.is_alive():
_worker_thread = None
return
_queue.put(None) # sentinel
_worker_thread.join(timeout=timeout)
_worker_thread = None
logger.info("Webhook task queue stopped")
def enqueue(task: WebhookTask) -> None:
"""Add a webhook delivery task to the queue.
If the worker has not been started the task is still accepted into the
queue and will be processed once :func:`start_worker` is called.
"""
_queue.put(task)
def queue_size() -> int:
"""Return the approximate number of pending tasks."""
return _queue.qsize()
def drain(timeout: float = 10.0) -> None:
"""Block until all currently-enqueued tasks have been processed.
Useful in tests and graceful shutdown to ensure pending deliveries
complete before the process exits.
Args:
timeout: Maximum seconds to wait.
"""
_queue.join()
+58 -3
View File
@@ -91,9 +91,10 @@ def _send_with_retry(url: str, payload: dict) -> bool:
def notify(event_type: str, data: dict[str, Any]) -> None:
"""Fire all configured webhooks for an event.
"""Fire all configured webhooks for an event (**blocking**).
Safe to call even when no webhooks are configured (returns immediately).
For non-blocking delivery, use :func:`enqueue_notify` instead.
Args:
event_type: Event identifier (e.g., "job_completed", "patent_alert")
@@ -108,6 +109,29 @@ def notify(event_type: str, data: dict[str, Any]) -> None:
_send_with_retry(url, payload)
def enqueue_notify(event_type: str, data: dict[str, Any]) -> None:
"""Enqueue webhook delivery for all configured URLs (non-blocking).
Returns immediately after placing tasks on the background queue.
The worker thread handles retry logic asynchronously.
Safe to call even when no webhooks are configured.
Args:
event_type: Event identifier (e.g., "job_completed", "patent_alert")
data: Event data to include in the payload
"""
if not WEBHOOK_URLS:
return
from SPARC.task_queue import WebhookTask, enqueue
for url in WEBHOOK_URLS:
slack = _is_slack_url(url)
payload = _build_payload(event_type, data, slack=slack)
enqueue(WebhookTask(url=url, payload=payload))
def notify_job_completed(
job_id: str,
status: str,
@@ -115,7 +139,7 @@ def notify_job_completed(
successful: int,
failed: int,
) -> None:
"""Send notification when a batch job completes."""
"""Send notification when a batch job completes (blocking)."""
notify("job_completed", {
"job_id": job_id,
"status": status,
@@ -126,14 +150,45 @@ def notify_job_completed(
})
def enqueue_job_completed(
job_id: str,
status: str,
total_companies: int,
successful: int,
failed: int,
) -> None:
"""Enqueue notification when a batch job completes (non-blocking)."""
enqueue_notify("job_completed", {
"job_id": job_id,
"status": status,
"total_companies": total_companies,
"successful": successful,
"failed": failed,
"summary": f"Batch job {job_id}: {successful}/{total_companies} succeeded",
})
def notify_alert(
company_name: str,
alert_type: str,
message: str,
) -> None:
"""Send notification for a tracked company alert."""
"""Send notification for a tracked company alert (blocking)."""
notify("patent_alert", {
"company_name": company_name,
"alert_type": alert_type,
"message": message,
})
def enqueue_alert(
company_name: str,
alert_type: str,
message: str,
) -> None:
"""Enqueue notification for a tracked company alert (non-blocking)."""
enqueue_notify("patent_alert", {
"company_name": company_name,
"alert_type": alert_type,
"message": message,
})
+211
View File
@@ -0,0 +1,211 @@
"""Tests for analyze_single_patent auto-download path.
Covers issue #1661:
- PDF exists on disk: direct analysis (happy path)
- PDF not on disk, cached link exists: auto-download and analyze
- PDF not on disk, no cached link: FileNotFoundError
- Analysis failure after PDF found: graceful error message
- Model override parameter passthrough
"""
import os
from unittest.mock import MagicMock, patch
import pytest
from SPARC.analyzer import CompanyAnalyzer
from SPARC.types import Patent
@pytest.fixture(autouse=True)
def mock_db(mocker):
"""Mock DatabaseClient so no real DB is needed."""
mock_db_cls = mocker.patch("SPARC.analyzer.DatabaseClient")
mock_db_instance = MagicMock()
mock_db_instance.get_cached_patent.return_value = None
mock_db_instance.get_cached_serp_query.return_value = None
mock_db_cls.return_value = mock_db_instance
return mock_db_instance
@pytest.fixture
def analyzer(mocker, mock_db):
"""Create a CompanyAnalyzer with mocked LLM and DB."""
mocker.patch("SPARC.analyzer.LLMAnalyzer")
return CompanyAnalyzer(openrouter_api_key="test-key")
class TestAnalyzeSinglePatentAutoDownload:
"""Test the auto-download logic in analyze_single_patent."""
def test_pdf_on_disk_analyzed_directly(self, analyzer, mocker, tmp_path):
"""When PDF exists on disk, it is analyzed directly without download."""
patent_id = "US-11234567-B2"
# Create the patents dir and PDF file
patents_dir = tmp_path / "patents"
patents_dir.mkdir()
pdf_path = patents_dir / f"{patent_id}.pdf"
pdf_path.write_bytes(b"fake PDF content")
mock_parse = mocker.patch("SPARC.analyzer.SERP.parse_patent_pdf")
mock_minimize = mocker.patch("SPARC.analyzer.SERP.minimize_patent_for_llm")
mock_parse.return_value = {"abstract": "test", "claims": "test claims"}
mock_minimize.return_value = "minimized content"
analyzer.llm_analyzer.analyze_patent_content.return_value = "Good patent."
# Change cwd so patents/{patent_id}.pdf resolves to our tmp_path
original_cwd = os.getcwd()
os.chdir(tmp_path)
try:
result = analyzer.analyze_single_patent(patent_id, "TestCo")
finally:
os.chdir(original_cwd)
assert result == "Good patent."
# DB cache should not have been queried since file existed
analyzer.db.get_cached_patent.assert_not_called()
def test_auto_download_from_cached_link(self, analyzer, mocker, tmp_path):
"""When PDF is not on disk but link is cached, auto-download occurs."""
patent_id = "US-99887766-A1"
# No patents dir exists (PDF not on disk)
mock_save = mocker.patch("SPARC.analyzer.SERP.save_patents")
downloaded_patent = Patent(patent_id=patent_id, pdf_link="https://example.com/patent.pdf")
downloaded_patent.pdf_path = f"patents/{patent_id}.pdf"
mock_save.return_value = downloaded_patent
# Cached patent has a PDF link
analyzer.db.get_cached_patent.return_value = {
"patent_id": patent_id,
"pdf_link": "https://example.com/patent.pdf",
}
# Mock the rest of the analysis pipeline
mock_parse = mocker.patch("SPARC.analyzer.SERP.parse_patent_pdf")
mock_minimize = mocker.patch("SPARC.analyzer.SERP.minimize_patent_for_llm")
mock_parse.return_value = {"abstract": "test abstract"}
mock_minimize.return_value = "minimized content"
analyzer.llm_analyzer.analyze_patent_content.return_value = "Strong innovation."
# Change cwd so patents/{patent_id}.pdf does NOT exist
original_cwd = os.getcwd()
os.chdir(tmp_path)
try:
result = analyzer.analyze_single_patent(patent_id, "DownloadCo")
finally:
os.chdir(original_cwd)
assert result == "Strong innovation."
analyzer.db.get_cached_patent.assert_called_once_with(patent_id)
mock_save.assert_called_once()
# Verify the Patent passed to save_patents has the correct ID and link
saved_patent = mock_save.call_args[0][0]
assert saved_patent.patent_id == patent_id
assert saved_patent.pdf_link == "https://example.com/patent.pdf"
def test_no_cached_link_raises_file_not_found(self, analyzer, mocker, tmp_path):
"""When PDF is not on disk and no cached link, FileNotFoundError raised."""
patent_id = "US-00000000-X1"
analyzer.db.get_cached_patent.return_value = None
original_cwd = os.getcwd()
os.chdir(tmp_path)
try:
with pytest.raises(FileNotFoundError, match="no download link is cached"):
analyzer.analyze_single_patent(patent_id, "MissingCo")
finally:
os.chdir(original_cwd)
def test_cached_patent_without_pdf_link_raises(self, analyzer, mocker, tmp_path):
"""When cached patent exists but has no pdf_link, FileNotFoundError raised."""
patent_id = "US-11111111-B1"
analyzer.db.get_cached_patent.return_value = {
"patent_id": patent_id,
"pdf_link": None,
}
original_cwd = os.getcwd()
os.chdir(tmp_path)
try:
with pytest.raises(FileNotFoundError, match="no download link is cached"):
analyzer.analyze_single_patent(patent_id, "NoPDFCo")
finally:
os.chdir(original_cwd)
def test_analysis_exception_returns_error_message(self, analyzer, mocker, tmp_path):
"""When analysis pipeline fails, returns error string instead of raising."""
patent_id = "US-22222222-A2"
# Create the PDF on disk so it skips download
patents_dir = tmp_path / "patents"
patents_dir.mkdir()
(patents_dir / f"{patent_id}.pdf").write_bytes(b"fake PDF")
# Parse fails
mocker.patch(
"SPARC.analyzer.SERP.parse_patent_pdf",
side_effect=ValueError("Corrupt PDF"),
)
original_cwd = os.getcwd()
os.chdir(tmp_path)
try:
result = analyzer.analyze_single_patent(patent_id, "ErrorCo")
finally:
os.chdir(original_cwd)
assert "Failed to analyze patent" in result
assert "Corrupt PDF" in result
def test_model_override_passed_to_llm(self, analyzer, mocker, tmp_path):
"""The model parameter is forwarded to the LLM analyzer."""
patent_id = "US-33333333-B2"
patents_dir = tmp_path / "patents"
patents_dir.mkdir()
(patents_dir / f"{patent_id}.pdf").write_bytes(b"fake PDF")
mocker.patch("SPARC.analyzer.SERP.parse_patent_pdf", return_value={"abstract": "test"})
mocker.patch("SPARC.analyzer.SERP.minimize_patent_for_llm", return_value="content")
analyzer.llm_analyzer.analyze_patent_content.return_value = "Analysis result."
original_cwd = os.getcwd()
os.chdir(tmp_path)
try:
result = analyzer.analyze_single_patent(
patent_id, "ModelCo", model="openai/gpt-4o"
)
finally:
os.chdir(original_cwd)
assert result == "Analysis result."
analyzer.llm_analyzer.analyze_patent_content.assert_called_once_with(
patent_content="content",
company_name="ModelCo",
model="openai/gpt-4o",
)
def test_file_not_found_during_parse_re_raised(self, analyzer, mocker, tmp_path):
"""FileNotFoundError during parsing is re-raised, not caught."""
patent_id = "US-44444444-C1"
patents_dir = tmp_path / "patents"
patents_dir.mkdir()
(patents_dir / f"{patent_id}.pdf").write_bytes(b"fake PDF")
mocker.patch(
"SPARC.analyzer.SERP.parse_patent_pdf",
side_effect=FileNotFoundError("PDF file vanished"),
)
original_cwd = os.getcwd()
os.chdir(tmp_path)
try:
with pytest.raises(FileNotFoundError, match="PDF file vanished"):
analyzer.analyze_single_patent(patent_id, "VanishCo")
finally:
os.chdir(original_cwd)
+157
View File
@@ -0,0 +1,157 @@
"""Tests for company name input validation on analysis endpoints."""
from datetime import datetime
from unittest.mock import Mock
import pytest
from fastapi.testclient import TestClient
from SPARC.api import app
from SPARC.types import CompanyAnalysisResult
@pytest.fixture
def client():
"""Create test client."""
return TestClient(app)
@pytest.fixture
def mock_analyzer(mocker):
"""Mock the global analyzer so valid requests succeed."""
mock = Mock()
mock._analyze_company_safe.return_value = CompanyAnalysisResult(
company_name="nvidia",
analysis="Test analysis",
patent_count=1,
success=True,
timestamp=datetime.now(),
)
mocker.patch("SPARC.api._analyzer", mock)
return mock
class TestCompanyNameValidation:
"""Test that company names are validated on analysis endpoints."""
# --- Too short ---
def test_single_char_rejected(self, client, mock_analyzer):
"""A one-character company name should be rejected."""
response = client.get("/analyze/X")
assert response.status_code == 422
# --- Too long ---
def test_over_100_chars_rejected(self, client, mock_analyzer):
"""A company name longer than 100 characters should be rejected."""
long_name = "A" * 101
response = client.get(f"/analyze/{long_name}")
assert response.status_code == 422
# --- Special characters ---
@pytest.mark.parametrize(
"bad_name",
[
"nvidia!",
"intel@corp",
"test#company",
"foo$bar",
"a%b",
"x^y",
"semi;colon",
"drop'table",
'say"hello',
"path/traversal",
"back\\slash",
"pipe|char",
"star*glob",
"question?mark",
"<script>",
"curly{brace}",
"equal=sign",
"plus+plus",
"comma,separated",
],
)
def test_special_chars_rejected(self, client, mock_analyzer, bad_name):
"""Company names with disallowed special characters should be rejected."""
response = client.get(f"/analyze/{bad_name}")
assert response.status_code == 422
# --- Valid names ---
@pytest.mark.parametrize(
"valid_name",
[
"nvidia",
"Intel",
"TSMC",
"Texas Instruments",
"Johnson-Johnson",
"AT&T",
"St. Jude Medical",
"3M",
"21st Century Fox",
"ab", # minimum length
"A" * 100, # maximum length
],
)
def test_valid_names_accepted(self, client, mock_analyzer, valid_name):
"""Valid company names should be accepted (200, not 422)."""
response = client.get(f"/analyze/{valid_name}")
# Should not be a validation error; 200 or other non-422 status is fine
assert response.status_code != 422
# --- Batch endpoint validation ---
def test_batch_too_short_rejected(self, client, mock_analyzer):
"""Batch endpoint should reject company names that are too short."""
response = client.post(
"/analyze/batch",
json={"companies": ["X"]},
)
assert response.status_code == 422
def test_batch_too_long_rejected(self, client, mock_analyzer):
"""Batch endpoint should reject company names that are too long."""
response = client.post(
"/analyze/batch",
json={"companies": ["A" * 101]},
)
assert response.status_code == 422
def test_batch_special_chars_rejected(self, client, mock_analyzer):
"""Batch endpoint should reject company names with special chars."""
response = client.post(
"/analyze/batch",
json={"companies": ["nvidia!", "intel"]},
)
assert response.status_code == 422
def test_batch_valid_names_accepted(self, client, mock_analyzer):
"""Batch endpoint should accept valid company names."""
response = client.post(
"/analyze/batch",
json={"companies": ["nvidia", "Intel", "AT&T"]},
)
assert response.status_code != 422
# --- Name must start with alphanumeric ---
def test_leading_space_rejected(self, client, mock_analyzer):
"""Company name starting with a space should be rejected."""
response = client.post(
"/analyze/batch",
json={"companies": [" nvidia"]},
)
assert response.status_code == 422
def test_leading_hyphen_rejected(self, client, mock_analyzer):
"""Company name starting with a hyphen should be rejected."""
response = client.post(
"/analyze/batch",
json={"companies": ["-nvidia"]},
)
assert response.status_code == 422
+224
View File
@@ -0,0 +1,224 @@
"""Tests for export endpoints: CSV and PDF export of analysis results.
Covers issue #1655:
- GET /export/{company_name} (CSV export)
- GET /export/{company_name}/pdf (PDF export)
All tests mock the database layer and use JWT auth fixtures from test_auth patterns.
"""
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
import pytest
from fastapi.testclient import TestClient
from SPARC.api import app
from SPARC.auth import create_access_token
@pytest.fixture
def client():
"""Create test client."""
return TestClient(app)
@pytest.fixture(autouse=True)
def mock_db():
"""Mock the database client used by export and auth endpoints."""
db = MagicMock()
# Default: user exists for auth
db.get_user_by_id.return_value = {
"id": 1,
"email": "user@test.com",
"role": "user",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
# Mock get_conn for export queries
mock_cursor = MagicMock()
mock_conn = MagicMock()
mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)
db.get_conn.return_value.__enter__ = MagicMock(return_value=mock_conn)
db.get_conn.return_value.__exit__ = MagicMock(return_value=False)
db._mock_cursor = mock_cursor
with patch("SPARC.api.get_db_client", return_value=db), \
patch("SPARC.auth.get_db_client", return_value=db):
yield db
def _auth_header():
"""Create an Authorization header with a valid access token."""
token = create_access_token(1, "user@test.com", "user")
return {"Authorization": f"Bearer {token}"}
def _sample_rows():
"""Return sample llm_messages rows as tuples (matching cursor.fetchall format)."""
return [
(
"NVIDIA",
"company_analysis",
"anthropic/claude-3.5-sonnet",
"Strong AI patent portfolio with focus on GPU architectures.",
datetime(2025, 6, 15, 10, 30, 0),
),
(
"NVIDIA",
"patent_analysis",
"openai/gpt-4o",
"Patent US-12345678-B2 covers novel tensor core design.",
datetime(2025, 6, 14, 9, 0, 0),
),
]
class TestCSVExport:
"""GET /export/{company_name} -- CSV export."""
def test_csv_export_success(self, client, mock_db):
"""Valid company with results returns a CSV file."""
mock_db._mock_cursor.fetchall.return_value = _sample_rows()
response = client.get("/export/NVIDIA", headers=_auth_header())
assert response.status_code == 200
assert response.headers["content-type"].startswith("text/csv")
assert "attachment" in response.headers.get("content-disposition", "")
assert "sparc_nvidia_export.csv" in response.headers["content-disposition"]
# Verify CSV content (CSV uses \r\n line endings)
lines = response.text.strip().split("\n")
assert len(lines) == 3 # header + 2 data rows
assert lines[0].strip() == "company_name,analysis_type,model,analysis,timestamp"
assert "NVIDIA" in lines[1]
assert "company_analysis" in lines[1]
def test_csv_export_no_results_returns_404(self, client, mock_db):
"""Unknown company returns 404."""
mock_db._mock_cursor.fetchall.return_value = []
response = client.get("/export/nonexistent", headers=_auth_header())
assert response.status_code == 404
assert "No analysis results found" in response.json()["detail"]
def test_csv_export_unauthenticated_returns_401(self, client):
"""Request without token returns 401."""
response = client.get("/export/NVIDIA")
assert response.status_code == 401
def test_csv_export_invalid_token_returns_401(self, client):
"""Request with invalid token returns 401."""
response = client.get(
"/export/NVIDIA",
headers={"Authorization": "Bearer invalid.token.here"},
)
assert response.status_code == 401
def test_csv_export_filename_sanitization(self, client, mock_db):
"""Company names with spaces get sanitized in the filename."""
mock_db._mock_cursor.fetchall.return_value = [
(
"Tesla Motors",
"company_analysis",
"anthropic/claude-3.5-sonnet",
"EV patent portfolio analysis.",
datetime(2025, 6, 15, 10, 0, 0),
),
]
response = client.get("/export/Tesla Motors", headers=_auth_header())
assert response.status_code == 200
assert "tesla_motors" in response.headers["content-disposition"]
def test_csv_export_single_row(self, client, mock_db):
"""Single analysis result produces valid CSV with one data row."""
mock_db._mock_cursor.fetchall.return_value = [_sample_rows()[0]]
response = client.get("/export/NVIDIA", headers=_auth_header())
assert response.status_code == 200
lines = response.text.strip().split("\n")
assert len(lines) == 2 # header + 1 data row
class TestPDFExport:
"""GET /export/{company_name}/pdf -- PDF report export."""
def test_pdf_export_success(self, client, mock_db):
"""Valid company with results returns a PDF file."""
mock_db._mock_cursor.fetchall.return_value = _sample_rows()
response = client.get("/export/NVIDIA/pdf", headers=_auth_header())
assert response.status_code == 200
assert response.headers["content-type"] == "application/pdf"
assert "attachment" in response.headers.get("content-disposition", "")
# PDF files start with %PDF
assert response.content[:4] == b"%PDF"
def test_pdf_export_no_results_returns_404(self, client, mock_db):
"""Unknown company returns 404."""
mock_db._mock_cursor.fetchall.return_value = []
response = client.get("/export/nonexistent/pdf", headers=_auth_header())
assert response.status_code == 404
assert "No analysis results found" in response.json()["detail"]
def test_pdf_export_unauthenticated_returns_401(self, client):
"""Request without token returns 401."""
response = client.get("/export/NVIDIA/pdf")
assert response.status_code == 401
def test_pdf_export_invalid_token_returns_401(self, client):
"""Request with invalid token returns 401."""
response = client.get(
"/export/NVIDIA/pdf",
headers={"Authorization": "Bearer invalid.token.here"},
)
assert response.status_code == 401
def test_pdf_export_filename_contains_date(self, client, mock_db):
"""PDF filename includes the analysis date."""
mock_db._mock_cursor.fetchall.return_value = _sample_rows()
response = client.get("/export/NVIDIA/pdf", headers=_auth_header())
assert response.status_code == 200
disposition = response.headers["content-disposition"]
assert "nvidia-analysis-" in disposition
assert ".pdf" in disposition
def test_pdf_export_special_chars_in_response(self, client, mock_db):
"""Analysis text with XML-special chars (<, >, &) does not break PDF generation."""
rows = [
(
"TestCo",
"company_analysis",
"anthropic/claude-3.5-sonnet",
"Revenue > $1B & growth <20% for Q4. Test <html> escaping.",
datetime(2025, 6, 15, 10, 0, 0),
),
]
mock_db._mock_cursor.fetchall.return_value = rows
response = client.get("/export/TestCo/pdf", headers=_auth_header())
assert response.status_code == 200
assert response.content[:4] == b"%PDF"
def test_pdf_export_multiple_analyses(self, client, mock_db):
"""Multiple analysis records produce a valid PDF with content."""
mock_db._mock_cursor.fetchall.return_value = _sample_rows()
response = client.get("/export/NVIDIA/pdf", headers=_auth_header())
assert response.status_code == 200
# PDF should have reasonable size (more than just headers)
assert len(response.content) > 500
+169
View File
@@ -0,0 +1,169 @@
"""Tests for cursor-based pagination on /analyze/batch GET and /jobs endpoints."""
from datetime import datetime, timedelta
from unittest.mock import Mock, patch
import pytest
from fastapi.testclient import TestClient
from SPARC.api import app
@pytest.fixture
def client():
"""Create test client."""
return TestClient(app)
def _make_analysis_row(id_: int, minutes_ago: int = 0, company: str = "nvidia"):
"""Create a fake analysis row dict."""
ts = datetime.now() - timedelta(minutes=minutes_ago)
return {
"id": id_,
"company_name": company,
"analysis_type": "patent_portfolio",
"model": "openai/gpt-4o",
"response": f"Analysis for {company}",
"timestamp": ts,
}
def _make_job_row(job_id: str, minutes_ago: int = 0, status: str = "completed"):
"""Create a fake job row dict."""
ts = datetime.now() - timedelta(minutes=minutes_ago)
return {
"job_id": job_id,
"status": status,
"progress": 100 if status == "completed" else 0,
"total_companies": 1,
"completed_companies": 1 if status == "completed" else 0,
"result": None,
"error": None,
"created_at": ts,
}
class TestAnalyzeBatchGetPagination:
"""Test cursor-based pagination on GET /analyze/batch."""
@patch("SPARC.api._get_job_db")
def test_returns_items_and_no_cursor_when_less_than_limit(self, mock_get_db, client):
"""When fewer results than limit, next_cursor should be null."""
db = Mock()
db.list_analyses.return_value = [
_make_analysis_row(1, minutes_ago=10),
_make_analysis_row(2, minutes_ago=20),
]
mock_get_db.return_value = db
response = client.get("/analyze/batch?limit=10")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 2
assert data["next_cursor"] is None
@patch("SPARC.api._get_job_db")
def test_returns_cursor_when_more_results_exist(self, mock_get_db, client):
"""When more results exist than limit, next_cursor should be set."""
db = Mock()
# Return limit+1 rows to simulate more data
rows = [_make_analysis_row(i, minutes_ago=i) for i in range(4)]
db.list_analyses.return_value = rows
mock_get_db.return_value = db
response = client.get("/analyze/batch?limit=3")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 3
assert data["next_cursor"] is not None
@patch("SPARC.api._get_job_db")
def test_cursor_passed_to_db(self, mock_get_db, client):
"""The cursor query param should be forwarded to the database layer."""
db = Mock()
db.list_analyses.return_value = []
mock_get_db.return_value = db
client.get("/analyze/batch?cursor=2025-01-01T00:00:00|42")
db.list_analyses.assert_called_once()
call_kwargs = db.list_analyses.call_args
assert call_kwargs.kwargs.get("cursor") == "2025-01-01T00:00:00|42" or \
(call_kwargs[1].get("cursor") == "2025-01-01T00:00:00|42" if len(call_kwargs) > 1 else False)
@patch("SPARC.api._get_job_db")
def test_default_limit_is_50(self, mock_get_db, client):
"""Default limit should be 50."""
db = Mock()
db.list_analyses.return_value = []
mock_get_db.return_value = db
client.get("/analyze/batch")
call_kwargs = db.list_analyses.call_args
# The endpoint requests limit+1 from DB, so 51
assert 51 in call_kwargs.args or call_kwargs.kwargs.get("limit") == 51
def test_limit_over_200_rejected(self, client):
"""Limit > 200 should be rejected with 422."""
response = client.get("/analyze/batch?limit=201")
assert response.status_code == 422
def test_limit_zero_rejected(self, client):
"""Limit < 1 should be rejected with 422."""
response = client.get("/analyze/batch?limit=0")
assert response.status_code == 422
@patch("SPARC.api._get_job_db")
def test_company_name_filter(self, mock_get_db, client):
"""The company_name filter should be forwarded to the database."""
db = Mock()
db.list_analyses.return_value = []
mock_get_db.return_value = db
client.get("/analyze/batch?company_name=intel")
call_kwargs = db.list_analyses.call_args
assert call_kwargs.kwargs.get("company_name") == "intel" or \
"intel" in (call_kwargs.args if call_kwargs.args else [])
@patch("SPARC.api._get_job_db")
def test_empty_result_set(self, mock_get_db, client):
"""Empty result set returns empty items and null cursor."""
db = Mock()
db.list_analyses.return_value = []
mock_get_db.return_value = db
response = client.get("/analyze/batch")
assert response.status_code == 200
data = response.json()
assert data["items"] == []
assert data["next_cursor"] is None
class TestJobsPaginationDefaults:
"""Test that /jobs endpoint uses updated defaults."""
@patch("SPARC.api._get_job_db")
def test_default_limit_is_50(self, mock_get_db, client):
"""Default limit should now be 50."""
db = Mock()
db.list_jobs.return_value = []
mock_get_db.return_value = db
client.get("/jobs")
call_kwargs = db.list_jobs.call_args
# Endpoint requests limit+1 from DB, so 51
assert 51 in call_kwargs.args or call_kwargs.kwargs.get("limit") == 51
def test_limit_over_200_rejected(self, client):
"""Limit > 200 should be rejected with 422."""
response = client.get("/jobs?limit=201")
assert response.status_code == 422
@patch("SPARC.api._get_job_db")
def test_limit_200_accepted(self, mock_get_db, client):
"""Limit of exactly 200 should be accepted."""
db = Mock()
db.list_jobs.return_value = []
mock_get_db.return_value = db
response = client.get("/jobs?limit=200")
assert response.status_code == 200
+109
View File
@@ -0,0 +1,109 @@
"""Tests for the /admin/rate-limits endpoint."""
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
from SPARC import api
from SPARC.api import app
from SPARC.auth import UserResponse
@pytest.fixture
def client():
"""Create test client."""
return TestClient(app)
@pytest.fixture(autouse=True)
def reset_stats():
"""Reset rate limit stats between tests."""
api._rate_limit_stats.clear()
yield
api._rate_limit_stats.clear()
def _mock_admin():
"""Return a mock admin user."""
return UserResponse(id=1, email="admin@test.com", role="admin", created_at="2025-01-01T00:00:00")
def _mock_user():
"""Return a mock non-admin user."""
return UserResponse(id=2, email="user@test.com", role="user", created_at="2025-01-01T00:00:00")
class TestRateLimitAdminEndpoint:
"""Test GET /admin/rate-limits."""
def test_admin_can_access(self, client):
"""Admin users should be able to access the rate-limits endpoint."""
app.dependency_overrides[api.get_current_admin] = _mock_admin
try:
response = client.get("/admin/rate-limits")
assert response.status_code == 200
data = response.json()
assert "rate_limits" in data
assert isinstance(data["rate_limits"], list)
finally:
app.dependency_overrides.clear()
def test_non_admin_rejected(self, client):
"""Non-admin users should get 403."""
# Without overriding the dependency, it should fail auth
response = client.get("/admin/rate-limits")
assert response.status_code in (401, 403)
def test_returns_configured_endpoints(self, client):
"""Should list all rate-limited endpoints."""
app.dependency_overrides[api.get_current_admin] = _mock_admin
try:
response = client.get("/admin/rate-limits")
assert response.status_code == 200
data = response.json()
endpoints = [rl["endpoint"] for rl in data["rate_limits"]]
assert "/auth/register" in endpoints
assert "/auth/login" in endpoints
finally:
app.dependency_overrides.clear()
def test_empty_state_shows_zero_counts(self, client):
"""When no requests have been made, counts should be zero."""
app.dependency_overrides[api.get_current_admin] = _mock_admin
try:
response = client.get("/admin/rate-limits")
data = response.json()
for rl in data["rate_limits"]:
assert rl["total_requests"] == 0
assert rl["rejected_requests"] == 0
finally:
app.dependency_overrides.clear()
def test_tracks_requests(self, client):
"""After making requests, the stats should reflect them."""
api._track_rate_limit_request("/auth/login", "127.0.0.1")
api._track_rate_limit_request("/auth/login", "127.0.0.1")
api._track_rate_limit_request("/auth/login", "192.168.1.1", rejected=True)
app.dependency_overrides[api.get_current_admin] = _mock_admin
try:
response = client.get("/admin/rate-limits")
data = response.json()
login_stats = next(rl for rl in data["rate_limits"] if rl["endpoint"] == "/auth/login")
assert login_stats["total_requests"] == 3
assert login_stats["rejected_requests"] == 1
finally:
app.dependency_overrides.clear()
def test_includes_limit_config(self, client):
"""Each endpoint entry should include the rate limit config string."""
app.dependency_overrides[api.get_current_admin] = _mock_admin
try:
response = client.get("/admin/rate-limits")
data = response.json()
for rl in data["rate_limits"]:
assert "limit" in rl
assert isinstance(rl["limit"], str)
finally:
app.dependency_overrides.clear()
+263
View File
@@ -0,0 +1,263 @@
"""Tests for S3/MinIO storage backend in storage.py.
Covers issue #1660:
- S3StorageBackend read, write, exists, path_for
- Error handling: NoSuchKey, generic S3 errors, bucket auto-creation
- get_storage_backend() factory function
- LocalStorageBackend (basic sanity checks)
"""
from unittest.mock import MagicMock, patch
import pytest
from SPARC.storage import LocalStorageBackend, S3StorageBackend, get_storage_backend
# ---------- S3StorageBackend ----------
class TestS3StorageBackend:
"""Tests for the S3-compatible storage backend."""
@pytest.fixture
def s3_backend(self):
"""Create an S3StorageBackend with a fully mocked boto3 client."""
with patch.dict("sys.modules", {"boto3": MagicMock()}):
import boto3 as mock_boto
mock_s3 = MagicMock()
mock_boto.client.return_value = mock_s3
mock_s3.head_bucket.return_value = {}
backend = S3StorageBackend(
bucket="test-bucket",
endpoint_url="http://minio:9000",
access_key="minioadmin",
secret_key="minioadmin",
)
# Expose mock for assertions
backend._mock_s3 = mock_s3
yield backend
def test_write_puts_object(self, s3_backend):
"""write() calls put_object with correct bucket, key, and body."""
s3_backend.write("US-12345678-B2.pdf", b"PDF content here")
s3_backend._mock_s3.put_object.assert_called_once_with(
Bucket="test-bucket",
Key="US-12345678-B2.pdf",
Body=b"PDF content here",
ContentType="application/pdf",
)
def test_read_returns_body(self, s3_backend):
"""read() returns the Body content from get_object."""
mock_body = MagicMock()
mock_body.read.return_value = b"PDF data"
s3_backend._mock_s3.get_object.return_value = {"Body": mock_body}
result = s3_backend.read("US-12345678-B2.pdf")
assert result == b"PDF data"
s3_backend._mock_s3.get_object.assert_called_once_with(
Bucket="test-bucket",
Key="US-12345678-B2.pdf",
)
def test_read_nosuchkey_raises_file_not_found(self, s3_backend):
"""read() raises FileNotFoundError when object does not exist."""
# Create a NoSuchKey exception class on the mock
nosuchkey = type("NoSuchKey", (Exception,), {})
s3_backend._mock_s3.exceptions.NoSuchKey = nosuchkey
s3_backend._mock_s3.get_object.side_effect = nosuchkey("not found")
# Reassign s3 to trigger the except branch
s3_backend.s3 = s3_backend._mock_s3
with pytest.raises(FileNotFoundError, match="S3 object not found"):
s3_backend.read("missing.pdf")
def test_read_generic_404_raises_file_not_found(self, s3_backend):
"""read() handles generic 404 errors from S3-compatible APIs."""
nosuchkey = type("NoSuchKey", (Exception,), {})
s3_backend._mock_s3.exceptions.NoSuchKey = nosuchkey
s3_backend.s3 = s3_backend._mock_s3
s3_backend.s3.get_object.side_effect = Exception("An error occurred (404)")
with pytest.raises(FileNotFoundError, match="S3 object not found"):
s3_backend.read("missing.pdf")
def test_read_other_error_re_raises(self, s3_backend):
"""read() re-raises non-404 errors."""
nosuchkey = type("NoSuchKey", (Exception,), {})
s3_backend._mock_s3.exceptions.NoSuchKey = nosuchkey
s3_backend.s3 = s3_backend._mock_s3
s3_backend.s3.get_object.side_effect = Exception("Internal server error")
with pytest.raises(Exception, match="Internal server error"):
s3_backend.read("some-file.pdf")
def test_exists_returns_true_for_existing_object(self, s3_backend):
"""exists() returns True when head_object succeeds with content."""
s3_backend._mock_s3.head_object.return_value = {"ContentLength": 1024}
assert s3_backend.exists("US-12345678-B2.pdf") is True
def test_exists_returns_false_for_missing_object(self, s3_backend):
"""exists() returns False when head_object raises an exception."""
s3_backend._mock_s3.head_object.side_effect = Exception("Not Found")
assert s3_backend.exists("missing.pdf") is False
def test_exists_returns_false_for_zero_length(self, s3_backend):
"""exists() returns False when object has zero content length."""
s3_backend._mock_s3.head_object.return_value = {"ContentLength": 0}
assert s3_backend.exists("empty.pdf") is False
def test_path_for_returns_s3_uri(self, s3_backend):
"""path_for() returns an s3:// URI."""
path = s3_backend.path_for("US-12345678-B2.pdf")
assert path == "s3://test-bucket/US-12345678-B2.pdf"
def test_constructor_creates_bucket_if_missing(self):
"""Constructor creates the bucket if head_bucket fails."""
with patch.dict("sys.modules", {"boto3": MagicMock()}):
import boto3 as mock_boto
mock_s3 = MagicMock()
mock_boto.client.return_value = mock_s3
mock_s3.head_bucket.side_effect = Exception("Bucket not found")
S3StorageBackend(
bucket="new-bucket",
endpoint_url="http://minio:9000",
access_key="admin",
secret_key="admin",
)
mock_s3.create_bucket.assert_called_once_with(Bucket="new-bucket")
def test_constructor_handles_bucket_creation_failure(self):
"""Constructor logs warning but does not crash if bucket creation fails."""
with patch.dict("sys.modules", {"boto3": MagicMock()}):
import boto3 as mock_boto
mock_s3 = MagicMock()
mock_boto.client.return_value = mock_s3
mock_s3.head_bucket.side_effect = Exception("Bucket not found")
mock_s3.create_bucket.side_effect = Exception("Permission denied")
# Should not raise
backend = S3StorageBackend(
bucket="locked-bucket",
endpoint_url="http://minio:9000",
access_key="admin",
secret_key="admin",
)
assert backend.bucket == "locked-bucket"
def test_constructor_passes_endpoint_and_credentials(self):
"""Constructor passes endpoint_url and credentials to boto3.client."""
with patch.dict("sys.modules", {"boto3": MagicMock()}):
import boto3 as mock_boto
mock_s3 = MagicMock()
mock_boto.client.return_value = mock_s3
S3StorageBackend(
bucket="test",
endpoint_url="http://minio:9000",
access_key="mykey",
secret_key="mysecret",
)
mock_boto.client.assert_called_with(
"s3",
endpoint_url="http://minio:9000",
aws_access_key_id="mykey",
aws_secret_access_key="mysecret",
)
# ---------- LocalStorageBackend ----------
class TestLocalStorageBackend:
"""Basic sanity checks for the local filesystem backend."""
def test_write_and_read(self, tmp_path):
"""Write and read round-trip produces identical content."""
backend = LocalStorageBackend(base_dir=str(tmp_path))
backend.write("test.pdf", b"hello world")
result = backend.read("test.pdf")
assert result == b"hello world"
def test_read_missing_file_raises(self, tmp_path):
"""Reading a non-existent file raises FileNotFoundError."""
backend = LocalStorageBackend(base_dir=str(tmp_path))
with pytest.raises(FileNotFoundError):
backend.read("nonexistent.pdf")
def test_exists_true_for_written_file(self, tmp_path):
"""exists() returns True after writing a file."""
backend = LocalStorageBackend(base_dir=str(tmp_path))
backend.write("test.pdf", b"data")
assert backend.exists("test.pdf") is True
def test_exists_false_for_missing_file(self, tmp_path):
"""exists() returns False for non-existent file."""
backend = LocalStorageBackend(base_dir=str(tmp_path))
assert backend.exists("missing.pdf") is False
def test_exists_false_for_empty_file(self, tmp_path):
"""exists() returns False for zero-length file."""
backend = LocalStorageBackend(base_dir=str(tmp_path))
backend.write("empty.pdf", b"")
assert backend.exists("empty.pdf") is False
def test_path_for_returns_full_path(self, tmp_path):
"""path_for() returns the full filesystem path."""
backend = LocalStorageBackend(base_dir=str(tmp_path))
path = backend.path_for("test.pdf")
assert path == str(tmp_path / "test.pdf")
# ---------- get_storage_backend() factory ----------
class TestGetStorageBackend:
"""Tests for the storage backend factory function."""
@patch("SPARC.storage.config")
def test_returns_local_backend_by_default(self, mock_config):
"""Default config returns LocalStorageBackend."""
mock_config.storage_backend = "local"
backend = get_storage_backend()
assert isinstance(backend, LocalStorageBackend)
@patch("SPARC.storage.config")
def test_returns_s3_backend_when_configured(self, mock_config):
"""Setting storage_backend=s3 returns S3StorageBackend."""
mock_config.storage_backend = "s3"
mock_config.s3_bucket = "test-bucket"
mock_config.s3_endpoint_url = "http://minio:9000"
mock_config.s3_access_key = "key"
mock_config.s3_secret_key = "secret"
with patch.dict("sys.modules", {"boto3": MagicMock()}):
backend = get_storage_backend()
assert isinstance(backend, S3StorageBackend)
@patch("SPARC.storage.config")
def test_case_insensitive_backend_selection(self, mock_config):
"""Backend selection is case-insensitive."""
mock_config.storage_backend = "LOCAL"
backend = get_storage_backend()
assert isinstance(backend, LocalStorageBackend)
+262
View File
@@ -0,0 +1,262 @@
"""Tests for the webhook background task queue.
Covers:
- Worker lifecycle (start / stop / idempotent start)
- Tasks are processed asynchronously by the worker
- Retry logic is preserved (executed inside the worker thread)
- enqueue_notify / enqueue_job_completed / enqueue_alert non-blocking helpers
- Integration: queued webhook task is eventually delivered (mocked HTTP)
"""
import threading
import time
from unittest.mock import MagicMock, call, patch
import pytest
from SPARC.task_queue import (
WebhookTask,
drain,
enqueue,
queue_size,
start_worker,
stop_worker,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _worker_lifecycle():
"""Start the worker before each test and stop it after."""
start_worker()
yield
stop_worker(timeout=3)
# ---------------------------------------------------------------------------
# Worker lifecycle
# ---------------------------------------------------------------------------
class TestWorkerLifecycle:
def test_start_is_idempotent(self):
"""Calling start_worker() twice does not create a second thread."""
import SPARC.task_queue as tq
first = tq._worker_thread
start_worker()
assert tq._worker_thread is first
def test_stop_worker_gracefully(self):
"""stop_worker joins the thread cleanly."""
import SPARC.task_queue as tq
assert tq._worker_thread is not None
stop_worker(timeout=3)
assert tq._worker_thread is None
# ---------------------------------------------------------------------------
# Task processing
# ---------------------------------------------------------------------------
class TestTaskProcessing:
@patch("SPARC.webhooks._send_with_retry")
def test_enqueued_task_is_delivered(self, mock_send):
"""A task put on the queue is eventually processed by the worker."""
mock_send.return_value = True
task = WebhookTask(url="https://example.com/hook", payload={"event": "test"})
enqueue(task)
drain(timeout=5)
mock_send.assert_called_once_with("https://example.com/hook", {"event": "test"})
@patch("SPARC.webhooks._send_with_retry")
def test_multiple_tasks_processed_in_order(self, mock_send):
"""Tasks are processed FIFO."""
mock_send.return_value = True
for i in range(3):
enqueue(WebhookTask(url=f"https://example.com/{i}", payload={"n": i}))
drain(timeout=5)
assert mock_send.call_count == 3
urls = [c[0][0] for c in mock_send.call_args_list]
assert urls == [
"https://example.com/0",
"https://example.com/1",
"https://example.com/2",
]
@patch("SPARC.webhooks._send_with_retry")
def test_enqueue_returns_immediately(self, mock_send):
"""enqueue() does not block even if the worker is slow."""
event = threading.Event()
def slow_send(url, payload):
event.wait(timeout=5)
return True
mock_send.side_effect = slow_send
start = time.monotonic()
enqueue(WebhookTask(url="https://slow.example.com", payload={}))
elapsed = time.monotonic() - start
# enqueue should return in well under 1 second
assert elapsed < 0.5
# Let the worker finish
event.set()
drain(timeout=5)
@patch("SPARC.webhooks._send_with_retry", side_effect=RuntimeError("boom"))
def test_worker_survives_unexpected_error(self, mock_send):
"""An unexpected exception in delivery does not kill the worker."""
enqueue(WebhookTask(url="https://example.com/bad", payload={}))
drain(timeout=5)
# Worker is still alive; enqueue another task
mock_send.side_effect = None
mock_send.return_value = True
enqueue(WebhookTask(url="https://example.com/good", payload={"ok": True}))
drain(timeout=5)
assert mock_send.call_count == 2
# ---------------------------------------------------------------------------
# Retry logic preserved in worker context
# ---------------------------------------------------------------------------
class TestRetryInWorker:
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_retry_logic_runs_inside_worker(self, mock_post, mock_sleep):
"""The worker thread uses _send_with_retry, which retries on failure."""
mock_post.side_effect = [
MagicMock(status_code=500),
MagicMock(status_code=200),
]
enqueue(WebhookTask(
url="https://example.com/retry",
payload={"event": "test"},
))
drain(timeout=10)
assert mock_post.call_count == 2
mock_sleep.assert_called_once()
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_all_retries_exhausted_in_worker(self, mock_post, mock_sleep):
"""Worker handles permanent failure gracefully."""
mock_post.return_value = MagicMock(status_code=500)
enqueue(WebhookTask(
url="https://example.com/fail",
payload={"event": "test"},
))
drain(timeout=10)
from SPARC.webhooks import MAX_RETRIES
assert mock_post.call_count == MAX_RETRIES
# ---------------------------------------------------------------------------
# Integration: enqueue_notify and convenience helpers
# ---------------------------------------------------------------------------
class TestEnqueueHelpers:
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
def test_enqueue_notify_delivers_via_worker(self, mock_send):
"""enqueue_notify puts a task on the queue and the worker delivers it."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_notify
enqueue_notify("test_event", {"key": "val"})
drain(timeout=5)
mock_send.assert_called_once()
url, payload = mock_send.call_args[0]
assert url == "https://example.com/hook"
assert payload["event"] == "test_event"
assert payload["key"] == "val"
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
def test_enqueue_job_completed(self, mock_send):
"""enqueue_job_completed sends job completion data via the queue."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_job_completed
enqueue_job_completed(
job_id="job-1",
status="completed",
total_companies=5,
successful=4,
failed=1,
)
drain(timeout=5)
mock_send.assert_called_once()
payload = mock_send.call_args[0][1]
assert payload["event"] == "job_completed"
assert payload["job_id"] == "job-1"
assert payload["successful"] == 4
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook"])
def test_enqueue_alert(self, mock_send):
"""enqueue_alert sends alert data via the queue."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_alert
enqueue_alert(
company_name="NVIDIA",
alert_type="patent_count_change",
message="Patent count increased by 30%",
)
drain(timeout=5)
mock_send.assert_called_once()
payload = mock_send.call_args[0][1]
assert payload["event"] == "patent_alert"
assert payload["company_name"] == "NVIDIA"
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", [])
def test_enqueue_notify_noop_when_no_urls(self, mock_send):
"""enqueue_notify is a no-op when WEBHOOK_URLS is empty."""
from SPARC.webhooks import enqueue_notify
enqueue_notify("test_event", {"key": "val"})
drain(timeout=2)
mock_send.assert_not_called()
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", [
"https://hooks.slack.com/services/T00/B00/xxx",
"https://example.com/generic",
])
def test_enqueue_notify_slack_formatting(self, mock_send):
"""Slack URLs get Slack-formatted payloads even via the queue."""
mock_send.return_value = True
from SPARC.webhooks import enqueue_notify
enqueue_notify("test_event", {"key": "val"})
drain(timeout=5)
assert mock_send.call_count == 2
slack_payload = mock_send.call_args_list[0][0][1]
assert "text" in slack_payload
generic_payload = mock_send.call_args_list[1][0][1]
assert "event" in generic_payload
+387
View File
@@ -0,0 +1,387 @@
"""Tests for tracked company admin endpoints and scheduler integration.
Covers issue #1656:
- GET /admin/tracked (list tracked companies)
- POST /admin/tracked (add a tracked company)
- DELETE /admin/tracked/{company_name} (remove a tracked company)
- GET /admin/alerts (list alerts)
- scheduler.run_scheduled_analysis() integration
All tests mock the database layer and use JWT auth fixtures.
"""
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch, call
import pytest
from fastapi.testclient import TestClient
from SPARC.api import app
from SPARC.auth import create_access_token
@pytest.fixture
def client():
"""Create test client."""
return TestClient(app)
@pytest.fixture(autouse=True)
def mock_db():
"""Mock the database client used by admin and auth endpoints."""
db = MagicMock()
# Default admin user for auth
db.get_user_by_id.return_value = {
"id": 1,
"email": "admin@test.com",
"role": "admin",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
with patch("SPARC.api.get_db_client", return_value=db), \
patch("SPARC.auth.get_db_client", return_value=db):
yield db
def _admin_header():
"""Create an Authorization header with a valid admin access token."""
token = create_access_token(1, "admin@test.com", "admin")
return {"Authorization": f"Bearer {token}"}
def _user_header():
"""Create an Authorization header with a regular user access token."""
token = create_access_token(2, "user@test.com", "user")
return {"Authorization": f"Bearer {token}"}
# ---------- GET /admin/tracked ----------
class TestListTrackedCompanies:
"""GET /admin/tracked"""
def test_list_tracked_returns_companies(self, client, mock_db):
"""Admin can list tracked companies."""
mock_db.list_tracked_companies.return_value = [
{"company_name": "NVIDIA", "last_patent_count": 120, "last_analyzed": "2025-06-15"},
{"company_name": "AMD", "last_patent_count": 80, "last_analyzed": "2025-06-14"},
]
response = client.get("/admin/tracked", headers=_admin_header())
assert response.status_code == 200
data = response.json()
assert len(data) == 2
assert data[0]["company_name"] == "NVIDIA"
def test_list_tracked_empty(self, client, mock_db):
"""Returns empty list when no companies are tracked."""
mock_db.list_tracked_companies.return_value = []
response = client.get("/admin/tracked", headers=_admin_header())
assert response.status_code == 200
assert response.json() == []
def test_list_tracked_requires_admin(self, client, mock_db):
"""Regular user cannot access tracked companies list."""
mock_db.get_user_by_id.return_value = {
"id": 2,
"email": "user@test.com",
"role": "user",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
response = client.get("/admin/tracked", headers=_user_header())
assert response.status_code == 403
def test_list_tracked_unauthenticated(self, client):
"""Unauthenticated request returns 401."""
response = client.get("/admin/tracked")
assert response.status_code == 401
# ---------- POST /admin/tracked ----------
class TestAddTrackedCompany:
"""POST /admin/tracked"""
def test_add_tracked_company_success(self, client, mock_db):
"""Admin can add a company to tracking."""
mock_db.add_tracked_company.return_value = {
"company_name": "Intel",
"last_patent_count": 0,
"last_analyzed": None,
}
response = client.post(
"/admin/tracked",
json={"company_name": "Intel"},
headers=_admin_header(),
)
assert response.status_code == 200
data = response.json()
assert data["company_name"] == "Intel"
mock_db.add_tracked_company.assert_called_once_with("Intel")
def test_add_duplicate_returns_409(self, client, mock_db):
"""Adding an already-tracked company returns 409."""
mock_db.add_tracked_company.return_value = None
response = client.post(
"/admin/tracked",
json={"company_name": "NVIDIA"},
headers=_admin_header(),
)
assert response.status_code == 409
assert "already tracked" in response.json()["detail"].lower()
def test_add_tracked_requires_admin(self, client, mock_db):
"""Regular user cannot add tracked companies."""
mock_db.get_user_by_id.return_value = {
"id": 2,
"email": "user@test.com",
"role": "user",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
response = client.post(
"/admin/tracked",
json={"company_name": "Intel"},
headers=_user_header(),
)
assert response.status_code == 403
def test_add_tracked_empty_name_rejected(self, client):
"""Empty company name is rejected by validation."""
response = client.post(
"/admin/tracked",
json={"company_name": ""},
headers=_admin_header(),
)
assert response.status_code == 422 # Pydantic validation error
# ---------- DELETE /admin/tracked/{company_name} ----------
class TestRemoveTrackedCompany:
"""DELETE /admin/tracked/{company_name}"""
def test_remove_tracked_company_success(self, client, mock_db):
"""Admin can remove a tracked company."""
mock_db.remove_tracked_company.return_value = True
response = client.delete(
"/admin/tracked/NVIDIA",
headers=_admin_header(),
)
assert response.status_code == 200
assert "Stopped tracking" in response.json()["message"]
mock_db.remove_tracked_company.assert_called_once_with("NVIDIA")
def test_remove_nonexistent_returns_404(self, client, mock_db):
"""Removing a non-tracked company returns 404."""
mock_db.remove_tracked_company.return_value = False
response = client.delete(
"/admin/tracked/UnknownCorp",
headers=_admin_header(),
)
assert response.status_code == 404
assert "not found" in response.json()["detail"].lower()
def test_remove_tracked_requires_admin(self, client, mock_db):
"""Regular user cannot remove tracked companies."""
mock_db.get_user_by_id.return_value = {
"id": 2,
"email": "user@test.com",
"role": "user",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
response = client.delete(
"/admin/tracked/NVIDIA",
headers=_user_header(),
)
assert response.status_code == 403
# ---------- GET /admin/alerts ----------
class TestListAlerts:
"""GET /admin/alerts"""
def test_list_alerts_returns_data(self, client, mock_db):
"""Admin can list alerts."""
mock_db.list_alerts.return_value = [
{
"id": 1,
"company_name": "NVIDIA",
"alert_type": "patent_count_change",
"message": "Patent count increased by 25%",
"created_at": "2025-06-15T10:00:00Z",
},
]
response = client.get("/admin/alerts", headers=_admin_header())
assert response.status_code == 200
data = response.json()
assert len(data) == 1
assert data[0]["alert_type"] == "patent_count_change"
def test_list_alerts_with_limit(self, client, mock_db):
"""Custom limit parameter is passed to the database."""
mock_db.list_alerts.return_value = []
response = client.get("/admin/alerts?limit=10", headers=_admin_header())
assert response.status_code == 200
mock_db.list_alerts.assert_called_once_with(limit=10)
def test_list_alerts_requires_admin(self, client, mock_db):
"""Regular user cannot access alerts."""
mock_db.get_user_by_id.return_value = {
"id": 2,
"email": "user@test.com",
"role": "user",
"created_at": datetime(2025, 1, 1, tzinfo=timezone.utc),
}
response = client.get("/admin/alerts", headers=_user_header())
assert response.status_code == 403
# ---------- Scheduler integration ----------
class TestSchedulerIntegration:
"""Tests for scheduler.run_scheduled_analysis()."""
def test_no_tracked_companies_skips_analysis(self):
"""Scheduler does nothing when no companies are tracked."""
mock_db = MagicMock()
mock_db.list_tracked_companies.return_value = []
with patch("SPARC.scheduler.get_db_client", return_value=mock_db), \
patch("SPARC.scheduler.CompanyAnalyzer") as mock_analyzer_cls:
from SPARC.scheduler import run_scheduled_analysis
run_scheduled_analysis()
mock_analyzer_cls.assert_not_called()
def test_scheduler_analyzes_each_tracked_company(self):
"""Scheduler runs analysis for every tracked company."""
mock_db = MagicMock()
mock_db.list_tracked_companies.return_value = [
{"company_name": "NVIDIA", "last_patent_count": 100},
{"company_name": "AMD", "last_patent_count": 50},
]
mock_result_nvidia = MagicMock(success=True, patent_count=110)
mock_result_amd = MagicMock(success=True, patent_count=55)
mock_analyzer = MagicMock()
mock_analyzer._analyze_company_safe.side_effect = [mock_result_nvidia, mock_result_amd]
with patch("SPARC.scheduler.get_db_client", return_value=mock_db), \
patch("SPARC.scheduler.CompanyAnalyzer", return_value=mock_analyzer):
from SPARC.scheduler import run_scheduled_analysis
run_scheduled_analysis()
assert mock_analyzer._analyze_company_safe.call_count == 2
mock_db.update_tracked_company.assert_any_call("NVIDIA", 110)
mock_db.update_tracked_company.assert_any_call("AMD", 55)
def test_scheduler_triggers_alert_on_significant_change(self):
"""Scheduler stores an alert when patent count changes significantly."""
mock_db = MagicMock()
mock_db.list_tracked_companies.return_value = [
{"company_name": "Tesla", "last_patent_count": 100},
]
mock_result = MagicMock(success=True, patent_count=130) # 30% increase
mock_analyzer = MagicMock()
mock_analyzer._analyze_company_safe.return_value = mock_result
with patch("SPARC.scheduler.get_db_client", return_value=mock_db), \
patch("SPARC.scheduler.CompanyAnalyzer", return_value=mock_analyzer):
from SPARC.scheduler import run_scheduled_analysis
run_scheduled_analysis()
mock_db.store_alert.assert_called_once()
alert_kwargs = mock_db.store_alert.call_args
assert alert_kwargs[1]["company_name"] == "Tesla"
assert alert_kwargs[1]["alert_type"] == "patent_count_change"
assert alert_kwargs[1]["old_value"] == 100
assert alert_kwargs[1]["new_value"] == 130
def test_scheduler_no_alert_for_small_change(self):
"""Scheduler does not alert when change is below threshold."""
mock_db = MagicMock()
mock_db.list_tracked_companies.return_value = [
{"company_name": "Intel", "last_patent_count": 100},
]
mock_result = MagicMock(success=True, patent_count=105) # 5% increase
mock_analyzer = MagicMock()
mock_analyzer._analyze_company_safe.return_value = mock_result
with patch("SPARC.scheduler.get_db_client", return_value=mock_db), \
patch("SPARC.scheduler.CompanyAnalyzer", return_value=mock_analyzer):
from SPARC.scheduler import run_scheduled_analysis
run_scheduled_analysis()
mock_db.store_alert.assert_not_called()
def test_scheduler_handles_analysis_failure(self):
"""Scheduler continues when one company fails analysis."""
mock_db = MagicMock()
mock_db.list_tracked_companies.return_value = [
{"company_name": "FailCo", "last_patent_count": 50},
{"company_name": "SuccessCo", "last_patent_count": 30},
]
mock_fail_result = MagicMock(success=False, error="API timeout")
mock_ok_result = MagicMock(success=True, patent_count=35)
mock_analyzer = MagicMock()
mock_analyzer._analyze_company_safe.side_effect = [mock_fail_result, mock_ok_result]
with patch("SPARC.scheduler.get_db_client", return_value=mock_db), \
patch("SPARC.scheduler.CompanyAnalyzer", return_value=mock_analyzer):
from SPARC.scheduler import run_scheduled_analysis
run_scheduled_analysis()
# FailCo should not get updated, SuccessCo should
mock_db.update_tracked_company.assert_called_once_with("SuccessCo", 35)
def test_scheduler_handles_exception_in_analysis(self):
"""Scheduler continues even when analysis raises an exception."""
mock_db = MagicMock()
mock_db.list_tracked_companies.return_value = [
{"company_name": "CrashCo", "last_patent_count": 10},
{"company_name": "OKCo", "last_patent_count": 20},
]
mock_ok_result = MagicMock(success=True, patent_count=22)
mock_analyzer = MagicMock()
mock_analyzer._analyze_company_safe.side_effect = [
RuntimeError("unexpected error"),
mock_ok_result,
]
with patch("SPARC.scheduler.get_db_client", return_value=mock_db), \
patch("SPARC.scheduler.CompanyAnalyzer", return_value=mock_analyzer):
from SPARC.scheduler import run_scheduled_analysis
run_scheduled_analysis()
# OKCo should still be processed
mock_db.update_tracked_company.assert_called_once_with("OKCo", 22)
+280
View File
@@ -0,0 +1,280 @@
"""Tests for webhook notification system: retry logic and Slack/Discord payload format.
Covers issue #1657:
- Retry logic with exponential backoff in _send_with_retry
- Slack/Discord payload formatting in _build_payload
- Generic HTTP POST payload formatting
- notify() dispatching to multiple URLs
- notify_job_completed() and notify_alert() convenience helpers
"""
from datetime import datetime
from unittest.mock import MagicMock, patch, call
import pytest
import requests
from SPARC.webhooks import (
MAX_RETRIES,
_build_payload,
_is_slack_url,
_send_with_retry,
notify,
notify_alert,
notify_job_completed,
)
class TestIsSlackUrl:
"""Tests for Slack/Discord URL detection."""
def test_slack_webhook_url(self):
assert _is_slack_url("https://hooks.slack.com/services/T00/B00/xxx") is True
def test_discord_webhook_url(self):
assert _is_slack_url("https://discord.com/api/webhooks/123/abc") is True
def test_generic_url(self):
assert _is_slack_url("https://example.com/webhook") is False
def test_empty_url(self):
assert _is_slack_url("") is False
class TestBuildPayload:
"""Tests for payload construction."""
def test_generic_payload_structure(self):
"""Generic payload includes event type, timestamp, and data."""
payload = _build_payload("job_completed", {"job_id": "abc123"})
assert payload["event"] == "job_completed"
assert payload["job_id"] == "abc123"
assert "timestamp" in payload
# Timestamp should be ISO format ending with Z
assert payload["timestamp"].endswith("Z")
def test_slack_payload_wraps_in_text(self):
"""Slack payload wraps content in a 'text' field."""
payload = _build_payload("patent_alert", {"company_name": "NVIDIA"}, slack=True)
assert "text" in payload
assert "patent_alert" in payload["text"]
assert "NVIDIA" in payload["text"]
# Slack payload should NOT have the event/timestamp at top level
assert "event" not in payload
assert "timestamp" not in payload
def test_generic_payload_does_not_have_text_field(self):
"""Non-Slack payload does not wrap in text."""
payload = _build_payload("job_completed", {"status": "done"})
assert "text" not in payload
assert payload["status"] == "done"
def test_slack_payload_contains_bold_header(self):
"""Slack payload starts with bold event header using Slack markdown."""
payload = _build_payload("job_completed", {"count": 5}, slack=True)
assert payload["text"].startswith("*[SPARC] job_completed*")
def test_payload_merges_all_data_keys(self):
"""All data keys are included in the generic payload."""
data = {"key1": "val1", "key2": 42, "key3": True}
payload = _build_payload("test_event", data)
assert payload["key1"] == "val1"
assert payload["key2"] == 42
assert payload["key3"] is True
class TestSendWithRetry:
"""Tests for retry logic in _send_with_retry."""
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_success_on_first_attempt(self, mock_post, mock_sleep):
"""Successful delivery on first attempt, no retries."""
mock_post.return_value = MagicMock(status_code=200)
result = _send_with_retry("https://example.com/hook", {"event": "test"})
assert result is True
mock_post.assert_called_once()
mock_sleep.assert_not_called()
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_success_on_second_attempt(self, mock_post, mock_sleep):
"""Fails first, succeeds on retry."""
mock_post.side_effect = [
MagicMock(status_code=500),
MagicMock(status_code=200),
]
result = _send_with_retry("https://example.com/hook", {"event": "test"})
assert result is True
assert mock_post.call_count == 2
mock_sleep.assert_called_once()
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_all_retries_exhausted(self, mock_post, mock_sleep):
"""Returns False after all retries fail."""
mock_post.return_value = MagicMock(status_code=500)
result = _send_with_retry("https://example.com/hook", {"event": "test"})
assert result is False
assert mock_post.call_count == MAX_RETRIES
assert mock_sleep.call_count == MAX_RETRIES - 1
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_exponential_backoff_timing(self, mock_post, mock_sleep):
"""Backoff wait times follow exponential pattern (2^attempt)."""
mock_post.return_value = MagicMock(status_code=500)
_send_with_retry("https://example.com/hook", {"event": "test"})
# With BACKOFF_BASE=2: attempt 1 -> sleep(2), attempt 2 -> sleep(4)
expected_waits = [call(2 ** i) for i in range(1, MAX_RETRIES)]
assert mock_sleep.call_args_list == expected_waits
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_network_error_triggers_retry(self, mock_post, mock_sleep):
"""Network exceptions trigger retry, not immediate failure."""
mock_post.side_effect = [
requests.ConnectionError("Connection refused"),
MagicMock(status_code=200),
]
result = _send_with_retry("https://example.com/hook", {"event": "test"})
assert result is True
assert mock_post.call_count == 2
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_timeout_error_triggers_retry(self, mock_post, mock_sleep):
"""Timeout exceptions trigger retry."""
mock_post.side_effect = [
requests.Timeout("Request timed out"),
MagicMock(status_code=200),
]
result = _send_with_retry("https://example.com/hook", {"event": "test"})
assert result is True
assert mock_post.call_count == 2
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_2xx_status_codes_accepted(self, mock_post, mock_sleep):
"""Any 2xx status code is treated as success."""
mock_post.return_value = MagicMock(status_code=204)
result = _send_with_retry("https://example.com/hook", {"event": "test"})
assert result is True
mock_post.assert_called_once()
@patch("SPARC.webhooks.time.sleep")
@patch("SPARC.webhooks.requests.post")
def test_posts_json_payload(self, mock_post, mock_sleep):
"""Payload is sent as JSON with correct timeout."""
mock_post.return_value = MagicMock(status_code=200)
payload = {"event": "test", "data": "value"}
_send_with_retry("https://example.com/hook", payload)
mock_post.assert_called_once_with(
"https://example.com/hook", json=payload, timeout=10
)
class TestNotify:
"""Tests for the notify() dispatcher."""
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", ["https://example.com/hook1", "https://example.com/hook2"])
def test_dispatches_to_all_urls(self, mock_send):
"""notify() sends to every configured webhook URL."""
mock_send.return_value = True
notify("job_completed", {"job_id": "test123"})
assert mock_send.call_count == 2
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", [])
def test_no_urls_configured_returns_immediately(self, mock_send):
"""No-op when no webhook URLs are configured."""
notify("job_completed", {"job_id": "test123"})
mock_send.assert_not_called()
@patch("SPARC.webhooks._send_with_retry")
@patch("SPARC.webhooks.WEBHOOK_URLS", [
"https://hooks.slack.com/services/T00/B00/xxx",
"https://example.com/generic",
])
def test_slack_url_gets_slack_payload(self, mock_send):
"""Slack URLs receive Slack-formatted payloads, others get generic."""
mock_send.return_value = True
notify("test_event", {"key": "val"})
# First call (Slack URL) should have "text" key
slack_payload = mock_send.call_args_list[0][0][1]
assert "text" in slack_payload
# Second call (generic URL) should have "event" key
generic_payload = mock_send.call_args_list[1][0][1]
assert "event" in generic_payload
assert generic_payload["event"] == "test_event"
class TestNotifyJobCompleted:
"""Tests for notify_job_completed() convenience function."""
@patch("SPARC.webhooks.notify")
def test_sends_correct_event_and_data(self, mock_notify):
"""Job completion sends proper event type and summary."""
notify_job_completed(
job_id="batch-001",
status="completed",
total_companies=10,
successful=8,
failed=2,
)
mock_notify.assert_called_once()
event, data = mock_notify.call_args[0]
assert event == "job_completed"
assert data["job_id"] == "batch-001"
assert data["successful"] == 8
assert data["failed"] == 2
assert "8/10" in data["summary"]
class TestNotifyAlert:
"""Tests for notify_alert() convenience function."""
@patch("SPARC.webhooks.notify")
def test_sends_correct_event_and_data(self, mock_notify):
"""Alert notification sends patent_alert event type."""
notify_alert(
company_name="NVIDIA",
alert_type="patent_count_change",
message="Patent count increased by 30%",
)
mock_notify.assert_called_once()
event, data = mock_notify.call_args[0]
assert event == "patent_alert"
assert data["company_name"] == "NVIDIA"
assert data["alert_type"] == "patent_count_change"
assert "30%" in data["message"]