Source code
Revision control
Copy as Markdown
Other Tools
// These tests only work on Unix platforms because they rely on Unix pipes
// as a way of generating I/O events from within the same process.
//
// Also, Miri doesn't like it when you leak a thread, which will happen in
// the "deadlock" case below, so we skip these tests on Miri.
#![cfg(all(not(miri), unix, feature = "full"))]
use std::sync::mpsc::RecvTimeoutError;
use std::time::Duration;
/// Sanity check for the reproducer itself: on a runtime built *without*
/// `enable_eager_driver_handoff`, `do_test` is expected to time out, i.e. the
/// runtime wedges when a task blocks its worker indefinitely.
///
/// A failure here means the scenario no longer deadlocks, and therefore that
/// `eager_driver_handoff_fixes_deadlock` is no longer exercising a condition
/// that can actually deadlock the runtime.
#[test]
fn deadlocks_consistently() {
    let runtime = rt_builder().build().unwrap();
    let outcome = do_test(runtime);
    assert_eq!(
        outcome,
        Err(RecvTimeoutError::Timeout),
        "runtime did not deadlock! the `eager_driver_handoff_fixes_deadlock` \
         test may no longer reproduce the bug it is intended to test a fix \
         for!",
    );
}
/// The test that actually checks eager driver handoff: it runs the same
/// reproducer as `deadlocks_consistently` a single time, but on a runtime
/// built with `enable_eager_driver_handoff` enabled, and expects the reproducer
/// to finish instead of timing out.
///
/// A failure here means the eager driver handoff fix is not working as
/// expected.
#[test]
#[cfg(tokio_unstable)]
fn eager_driver_handoff_fixes_deadlock() {
    let runtime = rt_builder().enable_eager_driver_handoff().build().unwrap();
    let outcome = do_test(runtime);
    assert_eq!(
        outcome,
        Ok(()),
        "the runtime should not deadlock because the driver is \
         eagerly handed off"
    );
}
/// Runs the deadlock reproducer on `rt` and reports whether it completed.
///
/// Returns `Ok(())` if the scenario ran to completion within ten seconds, and
/// `Err(RecvTimeoutError::Timeout)` if it did not (which the callers interpret
/// as "the runtime deadlocked").
///
/// The scenario reproduces a deadlock that occurs when the worker thread
/// holding the I/O or time driver runs a task that blocks that worker until
/// another task, which is itself *waiting on the I/O or time driver*, performs
/// the action that would unblock it.
///
/// TODO(review): the original comment pointed at an external description of
/// this general class of problem, but the reference is missing; restore it.
///
/// The deadlock unfolds as follows:
///
/// 1. Both worker threads are idle. Worker A parks on the I/O driver (holding
///    the driver lock), Worker B parks on a condvar.
/// 2. An I/O event fires. Worker A (the driver holder) wakes up and processes
///    the I/O event, placing the woken task in its run queue.
/// 3. Worker A begins polling the task. The task blocks the worker thread
///    until it is woken by the other task, so Worker A never returns to
///    poll the I/O driver.
/// 4. Worker B remains parked on the condvar indefinitely: nobody calls
///    `unpark` on it, so it never wakes to take over the I/O driver.
/// 5. Subsequent I/O events are never processed because no worker is polling
///    the I/O driver. The runtime is wedged.
///
/// To trigger this reliably, two tasks communicate over a pair of Unix pipes,
/// which lets the test generate I/O events from within the same process.
///
/// After a short sleep, the "bad" task's pipe is written to while the "good"
/// task is still waiting to read from its own pipe. Once woken, the "bad" task
/// writes to the other pipe (which should wake the "good" task) and then
/// blocks on a blocking channel that only the "good" task ever sends on.
///
/// Because the "bad" task first waits on a pipe read, it is first polled from
/// the worker thread that last parked on the I/O driver, so when it blocks it
/// prevents the driver from making progress.
///
/// The scenario is admittedly contrived: most Tokio applications do not have
/// tasks talking to each other over a blocking `std::sync::mpsc` channel.
/// Treat the blocking channel as a stand-in for any operation that blocks a
/// thread until some event occurs, where that event is triggered by another
/// task that must first wait on an asynchronous event — for example, a
/// blocking syscall that completes only once the other task does something.
fn do_test(rt: tokio::runtime::Runtime) -> Result<(), RecvTimeoutError> {
    use std::sync::mpsc;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::unix::pipe::pipe;

    // Completion signal back to the calling thread. It is a blocking std
    // channel so that the caller can wait with a timeout and thereby detect
    // a wedged runtime.
    let (finished_tx, finished_rx) = mpsc::channel();

    std::thread::spawn(move || {
        rt.block_on(async {
            let (mut wake_bad_tx, mut wake_bad_rx) = pipe().expect("ceci n'est pas une pipe");
            let (mut wake_good_tx, mut wake_good_rx) = pipe().expect("ceci n'est pas une pipe");

            // Deliberately a *blocking* std channel: its whole purpose is to
            // block a worker thread inside `recv`.
            let (unblock_tx, unblock_rx) = mpsc::channel();

            let blocking_task = tokio::spawn(async move {
                // Start by waiting on a pipe read, so that this task is woken
                // on the worker that is holding the I/O driver.
                let mut byte = [0u8; 1];
                wake_bad_rx.read_exact(&mut byte).await.unwrap();

                // We are now running on the worker thread that polled the
                // I/O driver. Wake the *other* task through its pipe, then
                // block this worker until that task signals us back.
                //
                // If the other task's I/O notification is delivered, it will
                // unblock us and the runtime keeps running. If, however,
                // blocking this thread stops the I/O driver from ever being
                // polled again, the runtime is wedged forever.
                wake_good_tx.write_all(&[2]).await.unwrap();
                unblock_rx.recv().unwrap();
            });

            let unblocking_task = tokio::spawn(async move {
                // Wait for the I/O event from the blocking task, then release
                // it from its blocking `recv`.
                let mut byte = [0u8; 1];
                wake_good_rx.read_exact(&mut byte).await.unwrap();
                unblock_tx.send(()).unwrap();
            });

            // Give both tasks time to start waiting on their pipes before
            // triggering the first I/O event.
            tokio::time::sleep(Duration::from_millis(100)).await;
            wake_bad_tx.write_all(&[1]).await.unwrap();

            unblocking_task.await.unwrap();
            blocking_task.await.unwrap();
        });

        // Only reached if the runtime did not deadlock.
        finished_tx.send(()).unwrap();
    });

    // If nothing arrives in time, the runtime is considered deadlocked.
    finished_rx.recv_timeout(Duration::from_secs(10))
}
/// Shared starting configuration for the tests in this file: a multi-threaded
/// runtime builder with two worker threads and its drivers switched on via
/// `enable_all`.
///
/// The `Builder` is handed back instead of a finished `Runtime` so that
/// `eager_driver_handoff_fixes_deadlock` can additionally turn on eager driver
/// handoff before building the runtime.
fn rt_builder() -> tokio::runtime::Builder {
    use tokio::runtime::Builder;

    let mut multi_thread = Builder::new_multi_thread();
    multi_thread.worker_threads(2);
    multi_thread.enable_all();
    multi_thread
}