Source code

Revision control

Other Tools

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
#include <algorithm>
#include <stdio.h>
#include <stdlib.h>
#if !defined(__Userspace_os_Windows)
# include <arpa/inet.h>
#endif
// usrsctp.h expects to have errno definitions prior to its inclusion.
#include <errno.h>
#define SCTP_DEBUG 1
#define SCTP_STDINT_INCLUDE <stdint.h>
#ifdef _MSC_VER
// Disable "warning C4200: nonstandard extension used : zero-sized array in
// struct/union"
// ...which the third-party file usrsctp.h runs afoul of.
# pragma warning(push)
# pragma warning(disable : 4200)
#endif
#include "usrsctp.h"
#ifdef _MSC_VER
# pragma warning(pop)
#endif
#include "nsServiceManagerUtils.h"
#include "nsIInputStream.h"
#include "nsIObserverService.h"
#include "nsIObserver.h"
#include "nsIPrefBranch.h"
#include "nsIPrefService.h"
#include "mozilla/Services.h"
#include "mozilla/Sprintf.h"
#include "nsProxyRelease.h"
#include "nsThread.h"
#include "nsThreadUtils.h"
#include "nsNetUtil.h"
#include "nsNetCID.h"
#include "mozilla/RandomNum.h"
#include "mozilla/StaticMutex.h"
#include "mozilla/UniquePtrExtensions.h"
#include "mozilla/Unused.h"
#include "mozilla/dom/RTCDataChannelBinding.h"
#include "mozilla/dom/RTCStatsReportBinding.h"
#ifdef MOZ_PEERCONNECTION
# include "transport/runnable_utils.h"
# include "jsapi/MediaTransportHandler.h"
# include "mediapacket.h"
#endif
#include "DataChannel.h"
#include "DataChannelProtocol.h"
// Let us turn important assertions on and off in non-debug builds.
// - DEBUG builds: ASSERT_WEBRTC(x) is a plain MOZ_ASSERT.
// - MOZ_WEBRTC_ASSERT_ALWAYS builds: a false condition calls MOZ_CRASH().
// No definition is provided at this point for any other configuration.
#ifdef DEBUG
# define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
#elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
# define ASSERT_WEBRTC(x) \
do { \
if (!(x)) { \
MOZ_CRASH(); \
} \
} while (0)
#endif
namespace mozilla {
// Log module named "DataChannel"; not static, so it has external linkage.
LazyLogModule gDataChannelLog("DataChannel");
// File-local log module named "SCTP"; used below for usrsctp debug output
// and packet dumps.
static LazyLogModule gSCTPLog("SCTP");
// Logs |args| (a parenthesized printf-style argument list) to gSCTPLog at
// Debug level.
#define SCTP_LOG(args) \
MOZ_LOG(mozilla::gSCTPLog, mozilla::LogLevel::Debug, args)
// printf-style sink that is handed to usrsctp_init(). The message is
// formatted into a fixed 1024-byte stack buffer and forwarded to the SCTP log
// module. Nothing is formatted unless SCTP debug logging is enabled, and
// messages that format to zero characters (or fail to format) are dropped.
static void debug_printf(const char* format, ...) {
  if (!MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
    return;
  }
  char text[1024];
  va_list args;
  va_start(args, format);
#ifdef _WIN32
  const int written = vsnprintf_s(text, sizeof(text), _TRUNCATE, format, args);
#else
  const int written = VsprintfLiteral(text, format, args);
#endif
  if (written > 0) {
    SCTP_LOG(("%s", text));
  }
  va_end(args);
}
// Singleton that maps opaque integer ids to DataChannelConnection objects.
// The id (not a raw pointer) is what this file hands to usrsctp as the
// ulp_info / AF_CONN address value; callbacks coming back from the SCTP stack
// translate it into a strong reference through Lookup(). The registry also
// owns usrsctp initialization (in its constructor) and teardown (on
// "xpcom-will-shutdown").
//
// The static entry points take sInstanceMutex before touching the singleton.
class DataChannelRegistry : public nsIObserver {
public:
NS_DECL_THREADSAFE_ISUPPORTS
// Adds |aConnection| to the registry and returns the id assigned to it.
// Returns 0 when the singleton is not available (ids start at 1, so 0 is
// never handed out for a registered connection).
static uintptr_t Register(DataChannelConnection* aConnection) {
StaticMutexAutoLock lock(sInstanceMutex);
if (NS_WARN_IF(!Instance())) {
return 0;
}
uintptr_t result = Instance()->RegisterImpl(aConnection);
DC_DEBUG(
("Registering connection %p as ulp %p", aConnection, (void*)result));
return result;
}
// Removes the entry for |aId|; a no-op (with a warning) when the singleton is
// not available.
static void Deregister(uintptr_t aId) {
StaticMutexAutoLock lock(sInstanceMutex);
DC_DEBUG(("Deregistering connection ulp = %p", (void*)aId));
if (NS_WARN_IF(!Instance())) {
return;
}
Instance()->DeregisterImpl(aId);
}
// Returns a strong reference to the connection registered under |aId|, or
// nullptr when the id is unknown or the singleton is not available.
static RefPtr<DataChannelConnection> Lookup(uintptr_t aId) {
StaticMutexAutoLock lock(sInstanceMutex);
if (NS_WARN_IF(!Instance())) {
return nullptr;
}
return Instance()->LookupImpl(aId);
}
private:
// This is a singleton class, so don't let just anyone create one of these.
// Registers for "xpcom-will-shutdown" and initializes usrsctp. Note that if
// the observer service is unavailable the constructor returns early and
// usrsctp is NOT initialized.
DataChannelRegistry() {
ASSERT_WEBRTC(NS_IsMainThread());
nsCOMPtr<nsIObserverService> observerService =
mozilla::services::GetObserverService();
if (!observerService) return;
nsresult rv =
observerService->AddObserver(this, "xpcom-will-shutdown", false);
MOZ_ASSERT(rv == NS_OK);
(void)rv;
// TODO(bug 1646716): usrsctp_finish is racy, so we init in the c'tor.
InitUsrSctp();
}
// Accessor for the singleton slot. The reference is returned so that
// Observe() can reset the slot to nullptr at shutdown; once that has
// happened the static entry points above see a null instance.
static RefPtr<DataChannelRegistry>& Instance() {
// Lazy-create static registry.
static RefPtr<DataChannelRegistry> sRegistry = new DataChannelRegistry;
return sRegistry;
}
// nsIObserver: on "xpcom-will-shutdown", clears the singleton slot, drops any
// connections that are still registered, shuts down usrsctp and unregisters
// the observer. Other topics are ignored.
NS_IMETHOD Observe(nsISupports* aSubject, const char* aTopic,
const char16_t* aData) override {
ASSERT_WEBRTC(NS_IsMainThread());
if (strcmp(aTopic, "xpcom-will-shutdown") == 0) {
// Keep ourselves alive while the singleton slot is cleared.
RefPtr<DataChannelRegistry> self = this;
{
StaticMutexAutoLock lock(sInstanceMutex);
Instance() = nullptr;
}
// |self| is the only reference now
if (NS_WARN_IF(!mConnections.empty())) {
MOZ_ASSERT(false);
mConnections.clear();
}
// TODO(bug 1646716): usrsctp_finish is racy, so we wait until xpcom
// shutdown for this.
DeinitUsrSctp();
nsCOMPtr<nsIObserverService> observerService =
mozilla::services::GetObserverService();
if (NS_WARN_IF(!observerService)) {
return NS_ERROR_FAILURE;
}
nsresult rv =
observerService->RemoveObserver(this, "xpcom-will-shutdown");
MOZ_ASSERT(rv == NS_OK);
(void)rv;
}
return NS_OK;
}
// Stores |aConnection| under the next free id and returns that id. Called
// with sInstanceMutex held (via Register()).
uintptr_t RegisterImpl(DataChannelConnection* aConnection) {
ASSERT_WEBRTC(NS_IsMainThread());
// TODO(bug 1646716): usrsctp_finish is racy, so we init in the c'tor.
// if (mConnections.empty()) {
// InitUsrSctp();
//}
mConnections.emplace(mNextId, aConnection);
return mNextId++;
}
// Erases the entry for |aId| (erasing an unknown id is harmless). Called with
// sInstanceMutex held (via Deregister()).
void DeregisterImpl(uintptr_t aId) {
ASSERT_WEBRTC(NS_IsMainThread());
mConnections.erase(aId);
// TODO(bug 1646716): usrsctp_finish is racy, so we wait until xpcom
// shutdown for this.
// if (mConnections.empty()) {
// DeinitUsrSctp();
//}
}
// Map lookup behind Lookup(). Unlike the other *Impl methods this one has no
// main-thread assertion; callers in this file include the usrsctp callbacks.
RefPtr<DataChannelConnection> LookupImpl(uintptr_t aId) {
auto it = mConnections.find(aId);
if (NS_WARN_IF(it == mConnections.end())) {
DC_DEBUG(("Can't find connection ulp %p", (void*)aId));
return nullptr;
}
return it->second;
}
virtual ~DataChannelRegistry() = default;
#ifdef SCTP_DTLS_SUPPORTED
// Output callback given to usrsctp_init(): |addr| carries the registry id of
// the connection. Returns 0 without sending when the connection is unknown
// or already shutting down; otherwise forwards to the connection.
static int SctpDtlsOutput(void* addr, void* buffer, size_t length,
uint8_t tos, uint8_t set_df) {
uintptr_t id = reinterpret_cast<uintptr_t>(addr);
RefPtr<DataChannelConnection> connection = DataChannelRegistry::Lookup(id);
if (NS_WARN_IF(!connection) || connection->InShutdown()) {
return 0;
}
return connection->SctpDtlsOutput(addr, buffer, length, tos, set_df);
}
#endif
// Initializes the usrsctp library and applies the global sysctl settings used
// for data channels. Crashes deliberately when built without
// MOZ_PEERCONNECTION.
void InitUsrSctp() {
DC_DEBUG(("sctp_init"));
#ifdef MOZ_PEERCONNECTION
usrsctp_init(0, DataChannelRegistry::SctpDtlsOutput, debug_printf);
#else
MOZ_CRASH("Trying to use SCTP/DTLS without dom/media/webrtc/transport");
#endif
// Set logging to SCTP:LogLevel::Debug to get SCTP debugs
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
}
// Do not send ABORTs in response to INITs (1).
// Do not send ABORTs for received Out of the Blue packets (2).
usrsctp_sysctl_set_sctp_blackhole(2);
// Disable the Explicit Congestion Notification extension (currently not
// supported by the Firefox code)
usrsctp_sysctl_set_sctp_ecn_enable(0);
// Enable interleaving messages for different streams (incoming)
usrsctp_sysctl_set_sctp_default_frag_interleave(2);
// Disabling authentication and dynamic address reconfiguration as neither
// of them are used for data channel and only result in additional code
// paths being used.
usrsctp_sysctl_set_sctp_asconf_enable(0);
usrsctp_sysctl_set_sctp_auth_enable(0);
}
// Shuts the usrsctp library down; only called from Observe() at xpcom
// shutdown.
void DeinitUsrSctp() {
DC_DEBUG(("Shutting down SCTP"));
usrsctp_finish();
}
// Next id to hand out; starts at 1 so that 0 can signal "not registered".
uintptr_t mNextId = 1;
// id -> connection; the map holds a strong reference to each connection.
std::map<uintptr_t, RefPtr<DataChannelConnection>> mConnections;
// Taken by the static Register/Deregister/Lookup entry points and by
// Observe() while it clears the singleton slot.
static StaticMutex sInstanceMutex;
};
StaticMutex DataChannelRegistry::sInstanceMutex;
NS_IMPL_ISUPPORTS(DataChannelRegistry, nsIObserver);
// Wraps caller-owned message bytes and send parameters without copying
// either: only the |data| pointer and the address of |info| are stored. The
// read position starts at the beginning of the buffer.
OutgoingMsg::OutgoingMsg(struct sctp_sendv_spa& info, const uint8_t* data,
                         size_t length)
    : mLength(length), mData(data) {
  mPos = 0;
  mInfo = &info;
}
// Moves the read position forward by |offset| bytes, clamping it to the end
// of the message.
void OutgoingMsg::Advance(size_t offset) {
  mPos += offset;
  const bool pastEnd = mLength < mPos;
  if (pastEnd) {
    mPos = mLength;
  }
}
// Makes an owning snapshot of the unsent remainder of |msg|: the bytes that
// GetLeft() reports are copied into a freshly allocated buffer and the send
// parameters into a freshly allocated sctp_sendv_spa. Both allocations are
// released by the destructor.
BufferedOutgoingMsg::BufferedOutgoingMsg(OutgoingMsg& msg) {
  const size_t remaining = msg.GetLeft();
  auto* copy = new uint8_t[remaining];  // infallible malloc!
  memcpy(copy, msg.GetData(), remaining);
  mData = copy;
  mLength = remaining;
  mPos = 0;
  mInfo = new sctp_sendv_spa;
  *mInfo = msg.GetInfo();
}
// Frees the byte buffer and the send-parameter copy allocated by the
// constructor.
BufferedOutgoingMsg::~BufferedOutgoingMsg() {
  delete[] mData;
  delete mInfo;
}
// Receive callback registered with usrsctp_socket(). |ulp_info| carries the
// registry id of the owning connection; the data is forwarded to that
// connection's ReceiveCallback(). Returns 0 without touching |data| when the
// connection is no longer registered.
static int receive_cb(struct socket* sock, union sctp_sockstore addr,
                      void* data, size_t datalen, struct sctp_rcvinfo rcv,
                      int flags, void* ulp_info) {
  DC_DEBUG(("In receive_cb, ulp_info=%p", ulp_info));
  const uintptr_t registryId = reinterpret_cast<uintptr_t>(ulp_info);
  RefPtr<DataChannelConnection> connection =
      DataChannelRegistry::Lookup(registryId);
  if (connection) {
    return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
  }
  // Unfortunately, we can get callbacks after calling
  // usrsctp_close(socket), so we need to simply ignore them if we've
  // already killed the DataChannelConnection object
  DC_DEBUG((
      "Ignoring receive callback for terminated Connection ulp=%p, %zu bytes",
      ulp_info, datalen));
  return 0;
}
// Maps a usrsctp socket back to the DataChannelConnection that owns it.
//
// usrsctp_getladdrs() returns the addresses bound to this socket; for the
// AF_CONN sockets this file opens, sconn_addr holds the registry id the
// connection bound with. We only open AF_CONN sockets, and they should all
// have sconn_addr set to the same id, so [0] is as good as any other.
//
// Returns nullptr when the address list cannot be obtained, when the first
// address is not AF_CONN, or when the id is no longer registered. The address
// list is released on every path on which it was allocated (previously it
// leaked when the first address was not AF_CONN).
static RefPtr<DataChannelConnection> GetConnectionFromSocket(
    struct socket* sock) {
  struct sockaddr* addrs = nullptr;
  const int naddrs = usrsctp_getladdrs(sock, 0, &addrs);

  RefPtr<DataChannelConnection> connection;
  if (naddrs > 0 && addrs[0].sa_family == AF_CONN) {
    // Read the id out of the address before the list is freed below.
    struct sockaddr_conn* sconn =
        reinterpret_cast<struct sockaddr_conn*>(&addrs[0]);
    const uintptr_t id = reinterpret_cast<uintptr_t>(sconn->sconn_addr);
    connection = DataChannelRegistry::Lookup(id);
  }

  // usrsctp_freeladdrs() must only be given a list that was actually
  // returned, so guard on the pointer rather than on the count.
  if (addrs) {
    usrsctp_freeladdrs(addrs);
  }
  return connection;
}
// Called when the send buffer empties to the threshold value. This is called
// from SctpDtlsInput() through the sctp stack. SctpDtlsInput() calls
// usrsctp_conninput() under lock, which is why the connection's mLock is
// expected to be held here.
//
// Always returns 0. When the socket cannot be mapped back to a connection an
// error is logged; the lock assertion is only evaluated once the connection
// is known to be non-null (it previously dereferenced a possibly-null
// pointer).
static int threshold_event(struct socket* sock, uint32_t sb_free) {
  RefPtr<DataChannelConnection> connection = GetConnectionFromSocket(sock);
  if (connection) {
    connection->mLock.AssertCurrentThreadOwns();
    connection->SendDeferredMessages();
  } else {
    DC_ERROR(("Can't find connection for socket %p", sock));
  }
  return 0;
}
// Destructor. The connection must already be CLOSED, with no master socket
// and no pending channels. The only remaining work is shutting down the
// internal IO thread, if one exists; how that is done depends on the thread
// we are being destroyed on.
DataChannelConnection::~DataChannelConnection() {
  DC_DEBUG(("Deleting DataChannelConnection %p", (void*)this));
  // This may die on the MainThread, or on the STS thread, or on an
  // sctp thread if we were in a callback when the DOM side shut things down.
  ASSERT_WEBRTC(mState == CLOSED);
  MOZ_ASSERT(!mMasterSocket);
  MOZ_ASSERT(mPending.GetSize() == 0);

  if (IsSTSThread()) {
    // on STS, safe to call shutdown
    if (mInternalIOThread) {
      mInternalIOThread->Shutdown();
    }
    return;
  }

  // We may be on MainThread *or* on an sctp thread (being called from
  // receive_cb() or SctpDtlsOutput())
  if (mInternalIOThread) {
    // Avoid spinning the event thread from here (which if we're mainthread
    // is in the event loop already)
    nsCOMPtr<nsIRunnable> shutdownTask = WrapRunnable(
        nsCOMPtr<nsIThread>(mInternalIOThread), &nsIThread::AsyncShutdown);
    Dispatch(shutdownTask.forget());
  }
}
// Main-thread teardown entry point: closes all channels, clears pending
// stream resets, drops the listener, and hands both sockets to DestroyOnSTS()
// on the STS thread, which closes them and deregisters the connection.
void DataChannelConnection::Destroy() {
// Though it's probably ok to do this and close the sockets;
// if we really want it to do true clean shutdowns it can
// create a dependent Internal object that would remain around
// until the network shut down the association or timed out.
DC_DEBUG(("Destroying DataChannelConnection %p", (void*)this));
ASSERT_WEBRTC(NS_IsMainThread());
// Note: CloseAll() is called before mLock is taken below.
CloseAll();
MutexAutoLock lock(mLock);
// If we had a pending reset, we aren't waiting for it - clear the list so
// we can deregister this DataChannelConnection without leaking.
ClearResets();
MOZ_ASSERT(mSTS);
ASSERT_WEBRTC(NS_IsMainThread());
mListener = nullptr;
// Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
// the usrsctp_close() calls can move back here (and just proxy the
// disconnect_all())
// The socket pointers are copied into the runnable here, before the members
// are cleared below. They are passed as (mSocket, mMasterSocket).
RUN_ON_THREAD(mSTS,
WrapRunnable(RefPtr<DataChannelConnection>(this),
&DataChannelConnection::DestroyOnSTS, mSocket,
mMasterSocket),
NS_DISPATCH_NORMAL);
// These will be released on STS
mSocket = nullptr;
mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
// We can't get any more *new* callbacks from the SCTP library
// All existing callbacks have refs to DataChannelConnection - however,
// we need to handle their destroying the object off mainthread/STS
// nsDOMDataChannel objects have refs to DataChannels that have refs to us
}
void DataChannelConnection::DestroyOnSTS(struct socket* aMasterSocket,
struct socket* aSocket) {
if (aSocket && aSocket != aMasterSocket) usrsctp_close(aSocket);
if (aMasterSocket) usrsctp_close(aMasterSocket);
usrsctp_deregister_address(reinterpret_cast<void*>(mId));
DC_DEBUG(
("Deregistered %p from the SCTP stack.", reinterpret_cast<void*>(mId)));
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
mShutdown = true;
DC_DEBUG(("Shutting down connection %p, id %p", this, (void*)mId));
#endif
disconnect_all();
mTransportHandler = nullptr;
GetMainThreadEventTarget()->Dispatch(NS_NewRunnableFunction(
"DataChannelConnection::Destroy",
[id = mId]() { DataChannelRegistry::Deregister(id); }));
}
// Main-thread factory: constructs a connection and runs Init() on it.
// Returns Some(connection) when Init() succeeds and Nothing() otherwise.
Maybe<RefPtr<DataChannelConnection>> DataChannelConnection::Create(
    DataChannelConnection::DataConnectionListener* aListener,
    nsISerialEventTarget* aTarget, MediaTransportHandler* aHandler,
    const uint16_t aLocalPort, const uint16_t aNumStreams,
    const Maybe<uint64_t>& aMaxMessageSize) {
  ASSERT_WEBRTC(NS_IsMainThread());

  RefPtr<DataChannelConnection> connection = new DataChannelConnection(
      aListener, aTarget, aHandler);  // Walks into a bar

  if (!connection->Init(aLocalPort, aNumStreams, aMaxMessageSize)) {
    return Nothing();
  }
  return Some(connection);
}
// Stores the listener, event target and transport handler. No sockets are
// opened here; that happens in Init().
DataChannelConnection::DataChannelConnection(
    DataChannelConnection::DataConnectionListener* aListener,
    nsISerialEventTarget* aTarget, MediaTransportHandler* aHandler)
    : NeckoTargetHolder(aTarget),
      mLock("netwerk::sctp::DataChannelConnection"),
      mListener(aListener),
      mTransportHandler(aHandler) {
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
  // Diagnostic-only flag; DestroyOnSTS() sets it to true.
  mShutdown = false;
#endif
  DC_VERBOSE(("Constructor DataChannelConnection=%p, listener=%p", this,
              mListener.get()));
}
// Main-thread initialization: records the local port and maximum message
// size, registers this connection with DataChannelRegistry, opens the AF_CONN
// SCTP master socket and configures it (buffer sizes, non-blocking, linger,
// explicit EOR, stream reset, event subscriptions, stream counts), then asks
// the STS thread to register the connection's address with the SCTP stack.
//
// Returns true on success. On failure returns false; if the socket had been
// opened it is closed and mMasterSocket is reset to nullptr.
//
// NOTE(review): the failure paths return without undoing the
// DataChannelRegistry::Register() call made below - confirm that the registry
// entry is released elsewhere in that case.
bool DataChannelConnection::Init(const uint16_t aLocalPort,
const uint16_t aNumStreams,
const Maybe<uint64_t>& aMaxMessageSize) {
ASSERT_WEBRTC(NS_IsMainThread());
struct sctp_initmsg initmsg;
struct sctp_assoc_value av;
struct sctp_event event;
socklen_t len;
// Notifications we subscribe to via SCTP_EVENT further down.
uint16_t event_types[] = {
SCTP_ASSOC_CHANGE, SCTP_PEER_ADDR_CHANGE,
SCTP_REMOTE_ERROR, SCTP_SHUTDOWN_EVENT,
SCTP_ADAPTATION_INDICATION, SCTP_PARTIAL_DELIVERY_EVENT,
SCTP_SEND_FAILED_EVENT, SCTP_STREAM_RESET_EVENT,
SCTP_STREAM_CHANGE_EVENT};
{
// MutexAutoLock lock(mLock); Not needed since we're on mainthread always
mLocalPort = aLocalPort;
SetMaxMessageSize(aMaxMessageSize.isSome(), aMaxMessageSize.valueOr(0));
}
// The registry id doubles as the ulp_info / AF_CONN address for usrsctp.
mId = DataChannelRegistry::Register(this);
// XXX FIX! make this a global we get once
// Find the STS thread
nsresult rv;
mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
MOZ_ASSERT(NS_SUCCEEDED(rv));
// Open sctp with a callback
// threshold_event fires once the free send-buffer space reaches half of the
// default send space.
if ((mMasterSocket = usrsctp_socket(
AF_CONN, SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
usrsctp_sysctl_get_sctp_sendspace() / 2,
reinterpret_cast<void*>(mId))) == nullptr) {
return false;
}
// 1 MiB for both the receive and the send buffer.
int buf_size = 1024 * 1024;
if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_RCVBUF,
(const void*)&buf_size, sizeof(buf_size)) < 0) {
DC_ERROR(("Couldn't change receive buffer size on SCTP socket"));
goto error_cleanup;
}
if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_SNDBUF,
(const void*)&buf_size, sizeof(buf_size)) < 0) {
DC_ERROR(("Couldn't change send buffer size on SCTP socket"));
goto error_cleanup;
}
// Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking
// in associations for normal IO
if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
DC_ERROR(("Couldn't set non_blocking on SCTP socket"));
// We can't handle connect() safely if it will block, not that this will
// even happen.
goto error_cleanup;
}
// When we close the socket, make sure it doesn't call us back again! This
// would cause it to try to use an invalid DataChannelConnection pointer.
struct linger l;
l.l_onoff = 1;
l.l_linger = 0;
if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER, (const void*)&l,
(socklen_t)sizeof(struct linger)) < 0) {
DC_ERROR(("Couldn't set SO_LINGER on SCTP socket"));
// unsafe to allow it to continue if this fails
goto error_cleanup;
}
// XXX Consider disabling this when we add proper SDP negotiation.
// We may want to leave enabled for supporting 'cloning' of SDP offers, which
// implies re-use of the same pseudo-port number, or forcing a renegotiation.
// Failures of these two options are only warned about, not treated as fatal.
{
const int option_value = 1;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
(const void*)&option_value,
(socklen_t)sizeof(option_value)) < 0) {
DC_WARN(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
}
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
(const void*)&option_value,
(socklen_t)sizeof(option_value)) < 0) {
DC_WARN(("Couldn't set SCTP_NODELAY on SCTP socket"));
}
}
// Set explicit EOR
{
const int option_value = 1;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EXPLICIT_EOR,
(const void*)&option_value,
(socklen_t)sizeof(option_value)) < 0) {
DC_ERROR(("*** failed to enable explicit EOR mode %d", errno));
goto error_cleanup;
}
}
// Enable ndata
// TODO: Bug 1381145, enable this once ndata has been deployed
#if 0
av.assoc_id = SCTP_FUTURE_ASSOC;
av.assoc_value = 1;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INTERLEAVING_SUPPORTED, &av,
(socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
DC_ERROR(("*** failed enable ndata errno %d", errno));
goto error_cleanup;
}
#endif
// Enable stream reset and association-change requests for all associations.
av.assoc_id = SCTP_ALL_ASSOC;
av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET,
&av, (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
DC_ERROR(("*** failed enable stream reset errno %d", errno));
goto error_cleanup;
}
/* Enable the events of interest. */
memset(&event, 0, sizeof(event));
event.se_assoc_id = SCTP_ALL_ASSOC;
event.se_on = 1;
for (unsigned short event_type : event_types) {
event.se_type = event_type;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event,
sizeof(event)) < 0) {
DC_ERROR(("*** failed setsockopt SCTP_EVENT errno %d", errno));
goto error_cleanup;
}
}
// Read the current INIT parameters, then override only the stream counts.
memset(&initmsg, 0, sizeof(initmsg));
len = sizeof(initmsg);
if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
&len) < 0) {
DC_ERROR(("*** failed getsockopt SCTP_INITMSG"));
goto error_cleanup;
}
DC_DEBUG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
initmsg.sinit_num_ostreams = aNumStreams;
initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
(socklen_t)sizeof(initmsg)) < 0) {
DC_ERROR(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
goto error_cleanup;
}
mSocket = nullptr;
// Address registration is done on the STS thread; only the id is captured.
mSTS->Dispatch(
NS_NewRunnableFunction("DataChannelConnection::Init", [id = mId]() {
usrsctp_register_address(reinterpret_cast<void*>(id));
DC_DEBUG(("Registered %p within the SCTP stack.",
reinterpret_cast<void*>(id)));
}));
return true;
// Common failure exit once the master socket exists.
error_cleanup:
usrsctp_close(mMasterSocket);
mMasterSocket = nullptr;
return false;
}
void DataChannelConnection::SetMaxMessageSize(bool aMaxMessageSizeSet,
uint64_t aMaxMessageSize) {
MutexAutoLock lock(mLock); // TODO: Needed?
if (mMaxMessageSizeSet && !aMaxMessageSizeSet) {
// Don't overwrite already set MMS with default values
return;
}
mMaxMessageSizeSet = aMaxMessageSizeSet;
mMaxMessageSize = aMaxMessageSize;
nsresult rv;
nsCOMPtr<nsIPrefService> prefs =
do_GetService("@mozilla.org/preferences-service;1", &rv);
if (!NS_WARN_IF(NS_FAILED(rv))) {
nsCOMPtr<nsIPrefBranch> branch = do_QueryInterface(prefs);
if (branch) {
int32_t temp;
if (!NS_FAILED(branch->GetIntPref(
"media.peerconnection.sctp.force_maximum_message_size", &temp))) {
if (temp >= 0) {
mMaxMessageSize = (uint64_t)temp;
}
}
}
}
// Fix remote MMS. This code exists, so future implementations of
// RTCSctpTransport.maxMessageSize can simply provide that value from
// GetMaxMessageSize.
// TODO: Bug 1382779, once resolved, can be increased to
// min(Uint8ArrayMaxSize, UINT32_MAX)
// TODO: Bug 1381146, once resolved, can be increased to whatever we support
// then (hopefully
// SIZE_MAX)
if (mMaxMessageSize == 0 ||
mMaxMessageSize > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE) {
mMaxMessageSize = WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE;
}
DC_DEBUG(("Maximum message size (outgoing data): %" PRIu64
" (set=%s, enforced=%s)",
mMaxMessageSize, mMaxMessageSizeSet ? "yes" : "no",
aMaxMessageSize != mMaxMessageSize ? "yes" : "no"));
}
// Returns the current maximum outgoing message size as stored by
// SetMaxMessageSize(). The value is read without taking mLock.
uint64_t DataChannelConnection::GetMaxMessageSize() {
  return mMaxMessageSize;
}
void DataChannelConnection::AppendStatsToReport(
const UniquePtr<dom::RTCStatsCollection>& aReport,
const DOMHighResTimeStamp aTimestamp) const {
ASSERT_WEBRTC(NS_IsMainThread());
nsString temp;
for (const RefPtr<DataChannel>& chan : mChannels.GetAll()) {
// If channel is empty, ignore
if (!chan) {
continue;
}
mozilla::dom::RTCDataChannelStats stats;
nsString id = u"dc"_ns;
id.AppendInt(chan->GetStream());
stats.mId.Construct(id);
chan->GetLabel(temp);
stats.mTimestamp.Construct(aTimestamp);
stats.mType.Construct(mozilla::dom::RTCStatsType::Data_channel);
stats.mLabel.Construct(temp);
chan->GetProtocol(temp);
stats.mProtocol.Construct(temp);
stats.mDataChannelIdentifier.Construct(chan->GetStream());
{
using State = mozilla::dom::RTCDataChannelState;
State state;
switch (chan->GetReadyState()) {
case CONNECTING:
state = State::Connecting;
break;
case OPEN:
state = State::Open;
break;
case CLOSING:
state = State::Closing;
break;
case CLOSED:
state = State::Closed;
break;
default:
MOZ_ASSERT(false, "Unknown DataChannel state");
continue;
};
stats.mState.Construct(state);
}
auto counters = chan->GetTrafficCounters();
stats.mMessagesSent.Construct(counters.mMessagesSent);
stats.mBytesSent.Construct(counters.mBytesSent);
stats.mMessagesReceived.Construct(counters.mMessagesReceived);
stats.mBytesReceived.Construct(counters.mBytesReceived);
if (!aReport->mDataChannelStats.AppendElement(stats, fallible)) {
mozalloc_handle_oom(0);
}
}
}
#ifdef MOZ_PEERCONNECTION
// Associates this connection with the DTLS transport |aTransportId| and
// starts connecting.
//
// aTransportId: id of the transport to use; must not be empty.
// aClient: role flag, stored in mAllocateEven.
// aLocalPort / aRemotePort: SCTP ports to bind to / connect to.
//
// Returns true when the connect was started, or when the connection is
// already OPEN with identical parameters (nothing is done in that case).
// Returns false when |aTransportId| is empty.
//
// Side effects on success: stores the ports and role, moves the ready state
// to CONNECTING, assigns stream ids to channels that do not have one yet, and
// dispatches SetSignals() to the STS thread.
bool DataChannelConnection::ConnectToTransport(const std::string& aTransportId,
                                               const bool aClient,
                                               const uint16_t aLocalPort,
                                               const uint16_t aRemotePort) {
  MutexAutoLock lock(mLock);
  MOZ_ASSERT(mMasterSocket,
             "SCTP wasn't initialized before ConnectToTransport!");
  // Formats a parameter set for logging. |client| is a Maybe so that a role
  // that has not been decided yet prints as an empty string.
  static const auto paramString =
      [](const std::string& tId, const Maybe<bool>& client,
         const uint16_t localPort, const uint16_t remotePort) -> std::string {
    std::ostringstream stream;
    stream << "Transport ID: '" << tId << "', Role: '"
           << (client ? (client.value() ? "client" : "server") : "")
           << "', Local Port: '" << localPort << "', Remote Port: '"
           << remotePort << "'";
    return stream.str();
  };
  const auto params =
      paramString(aTransportId, Some(aClient), aLocalPort, aRemotePort);
  DC_DEBUG(("ConnectToTransport connecting DTLS transport with parameters: %s",
            params.c_str()));
  const auto currentReadyState = GetReadyState();
  if (currentReadyState == OPEN) {
    if (aTransportId == mTransportId && mAllocateEven.isSome() &&
        mAllocateEven.value() == aClient && mLocalPort == aLocalPort &&
        mRemotePort == aRemotePort) {
      DC_WARN(
          ("Skipping attempt to connect to an already OPEN transport with "
           "identical parameters."));
      return true;
    }
    DC_WARN(
        ("Attempting to connect to an already OPEN transport, because "
         "different parameters were provided."));
    // Log the parameters currently in effect; this must use the stored
    // mRemotePort, not the newly requested aRemotePort.
    DC_WARN(("Original transport parameters: %s",
             paramString(mTransportId, mAllocateEven, mLocalPort, mRemotePort)
                 .c_str()));
    DC_WARN(("New transport parameters: %s", params.c_str()));
  }
  if (NS_WARN_IF(aTransportId.empty())) {
    return false;
  }
  mLocalPort = aLocalPort;
  mRemotePort = aRemotePort;
  SetReadyState(CONNECTING);
  mAllocateEven = Some(aClient);
  // Give every channel that is still on INVALID_STREAM a real stream id. A
  // channel for which no free stream is found is removed and not re-inserted.
  // Could be faster. Probably doesn't matter.
  while (auto channel = mChannels.Get(INVALID_STREAM)) {
    mChannels.Remove(channel);
    channel->mStream = FindFreeStream();
    if (channel->mStream != INVALID_STREAM) {
      mChannels.Insert(channel);
    }
  }
  RUN_ON_THREAD(mSTS,
                WrapRunnable(RefPtr<DataChannelConnection>(this),
                             &DataChannelConnection::SetSignals, aTransportId),
                NS_DISPATCH_NORMAL);
  return true;
}
// STS-thread half of ConnectToTransport(): records the transport id, hooks
// this connection up to the transport handler's packet and state-change
// signals, and completes the connect right away if DTLS is already open.
void DataChannelConnection::SetSignals(const std::string& aTransportId) {
  ASSERT_WEBRTC(IsSTSThread());
  {
    MutexAutoLock lock(mLock);
    mTransportId = aTransportId;
  }

  mTransportHandler->SignalPacketReceived.connect(
      this, &DataChannelConnection::SctpDtlsInput);
  mTransportHandler->SignalStateChange.connect(
      this, &DataChannelConnection::TransportStateChange);

  // SignalStateChange() doesn't call you with the initial state
  const bool dtlsAlreadyOpen =
      mTransportHandler->GetState(mTransportId, false) ==
      TransportLayer::TS_OPEN;
  if (!dtlsAlreadyOpen) {
    DC_DEBUG(("Setting transport signals, dtls not open yet"));
    return;
  }
  DC_DEBUG(("Setting transport signals, dtls already open"));
  CompleteConnect();
}
// State-change slot for the transport handler (STS thread). Notifications for
// other transports are ignored. TS_OPEN completes the SCTP connect;
// TS_CLOSED, TS_NONE and TS_ERROR stop the connection; any other state is
// ignored.
void DataChannelConnection::TransportStateChange(
    const std::string& aTransportId, TransportLayer::State aState) {
  ASSERT_WEBRTC(IsSTSThread());
  if (aTransportId != mTransportId) {
    return;
  }

  switch (aState) {
    case TransportLayer::TS_OPEN:
      DC_DEBUG(("Transport is open!"));
      CompleteConnect();
      break;
    case TransportLayer::TS_CLOSED:
    case TransportLayer::TS_NONE:
    case TransportLayer::TS_ERROR:
      DC_DEBUG(("Transport is closed!"));
      Stop();
      break;
    default:
      break;
  }
}
// Called on the STS thread once the DTLS transport is open. Binds the master
// socket to the local port, starts a (non-blocking) connect to the remote
// port, and pins the path MTU to 1200 with path-MTU discovery disabled.
//
// Does nothing if the master socket is already gone. When connect succeeds or
// is in progress the function returns early; the open notification is then
// driven by SCTP_COMM_UP. On bind failure, or on a connect/option failure
// that is not EINPROGRESS, an ON_CONNECTION runnable is dispatched (after the
// state was set to CLOSED in the latter case).
void DataChannelConnection::CompleteConnect() {
  MutexAutoLock lock(mLock);
  DC_DEBUG(("dtls open"));
  ASSERT_WEBRTC(IsSTSThread());
  if (!mMasterSocket) {
    return;
  }
  struct sockaddr_conn addr;
  memset(&addr, 0, sizeof(addr));
  addr.sconn_family = AF_CONN;
# if defined(__Userspace_os_Darwin)
  addr.sconn_len = sizeof(addr);
# endif
  addr.sconn_port = htons(mLocalPort);
  // The AF_CONN "address" is this connection's registry id.
  addr.sconn_addr = reinterpret_cast<void*>(mId);
  DC_DEBUG(("Calling usrsctp_bind"));
  int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr*>(&addr),
                       sizeof(addr));
  if (r < 0) {
    DC_ERROR(("usrsctp_bind failed: %d", r));
  } else {
    // This is the remote addr
    addr.sconn_port = htons(mRemotePort);
    DC_DEBUG(("Calling usrsctp_connect"));
    r = usrsctp_connect(
        mMasterSocket, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
    if (r >= 0 || errno == EINPROGRESS) {
      struct sctp_paddrparams paddrparams;
      socklen_t opt_len;
      memset(&paddrparams, 0, sizeof(struct sctp_paddrparams));
      memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn));
      opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
      // Note: from here on |r| holds the result of the socket-option calls,
      // not of usrsctp_connect().
      r = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
                             &paddrparams, &opt_len);
      if (r < 0) {
        DC_ERROR(("usrsctp_getsockopt failed: %d", r));
      } else {
        // draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4
        // 1200, IPV6 1280
        paddrparams.spp_pathmtu = 1200;  // safe for either
        paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
        paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
        opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
        r = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP,
                               SCTP_PEER_ADDR_PARAMS, &paddrparams, opt_len);
        if (r < 0) {
          // This is the setsockopt call failing; name it correctly in the log.
          DC_ERROR(("usrsctp_setsockopt failed: %d", r));
        } else {
          // NOTE(review): this success message is logged at error level;
          // left unchanged here.
          DC_ERROR(("usrsctp: PMTUD disabled, MTU set to %u",
                    paddrparams.spp_pathmtu));
        }
      }
    }
    if (r < 0) {
      if (errno == EINPROGRESS) {
        // non-blocking
        return;
      }
      DC_ERROR(("usrsctp_connect failed: %d", errno));
      SetReadyState(CLOSED);
    } else {
      // We fire ON_CONNECTION via SCTP_COMM_UP when we get that
      return;
    }
  }
  // Note: currently this doesn't actually notify the application
  Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
      DataChannelOnMessageAvailable::ON_CONNECTION, this)));
}
// Process any pending Opens
void DataChannelConnection::ProcessQueuedOpens() {
// The nsDeque holds channels with an AddRef applied. Another reference
// (may) be held by the DOMDataChannel, unless it's been GC'd. No other
// references should exist.
// Can't copy nsDeque's. Move into temp array since any that fail will
// go back to mPending
nsRefPtrDeque<DataChannel> temp;
RefPtr<DataChannel> temp_channel;
while (nullptr != (temp_channel = mPending.PopFront())) {
temp.Push(temp_channel.forget());
}
RefPtr<DataChannel> channel;
while (nullptr != (channel = temp.PopFront())) {
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
DC_DEBUG(("Processing queued open for %p (%u)", channel.get(),
channel->mStream));
channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
// OpenFinish returns a reference itself, so we need to take it can
// Release it
channel = OpenFinish(channel.forget()); // may reset the flag and re-push
} else {
NS_ASSERTION(
false,
"How did a DataChannel get queued without the FINISH_OPEN flag?");
}
}
}
// Packet-received slot for the transport handler. Packets that are not SCTP,
// or that belong to a different transport, are ignored. Everything else is
// (optionally dumped to the SCTP log and then) fed into the SCTP stack while
// mLock is held.
void DataChannelConnection::SctpDtlsInput(const std::string& aTransportId,
                                          const MediaPacket& packet) {
  MutexAutoLock lock(mLock);

  const bool forThisConnection =
      (packet.type() == MediaPacket::SCTP) && (mTransportId == aTransportId);
  if (!forThisConnection) {
    return;
  }

  if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
    char* dump = usrsctp_dumppacket((void*)packet.data(), packet.len(),
                                    SCTP_DUMP_INBOUND);
    if (dump != nullptr) {
      SCTP_LOG(("%s", dump));
      usrsctp_freedumpbuffer(dump);
    }
  }

  // Pass the data to SCTP
  usrsctp_conninput(reinterpret_cast<void*>(mId), packet.data(), packet.len(),
                    0);
}
// Queues |packet| for sending on the STS thread. The runnable takes ownership
// of the packet and holds a strong reference to this connection (|self|)
// until it has run. When it runs, the packet is silently dropped if no
// transport id has been set yet or the transport handler has been cleared.
void DataChannelConnection::SendPacket(std::unique_ptr<MediaPacket>&& packet) {
mSTS->Dispatch(NS_NewRunnableFunction(
"DataChannelConnection::SendPacket",
[this, self = RefPtr<DataChannelConnection>(this),
packet = std::move(packet)]() mutable {
// DC_DEBUG(("%p: SCTP/DTLS sent %ld bytes", this, len));
if (!mTransportId.empty() && mTransportHandler) {
mTransportHandler->SendPacket(mTransportId, std::move(*packet));
}
}));
}
int DataChannelConnection::SctpDtlsOutput(void* addr, void* buffer,
size_t length, uint8_t tos,
uint8_t set_df) {
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
char* buf;
if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) !=
nullptr) {
SCTP_LOG(("%s", buf));
usrsctp_freedumpbuffer(buf);
}
}
// We're async proxying even if on the STSThread because this is called
// with internal SCTP locks held in some cases (such as in usrsctp_connect()).
// SCTP has an option for Apple, on IP connections only, to release at least
// one of the locks before calling a packet output routine; with changes to
// the underlying SCTP stack this might remove the need to use an async proxy.
std::unique_ptr<MediaPacket> packet(new MediaPacket);
packet->SetType(MediaPacket::SCTP);
packet->Copy(static_cast<const uint8_t*>(buffer), length);
if (NS_IsMainThread() && mDeferSend) {
mDeferredSend.emplace_back(std::move(packet));
return 0;
}
SendPacket(std::move(packet));
return 0; // cheat! Packets can always be dropped later anyways
}
#endif
#ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
// Listens for an incoming association on |port| (any IPv4 address), accepts
// a single connection into mSocket and dispatches ON_CONNECTION.
// Blocks! - Don't call this from main thread!
//
// Returns true once a connection was accepted, false if bind, listen or
// accept fails. On failure the ready state is left at CONNECTING.
bool DataChannelConnection::Listen(unsigned short port) {
struct sockaddr_in addr;
socklen_t addr_len;
NS_WARNING_ASSERTION(!NS_IsMainThread(),
"Blocks, do not call from main thread!!!");
/* Acting as the 'server' */
memset((void*)&addr, 0, sizeof(addr));
# ifdef HAVE_SIN_LEN
addr.sin_len = sizeof(struct sockaddr_in);
# endif
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
DC_DEBUG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
{
MutexAutoLock lock(mLock);
SetReadyState(CONNECTING);
}
if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr*>(&addr),
sizeof(struct sockaddr_in)) < 0) {
DC_ERROR(("***Failed userspace_bind"));
return false;
}
// Backlog of 1: only a single association is accepted below.
if (usrsctp_listen(mMasterSocket, 1) < 0) {
DC_ERROR(("***Failed userspace_listen"));
return false;
}
DC_DEBUG(("Accepting connection"));
// The peer address is not requested (nullptr address, zero length).
addr_len = 0;
if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) ==
nullptr) {
DC_ERROR(("***Failed accept"));
return false;
}
{
MutexAutoLock lock(mLock);
SetReadyState(OPEN);
}
// Same linger setting as on the master socket (on, zero timeout); here a
// failure is only warned about.
struct linger l;
l.l_onoff = 1;
l.l_linger = 0;
if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER, (const void*)&l,
(socklen_t)sizeof(struct linger)) < 0) {
DC_WARN(("Couldn't set SO_LINGER on SCTP socket"));
}
// Notify Connection open
// XXX We need to make sure connection sticks around until the message is
// delivered
DC_DEBUG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CONNECTION, this,
(DataChannel*)nullptr)));
return true;
}
// Act as the SCTP 'client': parse |addr| as an IPv6 literal first, then as an
// IPv4 literal, and connect the master socket to it on |port|. On success the
// master socket becomes mSocket, the connection is marked OPEN and
// ON_CONNECTION is dispatched.
// Returns false if the connect call fails or if |addr| is not a valid IPv4 or
// IPv6 literal; in that case the connection is never reported as open.
// Blocks! - Don't call this from main thread!
bool DataChannelConnection::Connect(const char* addr, unsigned short port) {
  struct sockaddr_in addr4;
  struct sockaddr_in6 addr6;
  NS_WARNING_ASSERTION(!NS_IsMainThread(),
                       "Blocks, do not call from main thread!!!");
  /* Acting as the connector */
  DC_DEBUG(("Connecting to %s, port %u", addr, port));
  memset((void*)&addr4, 0, sizeof(struct sockaddr_in));
  memset((void*)&addr6, 0, sizeof(struct sockaddr_in6));
# ifdef HAVE_SIN_LEN
  addr4.sin_len = sizeof(struct sockaddr_in);
# endif
# ifdef HAVE_SIN6_LEN
  addr6.sin6_len = sizeof(struct sockaddr_in6);
# endif
  addr4.sin_family = AF_INET;
  addr6.sin6_family = AF_INET6;
  addr4.sin_port = htons(port);
  addr6.sin6_port = htons(port);
  {
    MutexAutoLock lock(mLock);
    SetReadyState(CONNECTING);
  }
# if !defined(__Userspace_os_Windows)
  if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
    if (usrsctp_connect(mMasterSocket,
                        reinterpret_cast<struct sockaddr*>(&addr6),
                        sizeof(struct sockaddr_in6)) < 0) {
      DC_ERROR(("*** Failed userspace_connect"));
      return false;
    }
  } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
    if (usrsctp_connect(mMasterSocket,
                        reinterpret_cast<struct sockaddr*>(&addr4),
                        sizeof(struct sockaddr_in)) < 0) {
      DC_ERROR(("*** Failed userspace_connect"));
      return false;
    }
  } else {
    // Nothing was connected: do not fall through and report the connection
    // as open.
    DC_ERROR(("*** Illegal destination address."));
    return false;
  }
# else
  {
    struct sockaddr_storage ss;
    int sslen = sizeof(ss);
    if (!WSAStringToAddressA(const_cast<char*>(addr), AF_INET6, nullptr,
                             (struct sockaddr*)&ss, &sslen)) {
      addr6.sin6_addr =
          (reinterpret_cast<struct sockaddr_in6*>(&ss))->sin6_addr;
      if (usrsctp_connect(mMasterSocket,
                          reinterpret_cast<struct sockaddr*>(&addr6),
                          sizeof(struct sockaddr_in6)) < 0) {
        DC_ERROR(("*** Failed userspace_connect"));
        return false;
      }
    } else if (!WSAStringToAddressA(const_cast<char*>(addr), AF_INET, nullptr,
                                    (struct sockaddr*)&ss, &sslen)) {
      addr4.sin_addr = (reinterpret_cast<struct sockaddr_in*>(&ss))->sin_addr;
      if (usrsctp_connect(mMasterSocket,
                          reinterpret_cast<struct sockaddr*>(&addr4),
                          sizeof(struct sockaddr_in)) < 0) {
        DC_ERROR(("*** Failed userspace_connect"));
        return false;
      }
    } else {
      // Nothing was connected: do not fall through and report the connection
      // as open.
      DC_ERROR(("*** Illegal destination address."));
      return false;
    }
  }
# endif
  mSocket = mMasterSocket;
  DC_DEBUG(("connect() succeeded! Entering connected mode"));
  {
    MutexAutoLock lock(mLock);
    SetReadyState(OPEN);
  }
  // Notify Connection open
  // XXX We need to make sure connection sticks around until the message is
  // delivered
  DC_DEBUG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
  Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
      DataChannelOnMessageAvailable::ON_CONNECTION, this,
      (DataChannel*)nullptr)));
  return true;
}
#endif
// Looks up the channel registered in mChannels for |stream| and returns it as
// a raw pointer (null when the lookup yields nothing). The returned pointer
// carries no reference of its own.
DataChannel* DataChannelConnection::FindChannelByStream(uint16_t stream) {
  auto match = mChannels.Get(stream);
  return match.get();
}
// Returns the lowest stream id of this side's parity (even when
// *mAllocateEven is true, odd otherwise) below MAX_NUM_STREAMS that has no
// channel in mChannels and is not listed in mStreamsResetting. Returns
// INVALID_STREAM when no such id exists.
uint16_t DataChannelConnection::FindFreeStream() {
  ASSERT_WEBRTC(NS_IsMainThread());
  MOZ_ASSERT(mAllocateEven.isSome());

  uint16_t upperBound = MAX_NUM_STREAMS;
  uint16_t candidate = (*mAllocateEven ? 0 : 1);
  while (candidate < upperBound) {
    if (!mChannels.Get(candidate)) {
      // Verify it's not still in the process of closing
      bool stillResetting = false;
      for (size_t idx = 0; idx < mStreamsResetting.Length(); ++idx) {
        if (mStreamsResetting[idx] == candidate) {
          stillResetting = true;
          break;
        }
      }
      if (!stillResetting) {
        return candidate;
      }
    }
    candidate += 2;
  }
  return INVALID_STREAM;
}
uint32_t DataChannelConnection::UpdateCurrentStreamIndex() {
RefPtr<DataChannel> channel = mChannels.GetNextChannel(mCurrentStream);
if (!channel) {
mCurrentStream = 0;
} else {
mCurrentStream = channel->mStream;
}
return mCurrentStream;
}
// Returns mCurrentStream, first resetting it to 0 if mChannels no longer has
// a channel for that stream.
uint32_t DataChannelConnection::GetCurrentStreamIndex() {
  if (mChannels.Get(mCurrentStream)) {
    return mCurrentStream;
  }
  // The stream must have been removed, reset
  DC_DEBUG(("Reset mCurrentChannel"));
  mCurrentStream = 0;
  return mCurrentStream;
}
// Asks the SCTP stack to add up to |aNeeded| outgoing streams, clamped so that
// mNegotiatedIdLimit plus the request does not exceed MAX_NUM_STREAMS.
// Returns true if the request was issued (or the stack reports EALREADY),
// false if nothing can be requested or a socket call fails. The request itself
// does not block; per the note at the end, mNegotiatedIdLimit is only updated
// later when the SCTP_STREAM_CHANGE_EVENT arrives.
bool DataChannelConnection::RequestMoreStreams(int32_t aNeeded) {
  // Reject non-positive requests before the clamp below. That comparison mixes
  // aNeeded with mNegotiatedIdLimit (logged with %zu elsewhere in this file,
  // i.e. treated as size_t), so a negative aNeeded would be converted to a
  // huge unsigned value and could be "clamped" into a positive request.
  if (aNeeded <= 0) {
    return false;
  }
  if (aNeeded + mNegotiatedIdLimit > MAX_NUM_STREAMS) {
    aNeeded = MAX_NUM_STREAMS - mNegotiatedIdLimit;
  }
  // After clamping there may be no room left for additional streams.
  if (aNeeded <= 0) {
    return false;
  }

  // The status contents are not used further here; a failing query aborts the
  // request.
  struct sctp_status status;
  socklen_t len = (socklen_t)sizeof(struct sctp_status);
  if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status,
                         &len) < 0) {
    DC_ERROR(("***failed: getsockopt SCTP_STATUS"));
    return false;
  }

  uint32_t outStreamsNeeded = aNeeded;  // number to add
  // Note: if multiple channel opens happen when we don't have enough space,
  // we'll call RequestMoreStreams() multiple times
  struct sctp_add_streams sas;
  memset(&sas, 0, sizeof(sas));
  sas.sas_instrms = 0;
  sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
  // Doesn't block, we get an event when it succeeds or fails
  if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
                         (socklen_t)sizeof(struct sctp_add_streams)) < 0) {
    if (errno == EALREADY) {
      DC_DEBUG(("Already have %u output streams", outStreamsNeeded));
      return true;
    }
    DC_ERROR(("***failed: setsockopt ADD errno=%d", errno));
    return false;
  }
  DC_DEBUG(("Requested %u more streams", outStreamsNeeded));
  // We add to mNegotiatedIdLimit when we get a SCTP_STREAM_CHANGE_EVENT and the
  // values are larger than mNegotiatedIdLimit
  return true;
}
// Returns a POSIX error code.
int DataChannelConnection::SendControlMessage(const uint8_t* data, uint32_t len,
uint16_t stream) {
struct sctp_sendv_spa info = {0};
// General flags
info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
// Set stream identifier, protocol identifier and flags
info.sendv_sndinfo.snd_sid = stream;
info.sendv_sndinfo.snd_flags = SCTP_EOR;
info.sendv_sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
// Create message instance and send
// Note: Main-thread IO, but doesn't block
#if (UINT32_MAX > SIZE_MAX)
if (len > SIZE_MAX) {
return EMSGSIZE;
}
#endif
OutgoingMsg msg(info, data, (size_t)len);
bool buffered;
int error = SendMsgInternalOrBuffer(mBufferedControl, msg, buffered, nullptr);
// Set pending type (if buffered)
if (!error && buffered && !mPendingType) {
mPendingType = PENDING_DCEP;
}
return error;
}
// Returns a POSIX error code.
int DataChannelConnection::SendOpenAckMessage(uint16_t stream) {
struct rtcweb_datachannel_ack ack;
memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
ack.msg_type = DATA_CHANNEL_ACK;
return SendControlMessage((const uint8_t*)&ack, sizeof(ack), stream);
}
// Builds a DATA_CHANNEL_OPEN_REQUEST control message for |stream| carrying
// |label| and |protocol| (label bytes first, protocol bytes immediately after)
// and sends it via SendControlMessage.
//
// |prPolicy| selects the channel type (reliable / timed / rexmit); |unordered|
// sets the 0x80 bit on it; |prValue| is sent as the reliability parameter.
//
// Returns a POSIX error code: EINVAL for an unknown |prPolicy|, EMSGSIZE when
// |label| or |protocol| is too long to be described by the 16-bit length
// fields, otherwise whatever SendControlMessage returns.
int DataChannelConnection::SendOpenRequestMessage(
    const nsACString& label, const nsACString& protocol, uint16_t stream,
    bool unordered, uint16_t prPolicy, uint32_t prValue) {
  const size_t label_len = label.Length();     // not including nul
  const size_t proto_len = protocol.Length();  // not including nul
  // The lengths are written with htons(), i.e. as 16-bit values. Anything
  // larger would be silently truncated in the header while the full payload
  // is still copied, yielding a malformed message; refuse it instead.
  if (label_len > UINT16_MAX || proto_len > UINT16_MAX) {
    return EMSGSIZE;
  }
  // careful - request struct include one char for the label
  const size_t req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
                          label_len + proto_len;
  UniqueFreePtr<struct rtcweb_datachannel_open_request> req(
      (struct rtcweb_datachannel_open_request*)moz_xmalloc(req_size));
  memset(req.get(), 0, req_size);
  req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
  switch (prPolicy) {
    case SCTP_PR_SCTP_NONE:
      req->channel_type = DATA_CHANNEL_RELIABLE;
      break;
    case SCTP_PR_SCTP_TTL:
      req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
      break;
    case SCTP_PR_SCTP_RTX:
      req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
      break;
    default:
      // req is released by UniqueFreePtr on this early return.
      return EINVAL;
  }
  if (unordered) {
    // Per the current types, all differ by 0x80 between ordered and unordered
    req->channel_type |=
        0x80;  // NOTE: be careful if new types are added in the future
  }
  req->reliability_param = htonl(prValue);
  req->priority = htons(0); /* XXX: add support */
  // Safe narrowing: both lengths were checked against UINT16_MAX above.
  req->label_length = htons(static_cast<uint16_t>(label_len));
  req->protocol_length = htons(static_cast<uint16_t>(proto_len));
  memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
  memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
  // req_size is bounded by the struct size plus two 16-bit lengths, so it
  // always fits in the uint32_t that SendControlMessage takes.
  return SendControlMessage((const uint8_t*)req.get(),
                            static_cast<uint32_t>(req_size), stream);
}
// Tries to flush messages that were buffered earlier: buffered control
// messages first (when applicable), then each channel's buffered data, walking
// the channels starting at the current stream index until the walk wraps back
// to its starting index or a send blocks.
// Must be called off the main thread with mLock held (both are asserted
// below).
//
// XXX This should use a separate thread (outbound queue) which should
// select() to know when to *try* to send data to the socket again.
// Alternatively, it can use a timeout, but that's guaranteed to be wrong
// (just not sure in what direction). We could re-implement NSPR's
// PR_POLL_WRITE/etc handling... with a lot of work.
// Better yet, use the SCTP stack's notifications on buffer state to avoid
// filling the SCTP's buffers.
// returns if we're still blocked (true)
bool DataChannelConnection::SendDeferredMessages() {
RefPtr<DataChannel> channel; // we may null out the refs to this
// This may block while something is modifying channels, but should not block
// for IO
ASSERT_WEBRTC(!NS_IsMainThread());
mLock.AssertCurrentThreadOwns();
DC_DEBUG(("SendDeferredMessages called, pending type: %d", mPendingType));
// Nothing is pending: report "not blocked".
if (!mPendingType) {
return false;
}
// Send pending control messages
// Note: If ndata is not active, check if DCEP messages are currently
// outstanding. These need to
// be sent first before other streams can be used for sending.
if (!mBufferedControl.IsEmpty() &&
(mSendInterleaved || mPendingType == PENDING_DCEP)) {
// Still blocked on control messages: give up for now.
if (SendBufferedMessages(mBufferedControl, nullptr)) {
return true;
}
// Note: There may or may not be pending data messages
mPendingType = PENDING_DATA;
}
bool blocked = false;
// Remember where the walk starts so the loop can stop after one full pass.
uint32_t i = GetCurrentStreamIndex();
uint32_t end = i;
do {
channel = mChannels.Get(i);
// Should already be cleared if closing/closed
if (!channel || channel->mBufferedData.IsEmpty()) {
// Nothing to send here; move on. Note that `continue` in a do/while jumps
// to the loop condition, so the wrap-around check still runs.
i = UpdateCurrentStreamIndex();
continue;
}
// Send buffered data messages
// Warning: This will fail in case ndata is inactive and a previously
// deallocated data channel has not been closed properly. If you
// ever see that no messages can be sent on any channel, this is
// likely the cause (an explicit EOR message partially sent whose
// remaining chunks are still being waited for).
size_t written = 0;
// mDeferSend is raised only for the duration of this send. The packet-output
// code earlier in this file queues packets into mDeferredSend when this flag
// is set (and it runs on the main thread); any queued packets are flushed
// right below.
mDeferSend = true;
blocked = SendBufferedMessages(channel->mBufferedData, &written);
mDeferSend = false;
if (written) {
channel->DecrementBufferedAmount(written);
}
// Flush whatever was queued while mDeferSend was set; queued packets are
// only expected when something was actually written.
for (auto&& packet : mDeferredSend) {
MOZ_ASSERT(written);
SendPacket(std::move(packet));
}
mDeferredSend.clear();
// Update current stream index
// Note: If ndata is not active, the outstanding data messages on this
// stream need to be sent first before other streams can be used for
// sending.
if (mSendInterleaved || !blocked) {
i = UpdateCurrentStreamIndex();
}
} while (!blocked && i != end);
// Everything that could be sent was sent: the remaining pending type depends
// only on whether control messages are still buffered.
if (!blocked) {
mPendingType = mBufferedControl.IsEmpty() ? PENDING_NONE : PENDING_DCEP;
}
return blocked;
}
// Re-sends the messages in |buffer| front to back via SendMsgInternal,
// removing each one once it was sent or failed with a hard error (which is
// logged). Stops as soon as a send reports EAGAIN/EWOULDBLOCK, leaving that
// message at the front of |buffer|.
// Called with mLock locked!
// buffer MUST have at least one item!
// returns if we're still blocked (true)
bool DataChannelConnection::SendBufferedMessages(
    nsTArray<UniquePtr<BufferedOutgoingMsg>>& buffer, size_t* aWritten) {
  for (;;) {
    // Re-send message
    const int rv = SendMsgInternal(*buffer[0], aWritten);

    bool wouldBlock = (rv == EAGAIN);
#if (EAGAIN != EWOULDBLOCK)
    wouldBlock = wouldBlock || (rv == EWOULDBLOCK);
#endif
    if (wouldBlock) {
      return true;
    }

    // Sent, or failed for good: either way the message leaves the queue.
    buffer.RemoveElementAt(0);
    if (rv != 0) {
      DC_ERROR(("error on sending: %d", rv));
    }

    if (buffer.IsEmpty()) {
      return false;
    }
  }
}
// Caller must ensure that length <= SIZE_MAX
void DataChannelConnection::HandleOpenRequestMessage(
const struct rtcweb_datachannel_open_request* req, uint32_t length,
uint16_t stream) {
RefPtr<DataChannel> channel;
uint32_t prValue;
uint16_t prPolicy;
ASSERT_WEBRTC(!NS_IsMainThread());
mLock.AssertCurrentThreadOwns();
const size_t requiredLength = (sizeof(*req) - 1) + ntohs(req->label_length) +
ntohs(req->protocol_length);
if (((size_t)length) != requiredLength) {
if (((size_t)length) < requiredLength) {
DC_ERROR(
("%s: insufficient length: %u, should be %zu. Unable to continue.",
__FUNCTION__, length, requiredLength));
return;
}
DC_WARN(("%s: Inconsistent length: %u, should be %zu", __FUNCTION__, length,
requiredLength));
}
DC_DEBUG(("%s: length %u, sizeof(*req) = %zu", __FUNCTION__, length,
sizeof(*req)));
switch (req->channel_type) {
case DATA_CHANNEL_RELIABLE:
case DATA_CHANNEL_RELIABLE_UNORDERED:
prPolicy = SCTP_PR_SCTP_NONE;
break;
case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
prPolicy = SCTP_PR_SCTP_RTX;
break;
case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
prPolicy = SCTP_PR_SCTP_TTL;
break;
default:
DC_ERROR(("Unknown channel type %d", req->channel_type));
/* XXX error handling */
return;
}
prValue = ntohl(req->reliability_param);
bool ordered = !(req->channel_type & 0x80);
if ((channel = FindChannelByStream(stream))) {
if (!channel->mNegotiated) {
DC_ERROR(
("HandleOpenRequestMessage: channel for pre-existing stream "
"%u that was not externally negotiated. JS is lying to us, or "
"there's an id collision.",
stream));
/* XXX: some error handling */
} else {
DC_DEBUG(("Open for externally negotiated channel %u", stream));
// XXX should also check protocol, maybe label
if (prPolicy != channel->mPrPolicy || prValue != channel->mPrValue ||
ordered != channel->mOrdered) {
DC_WARN(
("external negotiation mismatch with OpenRequest:"
"channel %u, policy %u/%u, value %u/%u, ordered %d/%d",
stream, prPolicy, channel->mPrPolicy, prValue, channel->mPrValue,
static_cast<int>(ordered), static_cast<int>(channel->mOrdered)));
}
}
return;
}
if (stream >= mNegotiatedIdLimit) {
DC_ERROR(("%s: stream %u out of bounds (%zu)", __FUNCTION__, stream,
mNegotiatedIdLimit));
return;
}
nsCString label(
nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
nsCString protocol(nsDependentCSubstring(
&req->label[ntohs(req->label_length)], ntohs(req->protocol_length)));
channel =
new DataChannel(this, stream, DataChannel::OPEN, label, protocol,
prPolicy, prValue, ordered, false, nullptr, nullptr);
mChannels.Insert(channel);
DC_DEBUG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u", __FUNCTION__,
channel->mLabel.get(), channel->mProtocol.get(), stream));
Dispatch(do_AddRef(