f33447eef8
Add APScheduler-based background task that periodically re-analyzes tracked companies and alerts on significant patent count changes. - Add tracked_companies and alerts tables to database schema - Add SPARC/scheduler.py with configurable interval and threshold - Add admin endpoints: GET/POST/DELETE /admin/tracked, GET /admin/alerts - Scheduler starts at app startup; interval via SCHEDULE_INTERVAL_HOURS - Change threshold configurable via CHANGE_THRESHOLD_PERCENT env var - apscheduler is optional; graceful fallback if not installed Closes leeworks-agents/SPARC#22 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
110 lines
3.7 KiB
Python
110 lines
3.7 KiB
Python
"""Scheduled patent analysis for tracked companies.
|
|
|
|
Uses APScheduler to periodically re-analyze tracked companies and
|
|
detect significant changes in patent counts.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
|
|
from SPARC import config
|
|
from SPARC.analyzer import CompanyAnalyzer
|
|
from SPARC.database import DatabaseClient
|
|
|
|
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Hours between scheduled runs. Read once at import time from the
# SCHEDULE_INTERVAL_HOURS environment variable; falls back to 24.
SCHEDULE_INTERVAL_HOURS = int(os.environ.get("SCHEDULE_INTERVAL_HOURS", "24"))

# Percentage change in patent count that triggers an alert. Read once at
# import time from CHANGE_THRESHOLD_PERCENT; falls back to 20.
CHANGE_THRESHOLD_PERCENT = int(os.environ.get("CHANGE_THRESHOLD_PERCENT", "20"))
|
|
|
|
|
|
def run_scheduled_analysis() -> None:
    """Re-analyze all tracked companies and check for significant changes.

    Opens a fresh database connection, re-runs the analyzer for every
    tracked company, stores the new patent count, and records an alert
    whenever the count moved by at least ``CHANGE_THRESHOLD_PERCENT``
    percent relative to the previously stored count.

    The connection is closed on every exit path, including the early
    return taken when nothing is tracked and any exception raised while
    preparing the run. A failure while analyzing one company is logged and
    does not prevent the remaining companies from being processed.
    """
    db = DatabaseClient(config.database_url)
    db.connect()
    try:
        db.initialize_schema()

        tracked = db.list_tracked_companies()
        if not tracked:
            logger.info("No tracked companies configured; skipping scheduled analysis")
            return

        logger.info("Running scheduled analysis for %d tracked companies", len(tracked))

        analyzer = CompanyAnalyzer(db_client=db)
        for company_row in tracked:
            _process_tracked_company(db, analyzer, company_row)
    finally:
        # Release the connection even on the early return or an unexpected
        # error; previously those paths skipped close() entirely.
        db.close()

    logger.info("Scheduled analysis complete")


def _process_tracked_company(db: "DatabaseClient", analyzer: "CompanyAnalyzer", company_row) -> None:
    """Re-analyze one tracked company and store an alert on a large change.

    Args:
        db: Connected database client used to persist the new count and
            any resulting alert.
        analyzer: Analyzer used to re-run the company analysis.
        company_row: Mapping with a ``company_name`` key and an optional
            ``last_patent_count`` key.
    """
    name = company_row["company_name"]
    # A missing or empty stored count means there is no baseline yet.
    old_count = company_row.get("last_patent_count", 0) or 0

    try:
        result = analyzer._analyze_company_safe(name)

        if not result.success:
            logger.warning("Scheduled analysis failed for %s: %s", name, result.error)
            return

        new_count = result.patent_count

        # Update tracking record before evaluating the change.
        db.update_tracked_company(name, new_count)

        if old_count <= 0:
            # No usable baseline: a percentage change cannot be computed.
            if new_count > 0:
                logger.info("Baseline for %s: %d patents", name, new_count)
            return

        delta_pct = abs(new_count - old_count) / old_count * 100
        if delta_pct < CHANGE_THRESHOLD_PERCENT:
            return

        direction = "increased" if new_count > old_count else "decreased"
        message = (
            f"Patent count for {name} {direction} by {delta_pct:.0f}% "
            f"({old_count} -> {new_count})"
        )
        logger.warning("ALERT: %s", message)
        db.store_alert(
            company_name=name,
            alert_type="patent_count_change",
            message=message,
            old_value=old_count,
            new_value=new_count,
        )
    except Exception as e:
        # Boundary handler: one bad company must not abort the whole run.
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("Error analyzing tracked company %s: %s", name, e)
|
|
|
|
|
|
# Handle to the scheduler started by start_scheduler(). Kept so repeated calls
# can detect an already-running scheduler instead of starting another one.
_scheduler = None


def start_scheduler() -> None:
    """Start the APScheduler background scheduler.

    Safe to call at application startup. If apscheduler is not installed,
    the function logs a warning and returns without starting anything.

    The call is idempotent: if a scheduler started by an earlier call is
    still running, no second scheduler is created. ``replace_existing=True``
    only de-duplicates jobs within a single scheduler, so without this
    guard each extra call would run the analysis an additional time per
    interval.
    """
    global _scheduler

    try:
        from apscheduler.schedulers.background import BackgroundScheduler
    except ImportError:
        logger.warning(
            "apscheduler not installed; scheduled analysis disabled. "
            "Install with: pip install apscheduler"
        )
        return

    if _scheduler is not None and _scheduler.running:
        logger.info("Scheduled patent analysis already running; not starting again")
        return

    scheduler = BackgroundScheduler()
    scheduler.add_job(
        run_scheduled_analysis,
        "interval",
        hours=SCHEDULE_INTERVAL_HOURS,
        id="scheduled_patent_analysis",
        replace_existing=True,
    )
    scheduler.start()
    # Publish the handle only after a successful start, so a failed start
    # does not block a later retry.
    _scheduler = scheduler

    logger.info(
        "Scheduled patent analysis started (every %d hours, threshold %d%%)",
        SCHEDULE_INTERVAL_HOURS,
        CHANGE_THRESHOLD_PERCENT,
    )
|