The Problem
This is a follow-up to my previous post on producer-consumer pipelines. If you've built one of these with tokio, you've probably run into this exact situation.
You spawn a consumer task with tokio::spawn. It pulls items from an mpsc channel, does some heavy processing. At some point, the consumer hits an error — maybe a network call fails, maybe it gets bad data, maybe something panics. The task dies.
Back on the producer side, you're happily sending items into the channel. Then send() fails. You get a SendError telling you the channel is closed. And that's it. That's all the information you have.
The real error — the one that actually caused the failure — is trapped inside the JoinHandle of the spawned task. Unless you explicitly .await it, you'll never see it. You're debugging blind.
Here's a minimal example of the problem:
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, rx) = tokio::sync::mpsc::channel::<String>(16);

    // Consumer — this will fail internally
    let worker = tokio::spawn(async move { consume(rx).await });

    // Producer — sends items until the channel breaks
    for i in 0..100 {
        let msg = format!("item-{i}");
        if let Err(e) = tx.send(msg).await {
            // All we know is: channel closed.
            // WHY did it close? No idea.
            return Err(anyhow::anyhow!("send failed: {e}"));
        }
    }

    // Drop the sender so the consumer's recv() returns None; without
    // this, worker.await would hang forever on the happy path.
    drop(tx);
    worker.await??;
    Ok(())
}
async fn consume(
    mut rx: tokio::sync::mpsc::Receiver<String>,
) -> anyhow::Result<()> {
    while let Some(item) = rx.recv().await {
        // Imagine this fails on the 5th item
        process(&item).await?;
    }
    Ok(())
}
When process() returns an error on item 5, the consumer task exits. The Receiver drops. The producer's next send() fails with a SendError wrapping the rejected item. Your logs say "send failed" — but the real cause ("connection refused on enrichment server" or whatever) is silently swallowed.
The Fix: Intercept the JoinHandle
The idea is straightforward. When send() fails, instead of just reporting the channel error, we .await the consumer's JoinHandle to extract the real cause. If the consumer returned an Err, we surface that. If it panicked, we re-panic. Otherwise we fall back to the generic channel-closed message.
Here's a utility function that does this:
use tokio::task::JoinHandle;

/// When a channel send fails, this function awaits the consumer's
/// JoinHandle to recover the actual error that caused the channel
/// to close. If the task panicked, the panic is propagated.
pub async fn resolve_task_error<T>(
    task: JoinHandle<anyhow::Result<T>>,
    send_err: impl std::fmt::Display,
) -> anyhow::Error {
    match task.await {
        // The task returned an Err — this is the real cause
        Ok(Err(task_err)) => {
            anyhow::anyhow!("Worker task failed: {task_err}")
        }
        // The task panicked — propagate the panic
        Err(join_err) if join_err.is_panic() => {
            std::panic::resume_unwind(join_err.into_panic())
        }
        // Task finished Ok or was cancelled — channel closed for
        // some other reason
        _ => {
            anyhow::anyhow!(
                "Channel closed unexpectedly (send error: {send_err})"
            )
        }
    }
}
Three branches, three cases:
- The task returned Err(e). This is the common case — the consumer hit a real error. We wrap it and return it. Now the producer knows what actually went wrong.
- The task panicked. We don't swallow panics — that would hide bugs. resume_unwind re-raises the panic so it behaves the same as if it happened in the current task.
- Anything else. The task completed Ok but the channel still broke, or it was cancelled. Unusual, but we still give a useful message with the original send error for context.
Making It Ergonomic with a Macro
Calling resolve_task_error every time you send is verbose. You need to drop the sender, await the handle, return the error — it's a lot of boilerplate. A macro cleans this up:
#[macro_export]
macro_rules! channel_send {
    ($sender:expr, $value:expr, $task_handle:expr) => {
        if let Err(send_err) = $sender.send($value).await {
            // Drop the sender so the consumer's recv() returns None
            // and it can shut down cleanly if it hasn't already.
            drop($sender);
            return Err(resolve_task_error($task_handle, send_err).await);
        }
    };
}
Now the producer side becomes much cleaner:
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, rx) = tokio::sync::mpsc::channel::<String>(16);
    let worker = tokio::spawn(async move { consume(rx).await });

    for i in 0..100 {
        let msg = format!("item-{i}");
        channel_send!(tx, msg, worker);
    }

    drop(tx);
    worker.await??;
    Ok(())
}
If the consumer dies on item 5, instead of getting "send failed: channel closed", you now get something like "Worker task failed: connection refused: enrichment-server-03:8443". You know exactly what happened and where to look.
The Subtle Bit: Why drop(sender) Matters
There's a detail in the macro that's easy to miss. Before we await the join handle, we drop($sender). Why?
If the consumer hasn't fully exited yet — say it's stuck in some cleanup code or a slow .await — it might still be blocking on recv(). As long as our sender is alive, recv() won't return None, so the consumer hangs. And then we'd deadlock: we're waiting for the consumer's handle to resolve, and the consumer is waiting for us to send or drop.
Dropping the sender first breaks the cycle. The consumer's recv() returns None, it exits, and the join handle resolves.
Multiple Consumers
In the previous post, we had multiple enrichment workers pulling from a shared receiver. The same pattern extends. You'd hold a Vec<JoinHandle> and check each one when a send fails:
/// Check multiple worker handles, returning the first real error found.
pub async fn resolve_first_task_error<T>(
    tasks: Vec<JoinHandle<anyhow::Result<T>>>,
    send_err: impl std::fmt::Display,
) -> anyhow::Error {
    for task in tasks {
        match task.await {
            Ok(Err(task_err)) => {
                return anyhow::anyhow!("Worker task failed: {task_err}");
            }
            Err(join_err) if join_err.is_panic() => {
                std::panic::resume_unwind(join_err.into_panic())
            }
            _ => continue,
        }
    }
    anyhow::anyhow!("All workers exited but channel closed: {send_err}")
}
This iterates through each handle. The first one that failed with a real error gets surfaced. If one panicked, we propagate it. If none of them explain the failure, we fall back to the generic message.
When This Pattern Shows Up
I've hit this in a few different shapes:
- Streaming pipelines — frame processor dies mid-stream, producer just sees the channel drop. This was the original motivation from the async streams work.
- Fan-out workers — one of N enrichment workers panics, the shared receiver drops, and the producer is left guessing which worker failed and why.
- Background flushers — you batch items and spawn a task to flush them somewhere (database, file, network). If the flusher dies, your main loop just sees a dead channel.
In all of these, the pattern is the same: the real error is on the consumer side, but you only observe the symptom on the producer side. Bridging that gap with a handle check turns a frustrating debugging session into a clear error message.
One More Thing: Panics
The resume_unwind call deserves a mention. When a tokio::spawn'd task panics, the panic is caught by the runtime and stored in the JoinError. If you just .await the handle, you get Err(JoinError) — which you might log and move on from. But the original panic payload is still there.
resume_unwind extracts that payload and re-panics. This means the panic behaves as if it happened in your current task — you get the same stack trace, the same abort behavior, the same panic hooks firing. You don't accidentally downgrade a panic into a logged error.
Rule of thumb: errors are for expected failures you can handle. Panics are for bugs. Don't convert one into the other — let errors propagate as errors and panics propagate as panics.
Takeaways
The "channel closed" problem is one of those things that's obvious in hindsight but wastes hours when you first encounter it. The fix is small — a function and a macro — but it changes the debugging experience from "something died somewhere" to "this specific thing failed for this specific reason."
If you're building producer-consumer pipelines with tokio, wire this in from the start.