Pipeline Stage Communication
Patterns for connecting independent pipeline stages via message queues — decoupled producers and consumers with batch collection and backpressure.
Connect pipeline stages through message queues so each stage runs as an independent service. Stages don’t call each other directly — they produce to and consume from queues. Combine with worker pool isolation per stage and AIMD rate limiting on external calls for a resilient pipeline.
The Shape
Stage A → [queue] → Stage B → [queue] → Stage Cproducer buffer consumer/ buffer consumer producerEach stage owns its own runtime, scaling, and failure domain. The queue is the contract between them. Stage A doesn’t know or care whether Stage B is written in a different language, runs on different hardware, or processes items one at a time or in batches.
Producer Side: Send and Move On
The producer pushes results to a channel or queue and immediately returns to its own work. No waiting for the consumer.
fn run(jobs: &Receiver<Input>, results: &Sender<Output>) { while let Ok(item) = jobs.recv() { match process(item) { Ok(output) => { results.send(output).ok(); } Err(e) => { handle_error(e); } } }}The .ok() on send is intentional — if the downstream queue is gone, this stage logs and continues rather than panicking.
Consumer Side: Batch Collection
Some stages work more efficiently in batches. Collect items up to a batch size, with a timeout so partial batches don’t stall forever.
async def collect_batch(queue, batch_size: int = 50) -> list: items = [] while len(items) < batch_size: try: item = await asyncio.wait_for(queue.get(), timeout=5.0) items.append(item) except asyncio.TimeoutError: break # flush partial batch return itemsThe timeout is critical. Without it, a batch that’s 49/50 full waits indefinitely if the upstream slows down.
Throughput Matching
Stages rarely have identical throughput. The queue absorbs bursts and smooths mismatches.
| Pattern | When to use |
|---|---|
| 1:1 queue | Stages have similar throughput |
| Fan-out (1:N) | Consumer is slower — parallelize it |
| Batching | Consumer has high per-call overhead, amortize it |
| Bounded queue + backpressure | Prevent memory growth when consumer falls behind |
If Stage B is 3x slower than Stage A, run 3 instances of Stage B consuming from the same queue. The queue is the load balancer.
Key Details
Bounded queues. Unbounded queues hide backpressure until memory runs out. Set a hard cap and let the queue push back on producers when full.
Per-stage monitoring. Track queue depth between each pair of stages. Growing depth means the consumer can’t keep up — scale it or investigate before the queue hits its limit.
Graceful drain. On shutdown, stop accepting new items, flush in-progress work, then close the output queue. Stages shut down in order from the head of the pipeline.