Sagas & Compensation
Retry policies, circuit breakers, compensation rollbacks, dead letter queues, and idempotency for resilient workflows.
Overview
The saga module provides fault tolerance primitives for workflow nodes. When a multi-step workflow fails partway through, you need strategies to retry, isolate faults, roll back completed steps, and handle permanently failed operations.
```ts
import {
  executeWithRetry,
  withRetry,
  CircuitBreaker,
  CompensationManager,
  InMemoryDLQ,
} from '@cogitator-ai/workflows';
```

Retry with Backoff
executeWithRetry wraps any async function with configurable retry logic, supporting constant, linear, and exponential backoff with jitter.
```ts
import { executeWithRetry } from '@cogitator-ai/workflows';

const result = await executeWithRetry(
  async (attempt) => {
    const response = await fetch('https://api.example.com/data');
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  },
  {
    maxRetries: 3,
    backoff: 'exponential',
    initialDelay: 1000,
    maxDelay: 30000,
    multiplier: 2,
    jitter: 0.1,
    onRetry: (info) => {
      console.log(`Attempt ${info.attempt} failed, retrying in ${info.delay}ms`);
    },
  }
);

if (result.success) {
  console.log('Data:', result.result);
} else {
  console.error(`Failed after ${result.attempts} attempts:`, result.error);
}
```

RetryResult
```ts
interface RetryResult<T> {
  success: boolean;
  result?: T;
  error?: Error;
  attempts: number;
  totalDuration: number;
  delays: number[];
}
```

Backoff Strategies
| Strategy | Behavior | Example delays (1s base) |
|---|---|---|
| `constant` | Same delay every time | 1s, 1s, 1s |
| `linear` | Delay grows linearly | 1s, 2s, 3s |
| `exponential` | Delay doubles each time | 1s, 2s, 4s |
Jitter adds random variation to each delay to prevent the thundering herd problem. A jitter of 0.1 means +/-10% of the base delay.
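The delay schedule can be sketched independently of the library. `computeDelay` below is a hypothetical helper (not part of the package) that reproduces the table above:

```ts
// Hypothetical helper, not part of the package: reproduces the schedule above.
function computeDelay(
  strategy: 'constant' | 'linear' | 'exponential',
  attempt: number,      // 1-based retry attempt
  initialDelay: number, // base delay in ms
  maxDelay: number,
  multiplier = 2,
  jitter = 0,           // 0.1 => +/-10%
): number {
  const base =
    strategy === 'constant' ? initialDelay :
    strategy === 'linear' ? initialDelay * attempt :
    initialDelay * Math.pow(multiplier, attempt - 1);
  const capped = Math.min(base, maxDelay);
  // Spread the delay uniformly across [capped*(1-jitter), capped*(1+jitter)].
  const spread = capped * jitter;
  return capped - spread + Math.random() * 2 * spread;
}

console.log([1, 2, 3].map((n) => computeDelay('exponential', n, 1000, 30000)));
// → [ 1000, 2000, 4000 ]
```

Note that `maxDelay` caps the base delay before jitter is applied, so large attempt counts plateau at roughly `maxDelay`.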
Retryable Error Detection
By default, network errors, timeouts, and 5xx/429 status codes are retried. Override with isRetryable:
```ts
const result = await executeWithRetry(fn, {
  maxRetries: 5,
  isRetryable: (error) => {
    return error.message.includes('ECONNRESET') || error.message.includes('429');
  },
});
```

withRetry Wrapper
Create a retryable version of any async function:
```ts
import { withRetry } from '@cogitator-ai/workflows';

const fetchWithRetry = withRetry(
  async (url: string) => {
    const res = await fetch(url);
    return res.json();
  },
  { maxRetries: 3, backoff: 'exponential', initialDelay: 500 }
);

const data = await fetchWithRetry('https://api.example.com/data');
```

Circuit Breaker
The circuit breaker pattern prevents cascading failures by stopping requests to a failing service after a threshold is reached.
```ts
import { CircuitBreaker, createCircuitBreaker } from '@cogitator-ai/workflows';

const breaker = createCircuitBreaker({
  threshold: 5,
  resetTimeout: 30000,
  successThreshold: 2,
  halfOpenMax: 3,
});
```

States
```
CLOSED ──(failures >= threshold)──→ OPEN ──(timeout elapsed)──→ HALF_OPEN
   ↑                                                               │
   └──────────(successes >= successThreshold)──────────────────────┘
```

- Closed -- Normal operation. Failures are counted. Opens after `threshold` failures.
- Open -- All requests fail immediately with `CircuitBreakerOpenError`. After `resetTimeout` ms, transitions to half-open.
- Half-open -- Limited test requests allowed (`halfOpenMax`). After `successThreshold` consecutive successes, the circuit closes. Any failure reopens it.
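The transitions above can be modeled as a small state machine. This is a simplified sketch of the pattern, not the library's implementation; it tracks a single service and omits `halfOpenMax` accounting for brevity:

```ts
type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

// Simplified model of the transitions above; not the library's implementation.
class SimpleBreaker {
  private state: BreakerState = 'CLOSED';
  private failures = 0;
  private successes = 0;
  private openedAt = 0;

  constructor(
    private threshold: number,
    private resetTimeout: number,
    private successThreshold: number,
  ) {}

  // Lazily transition OPEN → HALF_OPEN once the reset timeout has elapsed.
  getState(now: number): BreakerState {
    if (this.state === 'OPEN' && now - this.openedAt >= this.resetTimeout) {
      this.state = 'HALF_OPEN';
      this.successes = 0;
    }
    return this.state;
  }

  recordFailure(now: number): void {
    // Any failure in HALF_OPEN reopens; in CLOSED, open after `threshold` failures.
    if (this.getState(now) === 'HALF_OPEN' || ++this.failures >= this.threshold) {
      this.state = 'OPEN';
      this.openedAt = now;
      this.failures = 0;
    }
  }

  recordSuccess(now: number): void {
    // Close after `successThreshold` consecutive successes in HALF_OPEN.
    if (this.getState(now) === 'HALF_OPEN' && ++this.successes >= this.successThreshold) {
      this.state = 'CLOSED';
      this.failures = 0;
    }
  }
}
```

Keeping the timeout check inside `getState` means no timers are needed; the transition happens on the next call after the window elapses.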
Usage
```ts
try {
  const result = await breaker.execute('external-api', async () => {
    return await callExternalAPI();
  });
} catch (error) {
  if (error instanceof CircuitBreakerOpenError) {
    console.log(`Circuit open for ${error.nodeId}, using fallback`);
    return fallbackValue;
  }
  throw error;
}
```

Monitoring
```ts
breaker.onEvent((event) => {
  switch (event.type) {
    case 'state_change':
      console.log(`${event.nodeId}: ${event.from} → ${event.to}`);
      break;
    case 'failure':
      console.error(`${event.nodeId} failed:`, event.error.message);
      break;
    case 'rejected':
      console.warn(`${event.nodeId} rejected: ${event.reason}`);
      break;
  }
});

const stats = breaker.getStats('external-api');
console.log(`State: ${stats.state}, Failures: ${stats.totalFailures}`);
```

Compensation Manager
When a multi-step workflow fails, compensation rolls back already-completed steps in reverse order.
```ts
import { CompensationManager, createCompensationManager } from '@cogitator-ai/workflows';

interface OrderState {
  orderId: string;
  reservationId?: string;
  chargeId?: string;
}

const manager = createCompensationManager<OrderState>();

manager.registerCompensation('reserve-inventory', async (state) => {
  await inventoryService.release(state.reservationId!);
});

manager.registerCompensation(
  'charge-payment',
  async (state) => {
    await paymentService.refund(state.chargeId!);
  },
  {
    retries: 2,
    timeout: 10000,
  }
);
```

Recording Completions
As nodes complete, mark them so the manager knows which steps to compensate:
```ts
manager.markCompleted('reserve-inventory', { reservationId: 'res_123' });
manager.markCompleted('charge-payment', { chargeId: 'ch_456' });
```

Triggering Compensation
When a downstream step fails:
```ts
const report = await manager.compensate(
  currentState,
  'ship-order',
  new Error('Shipping service unavailable')
);

console.log(`Compensated ${report.compensated.length} steps`);
console.log(`All successful: ${report.allSuccessful}`);

if (report.partialFailures.length > 0) {
  console.error('Failed to compensate:', report.partialFailures);
}
```

Compensation executes in reverse order by default. Each step can be configured with `order: 'reverse' | 'forward' | 'parallel'`.
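Reverse-order rollback itself fits in a few lines. The sketch below is a simplified model of the idea, not the package's `CompensationManager`: it walks the completed steps last-to-first, records partial failures, and keeps going:

```ts
type Compensator<S> = (state: S) => Promise<void>;

// Simplified sketch of reverse-order compensation; not the package's implementation.
async function compensateInReverse<S>(
  completed: string[],                   // node ids in completion order
  handlers: Map<string, Compensator<S>>, // registered compensation handlers
  state: S,
): Promise<{ compensated: string[]; failed: string[] }> {
  const compensated: string[] = [];
  const failed: string[] = [];
  // Walk completed steps last-to-first so dependents roll back before dependencies.
  for (const nodeId of [...completed].reverse()) {
    const handler = handlers.get(nodeId);
    if (!handler) continue; // no compensation registered for this step
    try {
      await handler(state);
      compensated.push(nodeId);
    } catch {
      failed.push(nodeId); // record the partial failure and continue rolling back
    }
  }
  return { compensated, failed };
}
```

Continuing past a failed compensator mirrors the `partialFailures` reporting above: one broken rollback should not strand every other completed step.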
CompensationBuilder
Fluent API for building compensation configurations:
```ts
import { compensationBuilder } from '@cogitator-ai/workflows';

const manager = compensationBuilder<OrderState>()
  .addStep('reserve', async (state) => {
    await inventoryService.release(state.reservationId!);
  })
  .addConditionalStep(
    'charge',
    async (state) => {
      await paymentService.refund(state.chargeId!);
    },
    (state, error) => {
      return error.message !== 'ALREADY_REFUNDED';
    }
  )
  .build();
```

Dead Letter Queue
Failed operations that exhaust all retries go to a DLQ for manual inspection and replay.
```ts
import { InMemoryDLQ, FileDLQ, createDLQEntry } from '@cogitator-ai/workflows';

const dlq = new InMemoryDLQ({ defaultTTL: 7 * 24 * 60 * 60 * 1000 });

const entry = createDLQEntry(
  'process-payment',
  'wf_abc123',
  'checkout-workflow',
  currentState,
  new Error('Payment gateway timeout'),
  { attempts: 3, maxAttempts: 3, tags: ['payment', 'critical'] }
);

const id = await dlq.add(entry);
```

Querying
```ts
const entries = await dlq.list({
  workflowName: 'checkout-workflow',
  minAttempts: 3,
  tags: ['critical'],
  limit: 10,
});

for (const entry of entries) {
  console.log(`${entry.nodeId}: ${entry.error.message} (${entry.attempts} attempts)`);
}
```

File-based DLQ
For persistence across restarts:
```ts
const dlq = new FileDLQ('/var/data/dlq', { defaultTTL: 30 * 24 * 60 * 60 * 1000 });
```

Idempotency
Ensure operations are safe to retry by preventing duplicate execution:
```ts
import {
  InMemoryIdempotencyStore,
  generateIdempotencyKey,
  idempotent,
} from '@cogitator-ai/workflows';

const store = new InMemoryIdempotencyStore();

const processPayment = idempotent(
  async (orderId: string, amount: number) => {
    return await paymentGateway.charge(orderId, amount);
  },
  store,
  (orderId, amount) => generateIdempotencyKey('payment', orderId, String(amount))
);

// safe to call multiple times
await processPayment('order_123', 99.99);
await processPayment('order_123', 99.99);
```

The second call returns the cached result without executing the function again.
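The caching behavior can be sketched with a plain `Map`. This is a simplified model of the idea, not the package's store interface; the names `makeIdempotent` and `keyOf` are illustrative:

```ts
// Simplified sketch of idempotent execution keyed by a Map; not the package's store.
function makeIdempotent<A extends unknown[], R>(
  fn: (...args: A) => Promise<R>,
  keyOf: (...args: A) => string,
  cache: Map<string, Promise<R>> = new Map(),
): (...args: A) => Promise<R> {
  return (...args: A) => {
    const key = keyOf(...args);
    // Store the in-flight promise so concurrent duplicate calls also coalesce.
    let pending = cache.get(key);
    if (!pending) {
      pending = fn(...args);
      cache.set(key, pending);
    }
    return pending;
  };
}
```

Caching the promise rather than the resolved value means two duplicate calls that arrive before the first one finishes still execute the function only once.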