Source code

Revision control

Other Tools

1
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
/* vim:set ts=2 sw=2 sts=2 et cindent: */
3
/* This Source Code Form is subject to the terms of the Mozilla Public
4
* License, v. 2.0. If a copy of the MPL was not distributed with this
5
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7
#include "EventTokenBucket.h"
8
9
#include "nsICancelable.h"
10
#include "nsIIOService.h"
11
#include "nsNetCID.h"
12
#include "nsNetUtil.h"
13
#include "nsServiceManagerUtils.h"
14
#include "nsSocketTransportService2.h"
15
#ifdef DEBUG
16
# include "MainThreadUtils.h"
17
#endif
18
19
#ifdef XP_WIN
20
# include <windows.h>
21
# include <mmsystem.h>
22
#endif
23
24
namespace mozilla {
25
namespace net {
26
27
////////////////////////////////////////////
28
// EventTokenBucketCancelable
29
////////////////////////////////////////////
30
31
class TokenBucketCancelable : public nsICancelable {
32
public:
33
NS_DECL_THREADSAFE_ISUPPORTS
34
NS_DECL_NSICANCELABLE
35
36
explicit TokenBucketCancelable(class ATokenBucketEvent* event);
37
void Fire();
38
39
private:
40
virtual ~TokenBucketCancelable() = default;
41
42
friend class EventTokenBucket;
43
ATokenBucketEvent* mEvent;
44
};
45
46
NS_IMPL_ISUPPORTS(TokenBucketCancelable, nsICancelable)
47
48
TokenBucketCancelable::TokenBucketCancelable(ATokenBucketEvent* event)
49
: mEvent(event) {}
50
51
NS_IMETHODIMP
52
TokenBucketCancelable::Cancel(nsresult reason) {
53
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
54
mEvent = nullptr;
55
return NS_OK;
56
}
57
58
void TokenBucketCancelable::Fire() {
59
if (!mEvent) return;
60
61
ATokenBucketEvent* event = mEvent;
62
mEvent = nullptr;
63
event->OnTokenBucketAdmitted();
64
}
65
66
////////////////////////////////////////////
67
// EventTokenBucket
68
////////////////////////////////////////////
69
70
NS_IMPL_ISUPPORTS(EventTokenBucket, nsITimerCallback, nsINamed)
71
72
// by default 1hz with no burst
73
EventTokenBucket::EventTokenBucket(uint32_t eventsPerSecond, uint32_t burstSize)
74
: mUnitCost(kUsecPerSec),
75
mMaxCredit(kUsecPerSec),
76
mCredit(kUsecPerSec),
77
mPaused(false),
78
mStopped(false),
79
mTimerArmed(false)
80
#ifdef XP_WIN
81
,
82
mFineGrainTimerInUse(false),
83
mFineGrainResetTimerArmed(false)
84
#endif
85
{
86
mLastUpdate = TimeStamp::Now();
87
88
MOZ_ASSERT(NS_IsMainThread());
89
90
nsresult rv;
91
nsCOMPtr<nsIEventTarget> sts;
92
nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
93
if (NS_SUCCEEDED(rv))
94
sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
95
if (NS_SUCCEEDED(rv)) mTimer = NS_NewTimer(sts);
96
SetRate(eventsPerSecond, burstSize);
97
}
98
99
EventTokenBucket::~EventTokenBucket() {
100
SOCKET_LOG(
101
("EventTokenBucket::dtor %p events=%zu\n", this, mEvents.GetSize()));
102
103
CleanupTimers();
104
105
// Complete any queued events to prevent hangs
106
while (mEvents.GetSize()) {
107
RefPtr<TokenBucketCancelable> cancelable =
108
dont_AddRef(static_cast<TokenBucketCancelable*>(mEvents.PopFront()));
109
cancelable->Fire();
110
}
111
}
112
113
void EventTokenBucket::CleanupTimers() {
114
if (mTimer && mTimerArmed) {
115
mTimer->Cancel();
116
}
117
mTimer = nullptr;
118
mTimerArmed = false;
119
120
#ifdef XP_WIN
121
NormalTimers();
122
if (mFineGrainResetTimer && mFineGrainResetTimerArmed) {
123
mFineGrainResetTimer->Cancel();
124
}
125
mFineGrainResetTimer = nullptr;
126
mFineGrainResetTimerArmed = false;
127
#endif
128
}
129
130
void EventTokenBucket::SetRate(uint32_t eventsPerSecond, uint32_t burstSize) {
131
SOCKET_LOG(("EventTokenBucket::SetRate %p %u %u\n", this, eventsPerSecond,
132
burstSize));
133
134
if (eventsPerSecond > kMaxHz) {
135
eventsPerSecond = kMaxHz;
136
SOCKET_LOG((" eventsPerSecond out of range\n"));
137
}
138
139
if (!eventsPerSecond) {
140
eventsPerSecond = 1;
141
SOCKET_LOG((" eventsPerSecond out of range\n"));
142
}
143
144
mUnitCost = kUsecPerSec / eventsPerSecond;
145
mMaxCredit = mUnitCost * burstSize;
146
if (mMaxCredit > kUsecPerSec * 60 * 15) {
147
SOCKET_LOG((" burstSize out of range\n"));
148
mMaxCredit = kUsecPerSec * 60 * 15;
149
}
150
mCredit = mMaxCredit;
151
mLastUpdate = TimeStamp::Now();
152
}
153
154
void EventTokenBucket::ClearCredits() {
155
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
156
SOCKET_LOG(("EventTokenBucket::ClearCredits %p\n", this));
157
mCredit = 0;
158
}
159
160
uint32_t EventTokenBucket::BurstEventsAvailable() {
161
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
162
return static_cast<uint32_t>(mCredit / mUnitCost);
163
}
164
165
uint32_t EventTokenBucket::QueuedEvents() {
166
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
167
return mEvents.GetSize();
168
}
169
170
void EventTokenBucket::Pause() {
171
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
172
SOCKET_LOG(("EventTokenBucket::Pause %p\n", this));
173
if (mPaused || mStopped) return;
174
175
mPaused = true;
176
if (mTimerArmed) {
177
mTimer->Cancel();
178
mTimerArmed = false;
179
}
180
}
181
182
void EventTokenBucket::UnPause() {
183
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
184
SOCKET_LOG(("EventTokenBucket::UnPause %p\n", this));
185
if (!mPaused || mStopped) return;
186
187
mPaused = false;
188
DispatchEvents();
189
UpdateTimer();
190
}
191
192
void EventTokenBucket::Stop() {
193
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
194
SOCKET_LOG(("EventTokenBucket::Stop %p armed=%d\n", this, mTimerArmed));
195
mStopped = true;
196
CleanupTimers();
197
198
// Complete any queued events to prevent hangs
199
while (mEvents.GetSize()) {
200
RefPtr<TokenBucketCancelable> cancelable =
201
dont_AddRef(static_cast<TokenBucketCancelable*>(mEvents.PopFront()));
202
cancelable->Fire();
203
}
204
}
205
206
nsresult EventTokenBucket::SubmitEvent(ATokenBucketEvent* event,
207
nsICancelable** cancelable) {
208
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
209
SOCKET_LOG(("EventTokenBucket::SubmitEvent %p\n", this));
210
211
if (mStopped || !mTimer) return NS_ERROR_FAILURE;
212
213
UpdateCredits();
214
215
RefPtr<TokenBucketCancelable> cancelEvent = new TokenBucketCancelable(event);
216
// When this function exits the cancelEvent needs 2 references, one for the
217
// mEvents queue and one for the caller of SubmitEvent()
218
219
NS_ADDREF(*cancelable = cancelEvent.get());
220
221
if (mPaused || !TryImmediateDispatch(cancelEvent.get())) {
222
// queue it
223
SOCKET_LOG((" queued\n"));
224
mEvents.Push(cancelEvent.forget().take());
225
UpdateTimer();
226
} else {
227
SOCKET_LOG((" dispatched synchronously\n"));
228
}
229
230
return NS_OK;
231
}
232
233
bool EventTokenBucket::TryImmediateDispatch(TokenBucketCancelable* cancelable) {
234
if (mCredit < mUnitCost) return false;
235
236
mCredit -= mUnitCost;
237
cancelable->Fire();
238
return true;
239
}
240
241
void EventTokenBucket::DispatchEvents() {
242
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
243
SOCKET_LOG(("EventTokenBucket::DispatchEvents %p %d\n", this, mPaused));
244
if (mPaused || mStopped) return;
245
246
while (mEvents.GetSize() && mUnitCost <= mCredit) {
247
RefPtr<TokenBucketCancelable> cancelable =
248
dont_AddRef(static_cast<TokenBucketCancelable*>(mEvents.PopFront()));
249
if (cancelable->mEvent) {
250
SOCKET_LOG(
251
("EventTokenBucket::DispachEvents [%p] "
252
"Dispatching queue token bucket event cost=%" PRIu64
253
" credit=%" PRIu64 "\n",
254
this, mUnitCost, mCredit));
255
mCredit -= mUnitCost;
256
cancelable->Fire();
257
}
258
}
259
260
#ifdef XP_WIN
261
if (!mEvents.GetSize()) WantNormalTimers();
262
#endif
263
}
264
265
void EventTokenBucket::UpdateTimer() {
266
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
267
if (mTimerArmed || mPaused || mStopped || !mEvents.GetSize() || !mTimer)
268
return;
269
270
if (mCredit >= mUnitCost) return;
271
272
// determine the time needed to wait to accumulate enough credits to admit
273
// one more event and set the timer for that point. Always round it
274
// up because firing early doesn't help.
275
//
276
uint64_t deficit = mUnitCost - mCredit;
277
uint64_t msecWait = (deficit + (kUsecPerMsec - 1)) / kUsecPerMsec;
278
279
if (msecWait < 4) // minimum wait
280
msecWait = 4;
281
else if (msecWait > 60000) // maximum wait
282
msecWait = 60000;
283
284
#ifdef XP_WIN
285
FineGrainTimers();
286
#endif
287
288
SOCKET_LOG(
289
("EventTokenBucket::UpdateTimer %p for %" PRIu64 "ms\n", this, msecWait));
290
nsresult rv = mTimer->InitWithCallback(this, static_cast<uint32_t>(msecWait),
291
nsITimer::TYPE_ONE_SHOT);
292
mTimerArmed = NS_SUCCEEDED(rv);
293
}
294
295
NS_IMETHODIMP
296
EventTokenBucket::Notify(nsITimer* timer) {
297
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
298
299
#ifdef XP_WIN
300
if (timer == mFineGrainResetTimer) {
301
FineGrainResetTimerNotify();
302
return NS_OK;
303
}
304
#endif
305
306
SOCKET_LOG(("EventTokenBucket::Notify() %p\n", this));
307
mTimerArmed = false;
308
if (mStopped) return NS_OK;
309
310
UpdateCredits();
311
DispatchEvents();
312
UpdateTimer();
313
314
return NS_OK;
315
}
316
317
NS_IMETHODIMP
318
EventTokenBucket::GetName(nsACString& aName) {
319
aName.AssignLiteral("EventTokenBucket");
320
return NS_OK;
321
}
322
323
void EventTokenBucket::UpdateCredits() {
324
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
325
326
TimeStamp now = TimeStamp::Now();
327
TimeDuration elapsed = now - mLastUpdate;
328
mLastUpdate = now;
329
330
mCredit += static_cast<uint64_t>(elapsed.ToMicroseconds());
331
if (mCredit > mMaxCredit) mCredit = mMaxCredit;
332
SOCKET_LOG(("EventTokenBucket::UpdateCredits %p to %" PRIu64 " (%" PRIu64
333
" each.. %3.2f)\n",
334
this, mCredit, mUnitCost, (double)mCredit / mUnitCost));
335
}
336
337
#ifdef XP_WIN
338
void EventTokenBucket::FineGrainTimers() {
339
SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p mFineGrainTimerInUse=%d\n",
340
this, mFineGrainTimerInUse));
341
342
mLastFineGrainTimerUse = TimeStamp::Now();
343
344
if (mFineGrainTimerInUse) return;
345
346
if (mUnitCost > kCostFineGrainThreshold) return;
347
348
SOCKET_LOG(
349
("EventTokenBucket::FineGrainTimers %p timeBeginPeriod()\n", this));
350
351
mFineGrainTimerInUse = true;
352
timeBeginPeriod(1);
353
}
354
355
void EventTokenBucket::NormalTimers() {
356
if (!mFineGrainTimerInUse) return;
357
mFineGrainTimerInUse = false;
358
359
SOCKET_LOG(("EventTokenBucket::NormalTimers %p timeEndPeriod()\n", this));
360
timeEndPeriod(1);
361
}
362
363
void EventTokenBucket::WantNormalTimers() {
364
if (!mFineGrainTimerInUse) return;
365
if (mFineGrainResetTimerArmed) return;
366
367
TimeDuration elapsed(TimeStamp::Now() - mLastFineGrainTimerUse);
368
static const TimeDuration fiveSeconds = TimeDuration::FromSeconds(5);
369
370
if (elapsed >= fiveSeconds) {
371
NormalTimers();
372
return;
373
}
374
375
if (!mFineGrainResetTimer) mFineGrainResetTimer = NS_NewTimer();
376
377
// if we can't delay the reset, just do it now
378
if (!mFineGrainResetTimer) {
379
NormalTimers();
380
return;
381
}
382
383
// pad the callback out 100ms to avoid having to round trip this again if the
384
// timer calls back just a tad early.
385
SOCKET_LOG(
386
("EventTokenBucket::WantNormalTimers %p "
387
"Will reset timer granularity after delay",
388
this));
389
390
mFineGrainResetTimer->InitWithCallback(
391
this,
392
static_cast<uint32_t>((fiveSeconds - elapsed).ToMilliseconds()) + 100,
393
nsITimer::TYPE_ONE_SHOT);
394
mFineGrainResetTimerArmed = true;
395
}
396
397
void EventTokenBucket::FineGrainResetTimerNotify() {
398
SOCKET_LOG(("EventTokenBucket::FineGrainResetTimerNotify() events = %d\n",
399
this, mEvents.GetSize()));
400
mFineGrainResetTimerArmed = false;
401
402
// If we are currently processing events then wait for the queue to drain
403
// before trying to reset back to normal timers again
404
if (!mEvents.GetSize()) WantNormalTimers();
405
}
406
407
#endif
408
409
} // namespace net
410
} // namespace mozilla