Source code
Revision control
Copy as Markdown
Other Tools
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set sw=2 ts=8 et tw=80 : */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
// HttpLog.h should generally be included first
#include "HttpLog.h"
#include "nsCOMPtr.h"
#include "nsStringFwd.h"
// Log on level :5, instead of default :4.
#undef LOG
#define LOG(args) LOG5(args)
#undef LOG_ENABLED
#define LOG_ENABLED() LOG5_ENABLED()
#include <algorithm>
#include "AltServiceChild.h"
#include "CacheControlParser.h"
#include "CachePushChecker.h"
#include "Http2Push.h"
#include "Http2Session.h"
#include "Http2Stream.h"
#include "Http2StreamBase.h"
#include "Http2StreamTunnel.h"
#include "LoadContextInfo.h"
#include "mozilla/EndianUtils.h"
#include "mozilla/glean/GleanMetrics.h"
#include "mozilla/Preferences.h"
#include "mozilla/Sprintf.h"
#include "mozilla/StaticPrefs_network.h"
#include "mozilla/Telemetry.h"
#include "nsHttp.h"
#include "nsHttpConnection.h"
#include "nsHttpHandler.h"
#include "nsIRequestContext.h"
#include "nsISupportsPriority.h"
#include "nsITLSSocketControl.h"
#include "nsNetUtil.h"
#include "nsQueryObject.h"
#include "nsSocketTransportService2.h"
#include "nsStandardURL.h"
#include "nsURLHelper.h"
#include "prnetdb.h"
#include "sslerr.h"
#include "sslt.h"
namespace mozilla {
namespace net {
// Http2Session has multiple inheritance of things that implement nsISupports
// so the reference counting and interface map are written out by hand here.
// The nsISupports entry is declared ambiguous and routed through the
// nsAHttpConnection base.
NS_IMPL_ADDREF(Http2Session)
NS_IMPL_RELEASE(Http2Session)
NS_INTERFACE_MAP_BEGIN(Http2Session)
NS_INTERFACE_MAP_ENTRY(nsISupportsWeakReference)
NS_INTERFACE_MAP_ENTRY_CONCRETE(Http2Session)
NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsAHttpConnection)
NS_INTERFACE_MAP_END
// Removes every entry of |queue| that refers to |aStream|.
//
// The removal is delegated to nsTArray::RemoveElementsBy so the array is never
// mutated while a range-for is walking it, and no reference to an element that
// is about to be destroyed is passed back into the array.
static void RemoveStreamFromQueue(Http2StreamBase* aStream,
                                  nsTArray<WeakPtr<Http2StreamBase>>& queue) {
  queue.RemoveElementsBy([aStream](const WeakPtr<Http2StreamBase>& aEntry) {
    return aEntry == aStream;
  });
}
// Appends |aStream| to |queue| unless the queue already contains it, so a
// stream is never listed twice in the same queue.
static void AddStreamToQueue(Http2StreamBase* aStream,
                             nsTArray<WeakPtr<Http2StreamBase>>& queue) {
  const bool alreadyQueued = queue.Contains(aStream);
  if (alreadyQueued) {
    return;
  }
  queue.AppendElement(aStream);
}
// Pops the first live stream off the front of |queue| and returns a strong
// reference to it. Dead weak entries found at the front are discarded (and
// trip a debug assertion, since they are not expected). Returns nullptr when
// the queue holds no live stream.
static already_AddRefed<Http2StreamBase> GetNextStreamFromQueue(
    nsTArray<WeakPtr<Http2StreamBase>>& queue) {
  while (!queue.IsEmpty()) {
    if (RefPtr<Http2StreamBase> head = queue[0].get()) {
      queue.RemoveElementAt(0);
      return head.forget();
    }
    // The weak pointer at the front has been cleared: drop it and keep looking.
    MOZ_ASSERT(false);
    queue.RemoveElementAt(0);
  }
  return nullptr;
}
// "magic" refers to the string that precedes HTTP/2 on the wire
// to help find any intermediaries speaking an older version of HTTP.
// The bytes spell the ASCII text "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".
const uint8_t Http2Session::kMagicHello[] = {
0x50, 0x52, 0x49, 0x20, 0x2a, 0x20, 0x48, 0x54, 0x54, 0x50, 0x2f, 0x32,
0x2e, 0x30, 0x0d, 0x0a, 0x0d, 0x0a, 0x53, 0x4d, 0x0d, 0x0a, 0x0d, 0x0a};
// Factory for Http2Session: allocates a session over |aSocketTransport|, has
// it send its hello, and returns the raw pointer to the caller.
Http2Session* Http2Session::CreateSession(nsISocketTransport* aSocketTransport,
                                          enum SpdyVersion version,
                                          bool attemptingEarlyData) {
  const bool handlerMissing = !gHttpHandler;
  if (handlerMissing) {
    // Presumably GetInstance() brings the global handler into existence; the
    // returned reference itself is deliberately unused.
    RefPtr<nsHttpHandler> instance = nsHttpHandler::GetInstance();
    Unused << instance.get();
  }

  auto* created =
      new Http2Session(aSocketTransport, version, attemptingEarlyData);
  created->SendHello();
  return created;
}
// Constructs a session bound to |aSocketTransport|. Must run on the socket
// thread (asserted in the body). The member-initializer list gives every
// counter, flag and window its starting value; the body then assigns a serial
// number, allocates the input/output buffers and reads tunables from
// gHttpHandler and StaticPrefs.
// NOTE(review): |version| is not referenced anywhere in this constructor.
Http2Session::Http2Session(nsISocketTransport* aSocketTransport,
enum SpdyVersion version, bool attemptingEarlyData)
: mSocketTransport(aSocketTransport),
mSegmentReader(nullptr),
mSegmentWriter(nullptr),
mNextStreamID(3) // 1 is reserved for Upgrade handshakes
,
mLastPushedID(0),
mConcurrentHighWater(0),
mDownstreamState(BUFFERING_OPENING_SETTINGS),
mInputFrameBufferSize(kDefaultBufferSize),
mInputFrameBufferUsed(0),
mInputFrameDataSize(0),
mInputFrameDataRead(0),
mInputFrameFinal(false),
mInputFrameType(0),
mInputFrameFlags(0),
mInputFrameID(0),
mPaddingLength(0),
mInputFrameDataStream(nullptr),
mNeedsCleanup(nullptr),
mDownstreamRstReason(NO_HTTP_ERROR),
mExpectedHeaderID(0),
mExpectedPushPromiseID(0),
mContinuedPromiseStream(0),
mFlatHTTPResponseHeadersOut(0),
mShouldGoAway(false),
mClosed(false),
mCleanShutdown(false),
mReceivedSettings(false),
mTLSProfileConfirmed(false),
mGoAwayReason(NO_HTTP_ERROR),
mClientGoAwayReason(UNASSIGNED),
mPeerGoAwayReason(UNASSIGNED),
mGoAwayID(0),
mOutgoingGoAwayID(0),
mConcurrent(0),
mServerPushedResources(0),
mServerInitialStreamWindow(kDefaultRwin),
mLocalSessionWindow(kDefaultRwin),
mServerSessionWindow(kDefaultRwin),
mInitialRwin(ASpdySession::kInitialRwin),
mOutputQueueSize(kDefaultQueueSize),
mOutputQueueUsed(0),
mOutputQueueSent(0),
mLastReadEpoch(PR_IntervalNow()),
mPingSentEpoch(0),
mPreviousUsed(false),
mAggregatedHeaderSize(0),
mWaitingForSettingsAck(false),
mGoAwayOnPush(false),
mUseH2Deps(false),
mAttemptingEarlyData(attemptingEarlyData),
mOriginFrameActivated(false),
mCntActivated(0),
mTlsHandshakeFinished(false),
mPeerFailedHandshake(false),
mTrrStreams(0),
mEnableWebsockets(false),
mPeerAllowsWebsockets(false),
mProcessedWaitingWebsockets(false) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
// Each session gets the next value of a function-local static counter as its
// serial number.
static uint64_t sSerial;
mSerial = ++sSerial;
LOG3(("Http2Session::Http2Session %p serial=0x%" PRIX64 "\n", this, mSerial));
// Allocate the frame input buffer and the output queue at their default
// sizes, and reserve room in the decompression buffer.
mInputFrameBuffer = MakeUnique<char[]>(mInputFrameBufferSize);
mOutputQueueBuffer = MakeUnique<char[]>(mOutputQueueSize);
mDecompressBuffer.SetCapacity(kDefaultBufferSize);
mPushAllowance = gHttpHandler->SpdyPushAllowance();
// Overrides the value given in the initializer list: the larger of the pull
// allowance and the push allowance.
mInitialRwin = std::max(gHttpHandler->SpdyPullAllowance(), mPushAllowance);
mMaxConcurrent = gHttpHandler->DefaultSpdyConcurrent();
mSendingChunkSize = gHttpHandler->SpdySendingChunkSize();
mLastDataReadEpoch = mLastReadEpoch;
// Remember the configured ping threshold twice so that ReadTimeoutTick can
// restore it from mPreviousPingThreshold.
mPingThreshold = gHttpHandler->SpdyPingThreshold();
mPreviousPingThreshold = mPingThreshold;
mCurrentBrowserId = gHttpHandler->ConnMgr()->CurrentBrowserId();
mEnableWebsockets = StaticPrefs::network_http_http2_websockets();
// The same HPACK table-dump preference is applied to both directions.
bool dumpHpackTables = StaticPrefs::network_http_http2_enable_hpack_dump();
mCompressor.SetDumpTables(dumpHpackTables);
mDecompressor.SetDumpTables(dumpHpackTables);
}
// Shuts down every stream tracked by this session: first the streams held in
// the transaction hash, then the tunnel streams. |aReason| is forwarded to
// ShutdownStream for each of them.
void Http2Session::Shutdown(nsresult aReason) {
  for (const auto& transactionStream : mStreamTransactionHash.Values()) {
    ShutdownStream(transactionStream, aReason);
  }

  for (auto& tunnelStream : mTunnelStreams) {
    ShutdownStream(tunnelStream, aReason);
  }
}
// Closes |aStream| with the error code that best describes why the session is
// going away. The conditions are checked in priority order and the first one
// that matches decides the code; NS_ERROR_ABORT is the fallback.
void Http2Session::ShutdownStream(Http2StreamBase* aStream, nsresult aReason) {
  nsresult closeReason = NS_ERROR_ABORT;

  // On a clean server hangup the server sets the GoAwayID to be the ID of
  // the last transaction it processed. If the ID of stream in the
  // local stream is greater than that it can safely be restarted because the
  // server guarantees it was not partially processed. Streams that have not
  // registered an ID haven't actually been sent yet so they can always be
  // restarted.
  if (mCleanShutdown &&
      (aStream->StreamID() > mGoAwayID || !aStream->HasRegisteredID())) {
    closeReason = NS_ERROR_NET_RESET;  // can be restarted
  } else if (aStream->RecvdData()) {
    closeReason = NS_ERROR_NET_PARTIAL_TRANSFER;
  } else if (mGoAwayReason == INADEQUATE_SECURITY) {
    closeReason = NS_ERROR_NET_INADEQUATE_SECURITY;
  } else if (!mCleanShutdown && (mGoAwayReason != NO_HTTP_ERROR)) {
    closeReason = NS_ERROR_NET_HTTP2_SENT_GOAWAY;
  } else if (!mCleanShutdown && PossibleZeroRTTRetryError(aReason)) {
    // Pass the caller's reason through unchanged.
    closeReason = aReason;
  }

  CloseStream(aStream, closeReason);
}
// Destructor: shuts down any remaining streams with NS_OK as the reason and
// then reports the per-session counters to Glean / Telemetry.
Http2Session::~Http2Session() {
MOZ_DIAGNOSTIC_ASSERT(OnSocketThread());
LOG3(("Http2Session::~Http2Session %p mDownstreamState=%X", this,
mDownstreamState));
Shutdown(NS_OK);
// Only sessions that carried at least one TRR stream report the TRR count.
if (mTrrStreams) {
mozilla::glean::networking::trr_request_count_per_conn.Get("h2"_ns).Add(
static_cast<int32_t>(mTrrStreams));
}
// Concurrency high-water mark, activated stream count, pushed resource count,
// both GOAWAY reasons, and whether the peer failed the handshake.
Telemetry::Accumulate(Telemetry::SPDY_PARALLEL_STREAMS, mConcurrentHighWater);
Telemetry::Accumulate(Telemetry::SPDY_REQUEST_PER_CONN_3, mCntActivated);
Telemetry::Accumulate(Telemetry::SPDY_SERVER_INITIATED_STREAMS,
mServerPushedResources);
Telemetry::Accumulate(Telemetry::SPDY_GOAWAY_LOCAL, mClientGoAwayReason);
Telemetry::Accumulate(Telemetry::SPDY_GOAWAY_PEER, mPeerGoAwayReason);
Telemetry::Accumulate(Telemetry::HTTP2_FAIL_BEFORE_SETTINGS,
mPeerFailedHandshake);
}
// Records |reason| as the session's GOAWAY reason and maps it to the nsresult
// the caller should fail with.
inline nsresult Http2Session::SessionError(enum errorType reason) {
  LOG3(("Http2Session::SessionError %p reason=0x%x mPeerGoAwayReason=0x%x",
        this, reason, mPeerGoAwayReason));
  mGoAwayReason = reason;

  // INADEQUATE_SECURITY is special, as we have an error page just for it.
  // Everything else means we're the one sending a generic GOAWAY.
  return (reason == INADEQUATE_SECURITY) ? NS_ERROR_NET_INADEQUATE_SECURITY
                                         : NS_ERROR_NET_HTTP2_SENT_GOAWAY;
}
// Dumps |datalen| bytes of |data| to the HTTP I/O log as hex, 16 bytes per
// line, each line prefixed with the offset of its first byte. |label| and the
// stream's ID go into a header line. Does nothing unless gHttpIOLog is enabled
// at Verbose level.
void Http2Session::LogIO(Http2Session* self, Http2StreamBase* stream,
                         const char* label, const char* data,
                         uint32_t datalen) {
  if (!MOZ_LOG_TEST(gHttpIOLog, LogLevel::Verbose)) {
    return;
  }

  MOZ_LOG(gHttpIOLog, LogLevel::Verbose,
          ("Http2Session::LogIO %p stream=%p id=0x%X [%s]", self, stream,
           stream ? stream->StreamID() : 0, label));

  // Max line is (16 * 3) + 10(prefix) + newline + null
  char linebuf[128];
  linebuf[127] = 0;

  const uint8_t* bytes = reinterpret_cast<const uint8_t*>(data);
  uint32_t rowEnd = 0;
  for (uint32_t rowStart = 0; rowStart < datalen; rowStart = rowEnd) {
    // A row holds up to 16 bytes; the last one may be shorter.
    const uint32_t remaining = datalen - rowStart;
    rowEnd = rowStart + (remaining < 16 ? remaining : 16);

    // Offset prefix: 8 hex digits, a colon and a space.
    char* cursor = linebuf;
    snprintf(cursor, 128, "%08X: ", rowStart);
    cursor += 10;

    // Each byte takes two hex digits and a trailing space.
    for (uint32_t offset = rowStart; offset < rowEnd; ++offset) {
      snprintf(cursor, 128 - (cursor - linebuf), "%02X ", bytes[offset]);
      cursor += 3;
    }

    *cursor = 0;
    MOZ_LOG(gHttpIOLog, LogLevel::Verbose, ("%s", linebuf));
  }
}
// Signature shared by all control-frame handlers: a static function taking
// the session and returning an nsresult.
using Http2ControlFx = nsresult (*)(Http2Session*);
// Table of control-frame handlers indexed by frame type. Slot 0 (DATA) is
// deliberately null, and frame types without a handler of their own map to
// RecvUnused.
static constexpr Http2ControlFx sControlFunctions[] = {
nullptr, // type 0 data is not a control function
Http2Session::RecvHeaders,
Http2Session::RecvPriority,
Http2Session::RecvRstStream,
Http2Session::RecvSettings,
Http2Session::RecvPushPromise,
Http2Session::RecvPing,
Http2Session::RecvGoAway,
Http2Session::RecvWindowUpdate,
Http2Session::RecvContinuation,
Http2Session::RecvAltSvc, // extension for type 0x0A
Http2Session::RecvUnused, // 0x0B was BLOCKED still radioactive
Http2Session::RecvOrigin, // extension for type 0x0C
Http2Session::RecvUnused, // 0x0D
Http2Session::RecvUnused, // 0x0E
Http2Session::RecvUnused, // 0x0F
Http2Session::RecvPriorityUpdate, // 0x10
};
// Compile-time checks that every slot of sControlFunctions holds the handler
// expected for the frame-type constant (or raw index) used to reach it, so a
// reordering of the table or of the FRAME_TYPE_* values breaks the build.
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_DATA] == nullptr);
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_HEADERS] ==
Http2Session::RecvHeaders);
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_PRIORITY] ==
Http2Session::RecvPriority);
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_RST_STREAM] ==
Http2Session::RecvRstStream);
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_SETTINGS] ==
Http2Session::RecvSettings);
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_PUSH_PROMISE] ==
Http2Session::RecvPushPromise);
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_PING] ==
Http2Session::RecvPing);
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_GOAWAY] ==
Http2Session::RecvGoAway);
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_WINDOW_UPDATE] ==
Http2Session::RecvWindowUpdate);
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_CONTINUATION] ==
Http2Session::RecvContinuation);
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_ALTSVC] ==
Http2Session::RecvAltSvc);
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_UNUSED] ==
Http2Session::RecvUnused);
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_ORIGIN] ==
Http2Session::RecvOrigin);
// Types 0x0D-0x0F are checked by raw index.
static_assert(sControlFunctions[0x0D] == Http2Session::RecvUnused);
static_assert(sControlFunctions[0x0E] == Http2Session::RecvUnused);
static_assert(sControlFunctions[0x0F] == Http2Session::RecvUnused);
static_assert(sControlFunctions[Http2Session::FRAME_TYPE_PRIORITY_UPDATE] ==
Http2Session::RecvPriorityUpdate);
// Returns true while the number of concurrently active streams is below the
// session's concurrency limit. Socket thread only.
bool Http2Session::RoomForMoreConcurrent() {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  const bool underLimit = mConcurrent < mMaxConcurrent;
  return underLimit;
}
// Returns true when the session can still accept new streams: the stream IDs
// that the currently tracked transactions may need (two per transaction on
// top of the next ID) must fit under kMaxStreamID, and the session must not
// be marked for going away.
bool Http2Session::RoomForMoreStreams() {
  const auto projectedID = mNextStreamID + mStreamTransactionHash.Count() * 2;
  const bool idSpaceExhausted = projectedID > kMaxStreamID;
  return !idSpaceExhausted && !mShouldGoAway;
}
// Returns the interval elapsed since mLastDataReadEpoch.
PRIntervalTime Http2Session::IdleTime() {
  const PRIntervalTime now = PR_IntervalNow();
  return now - mLastDataReadEpoch;
}
// Periodic read-timeout check driven by the caller with the current time
// |now|. Depending on how long ago data was last read it either does nothing,
// closes the session because an outstanding ping timed out, or sends a new
// ping (and reaps orphaned pushed streams).
//
// Returns UINT32_MAX when pinging is disabled or the session was just closed,
// the seconds remaining before the ping threshold when there was recent
// activity, and 1 while a ping is outstanding.
// NOTE(review): the return value is presumably the delay in seconds until the
// next tick is wanted - confirm against the caller.
uint32_t Http2Session::ReadTimeoutTick(PRIntervalTime now) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG3(("Http2Session::ReadTimeoutTick %p delta since last read %ds\n", this,
PR_IntervalToSeconds(now - mLastReadEpoch)));
// A zero threshold disables pinging altogether.
if (!mPingThreshold) {
return UINT32_MAX;
}
if ((now - mLastReadEpoch) < mPingThreshold) {
// recent activity means ping is not an issue
if (mPingSentEpoch) {
mPingSentEpoch = 0;
if (mPreviousUsed) {
// restore the former value
mPingThreshold = mPreviousPingThreshold;
mPreviousUsed = false;
}
}
return PR_IntervalToSeconds(mPingThreshold) -
PR_IntervalToSeconds(now - mLastReadEpoch);
}
// No recent reads. If a ping is already in flight, check whether it has
// timed out; sessions carrying TRR streams use their own timeout pref.
if (mPingSentEpoch) {
bool isTrr = (mTrrStreams > 0);
uint32_t pingTimeout = isTrr ? StaticPrefs::network_trr_ping_timeout()
: gHttpHandler->SpdyPingTimeout();
LOG3(
("Http2Session::ReadTimeoutTick %p handle outstanding ping, "
"timeout=%d\n",
this, pingTimeout));
if ((now - mPingSentEpoch) >= pingTimeout) {
LOG3(("Http2Session::ReadTimeoutTick %p Ping Timer Exhaustion\n", this));
if (mConnection) {
mConnection->SetCloseReason(ConnectionCloseReason::IDLE_TIMEOUT);
}
mPingSentEpoch = 0;
if (isTrr) {
// These must be set this way to ensure we gracefully restart all
// streams
mGoAwayID = 0;
mCleanShutdown = true;
// If TRR is mode 2, this Http2Session will be closed due to TRR request
// timeout, so we won't reach this code. If we are in mode 3, the
// request timeout is usually larger than the ping timeout. We close the
// stream with NS_ERROR_NET_RESET, so the transactions can be restarted.
Close(NS_ERROR_NET_RESET);
} else {
Close(NS_ERROR_NET_TIMEOUT);
}
return UINT32_MAX;
}
return 1; // run the tick aggressively while ping is outstanding
}
// No ping outstanding yet: send one and remember when.
LOG3(("Http2Session::ReadTimeoutTick %p generating ping\n", this));
mPingSentEpoch = PR_IntervalNow();
if (!mPingSentEpoch) {
mPingSentEpoch = 1; // avoid the 0 sentinel value
}
GeneratePing(false);
Unused << ResumeRecv(); // read the ping reply
// Check for orphaned push streams. This looks expensive, but generally the
// list is empty.
Http2PushedStream* deleteMe;
TimeStamp timestampNow;
// Each pass finds at most one orphan and cleans it up outside the inner
// loop; the scan restarts until a pass finds none.
do {
deleteMe = nullptr;
for (uint32_t index = mPushedStreams.Length(); index > 0; --index) {
Http2PushedStream* pushedStream = mPushedStreams[index - 1];
if (timestampNow.IsNull()) {
timestampNow = TimeStamp::Now(); // lazy initializer
}
// if stream finished, but is not connected, and its been like that for
// long then cleanup the stream.
if (pushedStream->IsOrphaned(timestampNow)) {
LOG3(("Http2Session Timeout Pushed Stream %p 0x%X\n", this,
pushedStream->StreamID()));
deleteMe = pushedStream;
break; // don't CleanupStream() while iterating this vector
}
}
if (deleteMe) CleanupStream(deleteMe, NS_ERROR_ABORT, CANCEL_ERROR);
} while (deleteMe);
return 1; // run the tick aggressively while ping is outstanding
}
// Registers |stream| under a stream ID and returns that ID.
//
// |aNewID| is 0 to have the next odd (pull) ID assigned automatically, or an
// explicit even ID for a pushed stream. Returns kDeadStreamID, and marks the
// session for going away, if the ID is already registered.
uint32_t Http2Session::RegisterStreamID(Http2StreamBase* stream,
                                        uint32_t aNewID) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  MOZ_ASSERT(mNextStreamID < 0xfffffff0,
             "should have stopped admitting streams");
  MOZ_ASSERT(!(aNewID & 1),
             "0 for autoassign pull, otherwise explicit even push assignment");

  if (aNewID == 0) {
    // auto generate a new pull stream ID
    aNewID = mNextStreamID;
    MOZ_ASSERT(aNewID & 1, "pull ID must be odd.");
    mNextStreamID += 2;
  }

  LOG1(
      ("Http2Session::RegisterStreamID session=%p stream=%p id=0x%X "
       "concurrent=%d",
       this, stream, aNewID, mConcurrent));

  // We've used up plenty of ID's on this session. Start
  // moving to a new one before there is a crunch involving
  // server push streams or concurrent non-registered submits
  if (aNewID >= kMaxStreamID) {
    mShouldGoAway = true;
  }

  // integrity check
  const bool idInUse = mStreamIDHash.Contains(aNewID);
  if (idInUse) {
    LOG3((" New ID already present\n"));
    MOZ_ASSERT(false, "New ID already present in mStreamIDHash");
    mShouldGoAway = true;
    return kDeadStreamID;
  }

  mStreamIDHash.InsertOrUpdate(aNewID, stream);

  // don't count push streams (even IDs) towards the TRR counter
  const bool isPullStream = (aNewID & 1) != 0;
  if (!isPullStream) {
    return aNewID;
  }

  RefPtr<nsHttpConnectionInfo> connInfo(stream->ConnectionInfo());
  if (connInfo && connInfo->GetIsTrrServiceChannel()) {
    IncrementTrrCounter();
  }
  return aNewID;
}
// Attaches |aHttpTransaction| to this session by creating a normal stream for
// it with priority |aPriority|.
//
// Returns false only when the transaction is already tracked by the session.
// If the session is closed or going away and the transaction has no pushed
// stream attached, the transaction is detached and handed back to
// gHttpHandler to be rescheduled; that case still returns true.
bool Http2Session::AddStream(nsAHttpTransaction* aHttpTransaction,
                             int32_t aPriority,
                             nsIInterfaceRequestor* aCallbacks) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  // integrity check
  const bool duplicate = mStreamTransactionHash.Contains(aHttpTransaction);
  if (duplicate) {
    LOG3((" New transaction already present\n"));
    MOZ_ASSERT(false, "AddStream duplicate transaction pointer");
    return false;
  }

  if (!mConnection) {
    mConnection = aHttpTransaction->Connection();
  }

  if (!mFirstHttpTransaction && !mTlsHandshakeFinished) {
    mFirstHttpTransaction = aHttpTransaction->QueryHttpTransaction();
    LOG3(("Http2Session::AddStream first session=%p trans=%p ", this,
          mFirstHttpTransaction.get()));
  }

  // Only look at the underlying transaction when the session can't be used.
  const bool sessionUnusable = mClosed || mShouldGoAway;
  nsHttpTransaction* httpTrans =
      sessionUnusable ? aHttpTransaction->QueryHttpTransaction() : nullptr;
  if (httpTrans) {
    RefPtr<Http2PushedStreamWrapper> pushWrapper;
    pushWrapper = httpTrans->GetPushedStream();
    const bool hasPushedStream = pushWrapper && pushWrapper->GetStream();
    if (!hasPushedStream) {
      LOG3(
          ("Http2Session::AddStream %p atrans=%p trans=%p session unusable - "
           "resched.\n",
           this, aHttpTransaction, httpTrans));
      aHttpTransaction->SetConnection(nullptr);
      nsresult rescheduleResult =
          gHttpHandler->InitiateTransaction(httpTrans, httpTrans->Priority());
      if (NS_FAILED(rescheduleResult)) {
        LOG3(
            ("Http2Session::AddStream %p atrans=%p trans=%p failed to "
             "initiate "
             "transaction (%08x).\n",
             this, aHttpTransaction, httpTrans,
             static_cast<uint32_t>(rescheduleResult)));
      }
      return true;
    }
  }

  aHttpTransaction->SetConnection(this);
  aHttpTransaction->OnActivated();
  CreateStream(aHttpTransaction, aPriority, Http2StreamBaseType::Normal);
  return true;
}
// Creates the stream object for |aHttpTransaction|, records it in the
// transaction hash, queues it for writing and tries to start the write right
// away. Only Http2StreamBaseType::Normal is supported here; the other listed
// types hit a release assertion.
void Http2Session::CreateStream(nsAHttpTransaction* aHttpTransaction,
                                int32_t aPriority,
                                Http2StreamBaseType streamType) {
  RefPtr<Http2StreamBase> created;
  switch (streamType) {
    case Http2StreamBaseType::Normal:
      created =
          new Http2Stream(aHttpTransaction, this, aPriority, mCurrentBrowserId);
      break;
    case Http2StreamBaseType::WebSocket:
    case Http2StreamBaseType::Tunnel:
    case Http2StreamBaseType::ServerPush:
      MOZ_RELEASE_ASSERT(false);
      return;
  }

  LOG3(("Http2Session::AddStream session=%p stream=%p serial=%" PRIu64 " "
        "NextID=0x%X (tentative)",
        this, created.get(), mSerial, mNextStreamID));

  // Keep a second reference for queuing, since the first one is moved into
  // the hash.
  RefPtr<Http2StreamBase> queued = created;
  mStreamTransactionHash.InsertOrUpdate(aHttpTransaction, std::move(created));
  AddStreamToQueue(queued, mReadyForWrite);
  SetWriteCallbacks();

  // Kick off the SYN transmit without waiting for the poll loop
  // This won't work for the first stream because there is no segment reader
  // yet.
  if (mSegmentReader) {
    uint32_t ignoredCount;
    Unused << ReadSegments(nullptr, kDefaultBufferSize, &ignoredCount);
  }

  // A real (non-null) transaction that disallows keep-alive makes the whole
  // session non-reusable.
  if (aHttpTransaction->Caps() & NS_HTTP_ALLOW_KEEPALIVE) {
    return;
  }
  if (aHttpTransaction->IsNullTransaction()) {
    return;
  }
  LOG3(("Http2Session::AddStream %p transaction %p forces keep-alive off.\n",
        this, aHttpTransaction));
  DontReuse();
}
// Creates a tunnel stream for |aHttpTransaction|'s connection info, has it
// build the nsHttpConnection that runs over the tunnel, stores the stream in
// mTunnelStreams and returns the new connection.
already_AddRefed<nsHttpConnection> Http2Session::CreateTunnelStream(
    nsAHttpTransaction* aHttpTransaction, nsIInterfaceRequestor* aCallbacks,
    PRIntervalTime aRtt, bool aIsWebSocket) {
  RefPtr<Http2StreamTunnel> tunnel = CreateTunnelStreamFromConnInfo(
      this, mCurrentBrowserId, aHttpTransaction->ConnectionInfo(),
      aIsWebSocket);

  RefPtr<nsHttpConnection> tunnelConn = tunnel->CreateHttpConnection(
      aHttpTransaction, aCallbacks, aRtt, aIsWebSocket);

  mTunnelStreams.AppendElement(std::move(tunnel));
  return tunnelConn.forget();
}
// Parks |stream| in mQueuedStreams and flags it as queued. The stream must
// not be counted as active nor already be queued.
void Http2Session::QueueStream(Http2StreamBase* stream) {
  // will be removed via processpending or a shutdown path
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  MOZ_ASSERT(!stream->CountAsActive());
  MOZ_ASSERT(!stream->Queued());

  LOG3(("Http2Session::QueueStream %p stream %p queued.", this, stream));

#ifdef DEBUG
  // Debug-only sanity pass: the stream is not in the queue yet and every
  // entry already there carries the queued flag.
  for (const auto& alreadyQueued : mQueuedStreams) {
    MOZ_ASSERT(alreadyQueued != stream);
    MOZ_ASSERT(alreadyQueued->Queued());
  }
#endif

  stream->SetQueued(true);
  AddStreamToQueue(stream, mQueuedStreams);
}
void Http2Session::ProcessPending() {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
RefPtr<Http2StreamBase> stream;
while (RoomForMoreConcurrent() &&
(stream = GetNextStreamFromQueue(mQueuedStreams))) {
LOG3(("Http2Session::ProcessPending %p stream %p woken from queue.", this,
stream.get()));
MOZ_ASSERT(!stream->CountAsActive());
MOZ_ASSERT(stream->Queued());
stream->SetQueued(false);
AddStreamToQueue(stream, mReadyForWrite);
SetWriteCallbacks();
}
}
// Reads up to |count| bytes into |buf| through |writer| and stores the number
// obtained in |*countWritten|. A zero-length request succeeds immediately
// without touching the writer. A successful read of at least one byte
// refreshes mLastReadEpoch. Returns the writer's result.
nsresult Http2Session::NetworkRead(nsAHttpSegmentWriter* writer, char* buf,
                                   uint32_t count, uint32_t* countWritten) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  if (count == 0) {
    *countWritten = 0;
    return NS_OK;
  }

  const nsresult status = writer->OnWriteSegment(buf, count, countWritten);
  const bool gotData = NS_SUCCEEDED(status) && *countWritten > 0;
  if (gotData) {
    mLastReadEpoch = PR_IntervalNow();
  }
  return status;
}
// Asks the connection to resume sending when there is something to write:
// either streams waiting in the write queue or unsent bytes in the output
// queue. Does nothing without a connection.
void Http2Session::SetWriteCallbacks() {
  if (!mConnection) {
    return;
  }

  const bool hasPendingOutput =
      GetWriteQueueSize() || (mOutputQueueUsed > mOutputQueueSent);
  if (hasPendingOutput) {
    Unused << mConnection->ResumeSend();
  }
}
// Discards the already-sent prefix of the output queue by sliding the unsent
// bytes down to the start of the buffer.
void Http2Session::RealignOutputQueue() {
  if (mAttemptingEarlyData) {
    // We can't realign right now, because we may need what's in there if early
    // data fails.
    return;
  }

  char* const queueStart = mOutputQueueBuffer.get();
  mOutputQueueUsed -= mOutputQueueSent;
  memmove(queueStart, queueStart + mOutputQueueSent, mOutputQueueUsed);
  mOutputQueueSent = 0;
}
// Hands the unsent part of the output queue to the segment reader and updates
// the queue bookkeeping with however much the reader consumed. Does nothing
// without a segment reader or with an empty queue.
void Http2Session::FlushOutputQueue() {
  if (!mSegmentReader || !mOutputQueueUsed) return;

  nsresult rv;
  // Initialized so the log line below never reads an indeterminate value if
  // the reader fails without writing to it.
  uint32_t countRead = 0;
  uint32_t avail = mOutputQueueUsed - mOutputQueueSent;

  if (!avail && mAttemptingEarlyData) {
    // This is kind of a hack, but there are cases where we'll have already
    // written the data we want while doing early data, but we get called again
    // with a reader, and we need to avoid calling the reader when there's
    // nothing for it to read.
    return;
  }

  rv = mSegmentReader->OnReadSegment(
      mOutputQueueBuffer.get() + mOutputQueueSent, avail, &countRead);
  LOG3(("Http2Session::FlushOutputQueue %p sz=%d rv=%" PRIx32 " actual=%d",
        this, avail, static_cast<uint32_t>(rv), countRead));

  // Dont worry about errors on write, we will pick this up as a read error too
  if (NS_FAILED(rv)) return;

  mOutputQueueSent += countRead;

  // While early data is being attempted the queue contents are kept, so only
  // the sent marker advances.
  if (mAttemptingEarlyData) {
    return;
  }

  // Everything went out: reset the queue to empty.
  if (countRead == avail) {
    mOutputQueueUsed = 0;
    mOutputQueueSent = 0;
    return;
  }

  // If the output queue is close to filling up and we have sent out a good
  // chunk of data from the beginning then realign it.
  if ((mOutputQueueSent >= kQueueMinimumCleanup) &&
      ((mOutputQueueSize - mOutputQueueUsed) < kQueueTailRoom)) {
    RealignOutputQueue();
  }
}
// Marks the session as going away so it admits no new streams, and closes it
// straight away when it tracks no transactions. When called off the socket
// thread, the call is re-dispatched to the socket transport service.
void Http2Session::DontReuse() {
  LOG3(("Http2Session::DontReuse %p\n", this));

  if (OnSocketThread()) {
    mShouldGoAway = true;
    if (!mClosed && !mStreamTransactionHash.Count()) {
      Close(NS_OK);
    }
    return;
  }

  LOG3(("Http2Session %p not on socket thread\n", this));
  nsCOMPtr<nsIRunnable> event = NewRunnableMethod(
      "Http2Session::DontReuse", this, &Http2Session::DontReuse);
  gSocketTransportService->Dispatch(event, NS_DISPATCH_NORMAL);
}
// Reports the protocol version spoken by this session: always HTTP/2.
enum SpdyVersion Http2Session::SpdyVersion() {
  return SpdyVersion::HTTP_2;
}
// Returns the number of entries in the ready-for-write queue. Socket thread
// only.
uint32_t Http2Session::GetWriteQueueSize() {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  const uint32_t queuedForWrite = mReadyForWrite.Length();
  return queuedForWrite;
}
// Switches the downstream state to |newState|, logging the old and new
// values. Socket thread only.
void Http2Session::ChangeDownstreamState(enum internalStateType newState) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  LOG3(("Http2Session::ChangeDownstreamState() %p from %X to %X", this,
        mDownstreamState, newState));

  mDownstreamState = newState;
}
// Returns the input side to BUFFERING_FRAME_HEADER and clears the per-frame
// bookkeeping. If the frame just finished was the final one for a data
// stream, that stream is told it received FIN and may stop counting towards
// the concurrency limit.
void Http2Session::ResetDownstreamState() {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  LOG3(("Http2Session::ResetDownstreamState() %p", this));
  ChangeDownstreamState(BUFFERING_FRAME_HEADER);

  const bool finalFrameForStream = mInputFrameFinal && mInputFrameDataStream;
  if (finalFrameForStream) {
    // Clear the flag before notifying the stream.
    mInputFrameFinal = false;
    LOG3((" SetRecvdFin id=0x%x\n", mInputFrameDataStream->StreamID()));
    mInputFrameDataStream->SetRecvdFin(true);
    MaybeDecrementConcurrent(mInputFrameDataStream);
  }

  mInputFrameFinal = false;
  mInputFrameBufferUsed = 0;
  mInputFrameDataStream = nullptr;
}
// Tries to activate |aStream|. Returns true if it was activated (and counted
// against the concurrency limit). Returns false if the stream is already
// queued, or if there was no room and it has now been queued.
bool Http2Session::TryToActivate(Http2StreamBase* aStream) {
  if (aStream->Queued()) {
    LOG3(("Http2Session::TryToActivate %p stream=%p already queued.\n", this,
          aStream));
    return false;
  }

  if (RoomForMoreConcurrent()) {
    LOG3(("Http2Session::TryToActivate %p stream=%p\n", this, aStream));
    IncrementConcurrent(aStream);
    mCntActivated++;
    return true;
  }

  LOG3(
      ("Http2Session::TryToActivate %p stream=%p no room for more concurrent "
       "streams\n",
       this, aStream));
  QueueStream(aStream);
  return false;
}
// Counts |stream| as active and bumps the concurrency counter, raising the
// high-water mark if needed. Streams whose transaction is a null transaction
// are not counted. Pushed (even-ID) streams must not be passed in.
void Http2Session::IncrementConcurrent(Http2StreamBase* stream) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1),
             "Do not activate pushed streams");

  nsAHttpTransaction* transaction = stream->Transaction();
  const bool isNullTransaction =
      transaction && transaction->IsNullTransaction();
  if (isNullTransaction) {
    return;
  }

  MOZ_ASSERT(!stream->CountAsActive());
  stream->SetCountAsActive(true);
  ++mConcurrent;

  if (mConcurrent > mConcurrentHighWater) {
    mConcurrentHighWater = mConcurrent;
  }
  LOG3(
      ("Http2Session::IncrementCounter %p counting stream %p Currently %d "
       "streams in session, high water mark is %d\n",
       this, stream, mConcurrent, mConcurrentHighWater));
}
// call with data length (i.e. 0 for 0 data bytes - ignore 9 byte header)
// dest must have 9 bytes of allocated space
template <typename charType>
void Http2Session::CreateFrameHeader(charType dest, uint16_t frameLength,
uint8_t frameType, uint8_t frameFlags,
uint32_t streamID) {
MOZ_ASSERT(frameLength <= kMaxFrameData, "framelength too large");
MOZ_ASSERT(!(streamID & 0x80000000));
MOZ_ASSERT(!frameFlags || (frameType != FRAME_TYPE_PRIORITY &&
frameType != FRAME_TYPE_RST_STREAM &&
frameType != FRAME_TYPE_GOAWAY &&
frameType != FRAME_TYPE_WINDOW_UPDATE));