Bug 1858509 add thread-safety annotations around MediaSourceDemuxer::mMonitor r=alwu
[gecko.git] / ipc / glue / DataPipe.cpp
blobbc1af11515fd3061063e63c3697b7138e687711c
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "DataPipe.h"
8 #include "mozilla/AlreadyAddRefed.h"
9 #include "mozilla/Assertions.h"
10 #include "mozilla/CheckedInt.h"
11 #include "mozilla/ErrorNames.h"
12 #include "mozilla/Logging.h"
13 #include "mozilla/MoveOnlyFunction.h"
14 #include "mozilla/ipc/InputStreamParams.h"
15 #include "nsIAsyncInputStream.h"
16 #include "nsStreamUtils.h"
17 #include "nsThreadUtils.h"
19 namespace mozilla {
20 namespace ipc {
// Log module named "DataPipe"; every MOZ_LOG call in this file logs through it.
22 LazyLogModule gDataPipeLog("DataPipe");
24 namespace data_pipe_detail {
26 // Helper for queueing up actions to be run once the mutex has been unlocked.
27 // Actions will be run in-order.
28 class MOZ_SCOPED_CAPABILITY DataPipeAutoLock {
29 public:
30 explicit DataPipeAutoLock(Mutex& aMutex) MOZ_CAPABILITY_ACQUIRE(aMutex)
31 : mMutex(aMutex) {
32 mMutex.Lock();
34 DataPipeAutoLock(const DataPipeAutoLock&) = delete;
35 DataPipeAutoLock& operator=(const DataPipeAutoLock&) = delete;
37 template <typename F>
38 void AddUnlockAction(F aAction) {
39 mActions.AppendElement(std::move(aAction));
42 ~DataPipeAutoLock() MOZ_CAPABILITY_RELEASE() {
43 mMutex.Unlock();
44 for (auto& action : mActions) {
45 action();
49 private:
50 Mutex& mMutex;
51 AutoTArray<MoveOnlyFunction<void()>, 4> mActions;
54 static void DoNotifyOnUnlock(DataPipeAutoLock& aLock,
55 already_AddRefed<nsIRunnable> aCallback,
56 already_AddRefed<nsIEventTarget> aTarget) {
57 nsCOMPtr<nsIRunnable> callback{std::move(aCallback)};
58 nsCOMPtr<nsIEventTarget> target{std::move(aTarget)};
59 if (callback) {
60 aLock.AddUnlockAction(
61 [callback = std::move(callback), target = std::move(target)]() mutable {
62 if (target) {
63 target->Dispatch(callback.forget());
64 } else {
65 NS_DispatchBackgroundTask(callback.forget());
67 });
71 class DataPipeLink : public NodeController::PortObserver {
72 public:
73 DataPipeLink(bool aReceiverSide, std::shared_ptr<Mutex> aMutex,
74 ScopedPort aPort, SharedMemoryBasic::Handle aShmemHandle,
75 SharedMemory* aShmem, uint32_t aCapacity, nsresult aPeerStatus,
76 uint32_t aOffset, uint32_t aAvailable)
77 : mMutex(std::move(aMutex)),
78 mPort(std::move(aPort)),
79 mShmemHandle(std::move(aShmemHandle)),
80 mShmem(aShmem),
81 mCapacity(aCapacity),
82 mReceiverSide(aReceiverSide),
83 mPeerStatus(aPeerStatus),
84 mOffset(aOffset),
85 mAvailable(aAvailable) {}
87 void Init() MOZ_EXCLUDES(*mMutex) {
89 DataPipeAutoLock lock(*mMutex);
90 if (NS_FAILED(mPeerStatus)) {
91 return;
93 MOZ_ASSERT(mPort.IsValid());
94 mPort.Controller()->SetPortObserver(mPort.Port(), this);
96 OnPortStatusChanged();
99 void OnPortStatusChanged() final MOZ_EXCLUDES(*mMutex);
101 // Add a task to notify the callback after `aLock` is unlocked.
103 // This method is safe to call multiple times, as after the first time it is
104 // called, `mCallback` will be cleared.
105 void NotifyOnUnlock(DataPipeAutoLock& aLock) MOZ_REQUIRES(*mMutex) {
106 DoNotifyOnUnlock(aLock, mCallback.forget(), mCallbackTarget.forget());
109 void SendBytesConsumedOnUnlock(DataPipeAutoLock& aLock, uint32_t aBytes)
110 MOZ_REQUIRES(*mMutex) {
111 MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
112 ("SendOnUnlock CONSUMED(%u) %s", aBytes, Describe(aLock).get()));
113 if (NS_FAILED(mPeerStatus)) {
114 return;
117 // `mPort` may be destroyed by `SetPeerError` after the DataPipe is unlocked
118 // but before we send the message. The strong controller and port references
119 // will allow us to try to send the message anyway, and it will be safely
120 // dropped if the port has already been closed. CONSUMED messages are safe
121 // to deliver out-of-order, so we don't need to worry about ordering here.
122 aLock.AddUnlockAction([controller = RefPtr{mPort.Controller()},
123 port = mPort.Port(), aBytes]() mutable {
124 auto message = MakeUnique<IPC::Message>(
125 MSG_ROUTING_NONE, DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE);
126 IPC::MessageWriter writer(*message);
127 WriteParam(&writer, aBytes);
128 controller->SendUserMessage(port, std::move(message));
132 void SetPeerError(DataPipeAutoLock& aLock, nsresult aStatus,
133 bool aSendClosed = false) MOZ_REQUIRES(*mMutex) {
134 MOZ_LOG(gDataPipeLog, LogLevel::Debug,
135 ("SetPeerError(%s%s) %s", GetStaticErrorName(aStatus),
136 aSendClosed ? ", send" : "", Describe(aLock).get()));
137 // The pipe was closed or errored. Clear the observer reference back
138 // to this type from the port layer, and ensure we notify waiters.
139 MOZ_ASSERT(NS_SUCCEEDED(mPeerStatus));
140 mPeerStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus;
141 aLock.AddUnlockAction([port = std::move(mPort), aStatus, aSendClosed] {
142 if (aSendClosed) {
143 auto message = MakeUnique<IPC::Message>(MSG_ROUTING_NONE,
144 DATA_PIPE_CLOSED_MESSAGE_TYPE);
145 IPC::MessageWriter writer(*message);
146 WriteParam(&writer, aStatus);
147 port.Controller()->SendUserMessage(port.Port(), std::move(message));
149 // The `ScopedPort` being destroyed with this action will close it,
150 // clearing the observer reference from the ports layer.
152 NotifyOnUnlock(aLock);
155 nsCString Describe(DataPipeAutoLock& aLock) const MOZ_REQUIRES(*mMutex) {
156 return nsPrintfCString(
157 "[%s(%p) c=%u e=%s o=%u a=%u, cb=%s]",
158 mReceiverSide ? "Receiver" : "Sender", this, mCapacity,
159 GetStaticErrorName(mPeerStatus), mOffset, mAvailable,
160 mCallback ? (mCallbackClosureOnly ? "clo" : "yes") : "no");
163 // This mutex is shared with the `DataPipeBase` which owns this
164 // `DataPipeLink`.
165 std::shared_ptr<Mutex> mMutex;
167 ScopedPort mPort MOZ_GUARDED_BY(*mMutex);
168 SharedMemoryBasic::Handle mShmemHandle MOZ_GUARDED_BY(*mMutex);
169 const RefPtr<SharedMemory> mShmem;
170 const uint32_t mCapacity;
171 const bool mReceiverSide;
173 bool mProcessingSegment MOZ_GUARDED_BY(*mMutex) = false;
175 nsresult mPeerStatus MOZ_GUARDED_BY(*mMutex) = NS_OK;
176 uint32_t mOffset MOZ_GUARDED_BY(*mMutex) = 0;
177 uint32_t mAvailable MOZ_GUARDED_BY(*mMutex) = 0;
179 bool mCallbackClosureOnly MOZ_GUARDED_BY(*mMutex) = false;
180 nsCOMPtr<nsIRunnable> mCallback MOZ_GUARDED_BY(*mMutex);
181 nsCOMPtr<nsIEventTarget> mCallbackTarget MOZ_GUARDED_BY(*mMutex);
184 void DataPipeLink::OnPortStatusChanged() {
185 DataPipeAutoLock lock(*mMutex);
187 while (NS_SUCCEEDED(mPeerStatus)) {
188 UniquePtr<IPC::Message> message;
189 if (!mPort.Controller()->GetMessage(mPort.Port(), &message)) {
190 SetPeerError(lock, NS_BASE_STREAM_CLOSED);
191 return;
193 if (!message) {
194 return; // no more messages
197 IPC::MessageReader reader(*message);
198 switch (message->type()) {
199 case DATA_PIPE_CLOSED_MESSAGE_TYPE: {
200 nsresult status = NS_OK;
201 if (!ReadParam(&reader, &status)) {
202 NS_WARNING("Unable to parse nsresult error from peer");
203 status = NS_ERROR_UNEXPECTED;
205 MOZ_LOG(gDataPipeLog, LogLevel::Debug,
206 ("Got CLOSED(%s) %s", GetStaticErrorName(status),
207 Describe(lock).get()));
208 SetPeerError(lock, status);
209 return;
211 case DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE: {
212 uint32_t consumed = 0;
213 if (!ReadParam(&reader, &consumed)) {
214 NS_WARNING("Unable to parse bytes consumed from peer");
215 SetPeerError(lock, NS_ERROR_UNEXPECTED);
216 return;
219 MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
220 ("Got CONSUMED(%u) %s", consumed, Describe(lock).get()));
221 auto newAvailable = CheckedUint32{mAvailable} + consumed;
222 if (!newAvailable.isValid() || newAvailable.value() > mCapacity) {
223 NS_WARNING("Illegal bytes consumed message received from peer");
224 SetPeerError(lock, NS_ERROR_UNEXPECTED);
225 return;
227 mAvailable = newAvailable.value();
228 if (!mCallbackClosureOnly) {
229 NotifyOnUnlock(lock);
231 break;
233 default: {
234 NS_WARNING("Illegal message type received from peer");
235 SetPeerError(lock, NS_ERROR_UNEXPECTED);
236 return;
242 DataPipeBase::DataPipeBase(bool aReceiverSide, nsresult aError)
243 : mMutex(std::make_shared<Mutex>(aReceiverSide ? "DataPipeReceiver"
244 : "DataPipeSender")),
245 mStatus(NS_SUCCEEDED(aError) ? NS_BASE_STREAM_CLOSED : aError) {}
247 DataPipeBase::DataPipeBase(bool aReceiverSide, ScopedPort aPort,
248 SharedMemoryBasic::Handle aShmemHandle,
249 SharedMemory* aShmem, uint32_t aCapacity,
250 nsresult aPeerStatus, uint32_t aOffset,
251 uint32_t aAvailable)
252 : mMutex(std::make_shared<Mutex>(aReceiverSide ? "DataPipeReceiver"
253 : "DataPipeSender")),
254 mStatus(NS_OK),
255 mLink(new DataPipeLink(aReceiverSide, mMutex, std::move(aPort),
256 std::move(aShmemHandle), aShmem, aCapacity,
257 aPeerStatus, aOffset, aAvailable)) {
258 mLink->Init();
261 DataPipeBase::~DataPipeBase() {
262 DataPipeAutoLock lock(*mMutex);
263 CloseInternal(lock, NS_BASE_STREAM_CLOSED);
266 void DataPipeBase::CloseInternal(DataPipeAutoLock& aLock, nsresult aStatus) {
267 if (NS_FAILED(mStatus)) {
268 return;
271 MOZ_LOG(
272 gDataPipeLog, LogLevel::Debug,
273 ("Closing(%s) %s", GetStaticErrorName(aStatus), Describe(aLock).get()));
275 // Set our status to an errored status.
276 mStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus;
277 RefPtr<DataPipeLink> link = mLink.forget();
278 AssertSameMutex(link->mMutex);
279 link->NotifyOnUnlock(aLock);
281 // If our peer hasn't disappeared yet, clean up our connection to it.
282 if (NS_SUCCEEDED(link->mPeerStatus)) {
283 link->SetPeerError(aLock, mStatus, /* aSendClosed */ true);
287 nsresult DataPipeBase::ProcessSegmentsInternal(
288 uint32_t aCount, ProcessSegmentFun aProcessSegment,
289 uint32_t* aProcessedCount) {
290 *aProcessedCount = 0;
292 while (*aProcessedCount < aCount) {
293 DataPipeAutoLock lock(*mMutex);
294 mMutex->AssertCurrentThreadOwns();
296 MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
297 ("ProcessSegments(%u of %u) %s", *aProcessedCount, aCount,
298 Describe(lock).get()));
300 nsresult status = CheckStatus(lock);
301 if (NS_FAILED(status)) {
302 if (*aProcessedCount > 0) {
303 return NS_OK;
305 return status == NS_BASE_STREAM_CLOSED ? NS_OK : status;
308 RefPtr<DataPipeLink> link = mLink;
309 AssertSameMutex(link->mMutex);
310 if (!link->mAvailable) {
311 MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(link->mPeerStatus),
312 "CheckStatus will have returned an error");
313 return *aProcessedCount > 0 ? NS_OK : NS_BASE_STREAM_WOULD_BLOCK;
316 MOZ_RELEASE_ASSERT(!link->mProcessingSegment,
317 "Only one thread may be processing a segment at a time");
319 // Extract an iterator over the next contiguous region of the shared memory
320 // buffer which will be used .
321 char* start = static_cast<char*>(link->mShmem->memory()) + link->mOffset;
322 char* iter = start;
323 char* end = start + std::min({aCount - *aProcessedCount, link->mAvailable,
324 link->mCapacity - link->mOffset});
326 // Record the consumed region from our segment when exiting this scope,
327 // telling our peer how many bytes were consumed. Hold on to `mLink` to keep
328 // the shmem mapped and make sure we can clean up even if we're closed while
329 // processing the shmem region.
330 link->mProcessingSegment = true;
331 auto scopeExit = MakeScopeExit([&] {
332 mMutex->AssertCurrentThreadOwns(); // should still be held
333 AssertSameMutex(link->mMutex);
335 MOZ_RELEASE_ASSERT(link->mProcessingSegment);
336 link->mProcessingSegment = false;
337 uint32_t totalProcessed = iter - start;
338 if (totalProcessed > 0) {
339 link->mOffset += totalProcessed;
340 MOZ_RELEASE_ASSERT(link->mOffset <= link->mCapacity);
341 if (link->mOffset == link->mCapacity) {
342 link->mOffset = 0;
344 link->mAvailable -= totalProcessed;
345 link->SendBytesConsumedOnUnlock(lock, totalProcessed);
347 MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
348 ("Processed Segment(%u of %zu) %s", totalProcessed, end - start,
349 Describe(lock).get()));
353 MutexAutoUnlock unlock(*mMutex);
354 while (iter < end) {
355 uint32_t processed = 0;
356 Span segment{iter, end};
357 nsresult rv = aProcessSegment(segment, *aProcessedCount, &processed);
358 if (NS_FAILED(rv) || processed == 0) {
359 return NS_OK;
362 MOZ_RELEASE_ASSERT(processed <= segment.Length());
363 iter += processed;
364 *aProcessedCount += processed;
368 MOZ_DIAGNOSTIC_ASSERT(*aProcessedCount == aCount,
369 "Must have processed exactly aCount");
370 return NS_OK;
373 void DataPipeBase::AsyncWaitInternal(already_AddRefed<nsIRunnable> aCallback,
374 already_AddRefed<nsIEventTarget> aTarget,
375 bool aClosureOnly) {
376 RefPtr<nsIRunnable> callback = std::move(aCallback);
377 RefPtr<nsIEventTarget> target = std::move(aTarget);
379 DataPipeAutoLock lock(*mMutex);
380 MOZ_LOG(gDataPipeLog, LogLevel::Debug,
381 ("AsyncWait %s %p %s", aClosureOnly ? "(closure)" : "(ready)",
382 callback.get(), Describe(lock).get()));
384 if (NS_FAILED(CheckStatus(lock))) {
385 #ifdef DEBUG
386 if (mLink) {
387 AssertSameMutex(mLink->mMutex);
388 MOZ_ASSERT(!mLink->mCallback);
390 #endif
391 DoNotifyOnUnlock(lock, callback.forget(), target.forget());
392 return;
395 AssertSameMutex(mLink->mMutex);
397 // NOTE: After this point, `mLink` may have previously had a callback which is
398 // now being cancelled, make sure we clear `mCallback` even if we're going to
399 // call `aCallback` immediately.
400 mLink->mCallback = callback.forget();
401 mLink->mCallbackTarget = target.forget();
402 mLink->mCallbackClosureOnly = aClosureOnly;
403 if (!aClosureOnly && mLink->mAvailable) {
404 mLink->NotifyOnUnlock(lock);
408 nsresult DataPipeBase::CheckStatus(DataPipeAutoLock& aLock) {
409 // If our peer has closed or errored, we may need to close our local side to
410 // reflect the error code our peer provided. If we're a sender, we want to
411 // become closed immediately, whereas if we're a receiver we want to wait
412 // until our available buffer has been exhausted.
414 // NOTE: There may still be 2-stage writes/reads ongoing at this point, which
415 // will continue due to `mLink` being kept alive by the
416 // `ProcessSegmentsInternal` function.
417 if (NS_FAILED(mStatus)) {
418 return mStatus;
420 AssertSameMutex(mLink->mMutex);
421 if (NS_FAILED(mLink->mPeerStatus) &&
422 (!mLink->mReceiverSide || !mLink->mAvailable)) {
423 CloseInternal(aLock, mLink->mPeerStatus);
425 return mStatus;
428 nsCString DataPipeBase::Describe(DataPipeAutoLock& aLock) {
429 if (mLink) {
430 AssertSameMutex(mLink->mMutex);
431 return mLink->Describe(aLock);
433 return nsPrintfCString("[status=%s]", GetStaticErrorName(mStatus));
436 template <typename T>
437 void DataPipeWrite(IPC::MessageWriter* aWriter, T* aParam) {
438 DataPipeAutoLock lock(*aParam->mMutex);
439 MOZ_LOG(gDataPipeLog, LogLevel::Debug,
440 ("IPC Write: %s", aParam->Describe(lock).get()));
442 WriteParam(aWriter, aParam->mStatus);
443 if (NS_FAILED(aParam->mStatus)) {
444 return;
447 aParam->AssertSameMutex(aParam->mLink->mMutex);
448 MOZ_RELEASE_ASSERT(!aParam->mLink->mProcessingSegment,
449 "cannot transfer while processing a segment");
451 // Serialize relevant parameters to our peer.
452 WriteParam(aWriter, std::move(aParam->mLink->mPort));
453 WriteParam(aWriter, std::move(aParam->mLink->mShmemHandle));
454 WriteParam(aWriter, aParam->mLink->mCapacity);
455 WriteParam(aWriter, aParam->mLink->mPeerStatus);
456 WriteParam(aWriter, aParam->mLink->mOffset);
457 WriteParam(aWriter, aParam->mLink->mAvailable);
459 // Mark our peer as closed so we don't try to send to it when closing.
460 aParam->mLink->mPeerStatus = NS_ERROR_NOT_INITIALIZED;
461 aParam->CloseInternal(lock, NS_ERROR_NOT_INITIALIZED);
464 template <typename T>
465 bool DataPipeRead(IPC::MessageReader* aReader, RefPtr<T>* aResult) {
466 nsresult rv = NS_OK;
467 if (!ReadParam(aReader, &rv)) {
468 aReader->FatalError("failed to read DataPipe status");
469 return false;
471 if (NS_FAILED(rv)) {
472 *aResult = new T(rv);
473 MOZ_LOG(gDataPipeLog, LogLevel::Debug,
474 ("IPC Read: [status=%s]", GetStaticErrorName(rv)));
475 return true;
478 ScopedPort port;
479 if (!ReadParam(aReader, &port)) {
480 aReader->FatalError("failed to read DataPipe port");
481 return false;
483 SharedMemoryBasic::Handle shmemHandle;
484 if (!ReadParam(aReader, &shmemHandle)) {
485 aReader->FatalError("failed to read DataPipe shmem");
486 return false;
488 // Due to the awkward shared memory API provided by SharedMemoryBasic, we need
489 // to transfer ownership into the `shmem` here, then steal it back later in
490 // the function. Bug 1797039 tracks potential changes to the RawShmem API
491 // which could improve this situation.
492 RefPtr shmem = new SharedMemoryBasic();
493 if (!shmem->SetHandle(std::move(shmemHandle),
494 SharedMemory::RightsReadWrite)) {
495 aReader->FatalError("failed to create DataPipe shmem from handle");
496 return false;
498 uint32_t capacity = 0;
499 nsresult peerStatus = NS_OK;
500 uint32_t offset = 0;
501 uint32_t available = 0;
502 if (!ReadParam(aReader, &capacity) || !ReadParam(aReader, &peerStatus) ||
503 !ReadParam(aReader, &offset) || !ReadParam(aReader, &available)) {
504 aReader->FatalError("failed to read DataPipe fields");
505 return false;
507 if (!capacity || offset >= capacity || available > capacity) {
508 aReader->FatalError("received DataPipe state values are inconsistent");
509 return false;
511 if (!shmem->Map(SharedMemory::PageAlignedSize(capacity))) {
512 aReader->FatalError("failed to map DataPipe shared memory region");
513 return false;
516 *aResult = new T(std::move(port), shmem->TakeHandle(), shmem, capacity,
517 peerStatus, offset, available);
518 if (MOZ_LOG_TEST(gDataPipeLog, LogLevel::Debug)) {
519 DataPipeAutoLock lock(*(*aResult)->mMutex);
520 MOZ_LOG(gDataPipeLog, LogLevel::Debug,
521 ("IPC Read: %s", (*aResult)->Describe(lock).get()));
523 return true;
526 } // namespace data_pipe_detail
528 //-----------------------------------------------------------------------------
529 // DataPipeSender
530 //-----------------------------------------------------------------------------
// nsISupports implementation for DataPipeSender, listing the output stream
// interfaces plus the concrete DataPipeSender type itself.
532 NS_IMPL_ISUPPORTS(DataPipeSender, nsIOutputStream, nsIAsyncOutputStream,
533 DataPipeSender)
535 // nsIOutputStream
537 NS_IMETHODIMP DataPipeSender::Close() {
538 return CloseWithStatus(NS_BASE_STREAM_CLOSED);
541 NS_IMETHODIMP DataPipeSender::Flush() { return NS_OK; }
543 NS_IMETHODIMP DataPipeSender::StreamStatus() {
544 data_pipe_detail::DataPipeAutoLock lock(*mMutex);
545 return CheckStatus(lock);
548 NS_IMETHODIMP DataPipeSender::Write(const char* aBuf, uint32_t aCount,
549 uint32_t* aWriteCount) {
550 return WriteSegments(NS_CopyBufferToSegment, (void*)aBuf, aCount,
551 aWriteCount);
554 NS_IMETHODIMP DataPipeSender::WriteFrom(nsIInputStream* aFromStream,
555 uint32_t aCount,
556 uint32_t* aWriteCount) {
557 return WriteSegments(NS_CopyStreamToSegment, aFromStream, aCount,
558 aWriteCount);
561 NS_IMETHODIMP DataPipeSender::WriteSegments(nsReadSegmentFun aReader,
562 void* aClosure, uint32_t aCount,
563 uint32_t* aWriteCount) {
564 auto processSegment = [&](Span<char> aSpan, uint32_t aToOffset,
565 uint32_t* aReadCount) -> nsresult {
566 return aReader(this, aClosure, aSpan.data(), aToOffset, aSpan.Length(),
567 aReadCount);
569 return ProcessSegmentsInternal(aCount, processSegment, aWriteCount);
572 NS_IMETHODIMP DataPipeSender::IsNonBlocking(bool* _retval) {
573 *_retval = true;
574 return NS_OK;
577 // nsIAsyncOutputStream
579 NS_IMETHODIMP DataPipeSender::CloseWithStatus(nsresult reason) {
580 data_pipe_detail::DataPipeAutoLock lock(*mMutex);
581 CloseInternal(lock, reason);
582 return NS_OK;
585 NS_IMETHODIMP DataPipeSender::AsyncWait(nsIOutputStreamCallback* aCallback,
586 uint32_t aFlags,
587 uint32_t aRequestedCount,
588 nsIEventTarget* aTarget) {
589 AsyncWaitInternal(
590 aCallback ? NS_NewCancelableRunnableFunction(
591 "DataPipeReceiver::AsyncWait",
592 [self = RefPtr{this}, callback = RefPtr{aCallback}] {
593 MOZ_LOG(gDataPipeLog, LogLevel::Debug,
594 ("Calling OnOutputStreamReady(%p, %p)",
595 callback.get(), self.get()));
596 callback->OnOutputStreamReady(self);
598 : nullptr,
599 do_AddRef(aTarget), aFlags & WAIT_CLOSURE_ONLY);
600 return NS_OK;
603 //-----------------------------------------------------------------------------
604 // DataPipeReceiver
605 //-----------------------------------------------------------------------------
// nsISupports implementation for DataPipeReceiver, listing the input stream
// and IPC-serialization interfaces plus the concrete DataPipeReceiver type.
607 NS_IMPL_ISUPPORTS(DataPipeReceiver, nsIInputStream, nsIAsyncInputStream,
608 nsIIPCSerializableInputStream, DataPipeReceiver)
610 // nsIInputStream
612 NS_IMETHODIMP DataPipeReceiver::Close() {
613 return CloseWithStatus(NS_BASE_STREAM_CLOSED);
616 NS_IMETHODIMP DataPipeReceiver::Available(uint64_t* _retval) {
617 data_pipe_detail::DataPipeAutoLock lock(*mMutex);
618 nsresult rv = CheckStatus(lock);
619 if (NS_FAILED(rv)) {
620 return rv;
622 AssertSameMutex(mLink->mMutex);
623 *_retval = mLink->mAvailable;
624 return NS_OK;
627 NS_IMETHODIMP DataPipeReceiver::StreamStatus() {
628 data_pipe_detail::DataPipeAutoLock lock(*mMutex);
629 return CheckStatus(lock);
632 NS_IMETHODIMP DataPipeReceiver::Read(char* aBuf, uint32_t aCount,
633 uint32_t* aReadCount) {
634 return ReadSegments(NS_CopySegmentToBuffer, aBuf, aCount, aReadCount);
637 NS_IMETHODIMP DataPipeReceiver::ReadSegments(nsWriteSegmentFun aWriter,
638 void* aClosure, uint32_t aCount,
639 uint32_t* aReadCount) {
640 auto processSegment = [&](Span<char> aSpan, uint32_t aToOffset,
641 uint32_t* aWriteCount) -> nsresult {
642 return aWriter(this, aClosure, aSpan.data(), aToOffset, aSpan.Length(),
643 aWriteCount);
645 return ProcessSegmentsInternal(aCount, processSegment, aReadCount);
648 NS_IMETHODIMP DataPipeReceiver::IsNonBlocking(bool* _retval) {
649 *_retval = true;
650 return NS_OK;
653 // nsIAsyncInputStream
655 NS_IMETHODIMP DataPipeReceiver::CloseWithStatus(nsresult aStatus) {
656 data_pipe_detail::DataPipeAutoLock lock(*mMutex);
657 CloseInternal(lock, aStatus);
658 return NS_OK;
661 NS_IMETHODIMP DataPipeReceiver::AsyncWait(nsIInputStreamCallback* aCallback,
662 uint32_t aFlags,
663 uint32_t aRequestedCount,
664 nsIEventTarget* aTarget) {
665 AsyncWaitInternal(
666 aCallback ? NS_NewCancelableRunnableFunction(
667 "DataPipeReceiver::AsyncWait",
668 [self = RefPtr{this}, callback = RefPtr{aCallback}] {
669 MOZ_LOG(gDataPipeLog, LogLevel::Debug,
670 ("Calling OnInputStreamReady(%p, %p)",
671 callback.get(), self.get()));
672 callback->OnInputStreamReady(self);
674 : nullptr,
675 do_AddRef(aTarget), aFlags & WAIT_CLOSURE_ONLY);
676 return NS_OK;
679 // nsIIPCSerializableInputStream
681 void DataPipeReceiver::SerializedComplexity(uint32_t aMaxSize,
682 uint32_t* aSizeUsed,
683 uint32_t* aPipes,
684 uint32_t* aTransferables) {
685 // We report DataPipeReceiver as taking one transferrable to serialize, rather
686 // than one pipe, as we aren't starting a new pipe for this purpose, and are
687 // instead transferring an existing pipe.
688 *aTransferables = 1;
691 void DataPipeReceiver::Serialize(InputStreamParams& aParams, uint32_t aMaxSize,
692 uint32_t* aSizeUsed) {
693 *aSizeUsed = 0;
694 aParams = DataPipeReceiverStreamParams(WrapNotNull(this));
697 bool DataPipeReceiver::Deserialize(const InputStreamParams& aParams) {
698 MOZ_CRASH("Handled directly in `DeserializeInputStream`");
701 //-----------------------------------------------------------------------------
702 // NewDataPipe
703 //-----------------------------------------------------------------------------
705 nsresult NewDataPipe(uint32_t aCapacity, DataPipeSender** aSender,
706 DataPipeReceiver** aReceiver) {
707 if (!aCapacity) {
708 aCapacity = kDefaultDataPipeCapacity;
711 RefPtr<NodeController> controller = NodeController::GetSingleton();
712 if (!controller) {
713 return NS_ERROR_ILLEGAL_DURING_SHUTDOWN;
716 // Allocate a pair of ports for messaging between the sender & receiver.
717 auto [senderPort, receiverPort] = controller->CreatePortPair();
719 // Create and allocate the shared memory region.
720 auto shmem = MakeRefPtr<SharedMemoryBasic>();
721 size_t alignedCapacity = SharedMemory::PageAlignedSize(aCapacity);
722 if (!shmem->Create(alignedCapacity) || !shmem->Map(alignedCapacity)) {
723 return NS_ERROR_OUT_OF_MEMORY;
726 // We'll first clone then take the handle from the region so that the sender &
727 // receiver each have a handle. This avoids the need to duplicate the handle
728 // when serializing, when errors are non-recoverable.
729 SharedMemoryBasic::Handle senderShmemHandle = shmem->CloneHandle();
730 SharedMemoryBasic::Handle receiverShmemHandle = shmem->TakeHandle();
731 if (!senderShmemHandle || !receiverShmemHandle) {
732 return NS_ERROR_OUT_OF_MEMORY;
735 RefPtr sender =
736 new DataPipeSender(std::move(senderPort), std::move(senderShmemHandle),
737 shmem, aCapacity, NS_OK, 0, aCapacity);
738 RefPtr receiver = new DataPipeReceiver(std::move(receiverPort),
739 std::move(receiverShmemHandle), shmem,
740 aCapacity, NS_OK, 0, 0);
741 sender.forget(aSender);
742 receiver.forget(aReceiver);
743 return NS_OK;
746 } // namespace ipc
747 } // namespace mozilla
749 void IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Write(
750 MessageWriter* aWriter, mozilla::ipc::DataPipeSender* aParam) {
751 mozilla::ipc::data_pipe_detail::DataPipeWrite(aWriter, aParam);
754 bool IPC::ParamTraits<mozilla::ipc::DataPipeSender*>::Read(
755 MessageReader* aReader, RefPtr<mozilla::ipc::DataPipeSender>* aResult) {
756 return mozilla::ipc::data_pipe_detail::DataPipeRead(aReader, aResult);
759 void IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Write(
760 MessageWriter* aWriter, mozilla::ipc::DataPipeReceiver* aParam) {
761 mozilla::ipc::data_pipe_detail::DataPipeWrite(aWriter, aParam);
764 bool IPC::ParamTraits<mozilla::ipc::DataPipeReceiver*>::Read(
765 MessageReader* aReader, RefPtr<mozilla::ipc::DataPipeReceiver>* aResult) {
766 return mozilla::ipc::data_pipe_detail::DataPipeRead(aReader, aResult);