Compare commits

..

1 Commits

Author SHA1 Message Date
agent-company d366443b38 refactor(db): use shared pooled DatabaseClient singleton instead of per-call instances
- Replace get_db_client() creating new DatabaseClient on every call with a
  module-level singleton initialized once at startup via init_db_client()
- Add init_db_client() and close_db_client() lifecycle functions called
  from FastAPI lifespan handler
- Migrate all DatabaseClient methods from legacy self.connect()/self.conn
  to pooled self.get_conn() context manager for thread-safe connection reuse
- Pool is properly torn down on application shutdown

Closes leeworks-agents/SPARC#7

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 06:03:56 +00:00
15 changed files with 214 additions and 301 deletions
-37
View File
@@ -9,43 +9,7 @@ on:
workflow_dispatch: workflow_dispatch:
jobs: jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Install system dependencies
shell: sh
run: |
apk add --no-cache git python3 py3-pip gcc musl-dev libpq-dev python3-dev
- name: Checkout code
shell: sh
run: |
git clone http://gitea.gitea.svc.cluster.local/${{ gitea.repository }}.git .
git checkout ${{ gitea.sha }}
- name: Install Python dependencies
shell: sh
run: |
pip3 install --break-system-packages -r requirements.txt ruff
- name: Run ruff linter
shell: sh
run: |
ruff check SPARC/ tests/
- name: Run pytest
shell: sh
env:
DATABASE_URL: "sqlite://"
API_KEY: "test-key"
OPENROUTER_API_KEY: "test-key"
JWT_SECRET: "test-secret-for-ci"
APP_ENV: "development"
run: |
python3 -m pytest tests/ -v --tb=short -x
build-api: build-api:
needs: test
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Install dependencies - name: Install dependencies
@@ -117,7 +81,6 @@ jobs:
echo "API image available at ${{ steps.tags.outputs.IMAGE_TAG }}" echo "API image available at ${{ steps.tags.outputs.IMAGE_TAG }}"
build-frontend: build-frontend:
needs: test
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Install dependencies - name: Install dependencies
-46
View File
@@ -1,46 +0,0 @@
name: Test and Lint
on:
push:
branches:
- main
pull_request:
branches:
- main
workflow_dispatch:
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Install system dependencies
shell: sh
run: |
apk add --no-cache git python3 py3-pip gcc musl-dev libpq-dev python3-dev
- name: Checkout code
shell: sh
run: |
git clone http://gitea.gitea.svc.cluster.local/${{ gitea.repository }}.git .
git checkout ${{ gitea.sha }}
- name: Install Python dependencies
shell: sh
run: |
pip3 install --break-system-packages -r requirements.txt ruff
- name: Run ruff linter
shell: sh
run: |
ruff check SPARC/ tests/
- name: Run pytest
shell: sh
env:
DATABASE_URL: "sqlite://"
API_KEY: "test-key"
OPENROUTER_API_KEY: "test-key"
JWT_SECRET: "test-secret-for-ci"
APP_ENV: "development"
run: |
python3 -m pytest tests/ -v --tb=short -x
+2 -3
View File
@@ -1,4 +1,3 @@
from .types import Patent as Patent from .types import Patents, Patent
from .types import Patents as Patents
__all__ = ["Patents", "Patent"] all = ["Patents", "Patent"]
+3 -3
View File
@@ -10,9 +10,9 @@ from typing import Callable
from SPARC import config from SPARC import config
from SPARC.database import DatabaseClient from SPARC.database import DatabaseClient
from SPARC.llm import LLMAnalyzer
from SPARC.serp_api import SERP from SPARC.serp_api import SERP
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult, Patent, Patents from SPARC.llm import LLMAnalyzer
from SPARC.types import Patent, Patents, CompanyAnalysisResult, BatchAnalysisResult
class CompanyAnalyzer: class CompanyAnalyzer:
@@ -92,7 +92,7 @@ class CompanyAnalyzer:
if not processed_patents: if not processed_patents:
return f"Failed to process any patents for {company_name}" return f"Failed to process any patents for {company_name}"
print("Analyzing portfolio with LLM...") print(f"Analyzing portfolio with LLM...")
# Analyze the full portfolio with LLM # Analyze the full portfolio with LLM
analysis = self.llm_analyzer.analyze_patent_portfolio( analysis = self.llm_analyzer.analyze_patent_portfolio(
+5 -1
View File
@@ -21,11 +21,13 @@ from SPARC.auth import (
TokenResponse, TokenResponse,
UserResponse, UserResponse,
check_jwt_secret, check_jwt_secret,
close_db_client,
create_tokens, create_tokens,
decode_token, decode_token,
get_current_admin, get_current_admin,
get_current_user, get_current_user,
get_db_client, get_db_client,
init_db_client,
) )
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult
@@ -155,6 +157,7 @@ async def lifespan(app: FastAPI):
"""Initialize resources on startup, clean up on shutdown.""" """Initialize resources on startup, clean up on shutdown."""
global _analyzer global _analyzer
check_jwt_secret() check_jwt_secret()
init_db_client()
_analyzer = CompanyAnalyzer() _analyzer = CompanyAnalyzer()
# Mark any jobs that were running/pending before the restart as failed # Mark any jobs that were running/pending before the restart as failed
from SPARC.database import DatabaseClient from SPARC.database import DatabaseClient
@@ -167,8 +170,9 @@ async def lifespan(app: FastAPI):
logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale) logging.getLogger(__name__).warning("Marked %d stale jobs as failed on startup", stale)
_db.close() _db.close()
yield yield
# Cleanup if needed # Cleanup
_analyzer = None _analyzer = None
close_db_client()
app = FastAPI( app = FastAPI(
+29 -4
View File
@@ -146,11 +146,36 @@ def decode_token(token: str) -> Optional[TokenPayload]:
return None return None
# Shared database client singleton, initialized at startup via init_db_client()
_db_client: DatabaseClient | None = None
def init_db_client() -> None:
"""Initialize the shared database client. Call once at app startup."""
global _db_client
_db_client = DatabaseClient(config.database_url)
_db_client.connect()
def close_db_client() -> None:
"""Close the shared database client. Call at app shutdown."""
global _db_client
if _db_client:
_db_client.close()
_db_client = None
def get_db_client() -> DatabaseClient: def get_db_client() -> DatabaseClient:
"""Get database client for auth operations.""" """Get the shared pooled database client for auth operations.
client = DatabaseClient(config.database_url)
client.connect() Returns the module-level singleton DatabaseClient. If not yet initialized
return client (e.g., during tests), creates a new instance as a fallback.
"""
global _db_client
if _db_client is None:
_db_client = DatabaseClient(config.database_url)
_db_client.connect()
return _db_client
async def get_current_user( async def get_current_user(
+1 -2
View File
@@ -2,9 +2,8 @@
Loads environment variables from .env file for API keys and other secrets. Loads environment variables from .env file for API keys and other secrets.
""" """
import os
from dotenv import load_dotenv from dotenv import load_dotenv
import os
load_dotenv() load_dotenv()
+34 -48
View File
@@ -1,15 +1,14 @@
"""Database client for storing and retrieving LLM messages and user authentication.""" """Database client for storing and retrieving LLM messages and user authentication."""
import contextlib import contextlib
import hashlib
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import bcrypt
import psycopg2 import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool from psycopg2.pool import ThreadedConnectionPool
from psycopg2.extras import RealDictCursor
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import json
import hashlib
import bcrypt
class DatabaseClient: class DatabaseClient:
@@ -222,8 +221,6 @@ class DatabaseClient:
Returns: Returns:
Cached message dict if found, None otherwise Cached message dict if found, None otherwise
""" """
self.connect()
prompt_hash = self.hash_prompt(prompt) prompt_hash = self.hash_prompt(prompt)
query = """ query = """
@@ -246,7 +243,8 @@ class DatabaseClient:
query += " ORDER BY timestamp DESC LIMIT 1" query += " ORDER BY timestamp DESC LIMIT 1"
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor: with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params) cursor.execute(query, params)
result = cursor.fetchone() result = cursor.fetchone()
return dict(result) if result else None return dict(result) if result else None
@@ -277,11 +275,10 @@ class DatabaseClient:
Returns: Returns:
The ID of the inserted record The ID of the inserted record
""" """
self.connect()
prompt_hash = self.hash_prompt(prompt) prompt_hash = self.hash_prompt(prompt)
with self.conn.cursor() as cursor: with self.get_conn() as conn:
with conn.cursor() as cursor:
cursor.execute( cursor.execute(
""" """
INSERT INTO llm_messages INSERT INTO llm_messages
@@ -303,7 +300,7 @@ class DatabaseClient:
) )
message_id = cursor.fetchone()[0] message_id = cursor.fetchone()[0]
self.conn.commit() conn.commit()
return message_id return message_id
@@ -325,8 +322,6 @@ class DatabaseClient:
Returns: Returns:
List of message dictionaries List of message dictionaries
""" """
self.connect()
query = "SELECT * FROM llm_messages WHERE 1=1" query = "SELECT * FROM llm_messages WHERE 1=1"
params = [] params = []
@@ -341,7 +336,8 @@ class DatabaseClient:
query += " ORDER BY timestamp DESC LIMIT %s OFFSET %s" query += " ORDER BY timestamp DESC LIMIT %s OFFSET %s"
params.extend([limit, offset]) params.extend([limit, offset])
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor: with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute(query, params) cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()] return [dict(row) for row in cursor.fetchall()]
@@ -354,9 +350,8 @@ class DatabaseClient:
Returns: Returns:
Dictionary with analytics data Dictionary with analytics data
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
# Total messages # Total messages
cursor.execute( cursor.execute(
""" """
@@ -651,12 +646,11 @@ class DatabaseClient:
Returns: Returns:
Created user dict or None if email exists Created user dict or None if email exists
""" """
self.connect()
password_hash = self.hash_password(password) password_hash = self.hash_password(password)
try: try:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor: with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute( cursor.execute(
""" """
INSERT INTO users (email, password_hash, role) INSERT INTO users (email, password_hash, role)
@@ -666,10 +660,9 @@ class DatabaseClient:
(email, password_hash, role), (email, password_hash, role),
) )
user = cursor.fetchone() user = cursor.fetchone()
self.conn.commit() conn.commit()
return dict(user) if user else None return dict(user) if user else None
except psycopg2.errors.UniqueViolation: except psycopg2.errors.UniqueViolation:
self.conn.rollback()
return None return None
def authenticate_user(self, email: str, password: str) -> Optional[Dict]: def authenticate_user(self, email: str, password: str) -> Optional[Dict]:
@@ -682,9 +675,8 @@ class DatabaseClient:
Returns: Returns:
User dict if authenticated, None otherwise User dict if authenticated, None otherwise
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute( cursor.execute(
"SELECT * FROM users WHERE email = %s", "SELECT * FROM users WHERE email = %s",
(email,), (email,),
@@ -709,9 +701,8 @@ class DatabaseClient:
Returns: Returns:
User dict or None User dict or None
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute( cursor.execute(
"SELECT id, email, role, created_at FROM users WHERE id = %s", "SELECT id, email, role, created_at FROM users WHERE id = %s",
(user_id,), (user_id,),
@@ -728,9 +719,8 @@ class DatabaseClient:
Returns: Returns:
User dict or None User dict or None
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute( cursor.execute(
"SELECT id, email, role, created_at FROM users WHERE email = %s", "SELECT id, email, role, created_at FROM users WHERE email = %s",
(email,), (email,),
@@ -748,9 +738,8 @@ class DatabaseClient:
Returns: Returns:
List of user dicts List of user dicts
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute( cursor.execute(
""" """
SELECT id, email, role, created_at SELECT id, email, role, created_at
@@ -772,9 +761,8 @@ class DatabaseClient:
Returns: Returns:
Updated user dict or None Updated user dict or None
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cursor:
with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute( cursor.execute(
""" """
UPDATE users UPDATE users
@@ -785,7 +773,7 @@ class DatabaseClient:
(role, user_id), (role, user_id),
) )
user = cursor.fetchone() user = cursor.fetchone()
self.conn.commit() conn.commit()
return dict(user) if user else None return dict(user) if user else None
def delete_user(self, user_id: int) -> bool: def delete_user(self, user_id: int) -> bool:
@@ -797,12 +785,11 @@ class DatabaseClient:
Returns: Returns:
True if deleted True if deleted
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor() as cursor:
with self.conn.cursor() as cursor:
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,)) cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
deleted = cursor.rowcount > 0 deleted = cursor.rowcount > 0
self.conn.commit() conn.commit()
return deleted return deleted
def get_user_count(self) -> int: def get_user_count(self) -> int:
@@ -811,8 +798,7 @@ class DatabaseClient:
Returns: Returns:
Number of users Number of users
""" """
self.connect() with self.get_conn() as conn:
with conn.cursor() as cursor:
with self.conn.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM users") cursor.execute("SELECT COUNT(*) FROM users")
return cursor.fetchone()[0] return cursor.fetchone()[0]
+1 -3
View File
@@ -1,11 +1,9 @@
"""LLM integration for patent analysis using OpenRouter.""" """LLM integration for patent analysis using OpenRouter."""
from typing import Dict
from openai import OpenAI from openai import OpenAI
from SPARC import config from SPARC import config
from SPARC.database import DatabaseClient from SPARC.database import DatabaseClient
from typing import Dict
class LLMAnalyzer: class LLMAnalyzer:
+5 -8
View File
@@ -1,15 +1,12 @@
import os import os
import serpapi
from SPARC import config
import re import re
from datetime import datetime, timedelta
from typing import Dict
import pdfplumber # pip install pdfplumber import pdfplumber # pip install pdfplumber
import requests import requests
import serpapi from datetime import datetime, timedelta
from typing import Dict
from SPARC import config from SPARC.types import Patents, Patent
from SPARC.types import Patent, Patents
class SERP: class SERP:
def query(company: str, days_back: int = None) -> Patents: def query(company: str, days_back: int = None) -> Patents:
-8
View File
@@ -1,8 +0,0 @@
[lint]
select = ["E", "F", "I"]
ignore = [
"E501", # line too long (handled by formatter)
]
[lint.per-file-ignores]
"tests/*" = ["E402", "F841"] # allow import not at top of file, unused vars (mocks) in tests
+3 -5
View File
@@ -1,11 +1,9 @@
"""Tests for the high-level company analyzer orchestration.""" """Tests for the high-level company analyzer orchestration."""
from unittest.mock import MagicMock, Mock
import pytest import pytest
from unittest.mock import Mock, patch, call, MagicMock
from SPARC.analyzer import CompanyAnalyzer from SPARC.analyzer import CompanyAnalyzer
from SPARC.types import BatchAnalysisResult, Patent, Patents from SPARC.types import Patent, Patents, CompanyAnalysisResult, BatchAnalysisResult
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@@ -26,7 +24,7 @@ class TestCompanyAnalyzer:
"""Test analyzer initialization with API key.""" """Test analyzer initialization with API key."""
mock_llm = mocker.patch("SPARC.analyzer.LLMAnalyzer") mock_llm = mocker.patch("SPARC.analyzer.LLMAnalyzer")
_analyzer = CompanyAnalyzer(openrouter_api_key="test-key") # noqa: F841 analyzer = CompanyAnalyzer(openrouter_api_key="test-key")
mock_llm.assert_called_once_with(api_key="test-key") mock_llm.assert_called_once_with(api_key="test-key")
+3 -4
View File
@@ -1,13 +1,12 @@
"""Tests for FastAPI web service endpoints.""" """Tests for FastAPI web service endpoints."""
from datetime import datetime
from unittest.mock import Mock
import pytest import pytest
from datetime import datetime
from unittest.mock import Mock, patch
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from SPARC.api import app from SPARC.api import app
from SPARC.types import BatchAnalysisResult, CompanyAnalysisResult from SPARC.types import CompanyAnalysisResult, BatchAnalysisResult
@pytest.fixture @pytest.fixture
+1 -3
View File
@@ -1,9 +1,7 @@
"""Tests for LLM analysis functionality.""" """Tests for LLM analysis functionality."""
from unittest.mock import Mock
import pytest import pytest
from unittest.mock import Mock, MagicMock, patch
from SPARC.llm import LLMAnalyzer from SPARC.llm import LLMAnalyzer
+3 -2
View File
@@ -1,8 +1,9 @@
"""Tests for SERP API patent retrieval and parsing functionality.""" """Tests for SERP API patent retrieval and parsing functionality."""
import os
import pytest
from unittest.mock import patch, Mock
from datetime import datetime, timedelta from datetime import datetime, timedelta
from unittest.mock import Mock
from SPARC.serp_api import SERP from SPARC.serp_api import SERP
from SPARC.types import Patent from SPARC.types import Patent