1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "ThrottleQueue.h"
8 #include "mozilla/net/InputChannelThrottleQueueParent.h"
9 #include "nsISeekableStream.h"
10 #include "nsIAsyncInputStream.h"
11 #include "nsIOService.h"
12 #include "nsSocketTransportService2.h"
13 #include "nsStreamUtils.h"
14 #include "nsNetUtil.h"
19 //-----------------------------------------------------------------------------
21 class ThrottleInputStream final
: public nsIAsyncInputStream
,
22 public nsISeekableStream
{
24 ThrottleInputStream(nsIInputStream
* aStream
, ThrottleQueue
* aQueue
);
26 NS_DECL_THREADSAFE_ISUPPORTS
27 NS_DECL_NSIINPUTSTREAM
28 NS_DECL_NSISEEKABLESTREAM
29 NS_DECL_NSITELLABLESTREAM
30 NS_DECL_NSIASYNCINPUTSTREAM
35 ~ThrottleInputStream();
37 nsCOMPtr
<nsIInputStream
> mStream
;
38 RefPtr
<ThrottleQueue
> mQueue
;
39 nsresult mClosedStatus
;
41 nsCOMPtr
<nsIInputStreamCallback
> mCallback
;
42 nsCOMPtr
<nsIEventTarget
> mEventTarget
;
45 NS_IMPL_ISUPPORTS(ThrottleInputStream
, nsIAsyncInputStream
, nsIInputStream
,
46 nsITellableStream
, nsISeekableStream
)
48 ThrottleInputStream::ThrottleInputStream(nsIInputStream
* aStream
,
49 ThrottleQueue
* aQueue
)
50 : mStream(aStream
), mQueue(aQueue
), mClosedStatus(NS_OK
) {
51 MOZ_ASSERT(aQueue
!= nullptr);
54 ThrottleInputStream::~ThrottleInputStream() { Close(); }
57 ThrottleInputStream::Close() {
58 if (NS_FAILED(mClosedStatus
)) {
63 mQueue
->DequeueStream(this);
65 mClosedStatus
= NS_BASE_STREAM_CLOSED
;
67 return mStream
->Close();
71 ThrottleInputStream::Available(uint64_t* aResult
) {
72 if (NS_FAILED(mClosedStatus
)) {
76 return mStream
->Available(aResult
);
80 ThrottleInputStream::StreamStatus() {
81 if (NS_FAILED(mClosedStatus
)) {
85 return mStream
->StreamStatus();
89 ThrottleInputStream::Read(char* aBuf
, uint32_t aCount
, uint32_t* aResult
) {
90 if (NS_FAILED(mClosedStatus
)) {
95 nsresult rv
= mQueue
->Available(aCount
, &realCount
);
100 if (realCount
== 0) {
101 return NS_BASE_STREAM_WOULD_BLOCK
;
104 rv
= mStream
->Read(aBuf
, realCount
, aResult
);
105 if (NS_SUCCEEDED(rv
) && *aResult
> 0) {
106 mQueue
->RecordRead(*aResult
);
112 ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
,
113 uint32_t aCount
, uint32_t* aResult
) {
114 if (NS_FAILED(mClosedStatus
)) {
115 return mClosedStatus
;
119 nsresult rv
= mQueue
->Available(aCount
, &realCount
);
123 MOZ_ASSERT(realCount
<= aCount
);
125 if (realCount
== 0) {
126 return NS_BASE_STREAM_WOULD_BLOCK
;
129 rv
= mStream
->ReadSegments(aWriter
, aClosure
, realCount
, aResult
);
130 if (NS_SUCCEEDED(rv
) && *aResult
> 0) {
131 mQueue
->RecordRead(*aResult
);
137 ThrottleInputStream::IsNonBlocking(bool* aNonBlocking
) {
138 *aNonBlocking
= true;
143 ThrottleInputStream::Seek(int32_t aWhence
, int64_t aOffset
) {
144 if (NS_FAILED(mClosedStatus
)) {
145 return mClosedStatus
;
148 nsCOMPtr
<nsISeekableStream
> sstream
= do_QueryInterface(mStream
);
150 return NS_ERROR_FAILURE
;
153 return sstream
->Seek(aWhence
, aOffset
);
157 ThrottleInputStream::Tell(int64_t* aResult
) {
158 if (NS_FAILED(mClosedStatus
)) {
159 return mClosedStatus
;
162 nsCOMPtr
<nsITellableStream
> sstream
= do_QueryInterface(mStream
);
164 return NS_ERROR_FAILURE
;
167 return sstream
->Tell(aResult
);
171 ThrottleInputStream::SetEOF() {
172 if (NS_FAILED(mClosedStatus
)) {
173 return mClosedStatus
;
176 nsCOMPtr
<nsISeekableStream
> sstream
= do_QueryInterface(mStream
);
178 return NS_ERROR_FAILURE
;
181 return sstream
->SetEOF();
185 ThrottleInputStream::CloseWithStatus(nsresult aStatus
) {
186 if (NS_FAILED(mClosedStatus
)) {
187 // Already closed, ignore.
190 if (NS_SUCCEEDED(aStatus
)) {
191 aStatus
= NS_BASE_STREAM_CLOSED
;
194 mClosedStatus
= Close();
195 if (NS_SUCCEEDED(mClosedStatus
)) {
196 mClosedStatus
= aStatus
;
202 ThrottleInputStream::AsyncWait(nsIInputStreamCallback
* aCallback
,
203 uint32_t aFlags
, uint32_t aRequestedCount
,
204 nsIEventTarget
* aEventTarget
) {
206 return NS_ERROR_ILLEGAL_VALUE
;
209 mCallback
= aCallback
;
210 mEventTarget
= aEventTarget
;
212 mQueue
->QueueStream(this);
214 mQueue
->DequeueStream(this);
219 void ThrottleInputStream::AllowInput() {
220 MOZ_ASSERT(mCallback
);
221 nsCOMPtr
<nsIInputStreamCallback
> callbackEvent
= NS_NewInputStreamReadyEvent(
222 "ThrottleInputStream::AllowInput", mCallback
, mEventTarget
);
224 mEventTarget
= nullptr;
225 callbackEvent
->OnInputStreamReady(this);
228 //-----------------------------------------------------------------------------
231 already_AddRefed
<nsIInputChannelThrottleQueue
> ThrottleQueue::Create() {
232 MOZ_ASSERT(XRE_IsParentProcess());
234 nsCOMPtr
<nsIInputChannelThrottleQueue
> tq
;
235 if (nsIOService::UseSocketProcess()) {
236 tq
= new InputChannelThrottleQueueParent();
238 tq
= new ThrottleQueue();
244 NS_IMPL_ISUPPORTS(ThrottleQueue
, nsIInputChannelThrottleQueue
, nsITimerCallback
,
247 ThrottleQueue::ThrottleQueue()
251 nsCOMPtr
<nsIEventTarget
> sts
;
252 nsCOMPtr
<nsIIOService
> ioService
= do_GetIOService(&rv
);
253 if (NS_SUCCEEDED(rv
)) {
254 sts
= do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID
, &rv
);
256 if (NS_SUCCEEDED(rv
)) mTimer
= NS_NewTimer(sts
);
259 ThrottleQueue::~ThrottleQueue() {
260 if (mTimer
&& mTimerArmed
) {
267 ThrottleQueue::RecordRead(uint32_t aBytesRead
) {
268 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
270 entry
.mTime
= TimeStamp::Now();
271 entry
.mBytesRead
= aBytesRead
;
272 mReadEvents
.AppendElement(entry
);
273 mBytesProcessed
+= aBytesRead
;
278 ThrottleQueue::Available(uint32_t aRemaining
, uint32_t* aAvailable
) {
279 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
280 TimeStamp now
= TimeStamp::Now();
281 TimeStamp oneSecondAgo
= now
- TimeDuration::FromSeconds(1);
284 // Remove all stale events.
285 for (i
= 0; i
< mReadEvents
.Length(); ++i
) {
286 if (mReadEvents
[i
].mTime
>= oneSecondAgo
) {
290 mReadEvents
.RemoveElementsAt(0, i
);
292 uint32_t totalBytes
= 0;
293 for (i
= 0; i
< mReadEvents
.Length(); ++i
) {
294 totalBytes
+= mReadEvents
[i
].mBytesRead
;
297 uint32_t spread
= mMaxBytesPerSecond
- mMeanBytesPerSecond
;
298 double prob
= static_cast<double>(rand()) / RAND_MAX
;
299 uint32_t thisSliceBytes
=
300 mMeanBytesPerSecond
- spread
+ static_cast<uint32_t>(2 * spread
* prob
);
302 if (totalBytes
>= thisSliceBytes
) {
305 *aAvailable
= std::min(thisSliceBytes
, aRemaining
);
311 ThrottleQueue::Init(uint32_t aMeanBytesPerSecond
, uint32_t aMaxBytesPerSecond
) {
312 // Can be called on any thread.
313 if (aMeanBytesPerSecond
== 0 || aMaxBytesPerSecond
== 0 ||
314 aMaxBytesPerSecond
< aMeanBytesPerSecond
) {
315 return NS_ERROR_ILLEGAL_VALUE
;
318 mMeanBytesPerSecond
= aMeanBytesPerSecond
;
319 mMaxBytesPerSecond
= aMaxBytesPerSecond
;
324 ThrottleQueue::BytesProcessed(uint64_t* aResult
) {
325 *aResult
= mBytesProcessed
;
330 ThrottleQueue::WrapStream(nsIInputStream
* aInputStream
,
331 nsIAsyncInputStream
** aResult
) {
332 nsCOMPtr
<nsIAsyncInputStream
> result
=
333 new ThrottleInputStream(aInputStream
, this);
334 result
.forget(aResult
);
339 ThrottleQueue::Notify(nsITimer
* aTimer
) {
340 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
341 // A notified reader may need to push itself back on the queue.
342 // Swap out the list of readers so that this works properly.
343 nsTArray
<RefPtr
<ThrottleInputStream
>> events
= std::move(mAsyncEvents
);
345 // Optimistically notify all the waiting readers, and then let them
346 // requeue if there isn't enough bandwidth.
347 for (size_t i
= 0; i
< events
.Length(); ++i
) {
348 events
[i
]->AllowInput();
356 ThrottleQueue::GetName(nsACString
& aName
) {
357 aName
.AssignLiteral("net::ThrottleQueue");
361 void ThrottleQueue::QueueStream(ThrottleInputStream
* aStream
) {
362 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
363 if (mAsyncEvents
.IndexOf(aStream
) ==
364 nsTArray
<RefPtr
<mozilla::net::ThrottleInputStream
>>::NoIndex
) {
365 mAsyncEvents
.AppendElement(aStream
);
369 if (mReadEvents
.Length() > 0) {
370 TimeStamp t
= mReadEvents
[0].mTime
+ TimeDuration::FromSeconds(1);
371 TimeStamp now
= TimeStamp::Now();
374 ms
= static_cast<uint32_t>((t
- now
).ToMilliseconds());
381 mTimer
->InitWithCallback(this, ms
, nsITimer::TYPE_ONE_SHOT
))) {
388 void ThrottleQueue::DequeueStream(ThrottleInputStream
* aStream
) {
389 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
390 mAsyncEvents
.RemoveElement(aStream
);
394 ThrottleQueue::GetMeanBytesPerSecond(uint32_t* aMeanBytesPerSecond
) {
395 NS_ENSURE_ARG(aMeanBytesPerSecond
);
397 *aMeanBytesPerSecond
= mMeanBytesPerSecond
;
402 ThrottleQueue::GetMaxBytesPerSecond(uint32_t* aMaxBytesPerSecond
) {
403 NS_ENSURE_ARG(aMaxBytesPerSecond
);
405 *aMaxBytesPerSecond
= mMaxBytesPerSecond
;
410 } // namespace mozilla