Fix a bug where persistent communication does not work if CMK_USE_OOB is disabled.
[charm.git] / src / arch / gemini_gni / machine.c
blob34b824c8f8987628697c46d472032dd1f46579b0
2 /** @file
3 * Gemini GNI machine layer
5 * Author: Yanhua Sun
6 Gengbin Zheng
7 * Date: 07-01-2011
9 * Flow control by mem pool using environment variables:
11 # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12 # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13 export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14 export CHARM_UGNI_MEMPOOL_MAX=20M
15 export CHARM_UGNI_SEND_MAX=10M
17 # limit on total mempool size allocated, this is to prevent mempool
18 # uses too much memory
19 export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M
21 other environment variables:
23 export CHARM_UGNI_NO_DEADLOCK_CHECK=yes # disable checking deadlock
24 export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G # max memory per node for mempool
25 export CHARM_UGNI_BIG_MSG_SIZE=4M # set big message size protocol
26 export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4 # set big message pipe len
27 export CHARM_UGNI_RDMA_MAX=100 # max pending RDMA operations
29 /*@{*/
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
44 #include "converse.h"
46 #if CMK_DIRECT
47 #include "cmidirect.h"
48 #endif
50 #define LARGEPAGE 0
52 #if CMK_SMP
53 #define MULTI_THREAD_SEND 0
54 #define COMM_THREAD_SEND (!MULTI_THREAD_SEND)
55 #endif
57 #if MULTI_THREAD_SEND
58 #define CMK_WORKER_SINGLE_TASK 1
59 #endif
61 #define REMOTE_EVENT 1
62 #define CQWRITE 0
64 #define CMI_EXERT_SEND_LARGE_CAP 0
65 #define CMI_EXERT_RECV_RDMA_CAP 0
68 #define CMI_SENDBUFFERSMSG_CAP 0
69 #define CMI_PUMPNETWORKSMSG_CAP 0
70 #define CMI_PUMPREMOTETRANSACTIONS_CAP 0
71 #define CMI_PUMPLOCALTRANSACTIONS_CAP 0
73 #if CMI_SENDBUFFERSMSG_CAP
74 int SendBufferMsg_cap = 20;
75 #endif
77 #if CMI_PUMPNETWORKSMSG_CAP
78 int PumpNetworkSmsg_cap = 20;
79 #endif
81 #if CMI_PUMPREMOTETRANSACTIONS_CAP
82 int PumpRemoteTransactions_cap = 20;
83 #endif
85 #if CMI_PUMPREMOTETRANSACTIONS_CAP
86 int PumpLocalTransactions_cap = 20;
87 #endif
89 #if CMI_EXERT_SEND_LARGE_CAP
90 static int SEND_large_cap = 20;
91 static int SEND_large_pending = 0;
92 #endif
94 #if CMI_EXERT_RECV_RDMA_CAP
95 static int RDMA_cap = 10;
96 static int RDMA_pending = 0;
97 #endif
99 #define USE_LRTS_MEMPOOL 1
101 #define PRINT_SYH 0
103 // Trace communication thread
104 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
105 #define TRACE_THRESHOLD 0.00001
106 #define CMI_MPI_TRACE_MOREDETAILED 0
107 #undef CMI_MPI_TRACE_USEREVENTS
108 #define CMI_MPI_TRACE_USEREVENTS 1
109 #else
110 #undef CMK_SMP_TRACE_COMMTHREAD
111 #define CMK_SMP_TRACE_COMMTHREAD 0
112 #endif
114 #define CMK_TRACE_COMMOVERHEAD 0
115 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
116 #undef CMI_MPI_TRACE_USEREVENTS
117 #define CMI_MPI_TRACE_USEREVENTS 1
118 #else
119 #undef CMK_TRACE_COMMOVERHEAD
120 #define CMK_TRACE_COMMOVERHEAD 0
121 #endif
123 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
124 CpvStaticDeclare(double, projTraceStart);
125 #define START_EVENT() CpvAccess(projTraceStart) = CmiWallTimer();
126 #define END_EVENT(x) traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
127 #define EVENT_TIME() CpvAccess(projTraceStart)
128 #else
129 #define START_EVENT()
130 #define END_EVENT(x)
131 #define EVENT_TIME() (0.0)
132 #endif
134 #if USE_LRTS_MEMPOOL
136 #define oneMB (1024ll*1024)
137 #define oneGB (1024ll*1024*1024)
139 static CmiInt8 _mempool_size = 8*oneMB;
140 static CmiInt8 _expand_mem = 4*oneMB;
141 static CmiInt8 _mempool_size_limit = 0;
143 static CmiInt8 _totalmem = 0.8*oneGB;
145 #if LARGEPAGE
146 static CmiInt8 BIG_MSG = 16*oneMB;
147 static CmiInt8 ONE_SEG = 4*oneMB;
148 #else
149 static CmiInt8 BIG_MSG = 4*oneMB;
150 static CmiInt8 ONE_SEG = 2*oneMB;
151 #endif
152 #if MULTI_THREAD_SEND
153 static int BIG_MSG_PIPELINE = 1;
154 #else
155 static int BIG_MSG_PIPELINE = 4;
156 #endif
158 // dynamic flow control
159 static CmiInt8 buffered_send_msg = 0;
160 static CmiInt8 register_memory_size = 0;
162 #if LARGEPAGE
163 static CmiInt8 MAX_BUFF_SEND = 100000*oneMB;
164 static CmiInt8 MAX_REG_MEM = 200000*oneMB;
165 static CmiInt8 register_count = 0;
166 #else
167 #if CMK_SMP && COMM_THREAD_SEND
168 static CmiInt8 MAX_BUFF_SEND = 100*oneMB;
169 static CmiInt8 MAX_REG_MEM = 200*oneMB;
170 #else
171 static CmiInt8 MAX_BUFF_SEND = 16*oneMB;
172 static CmiInt8 MAX_REG_MEM = 25*oneMB;
173 #endif
176 #endif
178 #endif /* end USE_LRTS_MEMPOOL */
180 #if MULTI_THREAD_SEND
181 #define CMI_GNI_LOCK(x) CmiLock(x);
182 #define CMI_GNI_TRYLOCK(x) CmiTryLock(x)
183 #define CMI_GNI_UNLOCK(x) CmiUnlock(x);
184 #define CMI_PCQUEUEPOP_LOCK(Q) CmiLock((Q)->lock);
185 #define CMI_PCQUEUEPOP_UNLOCK(Q) CmiUnlock((Q)->lock);
186 #else
187 #define CMI_GNI_LOCK(x)
188 #define CMI_GNI_TRYLOCK(x) (0)
189 #define CMI_GNI_UNLOCK(x)
190 #define CMI_PCQUEUEPOP_LOCK(Q)
191 #define CMI_PCQUEUEPOP_UNLOCK(Q)
192 #endif
194 static int _tlbpagesize = 4096;
196 //static int _smpd_count = 0;
198 static int user_set_flag = 0;
200 static int _checkProgress = 1; /* check deadlock */
201 static int _detected_hang = 0;
203 #define SMSG_ATTR_SIZE sizeof(gni_smsg_attr_t)
205 // dynamic SMSG
206 static int useDynamicSMSG = 0; /* dynamic smsgs setup */
208 static int avg_smsg_connection = 32;
209 static int *smsg_connected_flag= 0;
210 static gni_smsg_attr_t **smsg_attr_vector_local;
211 static gni_smsg_attr_t **smsg_attr_vector_remote;
212 static gni_ep_handle_t ep_hndl_unbound;
213 static gni_smsg_attr_t send_smsg_attr;
214 static gni_smsg_attr_t recv_smsg_attr;
216 typedef struct _dynamic_smsg_mailbox{
217 void *mailbox_base;
218 int size;
219 int offset;
220 gni_mem_handle_t mem_hndl;
221 struct _dynamic_smsg_mailbox *next;
222 }dynamic_smsg_mailbox_t;
224 static dynamic_smsg_mailbox_t *mailbox_list;
226 static CmiUInt8 smsg_send_count = 0, last_smsg_send_count = 0;
227 static CmiUInt8 smsg_recv_count = 0, last_smsg_recv_count = 0;
229 #if PRINT_SYH
230 int lrts_send_msg_id = 0;
231 int lrts_local_done_msg = 0;
232 int lrts_send_rdma_success = 0;
233 #endif
235 #include "machine.h"
237 #include "pcqueue.h"
239 #include "mempool.h"
// Hooks for servicing high-priority (persistent / out-of-band) traffic from
// the progress loop.  Each expands to nothing when the corresponding feature
// is compiled out, so call sites need no #ifdefs.
241 #if CMK_PERSISTENT_COMM
242 #include "machine-persistent.h"
// Drain the dedicated high-priority RDMA send queue (persistent messages).
243 #define POST_HIGHPRIORITY_RDMA STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendHighPriorBuf));
244 #else
245 #define POST_HIGHPRIORITY_RDMA
246 #endif
// NOTE(review): the condition includes CMK_PERSISTENT_COMM (not just
// CMK_USE_OOB) so persistent messages' remote events are still pumped when
// OOB support is disabled -- this is the fix described in the commit message.
248 #if REMOTE_EVENT && (CMK_USE_OOB || CMK_PERSISTENT_COMM)
249 #define PUMP_REMOTE_HIGHPRIORITY STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(highpriority_rx_cqh) );
250 #else
251 #define PUMP_REMOTE_HIGHPRIORITY
252 #endif
254 //#define USE_ONESIDED 1
255 #ifdef USE_ONESIDED
256 //onesided implementation is wrong, since no place to restore omdh
257 #include "onesided.h"
258 onesided_hnd_t onesided_hnd;
259 onesided_md_t omdh;
// Registration via the (known-broken, see above) onesided library.
260 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh)
262 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
264 #else
// Placeholders so call sites can pass "handles" uniformly in the GNI path.
265 uint8_t onesided_hnd, omdh;
// Register msg with the NIC, enforcing the MAX_REG_MEM budget; on success
// the registered total is accounted in register_memory_size.  The variant
// below attaches the completion queue `cqh` so remote events are delivered.
267 #if REMOTE_EVENT || CQWRITE
268 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
269 if(register_memory_size+size>= MAX_REG_MEM) { \
270 status = GNI_RC_ERROR_NOMEM;} \
271 else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg, (uint64_t)size, cqh, GNI_MEM_READWRITE, -1, mem_hndl); \
272 if(status == GNI_RC_SUCCESS) register_memory_size += size; }
273 #else
// Same as above but with no destination CQ (no remote events needed).
274 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
275 if (register_memory_size + size >= MAX_REG_MEM) { \
276 status = GNI_RC_ERROR_NOMEM; \
277 } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg, (uint64_t)size, NULL, GNI_MEM_READWRITE, -1, mem_hndl); \
278 if(status == GNI_RC_SUCCESS) register_memory_size += size; }
279 #endif
// Undo a registration and return its size to the budget; aborts on failure
// because an undeletable handle would corrupt the accounting.
281 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size) \
282 do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
283 register_memory_size -= size; \
284 else CmiAbort("MEM_DEregister"); \
285 } while (0)
286 #endif
288 #define GetMempoolBlockPtr(x) MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
289 #define GetMempoolPtr(x) MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
290 #define GetMempoolsize(x) MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
291 #define GetMemHndl(x) MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
292 #define IncreaseMsgInRecv(x) MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
293 #define DecreaseMsgInRecv(x) MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
294 #define IncreaseMsgInSend(x) MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
295 #define DecreaseMsgInSend(x) MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
296 #define NoMsgInSend(x) MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
297 #define NoMsgInRecv(x) MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
298 #define NoMsgInFlight(x) (NoMsgInSend(x) && NoMsgInRecv(x))
299 #define IsMemHndlZero(x) ((x).qword1 == 0 && (x).qword2 == 0)
300 #define SetMemHndlZero(x) do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
301 #define NotRegistered(x) IsMemHndlZero(GetMemHndl(x))
303 #define GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
304 #define GetSizeFromBlockHeader(x) MEMPOOL_GetBlockSize(x)
306 #define CmiGetMsgSize(m) ((CmiMsgHeaderExt*)m)->size
307 #define CmiSetMsgSize(m,s) ((((CmiMsgHeaderExt*)m)->size)=(s))
308 #define CmiGetMsgSeq(m) ((CmiMsgHeaderExt*)m)->seq
309 #define CmiSetMsgSeq(m, s) ((((CmiMsgHeaderExt*)m)->seq) = (s))
311 #define ALIGNBUF 64
313 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
314 /* If SMSG is not used */
316 #define FMA_PER_CORE 1024
317 #define FMA_BUFFER_SIZE 1024
319 /* If SMSG is used */
320 static int SMSG_MAX_MSG = 1024;
321 #define SMSG_MAX_CREDIT 72
323 #define MSGQ_MAXSIZE 2048
325 /* large message transfer with FMA or BTE */
326 #if ! REMOTE_EVENT
327 #define LRTS_GNI_RDMA_THRESHOLD 1024
328 #else
329 /* remote events only work with RDMA */
330 #define LRTS_GNI_RDMA_THRESHOLD 0
331 #endif
333 #if CMK_SMP
334 static int REMOTE_QUEUE_ENTRIES=163840;
335 static int LOCAL_QUEUE_ENTRIES=163840;
336 #else
337 static int REMOTE_QUEUE_ENTRIES=20480;
338 static int LOCAL_QUEUE_ENTRIES=20480;
339 #endif
341 #define BIG_MSG_TAG 0x26
342 #define PUT_DONE_TAG 0x28
343 #define DIRECT_PUT_DONE_TAG 0x29
344 #define ACK_TAG 0x30
345 /* SMSG is data message */
346 #define SMALL_DATA_TAG 0x31
347 /* SMSG is a control message to initialize a BTE */
348 #define LMSG_INIT_TAG 0x33
349 #define LMSG_OOB_INIT_TAG 0x35
351 #define DEBUG
352 #ifdef GNI_RC_CHECK
353 #undef GNI_RC_CHECK
354 #endif
355 #ifdef DEBUG
356 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) { printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
357 #else
358 #define GNI_RC_CHECK(msg,rc)
359 #endif
361 #define ALIGN64(x) (size_t)((~63)&((x)+63))
362 //#define ALIGN4(x) (size_t)((~3)&((x)+3))
363 #define ALIGNHUGEPAGE(x) (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
365 static int useStaticMSGQ = 0;
366 static int useStaticFMA = 0;
367 static int mysize, myrank;
368 static gni_nic_handle_t nic_hndl;
370 typedef struct {
371 gni_mem_handle_t mdh;
372 uint64_t addr;
373 } mdh_addr_t ;
374 // this is related to dynamic SMSG
376 typedef struct mdh_addr_list{
377 gni_mem_handle_t mdh;
378 void *addr;
379 struct mdh_addr_list *next;
380 }mdh_addr_list_t;
382 static unsigned int smsg_memlen;
383 gni_smsg_attr_t **smsg_local_attr_vec = 0;
384 mdh_addr_t setup_mem;
385 mdh_addr_t *smsg_connection_vec = 0;
386 gni_mem_handle_t smsg_connection_memhndl;
387 static int smsg_expand_slots = 10;
388 static int smsg_available_slot = 0;
389 static void *smsg_mailbox_mempool = 0;
390 mdh_addr_list_t *smsg_dynamic_list = 0;
392 static void *smsg_mailbox_base;
393 gni_msgq_attr_t msgq_attrs;
394 gni_msgq_handle_t msgq_handle;
395 gni_msgq_ep_attr_t msgq_ep_attrs;
396 gni_msgq_ep_attr_t msgq_ep_attrs_size;
398 /* =====Beginning of Declarations of Machine Specific Variables===== */
399 static int cookie;
400 static int modes = 0;
401 static gni_cq_handle_t smsg_rx_cqh = NULL; // smsg send
402 static gni_cq_handle_t default_tx_cqh = NULL; // bind to endpoint
403 static gni_cq_handle_t rdma_tx_cqh = NULL; // rdma - local event
404 static gni_cq_handle_t highprior_rdma_tx_cqh = NULL; // rdma - local event
405 static gni_cq_handle_t rdma_rx_cqh = NULL; // mempool - remote event
406 static gni_cq_handle_t highpriority_rx_cqh = NULL; // mempool - remote event
407 static gni_ep_handle_t *ep_hndl_array;
409 static CmiNodeLock *ep_lock_array;
410 static CmiNodeLock default_tx_cq_lock;
411 static CmiNodeLock rdma_tx_cq_lock;
412 static CmiNodeLock global_gni_lock;
413 static CmiNodeLock rx_cq_lock;
414 static CmiNodeLock smsg_mailbox_lock;
415 static CmiNodeLock smsg_rx_cq_lock;
416 static CmiNodeLock *mempool_lock;
417 //#define CMK_WITH_STATS 1
// One SMSG message parked in a buffered-send queue (mailbox full / no
// credit at LrtsSend time); drained later by SendBufferMsg.
418 typedef struct msg_list
// destination node and payload size/pointer
420 uint32_t destNode;
421 uint32_t size;
422 void *msg;
// protocol tag (SMALL_DATA_TAG, LMSG_INIT_TAG, ACK_TAG, ...)
423 uint8_t tag;
424 #if CMK_WITH_STATS
// enqueue timestamp, used to measure time spent buffered
425 double creation_time;
426 #endif
427 }MSG_LIST;
// Control message carried by SMSG to set up a large-message (BTE/RDMA)
// transfer; the trailing `next` pointer is not transmitted (see
// CONTROL_MSG_SIZE below).
430 typedef struct control_msg
432 uint64_t source_addr; /* address from the start of buffer */
433 uint64_t dest_addr; /* address from the start of buffer */
434 int total_length; /* total length */
435 int length; /* length of this packet */
436 #if REMOTE_EVENT
437 int ack_index; /* index from integer to address */
438 #endif
439 uint8_t seq_id; //big message 0 meaning single message
440 gni_mem_handle_t source_mem_hndl;
441 struct control_msg *next;
442 } CONTROL_MSG;
// wire size: everything except the freelist link
444 #define CONTROL_MSG_SIZE (sizeof(CONTROL_MSG)-sizeof(void*))
// Acknowledgement sent back when a large-message transfer completes; the
// `next` freelist link is excluded from the wire size (ACK_MSG_SIZE).
446 typedef struct ack_msg
448 uint64_t source_addr; /* address from the start of buffer */
449 #if ! USE_LRTS_MEMPOOL
// without the mempool the receiver must return the handle/length so the
// sender can deregister the buffer itself
450 gni_mem_handle_t source_mem_hndl;
451 int length; /* total length */
452 #endif
453 struct ack_msg *next;
454 } ACK_MSG;
456 #define ACK_MSG_SIZE (sizeof(ACK_MSG)-sizeof(void*))
458 #if CMK_DIRECT
459 typedef struct{
460 uint64_t handler_addr;
461 }CMK_DIRECT_HEADER;
463 typedef struct {
464 char core[CmiMsgHeaderSizeBytes];
465 uint64_t handler;
466 }cmidirectMsg;
468 //SYH
469 CpvDeclare(int, CmiHandleDirectIdx);
470 void CmiHandleDirectMsg(cmidirectMsg* msg)
473 CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
474 (*(_handle->callbackFnPtr))(_handle->callbackData);
475 CmiFree(msg);
478 void CmiDirectInit()
480 CpvInitialize(int, CmiHandleDirectIdx);
481 CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
484 #endif
485 typedef struct rmda_msg
487 int destNode;
488 #if REMOTE_EVENT
489 int ack_index;
490 #endif
491 gni_post_descriptor_t *pd;
492 }RDMA_REQUEST;
495 #define SMP_LOCKS 1
496 #define ONE_SEND_QUEUE 0
497 typedef PCQueue BufferList;
498 typedef struct msg_list_index
500 PCQueue sendSmsgBuf;
501 #if SMP_LOCKS
502 CmiNodeLock lock;
503 int pushed;
504 int destpe;
505 #endif
506 } MSG_LIST_INDEX;
507 char *destpe_avail;
508 PCQueue sendRdmaBuf;
509 PCQueue sendHighPriorBuf;
510 // buffered send queue
511 #if ! ONE_SEND_QUEUE
512 typedef struct smsg_queue
514 MSG_LIST_INDEX *smsg_msglist_index;
515 int smsg_head_index;
516 #if SMP_LOCKS
517 PCQueue nonEmptyQueues;
518 #endif
519 } SMSG_QUEUE;
520 #else
521 typedef struct smsg_queue
523 PCQueue sendMsgBuf;
524 } SMSG_QUEUE;
525 #endif
// Default buffered-send queue; a second queue exists for out-of-band
// (high-priority) messages when CMK_USE_OOB is enabled.
527 SMSG_QUEUE smsg_queue;
528 #if CMK_USE_OOB
529 SMSG_QUEUE smsg_oob_queue;
// Flush the OOB queue immediately, bypassing normal send ordering.
530 #define SEND_OOB_SMSG(x) SendBufferMsg(&x, NULL);
// Pump completions from the high-priority local RDMA CQ.
531 #define PUMP_LOCAL_HIGHPRIORITY STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(highprior_rdma_tx_cqh, rdma_tx_cq_lock));
532 #else
// No-ops when OOB support is compiled out.
533 #define SEND_OOB_SMSG(x)
534 #define PUMP_LOCAL_HIGHPRIORITY
535 #endif
537 #define FreeMsgList(d) free(d);
538 #define MallocMsgList(d) d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
540 #define FreeControlMsg(d) free(d);
541 #define MallocControlMsg(d) d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
543 #define FreeAckMsg(d) free(d);
544 #define MallocAckMsg(d) d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
546 #define FreeRdmaRequest(d) free(d);
547 #define MallocRdmaRequest(d) d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));
548 /* reuse gni_post_descriptor_t */
549 static gni_post_descriptor_t *post_freelist=0;
551 #define FreePostDesc(d) free(d);
552 #define MallocPostDesc(d) d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
555 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
556 static int buffered_smsg_counter = 0;
558 /* SmsgSend return success but message sent is not confirmed by remote side */
559 static MSG_LIST *buffered_fma_head = 0;
560 static MSG_LIST *buffered_fma_tail = 0;
562 /* functions */
563 #define IsFree(a,ind) !( a& (1<<(ind) ))
564 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
565 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
567 CpvDeclare(mempool_type*, mempool);
569 #if CMK_PERSISTENT_COMM
570 CpvDeclare(mempool_type*, persistent_mempool);
571 #endif
573 #if REMOTE_EVENT
574 /* ack pool for remote events */
576 static int SHIFT = 18;
577 #define INDEX_MASK ((1<<(32-SHIFT-1)) - 1)
578 #define RANK_MASK ((1<<SHIFT) - 1)
579 #define ACK_EVENT(idx) ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
581 #define GET_TYPE(evt) (((evt) >> 31) & 1)
582 #define GET_RANK(evt) ((evt) & RANK_MASK)
583 #define GET_INDEX(evt) (((evt) >> SHIFT) & INDEX_MASK)
585 #define PERSIST_EVENT(idx) ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
587 #if CMK_SMP
588 #define INIT_SIZE 4096
589 #else
590 #define INIT_SIZE 1024
591 #endif
// A slot in an IndexPool: maps a small integer (encoded into GNI remote
// events, see ACK_EVENT/PERSIST_EVENT) back to a message address.
593 struct IndexStruct {
594 void *addr;
// next free slot when on the free list, -1 for the last one
595 int next;
596 int type; // 1: ACK 2: Persistent
// Growable pool of slots with an intrusive free list.
599 typedef struct IndexPool {
600 struct IndexStruct *indexes;
601 int size;
602 int freehead;
603 CmiNodeLock lock;
604 } IndexPool;
606 static IndexPool ackPool;
607 #if CMK_PERSISTENT_COMM
608 static IndexPool persistPool;
609 #endif
611 #define GetIndexType(pool, s) (pool.indexes[s].type)
612 #define GetIndexAddress(pool, s) (pool.indexes[s].addr)
614 static void IndexPool_init(IndexPool *pool)
616 int i;
617 if ((1<<SHIFT) < mysize)
618 CmiAbort("Charm++ Error: Remote event's rank field overflow.");
619 pool->size = INIT_SIZE;
620 if ( (1<<(31-SHIFT)) < pool->size) CmiAbort("IndexPool_init: pool initial size is too big.");
621 pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
622 for (i=0; i<pool->size-1; i++) {
623 pool->indexes[i].next = i+1;
624 pool->indexes[i].type = 0;
626 pool->indexes[i].next = -1;
627 pool->freehead = 0;
628 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
629 pool->lock = CmiCreateLock();
630 #else
631 pool->lock = 0;
632 #endif
635 static
636 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
638 int s, i;
639 #if MULTI_THREAD_SEND
640 CmiLock(pool->lock);
641 #endif
642 s = pool->freehead;
643 if (s == -1) {
644 int newsize = pool->size * 2;
645 //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
646 if (newsize > (1<<(32-SHIFT-1))) CmiAbort("IndexPool too large");
647 struct IndexStruct *old_ackpool = pool->indexes;
648 pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
649 memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
650 for (i=pool->size; i<newsize-1; i++) {
651 pool->indexes[i].next = i+1;
652 pool->indexes[i].type = 0;
654 pool->indexes[i].next = -1;
655 pool->indexes[i].type = 0;
656 pool->freehead = pool->size;
657 s = pool->size;
658 pool->size = newsize;
659 free(old_ackpool);
661 pool->freehead = pool->indexes[s].next;
662 pool->indexes[s].addr = addr;
663 CmiAssert(pool->indexes[s].type == 0 && (type == 1 || type == 2));
664 pool->indexes[s].type = type;
665 #if MULTI_THREAD_SEND
666 CmiUnlock(pool->lock);
667 #endif
668 return s;
671 static
672 inline void IndexPool_freeslot(IndexPool *pool, int s)
674 CmiAssert(s>=0 && s<pool->size);
675 #if MULTI_THREAD_SEND
676 CmiLock(pool->lock);
677 #endif
678 pool->indexes[s].next = pool->freehead;
679 pool->indexes[s].type = 0;
680 pool->freehead = s;
681 #if MULTI_THREAD_SEND
682 CmiUnlock(pool->lock);
683 #endif
687 #endif
689 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
690 #define CMI_MAGIC(msg) ((CmiMsgHeaderBasic *)msg)->magic
691 #define CHARM_MAGIC_NUMBER 126
693 #if CMK_ERROR_CHECKING
694 extern unsigned char computeCheckSum(unsigned char *data, int len);
695 static int checksum_flag = 0;
696 #define CMI_SET_CHECKSUM(msg, len) \
697 if (checksum_flag) { \
698 ((CmiMsgHeaderBasic *)msg)->cksum = 0; \
699 ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len); \
701 #define CMI_CHECK_CHECKSUM(msg, len) \
702 if (checksum_flag) \
703 if (computeCheckSum((unsigned char*)msg, len) != 0) \
704 CmiAbort("Fatal error: checksum doesn't agree!\n");
705 #else
706 #define CMI_SET_CHECKSUM(msg, len)
707 #define CMI_CHECK_CHECKSUM(msg, len)
708 #endif
709 /* =====End of Definitions of Message-Corruption Related Macros=====*/
711 static int print_stats = 0;
712 static int stats_off = 0;
713 void CmiTurnOnStats()
715 stats_off = 0;
716 //CmiPrintf("[%d][%d:%d]+++++++++++ turning on stats \n", CmiMyNode(), CmiMyPe(), CmiMyRank());
719 void CmiTurnOffStats()
721 stats_off = 1;
724 #define IS_PUT(type) (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
726 #if CMK_WITH_STATS
727 FILE *counterLog = NULL;
728 typedef struct comm_thread_stats
730 uint64_t smsg_data_count;
731 uint64_t lmsg_init_count;
732 uint64_t ack_count;
733 uint64_t big_msg_ack_count;
734 uint64_t smsg_count;
735 uint64_t direct_put_done_count;
736 uint64_t put_done_count;
737 //times of calling SmsgSend
738 uint64_t try_smsg_data_count;
739 uint64_t try_lmsg_init_count;
740 uint64_t try_ack_count;
741 uint64_t try_big_msg_ack_count;
742 uint64_t try_direct_put_done_count;
743 uint64_t try_put_done_count;
744 uint64_t try_smsg_count;
746 double max_time_in_send_buffered_smsg;
747 double all_time_in_send_buffered_smsg;
749 uint64_t rdma_get_count, rdma_put_count;
750 uint64_t try_rdma_get_count, try_rdma_put_count;
751 double max_time_from_control_to_rdma_init;
752 double all_time_from_control_to_rdma_init;
754 double max_time_from_rdma_init_to_rdma_done;
755 double all_time_from_rdma_init_to_rdma_done;
757 int count_in_PumpNetwork;
758 double time_in_PumpNetwork;
759 double max_time_in_PumpNetwork;
760 int count_in_SendBufferMsg_smsg;
761 double time_in_SendBufferMsg_smsg;
762 double max_time_in_SendBufferMsg_smsg;
763 int count_in_SendRdmaMsg;
764 double time_in_SendRdmaMsg;
765 double max_time_in_SendRdmaMsg;
766 int count_in_PumpRemoteTransactions;
767 double time_in_PumpRemoteTransactions;
768 double max_time_in_PumpRemoteTransactions;
769 int count_in_PumpLocalTransactions_rdma;
770 double time_in_PumpLocalTransactions_rdma;
771 double max_time_in_PumpLocalTransactions_rdma;
772 int count_in_PumpDatagramConnection;
773 double time_in_PumpDatagramConnection;
774 double max_time_in_PumpDatagramConnection;
775 } Comm_Thread_Stats;
777 static Comm_Thread_Stats comm_stats;
779 static char *counters_dirname = "counters";
781 static void init_comm_stats()
783 memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
784 if (print_stats){
785 char ln[200];
786 int code = mkdir(counters_dirname, 00777);
787 sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
788 counterLog=fopen(ln,"w");
789 if (counterLog == NULL) CmiAbort("Counter files open failed");
793 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
795 #define SMSG_SENT_DONE(creation_time, tag) \
796 if (print_stats && !stats_off) { if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++; \
797 else if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.lmsg_init_count++; \
798 else if( tag == ACK_TAG) comm_stats.ack_count++; \
799 else if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++; \
800 else if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++; \
801 else if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++; \
802 comm_stats.smsg_count++; \
803 double inbuff_time = CmiWallTimer() - creation_time; \
804 if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
805 comm_stats.all_time_in_send_buffered_smsg += inbuff_time; \
808 #define SMSG_TRY_SEND(tag) \
809 if (print_stats && !stats_off){ if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++; \
810 else if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.try_lmsg_init_count++; \
811 else if( tag == ACK_TAG) comm_stats.try_ack_count++; \
812 else if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++; \
813 else if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++; \
814 else if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++; \
815 comm_stats.try_smsg_count++; \
818 #define RDMA_TRY_SEND(type) if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
820 #define RDMA_TRANS_DONE(x) \
821 if (print_stats && !stats_off) { double rdma_trans_time = CmiWallTimer() - x ; \
822 if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
823 comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
826 #define RDMA_TRANS_INIT(type, x) \
827 if (print_stats && !stats_off) { IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++; \
828 double rdma_trans_time = CmiWallTimer() - x ; \
829 if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
830 comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
833 #define STATS_PUMPNETWORK_TIME(x) \
834 { double t = CmiWallTimer(); \
835 x; \
836 t = CmiWallTimer() - t; \
837 comm_stats.count_in_PumpNetwork++; \
838 comm_stats.time_in_PumpNetwork += t; \
839 if (t>comm_stats.max_time_in_PumpNetwork) \
840 comm_stats.max_time_in_PumpNetwork = t; \
843 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x) \
844 { double t = CmiWallTimer(); \
845 x; \
846 t = CmiWallTimer() - t; \
847 comm_stats.count_in_PumpRemoteTransactions ++; \
848 comm_stats.time_in_PumpRemoteTransactions += t; \
849 if (t>comm_stats.max_time_in_PumpRemoteTransactions) \
850 comm_stats.max_time_in_PumpRemoteTransactions = t; \
853 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x) \
854 { double t = CmiWallTimer(); \
855 x; \
856 t = CmiWallTimer() - t; \
857 comm_stats.count_in_PumpLocalTransactions_rdma ++; \
858 comm_stats.time_in_PumpLocalTransactions_rdma += t; \
859 if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma) \
860 comm_stats.max_time_in_PumpLocalTransactions_rdma = t; \
863 #define STATS_SEND_SMSGS_TIME(x) \
864 { double t = CmiWallTimer(); \
865 x; \
866 t = CmiWallTimer() - t; \
867 comm_stats.count_in_SendBufferMsg_smsg ++; \
868 comm_stats.time_in_SendBufferMsg_smsg += t; \
869 if (t>comm_stats.max_time_in_SendBufferMsg_smsg) \
870 comm_stats.max_time_in_SendBufferMsg_smsg = t; \
873 #define STATS_SENDRDMAMSG_TIME(x) \
874 { double t = CmiWallTimer(); \
875 x; \
876 t = CmiWallTimer() - t; \
877 comm_stats.count_in_SendRdmaMsg ++; \
878 comm_stats.time_in_SendRdmaMsg += t; \
879 if (t>comm_stats.max_time_in_SendRdmaMsg) \
880 comm_stats.max_time_in_SendRdmaMsg = t; \
883 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x) \
884 { double t = CmiWallTimer(); \
885 x; \
886 t = CmiWallTimer() - t; \
887 comm_stats.count_in_PumpDatagramConnection ++; \
888 comm_stats.time_in_PumpDatagramConnection += t; \
889 if (t>comm_stats.max_time_in_PumpDatagramConnection) \
890 comm_stats.max_time_in_PumpDatagramConnection = t; \
// Dump all collected comm-thread statistics to this rank's counter log and
// close it.  NOTE(review): the averages divide by event counts and produce
// inf/nan when a count is zero -- harmless in output but worth confirming.
893 static void print_comm_stats()
// SMSG buffering times and per-tag message counts
895 fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
896 fprintf(counterLog, "Node[%d] Smsg Msgs \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank,
897 comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count,
898 comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
900 fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank,
901 comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count,
902 comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
// RDMA transaction counts and latencies (control-arrival -> init -> done)
904 fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
905 fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
906 fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
// Time spent in each progress-engine phase
909 fprintf(counterLog, " count\ttotal(s)\tmax(s)\taverage(us)\n");
910 fprintf(counterLog, "PumpNetworkSmsg: %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
911 fprintf(counterLog, "PumpRemoteTransactions: %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
912 fprintf(counterLog, "PumpLocalTransactions(RDMA): %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
913 fprintf(counterLog, "SendBufferMsg (SMSG): %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
914 fprintf(counterLog, "SendRdmaMsg: %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
// Datagram-connection pumping only happens with dynamic SMSG setup
915 if (useDynamicSMSG)
916 fprintf(counterLog, "PumpDatagramConnection: %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
918 fclose(counterLog);
921 #else
922 #define STATS_PUMPNETWORK_TIME(x) x
923 #define STATS_SEND_SMSGS_TIME(x) x
924 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x) x
925 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x) x
926 #define STATS_SENDRDMAMSG_TIME(x) x
927 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x) x
928 #endif
930 static void
931 allgather(void *in,void *out, int len)
933 static int *ivec_ptr=NULL,already_called=0,job_size=0;
934 int i,rc;
935 int my_rank;
936 char *tmp_buf,*out_ptr;
938 if(!already_called) {
940 rc = PMI_Get_size(&job_size);
941 CmiAssert(rc == PMI_SUCCESS);
942 rc = PMI_Get_rank(&my_rank);
943 CmiAssert(rc == PMI_SUCCESS);
945 ivec_ptr = (int *)malloc(sizeof(int) * job_size);
946 CmiAssert(ivec_ptr != NULL);
948 rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
949 CmiAssert(rc == PMI_SUCCESS);
951 already_called = 1;
955 tmp_buf = (char *)malloc(job_size * len);
956 CmiAssert(tmp_buf);
958 rc = PMI_Allgather(in,tmp_buf,len);
959 CmiAssert(rc == PMI_SUCCESS);
961 out_ptr = out;
963 for(i=0;i<job_size;i++) {
965 memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
969 free(tmp_buf);
972 static void
973 allgather_2(void *in,void *out, int len)
975 //PMI_Allgather is out of order
976 int i,rc, extend_len;
977 int rank_index;
978 char *out_ptr, *out_ref;
979 char *in2;
981 extend_len = sizeof(int) + len;
982 in2 = (char*)malloc(extend_len);
984 memcpy(in2, &myrank, sizeof(int));
985 memcpy(in2+sizeof(int), in, len);
987 out_ptr = (char*)malloc(mysize*extend_len);
989 rc = PMI_Allgather(in2, out_ptr, extend_len);
990 GNI_RC_CHECK("allgather", rc);
992 out_ref = out;
994 for(i=0;i<mysize;i++) {
995 //rank index
996 memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
997 //copy to the rank index slot
998 memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1001 free(out_ptr);
1002 free(in2);
/* Resolve the GNI NIC address for 'device_id'.  Under ALPS the address is
 * published through the PMI_GNI_DEV_ID / PMI_GNI_LOC_ADDR environment
 * variables; otherwise fall back to GNI_CdmGetNicAddress().
 * NOTE(review): strtok() writes NUL bytes into the strings returned by
 * getenv(), i.e. it mutates the process environment in place — any later
 * reader of these variables sees only the first field.  Confirm nothing
 * else re-reads them. */
static unsigned int get_gni_nic_address(int device_id)
{
    unsigned int address, cpu_id;
    gni_return_t status;
    int i, alps_dev_id=-1,alps_address=-1;
    char *token, *p_ptr;

    p_ptr = getenv("PMI_GNI_DEV_ID");
    if (!p_ptr) {
        /* no ALPS info in the environment: ask the GNI library directly */
        status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
        GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
    } else {
        /* PMI_GNI_DEV_ID is a ':'-separated list of device ids; find ours.
         * p_ptr=NULL on subsequent iterations continues the strtok scan. */
        while ((token = strtok(p_ptr,":")) != NULL) {
            alps_dev_id = atoi(token);
            if (alps_dev_id == device_id) {
                break;
            }
            p_ptr = NULL;
        }
        CmiAssert(alps_dev_id != -1);
        /* PMI_GNI_LOC_ADDR lists the NIC address for each device, in the
         * same order; pick the entry at index alps_dev_id. */
        p_ptr = getenv("PMI_GNI_LOC_ADDR");
        CmiAssert(p_ptr != NULL);
        i = 0;
        while ((token = strtok(p_ptr,":")) != NULL) {
            if (i == alps_dev_id) {
                alps_address = atoi(token);
                break;
            }
            p_ptr = NULL;
            ++i;
        }
        CmiAssert(alps_address != -1);
        address = alps_address;
    }
    return address;
}
static uint8_t get_ptag(void)
{
    /* Protection tag assigned by ALPS, published as the first ':'-separated
     * field of PMI_GNI_PTAG.  (strtok terminates that field in place, as in
     * the original code.) */
    char *env_val;
    char *first_field;

    env_val = getenv("PMI_GNI_PTAG");
    CmiAssert(env_val != NULL);
    first_field = strtok(env_val, ":");
    return (uint8_t)atoi(first_field);
}
static uint32_t get_cookie(void)
{
    /* Communication-domain cookie assigned by ALPS, published as the first
     * ':'-separated field of PMI_GNI_COOKIE. */
    char *env_val = getenv("PMI_GNI_COOKIE");
    char *first_field;

    CmiAssert(env_val != NULL);
    first_field = strtok(env_val, ":");
    return (uint32_t)atoi(first_field);
}
1070 #if LARGEPAGE
1072 /* directly mmap memory from hugetlbfs for large pages */
1074 #include <sys/stat.h>
1075 #include <fcntl.h>
1076 #include <sys/mman.h>
1077 #include <hugetlbfs.h>
1079 // size must be _tlbpagesize aligned
1080 void *my_get_huge_pages(size_t size)
1082 char filename[512];
1083 int fd;
1084 mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1085 void *ptr = NULL;
1087 snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1088 fd = open(filename, O_RDWR | O_CREAT, mode);
1089 if (fd == -1) {
1090 CmiAbort("my_get_huge_pages: open filed");
1092 ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1093 if (ptr == MAP_FAILED) ptr = NULL;
1094 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1095 close(fd);
1096 unlink(filename);
1097 return ptr;
void my_free_huge_pages(void *ptr, int size)
{
    /* Release a region obtained from my_get_huge_pages(); the backing file
     * was already unlinked there, so unmapping frees the huge pages. */
    //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
    if (munmap(ptr, size) == -1) CmiAbort("munmap failed in my_free_huge_pages");
}
1107 #endif
1109 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1110 /* TODO: add any that are related */
1111 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1114 #include "machine-lrts.h"
1115 #include "machine-common-core.c"
1118 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *urgent_queue);
1119 static void SendRdmaMsg(PCQueue );
1120 static void PumpNetworkSmsg();
1121 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1122 #if CQWRITE
1123 static void PumpCqWriteTransactions();
1124 #endif
1125 #if REMOTE_EVENT
1126 static void PumpRemoteTransactions(gni_cq_handle_t);
1127 #endif
1129 #if MACHINE_DEBUG_LOG
1130 FILE *debugLog = NULL;
1131 static CmiInt8 buffered_recv_msg = 0;
1132 int lrts_smsg_success = 0;
1133 int lrts_received_msg = 0;
1134 #endif
1136 static void sweep_mempool(mempool_type *mptr)
1138 int n = 0;
1139 block_header *current = &(mptr->block_head);
1141 printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1142 while( current!= NULL) {
1143 printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1144 current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1146 printf("[n %d] sweep_mempool slot END.\n", myrank);
/* Deregister the first eligible block at/after *from in the mempool block
 * list: a block is eligible when it has no in-flight send/recv messages and
 * its memory handle is non-zero (i.e. it is currently registered).
 * On success the block's handle is zeroed and *from points at it so the
 * caller can resume scanning from there; returns GNI_RC_ERROR_RESOURCE when
 * no eligible block remains. */
inline
static gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
{
    block_header *current = *from;

    //while(register_memory_size>= MAX_REG_MEM)

    /* skip blocks that are busy (messages in flight) or not registered */
    while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
        current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;

    *from = current;
    if(current == NULL) return GNI_RC_ERROR_RESOURCE;
    MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
    SetMemHndlZero(GetMemHndlFromBlockHeader(current));
    return GNI_RC_SUCCESS;
}
/* Register [blockaddr, blockaddr+size) with the NIC, evicting idle
 * registrations from 'mptr' as needed to stay under MAX_REG_MEM.
 * Returns GNI_RC_SUCCESS with *memhndl filled in, aborts on
 * INVALID_PARAM/PERMISSION_ERROR, and returns the last failure code when
 * nothing more can be evicted. */
inline
static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t *memhndl, gni_cq_handle_t cqh )
{
    gni_return_t status = GNI_RC_SUCCESS;
    //int size = GetMempoolsize(msg);
    //void *blockaddr = GetMempoolBlockPtr(msg);
    //gni_mem_handle_t *memhndl = &(GetMemHndl(msg));

    /* first shrink total registered memory below the configured cap */
    block_header *current = &(mptr->block_head);
    while(register_memory_size>= MAX_REG_MEM)
    {
        status = deregisterMemory(mptr, &current);
        if (status != GNI_RC_SUCCESS) break;
    }
    if(register_memory_size>= MAX_REG_MEM) return status;

    MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size);
    /* retry registration, evicting one idle block per failed attempt */
    while(1)
    {
        MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
        if(status == GNI_RC_SUCCESS)
        {
            break;
        }
        else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
        {
            /* unrecoverable argument/permission problem: abort via macro */
            GNI_RC_CHECK("registerFromMempool", status);
        }
        else
        {
            /* presumably a resource shortage: free one registration and retry */
            status = deregisterMemory(mptr, &current);
            if (status != GNI_RC_SUCCESS) break;
        }
    }
    return status;
}
/* Register a message buffer with the NIC.  Try this PE's own mempool first;
 * in SMP mode, round-robin over the other ranks' mempools (the static
 * 'rank' cursor persists across calls so eviction pressure is spread).
 * Returns GNI_RC_SUCCESS or GNI_RC_ERROR_RESOURCE when every pool failed. */
inline
static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
{
    static int rank = -1;
    int i;
    gni_return_t status;
    mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
    //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
    mempool_type *mptr;

    status = registerFromMempool(mptr1, msg, size, t, cqh);
    if (status == GNI_RC_SUCCESS) return status;
#if CMK_SMP
    /* own pool was full: rotate through the node's other mempools
     * (+1 covers the comm thread's pool) */
    for (i=0; i<CmiMyNodeSize()+1; i++) {
        rank = (rank+1)%(CmiMyNodeSize()+1);
        mptr = CpvAccessOther(mempool, rank);
        if (mptr == mptr1) continue;
        status = registerFromMempool(mptr, msg, size, t, cqh);
        if (status == GNI_RC_SUCCESS) return status;
    }
#endif
    return  GNI_RC_ERROR_RESOURCE;
}
/* Queue an SMSG that could not be sent immediately (mailbox full, pending
 * connection, or flow-control cap); SendBufferMsg drains these later.
 * 'tag' records the SMSG tag to use on the eventual send. */
inline
static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
{
    MSG_LIST *msg_tmp;
    MallocMsgList(msg_tmp);
    msg_tmp->destNode = destNode;
    msg_tmp->size   = size;
    msg_tmp->msg    = msg;
    msg_tmp->tag    = tag;
#if CMK_WITH_STATS
    SMSG_CREATION(msg_tmp)
#endif
#if ONE_SEND_QUEUE
    /* single global send queue */
    PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
#else
#if SMP_LOCKS
    /* per-destination queue; also publish the queue itself on
     * nonEmptyQueues the first time it becomes non-empty */
    CmiLock(queue->smsg_msglist_index[destNode].lock);
    if(queue->smsg_msglist_index[destNode].pushed == 0)
    {
        /* NOTE(review): 'pushed' is tested but not set here — presumably
         * maintained by the queue drain code; confirm. */
        PCQueuePush(queue->nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
    }
    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
    CmiUnlock(queue->smsg_msglist_index[destNode].lock);
#else
    PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
#endif
#endif

#if PRINT_SYH
    buffered_smsg_counter++;
#endif
}
1262 inline static void print_smsg_attr(gni_smsg_attr_t *a)
1264 printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
/* (static-SMSG path) Build a local SMSG mailbox for destNode and push its
 * attribute block to the peer's pre-registered attribute table with an FMA
 * PUT.  Mailboxes are carved out of 64-byte-aligned slabs of
 * smsg_expand_slots slots; a new slab is allocated and registered when the
 * current one is exhausted. */
inline
static void setup_smsg_connection(int destNode)
{
    mdh_addr_list_t  *new_entry = 0;
    gni_post_descriptor_t    *pd;
    gni_smsg_attr_t      *smsg_attr;
    gni_return_t status = GNI_RC_NOT_DONE;
    RDMA_REQUEST        *rdma_request_msg;

    if(smsg_available_slot == smsg_expand_slots)
    {
        /* current slab exhausted: allocate, zero and register a new one */
        new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
        new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
        bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);

        status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
            smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
            GNI_MEM_READWRITE,
            -1,                      /* vmdh_index: library-assigned (line lost in extraction — confirm) */
            &(new_entry->mdh));
        smsg_available_slot = 0;
        new_entry->next = smsg_dynamic_list;
        smsg_dynamic_list = new_entry;
    }
    /* describe our receive mailbox for this peer */
    smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
    smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
    smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
    smsg_attr->msg_maxsize = SMSG_MAX_MSG;
    smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
    smsg_attr->buff_size = smsg_memlen;
    smsg_attr->msg_buffer = smsg_dynamic_list->addr;
    smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
    smsg_local_attr_vec[destNode] = smsg_attr;
    smsg_available_slot++;

    /* FMA PUT the attribute block into slot [myrank] of the peer's table */
    MallocPostDesc(pd);
    pd->type = GNI_POST_FMA_PUT;
    pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT ;
    pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
    pd->length = sizeof(gni_smsg_attr_t);
    pd->local_addr = (uint64_t) smsg_attr;
    pd->remote_addr = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
    pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
    pd->src_cq_hndl = 0;
    pd->rdma_mode = 0;
    status = GNI_PostFma(ep_hndl_array[destNode],  pd);
    print_smsg_attr(smsg_attr);
    if(status == GNI_RC_ERROR_RESOURCE )
    {
        /* FMA queue full: remember the descriptor so it can be reposted */
        MallocRdmaRequest(rdma_request_msg);
        rdma_request_msg->destNode = destNode;
        rdma_request_msg->pd = pd;
        /* buffer this request */
    }
#if PRINT_SYH
    if(status != GNI_RC_SUCCESS)
        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
    else
        printf("[%d=%d]OK send post FMA \n", myrank, destNode);
#endif
}
1329 /* useDynamicSMSG */
/* (useDynamicSMSG path) Fill local_smsg_attr with a freshly carved mailbox
 * from the shared dynamic mailbox pool; allocate and register a new pool
 * segment (sized for avg_smsg_connection mailboxes) when the current one is
 * exhausted. */
inline
static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
{
    gni_return_t status = GNI_RC_NOT_DONE;

    if(mailbox_list->offset == mailbox_list->size)
    {
        /* current segment exhausted: allocate, zero and register a new one */
        dynamic_smsg_mailbox_t *new_mailbox_entry;
        new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
        new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
        new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
        bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
        new_mailbox_entry->offset = 0;

        status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
            new_mailbox_entry->size, smsg_rx_cqh,
            GNI_MEM_READWRITE,
            -1,                  /* vmdh_index: library-assigned (line lost in extraction — confirm) */
            &(new_mailbox_entry->mem_hndl));

        GNI_RC_CHECK("register", status);
        new_mailbox_entry->next = mailbox_list;
        mailbox_list = new_mailbox_entry;
    }
    /* hand out the next smsg_memlen-sized mailbox from the segment */
    local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
    local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
    local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
    local_smsg_attr->mbox_offset = mailbox_list->offset;
    mailbox_list->offset += smsg_memlen;
    local_smsg_attr->buff_size = smsg_memlen;
    local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
    local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
}
1364 /* useDynamicSMSG */
/* (useDynamicSMSG path) Initiate an SMSG connection to destNode: allocate a
 * local mailbox, then post a datagram (id = destNode+mysize) that exchanges
 * attribute blocks with the peer.  Returns 1 when the request was posted
 * (connection now pending), 0 when GNI reported a resource conflict —
 * likely because the peer is connecting to us at the same moment — in which
 * case all local state is rolled back so the call can be retried. */
inline
static int connect_to(int destNode)
{
    gni_return_t status = GNI_RC_NOT_DONE;
    CmiAssert(smsg_connected_flag[destNode] == 0);
    CmiAssert (smsg_attr_vector_local[destNode] == NULL);
    smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
    alloc_smsg_attr(smsg_attr_vector_local[destNode]);
    smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));

    CMI_GNI_LOCK(global_gni_lock)
    status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
    CMI_GNI_UNLOCK(global_gni_lock)
    if (status == GNI_RC_ERROR_RESOURCE) {
      /* possibly destNode is making connection at the same time */
      free(smsg_attr_vector_local[destNode]);
      smsg_attr_vector_local[destNode] = NULL;
      free(smsg_attr_vector_remote[destNode]);
      smsg_attr_vector_remote[destNode] = NULL;
      /* return the mailbox we carved out in alloc_smsg_attr() */
      mailbox_list->offset -= smsg_memlen;
#if PRINT_SYH
      printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
#endif
      return 0;
    }
    GNI_RC_CHECK("GNI_Post", status);
    smsg_connected_flag[destNode] = 1;   /* 1 = handshake pending */
#if PRINT_SYH
    printf("[%d] send connect_to request to %d done\n", myrank, destNode);
#endif
    return 1;
}
/* Try to send one SMSG of 'size' bytes to destNode with the given tag.
 * On any failure the message is queued via buffer_small_msgs() unless it
 * already came from the buffer ('inbuff'), so callers may fire-and-forget.
 * 'ptr' is the buffered MSG_LIST entry (NULL for a fresh send); it is only
 * read for its creation timestamp under CMK_WITH_STATS.
 * Returns the GNI status of the attempt (GNI_RC_NOT_DONE while a dynamic
 * connection is still being established). */
inline
static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
{
    unsigned int          remote_address;
    uint32_t              remote_id;
    gni_return_t          status = GNI_RC_ERROR_RESOURCE;
    gni_smsg_attr_t       *smsg_attr;
    gni_post_descriptor_t *pd;
    gni_post_state_t      post_state;
    char                  *real_data;

    if (useDynamicSMSG) {
        switch (smsg_connected_flag[destNode]) {
        case 0:
            connect_to(destNode);         /* continue to case 1 */
            /* deliberate fallthrough: connection is now pending */
        case 1:            /* pending connection, do nothing */
            status = GNI_RC_NOT_DONE;
            if(inbuff ==0)
                buffer_small_msgs(queue, msg, size, destNode, tag);
            return status;
        }
    }
#if ! ONE_SEND_QUEUE
    /* preserve per-destination FIFO order: only bypass the buffer when it
     * is empty (or we are draining the buffer itself) */
    if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
#endif
    {
        //CMI_GNI_LOCK(smsg_mailbox_lock)
        CMI_GNI_LOCK(default_tx_cq_lock)
#if CMK_SMP_TRACE_COMMTHREAD
        int oldpe = -1;
        int oldeventid = -1;
        /* re-tag the traced message id so projections attributes the comm
         * time to this send; restored after GNI_SmsgSendWTag below */
        if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG)
        {
            START_EVENT();
            if ( tag == SMALL_DATA_TAG)
                real_data = (char*)msg;
            else
                real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
            TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
            TRACE_COMM_SET_COMM_MSGID(real_data);
        }
#endif
#if REMOTE_EVENT
        /* first-segment large-message control: reserve an ack-pool slot so
         * the receiver's remote event can identify the source buffer */
        if (tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) {
            CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
            if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
                control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
        }
        // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
#endif
#if CMK_WITH_STATS
        SMSG_TRY_SEND(tag)
#endif
#if CMK_WITH_STATS
        double creation_time;
        if (ptr == NULL)
            creation_time = CmiWallTimer();
        else
            creation_time = ptr->creation_time;
#endif
        status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
#if CMK_SMP_TRACE_COMMTHREAD
        if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
#endif
        CMI_GNI_UNLOCK(default_tx_cq_lock)
        //CMI_GNI_UNLOCK(smsg_mailbox_lock)
        if(status == GNI_RC_SUCCESS)
        {
#if CMK_WITH_STATS
            SMSG_SENT_DONE(creation_time,tag)
#endif
#if CMK_SMP_TRACE_COMMTHREAD
            if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG)
            {
                TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
            }
#endif
        }else
            status = GNI_RC_ERROR_RESOURCE;
    }
    /* fold every failure into the resend buffer exactly once */
    if(status != GNI_RC_SUCCESS && inbuff ==0)
        buffer_small_msgs(queue, msg, size, destNode, tag);
    return status;
}
/* Build the CONTROL_MSG that announces a large message to the receiver
 * (which will register memory and GET the data).  seqno==0 means a
 * mempool-resident message sent whole; seqno>=1 selects the seqno-th
 * ONE_SEG-sized segment of a BIG_MSG.  Caller frees via FreeControlMsg(). */
inline
static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
{
    /* construct a control message and send */
    CONTROL_MSG         *control_msg_tmp;
    MallocControlMsg(control_msg_tmp);
    control_msg_tmp->source_addr = (uint64_t)msg;
    control_msg_tmp->seq_id    = seqno;
    control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned
#if REMOTE_EVENT
    control_msg_tmp->ack_index    =  -1;   /* ack-pool slot assigned at send time */
#endif
#if USE_LRTS_MEMPOOL
    if(size < BIG_MSG)
    {
        /* whole message lives in the mempool: pass its registration handle */
        control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
    }
    else
    {
        /* BIG_MSG segments are registered on demand; length is clamped to
         * one ONE_SEG pipeline segment */
        SetMemHndlZero(control_msg_tmp->source_mem_hndl);
        control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
        if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
    }
#else
    SetMemHndlZero(control_msg_tmp->source_mem_hndl);
#endif
    return control_msg_tmp;
}
1513 #define BLOCKING_SEND_CONTROL 0
// Large message, send control to receiver, receiver register memory and do a GET,
// return 1 - send no success
/* Ensure the source buffer (or the current BIG_MSG segment) is registered,
 * then send the CONTROL_MSG via SMSG.  Flow control: the send is buffered
 * instead whenever registration would push buffered_send_msg past
 * MAX_BUFF_SEND.  On success ownership of control_msg_tmp is released here
 * (FreeControlMsg); otherwise it is buffered for retry unless 'inbuff'. */
inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr, uint8_t lmsg_tag)
{
    gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
    uint32_t            vmdh_index  = -1;
    int                 size;
    int                 offset = 0;
    uint64_t            source_addr;
    int                 register_size;
    void                *msg;

    size    =  control_msg_tmp->total_length;
    source_addr = control_msg_tmp->source_addr;
    register_size = control_msg_tmp->length;

#if USE_LRTS_MEMPOOL
    if( control_msg_tmp->seq_id == 0 ){    /* whole-message (mempool) case */
#if BLOCKING_SEND_CONTROL
        if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
            while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
                LrtsAdvanceCommunication(0);
        }
#endif
        if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
        {
            msg = (void*)source_addr;
            if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
            {
                /* over the send flow-control cap: buffer and bail out */
                if(!inbuff)
                    buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
                return GNI_RC_ERROR_NOMEM;
            }
            //register the corresponding mempool
            status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
            if(status == GNI_RC_SUCCESS)
            {
                control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
            }
        }else
        {
            /* block already registered (by an earlier message) */
            control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
            status = GNI_RC_SUCCESS;
        }
        /* only count the block against the cap once: when no other send is
         * currently using it */
        if(NoMsgInSend(source_addr))
            register_size = GetMempoolsize((void*)(source_addr));
        else
            register_size = 0;
    }else if(control_msg_tmp->seq_id >0)    // BIG_MSG segment case
    {
        int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
        source_addr += offset;
        size = control_msg_tmp->length;
#if BLOCKING_SEND_CONTROL
        if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
            while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
                LrtsAdvanceCommunication(0);
        }
#endif
        if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
            if(buffered_send_msg + size >= MAX_BUFF_SEND)
            {
                if(!inbuff)
                    buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
                return GNI_RC_ERROR_NOMEM;
            }
            status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
            if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
        }
        else
        {
            status = GNI_RC_SUCCESS;
        }
        register_size = 0;
    }

#if CMI_EXERT_SEND_LARGE_CAP
    if(SEND_large_pending >= SEND_large_cap)
    {
        status = GNI_RC_ERROR_NOMEM;
    }
#endif

    if(status == GNI_RC_SUCCESS)
    {
        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, inbuff, smsg_ptr);
        if(status == GNI_RC_SUCCESS)
        {
#if CMI_EXERT_SEND_LARGE_CAP
            SEND_large_pending++;
#endif
            buffered_send_msg += register_size;
            if(control_msg_tmp->seq_id == 0)
            {
                IncreaseMsgInSend(source_addr);
            }
            FreeControlMsg(control_msg_tmp);
            MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, lmsg_tag);
        }else
            status = GNI_RC_ERROR_RESOURCE;
    } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
    {
        CmiAbort("Memory registor for large msg\n");
    }else
    {
        status = GNI_RC_ERROR_NOMEM;
        if(!inbuff)
            buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
    }
    return status;
#else
    /* NOTE(review): BUG — in this non-mempool build 'msg' is never assigned
     * before being registered below; it should presumably be
     * (void*)source_addr.  Registering an uninitialized pointer is
     * undefined behavior — fix before enabling !USE_LRTS_MEMPOOL. */
    MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
    if(status == GNI_RC_SUCCESS)
    {
        status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, 0, NULL);
        if(status == GNI_RC_SUCCESS)
        {
            FreeControlMsg(control_msg_tmp);
        }
    } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
    {
        CmiAbort("Memory registor for large msg\n");
    }else
    {
        buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
    }
    return status;
#endif
}
/* Stamp the outgoing message header: record the total size, then compute
 * the checksum over the message (CMI_SET_CHECKSUM may expand to a no-op
 * depending on build flags — see converse headers).  Order matters: the
 * size field must be set before the checksum is taken. */
inline void LrtsPrepareEnvelope(char *msg, int size)
{
    CmiSetMsgSize(msg, size);
    CMI_SET_CHECKSUM(msg, size);
}
/* Machine-layer send entry point.  Small messages (<= SMSG_MAX_MSG) go
 * straight over SMSG; larger ones send a CONTROL_MSG so the receiver can
 * GET the data; BIG_MSG payloads are additionally pipelined by segment.
 * In SMP builds the worker thread only enqueues — the comm thread performs
 * the actual sends. */
CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
{
    gni_return_t        status  =  GNI_RC_SUCCESS;
    uint8_t tag;
    CONTROL_MSG         *control_msg_tmp;
    int                 oob = ( mode & OUT_OF_BAND);
    SMSG_QUEUE          *queue;

    MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size);
#if CMK_USE_OOB
    queue = oob? &smsg_oob_queue : &smsg_queue;
    tag = oob? LMSG_OOB_INIT_TAG: LMSG_INIT_TAG;
#else
    /* 'tag' must be initialized here too: large-message (e.g. persistent)
     * sends below use it even when CMK_USE_OOB is disabled. */
    queue = &smsg_queue;
    tag = LMSG_INIT_TAG;
#endif

    LrtsPrepareEnvelope(msg, size);

#if PRINT_SYH
    printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
#endif
#if CMK_SMP
    /* worker thread: enqueue only; comm thread drains via SendBufferMsg */
    if(size <= SMSG_MAX_MSG)
        buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
    else if (size < BIG_MSG) {
        control_msg_tmp =  construct_control_msg(size, msg, 0);
        buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
    }
    else {
        CmiSetMsgSeq(msg, 0);
        control_msg_tmp =  construct_control_msg(size, msg, 1);
        buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
    }
#else   //non-smp, smp(worker sending)
    if(size <= SMSG_MAX_MSG)
    {
        /* on success SMSG copied the payload, so the message can be freed;
         * on failure send_smsg_message() buffered it and keeps ownership */
        if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode, msg, size, SMALL_DATA_TAG, 0, NULL))
            CmiFree(msg);
    }
    else if (size < BIG_MSG) {
        control_msg_tmp =  construct_control_msg(size, msg, 0);
        send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
    }
    else {
#if     USE_LRTS_MEMPOOL
        CmiSetMsgSeq(msg, 0);
        control_msg_tmp =  construct_control_msg(size, msg, 1);
        send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
#else
        control_msg_tmp =  construct_control_msg(size, msg, 0);
        send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
#endif
    }
#endif
    return 0;
}
void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
{
    /* Synchronously deliver one copy of msg to each PE in pes[]; the caller
     * retains ownership of msg. */
    int idx;
#if CMK_BROADCAST_USE_CMIREFERENCE
    for (idx = 0; idx < npes; idx++) {
        int dest = pes[idx];
        if (dest == CmiMyPe()) {
            CmiSyncSend(dest, len, msg);
        } else {
            /* bump the refcount so SendAndFree does not reclaim the
             * caller's copy */
            CmiReference(msg);
            CmiSyncSendAndFree(dest, len, msg);
        }
    }
#else
    for (idx = 0; idx < npes; idx++) {
        CmiSyncSend(pes[idx], len, msg);
    }
#endif
}
/* Asynchronous list send: currently delegates to the synchronous variant
 * and returns a null handle (i.e. completion is immediate from the
 * caller's point of view). */
CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
{
    /* A better asynchronous implementation may be wanted, but at least it works */
    CmiSyncListSendFn(npes, pes, len, msg);
    return (CmiCommHandle) 0;
}
/* List send that takes ownership of msg: deliver to every PE in pes[] and
 * release the caller's reference.  A persistent-channel fast path (when
 * persistent handles are active and the message qualifies) mirrors the
 * CmiReference pattern so each remote send can use the persistent slot. */
void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
{
    if (npes == 1) {
        /* single destination: transfer ownership directly */
        CmiSyncSendAndFree(pes[0], len, msg);
        return;
    }
#if CMK_PERSISTENT_COMM
    if (CpvAccess(phs) && len > PERSIST_MIN_SIZE
#if CMK_SMP
        && IS_PERSISTENT_MEMORY(msg)
#endif
        )
    {
        int i;
        for(i=0;i<npes;i++) {
            if (pes[i] == CmiMyPe())
                CmiSyncSend(pes[i], len, msg);
            else {
                /* extra reference per remote send; SendAndFree drops it */
                CmiReference(msg);
                CmiSyncSendAndFree(pes[i], len, msg);
            }
        }
        /* drop the caller's original reference */
        CmiFree(msg);
        return;
    }
#endif

#if CMK_BROADCAST_USE_CMIREFERENCE
    CmiSyncListSendFn(npes, pes, len, msg);
    CmiFree(msg);
#else
    int i;
    for(i=0;i<npes-1;i++) {
        CmiSyncSend(pes[i], len, msg);
    }
    if (npes>0)
        CmiSyncSendAndFree(pes[npes-1], len, msg);
    else
        CmiFree(msg);   /* empty destination list: just release the message */
#endif
}
1778 static void PumpDatagramConnection();
1779 static int event_SetupConnect = 111;
1780 static int event_PumpSmsg = 222 ;
1781 static int event_PumpTransaction = 333;
1782 static int event_PumpRdmaTransaction = 444;
1783 static int event_SendBufferSmsg = 484;
1784 static int event_SendFmaRdmaMsg = 555;
1785 static int event_AdvanceCommunication = 666;
/* Register the machine-layer user events with the tracing framework,
 * replacing the compile-time default ids above with trace-assigned ones.
 * Installed via registerMachineUserEventsFunction() in LrtsPostCommonInit. */
static void registerUserTraceEvents() {
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
    event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
    event_PumpTransaction = traceRegisterUserEvent("Pump FMA/RDMA local transaction" , -1);
    event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA remote event" , -1);
    event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
    event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
    event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
#endif
}
/* Called when CheckProgress suspects a hang: allgather the per-rank
 * send+recv counters and compare the global sum against the previous
 * invocation.  If the sum is unchanged twice in a row the whole job has
 * stalled, so rank 0 prints tuning advice and aborts. */
static void ProcessDeadlock()
{
    static CmiUInt8 *ptr = NULL;
    static CmiUInt8 last = 0, mysum, sum;
    static int count = 0;
    gni_return_t status;
    int i;

    //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
    //sweep_mempool(CpvAccess(mempool));
    if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
    mysum = smsg_send_count + smsg_recv_count;
    MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count);
    /* NOTE(review): PMI_Allgather returns a PMI code, yet it is validated
     * with GNI_RC_CHECK (and stored in a gni_return_t); this appears to
     * work only because both success codes are 0 — confirm/clean up. */
    status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
    GNI_RC_CHECK("PMI_Allgather", status);
    sum = 0;
    for (i=0; i<mysize; i++)  sum+= ptr[i];
    if (last == 0 || sum == last)
        count++;
    else
        count = 0;
    last = sum;
    MACHSTATE5(9,"Progress Deadlock (%d,%d) (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count);
    if (count == 2) {
        /* detected twice, it is a real deadlock */
        if (myrank == 0)  {
            CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low. Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
            CmiAbort("Fatal> Deadlock detected.");
        }
    }
    _detected_hang = 0;
}
1833 static void CheckProgress()
1835 if (smsg_send_count == last_smsg_send_count &&
1836 smsg_recv_count == last_smsg_recv_count )
1838 _detected_hang = 1;
1839 #if !CMK_SMP
1840 if (_detected_hang) ProcessDeadlock();
1841 #endif
1844 else {
1845 //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count);
1846 last_smsg_send_count = smsg_send_count;
1847 last_smsg_recv_count = smsg_recv_count;
1848 _detected_hang = 0;
/* Once topology information is available (CcdTOPOLOGY_AVAIL), derive the
 * per-process registered-memory cap from the physical node's total memory
 * and abort early if the static SMSG mailboxes alone would exceed the send
 * cap (which would otherwise hang at runtime). */
static void set_limit()
{
    //if (!user_set_flag && CmiMyRank() == 0) {
    if (CmiMyRank() == 0) {
        int mynode = CmiPhysicalNodeID(CmiMyPe());
        int numpes = CmiNumPesOnPhysicalNode(mynode);
        int numprocesses = numpes / CmiMyNodeSize();   /* processes sharing this node's memory */
        MAX_REG_MEM  = _totalmem / numprocesses;
        MAX_BUFF_SEND = MAX_REG_MEM / 2;
        if (CmiMyPe() == 0)
            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
        if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND || smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
        {
            printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
            CmiAbort("memory registration\n");
        }
    }
}
/* Post-initialization hook: wire up tracing, idle-notification callbacks,
 * the dynamic-SMSG connection pump (non-SMP), the periodic progress
 * watchdog, and the topology-dependent memory-limit computation. */
void LrtsPostCommonInit(int everReturn)
{
#if CMK_DIRECT
    CmiDirectInit();
#endif
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    CpvInitialize(double, projTraceStart);
    /* only PE 0 needs to care about registration (to generate sts file). */
    //if (CmiMyPe() == 0)
    {
        registerMachineUserEventsFunction(&registerUserTraceEvents);
    }
#endif

#if CMK_SMP
    CmiIdleState *s=CmiNotifyGetState();
    CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
    CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
#else
    CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
    /* non-SMP: poll the datagram connection setup from the scheduler */
    if (useDynamicSMSG)
        CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
#endif

#if ! LARGEPAGE
    /* hang watchdog: every 2 minutes (rank 0 only in SMP builds) */
    if (_checkProgress)
#if CMK_SMP
    if (CmiMyRank() == 0)
#endif
    CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
#endif

#if !LARGEPAGE
    CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
#endif
}
/* this is called by worker thread */
/* With MULTI_THREAD_SEND, selected worker threads (rank % 6 == 3) help the
 * comm thread by driving CmiMachineProgressImpl when they have no local
 * work; otherwise this is a no-op. */
void LrtsPostNonLocal()
{
#if 1
#if CMK_SMP_TRACE_COMMTHREAD
    double startT, endT;
#endif

#if MULTI_THREAD_SEND
    if(mysize == 1) return;

    /* only one in six worker ranks participates in progress work */
    if (CmiMyRank() % 6 != 3) return;

#if CMK_SMP_TRACE_COMMTHREAD
    traceEndIdle();
    startT = CmiWallTimer();
#endif

    CmiMachineProgressImpl();

#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
    traceBeginIdle();
#endif
#endif
#endif
}
1939 /* Network progress function is used to poll the network when for
1940 messages. This flushes receive buffers on some implementations*/
1941 #if CMK_MACHINE_PROGRESS_DEFINED
/* Drive network progress: pump incoming SMSGs, then the high-priority
 * OOB/remote/local/RDMA work via the SEND_OOB_SMSG / PUMP_* / POST_*
 * macros.  The large '#if 0' region below preserves an older per-task
 * round-robin scheme (one task per worker rank when
 * CMK_WORKER_SINGLE_TASK) and is currently disabled. */
void CmiMachineProgressImpl() {
#if ! CMK_SMP || MULTI_THREAD_SEND
    STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
    SEND_OOB_SMSG(smsg_oob_queue)
    PUMP_REMOTE_HIGHPRIORITY
    PUMP_LOCAL_HIGHPRIORITY
    POST_HIGHPRIORITY_RDMA

#if 0
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 0)
#endif
    PumpNetworkSmsg();

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 1)
#endif
    PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 2)
#endif
    PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);

#if REMOTE_EVENT
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 3)
#endif
    PumpRemoteTransactions(rdma_rx_cqh); // rdma_rx_cqh
#endif

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 4)
#endif
    {
        /* drain buffered sends; OOB queue first when enabled */
#if CMK_USE_OOB
        SendBufferMsg(&smsg_oob_queue, NULL);
        SendBufferMsg(&smsg_queue, &smsg_oob_queue);
#else
        SendBufferMsg(&smsg_queue, NULL);
#endif
    }

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 5)
#endif
#if CMK_SMP
    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
#else
    STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
#endif

#endif
#endif
}
1998 #endif
2001 /* useDynamicSMSG */
/* PumpDatagramConnection: poll GNI datagram completions to finish dynamic
 * SMSG connection setup.  datagram_id >= mysize identifies a bound endpoint
 * (this side initiated the connection); smaller ids arrive on the shared
 * unbound endpoint (peer initiated).  On completion the SMSG mailbox is
 * initialized and smsg_connected_flag[pe] is set to 2 (connected). */
2002 static void PumpDatagramConnection()
2004 uint32_t remote_address;
2005 uint32_t remote_id;
2006 gni_return_t status;
2007 gni_post_state_t post_state;
2008 uint64_t datagram_id;
2009 int i;
2011 while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2013 if (datagram_id >= mysize) { /* bound endpoint */
2014 int pe = datagram_id - mysize;
2015 CMI_GNI_LOCK(global_gni_lock)
2016 status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2017 CMI_GNI_UNLOCK(global_gni_lock)
2018 if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2020 CmiAssert(remote_id == pe);
2021 status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2022 GNI_RC_CHECK("Dynamic SMSG Init", status);
2023 #if PRINT_SYH
2024 printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2025 #endif
/* flag 1 == setup in progress on this side; promote to 2 == connected */
2026 CmiAssert(smsg_connected_flag[pe] == 1);
2027 smsg_connected_flag[pe] = 2;
2030 else { /* unbound ep */
2031 status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2032 if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2034 CmiAssert(remote_id<mysize);
2035 CmiAssert(smsg_connected_flag[remote_id] <= 0);
2036 status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2037 GNI_RC_CHECK("Dynamic SMSG Init", status);
2038 #if PRINT_SYH
2039 printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2040 #endif
2041 smsg_connected_flag[remote_id] = 2;
/* re-arm the unbound endpoint so the next incoming connection can land */
2043 alloc_smsg_attr(&send_smsg_attr);
2044 status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr, SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2045 GNI_RC_CHECK("post unbound datagram", status);
2051 /* pooling CQ to receive network message */
/* PumpNetworkRdmaMsgs: no executable statements are visible in this body --
 * the two locals below are unused.  NOTE(review): appears to be a dead stub;
 * RDMA completions are handled by PumpLocalTransactions /
 * PumpRemoteTransactions instead -- confirm before removing. */
2052 static void PumpNetworkRdmaMsgs()
2054 gni_cq_entry_t event_data;
2055 gni_return_t status;
2059 inline
2060 static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2062 RDMA_REQUEST *rdma_request_msg;
2063 MallocRdmaRequest(rdma_request_msg);
2064 rdma_request_msg->destNode = inst_id;
2065 rdma_request_msg->pd = pd;
2066 #if REMOTE_EVENT
2067 rdma_request_msg->ack_index = ack_index;
2068 #endif
2069 PCQueuePush(bufferqueue, (char*)rdma_request_msg);
2072 static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue);
/* PumpNetworkSmsg: drain the SMSG receive CQ and dispatch every mailbox
 * message by tag.  Small messages are copied out and delivered immediately;
 * large-message control packets start (or buffer) RDMA GETs via
 * getLargeMsgRequest(); ACK / BIG_MSG / PUT_DONE / CMK_DIRECT tags complete
 * earlier transactions.  smsg_mailbox_lock is held while the mailbox slot is
 * read and released only after the payload has been copied out or the slot
 * released -- note the careful unlock placement in each case below. */
2074 static void PumpNetworkSmsg()
2076 uint64_t inst_id;
2077 gni_cq_entry_t event_data;
2078 gni_return_t status;
2079 void *header;
2080 uint8_t msg_tag;
2081 int msg_nbytes;
2082 void *msg_data;
2083 gni_mem_handle_t msg_mem_hndl;
2084 gni_smsg_attr_t *smsg_attr;
2085 gni_smsg_attr_t *remote_smsg_attr;
2086 int init_flag;
2087 CONTROL_MSG *control_msg_tmp, *header_tmp;
2088 uint64_t source_addr;
2089 SMSG_QUEUE *queue = &smsg_queue;
2090 PCQueue tmp_queue;
2091 #if CMK_DIRECT
2092 cmidirectMsg *direct_msg;
2093 #endif
2094 #if CMI_PUMPNETWORKSMSG_CAP
2095 int recv_cnt = 0;
2096 while(recv_cnt< PumpNetworkSmsg_cap) {
2097 #else
2098 while(1) {
2099 #endif
2100 CMI_GNI_LOCK(smsg_rx_cq_lock)
2101 status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2102 CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2103 if(status != GNI_RC_SUCCESS) break;
2105 inst_id = GNI_CQ_GET_INST_ID(event_data);
2106 #if REMOTE_EVENT
2107 inst_id = GET_RANK(inst_id); /* important */
2108 #endif
2109 // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2110 #if PRINT_SYH
2111 printf("[%d] %d PumpNetworkMsgs is received from PE: %d, status=%s\n", myrank, CmiMyRank(), inst_id, gni_err_str[status]);
2112 #endif
2113 if (useDynamicSMSG) {
2114 /* subtle: smsg may come before connection is setup */
2115 while (smsg_connected_flag[inst_id] != 2)
2116 PumpDatagramConnection();
2118 msg_tag = GNI_SMSG_ANY_TAG;
2119 while(1) {
2120 CMI_GNI_LOCK(smsg_mailbox_lock)
2121 status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2122 if (status != GNI_RC_SUCCESS)
2124 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2125 break;
2127 #if CMI_PUMPNETWORKSMSG_CAP
2128 recv_cnt++;
2129 #endif
2130 #if PRINT_SYH
2131 printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2132 #endif
2133 /* copy msg out and then put into queue (small message) */
2134 switch (msg_tag) {
2135 case SMALL_DATA_TAG:
2137 START_EVENT();
2138 msg_nbytes = CmiGetMsgSize(header);
2139 msg_data = CmiAlloc(msg_nbytes);
2140 memcpy(msg_data, (char*)header, msg_nbytes);
2141 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2142 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2143 TRACE_COMM_CREATION(EVENT_TIME(), msg_data);
2144 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2145 handleOneRecvedMsg(msg_nbytes, msg_data);
2146 break;
2148 case LMSG_INIT_TAG:
2149 case LMSG_OOB_INIT_TAG:
/* large-message control packet: start (or buffer) an RDMA GET.  Non-OOB
 * requests retry via sendRdmaBuf, OOB requests via sendHighPriorBuf */
2151 tmp_queue = (msg_tag == LMSG_INIT_TAG)? sendRdmaBuf : sendHighPriorBuf;
2152 #if MULTI_THREAD_SEND
2153 MallocControlMsg(control_msg_tmp);
2154 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2155 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2156 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2157 getLargeMsgRequest(control_msg_tmp, inst_id, msg_tag, tmp_queue);
2158 FreeControlMsg(control_msg_tmp);
2159 #else
2160 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2161 getLargeMsgRequest(header, inst_id, msg_tag, tmp_queue);
2162 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2163 #endif
2164 break;
2166 #if !REMOTE_EVENT && !CQWRITE
2167 case ACK_TAG: //msg fit into mempool
2169 /* Get is done, release message . Now put is not used yet*/
2170 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2171 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2172 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2173 #if ! USE_LRTS_MEMPOOL
2174 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2175 #else
2176 DecreaseMsgInSend(msg);
2177 #endif
2178 if(NoMsgInSend(msg))
2179 buffered_send_msg -= GetMempoolsize(msg);
2180 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag);
2181 CmiFree(msg);
2182 #if CMI_EXERT_SEND_LARGE_CAP
2183 SEND_large_pending--;
2184 #endif
2185 break;
2187 #endif
2188 case BIG_MSG_TAG: //big msg, de-register, transfer next seg
2190 #if MULTI_THREAD_SEND
2191 MallocControlMsg(header_tmp);
2192 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2193 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2194 #else
2195 header_tmp = (CONTROL_MSG *) header;
2196 #endif
2197 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2198 #if CMI_EXERT_SEND_LARGE_CAP
2199 SEND_large_pending--;
2200 #endif
/* sender side of the segmented big-message protocol: the receiver finished
 * one ONE_SEG piece; deregister it and, if data remains, send the next
 * segment's control message (plus a pipeline burst on the first ack) */
2201 void *msg = (void*)(header_tmp->source_addr);
2202 int cur_seq = CmiGetMsgSeq(msg);
2203 int offset = ONE_SEG*(cur_seq+1);
2204 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2205 buffered_send_msg -= header_tmp->length;
2206 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2207 if (remain_size < 0) remain_size = 0;
2208 CmiSetMsgSize(msg, remain_size);
2209 if(remain_size <= 0) //transaction done
2211 CmiFree(msg);
2212 }else if (header_tmp->total_length > offset)
2214 CmiSetMsgSeq(msg, cur_seq+1);
2215 control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2216 control_msg_tmp->dest_addr = header_tmp->dest_addr;
2217 //send next seg
2218 send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2219 // pipelining
2220 if (header_tmp->seq_id == 1) {
2221 int i;
2222 for (i=1; i<BIG_MSG_PIPELINE; i++) {
2223 int seq = cur_seq+i+2;
2224 CmiSetMsgSeq(msg, seq-1);
2225 control_msg_tmp = construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2226 control_msg_tmp->dest_addr = header_tmp->dest_addr;
2227 send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2228 if (header_tmp->total_length <= ONE_SEG*seq) break;
2232 #if MULTI_THREAD_SEND
2233 FreeControlMsg(header_tmp);
2234 #else
2235 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2236 #endif
2237 break;
/* PUT_DONE arrives via SMSG only when remote events and CQ writes are both
 * disabled; otherwise persistent completion is seen in
 * PumpRemoteTransactions / PumpCqWriteTransactions */
2239 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2240 case PUT_DONE_TAG: { //persistent message
2241 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2242 int size = ((CONTROL_MSG *) header)->length;
2243 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2244 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2245 CmiReference(msg);
2246 CMI_CHECK_CHECKSUM(msg, size);
2247 handleOneRecvedMsg(size, msg);
2248 #if PRINT_SYH
2249 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2250 #endif
2251 break;
2253 #endif
2254 #if CMK_DIRECT
2255 case DIRECT_PUT_DONE_TAG: //cmi direct
2256 //create a trigger message
2257 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2258 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2259 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2260 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2261 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2262 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2263 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2264 break;
2265 #endif
2266 default:
2267 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2268 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2269 printf("weird tag problem\n");
2270 CmiAbort("Unknown tag\n");
2271 } // end switch
2272 #if PRINT_SYH
2273 printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2274 #endif
2275 smsg_recv_count ++;
2276 msg_tag = GNI_SMSG_ANY_TAG;
2277 } //endwhile GNI_SmsgGetNextWTag
2278 } //end while GetEvent
2279 if(status == GNI_RC_ERROR_RESOURCE)
2281 printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");
2282 GNI_RC_CHECK("Smsg_rx_cq full", status);
2286 static void printDesc(gni_post_descriptor_t *pd)
2288 printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length);
#if CQWRITE
/* Post a silent CQ-write carrying `data` to destNode.
 * Used as a lightweight completion notification: the payload written to the
 * remote CQ identifies the finished message on the other side.  Aborts via
 * GNI_RC_CHECK if the post itself fails. */
static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
{
    gni_return_t rc = GNI_RC_SUCCESS;
    gni_post_descriptor_t *desc;

    MallocPostDesc(desc);
    desc->type            = GNI_POST_CQWRITE;
    desc->cq_mode         = GNI_CQMODE_SILENT;
    //desc->cq_mode       = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
    desc->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    desc->cqwrite_value   = data;
    desc->remote_mem_hndl = mem_hndl;

    rc = GNI_PostCqWrite(ep_hndl_array[destNode], desc);
    GNI_RC_CHECK("GNI_PostCqWrite", rc);
}
#endif
2309 // register memory for a message
2310 // return mem handle
/* registerMessage: ensure `msg` is covered by a GNI memory registration and
 * return the handle through *memh.  An already-nonzero *memh is returned
 * unchanged.  Mempool messages (seqno 0, or PERSIST_SEQ for persistent)
 * register the whole mempool block on rdma_rx_cqh; other seqno values
 * (BIG_MSG pieces, CmiDirect buffers) register exactly [msg, msg+size).
 * Returns the GNI status of the registration attempt. */
2311 static gni_return_t registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2313 gni_return_t status = GNI_RC_SUCCESS;
2315 if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2317 #if CMK_PERSISTENT_COMM
2318 // persistent message is always registered
2319 // BIG_MSG small pieces do not have malloc chunk header
2320 if (IS_PERSISTENT_MEMORY(msg)) {
2321 *memh = GetMemHndl(msg);
2322 return GNI_RC_SUCCESS;
2324 #endif
2325 if(seqno == 0
2326 #if CMK_PERSISTENT_COMM
2327 || seqno == PERSIST_SEQ
2328 #endif
/* mempool block not yet registered: register it and cache the handle */
2331 if(IsMemHndlZero((GetMemHndl(msg))))
2333 msg = (void*)(msg);
2334 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2335 if(status == GNI_RC_SUCCESS)
2336 *memh = GetMemHndl(msg);
/* block already registered by an earlier message: reuse its handle */
2338 else {
2339 *memh = GetMemHndl(msg);
2342 else {
2343 //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2344 status = registerMemory(msg, size, memh, NULL);
2346 return status;
2349 // for BIG_MSG called on receiver side for receiving control message
2350 // LMSG_INIT_TAG
/* getLargeMsgRequest: receiver side of the large-message protocol.  Builds a
 * GNI GET post descriptor from the incoming CONTROL_MSG.  Only OOB requests
 * (tag == LMSG_OOB_INIT_TAG) are posted immediately on the high-priority CQ;
 * every other request -- and any OOB request that fails with
 * RESOURCE/NOMEM -- is buffered on bufferRdmaQueue and retried later by
 * SendRdmaMsg().  NOTE(review): when CMK_USE_OOB is disabled all requests
 * take the buffered path, so the progress loop must pump bufferRdmaQueue
 * regularly or large/persistent transfers stall -- confirm. */
2351 static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2353 #if USE_LRTS_MEMPOOL
2354 CONTROL_MSG *request_msg;
2355 gni_return_t status = GNI_RC_SUCCESS;
2356 void *msg_data;
2357 gni_post_descriptor_t *pd;
2358 gni_mem_handle_t msg_mem_hndl;
2359 int size, transaction_size, offset = 0;
2360 size_t register_size = 0;
2362 // initial a get to transfer data from the sender side */
2363 request_msg = (CONTROL_MSG *) header;
2364 size = request_msg->total_length;
2365 MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size);
2366 MallocPostDesc(pd);
2367 #if CMK_WITH_STATS
2368 pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2369 #endif
/* seq_id < 2 means the first (or only) piece: allocate the full destination
 * buffer; later BIG_MSG pieces land at their ONE_SEG offset in dest_addr */
2370 if(request_msg->seq_id < 2) {
2371 #if CMK_SMP_TRACE_COMMTHREAD
2372 pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2373 #endif
2374 msg_data = CmiAlloc(size);
2375 CmiSetMsgSeq(msg_data, 0);
2376 _MEMCHECK(msg_data);
2378 else {
2379 offset = ONE_SEG*(request_msg->seq_id-1);
2380 msg_data = (char*)request_msg->dest_addr + offset;
2383 pd->cqwrite_value = request_msg->seq_id;
2385 transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2386 SetMemHndlZero(pd->local_mem_hndl);
2387 status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2388 if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2389 if(NoMsgInRecv( (void*)(msg_data)))
2390 register_size = GetMempoolsize((void*)(msg_data));
2393 pd->first_operand = ALIGN64(size); // total length
2395 if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2396 pd->type = GNI_POST_FMA_GET;
2397 else
2398 pd->type = GNI_POST_RDMA_GET;
2399 pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT;
2400 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2401 pd->length = transaction_size;
2402 pd->local_addr = (uint64_t) msg_data;
2403 pd->remote_addr = request_msg->source_addr + offset;
2404 pd->remote_mem_hndl = request_msg->source_mem_hndl;
2406 if (tag == LMSG_OOB_INIT_TAG)
2407 pd->src_cq_hndl = highprior_rdma_tx_cqh;
2408 else
2410 #if MULTI_THREAD_SEND
2411 pd->src_cq_hndl = rdma_tx_cqh;
2412 #else
2413 pd->src_cq_hndl = 0;
2414 #endif
2417 pd->rdma_mode = 0;
2418 pd->amo_cmd = 0;
2419 #if CMI_EXERT_RECV_RDMA_CAP
/* treat exceeding the RDMA cap like a resource failure so the request
 * falls through to the buffered path below */
2420 if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE;
2421 #endif
2422 //memory registration success
/* only OOB requests are posted right away; everything else is queued below */
2423 if(status == GNI_RC_SUCCESS && tag == LMSG_OOB_INIT_TAG )
2425 CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2426 CMI_GNI_LOCK(lock)
2427 #if REMOTE_EVENT
2428 if( request_msg->seq_id == 0)
2430 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2431 int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2432 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2434 #endif
2436 #if CMK_WITH_STATS
2437 RDMA_TRY_SEND(pd->type)
2438 #endif
2439 if(pd->type == GNI_POST_RDMA_GET)
2441 status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2443 else
2445 status = GNI_PostFma(ep_hndl_array[inst_id], pd);
2447 CMI_GNI_UNLOCK(lock)
2449 if(status == GNI_RC_SUCCESS )
2451 #if CMI_EXERT_RECV_RDMA_CAP
2452 RDMA_pending++;
2453 #endif
2454 if(pd->cqwrite_value == 0)
2456 #if MACHINE_DEBUG_LOG
2457 buffered_recv_msg += register_size;
2458 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size);
2459 #endif
2460 IncreaseMsgInRecv(msg_data);
2461 #if CMK_SMP_TRACE_COMMTHREAD
2462 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2463 #endif
2465 #if CMK_WITH_STATS
2466 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2467 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2468 #endif
2470 }else if (status != GNI_RC_SUCCESS)
2472 SetMemHndlZero((pd->local_mem_hndl));
/* buffered path: RESOURCE/NOMEM failures and every non-OOB request */
2474 if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM || tag != LMSG_OOB_INIT_TAG)
2476 #if REMOTE_EVENT
2477 bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index);
2478 #else
2479 bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1);
2480 #endif
2481 }else if (status != GNI_RC_SUCCESS) {
2482 // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2483 GNI_RC_CHECK("GetLargeAFter posting", status);
2485 #else
/* non-mempool build: register the freshly allocated receive buffer and post
 * the GET directly; RESOURCE/NOMEM failures are buffered on sendRdmaBuf */
2486 CONTROL_MSG *request_msg;
2487 gni_return_t status;
2488 void *msg_data;
2489 gni_post_descriptor_t *pd;
2490 RDMA_REQUEST *rdma_request_msg;
2491 gni_mem_handle_t msg_mem_hndl;
2492 //int source;
2493 // initial a get to transfer data from the sender side */
2494 request_msg = (CONTROL_MSG *) header;
2495 msg_data = CmiAlloc(request_msg->length);
2496 _MEMCHECK(msg_data);
2498 MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL, status)
2500 if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
2502 GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2505 MallocPostDesc(pd);
2506 if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD)
2507 pd->type = GNI_POST_FMA_GET;
2508 else
2509 pd->type = GNI_POST_RDMA_GET;
2510 pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT;// | GNI_CQMODE_REMOTE_EVENT;
2511 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2512 pd->length = ALIGN64(request_msg->length);
2513 pd->local_addr = (uint64_t) msg_data;
2514 pd->remote_addr = request_msg->source_addr;
2515 pd->remote_mem_hndl = request_msg->source_mem_hndl;
2516 if (tag == LMSG_OOB_INIT_TAG)
2517 pd->src_cq_hndl = highprior_rdma_tx_cqh;
2518 else
2520 #if MULTI_THREAD_SEND
2521 pd->src_cq_hndl = rdma_tx_cqh;
2522 #else
2523 pd->src_cq_hndl = 0;
2524 #endif
2526 pd->rdma_mode = 0;
2527 pd->amo_cmd = 0;
2529 //memory registration successful
2530 if(status == GNI_RC_SUCCESS)
2532 pd->local_mem_hndl = msg_mem_hndl;
2534 if(pd->type == GNI_POST_RDMA_GET)
2536 CMI_GNI_LOCK(rdma_tx_cq_lock)
2537 status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2538 CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2540 else
2542 CMI_GNI_LOCK(default_tx_cq_lock)
2543 status = GNI_PostFma(ep_hndl_array[inst_id], pd);
2544 CMI_GNI_UNLOCK(default_tx_cq_lock)
2547 }else
2549 SetMemHndlZero(pd->local_mem_hndl);
2551 if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2553 MallocRdmaRequest(rdma_request_msg);
2554 rdma_request_msg->next = 0;
2555 rdma_request_msg->destNode = inst_id;
2556 rdma_request_msg->pd = pd;
2557 PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2558 }else {
2559 GNI_RC_CHECK("AFter posting", status);
2561 #endif
2564 #if CQWRITE
/* PumpCqWriteTransactions: drain CQ-write events from rdma_rx_cqh.  The
 * event payload (masked to its low bits) is a sender-side message address.
 * Persistent buffers (non-zero mem handle field) are handed to the runtime;
 * ordinary large-message acks release the sender's mempool reference. */
2565 static void PumpCqWriteTransactions()
2568 gni_cq_entry_t ev;
2569 gni_return_t status;
2570 void *msg;
2571 int msg_size;
2572 while(1) {
2573 //CMI_GNI_LOCK(my_cq_lock)
2574 status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2575 //CMI_GNI_UNLOCK(my_cq_lock)
2576 if(status != GNI_RC_SUCCESS) break;
2577 msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2578 #if CMK_PERSISTENT_COMM
2579 #if PRINT_SYH
2580 printf(" %d CQ write event %p\n", myrank, msg);
2581 #endif
2582 if (!IsMemHndlZero(MEMHFIELD(msg))) {
2583 #if PRINT_SYH
2584 printf(" %d Persistent CQ write event %p\n", myrank, msg);
2585 #endif
2586 CmiReference(msg);
2587 msg_size = CmiGetMsgSize(msg);
2588 CMI_CHECK_CHECKSUM(msg, msg_size);
2589 handleOneRecvedMsg(msg_size, msg);
2590 continue;
2592 #endif
2593 #if ! USE_LRTS_MEMPOOL
2594 // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2595 #else
2596 DecreaseMsgInSend(msg);
2597 #endif
2598 if(NoMsgInSend(msg))
2599 buffered_send_msg -= GetMempoolsize(msg);
2600 CmiFree(msg);
2602 if(status == GNI_RC_ERROR_RESOURCE)
2604 GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2607 #endif
2609 #if REMOTE_EVENT
/* PumpRemoteTransactions: drain remote events from an RDMA receive CQ.  The
 * event's inst_id encodes (index, type): type 0 is an ACK for a completed
 * mempool large-message GET (sender side frees the message); type 1 is a
 * persistent-channel arrival (receiver hands the slot's buffer to the
 * runtime).  Anything else aborts. */
2610 static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh)
2612 gni_cq_entry_t ev;
2613 gni_return_t status;
2614 void *msg;
2615 int inst_id, index, type, size;
2617 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2618 int pump_count = 0;
2619 #endif
2620 while(1) {
2621 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2622 if (pump_count > PumpRemoteTransactions_cap) break;
2623 #endif
2624 CMI_GNI_LOCK(global_gni_lock)
2625 // CMI_GNI_LOCK(rdma_tx_cq_lock)
2626 status = GNI_CqGetEvent(rx_cqh, &ev);
2627 // CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2628 CMI_GNI_UNLOCK(global_gni_lock)
2630 if(status != GNI_RC_SUCCESS) break;
2632 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2633 pump_count ++;
2634 #endif
2636 inst_id = GNI_CQ_GET_INST_ID(ev);
2637 index = GET_INDEX(inst_id);
2638 type = GET_TYPE(inst_id);
2639 switch (type) {
2640 case 0: // ACK
2641 CmiAssert(index>=0 && index<ackPool.size);
2642 CMI_GNI_LOCK(ackPool.lock);
2643 CmiAssert(GetIndexType(ackPool, index) == 1);
2644 msg = GetIndexAddress(ackPool, index);
2645 CMI_GNI_UNLOCK(ackPool.lock);
2646 #if PRINT_SYH
2647 printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
2648 #endif
2649 #if ! USE_LRTS_MEMPOOL
2650 // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2651 #else
2652 DecreaseMsgInSend(msg);
2653 #endif
2654 if(NoMsgInSend(msg))
2655 buffered_send_msg -= GetMempoolsize(msg);
2656 CmiFree(msg);
2657 IndexPool_freeslot(&ackPool, index);
2658 #if CMI_EXERT_SEND_LARGE_CAP
2659 SEND_large_pending--;
2660 #endif
2661 break;
2662 #if CMK_PERSISTENT_COMM
2663 case 1: { // PERSISTENT
/* look up the persistent receive slot and deliver its buffer; the extra
 * CmiReference keeps the persistent buffer alive across handling */
2664 CmiLock(persistPool.lock);
2665 CmiAssert(GetIndexType(persistPool, index) == 2);
2666 PersistentReceivesTable *slot = GetIndexAddress(persistPool, index);
2667 CmiUnlock(persistPool.lock);
2668 START_EVENT();
2669 msg = slot->destBuf[0].destAddress;
2670 size = CmiGetMsgSize(msg);
2671 CmiReference(msg);
2672 CMI_CHECK_CHECKSUM(msg, size);
2673 TRACE_COMM_CREATION(EVENT_TIME(), msg);
2674 handleOneRecvedMsg(size, msg);
2675 break;
2677 #endif
2678 default:
2679 fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
2680 CmiAbort("PumpRemoteTransactions: unknown type");
2683 if(status == GNI_RC_ERROR_RESOURCE)
2685 GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2688 #endif
/* PumpLocalTransactions: drain local completion events from a transmit CQ.
 * For each finished post descriptor: PUTs complete persistent/CmiDirect
 * sends; GETs complete large or BIG_MSG receives.  Depending on the build,
 * the remote side is notified by SMSG (ACK/BIG_MSG/PUT_DONE), CQ write, or
 * a remote event set at post time.  GET completions on this side also hand
 * the received message to the runtime. */
2690 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2692 gni_cq_entry_t ev;
2693 gni_return_t status;
2694 uint64_t type, inst_id;
2695 gni_post_descriptor_t *tmp_pd;
2696 MSG_LIST *ptr;
2697 CONTROL_MSG *ack_msg_tmp;
2698 ACK_MSG *ack_msg;
2699 uint8_t msg_tag;
2700 #if CMK_DIRECT
2701 CMK_DIRECT_HEADER *cmk_direct_done_msg;
2702 #endif
2703 SMSG_QUEUE *queue = &smsg_queue;
2704 #if CMI_PUMPLOCALTRANSACTIONS_CAP
2705 int pump_count = 0;
2706 while(pump_count < PumpLocalTransactions_cap) {
2707 pump_count++;
2708 #else
2709 while(1) {
2710 #endif
2711 CMI_GNI_LOCK(my_cq_lock)
2712 status = GNI_CqGetEvent(my_tx_cqh, &ev);
2713 CMI_GNI_UNLOCK(my_cq_lock)
2714 if(status != GNI_RC_SUCCESS) break;
2716 type = GNI_CQ_GET_TYPE(ev);
2717 if (type == GNI_CQ_EVENT_TYPE_POST)
2720 #if CMI_EXERT_RECV_RDMA_CAP
2721 if(RDMA_pending <=0) CmiAbort(" pending error\n");
2722 RDMA_pending--;
2723 #endif
2724 inst_id = GNI_CQ_GET_INST_ID(ev);
2725 #if PRINT_SYH
2726 printf("[%d] LocalTransactions localdone=%d\n", myrank, lrts_local_done_msg);
2727 #endif
2728 CMI_GNI_LOCK(my_cq_lock)
2729 status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2730 CMI_GNI_UNLOCK(my_cq_lock)
2732 switch (tmp_pd->type) {
2733 #if CMK_PERSISTENT_COMM || CMK_DIRECT
/* note: RDMA_PUT deliberately falls through into FMA_PUT after the
 * optional deregister */
2734 case GNI_POST_RDMA_PUT:
2735 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2736 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2737 #endif
2738 case GNI_POST_FMA_PUT:
/* amo_cmd == 1 marks a CmiDirect put; otherwise it is a persistent put */
2739 if(tmp_pd->amo_cmd == 1) {
2740 #if CMK_DIRECT
2741 //sender ACK to receiver to trigger it is done
2742 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2743 cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2744 msg_tag = DIRECT_PUT_DONE_TAG;
2745 #endif
2747 else {
2748 CmiFree((void *)tmp_pd->local_addr);
2749 #if REMOTE_EVENT
2750 FreePostDesc(tmp_pd);
2751 continue;
2752 #elif CQWRITE
2753 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2754 FreePostDesc(tmp_pd);
2755 continue;
2756 #else
2757 MallocControlMsg(ack_msg_tmp);
2758 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2759 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
2760 ack_msg_tmp->length = tmp_pd->length;
2761 msg_tag = PUT_DONE_TAG;
2762 #endif
2764 break;
2765 #endif
2766 case GNI_POST_RDMA_GET:
2767 case GNI_POST_FMA_GET: {
2768 #if ! USE_LRTS_MEMPOOL
2769 MallocControlMsg(ack_msg_tmp);
2770 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2771 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
2772 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2773 msg_tag = ACK_TAG;
2774 #else
2775 #if CMK_WITH_STATS
2776 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2777 #endif
/* cqwrite_value carries the BIG_MSG segment number (0 for ordinary
 * mempool large messages) */
2778 int seq_id = tmp_pd->cqwrite_value;
2779 if(seq_id > 0) // BIG_MSG
2781 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2782 MallocControlMsg(ack_msg_tmp);
2783 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2784 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
2785 ack_msg_tmp->seq_id = seq_id;
2786 ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2787 ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2788 ack_msg_tmp->length = tmp_pd->length;
2789 ack_msg_tmp->total_length = tmp_pd->first_operand; // total size
2790 msg_tag = BIG_MSG_TAG;
2792 else
2794 msg_tag = ACK_TAG;
2795 #if !REMOTE_EVENT && !CQWRITE
2796 MallocAckMsg(ack_msg);
2797 ack_msg->source_addr = tmp_pd->remote_addr;
2798 #endif
2800 #endif
2801 break;
2803 case GNI_POST_CQWRITE:
2804 FreePostDesc(tmp_pd);
2805 continue;
2806 default:
2807 CmiPrintf("type=%d\n", tmp_pd->type);
2808 CmiAbort("PumpLocalTransactions: unknown type!");
2809 } /* end of switch */
/* notify the peer of completion (path depends on tag and build flags) */
2811 #if CMK_DIRECT
2812 if (tmp_pd->amo_cmd == 1) {
2813 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL);
2814 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg);
2816 else
2817 #endif
2818 if (msg_tag == ACK_TAG) {
2819 #if !REMOTE_EVENT
2820 #if !CQWRITE
2821 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL);
2822 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2823 #else
2824 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2825 #endif
2826 #endif
2828 else {
2829 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL);
2830 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
/* local delivery: a completed GET means this side now owns the data */
2832 #if CMK_PERSISTENT_COMM
2833 if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2834 #endif
2836 if( msg_tag == ACK_TAG){ //msg fit in mempool
2837 #if PRINT_SYH
2838 printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2839 #endif
2840 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr);
2841 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr);
2843 START_EVENT();
2844 CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2845 DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2846 #if MACHINE_DEBUG_LOG
2847 if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2848 buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2849 MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag);
2850 #endif
2851 TRACE_COMM_CREATION(EVENT_TIME(), (void*)tmp_pd->local_addr);
2852 CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2853 handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr);
2854 }else if(msg_tag == BIG_MSG_TAG){
/* BIG_MSG segment done: bump the sequence count and deliver the whole
 * message only once the final segment has arrived */
2855 void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2856 CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2857 if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2858 START_EVENT();
2859 #if PRINT_SYH
2860 printf("Pipeline msg done [%d]\n", myrank);
2861 #endif
2862 #if CMK_SMP_TRACE_COMMTHREAD
2863 if( tmp_pd->cqwrite_value == 1)
2864 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr);
2865 #endif
2866 TRACE_COMM_CREATION(EVENT_TIME(), msg);
2867 CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2868 handleOneRecvedMsg(tmp_pd->first_operand, msg);
2872 FreePostDesc(tmp_pd);
2874 } //end while
2875 if(status == GNI_RC_ERROR_RESOURCE)
2877 printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");
2878 GNI_RC_CHECK("Smsg_tx_cq full", status);
/* SendRdmaMsg: retry buffered RDMA transactions from `sendqueue`.  For each
 * queued RDMA_REQUEST: (re)register the local buffer, attach the proper
 * remote event (ACK_EVENT for ordinary mempool messages, PERSIST_EVENT when
 * cqwrite_value == PERSIST_SEQ for persistent channels), then post the
 * GET/PUT.  Registration failures push the request back onto `sendqueue`;
 * NOTE(review): a failed *post* is requeued onto sendRdmaBuf regardless of
 * which queue was being drained -- confirm this demotion from the
 * high-priority queue is intentional. */
2882 static void SendRdmaMsg( BufferList sendqueue)
2884 gni_return_t status = GNI_RC_SUCCESS;
2885 gni_mem_handle_t msg_mem_hndl;
2886 RDMA_REQUEST *ptr = 0, *tmp_ptr;
2887 RDMA_REQUEST *pre = 0;
2888 uint64_t register_size = 0;
2889 void *msg;
2890 int i;
/* snapshot the queue length so requeued entries are not reprocessed in
 * this same pass */
2892 int len = PCQueueLength(sendqueue);
2893 for (i=0; i<len; i++)
2895 #if CMI_EXERT_RECV_RDMA_CAP
2896 if( RDMA_pending >= RDMA_cap) break;
2897 #endif
2898 CMI_PCQUEUEPOP_LOCK( sendqueue)
2899 ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
2900 CMI_PCQUEUEPOP_UNLOCK( sendqueue)
2901 if (ptr == NULL) break;
2903 MACHSTATE4(8, "noempty-rdma %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size);
2904 gni_post_descriptor_t *pd = ptr->pd;
2906 msg = (void*)(pd->local_addr);
2907 status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2908 register_size = 0;
2909 if(pd->cqwrite_value == 0) {
2910 if(NoMsgInRecv(msg))
2911 register_size = GetMempoolsize(msg);
2914 if(status == GNI_RC_SUCCESS) //mem register good
2916 int destNode = ptr->destNode;
2917 CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2918 CMI_GNI_LOCK(lock);
2919 #if REMOTE_EVENT
2920 if( pd->cqwrite_value == 0) {
2921 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2922 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
2923 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2925 #if CMK_PERSISTENT_COMM
/* persistent transaction: deliver a PERSIST_EVENT so the receiver's
 * PumpRemoteTransactions hands the buffer over on arrival */
2926 else if (pd->cqwrite_value == PERSIST_SEQ) {
2927 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2928 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
2929 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2931 #endif
2932 #endif
2933 #if CMK_WITH_STATS
2934 RDMA_TRY_SEND(pd->type)
2935 #endif
2936 #if CMK_SMP_TRACE_COMMTHREAD
2937 if(IS_PUT(pd->type))
2939 START_EVENT();
2940 TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd->local_addr);//based on assumption, post always succeeds on first try
2942 #endif
2944 if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT)
2946 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
2948 else
2950 status = GNI_PostFma(ep_hndl_array[destNode], pd);
2952 CMI_GNI_UNLOCK(lock);
2954 if(status == GNI_RC_SUCCESS) //post good
2956 #if CMI_EXERT_RECV_RDMA_CAP
2957 RDMA_pending ++;
2958 #endif
2959 if(pd->cqwrite_value == 0)
2961 #if CMK_SMP_TRACE_COMMTHREAD
2962 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2963 #endif
2964 IncreaseMsgInRecv(((void*)(pd->local_addr)));
2966 #if CMK_WITH_STATS
2967 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2968 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2969 #endif
2970 #if MACHINE_DEBUG_LOG
2971 buffered_recv_msg += register_size;
2972 MACHSTATE(8, "GO request from buffered\n");
2973 #endif
2974 #if PRINT_SYH
2975 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank, pd->cqwrite_value);
2976 #endif
2977 }else // cannot post
2979 PCQueuePush(sendRdmaBuf, (char*)ptr);
2980 #if PRINT_SYH
2981 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
2982 #endif
2983 break;
2985 } else //memory registration fails
2987 PCQueuePush(sendqueue, (char*)ptr);
2989 } //end while
/*
 * Attempt to (re)send one buffered SMSG message 'ptr' from 'queue'.
 * With dynamic SMSG a mailbox connection to the destination is established
 * first if needed.  Otherwise the message is dispatched on its tag; on
 * successful transmission the payload is freed according to its type.
 * Returns GNI_RC_SUCCESS when the message went out, otherwise an error
 * status (initially GNI_RC_ERROR_RESOURCE) and the caller keeps it buffered.
 */
static
inline gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
{
    CONTROL_MSG *control_msg_tmp;
    gni_return_t status = GNI_RC_ERROR_RESOURCE;

    MACHSTATE5(8, "noempty-smsg %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag);
    if (useDynamicSMSG && smsg_connected_flag[ptr->destNode] != 2) {
        /* connection not exists yet */
#if CMK_SMP
        /* non-smp case, connect is issued in send_smsg_message */
        if (smsg_connected_flag[ptr->destNode] == 0)
            connect_to(ptr->destNode);
#endif
    }
    else
        switch(ptr->tag)
        {
        case SMALL_DATA_TAG:
            status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);
            if(status == GNI_RC_SUCCESS)
            {
                CmiFree(ptr->msg);
            }
            break;
        case LMSG_INIT_TAG:
        case LMSG_OOB_INIT_TAG:
            /* rendezvous init for a large message (normal or out-of-band) */
            control_msg_tmp = (CONTROL_MSG*)ptr->msg;
            status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1, ptr, ptr->tag);
            break;
#if !REMOTE_EVENT && !CQWRITE
        case ACK_TAG:
            /* explicit ACK message; only used when neither remote events
             * nor CQ writes deliver completion notification */
            status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);
            if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
            break;
#endif
        case BIG_MSG_TAG:
            status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);
            if(status == GNI_RC_SUCCESS)
            {
                FreeControlMsg((CONTROL_MSG*)ptr->msg);
            }
            break;
#if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
        case PUT_DONE_TAG:
            /* persistent-message PUT completion notification */
            status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);
            if(status == GNI_RC_SUCCESS)
            {
                FreeControlMsg((CONTROL_MSG*)ptr->msg);
            }
            break;
#endif
#if CMK_DIRECT
        case DIRECT_PUT_DONE_TAG:
            status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);
            if(status == GNI_RC_SUCCESS)
            {
                free((CMK_DIRECT_HEADER*)ptr->msg);
            }
            break;
#endif
        default:
            printf("Weird tag\n");
            CmiAbort("should not happen\n");
        } // end switch
    return status;
}
3060 // return 1 if all messages are sent
3062 #if ONE_SEND_QUEUE
/*
 * ONE_SEND_QUEUE flavor: all destinations share a single buffered-send
 * queue.  Pops each entry once per call; a destination that reported
 * GNI_RC_ERROR_RESOURCE is marked unavailable (destpe_avail) so further
 * messages for it are requeued without another send attempt this pass.
 * Returns 1 if every buffered message was sent, 0 otherwise.
 * (prio_queue is unused in this flavor.)
 */
static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
{
    MSG_LIST *ptr, *tmp_ptr, *pre=0, *current_head;
    CONTROL_MSG *control_msg_tmp;
    gni_return_t status;
    int done = 1;
    uint64_t register_size;
    void *register_addr;
    int index_previous = -1;
#if CMI_SENDBUFFERSMSG_CAP
    int sent_length = 0;
#endif
    int index = 0;
    memset(destpe_avail, 0, mysize * sizeof(char));
    for (index=0; index<1; index++)
    {
        int i, len = PCQueueLength(queue->sendMsgBuf);
        for (i=0; i<len; i++)
        {
            CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
            ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
            CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
            if(ptr == NULL) break;
            if (destpe_avail[ptr->destNode] == 1) { /* can't send to this pe */
                PCQueuePush(queue->sendMsgBuf, (char*)ptr);
                continue;
            }
            status = _sendOneBufferedSmsg(queue, ptr);
#if CMI_SENDBUFFERSMSG_CAP
            sent_length++;
#endif
            if(status == GNI_RC_SUCCESS)
            {
#if PRINT_SYH
                buffered_smsg_counter--;
                printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
#endif
                FreeMsgList(ptr);
            }else {
                /* could not send: requeue and remember the stalled destination */
                PCQueuePush(queue->sendMsgBuf, (char*)ptr);
                done = 0;
                if(status == GNI_RC_ERROR_RESOURCE)
                {
                    destpe_avail[ptr->destNode] = 1;
                }
            }
        } //end while
    } // end pooling for all cores
    return done;
}
3115 #else /* ! ONE_SEND_QUEUE */
/*
 * Per-destination flavor: each destination rank has its own buffered-send
 * queue.  Round-robins over the destinations (only the non-empty ones when
 * SMP_LOCKS) and tries to drain each queue; a destination whose queue also
 * has pending traffic in 'prio_queue' (the OOB queue, or NULL when OOB is
 * disabled) is skipped so urgent messages go first.
 * Returns 1 if all buffered messages were sent, 0 otherwise.
 */
static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
{
    MSG_LIST *ptr;
    gni_return_t status;
    int done = 1;
#if CMI_SENDBUFFERSMSG_CAP
    int sent_length = 0;
#endif
    int idx;
#if SMP_LOCKS
    int index = -1;
    int nonempty = PCQueueLength(queue->nonEmptyQueues);
    for(idx =0; idx<nonempty; idx++)
    {
        index++; if (index >= nonempty) index = 0;
#if CMI_SENDBUFFERSMSG_CAP
        /* cap reached: give up the rest of this pass */
        if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
#endif
        CMI_PCQUEUEPOP_LOCK(queue->nonEmptyQueues)
        MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(queue->nonEmptyQueues);
        CMI_PCQUEUEPOP_UNLOCK(queue->nonEmptyQueues)
        if(current_list == NULL) break;
        /* urgent traffic pending for this destination: defer normal sends */
        if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[current_list->destpe].sendSmsgBuf) != 0) {
            PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
            continue;
        }
        PCQueue current_queue= current_list->sendSmsgBuf;
        CmiLock(current_list->lock);
        int i, len = PCQueueLength(current_queue);
        current_list->pushed = 0;
        CmiUnlock(current_list->lock);
#else /* ! SMP_LOCKS */
    static int index = -1;
    for(idx =0; idx<mysize; idx++)
    {
        index++; if (index == mysize) index = 0;
#if CMI_SENDBUFFERSMSG_CAP
        if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
#endif
        if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[index].sendSmsgBuf) != 0) continue; // check urgent queue
        //if (index == myrank) continue;
        PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
        int i, len = PCQueueLength(current_queue);
#endif
        for (i=0; i<len; i++) {
            CMI_PCQUEUEPOP_LOCK(current_queue)
            ptr = (MSG_LIST*)PCQueuePop(current_queue);
            CMI_PCQUEUEPOP_UNLOCK(current_queue)
            if (ptr == 0) break;

            status = _sendOneBufferedSmsg(queue, ptr);
#if CMI_SENDBUFFERSMSG_CAP
            sent_length++;
#endif
            if(status == GNI_RC_SUCCESS)
            {
#if PRINT_SYH
                buffered_smsg_counter--;
                printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
#endif
                FreeMsgList(ptr);
            }else {
                /* requeue; on resource exhaustion move on to next destination */
                PCQueuePush(current_queue, (char*)ptr);
                done = 0;
                if(status == GNI_RC_ERROR_RESOURCE)
                {
                    break;
                }
            }
        } //end for i
#if SMP_LOCKS
        /* re-advertise the per-destination queue if it still has entries */
        CmiLock(current_list->lock);
        if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
        {
            current_list->pushed = 1;
            PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
        }
        CmiUnlock(current_list->lock);
#endif
    } // end pooling for all cores
    return done;
}
3200 #endif
3202 static void ProcessDeadlock();
/*
 * Main communication progress engine, called repeatedly by the comm/worker
 * threads.  Runs the progress phases in a fixed order — dynamic-SMSG
 * connection setup, SMSG receive pump, buffered SMSG sends, local
 * transaction pump, remote event pump, buffered RDMA sends — and interleaves
 * the high-priority pumps/sends (OOB SMSG, high-priority RDMA) between every
 * phase so urgent and persistent traffic is serviced promptly.
 */
void LrtsAdvanceCommunication(int whileidle)
{
    static int count = 0; /* NOTE(review): appears unused in this function */
    /* Receive Msg first */
#if CMK_SMP_TRACE_COMMTHREAD
    double startT, endT;
#endif
    if (useDynamicSMSG && whileidle)
    {
#if CMK_SMP_TRACE_COMMTHREAD
        startT = CmiWallTimer();
#endif
        STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
#if CMK_SMP_TRACE_COMMTHREAD
        endT = CmiWallTimer();
        if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
#endif
    }

    SEND_OOB_SMSG(smsg_oob_queue)
    PUMP_REMOTE_HIGHPRIORITY
    PUMP_LOCAL_HIGHPRIORITY
    POST_HIGHPRIORITY_RDMA
    // Receiving small messages and persistent
#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
    STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
#endif

    SEND_OOB_SMSG(smsg_oob_queue)
    PUMP_REMOTE_HIGHPRIORITY
    PUMP_LOCAL_HIGHPRIORITY
    POST_HIGHPRIORITY_RDMA

    ///* Send buffered Message */
#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
#if CMK_USE_OOB
    /* pass the OOB queue so buffered normal sends yield to pending OOB traffic */
    STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
#else
    STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
#endif
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
#endif

    SEND_OOB_SMSG(smsg_oob_queue)
    PUMP_REMOTE_HIGHPRIORITY
    PUMP_LOCAL_HIGHPRIORITY
    POST_HIGHPRIORITY_RDMA

    //Pump Get messages or PUT messages
#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
    PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
#if MULTI_THREAD_SEND
    STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock));
#endif
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
#endif

    SEND_OOB_SMSG(smsg_oob_queue)
    PUMP_REMOTE_HIGHPRIORITY
    PUMP_LOCAL_HIGHPRIORITY
    POST_HIGHPRIORITY_RDMA
    //Pump Remote event
#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
#if CQWRITE
    PumpCqWriteTransactions();
#endif
#if REMOTE_EVENT
    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
#endif
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
#endif

    SEND_OOB_SMSG(smsg_oob_queue)
    PUMP_REMOTE_HIGHPRIORITY
    PUMP_LOCAL_HIGHPRIORITY
    POST_HIGHPRIORITY_RDMA

#if CMK_SMP_TRACE_COMMTHREAD
    startT = CmiWallTimer();
#endif
    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
#endif

#if CMK_SMP && ! LARGEPAGE
    if (_detected_hang) ProcessDeadlock();
#endif
}
3311 static void set_smsg_max()
3313 char *env;
3315 if(mysize <=512)
3317 SMSG_MAX_MSG = 1024;
3318 }else if (mysize <= 4096)
3320 SMSG_MAX_MSG = 1024;
3321 }else if (mysize <= 16384)
3323 SMSG_MAX_MSG = 512;
3324 }else {
3325 SMSG_MAX_MSG = 256;
3328 env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3329 if (env) SMSG_MAX_MSG = atoi(env);
3330 CmiAssert(SMSG_MAX_MSG > 0);
/* useDynamicSMSG */
/*
 * Initialize the on-demand SMSG machinery: per-rank connection state and
 * attribute vectors, one registered mailbox pool sized for the expected
 * number of connections, and an unbound endpoint with a posted datagram so
 * peers can connect lazily at runtime.
 */
static void _init_dynamic_smsg()
{
    gni_return_t status;
    uint32_t vmdh_index = -1;
    int i;

    smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
    smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
    smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
    for(i=0; i<mysize; i++) {
        smsg_connected_flag[i] = 0;     /* 0 = not connected, 2 = established */
        smsg_attr_vector_local[i] = NULL;
        smsg_attr_vector_remote[i] = NULL;
    }

    set_smsg_max();

    send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
    send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
    send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
    status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
    GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);

    mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
    mailbox_list->size = smsg_memlen*avg_smsg_connection;
    /* NOTE(review): posix_memalign's return value is unchecked here, unlike
     * in _init_static_smsg — confirm failure handling is intentional. */
    posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
    bzero(mailbox_list->mailbox_base, mailbox_list->size);
    mailbox_list->offset = 0;
    mailbox_list->next = 0;

    status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
        mailbox_list->size, smsg_rx_cqh,
        GNI_MEM_READWRITE,
        vmdh_index,
        &(mailbox_list->mem_hndl));
    GNI_RC_CHECK("MEMORY registration for smsg", status);

    status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
    GNI_RC_CHECK("Unbound EP", status);

    alloc_smsg_attr(&send_smsg_attr);

    /* post a datagram on the unbound EP so any rank may connect to us */
    status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr, SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
    GNI_RC_CHECK("post unbound datagram", status);

    /* always pre-connect to proc 0 */
    //if (myrank != 0) connect_to(0);

    status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
    GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
}
/*
 * Initialize fully-connected (static) SMSG: every rank allocates and
 * registers one mailbox per peer up front, exchanges mailbox addresses and
 * memory handles via allgather, then initializes an SMSG channel to every
 * other rank.  Memory use grows linearly with job size.
 */
static void _init_static_smsg()
{
    gni_smsg_attr_t *smsg_attr;
    gni_smsg_attr_t remote_smsg_attr;
    gni_smsg_attr_t *smsg_attr_vec;
    gni_mem_handle_t my_smsg_mdh_mailbox;
    int ret, i;
    gni_return_t status;
    uint32_t vmdh_index = -1;
    mdh_addr_t base_infor;
    mdh_addr_t *base_addr_vec;

    set_smsg_max();

    smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));

    smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
    smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
    smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
    status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
    GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
    ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
    CmiAssert(ret == 0);
    bzero(smsg_mailbox_base, smsg_memlen*(mysize));

    /* one registration covers every peer's mailbox slot */
    status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
        smsg_memlen*(mysize), smsg_rx_cqh,
        GNI_MEM_READWRITE,
        vmdh_index,
        &my_smsg_mdh_mailbox);
    register_memory_size += smsg_memlen*(mysize);
    GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);

    if (myrank == 0) printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
    if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX to a larger value or use Dynamic smsg\n");

    base_infor.addr = (uint64_t)smsg_mailbox_base;
    base_infor.mdh = my_smsg_mdh_mailbox;
    base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));

    /* exchange mailbox base addresses/handles with all peers */
    allgather(&base_infor, base_addr_vec, sizeof(mdh_addr_t));

    for(i=0; i<mysize; i++)
    {
        if(i==myrank)
            continue;
        smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
        smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
        smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
        smsg_attr[i].mbox_offset = i*smsg_memlen;
        smsg_attr[i].buff_size = smsg_memlen;
        smsg_attr[i].msg_buffer = smsg_mailbox_base ;
        smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
    }

    for(i=0; i<mysize; i++)
    {
        if (myrank == i) continue;

        /* the peer's slot for us lives at offset myrank in its mailbox array */
        remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
        remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
        remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
        remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
        remote_smsg_attr.buff_size = smsg_memlen;
        remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
        remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;

        /* initialize the smsg channel */
        status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
        GNI_RC_CHECK("SMSG Init", status);
    } //end initialization

    free(base_addr_vec);
    free(smsg_attr);

    status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
    GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
}
/*
 * Create the buffered-send queue(s) for one SMSG_QUEUE: a single shared
 * queue under ONE_SEND_QUEUE, otherwise one PCQueue per destination rank
 * (plus the non-empty-queue index and per-queue locks under SMP_LOCKS).
 */
inline
static void _init_send_queue(SMSG_QUEUE *queue)
{
    int i;
#if ONE_SEND_QUEUE
    queue->sendMsgBuf = PCQueueCreate();
    destpe_avail = (char*)malloc(mysize * sizeof(char));
#else
    queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
#if SMP_LOCKS
    queue->nonEmptyQueues = PCQueueCreate();
#endif
    for(i =0; i<mysize; i++)
    {
        queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
#if SMP_LOCKS
        queue->smsg_msglist_index[i].pushed = 0;
        queue->smsg_msglist_index[i].lock = CmiCreateLock();
        queue->smsg_msglist_index[i].destpe = i;
#endif
    }
#endif
}
3489 inline
3490 static void _init_smsg()
3492 if(mysize > 1) {
3493 if (useDynamicSMSG)
3494 _init_dynamic_smsg();
3495 else
3496 _init_static_smsg();
3499 _init_send_queue(&smsg_queue);
3500 #if CMK_USE_OOB
3501 _init_send_queue(&smsg_oob_queue);
3502 #endif
3505 static void _init_static_msgq()
3507 gni_return_t status;
3508 /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The performance scales by the node count rather than rank count */
3509 msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3510 msgq_attrs.smsg_q_sz = 1;
3511 msgq_attrs.rcv_pool_sz = 1;
3512 msgq_attrs.num_msgq_eps = 2;
3513 msgq_attrs.nloc_insts = 8;
3514 msgq_attrs.modes = 0;
3515 msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3517 status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3518 GNI_RC_CHECK("MSGQ Init", status);
/* Cumulative bytes handed out by the mempool block allocators (diagnostics). */
static CmiUInt8 total_mempool_size = 0;
/* Number of mempool block allocations performed (diagnostics). */
static CmiUInt8 total_mempool_calls = 0;
3527 #if USE_LRTS_MEMPOOL
#if CMK_PERSISTENT_COMM
/*
 * Allocate one block for the PERSISTENT-message memory pool.  The requested
 * *size is rounded up to at least the configured block size (and to a huge
 * page boundary on LARGEPAGE builds); the actual size is returned through
 * *size.  On LARGEPAGE builds the block is registered immediately against
 * highpriority_rx_cqh so persistent remote events land on the high-priority
 * RX CQ; otherwise the handle is zeroed and registration happens lazily.
 * Aborts on allocation failure or when the block exceeds the mempool limits.
 */
void *alloc_persistent_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
{
    void *pool;
    int ret;
    gni_return_t status = GNI_RC_SUCCESS;

    size_t default_size = expand_flag? _expand_mem : _mempool_size;
    if (*size < default_size) *size = default_size;
#if LARGEPAGE
    // round up to be multiple of _tlbpagesize
    //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
    *size = ALIGNHUGEPAGE(*size);
#endif
    total_mempool_size += *size;
    total_mempool_calls += 1;
#if !LARGEPAGE
    if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag)
    {
        printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
        CmiAbort("alloc_mempool_block");
    }
#endif
#if LARGEPAGE
    pool = my_get_huge_pages(*size);
    ret = pool==NULL;
#else
    ret = posix_memalign(&pool, ALIGNBUF, *size);
#endif
    if (ret != 0) {
        printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
        if (ret == ENOMEM)
            CmiAbort("alloc_mempool_block: out of memory.");
        else
            CmiAbort("alloc_mempool_block: posix_memalign failed");
    }
#if LARGEPAGE
    CmiMemLock();
    register_count++;
    /* register on the high-priority RX CQ used by persistent messages */
    MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, highpriority_rx_cqh, status);
    CmiMemUnlock();
    if(status != GNI_RC_SUCCESS) {
        printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
        sweep_mempool(CpvAccess(mempool));
    }
    GNI_RC_CHECK("MEMORY_REGISTER", status);
#else
    SetMemHndlZero((*mem_hndl));
#endif
    return pool;
}
#endif
/*
 * Allocate one block for the regular memory pool.  Same policy as the
 * persistent variant: *size is rounded up to at least the configured block
 * size (and huge-page aligned on LARGEPAGE builds) and returned through
 * *size.  On LARGEPAGE builds the block is registered immediately against
 * rdma_rx_cqh; otherwise the handle is zeroed and registration is deferred.
 * Aborts on allocation failure or when the block exceeds the mempool limits.
 */
void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
{
    void *pool;
    int ret;
    gni_return_t status = GNI_RC_SUCCESS;

    size_t default_size = expand_flag? _expand_mem : _mempool_size;
    if (*size < default_size) *size = default_size;
#if LARGEPAGE
    // round up to be multiple of _tlbpagesize
    //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
    *size = ALIGNHUGEPAGE(*size);
#endif
    total_mempool_size += *size;
    total_mempool_calls += 1;
#if !LARGEPAGE
    if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag)
    {
        printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
        CmiAbort("alloc_mempool_block");
    }
#endif
#if LARGEPAGE
    pool = my_get_huge_pages(*size);
    ret = pool==NULL;
#else
    ret = posix_memalign(&pool, ALIGNBUF, *size);
#endif
    if (ret != 0) {
        printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
        if (ret == ENOMEM)
            CmiAbort("alloc_mempool_block: out of memory.");
        else
            CmiAbort("alloc_mempool_block: posix_memalign failed");
    }
#if LARGEPAGE
    CmiMemLock();
    register_count++;
    MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
    CmiMemUnlock();
    if(status != GNI_RC_SUCCESS) {
        printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
        sweep_mempool(CpvAccess(mempool));
    }
    GNI_RC_CHECK("MEMORY_REGISTER", status);
#else
    SetMemHndlZero((*mem_hndl));
#endif
    return pool;
}
// ptr is a block head pointer
/*
 * Release one mempool block: deregister the memory if it was registered
 * (a zero handle means registration was deferred and never happened),
 * then return the pages to the huge-page pool or the heap.
 */
void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
{
    if(!(IsMemHndlZero(mem_hndl)))
    {
        MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
    }
#if LARGEPAGE
    my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
#else
    free(ptr);
#endif
}
3646 #endif
/*
 * Per-rank initialization that runs before Converse common init: create the
 * rank-local memory pool(s) used for registered send/recv buffers.
 */
void LrtsPreCommonInit(int everReturn){
#if USE_LRTS_MEMPOOL
    CpvInitialize(mempool_type*, mempool);
    CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
#if CMK_PERSISTENT_COMM
    /* separate pool for persistent buffers (registered on the
     * high-priority RX CQ by alloc_persistent_mempool_block) */
    CpvInitialize(mempool_type*, persistent_mempool);
    CpvAccess(persistent_mempool) = mempool_init(_mempool_size, alloc_persistent_mempool_block, free_mempool_block, _mempool_size_limit);
#endif
    MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ;
#endif
}
3660 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3662 register int i;
3663 int rc;
3664 int device_id = 0;
3665 unsigned int remote_addr;
3666 gni_cdm_handle_t cdm_hndl;
3667 gni_return_t status = GNI_RC_SUCCESS;
3668 uint32_t vmdh_index = -1;
3669 uint8_t ptag;
3670 unsigned int local_addr, *MPID_UGNI_AllAddr;
3671 int first_spawned;
3672 int physicalID;
3673 char *env;
3675 //void (*local_event_handler)(gni_cq_entry_t *, void *) = &LocalEventHandle;
3676 //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3677 //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *) = &RemoteBteEventHandle;
3679 status = PMI_Init(&first_spawned);
3680 GNI_RC_CHECK("PMI_Init", status);
3682 status = PMI_Get_size(&mysize);
3683 GNI_RC_CHECK("PMI_Getsize", status);
3685 status = PMI_Get_rank(&myrank);
3686 GNI_RC_CHECK("PMI_getrank", status);
3688 //physicalID = CmiPhysicalNodeID(myrank);
3690 //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3692 *myNodeID = myrank;
3693 *numNodes = mysize;
3695 #if MULTI_THREAD_SEND
3696 /* Currently, we only consider the case that comm. thread will only recv msgs */
3697 Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3698 #endif
3700 #if CMI_EXERT_SEND_LARGE_CAP
3701 CmiGetArgInt(*argv,"+useSendLargeCap", &SEND_large_cap);
3702 #endif
3704 #if CMI_SENDBUFFERSMSG_CAP
3705 CmiGetArgInt(*argv,"+useSendBufferCap", &SendBufferMsg_cap);
3706 #endif
3708 #if CMI_PUMPNETWORKSMSG_CAP
3709 CmiGetArgInt(*argv,"+usePumpSmsgCap", &PumpNetworkSmsg_cap);
3710 #endif
3712 CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3714 env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3715 if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3716 CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3718 env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3719 if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3720 CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3722 env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3723 if (env) useDynamicSMSG = 1;
3724 if (!useDynamicSMSG)
3725 useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3726 CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3727 if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3728 //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3730 if(myrank == 0)
3732 printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3733 printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3735 #ifdef USE_ONESIDED
3736 onesided_init(NULL, &onesided_hnd);
3738 // this is a GNI test, so use the libonesided bypass functionality
3739 onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3740 local_addr = gniGetNicAddress();
3741 #else
3742 ptag = get_ptag();
3743 cookie = get_cookie();
3744 #if 0
3745 modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3746 #endif
3747 //Create and attach to the communication domain */
3748 status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3749 GNI_RC_CHECK("GNI_CdmCreate", status);
3750 //* device id The device id is the minor number for the device
3751 //that is assigned to the device by the system when the device is created.
3752 //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3753 //where X is the device number 0 default
3754 status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3755 GNI_RC_CHECK("GNI_CdmAttach", status);
3756 local_addr = get_gni_nic_address(0);
3757 #endif
3758 MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3759 _MEMCHECK(MPID_UGNI_AllAddr);
3760 allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3761 /* create the local completion queue */
3762 /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3763 status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3764 GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3765 #if MULTI_THREAD_SEND
3766 status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3767 GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3768 #endif
3770 #if CMK_USE_OOB
3771 status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &highprior_rdma_tx_cqh);
3772 GNI_RC_CHECK("GNI_CqCreate high priority RDMA (tx)", status);
3773 #endif
3774 /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3776 status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3777 GNI_RC_CHECK("Create CQ (rx)", status);
3779 status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3780 GNI_RC_CHECK("Create Post CQ (rx)", status);
3782 #if CMK_PERSISTENT_COMM
3783 status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &highpriority_rx_cqh);
3784 GNI_RC_CHECK("Create Post CQ (rx)", status);
3785 #endif
3786 //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3787 //GNI_RC_CHECK("Create BTE CQ", status);
3789 /* create the endpoints. they need to be bound to allow later CQWrites to them */
3790 ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3791 _MEMCHECK(ep_hndl_array);
3792 #if MULTI_THREAD_SEND
3793 rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3794 //default_tx_cq_lock = CmiCreateLock();
3795 rdma_tx_cq_lock = CmiCreateLock();
3796 smsg_rx_cq_lock = CmiCreateLock();
3797 //global_gni_lock = CmiCreateLock();
3798 //rx_cq_lock = CmiCreateLock();
3799 #endif
3800 for (i=0; i<mysize; i++) {
3801 if(i == myrank) continue;
3802 status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3803 GNI_RC_CHECK("GNI_EpCreate ", status);
3804 remote_addr = MPID_UGNI_AllAddr[i];
3805 status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3806 GNI_RC_CHECK("GNI_EpBind ", status);
3809 /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3810 _init_smsg();
3811 PMI_Barrier();
3813 #if USE_LRTS_MEMPOOL
3814 env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3815 if (env) {
3816 _totalmem = CmiReadSize(env);
3817 if (myrank == 0)
3818 printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3821 env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3822 if (env) _mempool_size = CmiReadSize(env);
3823 if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size"))
3824 _mempool_size = CmiReadSize(env);
3827 env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3828 if (env) {
3829 MAX_REG_MEM = CmiReadSize(env);
3830 user_set_flag = 1;
3832 if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size")) {
3833 MAX_REG_MEM = CmiReadSize(env);
3834 user_set_flag = 1;
3837 env = getenv("CHARM_UGNI_SEND_MAX");
3838 if (env) {
3839 MAX_BUFF_SEND = CmiReadSize(env);
3840 user_set_flag = 1;
3842 if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send")) {
3843 MAX_BUFF_SEND = CmiReadSize(env);
3844 user_set_flag = 1;
3847 env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3848 if (env) {
3849 _mempool_size_limit = CmiReadSize(env);
3852 if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3853 if (MAX_BUFF_SEND > MAX_REG_MEM) MAX_BUFF_SEND = MAX_REG_MEM;
3855 if (myrank==0) {
3856 printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3857 printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3858 if (MAX_REG_MEM < BIG_MSG * 2 + oneMB) {
3859 /* memblock can expand to BIG_MSG * 2 size */
3860 printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n", BIG_MSG * 2.0/1024/1024 + 1);
3861 CmiAbort("mempool maximum size is too small. \n");
3863 #if MULTI_THREAD_SEND
3864 printf("Charm++> worker thread sending messages\n");
3865 #elif COMM_THREAD_SEND
3866 printf("Charm++> only comm thread send/recv messages\n");
3867 #endif
3870 #endif /* end of USE_LRTS_MEMPOOL */
3872 env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3873 if (env) {
3874 BIG_MSG = CmiReadSize(env);
3875 if (BIG_MSG < ONE_SEG)
3876 CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3878 env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3879 if (env) {
3880 BIG_MSG_PIPELINE = atoi(env);
3883 env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3884 if (env) _checkProgress = 0;
3885 if (mysize == 1) _checkProgress = 0;
3887 #if CMI_EXERT_RECV_RDMA_CAP
3888 env = getenv("CHARM_UGNI_RDMA_MAX");
3889 if (env) {
3890 RDMA_pending = atoi(env);
3891 if (myrank == 0)
3892 printf("Charm++> Max pending RDMA set to: %d\n", RDMA_pending);
3894 #endif
3897 env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3898 if (env)
3899 _tlbpagesize = CmiReadSize(env);
3901 /* real gethugepagesize() is only available when hugetlb module linked */
3902 _tlbpagesize = gethugepagesize();
3903 if (myrank == 0) {
3904 printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3907 #if LARGEPAGE
3908 if (_tlbpagesize == 4096) {
3909 CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3911 #endif
3913 /* stats related arguments */
3914 #if CMK_WITH_STATS
3915 CmiGetArgStringDesc(*argv,"+gni_stats_root",&counters_dirname,"counter directory name, default counters");
3917 print_stats = CmiGetArgFlag(*argv, "+print_stats");
3919 stats_off = CmiGetArgFlag(*argv, "+stats_off");
3921 init_comm_stats();
3922 #endif
3924 /* init DMA buffer for medium message */
3926 //_init_DMA_buffer();
3928 free(MPID_UGNI_AllAddr);
3930 sendRdmaBuf = PCQueueCreate();
3931 sendHighPriorBuf = PCQueueCreate();
3933 #if MACHINE_DEBUG_LOG
3934 char ln[200];
3935 sprintf(ln,"debugLog.%d",myrank);
3936 debugLog=fopen(ln,"w");
3937 #endif
3939 // NTK_Init();
3940 // ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3942 #if REMOTE_EVENT
3943 SHIFT = 1;
3944 while (1<<SHIFT < mysize) SHIFT++;
3945 CmiAssert(SHIFT < 31);
3946 IndexPool_init(&ackPool);
3947 #if CMK_PERSISTENT_COMM
3948 IndexPool_init(&persistPool);
3949 #endif
3950 #endif
3953 void* LrtsAlloc(int n_bytes, int header)
3955 void *ptr = NULL;
3956 #if 0
3957 printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3958 #endif
3959 if(n_bytes <= SMSG_MAX_MSG)
3961 int totalsize = n_bytes+header;
3962 ptr = malloc(totalsize);
3964 else {
3965 CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3966 #if USE_LRTS_MEMPOOL
3967 n_bytes = ALIGN64(n_bytes);
3968 if(n_bytes < BIG_MSG)
3970 char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3971 if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3972 }else
3974 #if LARGEPAGE
3975 //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3976 n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3977 char *res = my_get_huge_pages(n_bytes);
3978 #else
3979 char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3980 #endif
3981 if (res) ptr = res + ALIGNBUF - header;
3983 #else
3984 n_bytes = ALIGN64(n_bytes); /* make sure size if 4 aligned */
3985 char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3986 ptr = res + ALIGNBUF - header;
3987 #endif
3989 return ptr;
3992 void LrtsFree(void *msg)
3994 CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3995 #if CMK_PERSISTENT_COMM
3996 if (IS_PERSISTENT_MEMORY(msg)) return;
3997 #endif
3998 if (size <= SMSG_MAX_MSG)
3999 free(msg);
4000 else {
4001 size = ALIGN64(size);
4002 if(size>=BIG_MSG)
4004 #if LARGEPAGE
4005 int s = ALIGNHUGEPAGE(size+ALIGNBUF);
4006 my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
4007 #else
4008 free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
4009 #endif
4011 else {
4012 #if USE_LRTS_MEMPOOL
4013 #if CMK_SMP
4014 mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
4015 #else
4016 mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
4017 #endif
4018 #else
4019 free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
4020 #endif
/* Final shutdown of the machine layer: optionally print communication
 * statistics, destroy the registered-memory pool, finalize PMI, and exit.
 * Does not return. */
void LrtsExit()
{
#if CMK_WITH_STATS
#if CMK_SMP
    /* in SMP mode only the comm thread (rank == node size) prints stats */
    if(CmiMyRank() == CmiMyNodeSize())
#endif
    if (print_stats) print_comm_stats();
#endif
    /* free memory ? */
#if USE_LRTS_MEMPOOL
    //printf("FINAL [%d, %d] register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg);
    mempool_destroy(CpvAccess(mempool));
#endif
    PMI_Finalize();
    exit(0);
}
4042 void LrtsDrainResources()
4044 if(mysize == 1) return;
4045 while (
4046 #if CMK_USE_OOB
4047 !SendBufferMsg(&smsg_oob_queue, NULL) ||
4048 #endif
4049 !SendBufferMsg(&smsg_queue, NULL)
4052 if (useDynamicSMSG)
4053 PumpDatagramConnection();
4054 PumpNetworkSmsg();
4055 PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
4057 #if MULTI_THREAD_SEND
4058 PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
4059 #endif
4061 #if CMK_USE_OOB
4062 PumpLocalTransactions(highprior_rdma_tx_cqh, rdma_tx_cq_lock);
4063 #endif
4065 #if REMOTE_EVENT
4066 PumpRemoteTransactions(rdma_rx_cqh);
4067 #endif
4068 SendRdmaMsg(sendRdmaBuf);
4069 SendRdmaMsg(sendHighPriorBuf);
4071 PMI_Barrier();
/* Abort the whole job: print the message and a stack trace on this rank,
 * then tear the job down through PMI_Abort. Does not return. */
void LrtsAbort(const char *message) {
    fprintf(stderr, "[%d] CmiAbort: %s\n", myrank, message);
    CmiPrintStackTrace(0);
    PMI_Abort(-1, message);
}
/************************** TIMER FUNCTIONS **************************/
#if CMK_TIMER_USE_SPECIAL
/* MPI calls are not threadsafe, even the timer on some machines */
static CmiNodeLock timerLock = 0;    /* never created, so always NULL */
static int _absoluteTime = 0;        /* set by +useAbsoluteTime: report absolute wallclock time */
static int _is_global = 0;           /* nonzero if the timer is synchronized across ranks */
static struct timespec start_ts;     /* timer origin, captured in CmiTimerInit */
/* This layer has no globally synchronized clock; always report 0. */
inline int CmiTimerIsSynchronized() {
    return 0;
}
/* Whether timers report absolute system time (+useAbsoluteTime flag). */
inline int CmiTimerAbsolute() {
    return _absoluteTime;
}
/* Start-of-program reference time; with a per-process origin this is 0. */
double CmiStartTimer() {
    return 0.0;
}
/* Timer origin (start_ts, captured in CmiTimerInit) in seconds. */
double CmiInitTime() {
    return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
}
4104 void CmiTimerInit(char **argv) {
4105 _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
4106 if (_absoluteTime && CmiMyPe() == 0)
4107 printf("Charm++> absolute timer is used\n");
4109 _is_global = CmiTimerIsSynchronized();
4112 if (_is_global) {
4113 if (CmiMyRank() == 0) {
4114 clock_gettime(CLOCK_MONOTONIC, &start_ts);
4116 } else { /* we don't have a synchronous timer, set our own start time */
4117 CmiBarrier();
4118 CmiBarrier();
4119 CmiBarrier();
4120 clock_gettime(CLOCK_MONOTONIC, &start_ts);
4122 CmiNodeAllBarrier(); /* for smp */
/*
 * Since the timerLock is never created and is always NULL, all the
 * if-conditions inside the timer functions could be disabled right
 * now in the case of SMP.
 */
4131 double CmiTimer(void) {
4132 struct timespec now_ts;
4133 clock_gettime(CLOCK_MONOTONIC, &now_ts);
4134 return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
4135 : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec) / 1000000000.0);
4138 double CmiWallTimer(void) {
4139 struct timespec now_ts;
4140 clock_gettime(CLOCK_MONOTONIC, &now_ts);
4141 return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
4142 : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec) / 1000000000.0);
4145 double CmiCpuTimer(void) {
4146 struct timespec now_ts;
4147 clock_gettime(CLOCK_MONOTONIC, &now_ts);
4148 return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
4149 : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec) / 1000000000.0);
4152 #endif
4153 /************Barrier Related Functions****************/
4155 int CmiBarrier()
4157 gni_return_t status;
4159 #if CMK_SMP
4160 /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks */
4161 CmiNodeAllBarrier();
4162 if (CmiMyRank() == CmiMyNodeSize())
4163 #else
4164 if (CmiMyRank() == 0)
4165 #endif
4168 * The call of CmiBarrier is usually before the initialization
4169 * of trace module of Charm++, therefore, the START_EVENT
4170 * and END_EVENT are disabled here. -Chao Mei
4172 /*START_EVENT();*/
4173 status = PMI_Barrier();
4174 GNI_RC_CHECK("PMI_Barrier", status);
4175 /*END_EVENT(10);*/
4177 CmiNodeAllBarrier();
4178 return 0;
4181 #if CMK_DIRECT
4182 #include "machine-cmidirect.c"
4183 #endif
4184 #if CMK_PERSISTENT_COMM
4185 #include "machine-persistent.c"
4186 #endif