Merge mozilla-central to autoland. CLOSED TREE
[gecko.git] / netwerk / base / ThrottleQueue.cpp
blob4313a6ecb3a298c89393bfc48db38008cb4955bf
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "ThrottleQueue.h"
8 #include "mozilla/net/InputChannelThrottleQueueParent.h"
9 #include "nsISeekableStream.h"
10 #include "nsIAsyncInputStream.h"
11 #include "nsIOService.h"
12 #include "nsSocketTransportService2.h"
13 #include "nsStreamUtils.h"
14 #include "nsNetUtil.h"
16 namespace mozilla {
17 namespace net {
19 //-----------------------------------------------------------------------------
21 class ThrottleInputStream final : public nsIAsyncInputStream,
22 public nsISeekableStream {
23 public:
24 ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue);
26 NS_DECL_THREADSAFE_ISUPPORTS
27 NS_DECL_NSIINPUTSTREAM
28 NS_DECL_NSISEEKABLESTREAM
29 NS_DECL_NSITELLABLESTREAM
30 NS_DECL_NSIASYNCINPUTSTREAM
32 void AllowInput();
34 private:
35 ~ThrottleInputStream();
37 nsCOMPtr<nsIInputStream> mStream;
38 RefPtr<ThrottleQueue> mQueue;
39 nsresult mClosedStatus;
41 nsCOMPtr<nsIInputStreamCallback> mCallback;
42 nsCOMPtr<nsIEventTarget> mEventTarget;
45 NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream,
46 nsITellableStream, nsISeekableStream)
48 ThrottleInputStream::ThrottleInputStream(nsIInputStream* aStream,
49 ThrottleQueue* aQueue)
50 : mStream(aStream), mQueue(aQueue), mClosedStatus(NS_OK) {
51 MOZ_ASSERT(aQueue != nullptr);
54 ThrottleInputStream::~ThrottleInputStream() { Close(); }
56 NS_IMETHODIMP
57 ThrottleInputStream::Close() {
58 if (NS_FAILED(mClosedStatus)) {
59 return mClosedStatus;
62 if (mQueue) {
63 mQueue->DequeueStream(this);
64 mQueue = nullptr;
65 mClosedStatus = NS_BASE_STREAM_CLOSED;
67 return mStream->Close();
70 NS_IMETHODIMP
71 ThrottleInputStream::Available(uint64_t* aResult) {
72 if (NS_FAILED(mClosedStatus)) {
73 return mClosedStatus;
76 return mStream->Available(aResult);
79 NS_IMETHODIMP
80 ThrottleInputStream::StreamStatus() {
81 if (NS_FAILED(mClosedStatus)) {
82 return mClosedStatus;
85 return mStream->StreamStatus();
88 NS_IMETHODIMP
89 ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) {
90 if (NS_FAILED(mClosedStatus)) {
91 return mClosedStatus;
94 uint32_t realCount;
95 nsresult rv = mQueue->Available(aCount, &realCount);
96 if (NS_FAILED(rv)) {
97 return rv;
100 if (realCount == 0) {
101 return NS_BASE_STREAM_WOULD_BLOCK;
104 rv = mStream->Read(aBuf, realCount, aResult);
105 if (NS_SUCCEEDED(rv) && *aResult > 0) {
106 mQueue->RecordRead(*aResult);
108 return rv;
111 NS_IMETHODIMP
112 ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
113 uint32_t aCount, uint32_t* aResult) {
114 if (NS_FAILED(mClosedStatus)) {
115 return mClosedStatus;
118 uint32_t realCount;
119 nsresult rv = mQueue->Available(aCount, &realCount);
120 if (NS_FAILED(rv)) {
121 return rv;
123 MOZ_ASSERT(realCount <= aCount);
125 if (realCount == 0) {
126 return NS_BASE_STREAM_WOULD_BLOCK;
129 rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult);
130 if (NS_SUCCEEDED(rv) && *aResult > 0) {
131 mQueue->RecordRead(*aResult);
133 return rv;
136 NS_IMETHODIMP
137 ThrottleInputStream::IsNonBlocking(bool* aNonBlocking) {
138 *aNonBlocking = true;
139 return NS_OK;
142 NS_IMETHODIMP
143 ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset) {
144 if (NS_FAILED(mClosedStatus)) {
145 return mClosedStatus;
148 nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
149 if (!sstream) {
150 return NS_ERROR_FAILURE;
153 return sstream->Seek(aWhence, aOffset);
156 NS_IMETHODIMP
157 ThrottleInputStream::Tell(int64_t* aResult) {
158 if (NS_FAILED(mClosedStatus)) {
159 return mClosedStatus;
162 nsCOMPtr<nsITellableStream> sstream = do_QueryInterface(mStream);
163 if (!sstream) {
164 return NS_ERROR_FAILURE;
167 return sstream->Tell(aResult);
170 NS_IMETHODIMP
171 ThrottleInputStream::SetEOF() {
172 if (NS_FAILED(mClosedStatus)) {
173 return mClosedStatus;
176 nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
177 if (!sstream) {
178 return NS_ERROR_FAILURE;
181 return sstream->SetEOF();
184 NS_IMETHODIMP
185 ThrottleInputStream::CloseWithStatus(nsresult aStatus) {
186 if (NS_FAILED(mClosedStatus)) {
187 // Already closed, ignore.
188 return NS_OK;
190 if (NS_SUCCEEDED(aStatus)) {
191 aStatus = NS_BASE_STREAM_CLOSED;
194 mClosedStatus = Close();
195 if (NS_SUCCEEDED(mClosedStatus)) {
196 mClosedStatus = aStatus;
198 return NS_OK;
201 NS_IMETHODIMP
202 ThrottleInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
203 uint32_t aFlags, uint32_t aRequestedCount,
204 nsIEventTarget* aEventTarget) {
205 if (aFlags != 0) {
206 return NS_ERROR_ILLEGAL_VALUE;
209 mCallback = aCallback;
210 mEventTarget = aEventTarget;
211 if (mCallback) {
212 mQueue->QueueStream(this);
213 } else {
214 mQueue->DequeueStream(this);
216 return NS_OK;
219 void ThrottleInputStream::AllowInput() {
220 MOZ_ASSERT(mCallback);
221 nsCOMPtr<nsIInputStreamCallback> callbackEvent = NS_NewInputStreamReadyEvent(
222 "ThrottleInputStream::AllowInput", mCallback, mEventTarget);
223 mCallback = nullptr;
224 mEventTarget = nullptr;
225 callbackEvent->OnInputStreamReady(this);
228 //-----------------------------------------------------------------------------
230 // static
231 already_AddRefed<nsIInputChannelThrottleQueue> ThrottleQueue::Create() {
232 MOZ_ASSERT(XRE_IsParentProcess());
234 nsCOMPtr<nsIInputChannelThrottleQueue> tq;
235 if (nsIOService::UseSocketProcess()) {
236 tq = new InputChannelThrottleQueueParent();
237 } else {
238 tq = new ThrottleQueue();
241 return tq.forget();
244 NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback,
245 nsINamed)
247 ThrottleQueue::ThrottleQueue()
250 nsresult rv;
251 nsCOMPtr<nsIEventTarget> sts;
252 nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
253 if (NS_SUCCEEDED(rv)) {
254 sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
256 if (NS_SUCCEEDED(rv)) mTimer = NS_NewTimer(sts);
259 ThrottleQueue::~ThrottleQueue() {
260 if (mTimer && mTimerArmed) {
261 mTimer->Cancel();
263 mTimer = nullptr;
266 NS_IMETHODIMP
267 ThrottleQueue::RecordRead(uint32_t aBytesRead) {
268 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
269 ThrottleEntry entry;
270 entry.mTime = TimeStamp::Now();
271 entry.mBytesRead = aBytesRead;
272 mReadEvents.AppendElement(entry);
273 mBytesProcessed += aBytesRead;
274 return NS_OK;
277 NS_IMETHODIMP
278 ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable) {
279 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
280 TimeStamp now = TimeStamp::Now();
281 TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1);
282 size_t i;
284 // Remove all stale events.
285 for (i = 0; i < mReadEvents.Length(); ++i) {
286 if (mReadEvents[i].mTime >= oneSecondAgo) {
287 break;
290 mReadEvents.RemoveElementsAt(0, i);
292 uint32_t totalBytes = 0;
293 for (i = 0; i < mReadEvents.Length(); ++i) {
294 totalBytes += mReadEvents[i].mBytesRead;
297 uint32_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond;
298 double prob = static_cast<double>(rand()) / RAND_MAX;
299 uint32_t thisSliceBytes =
300 mMeanBytesPerSecond - spread + static_cast<uint32_t>(2 * spread * prob);
302 if (totalBytes >= thisSliceBytes) {
303 *aAvailable = 0;
304 } else {
305 *aAvailable = std::min(thisSliceBytes, aRemaining);
307 return NS_OK;
310 NS_IMETHODIMP
311 ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond) {
312 // Can be called on any thread.
313 if (aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0 ||
314 aMaxBytesPerSecond < aMeanBytesPerSecond) {
315 return NS_ERROR_ILLEGAL_VALUE;
318 mMeanBytesPerSecond = aMeanBytesPerSecond;
319 mMaxBytesPerSecond = aMaxBytesPerSecond;
320 return NS_OK;
323 NS_IMETHODIMP
324 ThrottleQueue::BytesProcessed(uint64_t* aResult) {
325 *aResult = mBytesProcessed;
326 return NS_OK;
329 NS_IMETHODIMP
330 ThrottleQueue::WrapStream(nsIInputStream* aInputStream,
331 nsIAsyncInputStream** aResult) {
332 nsCOMPtr<nsIAsyncInputStream> result =
333 new ThrottleInputStream(aInputStream, this);
334 result.forget(aResult);
335 return NS_OK;
338 NS_IMETHODIMP
339 ThrottleQueue::Notify(nsITimer* aTimer) {
340 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
341 // A notified reader may need to push itself back on the queue.
342 // Swap out the list of readers so that this works properly.
343 nsTArray<RefPtr<ThrottleInputStream>> events = std::move(mAsyncEvents);
345 // Optimistically notify all the waiting readers, and then let them
346 // requeue if there isn't enough bandwidth.
347 for (size_t i = 0; i < events.Length(); ++i) {
348 events[i]->AllowInput();
351 mTimerArmed = false;
352 return NS_OK;
355 NS_IMETHODIMP
356 ThrottleQueue::GetName(nsACString& aName) {
357 aName.AssignLiteral("net::ThrottleQueue");
358 return NS_OK;
361 void ThrottleQueue::QueueStream(ThrottleInputStream* aStream) {
362 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
363 if (mAsyncEvents.IndexOf(aStream) ==
364 nsTArray<RefPtr<mozilla::net::ThrottleInputStream>>::NoIndex) {
365 mAsyncEvents.AppendElement(aStream);
367 if (!mTimerArmed) {
368 uint32_t ms = 1000;
369 if (mReadEvents.Length() > 0) {
370 TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1);
371 TimeStamp now = TimeStamp::Now();
373 if (t > now) {
374 ms = static_cast<uint32_t>((t - now).ToMilliseconds());
375 } else {
376 ms = 1;
380 if (NS_SUCCEEDED(
381 mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) {
382 mTimerArmed = true;
388 void ThrottleQueue::DequeueStream(ThrottleInputStream* aStream) {
389 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
390 mAsyncEvents.RemoveElement(aStream);
393 NS_IMETHODIMP
394 ThrottleQueue::GetMeanBytesPerSecond(uint32_t* aMeanBytesPerSecond) {
395 NS_ENSURE_ARG(aMeanBytesPerSecond);
397 *aMeanBytesPerSecond = mMeanBytesPerSecond;
398 return NS_OK;
401 NS_IMETHODIMP
402 ThrottleQueue::GetMaxBytesPerSecond(uint32_t* aMaxBytesPerSecond) {
403 NS_ENSURE_ARG(aMaxBytesPerSecond);
405 *aMaxBytesPerSecond = mMaxBytesPerSecond;
406 return NS_OK;
409 } // namespace net
410 } // namespace mozilla