feat: implement scheduled/recurring analysis with change alerting

Add APScheduler-based background task that periodically re-analyzes
tracked companies and alerts on significant patent count changes.

- Add tracked_companies and alerts tables to database schema
- Add SPARC/scheduler.py with configurable interval and threshold
- Add admin endpoints: GET/POST/DELETE /admin/tracked, GET /admin/alerts
- Scheduler starts at app startup; interval via SCHEDULE_INTERVAL_HOURS
- Change threshold configurable via CHANGE_THRESHOLD_PERCENT env var
- apscheduler is optional; graceful fallback if not installed

Closes leeworks-agents/SPARC#22

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
agent-company
2026-03-26 10:30:43 +00:00
parent 55c131cb32
commit f33447eef8
4 changed files with 274 additions and 0 deletions
+107
View File
@@ -192,6 +192,35 @@ class DatabaseClient:
ON jobs(status)
""")
# Create tracked companies table for scheduled analysis
cursor.execute("""
CREATE TABLE IF NOT EXISTS tracked_companies (
id SERIAL PRIMARY KEY,
company_name VARCHAR(255) UNIQUE NOT NULL,
last_patent_count INTEGER DEFAULT 0,
last_analysis_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Create alerts table for significant changes
cursor.execute("""
CREATE TABLE IF NOT EXISTS alerts (
id SERIAL PRIMARY KEY,
company_name VARCHAR(255) NOT NULL,
alert_type VARCHAR(50) NOT NULL,
message TEXT NOT NULL,
old_value NUMERIC,
new_value NUMERIC,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alerts_company
ON alerts(company_name)
""")
self.conn.commit()
@staticmethod
@@ -803,3 +832,81 @@ class DatabaseClient:
with conn.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM users")
return cursor.fetchone()[0]
# Tracked Companies Methods
def add_tracked_company(self, company_name: str) -> Optional[Dict]:
"""Add a company to the tracking list."""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
try:
cursor.execute(
"INSERT INTO tracked_companies (company_name) VALUES (%s) RETURNING *",
(company_name,),
)
row = cursor.fetchone()
conn.commit()
return dict(row) if row else None
except Exception:
conn.rollback()
return None
def remove_tracked_company(self, company_name: str) -> bool:
"""Remove a company from the tracking list."""
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"DELETE FROM tracked_companies WHERE LOWER(company_name) = LOWER(%s)",
(company_name,),
)
conn.commit()
return cursor.rowcount > 0
def list_tracked_companies(self) -> List[Dict]:
"""List all tracked companies."""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute("SELECT * FROM tracked_companies ORDER BY company_name")
return [dict(row) for row in cursor.fetchall()]
def update_tracked_company(
self, company_name: str, patent_count: int
) -> None:
"""Update the last analysis stats for a tracked company."""
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""UPDATE tracked_companies
SET last_patent_count = %s, last_analysis_at = CURRENT_TIMESTAMP
WHERE LOWER(company_name) = LOWER(%s)""",
(patent_count, company_name),
)
conn.commit()
def store_alert(
self,
company_name: str,
alert_type: str,
message: str,
old_value: float | None = None,
new_value: float | None = None,
) -> None:
"""Record an alert for a significant change."""
with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute(
"""INSERT INTO alerts (company_name, alert_type, message, old_value, new_value)
VALUES (%s, %s, %s, %s, %s)""",
(company_name, alert_type, message, old_value, new_value),
)
conn.commit()
def list_alerts(self, limit: int = 50) -> List[Dict]:
"""List recent alerts."""
with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(
"SELECT * FROM alerts ORDER BY created_at DESC LIMIT %s",
(limit,),
)
return [dict(row) for row in cursor.fetchall()]