Source code

Revision control

Other Tools

1
/* This Source Code Form is subject to the terms of the Mozilla Public
2
* License, v. 2.0. If a copy of the MPL was not distributed with this
3
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5
#include "nsStreamTransportService.h"
6
#include "nsXPCOMCIDInternal.h"
7
#include "nsNetSegmentUtils.h"
8
#include "nsTransportUtils.h"
9
#include "nsStreamUtils.h"
10
#include "nsError.h"
11
#include "nsNetCID.h"
12
13
#include "nsIAsyncInputStream.h"
14
#include "nsIAsyncOutputStream.h"
15
#include "nsISeekableStream.h"
16
#include "nsIPipe.h"
17
#include "nsITransport.h"
18
#include "nsIObserverService.h"
19
#include "nsThreadPool.h"
20
#include "mozilla/Services.h"
21
22
namespace mozilla {
23
namespace net {
24
25
//-----------------------------------------------------------------------------
26
// nsInputStreamTransport
27
//
28
// Implements nsIInputStream as a wrapper around the real input stream. This
29
// allows the transport to support seeking, range-limiting, progress reporting,
30
// and close-when-done semantics while utilizing NS_AsyncCopy.
31
//-----------------------------------------------------------------------------
32
33
class nsInputStreamTransport : public nsITransport, public nsIInputStream {
34
public:
35
// Record refcount changes to ensure that stream transports are destroyed
36
// on consistent threads when recording/replaying.
37
NS_DECL_THREADSAFE_ISUPPORTS_WITH_RECORDING(recordreplay::Behavior::Preserve)
38
NS_DECL_NSITRANSPORT
39
NS_DECL_NSIINPUTSTREAM
40
41
nsInputStreamTransport(nsIInputStream* source, bool closeWhenDone)
42
: mSource(source),
43
mOffset(0),
44
mCloseWhenDone(closeWhenDone),
45
mInProgress(false) {}
46
47
private:
48
virtual ~nsInputStreamTransport() = default;
49
50
nsCOMPtr<nsIAsyncInputStream> mPipeIn;
51
52
// while the copy is active, these members may only be accessed from the
53
// nsIInputStream implementation.
54
nsCOMPtr<nsITransportEventSink> mEventSink;
55
nsCOMPtr<nsIInputStream> mSource;
56
int64_t mOffset;
57
bool mCloseWhenDone;
58
59
// this variable serves as a lock to prevent the state of the transport
60
// from being modified once the copy is in progress.
61
bool mInProgress;
62
};
63
64
NS_IMPL_ISUPPORTS(nsInputStreamTransport, nsITransport, nsIInputStream)
65
66
/** nsITransport **/
67
68
NS_IMETHODIMP
69
nsInputStreamTransport::OpenInputStream(uint32_t flags, uint32_t segsize,
70
uint32_t segcount,
71
nsIInputStream** result) {
72
NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
73
74
nsresult rv;
75
nsCOMPtr<nsIEventTarget> target =
76
do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
77
if (NS_FAILED(rv)) return rv;
78
79
// XXX if the caller requests an unbuffered stream, then perhaps
80
// we'd want to simply return mSource; however, then we would
81
// not be reading mSource on a background thread. is this ok?
82
83
bool nonblocking = !(flags & OPEN_BLOCKING);
84
85
net_ResolveSegmentParams(segsize, segcount);
86
87
nsCOMPtr<nsIAsyncOutputStream> pipeOut;
88
rv = NS_NewPipe2(getter_AddRefs(mPipeIn), getter_AddRefs(pipeOut),
89
nonblocking, true, segsize, segcount);
90
if (NS_FAILED(rv)) return rv;
91
92
mInProgress = true;
93
94
// startup async copy process...
95
rv = NS_AsyncCopy(this, pipeOut, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS,
96
segsize);
97
if (NS_SUCCEEDED(rv)) NS_ADDREF(*result = mPipeIn);
98
99
return rv;
100
}
101
102
NS_IMETHODIMP
103
nsInputStreamTransport::OpenOutputStream(uint32_t flags, uint32_t segsize,
104
uint32_t segcount,
105
nsIOutputStream** result) {
106
// this transport only supports reading!
107
MOZ_ASSERT_UNREACHABLE("nsInputStreamTransport::OpenOutputStream");
108
return NS_ERROR_UNEXPECTED;
109
}
110
111
NS_IMETHODIMP
112
nsInputStreamTransport::Close(nsresult reason) {
113
if (NS_SUCCEEDED(reason)) reason = NS_BASE_STREAM_CLOSED;
114
115
return mPipeIn->CloseWithStatus(reason);
116
}
117
118
NS_IMETHODIMP
119
nsInputStreamTransport::SetEventSink(nsITransportEventSink* sink,
120
nsIEventTarget* target) {
121
NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
122
123
if (target)
124
return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink), sink,
125
target);
126
127
mEventSink = sink;
128
return NS_OK;
129
}
130
131
/** nsIInputStream **/
132
133
NS_IMETHODIMP
134
nsInputStreamTransport::Close() {
135
if (mCloseWhenDone) mSource->Close();
136
137
// make additional reads return early...
138
mOffset = 0;
139
return NS_OK;
140
}
141
142
NS_IMETHODIMP
143
nsInputStreamTransport::Available(uint64_t* result) {
144
return NS_ERROR_NOT_IMPLEMENTED;
145
}
146
147
NS_IMETHODIMP
148
nsInputStreamTransport::Read(char* buf, uint32_t count, uint32_t* result) {
149
nsresult rv = mSource->Read(buf, count, result);
150
151
if (NS_SUCCEEDED(rv)) {
152
mOffset += *result;
153
if (mEventSink)
154
mEventSink->OnTransportStatus(this, NS_NET_STATUS_READING, mOffset, -1);
155
}
156
return rv;
157
}
158
159
NS_IMETHODIMP
160
nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer, void* closure,
161
uint32_t count, uint32_t* result) {
162
return NS_ERROR_NOT_IMPLEMENTED;
163
}
164
165
NS_IMETHODIMP
166
nsInputStreamTransport::IsNonBlocking(bool* result) {
167
*result = false;
168
return NS_OK;
169
}
170
171
//-----------------------------------------------------------------------------
172
// nsStreamTransportService
173
//-----------------------------------------------------------------------------
174
175
nsStreamTransportService::~nsStreamTransportService() {
176
NS_ASSERTION(!mPool, "thread pool wasn't shutdown");
177
}
178
179
nsresult nsStreamTransportService::Init() {
180
mPool = new nsThreadPool();
181
182
// Configure the pool
183
mPool->SetName(NS_LITERAL_CSTRING("StreamTrans"));
184
mPool->SetThreadLimit(25);
185
mPool->SetIdleThreadLimit(5);
186
mPool->SetIdleThreadTimeoutRegressive(true);
187
mPool->SetIdleThreadTimeout(PR_SecondsToInterval(30));
188
189
nsCOMPtr<nsIObserverService> obsSvc = mozilla::services::GetObserverService();
190
if (obsSvc) obsSvc->AddObserver(this, "xpcom-shutdown-threads", false);
191
return NS_OK;
192
}
193
194
NS_IMPL_ISUPPORTS(nsStreamTransportService, nsIStreamTransportService,
195
nsIEventTarget, nsIObserver)
196
197
NS_IMETHODIMP
198
nsStreamTransportService::DispatchFromScript(nsIRunnable* task,
199
uint32_t flags) {
200
nsCOMPtr<nsIRunnable> event(task);
201
return Dispatch(event.forget(), flags);
202
}
203
204
NS_IMETHODIMP
205
nsStreamTransportService::Dispatch(already_AddRefed<nsIRunnable> task,
206
uint32_t flags) {
207
nsCOMPtr<nsIRunnable> event(task); // so it gets released on failure paths
208
nsCOMPtr<nsIThreadPool> pool;
209
{
210
mozilla::MutexAutoLock lock(mShutdownLock);
211
if (mIsShutdown) {
212
return NS_ERROR_NOT_INITIALIZED;
213
}
214
pool = mPool;
215
}
216
NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
217
return pool->Dispatch(event.forget(), flags);
218
}
219
220
NS_IMETHODIMP
221
nsStreamTransportService::DelayedDispatch(already_AddRefed<nsIRunnable>,
222
uint32_t) {
223
return NS_ERROR_NOT_IMPLEMENTED;
224
}
225
226
NS_IMETHODIMP_(bool)
227
nsStreamTransportService::IsOnCurrentThreadInfallible() {
228
nsCOMPtr<nsIThreadPool> pool;
229
{
230
mozilla::MutexAutoLock lock(mShutdownLock);
231
pool = mPool;
232
}
233
if (!pool) {
234
return false;
235
}
236
return pool->IsOnCurrentThread();
237
}
238
239
NS_IMETHODIMP
240
nsStreamTransportService::IsOnCurrentThread(bool* result) {
241
nsCOMPtr<nsIThreadPool> pool;
242
{
243
mozilla::MutexAutoLock lock(mShutdownLock);
244
if (mIsShutdown) {
245
return NS_ERROR_NOT_INITIALIZED;
246
}
247
pool = mPool;
248
}
249
NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
250
return pool->IsOnCurrentThread(result);
251
}
252
253
NS_IMETHODIMP
254
nsStreamTransportService::CreateInputTransport(nsIInputStream* stream,
255
bool closeWhenDone,
256
nsITransport** result) {
257
nsInputStreamTransport* trans =
258
new nsInputStreamTransport(stream, closeWhenDone);
259
if (!trans) return NS_ERROR_OUT_OF_MEMORY;
260
NS_ADDREF(*result = trans);
261
return NS_OK;
262
}
263
264
NS_IMETHODIMP
265
nsStreamTransportService::Observe(nsISupports* subject, const char* topic,
266
const char16_t* data) {
267
NS_ASSERTION(strcmp(topic, "xpcom-shutdown-threads") == 0, "oops");
268
269
{
270
mozilla::MutexAutoLock lock(mShutdownLock);
271
mIsShutdown = true;
272
}
273
274
if (mPool) {
275
mPool->Shutdown();
276
mPool = nullptr;
277
}
278
return NS_OK;
279
}
280
281
class AvailableEvent final : public Runnable {
282
public:
283
AvailableEvent(nsIInputStream* stream, nsIInputAvailableCallback* callback)
284
: Runnable("net::AvailableEvent"),
285
mStream(stream),
286
mCallback(callback),
287
mDoingCallback(false),
288
mSize(0),
289
mResultForCallback(NS_OK) {
290
mCallbackTarget = GetCurrentThreadEventTarget();
291
}
292
293
NS_IMETHOD Run() override {
294
if (mDoingCallback) {
295
// pong
296
mCallback->OnInputAvailableComplete(mSize, mResultForCallback);
297
mCallback = nullptr;
298
} else {
299
// ping
300
mResultForCallback = mStream->Available(&mSize);
301
mStream = nullptr;
302
mDoingCallback = true;
303
304
nsCOMPtr<nsIRunnable> event(this); // overly cute
305
mCallbackTarget->Dispatch(event.forget(), NS_DISPATCH_NORMAL);
306
mCallbackTarget = nullptr;
307
}
308
return NS_OK;
309
}
310
311
private:
312
virtual ~AvailableEvent() = default;
313
314
nsCOMPtr<nsIInputStream> mStream;
315
nsCOMPtr<nsIInputAvailableCallback> mCallback;
316
nsCOMPtr<nsIEventTarget> mCallbackTarget;
317
bool mDoingCallback;
318
uint64_t mSize;
319
nsresult mResultForCallback;
320
};
321
322
NS_IMETHODIMP
323
nsStreamTransportService::InputAvailable(nsIInputStream* stream,
324
nsIInputAvailableCallback* callback) {
325
nsCOMPtr<nsIThreadPool> pool;
326
{
327
mozilla::MutexAutoLock lock(mShutdownLock);
328
if (mIsShutdown) {
329
return NS_ERROR_NOT_INITIALIZED;
330
}
331
pool = mPool;
332
}
333
nsCOMPtr<nsIRunnable> event = new AvailableEvent(stream, callback);
334
return pool->Dispatch(event.forget(), NS_DISPATCH_NORMAL);
335
}
336
337
} // namespace net
338
} // namespace mozilla