Source code

Revision control

Copy as Markdown

Other Tools

use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::tests::NoopSchedule;
use std::collections::VecDeque;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
/// Observer half of an `AssertDrop` pair.
///
/// Shares the drop flag with its `AssertDrop` so a test can check whether
/// that value has been dropped yet.
struct AssertDropHandle {
    is_dropped: Arc<AtomicBool>,
}

impl AssertDropHandle {
    /// Panics unless the paired `AssertDrop` has already been dropped.
    ///
    /// `#[track_caller]` makes the panic location point at the calling test.
    #[track_caller]
    fn assert_dropped(&self) {
        assert!(
            self.is_dropped.load(Ordering::SeqCst),
            "expected the tracked value to have been dropped"
        );
    }

    /// Panics if the paired `AssertDrop` has already been dropped.
    ///
    /// `#[track_caller]` makes the panic location point at the calling test.
    #[track_caller]
    fn assert_not_dropped(&self) {
        assert!(
            !self.is_dropped.load(Ordering::SeqCst),
            "expected the tracked value to still be alive"
        );
    }
}

/// Value that records its own destruction in a flag shared with an
/// `AssertDropHandle`.
struct AssertDrop {
    is_dropped: Arc<AtomicBool>,
}

impl AssertDrop {
    /// Creates a tracked value together with the handle that observes it.
    ///
    /// The flag starts out `false` and is set by `AssertDrop`'s `Drop` impl.
    fn new() -> (Self, AssertDropHandle) {
        let shared = Arc::new(AtomicBool::new(false));
        (
            AssertDrop {
                is_dropped: Arc::clone(&shared),
            },
            // The last use moves `shared` instead of cloning it again.
            AssertDropHandle { is_dropped: shared },
        )
    }
}

impl Drop for AssertDrop {
    /// Marks the shared flag so the handle can observe the drop.
    fn drop(&mut self) {
        self.is_dropped.store(true, Ordering::SeqCst);
    }
}
// A Notified does not shut down on drop, but it is dropped once the ref-count
// hits zero.
#[test]
fn create_drop1() {
    let (guard, probe) = AssertDrop::new();

    // The future owns `guard`. This test never polls it, so `guard` can only
    // go away when the future itself is dropped.
    let future = async move {
        drop(guard);
        unreachable!()
    };
    let (task, join_handle) = unowned(future, NoopSchedule, Id::next());

    // Releasing the notification first must keep the future alive...
    drop(task);
    probe.assert_not_dropped();

    // ...and releasing the join handle afterwards must drop it.
    drop(join_handle);
    probe.assert_dropped();
}
#[test]
fn create_drop2() {
    let (guard, probe) = AssertDrop::new();

    // The future owns `guard`. This test never polls it, so `guard` can only
    // go away when the future itself is dropped.
    let future = async move {
        drop(guard);
        unreachable!()
    };
    let (task, join_handle) = unowned(future, NoopSchedule, Id::next());

    // Same as `create_drop1`, but the join handle is released first.
    drop(join_handle);
    probe.assert_not_dropped();

    // Releasing the notification last must drop the future.
    drop(task);
    probe.assert_dropped();
}
#[test]
fn drop_abort_handle1() {
    let (guard, probe) = AssertDrop::new();

    // The future owns `guard` and is never polled in this test.
    let future = async move {
        drop(guard);
        unreachable!()
    };
    let (task, join_handle) = unowned(future, NoopSchedule, Id::next());
    let abort_handle = join_handle.abort_handle();

    // Release order: join handle, notification, abort handle.
    // The future must survive until the abort handle is released.
    drop(join_handle);
    probe.assert_not_dropped();

    drop(task);
    probe.assert_not_dropped();

    drop(abort_handle);
    probe.assert_dropped();
}
#[test]
fn drop_abort_handle2() {
    let (guard, probe) = AssertDrop::new();

    // The future owns `guard` and is never polled in this test.
    let future = async move {
        drop(guard);
        unreachable!()
    };
    let (task, join_handle) = unowned(future, NoopSchedule, Id::next());
    let abort_handle = join_handle.abort_handle();

    // Release order: notification, abort handle, join handle.
    // The future must survive until the join handle is released.
    drop(task);
    probe.assert_not_dropped();

    drop(abort_handle);
    probe.assert_not_dropped();

    drop(join_handle);
    probe.assert_dropped();
}
#[test]
fn drop_abort_handle_clone() {
    let (guard, probe) = AssertDrop::new();

    // The future owns `guard` and is never polled in this test.
    let future = async move {
        drop(guard);
        unreachable!()
    };
    let (task, join_handle) = unowned(future, NoopSchedule, Id::next());
    let first_abort = join_handle.abort_handle();
    let second_abort = first_abort.clone();

    // Release order: join handle, notification, original abort handle, clone.
    // The future must survive until the cloned abort handle is released.
    drop(join_handle);
    probe.assert_not_dropped();

    drop(task);
    probe.assert_not_dropped();

    drop(first_abort);
    probe.assert_not_dropped();

    drop(second_abort);
    probe.assert_dropped();
}
// Shutting down through Notified works
#[test]
fn create_shutdown1() {
    let (guard, probe) = AssertDrop::new();

    // The future owns `guard` and is never polled in this test.
    let future = async move {
        drop(guard);
        unreachable!()
    };
    let (task, join_handle) = unowned(future, NoopSchedule, Id::next());

    // With the join handle already released, the future is still alive...
    drop(join_handle);
    probe.assert_not_dropped();

    // ...until the task is shut down through its notification.
    task.shutdown();
    probe.assert_dropped();
}
#[test]
fn create_shutdown2() {
    let (guard, probe) = AssertDrop::new();

    // The future owns `guard` and is never polled in this test.
    let future = async move {
        drop(guard);
        unreachable!()
    };
    let (task, join_handle) = unowned(future, NoopSchedule, Id::next());
    probe.assert_not_dropped();

    // Shutting down must drop the future while the join handle is still held.
    task.shutdown();
    probe.assert_dropped();

    drop(join_handle);
}
#[test]
fn unowned_poll() {
    let (task, join_handle) = unowned(async {}, NoopSchedule, Id::next());

    // The join handle is released before the task runs.
    drop(join_handle);

    task.run();
}
#[test]
fn schedule() {
    with(|runtime| {
        // A task that yields exactly once before completing.
        let yield_once = async {
            crate::task::yield_now().await;
        };
        runtime.spawn(yield_once);

        // The test expects the run queue to drain after exactly two runs.
        let ticks = runtime.tick();
        assert_eq!(2, ticks);

        runtime.shutdown();
    });
}
#[test]
fn shutdown() {
    with(|runtime| {
        // A task that never completes: it yields forever.
        let yield_forever = async {
            loop {
                crate::task::yield_now().await;
            }
        };
        runtime.spawn(yield_forever);

        // Run at most one queued task, then shut down with the task unfinished.
        runtime.tick_max(1);
        runtime.shutdown();
    });
}
#[test]
fn shutdown_immediately() {
    with(|runtime| {
        // A task that never completes: it yields forever.
        let yield_forever = async {
            loop {
                crate::task::yield_now().await;
            }
        };
        runtime.spawn(yield_forever);

        // Shut down without ever running the task.
        runtime.shutdown();
    });
}
#[test]
fn spawn_during_shutdown() {
    // Set once `SpawnOnDrop::drop` has run.
    static DID_SPAWN: AtomicBool = AtomicBool::new(false);

    /// Spawns a new task onto the wrapped runtime when dropped.
    struct SpawnOnDrop(Runtime);

    impl Drop for SpawnOnDrop {
        fn drop(&mut self) {
            DID_SPAWN.store(true, Ordering::SeqCst);
            let runtime = &self.0;
            runtime.spawn(async {});
        }
    }

    with(|runtime| {
        let runtime_for_task = runtime.clone();

        // The task holds a `SpawnOnDrop` across an endless yield loop, so the
        // value can only be dropped when the task's future is dropped.
        let never_finishes = async move {
            let _spawn_on_drop = SpawnOnDrop(runtime_for_task);
            loop {
                crate::task::yield_now().await;
            }
        };
        runtime.spawn(never_finishes);

        // Run the task once so `SpawnOnDrop` is constructed, then shut down.
        runtime.tick_max(1);
        runtime.shutdown();
    });

    assert!(DID_SPAWN.load(Ordering::SeqCst));
}
/// Builds a fresh mock `Runtime`, publishes a clone of it in `CURRENT`, and
/// runs `f` with it.
///
/// `CURRENT` is cleared again when this function exits, including when `f`
/// panics, because the `Reset` guard runs on unwind as well.
///
/// `CURRENT` is a single process-wide mutex shared by every test that calls
/// `with`, and the test harness runs tests on multiple threads by default.
/// The lock is therefore taken with a blocking `lock()`: `try_lock()` would
/// return `WouldBlock` and panic whenever two tests touch it at the same
/// instant.
fn with(f: impl FnOnce(Runtime)) {
    struct Reset;

    impl Drop for Reset {
        fn drop(&mut self) {
            // This can run while the test is already unwinding, so it must not
            // panic; a poisoned lock is recovered instead of unwrapped. The
            // taken runtime is bound to `_rt` so it is dropped only after the
            // guard has been released.
            let _rt = CURRENT.lock().unwrap_or_else(|e| e.into_inner()).take();
        }
    }

    let _reset = Reset;

    let rt = Runtime(Arc::new(Inner {
        owned: OwnedTasks::new(16),
        core: Mutex::new(Core {
            queue: VecDeque::new(),
        }),
    }));

    // The stored value is a plain `Option`, which cannot be left half-updated,
    // so recovering from poisoning is sound here too.
    *CURRENT.lock().unwrap_or_else(|e| e.into_inner()) = Some(rt.clone());
    f(rt)
}
/// Handle to the mock scheduler used by the tests in this file.
///
/// Cloning copies only the `Arc`, so all clones share the same `Inner`.
#[derive(Clone)]
struct Runtime(Arc<Inner>);
/// State shared by every clone of a `Runtime`.
struct Inner {
/// Run-queue state. The methods in this file only ever take this lock
/// with `try_lock().unwrap()`.
core: Mutex<Core>,
/// Task collection that `Runtime::spawn` binds new futures to and that
/// `Runtime::shutdown` closes.
owned: OwnedTasks<Runtime>,
}
/// Mutable scheduler state guarded by `Inner::core`.
struct Core {
/// Pending task notifications: `schedule` pushes at the back,
/// `next_task` pops from the front, and `shutdown` drains from the back.
queue: VecDeque<task::Notified<Runtime>>,
}
// Process-wide slot that `with` fills with the runtime it creates and clears
// again on exit. Nothing else in the visible part of this file reads it.
static CURRENT: Mutex<Option<Runtime>> = Mutex::new(None);
impl Runtime {
    /// Binds `future` to this runtime's task collection and returns the join
    /// handle produced by `bind`.
    ///
    /// If `bind` also hands back a notification, it is pushed onto the run
    /// queue.
    fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        let (join_handle, maybe_notified) = self.0.owned.bind(future, self.clone(), Id::next());

        if let Some(first_run) = maybe_notified {
            self.schedule(first_run);
        }

        join_handle
    }

    /// Runs queued tasks until the queue is empty and returns how many ran.
    fn tick(&self) -> usize {
        self.tick_max(usize::MAX)
    }

    /// Runs queued tasks until the queue is empty or `max` tasks have run.
    ///
    /// Returns the number of tasks that were run.
    fn tick_max(&self, max: usize) -> usize {
        let mut ran = 0;

        loop {
            if self.is_empty() || ran >= max {
                break;
            }

            let notified = self.next_task();
            ran += 1;
            self.0.owned.assert_owner(notified).run();
        }

        ran
    }

    /// Reports whether the run queue is currently empty.
    ///
    /// Panics if the core lock cannot be taken immediately.
    fn is_empty(&self) -> bool {
        let core = self.0.core.try_lock().unwrap();
        core.queue.is_empty()
    }

    /// Removes and returns the task at the front of the run queue.
    ///
    /// Panics if the queue is empty or the core lock cannot be taken
    /// immediately.
    fn next_task(&self) -> task::Notified<Runtime> {
        let mut core = self.0.core.try_lock().unwrap();
        core.queue.pop_front().unwrap()
    }

    /// Closes the task collection, shuts down its tasks, and discards every
    /// notification still queued, then asserts the collection is empty.
    fn shutdown(&self) {
        {
            // The core lock is taken before the task collection is closed and
            // stays held until the queue has been drained.
            let mut core = self.0.core.try_lock().unwrap();

            self.0.owned.close_and_shutdown_all(0);

            // Discard leftover notifications, newest first.
            while let Some(pending) = core.queue.pop_back() {
                drop(pending);
            }
        }

        assert!(self.0.owned.is_empty());
    }
}
impl Schedule for Runtime {
    /// Removes `task` from this runtime's task collection and returns
    /// whatever `remove` yields.
    fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
        let owned = &self.0.owned;
        owned.remove(task)
    }

    /// Appends `task` to the back of the run queue.
    ///
    /// Panics if the core lock cannot be taken immediately.
    fn schedule(&self, task: task::Notified<Self>) {
        let mut core = self.0.core.try_lock().unwrap();
        core.queue.push_back(task);
    }
}