Revision control

Copy as Markdown

Other Tools

use coreaudio_sys::*;
use std::ffi::CString;
use std::mem;
use std::os::raw::c_void;
use std::panic;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Mutex, OnceLock};
use std::thread;
use std::time::Duration;
use std::time::Instant;
/// Label passed to `Queue::new` when the shared serial queue is first created.
pub const DISPATCH_QUEUE_LABEL: &str = "org.mozilla.cubeb";
/// Returns the shared serial [`Queue`], creating it on first use.
///
/// The queue is stored in a function-local `OnceLock`, so the initializer (which labels the
/// queue with [`DISPATCH_QUEUE_LABEL`]) runs at most once and every caller receives the same
/// `&'static` reference.
pub fn get_serial_queue_singleton() -> &'static Queue {
static SERIAL_QUEUE: OnceLock<Queue> = OnceLock::new();
SERIAL_QUEUE.get_or_init(|| Queue::new(DISPATCH_QUEUE_LABEL))
/// Debug-time check related to the shared serial queue.
///
/// NOTE(review): the body is not visible in this view. Judging by the name, it presumably
/// asserts that the caller is executing on the queue returned by
/// `get_serial_queue_singleton` — confirm.
pub fn debug_assert_running_serially() {
/// Debug-time check related to the shared serial queue.
///
/// NOTE(review): the body is not visible in this view. Judging by the name, it presumably
/// asserts that the caller is NOT executing on the queue returned by
/// `get_serial_queue_singleton` — confirm.
pub fn debug_assert_not_running_serially() {
/// Runs `work` through `run_sync` on the shared serial queue and returns its result.
///
/// # Panics
///
/// Panics if `run_sync` returns `None`, because the result is unwrapped.
pub fn run_serially<F, B>(work: F) -> B
F: FnOnce() -> B,
get_serial_queue_singleton().run_sync(|| work()).unwrap()
/// Like [`run_serially`], but a panic raised by `work` is re-raised after `run_serially`
/// returns instead of unwinding inside the closure given to the queue.
///
/// `work` is wrapped in `panic::catch_unwind`, so a panic comes back as an `Err` payload;
/// that payload is then handed to `panic::resume_unwind`.
pub fn run_serially_forward_panics<F, B>(work: F) -> B
F: panic::UnwindSafe + FnOnce() -> B,
// Catch the panic inside the serially-run closure so it is returned as a value.
match run_serially(|| panic::catch_unwind(|| work())) {
Ok(res) => res,
// Continue unwinding with the original panic payload.
Err(e) => panic::resume_unwind(e),
// Queue: A wrapper around `dispatch_queue_t` that is always serial.
// ------------------------------------------------------------------------------------------------
pub struct Queue {
// Raw dispatch queue handle. The methods of `Queue` lock this mutex before using the handle.
queue: Mutex<dispatch_queue_t>,
// `true` for queues built by `new_with_target` or `clone`, `false` for the wrapper returned by
// `get_global_queue`. Consulted by `drop` and `get_should_cancel`.
owned: AtomicBool,
impl Queue {
/// Builds a `Queue` from `label` and a `target` queue; the result is marked as owned.
///
/// NOTE(review): the dispatch calls that create and configure the queue are not visible in
/// this view — confirm the exact creation and targeting sequence against the full file.
///
/// # Panics
///
/// Panics if `label` contains an interior NUL byte (`CString::new` fails) or if the
/// `target` queue's mutex is poisoned.
pub fn new_with_target(label: &str, target: &Queue) -> Self {
const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t =
// `label` must stay alive for as long as `c_string` (a pointer into it) is in use.
let label = CString::new(label).unwrap();
let c_string = label.as_ptr();
let queue = {
// Lock the target's handle for the duration of this inner scope.
let target_guard = target.queue.lock().unwrap();
Self {
queue: Mutex::new(unsafe {
owned: AtomicBool::new(true),
/// Builds a `Queue` labelled `label` whose target is the queue returned by
/// `Queue::get_global_queue`.
pub fn new(label: &str) -> Self {
Queue::new_with_target(label, &Queue::get_global_queue())
/// Wraps the handle returned by `dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)` in a `Queue`
/// that is marked as not owned.
pub fn get_global_queue() -> Self {
Self {
queue: Mutex::new(unsafe { dispatch_get_global_queue(QOS_CLASS_DEFAULT as isize, 0) }),
// The global queue is not created here, so it is flagged as not owned.
owned: AtomicBool::new(false),
/// Debug-time check that this queue is the current one.
///
/// Two definitions with this name appear back to back: this one locks the queue handle and
/// enters an `unsafe` block, the other has an empty body.
/// NOTE(review): the attributes selecting between the two and the contents of the `unsafe`
/// block are not visible in this view — presumably `cfg(debug_assertions)` gating; confirm.
pub fn debug_assert_is_current(&self) {
let guard = self.queue.lock().unwrap();
unsafe {
// No-op variant of the check above.
pub fn debug_assert_is_current(&self) {}
/// Debug-time check that this queue is not the current one.
///
/// Two definitions with this name appear back to back: this one locks the queue handle and
/// enters an `unsafe` block, the other has an empty body.
/// NOTE(review): the attributes selecting between the two and the contents of the `unsafe`
/// block are not visible in this view — presumably `cfg(debug_assertions)` gating; confirm.
pub fn debug_assert_is_not_current(&self) {
let guard = self.queue.lock().unwrap();
unsafe {
// No-op variant of the check above.
pub fn debug_assert_is_not_current(&self) {}
/// Submits `work` to this queue with `dispatch_async_f`.
///
/// The closure handed to the queue first consults the queue's cancel flag, when there is one
/// (`get_should_cancel` returns `None` for queues that are not owned).
/// NOTE(review): the branch taken when the flag is set is not visible in this view; the
/// `run_final_task` test expects tasks submitted after `run_final` to have no effect — confirm.
pub fn run_async<F>(&self, work: F)
F: Send + FnOnce(),
// The lock is held while the flag is looked up and the task is submitted.
let guard = self.queue.lock().unwrap();
let should_cancel = self.get_should_cancel(*guard);
let (closure, executor) = Self::create_closure_and_executor(|| {
// A missing flag (`None`) counts as "not cancelled".
if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) {
unsafe {
dispatch_async_f(*guard, closure, executor);
/// Schedules `work` on this queue at or after the instant `when`, using `dispatch_after_f`.
///
/// If `when` is not in the future, this is the same as calling `run_async(work)`.
/// As in `run_async`, the closure handed to the queue first consults the queue's cancel flag.
/// NOTE(review): the branch taken when the flag is set is not visible in this view.
pub fn run_after<F>(&self, when: Instant, work: F)
F: Send + FnOnce(),
let now = Instant::now();
// Deadline already reached: submit immediately.
if when <= now {
return self.run_async(work);
// NOTE(review): `as_nanos()` yields `u128`; the `as i64` cast truncates silently for delays
// longer than `i64::MAX` nanoseconds. Consider a saturating conversion.
let nanos = (when - now).as_nanos() as i64;
// Shadow `when` with a dispatch timestamp `nanos` nanoseconds after "now".
let when = unsafe { dispatch_time(DISPATCH_TIME_NOW.into(), nanos) };
let guard = self.queue.lock().unwrap();
let should_cancel = self.get_should_cancel(*guard);
let (closure, executor) = Self::create_closure_and_executor(|| {
// A missing flag (`None`) counts as "not cancelled".
if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) {
unsafe {
dispatch_after_f(when, *guard, closure, executor);
/// Runs `work` on this queue with `dispatch_sync_f` and reports its result.
///
/// The result slot starts as `None` and is set to `Some(work())` inside the closure handed to
/// the queue. The `sync_return_value` test shows `Some(value)` for a normal call and `None`
/// for a call made after `run_final`.
/// NOTE(review): the branch taken when the cancel flag is set, and the final return
/// expression, are not visible in this view.
pub fn run_sync<F, B>(&self, work: F) -> Option<B>
F: FnOnce() -> B,
// `queue` and `cex` are declared up front and assigned below.
// NOTE(review): presumably the assignments happen in an inner scope so the mutex guard is
// dropped before `dispatch_sync_f` blocks — the scope braces are not visible here; confirm.
let queue: Option<dispatch_queue_t>;
let mut res: Option<B> = None;
let cex: Option<(*mut c_void, dispatch_function_t)>;
let guard = self.queue.lock().unwrap();
queue = Some(*guard);
let should_cancel = self.get_should_cancel(*guard);
cex = Some(Self::create_closure_and_executor(|| {
// A missing flag (`None`) counts as "not cancelled".
if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) {
res = Some(work());
let (closure, executor) = cex.unwrap();
unsafe {
dispatch_sync_f(queue.unwrap(), closure, executor);
/// Runs `work` on this queue with `dispatch_sync_f`, then sets the queue's cancel flag to
/// `true` inside the same closure.
///
/// The `run_final_task` and `sync_return_value` tests expect tasks submitted afterwards to
/// have no effect (`run_sync` then returns `None`).
/// NOTE(review): the two bare message strings below presumably belong to assertions whose
/// conditions are not visible in this view (one about the global queue, one about the
/// dispatch context being present) — confirm.
///
/// # Panics
///
/// Panics with "dispatch context should be allocated!" if no cancel flag is available when
/// the closure runs.
pub fn run_final<F, B>(&self, work: F) -> Option<B>
F: FnOnce() -> B,
"Doesn't make sense to finalize global queue"
let queue: Option<dispatch_queue_t>;
let mut res: Option<B> = None;
let cex: Option<(*mut c_void, dispatch_function_t)>;
let guard = self.queue.lock().unwrap();
queue = Some(*guard);
let should_cancel = self.get_should_cancel(*guard);
"dispatch context should be allocated!"
cex = Some(Self::create_closure_and_executor(|| {
// Unlike `run_sync`, no cancel check is visible before the work: it runs, then the flag is set.
res = Some(work());
.expect("dispatch context should be allocated!")
.store(true, Ordering::SeqCst);
let (closure, executor) = cex.unwrap();
unsafe {
dispatch_sync_f(queue.unwrap(), closure, executor);
/// Looks up the cancel flag attached to `queue` as its dispatch context.
///
/// Returns `None` for queues that are not owned. Otherwise the pointer returned by
/// `dispatch_get_context` is reinterpreted as `*mut AtomicBool`.
/// NOTE(review): the conversion of that pointer into the returned `Option<&mut AtomicBool>`
/// is not visible in this view — confirm how a null context is handled.
fn get_should_cancel(&self, queue: dispatch_queue_t) -> Option<&mut AtomicBool> {
if !self.owned.load(Ordering::SeqCst) {
return None;
unsafe {
let context =
dispatch_get_context(mem::transmute::<dispatch_queue_t, dispatch_object_t>(queue))
as *mut AtomicBool;
/// Attaches `context` to the queue as its dispatch context and registers a finalizer that
/// frees it.
///
/// Ownership of the box is handed over as a raw pointer via `dispatch_set_context`; the
/// `finalizer` registered with `dispatch_set_finalizer_f` rebuilds the box from that pointer
/// and drops it.
fn set_should_cancel(&self, context: Box<AtomicBool>) {
unsafe {
let guard = self.queue.lock().unwrap();
let queue = mem::transmute::<dispatch_queue_t, dispatch_object_t>(*guard);
// Leak the context from Box.
dispatch_set_context(queue, Box::into_raw(context) as *mut c_void);
extern "C" fn finalizer(context: *mut c_void) {
// Retake the leaked context into box and then drop it.
let _ = unsafe { Box::from_raw(context as *mut AtomicBool) };
// The `finalizer` is only run if the `context` in `queue` is set by `dispatch_set_context`.
dispatch_set_finalizer_f(queue, Some(finalizer));
/// Calls `dispatch_release` on the wrapped queue handle.
///
/// The handle is copied out from under the mutex and transmuted to `dispatch_object_t` for
/// the call.
fn release(&self) {
let guard = self.queue.lock().unwrap();
let queue = *guard;
unsafe {
// This will release the inner `dispatch_queue_t` asynchronously.
// TODO: It's incredibly unsafe to call `transmute` directly.
// Find another way to release the queue.
dispatch_release(mem::transmute::<dispatch_queue_t, dispatch_object_t>(queue));
/// Packages a Rust closure for the `dispatch_*_f` family of calls.
///
/// Returns a pair of (context pointer, function pointer): the closure is boxed and leaked as
/// a raw `*mut c_void`, and the function pointer is `closure_executer::<F>`, which takes that
/// pointer back into a `Box<F>`.
/// NOTE(review): if the executor never runs, nothing visible here reclaims the leaked box.
fn create_closure_and_executor<F>(closure: F) -> (*mut c_void, dispatch_function_t)
F: FnOnce(),
// C-ABI entry point, generic over `F` so it can recover the concrete closure type.
extern "C" fn closure_executer<F>(unboxed_closure: *mut c_void)
F: FnOnce(),
// Retake the leaked closure.
let closure = unsafe { Box::from_raw(unboxed_closure as *mut F) };
// Execute the closure.
// closure is released after finishing this function call.
let closure = Box::new(closure); // Allocate closure on heap.
let executor: dispatch_function_t = Some(closure_executer::<F>);
Box::into_raw(closure) as *mut c_void, // Leak the closure.
// Drop behaviour depends on the `owned` flag: the guarded branch runs only for owned queues.
// NOTE(review): the body of that branch is not visible in this view; presumably it calls
// `release` on the wrapped handle — confirm.
impl Drop for Queue {
fn drop(&mut self) {
if self.owned.load(Ordering::SeqCst) {
// Cloning retains the underlying dispatch queue and returns a second owned wrapper around the
// same raw handle.
// NOTE(review): the message string below presumably belongs to an assertion whose condition is
// not visible in this view — confirm.
impl Clone for Queue {
fn clone(&self) -> Self {
"No need to clone a static queue"
let guard = self.queue.lock().unwrap();
let queue = *guard;
// TODO: It's incredibly unsafe to call `transmute` directly.
// Find another way to retain the queue.
unsafe {
dispatch_retain(mem::transmute::<dispatch_queue_t, dispatch_object_t>(queue));
Self {
queue: Mutex::new(queue),
// The clone is marked as owned, matching the retain above.
owned: AtomicBool::new(true),
// `Queue` holds a raw `dispatch_queue_t` behind a `Mutex`; these impls assert that it may be
// moved to and shared between threads.
// NOTE(review): soundness presumably rests on dispatch queue handles being usable from any
// thread — confirm against the libdispatch documentation.
unsafe impl Send for Queue {}
unsafe impl Sync for Queue {}
// Checks that a mix of sync and async tasks submitted to one queue is observed in submission
// order: the final assertion expects `visited` to be `[1, 2, 3, 4, 5]`.
fn run_tasks_in_order() {
let mut visited = Vec::<u32>::new();
// The Rust compiler doesn't allow a pointer to be passed across threads.
// A hacky way to do that is to cast the pointer into a value, then
// the value, which is actually an address, can be copied into threads.
let ptr = &mut visited as *mut Vec<u32> as usize;
// Helper that turns the address back into `&mut Vec<u32>`.
// NOTE(review): the rest of the helper body is not visible in this view; presumably it
// records `v` in the vector — confirm.
fn visit(v: u32, visited_ptr: usize) {
let visited = unsafe { &mut *(visited_ptr as *mut Vec<u32>) };
let queue = Queue::new("Run tasks in order");
queue.run_sync(|| visit(1, ptr));
queue.run_sync(|| visit(2, ptr));
queue.run_async(|| visit(3, ptr));
queue.run_async(|| visit(4, ptr));
// Call sync here to block the current thread and make sure all the tasks are done.
queue.run_sync(|| visit(5, ptr));
assert_eq!(visited, vec![1, 2, 3, 4, 5]);
// Checks that tasks submitted after `run_final` have no effect: tasks 4 and 5 are submitted
// after the final task, and the assertion expects only `[1, 2, 3]`.
fn run_final_task() {
let mut visited = Vec::<u32>::new();
// The Rust compiler doesn't allow a pointer to be passed across threads.
// A hacky way to do that is to cast the pointer into a value, then
// the value, which is actually an address, can be copied into threads.
let ptr = &mut visited as *mut Vec<u32> as usize;
// Helper that turns the address back into `&mut Vec<u32>`.
// NOTE(review): the rest of the helper body is not visible in this view; presumably it
// records `v` in the vector — confirm.
fn visit(v: u32, visited_ptr: usize) {
let visited = unsafe { &mut *(visited_ptr as *mut Vec<u32>) };
let queue = Queue::new("Task after run_final will be cancelled");
queue.run_sync(|| visit(1, ptr));
queue.run_async(|| visit(2, ptr));
queue.run_final(|| visit(3, ptr));
// Submitted after the final task: expected to leave no trace.
queue.run_async(|| visit(4, ptr));
queue.run_sync(|| visit(5, ptr));
// `queue` will be dropped asynchronously and then the `finalizer` of the `queue`
// should be fired to clean up the `context` set in the `queue`.
assert_eq!(visited, vec![1, 2, 3]);
// Checks the values reported by the synchronous entry points: `run_sync` and `run_final` wrap
// the closure's result in `Some`, and a `run_sync` issued after `run_final` yields `None`.
fn sync_return_value() {
let q = Queue::new("Test queue");
assert_eq!(q.run_sync(|| 42), Some(42));
assert_eq!(q.run_final(|| "foo"), Some("foo"));
// The queue has been finalized above, so this task is expected not to produce a value.
assert_eq!(q.run_sync(|| Ok::<(), u32>(())), None);
// Checks delayed submission: the assertion expects `[1, 4, 2, 5]`, i.e. the task delayed by
// 10 ms is observed after the immediate tasks, and the task delayed by 1 s is not observed at
// all once `run_final` has run.
// NOTE(review): any waiting between the submissions and `run_final` is not visible in this
// view — confirm how the test lets the 10 ms task run first.
fn run_after() {
let mut visited = Vec::<u32>::new();
// The Rust compiler doesn't allow a pointer to be passed across threads.
// A hacky way to do that is to cast the pointer into a value, then
// the value, which is actually an address, can be copied into threads.
let ptr = &mut visited as *mut Vec<u32> as usize;
// Helper that turns the address back into `&mut Vec<u32>`.
// NOTE(review): the rest of the helper body is not visible in this view; presumably it
// records `v` in the vector — confirm.
fn visit(v: u32, visited_ptr: usize) {
let visited = unsafe { &mut *(visited_ptr as *mut Vec<u32>) };
let queue = Queue::new("Task after run_final will be cancelled");
queue.run_async(|| visit(1, ptr));
queue.run_after(Instant::now() + Duration::from_millis(10), || visit(2, ptr));
queue.run_after(Instant::now() + Duration::from_secs(1), || visit(3, ptr));
queue.run_async(|| visit(4, ptr));
queue.run_final(|| visit(5, ptr));
// `queue` will be dropped asynchronously and then the `finalizer` of the `queue`
// should be fired to clean up the `context` set in the `queue`.
assert_eq!(visited, vec![1, 4, 2, 5]);