/* NOTE(review): presumably the helper count at which needTreeBcast() switches
 * to a tree-shaped broadcast; the body of needTreeBcast() is not visible in
 * this excerpt -- confirm. */
8 #define USE_TREE_BROADCAST_THRESHOLD 8
/* NOTE(review): presumably the branching factor of that broadcast tree;
 * no use of this macro is visible in this excerpt -- confirm. */
9 #define TREE_BCAST_BRANCH (4)
/* Byte stride between per-chunk reduction buffers: the CurLoopInfo constructor
 * allocates maxChunks * CACHE_LINE_SIZE bytes and points redBufs[i] at byte
 * offset i * CACHE_LINE_SIZE inside that allocation. */
10 #define CACHE_LINE_SIZE 64
11 /* 1. Using converse-level msg, then the msg is always of highest priority.
12 * And the notification msg comes from the singlehelper where the loop parallelization
15 * 2. Using charm-level msg, then the msg could be set with different priorities.
16 * However, the notification msg comes from the singlehelper where the parallelized
20 #define USE_CONVERSE_NOTIFICATION 1
22 CmiNodeLock loop_info_inited_lock
;
25 CpvDeclare(envelope
*, dummyEnv
);
28 class FuncSingleHelper
;
31 friend class FuncSingleHelper
;
34 int curChunkIdx
; // Should become std::atomic<int>
41 //limitation: only a single-variable reduction of size numChunks is allowed!!!
45 volatile int finishFlag
;
47 //a tag to indicate whether the task for this new loop has been inited
48 //this tag is needed to prevent other helpers from running the old task
52 CurLoopInfo(int maxChunks
):numChunks(0),fnPtr(NULL
), lowerIndex(-1), upperIndex(0),
53 paramNum(0), param(NULL
), curChunkIdx(-1), finishFlag(0), redBufs(NULL
), bufSpace(NULL
), inited(0) {
54 redBufs
= new void *[maxChunks
];
55 bufSpace
= new char[maxChunks
* CACHE_LINE_SIZE
];
56 for (int i
=0; i
<maxChunks
; i
++) redBufs
[i
] = (void *)(bufSpace
+i
*CACHE_LINE_SIZE
);
64 void set(int nc
, HelperFn f
, int lIdx
, int uIdx
, int numParams
, void *p
) { /*
65 * The locking handles a rare data race here. The current loop is
66 * about to finish (just before setting inited to 0). A helper (say B)
67 * has just entered stealWork and passed the inited check. Another helper
68 * (say A) is very fast, starts the next loop, and happens to enter
69 * the middle of this function. Helper B would then see corrupted
70 * task info while it is still trying to execute the old loop task!
71 * In real user cases, this happens very rarely!! -Chao Mei
73 CmiLock(loop_info_inited_lock
);
82 //needs to be set last
84 CmiUnlock(loop_info_inited_lock
);
87 void waitLoopDone(int sync
) {
88 //while(!__sync_bool_compare_and_swap(&finishFlag, numChunks, 0));
89 if (sync
) while (finishFlag
!=numChunks
);
91 CmiLock(loop_info_inited_lock
);
93 CmiUnlock(loop_info_inited_lock
);
95 int getNextChunkIdx() {
97 CmiMemoryAtomicFetchAndInc(curChunkIdx
, next_chunk_id
);
98 return next_chunk_id
+1;
100 void reportFinished(int counter
) {
101 if (counter
==0) return;
104 CmiLock(cmiMemoryLock
);
105 finishFlag
=finishFlag
+counter
;
106 CmiUnlock(cmiMemoryLock
);
108 finishFlag
=finishFlag
+counter
;
111 __sync_add_and_fetch(&finishFlag
, counter
);
116 return finishFlag
== numChunks
;
119 void **getRedBufs() {
126 /* FuncCkLoop is a nodegroup object */
128 typedef struct converseNotifyMsg
{
129 char core
[CmiMsgHeaderSizeBytes
];
131 unsigned int eventID
;
135 class CharmNotifyMsg
: public CMessage_CharmNotifyMsg
{
138 void *ptr
; //the loop info
141 class HelperNotifyMsg
: public CMessage_HelperNotifyMsg
{
144 FuncSingleHelper
*localHelper
;
147 class DestroyNotifyMsg
: public CMessage_DestroyNotifyMsg
{};
149 class FuncCkLoop
: public CBase_FuncCkLoop
{
150 friend class FuncSingleHelper
;
153 static int MAX_CHUNKS
;
157 int numHelpers
; //in pthread mode, the counter includes itself
158 FuncSingleHelper
**helperPtr
; /* ptrs to the FuncSingleHelpers it manages */
162 FuncCkLoop(int mode_
, int numThreads_
);
164 FuncCkLoop(CkMigrateMessage
*m
);
167 #if CMK_TRACE_ENABLED
169 for (i
= 0; i
< CkMyNodeSize(); i
++)
170 CmiFree(CpvAccessOther(dummyEnv
,i
));
172 CmiDestroyLock(loop_info_inited_lock
);
176 // This entry method is used during restart. When the helper chares are
177 // restarted, the FuncCkLoop node group need not be constructed. So the
178 // helper chares send message to the node proxy on their node to register
180 void registerHelper(HelperNotifyMsg
* msg
);
182 void createPThreads();
184 void init(int mode_
, int numThreads_
);
186 int getNumHelpers() {
189 int needTreeBcast() {
193 void parallelizeFunc(HelperFn func
, /* the function that finishes a partial work on another thread */
194 int paramNum
, void * param
, /* the input parameters for the above func */
195 int numChunks
, /* number of chunks to be partitioned */
196 int lowerRange
, int upperRange
, /* the loop-like parallelization happens in [lowerRange, upperRange] */
197 int sync
=1, /* whether the flow will continue until all chunks have finished */
198 void *redResult
=NULL
, REDUCTION_TYPE type
=CKLOOP_NONE
, /* the reduction result, ONLY SUPPORT SINGLE VAR of TYPE int/float/double */
199 CallerFn cfunc
=NULL
, /* the caller PE will call this function before starting to work on the chunks */
200 int cparamNum
=0, void* cparam
=NULL
/* the input parameters to the above function */
202 void destroyHelpers();
203 void reduce(void **redBufs
, void *redBuf
, REDUCTION_TYPE type
, int numChunks
);
204 void pup(PUP::er
&p
);
207 void SingleHelperStealWork(ConverseNotifyMsg
*msg
);
209 /* FuncSingleHelper is a chare located on every core of a node */
210 //allowing arbitrary combination of sync and unsync parallelized loops
/* Number of CurLoopInfo slots in FuncSingleHelper::taskBuffer. getNewTask()
 * advances nextFreeTaskBuffer modulo this value, and the FuncSingleHelper
 * destructor deletes this many taskBuffer entries. */
211 #define TASK_BUFFER_SIZE (3)
212 class FuncSingleHelper
: public CBase_FuncSingleHelper
{
213 friend class FuncCkLoop
;
216 int notifyMsgBufSize
;
218 FuncCkLoop
*thisCkLoop
;
219 CProxy_FuncCkLoop funcckproxy
;
222 #if USE_CONVERSE_NOTIFICATION
223 //this msg is shared across all SingleHelpers
224 ConverseNotifyMsg
*notifyMsg
;
226 //acts as a msg buffer for charm-level notification msgs sent to other
227 //SingleHelpers. Each time a msg is sent,
228 //1. the msg destination chare (SingleHelper) has to be set.
229 //2. the associated loop info has to be set.
230 CharmNotifyMsg
**notifyMsg
;
231 CurLoopInfo
**taskBuffer
;
232 int nextFreeTaskBuffer
;
234 int nextFreeNotifyMsg
;
239 ~FuncSingleHelper() {
240 #if USE_CONVERSE_NOTIFICATION
241 for (int i
=0; i
<notifyMsgBufSize
; i
++) {
242 ConverseNotifyMsg
*tmp
= notifyMsg
+i
;
243 CurLoopInfo
*loop
= (CurLoopInfo
*)(tmp
->ptr
);
248 for (int i
=0; i
<notifyMsgBufSize
; i
++) delete notifyMsg
[i
];
250 for (int i
=0; i
<TASK_BUFFER_SIZE
; i
++) delete taskBuffer
[i
];
254 #if USE_CONVERSE_NOTIFICATION
255 ConverseNotifyMsg
*getNotifyMsg() {
257 ConverseNotifyMsg
*cur
= notifyMsg
+nextFreeNotifyMsg
;
258 CurLoopInfo
*loop
= (CurLoopInfo
*)(cur
->ptr
);
259 nextFreeNotifyMsg
= (nextFreeNotifyMsg
+1)%notifyMsgBufSize
;
260 if (loop
->isFree()) return cur
;
265 CharmNotifyMsg
*getNotifyMsg() {
267 CharmNotifyMsg
*cur
= notifyMsg
[nextFreeNotifyMsg
];
268 CurLoopInfo
*loop
= (CurLoopInfo
*)(cur
->ptr
);
269 nextFreeNotifyMsg
= (nextFreeNotifyMsg
+1)%notifyMsgBufSize
;
270 if (loop
==NULL
|| loop
->isFree()) return cur
;
274 CurLoopInfo
*getNewTask() {
276 CurLoopInfo
*cur
= taskBuffer
[nextFreeTaskBuffer
];
277 nextFreeTaskBuffer
= (nextFreeTaskBuffer
+1)%TASK_BUFFER_SIZE
;
278 if (cur
->isFree()) return cur
;
284 void stealWork(CharmNotifyMsg
*msg
);
285 void destroyMyself() {
289 FuncSingleHelper(CkMigrateMessage
*m
) : CBase_FuncSingleHelper(m
) {}
292 void createNotifyMsg();