
Node Types

Built-in workflow node factories for agents, tools, functions, custom logic, and human-in-the-loop interactions.

Overview

Every node in a workflow is a WorkflowNode<S> with a name and an async fn that receives a NodeContext<S> and returns a NodeResult<S>. Instead of writing these by hand, use the built-in node factories.

import { agentNode, toolNode, functionNode, customNode } from '@cogitator-ai/workflows';

Node Context

Every node function receives a NodeContext<S>:

interface NodeContext<S> {
  state: S;
  nodeId: string;
  workflowId: string;
  step: number;
  input?: unknown;
  reportProgress: (progress: number) => void;
}
  • state -- Current workflow state snapshot
  • input -- Output from upstream dependency nodes (single value if one dep, array if multiple)
  • reportProgress -- Report 0-100 progress for streaming events
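
The shape rule for input (a single value for one dependency, an array for several) can be sketched as below. Both helpers, resolveInput and inputAsArray, are hypothetical names used for illustration and are not part of the package. Note the ambiguity the rule leaves: a single upstream node that itself outputs an array is indistinguishable from several upstream nodes.

```typescript
// Hypothetical helper: collapses upstream outputs into the shape described
// for ctx.input (one dependency -> its value, several -> an array).
function resolveInput(upstreamOutputs: unknown[]): unknown {
  if (upstreamOutputs.length === 0) return undefined; // no dependencies
  if (upstreamOutputs.length === 1) return upstreamOutputs[0];
  return upstreamOutputs;
}

// Hypothetical helper for node code that prefers a uniform array.
function inputAsArray(input: unknown): unknown[] {
  if (input === undefined) return [];
  return Array.isArray(input) ? input : [input];
}
```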

Node Result

Every node function returns a NodeResult<S>:

interface NodeResult<S> {
  state?: Partial<S>;
  output?: unknown;
  next?: string | string[];
}
  • state -- Partial state updates merged into the workflow state
  • output -- Value passed to downstream nodes via ctx.input
  • next -- Override automatic edge routing with explicit next node(s)
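
As a rough illustration of how partial state updates might be folded into the workflow state, the sketch below uses a shallow merge. Whether the executor merges shallowly or deeply is an assumption here, and applyWorkflowResult and NodeResultLike are illustrative names, not library exports.

```typescript
// Local stand-in mirroring the NodeResult<S> shape shown above.
interface NodeResultLike<S> {
  state?: Partial<S>;
  output?: unknown;
  next?: string | string[];
}

// Assumption: a shallow merge. Keys present in result.state overwrite the
// current values; keys that are absent are left untouched.
function applyWorkflowResult<S extends object>(current: S, result: NodeResultLike<S>): S {
  if (!result.state) return current; // nothing to merge
  return { ...current, ...result.state };
}
```

Under this assumption a node only needs to return the keys it changed; the rest of the state carries over unmodified.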

agentNode

Runs a Cogitator Agent as a workflow node. The executor injects a cogitator instance into the context automatically.

import { Agent } from '@cogitator-ai/core';
import { agentNode } from '@cogitator-ai/workflows';

const researcher = new Agent({
  name: 'researcher',
  instructions: 'You are a research assistant.',
});

const node = agentNode(researcher, {
  inputMapper: (state) => `Research the topic: ${state.topic}`,
  stateMapper: (result) => ({ findings: result.output }),
  runOptions: { temperature: 0.3 },
});

AgentNodeOptions

| Option | Type | Description |
| --- | --- | --- |
| inputMapper | (state, input?) => string | Convert state to agent input string. Defaults to JSON.stringify(state) |
| stateMapper | (result: RunResult) => Partial<S> | Map agent output to state updates |
| runOptions | Partial<RunOptions> | Override temperature, maxTokens, etc. |

Behavior

  1. inputMapper converts the current state into a prompt string
  2. The agent runs via cogitator.run(agent, { input, ...runOptions })
  3. stateMapper extracts state updates from the RunResult
  4. The agent's raw output is passed downstream via output

Registered on a WorkflowBuilder instance (here called builder), the node looks like this:

builder.addNode(
  'research',
  agentNode(researcher, {
    inputMapper: (state) => state.query,
    stateMapper: (result) => ({ answer: result.output }),
  })
);
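
The four behavior steps listed above can be sketched without the library, using a stub in place of a real Cogitator instance. Everything here is a stand-in under stated assumptions: runAgentStep, CogitatorLike, RunResultLike, and echoCogitator are illustrative names, and the real RunResult and RunOptions types carry more fields than shown.

```typescript
// Local stand-ins for the library types; the real definitions may differ.
interface RunResultLike { output: string }
interface RunOptionsLike { temperature?: number; maxTokens?: number }
interface AgentLike { name: string }
interface CogitatorLike {
  run(agent: AgentLike, options: { input: string } & RunOptionsLike): Promise<RunResultLike>;
}
interface AgentStepOptions<S> {
  inputMapper?: (state: S, input?: unknown) => string;
  stateMapper?: (result: RunResultLike) => Partial<S>;
  runOptions?: RunOptionsLike;
}

async function runAgentStep<S>(
  cogitator: CogitatorLike,
  agent: AgentLike,
  state: S,
  options: AgentStepOptions<S> = {},
  input?: unknown
): Promise<{ state?: Partial<S>; output: unknown }> {
  // 1. Build the prompt; fall back to JSON.stringify(state) as documented.
  const prompt = options.inputMapper
    ? options.inputMapper(state, input)
    : JSON.stringify(state);
  // 2. Run the agent with the prompt plus any run option overrides.
  const result = await cogitator.run(agent, { input: prompt, ...options.runOptions });
  // 3. Derive state updates; 4. pass the raw output downstream.
  return { state: options.stateMapper?.(result), output: result.output };
}

// Stub that echoes the prompt, so the sketch runs without a model.
const echoCogitator: CogitatorLike = {
  run: async (_agent, options) => ({ output: `echo: ${options.input}` }),
};
```

Swapping echoCogitator for a real instance is the only change the sketch would need in principle, though the actual agentNode factory handles this wiring for you.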

toolNode

Runs a single Tool directly without an agent wrapper.

import { tool } from '@cogitator-ai/core';
import { toolNode } from '@cogitator-ai/workflows';
import { z } from 'zod';

const fetchUrl = tool({
  name: 'fetch_url',
  description: 'Fetch content from a URL',
  parameters: z.object({ url: z.string() }),
  execute: async ({ url }) => {
    const res = await fetch(url);
    return res.text();
  },
});

const node = toolNode(fetchUrl, {
  argsMapper: (state) => ({ url: state.targetUrl }),
  stateMapper: (result) => ({ htmlContent: result as string }),
});

ToolNodeOptions

| Option | Type | Required | Description |
| --- | --- | --- | --- |
| argsMapper | (state, input?) => TArgs | Yes | Map state to tool arguments |
| stateMapper | (result) => Partial<S> | No | Map tool result to state updates |

Behavior

  1. argsMapper builds typed arguments from state
  2. The tool's execute function runs with those arguments
  3. stateMapper extracts state updates from the result
  4. The raw tool result is passed downstream
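
The same flow can be sketched for tools. This is a simplified model under stated assumptions, not the factory's source: ToolLike, ToolStepOptions, runToolStep, and the upperCase tool are illustrative, and a real tool's execute may accept more than the arguments object.

```typescript
// Local stand-in for a tool: just a name and an execute function.
interface ToolLike<TArgs, TResult> {
  name: string;
  execute(args: TArgs): Promise<TResult>;
}
interface ToolStepOptions<S, TArgs, TResult> {
  argsMapper: (state: S, input?: unknown) => TArgs;
  stateMapper?: (result: TResult) => Partial<S>;
}

async function runToolStep<S, TArgs, TResult>(
  tool: ToolLike<TArgs, TResult>,
  state: S,
  options: ToolStepOptions<S, TArgs, TResult>,
  input?: unknown
): Promise<{ state?: Partial<S>; output: TResult }> {
  const args = options.argsMapper(state, input); // 1. build typed arguments
  const result = await tool.execute(args);       // 2. run the tool directly
  // 3. derive state updates; 4. pass the raw result downstream
  return { state: options.stateMapper?.(result), output: result };
}

// Hypothetical tool used only to exercise the sketch.
const upperCase: ToolLike<{ text: string }, string> = {
  name: 'upper_case',
  execute: async ({ text }) => text.toUpperCase(),
};
```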

functionNode

Runs an arbitrary async function. The simplest node type for data transformation, validation, or any custom logic.

import { functionNode } from '@cogitator-ai/workflows';

const node = functionNode(
  'normalize',
  async (state, input) => {
    const data = state.rawData ?? '';
    return data.trim().toLowerCase().replace(/\s+/g, ' ');
  },
  {
    stateMapper: (output) => ({ cleanData: output as string }),
  }
);

Signature

function functionNode<S, O>(
  name: string,
  fn: (state: S, input?: unknown) => Promise<O>,
  options?: { stateMapper?: (output: unknown) => Partial<S> }
): WorkflowNode<S>;

The function receives the full state and optional input from upstream nodes. Return any value -- it becomes the node's output.
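
To make the relationship between the signature and the WorkflowNode it returns concrete, here is one way such a factory could wrap fn. It is a sketch only: sketchFunctionNode and the *Like interfaces are local stand-ins, and the real factory may do more (error handling, for example).

```typescript
// Local stand-ins mirroring the interfaces shown earlier on this page.
interface NodeContextLike<S> {
  state: S;
  nodeId: string;
  workflowId: string;
  step: number;
  input?: unknown;
  reportProgress: (progress: number) => void;
}
interface NodeResultShape<S> {
  state?: Partial<S>;
  output?: unknown;
  next?: string | string[];
}
interface WorkflowNodeLike<S> {
  name: string;
  fn: (ctx: NodeContextLike<S>) => Promise<NodeResultShape<S>>;
}

function sketchFunctionNode<S, O>(
  name: string,
  fn: (state: S, input?: unknown) => Promise<O>,
  options?: { stateMapper?: (output: unknown) => Partial<S> }
): WorkflowNodeLike<S> {
  return {
    name,
    fn: async (ctx) => {
      // Only state and input are forwarded; the rest of ctx stays hidden.
      const output = await fn(ctx.state, ctx.input);
      // The return value becomes the node output; stateMapper is optional.
      return { state: options?.stateMapper?.(output), output };
    },
  };
}
```

The wrapper shows why functionNode suits simple transformations: the function never sees nodeId, step, or reportProgress, which is what customNode is for.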

Examples

Validation:

const validate = functionNode('validate', async (state) => {
  if (!state.email?.includes('@')) {
    throw new Error('Invalid email');
  }
  return { valid: true };
});

Data transformation:

const transform = functionNode(
  'transform',
  async (state) => {
    return state.items.filter((item) => item.score > 0.5);
  },
  {
    stateMapper: (output) => ({ filteredItems: output as Item[] }),
  }
);

customNode

Gives you full control over the node context and result. Use it when you need access to workflowId, step, or progress reporting, or when you want to set next routing explicitly.

import { customNode } from '@cogitator-ai/workflows';

const node = customNode('complex-step', async (ctx) => {
  ctx.reportProgress(0);

  const data = await fetchExternalAPI(ctx.state.apiEndpoint);
  ctx.reportProgress(50);

  const processed = await processData(data);
  ctx.reportProgress(100);

  return {
    state: { processedData: processed, lastStep: ctx.nodeId },
    output: processed,
    next: processed.needsReview ? 'review' : 'finalize',
  };
});

Signature

function customNode<S>(
  name: string,
  fn: (ctx: NodeContext<S>) => Promise<NodeResult<S>>
): WorkflowNode<S>;

The next field in the result lets you override the DAG routing dynamically at runtime.
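
A minimal sketch of that override rule, assuming the executor falls back to the statically declared edges whenever next is absent. resolveNextNodes is a hypothetical helper, not a library function.

```typescript
// Hypothetical helper: picks the nodes to run after the current one.
// An explicit `next` wins; otherwise the static edges are used.
function resolveNextNodes(
  result: { next?: string | string[] },
  staticEdges: string[]
): string[] {
  if (result.next === undefined) return staticEdges;
  // Normalize the single-string form to an array.
  return Array.isArray(result.next) ? result.next : [result.next];
}
```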


humanNode

Pauses the workflow for human approval or input. Part of the human-in-the-loop system.

import { humanNode, approvalNode, choiceNode, inputNode } from '@cogitator-ai/workflows';

const approval = approvalNode({
  name: 'manager-approval',
  prompt: (state) => `Approve expense of $${state.amount}?`,
  approvers: ['manager@company.com'],
  timeout: 24 * 60 * 60 * 1000,
});

const choice = choiceNode({
  name: 'select-action',
  prompt: (state) => `How should we handle: ${state.issue}?`,
  choices: ['fix', 'ignore', 'escalate'],
});

const freeform = inputNode({
  name: 'get-feedback',
  prompt: (state) => `Provide feedback on: ${state.draft}`,
});

These nodes integrate with ApprovalStore and ApprovalNotifier for persistence and notification delivery. See the human-in-the-loop module for store implementations (InMemoryApprovalStore, FileApprovalStore).


Wiring Nodes into Workflows

All node factories return WorkflowNode<S> objects. Pass them directly to WorkflowBuilder.addNode:

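import { WorkflowBuilder, agentNode, toolNode, functionNode } from '@cogitator-ai/workflows';

// fetchUrl is the tool from the toolNode section; analyst is an Agent and
// MyState is the workflow state type, both defined elsewhere.
const workflow = new WorkflowBuilder<MyState>('pipeline')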
  .initialState({ url: '' })
  .addNode(
    'fetch',
    toolNode(fetchUrl, {
      argsMapper: (state) => ({ url: state.url }),
      stateMapper: (result) => ({ content: result as string }),
    })
  )
  .addNode(
    'analyze',
    agentNode(analyst, {
      inputMapper: (state) => state.content!,
      stateMapper: (result) => ({ analysis: result.output }),
    }),
    { after: ['fetch'] }
  )
  .addNode(
    'format',
    functionNode('format', async (state) => {
      return `# Analysis\n\n${state.analysis}`;
    }),
    { after: ['analyze'] }
  )
  .build();
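
The pipeline above relies on a MyState type that is not shown. One shape consistent with how the three nodes read and write state is sketched below; the field types are assumptions inferred from the example (in particular, that the agent's output is a string).

```typescript
// Assumed state shape for the pipeline example; not taken from the library.
interface MyState {
  url: string;        // set in initialState, read by the 'fetch' node
  content?: string;   // written by 'fetch', read by 'analyze'
  analysis?: string;  // written by 'analyze', read by 'format'
}

// Matches the initialState call: only the required field is present.
const initialPipelineState: MyState = { url: '' };
```

Marking content and analysis as optional reflects that they do not exist until their producing node has run, which is why the example uses a non-null assertion on state.content.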
