Compare commits


3 Commits

Author SHA1 Message Date
agent-company 349bb4d073 refactor(db): use shared pooled DatabaseClient singleton instead of per-call instances
- Replace get_db_client() creating new DatabaseClient on every call with a
  module-level singleton initialized once at startup via init_db_client()
- Add init_db_client() and close_db_client() lifecycle functions called
  from FastAPI lifespan handler
- Migrate all DatabaseClient methods from legacy self.connect()/self.conn
  to pooled self.get_conn() context manager for thread-safe connection reuse
- Pool is properly torn down on application shutdown

Closes leeworks-agents/SPARC#7

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 04:15:03 +00:00
AI-Manager 6105ba7793 Merge pull request 'chore: add ROADMAP.md for SPARC application development' (#3) from chore/add-roadmap into main 2026-03-26 02:47:54 +00:00
agent-company e8cdc089fa chore: add ROADMAP.md for SPARC application development
- Document current project state and architecture
- Identify P1 priorities: security hardening, error handling, test coverage
- Identify P2 priorities: structured logging, configurable LLM, frontend polish, CI tests
- Identify P3 priorities: export, comparison, scheduled analysis, notifications
- Reference Talos repo for infrastructure/deployment concerns

Closes leeworks-agents/SPARC#2

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 00:06:56 +00:00
4 changed files with 309 additions and 171 deletions
+122
@@ -0,0 +1,122 @@
# SPARC Roadmap
Semiconductor Patent & Analytics Report Core -- development priorities.
## Current State
SPARC is a patent analysis platform with a working end-to-end pipeline:
Python/FastAPI backend, React/TypeScript frontend, PostgreSQL for persistence
and caching, Docker Compose for local development, and Gitea Actions CI/CD for
image builds. Core features (patent retrieval via SerpAPI, PDF parsing, LLM
analysis via OpenRouter/Claude, batch processing, JWT authentication, analytics
dashboard) are all implemented and functional.

---
## P1 -- High Priority
These items address correctness, security, and reliability gaps that should be
resolved before broader production use.
### Security hardening
- **Rotate default JWT secret.** `auth.py` ships a fallback
`sparc-secret-key-change-in-production` that will be used if `JWT_SECRET` is
unset. Add a startup check that refuses to start with the default secret in
non-development environments.
- **CORS allow-origins are hardcoded.** `api.py` only permits
`localhost:3000` and `localhost:5173`. Make the allowed origins configurable
via environment variable so the dashboard works when deployed behind a real
domain.
- **Database credentials in docker-compose.yml.** The compose file embeds
`postgres:postgres` in plain text. Reference a `.env` file or Docker secrets
instead.
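A minimal sketch of the first two items, assuming two hypothetical environment variables, `APP_ENV` and `CORS_ALLOW_ORIGINS`. Only `JWT_SECRET`, the fallback string, and the two localhost origins come from the current code, and the `http://` scheme on those origins is an assumption. The idea is to resolve both values once at startup and fail fast instead of silently running with the shipped default:

```python
from __future__ import annotations

import os
from typing import Mapping

# Fallback that auth.py currently uses when JWT_SECRET is unset.
DEFAULT_JWT_SECRET = "sparc-secret-key-change-in-production"
# Origins api.py hardcodes today (the http:// scheme is assumed here).
DEFAULT_CORS_ORIGINS = ["http://localhost:3000", "http://localhost:5173"]


def load_jwt_secret(env: Mapping[str, str] | None = None) -> str:
    """Return JWT_SECRET, refusing the shipped default outside development.

    APP_ENV is a hypothetical variable name for the deployment environment.
    An empty JWT_SECRET is treated the same as an unset one.
    """
    env = os.environ if env is None else env
    secret = env.get("JWT_SECRET") or DEFAULT_JWT_SECRET
    if env.get("APP_ENV", "development") != "development" and secret == DEFAULT_JWT_SECRET:
        raise RuntimeError("JWT_SECRET must be set to a non-default value outside development")
    return secret


def load_cors_origins(env: Mapping[str, str] | None = None) -> list[str]:
    """Parse the hypothetical comma-separated CORS_ALLOW_ORIGINS variable."""
    env = os.environ if env is None else env
    origins = [o.strip() for o in env.get("CORS_ALLOW_ORIGINS", "").split(",") if o.strip()]
    # Fall back to the current hardcoded origins when nothing is configured.
    return origins or list(DEFAULT_CORS_ORIGINS)
```

The returned origin list could then be passed as `allow_origins` to FastAPI's `CORSMiddleware` in `api.py`.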
### Error handling and resilience
- **`get_db_client()` in `auth.py` creates a new `DatabaseClient` on every
call.** This bypasses the connection pool and can exhaust database
connections under load. Refactor to share a single pooled client.
- **`_jobs` dict is in-memory only.** Job state is lost on API restart. Persist
job status in PostgreSQL or Redis so async batch results survive restarts.
- **No rate limiting on auth endpoints.** `/auth/login` and `/auth/register`
are unprotected against brute-force or abuse. Add rate limiting middleware.
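For the rate-limiting item, one possible shape is a sliding-window counter keyed by client address. This is a sketch only: `SlidingWindowLimiter` is a hypothetical name, and because its state lives in process memory it has the same weakness as `_jobs` (lost on restart, not shared across replicas), so a Redis-backed or library-provided limiter would be the sturdier choice for production:

```python
from __future__ import annotations

import threading
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most `limit` attempts per `window` seconds for each key."""

    def __init__(self, limit: int, window: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window = window
        self.clock = clock  # injectable so tests need not sleep
        self._hits: defaultdict[str, deque[float]] = defaultdict(deque)
        self._lock = threading.Lock()  # sync endpoints may call this from several threads

    def allow(self, key: str) -> bool:
        """Record an attempt for `key` and report whether it is within the limit."""
        with self._lock:
            now = self.clock()
            hits = self._hits[key]
            # Forget attempts that have aged out of the window.
            while hits and now - hits[0] >= self.window:
                hits.popleft()
            if len(hits) >= self.limit:
                return False  # rejected attempts are not recorded
            hits.append(now)
            return True
```

An auth route would call something like `limiter.allow(request.client.host)` and respond with HTTP 429 when it returns `False`.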
### Test coverage for auth and admin
- The existing API tests (`tests/test_api.py`) bypass authentication entirely.
Add tests that exercise the JWT flow: registration, login, protected-route
access, token refresh, and admin-only endpoints.
---
## P2 -- Medium Priority
Improvements to usability, performance, and developer experience.
### Backend
- **Add structured logging.** Replace `print()` calls throughout `analyzer.py`,
`serp_api.py`, and `llm.py` with Python `logging` so log levels and
formatting are consistent.
- **Make LLM model configurable.** `llm.py` hardcodes
`anthropic/claude-3.5-sonnet`. Accept a `MODEL` environment variable to allow
switching models without code changes.
- **SERP cache TTL is hardcoded to 24 hours.** Expose `SERP_CACHE_TTL_HOURS`
as an environment variable in `config.py`.
- **Patent PDF storage.** PDFs are saved to a local `patents/` directory. For
containerized deployments, consider object storage (S3/MinIO) or at minimum
document the volume mount requirement more prominently.
- **`analyze_single_patent` assumes local file path.** The method constructs
`patents/{patent_id}.pdf` and reads from disk, but does not download the PDF
first. Either integrate the download step or document the prerequisite.
- **`Patent.patent_id` typed as `int` in `types.py` but used as `str`
everywhere.** Fix the type annotation to `str`.
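The two hardcoded values above could be read from the environment in one place. A sketch assuming a hypothetical `RuntimeSettings` dataclass; the variable names `MODEL` and `SERP_CACHE_TTL_HOURS` come from the items above, and the defaults reproduce today's hardcoded values:

```python
from __future__ import annotations

import os
from dataclasses import dataclass
from datetime import timedelta
from typing import Mapping


@dataclass(frozen=True)
class RuntimeSettings:
    """Hypothetical settings holder for values llm.py and config.py hardcode today."""

    model: str = "anthropic/claude-3.5-sonnet"  # current value in llm.py
    serp_cache_ttl_hours: int = 24  # current SERP cache TTL

    @classmethod
    def from_env(cls, env: Mapping[str, str] | None = None) -> RuntimeSettings:
        env = os.environ if env is None else env
        ttl_raw = env.get("SERP_CACHE_TTL_HOURS")
        # int() raises ValueError on non-numeric input, which also fails fast.
        ttl = int(ttl_raw) if ttl_raw else cls.serp_cache_ttl_hours
        if ttl <= 0:
            raise ValueError("SERP_CACHE_TTL_HOURS must be a positive integer")
        return cls(model=env.get("MODEL") or cls.model, serp_cache_ttl_hours=ttl)

    @property
    def serp_cache_ttl(self) -> timedelta:
        return timedelta(hours=self.serp_cache_ttl_hours)
```

Validating at load time means a malformed value stops the service at startup rather than surfacing on the first cache lookup.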
### Frontend
- **No loading/error states on several pages.** The Batch and Analytics pages
would benefit from skeleton loaders and user-friendly error messages.
- **No dark mode.** Tailwind is configured but no dark variant is applied.
- **Missing `package-lock.json` or `pnpm-lock.yaml`.** The frontend has no
lockfile committed, leading to non-reproducible builds.
### CI/CD
- **No test stage in the Gitea Actions workflow.** `build.yaml` builds and
pushes images but never runs `pytest`. Add a test job that gates the build.
- **No linting or type checking.** Add `ruff` (Python) and `tsc --noEmit`
(TypeScript) to CI.
---
## P3 -- Nice to Have
Lower-urgency enhancements and future features.
- **Export analysis reports.** Allow users to download analysis results as PDF
or CSV from the dashboard.
- **Comparison view.** Side-by-side comparison of two companies' patent
portfolios.
- **Scheduled/recurring analysis.** Periodically re-analyze tracked companies
and alert on significant changes.
- **Webhook/notification support.** Send alerts (Slack, Discord, email) when
batch jobs complete or when a company's innovation score changes
significantly.
- **Multi-model support.** Let users choose between LLM providers per analysis
(e.g., GPT-4o, Gemini, Claude) and compare outputs.
- **Patent trend charts.** Visualize patent filing frequency and technology
category distribution over time in the Analytics page.
- **API pagination.** The `/analyze/batch` and `/jobs` endpoints could benefit
from cursor-based pagination for large result sets.
- **OpenAPI client generation.** Auto-generate the TypeScript API client from
the FastAPI OpenAPI spec to keep frontend types in sync.
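For the pagination item, cursor-based (keyset) pagination encodes the sort key of the last row returned and filters on it for the next page. A sketch over an in-memory list, assuming rows carry hypothetical `id` and `created_at` fields, with `created_at` as UTC ISO 8601 strings in a single format so that they sort correctly as text:

```python
from __future__ import annotations

import base64
import json
from typing import Any


def encode_cursor(created_at: str, row_id: int) -> str:
    """Pack the sort key of the last row into an opaque, URL-safe token."""
    raw = json.dumps([created_at, row_id]).encode()
    return base64.urlsafe_b64encode(raw).decode()


def decode_cursor(cursor: str) -> tuple[str, int]:
    created_at, row_id = json.loads(base64.urlsafe_b64decode(cursor.encode()))
    return created_at, int(row_id)


def paginate(
    rows: list[dict[str, Any]], limit: int, cursor: str | None = None
) -> tuple[list[dict[str, Any]], str | None]:
    """Keyset pagination over rows ordered by (created_at DESC, id DESC).

    In PostgreSQL the same filter could be a row comparison such as
    WHERE (created_at, id) < (%s, %s) ORDER BY created_at DESC, id DESC LIMIT %s.
    """
    if limit < 1:
        raise ValueError("limit must be at least 1")
    ordered = sorted(rows, key=lambda r: (r["created_at"], r["id"]), reverse=True)
    if cursor is not None:
        after = decode_cursor(cursor)
        ordered = [r for r in ordered if (r["created_at"], r["id"]) < after]
    page = ordered[:limit]
    # Only hand out a cursor when more rows remain after this page.
    next_cursor = None
    if len(ordered) > limit:
        last = page[-1]
        next_cursor = encode_cursor(last["created_at"], last["id"])
    return page, next_cursor
```

Unlike `LIMIT ... OFFSET`, the keyset filter keeps page boundaries stable when new rows are inserted between requests, and with a matching index the database does not have to walk past the skipped rows.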
---
## Infrastructure and Deployment
Kubernetes manifests, Helm charts, and cluster-level concerns (MetalLB,
storage, FluxCD sync) are tracked in the
[Talos](https://10.0.1.10/leeworks-agents/Talos) repository. File
infrastructure-related issues there, not here.
+6 -2
@@ -16,11 +16,13 @@ from SPARC.analyzer import CompanyAnalyzer
 from SPARC.auth import (
     TokenResponse,
     UserResponse,
+    close_db_client,
     create_tokens,
     decode_token,
     get_current_admin,
     get_current_user,
     get_db_client,
+    init_db_client,
 )
 from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult
@@ -148,12 +150,14 @@ _analyzer: CompanyAnalyzer | None = None
 @asynccontextmanager
 async def lifespan(app: FastAPI):
-    """Initialize resources on startup."""
+    """Initialize resources on startup, clean up on shutdown."""
     global _analyzer
+    init_db_client()
     _analyzer = CompanyAnalyzer()
     yield
-    # Cleanup if needed
+    # Cleanup
     _analyzer = None
+    close_db_client()


 app = FastAPI(
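One refinement worth considering for the lifespan handler above: if `CompanyAnalyzer()` raises after `init_db_client()` has run, the code after `yield` never executes and the pool is not closed. Wrapping startup and `yield` in `try`/`finally` covers both paths. A self-contained sketch in which `init_db_client`, `close_db_client`, and `make_analyzer` are hypothetical stand-ins for the SPARC functions:

```python
from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager

events: list[str] = []  # records lifecycle calls so the ordering is observable


def init_db_client() -> None:  # stand-in for SPARC.auth.init_db_client
    events.append("init")


def close_db_client() -> None:  # stand-in for SPARC.auth.close_db_client
    events.append("close")


def make_analyzer(fail: bool) -> object:  # stand-in for CompanyAnalyzer()
    if fail:
        raise RuntimeError("analyzer failed to start")
    return object()


@asynccontextmanager
async def lifespan(app: object, fail_analyzer: bool = False):
    init_db_client()
    try:
        make_analyzer(fail_analyzer)
        yield
    finally:
        # Runs on normal shutdown and when anything after init_db_client() raised.
        close_db_client()


async def main() -> None:
    async with lifespan(app=None):
        events.append("serving")
    try:
        async with lifespan(app=None, fail_analyzer=True):
            events.append("unreachable")
    except RuntimeError:
        events.append("startup failed")


asyncio.run(main())
```

In both runs `close` follows `init`, including the one where startup failed before `yield` was reached.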
+29 -4
@@ -132,11 +132,36 @@ def decode_token(token: str) -> Optional[TokenPayload]:
         return None


+# Shared database client singleton, initialized at startup via init_db_client()
+_db_client: DatabaseClient | None = None
+
+
+def init_db_client() -> None:
+    """Initialize the shared database client. Call once at app startup."""
+    global _db_client
+    _db_client = DatabaseClient(config.database_url)
+    _db_client.connect()
+
+
+def close_db_client() -> None:
+    """Close the shared database client. Call at app shutdown."""
+    global _db_client
+    if _db_client:
+        _db_client.close()
+        _db_client = None
+
+
 def get_db_client() -> DatabaseClient:
-    """Get database client for auth operations."""
-    client = DatabaseClient(config.database_url)
-    client.connect()
-    return client
+    """Get the shared pooled database client for auth operations.
+
+    Returns the module-level singleton DatabaseClient. If not yet initialized
+    (e.g., during tests), creates a new instance as a fallback.
+    """
+    global _db_client
+    if _db_client is None:
+        _db_client = DatabaseClient(config.database_url)
+        _db_client.connect()
+    return _db_client


 async def get_current_user(
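The fallback branch in `get_db_client()` is a check-then-set without a lock. If it is reached from more than one thread at once (for example from synchronous dependencies, which FastAPI runs in a thread pool), each thread could create a client and one would be left unclosed. A sketch of double-checked locking, using a hypothetical `FakeClient` in place of `DatabaseClient`:

```python
from __future__ import annotations

import threading


class FakeClient:
    """Hypothetical stand-in for DatabaseClient; counts instantiations."""

    instances = 0

    def __init__(self, url: str) -> None:
        FakeClient.instances += 1  # only ever called while holding _client_lock
        self.url = url
        self.closed = False

    def connect(self) -> None:
        pass

    def close(self) -> None:
        self.closed = True


_client: FakeClient | None = None
_client_lock = threading.Lock()


def get_client(url: str = "postgresql://example") -> FakeClient:
    """Return the shared client, creating it at most once even under concurrency."""
    global _client
    client = _client
    if client is None:  # fast path skips the lock once initialized
        with _client_lock:
            if _client is None:  # re-check: another thread may have won the race
                new_client = FakeClient(url)
                new_client.connect()
                _client = new_client  # publish only a fully connected client
            client = _client
    return client


def close_client() -> None:
    global _client
    with _client_lock:
        if _client is not None:
            _client.close()
            _client = None
```

Taking the same lock in `close_client()` keeps shutdown from racing a late first call.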
+152 -165
@@ -201,8 +201,6 @@ class DatabaseClient:
         Returns:
             Cached message dict if found, None otherwise
         """
-        self.connect()
-
         prompt_hash = self.hash_prompt(prompt)

         query = """
@@ -225,10 +223,11 @@ class DatabaseClient:
         query += " ORDER BY timestamp DESC LIMIT 1"

-        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
-            cursor.execute(query, params)
-            result = cursor.fetchone()
-            return dict(result) if result else None
+        with self.get_conn() as conn:
+            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                cursor.execute(query, params)
+                result = cursor.fetchone()
+                return dict(result) if result else None

     def store_message(
         self,
@@ -256,33 +255,32 @@ class DatabaseClient:
         Returns:
             The ID of the inserted record
         """
-        self.connect()
-
         prompt_hash = self.hash_prompt(prompt)

-        with self.conn.cursor() as cursor:
-            cursor.execute(
-                """
-                INSERT INTO llm_messages
-                (prompt, prompt_hash, response, company_name, analysis_type, model, metadata, token_usage, is_cached)
-                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
-                RETURNING id
-                """,
-                (
-                    prompt,
-                    prompt_hash,
-                    response,
-                    company_name,
-                    analysis_type,
-                    model,
-                    json.dumps(metadata) if metadata else None,
-                    json.dumps(token_usage) if token_usage else None,
-                    is_cached,
-                ),
-            )
-            message_id = cursor.fetchone()[0]
-            self.conn.commit()
-            return message_id
+        with self.get_conn() as conn:
+            with conn.cursor() as cursor:
+                cursor.execute(
+                    """
+                    INSERT INTO llm_messages
+                    (prompt, prompt_hash, response, company_name, analysis_type, model, metadata, token_usage, is_cached)
+                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
+                    RETURNING id
+                    """,
+                    (
+                        prompt,
+                        prompt_hash,
+                        response,
+                        company_name,
+                        analysis_type,
+                        model,
+                        json.dumps(metadata) if metadata else None,
+                        json.dumps(token_usage) if token_usage else None,
+                        is_cached,
+                    ),
+                )
+                message_id = cursor.fetchone()[0]
+                conn.commit()
+                return message_id
@@ -304,8 +302,6 @@ class DatabaseClient:
         Returns:
             List of message dictionaries
         """
-        self.connect()
-
         query = "SELECT * FROM llm_messages WHERE 1=1"
         params = []
@@ -320,9 +316,10 @@ class DatabaseClient:
         query += " ORDER BY timestamp DESC LIMIT %s OFFSET %s"
         params.extend([limit, offset])

-        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
-            cursor.execute(query, params)
-            return [dict(row) for row in cursor.fetchall()]
+        with self.get_conn() as conn:
+            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                cursor.execute(query, params)
+                return [dict(row) for row in cursor.fetchall()]

     def get_analytics(self, days: int = 30) -> Dict:
         """Get analytics on message usage.
@@ -333,53 +330,52 @@ class DatabaseClient:
         Returns:
             Dictionary with analytics data
         """
-        self.connect()
-
-        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
-            # Total messages
-            cursor.execute(
-                """
-                SELECT COUNT(*) as total_messages
-                FROM llm_messages
-                WHERE timestamp >= NOW() - INTERVAL '%s days'
-                """,
-                (days,),
-            )
-            total = cursor.fetchone()["total_messages"]
-
-            # Messages by company
-            cursor.execute(
-                """
-                SELECT company_name, COUNT(*) as count
-                FROM llm_messages
-                WHERE timestamp >= NOW() - INTERVAL '%s days'
-                GROUP BY company_name
-                ORDER BY count DESC
-                LIMIT 10
-                """,
-                (days,),
-            )
-            by_company = cursor.fetchall()
-
-            # Messages by type
-            cursor.execute(
-                """
-                SELECT analysis_type, COUNT(*) as count
-                FROM llm_messages
-                WHERE timestamp >= NOW() - INTERVAL '%s days'
-                GROUP BY analysis_type
-                ORDER BY count DESC
-                """,
-                (days,),
-            )
-            by_type = cursor.fetchall()
-
-            return {
-                "total_messages": total,
-                "by_company": [dict(row) for row in by_company],
-                "by_type": [dict(row) for row in by_type],
-                "period_days": days,
-            }
+        with self.get_conn() as conn:
+            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                # Total messages
+                cursor.execute(
+                    """
+                    SELECT COUNT(*) as total_messages
+                    FROM llm_messages
+                    WHERE timestamp >= NOW() - INTERVAL '%s days'
+                    """,
+                    (days,),
+                )
+                total = cursor.fetchone()["total_messages"]
+
+                # Messages by company
+                cursor.execute(
+                    """
+                    SELECT company_name, COUNT(*) as count
+                    FROM llm_messages
+                    WHERE timestamp >= NOW() - INTERVAL '%s days'
+                    GROUP BY company_name
+                    ORDER BY count DESC
+                    LIMIT 10
+                    """,
+                    (days,),
+                )
+                by_company = cursor.fetchall()
+
+                # Messages by type
+                cursor.execute(
+                    """
+                    SELECT analysis_type, COUNT(*) as count
+                    FROM llm_messages
+                    WHERE timestamp >= NOW() - INTERVAL '%s days'
+                    GROUP BY analysis_type
+                    ORDER BY count DESC
+                    """,
+                    (days,),
+                )
+                by_type = cursor.fetchall()
+
+                return {
+                    "total_messages": total,
+                    "by_company": [dict(row) for row in by_company],
+                    "by_type": [dict(row) for row in by_type],
+                    "period_days": days,
+                }

     # Patent Cache Methods
@@ -505,25 +501,23 @@ class DatabaseClient:
         Returns:
             Created user dict or None if email exists
         """
-        self.connect()
-
         password_hash = self.hash_password(password)

         try:
-            with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
-                cursor.execute(
-                    """
-                    INSERT INTO users (email, password_hash, role)
-                    VALUES (%s, %s, %s)
-                    RETURNING id, email, role, created_at
-                    """,
-                    (email, password_hash, role),
-                )
-                user = cursor.fetchone()
-                self.conn.commit()
-                return dict(user) if user else None
+            with self.get_conn() as conn:
+                with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                    cursor.execute(
+                        """
+                        INSERT INTO users (email, password_hash, role)
+                        VALUES (%s, %s, %s)
+                        RETURNING id, email, role, created_at
+                        """,
+                        (email, password_hash, role),
+                    )
+                    user = cursor.fetchone()
+                    conn.commit()
+                    return dict(user) if user else None
         except psycopg2.errors.UniqueViolation:
-            self.conn.rollback()
             return None

     def authenticate_user(self, email: str, password: str) -> Optional[Dict]:
@@ -536,23 +530,22 @@ class DatabaseClient:
         Returns:
             User dict if authenticated, None otherwise
         """
-        self.connect()
-
-        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
-            cursor.execute(
-                "SELECT * FROM users WHERE email = %s",
-                (email,),
-            )
-            user = cursor.fetchone()
-
-            if user and self.verify_password(password, user["password_hash"]):
-                return {
-                    "id": user["id"],
-                    "email": user["email"],
-                    "role": user["role"],
-                    "created_at": user["created_at"],
-                }
-            return None
+        with self.get_conn() as conn:
+            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                cursor.execute(
+                    "SELECT * FROM users WHERE email = %s",
+                    (email,),
+                )
+                user = cursor.fetchone()
+
+                if user and self.verify_password(password, user["password_hash"]):
+                    return {
+                        "id": user["id"],
+                        "email": user["email"],
+                        "role": user["role"],
+                        "created_at": user["created_at"],
+                    }
+                return None

     def get_user_by_id(self, user_id: int) -> Optional[Dict]:
         """Get a user by ID.
@@ -563,15 +556,14 @@ class DatabaseClient:
         Returns:
             User dict or None
         """
-        self.connect()
-
-        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
-            cursor.execute(
-                "SELECT id, email, role, created_at FROM users WHERE id = %s",
-                (user_id,),
-            )
-            user = cursor.fetchone()
-            return dict(user) if user else None
+        with self.get_conn() as conn:
+            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                cursor.execute(
+                    "SELECT id, email, role, created_at FROM users WHERE id = %s",
+                    (user_id,),
+                )
+                user = cursor.fetchone()
+                return dict(user) if user else None

     def get_user_by_email(self, email: str) -> Optional[Dict]:
         """Get a user by email.
@@ -582,15 +574,14 @@ class DatabaseClient:
         Returns:
             User dict or None
         """
-        self.connect()
-
-        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
-            cursor.execute(
-                "SELECT id, email, role, created_at FROM users WHERE email = %s",
-                (email,),
-            )
-            user = cursor.fetchone()
-            return dict(user) if user else None
+        with self.get_conn() as conn:
+            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                cursor.execute(
+                    "SELECT id, email, role, created_at FROM users WHERE email = %s",
+                    (email,),
+                )
+                user = cursor.fetchone()
+                return dict(user) if user else None

     def get_all_users(self, limit: int = 100, offset: int = 0) -> List[Dict]:
         """Get all users (admin only).
@@ -602,19 +593,18 @@ class DatabaseClient:
         Returns:
             List of user dicts
         """
-        self.connect()
-
-        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
-            cursor.execute(
-                """
-                SELECT id, email, role, created_at
-                FROM users
-                ORDER BY created_at DESC
-                LIMIT %s OFFSET %s
-                """,
-                (limit, offset),
-            )
-            return [dict(row) for row in cursor.fetchall()]
+        with self.get_conn() as conn:
+            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                cursor.execute(
+                    """
+                    SELECT id, email, role, created_at
+                    FROM users
+                    ORDER BY created_at DESC
+                    LIMIT %s OFFSET %s
+                    """,
+                    (limit, offset),
+                )
+                return [dict(row) for row in cursor.fetchall()]

     def update_user_role(self, user_id: int, role: str) -> Optional[Dict]:
         """Update a user's role (admin only).
@@ -626,20 +616,19 @@ class DatabaseClient:
         Returns:
             Updated user dict or None
         """
-        self.connect()
-
-        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
-            cursor.execute(
-                """
-                UPDATE users
-                SET role = %s, updated_at = CURRENT_TIMESTAMP
-                WHERE id = %s
-                RETURNING id, email, role, created_at
-                """,
-                (role, user_id),
-            )
-            user = cursor.fetchone()
-            self.conn.commit()
-            return dict(user) if user else None
+        with self.get_conn() as conn:
+            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
+                cursor.execute(
+                    """
+                    UPDATE users
+                    SET role = %s, updated_at = CURRENT_TIMESTAMP
+                    WHERE id = %s
+                    RETURNING id, email, role, created_at
+                    """,
+                    (role, user_id),
+                )
+                user = cursor.fetchone()
+                conn.commit()
+                return dict(user) if user else None

     def delete_user(self, user_id: int) -> bool:
@@ -651,12 +640,11 @@ class DatabaseClient:
         Returns:
             True if deleted
         """
-        self.connect()
-
-        with self.conn.cursor() as cursor:
-            cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
-            deleted = cursor.rowcount > 0
-            self.conn.commit()
-            return deleted
+        with self.get_conn() as conn:
+            with conn.cursor() as cursor:
+                cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
+                deleted = cursor.rowcount > 0
+                conn.commit()
+                return deleted

     def get_user_count(self) -> int:
@@ -665,8 +653,7 @@ class DatabaseClient:
         Returns:
             Number of users
         """
-        self.connect()
-
-        with self.conn.cursor() as cursor:
-            cursor.execute("SELECT COUNT(*) FROM users")
-            return cursor.fetchone()[0]
+        with self.get_conn() as conn:
+            with conn.cursor() as cursor:
+                cursor.execute("SELECT COUNT(*) FROM users")
+                return cursor.fetchone()[0]
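`get_conn()` itself is not part of this diff. One plausible shape, assuming it borrows a connection from a pool and returns it afterwards, is sketched below with `sqlite3` standing in for PostgreSQL and a hypothetical `SimplePool` in place of psycopg2's `ThreadedConnectionPool` (whose real methods are `getconn`, `putconn`, and `closeall`, and which raises `PoolError` when exhausted instead of blocking). Rolling back on an exception matters because `create_user` no longer calls `rollback()` itself after a `UniqueViolation`; unless `get_conn()` already rolls back on error, the aborted transaction would go back into the pool with the connection:

```python
from __future__ import annotations

import queue
import sqlite3
from contextlib import contextmanager
from typing import Callable, Iterator


class SimplePool:
    """Hypothetical fixed-size pool; stands in for psycopg2's ThreadedConnectionPool."""

    def __init__(self, factory: Callable[[], sqlite3.Connection], size: int = 2) -> None:
        self._idle: queue.Queue[sqlite3.Connection] = queue.Queue()
        self._all: list[sqlite3.Connection] = []
        for _ in range(size):
            conn = factory()
            self._all.append(conn)
            self._idle.put(conn)

    @contextmanager
    def get_conn(self) -> Iterator[sqlite3.Connection]:
        """Borrow a connection; roll back on error; always return it to the pool."""
        conn = self._idle.get()  # blocks while every connection is checked out
        try:
            yield conn
        except Exception:
            conn.rollback()  # discard the failed transaction before the next borrower
            raise
        finally:
            self._idle.put(conn)

    def close(self) -> None:
        for conn in self._all:
            conn.close()
```

Committing on success is left to the caller here, matching the explicit `conn.commit()` calls in the methods above.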