Source code
Revision control
Copy as Markdown
Other Tools
//! Uring file operations tests.
#![cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
use futures::future::FutureExt;
use std::fs;
use std::future::poll_fn;
use std::task::Poll;
use tempfile::NamedTempFile;
#[tokio::test]
async fn file_descriptors_are_closed_when_cancelling_open_op() {
let tmp = NamedTempFile::new().unwrap();
let path = tmp.path().to_path_buf();
let fd_count_before_opens = fs::read_dir("/proc/self/fd").unwrap().count();
for _ in 0..128 {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let path = path.clone();
let handle = tokio::spawn(async move {
poll_fn(|cx| {
let opt = {
let mut opt = tokio::fs::OpenOptions::new();
opt.read(true);
opt
};
let fut = opt.open(&path);
// If io_uring is enabled (and not falling back to the thread pool),
// the first poll should return Pending. We don't check if the result
// is actually pending because we run some checks on old kernel that
// do not support uring.
let _pending = Box::pin(fut).poll_unpin(cx);
tx.send(()).unwrap();
Poll::<()>::Pending
})
.await;
});
// Wait for the first poll
rx.recv().await.unwrap();
handle.abort();
let res = handle.await.unwrap_err();
assert!(res.is_cancelled());
}
let fd_count_after_cancel = fs::read_dir("/proc/self/fd").unwrap().count();
let leaked = fd_count_after_cancel.saturating_sub(fd_count_before_opens);
// Since we are opening 128 files, we expect that the related fds
// related to this operation will be closed. Since some other fds
// can be opened in the meantime, we expect this number to be higher
// than the counter before opening the files. This number could be
// lower, but to avoid test flakiness we check that this is at most
// half the number of the file we opened to check if there's a leak.
assert!(leaked <= 64);
}