
Worker Queues

Distributed job processing with BullMQ — background execution for agents, workflows, and swarms.

Overview

The @cogitator-ai/worker package provides distributed job processing backed by BullMQ and Redis. It supports background execution of agents, workflows, and swarms with automatic retries, job priorities, and Prometheus metrics for autoscaling.

pnpm add @cogitator-ai/worker

JobQueue

JobQueue serializes agent configurations and dispatches them as jobs to Redis.

import { JobQueue } from '@cogitator-ai/worker';

const queue = new JobQueue({
  name: 'cogitator-jobs',
  redis: { host: 'localhost', port: 6379 },
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: 100,
    removeOnFail: 500,
  },
});
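With these options, a failed job is retried up to two more times, and BullMQ's exponential backoff doubles the wait before each retry: the nth retry waits delay * 2^(n - 1) milliseconds. A minimal sketch of the schedule (the helper name is illustrative, not part of the package):

```typescript
// Illustrative re-implementation of BullMQ's exponential backoff schedule:
// the nth retry waits baseDelayMs * 2^(n - 1) milliseconds.
function backoffDelay(retry: number, baseDelayMs: number): number {
  return baseDelayMs * 2 ** (retry - 1);
}

// With attempts: 3 and delay: 1000, the two retries wait:
console.log(backoffDelay(1, 1000)); // 1000 ms before retry 1
console.log(backoffDelay(2, 1000)); // 2000 ms before retry 2
```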

const job = await queue.addAgentJob(
  {
    name: 'researcher',
    instructions: 'You are a research assistant.',
    model: 'gpt-4o',
    provider: 'openai',
    tools: [],
  },
  'Summarize the latest AI papers on reasoning',
  { priority: 1, threadId: 'thread_abc123' }
);

console.log('State:', await queue.getJobState(job.id!));

Workflow and swarm jobs follow the same pattern:

await queue.addWorkflowJob(workflowConfig, { query: 'input' }, { priority: 2 });
await queue.addSwarmJob(swarmConfig, 'Debate the pros and cons of microservices');
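Because these calls return as soon as the job is enqueued, callers that need the result must wait for a terminal state. A minimal polling sketch built on the getJobState method shown above, assuming 'completed' and 'failed' are the terminal states (the helper and its options are illustrative, not part of the package API):

```typescript
// Hypothetical helper: polls a state-lookup function until the job
// reaches a terminal state or the timeout elapses.
type JobState = string;

async function pollJobState(
  getState: () => Promise<JobState>,
  opts: { intervalMs?: number; timeoutMs?: number } = {}
): Promise<JobState> {
  const { intervalMs = 500, timeoutMs = 60_000 } = opts;
  const deadline = Date.now() + timeoutMs;
  while (true) {
    const state = await getState();
    if (state === 'completed' || state === 'failed') return state;
    if (Date.now() > deadline) throw new Error(`timed out in state: ${state}`);
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}

// Usage against the queue from above:
// const state = await pollJobState(() => queue.getJobState(job.id!));
```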

WorkerPool

WorkerPool spawns BullMQ workers that consume jobs from the queue. Built-in processors handle agent, workflow, swarm, and distributed swarm-agent job types.

import { WorkerPool } from '@cogitator-ai/worker';

const pool = new WorkerPool(
  {
    name: 'cogitator-jobs',
    redis: { host: 'localhost', port: 6379 },
    workerCount: 4,
    concurrency: 5,
  },
  {
    onJobCompleted: (jobId, result) => console.log(`${jobId} done`),
    onJobFailed: (jobId, error) => console.error(`${jobId} failed:`, error.message),
  }
);

await pool.start();

process.on('SIGTERM', async () => {
  await pool.stop(30_000);
  await queue.close();
});

Redis Cluster

Pass the cluster nodes to both the queue and the worker pool. Cluster mode prefixes keys with the {cogitator} hash tag so they all map to the same slot.

const redisConfig = {
  password: process.env.REDIS_PASSWORD,
  cluster: {
    nodes: [
      { host: 'redis-1.example.com', port: 6379 },
      { host: 'redis-2.example.com', port: 6379 },
    ],
  },
};

const queue = new JobQueue({ name: 'cogitator-jobs', redis: redisConfig });
const pool = new WorkerPool({ name: 'cogitator-jobs', redis: redisConfig, workerCount: 4, concurrency: 10 });
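Redis Cluster normally assigns each key to a slot by hashing the whole key, but when a key contains a hash tag (a non-empty substring between the first '{' and the next '}'), only that substring is hashed. That is why every key carrying the {cogitator} tag lands on the same slot. A sketch of the tag-extraction rule from the Redis Cluster specification (helper name illustrative):

```typescript
// Extract the Redis Cluster hash tag from a key, per the cluster spec:
// if the key contains '{...}' with a non-empty body, only that body is
// hashed for slot assignment; otherwise the whole key is hashed.
function hashTag(key: string): string {
  const open = key.indexOf('{');
  if (open === -1) return key;
  const close = key.indexOf('}', open + 1);
  if (close === -1 || close === open + 1) return key; // no tag, or empty '{}'
  return key.slice(open + 1, close);
}

// Both queue keys hash on 'cogitator', so they share a slot:
console.log(hashTag('{cogitator}:jobs:wait'));   // 'cogitator'
console.log(hashTag('{cogitator}:jobs:active')); // 'cogitator'
```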

Prometheus Metrics

Expose queue metrics for monitoring and Kubernetes HPA:

import { MetricsCollector } from '@cogitator-ai/worker';

const collector = new MetricsCollector();

app.get('/metrics', async (req, res) => {
  const metrics = await queue.getMetrics();
  res.set('Content-Type', 'text/plain');
  res.send(collector.format(metrics, { queue: 'cogitator-jobs' }));
});
| Metric | Type | Description |
| --- | --- | --- |
| cogitator_queue_depth | gauge | Jobs waiting + delayed |
| cogitator_queue_active | gauge | Jobs currently processing |
| cogitator_queue_completed_total | counter | Total completed jobs |
| cogitator_queue_failed_total | counter | Total failed jobs |
| cogitator_workers_total | gauge | Active worker count |
| cogitator_job_duration_seconds | histogram | Processing time distribution |
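The endpoint serves these metrics in the standard Prometheus text exposition format. An illustrative excerpt (the values are made up):

```text
# TYPE cogitator_queue_depth gauge
cogitator_queue_depth{queue="cogitator-jobs"} 42
# TYPE cogitator_queue_completed_total counter
cogitator_queue_completed_total{queue="cogitator-jobs"} 1337
```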

HPA Autoscaling

Use cogitator_queue_depth as the scaling signal in a Kubernetes HorizontalPodAutoscaler:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: cogitator-worker
  minReplicas: 1
  maxReplicas: 20
  metrics:
    - type: Pods
      pods:
        metric:
          name: cogitator_queue_depth
        target:
          type: AverageValue
          averageValue: '10'
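For a Pods metric, the HPA applies the standard Kubernetes scaling formula: desiredReplicas = ceil(currentReplicas × currentMetricValue ÷ targetValue), where currentMetricValue is the queue depth averaged across worker pods. A worked sketch:

```typescript
// Kubernetes HPA scaling formula for a Pods metric:
// desired = ceil(current * averageMetric / target)
function desiredReplicas(current: number, averageMetric: number, target: number): number {
  return Math.ceil(current * (averageMetric / target));
}

// 200 queued jobs across 4 worker pods = average depth 50 per pod;
// with a target of 10, the HPA scales toward 20 replicas (capped by maxReplicas).
console.log(desiredReplicas(4, 50, 10)); // 20
```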
