2 * Support for persistent communication setup
12 #if CMK_PERSISTENT_COMM
13 //#define EXTERNAL_COMPRESS 1
14 //#if EXTERNAL_COMPRESS
17 #include "compress-external.C"
19 #include "machine-persistent.h"
20 #define ENVELOP_SIZE 104
/* Cpv-declared bookkeeping for the persistent send table.  The table is kept
   as a linked list (entries carry next/prev links) with a head pointer, a
   tail pointer and an entry count; getFreeSendSlot appends at the tail. */
22 CpvDeclare(PersistentSendsTable *, persistentSendsTableHead);
23 CpvDeclare(PersistentSendsTable *, persistentSendsTableTail);
24 CpvDeclare(int, persistentSendsTableCount);
/* Receive-side counterpart: head, tail and count of the receive table;
   getFreeRecvSlot appends at the tail. */
25 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableHead);
26 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableTail);
27 CpvDeclare(int, persistentReceivesTableCount);
29 /* Converse message type */
/* Setup request allocated and sent by the CmiCreate*Persistent functions and
   consumed on the destination by persistentRequestHandler.  Besides the
   member shown, code in this file also accesses maxBytes, requestorPE,
   compressStart and dataType on this message. */
30 typedef struct _PersistentRequestMsg {
/* room for the Converse message header (CmiMsgHeaderSizeBytes bytes) */
31 char core[CmiMsgHeaderSizeBytes];
/* sender-side slot handle; copied back into the grant reply */
34 PersistentHandle sourceHandler;
39 } PersistentRequestMsg;
/* Reply built by persistentRequestHandler on the receiving side and consumed
   by persistentReqGrantedHandler on the requesting side.  It carries the
   receive slot's buffer descriptors and handles back to the sender.
   NOTE(review): the pointer arrays and the PersistentBuf array presumably
   belong to alternative conditional-compilation branches - confirm. */
41 typedef struct _PersistentReqGrantedMsg {
/* room for the Converse message header */
42 char core[CmiMsgHeaderSizeBytes];
/* filled from the receive slot's messagePtr[] */
44 void *msgAddr[PERSIST_BUFFERS_NUM];
/* filled from the receive slot's recvSizePtr[] */
45 void *slotFlagAddress[PERSIST_BUFFERS_NUM];
/* filled from the receive slot's destBuf[] */
47 PersistentBuf buf[PERSIST_BUFFERS_NUM];
/* copied from the request so the sender can locate its send slot */
48 PersistentHandle sourceHandler;
/* getPersistentHandle(h, 1) of the receive slot; stored as destHandle and
   later sent back in PersistentDestroyMsg */
49 PersistentHandle destHandler;
/* the receive slot handle itself; stored as destDataHandle and stamped into
   outgoing messages as persistRecvHandler */
50 PersistentHandle destDataHandler;
51 } PersistentReqGrantedMsg;
/* Teardown request sent by CmiDestroyPersistent to the destination PE and
   handled there by persistentDestroyHandler. */
53 typedef struct _PersistentDestroyMsg {
/* room for the Converse message header */
54 char core[CmiMsgHeaderSizeBytes];
/* the send slot's destHandle; the receiver converts it back to a receive
   slot with getPersistentHandle(destHandlerIndex, 0) */
55 PersistentHandle destHandlerIndex;
56 } PersistentDestroyMsg;
58 /* Converse handler */
/* Converse handler indices; each is assigned in CmiPersistentInit from
   CmiRegisterHandler for the handler function of the matching name. */
59 int persistentRequestHandlerIdx;
60 int persistentReqGrantedHandlerIdx;
61 int persistentDestroyHandlerIdx;
/* installed on outgoing messages by CompressPersistentMsg: the first when the
   payload was compressed, the second when it was sent unchanged */
62 int persistentDecompressHandlerIdx;
63 int persistentNoDecompressHandlerIdx;
/* Currently selected persistent handles.  CmiPersistentInit sets phs to NULL
   and curphs to 0; CmiUsePersistentHandle stores the handle count in phsSize
   and resets curphs; CmiPersistentOneSend advances curphs while phs is
   non-NULL. */
65 CpvDeclare(PersistentHandle *, phs);
66 CpvDeclare(int, phsSize);
67 CpvDeclare(int, curphs);
69 /******************************************************************************
71 ******************************************************************************/
/* Forward declarations of the slot initializers; their definitions are not
   part of the code shown here (presumably supplied by the machine layer -
   confirm). */
73 void initRecvSlot(PersistentReceivesTable *slot);
74 void initSendSlot(PersistentSendsTable *slot);
/* Exchange entries 0 and 1 of a send slot's destination-buffer bookkeeping.
   Acts only when PERSIST_BUFFERS_NUM is 2.
   NOTE(review): `tmp` is declared twice below with different types, so the
   pointer swaps and the PersistentBuf swap presumably sit in alternative
   conditional-compilation branches - confirm against the full file. */
76 void swapSendSlotBuffers(PersistentSendsTable *slot)
78 if (PERSIST_BUFFERS_NUM == 2) {
/* swap the two destination message addresses */
80 void *tmp = slot->destAddress[0];
81 slot->destAddress[0] = slot->destAddress[1];
82 slot->destAddress[1] = tmp;
/* swap the two destination size addresses, reusing tmp */
83 tmp = slot->destSizeAddress[0];
84 slot->destSizeAddress[0] = slot->destSizeAddress[1];
85 slot->destSizeAddress[1] = tmp;
/* variant that swaps whole PersistentBuf descriptors */
87 PersistentBuf tmp = slot->destBuf[0];
88 slot->destBuf[0] = slot->destBuf[1];
89 slot->destBuf[1] = tmp;
/* Exchange entries 0 and 1 of a receive slot's buffer bookkeeping.
   Acts only when PERSIST_BUFFERS_NUM is 2.
   NOTE(review): `tmp` is declared twice below with different types, so the
   pointer swaps and the PersistentBuf swap presumably sit in alternative
   conditional-compilation branches - confirm against the full file. */
94 void swapRecvSlotBuffers(PersistentReceivesTable *slot)
96 if (PERSIST_BUFFERS_NUM == 2) {
/* swap the two message buffer pointers */
98 void *tmp = slot->messagePtr[0];
99 slot->messagePtr[0] = slot->messagePtr[1];
100 slot->messagePtr[1] = tmp;
/* swap the two receive-size pointers, reusing tmp */
101 tmp = slot->recvSizePtr[0];
102 slot->recvSizePtr[0] = slot->recvSizePtr[1];
103 slot->recvSizePtr[1] = tmp;
/* variant that swaps whole PersistentBuf descriptors */
105 PersistentBuf tmp = slot->destBuf[0];
106 slot->destBuf[0] = slot->destBuf[1];
107 slot->destBuf[1] = tmp;
/* Allocate a new send-table entry with malloc, append it to the send list
   and increment the send-table count.
   NOTE(review): the malloc result is not checked in the code shown here, and
   the slot initialization and the returned handle are not visible; callers in
   this file cast the result back to PersistentSendsTable *. */
112 PersistentHandle getFreeSendSlot(void)
114 PersistentSendsTable *slot = (PersistentSendsTable *)malloc(sizeof(PersistentSendsTable));
/* empty list: the new entry becomes both head and tail */
116 if (CpvAccess(persistentSendsTableHead) == NULL) {
117 CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = slot;
/* non-empty list: link the new entry after the current tail */
120 CpvAccess(persistentSendsTableTail)->next = slot;
121 slot->prev = CpvAccess(persistentSendsTableTail);
122 CpvAccess(persistentSendsTableTail) = slot;
124 CpvAccess(persistentSendsTableCount)++;
/* Allocate a new receive-table entry with malloc, append it to the receive
   list and increment the receive-table count.
   NOTE(review): the malloc result is not checked in the code shown here, and
   the slot initialization and the returned handle are not visible; callers in
   this file cast the result back to PersistentReceivesTable *. */
128 PersistentHandle getFreeRecvSlot(void)
130 PersistentReceivesTable *slot = (PersistentReceivesTable *)malloc(sizeof(PersistentReceivesTable));
/* empty list: the new entry becomes both head and tail */
132 if (CpvAccess(persistentReceivesTableHead) == NULL) {
133 CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = slot;
/* non-empty list: link the new entry after the current tail */
136 CpvAccess(persistentReceivesTableTail)->next = slot;
137 slot->prev = CpvAccess(persistentReceivesTableTail);
138 CpvAccess(persistentReceivesTableTail) = slot;
140 CpvAccess(persistentReceivesTableCount)++;
144 /******************************************************************************
145 Create Persistent Comm handler
146 When creating a persistent comm with destPE and maxSize
147 1. allocate a free PersistentSendsTable entry, send a
148 PersistentRequestMsg message to destPE
149 buffer persistent message before Persistent Comm is setup;
150 2. destPE execute Converse handler persistentRequestHandler() on the
151 PersistentRequestMsg message:
152 allocate a free PersistentReceivesTable entry;
153 allocate a message buffer of size maxSize for the communication;
154 Send back a PersistentReqGrantedMsg with message address, etc for
156 3. Converse handler persistentReqGrantedHandler() executed on
157 sender for the PersistentReqGrantedMsg. setup finish, send buffered
159 ******************************************************************************/
/* Create a sender-side persistent channel to destPE with compression enabled.
 *
 * destPE        - destination PE; when it is on the caller's own node the
 *                 function returns NULL and nothing is created.
 * maxBytes      - requested maximum message size; sent to the receiver as-is
 *                 and stored on the slot as ALIGN16(maxBytes).
 * compressStart - stored on the slot and sent in the request; the compression
 *                 code in this file uses it as the byte offset of the region
 *                 to compress.
 * type          - stored as dataType; the (de)compression code compares it
 *                 with CMI_FLOATING, CMI_CHAR, CMI_ZLIB and CMI_LZ4.
 *
 * NOTE(review): the final return is not visible in the code shown here;
 * presumably the send-slot handle h is returned - confirm. */
160 PersistentHandle CmiCreateCompressPersistent(int destPE, int maxBytes, int compressStart, int type)
163 PersistentSendsTable *slot;
/* persistent channels are only set up between different nodes */
165 if (CmiMyNode() == CmiNodeOf(destPE)) return NULL;
167 h = getFreeSendSlot();
168 slot = (PersistentSendsTable *)h;
170 slot->destPE = destPE;
171 slot->sizeMax = ALIGN16(maxBytes);
/* build the setup request for the destination PE */
173 PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
174 msg->maxBytes = maxBytes;
175 msg->sourceHandler = h;
176 msg->requestorPE = CmiMyPe();
/* no previous message yet, so there is no history to compress against */
178 slot->previousMsg = NULL;
179 slot->compressStart = msg->compressStart = compressStart;
180 slot->dataType = msg->dataType = type;
/* 0 = not yet known; CompressPersistentMsg replaces it with
   size - compressStart when it sees a zero value */
181 slot->compressSize = 0;
182 slot->compressFlag = 1;
/* the request is handled by persistentRequestHandler on destPE; the send
   call frees msg */
184 CmiSetHandler(msg, persistentRequestHandlerIdx);
185 CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
/* Create a sender-side persistent channel to destPE with compression enabled
 * and an explicit size for the compressed region.
 *
 * destPE        - destination PE; when it is on the caller's own node the
 *                 function returns NULL and nothing is created.
 * maxBytes      - requested maximum message size; sent to the receiver as-is
 *                 and stored on the slot as ALIGN16(maxBytes).
 * compressStart - stored on the slot and sent in the request; used by the
 *                 compression code as the byte offset of the region to
 *                 compress.
 * compressSize  - stored on the slot only (it is not placed in the request).
 * type          - stored as dataType and sent in the request.
 *
 * NOTE(review): the final return is not visible in the code shown here;
 * presumably the send-slot handle h is returned - confirm. */
191 PersistentHandle CmiCreateCompressPersistentSize(int destPE, int maxBytes, int compressStart, int compressSize, int type)
194 PersistentSendsTable *slot;
/* persistent channels are only set up between different nodes */
196 if (CmiMyNode() == CmiNodeOf(destPE)) return NULL;
198 h = getFreeSendSlot();
199 slot = (PersistentSendsTable *)h;
201 slot->destPE = destPE;
202 slot->sizeMax = ALIGN16(maxBytes);
/* build the setup request for the destination PE */
204 PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
205 msg->maxBytes = maxBytes;
206 msg->sourceHandler = h;
207 msg->requestorPE = CmiMyPe();
/* no previous message yet, so there is no history to compress against */
209 slot->previousMsg = NULL;
210 slot->compressStart = msg->compressStart = compressStart;
211 slot->compressSize = compressSize;
212 slot->dataType = msg->dataType = type;
213 slot->compressFlag = 1;
/* the request is handled by persistentRequestHandler on destPE; the send
   call frees msg */
215 CmiSetHandler(msg, persistentRequestHandlerIdx);
216 CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
/* Create a sender-side persistent channel to destPE without compression.
 *
 * destPE   - destination PE; when it is on the caller's own node the function
 *            returns NULL and nothing is created.
 * maxBytes - requested maximum message size; sent to the receiver as-is and
 *            stored on the slot as ALIGN16(maxBytes).
 *
 * NOTE(review): the final return is not visible in the code shown here;
 * presumably the send-slot handle h is returned - confirm.  The request's
 * compressStart/dataType members are not set by the lines shown, although
 * persistentRequestHandler reads them - verify against the full file. */
221 PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
224 PersistentSendsTable *slot;
/* persistent channels are only set up between different nodes */
226 if (CmiMyNode() == CmiNodeOf(destPE)) return NULL;
228 h = getFreeSendSlot();
229 slot = (PersistentSendsTable *)h;
231 slot->destPE = destPE;
232 slot->sizeMax = ALIGN16(maxBytes);
/* build the setup request for the destination PE */
234 PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
235 msg->maxBytes = maxBytes;
236 msg->sourceHandler = h;
237 msg->requestorPE = CmiMyPe();
/* plain channel: compression disabled */
240 slot->compressFlag = 0;
/* the request is handled by persistentRequestHandler on destPE; the send
   call frees msg */
242 CmiSetHandler(msg, persistentRequestHandlerIdx);
243 CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
/* Converse handler for persistent messages that the sender left uncompressed.
   It advances the slot's buffer index, copies the message into slot->history,
   restores the handler fields the sender switched, and dispatches the
   message.
   NOTE(review): slot->history is allocated with malloc(maxBytes) in
   persistentRequestHandler; the memcpy below assumes size <= maxBytes -
   confirm. */
249 static void persistentNoDecompressHandler(void *msg)
251 //no msg is compressed, just update history
/* the receive slot and the original size were stamped into the header by
   CompressPersistentMsg on the sender */
252 PersistentReceivesTable *slot = (PersistentReceivesTable *) (((CmiMsgHeaderExt*)msg)-> persistRecvHandler);
253 int size = ((CmiMsgHeaderExt*)msg)->size;
254 slot->addrIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
255 // uncompress data from historyIndex data
257 memcpy(slot->history, msg, size);
/* undo the sender's handler switch: xxhdl holds the xhdl value saved before
   CldSwitchHandler was applied */
259 CldRestoreHandler((char *)msg);
260 (((CmiMsgHeaderExt*)msg)->xhdl) = (((CmiMsgHeaderExt*)msg)->xxhdl);
261 CmiHandleMessage(msg);
/* Converse handler for persistent messages whose payload was compressed by
   CompressPersistentMsg.  At offset slot->compressStart the message holds
   an int compressed size, an int original size, then the compressed bytes.
   The handler moves the trailing bytes back to their uncompressed position,
   decompresses the payload against the stored history, restores the handler
   fields and dispatches the message.
   NOTE(review): `history` is declared twice below; presumably the two
   declarations belong to alternative conditional-compilation branches.
   NOTE(review): `msg` is a void *, so `msg + offset` relies on a compiler
   extension for void-pointer arithmetic.
   NOTE(review): the malloc result is not checked in the code shown here. */
264 static void persistentDecompressHandler(void *msg)
266 // recover message based on previousRecvMsg
267 PersistentReceivesTable *slot = (PersistentReceivesTable *) (((CmiMsgHeaderExt*)msg)-> persistRecvHandler);
270 char *cmsg = (char*)msg;
/* size is the original (uncompressed) message size stamped by the sender */
271 int size = ((CmiMsgHeaderExt*)msg)->size;
272 int compressSize = *(int*)(msg+slot->compressStart);
273 int originalSize = *(int*)(msg+slot->compressStart+sizeof(int));
275 char *decompressData =(char*) malloc(originalSize);
/* history source, variant 1: the slot's private history copy */
277 char *history = slot->history;
/* history source, variant 2: the other persistent buffer */
279 historyIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
280 slot->addrIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
281 char *history = (char*)(slot->destBuf[historyIndex].destAddress);
283 //CmiPrintf("[%d] begin uncompress message is decompressed [%d:%d:%d start:%d]\n ", CmiMyPe(), size, compressSize, originalSize, slot->compressStart);
/* bytes that follow the compressed region; they are copied starting from the
   last byte (presumably both pointers are decremented per iteration).
   NOTE(review): the sender lays the tail out after compressSize +
   2*sizeof(int) bytes, and the sender-side verification code uses
   2*sizeof(int) in the matching expression, while base_src here uses a
   single sizeof(int) - verify whether this offset is off by sizeof(int)
   when left_size > 0. */
284 int left_size = size - slot->compressStart - originalSize;
285 char *base_dst = cmsg+size-1;
286 char *base_src = cmsg+ size - originalSize +compressSize+sizeof(int) -1;
287 for(i=0; i<left_size; i++)
289 *base_dst = *base_src;
/* pick the decompressor matching the data type agreed at setup; each call
   receives the history bytes at the same offset */
294 if(slot->dataType == CMI_FLOATING)
295 decompressFloatingPoint(msg + slot->compressStart+2*sizeof(int), decompressData, originalSize, compressSize, history+slot->compressStart);
296 else if(slot->dataType == CMI_CHAR)
297 decompressChar(msg + slot->compressStart+2*sizeof(int), decompressData, originalSize, compressSize, history+slot->compressStart);
298 else if(slot->dataType == CMI_ZLIB)
299 decompressZlib(msg + slot->compressStart+2*sizeof(int), decompressData, originalSize, compressSize, history+slot->compressStart);
300 else if(slot->dataType == CMI_LZ4)
301 decompressLz4(msg + slot->compressStart+2*sizeof(int), decompressData, originalSize, compressSize, history+slot->compressStart);
/* put the recovered payload back in place and release the scratch buffer */
302 memcpy(msg+slot->compressStart, decompressData, originalSize);
303 free(decompressData);
/* undo the sender's handler switch */
304 CldRestoreHandler(cmsg);
305 (((CmiMsgHeaderExt*)msg)->xhdl) = (((CmiMsgHeaderExt*)msg)->xxhdl);
/* checksum comparison: XOR of the header bytes and XOR of the data bytes
   against two bytes read from the message (presumably compiled only in a
   verification build - confirm) */
309 char real1 = cmsg[size - originalSize +sizeof(int)+compressSize];
310 char real2 = cmsg[size - originalSize +sizeof(int)+compressSize+1];
311 char checksum1 = cmsg[0];
312 for(i=1; i< slot->compressStart; i++)
313 checksum1 ^= cmsg[i];
314 if(memcmp(&checksum1, &real1, 1))
315 CmiPrintf("receiver chumsum wrong header \n");
316 char checksum2 = cmsg[slot->compressStart];
317 for(i=slot->compressStart+1; i< size; i++)
318 checksum2 ^= cmsg[i];
319 if(memcmp(&checksum2, &real2, 1))
320 CmiPrintf("receiver chumsum wrong data \n");
/* remember the recovered message as history for the next one */
324 memcpy(slot->history, msg, size);
326 CmiHandleMessage(msg);
/* Variant of CompressPersistentMsg that takes the message by reference
   (void **m) and builds the compressed form in a freshly allocated buffer.
   NOTE(review): a second function of the same name follows; presumably this
   variant is disabled or selected by conditional compilation - confirm.
   Declarations of msg, dest and newSize, the branch structure and the return
   value are not visible in the code shown here. */
330 int CompressPersistentMsg(PersistentHandle h, int size, void **m)
333 PersistentSendsTable *slot = (PersistentSendsTable *)h;
335 void *history = slot->previousMsg;
337 int compressSize=size;
/* record this message as history and send it uncompressed */
341 slot->previousMsg = msg;
343 (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
344 CldSwitchHandler((char *)msg, persistentNoDecompressHandlerIdx);
/* compressSize 0 means "everything from compressStart to the end" */
347 if(slot->compressSize == 0)
349 slot->compressSize = size - slot->compressStart;
/* only regions larger than 100 bytes are compressed */
351 if(slot->compressSize>100)
353 dest = CmiAlloc(size);
354 compressChar((char*)msg+slot->compressStart, (char*)dest+slot->compressStart+sizeof(int), slot->compressSize, &compressSize, (char*)history+slot->compressStart);
/* a saving of 100 bytes or less is not worth it: send uncompressed */
360 if(slot->compressSize-compressSize <= 100) //no compress
363 (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
364 CldSwitchHandler((char *)msg, persistentNoDecompressHandlerIdx);
/* assemble the compressed message: header, compressed size, compressed
   bytes (already written by compressChar), then the trailing bytes */
370 memcpy(dest, msg, slot->compressStart);
372 *(int*)(dest+slot->compressStart) = compressSize;
374 int leftSize = size - slot->compressStart - slot->compressSize;
376 memcpy((char*)dest+slot->compressStart+sizeof(int)+compressSize, msg+slot->compressStart+slot->compressSize, leftSize);
377 newSize = size-slot->compressSize+compressSize+sizeof(int);
378 (((CmiMsgHeaderExt*)dest)->xxhdl) = (((CmiMsgHeaderExt*)dest)->xhdl);
379 CldSwitchHandler((char *)dest, persistentDecompressHandlerIdx);
380 CmiPrintf(" handler =(%d : %d : %d) (%d:%d:%d) %d\n", (((CmiMsgHeaderExt*)dest)->hdl), (((CmiMsgHeaderExt*)dest)->xhdl), (((CmiMsgHeaderExt*)dest)->xxhdl), (((CmiMsgHeaderExt*)msg)->hdl), (((CmiMsgHeaderExt*)msg)->xhdl), (((CmiMsgHeaderExt*)msg)->xxhdl), persistentDecompressHandlerIdx);
/* stamp the receive slot handle and the original size into the header of the
   message the caller will send */
385 ((CmiMsgHeaderExt*)*m)-> persistRecvHandler = slot->destDataHandle;
386 ((CmiMsgHeaderExt*)*m)->size = size;
/* Compress a persistent message in place against the previously sent message.
 *
 * h    - send-slot handle (cast to PersistentSendsTable *).
 * size - size of msg in bytes before compression.
 * msg  - message buffer; its header is stamped with the receive-slot handle
 *        and the original size, and its handler is switched so the receiver
 *        runs persistentDecompressHandler or persistentNoDecompressHandler.
 *
 * Three cases are visible below: no usable history, message size changed
 * (both send uncompressed and reset the history), and the normal case, which
 * compresses slot->compressSize bytes starting at slot->compressStart.
 * NOTE(review): the return value, the conditions of the first branch and
 * several declarations (dest, newSize, i, checksum1, checksum2) are not
 * visible in the code shown here; presumably the size to transmit is
 * returned - confirm.
 * NOTE(review): `msg` and `history` are void *, so `msg + offset` relies on
 * a compiler extension for void-pointer arithmetic. */
391 int CompressPersistentMsg(PersistentHandle h, int size, void *msg)
393 PersistentSendsTable *slot = (PersistentSendsTable *)h;
395 void *history = slot->previousMsg;
397 int compressSize=size;
399 char *cmsg = (char*)msg;
/* let the receiver find its slot and learn the uncompressed size */
402 ((CmiMsgHeaderExt*)msg)-> persistRecvHandler = slot->destDataHandle;
403 ((CmiMsgHeaderExt*)msg)->size = size;
/* first case: keep this message as history and send it uncompressed */
408 slot->previousMsg = msg;
409 slot->previousSize = size;
411 (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
412 CldSwitchHandler(cmsg, persistentNoDecompressHandlerIdx);
413 }else if(size != slot->previousSize) //persistent msg size changes
/* size changed: drop the old history, keep this message instead, and send
   uncompressed */
416 CmiFree(slot->previousMsg);
417 slot->previousMsg = msg;
/* if the compressed region covered everything after compressStart, keep it
   that way for the new size */
418 if(slot->compressSize == slot->previousSize - slot->compressStart)
419 slot->compressSize = size - slot->compressStart;
420 slot->previousSize = size;
422 (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
423 CldSwitchHandler(cmsg, persistentNoDecompressHandlerIdx);
/* compressSize 0 means "everything from compressStart to the end" */
427 if(slot->compressSize == 0) {slot->compressSize = size-slot->compressStart; }
/* keep an untouched copy of the history and compute XOR checksums of the
   header and data bytes (presumably verification-build code - confirm) */
431 void *history_save = CmiAlloc(size);
432 memcpy(history_save, history, size);
434 for(i=1; i< slot->compressStart; i++)
435 checksum1 ^= cmsg[i];
436 checksum2 = cmsg[slot->compressStart];
437 for(i=slot->compressStart+1; i< size; i++)
438 checksum2 ^= cmsg[i];
440 //dest = malloc(slot->compressSize);
/* size of the scratch output buffer.
   NOTE(review): the second definition bounds the buffer at compressSize
   bytes; confirm the internal compressors never emit more than that. */
441 #if EXTERNAL_COMPRESS
442 int maxSize = (slot->compressSize+40)>LZ4_compressBound(slot->compressSize) ? slot->compressSize+40 : LZ4_compressBound(slot->compressSize);
444 int maxSize = slot->compressSize;
/* NOTE(review): the malloc result is not checked in the code shown here */
446 dest = malloc(maxSize);
/* pick the compressor matching the data type chosen at setup; each writes
   into dest and reports the output size through compressSize */
447 if(slot->dataType == CMI_FLOATING)
448 compressFloatingPoint(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
449 else if(slot->dataType == CMI_CHAR)
450 compressChar(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
451 else if(slot->dataType == CMI_ZLIB)
452 compressZlib(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
453 else if(slot->dataType == CMI_LZ4)
454 compressLz4(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
/* round-trip check: decompress and compare with the original payload */
457 void *recover = malloc(slot->compressSize);
458 decompressChar(dest, recover, slot->compressSize, compressSize, history_save+slot->compressStart);
459 if(memcmp(msg+slot->compressStart, recover, slot->compressSize))
460 CmiPrintf("sth wrong with compression\n");
/* a saving of 100 bytes or less is not worth it: send uncompressed and make
   this message the new history */
462 if(slot->compressSize - compressSize <= 100) //not compress
465 (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
466 CldSwitchHandler(cmsg, persistentNoDecompressHandlerIdx);
467 CmiFree(slot->previousMsg);
468 slot->previousMsg = msg;
/* compressed path: refresh the history payload with the uncompressed data,
   then overwrite the message region with
   [int compressSize][int originalSize][compressed bytes][trailing bytes] */
472 memcpy(history+slot->compressStart, msg+slot->compressStart, slot->compressSize);
473 *(int*)(msg+slot->compressStart) = compressSize;
474 *(int*)(msg+slot->compressStart+sizeof(int)) = slot->compressSize;
475 memcpy(msg+slot->compressStart+2*sizeof(int), dest, compressSize);
476 int leftSize = size-slot->compressStart-slot->compressSize;
477 //depending on memcpy implementation, this might not be safe
/* NOTE(review): source and destination lie in the same buffer and may
   overlap; memmove is the overlap-safe call - confirm and consider */
479 memcpy(msg+slot->compressStart+compressSize+2*sizeof(int), msg+slot->compressStart+slot->compressSize, leftSize);
480 newSize = slot->compressStart + compressSize + 2*sizeof(int) +leftSize;
481 (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
482 CldSwitchHandler(cmsg, persistentDecompressHandlerIdx);
/* append the two checksum bytes after the compressed message and replay the
   receiver's reconstruction on a copy to check it (presumably
   verification-build code - confirm) */
484 memcpy(msg+newSize, &checksum1, 1);
485 memcpy(msg+newSize+1, &checksum2, 1);
486 char *orig = CmiAlloc(size);
487 memcpy(orig, msg, newSize);
489 char *decompressData =(char*) malloc(slot->compressSize);
490 int left_size = size - slot->compressStart - slot->compressSize;
491 char *base_dst = orig+size-1;
492 char *base_src = orig + size - slot->compressSize +compressSize+2*sizeof(int) -1;
493 for(i=0; i<left_size; i++)
495 *base_dst = *base_src;
500 decompressChar(orig+slot->compressStart+2*sizeof(int), decompressData, slot->compressSize, compressSize, history_save+slot->compressStart);
501 memcpy(orig+slot->compressStart, decompressData, slot->compressSize);
502 free(decompressData);
503 CldRestoreHandler(orig);
504 (((CmiMsgHeaderExt*)orig)->xhdl) = (((CmiMsgHeaderExt*)orig)->xxhdl);
505 if(memcmp(orig, history, slot->compressStart))
506 CmiPrintf("sth wrong header all \n");
507 if(memcmp(orig+slot->compressStart, history+slot->compressStart, slot->compressSize))
508 CmiPrintf("sth wrong data \n");
523 PersistentHandle CmiCreateNodePersistent(int destNode, int maxBytes)
525 /* randomly pick one rank on the destination node is fine for setup.
526 actual message will be handled by comm thread anyway */
527 int pe = CmiNodeFirst(destNode) + rand()/RAND_MAX * CmiMyNodeSize();
528 return CmiCreatePersistent(pe, maxBytes);
530 PersistentHandle CmiCreateCompressNodePersistent(int destNode, int maxBytes, int start, int type)
532 /* randomly pick one rank on the destination node is fine for setup.
533 actual message will be handled by comm thread anyway */
534 int pe = CmiNodeFirst(destNode) + rand()/RAND_MAX * CmiMyNodeSize();
535 return CmiCreateCompressPersistent(pe, maxBytes, start, type);
538 PersistentHandle CmiCreateCompressNodePersistentSize(int destNode, int maxBytes, int start, int compressSize, int type)
540 /* randomly pick one rank on the destination node is fine for setup.
541 actual message will be handled by comm thread anyway */
542 int pe = CmiNodeFirst(destNode) + rand()/RAND_MAX * CmiMyNodeSize();
543 return CmiCreateCompressPersistentSize(pe, maxBytes, start, compressSize, type);
/* Converse handler run on the destination PE for a PersistentRequestMsg.
   It allocates a receive slot, records the compression parameters from the
   request, sets the slot up for maxBytes, and sends a
   PersistentReqGrantedMsg describing the slot back to the requesting PE.
   NOTE(review): neither the malloc for slot->history nor the CmiAlloc for the
   reply is checked in the code shown here, and freeing of the incoming
   request is not visible. */
547 static void persistentRequestHandler(void *env)
549 PersistentRequestMsg *msg = (PersistentRequestMsg *)env;
553 PersistentHandle h = getFreeRecvSlot();
554 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
556 /* build reply message */
557 PersistentReqGrantedMsg *gmsg = CmiAlloc(sizeof(PersistentReqGrantedMsg));
/* compression parameters chosen by the sender */
560 slot->compressStart = msg->compressStart;
561 slot->dataType = msg->dataType;
/* buffer that keeps a copy of the last received message; the decompress
   handlers read from and write to it */
563 slot->history = malloc(msg->maxBytes);
566 setupRecvSlot(slot, msg->maxBytes);
/* export the slot's buffer descriptors to the sender */
568 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
570 gmsg->msgAddr[i] = slot->messagePtr[i];
571 gmsg->slotFlagAddress[i] = slot->recvSizePtr[i];
573 gmsg->buf[i] = slot->destBuf[i];
/* echo the sender's handle and hand out two handles for this slot: the
   getPersistentHandle(h, 1) form and the slot handle itself */
577 gmsg->sourceHandler = msg->sourceHandler;
578 gmsg->destHandler = getPersistentHandle(h, 1);
580 gmsg->destDataHandler = h;
581 //CmiPrintf("[%d] receiver slot=%p, current=%d, h=%p =%p \n", CmiMyPe(), slot, slot->addrIndex, h, gmsg->destDataHandler);
/* reply is handled by persistentReqGrantedHandler on the requesting PE */
583 CmiSetHandler(gmsg, persistentReqGrantedHandlerIdx);
584 CmiSyncSendAndFree(msg->requestorPE,sizeof(PersistentReqGrantedMsg),gmsg);
/* Converse handler run on the requesting PE for a PersistentReqGrantedMsg.
   It copies the receiver's buffer descriptors and handles into the send slot
   named by msg->sourceHandler and, if a message was buffered on the slot
   while setup was pending, sends it now. */
589 static void persistentReqGrantedHandler(void *env)
593 PersistentReqGrantedMsg *msg = (PersistentReqGrantedMsg *)env;
/* sourceHandler is the send-slot handle this PE put into the request */
594 PersistentHandle h = msg->sourceHandler;
595 PersistentSendsTable *slot = (PersistentSendsTable *)h;
597 /* CmiPrintf("[%d] Persistent handler granted h:%p\n", CmiMyPe(), h); */
/* import the receiver's buffer descriptors */
599 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
601 slot->destAddress[i] = msg->msgAddr[i];
602 slot->destSizeAddress[i] = msg->slotFlagAddress[i];
604 slot->destBuf[i] = msg->buf[i];
/* destHandle is later sent in PersistentDestroyMsg; destDataHandle is stamped
   into outgoing messages by CompressPersistentMsg */
607 slot->destHandle = msg->destHandler;
609 slot->destDataHandle = msg->destDataHandler;
610 //CmiPrintf("+++[%d] req grant %p\n", CmiMyPe(), slot->destDataHandle);
/* flush the message that was held back until the channel was granted */
612 if (slot->messageBuf) {
613 LrtsSendPersistentMsg(h, CmiGetNodeGlobal(CmiNodeOf(slot->destPE),CmiMyPartition()), slot->messageSize, slot->messageBuf);
614 slot->messageBuf = NULL;
621 receiver initiate the persistent communication
/* Receiver-initiated setup: allocate and set up a receive slot for maxBytes
   and describe it in a PersistentReq that a sender can pass to
   CmiRegisterReceivePersistent.
   NOTE(review): the declaration of `ret`, the remaining field assignments and
   the return are not visible in the code shown here.  The pointer copies and
   the bufPtr copies in the loop presumably belong to alternative
   conditional-compilation branches - confirm. */
623 PersistentReq CmiCreateReceiverPersistent(int maxBytes)
628 PersistentHandle h = getFreeRecvSlot();
629 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
631 setupRecvSlot(slot, maxBytes);
634 ret.maxBytes = maxBytes;
/* array of PERSIST_BUFFERS_NUM pointers, one per buffer descriptor */
636 ret.bufPtr = (void **)malloc(PERSIST_BUFFERS_NUM*sizeof(void*));
637 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
639 ret.messagePtr[i] = slot->messagePtr[i];
640 ret.recvSizePtr[i] = slot->recvSizePtr[i];
642 ret.bufPtr[i] = malloc(sizeof(PersistentBuf));
/* NOTE(review): the destination is &ret.bufPtr[i], i.e. the pointer slot
   itself, not the buffer just allocated; this writes sizeof(PersistentBuf)
   bytes over the pointer array and loses the allocation.  The intended
   destination looks like ret.bufPtr[i] - verify and fix. */
643 memcpy(&ret.bufPtr[i], &slot->destBuf[i], sizeof(PersistentBuf));
/* Sender-side counterpart of CmiCreateReceiverPersistent: allocate a send
   slot and fill it from the receiver's PersistentReq description.
   NOTE(review): the return is not visible in the code shown here; presumably
   the send-slot handle h is returned - confirm.  The pointer copies in the
   loop and the destBuf memcpy presumably belong to alternative
   conditional-compilation branches. */
650 PersistentHandle CmiRegisterReceivePersistent(PersistentReq recvHand)
653 PersistentHandle h = getFreeSendSlot();
655 PersistentSendsTable *slot = (PersistentSendsTable *)h;
656 slot->destPE = recvHand.pe;
/* unlike the CmiCreate*Persistent functions, maxBytes is stored without
   ALIGN16 here */
657 slot->sizeMax = recvHand.maxBytes;
660 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
661 slot->destAddress[i] = recvHand.messagePtr[i];
662 slot->destSizeAddress[i] = recvHand.recvSizePtr[i];
/* NOTE(review): this reads recvHand.bufPtr as one contiguous array of
   PersistentBuf, while CmiCreateReceiverPersistent allocates bufPtr as an
   array of pointers - verify that the two layouts agree. */
665 memcpy(slot->destBuf, recvHand.bufPtr, PERSIST_BUFFERS_NUM*sizeof(PersistentBuf));
667 slot->destHandle = recvHand.myHand;
671 /******************************************************************************
672 destroy Persistent Comm handler
673 ******************************************************************************/
675 /* Converse Handler */
/* Converse handler run on the receiving PE for a PersistentDestroyMsg.
   It resolves the receive slot from the handle index in the message, unlinks
   the slot from the receive list, decrements the table count, and releases
   the slot's destination buffers with PerFree.
   NOTE(review): the conditions choosing between the prev/head and next/tail
   updates are not visible in the code shown here; presumably they test
   slot->prev and slot->next - confirm.  No release of slot->history (set up
   with malloc in persistentRequestHandler) or of the slot itself is visible
   here - check for a leak. */
676 void persistentDestroyHandler(void *env)
679 PersistentDestroyMsg *msg = (PersistentDestroyMsg *)env;
680 PersistentHandle h = getPersistentHandle(msg->destHandlerIndex, 0);
683 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
685 CpvAccess(persistentReceivesTableCount) --;
/* detach from the predecessor, or move the list head forward */
687 slot->prev->next = slot->next;
690 CpvAccess(persistentReceivesTableHead) = slot->next;
/* detach from the successor, or move the list tail back */
692 slot->next->prev = slot->prev;
695 CpvAccess(persistentReceivesTableTail) = slot->prev;
/* free every destination buffer that was actually allocated */
697 for (i=0; i<PERSIST_BUFFERS_NUM; i++)
698 if (slot->destBuf[i].destAddress)
699 PerFree((char*)slot->destBuf[i].destAddress);
706 /* FIXME: need to buffer until ReqGranted message come back? */
/* Tear down a sender-side persistent channel.
 *
 * h - send-slot handle; NULL is accepted and ignored.
 *
 * Sends a PersistentDestroyMsg carrying the slot's destHandle to the
 * destination PE, then unlinks the slot from the send list and decrements
 * the send-table count.
 * NOTE(review): the conditions choosing between the prev/head and next/tail
 * updates, and any release of the slot itself, are not visible in the code
 * shown here - confirm. */
707 void CmiDestroyPersistent(PersistentHandle h)
709 if (h == NULL) return;
711 PersistentSendsTable *slot = (PersistentSendsTable *)h;
712 /* CmiAssert(slot->destHandle != 0); */
714 PersistentDestroyMsg *msg = (PersistentDestroyMsg *)
715 CmiAlloc(sizeof(PersistentDestroyMsg));
/* destHandle was received in the grant reply (or taken from the
   PersistentReq in CmiRegisterReceivePersistent) */
716 msg->destHandlerIndex = slot->destHandle;
/* handled by persistentDestroyHandler on the destination PE; the send call
   frees msg */
718 CmiSetHandler(msg, persistentDestroyHandlerIdx);
719 CmiSyncSendAndFree(slot->destPE,sizeof(PersistentDestroyMsg),msg);
/* detach from the predecessor, or move the list head forward */
723 slot->prev->next = slot->next;
726 CpvAccess(persistentSendsTableHead) = slot->next;
/* detach from the successor, or move the list tail back */
728 slot->next->prev = slot->prev;
731 CpvAccess(persistentSendsTableTail) = slot->prev;
734 CpvAccess(persistentSendsTableCount) --;
/* Drop every persistent send and receive slot known to this PE.  Both lists
   are walked from their heads, receive-side destination buffers are released
   with PerFree, and both tables are reset to empty (NULL head and tail,
   count 0).
   NOTE(review): the loop headers and the per-slot release statements are not
   visible in the code shown here; no destroy message to the remote side
   appears in the visible lines either - confirm. */
738 void CmiDestroyAllPersistent(void)
740 PersistentSendsTable *sendslot = CpvAccess(persistentSendsTableHead);
/* remember the successor before the current entry is disposed of */
742 PersistentSendsTable *next = sendslot->next;
746 CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = NULL;
747 CpvAccess(persistentSendsTableCount) = 0;
749 PersistentReceivesTable *slot = CpvAccess(persistentReceivesTableHead);
/* remember the successor before the current entry is disposed of */
751 PersistentReceivesTable *next = slot->next;
753 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
754 //if (slot->destBuf[i].destSizeAddress)
755 // CmiPrintf("Warning: CmiDestroyAllPersistent destoried buffered undelivered message.\n");
/* release each destination buffer that was actually allocated */
756 if (slot->destBuf[i].destAddress) PerFree((char*)slot->destBuf[i].destAddress);
761 CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
762 CpvAccess(persistentReceivesTableCount) = 0;
/* Initialize the persistent-communication layer: register the Converse
   handlers defined in this file, initialize the Cpv variables to an empty
   state, and call the machine-specific initializer. */
766 void CmiPersistentInit(void)
/* setup and teardown protocol handlers */
770 persistentRequestHandlerIdx =
771 CmiRegisterHandler((CmiHandler)persistentRequestHandler);
772 persistentReqGrantedHandlerIdx =
773 CmiRegisterHandler((CmiHandler)persistentReqGrantedHandler);
774 persistentDestroyHandlerIdx =
775 CmiRegisterHandler((CmiHandler)persistentDestroyHandler);
/* receive-side handlers that CompressPersistentMsg installs on outgoing
   messages */
778 persistentDecompressHandlerIdx =
779 CmiRegisterHandler((CmiHandler)persistentDecompressHandler);
780 persistentNoDecompressHandlerIdx =
781 CmiRegisterHandler((CmiHandler)persistentNoDecompressHandler);
/* no persistent handle selected initially.
   NOTE(review): no initial value for phsSize is visible in the code shown
   here. */
784 CpvInitialize(PersistentHandle*, phs);
785 CpvAccess(phs) = NULL;
786 CpvInitialize(int, phsSize);
787 CpvInitialize(int, curphs);
788 CpvAccess(curphs) = 0;
/* machine-layer hook; defined outside the code shown here */
790 persist_machine_init();
/* both slot tables start empty */
792 CpvInitialize(PersistentSendsTable *, persistentSendsTableHead);
793 CpvInitialize(PersistentSendsTable *, persistentSendsTableTail);
794 CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = NULL;
795 CpvInitialize(int, persistentSendsTableCount);
796 CpvAccess(persistentSendsTableCount) = 0;
798 CpvInitialize(PersistentReceivesTable *, persistentReceivesTableHead);
799 CpvInitialize(PersistentReceivesTable *, persistentReceivesTableTail);
800 CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
801 CpvInitialize(int, persistentReceivesTableCount);
802 CpvAccess(persistentReceivesTableCount) = 0;
/* Select the persistent handles to use for subsequent sends.
 *
 * p - array of n handles.
 * n - number of handles in p.
 *
 * A single NULL handle (n == 1 and *p == NULL) is treated as "no handles".
 * NOTE(review): the store of p into the phs variable is not visible in the
 * code shown here; presumably it sits next to the phsSize assignment -
 * confirm.  Because *p is read unconditionally when n == 1, p must be
 * non-NULL in that case. */
805 void CmiUsePersistentHandle(PersistentHandle *p, int n)
807 if (n==1 && *p == NULL) { p = NULL; n = 0; }
/* the "&& 0" keeps this validation permanently compiled out */
808 #if CMK_ERROR_CHECKING && 0
812 if (p[i] == NULL) CmiAbort("CmiUsePersistentHandle: invalid PersistentHandle.\n");
/* restart the send cursor for the new handle set */
816 CpvAccess(phsSize) = n;
817 CpvAccess(curphs) = 0;
/* Advance the send cursor (curphs) by one when a persistent handle set is
   currently selected (phs is non-NULL); otherwise do nothing. */
820 void CmiPersistentOneSend(void)
822 if (CpvAccess(phs)) CpvAccess(curphs)++;