Files
opus-orchestrator-ai/opus_orchestrator/utils/local_ingest.py
T
mrhavens 81bfe8994a Witness ALL data: Include source code in ingestion
- LocalIngestor: Include ALL files by default (source code, configs, etc.)
- GitHubIngestor: Include ALL files by default
- AI witnesses everything and transforms it into documentation
- Filter only build artifacts (.pyc, .so, dist, build)

Philosophy: Don't filter what the AI can see - let it decide
what's relevant. The AI can document code directly!
2026-03-13 05:59:03 +00:00

269 lines
8.4 KiB
Python

"""Local file ingestion for Opus Orchestrator.
Fetches content from local files and directories - INCLUDING SOURCE CODE.
The AI witnesses ALL data and transforms it into documentation.
"""
import os
import fnmatch
from pathlib import Path
from typing import Any, Optional
class LocalIngestor:
"""Fetch and parse content from local files and directories.
Supports:
- ALL file types (including source code!)
- Directories (recursive)
- File pattern matching
Philosophy: The AI witnesses everything and transforms it.
Don't filter what the AI can see - let it decide what's relevant.
"""
def __init__(
self,
root_path: Optional[str] = None,
encoding: str = "utf-8",
):
"""Initialize local ingestor.
Args:
root_path: Root directory for relative paths
encoding: Text file encoding
"""
self.root_path = Path(root_path) if root_path else Path.cwd()
self.encoding = encoding
# INCLUDE ALL FILES - let the AI witness everything!
# Only exclude build artifacts and version control
self.exclude_patterns = [
".git",
".svn",
"__pycache__",
"node_modules",
".venv",
"venv",
".env",
"*.pyc",
"*.so",
"*.o",
"*.a",
"*.dylib",
".DS_Store",
"*.swp",
"*.tmp",
".cache",
"dist",
"build",
"*.egg-info",
".pytest_cache",
".mypy_cache",
".tox",
]
def is_excluded(self, path: Path) -> bool:
"""Check if a path should be excluded.
Args:
path: Path to check
Returns:
True if should be excluded
"""
name = path.name
for pattern in self.exclude_patterns:
if fnmatch.fnmatch(name, pattern):
return True
return False
def get_files(
self,
path: str | Path,
extensions: Optional[list[str]] = None,
recursive: bool = True,
max_files: int = 1000,
include_all: bool = True,
) -> dict[Path, str]:
"""Get all files from a path - INCLUDING SOURCE CODE.
The AI witnesses everything and transforms it into documentation.
Args:
path: File or directory path
extensions: File extensions to include (None = ALL files!)
recursive: Recursively scan directories
max_files: Maximum number of files to read
include_all: If True, include ALL files (default True!)
Returns:
Dict mapping file paths to content
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"Path does not exist: {path}")
# Default: include ALL files (source code, configs, everything!)
# The AI will witness and transform everything
if include_all:
extensions = None # Include everything
else:
extensions = extensions or self.default_extensions
extensions = [ext.lower() for ext in extensions]
results = {}
if path.is_file():
# Single file
if include_all or self._has_valid_extension(path, extensions or []):
results[path] = self._read_file(path)
else:
# Directory
files = self._scan_directory(
path,
extensions,
recursive,
max_files - len(results),
include_all=include_all,
)
for f in files:
try:
results[f] = self._read_file(f)
except Exception as e:
print(f"Warning: Could not read {f}: {e}")
return results
def _has_valid_extension(self, path: Path, extensions: list[str]) -> bool:
"""Check if file has a valid extension."""
ext = path.suffix.lower()
return ext in extensions or path.suffix == "" # Allow no extension
def _scan_directory(
self,
directory: Path,
extensions: Optional[list[str]],
recursive: bool,
max_files: int,
include_all: bool = True,
) -> list[Path]:
"""Scan directory for matching files."""
files = []
try:
if recursive:
for root, dirs, filenames in os.walk(directory):
# Filter out excluded directories
dirs[:] = [d for d in dirs if not self.is_excluded(Path(d))]
for filename in filenames:
filepath = Path(root) / filename
if self.is_excluded(filepath):
continue
# Include ALL files (including source code!)
if include_all or self._has_valid_extension(filepath, extensions or []):
files.append(filepath)
if len(files) >= max_files:
return files
else:
# Non-recursive
for item in directory.iterdir():
if item.is_file() and not self.is_excluded(item):
if include_all or self._has_valid_extension(item, extensions or []):
files.append(item)
if len(files) >= max_files:
break
except PermissionError:
print(f"Warning: Permission denied for {directory}")
return files
def _read_file(self, path: Path) -> str:
"""Read file content."""
try:
with open(path, "r", encoding=self.encoding) as f:
return f.read()
except UnicodeDecodeError:
# Try with different encoding
try:
with open(path, "r", encoding="latin-1") as f:
return f.read()
except Exception:
return f"[Binary file: {path}]"
except Exception as e:
return f"[Error reading {path}: {e}]"
def ingest(
self,
path: str | Path,
extensions: Optional[list[str]] = None,
recursive: bool = True,
) -> dict[str, Any]:
"""Ingest content from local path.
Args:
path: File or directory path
extensions: File extensions to include
recursive: Recursively scan directories
Returns:
Dict with combined text and metadata
"""
files = self.get_files(path, extensions, recursive)
# Combine content with file separators
combined_lines = []
for filepath, content in sorted(files.items()):
rel_path = filepath.relative_to(self.root_path) if filepath.is_relative_to(self.root_path) else filepath
combined_lines.append(f"=== {rel_path} ===")
combined_lines.append(content)
combined_lines.append("")
combined_text = "\n".join(combined_lines)
return {
"path": str(path),
"files": {str(k): v for k, v in files.items()},
"file_count": len(files),
"total_chars": len(combined_text),
"combined_text": combined_text,
}
def summarize(self, content: str, max_length: int = 5000) -> str:
"""Create a summary of content for use as seed.
Args:
content: Full content
max_length: Maximum length of summary
Returns:
Summarized content
"""
if len(content) <= max_length:
return content
# Take first portion + indicator
return content[:max_length] + f"\n\n[...] ({len(content) - max_length} more characters)"
def create_local_ingestor(root_path: Optional[str] = None) -> LocalIngestor:
    """Build a ``LocalIngestor`` for the given root directory.

    Args:
        root_path: Root directory handed to ``LocalIngestor``; ``None``
            leaves the choice of root to the ingestor itself.

    Returns:
        A new LocalIngestor instance
    """
    ingestor = LocalIngestor(root_path=root_path)
    return ingestor