Cogitator
Workflows

Workflows

DAG-based orchestration for multi-step agent tasks with parallel execution, conditional branching, and error handling.

Overview

Workflows let you compose multiple agents, tools, and functions into a Directed Acyclic Graph (DAG) where each node processes data and passes results downstream. The workflow engine handles execution order, parallelism, state management, and checkpointing automatically.

  [fetch-data] ──→ [analyze] ──→ [summarize]

                       └──→ [visualize] (parallel branch)

Use workflows when you need deterministic, multi-step pipelines with well-defined dependencies between stages.

When to Use Workflows vs Swarms

WorkflowsSwarms
StructureFixed DAG, explicit edgesDynamic, agents decide routing
ControlYou define execution orderAgents negotiate handoffs
Best forETL pipelines, CI/CD, approvalsCreative tasks, open-ended research
Error handlingRetry, compensation, DLQAgent-level recovery
DeterminismHighLow

Quick Example

import { Cogitator, Agent } from '@cogitator-ai/core';
import {
  WorkflowBuilder,
  WorkflowExecutor,
  agentNode,
  functionNode,
} from '@cogitator-ai/workflows';

interface ResearchState {
  topic: string;
  searchResults?: string;
  summary?: string;
}

const researcher = new Agent({
  name: 'researcher',
  instructions: 'Search for information on the given topic.',
});

const writer = new Agent({
  name: 'writer',
  instructions: 'Write a concise summary from the provided research.',
});

const workflow = new WorkflowBuilder<ResearchState>('research-pipeline')
  .initialState({ topic: '' })
  .addNode(
    'search',
    agentNode(researcher, {
      inputMapper: (state) => `Research: ${state.topic}`,
      stateMapper: (result) => ({ searchResults: result.output }),
    })
  )
  .addNode(
    'summarize',
    agentNode(writer, {
      inputMapper: (state) => `Summarize:\n${state.searchResults}`,
      stateMapper: (result) => ({ summary: result.output }),
    }),
    { after: ['search'] }
  )
  .build();

const cogitator = new Cogitator({
  llm: {
    defaultProvider: 'openai',
    providers: {
      openai: { type: 'openai', apiKey: process.env.OPENAI_API_KEY!, model: 'gpt-4o' },
    },
  },
});

const executor = new WorkflowExecutor(cogitator);
const result = await executor.execute(workflow, { topic: 'WebGPU graphics API' });

console.log(result.state.summary);

Architecture

The workflow system is composed of several layers:

  • WorkflowBuilder -- Fluent API for defining nodes, edges, conditionals, loops, and parallel branches. Validates the DAG at build time.
  • WorkflowExecutor -- Runs the workflow with configurable concurrency, checkpointing, and streaming events.
  • WorkflowScheduler -- Internal component that resolves execution order from the DAG topology and manages parallel task scheduling.
  • Node factories -- agentNode, toolNode, functionNode, customNode for creating typed workflow nodes.
  • Saga module -- Retry policies, circuit breakers, compensation management, dead letter queues, and idempotency.
  • Triggers -- Cron schedules, webhooks, and event-driven workflow activation.
  • Patterns -- MapReduce, sub-workflows, fan-out/fan-in, scatter-gather, race, and fallback patterns.

Package Exports

Everything is exported from @cogitator-ai/workflows:

import {
  WorkflowBuilder,
  WorkflowExecutor,
  WorkflowScheduler,
  agentNode,
  toolNode,
  functionNode,
  customNode,
  executeWithRetry,
  CircuitBreaker,
  CompensationManager,
  InMemoryDLQ,
  createTriggerManager,
  cronTrigger,
  webhookTrigger,
  executeMapReduce,
  subworkflowNode,
  humanNode,
} from '@cogitator-ai/workflows';

Next Steps

On this page