Cogitator
Workflows

WorkflowBuilder

Fluent API for constructing DAG-based workflows with nodes, edges, conditionals, loops, and parallel branches.

Overview

WorkflowBuilder provides a chainable API for defining workflow graphs. You add nodes, wire them together with after dependencies, and call .build() to get a validated Workflow object ready for execution.

import { WorkflowBuilder } from '@cogitator-ai/workflows';

interface MyState {
  input: string;
  result?: string;
}

const workflow = new WorkflowBuilder<MyState>('my-workflow')
  .initialState({ input: '' })
  .addNode('step-1', myNodeFn)
  .addNode('step-2', anotherNodeFn, { after: ['step-1'] })
  .build();

Constructor

const builder = new WorkflowBuilder<S>(name: string);

The generic type S defines the workflow state shape. It must extend WorkflowState (which is Record<string, unknown>).

.initialState(state)

Set the default state that the workflow starts with. Input provided at execution time is merged on top of this.

const builder = new WorkflowBuilder<{ count: number; items: string[] }>('counter').initialState({
  count: 0,
  items: [],
});

.addNode(name, fn, options?)

Add a processing node to the workflow. The fn is a NodeFn<S> -- an async function that receives a NodeContext<S> and returns a NodeResult<S>.

builder.addNode('process', async (ctx) => {
  const transformed = ctx.state.input.toUpperCase();
  return {
    state: { result: transformed },
    output: transformed,
  };
});

Options

The optional options parameter accepts:

  • after -- Array of node names this node depends on. The node won't execute until all dependencies complete.
  • config -- Optional NodeConfig for metadata.
builder.addNode('analyze', analyzeNodeFn, {
  after: ['fetch', 'validate'],
  config: { timeout: 30000 },
});

In practice, you'll use node factories (agentNode, toolNode, functionNode) instead of raw functions. See Node Types.

.addConditional(name, condition, options?)

Add a conditional routing node. The condition function receives the current state and returns the name(s) of the target node(s) to execute next.

interface ReviewState {
  code: string;
  quality: 'good' | 'bad';
  fixedCode?: string;
}

const workflow = new WorkflowBuilder<ReviewState>('code-review')
  .initialState({ code: '', quality: 'good' })
  .addNode('analyze', analyzeNode)
  .addConditional(
    'route',
    (state) => {
      return state.quality === 'good' ? 'approve' : 'fix';
    },
    { after: ['analyze'] }
  )
  .addNode('approve', approveNode, { after: ['route'] })
  .addNode('fix', fixNode, { after: ['route'] })
  .build();

The condition function can return a single node name or an array for multi-target routing.

.addLoop(name, options)

Add a loop construct that re-executes a portion of the graph while a condition holds.

interface RefinementState {
  draft: string;
  score: number;
  iteration: number;
}

const workflow = new WorkflowBuilder<RefinementState>('refine')
  .initialState({ draft: '', score: 0, iteration: 0 })
  .addNode('write', writeNode)
  .addNode('evaluate', evaluateNode, { after: ['write'] })
  .addLoop('check-quality', {
    condition: (state) => state.score < 8 && state.iteration < 5,
    back: 'write',
    exit: 'finalize',
    after: ['evaluate'],
  })
  .addNode('finalize', finalizeNode)
  .build();

Loop Options

OptionTypeDescription
condition(state: S) => booleanIf true, loop back; if false, exit
backstringNode to jump back to when looping
exitstringNode to proceed to when done
afterstring[]Dependencies (optional)

.addParallel(name, targets, options?)

Add a parallel fan-out node that dispatches execution to multiple targets simultaneously.

const workflow = new WorkflowBuilder<SearchState>('multi-search')
  .initialState({ query: '' })
  .addNode('prepare', prepareNode)
  .addParallel('fan-out', ['search-web', 'search-papers', 'search-code'], {
    after: ['prepare'],
  })
  .addNode('search-web', webSearchNode)
  .addNode('search-papers', paperSearchNode)
  .addNode('search-code', codeSearchNode)
  .addNode('combine', combineNode, {
    after: ['search-web', 'search-papers', 'search-code'],
  })
  .build();

.entryPoint(nodeName)

Override the auto-detected entry point. By default, the builder picks the first node with no dependencies. Use this when you need explicit control.

builder
  .addNode('init', initNode)
  .addNode('process', processNode, { after: ['init'] })
  .entryPoint('init')
  .build();

.build()

Validates the workflow graph and returns a Workflow<S> object. Build-time validation checks:

  • Entry point exists in the node set
  • All edges reference existing nodes
  • Loop back and exit targets exist
  • Conditional targets exist
  • Parallel targets exist

Throws an Error if validation fails.

const workflow = builder.build();
// workflow.name → 'my-workflow'
// workflow.nodes → Map of WorkflowNode<S>
// workflow.edges → Edge[]
// workflow.entryPoint → string
// workflow.initialState → S

Full Example

import { WorkflowBuilder, agentNode, functionNode } from '@cogitator-ai/workflows';
import { Agent } from '@cogitator-ai/core';

interface PipelineState {
  url: string;
  rawData?: string;
  cleanData?: string;
  analysis?: string;
  isValid: boolean;
}

const fetcher = new Agent({ name: 'fetcher', instructions: 'Fetch and return data from URLs.' });
const analyst = new Agent({ name: 'analyst', instructions: 'Analyze structured data.' });

const workflow = new WorkflowBuilder<PipelineState>('data-pipeline')
  .initialState({ url: '', isValid: false })
  .addNode(
    'fetch',
    agentNode(fetcher, {
      inputMapper: (state) => `Fetch data from: ${state.url}`,
      stateMapper: (result) => ({ rawData: result.output }),
    })
  )
  .addNode(
    'clean',
    functionNode(
      'clean',
      async (state) => {
        const cleaned = state.rawData?.trim().toLowerCase();
        return cleaned;
      },
      {
        stateMapper: (output) => ({ cleanData: output as string, isValid: true }),
      }
    ),
    { after: ['fetch'] }
  )
  .addConditional('check', (state) => (state.isValid ? 'analyze' : 'error'), {
    after: ['clean'],
  })
  .addNode(
    'analyze',
    agentNode(analyst, {
      inputMapper: (state) => `Analyze: ${state.cleanData}`,
      stateMapper: (result) => ({ analysis: result.output }),
    }),
    { after: ['check'] }
  )
  .addNode(
    'error',
    functionNode('error', async () => {
      return { error: 'Validation failed' };
    }),
    { after: ['check'] }
  )
  .build();

Edge Types

Under the hood, .build() generates typed edges:

TypeDescriptionGenerated by
sequentialA depends on BaddNode with after
conditionalRoute based on stateaddConditional
loopRe-execute subgraphaddLoop
parallelFan-out to multiple nodesaddParallel

On this page