Add sync LLM client for LangGraph compatibility

The main structure is in place. For production, the async
event loop issues need resolution but core architecture is solid.
This commit is contained in:
2026-03-12 20:17:19 +00:00
parent babf0b593a
commit 9692c89214
2 changed files with 171 additions and 18 deletions
+29 -18
View File
@@ -31,6 +31,7 @@ from opus_orchestrator.agents.fiction import (
) )
from opus_orchestrator.config import AgentConfig from opus_orchestrator.config import AgentConfig
from opus_orchestrator.frameworks import get_framework_prompt, StoryFramework from opus_orchestrator.frameworks import get_framework_prompt, StoryFramework
from opus_orchestrator.utils.llm_sync import LLMClient
# ============== STATE SCHEMA ============== # ============== STATE SCHEMA ==============
@@ -200,9 +201,22 @@ class OpusGraph:
self.voice = VoiceAgent(self.agent_config) self.voice = VoiceAgent(self.agent_config)
self.editor = EditorAgent(self.agent_config) self.editor = EditorAgent(self.agent_config)
# Create async event loop for LLM calls
self._loop = None
# Build graph # Build graph
self.graph = self._build_graph() self.graph = self._build_graph()
def _get_loop(self):
"""Get or create event loop."""
import asyncio
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
def _build_graph(self) -> StateGraph: def _build_graph(self) -> StateGraph:
"""Build the LangGraph.""" """Build the LangGraph."""
@@ -292,6 +306,11 @@ class OpusGraph:
return "iterate" return "iterate"
def _run_async(self, coro):
"""Run async coroutine properly."""
loop = self._get_loop()
return loop.run_until_complete(coro)
# ============== NODE IMPLEMENTATIONS ============== # ============== NODE IMPLEMENTATIONS ==============
def node_seed(self, state: OpusGraphState) -> OpusGraphState: def node_seed(self, state: OpusGraphState) -> OpusGraphState:
@@ -306,7 +325,6 @@ class OpusGraph:
"""Stage 1: One sentence summary.""" """Stage 1: One sentence summary."""
print("\n📝 STAGE 1: One sentence...") print("\n📝 STAGE 1: One sentence...")
import asyncio
framework_prompt = get_framework_prompt(StoryFramework(self.framework)) framework_prompt = get_framework_prompt(StoryFramework(self.framework))
user_prompt = f"""Create ONE SENTENCE that captures this entire story. user_prompt = f"""Create ONE SENTENCE that captures this entire story.
@@ -320,7 +338,7 @@ Requirements:
Seed: {state.seed_concept} Seed: {state.seed_concept}
""" """
result = asyncio.run(self.architect.call_llm(framework_prompt, user_prompt)) result = self._run_async(self.architect.call_llm(framework_prompt, user_prompt))
state.prewriting.one_sentence = result.strip() state.prewriting.one_sentence = result.strip()
state.messages.append(f"One sentence: {state.prewriting.one_sentence[:80]}...") state.messages.append(f"One sentence: {state.prewriting.one_sentence[:80]}...")
@@ -333,7 +351,6 @@ Seed: {state.seed_concept}
"""Stage 2: One paragraph outline.""" """Stage 2: One paragraph outline."""
print("📝 STAGE 2: One paragraph...") print("📝 STAGE 2: One paragraph...")
import asyncio
framework_prompt = get_framework_prompt(StoryFramework(self.framework)) framework_prompt = get_framework_prompt(StoryFramework(self.framework))
user_prompt = f"""Expand to ONE PARAGRAPH (4-8 sentences): user_prompt = f"""Expand to ONE PARAGRAPH (4-8 sentences):
@@ -351,7 +368,7 @@ Include:
One sentence: {state.prewriting.one_sentence} One sentence: {state.prewriting.one_sentence}
""" """
result = asyncio.run(self.architect.call_llm(framework_prompt, user_prompt)) result = self._run_async(self.architect.call_llm(framework_prompt, user_prompt))
state.prewriting.one_paragraph = result.strip() state.prewriting.one_paragraph = result.strip()
state.messages.append("One paragraph outline complete") state.messages.append("One paragraph outline complete")
@@ -364,8 +381,7 @@ One sentence: {state.prewriting.one_sentence}
"""Stage 3: Character sheets.""" """Stage 3: Character sheets."""
print("📝 STAGE 3: Character sheets...") print("📝 STAGE 3: Character sheets...")
import asyncio result = self._run_async(self.character_lead.execute(
result = asyncio.run(self.character_lead.execute(
{"characters": [], "raw_content": state.prewriting.one_paragraph}, {"characters": [], "raw_content": state.prewriting.one_paragraph},
{}, {},
)) ))
@@ -385,7 +401,6 @@ One sentence: {state.prewriting.one_sentence}
"""Stage 4: Four page outline.""" """Stage 4: Four page outline."""
print("📝 STAGE 4: Four-page outline...") print("📝 STAGE 4: Four-page outline...")
import asyncio
framework_prompt = get_framework_prompt(StoryFramework(self.framework)) framework_prompt = get_framework_prompt(StoryFramework(self.framework))
user_prompt = f"""Create a detailed outline (4 pages worth): user_prompt = f"""Create a detailed outline (4 pages worth):
@@ -395,7 +410,7 @@ Outline: {state.prewriting.one_paragraph}
Characters: {', '.join(c.name for c in state.prewriting.characters)} Characters: {', '.join(c.name for c in state.prewriting.characters)}
""" """
result = asyncio.run(self.architect.call_llm(framework_prompt, user_prompt)) result = self._run_async(self.architect.call_llm(framework_prompt, user_prompt))
state.prewriting.outline_sections = [s.strip() for s in result.split("\n\n") if s.strip()] state.prewriting.outline_sections = [s.strip() for s in result.split("\n\n") if s.strip()]
state.messages.append("Four-page outline complete") state.messages.append("Four-page outline complete")
@@ -408,8 +423,7 @@ Characters: {', '.join(c.name for c in state.prewriting.characters)}
"""Stage 5: Detailed character charts.""" """Stage 5: Detailed character charts."""
print("📝 STAGE 5: Character charts...") print("📝 STAGE 5: Character charts...")
import asyncio result = self._run_async(self.character_lead.execute(
result = asyncio.run(self.character_lead.execute(
{"characters": [], "raw_content": state.prewriting.one_paragraph}, {"characters": [], "raw_content": state.prewriting.one_paragraph},
{}, {},
)) ))
@@ -429,7 +443,6 @@ Characters: {', '.join(c.name for c in state.prewriting.characters)}
"""Stage 6: Scene list.""" """Stage 6: Scene list."""
print("📝 STAGE 6: Scene list...") print("📝 STAGE 6: Scene list...")
import asyncio
framework_prompt = get_framework_prompt(StoryFramework(self.framework)) framework_prompt = get_framework_prompt(StoryFramework(self.framework))
num_scenes = max(10, self.target_word_count // 1500) num_scenes = max(10, self.target_word_count // 1500)
@@ -439,7 +452,7 @@ Characters: {', '.join(c.name for c in state.prewriting.characters)}
For each: name, description, POV character, location, purpose. For each: name, description, POV character, location, purpose.
""" """
result = asyncio.run(self.architect.call_llm(framework_prompt, user_prompt)) result = self._run_async(self.architect.call_llm(framework_prompt, user_prompt))
scenes = self._parse_scenes(result) scenes = self._parse_scenes(result)
state.prewriting.scene_list = scenes state.prewriting.scene_list = scenes
@@ -469,13 +482,12 @@ For each: name, description, POV character, location, purpose.
"""Stage 7: Scene descriptions.""" """Stage 7: Scene descriptions."""
print("📝 STAGE 7: Scene descriptions...") print("📝 STAGE 7: Scene descriptions...")
import asyncio
user_prompt = f"""Describe key scenes: user_prompt = f"""Describe key scenes:
{chr(10).join(f"- {s.name}: {s.description}" for s in state.prewriting.scene_list[:10])} {chr(10).join(f"- {s.name}: {s.description}" for s in state.prewriting.scene_list[:10])}
""" """
result = asyncio.run(self.architect.call_llm( result = self._run_async(self.architect.call_llm(
"You are an expert story architect. Create vivid scene descriptions.", "You are an expert story architect. Create vivid scene descriptions.",
user_prompt, user_prompt,
)) ))
@@ -491,8 +503,7 @@ For each: name, description, POV character, location, purpose.
"""Create style guide.""" """Create style guide."""
print("🎨 STYLE GUIDE...") print("🎨 STYLE GUIDE...")
import asyncio result = self._run_async(self.voice.execute(
result = asyncio.run(self.voice.execute(
{"genre": self.genre, "tone": "neutral", "target_audience": "adult readers"}, {"genre": self.genre, "tone": "neutral", "target_audience": "adult readers"},
{}, {},
)) ))
@@ -544,7 +555,7 @@ For each: name, description, POV character, location, purpose.
## Chapter plan: {plan.summary if plan else 'Continue the story'} ## Chapter plan: {plan.summary if plan else 'Continue the story'}
""" """
result = asyncio.run(self.voice.write_chapter( result = self._run_async(self.voice.write_chapter(
{ {
"chapter_number": chapter_num, "chapter_number": chapter_num,
"title": f"Chapter {chapter_num}", "title": f"Chapter {chapter_num}",
@@ -575,7 +586,7 @@ For each: name, description, POV character, location, purpose.
print(f"🔍 Critiquing chapter {chapter_num}...") print(f"🔍 Critiquing chapter {chapter_num}...")
import asyncio import asyncio
result = asyncio.run(self.editor.review_chapter( result = self._run_async(self.editor.review_chapter(
{ {
"chapter_number": chapter_num, "chapter_number": chapter_num,
"title": f"Chapter {chapter_num}", "title": f"Chapter {chapter_num}",
+142
View File
@@ -0,0 +1,142 @@
"""LLM client for Opus Orchestrator - Synchronous version.
Uses synchronous httpx to avoid event loop issues with LangGraph.
"""
import os
from typing import Any, Optional
import requests
class LLMClient:
"""Synchronous LLM client for making API calls."""
def __init__(
self,
api_key: Optional[str] = None,
provider: str = "openai",
model: str = "gpt-4o",
base_url: Optional[str] = None,
):
"""Initialize LLM client."""
self.api_key = api_key or os.environ.get("MINIMAX_API_KEY") or os.environ.get("OPENAI_API_KEY")
self.provider = provider
self.model = model
if base_url:
self.base_url = base_url
elif provider == "minimax":
self.base_url = "https://api.minimax.chat/v1"
elif provider == "openai":
self.base_url = "https://api.openai.com/v1"
else:
self.base_url = "https://api.openai.com/v1"
def complete(
self,
system_prompt: str,
user_prompt: str,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
) -> str:
"""Make a completion request (synchronous)."""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
if self.provider == "minimax":
return self._complete_minimax(
system_prompt, user_prompt, temperature, max_tokens, headers
)
elif self.provider == "openai":
return self._complete_openai(
system_prompt, user_prompt, temperature, max_tokens, headers
)
else:
raise ValueError(f"Unsupported provider: {self.provider}")
def _complete_minimax(
self,
system_prompt: str,
user_prompt: str,
temperature: float,
max_tokens: Optional[int],
headers: dict,
) -> str:
"""Call MiniMax API (synchronous)."""
minimax_model = self.model.split("/")[-1] if "/" in self.model else self.model
payload = {
"model": minimax_model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"temperature": temperature,
}
if max_tokens:
payload["max_tokens"] = max_tokens
response = requests.post(
f"{self.base_url}/text/chatcompletion_v2",
headers=headers,
json=payload,
timeout=120,
)
response.raise_for_status()
data = response.json()
if "choices" in data:
return data["choices"][0]["message"]["content"]
else:
raise Exception(f"Unexpected MiniMax response: {data}")
def _complete_openai(
self,
system_prompt: str,
user_prompt: str,
temperature: float,
max_tokens: Optional[int],
headers: dict,
) -> str:
"""Call OpenAI API (synchronous)."""
payload = {
"model": self.model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"temperature": temperature,
}
if max_tokens:
payload["max_tokens"] = max_tokens
response = requests.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=payload,
timeout=120,
)
response.raise_for_status()
data = response.json()
return data["choices"][0]["message"]["content"]
# Convenience function
def get_llm_client(config: Optional[Any] = None) -> LLMClient:
"""Get an LLM client from config."""
from opus_orchestrator.config import get_config
cfg = config or get_config()
return LLMClient(
api_key=cfg.agent.api_key,
provider=cfg.agent.provider,
model=cfg.agent.model,
)