Source code

Revision control

Other Tools

1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
* License, v. 2.0. If a copy of the MPL was not distributed with this
3
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5
#include "nsAsyncStreamCopier.h"
6
#include "nsIOService.h"
7
#include "nsIEventTarget.h"
8
#include "nsStreamUtils.h"
9
#include "nsThreadUtils.h"
10
#include "nsNetUtil.h"
11
#include "nsNetCID.h"
12
#include "nsIBufferedStreams.h"
13
#include "nsIRequestObserver.h"
14
#include "mozilla/Logging.h"
15
16
using namespace mozilla;
17
using namespace mozilla::net;
18
19
#undef LOG
20
//
21
// MOZ_LOG=nsStreamCopier:5
22
//
23
static LazyLogModule gStreamCopierLog("nsStreamCopier");
24
#define LOG(args) MOZ_LOG(gStreamCopierLog, mozilla::LogLevel::Debug, args)
25
26
/**
27
* An event used to perform initialization off the main thread.
28
*/
29
class AsyncApplyBufferingPolicyEvent final : public Runnable {
30
public:
31
/**
32
* @param aCopier
33
* The nsAsyncStreamCopier requesting the information.
34
*/
35
explicit AsyncApplyBufferingPolicyEvent(nsAsyncStreamCopier* aCopier)
36
: mozilla::Runnable("AsyncApplyBufferingPolicyEvent"),
37
mCopier(aCopier),
38
mTarget(GetCurrentThreadEventTarget()) {}
39
40
NS_IMETHOD Run() override {
41
nsresult rv = mCopier->ApplyBufferingPolicy();
42
if (NS_FAILED(rv)) {
43
mCopier->Cancel(rv);
44
return NS_OK;
45
}
46
47
rv = mTarget->Dispatch(
48
NewRunnableMethod("nsAsyncStreamCopier::AsyncCopyInternal", mCopier,
49
&nsAsyncStreamCopier::AsyncCopyInternal),
50
NS_DISPATCH_NORMAL);
51
MOZ_ASSERT(NS_SUCCEEDED(rv));
52
53
if (NS_FAILED(rv)) {
54
mCopier->Cancel(rv);
55
}
56
return NS_OK;
57
}
58
59
private:
60
RefPtr<nsAsyncStreamCopier> mCopier;
61
nsCOMPtr<nsIEventTarget> mTarget;
62
};
63
64
//-----------------------------------------------------------------------------
65
66
nsAsyncStreamCopier::nsAsyncStreamCopier()
67
: mLock("nsAsyncStreamCopier.mLock"),
68
mMode(NS_ASYNCCOPY_VIA_READSEGMENTS),
69
mChunkSize(nsIOService::gDefaultSegmentSize),
70
mStatus(NS_OK),
71
mIsPending(false),
72
mCloseSource{false},
73
mCloseSink{false},
74
mShouldSniffBuffering(false) {
75
LOG(("Creating nsAsyncStreamCopier @%p\n", this));
76
}
77
78
nsAsyncStreamCopier::~nsAsyncStreamCopier() {
79
LOG(("Destroying nsAsyncStreamCopier @%p\n", this));
80
}
81
82
bool nsAsyncStreamCopier::IsComplete(nsresult* status) {
83
MutexAutoLock lock(mLock);
84
if (status) *status = mStatus;
85
return !mIsPending;
86
}
87
88
nsIRequest* nsAsyncStreamCopier::AsRequest() {
89
return static_cast<nsIRequest*>(static_cast<nsIAsyncStreamCopier*>(this));
90
}
91
92
void nsAsyncStreamCopier::Complete(nsresult status) {
93
LOG(("nsAsyncStreamCopier::Complete [this=%p status=%" PRIx32 "]\n", this,
94
static_cast<uint32_t>(status)));
95
96
nsCOMPtr<nsIRequestObserver> observer;
97
nsCOMPtr<nsISupports> ctx;
98
{
99
MutexAutoLock lock(mLock);
100
mCopierCtx = nullptr;
101
102
if (mIsPending) {
103
mIsPending = false;
104
mStatus = status;
105
106
// setup OnStopRequest callback and release references...
107
observer = mObserver;
108
mObserver = nullptr;
109
}
110
}
111
112
if (observer) {
113
LOG((" calling OnStopRequest [status=%" PRIx32 "]\n",
114
static_cast<uint32_t>(status)));
115
observer->OnStopRequest(AsRequest(), status);
116
}
117
}
118
119
void nsAsyncStreamCopier::OnAsyncCopyComplete(void* closure, nsresult status) {
120
// AddRef'd in AsyncCopy. Will be released at the end of the method.
121
RefPtr<nsAsyncStreamCopier> self = dont_AddRef((nsAsyncStreamCopier*)closure);
122
self->Complete(status);
123
}
124
125
//-----------------------------------------------------------------------------
126
// nsISupports
127
128
// We cannot use simply NS_IMPL_ISUPPORTSx as both
129
// nsIAsyncStreamCopier and nsIAsyncStreamCopier2 implement nsIRequest
130
131
NS_IMPL_ADDREF(nsAsyncStreamCopier)
132
NS_IMPL_RELEASE(nsAsyncStreamCopier)
133
NS_INTERFACE_TABLE_HEAD(nsAsyncStreamCopier)
134
NS_INTERFACE_TABLE_BEGIN
135
NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier)
136
NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier2)
137
NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsIRequest,
138
nsIAsyncStreamCopier)
139
NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsISupports,
140
nsIAsyncStreamCopier)
141
NS_INTERFACE_TABLE_END
142
NS_INTERFACE_TABLE_TAIL
143
144
//-----------------------------------------------------------------------------
145
// nsIRequest
146
147
NS_IMETHODIMP
148
nsAsyncStreamCopier::GetName(nsACString& name) {
149
name.Truncate();
150
return NS_OK;
151
}
152
153
NS_IMETHODIMP
154
nsAsyncStreamCopier::IsPending(bool* result) {
155
*result = !IsComplete();
156
return NS_OK;
157
}
158
159
NS_IMETHODIMP
160
nsAsyncStreamCopier::GetStatus(nsresult* status) {
161
IsComplete(status);
162
return NS_OK;
163
}
164
165
NS_IMETHODIMP
166
nsAsyncStreamCopier::Cancel(nsresult status) {
167
nsCOMPtr<nsISupports> copierCtx;
168
{
169
MutexAutoLock lock(mLock);
170
if (!mIsPending) return NS_OK;
171
copierCtx.swap(mCopierCtx);
172
}
173
174
if (NS_SUCCEEDED(status)) {
175
NS_WARNING("cancel with non-failure status code");
176
status = NS_BASE_STREAM_CLOSED;
177
}
178
179
if (copierCtx) NS_CancelAsyncCopy(copierCtx, status);
180
181
return NS_OK;
182
}
183
184
NS_IMETHODIMP
185
nsAsyncStreamCopier::Suspend() {
186
MOZ_ASSERT_UNREACHABLE("nsAsyncStreamCopier::Suspend");
187
return NS_ERROR_NOT_IMPLEMENTED;
188
}
189
190
NS_IMETHODIMP
191
nsAsyncStreamCopier::Resume() {
192
MOZ_ASSERT_UNREACHABLE("nsAsyncStreamCopier::Resume");
193
return NS_ERROR_NOT_IMPLEMENTED;
194
}
195
196
NS_IMETHODIMP
197
nsAsyncStreamCopier::GetLoadFlags(nsLoadFlags* aLoadFlags) {
198
*aLoadFlags = LOAD_NORMAL;
199
return NS_OK;
200
}
201
202
NS_IMETHODIMP
203
nsAsyncStreamCopier::SetLoadFlags(nsLoadFlags aLoadFlags) { return NS_OK; }
204
205
NS_IMETHODIMP
206
nsAsyncStreamCopier::GetLoadGroup(nsILoadGroup** aLoadGroup) {
207
*aLoadGroup = nullptr;
208
return NS_OK;
209
}
210
211
NS_IMETHODIMP
212
nsAsyncStreamCopier::SetLoadGroup(nsILoadGroup* aLoadGroup) { return NS_OK; }
213
214
nsresult nsAsyncStreamCopier::InitInternal(nsIInputStream* source,
215
nsIOutputStream* sink,
216
nsIEventTarget* target,
217
uint32_t chunkSize, bool closeSource,
218
bool closeSink) {
219
NS_ASSERTION(!mSource && !mSink, "Init() called more than once");
220
if (chunkSize == 0) {
221
chunkSize = nsIOService::gDefaultSegmentSize;
222
}
223
mChunkSize = chunkSize;
224
225
mSource = source;
226
mSink = sink;
227
mCloseSource = closeSource;
228
mCloseSink = closeSink;
229
230
if (target) {
231
mTarget = target;
232
} else {
233
nsresult rv;
234
mTarget = do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
235
if (NS_FAILED(rv)) {
236
return rv;
237
}
238
}
239
240
return NS_OK;
241
}
242
243
//-----------------------------------------------------------------------------
244
// nsIAsyncStreamCopier
245
246
NS_IMETHODIMP
247
nsAsyncStreamCopier::Init(nsIInputStream* source, nsIOutputStream* sink,
248
nsIEventTarget* target, bool sourceBuffered,
249
bool sinkBuffered, uint32_t chunkSize,
250
bool closeSource, bool closeSink) {
251
NS_ASSERTION(sourceBuffered || sinkBuffered,
252
"at least one stream must be buffered");
253
mMode = sourceBuffered ? NS_ASYNCCOPY_VIA_READSEGMENTS
254
: NS_ASYNCCOPY_VIA_WRITESEGMENTS;
255
256
return InitInternal(source, sink, target, chunkSize, closeSource, closeSink);
257
}
258
259
//-----------------------------------------------------------------------------
260
// nsIAsyncStreamCopier2
261
262
NS_IMETHODIMP
263
nsAsyncStreamCopier::Init(nsIInputStream* source, nsIOutputStream* sink,
264
nsIEventTarget* target, uint32_t chunkSize,
265
bool closeSource, bool closeSink) {
266
mShouldSniffBuffering = true;
267
268
return InitInternal(source, sink, target, chunkSize, closeSource, closeSink);
269
}
270
271
/**
272
* Detect whether the input or the output stream is buffered,
273
* bufferize one of them if neither is buffered.
274
*/
275
nsresult nsAsyncStreamCopier::ApplyBufferingPolicy() {
276
// This function causes I/O, it must not be executed on the main
277
// thread.
278
MOZ_ASSERT(!NS_IsMainThread());
279
280
if (NS_OutputStreamIsBuffered(mSink)) {
281
// Sink is buffered, no need to perform additional buffering
282
mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS;
283
return NS_OK;
284
}
285
if (NS_InputStreamIsBuffered(mSource)) {
286
// Source is buffered, no need to perform additional buffering
287
mMode = NS_ASYNCCOPY_VIA_READSEGMENTS;
288
return NS_OK;
289
}
290
291
// No buffering, let's buffer the sink
292
nsresult rv;
293
nsCOMPtr<nsIBufferedOutputStream> sink =
294
do_CreateInstance(NS_BUFFEREDOUTPUTSTREAM_CONTRACTID, &rv);
295
if (NS_FAILED(rv)) {
296
return rv;
297
}
298
299
rv = sink->Init(mSink, mChunkSize);
300
if (NS_FAILED(rv)) {
301
return rv;
302
}
303
304
mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS;
305
mSink = sink;
306
return NS_OK;
307
}
308
309
//-----------------------------------------------------------------------------
310
// Both nsIAsyncStreamCopier and nsIAsyncStreamCopier2
311
312
NS_IMETHODIMP
313
nsAsyncStreamCopier::AsyncCopy(nsIRequestObserver* observer, nsISupports* ctx) {
314
LOG(("nsAsyncStreamCopier::AsyncCopy [this=%p observer=%p]\n", this,
315
observer));
316
317
NS_ASSERTION(mSource && mSink, "not initialized");
318
nsresult rv;
319
320
if (observer) {
321
// build proxy for observer events
322
rv = NS_NewRequestObserverProxy(getter_AddRefs(mObserver), observer, ctx);
323
if (NS_FAILED(rv)) return rv;
324
}
325
326
// from this point forward, AsyncCopy is going to return NS_OK. any errors
327
// will be reported via OnStopRequest.
328
mIsPending = true;
329
330
if (mObserver) {
331
rv = mObserver->OnStartRequest(AsRequest());
332
if (NS_FAILED(rv)) Cancel(rv);
333
}
334
335
if (!mShouldSniffBuffering) {
336
// No buffer sniffing required, let's proceed
337
AsyncCopyInternal();
338
return NS_OK;
339
}
340
341
if (NS_IsMainThread()) {
342
// Don't perform buffer sniffing on the main thread
343
nsCOMPtr<nsIRunnable> event = new AsyncApplyBufferingPolicyEvent(this);
344
rv = mTarget->Dispatch(event, NS_DISPATCH_NORMAL);
345
if (NS_FAILED(rv)) {
346
Cancel(rv);
347
}
348
return NS_OK;
349
}
350
351
// We're not going to block the main thread, so let's sniff here
352
rv = ApplyBufferingPolicy();
353
if (NS_FAILED(rv)) {
354
Cancel(rv);
355
}
356
AsyncCopyInternal();
357
return NS_OK;
358
}
359
360
// Launch async copy.
361
// All errors are reported through the observer.
362
void nsAsyncStreamCopier::AsyncCopyInternal() {
363
MOZ_ASSERT(mMode == NS_ASYNCCOPY_VIA_READSEGMENTS ||
364
mMode == NS_ASYNCCOPY_VIA_WRITESEGMENTS);
365
366
nsresult rv;
367
// We want to receive progress notifications; release happens in
368
// OnAsyncCopyComplete.
369
RefPtr<nsAsyncStreamCopier> self = this;
370
{
371
MutexAutoLock lock(mLock);
372
rv = NS_AsyncCopy(mSource, mSink, mTarget, mMode, mChunkSize,
373
OnAsyncCopyComplete, this, mCloseSource, mCloseSink,
374
getter_AddRefs(mCopierCtx));
375
}
376
if (NS_FAILED(rv)) {
377
Cancel(rv);
378
return; // release self
379
}
380
381
Unused << self.forget(); // Will be released in OnAsyncCopyComplete
382
}