Cogitator
Workflows

Scheduling & Triggers

Cron-based scheduling, webhook triggers, and event-driven workflow activation with rate limiting and authentication.

Overview

The trigger system lets you activate workflows automatically based on schedules, HTTP requests, or internal events. The TriggerManager coordinates all trigger types through a unified API.

import {
  createTriggerManager,
  cronTrigger,
  webhookTrigger,
  eventTrigger,
} from '@cogitator-ai/workflows';

Trigger Manager

The DefaultTriggerManager is the central coordinator for all trigger types.

import { createTriggerManager } from '@cogitator-ai/workflows';

const manager = createTriggerManager({
  onTriggerFire: async (trigger, context) => {
    const result = await executor.execute(workflow, context.payload);
    return result.workflowId;
  },
});

manager.start();

Registering Triggers

const cronId = await manager.register({
  workflowName: 'daily-report',
  type: 'cron',
  config: cronTrigger('0 9 * * *', { timezone: 'America/New_York' }),
  enabled: true,
});

const webhookId = await manager.register({
  workflowName: 'pr-review',
  type: 'webhook',
  config: webhookTrigger('/github/pr', 'POST', {
    auth: { type: 'hmac', secret: process.env.GITHUB_SECRET!, algorithm: 'sha256' },
  }),
  enabled: true,
});

const eventId = await manager.register({
  workflowName: 'order-processing',
  type: 'event',
  config: eventTrigger('order.created', {
    filter: (payload) => (payload as { amount: number }).amount > 100,
  }),
  enabled: true,
});

Managing Triggers

await manager.enable(triggerId);
await manager.disable(triggerId);
await manager.unregister(triggerId);

const trigger = await manager.get(triggerId);
const allTriggers = await manager.list('daily-report');
const enabled = await manager.listEnabled();

Manual Fire

await manager.fire(triggerId, {
  payload: { forced: true, reason: 'manual test' },
});

Stats

const stats = await manager.getStats();
console.log(`Total triggers: ${stats.total}`);
console.log(`Enabled: ${stats.enabled}`);
console.log(`Total fired: ${stats.totalFired}`);
console.log(`By type:`, stats.byType);

Subscribing to Events

const unsubscribe = manager.onTrigger((trigger, context) => {
  console.log(`Trigger fired: ${trigger.workflowName} (${trigger.type})`);
  console.log(`Payload:`, context.payload);
});

Cron Triggers

Schedule workflows with cron expressions. Supports standard 5-field cron syntax, timezones, catch-up for missed runs, and concurrency limits.

Basic Cron

import { cronTrigger } from '@cogitator-ai/workflows';

const config = cronTrigger('0 9 * * *');

const everyMinute = cronTrigger('* * * * *');
const hourly = cronTrigger('0 * * * *');
const weekdays9am = cronTrigger('0 9 * * 1-5');
const firstOfMonth = cronTrigger('0 0 1 * *');

Cron Options

const config = cronTrigger('0 9 * * *', {
  timezone: 'Europe/London',
  enabled: true,
  runImmediately: false,
  maxConcurrent: 1,
  catchUp: true,
  input: { reportType: 'daily' },
  condition: (ctx) => {
    const hour = new Date(ctx.timestamp).getHours();
    return hour >= 6 && hour <= 22;
  },
});
OptionTypeDescription
timezonestringIANA timezone (e.g., 'America/New_York')
enabledbooleanWhether the trigger is active
runImmediatelybooleanFire once immediately on registration
maxConcurrentnumberMax simultaneous runs from this trigger
catchUpbooleanRun missed executions after downtime
inputobject | functionStatic or dynamic input for the workflow
condition(ctx) => booleanAdditional gate before firing

Cron Utilities

The package includes standalone cron parsing utilities:

import {
  parseCronExpression,
  getNextCronOccurrence,
  getNextCronOccurrences,
  describeCronExpression,
  isValidCronExpression,
  CRON_PRESETS,
} from '@cogitator-ai/workflows';

const isValid = isValidCronExpression('0 9 * * *');

const next = getNextCronOccurrence('0 9 * * *', {
  timezone: 'America/New_York',
});

const nextFive = getNextCronOccurrences('*/15 * * * *', { count: 5 });

const description = describeCronExpression('0 9 * * 1-5');

Direct CronTriggerExecutor

For lower-level control without the full TriggerManager:

import { createCronTrigger } from '@cogitator-ai/workflows';

const cronExecutor = createCronTrigger({
  onFire: async (trigger, context) => {
    console.log(`Cron fired for ${trigger.workflowName}`);
    return 'run_123';
  },
});

const triggerId = cronExecutor.register('nightly-cleanup', {
  expression: '0 2 * * *',
  timezone: 'UTC',
  enabled: true,
});

const state = cronExecutor.getState(triggerId);
console.log(`Next run: ${new Date(state!.nextRun!).toISOString()}`);

Webhook Triggers

Activate workflows from HTTP requests with built-in authentication, rate limiting, payload validation, and deduplication.

Basic Webhook

import { webhookTrigger } from '@cogitator-ai/workflows';

const config = webhookTrigger('/api/deploy', 'POST');

Authentication

Four auth methods are supported:

const bearerAuth = webhookTrigger('/api/trigger', 'POST', {
  auth: { type: 'bearer', secret: 'my-secret-token' },
});

const hmacAuth = webhookTrigger('/github/webhook', 'POST', {
  auth: {
    type: 'hmac',
    secret: process.env.WEBHOOK_SECRET!,
    algorithm: 'sha256',
    headerName: 'x-hub-signature-256',
  },
});

const apiKeyAuth = webhookTrigger('/api/trigger', 'POST', {
  auth: { type: 'api-key', secret: 'key_abc123', headerName: 'x-api-key' },
});

const basicAuth = webhookTrigger('/api/trigger', 'POST', {
  auth: { type: 'basic', secret: 'user:password' },
});

Rate Limiting

const config = webhookTrigger('/api/trigger', 'POST', {
  rateLimit: {
    requests: 100,
    window: 60000,
    burstLimit: 10,
  },
});

Rate limiting is per-client (based on IP address). When exceeded, the webhook returns HTTP 429 with Retry-After headers.

Payload Handling

const config = webhookTrigger('/api/process', 'POST', {
  validatePayload: (body) => {
    const data = body as { type: string; id: string };
    return !!data.type && !!data.id;
  },
  transformPayload: (body) => {
    const data = body as { type: string; id: string; extra: unknown };
    return { eventType: data.type, entityId: data.id };
  },
  deduplicationKey: (body) => (body as { id: string }).id,
  deduplicationWindow: 60000,
});

Handling Requests

Route incoming HTTP requests to the webhook executor via the TriggerManager:

const result = await manager.handleWebhook({
  method: 'POST',
  path: '/github/webhook',
  headers: request.headers,
  body: request.body,
  ip: request.ip,
});

if (result?.triggered) {
  response.status(202).json({ runId: result.runId });
} else {
  response.status(result?.response.status ?? 404).json(result?.response.body);
}

Event Triggers

React to internal application events. Useful for chaining workflows or integrating with your domain events.

import { eventTrigger } from '@cogitator-ai/workflows';

const config = eventTrigger('order.created', {
  source: 'checkout-service',
  filter: (payload) => (payload as { priority: string }).priority === 'high',
  transform: (payload) => ({
    orderId: (payload as { id: string }).id,
    priority: 'high',
  }),
});

Emitting Events

manager.emitEvent('order.created', {
  id: 'order_789',
  priority: 'high',
  source: 'checkout-service',
  amount: 250,
});

Lifecycle

const manager = createTriggerManager({ onTriggerFire: handler });

manager.start();

// ... register triggers, handle webhooks, emit events ...

manager.dispose();

Always call .dispose() to clean up timers, intervals, and rate limiter resources.

On this page