Source code

Revision control

Copy as Markdown

Other Tools

//! Unix pipe types.
use crate::io::interest::Interest;
use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready};
use mio::unix::pipe as mio_pipe;
use std::fs::File;
use std::io::{self, Read, Write};
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
cfg_io_util! {
use bytes::BufMut;
}
/// Creates a new anonymous Unix pipe.
///
/// This function will open a new pipe and associate both pipe ends with the default
/// event loop.
///
/// If you need to create a pipe for communication with a spawned process, you can
/// use [`Stdio::piped()`] instead.
///
/// [`Stdio::piped()`]: std::process::Stdio::piped
///
/// # Errors
///
/// If creating a pipe fails, this function will return with the related OS error.
///
/// # Examples
///
/// Create a pipe and pass the writing end to a spawned process.
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use tokio::process::Command;
/// # use tokio::io::AsyncReadExt;
/// # use std::error::Error;
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// let (tx, mut rx) = pipe::pipe()?;
/// let mut buffer = String::new();
///
/// let status = Command::new("echo")
/// .arg("Hello, world!")
/// .stdout(tx.into_blocking_fd()?)
/// .status();
/// rx.read_to_string(&mut buffer).await?;
///
/// assert!(status.await?.success());
/// assert_eq!(buffer, "Hello, world!\n");
/// # Ok(())
/// # }
/// ```
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn pipe() -> io::Result<(Sender, Receiver)> {
let (tx, rx) = mio_pipe::new()?;
Ok((Sender::from_mio(tx)?, Receiver::from_mio(rx)?))
}
/// Options and flags which can be used to configure how a FIFO file is opened.
///
/// This builder allows configuring how to create a pipe end from a FIFO file.
/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
/// then chain calls to methods to set each option, then call either
/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
/// are trying to open. This will give you a [`io::Result`] with a pipe end
/// inside that you can further operate on.
///
/// [`new`]: OpenOptions::new
/// [`open_receiver`]: OpenOptions::open_receiver
/// [`open_sender`]: OpenOptions::open_sender
///
/// # Examples
///
/// Opening a pair of pipe ends from a FIFO file:
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// # use std::error::Error;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?;
/// # Ok(())
/// # }
/// ```
///
/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
///
/// ```ignore
/// use tokio::net::unix::pipe;
/// use nix::{unistd::mkfifo, sys::stat::Mode};
/// # use std::error::Error;
///
/// // Our program has exclusive access to this path.
/// const FIFO_NAME: &str = "path/to/a/new/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
/// let tx = pipe::OpenOptions::new()
/// .read_write(true)
/// .unchecked(true)
/// .open_sender(FIFO_NAME)?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct OpenOptions {
#[cfg(target_os = "linux")]
read_write: bool,
unchecked: bool,
}
impl OpenOptions {
/// Creates a blank new set of options ready for configuration.
///
/// All options are initially set to `false`.
pub fn new() -> OpenOptions {
OpenOptions {
#[cfg(target_os = "linux")]
read_write: false,
unchecked: false,
}
}
/// Sets the option for read-write access.
///
/// This option, when true, will indicate that a FIFO file will be opened
/// in read-write access mode. This operation is not defined by the POSIX
/// standard and is only guaranteed to work on Linux.
///
/// # Examples
///
/// Opening a [`Sender`] even if there are no open reading ends:
///
/// ```ignore
/// use tokio::net::unix::pipe;
///
/// let tx = pipe::OpenOptions::new()
/// .read_write(true)
/// .open_sender("path/to/a/fifo");
/// ```
///
/// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
/// fail with [`UnexpectedEof`] during reading if all writing ends of the
/// pipe close the FIFO file.
///
/// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
///
/// ```ignore
/// use tokio::net::unix::pipe;
///
/// let tx = pipe::OpenOptions::new()
/// .read_write(true)
/// .open_receiver("path/to/a/fifo");
/// ```
#[cfg(target_os = "linux")]
#[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
pub fn read_write(&mut self, value: bool) -> &mut Self {
self.read_write = value;
self
}
/// Sets the option to skip the check for FIFO file type.
///
/// By default, [`open_receiver`] and [`open_sender`] functions will check
/// if the opened file is a FIFO file. Set this option to `true` if you are
/// sure the file is a FIFO file.
///
/// [`open_receiver`]: OpenOptions::open_receiver
/// [`open_sender`]: OpenOptions::open_sender
///
/// # Examples
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use nix::{unistd::mkfifo, sys::stat::Mode};
/// # use std::error::Error;
///
/// // Our program has exclusive access to this path.
/// const FIFO_NAME: &str = "path/to/a/new/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
/// let rx = pipe::OpenOptions::new()
/// .unchecked(true)
/// .open_receiver(FIFO_NAME)?;
/// # Ok(())
/// # }
/// ```
pub fn unchecked(&mut self, value: bool) -> &mut Self {
self.unchecked = value;
self
}
/// Creates a [`Receiver`] from a FIFO file with the options specified by `self`.
///
/// This function will open the FIFO file at the specified path, possibly
/// check if it is a pipe, and associate the pipe with the default event
/// loop for reading.
///
/// # Errors
///
/// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
/// This function may also fail with other standard OS errors.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
let file = self.open(path.as_ref(), PipeEnd::Receiver)?;
Receiver::from_file_unchecked(file)
}
/// Creates a [`Sender`] from a FIFO file with the options specified by `self`.
///
/// This function will open the FIFO file at the specified path, possibly
/// check if it is a pipe, and associate the pipe with the default event
/// loop for writing.
///
/// # Errors
///
/// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
/// If the file is not opened in read-write access mode and the file is not
/// currently open for reading, this function will fail with `ENXIO`.
/// This function may also fail with other standard OS errors.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
let file = self.open(path.as_ref(), PipeEnd::Sender)?;
Sender::from_file_unchecked(file)
}
fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
let mut options = std::fs::OpenOptions::new();
options
.read(pipe_end == PipeEnd::Receiver)
.write(pipe_end == PipeEnd::Sender)
.custom_flags(libc::O_NONBLOCK);
#[cfg(target_os = "linux")]
if self.read_write {
options.read(true).write(true);
}
let file = options.open(path)?;
if !self.unchecked && !is_pipe(file.as_fd())? {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
}
Ok(file)
}
}
impl Default for OpenOptions {
fn default() -> OpenOptions {
OpenOptions::new()
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PipeEnd {
Sender,
Receiver,
}
/// Writing end of a Unix pipe.
///
/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
///
/// Opening a named pipe for writing involves a few steps.
/// Call to [`OpenOptions::open_sender`] might fail with an error indicating
/// different things:
///
/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
/// Sleep for a while and try again.
/// * Other OS errors not specific to opening FIFO files.
///
/// Opening a `Sender` from a FIFO file should look like this:
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use tokio::time::{self, Duration};
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// // Wait for a reader to open the file.
/// let tx = loop {
/// match pipe::OpenOptions::new().open_sender(FIFO_NAME) {
/// Ok(tx) => break tx,
/// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {},
/// Err(e) => return Err(e.into()),
/// }
///
/// time::sleep(Duration::from_millis(50)).await;
/// };
/// # Ok(())
/// # }
/// ```
///
/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
/// loop. This is done by opening a named pipe in read-write access mode with
/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
/// both a writing end and a reading end, and the latter allows to open a FIFO
/// without [`ENXIO`] error since the pipe is open for reading as well.
///
/// `Sender` cannot be used to read from a pipe, so in practice the read access
/// is only used when a FIFO is opened. However, using a `Sender` in read-write
/// mode **may lead to lost data**, because written data will be dropped by the
/// system as soon as all pipe ends are closed. To avoid lost data you have to
/// make sure that a reading end has been opened before dropping a `Sender`.
///
/// Note that using read-write access mode with FIFO files is not defined by
/// the POSIX standard and it is only guaranteed to work on Linux.
///
/// ```ignore
/// use tokio::io::AsyncWriteExt;
/// use tokio::net::unix::pipe;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let mut tx = pipe::OpenOptions::new()
/// .read_write(true)
/// .open_sender(FIFO_NAME)?;
///
/// // Asynchronously write to the pipe before a reader.
/// tx.write_all(b"hello world").await?;
/// # Ok(())
/// # }
/// ```
///
#[derive(Debug)]
pub struct Sender {
io: PollEvented<mio_pipe::Sender>,
}
impl Sender {
fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> {
let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?;
Ok(Sender { io })
}
/// Creates a new `Sender` from a [`File`].
///
/// This function is intended to construct a pipe from a [`File`] representing
/// a special FIFO file. It will check if the file is a pipe and has write access,
/// set it in non-blocking mode and perform the conversion.
///
/// # Errors
///
/// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
/// does not have write access. Also fails with any standard OS error if it occurs.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_file(file: File) -> io::Result<Sender> {
Sender::from_owned_fd(file.into())
}
/// Creates a new `Sender` from an [`OwnedFd`].
///
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
/// an anonymous pipe or a special FIFO file. It will check if the file descriptor
/// is a pipe and has write access, set it in non-blocking mode and perform the
/// conversion.
///
/// # Errors
///
/// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
/// or it does not have write access. Also fails with any standard OS error if it
/// occurs.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Sender> {
if !is_pipe(owned_fd.as_fd())? {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
}
let flags = get_file_flags(owned_fd.as_fd())?;
if has_write_access(flags) {
set_nonblocking(owned_fd.as_fd(), flags)?;
Sender::from_owned_fd_unchecked(owned_fd)
} else {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"not in O_WRONLY or O_RDWR access mode",
))
}
}
/// Creates a new `Sender` from a [`File`] without checking pipe properties.
///
/// This function is intended to construct a pipe from a File representing
/// a special FIFO file. The conversion assumes nothing about the underlying
/// file; it is left up to the user to make sure it is opened with write access,
/// represents a pipe and is set in non-blocking mode.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use std::fs::OpenOptions;
/// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
/// # use std::error::Error;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// let file = OpenOptions::new()
/// .write(true)
/// .custom_flags(libc::O_NONBLOCK)
/// .open(FIFO_NAME)?;
/// if file.metadata()?.file_type().is_fifo() {
/// let tx = pipe::Sender::from_file_unchecked(file)?;
/// /* use the Sender */
/// }
/// # Ok(())
/// # }
/// ```
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_file_unchecked(file: File) -> io::Result<Sender> {
Sender::from_owned_fd_unchecked(file.into())
}
/// Creates a new `Sender` from an [`OwnedFd`] without checking pipe properties.
///
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
/// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
/// the underlying pipe; it is left up to the user to make sure that the file
/// descriptor represents the writing end of a pipe and the pipe is set in
/// non-blocking mode.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Sender> {
// Safety: OwnedFd represents a valid, open file descriptor.
let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(owned_fd.into_raw_fd()) };
Sender::from_mio(mio_tx)
}
/// Waits for any of the requested ready states.
///
/// This function can be used instead of [`writable()`] to check the returned
/// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events.
///
/// The function may complete without the pipe being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// [`writable()`]: Self::writable
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
/// will continue to return immediately until the readiness event is
/// consumed by an attempt to write that fails with `WouldBlock` or
/// `Poll::Pending`.
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
let event = self.io.registration().readiness(interest).await?;
Ok(event.ready)
}
/// Waits for the pipe to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
/// paired with [`try_write()`].
///
/// [`try_write()`]: Self::try_write
///
/// # Examples
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Open a writing end of a fifo
/// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
///
/// loop {
/// // Wait for the pipe to be writable
/// tx.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match tx.try_write(b"hello world") {
/// Ok(n) => {
/// break;
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub async fn writable(&self) -> io::Result<()> {
self.ready(Interest::WRITABLE).await?;
Ok(())
}
/// Polls for write readiness.
///
/// If the pipe is not currently ready for writing, this method will
/// store a clone of the `Waker` from the provided `Context`. When the pipe
/// becomes ready for writing, `Waker::wake` will be called on the waker.
///
/// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
/// the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup.
///
/// This function is intended for cases where creating and pinning a future
/// via [`writable`] is not feasible. Where possible, using [`writable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// [`writable`]: Self::writable
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the pipe is not ready for writing.
/// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_write_ready(cx).map_ok(|_| ())
}
/// Tries to write a buffer to the pipe, returning how many bytes were
/// written.
///
/// The function will attempt to write the entire contents of `buf`, but
/// only part of the buffer may be written. If the length of `buf` is not
/// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the
/// write is guaranteed to be atomic, i.e. either the entire content of
/// `buf` will be written or this method will fail with `WouldBlock`. There
/// is no such guarantee if `buf` is larger than `PIPE_BUF`.
///
/// This function is usually paired with [`writable`].
///
/// [`writable`]: Self::writable
///
/// # Return
///
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
/// number of bytes written. If the pipe is not ready to write data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Open a writing end of a fifo
/// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
///
/// loop {
/// // Wait for the pipe to be writable
/// tx.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match tx.try_write(b"hello world") {
/// Ok(n) => {
/// break;
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}
/// Tries to write several buffers to the pipe, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
/// from possible being only partially consumed. This method behaves
/// equivalently to a single call to [`try_write()`] with concatenated
/// buffers.
///
/// If the total length of buffers is not greater than `PIPE_BUF` (an OS
/// constant, 4096 under Linux), then the write is guaranteed to be atomic,
/// i.e. either the entire contents of buffers will be written or this
/// method will fail with `WouldBlock`. There is no such guarantee if the
/// total length of buffers is greater than `PIPE_BUF`.
///
/// This function is usually paired with [`writable`].
///
/// [`try_write()`]: Self::try_write()
/// [`writable`]: Self::writable
///
/// # Return
///
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
/// number of bytes written. If the pipe is not ready to write data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Open a writing end of a fifo
/// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
///
/// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
///
/// loop {
/// // Wait for the pipe to be writable
/// tx.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match tx.try_write_vectored(&bufs) {
/// Ok(n) => {
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}
/// Converts the pipe into an [`OwnedFd`] in blocking mode.
///
/// This function will deregister this pipe end from the event loop, set
/// it in blocking mode and perform the conversion.
pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
let fd = self.into_nonblocking_fd()?;
set_blocking(&fd)?;
Ok(fd)
}
/// Converts the pipe into an [`OwnedFd`] in nonblocking mode.
///
/// This function will deregister this pipe end from the event loop and
/// perform the conversion. The returned file descriptor will be in nonblocking
/// mode.
pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
let mio_pipe = self.io.into_inner()?;
// Safety: the pipe is now deregistered from the event loop
// and we are the only owner of this pipe end.
let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };
Ok(owned_fd)
}
}
impl AsyncWrite for Sender {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.io.poll_write(cx, buf)
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.io.poll_write_vectored(cx, bufs)
}
fn is_write_vectored(&self) -> bool {
true
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
impl AsRawFd for Sender {
fn as_raw_fd(&self) -> RawFd {
self.io.as_raw_fd()
}
}
impl AsFd for Sender {
fn as_fd(&self) -> BorrowedFd<'_> {
unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
}
}
/// Reading end of a Unix pipe.
///
/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
///
/// # Examples
///
/// Receiving messages from a named pipe in a loop:
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use tokio::io::{self, AsyncReadExt};
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
/// loop {
/// let mut msg = vec![0; 256];
/// match rx.read_exact(&mut msg).await {
/// Ok(_) => {
/// /* handle the message */
/// }
/// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
/// // Writing end has been closed, we should reopen the pipe.
/// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
/// }
/// Err(e) => return Err(e.into()),
/// }
/// }
/// # }
/// ```
///
/// On Linux, you can use a `Receiver` in read-write access mode to implement
/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
/// when the writing end is closed. This way, a `Receiver` can asynchronously
/// wait for the next writer to open the pipe.
///
/// You should not use functions waiting for EOF such as [`read_to_end`] with
/// a `Receiver` in read-write access mode, since it **may wait forever**.
/// `Receiver` in this mode also holds an open writing end, which prevents
/// receiving EOF.
///
/// To set the read-write access mode you can use `OpenOptions::read_write`.
/// Note that using read-write access mode with FIFO files is not defined by
/// the POSIX standard and it is only guaranteed to work on Linux.
///
/// ```ignore
/// use tokio::net::unix::pipe;
/// use tokio::io::AsyncReadExt;
/// # use std::error::Error;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// let mut rx = pipe::OpenOptions::new()
/// .read_write(true)
/// .open_receiver(FIFO_NAME)?;
/// loop {
/// let mut msg = vec![0; 256];
/// rx.read_exact(&mut msg).await?;
/// /* handle the message */
/// }
/// # }
/// ```
///
/// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end
#[derive(Debug)]
pub struct Receiver {
io: PollEvented<mio_pipe::Receiver>,
}
impl Receiver {
fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> {
let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?;
Ok(Receiver { io })
}
/// Creates a new `Receiver` from a [`File`].
///
/// This function is intended to construct a pipe from a [`File`] representing
/// a special FIFO file. It will check if the file is a pipe and has read access,
/// set it in non-blocking mode and perform the conversion.
///
/// # Errors
///
/// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
/// does not have read access. Also fails with any standard OS error if it occurs.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_file(file: File) -> io::Result<Receiver> {
Receiver::from_owned_fd(file.into())
}
/// Creates a new `Receiver` from an [`OwnedFd`].
///
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
/// an anonymous pipe or a special FIFO file. It will check if the file descriptor
/// is a pipe and has read access, set it in non-blocking mode and perform the
/// conversion.
///
/// # Errors
///
/// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
/// or it does not have read access. Also fails with any standard OS error if it
/// occurs.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Receiver> {
if !is_pipe(owned_fd.as_fd())? {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
}
let flags = get_file_flags(owned_fd.as_fd())?;
if has_read_access(flags) {
set_nonblocking(owned_fd.as_fd(), flags)?;
Receiver::from_owned_fd_unchecked(owned_fd)
} else {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"not in O_RDONLY or O_RDWR access mode",
))
}
}
/// Creates a new `Receiver` from a [`File`] without checking pipe properties.
///
/// This function is intended to construct a pipe from a File representing
/// a special FIFO file. The conversion assumes nothing about the underlying
/// file; it is left up to the user to make sure it is opened with read access,
/// represents a pipe and is set in non-blocking mode.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use std::fs::OpenOptions;
/// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
/// # use std::error::Error;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// let file = OpenOptions::new()
/// .read(true)
/// .custom_flags(libc::O_NONBLOCK)
/// .open(FIFO_NAME)?;
/// if file.metadata()?.file_type().is_fifo() {
/// let rx = pipe::Receiver::from_file_unchecked(file)?;
/// /* use the Receiver */
/// }
/// # Ok(())
/// # }
/// ```
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_file_unchecked(file: File) -> io::Result<Receiver> {
Receiver::from_owned_fd_unchecked(file.into())
}
/// Creates a new `Receiver` from an [`OwnedFd`] without checking pipe properties.
///
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
/// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
/// the underlying pipe; it is left up to the user to make sure that the file
/// descriptor represents the reading end of a pipe and the pipe is set in
/// non-blocking mode.
///
/// # Panics
///
/// This function panics if it is not called from within a runtime with
/// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Receiver> {
// Safety: OwnedFd represents a valid, open file descriptor.
let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(owned_fd.into_raw_fd()) };
Receiver::from_mio(mio_rx)
}
/// Waits for any of the requested ready states.
///
/// This function can be used instead of [`readable()`] to check the returned
/// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events.
///
/// The function may complete without the pipe being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// [`readable()`]: Self::readable
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
/// will continue to return immediately until the readiness event is
/// consumed by an attempt to read that fails with `WouldBlock` or
/// `Poll::Pending`.
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
let event = self.io.registration().readiness(interest).await?;
Ok(event.ready)
}
/// Waits for the pipe to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with [`try_read()`].
///
/// [`try_read()`]: Self::try_read()
///
/// # Examples
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Open a reading end of a fifo
/// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
///
/// let mut msg = vec![0; 1024];
///
/// loop {
/// // Wait for the pipe to be readable
/// rx.readable().await?;
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match rx.try_read(&mut msg) {
/// Ok(n) => {
/// msg.truncate(n);
/// break;
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// println!("GOT = {:?}", msg);
/// Ok(())
/// }
/// ```
pub async fn readable(&self) -> io::Result<()> {
self.ready(Interest::READABLE).await?;
Ok(())
}
/// Polls for read readiness.
///
/// If the pipe is not currently ready for reading, this method will
/// store a clone of the `Waker` from the provided `Context`. When the pipe
/// becomes ready for reading, `Waker::wake` will be called on the waker.
///
/// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
/// the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup.
///
/// This function is intended for cases where creating and pinning a future
/// via [`readable`] is not feasible. Where possible, using [`readable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// [`readable`]: Self::readable
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the pipe is not ready for reading.
/// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}
/// Tries to read data from the pipe into the provided buffer, returning how
/// many bytes were read.
///
/// Reads any pending data from the pipe but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read()` is non-blocking, the buffer does not have to be stored by
/// the async task and can exist entirely on the stack.
///
/// Usually [`readable()`] is used with this function.
///
/// [`readable()`]: Self::readable()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
///
/// 1. The pipe's writing end is closed and will no longer write data.
/// 2. The specified buffer was 0 bytes in length.
///
/// If the pipe is not ready to read data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Open a reading end of a fifo
/// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
///
/// let mut msg = vec![0; 1024];
///
/// loop {
/// // Wait for the pipe to be readable
/// rx.readable().await?;
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match rx.try_read(&mut msg) {
/// Ok(n) => {
/// msg.truncate(n);
/// break;
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// println!("GOT = {:?}", msg);
/// Ok(())
/// }
/// ```
pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
/// Tries to read data from the pipe into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
/// written to possibly being only partially filled. This method behaves
/// equivalently to a single call to [`try_read()`] with concatenated
/// buffers.
///
/// Reads any pending data from the pipe but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_vectored()` is non-blocking, the buffer does not have to be
/// stored by the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] is used with this function.
///
/// [`try_read()`]: Self::try_read()
/// [`readable()`]: Self::readable()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the pipe's writing end is
/// closed and will no longer write data. If the pipe is not ready to read
/// data `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Open a reading end of a fifo
/// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
///
/// loop {
/// // Wait for the pipe to be readable
/// rx.readable().await?;
///
/// // Creating the buffer **after** the `await` prevents it from
/// // being stored in the async task.
/// let mut buf_a = [0; 512];
/// let mut buf_b = [0; 1024];
/// let mut bufs = [
/// io::IoSliceMut::new(&mut buf_a),
/// io::IoSliceMut::new(&mut buf_b),
/// ];
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match rx.try_read_vectored(&mut bufs) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
}
cfg_io_util! {
/// Tries to read data from the pipe into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// Reads any pending data from the pipe but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
/// the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: Self::readable
/// [`ready()`]: Self::ready
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the pipe's writing end is
/// closed and will no longer write data. If the pipe is not ready to read
/// data `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Open a reading end of a fifo
/// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
///
/// loop {
/// // Wait for the pipe to be readable
/// rx.readable().await?;
///
/// let mut buf = Vec::with_capacity(4096);
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match rx.try_read_buf(&mut buf) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
self.io.registration().try_io(Interest::READABLE, || {
use std::io::Read;
let dst = buf.chunk_mut();
let dst =
unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
// Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
// which correctly handles reads into uninitialized memory.
let n = (&*self.io).read(dst)?;
unsafe {
buf.advance_mut(n);
}
Ok(n)
})
}
}
/// Converts the pipe into an [`OwnedFd`] in blocking mode.
///
/// This function will deregister this pipe end from the event loop, set
/// it in blocking mode and perform the conversion.
pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
let fd = self.into_nonblocking_fd()?;
set_blocking(&fd)?;
Ok(fd)
}
/// Converts the pipe into an [`OwnedFd`] in nonblocking mode.
///
/// This function will deregister this pipe end from the event loop and
/// perform the conversion. Returned file descriptor will be in nonblocking
/// mode.
pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
let mio_pipe = self.io.into_inner()?;
// Safety: the pipe is now deregistered from the event loop
// and we are the only owner of this pipe end.
let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };
Ok(owned_fd)
}
}
impl AsyncRead for Receiver {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
// which correctly handles reads into uninitialized memory.
unsafe { self.io.poll_read(cx, buf) }
}
}
impl AsRawFd for Receiver {
fn as_raw_fd(&self) -> RawFd {
self.io.as_raw_fd()
}
}
impl AsFd for Receiver {
fn as_fd(&self) -> BorrowedFd<'_> {
unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
}
}
/// Checks if the file descriptor is a pipe or a FIFO.
fn is_pipe(fd: BorrowedFd<'_>) -> io::Result<bool> {
// Safety: `libc::stat` is C-like struct used for syscalls and all-zero
// byte pattern forms a valid value.
let mut stat: libc::stat = unsafe { std::mem::zeroed() };
// Safety: it's safe to call `fstat` with a valid, open file descriptor
// and a valid pointer to a `stat` struct.
let r = unsafe { libc::fstat(fd.as_raw_fd(), &mut stat) };
if r == -1 {
Err(io::Error::last_os_error())
} else {
Ok((stat.st_mode as libc::mode_t & libc::S_IFMT) == libc::S_IFIFO)
}
}
/// Gets file descriptor's flags by fcntl.
fn get_file_flags(fd: BorrowedFd<'_>) -> io::Result<libc::c_int> {
// Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor.
let flags = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
if flags < 0 {
Err(io::Error::last_os_error())
} else {
Ok(flags)
}
}
/// Checks for `O_RDONLY` or `O_RDWR` access mode.
fn has_read_access(flags: libc::c_int) -> bool {
let mode = flags & libc::O_ACCMODE;
mode == libc::O_RDONLY || mode == libc::O_RDWR
}
/// Checks for `O_WRONLY` or `O_RDWR` access mode.
fn has_write_access(flags: libc::c_int) -> bool {
let mode = flags & libc::O_ACCMODE;
mode == libc::O_WRONLY || mode == libc::O_RDWR
}
/// Sets file descriptor's flags with `O_NONBLOCK` by fcntl.
fn set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result<()> {
let flags = current_flags | libc::O_NONBLOCK;
if flags != current_flags {
// Safety: it's safe to use `fcntl` to set the `O_NONBLOCK` flag of a valid,
// open file descriptor.
let ret = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, flags) };
if ret < 0 {
return Err(io::Error::last_os_error());
}
}
Ok(())
}
/// Removes `O_NONBLOCK` from fd's flags.
fn set_blocking<T: AsRawFd>(fd: &T) -> io::Result<()> {
// Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor.
let previous = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
if previous == -1 {
return Err(io::Error::last_os_error());
}
let new = previous & !libc::O_NONBLOCK;
// Safety: it's safe to use `fcntl` to unset the `O_NONBLOCK` flag of a valid,
// open file descriptor.
let r = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, new) };
if r == -1 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}