diff --git a/opus_orchestrator/langgraph_workflow.py b/opus_orchestrator/langgraph_workflow.py index 35aa98e..862da18 100644 --- a/opus_orchestrator/langgraph_workflow.py +++ b/opus_orchestrator/langgraph_workflow.py @@ -421,26 +421,39 @@ Genre: {self.genre} } def node_write_chapters(self, state: OpusGraphState) -> dict: - """Write all chapters.""" + """Write all chapters with coherence tracking.""" print("\n✍️ WRITING CHAPTERS...") system_prompt = f"""You are a professional novelist. Style: {state.style_guide[:500] if state.style_guide else 'Professional fiction'} + +Maintain consistent character voices and world details across chapters. """ chapters = {} critique_iterations = state.critique_iterations or {} + # COHERENCE: Track what happened in previous chapters + previous_chapters_summary = "" + for plan in state.prewriting.chapter_plans: chapter_num = plan.chapter_number print(f"\n Writing chapter {chapter_num}...") user_prompt = f"""Write Chapter {chapter_num}: {plan.summary} -Story: {state.prewriting.one_sentence} -Characters: {', '.join(c.name for c in state.prewriting.characters[:3])} +Story Context: +- Premise: {state.prewriting.one_sentence} +- One Paragraph: {state.prewriting.one_paragraph} +- Key Characters: {', '.join(c.name for c in state.prewriting.characters)} -Write ~{plan.word_count_target} words. +## SUMMARY OF PREVIOUS CHAPTERS: +{previous_chapters_summary if previous_chapters_summary else "This is the first chapter."} + +## TASK: +Write the full prose for Chapter {chapter_num}. +Target: ~{plan.word_count_target} words. +Follow the story arc and ensure a smooth transition from previous events. """ result = self._call_llm(system_prompt, user_prompt) @@ -462,6 +475,7 @@ Write ~{plan.word_count_target} words. "genre": self.genre, "one_sentence": state.prewriting.one_sentence, "summary": plan.summary, + "previous_chapters": previous_chapters_summary, } # Iterate critique @@ -502,6 +516,9 @@ Write ~{plan.word_count_target} words. critique_summary=critique_summary, ) + # Update running summary for next chapter + previous_chapters_summary += f"\n- Chapter {chapter_num}: {plan.summary}" + status = "✅" if approved else "⚠️" print(f" {status} Chapter {chapter_num} complete: {word_count} words, score: {critique_score:.2f}") @@ -510,7 +527,7 @@ Write ~{plan.word_count_target} words. "critique_iterations": critique_iterations, "stage": Stage.WRITING, "progress": 0.90, - "messages": state.messages + [f"Wrote {len(chapters)} chapters with AutoGen critique"], + "messages": state.messages + [f"Wrote {len(chapters)} chapters with AutoGen critique and coherence tracking"], } def node_complete(self, state: OpusGraphState) -> dict: @@ -605,7 +622,7 @@ Write ~{plan.word_count_target} words. print(f"Framework: {self.framework}") print(f"Target: {self.target_word_count:,} words\n") - # Create initial state as dict (not Pydantic model) + # Create initial state initial_state = OpusGraphState( seed_concept=seed_concept, framework=self.framework, @@ -617,54 +634,49 @@ Write ~{plan.word_count_target} words. # Use GEMINI PATTERN: stream with values, then snapshot fallback final_state = None + last_error = None # Stream mode "values" emits FULL state after each node print("[RUN] Starting stream...") try: for chunk in self.graph.stream(initial_state, config, stream_mode="values"): - print(f"[STREAM] Got chunk type: {type(chunk)}") - if isinstance(chunk, OpusGraphState): final_state = chunk - # Track progress - if chunk.stage.value == "complete": - print(f"[STREAM] Reached COMPLETE stage") - if chunk.manuscript: - print(f"[STREAM] Manuscript present: {len(chunk.manuscript)} chars") elif isinstance(chunk, dict): - print(f"[STREAM] Got dict, keys: {chunk.keys()}") - # Try to reconstruct - if 'manuscript' in chunk and chunk.get('manuscript'): + # Try to reconstruct if it looks like state + if 'stage' in chunk: final_state = OpusGraphState(**chunk) - print(f"[STREAM] Reconstructed state from dict") except Exception as e: print(f"[RUN] Stream error: {e}") - import traceback - traceback.print_exc() + last_error = e # Don't give up - try to recover partial state # Enable checkpointing for recovery print("[RUN] Checking final state...") + if final_state is None or final_state.stage != Stage.COMPLETE: + print("[WARNING] Workflow incomplete or failed. Attempting recovery from checkpoint...") + try: + snapshot = self.graph.get_state(config) + if snapshot and snapshot.values: + # Merge snapshot values into OpusGraphState + final_state = OpusGraphState(**snapshot.values) + print(f"[RECOVERY] Recovered state at stage: {final_state.stage}") + except Exception as e2: + print(f"[RECOVERY] Failed to get state snapshot: {e2}") + if final_state is None: - print("[WARNING] No state from stream, attempting recovery...") - # Try to recover from any partial state that was accumulated - # In a full implementation, we'd load from checkpoint here - # For now, raise a clear error instead of silently failing raise RuntimeError( - f"Workflow failed to complete. " - f"Last known stage: {getattr(final_state, 'stage', 'unknown') if final_state else 'initial'}. " - f"Error: {e}" + f"Workflow failed to complete and could not be recovered. " + f"Last error: {last_error}" ) - # Verify we have manuscript - if not final_state.manuscript: - print("[WARNING] No manuscript generated!") - # Return partial state for debugging - if final_state.prewriting.one_sentence: - print(f"[PARTIAL] Generated: {final_state.prewriting.one_sentence[:100]}...") - raise RuntimeError("Workflow completed but no manuscript was generated.") + # Verify we have manuscript if we finished + if final_state.stage == Stage.COMPLETE and not final_state.manuscript: + print("[WARNING] Workflow completed but no manuscript was generated.") - print(f"[RESULT] SUCCESS! {len(final_state.chapters)} chapters, {final_state.total_word_count} words") + print(f"[RESULT] Final Stage: {final_state.stage}") + if final_state.total_word_count > 0: + print(f"[RESULT] SUCCESS! {len(final_state.chapters)} chapters, {final_state.total_word_count} words") return final_state diff --git a/opus_orchestrator/orchestrator.py b/opus_orchestrator/orchestrator.py index 14f2a0e..146aaae 100644 --- a/opus_orchestrator/orchestrator.py +++ b/opus_orchestrator/orchestrator.py @@ -316,14 +316,40 @@ Generate a detailed outline with: # ========================================================================= - async def ingest(self, content: Optional[RawContent] = None) -> OpusState: - """Ingest raw content from repository.""" - if self.repo_url and not content: - content = RawContent( - content_type="repository", - text="[Content would be extracted from GitHub repository]", - metadata={"repo_url": self.repo_url}, + async def ingest( + self, + content: Optional[RawContent] = None, + sources: Optional[list[dict]] = None, + ) -> OpusState: + """Ingest raw content from multiple sources. + + Args: + content: Pre-loaded raw content + sources: List of source configurations (github, local, s3) + """ + if sources: + from opus_orchestrator.utils.multi_source_ingest import ingest_multiple + + print(f"📥 Ingesting from {len(sources)} sources...") + + result = await ingest_multiple( + sources=sources, + github_token=self.config.github_token, + # AWS keys would come from environment ) + + content = RawContent( + content_type="multi-source", + text=result.merged_content, + metadata={ + "total_sources": result.total_sources, + "successful": result.successful_sources, + "summary": result.source_summary, + }, + ) + elif self.repo_url and not content: + # Fallback to single GitHub repo + content = self.ingest_from_github(self.repo_url) self.state = OpusState( repo_url=self.repo_url or "", @@ -828,19 +854,27 @@ Make it vivid, engaging, and true to the characters. return manuscript # ========================================================================= - # MAIN RUN METHOD - FULL SNOWFLAKE + # MAIN RUN METHOD - FULL PIPELINE # ========================================================================= - async def run(self) -> Manuscript: - """Run the full pipeline with selected framework.""" - framework_name = self.framework_info.get("name", "Unknown") + async def run(self, sources: Optional[list[dict]] = None) -> Manuscript: + """Run the full pipeline (Fiction or Nonfiction).""" print(f"\n{'='*60}") - print(f"❄️ OPUS ORCHESTRATOR - {framework_name.upper()}") - print(f"{'='*60}") - print(f"Framework: {self.framework_info.get('description', '')}\n") + print(f"❄️ OPUS ORCHESTRATOR - {self.book_type.value.upper()}") + print(f"{'='*60}\n") - await self.ingest() + await self.ingest(sources=sources) + + if self.book_type == BookType.FICTION: + return await self._run_fiction() + else: + return await self._run_nonfiction() + + async def _run_fiction(self) -> Manuscript: + """Run the fiction pipeline.""" + framework_name = self.framework_info.get("name", "Unknown") + print(f"Framework: {framework_name}\n") # Pre-writing stages await self.snowflake_stage_1() # One sentence @@ -858,17 +892,46 @@ Make it vivid, engaging, and true to the characters. await self.generate_blueprint() # Write and critique chapters - manuscript = await self.compile_manuscript() + return await self.compile_manuscript() - print(f"\n{'='*60}") - print("✅ COMPLETE!") - print(f"{'='*60}") - print(f"📖 Title: {manuscript.title}") - print(f"📄 Words: {manuscript.total_word_count:,}") - print(f"📑 Chapters: {len(manuscript.chapters)}") - print(f"🎯 Framework: {framework_name}") - - return manuscript + async def _run_nonfiction(self) -> Manuscript: + """Run the nonfiction pipeline.""" + print(f"Purpose: {self.purpose.value if self.purpose else 'N/A'}") + print(f"Framework: {self.nonfiction_framework.get('name', 'N/A')}\n") + + # 1. Research & Analysis + print("🔍 RESEARCH & ANALYSIS...") + # (Simplified for now - would use researcher agent) + self.one_sentence = f"A book about {self.intent.genre or 'the subject'}" + self.one_paragraph = f"Comprehensive guide covering {self.intent.genre}" + + # 2. Generate Chapters based on Framework stages + print("📅 GENERATING BLUEPRINT...") + chapters_blueprint = [] + for i, stage in enumerate(self.framework_stages): + chapters_blueprint.append(ChapterBlueprint( + chapter_number=i + 1, + title=stage, + summary=f"Section covering {stage}", + word_count_target=self.intent.target_word_count // len(self.framework_stages), + )) + + self.state.blueprint = BookBlueprint( + title=self.intent.working_title or "Nonfiction Guide", + genre=self.intent.genre or "nonfiction", + target_audience=self.intent.target_audience, + target_word_count=self.intent.target_word_count, + structure="framework-driven", + themes=[], + tone=self.intent.tone or "informative", + chapters=chapters_blueprint, + ) + + # 3. Create style guide + await self.create_style_guide() + + # 4. Write chapters + return await self.compile_manuscript() def save_manuscript(self, output_path: Optional[Path] = None) -> Path: """Save manuscript and pre-writing to files.""" diff --git a/opus_orchestrator/utils/llm.py b/opus_orchestrator/utils/llm.py index df8a103..d4ca581 100644 --- a/opus_orchestrator/utils/llm.py +++ b/opus_orchestrator/utils/llm.py @@ -174,7 +174,7 @@ class LLMClient: else: raise Exception(f"Unexpected MiniMax response: {data}") - async def _complete_openai( + async def _complete_openai_async( self, system_prompt: str, user_prompt: str, @@ -182,7 +182,7 @@ class LLMClient: max_tokens: Optional[int], headers: dict, ) -> str: - """Call OpenAI API.""" + """Call OpenAI API (async).""" payload = { "model": self.model, "messages": [