have a node level persistent setup API.
[charm.git] / src / arch / gemini_gni / machine-persistent.c
blob5b36524f35b4f178ad6f9b67d20ddc4e08e42404
1 /** @file
2 * Elan persistent communication
3 * @ingroup Machine
4 */
6 /*
7 included in machine.c
8 Gengbin Zheng, 9/6/2011
9 */
12 machine specific persistent comm functions:
13 * LrtsSendPersistentMsg
14 * CmiSyncSendPersistent
15 * PumpPersistent
16 * PerAlloc PerFree // persistent message memory allocation/free functions
17 * persist_machine_init // machine specific initialization call
20 void LrtsSendPersistentMsg(PersistentHandle h, int destNode, int size, void *m)
22 gni_post_descriptor_t *pd;
23 gni_return_t status;
24 RDMA_REQUEST *rdma_request_msg;
26 PersistentSendsTable *slot = (PersistentSendsTable *)h;
27 if (h==NULL) {
28 printf("[%d] LrtsSendPersistentMsg: handle from node %d to node %d is NULL. \n", CmiMyPe(), myrank, destNode);
29 CmiAbort("LrtsSendPersistentMsg: not a valid PersistentHandle");
31 CmiAssert(CmiNodeOf(slot->destPE) == destNode);
32 if (size > slot->sizeMax) {
33 CmiPrintf("size: %d sizeMax: %d mype=%d destPe=%d\n", size, slot->sizeMax, CmiMyPe(), destNode);
34 CmiAbort("Abort: Invalid size\n");
37 if (slot->destBuf[0].destAddress) {
38 // CmiPrintf("[%d] LrtsSendPersistentMsg h=%p hdl=%d destNode=%d destAddress=%p size=%d\n", CmiMyPe(), h, CmiGetHandler(m), destNode, slot->destBuf[0].destAddress, size);
40 // uGNI part
41 START_EVENT();
42 MallocPostDesc(pd);
43 if(size <= LRTS_GNI_RDMA_THRESHOLD) {
44 pd->type = GNI_POST_FMA_PUT;
46 else
48 pd->type = GNI_POST_RDMA_PUT;
50 pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT;
51 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
52 pd->length = ALIGN64(size);
53 pd->local_addr = (uint64_t) m;
55 pd->remote_addr = (uint64_t)slot->destBuf[0].destAddress;
56 pd->remote_mem_hndl = slot->destBuf[0].mem_hndl;
57 pd->src_cq_hndl = 0;//post_tx_cqh; /* smsg_tx_cqh; */
58 pd->rdma_mode = 0;
59 pd->cqwrite_value = PERSIST_SEQ;
60 pd->amo_cmd = 0;
62 #if CMK_WITH_STATS
63 pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
64 #endif
65 SetMemHndlZero(pd->local_mem_hndl);
67 TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)pd->local_addr);
68 /* always buffer */
69 #if CMK_SMP || 1
70 #if REMOTE_EVENT
71 bufferRdmaMsg(destNode, pd, (int)(size_t)(slot->destHandle));
72 #else
73 bufferRdmaMsg(destNode, pd, -1);
74 #endif
76 #else /* non smp */
78 #if REMOTE_EVENT
79 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
80 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT((int)(size_t)(slot->destHandle)));
81 GNI_RC_CHECK("GNI_EpSetEventData", sts);
82 #endif
83 status = registerMessage((void*)(pd->local_addr), pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
84 if (status == GNI_RC_SUCCESS)
86 #if CMK_WITH_STATS
87 RDMA_TRY_SEND(pd->type)
88 #endif
89 if(pd->type == GNI_POST_RDMA_PUT)
90 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
91 else
92 status = GNI_PostFma(ep_hndl_array[destNode], pd);
94 else
95 status = GNI_RC_ERROR_RESOURCE;
96 if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
98 #if REMOTE_EVENT
99 bufferRdmaMsg(destNode, pd, (int)(size_t)(slot->destHandle));
100 #else
101 bufferRdmaMsg(destNode, pd, -1);
102 #endif
104 else {
105 GNI_RC_CHECK("AFter posting", status);
106 #if CMK_WITH_STATS
107 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
108 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
109 #endif
111 #endif
113 else {
114 #if 1
115 if (slot->messageBuf != NULL) {
116 CmiPrintf("Unexpected message in buffer on %d\n", CmiMyPe());
117 CmiAbort("");
119 slot->messageBuf = m;
120 slot->messageSize = size;
121 #else
122 /* normal send */
123 PersistentHandle *phs_tmp = phs;
124 int phsSize_tmp = phsSize;
125 phs = NULL; phsSize = 0;
126 CmiPrintf("[%d]Slot sending message directly\n", CmiMyPe());
127 CmiSyncSendAndFree(slot->destPE, size, m);
128 phs = phs_tmp; phsSize = phsSize_tmp;
129 #endif
133 #if 0
134 void CmiSyncSendPersistent(int destPE, int size, char *msg, PersistentHandle h)
136 CmiState cs = CmiGetState();
137 char *dupmsg = (char *) CmiAlloc(size);
138 memcpy(dupmsg, msg, size);
140 /* CmiPrintf("Setting root to %d\n", 0); */
141 CMI_SET_BROADCAST_ROOT(dupmsg, 0);
143 if (cs->pe==destPE) {
144 CQdCreate(CpvAccess(cQdState), 1);
145 CdsFifo_Enqueue(CpvAccess(CmiLocalQueue),dupmsg);
147 else
148 LrtsSendPersistentMsg(h, destPE, size, dupmsg);
150 #endif
152 extern void CmiReference(void *blk);
154 #if 0
156 /* called in PumpMsgs */
157 int PumpPersistent()
159 int status = 0;
160 PersistentReceivesTable *slot = persistentReceivesTableHead;
161 while (slot) {
162 char *msg = slot->messagePtr[0];
163 int size = *(slot->recvSizePtr[0]);
164 if (size)
166 int *footer = (int*)(msg + size);
167 if (footer[0] == size && footer[1] == 1) {
168 /*CmiPrintf("[%d] PumpPersistent messagePtr=%p size:%d\n", CmiMyPe(), slot->messagePtr, size);*/
170 #if 0
171 void *dupmsg;
172 dupmsg = CmiAlloc(size);
174 _MEMCHECK(dupmsg);
175 memcpy(dupmsg, msg, size);
176 memset(msg, 0, size+2*sizeof(int));
177 msg = dupmsg;
178 #else
179 /* return messagePtr directly and user MUST make sure not to delete it. */
180 /*CmiPrintf("[%d] %p size:%d rank:%d root:%d\n", CmiMyPe(), msg, size, CMI_DEST_RANK(msg), CMI_BROADCAST_ROOT(msg));*/
182 CmiReference(msg);
183 swapRecvSlotBuffers(slot);
184 #endif
186 CmiPushPE(CMI_DEST_RANK(msg), msg);
187 #if CMK_BROADCAST_SPANNING_TREE
188 if (CMI_BROADCAST_ROOT(msg))
189 SendSpanningChildren(size, msg);
190 #endif
191 /* clear footer after message used */
192 *(slot->recvSizePtr[0]) = 0;
193 footer[0] = footer[1] = 0;
195 #if 0
196 /* not safe at all! */
197 /* instead of clear before use, do it earlier */
198 msg=slot->messagePtr[0];
199 size = *(slot->recvSizePtr[0]);
200 footer = (int*)(msg + size);
201 *(slot->recvSizePtr[0]) = 0;
202 footer[0] = footer[1] = 0;
203 #endif
204 status = 1;
207 slot = slot->next;
209 return status;
212 #endif
214 #if ! LARGEPAGE
215 #error "Persistent communication must be compiled with LARGEPAGE on"
216 #endif
218 void *PerAlloc(int size)
220 // return CmiAlloc(size);
221 gni_return_t status;
222 void *res = NULL;
223 char *ptr;
224 size = ALIGN64(size + sizeof(CmiChunkHeader));
225 //printf("[%d] PerAlloc %p %p %d. \n", myrank, res, ptr, size);
226 res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+size-sizeof(mempool_header), 1);
227 if (res) ptr = (char*)res - sizeof(mempool_header) + ALIGNBUF;
228 SIZEFIELD(ptr)=size;
229 REFFIELD(ptr)=1;
230 MEMHFIELD(ptr) = GetMemHndl(ptr);
231 return ptr;
234 void PerFree(char *msg)
236 #if CMK_SMP
237 mempool_free_thread((char*)msg - ALIGNBUF + sizeof(mempool_header));
238 #else
239 mempool_free(CpvAccess(mempool), (char*)msg - ALIGNBUF + sizeof(mempool_header));
240 #endif
243 /* machine dependent init call */
244 void persist_machine_init(void)
248 void initSendSlot(PersistentSendsTable *slot)
250 int i;
251 slot->destPE = -1;
252 slot->sizeMax = 0;
253 slot->destHandle = 0;
254 #if 0
255 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
256 slot->destAddress[i] = NULL;
257 slot->destSizeAddress[i] = NULL;
259 #endif
260 memset(&slot->destBuf, 0, sizeof(PersistentBuf)*PERSIST_BUFFERS_NUM);
261 slot->messageBuf = 0;
262 slot->messageSize = 0;
263 slot->prev = slot->next = NULL;
266 void initRecvSlot(PersistentReceivesTable *slot)
268 int i;
269 #if 0
270 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
271 slot->messagePtr[i] = NULL;
272 slot->recvSizePtr[i] = NULL;
274 #endif
275 memset(&slot->destBuf, 0, sizeof(PersistentBuf)*PERSIST_BUFFERS_NUM);
276 slot->sizeMax = 0;
277 slot->index = -1;
278 slot->prev = slot->next = NULL;
281 void setupRecvSlot(PersistentReceivesTable *slot, int maxBytes)
283 int i;
284 for (i=0; i<PERSIST_BUFFERS_NUM; i++) {
285 char *buf = PerAlloc(maxBytes+sizeof(int)*2);
286 _MEMCHECK(buf);
287 memset(buf, 0, maxBytes+sizeof(int)*2);
288 slot->destBuf[i].mem_hndl = MEMHFIELD(buf);
289 slot->destBuf[i].destAddress = buf;
290 /* note: assume first integer in elan converse header is the msg size */
291 slot->destBuf[i].destSizeAddress = (unsigned int*)buf;
292 #if USE_LRTS_MEMPOOL
293 // assume already registered from mempool
294 // slot->destBuf[i].mem_hndl = GetMemHndl(buf);
295 #endif
296 // FIXME: assume always succeed
298 slot->sizeMax = maxBytes;
299 #if REMOTE_EVENT
300 CmiLock(persistPool.lock);
301 slot->index = IndexPool_getslot(&persistPool, slot, 2);
302 CmiUnlock(persistPool.lock);
303 #endif
306 void clearRecvSlot(PersistentReceivesTable *slot)
308 #if REMOTE_EVENT
309 CmiLock(persistPool.lock);
310 IndexPool_freeslot(&persistPool, slot->index);
311 CmiUnlock(persistPool.lock);
312 #endif
315 PersistentHandle getPersistentHandle(PersistentHandle h, int toindex)
317 #if REMOTE_EVENT
318 if (toindex)
319 return (PersistentHandle)(((PersistentReceivesTable*)h)->index);
320 else {
321 return (PersistentHandle)GetIndexAddress(persistPool, (int)(size_t)h);
323 #else
324 return h;
325 #endif