Source code
Revision control
Copy as Markdown
Other Tools
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "Http3WebTransportStream.h"
#include "HttpLog.h"
#include "Http3Session.h"
#include "Http3WebTransportSession.h"
#include "mozilla/TimeStamp.h"
#include "nsHttpHandler.h"
#include "nsIOService.h"
#include "nsIPipe.h"
#include "nsSocketTransportService2.h"
#include "nsIWebTransportStream.h"
namespace mozilla::net {
namespace {
// This is an nsAHttpTransaction that does nothing.
//
// Both Http3WebTransportStream constructors in this file hand a fresh instance
// of this class to Http3StreamBase. Every override below is either an empty
// body or returns a fixed neutral value (nullptr, 0, false or NS_OK).
class DummyWebTransportStreamTransaction : public nsAHttpTransaction {
public:
NS_DECL_THREADSAFE_ISUPPORTS
DummyWebTransportStreamTransaction() = default;
// The connection passed in is ignored; no connection is ever reported back.
void SetConnection(nsAHttpConnection*) override {}
nsAHttpConnection* Connection() override { return nullptr; }
// Note: the out-parameter is left untouched.
void GetSecurityCallbacks(nsIInterfaceRequestor**) override {}
// Transport status notifications are dropped.
void OnTransportStatus(nsITransport* transport, nsresult status,
int64_t progress) override {}
// Always reports "not done" with an NS_OK status and no capability bits.
bool IsDone() override { return false; }
nsresult Status() override { return NS_OK; }
uint32_t Caps() override { return 0; }
// Reports success without touching the reader or the count out-parameter.
[[nodiscard]] nsresult ReadSegments(nsAHttpSegmentReader*, uint32_t,
uint32_t*) override {
return NS_OK;
}
// Reports success without touching the writer or the count out-parameter.
[[nodiscard]] nsresult WriteSegments(nsAHttpSegmentWriter*, uint32_t,
uint32_t*) override {
return NS_OK;
}
// Closing is a no-op; the reason is ignored.
void Close(nsresult reason) override {}
nsHttpConnectionInfo* ConnectionInfo() override { return nullptr; }
void SetProxyConnectFailed() override {}
nsHttpRequestHead* RequestHead() override { return nullptr; }
uint32_t Http1xTransactionCount() override { return 0; }
// Reports success and leaves outTransactions unmodified.
[[nodiscard]] nsresult TakeSubTransactions(
nsTArray<RefPtr<nsAHttpTransaction>>& outTransactions) override {
return NS_OK;
}
private:
// Private destructor: instances are expected to be destroyed through the
// reference counting declared by NS_DECL_THREADSAFE_ISUPPORTS.
virtual ~DummyWebTransportStreamTransaction() = default;
};
// NOTE(review): the only interface listed is nsISupportsWeakReference, which
// presumably comes from the nsAHttpTransaction base class; confirm there.
NS_IMPL_ISUPPORTS(DummyWebTransportStreamTransaction, nsISupportsWeakReference)
class WebTransportSendStreamStats : public nsIWebTransportSendStreamStats {
public:
NS_DECL_THREADSAFE_ISUPPORTS
explicit WebTransportSendStreamStats(uint64_t aSent, uint64_t aAcked)
: mTimeStamp(TimeStamp::Now()),
mTotalSent(aSent),
mTotalAcknowledged(aAcked) {}
NS_IMETHOD GetTimestamp(mozilla::TimeStamp* aTimestamp) override {
*aTimestamp = mTimeStamp;
return NS_OK;
}
NS_IMETHOD GetBytesSent(uint64_t* aBytesSent) override {
*aBytesSent = mTotalSent;
return NS_OK;
}
NS_IMETHOD GetBytesAcknowledged(uint64_t* aBytesAcknowledged) override {
*aBytesAcknowledged = mTotalAcknowledged;
return NS_OK;
}
private:
virtual ~WebTransportSendStreamStats() = default;
TimeStamp mTimeStamp;
uint64_t mTotalSent;
uint64_t mTotalAcknowledged;
};
NS_IMPL_ISUPPORTS(WebTransportSendStreamStats, nsIWebTransportSendStreamStats)
class WebTransportReceiveStreamStats
: public nsIWebTransportReceiveStreamStats {
public:
NS_DECL_THREADSAFE_ISUPPORTS
explicit WebTransportReceiveStreamStats(uint64_t aReceived)
: mTimeStamp(TimeStamp::Now()), mTotalReceived(aReceived) {}
NS_IMETHOD GetTimestamp(mozilla::TimeStamp* aTimestamp) override {
*aTimestamp = mTimeStamp;
return NS_OK;
}
NS_IMETHOD GetBytesReceived(uint64_t* aByteReceived) override {
*aByteReceived = mTotalReceived;
return NS_OK;
}
private:
virtual ~WebTransportReceiveStreamStats() = default;
TimeStamp mTimeStamp;
uint64_t mTotalReceived;
};
NS_IMPL_ISUPPORTS(WebTransportReceiveStreamStats,
nsIWebTransportReceiveStreamStats)
} // namespace
// Http3WebTransportStream exposes nsIInputStreamCallback; see
// OnInputStreamReady() below for the implementation.
NS_IMPL_ISUPPORTS(Http3WebTransportStream, nsIInputStreamCallback)
// Constructor for a stream with the OUTGOING role.
//
// aSession   - session forwarded to Http3StreamBase.
// aSessionId - stored in mSessionId.
// aType      - stored in mStreamType.
// aCallback  - stored in mStreamReadyCallback; OnReadSegment() invokes it with
//              either this stream (activation succeeded) or an error code.
//
// mSendState is not set here, so it keeps whatever default the class
// declaration gives it (not visible in this file).
Http3WebTransportStream::Http3WebTransportStream(
Http3Session* aSession, uint64_t aSessionId, WebTransportStreamType aType,
std::function<void(Result<RefPtr<Http3WebTransportStream>, nsresult>&&)>&&
aCallback)
: Http3StreamBase(new DummyWebTransportStreamTransaction(), aSession),
mSessionId(aSessionId),
mStreamType(aType),
mStreamRole(OUTGOING),
mStreamReadyCallback(std::move(aCallback)) {
LOG(("Http3WebTransportStream outgoing ctor %p", this));
}
// Constructor for a stream with the INCOMING role.
//
// Unlike the outgoing constructor, the stream id is already known and is
// recorded immediately, and no "stream ready" callback is installed.
Http3WebTransportStream::Http3WebTransportStream(Http3Session* aSession,
                                                 uint64_t aSessionId,
                                                 WebTransportStreamType aType,
                                                 uint64_t aStreamId)
    : Http3StreamBase(new DummyWebTransportStreamTransaction(), aSession),
      mSessionId(aSessionId),
      mStreamType(aType),
      mStreamRole(INCOMING),
      // Begin in WAITING_DATA, i.e. the send side stays idle until
      // Http3WebTransportStream::OnInputStreamReady is invoked.
      mSendState(WAITING_DATA),
      mStreamReadyCallback(nullptr) {
  mStreamId = aStreamId;
  LOG(("Http3WebTransportStream incoming ctor %p", this));
}
// Destructor: only logs; no explicit cleanup is performed here (see Close()).
Http3WebTransportStream::~Http3WebTransportStream() {
LOG(("Http3WebTransportStream dtor %p", this));
}
// Asks the session to activate this stream. The session receives a pointer to
// mStreamId so it can fill in the id, and its result is returned unchanged.
// Must run on the socket thread.
nsresult Http3WebTransportStream::TryActivating() {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  const nsresult activationResult =
      mSession->TryActivatingWebTransportStream(&mStreamId, this);
  return activationResult;
}
// nsIInputStreamCallback implementation, registered through AsyncWait() on the
// send pipe's input end. Unless sending has already finished, switches the
// send state to SENDING and tells the session this stream has data to write.
// Always returns NS_OK.
NS_IMETHODIMP Http3WebTransportStream::OnInputStreamReady(
    nsIAsyncInputStream* aStream) {
  LOG1(
      ("Http3WebTransportStream::OnInputStreamReady [this=%p stream=%p "
       "state=%d]",
       this, aStream, mSendState));

  // Once SEND_DONE is reached the stream is closed for sending; ignore the
  // notification in that case.
  if (mSendState != SEND_DONE) {
    mSendState = SENDING;
    mSession->StreamHasDataToWrite(this);
  }
  return NS_OK;
}
// Creates the pipe used for outgoing data and starts waiting for data on its
// input end.
//
// The two pipe ends are published to mSendStreamPipeIn / mSendStreamPipeOut
// under mMutex. On success the send state becomes WAITING_DATA and NS_OK is
// returned; if AsyncWait() fails its error is returned and the send state is
// left untouched.
nsresult Http3WebTransportStream::InitOutputPipe() {
  nsCOMPtr<nsIAsyncInputStream> pipeReader;
  nsCOMPtr<nsIAsyncOutputStream> pipeWriter;
  NS_NewPipe2(getter_AddRefs(pipeReader), getter_AddRefs(pipeWriter), true,
              true, nsIOService::gDefaultSegmentSize,
              nsIOService::gDefaultSegmentCount);

  {
    MutexAutoLock lock(mMutex);
    mSendStreamPipeIn = std::move(pipeReader);
    mSendStreamPipeOut = std::move(pipeWriter);
  }

  // Get notified (OnInputStreamReady) on the socket transport service when
  // the pipe has data to read.
  if (nsresult waitResult =
          mSendStreamPipeIn->AsyncWait(this, 0, 0, gSocketTransportService);
      NS_FAILED(waitResult)) {
    return waitResult;
  }

  mSendState = WAITING_DATA;
  return NS_OK;
}
// Creates the pipe used for incoming data.
//
// The two pipe ends are published to mReceiveStreamPipeIn /
// mReceiveStreamPipeOut under mMutex, after which the receive state becomes
// READING. Always returns NS_OK.
nsresult Http3WebTransportStream::InitInputPipe() {
  nsCOMPtr<nsIAsyncInputStream> pipeReader;
  nsCOMPtr<nsIAsyncOutputStream> pipeWriter;
  NS_NewPipe2(getter_AddRefs(pipeReader), getter_AddRefs(pipeWriter), true,
              true, nsIOService::gDefaultSegmentSize,
              nsIOService::gDefaultSegmentCount);

  {
    MutexAutoLock lock(mMutex);
    mReceiveStreamPipeIn = std::move(pipeReader);
    mReceiveStreamPipeOut = std::move(pipeWriter);
  }

  mRecvState = READING;
  return NS_OK;
}
// Hands out the caller-facing ends of the two pipes: the output end of the
// send pipe and the input end of the receive pipe.
//
// Both members are copied while holding mMutex and the references are
// transferred to the out-parameters after the lock is released. An end that
// has not been created yields a null pointer.
void Http3WebTransportStream::GetWriterAndReader(
    nsIAsyncOutputStream** aOutOutputStream,
    nsIAsyncInputStream** aOutInputStream) {
  nsCOMPtr<nsIAsyncInputStream> reader;
  nsCOMPtr<nsIAsyncOutputStream> writer;

  {
    MutexAutoLock lock(mMutex);
    writer = mSendStreamPipeOut;
    reader = mReceiveStreamPipeIn;
  }

  writer.forget(aOutOutputStream);
  reader.forget(aOutInputStream);
}
// Returns a new stats object capturing the current mTotalSent and
// mTotalAcknowledged counters. Must run on the socket thread.
already_AddRefed<nsIWebTransportSendStreamStats>
Http3WebTransportStream::GetSendStreamStats() {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  RefPtr<nsIWebTransportSendStreamStats> snapshot =
      new WebTransportSendStreamStats(mTotalSent, mTotalAcknowledged);
  return snapshot.forget();
}
// Returns a new stats object capturing the current mTotalReceived counter.
// Must run on the socket thread.
already_AddRefed<nsIWebTransportReceiveStreamStats>
Http3WebTransportStream::GetReceiveStreamStats() {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  RefPtr<nsIWebTransportReceiveStreamStats> snapshot =
      new WebTransportReceiveStreamStats(mTotalReceived);
  return snapshot.forget();
}
// Handles one segment of outgoing data according to the current send state.
//
// buf/count  - the data offered for sending (ReadSegments() passes an empty
//              buffer while the stream is still WAITING_TO_ACTIVATE).
// countRead  - receives the number of bytes consumed; only written in the
//              SENDING state (by the session).
//
// Behaviour per state:
//   WAITING_TO_ACTIVATE - tries to activate the stream; on success creates the
//                         output pipe (and the input pipe for BiDi streams)
//                         and reports the stream through the ready callback.
//                         Failures are reported through the same callback.
//   SENDING             - forwards the data to the session and adds the
//                         consumed byte count to mTotalSent.
//   WAITING_DATA        - nothing to do.
//   SEND_DONE           - unexpected; returns NS_ERROR_UNEXPECTED.
//
// The result is also remembered in mSocketOutCondition. Must run on the socket
// thread.
nsresult Http3WebTransportStream::OnReadSegment(const char* buf, uint32_t count,
                                                uint32_t* countRead) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  LOG(("Http3WebTransportStream::OnReadSegment count=%u state=%d [this=%p]",
       count, mSendState, this));

  // Delivers |aOutcome| to the "stream ready" callback and clears it so it is
  // used at most once. The callback is empty for incoming streams and after
  // its first use, so it must be checked before it is invoked: calling an
  // empty std::function is a fatal error.
  auto notifyStreamReady = [this](auto&& aOutcome) {
    if (mStreamReadyCallback) {
      mStreamReadyCallback(std::forward<decltype(aOutcome)>(aOutcome));
      mStreamReadyCallback = nullptr;
    }
  };

  nsresult rv = NS_OK;
  switch (mSendState) {
    case WAITING_TO_ACTIVATE:
      rv = TryActivating();
      if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
        // Not an error: the stream stays in WAITING_TO_ACTIVATE and the
        // callback is kept for a later attempt.
        LOG3(
            ("Http3WebTransportStream::OnReadSegment %p cannot activate now. "
             "queued.\n",
             this));
        break;
      }
      if (NS_FAILED(rv)) {
        LOG3(
            ("Http3WebTransportStream::OnReadSegment %p cannot activate "
             "error=0x%" PRIx32 ".",
             this, static_cast<uint32_t>(rv)));
        notifyStreamReady(Err(rv));
        break;
      }

      // Activated: set up the pipe(s). Only bidirectional streams get an
      // input pipe here.
      rv = InitOutputPipe();
      if (NS_SUCCEEDED(rv) && mStreamType == WebTransportStreamType::BiDi) {
        rv = InitInputPipe();
      }
      if (NS_FAILED(rv)) {
        LOG3(
            ("Http3WebTransportStream::OnReadSegment %p failed to create pipe "
             "error=0x%" PRIx32 ".",
             this, static_cast<uint32_t>(rv)));
        mSendState = SEND_DONE;
        notifyStreamReady(Err(rv));
        break;
      }

      // Successfully activated.
      notifyStreamReady(RefPtr{this});
      break;

    case SENDING: {
      rv = mSession->SendRequestBody(mStreamId, buf, count, countRead);
      LOG3(
          ("Http3WebTransportStream::OnReadSegment %p sending body returns "
           "error=0x%" PRIx32 ".",
           this, static_cast<uint32_t>(rv)));
      mTotalSent += *countRead;
    } break;

    case WAITING_DATA:
      // Still waiting
      LOG3((
          "Http3WebTransportStream::OnReadSegment %p Still waiting for data...",
          this));
      break;

    case SEND_DONE:
      LOG3(("Http3WebTransportStream::OnReadSegment %p called after SEND_DONE ",
            this));
      MOZ_ASSERT(false, "We are done sending this request!");
      rv = NS_ERROR_UNEXPECTED;
      // The callback has normally been consumed by the time SEND_DONE is
      // reached; it is only notified if it is still pending.
      notifyStreamReady(Err(rv));
      break;
  }

  mSocketOutCondition = rv;
  return mSocketOutCondition;
}
// static
// Segment-reader trampoline handed to the send pipe's ReadSegments() (see
// Http3WebTransportStream::ReadSegments). |closure| is the
// Http3WebTransportStream that registered it; the segment is forwarded to its
// OnReadSegment() and that result is returned. |stream| and |offset| are
// unused.
nsresult Http3WebTransportStream::ReadRequestSegment(
    nsIInputStream* stream, void* closure, const char* buf, uint32_t offset,
    uint32_t count, uint32_t* countRead) {
  // A named cast documents the void* -> object conversion and cannot silently
  // strip qualifiers or reinterpret unrelated types the way a C-style cast
  // can.
  auto* wtStream = static_cast<Http3WebTransportStream*>(closure);
  nsresult rv = wtStream->OnReadSegment(buf, count, countRead);
  LOG(("Http3WebTransportStream::ReadRequestSegment %p read=%u", wtStream,
       *countRead));
  return rv;
}
// Drives the send side of the stream: repeatedly pulls data out of the send
// pipe (through ReadRequestSegment -> OnReadSegment) until an error occurs,
// the pipe would block, nothing more was read, or the HTTP handler is no
// longer active. Must run on the socket thread.
//
// Returns NS_OK when the stream is already SEND_DONE; otherwise the result of
// the last loop iteration after the error mapping below.
nsresult Http3WebTransportStream::ReadSegments() {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG(("Http3WebTransportStream::ReadSegments [this=%p]", this));
nsresult rv = NS_OK;
uint32_t sendBytes = 0;
bool again = true;
do {
// Reset the per-iteration results; OnReadSegment() records its outcome in
// mSocketOutCondition.
sendBytes = 0;
rv = mSocketOutCondition = NS_OK;
LOG(("Http3WebTransportStream::ReadSegments state=%d [this=%p]", mSendState,
this));
switch (mSendState) {
case WAITING_TO_ACTIVATE: {
// Not activated yet: call OnReadSegment() with an empty buffer so it can
// attempt the activation.
LOG3(
("Http3WebTransportStream %p ReadSegments forcing OnReadSegment "
"call\n",
this));
uint32_t wasted = 0;
nsresult rv2 = OnReadSegment("", 0, &wasted);
LOG3((" OnReadSegment returned 0x%08" PRIx32,
static_cast<uint32_t>(rv2)));
// Only continue into the sending path when activation moved the stream to
// WAITING_DATA (set by InitOutputPipe()). Note that rv2 is only logged;
// the outcome is available through mSocketOutCondition.
if (mSendState != WAITING_DATA) {
break;
}
}
[[fallthrough]];
case WAITING_DATA:
[[fallthrough]];
case SENDING: {
// An incoming unidirectional stream is skipped here: nothing is read from
// a send pipe for it.
if (mStreamRole == INCOMING &&
mStreamType == WebTransportStreamType::UniDi) {
rv = NS_OK;
break;
}
mSendState = SENDING;
rv = mSendStreamPipeIn->ReadSegments(ReadRequestSegment, this,
nsIOService::gDefaultSegmentSize,
&sendBytes);
} break;
case SEND_DONE: {
return NS_OK;
}
default:
sendBytes = 0;
rv = NS_OK;
break;
}
LOG(("Http3WebTransportStream::ReadSegments rv=0x%" PRIx32
" read=%u sock-cond=%" PRIx32 " again=%d mSendFin=%d [this=%p]",
static_cast<uint32_t>(rv), sendBytes,
static_cast<uint32_t>(mSocketOutCondition), again, mSendFin, this));
// XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
// Queued tasks (see SendFin()/Reset()) are handled like EOF as well: the
// iteration is treated as a successful read of zero bytes.
if (rv == NS_BASE_STREAM_CLOSED || !mPendingTasks.IsEmpty()) {
rv = NS_OK;
sendBytes = 0;
}
if (NS_FAILED(rv)) {
// if the writer didn't want to write any more data, then
// wait for the transaction to call ResumeSend.
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
// Go back to waiting; rv now carries the result of AsyncWait().
mSendState = WAITING_DATA;
rv = mSendStreamPipeIn->AsyncWait(this, 0, 0, gSocketTransportService);
}
again = false;
// Got a WebTransport specific error
// It is translated into a stream reset and not reported to the caller.
if (rv >= NS_ERROR_WEBTRANSPORT_CODE_BASE &&
rv <= NS_ERROR_WEBTRANSPORT_CODE_END) {
uint8_t errorCode = GetWebTransportErrorFromNSResult(rv);
mSendState = SEND_DONE;
Reset(WebTransportErrorToHttp3Error(errorCode));
rv = NS_OK;
}
} else if (NS_FAILED(mSocketOutCondition)) {
// The pipe read succeeded but OnReadSegment() recorded a failure.
// WOULD_BLOCK only stops the loop; any other error is returned.
if (mSocketOutCondition != NS_BASE_STREAM_WOULD_BLOCK) {
rv = mSocketOutCondition;
}
again = false;
} else if (!sendBytes) {
// Nothing was read: sending is finished.
mSendState = SEND_DONE;
rv = NS_OK;
again = false;
if (!mPendingTasks.IsEmpty()) {
LOG(("Has pending tasks to do"));
// Move the list out first so mPendingTasks is empty while the tasks run.
nsTArray<std::function<void()>> tasks = std::move(mPendingTasks);
for (const auto& task : tasks) {
task();
}
}
// Tell the underlying stream we're done
SendFin();
}
// write more to the socket until error or end-of-request...
} while (again && gHttpHandler->Active());
return rv;
}
// Fills |buf| (at most |count| bytes) with data received for this stream and
// advances the receive state machine.
//
//   READING      - reads from the session. Zero bytes with FIN ends the stream
//                  (RECV_DONE, NS_BASE_STREAM_CLOSED); zero bytes without FIN
//                  yields NS_BASE_STREAM_WOULD_BLOCK; otherwise the byte count
//                  is added to mTotalReceived and a FIN moves the state to
//                  RECEIVED_FIN.
//   RECEIVED_FIN - reports NS_BASE_STREAM_CLOSED and moves to RECV_DONE.
//   anything else (including RECV_DONE) - NS_ERROR_UNEXPECTED.
//
// The result is also stored in mSocketInCondition. Must run on the socket
// thread.
nsresult Http3WebTransportStream::OnWriteSegment(char* buf, uint32_t count,
                                                 uint32_t* countWritten) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  LOG(("Http3WebTransportStream::OnWriteSegment [this=%p, state=%d", this,
       static_cast<uint32_t>(mRecvState)));

  nsresult status = NS_OK;

  if (mRecvState == READING) {
    status = mSession->ReadResponseData(mStreamId, buf, count, countWritten,
                                        &mFin);
    if (*countWritten != 0) {
      mTotalReceived += *countWritten;
      if (mFin) {
        mRecvState = RECEIVED_FIN;
      }
    } else if (mFin) {
      mRecvState = RECV_DONE;
      status = NS_BASE_STREAM_CLOSED;
    } else {
      status = NS_BASE_STREAM_WOULD_BLOCK;
    }
  } else if (mRecvState == RECEIVED_FIN) {
    status = NS_BASE_STREAM_CLOSED;
    mRecvState = RECV_DONE;
  } else {
    // RECV_DONE and any other state: no more data may be requested.
    status = NS_ERROR_UNEXPECTED;
  }

  // Keep a copy of the lower-layer result because the stream pipe may replace
  // the value it hands back to WriteSegments(). Storing NS_OK clears any
  // earlier condition.
  mSocketInCondition = status;
  return status;
}
// static
// Segment-writer trampoline handed to the receive pipe's WriteSegments() (see
// Http3WebTransportStream::WriteSegments). |closure| is the
// Http3WebTransportStream that registered it; the request is forwarded to its
// OnWriteSegment() and that result is returned. The written byte count is
// logged only on success. |stream| and |offset| are unused.
nsresult Http3WebTransportStream::WritePipeSegment(nsIOutputStream* stream,
                                                   void* closure, char* buf,
                                                   uint32_t offset,
                                                   uint32_t count,
                                                   uint32_t* countWritten) {
  // A named cast documents the void* -> object conversion and cannot silently
  // strip qualifiers or reinterpret unrelated types the way a C-style cast
  // can.
  auto* self = static_cast<Http3WebTransportStream*>(closure);
  nsresult rv = self->OnWriteSegment(buf, count, countWritten);
  if (NS_FAILED(rv)) {
    return rv;
  }
  LOG(("Http3WebTransportStream::WritePipeSegment %p written=%u", self,
       *countWritten));
  return rv;
}
// Drives the receive side of the stream: repeatedly lets the receive pipe pull
// data (through WritePipeSegment -> OnWriteSegment) until an error occurs, no
// more data is available, or the HTTP handler is no longer active. Must run on
// the socket thread.
//
// Returns NS_OK when there is no receive pipe, when the loop stopped on
// NS_BASE_STREAM_WOULD_BLOCK, or when it stopped on NS_BASE_STREAM_CLOSED (in
// which case the pipe's output end is closed). Any other failure is returned.
nsresult Http3WebTransportStream::WriteSegments() {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
// Without a receive pipe there is nowhere to put incoming data.
if (!mReceiveStreamPipeOut) {
return NS_OK;
}
LOG(("Http3WebTransportStream::WriteSegments [this=%p]", this));
nsresult rv = NS_OK;
uint32_t countWrittenSingle = 0;
bool again = true;
do {
// Reset the per-iteration results; OnWriteSegment() records its outcome in
// mSocketInCondition.
mSocketInCondition = NS_OK;
countWrittenSingle = 0;
rv = mReceiveStreamPipeOut->WriteSegments(WritePipeSegment, this,
nsIOService::gDefaultSegmentSize,
&countWrittenSingle);
LOG(("Http3WebTransportStream::WriteSegments rv=0x%" PRIx32
" countWrittenSingle=%" PRIu32 " socketin=%" PRIx32 " [this=%p]",
static_cast<uint32_t>(rv), countWrittenSingle,
static_cast<uint32_t>(mSocketInCondition), this));
if (NS_FAILED(rv)) {
// The pipe itself reported a failure. WOULD_BLOCK and CLOSED are not
// errors for the caller; CLOSED additionally closes the pipe's output end.
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
rv = NS_OK;
}
if (rv == NS_BASE_STREAM_CLOSED) {
mReceiveStreamPipeOut->Close();
rv = NS_OK;
}
again = false;
} else if (NS_FAILED(mSocketInCondition)) {
// The pipe call succeeded but OnWriteSegment() recorded a failure. Apply
// the same mapping: WOULD_BLOCK just stops the loop, CLOSED closes the
// pipe, anything else is returned.
if (mSocketInCondition != NS_BASE_STREAM_WOULD_BLOCK) {
rv = mSocketInCondition;
if (rv == NS_BASE_STREAM_CLOSED) {
mReceiveStreamPipeOut->Close();
rv = NS_OK;
}
}
again = false;
}
// read more from the socket until error...
} while (again && gHttpHandler->Active());
return rv;
}
// Reports whether the stream is finished in both directions, i.e. the send
// state is SEND_DONE and the receive state is RECV_DONE.
bool Http3WebTransportStream::Done() const {
  const bool sendFinished = (mSendState == SEND_DONE);
  const bool recvFinished = (mRecvState == RECV_DONE);
  return sendFinished && recvFinished;
}
// Shuts the stream down: drops the transaction, closes whichever pipe ends
// exist with aResult as their status, marks both directions as done and
// releases the session reference. Must run on the socket thread.
//
// aResult - status passed to CloseWithStatus() on both pipe ends.
void Http3WebTransportStream::Close(nsresult aResult) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG(("Http3WebTransportStream::Close [this=%p]", this));
mTransaction = nullptr;
if (mSendStreamPipeIn) {
// AsyncWait is called with a null callback before closing (presumably to
// clear the wait registered in InitOutputPipe()/ReadSegments(); confirm
// against the nsIAsyncInputStream documentation).
mSendStreamPipeIn->AsyncWait(nullptr, 0, 0, nullptr);
mSendStreamPipeIn->CloseWithStatus(aResult);
}
if (mReceiveStreamPipeOut) {
mReceiveStreamPipeOut->AsyncWait(nullptr, 0, 0, nullptr);
mReceiveStreamPipeOut->CloseWithStatus(aResult);
}
mSendState = SEND_DONE;
mRecvState = RECV_DONE;
// After this point mSession is null; SendFin() and Reset() check for that and
// return early.
mSession = nullptr;
}
// Closes the sending side of the stream. Must run on the socket thread.
//
// Does nothing if a FIN was already requested, the stream has been reset, or
// the session is gone. Otherwise mSendFin is set and, depending on the send
// state, the sending side is closed right away or the close is queued in
// mPendingTasks.
void Http3WebTransportStream::SendFin() {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG(("Http3WebTransportStream::SendFin [this=%p mSendState=%d]", this,
mSendState));
if (mSendFin || !mSession || mResetError) {
// Already closed.
return;
}
mSendFin = true;
switch (mSendState) {
case SENDING: {
// Data is still being sent: queue the close. ReadSegments() runs the queued
// tasks once it has no more bytes to send.
mPendingTasks.AppendElement([self = RefPtr{this}]() {
self->mSession->CloseSendingSide(self->mStreamId);
});
} break;
case WAITING_DATA:
// Nothing is in flight, so sending can be finished immediately.
mSendState = SEND_DONE;
[[fallthrough]];
case SEND_DONE:
mSession->CloseSendingSide(mStreamId);
// StreamHasDataToWrite needs to be called to trigger ProcessOutput.
mSession->StreamHasDataToWrite(this);
break;
default:
// Other states (e.g. WAITING_TO_ACTIVATE) are not expected here.
MOZ_ASSERT_UNREACHABLE("invalid mSendState!");
break;
}
}
// Resets the stream with the given error code. Must run on the socket thread.
//
// aErrorCode - error code handed to the session's ResetWebTransportStream().
//
// Does nothing if the stream was already reset, a FIN was already requested,
// or the session is gone. Otherwise the error code is remembered in
// mResetError and, depending on the send state, the reset is performed right
// away or queued in mPendingTasks.
void Http3WebTransportStream::Reset(uint64_t aErrorCode) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  LOG(("Http3WebTransportStream::Reset [this=%p, mSendState=%d]", this,
       mSendState));

  if (mResetError || !mSession || mSendFin) {
    // The stream is already reset.
    return;
  }

  mResetError = Some(aErrorCode);

  switch (mSendState) {
    case SENDING: {
      LOG(("Http3WebTransportStream::Reset [this=%p] reset after sending data",
           this));
      mPendingTasks.AppendElement([self = RefPtr{this}]() {
        // "Reset" needs a special treatment here. If we are sending data and
        // ResetWebTransportStream is called before Http3Session::ProcessOutput,
        // neqo will drop the last piece of data.
        NS_DispatchToCurrentThread(
            NS_NewRunnableFunction("Http3WebTransportStream::Reset", [self]() {
              // This runnable executes later on the current thread, and
              // Close() may have run in between and cleared mSession. In that
              // case the stream is already torn down and there is nothing
              // left to reset.
              if (!self->mSession) {
                return;
              }
              self->mSession->ResetWebTransportStream(self, *self->mResetError);
              self->mSession->StreamHasDataToWrite(self);
              self->mSession->ConnectSlowConsumer(self);
            }));
      });
    } break;
    case WAITING_DATA:
      // Nothing is in flight, so sending can be finished immediately.
      mSendState = SEND_DONE;
      [[fallthrough]];
    case SEND_DONE:
      mSession->ResetWebTransportStream(this, *mResetError);
      // StreamHasDataToWrite needs to be called to trigger ProcessOutput.
      mSession->StreamHasDataToWrite(this);
      mSession->ConnectSlowConsumer(this);
      break;
    default:
      // Other states (e.g. WAITING_TO_ACTIVATE) are not expected here.
      MOZ_ASSERT_UNREACHABLE("invalid mSendState!");
      break;
  }
}
void Http3WebTransportStream::SendStopSending(uint8_t aErrorCode) {