Source code

Revision control

Other Tools

1
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
* License, v. 2.0. If a copy of the MPL was not distributed with this
5
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include "ThrottleQueue.h"
8
#include "nsISeekableStream.h"
9
#include "nsIAsyncInputStream.h"
10
#include "nsStreamUtils.h"
11
#include "nsNetUtil.h"
12
13
namespace mozilla {
14
namespace net {
15
16
//-----------------------------------------------------------------------------
17
18
class ThrottleInputStream final : public nsIAsyncInputStream,
19
public nsISeekableStream {
20
public:
21
ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue);
22
23
NS_DECL_THREADSAFE_ISUPPORTS
24
NS_DECL_NSIINPUTSTREAM
25
NS_DECL_NSISEEKABLESTREAM
26
NS_DECL_NSITELLABLESTREAM
27
NS_DECL_NSIASYNCINPUTSTREAM
28
29
void AllowInput();
30
31
private:
32
~ThrottleInputStream();
33
34
nsCOMPtr<nsIInputStream> mStream;
35
RefPtr<ThrottleQueue> mQueue;
36
nsresult mClosedStatus;
37
38
nsCOMPtr<nsIInputStreamCallback> mCallback;
39
nsCOMPtr<nsIEventTarget> mEventTarget;
40
};
41
42
// Interface list for ThrottleInputStream's nsISupports implementation.
NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream,
                  nsITellableStream, nsISeekableStream)
44
45
// Wraps aStream so that reads are rationed by aQueue.  The stream starts out
// open (mClosedStatus == NS_OK).  A null queue is a programming error.
ThrottleInputStream::ThrottleInputStream(nsIInputStream* aStream,
                                         ThrottleQueue* aQueue)
    : mStream(aStream), mQueue(aQueue), mClosedStatus(NS_OK) {
  MOZ_ASSERT(aQueue);
}
50
51
// Closing on destruction detaches the stream from its queue and closes the
// wrapped stream; Close() is a no-op if the stream was already closed.
ThrottleInputStream::~ThrottleInputStream() {
  Close();
}
52
53
// Closes the stream.
//
// If the stream is already closed, the stored close status is returned and
// nothing else happens.  Otherwise the stream is removed from its queue, the
// queue reference is dropped, the stream is marked closed, and the result of
// closing the wrapped stream is returned.
NS_IMETHODIMP
ThrottleInputStream::Close() {
  if (NS_SUCCEEDED(mClosedStatus)) {
    if (mQueue) {
      mQueue->DequeueStream(this);
      mQueue = nullptr;
      mClosedStatus = NS_BASE_STREAM_CLOSED;
    }
    return mStream->Close();
  }

  // Already closed: report the recorded status.
  return mClosedStatus;
}
66
67
// Reports how many bytes the wrapped stream has available, or the stored
// close status if this stream has been closed.  Throttling is not taken into
// account here.
NS_IMETHODIMP
ThrottleInputStream::Available(uint64_t* aResult) {
  return NS_FAILED(mClosedStatus) ? mClosedStatus
                                  : mStream->Available(aResult);
}
75
76
// Reads at most aCount bytes into aBuf, limited by what the throttle queue
// currently allows.
//
// Returns the stored close status if the stream is closed, any failure from
// the queue, NS_BASE_STREAM_WOULD_BLOCK when the queue allows no bytes right
// now, and otherwise the result of reading from the wrapped stream.  Bytes
// actually read are reported back to the queue.
NS_IMETHODIMP
ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) {
  if (NS_FAILED(mClosedStatus)) {
    return mClosedStatus;
  }

  uint32_t realCount;
  nsresult rv = mQueue->Available(aCount, &realCount);
  if (NS_FAILED(rv)) {
    return rv;
  }

  if (realCount == 0) {
    return NS_BASE_STREAM_WOULD_BLOCK;
  }

  // The queue's answer is a bandwidth allowance, not a buffer size: never ask
  // the wrapped stream for more than the caller's buffer can hold.
  if (realCount > aCount) {
    realCount = aCount;
  }

  rv = mStream->Read(aBuf, realCount, aResult);
  if (NS_SUCCEEDED(rv) && *aResult > 0) {
    mQueue->RecordRead(*aResult);
  }
  return rv;
}
98
99
// Segment-based variant of Read(): hands at most aCount bytes to aWriter,
// limited by what the throttle queue currently allows.
//
// Returns the stored close status if the stream is closed, any failure from
// the queue, NS_BASE_STREAM_WOULD_BLOCK when the queue allows no bytes right
// now, and otherwise the result of the wrapped stream's ReadSegments().
// Bytes actually consumed are reported back to the queue.
NS_IMETHODIMP
ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
                                  uint32_t aCount, uint32_t* aResult) {
  if (NS_FAILED(mClosedStatus)) {
    return mClosedStatus;
  }

  uint32_t realCount;
  nsresult rv = mQueue->Available(aCount, &realCount);
  if (NS_FAILED(rv)) {
    return rv;
  }

  if (realCount == 0) {
    return NS_BASE_STREAM_WOULD_BLOCK;
  }

  // The queue's answer is a bandwidth allowance; the caller's aCount remains
  // the hard upper bound on how much may be delivered to aWriter.
  if (realCount > aCount) {
    realCount = aCount;
  }

  rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult);
  if (NS_SUCCEEDED(rv) && *aResult > 0) {
    mQueue->RecordRead(*aResult);
  }
  return rv;
}
122
123
// This stream always reports itself as non-blocking: a throttled read
// returns NS_BASE_STREAM_WOULD_BLOCK instead of waiting.
NS_IMETHODIMP
ThrottleInputStream::IsNonBlocking(bool* aNonBlocking) {
  *aNonBlocking = true;
  return NS_OK;
}
128
129
// Forwards the seek to the wrapped stream.
//
// Returns the stored close status if this stream is closed, and
// NS_ERROR_FAILURE if the wrapped stream does not implement
// nsISeekableStream.
NS_IMETHODIMP
ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset) {
  if (NS_FAILED(mClosedStatus)) {
    return mClosedStatus;
  }

  nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream);
  return seekable ? seekable->Seek(aWhence, aOffset) : NS_ERROR_FAILURE;
}
142
143
// Forwards the position query to the wrapped stream.
//
// Returns the stored close status if this stream is closed, and
// NS_ERROR_FAILURE if the wrapped stream does not implement
// nsITellableStream.
NS_IMETHODIMP
ThrottleInputStream::Tell(int64_t* aResult) {
  if (NS_FAILED(mClosedStatus)) {
    return mClosedStatus;
  }

  nsCOMPtr<nsITellableStream> tellable = do_QueryInterface(mStream);
  return tellable ? tellable->Tell(aResult) : NS_ERROR_FAILURE;
}
156
157
// Forwards SetEOF() to the wrapped stream.
//
// Returns the stored close status if this stream is closed, and
// NS_ERROR_FAILURE if the wrapped stream does not implement
// nsISeekableStream.
NS_IMETHODIMP
ThrottleInputStream::SetEOF() {
  if (NS_FAILED(mClosedStatus)) {
    return mClosedStatus;
  }

  nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream);
  return seekable ? seekable->SetEOF() : NS_ERROR_FAILURE;
}
170
171
// Closes the stream and records aStatus as the reason.
//
// A success code in aStatus is replaced by NS_BASE_STREAM_CLOSED.  If
// Close() itself fails, its failure code is kept as the closed status
// instead.  Calling this on an already closed stream does nothing.  Always
// returns NS_OK.
NS_IMETHODIMP
ThrottleInputStream::CloseWithStatus(nsresult aStatus) {
  if (NS_FAILED(mClosedStatus)) {
    // Already closed, ignore.
    return NS_OK;
  }

  const nsresult reason =
      NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus;

  mClosedStatus = Close();
  if (NS_SUCCEEDED(mClosedStatus)) {
    mClosedStatus = reason;
  }
  return NS_OK;
}
187
188
// Registers (or, with a null aCallback, cancels) a request to be notified
// when the throttle queue next lets readers through.
//
// aFlags must be 0; any other value yields NS_ERROR_ILLEGAL_VALUE.
// aRequestedCount is not used.  On a closed stream the stored close status
// is returned and nothing is registered.
NS_IMETHODIMP
ThrottleInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
                               uint32_t aFlags, uint32_t aRequestedCount,
                               nsIEventTarget* aEventTarget) {
  if (aFlags != 0) {
    return NS_ERROR_ILLEGAL_VALUE;
  }

  // Close() drops mQueue, so there is nothing to queue on (and mQueue must
  // not be dereferenced) once the stream has been closed.
  if (NS_FAILED(mClosedStatus)) {
    return mClosedStatus;
  }

  mCallback = aCallback;
  mEventTarget = aEventTarget;
  if (mCallback) {
    mQueue->QueueStream(this);
  } else {
    mQueue->DequeueStream(this);
  }
  return NS_OK;
}
205
206
void ThrottleInputStream::AllowInput() {
207
MOZ_ASSERT(mCallback);
208
nsCOMPtr<nsIInputStreamCallback> callbackEvent = NS_NewInputStreamReadyEvent(
209
"ThrottleInputStream::AllowInput", mCallback, mEventTarget);
210
mCallback = nullptr;
211
mEventTarget = nullptr;
212
callbackEvent->OnInputStreamReady(this);
213
}
214
215
//-----------------------------------------------------------------------------
216
217
// Interface list for ThrottleQueue's nsISupports implementation.
NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback,
                  nsINamed)
219
220
// Creates an unconfigured queue (both rates are 0 until Init() is called)
// and tries to create the timer used to wake queued readers.  The timer
// targets the socket transport service; if either service lookup fails,
// mTimer is left unset.
ThrottleQueue::ThrottleQueue()
    : mMeanBytesPerSecond(0),
      mMaxBytesPerSecond(0),
      mBytesProcessed(0),
      mTimerArmed(false) {
  nsresult rv;

  // The IO service is looked up before the socket transport service.
  nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
  if (NS_FAILED(rv)) {
    return;
  }

  nsCOMPtr<nsIEventTarget> socketTarget =
      do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
  if (NS_FAILED(rv)) {
    return;
  }

  mTimer = NS_NewTimer(socketTarget);
}
232
233
// Cancels the wake-up timer if one is currently armed, then releases it.
ThrottleQueue::~ThrottleQueue() {
  const bool timerPending = mTimer && mTimerArmed;
  if (timerPending) {
    mTimer->Cancel();
  }
  mTimer = nullptr;
}
239
240
// Records that aBytesRead bytes were just consumed: a timestamped entry is
// appended to the recent-read history and the running byte total grows by
// the same amount.  Must be called on the socket thread.  Always returns
// NS_OK.
NS_IMETHODIMP
ThrottleQueue::RecordRead(uint32_t aBytesRead) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  ThrottleEntry readEvent;
  readEvent.mTime = TimeStamp::Now();
  readEvent.mBytesRead = aBytesRead;
  mReadEvents.AppendElement(readEvent);

  mBytesProcessed += aBytesRead;
  return NS_OK;
}
250
251
// Computes how many bytes a reader may consume right now.
//
// aRemaining is the most the caller is able to consume; the result written
// to *aAvailable never exceeds it.  Read events older than one second are
// discarded, and the bytes read within the last second are compared against
// a randomly chosen budget for this slice.  The budget is drawn from
// [mean - spread, mean + spread], where spread = max - mean, and is floored
// at 0.  If the recent reads already meet the budget, *aAvailable is 0.
//
// Must be called on the socket thread.  Always returns NS_OK.
NS_IMETHODIMP
ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  TimeStamp now = TimeStamp::Now();
  TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1);
  size_t i;

  // Remove all stale events.
  for (i = 0; i < mReadEvents.Length(); ++i) {
    if (mReadEvents[i].mTime >= oneSecondAgo) {
      break;
    }
  }
  mReadEvents.RemoveElementsAt(0, i);

  // Sum in 64 bits so that many large reads within one second cannot wrap.
  uint64_t totalBytes = 0;
  for (i = 0; i < mReadEvents.Length(); ++i) {
    totalBytes += mReadEvents[i].mBytesRead;
  }

  // All budget arithmetic is done in 64 bits.  Init() permits max > 2 * mean,
  // in which case "mean - spread" is negative; computing it in unsigned
  // 32-bit arithmetic would wrap around to an enormous budget.
  const uint64_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond;
  const double prob = static_cast<double>(rand()) / RAND_MAX;
  const uint64_t jitter =
      static_cast<uint64_t>(2.0 * static_cast<double>(spread) * prob);
  const uint64_t shifted = static_cast<uint64_t>(mMeanBytesPerSecond) + jitter;
  // shifted - spread is at most mean + spread == max, so it fits in 32 bits.
  const uint64_t thisSliceBytes = shifted > spread ? shifted - spread : 0;

  if (totalBytes >= thisSliceBytes) {
    *aAvailable = 0;
  } else {
    // Never report more than the caller asked for: callers pass the result
    // straight through as a read length.
    *aAvailable = static_cast<uint32_t>(
        thisSliceBytes < aRemaining ? thisSliceBytes : aRemaining);
  }
  return NS_OK;
}
283
284
// Configures the mean and maximum throughput, in bytes per second.
//
// Both values must be non-zero and the maximum must not be smaller than the
// mean; otherwise NS_ERROR_ILLEGAL_VALUE is returned and the current
// configuration is left untouched.
NS_IMETHODIMP
ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond) {
  // Can be called on any thread.
  const bool hasZeroRate =
      aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0;
  if (hasZeroRate || aMaxBytesPerSecond < aMeanBytesPerSecond) {
    return NS_ERROR_ILLEGAL_VALUE;
  }

  mMeanBytesPerSecond = aMeanBytesPerSecond;
  mMaxBytesPerSecond = aMaxBytesPerSecond;
  return NS_OK;
}
296
297
// Reports the total number of bytes recorded through RecordRead() so far.
// Always returns NS_OK.
NS_IMETHODIMP
ThrottleQueue::BytesProcessed(uint64_t* aResult) {
  *aResult = mBytesProcessed;
  return NS_OK;
}
302
303
// Creates a ThrottleInputStream around aInputStream that is rationed by this
// queue and returns it through aResult.  Always returns NS_OK.
NS_IMETHODIMP
ThrottleQueue::WrapStream(nsIInputStream* aInputStream,
                          nsIAsyncInputStream** aResult) {
  nsCOMPtr<nsIAsyncInputStream> throttled =
      new ThrottleInputStream(aInputStream, this);
  throttled.forget(aResult);
  return NS_OK;
}
311
312
// Timer callback: wakes every reader that is waiting on this queue.
//
// Must run on the socket thread.  Always returns NS_OK.
NS_IMETHODIMP
ThrottleQueue::Notify(nsITimer* aTimer) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  // The one-shot timer has fired, so it is no longer armed.  This has to be
  // recorded before readers are notified: a reader that requeues itself from
  // within the loop below must find the flag cleared so that QueueStream()
  // arms a fresh timer for it, and that fresh "armed" state must not be
  // overwritten afterwards.
  mTimerArmed = false;

  // A notified reader may need to push itself back on the queue.
  // Swap out the list of readers so that this works properly.
  nsTArray<RefPtr<ThrottleInputStream>> events;
  events.SwapElements(mAsyncEvents);

  // Optimistically notify all the waiting readers, and then let them
  // requeue if there isn't enough bandwidth.
  for (size_t i = 0; i < events.Length(); ++i) {
    events[i]->AllowInput();
  }

  return NS_OK;
}
329
330
// nsINamed: reports the fixed name of this timer callback.
NS_IMETHODIMP
ThrottleQueue::GetName(nsACString& aName) {
  aName.AssignLiteral("net::ThrottleQueue");
  return NS_OK;
}
335
336
// Adds aStream to the list of readers waiting for bandwidth and makes sure a
// wake-up timer is pending.  A stream that is already queued is left alone.
//
// The timer delay defaults to one second.  If there are recorded reads, the
// timer instead fires when the oldest one turns one second old, or after
// 1 ms if that moment has already passed.  Must be called on the socket
// thread.
void ThrottleQueue::QueueStream(ThrottleInputStream* aStream) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  if (mAsyncEvents.IndexOf(aStream) != mAsyncEvents.NoIndex) {
    // Already waiting.
    return;
  }
  mAsyncEvents.AppendElement(aStream);

  if (mTimerArmed) {
    // The pending timer will wake this stream along with the others.
    return;
  }

  uint32_t delayMs = 1000;
  if (mReadEvents.Length() != 0) {
    const TimeStamp oldestExpiry =
        mReadEvents[0].mTime + TimeDuration::FromSeconds(1);
    const TimeStamp now = TimeStamp::Now();

    delayMs = oldestExpiry > now
                  ? static_cast<uint32_t>((oldestExpiry - now).ToMilliseconds())
                  : 1;
  }

  // NOTE(review): the constructor leaves mTimer unset when service lookup
  // fails; this call assumes the timer exists -- confirm that is guaranteed.
  const nsresult rv =
      mTimer->InitWithCallback(this, delayMs, nsITimer::TYPE_ONE_SHOT);
  if (NS_SUCCEEDED(rv)) {
    mTimerArmed = true;
  }
}
361
362
// Removes aStream from the list of waiting readers, if it is there.  A
// pending wake-up timer is left as it is.  Must be called on the socket
// thread.
void ThrottleQueue::DequeueStream(ThrottleInputStream* aStream) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  mAsyncEvents.RemoveElement(aStream);
}
366
367
} // namespace net
368
} // namespace mozilla