1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "CanvasDrawEventRecorder.h"
11 #include "mozilla/layers/SharedSurfacesChild.h"
12 #include "nsThreadUtils.h"
// Tuning constants for CanvasEventRingBuffer.
//
// Initial value of the spin counters used by WaitForDataToRead and
// WaitForReadCount before they fall back to waiting on a semaphore.
17 static const uint32_t kMaxSpinCount
= 200;
// Timeout passed to the timed semaphore waits, and the retry count that
// WaitForBytesToRead hands to WaitForDataToRead.
19 static const TimeDuration kTimeout
= TimeDuration::FromMilliseconds(100);
20 static const int32_t kTimeoutRetryCount
= 50;
// Size reserved after the stream area for each of the two footers, and the two
// stream sizes that StreamSize() selects between.
22 static const uint32_t kCacheLineSize
= 64;
23 static const uint32_t kSmallStreamSize
= 64 * 1024;
24 static const uint32_t kLargeStreamSize
= 2048 * 1024;
// Each stream size must divide 2^32 exactly.
// NOTE(review): presumably this keeps `count % StreamSize()` continuous when
// the uint32_t counts wrap around - confirm.
26 static_assert((static_cast<uint64_t>(UINT32_MAX
) + 1) % kSmallStreamSize
== 0,
27 "kSmallStreamSize must be a power of two.");
28 static_assert((static_cast<uint64_t>(UINT32_MAX
) + 1) % kLargeStreamSize
== 0,
29 "kLargeStreamSize must be a power of two.");
// Returns the size in bytes of the stream area for the current mode:
// kLargeStreamSize when mLargeStream is set, kSmallStreamSize otherwise.
31 uint32_t CanvasEventRingBuffer::StreamSize() {
32 return mLargeStream
? kLargeStreamSize
: kSmallStreamSize
;
// Creates and maps a new shared-memory segment and lays the ring buffer out in
// it: StreamSize() bytes of stream data followed by two kCacheLineSize slots
// that hold the ReadFooter and the WriteFooter.
//
// aOtherPid   - not referenced by any of the lines visible here.
// aReadHandle - out-parameter; receives the handle taken from the new segment.
//
// NOTE(review): the bodies of the failure branches and the return statements
// are not among the lines visible here.
35 bool CanvasEventRingBuffer::InitBuffer(
36 base::ProcessId aOtherPid
, ipc::SharedMemoryBasic::Handle
* aReadHandle
) {
// Stream area plus one cache-line-sized slot for each of the two footers.
37 size_t shmemSize
= StreamSize() + (2 * kCacheLineSize
);
38 mSharedMemory
= MakeAndAddRef
<ipc::SharedMemoryBasic
>();
39 if (NS_WARN_IF(!mSharedMemory
->Create(shmemSize
)) ||
40 NS_WARN_IF(!mSharedMemory
->Map(shmemSize
))) {
// Pass the segment's handle out to the caller.
45 *aReadHandle
= mSharedMemory
->TakeHandle();
46 if (NS_WARN_IF(!*aReadHandle
)) {
51 mBuf
= static_cast<char*>(mSharedMemory
->memory());
// A fresh buffer starts with the whole stream area available.
53 mAvailable
= StreamSize();
55 static_assert(sizeof(ReadFooter
) <= kCacheLineSize
,
56 "ReadFooter must fit in kCacheLineSize.");
// The read footer sits immediately after the stream area.
57 mRead
= reinterpret_cast<ReadFooter
*>(mBuf
+ StreamSize());
59 mRead
->returnCount
= 0;
60 mRead
->state
= State::Processing
;
62 static_assert(sizeof(WriteFooter
) <= kCacheLineSize
,
63 "WriteFooter must fit in kCacheLineSize.");
// The write footer sits one kCacheLineSize after the read footer.
64 mWrite
= reinterpret_cast<WriteFooter
*>(mBuf
+ StreamSize() + kCacheLineSize
);
66 mWrite
->returnCount
= 0;
67 mWrite
->requiredDifference
= 0;
68 mWrite
->state
= State::Processing
;
// Sets up the writing side of the ring buffer.
//
// Allocates the buffer through InitBuffer, creates the reader and writer
// semaphores, clones a handle for each into aReaderSem / aWriterSem, closes
// the local handle of each semaphore, and takes ownership of aWriterServices.
//
// aOtherPid, aReadHandle - forwarded to InitBuffer.
// aReaderSem, aWriterSem - out-parameters receiving the cloned handles; each
//                          is checked with IsHandleValid.
// aWriterServices        - moved into mWriterServices.
//
// NOTE(review): the failure-branch bodies and the return statements are not
// among the lines visible here.
74 bool CanvasEventRingBuffer::InitWriter(
75 base::ProcessId aOtherPid
, ipc::SharedMemoryBasic::Handle
* aReadHandle
,
76 CrossProcessSemaphoreHandle
* aReaderSem
,
77 CrossProcessSemaphoreHandle
* aWriterSem
,
78 UniquePtr
<WriterServices
> aWriterServices
) {
79 if (!InitBuffer(aOtherPid
, aReadHandle
)) {
// Semaphore used to wake the reader (signalled in CheckAndSignalReader).
83 mReaderSemaphore
.reset(
84 CrossProcessSemaphore::Create("SharedMemoryStreamParent", 0));
85 *aReaderSem
= mReaderSemaphore
->CloneHandle();
86 mReaderSemaphore
->CloseHandle();
87 if (!IsHandleValid(*aReaderSem
)) {
// Semaphore used to wake the writer (signalled in CheckAndSignalWriter).
90 mWriterSemaphore
.reset(
91 CrossProcessSemaphore::Create("SharedMemoryStreamChild", 0));
92 *aWriterSem
= mWriterSemaphore
->CloneHandle();
93 mWriterSemaphore
->CloseHandle();
94 if (!IsHandleValid(*aWriterSem
)) {
98 mWriterServices
= std::move(aWriterServices
);
// Sets up the reading side of the ring buffer.
//
// Maps the buffer identified by aReadHandle through SetNewBuffer, builds the
// reader and writer semaphores from the supplied handles (closing the local
// handle of each afterwards), and takes ownership of aReaderServices.
//
// NOTE(review): the failure-branch body and the return statements are not
// among the lines visible here.
104 bool CanvasEventRingBuffer::InitReader(
105 ipc::SharedMemoryBasic::Handle aReadHandle
,
106 CrossProcessSemaphoreHandle aReaderSem
,
107 CrossProcessSemaphoreHandle aWriterSem
,
108 UniquePtr
<ReaderServices
> aReaderServices
) {
109 if (!SetNewBuffer(std::move(aReadHandle
))) {
113 mReaderSemaphore
.reset(CrossProcessSemaphore::Create(std::move(aReaderSem
)));
114 mReaderSemaphore
->CloseHandle();
115 mWriterSemaphore
.reset(CrossProcessSemaphore::Create(std::move(aWriterSem
)));
116 mWriterSemaphore
->CloseHandle();
118 mReaderServices
= std::move(aReaderServices
);
// Attaches this object to the shared-memory segment identified by aReadHandle.
//
// The segment is opened with RightsReadWrite and mapped at
// StreamSize() + 2 * kCacheLineSize bytes; the handle is then closed and
// mBuf, mRead and mWrite are pointed at the stream area and the two footers,
// using the same layout InitBuffer writes.
//
// NOTE(review): the start of the statement that carries the message string
// below, the failure-branch body and the return statements are not among the
// lines visible here.
124 bool CanvasEventRingBuffer::SetNewBuffer(
125 ipc::SharedMemoryBasic::Handle aReadHandle
) {
128 "Shared memory should have been dropped before new buffer is sent.");
// Must match the size computed in InitBuffer.
130 size_t shmemSize
= StreamSize() + (2 * kCacheLineSize
);
131 mSharedMemory
= MakeAndAddRef
<ipc::SharedMemoryBasic
>();
132 if (NS_WARN_IF(!mSharedMemory
->SetHandle(
133 std::move(aReadHandle
), ipc::SharedMemory::RightsReadWrite
)) ||
134 NS_WARN_IF(!mSharedMemory
->Map(shmemSize
))) {
139 mSharedMemory
->CloseHandle();
// Stream area first, then the read footer, then the write footer.
141 mBuf
= static_cast<char*>(mSharedMemory
->memory());
142 mRead
= reinterpret_cast<ReadFooter
*>(mBuf
+ StreamSize());
143 mWrite
= reinterpret_cast<WriteFooter
*>(mBuf
+ StreamSize() + kCacheLineSize
);
// Writer side: recomputes mAvailable and mBufPos for the next write.
//
// mAvailable becomes the smaller of the contiguous space left before the end
// of the stream area and the value returned by WaitForBytesToWrite().
//
// NOTE(review): the return statements (and any failure check between the
// lines shown) are not among the lines visible here.
148 bool CanvasEventRingBuffer::WaitForAndRecalculateAvailableSpace() {
// Offset of our current count inside the stream area.
153 uint32_t bufPos
= mOurCount
% StreamSize();
// Never hand out a span that would run past the end of the stream area.
154 uint32_t maxToWrite
= StreamSize() - bufPos
;
155 mAvailable
= std::min(maxToWrite
, WaitForBytesToWrite());
161 mBufPos
= mBuf
+ bufPos
;
// Copies aSize bytes from aData into the ring buffer.
//
// When the request is larger than mAvailable the available space is
// recalculated. If the data then fits it is copied in one go; otherwise it is
// copied in mAvailable-sized pieces, each published with
// IncrementWriteCountBy before waiting for more space, until the remainder
// fits and is copied by the final memcpy.
//
// NOTE(review): the bodies of the failure branches, the `do {` opener and the
// returns are not among the lines visible here.
165 void CanvasEventRingBuffer::write(const char* const aData
, const size_t aSize
) {
// Despite its name, curDestPtr walks the *source* data (aData); the
// destination of every memcpy below is mBufPos.
166 const char* curDestPtr
= aData
;
167 size_t remainingToWrite
= aSize
;
168 if (remainingToWrite
> mAvailable
) {
169 if (!WaitForAndRecalculateAvailableSpace()) {
// Whole request fits in the contiguous space now available.
174 if (remainingToWrite
<= mAvailable
) {
175 memcpy(mBufPos
, curDestPtr
, remainingToWrite
);
176 UpdateWriteTotalsBy(remainingToWrite
);
// Fill the space that is available, publish it, then wait for more.
181 memcpy(mBufPos
, curDestPtr
, mAvailable
);
182 IncrementWriteCountBy(mAvailable
);
183 curDestPtr
+= mAvailable
;
184 remainingToWrite
-= mAvailable
;
185 if (!WaitForAndRecalculateAvailableSpace()) {
188 } while (remainingToWrite
> mAvailable
);
// The tail now fits.
190 memcpy(mBufPos
, curDestPtr
, remainingToWrite
);
191 UpdateWriteTotalsBy(remainingToWrite
);
// Publishes the writer's count to the shared write footer and, if the reader
// is not in the Processing state, calls CheckAndSignalReader().
//
// NOTE(review): the statement that applies aCount to mOurCount is not among
// the lines visible here; presumably it precedes the store below - confirm.
194 void CanvasEventRingBuffer::IncrementWriteCountBy(uint32_t aCount
) {
196 mWrite
->count
= mOurCount
;
197 if (mRead
->state
!= State::Processing
) {
198 CheckAndSignalReader();
// Accounts for aCount bytes just written: publishes them through
// IncrementWriteCountBy and reduces mAvailable by the same amount.
//
// NOTE(review): one original line between the two statements is not visible
// here.
202 void CanvasEventRingBuffer::UpdateWriteTotalsBy(uint32_t aCount
) {
203 IncrementWriteCountBy(aCount
);
205 mAvailable
-= aCount
;
// Reader side: recomputes mAvailable and mBufPos for the next read.
//
// mAvailable becomes the smaller of the contiguous bytes left before the end
// of the stream area and the value returned by WaitForBytesToRead().
//
// NOTE(review): the return statements (and any failure check between the
// lines shown) are not among the lines visible here.
208 bool CanvasEventRingBuffer::WaitForAndRecalculateAvailableData() {
// Offset of our current count inside the stream area.
213 uint32_t bufPos
= mOurCount
% StreamSize();
// Never read a span that would run past the end of the stream area.
214 uint32_t maxToRead
= StreamSize() - bufPos
;
215 mAvailable
= std::min(maxToRead
, WaitForBytesToRead());
222 mBufPos
= mBuf
+ bufPos
;
// Copies aSize bytes from the ring buffer into aOut.
//
// When the request is larger than mAvailable the available data is
// recalculated. If the request can then be satisfied it is copied in one go;
// otherwise it is copied in mAvailable-sized pieces, each acknowledged with
// IncrementReadCountBy before waiting for more data, until the remainder is
// available and is copied by the final memcpy.
//
// NOTE(review): the bodies of the failure branches, the `do {` opener and the
// returns are not among the lines visible here.
226 void CanvasEventRingBuffer::read(char* const aOut
, const size_t aSize
) {
// Despite its name, curSrcPtr walks the *destination* buffer (aOut); the
// source of every memcpy below is mBufPos.
227 char* curSrcPtr
= aOut
;
228 size_t remainingToRead
= aSize
;
229 if (remainingToRead
> mAvailable
) {
230 if (!WaitForAndRecalculateAvailableData()) {
// Whole request can be served from the contiguous data now available.
235 if (remainingToRead
<= mAvailable
) {
236 memcpy(curSrcPtr
, mBufPos
, remainingToRead
);
237 UpdateReadTotalsBy(remainingToRead
);
// Drain what is available, acknowledge it, then wait for more.
242 memcpy(curSrcPtr
, mBufPos
, mAvailable
);
243 IncrementReadCountBy(mAvailable
);
244 curSrcPtr
+= mAvailable
;
245 remainingToRead
-= mAvailable
;
246 if (!WaitForAndRecalculateAvailableData()) {
249 } while (remainingToRead
> mAvailable
);
// The tail is now available.
251 memcpy(curSrcPtr
, mBufPos
, remainingToRead
);
252 UpdateReadTotalsBy(remainingToRead
);
// Publishes the reader's count to the shared read footer and, if the writer
// is not in the Processing state, calls CheckAndSignalWriter().
//
// NOTE(review): the statement that applies aCount to mOurCount is not among
// the lines visible here; presumably it precedes the store below - confirm.
255 void CanvasEventRingBuffer::IncrementReadCountBy(uint32_t aCount
) {
257 mRead
->count
= mOurCount
;
258 if (mWrite
->state
!= State::Processing
) {
259 CheckAndSignalWriter();
// Accounts for aCount bytes just read: acknowledges them through
// IncrementReadCountBy and reduces mAvailable by the same amount.
//
// NOTE(review): one original line between the two statements is not visible
// here.
263 void CanvasEventRingBuffer::UpdateReadTotalsBy(uint32_t aCount
) {
264 IncrementReadCountBy(aCount
);
266 mAvailable
-= aCount
;
// Writer side: inspects the reader's shared state and wakes or resumes the
// reader when its count differs from ours.
//
// Visible behaviour, by state:
//  - AboutToWait: the reader is still deciding; ReaderClosed() is consulted
//    so a closed reader cannot hang us.
//  - when the counts differ and the state can be swapped from Waiting to
//    Processing, the reader semaphore is signalled; if the swap fails the
//    state is asserted to be Stopped.
//  - when the counts differ in the following branch, the state is set to
//    Processing and ResumeReader() is called.
//  - any other state is asserted unreachable.
//
// NOTE(review): the enclosing loop, the `case` labels for the last two
// branches and the break/return statements are not among the lines visible
// here, so the exact control flow between branches should be confirmed.
269 void CanvasEventRingBuffer::CheckAndSignalReader() {
271 switch (mRead
->state
) {
272 case State::Processing
:
275 case State::AboutToWait
:
276 // The reader is making a decision about whether to wait. So, we must
277 // wait until it has decided to avoid races. Check if the reader is
278 // closed to avoid hangs.
279 if (mWriterServices
->ReaderClosed()) {
284 if (mRead
->count
!= mOurCount
) {
285 // We have to use compareExchange here because the reader can change
286 // from Waiting to Stopped.
287 if (mRead
->state
.compareExchange(State::Waiting
, State::Processing
)) {
288 mReaderSemaphore
->Signal();
292 MOZ_ASSERT(mRead
->state
== State::Stopped
);
297 if (mRead
->count
!= mOurCount
) {
298 mRead
->state
= State::Processing
;
299 mWriterServices
->ResumeReader();
303 MOZ_ASSERT_UNREACHABLE("Invalid waiting state.");
// Reader side: returns true when the count published by the writer differs
// from our own count, i.e. there are bytes we have not consumed yet.
309 bool CanvasEventRingBuffer::HasDataToRead() {
310 return (mWrite
->count
!= mOurCount
);
// Reader side: moves the reader to the Stopped state if no data is pending.
//
// The state is first set to AboutToWait; if data turns out to be available it
// is put back to Processing, otherwise it becomes Stopped.
//
// NOTE(review): the return statements are not among the lines visible here.
313 bool CanvasEventRingBuffer::StopIfEmpty() {
314 // Double-check that the writer isn't waiting.
315 CheckAndSignalWriter();
// Announce that we are deciding, then re-check for data before committing.
316 mRead
->state
= State::AboutToWait
;
317 if (HasDataToRead()) {
318 mRead
->state
= State::Processing
;
322 mRead
->state
= State::Stopped
;
// Reader side: waits until the writer has published data.
//
// Steps visible here:
//  1. Spin, checking HasDataToRead(), with the counter starting at
//     kMaxSpinCount.
//  2. Make sure the writer is not itself waiting, switch to AboutToWait,
//     re-check for data, then switch to Waiting.
//  3. Wait on the reader semaphore with aTimeout, checking WriterClosed() and
//     looping while aRetryCount-- > 0.
//  4. Try to swap the state from Waiting to Stopped; if that fails the writer
//     has signalled in the meantime, so the pending signal is consumed.
//
// aTimeout    - timeout for each semaphore wait.
// aRetryCount - bounds the wait loop through `aRetryCount-- > 0`.
//
// NOTE(review): the `do {` openers and the return statements are not among
// the lines visible here.
326 bool CanvasEventRingBuffer::WaitForDataToRead(TimeDuration aTimeout
,
327 int32_t aRetryCount
) {
328 uint32_t spinCount
= kMaxSpinCount
;
330 if (HasDataToRead()) {
333 } while (--spinCount
!= 0);
335 // Double-check that the writer isn't waiting.
336 CheckAndSignalWriter();
// Announce that we are deciding, then re-check for data before committing.
337 mRead
->state
= State::AboutToWait
;
338 if (HasDataToRead()) {
339 mRead
->state
= State::Processing
;
343 mRead
->state
= State::Waiting
;
// A successful wait means the writer signalled us, so data must be present.
345 if (mReaderSemaphore
->Wait(Some(aTimeout
))) {
346 MOZ_RELEASE_ASSERT(HasDataToRead());
350 if (mReaderServices
->WriterClosed()) {
351 // Something has gone wrong on the writing side, just return false so
352 // that we can hopefully recover.
355 } while (aRetryCount
-- > 0);
357 // We have to use compareExchange here because the writer can change our
358 // state if we are waiting.
359 if (!mRead
->state
.compareExchange(State::Waiting
, State::Stopped
)) {
360 MOZ_RELEASE_ASSERT(HasDataToRead());
361 MOZ_RELEASE_ASSERT(mRead
->state
== State::Processing
);
362 // The writer has just signaled us, so consume it before returning
363 MOZ_ALWAYS_TRUE(mReaderSemaphore
->Wait());
// Reader side: reads the next event type from the stream.
//
// Checkpoint events (kCheckpointEventType) are skipped for as long as the
// stream is good(). On a kDropBufferEventType the current shared memory is
// released and mLargeStream is toggled.
//
// NOTE(review): the declaration of nextEvent, the remaining statements of the
// drop-buffer branch and the return statement are not among the lines visible
// here.
370 uint8_t CanvasEventRingBuffer::ReadNextEvent() {
372 ReadElement(*this, nextEvent
);
373 while (nextEvent
== kCheckpointEventType
&& good()) {
374 ReadElement(*this, nextEvent
);
377 if (nextEvent
== kDropBufferEventType
) {
378 // Writer is switching to a different sized buffer.
384 mSharedMemory
= nullptr;
385 // We always toggle between smaller and larger stream sizes.
386 mLargeStream
= !mLargeStream
;
// Writer side: writes a kCheckpointEventType element into the stream.
//
// NOTE(review): the return statement is not among the lines visible here;
// presumably the returned value is what callers pass to WaitForCheckpoint -
// confirm.
392 uint32_t CanvasEventRingBuffer::CreateCheckpoint() {
393 WriteElement(*this, kCheckpointEventType
);
// Writer side: waits for the reader to reach aCheckpoint by delegating to
// WaitForReadCount with the default kTimeout, and returns its result.
397 bool CanvasEventRingBuffer::WaitForCheckpoint(uint32_t aCheckpoint
) {
398 return WaitForReadCount(aCheckpoint
, kTimeout
);
// Writer side: replaces the current buffer with one of the other size.
//
// Writes a kDropBufferEventType element, waits until the reader has consumed
// everything up to mOurCount, releases the current shared memory, toggles
// mLargeStream and allocates the new buffer through InitBuffer.
//
// aOtherPid, aReadHandle - forwarded to InitBuffer.
// Returns the result of InitBuffer on the path visible here.
//
// NOTE(review): the body of the failure branch and several lines before the
// shared memory is released are not among the lines visible here.
401 bool CanvasEventRingBuffer::SwitchBuffer(
402 base::ProcessId aOtherPid
, ipc::SharedMemoryBasic::Handle
* aReadHandle
) {
403 WriteElement(*this, kDropBufferEventType
);
405 // Make sure the drop buffer event has been read before continuing. We can't
406 // write an actual checkpoint because there will be no buffer to read from.
407 if (!WaitForCheckpoint(mOurCount
)) {
416 mSharedMemory
= nullptr;
417 // We always toggle between smaller and larger stream sizes.
418 mLargeStream
= !mLargeStream
;
419 return InitBuffer(aOtherPid
, aReadHandle
);
// Reader side: inspects the writer's shared state and wakes the writer once
// the reader has caught up far enough.
//
// Visible behaviour, by state:
//  - AboutToWait: the writer is still deciding; WriterClosed() is consulted
//    so a closed writer cannot hang us.
//  - when the distance between the writer's count and ours is no more than
//    the writer's requiredDifference, the writer's state is set to Processing
//    and the writer semaphore is signalled.
//  - any other state is asserted unreachable.
//
// NOTE(review): the enclosing loop, the `case` label of the signalling branch
// and the break/return statements are not among the lines visible here.
422 void CanvasEventRingBuffer::CheckAndSignalWriter() {
424 switch (mWrite
->state
) {
425 case State::Processing
:
427 case State::AboutToWait
:
428 // The writer is making a decision about whether to wait. So, we must
429 // wait until it has decided to avoid races. Check if the writer is
430 // closed to avoid hangs.
431 if (mReaderServices
->WriterClosed()) {
// Unsigned subtraction: number of bytes written but not yet read by us.
436 if (mWrite
->count
- mOurCount
<= mWrite
->requiredDifference
) {
437 mWrite
->state
= State::Processing
;
438 mWriterSemaphore
->Signal();
442 MOZ_ASSERT_UNREACHABLE("Invalid waiting state.");
// Writer side: waits until the reader's count is within
// (mOurCount - aReadCount) of our own count.
//
// Steps visible here:
//  1. Spin on the condition, with the counter starting at kMaxSpinCount.
//  2. Make sure the reader is not itself waiting, switch to AboutToWait and
//     re-check the condition.
//  3. Publish requiredDifference in the write footer, switch to Waiting and
//     wait on the writer semaphore with aTimeout for as long as the reader is
//     neither closed nor in the Failed state.
//  4. If that loop is left without success, mark the writer as Failed.
//
// aReadCount - the read count to wait for.
// aTimeout   - timeout for each semaphore wait.
//
// NOTE(review): the `do {` opener and the return statements are not among
// the lines visible here.
448 bool CanvasEventRingBuffer::WaitForReadCount(uint32_t aReadCount
,
449 TimeDuration aTimeout
) {
// Unsigned subtraction: how far behind us the reader is allowed to be.
450 uint32_t requiredDifference
= mOurCount
- aReadCount
;
451 uint32_t spinCount
= kMaxSpinCount
;
453 if (mOurCount
- mRead
->count
<= requiredDifference
) {
456 } while (--spinCount
!= 0);
458 // Double-check that the reader isn't waiting.
459 CheckAndSignalReader();
// Announce that we are deciding, then re-check before committing to wait.
460 mWrite
->state
= State::AboutToWait
;
461 if (mOurCount
- mRead
->count
<= requiredDifference
) {
462 mWrite
->state
= State::Processing
;
// Publish the threshold that CheckAndSignalWriter compares against.
466 mWrite
->requiredDifference
= requiredDifference
;
467 mWrite
->state
= State::Waiting
;
469 // Wait unless we detect the reading side has closed.
470 while (!mWriterServices
->ReaderClosed() && mRead
->state
!= State::Failed
) {
471 if (mWriterSemaphore
->Wait(Some(aTimeout
))) {
472 MOZ_ASSERT(mOurCount
- mRead
->count
<= requiredDifference
);
477 // Either the reader has failed or we're stopping writing for some other
478 // reason (e.g. shutdown), so mark us as failed so the reader is aware.
479 mWrite
->state
= State::Failed
;
// Writer side: waits until at least one byte of the stream area is free and
// returns how many bytes are free.
//
// streamFullReadCount is the read count at which the buffer would be exactly
// full; waiting for that count plus one therefore guarantees one free byte.
// All arithmetic is unsigned 32-bit.
//
// NOTE(review): the body of the failure branch is not among the lines visible
// here.
484 uint32_t CanvasEventRingBuffer::WaitForBytesToWrite() {
485 uint32_t streamFullReadCount
= mOurCount
- StreamSize();
486 if (!WaitForReadCount(streamFullReadCount
+ 1, kTimeout
)) {
// Everything the reader has consumed beyond the "full" mark is free space.
490 return mRead
->count
- streamFullReadCount
;
// Reader side: waits for data using the default kTimeout and
// kTimeoutRetryCount, then returns the number of bytes the writer has
// published beyond our own count (unsigned 32-bit subtraction).
//
// NOTE(review): the body of the failure branch is not among the lines visible
// here.
493 uint32_t CanvasEventRingBuffer::WaitForBytesToRead() {
494 if (!WaitForDataToRead(kTimeout
, kTimeoutRetryCount
)) {
498 return mWrite
->count
- mOurCount
;
// Reader side: writes aSize bytes of return data from aData into the buffer,
// tracked by the returnCount fields of the two footers rather than the
// main counts.
//
// mRead->returnCount is the amount of return data written by this side and
// mWrite->returnCount the amount the other side has read back. While the
// contiguous writable space is smaller than aSize, whatever space exists is
// filled and published; when no space exists WriterClosed() is consulted.
// The remainder is copied by the final memcpy.
//
// NOTE(review): the body of the WriterClosed() branch and the statement that
// advances writeCount after the final memcpy are not among the lines visible
// here.
501 void CanvasEventRingBuffer::ReturnWrite(const char* aData
, size_t aSize
) {
502 uint32_t writeCount
= mRead
->returnCount
;
503 uint32_t bufPos
= writeCount
% StreamSize();
504 uint32_t bufRemaining
= StreamSize() - bufPos
;
// Limited both by the end of the stream area and by what has not yet been
// read back by the other side.
505 uint32_t availableToWrite
=
506 std::min(bufRemaining
, (mWrite
->returnCount
+ StreamSize() - writeCount
));
507 while (availableToWrite
< aSize
) {
508 if (availableToWrite
) {
509 memcpy(mBuf
+ bufPos
, aData
, availableToWrite
);
510 writeCount
+= availableToWrite
;
// Publish the partial write so the other side can start consuming it.
511 mRead
->returnCount
= writeCount
;
512 bufPos
= writeCount
% StreamSize();
513 bufRemaining
= StreamSize() - bufPos
;
514 aData
+= availableToWrite
;
515 aSize
-= availableToWrite
;
516 } else if (mReaderServices
->WriterClosed()) {
520 availableToWrite
= std::min(
521 bufRemaining
, (mWrite
->returnCount
+ StreamSize() - writeCount
));
// The remainder now fits in the contiguous space.
524 memcpy(mBuf
+ bufPos
, aData
, aSize
);
526 mRead
->returnCount
= writeCount
;
// Writer side: reads aSize bytes of return data from the buffer into aOut,
// tracked by the returnCount fields of the two footers rather than the
// main counts.
//
// First waits for the reader to consume everything up to mOurCount, then
// waits for return data to appear. While less than aSize is contiguously
// readable, whatever exists is copied out and acknowledged; when nothing is
// readable ReaderClosed() is consulted. The remainder is copied by the final
// memcpy.
//
// NOTE(review): the bodies of the early-exit branches and the statement that
// advances readCount after the final memcpy are not among the lines visible
// here.
529 void CanvasEventRingBuffer::ReturnRead(char* aOut
, size_t aSize
) {
530 // First wait for the event returning the data to be read.
531 WaitForCheckpoint(mOurCount
);
532 uint32_t readCount
= mWrite
->returnCount
;
534 // If the event sending back data fails to play then it will ReturnWrite
535 // nothing. So, wait until something has been written or the reader has
536 // stopped processing.
537 while (readCount
== mRead
->returnCount
) {
538 // We recheck the count, because the other side can write all the data and
539 // start waiting in between these two lines.
540 if (mRead
->state
!= State::Processing
&& readCount
== mRead
->returnCount
) {
545 uint32_t bufPos
= readCount
% StreamSize();
546 uint32_t bufRemaining
= StreamSize() - bufPos
;
// Limited both by the end of the stream area and by what has been written.
547 uint32_t availableToRead
=
548 std::min(bufRemaining
, (mRead
->returnCount
- readCount
));
549 while (availableToRead
< aSize
) {
550 if (availableToRead
) {
551 memcpy(aOut
, mBuf
+ bufPos
, availableToRead
);
552 readCount
+= availableToRead
;
// Acknowledge the partial read so the other side can reuse the space.
553 mWrite
->returnCount
= readCount
;
554 bufPos
= readCount
% StreamSize();
555 bufRemaining
= StreamSize() - bufPos
;
556 aOut
+= availableToRead
;
557 aSize
-= availableToRead
;
558 } else if (mWriterServices
->ReaderClosed()) {
562 availableToRead
= std::min(bufRemaining
, (mRead
->returnCount
- readCount
));
// The remainder is now contiguously readable.
565 memcpy(aOut
, mBuf
+ bufPos
, aSize
);
567 mWrite
->returnCount
= readCount
;
// Records aSurface for the canvas recording.
//
// The surface is first offered to layers::SharedSurfacesChild::Share, which
// fills in an external image id. Two outcomes are visible below: falling back
// to DrawEventRecorderPrivate::StoreSourceSurfaceRecording, or recording the
// surface by its external image id through StoreExternalSurfaceRecording.
//
// aSurface - the surface to record.
// aReason  - forwarded to the DrawEventRecorderPrivate implementation.
//
// NOTE(review): the condition that selects between the two outcomes is not
// among the lines visible here; presumably the fallback is taken when Share
// fails (rv) - confirm.
570 void CanvasDrawEventRecorder::StoreSourceSurfaceRecording(
571 gfx::SourceSurface
* aSurface
, const char* aReason
) {
572 wr::ExternalImageId extId
{};
573 nsresult rv
= layers::SharedSurfacesChild::Share(aSurface
, extId
);
575 DrawEventRecorderPrivate::StoreSourceSurfaceRecording(aSurface
, aReason
);
579 StoreExternalSurfaceRecording(aSurface
, wr::AsUint64(extId
));
582 } // namespace layers
583 } // namespace mozilla