Bug 1690340 - Part 2: Use the new naming for the developer tools menu items. r=jdescottes
[gecko.git] / xpcom / io / nsStreamUtils.cpp
blobafdab1dc1c0068e36660bff6f4c531e2bb96d54b
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "mozilla/Mutex.h"
8 #include "mozilla/Attributes.h"
9 #include "nsStreamUtils.h"
10 #include "nsCOMPtr.h"
11 #include "nsICloneableInputStream.h"
12 #include "nsIEventTarget.h"
13 #include "nsICancelableRunnable.h"
14 #include "nsISafeOutputStream.h"
15 #include "nsString.h"
16 #include "nsIAsyncInputStream.h"
17 #include "nsIAsyncOutputStream.h"
18 #include "nsIBufferedStreams.h"
19 #include "nsIPipe.h"
20 #include "nsNetCID.h"
21 #include "nsServiceManagerUtils.h"
22 #include "nsThreadUtils.h"
23 #include "nsITransport.h"
24 #include "nsIStreamTransportService.h"
25 #include "NonBlockingAsyncInputStream.h"
27 using namespace mozilla;
29 static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
31 //-----------------------------------------------------------------------------
33 // This is a nsICancelableRunnable because we can dispatch it to Workers and
34 // those can be shut down at any time, and in these cases, Cancel() is called
35 // instead of Run().
36 class nsInputStreamReadyEvent final : public CancelableRunnable,
37 public nsIInputStreamCallback,
38 public nsIRunnablePriority {
39 public:
40 NS_DECL_ISUPPORTS_INHERITED
42 nsInputStreamReadyEvent(const char* aName, nsIInputStreamCallback* aCallback,
43 nsIEventTarget* aTarget, uint32_t aPriority)
44 : CancelableRunnable(aName),
45 mCallback(aCallback),
46 mTarget(aTarget),
47 mPriority(aPriority) {}
49 private:
50 ~nsInputStreamReadyEvent() {
51 if (!mCallback) {
52 return;
55 // whoa!! looks like we never posted this event. take care to
56 // release mCallback on the correct thread. if mTarget lives on the
57 // calling thread, then we are ok. otherwise, we have to try to
58 // proxy the Release over the right thread. if that thread is dead,
59 // then there's nothing we can do... better to leak than crash.
61 bool val;
62 nsresult rv = mTarget->IsOnCurrentThread(&val);
63 if (NS_FAILED(rv) || !val) {
64 nsCOMPtr<nsIInputStreamCallback> event = NS_NewInputStreamReadyEvent(
65 "~nsInputStreamReadyEvent", mCallback, mTarget, mPriority);
66 mCallback = nullptr;
67 if (event) {
68 rv = event->OnInputStreamReady(nullptr);
69 if (NS_FAILED(rv)) {
70 MOZ_ASSERT_UNREACHABLE("leaking stream event");
71 nsISupports* sup = event;
72 NS_ADDREF(sup);
78 public:
79 NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aStream) override {
80 mStream = aStream;
82 nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
83 if (NS_FAILED(rv)) {
84 NS_WARNING("Dispatch failed");
85 return NS_ERROR_FAILURE;
88 return NS_OK;
91 NS_IMETHOD Run() override {
92 if (mCallback) {
93 if (mStream) {
94 mCallback->OnInputStreamReady(mStream);
96 mCallback = nullptr;
98 return NS_OK;
101 nsresult Cancel() override {
102 mCallback = nullptr;
103 return NS_OK;
106 NS_IMETHOD GetPriority(uint32_t* aPriority) override {
107 *aPriority = mPriority;
108 return NS_OK;
111 private:
112 nsCOMPtr<nsIAsyncInputStream> mStream;
113 nsCOMPtr<nsIInputStreamCallback> mCallback;
114 nsCOMPtr<nsIEventTarget> mTarget;
115 uint32_t mPriority;
118 NS_IMPL_ISUPPORTS_INHERITED(nsInputStreamReadyEvent, CancelableRunnable,
119 nsIInputStreamCallback, nsIRunnablePriority)
121 //-----------------------------------------------------------------------------
123 // This is a nsICancelableRunnable because we can dispatch it to Workers and
124 // those can be shut down at any time, and in these cases, Cancel() is called
125 // instead of Run().
126 class nsOutputStreamReadyEvent final : public CancelableRunnable,
127 public nsIOutputStreamCallback {
128 public:
129 NS_DECL_ISUPPORTS_INHERITED
131 nsOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
132 nsIEventTarget* aTarget)
133 : CancelableRunnable("nsOutputStreamReadyEvent"),
134 mCallback(aCallback),
135 mTarget(aTarget) {}
137 private:
138 ~nsOutputStreamReadyEvent() {
139 if (!mCallback) {
140 return;
143 // whoa!! looks like we never posted this event. take care to
144 // release mCallback on the correct thread. if mTarget lives on the
145 // calling thread, then we are ok. otherwise, we have to try to
146 // proxy the Release over the right thread. if that thread is dead,
147 // then there's nothing we can do... better to leak than crash.
149 bool val;
150 nsresult rv = mTarget->IsOnCurrentThread(&val);
151 if (NS_FAILED(rv) || !val) {
152 nsCOMPtr<nsIOutputStreamCallback> event =
153 NS_NewOutputStreamReadyEvent(mCallback, mTarget);
154 mCallback = nullptr;
155 if (event) {
156 rv = event->OnOutputStreamReady(nullptr);
157 if (NS_FAILED(rv)) {
158 MOZ_ASSERT_UNREACHABLE("leaking stream event");
159 nsISupports* sup = event;
160 NS_ADDREF(sup);
166 public:
167 NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aStream) override {
168 mStream = aStream;
170 nsresult rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
171 if (NS_FAILED(rv)) {
172 NS_WARNING("PostEvent failed");
173 return NS_ERROR_FAILURE;
176 return NS_OK;
179 NS_IMETHOD Run() override {
180 if (mCallback) {
181 if (mStream) {
182 mCallback->OnOutputStreamReady(mStream);
184 mCallback = nullptr;
186 return NS_OK;
189 nsresult Cancel() override {
190 mCallback = nullptr;
191 return NS_OK;
194 private:
195 nsCOMPtr<nsIAsyncOutputStream> mStream;
196 nsCOMPtr<nsIOutputStreamCallback> mCallback;
197 nsCOMPtr<nsIEventTarget> mTarget;
200 NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent, CancelableRunnable,
201 nsIOutputStreamCallback)
203 //-----------------------------------------------------------------------------
205 already_AddRefed<nsIInputStreamCallback> NS_NewInputStreamReadyEvent(
206 const char* aName, nsIInputStreamCallback* aCallback,
207 nsIEventTarget* aTarget, uint32_t aPriority) {
208 NS_ASSERTION(aCallback, "null callback");
209 NS_ASSERTION(aTarget, "null target");
210 RefPtr<nsInputStreamReadyEvent> ev =
211 new nsInputStreamReadyEvent(aName, aCallback, aTarget, aPriority);
212 return ev.forget();
215 already_AddRefed<nsIOutputStreamCallback> NS_NewOutputStreamReadyEvent(
216 nsIOutputStreamCallback* aCallback, nsIEventTarget* aTarget) {
217 NS_ASSERTION(aCallback, "null callback");
218 NS_ASSERTION(aTarget, "null target");
219 RefPtr<nsOutputStreamReadyEvent> ev =
220 new nsOutputStreamReadyEvent(aCallback, aTarget);
221 return ev.forget();
224 //-----------------------------------------------------------------------------
225 // NS_AsyncCopy implementation
227 // abstract stream copier...
228 class nsAStreamCopier : public nsIInputStreamCallback,
229 public nsIOutputStreamCallback,
230 public CancelableRunnable {
231 public:
232 NS_DECL_ISUPPORTS_INHERITED
234 nsAStreamCopier()
235 : CancelableRunnable("nsAStreamCopier"),
236 mLock("nsAStreamCopier.mLock"),
237 mCallback(nullptr),
238 mProgressCallback(nullptr),
239 mClosure(nullptr),
240 mChunkSize(0),
241 mEventInProcess(false),
242 mEventIsPending(false),
243 mCloseSource(true),
244 mCloseSink(true),
245 mCanceled(false),
246 mCancelStatus(NS_OK) {}
248 // kick off the async copy...
249 nsresult Start(nsIInputStream* aSource, nsIOutputStream* aSink,
250 nsIEventTarget* aTarget, nsAsyncCopyCallbackFun aCallback,
251 void* aClosure, uint32_t aChunksize, bool aCloseSource,
252 bool aCloseSink, nsAsyncCopyProgressFun aProgressCallback) {
253 mSource = aSource;
254 mSink = aSink;
255 mTarget = aTarget;
256 mCallback = aCallback;
257 mClosure = aClosure;
258 mChunkSize = aChunksize;
259 mCloseSource = aCloseSource;
260 mCloseSink = aCloseSink;
261 mProgressCallback = aProgressCallback;
263 mAsyncSource = do_QueryInterface(mSource);
264 mAsyncSink = do_QueryInterface(mSink);
266 return PostContinuationEvent();
269 // implemented by subclasses, returns number of bytes copied and
270 // sets source and sink condition before returning.
271 virtual uint32_t DoCopy(nsresult* aSourceCondition,
272 nsresult* aSinkCondition) = 0;
274 void Process() {
275 if (!mSource || !mSink) {
276 return;
279 nsresult cancelStatus;
280 bool canceled;
282 MutexAutoLock lock(mLock);
283 canceled = mCanceled;
284 cancelStatus = mCancelStatus;
287 // If the copy was canceled before Process() was even called, then
288 // sourceCondition and sinkCondition should be set to error results to
289 // ensure we don't call Finish() on a canceled nsISafeOutputStream.
290 MOZ_ASSERT(NS_FAILED(cancelStatus) == canceled, "cancel needs an error");
291 nsresult sourceCondition = cancelStatus;
292 nsresult sinkCondition = cancelStatus;
294 // Copy data from the source to the sink until we hit failure or have
295 // copied all the data.
296 for (;;) {
297 // Note: copyFailed will be true if the source or the sink have
298 // reported an error, or if we failed to write any bytes
299 // because we have consumed all of our data.
300 bool copyFailed = false;
301 if (!canceled) {
302 uint32_t n = DoCopy(&sourceCondition, &sinkCondition);
303 if (n > 0 && mProgressCallback) {
304 mProgressCallback(mClosure, n);
306 copyFailed =
307 NS_FAILED(sourceCondition) || NS_FAILED(sinkCondition) || n == 0;
309 MutexAutoLock lock(mLock);
310 canceled = mCanceled;
311 cancelStatus = mCancelStatus;
313 if (copyFailed && !canceled) {
314 if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) {
315 // need to wait for more data from source. while waiting for
316 // more source data, be sure to observe failures on output end.
317 mAsyncSource->AsyncWait(this, 0, 0, nullptr);
319 if (mAsyncSink)
320 mAsyncSink->AsyncWait(this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
321 0, nullptr);
322 break;
323 } else if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) {
324 // need to wait for more room in the sink. while waiting for
325 // more room in the sink, be sure to observer failures on the
326 // input end.
327 mAsyncSink->AsyncWait(this, 0, 0, nullptr);
329 if (mAsyncSource)
330 mAsyncSource->AsyncWait(
331 this, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0, nullptr);
332 break;
335 if (copyFailed || canceled) {
336 if (mCloseSource) {
337 // close source
338 if (mAsyncSource)
339 mAsyncSource->CloseWithStatus(canceled ? cancelStatus
340 : sinkCondition);
341 else {
342 mSource->Close();
345 mAsyncSource = nullptr;
346 mSource = nullptr;
348 if (mCloseSink) {
349 // close sink
350 if (mAsyncSink)
351 mAsyncSink->CloseWithStatus(canceled ? cancelStatus
352 : sourceCondition);
353 else {
354 // If we have an nsISafeOutputStream, and our
355 // sourceCondition and sinkCondition are not set to a
356 // failure state, finish writing.
357 nsCOMPtr<nsISafeOutputStream> sostream = do_QueryInterface(mSink);
358 if (sostream && NS_SUCCEEDED(sourceCondition) &&
359 NS_SUCCEEDED(sinkCondition)) {
360 sostream->Finish();
361 } else {
362 mSink->Close();
366 mAsyncSink = nullptr;
367 mSink = nullptr;
369 // notify state complete...
370 if (mCallback) {
371 nsresult status;
372 if (!canceled) {
373 status = sourceCondition;
374 if (NS_SUCCEEDED(status)) {
375 status = sinkCondition;
377 if (status == NS_BASE_STREAM_CLOSED) {
378 status = NS_OK;
380 } else {
381 status = cancelStatus;
383 mCallback(mClosure, status);
385 break;
390 nsresult Cancel(nsresult aReason) {
391 MutexAutoLock lock(mLock);
392 if (mCanceled) {
393 return NS_ERROR_FAILURE;
396 if (NS_SUCCEEDED(aReason)) {
397 NS_WARNING("cancel with non-failure status code");
398 aReason = NS_BASE_STREAM_CLOSED;
401 mCanceled = true;
402 mCancelStatus = aReason;
403 return NS_OK;
406 NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aSource) override {
407 PostContinuationEvent();
408 return NS_OK;
411 NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aSink) override {
412 PostContinuationEvent();
413 return NS_OK;
416 // continuation event handler
417 NS_IMETHOD Run() override {
418 Process();
420 // clear "in process" flag and post any pending continuation event
421 MutexAutoLock lock(mLock);
422 mEventInProcess = false;
423 if (mEventIsPending) {
424 mEventIsPending = false;
425 PostContinuationEvent_Locked();
428 return NS_OK;
431 nsresult Cancel() MOZ_MUST_OVERRIDE override = 0;
433 nsresult PostContinuationEvent() {
434 // we cannot post a continuation event if there is currently
435 // an event in process. doing so could result in Process being
436 // run simultaneously on multiple threads, so we mark the event
437 // as pending, and if an event is already in process then we
438 // just let that existing event take care of posting the real
439 // continuation event.
441 MutexAutoLock lock(mLock);
442 return PostContinuationEvent_Locked();
445 nsresult PostContinuationEvent_Locked() {
446 nsresult rv = NS_OK;
447 if (mEventInProcess) {
448 mEventIsPending = true;
449 } else {
450 rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
451 if (NS_SUCCEEDED(rv)) {
452 mEventInProcess = true;
453 } else {
454 NS_WARNING("unable to post continuation event");
457 return rv;
460 protected:
461 nsCOMPtr<nsIInputStream> mSource;
462 nsCOMPtr<nsIOutputStream> mSink;
463 nsCOMPtr<nsIAsyncInputStream> mAsyncSource;
464 nsCOMPtr<nsIAsyncOutputStream> mAsyncSink;
465 nsCOMPtr<nsIEventTarget> mTarget;
466 Mutex mLock;
467 nsAsyncCopyCallbackFun mCallback;
468 nsAsyncCopyProgressFun mProgressCallback;
469 void* mClosure;
470 uint32_t mChunkSize;
471 bool mEventInProcess;
472 bool mEventIsPending;
473 bool mCloseSource;
474 bool mCloseSink;
475 bool mCanceled;
476 nsresult mCancelStatus;
478 // virtual since subclasses call superclass Release()
479 virtual ~nsAStreamCopier() = default;
482 NS_IMPL_ISUPPORTS_INHERITED(nsAStreamCopier, CancelableRunnable,
483 nsIInputStreamCallback, nsIOutputStreamCallback)
485 class nsStreamCopierIB final : public nsAStreamCopier {
486 public:
487 nsStreamCopierIB() : nsAStreamCopier() {}
488 virtual ~nsStreamCopierIB() = default;
490 struct MOZ_STACK_CLASS ReadSegmentsState {
491 // the nsIOutputStream will outlive the ReadSegmentsState on the stack
492 nsIOutputStream* MOZ_NON_OWNING_REF mSink;
493 nsresult mSinkCondition;
496 static nsresult ConsumeInputBuffer(nsIInputStream* aInStr, void* aClosure,
497 const char* aBuffer, uint32_t aOffset,
498 uint32_t aCount, uint32_t* aCountWritten) {
499 ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
501 nsresult rv = state->mSink->Write(aBuffer, aCount, aCountWritten);
502 if (NS_FAILED(rv)) {
503 state->mSinkCondition = rv;
504 } else if (*aCountWritten == 0) {
505 state->mSinkCondition = NS_BASE_STREAM_CLOSED;
508 return state->mSinkCondition;
511 uint32_t DoCopy(nsresult* aSourceCondition,
512 nsresult* aSinkCondition) override {
513 ReadSegmentsState state;
514 state.mSink = mSink;
515 state.mSinkCondition = NS_OK;
517 uint32_t n;
518 *aSourceCondition =
519 mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n);
520 *aSinkCondition = state.mSinkCondition;
521 return n;
524 nsresult Cancel() override { return NS_OK; }
527 class nsStreamCopierOB final : public nsAStreamCopier {
528 public:
529 nsStreamCopierOB() : nsAStreamCopier() {}
530 virtual ~nsStreamCopierOB() = default;
532 struct MOZ_STACK_CLASS WriteSegmentsState {
533 // the nsIInputStream will outlive the WriteSegmentsState on the stack
534 nsIInputStream* MOZ_NON_OWNING_REF mSource;
535 nsresult mSourceCondition;
538 static nsresult FillOutputBuffer(nsIOutputStream* aOutStr, void* aClosure,
539 char* aBuffer, uint32_t aOffset,
540 uint32_t aCount, uint32_t* aCountRead) {
541 WriteSegmentsState* state = (WriteSegmentsState*)aClosure;
543 nsresult rv = state->mSource->Read(aBuffer, aCount, aCountRead);
544 if (NS_FAILED(rv)) {
545 state->mSourceCondition = rv;
546 } else if (*aCountRead == 0) {
547 state->mSourceCondition = NS_BASE_STREAM_CLOSED;
550 return state->mSourceCondition;
553 uint32_t DoCopy(nsresult* aSourceCondition,
554 nsresult* aSinkCondition) override {
555 WriteSegmentsState state;
556 state.mSource = mSource;
557 state.mSourceCondition = NS_OK;
559 uint32_t n;
560 *aSinkCondition =
561 mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n);
562 *aSourceCondition = state.mSourceCondition;
563 return n;
566 nsresult Cancel() override { return NS_OK; }
569 //-----------------------------------------------------------------------------
571 nsresult NS_AsyncCopy(nsIInputStream* aSource, nsIOutputStream* aSink,
572 nsIEventTarget* aTarget, nsAsyncCopyMode aMode,
573 uint32_t aChunkSize, nsAsyncCopyCallbackFun aCallback,
574 void* aClosure, bool aCloseSource, bool aCloseSink,
575 nsISupports** aCopierCtx,
576 nsAsyncCopyProgressFun aProgressCallback) {
577 NS_ASSERTION(aTarget, "non-null target required");
579 nsresult rv;
580 nsAStreamCopier* copier;
582 if (aMode == NS_ASYNCCOPY_VIA_READSEGMENTS) {
583 copier = new nsStreamCopierIB();
584 } else {
585 copier = new nsStreamCopierOB();
588 // Start() takes an owning ref to the copier...
589 NS_ADDREF(copier);
590 rv = copier->Start(aSource, aSink, aTarget, aCallback, aClosure, aChunkSize,
591 aCloseSource, aCloseSink, aProgressCallback);
593 if (aCopierCtx) {
594 *aCopierCtx = static_cast<nsISupports*>(static_cast<nsIRunnable*>(copier));
595 NS_ADDREF(*aCopierCtx);
597 NS_RELEASE(copier);
599 return rv;
602 //-----------------------------------------------------------------------------
604 nsresult NS_CancelAsyncCopy(nsISupports* aCopierCtx, nsresult aReason) {
605 nsAStreamCopier* copier =
606 static_cast<nsAStreamCopier*>(static_cast<nsIRunnable*>(aCopierCtx));
607 return copier->Cancel(aReason);
610 //-----------------------------------------------------------------------------
612 namespace {
613 template <typename T>
614 struct ResultTraits {};
616 template <>
617 struct ResultTraits<nsACString> {
618 static void Clear(nsACString& aString) { aString.Truncate(); }
620 static char* GetStorage(nsACString& aString) {
621 return aString.BeginWriting();
625 template <>
626 struct ResultTraits<nsTArray<uint8_t>> {
627 static void Clear(nsTArray<uint8_t>& aArray) { aArray.Clear(); }
629 static char* GetStorage(nsTArray<uint8_t>& aArray) {
630 return reinterpret_cast<char*>(aArray.Elements());
633 } // namespace
635 template <typename T>
636 nsresult DoConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
637 T& aResult) {
638 nsresult rv = NS_OK;
639 ResultTraits<T>::Clear(aResult);
641 while (aMaxCount) {
642 uint64_t avail64;
643 rv = aStream->Available(&avail64);
644 if (NS_FAILED(rv)) {
645 if (rv == NS_BASE_STREAM_CLOSED) {
646 rv = NS_OK;
648 break;
650 if (avail64 == 0) {
651 break;
654 uint32_t avail = (uint32_t)XPCOM_MIN<uint64_t>(avail64, aMaxCount);
656 // resize aResult buffer
657 uint32_t length = aResult.Length();
658 CheckedInt<uint32_t> newLength = CheckedInt<uint32_t>(length) + avail;
659 if (!newLength.isValid()) {
660 return NS_ERROR_FILE_TOO_BIG;
663 if (!aResult.SetLength(newLength.value(), fallible)) {
664 return NS_ERROR_OUT_OF_MEMORY;
666 char* buf = ResultTraits<T>::GetStorage(aResult) + length;
668 uint32_t n;
669 rv = aStream->Read(buf, avail, &n);
670 if (NS_FAILED(rv)) {
671 break;
673 if (n != avail) {
674 MOZ_ASSERT(n < avail, "What happened there???");
675 aResult.SetLength(length + n);
677 if (n == 0) {
678 break;
680 aMaxCount -= n;
683 return rv;
686 nsresult NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
687 nsACString& aResult) {
688 return DoConsumeStream(aStream, aMaxCount, aResult);
691 nsresult NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
692 nsTArray<uint8_t>& aResult) {
693 return DoConsumeStream(aStream, aMaxCount, aResult);
696 //-----------------------------------------------------------------------------
698 static nsresult TestInputStream(nsIInputStream* aInStr, void* aClosure,
699 const char* aBuffer, uint32_t aOffset,
700 uint32_t aCount, uint32_t* aCountWritten) {
701 bool* result = static_cast<bool*>(aClosure);
702 *result = true;
703 *aCountWritten = 0;
704 return NS_ERROR_ABORT; // don't call me anymore
707 bool NS_InputStreamIsBuffered(nsIInputStream* aStream) {
708 nsCOMPtr<nsIBufferedInputStream> bufferedIn = do_QueryInterface(aStream);
709 if (bufferedIn) {
710 return true;
713 bool result = false;
714 uint32_t n;
715 nsresult rv = aStream->ReadSegments(TestInputStream, &result, 1, &n);
716 return result || rv != NS_ERROR_NOT_IMPLEMENTED;
719 static nsresult TestOutputStream(nsIOutputStream* aOutStr, void* aClosure,
720 char* aBuffer, uint32_t aOffset,
721 uint32_t aCount, uint32_t* aCountRead) {
722 bool* result = static_cast<bool*>(aClosure);
723 *result = true;
724 *aCountRead = 0;
725 return NS_ERROR_ABORT; // don't call me anymore
728 bool NS_OutputStreamIsBuffered(nsIOutputStream* aStream) {
729 nsCOMPtr<nsIBufferedOutputStream> bufferedOut = do_QueryInterface(aStream);
730 if (bufferedOut) {
731 return true;
734 bool result = false;
735 uint32_t n;
736 aStream->WriteSegments(TestOutputStream, &result, 1, &n);
737 return result;
740 //-----------------------------------------------------------------------------
742 nsresult NS_CopySegmentToStream(nsIInputStream* aInStr, void* aClosure,
743 const char* aBuffer, uint32_t aOffset,
744 uint32_t aCount, uint32_t* aCountWritten) {
745 nsIOutputStream* outStr = static_cast<nsIOutputStream*>(aClosure);
746 *aCountWritten = 0;
747 while (aCount) {
748 uint32_t n;
749 nsresult rv = outStr->Write(aBuffer, aCount, &n);
750 if (NS_FAILED(rv)) {
751 return rv;
753 aBuffer += n;
754 aCount -= n;
755 *aCountWritten += n;
757 return NS_OK;
760 nsresult NS_CopySegmentToBuffer(nsIInputStream* aInStr, void* aClosure,
761 const char* aBuffer, uint32_t aOffset,
762 uint32_t aCount, uint32_t* aCountWritten) {
763 char* toBuf = static_cast<char*>(aClosure);
764 memcpy(&toBuf[aOffset], aBuffer, aCount);
765 *aCountWritten = aCount;
766 return NS_OK;
769 nsresult NS_CopySegmentToBuffer(nsIOutputStream* aOutStr, void* aClosure,
770 char* aBuffer, uint32_t aOffset,
771 uint32_t aCount, uint32_t* aCountRead) {
772 const char* fromBuf = static_cast<const char*>(aClosure);
773 memcpy(aBuffer, &fromBuf[aOffset], aCount);
774 *aCountRead = aCount;
775 return NS_OK;
778 nsresult NS_DiscardSegment(nsIInputStream* aInStr, void* aClosure,
779 const char* aBuffer, uint32_t aOffset,
780 uint32_t aCount, uint32_t* aCountWritten) {
781 *aCountWritten = aCount;
782 return NS_OK;
785 //-----------------------------------------------------------------------------
787 nsresult NS_WriteSegmentThunk(nsIInputStream* aInStr, void* aClosure,
788 const char* aBuffer, uint32_t aOffset,
789 uint32_t aCount, uint32_t* aCountWritten) {
790 nsWriteSegmentThunk* thunk = static_cast<nsWriteSegmentThunk*>(aClosure);
791 return thunk->mFun(thunk->mStream, thunk->mClosure, aBuffer, aOffset, aCount,
792 aCountWritten);
795 nsresult NS_FillArray(FallibleTArray<char>& aDest, nsIInputStream* aInput,
796 uint32_t aKeep, uint32_t* aNewBytes) {
797 MOZ_ASSERT(aInput, "null stream");
798 MOZ_ASSERT(aKeep <= aDest.Length(), "illegal keep count");
800 char* aBuffer = aDest.Elements();
801 int64_t keepOffset = int64_t(aDest.Length()) - aKeep;
802 if (aKeep != 0 && keepOffset > 0) {
803 memmove(aBuffer, aBuffer + keepOffset, aKeep);
806 nsresult rv =
807 aInput->Read(aBuffer + aKeep, aDest.Capacity() - aKeep, aNewBytes);
808 if (NS_FAILED(rv)) {
809 *aNewBytes = 0;
811 // NOTE: we rely on the fact that the new slots are NOT initialized by
812 // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct()
813 // in nsTArray.h:
814 aDest.SetLengthAndRetainStorage(aKeep + *aNewBytes);
816 MOZ_ASSERT(aDest.Length() <= aDest.Capacity(), "buffer overflow");
817 return rv;
820 bool NS_InputStreamIsCloneable(nsIInputStream* aSource) {
821 if (!aSource) {
822 return false;
825 nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
826 return cloneable && cloneable->GetCloneable();
829 nsresult NS_CloneInputStream(nsIInputStream* aSource,
830 nsIInputStream** aCloneOut,
831 nsIInputStream** aReplacementOut) {
832 if (NS_WARN_IF(!aSource)) {
833 return NS_ERROR_FAILURE;
836 // Attempt to perform the clone directly on the source stream
837 nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
838 if (cloneable && cloneable->GetCloneable()) {
839 if (aReplacementOut) {
840 *aReplacementOut = nullptr;
842 return cloneable->Clone(aCloneOut);
845 // If we failed the clone and the caller does not want to replace their
846 // original stream, then we are done. Return error.
847 if (!aReplacementOut) {
848 return NS_ERROR_FAILURE;
851 // The caller has opted-in to the fallback clone support that replaces
852 // the original stream. Copy the data to a pipe and return two cloned
853 // input streams.
855 nsCOMPtr<nsIInputStream> reader;
856 nsCOMPtr<nsIInputStream> readerClone;
857 nsCOMPtr<nsIOutputStream> writer;
859 nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), 0,
860 0, // default segment size and max size
861 true, true); // non-blocking
862 if (NS_WARN_IF(NS_FAILED(rv))) {
863 return rv;
866 cloneable = do_QueryInterface(reader);
867 MOZ_ASSERT(cloneable && cloneable->GetCloneable());
869 rv = cloneable->Clone(getter_AddRefs(readerClone));
870 if (NS_WARN_IF(NS_FAILED(rv))) {
871 return rv;
874 nsCOMPtr<nsIEventTarget> target =
875 do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
876 if (NS_WARN_IF(NS_FAILED(rv))) {
877 return rv;
880 rv = NS_AsyncCopy(aSource, writer, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS);
881 if (NS_WARN_IF(NS_FAILED(rv))) {
882 return rv;
885 readerClone.forget(aCloneOut);
886 reader.forget(aReplacementOut);
888 return NS_OK;
891 nsresult NS_MakeAsyncNonBlockingInputStream(
892 already_AddRefed<nsIInputStream> aSource,
893 nsIAsyncInputStream** aAsyncInputStream, bool aCloseWhenDone,
894 uint32_t aFlags, uint32_t aSegmentSize, uint32_t aSegmentCount) {
895 nsCOMPtr<nsIInputStream> source = std::move(aSource);
896 if (NS_WARN_IF(!aAsyncInputStream)) {
897 return NS_ERROR_FAILURE;
900 bool nonBlocking = false;
901 nsresult rv = source->IsNonBlocking(&nonBlocking);
902 if (NS_WARN_IF(NS_FAILED(rv))) {
903 return rv;
906 nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(source);
908 if (nonBlocking && asyncStream) {
909 // This stream is perfect!
910 asyncStream.forget(aAsyncInputStream);
911 return NS_OK;
914 if (nonBlocking) {
915 // If the stream is non-blocking but not async, we wrap it.
916 return NonBlockingAsyncInputStream::Create(source.forget(),
917 aAsyncInputStream);
920 nsCOMPtr<nsIStreamTransportService> sts =
921 do_GetService(kStreamTransportServiceCID, &rv);
922 if (NS_WARN_IF(NS_FAILED(rv))) {
923 return rv;
926 nsCOMPtr<nsITransport> transport;
927 rv = sts->CreateInputTransport(source, aCloseWhenDone,
928 getter_AddRefs(transport));
929 if (NS_WARN_IF(NS_FAILED(rv))) {
930 return rv;
933 nsCOMPtr<nsIInputStream> wrapper;
934 rv = transport->OpenInputStream(aFlags, aSegmentSize, aSegmentCount,
935 getter_AddRefs(wrapper));
936 if (NS_WARN_IF(NS_FAILED(rv))) {
937 return rv;
940 asyncStream = do_QueryInterface(wrapper);
941 MOZ_ASSERT(asyncStream);
943 asyncStream.forget(aAsyncInputStream);
944 return NS_OK;