
Distributed Swarms

Scale swarms across multiple machines with Redis-backed coordination, distributed worker nodes, and shared state.

Overview

By default, swarms run in-memory on a single process. For production workloads, you can enable distributed mode to coordinate agents across multiple machines using Redis as the communication backbone.

┌─────────────────────────┐
│    Swarm Coordinator    │
│   (orchestrator node)   │
│                         │
│  Strategy ──▶ Redis     │
│              (pub/sub)  │
└───────────┬─────────────┘
            │ job queue
     ┌──────┴──────┐
     ▼             ▼
┌───────────┐ ┌───────────┐
│ Worker    │ │ Worker    │
│ Node 1    │ │ Node 2    │
│           │ │           │
│ Cogitator │ │ Cogitator │
│ + Agent   │ │ + Agent   │
└───────────┘ └───────────┘

The orchestrator node runs the strategy and dispatches agent jobs to a Redis queue. Worker nodes pick up jobs, execute the agent, and publish results back through Redis.
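The payload pushed onto the queue is a plain JSON-serializable object. A sketch of its shape, inferred from the fields the worker example later on this page reads (the library's actual internal type may differ):

```typescript
// Sketch of a queued job payload. Field names mirror what the worker
// example below reads (job.swarmId, job.agentName, job.agentConfig, ...);
// this is illustrative, not the library's exported type.
interface AgentJob {
  swarmId: string;
  agentName: string;
  agentConfig: {
    name: string;
    instructions: string;
    provider: string;
    model: string;
  };
  input: string;
  context?: Record<string, unknown>;
}

const job: AgentJob = {
  swarmId: 'swarm_abc123',
  agentName: 'worker-a',
  agentConfig: {
    name: 'worker-a',
    instructions: 'Handle backend tasks.',
    provider: 'openai',
    model: 'gpt-4o-mini',
  },
  input: 'Build the checkout flow',
};

// The coordinator serializes the job and pushes it onto the queue list;
// a worker later parses it back with JSON.parse.
const payload = JSON.stringify(job);
```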

Enabling Distributed Mode

import { Cogitator, Agent } from '@cogitator-ai/core';
import { swarm } from '@cogitator-ai/swarms';

const cogitator = new Cogitator({
  /* provider config */
});
const team = swarm('distributed-team')
  .strategy('hierarchical')
  .supervisor(new Agent({ name: 'lead', instructions: 'Coordinate the team.' }))
  .workers([
    new Agent({ name: 'worker-a', instructions: 'Handle backend tasks.' }),
    new Agent({ name: 'worker-b', instructions: 'Handle frontend tasks.' }),
  ])
  .distributed({
    enabled: true,
    redis: {
      host: 'redis.example.com',
      port: 6379,
      password: process.env.REDIS_PASSWORD,
      db: 0,
      keyPrefix: 'swarm',
    },
    queue: 'swarm-agent-jobs',
    timeout: 300_000,
  })
  .build(cogitator);

await team.run({ input: 'Build the checkout flow' });
await team.close();

Always call team.close() when done to clean up Redis connections.

Distributed Configuration

Option           Default             Description
enabled          false               Enable distributed mode
redis.host       'localhost'         Redis server hostname
redis.port       6379                Redis server port
redis.password   undefined           Redis authentication password
redis.db         0                   Redis database number
redis.keyPrefix  'swarm'             Key prefix for all Redis keys
queue            'swarm-agent-jobs'  Redis list name used as the job queue
timeout          300000              Timeout in ms for waiting on agent job results

How It Works

When distributed mode is enabled, the Swarm creates a DistributedSwarmCoordinator instead of the local SwarmCoordinator. The distributed coordinator:

  1. Initializes Redis-backed primitives — RedisMessageBus, RedisBlackboard, and RedisSwarmEventEmitter replace their in-memory counterparts
  2. Dispatches jobs — when a strategy calls runAgent(), the coordinator serializes the agent config and pushes a job payload to a Redis list
  3. Waits for results — subscribes to a Redis pub/sub channel for job results, with a configurable timeout
  4. Synchronizes state — blackboard writes and messages are persisted in Redis, so all nodes see the same state
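The dispatch-and-wait in steps 2 and 3 boils down to racing a pending result against a timer. A minimal sketch of that pattern (withTimeout is an illustrative helper, not a library export; in the real coordinator the inner promise resolves when a matching message arrives on the results pub/sub channel):

```typescript
// Illustrative sketch of step 3: race the pending result against a timer,
// and clear the timer once either side settles.
function withTimeout<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms);
  });
  return Promise.race([promise, timeout]).finally(() => {
    if (timer !== undefined) clearTimeout(timer);
  });
}
```

The coordinator applies the configured timeout (300000 ms by default) to each dispatched job in essentially this way.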

Redis Key Structure

All keys are prefixed with {keyPrefix}:{swarmId}:

Key Pattern                              Type           Purpose
swarm:{id}:messages                      List           Message bus message history
swarm:{id}:channel:{target}              Pub/Sub        Real-time message delivery
swarm:{id}:blackboard:{section}          String (JSON)  Blackboard section data
swarm:{id}:blackboard:{section}:history  List           Blackboard write history
swarm:{id}:blackboard:changes            Pub/Sub        Blackboard change notifications
swarm:{id}:events                        List           Event history
swarm:{id}:events:live                   Pub/Sub        Live event streaming
swarm:{id}:results                       Pub/Sub        Agent job results
swarm:jobs:{queue}                       List           Job queue for worker nodes
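The swarm-scoped patterns above all compose from the prefix and swarm id. A tiny helper makes the layout concrete (illustrative only — the library builds these keys internally):

```typescript
// Hypothetical helper mirroring the key layout above; for illustration only.
function swarmKey(keyPrefix: string, swarmId: string, ...parts: string[]): string {
  return [keyPrefix, swarmId, ...parts].join(':');
}

const sectionKey = swarmKey('swarm', 'abc123', 'blackboard', 'tasks');
// 'swarm:abc123:blackboard:tasks'
const channelKey = swarmKey('swarm', 'abc123', 'channel', 'agent-b');
// 'swarm:abc123:channel:agent-b'
```

Note that the job queue key (swarm:jobs:{queue}) is the exception: it is shared by all swarms and does not embed a swarm id.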

RedisMessageBus

Drop-in replacement for InMemoryMessageBus. Messages are stored in a Redis list and delivered in real-time via pub/sub pattern subscriptions.

import { RedisMessageBus } from '@cogitator-ai/swarms';
import Redis from 'ioredis';

const bus = new RedisMessageBus(
  { enabled: true, protocol: 'direct' },
  { redis: new Redis(), swarmId: 'swarm_abc123', keyPrefix: 'swarm' }
);

await bus.initialize();

await bus.send({
  swarmId: 'swarm_abc123',
  from: 'agent-a',
  to: 'agent-b',
  type: 'request',
  content: 'Need your analysis',
});

await bus.syncFromRedis();

await bus.close();

The bus maintains a local cache for fast reads and subscribes to Redis pub/sub channels using pattern matching (swarm:{id}:channel:*) for real-time delivery.
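With a pattern subscription, every delivery arrives with its concrete channel name, so routing a message to the right recipient is a matter of parsing the {target} segment. A sketch (the helper name is illustrative, not a library export):

```typescript
// Illustrative routing helper for deliveries from a pattern subscription.
// With ioredis this pairs with:
//   sub.psubscribe('swarm:swarm_abc123:channel:*');
//   sub.on('pmessage', (_pattern, channel, message) => { /* route by target */ });
function channelTarget(channel: string): string {
  const parts = channel.split(':');
  return parts[parts.length - 1];
}

const target = channelTarget('swarm:swarm_abc123:channel:agent-b');
// 'agent-b'
```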

RedisBlackboard

Shared state synchronized across all nodes. Each section is stored as a JSON string in Redis, with change notifications broadcast via pub/sub.

import { RedisBlackboard } from '@cogitator-ai/swarms';
import Redis from 'ioredis';

const bb = new RedisBlackboard(
  { enabled: true, sections: { tasks: [] }, trackHistory: true },
  { redis: new Redis(), swarmId: 'swarm_abc123', keyPrefix: 'swarm' }
);

await bb.initialize();

bb.write('tasks', [{ id: 1, title: 'Implement auth' }], 'lead');

const tasks = bb.read('tasks');

bb.subscribe('tasks', (data, agentName) => {
  console.log(`Tasks updated by ${agentName}`);
});

await bb.syncFromRedis();
await bb.close();

On initialization, existing sections are loaded from Redis. The local cache provides instant reads, while writes are persisted to Redis and broadcast to all subscribers.

RedisSwarmEventEmitter

Events are persisted in a Redis list and streamed live to all connected nodes via pub/sub.

import { RedisSwarmEventEmitter } from '@cogitator-ai/swarms';
import Redis from 'ioredis';

const events = new RedisSwarmEventEmitter({
  redis: new Redis(),
  swarmId: 'swarm_abc123',
  keyPrefix: 'swarm',
  maxEvents: 1000,
});

await events.initialize();

events.on('agent:complete', (event) => {
  console.log(`Agent done: ${event.agentName}`);
});

await events.emitAsync('agent:start', { agentName: 'worker-a' });

const history = await events.getEventsAsync();

await events.close();

The emitter auto-trims the event list in Redis to maxEvents entries to prevent unbounded growth.
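The trim policy corresponds to Redis's LPUSH followed by LTRIM 0 maxEvents-1. The same bounded-list behavior on a plain array, for illustration:

```typescript
// Illustrative in-memory version of the auto-trim: newest event first,
// never more than maxEvents entries (LPUSH + LTRIM 0 maxEvents-1 in Redis).
function pushTrimmed<T>(list: readonly T[], event: T, maxEvents: number): T[] {
  return [event, ...list].slice(0, maxEvents);
}

let recent: string[] = [];
recent = pushTrimmed(recent, 'a', 2);
recent = pushTrimmed(recent, 'b', 2);
recent = pushTrimmed(recent, 'c', 2);
// recent is now ['c', 'b'] — 'a' was trimmed away
```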

Building Worker Nodes

Worker nodes consume jobs from the Redis queue, execute agents locally, and publish results back. A minimal worker implementation:

import Redis from 'ioredis';
import { Cogitator, Agent } from '@cogitator-ai/core';

const redis = new Redis({ host: 'redis.example.com' });
const cogitator = new Cogitator({
  /* provider config */
});

async function processJobs() {
  while (true) {
    const res = await redis.blpop('swarm:jobs:swarm-agent-jobs', 0);
    if (!res) continue; // blpop's types allow null, though timeout 0 blocks until a job arrives
    const [, jobJson] = res;
    const job = JSON.parse(jobJson);

    const agent = new Agent({
      name: job.agentConfig.name,
      instructions: job.agentConfig.instructions,
      model: `${job.agentConfig.provider}/${job.agentConfig.model}`,
    });

    try {
      const result = await cogitator.run(agent, { input: job.input, context: job.context });

      await redis.publish(
        `swarm:${job.swarmId}:results`,
        JSON.stringify({
          swarmId: job.swarmId,
          agentName: job.agentName,
          output: result.output,
          toolCalls: result.toolCalls,
          tokenUsage: {
            prompt: result.usage.inputTokens,
            completion: result.usage.outputTokens,
            total: result.usage.totalTokens,
          },
        })
      );
    } catch (error) {
      await redis.publish(
        `swarm:${job.swarmId}:results`,
        JSON.stringify({
          swarmId: job.swarmId,
          agentName: job.agentName,
          output: '',
          toolCalls: [],
          tokenUsage: { prompt: 0, completion: 0, total: 0 },
          error: error instanceof Error ? error.message : String(error),
        })
      );
    }
  }
}

processJobs();

Scaling Considerations

Horizontal scaling — spin up multiple worker nodes consuming from the same job queue. Redis BLPOP ensures each job is processed by exactly one worker.

State consistency — the blackboard and message bus use Redis as the source of truth. Local caches provide fast reads; writes are immediately persisted and broadcast.

Connection management — each distributed swarm creates two Redis connections: a main client plus a dedicated subscriber, since a Redis connection in subscribe mode cannot issue regular commands. Call close() when done to release both.

Timeouts — configure the timeout option based on your expected agent execution time. Jobs that exceed the timeout will cause the coordinator to throw an error.

Key cleanup — swarm keys persist in Redis after execution. For production use, set a TTL on keys or implement periodic cleanup keyed on the swarmId.
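One way to implement that cleanup (a sketch — expireSwarmKeys and swarmKeyPattern are hypothetical helpers, not library exports) is to SCAN for the swarm's keys and set a TTL on each; SCAN with MATCH walks the keyspace incrementally instead of blocking the server the way KEYS would:

```typescript
// Minimal structural types so this sketch stands alone; a real ioredis
// client satisfies them (scanStream, pipeline, and expire are ioredis APIs).
interface RedisScanner {
  scanStream(opts: { match: string; count: number }): AsyncIterable<string[]>;
  pipeline(): { expire(key: string, seconds: number): unknown; exec(): Promise<unknown> };
}

// Hypothetical helper: the MATCH pattern covering one swarm's keys.
function swarmKeyPattern(keyPrefix: string, swarmId: string): string {
  return `${keyPrefix}:${swarmId}:*`;
}

// Put a TTL on every key belonging to a finished swarm.
async function expireSwarmKeys(
  redis: RedisScanner,
  keyPrefix: string,
  swarmId: string,
  ttlSeconds: number
): Promise<void> {
  const stream = redis.scanStream({ match: swarmKeyPattern(keyPrefix, swarmId), count: 100 });
  for await (const keys of stream) {
    if (keys.length === 0) continue;
    const pipeline = redis.pipeline();
    for (const key of keys) pipeline.expire(key, ttlSeconds);
    await pipeline.exec();
  }
}

// e.g. after team.close():
// await expireSwarmKeys(new Redis(), 'swarm', 'swarm_abc123', 3600);
```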
