1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 #include "mozilla/AlreadyAddRefed.h"
9 #include "mozilla/Assertions.h"
10 #include "mozilla/CheckedInt.h"
11 #include "mozilla/ErrorNames.h"
12 #include "mozilla/Logging.h"
13 #include "mozilla/MoveOnlyFunction.h"
14 #include "mozilla/ipc/InputStreamParams.h"
15 #include "nsIAsyncInputStream.h"
16 #include "nsStreamUtils.h"
17 #include "nsThreadUtils.h"
22 LazyLogModule
gDataPipeLog("DataPipe");
24 namespace data_pipe_detail
{
26 // Helper for queueing up actions to be run once the mutex has been unlocked.
27 // Actions will be run in-order.
28 class MOZ_SCOPED_CAPABILITY DataPipeAutoLock
{
30 explicit DataPipeAutoLock(Mutex
& aMutex
) MOZ_CAPABILITY_ACQUIRE(aMutex
)
34 DataPipeAutoLock(const DataPipeAutoLock
&) = delete;
35 DataPipeAutoLock
& operator=(const DataPipeAutoLock
&) = delete;
38 void AddUnlockAction(F aAction
) {
39 mActions
.AppendElement(std::move(aAction
));
42 ~DataPipeAutoLock() MOZ_CAPABILITY_RELEASE() {
44 for (auto& action
: mActions
) {
51 AutoTArray
<MoveOnlyFunction
<void()>, 4> mActions
;
54 static void DoNotifyOnUnlock(DataPipeAutoLock
& aLock
,
55 already_AddRefed
<nsIRunnable
> aCallback
,
56 already_AddRefed
<nsIEventTarget
> aTarget
) {
57 nsCOMPtr
<nsIRunnable
> callback
{std::move(aCallback
)};
58 nsCOMPtr
<nsIEventTarget
> target
{std::move(aTarget
)};
60 aLock
.AddUnlockAction(
61 [callback
= std::move(callback
), target
= std::move(target
)]() mutable {
63 target
->Dispatch(callback
.forget());
65 NS_DispatchBackgroundTask(callback
.forget());
71 class DataPipeLink
: public NodeController::PortObserver
{
73 DataPipeLink(bool aReceiverSide
, std::shared_ptr
<Mutex
> aMutex
,
74 ScopedPort aPort
, SharedMemoryBasic::Handle aShmemHandle
,
75 SharedMemory
* aShmem
, uint32_t aCapacity
, nsresult aPeerStatus
,
76 uint32_t aOffset
, uint32_t aAvailable
)
77 : mMutex(std::move(aMutex
)),
78 mPort(std::move(aPort
)),
79 mShmemHandle(std::move(aShmemHandle
)),
82 mReceiverSide(aReceiverSide
),
83 mPeerStatus(aPeerStatus
),
85 mAvailable(aAvailable
) {}
87 void Init() MOZ_EXCLUDES(*mMutex
) {
89 DataPipeAutoLock
lock(*mMutex
);
90 if (NS_FAILED(mPeerStatus
)) {
93 MOZ_ASSERT(mPort
.IsValid());
94 mPort
.Controller()->SetPortObserver(mPort
.Port(), this);
96 OnPortStatusChanged();
99 void OnPortStatusChanged() final
MOZ_EXCLUDES(*mMutex
);
101 // Add a task to notify the callback after `aLock` is unlocked.
103 // This method is safe to call multiple times, as after the first time it is
104 // called, `mCallback` will be cleared.
105 void NotifyOnUnlock(DataPipeAutoLock
& aLock
) MOZ_REQUIRES(*mMutex
) {
106 DoNotifyOnUnlock(aLock
, mCallback
.forget(), mCallbackTarget
.forget());
109 void SendBytesConsumedOnUnlock(DataPipeAutoLock
& aLock
, uint32_t aBytes
)
110 MOZ_REQUIRES(*mMutex
) {
111 MOZ_LOG(gDataPipeLog
, LogLevel::Verbose
,
112 ("SendOnUnlock CONSUMED(%u) %s", aBytes
, Describe(aLock
).get()));
113 if (NS_FAILED(mPeerStatus
)) {
117 // `mPort` may be destroyed by `SetPeerError` after the DataPipe is unlocked
118 // but before we send the message. The strong controller and port references
119 // will allow us to try to send the message anyway, and it will be safely
120 // dropped if the port has already been closed. CONSUMED messages are safe
121 // to deliver out-of-order, so we don't need to worry about ordering here.
122 aLock
.AddUnlockAction([controller
= RefPtr
{mPort
.Controller()},
123 port
= mPort
.Port(), aBytes
]() mutable {
124 auto message
= MakeUnique
<IPC::Message
>(
125 MSG_ROUTING_NONE
, DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE
);
126 IPC::MessageWriter
writer(*message
);
127 WriteParam(&writer
, aBytes
);
128 controller
->SendUserMessage(port
, std::move(message
));
132 void SetPeerError(DataPipeAutoLock
& aLock
, nsresult aStatus
,
133 bool aSendClosed
= false) MOZ_REQUIRES(*mMutex
) {
134 MOZ_LOG(gDataPipeLog
, LogLevel::Debug
,
135 ("SetPeerError(%s%s) %s", GetStaticErrorName(aStatus
),
136 aSendClosed
? ", send" : "", Describe(aLock
).get()));
137 // The pipe was closed or errored. Clear the observer reference back
138 // to this type from the port layer, and ensure we notify waiters.
139 MOZ_ASSERT(NS_SUCCEEDED(mPeerStatus
));
140 mPeerStatus
= NS_SUCCEEDED(aStatus
) ? NS_BASE_STREAM_CLOSED
: aStatus
;
141 aLock
.AddUnlockAction([port
= std::move(mPort
), aStatus
, aSendClosed
] {
143 auto message
= MakeUnique
<IPC::Message
>(MSG_ROUTING_NONE
,
144 DATA_PIPE_CLOSED_MESSAGE_TYPE
);
145 IPC::MessageWriter
writer(*message
);
146 WriteParam(&writer
, aStatus
);
147 port
.Controller()->SendUserMessage(port
.Port(), std::move(message
));
149 // The `ScopedPort` being destroyed with this action will close it,
150 // clearing the observer reference from the ports layer.
152 NotifyOnUnlock(aLock
);
155 nsCString
Describe(DataPipeAutoLock
& aLock
) const MOZ_REQUIRES(*mMutex
) {
156 return nsPrintfCString(
157 "[%s(%p) c=%u e=%s o=%u a=%u, cb=%s]",
158 mReceiverSide
? "Receiver" : "Sender", this, mCapacity
,
159 GetStaticErrorName(mPeerStatus
), mOffset
, mAvailable
,
160 mCallback
? (mCallbackClosureOnly
? "clo" : "yes") : "no");
// This mutex is shared with the `DataPipeBase` which owns this
// `DataPipeLink`.
165 std::shared_ptr
<Mutex
> mMutex
;
167 ScopedPort mPort
MOZ_GUARDED_BY(*mMutex
);
168 SharedMemoryBasic::Handle mShmemHandle
MOZ_GUARDED_BY(*mMutex
);
169 const RefPtr
<SharedMemory
> mShmem
;
170 const uint32_t mCapacity
;
171 const bool mReceiverSide
;
173 bool mProcessingSegment
MOZ_GUARDED_BY(*mMutex
) = false;
175 nsresult mPeerStatus
MOZ_GUARDED_BY(*mMutex
) = NS_OK
;
176 uint32_t mOffset
MOZ_GUARDED_BY(*mMutex
) = 0;
177 uint32_t mAvailable
MOZ_GUARDED_BY(*mMutex
) = 0;
179 bool mCallbackClosureOnly
MOZ_GUARDED_BY(*mMutex
) = false;
180 nsCOMPtr
<nsIRunnable
> mCallback
MOZ_GUARDED_BY(*mMutex
);
181 nsCOMPtr
<nsIEventTarget
> mCallbackTarget
MOZ_GUARDED_BY(*mMutex
);
184 void DataPipeLink::OnPortStatusChanged() {
185 DataPipeAutoLock
lock(*mMutex
);
187 while (NS_SUCCEEDED(mPeerStatus
)) {
188 UniquePtr
<IPC::Message
> message
;
189 if (!mPort
.Controller()->GetMessage(mPort
.Port(), &message
)) {
190 SetPeerError(lock
, NS_BASE_STREAM_CLOSED
);
194 return; // no more messages
197 IPC::MessageReader
reader(*message
);
198 switch (message
->type()) {
199 case DATA_PIPE_CLOSED_MESSAGE_TYPE
: {
200 nsresult status
= NS_OK
;
201 if (!ReadParam(&reader
, &status
)) {
202 NS_WARNING("Unable to parse nsresult error from peer");
203 status
= NS_ERROR_UNEXPECTED
;
205 MOZ_LOG(gDataPipeLog
, LogLevel::Debug
,
206 ("Got CLOSED(%s) %s", GetStaticErrorName(status
),
207 Describe(lock
).get()));
208 SetPeerError(lock
, status
);
211 case DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE
: {
212 uint32_t consumed
= 0;
213 if (!ReadParam(&reader
, &consumed
)) {
214 NS_WARNING("Unable to parse bytes consumed from peer");
215 SetPeerError(lock
, NS_ERROR_UNEXPECTED
);
219 MOZ_LOG(gDataPipeLog
, LogLevel::Verbose
,
220 ("Got CONSUMED(%u) %s", consumed
, Describe(lock
).get()));
221 auto newAvailable
= CheckedUint32
{mAvailable
} + consumed
;
222 if (!newAvailable
.isValid() || newAvailable
.value() > mCapacity
) {
223 NS_WARNING("Illegal bytes consumed message received from peer");
224 SetPeerError(lock
, NS_ERROR_UNEXPECTED
);
227 mAvailable
= newAvailable
.value();
228 if (!mCallbackClosureOnly
) {
229 NotifyOnUnlock(lock
);
234 NS_WARNING("Illegal message type received from peer");
235 SetPeerError(lock
, NS_ERROR_UNEXPECTED
);
242 DataPipeBase::DataPipeBase(bool aReceiverSide
, nsresult aError
)
243 : mMutex(std::make_shared
<Mutex
>(aReceiverSide
? "DataPipeReceiver"
244 : "DataPipeSender")),
245 mStatus(NS_SUCCEEDED(aError
) ? NS_BASE_STREAM_CLOSED
: aError
) {}
247 DataPipeBase::DataPipeBase(bool aReceiverSide
, ScopedPort aPort
,
248 SharedMemoryBasic::Handle aShmemHandle
,
249 SharedMemory
* aShmem
, uint32_t aCapacity
,
250 nsresult aPeerStatus
, uint32_t aOffset
,
252 : mMutex(std::make_shared
<Mutex
>(aReceiverSide
? "DataPipeReceiver"
253 : "DataPipeSender")),
255 mLink(new DataPipeLink(aReceiverSide
, mMutex
, std::move(aPort
),
256 std::move(aShmemHandle
), aShmem
, aCapacity
,
257 aPeerStatus
, aOffset
, aAvailable
)) {
261 DataPipeBase::~DataPipeBase() {
262 DataPipeAutoLock
lock(*mMutex
);
263 CloseInternal(lock
, NS_BASE_STREAM_CLOSED
);
266 void DataPipeBase::CloseInternal(DataPipeAutoLock
& aLock
, nsresult aStatus
) {
267 if (NS_FAILED(mStatus
)) {
272 gDataPipeLog
, LogLevel::Debug
,
273 ("Closing(%s) %s", GetStaticErrorName(aStatus
), Describe(aLock
).get()));
275 // Set our status to an errored status.
276 mStatus
= NS_SUCCEEDED(aStatus
) ? NS_BASE_STREAM_CLOSED
: aStatus
;
277 RefPtr
<DataPipeLink
> link
= mLink
.forget();
278 AssertSameMutex(link
->mMutex
);
279 link
->NotifyOnUnlock(aLock
);
281 // If our peer hasn't disappeared yet, clean up our connection to it.
282 if (NS_SUCCEEDED(link
->mPeerStatus
)) {
283 link
->SetPeerError(aLock
, mStatus
, /* aSendClosed */ true);
287 nsresult
DataPipeBase::ProcessSegmentsInternal(
288 uint32_t aCount
, ProcessSegmentFun aProcessSegment
,
289 uint32_t* aProcessedCount
) {
290 *aProcessedCount
= 0;
292 while (*aProcessedCount
< aCount
) {
293 DataPipeAutoLock
lock(*mMutex
);
294 mMutex
->AssertCurrentThreadOwns();
296 MOZ_LOG(gDataPipeLog
, LogLevel::Verbose
,
297 ("ProcessSegments(%u of %u) %s", *aProcessedCount
, aCount
,
298 Describe(lock
).get()));
300 nsresult status
= CheckStatus(lock
);
301 if (NS_FAILED(status
)) {
302 if (*aProcessedCount
> 0) {
305 return status
== NS_BASE_STREAM_CLOSED
? NS_OK
: status
;
308 RefPtr
<DataPipeLink
> link
= mLink
;
309 AssertSameMutex(link
->mMutex
);
310 if (!link
->mAvailable
) {
311 MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(link
->mPeerStatus
),
312 "CheckStatus will have returned an error");
313 return *aProcessedCount
> 0 ? NS_OK
: NS_BASE_STREAM_WOULD_BLOCK
;
316 MOZ_RELEASE_ASSERT(!link
->mProcessingSegment
,
317 "Only one thread may be processing a segment at a time");
// Extract an iterator over the next contiguous region of the shared memory
// buffer which will be used.
321 char* start
= static_cast<char*>(link
->mShmem
->memory()) + link
->mOffset
;
323 char* end
= start
+ std::min({aCount
- *aProcessedCount
, link
->mAvailable
,
324 link
->mCapacity
- link
->mOffset
});
326 // Record the consumed region from our segment when exiting this scope,
327 // telling our peer how many bytes were consumed. Hold on to `mLink` to keep
328 // the shmem mapped and make sure we can clean up even if we're closed while
329 // processing the shmem region.
330 link
->mProcessingSegment
= true;
331 auto scopeExit
= MakeScopeExit([&] {
332 mMutex
->AssertCurrentThreadOwns(); // should still be held
333 AssertSameMutex(link
->mMutex
);
335 MOZ_RELEASE_ASSERT(link
->mProcessingSegment
);
336 link
->mProcessingSegment
= false;
337 uint32_t totalProcessed
= iter
- start
;
338 if (totalProcessed
> 0) {
339 link
->mOffset
+= totalProcessed
;
340 MOZ_RELEASE_ASSERT(link
->mOffset
<= link
->mCapacity
);
341 if (link
->mOffset
== link
->mCapacity
) {
344 link
->mAvailable
-= totalProcessed
;
345 link
->SendBytesConsumedOnUnlock(lock
, totalProcessed
);
347 MOZ_LOG(gDataPipeLog
, LogLevel::Verbose
,
348 ("Processed Segment(%u of %zu) %s", totalProcessed
, end
- start
,
349 Describe(lock
).get()));
353 MutexAutoUnlock
unlock(*mMutex
);
355 uint32_t processed
= 0;
356 Span segment
{iter
, end
};
357 nsresult rv
= aProcessSegment(segment
, *aProcessedCount
, &processed
);
358 if (NS_FAILED(rv
) || processed
== 0) {
362 MOZ_RELEASE_ASSERT(processed
<= segment
.Length());
364 *aProcessedCount
+= processed
;
368 MOZ_DIAGNOSTIC_ASSERT(*aProcessedCount
== aCount
,
369 "Must have processed exactly aCount");
373 void DataPipeBase::AsyncWaitInternal(already_AddRefed
<nsIRunnable
> aCallback
,
374 already_AddRefed
<nsIEventTarget
> aTarget
,
376 RefPtr
<nsIRunnable
> callback
= std::move(aCallback
);
377 RefPtr
<nsIEventTarget
> target
= std::move(aTarget
);
379 DataPipeAutoLock
lock(*mMutex
);
380 MOZ_LOG(gDataPipeLog
, LogLevel::Debug
,
381 ("AsyncWait %s %p %s", aClosureOnly
? "(closure)" : "(ready)",
382 callback
.get(), Describe(lock
).get()));
384 if (NS_FAILED(CheckStatus(lock
))) {
387 AssertSameMutex(mLink
->mMutex
);
388 MOZ_ASSERT(!mLink
->mCallback
);
391 DoNotifyOnUnlock(lock
, callback
.forget(), target
.forget());
395 AssertSameMutex(mLink
->mMutex
);
// NOTE: After this point, `mLink` may have previously had a callback which is
// now being cancelled. Make sure we clear `mCallback` even if we're going to
// call `aCallback` immediately.
400 mLink
->mCallback
= callback
.forget();
401 mLink
->mCallbackTarget
= target
.forget();
402 mLink
->mCallbackClosureOnly
= aClosureOnly
;
403 if (!aClosureOnly
&& mLink
->mAvailable
) {
404 mLink
->NotifyOnUnlock(lock
);
408 nsresult
DataPipeBase::CheckStatus(DataPipeAutoLock
& aLock
) {
409 // If our peer has closed or errored, we may need to close our local side to
410 // reflect the error code our peer provided. If we're a sender, we want to
411 // become closed immediately, whereas if we're a receiver we want to wait
412 // until our available buffer has been exhausted.
414 // NOTE: There may still be 2-stage writes/reads ongoing at this point, which
415 // will continue due to `mLink` being kept alive by the
416 // `ProcessSegmentsInternal` function.
417 if (NS_FAILED(mStatus
)) {
420 AssertSameMutex(mLink
->mMutex
);
421 if (NS_FAILED(mLink
->mPeerStatus
) &&
422 (!mLink
->mReceiverSide
|| !mLink
->mAvailable
)) {
423 CloseInternal(aLock
, mLink
->mPeerStatus
);
428 nsCString
DataPipeBase::Describe(DataPipeAutoLock
& aLock
) {
430 AssertSameMutex(mLink
->mMutex
);
431 return mLink
->Describe(aLock
);
433 return nsPrintfCString("[status=%s]", GetStaticErrorName(mStatus
));
436 template <typename T
>
437 void DataPipeWrite(IPC::MessageWriter
* aWriter
, T
* aParam
) {
438 DataPipeAutoLock
lock(*aParam
->mMutex
);
439 MOZ_LOG(gDataPipeLog
, LogLevel::Debug
,
440 ("IPC Write: %s", aParam
->Describe(lock
).get()));
442 WriteParam(aWriter
, aParam
->mStatus
);
443 if (NS_FAILED(aParam
->mStatus
)) {
447 aParam
->AssertSameMutex(aParam
->mLink
->mMutex
);
448 MOZ_RELEASE_ASSERT(!aParam
->mLink
->mProcessingSegment
,
449 "cannot transfer while processing a segment");
451 // Serialize relevant parameters to our peer.
452 WriteParam(aWriter
, std::move(aParam
->mLink
->mPort
));
453 WriteParam(aWriter
, std::move(aParam
->mLink
->mShmemHandle
));
454 WriteParam(aWriter
, aParam
->mLink
->mCapacity
);
455 WriteParam(aWriter
, aParam
->mLink
->mPeerStatus
);
456 WriteParam(aWriter
, aParam
->mLink
->mOffset
);
457 WriteParam(aWriter
, aParam
->mLink
->mAvailable
);
459 // Mark our peer as closed so we don't try to send to it when closing.
460 aParam
->mLink
->mPeerStatus
= NS_ERROR_NOT_INITIALIZED
;
461 aParam
->CloseInternal(lock
, NS_ERROR_NOT_INITIALIZED
);
464 template <typename T
>
465 bool DataPipeRead(IPC::MessageReader
* aReader
, RefPtr
<T
>* aResult
) {
467 if (!ReadParam(aReader
, &rv
)) {
468 aReader
->FatalError("failed to read DataPipe status");
472 *aResult
= new T(rv
);
473 MOZ_LOG(gDataPipeLog
, LogLevel::Debug
,
474 ("IPC Read: [status=%s]", GetStaticErrorName(rv
)));
479 if (!ReadParam(aReader
, &port
)) {
480 aReader
->FatalError("failed to read DataPipe port");
483 SharedMemoryBasic::Handle shmemHandle
;
484 if (!ReadParam(aReader
, &shmemHandle
)) {
485 aReader
->FatalError("failed to read DataPipe shmem");
488 // Due to the awkward shared memory API provided by SharedMemoryBasic, we need
489 // to transfer ownership into the `shmem` here, then steal it back later in
490 // the function. Bug 1797039 tracks potential changes to the RawShmem API
491 // which could improve this situation.
492 RefPtr shmem
= new SharedMemoryBasic();
493 if (!shmem
->SetHandle(std::move(shmemHandle
),
494 SharedMemory::RightsReadWrite
)) {
495 aReader
->FatalError("failed to create DataPipe shmem from handle");
498 uint32_t capacity
= 0;
499 nsresult peerStatus
= NS_OK
;
501 uint32_t available
= 0;
502 if (!ReadParam(aReader
, &capacity
) || !ReadParam(aReader
, &peerStatus
) ||
503 !ReadParam(aReader
, &offset
) || !ReadParam(aReader
, &available
)) {
504 aReader
->FatalError("failed to read DataPipe fields");
507 if (!capacity
|| offset
>= capacity
|| available
> capacity
) {
508 aReader
->FatalError("received DataPipe state values are inconsistent");
511 if (!shmem
->Map(SharedMemory::PageAlignedSize(capacity
))) {
512 aReader
->FatalError("failed to map DataPipe shared memory region");
516 *aResult
= new T(std::move(port
), shmem
->TakeHandle(), shmem
, capacity
,
517 peerStatus
, offset
, available
);
518 if (MOZ_LOG_TEST(gDataPipeLog
, LogLevel::Debug
)) {
519 DataPipeAutoLock
lock(*(*aResult
)->mMutex
);
520 MOZ_LOG(gDataPipeLog
, LogLevel::Debug
,
521 ("IPC Read: %s", (*aResult
)->Describe(lock
).get()));
526 } // namespace data_pipe_detail
528 //-----------------------------------------------------------------------------
530 //-----------------------------------------------------------------------------
532 NS_IMPL_ISUPPORTS(DataPipeSender
, nsIOutputStream
, nsIAsyncOutputStream
,
537 NS_IMETHODIMP
DataPipeSender::Close() {
538 return CloseWithStatus(NS_BASE_STREAM_CLOSED
);
541 NS_IMETHODIMP
DataPipeSender::Flush() { return NS_OK
; }
543 NS_IMETHODIMP
DataPipeSender::StreamStatus() {
544 data_pipe_detail::DataPipeAutoLock
lock(*mMutex
);
545 return CheckStatus(lock
);
548 NS_IMETHODIMP
DataPipeSender::Write(const char* aBuf
, uint32_t aCount
,
549 uint32_t* aWriteCount
) {
550 return WriteSegments(NS_CopyBufferToSegment
, (void*)aBuf
, aCount
,
554 NS_IMETHODIMP
DataPipeSender::WriteFrom(nsIInputStream
* aFromStream
,
556 uint32_t* aWriteCount
) {
557 return WriteSegments(NS_CopyStreamToSegment
, aFromStream
, aCount
,
561 NS_IMETHODIMP
DataPipeSender::WriteSegments(nsReadSegmentFun aReader
,
562 void* aClosure
, uint32_t aCount
,
563 uint32_t* aWriteCount
) {
564 auto processSegment
= [&](Span
<char> aSpan
, uint32_t aToOffset
,
565 uint32_t* aReadCount
) -> nsresult
{
566 return aReader(this, aClosure
, aSpan
.data(), aToOffset
, aSpan
.Length(),
569 return ProcessSegmentsInternal(aCount
, processSegment
, aWriteCount
);
572 NS_IMETHODIMP
DataPipeSender::IsNonBlocking(bool* _retval
) {
577 // nsIAsyncOutputStream
579 NS_IMETHODIMP
DataPipeSender::CloseWithStatus(nsresult reason
) {
580 data_pipe_detail::DataPipeAutoLock
lock(*mMutex
);
581 CloseInternal(lock
, reason
);
585 NS_IMETHODIMP
DataPipeSender::AsyncWait(nsIOutputStreamCallback
* aCallback
,
587 uint32_t aRequestedCount
,
588 nsIEventTarget
* aTarget
) {
590 aCallback
? NS_NewCancelableRunnableFunction(
591 "DataPipeReceiver::AsyncWait",
592 [self
= RefPtr
{this}, callback
= RefPtr
{aCallback
}] {
593 MOZ_LOG(gDataPipeLog
, LogLevel::Debug
,
594 ("Calling OnOutputStreamReady(%p, %p)",
595 callback
.get(), self
.get()));
596 callback
->OnOutputStreamReady(self
);
599 do_AddRef(aTarget
), aFlags
& WAIT_CLOSURE_ONLY
);
603 //-----------------------------------------------------------------------------
605 //-----------------------------------------------------------------------------
607 NS_IMPL_ISUPPORTS(DataPipeReceiver
, nsIInputStream
, nsIAsyncInputStream
,
608 nsIIPCSerializableInputStream
, DataPipeReceiver
)
612 NS_IMETHODIMP
DataPipeReceiver::Close() {
613 return CloseWithStatus(NS_BASE_STREAM_CLOSED
);
616 NS_IMETHODIMP
DataPipeReceiver::Available(uint64_t* _retval
) {
617 data_pipe_detail::DataPipeAutoLock
lock(*mMutex
);
618 nsresult rv
= CheckStatus(lock
);
622 AssertSameMutex(mLink
->mMutex
);
623 *_retval
= mLink
->mAvailable
;
627 NS_IMETHODIMP
DataPipeReceiver::StreamStatus() {
628 data_pipe_detail::DataPipeAutoLock
lock(*mMutex
);
629 return CheckStatus(lock
);
632 NS_IMETHODIMP
DataPipeReceiver::Read(char* aBuf
, uint32_t aCount
,
633 uint32_t* aReadCount
) {
634 return ReadSegments(NS_CopySegmentToBuffer
, aBuf
, aCount
, aReadCount
);
637 NS_IMETHODIMP
DataPipeReceiver::ReadSegments(nsWriteSegmentFun aWriter
,
638 void* aClosure
, uint32_t aCount
,
639 uint32_t* aReadCount
) {
640 auto processSegment
= [&](Span
<char> aSpan
, uint32_t aToOffset
,
641 uint32_t* aWriteCount
) -> nsresult
{
642 return aWriter(this, aClosure
, aSpan
.data(), aToOffset
, aSpan
.Length(),
645 return ProcessSegmentsInternal(aCount
, processSegment
, aReadCount
);
648 NS_IMETHODIMP
DataPipeReceiver::IsNonBlocking(bool* _retval
) {
653 // nsIAsyncInputStream
655 NS_IMETHODIMP
DataPipeReceiver::CloseWithStatus(nsresult aStatus
) {
656 data_pipe_detail::DataPipeAutoLock
lock(*mMutex
);
657 CloseInternal(lock
, aStatus
);
661 NS_IMETHODIMP
DataPipeReceiver::AsyncWait(nsIInputStreamCallback
* aCallback
,
663 uint32_t aRequestedCount
,
664 nsIEventTarget
* aTarget
) {
666 aCallback
? NS_NewCancelableRunnableFunction(
667 "DataPipeReceiver::AsyncWait",
668 [self
= RefPtr
{this}, callback
= RefPtr
{aCallback
}] {
669 MOZ_LOG(gDataPipeLog
, LogLevel::Debug
,
670 ("Calling OnInputStreamReady(%p, %p)",
671 callback
.get(), self
.get()));
672 callback
->OnInputStreamReady(self
);
675 do_AddRef(aTarget
), aFlags
& WAIT_CLOSURE_ONLY
);
679 // nsIIPCSerializableInputStream
681 void DataPipeReceiver::SerializedComplexity(uint32_t aMaxSize
,
684 uint32_t* aTransferables
) {
// We report DataPipeReceiver as taking one transferable to serialize, rather
// than one pipe, as we aren't starting a new pipe for this purpose, and are
// instead transferring an existing pipe.
691 void DataPipeReceiver::Serialize(InputStreamParams
& aParams
, uint32_t aMaxSize
,
692 uint32_t* aSizeUsed
) {
694 aParams
= DataPipeReceiverStreamParams(WrapNotNull(this));
697 bool DataPipeReceiver::Deserialize(const InputStreamParams
& aParams
) {
698 MOZ_CRASH("Handled directly in `DeserializeInputStream`");
701 //-----------------------------------------------------------------------------
703 //-----------------------------------------------------------------------------
705 nsresult
NewDataPipe(uint32_t aCapacity
, DataPipeSender
** aSender
,
706 DataPipeReceiver
** aReceiver
) {
708 aCapacity
= kDefaultDataPipeCapacity
;
711 RefPtr
<NodeController
> controller
= NodeController::GetSingleton();
713 return NS_ERROR_ILLEGAL_DURING_SHUTDOWN
;
716 // Allocate a pair of ports for messaging between the sender & receiver.
717 auto [senderPort
, receiverPort
] = controller
->CreatePortPair();
719 // Create and allocate the shared memory region.
720 auto shmem
= MakeRefPtr
<SharedMemoryBasic
>();
721 size_t alignedCapacity
= SharedMemory::PageAlignedSize(aCapacity
);
722 if (!shmem
->Create(alignedCapacity
) || !shmem
->Map(alignedCapacity
)) {
723 return NS_ERROR_OUT_OF_MEMORY
;
726 // We'll first clone then take the handle from the region so that the sender &
727 // receiver each have a handle. This avoids the need to duplicate the handle
728 // when serializing, when errors are non-recoverable.
729 SharedMemoryBasic::Handle senderShmemHandle
= shmem
->CloneHandle();
730 SharedMemoryBasic::Handle receiverShmemHandle
= shmem
->TakeHandle();
731 if (!senderShmemHandle
|| !receiverShmemHandle
) {
732 return NS_ERROR_OUT_OF_MEMORY
;
736 new DataPipeSender(std::move(senderPort
), std::move(senderShmemHandle
),
737 shmem
, aCapacity
, NS_OK
, 0, aCapacity
);
738 RefPtr receiver
= new DataPipeReceiver(std::move(receiverPort
),
739 std::move(receiverShmemHandle
), shmem
,
740 aCapacity
, NS_OK
, 0, 0);
741 sender
.forget(aSender
);
742 receiver
.forget(aReceiver
);
747 } // namespace mozilla
749 void IPC::ParamTraits
<mozilla::ipc::DataPipeSender
*>::Write(
750 MessageWriter
* aWriter
, mozilla::ipc::DataPipeSender
* aParam
) {
751 mozilla::ipc::data_pipe_detail::DataPipeWrite(aWriter
, aParam
);
754 bool IPC::ParamTraits
<mozilla::ipc::DataPipeSender
*>::Read(
755 MessageReader
* aReader
, RefPtr
<mozilla::ipc::DataPipeSender
>* aResult
) {
756 return mozilla::ipc::data_pipe_detail::DataPipeRead(aReader
, aResult
);
759 void IPC::ParamTraits
<mozilla::ipc::DataPipeReceiver
*>::Write(
760 MessageWriter
* aWriter
, mozilla::ipc::DataPipeReceiver
* aParam
) {
761 mozilla::ipc::data_pipe_detail::DataPipeWrite(aWriter
, aParam
);
764 bool IPC::ParamTraits
<mozilla::ipc::DataPipeReceiver
*>::Read(
765 MessageReader
* aReader
, RefPtr
<mozilla::ipc::DataPipeReceiver
>* aResult
) {
766 return mozilla::ipc::data_pipe_detail::DataPipeRead(aReader
, aResult
);