Source code

Revision control

Copy as Markdown

Other Tools

#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support panic recovery
use std::sync::Arc;
use std::thread::sleep;
use tokio::time::Duration;
use tokio::runtime::Builder;
#[cfg(panic = "unwind")]
struct PanicOnDrop;
#[cfg(panic = "unwind")]
impl Drop for PanicOnDrop {
fn drop(&mut self) {
panic!("Well what did you expect would happen...");
}
}
/// Checks that a suspended task can be aborted without panicking as reported in
#[test]
fn test_abort_without_panic_3157() {
let rt = Builder::new_multi_thread()
.enable_time()
.worker_threads(1)
.build()
.unwrap();
rt.block_on(async move {
let handle = tokio::spawn(async move { tokio::time::sleep(Duration::new(100, 0)).await });
// wait for task to sleep.
tokio::time::sleep(Duration::from_millis(10)).await;
handle.abort();
let _ = handle.await;
});
}
/// Checks that a suspended task can be aborted inside of a current_thread
/// executor without panicking as reported in issue #3662:
#[test]
fn test_abort_without_panic_3662() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
struct DropCheck(Arc<AtomicBool>);
impl Drop for DropCheck {
fn drop(&mut self) {
self.0.store(true, Ordering::SeqCst);
}
}
let rt = Builder::new_current_thread().build().unwrap();
rt.block_on(async move {
let drop_flag = Arc::new(AtomicBool::new(false));
let drop_check = DropCheck(drop_flag.clone());
let j = tokio::spawn(async move {
// NB: just grab the drop check here so that it becomes part of the
// task.
let _drop_check = drop_check;
futures::future::pending::<()>().await;
});
let drop_flag2 = drop_flag.clone();
let task = std::thread::spawn(move || {
// This runs in a separate thread so it doesn't have immediate
// thread-local access to the executor. It does however transition
// the underlying task to be completed, which will cause it to be
// dropped (but not in this thread).
assert!(!drop_flag2.load(Ordering::SeqCst));
j.abort();
j
})
.join()
.unwrap();
let result = task.await;
assert!(drop_flag.load(Ordering::SeqCst));
assert!(result.unwrap_err().is_cancelled());
// Note: We do the following to trigger a deferred task cleanup.
//
// The relevant piece of code you want to look at is in:
// `Inner::block_on` of `scheduler/current_thread.rs`.
//
// We cause the cleanup to happen by having a poll return Pending once
// so that the scheduler can go into the "auxiliary tasks" mode, at
// which point the task is removed from the scheduler.
let i = tokio::spawn(async move {
tokio::task::yield_now().await;
});
i.await.unwrap();
});
}
/// Checks that a suspended LocalSet task can be aborted from a remote thread
/// without panicking and without running the tasks destructor on the wrong thread.
#[test]
fn remote_abort_local_set_3929() {
struct DropCheck {
created_on: std::thread::ThreadId,
not_send: std::marker::PhantomData<*const ()>,
}
impl DropCheck {
fn new() -> Self {
Self {
created_on: std::thread::current().id(),
not_send: std::marker::PhantomData,
}
}
}
impl Drop for DropCheck {
fn drop(&mut self) {
if std::thread::current().id() != self.created_on {
panic!("non-Send value dropped in another thread!");
}
}
}
let rt = Builder::new_current_thread().build().unwrap();
let local = tokio::task::LocalSet::new();
let check = DropCheck::new();
let jh = local.spawn_local(async move {
futures::future::pending::<()>().await;
drop(check);
});
let jh2 = std::thread::spawn(move || {
sleep(Duration::from_millis(10));
jh.abort();
});
rt.block_on(local);
jh2.join().unwrap();
}
/// Checks that a suspended task can be aborted even if the `JoinHandle` is immediately dropped.
#[test]
fn test_abort_wakes_task_3964() {
let rt = Builder::new_current_thread().enable_time().build().unwrap();
rt.block_on(async move {
let notify_dropped = Arc::new(());
let weak_notify_dropped = Arc::downgrade(&notify_dropped);
let handle = tokio::spawn(async move {
// Make sure the Arc is moved into the task
let _notify_dropped = notify_dropped;
tokio::time::sleep(Duration::new(100, 0)).await
});
// wait for task to sleep.
tokio::time::sleep(Duration::from_millis(10)).await;
handle.abort();
drop(handle);
// wait for task to abort.
tokio::time::sleep(Duration::from_millis(10)).await;
// Check that the Arc has been dropped.
assert!(weak_notify_dropped.upgrade().is_none());
});
}
/// Checks that aborting a task whose destructor panics does not allow the
/// panic to escape the task.
#[test]
#[cfg(panic = "unwind")]
fn test_abort_task_that_panics_on_drop_contained() {
let rt = Builder::new_current_thread().enable_time().build().unwrap();
rt.block_on(async move {
let handle = tokio::spawn(async move {
// Make sure the Arc is moved into the task
let _panic_dropped = PanicOnDrop;
tokio::time::sleep(Duration::new(100, 0)).await
});
// wait for task to sleep.
tokio::time::sleep(Duration::from_millis(10)).await;
handle.abort();
drop(handle);
// wait for task to abort.
tokio::time::sleep(Duration::from_millis(10)).await;
});
}
/// Checks that aborting a task whose destructor panics has the expected result.
#[test]
#[cfg(panic = "unwind")]
fn test_abort_task_that_panics_on_drop_returned() {
let rt = Builder::new_current_thread().enable_time().build().unwrap();
rt.block_on(async move {
let handle = tokio::spawn(async move {
// Make sure the Arc is moved into the task
let _panic_dropped = PanicOnDrop;
tokio::time::sleep(Duration::new(100, 0)).await
});
// wait for task to sleep.
tokio::time::sleep(Duration::from_millis(10)).await;
handle.abort();
assert!(handle.await.unwrap_err().is_panic());
});
}