Source code
Revision control
Copy as Markdown
Other Tools
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
// Original author: ekr@rtfm.com
#include "MediaPipeline.h"
#include <inttypes.h>
#include <math.h>
#include <sstream>
#include <utility>
#include "AudioSegment.h"
#include "AudioConverter.h"
#include "DOMMediaStream.h"
#include "ImageContainer.h"
#include "ImageTypes.h"
#include "MediaEngine.h"
#include "MediaSegment.h"
#include "MediaTrackGraph.h"
#include "MediaTrackListener.h"
#include "MediaStreamTrack.h"
#include "RtpLogger.h"
#include "VideoFrameConverter.h"
#include "VideoSegment.h"
#include "VideoStreamTrack.h"
#include "VideoUtils.h"
#include "mozilla/Logging.h"
#include "mozilla/NullPrincipal.h"
#include "mozilla/PeerIdentity.h"
#include "mozilla/Preferences.h"
#include "mozilla/SharedThreadPool.h"
#include "mozilla/Sprintf.h"
#include "mozilla/StaticPrefs_media.h"
#include "mozilla/TaskQueue.h"
#include "mozilla/UniquePtr.h"
#include "mozilla/UniquePtrExtensions.h"
#include "mozilla/dom/RTCStatsReportBinding.h"
#include "mozilla/dom/Document.h"
#include "mozilla/gfx/Point.h"
#include "mozilla/gfx/Types.h"
#include "nsError.h"
#include "nsThreadUtils.h"
#include "transport/runnable_utils.h"
#include "jsapi/MediaTransportHandler.h"
#include "jsapi/PeerConnectionImpl.h"
#include "Tracing.h"
#include "libwebrtcglue/WebrtcImageBuffer.h"
#include "libwebrtcglue/MediaConduitInterface.h"
#include "common_video/include/video_frame_buffer.h"
#include "modules/rtp_rtcp/include/rtp_rtcp.h"
#include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
#include "modules/rtp_rtcp/source/rtp_packet_received.h"
// Upper bound, in bytes, for one 10ms packet of 16-bit stereo audio at 48kHz:
// 480 frames * 2 channels * 2 bytes per sample = 1920.
#define AUDIO_SAMPLE_BUFFER_MAX_BYTES (480 * 2 * 2)
// Compile-time check that 10ms of two-channel, uint16_t-sized samples at
// WEBRTC_MAX_SAMPLE_RATE fits in AUDIO_SAMPLE_BUFFER_MAX_BYTES.
static_assert((WEBRTC_MAX_SAMPLE_RATE / 100) * sizeof(uint16_t) * 2 <=
AUDIO_SAMPLE_BUFFER_MAX_BYTES,
"AUDIO_SAMPLE_BUFFER_MAX_BYTES is not large enough");
using namespace mozilla;
using namespace mozilla::dom;
using namespace mozilla::gfx;
using namespace mozilla::layers;
// Log module ("MediaPipeline") used by the MOZ_LOG calls in this file.
mozilla::LazyLogModule gMediaPipelineLog("MediaPipeline");
namespace mozilla {
// An async inserter for audio data, to avoid running audio codec encoders
// on the MTG/input audio thread. Basically just bounces all the audio
// data to a single audio processing/input queue. We could if we wanted to
// use multiple threads and a TaskQueue.
class AudioProxyThread {
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AudioProxyThread)
explicit AudioProxyThread(RefPtr<AudioSessionConduit> aConduit)
: mConduit(std::move(aConduit)),
mTaskQueue(TaskQueue::Create(
GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER), "AudioProxy")),
mAudioConverter(nullptr) {
MOZ_ASSERT(mConduit);
MOZ_COUNT_CTOR(AudioProxyThread);
}
// This function is the identity if aInputRate is supported.
// Else, it returns a rate that is supported, that ensure no loss in audio
// quality: the sampling rate returned is always greater to the inputed
// sampling-rate, if they differ..
uint32_t AppropriateSendingRateForInputRate(uint32_t aInputRate) {
AudioSessionConduit* conduit =
static_cast<AudioSessionConduit*>(mConduit.get());
if (conduit->IsSamplingFreqSupported(aInputRate)) {
return aInputRate;
}
if (aInputRate < 16000) {
return 16000;
}
if (aInputRate < 32000) {
return 32000;
}
if (aInputRate < 44100) {
return 44100;
}
return 48000;
}
// From an arbitrary AudioChunk at sampling-rate aRate, process the audio into
// something the conduit can work with (or send silence if the track is not
// enabled), and send the audio in 10ms chunks to the conduit.
void InternalProcessAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
bool aEnabled) {
MOZ_ASSERT(mTaskQueue->IsCurrentThreadIn());
// Convert to interleaved 16-bits integer audio, with a maximum of two
// channels (since the WebRTC.org code below makes the assumption that the
// input audio is either mono or stereo), with a sample-rate rate that is
// 16, 32, 44.1, or 48kHz.
uint32_t outputChannels = aChunk.ChannelCount() == 1 ? 1 : 2;
int32_t transmissionRate = AppropriateSendingRateForInputRate(aRate);
// We take advantage of the fact that the common case (microphone directly
// to PeerConnection, that is, a normal call), the samples are already
// 16-bits mono, so the representation in interleaved and planar is the
// same, and we can just use that.
if (aEnabled && outputChannels == 1 &&
aChunk.mBufferFormat == AUDIO_FORMAT_S16 && transmissionRate == aRate) {
const int16_t* samples = aChunk.ChannelData<int16_t>().Elements()[0];
PacketizeAndSend(samples, transmissionRate, outputChannels,
aChunk.mDuration);
return;
}
uint32_t sampleCount = aChunk.mDuration * outputChannels;
if (mInterleavedAudio.Length() < sampleCount) {
mInterleavedAudio.SetLength(sampleCount);
}
if (!aEnabled || aChunk.mBufferFormat == AUDIO_FORMAT_SILENCE) {
PodZero(mInterleavedAudio.Elements(), sampleCount);
} else if (aChunk.mBufferFormat == AUDIO_FORMAT_FLOAT32) {
DownmixAndInterleave(aChunk.ChannelData<float>(), aChunk.mDuration,
aChunk.mVolume, outputChannels,
mInterleavedAudio.Elements());
} else if (aChunk.mBufferFormat == AUDIO_FORMAT_S16) {
DownmixAndInterleave(aChunk.ChannelData<int16_t>(), aChunk.mDuration,
aChunk.mVolume, outputChannels,
mInterleavedAudio.Elements());
}
int16_t* inputAudio = mInterleavedAudio.Elements();
size_t inputAudioFrameCount = aChunk.mDuration;
AudioConfig inputConfig(AudioConfig::ChannelLayout(outputChannels), aRate,
AudioConfig::FORMAT_S16);
AudioConfig outputConfig(AudioConfig::ChannelLayout(outputChannels),
transmissionRate, AudioConfig::FORMAT_S16);
// Resample to an acceptable sample-rate for the sending side
if (!mAudioConverter || mAudioConverter->InputConfig() != inputConfig ||
mAudioConverter->OutputConfig() != outputConfig) {
mAudioConverter = MakeUnique<AudioConverter>(inputConfig, outputConfig);
}
int16_t* processedAudio = nullptr;
size_t framesProcessed =
mAudioConverter->Process(inputAudio, inputAudioFrameCount);
if (framesProcessed == 0) {
// In place conversion not possible, use a buffer.
framesProcessed = mAudioConverter->Process(mOutputAudio, inputAudio,
inputAudioFrameCount);
processedAudio = mOutputAudio.Data();
} else {
processedAudio = inputAudio;
}
PacketizeAndSend(processedAudio, transmissionRate, outputChannels,
framesProcessed);
}
// This packetizes aAudioData in 10ms chunks and sends it.
// aAudioData is interleaved audio data at a rate and with a channel count
// that is appropriate to send with the conduit.
void PacketizeAndSend(const int16_t* aAudioData, uint32_t aRate,
uint32_t aChannels, uint32_t aFrameCount) {
MOZ_ASSERT(AppropriateSendingRateForInputRate(aRate) == aRate);
MOZ_ASSERT(aChannels == 1 || aChannels == 2);
MOZ_ASSERT(aAudioData);
uint32_t audio_10ms = aRate / 100;
if (!mPacketizer || mPacketizer->mPacketSize != audio_10ms ||
mPacketizer->mChannels != aChannels) {
// It's the right thing to drop the bit of audio still in the packetizer:
// we don't want to send to the conduit audio that has two different
// rates while telling it that it has a constante rate.
mPacketizer =
MakeUnique<AudioPacketizer<int16_t, int16_t>>(audio_10ms, aChannels);
mPacket = MakeUnique<int16_t[]>(audio_10ms * aChannels);
}
mPacketizer->Input(aAudioData, aFrameCount);
while (mPacketizer->PacketsAvailable()) {
mPacketizer->Output(mPacket.get());
auto frame = std::make_unique<webrtc::AudioFrame>();
// UpdateFrame makes a copy of the audio data.
frame->UpdateFrame(frame->timestamp_, mPacket.get(),
mPacketizer->mPacketSize, aRate, frame->speech_type_,
frame->vad_activity_, mPacketizer->mChannels);
mConduit->SendAudioFrame(std::move(frame));
}
}
void QueueAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
bool aEnabled) {
RefPtr<AudioProxyThread> self = this;
nsresult rv = mTaskQueue->Dispatch(NS_NewRunnableFunction(
"AudioProxyThread::QueueAudioChunk", [self, aRate, aChunk, aEnabled]() {
self->InternalProcessAudioChunk(aRate, aChunk, aEnabled);
}));
MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
Unused << rv;
}
protected:
virtual ~AudioProxyThread() { MOZ_COUNT_DTOR(AudioProxyThread); }
const RefPtr<AudioSessionConduit> mConduit;
const RefPtr<TaskQueue> mTaskQueue;
// Only accessed on mTaskQueue
UniquePtr<AudioPacketizer<int16_t, int16_t>> mPacketizer;
// A buffer to hold a single packet of audio.
UniquePtr<int16_t[]> mPacket;
nsTArray<int16_t> mInterleavedAudio;
AlignedShortBuffer mOutputAudio;
UniquePtr<AudioConverter> mAudioConverter;
};
// Helper for the initializer list below: constructs the mirror member `name`
// on the main thread with initial value `val` and the debug name
// "MediaPipeline::<name> (Mirror)". Undefined again right after the
// constructor.
#define INIT_MIRROR(name, val) \
name(AbstractThread::MainThread(), val, "MediaPipeline::" #name " (Mirror)")
// Stores the conduit, direction, threads and transport handler, zeroes all
// packet/byte counters, creates an empty RTP header extension map and looks
// up the packet dumper for aPc. The mActive mirror starts out false.
MediaPipeline::MediaPipeline(const std::string& aPc,
RefPtr<MediaTransportHandler> aTransportHandler,
DirectionType aDirection,
RefPtr<AbstractThread> aCallThread,
RefPtr<nsISerialEventTarget> aStsThread,
RefPtr<MediaSessionConduit> aConduit)
: mConduit(std::move(aConduit)),
mDirection(aDirection),
mCallThread(std::move(aCallThread)),
mStsThread(std::move(aStsThread)),
INIT_MIRROR(mActive, false),
mLevel(0),
mTransportHandler(std::move(aTransportHandler)),
mRtpPacketsSent(0),
mRtcpPacketsSent(0),
mRtpPacketsReceived(0),
mRtpBytesSent(0),
mRtpBytesReceived(0),
mPc(aPc),
mRtpHeaderExtensionMap(new webrtc::RtpHeaderExtensionMap()),
mPacketDumper(PacketDumper::GetPacketDumper(mPc)) {}
#undef INIT_MIRROR
// Only logs the destruction together with the pipeline description.
MediaPipeline::~MediaPipeline() {
MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
("Destroying MediaPipeline: %s", mDescription.c_str()));
}
// Main-thread shutdown: disconnects the mActive mirror, then dispatches
// DetachTransport_s to the STS thread. The runnable holds a strong reference
// to this pipeline.
void MediaPipeline::Shutdown() {
MOZ_ASSERT(NS_IsMainThread());
mActive.DisconnectIfConnected();
RUN_ON_THREAD(mStsThread,
WrapRunnable(RefPtr<MediaPipeline>(this),
&MediaPipeline::DetachTransport_s),
NS_DISPATCH_NORMAL);
}
// STS-thread half of Shutdown(): disconnects all signal connections, resets
// both transport states to TS_NONE, clears the transport id, marks the
// conduit's transport inactive and disconnects the RTP/RTCP send listeners.
void MediaPipeline::DetachTransport_s() {
ASSERT_ON_THREAD(mStsThread);
MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
("%s in %s", mDescription.c_str(), __FUNCTION__));
disconnect_all();
mRtpState = TransportLayer::TS_NONE;
mRtcpState = TransportLayer::TS_NONE;
mTransportId.clear();
mConduit->SetTransportActive(false);
mRtpSendEventListener.DisconnectIfExists();
mSenderRtcpSendEventListener.DisconnectIfExists();
mReceiverRtcpSendEventListener.DisconnectIfExists();
}
// Forwards a transport update to the STS thread. The dispatched task owns a
// copy of the transport id, takes over aFilter, and keeps this pipeline alive
// until UpdateTransport_s has run.
void MediaPipeline::UpdateTransport_m(
    const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) {
  auto updateOnSts = [aTransportId, filter = std::move(aFilter),
                      self = RefPtr<MediaPipeline>(this)]() mutable {
    self->UpdateTransport_s(aTransportId, std::move(filter));
  };
  mStsThread->Dispatch(NS_NewRunnableFunction(__func__, std::move(updateOnSts)));
}
// STS-thread side of a transport update. Connects to the transport handler's
// signals the first time through, switches to aTransportId (refreshing the
// RTP/RTCP states) when it differs from the current one, and installs or
// merges aFilter while keeping the RTP header extension map in sync with the
// filter's extmap.
void MediaPipeline::UpdateTransport_s(
    const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) {
  ASSERT_ON_THREAD(mStsThread);

  // One-time hookup of the transport handler signals.
  if (!mSignalsConnected) {
    mTransportHandler->SignalStateChange.connect(
        this, &MediaPipeline::RtpStateChange);
    mTransportHandler->SignalRtcpStateChange.connect(
        this, &MediaPipeline::RtcpStateChange);
    mTransportHandler->SignalEncryptedSending.connect(
        this, &MediaPipeline::EncryptedPacketSending);
    mTransportHandler->SignalPacketReceived.connect(
        this, &MediaPipeline::PacketReceived);
    mTransportHandler->SignalAlpnNegotiated.connect(
        this, &MediaPipeline::AlpnNegotiated);
    mSignalsConnected = true;
  }

  const bool transportChanged = aTransportId != mTransportId;
  if (transportChanged) {
    mTransportId = aTransportId;
    mRtpState = mTransportHandler->GetState(mTransportId, false);
    mRtcpState = mTransportHandler->GetState(mTransportId, true);
    CheckTransportStates();
  }

  // Drop the header extensions registered on behalf of the outgoing filter.
  if (mFilter) {
    for (const auto& oldExt : mFilter->GetExtmap()) {
      mRtpHeaderExtensionMap->Deregister(oldExt.uri);
    }
  }

  if (!mFilter || !aFilter) {
    mFilter = std::move(aFilter);
  } else {
    // Adopt the new filter's settings, but don't forget any remote SSRCs
    // that we've learned by receiving traffic.
    mFilter->Update(*aFilter);
  }

  // Register the header extensions of whatever filter is now in place.
  if (mFilter) {
    for (const auto& newExt : mFilter->GetExtmap()) {
      mRtpHeaderExtensionMap->RegisterByUri(newExt.id, newExt.uri);
    }
  }
}
void MediaPipeline::GetContributingSourceStats(
const nsString& aInboundRtpStreamId,
FallibleTArray<dom::RTCRTPContributingSourceStats>& aArr) const {
ASSERT_ON_THREAD(mStsThread);
// Get the expiry from now
DOMHighResTimeStamp expiry =
RtpCSRCStats::GetExpiryFromTime(GetTimestampMaker().GetNow().ToDom());
for (auto info : mCsrcStats) {
if (!info.second.Expired(expiry)) {
RTCRTPContributingSourceStats stats;
info.second.GetWebidlInstance(stats, aInboundRtpStreamId);
if (!aArr.AppendElement(stats, fallible)) {
mozalloc_handle_oom(0);
}
}
}
}
// Signal handler for RTP transport state changes. Notifications for other
// transports are ignored; otherwise the new state is recorded and the
// combined transport state re-evaluated.
void MediaPipeline::RtpStateChange(const std::string& aTransportId,
                                   TransportLayer::State aState) {
  if (mTransportId == aTransportId) {
    mRtpState = aState;
    CheckTransportStates();
  }
}
// Signal handler for RTCP transport state changes. Notifications for other
// transports are ignored; otherwise the new state is recorded and the
// combined transport state re-evaluated.
void MediaPipeline::RtcpStateChange(const std::string& aTransportId,
                                    TransportLayer::State aState) {
  if (mTransportId == aTransportId) {
    mRtcpState = aState;
    CheckTransportStates();
  }
}
void MediaPipeline::CheckTransportStates() {
ASSERT_ON_THREAD(mStsThread);
if (mRtpState == TransportLayer::TS_CLOSED ||
mRtpState == TransportLayer::TS_ERROR ||
mRtcpState == TransportLayer::TS_CLOSED ||
mRtcpState == TransportLayer::TS_ERROR) {
MOZ_LOG(gMediaPipelineLog, LogLevel::Warning,
("RTP Transport failed for pipeline %p flow %s", this,
mDescription.c_str()));
NS_WARNING(
"MediaPipeline Transport failed. This is not properly cleaned up yet");
// TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the
// connection was good and now it is bad.
// TODO(ekr@rtfm.com): Report up so that the PC knows we
// have experienced an error.
mConduit->SetTransportActive(false);
mRtpSendEventListener.DisconnectIfExists();
mSenderRtcpSendEventListener.DisconnectIfExists();
mReceiverRtcpSendEventListener.DisconnectIfExists();
return;
}
if (mRtpState == TransportLayer::TS_OPEN) {
MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
("RTP Transport ready for pipeline %p flow %s", this,
mDescription.c_str()));
}
if (mRtcpState == TransportLayer::TS_OPEN) {
MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
("RTCP Transport ready for pipeline %p flow %s", this,
mDescription.c_str()));
}
if (mRtpState == TransportLayer::TS_OPEN && mRtcpState == mRtpState) {
if (mDirection == DirectionType::TRANSMIT) {
mRtpSendEventListener = mConduit->SenderRtpSendEvent().Connect(
mStsThread, this, &MediaPipeline::SendPacket);
mSenderRtcpSendEventListener = mConduit->SenderRtcpSendEvent().Connect(
mStsThread, this, &MediaPipeline::SendPacket);
} else {
mConduit->ConnectReceiverRtpEvent(mRtpReceiveEvent);
mReceiverRtcpSendEventListener =
mConduit->ReceiverRtcpSendEvent().Connect(mStsThread, this,
&MediaPipeline::SendPacket);
}
mConduit->SetTransportActive(true);
TransportReady_s();
}
}
// Sends an outgoing RTP or RTCP packet over the transport (STS thread only).
// The packet is silently dropped unless the transport matching its kind is
// open. Before sending, the packet is tagged with the SDP level, optionally
// logged, dumped, and counted.
void MediaPipeline::SendPacket(MediaPacket&& aPacket) {
  ASSERT_ON_THREAD(mStsThread);

  const bool rtp = aPacket.type() == MediaPacket::RTP;

  // Anything that is not RTP is gated on the RTCP transport state.
  const auto& relevantState = rtp ? mRtpState : mRtcpState;
  if (relevantState != TransportLayer::TS_OPEN) {
    return;
  }

  aPacket.sdp_level() = Some(Level());

  if (RtpLogger::IsPacketLoggingOn()) {
    RtpLogger::LogPacket(aPacket, false, mDescription);
  }

  const dom::mozPacketDumpType dumpType =
      rtp ? dom::mozPacketDumpType::Rtp : dom::mozPacketDumpType::Rtcp;
  mPacketDumper->Dump(Level(), dumpType, true, aPacket.data(), aPacket.len());
  if (rtp) {
    IncrementRtpPacketsSent(aPacket);
  } else {
    IncrementRtcpPacketsSent();
  }

  MOZ_LOG(
      gMediaPipelineLog, LogLevel::Debug,
      ("%s sending %s packet", mDescription.c_str(), (rtp ? "RTP" : "RTCP")));

  mTransportHandler->SendPacket(mTransportId, std::move(aPacket));
}
// Accounts for one sent RTP packet (count and bytes) and logs the running
// totals on every 100th packet.
void MediaPipeline::IncrementRtpPacketsSent(const MediaPacket& aPacket) {
  ASSERT_ON_THREAD(mStsThread);
  mRtpPacketsSent++;
  mRtpBytesSent += aPacket.len();

  if (mRtpPacketsSent % 100) {
    return;
  }
  MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
          ("RTP sent packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
           mDescription.c_str(), this, mRtpPacketsSent, mRtpBytesSent));
}
// Accounts for one sent RTCP packet and logs the running total on every
// 100th packet.
void MediaPipeline::IncrementRtcpPacketsSent() {
  ASSERT_ON_THREAD(mStsThread);
  mRtcpPacketsSent++;

  if (mRtcpPacketsSent % 100) {
    return;
  }
  MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
          ("RTCP sent packet count for %s Pipeline %p: %u",
           mDescription.c_str(), this, mRtcpPacketsSent));
}
// Accounts for one received RTP packet of aBytes bytes and logs the running
// totals on every 100th packet.
void MediaPipeline::IncrementRtpPacketsReceived(int32_t aBytes) {
  ASSERT_ON_THREAD(mStsThread);
  mRtpPacketsReceived++;
  mRtpBytesReceived += aBytes;

  if (mRtpPacketsReceived % 100) {
    return;
  }
  MOZ_LOG(
      gMediaPipelineLog, LogLevel::Info,
      ("RTP received packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
       mDescription.c_str(), this, mRtpPacketsReceived, mRtpBytesReceived));
}
// Signal handler for packets arriving on a transport (STS thread only).
//
// Packets are ignored when they belong to another transport, are not RTP,
// are empty, fail to parse, are rejected by mFilter, or when this is a
// transmit pipeline. For accepted packets this stamps the arrival time,
// maintains the per-CSRC statistics (expiring old entries and adding or
// refreshing those named in the header), updates the receive counters, logs
// and dumps the packet, and finally notifies mRtpReceiveEvent with the parsed
// packet and its header.
void MediaPipeline::PacketReceived(const std::string& aTransportId,
                                   const MediaPacket& packet) {
  ASSERT_ON_THREAD(mStsThread);

  if (mTransportId != aTransportId) {
    return;
  }

  MOZ_ASSERT(mRtpState == TransportLayer::TS_OPEN);

  if (packet.type() != MediaPacket::RTP) {
    return;
  }

  if (mDirection == DirectionType::TRANSMIT) {
    return;
  }

  if (!packet.len()) {
    return;
  }

  webrtc::RTPHeader header;
  rtc::CopyOnWriteBuffer packet_buffer(packet.data(), packet.len());
  webrtc::RtpPacketReceived parsedPacket(mRtpHeaderExtensionMap.get());
  if (!parsedPacket.Parse(packet_buffer)) {
    return;
  }
  parsedPacket.GetHeader(&header);

  if (mFilter && !mFilter->Filter(header)) {
    return;
  }

  auto now = GetTimestampMaker().GetNow();
  parsedPacket.set_arrival_time(now.ToRealtime());
  if (IsVideo()) {
    parsedPacket.set_payload_type_frequency(webrtc::kVideoPayloadTypeFrequency);
  }

  // Remove expired RtpCSRCStats
  if (!mCsrcStats.empty()) {
    auto expiry = RtpCSRCStats::GetExpiryFromTime(now.ToDom());
    for (auto p = mCsrcStats.begin(); p != mCsrcStats.end();) {
      if (p->second.Expired(expiry)) {
        p = mCsrcStats.erase(p);
        continue;
      }
      ++p;
    }
  }

  // Add new RtpCSRCStats, or refresh the timestamp of known ones. The loop
  // simply does not execute when the header carries no CSRCs.
  for (auto i = 0; i < header.numCSRCs; i++) {
    auto csrcInfo = mCsrcStats.find(header.arrOfCSRCs[i]);
    if (csrcInfo == mCsrcStats.end()) {
      mCsrcStats.insert(
          std::make_pair(header.arrOfCSRCs[i],
                         RtpCSRCStats(header.arrOfCSRCs[i], now.ToDom())));
    } else {
      csrcInfo->second.SetTimestamp(now.ToDom());
    }
  }

  MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
          ("%s received RTP packet.", mDescription.c_str()));
  IncrementRtpPacketsReceived(packet.len());

  // Same guard as on the send path, so that nothing is prepared for the
  // logger on this per-packet path unless packet logging is actually on.
  if (RtpLogger::IsPacketLoggingOn()) {
    RtpLogger::LogPacket(packet, true, mDescription);
  }

  // Might be nice to pass ownership of the buffer in this case, but it is a
  // small optimization in a rare case.
  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtp, false,
                      packet.encrypted_data(), packet.encrypted_len());

  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtp, false, packet.data(),
                      packet.len());

  mRtpReceiveEvent.Notify(std::move(parsedPacket), header);
}
// Signal handler for ALPN negotiation results (STS thread only). Only acts
// when privacy was requested and this is a receive pipeline; aAlpn itself is
// not inspected.
void MediaPipeline::AlpnNegotiated(const std::string& aAlpn,
                                   bool aPrivacyRequested) {
  ASSERT_ON_THREAD(mStsThread);

  if (!aPrivacyRequested || Direction() != DirectionType::RECEIVE) {
    return;
  }

  // This will force the receive pipelines to drop data until they have
  // received a private PrincipalHandle from RTCRtpReceiver (which takes a
  // detour via main thread).
  static_cast<MediaPipelineReceive*>(this)->OnPrivacyRequested_s();
}
void MediaPipeline::EncryptedPacketSending(const std::string& aTransportId,
const MediaPacket& aPacket) {
ASSERT_ON_THREAD(mStsThread);
if (mTransportId == aTransportId) {
dom::mozPacketDumpType type;
if (aPacket.type() == MediaPacket::SRTP) {
type = dom::mozPacketDumpType::Srtp;
} else if (aPacket.type() == MediaPacket::SRTCP) {
type = dom::mozPacketDumpType::Srtcp;
} else if (aPacket.type() == MediaPacket::DTLS) {
return;
} else {
MOZ_ASSERT(false);
return;
}
mPacketDumper->Dump(Level(), type, true, aPacket.data(), aPacket.len());
}
}
// Track listener that feeds media from the send track into the conduit.
// Audio goes through an AudioProxyThread and video through a
// VideoFrameConverter, both of which are injected via setters after
// construction.
class MediaPipelineTransmit::PipelineListener
: public DirectMediaTrackListener {
friend class MediaPipelineTransmit;
public:
// Starts out inactive, disabled and without a direct connection.
explicit PipelineListener(RefPtr<MediaSessionConduit> aConduit)
: mConduit(std::move(aConduit)),
mActive(false),
mEnabled(false),
mDirectConnect(false) {}
// Shuts down the video frame converter, if one was set.
~PipelineListener() {
if (mConverter) {
mConverter->Shutdown();
}
}
// Records the active state and forwards it to the converter, if any.
void SetActive(bool aActive) {
mActive = aActive;
if (mConverter) {
mConverter->SetActive(aActive);
}
}
// Records whether real content may be sent (see mEnabled below).
void SetEnabled(bool aEnabled) { mEnabled = aEnabled; }
// These are needed since nested classes don't have access to any particular
// instance of the parent
void SetAudioProxy(RefPtr<AudioProxyThread> aProxy) {
mAudioProcessing = std::move(aProxy);
}
void SetVideoFrameConverter(RefPtr<VideoFrameConverter> aConverter) {
mConverter = std::move(aConverter);
}
// Hands a converted frame to the conduit. Release-asserts that the conduit
// is a video conduit before downcasting it.
void OnVideoFrameConverted(webrtc::VideoFrame aVideoFrame) {
MOZ_RELEASE_ASSERT(mConduit->type() == MediaSessionConduit::VIDEO);
static_cast<VideoSessionConduit*>(mConduit.get())
->SendVideoFrame(std::move(aVideoFrame));
}
// Implement MediaTrackListener
void NotifyQueuedChanges(MediaTrackGraph* aGraph, TrackTime aOffset,
const MediaSegment& aQueuedMedia) override;
void NotifyEnabledStateChanged(MediaTrackGraph* aGraph,
bool aEnabled) override;
// Implement DirectMediaTrackListener
void NotifyRealtimeTrackData(MediaTrackGraph* aGraph, TrackTime aOffset,
const MediaSegment& aMedia) override;
void NotifyDirectListenerInstalled(InstallationResult aResult) override;
void NotifyDirectListenerUninstalled() override;
private:
// Defined outside this class body; aRate defaults to 0.
void NewData(const MediaSegment& aMedia, TrackRate aRate = 0);
// Conduit that converted media is sent to.
const RefPtr<MediaSessionConduit> mConduit;
// Audio processing proxy, set through SetAudioProxy().
RefPtr<AudioProxyThread> mAudioProcessing;
// Video frame converter, set through SetVideoFrameConverter().
RefPtr<VideoFrameConverter> mConverter;
// active is true if there is a transport to send on
mozilla::Atomic<bool> mActive;
// enabled is true if the media access control permits sending
// actual content; when false you get black/silence
mozilla::Atomic<bool> mEnabled;
// Written and read on the MediaTrackGraph thread
bool mDirectConnect;
};
// Builds a transmit-direction pipeline. Creates the PipelineListener for the
// conduit and, for audio pipelines, an AudioProxyThread that is handed to the
// listener. UpdateSendState is registered as the watcher callback for
// mActive, mDomTrack and mSendTrackOverride, and the initial description is
// generated.
MediaPipelineTransmit::MediaPipelineTransmit(
const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
bool aIsVideo, RefPtr<MediaSessionConduit> aConduit)
: MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::TRANSMIT,
std::move(aCallThread), std::move(aStsThread),
std::move(aConduit)),
mWatchManager(this, AbstractThread::MainThread()),
mIsVideo(aIsVideo),
mListener(new PipelineListener(mConduit)),
mDomTrack(nullptr, "MediaPipelineTransmit::mDomTrack"),
mSendTrackOverride(nullptr, "MediaPipelineTransmit::mSendTrackOverride") {
if (!IsVideo()) {
// NOTE(review): dereferences the result of AsAudioSessionConduit() without
// a check; presumably always valid for a non-video conduit. Confirm.
mAudioProcessing =
MakeAndAddRef<AudioProxyThread>(*mConduit->AsAudioSessionConduit());
mListener->SetAudioProxy(mAudioProcessing);
}
mWatchManager.Watch(mActive, &MediaPipelineTransmit::UpdateSendState);
mWatchManager.Watch(mDomTrack, &MediaPipelineTransmit::UpdateSendState);
mWatchManager.Watch(mSendTrackOverride,
&MediaPipelineTransmit::UpdateSendState);
mDescription = GenerateDescription();
}
// For video pipelines only: creates the VideoFrameConverter, routes every
// converted frame to the listener on the converter's task queue, and hands
// the converter to the listener. Does nothing for audio pipelines.
void MediaPipelineTransmit::RegisterListener() {
  if (IsVideo()) {
    mConverter = VideoFrameConverter::Create(GetTimestampMaker());
    mFrameListener = mConverter->VideoFrameConvertedEvent().Connect(
        mConverter->mTaskQueue,
        [listener = mListener](webrtc::VideoFrame aFrame) {
          listener->OnVideoFrameConverted(std::move(aFrame));
        });
    mListener->SetVideoFrameConverter(mConverter);
  }
}
// Factory: constructs a MediaPipelineTransmit from the given arguments, lets
// it register its listener, and returns ownership to the caller.
already_AddRefed<MediaPipelineTransmit> MediaPipelineTransmit::Create(
    const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
    RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
    bool aIsVideo, RefPtr<MediaSessionConduit> aConduit) {
  RefPtr<MediaPipelineTransmit> pipeline(new MediaPipelineTransmit(
      aPc, std::move(aTransportHandler), std::move(aCallThread),
      std::move(aStsThread), aIsVideo, std::move(aConduit)));

  pipeline->RegisterListener();

  return pipeline.forget();
}
// Disconnects the converted-frame listener. Asserts that the pipeline is no
// longer transmitting and that mDomTrack has been cleared.
MediaPipelineTransmit::~MediaPipelineTransmit() {
mFrameListener.DisconnectIfExists();
MOZ_ASSERT(!mTransmitting);
MOZ_ASSERT(!mDomTrack.Ref());
}
// Connects the mActive mirror to aControl's CanonicalTransmitting().
void MediaPipelineTransmit::InitControl(
MediaPipelineTransmitControlInterface* aControl) {
aControl->CanonicalTransmitting().ConnectMirror(&mActive);
}
// Shuts down the base pipeline and the watch manager, stops observing and
// drops mDomTrack, then runs UpdateSendState() once more by hand; the
// trailing assert expects that call to leave the pipeline not transmitting.
void MediaPipelineTransmit::Shutdown() {
MediaPipeline::Shutdown();
mWatchManager.Shutdown();
if (mDomTrack.Ref()) {
mDomTrack.Ref()->RemovePrincipalChangeObserver(this);
mDomTrack = nullptr;
}
// Clear the gate so the UpdateSendState() call below is not skipped.
mUnsettingSendTrack = false;
UpdateSendState();
MOZ_ASSERT(!mTransmitting);
}
// Replaces the description used in log messages. STS thread only.
void MediaPipeline::SetDescription_s(const std::string& description) {
ASSERT_ON_THREAD(mStsThread);
mDescription = description;
}
std::string MediaPipelineTransmit::GenerateDescription() const {
MOZ_ASSERT(NS_IsMainThread());
std::stringstream description;
description << mPc << "| ";
description << (mIsVideo ? "Transmit video[" : "Transmit audio[");
if (mDomTrack.Ref()) {
nsString nsTrackId;
mDomTrack.Ref()->GetId(nsTrackId);
description << NS_ConvertUTF16toUTF8(nsTrackId).get();
} else if (mSendTrackOverride.Ref()) {
description << "override " << mSendTrackOverride.Ref().get();
} else {
description << "no track";
}
description << "]";
return description.str();
}
// Recomputes whether this pipeline should be transmitting and attaches
// mListener to, or detaches it from, the send track accordingly. Main thread
// only.
void MediaPipelineTransmit::UpdateSendState() {
  MOZ_ASSERT(NS_IsMainThread());

  // This runs because either mActive, mDomTrack or mSendTrackOverride changed,
  // or because mSendTrack was unset async. Based on these inputs this method
  // is responsible for hooking up mSendTrack to mListener in order to feed data
  // to the conduit.
  //
  // If we are inactive, or if the send track does not match what we want to
  // send (mDomTrack or mSendTrackOverride), we must stop feeding data to the
  // conduit. NB that removing the listener from mSendTrack is async, and we
  // must wait for it to resolve before adding mListener to another track.
  // mUnsettingSendTrack gates us until the listener has been removed from
  // mSendTrack.
  //
  // If we are active and the send track does match what we want to send, we
  // make sure mListener is added to the send track. Either now, or if we're
  // still waiting for another send track to be removed, during a future call to
  // this method.

  if (mUnsettingSendTrack) {
    // We must wait for the send track to be unset before we can set it again,
    // to avoid races. Once unset this function is triggered again.
    return;
  }

  const bool wasTransmitting = mTransmitting;

  const bool haveLiveSendTrack = mSendTrack && !mSendTrack->IsDestroyed();
  const bool haveLiveDomTrack = mDomTrack.Ref() && !mDomTrack.Ref()->Ended();
  const bool haveLiveOverrideTrack =
      mSendTrackOverride.Ref() && !mSendTrackOverride.Ref()->IsDestroyed();
  const bool mustRemoveSendTrack =
      haveLiveSendTrack && !mSendTrackOverride.Ref() &&
      (!haveLiveDomTrack || mDomTrack.Ref()->GetTrack() != mSendPortSource);

  mTransmitting = mActive && (haveLiveDomTrack || haveLiveOverrideTrack) &&
                  !mustRemoveSendTrack;

  MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
          ("MediaPipeline %p UpdateSendState wasTransmitting=%d, active=%d, "
           "sendTrack=%p (%s), domTrack=%p (%s), "
           "sendTrackOverride=%p (%s), mustRemove=%d, mTransmitting=%d",
           this, wasTransmitting, mActive.Ref(), mSendTrack.get(),
           haveLiveSendTrack ? "live" : "ended", mDomTrack.Ref().get(),
           haveLiveDomTrack ? "live" : "ended", mSendTrackOverride.Ref().get(),
           haveLiveOverrideTrack ? "live" : "ended", mustRemoveSendTrack,
           mTransmitting));

  if (!wasTransmitting && mTransmitting) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
            ("Attaching pipeline %p to track %p conduit type=%s", this,
             mDomTrack.Ref().get(), mIsVideo ? "video" : "audio"));
    if (mDescriptionInvalidated) {
      // Only update the description when we attach to a track, as detaching is
      // always a longer async step than updating the description. Updating on
      // detach would cause the wrong track id to be attributed in logs.
      RUN_ON_THREAD(mStsThread,
                    WrapRunnable(RefPtr<MediaPipeline>(this),
                                 &MediaPipelineTransmit::SetDescription_s,
                                 GenerateDescription()),
                    NS_DISPATCH_NORMAL);
      mDescriptionInvalidated = false;
    }
    if (mSendTrackOverride.Ref()) {
      // Special path that allows unittests to avoid mDomTrack and the graph by
      // manually calling SetSendTrack.
      mSendTrack = mSendTrackOverride.Ref();
    } else {
      mSendTrack = mDomTrack.Ref()->Graph()->CreateForwardedInputTrack(
          mDomTrack.Ref()->GetTrack()->mType);
      mSendPortSource = mDomTrack.Ref()->GetTrack();
      mSendPort = mSendTrack->AllocateInputPort(mSendPortSource.get());
    }
    // When we are transmitting solely because of mSendTrackOverride there may
    // be no DOM track at all, so only take the tracking id from it when it
    // exists instead of dereferencing a null mDomTrack.
    if (mIsVideo && mDomTrack.Ref()) {
      mConverter->SetTrackingId(mDomTrack.Ref()->GetSource().mTrackingId);
    }
    mSendTrack->QueueSetAutoend(false);
    if (mIsVideo) {
      mSendTrack->AddDirectListener(mListener);
    }
    mSendTrack->AddListener(mListener);
  }

  if (wasTransmitting && !mTransmitting) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
            ("Detaching pipeline %p from track %p conduit type=%s", this,
             mDomTrack.Ref().get(), mIsVideo ? "video" : "audio"));
    mUnsettingSendTrack = true;
    if (mIsVideo) {
      mSendTrack->RemoveDirectListener(mListener);
    }
    // Listener removal is async; once it resolves, lift the gate, forget the
    // send track and re-run this method unless we have been shut down.
    mSendTrack->RemoveListener(mListener)->Then(
        GetMainThreadSerialEventTarget(), __func__,
        [this, self = RefPtr<MediaPipelineTransmit>(this)] {
          mUnsettingSendTrack = false;
          mSendTrack = nullptr;
          if (!mWatchManager.IsShutdown()) {
            mWatchManager.ManualNotify(&MediaPipelineTransmit::UpdateSendState);
          }
        });
    if (!mSendTrackOverride.Ref()) {
      // If an override is set it may be re-used.
      mSendTrack->Destroy();
      mSendPort->Destroy();
      mSendPort = nullptr;
      mSendPortSource = nullptr;
    }
  }
}
// Returns the mirrored active state (mActive). Note that this is not the
// same member as mTransmitting, which UpdateSendState() maintains.
// Main thread only.
bool MediaPipelineTransmit::Transmitting() const {
MOZ_ASSERT(NS_IsMainThread());
return mActive;
}
// True when this pipeline was created for video (mIsVideo).
bool MediaPipelineTransmit::IsVideo() const { return mIsVideo; }
// Principal-change callback for mDomTrack. Re-evaluates the sink identity
// against the peer connection's document principal and peer identity. Does
// nothing if the peer connection is gone, and only logs if its document is.
void MediaPipelineTransmit::PrincipalChanged(dom::MediaStreamTrack* aTrack) {
  MOZ_ASSERT(aTrack && aTrack == mDomTrack.Ref());

  PeerConnectionWrapper pcw(mPc);
  if (!pcw.impl()) {
    return;
  }

  Document* doc = pcw.impl()->GetParentObject()->GetExtantDoc();
  if (!doc) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
            ("Can't update sink principal; document gone"));
    return;
  }

  UpdateSinkIdentity(doc->NodePrincipal(), pcw.impl()->GetPeerIdentity());
}
// Decides whether the listener may send real content for mDomTrack and
// passes the verdict to mListener->SetEnabled(). Content is allowed when
// aPrincipal subsumes the track's principal, or failing that, when both the
// track and the sink have a peer identity and the two are equal. Does
// nothing when there is no DOM track. Main thread only.
void MediaPipelineTransmit::UpdateSinkIdentity(
    nsIPrincipal* aPrincipal, const PeerIdentity* aSinkIdentity) {
  MOZ_ASSERT(NS_IsMainThread());

  const auto& track = mDomTrack.Ref();
  if (!track) {
    // Nothing to do here
    return;
  }

  bool allowed = aPrincipal->Subsumes(track->GetPrincipal());
  if (!allowed) {
    // The principal check failed, but the track can still be enabled if it
    // is bound to a peerIdentity and the peer connection (our sink) is bound
    // to that same identity.
    const PeerIdentity* trackIdentity = track->GetPeerIdentity();
    allowed = aSinkIdentity && trackIdentity &&
              (*aSinkIdentity == *trackIdentity);
  }

  mListener->SetEnabled(allowed);
}
void MediaPipelineTransmit::TransportReady_s() {