Clean up C linkage specific to the C++ migration
[charm.git] / src / arch / util / persist-comm.C
blob2062e68d6765419f1cd97a8be874616cadebf879
1 /** @file
2  * Support for persistent communication setup
3  * @ingroup Machine
4  */
6 /**
7  * \addtogroup Machine
8 */
9 /*@{*/
11 #include "converse.h"
12 #if CMK_PERSISTENT_COMM
13 //#define EXTERNAL_COMPRESS 1
14 //#if EXTERNAL_COMPRESS
15 //#else
16 #include "compress.C"
17 #include "compress-external.C"
18 //#endif
19 #include "machine-persistent.h"
20 #define ENVELOP_SIZE 104
21 //#define VERIFY 1 
22 CpvDeclare(PersistentSendsTable *, persistentSendsTableHead);
23 CpvDeclare(PersistentSendsTable *, persistentSendsTableTail);
24 CpvDeclare(int, persistentSendsTableCount);
25 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableHead);
26 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableTail);
27 CpvDeclare(int, persistentReceivesTableCount);
29 /* Converse message type */
30 typedef struct _PersistentRequestMsg {
31   char core[CmiMsgHeaderSizeBytes];
32   int requestorPE;
33   int maxBytes;
34   PersistentHandle sourceHandler;
35 #if DELTA_COMPRESS
36   int   compressStart;
37   int   dataType;
38 #endif
39 } PersistentRequestMsg;
41 typedef struct _PersistentReqGrantedMsg {
42   char core[CmiMsgHeaderSizeBytes];
44   void *msgAddr[PERSIST_BUFFERS_NUM];
45   void *slotFlagAddress[PERSIST_BUFFERS_NUM];
47   PersistentBuf    buf[PERSIST_BUFFERS_NUM];
48   PersistentHandle sourceHandler;
49   PersistentHandle destHandler;
50   PersistentHandle destDataHandler;
51 } PersistentReqGrantedMsg;
53 typedef struct _PersistentDestroyMsg {
54   char core[CmiMsgHeaderSizeBytes];
55   PersistentHandle destHandlerIndex;
56 } PersistentDestroyMsg;
58 /* Converse handler */
59 int persistentRequestHandlerIdx;
60 int persistentReqGrantedHandlerIdx;
61 int persistentDestroyHandlerIdx;
62 int persistentDecompressHandlerIdx;
63 int persistentNoDecompressHandlerIdx;
65 CpvDeclare(PersistentHandle *, phs);
66 CpvDeclare(int, phsSize);
67 CpvDeclare(int, curphs);
69 /******************************************************************************
70      Utilities
71 ******************************************************************************/
73 void initRecvSlot(PersistentReceivesTable *slot);
74 void initSendSlot(PersistentSendsTable *slot);
76 void swapSendSlotBuffers(PersistentSendsTable *slot)
78   if (PERSIST_BUFFERS_NUM == 2) {
79 #if 0
80   void *tmp = slot->destAddress[0];
81   slot->destAddress[0] = slot->destAddress[1];
82   slot->destAddress[1] = tmp;
83   tmp = slot->destSizeAddress[0];
84   slot->destSizeAddress[0] = slot->destSizeAddress[1];
85   slot->destSizeAddress[1] = tmp;
86 #else
87   PersistentBuf tmp = slot->destBuf[0];
88   slot->destBuf[0] = slot->destBuf[1];
89   slot->destBuf[1] = tmp;
90 #endif
91   }
94 void swapRecvSlotBuffers(PersistentReceivesTable *slot)
96   if (PERSIST_BUFFERS_NUM == 2) {
97 #if 0
98   void *tmp = slot->messagePtr[0];
99   slot->messagePtr[0] = slot->messagePtr[1];
100   slot->messagePtr[1] = tmp;
101   tmp = slot->recvSizePtr[0];
102   slot->recvSizePtr[0] = slot->recvSizePtr[1];
103   slot->recvSizePtr[1] = tmp;
104 #else
105   PersistentBuf tmp = slot->destBuf[0];
106   slot->destBuf[0] = slot->destBuf[1];
107   slot->destBuf[1] = tmp;
108 #endif
109   }
112 PersistentHandle getFreeSendSlot(void)
114   PersistentSendsTable *slot = (PersistentSendsTable *)malloc(sizeof(PersistentSendsTable));
115   initSendSlot(slot);
116   if (CpvAccess(persistentSendsTableHead) == NULL) {
117     CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = slot;
118   }
119   else {
120     CpvAccess(persistentSendsTableTail)->next = slot;
121     slot->prev = CpvAccess(persistentSendsTableTail);
122     CpvAccess(persistentSendsTableTail) = slot;
123   }
124   CpvAccess(persistentSendsTableCount)++;
125   return slot;
128 PersistentHandle getFreeRecvSlot(void)
130   PersistentReceivesTable *slot = (PersistentReceivesTable *)malloc(sizeof(PersistentReceivesTable));
131   initRecvSlot(slot);
132   if (CpvAccess(persistentReceivesTableHead) == NULL) {
133     CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = slot;
134   }
135   else {
136     CpvAccess(persistentReceivesTableTail)->next = slot;
137     slot->prev = CpvAccess(persistentReceivesTableTail);
138     CpvAccess(persistentReceivesTableTail) = slot;
139   }
140   CpvAccess(persistentReceivesTableCount)++;
141   return slot;
144 /******************************************************************************
145      Create Persistent Comm handler
146      When creating a persistent comm with destPE and maxSize
147      1. allocate a free PersistentSendsTable entry, send a 
148         PersistentRequestMsg message to destPE
149         buffer persistent message before  Persistent Comm is setup;
150      2. destPE execute Converse handler persistentRequestHandler() on the
151         PersistentRequestMsg message:
152         allocate a free PersistentReceivesTable entry;
153         allocate a message buffer of size maxSize for the communication;
154         Send back a PersistentReqGrantedMsg with message address, etc for
155         rdma_put;
156      3. Converse handler persistentReqGrantedHandler() executed on
157         sender for the PersistentReqGrantedMsg. setup finish, send buffered
158         message.
159 ******************************************************************************/
160 PersistentHandle CmiCreateCompressPersistent(int destPE, int maxBytes, int compressStart, int type)
162   PersistentHandle h;
163   PersistentSendsTable *slot;
165   if (CmiMyNode() == CmiNodeOf(destPE)) return NULL;
167   h = getFreeSendSlot();
168   slot = (PersistentSendsTable *)h;
170   slot->destPE = destPE;
171   slot->sizeMax = ALIGN16(maxBytes);
172   slot->addrIndex = 0;
173   PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
174   msg->maxBytes = maxBytes;
175   msg->sourceHandler = h;
176   msg->requestorPE = CmiMyPe();
177 #if DELTA_COMPRESS
178   slot->previousMsg  = NULL; 
179   slot->compressStart =  msg->compressStart = compressStart;
180   slot->dataType = msg->dataType = type;
181   slot->compressSize = 0;
182   slot->compressFlag = 1;
183 #endif
184   CmiSetHandler(msg, persistentRequestHandlerIdx);
185   CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
187   return h;
191 PersistentHandle CmiCreateCompressPersistentSize(int destPE, int maxBytes, int compressStart, int compressSize, int type)
193   PersistentHandle h;
194   PersistentSendsTable *slot;
196   if (CmiMyNode() == CmiNodeOf(destPE)) return NULL;
198   h = getFreeSendSlot();
199   slot = (PersistentSendsTable *)h;
201   slot->destPE = destPE;
202   slot->sizeMax = ALIGN16(maxBytes);
203   slot->addrIndex = 0;
204   PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
205   msg->maxBytes = maxBytes;
206   msg->sourceHandler = h;
207   msg->requestorPE = CmiMyPe();
208 #if DELTA_COMPRESS
209   slot->previousMsg  = NULL; 
210   slot->compressStart =  msg->compressStart = compressStart;
211   slot->compressSize = compressSize;
212   slot->dataType = msg->dataType = type;
213   slot->compressFlag = 1;
214 #endif
215   CmiSetHandler(msg, persistentRequestHandlerIdx);
216   CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
218   return h;
221 PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
223   PersistentHandle h;
224   PersistentSendsTable *slot;
226   if (CmiMyNode() == CmiNodeOf(destPE)) return NULL;
228   h = getFreeSendSlot();
229   slot = (PersistentSendsTable *)h;
231   slot->destPE = destPE;
232   slot->sizeMax = ALIGN16(maxBytes);
233   slot->addrIndex = 0;
234   PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
235   msg->maxBytes = maxBytes;
236   msg->sourceHandler = h;
237   msg->requestorPE = CmiMyPe();
239 #if DELTA_COMPRESS
240   slot->compressFlag = 0;
241 #endif
242   CmiSetHandler(msg, persistentRequestHandlerIdx);
243   CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
245   return h;
248 #if DELTA_COMPRESS
249 static void persistentNoDecompressHandler(void *msg)
251     //no msg is compressed, just update history
252     PersistentReceivesTable *slot = (PersistentReceivesTable *) (((CmiMsgHeaderExt*)msg)-> persistRecvHandler);
253     int size = ((CmiMsgHeaderExt*)msg)->size;
254     slot->addrIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
255     // uncompress data from historyIndex data
256 #if COPY_HISTORY 
257     memcpy(slot->history, msg, size);
258 #endif
259     CldRestoreHandler((char *)msg);
260     (((CmiMsgHeaderExt*)msg)->xhdl) =  (((CmiMsgHeaderExt*)msg)->xxhdl);
261     CmiHandleMessage(msg);
264 static void persistentDecompressHandler(void *msg)
266     //  recover message based on previousRecvMsg
267     PersistentReceivesTable *slot = (PersistentReceivesTable *) (((CmiMsgHeaderExt*)msg)-> persistRecvHandler);
268     int     historyIndex;
269     int i;
270     char    *cmsg = (char*)msg;
271     int     size = ((CmiMsgHeaderExt*)msg)->size;
272     int     compressSize = *(int*)(msg+slot->compressStart);
273     int     originalSize = *(int*)(msg+slot->compressStart+sizeof(int));
274     
275     char    *decompressData =(char*) malloc(originalSize);
276 #if COPY_HISTORY
277     char *history = slot->history;
278 #else
279     historyIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
280     slot->addrIndex = (slot->addrIndex + 1)%PERSIST_BUFFERS_NUM;
281     char *history = (char*)(slot->destBuf[historyIndex].destAddress);
282 #endif
283     //CmiPrintf("[%d] begin uncompress message is decompressed [%d:%d:%d start:%d]\n ", CmiMyPe(), size, compressSize, originalSize, slot->compressStart);
284     int  left_size = size - slot->compressStart - originalSize;
285     char *base_dst = cmsg+size-1;
286     char *base_src = cmsg+ size - originalSize +compressSize+sizeof(int) -1;
287     for(i=0; i<left_size; i++)
288     {
289        *base_dst = *base_src;
290        base_dst--;
291        base_src--;
292     }
294     if(slot->dataType == CMI_FLOATING) 
295         decompressFloatingPoint(msg + slot->compressStart+2*sizeof(int), decompressData, originalSize, compressSize, history+slot->compressStart);
296     else if(slot->dataType == CMI_CHAR) 
297         decompressChar(msg + slot->compressStart+2*sizeof(int), decompressData, originalSize, compressSize, history+slot->compressStart);
298     else if(slot->dataType == CMI_ZLIB) 
299         decompressZlib(msg + slot->compressStart+2*sizeof(int), decompressData, originalSize, compressSize, history+slot->compressStart);
300     else if(slot->dataType == CMI_LZ4) 
301         decompressLz4(msg + slot->compressStart+2*sizeof(int), decompressData, originalSize, compressSize, history+slot->compressStart);
302     memcpy(msg+slot->compressStart, decompressData, originalSize);
303     free(decompressData);
304     CldRestoreHandler(cmsg);
305     (((CmiMsgHeaderExt*)msg)->xhdl) =  (((CmiMsgHeaderExt*)msg)->xxhdl);
307 #if VERIFY
308    
309     char    real1 = cmsg[size - originalSize +sizeof(int)+compressSize];
310     char    real2 = cmsg[size - originalSize +sizeof(int)+compressSize+1];
311     char checksum1 = cmsg[0];
312     for(i=1; i< slot->compressStart; i++)
313         checksum1 ^= cmsg[i];
314     if(memcmp(&checksum1, &real1, 1))
315         CmiPrintf("receiver chumsum wrong header \n");
316     char  checksum2 = cmsg[slot->compressStart];
317     for(i=slot->compressStart+1; i< size; i++)
318         checksum2 ^= cmsg[i];
319     if(memcmp(&checksum2, &real2, 1))
320         CmiPrintf("receiver chumsum wrong data \n");
322 #endif
323 #if COPY_HISTORY
324     memcpy(slot->history, msg, size);
325 #endif
326     CmiHandleMessage(msg);
329 #if 0
330 int CompressPersistentMsg(PersistentHandle h, int size, void **m)
332     void *msg = *m;
333     PersistentSendsTable *slot = (PersistentSendsTable *)h;
334     int  newSize;
335     void *history = slot->previousMsg;
336     void *dest=NULL;
337     int compressSize=size;
338     if(history == NULL)
339     {
340         newSize = size;
341         slot->previousMsg = msg;
342         CmiReference(msg);
343         (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
344         CldSwitchHandler((char *)msg, persistentNoDecompressHandlerIdx);
345     }else
346     {
347         if(slot->compressSize == 0)
348         {
349             slot->compressSize = size - slot->compressStart;
350         }
351         if(slot->compressSize>100)
352         {
353             dest = CmiAlloc(size);
354             compressChar((char*)msg+slot->compressStart, (char*)dest+slot->compressStart+sizeof(int), slot->compressSize, &compressSize, (char*)history+slot->compressStart);
355         }
356     
357         CmiFree(history);
358         history = msg;
359         CmiReference(msg);
360         if(slot->compressSize-compressSize <= 100) //no compress
361         {
362             newSize = size;
363             (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
364             CldSwitchHandler((char *)msg, persistentNoDecompressHandlerIdx);
365             if(dest != NULL)
366                 CmiFree(dest);
367         }else
368         {
369             //header
370             memcpy(dest, msg, slot->compressStart);
371             //compressedSize
372             *(int*)(dest+slot->compressStart) = compressSize;
373             //tail
374             int leftSize = size - slot->compressStart - slot->compressSize;
375             if(leftSize > 0)
376                 memcpy((char*)dest+slot->compressStart+sizeof(int)+compressSize, msg+slot->compressStart+slot->compressSize, leftSize);
377             newSize = size-slot->compressSize+compressSize+sizeof(int);
378             (((CmiMsgHeaderExt*)dest)->xxhdl) = (((CmiMsgHeaderExt*)dest)->xhdl);
379             CldSwitchHandler((char *)dest, persistentDecompressHandlerIdx);
380             CmiPrintf(" handler =(%d : %d : %d) (%d:%d:%d)  %d\n", (((CmiMsgHeaderExt*)dest)->hdl), (((CmiMsgHeaderExt*)dest)->xhdl), (((CmiMsgHeaderExt*)dest)->xxhdl), (((CmiMsgHeaderExt*)msg)->hdl),  (((CmiMsgHeaderExt*)msg)->xhdl), (((CmiMsgHeaderExt*)msg)->xxhdl), persistentDecompressHandlerIdx);
381             *m=dest;
382         }
383         //update history
384     }
385     ((CmiMsgHeaderExt*)*m)-> persistRecvHandler = slot->destDataHandle;
386     ((CmiMsgHeaderExt*)*m)->size = size;
387     return newSize;
390 #else
391 int CompressPersistentMsg(PersistentHandle h, int size, void *msg)
393     PersistentSendsTable *slot = (PersistentSendsTable *)h;
394     int  newSize;
395     void *history = slot->previousMsg;
396     void *dest=NULL;
397     int  compressSize=size;
398     int  i;
399     char *cmsg = (char*)msg;
401    
402     ((CmiMsgHeaderExt*)msg)-> persistRecvHandler = slot->destDataHandle;
403     ((CmiMsgHeaderExt*)msg)->size = size;
404     
405     if(history == NULL)
406     {
407         newSize = size;
408         slot->previousMsg = msg;
409         slot->previousSize = size;
410         CmiReference(msg);
411         (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
412         CldSwitchHandler(cmsg, persistentNoDecompressHandlerIdx);
413     }else if(size != slot->previousSize)    //persistent msg size changes
414     {
415         newSize = size;
416         CmiFree(slot->previousMsg);
417         slot->previousMsg = msg;
418         if(slot->compressSize == slot->previousSize - slot->compressStart)
419             slot->compressSize = size - slot->compressStart;
420         slot->previousSize = size;
421         CmiReference(msg);
422         (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
423         CldSwitchHandler(cmsg, persistentNoDecompressHandlerIdx);
424     }
425     else {
426         
427         if(slot->compressSize == 0) {slot->compressSize = size-slot->compressStart; }
428 #if VERIFY
429         char checksum1;
430         char checksum2;
431         void *history_save = CmiAlloc(size);
432         memcpy(history_save, history, size);
433         checksum1 = cmsg[0];
434         for(i=1; i< slot->compressStart; i++)
435             checksum1 ^= cmsg[i];
436         checksum2 = cmsg[slot->compressStart];
437         for(i=slot->compressStart+1; i< size; i++)
438             checksum2 ^= cmsg[i];
439 #endif
440         //dest = malloc(slot->compressSize);
441 #if EXTERNAL_COMPRESS
442         int maxSize = (slot->compressSize+40)>LZ4_compressBound(slot->compressSize) ? slot->compressSize+40 : LZ4_compressBound(slot->compressSize);
443 #else
444         int maxSize = slot->compressSize;
445 #endif
446         dest = malloc(maxSize);
447     if(slot->dataType == CMI_FLOATING) 
448         compressFloatingPoint(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
449     else if(slot->dataType == CMI_CHAR)
450         compressChar(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
451     else if(slot->dataType == CMI_ZLIB)
452         compressZlib(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
453     else if(slot->dataType == CMI_LZ4) 
454         compressLz4(msg+slot->compressStart, dest, slot->compressSize, &compressSize, history+slot->compressStart);
456 #if VERIFY
457         void *recover = malloc(slot->compressSize);
458         decompressChar(dest, recover, slot->compressSize, compressSize, history_save+slot->compressStart);
459         if(memcmp(msg+slot->compressStart, recover, slot->compressSize))
460             CmiPrintf("sth wrong with compression\n");
461 #endif
462         if(slot->compressSize - compressSize <= 100) //not compress
463         {
464             newSize = size;
465             (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
466             CldSwitchHandler(cmsg, persistentNoDecompressHandlerIdx);
467             CmiFree(slot->previousMsg);
468             slot->previousMsg = msg;
469             CmiReference(msg);
470         }else
471         {
472             memcpy(history+slot->compressStart, msg+slot->compressStart, slot->compressSize);
473             *(int*)(msg+slot->compressStart) = compressSize;
474             *(int*)(msg+slot->compressStart+sizeof(int)) = slot->compressSize;
475             memcpy(msg+slot->compressStart+2*sizeof(int), dest, compressSize);
476             int leftSize = size-slot->compressStart-slot->compressSize;
477             //depending on memcpy implementation, this might not be safe
478             if(leftSize > 0)
479                 memcpy(msg+slot->compressStart+compressSize+2*sizeof(int), msg+slot->compressStart+slot->compressSize, leftSize);
480             newSize = slot->compressStart + compressSize + 2*sizeof(int) +leftSize;
481             (((CmiMsgHeaderExt*)msg)->xxhdl) = (((CmiMsgHeaderExt*)msg)->xhdl);
482             CldSwitchHandler(cmsg, persistentDecompressHandlerIdx);
483 #if VERIFY
484             memcpy(msg+newSize, &checksum1, 1); 
485             memcpy(msg+newSize+1, &checksum2, 1); 
486             char *orig = CmiAlloc(size);
487             memcpy(orig, msg, newSize);
488              
489             char    *decompressData =(char*) malloc(slot->compressSize);
490             int left_size = size - slot->compressStart - slot->compressSize;
491             char *base_dst = orig+size-1;
492             char *base_src = orig + size - slot->compressSize +compressSize+2*sizeof(int) -1;
493             for(i=0; i<left_size; i++)
494             {
495                 *base_dst = *base_src;
496                 base_dst--;
497                 base_src--;
498             }
499     
500             decompressChar(orig+slot->compressStart+2*sizeof(int), decompressData, slot->compressSize, compressSize, history_save+slot->compressStart);
501             memcpy(orig+slot->compressStart, decompressData, slot->compressSize);
502             free(decompressData);
503             CldRestoreHandler(orig);
504             (((CmiMsgHeaderExt*)orig)->xhdl) =  (((CmiMsgHeaderExt*)orig)->xxhdl);
505             if(memcmp(orig, history, slot->compressStart))
506                 CmiPrintf("sth wrong header all \n");
507             if(memcmp(orig+slot->compressStart, history+slot->compressStart, slot->compressSize))
508                 CmiPrintf("sth wrong data \n");
509             newSize += 2;
510 #endif
511         }
512         free(dest);
513     }
515     return newSize;
518 #endif
519 #else
520 #endif
522 /* for SMP */
523 PersistentHandle CmiCreateNodePersistent(int destNode, int maxBytes)
525     /* randomly pick one rank on the destination node is fine for setup.
526        actual message will be handled by comm thread anyway */
527   int pe = CmiNodeFirst(destNode) + rand()/RAND_MAX * CmiMyNodeSize();
528   return CmiCreatePersistent(pe, maxBytes);
530 PersistentHandle CmiCreateCompressNodePersistent(int destNode, int maxBytes, int start, int type)
532     /* randomly pick one rank on the destination node is fine for setup.
533        actual message will be handled by comm thread anyway */
534   int pe = CmiNodeFirst(destNode) + rand()/RAND_MAX * CmiMyNodeSize();
535   return CmiCreateCompressPersistent(pe, maxBytes, start, type);
538 PersistentHandle CmiCreateCompressNodePersistentSize(int destNode, int maxBytes, int start, int compressSize, int type)
540     /* randomly pick one rank on the destination node is fine for setup.
541        actual message will be handled by comm thread anyway */
542   int pe = CmiNodeFirst(destNode) + rand()/RAND_MAX * CmiMyNodeSize();
543   return CmiCreateCompressPersistentSize(pe, maxBytes, start, compressSize, type);
547 static void persistentRequestHandler(void *env)
548 {             
549   PersistentRequestMsg *msg = (PersistentRequestMsg *)env;
550   char *buf;
551   int i;
553   PersistentHandle h = getFreeRecvSlot();
554   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
556   /* build reply message */
557   PersistentReqGrantedMsg *gmsg = CmiAlloc(sizeof(PersistentReqGrantedMsg));
559 #if DELTA_COMPRESS
560   slot->compressStart = msg->compressStart;
561   slot->dataType = msg->dataType;
562 #if COPY_HISTORY
563   slot->history = malloc(msg->maxBytes);
564 #endif
565 #endif
566   setupRecvSlot(slot, msg->maxBytes);
568   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
569 #if 0
570     gmsg->msgAddr[i] = slot->messagePtr[i];
571     gmsg->slotFlagAddress[i] = slot->recvSizePtr[i];
572 #else
573     gmsg->buf[i] = slot->destBuf[i];
574 #endif
575   }
577   gmsg->sourceHandler = msg->sourceHandler;
578   gmsg->destHandler = getPersistentHandle(h, 1);
579 #if  DELTA_COMPRESS
580   gmsg->destDataHandler = h;
581   //CmiPrintf("[%d] receiver slot=%p, current=%d, h=%p  =%p \n", CmiMyPe(), slot, slot->addrIndex, h, gmsg->destDataHandler);
582 #endif
583   CmiSetHandler(gmsg, persistentReqGrantedHandlerIdx);
584   CmiSyncSendAndFree(msg->requestorPE,sizeof(PersistentReqGrantedMsg),gmsg);
586   CmiFree(msg);
589 static void persistentReqGrantedHandler(void *env)
591   int i;
593   PersistentReqGrantedMsg *msg = (PersistentReqGrantedMsg *)env;
594   PersistentHandle h = msg->sourceHandler;
595   PersistentSendsTable *slot = (PersistentSendsTable *)h;
597   /* CmiPrintf("[%d] Persistent handler granted  h:%p\n", CmiMyPe(), h); */
599   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
600 #if 0
601     slot->destAddress[i] = msg->msgAddr[i];
602     slot->destSizeAddress[i] = msg->slotFlagAddress[i];
603 #else
604     slot->destBuf[i] = msg->buf[i];
605 #endif
606   }
607   slot->destHandle = msg->destHandler;
608 #if DELTA_COMPRESS
609   slot->destDataHandle = msg->destDataHandler;
610   //CmiPrintf("+++[%d] req grant %p\n", CmiMyPe(), slot->destDataHandle);
611 #endif
612   if (slot->messageBuf) {
613     LrtsSendPersistentMsg(h, CmiGetNodeGlobal(CmiNodeOf(slot->destPE),CmiMyPartition()), slot->messageSize, slot->messageBuf);
614     slot->messageBuf = NULL;
615   }
616   CmiFree(msg);
620   Another API:
621   receiver initiate the persistent communication
623 PersistentReq CmiCreateReceiverPersistent(int maxBytes)
625     PersistentReq ret;
626     int i;
628   PersistentHandle h = getFreeRecvSlot();
629   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
631   setupRecvSlot(slot, maxBytes);
633   ret.pe = CmiMyPe();
634   ret.maxBytes = maxBytes;
635   ret.myHand = h;
636   ret.bufPtr = (void **)malloc(PERSIST_BUFFERS_NUM*sizeof(void*));
637   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
638 #if 0
639     ret.messagePtr[i] = slot->messagePtr[i];
640     ret.recvSizePtr[i] = slot->recvSizePtr[i];
641 #else
642     ret.bufPtr[i] = malloc(sizeof(PersistentBuf));
643     memcpy(&ret.bufPtr[i], &slot->destBuf[i], sizeof(PersistentBuf));
644 #endif
645   }
647   return ret;
650 PersistentHandle CmiRegisterReceivePersistent(PersistentReq recvHand)
652   int i;
653   PersistentHandle h = getFreeSendSlot();
655   PersistentSendsTable *slot = (PersistentSendsTable *)h;
656   slot->destPE = recvHand.pe;
657   slot->sizeMax = recvHand.maxBytes;
659 #if 0
660   for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
661     slot->destAddress[i] = recvHand.messagePtr[i];
662     slot->destSizeAddress[i] = recvHand.recvSizePtr[i];
663   }
664 #else
665   memcpy(slot->destBuf, recvHand.bufPtr, PERSIST_BUFFERS_NUM*sizeof(PersistentBuf));
666 #endif
667   slot->destHandle = recvHand.myHand;
668   return h;
671 /******************************************************************************
672      destroy Persistent Comm handler
673 ******************************************************************************/
675 /* Converse Handler */
676 void persistentDestroyHandler(void *env)
677 {             
678   int i;
679   PersistentDestroyMsg *msg = (PersistentDestroyMsg *)env;
680   PersistentHandle h = getPersistentHandle(msg->destHandlerIndex, 0);
681   CmiAssert(h!=NULL);
682   CmiFree(msg);
683   PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
685   CpvAccess(persistentReceivesTableCount) --;
686   if (slot->prev) {
687     slot->prev->next = slot->next;
688   }
689   else
690     CpvAccess(persistentReceivesTableHead) = slot->next;
691   if (slot->next) {
692     slot->next->prev = slot->prev;
693   }
694   else
695     CpvAccess(persistentReceivesTableTail) = slot->prev;
697   for (i=0; i<PERSIST_BUFFERS_NUM; i++) 
698     if (slot->destBuf[i].destAddress) 
699       PerFree((char*)slot->destBuf[i].destAddress);
701   clearRecvSlot(slot);
703   free(slot);
706 /* FIXME: need to buffer until ReqGranted message come back? */
707 void CmiDestroyPersistent(PersistentHandle h)
709   if (h == NULL) return;
711   PersistentSendsTable *slot = (PersistentSendsTable *)h;
712   /* CmiAssert(slot->destHandle != 0); */
714   PersistentDestroyMsg *msg = (PersistentDestroyMsg *)
715                               CmiAlloc(sizeof(PersistentDestroyMsg));
716   msg->destHandlerIndex = slot->destHandle;
718   CmiSetHandler(msg, persistentDestroyHandlerIdx);
719   CmiSyncSendAndFree(slot->destPE,sizeof(PersistentDestroyMsg),msg);
721   /* free this slot */
722   if (slot->prev) {
723     slot->prev->next = slot->next;
724   }
725   else
726     CpvAccess(persistentSendsTableHead) = slot->next;
727   if (slot->next) {
728     slot->next->prev = slot->prev;
729   }
730   else
731     CpvAccess(persistentSendsTableTail) = slot->prev;
732   free(slot);
734   CpvAccess(persistentSendsTableCount) --;
738 void CmiDestroyAllPersistent(void)
740   PersistentSendsTable *sendslot = CpvAccess(persistentSendsTableHead);
741   while (sendslot) {
742     PersistentSendsTable *next = sendslot->next;
743     free(sendslot);
744     sendslot = next;
745   }
746   CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = NULL;
747   CpvAccess(persistentSendsTableCount) = 0;
749   PersistentReceivesTable *slot = CpvAccess(persistentReceivesTableHead);
750   while (slot) {
751     PersistentReceivesTable *next = slot->next;
752     int i;
753     for (i=0; i<PERSIST_BUFFERS_NUM; i++)  {
754       //if (slot->destBuf[i].destSizeAddress)
755       //  CmiPrintf("Warning: CmiDestroyAllPersistent destoried buffered undelivered message.\n");
756       if (slot->destBuf[i].destAddress) PerFree((char*)slot->destBuf[i].destAddress);
757     }
758     free(slot);
759     slot = next;
760   }
761   CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
762   CpvAccess(persistentReceivesTableCount) = 0;
766 void CmiPersistentInit(void)
768   int i;
770   persistentRequestHandlerIdx = 
771        CmiRegisterHandler((CmiHandler)persistentRequestHandler);
772   persistentReqGrantedHandlerIdx = 
773        CmiRegisterHandler((CmiHandler)persistentReqGrantedHandler);
774   persistentDestroyHandlerIdx = 
775        CmiRegisterHandler((CmiHandler)persistentDestroyHandler);
777 #if DELTA_COMPRESS
778   persistentDecompressHandlerIdx = 
779       CmiRegisterHandler((CmiHandler)persistentDecompressHandler);
780   persistentNoDecompressHandlerIdx = 
781       CmiRegisterHandler((CmiHandler)persistentNoDecompressHandler);
782 #endif
784   CpvInitialize(PersistentHandle*, phs);
785   CpvAccess(phs) = NULL;
786   CpvInitialize(int, phsSize);
787   CpvInitialize(int, curphs);
788   CpvAccess(curphs) = 0;
790   persist_machine_init();
792   CpvInitialize(PersistentSendsTable *, persistentSendsTableHead);
793   CpvInitialize(PersistentSendsTable *, persistentSendsTableTail);
794   CpvAccess(persistentSendsTableHead) = CpvAccess(persistentSendsTableTail) = NULL;
795   CpvInitialize(int, persistentSendsTableCount);
796   CpvAccess(persistentSendsTableCount) = 0;
798   CpvInitialize(PersistentReceivesTable *, persistentReceivesTableHead);
799   CpvInitialize(PersistentReceivesTable *, persistentReceivesTableTail);
800   CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
801   CpvInitialize(int, persistentReceivesTableCount);
802   CpvAccess(persistentReceivesTableCount) = 0;
805 void CmiUsePersistentHandle(PersistentHandle *p, int n)
807   if (n==1 && *p == NULL) { p = NULL; n = 0; }
808 #if  CMK_ERROR_CHECKING && 0
809   {
810   int i;
811   for (i=0; i<n; i++)
812     if (p[i] == NULL) CmiAbort("CmiUsePersistentHandle: invalid PersistentHandle.\n");
813   }
814 #endif
815   CpvAccess(phs) = p;
816   CpvAccess(phsSize) = n;
817   CpvAccess(curphs) = 0;
820 void CmiPersistentOneSend(void)
822   if (CpvAccess(phs)) CpvAccess(curphs)++;
825 #endif
826 /*@}*/