Compare commits

..

7 Commits

Author SHA1 Message Date
agent-company 2e6b8c7445 feat: add webhook notification support for job completion and alerts
Send HTTP POST notifications to configured webhook URLs when batch
jobs complete or when scheduled analysis detects significant changes.

- Add SPARC/webhooks.py with retry logic (3 attempts, exponential backoff)
- Support generic HTTP POST and Slack-compatible text payloads
- Integrate into batch job completion handler in api.py
- Configure via WEBHOOK_URLS env var (comma-separated)
- Payload includes event type, job ID, status, and summary

Closes leeworks-agents/SPARC#23

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:32:07 +00:00
AI-Manager 55c131cb32 Merge pull request 'ci: add pytest and ruff linting to CI workflow' (#32) from feature/ci-testing-linting into main 2026-03-26 07:04:31 +00:00
agent-company fbb72fe2a5 ci: add pytest and ruff linting to CI, fix all lint errors
- Add test job to build.yaml that runs pytest and ruff before building images
- Add standalone test.yaml workflow for PRs
- Add ruff.toml with E/F/I rules configured
- Fix all ruff lint errors: sort imports, remove unused imports, fix re-exports
- Build jobs now depend on test job passing (needs: test)

Closes leeworks-agents/SPARC#18
Closes leeworks-agents/SPARC#19

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 07:04:00 +00:00
AI-Manager e484baaf5f Merge pull request 'feat: configurable LLM model, SERP cache TTL, structured logging, fix type' (#29) from feature/p2-config-improvements into main 2026-03-26 07:03:08 +00:00
AI-Manager 069f1c343c Merge pull request 'refactor(db): shared pooled DatabaseClient singleton' (#30) from feature/db-client-pooling into main 2026-03-26 07:02:46 +00:00
agent-company d366443b38 refactor(db): use shared pooled DatabaseClient singleton instead of per-call instances
- Replace get_db_client() creating new DatabaseClient on every call with a
  module-level singleton initialized once at startup via init_db_client()
- Add init_db_client() and close_db_client() lifecycle functions called
  from FastAPI lifespan handler
- Migrate all DatabaseClient methods from legacy self.connect()/self.conn
  to pooled self.get_conn() context manager for thread-safe connection reuse
- Pool is properly torn down on application shutdown

Closes leeworks-agents/SPARC#7

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 06:03:56 +00:00
agent-company b000146585 feat: configurable LLM model, SERP cache TTL, structured logging, fix patent_id type
- Make LLM model configurable via MODEL env var, default anthropic/claude-3.5-sonnet (#12)
- Expose SERP cache TTL as SERP_CACHE_TTL_HOURS env var, default 24 hours (#13)
- Fix Patent.patent_id type annotation from int to str in types.py (#14)
- Replace all print() calls with structured logging in analyzer.py and llm.py (#11)
- Add LOG_LEVEL config with basicConfig setup in config.py
- Add model and serp_cache_ttl_hours to config.py

Closes leeworks-agents/SPARC#11
Closes leeworks-agents/SPARC#12
Closes leeworks-agents/SPARC#13
Closes leeworks-agents/SPARC#14

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 06:03:25 +00:00
9 changed files with 390 additions and 194 deletions
+6
View File
@@ -40,3 +40,9 @@ JWT_SECRET=your-secure-jwt-secret-change-in-production
# When USE_CACHE=true: check database for cached responses before making API calls
# When USE_CACHE=false: always make fresh API calls (still stores results in database)
USE_CACHE=true
# ---- Webhooks ----
# Comma-separated list of webhook URLs for job completion and alert notifications
# Supports generic HTTP POST and Slack/Discord incoming webhooks
# WEBHOOK_URLS=https://hooks.slack.com/services/XXX,https://example.com/webhook
+21 -16
View File
@@ -5,10 +5,13 @@ to provide company performance estimation based on patent portfolios.
"""
import hashlib
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable
from SPARC import config
logger = logging.getLogger(__name__)
from SPARC.database import DatabaseClient
from SPARC.llm import LLMAnalyzer
from SPARC.serp_api import SERP
@@ -52,13 +55,13 @@ class CompanyAnalyzer:
query_hash = hashlib.sha256(company_name.lower().encode()).hexdigest()
cached_ids = self.db.get_cached_serp_query(query_hash)
if cached_ids is not None:
print(f"Using cached SERP results for {company_name} ({len(cached_ids)} patents)")
logger.info("Using cached SERP results for %s (%d patents)", company_name, len(cached_ids))
patents = Patents(patents=[
Patent(patent_id=pid, pdf_link="")
for pid in cached_ids
])
else:
print(f"Retrieving patents for {company_name}...")
logger.info("Retrieving patents for %s...", company_name)
patents = SERP.query(company_name)
# Cache the SERP results
if patents.patents:
@@ -66,12 +69,13 @@ class CompanyAnalyzer:
company_name=company_name,
query_hash=query_hash,
patent_ids=[p.patent_id for p in patents.patents],
ttl_hours=config.serp_cache_ttl_hours,
)
if not patents.patents:
return f"No patents found for {company_name}"
print(f"Found {len(patents.patents)} patents. Processing...")
logger.info("Found %d patents. Processing...", len(patents.patents))
# Download, parse, and minimize patents in parallel
processed_patents = []
@@ -87,12 +91,12 @@ class CompanyAnalyzer:
if result:
processed_patents.append(result)
except Exception as e:
print(f"Warning: Failed to process {patent.patent_id}: {e}")
logger.warning("Failed to process %s: %s", patent.patent_id, e)
if not processed_patents:
return f"Failed to process any patents for {company_name}"
print("Analyzing portfolio with LLM...")
logger.info("Analyzing portfolio with LLM...")
# Analyze the full portfolio with LLM
analysis = self.llm_analyzer.analyze_patent_portfolio(
@@ -122,6 +126,7 @@ class CompanyAnalyzer:
FileNotFoundError: If the patent PDF is not found at the expected path.
"""
import os
logger.info("Analyzing patent %s for %s...", patent_id, company_name)
patent_path = f"patents/{patent_id}.pdf"
@@ -183,7 +188,7 @@ class CompanyAnalyzer:
return {"patent_id": patent.patent_id, "content": minimized_content}
except Exception as e:
print(f"Warning: Failed to process {patent.patent_id}: {e}")
logger.warning("Failed to process %s: %s", patent.patent_id, e)
return None
def _analyze_company_safe(self, company_name: str) -> CompanyAnalysisResult:
@@ -254,7 +259,7 @@ class CompanyAnalyzer:
results: list[CompanyAnalysisResult] = []
total = len(companies)
print(f"Starting batch analysis of {total} companies...")
logger.info("Starting batch analysis of %d companies...", total)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_company = {
@@ -271,8 +276,8 @@ class CompanyAnalyzer:
result = future.result()
results.append(result)
status = "" if result.success else ""
print(f"[{completed}/{total}] {status} {company}")
status = "OK" if result.success else "FAIL"
logger.info("[%d/%d] %s %s", completed, total, status, company)
if progress_callback:
progress_callback(company, completed, total)
@@ -287,12 +292,12 @@ class CompanyAnalyzer:
error=str(e),
)
)
print(f"[{completed}/{total}] ✗ {company}: {e}")
logger.error("[%d/%d] FAIL %s: %s", completed, total, company, e)
successful = sum(1 for r in results if r.success)
failed = total - successful
print(f"\nBatch complete: {successful} succeeded, {failed} failed")
logger.info("Batch complete: %d succeeded, %d failed", successful, failed)
return BatchAnalysisResult(
results=results,
@@ -318,20 +323,20 @@ class CompanyAnalyzer:
results: list[CompanyAnalysisResult] = []
total = len(companies)
print(f"Starting sequential analysis of {total} companies...")
logger.info("Starting sequential analysis of %d companies...", total)
for idx, company in enumerate(companies, 1):
print(f"\n[{idx}/{total}] Analyzing {company}...")
logger.info("[%d/%d] Analyzing %s...", idx, total, company)
result = self._analyze_company_safe(company)
results.append(result)
status = "" if result.success else ""
print(f"[{idx}/{total}] {status} {company}")
status = "OK" if result.success else "FAIL"
logger.info("[%d/%d] %s %s", idx, total, status, company)
successful = sum(1 for r in results if r.success)
failed = total - successful
print(f"\nBatch complete: {successful} succeeded, {failed} failed")
logger.info("Batch complete: %d succeeded, %d failed", successful, failed)
return BatchAnalysisResult(
results=results,
+22 -1
View File
@@ -21,11 +21,13 @@ from SPARC.auth import (
TokenResponse,
UserResponse,
check_jwt_secret,
close_db_client,
create_tokens,
decode_token,
get_current_admin,
get_current_user,
get_db_client,
init_db_client,
)
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult
@@ -155,6 +157,7 @@ async def lifespan(app: FastAPI):
"""Initialize resources on startup, clean up on shutdown."""
global _analyzer
check_jwt_secret()
init_db_client()
_analyzer = CompanyAnalyzer()
# Mark any jobs that were running/pending before the restart as failed
from SPARC.database import DatabaseClient
@@ -167,8 +170,9 @@ async def lifespan(app: FastAPI):
logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale)
_db.close()
yield
# Cleanup if needed
# Cleanup
_analyzer = None
close_db_client()
app = FastAPI(
@@ -515,8 +519,25 @@ def _run_batch_job(job_id: str, companies: list[str], max_workers: int):
progress=100,
result_json=_json.dumps(batch_response.model_dump(), default=str),
)
# Fire webhook notification
from SPARC.webhooks import notify_job_completed
notify_job_completed(
job_id=job_id,
status="completed",
total_companies=result.total_companies,
successful=result.successful,
failed=result.failed,
)
except Exception as e:
db.update_job(job_id, status="failed", error=str(e))
from SPARC.webhooks import notify_job_completed
notify_job_completed(
job_id=job_id,
status="failed",
total_companies=len(companies),
successful=0,
failed=len(companies),
)
@app.post("/analyze/batch/async", response_model=JobStatus, tags=["Analysis"])
+29 -4
View File
@@ -146,11 +146,36 @@ def decode_token(token: str) -> Optional[TokenPayload]:
return None
# Shared database client singleton, initialized at startup via init_db_client()
_db_client: DatabaseClient | None = None
def init_db_client() -> None:
"""Initialize the shared database client. Call once at app startup."""
global _db_client
_db_client = DatabaseClient(config.database_url)
_db_client.connect()
def close_db_client() -> None:
"""Close the shared database client. Call at app shutdown."""
global _db_client
if _db_client:
_db_client.close()
_db_client = None
def get_db_client() -> DatabaseClient:
"""Get database client for auth operations."""
client = DatabaseClient(config.database_url)
client.connect()
return client
"""Get the shared pooled database client for auth operations.
Returns the module-level singleton DatabaseClient. If not yet initialized
(e.g., during tests), creates a new instance as a fallback.
"""
global _db_client
if _db_client is None:
_db_client = DatabaseClient(config.database_url)
_db_client.connect()
return _db_client
async def get_current_user(
+14
View File
@@ -2,12 +2,20 @@
Loads environment variables from .env file for API keys and other secrets.
"""
import logging
import os
from dotenv import load_dotenv
load_dotenv()
# Logging configuration
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=getattr(logging, log_level, logging.INFO),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
# SerpAPI key for patent search
api_key = os.getenv("API_KEY")
@@ -31,6 +39,12 @@ use_database = os.getenv("USE_DATABASE", "false").lower() in ("true", "1", "yes"
patent_search_days = int(os.getenv("PATENT_SEARCH_DAYS", "90"))
patent_thread_workers = int(os.getenv("PATENT_THREAD_WORKERS", "5"))
# LLM model to use via OpenRouter (e.g. "anthropic/claude-3.5-sonnet", "openai/gpt-4o")
model = os.getenv("MODEL", "anthropic/claude-3.5-sonnet")
# SERP cache TTL in hours (how long cached search results are considered fresh)
serp_cache_ttl_hours = int(os.getenv("SERP_CACHE_TTL_HOURS", "24"))
# Root path for running behind a reverse proxy (e.g., "/api" when served at /api/)
# This ensures OpenAPI docs work correctly when accessed via the proxy
root_path = os.getenv("ROOT_PATH", "")
+28 -41
View File
@@ -222,8 +222,6 @@ class DatabaseClient:
Returns:
Cached message dict if found, None otherwise
"""
self.connect()
prompt_hash = self.hash_prompt(prompt)
query = """
@@ -246,7 +244,8 @@ class DatabaseClient:
query += " ORDER BY timestamp DESC LIMIT 1"
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
result = cursor.fetchone()
return dict(result) if result else None
@@ -277,11 +276,10 @@ class DatabaseClient:
Returns:
The ID of the inserted record
"""
self.connect()
prompt_hash = self.hash_prompt(prompt)
with self.conn.cursor() as cursor:
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO llm_messages
@@ -303,7 +301,7 @@ class DatabaseClient:
)
message_id = cursor.fetchone()[0]
self.conn.commit()
conn.commit()
return message_id
@@ -325,8 +323,6 @@ class DatabaseClient:
Returns:
List of message dictionaries
"""
self.connect()
query = "SELECT * FROM llm_messages WHERE 1=1"
params = []
@@ -341,7 +337,8 @@ class DatabaseClient:
query += " ORDER BY timestamp DESC LIMIT %s OFFSET %s"
params.extend([limit, offset])
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
@@ -354,9 +351,8 @@ class DatabaseClient:
Returns:
Dictionary with analytics data
"""
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
# Total messages
cursor.execute(
"""
@@ -651,12 +647,11 @@ class DatabaseClient:
Returns:
Created user dict or None if email exists
"""
self.connect()
password_hash = self.hash_password(password)
try:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
INSERT INTO users (email, password_hash, role)
@@ -666,10 +661,9 @@ class DatabaseClient:
(email, password_hash, role),
)
user = cursor.fetchone()
self.conn.commit()
conn.commit()
return dict(user) if user else None
except psycopg2.errors.UniqueViolation:
self.conn.rollback()
return None
def authenticate_user(self, email: str, password: str) -> Optional[Dict]:
@@ -682,9 +676,8 @@ class DatabaseClient:
Returns:
User dict if authenticated, None otherwise
"""
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT * FROM users WHERE email = %s",
(email,),
@@ -709,9 +702,8 @@ class DatabaseClient:
Returns:
User dict or None
"""
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT id, email, role, created_at FROM users WHERE id = %s",
(user_id,),
@@ -728,9 +720,8 @@ class DatabaseClient:
Returns:
User dict or None
"""
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT id, email, role, created_at FROM users WHERE email = %s",
(email,),
@@ -748,9 +739,8 @@ class DatabaseClient:
Returns:
List of user dicts
"""
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
SELECT id, email, role, created_at
@@ -772,9 +762,8 @@ class DatabaseClient:
Returns:
Updated user dict or None
"""
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
UPDATE users
@@ -785,7 +774,7 @@ class DatabaseClient:
(role, user_id),
)
user = cursor.fetchone()
self.conn.commit()
conn.commit()
return dict(user) if user else None
def delete_user(self, user_id: int) -> bool:
@@ -797,12 +786,11 @@ class DatabaseClient:
Returns:
True if deleted
"""
self.connect()
with self.conn.cursor() as cursor:
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
deleted = cursor.rowcount > 0
self.conn.commit()
conn.commit()
return deleted
def get_user_count(self) -> int:
@@ -811,8 +799,7 @@ class DatabaseClient:
Returns:
Number of users
"""
self.connect()
with self.conn.cursor() as cursor:
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM users")
return cursor.fetchone()[0]
+6 -7
View File
@@ -1,5 +1,6 @@
"""LLM integration for patent analysis using OpenRouter."""
import logging
from typing import Dict
from openai import OpenAI
@@ -7,6 +8,8 @@ from openai import OpenAI
from SPARC import config
from SPARC.database import DatabaseClient
logger = logging.getLogger(__name__)
class LLMAnalyzer:
"""Handles LLM-based analysis of patent content."""
@@ -22,7 +25,7 @@ class LLMAnalyzer:
"""
self.test_mode = test_mode
self.use_cache = use_cache if use_cache is not None else config.use_cache
self.model = "anthropic/claude-3.5-sonnet"
self.model = config.model
# Always initialize database client for storage and caching
self.db_client = DatabaseClient(config.database_url)
@@ -61,11 +64,7 @@ Patent Content:
Provide a concise analysis (2-3 paragraphs) focusing on what this patent reveals about the company's technical direction and competitive advantage."""
if self.test_mode:
print("=" * 80)
print("TEST MODE - Prompt that would be sent to LLM:")
print("=" * 80)
print(prompt)
print("=" * 80)
logger.debug("TEST MODE - Prompt that would be sent to LLM:\n%s", prompt)
return "[TEST MODE - No API call made]"
# Check cache first
@@ -167,7 +166,7 @@ Patent Portfolio:
Provide a comprehensive analysis (4-5 paragraphs) with a final verdict on the company's innovation strength and performance outlook."""
if self.test_mode:
print(prompt)
logger.debug("TEST MODE - Portfolio prompt:\n%s", prompt)
return "[TEST MODE]"
metadata = {
+1 -1
View File
@@ -4,7 +4,7 @@ from datetime import datetime
@dataclass
class Patent:
patent_id: int
patent_id: str
pdf_link: str
pdf_path: str | None = None
summary: dict | None = None
+139
View File
@@ -0,0 +1,139 @@
"""Webhook notifications for job completion and alert events.
Sends JSON payloads to configured webhook URLs with retry logic.
Supports generic HTTP POST and Slack-compatible text payloads.
"""
import logging
import os
import time
from datetime import datetime
from typing import Any
import requests
logger = logging.getLogger(__name__)
# Comma-separated list of webhook URLs (env var based config)
_WEBHOOK_URLS_RAW = os.getenv("WEBHOOK_URLS", "")
WEBHOOK_URLS: list[str] = [
url.strip() for url in _WEBHOOK_URLS_RAW.split(",") if url.strip()
]
MAX_RETRIES = 3
BACKOFF_BASE = 2 # seconds
def _is_slack_url(url: str) -> bool:
"""Check if a URL looks like a Slack incoming webhook."""
return "hooks.slack.com" in url or "discord.com/api/webhooks" in url
def _build_payload(event_type: str, data: dict[str, Any], slack: bool = False) -> dict:
"""Build the webhook payload.
Args:
event_type: Type of event (e.g., "job_completed", "alert")
data: Event-specific data
slack: If True, wrap in Slack-compatible ``text`` format
Returns:
JSON-serializable payload dict
"""
payload = {
"event": event_type,
"timestamp": datetime.utcnow().isoformat() + "Z",
**data,
}
if slack:
# Build a human-readable summary for Slack/Discord
lines = [f"*[SPARC] {event_type}*"]
for key, value in data.items():
lines.append(f" {key}: {value}")
return {"text": "\n".join(lines)}
return payload
def _send_with_retry(url: str, payload: dict) -> bool:
"""Send a POST request with exponential backoff retry.
Args:
url: Webhook URL
payload: JSON payload to send
Returns:
True if delivered successfully, False after all retries exhausted
"""
for attempt in range(1, MAX_RETRIES + 1):
try:
response = requests.post(url, json=payload, timeout=10)
if response.status_code < 300:
logger.debug("Webhook delivered to %s (attempt %d)", url, attempt)
return True
logger.warning(
"Webhook %s returned %d (attempt %d/%d)",
url, response.status_code, attempt, MAX_RETRIES,
)
except requests.RequestException as e:
logger.warning(
"Webhook delivery failed for %s (attempt %d/%d): %s",
url, attempt, MAX_RETRIES, e,
)
if attempt < MAX_RETRIES:
wait = BACKOFF_BASE ** attempt
time.sleep(wait)
logger.error("Webhook permanently failed for %s after %d attempts", url, MAX_RETRIES)
return False
def notify(event_type: str, data: dict[str, Any]) -> None:
"""Fire all configured webhooks for an event.
Safe to call even when no webhooks are configured (returns immediately).
Args:
event_type: Event identifier (e.g., "job_completed", "patent_alert")
data: Event data to include in the payload
"""
if not WEBHOOK_URLS:
return
for url in WEBHOOK_URLS:
slack = _is_slack_url(url)
payload = _build_payload(event_type, data, slack=slack)
_send_with_retry(url, payload)
def notify_job_completed(
job_id: str,
status: str,
total_companies: int,
successful: int,
failed: int,
) -> None:
"""Send notification when a batch job completes."""
notify("job_completed", {
"job_id": job_id,
"status": status,
"total_companies": total_companies,
"successful": successful,
"failed": failed,
"summary": f"Batch job {job_id}: {successful}/{total_companies} succeeded",
})
def notify_alert(
company_name: str,
alert_type: str,
message: str,
) -> None:
"""Send notification for a tracked company alert."""
notify("patent_alert", {
"company_name": company_name,
"alert_type": alert_type,
"message": message,
})