2 * uGNI persistent communication (header text originally read "Elan"; the code below uses the GNI API)
8 Gengbin Zheng, 9/6/2011
12 machine specific persistent comm functions:
13 * LrtsSendPersistentMsg
14 * CmiSyncSendPersistent
16 * PerAlloc PerFree // persistent message memory allocation/free functions
17 * persist_machine_init // machine specific initialization call
/*
 * LrtsSendPersistentMsg: send message `m` of `size` bytes through the
 * persistent send slot `h` towards `destNode`.
 *
 *   h        - persistent handle; it is cast to PersistentSendsTable*.
 *   destNode - destination node; asserted to equal CmiNodeOf(slot->destPE).
 *   size     - message size in bytes; must not exceed slot->sizeMax.
 *   m        - message buffer; used as the local address of the put.
 *
 * NOTE(review): this listing is an extract. Braces, `else` lines,
 * preprocessor conditionals and some statements (for example the
 * declaration of `status` and whatever allocates `pd`) are not visible,
 * so the branch structure described in the comments below is partly
 * inferred and should be confirmed against the complete file.
 */
20 void LrtsSendPersistentMsg(PersistentHandle h
, int destNode
, int size
, void *m
)
22 gni_post_descriptor_t
*pd
;
24 RDMA_REQUEST
*rdma_request_msg
;
26 PersistentSendsTable
*slot
= (PersistentSendsTable
*)h
;
/* Abort path for an invalid handle. The guarding condition is not visible
 * here; presumably it tests h for NULL, as the message text suggests. */
28 printf("[%d] LrtsSendPersistentMsg: handle from node %d to node %d is NULL. \n", CmiMyPe(), myrank
, destNode
);
29 CmiAbort("LrtsSendPersistentMsg: not a valid PersistentHandle");
/* The PE recorded in the slot must belong to the node named by the caller. */
31 CmiAssert(CmiNodeOf(slot
->destPE
) == destNode
);
/* Messages larger than the slot's sizeMax are reported and then aborted. */
32 if (size
> slot
->sizeMax
) {
33 CmiPrintf("size: %d sizeMax: %d mype=%d destPe=%d\n", size
, slot
->sizeMax
, CmiMyPe(), destNode
);
34 CmiAbort("Abort: Invalid size\n");
/* A non-NULL destBuf[0].destAddress leads into the descriptor setup below. */
37 if (slot
->destBuf
[0].destAddress
) {
38 // CmiPrintf("[%d] LrtsSendPersistentMsg h=%p hdl=%d destNode=%d destAddress=%p size=%d\n", CmiMyPe(), h, CmiGetHandler(m), destNode, slot->destBuf[0].destAddress, size);
/* Sizes up to LRTS_GNI_RDMA_THRESHOLD select an FMA put. The RDMA put
 * assignment that follows is presumably the else branch; the `else` line
 * itself is not visible in this extract. */
43 if(size
<= LRTS_GNI_RDMA_THRESHOLD
) {
44 pd
->type
= GNI_POST_FMA_PUT
;
48 pd
->type
= GNI_POST_RDMA_PUT
;
50 pd
->cq_mode
= GNI_CQMODE_GLOBAL_EVENT
;
51 pd
->dlvr_mode
= GNI_DLVMODE_PERFORMANCE
;
/* The transfer length is the message size passed through ALIGN64. */
52 pd
->length
= ALIGN64(size
);
53 pd
->local_addr
= (uint64_t) m
;
/* Remote address and memory handle come from the slot's first buffer. */
55 pd
->remote_addr
= (uint64_t)slot
->destBuf
[0].destAddress
;
56 pd
->remote_mem_hndl
= slot
->destBuf
[0].mem_hndl
;
57 pd
->src_cq_hndl
= 0;//post_tx_cqh; /* smsg_tx_cqh; */
59 pd
->cqwrite_value
= PERSIST_SEQ
;
/* sync_flag_addr is reused here to hold a CmiWallTimer()-based timestamp;
 * it is read back by RDMA_TRANS_INIT further down. */
63 pd
->sync_flag_addr
= 1000000 * CmiWallTimer(); //microsecond
65 SetMemHndlZero(pd
->local_mem_hndl
);
67 TRACE_COMM_CREATION(CpvAccess(projTraceStart
), (void*)pd
->local_addr
);
/* Two bufferRdmaMsg variants follow: one passes the slot's destHandle, the
 * other passes -1. Whatever selects between them is not visible here. */
71 bufferRdmaMsg(destNode
, pd
, (int)(size_t)(slot
->destHandle
));
73 bufferRdmaMsg(destNode
, pd
, -1);
/* Add the remote-event CQ mode and set the endpoint's event data, passing
 * destNode and PERSIST_EVENT(destHandle). */
79 pd
->cq_mode
|= GNI_CQMODE_REMOTE_EVENT
;
80 int sts
= GNI_EpSetEventData(ep_hndl_array
[destNode
], destNode
, PERSIST_EVENT((int)(size_t)(slot
->destHandle
)));
81 GNI_RC_CHECK("GNI_EpSetEventData", sts
);
/* Register the local buffer; a successful registration is followed by the
 * attempt to post the descriptor (RDMA or FMA, matching pd->type). */
83 status
= registerMessage((void*)(pd
->local_addr
), pd
->length
, pd
->cqwrite_value
, &pd
->local_mem_hndl
);
84 if (status
== GNI_RC_SUCCESS
)
87 RDMA_TRY_SEND(pd
->type
)
89 if(pd
->type
== GNI_POST_RDMA_PUT
)
90 status
= GNI_PostRdma(ep_hndl_array
[destNode
], pd
);
92 status
= GNI_PostFma(ep_hndl_array
[destNode
], pd
);
/* NOTE(review): this assignment is presumably a fallback branch taken when
 * the post is not attempted - confirm against the full file. */
95 status
= GNI_RC_ERROR_RESOURCE
;
/* Resource or memory exhaustion is followed by bufferRdmaMsg calls (with
 * destHandle or -1, as above) rather than by an immediate failure. */
96 if(status
== GNI_RC_ERROR_RESOURCE
|| status
== GNI_RC_ERROR_NOMEM
)
99 bufferRdmaMsg(destNode
, pd
, (int)(size_t)(slot
->destHandle
));
101 bufferRdmaMsg(destNode
, pd
, -1);
105 GNI_RC_CHECK("AFter posting", status
);
/* sync_flag_value receives a second timestamp, taken after the post. */
107 pd
->sync_flag_value
= 1000000 * CmiWallTimer(); //microsecond
108 RDMA_TRANS_INIT(pd
->type
, pd
->sync_flag_addr
/1000000.0)
/* Presumably the path used when no destination address is known yet: the
 * message pointer and size are stored in the slot. A message already
 * present in slot->messageBuf is reported as unexpected. */
115 if (slot
->messageBuf
!= NULL
) {
116 CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
119 slot
->messageBuf
= m
;
120 slot
->messageSize
= size
;
/* phs/phsSize are saved, cleared around CmiSyncSendAndFree, and restored
 * afterwards. The condition guarding this direct send is not visible. */
123 PersistentHandle
*phs_tmp
= phs
;
124 int phsSize_tmp
= phsSize
;
125 phs
= NULL
; phsSize
= 0;
126 CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
127 CmiSyncSendAndFree(slot
->destPE
, size
, m
);
128 phs
= phs_tmp
; phsSize
= phsSize_tmp
;
/*
 * CmiSyncSendPersistent: send a copy of `msg` to `destPE` using persistent
 * handle `h`.
 *
 *   destPE - destination PE, compared with the calling PE (cs->pe).
 *   size   - number of bytes copied from msg.
 *   msg    - caller's message; it is only read (memcpy source) here.
 *   h      - persistent handle forwarded to LrtsSendPersistentMsg.
 *
 * The message is duplicated with CmiAlloc + memcpy and its broadcast root
 * is set to 0 before it is delivered.
 *
 * NOTE(review): braces and the `else` line are not visible in this extract;
 * the LrtsSendPersistentMsg call is presumably the non-local branch.
 */
134 void CmiSyncSendPersistent(int destPE
, int size
, char *msg
, PersistentHandle h
)
136 CmiState cs
= CmiGetState();
137 char *dupmsg
= (char *) CmiAlloc(size
);
138 memcpy(dupmsg
, msg
, size
);
140 /* CmiPrintf("Setting root to %d\n", 0); */
141 CMI_SET_BROADCAST_ROOT(dupmsg
, 0);
/* Destination is the calling PE: count the message with CQdCreate and
 * enqueue the copy on CmiLocalQueue. */
143 if (cs
->pe
==destPE
) {
144 CQdCreate(CpvAccess(cQdState
), 1);
145 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue
),dupmsg
);
/* NOTE(review): destPE is passed in the position LrtsSendPersistentMsg
 * names destNode, and that function asserts
 * CmiNodeOf(slot->destPE) == destNode. Confirm PE and node numbering
 * coincide wherever this is built. */
148 LrtsSendPersistentMsg(h
, destPE
, size
, dupmsg
);
152 extern void CmiReference(void *blk
);
156 /* called in PumpMsgs */
/*
 * Receive-side polling code. NOTE(review): the function header line, the
 * loop over slots, braces and preprocessor conditionals are not visible in
 * this extract, so the comments below describe only the statements shown.
 *
 * Starting from persistentReceivesTableHead, the code reads the first
 * buffer (messagePtr[0]) and its recorded size (*recvSizePtr[0]), then
 * looks at two ints located immediately after the message body.
 */
160 PersistentReceivesTable
*slot
= persistentReceivesTableHead
;
162 char *msg
= slot
->messagePtr
[0];
163 int size
= *(slot
->recvSizePtr
[0]);
/* The footer sits at msg + size; the check below requires footer[0] to
 * equal size and footer[1] to equal 1. */
166 int *footer
= (int*)(msg
+ size
);
167 if (footer
[0] == size
&& footer
[1] == 1) {
168 /*CmiPrintf("[%d] PumpPersistent messagePtr=%p size:%d\n", CmiMyPe(), slot->messagePtr, size);*/
/* Copying variant: duplicate the message with CmiAlloc + memcpy, then zero
 * the receive buffer including the two footer ints. `dupmsg` is declared
 * on a line that is not visible here. */
172 dupmsg
= CmiAlloc(size
);
175 memcpy(dupmsg
, msg
, size
);
176 memset(msg
, 0, size
+2*sizeof(int));
179 /* return messagePtr directly and user MUST make sure not to delete it. */
180 /*CmiPrintf("[%d] %p size:%d rank:%d root:%d\n", CmiMyPe(), msg, size, CMI_DEST_RANK(msg), CMI_BROADCAST_ROOT(msg));*/
/* Presumably an alternative to the copying variant above; what selects it
 * is not visible. */
183 swapRecvSlotBuffers(slot
);
/* Hand the message to the PE rank stored in its header and, when spanning
 * tree broadcast is compiled in and a broadcast root is set, forward it to
 * the spanning-tree children. */
186 CmiPushPE(CMI_DEST_RANK(msg
), msg
);
187 #if CMK_BROADCAST_SPANNING_TREE
188 if (CMI_BROADCAST_ROOT(msg
))
189 SendSpanningChildren(size
, msg
);
191 /* clear footer after message used */
192 *(slot
->recvSizePtr
[0]) = 0;
193 footer
[0] = footer
[1] = 0;
196 /* not safe at all! */
197 /* instead of clear before use, do it earlier */
/* Re-read buffer 0 and its size, recompute the footer position, and zero
 * both the recorded size and the footer. */
198 msg
=slot
->messagePtr
[0];
199 size
= *(slot
->recvSizePtr
[0]);
200 footer
= (int*)(msg
+ size
);
201 *(slot
->recvSizePtr
[0]) = 0;
202 footer
[0] = footer
[1] = 0;
215 #error "Persistent communication must be compiled with LARGEPAGE on"
/*
 * PerAlloc: allocate a buffer for persistent messages from the mempool.
 *
 *   size - requested payload size in bytes.
 *
 * The request is enlarged by sizeof(CmiChunkHeader) and passed through
 * ALIGN64, then taken from CpvAccess(mempool). The pointer derived from the
 * allocation is offset by ALIGNBUF - sizeof(mempool_header), and the memory
 * handle obtained from GetMemHndl is stored through MEMHFIELD.
 *
 * NOTE(review): the declarations of `res` and `ptr`, the return statement
 * and any handling of a NULL `res` are not visible in this extract.
 */
218 void *PerAlloc(int size
)
220 // return CmiAlloc(size);
224 size
= ALIGN64(size
+ sizeof(CmiChunkHeader
));
225 //printf("[%d] PerAlloc %p %p %d. \n", myrank, res, ptr, size);
226 res
= mempool_malloc(CpvAccess(mempool
), ALIGNBUF
+size
-sizeof(mempool_header
), 1);
/* ptr is only computed when the mempool allocation succeeded. */
227 if (res
) ptr
= (char*)res
- sizeof(mempool_header
) + ALIGNBUF
;
230 MEMHFIELD(ptr
) = GetMemHndl(ptr
);
/*
 * PerFree: release a buffer obtained from PerAlloc.
 *
 *   msg - pointer previously produced by PerAlloc.
 *
 * Both calls below undo PerAlloc's pointer adjustment
 * (msg - ALIGNBUF + sizeof(mempool_header)) before freeing.
 *
 * NOTE(review): two alternative free calls are shown (mempool_free_thread
 * and mempool_free); the conditional that selects one of them is not
 * visible in this extract.
 */
234 void PerFree(char *msg
)
237 mempool_free_thread((char*)msg
- ALIGNBUF
+ sizeof(mempool_header
));
239 mempool_free(CpvAccess(mempool
), (char*)msg
- ALIGNBUF
+ sizeof(mempool_header
));
243 /* machine dependent init call */
/* NOTE(review): only the signature is visible in this extract; the body
 * is not shown, so nothing about its behavior can be stated here. */
244 void persist_machine_init(void)
/*
 * initSendSlot: reset a persistent send slot to its empty state.
 *
 *   slot - send-table entry to initialise.
 *
 * Clears destHandle, the per-buffer destination information, the pending
 * message pointer and size, and the prev/next list links.
 *
 * NOTE(review): two ways of clearing the destination information appear
 * (per-index destAddress/destSizeAddress arrays, and a memset of destBuf).
 * They are presumably alternative preprocessor branches; the directives and
 * the declaration of `i` are not visible in this extract.
 */
248 void initSendSlot(PersistentSendsTable
*slot
)
253 slot
->destHandle
= 0;
255 for (i
=0; i
<PERSIST_BUFFERS_NUM
; i
++) {
256 slot
->destAddress
[i
] = NULL
;
257 slot
->destSizeAddress
[i
] = NULL
;
/* Zero all PERSIST_BUFFERS_NUM PersistentBuf entries at once. */
260 memset(&slot
->destBuf
, 0, sizeof(PersistentBuf
)*PERSIST_BUFFERS_NUM
);
261 slot
->messageBuf
= 0;
262 slot
->messageSize
= 0;
263 slot
->prev
= slot
->next
= NULL
;
/*
 * initRecvSlot: reset a persistent receive slot to its empty state.
 *
 *   slot - receive-table entry to initialise.
 *
 * Clears the per-buffer message and size pointers and the prev/next links.
 *
 * NOTE(review): as in the send-side initialiser, a per-index loop and a
 * memset of destBuf both appear; they are presumably alternative
 * preprocessor branches whose directives are not visible in this extract.
 */
266 void initRecvSlot(PersistentReceivesTable
*slot
)
270 for (i
=0; i
<PERSIST_BUFFERS_NUM
; i
++) {
271 slot
->messagePtr
[i
] = NULL
;
272 slot
->recvSizePtr
[i
] = NULL
;
/* Zero all PERSIST_BUFFERS_NUM PersistentBuf entries at once. */
275 memset(&slot
->destBuf
, 0, sizeof(PersistentBuf
)*PERSIST_BUFFERS_NUM
);
278 slot
->prev
= slot
->next
= NULL
;
/*
 * setupRecvSlot: allocate the receive buffers of a persistent slot.
 *
 *   slot     - receive-table entry to populate.
 *   maxBytes - largest message the slot accepts; stored in slot->sizeMax.
 *
 * For each of the PERSIST_BUFFERS_NUM buffers, maxBytes plus two ints are
 * obtained from PerAlloc and zeroed, and the buffer's address and memory
 * handle are recorded in slot->destBuf[i]. The slot is then entered into
 * persistPool under persistPool.lock and the resulting index is saved.
 *
 * NOTE(review): the PerAlloc result is passed to memset and MEMHFIELD
 * without a visible NULL check. Braces, the declaration of `i` and any
 * preprocessor conditionals are not visible in this extract.
 */
281 void setupRecvSlot(PersistentReceivesTable
*slot
, int maxBytes
)
284 for (i
=0; i
<PERSIST_BUFFERS_NUM
; i
++) {
/* The extra 2*sizeof(int) matches the two-int footer that the receive side
 * reads at msg + size. */
285 char *buf
= PerAlloc(maxBytes
+sizeof(int)*2);
287 memset(buf
, 0, maxBytes
+sizeof(int)*2);
288 slot
->destBuf
[i
].mem_hndl
= MEMHFIELD(buf
);
289 slot
->destBuf
[i
].destAddress
= buf
;
290 /* note: assume first integer in elan converse header is the msg size */
291 slot
->destBuf
[i
].destSizeAddress
= (unsigned int*)buf
;
293 // assume already registered from mempool
294 // slot->destBuf[i].mem_hndl = GetMemHndl(buf);
296 // FIXME: assume always succeed
298 slot
->sizeMax
= maxBytes
;
/* Reserve an index for this slot in persistPool while holding its lock. */
300 CmiLock(persistPool
.lock
);
301 slot
->index
= IndexPool_getslot(&persistPool
, slot
, 2);
302 CmiUnlock(persistPool
.lock
);
/*
 * clearRecvSlot: give the slot's index back to persistPool.
 *
 *   slot - receive-table entry whose `index` is released.
 *
 * The release is performed between CmiLock and CmiUnlock on
 * persistPool.lock. NOTE(review): braces and any surrounding preprocessor
 * conditionals are not visible in this extract.
 */
306 void clearRecvSlot(PersistentReceivesTable
*slot
)
309 CmiLock(persistPool
.lock
);
310 IndexPool_freeslot(&persistPool
, slot
->index
);
311 CmiUnlock(persistPool
.lock
);
/*
 * getPersistentHandle: convert between the two forms of a receive handle.
 *
 *   h       - either a PersistentReceivesTable pointer or an index value
 *             carried in a PersistentHandle.
 *   toindex - presumably selects the direction of the conversion; the
 *             condition itself is not visible in this extract.
 *
 * One return yields the table entry's `index` cast to PersistentHandle; the
 * other looks the index up in persistPool through GetIndexAddress.
 */
315 PersistentHandle
getPersistentHandle(PersistentHandle h
, int toindex
)
/* Pointer form -> index form. */
319 return (PersistentHandle
)(((PersistentReceivesTable
*)h
)->index
);
/* Index form -> address stored in persistPool. */
321 return (PersistentHandle
)GetIndexAddress(persistPool
, (int)(size_t)h
);