Revision control
Copy as Markdown
Other Tools
// Copyright © 2018 Mozilla Foundation
//
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details.
#![allow(unused_assignments)]
#![allow(unused_must_use)]
extern crate coreaudio_sys_utils;
extern crate libc;
extern crate ringbuf;
mod aggregate_device;
mod auto_release;
mod buffer_manager;
mod device_property;
mod mixer;
mod resampler;
mod utils;
use self::aggregate_device::*;
use self::auto_release::*;
use self::buffer_manager::*;
use self::coreaudio_sys_utils::aggregate_device::*;
use self::coreaudio_sys_utils::audio_device_extensions::*;
use self::coreaudio_sys_utils::audio_object::*;
use self::coreaudio_sys_utils::audio_unit::*;
use self::coreaudio_sys_utils::cf_mutable_dict::*;
use self::coreaudio_sys_utils::dispatch::*;
use self::coreaudio_sys_utils::string::*;
use self::coreaudio_sys_utils::sys::*;
use self::device_property::*;
use self::mixer::*;
use self::resampler::*;
use self::utils::*;
use backend::ringbuf::RingBuffer;
#[cfg(feature = "audio-dump")]
use cubeb_backend::ffi::cubeb_audio_dump_stream_t;
use cubeb_backend::{
ffi, ChannelLayout, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceRef, DeviceType,
Error, InputProcessingParams, Ops, Result, SampleFormat, State, Stream, StreamOps,
StreamParams, StreamParamsRef, StreamPrefs,
};
use mach::mach_time::{mach_absolute_time, mach_timebase_info};
use std::cmp;
use std::ffi::{CStr, CString};
use std::fmt;
use std::mem;
use std::os::raw::{c_uint, c_void};
use std::ptr;
use std::slice;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, MutexGuard, Weak};
use std::time::{Duration, Instant};
const NO_ERR: OSStatus = 0;
const AU_OUT_BUS: AudioUnitElement = 0;
const AU_IN_BUS: AudioUnitElement = 1;
const PRIVATE_AGGREGATE_DEVICE_NAME: &str = "CubebAggregateDevice";
const VOICEPROCESSING_AGGREGATE_DEVICE_NAME: &str = "VPAUAggregateAudioDevice";
const APPLE_STUDIO_DISPLAY_USB_ID: &str = "05AC:1114";
// Testing empirically, some headsets report a minimal latency that is very low,
// but this does not work in practice. Lie and say the minimum is 128 frames.
const SAFE_MIN_LATENCY_FRAMES: u32 = 128;
const SAFE_MAX_LATENCY_FRAMES: u32 = 512;
const VPIO_IDLE_TIMEOUT: Duration = Duration::from_secs(10);
const MACOS_KERNEL_MAJOR_VERSION_MONTEREY: u32 = 21;
#[derive(Debug, PartialEq)]
enum ParseMacOSKernelVersionError {
SysCtl,
Malformed,
Parsing,
}
fn macos_kernel_major_version() -> std::result::Result<u32, ParseMacOSKernelVersionError> {
let ver = whatsys::kernel_version();
if ver.is_none() {
return Err(ParseMacOSKernelVersionError::SysCtl);
}
let ver = ver.unwrap();
let major = ver.split('.').next();
if major.is_none() {
return Err(ParseMacOSKernelVersionError::Malformed);
}
let parsed_major = u32::from_str(major.unwrap());
if parsed_major.is_err() {
return Err(ParseMacOSKernelVersionError::Parsing);
}
Ok(parsed_major.unwrap())
}
bitflags! {
#[allow(non_camel_case_types)]
#[derive(Clone, Debug, PartialEq, Copy)]
struct device_flags: u32 {
const DEV_UNKNOWN = 0b0000_0000; // Unknown
const DEV_INPUT = 0b0000_0001; // Record device like mic
const DEV_OUTPUT = 0b0000_0010; // Playback device like speakers
const DEV_SELECTED_DEFAULT = 0b0000_0100; // User selected to use the system default device
}
}
#[cfg(feature = "audio-dump")]
fn dump_audio(stream: cubeb_audio_dump_stream_t, audio_samples: *mut c_void, count: u32) {
unsafe {
let rv = ffi::cubeb_audio_dump_write(stream, audio_samples, count);
if rv != 0 {
cubeb_alog!("Error dumping audio data");
}
}
}
fn make_sized_audio_channel_layout(sz: usize) -> AutoRelease<AudioChannelLayout> {
assert!(sz >= mem::size_of::<AudioChannelLayout>());
assert_eq!(
(sz - mem::size_of::<AudioChannelLayout>()) % mem::size_of::<AudioChannelDescription>(),
0
);
let acl = unsafe { libc::calloc(1, sz) } as *mut AudioChannelLayout;
unsafe extern "C" fn free_acl(acl: *mut AudioChannelLayout) {
libc::free(acl as *mut libc::c_void);
}
AutoRelease::new(acl, free_acl)
}
#[allow(non_camel_case_types)]
#[derive(Clone, Debug)]
struct device_info {
id: AudioDeviceID,
flags: device_flags,
}
impl Default for device_info {
fn default() -> Self {
Self {
id: kAudioObjectUnknown,
flags: device_flags::DEV_UNKNOWN,
}
}
}
#[allow(non_camel_case_types)]
#[derive(Debug)]
struct device_property_listener {
device: AudioDeviceID,
property: AudioObjectPropertyAddress,
listener: audio_object_property_listener_proc,
}
impl device_property_listener {
fn new(
device: AudioDeviceID,
property: AudioObjectPropertyAddress,
listener: audio_object_property_listener_proc,
) -> Self {
Self {
device,
property,
listener,
}
}
}
#[derive(Debug, PartialEq)]
struct CAChannelLabel(AudioChannelLabel);
impl From<CAChannelLabel> for mixer::Channel {
fn from(label: CAChannelLabel) -> mixer::Channel {
use self::coreaudio_sys_utils::sys;
match label.0 {
sys::kAudioChannelLabel_Left => mixer::Channel::FrontLeft,
sys::kAudioChannelLabel_Right => mixer::Channel::FrontRight,
sys::kAudioChannelLabel_Center | sys::kAudioChannelLabel_Mono => {
mixer::Channel::FrontCenter
}
sys::kAudioChannelLabel_LFEScreen => mixer::Channel::LowFrequency,
sys::kAudioChannelLabel_LeftSurround => mixer::Channel::BackLeft,
sys::kAudioChannelLabel_RightSurround => mixer::Channel::BackRight,
sys::kAudioChannelLabel_LeftCenter => mixer::Channel::FrontLeftOfCenter,
sys::kAudioChannelLabel_RightCenter => mixer::Channel::FrontRightOfCenter,
sys::kAudioChannelLabel_CenterSurround => mixer::Channel::BackCenter,
sys::kAudioChannelLabel_LeftSurroundDirect => mixer::Channel::SideLeft,
sys::kAudioChannelLabel_RightSurroundDirect => mixer::Channel::SideRight,
sys::kAudioChannelLabel_TopCenterSurround => mixer::Channel::TopCenter,
sys::kAudioChannelLabel_VerticalHeightLeft => mixer::Channel::TopFrontLeft,
sys::kAudioChannelLabel_VerticalHeightCenter => mixer::Channel::TopFrontCenter,
sys::kAudioChannelLabel_VerticalHeightRight => mixer::Channel::TopFrontRight,
sys::kAudioChannelLabel_TopBackLeft => mixer::Channel::TopBackLeft,
sys::kAudioChannelLabel_TopBackCenter => mixer::Channel::TopBackCenter,
sys::kAudioChannelLabel_TopBackRight => mixer::Channel::TopBackRight,
sys::kAudioChannelLabel_Unknown => mixer::Channel::Discrete,
sys::kAudioChannelLabel_Unused => mixer::Channel::Silence,
v => {
eprintln!("Warning: channel label value {} isn't handled", v);
mixer::Channel::Silence
}
}
}
}
fn set_notification_runloop() {
let address = AudioObjectPropertyAddress {
mSelector: kAudioHardwarePropertyRunLoop,
mScope: kAudioObjectPropertyScopeGlobal,
mElement: kAudioObjectPropertyElementMaster,
};
// Ask HAL to manage its own thread for notification by setting the run_loop to NULL.
// Otherwise HAL may use main thread to fire notifications.
let run_loop: CFRunLoopRef = ptr::null_mut();
let size = mem::size_of::<CFRunLoopRef>();
let status =
audio_object_set_property_data(kAudioObjectSystemObject, &address, size, &run_loop);
if status != NO_ERR {
cubeb_log!("Could not make global CoreAudio notifications use their own thread.");
}
}
fn create_device_info(devid: AudioDeviceID, devtype: DeviceType) -> Option<device_info> {
assert_ne!(devid, kAudioObjectSystemObject);
debug_assert_running_serially();
let mut flags = match devtype {
DeviceType::INPUT => device_flags::DEV_INPUT,
DeviceType::OUTPUT => device_flags::DEV_OUTPUT,
_ => panic!("Only accept input or output type"),
};
if devid == kAudioObjectUnknown {
cubeb_log!("Using the system default device");
flags |= device_flags::DEV_SELECTED_DEFAULT;
get_default_device(devtype).map(|id| device_info { id, flags })
} else {
Some(device_info { id: devid, flags })
}
}
fn create_stream_description(stream_params: &StreamParams) -> Result<AudioStreamBasicDescription> {
assert!(stream_params.rate() > 0);
assert!(stream_params.channels() > 0);
let mut desc = AudioStreamBasicDescription::default();
match stream_params.format() {
SampleFormat::S16LE => {
desc.mBitsPerChannel = 16;
desc.mFormatFlags = kAudioFormatFlagIsSignedInteger;
}
SampleFormat::S16BE => {
desc.mBitsPerChannel = 16;
desc.mFormatFlags = kAudioFormatFlagIsSignedInteger | kAudioFormatFlagIsBigEndian;
}
SampleFormat::Float32LE => {
desc.mBitsPerChannel = 32;
desc.mFormatFlags = kAudioFormatFlagIsFloat;
}
SampleFormat::Float32BE => {
desc.mBitsPerChannel = 32;
desc.mFormatFlags = kAudioFormatFlagIsFloat | kAudioFormatFlagIsBigEndian;
}
_ => {
return Err(Error::invalid_format());
}
}
desc.mFormatID = kAudioFormatLinearPCM;
desc.mFormatFlags |= kLinearPCMFormatFlagIsPacked;
desc.mSampleRate = f64::from(stream_params.rate());
desc.mChannelsPerFrame = stream_params.channels();
desc.mBytesPerFrame = (desc.mBitsPerChannel / 8) * desc.mChannelsPerFrame;
desc.mFramesPerPacket = 1;
desc.mBytesPerPacket = desc.mBytesPerFrame * desc.mFramesPerPacket;
desc.mReserved = 0;
Ok(desc)
}
fn set_volume(unit: AudioUnit, volume: f32) -> Result<()> {
assert!(!unit.is_null());
let r = audio_unit_set_parameter(
unit,
kHALOutputParam_Volume,
kAudioUnitScope_Global,
0,
volume,
0,
);
if r == NO_ERR {
Ok(())
} else {
cubeb_log!("AudioUnitSetParameter/kHALOutputParam_Volume rv={}", r);
Err(Error::error())
}
}
fn get_volume(unit: AudioUnit) -> Result<f32> {
assert!(!unit.is_null());
let mut volume: f32 = 0.0;
let r = audio_unit_get_parameter(
unit,
kHALOutputParam_Volume,
kAudioUnitScope_Global,
0,
&mut volume,
);
if r == NO_ERR {
Ok(volume)
} else {
cubeb_log!("AudioUnitGetParameter/kHALOutputParam_Volume rv={}", r);
Err(Error::error())
}
}
fn set_input_mute(unit: AudioUnit, mute: bool) -> Result<()> {
assert!(!unit.is_null());
let mute: u32 = mute.into();
let mut old_mute: u32 = 0;
let r = audio_unit_get_property(
unit,
kAUVoiceIOProperty_MuteOutput,
kAudioUnitScope_Global,
AU_IN_BUS,
&mut old_mute,
&mut mem::size_of::<u32>(),
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitGetProperty/kAUVoiceIOProperty_MuteOutput rv={}",
r
);
return Err(Error::error());
}
if old_mute == mute {
return Ok(());
}
let r = audio_unit_set_property(
unit,
kAUVoiceIOProperty_MuteOutput,
kAudioUnitScope_Global,
AU_IN_BUS,
&mute,
mem::size_of::<u32>(),
);
if r == NO_ERR {
Ok(())
} else {
cubeb_log!(
"AudioUnitSetProperty/kAUVoiceIOProperty_MuteOutput rv={}",
r
);
Err(Error::error())
}
}
fn set_input_processing_params(unit: AudioUnit, params: InputProcessingParams) -> Result<()> {
assert!(!unit.is_null());
let aec = params.contains(InputProcessingParams::ECHO_CANCELLATION);
let ns = params.contains(InputProcessingParams::NOISE_SUPPRESSION);
let agc = params.contains(InputProcessingParams::AUTOMATIC_GAIN_CONTROL);
assert_eq!(aec, ns);
let mut old_agc: u32 = 0;
let r = audio_unit_get_property(
unit,
kAUVoiceIOProperty_VoiceProcessingEnableAGC,
kAudioUnitScope_Global,
AU_IN_BUS,
&mut old_agc,
&mut mem::size_of::<u32>(),
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitGetProperty/kAUVoiceIOProperty_VoiceProcessingEnableAGC rv={}",
r
);
return Err(Error::error());
}
if (old_agc == 1) != agc {
let agc = u32::from(agc);
let r = audio_unit_set_property(
unit,
kAUVoiceIOProperty_VoiceProcessingEnableAGC,
kAudioUnitScope_Global,
AU_IN_BUS,
&agc,
mem::size_of::<u32>(),
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitSetProperty/kAUVoiceIOProperty_VoiceProcessingEnableAGC rv={}",
r
);
return Err(Error::error());
}
cubeb_log!(
"set_input_processing_params on unit {:p} - set agc: {}",
unit,
agc
);
}
let mut old_bypass: u32 = 0;
let r = audio_unit_get_property(
unit,
kAUVoiceIOProperty_BypassVoiceProcessing,
kAudioUnitScope_Global,
AU_IN_BUS,
&mut old_bypass,
&mut mem::size_of::<u32>(),
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitGetProperty/kAUVoiceIOProperty_BypassVoiceProcessing rv={}",
r
);
return Err(Error::error());
}
let bypass = u32::from(!aec);
if old_bypass != bypass {
let r = audio_unit_set_property(
unit,
kAUVoiceIOProperty_BypassVoiceProcessing,
kAudioUnitScope_Global,
AU_IN_BUS,
&bypass,
mem::size_of::<u32>(),
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitSetProperty/kAUVoiceIOProperty_BypassVoiceProcessing rv={}",
r
);
return Err(Error::error());
}
cubeb_log!(
"set_input_processing_params on unit {:p} - set bypass: {}",
unit,
bypass
);
}
Ok(())
}
fn minimum_resampling_input_frames(
input_rate: f64,
output_rate: f64,
output_frames: usize,
) -> usize {
assert!(!approx_eq!(f64, input_rate, 0_f64));
assert!(!approx_eq!(f64, output_rate, 0_f64));
if approx_eq!(f64, input_rate, output_rate) {
return output_frames;
}
(input_rate * output_frames as f64 / output_rate).ceil() as usize
}
fn audiounit_make_silent(io_data: &AudioBuffer) {
assert!(!io_data.mData.is_null());
let bytes = unsafe {
let ptr = io_data.mData as *mut u8;
let len = io_data.mDataByteSize as usize;
slice::from_raw_parts_mut(ptr, len)
};
for data in bytes.iter_mut() {
*data = 0;
}
}
extern "C" fn audiounit_input_callback(
user_ptr: *mut c_void,
flags: *mut AudioUnitRenderActionFlags,
tstamp: *const AudioTimeStamp,
bus: u32,
input_frames: u32,
_: *mut AudioBufferList,
) -> OSStatus {
enum ErrorHandle {
Return(OSStatus),
Reinit,
}
assert!(input_frames > 0);
assert_eq!(bus, AU_IN_BUS);
assert!(!user_ptr.is_null());
let stm = unsafe { &mut *(user_ptr as *mut AudioUnitStream) };
if unsafe { *flags | kAudioTimeStampHostTimeValid } != 0 {
let now = unsafe { mach_absolute_time() };
let input_latency_frames = compute_input_latency(stm, unsafe { (*tstamp).mHostTime }, now);
stm.total_input_latency_frames
.store(input_latency_frames, Ordering::SeqCst);
}
if stm.stopped.load(Ordering::SeqCst) {
cubeb_log!("({:p}) input stopped", stm as *const AudioUnitStream);
return NO_ERR;
}
let handler = |stm: &mut AudioUnitStream,
flags: *mut AudioUnitRenderActionFlags,
tstamp: *const AudioTimeStamp,
bus: u32,
input_frames: u32|
-> ErrorHandle {
let input_buffer_manager = stm.core_stream_data.input_buffer_manager.as_mut().unwrap();
assert_eq!(
stm.core_stream_data.stm_ptr,
user_ptr as *const AudioUnitStream
);
// `flags` and `tstamp` must be non-null so they can be casted into the references.
assert!(!flags.is_null());
let flags = unsafe { &mut (*flags) };
assert!(!tstamp.is_null());
let tstamp = unsafe { &(*tstamp) };
// Create the AudioBufferList to store input.
let mut input_buffer_list = AudioBufferList::default();
input_buffer_list.mBuffers[0].mDataByteSize =
stm.core_stream_data.input_dev_desc.mBytesPerFrame * input_frames;
input_buffer_list.mBuffers[0].mData = ptr::null_mut();
input_buffer_list.mBuffers[0].mNumberChannels =
stm.core_stream_data.input_dev_desc.mChannelsPerFrame;
input_buffer_list.mNumberBuffers = 1;
debug_assert!(!stm.core_stream_data.input_unit.is_null());
let status = audio_unit_render(
stm.core_stream_data.input_unit,
flags,
tstamp,
bus,
input_frames,
&mut input_buffer_list,
);
if (status != NO_ERR)
&& (status != kAudioUnitErr_CannotDoInCurrentContext
|| stm.core_stream_data.output_unit.is_null())
{
return ErrorHandle::Return(status);
}
let handle = if status == kAudioUnitErr_CannotDoInCurrentContext {
assert!(!stm.core_stream_data.output_unit.is_null());
// kAudioUnitErr_CannotDoInCurrentContext is returned when using a BT
// headset and the profile is changed from A2DP to HFP/HSP. The previous
// output device is no longer valid and must be reset.
// For now state that no error occurred and feed silence, stream will be
// resumed once reinit has completed.
ErrorHandle::Reinit
} else {
assert_eq!(status, NO_ERR);
#[cfg(feature = "audio-dump")]
{
dump_audio(
stm.core_stream_data.audio_dump_input,
input_buffer_list.mBuffers[0].mData,
input_frames * stm.core_stream_data.input_dev_desc.mChannelsPerFrame,
);
}
input_buffer_manager
.push_data(input_buffer_list.mBuffers[0].mData, input_frames as usize);
ErrorHandle::Return(status)
};
// Full Duplex. We'll call data_callback in the AudioUnit output callback. Record this
// callback for logging.
if !stm.core_stream_data.output_unit.is_null() {
let input_callback_data = InputCallbackData {
bytes: input_buffer_list.mBuffers[0].mDataByteSize,
rendered_frames: input_frames,
total_available: input_buffer_manager.available_frames(),
channels: input_buffer_list.mBuffers[0].mNumberChannels,
num_buf: input_buffer_list.mNumberBuffers,
};
stm.core_stream_data
.input_logging
.as_mut()
.unwrap()
.push(input_callback_data);
return handle;
}
cubeb_alogv!(
"({:p}) input: buffers {}, size {}, channels {}, rendered frames {}, total frames {}.",
stm.core_stream_data.stm_ptr,
input_buffer_list.mNumberBuffers,
input_buffer_list.mBuffers[0].mDataByteSize,
input_buffer_list.mBuffers[0].mNumberChannels,
input_frames,
input_buffer_manager.available_frames()
);
// Input only. Call the user callback through resampler.
// Resampler will deliver input buffer in the correct rate.
assert!(input_frames as usize <= input_buffer_manager.available_frames());
stm.frames_read.fetch_add(
input_buffer_manager.available_frames(),
atomic::Ordering::SeqCst,
);
let mut total_input_frames = input_buffer_manager.available_frames() as i64;
let input_buffer =
input_buffer_manager.get_linear_data(input_buffer_manager.available_frames());
let outframes = stm.core_stream_data.resampler.fill(
input_buffer,
&mut total_input_frames,
ptr::null_mut(),
0,
);
if outframes < 0 {
if !stm.stopped.swap(true, Ordering::SeqCst) {
stm.notify_state_changed(State::Error);
// Use a new thread, through the queue, to avoid deadlock when calling
// AudioOutputUnitStop method from inside render callback
stm.queue.clone().run_async(move || {
stm.core_stream_data.stop_audiounits();
});
}
return ErrorHandle::Return(status);
}
if outframes < total_input_frames {
stm.draining.store(true, Ordering::SeqCst);
}
handle
};
// If the stream is drained, do nothing.
let handle = if !stm.draining.load(Ordering::SeqCst) {
handler(stm, flags, tstamp, bus, input_frames)
} else {
ErrorHandle::Return(NO_ERR)
};
// If the input (input-only stream) is drained, cancel this callback. Whenever an output
// is involved, the output callback handles stopping all units and notifying of state.
if stm.core_stream_data.output_unit.is_null()
&& stm.draining.load(Ordering::SeqCst)
&& !stm.stopped.swap(true, Ordering::SeqCst)
{
cubeb_alog!("({:p}) Input-only drained.", stm as *const AudioUnitStream);
stm.notify_state_changed(State::Drained);
// Use a new thread, through the queue, to avoid deadlock when calling
// AudioOutputUnitStop method from inside render callback
let stm_ptr = user_ptr as usize;
stm.queue.clone().run_async(move || {
let stm = unsafe { &mut *(stm_ptr as *mut AudioUnitStream) };
stm.core_stream_data.stop_audiounits();
});
}
match handle {
ErrorHandle::Reinit => {
stm.reinit_async();
NO_ERR
}
ErrorHandle::Return(s) => s,
}
}
fn host_time_to_ns(ctx: &AudioUnitContext, host_time: u64) -> u64 {
let mut rv: f64 = host_time as f64;
rv *= ctx.host_time_to_ns_ratio.0 as f64;
rv /= ctx.host_time_to_ns_ratio.1 as f64;
rv as u64
}
fn compute_output_latency(stm: &AudioUnitStream, audio_output_time: u64, now: u64) -> u32 {
const NS2S: u64 = 1_000_000_000;
let output_hw_rate = stm.core_stream_data.output_dev_desc.mSampleRate as u64;
let fixed_latency_ns =
(stm.output_device_latency_frames.load(Ordering::SeqCst) as u64 * NS2S) / output_hw_rate;
// The total output latency is the timestamp difference + the stream latency + the hardware
// latency.
let total_output_latency_ns =
fixed_latency_ns + host_time_to_ns(stm.context, audio_output_time.saturating_sub(now));
(total_output_latency_ns * output_hw_rate / NS2S) as u32
}
fn compute_input_latency(stm: &AudioUnitStream, audio_input_time: u64, now: u64) -> u32 {
const NS2S: u64 = 1_000_000_000;
let input_hw_rate = stm.core_stream_data.input_dev_desc.mSampleRate as u64;
let fixed_latency_ns =
(stm.input_device_latency_frames.load(Ordering::SeqCst) as u64 * NS2S) / input_hw_rate;
// The total input latency is the timestamp difference + the stream latency +
// the hardware latency.
let total_input_latency_ns =
host_time_to_ns(stm.context, now.saturating_sub(audio_input_time)) + fixed_latency_ns;
(total_input_latency_ns * input_hw_rate / NS2S) as u32
}
extern "C" fn audiounit_output_callback(
user_ptr: *mut c_void,
flags: *mut AudioUnitRenderActionFlags,
tstamp: *const AudioTimeStamp,
bus: u32,
output_frames: u32,
out_buffer_list: *mut AudioBufferList,
) -> OSStatus {
assert_eq!(bus, AU_OUT_BUS);
assert!(!out_buffer_list.is_null());
assert!(!user_ptr.is_null());
let stm = unsafe { &mut *(user_ptr as *mut AudioUnitStream) };
if output_frames == 0 {
cubeb_alog!(
"({:p}) output callback empty.",
stm as *const AudioUnitStream
);
return NO_ERR;
}
let out_buffer_list_ref = unsafe { &mut (*out_buffer_list) };
assert_eq!(out_buffer_list_ref.mNumberBuffers, 1);
let buffers = unsafe {
let ptr = out_buffer_list_ref.mBuffers.as_mut_ptr();
let len = out_buffer_list_ref.mNumberBuffers as usize;
slice::from_raw_parts_mut(ptr, len)
};
if stm.stopped.load(Ordering::SeqCst) {
cubeb_alog!("({:p}) output stopped.", stm as *const AudioUnitStream);
audiounit_make_silent(&buffers[0]);
#[cfg(feature = "audio-dump")]
{
dump_audio(
stm.core_stream_data.audio_dump_output,
buffers[0].mData,
output_frames * stm.core_stream_data.output_dev_desc.mChannelsPerFrame,
);
}
return NO_ERR;
}
if stm.draining.load(Ordering::SeqCst) {
// Cancel all callbacks. For input-only streams, the input callback handles
// cancelling itself.
audiounit_make_silent(&buffers[0]);
#[cfg(feature = "audio-dump")]
{
dump_audio(
stm.core_stream_data.audio_dump_output,
buffers[0].mData,
output_frames * stm.core_stream_data.output_dev_desc.mChannelsPerFrame,
);
}
if !stm.stopped.swap(true, Ordering::SeqCst) {
cubeb_alog!("({:p}) output drained.", stm as *const AudioUnitStream);
stm.notify_state_changed(State::Drained);
// Use a new thread, through the queue, to avoid deadlock when calling
// AudioOutputUnitStop method from inside render callback
stm.queue.clone().run_async(move || {
stm.core_stream_data.stop_audiounits();
});
}
return NO_ERR;
}
let now = unsafe { mach_absolute_time() };
if unsafe { *flags | kAudioTimeStampHostTimeValid } != 0 {
let output_latency_frames =
compute_output_latency(stm, unsafe { (*tstamp).mHostTime }, now);
stm.total_output_latency_frames
.store(output_latency_frames, Ordering::SeqCst);
}
// Get output buffer
let output_buffer = match stm.core_stream_data.mixer.as_mut() {
None => buffers[0].mData,
Some(mixer) => {
// If remixing needs to occur, we can't directly work in our final
// destination buffer as data may be overwritten or too small to start with.
mixer.update_buffer_size(output_frames as usize);
mixer.get_buffer_mut_ptr() as *mut c_void
}
};
let prev_frames_written = stm.frames_written.load(Ordering::SeqCst);
stm.frames_written
.fetch_add(output_frames as usize, Ordering::SeqCst);
// Also get the input buffer if the stream is duplex
let (input_buffer, mut input_frames) = if !stm.core_stream_data.input_unit.is_null() {
let input_logging = &mut stm.core_stream_data.input_logging.as_mut().unwrap();
if input_logging.is_empty() {
cubeb_alogv!("no audio input data in output callback");
} else {
while let Some(input_callback_data) = input_logging.pop() {
cubeb_alogv!(
"input: buffers {}, size {}, channels {}, rendered frames {}, total frames {}.",
input_callback_data.num_buf,
input_callback_data.bytes,
input_callback_data.channels,
input_callback_data.rendered_frames,
input_callback_data.total_available
);
}
}
let input_buffer_manager = stm.core_stream_data.input_buffer_manager.as_mut().unwrap();
assert_ne!(stm.core_stream_data.input_dev_desc.mChannelsPerFrame, 0);
// If the output callback came first and this is a duplex stream, we need to
// fill in some additional silence in the resampler.
// Otherwise, if we had more than expected callbacks in a row, or we're
// currently switching, we add some silence as well to compensate for the
// fact that we're lacking some input data.
let input_frames_needed = minimum_resampling_input_frames(
stm.core_stream_data.input_dev_desc.mSampleRate,
f64::from(stm.core_stream_data.output_stream_params.rate()),
output_frames as usize,
);
let buffered_input_frames = input_buffer_manager.available_frames();
// Else if the input has buffered a lot already because the output started late, we
// need to trim the input buffer
if prev_frames_written == 0 && buffered_input_frames > input_frames_needed {
input_buffer_manager.trim(input_frames_needed);
let popped_frames = buffered_input_frames - input_frames_needed;
cubeb_alog!("Dropping {} frames in input buffer.", popped_frames);
}
let input_frames = if input_frames_needed > buffered_input_frames
&& (stm.switching_device.load(Ordering::SeqCst)
|| stm.reinit_pending.load(Ordering::SeqCst)
|| stm.frames_read.load(Ordering::SeqCst) == 0)
{
// The silent frames will be inserted in `get_linear_data` below.
let silent_frames_to_push = input_frames_needed - buffered_input_frames;
cubeb_alog!(
"({:p}) Missing Frames: {} will append {} frames of input silence.",
stm.core_stream_data.stm_ptr,
if stm.frames_read.load(Ordering::SeqCst) == 0 {
"input hasn't started,"
} else if stm.switching_device.load(Ordering::SeqCst) {
"device switching,"
} else {
"reinit pending,"
},
silent_frames_to_push
);
input_frames_needed
} else {
buffered_input_frames
};
stm.frames_read.fetch_add(input_frames, Ordering::SeqCst);
(
input_buffer_manager.get_linear_data(input_frames),
input_frames as i64,
)
} else {
(ptr::null_mut::<c_void>(), 0)
};
cubeb_alogv!(
"({:p}) output: buffers {}, size {}, channels {}, frames {}.",
stm as *const AudioUnitStream,
buffers.len(),
buffers[0].mDataByteSize,
buffers[0].mNumberChannels,
output_frames
);
assert_ne!(output_frames, 0);
let outframes = stm.core_stream_data.resampler.fill(
input_buffer,
if input_buffer.is_null() {
ptr::null_mut()
} else {
&mut input_frames
},
output_buffer,
i64::from(output_frames),
);
if outframes < 0 || outframes > i64::from(output_frames) {
audiounit_make_silent(&buffers[0]);
#[cfg(feature = "audio-dump")]
{
dump_audio(
stm.core_stream_data.audio_dump_output,
buffers[0].mData,
output_frames * stm.core_stream_data.output_dev_desc.mChannelsPerFrame,
);
}
if !stm.stopped.swap(true, Ordering::SeqCst) {
stm.notify_state_changed(State::Error);
// Use a new thread, through the queue, to avoid deadlock when calling
// AudioOutputUnitStop method from inside render callback
stm.queue.clone().run_async(move || {
stm.core_stream_data.stop_audiounits();
});
}
return NO_ERR;
}
stm.draining
.store(outframes < i64::from(output_frames), Ordering::SeqCst);
stm.output_callback_timing_data_write
.write(OutputCallbackTimingData {
frames_queued: stm.frames_queued,
timestamp: now,
buffer_size: outframes as u64,
});
stm.frames_queued += outframes as u64;
// Post process output samples.
if stm.draining.load(Ordering::SeqCst) {
// Clear missing frames (silence)
let frames_to_bytes = |frames: usize| -> usize {
let sample_size = cubeb_sample_size(stm.core_stream_data.output_stream_params.format());
let channel_count = stm.core_stream_data.output_stream_params.channels() as usize;
frames * sample_size * channel_count
};
let out_bytes = unsafe {
slice::from_raw_parts_mut(
output_buffer as *mut u8,
frames_to_bytes(output_frames as usize),
)
};
let start = frames_to_bytes(outframes as usize);
for byte in out_bytes.iter_mut().skip(start) {
*byte = 0;
}
}
// Mixing
if stm.core_stream_data.mixer.is_some() {
assert!(
buffers[0].mDataByteSize
>= stm.core_stream_data.output_dev_desc.mBytesPerFrame * output_frames
);
stm.core_stream_data.mixer.as_mut().unwrap().mix(
output_frames as usize,
buffers[0].mData,
buffers[0].mDataByteSize as usize,
);
}
#[cfg(feature = "audio-dump")]
{
dump_audio(
stm.core_stream_data.audio_dump_output,
buffers[0].mData,
output_frames * stm.core_stream_data.output_dev_desc.mChannelsPerFrame,
);
}
NO_ERR
}
#[allow(clippy::cognitive_complexity)]
extern "C" fn audiounit_property_listener_callback(
id: AudioObjectID,
address_count: u32,
addresses: *const AudioObjectPropertyAddress,
user: *mut c_void,
) -> OSStatus {
assert_ne!(address_count, 0);
let stm = unsafe { &mut *(user as *mut AudioUnitStream) };
let addrs = unsafe { slice::from_raw_parts(addresses, address_count as usize) };
if stm.switching_device.load(Ordering::SeqCst) {
cubeb_log!(
"Switching is already taking place. Skipping event for device {}",
id
);
return NO_ERR;
}
stm.switching_device.store(true, Ordering::SeqCst);
let mut explicit_device_dead = false;
cubeb_log!(
"({:p}) Handling {} device changed events for device {}",
stm as *const AudioUnitStream,
address_count,
id
);
for (i, addr) in addrs.iter().enumerate() {
let p = PropertySelector::from(addr.mSelector);
cubeb_log!("Event #{}: {}", i, p);
assert_ne!(p, PropertySelector::Unknown);
if p == PropertySelector::DeviceIsAlive {
explicit_device_dead = true;
}
}
// Handle the events
if explicit_device_dead {
if !stm.stopped.swap(true, Ordering::SeqCst) {
cubeb_log!("The user-selected input or output device is dead, entering error state");
// Use a different thread, through the queue, to avoid deadlock when calling
// Get/SetProperties method from inside notify callback
stm.queue.clone().run_async(move || {
stm.core_stream_data.stop_audiounits();
stm.close_on_error();
});
}
return NO_ERR;
}
{
let callback = stm.device_changed_callback.lock().unwrap();
if let Some(device_changed_callback) = *callback {
cubeb_log!("Calling device changed callback");
unsafe {
device_changed_callback(stm.user_ptr);
}
}
}
cubeb_log!("Reinitializing stream with new device because of device change, async");
stm.reinit_async();
NO_ERR
}
fn get_default_device(devtype: DeviceType) -> Option<AudioObjectID> {
debug_assert_running_serially();
match get_default_device_id(devtype) {
Err(e) => {
cubeb_log!("Cannot get default {:?} device. Error: {}", devtype, e);
None
}
Ok(id) if id == kAudioObjectUnknown => {
cubeb_log!("Get an invalid default {:?} device: {}", devtype, id);
None
}
Ok(id) => Some(id),
}
}
fn get_default_device_id(devtype: DeviceType) -> std::result::Result<AudioObjectID, OSStatus> {
debug_assert_running_serially();
let address = get_property_address(
match devtype {
DeviceType::INPUT => Property::HardwareDefaultInputDevice,
DeviceType::OUTPUT => Property::HardwareDefaultOutputDevice,
_ => panic!("Unsupport type"),
},
DeviceType::INPUT | DeviceType::OUTPUT,
);
let mut devid: AudioDeviceID = kAudioObjectUnknown;
let mut size = mem::size_of::<AudioDeviceID>();
let status =
audio_object_get_property_data(kAudioObjectSystemObject, &address, &mut size, &mut devid);
if status == NO_ERR {
Ok(devid)
} else {
Err(status)
}
}
fn audiounit_convert_channel_layout(layout: &AudioChannelLayout) -> Result<Vec<mixer::Channel>> {
if layout.mChannelLayoutTag != kAudioChannelLayoutTag_UseChannelDescriptions {
// kAudioChannelLayoutTag_UseChannelBitmap
// kAudioChannelLayoutTag_Mono
// kAudioChannelLayoutTag_Stereo
// ....
cubeb_log!("Only handling UseChannelDescriptions for now.\n");
return Err(Error::error());
}
let channel_descriptions = unsafe {
slice::from_raw_parts(
layout.mChannelDescriptions.as_ptr(),
layout.mNumberChannelDescriptions as usize,
)
};
let mut channels = Vec::with_capacity(layout.mNumberChannelDescriptions as usize);
for description in channel_descriptions {
let label = CAChannelLabel(description.mChannelLabel);
channels.push(label.into());
}
Ok(channels)
}
fn audiounit_get_preferred_channel_layout(output_unit: AudioUnit) -> Result<Vec<mixer::Channel>> {
debug_assert_running_serially();
let mut rv = NO_ERR;
let mut size: usize = 0;
rv = audio_unit_get_property_info(
output_unit,
kAudioDevicePropertyPreferredChannelLayout,
kAudioUnitScope_Output,
AU_OUT_BUS,
&mut size,
None,
);
if rv != NO_ERR {
cubeb_log!(
"AudioUnitGetPropertyInfo/kAudioDevicePropertyPreferredChannelLayout rv={}",
rv
);
return Err(Error::error());
}
debug_assert!(size > 0);
let mut layout = make_sized_audio_channel_layout(size);
rv = audio_unit_get_property(
output_unit,
kAudioDevicePropertyPreferredChannelLayout,
kAudioUnitScope_Output,
AU_OUT_BUS,
layout.as_mut(),
&mut size,
);
if rv != NO_ERR {
cubeb_log!(
"AudioUnitGetProperty/kAudioDevicePropertyPreferredChannelLayout rv={}",
rv
);
return Err(Error::error());
}
audiounit_convert_channel_layout(layout.as_ref())
}
// This is for output AudioUnit only. Calling this by input-only AudioUnit is prone
// to crash intermittently.
fn audiounit_get_current_channel_layout(output_unit: AudioUnit) -> Result<Vec<mixer::Channel>> {
debug_assert_running_serially();
let mut rv = NO_ERR;
let mut size: usize = 0;
rv = audio_unit_get_property_info(
output_unit,
kAudioUnitProperty_AudioChannelLayout,
kAudioUnitScope_Output,
AU_OUT_BUS,
&mut size,
None,
);
if rv != NO_ERR {
cubeb_log!(
"AudioUnitGetPropertyInfo/kAudioUnitProperty_AudioChannelLayout rv={}",
rv
);
return Err(Error::error());
}
debug_assert!(size > 0);
let mut layout = make_sized_audio_channel_layout(size);
rv = audio_unit_get_property(
output_unit,
kAudioUnitProperty_AudioChannelLayout,
kAudioUnitScope_Output,
AU_OUT_BUS,
layout.as_mut(),
&mut size,
);
if rv != NO_ERR {
cubeb_log!(
"AudioUnitGetProperty/kAudioUnitProperty_AudioChannelLayout rv={}",
rv
);
return Err(Error::error());
}
audiounit_convert_channel_layout(layout.as_ref())
}
fn get_channel_layout(output_unit: AudioUnit) -> Result<Vec<mixer::Channel>> {
debug_assert_running_serially();
audiounit_get_current_channel_layout(output_unit)
.or_else(|_| {
// The kAudioUnitProperty_AudioChannelLayout property isn't known before
// macOS 10.12, attempt another method.
cubeb_log!(
"Cannot get current channel layout for audiounit @ {:p}. Trying preferred channel layout.",
output_unit
);
audiounit_get_preferred_channel_layout(output_unit)
})
}
fn start_audiounit(unit: AudioUnit) -> Result<()> {
let status = audio_output_unit_start(unit);
if status == NO_ERR {
Ok(())
} else {
cubeb_log!("Cannot start audiounit @ {:p}. Error: {}", unit, status);
Err(Error::error())
}
}
fn stop_audiounit(unit: AudioUnit) -> Result<()> {
let status = audio_output_unit_stop(unit);
if status == NO_ERR {
Ok(())
} else {
cubeb_log!("Cannot stop audiounit @ {:p}. Error: {}", unit, status);
Err(Error::error())
}
}
fn create_audiounit(device: &device_info) -> Result<AudioUnit> {
assert!(device
.flags
.intersects(device_flags::DEV_INPUT | device_flags::DEV_OUTPUT));
assert!(!device
.flags
.contains(device_flags::DEV_INPUT | device_flags::DEV_OUTPUT));
debug_assert_running_serially();
let unit = create_blank_audiounit()?;
let mut bus = AU_OUT_BUS;
if device.flags.contains(device_flags::DEV_INPUT) {
// Input only.
if let Err(e) = enable_audiounit_scope(unit, DeviceType::INPUT, true) {
cubeb_log!("Failed to enable audiounit input scope. Error: {}", e);
dispose_audio_unit(unit);
return Err(Error::error());
}
if let Err(e) = enable_audiounit_scope(unit, DeviceType::OUTPUT, false) {
cubeb_log!("Failed to disable audiounit output scope. Error: {}", e);
dispose_audio_unit(unit);
return Err(Error::error());
}
bus = AU_IN_BUS;
}
if device.flags.contains(device_flags::DEV_OUTPUT) {
// Output only.
if let Err(e) = enable_audiounit_scope(unit, DeviceType::OUTPUT, true) {
cubeb_log!("Failed to enable audiounit output scope. Error: {}", e);
dispose_audio_unit(unit);
return Err(Error::error());
}
if let Err(e) = enable_audiounit_scope(unit, DeviceType::INPUT, false) {
cubeb_log!("Failed to disable audiounit input scope. Error: {}", e);
dispose_audio_unit(unit);
return Err(Error::error());
}
bus = AU_OUT_BUS;
}
if let Err(e) = set_device_to_audiounit(unit, device.id, bus) {
cubeb_log!(
"Failed to set device {} to the created audiounit. Error: {}",
device.id,
e
);
dispose_audio_unit(unit);
return Err(Error::error());
}
Ok(unit)
}
fn get_voiceprocessing_audiounit(
shared_voice_processing_unit: &mut SharedVoiceProcessingUnitManager,
in_device: &device_info,
out_device: &device_info,
) -> Result<OwningHandle<VoiceProcessingUnit>> {
debug_assert_running_serially();
assert!(in_device.flags.contains(device_flags::DEV_INPUT));
assert!(!in_device.flags.contains(device_flags::DEV_OUTPUT));
assert!(!out_device.flags.contains(device_flags::DEV_INPUT));
let unit_handle = shared_voice_processing_unit.take_or_create();
if let Err(e) = unit_handle {
cubeb_log!(
"Failed to create shared voiceprocessing audiounit. Error: {}",
e
);
return Err(Error::error());
}
let mut unit_handle = unit_handle.unwrap();
if let Err(e) = set_device_to_audiounit(unit_handle.as_mut().unit, in_device.id, AU_IN_BUS) {
cubeb_log!(
"Failed to set in device {} to the created audiounit. Error: {}",
in_device.id,
e
);
return Err(Error::error());
}
let has_output = out_device.id != kAudioObjectUnknown;
if let Err(e) =
enable_audiounit_scope(unit_handle.as_mut().unit, DeviceType::OUTPUT, has_output)
{
cubeb_log!("Failed to enable audiounit input scope. Error: {}", e);
return Err(Error::error());
}
if has_output {
if let Err(e) =
set_device_to_audiounit(unit_handle.as_mut().unit, out_device.id, AU_OUT_BUS)
{
cubeb_log!(
"Failed to set out device {} to the created audiounit. Error: {}",
out_device.id,
e
);
return Err(Error::error());
}
}
Ok(unit_handle)
}
fn enable_audiounit_scope(
unit: AudioUnit,
devtype: DeviceType,
enable_io: bool,
) -> std::result::Result<(), OSStatus> {
assert!(!unit.is_null());
let enable = u32::from(enable_io);
let (scope, element) = match devtype {
DeviceType::INPUT => (kAudioUnitScope_Input, AU_IN_BUS),
DeviceType::OUTPUT => (kAudioUnitScope_Output, AU_OUT_BUS),
_ => panic!(
"Enable AudioUnit {:?} with unsupported type: {:?}",
unit, devtype
),
};
let status = audio_unit_set_property(
unit,
kAudioOutputUnitProperty_EnableIO,
scope,
element,
&enable,
mem::size_of::<u32>(),
);
if status == NO_ERR {
Ok(())
} else {
Err(status)
}
}
fn set_device_to_audiounit(
unit: AudioUnit,
device_id: AudioObjectID,
bus: AudioUnitElement,
) -> std::result::Result<(), OSStatus> {
assert!(!unit.is_null());
let status = audio_unit_set_property(
unit,
kAudioOutputUnitProperty_CurrentDevice,
kAudioUnitScope_Global,
bus,
&device_id,
mem::size_of::<AudioDeviceID>(),
);
if status == NO_ERR {
Ok(())
} else {
Err(status)
}
}
fn create_typed_audiounit(sub_type: c_uint) -> Result<AudioUnit> {
let desc = AudioComponentDescription {
componentType: kAudioUnitType_Output,
componentSubType: sub_type,
componentManufacturer: kAudioUnitManufacturer_Apple,
componentFlags: 0,
componentFlagsMask: 0,
};
let comp = unsafe { AudioComponentFindNext(ptr::null_mut(), &desc) };
if comp.is_null() {
cubeb_log!("Could not find matching audio hardware.");
return Err(Error::error());
}
let mut unit: AudioUnit = ptr::null_mut();
let status = unsafe { AudioComponentInstanceNew(comp, &mut unit) };
if status == NO_ERR {
assert!(!unit.is_null());
Ok(unit)
} else {
cubeb_log!("Fail to get a new AudioUnit. Error: {}", status);
Err(Error::error())
}
}
fn create_blank_audiounit() -> Result<AudioUnit> {
#[cfg(not(target_os = "ios"))]
return create_typed_audiounit(kAudioUnitSubType_HALOutput);
#[cfg(target_os = "ios")]
return create_typed_audiounit(kAudioUnitSubType_RemoteIO);
}
fn create_voiceprocessing_audiounit() -> Result<VoiceProcessingUnit> {
let res = create_typed_audiounit(kAudioUnitSubType_VoiceProcessingIO);
if res.is_err() {
return Err(Error::error());
}
match get_default_device(DeviceType::OUTPUT) {
None => {
cubeb_log!("Could not get default output device in order to undo vpio ducking");
}
Some(id) => {
let r = audio_device_duck(id, 1.0, ptr::null_mut(), 0.5);
if r != NO_ERR {
cubeb_log!(
"Failed to undo ducking of voiceprocessing on output device {}. Proceeding... Error: {}",
id,
r
);
}
}
};
res.map(|unit| VoiceProcessingUnit { unit })
}
fn get_buffer_size(unit: AudioUnit, devtype: DeviceType) -> std::result::Result<u32, OSStatus> {
assert!(!unit.is_null());
let (scope, element) = match devtype {
DeviceType::INPUT => (kAudioUnitScope_Output, AU_IN_BUS),
DeviceType::OUTPUT => (kAudioUnitScope_Input, AU_OUT_BUS),
_ => panic!(
"Get buffer size of AudioUnit {:?} with unsupported type: {:?}",
unit, devtype
),
};
let mut frames: u32 = 0;
let mut size = mem::size_of::<u32>();
let status = audio_unit_get_property(
unit,
kAudioDevicePropertyBufferFrameSize,
scope,
element,
&mut frames,
&mut size,
);
if status == NO_ERR {
Ok(frames)
} else {
Err(status)
}
}
fn set_buffer_size(
unit: AudioUnit,
devtype: DeviceType,
frames: u32,
) -> std::result::Result<(), OSStatus> {
assert!(!unit.is_null());
let (scope, element) = match devtype {
DeviceType::INPUT => (kAudioUnitScope_Output, AU_IN_BUS),
DeviceType::OUTPUT => (kAudioUnitScope_Input, AU_OUT_BUS),
_ => panic!(
"Set buffer size of AudioUnit {:?} with unsupported type: {:?}",
unit, devtype
),
};
let status = audio_unit_set_property(
unit,
kAudioDevicePropertyBufferFrameSize,
scope,
element,
&frames,
mem::size_of_val(&frames),
);
if status == NO_ERR {
Ok(())
} else {
Err(status)
}
}
#[allow(clippy::mutex_atomic)] // The mutex needs to be fed into Condvar::wait_timeout.
fn set_buffer_size_sync(unit: AudioUnit, devtype: DeviceType, frames: u32) -> Result<()> {
let current_frames = get_buffer_size(unit, devtype).map_err(|e| {
cubeb_log!(
"Cannot get buffer size of AudioUnit {:?} for {:?}. Error: {}",
unit,
devtype,
e
);
Error::error()
})?;
if frames == current_frames {
cubeb_log!(
"The buffer frame size of AudioUnit {:?} for {:?} is already {}",
unit,
devtype,
frames
);
return Ok(());
}
let waiting_time = Duration::from_millis(100);
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let mut pair2 = pair.clone();
let pair_ptr = &mut pair2;
assert_eq!(
audio_unit_add_property_listener(
unit,
kAudioDevicePropertyBufferFrameSize,
buffer_size_changed_callback,
pair_ptr,
),
NO_ERR
);
let _teardown = finally(|| {
assert_eq!(
audio_unit_remove_property_listener_with_user_data(
unit,
kAudioDevicePropertyBufferFrameSize,
buffer_size_changed_callback,
pair_ptr,
),
NO_ERR
);
});
set_buffer_size(unit, devtype, frames).map_err(|e| {
cubeb_log!(
"Failed to set buffer size for AudioUnit {:?} for {:?}. Error: {}",
unit,
devtype,
e
);
Error::error()
})?;
let (lock, cvar) = &*pair;
let changed = lock.lock().unwrap();
if !*changed {
let (chg, timeout_res) = cvar.wait_timeout(changed, waiting_time).unwrap();
if timeout_res.timed_out() {
cubeb_log!(
"Timed out for waiting the buffer frame size setting of AudioUnit {:?} for {:?}",
unit,
devtype
);
}
if !*chg {
return Err(Error::error());
}
}
let new_frames = get_buffer_size(unit, devtype).map_err(|e| {
cubeb_log!(
"Cannot get new buffer size of AudioUnit {:?} for {:?}. Error: {}",
unit,
devtype,
e
);
Error::error()
})?;
cubeb_log!(
"The new buffer frames size of AudioUnit {:?} for {:?} is {}",
unit,
devtype,
new_frames
);
extern "C" fn buffer_size_changed_callback(
in_client_data: *mut c_void,
_in_unit: AudioUnit,
in_property_id: AudioUnitPropertyID,
in_scope: AudioUnitScope,
in_element: AudioUnitElement,
) {
if in_scope == 0 {
// filter out the callback for global scope.
return;
}
assert!(in_element == AU_IN_BUS || in_element == AU_OUT_BUS);
assert_eq!(in_property_id, kAudioDevicePropertyBufferFrameSize);
let pair = unsafe { &mut *(in_client_data as *mut Arc<(Mutex<bool>, Condvar)>) };
let (lock, cvar) = &**pair;
let mut changed = lock.lock().unwrap();
*changed = true;
cvar.notify_one();
}
Ok(())
}
fn convert_uint32_into_string(data: u32) -> CString {
let empty = CString::default();
if data == 0 {
return empty;
}
// Reverse 0xWXYZ into 0xZYXW.
let mut buffer = vec![b'\x00'; 4]; // 4 bytes for uint32.
buffer[0] = (data >> 24) as u8;
buffer[1] = (data >> 16) as u8;
buffer[2] = (data >> 8) as u8;
buffer[3] = (data) as u8;
// CString::new() will consume the input bytes vec and add a '\0' at the
// end of the bytes. The input bytes vec must not contain any 0 bytes in
// it in case causing memory leaks.
CString::new(buffer).unwrap_or(empty)
}
fn get_channel_count(
devid: AudioObjectID,
devtype: DeviceType,
) -> std::result::Result<u32, OSStatus> {
assert_ne!(devid, kAudioObjectUnknown);
debug_assert_running_serially();
let devstreams = get_device_streams(devid, devtype)?;
let mut count: u32 = 0;
for ds in devstreams {
if devtype == DeviceType::INPUT
&& CoreStreamData::should_force_vpio_for_input_device(ds.device)
{
count += 1;
} else {
count += get_stream_virtual_format(ds.stream)
.map(|f| f.mChannelsPerFrame)
.unwrap_or(0);
}
}
Ok(count)
}
fn get_range_of_sample_rates(
devid: AudioObjectID,
devtype: DeviceType,
) -> std::result::Result<(f64, f64), String> {
debug_assert_running_serially();
let result = get_ranges_of_device_sample_rate(devid, devtype);
if let Err(e) = result {
return Err(format!("status {}", e));
}
let rates = result.unwrap();
if rates.is_empty() {
return Err(String::from("No data"));
}
let (mut min, mut max) = (f64::MAX, f64::MIN);
for rate in rates {
if rate.mMaximum > max {
max = rate.mMaximum;
}
if rate.mMinimum < min {
min = rate.mMinimum;
}
}
Ok((min, max))
}
fn get_fixed_latency(devid: AudioObjectID, devtype: DeviceType) -> u32 {
debug_assert_running_serially();
let device_latency = match get_device_latency(devid, devtype) {
Ok(latency) => latency,
Err(e) => {
cubeb_log!(
"Cannot get the device latency for device {} in {:?} scope. Error: {}",
devid,
devtype,
e
);
0 // default device latency
}
};
let stream_latency = get_device_streams(devid, devtype).and_then(|devstreams| {
if devstreams.is_empty() {
cubeb_log!(
"No stream on device {} in {:?} scope!",
devid,
devtype
);
Ok(0) // default stream latency
} else {
get_stream_latency(devstreams[0].stream)
}
}).inspect_err(|e| {
cubeb_log!(
"Cannot get the stream, or the latency of the first stream on device {} in {:?} scope. Error: {}",
devid,
devtype,
e
);
}).unwrap_or(0); // default stream latency
device_latency + stream_latency
}
#[allow(non_upper_case_globals)]
fn get_device_group_id(
id: AudioDeviceID,
devtype: DeviceType,
) -> std::result::Result<CString, OSStatus> {
debug_assert_running_serially();
match get_device_transport_type(id, devtype) {
Ok(kAudioDeviceTransportTypeBuiltIn) => {
cubeb_log!(
"The transport type is {:?}",
convert_uint32_into_string(kAudioDeviceTransportTypeBuiltIn)
);
match get_custom_group_id(id, devtype) {
Some(id) => return Ok(id),
None => {
cubeb_log!("Getting model UID instead.");
}
};
}
Ok(trans_type) => {
cubeb_log!(
"The transport type is {:?}. Getting model UID instead.",
convert_uint32_into_string(trans_type)
);
}
Err(e) => {
cubeb_log!(
"Error: {} when getting transport type. Get model uid instead.",
e
);
}
}
// Some devices (e.g. AirPods) might only set the model-uid in the global scope.
// The query might fail if the scope is input-only or output-only.
get_device_model_uid(id, devtype)
.or_else(|_| get_device_model_uid(id, DeviceType::INPUT | DeviceType::OUTPUT))
.map(|uid| uid.into_cstring())
}
fn get_custom_group_id(id: AudioDeviceID, devtype: DeviceType) -> Option<CString> {
debug_assert_running_serially();
const IMIC: u32 = 0x696D_6963; // "imic" (internal microphone)
const ISPK: u32 = 0x6973_706B; // "ispk" (internal speaker)
const EMIC: u32 = 0x656D_6963; // "emic" (external microphone)
const HDPN: u32 = 0x6864_706E; // "hdpn" (headphone)
match get_device_source(id, devtype) {
s @ Ok(IMIC) | s @ Ok(ISPK) => {
const GROUP_ID: &str = "builtin-internal-mic|spk";
cubeb_log!(
"Using hardcode group id: {} when source is: {:?}.",
GROUP_ID,
convert_uint32_into_string(s.unwrap())
);
return Some(CString::new(GROUP_ID).unwrap());
}
s @ Ok(EMIC) | s @ Ok(HDPN) => {
const GROUP_ID: &str = "builtin-external-mic|hdpn";
cubeb_log!(
"Using hardcode group id: {} when source is: {:?}.",
GROUP_ID,
convert_uint32_into_string(s.unwrap())
);
return Some(CString::new(GROUP_ID).unwrap());
}
Ok(s) => {
cubeb_log!(
"No custom group id when source is: {:?}.",
convert_uint32_into_string(s)
);
}
Err(e) => {
cubeb_log!("Error: {} when getting device source. ", e);
}
}
None
}
fn get_device_label(
id: AudioDeviceID,
devtype: DeviceType,
) -> std::result::Result<StringRef, OSStatus> {
debug_assert_running_serially();
get_device_source_name(id, devtype).or_else(|_| get_device_name(id, devtype))
}
fn get_device_global_uid(id: AudioDeviceID) -> std::result::Result<StringRef, OSStatus> {
debug_assert_running_serially();
get_device_uid(id, DeviceType::INPUT | DeviceType::OUTPUT)
}
#[allow(clippy::cognitive_complexity)]
fn create_cubeb_device_info(
devid: AudioObjectID,
devtype: DeviceType,
) -> Result<ffi::cubeb_device_info> {
if devtype != DeviceType::INPUT && devtype != DeviceType::OUTPUT {
return Err(Error::error());
}
let channels = get_channel_count(devid, devtype).map_err(|e| {
cubeb_log!("Cannot get the channel count. Error: {}", e);
Error::error()
})?;
if channels == 0 {
// Invalid type for this device.
return Err(Error::error());
}
let mut dev_info = ffi::cubeb_device_info {
max_channels: channels,
..Default::default()
};
assert!(
mem::size_of::<ffi::cubeb_devid>() >= mem::size_of_val(&devid),
"cubeb_devid can't represent devid"
);
dev_info.devid = devid as ffi::cubeb_devid;
match get_device_uid(devid, devtype) {
Ok(uid) => {
let c_string = uid.into_cstring();
dev_info.device_id = c_string.into_raw();
}
Err(e) => {
cubeb_log!(
"Cannot get the UID for device {} in {:?} scope. Error: {}",
devid,
devtype,
e
);
}
}
match get_device_group_id(devid, devtype) {
Ok(group_id) => {
dev_info.group_id = group_id.into_raw();
}
Err(e) => {
cubeb_log!(
"Cannot get the model UID for device {} in {:?} scope. Error: {}",
devid,
devtype,
e
);
}
}
let label = match get_device_label(devid, devtype) {
Ok(label) => label.into_cstring(),
Err(e) => {
cubeb_log!(
"Cannot get the label for device {} in {:?} scope. Error: {}",
devid,
devtype,
e
);
CString::default()
}
};
dev_info.friendly_name = label.into_raw();
match get_device_manufacturer(devid, devtype) {
Ok(vendor) => {
let vendor = vendor.into_cstring();
dev_info.vendor_name = vendor.into_raw();
}
Err(e) => {
cubeb_log!(
"Cannot get the manufacturer for device {} in {:?} scope. Error: {}",
devid,
devtype,
e
);
}
}
dev_info.device_type = match devtype {
DeviceType::INPUT => ffi::CUBEB_DEVICE_TYPE_INPUT,
DeviceType::OUTPUT => ffi::CUBEB_DEVICE_TYPE_OUTPUT,
_ => panic!("invalid type"),
};
dev_info.state = ffi::CUBEB_DEVICE_STATE_ENABLED;
dev_info.preferred = match get_default_device(devtype) {
Some(id) if id == devid => ffi::CUBEB_DEVICE_PREF_ALL,
_ => ffi::CUBEB_DEVICE_PREF_NONE,
};
dev_info.format = ffi::CUBEB_DEVICE_FMT_ALL;
dev_info.default_format = ffi::CUBEB_DEVICE_FMT_F32NE;
match get_device_sample_rate(devid, devtype) {
Ok(rate) => {
dev_info.default_rate = rate as u32;
}
Err(e) => {
cubeb_log!(
"Cannot get the sample rate for device {} in {:?} scope. Error: {}",
devid,
devtype,
e
);
}
}
match get_range_of_sample_rates(devid, devtype) {
Ok((min, max)) => {
dev_info.min_rate = min as u32;
dev_info.max_rate = max as u32;
}
Err(e) => {
cubeb_log!(
"Cannot get the range of sample rate for device {} in {:?} scope. Error: {}",
devid,
devtype,
e
);
}
}
let latency = get_fixed_latency(devid, devtype);
let (latency_low, latency_high) = match get_device_buffer_frame_size_range(devid, devtype) {
Ok(range) => (
latency + range.mMinimum as u32,
latency + range.mMaximum as u32,
),
Err(e) => {
cubeb_log!("Cannot get the buffer frame size for device {} in {:?} scope. Using default value instead. Error: {}", devid, devtype, e);
(
10 * dev_info.default_rate / 1000,
100 * dev_info.default_rate / 1000,
)
}
};
dev_info.latency_lo = latency_low;
dev_info.latency_hi = latency_high;
Ok(dev_info)
}
fn destroy_cubeb_device_info(device: &mut ffi::cubeb_device_info) {
// This should be mapped to the memory allocation in `create_cubeb_device_info`.
// The `device_id`, `group_id`, `vendor_name` can be null pointer if the queries
// failed, while `friendly_name` will be assigned to a default empty "" string.
// Set the pointers to null in case it points to some released memory.
unsafe {
if !device.device_id.is_null() {
let _ = CString::from_raw(device.device_id as *mut _);
device.device_id = ptr::null();
}
if !device.group_id.is_null() {
let _ = CString::from_raw(device.group_id as *mut _);
device.group_id = ptr::null();
}
assert!(!device.friendly_name.is_null());
let _ = CString::from_raw(device.friendly_name as *mut _);
device.friendly_name = ptr::null();
if !device.vendor_name.is_null() {
let _ = CString::from_raw(device.vendor_name as *mut _);
device.vendor_name = ptr::null();
}
}
}
fn audiounit_get_devices_of_type(devtype: DeviceType) -> Vec<AudioObjectID> {
assert!(devtype.intersects(DeviceType::INPUT | DeviceType::OUTPUT));
debug_assert_running_serially();
let mut devices = get_devices();
// Remove the aggregate device from the list of devices (if any).
devices.retain(|&device| {
if device == kAudioObjectUnknown {
false
} else if let Ok(uid) = get_device_global_uid(device) {
let uid = uid.into_string();
!uid.contains(PRIVATE_AGGREGATE_DEVICE_NAME)
&& !uid.contains(VOICEPROCESSING_AGGREGATE_DEVICE_NAME)
} else {
// Fail to get device uid.
true
}
});
// Expected sorted but did not find anything in the docs.
devices.sort_unstable();
if devtype.contains(DeviceType::INPUT | DeviceType::OUTPUT) {
return devices;
}
let mut devices_in_scope = Vec::new();
for device in devices {
let label = match get_device_label(device, DeviceType::OUTPUT | DeviceType::INPUT) {
Ok(label) => label.into_string(),
Err(e) => format!("Unknown(error: {})", e),
};
let info = format!("{} ({})", device, label);
if let Ok(channels) = get_channel_count(device, devtype) {
cubeb_log!("Device {} has {} {:?}-channels", info, channels, devtype);
if channels > 0 {
devices_in_scope.push(device);
}
} else {
cubeb_log!("Cannot get the channel count for device {}. Ignored.", info);
}
}
devices_in_scope
}
extern "C" fn audiounit_collection_changed_callback(
_in_object_id: AudioObjectID,
_in_number_addresses: u32,
_in_addresses: *const AudioObjectPropertyAddress,
in_client_data: *mut c_void,
) -> OSStatus {
let context = unsafe { &mut *(in_client_data as *mut AudioUnitContext) };
let queue = context.serial_queue.clone();
// This can be called from inside an AudioUnit function, dispatch to another queue.
queue.run_async(move || {
let ctx_ptr = context as *const AudioUnitContext;
let mut devices = context.devices.lock().unwrap();
if devices.input.changed_callback.is_none() && devices.output.changed_callback.is_none() {
return;
}
if devices.input.changed_callback.is_some() {
let input_devices = audiounit_get_devices_of_type(DeviceType::INPUT);
if devices.input.update_devices(input_devices) {
unsafe {
devices.input.changed_callback.unwrap()(
ctx_ptr as *mut ffi::cubeb,
devices.input.callback_user_ptr,
);
}
}
}
if devices.output.changed_callback.is_some() {
let output_devices = audiounit_get_devices_of_type(DeviceType::OUTPUT);
if devices.output.update_devices(output_devices) {
unsafe {
devices.output.changed_callback.unwrap()(
ctx_ptr as *mut ffi::cubeb,
devices.output.callback_user_ptr,
);
}
}
}
});
NO_ERR
}
#[derive(Debug)]
struct DevicesData {
changed_callback: ffi::cubeb_device_collection_changed_callback,
callback_user_ptr: *mut c_void,
devices: Vec<AudioObjectID>,
}
impl DevicesData {
fn set(
&mut self,
changed_callback: ffi::cubeb_device_collection_changed_callback,
callback_user_ptr: *mut c_void,
devices: Vec<AudioObjectID>,
) {
self.changed_callback = changed_callback;
self.callback_user_ptr = callback_user_ptr;
self.devices = devices;
}
fn update_devices(&mut self, devices: Vec<AudioObjectID>) -> bool {
// Elements in the vector expected sorted.
if self.devices == devices {
return false;
}
self.devices = devices;
true
}
fn clear(&mut self) {
self.changed_callback = None;
self.callback_user_ptr = ptr::null_mut();
self.devices.clear();
}
fn is_empty(&self) -> bool {
self.changed_callback.is_none()
&& self.callback_user_ptr.is_null()
&& self.devices.is_empty()
}
}
impl Default for DevicesData {
fn default() -> Self {
Self {
changed_callback: None,
callback_user_ptr: ptr::null_mut(),
devices: Vec::new(),
}
}
}
#[derive(Debug, Default)]
struct SharedDevices {
input: DevicesData,
output: DevicesData,
}
#[derive(Debug, Default)]
struct LatencyController {
streams: u32,
latency: Option<u32>,
}
impl LatencyController {
fn add_stream(&mut self, latency: u32) -> u32 {
self.streams += 1;
// For the 1st stream set anything within safe min-max
if self.streams == 1 {
assert!(self.latency.is_none());
// Silently clamp the latency down to the platform default, because we
// synthetize the clock from the callbacks, and we want the clock to update often.
self.latency = Some(latency.clamp(SAFE_MIN_LATENCY_FRAMES, SAFE_MAX_LATENCY_FRAMES));
}
self.latency.unwrap_or(latency)
}
fn subtract_stream(&mut self) {
self.streams -= 1;
if self.streams == 0 {
assert!(self.latency.is_some());
self.latency = None;
}
}
}
// SharedStorage<T> below looks generic but has evolved to be pretty tailored
// the observed behavior of VoiceProcessingIO audio units on macOS 14.
// Some key points are:
// - Creating the first VoiceProcessingIO unit in a process takes a long time, often > 3s.
// - Creating a second VoiceProcessingIO unit in a process is significantly faster, < 1s.
// - Disposing of a VoiceProcessingIO unit when all other VoiceProcessingIO units are
// uninitialized will take significantly longer than disposing the remaining
// VoiceProcessingIO units, and will have other side effects: starting another
// VoiceProcessingIO unit after this is on par with creating the first one in the
// process, bluetooth devices will move away from the handsfree profile, etc.
// The takeaway is that there is something internal to the VoiceProcessingIO audio unit
// that is costly to create and dispose of and its creation is triggered by creation of
// the first VoiceProcessingIO unit, and its disposal is triggered by the disposal of
// the first VoiceProcessingIO unit when no other VoiceProcessingIO units are initialized.
//
// The intended behavior of SharedStorage<T> and SharedVoiceProcessingUnitManager is therefore:
// - Retain ideally just one VoiceProcessingIO unit after stream destruction, so device
// switching is fast. The benefit of retaining more than one is unclear.
// - Dispose of either all VoiceProcessingIO units, or none at all, such that the retained
// VoiceProcessingIO unit really helps speed up creating and starting the next. In practice
// this means we retain all VoiceProcessingIO units until they can all be disposed of.
#[derive(Debug)]
struct SharedStorageInternal<T> {
// Storage for shared elements.
elements: Vec<T>,
// Number of elements in use, i.e. all elements created/taken and not recycled.
outstanding_element_count: usize,
// Used for invalidation of in-flight tasks to clear elements.
// Incremented when something takes a shared element.
generation: usize,
}
#[derive(Debug)]
struct SharedStorage<T> {
queue: Queue,
idle_timeout: Duration,
storage: Mutex<SharedStorageInternal<T>>,
}
impl<T: Send> SharedStorage<T> {
fn with_idle_timeout(queue: Queue, idle_timeout: Duration) -> Self {
Self {
queue,
idle_timeout,
storage: Mutex::new(SharedStorageInternal::<T> {
elements: Vec::default(),
outstanding_element_count: 0,
generation: 0,
}),
}
}
fn take_locked(guard: &mut MutexGuard<'_, SharedStorageInternal<T>>) -> Result<T> {
if let Some(e) = guard.elements.pop() {
cubeb_log!("Taking shared element #{}.", guard.elements.len());
guard.outstanding_element_count += 1;
guard.generation += 1;
return Ok(e);
}
Err(Error::not_supported())
}
fn create_with_locked<F>(
guard: &mut MutexGuard<'_, SharedStorageInternal<T>>,
f: F,
) -> Result<T>
where
F: FnOnce() -> Result<T>,
{
let start = Instant::now();
match f() {
Ok(obj) => {
cubeb_log!(
"Just created shared element #{}. Took {}s.",
guard.outstanding_element_count,
(Instant::now() - start).as_secs_f32()
);
guard.outstanding_element_count += 1;
guard.generation += 1;
Ok(obj)
}
Err(_) => {
cubeb_log!("Creating shared element failed");
Err(Error::error())
}
}
}
#[cfg(test)]
fn take(&self) -> Result<T> {
let mut guard = self.storage.lock().unwrap();
SharedStorage::take_locked(&mut guard)
}
fn take_or_create_with<F>(&self, f: F) -> Result<T>
where
F: FnOnce() -> Result<T>,
{
let mut guard = self.storage.lock().unwrap();
SharedStorage::take_locked(&mut guard)
.or_else(|_| SharedStorage::create_with_locked(&mut guard, f))
}
fn recycle(&self, obj: T) {
let mut guard = self.storage.lock().unwrap();
guard.outstanding_element_count -= 1;
cubeb_log!(
"Recycling shared element #{}. Nr of live elements now {}.",
guard.elements.len(),
guard.outstanding_element_count
);
guard.elements.push(obj);
}
fn clear_locked(guard: &mut MutexGuard<'_, SharedStorageInternal<T>>) {
let count = guard.elements.len();
let start = Instant::now();
guard.elements.clear();
cubeb_log!(
"Cleared {} shared element{}. Took {}s.",
count,
if count == 1 { "" } else { "s" },
(Instant::now() - start).as_secs_f32()
);
}
fn clear(&self) {
debug_assert_running_serially();
let mut guard = self.storage.lock().unwrap();
SharedStorage::clear_locked(&mut guard);
}
fn clear_if_all_idle_async(storage: &Arc<SharedStorage<T>>) {
let (queue, outstanding_element_count, generation) = {
let guard = storage.storage.lock().unwrap();
(
storage.queue.clone(),
guard.outstanding_element_count,
guard.generation,
)
};
if outstanding_element_count > 0 {
cubeb_log!(
"Not clearing shared voiceprocessing unit storage because {} elements are in use. Generation={}.",
outstanding_element_count,
generation
);
return;
}
cubeb_log!(
"Clearing shared voiceprocessing unit storage in {}s if still at generation {}.",
storage.idle_timeout.as_secs_f32(),
generation
);
let storage = storage.clone();
queue.run_after(Instant::now() + storage.idle_timeout, move || {
let mut guard = storage.storage.lock().unwrap();
if generation != guard.generation {
cubeb_log!(
"Not clearing shared voiceprocessing unit storage for generation {} as we're now at {}.",
generation,
guard.generation
);
return;
}
SharedStorage::clear_locked(&mut guard);
});
}
}
#[derive(Debug)]
struct OwningHandle<T>
where
T: Send,
{
storage: Weak<SharedStorage<T>>,
obj: Option<T>,
}
impl<T: Send> OwningHandle<T> {
fn new(storage: Weak<SharedStorage<T>>, obj: T) -> Self {
Self {
storage,
obj: Some(obj),
}
}
}
impl<T: Send> AsRef<T> for OwningHandle<T> {
fn as_ref(&self) -> &T {
self.obj.as_ref().unwrap()
}
}
impl<T: Send> AsMut<T> for OwningHandle<T> {
fn as_mut(&mut self) -> &mut T {
self.obj.as_mut().unwrap()
}
}
impl<T: Send> Drop for OwningHandle<T> {
fn drop(&mut self) {
let storage = self.storage.upgrade();
assert!(
storage.is_some(),
"Storage must outlive the handle, but didn't"
);
let storage = storage.unwrap();
if self.obj.is_none() {
return;
}
let obj = self.obj.take().unwrap();
storage.recycle(obj);
SharedStorage::clear_if_all_idle_async(&storage);
}
}
#[derive(Debug)]
struct VoiceProcessingUnit {
unit: AudioUnit,
}
impl Drop for VoiceProcessingUnit {
fn drop(&mut self) {
assert!(!self.unit.is_null());
dispose_audio_unit(self.unit);
}
}
unsafe impl Send for VoiceProcessingUnit {}
#[derive(Debug)]
struct SharedVoiceProcessingUnitManager {
sync_storage: Mutex<Option<Arc<SharedStorage<VoiceProcessingUnit>>>>,
queue: Queue,
idle_timeout: Duration,
}
impl SharedVoiceProcessingUnitManager {
fn with_idle_timeout(queue: Queue, idle_timeout: Duration) -> Self {
Self {
sync_storage: Mutex::new(None),
queue,
idle_timeout,
}
}
fn new(queue: Queue) -> Self {
SharedVoiceProcessingUnitManager::with_idle_timeout(queue, VPIO_IDLE_TIMEOUT)
}
fn ensure_storage_locked(
&self,
guard: &mut MutexGuard<Option<Arc<SharedStorage<VoiceProcessingUnit>>>>,
) {
if guard.is_some() {
return;
}
cubeb_log!("Creating shared voiceprocessing storage.");
let storage = SharedStorage::<VoiceProcessingUnit>::with_idle_timeout(
self.queue.clone(),
self.idle_timeout,
);
let old_storage = guard.replace(Arc::from(storage));
assert!(old_storage.is_none());
}
// Take an already existing, shared, vpio unit, if one is available.
#[cfg(test)]
fn take(&mut self) -> Result<OwningHandle<VoiceProcessingUnit>> {
debug_assert_running_serially();
let mut guard = self.sync_storage.lock().unwrap();
self.ensure_storage_locked(&mut guard);
let storage = guard.as_mut().unwrap();
let res = storage.take();
res.map(|u| OwningHandle::new(Arc::downgrade(storage), u))
}
// Take an already existing, shared, vpio unit, or create one if none are available.
fn take_or_create(&mut self) -> Result<OwningHandle<VoiceProcessingUnit>> {
debug_assert_running_serially();
let mut guard = self.sync_storage.lock().unwrap();
self.ensure_storage_locked(&mut guard);
let storage = guard.as_mut().unwrap();
let res = storage.take_or_create_with(create_voiceprocessing_audiounit);
res.map(|u| OwningHandle::new(Arc::downgrade(storage), u))
}
}
unsafe impl Send for SharedVoiceProcessingUnitManager {}
unsafe impl Sync for SharedVoiceProcessingUnitManager {}
impl Drop for SharedVoiceProcessingUnitManager {
fn drop(&mut self) {
debug_assert_not_running_serially();
self.queue.run_final(|| {
let mut guard = self.sync_storage.lock().unwrap();
if guard.is_none() {
return;
}
guard.as_mut().unwrap().clear();
});
}
}
pub const OPS: Ops = capi_new!(AudioUnitContext, AudioUnitStream);
// The fisrt member of the Cubeb context must be a pointer to a Ops struct. The Ops struct is an
// interface to link to all the Cubeb APIs, and the Cubeb interface use this assumption to operate
// the Cubeb APIs on different implementation.
// #[repr(C)] is used to prevent any padding from being added in the beginning of the AudioUnitContext.
#[repr(C)]
#[derive(Debug)]
pub struct AudioUnitContext {
_ops: *const Ops,
serial_queue: Queue,
latency_controller: Mutex<LatencyController>,
devices: Mutex<SharedDevices>,
host_time_to_ns_ratio: (u32, u32),
// Storage for a context-global vpio unit. Duplex streams that need one will take this
// and return it when done.
shared_voice_processing_unit: SharedVoiceProcessingUnitManager,
}
impl AudioUnitContext {
fn new() -> Self {
let queue_label = format!("{}.context", DISPATCH_QUEUE_LABEL);
let serial_queue =
Queue::new_with_target(queue_label.as_str(), get_serial_queue_singleton());
let shared_vp_queue = Queue::new_with_target(
format!("{}.context.shared_vpio", DISPATCH_QUEUE_LABEL).as_str(),
&serial_queue,
);
let host_time_to_ns_ratio = {
let mut timebase_info = mach_timebase_info { numer: 0, denom: 0 };
unsafe {
mach_timebase_info(&mut timebase_info);
}
(timebase_info.numer, timebase_info.denom)
};
Self {
_ops: &OPS as *const _,
serial_queue,
latency_controller: Mutex::new(LatencyController::default()),
devices: Mutex::new(SharedDevices::default()),
host_time_to_ns_ratio,
shared_voice_processing_unit: SharedVoiceProcessingUnitManager::new(shared_vp_queue),
}
}
fn active_streams(&self) -> u32 {
let controller = self.latency_controller.lock().unwrap();
controller.streams
}
fn update_latency_by_adding_stream(&self, latency_frames: u32) -> u32 {
let mut controller = self.latency_controller.lock().unwrap();
controller.add_stream(latency_frames)
}
fn update_latency_by_removing_stream(&self) {
let mut controller = self.latency_controller.lock().unwrap();
controller.subtract_stream();
}
fn add_devices_changed_listener(
&mut self,
devtype: DeviceType,
collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
user_ptr: *mut c_void,
) -> Result<()> {
assert!(devtype.intersects(DeviceType::INPUT | DeviceType::OUTPUT));
assert!(collection_changed_callback.is_some());
let context_ptr = self as *mut AudioUnitContext;
let mut devices = self.devices.lock().unwrap();
// Note: second register without unregister first causes 'nope' error.
// Current implementation requires unregister before register a new cb.
if devtype.contains(DeviceType::INPUT) && devices.input.changed_callback.is_some()
|| devtype.contains(DeviceType::OUTPUT) && devices.output.changed_callback.is_some()
{
return Err(Error::invalid_parameter());
}
if devices.input.changed_callback.is_none() && devices.output.changed_callback.is_none() {
let address = get_property_address(
Property::HardwareDevices,
DeviceType::INPUT | DeviceType::OUTPUT,
);
let ret = audio_object_add_property_listener(
kAudioObjectSystemObject,
&address,
audiounit_collection_changed_callback,
context_ptr,
);
if ret != NO_ERR {
cubeb_log!(
"Cannot add devices-changed listener for {:?}, Error: {}",
devtype,
ret
);
return Err(Error::error());
}
}
if devtype.contains(DeviceType::INPUT) {
// Expected empty after unregister.
assert!(devices.input.is_empty());
devices.input.set(
collection_changed_callback,
user_ptr,
audiounit_get_devices_of_type(DeviceType::INPUT),
);
}
if devtype.contains(DeviceType::OUTPUT) {
// Expected empty after unregister.
assert!(devices.output.is_empty());
devices.output.set(
collection_changed_callback,
user_ptr,
audiounit_get_devices_of_type(DeviceType::OUTPUT),
);
}
Ok(())
}
fn remove_devices_changed_listener(&mut self, devtype: DeviceType) -> Result<()> {
if !devtype.intersects(DeviceType::INPUT | DeviceType::OUTPUT) {
return Err(Error::invalid_parameter());
}
let context_ptr = self as *mut AudioUnitContext;
let mut devices = self.devices.lock().unwrap();
if devtype.contains(DeviceType::INPUT) {
devices.input.clear();
}
if devtype.contains(DeviceType::OUTPUT) {
devices.output.clear();
}
if devices.input.changed_callback.is_some() || devices.output.changed_callback.is_some() {
return Ok(());
}
let address = get_property_address(
Property::HardwareDevices,
DeviceType::INPUT | DeviceType::OUTPUT,
);
// Note: unregister a non registered cb is not a problem, not checking.
let r = audio_object_remove_property_listener(
kAudioObjectSystemObject,
&address,
audiounit_collection_changed_callback,
context_ptr,
);
if r == NO_ERR {
Ok(())
} else {
cubeb_log!(
"Cannot remove devices-changed listener for {:?}, Error: {}",
devtype,
r
);
Err(Error::error())
}
}
}
impl ContextOps for AudioUnitContext {
fn init(_context_name: Option<&CStr>) -> Result<Context> {
run_serially(set_notification_runloop);
let mut ctx = Box::new(AudioUnitContext::new());
let queue_label = format!("{}.context.{:p}", DISPATCH_QUEUE_LABEL, ctx.as_ref());
ctx.serial_queue =
Queue::new_with_target(queue_label.as_str(), get_serial_queue_singleton());
let shared_vp_queue = Queue::new_with_target(
format!("{}.shared_vpio", queue_label).as_str(),
&ctx.serial_queue,
);
ctx.shared_voice_processing_unit = SharedVoiceProcessingUnitManager::new(shared_vp_queue);
Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
}
fn backend_id(&mut self) -> &'static CStr {
unsafe { CStr::from_ptr(b"audiounit-rust\0".as_ptr() as *const _) }
}
#[cfg(target_os = "ios")]
fn max_channel_count(&mut self) -> Result<u32> {
Ok(2u32)
}
#[cfg(not(target_os = "ios"))]
fn max_channel_count(&mut self) -> Result<u32> {
self.serial_queue
.run_sync(|| {
let device = match get_default_device(DeviceType::OUTPUT) {
None => {
cubeb_log!("Could not get default output device");
return Err(Error::error());
}
Some(id) => id,
};
get_channel_count(device, DeviceType::OUTPUT).map_err(|e| {
cubeb_log!("Cannot get the channel count. Error: {}", e);
Error::error()
})
})
.unwrap()
}
#[cfg(target_os = "ios")]
fn min_latency(&mut self, _params: StreamParams) -> Result<u32> {
Err(not_supported());
}
#[cfg(not(target_os = "ios"))]
fn min_latency(&mut self, _params: StreamParams) -> Result<u32> {
self.serial_queue
.run_sync(|| {
let device = match get_default_device(DeviceType::OUTPUT) {
None => {
cubeb_log!("Could not get default output device");
return Err(Error::error());
}
Some(id) => id,
};
let range = get_device_buffer_frame_size_range(device, DeviceType::OUTPUT)
.map_err(|e| {
cubeb_log!("Could not get acceptable latency range. Error: {}", e);
Error::error()
})?;
Ok(cmp::max(range.mMinimum as u32, SAFE_MIN_LATENCY_FRAMES))
})
.unwrap()
}
#[cfg(target_os = "ios")]
fn preferred_sample_rate(&mut self) -> Result<u32> {
Err(not_supported());
}
#[cfg(not(target_os = "ios"))]
fn preferred_sample_rate(&mut self) -> Result<u32> {
self.serial_queue
.run_sync(|| {
let device = match get_default_device(DeviceType::OUTPUT) {
None => {
cubeb_log!("Could not get default output device");
return Err(Error::error());
}
Some(id) => id,
};
let rate = get_device_sample_rate(device, DeviceType::OUTPUT).map_err(|e| {
cubeb_log!(
"Cannot get the sample rate of the default output device. Error: {}",
e
);
Error::error()
})?;
Ok(rate as u32)
})
.unwrap()
}
fn supported_input_processing_params(&mut self) -> Result<InputProcessingParams> {
Ok(InputProcessingParams::ECHO_CANCELLATION
| InputProcessingParams::NOISE_SUPPRESSION
| InputProcessingParams::AUTOMATIC_GAIN_CONTROL)
}
fn enumerate_devices(
&mut self,
devtype: DeviceType,
collection: &DeviceCollectionRef,
) -> Result<()> {
let device_infos = self
.serial_queue
.run_sync(|| {
let mut dev_types = vec![DeviceType::INPUT, DeviceType::OUTPUT];
dev_types.retain(|&dt| devtype.contains(dt));
let device_ids: Vec<(DeviceType, Vec<AudioObjectID>)> = dev_types
.iter()
.map(|&dt| (dt, audiounit_get_devices_of_type(dt)))
.collect();
let count = device_ids.iter().map(|(_dt, ids)| ids.len()).sum();
let mut device_infos = Vec::with_capacity(count);
for (dt, dev_ids) in device_ids {
for dev_id in dev_ids {
if let Ok(info) = create_cubeb_device_info(dev_id, dt) {
device_infos.push(info);
}
}
}
device_infos
})
.unwrap();
let (ptr, len) = if device_infos.is_empty() {
(ptr::null_mut(), 0)
} else {
forget_vec(device_infos)
};
let coll = unsafe { &mut *collection.as_ptr() };
coll.device = ptr;
coll.count = len;
Ok(())
}
fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> {
assert!(!collection.as_ptr().is_null());
let coll = unsafe { &mut *collection.as_ptr() };
if coll.device.is_null() {
return Ok(());
}
let mut devices = retake_forgotten_vec(coll.device, coll.count);
for device in &mut devices {
destroy_cubeb_device_info(device);
}
drop(devices); // Release the memory.
coll.device = ptr::null_mut();
coll.count = 0;
Ok(())
}
fn stream_init(
&mut self,
_stream_name: Option<&CStr>,
input_device: DeviceId,
input_stream_params: Option<&StreamParamsRef>,
output_device: DeviceId,
output_stream_params: Option<&StreamParamsRef>,
latency_frames: u32,
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
user_ptr: *mut c_void,
) -> Result<Stream> {
if !input_device.is_null() && input_stream_params.is_none() {
cubeb_log!("Cannot init an input device without input stream params");
return Err(Error::invalid_parameter());
}
if !output_device.is_null() && output_stream_params.is_none() {
cubeb_log!("Cannot init an output device without output stream params");
return Err(Error::invalid_parameter());
}
if input_stream_params.is_none() && output_stream_params.is_none() {
cubeb_log!("Cannot init a stream without any stream params");
return Err(Error::invalid_parameter());
}
if data_callback.is_none() {
cubeb_log!("Cannot init a stream without a data callback");
return Err(Error::invalid_parameter());
}
let in_stm_settings = if let Some(params) = input_stream_params {
let in_device = match self
.serial_queue
.run_sync(|| create_device_info(input_device as AudioDeviceID, DeviceType::INPUT))
.unwrap()
{
None => {
cubeb_log!("Fail to create device info for input");
return Err(Error::error());
}
Some(d) => d,
};
let stm_params = StreamParams::from(unsafe { *params.as_ptr() });
Some((stm_params, in_device))
} else {
None
};
let out_stm_settings = if let Some(params) = output_stream_params {
let out_device = match self
.serial_queue
.run_sync(|| create_device_info(output_device as AudioDeviceID, DeviceType::OUTPUT))
.unwrap()
{
None => {
cubeb_log!("Fail to create device info for output");
return Err(Error::error());
}
Some(d) => d,
};
let stm_params = StreamParams::from(unsafe { *params.as_ptr() });
Some((stm_params, out_device))
} else {
None
};
// Latency cannot change if another stream is operating in parallel. In this case
// latency is set to the other stream value.
let global_latency_frames = self.update_latency_by_adding_stream(latency_frames);
if global_latency_frames != latency_frames {
cubeb_log!(
"Use global latency {} instead of the requested latency {}.",
global_latency_frames,
latency_frames
);
}
let mut boxed_stream = Box::new(AudioUnitStream::new(
self,
user_ptr,
data_callback,
state_callback,
global_latency_frames,
));
// Rename the task queue to be an unique label.
let queue_label = format!(
"{}.stream.{:p}",
DISPATCH_QUEUE_LABEL,
boxed_stream.as_ref()
);
boxed_stream.queue = Queue::new_with_target(queue_label.as_str(), &boxed_stream.queue);
boxed_stream.core_stream_data =
CoreStreamData::new(boxed_stream.as_ref(), in_stm_settings, out_stm_settings);
let result = boxed_stream
.queue
.clone()
.run_sync(|| {
boxed_stream
.core_stream_data
.setup(&mut boxed_stream.context.shared_voice_processing_unit)
})
.unwrap();
if let Err(r) = result {
cubeb_log!(
"({:p}) Could not setup the audiounit stream.",
boxed_stream.as_ref()
);
return Err(r);
}
let cubeb_stream = unsafe { Stream::from_ptr(Box::into_raw(boxed_stream) as *mut _) };
cubeb_log!(
"({:p}) Cubeb stream init successful.",
cubeb_stream.as_ref()
);
Ok(cubeb_stream)
}
fn register_device_collection_changed(
&mut self,
devtype: DeviceType,
collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
user_ptr: *mut c_void,
) -> Result<()> {
if devtype == DeviceType::UNKNOWN {
return Err(Error::invalid_parameter());
}
self.serial_queue
.clone()
.run_sync(|| {
if collection_changed_callback.is_some() {
self.add_devices_changed_listener(
devtype,
collection_changed_callback,
user_ptr,
)
} else {
self.remove_devices_changed_listener(devtype)
}
})
.unwrap()
}
}
impl Drop for AudioUnitContext {
fn drop(&mut self) {
assert!({
let devices = self.devices.lock().unwrap();
devices.input.changed_callback.is_none() && devices.output.changed_callback.is_none()
});
self.shared_voice_processing_unit =
SharedVoiceProcessingUnitManager::new(self.serial_queue.clone());
// Make sure all the pending (device-collection-changed-callback) tasks
// in queue are done, and cancel all the tasks appended after `drop` is executed.
let queue = self.serial_queue.clone();
queue.run_final(|| {});
{
let controller = self.latency_controller.lock().unwrap();
// assert(controller.streams == 0);
debug_assert!(controller.streams == 0);
if controller.streams > 0 {
cubeb_log!(
"({:p}) API misuse, {} streams active when context destroyed!",
self as *const AudioUnitContext,
controller.streams
);
}
}
}
}
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl Send for AudioUnitContext {}
unsafe impl Sync for AudioUnitContext {}
// Holds the information for an audio input callback call, for debugging purposes.
struct InputCallbackData {
bytes: u32,
rendered_frames: u32,
total_available: usize,
channels: u32,
num_buf: u32,
}
struct InputCallbackLogger {
prod: ringbuf::Producer<InputCallbackData>,
cons: ringbuf::Consumer<InputCallbackData>,
}
impl InputCallbackLogger {
fn new() -> Self {
let ring = RingBuffer::<InputCallbackData>::new(16);
let (prod, cons) = ring.split();
Self { prod, cons }
}
fn push(&mut self, data: InputCallbackData) {
self.prod.push(data);
}
fn pop(&mut self) -> Option<InputCallbackData> {
self.cons.pop()
}
fn is_empty(&self) -> bool {
self.cons.is_empty()
}
}
impl fmt::Debug for InputCallbackLogger {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"InputCallbackLogger {{ prod: {}, cons: {} }}",
self.prod.len(),
self.cons.len()
)
}
}
#[derive(Debug)]
struct CoreStreamData<'ctx> {
stm_ptr: *const AudioUnitStream<'ctx>,
aggregate_device: Option<AggregateDevice>,
mixer: Option<Mixer>,
resampler: Resampler,
// Stream creation parameters.
input_stream_params: StreamParams,
output_stream_params: StreamParams,
// Device settings for AudioUnits.
input_dev_desc: AudioStreamBasicDescription,
output_dev_desc: AudioStreamBasicDescription,
// I/O AudioUnits.
input_unit: AudioUnit,
output_unit: AudioUnit,
// Handle to shared voiceprocessing AudioUnit, if in use.
voiceprocessing_unit_handle: Option<OwningHandle<VoiceProcessingUnit>>,
// Info of the I/O devices.
input_device: device_info,
output_device: device_info,
input_processing_params: InputProcessingParams,
input_mute: bool,
input_buffer_manager: Option<BufferManager>,
units_running: bool,
// Listeners indicating what system events are monitored.
default_input_listener: Option<device_property_listener>,
default_output_listener: Option<device_property_listener>,
input_alive_listener: Option<device_property_listener>,
input_source_listener: Option<device_property_listener>,
output_alive_listener: Option<device_property_listener>,
output_source_listener: Option<device_property_listener>,
input_logging: Option<InputCallbackLogger>,
#[cfg(feature = "audio-dump")]
audio_dump_session: ffi::cubeb_audio_dump_session_t,
#[cfg(feature = "audio-dump")]
audio_dump_session_running: bool,
#[cfg(feature = "audio-dump")]
audio_dump_input: ffi::cubeb_audio_dump_stream_t,
#[cfg(feature = "audio-dump")]
audio_dump_output: ffi::cubeb_audio_dump_stream_t,
}
impl<'ctx> Default for CoreStreamData<'ctx> {
fn default() -> Self {
Self {
stm_ptr: ptr::null(),
aggregate_device: None,
mixer: None,
resampler: Resampler::default(),
input_stream_params: StreamParams::from(ffi::cubeb_stream_params {
format: ffi::CUBEB_SAMPLE_FLOAT32NE,
rate: 0,
channels: 0,
layout: ffi::CUBEB_LAYOUT_UNDEFINED,
prefs: ffi::CUBEB_STREAM_PREF_NONE,
}),
output_stream_params: StreamParams::from(ffi::cubeb_stream_params {
format: ffi::CUBEB_SAMPLE_FLOAT32NE,
rate: 0,
channels: 0,
layout: ffi::CUBEB_LAYOUT_UNDEFINED,
prefs: ffi::CUBEB_STREAM_PREF_NONE,
}),
input_dev_desc: AudioStreamBasicDescription::default(),
output_dev_desc: AudioStreamBasicDescription::default(),
input_unit: ptr::null_mut(),
output_unit: ptr::null_mut(),
voiceprocessing_unit_handle: None,
input_device: device_info::default(),
output_device: device_info::default(),
input_processing_params: InputProcessingParams::NONE,
input_mute: false,
input_buffer_manager: None,
units_running: false,
default_input_listener: None,
default_output_listener: None,
input_alive_listener: None,
input_source_listener: None,
output_alive_listener: None,
output_source_listener: None,
input_logging: None,
#[cfg(feature = "audio-dump")]
audio_dump_session: ptr::null_mut(),
#[cfg(feature = "audio-dump")]
audio_dump_session_running: false,
#[cfg(feature = "audio-dump")]
audio_dump_input: ptr::null_mut(),
#[cfg(feature = "audio-dump")]
audio_dump_output: ptr::null_mut(),
}
}
}
impl<'ctx> CoreStreamData<'ctx> {
fn new(
stm: &AudioUnitStream<'ctx>,
input_stream_settings: Option<(StreamParams, device_info)>,
output_stream_settings: Option<(StreamParams, device_info)>,
) -> Self {
fn get_default_sttream_params() -> StreamParams {
StreamParams::from(ffi::cubeb_stream_params {
format: ffi::CUBEB_SAMPLE_FLOAT32NE,
rate: 0,
channels: 0,
layout: ffi::CUBEB_LAYOUT_UNDEFINED,
prefs: ffi::CUBEB_STREAM_PREF_NONE,
})
}
let (in_stm_params, in_dev) =
input_stream_settings.unwrap_or((get_default_sttream_params(), device_info::default()));
let (out_stm_params, out_dev) = output_stream_settings
.unwrap_or((get_default_sttream_params(), device_info::default()));
Self {
stm_ptr: stm,
aggregate_device: None,
mixer: None,
resampler: Resampler::default(),
input_stream_params: in_stm_params,
output_stream_params: out_stm_params,
input_dev_desc: AudioStreamBasicDescription::default(),
output_dev_desc: AudioStreamBasicDescription::default(),
input_unit: ptr::null_mut(),
output_unit: ptr::null_mut(),
voiceprocessing_unit_handle: None,
input_device: in_dev,
output_device: out_dev,
input_processing_params: InputProcessingParams::NONE,
input_mute: false,
input_buffer_manager: None,
units_running: false,
default_input_listener: None,
default_output_listener: None,
input_alive_listener: None,
input_source_listener: None,
output_alive_listener: None,
output_source_listener: None,
input_logging: None,
#[cfg(feature = "audio-dump")]
audio_dump_session: ptr::null_mut(),
#[cfg(feature = "audio-dump")]
audio_dump_session_running: false,
#[cfg(feature = "audio-dump")]
audio_dump_input: ptr::null_mut(),
#[cfg(feature = "audio-dump")]
audio_dump_output: ptr::null_mut(),
}
}
fn debug_assert_is_on_stream_queue(&self) {
if self.stm_ptr.is_null() {
return;
}
let stm = unsafe { &*self.stm_ptr };
stm.queue.debug_assert_is_current();
}
fn start_audiounits(&mut self) -> Result<()> {
self.debug_assert_is_on_stream_queue();
// Only allowed to be called after the stream is initialized
// and before the stream is destroyed.
debug_assert!(!self.input_unit.is_null() || !self.output_unit.is_null());
if !self.input_unit.is_null() {
start_audiounit(self.input_unit)?;
}
if self.using_voice_processing_unit() {
// Handle the VoiceProcessIO case where there is a single unit.
// Always try to remember the applied input processing params. If they cannot
// be applied in the new device pair, we notify the client of an error and it
// will have to open a new stream.
if let Err(r) =
set_input_processing_params(self.input_unit, self.input_processing_params)
{
cubeb_log!(
"({:p}) Failed to set params of voiceprocessing. Error: {}",
self.stm_ptr,
r
);
return Err(r);
}
return Ok(());
}
if !self.output_unit.is_null() {
start_audiounit(self.output_unit)?;
}
self.units_running = true;
Ok(())
}
fn stop_audiounits(&mut self) {
self.debug_assert_is_on_stream_queue();
self.units_running = false;
if !self.input_unit.is_null() {
let r = stop_audiounit(self.input_unit);
assert!(r.is_ok());
}
if self.using_voice_processing_unit() {
// Handle the VoiceProcessIO case where there is a single unit.
// Always reset input processing params to VPIO defaults in case VPIO is reused later.
let vpio_defaults = InputProcessingParams::ECHO_CANCELLATION
| InputProcessingParams::AUTOMATIC_GAIN_CONTROL
| InputProcessingParams::NOISE_SUPPRESSION;
if let Err(r) = set_input_processing_params(self.input_unit, vpio_defaults) {
cubeb_log!(
"({:p}) Failed to reset params of voiceprocessing. Error: {}",
self.stm_ptr,
r
);
}
return;
}
if !self.output_unit.is_null() {
let r = stop_audiounit(self.output_unit);
assert!(r.is_ok());
}
}
fn has_input(&self) -> bool {
self.input_stream_params.rate() > 0
}
fn has_output(&self) -> bool {
self.output_stream_params.rate() > 0
}
fn using_voice_processing_unit(&self) -> bool {
self.voiceprocessing_unit_handle.is_some()
}
fn same_clock_domain(&self) -> bool {
self.debug_assert_is_on_stream_queue();
// If not setting up a duplex stream, there is only one device,
// no reclocking necessary.
if !(self.has_input() && self.has_output()) {
return true;
}
let input_domain = match get_clock_domain(self.input_device.id, DeviceType::INPUT) {
Ok(clock_domain) => clock_domain,
Err(_) => {
cubeb_log!("Coudn't determine clock domains for input.");
return false;
}
};
let output_domain = match get_clock_domain(self.output_device.id, DeviceType::OUTPUT) {
Ok(clock_domain) => clock_domain,
Err(_) => {
cubeb_log!("Coudn't determine clock domains for input.");
return false;
}
};
input_domain == output_domain
}
#[allow(non_upper_case_globals)]
fn should_force_vpio_for_input_device(id: AudioDeviceID) -> bool {
assert!(id != kAudioObjectUnknown);
debug_assert_running_serially();
match get_device_transport_type(id, DeviceType::INPUT) {
Ok(kAudioDeviceTransportTypeBuiltIn) => {
cubeb_log!(
"Input device {} is on the VPIO force list because it is built in, \
and its volume is known to be very low without VPIO whenever VPIO \
is hooked up to it elsewhere.",
id
);
true
}
_ => false,
}
}
fn should_block_vpio_for_device_pair(
&self,
in_device: &device_info,
out_device: &device_info,
) -> bool {
self.debug_assert_is_on_stream_queue();
cubeb_log!("Evaluating device pair against VPIO block list");
let log_device_and_get_model_uid = |id, devtype| -> String {
let device_model_uid = get_device_model_uid(id, devtype)
.map(|s| s.into_string())
.unwrap_or_default();
cubeb_log!("{} uid=\"{}\", model_uid=\"{}\", transport_type={:?}, source={:?}, source_name=\"{}\", name=\"{}\", manufacturer=\"{}\"",
if devtype == DeviceType::INPUT {
"Input"
} else {
debug_assert_eq!(devtype, DeviceType::OUTPUT);
"Output"
},
get_device_uid(id, devtype).map(|s| s.into_string()).unwrap_or_default(),
device_model_uid,
convert_uint32_into_string(get_device_transport_type(id, devtype).unwrap_or(0)),
convert_uint32_into_string(get_device_source(id, devtype).unwrap_or(0)),
get_device_source_name(id, devtype).map(|s| s.into_string()).unwrap_or_default(),
get_device_name(id, devtype).map(|s| s.into_string()).unwrap_or_default(),
get_device_manufacturer(id, devtype).map(|s| s.into_string()).unwrap_or_default());
device_model_uid
};
#[allow(non_upper_case_globals)]
let in_id = match in_device.id {
kAudioObjectUnknown => None,
id => Some(id),
};
#[allow(non_upper_case_globals)]
let out_id = match out_device.id {
kAudioObjectUnknown => None,
id => Some(id),
};
let (in_model_uid, out_model_uid) = (
in_id
.map(|id| log_device_and_get_model_uid(id, DeviceType::INPUT))
.unwrap_or_default(),
out_id
.or_else(|| get_default_device(DeviceType::OUTPUT))
.map(|id| log_device_and_get_model_uid(id, DeviceType::OUTPUT))
.unwrap_or_default(),
);
if in_model_uid.contains(APPLE_STUDIO_DISPLAY_USB_ID)
&& out_model_uid.contains(APPLE_STUDIO_DISPLAY_USB_ID)
{
cubeb_log!("Both input and output device is an Apple Studio Display. BLOCKED");
return true;
}
cubeb_log!("Device pair is not blocked");
false
}
fn create_audiounits(
&mut self,
shared_voice_processing_unit: &mut SharedVoiceProcessingUnitManager,
) -> Result<(device_info, device_info)> {
self.debug_assert_is_on_stream_queue();
let should_use_voice_processing_unit = self.has_input()
&& (self
.input_stream_params
.prefs()
.contains(StreamPrefs::VOICE)
|| CoreStreamData::should_force_vpio_for_input_device(self.input_device.id))
&& !self.should_block_vpio_for_device_pair(&self.input_device, &self.output_device)
&& macos_kernel_major_version() != Ok(MACOS_KERNEL_MAJOR_VERSION_MONTEREY);
let should_use_aggregate_device = {
// It's impossible to create an aggregate device from an aggregate device, and it's
// unnecessary to create an aggregate device when opening the same device input/output. In
// all other cases, use an aggregate device.
let mut either_already_aggregate = false;
if self.has_input() {
let input_is_aggregate =
get_device_transport_type(self.input_device.id, DeviceType::INPUT).unwrap_or(0)
== kAudioDeviceTransportTypeAggregate;
if input_is_aggregate {
either_already_aggregate = true;
}
cubeb_log!(
"Input device ID: {} (aggregate: {:?})",
self.input_device.id,
input_is_aggregate
);
}
if self.has_output() {
let output_is_aggregate =
get_device_transport_type(self.output_device.id, DeviceType::OUTPUT)
.unwrap_or(0)
== kAudioDeviceTransportTypeAggregate;
if output_is_aggregate {
either_already_aggregate = true;
}
cubeb_log!(
"Output device ID: {} (aggregate: {:?})",
self.output_device.id,
output_is_aggregate
);
}
// Only use an aggregate device when the device are different.
self.has_input()
&& self.has_output()
&& self.input_device.id != self.output_device.id
&& !either_already_aggregate
};
// Create an AudioUnit:
// - If we're eligible to use voice processing, try creating a VoiceProcessingIO AudioUnit.
// - If we should use an aggregate device, try creating one and input and output AudioUnits next.
// - As last resort, create regular AudioUnits. This is also the normal non-duplex path.
if should_use_voice_processing_unit {
if let Ok(mut au_handle) = get_voiceprocessing_audiounit(
shared_voice_processing_unit,
&self.input_device,
&self.output_device,
) {
self.input_unit = au_handle.as_mut().unit;
if self.has_output() {
self.output_unit = au_handle.as_mut().unit;
}
self.voiceprocessing_unit_handle = Some(au_handle);
return Ok((self.input_device.clone(), self.output_device.clone()));
}
cubeb_log!(
"({:p}) Failed to get VoiceProcessingIO AudioUnit. Trying a regular one.",
self.stm_ptr
);
}
if should_use_aggregate_device {
if let Ok(device) = AggregateDevice::new(self.input_device.id, self.output_device.id) {
let in_dev_info = {
device_info {
id: device.get_device_id(),
..self.input_device
}
};
let out_dev_info = {
device_info {
id: device.get_device_id(),
..self.output_device
}
};
match (
create_audiounit(&in_dev_info),
create_audiounit(&out_dev_info),
) {
(Ok(in_au), Ok(out_au)) => {
cubeb_log!(
"({:p}) Using an aggregate device {} for input and output.",
self.stm_ptr,
device.get_device_id()
);
self.aggregate_device = Some(device);
self.input_unit = in_au;
self.output_unit = out_au;
return Ok((in_dev_info, out_dev_info));
}
(Err(e), Ok(au)) => {
cubeb_log!(
"({:p}) Failed to create input AudioUnit for aggregate device. Error: {}.",
self.stm_ptr,
e
);
dispose_audio_unit(au);
}
(Ok(au), Err(e)) => {
cubeb_log!(
"({:p}) Failed to create output AudioUnit for aggregate device. Error: {}.",
self.stm_ptr,
e
);
dispose_audio_unit(au);
}
(Err(e), _) => {
cubeb_log!(
"({:p}) Failed to create AudioUnits for aggregate device. Error: {}.",
self.stm_ptr,
e
);
}
}
}
cubeb_log!(
"({:p}) Failed to set up aggregate device. Using regular AudioUnits.",
self.stm_ptr
);
}
if self.has_input() {
match create_audiounit(&self.input_device) {
Ok(in_au) => self.input_unit = in_au,
Err(e) => {
cubeb_log!(
"({:p}) Failed to create regular AudioUnit for input. Error: {}",
self.stm_ptr,
e
);
return Err(e);
}
}
}
if self.has_output() {
match create_audiounit(&self.output_device) {
Ok(out_au) => self.output_unit = out_au,
Err(e) => {
cubeb_log!(
"({:p}) Failed to create regular AudioUnit for output. Error: {}",
self.stm_ptr,
e
);
if !self.input_unit.is_null() {
dispose_audio_unit(self.input_unit);
self.input_unit = ptr::null_mut();
}
return Err(e);
}
}
}
Ok((self.input_device.clone(), self.output_device.clone()))
}
#[allow(clippy::cognitive_complexity)] // TODO: Refactoring.
fn setup(
&mut self,
shared_voice_processing_unit: &mut SharedVoiceProcessingUnitManager,
) -> Result<()> {
self.debug_assert_is_on_stream_queue();
if self
.input_stream_params
.prefs()
.contains(StreamPrefs::LOOPBACK)
|| self
.output_stream_params
.prefs()
.contains(StreamPrefs::LOOPBACK)
{
cubeb_log!("({:p}) Loopback not supported for audiounit.", self.stm_ptr);
return Err(Error::not_supported());
}
let same_clock_domain = self.same_clock_domain();
let (in_dev_info, out_dev_info) = self.create_audiounits(shared_voice_processing_unit)?;
let using_voice_processing_unit = self.using_voice_processing_unit();
assert!(!self.stm_ptr.is_null());
let stream = unsafe { &(*self.stm_ptr) };
#[cfg(feature = "audio-dump")]
unsafe {
ffi::cubeb_audio_dump_init(&mut self.audio_dump_session);
}
// Configure I/O stream
if self.has_input() {
assert!(!self.input_unit.is_null());
cubeb_log!(
"({:p}) Initializing input by device info: {:?}",
self.stm_ptr,
in_dev_info
);
let device_channel_count =
get_channel_count(self.input_device.id, DeviceType::INPUT).unwrap_or(0);
if device_channel_count < self.input_stream_params.channels() {
cubeb_log!(
"({:p}) Invalid input channel count; device={}, params={}",
self.stm_ptr,
device_channel_count,
self.input_stream_params.channels()
);
return Err(Error::invalid_parameter());
}
cubeb_log!(
"({:p}) Opening input side: rate {}, channels {}, format {:?}, layout {:?}, prefs {:?}, latency in frames {}, voice processing {}.",
self.stm_ptr,
self.input_stream_params.rate(),
self.input_stream_params.channels(),
self.input_stream_params.format(),
self.input_stream_params.layout(),
self.input_stream_params.prefs(),
stream.latency_frames,
using_voice_processing_unit
);
// Get input device hardware information.
let mut input_hw_desc = AudioStreamBasicDescription::default();
let mut size = mem::size_of::<AudioStreamBasicDescription>();
let r = audio_unit_get_property(
self.input_unit,
kAudioUnitProperty_StreamFormat,
if using_voice_processing_unit {
// With a VPIO unit the input scope includes AEC reference channels.
// We need to use the output scope of the input bus.
kAudioUnitScope_Output
} else {
// With a HAL unit the output scope for the input bus returns the number of
// output channels of the output device, i.e. it seems the bus is ignored.
kAudioUnitScope_Input
},
AU_IN_BUS,
&mut input_hw_desc,
&mut size,
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitGetProperty/input/kAudioUnitProperty_StreamFormat rv={}",
r
);
return Err(Error::error());
}
cubeb_log!(
"({:p}) Input hardware description: {:?}",
self.stm_ptr,
input_hw_desc
);
// Notice: when we are using aggregate device, the input_hw_desc.mChannelsPerFrame is
// the total of all the input channel count of the devices added in the aggregate device.
// Due to our aggregate device settings, the data captured by the output device's input
// channels will be put in the beginning of the raw data given by the input callback.
// Always request all the input channels of the device, and only pass the correct
// channels to the audio callback.
let params = unsafe {
let mut p = *self.input_stream_params.as_ptr();
p.channels = input_hw_desc.mChannelsPerFrame;
// Input AudioUnit must be configured with device's sample rate.
// we will resample inside input callback.
p.rate = input_hw_desc.mSampleRate as _;
StreamParams::from(p)
};
self.input_dev_desc = create_stream_description(¶ms).inspect_err(|_| {
cubeb_log!(
"({:p}) Setting format description for input failed.",
self.stm_ptr
);
})?;
#[cfg(feature = "audio-dump")]
{
let name = format!("input-{:p}.wav", self.stm_ptr);
let cname = CString::new(name).expect("OK");
let rv = unsafe {
ffi::cubeb_audio_dump_stream_init(
self.audio_dump_session,
&mut self.audio_dump_input,
*params.as_ptr(),
cname.as_ptr(),
)
};
if rv == 0 {
assert_ne!(self.audio_dump_input, ptr::null_mut(),);
cubeb_log!("Successfully inited audio dump for input");
} else {
cubeb_log!("Failed to init audio dump for input");
}
}
assert_eq!(self.input_dev_desc.mSampleRate, input_hw_desc.mSampleRate);
// Use latency to set buffer size
assert_ne!(stream.latency_frames, 0);
if let Err(r) =
set_buffer_size_sync(self.input_unit, DeviceType::INPUT, stream.latency_frames)
{
cubeb_log!("({:p}) Error in change input buffer size.", self.stm_ptr);
return Err(r);
}
let r = audio_unit_set_property(
self.input_unit,
kAudioUnitProperty_StreamFormat,
kAudioUnitScope_Output,
AU_IN_BUS,
&self.input_dev_desc,
mem::size_of::<AudioStreamBasicDescription>(),
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitSetProperty/input/kAudioUnitProperty_StreamFormat rv={}",
r
);
return Err(Error::error());
}
// Frames per buffer in the input callback.
let r = audio_unit_set_property(
self.input_unit,
kAudioUnitProperty_MaximumFramesPerSlice,
kAudioUnitScope_Global,
AU_IN_BUS,
&stream.latency_frames,
mem::size_of::<u32>(),
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitSetProperty/input/kAudioUnitProperty_MaximumFramesPerSlice rv={}",
r
);
return Err(Error::error());
}
// When we use the aggregate device, the self.input_dev_desc.mChannelsPerFrame is the
// total input channel count of all the device added in the aggregate device. However,
// we only need the audio data captured by the requested input device, so we need to
// ignore some data captured by the audio input of the requested output device (e.g.,
// the requested output device is a USB headset with built-in mic), in the beginning of
// the raw data taken from input callback.
self.input_buffer_manager = Some(BufferManager::new(
self.input_stream_params.format(),
SAFE_MAX_LATENCY_FRAMES as usize,
self.input_dev_desc.mChannelsPerFrame as usize,
self.input_dev_desc
.mChannelsPerFrame
.saturating_sub(device_channel_count) as usize,
self.input_stream_params.channels() as usize,
));
let aurcbs_in = AURenderCallbackStruct {
inputProc: Some(audiounit_input_callback),
inputProcRefCon: self.stm_ptr as *mut c_void,
};
let r = audio_unit_set_property(
self.input_unit,
kAudioOutputUnitProperty_SetInputCallback,
kAudioUnitScope_Global,
AU_OUT_BUS,
&aurcbs_in,
mem::size_of_val(&aurcbs_in),
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitSetProperty/input/kAudioOutputUnitProperty_SetInputCallback rv={}",
r
);
return Err(Error::error());
}
stream.frames_read.store(0, Ordering::SeqCst);
cubeb_log!(
"({:p}) Input audiounit init with device {} successfully.",
self.stm_ptr,
in_dev_info.id
);
}
if self.has_input() && !self.has_output() && using_voice_processing_unit {
// We must configure the output side of VPIO to match the input side, even if we don't use it.
let r = audio_unit_set_property(
self.input_unit,
kAudioUnitProperty_StreamFormat,
kAudioUnitScope_Input,
AU_OUT_BUS,
&self.input_dev_desc,
mem::size_of::<AudioStreamBasicDescription>(),
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitSetProperty/output/kAudioUnitProperty_StreamFormat rv={}",
r
);
return Err(Error::error());
}
}
if self.has_output() {
assert!(!self.output_unit.is_null());
cubeb_log!(
"({:p}) Initialize output by device info: {:?}",
self.stm_ptr,
out_dev_info
);
cubeb_log!(
"({:p}) Opening output side: rate {}, channels {}, format {:?}, layout {:?}, prefs {:?}, latency in frames {}, voice processing {}.",
self.stm_ptr,
self.output_stream_params.rate(),
self.output_stream_params.channels(),
self.output_stream_params.format(),
self.output_stream_params.layout(),
self.output_stream_params.prefs(),
stream.latency_frames,
using_voice_processing_unit
);
// Get output device hardware information.
let mut output_hw_desc = AudioStreamBasicDescription::default();
let mut size = mem::size_of::<AudioStreamBasicDescription>();
let r = audio_unit_get_property(
self.output_unit,
kAudioUnitProperty_StreamFormat,
if using_voice_processing_unit {
// With a VPIO unit the output scope includes all channels in the hw.
// The VPIO unit however is only MONO which the input scope reflects.
kAudioUnitScope_Input
} else {
// With a HAL unit the output scope for the output bus returns the number of
// output channels of the hw, as we want. The input scope seems limited to
// two channels.
kAudioUnitScope_Output
},
AU_OUT_BUS,
&mut output_hw_desc,
&mut size,
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitGetProperty/output/kAudioUnitProperty_StreamFormat rv={}",
r
);
return Err(Error::error());
}
cubeb_log!(
"({:p}) Output hardware description: {:?}",
self.stm_ptr,
output_hw_desc
);
// This has been observed in the wild.
if output_hw_desc.mChannelsPerFrame == 0 {
cubeb_log!(
"({:p}) Output hardware description channel count is zero",
self.stm_ptr
);
return Err(Error::error());
}
// Simple case of stereo output, map to the stereo pair (that might not be the first
// two channels). Fall back to regular mixing if this fails.
let mut maybe_need_mixer = true;
if self.output_stream_params.channels() == 2
&& self.output_stream_params.layout() == ChannelLayout::STEREO
{
let layout = AudioChannelLayout {
mChannelLayoutTag: kAudioChannelLayoutTag_Stereo,
..Default::default()
};
let r = audio_unit_set_property(
self.output_unit,
kAudioUnitProperty_AudioChannelLayout,
kAudioUnitScope_Input,
AU_OUT_BUS,
&layout,
mem::size_of::<AudioChannelLayout>(),
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitSetProperty/output/kAudioUnitProperty_AudioChannelLayout rv={}",
r
);
}
maybe_need_mixer = r != NO_ERR;
}
// Notice: when we are using aggregate device, the output_hw_desc.mChannelsPerFrame is
// the total of all the output channel count of the devices added in the aggregate device.
// Due to our aggregate device settings, the data recorded by the input device's output
// channels will be appended at the end of the raw data given by the output callback.
let params = unsafe {
let mut p = *self.output_stream_params.as_ptr();
p.channels = if maybe_need_mixer {
output_hw_desc.mChannelsPerFrame
} else {
self.output_stream_params.channels()
};
if using_voice_processing_unit {
// VPIO will always use the sample rate of the input hw for both input and output,
// as reported to us. (We can override it but we cannot improve quality this way).
p.rate = self.input_dev_desc.mSampleRate as _;
}
StreamParams::from(p)
};
self.output_dev_desc = create_stream_description(¶ms).inspect_err(|_| {
cubeb_log!(
"({:p}) Could not initialize the audio stream description.",
self.stm_ptr
);
})?;
#[cfg(feature = "audio-dump")]
{
let name = format!("output-{:p}.wav", self.stm_ptr);
let cname = CString::new(name).expect("OK");
let rv = unsafe {
ffi::cubeb_audio_dump_stream_init(
self.audio_dump_session,
&mut self.audio_dump_output,
*params.as_ptr(),
cname.as_ptr(),
)
};
if rv == 0 {
assert_ne!(self.audio_dump_output, ptr::null_mut(),);
cubeb_log!("Successfully inited audio dump for output");
} else {
cubeb_log!("Failed to init audio dump for output");
}
}
let device_layout = self
.get_output_channel_layout()
.inspect_err(|_| {
cubeb_log!(
"({:p}) Could not get any channel layout. Defaulting to no channels.",
self.stm_ptr
);
})
.unwrap_or_default();
cubeb_log!(
"({:p} Using output device channel layout {:?}",
self.stm_ptr,
device_layout
);
if maybe_need_mixer {
// The mixer will be set up when
// 0. not playing simply stereo, or failing to set the channel layout to the stereo
// pair
// 1. using aggregate device whose input device has output channels
// 2. output device has more channels than we need, and stream isn't simply stereo
// 3. output device has different layout than the one we have
self.mixer = if self.output_dev_desc.mChannelsPerFrame
!= self.output_stream_params.channels()
|| device_layout != mixer::get_channel_order(self.output_stream_params.layout())
{
cubeb_log!("Incompatible channel layouts detected, setting up remixer");
// We will be remixing the data before it reaches the output device.
Some(Mixer::new(
self.output_stream_params.format(),
self.output_stream_params.channels() as usize,
self.output_stream_params.layout(),
self.output_dev_desc.mChannelsPerFrame as usize,
device_layout,
))
} else {
None
};
}
let r = audio_unit_set_property(
self.output_unit,
kAudioUnitProperty_StreamFormat,
kAudioUnitScope_Input,
AU_OUT_BUS,
&self.output_dev_desc,
mem::size_of::<AudioStreamBasicDescription>(),
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitSetProperty/output/kAudioUnitProperty_StreamFormat rv={}",
r
);
return Err(Error::error());
}
// Use latency to set buffer size
assert_ne!(stream.latency_frames, 0);
if let Err(r) =
set_buffer_size_sync(self.output_unit, DeviceType::OUTPUT, stream.latency_frames)
{
cubeb_log!("({:p}) Error in change output buffer size.", self.stm_ptr);
return Err(r);
}
// Frames per buffer in the input callback.
let r = audio_unit_set_property(
self.output_unit,
kAudioUnitProperty_MaximumFramesPerSlice,
kAudioUnitScope_Global,
AU_OUT_BUS,
&stream.latency_frames,
mem::size_of::<u32>(),
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitSetProperty/output/kAudioUnitProperty_MaximumFramesPerSlice rv={}",
r
);
return Err(Error::error());
}
let aurcbs_out = AURenderCallbackStruct {
inputProc: Some(audiounit_output_callback),
inputProcRefCon: self.stm_ptr as *mut c_void,
};
let r = audio_unit_set_property(
self.output_unit,
kAudioUnitProperty_SetRenderCallback,
kAudioUnitScope_Global,
AU_OUT_BUS,
&aurcbs_out,
mem::size_of_val(&aurcbs_out),
);
if r != NO_ERR {
cubeb_log!(
"AudioUnitSetProperty/output/kAudioUnitProperty_SetRenderCallback rv={}",
r
);
return Err(Error::error());
}
stream.frames_written.store(0, Ordering::SeqCst);
cubeb_log!(
"({:p}) Output audiounit init with device {} successfully.",
self.stm_ptr,
out_dev_info.id
);
}
// We use a resampler because input AudioUnit operates
// reliable only in the capture device sample rate.
// Resampler will convert it to the user sample rate
// and deliver it to the callback.
let target_sample_rate = if self.has_input() {
self.input_stream_params.rate()
} else {
assert!(self.has_output());
self.output_stream_params.rate()
};
let resampler_input_params = if self.has_input() {
let mut p = unsafe { *(self.input_stream_params.as_ptr()) };
p.rate = self.input_dev_desc.mSampleRate as u32;
Some(p)
} else {
None
};
let resampler_output_params = if self.has_output() {
let mut p = unsafe { *(self.output_stream_params.as_ptr()) };
p.rate = self.output_dev_desc.mSampleRate as u32;
Some(p)
} else {
None
};
// Only reclock if there is an input and we couldn't use an aggregate device, and the
// devices are not part of the same clock domain.
let reclock_policy = if self.aggregate_device.is_none()
&& !using_voice_processing_unit
&& !same_clock_domain
{
cubeb_log!(
"Reclocking duplex steam using_aggregate_device={} same_clock_domain={}",
self.aggregate_device.is_some(),
same_clock_domain
);
ffi::CUBEB_RESAMPLER_RECLOCK_INPUT
} else {
ffi::CUBEB_RESAMPLER_RECLOCK_NONE
};
self.resampler = Resampler::new(
self.stm_ptr as *mut ffi::cubeb_stream,
resampler_input_params,
resampler_output_params,
target_sample_rate,
stream.data_callback,
stream.user_ptr,
ffi::CUBEB_RESAMPLER_QUALITY_DESKTOP,
reclock_policy,
);
// In duplex, the input thread might be different from the output thread, and we're logging
// everything from the output thread: relay the audio input callback information using a
// ring buffer to diagnose issues.
if self.has_input() && self.has_output() {
self.input_logging = Some(InputCallbackLogger::new());
}
#[cfg(feature = "audio-dump")]
{
unsafe { ffi::cubeb_audio_dump_start(self.audio_dump_session) };
self.audio_dump_session_running = true;
}
if !self.input_unit.is_null() {
let r = audio_unit_initialize(self.input_unit);
if r != NO_ERR {
cubeb_log!("AudioUnitInitialize/input rv={}", r);
return Err(Error::error());
}
stream.input_device_latency_frames.store(
get_fixed_latency(self.input_device.id, DeviceType::INPUT),
Ordering::SeqCst,
);
}
if !self.output_unit.is_null() {
if self.input_unit != self.output_unit {
let r = audio_unit_initialize(self.output_unit);
if r != NO_ERR {
cubeb_log!("AudioUnitInitialize/output rv={}", r);
return Err(Error::error());
}
}
stream.output_device_latency_frames.store(
get_fixed_latency(self.output_device.id, DeviceType::OUTPUT),
Ordering::SeqCst,
);
let mut unit_s: f64 = 0.0;
let mut size = mem::size_of_val(&unit_s);
if audio_unit_get_property(
self.output_unit,
kAudioUnitProperty_Latency,
kAudioUnitScope_Global,
0,
&mut unit_s,
&mut size,
) == NO_ERR
{
stream.output_device_latency_frames.fetch_add(
(unit_s * self.output_dev_desc.mSampleRate) as u32,
Ordering::SeqCst,
);
}
}
if using_voice_processing_unit {
// The VPIO AudioUnit automatically ducks other audio streams on the VPIO
// output device. Its ramp duration is 0.5s when ducking, so unduck similarly
// now.
// NOTE: On MacOS 14 the ducking happens on creation of the VPIO AudioUnit.
// On MacOS 10.15 it happens on both creation and initialization, which
// is why we defer the unducking until now.
#[allow(non_upper_case_globals)]
let mut device = match self.output_device.id {
kAudioObjectUnknown => None,
id => Some(id),
};
device = device.or_else(|| get_default_device(DeviceType::OUTPUT));
match device {
None => {
cubeb_log!(
"({:p}) No output device to undo vpio ducking on",
self.stm_ptr
);
}
Some(id) => {
let r = audio_device_duck(id, 1.0, ptr::null_mut(), 0.5);
if r != NO_ERR {
cubeb_log!(
"({:p}) Failed to undo ducking of voiceprocessing on output device {}. Proceeding... Error: {}",
self.stm_ptr,
id,
r
);
}
}
};
// Always try to remember the applied input mute state. If it cannot be applied
// to the new device pair, we notify the client of an error and it will have to
// open a new stream.
if let Err(r) = set_input_mute(self.input_unit, self.input_mute) {
cubeb_log!(
"({:p}) Failed to set mute state of voiceprocessing. Error: {}",
self.stm_ptr,
r
);
return Err(r);
}
}
if let Err(r) = self.install_system_changed_callback() {
cubeb_log!(
"({:p}) Could not install the device change callback.",
self.stm_ptr
);
return Err(r);
}
if let Err(r) = self.install_device_changed_callback() {
cubeb_log!(
"({:p}) Could not install all device change callback.",
self.stm_ptr
);
return Err(r);
}
// We have either default_input_listener or input_alive_listener.
// We cannot have both of them at the same time.
assert!(
!self.has_input()
|| ((self.default_input_listener.is_some() != self.input_alive_listener.is_some())
&& (self.default_input_listener.is_some()
|| self.input_alive_listener.is_some()))
);
// We have either default_output_listener or output_alive_listener.
// We cannot have both of them at the same time.
assert!(
!self.has_output()
|| ((self.default_output_listener.is_some()
!= self.output_alive_listener.is_some())
&& (self.default_output_listener.is_some()
|| self.output_alive_listener.is_some()))
);
Ok(())
}
fn close(&mut self) {
self.debug_assert_is_on_stream_queue();
if !self.input_unit.is_null() {
audio_unit_uninitialize(self.input_unit);
if self.using_voice_processing_unit() {
// Handle the VoiceProcessIO case where there is a single unit.
self.output_unit = ptr::null_mut();
}
// Cannot unset self.input_unit yet, since the output callback might be live
// and reading it.
}
if !self.output_unit.is_null() {
audio_unit_uninitialize(self.output_unit);
dispose_audio_unit(self.output_unit);
self.output_unit = ptr::null_mut();
}
if !self.input_unit.is_null() {
if !self.using_voice_processing_unit() {
// The VPIO unit is shared and must not be disposed.
dispose_audio_unit(self.input_unit);
}
self.input_unit = ptr::null_mut();
}
// Return the VPIO unit if present.
self.voiceprocessing_unit_handle = None;
#[cfg(feature = "audio-dump")]
{
if !self.audio_dump_session.is_null() {
unsafe {
ffi::cubeb_audio_dump_stop(self.audio_dump_session);
if !self.audio_dump_input.is_null() {
let rv = ffi::cubeb_audio_dump_stream_shutdown(
self.audio_dump_session,
self.audio_dump_input,
);
if rv != 0 {
cubeb_log!("Failed to shutdown audio dump for input");
}
}
if !self.audio_dump_output.is_null() {
let rv = ffi::cubeb_audio_dump_stream_shutdown(
self.audio_dump_session,
self.audio_dump_output,
);
if rv != 0 {
cubeb_log!("Failed to shutdown audio dump for output");
}
}
ffi::cubeb_audio_dump_shutdown(self.audio_dump_session);
self.audio_dump_session = ptr::null_mut();
self.audio_dump_session_running = false;
}
}
}
self.resampler.destroy();
self.mixer = None;
self.aggregate_device = None;
if self.uninstall_system_changed_callback().is_err() {
cubeb_log!(
"({:p}) Could not uninstall the system changed callback",
self.stm_ptr
);
}
if self.uninstall_device_changed_callback().is_err() {
cubeb_log!(
"({:p}) Could not uninstall all device change listeners",
self.stm_ptr
);
}
}
fn install_device_changed_callback(&mut self) -> Result<()> {
self.debug_assert_is_on_stream_queue();
assert!(!self.stm_ptr.is_null());
let stm = unsafe { &(*self.stm_ptr) };
if !self.output_unit.is_null() {
assert_ne!(self.output_device.id, kAudioObjectUnknown);
assert_ne!(self.output_device.id, kAudioObjectSystemObject);
assert!(
self.output_source_listener.is_none(),
"register output_source_listener without unregistering the one in use"
);
assert!(
self.output_alive_listener.is_none(),
"register output_alive_listener without unregistering the one in use"
);
// Get the notification when the data source on the same device changes,
// e.g., when the user plugs in a TRRS headset into the headphone jack.
self.output_source_listener = Some(device_property_listener::new(
self.output_device.id,
get_property_address(Property::DeviceSource, DeviceType::OUTPUT),
audiounit_property_listener_callback,
));
let rv = stm.add_device_listener(self.output_source_listener.as_ref().unwrap());
if rv != NO_ERR {
self.output_source_listener = None;
cubeb_log!("AudioObjectAddPropertyListener/output/kAudioDevicePropertyDataSource rv={}, device id={}", rv, self.output_device.id);
return Err(Error::error());
}
// Get the notification when the output device is going away
// if the output doesn't follow the system default.
if !self
.output_device
.flags
.contains(device_flags::DEV_SELECTED_DEFAULT)
{
self.output_alive_listener = Some(device_property_listener::new(
self.output_device.id,
get_property_address(
Property::DeviceIsAlive,
DeviceType::INPUT | DeviceType::OUTPUT,
),
audiounit_property_listener_callback,
));
let rv = stm.add_device_listener(self.output_alive_listener.as_ref().unwrap());
if rv != NO_ERR {
self.output_alive_listener = None;
cubeb_log!("AudioObjectAddPropertyListener/output/kAudioDevicePropertyDeviceIsAlive rv={}, device id ={}", rv, self.output_device.id);
return Err(Error::error());
}
}
}
if !self.input_unit.is_null() {
assert_ne!(self.input_device.id, kAudioObjectUnknown);
assert_ne!(self.input_device.id, kAudioObjectSystemObject);
assert!(
self.input_source_listener.is_none(),
"register input_source_listener without unregistering the one in use"
);
assert!(
self.input_alive_listener.is_none(),
"register input_alive_listener without unregistering the one in use"
);
// Get the notification when the data source on the same device changes,
// e.g., when the user plugs in a TRRS mic into the headphone jack.
self.input_source_listener = Some(device_property_listener::new(
self.input_device.id,
get_property_address(Property::DeviceSource, DeviceType::INPUT),
audiounit_property_listener_callback,
));
let rv = stm.add_device_listener(self.input_source_listener.as_ref().unwrap());
if rv != NO_ERR {
self.input_source_listener = None;
cubeb_log!("AudioObjectAddPropertyListener/input/kAudioDevicePropertyDataSource rv={}, device id={}", rv, self.input_device.id);
return Err(Error::error());
}
// Get the notification when the input device is going away
// if the input doesn't follow the system default.
if !self
.input_device
.flags
.contains(device_flags::DEV_SELECTED_DEFAULT)
{
self.input_alive_listener = Some(device_property_listener::new(
self.input_device.id,
get_property_address(
Property::DeviceIsAlive,
DeviceType::INPUT | DeviceType::OUTPUT,
),
audiounit_property_listener_callback,
));
let rv = stm.add_device_listener(self.input_alive_listener.as_ref().unwrap());
if rv != NO_ERR {
self.input_alive_listener = None;
cubeb_log!("AudioObjectAddPropertyListener/input/kAudioDevicePropertyDeviceIsAlive rv={}, device id ={}", rv, self.input_device.id);
return Err(Error::error());
}
}
}
Ok(())
}
fn install_system_changed_callback(&mut self) -> Result<()> {
self.debug_assert_is_on_stream_queue();
assert!(!self.stm_ptr.is_null());
let stm = unsafe { &(*self.stm_ptr) };
if !self.output_unit.is_null()
&& self
.output_device
.flags
.contains(device_flags::DEV_SELECTED_DEFAULT)
{
assert!(
self.default_output_listener.is_none(),
"register default_output_listener without unregistering the one in use"
);
// Get the notification when the default output audio changes, e.g.,
// when the user plugs in a USB headset and the system chooses it automatically as the default,
// or when another device is chosen in the dropdown list.
self.default_output_listener = Some(device_property_listener::new(
kAudioObjectSystemObject,
get_property_address(
Property::HardwareDefaultOutputDevice,
DeviceType::INPUT | DeviceType::OUTPUT,
),
audiounit_property_listener_callback,
));
let r = stm.add_device_listener(self.default_output_listener.as_ref().unwrap());
if r != NO_ERR {
self.default_output_listener = None;
cubeb_log!("AudioObjectAddPropertyListener/output/kAudioHardwarePropertyDefaultOutputDevice rv={}", r);
return Err(Error::error());
}
}
if !self.input_unit.is_null()
&& self
.input_device
.flags
.contains(device_flags::DEV_SELECTED_DEFAULT)
{
assert!(
self.default_input_listener.is_none(),
"register default_input_listener without unregistering the one in use"
);
// Get the notification when the default intput audio changes, e.g.,
// when the user plugs in a USB mic and the system chooses it automatically as the default,
// or when another device is chosen in the system preference.
self.default_input_listener = Some(device_property_listener::new(
kAudioObjectSystemObject,
get_property_address(
Property::HardwareDefaultInputDevice,
DeviceType::INPUT | DeviceType::OUTPUT,
),
audiounit_property_listener_callback,
));
let r = stm.add_device_listener(self.default_input_listener.as_ref().unwrap());
if r != NO_ERR {
self.default_input_listener = None;
cubeb_log!("AudioObjectAddPropertyListener/input/kAudioHardwarePropertyDefaultInputDevice rv={}", r);
return Err(Error::error());
}
}
Ok(())
}
fn uninstall_device_changed_callback(&mut self) -> Result<()> {
self.debug_assert_is_on_stream_queue();
if self.stm_ptr.is_null() {
assert!(
self.output_source_listener.is_none()
&& self.output_alive_listener.is_none()
&& self.input_source_listener.is_none()
&& self.input_alive_listener.is_none()
);
return Ok(());
}
let stm = unsafe { &(*self.stm_ptr) };
// Failing to uninstall listeners is not a fatal error.
let mut r = Ok(());
if self.output_source_listener.is_some() {
let rv = stm.remove_device_listener(self.output_source_listener.as_ref().unwrap());
if rv != NO_ERR {
cubeb_log!("AudioObjectRemovePropertyListener/output/kAudioDevicePropertyDataSource rv={}, device id={}", rv, self.output_device.id);
r = Err(Error::error());
}
self.output_source_listener = None;
}
if self.output_alive_listener.is_some() {
let rv = stm.remove_device_listener(self.output_alive_listener.as_ref().unwrap());
if rv != NO_ERR {
cubeb_log!("AudioObjectRemovePropertyListener/output/kAudioDevicePropertyDeviceIsAlive rv={}, device id={}", rv, self.output_device.id);
r = Err(Error::error());
}
self.output_alive_listener = None;
}
if self.input_source_listener.is_some() {
let rv = stm.remove_device_listener(self.input_source_listener.as_ref().unwrap());
if rv != NO_ERR {
cubeb_log!("AudioObjectRemovePropertyListener/input/kAudioDevicePropertyDataSource rv={}, device id={}", rv, self.input_device.id);
r = Err(Error::error());
}
self.input_source_listener = None;
}
if self.input_alive_listener.is_some() {
let rv = stm.remove_device_listener(self.input_alive_listener.as_ref().unwrap());
if rv != NO_ERR {
cubeb_log!("AudioObjectRemovePropertyListener/input/kAudioDevicePropertyDeviceIsAlive rv={}, device id={}", rv, self.input_device.id);
r = Err(Error::error());
}
self.input_alive_listener = None;
}
r
}
fn uninstall_system_changed_callback(&mut self) -> Result<()> {
self.debug_assert_is_on_stream_queue();
if self.stm_ptr.is_null() {
assert!(
self.default_output_listener.is_none() && self.default_input_listener.is_none()
);
return Ok(());
}
let stm = unsafe { &(*self.stm_ptr) };
if self.default_output_listener.is_some() {
let r = stm.remove_device_listener(self.default_output_listener.as_ref().unwrap());
if r != NO_ERR {
return Err(Error::error());
}
self.default_output_listener = None;
}
if self.default_input_listener.is_some() {
let r = stm.remove_device_listener(self.default_input_listener.as_ref().unwrap());
if r != NO_ERR {
return Err(Error::error());
}
self.default_input_listener = None;
}
Ok(())
}
fn get_output_channel_layout(&self) -> Result<Vec<mixer::Channel>> {
self.debug_assert_is_on_stream_queue();
assert!(!self.output_unit.is_null());
if self.using_voice_processing_unit() {
return Ok(get_channel_order(ChannelLayout::MONO));
}
get_channel_layout(self.output_unit)
}
}
impl<'ctx> Drop for CoreStreamData<'ctx> {
fn drop(&mut self) {
self.debug_assert_is_on_stream_queue();
self.stop_audiounits();
self.close();
}
}
#[derive(Debug, Clone)]
struct OutputCallbackTimingData {
frames_queued: u64,
timestamp: u64,
buffer_size: u64,
}
// The fisrt two members of the Cubeb stream must be a pointer to its Cubeb context and a void user
// defined pointer. The Cubeb interface use this assumption to operate the Cubeb APIs.
// #[repr(C)] is used to prevent any padding from being added in the beginning of the AudioUnitStream.
#[repr(C)]
#[derive(Debug)]
struct AudioUnitStream<'ctx> {
context: &'ctx mut AudioUnitContext,
user_ptr: *mut c_void,
// Task queue for the stream.
queue: Queue,
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
device_changed_callback: Mutex<ffi::cubeb_device_changed_callback>,
// Frame counters
frames_queued: u64,
// How many frames got read from the input since the stream started (includes
// padded silence)
frames_read: AtomicUsize,
// How many frames got written to the output device since the stream started
frames_written: AtomicUsize,
stopped: AtomicBool,
draining: AtomicBool,
reinit_pending: AtomicBool,
delayed_reinit: bool,
destroy_pending: AtomicBool,
// Latency requested by the user.
latency_frames: u32,
// Fixed latency, characteristic of the device.
output_device_latency_frames: AtomicU32,
input_device_latency_frames: AtomicU32,
// Total latency: the latency of the device + the OS latency
total_output_latency_frames: AtomicU32,
total_input_latency_frames: AtomicU32,
output_callback_timing_data_read: triple_buffer::Output<OutputCallbackTimingData>,
output_callback_timing_data_write: triple_buffer::Input<OutputCallbackTimingData>,
prev_position: u64,
// This is true if a device change callback is currently running.
switching_device: AtomicBool,
core_stream_data: CoreStreamData<'ctx>,
}
impl<'ctx> AudioUnitStream<'ctx> {
fn new(
context: &'ctx mut AudioUnitContext,
user_ptr: *mut c_void,
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
latency_frames: u32,
) -> Self {
let output_callback_timing_data =
triple_buffer::TripleBuffer::new(OutputCallbackTimingData {
frames_queued: 0,
timestamp: 0,
buffer_size: 0,
});
let (output_callback_timing_data_write, output_callback_timing_data_read) =
output_callback_timing_data.split();
let queue = context.serial_queue.clone();
AudioUnitStream {
context,
user_ptr,
queue,
data_callback,
state_callback,
device_changed_callback: Mutex::new(None),
frames_queued: 0,
frames_read: AtomicUsize::new(0),
frames_written: AtomicUsize::new(0),
stopped: AtomicBool::new(true),
draining: AtomicBool::new(false),
reinit_pending: AtomicBool::new(false),
delayed_reinit: false,
destroy_pending: AtomicBool::new(false),
latency_frames,
output_device_latency_frames: AtomicU32::new(0),
input_device_latency_frames: AtomicU32::new(0),
total_output_latency_frames: AtomicU32::new(0),
total_input_latency_frames: AtomicU32::new(0),
output_callback_timing_data_write,
output_callback_timing_data_read,
prev_position: 0,
switching_device: AtomicBool::new(false),
core_stream_data: CoreStreamData::default(),
}
}
fn add_device_listener(&self, listener: &device_property_listener) -> OSStatus {
self.queue.debug_assert_is_current();
audio_object_add_property_listener(
listener.device,
&listener.property,
listener.listener,
self as *const Self as *mut c_void,
)
}
fn remove_device_listener(&self, listener: &device_property_listener) -> OSStatus {
self.queue.debug_assert_is_current();
audio_object_remove_property_listener(
listener.device,
&listener.property,
listener.listener,
self as *const Self as *mut c_void,
)
}
fn notify_state_changed(&self, state: State) {
if self.state_callback.is_none() {
return;
}
let callback = self.state_callback.unwrap();
unsafe {
callback(
self as *const AudioUnitStream as *mut ffi::cubeb_stream,
self.user_ptr,
state.into(),
);
}
}
fn reinit(&mut self) -> Result<()> {
self.queue.debug_assert_is_current();
// Call stop_audiounits to avoid potential data race. If there is a running data callback,
// which locks a mutex inside CoreAudio framework, then this call will block the current
// thread until the callback is finished since this call asks to lock a mutex inside
// CoreAudio framework that is used by the data callback.
if !self.stopped.load(Ordering::SeqCst) {
self.core_stream_data.stop_audiounits();
}
if self.stopped.load(Ordering::SeqCst) {
// Something stopped the stream, reinit on next start
self.delayed_reinit = true;
return Ok(());
}
debug_assert!(
!self.core_stream_data.input_unit.is_null()
|| !self.core_stream_data.output_unit.is_null()
);
let vol_rv = if self.core_stream_data.output_unit.is_null() {
Err(Error::error())
} else {
get_volume(self.core_stream_data.output_unit)
};
self.core_stream_data.close();
// Use the new default device if this stream was set to follow the output device.
if self.core_stream_data.has_output()
&& self
.core_stream_data
.output_device
.flags
.contains(device_flags::DEV_SELECTED_DEFAULT)
{
cubeb_log!("Using new default output device");
self.core_stream_data.output_device =
match create_device_info(kAudioObjectUnknown, DeviceType::OUTPUT) {
None => {
cubeb_log!("Fail to create device info for output");
return Err(Error::error());
}
Some(d) => d,
};
}
// Likewise, for the input side
if self.core_stream_data.has_input()
&& self
.core_stream_data
.input_device
.flags
.contains(device_flags::DEV_SELECTED_DEFAULT)
{
cubeb_log!("Using new default input device");
self.core_stream_data.input_device =
match create_device_info(kAudioObjectUnknown, DeviceType::INPUT) {
None => {
cubeb_log!("Fail to create device info for input");
return Err(Error::error());
}
Some(d) => d,
}
}
cubeb_log!("Reinit: setup");
self.core_stream_data
.setup(&mut self.context.shared_voice_processing_unit)
.inspect_err(|_| {
cubeb_log!("({:p}) Setup failed.", self.core_stream_data.stm_ptr);
})?;
if let Ok(volume) = vol_rv {
set_volume(self.core_stream_data.output_unit, volume);
}
// If the stream was running, start it again.
if !self.stopped.load(Ordering::SeqCst) {
self.core_stream_data.start_audiounits().inspect_err(|_| {
cubeb_log!(
"({:p}) Start audiounit failed.",
self.core_stream_data.stm_ptr
);
})?;
}
Ok(())
}
fn reinit_async(&mut self) {
if self.reinit_pending.swap(true, Ordering::SeqCst) {
// A reinit task is already pending, nothing more to do.
cubeb_log!(
"({:p}) re-init stream task already pending, cancelling request",
self as *const AudioUnitStream
);
return;
}
let queue = self.queue.clone();
// Use a new thread, through the queue, to avoid deadlock when calling
// Get/SetProperties method from inside notify callback
queue.run_async(move || {
cubeb_log!("Reinitialization of stream");
let stm_ptr = self as *const AudioUnitStream;
if self.destroy_pending.load(Ordering::SeqCst) {
cubeb_log!(
"({:p}) stream pending destroy, cancelling reinit task",
stm_ptr
);
return;
}
if self.reinit().is_err() {
self.core_stream_data.close();
self.notify_state_changed(State::Error);
cubeb_log!(
"({:p}) Could not reopen the stream after switching.",
stm_ptr
);
}
self.switching_device.store(false, Ordering::SeqCst);
self.reinit_pending.store(false, Ordering::SeqCst);
});
}
fn close_on_error(&mut self) {
self.queue.debug_assert_is_current();
let stm_ptr = self as *const AudioUnitStream;
self.core_stream_data.close();
self.notify_state_changed(State::Error);
cubeb_log!("({:p}) Close the stream due to an error.", stm_ptr);
self.switching_device.store(false, Ordering::SeqCst);
}
fn destroy_internal(&mut self) {
self.queue.debug_assert_is_current();
self.core_stream_data.close();
assert!(self.context.active_streams() >= 1);
self.context.update_latency_by_removing_stream();
}
fn destroy(&mut self) {
self.queue.debug_assert_is_current();
if self
.core_stream_data
.uninstall_system_changed_callback()
.is_err()
{
cubeb_log!(
"({:p}) Could not uninstall the system changed callback",
self as *const AudioUnitStream
);
}
if self
.core_stream_data
.uninstall_device_changed_callback()
.is_err()
{
cubeb_log!(
"({:p}) Could not uninstall all device change listeners",
self as *const AudioUnitStream
);
}
// Execute the stream destroy work.
self.destroy_pending.store(true, Ordering::SeqCst);
// Call stop_audiounits to avoid potential data race. If there is a running data callback,
// which locks a mutex inside CoreAudio framework, then this call will block the current
// thread until the callback is finished since this call asks to lock a mutex inside
// CoreAudio framework that is used by the data callback.
if !self.stopped.swap(true, Ordering::SeqCst) {
self.core_stream_data.stop_audiounits();
}
self.destroy_internal();
cubeb_log!(
"Cubeb stream ({:p}) destroyed successful.",
self as *const AudioUnitStream
);
}
}
impl<'ctx> Drop for AudioUnitStream<'ctx> {
fn drop(&mut self) {
// Execute destroy in serial queue to avoid collision with reinit when un/plug devices
self.queue.clone().run_final(|| {
self.destroy();
self.core_stream_data = CoreStreamData::default();
});
}
}
impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
fn start(&mut self) -> Result<()> {
let was_stopped = self.stopped.load(Ordering::SeqCst);
let was_draining = self.draining.load(Ordering::SeqCst);
self.stopped.store(false, Ordering::SeqCst);
self.draining.store(false, Ordering::SeqCst);
self.queue
.clone()
.run_sync(|| -> Result<()> {
// Need reinitialization: device was changed when paused. It will be started after
// reinit because self.stopped is false.
if self.delayed_reinit {
let rv = self.reinit().inspect_err(|_| {
cubeb_log!(
"({:p}) delayed reinit during start failed.",
self.core_stream_data.stm_ptr
);
});
// In case of failure, restore the state
if rv.is_err() {
self.stopped.store(was_stopped, Ordering::SeqCst);
self.draining.store(was_draining, Ordering::SeqCst);
return rv;
}
self.delayed_reinit = false;
Ok(())
} else {
// Execute start in serial queue to avoid racing with destroy or reinit.
let rv = self.core_stream_data.start_audiounits();
if rv.is_err() {
cubeb_log!("({:p}) start failed.", self.core_stream_data.stm_ptr);
self.stopped.store(was_stopped, Ordering::SeqCst);
self.draining.store(was_draining, Ordering::SeqCst);
return rv;
}
Ok(())
}
})
.unwrap()?;
self.notify_state_changed(State::Started);
cubeb_log!(
"Cubeb stream ({:p}) started successfully.",
self as *const AudioUnitStream
);
Ok(())
}
fn stop(&mut self) -> Result<()> {
if !self.stopped.swap(true, Ordering::SeqCst) {
// Execute stop in serial queue to avoid racing with destroy or reinit.
self.queue
.clone()
.run_sync(|| self.core_stream_data.stop_audiounits());
self.notify_state_changed(State::Stopped);
cubeb_log!(
"Cubeb stream ({:p}) stopped successfully.",
self as *const AudioUnitStream
);
}
Ok(())
}
fn position(&mut self) -> Result<u64> {
let OutputCallbackTimingData {
frames_queued,
timestamp,
buffer_size,
} = self.output_callback_timing_data_read.read().clone();
let total_output_latency_frames =
u64::from(self.total_output_latency_frames.load(Ordering::SeqCst));
// If output latency is available, take it into account. Otherwise, use the number of
// frames played.
let position = if total_output_latency_frames != 0 {
if total_output_latency_frames > frames_queued {
0
} else {
// Interpolate here to match other cubeb backends. Only return an interpolated time
// if we've played enough frames. If the stream is paused, clamp the interpolated
// number of frames to the buffer size.
const NS2S: u64 = 1_000_000_000;
let now = unsafe { mach_absolute_time() };
let diff = now - timestamp;
let interpolated_frames = cmp::min(
host_time_to_ns(self.context, diff)
* self.core_stream_data.output_stream_params.rate() as u64
/ NS2S,
buffer_size,
);
(frames_queued - total_output_latency_frames) + interpolated_frames
}
} else {
frames_queued
};
// Ensure mononicity of the clock even when changing output device.
if position > self.prev_position {
self.prev_position = position;
}
Ok(self.prev_position)
}
#[cfg(target_os = "ios")]
fn latency(&mut self) -> Result<u32> {
Err(not_supported())
}
#[cfg(not(target_os = "ios"))]
fn latency(&mut self) -> Result<u32> {
Ok(self.total_output_latency_frames.load(Ordering::SeqCst))
}
#[cfg(target_os = "ios")]
fn input_latency(&mut self) -> Result<u32> {
Err(not_supported())
}
#[cfg(not(target_os = "ios"))]
fn input_latency(&mut self) -> Result<u32> {
let user_rate = self.core_stream_data.input_stream_params.rate();
let hw_rate = self.core_stream_data.input_dev_desc.mSampleRate as u32;
let frames = self.total_input_latency_frames.load(Ordering::SeqCst);
if frames != 0 {
if hw_rate == user_rate {
Ok(frames)
} else {
Ok((frames * user_rate) / hw_rate)
}
} else {
Err(Error::error())
}
}
fn set_volume(&mut self, volume: f32) -> Result<()> {
// Execute set_volume in serial queue to avoid racing with destroy or reinit.
let result = self
.queue
.run_sync(|| set_volume(self.core_stream_data.output_unit, volume))
.unwrap();
result?;
cubeb_log!(
"Cubeb stream ({:p}) set volume to {}.",
self as *const AudioUnitStream,
volume
);
Ok(())
}
fn set_name(&mut self, _: &CStr) -> Result<()> {
Err(Error::not_supported())
}
fn current_device(&mut self) -> Result<&DeviceRef> {
Err(Error::not_supported())
}
fn set_input_mute(&mut self, mute: bool) -> Result<()> {
if self.core_stream_data.input_unit.is_null() {
return Err(Error::invalid_parameter());
}
if !self.core_stream_data.using_voice_processing_unit() {
return Err(Error::error());
}
// Execute set_input_mute in serial queue to avoid racing with destroy or reinit.
let mut result = Err(Error::error());
let set = &mut result;
let stream = &self;
self.queue.run_sync(move || {
*set = set_input_mute(stream.core_stream_data.input_unit, mute);
});
result?;
cubeb_log!(
"Cubeb stream ({:p}) set input mute to {}.",
self as *const AudioUnitStream,
mute
);
self.core_stream_data.input_mute = mute;
Ok(())
}
fn set_input_processing_params(&mut self, params: InputProcessingParams) -> Result<()> {
// CUBEB_ERROR_INVALID_PARAMETER if a given param is not supported by
// this backend, or if this stream does not have an input device
if self.core_stream_data.input_unit.is_null() {
return Err(Error::invalid_parameter());
}
if self
.context
.supported_input_processing_params()
.unwrap()
.intersection(params)
!= params
{
return Err(Error::invalid_parameter());
}
// AEC and NS are active as soon as VPIO is not bypassed, therefore the only combinations
// of those we can explicitly support are {} and {aec, ns}.
let aec = params.contains(InputProcessingParams::ECHO_CANCELLATION);
let ns = params.contains(InputProcessingParams::NOISE_SUPPRESSION);
if aec != ns {
// No control to turn on AEC without NS or vice versa.
cubeb_log!(
"Cubeb stream ({:p}) couldn't set input processing params {:?}. AEC != NS.",
self as *const AudioUnitStream,
params
);
return Err(Error::error());
}
// CUBEB_ERROR if params could not be applied
// note: only works with VoiceProcessingIO
if !self.core_stream_data.using_voice_processing_unit() {
return Err(Error::error());
}
// Execute set_input_processing_params in serial queue to avoid racing with destroy or reinit.
let mut result = Err(Error::error());
let result_ = &mut result;
let mut deferred = false;
let deferred_ = &mut deferred;
let stream = &self;
self.queue.run_sync(move || {
if stream.core_stream_data.units_running {
*deferred_ = true;
*result_ = Ok(());
} else {
*deferred_ = false;
*result_ = set_input_processing_params(stream.core_stream_data.input_unit, params);
}
});
result?;
cubeb_log!(
"Cubeb stream ({:p}) {} input processing params {:?}.",
self as *const AudioUnitStream,
if deferred { "deferred" } else { "set" },
params
);
self.core_stream_data.input_processing_params = params;
Ok(())
}
#[cfg(target_os = "ios")]
fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> {
Err(not_supported())
}
#[cfg(not(target_os = "ios"))]
fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> {
if device.as_ptr().is_null() {
Err(Error::error())
} else {
unsafe {
let mut dev: Box<ffi::cubeb_device> = Box::from_raw(device.as_ptr() as *mut _);
if !dev.output_name.is_null() {
let _ = CString::from_raw(dev.output_name as *mut _);
dev.output_name = ptr::null_mut();
}
if !dev.input_name.is_null() {
let _ = CString::from_raw(dev.input_name as *mut _);
dev.input_name = ptr::null_mut();
}
drop(dev);
}
Ok(())
}
}
fn register_device_changed_callback(
&mut self,
device_changed_callback: ffi::cubeb_device_changed_callback,
) -> Result<()> {
let mut callback = self.device_changed_callback.lock().unwrap();
// Note: second register without unregister first causes 'nope' error.
// Current implementation requires unregister before register a new cb.
if device_changed_callback.is_some() && callback.is_some() {
Err(Error::invalid_parameter())
} else {
*callback = device_changed_callback;
Ok(())
}
}
}
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<'ctx> Send for AudioUnitStream<'ctx> {}
unsafe impl<'ctx> Sync for AudioUnitStream<'ctx> {}
#[cfg(test)]
mod tests;