Bug 1867190 - Add prefs for PHC probablities r=glandium
[gecko.git] / xpcom / io / nsStreamUtils.cpp
blob511e0fa30023c562de0809b3bed52d756cc9dbc3
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/Mutex.h"
8 #include "mozilla/Attributes.h"
9 #include "mozilla/InputStreamLengthWrapper.h"
10 #include "nsIInputStreamLength.h"
11 #include "nsStreamUtils.h"
12 #include "nsCOMPtr.h"
13 #include "nsICloneableInputStream.h"
14 #include "nsIEventTarget.h"
15 #include "nsICancelableRunnable.h"
16 #include "nsISafeOutputStream.h"
17 #include "nsString.h"
18 #include "nsIAsyncInputStream.h"
19 #include "nsIAsyncOutputStream.h"
20 #include "nsIBufferedStreams.h"
21 #include "nsIPipe.h"
22 #include "nsNetCID.h"
23 #include "nsServiceManagerUtils.h"
24 #include "nsThreadUtils.h"
25 #include "nsITransport.h"
26 #include "nsIStreamTransportService.h"
27 #include "NonBlockingAsyncInputStream.h"
29 using namespace mozilla;
31 static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
33 //-----------------------------------------------------------------------------
35 // This is a nsICancelableRunnable because we can dispatch it to Workers and
36 // those can be shut down at any time, and in these cases, Cancel() is called
37 // instead of Run().
38 class nsInputStreamReadyEvent final : public CancelableRunnable,
39 public nsIInputStreamCallback,
40 public nsIRunnablePriority {
41 public:
42 NS_DECL_ISUPPORTS_INHERITED
44 nsInputStreamReadyEvent(const char* aName, nsIInputStreamCallback* aCallback,
45 nsIEventTarget* aTarget, uint32_t aPriority)
46 : CancelableRunnable(aName),
47 mCallback(aCallback),
48 mTarget(aTarget),
49 mPriority(aPriority) {}
51 private:
52 ~nsInputStreamReadyEvent() {
53 if (!mCallback) {
54 return;
57 // whoa!! looks like we never posted this event. take care to
58 // release mCallback on the correct thread. if mTarget lives on the
59 // calling thread, then we are ok. otherwise, we have to try to
60 // proxy the Release over the right thread. if that thread is dead,
61 // then there's nothing we can do... better to leak than crash.
63 bool val;
64 nsresult rv = mTarget->IsOnCurrentThread(&val);
65 if (NS_FAILED(rv) || !val) {
66 nsCOMPtr<nsIInputStreamCallback> event = NS_NewInputStreamReadyEvent(
67 "~nsInputStreamReadyEvent", mCallback, mTarget, mPriority);
68 mCallback = nullptr;
69 if (event) {
70 rv = event->OnInputStreamReady(nullptr);
71 if (NS_FAILED(rv)) {
72 MOZ_ASSERT_UNREACHABLE("leaking stream event");
73 nsISupports* sup = event;
74 NS_ADDREF(sup);
80 public:
81 NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aStream) override {
82 mStream = aStream;
84 nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
85 if (NS_FAILED(rv)) {
86 NS_WARNING("Dispatch failed");
87 return NS_ERROR_FAILURE;
90 return NS_OK;
93 NS_IMETHOD Run() override {
94 if (mCallback) {
95 if (mStream) {
96 mCallback->OnInputStreamReady(mStream);
98 mCallback = nullptr;
100 return NS_OK;
103 nsresult Cancel() override {
104 mCallback = nullptr;
105 return NS_OK;
108 NS_IMETHOD GetPriority(uint32_t* aPriority) override {
109 *aPriority = mPriority;
110 return NS_OK;
113 private:
114 nsCOMPtr<nsIAsyncInputStream> mStream;
115 nsCOMPtr<nsIInputStreamCallback> mCallback;
116 nsCOMPtr<nsIEventTarget> mTarget;
117 uint32_t mPriority;
120 NS_IMPL_ISUPPORTS_INHERITED(nsInputStreamReadyEvent, CancelableRunnable,
121 nsIInputStreamCallback, nsIRunnablePriority)
123 //-----------------------------------------------------------------------------
125 // This is a nsICancelableRunnable because we can dispatch it to Workers and
126 // those can be shut down at any time, and in these cases, Cancel() is called
127 // instead of Run().
128 class nsOutputStreamReadyEvent final : public CancelableRunnable,
129 public nsIOutputStreamCallback {
130 public:
131 NS_DECL_ISUPPORTS_INHERITED
133 nsOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
134 nsIEventTarget* aTarget)
135 : CancelableRunnable("nsOutputStreamReadyEvent"),
136 mCallback(aCallback),
137 mTarget(aTarget) {}
139 private:
140 ~nsOutputStreamReadyEvent() {
141 if (!mCallback) {
142 return;
145 // whoa!! looks like we never posted this event. take care to
146 // release mCallback on the correct thread. if mTarget lives on the
147 // calling thread, then we are ok. otherwise, we have to try to
148 // proxy the Release over the right thread. if that thread is dead,
149 // then there's nothing we can do... better to leak than crash.
151 bool val;
152 nsresult rv = mTarget->IsOnCurrentThread(&val);
153 if (NS_FAILED(rv) || !val) {
154 nsCOMPtr<nsIOutputStreamCallback> event =
155 NS_NewOutputStreamReadyEvent(mCallback, mTarget);
156 mCallback = nullptr;
157 if (event) {
158 rv = event->OnOutputStreamReady(nullptr);
159 if (NS_FAILED(rv)) {
160 MOZ_ASSERT_UNREACHABLE("leaking stream event");
161 nsISupports* sup = event;
162 NS_ADDREF(sup);
168 public:
169 NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aStream) override {
170 mStream = aStream;
172 nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
173 if (NS_FAILED(rv)) {
174 NS_WARNING("PostEvent failed");
175 return NS_ERROR_FAILURE;
178 return NS_OK;
181 NS_IMETHOD Run() override {
182 if (mCallback) {
183 if (mStream) {
184 mCallback->OnOutputStreamReady(mStream);
186 mCallback = nullptr;
188 return NS_OK;
191 nsresult Cancel() override {
192 mCallback = nullptr;
193 return NS_OK;
196 private:
197 nsCOMPtr<nsIAsyncOutputStream> mStream;
198 nsCOMPtr<nsIOutputStreamCallback> mCallback;
199 nsCOMPtr<nsIEventTarget> mTarget;
202 NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent, CancelableRunnable,
203 nsIOutputStreamCallback)
205 //-----------------------------------------------------------------------------
207 already_AddRefed<nsIInputStreamCallback> NS_NewInputStreamReadyEvent(
208 const char* aName, nsIInputStreamCallback* aCallback,
209 nsIEventTarget* aTarget, uint32_t aPriority) {
210 NS_ASSERTION(aCallback, "null callback");
211 NS_ASSERTION(aTarget, "null target");
212 RefPtr<nsInputStreamReadyEvent> ev =
213 new nsInputStreamReadyEvent(aName, aCallback, aTarget, aPriority);
214 return ev.forget();
217 already_AddRefed<nsIOutputStreamCallback> NS_NewOutputStreamReadyEvent(
218 nsIOutputStreamCallback* aCallback, nsIEventTarget* aTarget) {
219 NS_ASSERTION(aCallback, "null callback");
220 NS_ASSERTION(aTarget, "null target");
221 RefPtr<nsOutputStreamReadyEvent> ev =
222 new nsOutputStreamReadyEvent(aCallback, aTarget);
223 return ev.forget();
226 //-----------------------------------------------------------------------------
227 // NS_AsyncCopy implementation
229 // abstract stream copier...
230 class nsAStreamCopier : public nsIInputStreamCallback,
231 public nsIOutputStreamCallback,
232 public CancelableRunnable {
233 public:
234 NS_DECL_ISUPPORTS_INHERITED
236 nsAStreamCopier()
237 : CancelableRunnable("nsAStreamCopier"),
238 mLock("nsAStreamCopier.mLock"),
239 mCallback(nullptr),
240 mProgressCallback(nullptr),
241 mClosure(nullptr),
242 mChunkSize(0),
243 mEventInProcess(false),
244 mEventIsPending(false),
245 mCloseSource(true),
246 mCloseSink(true),
247 mCanceled(false),
248 mCancelStatus(NS_OK) {}
250 // kick off the async copy...
251 nsresult Start(nsIInputStream* aSource, nsIOutputStream* aSink,
252 nsIEventTarget* aTarget, nsAsyncCopyCallbackFun aCallback,
253 void* aClosure, uint32_t aChunksize, bool aCloseSource,
254 bool aCloseSink, nsAsyncCopyProgressFun aProgressCallback) {
255 mSource = aSource;
256 mSink = aSink;
257 mTarget = aTarget;
258 mCallback = aCallback;
259 mClosure = aClosure;
260 mChunkSize = aChunksize;
261 mCloseSource = aCloseSource;
262 mCloseSink = aCloseSink;
263 mProgressCallback = aProgressCallback;
265 mAsyncSource = do_QueryInterface(mSource);
266 mAsyncSink = do_QueryInterface(mSink);
268 return PostContinuationEvent();
271 // implemented by subclasses, returns number of bytes copied and
272 // sets source and sink condition before returning.
273 virtual uint32_t DoCopy(nsresult* aSourceCondition,
274 nsresult* aSinkCondition) = 0;
276 void Process() {
277 if (!mSource || !mSink) {
278 return;
281 nsresult cancelStatus;
282 bool canceled;
284 MutexAutoLock lock(mLock);
285 canceled = mCanceled;
286 cancelStatus = mCancelStatus;
289 // If the copy was canceled before Process() was even called, then
290 // sourceCondition and sinkCondition should be set to error results to
291 // ensure we don't call Finish() on a canceled nsISafeOutputStream.
292 MOZ_ASSERT(NS_FAILED(cancelStatus) == canceled, "cancel needs an error");
293 nsresult sourceCondition = cancelStatus;
294 nsresult sinkCondition = cancelStatus;
296 // Copy data from the source to the sink until we hit failure or have
297 // copied all the data.
298 for (;;) {
299 // Note: copyFailed will be true if the source or the sink have
300 // reported an error, or if we failed to write any bytes
301 // because we have consumed all of our data.
302 bool copyFailed = false;
303 if (!canceled) {
304 uint32_t n = DoCopy(&sourceCondition, &sinkCondition);
305 if (n > 0 && mProgressCallback) {
306 mProgressCallback(mClosure, n);
308 copyFailed =
309 NS_FAILED(sourceCondition) || NS_FAILED(sinkCondition) || n == 0;
311 MutexAutoLock lock(mLock);
312 canceled = mCanceled;
313 cancelStatus = mCancelStatus;
315 if (copyFailed && !canceled) {
316 if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) {
317 // need to wait for more data from source. while waiting for
318 // more source data, be sure to observe failures on output end.
319 mAsyncSource->AsyncWait(this, 0, 0, nullptr);
321 if (mAsyncSink) {
322 mAsyncSink->AsyncWait(this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
323 0, nullptr);
325 break;
327 if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) {
328 // need to wait for more room in the sink. while waiting for
329 // more room in the sink, be sure to observer failures on the
330 // input end.
331 mAsyncSink->AsyncWait(this, 0, 0, nullptr);
333 if (mAsyncSource) {
334 mAsyncSource->AsyncWait(
335 this, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0, nullptr);
337 break;
340 if (copyFailed || canceled) {
341 if (mAsyncSource) {
342 // cancel any previously-registered AsyncWait callbacks to avoid leaks
343 mAsyncSource->AsyncWait(nullptr, 0, 0, nullptr);
345 if (mCloseSource) {
346 // close source
347 if (mAsyncSource) {
348 mAsyncSource->CloseWithStatus(canceled ? cancelStatus
349 : sinkCondition);
350 } else {
351 mSource->Close();
354 mAsyncSource = nullptr;
355 mSource = nullptr;
357 if (mAsyncSink) {
358 // cancel any previously-registered AsyncWait callbacks to avoid leaks
359 mAsyncSink->AsyncWait(nullptr, 0, 0, nullptr);
361 if (mCloseSink) {
362 // close sink
363 if (mAsyncSink) {
364 mAsyncSink->CloseWithStatus(canceled ? cancelStatus
365 : sourceCondition);
366 } else {
367 // If we have an nsISafeOutputStream, and our
368 // sourceCondition and sinkCondition are not set to a
369 // failure state, finish writing.
370 nsCOMPtr<nsISafeOutputStream> sostream = do_QueryInterface(mSink);
371 if (sostream && NS_SUCCEEDED(sourceCondition) &&
372 NS_SUCCEEDED(sinkCondition)) {
373 sostream->Finish();
374 } else {
375 mSink->Close();
379 mAsyncSink = nullptr;
380 mSink = nullptr;
382 // notify state complete...
383 if (mCallback) {
384 nsresult status;
385 if (!canceled) {
386 status = sourceCondition;
387 if (NS_SUCCEEDED(status)) {
388 status = sinkCondition;
390 if (status == NS_BASE_STREAM_CLOSED) {
391 status = NS_OK;
393 } else {
394 status = cancelStatus;
396 mCallback(mClosure, status);
398 break;
403 nsresult Cancel(nsresult aReason) {
404 MutexAutoLock lock(mLock);
405 if (mCanceled) {
406 return NS_ERROR_FAILURE;
409 if (NS_SUCCEEDED(aReason)) {
410 NS_WARNING("cancel with non-failure status code");
411 aReason = NS_BASE_STREAM_CLOSED;
414 mCanceled = true;
415 mCancelStatus = aReason;
416 return NS_OK;
419 NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aSource) override {
420 PostContinuationEvent();
421 return NS_OK;
424 NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aSink) override {
425 PostContinuationEvent();
426 return NS_OK;
429 // continuation event handler
430 NS_IMETHOD Run() override {
431 Process();
433 // clear "in process" flag and post any pending continuation event
434 MutexAutoLock lock(mLock);
435 mEventInProcess = false;
436 if (mEventIsPending) {
437 mEventIsPending = false;
438 PostContinuationEvent_Locked();
441 return NS_OK;
444 nsresult Cancel() MOZ_MUST_OVERRIDE override = 0;
446 nsresult PostContinuationEvent() {
447 // we cannot post a continuation event if there is currently
448 // an event in process. doing so could result in Process being
449 // run simultaneously on multiple threads, so we mark the event
450 // as pending, and if an event is already in process then we
451 // just let that existing event take care of posting the real
452 // continuation event.
454 MutexAutoLock lock(mLock);
455 return PostContinuationEvent_Locked();
458 nsresult PostContinuationEvent_Locked() MOZ_REQUIRES(mLock) {
459 nsresult rv = NS_OK;
460 if (mEventInProcess) {
461 mEventIsPending = true;
462 } else {
463 rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
464 if (NS_SUCCEEDED(rv)) {
465 mEventInProcess = true;
466 } else {
467 NS_WARNING("unable to post continuation event");
470 return rv;
473 protected:
474 nsCOMPtr<nsIInputStream> mSource;
475 nsCOMPtr<nsIOutputStream> mSink;
476 nsCOMPtr<nsIAsyncInputStream> mAsyncSource;
477 nsCOMPtr<nsIAsyncOutputStream> mAsyncSink;
478 nsCOMPtr<nsIEventTarget> mTarget;
479 Mutex mLock;
480 nsAsyncCopyCallbackFun mCallback;
481 nsAsyncCopyProgressFun mProgressCallback;
482 void* mClosure;
483 uint32_t mChunkSize;
484 bool mEventInProcess MOZ_GUARDED_BY(mLock);
485 bool mEventIsPending MOZ_GUARDED_BY(mLock);
486 bool mCloseSource;
487 bool mCloseSink;
488 bool mCanceled MOZ_GUARDED_BY(mLock);
489 nsresult mCancelStatus MOZ_GUARDED_BY(mLock);
491 // virtual since subclasses call superclass Release()
492 virtual ~nsAStreamCopier() = default;
495 NS_IMPL_ISUPPORTS_INHERITED(nsAStreamCopier, CancelableRunnable,
496 nsIInputStreamCallback, nsIOutputStreamCallback)
498 class nsStreamCopierIB final : public nsAStreamCopier {
499 public:
500 nsStreamCopierIB() = default;
501 virtual ~nsStreamCopierIB() = default;
503 struct MOZ_STACK_CLASS ReadSegmentsState {
504 // the nsIOutputStream will outlive the ReadSegmentsState on the stack
505 nsIOutputStream* MOZ_NON_OWNING_REF mSink;
506 nsresult mSinkCondition;
509 static nsresult ConsumeInputBuffer(nsIInputStream* aInStr, void* aClosure,
510 const char* aBuffer, uint32_t aOffset,
511 uint32_t aCount, uint32_t* aCountWritten) {
512 ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
514 nsresult rv = state->mSink->Write(aBuffer, aCount, aCountWritten);
515 if (NS_FAILED(rv)) {
516 state->mSinkCondition = rv;
517 } else if (*aCountWritten == 0) {
518 state->mSinkCondition = NS_BASE_STREAM_CLOSED;
521 return state->mSinkCondition;
524 uint32_t DoCopy(nsresult* aSourceCondition,
525 nsresult* aSinkCondition) override {
526 ReadSegmentsState state;
527 state.mSink = mSink;
528 state.mSinkCondition = NS_OK;
530 uint32_t n;
531 *aSourceCondition =
532 mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n);
533 *aSinkCondition = NS_SUCCEEDED(state.mSinkCondition) && n == 0
534 ? mSink->StreamStatus()
535 : state.mSinkCondition;
536 return n;
539 nsresult Cancel() override { return NS_OK; }
542 class nsStreamCopierOB final : public nsAStreamCopier {
543 public:
544 nsStreamCopierOB() = default;
545 virtual ~nsStreamCopierOB() = default;
547 struct MOZ_STACK_CLASS WriteSegmentsState {
548 // the nsIInputStream will outlive the WriteSegmentsState on the stack
549 nsIInputStream* MOZ_NON_OWNING_REF mSource;
550 nsresult mSourceCondition;
553 static nsresult FillOutputBuffer(nsIOutputStream* aOutStr, void* aClosure,
554 char* aBuffer, uint32_t aOffset,
555 uint32_t aCount, uint32_t* aCountRead) {
556 WriteSegmentsState* state = (WriteSegmentsState*)aClosure;
558 nsresult rv = state->mSource->Read(aBuffer, aCount, aCountRead);
559 if (NS_FAILED(rv)) {
560 state->mSourceCondition = rv;
561 } else if (*aCountRead == 0) {
562 state->mSourceCondition = NS_BASE_STREAM_CLOSED;
565 return state->mSourceCondition;
568 uint32_t DoCopy(nsresult* aSourceCondition,
569 nsresult* aSinkCondition) override {
570 WriteSegmentsState state;
571 state.mSource = mSource;
572 state.mSourceCondition = NS_OK;
574 uint32_t n;
575 *aSinkCondition =
576 mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n);
577 *aSourceCondition = NS_SUCCEEDED(state.mSourceCondition) && n == 0
578 ? mSource->StreamStatus()
579 : state.mSourceCondition;
580 return n;
583 nsresult Cancel() override { return NS_OK; }
586 //-----------------------------------------------------------------------------
588 nsresult NS_AsyncCopy(nsIInputStream* aSource, nsIOutputStream* aSink,
589 nsIEventTarget* aTarget, nsAsyncCopyMode aMode,
590 uint32_t aChunkSize, nsAsyncCopyCallbackFun aCallback,
591 void* aClosure, bool aCloseSource, bool aCloseSink,
592 nsISupports** aCopierCtx,
593 nsAsyncCopyProgressFun aProgressCallback) {
594 NS_ASSERTION(aTarget, "non-null target required");
596 nsresult rv;
597 nsAStreamCopier* copier;
599 if (aMode == NS_ASYNCCOPY_VIA_READSEGMENTS) {
600 copier = new nsStreamCopierIB();
601 } else {
602 copier = new nsStreamCopierOB();
605 // Start() takes an owning ref to the copier...
606 NS_ADDREF(copier);
607 rv = copier->Start(aSource, aSink, aTarget, aCallback, aClosure, aChunkSize,
608 aCloseSource, aCloseSink, aProgressCallback);
610 if (aCopierCtx) {
611 *aCopierCtx = static_cast<nsISupports*>(static_cast<nsIRunnable*>(copier));
612 NS_ADDREF(*aCopierCtx);
614 NS_RELEASE(copier);
616 return rv;
619 //-----------------------------------------------------------------------------
621 nsresult NS_CancelAsyncCopy(nsISupports* aCopierCtx, nsresult aReason) {
622 nsAStreamCopier* copier =
623 static_cast<nsAStreamCopier*>(static_cast<nsIRunnable*>(aCopierCtx));
624 return copier->Cancel(aReason);
627 //-----------------------------------------------------------------------------
629 namespace {
630 template <typename T>
631 struct ResultTraits {};
633 template <>
634 struct ResultTraits<nsACString> {
635 static void Clear(nsACString& aString) { aString.Truncate(); }
637 static char* GetStorage(nsACString& aString) {
638 return aString.BeginWriting();
642 template <>
643 struct ResultTraits<nsTArray<uint8_t>> {
644 static void Clear(nsTArray<uint8_t>& aArray) { aArray.Clear(); }
646 static char* GetStorage(nsTArray<uint8_t>& aArray) {
647 return reinterpret_cast<char*>(aArray.Elements());
650 } // namespace
652 template <typename T>
653 nsresult DoConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
654 T& aResult) {
655 nsresult rv = NS_OK;
656 ResultTraits<T>::Clear(aResult);
658 while (aMaxCount) {
659 uint64_t avail64;
660 rv = aStream->Available(&avail64);
661 if (NS_FAILED(rv)) {
662 if (rv == NS_BASE_STREAM_CLOSED) {
663 rv = NS_OK;
665 break;
667 if (avail64 == 0) {
668 break;
671 uint32_t avail = (uint32_t)XPCOM_MIN<uint64_t>(avail64, aMaxCount);
673 // resize aResult buffer
674 uint32_t length = aResult.Length();
675 CheckedInt<uint32_t> newLength = CheckedInt<uint32_t>(length) + avail;
676 if (!newLength.isValid()) {
677 return NS_ERROR_FILE_TOO_BIG;
680 if (!aResult.SetLength(newLength.value(), fallible)) {
681 return NS_ERROR_OUT_OF_MEMORY;
683 char* buf = ResultTraits<T>::GetStorage(aResult) + length;
685 uint32_t n;
686 rv = aStream->Read(buf, avail, &n);
687 if (NS_FAILED(rv)) {
688 break;
690 if (n != avail) {
691 MOZ_ASSERT(n < avail, "What happened there???");
692 aResult.SetLength(length + n);
694 if (n == 0) {
695 break;
697 aMaxCount -= n;
700 return rv;
703 nsresult NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
704 nsACString& aResult) {
705 return DoConsumeStream(aStream, aMaxCount, aResult);
708 nsresult NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
709 nsTArray<uint8_t>& aResult) {
710 return DoConsumeStream(aStream, aMaxCount, aResult);
713 //-----------------------------------------------------------------------------
715 static nsresult TestInputStream(nsIInputStream* aInStr, void* aClosure,
716 const char* aBuffer, uint32_t aOffset,
717 uint32_t aCount, uint32_t* aCountWritten) {
718 bool* result = static_cast<bool*>(aClosure);
719 *result = true;
720 *aCountWritten = 0;
721 return NS_ERROR_ABORT; // don't call me anymore
724 bool NS_InputStreamIsBuffered(nsIInputStream* aStream) {
725 nsCOMPtr<nsIBufferedInputStream> bufferedIn = do_QueryInterface(aStream);
726 if (bufferedIn) {
727 return true;
730 bool result = false;
731 uint32_t n;
732 nsresult rv = aStream->ReadSegments(TestInputStream, &result, 1, &n);
733 return result || rv != NS_ERROR_NOT_IMPLEMENTED;
736 static nsresult TestOutputStream(nsIOutputStream* aOutStr, void* aClosure,
737 char* aBuffer, uint32_t aOffset,
738 uint32_t aCount, uint32_t* aCountRead) {
739 bool* result = static_cast<bool*>(aClosure);
740 *result = true;
741 *aCountRead = 0;
742 return NS_ERROR_ABORT; // don't call me anymore
745 bool NS_OutputStreamIsBuffered(nsIOutputStream* aStream) {
746 nsCOMPtr<nsIBufferedOutputStream> bufferedOut = do_QueryInterface(aStream);
747 if (bufferedOut) {
748 return true;
751 bool result = false;
752 uint32_t n;
753 aStream->WriteSegments(TestOutputStream, &result, 1, &n);
754 return result;
757 //-----------------------------------------------------------------------------
759 nsresult NS_CopySegmentToStream(nsIInputStream* aInStr, void* aClosure,
760 const char* aBuffer, uint32_t aOffset,
761 uint32_t aCount, uint32_t* aCountWritten) {
762 nsIOutputStream* outStr = static_cast<nsIOutputStream*>(aClosure);
763 *aCountWritten = 0;
764 while (aCount) {
765 uint32_t n;
766 nsresult rv = outStr->Write(aBuffer, aCount, &n);
767 if (NS_FAILED(rv)) {
768 return rv;
770 aBuffer += n;
771 aCount -= n;
772 *aCountWritten += n;
774 return NS_OK;
777 nsresult NS_CopySegmentToBuffer(nsIInputStream* aInStr, void* aClosure,
778 const char* aBuffer, uint32_t aOffset,
779 uint32_t aCount, uint32_t* aCountWritten) {
780 char* toBuf = static_cast<char*>(aClosure);
781 memcpy(&toBuf[aOffset], aBuffer, aCount);
782 *aCountWritten = aCount;
783 return NS_OK;
786 nsresult NS_CopyBufferToSegment(nsIOutputStream* aOutStr, void* aClosure,
787 char* aBuffer, uint32_t aOffset,
788 uint32_t aCount, uint32_t* aCountRead) {
789 const char* fromBuf = static_cast<const char*>(aClosure);
790 memcpy(aBuffer, &fromBuf[aOffset], aCount);
791 *aCountRead = aCount;
792 return NS_OK;
795 nsresult NS_CopyStreamToSegment(nsIOutputStream* aOutputStream, void* aClosure,
796 char* aToSegment, uint32_t aFromOffset,
797 uint32_t aCount, uint32_t* aReadCount) {
798 nsIInputStream* fromStream = static_cast<nsIInputStream*>(aClosure);
799 return fromStream->Read(aToSegment, aCount, aReadCount);
802 nsresult NS_DiscardSegment(nsIInputStream* aInStr, void* aClosure,
803 const char* aBuffer, uint32_t aOffset,
804 uint32_t aCount, uint32_t* aCountWritten) {
805 *aCountWritten = aCount;
806 return NS_OK;
809 //-----------------------------------------------------------------------------
811 nsresult NS_WriteSegmentThunk(nsIInputStream* aInStr, void* aClosure,
812 const char* aBuffer, uint32_t aOffset,
813 uint32_t aCount, uint32_t* aCountWritten) {
814 nsWriteSegmentThunk* thunk = static_cast<nsWriteSegmentThunk*>(aClosure);
815 return thunk->mFun(thunk->mStream, thunk->mClosure, aBuffer, aOffset, aCount,
816 aCountWritten);
819 nsresult NS_FillArray(FallibleTArray<char>& aDest, nsIInputStream* aInput,
820 uint32_t aKeep, uint32_t* aNewBytes) {
821 MOZ_ASSERT(aInput, "null stream");
822 MOZ_ASSERT(aKeep <= aDest.Length(), "illegal keep count");
824 char* aBuffer = aDest.Elements();
825 int64_t keepOffset = int64_t(aDest.Length()) - aKeep;
826 if (aKeep != 0 && keepOffset > 0) {
827 memmove(aBuffer, aBuffer + keepOffset, aKeep);
830 nsresult rv =
831 aInput->Read(aBuffer + aKeep, aDest.Capacity() - aKeep, aNewBytes);
832 if (NS_FAILED(rv)) {
833 *aNewBytes = 0;
835 // NOTE: we rely on the fact that the new slots are NOT initialized by
836 // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct()
837 // in nsTArray.h:
838 aDest.SetLengthAndRetainStorage(aKeep + *aNewBytes);
840 MOZ_ASSERT(aDest.Length() <= aDest.Capacity(), "buffer overflow");
841 return rv;
844 bool NS_InputStreamIsCloneable(nsIInputStream* aSource) {
845 if (!aSource) {
846 return false;
849 nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
850 return cloneable && cloneable->GetCloneable();
853 nsresult NS_CloneInputStream(nsIInputStream* aSource,
854 nsIInputStream** aCloneOut,
855 nsIInputStream** aReplacementOut) {
856 if (NS_WARN_IF(!aSource)) {
857 return NS_ERROR_FAILURE;
860 // Attempt to perform the clone directly on the source stream
861 nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
862 if (cloneable && cloneable->GetCloneable()) {
863 if (aReplacementOut) {
864 *aReplacementOut = nullptr;
866 return cloneable->Clone(aCloneOut);
869 // If we failed the clone and the caller does not want to replace their
870 // original stream, then we are done. Return error.
871 if (!aReplacementOut) {
872 return NS_ERROR_FAILURE;
875 // The caller has opted-in to the fallback clone support that replaces
876 // the original stream. Copy the data to a pipe and return two cloned
877 // input streams.
879 nsCOMPtr<nsIInputStream> reader;
880 nsCOMPtr<nsIInputStream> readerClone;
881 nsCOMPtr<nsIOutputStream> writer;
883 NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), 0,
884 0, // default segment size and max size
885 true, true); // non-blocking
887 // Propagate length information provided by nsIInputStreamLength. We don't use
888 // InputStreamLengthHelper::GetSyncLength to avoid the risk of blocking when
889 // called off-main-thread.
890 int64_t length = -1;
891 if (nsCOMPtr<nsIInputStreamLength> streamLength = do_QueryInterface(aSource);
892 streamLength && NS_SUCCEEDED(streamLength->Length(&length)) &&
893 length != -1) {
894 reader = new mozilla::InputStreamLengthWrapper(reader.forget(), length);
897 cloneable = do_QueryInterface(reader);
898 MOZ_ASSERT(cloneable && cloneable->GetCloneable());
900 nsresult rv = cloneable->Clone(getter_AddRefs(readerClone));
901 if (NS_WARN_IF(NS_FAILED(rv))) {
902 return rv;
905 nsCOMPtr<nsIEventTarget> target =
906 do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
907 if (NS_WARN_IF(NS_FAILED(rv))) {
908 return rv;
911 rv = NS_AsyncCopy(aSource, writer, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS);
912 if (NS_WARN_IF(NS_FAILED(rv))) {
913 return rv;
916 readerClone.forget(aCloneOut);
917 reader.forget(aReplacementOut);
919 return NS_OK;
922 nsresult NS_MakeAsyncNonBlockingInputStream(
923 already_AddRefed<nsIInputStream> aSource,
924 nsIAsyncInputStream** aAsyncInputStream, bool aCloseWhenDone,
925 uint32_t aFlags, uint32_t aSegmentSize, uint32_t aSegmentCount) {
926 nsCOMPtr<nsIInputStream> source = std::move(aSource);
927 if (NS_WARN_IF(!aAsyncInputStream)) {
928 return NS_ERROR_FAILURE;
931 bool nonBlocking = false;
932 nsresult rv = source->IsNonBlocking(&nonBlocking);
933 if (NS_WARN_IF(NS_FAILED(rv))) {
934 return rv;
937 nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(source);
939 if (nonBlocking && asyncStream) {
940 // This stream is perfect!
941 asyncStream.forget(aAsyncInputStream);
942 return NS_OK;
945 if (nonBlocking) {
946 // If the stream is non-blocking but not async, we wrap it.
947 return NonBlockingAsyncInputStream::Create(source.forget(),
948 aAsyncInputStream);
951 nsCOMPtr<nsIStreamTransportService> sts =
952 do_GetService(kStreamTransportServiceCID, &rv);
953 if (NS_WARN_IF(NS_FAILED(rv))) {
954 return rv;
957 nsCOMPtr<nsITransport> transport;
958 rv = sts->CreateInputTransport(source, aCloseWhenDone,
959 getter_AddRefs(transport));
960 if (NS_WARN_IF(NS_FAILED(rv))) {
961 return rv;
964 nsCOMPtr<nsIInputStream> wrapper;
965 rv = transport->OpenInputStream(aFlags, aSegmentSize, aSegmentCount,
966 getter_AddRefs(wrapper));
967 if (NS_WARN_IF(NS_FAILED(rv))) {
968 return rv;
971 asyncStream = do_QueryInterface(wrapper);
972 MOZ_ASSERT(asyncStream);
974 asyncStream.forget(aAsyncInputStream);
975 return NS_OK;