81bfe8994a
- LocalIngestor: Include ALL files by default (source code, configs, etc.) - GitHubIngestor: Include ALL files by default - AI witnesses everything and transforms it into documentation - Filter only build artifacts (.pyc, .so, dist, build) Philosophy: Don't filter what the AI can see - let it decide what's relevant. The AI can document code directly!
269 lines
8.4 KiB
Python
269 lines
8.4 KiB
Python
"""Local file ingestion for Opus Orchestrator.
|
|
|
|
Fetches content from local files and directories, including source code.

The AI sees all of the ingested data and transforms it into documentation.
|
|
"""
|
|
|
|
import os
|
|
import fnmatch
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
|
|
class LocalIngestor:
|
|
"""Fetch and parse content from local files and directories.
|
|
|
|
Supports:
|
|
- ALL file types (including source code!)
|
|
- Directories (recursive)
|
|
- File pattern matching
|
|
|
|
Philosophy: The AI witnesses everything and transforms it.
|
|
Don't filter what the AI can see - let it decide what's relevant.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
root_path: Optional[str] = None,
|
|
encoding: str = "utf-8",
|
|
):
|
|
"""Initialize local ingestor.
|
|
|
|
Args:
|
|
root_path: Root directory for relative paths
|
|
encoding: Text file encoding
|
|
"""
|
|
self.root_path = Path(root_path) if root_path else Path.cwd()
|
|
self.encoding = encoding
|
|
|
|
# INCLUDE ALL FILES - let the AI witness everything!
|
|
# Only exclude build artifacts and version control
|
|
self.exclude_patterns = [
|
|
".git",
|
|
".svn",
|
|
"__pycache__",
|
|
"node_modules",
|
|
".venv",
|
|
"venv",
|
|
".env",
|
|
"*.pyc",
|
|
"*.so",
|
|
"*.o",
|
|
"*.a",
|
|
"*.dylib",
|
|
".DS_Store",
|
|
"*.swp",
|
|
"*.tmp",
|
|
".cache",
|
|
"dist",
|
|
"build",
|
|
"*.egg-info",
|
|
".pytest_cache",
|
|
".mypy_cache",
|
|
".tox",
|
|
]
|
|
|
|
def is_excluded(self, path: Path) -> bool:
|
|
"""Check if a path should be excluded.
|
|
|
|
Args:
|
|
path: Path to check
|
|
|
|
Returns:
|
|
True if should be excluded
|
|
"""
|
|
name = path.name
|
|
|
|
for pattern in self.exclude_patterns:
|
|
if fnmatch.fnmatch(name, pattern):
|
|
return True
|
|
|
|
return False
|
|
|
|
def get_files(
|
|
self,
|
|
path: str | Path,
|
|
extensions: Optional[list[str]] = None,
|
|
recursive: bool = True,
|
|
max_files: int = 1000,
|
|
include_all: bool = True,
|
|
) -> dict[Path, str]:
|
|
"""Get all files from a path - INCLUDING SOURCE CODE.
|
|
|
|
The AI witnesses everything and transforms it into documentation.
|
|
|
|
Args:
|
|
path: File or directory path
|
|
extensions: File extensions to include (None = ALL files!)
|
|
recursive: Recursively scan directories
|
|
max_files: Maximum number of files to read
|
|
include_all: If True, include ALL files (default True!)
|
|
|
|
Returns:
|
|
Dict mapping file paths to content
|
|
"""
|
|
path = Path(path)
|
|
|
|
if not path.exists():
|
|
raise FileNotFoundError(f"Path does not exist: {path}")
|
|
|
|
# Default: include ALL files (source code, configs, everything!)
|
|
# The AI will witness and transform everything
|
|
if include_all:
|
|
extensions = None # Include everything
|
|
else:
|
|
extensions = extensions or self.default_extensions
|
|
extensions = [ext.lower() for ext in extensions]
|
|
|
|
results = {}
|
|
|
|
if path.is_file():
|
|
# Single file
|
|
if include_all or self._has_valid_extension(path, extensions or []):
|
|
results[path] = self._read_file(path)
|
|
else:
|
|
# Directory
|
|
files = self._scan_directory(
|
|
path,
|
|
extensions,
|
|
recursive,
|
|
max_files - len(results),
|
|
include_all=include_all,
|
|
)
|
|
|
|
for f in files:
|
|
try:
|
|
results[f] = self._read_file(f)
|
|
except Exception as e:
|
|
print(f"Warning: Could not read {f}: {e}")
|
|
|
|
return results
|
|
|
|
def _has_valid_extension(self, path: Path, extensions: list[str]) -> bool:
|
|
"""Check if file has a valid extension."""
|
|
ext = path.suffix.lower()
|
|
return ext in extensions or path.suffix == "" # Allow no extension
|
|
|
|
def _scan_directory(
|
|
self,
|
|
directory: Path,
|
|
extensions: Optional[list[str]],
|
|
recursive: bool,
|
|
max_files: int,
|
|
include_all: bool = True,
|
|
) -> list[Path]:
|
|
"""Scan directory for matching files."""
|
|
files = []
|
|
|
|
try:
|
|
if recursive:
|
|
for root, dirs, filenames in os.walk(directory):
|
|
# Filter out excluded directories
|
|
dirs[:] = [d for d in dirs if not self.is_excluded(Path(d))]
|
|
|
|
for filename in filenames:
|
|
filepath = Path(root) / filename
|
|
|
|
if self.is_excluded(filepath):
|
|
continue
|
|
|
|
# Include ALL files (including source code!)
|
|
if include_all or self._has_valid_extension(filepath, extensions or []):
|
|
files.append(filepath)
|
|
|
|
if len(files) >= max_files:
|
|
return files
|
|
else:
|
|
# Non-recursive
|
|
for item in directory.iterdir():
|
|
if item.is_file() and not self.is_excluded(item):
|
|
if include_all or self._has_valid_extension(item, extensions or []):
|
|
files.append(item)
|
|
|
|
if len(files) >= max_files:
|
|
break
|
|
|
|
except PermissionError:
|
|
print(f"Warning: Permission denied for {directory}")
|
|
|
|
return files
|
|
|
|
def _read_file(self, path: Path) -> str:
|
|
"""Read file content."""
|
|
try:
|
|
with open(path, "r", encoding=self.encoding) as f:
|
|
return f.read()
|
|
except UnicodeDecodeError:
|
|
# Try with different encoding
|
|
try:
|
|
with open(path, "r", encoding="latin-1") as f:
|
|
return f.read()
|
|
except Exception:
|
|
return f"[Binary file: {path}]"
|
|
except Exception as e:
|
|
return f"[Error reading {path}: {e}]"
|
|
|
|
def ingest(
|
|
self,
|
|
path: str | Path,
|
|
extensions: Optional[list[str]] = None,
|
|
recursive: bool = True,
|
|
) -> dict[str, Any]:
|
|
"""Ingest content from local path.
|
|
|
|
Args:
|
|
path: File or directory path
|
|
extensions: File extensions to include
|
|
recursive: Recursively scan directories
|
|
|
|
Returns:
|
|
Dict with combined text and metadata
|
|
"""
|
|
files = self.get_files(path, extensions, recursive)
|
|
|
|
# Combine content with file separators
|
|
combined_lines = []
|
|
for filepath, content in sorted(files.items()):
|
|
rel_path = filepath.relative_to(self.root_path) if filepath.is_relative_to(self.root_path) else filepath
|
|
combined_lines.append(f"=== {rel_path} ===")
|
|
combined_lines.append(content)
|
|
combined_lines.append("")
|
|
|
|
combined_text = "\n".join(combined_lines)
|
|
|
|
return {
|
|
"path": str(path),
|
|
"files": {str(k): v for k, v in files.items()},
|
|
"file_count": len(files),
|
|
"total_chars": len(combined_text),
|
|
"combined_text": combined_text,
|
|
}
|
|
|
|
def summarize(self, content: str, max_length: int = 5000) -> str:
|
|
"""Create a summary of content for use as seed.
|
|
|
|
Args:
|
|
content: Full content
|
|
max_length: Maximum length of summary
|
|
|
|
Returns:
|
|
Summarized content
|
|
"""
|
|
if len(content) <= max_length:
|
|
return content
|
|
|
|
# Take first portion + indicator
|
|
return content[:max_length] + f"\n\n[...] ({len(content) - max_length} more characters)"
|
|
|
|
|
|
def create_local_ingestor(root_path: Optional[str] = None) -> LocalIngestor:
    """Build a ``LocalIngestor`` rooted at the given directory.

    Args:
        root_path: Root directory handed to ``LocalIngestor``; ``None`` is
            passed through unchanged.

    Returns:
        A newly constructed ``LocalIngestor``
    """
    ingestor = LocalIngestor(root_path=root_path)
    return ingestor
|