2 A CkCallback is a simple way for a library to return data
3 to a wide variety of user code, without the library having
4 to handle all 17 possible cases.
6 This object is implemented as a union, so the entire object
7 can be sent as bytes. Another option would be to use a virtual "send" method.
10 Initial version by Orion Sky Lawlor, olawlor@acm.org, 2/8/2002
13 #include "ckcallback-ccs.h"
14 #include "CkCallback.decl.h"
// Proxy to the callback-forwarding group. It is assigned in the ckcallback_main
// constructor and used by CkCallback::send and CcsRegisterHandler.
16 /*readonly*/ CProxy_ckcallback_group _ckcallbackgroup;
// Table type mapping an unsigned int id to the CkCallback registered under that id.
18 typedef CkHashtableT<CkHashtableAdaptorT<unsigned int>, CkCallback*> threadCB_t;
// Cpv variables (initialized in CkCallbackInit): the table of thread callbacks and
// the next id that impl_thread_init will hand out.
19 CpvStaticDeclare(threadCB_t, threadCBs);
20 CpvStaticDeclare(unsigned int, nextThreadCB);
22 //This main chare is only used to create the callback forwarding group
// and to publish its proxy through the readonly _ckcallbackgroup.
// NOTE(review): the access specifier and the closing braces of this class are
// not visible in this excerpt.
23 class ckcallback_main : public CBase_ckcallback_main {
// Constructor: creates the ckcallback_group and stores the resulting proxy.
// The CkArgMsg is not read on the visible lines.
25 ckcallback_main(CkArgMsg *m) {
26 _ckcallbackgroup=CProxy_ckcallback_group::ckNew();
31 //The callback group is used to forward a callback to the processor
32 // it originated from.
// CkCallback::send uses it (via _ckcallbackgroup[pe].call) when a thread or
// C-function callback has to run on a different PE than the sender.
// NOTE(review): the access specifier and closing braces are not visible in this excerpt.
33 class ckcallback_group : public CBase_ckcallback_group {
// Default and migration constructors: nothing to initialize.
35 ckcallback_group() { /*empty*/ }
36 ckcallback_group(CkMigrateMessage *m) { /*empty*/ }
// Registers `cb` as the handler for CCS requests named `name` (defined later in this file).
37 void registerCcsCallback(const char *name,const CkCallback &cb);
// Forwarding entry point: unwraps the marshalled message and delivers it through
// the callback `c` on the receiving PE.
38 void call(CkCallback &c,CkMarshalledMessage &msg) {
39 c.send(msg.getMessage());
43 /*************** CkCallback implementation ***************/
44 //Initialize the callback's thread fields before sending it off:
// Records the creating PE, allocates a fresh non-zero id for this callback and
// registers `this` under that id in the threadCBs table, so that send() and
// impl_thread_delay() can find the original object from a copy.
// NOTE(review): the declarations of `cb` and `exist` are not visible in this excerpt.
45 void CkCallback::impl_thread_init(void)
49 d.thread.onPE=CkMyPe();
// Skip id 0 (e.g. after the counter wraps): impl_thread_delay treats 0 as "no table entry".
51 if (CpvAccess(nextThreadCB)==0) CpvAccess(nextThreadCB)=1;
52 d.thread.cb=CpvAccess(nextThreadCB)++;
// put() yields a reference to the table slot for this id; keep its address so it can be filled in.
53 cb = &CpvAccess(threadCBs).put(d.thread.cb, &exist);
55 *cb = this; //<- so we can find this structure later
56 d.thread.th=NULL; //<- thread isn't suspended yet
// (void*)-1 is the "no value delivered yet" sentinel tested by send() and impl_thread_delay().
57 d.thread.ret=(void*)-1;//<- no data to return yet
60 //Actually suspend this thread
// Waits for the value delivered to a resumeThread callback and returns it.
// Aborts if this is not a resumeThread callback or if it is called on a PE other
// than the one where the callback was created.
// NOTE(review): the null check guarding the "already deleted" abort and the actual
// thread-suspend call are not visible in this excerpt.
61 void *CkCallback::impl_thread_delay(void) const
63 if (type!=resumeThread)
64 CkAbort("Called impl_thread_delay on non-threaded callback");
65 if (CkMyPe()!=d.thread.onPE)
66 CkAbort("Called thread_delay on different processor than where callback was created");
68 //Find the original callback object:
// Default to this object (const cast away); when an id was assigned, prefer the
// object registered under that id in the threadCBs table.
69 CkCallback *dest=(CkCallback *)this;
70 if (d.thread.cb!=0) dest=CpvAccess(threadCBs).get(d.thread.cb);
72 CkAbort("Called thread_delay on an already deleted callback");
// ret still holding the (void*)-1 sentinel means no value has been sent yet.
73 if (dest->d.thread.ret==(void*)-1)
74 { //We need to sleep for the result:
75 dest->d.thread.th=CthSelf(); //<- so we know a thread is waiting
// After being resumed the sentinel must have been replaced by send().
77 if (dest->d.thread.ret==(void*)-1)
78 CkAbort("thread resumed, but callback data is still empty");
80 return dest->d.thread.ret;
84 /*These can't be defined in the .h file like the other constructors
85 * because we need CkCallback before CProxyElement* are defined. */
// Builds a callback targeting the chare `p`; `doInline` selects the inline
// (isendChare) or regular (sendChare) delivery type.
// NOTE(review): the matching #endif and the store of `ep` are not visible in this excerpt.
87 CkCallback::CkCallback(Chare *p, int ep, bool doInline) {
88 #if CMK_ERROR_CHECKING
// Zero the whole object first so unused union bytes are deterministic.
89 memset(this, 0, sizeof(CkCallback));
91 type=doInline?isendChare:sendChare;
93 d.chare.id=p->ckGetChareID();
// Builds a callback targeting entry point `ep` of group `p`'s branch on the
// calling PE (CkMyPe()); `doInline` selects isendGroup or sendGroup.
// NOTE(review): the matching #endif is not visible in this excerpt.
95 CkCallback::CkCallback(Group *p, int ep, bool doInline) {
96 #if CMK_ERROR_CHECKING
// Zero the whole object first so unused union bytes are deterministic.
97 memset(this, 0, sizeof(CkCallback));
99 type=doInline?isendGroup:sendGroup;
100 d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyPe();
// Builds a callback targeting entry point `ep` of nodegroup `p`'s branch on the
// calling node; `doInline` selects isendNodeGroup or sendNodeGroup.
// Note that for nodegroups d.group.onPE holds a node number (CkMyNode()).
// NOTE(review): the matching #endif is not visible in this excerpt.
102 CkCallback::CkCallback(NodeGroup *p, int ep, bool doInline) {
103 #if CMK_ERROR_CHECKING
// Zero the whole object first so unused union bytes are deterministic.
104 memset(this, 0, sizeof(CkCallback));
106 type=doInline?isendNodeGroup:sendNodeGroup;
107 d.group.ep=ep; d.group.id=p->ckGetGroupID(); d.group.onPE=CkMyNode();
// Builds a callback targeting entry point `ep` of the nodegroup behind proxy `ngp`,
// with no specific destination node recorded on the visible lines.
// NOTE(review): the matching #endif and the assignment of `type` are not visible in
// this excerpt; presumably this is the nodegroup-broadcast form, confirm against the full file.
110 CkCallback::CkCallback(int ep,const CProxy_NodeGroup &ngp) {
111 #if CMK_ERROR_CHECKING
// Zero the whole object first so unused union bytes are deterministic.
112 memset(this, 0, sizeof(CkCallback));
115 d.group.ep=ep; d.group.id=ngp.ckGetGroupID();
// Builds a callback targeting entry point `ep` of the nodegroup behind proxy `ngp`
// on the destination given by `onPE`; `doInline` selects isendNodeGroup or sendNodeGroup.
// NOTE(review): the matching #endif is not visible in this excerpt.
118 CkCallback::CkCallback(int ep,int onPE,const CProxy_NodeGroup &ngp,bool doInline) {
119 #if CMK_ERROR_CHECKING
// Zero the whole object first so unused union bytes are deterministic.
120 memset(this, 0, sizeof(CkCallback));
122 type=doInline?isendNodeGroup:sendNodeGroup;
123 d.group.ep=ep; d.group.id=ngp.ckGetGroupID(); d.group.onPE=onPE;
// Builds a callback targeting one group element: the group id and destination PE
// are taken from the element proxy `grpElt`; `doInline` selects isendGroup or sendGroup.
// NOTE(review): the matching #endif and the store of `ep` are not visible in this excerpt.
126 CkCallback::CkCallback(int ep,const CProxyElement_Group &grpElt,bool doInline) {
127 #if CMK_ERROR_CHECKING
// Zero the whole object first so unused union bytes are deterministic.
128 memset(this, 0, sizeof(CkCallback));
130 type=doInline?isendGroup:sendGroup;
132 d.group.id=grpElt.ckGetGroupID();
133 d.group.onPE=grpElt.ckGetGroupPe();
// Builds a callback targeting one array element: the array id and element index
// are taken from the element proxy `arrElt`; `doInline` selects isendArray or sendArray.
// NOTE(review): the matching #endif and the store of `ep` are not visible in this excerpt.
135 CkCallback::CkCallback(int ep,const CProxyElement_ArrayBase &arrElt,bool doInline) {
136 #if CMK_ERROR_CHECKING
// Zero the whole object first so unused union bytes are deterministic.
137 memset(this, 0, sizeof(CkCallback));
139 type=doInline?isendArray:sendArray;
141 d.array.id=arrElt.ckGetArrayID();
142 d.array.idx = arrElt.ckGetIndex();
// Builds a callback targeting the array section behind the proxy `sectElt`.
// The section's cookie info, element list and PE list are taken from the proxy's
// first CkSectionID (index 0) and stored in d.section.
// The parameter is spelled `&sectElt`: it had been mis-encoded as a section sign
// followed by "Elt", leaving the `sectElt` used in the body undeclared.
// NOTE(review): the matching #endif and the stores of `type` and `ep` are not visible
// in this excerpt; `doInline` is not read on the visible lines.
145 CkCallback::CkCallback(int ep,CProxySection_ArrayBase &sectElt,bool doInline) {
146 #if CMK_ERROR_CHECKING
// Zero the whole object first so unused union bytes are deterministic.
147 memset(this, 0, sizeof(CkCallback));
151 CkSectionID secID=sectElt.ckGetSectionID(0);
152 d.section.sinfo = secID._cookie.info;
153 d.section._elems = secID._elems;
154 d.section._nElems = secID._nElems;
155 d.section.pelist = secID.pelist;
156 d.section.npes = secID.npes;
// Builds a callback targeting the array section described by `id`: its cookie
// info, element list and PE list are stored in d.section.
// NOTE(review): the matching #endif and the stores of `type` and `ep` are not visible
// in this excerpt.
161 CkCallback::CkCallback(int ep, CkSectionID &id) {
162 #if CMK_ERROR_CHECKING
// Zero the whole object first so unused union bytes are deterministic.
163 memset(this, 0, sizeof(CkCallback));
167 d.section.sinfo = id._cookie.info;
168 d.section._elems = id._elems;
169 d.section._nElems = id._nElems;
170 d.section.pelist = id.pelist;
171 d.section.npes = id.npes;
// Builds a callback targeting the array element `p` itself: the array id and index
// are read from the element; `doInline` selects isendArray or sendArray.
// NOTE(review): the matching #endif and the store of `ep` are not visible in this excerpt.
174 CkCallback::CkCallback(ArrayElement *p, int ep,bool doInline) {
175 #if CMK_ERROR_CHECKING
// Zero the whole object first so unused union bytes are deterministic.
176 memset(this, 0, sizeof(CkCallback));
178 type=doInline?isendArray:sendArray;
180 d.array.id=p->ckGetArrayID();
181 d.array.idx = p->ckGetArrayIndex();
// Convenience overload: wraps `length` bytes starting at `data` in a new CkDataMsg
// (CkDataMsg::buildNew copies the bytes) and delivers it through send(void*).
185 void CkCallback::send(int length,const void *data) const
187 send(CkDataMsg::buildNew(length,data));
190 /*Libraries should call this from their "done" entry points.
191 It takes the given message and handles it appropriately.
192 After the send(), this callback is finished and cannot be reused. */
// Delivers `msg` to the destination this callback describes, dispatching on `type`.
// In the message-sending cases a NULL `msg` is replaced by a freshly allocated system
// message (CkAllocSysMsg), and a reference number stored in the callback (hasRefnum)
// is stamped onto the message before it is sent.
// NOTE(review): the switch header, the break statements, several case labels and
// parts of some case bodies are not visible in this excerpt.
194 void CkCallback::send(void *msg) const
197 // CkPrintf("type:%d\n",type);
198 case ignore: //Just ignore the callback
199 if (msg) CkFreeMsg(msg);
201 case ckExit: //Call ckExit
202 if (msg) CkFreeMsg(msg);
205 case resumeThread: //Resume a waiting thread
// On the PE that created the callback: find the registered original, hand it the
// message as the thread's return value, and wake the thread if it is already waiting.
206 if (d.thread.onPE==CkMyPe()) {
207 CkCallback *dest=CpvAccess(threadCBs).get(d.thread.cb);
// Abort if the original is gone or its ret no longer holds the (void*)-1 sentinel.
208 if (dest==0 || dest->d.thread.ret!=(void*)-1)
209 CkAbort("Already sent a value to this callback!\n");
210 dest->d.thread.ret=msg; //<- return data
211 if (dest->d.thread.th!=NULL)
212 CthAwaken(dest->d.thread.th);
214 else //Forward message to processor where the thread actually lives
215 _ckcallbackgroup[d.thread.onPE].call(*this,(CkMessage *)msg);
// NOTE(review): the body of the call1Fn case is not visible in this excerpt.
217 case call1Fn: //Call a C function pointer on the current processor
220 case callCFn: //Call a C function pointer on the appropriate processor
// Run the function here if this is the target PE, otherwise forward through the group.
221 if (d.cfn.onPE==CkMyPe())
222 (d.cfn.fn)(d.cfn.param,msg);
224 _ckcallbackgroup[d.cfn.onPE].call(*this,(CkMessage *)msg);
226 case sendChare: //Send message to a chare
227 if (!msg) msg=CkAllocSysMsg();
228 if (d.chare.hasRefnum) CkSetRefNum(msg, d.chare.refnum);
229 CkSendMsg(d.chare.ep,msg,&d.chare.id);
231 case isendChare: //inline send-to-chare
232 if (!msg) msg=CkAllocSysMsg();
233 if (d.chare.hasRefnum) CkSetRefNum(msg, d.chare.refnum);
234 CkSendMsgInline(d.chare.ep,msg,&d.chare.id);
236 case sendGroup: //Send message to a group element
237 if (!msg) msg=CkAllocSysMsg();
238 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
239 CkSendMsgBranch(d.group.ep,msg,d.group.onPE,d.group.id);
241 case sendNodeGroup: //Send message to a nodegroup element
242 if (!msg) msg=CkAllocSysMsg();
243 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
244 CkSendMsgNodeBranch(d.group.ep,msg,d.group.onPE,d.group.id);
246 case isendGroup: //inline send-to-group element
247 if (!msg) msg=CkAllocSysMsg();
248 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
249 CkSendMsgBranchInline(d.group.ep,msg,d.group.onPE,d.group.id);
251 case isendNodeGroup: //inline send-to-nodegroup element
252 if (!msg) msg=CkAllocSysMsg();
253 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
254 CkSendMsgNodeBranchInline(d.group.ep,msg,d.group.onPE,d.group.id);
256 case sendArray: //Send message to an array element
257 if (!msg) msg=CkAllocSysMsg();
258 if (d.array.hasRefnum) CkSetRefNum(msg, d.array.refnum);
260 CkSetMsgArrayIfNotThere(msg);
261 CkSendMsgArray(d.array.ep,msg,d.array.id,d.array.idx.asChild());
263 case isendArray: //inline send-to-array element
264 if (!msg) msg=CkAllocSysMsg();
265 if (d.array.hasRefnum) CkSetRefNum(msg, d.array.refnum);
266 CkSendMsgArrayInline(d.array.ep,msg,d.array.id,d.array.idx.asChild());
// (case label not visible in this excerpt) Broadcast to every branch of a group.
269 if (!msg) msg=CkAllocSysMsg();
270 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
271 CkBroadcastMsgBranch(d.group.ep,msg,d.group.id);
// (case label not visible in this excerpt) Broadcast to every branch of a nodegroup.
274 if (!msg) msg=CkAllocSysMsg();
275 if (d.group.hasRefnum) CkSetRefNum(msg, d.group.refnum);
276 CkBroadcastMsgNodeBranch(d.group.ep,msg,d.group.id);
// (case label not visible in this excerpt) Broadcast to every element of an array.
279 if (!msg) msg=CkAllocSysMsg();
280 if (d.array.hasRefnum) CkSetRefNum(msg, d.array.refnum);
281 CkBroadcastMsgArray(d.array.ep,msg,d.array.id);
// (case label not visible in this excerpt) Broadcast to an array section: rebuild a
// CkSectionID from the fields stored in d.section and broadcast over it.
284 if(!msg)msg=CkAllocSysMsg();
285 if (d.section.hasRefnum) CkSetRefNum(msg, d.section.refnum);
286 CkSectionInfo sinfo(d.section.sinfo);
287 CkSectionID secID(sinfo, d.section._elems, d.section._nElems, d.section.pelist, d.section.npes);
288 CkBroadcastMsgSection(d.section.ep,msg,secID);
// The message is interpreted as a CkDataMsg; its payload is sent as the delayed CCS
// reply and the message is then freed.
// NOTE(review): the declarations of `length` and `data` and any null-message handling
// are not visible in this excerpt.
293 case replyCCS: { /* Send CkDataMsg as a CCS reply */
297 CkDataMsg *m=(CkDataMsg *)msg;
300 length=m->getLength();
302 CcsSendDelayedReply(d.ccsReply.reply,length,data);
303 if (msg) CkFreeMsg(msg);
305 case invalid: //Uninitialized
306 CmiAbort("Called send on uninitialized callback");
308 default: //Out-of-bounds type code
309 CmiAbort("Called send on corrupted callback");
// Packs/unpacks this callback through the PUP::er `p`.
// The commented-out line shows the alternative of pupping the whole object as raw
// bytes; the visible code instead handles `type` separately and pups individual
// union members (ccsReply.reply, c1fn, cfn) as raw bytes.
// NOTE(review): most of this function (the declaration and pup of `t`, the per-type
// dispatch and the remaining members) is not visible in this excerpt.
314 void CkCallback::pup(PUP::er &p) {
315 //p((char*)this, sizeof(CkCallback));
// `type` is restored from the integer `t` by a cast back to the enum.
318 type = (callbackType)t;
357 p((char*)&d.ccsReply.reply, sizeof(d.ccsReply.reply));
360 p((char*)&d.c1fn, sizeof(d.c1fn));
363 p((char*)&d.cfn, sizeof(d.cfn));
// Reached for a type value the dispatch does not recognize (presumably its default branch).
370 CkAbort("Inconsistent CkCallback type");
// Reports whether this callback contains a pointer; aborts for a type it does not know.
// NOTE(review): the per-type logic is not visible in this excerpt. Only the
// CMK_CHARE_USE_PTR conditional and the unknown-type abort can be seen.
374 bool CkCallback::containsPointer() const {
399 #if CMK_CHARE_USE_PTR
406 CkAbort("Asked about an unknown CkCallback type");
// Unregisters a resumeThread callback from the threadCBs table.
// The entry is removed only when the table slot for this id points at this very
// object, so a copy of the callback does not remove the original's registration.
411 void CkCallback::thread_destroy() const {
412 if (type==resumeThread && CpvAccess(threadCBs).get(d.thread.cb)==this) {
413 CpvAccess(threadCBs).remove(d.thread.cb);
// Destructor: waits (via thread_delay) for the callback's value, then stores it
// through `result` when the caller supplied a non-NULL destination.
// NOTE(review): any cleanup after these lines is not visible in this excerpt.
417 CkCallbackResumeThread::~CkCallbackResumeThread() {
418 void * res = thread_delay(); //<- block thread here if it hasn't already
419 if (result != NULL) *result = res;
424 /****** Callback-from-CCS ******/
426 // This function is called by CCS when a request comes in-- it maps the
427 // request to a Charm++ message and passes the message to its callback.
// cbPtr   - the CkCallback registered for this handler (see registerCcsCallback).
// reqLen  - length in bytes of the request payload.
// reqData - the request payload; it is copied into the new message.
// NOTE(review): the line that hands `msg` to `cb` is not visible in this excerpt.
428 extern "C" void ccsHandlerToCallback(void *cbPtr,int reqLen,const void *reqData)
430 CkCallback *cb=(CkCallback *)cbPtr;
// Allocate a request message with room for reqLen bytes of payload.
431 CkCcsRequestMsg *msg=new (reqLen,0) CkCcsRequestMsg;
// Keep the delayed-reply handle in the message so the reply can be sent later.
432 msg->reply=CcsDelayReply();
434 memcpy(msg->data,reqData,reqLen);
438 // Register this callback with CCS.
// A heap-allocated copy of `cb` is passed as the handler's user pointer;
// ccsHandlerToCallback receives it back as `cbPtr`. The copy is not freed in the
// code visible here.
439 void ckcallback_group::registerCcsCallback(const char *name,const CkCallback &cb)
441 CcsRegisterHandlerFn(name,ccsHandlerToCallback,new CkCallback(cb));
444 // Broadcast this callback registration to all processors
// by invoking registerCcsCallback on the whole _ckcallbackgroup proxy, so each
// branch registers its own copy of `cb` under `ccs_handlername`.
445 void CcsRegisterHandler(const char *ccs_handlername,const CkCallback &cb) {
446 _ckcallbackgroup.registerCcsCallback(ccs_handlername,cb);
// Marker value written into CkDataMsg::checkTag by buildNew and verified by check().
449 enum {dataMsgTag=0x7ed2beef};
// Allocates a CkDataMsg sized for `length` payload bytes, copies `length` bytes
// from `data` into it and stamps it with dataMsgTag.
// NOTE(review): the store of the message length and the return statement are not
// visible in this excerpt.
450 CkDataMsg *CkDataMsg::buildNew(int length,const void *data)
// The placement-new form receives the address of the payload size.
452 CkDataMsg *msg=new (&length,0) CkDataMsg;
454 memcpy(msg->data,data,length);
455 msg->checkTag=dataMsgTag;
// Sanity check: aborts unless this message still carries the dataMsgTag marker
// written by buildNew.
459 void CkDataMsg::check(void)
461 if (checkTag!=dataMsgTag)
462 CkAbort("CkDataMsg corrupted-- bad tag.");
// Initializes the Cpv variables used for thread callbacks: the id-to-callback table
// and the id counter. The counter starts at 1 because id 0 is treated as
// "no table entry" by impl_thread_delay.
465 void CkCallbackInit() {
466 CpvInitialize(threadCB_t, threadCBs);
467 CpvInitialize(unsigned int, nextThreadCB);
468 CpvAccess(nextThreadCB)=1;
471 #include "CkCallback.def.h"