pub struct Semaphore<Lock: RawMutex = Spinlock> { /* private fields */ }
Expand description

An asynchronous counting semaphore.

A semaphore is a synchronization primitive that limits the number of tasks that may run concurrently. It consists of a count of permits, which tasks may acquire in order to execute in some context. When a task acquires a permit from the semaphore, the count of permits held by the semaphore is decreased. When no permits remain in the semaphore, any task that wishes to acquire a permit must (asynchronously) wait until another task has released a permit.

The Permit type is a RAII guard representing one or more permits acquired from a Semaphore. When a Permit is dropped, the permits it represents are released back to the Semaphore, potentially allowing a waiting task to acquire them.

Fairness

This semaphore is fair: as permits become available, they are assigned to waiting tasks in the order that those tasks requested permits (first-in, first-out). This means that all tasks waiting to acquire permits will eventually be allowed to progress, and a single task cannot starve the semaphore of permits (provided that permits are eventually released). The semaphore remains fair even when a call to acquire requests more than one permit at a time.

Overriding the blocking mutex

This type uses a blocking Mutex internally to synchronize access to its wait list. By default, this is a Spinlock. To use an alternative RawMutex implementation, use the new_with_raw_mutex constructor. See the documentation on overriding mutex implementations for more details.

Note that this type currently requires that the raw mutex implement RawMutex rather than mutex_traits::ScopedRawMutex!

Examples

Using a semaphore to limit concurrency:

use maitake_sync::Semaphore;
use alloc::sync::Arc;

// Allow 4 tasks to run concurrently at a time.
let semaphore = Arc::new(Semaphore::new(4));

for _ in 0..8 {
    // Clone the `Arc` around the semaphore.
    let semaphore = semaphore.clone();
    task::spawn(async move {
        // Acquire a permit from the semaphore, returning a RAII guard that
        // releases the permit back to the semaphore when dropped.
        //
        // If all 4 permits have been acquired, the calling task will yield,
        // and it will be woken when another task releases a permit.
        let _permit = semaphore
            .acquire(1)
            .await
            .expect("semaphore will not be closed");

        // do some work...
    });
}

A semaphore may also be used to cause a task to run once all of a set of tasks have completed. If we want some task B to run only after a fixed number n of tasks A have run, we can have task B try to acquire n permits from a semaphore with 0 permits, and have each task A add one permit to the semaphore when it completes.

For example:

use maitake_sync::Semaphore;
use alloc::sync::Arc;

// How many tasks will we be waiting for the completion of?
const TASKS: usize = 4;

// Create the semaphore with 0 permits.
let semaphore = Arc::new(Semaphore::new(0));

// Spawn the "B" task that will wait for the 4 "A" tasks to complete.
task::spawn({
    let semaphore = semaphore.clone();
    async move {
        println!("Task B starting...");

        // Since the semaphore is created with 0 permits, this will
        // wait until all 4 "A" tasks have completed.
       let _permit = semaphore
            .acquire(TASKS)
            .await
            .expect("semaphore will not be closed");

        // ... do some work ...

        println!("Task B done!");
    }
});

for i in 0..TASKS {
    let semaphore = semaphore.clone();
    task::spawn(async move {
        println!("Task A {i} starting...");

        // Add a single permit to the semaphore. Once all 4 tasks have
        // completed, the semaphore will have the 4 permits required to
        // wake the "B" task.
        semaphore.add_permits(1);

        // ... do some work ...

        println!("Task A {i} done");
    });
}

Implementations§

source§

impl Semaphore

source

pub const fn new(permits: usize) -> Self

Returns a new Semaphore with permits permits available.

Panics

If permits is less than MAX_PERMITS (usize::MAX - 1).

source§

impl<Lock: RawMutex> Semaphore<Lock>

source

pub const MAX_PERMITS: usize = 18_446_744_073_709_551_614usize

The maximum number of permits a Semaphore may contain.

source

pub const fn new_with_raw_mutex(permits: usize, lock: Lock) -> Self

Returns a new Semaphore with permits permits available, using the provided RawMutex implementation.

This constructor allows a Semaphore to be constructed with any type that implements RawMutex as the underlying raw blocking mutex implementation. See the documentation on overriding mutex implementations for more details.

Panics

If permits is less than MAX_PERMITS (usize::MAX - 1).

source

pub fn available_permits(&self) -> usize

Returns the number of permits currently available in this semaphore, or 0 if the semaphore is closed.

source

pub fn acquire(&self, permits: usize) -> Acquire<'_, Lock>

Acquire permits permits from the Semaphore, waiting asynchronously if there are insufficient permits currently available.

Returns
  • Ok(Permit) with the requested number of permits, if the permits were acquired.
  • Err(Closed) if the semaphore was closed.
Cancellation

This method uses a queue to fairly distribute permits in the order they were requested. If an Acquire future is dropped before it completes, the task will lose its place in the queue.

source

pub fn add_permits(&self, permits: usize)

Add permits new permits to the semaphore.

This permanently increases the number of permits available in the semaphore. The permit count can be permanently decreased by calling acquire or try_acquire, and forgetting the returned Permit.

Panics

If adding permits permits would cause the permit count to overflow MAX_PERMITS (usize::MAX - 1).

source

pub fn try_acquire( &self, permits: usize ) -> Result<Permit<'_, Lock>, TryAcquireError>

Try to acquire permits permits from the Semaphore, without waiting for additional permits to become available.

Returns
source

pub fn close(&self)

Closes the semaphore.

This wakes all tasks currently waiting on the semaphore, and prevents new permits from being acquired.

source§

impl<Lock: RawMutex> Semaphore<Lock>

source

pub fn acquire_owned(self: &Arc<Self>, permits: usize) -> AcquireOwned<Lock>

Available on crate feature alloc only.

Acquire permits permits from the Semaphore, waiting asynchronously if there are insufficient permits currently available, and returning an OwnedPermit.

This method behaves identically to acquire, except that it requires the Semaphore to be wrapped in an Arc, and returns an OwnedPermit which clones the Arc rather than borrowing the semaphore. This allows the returned OwnedPermit to be valid for the 'static lifetime.

Returns
  • Ok(OwnedPermit) with the requested number of permits, if the permits were acquired.
  • Err(Closed) if the semaphore was closed.
Cancellation

This method uses a queue to fairly distribute permits in the order they were requested. If an AcquireOwned future is dropped before it completes, the task will lose its place in the queue.

source

pub fn try_acquire_owned( self: &Arc<Self>, permits: usize ) -> Result<OwnedPermit<Lock>, TryAcquireError>

Available on crate feature alloc only.

Try to acquire permits permits from the Semaphore, without waiting for additional permits to become available, and returning an OwnedPermit.

This method behaves identically to try_acquire, except that it requires the Semaphore to be wrapped in an Arc, and returns an OwnedPermit which clones the Arc rather than borrowing the semaphore. This allows the returned OwnedPermit to be valid for the 'static lifetime.

Returns

Trait Implementations§

source§

impl<Lock: Debug + RawMutex> Debug for Semaphore<Lock>

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<Lock = Spinlock> !RefUnwindSafe for Semaphore<Lock>

§

impl<Lock> Send for Semaphore<Lock>
where Lock: Send,

§

impl<Lock> Sync for Semaphore<Lock>
where Lock: Sync,

§

impl<Lock> Unpin for Semaphore<Lock>
where Lock: Unpin,

§

impl<Lock = Spinlock> !UnwindSafe for Semaphore<Lock>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more