1 // Copyright 2001-2018 Crytek GmbH / Crytek Group. All rights reserved.
3 // -------------------------------------------------------------------------
4 // File name: BlockingBackEnd.h
6 // Created: 07/05/2011 by Christopher Bolte
7 // Compilers: Visual Studio.NET
8 // -------------------------------------------------------------------------
10 ////////////////////////////////////////////////////////////////////////////
12 #include "BlockingBackEnd.h"
13 #include "../JobManager.h"
14 #include "../../System.h"
15 #include "../../CPUDetect.h"
17 // used to distinguish regular and blocking worker threads
18 #define BLOCKING_WORKER_ID_FLAG 0x40000000
20 ///////////////////////////////////////////////////////////////////////////////
// Constructs the blocking backend.
// pRegularWorkerFallbacks and nRegularWorkerThreads are stored unchanged; Init() later passes them
// on to every blocking worker thread it creates. The semaphore is constructed with
// SJobQueue_BlockingBackEnd::eMaxWorkQueueJobsSize and the worker array starts out as NULL.
21 JobManager::BlockingBackEnd::CBlockingBackEnd::CBlockingBackEnd(JobManager::SInfoBlock
** pRegularWorkerFallbacks
, uint32 nRegularWorkerThreads
) :
22 m_Semaphore(SJobQueue_BlockingBackEnd::eMaxWorkQueueJobsSize
),
23 m_pRegularWorkerFallbacks(pRegularWorkerFallbacks
),
24 m_nRegularWorkerThreads(nRegularWorkerThreads
),
25 m_pWorkerThreads(NULL
),
// The worker profiler member only exists in Statoscope-enabled builds; it is allocated in Init().
30 #if defined(JOBMANAGER_SUPPORT_STATOSCOPE)
31 m_pBackEndWorkerProfiler
= 0;
35 ///////////////////////////////////////////////////////////////////////////////
36 JobManager::BlockingBackEnd::CBlockingBackEnd::~CBlockingBackEnd()
40 ///////////////////////////////////////////////////////////////////////////////
// Creates and starts nSysMaxWorker blocking worker threads.
// Every worker receives this backend, the shared semaphore, the job queue and the regular-worker
// fallback lists, and is spawned through gEnv->pThreadManager; a failed spawn raises CryFatalError.
// The number of workers is remembered in m_nNumWorker for ShutDown() and the profiler.
41 bool JobManager::BlockingBackEnd::CBlockingBackEnd::Init(uint32 nSysMaxWorker
)
// one pointer slot per worker thread
43 m_pWorkerThreads
= new CBlockingBackEndWorkerThread
*[nSysMaxWorker
];
45 // create one worker thread per requested worker; the loop index doubles as the worker id
46 for (uint32 i
= 0; i
< nSysMaxWorker
; ++i
)
48 m_pWorkerThreads
[i
] = new CBlockingBackEndWorkerThread(this, m_Semaphore
, m_JobQueue
, m_pRegularWorkerFallbacks
, m_nRegularWorkerThreads
, i
);
// the thread name is built from the format string and the worker index
50 if (!gEnv
->pThreadManager
->SpawnThread(m_pWorkerThreads
[i
], "JobSystem_Worker_%u (Blocking)", i
))
52 CryFatalError("Error spawning \"JobSystem_Worker_%u (Blocking)\" thread.", i
);
56 m_nNumWorker
= nSysMaxWorker
;
// Statoscope builds: allocate the worker profiler and size it to the worker count
58 #if defined(JOBMANAGER_SUPPORT_STATOSCOPE)
59 m_pBackEndWorkerProfiler
= new JobManager::CWorkerBackEndProfiler
;
60 m_pBackEndWorkerProfiler
->Init(m_nNumWorker
);
66 ///////////////////////////////////////////////////////////////////////////////
// Stops the blocking worker threads and releases the backend's resources.
// First pass: every existing worker is told to stop and the semaphore is released once per worker
// so that a worker blocked in Acquire() wakes up. Second pass: each worker is joined and, when the
// join succeeds, deleted and its slot cleared.
67 bool JobManager::BlockingBackEnd::CBlockingBackEnd::ShutDown()
69 for (uint32 i
= 0; i
< m_nNumWorker
; ++i
)
71 if (m_pWorkerThreads
[i
] == NULL
)
74 m_pWorkerThreads
[i
]->SignalStopWork();
// one release per worker: each stopping worker needs its own wake-up
75 m_Semaphore
.Release();
78 for (uint32 i
= 0; i
< m_nNumWorker
; ++i
)
80 if (m_pWorkerThreads
[i
] == NULL
)
// NOTE(review): the join is requested with eJM_TryJoin; a worker whose join does not succeed is
// neither deleted nor reachable once the array pointer is reset below - confirm this is intended.
83 if (gEnv
->pThreadManager
->JoinThread(m_pWorkerThreads
[i
], eJM_TryJoin
))
85 delete m_pWorkerThreads
[i
];
86 m_pWorkerThreads
[i
] = NULL
;
// NOTE(review): only the pointer is reset here; confirm the array allocated with new[] in Init()
// is released with delete[] somewhere, otherwise it leaks.
90 m_pWorkerThreads
= NULL
;
93 #if defined(JOBMANAGER_SUPPORT_STATOSCOPE)
94 SAFE_DELETE(m_pBackEndWorkerProfiler
);
100 ///////////////////////////////////////////////////////////////////////////////
// Enqueues a job for the blocking worker threads.
// crJob      - delegator providing the priority level and the parameter data of the job
// cJobHandle - handle whose jobId is stored in the queued info block
// rInfoBlock - caller-side info block; its members are copied into the queue slot
// A slot is obtained from m_JobQueue for the job's priority, filled, marked ready, and finally the
// semaphore is released once so that a worker wakes up.
101 void JobManager::BlockingBackEnd::CBlockingBackEnd::AddJob(JobManager::CJobDelegator
& crJob
, const JobManager::TJobHandle cJobHandle
, JobManager::SInfoBlock
& rInfoBlock
)
103 uint32 nJobPriority
= crJob
.GetPriorityLevel();
105 /////////////////////////////////////////////////////////////////////////////
106 // Acquire Infoblock to use
109 m_JobQueue
.GetJobSlot(jobSlot
, nJobPriority
);
// bookkeeping of started jobs, compiled out of release builds
111 #if !defined(_RELEASE)
112 CJobManager::Instance()->IncreaseRunJobs();
114 // copy info block into job queue
115 JobManager::SInfoBlock
& RESTRICT_REFERENCE rJobInfoBlock
= m_JobQueue
.jobInfoBlocks
[nJobPriority
][jobSlot
];
117 // since we will use the whole InfoBlock, and it is aligned to 128 bytes, clear the cacheline, this is faster than a cachemiss on write
118 //STATIC_CHECK( sizeof(JobManager::SInfoBlock) == 512, ERROR_SIZE_OF_SINFOBLOCK_NOT_EQUALS_512 );
120 // first cache line needs to be persistent, so only offsets 128, 256 and 384 are reset
121 ResetLine128(&rJobInfoBlock
, 128);
122 ResetLine128(&rJobInfoBlock
, 256);
123 ResetLine128(&rJobInfoBlock
, 384);
125 /////////////////////////////////////////////////////////////////////////////
126 // Initialize the InfoBlock
127 rInfoBlock
.AssignMembersTo(&rJobInfoBlock
);
// copy the job's parameter block into the slot's parameter storage
129 JobManager::CJobManager::CopyJobParameter(crJob
.GetParamDataSize(), rJobInfoBlock
.GetParamAddress(), crJob
.GetJobParamData());
131 assert(rInfoBlock
.jobInvoker
);
// the job id is narrowed to unsigned char when stored in the slot
133 const uint32 cJobId
= cJobHandle
->jobId
;
134 rJobInfoBlock
.jobId
= (unsigned char)cJobId
;
// Statoscope builds: register the job by name and remember the profiler's current profile index
136 #if defined(JOBMANAGER_SUPPORT_STATOSCOPE)
137 assert(cJobId
< JobManager::detail::eJOB_FRAME_STATS_MAX_SUPP_JOBS
);
138 m_pBackEndWorkerProfiler
->RegisterJob(cJobId
, CJobManager::Instance()->GetJobName(rInfoBlock
.jobInvoker
));
139 rJobInfoBlock
.frameProfIndex
= (unsigned char)m_pBackEndWorkerProfiler
->GetProfileIndex();
142 /////////////////////////////////////////////////////////////////////////////
143 // initialization finished, make all visible for worker threads
144 // the producing thread won't need the info block anymore, flush it from the cache
145 FlushLine128(&rJobInfoBlock
, 0);
146 FlushLine128(&rJobInfoBlock
, 128);
147 FlushLine128(&rJobInfoBlock
, 256);
148 FlushLine128(&rJobInfoBlock
, 384);
150 //CryLogAlways("Add Job to Slot 0x%x, priority 0x%x", jobSlot, nJobPriority );
// publish the slot: workers poll IsReady() on this state before reading the info block
152 m_JobQueue
.jobInfoBlockStates
[nJobPriority
][jobSlot
].SetReady();
154 // Release semaphore count to signal the workers that work is available
155 m_Semaphore
.Release();
158 ///////////////////////////////////////////////////////////////////////////////
159 void JobManager::BlockingBackEnd::CBlockingBackEndWorkerThread::SignalStopWork()
164 bool JobManager::BlockingBackEnd::IsBlockingWorkerId(uint32 workerId
)
166 return (workerId
& BLOCKING_WORKER_ID_FLAG
) != 0;
169 uint32
JobManager::BlockingBackEnd::GetIndexFromWorkerId(uint32 workerId
)
171 CRY_ASSERT(IsBlockingWorkerId(workerId
));
172 return workerId
& !BLOCKING_WORKER_ID_FLAG
;
175 //////////////////////////////////////////////////////////////////////////
// Main function of a blocking worker thread.
// Tags the thread with its id combined with BLOCKING_WORKER_ID_FLAG, then loops while m_bStop is
// false: wait on the semaphore, take a job (first from the regular workers' fallback lists,
// otherwise from the blocking job queue), copy it into a local SInfoBlock, invoke it and mark its
// job state (if any) as stopped.
176 void JobManager::BlockingBackEnd::CBlockingBackEndWorkerThread::ThreadEntry()
179 JobManager::detail::SetWorkerThreadId(m_nId
| BLOCKING_WORKER_ID_FLAG
);
// local copy of the job description; the job is invoked from this copy, not from the shared slot
182 SInfoBlock infoBlock
;
183 ///////////////////////////////////////////////////////////////////////////
186 //CRY_PROFILE_REGION_WAITING(PROFILE_SYSTEM, "Wait - JobWorkerThread");
// block until AddJob() or ShutDown() releases the semaphore
187 m_rSemaphore
.Acquire();
// stop request check after waking up
190 IF (m_bStop
== true, 0)
193 bool bFoundBlockingFallbackJob
= false;
195 // handle fallbacks added by other worker threads
196 for (uint32 i
= 0; i
< m_nRegularWorkerThreads
; ++i
)
198 if (m_pRegularWorkerFallbacks
[i
])
200 JobManager::SInfoBlock
* pRegularWorkerFallback
= NULL
;
// read the current list head through a volatile pointer
203 pRegularWorkerFallback
= const_cast<JobManager::SInfoBlock
*>(*(const_cast<volatile JobManager::SInfoBlock
**>(&m_pRegularWorkerFallbacks
[i
])));
// pop the head: exchange it for its pNext, retried until the compare-exchange succeeds
205 while (CryInterlockedCompareExchangePointer(alias_cast
<void* volatile*>(&m_pRegularWorkerFallbacks
[i
]), pRegularWorkerFallback
->pNext
, alias_cast
<void*>(pRegularWorkerFallback
)) != pRegularWorkerFallback
);
207 // in case of a fallback job, just get it from the global per thread list
208 pRegularWorkerFallback
->AssignMembersTo(&infoBlock
);
209 JobManager::CJobManager::CopyJobParameter(infoBlock
.paramSize
<< 4, infoBlock
.GetParamAddress(), pRegularWorkerFallback
->GetParamAddress());
211 // free temp info block again
212 delete pRegularWorkerFallback
;
214 bFoundBlockingFallbackJob
= true;
219 // in case we didn't find a fallback, try the regular queue
220 if (bFoundBlockingFallbackJob
== false)
222 ///////////////////////////////////////////////////////////////////////////
223 // multiple steps to get a job off the queue
224 // 1. get our job slot index
225 uint64 currentPushIndex
= ~0;
226 uint64 currentPullIndex
= ~0;
227 uint64 newPullIndex
= ~0;
228 uint32 nPriorityLevel
= ~0;
231 #if CRY_PLATFORM_WINDOWS || CRY_PLATFORM_APPLE || CRY_PLATFORM_LINUX || CRY_PLATFORM_ANDROID // emulate a 64bit atomic read on PC platform
// compare-exchange of 0 against 0 is used purely to obtain the current value atomically
232 currentPullIndex
= CryInterlockedCompareExchange64(alias_cast
<volatile int64
*>(&m_rJobQueue
.pull
.index
), 0, 0);
233 currentPushIndex
= CryInterlockedCompareExchange64(alias_cast
<volatile int64
*>(&m_rJobQueue
.push
.index
), 0, 0);
// remaining platforms: read the indices through a volatile pointer
235 currentPullIndex
= *const_cast<volatile uint64
*>(&m_rJobQueue
.pull
.index
);
236 currentPushIndex
= *const_cast<volatile uint64
*>(&m_rJobQueue
.push
.index
);
238 // spin if the updated push ptr didn't reach us yet
239 if (currentPushIndex
== currentPullIndex
)
242 // compute priority level from difference between push/pull
243 if (!JobManager::SJobQueuePos::IncreasePullIndex(currentPullIndex
, currentPushIndex
, newPullIndex
, nPriorityLevel
,
244 m_rJobQueue
.GetMaxWorkerQueueJobs(eHighPriority
), m_rJobQueue
.GetMaxWorkerQueueJobs(eRegularPriority
), m_rJobQueue
.GetMaxWorkerQueueJobs(eLowPriority
), m_rJobQueue
.GetMaxWorkerQueueJobs(eStreamPriority
)))
247 // stop spinning once we successfully claimed the index (pull index still held the value we read)
248 if (CryInterlockedCompareExchange64(alias_cast
<volatile int64
*>(&m_rJobQueue
.pull
.index
), newPullIndex
, currentPullIndex
) == currentPullIndex
)
254 // compute our jobslot index from the only increasing publish index
255 uint32 nExtractedCurIndex
= static_cast<uint32
>(JobManager::SJobQueuePos::ExtractIndex(currentPullIndex
, nPriorityLevel
));
256 uint32 nNumWorkerQUeueJobs
= m_rJobQueue
.GetMaxWorkerQueueJobs(nPriorityLevel
);
// masking with (size - 1) wraps the index; this assumes the queue size is a power of two - TODO confirm
257 uint32 nJobSlot
= nExtractedCurIndex
& (nNumWorkerQUeueJobs
- 1);
259 //CryLogAlways("Got Job From Slot 0x%x nPriorityLevel 0x%x", nJobSlot, nPriorityLevel );
260 // 2. Wait until the producer has finished writing all data to the SInfoBlock
261 JobManager::detail::SJobQueueSlotState
* pJobInfoBlockState
= &m_rJobQueue
.jobInfoBlockStates
[nPriorityLevel
][nJobSlot
];
263 while (!pJobInfoBlockState
->IsReady())
// CrySleep(0) for the first iterations, CrySleep(1) once the counter has passed 10
265 CrySleep(iter
++ > 10 ? 1 : 0);
269 // 3. Get a local copy of the info block as soon as it is ready to be used
270 JobManager::SInfoBlock
* pCurrentJobSlot
= &m_rJobQueue
.jobInfoBlocks
[nPriorityLevel
][nJobSlot
];
271 pCurrentJobSlot
->AssignMembersTo(&infoBlock
);
272 JobManager::CJobManager::CopyJobParameter(infoBlock
.paramSize
<< 4, infoBlock
.GetParamAddress(), pCurrentJobSlot
->GetParamAddress());
274 // 4. Remark the job state as suspended
276 pJobInfoBlockState
->SetNotReady();
278 // 5. Mark the jobslot as free again
280 pCurrentJobSlot
->Release((1 << JobManager::SJobQueuePos::eBitsPerPriorityLevel
) / m_rJobQueue
.GetMaxWorkerQueueJobs(nPriorityLevel
));
283 ///////////////////////////////////////////////////////////////////////////
284 // now we have a valid SInfoBlock to start work on it
285 // Now we are safe to use the info block
286 assert(infoBlock
.jobInvoker
);
287 assert(infoBlock
.GetParamAddress());
// profiling builds: record start time and executing worker for this job
289 #if defined(JOBMANAGER_SUPPORT_PROFILING)
290 SJobProfilingData
* pJobProfilingData
= gEnv
->GetJobManager()->GetProfilingData(infoBlock
.profilerIndex
);
291 pJobProfilingData
->startTime
= gEnv
->pTimer
->GetAsyncTime();
292 pJobProfilingData
->isWaiting
= false;
293 pJobProfilingData
->nWorkerThread
= GetWorkerThreadId();
296 #if defined(JOBMANAGER_SUPPORT_STATOSCOPE)
297 const uint64 nStartTime
= JobManager::IWorkerBackEndProfiler::GetTimeSample();
300 // call delegator function to invoke job entry
301 #if !defined(_RELEASE) || defined(PERFORMANCE_BUILD)
302 CRY_PROFILE_REGION(PROFILE_SYSTEM
, "Job");
304 (*infoBlock
.jobInvoker
)(infoBlock
.GetParamAddress());
// Statoscope builds: report the job's duration (end sample minus start sample) to the backend profiler
306 #if defined(JOBMANAGER_SUPPORT_STATOSCOPE)
307 JobManager::IWorkerBackEndProfiler
* workerProfiler
= m_pBlockingBackend
->GetBackEndWorkerProfiler();
308 const uint64 nEndTime
= JobManager::IWorkerBackEndProfiler::GetTimeSample();
309 workerProfiler
->RecordJob(infoBlock
.frameProfIndex
, m_nId
, static_cast<const uint32
>(infoBlock
.jobId
), static_cast<const uint32
>(nEndTime
- nStartTime
));
// mark the job's state object (if it has one) as stopped
312 IF (infoBlock
.GetJobState(), 1)
314 SJobState
* pJobState
= infoBlock
.GetJobState();
315 pJobState
->SetStopped();
318 #if defined(JOBMANAGER_SUPPORT_PROFILING)
319 pJobProfilingData
->endTime
= gEnv
->pTimer
->GetAsyncTime();
// keep processing jobs until a stop has been signalled
322 while (m_bStop
== false);
324 ///////////////////////////////////////////////////////////////////////////////
325 ILINE
void IncrQueuePullPointer_Blocking(INT_PTR
& rCurPullAddr
, const INT_PTR cIncr
, const INT_PTR cQueueStart
, const INT_PTR cQueueEnd
)
327 const INT_PTR cNextPull
= rCurPullAddr
+ cIncr
;
328 rCurPullAddr
= (cNextPull
>= cQueueEnd
) ? cQueueStart
: cNextPull
;
331 ///////////////////////////////////////////////////////////////////////////////
// Constructs a blocking worker thread object.
// pBlockingBackend        - owning backend (used in ThreadEntry() to reach the worker profiler)
// rSemaphore              - semaphore the worker waits on for work
// rJobQueue               - blocking job queue the worker pulls jobs from
// pRegularWorkerFallbacks - per-regular-worker fallback lists that ThreadEntry() drains first
// nRegularWorkerThreads   - number of entries in pRegularWorkerFallbacks that are scanned
// nID                     - worker index passed by the backend's Init() loop
332 JobManager::BlockingBackEnd::CBlockingBackEndWorkerThread::CBlockingBackEndWorkerThread(CBlockingBackEnd
* pBlockingBackend
, CryFastSemaphore
& rSemaphore
, JobManager::SJobQueue_BlockingBackEnd
& rJobQueue
, JobManager::SInfoBlock
** pRegularWorkerFallbacks
, uint32 nRegularWorkerThreads
, uint32 nID
) :
333 m_rSemaphore(rSemaphore
),
334 m_rJobQueue(rJobQueue
),
336 m_pBlockingBackend(pBlockingBackend
),
338 m_pRegularWorkerFallbacks(pRegularWorkerFallbacks
),
339 m_nRegularWorkerThreads(nRegularWorkerThreads
)
343 ///////////////////////////////////////////////////////////////////////////////
344 JobManager::BlockingBackEnd::CBlockingBackEndWorkerThread::~CBlockingBackEndWorkerThread()
349 ///////////////////////////////////////////////////////////////////////////////
// Pushes pInfoBlock onto the front of the fallback list that belongs to regular worker
// nWorkerThreadID. The list is a singly linked list chained through SInfoBlock::pNext and is
// updated with a compare-exchange so no lock is taken; blocking workers pop from it in ThreadEntry().
// NOTE(review): nWorkerThreadID is used as an array index without a range check - presumably the
// caller guarantees it is below the number of regular workers; confirm.
350 void JobManager::BlockingBackEnd::CBlockingBackEnd::AddBlockingFallbackJob(JobManager::SInfoBlock
* pInfoBlock
, uint32 nWorkerThreadID
)
352 volatile JobManager::SInfoBlock
* pCurrentWorkerFallback
= NULL
;
// read the current head through a volatile pointer and link the new block in front of it
355 pCurrentWorkerFallback
= *(const_cast<volatile JobManager::SInfoBlock
**>(&m_pRegularWorkerFallbacks
[nWorkerThreadID
]));
356 pInfoBlock
->pNext
= const_cast<JobManager::SInfoBlock
*>(pCurrentWorkerFallback
);
// install the new block as head only if the head is still the one observed above; otherwise retry
358 while (CryInterlockedCompareExchangePointer(alias_cast
<void* volatile*>(&m_pRegularWorkerFallbacks
[nWorkerThreadID
]), pInfoBlock
, alias_cast
<void*>(pCurrentWorkerFallback
)) != pCurrentWorkerFallback
);