1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 #include "NonBlockingAsyncInputStream.h"
7 #include "mozilla/ipc/InputStreamUtils.h"
8 #include "nsIAsyncInputStream.h"
9 #include "nsICloneableInputStream.h"
10 #include "nsIInputStream.h"
11 #include "nsIIPCSerializableInputStream.h"
12 #include "nsISeekableStream.h"
13 #include "nsStreamUtils.h"
// Cancelable runnable used to deliver a stream-ready notification. It holds a
// strong reference to the stream and to the nsIInputStreamCallback, and hands
// the callback back to the stream through RunAsyncWaitCallback().
// NOTE(review): parts of this class (the signature of the method containing
// the RunAsyncWaitCallback() call, the body of Cancel() and the closing brace)
// are not visible in this view.
19 class NonBlockingAsyncInputStream::AsyncWaitRunnable final
20 : public CancelableRunnable
{
21 RefPtr
<NonBlockingAsyncInputStream
> mStream
;
22 nsCOMPtr
<nsIInputStreamCallback
> mCallback
;
// Constructor: registers the runnable under the name "AsyncWaitRunnable" and
// stores the callback to be notified.
25 AsyncWaitRunnable(NonBlockingAsyncInputStream
* aStream
,
26 nsIInputStreamCallback
* aCallback
)
27 : CancelableRunnable("AsyncWaitRunnable"),
29 mCallback(aCallback
) {}
// Passes this runnable plus ownership of the callback (forget()) to the
// stream, which decides whether the notification is still wanted.
33 mStream
->RunAsyncWaitCallback(this, mCallback
.forget());
// Cancel() override of CancelableRunnable; its body is not visible here.
37 nsresult
Cancel() override
{
// Reference counting for NonBlockingAsyncInputStream is supplied by the
// NS_IMPL_ADDREF / NS_IMPL_RELEASE macros.
43 NS_IMPL_ADDREF(NonBlockingAsyncInputStream
);
44 NS_IMPL_RELEASE(NonBlockingAsyncInputStream
);
// WaitClosureOnly constructor: records the runnable and the event target in
// mRunnable and mEventTarget. Close() later moves both out of this object.
//   aRunnable    - runnable to keep.
//   aEventTarget - target to keep; Close() checks it for null before
//                  dispatching to it.
46 NonBlockingAsyncInputStream::WaitClosureOnly::WaitClosureOnly(
47 AsyncWaitRunnable
* aRunnable
, nsIEventTarget
* aEventTarget
)
48 : mRunnable(aRunnable
), mEventTarget(aEventTarget
) {}
// Interface map for NonBlockingAsyncInputStream.
// nsIInputStream and nsIAsyncInputStream are listed unconditionally. The
// cloneable, IPC-serializable, seekable and tellable entries are conditional
// on the matching mWeak* pointer, which the constructor sets only when the
// wrapped stream provides that interface. nsISupports is resolved through
// nsIInputStream.
// NOTE(review): the terminator of the map is not visible in this view.
50 NS_INTERFACE_MAP_BEGIN(NonBlockingAsyncInputStream
)
51 NS_INTERFACE_MAP_ENTRY(nsIInputStream
)
52 NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream
)
53 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream
,
54 mWeakCloneableInputStream
)
55 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream
,
56 mWeakIPCSerializableInputStream
)
57 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream
,
58 mWeakSeekableInputStream
)
59 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsITellableStream
,
60 mWeakTellableInputStream
)
61 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports
, nsIInputStream
)
// Factory: wraps aInputStream in a new NonBlockingAsyncInputStream and hands
// it out through aResult.
//   aInputStream - stream to wrap; ownership is taken by this call. It is
//                  asserted (diagnostic builds) to be non-blocking and not
//                  already an nsIAsyncInputStream.
//   aResult      - out-param receiving the wrapper; asserted non-null.
// Returns an nsresult.
// NOTE(review): the body of the failure branch and the final return statement
// are not visible in this view.
65 nsresult
NonBlockingAsyncInputStream::Create(
66 already_AddRefed
<nsIInputStream
> aInputStream
,
67 nsIAsyncInputStream
** aResult
) {
68 MOZ_DIAGNOSTIC_ASSERT(aResult
);
70 nsCOMPtr
<nsIInputStream
> inputStream
= std::move(aInputStream
);
// Ask the stream whether it is non-blocking; a failure here is warned about.
72 bool nonBlocking
= false;
73 nsresult rv
= inputStream
->IsNonBlocking(&nonBlocking
);
74 if (NS_WARN_IF(NS_FAILED(rv
))) {
78 MOZ_DIAGNOSTIC_ASSERT(nonBlocking
);
// Diagnostic-only check: the stream being wrapped must not already implement
// nsIAsyncInputStream.
80 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
81 nsCOMPtr
<nsIAsyncInputStream
> asyncInputStream
=
82 do_QueryInterface(inputStream
);
83 MOZ_DIAGNOSTIC_ASSERT(!asyncInputStream
);
84 #endif // MOZ_DIAGNOSTIC_ASSERT_ENABLED
// Build the wrapper, transferring the stream into it, then transfer the
// wrapper to the caller.
86 RefPtr
<NonBlockingAsyncInputStream
> stream
=
87 new NonBlockingAsyncInputStream(inputStream
.forget());
89 stream
.forget(aResult
);
// Constructor: takes ownership of aInputStream (asserted non-null) and, for
// each optional interface (cloneable, IPC-serializable, seekable, tellable),
// queries the wrapped stream and caches the resulting pointer in the matching
// mWeak* member. A pointer is cached only when the queried interface has the
// same COM identity as mInputStream; otherwise the member stays nullptr.
// NOTE(review): the end of the initializer list and the closing braces of the
// if-blocks are not visible in this view.
93 NonBlockingAsyncInputStream::NonBlockingAsyncInputStream(
94 already_AddRefed
<nsIInputStream
> aInputStream
)
95 : mInputStream(std::move(aInputStream
)),
96 mWeakCloneableInputStream(nullptr),
97 mWeakIPCSerializableInputStream(nullptr),
98 mWeakSeekableInputStream(nullptr),
99 mWeakTellableInputStream(nullptr),
100 mLock("NonBlockingAsyncInputStream::mLock"),
102 MOZ_ASSERT(mInputStream
);
// nsICloneableInputStream support.
104 nsCOMPtr
<nsICloneableInputStream
> cloneableStream
=
105 do_QueryInterface(mInputStream
);
106 if (cloneableStream
&& SameCOMIdentity(mInputStream
, cloneableStream
)) {
107 mWeakCloneableInputStream
= cloneableStream
;
// nsIIPCSerializableInputStream support.
110 nsCOMPtr
<nsIIPCSerializableInputStream
> serializableStream
=
111 do_QueryInterface(mInputStream
);
112 if (serializableStream
&& SameCOMIdentity(mInputStream
, serializableStream
)) {
113 mWeakIPCSerializableInputStream
= serializableStream
;
// nsISeekableStream support.
116 nsCOMPtr
<nsISeekableStream
> seekableStream
= do_QueryInterface(mInputStream
);
117 if (seekableStream
&& SameCOMIdentity(mInputStream
, seekableStream
)) {
118 mWeakSeekableInputStream
= seekableStream
;
// nsITellableStream support.
121 nsCOMPtr
<nsITellableStream
> tellableStream
= do_QueryInterface(mInputStream
);
122 if (tellableStream
&& SameCOMIdentity(mInputStream
, tellableStream
)) {
123 mWeakTellableInputStream
= tellableStream
;
// Defaulted destructor: no explicit teardown is written here.
127 NonBlockingAsyncInputStream::~NonBlockingAsyncInputStream() = default;
// Closes the wrapped stream and, if a WAIT_CLOSURE_ONLY waiter was parked by
// AsyncWait(), fires it.
// The parked runnable and its event target are moved into locals while mLock
// is held; the runnable is then either dispatched to the target or run
// directly.
// NOTE(review): the scope braces around the locked region, the already-closed
// early return, the else between Dispatch() and Run(), and the return
// statements are not visible in this view.
130 NonBlockingAsyncInputStream::Close() {
131 RefPtr
<AsyncWaitRunnable
> waitClosureOnlyRunnable
;
132 nsCOMPtr
<nsIEventTarget
> waitClosureOnlyEventTarget
;
135 MutexAutoLock
lock(mLock
);
138 // Here we could return NS_BASE_STREAM_CLOSED as well, but just to avoid
139 // warning messages, let's make everybody happy with a NS_OK.
145 NS_ENSURE_STATE(mInputStream
);
146 nsresult rv
= mInputStream
->Close();
// If closing the wrapped stream fails, the parked waiter is dropped.
147 if (NS_WARN_IF(NS_FAILED(rv
))) {
148 mWaitClosureOnly
.reset();
152 // If we have a WaitClosureOnly runnable, it's time to use it.
153 if (mWaitClosureOnly
.isSome()) {
154 waitClosureOnlyRunnable
= std::move(mWaitClosureOnly
->mRunnable
);
155 waitClosureOnlyEventTarget
= std::move(mWaitClosureOnly
->mEventTarget
);
157 mWaitClosureOnly
.reset();
// Marking the runnable as the current waiter lets RunAsyncWaitCallback()
// accept it instead of treating it as canceled.
159 // Now we want to dispatch the asyncWaitCallback.
160 mAsyncWaitCallback
= waitClosureOnlyRunnable
;
// Deliver the notification: via the event target when one was supplied,
// presumably by a direct Run() otherwise.
164 if (waitClosureOnlyRunnable
) {
165 if (waitClosureOnlyEventTarget
) {
166 waitClosureOnlyEventTarget
->Dispatch(waitClosureOnlyRunnable
,
169 waitClosureOnlyRunnable
->Run();
176 // nsIInputStream interface
// Forwards Available() to the wrapped stream.
//   aLength - out-param filled in by the wrapped stream.
// When there is nothing more to read, the wrapped stream is closed under mLock
// and NS_BASE_STREAM_CLOSED is returned.
// NOTE(review): the body of the error branch, the condition guarding the
// "nothing more to read" path and the success return are not visible in this
// view.
179 NonBlockingAsyncInputStream::Available(uint64_t* aLength
) {
180 nsresult rv
= mInputStream
->Available(aLength
);
181 // Don't issue warnings for legal condition NS_BASE_STREAM_CLOSED.
182 if (rv
== NS_BASE_STREAM_CLOSED
|| NS_WARN_IF(NS_FAILED(rv
))) {
186 // Nothing more to read. Let's close the stream now.
188 MutexAutoLock
lock(mLock
);
189 mInputStream
->Close();
191 return NS_BASE_STREAM_CLOSED
;
// Returns the wrapped stream's StreamStatus() result unchanged.
198 NonBlockingAsyncInputStream::StreamStatus() {
199 return mInputStream
->StreamStatus();
// Forwards Read() to the wrapped stream with the same arguments and returns
// its result.
//   aBuffer    - destination buffer.
//   aCount     - passed through to the wrapped stream.
//   aReadCount - out-param filled in by the wrapped stream.
203 NonBlockingAsyncInputStream::Read(char* aBuffer
, uint32_t aCount
,
204 uint32_t* aReadCount
) {
205 return mInputStream
->Read(aBuffer
, aCount
, aReadCount
);
// Bundle passed as the closure to ReadSegmentsWriter(): it carries the wrapper
// stream, the caller's original writer function and the caller's original
// closure. Marked MOZ_RAII.
// NOTE(review): the access specifiers, the mClosure declaration and the
// closing brace are not visible in this view.
210 class MOZ_RAII ReadSegmentsData
{
212 ReadSegmentsData(NonBlockingAsyncInputStream
* aStream
,
213 nsWriteSegmentFun aFunc
, void* aClosure
)
214 : mStream(aStream
), mFunc(aFunc
), mClosure(aClosure
) {}
// Raw pointer to the wrapper stream, and the caller-supplied writer.
216 NonBlockingAsyncInputStream
* mStream
;
217 nsWriteSegmentFun mFunc
;
// Writer function handed to the wrapped stream's ReadSegments(). It recovers
// the ReadSegmentsData from aClosure and calls the caller's original writer,
// substituting the wrapper stream (data->mStream) for aInStream and the
// caller's own closure for aClosure. The remaining arguments are passed
// through unchanged, and the original writer's result is returned.
221 nsresult
ReadSegmentsWriter(nsIInputStream
* aInStream
, void* aClosure
,
222 const char* aFromSegment
, uint32_t aToOffset
,
223 uint32_t aCount
, uint32_t* aWriteCount
) {
224 ReadSegmentsData
* data
= static_cast<ReadSegmentsData
*>(aClosure
);
225 return data
->mFunc(data
->mStream
, data
->mClosure
, aFromSegment
, aToOffset
,
226 aCount
, aWriteCount
);
// Forwards ReadSegments() to the wrapped stream, interposing
// ReadSegmentsWriter so the caller's writer is invoked with this wrapper
// rather than the wrapped stream.
//   aWriter  - caller's writer function.
//   aClosure - caller's closure, handed back to aWriter.
//   aCount   - passed through to the wrapped stream.
// `data` lives on the stack for the duration of the call only.
// NOTE(review): the line declaring the aResult parameter is not visible in
// this view.
232 NonBlockingAsyncInputStream::ReadSegments(nsWriteSegmentFun aWriter
,
233 void* aClosure
, uint32_t aCount
,
235 ReadSegmentsData
data(this, aWriter
, aClosure
);
236 return mInputStream
->ReadSegments(ReadSegmentsWriter
, &data
, aCount
, aResult
);
// Always reports the stream as non-blocking: *aNonBlocking is set to true
// without consulting the wrapped stream.
240 NonBlockingAsyncInputStream::IsNonBlocking(bool* aNonBlocking
) {
241 *aNonBlocking
= true;
245 // nsICloneableInputStream interface
// Forwards GetCloneable() to the wrapped stream's nsICloneableInputStream.
// NS_ENSURE_STATE guards against the wrapped stream not being cloneable
// (mWeakCloneableInputStream unset).
248 NonBlockingAsyncInputStream::GetCloneable(bool* aCloneable
) {
249 NS_ENSURE_STATE(mWeakCloneableInputStream
);
250 return mWeakCloneableInputStream
->GetCloneable(aCloneable
);
// Clones the wrapped stream and wraps the clone in a new
// NonBlockingAsyncInputStream via Create(), so the result offers the same
// async interface as this object.
//   aResult - out-param receiving the wrapped clone.
// NS_ENSURE_STATE guards against the wrapped stream not being cloneable.
// NOTE(review): the bodies of the two failure branches and the final return
// are not visible in this view.
254 NonBlockingAsyncInputStream::Clone(nsIInputStream
** aResult
) {
255 NS_ENSURE_STATE(mWeakCloneableInputStream
);
// Step 1: clone the underlying stream.
257 nsCOMPtr
<nsIInputStream
> clonedStream
;
258 nsresult rv
= mWeakCloneableInputStream
->Clone(getter_AddRefs(clonedStream
));
259 if (NS_WARN_IF(NS_FAILED(rv
))) {
// Step 2: wrap the clone; ownership of clonedStream moves into Create().
263 nsCOMPtr
<nsIAsyncInputStream
> asyncStream
;
264 rv
= Create(clonedStream
.forget(), getter_AddRefs(asyncStream
));
265 if (NS_WARN_IF(NS_FAILED(rv
))) {
269 asyncStream
.forget(aResult
);
273 // nsIAsyncInputStream interface
// nsIAsyncInputStream::CloseWithStatus implementation.
//   aStatus - status supplied by the caller.
// NOTE(review): the body is not visible in this view, so how aStatus is used
// cannot be confirmed from here.
276 NonBlockingAsyncInputStream::CloseWithStatus(nsresult aStatus
) {
// Registers aCallback to be notified about this stream.
//   aCallback       - callback to notify.
//   aRequestedCount - not referenced in the visible lines.
//   aEventTarget    - if used, the notification is dispatched to it;
//                     the alternative path runs the runnable directly.
// Under mLock, any previously registered waiter is dropped first. If the
// wrapped stream reports success with zero bytes available, it is closed.
// A new AsyncWaitRunnable is then created. With WAIT_CLOSURE_ONLY on a stream
// that is not yet closed, the runnable and target are parked in
// mWaitClosureOnly for Close() to fire; otherwise the runnable becomes the
// current mAsyncWaitCallback.
// NOTE(review): the aFlags parameter line, the declaration of `length`, the
// null-callback handling and the braces/returns that separate the branches are
// not visible in this view.
281 NonBlockingAsyncInputStream::AsyncWait(nsIInputStreamCallback
* aCallback
,
283 uint32_t aRequestedCount
,
284 nsIEventTarget
* aEventTarget
) {
285 RefPtr
<AsyncWaitRunnable
> runnable
;
287 MutexAutoLock
lock(mLock
);
// Drop whatever was registered by an earlier AsyncWait() call.
289 mWaitClosureOnly
.reset();
290 mAsyncWaitCallback
= nullptr;
293 // Canceling previous callbacks, which is done above.
297 // Maybe the stream is already closed.
300 nsresult rv
= mInputStream
->Available(&length
);
301 if (NS_SUCCEEDED(rv
) && length
== 0) {
302 mInputStream
->Close();
307 runnable
= new AsyncWaitRunnable(this, aCallback
);
// Closure-only waiters on a still-open stream are parked until Close().
308 if ((aFlags
& nsIAsyncInputStream::WAIT_CLOSURE_ONLY
) && !mClosed
) {
309 mWaitClosureOnly
.emplace(runnable
, aEventTarget
);
313 mAsyncWaitCallback
= runnable
;
316 MOZ_ASSERT(runnable
);
// Deliver through the event target, or by running the runnable directly.
319 return aEventTarget
->Dispatch(runnable
.forget());
322 return runnable
->Run();
325 // nsIIPCSerializableInputStream
// Delegates to InputStreamHelper::SerializedComplexity() for the wrapped
// stream, passing every argument through unchanged.
//   aMaxSize                          - passed through.
//   aSizeUsed, aPipes, aTransferables - out-params filled in by the helper.
327 void NonBlockingAsyncInputStream::SerializedComplexity(
328 uint32_t aMaxSize
, uint32_t* aSizeUsed
, uint32_t* aPipes
,
329 uint32_t* aTransferables
) {
330 InputStreamHelper::SerializedComplexity(mInputStream
, aMaxSize
, aSizeUsed
,
331 aPipes
, aTransferables
);
// Serializes the wrapped stream (not this wrapper) into aParams through
// InputStreamHelper::SerializeInputStream().
// Asserts that the wrapped stream is IPC-serializable.
// NOTE(review): the trailing arguments of the helper call are not visible in
// this view.
334 void NonBlockingAsyncInputStream::Serialize(
335 mozilla::ipc::InputStreamParams
& aParams
, uint32_t aMaxSize
,
336 uint32_t* aSizeUsed
) {
337 MOZ_ASSERT(mWeakIPCSerializableInputStream
);
338 InputStreamHelper::SerializeInputStream(mInputStream
, aParams
, aMaxSize
,
// Deserialization is not supported for this class: calling this method
// deliberately crashes via MOZ_CRASH. aParams is not used.
342 bool NonBlockingAsyncInputStream::Deserialize(
343 const mozilla::ipc::InputStreamParams
& aParams
) {
344 MOZ_CRASH("NonBlockingAsyncInputStream cannot be deserialized!");
// Forwards Seek() to the wrapped stream's nsISeekableStream and returns its
// result. NS_ENSURE_STATE guards against the wrapped stream not being
// seekable (mWeakSeekableInputStream unset).
351 NonBlockingAsyncInputStream::Seek(int32_t aWhence
, int64_t aOffset
) {
352 NS_ENSURE_STATE(mWeakSeekableInputStream
);
353 return mWeakSeekableInputStream
->Seek(aWhence
, aOffset
);
// SetEOF() is not supported: once the seekable-stream state check passes, this
// returns NS_ERROR_NOT_IMPLEMENTED without calling into the wrapped stream.
357 NonBlockingAsyncInputStream::SetEOF() {
358 NS_ENSURE_STATE(mWeakSeekableInputStream
);
359 return NS_ERROR_NOT_IMPLEMENTED
;
// Forwards Tell() to the wrapped stream's nsITellableStream and returns its
// result. NS_ENSURE_STATE guards against the wrapped stream not being
// tellable (mWeakTellableInputStream unset).
//   aResult - out-param filled in by the wrapped stream.
365 NonBlockingAsyncInputStream::Tell(int64_t* aResult
) {
366 NS_ENSURE_STATE(mWeakTellableInputStream
);
367 return mWeakTellableInputStream
->Tell(aResult
);
// Called by an AsyncWaitRunnable to deliver its notification.
//   aRunnable - the runnable making the call; compared against
//               mAsyncWaitCallback to detect cancellation.
//   aCallback - callback to notify; ownership is taken by this call.
// Under mLock, a runnable that is no longer the registered waiter is treated
// as canceled. Otherwise the registration is cleared and the callback's
// OnInputStreamReady() is invoked with this stream.
// NOTE(review): the scope braces around the locked region and the early
// return in the canceled branch are not visible in this view.
370 void NonBlockingAsyncInputStream::RunAsyncWaitCallback(
371 NonBlockingAsyncInputStream::AsyncWaitRunnable
* aRunnable
,
372 already_AddRefed
<nsIInputStreamCallback
> aCallback
) {
373 nsCOMPtr
<nsIInputStreamCallback
> callback
= std::move(aCallback
);
376 MutexAutoLock
lock(mLock
);
377 if (mAsyncWaitCallback
!= aRunnable
) {
378 // The callback has been canceled in the meantime.
382 mAsyncWaitCallback
= nullptr;
385 callback
->OnInputStreamReady(this);
388 } // namespace mozilla