Orchestration
Overview
The Orchestration class provides the framework for coordinating multiple agents in complex workflows. It manages agent lifecycle, data flow between agents, and execution sequencing.
OrchestrationRuntimeConfiguration
OrchestrationRuntimeConfiguration inherits from Orchestration<ChatMessage, ChatMessage> and implements IRuntimeConfiguration, making it both an Orchestration Graph and a Stateful Runtime Configuration.
"Orchestration Graph"
Think of Orchestration<TInput, TOutput> as a graph of connected nodes. Each node is an OrchestrationRunnable<TInput, TOutput>, and nodes are connected by conditional transitions (OrchestrationAdvancers<TInput, TOutput>). Based on the evaluation of its condition, an advancer can create a new process (RunnableProcess), which holds all the information for the next graph tick cycle.
Orchestration<TInput, TOutput>
The Orchestration requires three main things to be configured:
- Generic TInput/TOutput parameters
  - These define the input required to invoke the Orchestration and the expected output result
- The entry runnable, set with SetEntryRunnable(initialRunnable) [the runnable's TInput must match the Orchestration's generic TInput type]
- The result runnable, set with SetRunnableWithResult(outputRunnable) [the runnable's TOutput must match the Orchestration's generic TOutput type]
OrchestrationRunnable<TInput, TOutput>
To Set up a Runnable:
- Set Generic TInput/TOutput Parameters
- Create the initializer
- Implement the Invoke
Bare Minimum
csharp
/// <summary>
/// Smallest working configuration: a single <see cref="AgentRunnable"/> is registered
/// as both the entry runnable and the runnable that produces the result.
/// </summary>
public class BareMin : OrchestrationRuntimeConfiguration
{
    AgentRunnable researchRunnable;

    public BareMin()
    {
        // AllowDeadEnd: set deadend to disable reattempts and finish the execution.
        researchRunnable = new AgentRunnable(this)
        {
            AllowDeadEnd = true
        };

        SetEntryRunnable(researchRunnable);
        SetRunnableWithResult(researchRunnable);
    }

    /// <summary>
    /// Runs the base initialization, then subscribes to the runnable's agent runner
    /// events so they are re-raised through <c>OnRuntimeEvent</c>.
    /// </summary>
    public override void OnRuntimeInitialized()
    {
        base.OnRuntimeInitialized();

        // Relay every agent runner event (streaming included) to the runtime,
        // tagged with the runtime id, or an empty string when no runtime is attached.
        researchRunnable.OnAgentRunnerEvent += runnerEvent =>
        {
            OnRuntimeEvent?.Invoke(
                new ChatRuntimeAgentRunnerEvents(runnerEvent, Runtime?.Id ?? string.Empty));
        };
    }
}
/// <summary>
/// Runnable that takes a <see cref="ChatMessage"/>, hands it to a streaming
/// <see cref="TornadoAgent"/>, and returns the last message of the resulting conversation.
/// </summary>
public class AgentRunnable : OrchestrationRunnable<ChatMessage, ChatMessage>
{
    TornadoAgent _agent;

    /// <summary>
    /// Optional callback invoked for each agent runner event raised while the agent runs.
    /// </summary>
    public Action<AgentRunnerEvents>? OnAgentRunnerEvent { get; set; }

    /// <summary>
    /// Builds the underlying research agent and attaches this runnable to <paramref name="orchestrator"/>.
    /// </summary>
    /// <param name="orchestrator">The orchestration passed to the base runnable constructor.</param>
    public AgentRunnable(Orchestration orchestrator) : base(orchestrator)
    {
        // Placeholder key: replace with a real API key before running the sample.
        TornadoApi api = new TornadoApi("your-api-key");

        string instructions = """
            You are a helpful research assistant. Given a query, come up with a set of web searches,
            to perform to best answer the query. Output between 1 and 5 terms to query for.
            """;

        _agent = new TornadoAgent(
            client: api,
            model: ChatModel.OpenAi.Gpt5.V5Mini,
            name: "Research Agent",
            instructions: instructions,
            streaming: true);
    }

    /// <summary>
    /// Runs the agent with the process input appended as a message.
    /// </summary>
    /// <param name="process">The process whose <c>Input</c> message is sent to the agent.</param>
    /// <returns>The last message in the conversation returned by the agent run.</returns>
    public override async ValueTask<ChatMessage> Invoke(RunnableProcess<ChatMessage, ChatMessage> process)
    {
        List<ChatMessage> incoming = new List<ChatMessage> { process.Input };

        Conversation conversation = await _agent.RunAsync(
            appendMessages: incoming,
            streaming: _agent.Streaming,
            onAgentRunnerEvent: runnerEvent =>
            {
                // Pass the event on to whoever subscribed; nothing to await here.
                OnAgentRunnerEvent?.Invoke(runnerEvent);
                return ValueTask.CompletedTask;
            });

        return conversation.Messages.Last();
    }
}
Connecting Nodes
A more advanced, connected Orchestration, taken from an attempted MagenticOne implementation:
csharp
/// <summary>
/// Creates the specialized runnables and wires them into the flow
/// planner -> specialized agents -> orchestrator -> exit, with a direct
/// planner -> directOrchestrator -> exit path when no specialized agent is required.
/// </summary>
/// <remarks>
/// Task keywords are matched with <see cref="StringComparison.OrdinalIgnoreCase"/> so that
/// routing does not depend on the current culture (culture-sensitive lower-casing breaks
/// matches such as "FILE" vs "file" under Turkish casing rules).
/// </remarks>
public MagenticOneConfiguration()
{
    // The API key is read from the OPENAI_API_KEY environment variable.
    TornadoApi client = new TornadoApi(Environment.GetEnvironmentVariable("OPENAI_API_KEY"), LLmProviders.OpenAi);
    RecordSteps = true;

    // Create all the specialized runnables
    planner = new PlanningRunnable(client, this);
    webSurfer = new WebSurferRunnable(client, this);
    fileSurfer = new FileSurferRunnable(client, this);
    coder = new CoderRunnable(client, this);
    terminal = new TerminalRunnable(client, this);
    orchestrator = new OrchestratorRunnable(client, this);
    directOrchestrator = new DirectOrchestratorRunnable(client, this);
    exit = new ExitRunnable(this) { AllowDeadEnd = true };

    // Setup orchestration flow: Planning -> specialized agents -> orchestrator for final summary -> exit
    // Each advancer fires when the plan names the agent explicitly or the task text mentions a related keyword.
    planner.AddAdvancer((plan) => plan.RequiredAgents.Contains("WebSurfer") ||
        plan.OriginalTask.Contains("search", StringComparison.OrdinalIgnoreCase) ||
        plan.OriginalTask.Contains("web", StringComparison.OrdinalIgnoreCase) ||
        plan.OriginalTask.Contains("research", StringComparison.OrdinalIgnoreCase), webSurfer);

    planner.AddAdvancer((plan) => plan.RequiredAgents.Contains("FileSurfer") ||
        plan.OriginalTask.Contains("file", StringComparison.OrdinalIgnoreCase) ||
        plan.OriginalTask.Contains("document", StringComparison.OrdinalIgnoreCase) ||
        plan.OriginalTask.Contains("write", StringComparison.OrdinalIgnoreCase), fileSurfer);

    planner.AddAdvancer((plan) => plan.RequiredAgents.Contains("Coder") ||
        plan.OriginalTask.Contains("code", StringComparison.OrdinalIgnoreCase) ||
        plan.OriginalTask.Contains("program", StringComparison.OrdinalIgnoreCase) ||
        plan.OriginalTask.Contains("script", StringComparison.OrdinalIgnoreCase), coder);

    planner.AddAdvancer((plan) => plan.RequiredAgents.Contains("Terminal") ||
        plan.OriginalTask.Contains("command", StringComparison.OrdinalIgnoreCase) ||
        plan.OriginalTask.Contains("terminal", StringComparison.OrdinalIgnoreCase) ||
        plan.OriginalTask.Contains("run", StringComparison.OrdinalIgnoreCase), terminal);

    // If no specific agents are needed, go directly to directOrchestrator
    planner.AddAdvancer((plan) => !RequiresSpecializedAgent(plan), directOrchestrator);

    // All specialized agents flow to orchestrator for final summary
    webSurfer.AddAdvancer(orchestrator);
    fileSurfer.AddAdvancer(orchestrator);
    coder.AddAdvancer(orchestrator);
    terminal.AddAdvancer(orchestrator);

    // Both orchestrators advance to exit only once they have produced a non-blank task result
    orchestrator.AddAdvancer((result) => !string.IsNullOrWhiteSpace(result.TaskResult), exit);
    directOrchestrator.AddAdvancer((result) => !string.IsNullOrWhiteSpace(result.TaskResult), exit);

    // Configure entry and exit points
    SetEntryRunnable(planner);
    SetRunnableWithResult(exit);
}
Using the builder
csharp
/// <summary>
/// Assembles a chat-bot orchestration with <c>OrchestrationBuilder</c>: the moderator is the entry
/// runnable, it advances in parallel to four context runnables, their outputs are combined into the
/// agent runnable, and the agent then advances in parallel to a vector-save runnable and the exit path.
/// </summary>
/// <param name="client">API client handed to every runnable that takes one.</param>
/// <param name="streaming">Passed to the <c>AgentRunnable</c> constructor.</param>
/// <param name="chromaUri">URI passed to the vector search/save runnables.</param>
/// <param name="conversationFile">File name passed to <c>WithChatMemory</c>.</param>
/// <param name="withLongtermMemoryID">
/// Value stored as the "MemoryCollectionName" runtime property and, with an "Entities" suffix,
/// as "EntitiesCollectionName".
/// </param>
/// <returns>The configuration produced by the builder's <c>Build()</c> call.</returns>
public OrchestrationRuntimeConfiguration BuildComplexAgent(TornadoApi client, bool streaming = false, string chromaUri = "http://localhost:8001/api/v2/",string conversationFile = "AgentV10.json", string withLongtermMemoryID = "AgentV10")
{
    var agentRunnable = new AgentRunnable(client, this, streaming);
    var moderator = new ModeratorRunnable(client, this);
    var vectorSearch = new VectorSearchRunnable(client, this, chromaUri);
    var webSearch = new WebSearchRunnable(client, this);
    var vectorSave = new VectorSaveRunnable(client, this, chromaUri);
    var exitPath = new ExitPathRunnable(this);
    var vectorEntitySave = new VectorEntitySaveRunnable(client, this, chromaUri);
    var chatPassthru = new ChatPassthruRunnable(this);

    return new OrchestrationBuilder(this)
        .SetEntryRunnable(moderator)
        .SetOutputRunnable(agentRunnable)
        .WithRuntimeInitializer(config =>
        {
            // Relay agent runner events (streaming included) to the runtime,
            // tagged with the runtime id or an empty string when there is none.
            agentRunnable.OnAgentRunnerEvent += runnerEvent =>
            {
                config.OnRuntimeEvent?.Invoke(
                    new ChatRuntimeAgentRunnerEvents(runnerEvent, config.Runtime?.Id ?? string.Empty));
            };

            return ValueTask.CompletedTask;
        })
        .WithRuntimeProperty("LatestUserMessage", "")
        .WithRuntimeProperty("MemoryCollectionName", $"{withLongtermMemoryID}")
        .WithRuntimeProperty("EntitiesCollectionName", $"{withLongtermMemoryID}Entities")
        .WithChatMemory(conversationFile)
        .WithDataRecording()
        // Moderator output goes to all four context runnables.
        .AddParallelAdvancement(
            moderator,
            new OrchestrationAdvancer<ChatMessage>(webSearch),
            new OrchestrationAdvancer<ChatMessage>(vectorSearch),
            new OrchestrationAdvancer<ChatMessage>(vectorEntitySave),
            new OrchestrationAdvancer<ChatMessage>(chatPassthru))
        // All four context results are required before advancing to the agent.
        .AddCombinationalAdvancement<string>(
            fromRunnables: [webSearch, vectorSearch, vectorEntitySave, chatPassthru],
            condition: _ => true,
            toRunnable: agentRunnable,
            requiredInputToAdvance: 4,
            combinationRunnableName: "CombinationalContextWaiter")
        // Agent output goes to both the vector-save runnable and the exit path.
        .AddParallelAdvancement(
            agentRunnable,
            new OrchestrationAdvancer<ChatMessage>(vectorSave),
            new OrchestrationAdvancer<ChatMessage>(exitPath))
        .AddExitPath<ChatMessage>(exitPath, _ => true)
        .CreateDotGraphVisualization("ChatBotAgent.dot")
        .Build();
}
Best Practices
- Design clear data contracts between stages
- Handle errors at appropriate levels
- Implement timeout mechanisms
- Monitor orchestration health
- Test each stage independently