1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 #include "CacheIOThread.h"
6 #include "CacheFileIOManager.h"
8 #include "nsIRunnable.h"
9 #include "nsISupportsImpl.h"
10 #include "nsPrintfCString.h"
12 #include "nsThreadManager.h"
13 #include "nsThreadUtils.h"
14 #include "mozilla/EventQueue.h"
15 #include "mozilla/IOInterposer.h"
16 #include "mozilla/ThreadEventQueue.h"
17 #include "GeckoProfiler.h"
23 #ifdef MOZ_TASK_TRACER
24 # include "GeckoTaskTracer.h"
25 # include "TracedTaskCommon.h"
33 class CacheIOTelemetry
{
35 typedef CacheIOThread::EventQueue::size_type size_type
;
36 static size_type mMinLengthToReport
[CacheIOThread::LAST_LEVEL
];
37 static void Report(uint32_t aLevel
, size_type aLength
);
// Telemetry bucketing granularity: queue lengths are reported in
// multiples of 30 (see CacheIOTelemetry::Report).
static CacheIOTelemetry::size_type const kGranularity = 30;
42 CacheIOTelemetry::size_type
43 CacheIOTelemetry::mMinLengthToReport
[CacheIOThread::LAST_LEVEL
] = {
44 kGranularity
, kGranularity
, kGranularity
, kGranularity
,
45 kGranularity
, kGranularity
, kGranularity
, kGranularity
};
48 void CacheIOTelemetry::Report(uint32_t aLevel
,
49 CacheIOTelemetry::size_type aLength
) {
50 if (mMinLengthToReport
[aLevel
] > aLength
) {
54 static Telemetry::HistogramID telemetryID
[] = {
55 Telemetry::HTTP_CACHE_IO_QUEUE_2_OPEN_PRIORITY
,
56 Telemetry::HTTP_CACHE_IO_QUEUE_2_READ_PRIORITY
,
57 Telemetry::HTTP_CACHE_IO_QUEUE_2_MANAGEMENT
,
58 Telemetry::HTTP_CACHE_IO_QUEUE_2_OPEN
,
59 Telemetry::HTTP_CACHE_IO_QUEUE_2_READ
,
60 Telemetry::HTTP_CACHE_IO_QUEUE_2_WRITE_PRIORITY
,
61 Telemetry::HTTP_CACHE_IO_QUEUE_2_WRITE
,
62 Telemetry::HTTP_CACHE_IO_QUEUE_2_INDEX
,
63 Telemetry::HTTP_CACHE_IO_QUEUE_2_EVICT
};
65 // Each bucket is a multiply of kGranularity (30, 60, 90..., 300+)
66 aLength
= (aLength
/ kGranularity
);
67 // Next time report only when over the current length + kGranularity
68 mMinLengthToReport
[aLevel
] = (aLength
+ 1) * kGranularity
;
70 // 10 is number of buckets we have in each probe
71 aLength
= std::min
<size_type
>(aLength
, 10);
73 Telemetry::Accumulate(telemetryID
[aLevel
], aLength
- 1); // counted from 0
81 * Helper class encapsulating platform-specific code to cancel
82 * any pending IO operation taking too long. Solely used during
83 * shutdown to prevent any IO shutdown hangs.
84 * Mainly designed for using Win32 CancelSynchronousIo function.
86 class BlockingIOWatcher
{
88 // The native handle to the thread
90 // Event signaling back to the main thread, see NotifyOperationDone.
95 // Created and destroyed on the main thread only
99 // Called on the IO thread to grab the platform specific
102 // If there is a blocking operation being handled on the IO
103 // thread, this is called on the main thread during shutdown.
104 // Waits for notification from the IO thread for up to two seconds.
105 // If that times out, it attempts to cancel the IO operation.
106 void WatchAndCancel(Monitor
& aMonitor
);
107 // Called by the IO thread after each operation has been
108 // finished (after each Run() call). This wakes the main
109 // thread up and makes WatchAndCancel() early exit and become
111 void NotifyOperationDone();
116 BlockingIOWatcher::BlockingIOWatcher() : mThread(NULL
), mEvent(NULL
) {
117 HMODULE kernel32_dll
= GetModuleHandle("kernel32.dll");
122 mEvent
= ::CreateEventW(NULL
, TRUE
, FALSE
, NULL
);
125 BlockingIOWatcher::~BlockingIOWatcher() {
130 CloseHandle(mThread
);
134 void BlockingIOWatcher::InitThread() {
135 // GetCurrentThread() only returns a pseudo handle, hence DuplicateHandle
136 ::DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
137 GetCurrentProcess(), &mThread
, 0, FALSE
,
138 DUPLICATE_SAME_ACCESS
);
141 void BlockingIOWatcher::WatchAndCancel(Monitor
& aMonitor
) {
146 // Reset before we enter the monitor to raise the chance we catch
147 // the currently pending IO op completion.
148 ::ResetEvent(mEvent
);
152 MonitorAutoLock
lock(aMonitor
);
160 LOG(("Blocking IO operation pending on IO thread, waiting..."));
162 // It seems wise to use the I/O lag time as a maximum time to wait
163 // for an operation to finish. When that times out and cancelation
164 // succeeds, there will be no other IO operation permitted. By default
165 // this is two seconds.
167 std::min
<uint32_t>(5, CacheObserver::MaxShutdownIOLag()) * 1000;
169 DWORD result
= ::WaitForSingleObject(mEvent
, maxLag
);
170 if (result
== WAIT_TIMEOUT
) {
171 LOG(("CacheIOThread: Attempting to cancel a long blocking IO operation"));
172 BOOL result
= ::CancelSynchronousIo(thread
);
174 LOG((" cancelation signal succeeded"));
176 DWORD error
= GetLastError();
177 LOG((" cancelation signal failed with GetLastError=%u", error
));
182 void BlockingIOWatcher::NotifyOperationDone() {
// Stub code only (we don't implement IO cancelation for this platform)
// NOTE(review): the preprocessor branch (#else/#endif) separating these
// stubs from the Win32 implementations above is not visible in this chunk.
BlockingIOWatcher::BlockingIOWatcher() = default;
BlockingIOWatcher::~BlockingIOWatcher() = default;
void BlockingIOWatcher::InitThread() {}
void BlockingIOWatcher::WatchAndCancel(Monitor&) {}
void BlockingIOWatcher::NotifyOperationDone() {}
200 } // namespace detail
// Raw pointer to the singleton CacheIOThread instance, used by Cancelable
// to reach the thread and assert it runs on it.  Presumably assigned by
// the CacheIOThread constructor — that assignment is not visible in this
// chunk; confirm against the full file.
CacheIOThread* CacheIOThread::sSelf = nullptr;
// XPCOM refcounting/QI: CacheIOThread observes its internal XPCOM thread
// via nsIThreadObserver (see the SetObserver(this) call in ThreadFunc).
NS_IMPL_ISUPPORTS(CacheIOThread, nsIThreadObserver)
// Constructor.  NOTE(review): truncated in this chunk — additional member
// initializers (the visible gaps suggest at least one between mMonitor and
// mXPCOMThread and after mIOCancelableEvents) and the tail of the body are
// not visible here; confirm against the full file.
CacheIOThread::CacheIOThread()
    : mMonitor("CacheIOThread"),
      mXPCOMThread(nullptr),
      mLowestLevelWaiting(LAST_LEVEL),  // "nothing waiting" sentinel
      mCurrentlyExecutingLevel(0),
      mHasXPCOMEvents(false),
      mRerunCurrentEvent(false),
      mIOCancelableEvents(0),
      // Zero out the per-level queue length counters.
      for (auto& item : mQueueLength) {
// Destructor.  NOTE(review): truncated in this chunk — whatever is done
// with the grabbed XPCOM thread pointer (presumably releasing the strong
// reference taken in ThreadFunc) and the closing braces are not visible.
CacheIOThread::~CacheIOThread() {
  nsIThread* thread = mXPCOMThread;
  // All per-level queues are expected to be fully drained by now.
  for (auto& event : mEventQueue) {
    MOZ_ASSERT(!event.Length());
243 nsresult
CacheIOThread::Init() {
245 MonitorAutoLock
lock(mMonitor
);
246 // Yeah, there is not a thread yet, but we want to make sure
247 // the sequencing is correct.
248 mBlockingIOWatcher
= MakeUnique
<detail::BlockingIOWatcher
>();
252 PR_CreateThread(PR_USER_THREAD
, ThreadFunc
, this, PR_PRIORITY_NORMAL
,
253 PR_GLOBAL_THREAD
, PR_JOINABLE_THREAD
, 128 * 1024);
255 return NS_ERROR_FAILURE
;
261 nsresult
CacheIOThread::Dispatch(nsIRunnable
* aRunnable
, uint32_t aLevel
) {
262 return Dispatch(do_AddRef(aRunnable
), aLevel
);
265 nsresult
CacheIOThread::Dispatch(already_AddRefed
<nsIRunnable
> aRunnable
,
267 NS_ENSURE_ARG(aLevel
< LAST_LEVEL
);
269 nsCOMPtr
<nsIRunnable
> runnable(aRunnable
);
271 // Runnable is always expected to be non-null, hard null-check bellow.
272 MOZ_ASSERT(runnable
);
274 MonitorAutoLock
lock(mMonitor
);
276 if (mShutdown
&& (PR_GetCurrentThread() != mThread
))
277 return NS_ERROR_UNEXPECTED
;
279 return DispatchInternal(runnable
.forget(), aLevel
);
282 nsresult
CacheIOThread::DispatchAfterPendingOpens(nsIRunnable
* aRunnable
) {
283 // Runnable is always expected to be non-null, hard null-check bellow.
284 MOZ_ASSERT(aRunnable
);
286 MonitorAutoLock
lock(mMonitor
);
288 if (mShutdown
&& (PR_GetCurrentThread() != mThread
))
289 return NS_ERROR_UNEXPECTED
;
291 // Move everything from later executed OPEN level to the OPEN_PRIORITY level
292 // where we post the (eviction) runnable.
293 mQueueLength
[OPEN_PRIORITY
] += mEventQueue
[OPEN
].Length();
294 mQueueLength
[OPEN
] -= mEventQueue
[OPEN
].Length();
295 mEventQueue
[OPEN_PRIORITY
].AppendElements(mEventQueue
[OPEN
]);
296 mEventQueue
[OPEN
].Clear();
298 return DispatchInternal(do_AddRef(aRunnable
), OPEN_PRIORITY
);
301 nsresult
CacheIOThread::DispatchInternal(
302 already_AddRefed
<nsIRunnable
> aRunnable
, uint32_t aLevel
) {
303 nsCOMPtr
<nsIRunnable
> runnable(aRunnable
);
304 #ifdef MOZ_TASK_TRACER
305 if (tasktracer::IsStartLogging()) {
306 runnable
= tasktracer::CreateTracedRunnable(runnable
.forget());
307 (static_cast<tasktracer::TracedRunnable
*>(runnable
.get()))->DispatchTask();
311 LogRunnable::LogDispatch(runnable
.get());
313 if (NS_WARN_IF(!runnable
)) return NS_ERROR_NULL_POINTER
;
315 mMonitor
.AssertCurrentThreadOwns();
317 ++mQueueLength
[aLevel
];
318 mEventQueue
[aLevel
].AppendElement(runnable
.forget());
319 if (mLowestLevelWaiting
> aLevel
) mLowestLevelWaiting
= aLevel
;
321 mMonitor
.NotifyAll();
326 bool CacheIOThread::IsCurrentThread() {
327 return mThread
== PR_GetCurrentThread();
330 uint32_t CacheIOThread::QueueSize(bool highPriority
) {
331 MonitorAutoLock
lock(mMonitor
);
333 return mQueueLength
[OPEN_PRIORITY
] + mQueueLength
[READ_PRIORITY
];
336 return mQueueLength
[OPEN_PRIORITY
] + mQueueLength
[READ_PRIORITY
] +
337 mQueueLength
[MANAGEMENT
] + mQueueLength
[OPEN
] + mQueueLength
[READ
];
340 bool CacheIOThread::YieldInternal() {
341 if (!IsCurrentThread()) {
343 "Trying to yield to priority events on non-cache2 I/O thread? "
344 "You probably do something wrong.");
348 if (mCurrentlyExecutingLevel
== XPCOM_LEVEL
) {
349 // Doesn't make any sense, since this handler is the one
350 // that would be executed as the next one.
354 if (!EventsPending(mCurrentlyExecutingLevel
)) return false;
356 mRerunCurrentEvent
= true;
360 void CacheIOThread::Shutdown() {
366 MonitorAutoLock
lock(mMonitor
);
368 mMonitor
.NotifyAll();
371 PR_JoinThread(mThread
);
375 void CacheIOThread::CancelBlockingIO() {
376 // This is an attempt to cancel any blocking I/O operation taking
378 if (!mBlockingIOWatcher
) {
382 if (!mIOCancelableEvents
) {
383 LOG(("CacheIOThread::CancelBlockingIO, no blocking operation to cancel"));
387 // OK, when we are here, we are processing an IO on the thread that
389 mBlockingIOWatcher
->WatchAndCancel(mMonitor
);
392 already_AddRefed
<nsIEventTarget
> CacheIOThread::Target() {
393 nsCOMPtr
<nsIEventTarget
> target
;
395 target
= mXPCOMThread
;
396 if (!target
&& mThread
) {
397 MonitorAutoLock
lock(mMonitor
);
398 while (!mXPCOMThread
) {
402 target
= mXPCOMThread
;
405 return target
.forget();
409 void CacheIOThread::ThreadFunc(void* aClosure
) {
410 // XXXmstange We'd like to register this thread with the profiler, but doing
411 // so causes leaks, see bug 1323100.
412 NS_SetCurrentThreadName("Cache2 I/O");
414 mozilla::IOInterposer::RegisterCurrentThread();
415 CacheIOThread
* thread
= static_cast<CacheIOThread
*>(aClosure
);
416 thread
->ThreadFunc();
417 mozilla::IOInterposer::UnregisterCurrentThread();
// The IO thread's main loop: sets up the blocking-IO watcher and a
// wrapped XPCOM thread for this NSPR thread, then alternates between
// draining dispatched XPCOM events and the per-level cache event queues.
// NOTE(review): heavily truncated in this chunk — the outer loop
// construct, the declarations of rv/processedEvent/level, the Wait()
// call, shutdown handling and several closing braces are not visible;
// comments below describe only the visible statements.
void CacheIOThread::ThreadFunc() {
  nsCOMPtr<nsIThreadInternal> threadInternal;

  MonitorAutoLock lock(mMonitor);

  MOZ_ASSERT(mBlockingIOWatcher);
  // Capture a real handle to this thread for later cancelation.
  mBlockingIOWatcher->InitThread();

  // Wrap this NSPR thread in an XPCOM thread so XPCOM runnables can be
  // dispatched to it.
  auto queue = MakeRefPtr<ThreadEventQueue<mozilla::EventQueue>>(
      MakeUnique<mozilla::EventQueue>());
  nsCOMPtr<nsIThread> xpcomThread =
      nsThreadManager::get().CreateCurrentThread(queue,
                                                 nsThread::NOT_MAIN_THREAD);

  threadInternal = do_QueryInterface(xpcomThread);
  // Observe dispatches so OnDispatchedEvent can wake this loop.
  if (threadInternal) threadInternal->SetObserver(this);

  // Publish the thread (raw pointer, reference deliberately leaked into
  // the member — see the matching grab in the destructor).
  mXPCOMThread = xpcomThread.forget().take();

  // Reset the lowest level now, so that we can detect a new event on
  // a lower level (i.e. higher priority) has been scheduled while
  // executing any previously scheduled event.
  mLowestLevelWaiting = LAST_LEVEL;

  // Process xpcom events first
  while (mHasXPCOMEvents) {
    mHasXPCOMEvents = false;
    mCurrentlyExecutingLevel = XPCOM_LEVEL;

    // Run the events outside the monitor.
    MonitorAutoUnlock unlock(mMonitor);

    nsIThread* thread = mXPCOMThread;
    rv = thread->ProcessNextEvent(false, &processedEvent);

    // Let a concurrent WatchAndCancel() know the operation finished.
    MOZ_ASSERT(mBlockingIOWatcher);
    mBlockingIOWatcher->NotifyOperationDone();
  } while (NS_SUCCEEDED(rv) && processedEvent);

  // Find the most urgent non-empty level and run it.
  for (level = 0; level < LAST_LEVEL; ++level) {
    if (!mEventQueue[level].Length()) {
      // no events on this level, go to the next level

  // Go to the first (lowest) level again
  if (EventsPending()) {

  AUTO_PROFILER_LABEL("CacheIOThread::ThreadFunc::Wait", IDLE);

  MOZ_ASSERT(!EventsPending());

  // This is for correct assertion on XPCOM events dispatch.
  if (threadInternal) threadInternal->SetObserver(nullptr);
// Runs (up to) all events queued on aLevel outside the monitor, bailing
// out early when a more urgent level gets work or the current event asks
// to be rerun; unexecuted events are then returned to the queue head.
// NOTE(review): truncated in this chunk — several closing braces, the
// break statements after the early-exit comments and the returnEvents
// guard around the tail block are not visible; comments describe only
// the visible statements.
void CacheIOThread::LoopOneLevel(uint32_t aLevel) {
  // Take the whole level queue; events run from this local copy.
  EventQueue events = std::move(mEventQueue[aLevel]);
  EventQueue::size_type length = events.Length();

  mCurrentlyExecutingLevel = aLevel;

  bool returnEvents = false;
  bool reportTelemetry = true;

  EventQueue::size_type index;

  // Execute events without holding the monitor.
  MonitorAutoUnlock unlock(mMonitor);

  for (index = 0; index < length; ++index) {
    if (EventsPending(aLevel)) {
      // Somebody scheduled a new event on a lower level, break and hurry
      // to execute it!  Don't forget to return what we haven't exec.

    // Report the queue length once per drain.
    if (reportTelemetry) {
      reportTelemetry = false;
      CacheIOTelemetry::Report(aLevel, length);

    // Drop any previous flagging, only an event on the current level may set
    mRerunCurrentEvent = false;

    LogRunnable::Run log(events[index].get());

    events[index]->Run();

    // Let a concurrent WatchAndCancel() know the operation finished.
    MOZ_ASSERT(mBlockingIOWatcher);
    mBlockingIOWatcher->NotifyOperationDone();

    if (mRerunCurrentEvent) {
      // The event handler yields to higher priority events and wants to

    --mQueueLength[aLevel];

    // Release outside the lock.
    events[index] = nullptr;

  // This code must prevent any AddRef/Release calls on the stored COMPtrs as
  // it might be exhaustive and block the monitor's lock for an excessive

  // 'index' points at the event that was interrupted and asked for re-run,
  // all events before have run, been nullified, and can be removed.
  events.RemoveElementsAt(0, index);
  // Move events that might have been scheduled on this queue to the tail to
  // preserve the expected per-queue FIFO order.
  // XXX(Bug 1631371) Check if this should use a fallible operation as it
  // pretended earlier.
  events.AppendElements(std::move(mEventQueue[aLevel]));
  // And finally move everything back to the main queue.
  mEventQueue[aLevel] = std::move(events);
576 bool CacheIOThread::EventsPending(uint32_t aLastLevel
) {
577 return mLowestLevelWaiting
< aLastLevel
|| mHasXPCOMEvents
;
580 NS_IMETHODIMP
CacheIOThread::OnDispatchedEvent() {
581 MonitorAutoLock
lock(mMonitor
);
582 mHasXPCOMEvents
= true;
583 MOZ_ASSERT(mInsideLoop
);
588 NS_IMETHODIMP
CacheIOThread::OnProcessNextEvent(nsIThreadInternal
* thread
,
593 NS_IMETHODIMP
CacheIOThread::AfterProcessNextEvent(nsIThreadInternal
* thread
,
594 bool eventWasProcessed
) {
600 size_t CacheIOThread::SizeOfExcludingThis(
601 mozilla::MallocSizeOf mallocSizeOf
) const {
602 MonitorAutoLock
lock(const_cast<CacheIOThread
*>(this)->mMonitor
);
605 for (const auto& event
: mEventQueue
) {
606 n
+= event
.ShallowSizeOfExcludingThis(mallocSizeOf
);
607 // Events referenced by the queues are arbitrary objects we cannot be sure
608 // are reported elsewhere as well as probably not implementing nsISizeOf
609 // interface. Deliberatly omitting them from reporting here.
615 size_t CacheIOThread::SizeOfIncludingThis(
616 mozilla::MallocSizeOf mallocSizeOf
) const {
617 return mallocSizeOf(this) + SizeOfExcludingThis(mallocSizeOf
);
620 CacheIOThread::Cancelable::Cancelable(bool aCancelable
)
621 : mCancelable(aCancelable
) {
622 // This will only ever be used on the I/O thread,
623 // which is expected to be alive longer than this class.
624 MOZ_ASSERT(CacheIOThread::sSelf
);
625 MOZ_ASSERT(CacheIOThread::sSelf
->IsCurrentThread());
628 ++CacheIOThread::sSelf
->mIOCancelableEvents
;
632 CacheIOThread::Cancelable::~Cancelable() {
633 MOZ_ASSERT(CacheIOThread::sSelf
);
636 --CacheIOThread::sSelf
->mIOCancelableEvents
;
641 } // namespace mozilla