Source code
Revision control
Copy as Markdown
Other Tools
// Copyright © 2017-2018 Mozilla Foundation
//
// This program is made available under an ISC-style license.  See the
// accompanying file LICENSE for details.
use crate::backend::*;
use cubeb_backend::{
    ffi, log_enabled, ContextOps, DeviceId, DeviceInfo, DeviceType, Error, InputProcessingParams,
    Ops, Result, Stream, StreamParams, StreamParamsRef,
};
use pulse::{self, ProplistExt};
use pulse_ffi::*;
use std::cell::RefCell;
use std::default::Default;
use std::ffi::{CStr, CString};
use std::os::raw::c_void;
use std::ptr;
#[derive(Debug)]
pub struct DefaultInfo {
    pub sample_spec: pulse::SampleSpec,
    pub channel_map: pulse::ChannelMap,
    pub flags: pulse::SinkFlags,
}
pub const PULSE_OPS: Ops = capi_new!(PulseContext, PulseStream);
#[repr(C)]
#[derive(Debug)]
pub struct PulseContext {
    _ops: *const Ops,
    pub mainloop: pulse::ThreadedMainloop,
    pub context: Option<pulse::Context>,
    pub default_sink_info: Option<DefaultInfo>,
    pub context_name: Option<CString>,
    pub input_collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
    pub input_collection_changed_user_ptr: *mut c_void,
    pub output_collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
    pub output_collection_changed_user_ptr: *mut c_void,
    pub default_sink_name: Option<CString>,
    pub default_source_name: Option<CString>,
    pub error: bool,
    pub version_2_0_0: bool,
    pub version_0_9_8: bool,
    #[cfg(feature = "pulse-dlopen")]
    pub libpulse: LibLoader,
    devids: RefCell<Intern>,
}
impl PulseContext {
    #[cfg(feature = "pulse-dlopen")]
    fn _new(name: Option<CString>) -> Result<Box<Self>> {
        let libpulse = unsafe { open() };
        if libpulse.is_none() {
            cubeb_log!("libpulse not found");
            return Err(Error::Error);
        }
        let ctx = Box::new(PulseContext {
            _ops: &PULSE_OPS,
            libpulse: libpulse.unwrap(),
            mainloop: pulse::ThreadedMainloop::new(),
            context: None,
            default_sink_info: None,
            context_name: name,
            input_collection_changed_callback: None,
            input_collection_changed_user_ptr: ptr::null_mut(),
            output_collection_changed_callback: None,
            output_collection_changed_user_ptr: ptr::null_mut(),
            default_sink_name: None,
            default_source_name: None,
            error: true,
            version_0_9_8: false,
            version_2_0_0: false,
            devids: RefCell::new(Intern::new()),
        });
        Ok(ctx)
    }
    #[cfg(not(feature = "pulse-dlopen"))]
    fn _new(name: Option<CString>) -> Result<Box<Self>> {
        Ok(Box::new(PulseContext {
            _ops: &PULSE_OPS,
            mainloop: pulse::ThreadedMainloop::new(),
            context: None,
            default_sink_info: None,
            context_name: name,
            input_collection_changed_callback: None,
            input_collection_changed_user_ptr: ptr::null_mut(),
            output_collection_changed_callback: None,
            output_collection_changed_user_ptr: ptr::null_mut(),
            default_sink_name: None,
            default_source_name: None,
            error: true,
            version_0_9_8: false,
            version_2_0_0: false,
            devids: RefCell::new(Intern::new()),
        }))
    }
    fn server_info_cb(context: &pulse::Context, info: Option<&pulse::ServerInfo>, u: *mut c_void) {
        fn sink_info_cb(_: &pulse::Context, i: *const pulse::SinkInfo, eol: i32, u: *mut c_void) {
            let ctx = unsafe { &mut *(u as *mut PulseContext) };
            if eol == 0 {
                let info = unsafe { &*i };
                let flags = pulse::SinkFlags::from_bits_truncate(info.flags);
                ctx.default_sink_info = Some(DefaultInfo {
                    sample_spec: info.sample_spec,
                    channel_map: info.channel_map,
                    flags,
                });
            }
            ctx.mainloop.signal();
        }
        if let Some(info) = info {
            let ctx = unsafe { &mut *(u as *mut PulseContext) };
            // Check if default devices changed, and call the appropriate callback if present.
            let new_sink_name = try_cstr_from(info.default_sink_name).map(|s| s.to_owned());
            let new_source_name = try_cstr_from(info.default_source_name).map(|s| s.to_owned());
            let sink_changed = ctx.default_sink_name != new_sink_name;
            let source_changed = ctx.default_source_name != new_source_name;
            ctx.default_sink_name = new_sink_name;
            ctx.default_source_name = new_source_name;
            if sink_changed && ctx.output_collection_changed_callback.is_some() {
                unsafe {
                    ctx.output_collection_changed_callback.unwrap()(
                        ctx as *mut _ as *mut _,
                        ctx.output_collection_changed_user_ptr,
                    );
                }
            }
            if source_changed && ctx.input_collection_changed_callback.is_some() {
                unsafe {
                    ctx.input_collection_changed_callback.unwrap()(
                        ctx as *mut _ as *mut _,
                        ctx.input_collection_changed_user_ptr,
                    );
                }
            }
            let _ = context.get_sink_info_by_name(
                try_cstr_from(info.default_sink_name),
                sink_info_cb,
                u,
            );
        } else {
            // If info is None, then an error occured.
            let ctx = unsafe { &mut *(u as *mut PulseContext) };
            ctx.mainloop.signal();
        }
    }
    fn new(name: Option<&CStr>) -> Result<Box<Self>> {
        let name = name.map(|s| s.to_owned());
        let mut ctx = PulseContext::_new(name)?;
        if ctx.mainloop.start().is_err() {
            ctx.destroy();
            cubeb_log!("Error: couldn't start pulse's mainloop");
            return Err(Error::Error);
        }
        if ctx.context_init().is_err() {
            ctx.destroy();
            cubeb_log!("Error: couldn't init pulse's context");
            return Err(Error::Error);
        }
        ctx.mainloop.lock();
        /* server_info_callback performs a second async query,
         * which is responsible for initializing default_sink_info
         * and signalling the mainloop to end the wait. */
        let user_data: *mut c_void = ctx.as_mut() as *mut _ as *mut _;
        if let Some(ref context) = ctx.context {
            if let Ok(o) = context.get_server_info(PulseContext::server_info_cb, user_data) {
                ctx.operation_wait(None, &o);
            }
        }
        ctx.mainloop.unlock();
        /* Update `default_sink_info` when the default device changes. */
        if let Err(e) = ctx.subscribe_notifications(pulse::SubscriptionMask::SERVER) {
            cubeb_log!("subscribe_notifications ignored failure: {}", e);
        }
        // Return the result.
        Ok(ctx)
    }
    pub fn destroy(&mut self) {
        self.context_destroy();
        assert!(
            self.input_collection_changed_callback.is_none()
                && self.output_collection_changed_callback.is_none()
        );
        if !self.mainloop.is_null() {
            self.mainloop.stop();
        }
    }
    fn subscribe_notifications(&mut self, mask: pulse::SubscriptionMask) -> Result<()> {
        fn update_collection(
            _: &pulse::Context,
            event: pulse::SubscriptionEvent,
            index: u32,
            user_data: *mut c_void,
        ) {
            let ctx = unsafe { &mut *(user_data as *mut PulseContext) };
            let (f, t) = (event.event_facility(), event.event_type());
            if (f == pulse::SubscriptionEventFacility::Source)
                | (f == pulse::SubscriptionEventFacility::Sink)
            {
                if (t == pulse::SubscriptionEventType::Remove)
                    | (t == pulse::SubscriptionEventType::New)
                {
                    if log_enabled() {
                        let op = if t == pulse::SubscriptionEventType::New {
                            "Adding"
                        } else {
                            "Removing"
                        };
                        let dev = if f == pulse::SubscriptionEventFacility::Sink {
                            "sink"
                        } else {
                            "source "
                        };
                        cubeb_log!("{} {} index {}", op, dev, index);
                    }
                    if f == pulse::SubscriptionEventFacility::Source {
                        unsafe {
                            ctx.input_collection_changed_callback.unwrap()(
                                ctx as *mut _ as *mut _,
                                ctx.input_collection_changed_user_ptr,
                            );
                        }
                    }
                    if f == pulse::SubscriptionEventFacility::Sink {
                        unsafe {
                            ctx.output_collection_changed_callback.unwrap()(
                                ctx as *mut _ as *mut _,
                                ctx.output_collection_changed_user_ptr,
                            );
                        }
                    }
                }
            } else if (f == pulse::SubscriptionEventFacility::Server)
                && (t == pulse::SubscriptionEventType::Change)
            {
                cubeb_log!("Server changed {}", index as i32);
                let user_data: *mut c_void = ctx as *mut _ as *mut _;
                if let Some(ref context) = ctx.context {
                    if let Err(e) = context.get_server_info(PulseContext::server_info_cb, user_data)
                    {
                        cubeb_log!("Error: get_server_info ignored failure: {}", e);
                    }
                }
            }
        }
        fn success(_: &pulse::Context, success: i32, user_data: *mut c_void) {
            let ctx = unsafe { &*(user_data as *mut PulseContext) };
            if success != 1 {
                cubeb_log!("subscribe_success ignored failure: {}", success);
            }
            ctx.mainloop.signal();
        }
        let user_data: *mut c_void = self as *const _ as *mut _;
        if let Some(ref context) = self.context {
            self.mainloop.lock();
            context.set_subscribe_callback(update_collection, user_data);
            if let Ok(o) = context.subscribe(mask, success, self as *const _ as *mut _) {
                self.operation_wait(None, &o);
            } else {
                self.mainloop.unlock();
                cubeb_log!("Error: context subscribe failed");
                return Err(Error::Error);
            }
            self.mainloop.unlock();
        }
        Ok(())
    }
}
impl ContextOps for PulseContext {
    fn init(context_name: Option<&CStr>) -> Result<Box<Self>> {
        PulseContext::new(context_name)
    }
    fn backend_id(&mut self) -> &'static CStr {
        #[allow(clippy::manual_c_str_literals)]
        unsafe {
            CStr::from_ptr(b"pulse-rust\0".as_ptr() as *const _)
        }
    }
    fn max_channel_count(&mut self) -> Result<u32> {
        match self.default_sink_info {
            Some(ref info) => Ok(u32::from(info.channel_map.channels)),
            None => {
                cubeb_log!("Error: couldn't get the max channel count");
                Err(Error::Error)
            }
        }
    }
    fn min_latency(&mut self, params: StreamParams) -> Result<u32> {
        // According to PulseAudio developers, this is a safe minimum.
        Ok(25 * params.rate() / 1000)
    }
    fn preferred_sample_rate(&mut self) -> Result<u32> {
        match self.default_sink_info {
            Some(ref info) => Ok(info.sample_spec.rate),
            None => {
                cubeb_log!("Error: Couldn't get the preferred sample rate");
                Err(Error::Error)
            }
        }
    }
    fn supported_input_processing_params(&mut self) -> Result<InputProcessingParams> {
        Ok(InputProcessingParams::NONE)
    }
    fn enumerate_devices(&mut self, devtype: DeviceType) -> Result<Box<[DeviceInfo]>> {
        fn add_output_device(
            _: &pulse::Context,
            i: *const pulse::SinkInfo,
            eol: i32,
            user_data: *mut c_void,
        ) {
            let list_data = unsafe { &mut *(user_data as *mut PulseDevListData) };
            let ctx = list_data.context;
            if eol != 0 {
                ctx.mainloop.signal();
                return;
            }
            debug_assert!(!i.is_null());
            debug_assert!(!user_data.is_null());
            let info = unsafe { &*i };
            let group_id = match info.proplist().gets("sysfs.path") {
                Some(p) => p.to_owned().into_raw(),
                _ => ptr::null_mut(),
            };
            let vendor_name = match info.proplist().gets("device.vendor.name") {
                Some(p) => p.to_owned().into_raw(),
                _ => ptr::null_mut(),
            };
            let info_name = unsafe { CStr::from_ptr(info.name) };
            let info_description = unsafe { CStr::from_ptr(info.description) }.to_owned();
            let preferred = if *info_name == *list_data.default_sink_name {
                ffi::CUBEB_DEVICE_PREF_ALL
            } else {
                ffi::CUBEB_DEVICE_PREF_NONE
            };
            let device_id = ctx.devids.borrow_mut().add(info_name);
            let friendly_name = info_description.into_raw();
            let devinfo = ffi::cubeb_device_info {
                device_id,
                devid: device_id as ffi::cubeb_devid,
                friendly_name,
                group_id,
                vendor_name,
                device_type: ffi::CUBEB_DEVICE_TYPE_OUTPUT,
                state: ctx.state_from_port(info.active_port),
                preferred,
                format: ffi::CUBEB_DEVICE_FMT_ALL,
                default_format: pulse_format_to_cubeb_format(info.sample_spec.format),
                max_channels: u32::from(info.channel_map.channels),
                min_rate: 1,
                max_rate: PA_RATE_MAX,
                default_rate: info.sample_spec.rate,
                latency_lo: 0,
                latency_hi: 0,
            };
            list_data.devinfo.push(devinfo.into());
        }
        fn add_input_device(
            _: &pulse::Context,
            i: *const pulse::SourceInfo,
            eol: i32,
            user_data: *mut c_void,
        ) {
            let list_data = unsafe { &mut *(user_data as *mut PulseDevListData) };
            let ctx = list_data.context;
            if eol != 0 {
                ctx.mainloop.signal();
                return;
            }
            debug_assert!(!user_data.is_null());
            debug_assert!(!i.is_null());
            let info = unsafe { &*i };
            let group_id = match info.proplist().gets("sysfs.path") {
                Some(p) => p.to_owned().into_raw(),
                _ => ptr::null_mut(),
            };
            let vendor_name = match info.proplist().gets("device.vendor.name") {
                Some(p) => p.to_owned().into_raw(),
                _ => ptr::null_mut(),
            };
            let info_name = unsafe { CStr::from_ptr(info.name) };
            let info_description = unsafe { CStr::from_ptr(info.description) }.to_owned();
            let preferred = if *info_name == *list_data.default_source_name {
                ffi::CUBEB_DEVICE_PREF_ALL
            } else {
                ffi::CUBEB_DEVICE_PREF_NONE
            };
            let device_id = ctx.devids.borrow_mut().add(info_name);
            let friendly_name = info_description.into_raw();
            let devinfo = ffi::cubeb_device_info {
                device_id,
                devid: device_id as ffi::cubeb_devid,
                friendly_name,
                group_id,
                vendor_name,
                device_type: ffi::CUBEB_DEVICE_TYPE_INPUT,
                state: ctx.state_from_port(info.active_port),
                preferred,
                format: ffi::CUBEB_DEVICE_FMT_ALL,
                default_format: pulse_format_to_cubeb_format(info.sample_spec.format),
                max_channels: u32::from(info.channel_map.channels),
                min_rate: 1,
                max_rate: PA_RATE_MAX,
                default_rate: info.sample_spec.rate,
                latency_lo: 0,
                latency_hi: 0,
            };
            list_data.devinfo.push(devinfo.into());
        }
        fn default_device_names(
            _: &pulse::Context,
            info: Option<&pulse::ServerInfo>,
            user_data: *mut c_void,
        ) {
            let list_data = unsafe { &mut *(user_data as *mut PulseDevListData) };
            if let Some(info) = info {
                list_data.default_sink_name = super::try_cstr_from(info.default_sink_name)
                    .map(|s| s.to_owned())
                    .unwrap_or_default();
                list_data.default_source_name = super::try_cstr_from(info.default_source_name)
                    .map(|s| s.to_owned())
                    .unwrap_or_default();
            }
            list_data.context.mainloop.signal();
        }
        let mut user_data = PulseDevListData::new(self);
        if let Some(ref context) = self.context {
            self.mainloop.lock();
            if let Ok(o) =
                context.get_server_info(default_device_names, &mut user_data as *mut _ as *mut _)
            {
                self.operation_wait(None, &o);
            }
            if devtype.contains(DeviceType::OUTPUT) {
                if let Ok(o) = context
                    .get_sink_info_list(add_output_device, &mut user_data as *mut _ as *mut _)
                {
                    self.operation_wait(None, &o);
                }
            }
            if devtype.contains(DeviceType::INPUT) {
                if let Ok(o) = context
                    .get_source_info_list(add_input_device, &mut user_data as *mut _ as *mut _)
                {
                    self.operation_wait(None, &o);
                }
            }
            self.mainloop.unlock();
        }
        Ok(user_data.devinfo.into_boxed_slice())
    }
    fn device_collection_destroy(&mut self, collection: Box<[DeviceInfo]>) -> Result<()> {
        for dev in collection {
            let dev = ffi::cubeb_device_info::from(dev);
            if !dev.group_id.is_null() {
                let _ = unsafe { CString::from_raw(dev.group_id as *mut _) };
            }
            if !dev.vendor_name.is_null() {
                let _ = unsafe { CString::from_raw(dev.vendor_name as *mut _) };
            }
            if !dev.friendly_name.is_null() {
                let _ = unsafe { CString::from_raw(dev.friendly_name as *mut _) };
            }
        }
        Ok(())
    }
    #[allow(clippy::too_many_arguments)]
    fn stream_init(
        &mut self,
        stream_name: Option<&CStr>,
        input_device: DeviceId,
        input_stream_params: Option<&StreamParamsRef>,
        output_device: DeviceId,
        output_stream_params: Option<&StreamParamsRef>,
        latency_frames: u32,
        data_callback: ffi::cubeb_data_callback,
        state_callback: ffi::cubeb_state_callback,
        user_ptr: *mut c_void,
    ) -> Result<Stream> {
        if self.error {
            self.context_init()?;
        }
        let stm = PulseStream::new(
            self,
            stream_name,
            input_device,
            input_stream_params,
            output_device,
            output_stream_params,
            latency_frames,
            data_callback,
            state_callback,
            user_ptr,
        )?;
        Ok(unsafe { Stream::from_ptr(Box::into_raw(stm) as *mut _) })
    }
    fn register_device_collection_changed(
        &mut self,
        devtype: DeviceType,
        cb: ffi::cubeb_device_collection_changed_callback,
        user_ptr: *mut c_void,
    ) -> Result<()> {
        if devtype.contains(DeviceType::INPUT) {
            self.input_collection_changed_callback = cb;
            self.input_collection_changed_user_ptr = user_ptr;
        }
        if devtype.contains(DeviceType::OUTPUT) {
            self.output_collection_changed_callback = cb;
            self.output_collection_changed_user_ptr = user_ptr;
        }
        let mut mask = pulse::SubscriptionMask::empty();
        if self.input_collection_changed_callback.is_some() {
            mask |= pulse::SubscriptionMask::SOURCE;
        }
        if self.output_collection_changed_callback.is_some() {
            mask |= pulse::SubscriptionMask::SINK;
        }
        /* Default device changed, this is always registered in order to update the
         * `default_sink_info` when the default device changes. */
        mask |= pulse::SubscriptionMask::SERVER;
        self.subscribe_notifications(mask)
    }
}
impl Drop for PulseContext {
    fn drop(&mut self) {
        self.destroy();
    }
}
impl PulseContext {
    /* Initialize PulseAudio Context */
    fn context_init(&mut self) -> Result<()> {
        fn error_state(c: &pulse::Context, u: *mut c_void) {
            let ctx = unsafe { &mut *(u as *mut PulseContext) };
            if !c.get_state().is_good() {
                ctx.error = true;
            }
            ctx.mainloop.signal();
        }
        if self.context.is_some() {
            debug_assert!(self.error);
            self.context_destroy();
        }
        self.context = {
            let name = self.context_name.as_ref().map(|s| s.as_ref());
            pulse::Context::new(&self.mainloop.get_api(), name)
        };
        let context_ptr: *mut c_void = self as *mut _ as *mut _;
        if self.context.is_none() {
            cubeb_log!("Error: couldn't create pulse's context");
            return Err(Error::Error);
        }
        self.mainloop.lock();
        let connected = if let Some(ref context) = self.context {
            context.set_state_callback(error_state, context_ptr);
            context
                .connect(None, pulse::ContextFlags::empty(), ptr::null())
                .is_ok()
        } else {
            false
        };
        if !connected || !self.wait_until_context_ready() {
            self.mainloop.unlock();
            self.context_destroy();
            cubeb_log!("Error: error while waiting for pulse's context to be ready");
            return Err(Error::Error);
        }
        self.mainloop.unlock();
        let version_str = unsafe { CStr::from_ptr(pulse::library_version()) };
        if let Ok(version) = semver::Version::parse(&version_str.to_string_lossy()) {
            self.version_0_9_8 =
                version >= semver::Version::parse("0.9.8").expect("Failed to parse version");
            self.version_2_0_0 =
                version >= semver::Version::parse("2.0.0").expect("Failed to parse version");
        }
        self.error = false;
        Ok(())
    }
    fn context_destroy(&mut self) {
        fn drain_complete(_: &pulse::Context, u: *mut c_void) {
            let ctx = unsafe { &*(u as *mut PulseContext) };
            ctx.mainloop.signal();
        }
        let context_ptr: *mut c_void = self as *mut _ as *mut _;
        if let Some(ctx) = self.context.take() {
            self.mainloop.lock();
            if let Ok(o) = ctx.drain(drain_complete, context_ptr) {
                self.operation_wait(None, &o);
            }
            ctx.clear_state_callback();
            ctx.disconnect();
            ctx.unref();
            self.mainloop.unlock();
        }
    }
    pub fn operation_wait<'a, S>(&self, s: S, o: &pulse::Operation) -> bool
    where
        S: Into<Option<&'a pulse::Stream>>,
    {
        let stream = s.into();
        while o.get_state() == PA_OPERATION_RUNNING {
            self.mainloop.wait();
            if let Some(ref context) = self.context {
                if !context.get_state().is_good() {
                    return false;
                }
            }
            if let Some(stm) = stream {
                if !stm.get_state().is_good() {
                    return false;
                }
            }
        }
        true
    }
    pub fn wait_until_context_ready(&self) -> bool {
        if let Some(ref context) = self.context {
            loop {
                let state = context.get_state();
                if !state.is_good() {
                    return false;
                }
                if state == pulse::ContextState::Ready {
                    break;
                }
                self.mainloop.wait();
            }
        }
        true
    }
    fn state_from_port(&self, i: *const pa_port_info) -> ffi::cubeb_device_state {
        if !i.is_null() {
            let info = unsafe { *i };
            if self.version_2_0_0 && info.available == PA_PORT_AVAILABLE_NO {
                ffi::CUBEB_DEVICE_STATE_UNPLUGGED
            } else {
                ffi::CUBEB_DEVICE_STATE_ENABLED
            }
        } else {
            ffi::CUBEB_DEVICE_STATE_ENABLED
        }
    }
}
struct PulseDevListData<'a> {
    default_sink_name: CString,
    default_source_name: CString,
    devinfo: Vec<DeviceInfo>,
    context: &'a PulseContext,
}
impl<'a> PulseDevListData<'a> {
    pub fn new<'b>(context: &'b PulseContext) -> Self
    where
        'b: 'a,
    {
        PulseDevListData {
            default_sink_name: CString::default(),
            default_source_name: CString::default(),
            devinfo: Vec::new(),
            context,
        }
    }
}
fn pulse_format_to_cubeb_format(format: pa_sample_format_t) -> ffi::cubeb_device_fmt {
    match format {
        PA_SAMPLE_S16LE => ffi::CUBEB_DEVICE_FMT_S16LE,
        PA_SAMPLE_S16BE => ffi::CUBEB_DEVICE_FMT_S16BE,
        PA_SAMPLE_FLOAT32LE => ffi::CUBEB_DEVICE_FMT_F32LE,
        PA_SAMPLE_FLOAT32BE => ffi::CUBEB_DEVICE_FMT_F32BE,
        // Unsupported format, return F32NE
        _ => ffi::CUBEB_DEVICE_FMT_F32NE,
    }
}