2 * Support for persistent communication setup
13 #if CMK_PERSISTENT_COMM
15 #include "machine-persistent.h"
/* Cpv variables (declared with CpvDeclare, accessed through CpvAccess) that
 * hold the persistent-channel bookkeeping used throughout this file:
 *   TABLESIZE                       - number of entries in the send table
 *                                     (set to 512 in CmiPersistentInit()).
 *   persistentSendsTable / ...Count - malloc'd array of send slots and the
 *                                     number of slots handed out.
 *   persistentReceivesTableHead/Tail/Count - doubly linked list of receive
 *                                     slots (linked via next/prev) and its
 *                                     length. */
17 CpvDeclare(int, TABLESIZE
);
19 CpvDeclare(PersistentSendsTable
*, persistentSendsTable
);
20 CpvDeclare(int, persistentSendsTableCount
);
21 CpvDeclare(PersistentReceivesTable
*, persistentReceivesTableHead
);
22 CpvDeclare(PersistentReceivesTable
*, persistentReceivesTableTail
);
23 CpvDeclare(int, persistentReceivesTableCount
);
25 /* Converse message type */
/* Channel-setup request: built and sent by CmiCreatePersistent(), consumed by
 * persistentRequestHandler() on the destination PE.
 *   core          - Converse message header (CmiMsgHeaderSizeBytes bytes).
 *   sourceHandler - the sender's send-slot handle, echoed back in the reply.
 * NOTE(review): CmiCreatePersistent() also assigns msg->maxBytes and
 * msg->requestorPE; those members are not visible in this listing - confirm
 * the complete member list against the original file. */
26 typedef struct _PersistentRequestMsg
{
27 char core
[CmiMsgHeaderSizeBytes
];
30 PersistentHandle sourceHandler
;
31 } PersistentRequestMsg
;
/* Reply to a PersistentRequestMsg: filled in by persistentRequestHandler()
 * and read by persistentReqGrantedHandler() on the requesting PE.
 *   core            - Converse message header.
 *   msgAddr         - receiver's slot->messagePtr[i] for each buffer.
 *   slotFlagAddress - receiver's slot->recvSizePtr[i] for each buffer.
 *   buf             - receiver's slot->destBuf[i] for each buffer.
 *   sourceHandler   - send-slot handle copied from the request.
 *   destHandler     - receiver-side handle (from getPersistentHandle(h, 1)).
 * NOTE(review): msgAddr/slotFlagAddress and buf look like alternative
 * representations; whether all three are compiled in cannot be told from the
 * lines visible here. */
33 typedef struct _PersistentReqGrantedMsg
{
34 char core
[CmiMsgHeaderSizeBytes
];
36 void *msgAddr[PERSIST_BUFFERS_NUM];
37 void *slotFlagAddress[PERSIST_BUFFERS_NUM];
39 PersistentBuf buf
[PERSIST_BUFFERS_NUM
];
40 PersistentHandle sourceHandler
;
41 PersistentHandle destHandler
;
42 } PersistentReqGrantedMsg
;
/* Tear-down request: sent by CmiDestoryPersistent() to the channel's
 * destination PE and handled there by persistentDestoryHandler().
 *   core             - Converse message header.
 *   destHandlerIndex - the sender's slot->destHandle, resolved on the receiver
 *                      with getPersistentHandle(..., 0). */
44 typedef struct _PersistentDestoryMsg
{
45 char core
[CmiMsgHeaderSizeBytes
];
46 PersistentHandle destHandlerIndex
;
47 } PersistentDestoryMsg
;
49 /* Converse handler */
/* Converse handler indices for the three message types above; each is
 * assigned from CmiRegisterHandler() in CmiPersistentInit() and passed to
 * CmiSetHandler() before the corresponding message is sent. */
50 int persistentRequestHandlerIdx
;
51 int persistentReqGrantedHandlerIdx
;
52 int persistentDestoryHandlerIdx
;
/* Currently selected persistent handles: phs is reset to NULL and curphs to 0
 * in CmiPersistentInit(); CmiUsePersistentHandle() stores the handle count in
 * phsSize and resets curphs to 0. */
54 CpvDeclare(PersistentHandle
*, phs
);
55 CpvDeclare(int, phsSize
);
56 CpvDeclare(int, curphs
);
58 /******************************************************************************
60 ******************************************************************************/
/* Slot initialisers defined outside this file. initSendSlot() is applied to
 * every send-table entry in CmiPersistentInit() and CmiDestoryAllPersistent();
 * no call to initRecvSlot() is visible in this listing. */
62 extern void initRecvSlot(PersistentReceivesTable
*slot
);
63 extern void initSendSlot(PersistentSendsTable
*slot
);
/* Exchange buffer 0 and buffer 1 of a send slot. The body only acts when
 * PERSIST_BUFFERS_NUM == 2.
 * NOTE(review): `tmp` is declared twice below (first as void *, then as
 * PersistentBuf). These are presumably two alternatives selected by a
 * preprocessor conditional whose directives are not visible in this listing -
 * confirm which variant is compiled. */
65 void swapSendSlotBuffers(PersistentSendsTable
*slot
)
67 if (PERSIST_BUFFERS_NUM
== 2) {
/* Variant 1: swap the raw destination addresses and size addresses. */
69 void *tmp
= slot
->destAddress
[0];
70 slot
->destAddress
[0] = slot
->destAddress
[1];
71 slot
->destAddress
[1] = tmp
;
72 tmp
= slot
->destSizeAddress
[0];
73 slot
->destSizeAddress
[0] = slot
->destSizeAddress
[1];
74 slot
->destSizeAddress
[1] = tmp
;
/* Variant 2: swap the two PersistentBuf descriptors as whole structs. */
76 PersistentBuf tmp
= slot
->destBuf
[0];
77 slot
->destBuf
[0] = slot
->destBuf
[1];
78 slot
->destBuf
[1] = tmp
;
/* Exchange buffer 0 and buffer 1 of a receive slot. The body only acts when
 * PERSIST_BUFFERS_NUM == 2.
 * NOTE(review): as in swapSendSlotBuffers(), `tmp` is declared twice with
 * different types; the two sequences are presumably alternatives under a
 * preprocessor conditional that is not visible here - confirm. */
83 void swapRecvSlotBuffers(PersistentReceivesTable
*slot
)
85 if (PERSIST_BUFFERS_NUM
== 2) {
/* Variant 1: swap the message pointers and the received-size pointers. */
87 void *tmp
= slot
->messagePtr
[0];
88 slot
->messagePtr
[0] = slot
->messagePtr
[1];
89 slot
->messagePtr
[1] = tmp
;
90 tmp
= slot
->recvSizePtr
[0];
91 slot
->recvSizePtr
[0] = slot
->recvSizePtr
[1];
92 slot
->recvSizePtr
[1] = tmp
;
/* Variant 2: swap the two PersistentBuf descriptors as whole structs. */
94 PersistentBuf tmp
= slot
->destBuf
[0];
95 slot
->destBuf
[0] = slot
->destBuf
[1];
96 slot
->destBuf
[1] = tmp
;
101 PersistentHandle
getFreeSendSlot()
104 if (CpvAccess(persistentSendsTableCount
) == CpvAccess(TABLESIZE
)) {
105 CmiAbort("Charm++> too many persistent channels on sender.");
107 CpvAccess(persistentSendsTableCount
)++;
108 for (i
=1; i
<CpvAccess(TABLESIZE
); i
++)
109 if (CpvAccess(persistentSendsTable
)[i
].used
== 0) break;
110 return &CpvAccess(persistentSendsTable
)[i
];
/* Allocate a new receive slot with CmiAlloc() and link it into the receive
 * list, then increment persistentReceivesTableCount.
 * NOTE(review): the `else` separating the empty-list case from the append
 * case, any initialisation of slot->next/prev, and the return statement are
 * not visible in this listing - confirm against the original file. */
113 PersistentHandle
getFreeRecvSlot()
115 PersistentReceivesTable
*slot
= (PersistentReceivesTable
*)CmiAlloc(sizeof(PersistentReceivesTable
));
/* Empty list: the new slot becomes both head and tail. */
117 if (CpvAccess(persistentReceivesTableHead
) == NULL
) {
118 CpvAccess(persistentReceivesTableHead
) = CpvAccess(persistentReceivesTableTail
) = slot
;
/* Otherwise append after the current tail and make the slot the new tail. */
121 CpvAccess(persistentReceivesTableTail
)->next
= slot
;
122 slot
->prev
= CpvAccess(persistentReceivesTableTail
);
123 CpvAccess(persistentReceivesTableTail
) = slot
;
125 CpvAccess(persistentReceivesTableCount
)++;
129 /******************************************************************************
130 Create Persistent Comm handler
131 When creating a persistent comm with destPE and maxSize
132 1. allocate a free PersistentSendsTable entry, send a
133 PersistentRequestMsg message to destPE
134 buffer persistent message before Persistent Comm is setup;
135 2. destPE executes the Converse handler persistentRequestHandler() on the
136 PersistentRequestMsg message:
137 allocate a free PersistentReceivesTable entry;
138 allocate a message buffer of size maxSize for the communication;
139 Send back a PersistentReqGrantedMsg with message address, etc for
141 3. Converse handler persistentReqGrantedHandler() executed on
142 sender for the PersistentReqGrantedMsg. setup finish, send buffered
144 ******************************************************************************/
/* Sender-initiated channel setup.
 * Takes a send slot, records destPE and maxBytes in it, and sends a
 * PersistentRequestMsg (maxBytes, the slot handle and the calling PE) to
 * destPE with persistentRequestHandlerIdx as its handler.
 * Aborts via CmiAbort() when destPE is the calling PE; note that the send slot
 * has already been taken from getFreeSendSlot() at that point.
 * NOTE(review): the statement that marks the slot as used and the return
 * statement are not visible in this listing. */
146 PersistentHandle
CmiCreatePersistent(int destPE
, int maxBytes
)
148 PersistentHandle h
= getFreeSendSlot();
150 PersistentSendsTable
*slot
= (PersistentSendsTable
*)h
;
152 if (CmiMyPe() == destPE
) {
153 CmiAbort("CmiCreatePersistent Error: setting up persistent communication to the same processor is not allowed.");
157 slot
->destPE
= destPE
;
158 slot
->sizeMax
= maxBytes
;
/* Build the request; sourceHandler lets the reply find this slot again. */
160 PersistentRequestMsg
*msg
= (PersistentRequestMsg
*)CmiAlloc(sizeof(PersistentRequestMsg
));
161 msg
->maxBytes
= maxBytes
;
162 msg
->sourceHandler
= h
;
163 msg
->requestorPE
= CmiMyPe();
165 CmiSetHandler(msg
, persistentRequestHandlerIdx
);
166 CmiSyncSendAndFree(destPE
,sizeof(PersistentRequestMsg
),msg
);
/* Converse handler for PersistentRequestMsg (registered in
 * CmiPersistentInit()). Obtains a receive slot, prepares it with
 * setupRecvSlot(slot, msg->maxBytes), copies the slot's per-buffer fields
 * into a PersistentReqGrantedMsg and sends that reply to msg->requestorPE.
 * NOTE(review): the declaration of `i`, any freeing of the incoming message,
 * and the conditional structure around the per-buffer copies are not visible
 * in this listing. */
171 static void persistentRequestHandler(void *env
)
173 PersistentRequestMsg
*msg
= (PersistentRequestMsg
*)env
;
177 PersistentHandle h
= getFreeRecvSlot();
178 PersistentReceivesTable
*slot
= (PersistentReceivesTable
*)h
;
179 /*slot->messagePtr = elan_CmiStaticAlloc(msg->maxBytes);*/
181 /* build reply message */
182 PersistentReqGrantedMsg
*gmsg
= CmiAlloc(sizeof(PersistentReqGrantedMsg
));
184 setupRecvSlot(slot
, msg
->maxBytes
);
/* Copy each buffer's fields from the receive slot into the reply. */
186 for (i
=0; i
<PERSIST_BUFFERS_NUM
; i
++) {
188 gmsg
->msgAddr
[i
] = slot
->messagePtr
[i
];
189 gmsg
->slotFlagAddress
[i
] = slot
->recvSizePtr
[i
];
191 gmsg
->buf
[i
] = slot
->destBuf
[i
];
/* Echo the sender's handle and attach the receiver-side handle. */
195 gmsg
->sourceHandler
= msg
->sourceHandler
;
196 gmsg
->destHandler
= getPersistentHandle(h
, 1);
198 CmiSetHandler(gmsg
, persistentReqGrantedHandlerIdx
);
199 CmiSyncSendAndFree(msg
->requestorPE
,sizeof(PersistentReqGrantedMsg
),gmsg
);
/* Converse handler for PersistentReqGrantedMsg (registered in
 * CmiPersistentInit()). Looks up the send slot named by msg->sourceHandler,
 * copies the receiver's per-buffer fields and handle into it, and, if a
 * message was buffered in slot->messageBuf, sends it with
 * LrtsSendPersistentMsg() and clears messageBuf.
 * NOTE(review): the declaration of `i`, closing braces, and any freeing of
 * the incoming message are not visible in this listing. */
204 static void persistentReqGrantedHandler(void *env
)
208 PersistentReqGrantedMsg
*msg
= (PersistentReqGrantedMsg
*)env
;
209 PersistentHandle h
= msg
->sourceHandler
;
210 PersistentSendsTable
*slot
= (PersistentSendsTable
*)h
;
212 /* CmiPrintf("[%d] Persistent handler granted h:%p\n", CmiMyPe(), h); */
/* The slot must still be marked as in use when the grant arrives. */
214 CmiAssert(slot
->used
== 1);
217 for (i
=0; i
<PERSIST_BUFFERS_NUM
; i
++) {
219 slot
->destAddress
[i
] = msg
->msgAddr
[i
];
220 slot
->destSizeAddress
[i
] = msg
->slotFlagAddress
[i
];
222 slot
->destBuf
[i
] = msg
->buf
[i
];
225 slot
->destHandle
= msg
->destHandler
;
/* Flush the message that was held back while the channel was being set up. */
227 if (slot
->messageBuf
) {
228 LrtsSendPersistentMsg(h
, CmiNodeOf(slot
->destPE
), slot
->messageSize
, slot
->messageBuf
);
229 slot
->messageBuf
= NULL
;
236 receiver initiates the persistent communication
/* Receiver-initiated channel setup.
 * Obtains a receive slot, prepares it with setupRecvSlot(slot, maxBytes) and
 * fills a PersistentReq (`ret`) describing it.
 * NOTE(review): the declarations of `ret` and `i`, any other `ret` members
 * set here, the conditional structure inside the loop, and the return
 * statement are not visible in this listing. */
238 PersistentReq
CmiCreateReceiverPersistent(int maxBytes
)
243 PersistentHandle h
= getFreeRecvSlot();
244 PersistentReceivesTable
*slot
= (PersistentReceivesTable
*)h
;
246 setupRecvSlot(slot
, maxBytes
);
249 ret
.maxBytes
= maxBytes
;
/* bufPtr is allocated as an array of PERSIST_BUFFERS_NUM pointers;
 * the malloc() results here and below are not checked. */
251 ret
.bufPtr
= (void **)malloc(PERSIST_BUFFERS_NUM
*sizeof(void*));
252 for (i
=0; i
<PERSIST_BUFFERS_NUM
; i
++) {
254 ret
.messagePtr
[i
] = slot
->messagePtr
[i
];
255 ret
.recvSizePtr
[i
] = slot
->recvSizePtr
[i
];
257 ret
.bufPtr
[i
] = malloc(sizeof(PersistentBuf
));
/* NOTE(review): the destination below is &ret.bufPtr[i] (the address of the
 * i-th pointer element), not ret.bufPtr[i] (the block just allocated). As
 * written, sizeof(PersistentBuf) bytes are copied over the pointer array,
 * which overwrites the pointer just stored and can run past the array if
 * PersistentBuf is larger than a pointer. CmiRegisterReceivePersistent()
 * reads bufPtr back as a contiguous PersistentBuf array, so any fix has to
 * change both functions together - confirm the intended layout. */
258 memcpy(&ret
.bufPtr
[i
], &slot
->destBuf
[i
], sizeof(PersistentBuf
));
/* Sender side of a receiver-initiated channel: takes a send slot and fills
 * it from the PersistentReq produced by CmiCreateReceiverPersistent().
 * NOTE(review): the declaration of `i`, the statement marking the slot as
 * used, the conditional structure around the two copy styles, and the return
 * statement are not visible in this listing. */
265 PersistentHandle
CmiRegisterReceivePersistent(PersistentReq recvHand
)
268 PersistentHandle h
= getFreeSendSlot();
270 PersistentSendsTable
*slot
= (PersistentSendsTable
*)h
;
272 slot
->destPE
= recvHand
.pe
;
273 slot
->sizeMax
= recvHand
.maxBytes
;
276 for (i
=0; i
<PERSIST_BUFFERS_NUM
; i
++) {
277 slot
->destAddress
[i
] = recvHand
.messagePtr
[i
];
278 slot
->destSizeAddress
[i
] = recvHand
.recvSizePtr
[i
];
/* NOTE(review): this reads PERSIST_BUFFERS_NUM*sizeof(PersistentBuf) bytes
 * starting at recvHand.bufPtr, i.e. it treats bufPtr as a contiguous array of
 * PersistentBuf. CmiCreateReceiverPersistent() allocates bufPtr as an array
 * of PERSIST_BUFFERS_NUM void* - confirm the two agree on the layout. */
281 memcpy(slot
->destBuf
, recvHand
.bufPtr
, PERSIST_BUFFERS_NUM
*sizeof(PersistentBuf
));
283 slot
->destHandle
= recvHand
.myHand
;
287 /******************************************************************************
288 destroy Persistent Comm handler
289 ******************************************************************************/
291 /* Converse Handler */
/* Converse handler for PersistentDestoryMsg (registered in
 * CmiPersistentInit()). Resolves the receive slot from
 * msg->destHandlerIndex, decrements the receive count, unlinks the slot from
 * the receive list and releases each non-NULL destBuf[i].destAddress with
 * PerFree().
 * NOTE(review): the declaration of `i`, the conditions that choose between
 * the prev/head and next/tail updates, and any freeing of the slot or of the
 * message are not visible in this listing. */
292 void persistentDestoryHandler(void *env
)
295 PersistentDestoryMsg
*msg
= (PersistentDestoryMsg
*)env
;
296 PersistentHandle h
= getPersistentHandle(msg
->destHandlerIndex
, 0);
299 PersistentReceivesTable
*slot
= (PersistentReceivesTable
*)h
;
301 CpvAccess(persistentReceivesTableCount
) --;
/* Unlink on the predecessor side: either patch prev->next or move the head. */
303 slot
->prev
->next
= slot
->next
;
306 CpvAccess(persistentReceivesTableHead
) = slot
->next
;
/* Unlink on the successor side: either patch next->prev or move the tail. */
308 slot
->next
->prev
= slot
->prev
;
311 CpvAccess(persistentReceivesTableTail
) = slot
->prev
;
313 for (i
=0; i
<PERSIST_BUFFERS_NUM
; i
++)
314 if (slot
->destBuf
[i
].destAddress
) /*elan_CmiStaticFree(slot->messagePtr);*/
315 PerFree((char*)slot
->destBuf
[i
].destAddress
);
320 /* FIXME: need to buffer until the ReqGranted message comes back? */
/* Tear down one sender-side channel.
 * Aborts via CmiAbort() when h is 0. Otherwise sends a PersistentDestoryMsg
 * carrying slot->destHandle to slot->destPE and decrements
 * persistentSendsTableCount.
 * slot->destHandle is sent as-is; the assertion that it is non-zero is
 * commented out below.
 * NOTE(review): any reset of the send slot itself (lines between the send and
 * the counter update) is not visible in this listing. */
321 void CmiDestoryPersistent(PersistentHandle h
)
323 if (h
== 0) CmiAbort("CmiDestoryPersistent: not a valid PersistentHandle\n");
325 PersistentSendsTable
*slot
= (PersistentSendsTable
*)h
;
326 //CmiAssert(slot->destHandle != 0);
328 PersistentDestoryMsg
*msg
= (PersistentDestoryMsg
*)
329 CmiAlloc(sizeof(PersistentDestoryMsg
));
330 msg
->destHandlerIndex
= slot
->destHandle
;
332 CmiSetHandler(msg
, persistentDestoryHandlerIdx
);
333 CmiSyncSendAndFree(slot
->destPE
,sizeof(PersistentDestoryMsg
),msg
);
338 CpvAccess(persistentSendsTableCount
) --;
/* Reset all persistent-channel state held by the caller.
 * Send side: every table entry (index 0 included) is re-initialised with
 * initSendSlot(), with a warning if it still held a buffered message, and the
 * send count is zeroed.
 * Receive side: the slots' destination buffers are released with PerFree(),
 * with a warning for each buffer whose destSizeAddress is non-NULL, and
 * head, tail and count are reset.
 * NOTE(review): the declaration of `i`, the loop that walks the receive list
 * (only `slot = head` and `next = slot->next` are visible), and any freeing
 * of the slot structures are not visible in this listing. */
342 void CmiDestoryAllPersistent()
345 for (i
=0; i
<CpvAccess(TABLESIZE
); i
++) {
346 if (CpvAccess(persistentSendsTable
)[i
].messageBuf
)
347 CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered unsend message.\n");
348 initSendSlot(&CpvAccess(persistentSendsTable
)[i
]);
350 CpvAccess(persistentSendsTableCount
) = 0;
352 PersistentReceivesTable
*slot
= CpvAccess(persistentReceivesTableHead
);
/* Remember the successor before this slot's buffers are released. */
354 PersistentReceivesTable
*next
= slot
->next
;
356 for (i
=0; i
<PERSIST_BUFFERS_NUM
; i
++) {
357 if (slot
->destBuf
[i
].destSizeAddress
)
358 CmiPrintf("Warning: CmiDestoryAllPersistent destoried buffered undelivered message.\n");
359 if (slot
->destBuf
[i
].destAddress
) PerFree((char*)slot
->destBuf
[i
].destAddress
);
364 CpvAccess(persistentReceivesTableHead
) = CpvAccess(persistentReceivesTableTail
) = NULL
;
365 CpvAccess(persistentReceivesTableCount
) = 0;
/* One-time initialisation of the persistent-communication layer.
 * Registers the three Converse handlers, initialises the Cpv variables
 * declared in this file, calls persist_machine_init(), allocates the send
 * table (TABLESIZE = 512 entries) and initialises each entry with
 * initSendSlot(), and starts with an empty receive list.
 * NOTE(review): the declaration of `i` and the loop's closing brace are not
 * visible in this listing. */
368 void CmiPersistentInit()
372 persistentRequestHandlerIdx
=
373 CmiRegisterHandler((CmiHandler
)persistentRequestHandler
);
374 persistentReqGrantedHandlerIdx
=
375 CmiRegisterHandler((CmiHandler
)persistentReqGrantedHandler
);
376 persistentDestoryHandlerIdx
=
377 CmiRegisterHandler((CmiHandler
)persistentDestoryHandler
);
/* phsSize is initialised with CpvInitialize but not given a value here. */
380 CpvInitialize(PersistentHandle
*, phs
);
381 CpvAccess(phs
) = NULL
;
382 CpvInitialize(int, phsSize
);
383 CpvInitialize(int, curphs
);
384 CpvAccess(curphs
) = 0;
386 persist_machine_init();
388 CpvInitialize(int, TABLESIZE
);
389 CpvAccess(TABLESIZE
) = 512;
/* The malloc() result is used without a NULL check. */
391 CpvInitialize(PersistentSendsTable
*, persistentSendsTable
);
392 CpvAccess(persistentSendsTable
) = (PersistentSendsTable
*)malloc(CpvAccess(TABLESIZE
) * sizeof(PersistentSendsTable
));
393 for (i
=0; i
<CpvAccess(TABLESIZE
); i
++) {
394 initSendSlot(&CpvAccess(persistentSendsTable
)[i
]);
396 CpvInitialize(int, persistentSendsTableCount
);
397 CpvAccess(persistentSendsTableCount
) = 0;
399 CpvInitialize(PersistentReceivesTable
*, persistentReceivesTableHead
);
400 CpvInitialize(PersistentReceivesTable
*, persistentReceivesTableTail
);
401 CpvAccess(persistentReceivesTableHead
) = CpvAccess(persistentReceivesTableTail
) = NULL
;
402 CpvInitialize(int, persistentReceivesTableCount
);
403 CpvAccess(persistentReceivesTableCount
) = 0;
/* Select the persistent handles to use: p points to n handles.
 * A single NULL handle (n == 1 and *p == NULL) is treated as "none": p becomes
 * NULL and n becomes 0. With CMK_ERROR_CHECKING enabled, a NULL entry in p
 * aborts via CmiAbort(). phsSize is then set to n and curphs reset to 0.
 * NOTE(review): the loop header and declaration of `i` inside the
 * error-checking section, the matching #endif, and the statement that stores
 * p (presumably into phs) are not visible in this listing. */
407 void CmiUsePersistentHandle(PersistentHandle
*p
, int n
)
409 if (n
==1 && *p
== NULL
) { p
= NULL
; n
= 0; }
410 #if CMK_ERROR_CHECKING
414 if (p
[i
] == NULL
) CmiAbort("CmiUsePersistentHandle: invalid PersistentHandle.\n");
418 CpvAccess(phsSize
) = n
;
419 CpvAccess(curphs
) = 0;