Worker Queues
Distributed job processing with BullMQ — background execution for agents, workflows, and swarms.
Overview
The @cogitator-ai/worker package provides distributed job processing backed by BullMQ and Redis. It supports background execution of agents, workflows, and swarms with automatic retries, job priorities, and Prometheus metrics for autoscaling.
pnpm add @cogitator-ai/worker

JobQueue
JobQueue serializes agent configurations and dispatches them as jobs to Redis.
import { JobQueue } from '@cogitator-ai/worker';
const queue = new JobQueue({
name: 'cogitator-jobs',
redis: { host: 'localhost', port: 6379 },
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: 100,
removeOnFail: 500,
},
});
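With `attempts: 3` and exponential backoff at a 1000 ms base delay, a failed job is retried with delays that double each attempt. A sketch of BullMQ's documented built-in exponential backoff formula, `delay * 2^(attemptsMade - 1)`:

```typescript
// Sketch of BullMQ's built-in exponential backoff:
// retry n is delayed by baseDelayMs * 2^(n - 1) milliseconds.
function backoffDelay(baseDelayMs: number, attemptsMade: number): number {
  return baseDelayMs * 2 ** (attemptsMade - 1);
}

// With the config above (attempts: 3, delay: 1000), the two
// retries fire after 1000 ms and 2000 ms respectively:
const delays = [1, 2].map((n) => backoffDelay(1000, n));
console.log(delays); // [ 1000, 2000 ]
```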
const job = await queue.addAgentJob(
{
name: 'researcher',
instructions: 'You are a research assistant.',
model: 'gpt-4o',
provider: 'openai',
tools: [],
},
'Summarize the latest AI papers on reasoning',
{ priority: 1, threadId: 'thread_abc123' }
);
console.log('State:', await queue.getJobState(job.id!));

Workflow and swarm jobs follow the same pattern:
await queue.addWorkflowJob(workflowConfig, { query: 'input' }, { priority: 2 });
await queue.addSwarmJob(swarmConfig, 'Debate the pros and cons of microservices');

WorkerPool
WorkerPool spawns BullMQ workers that consume jobs from the queue. Built-in processors handle agent, workflow, swarm, and distributed swarm-agent job types.
import { WorkerPool } from '@cogitator-ai/worker';
const pool = new WorkerPool(
{
name: 'cogitator-jobs',
redis: { host: 'localhost', port: 6379 },
workerCount: 4,
concurrency: 5,
},
{
onJobCompleted: (jobId, result) => console.log(`${jobId} done`),
onJobFailed: (jobId, error) => console.error(`${jobId} failed:`, error.message),
}
);
await pool.start();
process.on('SIGTERM', async () => {
await pool.stop(30_000);
await queue.close();
});

Redis Cluster
Pass the same cluster configuration to both the queue and the worker pool. In cluster mode, queue keys are prefixed with the {cogitator} hash tag so that they all map to the same hash slot.
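Redis Cluster hashes only the substring between the first `{` and the following `}` when one is present, which is why a shared `{cogitator}` prefix pins every queue key to one slot. A minimal sketch of that rule from the Redis Cluster specification:

```typescript
// Returns the portion of the key that Redis Cluster actually hashes.
// Per the cluster spec: if the key contains a "{...}" with a non-empty
// body, only that body is hashed; otherwise the whole key is hashed.
function hashedPortion(key: string): string {
  const open = key.indexOf('{');
  if (open === -1) return key;
  const close = key.indexOf('}', open + 1);
  if (close === -1 || close === open + 1) return key; // no '}' or empty tag
  return key.slice(open + 1, close);
}

// Both queue keys hash the same substring, so they share a slot:
hashedPortion('{cogitator}:jobs:wait');   // 'cogitator'
hashedPortion('{cogitator}:jobs:active'); // 'cogitator'
```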
const redisConfig = {
password: process.env.REDIS_PASSWORD,
cluster: {
nodes: [
{ host: 'redis-1.example.com', port: 6379 },
{ host: 'redis-2.example.com', port: 6379 },
],
},
};
const queue = new JobQueue({ redis: redisConfig });
const pool = new WorkerPool({ redis: redisConfig, workerCount: 4, concurrency: 10 });

Prometheus Metrics
Expose queue metrics for monitoring and Kubernetes HPA:
import { MetricsCollector } from '@cogitator-ai/worker';
const collector = new MetricsCollector();
app.get('/metrics', async (req, res) => {
const metrics = await queue.getMetrics();
res.set('Content-Type', 'text/plain');
res.send(collector.format(metrics, { queue: 'cogitator-jobs' }));
});

| Metric | Type | Description |
|---|---|---|
| cogitator_queue_depth | gauge | Jobs waiting + delayed |
| cogitator_queue_active | gauge | Jobs currently processing |
| cogitator_queue_completed_total | counter | Total completed jobs |
| cogitator_queue_failed_total | counter | Total failed jobs |
| cogitator_workers_total | gauge | Active worker count |
| cogitator_job_duration_seconds | histogram | Processing time distribution |
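What the `/metrics` endpoint returns is Prometheus's plain-text exposition format: one sample per line, with labels in braces. A sketch of that formatting for the gauges and counters above (the field names on the metrics object are assumptions for illustration, not the package's exact `getMetrics()` shape):

```typescript
// Assumed snapshot shape; the real getMetrics() result may differ.
interface QueueMetrics {
  depth: number;
  active: number;
  completedTotal: number;
  failedTotal: number;
}

// Render samples in the Prometheus text exposition format:
// metric_name{label="value"} sample
function formatMetrics(m: QueueMetrics, labels: Record<string, string>): string {
  const labelStr = Object.entries(labels)
    .map(([k, v]) => `${k}="${v}"`)
    .join(',');
  return [
    `cogitator_queue_depth{${labelStr}} ${m.depth}`,
    `cogitator_queue_active{${labelStr}} ${m.active}`,
    `cogitator_queue_completed_total{${labelStr}} ${m.completedTotal}`,
    `cogitator_queue_failed_total{${labelStr}} ${m.failedTotal}`,
  ].join('\n');
}

const text = formatMetrics(
  { depth: 5, active: 2, completedTotal: 100, failedTotal: 3 },
  { queue: 'cogitator-jobs' }
);
// First line: cogitator_queue_depth{queue="cogitator-jobs"} 5
```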
HPA Autoscaling
Use cogitator_queue_depth as the scaling signal in a Kubernetes HorizontalPodAutoscaler:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: cogitator-worker
minReplicas: 1
maxReplicas: 20
metrics:
- type: Pods
pods:
metric:
name: cogitator_queue_depth
target:
type: AverageValue
averageValue: '10'
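With `averageValue: '10'`, the HPA's standard algorithm asks for roughly one worker pod per 10 queued jobs: it computes ceil(total metric / target average), clamped to the min/max replica bounds. A sketch of that arithmetic:

```typescript
// Sketch of the HPA AverageValue calculation for a Pods metric:
// desired = ceil(total cogitator_queue_depth / averageValue),
// clamped to [minReplicas, maxReplicas].
function desiredReplicas(
  totalQueueDepth: number,
  averageValue: number,
  minReplicas: number,
  maxReplicas: number
): number {
  const desired = Math.ceil(totalQueueDepth / averageValue);
  return Math.min(maxReplicas, Math.max(minReplicas, desired));
}

desiredReplicas(85, 10, 1, 20);  // 9 workers for 85 queued jobs
desiredReplicas(500, 10, 1, 20); // 20 — capped at maxReplicas
desiredReplicas(0, 10, 1, 20);   // 1 — never below minReplicas
```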