Compare commits

...

6 Commits

Author SHA1 Message Date
agent-company f33447eef8 feat: implement scheduled/recurring analysis with change alerting
Add APScheduler-based background task that periodically re-analyzes
tracked companies and alerts on significant patent count changes.

- Add tracked_companies and alerts tables to database schema
- Add SPARC/scheduler.py with configurable interval and threshold
- Add admin endpoints: GET/POST/DELETE /admin/tracked, GET /admin/alerts
- Scheduler starts at app startup; interval via SCHEDULE_INTERVAL_HOURS
- Change threshold configurable via CHANGE_THRESHOLD_PERCENT env var
- apscheduler is optional; graceful fallback if not installed

Closes leeworks-agents/SPARC#22

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:30:43 +00:00
AI-Manager 55c131cb32 Merge pull request 'ci: add pytest and ruff linting to CI workflow' (#32) from feature/ci-testing-linting into main 2026-03-26 07:04:31 +00:00
agent-company fbb72fe2a5 ci: add pytest and ruff linting to CI, fix all lint errors
- Add test job to build.yaml that runs pytest and ruff before building images
- Add standalone test.yaml workflow for PRs
- Add ruff.toml with E/F/I rules configured
- Fix all ruff lint errors: sort imports, remove unused imports, fix re-exports
- Build jobs now depend on test job passing (needs: test)

Closes leeworks-agents/SPARC#18
Closes leeworks-agents/SPARC#19

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 07:04:00 +00:00
AI-Manager e484baaf5f Merge pull request 'feat: configurable LLM model, SERP cache TTL, structured logging, fix type' (#29) from feature/p2-config-improvements into main 2026-03-26 07:03:08 +00:00
AI-Manager 069f1c343c Merge pull request 'refactor(db): shared pooled DatabaseClient singleton' (#30) from feature/db-client-pooling into main 2026-03-26 07:02:46 +00:00
agent-company d366443b38 refactor(db): use shared pooled DatabaseClient singleton instead of per-call instances
- Replace get_db_client() creating new DatabaseClient on every call with a
  module-level singleton initialized once at startup via init_db_client()
- Add init_db_client() and close_db_client() lifecycle functions called
  from FastAPI lifespan handler
- Migrate all DatabaseClient methods from legacy self.connect()/self.conn
  to pooled self.get_conn() context manager for thread-safe connection reuse
- Pool is properly torn down on application shutdown

Closes leeworks-agents/SPARC#7

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 06:03:56 +00:00
15 changed files with 584 additions and 194 deletions
+37
View File
@@ -9,7 +9,43 @@ on:
workflow_dispatch:
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Install system dependencies
shell: sh
run: |
apk add --no-cache git python3 py3-pip gcc musl-dev libpq-dev python3-dev
- name: Checkout code
shell: sh
run: |
git clone http://gitea.gitea.svc.cluster.local/${{ gitea.repository }}.git .
git checkout ${{ gitea.sha }}
- name: Install Python dependencies
shell: sh
run: |
pip3 install --break-system-packages -r requirements.txt ruff
- name: Run ruff linter
shell: sh
run: |
ruff check SPARC/ tests/
- name: Run pytest
shell: sh
env:
DATABASE_URL: "sqlite://"
API_KEY: "test-key"
OPENROUTER_API_KEY: "test-key"
JWT_SECRET: "test-secret-for-ci"
APP_ENV: "development"
run: |
python3 -m pytest tests/ -v --tb=short -x
build-api:
needs: test
runs-on: ubuntu-latest
steps:
- name: Install dependencies
@@ -81,6 +117,7 @@ jobs:
echo "API image available at ${{ steps.tags.outputs.IMAGE_TAG }}"
build-frontend:
needs: test
runs-on: ubuntu-latest
steps:
- name: Install dependencies
+46
View File
@@ -0,0 +1,46 @@
name: Test and Lint
on:
push:
branches:
- main
pull_request:
branches:
- main
workflow_dispatch:
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Install system dependencies
shell: sh
run: |
apk add --no-cache git python3 py3-pip gcc musl-dev libpq-dev python3-dev
- name: Checkout code
shell: sh
run: |
git clone http://gitea.gitea.svc.cluster.local/${{ gitea.repository }}.git .
git checkout ${{ gitea.sha }}
- name: Install Python dependencies
shell: sh
run: |
pip3 install --break-system-packages -r requirements.txt ruff
- name: Run ruff linter
shell: sh
run: |
ruff check SPARC/ tests/
- name: Run pytest
shell: sh
env:
DATABASE_URL: "sqlite://"
API_KEY: "test-key"
OPENROUTER_API_KEY: "test-key"
JWT_SECRET: "test-secret-for-ci"
APP_ENV: "development"
run: |
python3 -m pytest tests/ -v --tb=short -x
+3 -2
View File
@@ -1,3 +1,4 @@
from .types import Patents, Patent
from .types import Patent as Patent
from .types import Patents as Patents
all = ["Patents", "Patent"]
__all__ = ["Patents", "Patent"]
+2 -2
View File
@@ -13,9 +13,9 @@ from SPARC import config
logger = logging.getLogger(__name__)
from SPARC.database import DatabaseClient
from SPARC.serp_api import SERP
from SPARC.llm import LLMAnalyzer
from SPARC.types import Patent, Patents, CompanyAnalysisResult, BatchAnalysisResult
from SPARC.serp_api import SERP
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult, Patent, Patents
class CompanyAnalyzer:
+62 -1
View File
@@ -21,11 +21,13 @@ from SPARC.auth import (
TokenResponse,
UserResponse,
check_jwt_secret,
close_db_client,
create_tokens,
decode_token,
get_current_admin,
get_current_user,
get_db_client,
init_db_client,
)
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult
@@ -155,6 +157,7 @@ async def lifespan(app: FastAPI):
"""Initialize resources on startup, clean up on shutdown."""
global _analyzer
check_jwt_secret()
init_db_client()
_analyzer = CompanyAnalyzer()
# Mark any jobs that were running/pending before the restart as failed
from SPARC.database import DatabaseClient
@@ -166,9 +169,13 @@ async def lifespan(app: FastAPI):
import logging
logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale)
_db.close()
# Start scheduled analysis if tracked companies are configured
from SPARC.scheduler import start_scheduler
start_scheduler()
yield
# Cleanup if needed
# Cleanup
_analyzer = None
close_db_client()
app = FastAPI(
@@ -365,6 +372,60 @@ async def delete_user(
return {"message": "User deleted"}
# ============== Tracked Companies Endpoints ==============
class TrackCompanyRequest(BaseModel):
"""Request to add a company to tracking."""
company_name: str = Field(..., min_length=1, max_length=255)
@app.get("/admin/tracked", tags=["Admin"])
async def list_tracked_companies(
_: UserResponse = Depends(get_current_admin),
):
"""List all tracked companies (admin only)."""
db = get_db_client()
return db.list_tracked_companies()
@app.post("/admin/tracked", tags=["Admin"])
async def add_tracked_company(
request: TrackCompanyRequest,
_: UserResponse = Depends(get_current_admin),
):
"""Add a company to the tracked list (admin only)."""
db = get_db_client()
result = db.add_tracked_company(request.company_name)
if not result:
raise HTTPException(status_code=409, detail="Company already tracked")
return result
@app.delete("/admin/tracked/{company_name}", tags=["Admin"])
async def remove_tracked_company(
company_name: str,
_: UserResponse = Depends(get_current_admin),
):
"""Remove a company from the tracked list (admin only)."""
db = get_db_client()
removed = db.remove_tracked_company(company_name)
if not removed:
raise HTTPException(status_code=404, detail="Company not found in tracking list")
return {"message": f"Stopped tracking {company_name}"}
@app.get("/admin/alerts", tags=["Admin"])
async def list_alerts(
limit: int = Query(default=50, ge=1, le=200),
_: UserResponse = Depends(get_current_admin),
):
"""List recent alerts from scheduled analysis (admin only)."""
db = get_db_client()
return db.list_alerts(limit=limit)
# ============== Analytics Endpoint ==============
+29 -4
View File
@@ -146,11 +146,36 @@ def decode_token(token: str) -> Optional[TokenPayload]:
return None
# Shared database client singleton, initialized at startup via init_db_client()
_db_client: DatabaseClient | None = None
def init_db_client() -> None:
"""Initialize the shared database client. Call once at app startup."""
global _db_client
_db_client = DatabaseClient(config.database_url)
_db_client.connect()
def close_db_client() -> None:
"""Close the shared database client. Call at app shutdown."""
global _db_client
if _db_client:
_db_client.close()
_db_client = None
def get_db_client() -> DatabaseClient:
"""Get database client for auth operations."""
client = DatabaseClient(config.database_url)
client.connect()
return client
"""Get the shared pooled database client for auth operations.
Returns the module-level singleton DatabaseClient. If not yet initialized
(e.g., during tests), creates a new instance as a fallback.
"""
global _db_client
if _db_client is None:
_db_client = DatabaseClient(config.database_url)
_db_client.connect()
return _db_client
async def get_current_user(
+265 -170
View File
@@ -1,14 +1,15 @@
"""Database client for storing and retrieving LLM messages and user authentication."""
import contextlib
import psycopg2
from psycopg2.pool import ThreadedConnectionPool
from psycopg2.extras import RealDictCursor
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import json
import hashlib
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import bcrypt
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool
class DatabaseClient:
@@ -191,6 +192,35 @@ class DatabaseClient:
ON jobs(status)
""")
# Create tracked companies table for scheduled analysis
cursor.execute("""
CREATE TABLE IF NOT EXISTS tracked_companies (
id SERIAL PRIMARY KEY,
company_name VARCHAR(255) UNIQUE NOT NULL,
last_patent_count INTEGER DEFAULT 0,
last_analysis_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create alerts table for significant changes
cursor.execute("""
CREATE TABLE IF NOT EXISTS alerts (
id SERIAL PRIMARY KEY,
company_name VARCHAR(255) NOT NULL,
alert_type VARCHAR(50) NOT NULL,
message TEXT NOT NULL,
old_value NUMERIC,
new_value NUMERIC,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alerts_company
ON alerts(company_name)
""")
self.conn.commit()
@staticmethod
@@ -221,8 +251,6 @@ class DatabaseClient:
Returns:
Cached message dict if found, None otherwise
"""
self.connect()
prompt_hash = self.hash_prompt(prompt)
query = """
@@ -245,10 +273,11 @@ class DatabaseClient:
query += " ORDER BY timestamp DESC LIMIT 1"
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
result = cursor.fetchone()
return dict(result) if result else None
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
result = cursor.fetchone()
return dict(result) if result else None
def store_message(
self,
@@ -276,33 +305,32 @@ class DatabaseClient:
Returns:
The ID of the inserted record
"""
self.connect()
prompt_hash = self.hash_prompt(prompt)
with self.conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO llm_messages
(prompt, prompt_hash, response, company_name, analysis_type, model, metadata, token_usage, is_cached)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
(
prompt,
prompt_hash,
response,
company_name,
analysis_type,
model,
json.dumps(metadata) if metadata else None,
json.dumps(token_usage) if token_usage else None,
is_cached,
),
)
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""
INSERT INTO llm_messages
(prompt, prompt_hash, response, company_name, analysis_type, model, metadata, token_usage, is_cached)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
(
prompt,
prompt_hash,
response,
company_name,
analysis_type,
model,
json.dumps(metadata) if metadata else None,
json.dumps(token_usage) if token_usage else None,
is_cached,
),
)
message_id = cursor.fetchone()[0]
self.conn.commit()
message_id = cursor.fetchone()[0]
conn.commit()
return message_id
@@ -324,8 +352,6 @@ class DatabaseClient:
Returns:
List of message dictionaries
"""
self.connect()
query = "SELECT * FROM llm_messages WHERE 1=1"
params = []
@@ -340,9 +366,10 @@ class DatabaseClient:
query += " ORDER BY timestamp DESC LIMIT %s OFFSET %s"
params.extend([limit, offset])
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def get_analytics(self, days: int = 30) -> Dict:
"""Get analytics on message usage.
@@ -353,53 +380,52 @@ class DatabaseClient:
Returns:
Dictionary with analytics data
"""
self.connect()
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
# Total messages
cursor.execute(
"""
SELECT COUNT(*) as total_messages
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
""",
(days,),
)
total = cursor.fetchone()["total_messages"]
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
# Total messages
cursor.execute(
"""
SELECT COUNT(*) as total_messages
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
""",
(days,),
)
total = cursor.fetchone()["total_messages"]
# Messages by company
cursor.execute(
"""
SELECT company_name, COUNT(*) as count
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
GROUP BY company_name
ORDER BY count DESC
LIMIT 10
""",
(days,),
)
by_company = cursor.fetchall()
# Messages by company
cursor.execute(
"""
SELECT company_name, COUNT(*) as count
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
GROUP BY company_name
ORDER BY count DESC
LIMIT 10
""",
(days,),
)
by_company = cursor.fetchall()
# Messages by type
cursor.execute(
"""
SELECT analysis_type, COUNT(*) as count
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
GROUP BY analysis_type
ORDER BY count DESC
""",
(days,),
)
by_type = cursor.fetchall()
# Messages by type
cursor.execute(
"""
SELECT analysis_type, COUNT(*) as count
FROM llm_messages
WHERE timestamp >= NOW() - INTERVAL '%s days'
GROUP BY analysis_type
ORDER BY count DESC
""",
(days,),
)
by_type = cursor.fetchall()
return {
"total_messages": total,
"by_company": [dict(row) for row in by_company],
"by_type": [dict(row) for row in by_type],
"period_days": days,
}
return {
"total_messages": total,
"by_company": [dict(row) for row in by_company],
"by_type": [dict(row) for row in by_type],
"period_days": days,
}
# Patent Cache Methods
@@ -650,25 +676,23 @@ class DatabaseClient:
Returns:
Created user dict or None if email exists
"""
self.connect()
password_hash = self.hash_password(password)
try:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
INSERT INTO users (email, password_hash, role)
VALUES (%s, %s, %s)
RETURNING id, email, role, created_at
""",
(email, password_hash, role),
)
user = cursor.fetchone()
self.conn.commit()
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
INSERT INTO users (email, password_hash, role)
VALUES (%s, %s, %s)
RETURNING id, email, role, created_at
""",
(email, password_hash, role),
)
user = cursor.fetchone()
conn.commit()
return dict(user) if user else None
except psycopg2.errors.UniqueViolation:
self.conn.rollback()
return None
def authenticate_user(self, email: str, password: str) -> Optional[Dict]:
@@ -681,23 +705,22 @@ class DatabaseClient:
Returns:
User dict if authenticated, None otherwise
"""
self.connect()
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT * FROM users WHERE email = %s",
(email,),
)
user = cursor.fetchone()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT * FROM users WHERE email = %s",
(email,),
)
user = cursor.fetchone()
if user and self.verify_password(password, user["password_hash"]):
return {
"id": user["id"],
"email": user["email"],
"role": user["role"],
"created_at": user["created_at"],
}
return None
if user and self.verify_password(password, user["password_hash"]):
return {
"id": user["id"],
"email": user["email"],
"role": user["role"],
"created_at": user["created_at"],
}
return None
def get_user_by_id(self, user_id: int) -> Optional[Dict]:
"""Get a user by ID.
@@ -708,15 +731,14 @@ class DatabaseClient:
Returns:
User dict or None
"""
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT id, email, role, created_at FROM users WHERE id = %s",
(user_id,),
)
user = cursor.fetchone()
return dict(user) if user else None
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT id, email, role, created_at FROM users WHERE id = %s",
(user_id,),
)
user = cursor.fetchone()
return dict(user) if user else None
def get_user_by_email(self, email: str) -> Optional[Dict]:
"""Get a user by email.
@@ -727,15 +749,14 @@ class DatabaseClient:
Returns:
User dict or None
"""
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT id, email, role, created_at FROM users WHERE email = %s",
(email,),
)
user = cursor.fetchone()
return dict(user) if user else None
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT id, email, role, created_at FROM users WHERE email = %s",
(email,),
)
user = cursor.fetchone()
return dict(user) if user else None
def get_all_users(self, limit: int = 100, offset: int = 0) -> List[Dict]:
"""Get all users (admin only).
@@ -747,19 +768,18 @@ class DatabaseClient:
Returns:
List of user dicts
"""
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
SELECT id, email, role, created_at
FROM users
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""",
(limit, offset),
)
return [dict(row) for row in cursor.fetchall()]
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
SELECT id, email, role, created_at
FROM users
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""",
(limit, offset),
)
return [dict(row) for row in cursor.fetchall()]
def update_user_role(self, user_id: int, role: str) -> Optional[Dict]:
"""Update a user's role (admin only).
@@ -771,20 +791,19 @@ class DatabaseClient:
Returns:
Updated user dict or None
"""
self.connect()
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
UPDATE users
SET role = %s, updated_at = CURRENT_TIMESTAMP
WHERE id = %s
RETURNING id, email, role, created_at
""",
(role, user_id),
)
user = cursor.fetchone()
self.conn.commit()
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"""
UPDATE users
SET role = %s, updated_at = CURRENT_TIMESTAMP
WHERE id = %s
RETURNING id, email, role, created_at
""",
(role, user_id),
)
user = cursor.fetchone()
conn.commit()
return dict(user) if user else None
def delete_user(self, user_id: int) -> bool:
@@ -796,12 +815,11 @@ class DatabaseClient:
Returns:
True if deleted
"""
self.connect()
with self.conn.cursor() as cursor:
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
deleted = cursor.rowcount > 0
self.conn.commit()
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
deleted = cursor.rowcount > 0
conn.commit()
return deleted
def get_user_count(self) -> int:
@@ -810,8 +828,85 @@ class DatabaseClient:
Returns:
Number of users
"""
self.connect()
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM users")
return cursor.fetchone()[0]
with self.conn.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM users")
return cursor.fetchone()[0]
# Tracked Companies Methods
def add_tracked_company(self, company_name: str) -> Optional[Dict]:
"""Add a company to the tracking list."""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
try:
cursor.execute(
"INSERT INTO tracked_companies (company_name) VALUES (%s) RETURNING *",
(company_name,),
)
row = cursor.fetchone()
conn.commit()
return dict(row) if row else None
except Exception:
conn.rollback()
return None
def remove_tracked_company(self, company_name: str) -> bool:
"""Remove a company from the tracking list."""
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"DELETE FROM tracked_companies WHERE LOWER(company_name) = LOWER(%s)",
(company_name,),
)
conn.commit()
return cursor.rowcount > 0
def list_tracked_companies(self) -> List[Dict]:
"""List all tracked companies."""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute("SELECT * FROM tracked_companies ORDER BY company_name")
return [dict(row) for row in cursor.fetchall()]
def update_tracked_company(
self, company_name: str, patent_count: int
) -> None:
"""Update the last analysis stats for a tracked company."""
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""UPDATE tracked_companies
SET last_patent_count = %s, last_analysis_at = CURRENT_TIMESTAMP
WHERE LOWER(company_name) = LOWER(%s)""",
(patent_count, company_name),
)
conn.commit()
def store_alert(
self,
company_name: str,
alert_type: str,
message: str,
old_value: float | None = None,
new_value: float | None = None,
) -> None:
"""Record an alert for a significant change."""
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""INSERT INTO alerts (company_name, alert_type, message, old_value, new_value)
VALUES (%s, %s, %s, %s, %s)""",
(company_name, alert_type, message, old_value, new_value),
)
conn.commit()
def list_alerts(self, limit: int = 50) -> List[Dict]:
"""List recent alerts."""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT * FROM alerts ORDER BY created_at DESC LIMIT %s",
(limit,),
)
return [dict(row) for row in cursor.fetchall()]
+109
View File
@@ -0,0 +1,109 @@
"""Scheduled patent analysis for tracked companies.
Uses APScheduler to periodically re-analyze tracked companies and
detect significant changes in patent counts.
"""
import logging
import os
from SPARC import config
from SPARC.analyzer import CompanyAnalyzer
from SPARC.database import DatabaseClient
logger = logging.getLogger(__name__)
# Configurable via environment variable (in hours, default 24)
SCHEDULE_INTERVAL_HOURS = int(os.getenv("SCHEDULE_INTERVAL_HOURS", "24"))
# Patent count change threshold (percentage) to trigger an alert
CHANGE_THRESHOLD_PERCENT = int(os.getenv("CHANGE_THRESHOLD_PERCENT", "20"))
def run_scheduled_analysis() -> None:
"""Re-analyze all tracked companies and check for significant changes."""
db = DatabaseClient(config.database_url)
db.connect()
db.initialize_schema()
tracked = db.list_tracked_companies()
if not tracked:
logger.info("No tracked companies configured; skipping scheduled analysis")
return
logger.info("Running scheduled analysis for %d tracked companies", len(tracked))
analyzer = CompanyAnalyzer(db_client=db)
for company_row in tracked:
name = company_row["company_name"]
old_count = company_row.get("last_patent_count", 0) or 0
try:
result = analyzer._analyze_company_safe(name)
if result.success:
new_count = result.patent_count
# Update tracking record
db.update_tracked_company(name, new_count)
# Check for significant change
if old_count > 0:
delta_pct = abs(new_count - old_count) / old_count * 100
if delta_pct >= CHANGE_THRESHOLD_PERCENT:
direction = "increased" if new_count > old_count else "decreased"
message = (
f"Patent count for {name} {direction} by {delta_pct:.0f}% "
f"({old_count} -> {new_count})"
)
logger.warning("ALERT: %s", message)
db.store_alert(
company_name=name,
alert_type="patent_count_change",
message=message,
old_value=old_count,
new_value=new_count,
)
elif new_count > 0:
# First analysis -- record baseline
logger.info("Baseline for %s: %d patents", name, new_count)
else:
logger.warning("Scheduled analysis failed for %s: %s", name, result.error)
except Exception as e:
logger.error("Error analyzing tracked company %s: %s", name, e)
db.close()
logger.info("Scheduled analysis complete")
def start_scheduler() -> None:
"""Start the APScheduler background scheduler.
Safe to call at application startup. If apscheduler is not installed,
the function logs a warning and returns without starting anything.
"""
try:
from apscheduler.schedulers.background import BackgroundScheduler
except ImportError:
logger.warning(
"apscheduler not installed; scheduled analysis disabled. "
"Install with: pip install apscheduler"
)
return
scheduler = BackgroundScheduler()
scheduler.add_job(
run_scheduled_analysis,
"interval",
hours=SCHEDULE_INTERVAL_HOURS,
id="scheduled_patent_analysis",
replace_existing=True,
)
scheduler.start()
logger.info(
"Scheduled patent analysis started (every %d hours, threshold %d%%)",
SCHEDULE_INTERVAL_HOURS,
CHANGE_THRESHOLD_PERCENT,
)
+8 -5
View File
@@ -1,12 +1,15 @@
import os
import serpapi
from SPARC import config
import re
import pdfplumber # pip install pdfplumber
import requests
from datetime import datetime, timedelta
from typing import Dict
from SPARC.types import Patents, Patent
import pdfplumber # pip install pdfplumber
import requests
import serpapi
from SPARC import config
from SPARC.types import Patent, Patents
class SERP:
def query(company: str, days_back: int = None) -> Patents:
+1
View File
@@ -15,3 +15,4 @@ pandas
bcrypt
PyJWT
slowapi
apscheduler
+8
View File
@@ -0,0 +1,8 @@
[lint]
select = ["E", "F", "I"]
ignore = [
"E501", # line too long (handled by formatter)
]
[lint.per-file-ignores]
"tests/*" = ["E402", "F841"] # allow import not at top of file, unused vars (mocks) in tests
+5 -3
View File
@@ -1,9 +1,11 @@
"""Tests for the high-level company analyzer orchestration."""
from unittest.mock import MagicMock, Mock
import pytest
from unittest.mock import Mock, patch, call, MagicMock
from SPARC.analyzer import CompanyAnalyzer
from SPARC.types import Patent, Patents, CompanyAnalysisResult, BatchAnalysisResult
from SPARC.types import BatchAnalysisResult, Patent, Patents
@pytest.fixture(autouse=True)
@@ -24,7 +26,7 @@ class TestCompanyAnalyzer:
"""Test analyzer initialization with API key."""
mock_llm = mocker.patch("SPARC.analyzer.LLMAnalyzer")
analyzer = CompanyAnalyzer(openrouter_api_key="test-key")
_analyzer = CompanyAnalyzer(openrouter_api_key="test-key") # noqa: F841
mock_llm.assert_called_once_with(api_key="test-key")
+4 -3
View File
@@ -1,12 +1,13 @@
"""Tests for FastAPI web service endpoints."""
import pytest
from datetime import datetime
from unittest.mock import Mock, patch
from unittest.mock import Mock
import pytest
from fastapi.testclient import TestClient
from SPARC.api import app
from SPARC.types import CompanyAnalysisResult, BatchAnalysisResult
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult
@pytest.fixture
+3 -1
View File
@@ -1,7 +1,9 @@
"""Tests for LLM analysis functionality."""
from unittest.mock import Mock
import pytest
from unittest.mock import Mock, MagicMock, patch
from SPARC.llm import LLMAnalyzer
+2 -3
View File
@@ -1,9 +1,8 @@
"""Tests for SERP API patent retrieval and parsing functionality."""
import os
import pytest
from unittest.mock import patch, Mock
from datetime import datetime, timedelta
from unittest.mock import Mock
from SPARC.serp_api import SERP
from SPARC.types import Patent