Bug 1909613 - Enable <details name=''> everywhere, r=emilio
[gecko.git] / netwerk / base / ThrottleQueue.cpp
blob83ab30efbe85028d0612d7dd9957d6774111ae47
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "ThrottleQueue.h"
8 #include "mozilla/Components.h"
9 #include "mozilla/net/InputChannelThrottleQueueParent.h"
10 #include "nsISeekableStream.h"
11 #include "nsIAsyncInputStream.h"
12 #include "nsIOService.h"
13 #include "nsSocketTransportService2.h"
14 #include "nsStreamUtils.h"
15 #include "nsNetUtil.h"
17 namespace mozilla {
18 namespace net {
20 //-----------------------------------------------------------------------------
22 class ThrottleInputStream final : public nsIAsyncInputStream,
23 public nsISeekableStream {
24 public:
25 ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue);
27 NS_DECL_THREADSAFE_ISUPPORTS
28 NS_DECL_NSIINPUTSTREAM
29 NS_DECL_NSISEEKABLESTREAM
30 NS_DECL_NSITELLABLESTREAM
31 NS_DECL_NSIASYNCINPUTSTREAM
33 void AllowInput();
35 private:
36 ~ThrottleInputStream();
38 nsCOMPtr<nsIInputStream> mStream;
39 RefPtr<ThrottleQueue> mQueue;
40 nsresult mClosedStatus;
42 nsCOMPtr<nsIInputStreamCallback> mCallback;
43 nsCOMPtr<nsIEventTarget> mEventTarget;
46 NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream,
47 nsITellableStream, nsISeekableStream)
49 ThrottleInputStream::ThrottleInputStream(nsIInputStream* aStream,
50 ThrottleQueue* aQueue)
51 : mStream(aStream), mQueue(aQueue), mClosedStatus(NS_OK) {
52 MOZ_ASSERT(aQueue != nullptr);
55 ThrottleInputStream::~ThrottleInputStream() { Close(); }
57 NS_IMETHODIMP
58 ThrottleInputStream::Close() {
59 if (NS_FAILED(mClosedStatus)) {
60 return mClosedStatus;
63 if (mQueue) {
64 mQueue->DequeueStream(this);
65 mQueue = nullptr;
66 mClosedStatus = NS_BASE_STREAM_CLOSED;
68 return mStream->Close();
71 NS_IMETHODIMP
72 ThrottleInputStream::Available(uint64_t* aResult) {
73 if (NS_FAILED(mClosedStatus)) {
74 return mClosedStatus;
77 return mStream->Available(aResult);
80 NS_IMETHODIMP
81 ThrottleInputStream::StreamStatus() {
82 if (NS_FAILED(mClosedStatus)) {
83 return mClosedStatus;
86 return mStream->StreamStatus();
89 NS_IMETHODIMP
90 ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) {
91 if (NS_FAILED(mClosedStatus)) {
92 return mClosedStatus;
95 uint32_t realCount;
96 nsresult rv = mQueue->Available(aCount, &realCount);
97 if (NS_FAILED(rv)) {
98 return rv;
101 if (realCount == 0) {
102 return NS_BASE_STREAM_WOULD_BLOCK;
105 rv = mStream->Read(aBuf, realCount, aResult);
106 if (NS_SUCCEEDED(rv) && *aResult > 0) {
107 mQueue->RecordRead(*aResult);
109 return rv;
112 NS_IMETHODIMP
113 ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
114 uint32_t aCount, uint32_t* aResult) {
115 if (NS_FAILED(mClosedStatus)) {
116 return mClosedStatus;
119 uint32_t realCount;
120 nsresult rv = mQueue->Available(aCount, &realCount);
121 if (NS_FAILED(rv)) {
122 return rv;
124 MOZ_ASSERT(realCount <= aCount);
126 if (realCount == 0) {
127 return NS_BASE_STREAM_WOULD_BLOCK;
130 rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult);
131 if (NS_SUCCEEDED(rv) && *aResult > 0) {
132 mQueue->RecordRead(*aResult);
134 return rv;
137 NS_IMETHODIMP
138 ThrottleInputStream::IsNonBlocking(bool* aNonBlocking) {
139 *aNonBlocking = true;
140 return NS_OK;
143 NS_IMETHODIMP
144 ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset) {
145 if (NS_FAILED(mClosedStatus)) {
146 return mClosedStatus;
149 nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
150 if (!sstream) {
151 return NS_ERROR_FAILURE;
154 return sstream->Seek(aWhence, aOffset);
157 NS_IMETHODIMP
158 ThrottleInputStream::Tell(int64_t* aResult) {
159 if (NS_FAILED(mClosedStatus)) {
160 return mClosedStatus;
163 nsCOMPtr<nsITellableStream> sstream = do_QueryInterface(mStream);
164 if (!sstream) {
165 return NS_ERROR_FAILURE;
168 return sstream->Tell(aResult);
171 NS_IMETHODIMP
172 ThrottleInputStream::SetEOF() {
173 if (NS_FAILED(mClosedStatus)) {
174 return mClosedStatus;
177 nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
178 if (!sstream) {
179 return NS_ERROR_FAILURE;
182 return sstream->SetEOF();
185 NS_IMETHODIMP
186 ThrottleInputStream::CloseWithStatus(nsresult aStatus) {
187 if (NS_FAILED(mClosedStatus)) {
188 // Already closed, ignore.
189 return NS_OK;
191 if (NS_SUCCEEDED(aStatus)) {
192 aStatus = NS_BASE_STREAM_CLOSED;
195 mClosedStatus = Close();
196 if (NS_SUCCEEDED(mClosedStatus)) {
197 mClosedStatus = aStatus;
199 return NS_OK;
202 NS_IMETHODIMP
203 ThrottleInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
204 uint32_t aFlags, uint32_t aRequestedCount,
205 nsIEventTarget* aEventTarget) {
206 if (aFlags != 0) {
207 return NS_ERROR_ILLEGAL_VALUE;
210 mCallback = aCallback;
211 mEventTarget = aEventTarget;
212 if (mCallback) {
213 mQueue->QueueStream(this);
214 } else {
215 mQueue->DequeueStream(this);
217 return NS_OK;
220 void ThrottleInputStream::AllowInput() {
221 MOZ_ASSERT(mCallback);
222 nsCOMPtr<nsIInputStreamCallback> callbackEvent = NS_NewInputStreamReadyEvent(
223 "ThrottleInputStream::AllowInput", mCallback, mEventTarget);
224 mCallback = nullptr;
225 mEventTarget = nullptr;
226 callbackEvent->OnInputStreamReady(this);
229 //-----------------------------------------------------------------------------
231 // static
232 already_AddRefed<nsIInputChannelThrottleQueue> ThrottleQueue::Create() {
233 MOZ_ASSERT(XRE_IsParentProcess());
235 nsCOMPtr<nsIInputChannelThrottleQueue> tq;
236 if (nsIOService::UseSocketProcess()) {
237 tq = new InputChannelThrottleQueueParent();
238 } else {
239 tq = new ThrottleQueue();
242 return tq.forget();
245 NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback,
246 nsINamed)
248 ThrottleQueue::ThrottleQueue()
251 nsresult rv;
252 nsCOMPtr<nsIEventTarget> sts;
253 nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
254 if (NS_SUCCEEDED(rv)) {
255 sts = mozilla::components::SocketTransport::Service(&rv);
257 if (NS_SUCCEEDED(rv)) mTimer = NS_NewTimer(sts);
260 ThrottleQueue::~ThrottleQueue() {
261 if (mTimer && mTimerArmed) {
262 mTimer->Cancel();
264 mTimer = nullptr;
267 NS_IMETHODIMP
268 ThrottleQueue::RecordRead(uint32_t aBytesRead) {
269 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
270 ThrottleEntry entry;
271 entry.mTime = TimeStamp::Now();
272 entry.mBytesRead = aBytesRead;
273 mReadEvents.AppendElement(entry);
274 mBytesProcessed += aBytesRead;
275 return NS_OK;
278 NS_IMETHODIMP
279 ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable) {
280 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
281 TimeStamp now = TimeStamp::Now();
282 TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1);
283 size_t i;
285 // Remove all stale events.
286 for (i = 0; i < mReadEvents.Length(); ++i) {
287 if (mReadEvents[i].mTime >= oneSecondAgo) {
288 break;
291 mReadEvents.RemoveElementsAt(0, i);
293 uint32_t totalBytes = 0;
294 for (i = 0; i < mReadEvents.Length(); ++i) {
295 totalBytes += mReadEvents[i].mBytesRead;
298 uint32_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond;
299 double prob = static_cast<double>(rand()) / RAND_MAX;
300 uint32_t thisSliceBytes =
301 mMeanBytesPerSecond - spread + static_cast<uint32_t>(2 * spread * prob);
303 if (totalBytes >= thisSliceBytes) {
304 *aAvailable = 0;
305 } else {
306 *aAvailable = std::min(thisSliceBytes, aRemaining);
308 return NS_OK;
311 NS_IMETHODIMP
312 ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond) {
313 // Can be called on any thread.
314 if (aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0 ||
315 aMaxBytesPerSecond < aMeanBytesPerSecond) {
316 return NS_ERROR_ILLEGAL_VALUE;
319 mMeanBytesPerSecond = aMeanBytesPerSecond;
320 mMaxBytesPerSecond = aMaxBytesPerSecond;
321 return NS_OK;
324 NS_IMETHODIMP
325 ThrottleQueue::BytesProcessed(uint64_t* aResult) {
326 *aResult = mBytesProcessed;
327 return NS_OK;
330 NS_IMETHODIMP
331 ThrottleQueue::WrapStream(nsIInputStream* aInputStream,
332 nsIAsyncInputStream** aResult) {
333 nsCOMPtr<nsIAsyncInputStream> result =
334 new ThrottleInputStream(aInputStream, this);
335 result.forget(aResult);
336 return NS_OK;
339 NS_IMETHODIMP
340 ThrottleQueue::Notify(nsITimer* aTimer) {
341 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
342 // A notified reader may need to push itself back on the queue.
343 // Swap out the list of readers so that this works properly.
344 nsTArray<RefPtr<ThrottleInputStream>> events = std::move(mAsyncEvents);
346 // Optimistically notify all the waiting readers, and then let them
347 // requeue if there isn't enough bandwidth.
348 for (size_t i = 0; i < events.Length(); ++i) {
349 events[i]->AllowInput();
352 mTimerArmed = false;
353 return NS_OK;
356 NS_IMETHODIMP
357 ThrottleQueue::GetName(nsACString& aName) {
358 aName.AssignLiteral("net::ThrottleQueue");
359 return NS_OK;
362 void ThrottleQueue::QueueStream(ThrottleInputStream* aStream) {
363 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
364 if (mAsyncEvents.IndexOf(aStream) ==
365 nsTArray<RefPtr<mozilla::net::ThrottleInputStream>>::NoIndex) {
366 mAsyncEvents.AppendElement(aStream);
368 if (!mTimerArmed) {
369 uint32_t ms = 1000;
370 if (mReadEvents.Length() > 0) {
371 TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1);
372 TimeStamp now = TimeStamp::Now();
374 if (t > now) {
375 ms = static_cast<uint32_t>((t - now).ToMilliseconds());
376 } else {
377 ms = 1;
381 if (NS_SUCCEEDED(
382 mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) {
383 mTimerArmed = true;
389 void ThrottleQueue::DequeueStream(ThrottleInputStream* aStream) {
390 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
391 mAsyncEvents.RemoveElement(aStream);
394 NS_IMETHODIMP
395 ThrottleQueue::GetMeanBytesPerSecond(uint32_t* aMeanBytesPerSecond) {
396 NS_ENSURE_ARG(aMeanBytesPerSecond);
398 *aMeanBytesPerSecond = mMeanBytesPerSecond;
399 return NS_OK;
402 NS_IMETHODIMP
403 ThrottleQueue::GetMaxBytesPerSecond(uint32_t* aMaxBytesPerSecond) {
404 NS_ENSURE_ARG(aMaxBytesPerSecond);
406 *aMaxBytesPerSecond = mMaxBytesPerSecond;
407 return NS_OK;
410 } // namespace net
411 } // namespace mozilla