Sagas & Compensation

Retry policies, circuit breakers, compensation rollbacks, dead letter queues, and idempotency for resilient workflows.

Overview

The saga module provides fault tolerance primitives for workflow nodes. When a multi-step workflow fails partway through, you need strategies to retry, isolate faults, roll back completed steps, and handle permanently failed operations.

import {
  executeWithRetry,
  withRetry,
  CircuitBreaker,
  CompensationManager,
  InMemoryDLQ,
} from '@cogitator-ai/workflows';

Retry with Backoff

executeWithRetry wraps any async function with configurable retry logic, supporting constant, linear, and exponential backoff with jitter.

import { executeWithRetry } from '@cogitator-ai/workflows';

const result = await executeWithRetry(
  async (attempt) => {
    const response = await fetch('https://api.example.com/data');
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  },
  {
    maxRetries: 3,
    backoff: 'exponential',
    initialDelay: 1000,
    maxDelay: 30000,
    multiplier: 2,
    jitter: 0.1,
    onRetry: (info) => {
      console.log(`Attempt ${info.attempt} failed, retrying in ${info.delay}ms`);
    },
  }
);

if (result.success) {
  console.log('Data:', result.result);
} else {
  console.error(`Failed after ${result.attempts} attempts:`, result.error);
}

RetryResult

interface RetryResult<T> {
  success: boolean;
  result?: T;
  error?: Error;
  attempts: number;
  totalDuration: number;
  delays: number[];
}

Backoff Strategies

Strategy    | Behavior                 | Example delays (1s base)
------------|--------------------------|-------------------------
constant    | Same delay every time    | 1s, 1s, 1s
linear      | Delay grows linearly     | 1s, 2s, 3s
exponential | Delay doubles each time  | 1s, 2s, 4s

Jitter adds random variance to each delay to prevent a thundering herd of simultaneous retries. A jitter of 0.1 means the delay is shifted by up to ±10% of its base value.
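The backoff schedule, including jitter, can be sketched as a pure function. This is an illustrative model only — the option names mirror the config above, but it is not the library's implementation:

```typescript
type Backoff = 'constant' | 'linear' | 'exponential';

// Illustrative model of the delay schedule; not the library's internals.
function computeDelay(
  strategy: Backoff,
  attempt: number,      // 1-based attempt number
  initialDelay: number, // base delay in ms
  maxDelay: number,
  multiplier = 2,
  jitter = 0            // fraction, e.g. 0.1 for ±10%
): number {
  let delay: number;
  switch (strategy) {
    case 'constant':
      delay = initialDelay;
      break;
    case 'linear':
      delay = initialDelay * attempt;
      break;
    case 'exponential':
      delay = initialDelay * multiplier ** (attempt - 1);
      break;
  }
  delay = Math.min(delay, maxDelay);
  // jitter shifts the final delay by up to ±(jitter * delay)
  const offset = delay * jitter * (Math.random() * 2 - 1);
  return Math.round(delay + offset);
}
```

With jitter = 0, `computeDelay('exponential', 3, 1000, 30000)` reproduces the 4s entry in the table above: 1000 × 2^(3−1) = 4000 ms, capped at maxDelay.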

Retryable Error Detection

By default, network errors, timeouts, and 5xx/429 status codes are retried. Override with isRetryable:

const result = await executeWithRetry(fn, {
  maxRetries: 5,
  isRetryable: (error) => {
    return error.message.includes('ECONNRESET') || error.message.includes('429');
  },
});

withRetry Wrapper

Create a retryable version of any async function:

import { withRetry } from '@cogitator-ai/workflows';

const fetchWithRetry = withRetry(
  async (url: string) => {
    const res = await fetch(url);
    return res.json();
  },
  { maxRetries: 3, backoff: 'exponential', initialDelay: 500 }
);

const data = await fetchWithRetry('https://api.example.com/data');

Circuit Breaker

The circuit breaker pattern prevents cascading failures by stopping requests to a failing service after a threshold is reached.

import { CircuitBreaker, createCircuitBreaker } from '@cogitator-ai/workflows';

const breaker = createCircuitBreaker({
  threshold: 5,
  resetTimeout: 30000,
  successThreshold: 2,
  halfOpenMax: 3,
});

States

CLOSED ──(failures >= threshold)──→ OPEN ──(timeout elapsed)──→ HALF_OPEN
  ↑                                                                │
  └──────────(successes >= successThreshold)────────────────────────┘

• Closed -- Normal operation. Failures are counted; the circuit opens after threshold failures.
• Open -- All requests fail immediately with CircuitBreakerOpenError. After resetTimeout ms, the circuit transitions to half-open.
• Half-open -- A limited number of test requests are allowed (halfOpenMax). After successThreshold consecutive successes the circuit closes; any failure reopens it.

Usage

import { CircuitBreakerOpenError } from '@cogitator-ai/workflows';

async function fetchExternalData() {
  try {
    return await breaker.execute('external-api', async () => {
      return await callExternalAPI();
    });
  } catch (error) {
    if (error instanceof CircuitBreakerOpenError) {
      console.log(`Circuit open for ${error.nodeId}, using fallback`);
      return fallbackValue;
    }
    throw error;
  }
}

Monitoring

breaker.onEvent((event) => {
  switch (event.type) {
    case 'state_change':
      console.log(`${event.nodeId}: ${event.from} → ${event.to}`);
      break;
    case 'failure':
      console.error(`${event.nodeId} failed:`, event.error.message);
      break;
    case 'rejected':
      console.warn(`${event.nodeId} rejected: ${event.reason}`);
      break;
  }
});

const stats = breaker.getStats('external-api');
console.log(`State: ${stats.state}, Failures: ${stats.totalFailures}`);

Compensation Manager

When a multi-step workflow fails, compensation rolls back already-completed steps in reverse order.

import { CompensationManager, createCompensationManager } from '@cogitator-ai/workflows';

interface OrderState {
  orderId: string;
  reservationId?: string;
  chargeId?: string;
}

const manager = createCompensationManager<OrderState>();

manager.registerCompensation('reserve-inventory', async (state) => {
  await inventoryService.release(state.reservationId!);
});

manager.registerCompensation(
  'charge-payment',
  async (state) => {
    await paymentService.refund(state.chargeId!);
  },
  {
    retries: 2,
    timeout: 10000,
  }
);

Recording Completions

As nodes complete, mark them so the manager knows which steps to compensate:

manager.markCompleted('reserve-inventory', { reservationId: 'res_123' });
manager.markCompleted('charge-payment', { chargeId: 'ch_456' });

Triggering Compensation

When a downstream step fails:

const report = await manager.compensate(
  currentState,
  'ship-order',
  new Error('Shipping service unavailable')
);

console.log(`Compensated ${report.compensated.length} steps`);
console.log(`All successful: ${report.allSuccessful}`);

if (report.partialFailures.length > 0) {
  console.error('Failed to compensate:', report.partialFailures);
}

Compensation executes in reverse order by default. Each step can be configured with order: 'reverse' | 'forward' | 'parallel'.
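The reverse-order semantics can be illustrated with a stripped-down sketch (this is a model of the behavior, not the CompensationManager internals):

```typescript
type Compensator<S> = (state: S) => Promise<void>;

// Stripped-down illustration of reverse-order compensation:
// undo completed steps from last to first, collecting any failures.
async function compensateInReverse<S>(
  completed: Array<{ nodeId: string; undo: Compensator<S> }>,
  state: S
): Promise<{ compensated: string[]; partialFailures: string[] }> {
  const compensated: string[] = [];
  const partialFailures: string[] = [];
  for (const step of [...completed].reverse()) {
    try {
      await step.undo(state);
      compensated.push(step.nodeId);
    } catch {
      // A failed compensation is recorded rather than aborting the rollback,
      // so the remaining steps still get a chance to run.
      partialFailures.push(step.nodeId);
    }
  }
  return { compensated, partialFailures };
}
```

In the order example above, this would refund the payment before releasing the inventory, undoing steps in the opposite order from how they were committed.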

CompensationBuilder

Fluent API for building compensation configurations:

import { compensationBuilder } from '@cogitator-ai/workflows';

const manager = compensationBuilder<OrderState>()
  .addStep('reserve', async (state) => {
    await inventoryService.release(state.reservationId!);
  })
  .addConditionalStep(
    'charge',
    async (state) => {
      await paymentService.refund(state.chargeId!);
    },
    (state, error) => {
      return error.message !== 'ALREADY_REFUNDED';
    }
  )
  .build();

Dead Letter Queue

Failed operations that exhaust all retries go to a DLQ for manual inspection and replay.

import { InMemoryDLQ, FileDLQ, createDLQEntry } from '@cogitator-ai/workflows';

const dlq = new InMemoryDLQ({ defaultTTL: 7 * 24 * 60 * 60 * 1000 });

const entry = createDLQEntry(
  'process-payment',
  'wf_abc123',
  'checkout-workflow',
  currentState,
  new Error('Payment gateway timeout'),
  { attempts: 3, maxAttempts: 3, tags: ['payment', 'critical'] }
);

const id = await dlq.add(entry);

Querying

const entries = await dlq.list({
  workflowName: 'checkout-workflow',
  minAttempts: 3,
  tags: ['critical'],
  limit: 10,
});

for (const entry of entries) {
  console.log(`${entry.nodeId}: ${entry.error.message} (${entry.attempts} attempts)`);
}

File-based DLQ

For persistence across restarts:

const dlq = new FileDLQ('/var/data/dlq', { defaultTTL: 30 * 24 * 60 * 60 * 1000 });

Idempotency

Ensure operations are safe to retry by preventing duplicate execution:

import {
  InMemoryIdempotencyStore,
  generateIdempotencyKey,
  idempotent,
} from '@cogitator-ai/workflows';

const store = new InMemoryIdempotencyStore();

const processPayment = idempotent(
  async (orderId: string, amount: number) => {
    return await paymentGateway.charge(orderId, amount);
  },
  store,
  (orderId, amount) => generateIdempotencyKey('payment', orderId, String(amount))
);

// safe to call multiple times
await processPayment('order_123', 99.99);
await processPayment('order_123', 99.99);

The second call returns the cached result without executing the function again.
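The caching behavior can be modeled with a minimal sketch. This is illustrative only, not the InMemoryIdempotencyStore implementation:

```typescript
// Minimal model of idempotent execution: cache results by derived key.
function makeIdempotent<A extends unknown[], R>(
  fn: (...args: A) => Promise<R>,
  keyFor: (...args: A) => string
): (...args: A) => Promise<R> {
  const cache = new Map<string, Promise<R>>();
  return (...args: A) => {
    const key = keyFor(...args);
    // The first caller stores the in-flight promise; later callers with the
    // same key reuse it, so even concurrent duplicates collapse into one run.
    let pending = cache.get(key);
    if (!pending) {
      pending = fn(...args);
      cache.set(key, pending);
    }
    return pending;
  };
}
```

Note that this sketch also caches rejected promises; a production store would evict failed executions so they remain retryable.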
