barrier.rs |
|
7240 |
batch_semaphore.rs |
# Implementation Details.
The semaphore is implemented using an intrusive linked list of waiters. An
atomic counter tracks the number of available permits. If the semaphore does
not contain the required number of permits, the task attempting to acquire
permits places its waker at the end of a queue. When new permits are made
available (for example, when an earlier acquisition is released), they are
assigned to the task at the front of the queue, waking that task once its
requested number of permits is met.
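The queueing behavior described above can be sketched with a small
single-threaded model. This is not the actual implementation: `ModelSemaphore`,
its fields, and the task ids are hypothetical names, a `VecDeque` stands in for
the intrusive linked list, and returning task ids stands in for waking wakers.

```rust
use std::collections::VecDeque;

/// Hypothetical model of a batch semaphore with a FIFO wait queue.
struct ModelSemaphore {
    /// Permits not currently assigned to any task.
    available: usize,
    /// Waiters in arrival order: (task id, permits still needed).
    waiters: VecDeque<(u32, usize)>,
}

impl ModelSemaphore {
    fn new(permits: usize) -> Self {
        Self { available: permits, waiters: VecDeque::new() }
    }

    /// Try to acquire `n` permits for `task`. Returns `true` on immediate
    /// success; otherwise the task is placed at the back of the queue.
    fn acquire(&mut self, task: u32, n: usize) -> bool {
        // Only succeed immediately when nobody is already waiting, so a
        // late arrival cannot jump ahead of the queue.
        if self.waiters.is_empty() && self.available >= n {
            self.available -= n;
            return true;
        }
        self.waiters.push_back((task, n));
        false
    }

    /// Return `n` permits and assign them to waiters, front first.
    /// Returns the ids of the tasks that would be woken.
    fn release(&mut self, n: usize) -> Vec<u32> {
        self.available += n;
        let mut woken = Vec::new();
        while let Some((task, needed)) = self.waiters.front_mut() {
            // Give the front waiter as many permits as are available.
            let give = (*needed).min(self.available);
            *needed -= give;
            self.available -= give;
            if *needed > 0 {
                // The front waiter is still short, so nobody behind it
                // is served yet.
                break;
            }
            woken.push(*task);
            self.waiters.pop_front();
        }
        woken
    }
}
```

In this model, if task 2 is queued for three permits and task 3 is queued
behind it for one, releasing a single permit wakes neither: the permit is
held for task 2 until its full request can be met.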
Because waiters are enqueued at the back of the linked list and dequeued
from the front, the semaphore is fair. Tasks trying to acquire large numbers
of permits at a time will always be woken eventually, even if many other
tasks are acquiring smaller numbers of permits. This means that in a
use-case like tokio's read-write lock, writers will not be starved by
readers. |
27297 |
broadcast.rs |
A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
all consumers.
A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
values. [`Sender`] handles are clone-able, allowing concurrent send and
receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
long as `T` is `Send`.
When a value is sent, **all** [`Receiver`] handles are notified and will
receive the value. The value is stored once inside the channel and cloned on
demand for each receiver. Once all receivers have received a clone of the
value, the value is released from the channel.
A channel is created by calling [`channel`], specifying the maximum number
of messages the channel can retain at any given time.
New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
returned [`Receiver`] will receive values sent **after** the call to
`subscribe`.
This channel is also suitable for the single-producer multi-consumer
use-case, where a single sender broadcasts values to many receivers.
## Lagging
As sent messages must be retained until **all** [`Receiver`] handles receive
a clone, broadcast channels are susceptible to the "slow receiver" problem.
In this case, all but one receiver are able to receive values at the rate
they are sent. Because one receiver is stalled, the channel starts to fill
up.
This broadcast channel implementation handles this case by setting a hard
upper bound on the number of values the channel may retain at any given
time. This upper bound is passed to the [`channel`] function as an argument.
If a value is sent when the channel is at capacity, the oldest value
currently held by the channel is released. This frees up space for the new
value. Any receiver that has not yet seen the released value will return
[`RecvError::Lagged`] the next time [`recv`] is called.
Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
updated to the oldest value contained by the channel. The next call to
[`recv`] will return this value.
This behavior enables a receiver to detect when it has lagged so far behind
that data has been dropped. The caller may decide how to respond to this:
either by aborting its task or by tolerating lost messages and resuming
consumption of the channel.
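The lagging rules above can be illustrated with a simplified, single-threaded
model. Everything here is a hypothetical stand-in rather than the channel's
real internals: `BroadcastModel`, `ModelReceiver`, and `ModelRecvError` are
invented names, an `Empty` error replaces waiting, and the model does not
release values early once every receiver has read them.

```rust
use std::collections::VecDeque;

/// Hypothetical error type for the model.
#[derive(Debug, PartialEq)]
enum ModelRecvError {
    /// The receiver missed this many values.
    Lagged(u64),
    /// Nothing new to read (the real `recv` would wait instead).
    Empty,
}

/// Hypothetical model of a bounded broadcast buffer.
struct BroadcastModel<T> {
    capacity: usize,
    /// Retained values, oldest first.
    buf: VecDeque<T>,
    /// Sequence number of the oldest retained value.
    oldest: u64,
}

/// A receiver is just the sequence number it wants to read next.
struct ModelReceiver {
    next: u64,
}

impl<T: Clone> BroadcastModel<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { capacity, buf: VecDeque::new(), oldest: 0 }
    }

    /// New receivers only see values sent after this call.
    fn subscribe(&self) -> ModelReceiver {
        ModelReceiver { next: self.oldest + self.buf.len() as u64 }
    }

    fn send(&mut self, value: T) {
        if self.buf.len() == self.capacity {
            // At capacity: release the oldest value to make room.
            self.buf.pop_front();
            self.oldest += 1;
        }
        self.buf.push_back(value);
    }

    fn recv(&self, rx: &mut ModelReceiver) -> Result<T, ModelRecvError> {
        if rx.next < self.oldest {
            // The value this receiver wanted has been released. Report
            // how many were missed and move to the oldest retained value.
            let missed = self.oldest - rx.next;
            rx.next = self.oldest;
            return Err(ModelRecvError::Lagged(missed));
        }
        let index = (rx.next - self.oldest) as usize;
        match self.buf.get(index) {
            Some(value) => {
                rx.next += 1;
                Ok(value.clone())
            }
            None => Err(ModelRecvError::Empty),
        }
    }
}
```

With a capacity of 2 and three values sent before the receiver reads, the
model's first `recv` reports `Lagged(1)` and the following two calls return
the second and third values.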
## Closing
When **all** [`Sender`] handles have been dropped, no new values may be
sent. At this point, the channel is "closed". Once a receiver has received
all values retained by the channel, the next call to [`recv`] will return
with [`RecvError::Closed`].
When a [`Receiver`] handle is dropped, any messages not read by the receiver
will be marked as read. If this receiver was the only one not to have read
that message, the message will be dropped at this point.
[`Sender`]: crate::sync::broadcast::Sender
[`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
[`Receiver`]: crate::sync::broadcast::Receiver
[`channel`]: crate::sync::broadcast::channel
[`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
[`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
[`recv`]: crate::sync::broadcast::Receiver::recv
# Examples
Basic usage
```
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    tokio::spawn(async move {
        assert_eq!(rx1.recv().await.unwrap(), 10);
        assert_eq!(rx1.recv().await.unwrap(), 20);
    });

    tokio::spawn(async move {
        assert_eq!(rx2.recv().await.unwrap(), 10);
        assert_eq!(rx2.recv().await.unwrap(), 20);
    });

    tx.send(10).unwrap();
    tx.send(20).unwrap();
}
```
Handling lag
```
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(2);

    tx.send(10).unwrap();
    tx.send(20).unwrap();
    tx.send(30).unwrap();

    // The receiver lagged behind
    assert!(rx.recv().await.is_err());

    // At this point, we can abort or continue with lost messages
    assert_eq!(20, rx.recv().await.unwrap());
    assert_eq!(30, rx.recv().await.unwrap());
}
``` |
50189 |
mod.rs |
|
17022 |
mpsc |
|
|
mutex.rs |
|
46134 |
notify.rs |
|
44489 |
once_cell.rs |
|
17550 |
oneshot.rs |
A one-shot channel is used for sending a single message between
asynchronous tasks. The [`channel`] function is used to create a
[`Sender`] and [`Receiver`] handle pair that form the channel.
The `Sender` handle is used by the producer to send the value.
The `Receiver` handle is used by the consumer to receive the value.
Each handle can be used on separate tasks.
Since the `send` method is not async, it can be used anywhere. This includes
sending between two runtimes, and using it from non-async code.
If the [`Receiver`] is closed before receiving a message which has already
been sent, the message will remain in the channel until the receiver is
dropped, at which point the message will be dropped immediately.
# Examples
```
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        if let Err(_) = tx.send(3) {
            println!("the receiver dropped");
        }
    });

    match rx.await {
        Ok(v) => println!("got = {:?}", v),
        Err(_) => println!("the sender dropped"),
    }
}
```
If the sender is dropped without sending, the receiver will fail with
[`error::RecvError`]:
```
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<u32>();

    tokio::spawn(async move {
        drop(tx);
    });

    match rx.await {
        Ok(_) => panic!("This doesn't happen"),
        Err(_) => println!("the sender dropped"),
    }
}
```
To use a `oneshot` channel in a `tokio::select!` loop, add `&mut` in front of
the receiver so that each iteration borrows it instead of consuming it.
```
use tokio::sync::oneshot;
use tokio::time::{interval, sleep, Duration};

#[tokio::main]
# async fn _doc() {}
# #[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
    let (send, mut recv) = oneshot::channel();
    let mut interval = interval(Duration::from_millis(100));

    # let handle =
    tokio::spawn(async move {
        sleep(Duration::from_secs(1)).await;
        send.send("shut down").unwrap();
    });

    loop {
        tokio::select! {
            _ = interval.tick() => println!("Another 100ms"),
            msg = &mut recv => {
                println!("Got message: {}", msg.unwrap());
                break;
            }
        }
    }
    # handle.await.unwrap();
}
```
To use a `Sender` from a destructor, put it in an [`Option`] and call
[`Option::take`].
```
use tokio::sync::oneshot;

struct SendOnDrop {
    sender: Option<oneshot::Sender<&'static str>>,
}

impl Drop for SendOnDrop {
    fn drop(&mut self) {
        if let Some(sender) = self.sender.take() {
            // Using `let _ =` to ignore send errors.
            let _ = sender.send("I got dropped!");
        }
    }
}

#[tokio::main]
# async fn _doc() {}
# #[tokio::main(flavor = "current_thread")]
async fn main() {
    let (send, recv) = oneshot::channel();

    let send_on_drop = SendOnDrop { sender: Some(send) };
    drop(send_on_drop);

    assert_eq!(recv.await, Ok("I got dropped!"));
}
``` |
43214 |
rwlock |
|
|
rwlock.rs |
|
37745 |
semaphore.rs |
|
40710 |
task |
|
|
tests |
|
|
watch.rs |
A multi-producer, multi-consumer channel that only retains the *last* sent
value.
This channel is useful for watching for changes to a value from multiple
points in the code base, for example, changes to configuration values.
# Usage
[`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
and consumer halves of the channel. The channel is created with an initial
value.
Each [`Receiver`] independently tracks the last value *seen* by its caller.
To access the **current** value stored in the channel and mark it as *seen*
by a given [`Receiver`], use [`Receiver::borrow_and_update()`].
To access the current value **without** marking it as *seen*, use
[`Receiver::borrow()`]. (If the value has already been marked *seen*,
[`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].)
For more information on when to use these methods, see
[here](#borrow_and_update-versus-borrow).
## Change notifications
The [`Receiver`] half provides an asynchronous [`changed`] method. This
method is ready when a new, *unseen* value is sent via the [`Sender`] half.
* [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or
`Err(`[`error::RecvError`]`)` if the [`Sender`] has been dropped.
* If the current value is *unseen* when calling [`changed`], then
[`changed`] will return immediately. If the current value is *seen*, then
it will sleep until either a new message is sent via the [`Sender`] half,
or the [`Sender`] is dropped.
* On completion, the [`changed`] method marks the new value as *seen*.
* At creation, the initial value is considered *seen*. In other words,
[`Receiver::changed()`] will not return until a subsequent value is sent.
* New [`Receiver`] instances can be created with [`Sender::subscribe()`].
The current value at the time the [`Receiver`] is created is considered
*seen*.
## `borrow_and_update` versus `borrow`
If the receiver intends to await notifications from [`changed`] in a loop,
[`Receiver::borrow_and_update()`] should be preferred over
[`Receiver::borrow()`]. This avoids a potential race where a new value is
sent between [`changed`] being ready and the value being read. (If
[`Receiver::borrow()`] is used, the loop may run twice with the same value.)
If the receiver is only interested in the current value, and does not intend
to wait for changes, then [`Receiver::borrow()`] can be used. It may be more
convenient to use [`borrow`](Receiver::borrow) since it's an `&self`
method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut
self`.
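The race mentioned above can be replayed deterministically with a small model
of *seen* tracking. This is only a sketch under stated assumptions: it assumes
a version counter that is bumped on every send and compared against the last
version each receiver has seen, and `WatchModel`, `ModelReceiver`, and `replay`
are hypothetical names, not part of the channel's API.

```rust
/// Hypothetical model of the shared state: the current value plus a
/// version counter bumped on every send.
struct WatchModel<T> {
    value: T,
    version: u64,
}

/// Hypothetical receiver: remembers the last version marked as seen.
struct ModelReceiver {
    seen: u64,
}

impl<T: Copy> WatchModel<T> {
    fn new(initial: T) -> Self {
        Self { value: initial, version: 0 }
    }

    /// New receivers treat the current value as already seen.
    fn subscribe(&self) -> ModelReceiver {
        ModelReceiver { seen: self.version }
    }

    fn send(&mut self, value: T) {
        self.value = value;
        self.version += 1;
    }

    /// Stand-in for `changed()` completing: reports whether an unseen
    /// value exists and marks the current version as seen.
    fn changed(&self, rx: &mut ModelReceiver) -> bool {
        let unseen = rx.seen != self.version;
        rx.seen = self.version;
        unseen
    }

    /// Read without touching the receiver's seen marker.
    fn borrow(&self, _rx: &ModelReceiver) -> T {
        self.value
    }

    /// Read and mark the value as seen.
    fn borrow_and_update(&self, rx: &mut ModelReceiver) -> T {
        rx.seen = self.version;
        self.value
    }
}

/// Replays the race: a second value is sent after `changed` is ready but
/// before the loop body reads. Returns the values the loop body observed.
fn replay(use_update: bool) -> Vec<u32> {
    let mut shared = WatchModel::new(0u32);
    let mut rx = shared.subscribe();
    let mut observed = Vec::new();

    shared.send(1);
    let mut racing_send = Some(2);
    while shared.changed(&mut rx) {
        // The racing send lands between readiness and the read.
        if let Some(v) = racing_send.take() {
            shared.send(v);
        }
        let value = if use_update {
            shared.borrow_and_update(&mut rx)
        } else {
            shared.borrow(&rx)
        };
        observed.push(value);
    }
    observed
}
```

In this model, `replay(false)` processes the value `2` twice because `borrow`
leaves it marked *unseen*, while `replay(true)` processes it once.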
# Examples
The following example prints `hello! world! `.
```
use tokio::sync::watch;
use tokio::time::{Duration, sleep};

# async fn dox() -> Result<(), Box<dyn std::error::Error>> {
let (tx, mut rx) = watch::channel("hello");

tokio::spawn(async move {
    // Use the equivalent of a "do-while" loop so the initial value is
    // processed before awaiting the `changed()` future.
    loop {
        println!("{}! ", *rx.borrow_and_update());
        if rx.changed().await.is_err() {
            break;
        }
    }
});

sleep(Duration::from_millis(100)).await;
tx.send("world")?;
# Ok(())
# }
```
# Closing
[`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
when all [`Receiver`] handles have been dropped. This indicates that there
is no further interest in the values being produced and work can be stopped.
The value in the channel will not be dropped until the sender and all
receivers have been dropped.
# Thread safety
Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
threads and can be used in a concurrent environment. Clones of [`Receiver`]
handles may be moved to separate threads and also used concurrently.
[`Sender`]: crate::sync::watch::Sender
[`Receiver`]: crate::sync::watch::Receiver
[`changed`]: crate::sync::watch::Receiver::changed
[`Receiver::changed()`]: crate::sync::watch::Receiver::changed
[`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
[`Receiver::borrow_and_update()`]: crate::sync::watch::Receiver::borrow_and_update
[`channel`]: crate::sync::watch::channel
[`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
[`Sender::closed`]: crate::sync::watch::Sender::closed
[`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe |
49730 |