LRTS Comm Thread Tracing in message recieve
[charm.git] / src / ck-core / ckcallback.C
blobf07e24480b841efedd7370d3e46e3c638648730b
1 /*
2 A CkCallback is a simple way for a library to return data 
3 to a wide variety of user code, without the library having
4 to handle all 17 possible cases.
6 This object is implemented as a union, so the entire object
7 can be sent as bytes.  Another option would be to use a virtual 
8 "send" method.
10 Initial version by Orion Sky Lawlor, olawlor@acm.org, 2/8/2002
12 #include "charm++.h"
13 #include "ckcallback-ccs.h"
14 #include "CkCallback.decl.h"
15 #include "envelope.h"
16 /*readonly*/ CProxy_ckcallback_group _ckcallbackgroup;
18 typedef CkHashtableT<CkHashtableAdaptorT<unsigned int>, CkCallback*> threadCB_t;
19 CpvStaticDeclare(threadCB_t, threadCBs);
20 CpvStaticDeclare(unsigned int, nextThreadCB);
22 //This main chare is only used to create the callback forwarding group
23 class ckcallback_main : public CBase_ckcallback_main {
24 public:
25         ckcallback_main(CkArgMsg *m) {
26                 _ckcallbackgroup=CProxy_ckcallback_group::ckNew();
27                 delete m;
28         }
31 //The callback group is used to forward a callback to the processor
32 // it originated from.
33 class ckcallback_group : public CBase_ckcallback_group {
34 public:
35         ckcallback_group() { /*empty*/ }
36         ckcallback_group(CkMigrateMessage *m) { /*empty*/ }
37         void registerCcsCallback(const char *name,const CkCallback &cb);
38         void call(CkCallback &c,CkMarshalledMessage &msg) {
39                 c.send(msg.getMessage());
40         }
43 /*************** CkCallback implementation ***************/
44 //Initialize the callback's thread fields before sending it off:
45 void CkCallback::impl_thread_init(void)
47     int exist;
48     CkCallback **cb;
49     d.thread.onPE=CkMyPe();
50         do {
51           if (CpvAccess(nextThreadCB)==0) CpvAccess(nextThreadCB)=1;
52           d.thread.cb=CpvAccess(nextThreadCB)++;
53           cb = &CpvAccess(threadCBs).put(d.thread.cb, &exist);
54         } while (exist==1);
55         *cb = this; //<- so we can find this structure later
56         d.thread.th=NULL; //<- thread isn't suspended yet
57         d.thread.ret=(void*)-1;//<- no data to return yet
60 //Actually suspend this thread
61 void *CkCallback::impl_thread_delay(void) const
63         if (type!=resumeThread) 
64                 CkAbort("Called impl_thread_delay on non-threaded callback");
65         if (CkMyPe()!=d.thread.onPE)
66                 CkAbort("Called thread_delay on different processor than where callback was created");
67         
68         //Find the original callback object:
69         CkCallback *dest=(CkCallback *)this;
70         if (d.thread.cb!=0) dest=CpvAccess(threadCBs).get(d.thread.cb);
71         if (dest==0)
72             CkAbort("Called thread_delay on an already deleted callback");
73         if (dest->d.thread.ret==(void*)-1) 
74         {  //We need to sleep for the result:
75                 dest->d.thread.th=CthSelf(); //<- so we know a thread is waiting
76                 CthSuspend();
77                 if (dest->d.thread.ret==(void*)-1) 
78                         CkAbort("thread resumed, but callback data is still empty");
79         }
80         return dest->d.thread.ret;
84 /*These can't be defined in the .h file like the other constructors
85  * because we need CkCallback before CProxyElement* are defined.
86  */
87 CkCallback::CkCallback(Chare *p, int ep, bool doInline) {
88 #if CMK_ERROR_CHECKING
89       memset(this, 0, sizeof(CkCallback));
90 #endif
91       type=doInline?isendChare:sendChare;
92         d.chare.ep=ep; 
93         d.chare.id=p->ckGetChareID();
95 CkCallback::CkCallback(Group *p, int ep, bool doInline) {
96 #if CMK_ERROR_CHECKING
97       memset(this, 0, sizeof(CkCallback));
98 #endif
99       type=doInline?isendGroup:sendGroup;
100         d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyPe();
102 CkCallback::CkCallback(NodeGroup *p, int ep, bool doInline) {
103 #if CMK_ERROR_CHECKING
104       memset(this, 0, sizeof(CkCallback));
105 #endif
106       type=doInline?isendNodeGroup:sendNodeGroup;
107         d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyNode();
110 CkCallback::CkCallback(int ep,const CProxy_NodeGroup &ngp) {
111 #if CMK_ERROR_CHECKING
112       memset(this, 0, sizeof(CkCallback));
113 #endif
114       type=bcastNodeGroup;
115         d.group.ep=ep; d.group.id=ngp.ckGetGroupID();
118 CkCallback::CkCallback(int ep,int onPE,const CProxy_NodeGroup &ngp,bool doInline) {
119 #if CMK_ERROR_CHECKING
120       memset(this, 0, sizeof(CkCallback));
121 #endif
122       type=doInline?isendNodeGroup:sendNodeGroup;
123         d.group.ep=ep; d.group.id=ngp.ckGetGroupID(); d.group.onPE=onPE;
126 CkCallback::CkCallback(int ep,const CProxyElement_Group &grpElt,bool doInline) {
127 #if CMK_ERROR_CHECKING
128       memset(this, 0, sizeof(CkCallback));
129 #endif
130       type=doInline?isendGroup:sendGroup;
131         d.group.ep=ep; 
132         d.group.id=grpElt.ckGetGroupID(); 
133         d.group.onPE=grpElt.ckGetGroupPe();
135 CkCallback::CkCallback(int ep,const CProxyElement_ArrayBase &arrElt,bool doInline) {
136 #if CMK_ERROR_CHECKING
137       memset(this, 0, sizeof(CkCallback));
138 #endif
139       type=doInline?isendArray:sendArray;
140         d.array.ep=ep; 
141         d.array.id=arrElt.ckGetArrayID(); 
142         d.array.idx = arrElt.ckGetIndex();
145 CkCallback::CkCallback(int ep,CProxySection_ArrayBase &sectElt,bool doInline) {
146 #if CMK_ERROR_CHECKING
147       memset(this, 0, sizeof(CkCallback));
148 #endif
149       type=bcastSection;
150       d.section.ep=ep; 
151       CkSectionID secID=sectElt.ckGetSectionID(0); 
152       d.section.sinfo = secID._cookie.info;
153       d.section._elems = secID._elems;
154       d.section._nElems = secID._nElems;
155       d.section.pelist = secID.pelist;
156       d.section.npes = secID.npes;
157       secID._elems = NULL;
158       secID.pelist = NULL;
161 CkCallback::CkCallback(int ep, CkSectionID &id) {
162 #if CMK_ERROR_CHECKING
163       memset(this, 0, sizeof(CkCallback));
164 #endif
165       type=bcastSection;
166       d.section.ep=ep;
167       d.section.sinfo = id._cookie.info;
168       d.section._elems = id._elems;
169       d.section._nElems = id._nElems;
170       d.section.pelist = id.pelist;
171       d.section.npes = id.npes;
174 CkCallback::CkCallback(ArrayElement *p, int ep,bool doInline) {
175 #if CMK_ERROR_CHECKING
176       memset(this, 0, sizeof(CkCallback));
177 #endif
178       type=doInline?isendArray:sendArray;
179     d.array.ep=ep; 
180         d.array.id=p->ckGetArrayID(); 
181         d.array.idx = p->ckGetArrayIndex();
185 void CkCallback::send(int length,const void *data) const
187         send(CkDataMsg::buildNew(length,data));
190 /*Libraries should call this from their "done" entry points.
191   It takes the given message and handles it appropriately.
192   After the send(), this callback is finished and cannot be reused.
194 void CkCallback::send(void *msg) const
196         switch(type) {
197           //    CkPrintf("type:%d\n",type);
198         case ignore: //Just ignore the callback
199                 if (msg) CkFreeMsg(msg);
200                 break;
201         case ckExit: //Call ckExit
202                 if (msg) CkFreeMsg(msg);
203                 CkExit();
204                 break;
205         case resumeThread: //Resume a waiting thread
206                 if (d.thread.onPE==CkMyPe()) {
207                         CkCallback *dest=CpvAccess(threadCBs).get(d.thread.cb);
208                         if (dest==0 || dest->d.thread.ret!=(void*)-1)
209                                 CkAbort("Already sent a value to this callback!\n");
210                         dest->d.thread.ret=msg; //<- return data
211                         if (dest->d.thread.th!=NULL)
212                                 CthAwaken(dest->d.thread.th);
213                 } 
214                 else //Forward message to processor where the thread actually lives
215                         _ckcallbackgroup[d.thread.onPE].call(*this,(CkMessage *)msg);
216                 break;
217         case call1Fn: //Call a C function pointer on the current processor
218                 (d.c1fn.fn)(msg);
219                 break;
220         case callCFn: //Call a C function pointer on the appropriate processor
221                 if (d.cfn.onPE==CkMyPe())
222                         (d.cfn.fn)(d.cfn.param,msg);
223                 else
224                         _ckcallbackgroup[d.cfn.onPE].call(*this,(CkMessage *)msg);
225                 break;
226         case sendChare: //Send message to a chare
227                 if (!msg) msg=CkAllocSysMsg();
228                 if (d.chare.hasRefnum) CkSetRefNum(msg, d.chare.refnum);
229                 CkSendMsg(d.chare.ep,msg,&d.chare.id);
230                 break;
231         case isendChare: //inline send-to-chare
232                 if (!msg) msg=CkAllocSysMsg();
233                 if (d.chare.hasRefnum) CkSetRefNum(msg, d.chare.refnum);
234                 CkSendMsgInline(d.chare.ep,msg,&d.chare.id);
235                 break;
236         case sendGroup: //Send message to a group element
237                 if (!msg) msg=CkAllocSysMsg();
238                 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
239                 CkSendMsgBranch(d.group.ep,msg,d.group.onPE,d.group.id);
240                 break;
241         case sendNodeGroup: //Send message to a group element
242                 if (!msg) msg=CkAllocSysMsg();
243                 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
244                 CkSendMsgNodeBranch(d.group.ep,msg,d.group.onPE,d.group.id);
245                 break;
246         case isendGroup: //inline send-to-group element
247                 if (!msg) msg=CkAllocSysMsg();
248                 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
249                 CkSendMsgBranchInline(d.group.ep,msg,d.group.onPE,d.group.id);
250                 break;
251         case isendNodeGroup: //inline send-to-group element
252                 if (!msg) msg=CkAllocSysMsg();
253                 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
254                 CkSendMsgNodeBranchInline(d.group.ep,msg,d.group.onPE,d.group.id);
255                 break;
256         case sendArray: //Send message to an array element
257                 if (!msg) msg=CkAllocSysMsg();
258                 if (d.array.hasRefnum) CkSetRefNum(msg, d.array.refnum);
260                 CkSetMsgArrayIfNotThere(msg);
261                 CkSendMsgArray(d.array.ep,msg,d.array.id,d.array.idx.asChild());
262                 break;
263         case isendArray: //inline send-to-array element
264                 if (!msg) msg=CkAllocSysMsg();
265                 if (d.array.hasRefnum) CkSetRefNum(msg, d.array.refnum);
266                 CkSendMsgArrayInline(d.array.ep,msg,d.array.id,d.array.idx.asChild());
267                 break;
268         case bcastGroup:
269                 if (!msg) msg=CkAllocSysMsg();
270                 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
271                 CkBroadcastMsgBranch(d.group.ep,msg,d.group.id);
272                 break;
273         case bcastNodeGroup:
274                 if (!msg) msg=CkAllocSysMsg();
275                 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
276                 CkBroadcastMsgNodeBranch(d.group.ep,msg,d.group.id);
277                 break;
278         case bcastArray:
279                 if (!msg) msg=CkAllocSysMsg();
280                 if (d.array.hasRefnum) CkSetRefNum(msg, d.array.refnum);
281                 CkBroadcastMsgArray(d.array.ep,msg,d.array.id);
282                 break;
283         case bcastSection: {
284                 if(!msg)msg=CkAllocSysMsg();
285                 if (d.section.hasRefnum) CkSetRefNum(msg, d.section.refnum);
286                 CkSectionInfo sinfo(d.section.sinfo);
287                 CkSectionID secID(sinfo, d.section._elems, d.section._nElems, d.section.pelist, d.section.npes);
288                 CkBroadcastMsgSection(d.section.ep,msg,secID);
289                 secID._elems = NULL;
290                 secID.pelist = NULL;
291                 break;
292              }
293         case replyCCS: { /* Send CkDataMsg as a CCS reply */
294                 void *data=NULL;
295                 int length=0;
296                 if (msg) {
297                         CkDataMsg *m=(CkDataMsg *)msg;
298                         m->check();
299                         data=m->getData();
300                         length=m->getLength();
301                 }
302                 CcsSendDelayedReply(d.ccsReply.reply,length,data);
303                 if (msg) CkFreeMsg(msg);
304                 } break;
305         case invalid: //Uninitialized
306                 CmiAbort("Called send on uninitialized callback");
307                 break;
308         default: //Out-of-bounds type code
309                 CmiAbort("Called send on corrupted callback");
310                 break;
311         };
314 void CkCallback::pup(PUP::er &p) {
315   //p((char*)this, sizeof(CkCallback));
316   int t = (int)type;
317   p|t;
318   type = (callbackType)t;
319   switch (type) {
320   case resumeThread:
321     p|d.thread.onPE;
322     p|d.thread.cb;
323     break;
324   case isendChare:
325   case sendChare:
326     p|d.chare.ep;
327     p|d.chare.id;
328     p|d.chare.hasRefnum;
329     p|d.chare.refnum;
330     break;
331   case isendGroup:
332   case sendGroup:
333   case isendNodeGroup:
334   case sendNodeGroup:
335     p|d.group.onPE;
336     p|d.group.hasRefnum;
337     p|d.group.refnum;
338   case bcastNodeGroup:
339   case bcastGroup:
340     p|d.group.ep;
341     p|d.group.id;
342     p|d.group.hasRefnum;
343     p|d.group.refnum;
344     break;
345   case isendArray:
346   case sendArray:
347     p|d.array.idx;
348     p|d.array.hasRefnum;
349     p|d.array.refnum;
350   case bcastArray:
351     p|d.array.ep;
352     p|d.array.id;
353     p|d.array.hasRefnum;
354     p|d.array.refnum;
355     break;
356   case replyCCS:
357     p((char*)&d.ccsReply.reply, sizeof(d.ccsReply.reply));
358     break;
359   case call1Fn:
360     p((char*)&d.c1fn, sizeof(d.c1fn));
361     break;
362   case callCFn:
363     p((char*)&d.cfn, sizeof(d.cfn));
364     break;
365   case ignore:
366   case ckExit:
367   case invalid:
368     break;
369   default:
370     CkAbort("Inconsistent CkCallback type");
371   }
374 bool CkCallback::containsPointer() const {
375   switch(type) {
376   case invalid:
377   case ignore:
378   case ckExit:
379   case sendGroup:
380   case sendNodeGroup:
381   case sendArray:
382   case isendGroup:
383   case isendNodeGroup:
384   case isendArray:
385   case bcastGroup:
386   case bcastNodeGroup:
387   case bcastArray:
388     return false;
390   case resumeThread:
391   case callCFn:
392   case call1Fn:
393   case replyCCS:
394   case bcastSection:
395     return true;
397   case sendChare:
398   case isendChare:
399 #if CMK_CHARE_USE_PTR
400     return true;
401 #else
402     return false;
403 #endif
405   default:
406     CkAbort("Asked about an unknown CkCallback type");
407     return true;
408   }
411 void CkCallback::thread_destroy() const {
412   if (type==resumeThread && CpvAccess(threadCBs).get(d.thread.cb)==this) {
413     CpvAccess(threadCBs).remove(d.thread.cb);
414   }
417 CkCallbackResumeThread::~CkCallbackResumeThread() {
418   void * res = thread_delay(); //<- block thread here if it hasn't already
419   if (result != NULL) *result = res;
420   else CkFreeMsg(res);
421   thread_destroy();
424 /****** Callback-from-CCS ******/
426 // This function is called by CCS when a request comes in-- it maps the 
427 // request to a Charm++ message and passes the message to its callback.
428 extern "C" void ccsHandlerToCallback(void *cbPtr,int reqLen,const void *reqData) 
430         CkCallback *cb=(CkCallback *)cbPtr;
431         CkCcsRequestMsg *msg=new (reqLen,0) CkCcsRequestMsg;
432         msg->reply=CcsDelayReply();
433         msg->length=reqLen;
434         memcpy(msg->data,reqData,reqLen);
435         cb->send(msg);
438 // Register this callback with CCS.
439 void ckcallback_group::registerCcsCallback(const char *name,const CkCallback &cb)
441         CcsRegisterHandlerFn(name,ccsHandlerToCallback,new CkCallback(cb));
444 // Broadcast this callback registration to all processors
445 void CcsRegisterHandler(const char *ccs_handlername,const CkCallback &cb) {
446         _ckcallbackgroup.registerCcsCallback(ccs_handlername,cb);
449 enum {dataMsgTag=0x7ed2beef};
450 CkDataMsg *CkDataMsg::buildNew(int length,const void *data)
452         CkDataMsg *msg=new (&length,0) CkDataMsg;
453         msg->length=length;
454         memcpy(msg->data,data,length);
455         msg->checkTag=dataMsgTag;
456         return msg;
459 void CkDataMsg::check(void)
461         if (checkTag!=dataMsgTag)
462                 CkAbort("CkDataMsg corrupted-- bad tag.");
465 void CkCallbackInit() {
466   CpvInitialize(threadCB_t, threadCBs);
467   CpvInitialize(unsigned int, nextThreadCB);
468   CpvAccess(nextThreadCB)=1;
471 #include "CkCallback.def.h"