Workflow Patterns

MapReduce, sub-workflows, parallel execution, conditional branching, loops, fan-out/fan-in, scatter-gather, race, and fallback patterns.

Overview

Beyond basic DAG execution, Cogitator workflows support advanced composition patterns for data-parallel processing, nested workflows, and fault-tolerant execution strategies.

MapReduce

Process a collection of items in parallel (map phase), then combine the results (reduce phase). MapReduce supports concurrency limits, partial-failure handling, per-item retries, and streaming reduce.

Basic MapReduce

import { readFile } from 'node:fs/promises';
import { executeMapReduce, mapReduceNode, collect } from '@cogitator-ai/workflows';

interface DocState {
  documents: string[];
  summaries?: string[];
}

const result = await executeMapReduce<DocState, string, string[]>(
  { documents: ['doc1.txt', 'doc2.txt', 'doc3.txt'] },
  {
    name: 'summarize-docs',
    map: {
      items: (state) => state.documents,
      mapper: async (item) => {
        // Pass an encoding so readFile resolves to a string rather than a Buffer.
        const content = await readFile(item as string, 'utf8');
        return `Summary of ${item}: ${content.slice(0, 100)}...`;
      },
      concurrency: 5,
      continueOnError: true,
    },
    reduce: {
      ...collect<string>(),
    },
  }
);

console.log(`Processed: ${result.stats.successful}/${result.stats.total}`);
console.log('Summaries:', result.reduced);
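
To make the two phases concrete, here is a minimal sketch of a concurrency-limited map followed by a collecting reduce, written in plain TypeScript. It is not Cogitator's implementation: `mapWithLimit`, `collectSuccessful`, and the `MapOutcome` type are hypothetical names introduced only for illustration, and the `stats` shape simply mirrors the fields logged in the example above.

```typescript
// Hypothetical helpers for illustration; not part of @cogitator-ai/workflows.
type MapOutcome<R> =
  | { ok: true; index: number; value: R }
  | { ok: false; index: number; error: unknown };

async function mapWithLimit<T, R>(
  items: T[],
  mapper: (item: T, index: number) => Promise<R>,
  concurrency: number
): Promise<MapOutcome<R>[]> {
  const outcomes: MapOutcome<R>[] = new Array(items.length);
  let next = 0;

  // Each worker claims the next unprocessed index until the list is exhausted.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      try {
        const value = await mapper(items[index], index);
        outcomes[index] = { ok: true, index, value };
      } catch (error) {
        // Similar in spirit to continueOnError: record the failure and keep going.
        outcomes[index] = { ok: false, index, error };
      }
    }
  }

  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return outcomes;
}

// Reduce phase: keep successful values in input order and count the outcomes.
function collectSuccessful<R>(outcomes: MapOutcome<R>[]) {
  const reduced: R[] = [];
  for (const outcome of outcomes) {
    if (outcome.ok) reduced.push(outcome.value);
  }
  const total = outcomes.length;
  return {
    reduced,
    stats: { total, successful: reduced.length, failed: total - reduced.length },
  };
}
```

Because outcomes are stored by index, the reduced array keeps input order even when items finish out of order.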

Map Options

// S stands for your workflow state type; here it is assumed to have a urls: string[] field.
const mapConfig = {
  items: (state: S) => state.urls,
  mapper: async (item: unknown, index: number, state: S) => {
    return await fetch(item as string).then((r) => r.text());
  },
  concurrency: 10,
  continueOnError: true,
  timeout: 30000,
  retry: {
    maxAttempts: 3,
    delay: 1000,
    backoff: 'exponential' as const,
  },
  filter: (item: unknown) => (item as string).startsWith('https'),
  transform: (item: unknown) => (item as string).trim(),
  onProgress: (progress) => {
    console.log(`${progress.completed}/${progress.total} (${progress.failed} failed)`);
  },
};
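
The `retry` and `timeout` options describe a common pattern: bound each attempt with a timer and wait longer between successive attempts. The sketch below shows one way that pattern can be written in plain TypeScript. It is an assumption-laden illustration, not the library's code: `RetryOptions`, `backoffDelay`, `withTimeout`, and `runWithRetry` are hypothetical names, delays are assumed to be in milliseconds, and the `'constant'` and `'linear'` backoff modes are added here only for contrast with `'exponential'`.

```typescript
// Hypothetical types and helpers; units and backoff modes are assumptions.
interface RetryOptions {
  maxAttempts: number;
  delay: number; // base delay, assumed to be milliseconds
  backoff: 'constant' | 'linear' | 'exponential';
}

// attempt is 1-based: the wait after the first failure uses attempt = 1.
function backoffDelay(attempt: number, opts: RetryOptions): number {
  switch (opts.backoff) {
    case 'constant':
      return opts.delay;
    case 'linear':
      return opts.delay * attempt;
    case 'exponential':
      return opts.delay * 2 ** (attempt - 1);
  }
}

// Rejects if the work does not settle within ms milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
    work.then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      }
    );
  });
}

async function runWithRetry<T>(
  task: () => Promise<T>,
  opts: RetryOptions,
  timeoutMs: number
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= opts.maxAttempts; attempt++) {
    try {
      return await withTimeout(task(), timeoutMs);
    } catch (error) {
      lastError = error;
      // Only wait if another attempt will follow.
      if (attempt < opts.maxAttempts) {
        const waitMs = backoffDelay(attempt, opts);
        await new Promise<void>((resolve) => setTimeout(resolve, waitMs));
      }
    }
  }
  throw lastError;
}
```

Under these assumptions, `maxAttempts: 3` with `delay: 1000` and exponential backoff waits 1000 ms after the first failure and 2000 ms after the second.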

Built-in Reducers

The package includes common reducer factories:

import {
  collect,
  sum,
  count,
  first,
  last,
  groupBy,
  partition,
  flatMap,
  stats,
} from '@cogitator-ai/workflows';

const asArray = collect<string>();

const total = sum();

const howMany = count();

const firstResult = first<string>();

const lastResult = last<string>();

const grouped = groupBy<{ category: string }, string>((result) => result.category);

const split = partition<number>((result) => result > 0.5);

const flattened = flatMap<string[]>();

const statistics = stats();
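
For readers unfamiliar with these reducer names, the sketch below shows what grouping, partitioning, and summary statistics typically compute over an array of map results. These are plain-TypeScript stand-ins with hypothetical names (`groupByKey`, `partitionBy`, `numericStats`); the return shapes are assumptions and may differ from what the library's factories produce.

```typescript
// Plain-TypeScript stand-ins; return shapes are assumptions, not the library's.

// Group results under the string key computed for each one.
function groupByKey<T>(results: T[], keyOf: (result: T) => string): Record<string, T[]> {
  const groups: Record<string, T[]> = {};
  for (const result of results) {
    const key = keyOf(result);
    if (!groups[key]) groups[key] = [];
    groups[key].push(result);
  }
  return groups;
}

// Split results into [matching, nonMatching] according to the predicate.
function partitionBy<T>(results: T[], predicate: (result: T) => boolean): [T[], T[]] {
  const matching: T[] = [];
  const nonMatching: T[] = [];
  for (const result of results) {
    (predicate(result) ? matching : nonMatching).push(result);
  }
  return [matching, nonMatching];
}

// Basic summary statistics; empty input yields NaN for min, max, and mean.
function numericStats(values: number[]) {
  const count = values.length;
  const sum = values.reduce((acc, v) => acc + v, 0);
  return {
    count,
    sum,
    min: count > 0 ? Math.min(...values) : NaN,
    max: count > 0 ? Math.max(...values) : NaN,
    mean: count > 0 ? sum / count : NaN,
  };
}
```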

Convenience Helpers

import { parallelMap, sequentialMap, batchedMap } from '@cogitator-ai/workflows';

const results = await parallelMap(
  state,
  (s) => s.urls,
  async (url) => fetch(url as string).then((r) => r.text())
);

const ordered = await sequentialMap(
  state,
  (s) => s.items,
  async (item) => processItem(item)
);

const batched = await batchedMap(
  state,
  (s) => s.items,
  async (item) => processItem(item),
  5
);
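
Assuming the trailing `5` passed to `batchedMap` is a batch size, batched mapping can be pictured as running one fixed-size chunk at a time and waiting for the whole chunk before starting the next. The sketch below illustrates that idea only; `mapInBatches` is a hypothetical name, and the library's actual batching behavior may differ.

```typescript
// Hypothetical sketch of batch-at-a-time mapping; not the library's batchedMap.
async function mapInBatches<T, R>(
  items: T[],
  mapper: (item: T, index: number) => Promise<R>,
  batchSize: number
): Promise<R[]> {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const results: R[] = [];
  for (let start = 0; start < items.length; start += batchSize) {
    const batch = items.slice(start, start + batchSize);
    // Items within a batch run concurrently; batches run one after another.
    const batchResults = await Promise.all(
      batch.map((item, offset) => mapper(item, start + offset))
    );
    results.push(...batchResults);
  }
  return results;
}
```

Compared with a worker pool, batching is simpler but leaves slots idle while the slowest item in each batch finishes.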

Sub-Workflows

Execute an entire workflow as a node within a parent workflow. Sub-workflows support state mapping between parent and child, error strategies, depth limits, and timeouts.

Basic Sub-Workflow

import { executeSubworkflow, subworkflowNode } from '@cogitator-ai/workflows';

interface ParentState {
  query: string;
  detailedAnalysis?: string;
}

interface ChildState {
  input: string;
  output?: string;
}

const subConfig = subworkflowNode<ParentState, ChildState>('deep-analysis', {
  workflow: analysisWorkflow,
  inputMapper: (parentState) => ({ input: parentState.query }),
  outputMapper: (childResult, parentState) => ({
    ...parentState,
    detailedAnalysis: childResult.state.output,
  }),
  onError: 'retry',
  retryConfig: { maxAttempts: 3, delay: 2000, backoff: 'exponential' },
  maxDepth: 5,
  timeout: 60000,
});
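
The mapping round trip in this configuration (parent state in, child state out, merged back into the parent) can be sketched independently of the library. In the sketch below, `SubworkflowSketch`, `ChildRun`, and `runSubworkflowSketch` are hypothetical names, `run` stands in for executing the child workflow, and the depth check is only one assumed way a `maxDepth` limit might be enforced.

```typescript
// Hypothetical sketch; `run` stands in for executing the child workflow.
interface ChildRun<C> {
  state: C;
}

interface SubworkflowSketch<P, C> {
  run: (input: C) => Promise<ChildRun<C>>;
  inputMapper: (parent: P) => C;
  outputMapper: (child: ChildRun<C>, parent: P) => P;
  maxDepth: number;
}

async function runSubworkflowSketch<P, C>(
  parent: P,
  spec: SubworkflowSketch<P, C>,
  depth = 0
): Promise<P> {
  // Assumed semantics: refuse to start a child once the nesting limit is reached.
  if (depth >= spec.maxDepth) {
    throw new Error(`Sub-workflow depth ${depth} reached the limit of ${spec.maxDepth}`);
  }
  const childInput = spec.inputMapper(parent); // parent state -> child input
  const childResult = await spec.run(childInput); // execute the child
  return spec.outputMapper(childResult, parent); // child result -> new parent state
}
```

Keeping the mappers pure (no mutation of the parent state) makes the child workflow reusable across parents with different state shapes.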

Error Strategies

| Strategy | Behavior |
| --- | --- |
| 'propagate' | Re-throw the child error to the parent (default) |
| 'catch' | Return a failed result without throwing |
| 'retry' | Retry with backoff according to retryConfig |
| 'ignore' | Return success even on failure |
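
The four strategies differ only in what happens after the child fails. The sketch below models that decision in plain TypeScript. It is illustrative rather than the library's logic: `runChild` and `ChildResult` are hypothetical names, backoff delays are omitted for brevity, and re-throwing once `'retry'` exhausts its attempts is an assumption.

```typescript
// Hypothetical model of the strategies; not the library's implementation.
type ErrorStrategy = 'propagate' | 'catch' | 'retry' | 'ignore';

interface ChildResult<S> {
  success: boolean;
  state?: S;
  error?: unknown;
}

async function runChild<S>(
  run: () => Promise<S>,
  strategy: ErrorStrategy,
  maxAttempts = 3
): Promise<ChildResult<S>> {
  // Only 'retry' gets more than one attempt (backoff delays omitted here).
  const attempts = strategy === 'retry' ? maxAttempts : 1;
  let lastError: unknown;

  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return { success: true, state: await run() };
    } catch (error) {
      lastError = error;
    }
  }

  switch (strategy) {
    case 'catch':
      // Report the failure as a value instead of throwing.
      return { success: false, error: lastError };
    case 'ignore':
      // Pretend the child succeeded; no child state is available.
      return { success: true };
    default:
      // 'propagate', and (by assumption) 'retry' once attempts are exhausted.
      throw lastError;
  }
}
```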

Simple Sub-Workflow

When parent and child share the same state shape:

import { simpleSubworkflow } from '@cogitator-ai/workflows';

const sub = simpleSubworkflow('validation', validationWorkflow, {
  timeout: 30000,
});

Nested Sub-Workflow

Extract a sub-key of parent state for the child:

import { nestedSubworkflow } from '@cogitator-ai/workflows';

interface AppState {
  user: UserState;
  orders: OrderState;
}

const sub = nestedSubworkflow<AppState, 'orders'>('process-orders', orderWorkflow, 'orders');

Conditional Sub-Workflow

import { conditionalSubworkflow } from '@cogitator-ai/workflows';

const sub = conditionalSubworkflow<ParentState, ChildState>('optional-step', {
  workflow: optionalWorkflow,
  condition: (state) => state.needsDeepAnalysis,
  inputMapper: (state) => ({ data: state.rawData }),
  outputMapper: (result, state) => ({ ...state, extra: result.state }),
});

Parallel Sub-Workflows

Execute multiple sub-workflows concurrently with aggregation.

Fan-Out / Fan-In

Distribute the same workflow across multiple inputs:

import { fanOutFanIn } from '@cogitator-ai/workflows';

interface BatchState {
  items: string[];
  results: Map<string, string>;
}

const config = fanOutFanIn<BatchState, { input: string; output?: string }>('process-batch', {
  workflow: itemWorkflow,
  getInputs: (state) =>
    state.items.map((item, i) => ({
      id: `item-${i}`,
      input: { input: item },
    })),
  aggregator: (results, state) => {
    const allResults = new Map<string, string>();
    for (const [id, result] of results) {
      allResults.set(id, result.state.output ?? '');
    }
    return { ...state, results: allResults };
  },
  concurrency: 4,
  continueOnError: true,
});

Scatter-Gather

Send work to different specialized workflows and gather all results:

import { scatterGather } from '@cogitator-ai/workflows';

const config = scatterGather<AnalysisState, { input: string; output?: string }>('multi-analysis', {
  workflows: new Map([
    ['sentiment', sentimentWorkflow],
    ['entities', entityWorkflow],
    ['topics', topicWorkflow],
  ]),
  inputMapper: (state, workflowId) => ({ input: state.text }),
  outputMapper: (results, state) => {
    const sentiment = results.get('sentiment')?.state.output;
    const entities = results.get('entities')?.state.output;
    const topics = results.get('topics')?.state.output;
    return { ...state, sentiment, entities, topics };
  },
  timeout: 30000,
});
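
Scatter-gather boils down to starting every specialized worker at once and keying the outputs by worker id. The sketch below shows that shape in plain TypeScript; `Runner` and `scatterGatherSketch` are hypothetical names, each runner stands in for a sub-workflow, and unlike the configuration above this version fails as a whole if any runner rejects.

```typescript
// Hypothetical sketch; each Runner stands in for one specialized sub-workflow.
type Runner<I, O> = (input: I) => Promise<O>;

async function scatterGatherSketch<I, O>(
  workflows: Map<string, Runner<I, O>>,
  inputFor: (workflowId: string) => I
): Promise<Map<string, O>> {
  // Scatter: start every runner immediately. Gather: wait for all of them.
  const pairs = await Promise.all(
    Array.from(workflows.entries()).map(
      async ([id, run]): Promise<[string, O]> => [id, await run(inputFor(id))]
    )
  );
  // Key each output by the id of the workflow that produced it.
  return new Map(pairs);
}
```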

Race

Execute multiple sub-workflows and take the first successful result:

import { raceSubworkflows } from '@cogitator-ai/workflows';

const result = await raceSubworkflows(
  parentState,
  [fastModelConfig, slowModelConfig, fallbackConfig],
  context
);

if (result) {
  console.log('Winner:', result.parentState);
}
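
"First successful result" differs from a plain `Promise.race`, which settles on the first completion even if it is a failure. The sketch below shows the first-success semantics in plain TypeScript. `firstSuccess` is a hypothetical name, resolving to `undefined` when every task fails is an assumption suggested by the `if (result)` check above, and losing tasks are not cancelled in this sketch.

```typescript
// Hypothetical sketch of first-success semantics; losers are not cancelled.
function firstSuccess<T>(tasks: Array<() => Promise<T>>): Promise<T | undefined> {
  return new Promise<T | undefined>((resolve) => {
    let pending = tasks.length;
    if (pending === 0) resolve(undefined);

    for (const task of tasks) {
      task().then(
        // The first fulfillment wins; later resolve calls are no-ops.
        (value) => resolve(value),
        // A failure only matters once every task has failed.
        () => {
          pending--;
          if (pending === 0) resolve(undefined);
        }
      );
    }
  });
}
```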

Fallback

Try sub-workflows in order until one succeeds:

import { fallbackSubworkflows } from '@cogitator-ai/workflows';

const result = await fallbackSubworkflows(
  parentState,
  [primaryConfig, secondaryConfig, lastResortConfig],
  context
);
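
In contrast to a race, a fallback chain runs one candidate at a time and starts the next only after the previous one fails. A minimal plain-TypeScript sketch of that ordering follows; `firstInOrder` is a hypothetical name, and returning `undefined` when every candidate fails is an assumption rather than documented behavior.

```typescript
// Hypothetical sketch of an ordered fallback chain.
async function firstInOrder<T>(tasks: Array<() => Promise<T>>): Promise<T | undefined> {
  for (const task of tasks) {
    try {
      // Later tasks are never started once one succeeds.
      return await task();
    } catch {
      // Swallow the failure and fall through to the next candidate.
    }
  }
  return undefined;
}
```

The trade-off against racing is latency versus cost: a fallback chain does no redundant work, but its worst-case latency is the sum of all attempts.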

Conditional Branching

Use WorkflowBuilder.addConditional for state-based routing:

import { WorkflowBuilder } from '@cogitator-ai/workflows';

const workflow = new WorkflowBuilder<ReviewState>('review')
  .initialState({ code: '', severity: 'low' })
  .addNode('analyze', analyzeNode)
  .addConditional(
    'route',
    (state) => {
      if (state.severity === 'critical') return 'emergency-fix';
      if (state.severity === 'high') return 'senior-review';
      return 'auto-approve';
    },
    { after: ['analyze'] }
  )
  .addNode('emergency-fix', emergencyNode, { after: ['route'] })
  .addNode('senior-review', seniorNode, { after: ['route'] })
  .addNode('auto-approve', approveNode, { after: ['route'] })
  .build();

For multi-target routing, return an array of node names; every listed branch runs:

builder.addConditional('fork', (state) => {
  const targets: string[] = ['log'];
  if (state.needsNotification) targets.push('notify');
  if (state.needsArchive) targets.push('archive');
  return targets;
});
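
Since a router may return either one node name or several, an engine has to normalize the result before scheduling. The sketch below shows one plausible way to do that, including a check that every target is a known node. `Router` and `resolveTargets` are hypothetical names, and the validation step is an assumption, not documented library behavior.

```typescript
// Hypothetical sketch: normalize a router's return value to a list of targets.
type Router<S> = (state: S) => string | string[];

function resolveTargets<S>(router: Router<S>, state: S, knownNodes: Set<string>): string[] {
  const result = router(state);
  const targets = Array.isArray(result) ? result : [result];
  for (const target of targets) {
    // Fail fast on a typo instead of silently skipping a branch.
    if (!knownNodes.has(target)) {
      throw new Error(`Router returned unknown node: ${target}`);
    }
  }
  return targets;
}
```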

Loops

Use WorkflowBuilder.addLoop for iterative refinement:

import { WorkflowBuilder, agentNode, functionNode } from '@cogitator-ai/workflows';

interface DraftState {
  draft: string;
  score: number;
  iteration: number;
}

const workflow = new WorkflowBuilder<DraftState>('iterative-writer')
  .initialState({ draft: '', score: 0, iteration: 0 })
  .addNode(
    'write',
    agentNode(writer, {
      inputMapper: (state) =>
        state.draft
          ? `Improve this draft:\n${state.draft}`
          : 'Write an initial draft about AI agents',
      stateMapper: (r) => ({ draft: r.output }),
    })
  )
  .addNode(
    'evaluate',
    agentNode(critic, {
      inputMapper: (state) => `Score 1-10:\n${state.draft}`,
      stateMapper: (r) => ({
        score: parseInt(r.output, 10) || 0,
        iteration: 0,
      }),
    }),
    { after: ['write'] }
  )
  .addLoop('quality-gate', {
    condition: (state) => state.score < 8 && state.iteration < 5,
    back: 'write',
    exit: 'publish',
    after: ['evaluate'],
  })
  .addNode(
    'publish',
    functionNode('publish', async (state) => {
      return { finalDraft: state.draft, iterations: state.iteration };
    })
  )
  .build();

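The loop node checks its condition after evaluate completes. If the condition is true, execution jumps back to write; if it is false, execution proceeds to publish. Note that the evaluate node's stateMapper above writes iteration: 0 on every pass, so the state.iteration < 5 guard never trips as written; increment the counter in your own mapper if you rely on it. The maxIterations option on WorkflowExecutor provides a safety net against infinite loops.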
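
The control flow of such a loop, including an external iteration cap, can be sketched without the library. In the sketch below, `LoopSpec` and `runLoop` are hypothetical names, `body` stands in for the write and evaluate nodes combined, and throwing when the cap is hit is an assumption about how a safety net like `maxIterations` might behave.

```typescript
// Hypothetical sketch of a condition-controlled loop with a hard iteration cap.
interface LoopSpec<S> {
  body: (state: S) => S; // stands in for the nodes inside the loop
  condition: (state: S) => boolean; // true means "go around again"
}

function runLoop<S>(
  initial: S,
  spec: LoopSpec<S>,
  maxIterations: number
): { state: S; iterations: number } {
  // The body always runs once before the condition is first checked.
  let state = spec.body(initial);
  let iterations = 1;

  while (spec.condition(state)) {
    // Safety net: stop even if the condition never becomes false.
    if (iterations >= maxIterations) {
      throw new Error(`Loop exceeded maxIterations (${maxIterations})`);
    }
    state = spec.body(state);
    iterations++;
  }
  return { state, iterations };
}

// Usage: the body must advance the counter for the iteration guard to matter.
const refined = runLoop(
  { score: 0, iteration: 0 },
  {
    body: (s) => ({ score: s.score + 3, iteration: s.iteration + 1 }),
    condition: (s) => s.score < 8 && s.iteration < 5,
  },
  10
);
console.log(refined.iterations, refined.state.score); // prints: 3 9
```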

Parallel Execution

Use WorkflowBuilder.addParallel to fan out, then converge with shared dependencies:

import { WorkflowBuilder, functionNode } from '@cogitator-ai/workflows';

const workflow = new WorkflowBuilder<MultiSearchState>('parallel-search')
  .initialState({ query: '' })
  .addNode('prepare', prepareNode)
  .addParallel('search', ['web', 'academic', 'github'], { after: ['prepare'] })
  .addNode('web', webSearchNode)
  .addNode('academic', academicSearchNode)
  .addNode('github', githubSearchNode)
  .addNode(
    'merge',
    functionNode('merge', async (state) => {
      return {
        webResults: state.webResults,
        academicResults: state.academicResults,
        githubResults: state.githubResults,
      };
    }),
    { after: ['web', 'academic', 'github'] }
  )
  .build();

The merge node has after: ['web', 'academic', 'github'] so it waits for all parallel branches to complete before executing.
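
The "wait for all branches, then merge" behavior can be illustrated with `Promise.all` and partial state updates. The sketch below is a plain-TypeScript analogy, not the executor's implementation: `Branch` and `runBranches` are hypothetical names, and merging updates in branch order (so later branches win on conflicting keys) is an assumption.

```typescript
// Hypothetical sketch: run branches concurrently, then merge their partial updates.
type Branch<S> = (state: S) => Promise<Partial<S>>;

async function runBranches<S extends object>(state: S, branches: Branch<S>[]): Promise<S> {
  // Every branch sees the same input state; none sees another branch's output.
  const updates = await Promise.all(branches.map((branch) => branch(state)));
  // Merge in branch order; on key conflicts the later branch wins.
  return updates.reduce<S>((merged, update) => ({ ...merged, ...update }), state);
}
```

Giving each branch its own output key, as the example above does with webResults, academicResults, and githubResults, avoids conflicts at merge time.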
