1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim:set ts=2 sw=2 sts=2 et cindent: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "EventTokenBucket.h"
9 #include "nsICancelable.h"
10 #include "nsIIOService.h"
12 #include "nsNetUtil.h"
13 #include "nsServiceManagerUtils.h"
14 #include "nsSocketTransportService2.h"
16 # include "MainThreadUtils.h"
21 # include <mmsystem.h>
27 ////////////////////////////////////////////
28 // EventTokenBucketCancelable
29 ////////////////////////////////////////////
31 class TokenBucketCancelable
: public nsICancelable
{
33 NS_DECL_THREADSAFE_ISUPPORTS
36 explicit TokenBucketCancelable(class ATokenBucketEvent
* event
);
40 virtual ~TokenBucketCancelable() = default;
42 friend class EventTokenBucket
;
43 ATokenBucketEvent
* mEvent
;
46 NS_IMPL_ISUPPORTS(TokenBucketCancelable
, nsICancelable
)
48 TokenBucketCancelable::TokenBucketCancelable(ATokenBucketEvent
* event
)
52 TokenBucketCancelable::Cancel(nsresult reason
) {
53 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
58 void TokenBucketCancelable::Fire() {
61 ATokenBucketEvent
* event
= mEvent
;
63 event
->OnTokenBucketAdmitted();
66 ////////////////////////////////////////////
68 ////////////////////////////////////////////
70 NS_IMPL_ISUPPORTS(EventTokenBucket
, nsITimerCallback
, nsINamed
)
72 // by default 1hz with no burst
73 EventTokenBucket::EventTokenBucket(uint32_t eventsPerSecond
, uint32_t burstSize
)
74 : mUnitCost(kUsecPerSec
),
75 mMaxCredit(kUsecPerSec
),
82 mFineGrainTimerInUse(false),
83 mFineGrainResetTimerArmed(false)
86 mLastUpdate
= TimeStamp::Now();
88 MOZ_ASSERT(NS_IsMainThread());
91 nsCOMPtr
<nsIEventTarget
> sts
;
92 nsCOMPtr
<nsIIOService
> ioService
= do_GetIOService(&rv
);
93 if (NS_SUCCEEDED(rv
)) {
94 sts
= do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID
, &rv
);
96 if (NS_SUCCEEDED(rv
)) mTimer
= NS_NewTimer(sts
);
97 SetRate(eventsPerSecond
, burstSize
);
100 EventTokenBucket::~EventTokenBucket() {
102 ("EventTokenBucket::dtor %p events=%zu\n", this, mEvents
.GetSize()));
106 // Complete any queued events to prevent hangs
107 while (mEvents
.GetSize()) {
108 RefPtr
<TokenBucketCancelable
> cancelable
= mEvents
.PopFront();
113 void EventTokenBucket::CleanupTimers() {
114 if (mTimer
&& mTimerArmed
) {
122 if (mFineGrainResetTimer
&& mFineGrainResetTimerArmed
) {
123 mFineGrainResetTimer
->Cancel();
125 mFineGrainResetTimer
= nullptr;
126 mFineGrainResetTimerArmed
= false;
130 void EventTokenBucket::SetRate(uint32_t eventsPerSecond
, uint32_t burstSize
) {
131 SOCKET_LOG(("EventTokenBucket::SetRate %p %u %u\n", this, eventsPerSecond
,
134 if (eventsPerSecond
> kMaxHz
) {
135 eventsPerSecond
= kMaxHz
;
136 SOCKET_LOG((" eventsPerSecond out of range\n"));
139 if (!eventsPerSecond
) {
141 SOCKET_LOG((" eventsPerSecond out of range\n"));
144 mUnitCost
= kUsecPerSec
/ eventsPerSecond
;
145 mMaxCredit
= mUnitCost
* burstSize
;
146 if (mMaxCredit
> kUsecPerSec
* 60 * 15) {
147 SOCKET_LOG((" burstSize out of range\n"));
148 mMaxCredit
= kUsecPerSec
* 60 * 15;
150 mCredit
= mMaxCredit
;
151 mLastUpdate
= TimeStamp::Now();
154 void EventTokenBucket::ClearCredits() {
155 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
156 SOCKET_LOG(("EventTokenBucket::ClearCredits %p\n", this));
160 uint32_t EventTokenBucket::BurstEventsAvailable() {
161 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
162 return static_cast<uint32_t>(mCredit
/ mUnitCost
);
165 uint32_t EventTokenBucket::QueuedEvents() {
166 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
167 return mEvents
.GetSize();
170 void EventTokenBucket::Pause() {
171 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
172 SOCKET_LOG(("EventTokenBucket::Pause %p\n", this));
173 if (mPaused
|| mStopped
) return;
182 void EventTokenBucket::UnPause() {
183 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
184 SOCKET_LOG(("EventTokenBucket::UnPause %p\n", this));
185 if (!mPaused
|| mStopped
) return;
192 void EventTokenBucket::Stop() {
193 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
194 SOCKET_LOG(("EventTokenBucket::Stop %p armed=%d\n", this, mTimerArmed
));
198 // Complete any queued events to prevent hangs
199 while (mEvents
.GetSize()) {
200 RefPtr
<TokenBucketCancelable
> cancelable
= mEvents
.PopFront();
205 nsresult
EventTokenBucket::SubmitEvent(ATokenBucketEvent
* event
,
206 nsICancelable
** cancelable
) {
207 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
208 SOCKET_LOG(("EventTokenBucket::SubmitEvent %p\n", this));
210 if (mStopped
|| !mTimer
) return NS_ERROR_FAILURE
;
214 RefPtr
<TokenBucketCancelable
> cancelEvent
= new TokenBucketCancelable(event
);
215 // When this function exits the cancelEvent needs 2 references, one for the
216 // mEvents queue and one for the caller of SubmitEvent()
218 *cancelable
= do_AddRef(cancelEvent
).take();
220 if (mPaused
|| !TryImmediateDispatch(cancelEvent
.get())) {
222 SOCKET_LOG((" queued\n"));
223 mEvents
.Push(cancelEvent
.forget());
226 SOCKET_LOG((" dispatched synchronously\n"));
232 bool EventTokenBucket::TryImmediateDispatch(TokenBucketCancelable
* cancelable
) {
233 if (mCredit
< mUnitCost
) return false;
235 mCredit
-= mUnitCost
;
240 void EventTokenBucket::DispatchEvents() {
241 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
242 SOCKET_LOG(("EventTokenBucket::DispatchEvents %p %d\n", this, mPaused
));
243 if (mPaused
|| mStopped
) return;
245 while (mEvents
.GetSize() && mUnitCost
<= mCredit
) {
246 RefPtr
<TokenBucketCancelable
> cancelable
= mEvents
.PopFront();
247 if (cancelable
->mEvent
) {
249 ("EventTokenBucket::DispachEvents [%p] "
250 "Dispatching queue token bucket event cost=%" PRIu64
251 " credit=%" PRIu64
"\n",
252 this, mUnitCost
, mCredit
));
253 mCredit
-= mUnitCost
;
259 if (!mEvents
.GetSize()) WantNormalTimers();
263 void EventTokenBucket::UpdateTimer() {
264 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
265 if (mTimerArmed
|| mPaused
|| mStopped
|| !mEvents
.GetSize() || !mTimer
) {
269 if (mCredit
>= mUnitCost
) return;
271 // determine the time needed to wait to accumulate enough credits to admit
272 // one more event and set the timer for that point. Always round it
273 // up because firing early doesn't help.
275 uint64_t deficit
= mUnitCost
- mCredit
;
276 uint64_t msecWait
= (deficit
+ (kUsecPerMsec
- 1)) / kUsecPerMsec
;
278 if (msecWait
< 4) { // minimum wait
280 } else if (msecWait
> 60000) { // maximum wait
289 ("EventTokenBucket::UpdateTimer %p for %" PRIu64
"ms\n", this, msecWait
));
290 nsresult rv
= mTimer
->InitWithCallback(this, static_cast<uint32_t>(msecWait
),
291 nsITimer::TYPE_ONE_SHOT
);
292 mTimerArmed
= NS_SUCCEEDED(rv
);
296 EventTokenBucket::Notify(nsITimer
* timer
) {
297 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
300 if (timer
== mFineGrainResetTimer
) {
301 FineGrainResetTimerNotify();
306 SOCKET_LOG(("EventTokenBucket::Notify() %p\n", this));
308 if (mStopped
) return NS_OK
;
318 EventTokenBucket::GetName(nsACString
& aName
) {
319 aName
.AssignLiteral("EventTokenBucket");
323 void EventTokenBucket::UpdateCredits() {
324 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
326 TimeStamp now
= TimeStamp::Now();
327 TimeDuration elapsed
= now
- mLastUpdate
;
330 mCredit
+= static_cast<uint64_t>(elapsed
.ToMicroseconds());
331 if (mCredit
> mMaxCredit
) mCredit
= mMaxCredit
;
332 SOCKET_LOG(("EventTokenBucket::UpdateCredits %p to %" PRIu64
" (%" PRIu64
334 this, mCredit
, mUnitCost
, (double)mCredit
/ mUnitCost
));
338 void EventTokenBucket::FineGrainTimers() {
339 SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p mFineGrainTimerInUse=%d\n",
340 this, mFineGrainTimerInUse
));
342 mLastFineGrainTimerUse
= TimeStamp::Now();
344 if (mFineGrainTimerInUse
) return;
346 if (mUnitCost
> kCostFineGrainThreshold
) return;
349 ("EventTokenBucket::FineGrainTimers %p timeBeginPeriod()\n", this));
351 mFineGrainTimerInUse
= true;
355 void EventTokenBucket::NormalTimers() {
356 if (!mFineGrainTimerInUse
) return;
357 mFineGrainTimerInUse
= false;
359 SOCKET_LOG(("EventTokenBucket::NormalTimers %p timeEndPeriod()\n", this));
363 void EventTokenBucket::WantNormalTimers() {
364 if (!mFineGrainTimerInUse
) return;
365 if (mFineGrainResetTimerArmed
) return;
367 TimeDuration
elapsed(TimeStamp::Now() - mLastFineGrainTimerUse
);
368 static const TimeDuration fiveSeconds
= TimeDuration::FromSeconds(5);
370 if (elapsed
>= fiveSeconds
) {
375 if (!mFineGrainResetTimer
) mFineGrainResetTimer
= NS_NewTimer();
377 // if we can't delay the reset, just do it now
378 if (!mFineGrainResetTimer
) {
383 // pad the callback out 100ms to avoid having to round trip this again if the
384 // timer calls back just a tad early.
386 ("EventTokenBucket::WantNormalTimers %p "
387 "Will reset timer granularity after delay",
390 mFineGrainResetTimer
->InitWithCallback(
392 static_cast<uint32_t>((fiveSeconds
- elapsed
).ToMilliseconds()) + 100,
393 nsITimer::TYPE_ONE_SHOT
);
394 mFineGrainResetTimerArmed
= true;
397 void EventTokenBucket::FineGrainResetTimerNotify() {
398 SOCKET_LOG(("EventTokenBucket::FineGrainResetTimerNotify(%p) events = %zd\n",
399 this, mEvents
.GetSize()));
400 mFineGrainResetTimerArmed
= false;
402 // If we are currently processing events then wait for the queue to drain
403 // before trying to reset back to normal timers again
404 if (!mEvents
.GetSize()) WantNormalTimers();
410 } // namespace mozilla