| error.rs |
|
789 |
- |
| lib.rs |
This crate contains the queueing logic for asynchronous operations.
It also contains helpers for synchronizing operations such as error handling
across futures, in the [`line_token`] module.
# The operation queue
The queueing of operations is handled by the [`OperationQueue`] struct. It
runs a given number of parallel runners, to which it dispatches operations
on a "first come, first served" basis.
An operation is a data structure that implements the [`QueuedOperation`]
trait, and is started by the queue calling its `perform` method. Because
this method is asynchronous, thus breaking [dyn compatibility], another
trait that is dyn-compatible ([`ErasedQueuedOperation`]) is used by the
queue. However, `ErasedQueuedOperation` is implemented by any type that
implements `QueuedOperation`, so consumers usually don't need to bother with
it.
[`OperationQueue`] is runtime-agnostic, meaning it is not designed to work
only with a specific asynchronous runtime. However, it still needs to spawn
a task for each of its runners. This is why [`OperationQueue::new`] takes a
function as its sole argument, which is given the future for a runner's
loop. For example, creating a new queue with the `tokio` crate could look
like this:
```
# use operation_queue::OperationQueue;
let queue = OperationQueue::new(|fut| {
let _ = tokio::task::spawn_local(fut);
});
```
The queue is started by [`OperationQueue::start`], and stopped by
[`OperationQueue::stop`]. When starting the queue, the number of runners
provided as the function's argument are created and started. A runner is a
small stateful `struct` with an infinite asynchronous loop. Upon stopping,
the queue terminates and clears all current runners. Note that, once
stopped, a queue cannot be started again.
Queuing operations is done with [`OperationQueue::enqueue`]. The operation
is pushed to the back of the queue, and will be performed whenenever the
previous operations have also been performed and a runner becomes available.
# Multithreading
In order to maintain compatibility with the current Thunderbird code-base,
neither the operation queue's runner, nor the synchronization helpers in the
[`line_token`] module, can be sent between threads. This is something we
plan to address in the future.
[dyn compatibility]:
<https://doc.rust-lang.org/reference/items/traits.html#dyn-compatibility> |
2990 |
- |
| line_token.rs |
Helpers for synchronizing operations (e.g. error handling) across futures.
This module revolves around the [`Line`] struct, which is an asynchronous
flow control structure that behaves a bit like a mutex, with the exception
that consumers waiting for the [`Line`] to be released do not subsequently
lock it.
The design of a [`Line`] is inspired from the one of a [one-track railway
line](https://en.wikipedia.org/wiki/Token_(railway_signalling)). To avoid
collisions, conductors must acquire a token at the entrance to the line that
ensures they're the only one on it. If the token is being held, traffic
around this line stops until it's released again.
Similarly, in a context with multiple parallel [`Future`]s, it might be
necessary to ensure only one takes care of a given operation. For example,
if multiple requests are being performed against the same service, and one
of them hits an authentication error, it is likely the others will as well.
In this case, it is preferrable to only let one future handle the error than
let every request re-authenticate independently (in this example,
credentials are the same across requests, and multiple simultaneous
authentication attempts might cause issues with complex flows).
Each future holds a shared on a [`Line`] (e.g. wrapped in an [`Rc`] or an
[`Arc`]). Whenever a future needs to perform an operation that should only
be performed once at a time, it attempts to acquire the line's token with
[`Line::try_acquire_token`]. This function returns an enum
([`AcquireOutcome`]) describing one of two cases:
* The line's token is available and has been acquired, and the future can
start performing the operation immediately. It is granted the line's
[`Token`], which it must hold in scope for the duration of the operation,
as dropping it releases the line.
* The line's token has already been acquired by another future, in which
case the future must wait for the line to become available again. When the
line becomes available again, the future does not need to acquire another
token, as another future should have taken care of performing the
operation.
[`OperationQueue`]: crate::operation_queue::OperationQueue
[`Future`]: std::future::Future
[`Rc`]: std::rc::Rc
[`Arc`]: std::sync::Arc |
10690 |
- |
| operation_queue.rs |
This module defines the types and data structures for the operation queue.
See the crate's top-level documentation. |
14568 |
- |