Kind: snippet
Maturity: seedling
Confidence: medium
Origin: ai-drafted
Tags: architecture, patterns
🤖 AI-drafted, human-reviewed.

# Pipeline Stage Communication

Patterns for connecting independent pipeline stages via message queues — decoupled producers and consumers with batch collection and backpressure.

AI-drafted · directed by / updated April 10, 2026

Connect pipeline stages through message queues so each stage runs as an independent service. Stages don’t call each other directly — they produce to and consume from queues. Combine with worker pool isolation per stage and AIMD rate limiting on external calls for a resilient pipeline.

## The Shape

```
Stage A   →   [queue]   →   Stage B    →   [queue]   →   Stage C
producer      buffer        consumer/      buffer        consumer
                            producer
```

Each stage owns its own runtime, scaling, and failure domain. The queue is the contract between them. Stage A doesn’t know or care whether Stage B is written in a different language, runs on different hardware, or processes items one at a time or in batches.

## Producer Side: Send and Move On

The producer pushes results to a channel or queue and immediately returns to its own work. No waiting for the consumer.

```rust
fn run(jobs: &Receiver<Input>, results: &Sender<Output>) {
    while let Ok(item) = jobs.recv() {
        match process(item) {
            Ok(output) => { results.send(output).ok(); }
            Err(e) => { handle_error(e); }
        }
    }
}
```

The `.ok()` on `send` is intentional — if the downstream receiver has gone away, this stage discards the send error and keeps running rather than panicking.

## Consumer Side: Batch Collection

Some stages work more efficiently in batches. Collect items up to a batch size, with a timeout so partial batches don’t stall forever.

```python
import asyncio

async def collect_batch(queue, batch_size: int = 50) -> list:
    items = []
    while len(items) < batch_size:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=5.0)
            items.append(item)
        except asyncio.TimeoutError:
            break  # flush partial batch
    return items
```

The timeout is critical. Without it, a batch that’s 49/50 full waits indefinitely if the upstream slows down.
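
As a usage sketch (the queue contents, sizes, and short timeout are illustrative; the helper is repeated so the example is self-contained): a full batch returns as soon as it fills, and a partial batch flushes when the timeout fires:

```python
import asyncio

async def collect_batch(queue, batch_size: int = 50, timeout: float = 0.1) -> list:
    items = []
    while len(items) < batch_size:
        try:
            items.append(await asyncio.wait_for(queue.get(), timeout=timeout))
        except asyncio.TimeoutError:
            break  # flush partial batch
    return items

async def demo():
    q = asyncio.Queue()
    for i in range(7):
        q.put_nowait(i)
    full = await collect_batch(q, batch_size=5)     # fills to batch_size
    partial = await collect_batch(q, batch_size=5)  # times out with 2 items
    return full, partial

full, partial = asyncio.run(demo())
# full == [0, 1, 2, 3, 4]; partial == [5, 6]
```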

## Throughput Matching

Stages rarely have identical throughput. The queue absorbs bursts and smooths mismatches.

| Pattern | When to use |
| --- | --- |
| 1:1 queue | Stages have similar throughput |
| Fan-out (1:N) | Consumer is slower — parallelize it |
| Batching | Consumer has high per-call overhead, amortize it |
| Bounded queue + backpressure | Prevent memory growth when consumer falls behind |

If Stage B is 3x slower than Stage A, run 3 instances of Stage B consuming from the same queue. The queue is the load balancer.
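
A minimal fan-out sketch with `asyncio` (task names, item counts, and the no-op "work" are illustrative): three consumers pull from the same queue, and one shutdown sentinel per consumer ends the run cleanly:

```python
import asyncio

async def producer(queue, n, consumers):
    for i in range(n):
        await queue.put(i)
    for _ in range(consumers):
        await queue.put(None)  # one sentinel per consumer

async def consumer(name, queue, results):
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0)  # stand-in for slow per-item work
        results.append((name, item))

async def main():
    queue = asyncio.Queue()
    results = []
    await asyncio.gather(
        producer(queue, 9, consumers=3),
        consumer("b1", queue, results),
        consumer("b2", queue, results),
        consumer("b3", queue, results),
    )
    return results

results = asyncio.run(main())
# all 9 items are processed exactly once, spread across the three consumers
```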

## Key Details

**Bounded queues.** Unbounded queues hide backpressure until memory runs out. Set a hard cap and let the queue push back on producers when full.
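
A sketch of that push-back using `asyncio.Queue(maxsize=...)` (the cap, counts, and delay are illustrative): `put` suspends the producer whenever the queue is full, so it can never run more than `maxsize` items ahead of the consumer:

```python
import asyncio

async def fast_producer(queue, n):
    for i in range(n):
        await queue.put(i)  # suspends while the queue is full: backpressure

async def slow_consumer(queue, n):
    out = []
    for _ in range(n):
        await asyncio.sleep(0.01)  # consumer is the bottleneck
        out.append(await queue.get())
    return out

async def main():
    queue = asyncio.Queue(maxsize=2)  # hard cap
    _, consumed = await asyncio.gather(
        fast_producer(queue, 10),
        slow_consumer(queue, 10),
    )
    return consumed

consumed = asyncio.run(main())
# consumed == list(range(10)); at most 2 items are ever buffered
```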

**Per-stage monitoring.** Track queue depth between each pair of stages. Growing depth means the consumer can’t keep up — scale it or investigate before the queue hits its limit.
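
A depth-sampling sketch (the threshold, tick count, and in-memory history are illustrative; a real pipeline would export `qsize()` readings to a metrics backend instead):

```python
import asyncio

async def watch_depth(queue, threshold, interval=0.02, ticks=3):
    """Periodically sample the depth of the queue between two stages."""
    history = []
    for _ in range(ticks):
        depth = queue.qsize()
        history.append((depth, depth > threshold))  # (depth, alert?)
        await asyncio.sleep(interval)
    return history

async def demo():
    q = asyncio.Queue()
    for i in range(4):
        q.put_nowait(i)  # a backlog: the consumer has fallen behind
    return await watch_depth(q, threshold=2)

history = asyncio.run(demo())
# every sample reads depth 4, above the threshold of 2
```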

**Graceful drain.** On shutdown, stop accepting new items, flush in-progress work, then close the output queue. Stages shut down in order from the head of the pipeline.
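
A sentinel-based drain sketch for a single stage (the `SHUTDOWN` object and the doubling "work" are illustrative): the stage flushes everything already queued, then propagates the sentinel downstream so the next stage drains in order:

```python
import asyncio

SHUTDOWN = object()  # sentinel: "no more items are coming"

async def stage(inbox, outbox, seen):
    while True:
        item = await inbox.get()
        if item is SHUTDOWN:
            await outbox.put(SHUTDOWN)  # propagate the drain downstream
            break
        seen.append(item)
        await outbox.put(item * 2)  # stand-in for real processing

async def main():
    a_to_b, b_to_c = asyncio.Queue(), asyncio.Queue()
    seen = []
    for i in range(3):
        a_to_b.put_nowait(i)
    a_to_b.put_nowait(SHUTDOWN)  # head of the pipeline stops first
    await stage(a_to_b, b_to_c, seen)
    out = []
    while not b_to_c.empty():
        out.append(b_to_c.get_nowait())
    return seen, out

seen, out = asyncio.run(main())
# seen == [0, 1, 2]; out ends with the propagated SHUTDOWN sentinel
```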