no bug - Bumping Firefox l10n changesets r=release a=l10n-bump DONTBUILD CLOSED TREE
[gecko.git] / xpcom / io / SlicedInputStream.cpp
blobc64af8e9dd8bbced69b13e29a3c5b1f18a8dde83
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "SlicedInputStream.h"
8 #include "mozilla/ipc/InputStreamUtils.h"
9 #include "mozilla/CheckedInt.h"
10 #include "mozilla/ScopeExit.h"
11 #include "nsISeekableStream.h"
12 #include "nsStreamUtils.h"
14 namespace mozilla {
16 using namespace ipc;
18 NS_IMPL_ADDREF(SlicedInputStream);
19 NS_IMPL_RELEASE(SlicedInputStream);
21 NS_INTERFACE_MAP_BEGIN(SlicedInputStream)
22 NS_INTERFACE_MAP_ENTRY(nsIInputStream)
23 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream,
24 mWeakCloneableInputStream || !mInputStream)
25 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(
26 nsIIPCSerializableInputStream,
27 mWeakIPCSerializableInputStream || !mInputStream)
28 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream,
29 mWeakSeekableInputStream || !mInputStream)
30 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsITellableStream,
31 mWeakTellableInputStream || !mInputStream)
32 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream,
33 mWeakAsyncInputStream || !mInputStream)
34 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback,
35 mWeakAsyncInputStream || !mInputStream)
36 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength,
37 mWeakInputStreamLength || !mInputStream)
38 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(
39 nsIAsyncInputStreamLength, mWeakAsyncInputStreamLength || !mInputStream)
40 NS_INTERFACE_MAP_ENTRY_CONDITIONAL(
41 nsIInputStreamLengthCallback,
42 mWeakAsyncInputStreamLength || !mInputStream)
43 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream)
44 NS_INTERFACE_MAP_END
46 SlicedInputStream::SlicedInputStream(
47 already_AddRefed<nsIInputStream> aInputStream, uint64_t aStart,
48 uint64_t aLength)
49 : mWeakCloneableInputStream(nullptr),
50 mWeakIPCSerializableInputStream(nullptr),
51 mWeakSeekableInputStream(nullptr),
52 mWeakTellableInputStream(nullptr),
53 mWeakAsyncInputStream(nullptr),
54 mWeakInputStreamLength(nullptr),
55 mWeakAsyncInputStreamLength(nullptr),
56 mStart(aStart),
57 mLength(aLength),
58 mCurPos(0),
59 mClosed(false),
60 mAsyncWaitFlags(0),
61 mAsyncWaitRequestedCount(0),
62 mMutex("SlicedInputStream::mMutex") {
63 nsCOMPtr<nsIInputStream> inputStream = std::move(aInputStream);
64 SetSourceStream(inputStream.forget());
67 SlicedInputStream::SlicedInputStream()
68 : mWeakCloneableInputStream(nullptr),
69 mWeakIPCSerializableInputStream(nullptr),
70 mWeakSeekableInputStream(nullptr),
71 mWeakTellableInputStream(nullptr),
72 mWeakAsyncInputStream(nullptr),
73 mWeakInputStreamLength(nullptr),
74 mWeakAsyncInputStreamLength(nullptr),
75 mStart(0),
76 mLength(0),
77 mCurPos(0),
78 mClosed(false),
79 mAsyncWaitFlags(0),
80 mAsyncWaitRequestedCount(0),
81 mMutex("SlicedInputStream::mMutex") {}
83 SlicedInputStream::~SlicedInputStream() = default;
85 void SlicedInputStream::SetSourceStream(
86 already_AddRefed<nsIInputStream> aInputStream) {
87 MOZ_ASSERT(!mInputStream);
89 mInputStream = std::move(aInputStream);
91 nsCOMPtr<nsICloneableInputStream> cloneableStream =
92 do_QueryInterface(mInputStream);
93 if (cloneableStream && SameCOMIdentity(mInputStream, cloneableStream)) {
94 mWeakCloneableInputStream = cloneableStream;
97 nsCOMPtr<nsIIPCSerializableInputStream> serializableStream =
98 do_QueryInterface(mInputStream);
99 if (serializableStream && SameCOMIdentity(mInputStream, serializableStream)) {
100 mWeakIPCSerializableInputStream = serializableStream;
103 nsCOMPtr<nsISeekableStream> seekableStream = do_QueryInterface(mInputStream);
104 if (seekableStream && SameCOMIdentity(mInputStream, seekableStream)) {
105 mWeakSeekableInputStream = seekableStream;
108 nsCOMPtr<nsITellableStream> tellableStream = do_QueryInterface(mInputStream);
109 if (tellableStream && SameCOMIdentity(mInputStream, tellableStream)) {
110 mWeakTellableInputStream = tellableStream;
113 nsCOMPtr<nsIAsyncInputStream> asyncInputStream =
114 do_QueryInterface(mInputStream);
115 if (asyncInputStream && SameCOMIdentity(mInputStream, asyncInputStream)) {
116 mWeakAsyncInputStream = asyncInputStream;
119 nsCOMPtr<nsIInputStreamLength> streamLength = do_QueryInterface(mInputStream);
120 if (streamLength && SameCOMIdentity(mInputStream, streamLength)) {
121 mWeakInputStreamLength = streamLength;
124 nsCOMPtr<nsIAsyncInputStreamLength> asyncStreamLength =
125 do_QueryInterface(mInputStream);
126 if (asyncStreamLength && SameCOMIdentity(mInputStream, asyncStreamLength)) {
127 mWeakAsyncInputStreamLength = asyncStreamLength;
131 uint64_t SlicedInputStream::AdjustRange(uint64_t aRange) {
132 CheckedUint64 range(aRange);
133 range += mCurPos;
135 // Let's remove extra length from the end.
136 if (range.isValid() && range.value() > mStart + mLength) {
137 aRange -= XPCOM_MIN((uint64_t)aRange, range.value() - (mStart + mLength));
140 // Let's remove extra length from the begin.
141 if (mCurPos < mStart) {
142 aRange -= XPCOM_MIN((uint64_t)aRange, mStart - mCurPos);
145 return aRange;
148 // nsIInputStream interface
150 NS_IMETHODIMP
151 SlicedInputStream::Close() {
152 NS_ENSURE_STATE(mInputStream);
154 mClosed = true;
155 return mInputStream->Close();
158 NS_IMETHODIMP
159 SlicedInputStream::Available(uint64_t* aLength) {
160 NS_ENSURE_STATE(mInputStream);
162 if (mClosed) {
163 return NS_BASE_STREAM_CLOSED;
166 nsresult rv = mInputStream->Available(aLength);
167 if (rv == NS_BASE_STREAM_CLOSED) {
168 mClosed = true;
169 return rv;
172 if (NS_WARN_IF(NS_FAILED(rv))) {
173 return rv;
176 *aLength = AdjustRange(*aLength);
177 return NS_OK;
180 NS_IMETHODIMP
181 SlicedInputStream::StreamStatus() {
182 NS_ENSURE_STATE(mInputStream);
184 if (mClosed) {
185 return NS_BASE_STREAM_CLOSED;
188 nsresult rv = mInputStream->StreamStatus();
189 if (rv == NS_BASE_STREAM_CLOSED) {
190 mClosed = true;
192 return rv;
195 NS_IMETHODIMP
196 SlicedInputStream::Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) {
197 *aReadCount = 0;
199 if (mClosed) {
200 return NS_OK;
203 if (mCurPos < mStart) {
204 nsCOMPtr<nsISeekableStream> seekableStream =
205 do_QueryInterface(mInputStream);
206 if (seekableStream) {
207 nsresult rv =
208 seekableStream->Seek(nsISeekableStream::NS_SEEK_SET, mStart);
209 if (NS_WARN_IF(NS_FAILED(rv))) {
210 return rv;
213 mCurPos = mStart;
214 } else {
215 char buf[4096];
216 while (mCurPos < mStart) {
217 uint32_t bytesRead;
218 uint64_t bufCount = XPCOM_MIN(mStart - mCurPos, (uint64_t)sizeof(buf));
219 nsresult rv = mInputStream->Read(buf, bufCount, &bytesRead);
220 if (NS_SUCCEEDED(rv) && bytesRead == 0) {
221 mClosed = true;
222 return rv;
225 if (NS_WARN_IF(NS_FAILED(rv))) {
226 return rv;
229 mCurPos += bytesRead;
234 // Let's reduce aCount in case it's too big.
235 if (mCurPos + aCount > mStart + mLength) {
236 aCount = mStart + mLength - mCurPos;
239 // Nothing else to read.
240 if (!aCount) {
241 return NS_OK;
244 nsresult rv = mInputStream->Read(aBuffer, aCount, aReadCount);
245 if (NS_SUCCEEDED(rv) && *aReadCount == 0) {
246 mClosed = true;
247 return rv;
250 if (NS_WARN_IF(NS_FAILED(rv))) {
251 return rv;
254 mCurPos += *aReadCount;
255 return NS_OK;
258 NS_IMETHODIMP
259 SlicedInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
260 uint32_t aCount, uint32_t* aResult) {
261 return NS_ERROR_NOT_IMPLEMENTED;
264 NS_IMETHODIMP
265 SlicedInputStream::IsNonBlocking(bool* aNonBlocking) {
266 NS_ENSURE_STATE(mInputStream);
267 return mInputStream->IsNonBlocking(aNonBlocking);
270 // nsICloneableInputStream interface
272 NS_IMETHODIMP
273 SlicedInputStream::GetCloneable(bool* aCloneable) {
274 NS_ENSURE_STATE(mInputStream);
275 NS_ENSURE_STATE(mWeakCloneableInputStream);
277 *aCloneable = true;
278 return NS_OK;
281 NS_IMETHODIMP
282 SlicedInputStream::Clone(nsIInputStream** aResult) {
283 NS_ENSURE_STATE(mInputStream);
284 NS_ENSURE_STATE(mWeakCloneableInputStream);
286 nsCOMPtr<nsIInputStream> clonedStream;
287 nsresult rv = mWeakCloneableInputStream->Clone(getter_AddRefs(clonedStream));
288 if (NS_WARN_IF(NS_FAILED(rv))) {
289 return rv;
292 nsCOMPtr<nsIInputStream> sis =
293 new SlicedInputStream(clonedStream.forget(), mStart, mLength);
295 sis.forget(aResult);
296 return NS_OK;
299 // nsIAsyncInputStream interface
301 NS_IMETHODIMP
302 SlicedInputStream::CloseWithStatus(nsresult aStatus) {
303 NS_ENSURE_STATE(mInputStream);
304 NS_ENSURE_STATE(mWeakAsyncInputStream);
306 mClosed = true;
307 return mWeakAsyncInputStream->CloseWithStatus(aStatus);
310 NS_IMETHODIMP
311 SlicedInputStream::AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
312 uint32_t aRequestedCount,
313 nsIEventTarget* aEventTarget) {
314 NS_ENSURE_STATE(mInputStream);
315 NS_ENSURE_STATE(mWeakAsyncInputStream);
317 nsCOMPtr<nsIInputStreamCallback> callback = aCallback ? this : nullptr;
319 uint32_t flags = aFlags;
320 uint32_t requestedCount = aRequestedCount;
323 MutexAutoLock lock(mMutex);
325 if (NS_WARN_IF(mAsyncWaitCallback && aCallback &&
326 mAsyncWaitCallback != aCallback)) {
327 return NS_ERROR_FAILURE;
330 mAsyncWaitCallback = aCallback;
332 // If we haven't started retrieving data, let's see if we can seek.
333 // If we cannot seek, we will do consecutive reads.
334 if (mCurPos < mStart && mWeakSeekableInputStream) {
335 nsresult rv = mWeakSeekableInputStream->Seek(
336 nsISeekableStream::NS_SEEK_SET, mStart);
337 if (NS_WARN_IF(NS_FAILED(rv))) {
338 return rv;
341 mCurPos = mStart;
344 mAsyncWaitFlags = aFlags;
345 mAsyncWaitRequestedCount = aRequestedCount;
346 mAsyncWaitEventTarget = aEventTarget;
348 // If we are not at the right position, let's do an asyncWait just internal.
349 if (mCurPos < mStart) {
350 flags = 0;
351 requestedCount = mStart - mCurPos;
355 return mWeakAsyncInputStream->AsyncWait(callback, flags, requestedCount,
356 aEventTarget);
359 // nsIInputStreamCallback
361 NS_IMETHODIMP
362 SlicedInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
363 MOZ_ASSERT(mInputStream);
364 MOZ_ASSERT(mWeakAsyncInputStream);
365 MOZ_ASSERT(mWeakAsyncInputStream == aStream);
367 nsCOMPtr<nsIInputStreamCallback> callback;
368 uint32_t asyncWaitFlags = 0;
369 uint32_t asyncWaitRequestedCount = 0;
370 nsCOMPtr<nsIEventTarget> asyncWaitEventTarget;
373 MutexAutoLock lock(mMutex);
375 // We have been canceled in the meanwhile.
376 if (!mAsyncWaitCallback) {
377 return NS_OK;
380 auto raii = MakeScopeExit([&] {
381 mMutex.AssertCurrentThreadOwns();
382 mAsyncWaitCallback = nullptr;
383 mAsyncWaitEventTarget = nullptr;
386 asyncWaitFlags = mAsyncWaitFlags;
387 asyncWaitRequestedCount = mAsyncWaitRequestedCount;
388 asyncWaitEventTarget = mAsyncWaitEventTarget;
390 // If at the end of this locked block, the callback is not null, it will be
391 // executed, otherwise, we are going to exec another AsyncWait().
392 callback = mAsyncWaitCallback;
394 if (mCurPos < mStart) {
395 char buf[4096];
396 nsresult rv = NS_OK;
397 while (mCurPos < mStart) {
398 uint32_t bytesRead;
399 uint64_t bufCount = XPCOM_MIN(mStart - mCurPos, (uint64_t)sizeof(buf));
400 rv = mInputStream->Read(buf, bufCount, &bytesRead);
401 if (NS_SUCCEEDED(rv) && bytesRead == 0) {
402 mClosed = true;
403 break;
406 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
407 asyncWaitFlags = 0;
408 asyncWaitRequestedCount = mStart - mCurPos;
409 // Here we want to exec another AsyncWait().
410 callback = nullptr;
411 break;
414 if (NS_WARN_IF(NS_FAILED(rv))) {
415 break;
418 mCurPos += bytesRead;
421 // Now we are ready to do the 'real' asyncWait.
422 if (mCurPos >= mStart) {
423 // We don't want to nullify the callback now, because it will be needed
424 // at the next ::OnInputStreamReady.
425 raii.release();
426 callback = nullptr;
431 if (callback) {
432 return callback->OnInputStreamReady(this);
435 return mWeakAsyncInputStream->AsyncWait(
436 this, asyncWaitFlags, asyncWaitRequestedCount, asyncWaitEventTarget);
439 // nsIIPCSerializableInputStream
441 void SlicedInputStream::SerializedComplexity(uint32_t aMaxSize,
442 uint32_t* aSizeUsed,
443 uint32_t* aPipes,
444 uint32_t* aTransferables) {
445 InputStreamHelper::SerializedComplexity(mInputStream, aMaxSize, aSizeUsed,
446 aPipes, aTransferables);
448 // If we're going to be serializing a pipe to transfer the sliced data, and we
449 // are getting no efficiency improvements from transferables, stream this
450 // sliced input stream directly as a pipe to avoid streaming data which will
451 // be sliced off anyway.
452 if (*aPipes > 0 && *aTransferables == 0) {
453 *aSizeUsed = 0;
454 *aPipes = 1;
455 *aTransferables = 0;
459 void SlicedInputStream::Serialize(mozilla::ipc::InputStreamParams& aParams,
460 uint32_t aMaxSize, uint32_t* aSizeUsed) {
461 MOZ_ASSERT(mInputStream);
462 MOZ_ASSERT(mWeakIPCSerializableInputStream);
464 uint32_t sizeUsed = 0, pipes = 0, transferables = 0;
465 SerializedComplexity(aMaxSize, &sizeUsed, &pipes, &transferables);
466 if (pipes > 0 && transferables == 0) {
467 InputStreamHelper::SerializeInputStreamAsPipe(this, aParams);
468 return;
471 SlicedInputStreamParams params;
472 InputStreamHelper::SerializeInputStream(mInputStream, params.stream(),
473 aMaxSize, aSizeUsed);
474 params.start() = mStart;
475 params.length() = mLength;
476 params.curPos() = mCurPos;
477 params.closed() = mClosed;
479 aParams = params;
482 bool SlicedInputStream::Deserialize(
483 const mozilla::ipc::InputStreamParams& aParams) {
484 MOZ_ASSERT(!mInputStream);
485 MOZ_ASSERT(!mWeakIPCSerializableInputStream);
487 if (aParams.type() != InputStreamParams::TSlicedInputStreamParams) {
488 NS_ERROR("Received unknown parameters from the other process!");
489 return false;
492 const SlicedInputStreamParams& params = aParams.get_SlicedInputStreamParams();
494 nsCOMPtr<nsIInputStream> stream =
495 InputStreamHelper::DeserializeInputStream(params.stream());
496 if (!stream) {
497 NS_WARNING("Deserialize failed!");
498 return false;
501 SetSourceStream(stream.forget());
503 mStart = params.start();
504 mLength = params.length();
505 mCurPos = params.curPos();
506 mClosed = params.closed();
508 return true;
511 // nsISeekableStream
513 NS_IMETHODIMP
514 SlicedInputStream::Seek(int32_t aWhence, int64_t aOffset) {
515 NS_ENSURE_STATE(mInputStream);
516 NS_ENSURE_STATE(mWeakSeekableInputStream);
518 int64_t offset;
519 nsresult rv;
521 switch (aWhence) {
522 case NS_SEEK_SET:
523 offset = mStart + aOffset;
524 break;
525 case NS_SEEK_CUR:
526 // mCurPos could be lower than mStart if the reading has not started yet.
527 offset = XPCOM_MAX(mStart, mCurPos) + aOffset;
528 break;
529 case NS_SEEK_END: {
530 uint64_t available;
531 rv = mInputStream->Available(&available);
532 if (rv == NS_BASE_STREAM_CLOSED) {
533 mClosed = true;
534 return rv;
537 if (NS_WARN_IF(NS_FAILED(rv))) {
538 return rv;
541 offset = XPCOM_MIN(mStart + mLength, available) + aOffset;
542 break;
544 default:
545 return NS_ERROR_ILLEGAL_VALUE;
548 if (offset < (int64_t)mStart || offset > (int64_t)(mStart + mLength)) {
549 return NS_ERROR_INVALID_ARG;
552 rv = mWeakSeekableInputStream->Seek(NS_SEEK_SET, offset);
553 if (NS_WARN_IF(NS_FAILED(rv))) {
554 return rv;
557 mCurPos = offset;
558 return NS_OK;
561 NS_IMETHODIMP
562 SlicedInputStream::SetEOF() {
563 NS_ENSURE_STATE(mInputStream);
564 NS_ENSURE_STATE(mWeakSeekableInputStream);
566 mClosed = true;
567 return mWeakSeekableInputStream->SetEOF();
570 // nsITellableStream
572 NS_IMETHODIMP
573 SlicedInputStream::Tell(int64_t* aResult) {
574 NS_ENSURE_STATE(mInputStream);
575 NS_ENSURE_STATE(mWeakTellableInputStream);
577 int64_t tell = 0;
579 nsresult rv = mWeakTellableInputStream->Tell(&tell);
580 if (NS_WARN_IF(NS_FAILED(rv))) {
581 return rv;
584 if (tell < (int64_t)mStart) {
585 *aResult = 0;
586 return NS_OK;
589 *aResult = tell - mStart;
590 if (*aResult > (int64_t)mLength) {
591 *aResult = mLength;
594 return NS_OK;
597 // nsIInputStreamLength
599 NS_IMETHODIMP
600 SlicedInputStream::Length(int64_t* aLength) {
601 NS_ENSURE_STATE(mInputStream);
602 NS_ENSURE_STATE(mWeakInputStreamLength);
604 nsresult rv = mWeakInputStreamLength->Length(aLength);
605 if (rv == NS_BASE_STREAM_CLOSED) {
606 mClosed = true;
607 return rv;
610 if (NS_WARN_IF(NS_FAILED(rv))) {
611 return rv;
614 if (*aLength == -1) {
615 return NS_OK;
618 *aLength = (int64_t)AdjustRange((uint64_t)*aLength);
619 return NS_OK;
622 // nsIAsyncInputStreamLength
624 NS_IMETHODIMP
625 SlicedInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback,
626 nsIEventTarget* aEventTarget) {
627 NS_ENSURE_STATE(mInputStream);
628 NS_ENSURE_STATE(mWeakAsyncInputStreamLength);
630 nsCOMPtr<nsIInputStreamLengthCallback> callback = aCallback ? this : nullptr;
632 MutexAutoLock lock(mMutex);
633 mAsyncWaitLengthCallback = aCallback;
636 return mWeakAsyncInputStreamLength->AsyncLengthWait(callback, aEventTarget);
639 // nsIInputStreamLengthCallback
641 NS_IMETHODIMP
642 SlicedInputStream::OnInputStreamLengthReady(nsIAsyncInputStreamLength* aStream,
643 int64_t aLength) {
644 MOZ_ASSERT(mInputStream);
645 MOZ_ASSERT(mWeakAsyncInputStreamLength);
646 MOZ_ASSERT(mWeakAsyncInputStreamLength == aStream);
648 nsCOMPtr<nsIInputStreamLengthCallback> callback;
650 MutexAutoLock lock(mMutex);
652 // We have been canceled in the meanwhile.
653 if (!mAsyncWaitLengthCallback) {
654 return NS_OK;
657 callback.swap(mAsyncWaitLengthCallback);
660 if (aLength != -1) {
661 aLength = (int64_t)AdjustRange((uint64_t)aLength);
664 MOZ_ASSERT(callback);
665 return callback->OnInputStreamLengthReady(this, aLength);
668 } // namespace mozilla