---
title: "Pipeline Stage Communication"
description: "Patterns for connecting independent pipeline stages via message queues — decoupled producers and consumers with batch collection and backpressure."
kind: snippet
maturity: seedling
confidence: medium
origin: ai-drafted
tags: [architecture, patterns]
created: 2026-04-07
updated: 2026-04-10
related: [worker-pool-isolation, parallel-ai-research-pipelines, aimd-rate-limiting]
url: https://krowdev.com/snippet/pipeline-stage-communication/
---


Connect pipeline stages through message queues so each stage runs as an independent service. Stages don't call each other directly — they produce to and consume from queues. Combine with [worker pool isolation](/snippet/worker-pool-isolation/) per stage and [AIMD rate limiting](/note/aimd-rate-limiting/) on external calls for a resilient pipeline.

## The Shape

```
Stage A  →  [queue]  →  Stage B  →  [queue]  →  Stage C
producer     buffer     consumer/    buffer     consumer
                        producer
```

Each stage owns its own runtime, scaling, and failure domain. The queue is the contract between them. Stage A doesn't know or care whether Stage B is written in a different language, runs on different hardware, or processes items one at a time or in batches.

## Producer Side: Send and Move On

The producer pushes results to a channel or queue and immediately returns to its own work. No waiting for the consumer.

```rust
fn run(jobs: &Receiver<Input>, results: &Sender<Output>) {
    while let Ok(item) = jobs.recv() {
        match process(item) {
            Ok(output) => { results.send(output).ok(); }
            Err(e) => { handle_error(e); }
        }
    }
}
```

The `.ok()` on send is intentional — if the downstream queue is gone, this stage logs and continues rather than panicking.

## Consumer Side: Batch Collection

Some stages work more efficiently in batches. Collect items up to a batch size, with a timeout so partial batches don't stall forever.

```python
async def collect_batch(queue, batch_size: int = 50) -> list:
    items = []
    while len(items) < batch_size:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=5.0)
            items.append(item)
        except asyncio.TimeoutError:
            break  # flush partial batch
    return items
```

The timeout is critical. Without it, a batch that's 49/50 full waits indefinitely if the upstream slows down.

## Throughput Matching

Stages rarely have identical throughput. The queue absorbs bursts and smooths mismatches.

| Pattern | When to use |
|---------|------------|
| 1:1 queue | Stages have similar throughput |
| Fan-out (1:N) | Consumer is slower — parallelize it |
| Batching | Consumer has high per-call overhead, amortize it |
| Bounded queue + backpressure | Prevent memory growth when consumer falls behind |

If Stage B is 3x slower than Stage A, run 3 instances of Stage B consuming from the same queue. The queue is the load balancer.

## Key Details

**Bounded queues.** Unbounded queues hide backpressure until memory runs out. Set a hard cap and let the queue push back on producers when full.

**Per-stage monitoring.** Track queue depth between each pair of stages. Growing depth means the consumer can't keep up — scale it or investigate before the queue hits its limit.

**Graceful drain.** On shutdown, stop accepting new items, flush in-progress work, then close the output queue. Stages shut down in order from the head of the pipeline.

