1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 #include "CacheIOThread.h"
6 #include "CacheFileIOManager.h"
8 #include "CacheObserver.h"
10 #include "nsIRunnable.h"
11 #include "nsISupportsImpl.h"
12 #include "nsPrintfCString.h"
14 #include "nsThreadManager.h"
15 #include "nsThreadUtils.h"
16 #include "mozilla/EventQueue.h"
17 #include "mozilla/IOInterposer.h"
18 #include "mozilla/ProfilerLabels.h"
19 #include "mozilla/ThreadEventQueue.h"
20 #include "mozilla/Telemetry.h"
21 #include "mozilla/TelemetryHistogramEnums.h"
27 namespace mozilla::net
{
31 class CacheIOTelemetry
{
33 using size_type
= CacheIOThread::EventQueue::size_type
;
34 static size_type mMinLengthToReport
[CacheIOThread::LAST_LEVEL
];
35 static void Report(uint32_t aLevel
, size_type aLength
);
38 static CacheIOTelemetry::size_type
const kGranularity
= 30;
40 CacheIOTelemetry::size_type
41 CacheIOTelemetry::mMinLengthToReport
[CacheIOThread::LAST_LEVEL
] = {
42 kGranularity
, kGranularity
, kGranularity
, kGranularity
,
43 kGranularity
, kGranularity
, kGranularity
, kGranularity
};
46 void CacheIOTelemetry::Report(uint32_t aLevel
,
47 CacheIOTelemetry::size_type aLength
) {
48 if (mMinLengthToReport
[aLevel
] > aLength
) {
52 static Telemetry::HistogramID telemetryID
[] = {
53 Telemetry::HTTP_CACHE_IO_QUEUE_2_OPEN_PRIORITY
,
54 Telemetry::HTTP_CACHE_IO_QUEUE_2_READ_PRIORITY
,
55 Telemetry::HTTP_CACHE_IO_QUEUE_2_MANAGEMENT
,
56 Telemetry::HTTP_CACHE_IO_QUEUE_2_OPEN
,
57 Telemetry::HTTP_CACHE_IO_QUEUE_2_READ
,
58 Telemetry::HTTP_CACHE_IO_QUEUE_2_WRITE_PRIORITY
,
59 Telemetry::HTTP_CACHE_IO_QUEUE_2_WRITE
,
60 Telemetry::HTTP_CACHE_IO_QUEUE_2_INDEX
,
61 Telemetry::HTTP_CACHE_IO_QUEUE_2_EVICT
};
63 // Each bucket is a multiply of kGranularity (30, 60, 90..., 300+)
64 aLength
= (aLength
/ kGranularity
);
65 // Next time report only when over the current length + kGranularity
66 mMinLengthToReport
[aLevel
] = (aLength
+ 1) * kGranularity
;
68 // 10 is number of buckets we have in each probe
69 aLength
= std::min
<size_type
>(aLength
, 10);
71 Telemetry::Accumulate(telemetryID
[aLevel
], aLength
- 1); // counted from 0
79 * Helper class encapsulating platform-specific code to cancel
80 * any pending IO operation taking too long. Solely used during
81 * shutdown to prevent any IO shutdown hangs.
82 * Mainly designed for using Win32 CancelSynchronousIo function.
84 class NativeThreadHandle
{
86 // The native handle to the thread
91 // Created and destroyed on the main thread only
93 ~NativeThreadHandle();
95 // Called on the IO thread to grab the platform specific
98 // If there is a blocking operation being handled on the IO
99 // thread, this is called on the main thread during shutdown.
100 void CancelBlockingIO(Monitor
& aMonitor
);
105 NativeThreadHandle::NativeThreadHandle() : mThread(NULL
) {}
107 NativeThreadHandle::~NativeThreadHandle() {
109 CloseHandle(mThread
);
113 void NativeThreadHandle::InitThread() {
114 // GetCurrentThread() only returns a pseudo handle, hence DuplicateHandle
115 ::DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
116 GetCurrentProcess(), &mThread
, 0, FALSE
,
117 DUPLICATE_SAME_ACCESS
);
120 void NativeThreadHandle::CancelBlockingIO(Monitor
& aMonitor
) {
123 MonitorAutoLock
lock(aMonitor
);
131 LOG(("CacheIOThread: Attempting to cancel a long blocking IO operation"));
132 BOOL result
= ::CancelSynchronousIo(thread
);
134 LOG((" cancelation signal succeeded"));
136 DWORD error
= GetLastError();
137 LOG((" cancelation signal failed with GetLastError=%lu", error
));
143 // Stub code only (we don't implement IO cancelation for this platform)
145 NativeThreadHandle::NativeThreadHandle() = default;
146 NativeThreadHandle::~NativeThreadHandle() = default;
147 void NativeThreadHandle::InitThread() {}
148 void NativeThreadHandle::CancelBlockingIO(Monitor
&) {}
152 } // namespace detail
154 CacheIOThread
* CacheIOThread::sSelf
= nullptr;
156 NS_IMPL_ISUPPORTS(CacheIOThread
, nsIThreadObserver
)
158 CacheIOThread::CacheIOThread() {
159 for (auto& item
: mQueueLength
) {
166 CacheIOThread::~CacheIOThread() {
168 MonitorAutoLock
lock(mMonitor
);
169 MOZ_RELEASE_ASSERT(mShutdown
);
173 nsIThread
* thread
= mXPCOMThread
;
179 for (auto& event
: mEventQueue
) {
180 MOZ_ASSERT(!event
.Length());
185 nsresult
CacheIOThread::Init() {
187 MonitorAutoLock
lock(mMonitor
);
188 // Yeah, there is not a thread yet, but we want to make sure
189 // the sequencing is correct.
190 mNativeThreadHandle
= MakeUnique
<detail::NativeThreadHandle
>();
193 // Increase the reference count while spawning a new thread.
194 // If PR_CreateThread succeeds, we will forget this reference and the thread
195 // will be responsible to release it when it completes.
196 RefPtr
<CacheIOThread
> self
= this;
198 PR_CreateThread(PR_USER_THREAD
, ThreadFunc
, this, PR_PRIORITY_NORMAL
,
199 PR_GLOBAL_THREAD
, PR_JOINABLE_THREAD
, 128 * 1024);
201 // Treat this thread as already shutdown.
202 MonitorAutoLock
lock(mMonitor
);
204 return NS_ERROR_FAILURE
;
207 // IMPORTANT: The thread now owns this reference, so it's important that we
208 // leak it here, otherwise we'll end up with a bad refcount.
209 // See the dont_AddRef in ThreadFunc().
210 Unused
<< self
.forget().take();
215 nsresult
CacheIOThread::Dispatch(nsIRunnable
* aRunnable
, uint32_t aLevel
) {
216 return Dispatch(do_AddRef(aRunnable
), aLevel
);
219 nsresult
CacheIOThread::Dispatch(already_AddRefed
<nsIRunnable
> aRunnable
,
221 NS_ENSURE_ARG(aLevel
< LAST_LEVEL
);
223 nsCOMPtr
<nsIRunnable
> runnable(aRunnable
);
225 // Runnable is always expected to be non-null, hard null-check bellow.
226 MOZ_ASSERT(runnable
);
228 MonitorAutoLock
lock(mMonitor
);
230 if (mShutdown
&& (PR_GetCurrentThread() != mThread
)) {
231 return NS_ERROR_UNEXPECTED
;
234 return DispatchInternal(runnable
.forget(), aLevel
);
237 nsresult
CacheIOThread::DispatchAfterPendingOpens(nsIRunnable
* aRunnable
) {
238 // Runnable is always expected to be non-null, hard null-check bellow.
239 MOZ_ASSERT(aRunnable
);
241 MonitorAutoLock
lock(mMonitor
);
243 if (mShutdown
&& (PR_GetCurrentThread() != mThread
)) {
244 return NS_ERROR_UNEXPECTED
;
247 // Move everything from later executed OPEN level to the OPEN_PRIORITY level
248 // where we post the (eviction) runnable.
249 mQueueLength
[OPEN_PRIORITY
] += mEventQueue
[OPEN
].Length();
250 mQueueLength
[OPEN
] -= mEventQueue
[OPEN
].Length();
251 mEventQueue
[OPEN_PRIORITY
].AppendElements(mEventQueue
[OPEN
]);
252 mEventQueue
[OPEN
].Clear();
254 return DispatchInternal(do_AddRef(aRunnable
), OPEN_PRIORITY
);
257 nsresult
CacheIOThread::DispatchInternal(
258 already_AddRefed
<nsIRunnable
> aRunnable
, uint32_t aLevel
) {
259 nsCOMPtr
<nsIRunnable
> runnable(aRunnable
);
261 LogRunnable::LogDispatch(runnable
.get());
263 if (NS_WARN_IF(!runnable
)) return NS_ERROR_NULL_POINTER
;
265 mMonitor
.AssertCurrentThreadOwns();
267 ++mQueueLength
[aLevel
];
268 mEventQueue
[aLevel
].AppendElement(runnable
.forget());
269 if (mLowestLevelWaiting
> aLevel
) mLowestLevelWaiting
= aLevel
;
271 mMonitor
.NotifyAll();
276 bool CacheIOThread::IsCurrentThread() {
277 return mThread
== PR_GetCurrentThread();
280 uint32_t CacheIOThread::QueueSize(bool highPriority
) {
281 MonitorAutoLock
lock(mMonitor
);
283 return mQueueLength
[OPEN_PRIORITY
] + mQueueLength
[READ_PRIORITY
];
286 return mQueueLength
[OPEN_PRIORITY
] + mQueueLength
[READ_PRIORITY
] +
287 mQueueLength
[MANAGEMENT
] + mQueueLength
[OPEN
] + mQueueLength
[READ
];
290 bool CacheIOThread::YieldInternal() {
291 if (!IsCurrentThread()) {
293 "Trying to yield to priority events on non-cache2 I/O thread? "
294 "You probably do something wrong.");
298 if (mCurrentlyExecutingLevel
== XPCOM_LEVEL
) {
299 // Doesn't make any sense, since this handler is the one
300 // that would be executed as the next one.
304 if (!EventsPending(mCurrentlyExecutingLevel
)) return false;
306 mRerunCurrentEvent
= true;
310 void CacheIOThread::Shutdown() {
316 MonitorAutoLock
lock(mMonitor
);
318 mMonitor
.NotifyAll();
321 PR_JoinThread(mThread
);
325 void CacheIOThread::CancelBlockingIO() {
326 // This is an attempt to cancel any blocking I/O operation taking
328 if (!mNativeThreadHandle
) {
332 if (!mIOCancelableEvents
) {
333 LOG(("CacheIOThread::CancelBlockingIO, no blocking operation to cancel"));
337 // OK, when we are here, we are processing an IO on the thread that
339 mNativeThreadHandle
->CancelBlockingIO(mMonitor
);
342 already_AddRefed
<nsIEventTarget
> CacheIOThread::Target() {
343 nsCOMPtr
<nsIEventTarget
> target
;
345 target
= mXPCOMThread
;
346 if (!target
&& mThread
) {
347 MonitorAutoLock
lock(mMonitor
);
348 while (!mXPCOMThread
) {
352 target
= mXPCOMThread
;
355 return target
.forget();
359 void CacheIOThread::ThreadFunc(void* aClosure
) {
360 // XXXmstange We'd like to register this thread with the profiler, but doing
361 // so causes leaks, see bug 1323100.
362 NS_SetCurrentThreadName("Cache2 I/O");
364 mozilla::IOInterposer::RegisterCurrentThread();
365 // We hold on to this reference for the duration of the thread.
366 RefPtr
<CacheIOThread
> thread
=
367 dont_AddRef(static_cast<CacheIOThread
*>(aClosure
));
368 thread
->ThreadFunc();
369 mozilla::IOInterposer::UnregisterCurrentThread();
372 void CacheIOThread::ThreadFunc() {
373 nsCOMPtr
<nsIThreadInternal
> threadInternal
;
376 MonitorAutoLock
lock(mMonitor
);
378 MOZ_ASSERT(mNativeThreadHandle
);
379 mNativeThreadHandle
->InitThread();
382 MakeRefPtr
<ThreadEventQueue
>(MakeUnique
<mozilla::EventQueue
>());
383 nsCOMPtr
<nsIThread
> xpcomThread
=
384 nsThreadManager::get().CreateCurrentThread(queue
);
386 threadInternal
= do_QueryInterface(xpcomThread
);
387 if (threadInternal
) threadInternal
->SetObserver(this);
389 mXPCOMThread
= xpcomThread
.forget().take();
390 nsCOMPtr
<nsIThread
> thread
= NS_GetCurrentThread();
396 // Reset the lowest level now, so that we can detect a new event on
397 // a lower level (i.e. higher priority) has been scheduled while
398 // executing any previously scheduled event.
399 mLowestLevelWaiting
= LAST_LEVEL
;
401 // Process xpcom events first
402 while (mHasXPCOMEvents
) {
403 mHasXPCOMEvents
= false;
404 mCurrentlyExecutingLevel
= XPCOM_LEVEL
;
406 MonitorAutoUnlock
unlock(mMonitor
);
411 rv
= thread
->ProcessNextEvent(false, &processedEvent
);
414 MOZ_ASSERT(mNativeThreadHandle
);
415 } while (NS_SUCCEEDED(rv
) && processedEvent
);
419 for (level
= 0; level
< LAST_LEVEL
; ++level
) {
420 if (!mEventQueue
[level
].Length()) {
421 // no events on this level, go to the next level
427 // Go to the first (lowest) level again
431 if (EventsPending()) {
439 AUTO_PROFILER_LABEL("CacheIOThread::ThreadFunc::Wait", IDLE
);
444 MOZ_ASSERT(!EventsPending());
447 // This is for correct assertion on XPCOM events dispatch.
452 if (threadInternal
) threadInternal
->SetObserver(nullptr);
455 void CacheIOThread::LoopOneLevel(uint32_t aLevel
) {
456 mMonitor
.AssertCurrentThreadOwns();
457 EventQueue events
= std::move(mEventQueue
[aLevel
]);
458 EventQueue::size_type length
= events
.Length();
460 mCurrentlyExecutingLevel
= aLevel
;
462 bool returnEvents
= false;
463 bool reportTelemetry
= true;
465 EventQueue::size_type index
;
467 MonitorAutoUnlock
unlock(mMonitor
);
469 for (index
= 0; index
< length
; ++index
) {
470 if (EventsPending(aLevel
)) {
471 // Somebody scheduled a new event on a lower level, break and harry
472 // to execute it! Don't forget to return what we haven't exec.
477 if (reportTelemetry
) {
478 reportTelemetry
= false;
479 CacheIOTelemetry::Report(aLevel
, length
);
482 // Drop any previous flagging, only an event on the current level may set
484 mRerunCurrentEvent
= false;
486 LogRunnable::Run
log(events
[index
].get());
488 events
[index
]->Run();
490 MOZ_ASSERT(mNativeThreadHandle
);
492 if (mRerunCurrentEvent
) {
493 // The event handler yields to higher priority events and wants to
501 --mQueueLength
[aLevel
];
503 // Release outside the lock.
504 events
[index
] = nullptr;
509 // This code must prevent any AddRef/Release calls on the stored COMPtrs as
510 // it might be exhaustive and block the monitor's lock for an excessive
513 // 'index' points at the event that was interrupted and asked for re-run,
514 // all events before have run, been nullified, and can be removed.
515 events
.RemoveElementsAt(0, index
);
516 // Move events that might have been scheduled on this queue to the tail to
517 // preserve the expected per-queue FIFO order.
518 // XXX(Bug 1631371) Check if this should use a fallible operation as it
519 // pretended earlier.
520 events
.AppendElements(std::move(mEventQueue
[aLevel
]));
521 // And finally move everything back to the main queue.
522 mEventQueue
[aLevel
] = std::move(events
);
526 bool CacheIOThread::EventsPending(uint32_t aLastLevel
) {
527 return mLowestLevelWaiting
< aLastLevel
|| mHasXPCOMEvents
;
530 NS_IMETHODIMP
CacheIOThread::OnDispatchedEvent() {
531 MonitorAutoLock
lock(mMonitor
);
532 mHasXPCOMEvents
= true;
533 MOZ_ASSERT(mInsideLoop
);
538 NS_IMETHODIMP
CacheIOThread::OnProcessNextEvent(nsIThreadInternal
* thread
,
543 NS_IMETHODIMP
CacheIOThread::AfterProcessNextEvent(nsIThreadInternal
* thread
,
544 bool eventWasProcessed
) {
550 size_t CacheIOThread::SizeOfExcludingThis(
551 mozilla::MallocSizeOf mallocSizeOf
) const {
552 MonitorAutoLock
lock(const_cast<CacheIOThread
*>(this)->mMonitor
);
555 for (const auto& event
: mEventQueue
) {
556 n
+= event
.ShallowSizeOfExcludingThis(mallocSizeOf
);
557 // Events referenced by the queues are arbitrary objects we cannot be sure
558 // are reported elsewhere as well as probably not implementing nsISizeOf
559 // interface. Deliberatly omitting them from reporting here.
565 size_t CacheIOThread::SizeOfIncludingThis(
566 mozilla::MallocSizeOf mallocSizeOf
) const {
567 return mallocSizeOf(this) + SizeOfExcludingThis(mallocSizeOf
);
570 CacheIOThread::Cancelable::Cancelable(bool aCancelable
)
571 : mCancelable(aCancelable
) {
572 // This will only ever be used on the I/O thread,
573 // which is expected to be alive longer than this class.
574 MOZ_ASSERT(CacheIOThread::sSelf
);
575 MOZ_ASSERT(CacheIOThread::sSelf
->IsCurrentThread());
578 ++CacheIOThread::sSelf
->mIOCancelableEvents
;
582 CacheIOThread::Cancelable::~Cancelable() {
583 MOZ_ASSERT(CacheIOThread::sSelf
);
586 --CacheIOThread::sSelf
->mIOCancelableEvents
;
590 } // namespace mozilla::net