5 To call [sync] entry methods, we need a way to block
6 the current Converse thread until the called method returns.
8 A "future" represents a thread of control that has been passed
9 to another processor. It provides a place for a (local) thread to
10 block and the machinery for resuming control based on a remote
11 event. Futures are thus used to implement Charm++'s "[sync]" methods.
13 This "sequential futures abstraction" is a well-studied concept
14 in remote process control.
20 #include "ckfutures.h"
23 typedef struct Future_s {
40 CkQ<CthThread> waiters;
43 void *retmsg = msgs.deq();
45 waiters.enq(CthSelf());
51 void waitN(int n, void *marray[]) {
54 waiters.enq(CthSelf());
59 marray[i] = msgs.deq();
63 void signal(void *msg)
66 if(!waiters.isEmpty())
67 CthAwaken(waiters.deq());
78 CkSema *sem = new CkSema();
80 if(freelist.isEmpty()) {
82 pool.insertAtEnd(sem);
85 pool[idx] = new CkSema();
89 void release(int idx) {
90 CkSema * sem = pool[idx];
94 void _check(int idx) {
95 #if CMK_ERROR_CHECKING
97 CkAbort("ERROR! operation attempted on invalid semaphore\n");
101 void *wait(int idx) {
103 return pool[idx]->wait();
105 void waitN(int idx, int n, void *marray[]) {
107 pool[idx]->waitN(n, marray);
109 void signal(int idx, void *msg) {
111 pool[idx]->signal(msg);
115 CpvStaticDeclare(FutureState, futurestate);
116 CpvStaticDeclare(CkSemaPool*, semapool);
118 static void addedFutures(int lo, int hi)
121 FutureState *fs = &(CpvAccess(futurestate));
122 Future *array = fs->array;
124 for (i=lo; i<hi; i++)
126 array[hi-1].next = fs->freelist;
132 int createFuture(void)
134 FutureState *fs = &(CpvAccess(futurestate));
135 Future *fut; int handle, origsize;
137 /* if the freelist is empty, allocate more futures. */
138 if (fs->freelist == -1) {
140 fs->max = fs->max * 2;
141 fs->array = (Future*)realloc(fs->array, sizeof(Future)*(fs->max));
142 _MEMCHECK(fs->array);
143 addedFutures(origsize, fs->max);
145 handle = fs->freelist;
146 fut = fs->array + handle;
147 fs->freelist = fut->next;
156 CkFuture CkCreateFuture(void)
159 fut.id = createFuture();
165 void CkReleaseFutureID(CkFutureID handle)
167 FutureState *fs = &(CpvAccess(futurestate));
168 Future *fut = (fs->array)+handle;
169 fut->next = fs->freelist;
170 fs->freelist = handle;
174 int CkProbeFutureID(CkFutureID handle)
176 FutureState *fs = &(CpvAccess(futurestate));
177 Future *fut = (fs->array)+handle;
178 return (int)(fut->ready);
182 void *CkWaitFutureID(CkFutureID handle)
184 CthThread self = CthSelf();
185 FutureState *fs = &(CpvAccess(futurestate));
186 Future *fut = (fs->array)+handle;
190 CthSetNext(self, fut->waiters);
192 while (!(fut->ready)) { CthSuspend(); fut = (fs->array)+handle; }
194 fut = (fs->array)+handle;
196 #if CMK_ERROR_CHECKING
198 CkAbort("ERROR! CkWaitFuture would have to return NULL!\n"
199 "This can happen when a thread that calls a sync method "
200 "gets a CthAwaken call *before* the sync method returns.");
206 void CkReleaseFuture(CkFuture fut)
208 CkReleaseFutureID(fut.id);
212 int CkProbeFuture(CkFuture fut)
214 return CkProbeFutureID(fut.id);
218 void *CkWaitFuture(CkFuture fut)
220 return CkWaitFutureID(fut.id);
224 void CkWaitVoidFuture(CkFutureID handle)
226 CkFreeMsg(CkWaitFutureID(handle));
229 static void setFuture(CkFutureID handle, void *pointer)
232 FutureState *fs = &(CpvAccess(futurestate));
233 Future *fut = (fs->array)+handle;
235 #if CMK_ERROR_CHECKING
236 if (pointer==NULL) CkAbort("setFuture called with NULL!");
238 fut->value = pointer;
239 for (t=fut->waiters; t; t=CthGetNext(t))
244 void _futuresModuleInit(void)
246 CpvInitialize(FutureState, futurestate);
247 CpvInitialize(CkSemaPool *, semapool);
248 CpvAccess(futurestate).array = (Future *)malloc(10*sizeof(Future));
249 _MEMCHECK(CpvAccess(futurestate).array);
250 CpvAccess(futurestate).max = 10;
251 CpvAccess(futurestate).freelist = -1;
253 CpvAccess(semapool) = new CkSemaPool();
258 class FutureInitMsg : public CMessage_FutureInitMsg {
262 class FutureMain : public Chare {
264 FutureMain(CkArgMsg *m) {
265 _fbocID = CProxy_FutureBOC::ckNew(new FutureInitMsg);
268 FutureMain(CkMigrateMessage *m) {}
272 CkFutureID CkRemoteBranchCallAsync(int ep, void *m, CkGroupID group, int PE)
274 CkFutureID ret=CkCreateAttachedFuture(m);
275 CkSendMsgBranch(ep, m, PE, group);
280 void *CkRemoteBranchCall(int ep, void *m, CkGroupID group, int PE)
282 CkFutureID i = CkRemoteBranchCallAsync(ep, m, group, PE);
283 return CkWaitReleaseFuture(i);
287 CkFutureID CkRemoteNodeBranchCallAsync(int ep, void *m, CkGroupID group, int node)
289 CkFutureID ret=CkCreateAttachedFuture(m);
290 CkSendMsgNodeBranch(ep, m, node, group);
295 void *CkRemoteNodeBranchCall(int ep, void *m, CkGroupID group, int node)
297 CkFutureID i = CkRemoteNodeBranchCallAsync(ep, m, group, node);
298 return CkWaitReleaseFuture(i);
302 CkFutureID CkRemoteCallAsync(int ep, void *m, const CkChareID *ID)
304 CkFutureID ret=CkCreateAttachedFuture(m);
305 CkSendMsg(ep, m, ID);
310 void *CkRemoteCall(int ep, void *m, const CkChareID *ID)
312 CkFutureID i = CkRemoteCallAsync(ep, m, ID);
313 return CkWaitReleaseFuture(i);
317 extern "C" CkFutureID CkCreateAttachedFuture(void *msg)
319 CkFutureID ret=createFuture();
320 UsrToEnv(msg)->setRef(ret);
324 extern "C" CkFutureID CkCreateAttachedFutureSend(void *msg, int ep,
325 CkArrayID id, CkArrayIndex idx,
326 void(*fptr)(CkArrayID,CkArrayIndex,void*,int,int),int size)
328 CkFutureID ret=createFuture();
329 UsrToEnv(msg)->setRef(ret);
331 if (TheIGetControlClass.iget_request(ret,msg,ep,id,idx,fptr,size))
333 (fptr)(id,idx,msg,ep,0);
339 extern "C" CkFutureID CkCreateAttachedFutureSend(void *msg, int ep, void *obj,void(*fptr)(void*,void*,int,int))
341 CkFutureID ret=createFuture();
342 UsrToEnv(msg)->setRef(ret);
344 if (TheIGetControlClass.iget_request(ret,msg,ep,obj,fptr))
346 (fptr)(obj,msg,ep,0);
350 extern "C" void *CkWaitReleaseFuture(CkFutureID futNum)
353 TheIGetControlClass.iget_resend(futNum);
355 void *result=CkWaitFutureID(futNum);
356 CkReleaseFutureID(futNum);
358 TheIGetControlClass.iget_free(1);
359 // TheIGetControlClass.iget_free(sizeof(result));
364 class FutureBOC: public IrrGroup {
367 FutureBOC(FutureInitMsg *m) { delete m; }
368 FutureBOC(CkMigrateMessage *m) { }
369 void SetFuture(FutureInitMsg * m) {
370 #if CMK_ERROR_CHECKING
371 if (m==NULL) CkAbort("FutureBOC::SetFuture called with NULL!");
374 key = UsrToEnv((void *)m)->getRef();
377 void SetSema(FutureInitMsg *m) {
378 #if CMK_ERROR_CHECKING
379 if (m==NULL) CkAbort("FutureBOC::SetSema called with NULL!");
382 idx = UsrToEnv((void *)m)->getRef();
383 CpvAccess(semapool)->signal(idx,(void*)m);
388 void CkSendToFutureID(CkFutureID futNum, void *m, int PE)
390 UsrToEnv(m)->setRef(futNum);
391 CProxy_FutureBOC fBOC(_fbocID);
392 fBOC[PE].SetFuture((FutureInitMsg *)m);
396 void CkSendToFuture(CkFuture fut, void *msg)
398 CkSendToFutureID(fut.id, msg, fut.pe);
402 CkSemaID CkSemaCreate(void)
406 id.idx = CpvAccess(semapool)->getNew();
411 void *CkSemaWait(CkSemaID id)
413 #if CMK_ERROR_CHECKING
414 if(id.pe != CkMyPe()) {
415 CkAbort("ERROR: Waiting on nonlocal semaphore! Aborting..\n");
418 return CpvAccess(semapool)->wait(id.idx);
422 void CkSemaWaitN(CkSemaID id, int n, void *marray[])
424 #if CMK_ERROR_CHECKING
425 if(id.pe != CkMyPe()) {
426 CkAbort("ERROR: Waiting on nonlocal semaphore! Aborting..\n");
429 CpvAccess(semapool)->waitN(id.idx, n, marray);
433 void CkSemaSignal(CkSemaID id, void *m)
435 UsrToEnv(m)->setRef(id.idx);
436 CProxy_FutureBOC fBOC(_fbocID);
437 fBOC[id.pe].SetSema((FutureInitMsg *)m);
441 void CkSemaDestroy(CkSemaID id)
443 #if CMK_ERROR_CHECKING
444 if(id.pe != CkMyPe()) {
445 CkAbort("ERROR: destroying a nonlocal semaphore! Aborting..\n");
448 CpvAccess(semapool)->release(id.idx);
453 #include "CkFutures.def.h"