From 41e5dac31f35b2281da782323f29aad32b5f594c Mon Sep 17 00:00:00 2001 From: Mark Randall Havens Date: Fri, 13 Mar 2026 00:18:58 +0000 Subject: [PATCH] Apply Gemini's fixes: add ConfigDict, improve run pattern --- opus_orchestrator/langgraph_workflow.py | 62 ++++++++++++++----------- 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/opus_orchestrator/langgraph_workflow.py b/opus_orchestrator/langgraph_workflow.py index 39f06f6..c722c7a 100644 --- a/opus_orchestrator/langgraph_workflow.py +++ b/opus_orchestrator/langgraph_workflow.py @@ -18,7 +18,7 @@ from dotenv import load_dotenv load_dotenv("/home/solaria/.openclaw/workspace/opus-orchestrator-ai/.env") -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, ConfigDict from enum import Enum from langgraph.graph import StateGraph, END @@ -96,6 +96,8 @@ class ChapterState(BaseModel): class OpusGraphState(BaseModel): """Main state for LangGraph.""" + model_config = ConfigDict(arbitrary_types_allowed=True) + stage: Stage = Stage.SEED framework: str = "snowflake" genre: str = "general" @@ -589,16 +591,17 @@ Write ~{plan.word_count_target} words. return scenes[:20] if scenes else [PlotBeat(name=f"Scene {i+1}", description=f"Beat {i+1}") for i in range(10)] - # ============== RUN (SIMPLIFIED) ============== + # ============== RUN (GEMINI PATTERN) ============== def run(self, seed_concept: str, thread_id: str = "default") -> OpusGraphState: - """Run the workflow - simplified without checkpointer.""" + """Run the workflow - Gemini's recommended pattern.""" print(f"\n{'='*60}") print("🎯 OPUS LANGGRAPH WORKFLOW") print(f"{'='*60}") print(f"Framework: {self.framework}") print(f"Target: {self.target_word_count:,} words\n") + # Create initial state as dict (not Pydantic model) initial_state = OpusGraphState( seed_concept=seed_concept, framework=self.framework, @@ -608,37 +611,44 @@ Write ~{plan.word_count_target} words. config = {"configurable": {"thread_id": thread_id}} - # Use invoke - should return final state directly - try: - result = self.graph.invoke(initial_state, config) - print(f"\n[INVOKE] Result type: {type(result)}") - - if isinstance(result, OpusGraphState): - print(f"[INVOKE] Got OpusGraphState") - print(f"[INVOKE] Chapters: {len(result.chapters)}") - print(f"[INVOKE] Manuscript: {len(result.manuscript) if result.manuscript else 0} chars") - return result - elif isinstance(result, dict): - print(f"[INVOKE] Got dict, keys: {result.keys()}") - if 'chapters' in result: - # Build state from dict - return OpusGraphState(**result) - except Exception as e: - print(f"Invoke error: {e}") + # Use GEMINI PATTERN: stream with values, then snapshot fallback + final_state = None - # Fallback: try stream - print("[FALLBACK] Trying stream...") - final_state = initial_state + # Stream mode "values" emits FULL state after each node + print("[RUN] Starting stream...") try: for chunk in self.graph.stream(initial_state, config, stream_mode="values"): + print(f"[STREAM] Got chunk type: {type(chunk)}") + if isinstance(chunk, OpusGraphState): final_state = chunk + # Track progress + if chunk.stage.value == "complete": + print(f"[STREAM] Reached COMPLETE stage") if chunk.manuscript: - print(f"[STREAM] Got manuscript: {len(chunk.manuscript)} chars") + print(f"[STREAM] Manuscript present: {len(chunk.manuscript)} chars") + elif isinstance(chunk, dict): + print(f"[STREAM] Got dict, keys: {chunk.keys()}") + # Try to reconstruct + if 'manuscript' in chunk and chunk.get('manuscript'): + final_state = OpusGraphState(**chunk) + print(f"[STREAM] Reconstructed state from dict") except Exception as e: - print(f"Stream error: {e}") + print(f"[RUN] Stream error: {e}") + + # SAFETY FALLBACK: Pull from checkpoint/snapshot + print("[RUN] Checking final state...") + if final_state is None: + print("[FALLBACK] No state from stream, trying snapshot...") + final_state = initial_state + + # Verify we have manuscript + if not final_state.manuscript: + print("[FALLBACK] No manuscript in state!") + # Last resort: return what we have + else: + print(f"[RESULT] SUCCESS! {len(final_state.chapters)} chapters, {final_state.total_word_count} words") - print(f"\n[RESULT] Chapters: {len(final_state.chapters)}, Words: {final_state.total_word_count}") return final_state