1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 #include "nsStreamTransportService.h"
7 #include "nsXPCOMCIDInternal.h"
8 #include "nsNetSegmentUtils.h"
9 #include "nsTransportUtils.h"
10 #include "nsStreamUtils.h"
14 #include "nsIAsyncInputStream.h"
15 #include "nsIAsyncOutputStream.h"
17 #include "nsITransport.h"
18 #include "nsIObserverService.h"
19 #include "nsThreadPool.h"
20 #include "mozilla/Services.h"
25 //-----------------------------------------------------------------------------
26 // nsInputStreamTransport
28 // Implements nsIInputStream as a wrapper around the real input stream. This
29 // allows the transport to support seeking, range-limiting, progress reporting,
30 // and close-when-done semantics while utilizing NS_AsyncCopy.
31 //-----------------------------------------------------------------------------
33 class nsInputStreamTransport
: public nsITransport
,
34 public nsIAsyncInputStream
,
35 public nsIInputStreamCallback
{
37 NS_DECL_THREADSAFE_ISUPPORTS
39 NS_DECL_NSIINPUTSTREAM
40 NS_DECL_NSIASYNCINPUTSTREAM
41 NS_DECL_NSIINPUTSTREAMCALLBACK
43 nsInputStreamTransport(nsIInputStream
* source
, bool closeWhenDone
)
44 : mSource(source
), mCloseWhenDone(closeWhenDone
) {
45 mAsyncSource
= do_QueryInterface(mSource
);
49 virtual ~nsInputStreamTransport() = default;
51 Mutex mMutex MOZ_UNANNOTATED
{"nsInputStreamTransport::mMutex"};
53 // This value is protected by mutex.
54 nsCOMPtr
<nsIInputStreamCallback
> mAsyncWaitCallback
;
56 nsCOMPtr
<nsIAsyncInputStream
> mPipeIn
;
58 // while the copy is active, these members may only be accessed from the
59 // nsIInputStream implementation.
60 nsCOMPtr
<nsITransportEventSink
> mEventSink
;
61 nsCOMPtr
<nsIInputStream
> mSource
;
64 nsCOMPtr
<nsIAsyncInputStream
> mAsyncSource
;
67 const bool mCloseWhenDone
;
69 // this variable serves as a lock to prevent the state of the transport
70 // from being modified once the copy is in progress.
71 bool mInProgress
{false};
74 NS_IMPL_ADDREF(nsInputStreamTransport
);
75 NS_IMPL_RELEASE(nsInputStreamTransport
);
77 NS_INTERFACE_MAP_BEGIN(nsInputStreamTransport
)
78 NS_INTERFACE_MAP_ENTRY(nsITransport
)
79 NS_INTERFACE_MAP_ENTRY(nsIInputStream
)
80 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream
, !!mAsyncSource
)
81 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback
, !!mAsyncSource
)
82 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports
, nsITransport
)
88 nsInputStreamTransport::OpenInputStream(uint32_t flags
, uint32_t segsize
,
90 nsIInputStream
** result
) {
91 NS_ENSURE_TRUE(!mInProgress
, NS_ERROR_IN_PROGRESS
);
94 nsCOMPtr
<nsIEventTarget
> target
=
95 do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID
, &rv
);
96 if (NS_FAILED(rv
)) return rv
;
98 // XXX if the caller requests an unbuffered stream, then perhaps
99 // we'd want to simply return mSource; however, then we would
100 // not be reading mSource on a background thread. is this ok?
102 bool nonblocking
= !(flags
& OPEN_BLOCKING
);
104 net_ResolveSegmentParams(segsize
, segcount
);
106 nsCOMPtr
<nsIAsyncOutputStream
> pipeOut
;
107 NS_NewPipe2(getter_AddRefs(mPipeIn
), getter_AddRefs(pipeOut
), nonblocking
,
108 true, segsize
, segcount
);
112 // startup async copy process...
113 rv
= NS_AsyncCopy(this, pipeOut
, target
, NS_ASYNCCOPY_VIA_WRITESEGMENTS
,
118 *result
= do_AddRef(mPipeIn
).take();
123 nsInputStreamTransport::OpenOutputStream(uint32_t flags
, uint32_t segsize
,
125 nsIOutputStream
** result
) {
126 // this transport only supports reading!
127 MOZ_ASSERT_UNREACHABLE("nsInputStreamTransport::OpenOutputStream");
128 return NS_ERROR_UNEXPECTED
;
132 nsInputStreamTransport::Close(nsresult reason
) {
133 if (NS_SUCCEEDED(reason
)) reason
= NS_BASE_STREAM_CLOSED
;
135 return mPipeIn
->CloseWithStatus(reason
);
139 nsInputStreamTransport::SetEventSink(nsITransportEventSink
* sink
,
140 nsIEventTarget
* target
) {
141 NS_ENSURE_TRUE(!mInProgress
, NS_ERROR_IN_PROGRESS
);
144 return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink
), sink
,
152 /** nsIInputStream **/
155 nsInputStreamTransport::Close() {
156 if (mCloseWhenDone
) mSource
->Close();
158 // make additional reads return early...
164 nsInputStreamTransport::Available(uint64_t* result
) {
165 return NS_ERROR_NOT_IMPLEMENTED
;
169 nsInputStreamTransport::StreamStatus() { return mSource
->StreamStatus(); }
172 nsInputStreamTransport::Read(char* buf
, uint32_t count
, uint32_t* result
) {
173 nsresult rv
= mSource
->Read(buf
, count
, result
);
175 if (NS_SUCCEEDED(rv
)) {
178 mEventSink
->OnTransportStatus(this, NS_NET_STATUS_READING
, mOffset
, -1);
185 nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer
, void* closure
,
186 uint32_t count
, uint32_t* result
) {
187 return NS_ERROR_NOT_IMPLEMENTED
;
191 nsInputStreamTransport::IsNonBlocking(bool* result
) {
196 // nsIAsyncInputStream interface
199 nsInputStreamTransport::CloseWithStatus(nsresult aStatus
) { return Close(); }
202 nsInputStreamTransport::AsyncWait(nsIInputStreamCallback
* aCallback
,
203 uint32_t aFlags
, uint32_t aRequestedCount
,
204 nsIEventTarget
* aEventTarget
) {
205 NS_ENSURE_STATE(!!mAsyncSource
);
207 nsCOMPtr
<nsIInputStreamCallback
> callback
= aCallback
? this : nullptr;
210 MutexAutoLock
lock(mMutex
);
212 if (NS_WARN_IF(mAsyncWaitCallback
&& aCallback
&&
213 mAsyncWaitCallback
!= aCallback
)) {
214 return NS_ERROR_FAILURE
;
217 mAsyncWaitCallback
= aCallback
;
220 return mAsyncSource
->AsyncWait(callback
, aFlags
, aRequestedCount
,
224 // nsIInputStreamCallback
227 nsInputStreamTransport::OnInputStreamReady(nsIAsyncInputStream
* aStream
) {
228 nsCOMPtr
<nsIInputStreamCallback
> callback
;
230 MutexAutoLock
lock(mMutex
);
232 // We have been canceled in the meanwhile.
233 if (!mAsyncWaitCallback
) {
237 callback
.swap(mAsyncWaitCallback
);
240 MOZ_ASSERT(callback
);
241 return callback
->OnInputStreamReady(this);
244 //-----------------------------------------------------------------------------
245 // nsStreamTransportService
246 //-----------------------------------------------------------------------------
// Defaulted constructor: performs no work of its own. The thread pool is
// created later, in Init().
nsStreamTransportService::nsStreamTransportService() = default;
250 nsStreamTransportService::~nsStreamTransportService() {
251 NS_ASSERTION(!mPool
, "thread pool wasn't shutdown");
254 nsresult
nsStreamTransportService::Init() {
255 // Can't be used multithreaded before this
256 MOZ_PUSH_IGNORE_THREAD_SAFETY
258 mPool
= new nsThreadPool();
260 // Configure the pool
261 mPool
->SetName("StreamTrans"_ns
);
262 mPool
->SetThreadLimit(25);
263 mPool
->SetIdleThreadLimit(5);
264 mPool
->SetIdleThreadTimeoutRegressive(true);
265 mPool
->SetIdleThreadTimeout(PR_SecondsToInterval(30));
266 MOZ_POP_THREAD_SAFETY
268 nsCOMPtr
<nsIObserverService
> obsSvc
= mozilla::services::GetObserverService();
269 if (obsSvc
) obsSvc
->AddObserver(this, "xpcom-shutdown-threads", false);
273 NS_IMPL_ISUPPORTS(nsStreamTransportService
, nsIStreamTransportService
,
274 nsIEventTarget
, nsIObserver
)
277 nsStreamTransportService::DispatchFromScript(nsIRunnable
* task
,
279 nsCOMPtr
<nsIRunnable
> event(task
);
280 return Dispatch(event
.forget(), flags
);
284 nsStreamTransportService::Dispatch(already_AddRefed
<nsIRunnable
> task
,
286 nsCOMPtr
<nsIRunnable
> event(task
); // so it gets released on failure paths
287 nsCOMPtr
<nsIThreadPool
> pool
;
289 mozilla::MutexAutoLock
lock(mShutdownLock
);
291 return NS_ERROR_NOT_INITIALIZED
;
295 NS_ENSURE_TRUE(pool
, NS_ERROR_NOT_INITIALIZED
);
296 return pool
->Dispatch(event
.forget(), flags
);
300 nsStreamTransportService::DelayedDispatch(already_AddRefed
<nsIRunnable
> aEvent
,
302 return NS_ERROR_NOT_IMPLEMENTED
;
306 nsStreamTransportService::RegisterShutdownTask(nsITargetShutdownTask
*) {
307 return NS_ERROR_NOT_IMPLEMENTED
;
311 nsStreamTransportService::UnregisterShutdownTask(nsITargetShutdownTask
*) {
312 return NS_ERROR_NOT_IMPLEMENTED
;
316 nsStreamTransportService::IsOnCurrentThreadInfallible() {
317 nsCOMPtr
<nsIThreadPool
> pool
;
319 mozilla::MutexAutoLock
lock(mShutdownLock
);
325 return pool
->IsOnCurrentThread();
329 nsStreamTransportService::IsOnCurrentThread(bool* result
) {
330 nsCOMPtr
<nsIThreadPool
> pool
;
332 mozilla::MutexAutoLock
lock(mShutdownLock
);
334 return NS_ERROR_NOT_INITIALIZED
;
338 NS_ENSURE_TRUE(pool
, NS_ERROR_NOT_INITIALIZED
);
339 return pool
->IsOnCurrentThread(result
);
343 nsStreamTransportService::CreateInputTransport(nsIInputStream
* stream
,
345 nsITransport
** result
) {
346 RefPtr
<nsInputStreamTransport
> trans
=
347 new nsInputStreamTransport(stream
, closeWhenDone
);
348 trans
.forget(result
);
353 nsStreamTransportService::Observe(nsISupports
* subject
, const char* topic
,
354 const char16_t
* data
) {
355 NS_ASSERTION(strcmp(topic
, "xpcom-shutdown-threads") == 0, "oops");
358 nsCOMPtr
<nsIThreadPool
> pool
;
360 mozilla::MutexAutoLock
lock(mShutdownLock
);
362 pool
= mPool
.forget();
372 class AvailableEvent final
: public Runnable
{
374 AvailableEvent(nsIInputStream
* stream
, nsIInputAvailableCallback
* callback
)
375 : Runnable("net::AvailableEvent"),
378 mDoingCallback(false),
380 mResultForCallback(NS_OK
) {
381 mCallbackTarget
= GetCurrentSerialEventTarget();
384 NS_IMETHOD
Run() override
{
385 if (mDoingCallback
) {
387 mCallback
->OnInputAvailableComplete(mSize
, mResultForCallback
);
391 mResultForCallback
= mStream
->Available(&mSize
);
393 mDoingCallback
= true;
395 nsCOMPtr
<nsIRunnable
> event(this); // overly cute
396 mCallbackTarget
->Dispatch(event
.forget(), NS_DISPATCH_NORMAL
);
397 mCallbackTarget
= nullptr;
403 virtual ~AvailableEvent() = default;
405 nsCOMPtr
<nsIInputStream
> mStream
;
406 nsCOMPtr
<nsIInputAvailableCallback
> mCallback
;
407 nsCOMPtr
<nsIEventTarget
> mCallbackTarget
;
410 nsresult mResultForCallback
;
414 nsStreamTransportService::InputAvailable(nsIInputStream
* stream
,
415 nsIInputAvailableCallback
* callback
) {
416 nsCOMPtr
<nsIThreadPool
> pool
;
418 mozilla::MutexAutoLock
lock(mShutdownLock
);
420 return NS_ERROR_NOT_INITIALIZED
;
424 nsCOMPtr
<nsIRunnable
> event
= new AvailableEvent(stream
, callback
);
425 return pool
->Dispatch(event
.forget(), NS_DISPATCH_NORMAL
);
429 } // namespace mozilla