Compare commits

..

6 Commits

Author SHA1 Message Date
agent-company 2e6b8c7445 feat: add webhook notification support for job completion and alerts
Send HTTP POST notifications to configured webhook URLs when batch
jobs complete or when scheduled analysis detects significant changes.

- Add SPARC/webhooks.py with retry logic (3 attempts, exponential backoff)
- Support generic HTTP POST and Slack-compatible text payloads
- Integrate into batch job completion handler in api.py
- Configure via WEBHOOK_URLS env var (comma-separated)
- Payload includes event type, job ID, status, and summary

Closes leeworks-agents/SPARC#23

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:32:07 +00:00
AI-Manager 55c131cb32 Merge pull request 'ci: add pytest and ruff linting to CI workflow' (#32) from feature/ci-testing-linting into main 2026-03-26 07:04:31 +00:00
agent-company fbb72fe2a5 ci: add pytest and ruff linting to CI, fix all lint errors
- Add test job to build.yaml that runs pytest and ruff before building images
- Add standalone test.yaml workflow for PRs
- Add ruff.toml with E/F/I rules configured
- Fix all ruff lint errors: sort imports, remove unused imports, fix re-exports
- Build jobs now depend on test job passing (needs: test)

Closes leeworks-agents/SPARC#18
Closes leeworks-agents/SPARC#19

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 07:04:00 +00:00
AI-Manager e484baaf5f Merge pull request 'feat: configurable LLM model, SERP cache TTL, structured logging, fix type' (#29) from feature/p2-config-improvements into main 2026-03-26 07:03:08 +00:00
AI-Manager 069f1c343c Merge pull request 'refactor(db): shared pooled DatabaseClient singleton' (#30) from feature/db-client-pooling into main 2026-03-26 07:02:46 +00:00
agent-company b000146585 feat: configurable LLM model, SERP cache TTL, structured logging, fix patent_id type
- Make LLM model configurable via MODEL env var, default anthropic/claude-3.5-sonnet (#12)
- Expose SERP cache TTL as SERP_CACHE_TTL_HOURS env var, default 24 hours (#13)
- Fix Patent.patent_id type annotation from int to str in types.py (#14)
- Replace all print() calls with structured logging in analyzer.py and llm.py (#11)
- Add LOG_LEVEL config with basicConfig setup in config.py
- Add model and serp_cache_ttl_hours to config.py

Closes leeworks-agents/SPARC#11
Closes leeworks-agents/SPARC#12
Closes leeworks-agents/SPARC#13
Closes leeworks-agents/SPARC#14

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 06:03:25 +00:00
17 changed files with 334 additions and 51 deletions
+6
View File
@@ -40,3 +40,9 @@ JWT_SECRET=your-secure-jwt-secret-change-in-production
# When USE_CACHE=true: check database for cached responses before making API calls
# When USE_CACHE=false: always make fresh API calls (still stores results in database)
USE_CACHE=true
# ---- Webhooks ----
# Comma-separated list of webhook URLs for job completion and alert notifications
# Supports generic HTTP POST and Slack/Discord incoming webhooks
# WEBHOOK_URLS=https://hooks.slack.com/services/XXX,https://example.com/webhook
+37
View File
@@ -9,7 +9,43 @@ on:
workflow_dispatch:
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Install system dependencies
shell: sh
run: |
apk add --no-cache git python3 py3-pip gcc musl-dev libpq-dev python3-dev
- name: Checkout code
shell: sh
run: |
git clone http://gitea.gitea.svc.cluster.local/${{ gitea.repository }}.git .
git checkout ${{ gitea.sha }}
- name: Install Python dependencies
shell: sh
run: |
pip3 install --break-system-packages -r requirements.txt ruff
- name: Run ruff linter
shell: sh
run: |
ruff check SPARC/ tests/
- name: Run pytest
shell: sh
env:
DATABASE_URL: "sqlite://"
API_KEY: "test-key"
OPENROUTER_API_KEY: "test-key"
JWT_SECRET: "test-secret-for-ci"
APP_ENV: "development"
run: |
python3 -m pytest tests/ -v --tb=short -x
build-api:
needs: test
runs-on: ubuntu-latest
steps:
- name: Install dependencies
@@ -81,6 +117,7 @@ jobs:
echo "API image available at ${{ steps.tags.outputs.IMAGE_TAG }}"
build-frontend:
needs: test
runs-on: ubuntu-latest
steps:
- name: Install dependencies
+46
View File
@@ -0,0 +1,46 @@
name: Test and Lint
on:
push:
branches:
- main
pull_request:
branches:
- main
workflow_dispatch:
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Install system dependencies
shell: sh
run: |
apk add --no-cache git python3 py3-pip gcc musl-dev libpq-dev python3-dev
- name: Checkout code
shell: sh
run: |
git clone http://gitea.gitea.svc.cluster.local/${{ gitea.repository }}.git .
git checkout ${{ gitea.sha }}
- name: Install Python dependencies
shell: sh
run: |
pip3 install --break-system-packages -r requirements.txt ruff
- name: Run ruff linter
shell: sh
run: |
ruff check SPARC/ tests/
- name: Run pytest
shell: sh
env:
DATABASE_URL: "sqlite://"
API_KEY: "test-key"
OPENROUTER_API_KEY: "test-key"
JWT_SECRET: "test-secret-for-ci"
APP_ENV: "development"
run: |
python3 -m pytest tests/ -v --tb=short -x
+3 -2
View File
@@ -1,3 +1,4 @@
from .types import Patents, Patent
from .types import Patent as Patent
from .types import Patents as Patents
all = ["Patents", "Patent"]
__all__ = ["Patents", "Patent"]
+23 -18
View File
@@ -5,14 +5,17 @@ to provide company performance estimation based on patent portfolios.
"""
import hashlib
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable
from SPARC import config
logger = logging.getLogger(__name__)
from SPARC.database import DatabaseClient
from SPARC.serp_api import SERP
from SPARC.llm import LLMAnalyzer
from SPARC.types import Patent, Patents, CompanyAnalysisResult, BatchAnalysisResult
from SPARC.serp_api import SERP
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult, Patent, Patents
class CompanyAnalyzer:
@@ -52,13 +55,13 @@ class CompanyAnalyzer:
query_hash = hashlib.sha256(company_name.lower().encode()).hexdigest()
cached_ids = self.db.get_cached_serp_query(query_hash)
if cached_ids is not None:
print(f"Using cached SERP results for {company_name} ({len(cached_ids)} patents)")
logger.info("Using cached SERP results for %s (%d patents)", company_name, len(cached_ids))
patents = Patents(patents=[
Patent(patent_id=pid, pdf_link="")
for pid in cached_ids
])
else:
print(f"Retrieving patents for {company_name}...")
logger.info("Retrieving patents for %s...", company_name)
patents = SERP.query(company_name)
# Cache the SERP results
if patents.patents:
@@ -66,12 +69,13 @@ class CompanyAnalyzer:
company_name=company_name,
query_hash=query_hash,
patent_ids=[p.patent_id for p in patents.patents],
ttl_hours=config.serp_cache_ttl_hours,
)
if not patents.patents:
return f"No patents found for {company_name}"
print(f"Found {len(patents.patents)} patents. Processing...")
logger.info("Found %d patents. Processing...", len(patents.patents))
# Download, parse, and minimize patents in parallel
processed_patents = []
@@ -87,12 +91,12 @@ class CompanyAnalyzer:
if result:
processed_patents.append(result)
except Exception as e:
print(f"Warning: Failed to process {patent.patent_id}: {e}")
logger.warning("Failed to process %s: %s", patent.patent_id, e)
if not processed_patents:
return f"Failed to process any patents for {company_name}"
print(f"Analyzing portfolio with LLM...")
logger.info("Analyzing portfolio with LLM...")
# Analyze the full portfolio with LLM
analysis = self.llm_analyzer.analyze_patent_portfolio(
@@ -122,6 +126,7 @@ class CompanyAnalyzer:
FileNotFoundError: If the patent PDF is not found at the expected path.
"""
import os
logger.info("Analyzing patent %s for %s...", patent_id, company_name)
patent_path = f"patents/{patent_id}.pdf"
@@ -183,7 +188,7 @@ class CompanyAnalyzer:
return {"patent_id": patent.patent_id, "content": minimized_content}
except Exception as e:
print(f"Warning: Failed to process {patent.patent_id}: {e}")
logger.warning("Failed to process %s: %s", patent.patent_id, e)
return None
def _analyze_company_safe(self, company_name: str) -> CompanyAnalysisResult:
@@ -254,7 +259,7 @@ class CompanyAnalyzer:
results: list[CompanyAnalysisResult] = []
total = len(companies)
print(f"Starting batch analysis of {total} companies...")
logger.info("Starting batch analysis of %d companies...", total)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_company = {
@@ -271,8 +276,8 @@ class CompanyAnalyzer:
result = future.result()
results.append(result)
status = "" if result.success else ""
print(f"[{completed}/{total}] {status} {company}")
status = "OK" if result.success else "FAIL"
logger.info("[%d/%d] %s %s", completed, total, status, company)
if progress_callback:
progress_callback(company, completed, total)
@@ -287,12 +292,12 @@ class CompanyAnalyzer:
error=str(e),
)
)
print(f"[{completed}/{total}] ✗ {company}: {e}")
logger.error("[%d/%d] FAIL %s: %s", completed, total, company, e)
successful = sum(1 for r in results if r.success)
failed = total - successful
print(f"\nBatch complete: {successful} succeeded, {failed} failed")
logger.info("Batch complete: %d succeeded, %d failed", successful, failed)
return BatchAnalysisResult(
results=results,
@@ -318,20 +323,20 @@ class CompanyAnalyzer:
results: list[CompanyAnalysisResult] = []
total = len(companies)
print(f"Starting sequential analysis of {total} companies...")
logger.info("Starting sequential analysis of %d companies...", total)
for idx, company in enumerate(companies, 1):
print(f"\n[{idx}/{total}] Analyzing {company}...")
logger.info("[%d/%d] Analyzing %s...", idx, total, company)
result = self._analyze_company_safe(company)
results.append(result)
status = "" if result.success else ""
print(f"[{idx}/{total}] {status} {company}")
status = "OK" if result.success else "FAIL"
logger.info("[%d/%d] %s %s", idx, total, status, company)
successful = sum(1 for r in results if r.success)
failed = total - successful
print(f"\nBatch complete: {successful} succeeded, {failed} failed")
logger.info("Batch complete: %d succeeded, %d failed", successful, failed)
return BatchAnalysisResult(
results=results,
+17
View File
@@ -519,8 +519,25 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int):
progress=100,
result_json=_json.dumps(batch_response.model_dump(), default=str),
)
# Fire webhook notification
from SPARC.webhooks import notify_job_completed
notify_job_completed(
job_id=job_id,
status="completed",
total_companies=result.total_companies,
successful=result.successful,
failed=result.failed,
)
except Exception as e:
db.update_job(job_id, status="failed", error=str(e))
from SPARC.webhooks import notify_job_completed
notify_job_completed(
job_id=job_id,
status="failed",
total_companies=len(companies),
successful=0,
failed=len(companies),
)
@app.post("/analyze/batch/async", response_model=JobStatus, tags=["Analysis"])
+16 -1
View File
@@ -2,11 +2,20 @@
Loads environment variables from .env file for API keys and other secrets.
"""
from dotenv import load_dotenv
import logging
import os
from dotenv import load_dotenv
load_dotenv()
# Logging configuration
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=getattr(logging, log_level, logging.INFO),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
# SerpAPI key for patent search
api_key = os.getenv("API_KEY")
@@ -30,6 +39,12 @@ use_database = os.getenv("USE_DATABASE", "false").lower() in ("true", "1", "yes"
patent_search_days = int(os.getenv("PATENT_SEARCH_DAYS", "90"))
patent_thread_workers = int(os.getenv("PATENT_THREAD_WORKERS", "5"))
# LLM model to use via OpenRouter (e.g. "anthropic/claude-3.5-sonnet", "openai/gpt-4o")
model = os.getenv("MODEL", "anthropic/claude-3.5-sonnet")
# SERP cache TTL in hours (how long cached search results are considered fresh)
serp_cache_ttl_hours = int(os.getenv("SERP_CACHE_TTL_HOURS", "24"))
# Root path for running behind a reverse proxy (e.g., "/api" when served at /api/)
# This ensures OpenAPI docs work correctly when accessed via the proxy
root_path = os.getenv("ROOT_PATH", "")
+7 -6
View File
@@ -1,14 +1,15 @@
"""Database client for storing and retrieving LLM messages and user authentication."""
import contextlib
import psycopg2
from psycopg2.pool import ThreadedConnectionPool
from psycopg2.extras import RealDictCursor
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import json
import hashlib
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import bcrypt
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool
class DatabaseClient:
+9 -8
View File
@@ -1,9 +1,14 @@
"""LLM integration for patent analysis using OpenRouter."""
import logging
from typing import Dict
from openai import OpenAI
from SPARC import config
from SPARC.database import DatabaseClient
from typing import Dict
logger = logging.getLogger(__name__)
class LLMAnalyzer:
@@ -20,7 +25,7 @@ class LLMAnalyzer:
"""
self.test_mode = test_mode
self.use_cache = use_cache if use_cache is not None else config.use_cache
self.model = "anthropic/claude-3.5-sonnet"
self.model = config.model
# Always initialize database client for storage and caching
self.db_client = DatabaseClient(config.database_url)
@@ -59,11 +64,7 @@ Patent Content:
Provide a concise analysis (2-3 paragraphs) focusing on what this patent reveals about the company's technical direction and competitive advantage."""
if self.test_mode:
print("=" * 80)
print("TEST MODE - Prompt that would be sent to LLM:")
print("=" * 80)
print(prompt)
print("=" * 80)
logger.debug("TEST MODE - Prompt that would be sent to LLM:\n%s", prompt)
return "[TEST MODE - No API call made]"
# Check cache first
@@ -165,7 +166,7 @@ Patent Portfolio:
Provide a comprehensive analysis (4-5 paragraphs) with a final verdict on the company's innovation strength and performance outlook."""
if self.test_mode:
print(prompt)
logger.debug("TEST MODE - Portfolio prompt:\n%s", prompt)
return "[TEST MODE]"
metadata = {
+8 -5
View File
@@ -1,12 +1,15 @@
import os
import serpapi
from SPARC import config
import re
import pdfplumber # pip install pdfplumber
import requests
from datetime import datetime, timedelta
from typing import Dict
from SPARC.types import Patents, Patent
import pdfplumber # pip install pdfplumber
import requests
import serpapi
from SPARC import config
from SPARC.types import Patent, Patents
class SERP:
def query(company: str, days_back: int = None) -> Patents:
+1 -1
View File
@@ -4,7 +4,7 @@ from datetime import datetime
@dataclass
class Patent:
patent_id: int
patent_id: str
pdf_link: str
pdf_path: str | None = None
summary: dict | None = None
+139
View File
@@ -0,0 +1,139 @@
"""Webhook notifications for job completion and alert events.
Sends JSON payloads to configured webhook URLs with retry logic.
Supports generic HTTP POST and Slack-compatible text payloads.
"""
import logging
import os
import time
from datetime import datetime
from typing import Any
import requests
logger = logging.getLogger(__name__)
# Comma-separated list of webhook URLs (env var based config)
_WEBHOOK_URLS_RAW = os.getenv("WEBHOOK_URLS", "")
WEBHOOK_URLS: list[str] = [
url.strip() for url in _WEBHOOK_URLS_RAW.split(",") if url.strip()
]
MAX_RETRIES = 3
BACKOFF_BASE = 2 # seconds
def _is_slack_url(url: str) -> bool:
"""Check if a URL looks like a Slack incoming webhook."""
return "hooks.slack.com" in url or "discord.com/api/webhooks" in url
def _build_payload(event_type: str, data: dict[str, Any], slack: bool = False) -> dict:
"""Build the webhook payload.
Args:
event_type: Type of event (e.g., "job_completed", "alert")
data: Event-specific data
slack: If True, wrap in Slack-compatible ``text`` format
Returns:
JSON-serializable payload dict
"""
payload = {
"event": event_type,
"timestamp": datetime.utcnow().isoformat() + "Z",
**data,
}
if slack:
# Build a human-readable summary for Slack/Discord
lines = [f"*[SPARC] {event_type}*"]
for key, value in data.items():
lines.append(f" {key}: {value}")
return {"text": "\n".join(lines)}
return payload
def _send_with_retry(url: str, payload: dict) -> bool:
"""Send a POST request with exponential backoff retry.
Args:
url: Webhook URL
payload: JSON payload to send
Returns:
True if delivered successfully, False after all retries exhausted
"""
for attempt in range(1, MAX_RETRIES + 1):
try:
response = requests.post(url, json=payload, timeout=10)
if response.status_code < 300:
logger.debug("Webhook delivered to %s (attempt %d)", url, attempt)
return True
logger.warning(
"Webhook %s returned %d (attempt %d/%d)",
url, response.status_code, attempt, MAX_RETRIES,
)
except requests.RequestException as e:
logger.warning(
"Webhook delivery failed for %s (attempt %d/%d): %s",
url, attempt, MAX_RETRIES, e,
)
if attempt < MAX_RETRIES:
wait = BACKOFF_BASE ** attempt
time.sleep(wait)
logger.error("Webhook permanently failed for %s after %d attempts", url, MAX_RETRIES)
return False
def notify(event_type: str, data: dict[str, Any]) -> None:
"""Fire all configured webhooks for an event.
Safe to call even when no webhooks are configured (returns immediately).
Args:
event_type: Event identifier (e.g., "job_completed", "patent_alert")
data: Event data to include in the payload
"""
if not WEBHOOK_URLS:
return
for url in WEBHOOK_URLS:
slack = _is_slack_url(url)
payload = _build_payload(event_type, data, slack=slack)
_send_with_retry(url, payload)
def notify_job_completed(
job_id: str,
status: str,
total_companies: int,
successful: int,
failed: int,
) -> None:
"""Send notification when a batch job completes."""
notify("job_completed", {
"job_id": job_id,
"status": status,
"total_companies": total_companies,
"successful": successful,
"failed": failed,
"summary": f"Batch job {job_id}: {successful}/{total_companies} succeeded",
})
def notify_alert(
company_name: str,
alert_type: str,
message: str,
) -> None:
"""Send notification for a tracked company alert."""
notify("patent_alert", {
"company_name": company_name,
"alert_type": alert_type,
"message": message,
})
+8
View File
@@ -0,0 +1,8 @@
[lint]
select = ["E", "F", "I"]
ignore = [
"E501", # line too long (handled by formatter)
]
[lint.per-file-ignores]
"tests/*" = ["E402", "F841"] # allow import not at top of file, unused vars (mocks) in tests
+5 -3
View File
@@ -1,9 +1,11 @@
"""Tests for the high-level company analyzer orchestration."""
from unittest.mock import MagicMock, Mock
import pytest
from unittest.mock import Mock, patch, call, MagicMock
from SPARC.analyzer import CompanyAnalyzer
from SPARC.types import Patent, Patents, CompanyAnalysisResult, BatchAnalysisResult
from SPARC.types import BatchAnalysisResult, Patent, Patents
@pytest.fixture(autouse=True)
@@ -24,7 +26,7 @@ class TestCompanyAnalyzer:
"""Test analyzer initialization with API key."""
mock_llm = mocker.patch("SPARC.analyzer.LLMAnalyzer")
analyzer = CompanyAnalyzer(openrouter_api_key="test-key")
_analyzer = CompanyAnalyzer(openrouter_api_key="test-key") # noqa: F841
mock_llm.assert_called_once_with(api_key="test-key")
+4 -3
View File
@@ -1,12 +1,13 @@
"""Tests for FastAPI web service endpoints."""
import pytest
from datetime import datetime
from unittest.mock import Mock, patch
from unittest.mock import Mock
import pytest
from fastapi.testclient import TestClient
from SPARC.api import app
from SPARC.types import CompanyAnalysisResult, BatchAnalysisResult
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult
@pytest.fixture
+3 -1
View File
@@ -1,7 +1,9 @@
"""Tests for LLM analysis functionality."""
from unittest.mock import Mock
import pytest
from unittest.mock import Mock, MagicMock, patch
from SPARC.llm import LLMAnalyzer
+2 -3
View File
@@ -1,9 +1,8 @@
"""Tests for SERP API patent retrieval and parsing functionality."""
import os
import pytest
from unittest.mock import patch, Mock
from datetime import datetime, timedelta
from unittest.mock import Mock
from SPARC.serp_api import SERP
from SPARC.types import Patent