Source code
Revision control
Copy as Markdown
Other Tools
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
// HttpLog.h should generally be included first
#include "HttpLog.h"
#include <algorithm>
#include "Http2WebTransportStream.h"
#include "Http2WebTransportSession.h"
#include "Capsule.h"
#include "CapsuleEncoder.h"
#include "nsIOService.h"
namespace mozilla::net {
NS_IMPL_ISUPPORTS(Http2WebTransportStream, nsIOutputStreamCallback,
nsIInputStreamCallback)
Http2WebTransportStream::Http2WebTransportStream(
Http2WebTransportSessionImpl* aWebTransportSession, StreamId aStreamId,
uint64_t aInitialMaxStreamData, uint64_t aInitialLocalMaxStreamData,
std::function<void(Result<RefPtr<WebTransportStreamBase>, nsresult>&&)>&&
aCallback)
: WebTransportStreamBase(aWebTransportSession->GetStreamId(),
std::move(aCallback)),
mWebTransportSession(aWebTransportSession),
mStreamId(aStreamId),
mOwnerThread(GetCurrentSerialEventTarget()),
mFc(aStreamId, aInitialMaxStreamData),
mReceiverFc(aStreamId, aInitialLocalMaxStreamData) {
LOG(("Http2WebTransportStream outgoing ctor:%p", this));
mStreamRole = OUTGOING;
mStreamType = mStreamId.StreamType();
}
Http2WebTransportStream::Http2WebTransportStream(
Http2WebTransportSessionImpl* aWebTransportSession,
uint64_t aInitialMaxStreamData, uint64_t aInitialLocalMaxStreamData,
StreamId aStreamId)
: WebTransportStreamBase(aWebTransportSession->GetStreamId(), nullptr),
mWebTransportSession(aWebTransportSession),
mStreamId(aStreamId),
mOwnerThread(GetCurrentSerialEventTarget()),
mFc(aStreamId, aInitialMaxStreamData),
mReceiverFc(aStreamId, aInitialLocalMaxStreamData) {
LOG(("Http2WebTransportStream incoming ctor:%p", this));
mStreamRole = INCOMING;
mStreamType = mStreamId.StreamType();
}
Http2WebTransportStream::~Http2WebTransportStream() {
LOG(("Http2WebTransportStream dtor:%p", this));
}
nsresult Http2WebTransportStream::Init() {
nsresult rv = NS_OK;
auto resultCallback = MakeScopeExit([&] {
if (NS_FAILED(rv)) {
mSendState = SEND_DONE;
mRecvState = RECV_DONE;
if (mStreamReadyCallback) {
mStreamReadyCallback(Err(rv));
}
} else {
mSocketInCondition = NS_OK;
mSocketOutCondition = NS_OK;
RefPtr<WebTransportStreamBase> stream = this;
if (mStreamReadyCallback) {
mStreamReadyCallback(stream);
}
}
mStreamReadyCallback = nullptr;
});
if (mStreamRole == INCOMING) {
rv = InitInputPipe();
if (NS_FAILED(rv)) {
return rv;
}
if (mStreamType == WebTransportStreamType::BiDi) {
rv = InitOutputPipe();
}
return rv;
}
MOZ_ASSERT(mStreamRole == OUTGOING);
rv = InitOutputPipe();
if (NS_FAILED(rv)) {
return rv;
}
if (mStreamType == WebTransportStreamType::BiDi) {
rv = InitInputPipe();
}
if (mSendStreamPipeIn) {
rv = mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread);
}
return rv;
}
class StreamId Http2WebTransportStream::WebTransportStreamId() const {
return mStreamId;
}
uint64_t Http2WebTransportStream::GetStreamId() const { return mStreamId; }
void Http2WebTransportStream::SendStopSending(uint8_t aErrorCode) {
if (mSentStopSending || !mWebTransportSession) {
// A WT_STOP_SENDING capsule MUST NOT be sent multiple times for the same
// stream.
return;
}
mSentStopSending = true;
mStopSendingCapsule.emplace(
Capsule::WebTransportStopSending(aErrorCode, mStreamId));
mWebTransportSession->StreamHasCapsuleToSend();
mRecvState = RECV_DONE;
}
void Http2WebTransportStream::SendFin() {}
void Http2WebTransportStream::Reset(uint64_t aErrorCode) {
if (mSentReset || !mWebTransportSession || mSendState == SEND_DONE) {
// A WT_RESET_STREAM capsule MUST NOT be sent after a stream is closed or
// reset.
return;
}
mSentReset = true;
mStreamResetCapsule.emplace(Capsule::WebTransportResetStream(
aErrorCode, mTotalSent.value(), mStreamId));
mWebTransportSession->StreamHasCapsuleToSend();
mRecvState = RECV_DONE;
mSendState = SEND_DONE;
}
already_AddRefed<nsIWebTransportSendStreamStats>
Http2WebTransportStream::GetSendStreamStats() {
return nullptr;
}
already_AddRefed<nsIWebTransportReceiveStreamStats>
Http2WebTransportStream::GetReceiveStreamStats() {
return nullptr;
}
bool Http2WebTransportStream::RecvDone() const { return false; }
void Http2WebTransportStream::SetSendOrder(Maybe<int64_t> aSendOrder) {}
NS_IMETHODIMP
Http2WebTransportStream::OnInputStreamReady(nsIAsyncInputStream* aIn) {
LOG1(
("Http2WebTransportStream::OnInputStreamReady [this=%p stream=%p "
"state=%d]",
this, aIn, mSendState));
if (mSendState == SEND_DONE) {
// already closed
return NS_OK;
}
uint32_t sendBytes = 0;
return mSendStreamPipeIn->ReadSegments(
ReadRequestSegment, this, nsIOService::gDefaultSegmentSize, &sendBytes);
}
NS_IMETHODIMP
Http2WebTransportStream::OnOutputStreamReady(nsIAsyncOutputStream* aOut) {
if (!mCurrentOut) {
if (mOutgoingQueue.IsEmpty()) {
return NS_OK;
}
mCurrentOut = mOutgoingQueue.Pop();
}
while (mCurrentOut && mReceiveStreamPipeOut && (mRecvState != RECV_DONE)) {
char* writeBuffer = reinterpret_cast<char*>(const_cast<uint8_t*>(
mCurrentOut->GetData().Elements())) +
mWriteOffset;
uint32_t toWrite = mCurrentOut->GetData().Length() - mWriteOffset;
if (mReliableSize) {
if (mTotalReceived + toWrite > *mReliableSize) {
toWrite = *mReliableSize - mTotalReceived;
}
}
uint32_t wrote = 0;
nsresult rv = mReceiveStreamPipeOut->Write(writeBuffer, toWrite, &wrote);
LOG(("Http2WebTransportStream::Write rv=0x%" PRIx32 " wrote=%" PRIu32
" socketin=%" PRIx32 " [this=%p]",
static_cast<uint32_t>(rv), wrote,
static_cast<uint32_t>(mSocketInCondition), this));
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
mSocketInCondition =
mReceiveStreamPipeOut->AsyncWait(this, 0, 0, nullptr);
return mSocketInCondition;
}
if (NS_FAILED(rv)) {
LOG(("Http2WebTransportStream::OnOutputStreamReady %p failed %u\n", this,
static_cast<uint32_t>(rv)));
// TODO: close this stream
mSocketInCondition = rv;
mCurrentOut = nullptr;
mRecvState = RECV_DONE;
return NS_OK;
}
// Retire when sending data to the consumer.
mReceiverFc.AddRetired(wrote);
mWebTransportSession->ReceiverFc().AddRetired(wrote);
mWriteOffset += wrote;
mTotalReceived += wrote;
// A receiver of a WT_RESET_STREAM capsule can discard any data in excess of
// the Reliable Size indicated, even if that data was already received.
if (mReliableSize && mTotalReceived == *mReliableSize) {
mSocketInCondition = NS_OK;
mWriteOffset = 0;
mCurrentOut = nullptr;
mOutgoingQueue.Clear();
mRecvState = RECV_DONE;
break;
}
if (toWrite == wrote) {
mWriteOffset = 0;
mCurrentOut = mOutgoingQueue.IsEmpty() ? nullptr : mOutgoingQueue.Pop();
}
}
return NS_OK;
}
// static
nsresult Http2WebTransportStream::ReadRequestSegment(
nsIInputStream* stream, void* closure, const char* buf, uint32_t offset,
uint32_t count, uint32_t* countRead) {
Http2WebTransportStream* wtStream = (Http2WebTransportStream*)closure;
LOG(("Http2WebTransportStream::ReadRequestSegment %p count=%u", wtStream,
count));
*countRead = 0;
if (!wtStream->mWebTransportSession) {
return NS_ERROR_UNEXPECTED;
}
uint64_t limit =
std::min(wtStream->mWebTransportSession->SessionDataFc().Available(),
wtStream->mFc.Available());
if (limit < count) {
if (wtStream->mWebTransportSession->SessionDataFc().Available() < count) {
LOG(("blocked by session level flow control"));
wtStream->mWebTransportSession->SessionDataFc().Blocked();
}
if (wtStream->mFc.Available() < count) {
LOG(("blocked by stream level flow control"));
wtStream->mFc.Blocked();
}
return NS_BASE_STREAM_WOULD_BLOCK;
}
nsTArray<uint8_t> data;
data.AppendElements(buf, count);
Capsule capsule = Capsule::WebTransportStreamData(wtStream->mStreamId, false,
std::move(data));
UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>();
encoder->EncodeCapsule(capsule);
wtStream->mCapsuleQueue.Push(std::move(encoder));
*countRead = count;
return NS_OK;
}
void Http2WebTransportStream::TakeOutputCapsule(
mozilla::Queue<UniquePtr<CapsuleEncoder>>& aOutput) {
LOG(("Http2WebTransportStream::TakeOutputCapsule %p", this));
if (mCapsuleQueue.IsEmpty()) {
mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread);
return;
}
while (!mCapsuleQueue.IsEmpty()) {
UniquePtr<CapsuleEncoder> entry = mCapsuleQueue.Pop();
aOutput.Push(std::move(entry));
}
mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread);
}
void Http2WebTransportStream::WriteMaintenanceCapsules(
mozilla::Queue<UniquePtr<CapsuleEncoder>>& aOutput) {
if (mStopSendingCapsule) {
UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>();
encoder->EncodeCapsule(*mStopSendingCapsule);
mStopSendingCapsule = Nothing();
aOutput.Push(std::move(encoder));
}
if (mStreamResetCapsule) {
UniquePtr<CapsuleEncoder> encoder = MakeUnique<CapsuleEncoder>();
encoder->EncodeCapsule(*mStreamResetCapsule);
mStreamResetCapsule = Nothing();
aOutput.Push(std::move(encoder));
}
auto dataBlocked = mFc.CreateStreamDataBlockedCapsule();
if (dataBlocked) {
aOutput.Push(MakeUnique<CapsuleEncoder>(dataBlocked.ref()));
}
auto maxStreamData = mReceiverFc.CreateMaxStreamDataCapsule();
if (maxStreamData) {
aOutput.Push(MakeUnique<CapsuleEncoder>(maxStreamData.ref()));
}
// Keep reading data from the consumer.
mSendStreamPipeIn->AsyncWait(this, 0, 0, mOwnerThread);
}
nsresult Http2WebTransportStream::OnCapsule(Capsule&& aCapsule) {
switch (aCapsule.Type()) {
case CapsuleType::WT_STREAM: {
LOG(("Handling WT_STREAM\n"));
WebTransportStreamDataCapsule& streamData =
aCapsule.GetWebTransportStreamDataCapsule();
return HandleStreamData(false, std::move(streamData.mData));
}
case CapsuleType::WT_STREAM_FIN:
LOG(("Handling WT_STREAM_FIN\n"));
break;
case CapsuleType::WT_MAX_STREAM_DATA: {
LOG(("Handling WT_MAX_STREAM_DATA\n"));
WebTransportMaxStreamDataCapsule& maxStreamData =
aCapsule.GetWebTransportMaxStreamDataCapsule();
return HandleMaxStreamData(maxStreamData.mLimit);
}
case CapsuleType::WT_STREAM_DATA_BLOCKED:
LOG(("Handling WT_STREAM_DATA_BLOCKED\n"));
break;
default:
LOG(("Unhandled capsule type\n"));
break;
}
return NS_OK;
}
nsresult Http2WebTransportStream::HandleMaxStreamData(uint64_t aLimit) {
mFc.Update(aLimit);
return NS_OK;
}
void Http2WebTransportStream::OnStopSending() { mSendState = SEND_DONE; }
void Http2WebTransportStream::OnReset(uint64_t aSize) {
if (mReliableSize) {
return;
}
mReliableSize.emplace(aSize);
LOG(("Http2WebTransportStream::OnReset %p mReliableSize=%" PRIu64
" mTotalReceived=%" PRIu64,
this, *mReliableSize, mTotalReceived));
if (*mReliableSize < mTotalReceived) {
// A receiver MUST treat the receipt of a WT_RESET_STREAM with a Reliable
// Size smaller than the number of bytes it has received on the stream as a
// session error.
// TODO: find a better error code.
mWebTransportSession->OnError(0);
}
}
void Http2WebTransportStream::OnStreamDataSent(size_t aCount) {
LOG(("Http2WebTransportStream::OnStreamDataSent %p aCount=%" PRIu64
" mTotalSent=%" PRIu64,
this, static_cast<uint64_t>(aCount), mTotalSent.value()));
mTotalSent += aCount;
if (!mTotalSent.isValid()) {
// TODO: find a better error code.
mWebTransportSession->OnError(0);
return;
}
mFc.Consume(aCount);
mWebTransportSession->SessionDataFc().Consume(aCount);
}
void Http2WebTransportStream::Close(nsresult aResult) {
if (mSendStreamPipeIn) {
mSendStreamPipeIn->AsyncWait(nullptr, 0, 0, nullptr);
mSendStreamPipeIn->CloseWithStatus(aResult);
}
if (mReceiveStreamPipeOut) {
mReceiveStreamPipeOut->AsyncWait(nullptr, 0, 0, nullptr);
mReceiveStreamPipeOut->CloseWithStatus(aResult);
}
mSendState = SEND_DONE;
mRecvState = RECV_DONE;
mWebTransportSession = nullptr;
}
nsresult Http2WebTransportStream::HandleStreamData(bool aFin,
nsTArray<uint8_t>&& aData) {
LOG(("Http2WebTransportStream::HandleStreamData [this=%p, state=%d aFin=%d",
this, static_cast<uint32_t>(mRecvState), aFin));
if (NS_FAILED(mSocketInCondition)) {
mRecvState = RECV_DONE;
}
uint32_t countWrittenSingle = 0;
switch (mRecvState) {
case READING: {
size_t length = aData.Length();
if (length) {
auto newConsumed =
mReceiverFc.SetConsumed(mReceiverFc.Consumed() + length);
if (newConsumed.isErr()) {
mSocketInCondition = newConsumed.unwrapErr();
} else {
if (!mWebTransportSession->ReceiverFc().Consume(
newConsumed.unwrap())) {
LOG(("Exceed session flow control limit"));
mSocketInCondition = NS_ERROR_NOT_AVAILABLE;
} else {
mOutgoingQueue.Push(MakeUnique<StreamData>(std::move(aData)));
mSocketInCondition = OnOutputStreamReady(mReceiveStreamPipeOut);
}
}
} else {
// Empty WT_STREAM capsules MUST NOT be used unless they open or close a
// stream
// TODO: Handle empty stream capsule
}
LOG((
"Http2WebTransportStream::HandleStreamData "
"countWrittenSingle=%" PRIu32 " socketin=%" PRIx32 " [this=%p]",
countWrittenSingle, static_cast<uint32_t>(mSocketInCondition), this));
if (NS_FAILED(mSocketInCondition)) {
mReceiveStreamPipeOut->Close();
mRecvState = RECV_DONE;
} else {
if (aFin) {
mRecvState = RECEIVED_FIN;
}
}
} break;
case RECEIVED_FIN:
mRecvState = RECV_DONE;
break;
case RECV_DONE:
mSocketInCondition = NS_ERROR_UNEXPECTED;
break;
default:
mSocketInCondition = NS_ERROR_UNEXPECTED;
break;
}
return mSocketInCondition;
}
} // namespace mozilla::net