# Workflow Patterns
MapReduce, sub-workflows, parallel execution, conditional branching, loops, fan-out/fan-in, scatter-gather, race, and fallback patterns.
## Overview
Beyond basic DAG execution, Cogitator workflows support advanced composition patterns for data-parallel processing, nested workflows, and fault-tolerant execution strategies.
## MapReduce
Process collections of items in parallel (map phase), then combine results (reduce phase). Supports concurrency limits, partial failure handling, per-item retries, and streaming reduce.
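Conceptually, the map phase behaves like a concurrency-limited promise pool where each item carries its own retry budget. A minimal stand-alone sketch of that idea (`mapWithLimit` is illustrative, not the library's implementation):

```typescript
// Simplified model of a concurrency-limited map with per-item retries.
// Not the package's internals - just the underlying mechanics.
async function mapWithLimit<T, R>(
  items: T[],
  mapper: (item: T) => Promise<R>,
  concurrency: number,
  maxAttempts: number
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0; // shared cursor the workers pull from

  async function worker(): Promise<void> {
    while (next < items.length) {
      const i = next++;
      for (let attempt = 1; ; attempt++) {
        try {
          results[i] = await mapper(items[i]);
          break;
        } catch (err) {
          // exhaust the per-item retry budget before giving up
          if (attempt >= maxAttempts) throw err;
        }
      }
    }
  }

  // Spawn at most `concurrency` workers; each keeps pulling items.
  await Promise.all(
    Array.from({ length: Math.min(concurrency, items.length) }, () => worker())
  );
  return results;
}
```

Results land at their source index, so output order matches input order even though completion order varies.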
### Basic MapReduce
```ts
import { executeMapReduce, mapReduceNode, collect } from '@cogitator-ai/workflows';
import { readFile } from 'node:fs/promises';

interface DocState {
  documents: string[];
  summaries?: string[];
}

const result = await executeMapReduce<DocState, string, string[]>(
  { documents: ['doc1.txt', 'doc2.txt', 'doc3.txt'] },
  {
    name: 'summarize-docs',
    map: {
      items: (state) => state.documents,
      mapper: async (item) => {
        const content = await readFile(item as string, 'utf-8');
        return `Summary of ${item}: ${content.slice(0, 100)}...`;
      },
      concurrency: 5,
      continueOnError: true,
    },
    reduce: {
      ...collect<string>(),
    },
  }
);

console.log(`Processed: ${result.stats.successful}/${result.stats.total}`);
console.log('Summaries:', result.reduced);
```

### Map Options
```ts
// S stands for your workflow state type
const mapConfig = {
  items: (state: S) => state.urls,
  mapper: async (item: unknown, index: number, state: S) => {
    return await fetch(item as string).then((r) => r.text());
  },
  concurrency: 10,        // max concurrent mapper calls
  continueOnError: true,  // record failures instead of aborting the phase
  timeout: 30000,         // timeout in ms
  retry: {
    maxAttempts: 3,
    delay: 1000,
    backoff: 'exponential' as const,
  },
  filter: (item: unknown) => (item as string).startsWith('https'),
  transform: (item: unknown) => (item as string).trim(),
  onProgress: (progress) => {
    console.log(`${progress.completed}/${progress.total} (${progress.failed} failed)`);
  },
};
```

### Built-in Reducers
The package includes common reducer factories:
```ts
import {
  collect,
  sum,
  count,
  first,
  last,
  groupBy,
  partition,
  flatMap,
  stats,
} from '@cogitator-ai/workflows';

const asArray = collect<string>();
const total = sum();
const howMany = count();
const firstResult = first<string>();
const lastResult = last<string>();
const grouped = groupBy<{ category: string }, string>((result) => result.category);
const split = partition<number>((result) => result > 0.5);
const flattened = flatMap<string[]>();
const statistics = stats();
```

### Convenience Helpers
```ts
import { parallelMap, sequentialMap, batchedMap } from '@cogitator-ai/workflows';

// Run the mapper concurrently over every item
const results = await parallelMap(
  state,
  (s) => s.urls,
  async (url) => fetch(url as string).then((r) => r.text())
);

// Process items one at a time, preserving order
const ordered = await sequentialMap(
  state,
  (s) => s.items,
  async (item) => processItem(item)
);

// Process items in batches (final argument is the batch size)
const batched = await batchedMap(
  state,
  (s) => s.items,
  async (item) => processItem(item),
  5
);
```

## Sub-Workflows
Execute entire workflows as nodes within a parent workflow. Supports state mapping between parent and child, error strategies, depth limits, and timeout.
### Basic Sub-Workflow
```ts
import { executeSubworkflow, subworkflowNode } from '@cogitator-ai/workflows';

interface ParentState {
  query: string;
  detailedAnalysis?: string;
}

interface ChildState {
  input: string;
  output?: string;
}

const subConfig = subworkflowNode<ParentState, ChildState>('deep-analysis', {
  workflow: analysisWorkflow,
  inputMapper: (parentState) => ({ input: parentState.query }),
  outputMapper: (childResult, parentState) => ({
    ...parentState,
    detailedAnalysis: childResult.state.output,
  }),
  onError: 'retry',
  retryConfig: { maxAttempts: 3, delay: 2000, backoff: 'exponential' },
  maxDepth: 5,
  timeout: 60000,
});
```

### Error Strategies
| Strategy | Behavior |
|---|---|
| `'propagate'` | Re-throw the child error to the parent (default) |
| `'catch'` | Return a failed result without throwing |
| `'retry'` | Retry with backoff according to `retryConfig` |
| `'ignore'` | Return success even on failure |
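The four strategies can be summarized in a small stand-alone model (`runWithStrategy` is illustrative, not a package API; `run` stands in for executing the child workflow):

```typescript
// Conceptual model of the sub-workflow error strategies.
type Strategy = 'propagate' | 'catch' | 'retry' | 'ignore';
type Outcome<T> = { success: boolean; value?: T; error?: unknown };

async function runWithStrategy<T>(
  run: () => Promise<T>,
  strategy: Strategy,
  maxAttempts = 3
): Promise<Outcome<T>> {
  // Only 'retry' gets more than one attempt.
  const attempts = strategy === 'retry' ? maxAttempts : 1;
  let lastError: unknown;
  for (let i = 0; i < attempts; i++) {
    try {
      return { success: true, value: await run() };
    } catch (err) {
      lastError = err;
    }
  }
  if (strategy === 'propagate') throw lastError; // parent sees the error
  if (strategy === 'ignore') return { success: true }; // failure hidden
  return { success: false, error: lastError }; // 'catch', or 'retry' exhausted
}
```

Note that `'ignore'` reports success but carries no value, so downstream nodes must tolerate a missing child result.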
### Simple Sub-Workflow
When parent and child share the same state shape:
```ts
import { simpleSubworkflow } from '@cogitator-ai/workflows';

const sub = simpleSubworkflow('validation', validationWorkflow, {
  timeout: 30000,
});
```

### Nested Sub-Workflow
Extract a sub-key of parent state for the child:
```ts
import { nestedSubworkflow } from '@cogitator-ai/workflows';

interface AppState {
  user: UserState;
  orders: OrderState;
}

const sub = nestedSubworkflow<AppState, 'orders'>('process-orders', orderWorkflow, 'orders');
```

### Conditional Sub-Workflow
```ts
import { conditionalSubworkflow } from '@cogitator-ai/workflows';

const sub = conditionalSubworkflow<ParentState, ChildState>('optional-step', {
  workflow: optionalWorkflow,
  condition: (state) => state.needsDeepAnalysis,
  inputMapper: (state) => ({ data: state.rawData }),
  outputMapper: (result, state) => ({ ...state, extra: result.state }),
});
```

### Parallel Sub-Workflows
Execute multiple sub-workflows concurrently with aggregation.
### Fan-Out / Fan-In
Distribute the same workflow across multiple inputs:
```ts
import { fanOutFanIn } from '@cogitator-ai/workflows';

interface BatchState {
  items: string[];
  results: Map<string, string>;
}

const config = fanOutFanIn<BatchState, { input: string; output?: string }>('process-batch', {
  workflow: itemWorkflow,
  getInputs: (state) =>
    state.items.map((item, i) => ({
      id: `item-${i}`,
      input: { input: item },
    })),
  aggregator: (results, state) => {
    const allResults = new Map<string, string>();
    for (const [id, result] of results) {
      allResults.set(id, result.state.output ?? '');
    }
    return { ...state, results: allResults };
  },
  concurrency: 4,
  continueOnError: true,
});
```

### Scatter-Gather
Send work to different specialized workflows and gather all results:
```ts
import { scatterGather } from '@cogitator-ai/workflows';

const config = scatterGather<AnalysisState, { input: string; output?: string }>('multi-analysis', {
  workflows: new Map([
    ['sentiment', sentimentWorkflow],
    ['entities', entityWorkflow],
    ['topics', topicWorkflow],
  ]),
  inputMapper: (state, workflowId) => ({ input: state.text }),
  outputMapper: (results, state) => {
    const sentiment = results.get('sentiment')?.state.output;
    const entities = results.get('entities')?.state.output;
    const topics = results.get('topics')?.state.output;
    return { ...state, sentiment, entities, topics };
  },
  timeout: 30000,
});
```

### Race
Execute multiple sub-workflows and take the first successful result:
```ts
import { raceSubworkflows } from '@cogitator-ai/workflows';

const result = await raceSubworkflows(
  parentState,
  [fastModelConfig, slowModelConfig, fallbackConfig],
  context
);
if (result) {
  console.log('Winner:', result.parentState);
}
```

### Fallback
Try sub-workflows in order until one succeeds:
```ts
import { fallbackSubworkflows } from '@cogitator-ai/workflows';

const result = await fallbackSubworkflows(
  parentState,
  [primaryConfig, secondaryConfig, lastResortConfig],
  context
);
```

## Conditional Branching
Use `WorkflowBuilder.addConditional` for state-based routing:
```ts
const workflow = new WorkflowBuilder<ReviewState>('review')
  .initialState({ code: '', severity: 'low' })
  .addNode('analyze', analyzeNode)
  .addConditional(
    'route',
    (state) => {
      if (state.severity === 'critical') return 'emergency-fix';
      if (state.severity === 'high') return 'senior-review';
      return 'auto-approve';
    },
    { after: ['analyze'] }
  )
  .addNode('emergency-fix', emergencyNode, { after: ['route'] })
  .addNode('senior-review', seniorNode, { after: ['route'] })
  .addNode('auto-approve', approveNode, { after: ['route'] })
  .build();
```

Multi-target routing returns multiple branches:
```ts
builder.addConditional('fork', (state) => {
  const targets: string[] = ['log'];
  if (state.needsNotification) targets.push('notify');
  if (state.needsArchive) targets.push('archive');
  return targets;
});
```

## Loops
Use `WorkflowBuilder.addLoop` for iterative refinement:
```ts
interface DraftState {
  draft: string;
  score: number;
  iteration: number;
}

const workflow = new WorkflowBuilder<DraftState>('iterative-writer')
  .initialState({ draft: '', score: 0, iteration: 0 })
  .addNode(
    'write',
    agentNode(writer, {
      inputMapper: (state) =>
        state.draft
          ? `Improve this draft:\n${state.draft}`
          : 'Write an initial draft about AI agents',
      stateMapper: (r) => ({ draft: r.output }),
    })
  )
  .addNode(
    'evaluate',
    agentNode(critic, {
      inputMapper: (state) => `Score 1-10:\n${state.draft}`,
      stateMapper: (r, state) => ({
        score: parseInt(r.output, 10) || 0,
        // Advance the loop counter; if it stayed at 0 the
        // `iteration < 5` guard below could never trip. (Assumes
        // stateMapper receives the current state as its second argument.)
        iteration: state.iteration + 1,
      }),
    }),
    { after: ['write'] }
  )
  .addLoop('quality-gate', {
    condition: (state) => state.score < 8 && state.iteration < 5,
    back: 'write',
    exit: 'publish',
    after: ['evaluate'],
  })
  .addNode(
    'publish',
    functionNode('publish', async (state) => {
      return { finalDraft: state.draft, iterations: state.iteration };
    })
  )
  .build();
```

The loop checks the condition after `evaluate` completes. If true, execution jumps back to `write`; if false, it proceeds to `publish`. The `maxIterations` option on `WorkflowExecutor` provides a safety net against infinite loops.
## Parallel Execution
Use `WorkflowBuilder.addParallel` to fan out, then converge with shared dependencies:
```ts
const workflow = new WorkflowBuilder<MultiSearchState>('parallel-search')
  .initialState({ query: '' })
  .addNode('prepare', prepareNode)
  .addParallel('search', ['web', 'academic', 'github'], { after: ['prepare'] })
  .addNode('web', webSearchNode)
  .addNode('academic', academicSearchNode)
  .addNode('github', githubSearchNode)
  .addNode(
    'merge',
    functionNode('merge', async (state) => {
      return {
        webResults: state.webResults,
        academicResults: state.academicResults,
        githubResults: state.githubResults,
      };
    }),
    { after: ['web', 'academic', 'github'] }
  )
  .build();
```

The `merge` node has `after: ['web', 'academic', 'github']`, so it waits for all parallel branches to complete before executing.