
Taming Data Streams with Async Rust: A Producer-Consumer Approach to Real-Time Processing

Hokwang Choi · March 2026 · 12 min read
TL;DR

Libraries:
· tokio — async runtime & MPSC channels
· tokio-stream — stream utilities
· futures — stream combinators

Key concepts:
· Producer-consumer via mpsc channels
· Async streams with backpressure
· Fan-out parallelism across workers
· Ordered output via resequencing buffer

Introduction

Recently, I ran into a problem where data streamed in from an external source while heavy processing had to run on the GPU and CPU. The processing could happen in parallel, but I was required to stream out the final results in chronological order.

This is a surprisingly common shape of problem:

In my case, the data was a live stream of frames — think a continuous feed from an external source — and the "heavy work" involved running an analysis pass on each frame followed by an enrichment step that combined additional data on top. The catch? The output stream had to be chronological. Frame 42 must be streamed out before frame 43, even if frame 43 finishes processing first.

In this post, I'll walk through how I solved this with async Rust using a producer-consumer pattern built on top of mpsc channels. I'll keep things intuitive and show real code along the way.

The Mental Model: A Kitchen Analogy

Before diving into code, picture a restaurant kitchen:

🍽️ The Waiter — takes orders as they come in: your data stream
👨‍🍳 The Cooks — work on multiple orders in parallel: your GPU/CPU tasks
📋 The Expediter — ensures dishes go out in order: your ordered output stream

The waiter can't stop taking orders just because the kitchen is busy. The cooks shouldn't be idle if there are orders waiting. And the expediter absolutely must send dish #5 before dish #6, even if #6 was ready first.

That's our architecture. Let's build it.

Step 1: Setting Up the Data Stream (The Producer)

First, let's model the incoming data stream. In reality, this might come from a sensor, a network socket, or a streaming API. We'll represent it as an async stream:

use tokio::sync::mpsc;
use tokio_stream::StreamExt;

/// Each frame carries its sequence number and raw data payload.
struct Frame {
    seq: u64,
    data: Vec<u8>,
    timestamp: std::time::Instant,
}

/// Simulates an incoming data stream.
/// In practice, this could wrap a gRPC stream, a live feed, etc.
async fn data_stream() -> impl tokio_stream::Stream<Item = Frame> {
    let mut seq = 0u64;
    tokio_stream::iter(std::iter::from_fn(move || {
        seq += 1;
        // Stop after 100 frames so the demo pipeline can drain and shut
        // down; a real feed would run until the source closes.
        (seq <= 100).then(|| Frame {
            seq,
            data: vec![0u8; 1024 * 1024], // placeholder payload
            timestamp: std::time::Instant::now(),
        })
    }))
}

The key idea here is that the producer's only job is to ingest frames and hand them off. It shouldn't block waiting for processing. This is where the mpsc channel comes in — it decouples the speed of ingestion from the speed of processing.

Step 2: The MPSC Channel — Decoupling Producer from Consumer

mpsc stands for multi-producer, single-consumer. But don't let the name limit your thinking — we'll use it in a more flexible way. The channel gives us a Sender and a Receiver. The sender can be cloned (that's the "multi-producer" part), and the receiver pulls items off in order.
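To see the "multi-producer" part in isolation, here's a minimal sketch using the standard library's synchronous `std::sync::mpsc` channel (the same sender-cloning contract as tokio's async version, without needing a runtime); the thread bodies and payload values are illustrative assumptions, not from the post's pipeline:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // One channel, two cloned senders feeding a single receiver.
    let (tx, rx) = mpsc::channel::<u64>();
    let tx2 = tx.clone();

    let a = thread::spawn(move || {
        for seq in [1u64, 3, 5] {
            tx.send(seq).unwrap();
        }
    });
    let b = thread::spawn(move || {
        for seq in [2u64, 4, 6] {
            tx2.send(seq).unwrap();
        }
    });
    a.join().unwrap();
    b.join().unwrap();

    // Both senders have been dropped, so the iterator terminates
    // once the channel is drained.
    let mut got: Vec<u64> = rx.iter().collect();
    got.sort();
    assert_eq!(got, vec![1, 2, 3, 4, 5, 6]);
    println!("received all {} items from two producers", got.len());
}
```

The single receiver sees an interleaving of both producers — which is exactly why we'll need a resequencing step later.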

Here's the crucial insight: the channel is our buffer. It absorbs the mismatch between how fast frames arrive and how fast we can process them.

// Bounded channel: if processing falls behind by more than 30 frames,
// the producer will await (backpressure!). This prevents unbounded
// memory growth.
let (frame_tx, frame_rx) = mpsc::channel::<Frame>(30);

Why bounded? Imagine the data stream pushes 60 fps but your pipeline handles 30 fps on average. Without a bound, your channel (and memory) grows without limit. A bounded channel naturally applies backpressure — the producer slows down when the consumer can't keep up.
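You can watch backpressure kick in with a tiny std-only sketch — `std::sync::mpsc::sync_channel(N)` is the blocking analogue of `tokio::sync::mpsc::channel(N)`, and the capacity of 2 here is just a toy value:

```rust
use std::sync::mpsc;

fn main() {
    // Bounded channel with capacity 2: the synchronous analogue of
    // tokio::sync::mpsc::channel(2).
    let (tx, rx) = mpsc::sync_channel::<u64>(2);

    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    // The channel is full. A blocking `send` would now park the
    // producer (backpressure); `try_send` reports Full instead.
    assert!(tx.try_send(3).is_err());

    // Draining one slot frees capacity again.
    assert_eq!(rx.recv().unwrap(), 1);
    assert!(tx.try_send(3).is_ok());
    println!("bounded channel applied backpressure as expected");
}
```

In the async version the producer doesn't block a thread — `send(...).await` simply yields until a slot frees up.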

[Diagram: backpressure mechanism — a producer ingesting at 60 fps feeds a bounded mpsc channel (capacity 30); workers drain at ~30 fps on average, and a full channel slows the producer.]

Step 3: The Processing Pipeline (Fan-Out to Enrichment Servers)

Now for the interesting part. Processing each frame involves two heavy tasks:

1. Analysis pass — extract features, classify, score, etc.
2. Enrichment — generate additional data to merge onto the frame.

Enrichment can happen on multiple servers in parallel. We model this as a fan-out: one input frame can be dispatched to whichever enrichment server is available.

use tokio::sync::mpsc;

/// Result of processing a single frame.
struct ProcessedFrame {
    seq: u64,                 // original sequence number — critical for ordering
    merged_data: Vec<u8>,
    metadata: FrameMetadata,
}

struct FrameMetadata {
    annotations: Vec<Annotation>,
    enrichment_server_id: usize,
    processing_time_ms: u64,
}

/// Spawns N enrichment workers that pull from a shared receiver
/// and push results into a results channel.
fn spawn_enrichment_workers(
    num_workers: usize,
    frame_rx: mpsc::Receiver<Frame>,
    result_tx: mpsc::Sender<ProcessedFrame>,
) {
    // Workers share a single receiver behind an async Mutex and
    // compete for frames (work-stealing style).
    let frame_rx = std::sync::Arc::new(tokio::sync::Mutex::new(frame_rx));

    for worker_id in 0..num_workers {
        let rx = frame_rx.clone();
        let tx = result_tx.clone();

        tokio::spawn(async move {
            loop {
                // Lock briefly just to grab the next frame
                let frame = {
                    let mut guard = rx.lock().await;
                    guard.recv().await
                };

                let Some(frame) = frame else {
                    break; // Channel closed, producer is done
                };

                // --- Heavy processing happens here ---
                let processed = process_frame(frame, worker_id).await;

                if tx.send(processed).await.is_err() {
                    break; // Output stream hung up
                }
            }
        });
    }
}

async fn process_frame(frame: Frame, worker_id: usize) -> ProcessedFrame {
    let start = std::time::Instant::now();

    // Step 1: Run analysis pass (CPU/GPU bound)
    let annotations = run_analysis(&frame.data).await;

    // Step 2: Compute enrichment (could call out to an enrichment server)
    let enrichment = compute_enrichment(&annotations, worker_id).await;

    // Step 3: Merge
    let merged_data = merge(&frame.data, &enrichment);

    ProcessedFrame {
        seq: frame.seq,
        merged_data,
        metadata: FrameMetadata {
            annotations,
            enrichment_server_id: worker_id,
            processing_time_ms: start.elapsed().as_millis() as u64,
        },
    }
}

Notice how each ProcessedFrame retains its original seq number. This is the thread we'll use to stitch everything back together in order.

Step 4: Ordered Output — The Resequencing Buffer

This is the trickiest part. Frames fan out to workers, and faster workers finish first. Frame 10 might complete before frame 8. But we must stream them out as 8, 9, 10.

The solution is a resequencing buffer: a small data structure that holds completed frames and releases them in order.

use std::collections::BTreeMap;

/// Collects out-of-order results and yields them in sequence.
struct ResequenceBuffer {
    buffer: BTreeMap<u64, ProcessedFrame>,
    next_expected: u64,
}

impl ResequenceBuffer {
    fn new(start_seq: u64) -> Self {
        Self {
            buffer: BTreeMap::new(),
            next_expected: start_seq,
        }
    }

    /// Insert a processed frame. Returns all frames that are now
    /// ready to be streamed out (a contiguous run starting from next_expected).
    fn insert(&mut self, frame: ProcessedFrame) -> Vec<ProcessedFrame> {
        self.buffer.insert(frame.seq, frame);

        let mut ready = Vec::new();
        while let Some(entry) = self.buffer.remove(&self.next_expected) {
            ready.push(entry);
            self.next_expected += 1;
        }
        ready
    }
}

BTreeMap keeps entries sorted by key, and removing the next expected frame is an O(log n) operation. (A HashMap keyed by seq would work too, since every lookup is by exact key; the sorted order is mainly handy if you ever need to inspect or evict the oldest pending frames.) In practice, this buffer stays very small — it only grows when there's high variance in processing time across workers.
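Here's the buffer in action on the out-of-order arrival pattern from the diagram below. This sketch stores bare u64 payloads instead of full ProcessedFrame values, so it stays self-contained; the insert logic is otherwise the same as the post's version:

```rust
use std::collections::BTreeMap;

// Simplified resequencer: the post's version stores ProcessedFrame
// values; bare u64 payloads keep this sketch self-contained.
struct ResequenceBuffer {
    buffer: BTreeMap<u64, u64>,
    next_expected: u64,
}

impl ResequenceBuffer {
    fn new(start_seq: u64) -> Self {
        Self { buffer: BTreeMap::new(), next_expected: start_seq }
    }

    /// Insert a completed seq; return the contiguous run now ready.
    fn insert(&mut self, seq: u64) -> Vec<u64> {
        self.buffer.insert(seq, seq);
        let mut ready = Vec::new();
        while let Some(entry) = self.buffer.remove(&self.next_expected) {
            ready.push(entry);
            self.next_expected += 1;
        }
        ready
    }
}

fn main() {
    let mut r = ResequenceBuffer::new(8);
    assert_eq!(r.insert(10), vec![]);         // held: 8 and 9 missing
    assert_eq!(r.insert(8), vec![8]);         // 8 released immediately
    assert_eq!(r.insert(11), vec![]);         // held: 9 still missing
    assert_eq!(r.insert(9), vec![9, 10, 11]); // gap filled, run released
    println!("resequencer restored order");
}
```

Note how inserting seq 9 releases the whole pent-up run in one call — that burstiness is normal and is why the output side iterates over everything `insert` returns.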

[Diagram: resequencing buffer, restoring order — frames arrive out of order (#10, #8, #11, #9), are held in the BTreeMap with next_expected = 8, and are streamed out in order (#8, #9, #10, #11).]

Step 5: The Output Streamer (Chronological Consumer)

Now we wire the resequencing buffer into a consumer that reads from the results channel:

async fn run_output_stream(mut result_rx: mpsc::Receiver<ProcessedFrame>) {
    let mut resequencer = ResequenceBuffer::new(1); // frames start at seq=1

    while let Some(processed) = result_rx.recv().await {
        let ready_frames = resequencer.insert(processed);

        for frame in ready_frames {
            // These are guaranteed to be in chronological order
            stream_out_frame(&frame).await;
            stream_out_metadata(&frame.metadata).await;

            println!(
                "Streamed out frame {} (processed by worker {})",
                frame.seq, frame.metadata.enrichment_server_id
            );
        }
    }

    println!("Output stream complete.");
}

Putting It All Together

Here's the full pipeline, wired up in main:

#[tokio::main]
async fn main() {
    let num_enrichment_servers = 4;

    // Channel 1: Producer -> Workers
    let (frame_tx, frame_rx) = mpsc::channel::<Frame>(30);

    // Channel 2: Workers -> Output Streamer
    let (result_tx, result_rx) = mpsc::channel::<ProcessedFrame>(30);

    // Spawn enrichment workers (consumers of frames, producers of results)
    spawn_enrichment_workers(num_enrichment_servers, frame_rx, result_tx);

    // Spawn the output streamer
    let streamer_handle = tokio::spawn(run_output_stream(result_rx));

    // Run the producer: ingest frames and send them into the pipeline
    let mut stream = std::pin::pin!(data_stream().await);
    while let Some(frame) = stream.next().await {
        if frame_tx.send(frame).await.is_err() {
            eprintln!("Pipeline closed unexpectedly");
            break;
        }
    }

    // Signal that no more frames are coming
    drop(frame_tx);

    // Wait for the output streamer to finish draining
    streamer_handle.await.expect("Output streamer task panicked");
}
[Diagram: full pipeline architecture — data stream → producer (ingest) → mpsc → enrichment workers 0–3 → mpsc → output streamer (resequence).]

Lessons Learned

  1. Bounded channels are non-negotiable for streaming. Without backpressure, a fast producer will eat all your memory. The mpsc::channel(N) bound is your safety valve. Tuning N is a tradeoff: too small and you starve workers during processing spikes; too large and you buffer too many frames in memory.
  2. Carry the sequence number through the entire pipeline. This is cheap (it's a u64) and gives you total ordering at the end. Don't rely on arrival order at the results channel — parallel workers will finish in unpredictable order.
  3. The Arc<Mutex<Receiver>> pattern for work-stealing is simple but effective. Each worker locks the receiver just long enough to grab one frame, then releases. The actual heavy processing happens outside the lock. For most workloads, this contention is negligible compared to the processing time per frame.
  4. drop(sender) is your shutdown signal. When the producer drops its Sender, all recv() calls on the corresponding Receiver will eventually return None, naturally shutting down the pipeline without any extra coordination logic.
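Point 4 is easy to demonstrate with a std-only sketch — the blocking `std::sync::mpsc` receiver has the same close-on-drop contract as tokio's async one (`recv()` fails once every Sender is gone); the frame counts here are illustrative:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<u64>();

    let worker = thread::spawn(move || {
        let mut count = 0u64;
        // recv() returns Err once every Sender has been dropped —
        // the same close-on-drop contract as tokio's recv() -> None.
        while let Ok(_frame) = rx.recv() {
            count += 1;
        }
        count // loop exits cleanly, no explicit shutdown flag needed
    });

    for seq in 0..5u64 {
        tx.send(seq).unwrap();
    }
    drop(tx); // the shutdown signal

    let processed = worker.join().unwrap();
    assert_eq!(processed, 5);
    println!("worker drained {} frames and shut down", processed);
}
```

The crucial detail: already-queued frames are still delivered after the drop, so the pipeline drains rather than aborts.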

What's Next?

There are a few directions this could go:

Dynamic scaling — Spin up or shut down enrichment workers based on queue depth. mpsc::Sender::capacity() can tell you how much headroom remains.

Error recovery — What happens when an enrichment server goes down? You could re-enqueue the frame or mark it as dropped and adjust the resequencer.

Metrics & observability — Track per-worker throughput, resequence buffer depth, and end-to-end latency per frame.

The producer-consumer pattern with mpsc channels gives you a surprisingly clean foundation for all of this. Rust's ownership model ensures you can't accidentally share mutable state between workers, and async/await makes the control flow readable even when the concurrency is complex.

Happy streaming.
