fix memory registeration bug with persistent message
[charm.git] / src / arch / util / persist-comm.c
blob73f670e4b8f91e5b0379994ea38a00be3b12a99c
1 /** @file
2 * Support for persistent communication setup
3 * @ingroup Machine
4 */
6 /**
7 * \addtogroup Machine
8 */
9 /*@{*/
11 #include "converse.h"
13 #if CMK_PERSISTENT_COMM
15 #include "machine-persistent.h"
17 CpvDeclare(int, TABLESIZE);
19 CpvDeclare(PersistentSendsTable *, persistentSendsTable);
20 CpvDeclare(int, persistentSendsTableCount);
21 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableHead);
22 CpvDeclare(PersistentReceivesTable *, persistentReceivesTableTail);
23 CpvDeclare(int, persistentReceivesTableCount);
25 /* Converse message type */
26 typedef struct _PersistentRequestMsg {
27 char core[CmiMsgHeaderSizeBytes];
28 int requestorPE;
29 int maxBytes;
30 PersistentHandle sourceHandler;
31 } PersistentRequestMsg;
33 typedef struct _PersistentReqGrantedMsg {
34 char core[CmiMsgHeaderSizeBytes];
36 void *msgAddr[PERSIST_BUFFERS_NUM];
37 void *slotFlagAddress[PERSIST_BUFFERS_NUM];
39 PersistentBuf buf[PERSIST_BUFFERS_NUM];
40 PersistentHandle sourceHandler;
41 PersistentHandle destHandler;
42 } PersistentReqGrantedMsg;
44 typedef struct _PersistentDestoryMsg {
45 char core[CmiMsgHeaderSizeBytes];
46 PersistentHandle destHandlerIndex;
47 } PersistentDestoryMsg;
49 /* Converse handler */
50 int persistentRequestHandlerIdx;
51 int persistentReqGrantedHandlerIdx;
52 int persistentDestoryHandlerIdx;
54 CpvDeclare(PersistentHandle *, phs);
55 CpvDeclare(int, phsSize);
56 CpvDeclare(int, curphs);
58 /******************************************************************************
59 Utilities
60 ******************************************************************************/
62 extern void initRecvSlot(PersistentReceivesTable *slot);
63 extern void initSendSlot(PersistentSendsTable *slot);
65 void swapSendSlotBuffers(PersistentSendsTable *slot)
67 if (PERSIST_BUFFERS_NUM == 2) {
68 #if 0
69 void *tmp = slot->destAddress[0];
70 slot->destAddress[0] = slot->destAddress[1];
71 slot->destAddress[1] = tmp;
72 tmp = slot->destSizeAddress[0];
73 slot->destSizeAddress[0] = slot->destSizeAddress[1];
74 slot->destSizeAddress[1] = tmp;
75 #else
76 PersistentBuf tmp = slot->destBuf[0];
77 slot->destBuf[0] = slot->destBuf[1];
78 slot->destBuf[1] = tmp;
79 #endif
83 void swapRecvSlotBuffers(PersistentReceivesTable *slot)
85 if (PERSIST_BUFFERS_NUM == 2) {
86 #if 0
87 void *tmp = slot->messagePtr[0];
88 slot->messagePtr[0] = slot->messagePtr[1];
89 slot->messagePtr[1] = tmp;
90 tmp = slot->recvSizePtr[0];
91 slot->recvSizePtr[0] = slot->recvSizePtr[1];
92 slot->recvSizePtr[1] = tmp;
93 #else
94 PersistentBuf tmp = slot->destBuf[0];
95 slot->destBuf[0] = slot->destBuf[1];
96 slot->destBuf[1] = tmp;
97 #endif
101 PersistentHandle getFreeSendSlot()
103 int i;
104 if (CpvAccess(persistentSendsTableCount) == CpvAccess(TABLESIZE)) {
105 CmiAbort("Charm++> too many persistent channels on sender.");
107 CpvAccess(persistentSendsTableCount)++;
108 for (i=1; i<CpvAccess(TABLESIZE); i++)
109 if (CpvAccess(persistentSendsTable)[i].used == 0) break;
110 return &CpvAccess(persistentSendsTable)[i];
113 PersistentHandle getFreeRecvSlot()
115 PersistentReceivesTable *slot = (PersistentReceivesTable *)CmiAlloc(sizeof(PersistentReceivesTable));
116 initRecvSlot(slot);
117 if (CpvAccess(persistentReceivesTableHead) == NULL) {
118 CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = slot;
120 else {
121 CpvAccess(persistentReceivesTableTail)->next = slot;
122 slot->prev = CpvAccess(persistentReceivesTableTail);
123 CpvAccess(persistentReceivesTableTail) = slot;
125 CpvAccess(persistentReceivesTableCount)++;
126 return slot;
129 /******************************************************************************
130 Create Persistent Comm handler
131 When creating a persistent comm with destPE and maxSize
132 1. allocate a free PersistentSendsTable entry, send a
133 PersistentRequestMsg message to destPE
134 buffer persistent message before Persistent Comm is setup;
135 2. destPE execute Converse handler persistentRequestHandler() on the
136 PersistentRequestMsg message:
137 allocate a free PersistentReceivesTable entry;
138 allocate a message buffer of size maxSize for the communication;
139 Send back a PersistentReqGrantedMsg with message address, etc for
140 elan_put;
141 3. Converse handler persistentReqGrantedHandler() executed on
142 sender for the PersistentReqGrantedMsg. setup finish, send buffered
143 message.
144 ******************************************************************************/
146 PersistentHandle CmiCreatePersistent(int destPE, int maxBytes)
148 PersistentHandle h = getFreeSendSlot();
150 PersistentSendsTable *slot = (PersistentSendsTable *)h;
152 if (CmiMyPe() == destPE) {
153 CmiAbort("CmiCreatePersistent Error: setting up persistent communication to the same processor is not allowed.");
156 slot->used = 1;
157 slot->destPE = destPE;
158 slot->sizeMax = maxBytes;
160 PersistentRequestMsg *msg = (PersistentRequestMsg *)CmiAlloc(sizeof(PersistentRequestMsg));
161 msg->maxBytes = maxBytes;
162 msg->sourceHandler = h;
163 msg->requestorPE = CmiMyPe();
165 CmiSetHandler(msg, persistentRequestHandlerIdx);
166 CmiSyncSendAndFree(destPE,sizeof(PersistentRequestMsg),msg);
168 return h;
171 static void persistentRequestHandler(void *env)
173 PersistentRequestMsg *msg = (PersistentRequestMsg *)env;
174 char *buf;
175 int i;
177 PersistentHandle h = getFreeRecvSlot();
178 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
179 /*slot->messagePtr = elan_CmiStaticAlloc(msg->maxBytes);*/
181 /* build reply message */
182 PersistentReqGrantedMsg *gmsg = CmiAlloc(sizeof(PersistentReqGrantedMsg));
184 setupRecvSlot(slot, msg->maxBytes);
186 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
187 #if 0
188 gmsg->msgAddr[i] = slot->messagePtr[i];
189 gmsg->slotFlagAddress[i] = slot->recvSizePtr[i];
190 #else
191 gmsg->buf[i] = slot->destBuf[i];
192 #endif
195 gmsg->sourceHandler = msg->sourceHandler;
196 gmsg->destHandler = getPersistentHandle(h, 1);
198 CmiSetHandler(gmsg, persistentReqGrantedHandlerIdx);
199 CmiSyncSendAndFree(msg->requestorPE,sizeof(PersistentReqGrantedMsg),gmsg);
201 CmiFree(msg);
204 static void persistentReqGrantedHandler(void *env)
206 int i;
208 PersistentReqGrantedMsg *msg = (PersistentReqGrantedMsg *)env;
209 PersistentHandle h = msg->sourceHandler;
210 PersistentSendsTable *slot = (PersistentSendsTable *)h;
212 /* CmiPrintf("[%d] Persistent handler granted h:%p\n", CmiMyPe(), h); */
214 CmiAssert(slot->used == 1);
217 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
218 #if 0
219 slot->destAddress[i] = msg->msgAddr[i];
220 slot->destSizeAddress[i] = msg->slotFlagAddress[i];
221 #else
222 slot->destBuf[i] = msg->buf[i];
223 #endif
225 slot->destHandle = msg->destHandler;
227 if (slot->messageBuf) {
228 LrtsSendPersistentMsg(h, CmiNodeOf(slot->destPE), slot->messageSize, slot->messageBuf);
229 slot->messageBuf = NULL;
231 CmiFree(msg);
235 Another API:
236 receiver initiate the persistent communication
238 PersistentReq CmiCreateReceiverPersistent(int maxBytes)
240 PersistentReq ret;
241 int i;
243 PersistentHandle h = getFreeRecvSlot();
244 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
246 setupRecvSlot(slot, maxBytes);
248 ret.pe = CmiMyPe();
249 ret.maxBytes = maxBytes;
250 ret.myHand = h;
251 ret.bufPtr = (void **)malloc(PERSIST_BUFFERS_NUM*sizeof(void*));
252 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
253 #if 0
254 ret.messagePtr[i] = slot->messagePtr[i];
255 ret.recvSizePtr[i] = slot->recvSizePtr[i];
256 #else
257 ret.bufPtr[i] = malloc(sizeof(PersistentBuf));
258 memcpy(&ret.bufPtr[i], &slot->destBuf[i], sizeof(PersistentBuf));
259 #endif
262 return ret;
265 PersistentHandle CmiRegisterReceivePersistent(PersistentReq recvHand)
267 int i;
268 PersistentHandle h = getFreeSendSlot();
270 PersistentSendsTable *slot = (PersistentSendsTable *)h;
271 slot->used = 1;
272 slot->destPE = recvHand.pe;
273 slot->sizeMax = recvHand.maxBytes;
275 #if 0
276 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
277 slot->destAddress[i] = recvHand.messagePtr[i];
278 slot->destSizeAddress[i] = recvHand.recvSizePtr[i];
280 #else
281 memcpy(slot->destBuf, recvHand.bufPtr, PERSIST_BUFFERS_NUM*sizeof(PersistentBuf));
282 #endif
283 slot->destHandle = recvHand.myHand;
284 return h;
287 /******************************************************************************
288 destory Persistent Comm handler
289 ******************************************************************************/
291 /* Converse Handler */
292 void persistentDestoryHandler(void *env)
294 int i;
295 PersistentDestoryMsg *msg = (PersistentDestoryMsg *)env;
296 PersistentHandle h = getPersistentHandle(msg->destHandlerIndex, 0);
297 CmiAssert(h!=NULL);
298 CmiFree(msg);
299 PersistentReceivesTable *slot = (PersistentReceivesTable *)h;
301 CpvAccess(persistentReceivesTableCount) --;
302 if (slot->prev) {
303 slot->prev->next = slot->next;
305 else
306 CpvAccess(persistentReceivesTableHead) = slot->next;
307 if (slot->next) {
308 slot->next->prev = slot->prev;
310 else
311 CpvAccess(persistentReceivesTableTail) = slot->prev;
313 for (i=0; i<PERSIST_BUFFERS_NUM; i++)
314 if (slot->destBuf[i].destAddress) /*elan_CmiStaticFree(slot->messagePtr);*/
315 PerFree((char*)slot->destBuf[i].destAddress);
317 CmiFree(slot);
320 /* FIXME: need to buffer until ReqGranted message come back? */
321 void CmiDestoryPersistent(PersistentHandle h)
323 if (h == 0) CmiAbort("CmiDestoryPersistent: not a valid PersistentHandle\n");
325 PersistentSendsTable *slot = (PersistentSendsTable *)h;
326 //CmiAssert(slot->destHandle != 0);
328 PersistentDestoryMsg *msg = (PersistentDestoryMsg *)
329 CmiAlloc(sizeof(PersistentDestoryMsg));
330 msg->destHandlerIndex = slot->destHandle;
332 CmiSetHandler(msg, persistentDestoryHandlerIdx);
333 CmiSyncSendAndFree(slot->destPE,sizeof(PersistentDestoryMsg),msg);
335 /* free this slot */
336 initSendSlot(slot);
338 CpvAccess(persistentSendsTableCount) --;
342 void CmiDestoryAllPersistent()
344 int i;
345 for (i=0; i<CpvAccess(TABLESIZE); i++) {
346 if (CpvAccess(persistentSendsTable)[i].messageBuf)
347 CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered unsend message.\n");
348 initSendSlot(&CpvAccess(persistentSendsTable)[i]);
350 CpvAccess(persistentSendsTableCount) = 0;
352 PersistentReceivesTable *slot = CpvAccess(persistentReceivesTableHead);
353 while (slot) {
354 PersistentReceivesTable *next = slot->next;
355 int i;
356 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
357 if (slot->destBuf[i].destSizeAddress)
358 CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered undelivered message.\n");
359 if (slot->destBuf[i].destAddress) PerFree((char*)slot->destBuf[i].destAddress);
361 CmiFree(slot);
362 slot = next;
364 CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
365 CpvAccess(persistentReceivesTableCount) = 0;
368 void CmiPersistentInit()
370 int i;
372 persistentRequestHandlerIdx =
373 CmiRegisterHandler((CmiHandler)persistentRequestHandler);
374 persistentReqGrantedHandlerIdx =
375 CmiRegisterHandler((CmiHandler)persistentReqGrantedHandler);
376 persistentDestoryHandlerIdx =
377 CmiRegisterHandler((CmiHandler)persistentDestoryHandler);
380 CpvInitialize(PersistentHandle*, phs);
381 CpvAccess(phs) = NULL;
382 CpvInitialize(int, phsSize);
383 CpvInitialize(int, curphs);
384 CpvAccess(curphs) = 0;
386 persist_machine_init();
388 CpvInitialize(int, TABLESIZE);
389 CpvAccess(TABLESIZE) = 512;
391 CpvInitialize(PersistentSendsTable *, persistentSendsTable);
392 CpvAccess(persistentSendsTable) = (PersistentSendsTable *)malloc(CpvAccess(TABLESIZE) * sizeof(PersistentSendsTable));
393 for (i=0; i<CpvAccess(TABLESIZE); i++) {
394 initSendSlot(&CpvAccess(persistentSendsTable)[i]);
396 CpvInitialize(int, persistentSendsTableCount);
397 CpvAccess(persistentSendsTableCount) = 0;
399 CpvInitialize(PersistentReceivesTable *, persistentReceivesTableHead);
400 CpvInitialize(PersistentReceivesTable *, persistentReceivesTableTail);
401 CpvAccess(persistentReceivesTableHead) = CpvAccess(persistentReceivesTableTail) = NULL;
402 CpvInitialize(int, persistentReceivesTableCount);
403 CpvAccess(persistentReceivesTableCount) = 0;
407 void CmiUsePersistentHandle(PersistentHandle *p, int n)
409 if (n==1 && *p == NULL) { p = NULL; n = 0; }
410 #if CMK_ERROR_CHECKING
412 int i;
413 for (i=0; i<n; i++)
414 if (p[i] == NULL) CmiAbort("CmiUsePersistentHandle: invalid PersistentHandle.\n");
416 #endif
417 CpvAccess(phs) = p;
418 CpvAccess(phsSize) = n;
419 CpvAccess(curphs) = 0;
422 #endif
424 /*@}*/