Scheduling & Triggers
Cron-based scheduling, webhook triggers, and event-driven workflow activation with rate limiting and authentication.
Overview
The trigger system lets you activate workflows automatically based on schedules, HTTP requests, or internal events. The TriggerManager coordinates all trigger types through a unified API.
import {
createTriggerManager,
cronTrigger,
webhookTrigger,
eventTrigger,
} from '@cogitator-ai/workflows';Trigger Manager
The DefaultTriggerManager is the central coordinator for all trigger types.
import { createTriggerManager } from '@cogitator-ai/workflows';
const manager = createTriggerManager({
onTriggerFire: async (trigger, context) => {
const result = await executor.execute(workflow, context.payload);
return result.workflowId;
},
});
manager.start();Registering Triggers
const cronId = await manager.register({
workflowName: 'daily-report',
type: 'cron',
config: cronTrigger('0 9 * * *', { timezone: 'America/New_York' }),
enabled: true,
});
const webhookId = await manager.register({
workflowName: 'pr-review',
type: 'webhook',
config: webhookTrigger('/github/pr', 'POST', {
auth: { type: 'hmac', secret: process.env.GITHUB_SECRET!, algorithm: 'sha256' },
}),
enabled: true,
});
const eventId = await manager.register({
workflowName: 'order-processing',
type: 'event',
config: eventTrigger('order.created', {
filter: (payload) => (payload as { amount: number }).amount > 100,
}),
enabled: true,
});Managing Triggers
await manager.enable(triggerId);
await manager.disable(triggerId);
await manager.unregister(triggerId);
const trigger = await manager.get(triggerId);
const allTriggers = await manager.list('daily-report');
const enabled = await manager.listEnabled();Manual Fire
await manager.fire(triggerId, {
payload: { forced: true, reason: 'manual test' },
});Stats
const stats = await manager.getStats();
console.log(`Total triggers: ${stats.total}`);
console.log(`Enabled: ${stats.enabled}`);
console.log(`Total fired: ${stats.totalFired}`);
console.log(`By type:`, stats.byType);Subscribing to Events
const unsubscribe = manager.onTrigger((trigger, context) => {
console.log(`Trigger fired: ${trigger.workflowName} (${trigger.type})`);
console.log(`Payload:`, context.payload);
});Cron Triggers
Schedule workflows with cron expressions. Supports standard 5-field cron syntax, timezones, catch-up for missed runs, and concurrency limits.
Basic Cron
import { cronTrigger } from '@cogitator-ai/workflows';
const config = cronTrigger('0 9 * * *');
const everyMinute = cronTrigger('* * * * *');
const hourly = cronTrigger('0 * * * *');
const weekdays9am = cronTrigger('0 9 * * 1-5');
const firstOfMonth = cronTrigger('0 0 1 * *');Cron Options
const config = cronTrigger('0 9 * * *', {
timezone: 'Europe/London',
enabled: true,
runImmediately: false,
maxConcurrent: 1,
catchUp: true,
input: { reportType: 'daily' },
condition: (ctx) => {
const hour = new Date(ctx.timestamp).getHours();
return hour >= 6 && hour <= 22;
},
});| Option | Type | Description |
|---|---|---|
timezone | string | IANA timezone (e.g., 'America/New_York') |
enabled | boolean | Whether the trigger is active |
runImmediately | boolean | Fire once immediately on registration |
maxConcurrent | number | Max simultaneous runs from this trigger |
catchUp | boolean | Run missed executions after downtime |
input | object | function | Static or dynamic input for the workflow |
condition | (ctx) => boolean | Additional gate before firing |
Cron Utilities
The package includes standalone cron parsing utilities:
import {
parseCronExpression,
getNextCronOccurrence,
getNextCronOccurrences,
describeCronExpression,
isValidCronExpression,
CRON_PRESETS,
} from '@cogitator-ai/workflows';
const isValid = isValidCronExpression('0 9 * * *');
const next = getNextCronOccurrence('0 9 * * *', {
timezone: 'America/New_York',
});
const nextFive = getNextCronOccurrences('*/15 * * * *', { count: 5 });
const description = describeCronExpression('0 9 * * 1-5');Direct CronTriggerExecutor
For lower-level control without the full TriggerManager:
import { createCronTrigger } from '@cogitator-ai/workflows';
const cronExecutor = createCronTrigger({
onFire: async (trigger, context) => {
console.log(`Cron fired for ${trigger.workflowName}`);
return 'run_123';
},
});
const triggerId = cronExecutor.register('nightly-cleanup', {
expression: '0 2 * * *',
timezone: 'UTC',
enabled: true,
});
const state = cronExecutor.getState(triggerId);
console.log(`Next run: ${new Date(state!.nextRun!).toISOString()}`);Webhook Triggers
Activate workflows from HTTP requests with built-in authentication, rate limiting, payload validation, and deduplication.
Basic Webhook
import { webhookTrigger } from '@cogitator-ai/workflows';
const config = webhookTrigger('/api/deploy', 'POST');Authentication
Four auth methods are supported:
const bearerAuth = webhookTrigger('/api/trigger', 'POST', {
auth: { type: 'bearer', secret: 'my-secret-token' },
});
const hmacAuth = webhookTrigger('/github/webhook', 'POST', {
auth: {
type: 'hmac',
secret: process.env.WEBHOOK_SECRET!,
algorithm: 'sha256',
headerName: 'x-hub-signature-256',
},
});
const apiKeyAuth = webhookTrigger('/api/trigger', 'POST', {
auth: { type: 'api-key', secret: 'key_abc123', headerName: 'x-api-key' },
});
const basicAuth = webhookTrigger('/api/trigger', 'POST', {
auth: { type: 'basic', secret: 'user:password' },
});Rate Limiting
const config = webhookTrigger('/api/trigger', 'POST', {
rateLimit: {
requests: 100,
window: 60000,
burstLimit: 10,
},
});Rate limiting is per-client (based on IP address). When exceeded, the webhook returns HTTP 429 with Retry-After headers.
Payload Handling
const config = webhookTrigger('/api/process', 'POST', {
validatePayload: (body) => {
const data = body as { type: string; id: string };
return !!data.type && !!data.id;
},
transformPayload: (body) => {
const data = body as { type: string; id: string; extra: unknown };
return { eventType: data.type, entityId: data.id };
},
deduplicationKey: (body) => (body as { id: string }).id,
deduplicationWindow: 60000,
});Handling Requests
Route incoming HTTP requests to the webhook executor via the TriggerManager:
const result = await manager.handleWebhook({
method: 'POST',
path: '/github/webhook',
headers: request.headers,
body: request.body,
ip: request.ip,
});
if (result?.triggered) {
response.status(202).json({ runId: result.runId });
} else {
response.status(result?.response.status ?? 404).json(result?.response.body);
}Event Triggers
React to internal application events. Useful for chaining workflows or integrating with your domain events.
import { eventTrigger } from '@cogitator-ai/workflows';
const config = eventTrigger('order.created', {
source: 'checkout-service',
filter: (payload) => (payload as { priority: string }).priority === 'high',
transform: (payload) => ({
orderId: (payload as { id: string }).id,
priority: 'high',
}),
});Emitting Events
manager.emitEvent('order.created', {
id: 'order_789',
priority: 'high',
source: 'checkout-service',
amount: 250,
});Lifecycle
const manager = createTriggerManager({ onTriggerFire: handler });
manager.start();
// ... register triggers, handle webhooks, emit events ...
manager.dispose();Always call .dispose() to clean up timers, intervals, and rate limiter resources.