# Workflow Execution
Running workflows with the WorkflowExecutor, streaming events, checkpointing, and resuming long-running workflows.
## WorkflowExecutor
`WorkflowExecutor` is the engine that takes a built `Workflow` and runs it. It resolves the DAG topology, schedules nodes while respecting their dependencies, manages concurrency, and optionally saves checkpoints.
```typescript
import { Cogitator } from '@cogitator-ai/core';
import { WorkflowExecutor } from '@cogitator-ai/workflows';

const cogitator = new Cogitator({
  llm: {
    defaultProvider: 'openai',
    providers: {
      openai: { type: 'openai', apiKey: process.env.OPENAI_API_KEY!, model: 'gpt-4o' },
    },
  },
});

const executor = new WorkflowExecutor(cogitator);
```

The executor requires a `Cogitator` instance because `agentNode` nodes need it to run agents. You can optionally pass a `CheckpointStore` as the second argument.
### .execute(workflow, input?, options?)
Run a workflow to completion.
```typescript
interface TaskState {
  prompt: string;
  result?: string;
}

const result = await executor.execute(workflow, { prompt: 'Explain quantum computing' });

console.log(result.state.result);
console.log(`Completed in ${result.duration}ms`);
```

### Input Merging
The `input` parameter is merged on top of `workflow.initialState`:
```typescript
const workflow = new WorkflowBuilder<{ query: string; limit: number }>('search')
  .initialState({ query: '', limit: 10 })
  .addNode('search', searchNode)
  .build();

// limit stays at 10, query is overridden
const result = await executor.execute(workflow, { query: 'typescript generics' });
```

### Execution Options
```typescript
interface WorkflowExecuteOptions {
  maxConcurrency?: number;
  maxIterations?: number;
  checkpoint?: boolean;
  checkpointStrategy?: 'per-iteration' | 'per-node';
  onNodeStart?: (nodeName: string) => void;
  onNodeComplete?: (nodeName: string, output: unknown, duration: number) => void;
  onNodeError?: (nodeName: string, error: Error) => void;
  onNodeProgress?: (nodeName: string, progress: number) => void;
}
```

| Option | Default | Description |
|---|---|---|
| `maxConcurrency` | `4` | Max parallel node executions |
| `maxIterations` | `100` | Safety limit for loop iterations |
| `checkpoint` | `false` | Enable checkpoint saving |
| `checkpointStrategy` | `'per-iteration'` | When to save checkpoints |
### Lifecycle Hooks
Track execution progress with callbacks:
```typescript
const result = await executor.execute(
  workflow,
  { topic: 'AI agents' },
  {
    onNodeStart: (node) => console.log(`Starting: ${node}`),
    onNodeComplete: (node, output, duration) => {
      console.log(`Completed: ${node} in ${duration}ms`);
    },
    onNodeError: (node, error) => {
      console.error(`Failed: ${node}`, error.message);
    },
    onNodeProgress: (node, progress) => {
      console.log(`${node}: ${progress}%`);
    },
  }
);
```

### WorkflowResult
The return value of `.execute()`:

```typescript
interface WorkflowResult<S> {
  workflowId: string;
  workflowName: string;
  state: S;
  nodeResults: Map<string, { output: unknown; duration: number }>;
  duration: number;
  checkpointId?: string;
  error?: Error;
}
```

Access individual node outputs:
```typescript
const result = await executor.execute(workflow, input);

const searchOutput = result.nodeResults.get('search');
console.log(searchOutput?.output);
console.log(searchOutput?.duration);

if (result.error) {
  console.error('Workflow failed:', result.error.message);
}
```

## Streaming Events
Use `.stream()` to get an async iterable of events as the workflow executes. This is ideal for real-time UIs.

```typescript
for await (const event of executor.stream(workflow, { topic: 'GraphQL' })) {
  switch (event.type) {
    case 'workflow_started':
      console.log(`Workflow ${event.workflowName} started`);
      break;
    case 'node_started':
      console.log(`Node ${event.nodeName} started`);
      break;
    case 'node_progress':
      console.log(`Node ${event.nodeName}: ${event.progress}%`);
      break;
    case 'node_completed':
      console.log(`Node ${event.nodeName} done in ${event.duration}ms`);
      break;
    case 'node_error':
      console.error(`Node ${event.nodeName} failed:`, event.error);
      break;
    case 'workflow_completed':
      console.log(`Done in ${event.duration}ms`);
      console.log(event.result.state);
      break;
  }
}
```

### Event Types
| Event | Fields | When |
|---|---|---|
| `workflow_started` | `workflowId`, `workflowName`, `timestamp` | Workflow begins |
| `node_started` | `nodeName`, `timestamp` | Node execution starts |
| `node_progress` | `nodeName`, `progress`, `timestamp` | Node reports progress |
| `node_completed` | `nodeName`, `output`, `duration` | Node finishes |
| `node_error` | `nodeName`, `error`, `timestamp` | Node throws |
| `workflow_completed` | `workflowId`, `result`, `duration` | Workflow finishes |
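A common pattern is folding these events into a per-node status map for a UI. The following is a minimal, self-contained sketch assuming the field shapes from the table above; the `WorkflowEvent` type and `summarize` helper here are illustrative, not exports of the library:

```typescript
// Illustrative event shapes matching the table above (not the library's exports).
type WorkflowEvent =
  | { type: 'node_started'; nodeName: string; timestamp: number }
  | { type: 'node_completed'; nodeName: string; output: unknown; duration: number }
  | { type: 'node_error'; nodeName: string; error: Error; timestamp: number };

// Fold events into a per-node status map, e.g. to drive a progress display.
function summarize(events: WorkflowEvent[]): Map<string, string> {
  const status = new Map<string, string>();
  for (const e of events) {
    if (e.type === 'node_started') status.set(e.nodeName, 'running');
    if (e.type === 'node_completed') status.set(e.nodeName, `done in ${e.duration}ms`);
    if (e.type === 'node_error') status.set(e.nodeName, `failed: ${e.error.message}`);
  }
  return status;
}

const summary = summarize([
  { type: 'node_started', nodeName: 'search', timestamp: 1 },
  { type: 'node_completed', nodeName: 'search', output: null, duration: 120 },
]);
console.log(summary.get('search')); // done in 120ms
```

In a real consumer you would push each event from the `for await` loop into the same reducer instead of collecting an array first.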
## Checkpointing
For long-running workflows, enable checkpointing to persist state after each iteration. If the process crashes, you can resume from the last checkpoint.
### Checkpoint Stores

```typescript
import { InMemoryCheckpointStore, FileCheckpointStore } from '@cogitator-ai/workflows';

const memoryStore = new InMemoryCheckpointStore();
const fileStore = new FileCheckpointStore('/tmp/checkpoints');

const executor = new WorkflowExecutor(cogitator, fileStore);
```

### Enabling Checkpoints
```typescript
const result = await executor.execute(workflow, input, {
  checkpoint: true,
  checkpointStrategy: 'per-node',
});

console.log('Checkpoint:', result.checkpointId);
```

Strategies:

- `'per-iteration'` -- Save after each batch of parallel nodes completes
- `'per-node'` -- Save immediately after every single node completes (more granular, slightly slower)
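To make the trade-off concrete, here is a small illustrative sketch (not library code) that counts checkpoint writes for each strategy given a batch schedule, where each inner array is one iteration of parallel nodes:

```typescript
// Illustrative only: count checkpoint writes per strategy for a batch schedule.
function checkpointWrites(batches: string[][], strategy: 'per-iteration' | 'per-node'): number {
  if (strategy === 'per-iteration') return batches.length;  // one save per batch
  return batches.reduce((n, batch) => n + batch.length, 0); // one save per node
}

const schedule = [['fetchA', 'fetchB'], ['merge'], ['summarize']];
console.log(checkpointWrites(schedule, 'per-iteration')); // 3
console.log(checkpointWrites(schedule, 'per-node'));      // 4
```

The gap grows with the width of your parallel batches, which is why `'per-node'` is the slower but more granular choice.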
### Resuming from Checkpoint

```typescript
const result = await executor.resume(workflow, checkpointId, {
  maxConcurrency: 4,
});
```

The executor loads the checkpoint, restores state and the completed node set, then continues execution from where it left off.
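Conceptually, resuming means restoring the saved state and skipping every node recorded in `completedNodes`. This is a simplified model of that behavior, not the library's internals:

```typescript
// Simplified model of resume: restore state, skip already-completed nodes.
interface Checkpoint { state: Record<string, unknown>; completedNodes: string[] }
type Node = { name: string; run: (s: Record<string, unknown>) => Record<string, unknown> };

function resumeFrom(cp: Checkpoint, nodes: Node[]): Record<string, unknown> {
  const done = new Set(cp.completedNodes);
  let state = { ...cp.state };
  for (const node of nodes) {
    if (done.has(node.name)) continue; // already ran before the crash
    state = { ...state, ...node.run(state) };
  }
  return state;
}

const cp = { state: { query: 'ts' }, completedNodes: ['search'] };
const nodes: Node[] = [
  { name: 'search', run: () => ({ hits: 3 }) }, // skipped on resume
  { name: 'report', run: (s) => ({ report: `query=${s.query}` }) },
];
console.log(resumeFrom(cp, nodes)); // { query: 'ts', report: 'query=ts' }
```

Note that `search` never re-runs: its effect is assumed to already live in the checkpointed state.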
### Checkpoint Data

```typescript
interface WorkflowCheckpoint {
  id: string;
  workflowId: string;
  workflowName: string;
  state: unknown;
  completedNodes: string[];
  nodeResults: Record<string, unknown>;
  timestamp: number;
}
```

## Error Handling
If a node throws, the executor catches the error and stores it in `result.error`. Remaining nodes in the current batch are skipped.

```typescript
const result = await executor.execute(workflow, input);

if (result.error) {
  console.error(`Failed at node: ${result.error.message}`);
  console.log('Completed nodes:', [...result.nodeResults.keys()]);
  console.log('Partial state:', result.state);
}
```

For more sophisticated error handling (retry, circuit breakers, compensation), see Sagas & Compensation.
## Concurrency Control

The `maxConcurrency` option limits how many nodes can run in parallel. Nodes without dependency relationships between them are candidates for parallel execution.
```typescript
// run up to 2 nodes at a time
await executor.execute(workflow, input, { maxConcurrency: 2 });

// fully sequential
await executor.execute(workflow, input, { maxConcurrency: 1 });

// aggressive parallelism
await executor.execute(workflow, input, { maxConcurrency: 16 });
```

The internal WorkflowScheduler uses the DAG topology to determine which nodes can safely run in parallel at each iteration.
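The scheduling idea can be sketched independently of the library. This is a simplified model (not the actual WorkflowScheduler): group nodes into layers where every node's dependencies sit in earlier layers, so all nodes within a layer may run in parallel, up to `maxConcurrency`:

```typescript
// Simplified DAG layering: each layer holds nodes whose dependencies are
// all in earlier layers, so nodes within a layer can run in parallel.
function layers(deps: Record<string, string[]>): string[][] {
  const done = new Set<string>();
  const result: string[][] = [];
  while (done.size < Object.keys(deps).length) {
    const ready = Object.keys(deps).filter(
      (n) => !done.has(n) && deps[n].every((d) => done.has(d))
    );
    if (ready.length === 0) throw new Error('cycle detected');
    ready.forEach((n) => done.add(n));
    result.push(ready);
  }
  return result;
}

const dag = { fetchA: [], fetchB: [], merge: ['fetchA', 'fetchB'], report: ['merge'] };
console.log(layers(dag)); // [['fetchA', 'fetchB'], ['merge'], ['report']]
```

With `maxConcurrency: 1` the first layer would still run both fetches, just one at a time.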
## Iteration Safety

The `maxIterations` limit prevents infinite loops. If the workflow hits this limit, the result will contain an error:
```typescript
const result = await executor.execute(workflow, input, { maxIterations: 50 });

if (result.error?.message.includes('max iterations')) {
  console.warn('Workflow hit iteration safety limit');
}
```