WorkflowBuilder
Fluent API for constructing DAG-based workflows with nodes, edges, conditionals, loops, and parallel branches.
Overview
WorkflowBuilder provides a chainable API for defining workflow graphs. You add nodes, wire them together with after dependencies, and call .build() to get a validated Workflow object ready for execution.
import { WorkflowBuilder } from '@cogitator-ai/workflows';
interface MyState {
input: string;
result?: string;
}
const workflow = new WorkflowBuilder<MyState>('my-workflow')
.initialState({ input: '' })
.addNode('step-1', myNodeFn)
.addNode('step-2', anotherNodeFn, { after: ['step-1'] })
.build();Constructor
const builder = new WorkflowBuilder<S>(name: string);The generic type S defines the workflow state shape. It must extend WorkflowState (which is Record<string, unknown>).
.initialState(state)
Set the default state that the workflow starts with. Input provided at execution time is merged on top of this.
const builder = new WorkflowBuilder<{ count: number; items: string[] }>('counter').initialState({
count: 0,
items: [],
});.addNode(name, fn, options?)
Add a processing node to the workflow. The fn is a NodeFn<S> -- an async function that receives a NodeContext<S> and returns a NodeResult<S>.
builder.addNode('process', async (ctx) => {
const transformed = ctx.state.input.toUpperCase();
return {
state: { result: transformed },
output: transformed,
};
});Options
The optional options parameter accepts:
after-- Array of node names this node depends on. The node won't execute until all dependencies complete.config-- OptionalNodeConfigfor metadata.
builder.addNode('analyze', analyzeNodeFn, {
after: ['fetch', 'validate'],
config: { timeout: 30000 },
});In practice, you'll use node factories (agentNode, toolNode, functionNode) instead of raw functions. See Node Types.
.addConditional(name, condition, options?)
Add a conditional routing node. The condition function receives the current state and returns the name(s) of the target node(s) to execute next.
interface ReviewState {
code: string;
quality: 'good' | 'bad';
fixedCode?: string;
}
const workflow = new WorkflowBuilder<ReviewState>('code-review')
.initialState({ code: '', quality: 'good' })
.addNode('analyze', analyzeNode)
.addConditional(
'route',
(state) => {
return state.quality === 'good' ? 'approve' : 'fix';
},
{ after: ['analyze'] }
)
.addNode('approve', approveNode, { after: ['route'] })
.addNode('fix', fixNode, { after: ['route'] })
.build();The condition function can return a single node name or an array for multi-target routing.
.addLoop(name, options)
Add a loop construct that re-executes a portion of the graph while a condition holds.
interface RefinementState {
draft: string;
score: number;
iteration: number;
}
const workflow = new WorkflowBuilder<RefinementState>('refine')
.initialState({ draft: '', score: 0, iteration: 0 })
.addNode('write', writeNode)
.addNode('evaluate', evaluateNode, { after: ['write'] })
.addLoop('check-quality', {
condition: (state) => state.score < 8 && state.iteration < 5,
back: 'write',
exit: 'finalize',
after: ['evaluate'],
})
.addNode('finalize', finalizeNode)
.build();Loop Options
| Option | Type | Description |
|---|---|---|
condition | (state: S) => boolean | If true, loop back; if false, exit |
back | string | Node to jump back to when looping |
exit | string | Node to proceed to when done |
after | string[] | Dependencies (optional) |
.addParallel(name, targets, options?)
Add a parallel fan-out node that dispatches execution to multiple targets simultaneously.
const workflow = new WorkflowBuilder<SearchState>('multi-search')
.initialState({ query: '' })
.addNode('prepare', prepareNode)
.addParallel('fan-out', ['search-web', 'search-papers', 'search-code'], {
after: ['prepare'],
})
.addNode('search-web', webSearchNode)
.addNode('search-papers', paperSearchNode)
.addNode('search-code', codeSearchNode)
.addNode('combine', combineNode, {
after: ['search-web', 'search-papers', 'search-code'],
})
.build();.entryPoint(nodeName)
Override the auto-detected entry point. By default, the builder picks the first node with no dependencies. Use this when you need explicit control.
builder
.addNode('init', initNode)
.addNode('process', processNode, { after: ['init'] })
.entryPoint('init')
.build();.build()
Validates the workflow graph and returns a Workflow<S> object. Build-time validation checks:
- Entry point exists in the node set
- All edges reference existing nodes
- Loop
backandexittargets exist - Conditional targets exist
- Parallel targets exist
Throws an Error if validation fails.
const workflow = builder.build();
// workflow.name → 'my-workflow'
// workflow.nodes → Map of WorkflowNode<S>
// workflow.edges → Edge[]
// workflow.entryPoint → string
// workflow.initialState → SFull Example
import { WorkflowBuilder, agentNode, functionNode } from '@cogitator-ai/workflows';
import { Agent } from '@cogitator-ai/core';
interface PipelineState {
url: string;
rawData?: string;
cleanData?: string;
analysis?: string;
isValid: boolean;
}
const fetcher = new Agent({ name: 'fetcher', instructions: 'Fetch and return data from URLs.' });
const analyst = new Agent({ name: 'analyst', instructions: 'Analyze structured data.' });
const workflow = new WorkflowBuilder<PipelineState>('data-pipeline')
.initialState({ url: '', isValid: false })
.addNode(
'fetch',
agentNode(fetcher, {
inputMapper: (state) => `Fetch data from: ${state.url}`,
stateMapper: (result) => ({ rawData: result.output }),
})
)
.addNode(
'clean',
functionNode(
'clean',
async (state) => {
const cleaned = state.rawData?.trim().toLowerCase();
return cleaned;
},
{
stateMapper: (output) => ({ cleanData: output as string, isValid: true }),
}
),
{ after: ['fetch'] }
)
.addConditional('check', (state) => (state.isValid ? 'analyze' : 'error'), {
after: ['clean'],
})
.addNode(
'analyze',
agentNode(analyst, {
inputMapper: (state) => `Analyze: ${state.cleanData}`,
stateMapper: (result) => ({ analysis: result.output }),
}),
{ after: ['check'] }
)
.addNode(
'error',
functionNode('error', async () => {
return { error: 'Validation failed' };
}),
{ after: ['check'] }
)
.build();Edge Types
Under the hood, .build() generates typed edges:
| Type | Description | Generated by |
|---|---|---|
sequential | A depends on B | addNode with after |
conditional | Route based on state | addConditional |
loop | Re-execute subgraph | addLoop |
parallel | Fan-out to multiple nodes | addParallel |