Introduction
Recently, I ran into a problem where data streamed in continuously from an external source while heavier GPU and CPU processing had to run on each piece. The processing could run in parallel, but the final results had to be streamed out in chronological order.
This is a surprisingly common shape of problem:
- Data arrives continuously and can't be paused
- Heavy computation needs to happen on each piece
- That work can fan out to multiple machines for parallelism
- But the final output must respect the original sequence
In my case, the data was a live stream of frames — think a continuous feed from an external source — and the "heavy work" involved running an analysis pass on each frame followed by an enrichment step that combined additional data on top. The catch? The output stream had to be chronological. Frame 42 must be streamed out before frame 43, even if frame 43 finishes processing first.
In this post, I'll walk through how I solved this with async Rust using a producer-consumer pattern built on top of mpsc channels. I'll keep things intuitive and show real code along the way.
The Mental Model: A Kitchen Analogy
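Before diving into code, picture a restaurant kitchen with three roles: a waiter who takes orders (the producer), cooks who prepare dishes in parallel (the workers), and an expediter who sends finished dishes out (the output streamer).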
The waiter can't stop taking orders just because the kitchen is busy. The cooks shouldn't be idle if there are orders waiting. And the expediter absolutely must send dish #5 before dish #6, even if #6 was ready first.
That's our architecture. Let's build it.
Step 1: Setting Up the Data Stream (The Producer)
First, let's model the incoming data stream. In reality, this might come from a sensor, a network socket, or a streaming API. We'll represent it as an async stream:
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
/// Each frame carries its sequence number and raw data payload.
struct Frame {
    seq: u64,
    data: Vec<u8>,
    timestamp: std::time::Instant,
}
/// Simulates an incoming data stream.
/// In practice, this could wrap a gRPC stream, a live feed, etc.
async fn data_stream() -> impl tokio_stream::Stream<Item = Frame> {
    let mut seq = 0u64;
    tokio_stream::iter(std::iter::from_fn(move || {
        seq += 1;
        Some(Frame {
            seq,
            data: vec![0u8; 1024 * 1024], // placeholder payload
            timestamp: std::time::Instant::now(),
        })
    }))
}
The key idea here is that the producer's only job is to ingest frames and hand them off. It shouldn't block waiting for processing. This is where the mpsc channel comes in — it decouples the speed of ingestion from the speed of processing.
Step 2: The MPSC Channel — Decoupling Producer from Consumer
mpsc stands for multi-producer, single-consumer. But don't let the name limit your thinking — we'll use it in a more flexible way. The channel gives us a Sender and a Receiver. The sender can be cloned (that's the "multi-producer" part), and the receiver pulls items off in order.
Here's the crucial insight: the channel is our buffer. It absorbs the mismatch between how fast frames arrive and how fast we can process them.
// Bounded channel: if processing falls behind by more than 30 frames,
// the producer will await (backpressure!). This prevents unbounded
// memory growth.
let (frame_tx, frame_rx) = mpsc::channel::<Frame>(30);
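Why bounded? Imagine the data stream pushes 60 fps but your pipeline handles 30 fps on average. Without a bound, the backlog grows by about 30 frames every second; with the 1 MiB placeholder frames above, that is roughly 30 MiB of extra memory per second. A bounded channel applies backpressure instead: once it is full, the producer's send waits until a worker frees a slot, so the producer slows down when the consumer can't keep up.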
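To make the "full channel pushes back" behaviour concrete, here is a minimal sketch. It uses the standard library's sync_channel as a stand-in for tokio's bounded channel so it runs without an async runtime, and the helper name accepted_before_full is my own, not part of any library.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical helper: tries to push `attempts` items into a bounded
/// channel of size `capacity` while nothing drains it, and returns how
/// many items the channel accepted before reporting that it was full.
fn accepted_before_full(capacity: usize, attempts: u32) -> usize {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The buffer is full: a blocking `send` would wait here.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a capacity of 3 and no consumer, only three items fit and the fourth try_send is rejected. tokio's Sender also offers try_send, which reports a full channel in the same way, while send(...).await waits for a free slot instead.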
Step 3: The Processing Pipeline (Fan-Out to Enrichment Servers)
Now for the interesting part. Processing each frame involves two heavy tasks:
1. Analysis pass — extract features, classify, score, etc.
2. Enrichment — generate additional data to merge onto the frame.
Enrichment can happen on multiple servers in parallel. We model this as a fan-out: one input frame can be dispatched to whichever enrichment server is available.
use tokio::sync::mpsc;
// `Annotation`, `run_analysis`, `compute_enrichment`, and `merge` are
// application-specific and not shown here.
/// Result of processing a single frame.
struct ProcessedFrame {
    seq: u64, // original sequence number, critical for ordering
    merged_data: Vec<u8>,
    metadata: FrameMetadata,
}
struct FrameMetadata {
    annotations: Vec<Annotation>,
    enrichment_server_id: usize,
    processing_time_ms: u64,
}
/// Spawns N enrichment workers that pull from a shared receiver
/// and push results into a results channel.
fn spawn_enrichment_workers(
    num_workers: usize,
    frame_rx: mpsc::Receiver<Frame>,
    result_tx: mpsc::Sender<ProcessedFrame>,
) {
    // We use a single receiver, but wrap it so workers
    // compete for frames (work-stealing style).
    let frame_rx = std::sync::Arc::new(tokio::sync::Mutex::new(frame_rx));
    for worker_id in 0..num_workers {
        let rx = frame_rx.clone();
        let tx = result_tx.clone();
        tokio::spawn(async move {
            loop {
                // Hold the lock only while waiting for the next frame;
                // it is released before the heavy processing starts.
                let frame = {
                    let mut guard = rx.lock().await;
                    guard.recv().await
                };
                let Some(frame) = frame else {
                    break; // Channel closed, producer is done
                };
                // --- Heavy processing happens here ---
                let processed = process_frame(frame, worker_id).await;
                if tx.send(processed).await.is_err() {
                    break; // Output stream hung up
                }
            }
        });
    }
}
async fn process_frame(frame: Frame, worker_id: usize) -> ProcessedFrame {
    let start = std::time::Instant::now();
    // Step 1: Run analysis pass (CPU/GPU bound)
    let annotations = run_analysis(&frame.data).await;
    // Step 2: Compute enrichment (could call out to an enrichment server)
    let enrichment = compute_enrichment(&annotations, worker_id).await;
    // Step 3: Merge
    let merged_data = merge(&frame.data, &enrichment);
    ProcessedFrame {
        seq: frame.seq,
        merged_data,
        metadata: FrameMetadata {
            annotations,
            enrichment_server_id: worker_id,
            processing_time_ms: start.elapsed().as_millis() as u64,
        },
    }
}
Notice how each ProcessedFrame retains its original seq number. This is the thread we'll use to stitch everything back together in order.
Step 4: Ordered Output — The Resequencing Buffer
This is the trickiest part. Frames fan out to workers, and faster workers finish first. Frame 10 might complete before frame 8. But we must stream them out as 8, 9, 10.
The solution is a resequencing buffer: a small data structure that holds completed frames and releases them in order.
use std::collections::BTreeMap;
/// Collects out-of-order results and yields them in sequence.
struct ResequenceBuffer {
    buffer: BTreeMap<u64, ProcessedFrame>,
    next_expected: u64,
}
impl ResequenceBuffer {
    fn new(start_seq: u64) -> Self {
        Self {
            buffer: BTreeMap::new(),
            next_expected: start_seq,
        }
    }
    /// Insert a processed frame. Returns all frames that are now
    /// ready to be streamed out (a contiguous run starting from next_expected).
    fn insert(&mut self, frame: ProcessedFrame) -> Vec<ProcessedFrame> {
        self.buffer.insert(frame.seq, frame);
        let mut ready = Vec::new();
        while let Some(entry) = self.buffer.remove(&self.next_expected) {
            ready.push(entry);
            self.next_expected += 1;
        }
        ready
    }
}
The beauty of BTreeMap here is that it keeps entries sorted by key, so checking "is the next expected frame here?" is an O(log n) operation. In practice, this buffer stays very small — it only grows when there's high variance in processing time across workers.
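A short trace helps show how the buffer holds frames back and then releases them in runs. The sketch below is a stripped-down, generic variant of the same idea; the names Resequencer and release_order are hypothetical and exist only for this illustration, and the payload is left empty so only sequence numbers matter.

```rust
use std::collections::BTreeMap;

/// Illustrative generic resequencer (not the article's exact type).
struct Resequencer<T> {
    buffer: BTreeMap<u64, T>,
    next_expected: u64,
}

impl<T> Resequencer<T> {
    fn new(start_seq: u64) -> Self {
        Self { buffer: BTreeMap::new(), next_expected: start_seq }
    }

    /// Stores `item` under `seq` and returns the contiguous run of
    /// items, starting at `next_expected`, that can now be released.
    fn insert(&mut self, seq: u64, item: T) -> Vec<(u64, T)> {
        self.buffer.insert(seq, item);
        let mut ready = Vec::new();
        while let Some(item) = self.buffer.remove(&self.next_expected) {
            ready.push((self.next_expected, item));
            self.next_expected += 1;
        }
        ready
    }
}

/// Feeds sequence numbers in arrival order (starting sequence is 1)
/// and records which sequence numbers each insert released.
fn release_order(arrivals: &[u64]) -> Vec<Vec<u64>> {
    let mut reseq = Resequencer::new(1);
    arrivals
        .iter()
        .map(|&seq| reseq.insert(seq, ()).into_iter().map(|(s, _)| s).collect())
        .collect()
}
```

For arrivals 3, 1, 2, 5, 4 the releases are: nothing, then 1, then 2 and 3 together, then nothing, then 4 and 5 together. Frame 3 waits in the buffer until 1 and 2 have gone out.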
Step 5: The Output Streamer (Chronological Consumer)
Now we wire the resequencing buffer into a consumer that reads from the results channel:
async fn run_output_stream(mut result_rx: mpsc::Receiver<ProcessedFrame>) {
    let mut resequencer = ResequenceBuffer::new(1); // frames start at seq=1
    while let Some(processed) = result_rx.recv().await {
        let ready_frames = resequencer.insert(processed);
        for frame in ready_frames {
            // These are guaranteed to be in chronological order
            stream_out_frame(&frame).await;
            stream_out_metadata(&frame.metadata).await;
            println!(
                "Streamed out frame {} (processed by worker {})",
                frame.seq, frame.metadata.enrichment_server_id
            );
        }
    }
    println!("Output stream complete.");
}
Putting It All Together
Here's the full pipeline, wired up in main:
#[tokio::main]
async fn main() {
    let num_enrichment_servers = 4;
    // Channel 1: Producer -> Workers
    let (frame_tx, frame_rx) = mpsc::channel::<Frame>(30);
    // Channel 2: Workers -> Output Streamer
    let (result_tx, result_rx) = mpsc::channel::<ProcessedFrame>(30);
    // Spawn enrichment workers (consumers of frames, producers of results)
    spawn_enrichment_workers(num_enrichment_servers, frame_rx, result_tx);
    // Spawn the output streamer
    let streamer_handle = tokio::spawn(run_output_stream(result_rx));
    // Run the producer: ingest frames and send them into the pipeline.
    // Note: the simulated data_stream() above never ends, so in this demo
    // the loop only exits if the pipeline closes. A real source would end.
    let mut stream = std::pin::pin!(data_stream().await);
    while let Some(frame) = stream.next().await {
        if frame_tx.send(frame).await.is_err() {
            eprintln!("Pipeline closed unexpectedly");
            break;
        }
    }
    // Signal that no more frames are coming
    drop(frame_tx);
    // Wait for the output streamer to finish draining
    streamer_handle.await.expect("Output streamer task panicked");
}
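If you want to try the overall shape without tokio or the application-specific helpers, here is a minimal end-to-end sketch under loud assumptions: it swaps async tasks for OS threads and tokio's channel for std's sync_channel, uses a bare u64 in place of a frame, and fakes the heavy work with a sleep. The function name run_pipeline is hypothetical.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Pushes frames 1..=frame_count through `num_workers` threads and
/// returns the sequence numbers in the order they were emitted.
/// Assumes num_workers >= 1.
fn run_pipeline(frame_count: u64, num_workers: usize) -> Vec<u64> {
    let (frame_tx, frame_rx) = sync_channel::<u64>(4);
    let (result_tx, result_rx) = sync_channel::<u64>(4);
    let frame_rx: Arc<Mutex<Receiver<u64>>> = Arc::new(Mutex::new(frame_rx));

    let mut workers = Vec::new();
    for _ in 0..num_workers {
        let rx = Arc::clone(&frame_rx);
        let tx = result_tx.clone();
        workers.push(thread::spawn(move || loop {
            // The lock guard is dropped at the end of this statement,
            // so the fake work below runs outside the lock.
            let next = rx.lock().unwrap().recv();
            let Ok(seq) = next else { break }; // producer is done
            // Uneven fake work so results come back out of order.
            let millis = if seq % 2 == 0 { 5 } else { 1 };
            thread::sleep(Duration::from_millis(millis));
            if tx.send(seq).is_err() {
                break; // output side hung up
            }
        }));
    }
    // Only the workers hold result senders from here on.
    drop(result_tx);

    let producer = thread::spawn(move || {
        for seq in 1..=frame_count {
            if frame_tx.send(seq).is_err() {
                break;
            }
        }
        // frame_tx is dropped here, which closes the frame channel.
    });

    // Resequence results as they arrive.
    let mut pending = BTreeMap::new();
    let mut next_expected = 1u64;
    let mut emitted = Vec::new();
    for seq in result_rx {
        pending.insert(seq, ());
        while pending.remove(&next_expected).is_some() {
            emitted.push(next_expected);
            next_expected += 1;
        }
    }

    producer.join().unwrap();
    for worker in workers {
        worker.join().unwrap();
    }
    emitted
}
```

Whatever order the workers finish in, the emitted sequence comes out as 1, 2, 3, and so on, because nothing is released until the next expected number has arrived.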
Lessons Learned
- Bounded channels are non-negotiable for streaming. Without backpressure, a fast producer will eat all your memory. The mpsc::channel(N) bound is your safety valve. Tuning N is a tradeoff: too small and you starve workers during processing spikes; too large and you buffer too many frames in memory.
- Carry the sequence number through the entire pipeline. This is cheap (it's a u64) and gives you total ordering at the end. Don't rely on arrival order at the results channel: parallel workers will finish in unpredictable order.
- The Arc<Mutex<Receiver>> pattern for work-stealing is simple but effective. Each worker holds the lock only until it has grabbed one frame, then releases it. The actual heavy processing happens outside the lock. For most workloads, this contention is negligible compared to the processing time per frame.
- drop(sender) is your shutdown signal. Once every Sender (including clones) has been dropped and the buffered items have been drained, recv() on the corresponding Receiver returns None, naturally shutting down the pipeline without any extra coordination logic.
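The shutdown behaviour in the last point is easy to check in isolation. This sketch uses std::sync::mpsc rather than tokio so it runs without a runtime; there recv() returns Err where tokio's recv().await returns None, but the closing rule is the same. The function name drain_after_close is hypothetical.

```rust
use std::sync::mpsc::channel;

/// Sends two items through two sender handles, drops both handles,
/// then drains the receiver. Returns the drained items and whether a
/// further recv() reports the channel as closed.
fn drain_after_close() -> (Vec<u32>, bool) {
    let (tx, rx) = channel::<u32>();
    let tx2 = tx.clone();
    tx.send(1).unwrap();
    tx2.send(2).unwrap();
    // The channel only closes once *every* sender handle is gone.
    drop(tx);
    drop(tx2);
    let mut drained = Vec::new();
    // Items already buffered are still delivered after the close.
    while let Ok(value) = rx.recv() {
        drained.push(value);
    }
    let closed = rx.recv().is_err();
    (drained, closed)
}
```

Both buffered items still arrive after the senders are gone, and only then does the receiver report the close. This is why the workers in the pipeline finish their backlog before exiting.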
What's Next?
There are a few directions this could go:
- Dynamic scaling: Spin up or shut down enrichment workers based on queue depth. mpsc::Sender::capacity() can tell you how much headroom remains.
- Error recovery: What happens when an enrichment server goes down? You could re-enqueue the frame or mark it as dropped and adjust the resequencer.
- Metrics & observability: Track per-worker throughput, resequence buffer depth, and end-to-end latency per frame.
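For the error-recovery direction, one possible way to "mark a frame as dropped" is to let the resequencer store an empty slot for that sequence number so later frames are not held back forever. This is only a sketch of that idea, not something I have run in production; SkippingResequencer, mark_dropped, and demo_skip are hypothetical names.

```rust
use std::collections::BTreeMap;

/// Resequencer variant where a slot can be recorded as dropped (None).
struct SkippingResequencer<T> {
    buffer: BTreeMap<u64, Option<T>>,
    next_expected: u64,
}

impl<T> SkippingResequencer<T> {
    fn new(start_seq: u64) -> Self {
        Self { buffer: BTreeMap::new(), next_expected: start_seq }
    }

    /// Records a successfully processed item.
    fn insert(&mut self, seq: u64, item: T) -> Vec<(u64, T)> {
        self.buffer.insert(seq, Some(item));
        self.release()
    }

    /// Records that `seq` will never arrive, so it must not block output.
    fn mark_dropped(&mut self, seq: u64) -> Vec<(u64, T)> {
        self.buffer.insert(seq, None);
        self.release()
    }

    /// Releases the contiguous run from `next_expected`, skipping
    /// over slots that were marked as dropped.
    fn release(&mut self) -> Vec<(u64, T)> {
        let mut ready = Vec::new();
        while let Some(slot) = self.buffer.remove(&self.next_expected) {
            if let Some(item) = slot {
                ready.push((self.next_expected, item));
            }
            self.next_expected += 1;
        }
        ready
    }
}

/// Frame 2 fails: frame 3 is held until 2 is marked as dropped.
fn demo_skip() -> Vec<u64> {
    let mut reseq = SkippingResequencer::new(1);
    let mut out = Vec::new();
    out.extend(reseq.insert(1, "a"));
    out.extend(reseq.insert(3, "c")); // held back, 2 is missing
    out.extend(reseq.mark_dropped(2)); // unblocks 3
    out.into_iter().map(|(seq, _)| seq).collect()
}
```

The output still respects the original order, with a gap where the dropped frame would have been. Whether a gap is acceptable, or the frame should be re-enqueued instead, depends on the consumer of the stream.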
The producer-consumer pattern with mpsc channels gives you a surprisingly clean foundation for all of this. Rust's ownership model ensures you can't accidentally share mutable state between workers, and async/await makes the control flow readable even when the concurrency is complex.
Happy streaming.