1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 #include "nsStreamTransportService.h"
7 #include "nsXPCOMCIDInternal.h"
8 #include "nsNetSegmentUtils.h"
9 #include "nsTransportUtils.h"
10 #include "nsStreamUtils.h"
14 #include "nsIAsyncInputStream.h"
15 #include "nsIAsyncOutputStream.h"
17 #include "nsITransport.h"
18 #include "nsIObserverService.h"
19 #include "nsThreadPool.h"
20 #include "mozilla/Components.h"
21 #include "mozilla/Services.h"
26 //-----------------------------------------------------------------------------
27 // nsInputStreamTransport
29 // Implements nsIInputStream as a wrapper around the real input stream. This
30 // allows the transport to support seeking, range-limiting, progress reporting,
31 // and close-when-done semantics while utilizing NS_AsyncCopy.
32 //-----------------------------------------------------------------------------
// Transport built around a caller-supplied nsIInputStream. Besides
// nsITransport, the object itself is the stream passed to NS_AsyncCopy in
// OpenInputStream, and it can relay async-wait notifications from the source.
34 class nsInputStreamTransport
: public nsITransport
,
35 public nsIAsyncInputStream
,
36 public nsIInputStreamCallback
{
38 NS_DECL_THREADSAFE_ISUPPORTS
40 NS_DECL_NSIINPUTSTREAM
41 NS_DECL_NSIASYNCINPUTSTREAM
42 NS_DECL_NSIINPUTSTREAMCALLBACK
// source:        the stream this transport reads from (stored in mSource).
// closeWhenDone: stored in mCloseWhenDone; Close() closes mSource only when
//                this is true.
44 nsInputStreamTransport(nsIInputStream
* source
, bool closeWhenDone
)
45 : mSource(source
), mCloseWhenDone(closeWhenDone
) {
// Query the source for nsIAsyncInputStream. The interface map and AsyncWait
// both test whether mAsyncSource ended up non-null.
46 mAsyncSource
= do_QueryInterface(mSource
);
50 virtual ~nsInputStreamTransport() = default;
// Held by AsyncWait and OnInputStreamReady while they touch
// mAsyncWaitCallback.
52 Mutex mMutex MOZ_UNANNOTATED
{"nsInputStreamTransport::mMutex"};
54 // This value is protected by mutex.
// Callback registered through AsyncWait and consumed by OnInputStreamReady.
55 nsCOMPtr
<nsIInputStreamCallback
> mAsyncWaitCallback
;
// Input end of the pipe created in OpenInputStream; this is the stream
// returned to the caller and the one Close(nsresult) closes.
57 nsCOMPtr
<nsIAsyncInputStream
> mPipeIn
;
59 // while the copy is active, these members may only be accessed from the
60 // nsIInputStream implementation.
// Sink installed by SetEventSink; Read reports progress to it.
61 nsCOMPtr
<nsITransportEventSink
> mEventSink
;
// The wrapped stream that Read, StreamStatus and Close forward to.
62 nsCOMPtr
<nsIInputStream
> mSource
;
// mSource viewed as nsIAsyncInputStream, when the query in the constructor
// yields one.
65 nsCOMPtr
<nsIAsyncInputStream
> mAsyncSource
;
// Fixed at construction; see Close().
68 const bool mCloseWhenDone
;
70 // this variable serves as a lock to prevent the state of the transport
71 // from being modified once the copy is in progress.
// Checked by OpenInputStream and SetEventSink, which fail with
// NS_ERROR_IN_PROGRESS when it is set.
72 bool mInProgress
{false};
// Reference counting and interface table for nsInputStreamTransport.
75 NS_IMPL_ADDREF(nsInputStreamTransport
);
76 NS_IMPL_RELEASE(nsInputStreamTransport
);
78 NS_INTERFACE_MAP_BEGIN(nsInputStreamTransport
)
79 NS_INTERFACE_MAP_ENTRY(nsITransport
)
80 NS_INTERFACE_MAP_ENTRY(nsIInputStream
)
// The two async interfaces are conditional on mAsyncSource being non-null,
// i.e. on the wrapped source itself being an nsIAsyncInputStream.
81 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream
, !!mAsyncSource
)
82 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback
, !!mAsyncSource
)
// nsISupports is reachable through more than one base, so it is resolved
// via the nsITransport base.
83 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports
, nsITransport
)
// nsITransport::OpenInputStream.
//
// Creates a pipe, starts an NS_AsyncCopy from this transport into the pipe's
// output end using the stream transport service as the copy target, and
// returns the pipe's input end (mPipeIn) through result.
//
// flags:   OPEN_BLOCKING selects a blocking pipe; otherwise nonblocking.
// segsize: passed through net_ResolveSegmentParams before the pipe is built.
// Fails with NS_ERROR_IN_PROGRESS when mInProgress is already set, or with
// the error from obtaining the stream transport service.
89 nsInputStreamTransport::OpenInputStream(uint32_t flags
, uint32_t segsize
,
91 nsIInputStream
** result
) {
92 NS_ENSURE_TRUE(!mInProgress
, NS_ERROR_IN_PROGRESS
);
// The stream transport service doubles as the event target the copy runs on.
95 nsCOMPtr
<nsIEventTarget
> target
;
96 target
= mozilla::components::StreamTransport::Service(&rv
);
97 if (NS_FAILED(rv
)) return rv
;
99 // XXX if the caller requests an unbuffered stream, then perhaps
100 // we'd want to simply return mSource; however, then we would
101 // not be reading mSource on a background thread. is this ok?
// nonblocking is true unless the caller asked for OPEN_BLOCKING.
103 bool nonblocking
= !(flags
& OPEN_BLOCKING
);
105 net_ResolveSegmentParams(segsize
, segcount
);
// mPipeIn receives the reader end; pipeOut is the writer end fed by the copy.
107 nsCOMPtr
<nsIAsyncOutputStream
> pipeOut
;
108 NS_NewPipe2(getter_AddRefs(mPipeIn
), getter_AddRefs(pipeOut
), nonblocking
,
109 true, segsize
, segcount
);
113 // startup async copy process...
// `this` is the copy source, so the copy reads through this object's
// nsIInputStream implementation.
114 rv
= NS_AsyncCopy(this, pipeOut
, target
, NS_ASYNCCOPY_VIA_WRITESEGMENTS
,
// Hand the caller its own reference to the pipe's input end.
119 *result
= do_AddRef(mPipeIn
).take();
// nsITransport::OpenOutputStream. Not supported by this read-only transport:
// hits MOZ_ASSERT_UNREACHABLE and returns NS_ERROR_UNEXPECTED. All three
// parameters are unused.
124 nsInputStreamTransport::OpenOutputStream(uint32_t flags
, uint32_t segsize
,
126 nsIOutputStream
** result
) {
127 // this transport only supports reading!
128 MOZ_ASSERT_UNREACHABLE("nsInputStreamTransport::OpenOutputStream");
129 return NS_ERROR_UNEXPECTED
;
// nsITransport::Close. Closes the pipe's input end (mPipeIn) with `reason`
// and returns the result of that call.
133 nsInputStreamTransport::Close(nsresult reason
) {
// A success code is not a meaningful close status; substitute
// NS_BASE_STREAM_CLOSED.
134 if (NS_SUCCEEDED(reason
)) reason
= NS_BASE_STREAM_CLOSED
;
// NOTE(review): mPipeIn is only assigned in OpenInputStream and is
// dereferenced here without a null check — confirm callers never close a
// transport whose input stream was not opened.
136 return mPipeIn
->CloseWithStatus(reason
);
// nsITransport::SetEventSink. Installs the sink that Read reports progress
// to. Fails with NS_ERROR_IN_PROGRESS once mInProgress is set.
140 nsInputStreamTransport::SetEventSink(nsITransportEventSink
* sink
,
141 nsIEventTarget
* target
) {
142 NS_ENSURE_TRUE(!mInProgress
, NS_ERROR_IN_PROGRESS
);
// mEventSink receives the object produced by net_NewTransportEventSinkProxy
// for `sink`, not `sink` itself. Presumably the proxy delivers notifications
// on `target` — TODO confirm.
145 return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink
), sink
,
153 /** nsIInputStream **/
// nsIInputStream::Close. The wrapped source is closed only when the transport
// was constructed with closeWhenDone == true.
156 nsInputStreamTransport::Close() {
157 if (mCloseWhenDone
) mSource
->Close();
159 // make additional reads return early...
// nsIInputStream::Available. Not supported: always returns
// NS_ERROR_NOT_IMPLEMENTED and leaves *result untouched.
165 nsInputStreamTransport::Available(uint64_t* result
) {
166 return NS_ERROR_NOT_IMPLEMENTED
;
// nsIInputStream::StreamStatus. Forwards directly to the wrapped source.
170 nsInputStreamTransport::StreamStatus() { return mSource
->StreamStatus(); }
// nsIInputStream::Read. Reads up to `count` bytes from the wrapped source
// into `buf`; the source stores the number of bytes read in *result.
173 nsInputStreamTransport::Read(char* buf
, uint32_t count
, uint32_t* result
) {
174 nsresult rv
= mSource
->Read(buf
, count
, result
);
// Progress is reported only for successful reads.
176 if (NS_SUCCEEDED(rv
)) {
// Reports NS_NET_STATUS_READING with mOffset as the progress value and -1 as
// the maximum (presumably meaning "total unknown" — TODO confirm).
179 mEventSink
->OnTransportStatus(this, NS_NET_STATUS_READING
, mOffset
, -1);
// nsIInputStream::ReadSegments. Not supported: always returns
// NS_ERROR_NOT_IMPLEMENTED without invoking `writer`.
186 nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer
, void* closure
,
187 uint32_t count
, uint32_t* result
) {
188 return NS_ERROR_NOT_IMPLEMENTED
;
// nsIInputStream::IsNonBlocking implementation for the transport; the answer
// is returned through `result`.
192 nsInputStreamTransport::IsNonBlocking(bool* result
) {
197 // nsIAsyncInputStream interface
// nsIAsyncInputStream::CloseWithStatus. aStatus is ignored; this simply
// delegates to the argument-less Close().
200 nsInputStreamTransport::CloseWithStatus(nsresult aStatus
) { return Close(); }
// nsIAsyncInputStream::AsyncWait.
//
// Records aCallback and asks the wrapped async source to notify this object
// instead; OnInputStreamReady later relays the notification to the recorded
// callback. Passing a null aCallback clears the recorded callback and forwards
// a null callback to the source.
//
// Returns an error via NS_ENSURE_STATE when the source is not async, and
// NS_ERROR_FAILURE when a different callback is already pending.
203 nsInputStreamTransport::AsyncWait(nsIInputStreamCallback
* aCallback
,
204 uint32_t aFlags
, uint32_t aRequestedCount
,
205 nsIEventTarget
* aEventTarget
) {
206 NS_ENSURE_STATE(!!mAsyncSource
);
// The source is given `this`, never the caller's callback directly.
208 nsCOMPtr
<nsIInputStreamCallback
> callback
= aCallback
? this : nullptr;
211 MutexAutoLock
lock(mMutex
);
// Only one distinct callback may be pending at a time; re-registering the
// same callback, or clearing with null, is allowed.
213 if (NS_WARN_IF(mAsyncWaitCallback
&& aCallback
&&
214 mAsyncWaitCallback
!= aCallback
)) {
215 return NS_ERROR_FAILURE
;
218 mAsyncWaitCallback
= aCallback
;
221 return mAsyncSource
->AsyncWait(callback
, aFlags
, aRequestedCount
,
225 // nsIInputStreamCallback
// nsIInputStreamCallback::OnInputStreamReady. Takes the callback recorded by
// AsyncWait and notifies it, passing `this` as the ready stream rather than
// aStream.
228 nsInputStreamTransport::OnInputStreamReady(nsIAsyncInputStream
* aStream
) {
229 nsCOMPtr
<nsIInputStreamCallback
> callback
;
231 MutexAutoLock
lock(mMutex
);
233 // We have been canceled in the meanwhile.
234 if (!mAsyncWaitCallback
) {
// Swapping with the empty local moves the pending callback out and leaves
// mAsyncWaitCallback null, so each registration is delivered at most once.
238 callback
.swap(mAsyncWaitCallback
);
241 MOZ_ASSERT(callback
);
242 return callback
->OnInputStreamReady(this);
245 //-----------------------------------------------------------------------------
246 // nsStreamTransportService
247 //-----------------------------------------------------------------------------
// Defaulted constructor; the thread pool is created later, in Init().
249 nsStreamTransportService::nsStreamTransportService() = default;
// Asserts that mPool has already been cleared by the time the service is
// destroyed (Observe moves it out of mPool).
251 nsStreamTransportService::~nsStreamTransportService() {
252 NS_ASSERTION(!mPool
, "thread pool wasn't shutdown");
// Creates and configures the service's thread pool, then registers the
// service as an observer of "xpcom-shutdown-threads".
255 nsresult
nsStreamTransportService::Init() {
256 // Can't be used multithreaded before this
// Thread-safety analysis is suppressed for the unlocked mPool accesses below.
257 MOZ_PUSH_IGNORE_THREAD_SAFETY
259 mPool
= new nsThreadPool();
261 // Configure the pool
262 mPool
->SetName("StreamTrans"_ns
);
263 // TODO: Make these settings configurable.
// At most 25 threads, of which up to 4 may stay idle. The two timeout values
// are presumably in milliseconds (30 s and 0.5 s) — TODO confirm units.
264 mPool
->SetThreadLimit(25);
265 mPool
->SetIdleThreadLimit(4);
266 mPool
->SetIdleThreadMaximumTimeout(30 * 1000);
267 mPool
->SetIdleThreadGraceTimeout(500);
268 MOZ_POP_THREAD_SAFETY
// Registration is skipped when no observer service is available.
270 nsCOMPtr
<nsIObserverService
> obsSvc
= mozilla::services::GetObserverService();
271 if (obsSvc
) obsSvc
->AddObserver(this, "xpcom-shutdown-threads", false);
// The service is exposed as nsIStreamTransportService, as an nsIEventTarget
// (see Dispatch) and as an nsIObserver (see Observe).
275 NS_IMPL_ISUPPORTS(nsStreamTransportService
, nsIStreamTransportService
,
276 nsIEventTarget
, nsIObserver
)
// nsIEventTarget::DispatchFromScript. Takes a strong reference to the raw
// `task` pointer and forwards it, with `flags`, to Dispatch().
279 nsStreamTransportService::DispatchFromScript(nsIRunnable
* task
,
281 nsCOMPtr
<nsIRunnable
> event(task
);
282 return Dispatch(event
.forget(), flags
);
// nsIEventTarget::Dispatch. Forwards the runnable to the thread pool and
// returns the pool's result; returns NS_ERROR_NOT_INITIALIZED when no pool is
// available.
286 nsStreamTransportService::Dispatch(already_AddRefed
<nsIRunnable
> task
,
288 nsCOMPtr
<nsIRunnable
> event(task
); // so it gets released on failure paths
289 nsCOMPtr
<nsIThreadPool
> pool
;
// mShutdownLock is the same lock Observe holds while it clears mPool.
// Presumably `pool` is a snapshot of mPool taken under it — TODO confirm.
291 mozilla::MutexAutoLock
lock(mShutdownLock
);
293 return NS_ERROR_NOT_INITIALIZED
;
297 NS_ENSURE_TRUE(pool
, NS_ERROR_NOT_INITIALIZED
);
// Ownership of the runnable passes to the pool only on this path.
298 return pool
->Dispatch(event
.forget(), flags
);
// nsIEventTarget::DelayedDispatch. Not supported: returns
// NS_ERROR_NOT_IMPLEMENTED.
302 nsStreamTransportService::DelayedDispatch(already_AddRefed
<nsIRunnable
> aEvent
,
304 return NS_ERROR_NOT_IMPLEMENTED
;
// nsIEventTarget::RegisterShutdownTask. Not supported: the task argument is
// ignored and NS_ERROR_NOT_IMPLEMENTED is returned.
308 nsStreamTransportService::RegisterShutdownTask(nsITargetShutdownTask
*) {
309 return NS_ERROR_NOT_IMPLEMENTED
;
// nsIEventTarget::UnregisterShutdownTask. Not supported: the task argument is
// ignored and NS_ERROR_NOT_IMPLEMENTED is returned.
313 nsStreamTransportService::UnregisterShutdownTask(nsITargetShutdownTask
*) {
314 return NS_ERROR_NOT_IMPLEMENTED
;
// nsIEventTarget::IsOnCurrentThreadInfallible. Answers by asking the thread
// pool whether the calling thread is one of its threads.
318 nsStreamTransportService::IsOnCurrentThreadInfallible() {
319 nsCOMPtr
<nsIThreadPool
> pool
;
// mShutdownLock is the same lock Observe holds while it clears mPool.
321 mozilla::MutexAutoLock
lock(mShutdownLock
);
327 return pool
->IsOnCurrentThread();
// nsIEventTarget::IsOnCurrentThread. Forwards to the thread pool, which
// writes the answer to *result; returns NS_ERROR_NOT_INITIALIZED when no pool
// is available.
331 nsStreamTransportService::IsOnCurrentThread(bool* result
) {
332 nsCOMPtr
<nsIThreadPool
> pool
;
// mShutdownLock is the same lock Observe holds while it clears mPool.
334 mozilla::MutexAutoLock
lock(mShutdownLock
);
336 return NS_ERROR_NOT_INITIALIZED
;
340 NS_ENSURE_TRUE(pool
, NS_ERROR_NOT_INITIALIZED
);
341 return pool
->IsOnCurrentThread(result
);
// nsIStreamTransportService::CreateInputTransport. Wraps `stream` in a new
// nsInputStreamTransport, passing closeWhenDone through to its constructor,
// and transfers the new object's reference to *result.
345 nsStreamTransportService::CreateInputTransport(nsIInputStream
* stream
,
347 nsITransport
** result
) {
348 RefPtr
<nsInputStreamTransport
> trans
=
349 new nsInputStreamTransport(stream
, closeWhenDone
);
// forget() hands ownership to the caller without an extra AddRef/Release.
350 trans
.forget(result
);
// nsIObserver::Observe. Init() registers the service only for
// "xpcom-shutdown-threads", and the assertion below checks that no other topic
// arrives. The handler detaches the thread pool from the service.
355 nsStreamTransportService::Observe(nsISupports
* subject
, const char* topic
,
356 const char16_t
* data
) {
357 NS_ASSERTION(strcmp(topic
, "xpcom-shutdown-threads") == 0, "oops");
360 nsCOMPtr
<nsIThreadPool
> pool
;
362 mozilla::MutexAutoLock
lock(mShutdownLock
);
// Moving mPool into the local leaves mPool null while mShutdownLock is held,
// which is the state the destructor asserts.
364 pool
= mPool
.forget();
375 } // namespace mozilla