Source code

Revision control

Other Tools

1
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim:set ts=4 sts=2 sw=2 et cin: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
* License, v. 2.0. If a copy of the MPL was not distributed with this
5
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include "nsIOService.h"
8
#include "nsInputStreamPump.h"
9
#include "nsIStreamTransportService.h"
10
#include "nsISeekableStream.h"
11
#include "nsITransport.h"
12
#include "nsIThreadRetargetableStreamListener.h"
13
#include "nsThreadUtils.h"
14
#include "nsCOMPtr.h"
15
#include "mozilla/Logging.h"
16
#include "mozilla/NonBlockingAsyncInputStream.h"
17
#include "mozilla/SlicedInputStream.h"
18
#include "GeckoProfiler.h"
19
#include "nsIStreamListener.h"
20
#include "nsILoadGroup.h"
21
#include "nsNetCID.h"
22
#include "nsStreamUtils.h"
23
#include <algorithm>
24
25
static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
26
27
//
28
// MOZ_LOG=nsStreamPump:5
29
//
30
static mozilla::LazyLogModule gStreamPumpLog("nsStreamPump");
31
#undef LOG
32
#define LOG(args) MOZ_LOG(gStreamPumpLog, mozilla::LogLevel::Debug, args)
33
34
//-----------------------------------------------------------------------------
35
// nsInputStreamPump methods
36
//-----------------------------------------------------------------------------
37
38
nsInputStreamPump::nsInputStreamPump()
39
: mState(STATE_IDLE),
40
mStreamOffset(0),
41
mStreamLength(0),
42
mSegSize(0),
43
mSegCount(0),
44
mStatus(NS_OK),
45
mSuspendCount(0),
46
mLoadFlags(LOAD_NORMAL),
47
mIsPending(false),
48
mProcessingCallbacks(false),
49
mWaitingForInputStreamReady(false),
50
mCloseWhenDone(false),
51
mRetargeting(false),
52
mAsyncStreamIsBuffered(false),
53
mMutex("nsInputStreamPump") {}
54
55
nsresult nsInputStreamPump::Create(nsInputStreamPump** result,
56
nsIInputStream* stream, uint32_t segsize,
57
uint32_t segcount, bool closeWhenDone,
58
nsIEventTarget* mainThreadTarget) {
59
nsresult rv = NS_ERROR_OUT_OF_MEMORY;
60
RefPtr<nsInputStreamPump> pump = new nsInputStreamPump();
61
if (pump) {
62
rv = pump->Init(stream, segsize, segcount, closeWhenDone, mainThreadTarget);
63
if (NS_SUCCEEDED(rv)) {
64
pump.forget(result);
65
}
66
}
67
return rv;
68
}
69
70
struct PeekData {
71
PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure)
72
: mFunc(fun), mClosure(closure) {}
73
74
nsInputStreamPump::PeekSegmentFun mFunc;
75
void* mClosure;
76
};
77
78
static nsresult CallPeekFunc(nsIInputStream* aInStream, void* aClosure,
79
const char* aFromSegment, uint32_t aToOffset,
80
uint32_t aCount, uint32_t* aWriteCount) {
81
NS_ASSERTION(aToOffset == 0, "Called more than once?");
82
NS_ASSERTION(aCount > 0, "Called without data?");
83
84
PeekData* data = static_cast<PeekData*>(aClosure);
85
data->mFunc(data->mClosure, reinterpret_cast<const uint8_t*>(aFromSegment),
86
aCount);
87
return NS_BINDING_ABORTED;
88
}
89
90
nsresult nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure) {
91
RecursiveMutexAutoLock lock(mMutex);
92
93
MOZ_ASSERT(mAsyncStream, "PeekStream called without stream");
94
95
nsresult rv = CreateBufferedStreamIfNeeded();
96
NS_ENSURE_SUCCESS(rv, rv);
97
98
// See if the pipe is closed by checking the return of Available.
99
uint64_t dummy64;
100
rv = mAsyncStream->Available(&dummy64);
101
if (NS_FAILED(rv)) return rv;
102
uint32_t dummy = (uint32_t)std::min(dummy64, (uint64_t)UINT32_MAX);
103
104
PeekData data(callback, closure);
105
return mAsyncStream->ReadSegments(
106
CallPeekFunc, &data, net::nsIOService::gDefaultSegmentSize, &dummy);
107
}
108
109
nsresult nsInputStreamPump::EnsureWaiting() {
110
mMutex.AssertCurrentThreadIn();
111
112
// no need to worry about multiple threads... an input stream pump lives
113
// on only one thread at a time.
114
MOZ_ASSERT(mAsyncStream);
115
if (!mWaitingForInputStreamReady && !mProcessingCallbacks) {
116
// Ensure OnStateStop is called on the main thread.
117
if (mState == STATE_STOP) {
118
nsCOMPtr<nsIEventTarget> mainThread =
119
mLabeledMainThreadTarget ? mLabeledMainThreadTarget
120
: do_AddRef(GetMainThreadEventTarget());
121
if (mTargetThread != mainThread) {
122
mTargetThread = mainThread;
123
}
124
}
125
MOZ_ASSERT(mTargetThread);
126
nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread);
127
if (NS_FAILED(rv)) {
128
NS_ERROR("AsyncWait failed");
129
return rv;
130
}
131
// Any retargeting during STATE_START or START_TRANSFER is complete
132
// after the call to AsyncWait; next callback wil be on mTargetThread.
133
mRetargeting = false;
134
mWaitingForInputStreamReady = true;
135
}
136
return NS_OK;
137
}
138
139
//-----------------------------------------------------------------------------
140
// nsInputStreamPump::nsISupports
141
//-----------------------------------------------------------------------------
142
143
// although this class can only be accessed from one thread at a time, we do
144
// allow its ownership to move from thread to thread, assuming the consumer
145
// understands the limitations of this.
146
NS_IMPL_ISUPPORTS(nsInputStreamPump, nsIRequest, nsIThreadRetargetableRequest,
147
nsIInputStreamCallback, nsIInputStreamPump)
148
149
//-----------------------------------------------------------------------------
150
// nsInputStreamPump::nsIRequest
151
//-----------------------------------------------------------------------------
152
153
NS_IMETHODIMP
154
nsInputStreamPump::GetName(nsACString& result) {
155
RecursiveMutexAutoLock lock(mMutex);
156
157
result.Truncate();
158
return NS_OK;
159
}
160
161
NS_IMETHODIMP
162
nsInputStreamPump::IsPending(bool* result) {
163
RecursiveMutexAutoLock lock(mMutex);
164
165
*result = (mState != STATE_IDLE);
166
return NS_OK;
167
}
168
169
NS_IMETHODIMP
170
nsInputStreamPump::GetStatus(nsresult* status) {
171
RecursiveMutexAutoLock lock(mMutex);
172
173
*status = mStatus;
174
return NS_OK;
175
}
176
177
NS_IMETHODIMP
178
nsInputStreamPump::Cancel(nsresult status) {
179
MOZ_ASSERT(NS_IsMainThread());
180
181
RecursiveMutexAutoLock lock(mMutex);
182
183
LOG(("nsInputStreamPump::Cancel [this=%p status=%" PRIx32 "]\n", this,
184
static_cast<uint32_t>(status)));
185
186
if (NS_FAILED(mStatus)) {
187
LOG((" already canceled\n"));
188
return NS_OK;
189
}
190
191
NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code");
192
mStatus = status;
193
194
// close input stream
195
if (mAsyncStream) {
196
mAsyncStream->CloseWithStatus(status);
197
if (mSuspendCount == 0) EnsureWaiting();
198
// Otherwise, EnsureWaiting will be called by Resume().
199
// Note that while suspended, OnInputStreamReady will
200
// not do anything, and also note that calling asyncWait
201
// on a closed stream works and will dispatch an event immediately.
202
}
203
return NS_OK;
204
}
205
206
NS_IMETHODIMP
207
nsInputStreamPump::Suspend() {
208
RecursiveMutexAutoLock lock(mMutex);
209
210
LOG(("nsInputStreamPump::Suspend [this=%p]\n", this));
211
NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
212
++mSuspendCount;
213
return NS_OK;
214
}
215
216
NS_IMETHODIMP
217
nsInputStreamPump::Resume() {
218
RecursiveMutexAutoLock lock(mMutex);
219
220
LOG(("nsInputStreamPump::Resume [this=%p]\n", this));
221
NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED);
222
NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
223
224
// There is a brief in-between state when we null out mAsyncStream in
225
// OnStateStop() before calling OnStopRequest, and only afterwards set
226
// STATE_IDLE, which we need to handle gracefully.
227
if (--mSuspendCount == 0 && mAsyncStream) EnsureWaiting();
228
return NS_OK;
229
}
230
231
NS_IMETHODIMP
232
nsInputStreamPump::GetLoadFlags(nsLoadFlags* aLoadFlags) {
233
RecursiveMutexAutoLock lock(mMutex);
234
235
*aLoadFlags = mLoadFlags;
236
return NS_OK;
237
}
238
239
NS_IMETHODIMP
240
nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags) {
241
RecursiveMutexAutoLock lock(mMutex);
242
243
mLoadFlags = aLoadFlags;
244
return NS_OK;
245
}
246
247
NS_IMETHODIMP
248
nsInputStreamPump::GetLoadGroup(nsILoadGroup** aLoadGroup) {
249
RecursiveMutexAutoLock lock(mMutex);
250
251
NS_IF_ADDREF(*aLoadGroup = mLoadGroup);
252
return NS_OK;
253
}
254
255
NS_IMETHODIMP
256
nsInputStreamPump::SetLoadGroup(nsILoadGroup* aLoadGroup) {
257
RecursiveMutexAutoLock lock(mMutex);
258
259
mLoadGroup = aLoadGroup;
260
return NS_OK;
261
}
262
263
//-----------------------------------------------------------------------------
264
// nsInputStreamPump::nsIInputStreamPump implementation
265
//-----------------------------------------------------------------------------
266
267
NS_IMETHODIMP
268
nsInputStreamPump::Init(nsIInputStream* stream, uint32_t segsize,
269
uint32_t segcount, bool closeWhenDone,
270
nsIEventTarget* mainThreadTarget) {
271
NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
272
273
mStream = stream;
274
mSegSize = segsize;
275
mSegCount = segcount;
276
mCloseWhenDone = closeWhenDone;
277
mLabeledMainThreadTarget = mainThreadTarget;
278
279
return NS_OK;
280
}
281
282
NS_IMETHODIMP
283
nsInputStreamPump::AsyncRead(nsIStreamListener* listener, nsISupports* ctxt) {
284
RecursiveMutexAutoLock lock(mMutex);
285
286
NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
287
NS_ENSURE_ARG_POINTER(listener);
288
MOZ_ASSERT(NS_IsMainThread(),
289
"nsInputStreamPump should be read from the "
290
"main thread only.");
291
292
//
293
// OK, we need to use the stream transport service if
294
//
295
// (1) the stream is blocking
296
// (2) the stream does not support nsIAsyncInputStream
297
//
298
299
bool nonBlocking;
300
nsresult rv = mStream->IsNonBlocking(&nonBlocking);
301
if (NS_FAILED(rv)) return rv;
302
303
if (nonBlocking) {
304
mAsyncStream = do_QueryInterface(mStream);
305
if (!mAsyncStream) {
306
rv = NonBlockingAsyncInputStream::Create(mStream.forget(),
307
getter_AddRefs(mAsyncStream));
308
if (NS_WARN_IF(NS_FAILED(rv))) return rv;
309
}
310
MOZ_ASSERT(mAsyncStream);
311
}
312
313
if (!mAsyncStream) {
314
// ok, let's use the stream transport service to read this stream.
315
nsCOMPtr<nsIStreamTransportService> sts =
316
do_GetService(kStreamTransportServiceCID, &rv);
317
if (NS_FAILED(rv)) return rv;
318
319
nsCOMPtr<nsITransport> transport;
320
rv = sts->CreateInputTransport(mStream, mCloseWhenDone,
321
getter_AddRefs(transport));
322
if (NS_FAILED(rv)) return rv;
323
324
nsCOMPtr<nsIInputStream> wrapper;
325
rv = transport->OpenInputStream(0, mSegSize, mSegCount,
326
getter_AddRefs(wrapper));
327
if (NS_FAILED(rv)) return rv;
328
329
mAsyncStream = do_QueryInterface(wrapper, &rv);
330
if (NS_FAILED(rv)) return rv;
331
}
332
333
// release our reference to the original stream. from this point forward,
334
// we only reference the "stream" via mAsyncStream.
335
mStream = nullptr;
336
337
// mStreamOffset now holds the number of bytes currently read.
338
mStreamOffset = 0;
339
340
// grab event queue (we must do this here by contract, since all notifications
341
// must go to the thread which called AsyncRead)
342
if (NS_IsMainThread() && mLabeledMainThreadTarget) {
343
mTargetThread = mLabeledMainThreadTarget;
344
} else {
345
mTargetThread = GetCurrentThreadEventTarget();
346
}
347
NS_ENSURE_STATE(mTargetThread);
348
349
rv = EnsureWaiting();
350
if (NS_FAILED(rv)) return rv;
351
352
if (mLoadGroup) mLoadGroup->AddRequest(this, nullptr);
353
354
mState = STATE_START;
355
mListener = listener;
356
return NS_OK;
357
}
358
359
//-----------------------------------------------------------------------------
360
// nsInputStreamPump::nsIInputStreamCallback implementation
361
//-----------------------------------------------------------------------------
362
363
NS_IMETHODIMP
364
nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream* stream) {
365
LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this));
366
367
AUTO_PROFILER_LABEL("nsInputStreamPump::OnInputStreamReady", NETWORK);
368
369
// this function has been called from a PLEvent, so we can safely call
370
// any listener or progress sink methods directly from here.
371
372
for (;;) {
373
// There should only be one iteration of this loop happening at a time.
374
// To prevent AsyncWait() (called during callbacks or on other threads)
375
// from creating a parallel OnInputStreamReady(), we use:
376
// -- a mutex; and
377
// -- a boolean mProcessingCallbacks to detect parallel loops
378
// when exiting the mutex for callbacks.
379
RecursiveMutexAutoLock lock(mMutex);
380
381
// Prevent parallel execution during callbacks, while out of mutex.
382
if (mProcessingCallbacks) {
383
MOZ_ASSERT(!mProcessingCallbacks);
384
break;
385
}
386
mProcessingCallbacks = true;
387
if (mSuspendCount || mState == STATE_IDLE) {
388
mWaitingForInputStreamReady = false;
389
mProcessingCallbacks = false;
390
break;
391
}
392
393
uint32_t nextState;
394
switch (mState) {
395
case STATE_START:
396
nextState = OnStateStart();
397
break;
398
case STATE_TRANSFER:
399
nextState = OnStateTransfer();
400
break;
401
case STATE_STOP:
402
mRetargeting = false;
403
nextState = OnStateStop();
404
break;
405
default:
406
nextState = 0;
407
MOZ_ASSERT_UNREACHABLE("Unknown enum value.");
408
return NS_ERROR_UNEXPECTED;
409
}
410
411
bool stillTransferring =
412
(mState == STATE_TRANSFER && nextState == STATE_TRANSFER);
413
if (stillTransferring) {
414
NS_ASSERTION(NS_SUCCEEDED(mStatus),
415
"Should not have failed status for ongoing transfer");
416
} else {
417
NS_ASSERTION(mState != nextState,
418
"Only OnStateTransfer can be called more than once.");
419
}
420
if (mRetargeting) {
421
NS_ASSERTION(mState != STATE_STOP,
422
"Retargeting should not happen during OnStateStop.");
423
}
424
425
// Set mRetargeting so EnsureWaiting will be called. It ensures that
426
// OnStateStop is called on the main thread.
427
if (nextState == STATE_STOP && !NS_IsMainThread()) {
428
mRetargeting = true;
429
}
430
431
// Unset mProcessingCallbacks here (while we have lock) so our own call to
432
// EnsureWaiting isn't blocked by it.
433
mProcessingCallbacks = false;
434
435
// We must break the loop if suspended during one of the previous
436
// operation.
437
if (mSuspendCount) {
438
mState = nextState;
439
mWaitingForInputStreamReady = false;
440
break;
441
}
442
443
// Wait asynchronously if there is still data to transfer, or we're
444
// switching event delivery to another thread.
445
if (stillTransferring || mRetargeting) {
446
mState = nextState;
447
mWaitingForInputStreamReady = false;
448
nsresult rv = EnsureWaiting();
449
if (NS_SUCCEEDED(rv)) break;
450
451
// Failure to start asynchronous wait: stop transfer.
452
// Do not set mStatus if it was previously set to report a failure.
453
if (NS_SUCCEEDED(mStatus)) {
454
mStatus = rv;
455
}
456
nextState = STATE_STOP;
457
}
458
459
mState = nextState;
460
}
461
return NS_OK;
462
}
463
464
uint32_t nsInputStreamPump::OnStateStart() {
465
mMutex.AssertCurrentThreadIn();
466
467
AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStart", NETWORK);
468
469
LOG((" OnStateStart [this=%p]\n", this));
470
471
nsresult rv;
472
473
// need to check the reason why the stream is ready. this is required
474
// so our listener can check our status from OnStartRequest.
475
// XXX async streams should have a GetStatus method!
476
if (NS_SUCCEEDED(mStatus)) {
477
uint64_t avail;
478
rv = mAsyncStream->Available(&avail);
479
if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED) mStatus = rv;
480
}
481
482
{
483
// Note: Must exit mutex for call to OnStartRequest to avoid
484
// deadlocks when calls to RetargetDeliveryTo for multiple
485
// nsInputStreamPumps are needed (e.g. nsHttpChannel).
486
RecursiveMutexAutoUnlock unlock(mMutex);
487
rv = mListener->OnStartRequest(this);
488
}
489
490
// an error returned from OnStartRequest should cause us to abort; however,
491
// we must not stomp on mStatus if already canceled.
492
if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus)) mStatus = rv;
493
494
return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP;
495
}
496
497
uint32_t nsInputStreamPump::OnStateTransfer() {
498
mMutex.AssertCurrentThreadIn();
499
500
AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateTransfer", NETWORK);
501
502
LOG((" OnStateTransfer [this=%p]\n", this));
503
504
// if canceled, go directly to STATE_STOP...
505
if (NS_FAILED(mStatus)) return STATE_STOP;
506
507
nsresult rv = CreateBufferedStreamIfNeeded();
508
if (NS_WARN_IF(NS_FAILED(rv))) {
509
return STATE_STOP;
510
}
511
512
uint64_t avail;
513
rv = mAsyncStream->Available(&avail);
514
LOG((" Available returned [stream=%p rv=%" PRIx32 " avail=%" PRIu64 "]\n",
515
mAsyncStream.get(), static_cast<uint32_t>(rv), avail));
516
517
if (rv == NS_BASE_STREAM_CLOSED) {
518
rv = NS_OK;
519
avail = 0;
520
} else if (NS_SUCCEEDED(rv) && avail) {
521
// we used to limit avail to 16K - we were afraid some ODA handlers
522
// might assume they wouldn't get more than 16K at once
523
// we're removing that limit since it speeds up local file access.
524
// Now there's an implicit 64K limit of 4 16K segments
525
// NOTE: ok, so the story is as follows. OnDataAvailable impls
526
// are by contract supposed to consume exactly |avail| bytes.
527
// however, many do not... mailnews... stream converters...
528
// cough, cough. the input stream pump is fairly tolerant
529
// in this regard; however, if an ODA does not consume any
530
// data from the stream, then we could potentially end up in
531
// an infinite loop. we do our best here to try to catch
532
// such an error. (see bug 189672)
533
534
// in most cases this QI will succeed (mAsyncStream is almost always
535
// a nsPipeInputStream, which implements nsITellableStream::Tell).
536
int64_t offsetBefore;
537
nsCOMPtr<nsITellableStream> tellable = do_QueryInterface(mAsyncStream);
538
if (tellable && NS_FAILED(tellable->Tell(&offsetBefore))) {
539
MOZ_ASSERT_UNREACHABLE("Tell failed on readable stream");
540
offsetBefore = 0;
541
}
542
543
uint32_t odaAvail = avail > UINT32_MAX ? UINT32_MAX : uint32_t(avail);
544
545
LOG((" calling OnDataAvailable [offset=%" PRIu64 " count=%" PRIu64
546
"(%u)]\n",
547
mStreamOffset, avail, odaAvail));
548
549
{
550
// Note: Must exit mutex for call to OnStartRequest to avoid
551
// deadlocks when calls to RetargetDeliveryTo for multiple
552
// nsInputStreamPumps are needed (e.g. nsHttpChannel).
553
RecursiveMutexAutoUnlock unlock(mMutex);
554
rv = mListener->OnDataAvailable(this, mAsyncStream, mStreamOffset,
555
odaAvail);
556
}
557
558
// don't enter this code if ODA failed or called Cancel
559
if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) {
560
// test to see if this ODA failed to consume data
561
if (tellable) {
562
// NOTE: if Tell fails, which can happen if the stream is
563
// now closed, then we assume that everything was read.
564
int64_t offsetAfter;
565
if (NS_FAILED(tellable->Tell(&offsetAfter)))
566
offsetAfter = offsetBefore + odaAvail;
567
if (offsetAfter > offsetBefore)
568
mStreamOffset += (offsetAfter - offsetBefore);
569
else if (mSuspendCount == 0) {
570
//
571
// possible infinite loop if we continue pumping data!
572
//
573
// NOTE: although not allowed by nsIStreamListener, we
574
// will allow the ODA impl to Suspend the pump. IMAP
575
// does this :-(
576
//
577
NS_ERROR("OnDataAvailable implementation consumed no data");
578
mStatus = NS_ERROR_UNEXPECTED;
579
}
580
} else
581
mStreamOffset += odaAvail; // assume ODA behaved well
582
}
583
}
584
585
// an error returned from Available or OnDataAvailable should cause us to
586
// abort; however, we must not stop on mStatus if already canceled.
587
588
if (NS_SUCCEEDED(mStatus)) {
589
if (NS_FAILED(rv))
590
mStatus = rv;
591
else if (avail) {
592
// if stream is now closed, advance to STATE_STOP right away.
593
// Available may return 0 bytes available at the moment; that
594
// would not mean that we are done.
595
// XXX async streams should have a GetStatus method!
596
rv = mAsyncStream->Available(&avail);
597
if (NS_SUCCEEDED(rv)) return STATE_TRANSFER;
598
if (rv != NS_BASE_STREAM_CLOSED) mStatus = rv;
599
}
600
}
601
return STATE_STOP;
602
}
603
604
nsresult nsInputStreamPump::CallOnStateStop() {
605
RecursiveMutexAutoLock lock(mMutex);
606
607
MOZ_ASSERT(NS_IsMainThread(),
608
"CallOnStateStop should only be called on the main thread.");
609
610
mState = OnStateStop();
611
return NS_OK;
612
}
613
614
uint32_t nsInputStreamPump::OnStateStop() {
615
mMutex.AssertCurrentThreadIn();
616
617
if (!NS_IsMainThread()) {
618
// This method can be called on a different thread if nsInputStreamPump
619
// is used off the main-thread.
620
nsresult rv = mLabeledMainThreadTarget->Dispatch(
621
NewRunnableMethod("nsInputStreamPump::CallOnStateStop", this,
622
&nsInputStreamPump::CallOnStateStop));
623
NS_ENSURE_SUCCESS(rv, STATE_IDLE);
624
return STATE_IDLE;
625
}
626
627
AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStop", NETWORK);
628
629
LOG((" OnStateStop [this=%p status=%" PRIx32 "]\n", this,
630
static_cast<uint32_t>(mStatus)));
631
632
// if an error occurred, we must be sure to pass the error onto the async
633
// stream. in some cases, this is redundant, but since close is idempotent,
634
// this is OK. otherwise, be sure to honor the "close-when-done" option.
635
636
if (!mAsyncStream || !mListener) {
637
MOZ_ASSERT(mAsyncStream, "null mAsyncStream: OnStateStop called twice?");
638
MOZ_ASSERT(mListener, "null mListener: OnStateStop called twice?");
639
return STATE_IDLE;
640
}
641
642
if (NS_FAILED(mStatus))
643
mAsyncStream->CloseWithStatus(mStatus);
644
else if (mCloseWhenDone)
645
mAsyncStream->Close();
646
647
mAsyncStream = nullptr;
648
mTargetThread = nullptr;
649
mIsPending = false;
650
{
651
// Note: Must exit mutex for call to OnStartRequest to avoid
652
// deadlocks when calls to RetargetDeliveryTo for multiple
653
// nsInputStreamPumps are needed (e.g. nsHttpChannel).
654
RecursiveMutexAutoUnlock unlock(mMutex);
655
mListener->OnStopRequest(this, mStatus);
656
}
657
mListener = nullptr;
658
659
if (mLoadGroup) mLoadGroup->RemoveRequest(this, nullptr, mStatus);
660
661
return STATE_IDLE;
662
}
663
664
nsresult nsInputStreamPump::CreateBufferedStreamIfNeeded() {
665
if (mAsyncStreamIsBuffered) {
666
return NS_OK;
667
}
668
669
// ReadSegments is not available for any nsIAsyncInputStream. In order to use
670
// it, we wrap a nsIBufferedInputStream around it, if needed.
671
672
if (NS_InputStreamIsBuffered(mAsyncStream)) {
673
mAsyncStreamIsBuffered = true;
674
return NS_OK;
675
}
676
677
nsCOMPtr<nsIInputStream> stream;
678
nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(stream),
679
mAsyncStream.forget(), 4096);
680
NS_ENSURE_SUCCESS(rv, rv);
681
682
// A buffered inputStream must implement nsIAsyncInputStream.
683
mAsyncStream = do_QueryInterface(stream);
684
MOZ_DIAGNOSTIC_ASSERT(mAsyncStream);
685
mAsyncStreamIsBuffered = true;
686
687
return NS_OK;
688
}
689
690
//-----------------------------------------------------------------------------
691
// nsIThreadRetargetableRequest
692
//-----------------------------------------------------------------------------
693
694
NS_IMETHODIMP
695
nsInputStreamPump::RetargetDeliveryTo(nsIEventTarget* aNewTarget) {
696
RecursiveMutexAutoLock lock(mMutex);
697
698
NS_ENSURE_ARG(aNewTarget);
699
NS_ENSURE_TRUE(mState == STATE_START || mState == STATE_TRANSFER,
700
NS_ERROR_UNEXPECTED);
701
702
// If canceled, do not retarget. Return with canceled status.
703
if (NS_FAILED(mStatus)) {
704
return mStatus;
705
}
706
707
if (aNewTarget == mTargetThread) {
708
NS_WARNING("Retargeting delivery to same thread");
709
return NS_OK;
710
}
711
712
// Ensure that |mListener| and any subsequent listeners can be retargeted
713
// to another thread.
714
nsresult rv = NS_OK;
715
nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener =
716
do_QueryInterface(mListener, &rv);
717
if (NS_SUCCEEDED(rv) && retargetableListener) {
718
rv = retargetableListener->CheckListenerChain();
719
if (NS_SUCCEEDED(rv)) {
720
mTargetThread = aNewTarget;
721
mRetargeting = true;
722
}
723
}
724
LOG(
725
("nsInputStreamPump::RetargetDeliveryTo [this=%p aNewTarget=%p] "
726
"%s listener [%p] rv[%" PRIx32 "]",
727
this, aNewTarget, (mTargetThread == aNewTarget ? "success" : "failure"),
728
(nsIStreamListener*)mListener, static_cast<uint32_t>(rv)));
729
return rv;
730
}
731
732
NS_IMETHODIMP
733
nsInputStreamPump::GetDeliveryTarget(nsIEventTarget** aNewTarget) {
734
RecursiveMutexAutoLock lock(mMutex);
735
736
nsCOMPtr<nsIEventTarget> target = mTargetThread;
737
target.forget(aNewTarget);
738
return NS_OK;
739
}