Source code

Revision control

Copy as Markdown

Other Tools

#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
//! A multi-producer, multi-consumer channel that only retains the *last* sent
//! value.
//!
//! This channel is useful for watching for changes to a value from multiple
//! points in the code base, for example, changes to configuration values.
//!
//! # Usage
//!
//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
//! and consumer halves of the channel. The channel is created with an initial
//! value.
//!
//! Each [`Receiver`] independently tracks the last value *seen* by its caller.
//!
//! To access the **current** value stored in the channel and mark it as *seen*
//! by a given [`Receiver`], use [`Receiver::borrow_and_update()`].
//!
//! To access the current value **without** marking it as *seen*, use
//! [`Receiver::borrow()`]. (If the value has already been marked *seen*,
//! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].)
//!
//! For more information on when to use these methods, see
//! [here](#borrow_and_update-versus-borrow).
//!
//! ## Change notifications
//!
//! The [`Receiver`] half provides an asynchronous [`changed`] method. This
//! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
//!
//! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or
//! `Err(`[`error::RecvError`]`)` if the [`Sender`] has been dropped.
//! * If the current value is *unseen* when calling [`changed`], then
//! [`changed`] will return immediately. If the current value is *seen*, then
//! it will sleep until either a new message is sent via the [`Sender`] half,
//! or the [`Sender`] is dropped.
//! * On completion, the [`changed`] method marks the new value as *seen*.
//! * At creation, the initial value is considered *seen*. In other words,
//! [`Receiver::changed()`] will not return until a subsequent value is sent.
//! * New [`Receiver`] instances can be created with [`Sender::subscribe()`].
//! The current value at the time the [`Receiver`] is created is considered
//! *seen*.
//!
//! ## `borrow_and_update` versus `borrow`
//!
//! If the receiver intends to await notifications from [`changed`] in a loop,
//! [`Receiver::borrow_and_update()`] should be preferred over
//! [`Receiver::borrow()`]. This avoids a potential race where a new value is
//! sent between [`changed`] being ready and the value being read. (If
//! [`Receiver::borrow()`] is used, the loop may run twice with the same value.)
//!
//! If the receiver is only interested in the current value, and does not intend
//! to wait for changes, then [`Receiver::borrow()`] can be used. It may be more
//! convenient to use [`borrow`](Receiver::borrow) since it's an `&self`
//! method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut
//! self`.
//!
//! # Examples
//!
//! The following example prints `hello! world! `.
//!
//! ```
//! use tokio::sync::watch;
//! use tokio::time::{Duration, sleep};
//!
//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
//! let (tx, mut rx) = watch::channel("hello");
//!
//! tokio::spawn(async move {
//! // Use the equivalent of a "do-while" loop so the initial value is
//! // processed before awaiting the `changed()` future.
//! loop {
//! println!("{}! ", *rx.borrow_and_update());
//! if rx.changed().await.is_err() {
//! break;
//! }
//! }
//! });
//!
//! sleep(Duration::from_millis(100)).await;
//! tx.send("world")?;
//! # Ok(())
//! # }
//! ```
//!
//! # Closing
//!
//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
//! when all [`Receiver`] handles have been dropped. This indicates that there
//! is no further interest in the values being produced and work can be stopped.
//!
//! The value in the channel will not be dropped until the sender and all
//! receivers have been dropped.
//!
//! # Thread safety
//!
//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
//! handles may be moved to separate threads and also used concurrently.
//!
//! [`Sender`]: crate::sync::watch::Sender
//! [`Receiver`]: crate::sync::watch::Receiver
//! [`changed`]: crate::sync::watch::Receiver::changed
//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
//! [`Receiver::borrow_and_update()`]:
//! crate::sync::watch::Receiver::borrow_and_update
//! [`channel`]: crate::sync::watch::channel
//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
//! [`Sender::closed`]: crate::sync::watch::Sender::closed
//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe
use crate::sync::notify::Notify;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::{AcqRel, Relaxed};
use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
use std::fmt;
use std::mem;
use std::ops;
use std::panic;
/// Receives values from the associated [`Sender`](struct@Sender).
///
/// Instances are created by the [`channel`](fn@channel) function.
///
/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
/// wrapper.
///
#[derive(Debug)]
pub struct Receiver<T> {
/// Pointer to the shared state
shared: Arc<Shared<T>>,
/// Last observed version
version: Version,
}
/// Sends values to the associated [`Receiver`](struct@Receiver).
///
/// Instances are created by the [`channel`](fn@channel) function.
#[derive(Debug)]
pub struct Sender<T> {
shared: Arc<Shared<T>>,
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
self.shared.ref_count_tx.fetch_add(1, Relaxed);
Self {
shared: self.shared.clone(),
}
}
}
impl<T: Default> Default for Sender<T> {
fn default() -> Self {
Self::new(T::default())
}
}
/// Returns a reference to the inner value.
///
/// Outstanding borrows hold a read lock on the inner value. This means that
/// long-lived borrows could cause the producer half to block. It is recommended
/// to keep the borrow as short-lived as possible. Additionally, if you are
/// running in an environment that allows `!Send` futures, you must ensure that
/// the returned `Ref` type is never held alive across an `.await` point,
/// otherwise, it can lead to a deadlock.
///
/// The priority policy of the lock is dependent on the underlying lock
/// implementation, and this type does not guarantee that any particular policy
/// will be used. In particular, a producer which is waiting to acquire the lock
/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
///
/// <details><summary>Potential deadlock example</summary>
///
/// ```text
/// // Task 1 (on thread A) | // Task 2 (on thread B)
/// let _ref1 = rx.borrow(); |
/// | // will block
/// | let _ = tx.send(());
/// // may deadlock |
/// let _ref2 = rx.borrow(); |
/// ```
/// </details>
#[derive(Debug)]
pub struct Ref<'a, T> {
inner: RwLockReadGuard<'a, T>,
has_changed: bool,
}
impl<'a, T> Ref<'a, T> {
/// Indicates if the borrowed value is considered as _changed_ since the last
/// time it has been marked as seen.
///
/// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
///
/// When borrowed from the [`Sender`] this function will always return `false`.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = watch::channel("hello");
///
/// tx.send("goodbye").unwrap();
/// // The sender does never consider the value as changed.
/// assert!(!tx.borrow().has_changed());
///
/// // Drop the sender immediately, just for testing purposes.
/// drop(tx);
///
/// // Even if the sender has already been dropped...
/// assert!(rx.has_changed().is_err());
/// // ...the modified value is still readable and detected as changed.
/// assert_eq!(*rx.borrow(), "goodbye");
/// assert!(rx.borrow().has_changed());
///
/// // Read the changed value and mark it as seen.
/// {
/// let received = rx.borrow_and_update();
/// assert_eq!(*received, "goodbye");
/// assert!(received.has_changed());
/// // Release the read lock when leaving this scope.
/// }
///
/// // Now the value has already been marked as seen and could
/// // never be modified again (after the sender has been dropped).
/// assert!(!rx.borrow().has_changed());
/// }
/// ```
pub fn has_changed(&self) -> bool {
self.has_changed
}
}
struct Shared<T> {
/// The most recent value.
value: RwLock<T>,
/// The current version.
///
/// The lowest bit represents a "closed" state. The rest of the bits
/// represent the current version.
state: AtomicState,
/// Tracks the number of `Receiver` instances.
ref_count_rx: AtomicUsize,
/// Tracks the number of `Sender` instances.
ref_count_tx: AtomicUsize,
/// Notifies waiting receivers that the value changed.
notify_rx: big_notify::BigNotify,
/// Notifies any task listening for `Receiver` dropped events.
notify_tx: Notify,
}
impl<T: fmt::Debug> fmt::Debug for Shared<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.state.load();
f.debug_struct("Shared")
.field("value", &self.value)
.field("version", &state.version())
.field("is_closed", &state.is_closed())
.field("ref_count_rx", &self.ref_count_rx)
.finish()
}
}
pub mod error {
//! Watch error types.
use std::error::Error;
use std::fmt;
/// Error produced when sending a value fails.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);
// ===== impl SendError =====
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendError").finish_non_exhaustive()
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}
impl<T> Error for SendError<T> {}
/// Error produced when receiving a change notification.
#[derive(Debug, Clone)]
pub struct RecvError(pub(super) ());
// ===== impl RecvError =====
impl fmt::Display for RecvError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}
impl Error for RecvError {}
}
mod big_notify {
use super::Notify;
use crate::sync::notify::Notified;
// To avoid contention on the lock inside the `Notify`, we store multiple
// copies of it. Then, we use either circular access or randomness to spread
// out threads over different `Notify` objects.
//
// Some simple benchmarks show that randomness performs slightly better than
// circular access (probably due to contention on `next`), so we prefer to
// use randomness when Tokio is compiled with a random number generator.
//
// When the random number generator is not available, we fall back to
// circular access.
pub(super) struct BigNotify {
#[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
next: std::sync::atomic::AtomicUsize,
inner: [Notify; 8],
}
impl BigNotify {
pub(super) fn new() -> Self {
Self {
#[cfg(not(all(
not(loom),
feature = "sync",
any(feature = "rt", feature = "macros")
)))]
next: std::sync::atomic::AtomicUsize::new(0),
inner: Default::default(),
}
}
pub(super) fn notify_waiters(&self) {
for notify in &self.inner {
notify.notify_waiters();
}
}
/// This function implements the case where randomness is not available.
#[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
pub(super) fn notified(&self) -> Notified<'_> {
let i = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % 8;
self.inner[i].notified()
}
/// This function implements the case where randomness is available.
#[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
pub(super) fn notified(&self) -> Notified<'_> {
let i = crate::runtime::context::thread_rng_n(8) as usize;
self.inner[i].notified()
}
}
}
use self::state::{AtomicState, Version};
mod state {
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering;
const CLOSED_BIT: usize = 1;
// Using 2 as the step size preserves the `CLOSED_BIT`.
const STEP_SIZE: usize = 2;
/// The version part of the state. The lowest bit is always zero.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub(super) struct Version(usize);
/// Snapshot of the state. The first bit is used as the CLOSED bit.
/// The remaining bits are used as the version.
///
/// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
/// receivers does not set it.
#[derive(Copy, Clone, Debug)]
pub(super) struct StateSnapshot(usize);
/// The state stored in an atomic integer.
///
/// The `Sender` uses `Release` ordering for storing a new state
/// and the `Receiver`s use `Acquire` ordering for loading the
/// current state. This ensures that written values are seen by
/// the `Receiver`s for a proper handover.
#[derive(Debug)]
pub(super) struct AtomicState(AtomicUsize);
impl Version {
/// Decrements the version.
pub(super) fn decrement(&mut self) {
// Using a wrapping decrement here is required to ensure that the
// operation is consistent with `std::sync::atomic::AtomicUsize::fetch_add()`
// which wraps on overflow.
self.0 = self.0.wrapping_sub(STEP_SIZE);
}
pub(super) const INITIAL: Self = Version(0);
}
impl StateSnapshot {
/// Extract the version from the state.
pub(super) fn version(self) -> Version {
Version(self.0 & !CLOSED_BIT)
}
/// Is the closed bit set?
pub(super) fn is_closed(self) -> bool {
(self.0 & CLOSED_BIT) == CLOSED_BIT
}
}
impl AtomicState {
/// Create a new `AtomicState` that is not closed and which has the
/// version set to `Version::INITIAL`.
pub(super) fn new() -> Self {
AtomicState(AtomicUsize::new(Version::INITIAL.0))
}
/// Load the current value of the state.
///
/// Only used by the receiver and for debugging purposes.
///
/// The receiver side (read-only) uses `Acquire` ordering for a proper handover
/// of the shared value with the sender side (single writer). The state is always
/// updated after modifying and before releasing the (exclusive) lock on the
/// shared value.
pub(super) fn load(&self) -> StateSnapshot {
StateSnapshot(self.0.load(Ordering::Acquire))
}
/// Increment the version counter.
pub(super) fn increment_version_while_locked(&self) {
// Use `Release` ordering to ensure that the shared value
// has been written before updating the version. The shared
// value is still protected by an exclusive lock during this
// method.
self.0.fetch_add(STEP_SIZE, Ordering::Release);
}
/// Set the closed bit in the state.
pub(super) fn set_closed(&self) {
self.0.fetch_or(CLOSED_BIT, Ordering::Release);
}
}
}
/// Creates a new watch channel, returning the "send" and "receive" handles.
///
/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
/// Only the last value sent is made available to the [`Receiver`] half. All
/// intermediate values are dropped.
///
/// # Examples
///
/// The following example prints `hello! world! `.
///
/// ```
/// use tokio::sync::watch;
/// use tokio::time::{Duration, sleep};
///
/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
/// let (tx, mut rx) = watch::channel("hello");
///
/// tokio::spawn(async move {
/// // Use the equivalent of a "do-while" loop so the initial value is
/// // processed before awaiting the `changed()` future.
/// loop {
/// println!("{}! ", *rx.borrow_and_update());
/// if rx.changed().await.is_err() {
/// break;
/// }
/// }
/// });
///
/// sleep(Duration::from_millis(100)).await;
/// tx.send("world")?;
/// # Ok(())
/// # }
/// ```
///
/// [`Sender`]: struct@Sender
/// [`Receiver`]: struct@Receiver
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
let shared = Arc::new(Shared {
value: RwLock::new(init),
state: AtomicState::new(),
ref_count_rx: AtomicUsize::new(1),
ref_count_tx: AtomicUsize::new(1),
notify_rx: big_notify::BigNotify::new(),
notify_tx: Notify::new(),
});
let tx = Sender {
shared: shared.clone(),
};
let rx = Receiver {
shared,
version: Version::INITIAL,
};
(tx, rx)
}
impl<T> Receiver<T> {
fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
// No synchronization necessary as this is only used as a counter and
// not memory access.
shared.ref_count_rx.fetch_add(1, Relaxed);
Self { shared, version }
}
/// Returns a reference to the most recently sent value.
///
/// This method does not mark the returned value as seen, so future calls to
/// [`changed`] may return immediately even if you have already seen the
/// value with a call to `borrow`.
///
/// Outstanding borrows hold a read lock on the inner value. This means that
/// long-lived borrows could cause the producer half to block. It is recommended
/// to keep the borrow as short-lived as possible. Additionally, if you are
/// running in an environment that allows `!Send` futures, you must ensure that
/// the returned `Ref` type is never held alive across an `.await` point,
/// otherwise, it can lead to a deadlock.
///
/// The priority policy of the lock is dependent on the underlying lock
/// implementation, and this type does not guarantee that any particular policy
/// will be used. In particular, a producer which is waiting to acquire the lock
/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
///
/// <details><summary>Potential deadlock example</summary>
///
/// ```text
/// // Task 1 (on thread A) | // Task 2 (on thread B)
/// let _ref1 = rx.borrow(); |
/// | // will block
/// | let _ = tx.send(());
/// // may deadlock |
/// let _ref2 = rx.borrow(); |
/// ```
/// </details>
///
/// For more information on when to use this method versus
/// [`borrow_and_update`], see [here](self#borrow_and_update-versus-borrow).
///
/// [`changed`]: Receiver::changed
/// [`borrow_and_update`]: Receiver::borrow_and_update
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// let (_, rx) = watch::channel("hello");
/// assert_eq!(*rx.borrow(), "hello");
/// ```
pub fn borrow(&self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
// After obtaining a read-lock no concurrent writes could occur
// and the loaded version matches that of the borrowed reference.
let new_version = self.shared.state.load().version();
let has_changed = self.version != new_version;
Ref { inner, has_changed }
}
/// Returns a reference to the most recently sent value and marks that value
/// as seen.
///
/// This method marks the current value as seen. Subsequent calls to [`changed`]
/// will not return immediately until the [`Sender`] has modified the shared
/// value again.
///
/// Outstanding borrows hold a read lock on the inner value. This means that
/// long-lived borrows could cause the producer half to block. It is recommended
/// to keep the borrow as short-lived as possible. Additionally, if you are
/// running in an environment that allows `!Send` futures, you must ensure that
/// the returned `Ref` type is never held alive across an `.await` point,
/// otherwise, it can lead to a deadlock.
///
/// The priority policy of the lock is dependent on the underlying lock
/// implementation, and this type does not guarantee that any particular policy
/// will be used. In particular, a producer which is waiting to acquire the lock
/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
///
/// <details><summary>Potential deadlock example</summary>
///
/// ```text
/// // Task 1 (on thread A) | // Task 2 (on thread B)
/// let _ref1 = rx1.borrow_and_update(); |
/// | // will block
/// | let _ = tx.send(());
/// // may deadlock |
/// let _ref2 = rx2.borrow_and_update(); |
/// ```
/// </details>
///
/// For more information on when to use this method versus [`borrow`], see
/// [here](self#borrow_and_update-versus-borrow).
///
/// [`changed`]: Receiver::changed
/// [`borrow`]: Receiver::borrow
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
// After obtaining a read-lock no concurrent writes could occur
// and the loaded version matches that of the borrowed reference.
let new_version = self.shared.state.load().version();
let has_changed = self.version != new_version;
// Mark the shared value as seen by updating the version
self.version = new_version;
Ref { inner, has_changed }
}
/// Checks if this channel contains a message that this receiver has not yet
/// seen. The new value is not marked as seen.
///
/// Although this method is called `has_changed`, it does not check new
/// messages for equality, so this call will return true even if the new
/// message is equal to the old message.
///
/// Returns an error if the channel has been closed.
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = watch::channel("hello");
///
/// tx.send("goodbye").unwrap();
///
/// assert!(rx.has_changed().unwrap());
/// assert_eq!(*rx.borrow_and_update(), "goodbye");
///
/// // The value has been marked as seen
/// assert!(!rx.has_changed().unwrap());
///
/// drop(tx);
/// // The `tx` handle has been dropped
/// assert!(rx.has_changed().is_err());
/// }
/// ```
pub fn has_changed(&self) -> Result<bool, error::RecvError> {
// Load the version from the state
let state = self.shared.state.load();
if state.is_closed() {
// The sender has dropped.
return Err(error::RecvError(()));
}
let new_version = state.version();
Ok(self.version != new_version)
}
/// Marks the state as changed.
///
/// After invoking this method [`has_changed()`](Self::has_changed)
/// returns `true` and [`changed()`](Self::changed) returns
/// immediately, regardless of whether a new value has been sent.
///
/// This is useful for triggering an initial change notification after
/// subscribing to synchronize new receivers.
pub fn mark_changed(&mut self) {
self.version.decrement();
}
/// Marks the state as unchanged.
///
/// The current value will be considered seen by the receiver.
///
/// This is useful if you are not interested in the current value
/// visible in the receiver.
pub fn mark_unchanged(&mut self) {
let current_version = self.shared.state.load().version();
self.version = current_version;
}
/// Waits for a change notification, then marks the newest value as seen.
///
/// If the newest value in the channel has not yet been marked seen when
/// this method is called, the method marks that value seen and returns
/// immediately. If the newest value has already been marked seen, then the
/// method sleeps until a new message is sent by the [`Sender`] connected to
/// this `Receiver`, or until the [`Sender`] is dropped.
///
/// This method returns an error if and only if the [`Sender`] is dropped.
///
/// For more information, see
/// [*Change notifications*](self#change-notifications) in the module-level documentation.
///
/// # Cancel safety
///
/// This method is cancel safe. If you use it as the event in a
/// [`tokio::select!`](crate::select) statement and some other branch
/// completes first, then it is guaranteed that no values have been marked
/// seen by this call to `changed`.
///
/// [`Sender`]: struct@Sender
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = watch::channel("hello");
///
/// tokio::spawn(async move {
/// tx.send("goodbye").unwrap();
/// });
///
/// assert!(rx.changed().await.is_ok());
/// assert_eq!(*rx.borrow_and_update(), "goodbye");
///
/// // The `tx` handle has been dropped
/// assert!(rx.changed().await.is_err());
/// }
/// ```
pub async fn changed(&mut self) -> Result<(), error::RecvError> {
changed_impl(&self.shared, &mut self.version).await
}
/// Waits for a value that satisfies the provided condition.
///
/// This method will call the provided closure whenever something is sent on
/// the channel. Once the closure returns `true`, this method will return a
/// reference to the value that was passed to the closure.
///
/// Before `wait_for` starts waiting for changes, it will call the closure
/// on the current value. If the closure returns `true` when given the
/// current value, then `wait_for` will immediately return a reference to
/// the current value. This is the case even if the current value is already
/// considered seen.
///
/// The watch channel only keeps track of the most recent value, so if
/// several messages are sent faster than `wait_for` is able to call the
/// closure, then it may skip some updates. Whenever the closure is called,
/// it will be called with the most recent value.
///
/// When this function returns, the value that was passed to the closure
/// when it returned `true` will be considered seen.
///
/// If the channel is closed, then `wait_for` will return a `RecvError`.
/// Once this happens, no more messages can ever be sent on the channel.
/// When an error is returned, it is guaranteed that the closure has been
/// called on the last value, and that it returned `false` for that value.
/// (If the closure returned `true`, then the last value would have been
/// returned instead of the error.)
///
/// Like the `borrow` method, the returned borrow holds a read lock on the
/// inner value. This means that long-lived borrows could cause the producer
/// half to block. It is recommended to keep the borrow as short-lived as
/// possible. See the documentation of `borrow` for more information on
/// this.
///
/// [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
///
/// async fn main() {
/// let (tx, _rx) = watch::channel("hello");
///
/// tx.send("goodbye").unwrap();
///
/// // here we subscribe to a second receiver
/// // now in case of using `changed` we would have
/// // to first check the current value and then wait
/// // for changes or else `changed` would hang.
/// let mut rx2 = tx.subscribe();
///
/// // in place of changed we have use `wait_for`
/// // which would automatically check the current value
/// // and wait for changes until the closure returns true.
/// assert!(rx2.wait_for(|val| *val == "goodbye").await.is_ok());
/// assert_eq!(*rx2.borrow(), "goodbye");
/// }
/// ```
pub async fn wait_for(
&mut self,
mut f: impl FnMut(&T) -> bool,
) -> Result<Ref<'_, T>, error::RecvError> {
let mut closed = false;
loop {
{
let inner = self.shared.value.read().unwrap();
let new_version = self.shared.state.load().version();
let has_changed = self.version != new_version;
self.version = new_version;
if !closed || has_changed {
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&inner)));
match result {
Ok(true) => {
return Ok(Ref { inner, has_changed });
}
Ok(false) => {
// Skip the value.
}
Err(panicked) => {
// Drop the read-lock to avoid poisoning it.
drop(inner);
// Forward the panic to the caller.
panic::resume_unwind(panicked);
// Unreachable
}
};
}
}
if closed {
return Err(error::RecvError(()));
}
// Wait for the value to change.
closed = changed_impl(&self.shared, &mut self.version).await.is_err();
}
}
/// Returns `true` if receivers belong to the same channel.
///
/// # Examples
///
/// ```
/// let (tx, rx) = tokio::sync::watch::channel(true);
/// let rx2 = rx.clone();
/// assert!(rx.same_channel(&rx2));
///
/// let (tx3, rx3) = tokio::sync::watch::channel(true);
/// assert!(!rx3.same_channel(&rx2));
/// ```
pub fn same_channel(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.shared, &other.shared)
}
cfg_process_driver! {
pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
maybe_changed(&self.shared, &mut self.version)
}
}
}
fn maybe_changed<T>(
shared: &Shared<T>,
version: &mut Version,
) -> Option<Result<(), error::RecvError>> {
// Load the version from the state
let state = shared.state.load();
let new_version = state.version();
if *version != new_version {
// Observe the new version and return
*version = new_version;
return Some(Ok(()));
}
if state.is_closed() {
// The sender has been dropped.
return Some(Err(error::RecvError(())));
}
None
}
async fn changed_impl<T>(
shared: &Shared<T>,
version: &mut Version,
) -> Result<(), error::RecvError> {
crate::trace::async_trace_leaf().await;
loop {
// In order to avoid a race condition, we first request a notification,
// **then** check the current value's version. If a new version exists,
// the notification request is dropped.
let notified = shared.notify_rx.notified();
if let Some(ret) = maybe_changed(shared, version) {
return ret;
}
notified.await;
// loop around again in case the wake-up was spurious
}
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
let version = self.version;
let shared = self.shared.clone();
Self::from_shared(version, shared)
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
// No synchronization necessary as this is only used as a counter and
// not memory access.
if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
// This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
self.shared.notify_tx.notify_waiters();
}
}
}
impl<T> Sender<T> {
/// Creates the sending-half of the [`watch`] channel.
///
/// See documentation of [`watch::channel`] for errors when calling this function.
/// Beware that attempting to send a value when there are no receivers will
/// return an error.
///
/// [`watch`]: crate::sync::watch
/// [`watch::channel`]: crate::sync::watch
///
/// # Examples
/// ```
/// let sender = tokio::sync::watch::Sender::new(0u8);
/// assert!(sender.send(3).is_err());
/// let _rec = sender.subscribe();
/// assert!(sender.send(4).is_ok());
/// ```
pub fn new(init: T) -> Self {
let (tx, _) = channel(init);
tx
}
/// Sends a new value via the channel, notifying all receivers.
///
/// This method fails if the channel is closed, which is the case when
/// every receiver has been dropped. It is possible to reopen the channel
/// using the [`subscribe`] method. However, when `send` fails, the value
/// isn't made available for future receivers (but returned with the
/// [`SendError`]).
///
/// To always make a new value available for future receivers, even if no
/// receiver currently exists, one of the other send methods
/// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
/// used instead.
///
/// [`subscribe`]: Sender::subscribe
/// [`SendError`]: error::SendError
/// [`send_if_modified`]: Sender::send_if_modified
/// [`send_modify`]: Sender::send_modify
/// [`send_replace`]: Sender::send_replace
pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
// This is pretty much only useful as a hint anyway, so synchronization isn't critical.
if 0 == self.receiver_count() {
return Err(error::SendError(value));
}
self.send_replace(value);
Ok(())
}
/// Modifies the watched value **unconditionally** in-place,
/// notifying all receivers.
///
/// This can be useful for modifying the watched value, without
/// having to allocate a new instance. Additionally, this
/// method permits sending values even when there are no receivers.
///
/// Prefer to use the more versatile function [`Self::send_if_modified()`]
/// if the value is only modified conditionally during the mutable borrow
/// to prevent unneeded change notifications for unmodified values.
///
/// # Panics
///
/// This function panics when the invocation of the `modify` closure panics.
/// No receivers are notified when panicking. All changes of the watched
/// value applied by the closure before panicking will be visible in
/// subsequent calls to `borrow`.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// struct State {
/// counter: usize,
/// }
/// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
/// state_tx.send_modify(|state| state.counter += 1);
/// assert_eq!(state_rx.borrow().counter, 1);
/// ```
pub fn send_modify<F>(&self, modify: F)
where
F: FnOnce(&mut T),
{
self.send_if_modified(|value| {
modify(value);
true
});
}
/// Modifies the watched value **conditionally** in-place,
/// notifying all receivers only if modified.
///
/// This can be useful for modifying the watched value, without
/// having to allocate a new instance. Additionally, this
/// method permits sending values even when there are no receivers.
///
/// The `modify` closure must return `true` if the value has actually
/// been modified during the mutable borrow. It should only return `false`
/// if the value is guaranteed to be unmodified despite the mutable
/// borrow.
///
/// Receivers are only notified if the closure returned `true`. If the
/// closure has modified the value but returned `false` this results
/// in a *silent modification*, i.e. the modified value will be visible
/// in subsequent calls to `borrow`, but receivers will not receive
/// a change notification.
///
/// Returns the result of the closure, i.e. `true` if the value has
/// been modified and `false` otherwise.
///
/// # Panics
///
/// This function panics when the invocation of the `modify` closure panics.
/// No receivers are notified when panicking. All changes of the watched
/// value applied by the closure before panicking will be visible in
/// subsequent calls to `borrow`.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// struct State {
/// counter: usize,
/// }
/// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
/// let inc_counter_if_odd = |state: &mut State| {
/// if state.counter % 2 == 1 {
/// state.counter += 1;
/// return true;
/// }
/// false
/// };
///
/// assert_eq!(state_rx.borrow().counter, 1);
///
/// assert!(!state_rx.has_changed().unwrap());
/// assert!(state_tx.send_if_modified(inc_counter_if_odd));
/// assert!(state_rx.has_changed().unwrap());
/// assert_eq!(state_rx.borrow_and_update().counter, 2);
///
/// assert!(!state_rx.has_changed().unwrap());
/// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
/// assert!(!state_rx.has_changed().unwrap());
/// assert_eq!(state_rx.borrow_and_update().counter, 2);
/// ```
pub fn send_if_modified<F>(&self, modify: F) -> bool
where
F: FnOnce(&mut T) -> bool,
{
{
// Acquire the write lock and update the value.
let mut lock = self.shared.value.write().unwrap();
// Update the value and catch possible panic inside func.
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
match result {
Ok(modified) => {
if !modified {
// Abort, i.e. don't notify receivers if unmodified
return false;
}
// Continue if modified
}
Err(panicked) => {
// Drop the lock to avoid poisoning it.
drop(lock);
// Forward the panic to the caller.
panic::resume_unwind(panicked);
// Unreachable
}
};
self.shared.state.increment_version_while_locked();
// Release the write lock.
//
// Incrementing the version counter while holding the lock ensures
// that receivers are able to figure out the version number of the
// value they are currently looking at.
drop(lock);
}
self.shared.notify_rx.notify_waiters();
true
}
/// Sends a new value via the channel, notifying all receivers and returning
/// the previous value in the channel.
///
/// This can be useful for reusing the buffers inside a watched value.
/// Additionally, this method permits sending values even when there are no
/// receivers.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// let (tx, _rx) = watch::channel(1);
/// assert_eq!(tx.send_replace(2), 1);
/// assert_eq!(tx.send_replace(3), 2);
/// ```
pub fn send_replace(&self, mut value: T) -> T {
// swap old watched value with the new one
self.send_modify(|old| mem::swap(old, &mut value));
value
}
/// Returns a reference to the most recently sent value
///
/// Outstanding borrows hold a read lock on the inner value. This means that
/// long-lived borrows could cause the producer half to block. It is recommended
/// to keep the borrow as short-lived as possible. Additionally, if you are
/// running in an environment that allows `!Send` futures, you must ensure that
/// the returned `Ref` type is never held alive across an `.await` point,
/// otherwise, it can lead to a deadlock.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// let (tx, _) = watch::channel("hello");
/// assert_eq!(*tx.borrow(), "hello");
/// ```
pub fn borrow(&self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
// The sender/producer always sees the current version
let has_changed = false;
Ref { inner, has_changed }
}
/// Checks if the channel has been closed. This happens when all receivers
/// have dropped.
///
/// # Examples
///
/// ```
/// let (tx, rx) = tokio::sync::watch::channel(());
/// assert!(!tx.is_closed());
///
/// drop(rx);
/// assert!(tx.is_closed());
/// ```
pub fn is_closed(&self) -> bool {
self.receiver_count() == 0
}
/// Completes when all receivers have dropped.
///
/// This allows the producer to get notified when interest in the produced
/// values is canceled and immediately stop doing work. Once a channel is
/// closed, the only way to reopen it is to call [`Sender::subscribe`] to
/// get a new receiver.
///
/// If the channel becomes closed for a brief amount of time (e.g., the last
/// receiver is dropped and then `subscribe` is called), then this call to
/// `closed` might return, but it is also possible that it does not "notice"
/// that the channel was closed for a brief amount of time.
///
/// # Cancel safety
///
/// This method is cancel safe.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = watch::channel("hello");
///
/// tokio::spawn(async move {
/// // use `rx`
/// drop(rx);
/// });
///
/// // Waits for `rx` to drop
/// tx.closed().await;
/// println!("the `rx` handles dropped")
/// }
/// ```
pub async fn closed(&self) {
crate::trace::async_trace_leaf().await;
while self.receiver_count() > 0 {
let notified = self.shared.notify_tx.notified();
if self.receiver_count() == 0 {
return;
}
notified.await;
// The channel could have been reopened in the meantime by calling
// `subscribe`, so we loop again.
}
}
/// Creates a new [`Receiver`] connected to this `Sender`.
///
/// All messages sent before this call to `subscribe` are initially marked
/// as seen by the new `Receiver`.
///
/// This method can be called even if there are no other receivers. In this
/// case, the channel is reopened.
///
/// # Examples
///
/// The new channel will receive messages sent on this `Sender`.
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, _rx) = watch::channel(0u64);
///
/// tx.send(5).unwrap();
///
/// let rx = tx.subscribe();
/// assert_eq!(5, *rx.borrow());
///
/// tx.send(10).unwrap();
/// assert_eq!(10, *rx.borrow());
/// }
/// ```
///
/// The most recent message is considered seen by the channel, so this test
/// is guaranteed to pass.
///
/// ```
/// use tokio::sync::watch;
/// use tokio::time::Duration;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, _rx) = watch::channel(0u64);
/// tx.send(5).unwrap();
/// let mut rx = tx.subscribe();
///
/// tokio::spawn(async move {
/// // by spawning and sleeping, the message is sent after `main`
/// // hits the call to `changed`.
/// # if false {
/// tokio::time::sleep(Duration::from_millis(10)).await;
/// # }
/// tx.send(100).unwrap();
/// });
///
/// rx.changed().await.unwrap();
/// assert_eq!(100, *rx.borrow());
/// }
/// ```
pub fn subscribe(&self) -> Receiver<T> {
let shared = self.shared.clone();
let version = shared.state.load().version();
// The CLOSED bit in the state tracks only whether the sender is
// dropped, so we do not need to unset it if this reopens the channel.
Receiver::from_shared(version, shared)
}
/// Returns the number of receivers that currently exist.
///
/// # Examples
///
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx1) = watch::channel("hello");
///
/// assert_eq!(1, tx.receiver_count());
///
/// let mut _rx2 = rx1.clone();
///
/// assert_eq!(2, tx.receiver_count());
/// }
/// ```
pub fn receiver_count(&self) -> usize {
self.shared.ref_count_rx.load(Relaxed)
}
/// Returns `true` if senders belong to the same channel.
///
/// # Examples
///
/// ```
/// let (tx, rx) = tokio::sync::watch::channel(true);
/// let tx2 = tx.clone();
/// assert!(tx.same_channel(&tx2));
///
/// let (tx3, rx3) = tokio::sync::watch::channel(true);
/// assert!(!tx3.same_channel(&tx2));
/// ```
pub fn same_channel(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.shared, &other.shared)
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
if self.shared.ref_count_tx.fetch_sub(1, AcqRel) == 1 {
self.shared.state.set_closed();
self.shared.notify_rx.notify_waiters();
}
}
}
// ===== impl Ref =====
impl<T> ops::Deref for Ref<'_, T> {
type Target = T;
fn deref(&self) -> &T {
self.inner.deref()
}
}
#[cfg(all(test, loom))]
mod tests {
use futures::future::FutureExt;
use loom::thread;
#[test]
fn watch_spurious_wakeup() {
loom::model(|| {
let (send, mut recv) = crate::sync::watch::channel(0i32);
send.send(1).unwrap();
let send_thread = thread::spawn(move || {
send.send(2).unwrap();
send
});
recv.changed().now_or_never();
let send = send_thread.join().unwrap();
let recv_thread = thread::spawn(move || {
recv.changed().now_or_never();
recv.changed().now_or_never();
recv
});
send.send(3).unwrap();
let mut recv = recv_thread.join().unwrap();
let send_thread = thread::spawn(move || {
send.send(2).unwrap();
});
recv.changed().now_or_never();
send_thread.join().unwrap();
});
}
#[test]
fn watch_borrow() {
loom::model(|| {
let (send, mut recv) = crate::sync::watch::channel(0i32);
assert!(send.borrow().eq(&0));
assert!(recv.borrow().eq(&0));
send.send(1).unwrap();
assert!(send.borrow().eq(&1));
let send_thread = thread::spawn(move || {
send.send(2).unwrap();
send
});
recv.changed().now_or_never();
let send = send_thread.join().unwrap();
let recv_thread = thread::spawn(move || {
recv.changed().now_or_never();
recv.changed().now_or_never();
recv
});
send.send(3).unwrap();
let recv = recv_thread.join().unwrap();
assert!(recv.borrow().eq(&3));
assert!(send.borrow().eq(&3));
send.send(2).unwrap();
thread::spawn(move || {
assert!(recv.borrow().eq(&2));
});
assert!(send.borrow().eq(&2));
});
}
}