1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /* vim:set ts=4 sts=4 sw=4 et cin: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "nsIOService.h"
8 #include "nsInputStreamPump.h"
9 #include "nsIServiceManager.h"
10 #include "nsIStreamTransportService.h"
11 #include "nsIInterfaceRequestorUtils.h"
12 #include "nsISeekableStream.h"
13 #include "nsITransport.h"
14 #include "nsNetUtil.h"
15 #include "nsThreadUtils.h"
// Class ID of the stream transport service. AsyncRead() passes it to
// do_GetService() when it needs a transport to read the source stream.
20 static NS_DEFINE_CID(kStreamTransportServiceCID
, NS_STREAMTRANSPORTSERVICE_CID
);
// Logging support; this section is only compiled when PR_LOGGING is defined.
22 #if defined(PR_LOGGING)
24 // NSPR_LOG_MODULES=nsStreamPump:5
// Log module handle. Starts out null; the nsInputStreamPump constructor
// assigns it from PR_NewLogModule("nsStreamPump").
26 static PRLogModuleInfo
*gStreamPumpLog
= nullptr;
// LOG(args) forwards its (parenthesised) argument list to PR_LOG on
// gStreamPumpLog at PR_LOG_DEBUG level.
28 #define LOG(args) PR_LOG(gStreamPumpLog, PR_LOG_DEBUG, args)
30 //-----------------------------------------------------------------------------
31 // nsInputStreamPump methods
32 //-----------------------------------------------------------------------------
// Constructor. The member initializers visible in this excerpt set
// mStreamLength to LL_MaxUint(), mLoadFlags to LOAD_NORMAL and
// mCloseWhenDone to false. When PR_LOGGING is defined it also assigns
// gStreamPumpLog from PR_NewLogModule("nsStreamPump").
// NOTE(review): other initializers and any guard around the log-module
// assignment are not visible here -- confirm against the full file.
34 nsInputStreamPump::nsInputStreamPump()
37 , mStreamLength(LL_MaxUint())
40 , mLoadFlags(LOAD_NORMAL
)
42 , mCloseWhenDone(false)
44 #if defined(PR_LOGGING)
46 gStreamPumpLog
= PR_NewLogModule("nsStreamPump");
// Destructor. Its body is not visible in this excerpt.
50 nsInputStreamPump::~nsInputStreamPump()
// Factory helper: allocates a new nsInputStreamPump, holds it in an nsRefPtr
// and calls Init() on it with the stream, position/length, segment
// size/count and close-when-done arguments.
//
// rv starts as NS_ERROR_OUT_OF_MEMORY and is overwritten by Init()'s result.
// NOTE(review): what happens on success (presumably the pump is handed back
// through |result|) is on lines not visible in this excerpt -- confirm.
55 nsInputStreamPump::Create(nsInputStreamPump
**result
,
56 nsIInputStream
*stream
,
63 nsresult rv
= NS_ERROR_OUT_OF_MEMORY
;
64 nsRefPtr
<nsInputStreamPump
> pump
= new nsInputStreamPump();
66 rv
= pump
->Init(stream
, streamPos
, streamLen
,
67 segsize
, segcount
, closeWhenDone
);
68 if (NS_SUCCEEDED(rv
)) {
// PeekData pairs a PeekSegmentFun callback with its opaque closure pointer.
// PeekStream() builds one on the stack and CallPeekFunc() recovers it from
// the void* closure argument it receives.
// The constructor simply stores both values in mFunc / mClosure.
77 PeekData(nsInputStreamPump::PeekSegmentFun fun
, void* closure
)
78 : mFunc(fun
), mClosure(closure
) {}
// Callback to invoke with the peeked segment.
80 nsInputStreamPump::PeekSegmentFun mFunc
;
// Segment-reader callback that PeekStream() hands to ReadSegments().
//
// aClosure is expected to point at a PeekData. The stored callback is invoked
// with the stored closure, the segment bytes (viewed as uint8_t) and aCount.
// The function then returns NS_BINDING_ABORTED.
// NOTE(review): the failure return presumably stops ReadSegments after the
// first segment without consuming data (see the "Called more than once?"
// assertion) -- confirm against nsIInputStream::ReadSegments semantics.
85 CallPeekFunc(nsIInputStream
*aInStream
, void *aClosure
,
86 const char *aFromSegment
, uint32_t aToOffset
, uint32_t aCount
,
87 uint32_t *aWriteCount
)
// Sanity checks: we expect exactly one call, and it must carry data.
89 NS_ASSERTION(aToOffset
== 0, "Called more than once?");
90 NS_ASSERTION(aCount
> 0, "Called without data?");
92 PeekData
* data
= static_cast<PeekData
*>(aClosure
);
93 data
->mFunc(data
->mClosure
,
94 reinterpret_cast<const uint8_t*>(aFromSegment
), aCount
);
95 return NS_BINDING_ABORTED
;
// Lets |callback| look at the data currently buffered in mAsyncStream.
//
// Asserts that mAsyncStream is set, queries Available() to detect a closed
// stream, then wraps callback/closure in a PeekData and passes CallPeekFunc
// to ReadSegments(). The 64-bit available count is clamped to PR_UINT32_MAX
// before being narrowed to uint32_t.
// NOTE(review): how |rv| and |dummy| are used afterwards is on lines not
// visible in this excerpt.
99 nsInputStreamPump::PeekStream(PeekSegmentFun callback
, void* closure
)
101 NS_ASSERTION(mAsyncStream
, "PeekStream called without stream");
103 // See if the pipe is closed by checking the return of Available.
105 nsresult rv
= mAsyncStream
->Available(&dummy64
);
108 uint32_t dummy
= (uint32_t)NS_MIN(dummy64
, (uint64_t)PR_UINT32_MAX
);
110 PeekData
data(callback
, closure
);
111 return mAsyncStream
->ReadSegments(CallPeekFunc
,
113 nsIOService::gDefaultSegmentSize
,
// Registers this pump as the ready-callback of mAsyncStream by calling
// AsyncWait(this, 0, 0, mTargetThread).
// NOTE(review): the conditions around the AsyncWait call and the NS_ERROR
// (presumably a "not already waiting" check and a failure check) are on
// lines not visible in this excerpt.
118 nsInputStreamPump::EnsureWaiting()
120 // no need to worry about multiple threads... an input stream pump lives
121 // on only one thread.
124 nsresult rv
= mAsyncStream
->AsyncWait(this, 0, 0, mTargetThread
);
126 NS_ERROR("AsyncWait failed");
134 //-----------------------------------------------------------------------------
135 // nsInputStreamPump::nsISupports
136 //-----------------------------------------------------------------------------
138 // although this class can only be accessed from one thread at a time, we do
139 // allow its ownership to move from thread to thread, assuming the consumer
140 // understands the limitations of this.
// nsISupports boilerplate for nsInputStreamPump via the THREADSAFE variant of
// the ISUPPORTS macro, consistent with the ownership note above. Of the three
// interfaces only nsIInputStreamCallback is visible in this excerpt.
141 NS_IMPL_THREADSAFE_ISUPPORTS3(nsInputStreamPump
,
143 nsIInputStreamCallback
,
146 //-----------------------------------------------------------------------------
147 // nsInputStreamPump::nsIRequest
148 //-----------------------------------------------------------------------------
// nsIRequest name getter; writes into |result|.
// The body is not visible in this excerpt.
151 nsInputStreamPump::GetName(nsACString
&result
)
// Reports whether the pump is active: *result is true for every state other
// than STATE_IDLE.
158 nsInputStreamPump::IsPending(bool *result
)
160 *result
= (mState
!= STATE_IDLE
);
// nsIRequest status getter; writes into |status|.
// The body is not visible in this excerpt.
165 nsInputStreamPump::GetStatus(nsresult
*status
)
// Cancels the pump with the given failure |status|.
//
// If mStatus already holds a failure the request counts as already canceled.
// Otherwise the stream is closed with |status|; when the pump is not
// suspended a follow-up step runs (per the comment below, the suspended case
// is left to Resume(), which calls EnsureWaiting).
// NOTE(review): the statements that record |status| and re-arm the wait are
// on lines not visible in this excerpt.
// NOTE(review): the log format uses %x for |this|; on 64-bit targets %p is
// the safe conversion for a pointer -- confirm the PR_LOG format support.
172 nsInputStreamPump::Cancel(nsresult status
)
174 LOG(("nsInputStreamPump::Cancel [this=%x status=%x]\n",
177 if (NS_FAILED(mStatus
)) {
178 LOG((" already canceled\n"));
// Callers must cancel with an error code; a success code would leave the
// request looking healthy.
182 NS_ASSERTION(NS_FAILED(status
), "cancel with non-failure status code");
185 // close input stream
187 mAsyncStream
->CloseWithStatus(status
);
188 if (mSuspendCount
== 0)
190 // Otherwise, EnsureWaiting will be called by Resume().
191 // Note that while suspended, OnInputStreamReady will
192 // not do anything, and also note that calling asyncWait
193 // on a closed stream works and will dispatch an event immediately.
// Suspends the pump. Fails with NS_ERROR_UNEXPECTED when the pump is idle
// (mState == STATE_IDLE).
// NOTE(review): the statement that bumps the suspend count is not visible in
// this excerpt; Resume() decrements mSuspendCount.
199 nsInputStreamPump::Suspend()
201 LOG(("nsInputStreamPump::Suspend [this=%x]\n", this));
202 NS_ENSURE_TRUE(mState
!= STATE_IDLE
, NS_ERROR_UNEXPECTED
);
// Undoes one Suspend(). Fails with NS_ERROR_UNEXPECTED if the pump is not
// currently suspended (mSuspendCount == 0) or is idle.
// Decrements mSuspendCount; when it reaches zero a follow-up action runs.
// NOTE(review): that action is on a line not visible here; the comment in
// Cancel() says Resume() calls EnsureWaiting.
208 nsInputStreamPump::Resume()
210 LOG(("nsInputStreamPump::Resume [this=%x]\n", this));
211 NS_ENSURE_TRUE(mSuspendCount
> 0, NS_ERROR_UNEXPECTED
);
212 NS_ENSURE_TRUE(mState
!= STATE_IDLE
, NS_ERROR_UNEXPECTED
);
214 if (--mSuspendCount
== 0)
// Copies the stored load flags (mLoadFlags) into *aLoadFlags.
220 nsInputStreamPump::GetLoadFlags(nsLoadFlags
*aLoadFlags
)
222 *aLoadFlags
= mLoadFlags
;
// Replaces the stored load flags (mLoadFlags) with aLoadFlags.
227 nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags
)
229 mLoadFlags
= aLoadFlags
;
// Returns the current load group in *aLoadGroup. NS_IF_ADDREF adds a
// reference for the caller when the group is non-null.
234 nsInputStreamPump::GetLoadGroup(nsILoadGroup
**aLoadGroup
)
236 NS_IF_ADDREF(*aLoadGroup
= mLoadGroup
);
// Stores aLoadGroup in mLoadGroup. AsyncRead() adds this request to that
// group and OnStateStop() removes it.
241 nsInputStreamPump::SetLoadGroup(nsILoadGroup
*aLoadGroup
)
243 mLoadGroup
= aLoadGroup
;
247 //-----------------------------------------------------------------------------
248 // nsInputStreamPump::nsIInputStreamPump implementation
249 //-----------------------------------------------------------------------------
// Configures the pump before AsyncRead().
//
// Only allowed while idle; otherwise returns NS_ERROR_IN_PROGRESS.
// streamPos is stored as an unsigned 64-bit value in mStreamOffset.
// streamLen replaces mStreamLength only when it is non-negative, so a
// negative length leaves the previous value in place (the constructor
// initializes it to LL_MaxUint()).
// segcount and closeWhenDone are copied into mSegCount / mCloseWhenDone.
// NOTE(review): the assignments of mStream and mSegSize are on lines not
// visible in this excerpt.
252 nsInputStreamPump::Init(nsIInputStream
*stream
,
253 int64_t streamPos
, int64_t streamLen
,
254 uint32_t segsize
, uint32_t segcount
,
257 NS_ENSURE_TRUE(mState
== STATE_IDLE
, NS_ERROR_IN_PROGRESS
);
259 mStreamOffset
= uint64_t(streamPos
);
260 if (int64_t(streamLen
) >= int64_t(0))
261 mStreamLength
= uint64_t(streamLen
);
264 mSegCount
= segcount
;
265 mCloseWhenDone
= closeWhenDone
;
// Starts pumping the stream to |listener|.
//
// Preconditions: the pump must be idle (else NS_ERROR_IN_PROGRESS) and
// |listener| must be non-null.
//
// Visible flow:
//  * Ask the source stream whether it is non-blocking.
//  * Try to use the source directly as an async stream (QueryInterface),
//    seeking to mStreamOffset via nsISeekableStream when an offset is set.
//  * Otherwise wrap the source with the stream transport service
//    (CreateInputTransport + OpenInputStream) and use the wrapper as
//    mAsyncStream.
//  * Remember the calling thread as mTargetThread, arm the async wait,
//    add this request to mLoadGroup, and move to STATE_START with the
//    listener and context stored.
// Any failing step returns its nsresult immediately.
// NOTE(review): several branch conditions (choosing direct vs. transport
// path, null checks before seekable->Seek and mLoadGroup->AddRequest) are on
// lines not visible in this excerpt -- confirm they exist in the full file.
271 nsInputStreamPump::AsyncRead(nsIStreamListener
*listener
, nsISupports
*ctxt
)
273 NS_ENSURE_TRUE(mState
== STATE_IDLE
, NS_ERROR_IN_PROGRESS
);
274 NS_ENSURE_ARG_POINTER(listener
);
277 // OK, we need to use the stream transport service if
279 // (1) the stream is blocking
280 // (2) the stream does not support nsIAsyncInputStream
284 nsresult rv
= mStream
->IsNonBlocking(&nonBlocking
);
285 if (NS_FAILED(rv
)) return rv
;
288 mAsyncStream
= do_QueryInterface(mStream
);
290 // if the stream supports nsIAsyncInputStream, and if we need to seek
291 // to a starting offset, then we must do so here. in the non-async
292 // stream case, the stream transport service will take care of seeking
295 if (mAsyncStream
&& (mStreamOffset
!= LL_MAXUINT
)) {
296 nsCOMPtr
<nsISeekableStream
> seekable
= do_QueryInterface(mStream
);
298 seekable
->Seek(nsISeekableStream::NS_SEEK_SET
, mStreamOffset
);
303 // ok, let's use the stream transport service to read this stream.
304 nsCOMPtr
<nsIStreamTransportService
> sts
=
305 do_GetService(kStreamTransportServiceCID
, &rv
);
306 if (NS_FAILED(rv
)) return rv
;
308 nsCOMPtr
<nsITransport
> transport
;
309 rv
= sts
->CreateInputTransport(mStream
, mStreamOffset
, mStreamLength
,
310 mCloseWhenDone
, getter_AddRefs(transport
));
311 if (NS_FAILED(rv
)) return rv
;
313 nsCOMPtr
<nsIInputStream
> wrapper
;
314 rv
= transport
->OpenInputStream(0, mSegSize
, mSegCount
, getter_AddRefs(wrapper
));
315 if (NS_FAILED(rv
)) return rv
;
// The transport's stream is expected to implement nsIAsyncInputStream;
// a failed QueryInterface is reported to the caller.
317 mAsyncStream
= do_QueryInterface(wrapper
, &rv
);
318 if (NS_FAILED(rv
)) return rv
;
321 // release our reference to the original stream. from this point forward,
322 // we only reference the "stream" via mAsyncStream.
325 // mStreamOffset now holds the number of bytes currently read. we use this
326 // to enforce the mStreamLength restriction.
329 // grab event queue (we must do this here by contract, since all notifications
330 // must go to the thread which called AsyncRead)
331 mTargetThread
= do_GetCurrentThread();
332 NS_ENSURE_STATE(mTargetThread
);
334 rv
= EnsureWaiting();
335 if (NS_FAILED(rv
)) return rv
;
338 mLoadGroup
->AddRequest(this, nullptr);
// Only after everything above succeeded does the pump leave STATE_IDLE.
340 mState
= STATE_START
;
341 mListener
= listener
;
342 mListenerContext
= ctxt
;
346 //-----------------------------------------------------------------------------
347 // nsInputStreamPump::nsIInputStreamCallback implementation
348 //-----------------------------------------------------------------------------
// nsIInputStreamCallback entry point: drives the pump's state machine.
//
// Takes a separate path when the pump is suspended or idle (the action is
// on lines not visible here). Otherwise it dispatches on the current state
// to OnStateStart / OnStateTransfer / OnStateStop and collects the next
// state; an unrecognised state asserts and returns NS_ERROR_UNEXPECTED.
//
// If the handler left the state unchanged and the pump is not suspended,
// more data is expected: the async wait is re-armed via EnsureWaiting(),
// whose result is stored in mStatus.
// NOTE(review): the visible fragments suggest that a failed re-arm forces
// nextState to STATE_STOP; the loop/branch structure around this is not
// fully visible -- confirm against the full file.
351 nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream
*stream
)
353 LOG(("nsInputStreamPump::OnInputStreamReady [this=%x]\n", this));
355 SAMPLE_LABEL("Input", "nsInputStreamPump::OnInputStreamReady");
356 // this function has been called from a PLEvent, so we can safely call
357 // any listener or progress sink methods directly from here.
360 if (mSuspendCount
|| mState
== STATE_IDLE
) {
368 nextState
= OnStateStart();
371 nextState
= OnStateTransfer();
374 nextState
= OnStateStop();
378 NS_NOTREACHED("Unknown enum value.");
379 return NS_ERROR_UNEXPECTED
;
382 if (mState
== nextState
&& !mSuspendCount
) {
// Staying in the same state is only expected while transferring data.
383 NS_ASSERTION(mState
== STATE_TRANSFER
, "unexpected state");
384 NS_ASSERTION(NS_SUCCEEDED(mStatus
), "unexpected status");
387 mStatus
= EnsureWaiting();
388 if (NS_SUCCEEDED(mStatus
))
391 nextState
= STATE_STOP
;
// STATE_START handler: announces the request to the listener.
//
// While mStatus is still a success code, Available() is queried so the
// stream's condition is known before the listener is called; a failure other
// than NS_BASE_STREAM_CLOSED is treated specially (the action is on a line
// not visible here). OnStartRequest is then invoked on the listener.
// Returns STATE_TRANSFER if mStatus is still a success afterwards, otherwise
// STATE_STOP.
// NOTE(review): this SAMPLE_LABEL passes ("nsInputStreamPump",
// "OnStateStart") whereas the sibling handlers pass ("Input",
// "nsInputStreamPump::<Method>") -- confirm whether that is intentional.
400 nsInputStreamPump::OnStateStart()
402 SAMPLE_LABEL("nsInputStreamPump", "OnStateStart");
403 LOG((" OnStateStart [this=%x]\n", this));
407 // need to check the reason why the stream is ready. this is required
408 // so our listener can check our status from OnStartRequest.
409 // XXX async streams should have a GetStatus method!
410 if (NS_SUCCEEDED(mStatus
)) {
412 rv
= mAsyncStream
->Available(&avail
);
413 if (NS_FAILED(rv
) && rv
!= NS_BASE_STREAM_CLOSED
)
417 rv
= mListener
->OnStartRequest(this, mListenerContext
);
419 // an error returned from OnStartRequest should cause us to abort; however,
420 // we must not stomp on mStatus if already canceled.
421 if (NS_FAILED(rv
) && NS_SUCCEEDED(mStatus
))
424 return NS_SUCCEEDED(mStatus
) ? STATE_TRANSFER
: STATE_STOP
;
// STATE_TRANSFER handler: delivers the currently available data to the
// listener via OnDataAvailable.
//
// Visible flow:
//  * A failed mStatus (cancel) short-circuits; per the comment, straight to
//    STATE_STOP.
//  * Available() is queried. NS_BASE_STREAM_CLOSED gets its own branch; a
//    success with a non-zero count enters the delivery path.
//  * The count is capped to the bytes remaining before mStreamLength.
//  * The stream position is sampled with Tell() before and after the
//    listener call so that a listener which consumed nothing can be
//    detected; mStreamOffset advances by the bytes actually consumed.
//  * Offset and count are clamped to PR_UINT32_MAX before being passed to
//    OnDataAvailable, which takes 32-bit values.
//  * If the listener consumed nothing and the pump is not suspended,
//    mStatus becomes NS_ERROR_UNEXPECTED to avoid an endless loop.
//  * While mStatus remains a success, a second Available() call decides
//    whether to stay in STATE_TRANSFER.
// NOTE(review): several else-branches, declarations and the final return are
// on lines not visible in this excerpt.
428 nsInputStreamPump::OnStateTransfer()
430 SAMPLE_LABEL("Input", "nsInputStreamPump::OnStateTransfer");
431 LOG((" OnStateTransfer [this=%x]\n", this));
433 // if canceled, go directly to STATE_STOP...
434 if (NS_FAILED(mStatus
))
440 rv
= mAsyncStream
->Available(&avail
);
441 LOG((" Available returned [stream=%x rv=%x avail=%llu]\n", mAsyncStream
.get(), rv
, avail
));
443 if (rv
== NS_BASE_STREAM_CLOSED
) {
447 else if (NS_SUCCEEDED(rv
) && avail
) {
448 // figure out how much data to report (XXX detect overflow??)
449 if (avail
> mStreamLength
- mStreamOffset
)
450 avail
= mStreamLength
- mStreamOffset
;
453 // we used to limit avail to 16K - we were afraid some ODA handlers
454 // might assume they wouldn't get more than 16K at once
455 // we're removing that limit since it speeds up local file access.
456 // Now there's an implicit 64K limit of 4 16K segments
457 // NOTE: ok, so the story is as follows. OnDataAvailable impls
458 // are by contract supposed to consume exactly |avail| bytes.
459 // however, many do not... mailnews... stream converters...
460 // cough, cough. the input stream pump is fairly tolerant
461 // in this regard; however, if an ODA does not consume any
462 // data from the stream, then we could potentially end up in
463 // an infinite loop. we do our best here to try to catch
464 // such an error. (see bug 189672)
466 // in most cases this QI will succeed (mAsyncStream is almost always
467 // a nsPipeInputStream, which implements nsISeekableStream::Tell).
468 int64_t offsetBefore
;
469 nsCOMPtr
<nsISeekableStream
> seekable
= do_QueryInterface(mAsyncStream
);
470 if (seekable
&& NS_FAILED(seekable
->Tell(&offsetBefore
))) {
471 NS_NOTREACHED("Tell failed on readable stream");
475 // report the current stream offset to our listener... if we've
476 // streamed more than PR_UINT32_MAX, then avoid overflowing the
477 // stream offset. it's the best we can do without a 64-bit stream
480 mStreamOffset
> PR_UINT32_MAX
?
481 PR_UINT32_MAX
: uint32_t(mStreamOffset
);
483 avail
> PR_UINT32_MAX
?
484 PR_UINT32_MAX
: uint32_t(avail
);
486 LOG((" calling OnDataAvailable [offset=%lld(%u) count=%llu(%u)]\n",
487 mStreamOffset
, odaOffset
, avail
, odaAvail
));
489 rv
= mListener
->OnDataAvailable(this, mListenerContext
, mAsyncStream
,
490 odaOffset
, odaAvail
);
492 // don't enter this code if ODA failed or called Cancel
493 if (NS_SUCCEEDED(rv
) && NS_SUCCEEDED(mStatus
)) {
494 // test to see if this ODA failed to consume data
496 // NOTE: if Tell fails, which can happen if the stream is
497 // now closed, then we assume that everything was read.
499 if (NS_FAILED(seekable
->Tell(&offsetAfter
)))
500 offsetAfter
= offsetBefore
+ odaAvail
;
501 if (offsetAfter
> offsetBefore
)
502 mStreamOffset
+= (offsetAfter
- offsetBefore
);
503 else if (mSuspendCount
== 0) {
505 // possible infinite loop if we continue pumping data!
507 // NOTE: although not allowed by nsIStreamListener, we
508 // will allow the ODA impl to Suspend the pump. IMAP
511 NS_ERROR("OnDataAvailable implementation consumed no data");
512 mStatus
= NS_ERROR_UNEXPECTED
;
516 mStreamOffset
+= odaAvail
; // assume ODA behaved well
521 // an error returned from Available or OnDataAvailable should cause us to
522 // abort; however, we must not stomp on mStatus if already canceled.
524 if (NS_SUCCEEDED(mStatus
)) {
528 // if stream is now closed, advance to STATE_STOP right away.
529 // Available may return 0 bytes available at the moment; that
530 // would not mean that we are done.
531 // XXX async streams should have a GetStatus method!
532 rv
= mAsyncStream
->Available(&avail
);
533 if (NS_SUCCEEDED(rv
))
534 return STATE_TRANSFER
;
// STATE_STOP handler: finishes the request.
//
// Visible flow:
//  * If mStatus is a failure, the async stream is closed with that status;
//    otherwise it is closed only when mCloseWhenDone is set.
//  * The listener receives OnStopRequest with the final mStatus.
//  * The listener context is dropped and the request is removed from
//    mLoadGroup with the final status.
// NOTE(review): other member resets, any null check before
// mLoadGroup->RemoveRequest and the return statement are on lines not
// visible in this excerpt.
541 nsInputStreamPump::OnStateStop()
// The profiler label must name this function so that samples taken during
// the stop phase are not attributed to the transfer phase.
543 SAMPLE_LABEL("Input", "nsInputStreamPump::OnStateStop");
544 LOG((" OnStateStop [this=%x status=%x]\n", this, mStatus
));
546 // if an error occurred, we must be sure to pass the error onto the async
547 // stream. in some cases, this is redundant, but since close is idempotent,
548 // this is OK. otherwise, be sure to honor the "close-when-done" option.
550 if (NS_FAILED(mStatus
))
551 mAsyncStream
->CloseWithStatus(mStatus
);
552 else if (mCloseWhenDone
)
553 mAsyncStream
->Close();
559 mListener
->OnStopRequest(this, mListenerContext
, mStatus
);
561 mListenerContext
= 0;
564 mLoadGroup
->RemoveRequest(this, nullptr, mStatus
);