Source code

Revision control

Copy as Markdown

Other Tools

use super::batch_semaphore as ll; // low level implementation
use super::{AcquireError, TryAcquireError};
#[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::util::trace;
use std::sync::Arc;
/// Counting semaphore performing asynchronous permit acquisition.
///
/// A semaphore maintains a set of permits. Permits are used to synchronize
/// access to a shared resource. A semaphore differs from a mutex in that it
/// can allow more than one concurrent caller to access the shared resource at a
/// time.
///
/// When `acquire` is called and the semaphore has remaining permits, the
/// function immediately returns a permit. However, if no remaining permits are
/// available, `acquire` (asynchronously) waits until an outstanding permit is
/// dropped. At this point, the freed permit is assigned to the caller.
///
/// This `Semaphore` is fair, which means that permits are given out in the order
/// they were requested. This fairness is also applied when `acquire_many` gets
/// involved, so if a call to `acquire_many` at the front of the queue requests
/// more permits than currently available, this can prevent a call to `acquire`
/// from completing, even if the semaphore has enough permits complete the call
/// to `acquire`.
///
/// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
/// utility.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// use tokio::sync::{Semaphore, TryAcquireError};
///
/// #[tokio::main]
/// async fn main() {
/// let semaphore = Semaphore::new(3);
///
/// let a_permit = semaphore.acquire().await.unwrap();
/// let two_permits = semaphore.acquire_many(2).await.unwrap();
///
/// assert_eq!(semaphore.available_permits(), 0);
///
/// let permit_attempt = semaphore.try_acquire();
/// assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
/// }
/// ```
///
/// ## Limit the number of simultaneously opened files in your program
///
/// Most operating systems have limits on the number of open file
/// handles. Even in systems without explicit limits, resource constraints
/// implicitly set an upper bound on the number of open files. If your
/// program attempts to open a large number of files and exceeds this
/// limit, it will result in an error.
///
/// This example uses a Semaphore with 100 permits. By acquiring a permit from
/// the Semaphore before accessing a file, you ensure that your program opens
/// no more than 100 files at a time. When trying to open the 101st
/// file, the program will wait until a permit becomes available before
/// proceeding to open another file.
/// ```
/// use std::io::Result;
/// use tokio::fs::File;
/// use tokio::sync::Semaphore;
/// use tokio::io::AsyncWriteExt;
///
/// static PERMITS: Semaphore = Semaphore::const_new(100);
///
/// async fn write_to_file(message: &[u8]) -> Result<()> {
/// let _permit = PERMITS.acquire().await.unwrap();
/// let mut buffer = File::create("example.txt").await?;
/// buffer.write_all(message).await?;
/// Ok(()) // Permit goes out of scope here, and is available again for acquisition
/// }
/// ```
///
/// ## Limit the number of outgoing requests being sent at the same time
///
/// In some scenarios, it might be required to limit the number of outgoing
/// requests being sent in parallel. This could be due to limits of a consumed
/// API or the network resources of the system the application is running on.
///
/// This example uses an `Arc<Semaphore>` with 10 permits. Each task spawned is
/// given a reference to the semaphore by cloning the `Arc<Semaphore>`. Before
/// a task sends a request, it must acquire a permit from the semaphore by
/// calling [`Semaphore::acquire`]. This ensures that at most 10 requests are
/// sent in parallel at any given time. After a task has sent a request, it
/// drops the permit to allow other tasks to send requests.
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::Semaphore;
///
/// #[tokio::main]
/// async fn main() {
/// // Define maximum number of parallel requests.
/// let semaphore = Arc::new(Semaphore::new(10));
/// // Spawn many tasks that will send requests.
/// let mut jhs = Vec::new();
/// for task_id in 0..100 {
/// let semaphore = semaphore.clone();
/// let jh = tokio::spawn(async move {
/// // Acquire permit before sending request.
/// let _permit = semaphore.acquire().await.unwrap();
/// // Send the request.
/// let response = send_request(task_id).await;
/// // Drop the permit after the request has been sent.
/// drop(_permit);
/// // Handle response.
/// // ...
///
/// response
/// });
/// jhs.push(jh);
/// }
/// // Collect responses from tasks.
/// let mut responses = Vec::new();
/// for jh in jhs {
/// let response = jh.await.unwrap();
/// responses.push(response);
/// }
/// // Process responses.
/// // ...
/// }
/// # async fn send_request(task_id: usize) {
/// # // Send request.
/// # }
/// ```
///
/// ## Limit the number of incoming requests being handled at the same time
///
/// Similar to limiting the number of simultaneously opened files, network handles
/// are a limited resource. Allowing an unbounded amount of requests to be processed
/// could result in a denial-of-service, among many other issues.
///
/// This example uses an `Arc<Semaphore>` instead of a global variable.
/// To limit the number of requests that can be processed at the time,
/// we acquire a permit for each task before spawning it. Once acquired,
/// a new task is spawned; and once finished, the permit is dropped inside
/// of the task to allow others to spawn. Permits must be acquired via
/// [`Semaphore::acquire_owned`] to be movable across the task boundary.
/// (Since our semaphore is not a global variable — if it was, then `acquire` would be enough.)
///
/// ```no_run
/// use std::sync::Arc;
/// use tokio::sync::Semaphore;
/// use tokio::net::TcpListener;
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
/// let semaphore = Arc::new(Semaphore::new(3));
/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// // Acquire permit before accepting the next socket.
/// //
/// // We use `acquire_owned` so that we can move `permit` into
/// // other tasks.
/// let permit = semaphore.clone().acquire_owned().await.unwrap();
/// let (mut socket, _) = listener.accept().await?;
///
/// tokio::spawn(async move {
/// // Do work using the socket.
/// handle_connection(&mut socket).await;
/// // Drop socket while the permit is still live.
/// drop(socket);
/// // Drop the permit, so more tasks can be created.
/// drop(permit);
/// });
/// }
/// }
/// # async fn handle_connection(_socket: &mut tokio::net::TcpStream) {
/// # // Do work
/// # }
/// ```
///
/// ## Prevent tests from running in parallel
///
/// By default, Rust runs tests in the same file in parallel. However, in some
/// cases, running two tests in parallel may lead to problems. For example, this
/// can happen when tests use the same database.
///
/// Consider the following scenario:
/// 1. `test_insert`: Inserts a key-value pair into the database, then retrieves
/// the value using the same key to verify the insertion.
/// 2. `test_update`: Inserts a key, then updates the key to a new value and
/// verifies that the value has been accurately updated.
/// 3. `test_others`: A third test that doesn't modify the database state. It
/// can run in parallel with the other tests.
///
/// In this example, `test_insert` and `test_update` need to run in sequence to
/// work, but it doesn't matter which test runs first. We can leverage a
/// semaphore with a single permit to address this challenge.
///
/// ```
/// # use tokio::sync::Mutex;
/// # use std::collections::BTreeMap;
/// # struct Database {
/// # map: Mutex<BTreeMap<String, i32>>,
/// # }
/// # impl Database {
/// # pub const fn setup() -> Database {
/// # Database {
/// # map: Mutex::const_new(BTreeMap::new()),
/// # }
/// # }
/// # pub async fn insert(&self, key: &str, value: i32) {
/// # self.map.lock().await.insert(key.to_string(), value);
/// # }
/// # pub async fn update(&self, key: &str, value: i32) {
/// # self.map.lock().await
/// # .entry(key.to_string())
/// # .and_modify(|origin| *origin = value);
/// # }
/// # pub async fn delete(&self, key: &str) {
/// # self.map.lock().await.remove(key);
/// # }
/// # pub async fn get(&self, key: &str) -> i32 {
/// # *self.map.lock().await.get(key).unwrap()
/// # }
/// # }
/// use tokio::sync::Semaphore;
///
/// // Initialize a static semaphore with only one permit, which is used to
/// // prevent test_insert and test_update from running in parallel.
/// static PERMIT: Semaphore = Semaphore::const_new(1);
///
/// // Initialize the database that will be used by the subsequent tests.
/// static DB: Database = Database::setup();
///
/// #[tokio::test]
/// # async fn fake_test_insert() {}
/// async fn test_insert() {
/// // Acquire permit before proceeding. Since the semaphore has only one permit,
/// // the test will wait if the permit is already acquired by other tests.
/// let permit = PERMIT.acquire().await.unwrap();
///
/// // Do the actual test stuff with database
///
/// // Insert a key-value pair to database
/// let (key, value) = ("name", 0);
/// DB.insert(key, value).await;
///
/// // Verify that the value has been inserted correctly.
/// assert_eq!(DB.get(key).await, value);
///
/// // Undo the insertion, so the database is empty at the end of the test.
/// DB.delete(key).await;
///
/// // Drop permit. This allows the other test to start running.
/// drop(permit);
/// }
///
/// #[tokio::test]
/// # async fn fake_test_update() {}
/// async fn test_update() {
/// // Acquire permit before proceeding. Since the semaphore has only one permit,
/// // the test will wait if the permit is already acquired by other tests.
/// let permit = PERMIT.acquire().await.unwrap();
///
/// // Do the same insert.
/// let (key, value) = ("name", 0);
/// DB.insert(key, value).await;
///
/// // Update the existing value with a new one.
/// let new_value = 1;
/// DB.update(key, new_value).await;
///
/// // Verify that the value has been updated correctly.
/// assert_eq!(DB.get(key).await, new_value);
///
/// // Undo any modificattion.
/// DB.delete(key).await;
///
/// // Drop permit. This allows the other test to start running.
/// drop(permit);
/// }
///
/// #[tokio::test]
/// # async fn fake_test_others() {}
/// async fn test_others() {
/// // This test can run in parallel with test_insert and test_update,
/// // so it does not use PERMIT.
/// }
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// # test_insert().await;
/// # test_update().await;
/// # test_others().await;
/// # }
/// ```
///
/// ## Rate limiting using a token bucket
///
/// This example showcases the [`add_permits`] and [`SemaphorePermit::forget`] methods.
///
/// Many applications and systems have constraints on the rate at which certain
/// operations should occur. Exceeding this rate can result in suboptimal
/// performance or even errors.
///
/// This example implements rate limiting using a [token bucket]. A token bucket is a form of rate
/// limiting that doesn't kick in immediately, to allow for short bursts of incoming requests that
/// arrive at the same time.
///
/// With a token bucket, each incoming request consumes a token, and the tokens are refilled at a
/// certain rate that defines the rate limit. When a burst of requests arrives, tokens are
/// immediately given out until the bucket is empty. Once the bucket is empty, requests will have to
/// wait for new tokens to be added.
///
/// Unlike the example that limits how many requests can be handled at the same time, we do not add
/// tokens back when we finish handling a request. Instead, tokens are added only by a timer task.
///
/// Note that this implementation is suboptimal when the duration is small, because it consumes a
/// lot of cpu constantly looping and sleeping.
///
/// [`add_permits`]: crate::sync::Semaphore::add_permits
/// [`SemaphorePermit::forget`]: crate::sync::SemaphorePermit::forget
/// ```
/// use std::sync::Arc;
/// use tokio::sync::Semaphore;
/// use tokio::time::{interval, Duration};
///
/// struct TokenBucket {
/// sem: Arc<Semaphore>,
/// jh: tokio::task::JoinHandle<()>,
/// }
///
/// impl TokenBucket {
/// fn new(duration: Duration, capacity: usize) -> Self {
/// let sem = Arc::new(Semaphore::new(capacity));
///
/// // refills the tokens at the end of each interval
/// let jh = tokio::spawn({
/// let sem = sem.clone();
/// let mut interval = interval(duration);
/// interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
///
/// async move {
/// loop {
/// interval.tick().await;
///
/// if sem.available_permits() < capacity {
/// sem.add_permits(1);
/// }
/// }
/// }
/// });
///
/// Self { jh, sem }
/// }
///
/// async fn acquire(&self) {
/// // This can return an error if the semaphore is closed, but we
/// // never close it, so this error can never happen.
/// let permit = self.sem.acquire().await.unwrap();
/// // To avoid releasing the permit back to the semaphore, we use
/// // the `SemaphorePermit::forget` method.
/// permit.forget();
/// }
/// }
///
/// impl Drop for TokenBucket {
/// fn drop(&mut self) {
/// // Kill the background task so it stops taking up resources when we
/// // don't need it anymore.
/// self.jh.abort();
/// }
/// }
///
/// #[tokio::main]
/// # async fn _hidden() {}
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// async fn main() {
/// let capacity = 5;
/// let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
/// let bucket = TokenBucket::new(update_interval, capacity);
///
/// for _ in 0..5 {
/// bucket.acquire().await;
///
/// // do the operation
/// }
/// }
/// ```
///
/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
#[derive(Debug)]
pub struct Semaphore {
/// The low level semaphore
ll_sem: ll::Semaphore,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
}
/// A permit from the semaphore.
///
/// This type is created by the [`acquire`] method.
///
/// [`acquire`]: crate::sync::Semaphore::acquire()
#[must_use]
#[clippy::has_significant_drop]
#[derive(Debug)]
pub struct SemaphorePermit<'a> {
sem: &'a Semaphore,
permits: u32,
}
/// An owned permit from the semaphore.
///
/// This type is created by the [`acquire_owned`] method.
///
/// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned()
#[must_use]
#[clippy::has_significant_drop]
#[derive(Debug)]
pub struct OwnedSemaphorePermit {
sem: Arc<Semaphore>,
permits: u32,
}
#[test]
#[cfg(not(loom))]
fn bounds() {
fn check_unpin<T: Unpin>() {}
// This has to take a value, since the async fn's return type is unnameable.
fn check_send_sync_val<T: Send + Sync>(_t: T) {}
fn check_send_sync<T: Send + Sync>() {}
check_unpin::<Semaphore>();
check_unpin::<SemaphorePermit<'_>>();
check_send_sync::<Semaphore>();
let semaphore = Semaphore::new(0);
check_send_sync_val(semaphore.acquire());
}
impl Semaphore {
/// The maximum number of permits which a semaphore can hold. It is `usize::MAX >> 3`.
///
/// Exceeding this limit typically results in a panic.
pub const MAX_PERMITS: usize = super::batch_semaphore::Semaphore::MAX_PERMITS;
/// Creates a new semaphore with the initial number of permits.
///
/// Panics if `permits` exceeds [`Semaphore::MAX_PERMITS`].
#[track_caller]
pub fn new(permits: usize) -> Self {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let resource_span = {
let location = std::panic::Location::caller();
tracing::trace_span!(
parent: None,
"runtime.resource",
concrete_type = "Semaphore",
kind = "Sync",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
inherits_child_attrs = true,
)
};
#[cfg(all(tokio_unstable, feature = "tracing"))]
let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits));
#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
let ll_sem = ll::Semaphore::new(permits);
Self {
ll_sem,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span,
}
}
/// Creates a new semaphore with the initial number of permits.
///
/// When using the `tracing` [unstable feature], a `Semaphore` created with
/// `const_new` will not be instrumented. As such, it will not be visible
/// in [`tokio-console`]. Instead, [`Semaphore::new`] should be used to
/// create an instrumented object if that is needed.
///
/// # Examples
///
/// ```
/// use tokio::sync::Semaphore;
///
/// static SEM: Semaphore = Semaphore::const_new(10);
/// ```
///
/// [`tokio-console`]: https://github.com/tokio-rs/console
/// [unstable feature]: crate#unstable-features
#[cfg(not(all(loom, test)))]
pub const fn const_new(permits: usize) -> Self {
Self {
ll_sem: ll::Semaphore::const_new(permits),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span::none(),
}
}
/// Creates a new closed semaphore with 0 permits.
pub(crate) fn new_closed() -> Self {
Self {
ll_sem: ll::Semaphore::new_closed(),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span::none(),
}
}
/// Creates a new closed semaphore with 0 permits.
#[cfg(not(all(loom, test)))]
pub(crate) const fn const_new_closed() -> Self {
Self {
ll_sem: ll::Semaphore::const_new_closed(),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span::none(),
}
}
/// Returns the current number of available permits.
pub fn available_permits(&self) -> usize {
self.ll_sem.available_permits()
}
/// Adds `n` new permits to the semaphore.
///
/// The maximum number of permits is [`Semaphore::MAX_PERMITS`], and this function will panic if the limit is exceeded.
pub fn add_permits(&self, n: usize) {
self.ll_sem.release(n);
}
/// Decrease a semaphore's permits by a maximum of `n`.
///
/// If there are insufficient permits and it's not possible to reduce by `n`,
/// return the number of permits that were actually reduced.
pub fn forget_permits(&self, n: usize) -> usize {
self.ll_sem.forget_permits(n)
}
/// Acquires a permit from the semaphore.
///
/// If the semaphore has been closed, this returns an [`AcquireError`].
/// Otherwise, this returns a [`SemaphorePermit`] representing the
/// acquired permit.
///
/// # Cancel safety
///
/// This method uses a queue to fairly distribute permits in the order they
/// were requested. Cancelling a call to `acquire` makes you lose your place
/// in the queue.
///
/// # Examples
///
/// ```
/// use tokio::sync::Semaphore;
///
/// #[tokio::main]
/// async fn main() {
/// let semaphore = Semaphore::new(2);
///
/// let permit_1 = semaphore.acquire().await.unwrap();
/// assert_eq!(semaphore.available_permits(), 1);
///
/// let permit_2 = semaphore.acquire().await.unwrap();
/// assert_eq!(semaphore.available_permits(), 0);
///
/// drop(permit_1);
/// assert_eq!(semaphore.available_permits(), 1);
/// }
/// ```
///
/// [`AcquireError`]: crate::sync::AcquireError
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = trace::async_op(
|| self.ll_sem.acquire(1),
self.resource_span.clone(),
"Semaphore::acquire",
"poll",
true,
);
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let inner = self.ll_sem.acquire(1);
inner.await?;
Ok(SemaphorePermit {
sem: self,
permits: 1,
})
}
/// Acquires `n` permits from the semaphore.
///
/// If the semaphore has been closed, this returns an [`AcquireError`].
/// Otherwise, this returns a [`SemaphorePermit`] representing the
/// acquired permits.
///
/// # Cancel safety
///
/// This method uses a queue to fairly distribute permits in the order they
/// were requested. Cancelling a call to `acquire_many` makes you lose your
/// place in the queue.
///
/// # Examples
///
/// ```
/// use tokio::sync::Semaphore;
///
/// #[tokio::main]
/// async fn main() {
/// let semaphore = Semaphore::new(5);
///
/// let permit = semaphore.acquire_many(3).await.unwrap();
/// assert_eq!(semaphore.available_permits(), 2);
/// }
/// ```
///
/// [`AcquireError`]: crate::sync::AcquireError
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
trace::async_op(
|| self.ll_sem.acquire(n as usize),
self.resource_span.clone(),
"Semaphore::acquire_many",
"poll",
true,
)
.await?;
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
self.ll_sem.acquire(n as usize).await?;
Ok(SemaphorePermit {
sem: self,
permits: n,
})
}
/// Tries to acquire a permit from the semaphore.
///
/// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
/// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
/// this returns a [`SemaphorePermit`] representing the acquired permits.
///
/// # Examples
///
/// ```
/// use tokio::sync::{Semaphore, TryAcquireError};
///
/// # fn main() {
/// let semaphore = Semaphore::new(2);
///
/// let permit_1 = semaphore.try_acquire().unwrap();
/// assert_eq!(semaphore.available_permits(), 1);
///
/// let permit_2 = semaphore.try_acquire().unwrap();
/// assert_eq!(semaphore.available_permits(), 0);
///
/// let permit_3 = semaphore.try_acquire();
/// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
/// # }
/// ```
///
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
match self.ll_sem.try_acquire(1) {
Ok(()) => Ok(SemaphorePermit {
sem: self,
permits: 1,
}),
Err(e) => Err(e),
}
}
/// Tries to acquire `n` permits from the semaphore.
///
/// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
/// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
/// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
///
/// # Examples
///
/// ```
/// use tokio::sync::{Semaphore, TryAcquireError};
///
/// # fn main() {
/// let semaphore = Semaphore::new(4);
///
/// let permit_1 = semaphore.try_acquire_many(3).unwrap();
/// assert_eq!(semaphore.available_permits(), 1);
///
/// let permit_2 = semaphore.try_acquire_many(2);
/// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
/// # }
/// ```
///
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
match self.ll_sem.try_acquire(n as usize) {
Ok(()) => Ok(SemaphorePermit {
sem: self,
permits: n,
}),
Err(e) => Err(e),
}
}
/// Acquires a permit from the semaphore.
///
/// The semaphore must be wrapped in an [`Arc`] to call this method.
/// If the semaphore has been closed, this returns an [`AcquireError`].
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
/// # Cancel safety
///
/// This method uses a queue to fairly distribute permits in the order they
/// were requested. Cancelling a call to `acquire_owned` makes you lose your
/// place in the queue.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::Semaphore;
///
/// #[tokio::main]
/// async fn main() {
/// let semaphore = Arc::new(Semaphore::new(3));
/// let mut join_handles = Vec::new();
///
/// for _ in 0..5 {
/// let permit = semaphore.clone().acquire_owned().await.unwrap();
/// join_handles.push(tokio::spawn(async move {
/// // perform task...
/// // explicitly own `permit` in the task
/// drop(permit);
/// }));
/// }
///
/// for handle in join_handles {
/// handle.await.unwrap();
/// }
/// }
/// ```
///
/// [`Arc`]: std::sync::Arc
/// [`AcquireError`]: crate::sync::AcquireError
/// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = trace::async_op(
|| self.ll_sem.acquire(1),
self.resource_span.clone(),
"Semaphore::acquire_owned",
"poll",
true,
);
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let inner = self.ll_sem.acquire(1);
inner.await?;
Ok(OwnedSemaphorePermit {
sem: self,
permits: 1,
})
}
/// Acquires `n` permits from the semaphore.
///
/// The semaphore must be wrapped in an [`Arc`] to call this method.
/// If the semaphore has been closed, this returns an [`AcquireError`].
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
/// # Cancel safety
///
/// This method uses a queue to fairly distribute permits in the order they
/// were requested. Cancelling a call to `acquire_many_owned` makes you lose
/// your place in the queue.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::Semaphore;
///
/// #[tokio::main]
/// async fn main() {
/// let semaphore = Arc::new(Semaphore::new(10));
/// let mut join_handles = Vec::new();
///
/// for _ in 0..5 {
/// let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
/// join_handles.push(tokio::spawn(async move {
/// // perform task...
/// // explicitly own `permit` in the task
/// drop(permit);
/// }));
/// }
///
/// for handle in join_handles {
/// handle.await.unwrap();
/// }
/// }
/// ```
///
/// [`Arc`]: std::sync::Arc
/// [`AcquireError`]: crate::sync::AcquireError
/// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
pub async fn acquire_many_owned(
self: Arc<Self>,
n: u32,
) -> Result<OwnedSemaphorePermit, AcquireError> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = trace::async_op(
|| self.ll_sem.acquire(n as usize),
self.resource_span.clone(),
"Semaphore::acquire_many_owned",
"poll",
true,
);
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let inner = self.ll_sem.acquire(n as usize);
inner.await?;
Ok(OwnedSemaphorePermit {
sem: self,
permits: n,
})
}
/// Tries to acquire a permit from the semaphore.
///
/// The semaphore must be wrapped in an [`Arc`] to call this method. If
/// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
/// and a [`TryAcquireError::NoPermits`] if there are no permits left.
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::{Semaphore, TryAcquireError};
///
/// # fn main() {
/// let semaphore = Arc::new(Semaphore::new(2));
///
/// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
/// assert_eq!(semaphore.available_permits(), 1);
///
/// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
/// assert_eq!(semaphore.available_permits(), 0);
///
/// let permit_3 = semaphore.try_acquire_owned();
/// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
/// # }
/// ```
///
/// [`Arc`]: std::sync::Arc
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
/// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> {
match self.ll_sem.try_acquire(1) {
Ok(()) => Ok(OwnedSemaphorePermit {
sem: self,
permits: 1,
}),
Err(e) => Err(e),
}
}
/// Tries to acquire `n` permits from the semaphore.
///
/// The semaphore must be wrapped in an [`Arc`] to call this method. If
/// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
/// and a [`TryAcquireError::NoPermits`] if there are no permits left.
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::{Semaphore, TryAcquireError};
///
/// # fn main() {
/// let semaphore = Arc::new(Semaphore::new(4));
///
/// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
/// assert_eq!(semaphore.available_permits(), 1);
///
/// let permit_2 = semaphore.try_acquire_many_owned(2);
/// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
/// # }
/// ```
///
/// [`Arc`]: std::sync::Arc
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
/// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
pub fn try_acquire_many_owned(
self: Arc<Self>,
n: u32,
) -> Result<OwnedSemaphorePermit, TryAcquireError> {
match self.ll_sem.try_acquire(n as usize) {
Ok(()) => Ok(OwnedSemaphorePermit {
sem: self,
permits: n,
}),
Err(e) => Err(e),
}
}
/// Closes the semaphore.
///
/// This prevents the semaphore from issuing new permits and notifies all pending waiters.
///
/// # Examples
///
/// ```
/// use tokio::sync::Semaphore;
/// use std::sync::Arc;
/// use tokio::sync::TryAcquireError;
///
/// #[tokio::main]
/// async fn main() {
/// let semaphore = Arc::new(Semaphore::new(1));
/// let semaphore2 = semaphore.clone();
///
/// tokio::spawn(async move {
/// let permit = semaphore.acquire_many(2).await;
/// assert!(permit.is_err());
/// println!("waiter received error");
/// });
///
/// println!("closing semaphore");
/// semaphore2.close();
///
/// // Cannot obtain more permits
/// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
/// }
/// ```
pub fn close(&self) {
self.ll_sem.close();
}
/// Returns true if the semaphore is closed
pub fn is_closed(&self) -> bool {
self.ll_sem.is_closed()
}
}
impl<'a> SemaphorePermit<'a> {
/// Forgets the permit **without** releasing it back to the semaphore.
/// This can be used to reduce the amount of permits available from a
/// semaphore.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::Semaphore;
///
/// let sem = Arc::new(Semaphore::new(10));
/// {
/// let permit = sem.try_acquire_many(5).unwrap();
/// assert_eq!(sem.available_permits(), 5);
/// permit.forget();
/// }
///
/// // Since we forgot the permit, available permits won't go back to its initial value
/// // even after the permit is dropped.
/// assert_eq!(sem.available_permits(), 5);
/// ```
pub fn forget(mut self) {
self.permits = 0;
}
/// Merge two [`SemaphorePermit`] instances together, consuming `other`
/// without releasing the permits it holds.
///
/// Permits held by both `self` and `other` are released when `self` drops.
///
/// # Panics
///
/// This function panics if permits from different [`Semaphore`] instances
/// are merged.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::Semaphore;
///
/// let sem = Arc::new(Semaphore::new(10));
/// let mut permit = sem.try_acquire().unwrap();
///
/// for _ in 0..9 {
/// let _permit = sem.try_acquire().unwrap();
/// // Merge individual permits into a single one.
/// permit.merge(_permit)
/// }
///
/// assert_eq!(sem.available_permits(), 0);
///
/// // Release all permits in a single batch.
/// drop(permit);
///
/// assert_eq!(sem.available_permits(), 10);
/// ```
#[track_caller]
pub fn merge(&mut self, mut other: Self) {
assert!(
std::ptr::eq(self.sem, other.sem),
"merging permits from different semaphore instances"
);
self.permits += other.permits;
other.permits = 0;
}
/// Splits `n` permits from `self` and returns a new [`SemaphorePermit`] instance that holds `n` permits.
///
/// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::Semaphore;
///
/// let sem = Arc::new(Semaphore::new(3));
///
/// let mut p1 = sem.try_acquire_many(3).unwrap();
/// let p2 = p1.split(1).unwrap();
///
/// assert_eq!(p1.num_permits(), 2);
/// assert_eq!(p2.num_permits(), 1);
/// ```
pub fn split(&mut self, n: usize) -> Option<Self> {
let n = u32::try_from(n).ok()?;
if n > self.permits {
return None;
}
self.permits -= n;
Some(Self {
sem: self.sem,
permits: n,
})
}
/// Returns the number of permits held by `self`.
pub fn num_permits(&self) -> usize {
self.permits as usize
}
}
impl OwnedSemaphorePermit {
/// Forgets the permit **without** releasing it back to the semaphore.
/// This can be used to reduce the amount of permits available from a
/// semaphore.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::Semaphore;
///
/// let sem = Arc::new(Semaphore::new(10));
/// {
/// let permit = sem.clone().try_acquire_many_owned(5).unwrap();
/// assert_eq!(sem.available_permits(), 5);
/// permit.forget();
/// }
///
/// // Since we forgot the permit, available permits won't go back to its initial value
/// // even after the permit is dropped.
/// assert_eq!(sem.available_permits(), 5);
/// ```
pub fn forget(mut self) {
self.permits = 0;
}
/// Merge two [`OwnedSemaphorePermit`] instances together, consuming `other`
/// without releasing the permits it holds.
///
/// Permits held by both `self` and `other` are released when `self` drops.
///
/// # Panics
///
/// This function panics if permits from different [`Semaphore`] instances
/// are merged.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::Semaphore;
///
/// let sem = Arc::new(Semaphore::new(10));
/// let mut permit = sem.clone().try_acquire_owned().unwrap();
///
/// for _ in 0..9 {
/// let _permit = sem.clone().try_acquire_owned().unwrap();
/// // Merge individual permits into a single one.
/// permit.merge(_permit)
/// }
///
/// assert_eq!(sem.available_permits(), 0);
///
/// // Release all permits in a single batch.
/// drop(permit);
///
/// assert_eq!(sem.available_permits(), 10);
/// ```
#[track_caller]
pub fn merge(&mut self, mut other: Self) {
assert!(
Arc::ptr_eq(&self.sem, &other.sem),
"merging permits from different semaphore instances"
);
self.permits += other.permits;
other.permits = 0;
}
/// Splits `n` permits from `self` and returns a new [`OwnedSemaphorePermit`] instance that holds `n` permits.
///
/// If there are insufficient permits and it's not possible to reduce by `n`, returns `None`.
///
/// # Note
///
/// It will clone the owned `Arc<Semaphore>` to construct the new instance.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use tokio::sync::Semaphore;
///
/// let sem = Arc::new(Semaphore::new(3));
///
/// let mut p1 = sem.try_acquire_many_owned(3).unwrap();
/// let p2 = p1.split(1).unwrap();
///
/// assert_eq!(p1.num_permits(), 2);
/// assert_eq!(p2.num_permits(), 1);
/// ```
pub fn split(&mut self, n: usize) -> Option<Self> {
let n = u32::try_from(n).ok()?;
if n > self.permits {
return None;
}
self.permits -= n;
Some(Self {
sem: self.sem.clone(),
permits: n,
})
}
/// Returns the [`Semaphore`] from which this permit was acquired.
pub fn semaphore(&self) -> &Arc<Semaphore> {
&self.sem
}
/// Returns the number of permits held by `self`.
pub fn num_permits(&self) -> usize {
self.permits as usize
}
}
impl Drop for SemaphorePermit<'_> {
fn drop(&mut self) {
self.sem.add_permits(self.permits as usize);
}
}
impl Drop for OwnedSemaphorePermit {
fn drop(&mut self) {
self.sem.add_permits(self.permits as usize);
}
}