From 64cdabd2f899d61c3509cac582f5371d3fa0d015 Mon Sep 17 00:00:00 2001 From: Mark Randall Havens Date: Fri, 13 Mar 2026 03:45:43 +0000 Subject: [PATCH] Add local file/directory ingestion support - LocalIngestor class for files and directories - CLI: opus ingest-local PATH - Generate from local: opus generate --local ./my-notes/ - Support for extensions, recursive scanning, summarize - Pattern-based exclusion (.git, __pycache__, etc.) --- opus_orchestrator/cli.py | 121 +++++++++++- opus_orchestrator/utils/__init__.py | 3 + opus_orchestrator/utils/local_ingest.py | 245 ++++++++++++++++++++++++ 3 files changed, 368 insertions(+), 1 deletion(-) create mode 100644 opus_orchestrator/utils/local_ingest.py diff --git a/opus_orchestrator/cli.py b/opus_orchestrator/cli.py index d79cfb8..8699979 100644 --- a/opus_orchestrator/cli.py +++ b/opus_orchestrator/cli.py @@ -269,6 +269,11 @@ Examples: "--save-commit-msg", help="Commit message for GitHub save", ) + gen_parser.add_argument( + "--local", + "-l", + help="Local file or directory to use as source", + ) gen_parser.add_argument( "--use-crewai", action="store_true", @@ -376,6 +381,48 @@ Examples: help="List objects instead of downloading", ) + # ------------------------------------------------------------------------- + # INGEST-LOCAL COMMAND + # ------------------------------------------------------------------------- + local_parser = subparsers.add_parser( + "ingest-local", + help="Ingest content from local files/directories", + description="Fetch and analyze content from local files and directories", + ) + local_parser.add_argument( + "path", + help="File or directory path to ingest", + ) + local_parser.add_argument( + "--extensions", "-e", + help="Comma-separated file extensions (default: txt,md,markdown,notes,draft)", + ) + local_parser.add_argument( + "--no-recursive", + action="store_true", + help="Don't scan subdirectories", + ) + local_parser.add_argument( + "--output", "-o", + help="Output file for ingested content", + ) + local_parser.add_argument( + "--preview", + action="store_true", + help="Show preview of ingested content", + ) + local_parser.add_argument( + "--summarize", + action="store_true", + help="Summarize content instead of full ingest", + ) + local_parser.add_argument( + "--max-length", + type=int, + default=10000, + help="Max length for summary (default: 10000)", + ) + # ------------------------------------------------------------------------- # FRAMEWORKS COMMAND # ------------------------------------------------------------------------- @@ -491,7 +538,24 @@ async def run_generate(args: argparse.Namespace) -> int: # Determine the seed concept seed_concept = args.concept - if args.repo: + if args.local: + # Ingest from local files/directory + from opus_orchestrator import LocalIngestor + + print(f"šŸ“‚ Ingesting from local: {args.local}") + + ingestor = LocalIngestor() + result = ingestor.ingest(args.local) + + full_text = result["combined_text"] + print(f" āœ… Loaded {len(full_text):,} characters from {result['file_count']} files") + print(f" šŸ“„ Files: {', '.join(list(result['files'].keys())[:5])}") + if result['file_count'] > 5: + print(f" ... and {result['file_count'] - 5} more") + print() + + seed_concept = full_text + elif args.repo: # Ingest from GitHub - use FULL content print(f"šŸ“„ Ingesting from GitHub: {args.repo}") @@ -800,6 +864,60 @@ def run_s3_ingest(args: argparse.Namespace) -> int: return 0 +def run_local_ingest(args: argparse.Namespace) -> int: + """Ingest content from local files/directories.""" + from opus_orchestrator import LocalIngestor + + print(f"\nšŸ“‚ Ingesting from local: {args.path}\n") + + # Parse extensions + extensions = None + if args.extensions: + extensions = [ext.strip() for ext in args.extensions.split(",")] + + # Create ingestor + ingestor = LocalIngestor() + + # Ingest + result = ingestor.ingest( + path=args.path, + extensions=extensions, + recursive=not args.no_recursive, + ) + + if args.summarize: + content = ingestor.summarize(result["combined_text"], args.max_length) + else: + content = result["combined_text"] + + print(f"āœ… Loaded {result['total_chars']:,} characters") + print(f" Files: {result['file_count']}") + print(f" Root: {result['path']}") + + files_list = list(result["files"].keys()) + print(f" File list: {', '.join(files_list[:10])}") + if len(files_list) > 10: + print(f" ... and {len(files_list) - 10} more") + + if args.summarize: + print(f" šŸ“ Summarized to {args.max_length} characters") + + print() + + if args.preview: + print("šŸ“„ PREVIEW (first 2000 chars):") + print("-" * 40) + print(content[:2000]) + print("-" * 40) + + if args.output: + with open(args.output, "w") as f: + f.write(content) + print(f"\nšŸ’¾ Saved to: {args.output}") + + return 0 + + def run_frameworks(args: argparse.Namespace) -> int: """List available frameworks.""" from opus_orchestrator.frameworks import FRAMEWORKS @@ -919,6 +1037,7 @@ async def main_async(args: argparse.Namespace) -> int: "serve": run_serve, "ingest": run_ingest, "ingest-s3": run_s3_ingest, + "ingest-local": run_local_ingest, "frameworks": run_frameworks, "config": run_config, "docs": run_docs, diff --git a/opus_orchestrator/utils/__init__.py b/opus_orchestrator/utils/__init__.py index 1607ef5..ee9c5ac 100644 --- a/opus_orchestrator/utils/__init__.py +++ b/opus_orchestrator/utils/__init__.py @@ -3,6 +3,7 @@ from opus_orchestrator.utils.docs import generate_docs from opus_orchestrator.utils.github_ingest import GitHubIngestor, create_github_ingestor from opus_orchestrator.utils.s3_ingest import S3Ingestor, create_s3_ingestor +from opus_orchestrator.utils.local_ingest import LocalIngestor, create_local_ingestor from opus_orchestrator.utils.llm import get_llm_client __all__ = [ @@ -11,5 +12,7 @@ __all__ = [ "create_github_ingestor", "S3Ingestor", "create_s3_ingestor", + "LocalIngestor", + "create_local_ingestor", "get_llm_client", ] diff --git a/opus_orchestrator/utils/local_ingest.py b/opus_orchestrator/utils/local_ingest.py new file mode 100644 index 0000000..a951574 --- /dev/null +++ b/opus_orchestrator/utils/local_ingest.py @@ -0,0 +1,245 @@ +"""Local file ingestion for Opus Orchestrator. + +Fetches content from local files and directories. +""" + +import os +import fnmatch +from pathlib import Path +from typing import Any, Optional + + +class LocalIngestor: + """Fetch and parse content from local files and directories. + + Supports: + - Individual files + - Directories (recursive) + - File pattern matching + - Multiple formats (txt, md, markdown, etc.) + """ + + def __init__( + self, + root_path: Optional[str] = None, + encoding: str = "utf-8", + ): + """Initialize local ingestor. + + Args: + root_path: Root directory for relative paths + encoding: Text file encoding + """ + self.root_path = Path(root_path) if root_path else Path.cwd() + self.encoding = encoding + + # Default file extensions to include + self.default_extensions = [".txt", ".md", ".markdown", ".notes", ".draft", ".rst", ".org"] + + # Files/dirs to exclude + self.exclude_patterns = [ + ".git", + ".svn", + "__pycache__", + "node_modules", + ".venv", + "venv", + ".env", + "*.pyc", + ".DS_Store", + "*.swp", + "*.tmp", + ".cache", + ] + + def is_excluded(self, path: Path) -> bool: + """Check if a path should be excluded. + + Args: + path: Path to check + + Returns: + True if should be excluded + """ + name = path.name + + for pattern in self.exclude_patterns: + if fnmatch.fnmatch(name, pattern): + return True + + return False + + def get_files( + self, + path: str | Path, + extensions: Optional[list[str]] = None, + recursive: bool = True, + max_files: int = 1000, + ) -> dict[Path, str]: + """Get all text files from a path. + + Args: + path: File or directory path + extensions: File extensions to include (default: common text formats) + recursive: Recursively scan directories + max_files: Maximum number of files to read + + Returns: + Dict mapping file paths to content + """ + path = Path(path) + + if not path.exists(): + raise FileNotFoundError(f"Path does not exist: {path}") + + extensions = extensions or self.default_extensions + extensions = [ext.lower() for ext in extensions] + + results = {} + + if path.is_file(): + # Single file + if self._has_valid_extension(path, extensions): + results[path] = self._read_file(path) + else: + # Directory + files = self._scan_directory( + path, + extensions, + recursive, + max_files - len(results) + ) + + for f in files: + try: + results[f] = self._read_file(f) + except Exception as e: + print(f"Warning: Could not read {f}: {e}") + + return results + + def _has_valid_extension(self, path: Path, extensions: list[str]) -> bool: + """Check if file has a valid extension.""" + ext = path.suffix.lower() + return ext in extensions or path.suffix == "" # Allow no extension + + def _scan_directory( + self, + directory: Path, + extensions: list[str], + recursive: bool, + max_files: int, + ) -> list[Path]: + """Scan directory for matching files.""" + files = [] + + try: + if recursive: + for root, dirs, filenames in os.walk(directory): + # Filter out excluded directories + dirs[:] = [d for d in dirs if not self.is_excluded(Path(d))] + + for filename in filenames: + filepath = Path(root) / filename + + if self.is_excluded(filepath): + continue + + if self._has_valid_extension(filepath, extensions): + files.append(filepath) + + if len(files) >= max_files: + return files + else: + # Non-recursive + for item in directory.iterdir(): + if item.is_file() and not self.is_excluded(item): + if self._has_valid_extension(item, extensions): + files.append(item) + + if len(files) >= max_files: + break + + except PermissionError: + print(f"Warning: Permission denied for {directory}") + + return files + + def _read_file(self, path: Path) -> str: + """Read file content.""" + try: + with open(path, "r", encoding=self.encoding) as f: + return f.read() + except UnicodeDecodeError: + # Try with different encoding + try: + with open(path, "r", encoding="latin-1") as f: + return f.read() + except Exception: + return f"[Binary file: {path}]" + except Exception as e: + return f"[Error reading {path}: {e}]" + + def ingest( + self, + path: str | Path, + extensions: Optional[list[str]] = None, + recursive: bool = True, + ) -> dict[str, Any]: + """Ingest content from local path. + + Args: + path: File or directory path + extensions: File extensions to include + recursive: Recursively scan directories + + Returns: + Dict with combined text and metadata + """ + files = self.get_files(path, extensions, recursive) + + # Combine content with file separators + combined_lines = [] + for filepath, content in sorted(files.items()): + rel_path = filepath.relative_to(self.root_path) if filepath.is_relative_to(self.root_path) else filepath + combined_lines.append(f"=== {rel_path} ===") + combined_lines.append(content) + combined_lines.append("") + + combined_text = "\n".join(combined_lines) + + return { + "path": str(path), + "files": {str(k): v for k, v in files.items()}, + "file_count": len(files), + "total_chars": len(combined_text), + "combined_text": combined_text, + } + + def summarize(self, content: str, max_length: int = 5000) -> str: + """Create a summary of content for use as seed. + + Args: + content: Full content + max_length: Maximum length of summary + + Returns: + Summarized content + """ + if len(content) <= max_length: + return content + + # Take first portion + indicator + return content[:max_length] + f"\n\n[...] ({len(content) - max_length} more characters)" + + +def create_local_ingestor(root_path: Optional[str] = None) -> LocalIngestor: + """Factory function to create a local ingestor. + + Args: + root_path: Root directory + + Returns: + Configured LocalIngestor + """ + return LocalIngestor(root_path=root_path)