Source code

Revision control

Other Tools

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set sw=2 ts=8 et tw=80 : */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
// HttpLog.h should generally be included first
#include "HttpLog.h"
// Log on level :5, instead of default :4.
#undef LOG
#define LOG(args) LOG5(args)
#undef LOG_ENABLED
#define LOG_ENABLED() LOG5_ENABLED()
#include <algorithm>
#include "AltServiceChild.h"
#include "Http2Session.h"
#include "Http2Stream.h"
#include "Http2Push.h"
#include "mozilla/EndianUtils.h"
#include "mozilla/Telemetry.h"
#include "mozilla/Preferences.h"
#include "nsHttp.h"
#include "nsHttpHandler.h"
#include "nsHttpConnection.h"
#include "nsIRequestContext.h"
#include "nsISSLSocketControl.h"
#include "nsISupportsPriority.h"
#include "nsStandardURL.h"
#include "nsURLHelper.h"
#include "prnetdb.h"
#include "sslt.h"
#include "mozilla/Sprintf.h"
#include "nsSocketTransportService2.h"
#include "nsNetUtil.h"
#include "CacheControlParser.h"
#include "CachePushChecker.h"
#include "LoadContextInfo.h"
#include "TCPFastOpenLayer.h"
#include "nsQueryObject.h"
namespace mozilla {
namespace net {
// Http2Session has multiple inheritance of things that implement nsISupports
NS_IMPL_ADDREF(Http2Session)
NS_IMPL_RELEASE(Http2Session)
NS_INTERFACE_MAP_BEGIN(Http2Session)
NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsAHttpConnection)
NS_INTERFACE_MAP_END
// "magic" refers to the string that preceeds HTTP/2 on the wire
// to help find any intermediaries speaking an older version of HTTP
const uint8_t Http2Session::kMagicHello[] = {
0x50, 0x52, 0x49, 0x20, 0x2a, 0x20, 0x48, 0x54, 0x54, 0x50, 0x2f, 0x32,
0x2e, 0x30, 0x0d, 0x0a, 0x0d, 0x0a, 0x53, 0x4d, 0x0d, 0x0a, 0x0d, 0x0a};
Http2Session* Http2Session::CreateSession(nsISocketTransport* aSocketTransport,
enum SpdyVersion version,
bool attemptingEarlyData) {
if (!gHttpHandler) {
RefPtr<nsHttpHandler> handler = nsHttpHandler::GetInstance();
Unused << handler.get();
}
Http2Session* session =
new Http2Session(aSocketTransport, version, attemptingEarlyData);
session->SendHello();
return session;
}
Http2Session::Http2Session(nsISocketTransport* aSocketTransport,
enum SpdyVersion version, bool attemptingEarlyData)
: mSocketTransport(aSocketTransport),
mSegmentReader(nullptr),
mSegmentWriter(nullptr),
mNextStreamID(3) // 1 is reserved for Updgrade handshakes
,
mLastPushedID(0),
mConcurrentHighWater(0),
mDownstreamState(BUFFERING_OPENING_SETTINGS),
mInputFrameBufferSize(kDefaultBufferSize),
mInputFrameBufferUsed(0),
mInputFrameDataSize(0),
mInputFrameDataRead(0),
mInputFrameFinal(false),
mInputFrameType(0),
mInputFrameFlags(0),
mInputFrameID(0),
mPaddingLength(0),
mInputFrameDataStream(nullptr),
mNeedsCleanup(nullptr),
mDownstreamRstReason(NO_HTTP_ERROR),
mExpectedHeaderID(0),
mExpectedPushPromiseID(0),
mContinuedPromiseStream(0),
mFlatHTTPResponseHeadersOut(0),
mShouldGoAway(false),
mClosed(false),
mCleanShutdown(false),
mReceivedSettings(false),
mTLSProfileConfirmed(false),
mGoAwayReason(NO_HTTP_ERROR),
mClientGoAwayReason(UNASSIGNED),
mPeerGoAwayReason(UNASSIGNED),
mGoAwayID(0),
mOutgoingGoAwayID(0),
mConcurrent(0),
mServerPushedResources(0),
mServerInitialStreamWindow(kDefaultRwin),
mLocalSessionWindow(kDefaultRwin),
mServerSessionWindow(kDefaultRwin),
mInitialRwin(ASpdySession::kInitialRwin),
mOutputQueueSize(kDefaultQueueSize),
mOutputQueueUsed(0),
mOutputQueueSent(0),
mLastReadEpoch(PR_IntervalNow()),
mPingSentEpoch(0),
mPreviousUsed(false),
mAggregatedHeaderSize(0),
mWaitingForSettingsAck(false),
mGoAwayOnPush(false),
mUseH2Deps(false),
mAttemptingEarlyData(attemptingEarlyData),
mOriginFrameActivated(false),
mCntActivated(0),
mTlsHandshakeFinished(false),
mCheckNetworkStallsWithTFO(false),
mLastRequestBytesSentTime(0),
mPeerFailedHandshake(false),
mTrrStreams(0),
mEnableWebsockets(false),
mPeerAllowsWebsockets(false),
mProcessedWaitingWebsockets(false) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
static uint64_t sSerial;
mSerial = ++sSerial;
LOG3(("Http2Session::Http2Session %p serial=0x%" PRIX64 "\n", this, mSerial));
mInputFrameBuffer = MakeUnique<char[]>(mInputFrameBufferSize);
mOutputQueueBuffer = MakeUnique<char[]>(mOutputQueueSize);
mDecompressBuffer.SetCapacity(kDefaultBufferSize);
mPushAllowance = gHttpHandler->SpdyPushAllowance();
mInitialRwin = std::max(gHttpHandler->SpdyPullAllowance(), mPushAllowance);
mMaxConcurrent = gHttpHandler->DefaultSpdyConcurrent();
mSendingChunkSize = gHttpHandler->SpdySendingChunkSize();
mLastDataReadEpoch = mLastReadEpoch;
mPingThreshold = gHttpHandler->SpdyPingThreshold();
mPreviousPingThreshold = mPingThreshold;
mCurrentForegroundTabOuterContentWindowId =
gHttpHandler->ConnMgr()->CurrentTopLevelOuterContentWindowId();
mEnableWebsockets = gHttpHandler->IsH2WebsocketsEnabled();
bool dumpHpackTables = gHttpHandler->DumpHpackTables();
mCompressor.SetDumpTables(dumpHpackTables);
mDecompressor.SetDumpTables(dumpHpackTables);
}
void Http2Session::Shutdown() {
for (auto iter = mStreamTransactionHash.Iter(); !iter.Done(); iter.Next()) {
auto stream = iter.UserData();
// On a clean server hangup the server sets the GoAwayID to be the ID of
// the last transaction it processed. If the ID of stream in the
// local stream is greater than that it can safely be restarted because the
// server guarantees it was not partially processed. Streams that have not
// registered an ID haven't actually been sent yet so they can always be
// restarted.
if (mCleanShutdown &&
(stream->StreamID() > mGoAwayID || !stream->HasRegisteredID())) {
CloseStream(stream, NS_ERROR_NET_RESET); // can be restarted
} else if (stream->RecvdData()) {
CloseStream(stream, NS_ERROR_NET_PARTIAL_TRANSFER);
} else if (mGoAwayReason == INADEQUATE_SECURITY) {
CloseStream(stream, NS_ERROR_NET_INADEQUATE_SECURITY);
} else if (!mCleanShutdown && (mGoAwayReason != NO_HTTP_ERROR)) {
CloseStream(stream, NS_ERROR_NET_HTTP2_SENT_GOAWAY);
} else {
CloseStream(stream, NS_ERROR_ABORT);
}
}
}
Http2Session::~Http2Session() {
LOG3(("Http2Session::~Http2Session %p mDownstreamState=%X", this,
mDownstreamState));
Shutdown();
if (mTrrStreams) {
Telemetry::Accumulate(Telemetry::DNS_TRR_REQUEST_PER_CONN, mTrrStreams);
}
Telemetry::Accumulate(Telemetry::SPDY_PARALLEL_STREAMS, mConcurrentHighWater);
Telemetry::Accumulate(Telemetry::SPDY_REQUEST_PER_CONN_3, mCntActivated);
Telemetry::Accumulate(Telemetry::SPDY_SERVER_INITIATED_STREAMS,
mServerPushedResources);
Telemetry::Accumulate(Telemetry::SPDY_GOAWAY_LOCAL, mClientGoAwayReason);
Telemetry::Accumulate(Telemetry::SPDY_GOAWAY_PEER, mPeerGoAwayReason);
Telemetry::Accumulate(Telemetry::HTTP2_FAIL_BEFORE_SETTINGS,
mPeerFailedHandshake);
}
inline nsresult Http2Session::SessionError(enum errorType reason) {
LOG3(("Http2Session::SessionError %p reason=0x%x mPeerGoAwayReason=0x%x",
this, reason, mPeerGoAwayReason));
mGoAwayReason = reason;
if (reason == INADEQUATE_SECURITY) {
// This one is special, as we have an error page just for this
return NS_ERROR_NET_INADEQUATE_SECURITY;
}
// We're the one sending a generic GOAWAY
return NS_ERROR_NET_HTTP2_SENT_GOAWAY;
}
void Http2Session::LogIO(Http2Session* self, Http2Stream* stream,
const char* label, const char* data,
uint32_t datalen) {
if (!LOG5_ENABLED()) return;
LOG5(("Http2Session::LogIO %p stream=%p id=0x%X [%s]", self, stream,
stream ? stream->StreamID() : 0, label));
// Max line is (16 * 3) + 10(prefix) + newline + null
char linebuf[128];
uint32_t index;
char* line = linebuf;
linebuf[127] = 0;
for (index = 0; index < datalen; ++index) {
if (!(index % 16)) {
if (index) {
*line = 0;
LOG5(("%s", linebuf));
}
line = linebuf;
snprintf(line, 128, "%08X: ", index);
line += 10;
}
snprintf(line, 128 - (line - linebuf), "%02X ",
(reinterpret_cast<const uint8_t*>(data))[index]);
line += 3;
}
if (index) {
*line = 0;
LOG5(("%s", linebuf));
}
}
typedef nsresult (*Http2ControlFx)(Http2Session* self);
static Http2ControlFx sControlFunctions[] = {
nullptr, // type 0 data is not a control function
Http2Session::RecvHeaders,
Http2Session::RecvPriority,
Http2Session::RecvRstStream,
Http2Session::RecvSettings,
Http2Session::RecvPushPromise,
Http2Session::RecvPing,
Http2Session::RecvGoAway,
Http2Session::RecvWindowUpdate,
Http2Session::RecvContinuation,
Http2Session::RecvAltSvc, // extension for type 0x0A
Http2Session::RecvUnused, // 0x0B was BLOCKED still radioactive
Http2Session::RecvOrigin // extension for type 0x0C
};
bool Http2Session::RoomForMoreConcurrent() {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
return (mConcurrent < mMaxConcurrent);
}
bool Http2Session::RoomForMoreStreams() {
if (mNextStreamID + mStreamTransactionHash.Count() * 2 > kMaxStreamID)
return false;
return !mShouldGoAway;
}
PRIntervalTime Http2Session::IdleTime() {
return PR_IntervalNow() - mLastDataReadEpoch;
}
uint32_t Http2Session::ReadTimeoutTick(PRIntervalTime now) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG3(("Http2Session::ReadTimeoutTick %p delta since last read %ds\n", this,
PR_IntervalToSeconds(now - mLastReadEpoch)));
uint32_t nextTick = UINT32_MAX;
if (mCheckNetworkStallsWithTFO && mLastRequestBytesSentTime) {
PRIntervalTime initialResponseDelta = now - mLastRequestBytesSentTime;
if (initialResponseDelta >= gHttpHandler->FastOpenStallsTimeout()) {
gHttpHandler->IncrementFastOpenStallsCounter();
mCheckNetworkStallsWithTFO = false;
} else {
nextTick = PR_IntervalToSeconds(gHttpHandler->FastOpenStallsTimeout()) -
PR_IntervalToSeconds(initialResponseDelta);
}
}
if (!mPingThreshold) return nextTick;
if ((now - mLastReadEpoch) < mPingThreshold) {
// recent activity means ping is not an issue
if (mPingSentEpoch) {
mPingSentEpoch = 0;
if (mPreviousUsed) {
// restore the former value
mPingThreshold = mPreviousPingThreshold;
mPreviousUsed = false;
}
}
return std::min(nextTick, PR_IntervalToSeconds(mPingThreshold) -
PR_IntervalToSeconds(now - mLastReadEpoch));
}
if (mPingSentEpoch) {
LOG3(("Http2Session::ReadTimeoutTick %p handle outstanding ping\n", this));
if ((now - mPingSentEpoch) >= gHttpHandler->SpdyPingTimeout()) {
LOG3(("Http2Session::ReadTimeoutTick %p Ping Timer Exhaustion\n", this));
mPingSentEpoch = 0;
Close(NS_ERROR_NET_TIMEOUT);
return UINT32_MAX;
}
return 1; // run the tick aggressively while ping is outstanding
}
LOG3(("Http2Session::ReadTimeoutTick %p generating ping\n", this));
mPingSentEpoch = PR_IntervalNow();
if (!mPingSentEpoch) {
mPingSentEpoch = 1; // avoid the 0 sentinel value
}
GeneratePing(false);
Unused << ResumeRecv(); // read the ping reply
// Check for orphaned push streams. This looks expensive, but generally the
// list is empty.
Http2PushedStream* deleteMe;
TimeStamp timestampNow;
do {
deleteMe = nullptr;
for (uint32_t index = mPushedStreams.Length(); index > 0; --index) {
Http2PushedStream* pushedStream = mPushedStreams[index - 1];
if (timestampNow.IsNull())
timestampNow = TimeStamp::Now(); // lazy initializer
// if stream finished, but is not connected, and its been like that for
// long then cleanup the stream.
if (pushedStream->IsOrphaned(timestampNow)) {
LOG3(("Http2Session Timeout Pushed Stream %p 0x%X\n", this,
pushedStream->StreamID()));
deleteMe = pushedStream;
break; // don't CleanupStream() while iterating this vector
}
}
if (deleteMe) CleanupStream(deleteMe, NS_ERROR_ABORT, CANCEL_ERROR);
} while (deleteMe);
return 1; // run the tick aggressively while ping is outstanding
}
uint32_t Http2Session::RegisterStreamID(Http2Stream* stream, uint32_t aNewID) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
MOZ_ASSERT(mNextStreamID < 0xfffffff0,
"should have stopped admitting streams");
MOZ_ASSERT(!(aNewID & 1),
"0 for autoassign pull, otherwise explicit even push assignment");
if (!aNewID) {
// auto generate a new pull stream ID
aNewID = mNextStreamID;
MOZ_ASSERT(aNewID & 1, "pull ID must be odd.");
mNextStreamID += 2;
}
LOG1(
("Http2Session::RegisterStreamID session=%p stream=%p id=0x%X "
"concurrent=%d",
this, stream, aNewID, mConcurrent));
// We've used up plenty of ID's on this session. Start
// moving to a new one before there is a crunch involving
// server push streams or concurrent non-registered submits
if (aNewID >= kMaxStreamID) mShouldGoAway = true;
// integrity check
if (mStreamIDHash.Get(aNewID)) {
LOG3((" New ID already present\n"));
MOZ_ASSERT(false, "New ID already present in mStreamIDHash");
mShouldGoAway = true;
return kDeadStreamID;
}
mStreamIDHash.Put(aNewID, stream);
// If TCP fast Open has been used and conection was idle for some time
// we will be cautious and watch out for bug 1395494.
if (!mCheckNetworkStallsWithTFO && mConnection) {
RefPtr<HttpConnectionBase> connBase = mConnection->HttpConnection();
RefPtr<nsHttpConnection> conn = do_QueryObject(connBase);
if (conn && (conn->GetFastOpenStatus() == TFO_DATA_SENT) &&
gHttpHandler
->CheckIfConnectionIsStalledOnlyIfIdleForThisAmountOfSeconds() &&
IdleTime() >=
gHttpHandler
->CheckIfConnectionIsStalledOnlyIfIdleForThisAmountOfSeconds()) {
// If a connection was using the TCP FastOpen and it was idle for a
// long time we should check for stalls like bug 1395494.
mCheckNetworkStallsWithTFO = true;
mLastRequestBytesSentTime = PR_IntervalNow();
}
}
if (aNewID & 1) {
// don't count push streams here
MOZ_ASSERT(stream->Transaction(), "no transation for the stream!");
RefPtr<nsHttpConnectionInfo> ci(stream->Transaction()->ConnectionInfo());
if (ci && ci->GetIsTrrServiceChannel()) {
IncrementTrrCounter();
}
}
return aNewID;
}
bool Http2Session::AddStream(nsAHttpTransaction* aHttpTransaction,
int32_t aPriority, bool aUseTunnel,
bool aIsWebsocket,
nsIInterfaceRequestor* aCallbacks) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
// integrity check
if (mStreamTransactionHash.Get(aHttpTransaction)) {
LOG3((" New transaction already present\n"));
MOZ_ASSERT(false, "AddStream duplicate transaction pointer");
return false;
}
if (!mConnection) {
mConnection = aHttpTransaction->Connection();
}
if (!mFirstHttpTransaction && !mTlsHandshakeFinished) {
mFirstHttpTransaction = aHttpTransaction->QueryHttpTransaction();
LOG3(("Http2Session::AddStream first session=%p trans=%p ", this,
mFirstHttpTransaction.get()));
}
if (mClosed || mShouldGoAway) {
nsHttpTransaction* trans = aHttpTransaction->QueryHttpTransaction();
if (trans) {
RefPtr<Http2PushedStreamWrapper> pushedStreamWrapper;
pushedStreamWrapper = trans->GetPushedStream();
if (!pushedStreamWrapper || !pushedStreamWrapper->GetStream()) {
LOG3(
("Http2Session::AddStream %p atrans=%p trans=%p session unusable - "
"resched.\n",
this, aHttpTransaction, trans));
aHttpTransaction->SetConnection(nullptr);
nsresult rv =
gHttpHandler->InitiateTransaction(trans, trans->Priority());
if (NS_FAILED(rv)) {
LOG3(
("Http2Session::AddStream %p atrans=%p trans=%p failed to "
"initiate "
"transaction (%08x).\n",
this, aHttpTransaction, trans, static_cast<uint32_t>(rv)));
}
return true;
}
}
}
aHttpTransaction->SetConnection(this);
aHttpTransaction->OnActivated();
if (aIsWebsocket) {
MOZ_ASSERT(!aUseTunnel, "Websocket on tunnel?!");
nsHttpTransaction* trans = aHttpTransaction->QueryHttpTransaction();
MOZ_ASSERT(trans, "Websocket without transaction?!");
if (!trans) {
LOG3(("Http2Session::AddStream %p websocket without transaction. WAT?!",
this));
return true;
}
if (!mEnableWebsockets) {
LOG3(
("Http2Session::AddStream %p Re-queuing websocket as h1 due to "
"mEnableWebsockets=false",
this));
aHttpTransaction->SetConnection(nullptr);
aHttpTransaction->DisableSpdy();
nsresult rv = gHttpHandler->InitiateTransaction(trans, trans->Priority());
if (NS_FAILED(rv)) {
LOG3(
("Http2Session::AddStream %p failed to reinitiate websocket "
"transaction (0x%08x).",
this, static_cast<uint32_t>(rv)));
}
return true;
}
if (!mPeerAllowsWebsockets) {
LOG3(("Http2Session::AddStream %p mPeerAllowsWebsockets=false", this));
if (!mProcessedWaitingWebsockets) {
LOG3(
("Http2Session::AddStream %p waiting for SETTINGS to determine "
"fate of websocket",
this));
mWaitingWebsockets.AppendElement(aHttpTransaction);
mWaitingWebsocketCallbacks.AppendElement(aCallbacks);
} else {
LOG3(
("Http2Session::AddStream %p Re-queuing websocket as h1 due to "
"mPeerAllowsWebsockets=false",
this));
aHttpTransaction->SetConnection(nullptr);
aHttpTransaction->DisableSpdy();
if (trans) {
nsresult rv =
gHttpHandler->InitiateTransaction(trans, trans->Priority());
if (NS_FAILED(rv)) {
LOG3(
("Http2Session::AddStream %p failed to reinitiate websocket "
"transaction (%08x).\n",
this, static_cast<uint32_t>(rv)));
}
}
}
return true;
}
LOG3(("Http2Session::AddStream session=%p trans=%p websocket", this,
aHttpTransaction));
CreateWebsocketStream(aHttpTransaction, aCallbacks);
return true;
}
if (aUseTunnel) {
LOG3(("Http2Session::AddStream session=%p trans=%p OnTunnel", this,
aHttpTransaction));
DispatchOnTunnel(aHttpTransaction, aCallbacks);
return true;
}
Http2Stream* stream =
new Http2Stream(aHttpTransaction, this, aPriority,
mCurrentForegroundTabOuterContentWindowId);
LOG3(("Http2Session::AddStream session=%p stream=%p serial=%" PRIu64 " "
"NextID=0x%X (tentative)",
this, stream, mSerial, mNextStreamID));
mStreamTransactionHash.Put(aHttpTransaction, stream);
mReadyForWrite.Push(stream);
SetWriteCallbacks();
// Kick off the SYN transmit without waiting for the poll loop
// This won't work for the first stream because there is no segment reader
// yet.
if (mSegmentReader) {
uint32_t countRead;
Unused << ReadSegments(nullptr, kDefaultBufferSize, &countRead);
}
if (!(aHttpTransaction->Caps() & NS_HTTP_ALLOW_KEEPALIVE) &&
!aHttpTransaction->IsNullTransaction()) {
LOG3(("Http2Session::AddStream %p transaction %p forces keep-alive off.\n",
this, aHttpTransaction));
DontReuse();
}
return true;
}
void Http2Session::QueueStream(Http2Stream* stream) {
// will be removed via processpending or a shutdown path
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
MOZ_ASSERT(!stream->CountAsActive());
MOZ_ASSERT(!stream->Queued());
LOG3(("Http2Session::QueueStream %p stream %p queued.", this, stream));
#ifdef DEBUG
int32_t qsize = mQueuedStreams.GetSize();
for (int32_t i = 0; i < qsize; i++) {
Http2Stream* qStream = mQueuedStreams.ObjectAt(i);
MOZ_ASSERT(qStream != stream);
MOZ_ASSERT(qStream->Queued());
}
#endif
stream->SetQueued(true);
mQueuedStreams.Push(stream);
}
void Http2Session::ProcessPending() {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
Http2Stream* stream;
while (RoomForMoreConcurrent() && (stream = mQueuedStreams.PopFront())) {
LOG3(("Http2Session::ProcessPending %p stream %p woken from queue.", this,
stream));
MOZ_ASSERT(!stream->CountAsActive());
MOZ_ASSERT(stream->Queued());
stream->SetQueued(false);
mReadyForWrite.Push(stream);
SetWriteCallbacks();
}
}
nsresult Http2Session::NetworkRead(nsAHttpSegmentWriter* writer, char* buf,
uint32_t count, uint32_t* countWritten) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
if (!count) {
*countWritten = 0;
return NS_OK;
}
nsresult rv = writer->OnWriteSegment(buf, count, countWritten);
if (NS_SUCCEEDED(rv) && *countWritten > 0) {
mLastReadEpoch = PR_IntervalNow();
mCheckNetworkStallsWithTFO = false;
}
return rv;
}
void Http2Session::SetWriteCallbacks() {
if (mConnection &&
(GetWriteQueueSize() || (mOutputQueueUsed > mOutputQueueSent))) {
Unused << mConnection->ResumeSend();
}
}
void Http2Session::RealignOutputQueue() {
if (mAttemptingEarlyData) {
// We can't realign right now, because we may need what's in there if early
// data fails.
return;
}
mOutputQueueUsed -= mOutputQueueSent;
memmove(mOutputQueueBuffer.get(), mOutputQueueBuffer.get() + mOutputQueueSent,
mOutputQueueUsed);
mOutputQueueSent = 0;
}
void Http2Session::FlushOutputQueue() {
if (!mSegmentReader || !mOutputQueueUsed) return;
nsresult rv;
uint32_t countRead;
uint32_t avail = mOutputQueueUsed - mOutputQueueSent;
if (!avail && mAttemptingEarlyData) {
// This is kind of a hack, but there are cases where we'll have already
// written the data we want whlie doing early data, but we get called again
// with a reader, and we need to avoid calling the reader when there's
// nothing for it to read.
return;
}
rv = mSegmentReader->OnReadSegment(
mOutputQueueBuffer.get() + mOutputQueueSent, avail, &countRead);
LOG3(("Http2Session::FlushOutputQueue %p sz=%d rv=%" PRIx32 " actual=%d",
this, avail, static_cast<uint32_t>(rv), countRead));
// Dont worry about errors on write, we will pick this up as a read error too
if (NS_FAILED(rv)) return;
mOutputQueueSent += countRead;
if (mAttemptingEarlyData) {
return;
}
if (countRead == avail) {
mOutputQueueUsed = 0;
mOutputQueueSent = 0;
return;
}
// If the output queue is close to filling up and we have sent out a good
// chunk of data from the beginning then realign it.
if ((mOutputQueueSent >= kQueueMinimumCleanup) &&
((mOutputQueueSize - mOutputQueueUsed) < kQueueTailRoom)) {
RealignOutputQueue();
}
}
void Http2Session::DontReuse() {
LOG3(("Http2Session::DontReuse %p\n", this));
if (!OnSocketThread()) {
LOG3(("Http2Session %p not on socket thread\n", this));
nsCOMPtr<nsIRunnable> event = NewRunnableMethod(
"Http2Session::DontReuse", this, &Http2Session::DontReuse);
gSocketTransportService->Dispatch(event, NS_DISPATCH_NORMAL);
return;
}
mShouldGoAway = true;
if (!mClosed && !mStreamTransactionHash.Count()) {
Close(NS_OK);
}
}
enum SpdyVersion Http2Session::SpdyVersion() { return SpdyVersion::HTTP_2; }
uint32_t Http2Session::GetWriteQueueSize() {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
return mReadyForWrite.GetSize();
}
void Http2Session::ChangeDownstreamState(enum internalStateType newState) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG3(("Http2Session::ChangeDownstreamState() %p from %X to %X", this,
mDownstreamState, newState));
mDownstreamState = newState;
}
void Http2Session::ResetDownstreamState() {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG3(("Http2Session::ResetDownstreamState() %p", this));
ChangeDownstreamState(BUFFERING_FRAME_HEADER);
if (mInputFrameFinal && mInputFrameDataStream) {
mInputFrameFinal = false;
LOG3((" SetRecvdFin id=0x%x\n", mInputFrameDataStream->StreamID()));
mInputFrameDataStream->SetRecvdFin(true);
MaybeDecrementConcurrent(mInputFrameDataStream);
}
mInputFrameFinal = false;
mInputFrameBufferUsed = 0;
mInputFrameDataStream = nullptr;
}
// return true if activated (and counted against max)
// otherwise return false and queue
bool Http2Session::TryToActivate(Http2Stream* aStream) {
if (aStream->Queued()) {
LOG3(("Http2Session::TryToActivate %p stream=%p already queued.\n", this,
aStream));
return false;
}
if (!RoomForMoreConcurrent()) {
LOG3(
("Http2Session::TryToActivate %p stream=%p no room for more concurrent "
"streams\n",
this, aStream));
QueueStream(aStream);
return false;
}
LOG3(("Http2Session::TryToActivate %p stream=%p\n", this, aStream));
IncrementConcurrent(aStream);
mCntActivated++;
return true;
}
void Http2Session::IncrementConcurrent(Http2Stream* stream) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1),
"Do not activate pushed streams");
nsAHttpTransaction* trans = stream->Transaction();
if (!trans || !trans->IsNullTransaction() ||
trans->QuerySpdyConnectTransaction()) {
MOZ_ASSERT(!stream->CountAsActive());
stream->SetCountAsActive(true);
++mConcurrent;
if (mConcurrent > mConcurrentHighWater) {
mConcurrentHighWater = mConcurrent;
}
LOG3(
("Http2Session::IncrementCounter %p counting stream %p Currently %d "
"streams in session, high water mark is %d\n",
this, stream, mConcurrent, mConcurrentHighWater));
}
}
// call with data length (i.e. 0 for 0 data bytes - ignore 9 byte header)
// dest must have 9 bytes of allocated space
template <typename charType>
void Http2Session::CreateFrameHeader(charType dest, uint16_t frameLength,
uint8_t frameType, uint8_t frameFlags,
uint32_t streamID) {
MOZ_ASSERT(frameLength <= kMaxFrameData, "framelength too large");
MOZ_ASSERT(!(streamID & 0x80000000));
MOZ_ASSERT(!frameFlags || (frameType != FRAME_TYPE_PRIORITY &&
frameType != FRAME_TYPE_RST_STREAM &&
frameType != FRAME_TYPE_GOAWAY &&
frameType != FRAME_TYPE_WINDOW_UPDATE));
dest[0] = 0x00;
NetworkEndian::writeUint16(dest + 1, frameLength);
dest[3] = frameType;
dest[4] = frameFlags;
NetworkEndian::writeUint32(dest + 5, streamID);
}
char* Http2Session::EnsureOutputBuffer(uint32_t spaceNeeded) {
// this is an infallible allocation (if an allocation is
// needed, which is probably isn't)
EnsureBuffer(mOutputQueueBuffer, mOutputQueueUsed + spaceNeeded,
mOutputQueueUsed, mOutputQueueSize);
return mOutputQueueBuffer.get() + mOutputQueueUsed;
}
template void Http2Session::CreateFrameHeader(char* dest, uint16_t frameLength,
uint8_t frameType,
uint8_t frameFlags,
uint32_t streamID);
template void Http2Session::CreateFrameHeader(uint8_t* dest,
uint16_t frameLength,
uint8_t frameType,
uint8_t frameFlags,
uint32_t streamID);
void Http2Session::MaybeDecrementConcurrent(Http2Stream* aStream) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG3(("MaybeDecrementConcurrent %p id=0x%X concurrent=%d active=%d\n", this,
aStream->StreamID(), mConcurrent, aStream->CountAsActive()));
if (!aStream->CountAsActive()) return;
MOZ_ASSERT(mConcurrent);
aStream->SetCountAsActive(false);
--mConcurrent;
ProcessPending();
}
// Need to decompress some data in order to keep the compression
// context correct, but we really don't care what the result is
nsresult Http2Session::UncompressAndDiscard(bool isPush) {
nsresult rv;
nsAutoCString trash;
rv = mDecompressor.DecodeHeaderBlock(
reinterpret_cast<const uint8_t*>(mDecompressBuffer.BeginReading()),
mDecompressBuffer.Length(), trash, isPush);
mDecompressBuffer.Truncate();
if (NS_FAILED(rv)) {
LOG3(("Http2Session::UncompressAndDiscard %p Compression Error\n", this));
mGoAwayReason = COMPRESSION_ERROR;
return rv;
}
return NS_OK;
}
void Http2Session::GeneratePing(bool isAck) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG3(("Http2Session::GeneratePing %p isAck=%d\n", this, isAck));
char* packet = EnsureOutputBuffer(kFrameHeaderBytes + 8);
mOutputQueueUsed += kFrameHeaderBytes + 8;
if (isAck) {
CreateFrameHeader(packet, 8, FRAME_TYPE_PING, kFlag_ACK, 0);
memcpy(packet + kFrameHeaderBytes,
mInputFrameBuffer.get() + kFrameHeaderBytes, 8);
} else {
CreateFrameHeader(packet, 8, FRAME_TYPE_PING, 0, 0);
memset(packet + kFrameHeaderBytes, 0, 8);
}
LogIO(this, nullptr, "Generate Ping", packet, kFrameHeaderBytes + 8);
FlushOutputQueue();
}
void Http2Session::GenerateSettingsAck() {
// need to generate ack of this settings frame
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG3(("Http2Session::GenerateSettingsAck %p\n", this));
char* packet = EnsureOutputBuffer(kFrameHeaderBytes);
mOutputQueueUsed += kFrameHeaderBytes;
CreateFrameHeader(packet, 0, FRAME_TYPE_SETTINGS, kFlag_ACK, 0);
LogIO(this, nullptr, "Generate Settings ACK", packet, kFrameHeaderBytes);
FlushOutputQueue();
}
void Http2Session::GeneratePriority(uint32_t aID, uint8_t aPriorityWeight) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG3(("Http2Session::GeneratePriority %p %X %X\n", this, aID,
aPriorityWeight));
char* packet = CreatePriorityFrame(aID, 0, aPriorityWeight);
LogIO(this, nullptr, "Generate Priority", packet, kFrameHeaderBytes + 5);
FlushOutputQueue();
}
void Http2Session::GenerateRstStream(uint32_t aStatusCode, uint32_t aID) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
// make sure we don't do this twice for the same stream (at least if we
// have a stream entry for it)
Http2Stream* stream = mStreamIDHash.Get(aID);
if (stream) {
if (stream->SentReset()) return;
stream->SetSentReset(true);
}
LOG3(("Http2Session::GenerateRst %p 0x%X %d\n", this, aID, aStatusCode));
uint32_t frameSize = kFrameHeaderBytes + 4;
char* packet = EnsureOutputBuffer(frameSize);
mOutputQueueUsed += frameSize;
CreateFrameHeader(packet, 4, FRAME_TYPE_RST_STREAM, 0, aID);
NetworkEndian::writeUint32(packet + kFrameHeaderBytes, aStatusCode);
LogIO(this, nullptr, "Generate Reset", packet, frameSize);
FlushOutputQueue();
}
void Http2Session::GenerateGoAway(uint32_t aStatusCode) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG3(("Http2Session::GenerateGoAway %p code=%X\n", this, aStatusCode));
mClientGoAwayReason = aStatusCode;
uint32_t frameSize = kFrameHeaderBytes + 8;
char* packet = EnsureOutputBuffer(frameSize);
mOutputQueueUsed += frameSize;
CreateFrameHeader(packet, 8, FRAME_TYPE_GOAWAY, 0, 0);
// last-good-stream-id are bytes 9-12 reflecting pushes
NetworkEndian::writeUint32(packet + kFrameHeaderBytes, mOutgoingGoAwayID);
// bytes 13-16 are the status code.
NetworkEndian::writeUint32(packet + frameSize - 4, aStatusCode);
LogIO(this, nullptr, "Generate GoAway", packet, frameSize);
FlushOutputQueue();
}
// The Hello is comprised of
// 1] 24 octets of magic, which are designed to
// flush out silent but broken intermediaries
// 2] a settings frame which sets a small flow control window for pushes
// 3] a window update frame which creates a large session flow control window
// 4] 6 priority frames for streams which will never be opened with headers
// these streams (3, 5, 7, 9, b, d) build a dependency tree that all other
// streams will be direct leaves of.
void Http2Session::SendHello() {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG3(("Http2Session::SendHello %p\n", this));
// sized for magic + 5 settings and a session window update and 6 priority
// frames 24 magic, 33 for settings (9 header + 4 settings @6), 13 for window
// update, 6 priority frames at 14 (9 + 5) each
static const uint32_t maxSettings = 5;
static const uint32_t prioritySize =
kPriorityGroupCount * (kFrameHeaderBytes + 5);
static const uint32_t maxDataLen =
24 + kFrameHeaderBytes + maxSettings * 6 + 13 + prioritySize;
char* packet = EnsureOutputBuffer(maxDataLen);
memcpy(packet, kMagicHello, 24);
mOutputQueueUsed += 24;
LogIO(this, nullptr, "Magic Connection Header", packet, 24);
packet = mOutputQueueBuffer.get() + mOutputQueueUsed;
memset(packet, 0, maxDataLen - 24);
// frame header will be filled in after we know how long the frame is
uint8_t numberOfEntries = 0;
// entries need to be listed in order by ID
// 1st entry is bytes 9 to 14
// 2nd entry is bytes 15 to 20
// 3rd entry is bytes 21 to 26
// 4th entry is bytes 27 to 32
// 5th entry is bytes 33 to 38
// Let the other endpoint know about our default HPACK decompress table size
uint32_t maxHpackBufferSize = gHttpHandler->DefaultHpackBuffer();
mDecompressor.SetInitialMaxBufferSize(maxHpackBufferSize);
NetworkEndian::writeUint16(packet + kFrameHeaderBytes + (6 * numberOfEntries),
SETTINGS_TYPE_HEADER_TABLE_SIZE);
NetworkEndian::writeUint32(
packet + kFrameHeaderBytes + (6 * numberOfEntries) + 2,
maxHpackBufferSize);
numberOfEntries++;
if (!gHttpHandler->AllowPush()) {
// If we don't support push then set MAX_CONCURRENT to 0 and also
// set ENABLE_PUSH to 0
NetworkEndian::writeUint16(
packet + kFrameHeaderBytes + (6 * numberOfEntries),
SETTINGS_TYPE_ENABLE_PUSH);
// The value portion of the setting pair is already initialized to 0
numberOfEntries++;
NetworkEndian::writeUint16(
packet + kFrameHeaderBytes + (6 * numberOfEntries),
SETTINGS_TYPE_MAX_CONCURRENT);
// The value portion of the setting pair is already initialized to 0
numberOfEntries++;
mWaitingForSettingsAck = true;
}
// Advertise the Push RWIN for the session, and on each new pull stream
// send a window update
NetworkEndian::writeUint16(packet + kFrameHeaderBytes + (6 * numberOfEntries),
SETTINGS_TYPE_INITIAL_WINDOW);
NetworkEndian::writeUint32(
packet + kFrameHeaderBytes + (6 * numberOfEntries) + 2, mPushAllowance);
numberOfEntries++;
// Make sure the other endpoint knows that we're sticking to the default max
// frame size
NetworkEndian::writeUint16(packet + kFrameHeaderBytes + (6 * numberOfEntries),
SETTINGS_TYPE_MAX_FRAME_SIZE);
NetworkEndian::writeUint32(
packet + kFrameHeaderBytes + (6 * numberOfEntries) + 2, kMaxFrameData);
numberOfEntries++;
MOZ_ASSERT(numberOfEntries <= maxSettings);
uint32_t dataLen = 6 * numberOfEntries;
CreateFrameHeader(packet, dataLen, FRAME_TYPE_SETTINGS, 0, 0);
mOutputQueueUsed += kFrameHeaderBytes + dataLen;
LogIO(this, nullptr, "Generate Settings", packet,
kFrameHeaderBytes + dataLen);
// now bump the local session window from 64KB
uint32_t sessionWindowBump = mInitialRwin - kDefaultRwin;
if (kDefaultRwin < mInitialRwin) {
// send a window update for the session (Stream 0) for something large
mLocalSessionWindow = mInitialRwin;
packet = mOutputQueueBuffer.get() + mOutputQueueUsed;
CreateFrameHeader(packet, 4, FRAME_TYPE_WINDOW_UPDATE, 0, 0);
mOutputQueueUsed += kFrameHeaderBytes + 4;
NetworkEndian::writeUint32(packet + kFrameHeaderBytes, sessionWindowBump);
LOG3(("Session Window increase at start of session %p %u\n", this,
sessionWindowBump));
LogIO(this, nullptr, "Session Window Bump ", packet, kFrameHeaderBytes + 4);
}
if (gHttpHandler->UseH2Deps() &&
gHttpHandler->CriticalRequestPrioritization()) {
mUseH2Deps = true;
MOZ_ASSERT(mNextStreamID == kLeaderGroupID);
CreatePriorityNode(kLeaderGroupID, 0, 200, "leader");
mNextStreamID += 2;
MOZ_ASSERT(mNextStreamID == kOtherGroupID);
CreatePriorityNode(kOtherGroupID, 0, 100, "other");
mNextStreamID += 2;
MOZ_ASSERT(mNextStreamID == kBackgroundGroupID);
CreatePriorityNode(kBackgroundGroupID, 0, 0, "background");
mNextStreamID += 2;
MOZ_ASSERT(mNextStreamID == kSpeculativeGroupID);
CreatePriorityNode(kSpeculativeGroupID, kBackgroundGroupID, 0,
"speculative");
mNextStreamID += 2;
MOZ_ASSERT(mNextStreamID == kFollowerGroupID);
CreatePriorityNode(kFollowerGroupID, kLeaderGroupID, 0, "follower");
mNextStreamID += 2;
MOZ_ASSERT(mNextStreamID == kUrgentStartGroupID);
CreatePriorityNode(kUrgentStartGroupID, 0, 240, "urgentStart");
mNextStreamID += 2;
// Hey, you! YES YOU! If you add/remove any groups here, you almost
// certainly need to change the lookup of the stream/ID hash in
// Http2Session::OnTransportStatus. Yeah, that's right. YOU!
}
FlushOutputQueue();
}
void Http2Session::SendPriorityFrame(uint32_t streamID, uint32_t dependsOn,
uint8_t weight) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG3(
("Http2Session::SendPriorityFrame %p Frame 0x%X depends on 0x%X "
"weight %d\n",
this, streamID, dependsOn, weight));
char* packet = CreatePriorityFrame(streamID, dependsOn, weight);
LogIO(this, nullptr, "SendPriorityFrame", packet, kFrameHeaderBytes + 5);
FlushOutputQueue();
}
char* Http2Session::CreatePriorityFrame(uint32_t streamID, uint32_t dependsOn,
uint8_t weight) {
MOZ_ASSERT(streamID, "Priority on stream 0");
char* packet = EnsureOutputBuffer(kFrameHeaderBytes + 5);
CreateFrameHeader(packet, 5, FRAME_TYPE_PRIORITY, 0, streamID);
mOutputQueueUsed += kFrameHeaderBytes + 5;
NetworkEndian::writeUint32(packet + kFrameHeaderBytes,
dependsOn); // depends on
packet[kFrameHeaderBytes + 4] = weight; // weight
return packet;
}
void Http2Session::CreatePriorityNode(uint32_t streamID, uint32_t dependsOn,
uint8_t weight, const char* label) {
char* packet = CreatePriorityFrame(streamID, dependsOn, weight);
LOG3(
("Http2Session %p generate Priority Frame 0x%X depends on 0x%X "
"weight %d for %s class\n",
this, streamID, dependsOn, weight, label));
LogIO(this, nullptr, "Priority dep node", packet, kFrameHeaderBytes + 5);
}
// perform a bunch of integrity checks on the stream.
// returns true if passed, false (plus LOG and ABORT) if failed.
bool Http2Session::VerifyStream(Http2Stream* aStream,
uint32_t aOptionalID = 0) {
// This is annoying, but at least it is O(1)
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
#ifndef DEBUG
// Only do the real verification in debug builds
return true;
#else // DEBUG
if (!aStream) return true;
uint32_t test = 0;
do {
if (aStream->StreamID() == kDeadStreamID) break;
nsAHttpTransaction* trans = aStream->Transaction();
test++;
if (!trans) break;
test++;
if (mStreamTransactionHash.Get(trans) != aStream) break;
if (aStream->StreamID()) {
Http2Stream* idStream = mStreamIDHash.Get(aStream->StreamID());
test++;
if (idStream != aStream) break;
if (aOptionalID) {
test++;
if (idStream->StreamID() != aOptionalID) break;
}
}
// tests passed
return true;
} while (false);
LOG3(
("Http2Session %p VerifyStream Failure %p stream->id=0x%X "
"optionalID=0x%X trans=%p test=%d\n",
this, aStream, aStream->StreamID(), aOptionalID, aStream->Transaction(),
test));
MOZ_ASSERT(false, "VerifyStream");
return false;
#endif // DEBUG
}
void Http2Session::CleanupStream(Http2Stream* aStream, nsresult aResult,
errorType aResetCode) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG3(("Http2Session::CleanupStream %p %p 0x%X %" PRIX32 "\n", this, aStream,
aStream ? aStream->StreamID() : 0, static_cast<uint32_t>(aResult)));
if (!aStream) {
return;
}
Http2PushedStream* pushSource = aStream->PushSource();
if (pushSource) {
// aStream is a synthetic attached to an even push
MOZ_ASSERT(pushSource->GetConsumerStream() == aStream);
MOZ_ASSERT(!aStream->StreamID());
MOZ_ASSERT(!(pushSource->StreamID() & 0x1));
aStream->ClearPushSource();
}
if (aStream->DeferCleanup(aResult)) {
LOG3(("Http2Session::CleanupStream 0x%X deferred\n", aStream->StreamID()));
return;
}
if (!VerifyStream(aStream)) {
LOG3(("Http2Session::CleanupStream failed to verify stream\n"));
return;
}
// don't reset a stream that has recevied a fin or rst
if (!aStream->RecvdFin() && !aStream->RecvdReset() && aStream->StreamID() &&
!(mInputFrameFinal &&
(aStream == mInputFrameDataStream))) { // !(recvdfin with mark pending)
LOG3(("Stream 0x%X had not processed recv FIN, sending RST code %X\n",
aStream->StreamID(), aResetCode));
GenerateRstStream(aResetCode, aStream->StreamID());
}
CloseStream(aStream, aResult);
// Remove the stream from the ID hash table and, if an even id, the pushed
// table too.
uint32_t id = aStream->StreamID();
if (id > 0) {
mStreamIDHash.Remove(id);
if (!(id & 1)) {
mPushedStreams.RemoveElement(aStream);
Http2PushedStream* pushStream = static_cast<Http2PushedStream*>(aStream);
nsAutoCString hashKey;
DebugOnly<bool> rv = pushStream->GetHashKey(hashKey);
MOZ_ASSERT(rv);
nsIRequestContext* requestContext = aStream->RequestContext();
if (requestContext) {
SpdyPushCache* cache = requestContext->GetSpdyPushCache();
if (cache) {
// Make sure the id of the stream in the push cache is the same
// as the id of the stream we're cleaning up! See bug 1368080.
Http2PushedStream* trash =
cache->RemovePushedStreamHttp2ByID(hashKey, aStream->StreamID());
LOG3(
("Http2Session::CleanupStream %p aStream=%p pushStream=%p "
"trash=%p",
this, aStream, pushStream, trash));
}
}
}
}
RemoveStreamFromQueues(aStream);
// removing from the stream transaction hash will
// delete the Http2Stream and drop the reference to
// its transaction
mStreamTransactionHash.Remove(aStream->Transaction());
if (mShouldGoAway && !mStreamTransactionHash.Count()) Close(NS_OK);
if (pushSource) {
pushSource->SetDeferCleanupOnSuccess(false);
CleanupStream(pushSource, aResult, aResetCode);
}
}
void Http2Session::CleanupStream(uint32_t aID, nsresult aResult,
errorType aResetCode) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
Http2Stream* stream = mStreamIDHash.Get(aID);
LOG3(("Http2Session::CleanupStream %p by ID 0x%X to stream %p\n", this, aID,
stream));
if (!stream) {
return;
}
CleanupStream(stream, aResult, aResetCode);
}
static void RemoveStreamFromQueue(Http2Stream* aStream,
nsDeque<Http2Stream>& queue) {
size_t size = queue.GetSize();
for (size_t count = 0; count < size; ++count) {
Http2Stream* stream = queue.PopFront();
if (stream != aStream) queue.Push(stream);
}
}
void Http2Session::RemoveStreamFromQueues(Http2Stream* aStream) {
RemoveStreamFromQueue(aStream, mReadyForWrite);
RemoveStreamFromQueue(aStream, mQueuedStreams);
RemoveStreamFromQueue(aStream, mPushesReadyForRead);
RemoveStreamFromQueue(aStream, mSlowConsumersReadyForRead);
}
void Http2Session::CloseStream(Http2Stream* aStream, nsresult aResult) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG3(("Http2Session::CloseStream %p %p 0x%x %" PRIX32 "\n", this, aStream,
aStream->StreamID(), static_cast<uint32_t>(aResult)));
MaybeDecrementConcurrent(aStream);
// Check if partial frame reader
if (aStream == mInputFrameDataStream) {
LOG3(("Stream had active partial read frame on close"));
ChangeDownstreamState(DISCARDING_DATA_FRAME);
mInputFrameDataStream = nullptr;
}
RemoveStreamFromQueues(aStream);
if (aStream->IsTunnel()) {
UnRegisterTunnel(aStream);
}
// Send the stream the close() indication
aStream->Close(aResult);
}
nsresult Http2Session::SetInputFrameDataStream(uint32_t streamID) {
mInputFrameDataStream = mStreamIDHash.Get(streamID);
if (VerifyStream(mInputFrameDataStream, streamID)) return NS_OK;
LOG3(("Http2Session::SetInputFrameDataStream failed to verify 0x%X\n",
streamID));
mInputFrameDataStream = nullptr;
return NS_ERROR_UNEXPECTED;
}
nsresult Http2Session::ParsePadding(uint8_t& paddingControlBytes,
uint16_t& paddingLength) {
if (mInputFrameFlags & kFlag_PADDED) {
paddingLength =
*reinterpret_cast<uint8_t*>(&mInputFrameBuffer[kFrameHeaderBytes]);
paddingControlBytes = 1;
} else {
paddingLength = 0;
paddingControlBytes = 0;
}
if (static_cast<uint32_t>(paddingLength + paddingControlBytes) >
mInputFrameDataSize) {
// This is fatal to the session
LOG3(
("Http2Session::ParsePadding %p stream 0x%x PROTOCOL_ERROR "
"paddingLength %d > frame size %d\n",
this, mInputFrameID, paddingLength, mInputFrameDataSize));
return SessionError(PROTOCOL_ERROR);
}
return NS_OK;
}
nsresult Http2Session::RecvHeaders(Http2Session* self) {
MOZ_ASSERT(self->mInputFrameType == FRAME_TYPE_HEADERS ||
self->mInputFrameType == FRAME_TYPE_CONTINUATION);
bool isContinuation = self->mExpectedHeaderID != 0;
// If this doesn't have END_HEADERS set on it then require the next
// frame to be HEADERS of the same ID
bool endHeadersFlag = self->mInputFrameFlags & kFlag_END_HEADERS;
if (endHeadersFlag)
self->mExpectedHeaderID = 0;
else
self->mExpectedHeaderID = self->mInputFrameID;
uint32_t priorityLen = 0;
if (self->mInputFrameFlags & kFlag_PRIORITY) {
priorityLen = 5;
}
nsresult rv = self->SetInputFrameDataStream(self->mInputFrameID);
MOZ_ASSERT(NS_SUCCEEDED(rv));
// Find out how much padding this frame has, so we can only extract the real
// header data from the frame.
uint16_t paddingLength = 0;
uint8_t paddingControlBytes = 0;
if (!isContinuation) {
self->mDecompressBuffer.Truncate();
rv = self->ParsePadding(paddingControlBytes, paddingLength);
if (NS_FAILED(rv)) {
return rv;
}
}
LOG3(
("Http2Session::RecvHeaders %p stream 0x%X priorityLen=%d stream=%p "
"end_stream=%d end_headers=%d priority_group=%d "
"paddingLength=%d padded=%d\n",
self, self->mInputFrameID, priorityLen, self->mInputFrameDataStream,
self->mInputFrameFlags & kFlag_END_STREAM,
self->mInputFrameFlags & kFlag_END_HEADERS,
self->mInputFrameFlags & kFlag_PRIORITY, paddingLength,
self->mInputFrameFlags & kFlag_PADDED));
if ((paddingControlBytes + priorityLen + paddingLength) >
self->mInputFrameDataSize) {
// This is fatal to the session
return self->SessionError(PROTOCOL_ERROR);
}
if (!self->mInputFrameDataStream) {
// Cannot find stream. We can continue the session, but we need to
// uncompress the header block to maintain the correct compression context
LOG3(
("Http2Session::RecvHeaders %p lookup mInputFrameID stream "
"0x%X failed. NextStreamID = 0x%X\n",
self, self->mInputFrameID, self->mNextStreamID));
if (self->mInputFrameID >= self->mNextStreamID)
self->GenerateRstStream(PROTOCOL_ERROR, self->mInputFrameID);
self->mDecompressBuffer.Append(
&self->mInputFrameBuffer[kFrameHeaderBytes + paddingControlBytes +
priorityLen],
self->mInputFrameDataSize - paddingControlBytes - priorityLen -
paddingLength);
if (self->mInputFrameFlags & kFlag_END_HEADERS) {
rv = self->UncompressAndDiscard(false);
if (NS_FAILED(rv)) {
LOG3(("Http2Session::RecvHeaders uncompress failed\n"));
// this is fatal to the session
self->mGoAwayReason = COMPRESSION_ERROR;
return rv;
}
}
self->ResetDownstreamState();
return NS_OK;
}
// make sure this is either the first headers or a trailer
if (self->mInputFrameDataStream->AllHeadersReceived() &&
!(self->mInputFrameFlags & kFlag_END_STREAM)) {
// Any header block after the first that does *not* end the stream is
// illegal.
LOG3(("Http2Session::Illegal Extra HeaderBlock %p 0x%X\n", self,
self->mInputFrameID));
return self->SessionError(PROTOCOL_ERROR);
}
// queue up any compression bytes
self->mDecompressBuffer.Append(
&self->mInputFrameBuffer[kFrameHeaderBytes + paddingControlBytes +
priorityLen],
self->mInputFrameDataSize - paddingControlBytes - priorityLen -
paddingLength);
self->mInputFrameDataStream->UpdateTransportReadEvents(
self->mInputFrameDataSize);
self->mLastDataReadEpoch = self->mLastReadEpoch;
if (!isContinuation) {
self->mAggregatedHeaderSize = self->mInputFrameDataSize -
paddingControlBytes - priorityLen -
paddingLength;
} else {
self->mAggregatedHeaderSize += self->mInputFrameDataSize -
paddingControlBytes - priorityLen -
paddingLength;
}
if (!endHeadersFlag) { // more are coming - don't process yet
self->ResetDownstreamState();
return NS_OK;
}
if (isContinuation) {
Telemetry::Accumulate(Telemetry::SPDY_CONTINUED_HEADERS,
self->mAggregatedHeaderSize);
}
rv = self->ResponseHeadersComplete();
if (rv == NS_ERROR_ILLEGAL_VALUE) {
LOG3(("Http2Session::RecvHeaders %p PROTOCOL_ERROR detected stream 0x%X\n",
self, self->mInputFrameID));
self->CleanupStream(self->mInputFrameDataStream, rv, PROTOCOL_ERROR);
self->ResetDownstreamState();
rv = NS_OK;
} else if (NS_FAILED(rv)) {
// This is fatal to the session.
self->mGoAwayReason = COMPRESSION_ERROR;
}
return rv;
}
// ResponseHeadersComplete() returns NS_ERROR_ILLEGAL_VALUE when the stream
// should be reset with a PROTOCOL_ERROR, NS_OK when the response headers were
// fine, and any other error is fatal to the session.
nsresult Http2Session::ResponseHeadersComplete() {
LOG3(("Http2Session::ResponseHeadersComplete %p for 0x%X fin=%d", this,
mInputFrameDataStream->StreamID(), mInputFrameFinal));
// Anything prior to AllHeadersReceived() => true is actual headers. After
// that, we need to handle them as trailers instead (which are special-cased
// so we don't have to use the nasty chunked parser for all h2, just in case).
if (mInputFrameDataStream->AllHeadersReceived()) {
LOG3(("Http2Session::ResponseHeadersComplete processing trailers"));
MOZ_ASSERT(mInputFrameFlags & kFlag_END_STREAM);
nsresult rv = mInputFrameDataStream->ConvertResponseTrailers(
&mDecompressor, mDecompressBuffer);
if (NS_FAILED(rv)) {
LOG3((
"Http2Session::ResponseHeadersComplete trailer conversion failed\n"));
return rv;
}
mFlatHTTPResponseHeadersOut = 0;
mFlatHTTPResponseHeaders.Truncate();
if (mInputFrameFinal) {
// need to process the fin
ChangeDownstreamState(PROCESSING_COMPLETE_HEADERS);
} else {
ResetDownstreamState();
}
return NS_OK;
}
// if this turns out to be a 1xx response code we have to
// undo the headers received bit that we are setting here.
bool didFirstSetAllRecvd = !mInputFrameDataStream->AllHeadersReceived();
mInputFrameDataStream->SetAllHeadersReceived();
// The stream needs to see flattened http headers
// Uncompressed http/2 format headers currently live in
// Http2Stream::mDecompressBuffer - convert that to HTTP format in
// mFlatHTTPResponseHeaders via ConvertHeaders()
nsresult rv;
int32_t httpResponseCode; // out param to ConvertResponseHeaders
mFlatHTTPResponseHeadersOut = 0;
rv = mInputFrameDataStream->ConvertResponseHeaders(
&mDecompressor, mDecompressBuffer, mFlatHTTPResponseHeaders,
httpResponseCode);
if (rv == NS_ERROR_NET_RESET) {
LOG(
("Http2Session::ResponseHeadersComplete %p ConvertResponseHeaders "
"reset\n",
this));
// This means the stream found connection-oriented auth. Treat this like we
// got a reset with HTTP_1_1_REQUIRED.
mInputFrameDataStream->Transaction()->DisableSpdy();
CleanupStream(mInputFrameDataStream, NS_ERROR_NET_RESET, CANCEL_ERROR);
ResetDownstreamState();
return NS_OK;
} else if (NS_FAILED(rv)) {
return rv;
}
// allow more headers in the case of 1xx
if (((httpResponseCode / 100) == 1) && didFirstSetAllRecvd) {
mInputFrameDataStream->UnsetAllHeadersReceived();
}
ChangeDownstreamState(PROCESSING_COMPLETE_HEADERS);
return NS_OK;
}
nsresult Http2Session::RecvPriority(Http2Session* self) {
MOZ_ASSERT(self->mInputFrameType == FRAME_TYPE_PRIORITY);
if (self->mInputFrameDataSize != 5) {
LOG3(("Http2Session::RecvPriority %p wrong length data=%d\n", self,
self->mInputFrameDataSize));
return self->SessionError(PROTOCOL_ERROR);
}
if (!self->mInputFrameID) {
LOG3(("Http2Session::RecvPriority %p stream ID of 0.\n", self));
return self->SessionError(PROTOCOL_ERROR);
}
nsresult rv = self->SetInputFrameDataStream(self->mInputFrameID);
if (NS_FAILED(rv)) return rv;
uint32_t newPriorityDependency = NetworkEndian::readUint32(
self->mInputFrameBuffer.get() + kFrameHeaderBytes);
bool exclusive = !!(newPriorityDependency & 0x80000000);
newPriorityDependency &= 0x7fffffff;
uint8_t newPriorityWeight =
*(self->mInputFrameBuffer.get() + kFrameHeaderBytes + 4);
// undefined what it means when the server sends a priority frame. ignore it.
LOG3(
("Http2Session::RecvPriority %p 0x%X received dependency=0x%X "
"weight=%u exclusive=%d",
self->mInputFrameDataStream, self->mInputFrameID, newPriorityDependency,
newPriorityWeight, exclusive));
self->ResetDownstreamState();
return NS_OK;
}
nsresult Http2Session::RecvRstStream(Http2Session* self) {
MOZ_ASSERT(self->mInputFrameType == FRAME_TYPE_RST_STREAM);
if (self->mInputFrameDataSize != 4) {
LOG3(("Http2Session::RecvRstStream %p RST_STREAM wrong length data=%d",
self, self->mInputFrameDataSize));
return self->SessionError(PROTOCOL_ERROR);
}
if (!self->mInputFrameID) {
LOG3(("Http2Session::RecvRstStream %p stream ID of 0.\n", self));
return self->SessionError(PROTOCOL_ERROR);
}
self->mDownstreamRstReason = NetworkEndian::readUint32(
self->mInputFrameBuffer.get() + kFrameHeaderBytes);
LOG3(("Http2Session::RecvRstStream %p RST_STREAM Reason Code %u ID %x\n",
self, self->mDownstreamRstReason, self->mInputFrameID));
DebugOnly<nsresult> rv = self->SetInputFrameDataStream(self->mInputFrameID);
MOZ_ASSERT(NS_SUCCEEDED(rv));
if (!self->mInputFrameDataStream) {
// if we can't find the stream just ignore it (4.2 closed)
self->ResetDownstreamState();
return NS_OK;
}
self->mInputFrameDataStream->SetRecvdReset(true);
self->MaybeDecrementConcurrent(self->mInputFrameDataStream);
self->ChangeDownstreamState(PROCESSING_CONTROL_RST_STREAM);
return NS_OK;
}
nsresult Http2Session::RecvSettings(Http2Session* self) {
MOZ_ASSERT(self->mInputFrameType == FRAME_TYPE_SETTINGS);
if (self->mInputFrameID) {
LOG3(("Http2Session::RecvSettings %p needs stream ID of 0. 0x%X\n", self,
self->mInputFrameID));
return self->SessionError(PROTOCOL_ERROR);
}
if (self->mInputFrameDataSize % 6) {
// Number of Settings is determined by dividing by each 6 byte setting
// entry. So the payload must be a multiple of 6.
LOG3(("Http2Session::RecvSettings %p SETTINGS wrong length data=%d", self,
self->mInputFrameDataSize));
return self->SessionError(PROTOCOL_ERROR);
}
self->mReceivedSettings = true;
uint32_t numEntries = self->mInputFrameDataSize / 6;
LOG3(
("Http2Session::RecvSettings %p SETTINGS Control Frame "
"with %d entries ack=%X",
self, numEntries, self->mInputFrameFlags & kFlag_ACK));
if ((self->mInputFrameFlags & kFlag_ACK) && self->mInputFrameDataSize) {
LOG3(("Http2Session::RecvSettings %p ACK with non zero payload is err\n",
self));
return self->SessionError(PROTOCOL_ERROR);
}
for (uint32_t index = 0; index < numEntries; ++index) {
uint8_t* setting =
reinterpret_cast<uint8_t*>(self->mInputFrameBuffer.get()) +
kFrameHeaderBytes + index * 6;
uint16_t id = NetworkEndian::readUint16(setting);
uint32_t value = NetworkEndian::readUint32(setting + 2);
LOG3(("Settings ID %u, Value %u", id, value));
switch (id) {
case SETTINGS_TYPE_HEADER_TABLE_SIZE:
LOG3(("Compression header table setting received: %d\n", value));
self->mCompressor.SetMaxBufferSize(value);
break;
case SETTINGS_TYPE_ENABLE_PUSH:
LOG3(("Client received an ENABLE Push SETTING. Odd.\n"));
// nop
break;
case SETTINGS_TYPE_MAX_CONCURRENT:
self->mMaxConcurrent = value;
Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_MAX_STREAMS, value);
self->ProcessPending();
break;
case SETTINGS_TYPE_INITIAL_WINDOW: {
Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_IW, value >> 10);
int32_t delta = value - self->mServerInitialStreamWindow;
self->mServerInitialStreamWindow = value;
// SETTINGS only adjusts stream windows. Leave the session window alone.
// We need to add the delta to all open streams (delta can be negative)
for (auto iter = self->mStreamTransactionHash.Iter(); !iter.Done();
iter.Next()) {
iter.Data()->UpdateServerReceiveWindow(delta);
}
} break;
case SETTINGS_TYPE_MAX_FRAME_SIZE: {
if ((value < kMaxFrameData) || (value >= 0x01000000)) {
LOG3(("Received invalid max frame size 0x%X", value));
return self->SessionError(PROTOCOL_ERROR);
}
// We stick to the default for simplicity's sake, so nothing to change
} break;
case SETTINGS_TYPE_ENABLE_CONNECT_PROTOCOL: {
if (value == 1) {
LOG3(("Enabling extended CONNECT"));
self->mPeerAllowsWebsockets = true;
} else if (value > 1) {
LOG3(("Peer sent invalid value for ENABLE_CONNECT_PROTOCOL %d",
value));
return self->SessionError(PROTOCOL_ERROR);
} else if (self->mPeerAllowsWebsockets) {
LOG3(("Peer tried to re-disable extended CONNECT"));
return self->SessionError(PROTOCOL_ERROR);
}
} break;
default:
LOG3(("Received an unknown SETTING id %d. Ignoring.", id));
break;
}
}
self->ResetDownstreamState();
if (!(self->mInputFrameFlags & kFlag_ACK)) {
self->GenerateSettingsAck();
} else if (self->mWaitingForSettingsAck) {
self->mGoAwayOnPush = true;
}
if (!self->mProcessedWaitingWebsockets) {
self->ProcessWaitingWebsockets();
}
return NS_OK;
}
nsresult Http2Session::RecvPushPromise(Http2Session* self) {
MOZ_ASSERT(self->mInputFrameType == FRAME_TYPE_PUSH_PROMISE ||
self->mInputFrameType == FRAME_TYPE_CONTINUATION);
// Find out how much padding this frame has, so we can only extract the real
// header data from the frame.
uint16_t paddingLength = 0;
uint8_t paddingControlBytes = 0;
// If this doesn't have END_PUSH_PROMISE set on it then require the next
// frame to be PUSH_PROMISE of the same ID
uint32_t promiseLen;
uint32_t promisedID;
if (self->mExpectedPushPromiseID) {
promiseLen = 0; // really a continuation frame
promisedID = self->mContinuedPromiseStream;
} else {
self->mDecompressBuffer.Truncate();
nsresult rv = self->ParsePadding(paddingControlBytes, paddingLength);
if (NS_FAILED(rv)) {
return rv;
}
promiseLen = 4;
promisedID =
NetworkEndian::readUint32(self->mInputFrameBuffer.get() +
kFrameHeaderBytes + paddingControlBytes);
promisedID &= 0x7fffffff;
if (promisedID <= self->mLastPushedID) {
LOG3(("Http2Session::RecvPushPromise %p ID too low %u expected > %u.\n",
self, promisedID, self->mLastPushedID));
return self->SessionError(PROTOCOL_ERROR);
}
self->mLastPushedID = promisedID;
}
uint32_t associatedID = self->mInputFrameID;
if (self->mInputFrameFlags & kFlag_END_PUSH_PROMISE) {
self->mExpectedPushPromiseID = 0;
self->mContinuedPromiseStream = 0;
} else {
self->mExpectedPushPromiseID = self->mInputFrameID;
self->mContinuedPromiseStream = promisedID;
}
if ((paddingControlBytes + promiseLen + paddingLength) >
self->mInputFrameDataSize) {
// This is fatal to the session
LOG3(
("Http2Session::RecvPushPromise %p ID 0x%X assoc ID 0x%X "
"PROTOCOL_ERROR extra %d > frame size %d\n",
self, promisedID, associatedID,
(paddingControlBytes + promiseLen + paddingLength),
self->mInputFrameDataSize));
return self->SessionError(PROTOCOL_ERROR);
}
LOG3(
("Http2Session::RecvPushPromise %p ID 0x%X assoc ID 0x%X "
"paddingLength %d padded %d\n",
self, promisedID, associatedID, paddingLength,
self->mInputFrameFlags & kFlag_PADDED));
if (!associatedID || !promisedID || (promisedID & 1)) {
LOG3(("Http2Session::RecvPushPromise %p ID invalid.\n", self));
return self->SessionError(PROTOCOL_ERROR);
}
// confirm associated-to
nsresult rv = self->SetInputFrameDataStream(associatedID);
if (NS_FAILED(rv)) return rv;
Http2Stream* associatedStream = self->mInputFrameDataStream;
++(self->mServerPushedResources);
// Anytime we start using the high bit of stream ID (either client or server)
// begin to migrate to a new session.
if (promisedID >= kMaxStreamID) self->mShouldGoAway = true;
bool resetStream = true;
SpdyPushCache* cache = nullptr;
if (self->mShouldGoAway && !Http2PushedStream::TestOnPush(associatedStream)) {
LOG3(
("Http2Session::RecvPushPromise %p cache push while in GoAway "
"mode refused.\n",
self));
self->GenerateRstStream(REFUSED_STREAM_ERROR, promisedID);
} else if (!gHttpHandler->AllowPush()) {
// ENABLE_PUSH and MAX_CONCURRENT_STREAMS of 0 in settings disabled push
LOG3(("Http2Session::RecvPushPromise Push Recevied when Disabled\n"));
if (self->mGoAwayOnPush) {
LOG3(("Http2Session::RecvPushPromise sending GOAWAY"));
return self->SessionError(PROTOCOL_ERROR);
}
self->GenerateRstStream(REFUSED_STREAM_ERROR, promisedID);
} else if (!(associatedID & 1)) {
LOG3(
("Http2Session::RecvPushPromise %p assocated=0x%X on pushed (even) "
"stream not allowed\n",
self, associatedID));
self->GenerateRstStream(PROTOCOL_ERROR, promisedID);
} else if (!associatedStream) {
LOG3(("Http2Session::RecvPushPromise %p lookup associated ID failed.\n",
self));
self->GenerateRstStream(PROTOCOL_ERROR, promisedID);
} else if (Http2PushedStream::TestOnPush(associatedStream)) {
LOG3(("Http2Session::RecvPushPromise %p will be handled by push listener.",
self));
resetStream = false;
} else {
nsIRequestContext* requestContext = associatedStream->RequestContext();
if (requestContext) {
cache = requestContext->GetSpdyPushCache();
if (!cache) {
cache = new SpdyPushCache();
requestContext->SetSpdyPushCache(cache);
}
}
if (!cache) {
// this is unexpected, but we can handle it just by refusing the push
LOG3(
("Http2Session::RecvPushPromise Push Recevied without push cache\n"));
self->GenerateRstStream(REFUSED_STREAM_ERROR, promisedID);
} else {
resetStream = false;
}
}
if (resetStream) {
// Need to decompress the headers even though we aren't using them yet in
// order to keep the compression context consistent for other frames
self->mDecompressBuffer.Append(
&self->mInputFrameBuffer[kFrameHeaderBytes + paddingControlBytes +
promiseLen],
self->mInputFrameDataSize - paddingControlBytes - promiseLen -
paddingLength);
if (self->mInputFrameFlags & kFlag_END_PUSH_PROMISE) {
rv = self->UncompressAndDiscard(true);
if (NS_FAILED(rv)) {
LOG3(("Http2Session::RecvPushPromise uncompress failed\n"));
self->mGoAwayReason = COMPRESSION_ERROR;
return rv;
}
}
self->ResetDownstreamState();
return NS_OK;
}
self->mDecompressBuffer.Append(
&self->mInputFrameBuffer[kFrameHeaderBytes + paddingControlBytes +
promiseLen],
self->mInputFrameDataSize - paddingControlBytes - promiseLen -
paddingLength);
if (self->mInputFrameType != FRAME_TYPE_CONTINUATION) {
self->mAggregatedHeaderSize = self->mInputFrameDataSize -
paddingControlBytes - promiseLen -
paddingLength;
} else {
self->mAggregatedHeaderSize += self->mInputFrameDataSize -
paddingControlBytes - promiseLen -
paddingLength;
}
if (!(self->mInputFrameFlags & kFlag_END_PUSH_PROMISE)) {
LOG3(
("Http2Session::RecvPushPromise not finishing processing for "
"multi-frame push\n"));
self->ResetDownstreamState();
return NS_OK;
}
if (self->mInputFrameType == FRAME_TYPE_CONTINUATION) {
Telemetry::