Workflows
DAG-based orchestration for multi-step agent tasks with parallel execution, conditional branching, and error handling.
Overview
Workflows let you compose multiple agents, tools, and functions into a Directed Acyclic Graph (DAG) where each node processes data and passes results downstream. The workflow engine handles execution order, parallelism, state management, and checkpointing automatically.
```
[fetch-data] ──→ [analyze] ──→ [summarize]
                    │
                    └────→ [visualize]   (parallel branch)
```

Use workflows when you need deterministic, multi-step pipelines with well-defined dependencies between stages.
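The engine derives execution order from node dependencies alone: any node whose dependencies have all completed can run in the same parallel wave as its siblings. A minimal, self-contained sketch of that idea in plain TypeScript (this is an illustration of the scheduling concept, not the library's actual scheduler):

```typescript
// A DAG as a map from node name to the names it depends on.
type Dag = Record<string, string[]>;

// Resolve the DAG into "waves": each wave contains nodes whose
// dependencies are all satisfied, so they can run in parallel.
function executionWaves(dag: Dag): string[][] {
  const done = new Set<string>();
  const waves: string[][] = [];
  while (done.size < Object.keys(dag).length) {
    const ready = Object.keys(dag).filter(
      (n) => !done.has(n) && dag[n].every((d) => done.has(d))
    );
    if (ready.length === 0) throw new Error('cycle detected in DAG');
    waves.push(ready);
    ready.forEach((n) => done.add(n));
  }
  return waves;
}

// The diagram above: summarize and visualize both depend on analyze,
// so they land in the same wave and run concurrently.
const waves = executionWaves({
  'fetch-data': [],
  analyze: ['fetch-data'],
  summarize: ['analyze'],
  visualize: ['analyze'],
});
// → [['fetch-data'], ['analyze'], ['summarize', 'visualize']]
```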
When to Use Workflows vs Swarms
| | Workflows | Swarms |
|---|---|---|
| Structure | Fixed DAG, explicit edges | Dynamic, agents decide routing |
| Control | You define execution order | Agents negotiate handoffs |
| Best for | ETL pipelines, CI/CD, approvals | Creative tasks, open-ended research |
| Error handling | Retry, compensation, DLQ | Agent-level recovery |
| Determinism | High | Low |
Quick Example
```typescript
import { Cogitator, Agent } from '@cogitator-ai/core';
import {
  WorkflowBuilder,
  WorkflowExecutor,
  agentNode,
  functionNode,
} from '@cogitator-ai/workflows';

interface ResearchState {
  topic: string;
  searchResults?: string;
  summary?: string;
}

const researcher = new Agent({
  name: 'researcher',
  instructions: 'Search for information on the given topic.',
});

const writer = new Agent({
  name: 'writer',
  instructions: 'Write a concise summary from the provided research.',
});

const workflow = new WorkflowBuilder<ResearchState>('research-pipeline')
  .initialState({ topic: '' })
  .addNode(
    'search',
    agentNode(researcher, {
      inputMapper: (state) => `Research: ${state.topic}`,
      stateMapper: (result) => ({ searchResults: result.output }),
    })
  )
  .addNode(
    'summarize',
    agentNode(writer, {
      inputMapper: (state) => `Summarize:\n${state.searchResults}`,
      stateMapper: (result) => ({ summary: result.output }),
    }),
    { after: ['search'] }
  )
  .build();

const cogitator = new Cogitator({
  llm: {
    defaultProvider: 'openai',
    providers: {
      openai: { type: 'openai', apiKey: process.env.OPENAI_API_KEY!, model: 'gpt-4o' },
    },
  },
});

const executor = new WorkflowExecutor(cogitator);
const result = await executor.execute(workflow, { topic: 'WebGPU graphics API' });
console.log(result.state.summary);
```

Architecture
The workflow system is composed of several layers:
- WorkflowBuilder -- Fluent API for defining nodes, edges, conditionals, loops, and parallel branches. Validates the DAG at build time.
- WorkflowExecutor -- Runs the workflow with configurable concurrency, checkpointing, and streaming events.
- WorkflowScheduler -- Internal component that resolves execution order from the DAG topology and manages parallel task scheduling.
- Node factories -- agentNode, toolNode, functionNode, customNode for creating typed workflow nodes.
- Saga module -- Retry policies, circuit breakers, compensation management, dead letter queues, and idempotency.
- Triggers -- Cron schedules, webhooks, and event-driven workflow activation.
- Patterns -- MapReduce, sub-workflows, fan-out/fan-in, scatter-gather, race, and fallback patterns.
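To make the saga layer concrete, here is a self-contained sketch of a retry policy with exponential backoff, the kind of behavior the saga module applies around failing nodes. The helper name and signature are hypothetical for illustration, not the package's actual `executeWithRetry` API:

```typescript
// Hypothetical retry helper (illustrative only, not the real executeWithRetry).
async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 50
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Exponential backoff: 50ms, 100ms, 200ms, ...
      await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** attempt));
    }
  }
  // All attempts exhausted; surface the last failure.
  throw lastError;
}

// A flaky task that fails twice, then succeeds on the third attempt.
let calls = 0;
const outcome = await retryWithBackoff(async () => {
  calls++;
  if (calls < 3) throw new Error('transient failure');
  return 'ok';
});
// → outcome === 'ok' after 3 attempts
```

A circuit breaker builds on the same idea but stops calling the task entirely after repeated failures, letting the dead letter queue capture work that cannot complete.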
Package Exports
Everything is exported from @cogitator-ai/workflows:
```typescript
import {
  WorkflowBuilder,
  WorkflowExecutor,
  WorkflowScheduler,
  agentNode,
  toolNode,
  functionNode,
  customNode,
  executeWithRetry,
  CircuitBreaker,
  CompensationManager,
  InMemoryDLQ,
  createTriggerManager,
  cronTrigger,
  webhookTrigger,
  executeMapReduce,
  subworkflowNode,
  humanNode,
} from '@cogitator-ai/workflows';
```

Next Steps
- WorkflowBuilder -- construct workflows with the fluent API
- Node Types -- agent, tool, function, and custom nodes
- Execution -- run workflows, stream events, checkpoint and resume
- Sagas & Compensation -- retry, circuit breakers, rollback
- Scheduling & Triggers -- cron, webhooks, events
- Patterns -- MapReduce, sub-workflows, parallel execution