Bug 785860 - fix sts preload list tests to skip private mode tests if private browsin...
[gecko.git] / netwerk / base / src / nsInputStreamPump.cpp
blob6e68def3dabf5c62c0e3cb3aad341d6696eac018
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /* vim:set ts=4 sts=4 sw=4 et cin: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "nsIOService.h"
8 #include "nsInputStreamPump.h"
9 #include "nsIServiceManager.h"
10 #include "nsIStreamTransportService.h"
11 #include "nsIInterfaceRequestorUtils.h"
12 #include "nsISeekableStream.h"
13 #include "nsITransport.h"
14 #include "nsNetUtil.h"
15 #include "nsThreadUtils.h"
16 #include "nsCOMPtr.h"
17 #include "prlog.h"
18 #include "sampler.h"
20 static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
22 #if defined(PR_LOGGING)
24 // NSPR_LOG_MODULES=nsStreamPump:5
26 static PRLogModuleInfo *gStreamPumpLog = nullptr;
27 #endif
28 #define LOG(args) PR_LOG(gStreamPumpLog, PR_LOG_DEBUG, args)
30 //-----------------------------------------------------------------------------
31 // nsInputStreamPump methods
32 //-----------------------------------------------------------------------------
34 nsInputStreamPump::nsInputStreamPump()
35 : mState(STATE_IDLE)
36 , mStreamOffset(0)
37 , mStreamLength(LL_MaxUint())
38 , mStatus(NS_OK)
39 , mSuspendCount(0)
40 , mLoadFlags(LOAD_NORMAL)
41 , mWaiting(false)
42 , mCloseWhenDone(false)
44 #if defined(PR_LOGGING)
45 if (!gStreamPumpLog)
46 gStreamPumpLog = PR_NewLogModule("nsStreamPump");
47 #endif
50 nsInputStreamPump::~nsInputStreamPump()
54 nsresult
55 nsInputStreamPump::Create(nsInputStreamPump **result,
56 nsIInputStream *stream,
57 int64_t streamPos,
58 int64_t streamLen,
59 uint32_t segsize,
60 uint32_t segcount,
61 bool closeWhenDone)
63 nsresult rv = NS_ERROR_OUT_OF_MEMORY;
64 nsRefPtr<nsInputStreamPump> pump = new nsInputStreamPump();
65 if (pump) {
66 rv = pump->Init(stream, streamPos, streamLen,
67 segsize, segcount, closeWhenDone);
68 if (NS_SUCCEEDED(rv)) {
69 *result = nullptr;
70 pump.swap(*result);
73 return rv;
76 struct PeekData {
77 PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure)
78 : mFunc(fun), mClosure(closure) {}
80 nsInputStreamPump::PeekSegmentFun mFunc;
81 void* mClosure;
84 static NS_METHOD
85 CallPeekFunc(nsIInputStream *aInStream, void *aClosure,
86 const char *aFromSegment, uint32_t aToOffset, uint32_t aCount,
87 uint32_t *aWriteCount)
89 NS_ASSERTION(aToOffset == 0, "Called more than once?");
90 NS_ASSERTION(aCount > 0, "Called without data?");
92 PeekData* data = static_cast<PeekData*>(aClosure);
93 data->mFunc(data->mClosure,
94 reinterpret_cast<const uint8_t*>(aFromSegment), aCount);
95 return NS_BINDING_ABORTED;
98 nsresult
99 nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure)
101 NS_ASSERTION(mAsyncStream, "PeekStream called without stream");
103 // See if the pipe is closed by checking the return of Available.
104 uint64_t dummy64;
105 nsresult rv = mAsyncStream->Available(&dummy64);
106 if (NS_FAILED(rv))
107 return rv;
108 uint32_t dummy = (uint32_t)NS_MIN(dummy64, (uint64_t)PR_UINT32_MAX);
110 PeekData data(callback, closure);
111 return mAsyncStream->ReadSegments(CallPeekFunc,
112 &data,
113 nsIOService::gDefaultSegmentSize,
114 &dummy);
117 nsresult
118 nsInputStreamPump::EnsureWaiting()
120 // no need to worry about multiple threads... an input stream pump lives
121 // on only one thread.
123 if (!mWaiting) {
124 nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread);
125 if (NS_FAILED(rv)) {
126 NS_ERROR("AsyncWait failed");
127 return rv;
129 mWaiting = true;
131 return NS_OK;
134 //-----------------------------------------------------------------------------
135 // nsInputStreamPump::nsISupports
136 //-----------------------------------------------------------------------------
138 // although this class can only be accessed from one thread at a time, we do
139 // allow its ownership to move from thread to thread, assuming the consumer
140 // understands the limitations of this.
141 NS_IMPL_THREADSAFE_ISUPPORTS3(nsInputStreamPump,
142 nsIRequest,
143 nsIInputStreamCallback,
144 nsIInputStreamPump)
146 //-----------------------------------------------------------------------------
147 // nsInputStreamPump::nsIRequest
148 //-----------------------------------------------------------------------------
150 NS_IMETHODIMP
151 nsInputStreamPump::GetName(nsACString &result)
153 result.Truncate();
154 return NS_OK;
157 NS_IMETHODIMP
158 nsInputStreamPump::IsPending(bool *result)
160 *result = (mState != STATE_IDLE);
161 return NS_OK;
164 NS_IMETHODIMP
165 nsInputStreamPump::GetStatus(nsresult *status)
167 *status = mStatus;
168 return NS_OK;
171 NS_IMETHODIMP
172 nsInputStreamPump::Cancel(nsresult status)
174 LOG(("nsInputStreamPump::Cancel [this=%x status=%x]\n",
175 this, status));
177 if (NS_FAILED(mStatus)) {
178 LOG((" already canceled\n"));
179 return NS_OK;
182 NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code");
183 mStatus = status;
185 // close input stream
186 if (mAsyncStream) {
187 mAsyncStream->CloseWithStatus(status);
188 if (mSuspendCount == 0)
189 EnsureWaiting();
190 // Otherwise, EnsureWaiting will be called by Resume().
191 // Note that while suspended, OnInputStreamReady will
192 // not do anything, and also note that calling asyncWait
193 // on a closed stream works and will dispatch an event immediately.
195 return NS_OK;
198 NS_IMETHODIMP
199 nsInputStreamPump::Suspend()
201 LOG(("nsInputStreamPump::Suspend [this=%x]\n", this));
202 NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
203 ++mSuspendCount;
204 return NS_OK;
207 NS_IMETHODIMP
208 nsInputStreamPump::Resume()
210 LOG(("nsInputStreamPump::Resume [this=%x]\n", this));
211 NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED);
212 NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
214 if (--mSuspendCount == 0)
215 EnsureWaiting();
216 return NS_OK;
219 NS_IMETHODIMP
220 nsInputStreamPump::GetLoadFlags(nsLoadFlags *aLoadFlags)
222 *aLoadFlags = mLoadFlags;
223 return NS_OK;
226 NS_IMETHODIMP
227 nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags)
229 mLoadFlags = aLoadFlags;
230 return NS_OK;
233 NS_IMETHODIMP
234 nsInputStreamPump::GetLoadGroup(nsILoadGroup **aLoadGroup)
236 NS_IF_ADDREF(*aLoadGroup = mLoadGroup);
237 return NS_OK;
240 NS_IMETHODIMP
241 nsInputStreamPump::SetLoadGroup(nsILoadGroup *aLoadGroup)
243 mLoadGroup = aLoadGroup;
244 return NS_OK;
247 //-----------------------------------------------------------------------------
248 // nsInputStreamPump::nsIInputStreamPump implementation
249 //-----------------------------------------------------------------------------
251 NS_IMETHODIMP
252 nsInputStreamPump::Init(nsIInputStream *stream,
253 int64_t streamPos, int64_t streamLen,
254 uint32_t segsize, uint32_t segcount,
255 bool closeWhenDone)
257 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
259 mStreamOffset = uint64_t(streamPos);
260 if (int64_t(streamLen) >= int64_t(0))
261 mStreamLength = uint64_t(streamLen);
262 mStream = stream;
263 mSegSize = segsize;
264 mSegCount = segcount;
265 mCloseWhenDone = closeWhenDone;
267 return NS_OK;
270 NS_IMETHODIMP
271 nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt)
273 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
274 NS_ENSURE_ARG_POINTER(listener);
277 // OK, we need to use the stream transport service if
279 // (1) the stream is blocking
280 // (2) the stream does not support nsIAsyncInputStream
283 bool nonBlocking;
284 nsresult rv = mStream->IsNonBlocking(&nonBlocking);
285 if (NS_FAILED(rv)) return rv;
287 if (nonBlocking) {
288 mAsyncStream = do_QueryInterface(mStream);
290 // if the stream supports nsIAsyncInputStream, and if we need to seek
291 // to a starting offset, then we must do so here. in the non-async
292 // stream case, the stream transport service will take care of seeking
293 // for us.
295 if (mAsyncStream && (mStreamOffset != LL_MAXUINT)) {
296 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream);
297 if (seekable)
298 seekable->Seek(nsISeekableStream::NS_SEEK_SET, mStreamOffset);
302 if (!mAsyncStream) {
303 // ok, let's use the stream transport service to read this stream.
304 nsCOMPtr<nsIStreamTransportService> sts =
305 do_GetService(kStreamTransportServiceCID, &rv);
306 if (NS_FAILED(rv)) return rv;
308 nsCOMPtr<nsITransport> transport;
309 rv = sts->CreateInputTransport(mStream, mStreamOffset, mStreamLength,
310 mCloseWhenDone, getter_AddRefs(transport));
311 if (NS_FAILED(rv)) return rv;
313 nsCOMPtr<nsIInputStream> wrapper;
314 rv = transport->OpenInputStream(0, mSegSize, mSegCount, getter_AddRefs(wrapper));
315 if (NS_FAILED(rv)) return rv;
317 mAsyncStream = do_QueryInterface(wrapper, &rv);
318 if (NS_FAILED(rv)) return rv;
321 // release our reference to the original stream. from this point forward,
322 // we only reference the "stream" via mAsyncStream.
323 mStream = 0;
325 // mStreamOffset now holds the number of bytes currently read. we use this
326 // to enforce the mStreamLength restriction.
327 mStreamOffset = 0;
329 // grab event queue (we must do this here by contract, since all notifications
330 // must go to the thread which called AsyncRead)
331 mTargetThread = do_GetCurrentThread();
332 NS_ENSURE_STATE(mTargetThread);
334 rv = EnsureWaiting();
335 if (NS_FAILED(rv)) return rv;
337 if (mLoadGroup)
338 mLoadGroup->AddRequest(this, nullptr);
340 mState = STATE_START;
341 mListener = listener;
342 mListenerContext = ctxt;
343 return NS_OK;
346 //-----------------------------------------------------------------------------
347 // nsInputStreamPump::nsIInputStreamCallback implementation
348 //-----------------------------------------------------------------------------
350 NS_IMETHODIMP
351 nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream *stream)
353 LOG(("nsInputStreamPump::OnInputStreamReady [this=%x]\n", this));
355 SAMPLE_LABEL("Input", "nsInputStreamPump::OnInputStreamReady");
356 // this function has been called from a PLEvent, so we can safely call
357 // any listener or progress sink methods directly from here.
359 for (;;) {
360 if (mSuspendCount || mState == STATE_IDLE) {
361 mWaiting = false;
362 break;
365 uint32_t nextState;
366 switch (mState) {
367 case STATE_START:
368 nextState = OnStateStart();
369 break;
370 case STATE_TRANSFER:
371 nextState = OnStateTransfer();
372 break;
373 case STATE_STOP:
374 nextState = OnStateStop();
375 break;
376 default:
377 nextState = 0;
378 NS_NOTREACHED("Unknown enum value.");
379 return NS_ERROR_UNEXPECTED;
382 if (mState == nextState && !mSuspendCount) {
383 NS_ASSERTION(mState == STATE_TRANSFER, "unexpected state");
384 NS_ASSERTION(NS_SUCCEEDED(mStatus), "unexpected status");
386 mWaiting = false;
387 mStatus = EnsureWaiting();
388 if (NS_SUCCEEDED(mStatus))
389 break;
391 nextState = STATE_STOP;
394 mState = nextState;
396 return NS_OK;
399 uint32_t
400 nsInputStreamPump::OnStateStart()
402 SAMPLE_LABEL("nsInputStreamPump", "OnStateStart");
403 LOG((" OnStateStart [this=%x]\n", this));
405 nsresult rv;
407 // need to check the reason why the stream is ready. this is required
408 // so our listener can check our status from OnStartRequest.
409 // XXX async streams should have a GetStatus method!
410 if (NS_SUCCEEDED(mStatus)) {
411 uint64_t avail;
412 rv = mAsyncStream->Available(&avail);
413 if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED)
414 mStatus = rv;
417 rv = mListener->OnStartRequest(this, mListenerContext);
419 // an error returned from OnStartRequest should cause us to abort; however,
420 // we must not stomp on mStatus if already canceled.
421 if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus))
422 mStatus = rv;
424 return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP;
427 uint32_t
428 nsInputStreamPump::OnStateTransfer()
430 SAMPLE_LABEL("Input", "nsInputStreamPump::OnStateTransfer");
431 LOG((" OnStateTransfer [this=%x]\n", this));
433 // if canceled, go directly to STATE_STOP...
434 if (NS_FAILED(mStatus))
435 return STATE_STOP;
437 nsresult rv;
439 uint64_t avail;
440 rv = mAsyncStream->Available(&avail);
441 LOG((" Available returned [stream=%x rv=%x avail=%llu]\n", mAsyncStream.get(), rv, avail));
443 if (rv == NS_BASE_STREAM_CLOSED) {
444 rv = NS_OK;
445 avail = 0;
447 else if (NS_SUCCEEDED(rv) && avail) {
448 // figure out how much data to report (XXX detect overflow??)
449 if (avail > mStreamLength - mStreamOffset)
450 avail = mStreamLength - mStreamOffset;
452 if (avail) {
453 // we used to limit avail to 16K - we were afraid some ODA handlers
454 // might assume they wouldn't get more than 16K at once
455 // we're removing that limit since it speeds up local file access.
456 // Now there's an implicit 64K limit of 4 16K segments
457 // NOTE: ok, so the story is as follows. OnDataAvailable impls
458 // are by contract supposed to consume exactly |avail| bytes.
459 // however, many do not... mailnews... stream converters...
460 // cough, cough. the input stream pump is fairly tolerant
461 // in this regard; however, if an ODA does not consume any
462 // data from the stream, then we could potentially end up in
463 // an infinite loop. we do our best here to try to catch
464 // such an error. (see bug 189672)
466 // in most cases this QI will succeed (mAsyncStream is almost always
467 // a nsPipeInputStream, which implements nsISeekableStream::Tell).
468 int64_t offsetBefore;
469 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mAsyncStream);
470 if (seekable && NS_FAILED(seekable->Tell(&offsetBefore))) {
471 NS_NOTREACHED("Tell failed on readable stream");
472 offsetBefore = 0;
475 // report the current stream offset to our listener... if we've
476 // streamed more than PR_UINT32_MAX, then avoid overflowing the
477 // stream offset. it's the best we can do without a 64-bit stream
478 // listener API.
479 uint32_t odaOffset =
480 mStreamOffset > PR_UINT32_MAX ?
481 PR_UINT32_MAX : uint32_t(mStreamOffset);
482 uint32_t odaAvail =
483 avail > PR_UINT32_MAX ?
484 PR_UINT32_MAX : uint32_t(avail);
486 LOG((" calling OnDataAvailable [offset=%lld(%u) count=%llu(%u)]\n",
487 mStreamOffset, odaOffset, avail, odaAvail));
489 rv = mListener->OnDataAvailable(this, mListenerContext, mAsyncStream,
490 odaOffset, odaAvail);
492 // don't enter this code if ODA failed or called Cancel
493 if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) {
494 // test to see if this ODA failed to consume data
495 if (seekable) {
496 // NOTE: if Tell fails, which can happen if the stream is
497 // now closed, then we assume that everything was read.
498 int64_t offsetAfter;
499 if (NS_FAILED(seekable->Tell(&offsetAfter)))
500 offsetAfter = offsetBefore + odaAvail;
501 if (offsetAfter > offsetBefore)
502 mStreamOffset += (offsetAfter - offsetBefore);
503 else if (mSuspendCount == 0) {
505 // possible infinite loop if we continue pumping data!
507 // NOTE: although not allowed by nsIStreamListener, we
508 // will allow the ODA impl to Suspend the pump. IMAP
509 // does this :-(
511 NS_ERROR("OnDataAvailable implementation consumed no data");
512 mStatus = NS_ERROR_UNEXPECTED;
515 else
516 mStreamOffset += odaAvail; // assume ODA behaved well
521 // an error returned from Available or OnDataAvailable should cause us to
522 // abort; however, we must not stomp on mStatus if already canceled.
524 if (NS_SUCCEEDED(mStatus)) {
525 if (NS_FAILED(rv))
526 mStatus = rv;
527 else if (avail) {
528 // if stream is now closed, advance to STATE_STOP right away.
529 // Available may return 0 bytes available at the moment; that
530 // would not mean that we are done.
531 // XXX async streams should have a GetStatus method!
532 rv = mAsyncStream->Available(&avail);
533 if (NS_SUCCEEDED(rv))
534 return STATE_TRANSFER;
537 return STATE_STOP;
540 uint32_t
541 nsInputStreamPump::OnStateStop()
543 SAMPLE_LABEL("Input", "nsInputStreamPump::OnStateTransfer");
544 LOG((" OnStateStop [this=%x status=%x]\n", this, mStatus));
546 // if an error occurred, we must be sure to pass the error onto the async
547 // stream. in some cases, this is redundant, but since close is idempotent,
548 // this is OK. otherwise, be sure to honor the "close-when-done" option.
550 if (NS_FAILED(mStatus))
551 mAsyncStream->CloseWithStatus(mStatus);
552 else if (mCloseWhenDone)
553 mAsyncStream->Close();
555 mAsyncStream = 0;
556 mTargetThread = 0;
557 mIsPending = false;
559 mListener->OnStopRequest(this, mListenerContext, mStatus);
560 mListener = 0;
561 mListenerContext = 0;
563 if (mLoadGroup)
564 mLoadGroup->RemoveRequest(this, nullptr, mStatus);
566 return STATE_IDLE;