Files
SPARC/SPARC/storage.py
T
agent-company 9a43f85259 feat: add S3/MinIO object storage support for patent PDFs
Introduce a StorageBackend abstraction (local filesystem and S3) for
patent PDF storage. When STORAGE_BACKEND=s3, PDFs are read/written via
boto3 to an S3-compatible bucket instead of the local filesystem.

- Add SPARC/storage.py with LocalStorageBackend and S3StorageBackend
- Update serp_api.py save_patents and parse_patent_pdf to use storage
- Add storage config vars to config.py and .env.example
- Add optional MinIO service to docker-compose.yml (--profile s3)
- Add boto3 to requirements.txt

Closes leeworks-agents/SPARC#38

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 10:17:24 +00:00

172 lines
5.2 KiB
Python

"""Patent PDF storage abstraction.
Provides a unified interface for reading and writing patent PDF files,
with pluggable backends for local filesystem and S3-compatible object
storage (e.g., MinIO, AWS S3).
"""
import logging
import os
from abc import ABC, abstractmethod
from SPARC import config
logger = logging.getLogger(__name__)
class StorageBackend(ABC):
    """Interface every patent PDF store must implement.

    Files are addressed by a string key; concrete subclasses decide where
    the bytes actually live.
    """

    @abstractmethod
    def read(self, key: str) -> bytes:
        """Fetch the stored bytes for ``key``.

        Args:
            key: Storage key (e.g., "US-12345678-B2.pdf").

        Returns:
            The complete file contents.

        Raises:
            FileNotFoundError: If nothing is stored under ``key``.
        """

    @abstractmethod
    def write(self, key: str, data: bytes) -> None:
        """Store ``data`` under ``key``.

        Args:
            key: Storage key (e.g., "US-12345678-B2.pdf").
            data: File contents to persist.
        """

    @abstractmethod
    def exists(self, key: str) -> bool:
        """Report whether a non-empty file is stored under ``key``.

        Args:
            key: Storage key.

        Returns:
            True only when the file is present and its size is non-zero.
        """

    @abstractmethod
    def path_for(self, key: str) -> str:
        """Return a location string for ``key`` for downstream consumers.

        The local backend returns a filesystem path; the S3 backend returns
        an ``s3://bucket/key`` URI. Callers that need a real local file
        should call read() and write the bytes to a temporary location.

        Args:
            key: Storage key.
        """
class LocalStorageBackend(StorageBackend):
    """Store patent PDFs on the local filesystem under a directory.

    Keys are joined onto ``base_dir`` with ``os.path.join``. The directory
    is created when the backend is constructed.
    """

    def __init__(self, base_dir: str = "patents"):
        """Create the backend and make sure ``base_dir`` exists.

        Args:
            base_dir: Directory under which files are stored.
        """
        self.base_dir = base_dir
        os.makedirs(self.base_dir, exist_ok=True)

    def _full_path(self, key: str) -> str:
        """Return the filesystem path for ``key`` under ``base_dir``."""
        # NOTE(review): os.path.join does not confine the result to base_dir;
        # an absolute key or one containing ".." resolves outside of it.
        # Confirm that callers only pass sanitized keys.
        return os.path.join(self.base_dir, key)

    def read(self, key: str) -> bytes:
        """Read the file stored under ``key``.

        Args:
            key: Storage key (e.g., "US-12345678-B2.pdf").

        Returns:
            File contents as bytes.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        path = self._full_path(key)
        # Open directly instead of checking existence first: a separate
        # check can go stale before the open and costs an extra syscall.
        try:
            with open(path, "rb") as f:
                return f.read()
        except FileNotFoundError:
            raise FileNotFoundError(f"File not found: {path}") from None

    def write(self, key: str, data: bytes) -> None:
        """Write ``data`` to the file for ``key``, creating parent directories.

        Args:
            key: Storage key (e.g., "US-12345678-B2.pdf").
            data: File contents as bytes.
        """
        path = self._full_path(key)
        # dirname() is empty when path has no directory part; fall back to
        # base_dir so makedirs always receives a real directory name.
        os.makedirs(os.path.dirname(path) or self.base_dir, exist_ok=True)
        with open(path, "wb") as f:
            f.write(data)
        logger.debug("Wrote %d bytes to %s", len(data), path)

    def exists(self, key: str) -> bool:
        """Check whether a non-empty regular file is stored under ``key``.

        Args:
            key: Storage key.

        Returns:
            True if the path is a regular file with non-zero size. A
            directory, a missing path, or a path that cannot be inspected
            all yield False.
        """
        path = self._full_path(key)
        try:
            # isfile() rejects directories, which read() could not open.
            return os.path.isfile(path) and os.path.getsize(path) > 0
        except OSError:
            # The file vanished (or became unreadable) between the two calls.
            return False

    def path_for(self, key: str) -> str:
        """Return the filesystem path at which ``key`` is (or would be) stored."""
        return self._full_path(key)
class S3StorageBackend(StorageBackend):
    """Store patent PDFs in an S3-compatible bucket."""

    # Structured error codes treated as "object is not there". They are
    # compared against the error code carried by the exception rather than
    # searched for in its text, so a "404" that merely appears inside a key,
    # bucket name or endpoint URL cannot be mistaken for a missing object.
    _NOT_FOUND_CODES = frozenset({"NoSuchKey", "404"})

    def __init__(
        self,
        bucket: str,
        endpoint_url: str = "",
        access_key: str = "",
        secret_key: str = "",
    ):
        """Create the S3 client and make a best-effort check for the bucket.

        Args:
            bucket: Name of the bucket that holds the PDFs.
            endpoint_url: Custom endpoint (e.g. a MinIO URL). When empty,
                the argument is not passed to boto3 at all.
            access_key: Access key id. Only passed to boto3 when
                ``secret_key`` is also non-empty.
            secret_key: Secret access key. Only passed to boto3 when
                ``access_key`` is also non-empty.
        """
        # Imported lazily so boto3 is only needed when this backend is used.
        import boto3

        kwargs: dict = {}
        if endpoint_url:
            kwargs["endpoint_url"] = endpoint_url
        if access_key and secret_key:
            kwargs["aws_access_key_id"] = access_key
            kwargs["aws_secret_access_key"] = secret_key
        self.s3 = boto3.client("s3", **kwargs)
        self.bucket = bucket
        # Ensure bucket exists (useful for MinIO local dev). Both steps are
        # best-effort: a failure is logged and construction still succeeds.
        try:
            self.s3.head_bucket(Bucket=self.bucket)
        except Exception as head_err:
            logger.debug("head_bucket failed for %s: %s", self.bucket, head_err)
            try:
                # NOTE(review): no region/location constraint is supplied
                # here; confirm this is acceptable for non-MinIO targets.
                self.s3.create_bucket(Bucket=self.bucket)
                logger.info("Created S3 bucket: %s", self.bucket)
            except Exception as e:
                logger.warning("Could not create bucket %s: %s", self.bucket, e)

    @staticmethod
    def _error_code(exc: Exception) -> str:
        """Return the service error code carried by ``exc``, or "" if none.

        Reads ``exc.response["Error"]["Code"]`` defensively so that
        exceptions without that structure simply yield an empty string.
        """
        response = getattr(exc, "response", None)
        if not isinstance(response, dict):
            return ""
        error = response.get("Error")
        if not isinstance(error, dict):
            return ""
        return str(error.get("Code", ""))

    def read(self, key: str) -> bytes:
        """Download the object stored under ``key``.

        Args:
            key: Object key (e.g., "US-12345678-B2.pdf").

        Returns:
            Object contents as bytes.

        Raises:
            FileNotFoundError: If the service reports the object as missing.
            Exception: Any other client error is re-raised unchanged.
        """
        try:
            response = self.s3.get_object(Bucket=self.bucket, Key=key)
            body = response["Body"]
            try:
                return body.read()
            finally:
                # Release the underlying stream even if the read fails midway.
                body.close()
        except self.s3.exceptions.NoSuchKey as e:
            raise FileNotFoundError(
                f"S3 object not found: s3://{self.bucket}/{key}"
            ) from e
        except Exception as e:
            if self._error_code(e) in self._NOT_FOUND_CODES:
                raise FileNotFoundError(
                    f"S3 object not found: s3://{self.bucket}/{key}"
                ) from e
            raise

    def write(self, key: str, data: bytes) -> None:
        """Upload ``data`` as the object ``key`` with a PDF content type.

        Args:
            key: Object key (e.g., "US-12345678-B2.pdf").
            data: File contents as bytes.
        """
        self.s3.put_object(
            Bucket=self.bucket,
            Key=key,
            Body=data,
            ContentType="application/pdf",
        )
        logger.debug("Wrote %d bytes to s3://%s/%s", len(data), self.bucket, key)

    def exists(self, key: str) -> bool:
        """Check whether a non-empty object is stored under ``key``.

        Args:
            key: Object key.

        Returns:
            True if the object exists and its reported length is non-zero.
            Any failure to query the object yields False; failures other
            than "not found" are also logged as warnings so that credential
            or connectivity problems are not silently hidden.
        """
        try:
            response = self.s3.head_object(Bucket=self.bucket, Key=key)
            return response["ContentLength"] > 0
        except Exception as e:
            if self._error_code(e) not in self._NOT_FOUND_CODES:
                logger.warning(
                    "Could not check s3://%s/%s: %s", self.bucket, key, e
                )
            return False

    def path_for(self, key: str) -> str:
        """Return the ``s3://bucket/key`` URI for ``key``."""
        return f"s3://{self.bucket}/{key}"
def get_storage_backend() -> StorageBackend:
    """Build the storage backend selected by ``config.storage_backend``.

    The setting is compared case-insensitively. The value "s3" selects the
    S3 backend, configured from the ``config.s3_*`` settings; every other
    value selects the local filesystem backend with its default directory.

    Returns:
        A ready-to-use StorageBackend instance.
    """
    selected = config.storage_backend.lower()

    # Guard clause: anything that is not "s3" uses the local filesystem.
    if selected != "s3":
        logger.info("Using local storage backend")
        return LocalStorageBackend()

    logger.info("Using S3 storage backend (bucket=%s)", config.s3_bucket)
    s3_settings = {
        "bucket": config.s3_bucket,
        "endpoint_url": config.s3_endpoint_url,
        "access_key": config.s3_access_key,
        "secret_key": config.s3_secret_key,
    }
    return S3StorageBackend(**s3_settings)