Add S3/MinIO ingestion support

- S3Ingestor class for S3-compatible storage
- Supports: AWS S3, MinIO, DigitalOcean Spaces, Wasabi
- CLI: opus ingest-s3 --bucket my-bucket --prefix notes/
- Features: list objects, download, upload, text extraction

Environment:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_REGION
- S3_ENDPOINT_URL (for MinIO/non-AWS)
This commit is contained in:
2026-03-13 03:18:02 +00:00
parent 64bea5cae6
commit c248487d2e
4 changed files with 427 additions and 1 deletions
+4
View File
@@ -33,6 +33,7 @@ from opus_orchestrator.state import OpusState, create_initial_state
from opus_orchestrator.langgraph_workflow import OpusGraph, run_opus, OpusGraphState
from opus_orchestrator.autogen_critique import CritiqueCrew, create_critique_crew
from opus_orchestrator.utils.github_ingest import GitHubIngestor, create_github_ingestor
from opus_orchestrator.utils.s3_ingest import S3Ingestor, create_s3_ingestor
from opus_orchestrator.frameworks import StoryFramework
from opus_orchestrator.crews import (
OpusCrew,
@@ -112,6 +113,9 @@ __all__ = [
"create_critique_crew",
"GitHubIngestor",
"create_github_ingestor",
# S3/MinIO (NEW!)
"S3Ingestor",
"create_s3_ingestor",
]
# Import legacy orchestrator for backward compatibility
+87 -1
View File
@@ -150,7 +150,7 @@ Examples:
)
# -------------------------------------------------------------------------
# INGEST COMMAND
# INGEST COMMAND (GitHub)
# -------------------------------------------------------------------------
ingest_parser = subparsers.add_parser(
"ingest",
@@ -178,6 +178,43 @@ Examples:
help="Show preview of ingested content",
)
# -------------------------------------------------------------------------
# INGEST-S3 COMMAND
# -------------------------------------------------------------------------
s3_parser = subparsers.add_parser(
"ingest-s3",
help="Ingest content from S3/MinIO",
description="Fetch and analyze content from S3-compatible storage",
)
s3_parser.add_argument(
"--bucket", "-b",
required=True,
help="S3 bucket name",
)
s3_parser.add_argument(
"--prefix", "-p",
default="",
help="Object key prefix",
)
s3_parser.add_argument(
"--endpoint", "-e",
help="S3 endpoint URL (for MinIO, DO Spaces, etc.)",
)
s3_parser.add_argument(
"--output", "-o",
help="Output file for ingested content",
)
s3_parser.add_argument(
"--preview",
action="store_true",
help="Show preview of ingested content",
)
s3_parser.add_argument(
"--list-objects",
action="store_true",
help="List objects instead of downloading",
)
# -------------------------------------------------------------------------
# FRAMEWORKS COMMAND
# -------------------------------------------------------------------------
@@ -409,6 +446,54 @@ def run_ingest(args: argparse.Namespace) -> int:
return 0
def run_s3_ingest(args: argparse.Namespace) -> int:
"""Ingest content from S3/MinIO."""
from opus_orchestrator import S3Ingestor
print(f"\n🪣 Ingesting from S3: {args.bucket}/{args.prefix}\n")
if args.endpoint:
print(f" Endpoint: {args.endpoint}")
ingestor = S3Ingestor(
endpoint_url=args.endpoint,
bucket=args.bucket,
)
if args.list_objects:
# Just list objects
objects = ingestor.list_objects(bucket=args.bucket, prefix=args.prefix)
print(f"📦 Objects ({len(objects)}):")
for obj in objects[:20]:
print(f" {obj['key']} ({obj['size']:,} bytes)")
if len(objects) > 20:
print(f" ... and {len(objects) - 20} more")
return 0
# Ingest content
result = ingestor.ingest_bucket(
bucket=args.bucket,
prefix=args.prefix,
)
print(f"✅ Loaded {result['total_chars']:,} characters")
print(f" Files: {result['file_count']}")
print(f" File list: {', '.join(result['files'].keys())}\n")
if args.preview:
print("📄 PREVIEW (first 2000 chars):")
print("-" * 40)
print(result["combined_text"][:2000])
print("-" * 40)
if args.output:
with open(args.output, "w") as f:
f.write(result["combined_text"])
print(f"\n💾 Saved to: {args.output}")
return 0
def run_frameworks(args: argparse.Namespace) -> int:
"""List available frameworks."""
from opus_orchestrator.frameworks import FRAMEWORKS
@@ -527,6 +612,7 @@ async def main_async(args: argparse.Namespace) -> int:
"generate": run_generate,
"serve": run_serve,
"ingest": run_ingest,
"ingest-s3": run_s3_ingest,
"frameworks": run_frameworks,
"config": run_config,
"docs": run_docs,
+3
View File
@@ -2,11 +2,14 @@
from opus_orchestrator.utils.docs import generate_docs
from opus_orchestrator.utils.github_ingest import GitHubIngestor, create_github_ingestor
from opus_orchestrator.utils.s3_ingest import S3Ingestor, create_s3_ingestor
from opus_orchestrator.utils.llm import get_llm_client
__all__ = [
"generate_docs",
"GitHubIngestor",
"create_github_ingestor",
"S3Ingestor",
"create_s3_ingestor",
"get_llm_client",
]
+333
View File
@@ -0,0 +1,333 @@
"""S3/MinIO ingestion for Opus Orchestrator.
Fetches content from S3-compatible object storage.
"""
import os
import io
from typing import Any, Optional
from pathlib import Path
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from dotenv import load_dotenv
load_dotenv("/home/solaria/.openclaw/workspace/opus-orchestrator-ai/.env")
class S3Ingestor:
"""Fetch and parse content from S3-compatible storage.
Supports:
- Amazon S3
- MinIO
- DigitalOcean Spaces
- Wasabi
- Other S3-compatible APIs
"""
def __init__(
self,
endpoint_url: Optional[str] = None,
access_key: Optional[str] = None,
secret_key: Optional[str] = None,
region_name: str = "us-east-1",
bucket: Optional[str] = None,
):
"""Initialize S3 ingestor.
Args:
endpoint_url: S3 endpoint URL (for MinIO, DO Spaces, etc.)
access_key: AWS access key
secret_key: AWS secret key
region_name: AWS region
bucket: Default bucket name
"""
self.access_key = access_key or os.environ.get("AWS_ACCESS_KEY_ID")
self.secret_key = secret_key or os.environ.get("AWS_SECRET_ACCESS_KEY")
self.region_name = region_name or os.environ.get("AWS_REGION", "us-east-1")
self.endpoint_url = endpoint_url or os.environ.get("S3_ENDPOINT_URL")
self.bucket = bucket
if not self.access_key or not self.secret_key:
raise ValueError(
"S3 credentials required. Set AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY "
"or pass access_key/secret_key."
)
# Determine if using custom endpoint
use_ssl = self.endpoint_url and not self.endpoint_url.startswith("http://")
self.s3_client = boto3.client(
"s3",
endpoint_url=self.endpoint_url,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
region_name=self.region_name,
)
self._buckets_cache: Optional[list[str]] = None
def list_buckets(self) -> list[str]:
"""List available buckets.
Returns:
List of bucket names
"""
try:
response = self.s3_client.list_buckets()
return [b["Name"] for b in response.get("Buckets", [])]
except (ClientError, NoCredentialsError) as e:
raise RuntimeError(f"Failed to list buckets: {e}")
def list_objects(
self,
bucket: Optional[str] = None,
prefix: str = "",
max_keys: int = 1000,
) -> list[dict]:
"""List objects in a bucket.
Args:
bucket: Bucket name (uses default if not provided)
prefix: Object key prefix filter
max_keys: Maximum number of keys to return
Returns:
List of object metadata dicts
"""
bucket = bucket or self.bucket
if not bucket:
raise ValueError("Bucket name required")
try:
response = self.s3_client.list_objects_v2(
Bucket=bucket,
Prefix=prefix,
MaxKeys=max_keys,
)
return [
{
"key": obj["Key"],
"size": obj["Size"],
"last_modified": obj.get("LastModified"),
"etag": obj.get("ETag", "").strip('"'),
}
for obj in response.get("Contents", [])
]
except ClientError as e:
raise RuntimeError(f"Failed to list objects: {e}")
def get_object(
self,
key: str,
bucket: Optional[str] = None,
encoding: str = "utf-8",
) -> str:
"""Get content of a single object.
Args:
key: Object key
bucket: Bucket name (uses default if not provided)
encoding: Text encoding
Returns:
Object content as string
"""
bucket = bucket or self.bucket
if not bucket:
raise ValueError("Bucket name required")
try:
response = self.s3_client.get_object(Bucket=bucket, Key=key)
body = response["Body"]
# Try to read as text
content = body.read()
try:
return content.decode(encoding)
except UnicodeDecodeError:
# Return raw bytes as hex for binary files
return f"[Binary file: {len(content)} bytes]"
except ClientError as e:
raise RuntimeError(f"Failed to get object {key}: {e}")
def get_text_files(
self,
bucket: Optional[str] = None,
prefix: str = "",
extensions: Optional[list[str]] = None,
) -> dict[str, str]:
"""Get all text files from a prefix.
Args:
bucket: Bucket name
prefix: Object key prefix
extensions: File extensions to filter (e.g., ['.txt', '.md'])
Returns:
Dict mapping keys to content
"""
bucket = bucket or self.bucket
if not bucket:
raise ValueError("Bucket name required")
extensions = extensions or [".txt", ".md", ".markdown", ".notes", ".draft"]
objects = self.list_objects(bucket=bucket, prefix=prefix)
results = {}
for obj in objects:
key = obj["key"]
# Check extension
if not any(key.lower().endswith(ext) for ext in extensions):
continue
# Skip directories
if key.endswith("/"):
continue
try:
content = self.get_object(key, bucket)
results[key] = content
except Exception as e:
print(f"Warning: Failed to read {key}: {e}")
continue
return results
def ingest_bucket(
self,
bucket: Optional[str] = None,
prefix: str = "",
extensions: Optional[list[str]] = None,
) -> dict[str, Any]:
"""Ingest all text content from a bucket/prefix.
Args:
bucket: Bucket name
prefix: Object key prefix
extensions: File extensions to include
Returns:
Dict with combined text and metadata
"""
bucket = bucket or self.bucket
if not bucket:
raise ValueError("Bucket name required")
files = self.get_text_files(
bucket=bucket,
prefix=prefix,
extensions=extensions,
)
# Combine all content
combined_lines = []
for key, content in files.items():
combined_lines.append(f"=== {key} ===")
combined_lines.append(content)
combined_lines.append("")
combined_text = "\n".join(combined_lines)
return {
"bucket": bucket,
"prefix": prefix,
"files": files,
"file_count": len(files),
"total_chars": len(combined_text),
"combined_text": combined_text,
}
def download_file(
self,
key: str,
local_path: str | Path,
bucket: Optional[str] = None,
) -> Path:
"""Download a file from S3 to local storage.
Args:
key: Object key
local_path: Local destination path
bucket: Bucket name
Returns:
Path to downloaded file
"""
bucket = bucket or self.bucket
if not bucket:
raise ValueError("Bucket name required")
local_path = Path(local_path)
local_path.parent.mkdir(parents=True, exist_ok=True)
self.s3_client.download_file(bucket, key, str(local_path))
return local_path
def upload_file(
self,
local_path: str | Path,
key: str,
bucket: Optional[str] = None,
content_type: Optional[str] = None,
) -> str:
"""Upload a file to S3.
Args:
local_path: Local file path
key: Object key
bucket: Bucket name
content_type: Content MIME type
Returns:
S3 URL of uploaded file
"""
bucket = bucket or self.bucket
if not bucket:
raise ValueError("Bucket name required")
local_path = Path(local_path)
extra_args = {}
if content_type:
extra_args["ContentType"] = content_type
self.s3_client.upload_file(
str(local_path),
bucket,
key,
ExtraArgs=extra_args,
)
return f"s3://{bucket}/{key}"
def create_s3_ingestor(
endpoint_url: Optional[str] = None,
access_key: Optional[str] = None,
secret_key: Optional[str] = None,
bucket: Optional[str] = None,
) -> S3Ingestor:
"""Factory function to create an S3 ingestor.
Args:
endpoint_url: S3 endpoint URL
access_key: AWS access key
secret_key: AWS secret key
bucket: Default bucket
Returns:
Configured S3Ingestor
"""
return S3Ingestor(
endpoint_url=endpoint_url,
access_key=access_key,
secret_key=secret_key,
bucket=bucket,
)