---
title: "Batch queue subsystem"
description: "Why @elizaos/core ships utils/batch-queue and how PriorityQueue, BatchProcessor, TaskDrain, and BatchQueue fit together"
---

This page mirrors the contributor-oriented markdown in the core package (`packages/core/docs/BATCH_QUEUE.md`) so the public docs stay aligned with source.

<Tip>
  **Why this exists:** the runtime is not globally “batching-bound,” but several subsystems share the same **ordering**, **bounded parallelism**, **retries**, and **repeat-task drain** patterns. One small composable stack avoids parallel one-off queues that drift apart over time.
</Tip>

## Problem statement

Several subsystems have overlapping needs:

- **Ordering** — e.g. high / normal / low priority before processing.
- **Bounded parallelism** — avoid unbounded `Promise.all` against model APIs.
- **Retries and backoff** — align with the rest of the runtime (`utils/retry` in `@elizaos/core`).
- **Scheduling** — repeat tasks with `tags: ["queue", "repeat"]`, stable metadata (`maxFailures: -1`, etc.).

It is tempting to fix **only** the worst hot path (for example a three-line semaphore in one service). That is often correct for a **single** bottleneck. The risk is **proliferation**: each new feature copies a slightly different queue, drain loop, or `createTask` + `registerTaskWorker` pair. Those copies drift, disagree on edge cases (pause, dispose, retry), and make review harder (“is this the same pattern as embedding or a new one?”).

## What we standardized

| Piece | Role |
| ----- | ---- |
| **`PriorityQueue<T>`** | What runs next (three deques, optional `maxSize` / `onPressure`). |
| **`BatchProcessor<T>`** | How a **slice** runs: semaphore-limited concurrency, retries, `onExhausted`. |
| **`TaskDrain`** | When the task system ticks: find/create repeat task, optional worker registration. |
| **`BatchQueue<T>`** | End-to-end “enqueue → drain on interval → process batch” when a service needs all three. |
| **`Semaphore`** | Shared primitive; also re-exported from `prompt-batcher/shared` so existing imports keep working. |
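
To make the "what runs next" row concrete, here is a minimal sketch of a three-lane queue that drains high before normal before low. `TieredQueue` and `takeBatch` are hypothetical names for illustration, plain arrays stand in for the deques, and `maxSize` / `onPressure` are left out; this is not the `PriorityQueue<T>` implementation in `@elizaos/core`.

```typescript
// Hypothetical sketch: not the @elizaos/core PriorityQueue implementation.
type Priority = "high" | "normal" | "low";

class TieredQueue<T> {
  // One FIFO lane per priority tier (arrays stand in for deques here).
  private readonly lanes: Record<Priority, T[]> = { high: [], normal: [], low: [] };

  enqueue(item: T, priority: Priority = "normal"): void {
    this.lanes[priority].push(item);
  }

  get size(): number {
    return this.lanes.high.length + this.lanes.normal.length + this.lanes.low.length;
  }

  // Remove up to `max` items, taking from high, then normal, then low.
  takeBatch(max: number): T[] {
    const batch: T[] = [];
    for (const tier of ["high", "normal", "low"] as const) {
      const lane = this.lanes[tier];
      while (batch.length < max && lane.length > 0) {
        batch.push(lane.shift() as T);
      }
    }
    return batch;
  }
}

const queue = new TieredQueue<string>();
queue.enqueue("reindex", "low");
queue.enqueue("summarize");
queue.enqueue("user-message", "high");

// Enqueue order was low, normal, high; the batch comes out high first.
const firstBatch = queue.takeBatch(2); // ["user-message", "summarize"]
```

The low-priority item stays queued for the next tick, which is the slice-per-tick behavior the table describes.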

Callers **compose only what they need**:

- **Embedding generation** — `BatchQueue` (queue + processor + drain).
- **Action filter index build** — `BatchProcessor` only (synchronous batch job, no repeat task).
- **Knowledge (documents + `generateTextEmbeddingsBatch`)** — `BatchProcessor` for capped parallel embedding calls when falling back to per-text requests.
- **Prompt batcher per-affinity drains** — `TaskDrain` with **`skipRegisterWorker: true`** because a **single** global worker handles `BATCHER_DRAIN` and dispatches by `metadata.affinityKey`; per-affinity instances must not register duplicate workers with the same name.

## Design choices (WHYs)

### Why not push failed items back onto the priority queue?

**`BatchProcessor`** retries a failed item **in place**, reports it through `onExhausted` once its retries are used up, then moves on. Re-queueing failures between ticks would complicate lifecycle (duplicate detection, ordering, partial batches) and could interact badly with concurrent drains. See the header comment in `batch-processor.ts` in the monorepo.
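
The in-place retry idea can be sketched as follows. Everything here (`SimpleSemaphore`, `runBatch`, and the option names `concurrency` and `maxAttempts`) is a hypothetical illustration rather than the real `BatchProcessor` API, and backoff between attempts is omitted for brevity.

```typescript
// Hypothetical sketch: names and options are illustrative, not the real BatchProcessor API.
class SimpleSemaphore {
  private permits: number;
  private readonly waiters: Array<() => void> = [];

  constructor(permits: number) {
    this.permits = permits;
  }

  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return;
    }
    // No permit free: wait until release() hands one over.
    await new Promise<void>((resolve) => this.waiters.push(resolve));
  }

  release(): void {
    const next = this.waiters.shift();
    if (next) next(); // pass the permit straight to the next waiter
    else this.permits++;
  }
}

interface RunBatchOptions<T> {
  concurrency: number; // maximum items in flight at once
  maxAttempts: number; // total attempts per item, including the first
  onExhausted?: (item: T, error: unknown) => void;
}

async function runBatch<T>(
  items: readonly T[],
  handler: (item: T) => Promise<void>,
  options: RunBatchOptions<T>,
): Promise<void> {
  const semaphore = new SimpleSemaphore(Math.max(1, options.concurrency));
  await Promise.all(
    items.map(async (item) => {
      await semaphore.acquire();
      try {
        let lastError: unknown;
        for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
          try {
            await handler(item);
            return; // success: stop retrying this item
          } catch (error) {
            lastError = error; // retry in place; the item never goes back to a queue
          }
        }
        options.onExhausted?.(item, lastError);
      } finally {
        semaphore.release();
      }
    }),
  );
}
```

Because a failing item holds its permit while it retries, the batch never exceeds the concurrency cap, and no item can appear twice in a later slice.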

### Why does `TaskDrain` support `skipRegisterWorker`?

Registering a worker twice under the same task name would **overwrite** the previous handler. Batcher affinities share one logical worker in `TaskService`, so each affinity only needs its own task **DB row** and interval updates, not its own worker registration. See `task-drain.ts` in the monorepo.
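
The overwrite hazard is easy to reproduce with a name-keyed registry. The sketch below assumes a `Map`-backed registry and a hypothetical `registerWorker` helper; it is a simplified stand-in for the real task service, which may differ in detail.

```typescript
// Hypothetical sketch of a name-keyed worker registry; not the real TaskService.
interface DrainWorker {
  name: string;
  execute(metadata: { affinityKey: string }): string;
}

const registry = new Map<string, DrainWorker>();

function registerWorker(worker: DrainWorker): void {
  // Map.set replaces any existing entry stored under the same name.
  registry.set(worker.name, worker);
}

// Anti-pattern: each affinity registers its own handler under the shared name.
registerWorker({ name: "BATCHER_DRAIN", execute: () => "drained affinity A" });
registerWorker({ name: "BATCHER_DRAIN", execute: () => "drained affinity B" });

// The handler for A is gone, so a tick for A runs B's handler instead.
const survivor = registry.get("BATCHER_DRAIN")?.execute({ affinityKey: "A" });
// survivor === "drained affinity B"

// Pattern: one global worker, registered once, dispatching on metadata.affinityKey.
const drains = new Map<string, () => string>([
  ["A", () => "drained affinity A"],
  ["B", () => "drained affinity B"],
]);
registerWorker({
  name: "BATCHER_DRAIN",
  execute: ({ affinityKey }) => drains.get(affinityKey)?.() ?? `no drain for ${affinityKey}`,
});

const dispatched = registry.get("BATCHER_DRAIN")?.execute({ affinityKey: "A" });
// dispatched === "drained affinity A"
```

With `skipRegisterWorker: true`, a per-affinity `TaskDrain` plays the role of an entry in `drains` here: it contributes scheduling state but leaves the single registration alone.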

### Why `maxFailures: -1` on drain tasks?

Task metadata round-trips through storage, and `JSON.stringify(Infinity)` becomes `null`, so an `Infinity` limit would not survive a save and reload. **`-1`** is stored reliably and means “do not auto-pause” for long-lived drains. See the core package changelog for batcher / task notes.
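
The round-trip problem can be checked directly in any JavaScript runtime. The `shouldAutoPause` helper at the end is a hypothetical reader of the sentinel, included only to show how `-1` might be interpreted; the actual check in the task system may look different.

```typescript
// Infinity has no JSON representation, so it is serialized as null.
const lossy = JSON.stringify({ maxFailures: Infinity }); // '{"maxFailures":null}'
const lossyRestored = JSON.parse(lossy) as { maxFailures: number | null };
// lossyRestored.maxFailures === null: the "unlimited" intent is lost.

// A finite sentinel survives the same round trip unchanged.
const stable = JSON.stringify({ maxFailures: -1 }); // '{"maxFailures":-1}'
const stableRestored = JSON.parse(stable) as { maxFailures: number };
// stableRestored.maxFailures === -1

// Hypothetical reader (an assumption, not the real task code):
// -1 disables auto-pause, any other value is a failure limit.
function shouldAutoPause(failureCount: number, maxFailures: number): boolean {
  return maxFailures !== -1 && failureCount >= maxFailures;
}
```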

### Why is the embedding queue unbounded by default?

**Throughput** (embedding I/O) is usually the real limit, not in-memory queue length. A bounded queue with eviction is a **product policy**; it can be reintroduced via `maxSize` + `onPressure` on `BatchQueueOptions` if a deployment needs it.

### What happens with invalid `getPriority` values?

Anything other than `high`, `normal`, or `low` is logged **once** per queue (via the core logger) and treated as **normal** so typos are visible and do not silently sort work into the low tier.
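
A warn-once fallback of this kind might look like the sketch below. `createPriorityNormalizer` and its injectable `warn` callback are hypothetical names used for illustration; the real queue logs through the core logger.

```typescript
// Hypothetical sketch of "log once, treat as normal"; not the real queue code.
type Priority = "high" | "normal" | "low";

function isPriority(value: unknown): value is Priority {
  return value === "high" || value === "normal" || value === "low";
}

// Each normalizer keeps its own flag, so the warning fires once per queue.
function createPriorityNormalizer(
  warn: (message: string) => void = (message) => console.warn(message),
): (value: unknown) => Priority {
  let warned = false;
  return (value) => {
    if (isPriority(value)) return value;
    if (!warned) {
      warned = true;
      warn(`Invalid priority "${String(value)}"; treating as "normal"`);
    }
    return "normal";
  };
}

const messages: string[] = [];
const normalize = createPriorityNormalizer((message) => messages.push(message));

const typo = normalize("hgih"); // "normal", and one warning is recorded
const unknownTier = normalize("urgent"); // "normal", no second warning
const valid = normalize("low"); // "low"
```

Falling back to `normal` rather than `low` means a misspelled `high` loses its boost but is not starved behind every other item.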

## Limitations (current behavior)

- **Cancellation:** `dispose` does not abort in-flight `process()` calls (for example an active embedding request). Shutdown is cooperative.
- **High-priority flush on dispose:** By default, remaining high-priority items are flushed through a dedicated `BatchProcessor` (serial, one attempt per item), which keeps concurrency bounded and avoids a long retry tail on stop. Setting `disposeHighPriorityViaProcessor: false` restores the legacy direct loop.
- **Queue size:** Unbounded unless you configure `maxSize` + `onPressure` on `BatchQueueOptions`.

## Where to read code

- Module rationale (reviewer-oriented): [`batch-queue.ts` (core source)](https://github.com/elizaos/eliza/blob/main/packages/core/src/utils/batch-queue.ts)
- Composition and `BatchQueue` behavior: [`batch-queue/index.ts`](https://github.com/elizaos/eliza/blob/main/packages/core/src/utils/batch-queue/index.ts)
- Canonical markdown (stays in sync with this page): [`BATCH_QUEUE.md`](https://github.com/elizaos/eliza/blob/main/packages/core/docs/BATCH_QUEUE.md)
- Tests: `batch-queue.test.ts`, `task-drain.test.ts` under `packages/core/src/__tests__/`

## Related docs

- [Background tasks](/guides/background-tasks) — task workers, scheduling, and how batch-queue fits beside them
- [Core runtime](/runtime/core) — embedding generation and runtime APIs
- [Contribute to core](/guides/contribute-to-core) — monorepo layout and contribution flow
