charmxi cleanup: rename atomic construct to serial internally
[charm.git] / src / ck-core / ckfutures.C
blob4787c5d5930aec81ce9c1c1e37fba602ba87410d
1 /**
2 \file
3 \addtogroup CkFutures
5 To call [sync] entry methods, we need a way to block
6 the current Converse thread until the called method returns.
8 A "future" represents a thread of control that has been passed
9 to another processor.  It provides a place for a (local) thread to
10 block and the machinery for resuming control based on a remote
11 event.  Futures are thus used to implement Charm++'s "[sync]" methods.
13 This "sequential futures abstraction" is a well-studied concept
14 in remote process control.
16 /*@{*/
17 #include "charm++.h"
18 #include "ck.h"
19 #include "ckarray.h"
20 #include "ckfutures.h"
21 #include <stdlib.h>
23 typedef struct Future_s {
24   bool ready;
25   void *value;
26   CthThread waiters;
27   int next; 
28 } Future;
30 typedef struct {
31   Future *array;
32   int max;
33   int freelist;
35 FutureState;
37 class CkSema {
38   private:
39     CkQ<void*> msgs;
40     CkQ<CthThread> waiters;
41   public:
42     void *wait(void) {
43       void *retmsg = msgs.deq();
44       if(retmsg==0) {
45         waiters.enq(CthSelf());
46         CthSuspend();
47         retmsg = msgs.deq();
48       }
49       return retmsg;
50     }
51     void waitN(int n, void *marray[]) {
52       while (1) {
53         if(msgs.length()<n) {
54           waiters.enq(CthSelf());
55           CthSuspend();
56           continue;
57         }
58         for(int i=0;i<n;i++)
59           marray[i] = msgs.deq();
60         return;
61       }
62     }
63     void signal(void *msg)
64     {
65       msgs.enq(msg);
66       if(!waiters.isEmpty())
67         CthAwaken(waiters.deq());
68       return;
69     }
72 class CkSemaPool {
73   private:
74     CkVec<CkSema*> pool;
75     CkQ<int> freelist;
76   public:
77     int getNew(void) {
78       CkSema *sem = new CkSema();
79       int idx;
80       if(freelist.isEmpty()) {
81         idx = pool.length();
82         pool.insertAtEnd(sem);
83       } else {
84         idx = freelist.deq();
85         pool[idx] = new CkSema();
86       }
87       return idx;
88     }
89     void release(int idx) {
90       CkSema * sem = pool[idx];
91       delete sem;
92       freelist.enq(idx);
93     }
94     void _check(int idx) {
95 #if CMK_ERROR_CHECKING
96       if(pool[idx]==0) {
97               CkAbort("ERROR! operation attempted on invalid semaphore\n");
98       }
99 #endif
100     }
101     void *wait(int idx) { 
102       _check(idx);
103       return pool[idx]->wait(); 
104     }
105     void waitN(int idx, int n, void *marray[]) { 
106       _check(idx);
107       pool[idx]->waitN(n, marray); 
108     }
109     void signal(int idx, void *msg) { 
110       _check(idx);
111       pool[idx]->signal(msg); 
112     }
115 CpvStaticDeclare(FutureState, futurestate);
116 CpvStaticDeclare(CkSemaPool*, semapool);
118 static void addedFutures(int lo, int hi)
120   int i;
121   FutureState *fs = &(CpvAccess(futurestate));
122   Future *array = fs->array;
124   for (i=lo; i<hi; i++)
125     array[i].next = i+1;
126   array[hi-1].next = fs->freelist;
127   fs->freelist = lo;
130 static 
131 inline
132 int createFuture(void)
134   FutureState *fs = &(CpvAccess(futurestate));
135   Future *fut; int handle, origsize;
137   /* if the freelist is empty, allocate more futures. */
138   if (fs->freelist == -1) {
139     origsize = fs->max;
140     fs->max = fs->max * 2;
141     fs->array = (Future*)realloc(fs->array, sizeof(Future)*(fs->max));
142     _MEMCHECK(fs->array);
143     addedFutures(origsize, fs->max);
144   }
145   handle = fs->freelist;
146   fut = fs->array + handle;
147   fs->freelist = fut->next;
148   fut->ready = false;
149   fut->value = 0;
150   fut->waiters = 0;
151   fut->next = 0;
152   return handle;
155 extern "C"
156 CkFuture CkCreateFuture(void)
158   CkFuture fut;
159   fut.id = createFuture();
160   fut.pe = CkMyPe();
161   return fut;
164 extern "C"
165 void CkReleaseFutureID(CkFutureID handle)
167   FutureState *fs = &(CpvAccess(futurestate));
168   Future *fut = (fs->array)+handle;
169   fut->next = fs->freelist;
170   fs->freelist = handle;
173 extern "C"
174 int CkProbeFutureID(CkFutureID handle)
176   FutureState *fs = &(CpvAccess(futurestate));
177   Future *fut = (fs->array)+handle;
178   return (int)(fut->ready);
181 extern "C"
182 void *CkWaitFutureID(CkFutureID handle)
184   CthThread self = CthSelf();
185   FutureState *fs = &(CpvAccess(futurestate));
186   Future *fut = (fs->array)+handle;
187   void *value;
189   if (!(fut->ready)) {
190     CthSetNext(self, fut->waiters);
191     fut->waiters = self;
192     while (!(fut->ready)) { CthSuspend(); fut = (fs->array)+handle; }
193   }
194   fut = (fs->array)+handle;
195   value = fut->value;
196 #if CMK_ERROR_CHECKING
197   if (value==NULL) 
198         CkAbort("ERROR! CkWaitFuture would have to return NULL!\n"
199         "This can happen when a thread that calls a sync method "
200         "gets a CthAwaken call *before* the sync method returns.");
201 #endif
202   return value;
205 extern "C"
206 void CkReleaseFuture(CkFuture fut)
208   CkReleaseFutureID(fut.id);
211 extern "C"
212 int CkProbeFuture(CkFuture fut)
214   return CkProbeFutureID(fut.id);
217 extern "C"
218 void *CkWaitFuture(CkFuture fut)
220   return CkWaitFutureID(fut.id);
223 extern "C"
224 void CkWaitVoidFuture(CkFutureID handle)
226   CkFreeMsg(CkWaitFutureID(handle));
229 static void setFuture(CkFutureID handle, void *pointer)
231   CthThread t;
232   FutureState *fs = &(CpvAccess(futurestate));
233   Future *fut = (fs->array)+handle;
234   fut->ready = true;
235 #if CMK_ERROR_CHECKING
236   if (pointer==NULL) CkAbort("setFuture called with NULL!");
237 #endif
238   fut->value = pointer;
239   for (t=fut->waiters; t; t=CthGetNext(t))
240     CthAwaken(t);
241   fut->waiters = 0;
244 void _futuresModuleInit(void)
246   CpvInitialize(FutureState, futurestate);
247   CpvInitialize(CkSemaPool *, semapool);
248   CpvAccess(futurestate).array = (Future *)malloc(10*sizeof(Future));
249   _MEMCHECK(CpvAccess(futurestate).array);
250   CpvAccess(futurestate).max   = 10;
251   CpvAccess(futurestate).freelist = -1;
252   addedFutures(0,10);
253   CpvAccess(semapool) = new CkSemaPool();
256 CkGroupID _fbocID;
258 class FutureInitMsg : public CMessage_FutureInitMsg {
259   public: int x ;
262 class  FutureMain : public Chare {
263   public:
264     FutureMain(CkArgMsg *m) {
265       _fbocID = CProxy_FutureBOC::ckNew(new FutureInitMsg);
266       delete m;
267     }
268     FutureMain(CkMigrateMessage *m) {}
271 extern "C" 
272 CkFutureID CkRemoteBranchCallAsync(int ep, void *m, CkGroupID group, int PE)
274   CkFutureID ret=CkCreateAttachedFuture(m);
275   CkSendMsgBranch(ep, m, PE, group);
276   return ret;
279 extern "C" 
280 void *CkRemoteBranchCall(int ep, void *m, CkGroupID group, int PE)
282   CkFutureID i = CkRemoteBranchCallAsync(ep, m, group, PE);  
283   return CkWaitReleaseFuture(i);
286 extern "C" 
287 CkFutureID CkRemoteNodeBranchCallAsync(int ep, void *m, CkGroupID group, int node)
289   CkFutureID ret=CkCreateAttachedFuture(m);
290   CkSendMsgNodeBranch(ep, m, node, group);
291   return ret;
294 extern "C" 
295 void *CkRemoteNodeBranchCall(int ep, void *m, CkGroupID group, int node)
297   CkFutureID i = CkRemoteNodeBranchCallAsync(ep, m, group, node);
298   return CkWaitReleaseFuture(i);
301 extern "C" 
302 CkFutureID CkRemoteCallAsync(int ep, void *m, const CkChareID *ID)
304   CkFutureID ret=CkCreateAttachedFuture(m);
305   CkSendMsg(ep, m, ID);
306   return ret;
309 extern "C" 
310 void *CkRemoteCall(int ep, void *m, const CkChareID *ID)
312   CkFutureID i = CkRemoteCallAsync(ep, m, ID);
313   return CkWaitReleaseFuture(i);
317 extern "C" CkFutureID CkCreateAttachedFuture(void *msg)
319   CkFutureID ret=createFuture();
320   UsrToEnv(msg)->setRef(ret);
321   return ret;
324 extern "C" CkFutureID CkCreateAttachedFutureSend(void *msg, int ep,
325 CkArrayID id, CkArrayIndex idx,
326 void(*fptr)(CkArrayID,CkArrayIndex,void*,int,int),int size)
328 CkFutureID ret=createFuture();
329 UsrToEnv(msg)->setRef(ret);
330 #if IGET_FLOWCONTROL
331 if (TheIGetControlClass.iget_request(ret,msg,ep,id,idx,fptr,size))
332 #endif
333 (fptr)(id,idx,msg,ep,0);
334 return ret;
339 extern "C" CkFutureID CkCreateAttachedFutureSend(void *msg, int ep, void *obj,void(*fptr)(void*,void*,int,int))
341   CkFutureID ret=createFuture();
342   UsrToEnv(msg)->setRef(ret);
343 #if IGET_FLOWCONTROL
344   if (TheIGetControlClass.iget_request(ret,msg,ep,obj,fptr)) 
345 #endif
346   (fptr)(obj,msg,ep,0);
347   return ret;
350 extern "C" void *CkWaitReleaseFuture(CkFutureID futNum)
352 #if IGET_FLOWCONTROL
353   TheIGetControlClass.iget_resend(futNum);
354 #endif
355   void *result=CkWaitFutureID(futNum);
356   CkReleaseFutureID(futNum);
357 #if IGET_FLOWCONTROL
358   TheIGetControlClass.iget_free(1);
359 //  TheIGetControlClass.iget_free(sizeof(result));
360 #endif
361   return result;
364 class FutureBOC: public IrrGroup {
365 public:
366   FutureBOC(void){ }
367   FutureBOC(FutureInitMsg *m) { delete m; }
368   FutureBOC(CkMigrateMessage *m) { }
369   void SetFuture(FutureInitMsg * m) { 
370 #if CMK_ERROR_CHECKING
371     if (m==NULL) CkAbort("FutureBOC::SetFuture called with NULL!");
372 #endif
373     int key;
374     key = UsrToEnv((void *)m)->getRef();
375     setFuture( key, m);
376   }
377   void SetSema(FutureInitMsg *m) {
378 #if CMK_ERROR_CHECKING
379     if (m==NULL) CkAbort("FutureBOC::SetSema called with NULL!");
380 #endif
381     int idx;
382     idx = UsrToEnv((void *)m)->getRef();
383     CpvAccess(semapool)->signal(idx,(void*)m);
384   }
387 extern "C" 
388 void CkSendToFutureID(CkFutureID futNum, void *m, int PE)
390   UsrToEnv(m)->setRef(futNum);
391   CProxy_FutureBOC fBOC(_fbocID);
392   fBOC[PE].SetFuture((FutureInitMsg *)m);
395 extern "C"
396 void  CkSendToFuture(CkFuture fut, void *msg)
398   CkSendToFutureID(fut.id, msg, fut.pe);
401 extern "C"
402 CkSemaID CkSemaCreate(void)
404   CkSemaID id;
405   id.pe = CkMyPe();
406   id.idx = CpvAccess(semapool)->getNew();
407   return id;
410 extern "C"
411 void *CkSemaWait(CkSemaID id)
413 #if CMK_ERROR_CHECKING
414   if(id.pe != CkMyPe()) {
415     CkAbort("ERROR: Waiting on nonlocal semaphore! Aborting..\n");
416   }
417 #endif
418   return CpvAccess(semapool)->wait(id.idx);
421 extern "C"
422 void CkSemaWaitN(CkSemaID id, int n, void *marray[])
424 #if CMK_ERROR_CHECKING
425   if(id.pe != CkMyPe()) {
426     CkAbort("ERROR: Waiting on nonlocal semaphore! Aborting..\n");
427   }
428 #endif
429   CpvAccess(semapool)->waitN(id.idx, n, marray);
432 extern "C"
433 void CkSemaSignal(CkSemaID id, void *m)
435   UsrToEnv(m)->setRef(id.idx);
436   CProxy_FutureBOC fBOC(_fbocID);
437   fBOC[id.pe].SetSema((FutureInitMsg *)m);
440 extern "C"
441 void CkSemaDestroy(CkSemaID id)
443 #if CMK_ERROR_CHECKING
444   if(id.pe != CkMyPe()) {
445     CkAbort("ERROR: destroying a nonlocal semaphore! Aborting..\n");
446   }
447 #endif
448   CpvAccess(semapool)->release(id.idx);
452 /*@}*/
453 #include "CkFutures.def.h"