Node Types
Built-in workflow node factories for agents, tools, functions, custom logic, and human-in-the-loop interactions.
Overview
Every node in a workflow is a WorkflowNode<S> with a name and an async fn that receives a NodeContext<S> and returns a NodeResult<S>. Instead of writing these by hand, use the built-in node factories.
import { agentNode, toolNode, functionNode, customNode } from '@cogitator-ai/workflows';
Node Context
Every node function receives a NodeContext<S>:
interface NodeContext<S> {
  state: S;
  nodeId: string;
  workflowId: string;
  step: number;
  input?: unknown;
  reportProgress: (progress: number) => void;
}
- state -- Current workflow state snapshot
- input -- Output from upstream dependency nodes (a single value with one dependency, an array with several)
- reportProgress -- Report progress from 0 to 100 for streaming events
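Because input is a single value with one upstream dependency and an array with several, node code that wants uniform handling may normalize it first. The sketch below is only an illustration: toInputList and clampProgress are hypothetical helpers, not library exports, and toInputList cannot tell a single upstream node that outputs an array apart from several upstream nodes.

```typescript
// Hypothetical helper: normalize ctx.input into a list of upstream outputs.
// Assumption: an undefined input means the node has no upstream dependencies.
function toInputList(input: unknown): unknown[] {
  if (input === undefined) return [];
  return Array.isArray(input) ? input : [input];
}

// Hypothetical helper: clamp a value into the documented 0-100 range
// before handing it to ctx.reportProgress.
function clampProgress(progress: number): number {
  return Math.min(100, Math.max(0, progress));
}
```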
Node Result
Every node function returns a NodeResult<S>:
interface NodeResult<S> {
  state?: Partial<S>;
  output?: unknown;
  next?: string | string[];
}
- state -- Partial state updates merged into the workflow state
- output -- Value passed to downstream nodes via ctx.input
- next -- Override automatic edge routing with explicit next node(s)
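To make the three fields concrete, here is a minimal sketch of how a runner could apply a NodeResult. The applyResult function, the Applied type, and the shallow-merge semantics are assumptions for illustration; the actual executor may merge state differently.

```typescript
interface NodeResult<S> {
  state?: Partial<S>;
  output?: unknown;
  next?: string | string[];
}

interface Applied<S> {
  state: S;
  output: unknown;
  next: string[] | undefined; // undefined means "follow the graph's edges"
}

// Hypothetical: shallow-merge the partial update and normalize `next` to a list.
function applyResult<S extends object>(current: S, result: NodeResult<S>): Applied<S> {
  const next =
    result.next === undefined
      ? undefined
      : Array.isArray(result.next)
        ? result.next
        : [result.next];
  return {
    state: { ...current, ...result.state },
    output: result.output,
    next,
  };
}
```

Keys absent from the partial update keep their previous values, which is why a node only needs to return the fields it changed.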
agentNode
Runs a Cogitator Agent as a workflow node. The executor injects a cogitator instance into the context automatically.
import { Agent } from '@cogitator-ai/core';
import { agentNode } from '@cogitator-ai/workflows';
const researcher = new Agent({
  name: 'researcher',
  instructions: 'You are a research assistant.',
});
const node = agentNode(researcher, {
  inputMapper: (state) => `Research the topic: ${state.topic}`,
  stateMapper: (result) => ({ findings: result.output }),
  runOptions: { temperature: 0.3 },
});
AgentNodeOptions
| Option | Type | Description |
|---|---|---|
| inputMapper | (state, input?) => string | Convert state to agent input string. Defaults to JSON.stringify(state) |
| stateMapper | (result: RunResult) => Partial<S> | Map agent output to state updates |
| runOptions | Partial<RunOptions> | Override temperature, maxTokens, etc. |
Behavior
- inputMapper converts the current state into a prompt string
- The agent runs via cogitator.run(agent, { input, ...runOptions })
- stateMapper extracts state updates from the RunResult
- The agent's raw output is passed downstream via output
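The steps above can be sketched roughly as follows. This is not the library's implementation: RunResultLike, Runner, AgentNodeOptionsLike, buildPrompt, and sketchAgentNode are hypothetical stand-ins, run options are omitted, and the runner is passed in explicitly rather than injected by the executor.

```typescript
// Hypothetical stand-ins for the real RunResult and cogitator.run.
interface RunResultLike {
  output: string;
}
type Runner = (opts: { input: string }) => Promise<RunResultLike>;

interface AgentNodeOptionsLike<S> {
  inputMapper?: (state: S, input?: unknown) => string;
  stateMapper?: (result: RunResultLike) => Partial<S>;
}

// Step 1: build the prompt, falling back to JSON.stringify(state) as documented.
function buildPrompt<S>(state: S, input: unknown, opts: AgentNodeOptionsLike<S>): string {
  return opts.inputMapper ? opts.inputMapper(state, input) : JSON.stringify(state);
}

// Steps 2-4: run the agent, map the result into state, pass raw output downstream.
function sketchAgentNode<S>(run: Runner, opts: AgentNodeOptionsLike<S> = {}) {
  return async (ctx: { state: S; input?: unknown }) => {
    const result = await run({ input: buildPrompt(ctx.state, ctx.input, opts) });
    return { state: opts.stateMapper?.(result), output: result.output };
  };
}
```

Without a stateMapper the sketch leaves the workflow state untouched and only forwards the output, which matches the option being optional in the table above.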
builder.addNode(
  'research',
  agentNode(researcher, {
    inputMapper: (state) => state.query,
    stateMapper: (result) => ({ answer: result.output }),
  })
);
toolNode
Runs a single Tool directly without an agent wrapper.
import { tool } from '@cogitator-ai/core';
import { toolNode } from '@cogitator-ai/workflows';
import { z } from 'zod';
const fetchUrl = tool({
  name: 'fetch_url',
  description: 'Fetch content from a URL',
  parameters: z.object({ url: z.string() }),
  execute: async ({ url }) => {
    const res = await fetch(url);
    return res.text();
  },
});
const node = toolNode(fetchUrl, {
  argsMapper: (state) => ({ url: state.targetUrl }),
  stateMapper: (result) => ({ htmlContent: result as string }),
});
ToolNodeOptions
| Option | Type | Required | Description |
|---|---|---|---|
| argsMapper | (state, input?) => TArgs | Yes | Map state to tool arguments |
| stateMapper | (result) => Partial<S> | No | Map tool result to state updates |
Behavior
- argsMapper builds typed arguments from state
- The tool's execute function runs with those arguments
- stateMapper extracts state updates from the result
- The raw tool result is passed downstream
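The same flow in sketch form. ToolLike and sketchToolNode are hypothetical stand-ins, assuming a tool exposes an execute function as in the example above; the real Tool type may carry more fields or pass extra arguments to execute.

```typescript
// Hypothetical minimal view of a tool: a name plus an async execute function.
interface ToolLike<TArgs, TResult> {
  name: string;
  execute: (args: TArgs) => Promise<TResult>;
}

function sketchToolNode<S, TArgs, TResult>(
  tool: ToolLike<TArgs, TResult>,
  opts: {
    argsMapper: (state: S, input?: unknown) => TArgs;
    stateMapper?: (result: TResult) => Partial<S>;
  }
) {
  return async (ctx: { state: S; input?: unknown }) => {
    const args = opts.argsMapper(ctx.state, ctx.input); // 1. state -> arguments
    const result = await tool.execute(args); // 2. run the tool
    return {
      state: opts.stateMapper?.(result), // 3. optional state update
      output: result, // 4. raw result passed downstream
    };
  };
}
```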
functionNode
Runs an arbitrary async function. The simplest node type for data transformation, validation, or any custom logic.
import { functionNode } from '@cogitator-ai/workflows';
const node = functionNode(
  'normalize',
  async (state, input) => {
    const data = state.rawData ?? '';
    return data.trim().toLowerCase().replace(/\s+/g, ' ');
  },
  {
    stateMapper: (output) => ({ cleanData: output as string }),
  }
);
Signature
function functionNode<S, O>(
  name: string,
  fn: (state: S, input?: unknown) => Promise<O>,
  options?: { stateMapper?: (output: unknown) => Partial<S> }
): WorkflowNode<S>;
The function receives the full state and the optional input from upstream nodes. Return any value; it becomes the node's output.
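Because stateMapper receives its output as unknown, the examples on this page narrow it with an as cast. Where the output shape is less certain, a runtime check is a safer alternative. A minimal sketch; isString, CleanState, and the stateMapper constant are illustrative names, not library exports:

```typescript
interface CleanState {
  rawData?: string;
  cleanData?: string;
}

// Type guard: narrows `unknown` to `string` with a runtime check.
function isString(value: unknown): value is string {
  return typeof value === 'string';
}

// A mapper matching the documented (output: unknown) => Partial<S> shape.
// It returns an empty update instead of storing a value of the wrong type.
const stateMapper = (output: unknown): Partial<CleanState> =>
  isString(output) ? { cleanData: output } : {};
```

A mapper like this could be passed as the stateMapper option in place of the cast-based version.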
Examples
Validation:
const validate = functionNode('validate', async (state) => {
  if (!state.email?.includes('@')) {
    throw new Error('Invalid email');
  }
  return { valid: true };
});
Data transformation:
const transform = functionNode(
  'transform',
  async (state) => {
    return state.items.filter((item) => item.score > 0.5);
  },
  {
    stateMapper: (output) => ({ filteredItems: output as Item[] }),
  }
);
customNode
Full control over the node context and result. Use when you need access to workflowId, step, progress reporting, or want to explicitly set next routing.
import { customNode } from '@cogitator-ai/workflows';
// fetchExternalAPI and processData are placeholders for your own helpers.
const node = customNode('complex-step', async (ctx) => {
  ctx.reportProgress(0);
  const data = await fetchExternalAPI(ctx.state.apiEndpoint);
  ctx.reportProgress(50);
  const processed = await processData(data);
  ctx.reportProgress(100);
  return {
    state: { processedData: processed, lastStep: ctx.nodeId },
    output: processed,
    // Route to a different node depending on the processed result.
    next: processed.needsReview ? 'review' : 'finalize',
  };
});
Signature
function customNode<S>(
  name: string,
  fn: (ctx: NodeContext<S>) => Promise<NodeResult<S>>
): WorkflowNode<S>;
The next field in the result lets you override the DAG routing dynamically at runtime.
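One way to picture this override is a resolver that prefers next from the node result and otherwise falls back to the graph's static edges. The resolveNext function and the Edges map are assumptions for illustration, not the executor's actual internals.

```typescript
// Hypothetical static routing table: node name -> default successor names.
type Edges = Record<string, string[]>;

function resolveNext(
  nodeId: string,
  next: string | string[] | undefined,
  edges: Edges
): string[] {
  if (next !== undefined) {
    return Array.isArray(next) ? next : [next]; // explicit routing wins
  }
  return edges[nodeId] ?? []; // default: follow the declared edges
}
```

Under this reading, returning next: 'review' from the complex-step node above would send execution to review even if the graph's declared edge points at finalize.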
humanNode
Pauses the workflow for human approval or input. Part of the human-in-the-loop system.
import { humanNode, approvalNode, choiceNode, inputNode } from '@cogitator-ai/workflows';
const approval = approvalNode({
  name: 'manager-approval',
  prompt: (state) => `Approve expense of $${state.amount}?`,
  approvers: ['manager@company.com'],
  timeout: 24 * 60 * 60 * 1000,
});
const choice = choiceNode({
  name: 'select-action',
  prompt: (state) => `How should we handle: ${state.issue}?`,
  choices: ['fix', 'ignore', 'escalate'],
});
const freeform = inputNode({
  name: 'get-feedback',
  prompt: (state) => `Provide feedback on: ${state.draft}`,
});
These nodes integrate with ApprovalStore and ApprovalNotifier for persistence and notification delivery. See the human-in-the-loop module for store implementations (InMemoryApprovalStore, FileApprovalStore).
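The timeout of 24 * 60 * 60 * 1000 in the approval example reads as 24 hours, assuming the usual JavaScript convention of milliseconds. The sketch below shows one way a pending request could be settled or expired; PendingRequest and settle are hypothetical and are not the ApprovalStore interface.

```typescript
type Decision = 'approved' | 'rejected';
type Status = 'pending' | Decision | 'timed-out';

// Hypothetical record of a request that is waiting on a human.
interface PendingRequest {
  createdAt: number; // epoch milliseconds
  timeoutMs?: number; // omitted means wait indefinitely
}

// Decide the status of a request at time `now`, given an optional human decision.
// In this sketch a recorded decision takes precedence over expiry.
function settle(req: PendingRequest, now: number, decision?: Decision): Status {
  if (decision !== undefined) return decision;
  const expired = req.timeoutMs !== undefined && now - req.createdAt >= req.timeoutMs;
  return expired ? 'timed-out' : 'pending';
}
```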
Wiring Nodes into Workflows
All node factories return WorkflowNode<S> objects. Pass them directly to WorkflowBuilder.addNode:
// Assumes fetchUrl (the tool defined above), an analyst Agent, and a MyState type are in scope.
const workflow = new WorkflowBuilder<MyState>('pipeline')
  .initialState({ url: '' })
  .addNode(
    'fetch',
    toolNode(fetchUrl, {
      argsMapper: (state) => ({ url: state.url }),
      stateMapper: (result) => ({ content: result as string }),
    })
  )
  .addNode(
    'analyze',
    agentNode(analyst, {
      inputMapper: (state) => state.content!,
      stateMapper: (result) => ({ analysis: result.output }),
    }),
    { after: ['fetch'] }
  )
  .addNode(
    'format',
    functionNode('format', async (state) => {
      return `# Analysis\n\n${state.analysis}`;
    }),
    { after: ['analyze'] }
  )
  .build();
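The after option declares dependencies, so the pipeline above is expected to run fetch, then analyze, then format. As a rough sketch of how such an order can be derived, here is a simple layered topological sort. executionOrder and the Deps type are hypothetical, and the real scheduler may well run independent nodes in parallel.

```typescript
// Map each node name to the names it must run after.
type Deps = Record<string, string[]>;

function executionOrder(deps: Deps): string[] {
  const order: string[] = [];
  let pending = Object.keys(deps);
  while (pending.length > 0) {
    // A node is ready once everything it runs after has already been ordered.
    const ready = pending.filter((node) =>
      (deps[node] ?? []).every((dep) => order.indexOf(dep) !== -1)
    );
    if (ready.length === 0) throw new Error('Cycle or unknown dependency');
    order.push(...ready);
    pending = pending.filter((node) => ready.indexOf(node) === -1);
  }
  return order;
}

// Dependencies taken from the pipeline example above.
const order = executionOrder({
  fetch: [],
  analyze: ['fetch'],
  format: ['analyze'],
});
```

The error branch covers both circular after declarations and references to nodes that were never added.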