!F (Profiling) (DEV-7030) Rewrite of the profiling system to have a unified interface...
[CRYENGINE.git] / Code / CryEngine / CrySystem / JobManager / BlockingBackend / BlockingBackEnd.cpp
blob80bf84ac712f9afb609a3bb13b387033de60b376
1 // Copyright 2001-2018 Crytek GmbH / Crytek Group. All rights reserved.
3 // -------------------------------------------------------------------------
4 // File name: BlockingBackEnd.cpp
5 // Version: v1.00
6 // Created: 07/05/2011 by Christopher Bolte
7 // Compilers: Visual Studio.NET
8 // -------------------------------------------------------------------------
9 // History:
10 ////////////////////////////////////////////////////////////////////////////
11 #include "StdAfx.h"
12 #include "BlockingBackEnd.h"
13 #include "../JobManager.h"
14 #include "../../System.h"
15 #include "../../CPUDetect.h"
17 // Flag bit used to distinguish regular and blocking worker threads.
// It is OR-ed into the id a blocking worker registers in ThreadEntry() and
// tested by IsBlockingWorkerId().
18 #define BLOCKING_WORKER_ID_FLAG 0x40000000
20 ///////////////////////////////////////////////////////////////////////////////
21 JobManager::BlockingBackEnd::CBlockingBackEnd::CBlockingBackEnd(JobManager::SInfoBlock** pRegularWorkerFallbacks, uint32 nRegularWorkerThreads) :
22 m_Semaphore(SJobQueue_BlockingBackEnd::eMaxWorkQueueJobsSize),
23 m_pRegularWorkerFallbacks(pRegularWorkerFallbacks),
24 m_nRegularWorkerThreads(nRegularWorkerThreads),
25 m_pWorkerThreads(NULL),
26 m_nNumWorker(0)
28 m_JobQueue.Init();
30 #if defined(JOBMANAGER_SUPPORT_STATOSCOPE)
31 m_pBackEndWorkerProfiler = 0;
32 #endif
35 ///////////////////////////////////////////////////////////////////////////////
36 JobManager::BlockingBackEnd::CBlockingBackEnd::~CBlockingBackEnd()
40 ///////////////////////////////////////////////////////////////////////////////
41 bool JobManager::BlockingBackEnd::CBlockingBackEnd::Init(uint32 nSysMaxWorker)
43 m_pWorkerThreads = new CBlockingBackEndWorkerThread*[nSysMaxWorker];
45 // create single worker thread for blocking backend
46 for (uint32 i = 0; i < nSysMaxWorker; ++i)
48 m_pWorkerThreads[i] = new CBlockingBackEndWorkerThread(this, m_Semaphore, m_JobQueue, m_pRegularWorkerFallbacks, m_nRegularWorkerThreads, i);
50 if (!gEnv->pThreadManager->SpawnThread(m_pWorkerThreads[i], "JobSystem_Worker_%u (Blocking)", i))
52 CryFatalError("Error spawning \"JobSystem_Worker_%u (Blocking)\" thread.", i);
56 m_nNumWorker = nSysMaxWorker;
58 #if defined(JOBMANAGER_SUPPORT_STATOSCOPE)
59 m_pBackEndWorkerProfiler = new JobManager::CWorkerBackEndProfiler;
60 m_pBackEndWorkerProfiler->Init(m_nNumWorker);
61 #endif
63 return true;
66 ///////////////////////////////////////////////////////////////////////////////
67 bool JobManager::BlockingBackEnd::CBlockingBackEnd::ShutDown()
69 for (uint32 i = 0; i < m_nNumWorker; ++i)
71 if (m_pWorkerThreads[i] == NULL)
72 continue;
74 m_pWorkerThreads[i]->SignalStopWork();
75 m_Semaphore.Release();
78 for (uint32 i = 0; i < m_nNumWorker; ++i)
80 if (m_pWorkerThreads[i] == NULL)
81 continue;
83 if (gEnv->pThreadManager->JoinThread(m_pWorkerThreads[i], eJM_TryJoin))
85 delete m_pWorkerThreads[i];
86 m_pWorkerThreads[i] = NULL;
90 m_pWorkerThreads = NULL;
91 m_nNumWorker = 0;
93 #if defined(JOBMANAGER_SUPPORT_STATOSCOPE)
94 SAFE_DELETE(m_pBackEndWorkerProfiler);
95 #endif
97 return true;
100 ///////////////////////////////////////////////////////////////////////////////
// Adds a job to the blocking backend's job queue and wakes up one worker.
// crJob:      provides the priority level and the parameter data of the job.
// cJobHandle: provides the job id that is stored in the queued info block.
// rInfoBlock: info block whose members are copied into the acquired queue slot.
// The statement order at the end (flush, memory barrier, SetReady, semaphore release)
// is what publishes the slot to the worker threads; do not reorder it.
101 void JobManager::BlockingBackEnd::CBlockingBackEnd::AddJob(JobManager::CJobDelegator& crJob, const JobManager::TJobHandle cJobHandle, JobManager::SInfoBlock& rInfoBlock)
103 uint32 nJobPriority = crJob.GetPriorityLevel();
105 /////////////////////////////////////////////////////////////////////////////
106 // Acquire Infoblock to use
107 uint32 jobSlot;
// jobSlot is written by GetJobSlot for the queue of the given priority
109 m_JobQueue.GetJobSlot(jobSlot, nJobPriority);
111 #if !defined(_RELEASE)
112 CJobManager::Instance()->IncreaseRunJobs();
113 #endif
114 // copy info block into job queue
115 JobManager::SInfoBlock& RESTRICT_REFERENCE rJobInfoBlock = m_JobQueue.jobInfoBlocks[nJobPriority][jobSlot];
117 // since we will use the whole InfoBlock, and it is aligned to 128 bytes, clear the cacheline, this is faster than a cachemiss on write
118 //STATIC_CHECK( sizeof(JobManager::SInfoBlock) == 512, ERROR_SIZE_OF_SINFOBLOCK_NOT_EQUALS_512 );
120 // first cache line needs to be persistent (only offsets 128, 256 and 384 are reset)
121 ResetLine128(&rJobInfoBlock, 128);
122 ResetLine128(&rJobInfoBlock, 256);
123 ResetLine128(&rJobInfoBlock, 384);
125 /////////////////////////////////////////////////////////////////////////////
126 // Initialize the InfoBlock
127 rInfoBlock.AssignMembersTo(&rJobInfoBlock);
// copy the job parameters into the parameter area of the queued info block
129 JobManager::CJobManager::CopyJobParameter(crJob.GetParamDataSize(), rJobInfoBlock.GetParamAddress(), crJob.GetJobParamData());
131 assert(rInfoBlock.jobInvoker);
133 const uint32 cJobId = cJobHandle->jobId;
// the job id is stored truncated to 8 bits
134 rJobInfoBlock.jobId = (unsigned char)cJobId;
136 #if defined(JOBMANAGER_SUPPORT_STATOSCOPE)
137 assert(cJobId < JobManager::detail::eJOB_FRAME_STATS_MAX_SUPP_JOBS);
138 m_pBackEndWorkerProfiler->RegisterJob(cJobId, CJobManager::Instance()->GetJobName(rInfoBlock.jobInvoker));
139 rJobInfoBlock.frameProfIndex = (unsigned char)m_pBackEndWorkerProfiler->GetProfileIndex();
140 #endif
142 /////////////////////////////////////////////////////////////////////////////
143 // initialization finished, make all visible for worker threads
144 // the producing thread won't need the info block anymore, flush it from the cache
145 FlushLine128(&rJobInfoBlock, 0);
146 FlushLine128(&rJobInfoBlock, 128);
147 FlushLine128(&rJobInfoBlock, 256);
148 FlushLine128(&rJobInfoBlock, 384);
150 //CryLogAlways("Add Job to Slot 0x%x, priority 0x%x", jobSlot, nJobPriority );
// the barrier keeps the info block writes above ahead of the ready flag below
151 MemoryBarrier();
152 m_JobQueue.jobInfoBlockStates[nJobPriority][jobSlot].SetReady();
154 // Release semaphore count to signal the workers that work is available
155 m_Semaphore.Release();
158 ///////////////////////////////////////////////////////////////////////////////
159 void JobManager::BlockingBackEnd::CBlockingBackEndWorkerThread::SignalStopWork()
161 m_bStop = true;
164 bool JobManager::BlockingBackEnd::IsBlockingWorkerId(uint32 workerId)
166 return (workerId & BLOCKING_WORKER_ID_FLAG) != 0;
169 uint32 JobManager::BlockingBackEnd::GetIndexFromWorkerId(uint32 workerId)
171 CRY_ASSERT(IsBlockingWorkerId(workerId));
172 return workerId & !BLOCKING_WORKER_ID_FLAG;
175 //////////////////////////////////////////////////////////////////////////
// Thread main function of a blocking worker.
// Each iteration waits on the shared semaphore, then takes one job - preferably from
// the per-regular-worker fallback lists, otherwise from the blocking job queue -
// copies its info block into a local SInfoBlock, invokes the job and finally marks
// the attached job state (if any) as stopped. The thread leaves once m_bStop is set.
// NOTE(review): brace-only and `do` lines are not present in this listing; the
// trailing `while (...)` statements presumably close do-while loops - confirm against
// the full file.
176 void JobManager::BlockingBackEnd::CBlockingBackEndWorkerThread::ThreadEntry()
178 // set up thread id (worker index tagged with the blocking-worker flag)
179 JobManager::detail::SetWorkerThreadId(m_nId | BLOCKING_WORKER_ID_FLAG);
// local copy of the job description; filled before the job is invoked
182 SInfoBlock infoBlock;
183 ///////////////////////////////////////////////////////////////////////////
184 // wait for new work
186 //CRY_PROFILE_REGION_WAITING(PROFILE_SYSTEM, "Wait - JobWorkerThread");
187 m_rSemaphore.Acquire();
// ShutDown() sets the stop flag and releases the semaphore, so check the flag after waking up
190 IF (m_bStop == true, 0)
191 break;
193 bool bFoundBlockingFallbackJob = false;
195 // handle fallbacks added by other worker threads
196 for (uint32 i = 0; i < m_nRegularWorkerThreads; ++i)
198 if (m_pRegularWorkerFallbacks[i])
200 JobManager::SInfoBlock* pRegularWorkerFallback = NULL;
// pop the list head: re-read the head and try to replace it with its successor
// until the compare-exchange succeeds
// NOTE(review): if several blocking workers pop from the same list, the head could
// become NULL between the check above and the pNext access below - confirm that
// this cannot happen or is handled elsewhere.
203 pRegularWorkerFallback = const_cast<JobManager::SInfoBlock*>(*(const_cast<volatile JobManager::SInfoBlock**>(&m_pRegularWorkerFallbacks[i])));
205 while (CryInterlockedCompareExchangePointer(alias_cast<void* volatile*>(&m_pRegularWorkerFallbacks[i]), pRegularWorkerFallback->pNext, alias_cast<void*>(pRegularWorkerFallback)) != pRegularWorkerFallback);
207 // in case of a fallback job, just get it from the global per thread list
208 pRegularWorkerFallback->AssignMembersTo(&infoBlock);
209 JobManager::CJobManager::CopyJobParameter(infoBlock.paramSize << 4, infoBlock.GetParamAddress(), pRegularWorkerFallback->GetParamAddress());
211 // free temp info block again
212 delete pRegularWorkerFallback;
214 bFoundBlockingFallbackJob = true;
// only one fallback job is taken per semaphore acquisition
215 break;
219 // in case we didn't find a fallback, try the regular queue
220 if (bFoundBlockingFallbackJob == false)
222 ///////////////////////////////////////////////////////////////////////////
223 // multiple steps to get a job off the queue
224 // 1. get our job slot index
225 uint64 currentPushIndex = ~0;
226 uint64 currentPullIndex = ~0;
227 uint64 newPullIndex = ~0;
228 uint32 nPriorityLevel = ~0;
231 #if CRY_PLATFORM_WINDOWS || CRY_PLATFORM_APPLE || CRY_PLATFORM_LINUX || CRY_PLATFORM_ANDROID // emulate a 64bit atomic read on PC platform
// compare-exchange with (0, 0) leaves the value unchanged (0 is only replaced by 0) and returns the current value
232 currentPullIndex = CryInterlockedCompareExchange64(alias_cast<volatile int64*>(&m_rJobQueue.pull.index), 0, 0);
233 currentPushIndex = CryInterlockedCompareExchange64(alias_cast<volatile int64*>(&m_rJobQueue.push.index), 0, 0);
234 #else
235 currentPullIndex = *const_cast<volatile uint64*>(&m_rJobQueue.pull.index);
236 currentPushIndex = *const_cast<volatile uint64*>(&m_rJobQueue.push.index);
237 #endif
238 // spin if the updated push ptr didn't reach us yet
239 if (currentPushIndex == currentPullIndex)
240 continue;
242 // compute priority level from difference between push/pull
243 if (!JobManager::SJobQueuePos::IncreasePullIndex(currentPullIndex, currentPushIndex, newPullIndex, nPriorityLevel,
244 m_rJobQueue.GetMaxWorkerQueueJobs(eHighPriority), m_rJobQueue.GetMaxWorkerQueueJobs(eRegularPriority), m_rJobQueue.GetMaxWorkerQueueJobs(eLowPriority), m_rJobQueue.GetMaxWorkerQueueJobs(eStreamPriority)))
245 continue;
247 // stop spinning when we successfully got the index
// (the exchange only succeeds if no other worker changed the pull index in the meantime)
248 if (CryInterlockedCompareExchange64(alias_cast<volatile int64*>(&m_rJobQueue.pull.index), newPullIndex, currentPullIndex) == currentPullIndex)
249 break;
252 while (true);
254 // compute our jobslot index from the only increasing publish index
255 uint32 nExtractedCurIndex = static_cast<uint32>(JobManager::SJobQueuePos::ExtractIndex(currentPullIndex, nPriorityLevel));
256 uint32 nNumWorkerQUeueJobs = m_rJobQueue.GetMaxWorkerQueueJobs(nPriorityLevel);
// NOTE(review): masking with (size - 1) presumably relies on the queue size being a power of two - confirm
257 uint32 nJobSlot = nExtractedCurIndex & (nNumWorkerQUeueJobs - 1);
259 //CryLogAlways("Got Job From Slot 0x%x nPriorityLevel 0x%x", nJobSlot, nPriorityLevel );
260 // 2. Wait until the producer has finished writing all data to the SInfoBlock
261 JobManager::detail::SJobQueueSlotState* pJobInfoBlockState = &m_rJobQueue.jobInfoBlockStates[nPriorityLevel][nJobSlot];
262 int iter = 0;
263 while (!pJobInfoBlockState->IsReady())
// CrySleep(0) for the first iterations, CrySleep(1) afterwards
265 CrySleep(iter++ > 10 ? 1 : 0);
269 // 3. Get a local copy of the info block as soon as it is ready to be used
270 JobManager::SInfoBlock* pCurrentJobSlot = &m_rJobQueue.jobInfoBlocks[nPriorityLevel][nJobSlot];
271 pCurrentJobSlot->AssignMembersTo(&infoBlock);
272 JobManager::CJobManager::CopyJobParameter(infoBlock.paramSize << 4, infoBlock.GetParamAddress(), pCurrentJobSlot->GetParamAddress());
274 // 4. Remark the job state as suspended
275 MemoryBarrier();
276 pJobInfoBlockState->SetNotReady();
278 // 5. Mark the jobslot as free again
279 MemoryBarrier();
280 pCurrentJobSlot->Release((1 << JobManager::SJobQueuePos::eBitsPerPriorityLevel) / m_rJobQueue.GetMaxWorkerQueueJobs(nPriorityLevel));
283 ///////////////////////////////////////////////////////////////////////////
284 // now we have a valid SInfoBlock to start work on it
285 // Now we are safe to use the info block
286 assert(infoBlock.jobInvoker);
287 assert(infoBlock.GetParamAddress());
289 #if defined(JOBMANAGER_SUPPORT_PROFILING)
290 SJobProfilingData* pJobProfilingData = gEnv->GetJobManager()->GetProfilingData(infoBlock.profilerIndex);
291 pJobProfilingData->startTime = gEnv->pTimer->GetAsyncTime();
292 pJobProfilingData->isWaiting = false;
293 pJobProfilingData->nWorkerThread = GetWorkerThreadId();
294 #endif
296 #if defined(JOBMANAGER_SUPPORT_STATOSCOPE)
297 const uint64 nStartTime = JobManager::IWorkerBackEndProfiler::GetTimeSample();
298 #endif
300 // call delegator function to invoke job entry
301 #if !defined(_RELEASE) || defined(PERFORMANCE_BUILD)
302 CRY_PROFILE_REGION(PROFILE_SYSTEM, "Job");
303 #endif
304 (*infoBlock.jobInvoker)(infoBlock.GetParamAddress());
306 #if defined(JOBMANAGER_SUPPORT_STATOSCOPE)
// record the measured job duration for this worker in the backend profiler
307 JobManager::IWorkerBackEndProfiler* workerProfiler = m_pBlockingBackend->GetBackEndWorkerProfiler();
308 const uint64 nEndTime = JobManager::IWorkerBackEndProfiler::GetTimeSample();
309 workerProfiler->RecordJob(infoBlock.frameProfIndex, m_nId, static_cast<const uint32>(infoBlock.jobId), static_cast<const uint32>(nEndTime - nStartTime));
310 #endif
// mark the job's state object as stopped, if the job has one
312 IF (infoBlock.GetJobState(), 1)
314 SJobState* pJobState = infoBlock.GetJobState();
315 pJobState->SetStopped();
318 #if defined(JOBMANAGER_SUPPORT_PROFILING)
319 pJobProfilingData->endTime = gEnv->pTimer->GetAsyncTime();
320 #endif
// keep processing jobs until SignalStopWork() has been called
322 while (m_bStop == false);
324 ///////////////////////////////////////////////////////////////////////////////
325 ILINE void IncrQueuePullPointer_Blocking(INT_PTR& rCurPullAddr, const INT_PTR cIncr, const INT_PTR cQueueStart, const INT_PTR cQueueEnd)
327 const INT_PTR cNextPull = rCurPullAddr + cIncr;
328 rCurPullAddr = (cNextPull >= cQueueEnd) ? cQueueStart : cNextPull;
331 ///////////////////////////////////////////////////////////////////////////////
332 JobManager::BlockingBackEnd::CBlockingBackEndWorkerThread::CBlockingBackEndWorkerThread(CBlockingBackEnd* pBlockingBackend, CryFastSemaphore& rSemaphore, JobManager::SJobQueue_BlockingBackEnd& rJobQueue, JobManager::SInfoBlock** pRegularWorkerFallbacks, uint32 nRegularWorkerThreads, uint32 nID) :
333 m_rSemaphore(rSemaphore),
334 m_rJobQueue(rJobQueue),
335 m_bStop(false),
336 m_pBlockingBackend(pBlockingBackend),
337 m_nId(nID),
338 m_pRegularWorkerFallbacks(pRegularWorkerFallbacks),
339 m_nRegularWorkerThreads(nRegularWorkerThreads)
343 ///////////////////////////////////////////////////////////////////////////////
344 JobManager::BlockingBackEnd::CBlockingBackEndWorkerThread::~CBlockingBackEndWorkerThread()
349 ///////////////////////////////////////////////////////////////////////////////
// Pushes pInfoBlock onto the front of the fallback job list of the given regular worker.
// pInfoBlock:      info block to enqueue; its pNext member is overwritten.
// nWorkerThreadID: index into m_pRegularWorkerFallbacks selecting the list.
// The list head is replaced with a compare-exchange; blocking workers pop entries
// from these lists in ThreadEntry() and delete them after copying.
// NOTE(review): the `do` and brace lines of the retry loop are not present in this
// listing; the trailing `while (...)` presumably closes a do-while that repeats the
// two statements before it until the exchange succeeds - confirm against the full file.
// NOTE(review): nothing visible here signals the semaphore the blocking workers wait on - confirm the caller does.
350 void JobManager::BlockingBackEnd::CBlockingBackEnd::AddBlockingFallbackJob(JobManager::SInfoBlock* pInfoBlock, uint32 nWorkerThreadID)
352 volatile JobManager::SInfoBlock* pCurrentWorkerFallback = NULL;
// read the current head and link the new block in front of it
355 pCurrentWorkerFallback = *(const_cast<volatile JobManager::SInfoBlock**>(&m_pRegularWorkerFallbacks[nWorkerThreadID]));
356 pInfoBlock->pNext = const_cast<JobManager::SInfoBlock*>(pCurrentWorkerFallback);
// install the new block as head only if the head is still the one read above
358 while (CryInterlockedCompareExchangePointer(alias_cast<void* volatile*>(&m_pRegularWorkerFallbacks[nWorkerThreadID]), pInfoBlock, alias_cast<void*>(pCurrentWorkerFallback)) != pCurrentWorkerFallback);