1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 #include "CacheIOThread.h"
6 #include "CacheFileIOManager.h"
8 #include "CacheObserver.h"
10 #include "nsIRunnable.h"
11 #include "nsISupportsImpl.h"
12 #include "nsPrintfCString.h"
14 #include "nsThreadManager.h"
15 #include "nsThreadUtils.h"
16 #include "mozilla/EventQueue.h"
17 #include "mozilla/IOInterposer.h"
18 #include "mozilla/ProfilerLabels.h"
19 #include "mozilla/ThreadEventQueue.h"
20 #include "mozilla/Telemetry.h"
21 #include "mozilla/TelemetryHistogramEnums.h"
27 #ifdef MOZ_TASK_TRACER
28 # include "GeckoTaskTracer.h"
29 # include "TracedTaskCommon.h"
37 class CacheIOTelemetry
{
39 typedef CacheIOThread::EventQueue::size_type size_type
;
40 static size_type mMinLengthToReport
[CacheIOThread::LAST_LEVEL
];
41 static void Report(uint32_t aLevel
, size_type aLength
);
// Telemetry bucket width: queue lengths are reported in multiples of 30
// events (see CacheIOTelemetry::Report below).
static CacheIOTelemetry::size_type const kGranularity = 30;
46 CacheIOTelemetry::size_type
47 CacheIOTelemetry::mMinLengthToReport
[CacheIOThread::LAST_LEVEL
] = {
48 kGranularity
, kGranularity
, kGranularity
, kGranularity
,
49 kGranularity
, kGranularity
, kGranularity
, kGranularity
};
52 void CacheIOTelemetry::Report(uint32_t aLevel
,
53 CacheIOTelemetry::size_type aLength
) {
54 if (mMinLengthToReport
[aLevel
] > aLength
) {
58 static Telemetry::HistogramID telemetryID
[] = {
59 Telemetry::HTTP_CACHE_IO_QUEUE_2_OPEN_PRIORITY
,
60 Telemetry::HTTP_CACHE_IO_QUEUE_2_READ_PRIORITY
,
61 Telemetry::HTTP_CACHE_IO_QUEUE_2_MANAGEMENT
,
62 Telemetry::HTTP_CACHE_IO_QUEUE_2_OPEN
,
63 Telemetry::HTTP_CACHE_IO_QUEUE_2_READ
,
64 Telemetry::HTTP_CACHE_IO_QUEUE_2_WRITE_PRIORITY
,
65 Telemetry::HTTP_CACHE_IO_QUEUE_2_WRITE
,
66 Telemetry::HTTP_CACHE_IO_QUEUE_2_INDEX
,
67 Telemetry::HTTP_CACHE_IO_QUEUE_2_EVICT
};
69 // Each bucket is a multiply of kGranularity (30, 60, 90..., 300+)
70 aLength
= (aLength
/ kGranularity
);
71 // Next time report only when over the current length + kGranularity
72 mMinLengthToReport
[aLevel
] = (aLength
+ 1) * kGranularity
;
74 // 10 is number of buckets we have in each probe
75 aLength
= std::min
<size_type
>(aLength
, 10);
77 Telemetry::Accumulate(telemetryID
[aLevel
], aLength
- 1); // counted from 0
85 * Helper class encapsulating platform-specific code to cancel
86 * any pending IO operation taking too long. Solely used during
87 * shutdown to prevent any IO shutdown hangs.
88 * Mainly designed for using Win32 CancelSynchronousIo function.
90 class BlockingIOWatcher
{
92 // The native handle to the thread
94 // Event signaling back to the main thread, see NotifyOperationDone.
99 // Created and destroyed on the main thread only
101 ~BlockingIOWatcher();
103 // Called on the IO thread to grab the platform specific
106 // If there is a blocking operation being handled on the IO
107 // thread, this is called on the main thread during shutdown.
108 // Waits for notification from the IO thread for up to two seconds.
109 // If that times out, it attempts to cancel the IO operation.
110 void WatchAndCancel(Monitor
& aMonitor
);
111 // Called by the IO thread after each operation has been
112 // finished (after each Run() call). This wakes the main
113 // thread up and makes WatchAndCancel() early exit and become
115 void NotifyOperationDone();
// Win32 constructor: creates the completion event used by WatchAndCancel /
// NotifyOperationDone.
// NOTE(review): this definition is truncated in this chunk — code between
// obtaining kernel32.dll and creating the event, plus the closing brace,
// is not visible; confirm against the full file.
BlockingIOWatcher::BlockingIOWatcher() : mThread(NULL), mEvent(NULL) {
  // kernel32.dll handle — presumably used to probe for CancelSynchronousIo
  // availability in the lines missing here; TODO confirm.
  HMODULE kernel32_dll = GetModuleHandleW(L"kernel32.dll");
  // Manual-reset (TRUE), initially non-signaled (FALSE), unnamed event.
  mEvent = ::CreateEventW(NULL, TRUE, FALSE, NULL);
// Win32 destructor: releases the duplicated thread handle.
// NOTE(review): truncated — any guards, the release of mEvent, and the
// closing brace are not visible in this chunk.
BlockingIOWatcher::~BlockingIOWatcher() {
  CloseHandle(mThread);
138 void BlockingIOWatcher::InitThread() {
139 // GetCurrentThread() only returns a pseudo handle, hence DuplicateHandle
140 ::DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
141 GetCurrentProcess(), &mThread
, 0, FALSE
,
142 DUPLICATE_SAME_ACCESS
);
// Main-thread side of the shutdown watchdog: waits for the IO thread to
// signal completion of the current operation and, on timeout, attempts
// Win32 CancelSynchronousIo.
// NOTE(review): this definition is truncated in this chunk — early-return
// guards, the declaration side of the `maxLag` assignment, branch braces
// and the closing brace are not visible; confirm against the full file.
void BlockingIOWatcher::WatchAndCancel(Monitor& aMonitor) {
  // Reset before we enter the monitor to raise the chance we catch
  // the currently pending IO op completion.
  ::ResetEvent(mEvent);

  MonitorAutoLock lock(aMonitor);

  LOG(("Blocking IO operation pending on IO thread, waiting..."));

  // It seems wise to use the I/O lag time as a maximum time to wait
  // for an operation to finish. When that times out and cancelation
  // succeeds, there will be no other IO operation permitted. By default
  // this is two seconds.
  // NOTE(review): the left-hand side of this statement (presumably
  // `DWORD maxLag =`) is missing from this chunk.
  std::min<uint32_t>(5, CacheObserver::MaxShutdownIOLag()) * 1000;

  DWORD result = ::WaitForSingleObject(mEvent, maxLag);
  if (result == WAIT_TIMEOUT) {
    LOG(("CacheIOThread: Attempting to cancel a long blocking IO operation"));
    // NOTE(review): `thread` here is not declared in the visible fragment;
    // presumably the duplicated mThread handle — confirm.
    BOOL result = ::CancelSynchronousIo(thread);
    LOG((" cancelation signal succeeded"));
    DWORD error = GetLastError();
    LOG((" cancelation signal failed with GetLastError=%u", error));
// Called by the IO thread after each Run(); per the class comment it wakes
// WatchAndCancel().
// NOTE(review): the body is entirely missing from this chunk — presumably
// it signals mEvent (SetEvent); confirm against the full file.
void BlockingIOWatcher::NotifyOperationDone() {
// Stub code only (we don't implement IO cancelation for this platform)
BlockingIOWatcher::BlockingIOWatcher() = default;
BlockingIOWatcher::~BlockingIOWatcher() = default;
// No platform thread handle to capture.
void BlockingIOWatcher::InitThread() {}
// Nothing to watch; shutdown IO cancelation is a no-op here.
void BlockingIOWatcher::WatchAndCancel(Monitor&) {}
void BlockingIOWatcher::NotifyOperationDone() {}
204 } // namespace detail
// Singleton pointer read by Cancelable below; the assignment is not visible
// in this chunk — presumably set during construction/Init, TODO confirm.
CacheIOThread* CacheIOThread::sSelf = nullptr;
// XPCOM refcounting/QI; nsIThreadObserver is needed because ThreadFunc
// registers this object as the observer of the inner XPCOM thread.
NS_IMPL_ISUPPORTS(CacheIOThread, nsIThreadObserver)
// NOTE(review): this constructor is truncated in this chunk — several
// member initializers (the fused numbering shows gaps), the loop body
// zeroing mQueueLength, and the closing brace are not visible; confirm
// against the full file.
CacheIOThread::CacheIOThread()
    : mMonitor("CacheIOThread"),
      mXPCOMThread(nullptr),
      mLowestLevelWaiting(LAST_LEVEL),
      mCurrentlyExecutingLevel(0),
      mHasXPCOMEvents(false),
      mRerunCurrentEvent(false),
      mIOCancelableEvents(0),
  // Presumably zeroes each per-level queue-length counter — body missing.
  for (auto& item : mQueueLength) {
// NOTE(review): this destructor is truncated in this chunk — whatever is
// done with the raw `thread` pointer (presumably releasing the reference
// taken via forget().take() in ThreadFunc), the assertion loop's braces
// and the closing brace are not visible; confirm against the full file.
CacheIOThread::~CacheIOThread() {
  nsIThread* thread = mXPCOMThread;
  // All per-level queues are expected to be drained by now.
  for (auto& event : mEventQueue) {
    MOZ_ASSERT(!event.Length());
// Starts the dedicated NSPR thread running ThreadFunc.
// NOTE(review): truncated — the assignment of PR_CreateThread's result
// (presumably `mThread =`), the failure check that guards the
// NS_ERROR_FAILURE return, and the success return/closing brace are not
// visible in this chunk; as shown the error is returned unconditionally.
nsresult CacheIOThread::Init() {
  MonitorAutoLock lock(mMonitor);
  // Yeah, there is not a thread yet, but we want to make sure
  // the sequencing is correct.
  mBlockingIOWatcher = MakeUnique<detail::BlockingIOWatcher>();

  // 128 KiB stack, joinable so Shutdown() can PR_JoinThread it.
  PR_CreateThread(PR_USER_THREAD, ThreadFunc, this, PR_PRIORITY_NORMAL,
                  PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, 128 * 1024);
  return NS_ERROR_FAILURE;
265 nsresult
CacheIOThread::Dispatch(nsIRunnable
* aRunnable
, uint32_t aLevel
) {
266 return Dispatch(do_AddRef(aRunnable
), aLevel
);
269 nsresult
CacheIOThread::Dispatch(already_AddRefed
<nsIRunnable
> aRunnable
,
271 NS_ENSURE_ARG(aLevel
< LAST_LEVEL
);
273 nsCOMPtr
<nsIRunnable
> runnable(aRunnable
);
275 // Runnable is always expected to be non-null, hard null-check bellow.
276 MOZ_ASSERT(runnable
);
278 MonitorAutoLock
lock(mMonitor
);
280 if (mShutdown
&& (PR_GetCurrentThread() != mThread
))
281 return NS_ERROR_UNEXPECTED
;
283 return DispatchInternal(runnable
.forget(), aLevel
);
286 nsresult
CacheIOThread::DispatchAfterPendingOpens(nsIRunnable
* aRunnable
) {
287 // Runnable is always expected to be non-null, hard null-check bellow.
288 MOZ_ASSERT(aRunnable
);
290 MonitorAutoLock
lock(mMonitor
);
292 if (mShutdown
&& (PR_GetCurrentThread() != mThread
))
293 return NS_ERROR_UNEXPECTED
;
295 // Move everything from later executed OPEN level to the OPEN_PRIORITY level
296 // where we post the (eviction) runnable.
297 mQueueLength
[OPEN_PRIORITY
] += mEventQueue
[OPEN
].Length();
298 mQueueLength
[OPEN
] -= mEventQueue
[OPEN
].Length();
299 mEventQueue
[OPEN_PRIORITY
].AppendElements(mEventQueue
[OPEN
]);
300 mEventQueue
[OPEN
].Clear();
302 return DispatchInternal(do_AddRef(aRunnable
), OPEN_PRIORITY
);
305 nsresult
CacheIOThread::DispatchInternal(
306 already_AddRefed
<nsIRunnable
> aRunnable
, uint32_t aLevel
) {
307 nsCOMPtr
<nsIRunnable
> runnable(aRunnable
);
308 #ifdef MOZ_TASK_TRACER
309 if (tasktracer::IsStartLogging()) {
310 runnable
= tasktracer::CreateTracedRunnable(runnable
.forget());
311 (static_cast<tasktracer::TracedRunnable
*>(runnable
.get()))->DispatchTask();
315 LogRunnable::LogDispatch(runnable
.get());
317 if (NS_WARN_IF(!runnable
)) return NS_ERROR_NULL_POINTER
;
319 mMonitor
.AssertCurrentThreadOwns();
321 ++mQueueLength
[aLevel
];
322 mEventQueue
[aLevel
].AppendElement(runnable
.forget());
323 if (mLowestLevelWaiting
> aLevel
) mLowestLevelWaiting
= aLevel
;
325 mMonitor
.NotifyAll();
330 bool CacheIOThread::IsCurrentThread() {
331 return mThread
== PR_GetCurrentThread();
334 uint32_t CacheIOThread::QueueSize(bool highPriority
) {
335 MonitorAutoLock
lock(mMonitor
);
337 return mQueueLength
[OPEN_PRIORITY
] + mQueueLength
[READ_PRIORITY
];
340 return mQueueLength
[OPEN_PRIORITY
] + mQueueLength
[READ_PRIORITY
] +
341 mQueueLength
[MANAGEMENT
] + mQueueLength
[OPEN
] + mQueueLength
[READ
];
344 bool CacheIOThread::YieldInternal() {
345 if (!IsCurrentThread()) {
347 "Trying to yield to priority events on non-cache2 I/O thread? "
348 "You probably do something wrong.");
352 if (mCurrentlyExecutingLevel
== XPCOM_LEVEL
) {
353 // Doesn't make any sense, since this handler is the one
354 // that would be executed as the next one.
358 if (!EventsPending(mCurrentlyExecutingLevel
)) return false;
360 mRerunCurrentEvent
= true;
// NOTE(review): this definition is truncated in this chunk — the setting of
// the shutdown flag, early-return guards, and the scoping that would end
// the monitor hold before the join are not visible; as shown, joining while
// holding mMonitor would deadlock with the IO thread, so confirm the scope
// braces against the full file.
void CacheIOThread::Shutdown() {
  MonitorAutoLock lock(mMonitor);
  // Wake the IO thread so it can observe shutdown state (set in lines not
  // visible here — TODO confirm).
  mMonitor.NotifyAll();
  PR_JoinThread(mThread);
// Main-thread shutdown helper: asks the BlockingIOWatcher to cancel a
// long-blocking IO operation on the IO thread.
// NOTE(review): truncated — the bodies of both early-exit guards (and
// their closing braces) and the function's closing brace are not visible
// in this chunk.
void CacheIOThread::CancelBlockingIO() {
  // This is an attempt to cancel any blocking I/O operation taking
  // too long (continuation of this comment not visible here).
  if (!mBlockingIOWatcher) {
  if (!mIOCancelableEvents) {
    LOG(("CacheIOThread::CancelBlockingIO, no blocking operation to cancel"));
  // OK, when we are here, we are processing an IO on the thread that
  // can be canceled (continuation hedged — confirm against full file).
  mBlockingIOWatcher->WatchAndCancel(mMonitor);
// Returns the inner XPCOM event target, waiting for ThreadFunc to create it
// when the thread exists but the target is not yet published.
// NOTE(review): truncated — the wait call inside the while loop and several
// closing braces are not visible in this chunk.
already_AddRefed<nsIEventTarget> CacheIOThread::Target() {
  nsCOMPtr<nsIEventTarget> target;

  target = mXPCOMThread;
  if (!target && mThread) {
    MonitorAutoLock lock(mMonitor);
    // Presumably blocks on the monitor until ThreadFunc publishes
    // mXPCOMThread — the wait statement is missing here; confirm.
    while (!mXPCOMThread) {
    target = mXPCOMThread;

  return target.forget();
413 void CacheIOThread::ThreadFunc(void* aClosure
) {
414 // XXXmstange We'd like to register this thread with the profiler, but doing
415 // so causes leaks, see bug 1323100.
416 NS_SetCurrentThreadName("Cache2 I/O");
418 mozilla::IOInterposer::RegisterCurrentThread();
419 CacheIOThread
* thread
= static_cast<CacheIOThread
*>(aClosure
);
420 thread
->ThreadFunc();
421 mozilla::IOInterposer::UnregisterCurrentThread();
// Instance loop of the IO thread: creates a nested XPCOM thread/event
// target, then alternates between draining XPCOM events and the per-level
// cache event queues, waiting on the monitor when idle.
// NOTE(review): this definition is heavily truncated in this chunk — the
// outer loop structure, several declarations (`queue`, `rv`,
// `processedEvent`, `level`), waits, and many braces are not visible;
// confirm everything below against the full file.
void CacheIOThread::ThreadFunc() {
  nsCOMPtr<nsIThreadInternal> threadInternal;

  MonitorAutoLock lock(mMonitor);

  MOZ_ASSERT(mBlockingIOWatcher);
  mBlockingIOWatcher->InitThread();

  // NOTE(review): the left-hand side of this statement (presumably
  // `auto queue =`) is missing from this chunk.
  MakeRefPtr<ThreadEventQueue>(MakeUnique<mozilla::EventQueue>());
  nsCOMPtr<nsIThread> xpcomThread =
      nsThreadManager::get().CreateCurrentThread(queue,
                                                 nsThread::NOT_MAIN_THREAD);

  threadInternal = do_QueryInterface(xpcomThread);
  if (threadInternal) threadInternal->SetObserver(this);

  // Publish a manually-managed reference for the thread's lifetime.
  mXPCOMThread = xpcomThread.forget().take();

  // Reset the lowest level now, so that we can detect a new event on
  // a lower level (i.e. higher priority) has been scheduled while
  // executing any previously scheduled event.
  mLowestLevelWaiting = LAST_LEVEL;

  // Process xpcom events first
  while (mHasXPCOMEvents) {
    mHasXPCOMEvents = false;
    mCurrentlyExecutingLevel = XPCOM_LEVEL;

    // Run the XPCOM event outside the monitor.
    MonitorAutoUnlock unlock(mMonitor);

    nsIThread* thread = mXPCOMThread;
    rv = thread->ProcessNextEvent(false, &processedEvent);

    MOZ_ASSERT(mBlockingIOWatcher);
    mBlockingIOWatcher->NotifyOperationDone();
  } while (NS_SUCCEEDED(rv) && processedEvent);

  // Find the first (highest-priority) level with queued events.
  for (level = 0; level < LAST_LEVEL; ++level) {
    if (!mEventQueue[level].Length()) {
      // no events on this level, go to the next level

  // Go to the first (lowest) level again

  if (EventsPending()) {

  AUTO_PROFILER_LABEL("CacheIOThread::ThreadFunc::Wait", IDLE);

  MOZ_ASSERT(!EventsPending());

  // This is for correct assertion on XPCOM events dispatch.

  if (threadInternal) threadInternal->SetObserver(nullptr);
// Runs the events queued at aLevel outside the monitor, yielding back when
// a higher-priority level gets fresh work or the current event asks to be
// re-run; un-run events are returned to the level's queue.
// NOTE(review): this definition is heavily truncated in this chunk — break
// statements, branch braces, and the scope structure around the tail
// re-queue logic are not visible; confirm against the full file.
void CacheIOThread::LoopOneLevel(uint32_t aLevel) {
  // Take the whole level queue aside so Run() happens without the monitor.
  EventQueue events = std::move(mEventQueue[aLevel]);
  EventQueue::size_type length = events.Length();

  mCurrentlyExecutingLevel = aLevel;

  bool returnEvents = false;
  bool reportTelemetry = true;

  EventQueue::size_type index;

  MonitorAutoUnlock unlock(mMonitor);

  for (index = 0; index < length; ++index) {
    if (EventsPending(aLevel)) {
      // Somebody scheduled a new event on a lower level, break and harry
      // to execute it! Don't forget to return what we haven't exec.

    if (reportTelemetry) {
      // Report the queue length once per loop, on the first executed event.
      reportTelemetry = false;
      CacheIOTelemetry::Report(aLevel, length);

    // Drop any previous flagging, only an event on the current level may set
    // it (continuation of this comment not visible here).
    mRerunCurrentEvent = false;

    LogRunnable::Run log(events[index].get());

    events[index]->Run();

    MOZ_ASSERT(mBlockingIOWatcher);
    mBlockingIOWatcher->NotifyOperationDone();

    if (mRerunCurrentEvent) {
      // The event handler yields to higher priority events and wants to
      // run again (continuation not visible here).

    --mQueueLength[aLevel];

    // Release outside the lock.
    events[index] = nullptr;

  // This code must prevent any AddRef/Release calls on the stored COMPtrs as
  // it might be exhaustive and block the monitor's lock for an excessive
  // amount of time.

  // 'index' points at the event that was interrupted and asked for re-run,
  // all events before have run, been nullified, and can be removed.
  events.RemoveElementsAt(0, index);
  // Move events that might have been scheduled on this queue to the tail to
  // preserve the expected per-queue FIFO order.
  // XXX(Bug 1631371) Check if this should use a fallible operation as it
  // pretended earlier.
  events.AppendElements(std::move(mEventQueue[aLevel]));
  // And finally move everything back to the main queue.
  mEventQueue[aLevel] = std::move(events);
580 bool CacheIOThread::EventsPending(uint32_t aLastLevel
) {
581 return mLowestLevelWaiting
< aLastLevel
|| mHasXPCOMEvents
;
// nsIThreadObserver: an event was posted to the inner XPCOM target; flag it
// so the loop in ThreadFunc processes it.
// NOTE(review): truncated — the monitor notification, `return NS_OK;` and
// the closing brace are not visible in this chunk.
NS_IMETHODIMP CacheIOThread::OnDispatchedEvent() {
  MonitorAutoLock lock(mMonitor);
  mHasXPCOMEvents = true;
  MOZ_ASSERT(mInsideLoop);
// nsIThreadObserver hook.
// NOTE(review): only the first line of this signature is visible in this
// chunk — the remaining parameter(s) and the body are truncated; confirm
// against the full file.
NS_IMETHODIMP CacheIOThread::OnProcessNextEvent(nsIThreadInternal* thread,
597 NS_IMETHODIMP
CacheIOThread::AfterProcessNextEvent(nsIThreadInternal
* thread
,
598 bool eventWasProcessed
) {
604 size_t CacheIOThread::SizeOfExcludingThis(
605 mozilla::MallocSizeOf mallocSizeOf
) const {
606 MonitorAutoLock
lock(const_cast<CacheIOThread
*>(this)->mMonitor
);
609 for (const auto& event
: mEventQueue
) {
610 n
+= event
.ShallowSizeOfExcludingThis(mallocSizeOf
);
611 // Events referenced by the queues are arbitrary objects we cannot be sure
612 // are reported elsewhere as well as probably not implementing nsISizeOf
613 // interface. Deliberatly omitting them from reporting here.
619 size_t CacheIOThread::SizeOfIncludingThis(
620 mozilla::MallocSizeOf mallocSizeOf
) const {
621 return mallocSizeOf(this) + SizeOfExcludingThis(mallocSizeOf
);
624 CacheIOThread::Cancelable::Cancelable(bool aCancelable
)
625 : mCancelable(aCancelable
) {
626 // This will only ever be used on the I/O thread,
627 // which is expected to be alive longer than this class.
628 MOZ_ASSERT(CacheIOThread::sSelf
);
629 MOZ_ASSERT(CacheIOThread::sSelf
->IsCurrentThread());
632 ++CacheIOThread::sSelf
->mIOCancelableEvents
;
636 CacheIOThread::Cancelable::~Cancelable() {
637 MOZ_ASSERT(CacheIOThread::sSelf
);
640 --CacheIOThread::sSelf
->mIOCancelableEvents
;
645 } // namespace mozilla