Bug #1437: CkLoop worker traces to previous entry on pe rather than
[charm.git] / src / libs / ck-libs / ckloop / CkLoop.h
blobcd7a25c7276c9fac3a3f8d11c0c4d9646a3deb05
1 #ifndef _CKLOOP_H
2 #define _CKLOOP_H
3 #include <assert.h>
5 #include "charm++.h"
6 #include "CkLoopAPI.h"
8 #define USE_TREE_BROADCAST_THRESHOLD 8
9 #define TREE_BCAST_BRANCH (4)
10 #define CACHE_LINE_SIZE 64
/* Notification message options:
 * 1. Converse-level message: the message always has the highest priority,
 *    and the notification message comes from the singlehelper on which the
 *    loop parallelization is initiated.
 *
 * 2. Charm-level message: the message can be given different priorities.
 *    However, the notification message comes from the singlehelper on which
 *    the parallelized loop is executed.
 */
20 #define USE_CONVERSE_NOTIFICATION 1
22 CmiNodeLock loop_info_inited_lock;
24 #if CMK_TRACE_ENABLED
25 CpvDeclare(envelope*, dummyEnv);
26 #endif
28 class FuncSingleHelper;
30 class CurLoopInfo {
31 friend class FuncSingleHelper;
33 private:
34 int curChunkIdx; // Should become std::atomic<int>
35 int numChunks;
36 HelperFn fnPtr;
37 int lowerIndex;
38 int upperIndex;
39 int paramNum;
40 void *param;
41 //limitation: only allow single variable reduction of size numChunks!!!
42 void **redBufs;
43 char *bufSpace;
45 volatile int finishFlag;
47 //a tag to indicate whether the task for this new loop has been inited
48 //this tag is needed to prevent other helpers to run the old task
49 int inited;
51 public:
52 CurLoopInfo(int maxChunks):numChunks(0),fnPtr(NULL), lowerIndex(-1), upperIndex(0),
53 paramNum(0), param(NULL), curChunkIdx(-1), finishFlag(0), redBufs(NULL), bufSpace(NULL), inited(0) {
54 redBufs = new void *[maxChunks];
55 bufSpace = new char[maxChunks * CACHE_LINE_SIZE];
56 for (int i=0; i<maxChunks; i++) redBufs[i] = (void *)(bufSpace+i*CACHE_LINE_SIZE);
59 ~CurLoopInfo() {
60 delete [] redBufs;
61 delete [] bufSpace;
64 void set(int nc, HelperFn f, int lIdx, int uIdx, int numParams, void *p) { /*
65 * The locking is to handle a rare data-racing case here. The current loop is
66 * about to finish (just before setting inited to 0; A helper (say B)
67 * just enters the stealWork and passes the inited check. The helper
68 * (say A) is very fast, and starts the next loop, and happens enter
69 * into the middle of this function. Then helper B will face corrupted
70 * task info as it is trying to execute the old loop task!
71 * In reality for user cases, this case happens very rarely!! -Chao Mei
73 CmiLock(loop_info_inited_lock);
74 numChunks = nc;
75 fnPtr = f;
76 lowerIndex = lIdx;
77 upperIndex = uIdx;
78 paramNum = numParams;
79 param = p;
80 curChunkIdx = -1;
81 finishFlag = 0;
82 //needs to be set last
83 inited = 1;
84 CmiUnlock(loop_info_inited_lock);
87 void waitLoopDone(int sync) {
88 //while(!__sync_bool_compare_and_swap(&finishFlag, numChunks, 0));
89 if (sync) while (finishFlag!=numChunks);
90 //finishFlag = 0;
91 CmiLock(loop_info_inited_lock);
92 inited = 0;
93 CmiUnlock(loop_info_inited_lock);
95 int getNextChunkIdx() {
96 int next_chunk_id;
97 CmiMemoryAtomicFetchAndInc(curChunkIdx, next_chunk_id);
98 return next_chunk_id+1;
100 void reportFinished(int counter) {
101 if (counter==0) return;
102 #if defined(_WIN32)
103 #if CMK_SMP
104 CmiLock(cmiMemoryLock);
105 finishFlag=finishFlag+counter;
106 CmiUnlock(cmiMemoryLock);
107 #else
108 finishFlag=finishFlag+counter;
109 #endif
110 #else
111 __sync_add_and_fetch(&finishFlag, counter);
112 #endif
115 int isFree() {
116 return finishFlag == numChunks;
119 void **getRedBufs() {
120 return redBufs;
123 void stealWork();
126 /* FuncCkLoop is a nodegroup object */
128 typedef struct converseNotifyMsg {
129 char core[CmiMsgHeaderSizeBytes];
130 int srcRank;
131 unsigned int eventID;
132 void *ptr;
133 } ConverseNotifyMsg;
135 class CharmNotifyMsg: public CMessage_CharmNotifyMsg {
136 public:
137 int srcRank;
138 void *ptr; //the loop info
141 class HelperNotifyMsg: public CMessage_HelperNotifyMsg {
142 public:
143 int srcRank;
144 FuncSingleHelper *localHelper;
147 class DestroyNotifyMsg: public CMessage_DestroyNotifyMsg {};
149 class FuncCkLoop : public CBase_FuncCkLoop {
150 friend class FuncSingleHelper;
152 public:
153 static int MAX_CHUNKS;
154 private:
155 int mode;
157 int numHelpers; //in pthread mode, the counter includes itself
158 FuncSingleHelper **helperPtr; /* ptrs to the FuncSingleHelpers it manages */
159 int useTreeBcast;
161 public:
162 FuncCkLoop(int mode_, int numThreads_);
164 FuncCkLoop(CkMigrateMessage *m);
166 ~FuncCkLoop() {
167 #if CMK_TRACE_ENABLED
168 int i;
169 for (i = 0; i < CkMyNodeSize(); i++)
170 CmiFree(CpvAccessOther(dummyEnv,i));
171 #endif
172 CmiDestroyLock(loop_info_inited_lock);
173 delete [] helperPtr;
176 // This entry method is used during restart. When the helper chares are
177 // restarted, the FuncCkLoop node group need not be constructed. So the
178 // helper chares send message to the node proxy on their node to register
179 // themselves.
180 void registerHelper(HelperNotifyMsg* msg);
182 void createPThreads();
183 void exit();
184 void init(int mode_, int numThreads_);
186 int getNumHelpers() {
187 return numHelpers;
189 int needTreeBcast() {
190 return useTreeBcast;
193 void parallelizeFunc(HelperFn func, /* the function that finishes a partial work on another thread */
194 int paramNum, void * param, /* the input parameters for the above func */
195 int numChunks, /* number of chunks to be partitioned */
196 int lowerRange, int upperRange, /* the loop-like parallelization happens in [lowerRange, upperRange] */
197 int sync=1, /* whether the flow will continue until all chunks have finished */
198 void *redResult=NULL, REDUCTION_TYPE type=CKLOOP_NONE, /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
199 CallerFn cfunc=NULL, /* the caller PE will call this function before starting to work on the chunks */
200 int cparamNum=0, void* cparam=NULL /* the input parameters to the above function */
202 void destroyHelpers();
203 void reduce(void **redBufs, void *redBuf, REDUCTION_TYPE type, int numChunks);
204 void pup(PUP::er &p);
207 void SingleHelperStealWork(ConverseNotifyMsg *msg);
/* FuncSingleHelper is a chare located on every core of a node */
//allows arbitrary combinations of sync and unsync parallelized loops
211 #define TASK_BUFFER_SIZE (3)
212 class FuncSingleHelper: public CBase_FuncSingleHelper {
213 friend class FuncCkLoop;
214 private:
215 int totalHelpers;
216 int notifyMsgBufSize;
218 FuncCkLoop *thisCkLoop;
219 CProxy_FuncCkLoop funcckproxy;
220 int useTreeBcast;
222 #if USE_CONVERSE_NOTIFICATION
223 //this msg is shared across all SingleHelpers
224 ConverseNotifyMsg *notifyMsg;
225 #else
226 //acted as a msg buffer for charm-level notification msgs sent to other
227 //SingleHelpers. At each sending,
228 //1. the msg destination chare (SingleHelper) has to be set.
229 //2. the associated loop info has to be set.
230 CharmNotifyMsg **notifyMsg;
231 CurLoopInfo **taskBuffer;
232 int nextFreeTaskBuffer;
233 #endif
234 int nextFreeNotifyMsg;
236 public:
237 FuncSingleHelper();
239 ~FuncSingleHelper() {
240 #if USE_CONVERSE_NOTIFICATION
241 for (int i=0; i<notifyMsgBufSize; i++) {
242 ConverseNotifyMsg *tmp = notifyMsg+i;
243 CurLoopInfo *loop = (CurLoopInfo *)(tmp->ptr);
244 delete loop;
246 free(notifyMsg);
247 #else
248 for (int i=0; i<notifyMsgBufSize; i++) delete notifyMsg[i];
249 free(notifyMsg);
250 for (int i=0; i<TASK_BUFFER_SIZE; i++) delete taskBuffer[i];
251 free(taskBuffer);
252 #endif
254 #if USE_CONVERSE_NOTIFICATION
255 ConverseNotifyMsg *getNotifyMsg() {
256 while (1) {
257 ConverseNotifyMsg *cur = notifyMsg+nextFreeNotifyMsg;
258 CurLoopInfo *loop = (CurLoopInfo *)(cur->ptr);
259 nextFreeNotifyMsg = (nextFreeNotifyMsg+1)%notifyMsgBufSize;
260 if (loop->isFree()) return cur;
262 return NULL;
264 #else
265 CharmNotifyMsg *getNotifyMsg() {
266 while (1) {
267 CharmNotifyMsg *cur = notifyMsg[nextFreeNotifyMsg];
268 CurLoopInfo *loop = (CurLoopInfo *)(cur->ptr);
269 nextFreeNotifyMsg = (nextFreeNotifyMsg+1)%notifyMsgBufSize;
270 if (loop==NULL || loop->isFree()) return cur;
272 return NULL;
274 CurLoopInfo *getNewTask() {
275 while (1) {
276 CurLoopInfo *cur = taskBuffer[nextFreeTaskBuffer];
277 nextFreeTaskBuffer = (nextFreeTaskBuffer+1)%TASK_BUFFER_SIZE;
278 if (cur->isFree()) return cur;
280 return NULL;
282 #endif
284 void stealWork(CharmNotifyMsg *msg);
285 void destroyMyself() {
286 delete this;
289 FuncSingleHelper(CkMigrateMessage *m) : CBase_FuncSingleHelper(m) {}
291 private:
292 void createNotifyMsg();
296 #endif