Bug 1861709 replace AudioCallbackDriver::ThreadRunning() assertions that mean to...
[gecko.git] / netwerk / base / nsStreamTransportService.cpp
blobe1369bbcb5ff0cd682c07f4c2af6021c8ae1cdc6
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 #include "nsStreamTransportService.h"
6 #include "ErrorList.h"
7 #include "nsXPCOMCIDInternal.h"
8 #include "nsNetSegmentUtils.h"
9 #include "nsTransportUtils.h"
10 #include "nsStreamUtils.h"
11 #include "nsError.h"
12 #include "nsNetCID.h"
14 #include "nsIAsyncInputStream.h"
15 #include "nsIAsyncOutputStream.h"
16 #include "nsIPipe.h"
17 #include "nsITransport.h"
18 #include "nsIObserverService.h"
19 #include "nsThreadPool.h"
20 #include "mozilla/Services.h"
22 namespace mozilla {
23 namespace net {
25 //-----------------------------------------------------------------------------
26 // nsInputStreamTransport
28 // Implements nsIInputStream as a wrapper around the real input stream. This
29 // allows the transport to support seeking, range-limiting, progress reporting,
30 // and close-when-done semantics while utilizing NS_AsyncCopy.
31 //-----------------------------------------------------------------------------
33 class nsInputStreamTransport : public nsITransport,
34 public nsIAsyncInputStream,
35 public nsIInputStreamCallback {
36 public:
37 NS_DECL_THREADSAFE_ISUPPORTS
38 NS_DECL_NSITRANSPORT
39 NS_DECL_NSIINPUTSTREAM
40 NS_DECL_NSIASYNCINPUTSTREAM
41 NS_DECL_NSIINPUTSTREAMCALLBACK
43 nsInputStreamTransport(nsIInputStream* source, bool closeWhenDone)
44 : mSource(source), mCloseWhenDone(closeWhenDone) {
45 mAsyncSource = do_QueryInterface(mSource);
48 private:
49 virtual ~nsInputStreamTransport() = default;
51 Mutex mMutex MOZ_UNANNOTATED{"nsInputStreamTransport::mMutex"};
53 // This value is protected by mutex.
54 nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback;
56 nsCOMPtr<nsIAsyncInputStream> mPipeIn;
58 // while the copy is active, these members may only be accessed from the
59 // nsIInputStream implementation.
60 nsCOMPtr<nsITransportEventSink> mEventSink;
61 nsCOMPtr<nsIInputStream> mSource;
63 // It can be null.
64 nsCOMPtr<nsIAsyncInputStream> mAsyncSource;
66 int64_t mOffset{0};
67 const bool mCloseWhenDone;
69 // this variable serves as a lock to prevent the state of the transport
70 // from being modified once the copy is in progress.
71 bool mInProgress{false};
74 NS_IMPL_ADDREF(nsInputStreamTransport);
75 NS_IMPL_RELEASE(nsInputStreamTransport);
77 NS_INTERFACE_MAP_BEGIN(nsInputStreamTransport)
78 NS_INTERFACE_MAP_ENTRY(nsITransport)
79 NS_INTERFACE_MAP_ENTRY(nsIInputStream)
80 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream, !!mAsyncSource)
81 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback, !!mAsyncSource)
82 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsITransport)
83 NS_INTERFACE_MAP_END
85 /** nsITransport **/
87 NS_IMETHODIMP
88 nsInputStreamTransport::OpenInputStream(uint32_t flags, uint32_t segsize,
89 uint32_t segcount,
90 nsIInputStream** result) {
91 NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
93 nsresult rv;
94 nsCOMPtr<nsIEventTarget> target =
95 do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
96 if (NS_FAILED(rv)) return rv;
98 // XXX if the caller requests an unbuffered stream, then perhaps
99 // we'd want to simply return mSource; however, then we would
100 // not be reading mSource on a background thread. is this ok?
102 bool nonblocking = !(flags & OPEN_BLOCKING);
104 net_ResolveSegmentParams(segsize, segcount);
106 nsCOMPtr<nsIAsyncOutputStream> pipeOut;
107 NS_NewPipe2(getter_AddRefs(mPipeIn), getter_AddRefs(pipeOut), nonblocking,
108 true, segsize, segcount);
110 mInProgress = true;
112 // startup async copy process...
113 rv = NS_AsyncCopy(this, pipeOut, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS,
114 segsize);
115 if (NS_FAILED(rv)) {
116 return rv;
118 *result = do_AddRef(mPipeIn).take();
119 return NS_OK;
122 NS_IMETHODIMP
123 nsInputStreamTransport::OpenOutputStream(uint32_t flags, uint32_t segsize,
124 uint32_t segcount,
125 nsIOutputStream** result) {
126 // this transport only supports reading!
127 MOZ_ASSERT_UNREACHABLE("nsInputStreamTransport::OpenOutputStream");
128 return NS_ERROR_UNEXPECTED;
131 NS_IMETHODIMP
132 nsInputStreamTransport::Close(nsresult reason) {
133 if (NS_SUCCEEDED(reason)) reason = NS_BASE_STREAM_CLOSED;
135 return mPipeIn->CloseWithStatus(reason);
138 NS_IMETHODIMP
139 nsInputStreamTransport::SetEventSink(nsITransportEventSink* sink,
140 nsIEventTarget* target) {
141 NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
143 if (target) {
144 return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink), sink,
145 target);
148 mEventSink = sink;
149 return NS_OK;
152 /** nsIInputStream **/
154 NS_IMETHODIMP
155 nsInputStreamTransport::Close() {
156 if (mCloseWhenDone) mSource->Close();
158 // make additional reads return early...
159 mOffset = 0;
160 return NS_OK;
163 NS_IMETHODIMP
164 nsInputStreamTransport::Available(uint64_t* result) {
165 return NS_ERROR_NOT_IMPLEMENTED;
168 NS_IMETHODIMP
169 nsInputStreamTransport::StreamStatus() { return mSource->StreamStatus(); }
171 NS_IMETHODIMP
172 nsInputStreamTransport::Read(char* buf, uint32_t count, uint32_t* result) {
173 nsresult rv = mSource->Read(buf, count, result);
175 if (NS_SUCCEEDED(rv)) {
176 mOffset += *result;
177 if (mEventSink) {
178 mEventSink->OnTransportStatus(this, NS_NET_STATUS_READING, mOffset, -1);
181 return rv;
184 NS_IMETHODIMP
185 nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer, void* closure,
186 uint32_t count, uint32_t* result) {
187 return NS_ERROR_NOT_IMPLEMENTED;
190 NS_IMETHODIMP
191 nsInputStreamTransport::IsNonBlocking(bool* result) {
192 *result = false;
193 return NS_OK;
196 // nsIAsyncInputStream interface
198 NS_IMETHODIMP
199 nsInputStreamTransport::CloseWithStatus(nsresult aStatus) { return Close(); }
201 NS_IMETHODIMP
202 nsInputStreamTransport::AsyncWait(nsIInputStreamCallback* aCallback,
203 uint32_t aFlags, uint32_t aRequestedCount,
204 nsIEventTarget* aEventTarget) {
205 NS_ENSURE_STATE(!!mAsyncSource);
207 nsCOMPtr<nsIInputStreamCallback> callback = aCallback ? this : nullptr;
210 MutexAutoLock lock(mMutex);
212 if (NS_WARN_IF(mAsyncWaitCallback && aCallback &&
213 mAsyncWaitCallback != aCallback)) {
214 return NS_ERROR_FAILURE;
217 mAsyncWaitCallback = aCallback;
220 return mAsyncSource->AsyncWait(callback, aFlags, aRequestedCount,
221 aEventTarget);
224 // nsIInputStreamCallback
226 NS_IMETHODIMP
227 nsInputStreamTransport::OnInputStreamReady(nsIAsyncInputStream* aStream) {
228 nsCOMPtr<nsIInputStreamCallback> callback;
230 MutexAutoLock lock(mMutex);
232 // We have been canceled in the meanwhile.
233 if (!mAsyncWaitCallback) {
234 return NS_OK;
237 callback.swap(mAsyncWaitCallback);
240 MOZ_ASSERT(callback);
241 return callback->OnInputStreamReady(this);
244 //-----------------------------------------------------------------------------
245 // nsStreamTransportService
246 //-----------------------------------------------------------------------------
248 nsStreamTransportService::nsStreamTransportService() = default;
250 nsStreamTransportService::~nsStreamTransportService() {
251 NS_ASSERTION(!mPool, "thread pool wasn't shutdown");
254 nsresult nsStreamTransportService::Init() {
255 // Can't be used multithreaded before this
256 MOZ_PUSH_IGNORE_THREAD_SAFETY
257 MOZ_ASSERT(!mPool);
258 mPool = new nsThreadPool();
260 // Configure the pool
261 mPool->SetName("StreamTrans"_ns);
262 mPool->SetThreadLimit(25);
263 mPool->SetIdleThreadLimit(5);
264 mPool->SetIdleThreadTimeoutRegressive(true);
265 mPool->SetIdleThreadTimeout(PR_SecondsToInterval(30));
266 MOZ_POP_THREAD_SAFETY
268 nsCOMPtr<nsIObserverService> obsSvc = mozilla::services::GetObserverService();
269 if (obsSvc) obsSvc->AddObserver(this, "xpcom-shutdown-threads", false);
270 return NS_OK;
273 NS_IMPL_ISUPPORTS(nsStreamTransportService, nsIStreamTransportService,
274 nsIEventTarget, nsIObserver)
276 NS_IMETHODIMP
277 nsStreamTransportService::DispatchFromScript(nsIRunnable* task,
278 uint32_t flags) {
279 nsCOMPtr<nsIRunnable> event(task);
280 return Dispatch(event.forget(), flags);
283 NS_IMETHODIMP
284 nsStreamTransportService::Dispatch(already_AddRefed<nsIRunnable> task,
285 uint32_t flags) {
286 nsCOMPtr<nsIRunnable> event(task); // so it gets released on failure paths
287 nsCOMPtr<nsIThreadPool> pool;
289 mozilla::MutexAutoLock lock(mShutdownLock);
290 if (mIsShutdown) {
291 return NS_ERROR_NOT_INITIALIZED;
293 pool = mPool;
295 NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
296 return pool->Dispatch(event.forget(), flags);
299 NS_IMETHODIMP
300 nsStreamTransportService::DelayedDispatch(already_AddRefed<nsIRunnable> aEvent,
301 uint32_t aDelayMs) {
302 return NS_ERROR_NOT_IMPLEMENTED;
305 NS_IMETHODIMP
306 nsStreamTransportService::RegisterShutdownTask(nsITargetShutdownTask*) {
307 return NS_ERROR_NOT_IMPLEMENTED;
310 NS_IMETHODIMP
311 nsStreamTransportService::UnregisterShutdownTask(nsITargetShutdownTask*) {
312 return NS_ERROR_NOT_IMPLEMENTED;
315 NS_IMETHODIMP_(bool)
316 nsStreamTransportService::IsOnCurrentThreadInfallible() {
317 nsCOMPtr<nsIThreadPool> pool;
319 mozilla::MutexAutoLock lock(mShutdownLock);
320 pool = mPool;
322 if (!pool) {
323 return false;
325 return pool->IsOnCurrentThread();
328 NS_IMETHODIMP
329 nsStreamTransportService::IsOnCurrentThread(bool* result) {
330 nsCOMPtr<nsIThreadPool> pool;
332 mozilla::MutexAutoLock lock(mShutdownLock);
333 if (mIsShutdown) {
334 return NS_ERROR_NOT_INITIALIZED;
336 pool = mPool;
338 NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
339 return pool->IsOnCurrentThread(result);
342 NS_IMETHODIMP
343 nsStreamTransportService::CreateInputTransport(nsIInputStream* stream,
344 bool closeWhenDone,
345 nsITransport** result) {
346 RefPtr<nsInputStreamTransport> trans =
347 new nsInputStreamTransport(stream, closeWhenDone);
348 trans.forget(result);
349 return NS_OK;
352 NS_IMETHODIMP
353 nsStreamTransportService::Observe(nsISupports* subject, const char* topic,
354 const char16_t* data) {
355 NS_ASSERTION(strcmp(topic, "xpcom-shutdown-threads") == 0, "oops");
358 nsCOMPtr<nsIThreadPool> pool;
360 mozilla::MutexAutoLock lock(mShutdownLock);
361 mIsShutdown = true;
362 pool = mPool.forget();
365 if (pool) {
366 pool->Shutdown();
369 return NS_OK;
372 class AvailableEvent final : public Runnable {
373 public:
374 AvailableEvent(nsIInputStream* stream, nsIInputAvailableCallback* callback)
375 : Runnable("net::AvailableEvent"),
376 mStream(stream),
377 mCallback(callback),
378 mDoingCallback(false),
379 mSize(0),
380 mResultForCallback(NS_OK) {
381 mCallbackTarget = GetCurrentSerialEventTarget();
384 NS_IMETHOD Run() override {
385 if (mDoingCallback) {
386 // pong
387 mCallback->OnInputAvailableComplete(mSize, mResultForCallback);
388 mCallback = nullptr;
389 } else {
390 // ping
391 mResultForCallback = mStream->Available(&mSize);
392 mStream = nullptr;
393 mDoingCallback = true;
395 nsCOMPtr<nsIRunnable> event(this); // overly cute
396 mCallbackTarget->Dispatch(event.forget(), NS_DISPATCH_NORMAL);
397 mCallbackTarget = nullptr;
399 return NS_OK;
402 private:
403 virtual ~AvailableEvent() = default;
405 nsCOMPtr<nsIInputStream> mStream;
406 nsCOMPtr<nsIInputAvailableCallback> mCallback;
407 nsCOMPtr<nsIEventTarget> mCallbackTarget;
408 bool mDoingCallback;
409 uint64_t mSize;
410 nsresult mResultForCallback;
413 NS_IMETHODIMP
414 nsStreamTransportService::InputAvailable(nsIInputStream* stream,
415 nsIInputAvailableCallback* callback) {
416 nsCOMPtr<nsIThreadPool> pool;
418 mozilla::MutexAutoLock lock(mShutdownLock);
419 if (mIsShutdown) {
420 return NS_ERROR_NOT_INITIALIZED;
422 pool = mPool;
424 nsCOMPtr<nsIRunnable> event = new AvailableEvent(stream, callback);
425 return pool->Dispatch(event.forget(), NS_DISPATCH_NORMAL);
428 } // namespace net
429 } // namespace mozilla