
Workflow Execution

Running workflows with the WorkflowExecutor, streaming events, checkpointing, and resuming long-running workflows.

WorkflowExecutor

WorkflowExecutor is the main engine that takes a built Workflow and runs it. It resolves the DAG topology, schedules nodes respecting dependencies, manages concurrency, and optionally saves checkpoints.

import { Cogitator } from '@cogitator-ai/core';
import { WorkflowExecutor } from '@cogitator-ai/workflows';

const cogitator = new Cogitator({
  llm: {
    defaultProvider: 'openai',
    providers: {
      openai: { type: 'openai', apiKey: process.env.OPENAI_API_KEY!, model: 'gpt-4o' },
    },
  },
});

const executor = new WorkflowExecutor(cogitator);

The executor requires a Cogitator instance because agentNode nodes need it to run agents. You can optionally pass a CheckpointStore as the second argument.

.execute(workflow, input?, options?)

Run a workflow to completion.

// State shape this workflow was built with
interface TaskState {
  prompt: string;
  result?: string;
}

const result = await executor.execute(workflow, { prompt: 'Explain quantum computing' });

console.log(result.state.result);
console.log(`Completed in ${result.duration}ms`);

Input Merging

The input parameter is merged on top of workflow.initialState. Keys present in input override the initial values; all other keys keep them:

const workflow = new WorkflowBuilder<{ query: string; limit: number }>('search')
  .initialState({ query: '', limit: 10 })
  .addNode('search', searchNode)
  .build();

// limit stays at 10, query is overridden
const result = await executor.execute(workflow, { query: 'typescript generics' });
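The merge behavior can be approximated with a shallow object spread. This is only a sketch: mergeInput is a hypothetical helper, and treating the merge as shallow is an assumption, since the executor's real logic may handle nested objects differently.

```typescript
// Hypothetical helper: models the merge as a shallow object spread.
// Assumption: the executor's real merge may differ (for example, deep merging).
function mergeInput<S extends object>(initialState: S, input: Partial<S> = {}): S {
  return { ...initialState, ...input };
}

interface SearchState {
  query: string;
  limit: number;
}

const initialState: SearchState = { query: '', limit: 10 };
const merged = mergeInput(initialState, { query: 'typescript generics' });

console.log(merged); // { query: 'typescript generics', limit: 10 }
```

If the real merge is shallow like this, a nested object supplied in input would replace the whole nested initial value instead of being combined with it.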

Execution Options

interface WorkflowExecuteOptions {
  maxConcurrency?: number;
  maxIterations?: number;
  checkpoint?: boolean;
  checkpointStrategy?: 'per-iteration' | 'per-node';
  onNodeStart?: (nodeName: string) => void;
  onNodeComplete?: (nodeName: string, output: unknown, duration: number) => void;
  onNodeError?: (nodeName: string, error: Error) => void;
  onNodeProgress?: (nodeName: string, progress: number) => void;
}

| Option | Default | Description |
| --- | --- | --- |
| maxConcurrency | 4 | Max parallel node executions |
| maxIterations | 100 | Safety limit for loop iterations |
| checkpoint | false | Enable checkpoint saving |
| checkpointStrategy | 'per-iteration' | When to save checkpoints |

Lifecycle Hooks

Track execution progress with callbacks:

const result = await executor.execute(
  workflow,
  { topic: 'AI agents' },
  {
    onNodeStart: (node) => console.log(`Starting: ${node}`),
    onNodeComplete: (node, output, duration) => {
      console.log(`Completed: ${node} in ${duration}ms`);
    },
    onNodeError: (node, error) => {
      console.error(`Failed: ${node}`, error.message);
    },
    onNodeProgress: (node, progress) => {
      console.log(`${node}: ${progress}%`);
    },
  }
);
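Hooks are also a convenient place to collect metrics instead of logging. The sketch below builds a set of hooks that record into a plain object. NodeHooks is a local copy of the hook signatures from WorkflowExecuteOptions, createMetricsHooks is a hypothetical helper, and the calls at the bottom only simulate what the executor would do.

```typescript
// Local copy of the hook signatures listed in WorkflowExecuteOptions.
interface NodeHooks {
  onNodeStart?: (nodeName: string) => void;
  onNodeComplete?: (nodeName: string, output: unknown, duration: number) => void;
  onNodeError?: (nodeName: string, error: Error) => void;
}

interface NodeMetrics {
  started: string[];
  durations: Record<string, number>;
  failures: Record<string, string>;
}

// Hypothetical helper: returns hooks plus the metrics object they fill in.
function createMetricsHooks(): { hooks: NodeHooks; metrics: NodeMetrics } {
  const metrics: NodeMetrics = { started: [], durations: {}, failures: {} };
  const hooks: NodeHooks = {
    onNodeStart: (node) => {
      metrics.started.push(node);
    },
    onNodeComplete: (node, _output, duration) => {
      metrics.durations[node] = duration;
    },
    onNodeError: (node, error) => {
      metrics.failures[node] = error.message;
    },
  };
  return { hooks, metrics };
}

// Simulated calls standing in for the executor.
const { hooks, metrics } = createMetricsHooks();
hooks.onNodeStart?.('search');
hooks.onNodeComplete?.('search', { hits: 3 }, 120);
hooks.onNodeStart?.('summarize');
hooks.onNodeError?.('summarize', new Error('rate limited'));

console.log(metrics);
```

In real use, the hooks object would be passed as the third argument to executor.execute(), and metrics would be read once the call resolves.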

WorkflowResult

The return value of .execute():

interface WorkflowResult<S> {
  workflowId: string;
  workflowName: string;
  state: S;
  nodeResults: Map<string, { output: unknown; duration: number }>;
  duration: number;
  checkpointId?: string;
  error?: Error;
}

Access individual node outputs:

const result = await executor.execute(workflow, input);

const searchOutput = result.nodeResults.get('search');
console.log(searchOutput?.output);
console.log(searchOutput?.duration);

if (result.error) {
  console.error('Workflow failed:', result.error.message);
}
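Because nodeResults is a Map keyed by node name, it is easy to post-process. As a sketch, the hypothetical helper slowestNodes below ranks nodes by duration; the sample Map is made-up data in the shape declared by WorkflowResult.

```typescript
// Same value shape as WorkflowResult.nodeResults.
type NodeResults = Map<string, { output: unknown; duration: number }>;

// Hypothetical helper: returns the `top` slowest nodes, slowest first.
function slowestNodes(
  nodeResults: NodeResults,
  top = 3
): Array<{ name: string; duration: number }> {
  return Array.from(nodeResults.entries())
    .map(([name, entry]) => ({ name, duration: entry.duration }))
    .sort((a, b) => b.duration - a.duration)
    .slice(0, top);
}

// Made-up sample data for illustration only.
const sampleResults: NodeResults = new Map([
  ['search', { output: ['a', 'b'], duration: 340 }],
  ['rank', { output: ['b', 'a'], duration: 25 }],
  ['summarize', { output: 'summary text', duration: 1210 }],
]);

const ranked = slowestNodes(sampleResults, 2);
console.log(ranked);
// [ { name: 'summarize', duration: 1210 }, { name: 'search', duration: 340 } ]
```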

Streaming Events

Use .stream() to get an async iterable of events as the workflow executes. This is ideal for real-time UIs.

for await (const event of executor.stream(workflow, { topic: 'GraphQL' })) {
  switch (event.type) {
    case 'workflow_started':
      console.log(`Workflow ${event.workflowName} started`);
      break;

    case 'node_started':
      console.log(`Node ${event.nodeName} started`);
      break;

    case 'node_progress':
      console.log(`Node ${event.nodeName}: ${event.progress}%`);
      break;

    case 'node_completed':
      console.log(`Node ${event.nodeName} done in ${event.duration}ms`);
      break;

    case 'node_error':
      console.error(`Node ${event.nodeName} failed:`, event.error);
      break;

    case 'workflow_completed':
      console.log(`Done in ${event.duration}ms`);
      console.log(event.result.state);
      break;
  }
}

Event Types

| Event | Fields | When |
| --- | --- | --- |
| workflow_started | workflowId, workflowName, timestamp | Workflow begins |
| node_started | nodeName, timestamp | Node execution starts |
| node_progress | nodeName, progress, timestamp | Node reports progress |
| node_completed | nodeName, output, duration | Node finishes |
| node_error | nodeName, error, timestamp | Node throws |
| workflow_completed | workflowId, result, duration | Workflow finishes |
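
For a UI, it often helps to fold the event stream into a single state object. The sketch below models the events as a discriminated union and reduces them into a summary. The WorkflowEvent type is reconstructed from the table above and is an assumption (the field types in particular are guesses); reduceEvent and the sample events are hypothetical.

```typescript
// Assumed event shapes, reconstructed from the Event Types table.
// The library's exported types may differ.
type WorkflowEvent =
  | { type: 'workflow_started'; workflowId: string; workflowName: string; timestamp: number }
  | { type: 'node_started'; nodeName: string; timestamp: number }
  | { type: 'node_progress'; nodeName: string; progress: number; timestamp: number }
  | { type: 'node_completed'; nodeName: string; output: unknown; duration: number }
  | { type: 'node_error'; nodeName: string; error: unknown; timestamp: number }
  | { type: 'workflow_completed'; workflowId: string; result: unknown; duration: number };

interface RunSummary {
  completed: string[];
  failed: string[];
  totalNodeMs: number;
}

// Hypothetical reducer: folds one event into an immutable summary.
function reduceEvent(summary: RunSummary, event: WorkflowEvent): RunSummary {
  switch (event.type) {
    case 'node_completed':
      return {
        ...summary,
        completed: [...summary.completed, event.nodeName],
        totalNodeMs: summary.totalNodeMs + event.duration,
      };
    case 'node_error':
      return { ...summary, failed: [...summary.failed, event.nodeName] };
    default:
      return summary;
  }
}

// Made-up events standing in for what executor.stream() would yield.
const sampleEvents: WorkflowEvent[] = [
  { type: 'workflow_started', workflowId: 'wf-1', workflowName: 'demo', timestamp: 0 },
  { type: 'node_started', nodeName: 'search', timestamp: 1 },
  { type: 'node_completed', nodeName: 'search', output: null, duration: 120 },
  { type: 'node_started', nodeName: 'summarize', timestamp: 130 },
  { type: 'node_error', nodeName: 'summarize', error: new Error('boom'), timestamp: 140 },
];

const emptySummary: RunSummary = { completed: [], failed: [], totalNodeMs: 0 };
const summary = sampleEvents.reduce<RunSummary>(reduceEvent, emptySummary);
console.log(summary);
```

Inside a real `for await` loop over executor.stream(), the same reducer could be applied to each event as it arrives and the result pushed to the UI.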

Checkpointing

For long-running workflows, enable checkpointing to persist state as execution progresses: after each iteration by default, or after every node with the 'per-node' strategy. If the process crashes, you can resume from the last checkpoint.

Checkpoint Stores

import { InMemoryCheckpointStore, FileCheckpointStore } from '@cogitator-ai/workflows';

const memoryStore = new InMemoryCheckpointStore();

const fileStore = new FileCheckpointStore('/tmp/checkpoints');

const executor = new WorkflowExecutor(cogitator, fileStore);

Enabling Checkpoints

const result = await executor.execute(workflow, input, {
  checkpoint: true,
  checkpointStrategy: 'per-node',
});

console.log('Checkpoint:', result.checkpointId);

Strategies:

  • 'per-iteration' -- Save after each batch of parallel nodes completes
  • 'per-node' -- Save immediately after every single node completes (more granular, slightly slower)
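
To get a feel for the trade-off, the sketch below counts how many checkpoint writes each strategy would produce for a given run. It is a simplified model, not the executor's code: countCheckpointSaves is a hypothetical helper, and each inner array stands for one batch of nodes that ran in parallel.

```typescript
type CheckpointStrategy = 'per-iteration' | 'per-node';

// Hypothetical model: one save per batch, or one save per node.
function countCheckpointSaves(batches: string[][], strategy: CheckpointStrategy): number {
  if (strategy === 'per-iteration') {
    return batches.length;
  }
  return batches.reduce((total, batch) => total + batch.length, 0);
}

// Made-up run: one node, then three in parallel, then one.
const batches = [['fetch'], ['parse', 'classify', 'embed'], ['report']];

const perIteration = countCheckpointSaves(batches, 'per-iteration');
const perNode = countCheckpointSaves(batches, 'per-node');

console.log(perIteration); // 3
console.log(perNode); // 5
```

The wider the parallel batches, the larger the gap between the two strategies, which is where the extra cost of 'per-node' comes from.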

Resuming from Checkpoint

// checkpointId is the value returned as result.checkpointId by an earlier run
const result = await executor.resume(workflow, checkpointId, {
  maxConcurrency: 4,
});

The executor loads the checkpoint, restores state and completed node set, then continues execution from where it left off.

Checkpoint Data

interface WorkflowCheckpoint {
  id: string;
  workflowId: string;
  workflowName: string;
  state: unknown;
  completedNodes: string[];
  nodeResults: Record<string, unknown>;
  timestamp: number;
}
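
Conceptually, resuming means skipping everything listed in completedNodes. The sketch below shows that idea in isolation, assuming the set of node names in the workflow is known up front. pendingNodes and the sample checkpoint are hypothetical, and the executor's actual resume logic is not shown in this guide.

```typescript
// Subset of WorkflowCheckpoint that this sketch needs.
interface CheckpointLike {
  state: unknown;
  completedNodes: string[];
}

// Hypothetical helper: nodes that still have to run after a restore.
function pendingNodes(allNodes: string[], checkpoint: CheckpointLike): string[] {
  const done = new Set(checkpoint.completedNodes);
  return allNodes.filter((node) => !done.has(node));
}

// Made-up checkpoint taken after the first two nodes finished.
const checkpoint: CheckpointLike = {
  state: { query: 'typescript generics', hits: 12 },
  completedNodes: ['fetch', 'parse'],
};

const remaining = pendingNodes(['fetch', 'parse', 'classify', 'report'], checkpoint);
console.log(remaining); // [ 'classify', 'report' ]
```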

Error Handling

If a node throws, the executor catches the error and stores it in result.error. Remaining nodes in the current batch are skipped.

const result = await executor.execute(workflow, input);

if (result.error) {
  console.error(`Workflow failed: ${result.error.message}`);
  console.log('Completed nodes:', [...result.nodeResults.keys()]);
  console.log('Partial state:', result.state);
}

For more sophisticated error handling (retry, circuit breakers, compensation), see Sagas & Compensation.

Concurrency Control

The maxConcurrency option limits how many nodes can run in parallel. Nodes without dependency relationships between them are candidates for parallel execution.

// run up to 2 nodes at a time
await executor.execute(workflow, input, { maxConcurrency: 2 });

// fully sequential
await executor.execute(workflow, input, { maxConcurrency: 1 });

// aggressive parallelism
await executor.execute(workflow, input, { maxConcurrency: 16 });

The internal WorkflowScheduler uses the DAG topology to determine which nodes can safely run in parallel at each iteration.
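
The general idea of dependency-aware batching can be sketched as follows. This is not the WorkflowScheduler implementation, only a minimal model under stated assumptions: the Dependencies shape and planBatches are hypothetical, and the model treats each iteration as a batch of at most maxConcurrency ready nodes.

```typescript
// Hypothetical graph shape: node name -> names of the nodes it depends on.
type Dependencies = Record<string, string[]>;

// Groups nodes into batches; a node is ready once all its dependencies completed.
function planBatches(deps: Dependencies, maxConcurrency: number): string[][] {
  const completed = new Set<string>();
  let pending = Object.keys(deps);
  const batches: string[][] = [];

  while (pending.length > 0) {
    const ready = pending.filter((node) => deps[node].every((dep) => completed.has(dep)));
    if (ready.length === 0) {
      throw new Error('Cycle or missing dependency detected');
    }
    // Cap the batch size; leftover ready nodes wait for the next iteration.
    const batch = ready.slice(0, maxConcurrency);
    for (const node of batch) {
      completed.add(node);
    }
    pending = pending.filter((node) => !completed.has(node));
    batches.push(batch);
  }
  return batches;
}

// Made-up workflow: three independent nodes fan out from fetch, then join.
const deps: Dependencies = {
  fetch: [],
  parse: ['fetch'],
  classify: ['fetch'],
  embed: ['fetch'],
  report: ['parse', 'classify', 'embed'],
};

const limited = planBatches(deps, 2);
const wide = planBatches(deps, 16);

console.log(limited); // [ ['fetch'], ['parse', 'classify'], ['embed'], ['report'] ]
console.log(wide); // [ ['fetch'], ['parse', 'classify', 'embed'], ['report'] ]
```

In this model, raising maxConcurrency beyond the widest set of independent nodes (three here) has no further effect.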

Iteration Safety

The maxIterations limit prevents infinite loops. If the workflow hits this limit, the result will contain an error:

const result = await executor.execute(workflow, input, { maxIterations: 50 });

if (result.error?.message.includes('max iterations')) {
  console.warn('Workflow hit iteration safety limit');
}
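
The guard itself is a simple pattern: count iterations and stop with an error once the cap is reached. The sketch below shows it in isolation. runWithIterationLimit is a hypothetical function, and the error text is an assumption chosen to match the 'max iterations' check above; the executor's real message may be worded differently.

```typescript
interface StepResult<S> {
  state: S;
  done: boolean;
}

interface LoopResult<S> {
  state: S;
  iterations: number;
  error?: Error;
}

// Hypothetical loop driver: runs `step` until it reports done or the cap is hit.
function runWithIterationLimit<S>(
  initial: S,
  step: (state: S) => StepResult<S>,
  maxIterations = 100
): LoopResult<S> {
  let state = initial;
  for (let i = 0; i < maxIterations; i++) {
    const next = step(state);
    state = next.state;
    if (next.done) {
      return { state, iterations: i + 1 };
    }
  }
  // Assumed message format; keeps the partial state available to the caller.
  return {
    state,
    iterations: maxIterations,
    error: new Error(`Workflow exceeded max iterations (${maxIterations})`),
  };
}

// A loop that finishes after three steps.
const finished = runWithIterationLimit({ count: 0 }, (s) => ({
  state: { count: s.count + 1 },
  done: s.count + 1 >= 3,
}));

// A loop that never finishes and is cut off at five iterations.
const runaway = runWithIterationLimit(
  { count: 0 },
  (s) => ({ state: { count: s.count + 1 }, done: false }),
  5
);

console.log(finished.iterations); // 3
console.log(runaway.error?.message); // Workflow exceeded max iterations (5)
```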
