Bug 1708422: part 8) Move `mozInlineSpellChecker::CheckWordsAndAddRangesForMisspellin...
[gecko.git] / netwerk / cache2 / CacheIOThread.cpp
blobcaeab955c18fe2781de817e7e629fa796f5729e3
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "CacheIOThread.h"
#include "CacheFileIOManager.h"
#include "CacheLog.h"
#include "CacheObserver.h"

#include <algorithm>
#include <utility>

#include "nsIRunnable.h"
#include "nsISupportsImpl.h"
#include "nsPrintfCString.h"
#include "nsThread.h"
#include "nsThreadManager.h"
#include "nsThreadUtils.h"
#include "mozilla/EventQueue.h"
#include "mozilla/IOInterposer.h"
#include "mozilla/ProfilerLabels.h"
#include "mozilla/ThreadEventQueue.h"
#include "mozilla/Telemetry.h"
#include "mozilla/TelemetryHistogramEnums.h"

#ifdef XP_WIN
#  include <windows.h>
#endif

#ifdef MOZ_TASK_TRACER
#  include "GeckoTaskTracer.h"
#  include "TracedTaskCommon.h"
#endif
32 namespace mozilla {
33 namespace net {
35 namespace { // anon
37 class CacheIOTelemetry {
38 public:
39 typedef CacheIOThread::EventQueue::size_type size_type;
40 static size_type mMinLengthToReport[CacheIOThread::LAST_LEVEL];
41 static void Report(uint32_t aLevel, size_type aLength);
44 static CacheIOTelemetry::size_type const kGranularity = 30;
46 CacheIOTelemetry::size_type
47 CacheIOTelemetry::mMinLengthToReport[CacheIOThread::LAST_LEVEL] = {
48 kGranularity, kGranularity, kGranularity, kGranularity,
49 kGranularity, kGranularity, kGranularity, kGranularity};
51 // static
52 void CacheIOTelemetry::Report(uint32_t aLevel,
53 CacheIOTelemetry::size_type aLength) {
54 if (mMinLengthToReport[aLevel] > aLength) {
55 return;
58 static Telemetry::HistogramID telemetryID[] = {
59 Telemetry::HTTP_CACHE_IO_QUEUE_2_OPEN_PRIORITY,
60 Telemetry::HTTP_CACHE_IO_QUEUE_2_READ_PRIORITY,
61 Telemetry::HTTP_CACHE_IO_QUEUE_2_MANAGEMENT,
62 Telemetry::HTTP_CACHE_IO_QUEUE_2_OPEN,
63 Telemetry::HTTP_CACHE_IO_QUEUE_2_READ,
64 Telemetry::HTTP_CACHE_IO_QUEUE_2_WRITE_PRIORITY,
65 Telemetry::HTTP_CACHE_IO_QUEUE_2_WRITE,
66 Telemetry::HTTP_CACHE_IO_QUEUE_2_INDEX,
67 Telemetry::HTTP_CACHE_IO_QUEUE_2_EVICT};
69 // Each bucket is a multiply of kGranularity (30, 60, 90..., 300+)
70 aLength = (aLength / kGranularity);
71 // Next time report only when over the current length + kGranularity
72 mMinLengthToReport[aLevel] = (aLength + 1) * kGranularity;
74 // 10 is number of buckets we have in each probe
75 aLength = std::min<size_type>(aLength, 10);
77 Telemetry::Accumulate(telemetryID[aLevel], aLength - 1); // counted from 0
80 } // namespace
82 namespace detail {
84 /**
85 * Helper class encapsulating platform-specific code to cancel
86 * any pending IO operation taking too long. Solely used during
87 * shutdown to prevent any IO shutdown hangs.
88 * Mainly designed for using Win32 CancelSynchronousIo function.
90 class BlockingIOWatcher {
91 #ifdef XP_WIN
92 // The native handle to the thread
93 HANDLE mThread;
94 // Event signaling back to the main thread, see NotifyOperationDone.
95 HANDLE mEvent;
96 #endif
98 public:
99 // Created and destroyed on the main thread only
100 BlockingIOWatcher();
101 ~BlockingIOWatcher();
103 // Called on the IO thread to grab the platform specific
104 // reference to it.
105 void InitThread();
106 // If there is a blocking operation being handled on the IO
107 // thread, this is called on the main thread during shutdown.
108 // Waits for notification from the IO thread for up to two seconds.
109 // If that times out, it attempts to cancel the IO operation.
110 void WatchAndCancel(Monitor& aMonitor);
111 // Called by the IO thread after each operation has been
112 // finished (after each Run() call). This wakes the main
113 // thread up and makes WatchAndCancel() early exit and become
114 // a no-op.
115 void NotifyOperationDone();
118 #ifdef XP_WIN
120 BlockingIOWatcher::BlockingIOWatcher() : mThread(NULL), mEvent(NULL) {
121 HMODULE kernel32_dll = GetModuleHandleW(L"kernel32.dll");
122 if (!kernel32_dll) {
123 return;
126 mEvent = ::CreateEventW(NULL, TRUE, FALSE, NULL);
129 BlockingIOWatcher::~BlockingIOWatcher() {
130 if (mEvent) {
131 CloseHandle(mEvent);
133 if (mThread) {
134 CloseHandle(mThread);
138 void BlockingIOWatcher::InitThread() {
139 // GetCurrentThread() only returns a pseudo handle, hence DuplicateHandle
140 ::DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
141 GetCurrentProcess(), &mThread, 0, FALSE,
142 DUPLICATE_SAME_ACCESS);
145 void BlockingIOWatcher::WatchAndCancel(Monitor& aMonitor) {
146 if (!mEvent) {
147 return;
150 // Reset before we enter the monitor to raise the chance we catch
151 // the currently pending IO op completion.
152 ::ResetEvent(mEvent);
154 HANDLE thread;
156 MonitorAutoLock lock(aMonitor);
157 thread = mThread;
159 if (!thread) {
160 return;
164 LOG(("Blocking IO operation pending on IO thread, waiting..."));
166 // It seems wise to use the I/O lag time as a maximum time to wait
167 // for an operation to finish. When that times out and cancelation
168 // succeeds, there will be no other IO operation permitted. By default
169 // this is two seconds.
170 uint32_t maxLag =
171 std::min<uint32_t>(5, CacheObserver::MaxShutdownIOLag()) * 1000;
173 DWORD result = ::WaitForSingleObject(mEvent, maxLag);
174 if (result == WAIT_TIMEOUT) {
175 LOG(("CacheIOThread: Attempting to cancel a long blocking IO operation"));
176 BOOL result = ::CancelSynchronousIo(thread);
177 if (result) {
178 LOG((" cancelation signal succeeded"));
179 } else {
180 DWORD error = GetLastError();
181 LOG((" cancelation signal failed with GetLastError=%u", error));
186 void BlockingIOWatcher::NotifyOperationDone() {
187 if (mEvent) {
188 ::SetEvent(mEvent);
192 #else // WIN
194 // Stub code only (we don't implement IO cancelation for this platform)
196 BlockingIOWatcher::BlockingIOWatcher() = default;
197 BlockingIOWatcher::~BlockingIOWatcher() = default;
198 void BlockingIOWatcher::InitThread() {}
199 void BlockingIOWatcher::WatchAndCancel(Monitor&) {}
200 void BlockingIOWatcher::NotifyOperationDone() {}
202 #endif
204 } // namespace detail
206 CacheIOThread* CacheIOThread::sSelf = nullptr;
208 NS_IMPL_ISUPPORTS(CacheIOThread, nsIThreadObserver)
210 CacheIOThread::CacheIOThread()
211 : mMonitor("CacheIOThread"),
212 mThread(nullptr),
213 mXPCOMThread(nullptr),
214 mLowestLevelWaiting(LAST_LEVEL),
215 mCurrentlyExecutingLevel(0),
216 mHasXPCOMEvents(false),
217 mRerunCurrentEvent(false),
218 mShutdown(false),
219 mIOCancelableEvents(0),
220 mEventCounter(0)
221 #ifdef DEBUG
223 mInsideLoop(true)
224 #endif
226 for (auto& item : mQueueLength) {
227 item = 0;
230 sSelf = this;
233 CacheIOThread::~CacheIOThread() {
234 if (mXPCOMThread) {
235 nsIThread* thread = mXPCOMThread;
236 thread->Release();
239 sSelf = nullptr;
240 #ifdef DEBUG
241 for (auto& event : mEventQueue) {
242 MOZ_ASSERT(!event.Length());
244 #endif
247 nsresult CacheIOThread::Init() {
249 MonitorAutoLock lock(mMonitor);
250 // Yeah, there is not a thread yet, but we want to make sure
251 // the sequencing is correct.
252 mBlockingIOWatcher = MakeUnique<detail::BlockingIOWatcher>();
255 mThread =
256 PR_CreateThread(PR_USER_THREAD, ThreadFunc, this, PR_PRIORITY_NORMAL,
257 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, 128 * 1024);
258 if (!mThread) {
259 return NS_ERROR_FAILURE;
262 return NS_OK;
265 nsresult CacheIOThread::Dispatch(nsIRunnable* aRunnable, uint32_t aLevel) {
266 return Dispatch(do_AddRef(aRunnable), aLevel);
269 nsresult CacheIOThread::Dispatch(already_AddRefed<nsIRunnable> aRunnable,
270 uint32_t aLevel) {
271 NS_ENSURE_ARG(aLevel < LAST_LEVEL);
273 nsCOMPtr<nsIRunnable> runnable(aRunnable);
275 // Runnable is always expected to be non-null, hard null-check bellow.
276 MOZ_ASSERT(runnable);
278 MonitorAutoLock lock(mMonitor);
280 if (mShutdown && (PR_GetCurrentThread() != mThread))
281 return NS_ERROR_UNEXPECTED;
283 return DispatchInternal(runnable.forget(), aLevel);
286 nsresult CacheIOThread::DispatchAfterPendingOpens(nsIRunnable* aRunnable) {
287 // Runnable is always expected to be non-null, hard null-check bellow.
288 MOZ_ASSERT(aRunnable);
290 MonitorAutoLock lock(mMonitor);
292 if (mShutdown && (PR_GetCurrentThread() != mThread))
293 return NS_ERROR_UNEXPECTED;
295 // Move everything from later executed OPEN level to the OPEN_PRIORITY level
296 // where we post the (eviction) runnable.
297 mQueueLength[OPEN_PRIORITY] += mEventQueue[OPEN].Length();
298 mQueueLength[OPEN] -= mEventQueue[OPEN].Length();
299 mEventQueue[OPEN_PRIORITY].AppendElements(mEventQueue[OPEN]);
300 mEventQueue[OPEN].Clear();
302 return DispatchInternal(do_AddRef(aRunnable), OPEN_PRIORITY);
305 nsresult CacheIOThread::DispatchInternal(
306 already_AddRefed<nsIRunnable> aRunnable, uint32_t aLevel) {
307 nsCOMPtr<nsIRunnable> runnable(aRunnable);
308 #ifdef MOZ_TASK_TRACER
309 if (tasktracer::IsStartLogging()) {
310 runnable = tasktracer::CreateTracedRunnable(runnable.forget());
311 (static_cast<tasktracer::TracedRunnable*>(runnable.get()))->DispatchTask();
313 #endif
315 LogRunnable::LogDispatch(runnable.get());
317 if (NS_WARN_IF(!runnable)) return NS_ERROR_NULL_POINTER;
319 mMonitor.AssertCurrentThreadOwns();
321 ++mQueueLength[aLevel];
322 mEventQueue[aLevel].AppendElement(runnable.forget());
323 if (mLowestLevelWaiting > aLevel) mLowestLevelWaiting = aLevel;
325 mMonitor.NotifyAll();
327 return NS_OK;
330 bool CacheIOThread::IsCurrentThread() {
331 return mThread == PR_GetCurrentThread();
334 uint32_t CacheIOThread::QueueSize(bool highPriority) {
335 MonitorAutoLock lock(mMonitor);
336 if (highPriority) {
337 return mQueueLength[OPEN_PRIORITY] + mQueueLength[READ_PRIORITY];
340 return mQueueLength[OPEN_PRIORITY] + mQueueLength[READ_PRIORITY] +
341 mQueueLength[MANAGEMENT] + mQueueLength[OPEN] + mQueueLength[READ];
344 bool CacheIOThread::YieldInternal() {
345 if (!IsCurrentThread()) {
346 NS_WARNING(
347 "Trying to yield to priority events on non-cache2 I/O thread? "
348 "You probably do something wrong.");
349 return false;
352 if (mCurrentlyExecutingLevel == XPCOM_LEVEL) {
353 // Doesn't make any sense, since this handler is the one
354 // that would be executed as the next one.
355 return false;
358 if (!EventsPending(mCurrentlyExecutingLevel)) return false;
360 mRerunCurrentEvent = true;
361 return true;
364 void CacheIOThread::Shutdown() {
365 if (!mThread) {
366 return;
370 MonitorAutoLock lock(mMonitor);
371 mShutdown = true;
372 mMonitor.NotifyAll();
375 PR_JoinThread(mThread);
376 mThread = nullptr;
379 void CacheIOThread::CancelBlockingIO() {
380 // This is an attempt to cancel any blocking I/O operation taking
381 // too long time.
382 if (!mBlockingIOWatcher) {
383 return;
386 if (!mIOCancelableEvents) {
387 LOG(("CacheIOThread::CancelBlockingIO, no blocking operation to cancel"));
388 return;
391 // OK, when we are here, we are processing an IO on the thread that
392 // can be cancelled.
393 mBlockingIOWatcher->WatchAndCancel(mMonitor);
396 already_AddRefed<nsIEventTarget> CacheIOThread::Target() {
397 nsCOMPtr<nsIEventTarget> target;
399 target = mXPCOMThread;
400 if (!target && mThread) {
401 MonitorAutoLock lock(mMonitor);
402 while (!mXPCOMThread) {
403 lock.Wait();
406 target = mXPCOMThread;
409 return target.forget();
412 // static
413 void CacheIOThread::ThreadFunc(void* aClosure) {
414 // XXXmstange We'd like to register this thread with the profiler, but doing
415 // so causes leaks, see bug 1323100.
416 NS_SetCurrentThreadName("Cache2 I/O");
418 mozilla::IOInterposer::RegisterCurrentThread();
419 CacheIOThread* thread = static_cast<CacheIOThread*>(aClosure);
420 thread->ThreadFunc();
421 mozilla::IOInterposer::UnregisterCurrentThread();
424 void CacheIOThread::ThreadFunc() {
425 nsCOMPtr<nsIThreadInternal> threadInternal;
428 MonitorAutoLock lock(mMonitor);
430 MOZ_ASSERT(mBlockingIOWatcher);
431 mBlockingIOWatcher->InitThread();
433 auto queue =
434 MakeRefPtr<ThreadEventQueue>(MakeUnique<mozilla::EventQueue>());
435 nsCOMPtr<nsIThread> xpcomThread =
436 nsThreadManager::get().CreateCurrentThread(queue,
437 nsThread::NOT_MAIN_THREAD);
439 threadInternal = do_QueryInterface(xpcomThread);
440 if (threadInternal) threadInternal->SetObserver(this);
442 mXPCOMThread = xpcomThread.forget().take();
444 lock.NotifyAll();
446 do {
447 loopStart:
448 // Reset the lowest level now, so that we can detect a new event on
449 // a lower level (i.e. higher priority) has been scheduled while
450 // executing any previously scheduled event.
451 mLowestLevelWaiting = LAST_LEVEL;
453 // Process xpcom events first
454 while (mHasXPCOMEvents) {
455 mHasXPCOMEvents = false;
456 mCurrentlyExecutingLevel = XPCOM_LEVEL;
458 MonitorAutoUnlock unlock(mMonitor);
460 bool processedEvent;
461 nsresult rv;
462 do {
463 nsIThread* thread = mXPCOMThread;
464 rv = thread->ProcessNextEvent(false, &processedEvent);
466 ++mEventCounter;
467 MOZ_ASSERT(mBlockingIOWatcher);
468 mBlockingIOWatcher->NotifyOperationDone();
469 } while (NS_SUCCEEDED(rv) && processedEvent);
472 uint32_t level;
473 for (level = 0; level < LAST_LEVEL; ++level) {
474 if (!mEventQueue[level].Length()) {
475 // no events on this level, go to the next level
476 continue;
479 LoopOneLevel(level);
481 // Go to the first (lowest) level again
482 goto loopStart;
485 if (EventsPending()) {
486 continue;
489 if (mShutdown) {
490 break;
493 AUTO_PROFILER_LABEL("CacheIOThread::ThreadFunc::Wait", IDLE);
494 lock.Wait();
496 } while (true);
498 MOZ_ASSERT(!EventsPending());
500 #ifdef DEBUG
501 // This is for correct assertion on XPCOM events dispatch.
502 mInsideLoop = false;
503 #endif
504 } // lock
506 if (threadInternal) threadInternal->SetObserver(nullptr);
509 void CacheIOThread::LoopOneLevel(uint32_t aLevel) {
510 EventQueue events = std::move(mEventQueue[aLevel]);
511 EventQueue::size_type length = events.Length();
513 mCurrentlyExecutingLevel = aLevel;
515 bool returnEvents = false;
516 bool reportTelemetry = true;
518 EventQueue::size_type index;
520 MonitorAutoUnlock unlock(mMonitor);
522 for (index = 0; index < length; ++index) {
523 if (EventsPending(aLevel)) {
524 // Somebody scheduled a new event on a lower level, break and harry
525 // to execute it! Don't forget to return what we haven't exec.
526 returnEvents = true;
527 break;
530 if (reportTelemetry) {
531 reportTelemetry = false;
532 CacheIOTelemetry::Report(aLevel, length);
535 // Drop any previous flagging, only an event on the current level may set
536 // this flag.
537 mRerunCurrentEvent = false;
539 LogRunnable::Run log(events[index].get());
541 events[index]->Run();
543 MOZ_ASSERT(mBlockingIOWatcher);
544 mBlockingIOWatcher->NotifyOperationDone();
546 if (mRerunCurrentEvent) {
547 // The event handler yields to higher priority events and wants to
548 // rerun.
549 log.WillRunAgain();
550 returnEvents = true;
551 break;
554 ++mEventCounter;
555 --mQueueLength[aLevel];
557 // Release outside the lock.
558 events[index] = nullptr;
562 if (returnEvents) {
563 // This code must prevent any AddRef/Release calls on the stored COMPtrs as
564 // it might be exhaustive and block the monitor's lock for an excessive
565 // amout of time.
567 // 'index' points at the event that was interrupted and asked for re-run,
568 // all events before have run, been nullified, and can be removed.
569 events.RemoveElementsAt(0, index);
570 // Move events that might have been scheduled on this queue to the tail to
571 // preserve the expected per-queue FIFO order.
572 // XXX(Bug 1631371) Check if this should use a fallible operation as it
573 // pretended earlier.
574 events.AppendElements(std::move(mEventQueue[aLevel]));
575 // And finally move everything back to the main queue.
576 mEventQueue[aLevel] = std::move(events);
580 bool CacheIOThread::EventsPending(uint32_t aLastLevel) {
581 return mLowestLevelWaiting < aLastLevel || mHasXPCOMEvents;
584 NS_IMETHODIMP CacheIOThread::OnDispatchedEvent() {
585 MonitorAutoLock lock(mMonitor);
586 mHasXPCOMEvents = true;
587 MOZ_ASSERT(mInsideLoop);
588 lock.Notify();
589 return NS_OK;
592 NS_IMETHODIMP CacheIOThread::OnProcessNextEvent(nsIThreadInternal* thread,
593 bool mayWait) {
594 return NS_OK;
597 NS_IMETHODIMP CacheIOThread::AfterProcessNextEvent(nsIThreadInternal* thread,
598 bool eventWasProcessed) {
599 return NS_OK;
602 // Memory reporting
604 size_t CacheIOThread::SizeOfExcludingThis(
605 mozilla::MallocSizeOf mallocSizeOf) const {
606 MonitorAutoLock lock(const_cast<CacheIOThread*>(this)->mMonitor);
608 size_t n = 0;
609 for (const auto& event : mEventQueue) {
610 n += event.ShallowSizeOfExcludingThis(mallocSizeOf);
611 // Events referenced by the queues are arbitrary objects we cannot be sure
612 // are reported elsewhere as well as probably not implementing nsISizeOf
613 // interface. Deliberatly omitting them from reporting here.
616 return n;
619 size_t CacheIOThread::SizeOfIncludingThis(
620 mozilla::MallocSizeOf mallocSizeOf) const {
621 return mallocSizeOf(this) + SizeOfExcludingThis(mallocSizeOf);
624 CacheIOThread::Cancelable::Cancelable(bool aCancelable)
625 : mCancelable(aCancelable) {
626 // This will only ever be used on the I/O thread,
627 // which is expected to be alive longer than this class.
628 MOZ_ASSERT(CacheIOThread::sSelf);
629 MOZ_ASSERT(CacheIOThread::sSelf->IsCurrentThread());
631 if (mCancelable) {
632 ++CacheIOThread::sSelf->mIOCancelableEvents;
636 CacheIOThread::Cancelable::~Cancelable() {
637 MOZ_ASSERT(CacheIOThread::sSelf);
639 if (mCancelable) {
640 --CacheIOThread::sSelf->mIOCancelableEvents;
644 } // namespace net
645 } // namespace mozilla