1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 #include "nsAsyncStreamCopier.h"
6 #include "nsIOService.h"
7 #include "nsIEventTarget.h"
8 #include "nsStreamUtils.h"
9 #include "nsThreadUtils.h"
10 #include "nsNetUtil.h"
12 #include "nsIBufferedStreams.h"
13 #include "nsIRequestObserver.h"
14 #include "mozilla/Logging.h"
// Bring the mozilla and mozilla::net namespaces into scope for this file.
16 using namespace mozilla
;
17 using namespace mozilla::net
;
21 // MOZ_LOG=nsStreamCopier:5
// Log module for this file, constructed with the name "nsStreamCopier".
23 static LazyLogModule
gStreamCopierLog("nsStreamCopier");
// LOG(args) expands to MOZ_LOG on gStreamCopierLog at LogLevel::Debug.
24 #define LOG(args) MOZ_LOG(gStreamCopierLog, mozilla::LogLevel::Debug, args)
// Runnable that calls nsAsyncStreamCopier::ApplyBufferingPolicy() on the
// thread it runs on, then dispatches nsAsyncStreamCopier::AsyncCopyInternal
// to mTarget, i.e. to whatever GetCurrentEventTarget() returned when the
// event was constructed.
// NOTE(review): several lines of this class (error handling in Run(), the
// mCopier initializer, the closing braces) are not visible in this excerpt.
27 * An event used to perform initialization off the main thread.
29 class AsyncApplyBufferingPolicyEvent final
: public Runnable
{
33 * The nsAsyncStreamCopier requesting the information.
35 explicit AsyncApplyBufferingPolicyEvent(nsAsyncStreamCopier
* aCopier
)
36 : mozilla::Runnable("AsyncApplyBufferingPolicyEvent"),
// Capture GetCurrentEventTarget() at construction; Run() dispatches to it.
38 mTarget(GetCurrentEventTarget()) {}
40 NS_IMETHOD
Run() override
{
// Step 1: detect/apply the buffering policy on the copier.
41 nsresult rv
= mCopier
->ApplyBufferingPolicy();
// Step 2: hand AsyncCopyInternal over to mTarget to start the actual copy.
47 rv
= mTarget
->Dispatch(
48 NewRunnableMethod("nsAsyncStreamCopier::AsyncCopyInternal", mCopier
,
49 &nsAsyncStreamCopier::AsyncCopyInternal
),
// The dispatch is asserted to have succeeded.
51 MOZ_ASSERT(NS_SUCCEEDED(rv
));
// Strong reference to the copier this event works on.
60 RefPtr
<nsAsyncStreamCopier
> mCopier
;
// Event target that receives the AsyncCopyInternal runnable.
61 nsCOMPtr
<nsIEventTarget
> mTarget
;
64 //-----------------------------------------------------------------------------
// Constructor: initializes mChunkSize to nsIOService::gDefaultSegmentSize
// and logs the address of the new instance.
66 nsAsyncStreamCopier::nsAsyncStreamCopier()
67 : mChunkSize(nsIOService::gDefaultSegmentSize
) {
68 LOG(("Creating nsAsyncStreamCopier @%p\n", this));
// Destructor: logs the address of the instance being destroyed.
71 nsAsyncStreamCopier::~nsAsyncStreamCopier() {
72 LOG(("Destroying nsAsyncStreamCopier @%p\n", this));
// While holding mLock, copies mStatus into *status when `status` is
// non-null. `status` is optional: IsPending() calls this with no argument.
// NOTE(review): the return statement is not visible in this excerpt.
75 bool nsAsyncStreamCopier::IsComplete(nsresult
* status
) {
76 MutexAutoLock
lock(mLock
);
// Only write the out-parameter if the caller supplied one.
77 if (status
) *status
= mStatus
;
// Returns this object as an nsIRequest*. The cast goes through
// nsIAsyncStreamCopier first, which selects one specific base path to
// nsIRequest before converting.
81 nsIRequest
* nsAsyncStreamCopier::AsRequest() {
82 return static_cast<nsIRequest
*>(static_cast<nsIAsyncStreamCopier
*>(this));
// Finishes the copy with the given status: logs it, takes mLock, copies
// mObserver into a local, and calls OnStopRequest(AsRequest(), status) on
// that observer.
// NOTE(review): the lines between the lock and the notification (state
// updates, scope of the lock, null check on the observer) are not visible
// in this excerpt.
85 void nsAsyncStreamCopier::Complete(nsresult status
) {
86 LOG(("nsAsyncStreamCopier::Complete [this=%p status=%" PRIx32
"]\n", this,
87 static_cast<uint32_t>(status
)));
// Locals that receive the members read under the lock.
89 nsCOMPtr
<nsIRequestObserver
> observer
;
90 nsCOMPtr
<nsISupports
> ctx
;
92 MutexAutoLock
lock(mLock
);
99 // setup OnStopRequest callback and release references...
100 observer
= mObserver
;
106 LOG((" calling OnStopRequest [status=%" PRIx32
"]\n",
107 static_cast<uint32_t>(status
)));
// Notify the observer that the request has stopped with `status`.
108 observer
->OnStopRequest(AsRequest(), status
);
// Completion callback handed to NS_AsyncCopy (see AsyncCopyInternal).
// `closure` is the nsAsyncStreamCopier that started the copy; `status` is
// the result of the copy and is forwarded to Complete().
112 void nsAsyncStreamCopier::OnAsyncCopyComplete(void* closure
, nsresult status
) {
113 // AddRef'd in AsyncCopy. Will be released at the end of the method.
// dont_AddRef adopts the existing reference instead of taking a new one,
// so it is dropped when `self` goes out of scope.
114 RefPtr
<nsAsyncStreamCopier
> self
= dont_AddRef((nsAsyncStreamCopier
*)closure
);
115 self
->Complete(status
);
118 //-----------------------------------------------------------------------------
121 // We cannot simply use NS_IMPL_ISUPPORTSx because both
122 // nsIAsyncStreamCopier and nsIAsyncStreamCopier2 implement nsIRequest
// Reference counting for nsAsyncStreamCopier.
124 NS_IMPL_ADDREF(nsAsyncStreamCopier
)
125 NS_IMPL_RELEASE(nsAsyncStreamCopier
)
// Interface table: nsIAsyncStreamCopier and nsIAsyncStreamCopier2 are listed
// directly; nsIRequest and nsISupports are ambiguous bases and are resolved
// through nsIAsyncStreamCopier.
126 NS_INTERFACE_TABLE_HEAD(nsAsyncStreamCopier
)
127 NS_INTERFACE_TABLE_BEGIN
128 NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier
, nsIAsyncStreamCopier
)
129 NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier
, nsIAsyncStreamCopier2
)
130 NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier
, nsIRequest
,
131 nsIAsyncStreamCopier
)
132 NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier
, nsISupports
,
133 nsIAsyncStreamCopier
)
134 NS_INTERFACE_TABLE_END
135 NS_INTERFACE_TABLE_TAIL
137 //-----------------------------------------------------------------------------
// Name accessor taking an nsACString out-parameter.
// NOTE(review): the body is not visible in this excerpt.
141 nsAsyncStreamCopier::GetName(nsACString
& name
) {
// Sets *result to the negation of IsComplete().
147 nsAsyncStreamCopier::IsPending(bool* result
) {
148 *result
= !IsComplete();
// Status accessor taking an nsresult out-parameter.
// NOTE(review): the body is not visible in this excerpt.
153 nsAsyncStreamCopier::GetStatus(nsresult
* status
) {
// Cancels the copy with the given status.
//  - After taking mLock: returns NS_OK right away if !mIsPending, otherwise
//    moves mCopierCtx into a local (leaving the member empty).
//  - A success code is not a valid cancel reason: it triggers a warning and
//    is replaced with NS_BASE_STREAM_CLOSED.
//  - If a copier context was obtained, NS_CancelAsyncCopy is called on it.
// NOTE(review): the extent of the lock scope and the final return are not
// visible in this excerpt.
159 nsAsyncStreamCopier::Cancel(nsresult status
) {
160 nsCOMPtr
<nsISupports
> copierCtx
;
162 MutexAutoLock
lock(mLock
);
// Nothing to cancel when the copier is not pending.
163 if (!mIsPending
) return NS_OK
;
// Take the context out of the member via swap.
164 copierCtx
.swap(mCopierCtx
);
167 if (NS_SUCCEEDED(status
)) {
168 NS_WARNING("cancel with non-failure status code");
169 status
= NS_BASE_STREAM_CLOSED
;
172 if (copierCtx
) NS_CancelAsyncCopy(copierCtx
, status
);
// Not supported: flags the call via MOZ_ASSERT_UNREACHABLE and returns
// NS_ERROR_NOT_IMPLEMENTED.
178 nsAsyncStreamCopier::Suspend() {
179 MOZ_ASSERT_UNREACHABLE("nsAsyncStreamCopier::Suspend");
180 return NS_ERROR_NOT_IMPLEMENTED
;
// Not supported: flags the call via MOZ_ASSERT_UNREACHABLE and returns
// NS_ERROR_NOT_IMPLEMENTED.
184 nsAsyncStreamCopier::Resume() {
185 MOZ_ASSERT_UNREACHABLE("nsAsyncStreamCopier::Resume");
186 return NS_ERROR_NOT_IMPLEMENTED
;
// Always reports LOAD_NORMAL through *aLoadFlags.
190 nsAsyncStreamCopier::GetLoadFlags(nsLoadFlags
* aLoadFlags
) {
191 *aLoadFlags
= LOAD_NORMAL
;
// No-op: aLoadFlags is ignored and NS_OK is returned.
196 nsAsyncStreamCopier::SetLoadFlags(nsLoadFlags aLoadFlags
) { return NS_OK
; }
// Delegates to nsIAsyncStreamCopier::GetTRRModeImpl and returns its result.
199 nsAsyncStreamCopier::GetTRRMode(nsIRequest::TRRMode
* aTRRMode
) {
200 return nsIAsyncStreamCopier::GetTRRModeImpl(aTRRMode
);
// Delegates to nsIAsyncStreamCopier::SetTRRModeImpl and returns its result.
204 nsAsyncStreamCopier::SetTRRMode(nsIRequest::TRRMode aTRRMode
) {
205 return nsIAsyncStreamCopier::SetTRRModeImpl(aTRRMode
);
// Reports no load group: *aLoadGroup is set to nullptr.
209 nsAsyncStreamCopier::GetLoadGroup(nsILoadGroup
** aLoadGroup
) {
210 *aLoadGroup
= nullptr;
// No-op: aLoadGroup is ignored and NS_OK is returned.
215 nsAsyncStreamCopier::SetLoadGroup(nsILoadGroup
* aLoadGroup
) { return NS_OK
; }
// Shared initialization called by both Init() overloads.
//  - Asserts that mSource and mSink are still unset (single initialization).
//  - A chunkSize of 0 is replaced with nsIOService::gDefaultSegmentSize.
//  - Stores the chunk size and the close-source / close-sink flags.
//  - Obtains mTarget from the stream transport service.
// NOTE(review): the `closeSink` parameter line, the assignments of
// source/sink, and the condition guarding the do_GetService call (how
// `target` is used) are not visible in this excerpt.
217 nsresult
nsAsyncStreamCopier::InitInternal(nsIInputStream
* source
,
218 nsIOutputStream
* sink
,
219 nsIEventTarget
* target
,
220 uint32_t chunkSize
, bool closeSource
,
222 NS_ASSERTION(!mSource
&& !mSink
, "Init() called more than once");
// Zero means "use the default segment size".
223 if (chunkSize
== 0) {
224 chunkSize
= nsIOService::gDefaultSegmentSize
;
226 mChunkSize
= chunkSize
;
230 mCloseSource
= closeSource
;
231 mCloseSink
= closeSink
;
// Look up the stream transport service as the event target.
237 mTarget
= do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID
, &rv
);
246 //-----------------------------------------------------------------------------
247 // nsIAsyncStreamCopier
// Init variant where the caller states which stream is buffered.
// Asserts that at least one of sourceBuffered / sinkBuffered is true, picks
// the copy mode accordingly, then forwards the remaining arguments to
// InitInternal and returns its result.
250 nsAsyncStreamCopier::Init(nsIInputStream
* source
, nsIOutputStream
* sink
,
251 nsIEventTarget
* target
, bool sourceBuffered
,
252 bool sinkBuffered
, uint32_t chunkSize
,
253 bool closeSource
, bool closeSink
) {
254 NS_ASSERTION(sourceBuffered
|| sinkBuffered
,
255 "at least one stream must be buffered");
// Buffered source => copy via ReadSegments; otherwise via WriteSegments.
256 mMode
= sourceBuffered
? NS_ASYNCCOPY_VIA_READSEGMENTS
257 : NS_ASYNCCOPY_VIA_WRITESEGMENTS
;
259 return InitInternal(source
, sink
, target
, chunkSize
, closeSource
, closeSink
);
262 //-----------------------------------------------------------------------------
263 // nsIAsyncStreamCopier2
// Init variant without buffering hints. Sets mShouldSniffBuffering so that
// AsyncCopy() determines the buffering itself, then forwards to InitInternal
// and returns its result.
266 nsAsyncStreamCopier::Init(nsIInputStream
* source
, nsIOutputStream
* sink
,
267 nsIEventTarget
* target
, uint32_t chunkSize
,
268 bool closeSource
, bool closeSink
) {
// Defer the buffered/unbuffered decision to AsyncCopy().
269 mShouldSniffBuffering
= true;
271 return InitInternal(source
, sink
, target
, chunkSize
, closeSource
, closeSink
);
// Chooses mMode from the streams' buffering, in this order:
//   1. sink is buffered   -> NS_ASYNCCOPY_VIA_WRITESEGMENTS
//   2. source is buffered -> NS_ASYNCCOPY_VIA_READSEGMENTS
//   3. neither            -> create a buffered output stream, Init() it with
//                            mSink and mChunkSize, and use WRITESEGMENTS
// NOTE(review): the early returns, error checks and the assignment of the
// new buffered sink back to mSink are not visible in this excerpt.
275 * Detect whether the input or the output stream is buffered,
276 * bufferize one of them if neither is buffered.
278 nsresult
nsAsyncStreamCopier::ApplyBufferingPolicy() {
279 // This function causes I/O, it must not be executed on the main thread.
281 MOZ_ASSERT(!NS_IsMainThread());
283 if (NS_OutputStreamIsBuffered(mSink
)) {
284 // Sink is buffered, no need to perform additional buffering
285 mMode
= NS_ASYNCCOPY_VIA_WRITESEGMENTS
;
288 if (NS_InputStreamIsBuffered(mSource
)) {
289 // Source is buffered, no need to perform additional buffering
290 mMode
= NS_ASYNCCOPY_VIA_READSEGMENTS
;
294 // No buffering, let's buffer the sink
296 nsCOMPtr
<nsIBufferedOutputStream
> sink
=
297 do_CreateInstance(NS_BUFFEREDOUTPUTSTREAM_CONTRACTID
, &rv
);
// Wrap the existing sink, using mChunkSize as the buffer size argument.
302 rv
= sink
->Init(mSink
, mChunkSize
);
307 mMode
= NS_ASYNCCOPY_VIA_WRITESEGMENTS
;
312 //-----------------------------------------------------------------------------
313 // Both nsIAsyncStreamCopier and nsIAsyncStreamCopier2
// Starts the asynchronous copy.
//  - Asserts the copier was initialized (mSource and mSink set).
//  - Wraps `observer` in a request-observer proxy stored in mObserver; a
//    failure here is returned to the caller.
//  - Calls OnStartRequest on mObserver; a failure there cancels the copy.
//  - If mShouldSniffBuffering is set, the buffering policy is applied first:
//    on the main thread an AsyncApplyBufferingPolicyEvent is dispatched to
//    mTarget, otherwise ApplyBufferingPolicy() is called directly.
// NOTE(review): several lines (guards, state updates, the call that starts
// the copy, return statements) are not visible in this excerpt.
316 nsAsyncStreamCopier::AsyncCopy(nsIRequestObserver
* observer
, nsISupports
* ctx
) {
317 LOG(("nsAsyncStreamCopier::AsyncCopy [this=%p observer=%p]\n", this,
320 NS_ASSERTION(mSource
&& mSink
, "not initialized");
324 // build proxy for observer events
325 rv
= NS_NewRequestObserverProxy(getter_AddRefs(mObserver
), observer
, ctx
);
326 if (NS_FAILED(rv
)) return rv
;
329 // From this point forward, AsyncCopy is going to return NS_OK. Any errors
330 // will be reported via OnStopRequest.
334 rv
= mObserver
->OnStartRequest(AsRequest());
335 if (NS_FAILED(rv
)) Cancel(rv
);
338 if (!mShouldSniffBuffering
) {
339 // No buffer sniffing required, let's proceed
344 if (NS_IsMainThread()) {
345 // Don't perform buffer sniffing on the main thread
346 nsCOMPtr
<nsIRunnable
> event
= new AsyncApplyBufferingPolicyEvent(this);
347 rv
= mTarget
->Dispatch(event
, NS_DISPATCH_NORMAL
);
354 // We're not going to block the main thread, so let's sniff here
355 rv
= ApplyBufferingPolicy();
363 // Launch async copy.
364 // All errors are reported through the observer.
// Requires mMode to have been set to one of the two copy modes. Takes a
// strong self-reference, then under mLock calls NS_AsyncCopy with
// OnAsyncCopyComplete as the callback and `this` as its closure; the copier
// context is stored in mCopierCtx. After the visible early `return`, the
// self-reference is deliberately leaked with forget() so that
// OnAsyncCopyComplete can release it.
// NOTE(review): the condition guarding the early `return` and the scope of
// the lock are not visible in this excerpt.
365 void nsAsyncStreamCopier::AsyncCopyInternal() {
366 MOZ_ASSERT(mMode
== NS_ASYNCCOPY_VIA_READSEGMENTS
||
367 mMode
== NS_ASYNCCOPY_VIA_WRITESEGMENTS
);
370 // We want to receive progress notifications; release happens in
371 // OnAsyncCopyComplete.
372 RefPtr
<nsAsyncStreamCopier
> self
= this;
374 MutexAutoLock
lock(mLock
);
375 rv
= NS_AsyncCopy(mSource
, mSink
, mTarget
, mMode
, mChunkSize
,
376 OnAsyncCopyComplete
, this, mCloseSource
, mCloseSink
,
377 getter_AddRefs(mCopierCtx
));
381 return; // release self
384 Unused
<< self
.forget(); // Will be released in OnAsyncCopyComplete