9 * Flow control by mem pool using environment variables:
11 # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12 # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13 export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14 export CHARM_UGNI_MEMPOOL_MAX=20M
15 export CHARM_UGNI_SEND_MAX=10M
17 # limit on total mempool size allocated, this is to prevent mempool
18 # uses too much memory
19 export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M
21 other environment variables:
23 export CHARM_UGNI_NO_DEADLOCK_CHECK=yes # disable checking deadlock
24 export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G # max memory per node for mempool
25 export CHARM_UGNI_BIG_MSG_SIZE=4M # set big message size protocol
26 export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4 # set big message pipe len
27 export CHARM_UGNI_RDMA_MAX=100 # max pending RDMA operations
42 //#include <numatoolkit.h>
47 #define DIRECT_SEQ 0xFFFFFFE
48 #include "cmidirect.h"
58 #define MULTI_THREAD_SEND 0
59 #define COMM_THREAD_SEND (!MULTI_THREAD_SEND)
63 #define CMK_WORKER_SINGLE_TASK 1
66 #define REMOTE_EVENT 1
70 #define CMI_EXERT_SEND_LARGE_CAP 0
71 #define CMI_EXERT_RECV_RDMA_CAP 0
74 #define CMI_SENDBUFFERSMSG_CAP 0
75 #define CMI_PUMPNETWORKSMSG_CAP 0
76 #define CMI_PUMPREMOTETRANSACTIONS_CAP 0
77 #define CMI_PUMPLOCALTRANSACTIONS_CAP 0
79 #if CMI_SENDBUFFERSMSG_CAP
80 int SendBufferMsg_cap = 20;
83 #if CMI_PUMPNETWORKSMSG_CAP
84 int PumpNetworkSmsg_cap = 20;
87 #if CMI_PUMPREMOTETRANSACTIONS_CAP
88 int PumpRemoteTransactions_cap = 20;
91 #if CMI_PUMPREMOTETRANSACTIONS_CAP
92 int PumpLocalTransactions_cap = 15;
95 #if CMI_EXERT_SEND_LARGE_CAP
96 static int SEND_large_cap = 20;
97 static int SEND_large_pending = 0;
100 #if CMI_EXERT_RECV_RDMA_CAP
101 static int RDMA_cap = 10;
102 static int RDMA_pending = 0;
108 NONCHARM_SMSG_DONT_FREE
111 #define USE_LRTS_MEMPOOL 1
115 // Trace communication thread
116 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
117 #define TRACE_THRESHOLD 0.00001
118 #undef CMI_MACH_TRACE_USEREVENTS
119 #define CMI_MACH_TRACE_USEREVENTS 1
121 #undef CMK_SMP_TRACE_COMMTHREAD
122 #define CMK_SMP_TRACE_COMMTHREAD 0
125 #define CMK_TRACE_COMMOVERHEAD 0
126 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
127 #undef CMI_MACH_TRACE_USEREVENTS
128 #define CMI_MACH_TRACE_USEREVENTS 1
130 #undef CMK_TRACE_COMMOVERHEAD
131 #define CMK_TRACE_COMMOVERHEAD 0
134 #if CMI_MACH_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
135 CpvStaticDeclare(double, projTraceStart);
136 #define START_EVENT() CpvAccess(projTraceStart) = CmiWallTimer();
137 #define END_EVENT(x) traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
138 #define EVENT_TIME() CpvAccess(projTraceStart)
140 #define START_EVENT()
142 #define EVENT_TIME() (0.0)
147 #define oneMB (1024ll*1024)
148 #define oneGB (1024ll*1024*1024)
150 static CmiInt8 _mempool_size = 8*oneMB;
151 static CmiInt8 _expand_mem = 4*oneMB;
152 static CmiInt8 _mempool_size_limit = 0;
154 static CmiInt8 _totalmem = 0.8*oneGB;
157 static CmiInt8 BIG_MSG = 16*oneMB;
158 static CmiInt8 ONE_SEG = 4*oneMB;
160 static CmiInt8 BIG_MSG = 4*oneMB;
161 static CmiInt8 ONE_SEG = 2*oneMB;
163 #if MULTI_THREAD_SEND
164 static int BIG_MSG_PIPELINE = 1;
166 static int BIG_MSG_PIPELINE = 4;
169 // dynamic flow control
170 static CmiInt8 buffered_send_msg = 0;
171 static CmiInt8 register_memory_size = 0;
174 static CmiInt8 MAX_BUFF_SEND = 100000*oneMB;
175 static CmiInt8 MAX_REG_MEM = 200000*oneMB;
176 static CmiInt8 register_count = 0;
178 #if CMK_SMP && COMM_THREAD_SEND
179 static CmiInt8 MAX_BUFF_SEND = 100*oneMB;
180 static CmiInt8 MAX_REG_MEM = 200*oneMB;
182 static CmiInt8 MAX_BUFF_SEND = 16*oneMB;
183 static CmiInt8 MAX_REG_MEM = 25*oneMB;
189 #endif /* end USE_LRTS_MEMPOOL */
191 #if MULTI_THREAD_SEND
192 #define CMI_GNI_LOCK(x) CmiLock(x);
193 #define CMI_GNI_TRYLOCK(x) CmiTryLock(x)
194 #define CMI_GNI_UNLOCK(x) CmiUnlock(x);
195 #define CMI_PCQUEUEPOP_LOCK(Q) CmiLock((Q)->lock);
196 #define CMI_PCQUEUEPOP_UNLOCK(Q) CmiUnlock((Q)->lock);
198 #define CMI_GNI_LOCK(x)
199 #define CMI_GNI_TRYLOCK(x) (0)
200 #define CMI_GNI_UNLOCK(x)
201 #define CMI_PCQUEUEPOP_LOCK(Q)
202 #define CMI_PCQUEUEPOP_UNLOCK(Q)
205 static int _tlbpagesize = 4096;
207 //static int _smpd_count = 0;
209 static int user_set_flag = 0;
211 static int _checkProgress = 1; /* check deadlock */
212 static int _detected_hang = 0;
214 #define SMSG_ATTR_SIZE sizeof(gni_smsg_attr_t)
217 static int useDynamicSMSG = 0; /* dynamic smsgs setup */
219 static int avg_smsg_connection = 32;
220 static int *smsg_connected_flag= 0;
221 static gni_smsg_attr_t **smsg_attr_vector_local;
222 static gni_smsg_attr_t **smsg_attr_vector_remote;
223 static gni_ep_handle_t ep_hndl_unbound;
224 static gni_smsg_attr_t send_smsg_attr;
225 static gni_smsg_attr_t recv_smsg_attr;
227 typedef struct _dynamic_smsg_mailbox{
231 gni_mem_handle_t mem_hndl;
232 struct _dynamic_smsg_mailbox *next;
233 }dynamic_smsg_mailbox_t;
235 static dynamic_smsg_mailbox_t *mailbox_list;
237 static CmiUInt8 smsg_send_count = 0, last_smsg_send_count = 0;
238 static CmiUInt8 smsg_recv_count = 0, last_smsg_recv_count = 0;
241 int lrts_send_msg_id = 0;
242 int lrts_local_done_msg = 0;
243 int lrts_send_rdma_success = 0;
252 #if CMK_PERSISTENT_COMM
253 #define PERSISTENT_GET_BASE 0
254 #if !PERSISTENT_GET_BASE
255 #define CMK_PERSISTENT_COMM_PUT 1
257 #include "machine-persistent.h"
258 #define POST_HIGHPRIORITY_RDMA STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendHighPriorBuf));
260 #define POST_HIGHPRIORITY_RDMA
263 #if REMOTE_EVENT && (CMK_USE_OOB || CMK_PERSISTENT_COMM_PUT)
264 #define PUMP_REMOTE_HIGHPRIORITY STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(highpriority_rx_cqh) );
266 #define PUMP_REMOTE_HIGHPRIORITY
269 //#define USE_ONESIDED 1
271 //onesided implementation is wrong, since no place to restore omdh
272 #include "onesided.h"
273 onesided_hnd_t onesided_hnd;
275 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh)
277 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
280 uint8_t onesided_hnd, omdh;
282 #if REMOTE_EVENT || CQWRITE
283 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
284 if(register_memory_size+size>= MAX_REG_MEM) { \
285 status = GNI_RC_ERROR_NOMEM;} \
286 else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg, (uint64_t)size, cqh, GNI_MEM_READWRITE, -1, mem_hndl); \
287 if(status == GNI_RC_SUCCESS) register_memory_size += size; }
289 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
290 if (register_memory_size + size >= MAX_REG_MEM) { \
291 status = GNI_RC_ERROR_NOMEM; \
292 } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg, (uint64_t)size, NULL, GNI_MEM_READWRITE, -1, mem_hndl); \
293 if(status == GNI_RC_SUCCESS) register_memory_size += size; }
296 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size) \
297 do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
298 register_memory_size -= size; \
299 else CmiAbort("MEM_DEregister"); \
303 #define GetMempoolBlockPtr(x) MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
304 #define GetMempoolPtr(x) MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
305 #define GetMempoolsize(x) MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
306 #define GetMemHndl(x) MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
307 #define IncreaseMsgInRecv(x) MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
308 #define DecreaseMsgInRecv(x) MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
309 #define IncreaseMsgInSend(x) MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
310 #define DecreaseMsgInSend(x) MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
311 #define NoMsgInSend(x) MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
312 #define NoMsgInRecv(x) MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
313 #define NoMsgInFlight(x) (NoMsgInSend(x) && NoMsgInRecv(x))
314 #define IsMemHndlZero(x) ((x).qword1 == 0 && (x).qword2 == 0)
315 #define SetMemHndlZero(x) do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
316 #define NotRegistered(x) IsMemHndlZero(GetMemHndl(x))
318 #define GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
319 #define GetSizeFromBlockHeader(x) MEMPOOL_GetBlockSize(x)
321 #define CmiGetMsgSize(m) ((CmiMsgHeaderExt*)m)->size
322 #define CmiSetMsgSize(m,s) ((((CmiMsgHeaderExt*)m)->size)=(s))
323 #define CmiGetMsgSeq(m) ((CmiMsgHeaderExt*)m)->seq
324 #define CmiSetMsgSeq(m, s) ((((CmiMsgHeaderExt*)m)->seq) = (s))
328 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
329 /* If SMSG is not used */
331 #define FMA_PER_CORE 1024
332 #define FMA_BUFFER_SIZE 1024
334 /* If SMSG is used */
335 static int SMSG_MAX_MSG = 1024;
336 #define SMSG_MAX_CREDIT 72
338 #define MSGQ_MAXSIZE 2048
340 /* large message transfer with FMA or BTE */
342 #define LRTS_GNI_RDMA_THRESHOLD 1024
344 /* remote events only work with RDMA */
345 #define LRTS_GNI_RDMA_THRESHOLD 0
349 static int REMOTE_QUEUE_ENTRIES=163840;
350 static int LOCAL_QUEUE_ENTRIES=163840;
352 static int REMOTE_QUEUE_ENTRIES=20480;
353 static int LOCAL_QUEUE_ENTRIES=20480;
356 #define BIG_MSG_TAG 0x26
357 #define PUT_DONE_TAG 0x28
358 #define DIRECT_PUT_DONE_TAG 0x29
360 /* SMSG is data message */
361 #define SMALL_DATA_TAG 0x31
362 /* SMSG is a control message to initialize a BTE */
363 #define LMSG_INIT_TAG 0x33
364 #define LMSG_PERSISTENT_INIT_TAG 0x34
365 #define LMSG_OOB_INIT_TAG 0x35
366 #define RDMA_ACK_TAG 0x36
367 #define RDMA_PUT_MD_TAG 0x37
368 #define RDMA_PUT_DONE_TAG 0x38
370 #define RDMA_PUT_MD_DIRECT_TAG 0x39
371 #define RDMA_PUT_DONE_DIRECT_TAG 0x40
372 #define RDMA_DEREG_DIRECT_TAG 0x41
374 #define RDMA_REG_AND_PUT_MD_DIRECT_TAG 0x42
375 #define RDMA_REG_AND_GET_MD_DIRECT_TAG 0x43
378 #define RDMA_COMM_PERFORM_GET_TAG 0x44
379 #define RDMA_COMM_PERFORM_PUT_TAG 0x45
387 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) { printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
389 #define GNI_RC_CHECK(msg,rc)
392 #define ALIGN64(x) (size_t)((~63)&((x)+63))
393 //#define ALIGN4(x) (size_t)((~3)&((x)+3))
394 #define ALIGNHUGEPAGE(x) (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
396 static int useStaticMSGQ = 0;
397 static int useStaticFMA = 0;
398 static int mysize, myrank;
399 static gni_nic_handle_t nic_hndl;
402 gni_mem_handle_t mdh;
405 // this is related to dynamic SMSG
407 typedef struct mdh_addr_list{
408 gni_mem_handle_t mdh;
410 struct mdh_addr_list *next;
413 static unsigned int smsg_memlen;
414 gni_smsg_attr_t **smsg_local_attr_vec = 0;
415 mdh_addr_t setup_mem;
416 mdh_addr_t *smsg_connection_vec = 0;
417 gni_mem_handle_t smsg_connection_memhndl;
418 static int smsg_expand_slots = 10;
419 static int smsg_available_slot = 0;
420 static void *smsg_mailbox_mempool = 0;
421 mdh_addr_list_t *smsg_dynamic_list = 0;
423 static void *smsg_mailbox_base;
424 gni_msgq_attr_t msgq_attrs;
425 gni_msgq_handle_t msgq_handle;
426 gni_msgq_ep_attr_t msgq_ep_attrs;
427 gni_msgq_ep_attr_t msgq_ep_attrs_size;
429 /* =====Beginning of Declarations of Machine Specific Variables===== */
431 static int modes = 0;
432 static gni_cq_handle_t smsg_rx_cqh = NULL; // smsg send
433 static gni_cq_handle_t default_tx_cqh = NULL; // bind to endpoint
434 static gni_cq_handle_t rdma_tx_cqh = NULL; // rdma - local event
435 static gni_cq_handle_t highprior_rdma_tx_cqh = NULL; // rdma - local event
436 static gni_cq_handle_t rdma_rx_cqh = NULL; // mempool - remote event
437 static gni_cq_handle_t highpriority_rx_cqh = NULL; // mempool - remote event
438 static gni_ep_handle_t *ep_hndl_array;
440 static CmiNodeLock *ep_lock_array;
441 static CmiNodeLock default_tx_cq_lock;
442 static CmiNodeLock rdma_tx_cq_lock;
443 static CmiNodeLock global_gni_lock;
444 static CmiNodeLock rx_cq_lock;
445 static CmiNodeLock smsg_mailbox_lock;
446 static CmiNodeLock smsg_rx_cq_lock;
447 static CmiNodeLock *mempool_lock;
449 #if CMK_ONESIDED_IMPL
450 static gni_cq_handle_t rdma_onesided_cqh = NULL;
451 static CmiNodeLock rdma_onesided_cq_lock;
454 //#define CMK_WITH_STATS 1
455 typedef struct msg_list
462 double creation_time;
467 typedef struct control_msg
469 uint64_t source_addr; /* address from the start of buffer */
470 uint64_t dest_addr; /* address from the start of buffer */
471 int total_length; /* total length */
472 int length; /* length of this packet */
474 int ack_index; /* index from integer to address */
476 int seq_id; //big message 0 meaning single message
477 gni_mem_handle_t source_mem_hndl;
478 #if PERSISTENT_GET_BASE
479 gni_mem_handle_t dest_mem_hndl;
481 struct control_msg *next;
484 #define CONTROL_MSG_SIZE (sizeof(CONTROL_MSG)-sizeof(void*))
486 typedef struct ack_msg
488 uint64_t source_addr; /* address from the start of buffer */
489 #if ! USE_LRTS_MEMPOOL
490 gni_mem_handle_t source_mem_hndl;
491 int length; /* total length */
493 struct ack_msg *next;
496 #define ACK_MSG_SIZE (sizeof(ACK_MSG)-sizeof(void*))
500 uint64_t handler_addr;
504 char core[CmiMsgHeaderSizeBytes];
509 CpvDeclare(int, CmiHandleDirectIdx);
510 void CmiHandleDirectMsg(cmidirectMsg* msg)
513 CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
514 (*(_handle->callbackFnPtr))(_handle->callbackData);
520 CpvInitialize(int, CmiHandleDirectIdx);
521 CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
525 typedef struct rmda_msg
531 gni_post_descriptor_t *pd;
540 #if CMK_LOCKLESS_QUEUE
541 #define ONE_SEND_QUEUE 1
543 #define ONE_SEND_QUEUE 0
546 typedef PCQueue BufferList;
547 typedef struct msg_list_index
558 PCQueue sendHighPriorBuf;
559 // buffered send queue
561 typedef struct smsg_queue
563 MSG_LIST_INDEX *smsg_msglist_index;
566 PCQueue nonEmptyQueues;
570 typedef struct smsg_queue
572 #if CMK_LOCKLESS_QUEUE
573 MPMCQueue sendMsgBuf;
580 SMSG_QUEUE smsg_queue;
582 SMSG_QUEUE smsg_oob_queue;
583 #define SEND_OOB_SMSG(x) SendBufferMsg(&x, NULL);
584 #define PUMP_LOCAL_HIGHPRIORITY STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(highprior_rdma_tx_cqh, rdma_tx_cq_lock));
586 #define SEND_OOB_SMSG(x)
587 #define PUMP_LOCAL_HIGHPRIORITY
590 #define FreeMsgList(d) free(d);
591 #define MallocMsgList(d) d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
593 #define FreeControlMsg(d) free(d);
594 #define MallocControlMsg(d) d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
596 #define FreeAckMsg(d) free(d);
597 #define MallocAckMsg(d) d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
599 #define FreeRdmaRequest(d) free(d);
600 #define MallocRdmaRequest(d) d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));
601 /* reuse gni_post_descriptor_t */
602 static gni_post_descriptor_t *post_freelist=0;
604 #define FreePostDesc(d) free(d);
605 #define MallocPostDesc(d) d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
608 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
609 static int buffered_smsg_counter = 0;
611 /* SmsgSend return success but message sent is not confirmed by remote side */
612 static MSG_LIST *buffered_fma_head = 0;
613 static MSG_LIST *buffered_fma_tail = 0;
616 #define IsFree(a,ind) !( a& (1<<(ind) ))
617 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
618 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
620 CpvDeclare(mempool_type*, mempool);
622 #if CMK_PERSISTENT_COMM_PUT
623 CpvDeclare(mempool_type*, persistent_mempool);
626 #if REMOTE_EVENT || CMK_SMSGS_FREE_AFTER_EVENT
633 typedef struct IndexPool {
634 struct IndexStruct *indexes;
641 #define GetIndexType(pool, s) (pool.indexes[s].type)
642 #define GetIndexAddress(pool, s) (pool.indexes[s].addr)
644 static void IndexPool_init(IndexPool *pool, int initsize, int maxsize)
647 pool->size = initsize;
648 pool->maxsize = maxsize;
649 pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
650 if(pool->indexes == NULL) {
651 CmiAbort("malloc of pool is null\n");
653 for (i=0; i<pool->size-1; i++) {
654 pool->indexes[i].next = i+1;
656 pool->indexes[i].next = -1;
658 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM_PUT
659 pool->lock = CmiCreateLock();
665 static int IndexPool_getslot(IndexPool *pool, void *addr, int type)
668 #if MULTI_THREAD_SEND
673 int newsize = pool->size * 2;
674 //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
675 if (newsize > pool->maxsize) {
676 static int indexpool_overflow = 0;
677 if(!indexpool_overflow){
678 printf("[%d] Warning: IndexPool_getslot %p overflow when expanding to: %d\n", myrank, pool, newsize);
679 indexpool_overflow = 1;
683 struct IndexStruct *old_ackpool = pool->indexes;
684 pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
685 if(pool->indexes == NULL) {
686 CmiAbort("malloc of pool is null\n");
688 memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
689 for (i=pool->size; i<newsize-1; i++) {
690 pool->indexes[i].next = i+1;
692 pool->indexes[i].next = -1;
693 pool->freehead = pool->size;
695 pool->size = newsize;
698 pool->freehead = pool->indexes[s].next;
699 pool->indexes[s].addr = addr;
700 pool->indexes[s].type = type;
701 #if MULTI_THREAD_SEND
702 CmiUnlock(pool->lock);
707 static void IndexPool_freeslot(IndexPool *pool, int s)
709 CmiAssert(s>=0 && s<pool->size);
710 #if MULTI_THREAD_SEND
713 pool->indexes[s].next = pool->freehead;
715 #if MULTI_THREAD_SEND
716 CmiUnlock(pool->lock);
722 #if CMK_SMSGS_FREE_AFTER_EVENT
725 the pool is to buffer sending smsgs until it can be free'ed .
727 static IndexPool smsgsPool;
731 /* ack pool for remote events */
733 static int SHIFT = 18;
734 #define INDEX_MASK ((1<<(32-SHIFT-1)) - 1)
735 #define RANK_MASK ((1<<SHIFT) - 1)
736 #define ACK_EVENT(idx) ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
738 #define GET_TYPE(evt) (((evt) >> 31) & 1)
739 #define GET_RANK(evt) ((evt) & RANK_MASK)
740 #define GET_INDEX(evt) (((evt) >> SHIFT) & INDEX_MASK)
742 #define PERSIST_EVENT(idx) ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
743 #define DIRECT_EVENT(idx) ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
746 #define POOL_INIT_SIZE 4096
748 #define POOL_INIT_SIZE 1024
751 static IndexPool ackPool;
752 #if CMK_PERSISTENT_COMM_PUT
753 static IndexPool persistPool;
755 #define persistPool ackPool
762 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
763 #define CMI_MAGIC(msg) ((CmiMsgHeaderBasic *)msg)->magic
764 #define CHARM_MAGIC_NUMBER 126
766 #if CMK_ERROR_CHECKING
768 unsigned char computeCheckSum(unsigned char *data, int len);
769 static int checksum_flag = 0;
770 #define CMI_SET_CHECKSUM(msg, len) \
771 if (checksum_flag) { \
772 ((CmiMsgHeaderBasic *)msg)->cksum = 0; \
773 ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len); \
775 #define CMI_CHECK_CHECKSUM(msg, len) \
777 if (computeCheckSum((unsigned char*)msg, len) != 0) \
778 CmiAbort("Fatal error: checksum doesn't agree!\n");
780 #define CMI_SET_CHECKSUM(msg, len)
781 #define CMI_CHECK_CHECKSUM(msg, len)
783 /* =====End of Definitions of Message-Corruption Related Macros=====*/
785 static int print_stats = 0;
786 static int stats_off = 0;
787 void CmiTurnOnStats(void)
790 //CmiPrintf("[%d][%d:%d]+++++++++++ turning on stats \n", CmiMyNode(), CmiMyPe(), CmiMyRank());
793 void CmiTurnOffStats(void)
798 #define IS_PUT(type) (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
801 FILE *counterLog = NULL;
802 typedef struct comm_thread_stats
804 uint64_t smsg_data_count;
805 uint64_t lmsg_init_count;
807 uint64_t big_msg_ack_count;
809 uint64_t direct_put_done_count;
810 uint64_t put_done_count;
811 //times of calling SmsgSend
812 uint64_t try_smsg_data_count;
813 uint64_t try_lmsg_init_count;
814 uint64_t try_ack_count;
815 uint64_t try_big_msg_ack_count;
816 uint64_t try_direct_put_done_count;
817 uint64_t try_put_done_count;
818 uint64_t try_smsg_count;
820 double max_time_in_send_buffered_smsg;
821 double all_time_in_send_buffered_smsg;
823 uint64_t rdma_get_count, rdma_put_count;
824 uint64_t try_rdma_get_count, try_rdma_put_count;
825 double max_time_from_control_to_rdma_init;
826 double all_time_from_control_to_rdma_init;
828 double max_time_from_rdma_init_to_rdma_done;
829 double all_time_from_rdma_init_to_rdma_done;
831 int count_in_PumpNetwork;
832 double time_in_PumpNetwork;
833 double max_time_in_PumpNetwork;
834 int count_in_SendBufferMsg_smsg;
835 double time_in_SendBufferMsg_smsg;
836 double max_time_in_SendBufferMsg_smsg;
837 int count_in_SendRdmaMsg;
838 double time_in_SendRdmaMsg;
839 double max_time_in_SendRdmaMsg;
840 int count_in_PumpRemoteTransactions;
841 double time_in_PumpRemoteTransactions;
842 double max_time_in_PumpRemoteTransactions;
843 int count_in_PumpLocalTransactions_rdma;
844 double time_in_PumpLocalTransactions_rdma;
845 double max_time_in_PumpLocalTransactions_rdma;
846 int count_in_PumpDatagramConnection;
847 double time_in_PumpDatagramConnection;
848 double max_time_in_PumpDatagramConnection;
851 static Comm_Thread_Stats comm_stats;
853 static char *counters_dirname = "counters";
855 static void init_comm_stats(void)
857 memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
860 int code = mkdir(counters_dirname, 00777);
861 sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
862 counterLog=fopen(ln,"w");
863 if (counterLog == NULL) CmiAbort("Counter files open failed");
867 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
869 #define SMSG_SENT_DONE(creation_time, tag) \
870 if (print_stats && !stats_off) { if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++; \
871 else if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.lmsg_init_count++; \
872 else if( tag == ACK_TAG) comm_stats.ack_count++; \
873 else if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++; \
874 else if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++; \
875 else if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++; \
876 comm_stats.smsg_count++; \
877 double inbuff_time = CmiWallTimer() - creation_time; \
878 if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
879 comm_stats.all_time_in_send_buffered_smsg += inbuff_time; \
882 #define SMSG_TRY_SEND(tag) \
883 if (print_stats && !stats_off){ if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++; \
884 else if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.try_lmsg_init_count++; \
885 else if( tag == ACK_TAG) comm_stats.try_ack_count++; \
886 else if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++; \
887 else if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++; \
888 else if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++; \
889 comm_stats.try_smsg_count++; \
892 #define RDMA_TRY_SEND(type) if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
894 #define RDMA_TRANS_DONE(x) \
895 if (print_stats && !stats_off) { double rdma_trans_time = CmiWallTimer() - x ; \
896 if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
897 comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
900 #define RDMA_TRANS_INIT(type, x) \
901 if (print_stats && !stats_off) { IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++; \
902 double rdma_trans_time = CmiWallTimer() - x ; \
903 if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
904 comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
907 #define STATS_PUMPNETWORK_TIME(x) \
908 { double t = CmiWallTimer(); \
910 t = CmiWallTimer() - t; \
911 comm_stats.count_in_PumpNetwork++; \
912 comm_stats.time_in_PumpNetwork += t; \
913 if (t>comm_stats.max_time_in_PumpNetwork) \
914 comm_stats.max_time_in_PumpNetwork = t; \
917 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x) \
918 { double t = CmiWallTimer(); \
920 t = CmiWallTimer() - t; \
921 comm_stats.count_in_PumpRemoteTransactions ++; \
922 comm_stats.time_in_PumpRemoteTransactions += t; \
923 if (t>comm_stats.max_time_in_PumpRemoteTransactions) \
924 comm_stats.max_time_in_PumpRemoteTransactions = t; \
927 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x) \
928 { double t = CmiWallTimer(); \
930 t = CmiWallTimer() - t; \
931 comm_stats.count_in_PumpLocalTransactions_rdma ++; \
932 comm_stats.time_in_PumpLocalTransactions_rdma += t; \
933 if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma) \
934 comm_stats.max_time_in_PumpLocalTransactions_rdma = t; \
937 #define STATS_SEND_SMSGS_TIME(x) \
938 { double t = CmiWallTimer(); \
940 t = CmiWallTimer() - t; \
941 comm_stats.count_in_SendBufferMsg_smsg ++; \
942 comm_stats.time_in_SendBufferMsg_smsg += t; \
943 if (t>comm_stats.max_time_in_SendBufferMsg_smsg) \
944 comm_stats.max_time_in_SendBufferMsg_smsg = t; \
947 #define STATS_SENDRDMAMSG_TIME(x) \
948 { double t = CmiWallTimer(); \
950 t = CmiWallTimer() - t; \
951 comm_stats.count_in_SendRdmaMsg ++; \
952 comm_stats.time_in_SendRdmaMsg += t; \
953 if (t>comm_stats.max_time_in_SendRdmaMsg) \
954 comm_stats.max_time_in_SendRdmaMsg = t; \
957 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x) \
958 { double t = CmiWallTimer(); \
960 t = CmiWallTimer() - t; \
961 comm_stats.count_in_PumpDatagramConnection ++; \
962 comm_stats.time_in_PumpDatagramConnection += t; \
963 if (t>comm_stats.max_time_in_PumpDatagramConnection) \
964 comm_stats.max_time_in_PumpDatagramConnection = t; \
967 static void print_comm_stats(void)
969 fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
970 fprintf(counterLog, "Node[%d] Smsg Msgs \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank,
971 comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count,
972 comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
974 fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank,
975 comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count,
976 comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
978 fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
979 fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
980 fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
983 fprintf(counterLog, " count\ttotal(s)\tmax(s)\taverage(us)\n");
984 fprintf(counterLog, "PumpNetworkSmsg: %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
985 fprintf(counterLog, "PumpRemoteTransactions: %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
986 fprintf(counterLog, "PumpLocalTransactions(RDMA): %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
987 fprintf(counterLog, "SendBufferMsg (SMSG): %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
988 fprintf(counterLog, "SendRdmaMsg: %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
990 fprintf(counterLog, "PumpDatagramConnection: %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
996 #define STATS_PUMPNETWORK_TIME(x) x
997 #define STATS_SEND_SMSGS_TIME(x) x
998 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x) x
999 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x) x
1000 #define STATS_SENDRDMAMSG_TIME(x) x
1001 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x) x
1005 allgather(void *in,void *out, int len)
1007 static int *ivec_ptr=NULL,already_called=0,job_size=0;
1010 char *tmp_buf,*out_ptr;
1012 if(!already_called) {
1014 rc = PMI_Get_size(&job_size);
1015 CmiAssert(rc == PMI_SUCCESS);
1016 rc = PMI_Get_rank(&my_rank);
1017 CmiAssert(rc == PMI_SUCCESS);
1019 ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1020 CmiAssert(ivec_ptr != NULL);
1022 rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1023 CmiAssert(rc == PMI_SUCCESS);
1029 tmp_buf = (char *)malloc(job_size * len);
1032 rc = PMI_Allgather(in,tmp_buf,len);
1033 CmiAssert(rc == PMI_SUCCESS);
1035 out_ptr = (char *)out;
1037 for(i=0;i<job_size;i++) {
1039 memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1047 allgather_2(void *in,void *out, int len)
1049 //PMI_Allgather is out of order
1050 int i,rc, extend_len;
1052 char *out_ptr, *out_ref;
1055 extend_len = sizeof(int) + len;
1056 in2 = (char*)malloc(extend_len);
1058 memcpy(in2, &myrank, sizeof(int));
1059 memcpy(in2+sizeof(int), in, len);
1061 out_ptr = (char*)malloc(mysize*extend_len);
1063 rc = PMI_Allgather(in2, out_ptr, extend_len);
1064 GNI_RC_CHECK("allgather", rc);
1066 out_ref = (char *)out;
1068 for(i=0;i<mysize;i++) {
1070 memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1071 //copy to the rank index slot
1072 memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1080 static unsigned int get_gni_nic_address(int device_id)
1082 unsigned int address, cpu_id;
1083 gni_return_t status;
1084 int i, alps_dev_id=-1,alps_address=-1;
1085 char *token, *p_ptr;
1087 p_ptr = getenv("PMI_GNI_DEV_ID");
1089 status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1091 GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1093 while ((token = strtok(p_ptr,":")) != NULL) {
1094 alps_dev_id = atoi(token);
1095 if (alps_dev_id == device_id) {
1100 CmiAssert(alps_dev_id != -1);
1101 p_ptr = getenv("PMI_GNI_LOC_ADDR");
1102 CmiAssert(p_ptr != NULL);
1104 while ((token = strtok(p_ptr,":")) != NULL) {
1105 if (i == alps_dev_id) {
1106 alps_address = atoi(token);
1112 CmiAssert(alps_address != -1);
1113 address = alps_address;
1118 static uint8_t get_ptag(void)
1120 char *p_ptr, *token;
1123 p_ptr = getenv("PMI_GNI_PTAG");
1124 CmiAssert(p_ptr != NULL);
1125 token = strtok(p_ptr, ":");
1126 ptag = (uint8_t)atoi(token);
1131 static uint32_t get_cookie(void)
1134 char *p_ptr, *token;
1136 p_ptr = getenv("PMI_GNI_COOKIE");
1137 CmiAssert(p_ptr != NULL);
1138 token = strtok(p_ptr, ":");
1139 cookie = (uint32_t)atoi(token);
1146 /* directly mmap memory from hugetlbfs for large pages */
1148 #include <sys/stat.h>
1150 #include <sys/mman.h>
1155 #include <hugetlbfs.h>
1160 // size must be _tlbpagesize aligned
1161 void *my_get_huge_pages(size_t size)
1165 mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1168 snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1169 fd = open(filename, O_RDWR | O_CREAT, mode);
1171 CmiAbort("my_get_huge_pages: open filed");
1173 ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1174 if (ptr == MAP_FAILED) ptr = NULL;
1175 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1181 void my_free_huge_pages(void *ptr, int size)
1183 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1184 int ret = munmap(ptr, size);
1185 if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1190 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1191 /* TODO: add any that are related */
1192 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1195 #include "machine-lrts.h"
1197 #include "machine-common-core.C"
1199 #include "machine-rdma.h"
1202 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *urgent_queue);
1203 static void SendRdmaMsg(PCQueue );
1204 static void PumpNetworkSmsg(void);
1205 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1207 static void PumpCqWriteTransactions(void);
1210 static void PumpRemoteTransactions(gni_cq_handle_t);
1213 #if MACHINE_DEBUG_LOG
1214 static CmiInt8 buffered_recv_msg = 0;
1215 int lrts_smsg_success = 0;
1216 int lrts_received_msg = 0;
1219 static void sweep_mempool(mempool_type *mptr)
1222 block_header *current = &(mptr->block_head);
1224 printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1225 while( current!= NULL) {
1226 printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1227 current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1229 printf("[n %d] sweep_mempool slot END.\n", myrank);
1233 static INLINE_KEYWORD gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1235 block_header *current = *from;
1237 //while(register_memory_size>= MAX_REG_MEM)
1239 while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1240 current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1243 if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1244 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1245 SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1247 return GNI_RC_SUCCESS;
1250 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t *memhndl, gni_cq_handle_t cqh )
1252 gni_return_t status = GNI_RC_SUCCESS;
1253 //int size = GetMempoolsize(msg);
1254 //void *blockaddr = GetMempoolBlockPtr(msg);
1255 //gni_mem_handle_t *memhndl = &(GetMemHndl(msg));
1257 block_header *current = &(mptr->block_head);
1258 while(register_memory_size>= MAX_REG_MEM)
1260 status = deregisterMemory(mptr, ¤t);
1261 if (status != GNI_RC_SUCCESS) break;
1263 if(register_memory_size>= MAX_REG_MEM) return status;
1265 MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size);
1268 MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1269 if(status == GNI_RC_SUCCESS)
1273 else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1275 GNI_RC_CHECK("registerFromMempool", status);
1279 status = deregisterMemory(mptr, ¤t);
1280 if (status != GNI_RC_SUCCESS) break;
1287 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1289 static int rank = -1;
1291 gni_return_t status;
1292 mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1293 //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1296 status = registerFromMempool(mptr1, msg, size, t, cqh);
1297 if (status == GNI_RC_SUCCESS) return status;
1299 for (i=0; i<CmiMyNodeSize()+1; i++) {
1300 rank = (rank+1)%(CmiMyNodeSize()+1);
1301 mptr = CpvAccessOther(mempool, rank);
1302 if (mptr == mptr1) continue;
1303 status = registerFromMempool(mptr, msg, size, t, cqh);
1304 if (status == GNI_RC_SUCCESS) return status;
1307 return GNI_RC_ERROR_RESOURCE;
1310 #if CMK_ONESIDED_IMPL
1311 #include "machine-onesided.h"
1315 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1318 MallocMsgList(msg_tmp);
1319 msg_tmp->destNode = destNode;
1320 msg_tmp->size = size;
1324 SMSG_CREATION(msg_tmp)
1328 #if CMK_LOCKLESS_QUEUE
1329 MPMCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1331 PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1335 CmiLock(queue->smsg_msglist_index[destNode].lock);
1336 if(queue->smsg_msglist_index[destNode].pushed == 0)
1338 PCQueuePush(queue->nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1340 PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1341 CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1343 PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1348 buffered_smsg_counter++;
1352 INLINE_KEYWORD static void print_smsg_attr(gni_smsg_attr_t *a)
1354 printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1358 static void setup_smsg_connection(int destNode)
1360 mdh_addr_list_t *new_entry = 0;
1361 gni_post_descriptor_t *pd;
1362 gni_smsg_attr_t *smsg_attr;
1363 gni_return_t status = GNI_RC_NOT_DONE;
1364 RDMA_REQUEST *rdma_request_msg;
1366 if(smsg_available_slot == smsg_expand_slots)
1368 new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1369 new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1370 memset(new_entry->addr, 0, smsg_memlen*smsg_expand_slots);
1372 status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1373 smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1377 smsg_available_slot = 0;
1378 new_entry->next = smsg_dynamic_list;
1379 smsg_dynamic_list = new_entry;
1381 smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1382 smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1383 smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1384 smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1385 smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1386 smsg_attr->buff_size = smsg_memlen;
1387 smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1388 smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1389 smsg_local_attr_vec[destNode] = smsg_attr;
1390 smsg_available_slot++;
1392 pd->type = GNI_POST_FMA_PUT;
1393 pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT ;
1394 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
1395 pd->length = sizeof(gni_smsg_attr_t);
1396 pd->local_addr = (uint64_t) smsg_attr;
1397 pd->remote_addr = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1398 pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1399 pd->src_cq_hndl = 0;
1402 status = GNI_PostFma(ep_hndl_array[destNode], pd);
1403 print_smsg_attr(smsg_attr);
1404 if(status == GNI_RC_ERROR_RESOURCE )
1406 MallocRdmaRequest(rdma_request_msg);
1407 rdma_request_msg->destNode = destNode;
1408 rdma_request_msg->pd = pd;
1409 /* buffer this request */
1412 if(status != GNI_RC_SUCCESS)
1413 printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1415 printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1419 /* useDynamicSMSG */
1421 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1423 gni_return_t status = GNI_RC_NOT_DONE;
1425 if(mailbox_list->offset == mailbox_list->size)
1427 dynamic_smsg_mailbox_t *new_mailbox_entry;
1428 new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1429 new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1430 new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1431 memset(new_mailbox_entry->mailbox_base, 0, new_mailbox_entry->size);
1432 new_mailbox_entry->offset = 0;
1434 status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1435 new_mailbox_entry->size, smsg_rx_cqh,
1438 &(new_mailbox_entry->mem_hndl));
1440 GNI_RC_CHECK("register", status);
1441 new_mailbox_entry->next = mailbox_list;
1442 mailbox_list = new_mailbox_entry;
1444 local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1445 local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1446 local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1447 local_smsg_attr->mbox_offset = mailbox_list->offset;
1448 mailbox_list->offset += smsg_memlen;
1449 local_smsg_attr->buff_size = smsg_memlen;
1450 local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1451 local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1454 /* useDynamicSMSG */
1456 static int connect_to(int destNode)
1458 gni_return_t status = GNI_RC_NOT_DONE;
1459 CmiAssert(smsg_connected_flag[destNode] == 0);
1460 CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1461 smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1462 alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1463 smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1465 CMI_GNI_LOCK(global_gni_lock)
1466 status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1467 CMI_GNI_UNLOCK(global_gni_lock)
1468 if (status == GNI_RC_ERROR_RESOURCE) {
1469 /* possibly destNode is making connection at the same time */
1470 free(smsg_attr_vector_local[destNode]);
1471 smsg_attr_vector_local[destNode] = NULL;
1472 free(smsg_attr_vector_remote[destNode]);
1473 smsg_attr_vector_remote[destNode] = NULL;
1474 mailbox_list->offset -= smsg_memlen;
1476 printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1480 GNI_RC_CHECK("GNI_Post", status);
1481 smsg_connected_flag[destNode] = 1;
1483 printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1489 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr, int smsgType, int useHeader)
1491 unsigned int remote_address;
1493 gni_return_t status = GNI_RC_ERROR_RESOURCE;
1494 gni_smsg_attr_t *smsg_attr;
1495 gni_post_descriptor_t *pd;
1496 gni_post_state_t post_state;
1500 if (useDynamicSMSG) {
1501 switch (smsg_connected_flag[destNode]) {
1503 connect_to(destNode); /* continue to case 1 */
1504 case 1: /* pending connection, do nothing */
1505 status = GNI_RC_NOT_DONE;
1507 buffer_small_msgs(queue, msg, size, destNode, tag);
1511 #if ! ONE_SEND_QUEUE
1512 if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1515 //CMI_GNI_LOCK(smsg_mailbox_lock)
1516 CMI_GNI_LOCK(default_tx_cq_lock)
1517 #if CMK_SMP_TRACE_COMMTHREAD
1519 int oldeventid = -1;
1520 if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG)
1523 if ( tag == SMALL_DATA_TAG)
1524 real_data = (char*)msg;
1526 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1527 TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1528 TRACE_COMM_SET_COMM_MSGID(real_data);
1532 if (tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG) {
1533 CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1534 if (control_msg_tmp->seq_id <= 0 && control_msg_tmp->ack_index == -1)
1536 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1537 if (control_msg_tmp->ack_index == -1) { /* table overflow */
1538 status = GNI_RC_NOT_DONE;
1540 buffer_small_msgs(queue, msg, size, destNode, tag);
1550 double creation_time;
1552 creation_time = CmiWallTimer();
1554 creation_time = ptr->creation_time;
1557 #if CMK_SMSGS_FREE_AFTER_EVENT
1558 msgid = IndexPool_getslot(&smsgsPool, msg, smsgType);
1560 CmiAbort("IndexPool for SMSG overflows.");
1563 status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, msgid, tag);
1565 status = GNI_SmsgSendWTag(ep_hndl_array[destNode], msg, size, NULL, 0, msgid, tag);
1566 #if CMK_SMP_TRACE_COMMTHREAD
1567 if (oldpe != -1) TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1569 CMI_GNI_UNLOCK(default_tx_cq_lock)
1570 //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1571 if(status == GNI_RC_SUCCESS)
1574 SMSG_SENT_DONE(creation_time,tag)
1576 #if CMK_SMP_TRACE_COMMTHREAD
1577 if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG )
1579 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1583 status = GNI_RC_ERROR_RESOURCE;
1584 #if CMK_SMSGS_FREE_AFTER_EVENT
1585 IndexPool_freeslot(&smsgsPool, msgid);
1589 if(status != GNI_RC_SUCCESS && inbuff ==0)
1590 buffer_small_msgs(queue, msg, size, destNode, tag);
1595 static CONTROL_MSG* construct_control_msg(int size, char *msg, int seqno)
1597 /* construct a control message and send */
1598 CONTROL_MSG *control_msg_tmp;
1599 MallocControlMsg(control_msg_tmp);
1600 control_msg_tmp->source_addr = (uint64_t)msg;
1601 control_msg_tmp->seq_id = seqno;
1602 control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned
1604 control_msg_tmp->ack_index = -1;
1606 #if USE_LRTS_MEMPOOL
1609 control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1613 SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1614 control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1615 if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1618 SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1620 return control_msg_tmp;
1623 #define BLOCKING_SEND_CONTROL 0
1625 // Large message, send control to receiver, receiver register memory and do a GET,
1626 // return 1 - send no success
1627 INLINE_KEYWORD static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr, uint8_t lmsg_tag)
1629 gni_return_t status = GNI_RC_ERROR_NOMEM;
1630 uint32_t vmdh_index = -1;
1633 uint64_t source_addr;
1637 size = control_msg_tmp->total_length;
1638 source_addr = control_msg_tmp->source_addr;
1639 register_size = control_msg_tmp->length;
1641 #if USE_LRTS_MEMPOOL
1642 if( control_msg_tmp->seq_id <=0 ){
1643 #if BLOCKING_SEND_CONTROL
1644 if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1645 while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1646 LrtsAdvanceCommunication(0);
1649 if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1651 msg = (void*)source_addr;
1652 if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1655 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1656 return GNI_RC_ERROR_NOMEM;
1658 //register the corresponding mempool
1659 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1660 if(status == GNI_RC_SUCCESS)
1662 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1666 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1667 status = GNI_RC_SUCCESS;
1669 if(NoMsgInSend(source_addr))
1670 register_size = GetMempoolsize((void*)(source_addr));
1673 }else if(control_msg_tmp->seq_id >0) // BIG_MSG
1675 int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1676 source_addr += offset;
1677 size = control_msg_tmp->length;
1678 #if BLOCKING_SEND_CONTROL
1679 if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1680 while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1681 LrtsAdvanceCommunication(0);
1684 if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1685 if(buffered_send_msg + size >= MAX_BUFF_SEND)
1688 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1689 return GNI_RC_ERROR_NOMEM;
1691 status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1692 if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1696 status = GNI_RC_SUCCESS;
1701 #if CMI_EXERT_SEND_LARGE_CAP
1702 if(SEND_large_pending >= SEND_large_cap)
1704 status = GNI_RC_ERROR_NOMEM;
1708 if(status == GNI_RC_SUCCESS)
1710 status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, inbuff, smsg_ptr, NONCHARM_SMSG, 0);
1711 if(status == GNI_RC_SUCCESS)
1713 #if CMI_EXERT_SEND_LARGE_CAP
1714 SEND_large_pending++;
1716 buffered_send_msg += register_size;
1717 if(control_msg_tmp->seq_id == 0)
1719 IncreaseMsgInSend(source_addr);
1721 #if ! CMK_SMSGS_FREE_AFTER_EVENT
1722 FreeControlMsg(control_msg_tmp);
1724 MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, lmsg_tag);
1726 status = GNI_RC_ERROR_RESOURCE;
1728 } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1730 CmiAbort("Memory registor for large msg\n");
1733 status = GNI_RC_ERROR_NOMEM;
1735 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1739 MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1740 if(status == GNI_RC_SUCCESS)
1742 status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, 0, NULL, NONCHARM_SMSG, 0);
1743 #if ! CMK_SMSGS_FREE_AFTER_EVENT
1744 if(status == GNI_RC_SUCCESS)
1746 FreeControlMsg(control_msg_tmp);
1749 } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1751 CmiAbort("Memory registor for large msg\n");
1754 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1760 INLINE_KEYWORD void LrtsBeginIdle(void) {}
1762 INLINE_KEYWORD void LrtsStillIdle(void) {}
1764 INLINE_KEYWORD void LrtsNotifyIdle(void) {}
1766 INLINE_KEYWORD void LrtsPrepareEnvelope(char *msg, int size)
1768 CmiSetMsgSize(msg, size);
1769 CMI_SET_CHECKSUM(msg, size);
1772 CmiCommHandle LrtsSendFunc(int destNode, int destPE, int size, char *msg, int mode)
1774 gni_return_t status = GNI_RC_SUCCESS;
1776 CONTROL_MSG *control_msg_tmp;
1777 int oob = ( mode & OUT_OF_BAND);
1780 MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size);
1782 queue = oob? &smsg_oob_queue : &smsg_queue;
1783 tag = oob? LMSG_OOB_INIT_TAG: LMSG_INIT_TAG;
1785 queue = &smsg_queue;
1786 tag = LMSG_INIT_TAG;
1789 LrtsPrepareEnvelope(msg, size);
1792 printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1796 if(size <= SMSG_MAX_MSG)
1797 buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1798 else if (size < BIG_MSG) {
1799 control_msg_tmp = construct_control_msg(size, msg, 0);
1800 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1803 CmiSetMsgSeq(msg, 0);
1804 control_msg_tmp = construct_control_msg(size, msg, 1);
1805 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1807 #else //non-smp, smp(worker sending)
1808 if(size <= SMSG_MAX_MSG)
1810 if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode, msg, size, SMALL_DATA_TAG, 0, NULL, CHARM_SMSG, 0))
1812 #if !CMK_SMSGS_FREE_AFTER_EVENT
1817 else if (size < BIG_MSG) {
1818 control_msg_tmp = construct_control_msg(size, msg, 0);
1819 send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1822 #if USE_LRTS_MEMPOOL
1823 CmiSetMsgSeq(msg, 0);
1824 control_msg_tmp = construct_control_msg(size, msg, 1);
1825 send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1827 control_msg_tmp = construct_control_msg(size, msg, 0);
1828 send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1836 // this is no different from the common code
1837 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1840 #if CMK_BROADCAST_USE_CMIREFERENCE
1841 for(i=0;i<npes;i++) {
1842 if (pes[i] == CmiMyPe())
1843 CmiSyncSend(pes[i], len, msg);
1846 CmiSyncSendAndFree(pes[i], len, msg);
1850 for(i=0;i<npes;i++) {
1851 CmiSyncSend(pes[i], len, msg);
1856 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1858 /* A better asynchronous implementation may be wanted, but at least it works */
1859 CmiSyncListSendFn(npes, pes, len, msg);
1860 return (CmiCommHandle) 0;
1863 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1866 CmiSyncSendAndFree(pes[0], len, msg);
1869 #if CMK_PERSISTENT_COMM
1870 if (CpvAccess(phs) && len > PERSIST_MIN_SIZE
1872 && IS_PERSISTENT_MEMORY(msg)
1876 for(i=0;i<npes;i++) {
1877 if (pes[i] == CmiMyPe())
1878 CmiSyncSend(pes[i], len, msg);
1881 CmiSyncSendAndFree(pes[i], len, msg);
1889 #if CMK_BROADCAST_USE_CMIREFERENCE
1890 CmiSyncListSendFn(npes, pes, len, msg);
1894 for(i=0;i<npes-1;i++) {
1895 CmiSyncSend(pes[i], len, msg);
1898 CmiSyncSendAndFree(pes[npes-1], len, msg);
1905 static void PumpDatagramConnection(void);
1906 static int event_SetupConnect = 111;
1907 static int event_PumpSmsg = 222 ;
1908 static int event_PumpTransaction = 333;
1909 static int event_PumpRdmaTransaction = 444;
1910 static int event_SendBufferSmsg = 484;
1911 static int event_SendFmaRdmaMsg = 555;
1912 static int event_AdvanceCommunication = 666;
1914 static void registerUserTraceEvents(void) {
1915 #if CMI_MACH_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1916 event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1917 event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1918 event_PumpTransaction = traceRegisterUserEvent("Pump FMA/RDMA local transaction" , -1);
1919 event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA remote event" , -1);
1920 event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1921 event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1922 event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1926 static void ProcessDeadlock(void)
1928 static CmiUInt8 *ptr = NULL;
1929 static CmiUInt8 last = 0, mysum, sum;
1930 static int count = 0;
1931 gni_return_t status;
1934 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1935 //sweep_mempool(CpvAccess(mempool));
1936 if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1937 mysum = smsg_send_count + smsg_recv_count;
1938 MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d) (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count);
1939 status = (gni_return_t)PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1940 GNI_RC_CHECK("PMI_Allgather", status);
1942 for (i=0; i<mysize; i++) sum+= ptr[i];
1943 if (last == 0 || sum == last)
1948 MACHSTATE5(9,"Progress Deadlock (%d,%d) (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count);
1950 /* detected twice, it is a real deadlock */
1952 CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low. Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1953 CmiAbort("Fatal> Deadlock detected.");
1960 static void CheckProgress(void)
1962 if (smsg_send_count == last_smsg_send_count &&
1963 smsg_recv_count == last_smsg_recv_count )
1967 if (_detected_hang) ProcessDeadlock();
1972 //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count);
1973 last_smsg_send_count = smsg_send_count;
1974 last_smsg_recv_count = smsg_recv_count;
1979 static void set_limit(void)
1981 //if (!user_set_flag && CmiMyRank() == 0) {
1982 if (CmiMyRank() == 0) {
1983 int mynode = CmiPhysicalNodeID(CmiMyPe());
1984 int numpes = CmiNumPesOnPhysicalNode(mynode);
1985 int numprocesses = numpes / CmiMyNodeSize();
1986 MAX_REG_MEM = _totalmem / numprocesses;
1987 MAX_BUFF_SEND = MAX_REG_MEM / 2;
1989 printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1990 if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND || smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1992 printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1993 CmiAbort("memory registration\n");
1998 void LrtsPostCommonInit(int everReturn)
2003 #if CMI_MACH_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
2004 CpvInitialize(double, projTraceStart);
2005 /* only PE 0 needs to care about registration (to generate sts file). */
2006 //if (CmiMyPe() == 0)
2008 registerMachineUserEventsFunction(®isterUserTraceEvents);
2014 CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
2020 if (CmiMyRank() == 0)
2022 CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
2026 CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
2030 /* this is called by worker thread */
2031 void LrtsPostNonLocal(void)
2035 #if CMK_SMP_TRACE_COMMTHREAD
2036 double startT, endT;
2039 #if MULTI_THREAD_SEND
2040 if(mysize == 1) return;
2042 if (CmiMyRank() % 6 != 3) return;
2044 #if CMK_SMP_TRACE_COMMTHREAD
2046 startT = CmiWallTimer();
2049 CmiMachineProgressImpl();
2051 #if CMK_SMP_TRACE_COMMTHREAD
2052 endT = CmiWallTimer();
2053 traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
2061 /* Network progress function is used to poll the network when for
2062 messages. This flushes receive buffers on some implementations*/
2063 #if CMK_MACHINE_PROGRESS_DEFINED
2064 void CmiMachineProgressImpl(void) {
2065 #if ! CMK_SMP || MULTI_THREAD_SEND
2067 STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
2068 SEND_OOB_SMSG(smsg_oob_queue)
2069 PUMP_REMOTE_HIGHPRIORITY
2070 PUMP_LOCAL_HIGHPRIORITY
2071 POST_HIGHPRIORITY_RDMA
2074 #if CMK_WORKER_SINGLE_TASK
2075 if (CmiMyRank() % 6 == 0)
2079 #if CMK_WORKER_SINGLE_TASK
2080 if (CmiMyRank() % 6 == 1)
2082 PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2084 #if CMK_WORKER_SINGLE_TASK
2085 if (CmiMyRank() % 6 == 2)
2087 PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
2090 #if CMK_WORKER_SINGLE_TASK
2091 if (CmiMyRank() % 6 == 3)
2093 PumpRemoteTransactions(rdma_rx_cqh); // rdma_rx_cqh
2096 #if CMK_WORKER_SINGLE_TASK
2097 if (CmiMyRank() % 6 == 4)
2101 SendBufferMsg(&smsg_oob_queue, NULL);
2102 SendBufferMsg(&smsg_queue, &smsg_oob_queue);
2104 SendBufferMsg(&smsg_queue, NULL);
2108 #if CMK_WORKER_SINGLE_TASK
2109 if (CmiMyRank() % 6 == 5)
2112 STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
2114 STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
2123 /* useDynamicSMSG */
2124 static void PumpDatagramConnection(void)
2126 uint32_t remote_address;
2128 gni_return_t status;
2129 gni_post_state_t post_state;
2130 uint64_t datagram_id;
2133 while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2135 if (datagram_id >= mysize) { /* bound endpoint */
2136 int pe = datagram_id - mysize;
2137 CMI_GNI_LOCK(global_gni_lock)
2138 status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2139 CMI_GNI_UNLOCK(global_gni_lock)
2140 if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2142 CmiAssert(remote_id == pe);
2143 status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2144 GNI_RC_CHECK("Dynamic SMSG Init", status);
2146 printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2148 CmiAssert(smsg_connected_flag[pe] == 1);
2149 smsg_connected_flag[pe] = 2;
2152 else { /* unbound ep */
2153 status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2154 if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2156 CmiAssert(remote_id<mysize);
2157 CmiAssert(smsg_connected_flag[remote_id] <= 0);
2158 status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2159 GNI_RC_CHECK("Dynamic SMSG Init", status);
2161 printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2163 smsg_connected_flag[remote_id] = 2;
2165 alloc_smsg_attr(&send_smsg_attr);
2166 status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr, SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2167 GNI_RC_CHECK("post unbound datagram", status);
2173 /* pooling CQ to receive network message */
2174 static void PumpNetworkRdmaMsgs(void)
2176 gni_cq_entry_t event_data;
2177 gni_return_t status;
2182 static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2184 RDMA_REQUEST *rdma_request_msg;
2185 MallocRdmaRequest(rdma_request_msg);
2186 rdma_request_msg->destNode = inst_id;
2187 rdma_request_msg->pd = pd;
2189 rdma_request_msg->ack_index = ack_index;
2191 PCQueuePush(bufferqueue, (char*)rdma_request_msg);
2194 static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue);
2195 static void getPersistentMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue);
2196 static void PRINT_CONTROL(void *header)
2198 CONTROL_MSG *control_msg = (CONTROL_MSG *) header;
2200 printf(" length=%d , seq_id = %d, addr = %lld:%lld:%lld \n", control_msg->length, control_msg->seq_id, control_msg->source_addr, (control_msg->source_mem_hndl).qword1, (control_msg->source_mem_hndl).qword2 );
2201 #if PERSISTENT_GET_BASE
2202 printf(" memhdl = %lld:%lld:%lld \n", control_msg->dest_addr, (control_msg->dest_mem_hndl).qword1, (control_msg->dest_mem_hndl).qword2);
2205 static void PumpNetworkSmsg()
2208 gni_cq_entry_t event_data;
2209 gni_return_t status, deregStatus;
2214 gni_mem_handle_t msg_mem_hndl;
2215 gni_smsg_attr_t *smsg_attr;
2216 gni_smsg_attr_t *remote_smsg_attr;
2218 CONTROL_MSG *control_msg_tmp, *header_tmp;
2219 uint64_t source_addr;
2220 SMSG_QUEUE *queue = &smsg_queue;
2224 cmidirectMsg *direct_msg;
2226 #if CMI_PUMPNETWORKSMSG_CAP
2228 while(recv_cnt< PumpNetworkSmsg_cap) {
2232 CMI_GNI_LOCK(smsg_rx_cq_lock)
2233 status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2234 CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2235 if(status != GNI_RC_SUCCESS) break;
2237 inst_id = GNI_CQ_GET_REM_INST_ID(event_data);
2239 inst_id = GET_RANK(inst_id); /* important */
2241 // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2243 printf("[%d] %d PumpNetworkMsgs is received from PE: %d, status=%s\n", myrank, CmiMyRank(), inst_id, gni_err_str[status]);
2245 if (useDynamicSMSG) {
2246 /* subtle: smsg may come before connection is setup */
2247 while (smsg_connected_flag[inst_id] != 2)
2248 PumpDatagramConnection();
2250 msg_tag = GNI_SMSG_ANY_TAG;
2252 CMI_GNI_LOCK(smsg_mailbox_lock)
2253 status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2254 if (status != GNI_RC_SUCCESS)
2256 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2259 #if CMI_PUMPNETWORKSMSG_CAP
2263 printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2265 /* copy msg out and then put into queue (small message) */
2267 case SMALL_DATA_TAG:
2269 msg_nbytes = CmiGetMsgSize(header);
2270 CmiAssert(msg_nbytes > 0);
2271 msg_data = CmiAlloc(msg_nbytes);
2272 memcpy(msg_data, (char*)header, msg_nbytes);
2273 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2274 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2275 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2276 handleOneRecvedMsg(msg_nbytes, (char *)msg_data);
2279 case LMSG_PERSISTENT_INIT_TAG:
2280 { CMI_GNI_UNLOCK(smsg_mailbox_lock)
2281 getPersistentMsgRequest(header, inst_id, msg_tag, sendRdmaBuf);
2282 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2286 case LMSG_OOB_INIT_TAG:
2288 tmp_queue = (msg_tag == LMSG_INIT_TAG)? sendRdmaBuf : sendHighPriorBuf;
2289 #if MULTI_THREAD_SEND
2290 MallocControlMsg(control_msg_tmp);
2291 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2292 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2293 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2294 getLargeMsgRequest(control_msg_tmp, inst_id, msg_tag, tmp_queue);
2295 FreeControlMsg(control_msg_tmp);
2297 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2298 getLargeMsgRequest(header, inst_id, msg_tag, tmp_queue);
2299 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2303 #if !REMOTE_EVENT && !CQWRITE
2304 case ACK_TAG: //msg fit into mempool
2306 /* Get is done, release message . Now put is not used yet*/
2307 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2308 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2309 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2310 #if ! USE_LRTS_MEMPOOL
2311 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2313 DecreaseMsgInSend(msg);
2315 if(NoMsgInSend(msg))
2316 buffered_send_msg -= GetMempoolsize(msg);
2317 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag);
2319 #if CMI_EXERT_SEND_LARGE_CAP
2320 SEND_large_pending--;
2325 #if CMK_ONESIDED_IMPL
2328 CmiGNIAckOp_t *ack_data = (CmiGNIAckOp_t *)header;
2329 CmiRdmaAck * ack = ack_data->ack;
2330 gni_mem_handle_t mem_hndl = ack_data->mem_hndl;
2331 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2332 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2334 // call the fnPtr to handle the ack
2335 ack->fnPtr(ack->token);
2337 // free callback structure, CmiRdmaAck allocated in CmiSetRdmaAck
2340 // Deregister registered sender memory used for GET
2341 status = GNI_MemDeregister(nic_hndl, &mem_hndl);
2342 GNI_RC_CHECK("GNI_MemDeregister on Sender for GET operation", status);
2345 case RDMA_PUT_MD_TAG:
2347 CmiGNIRzvRdmaRecv_t *recvInfo = (CmiGNIRzvRdmaRecv_t *)header;
2348 recvInfoSize = LrtsGetRdmaRecvInfoSize(recvInfo->numOps);
2349 CmiGNIRzvRdmaRecv_t *newRecvInfo = (CmiGNIRzvRdmaRecv_t *)malloc(recvInfoSize);
2350 memcpy(newRecvInfo, recvInfo, recvInfoSize);
2351 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2352 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2354 LrtsIssueRputs((void *)newRecvInfo, recvInfo->destNode);
2357 case RDMA_PUT_MD_DIRECT_TAG:
2359 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)header;
2361 // copy into a new object
2362 NcpyOperationInfo *newNcpyOpInfo = (NcpyOperationInfo *)CmiAlloc(ncpyOpInfo->ncpyOpInfoSize);
2363 memcpy(newNcpyOpInfo, ncpyOpInfo, ncpyOpInfo->ncpyOpInfoSize);
2365 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2366 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2368 post_rdma((uint64_t)newNcpyOpInfo->destPtr,
2369 ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->destLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl,
2370 (uint64_t)newNcpyOpInfo->srcPtr,
2371 ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->srcLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl,
2372 newNcpyOpInfo->srcSize,
2373 (uint64_t)newNcpyOpInfo,
2374 CmiNodeOf(newNcpyOpInfo->destPe),
2380 case RDMA_PUT_DONE_TAG:
2382 CmiGNIRzvRdmaRecv_t *recvInfo = (CmiGNIRzvRdmaRecv_t *)header;
2384 /* copy the received message (recvInfo) into newRecvInfo as
2385 * recvInfo is invalid after SmsgRelease call
2387 recvInfoSize = LrtsGetRdmaRecvInfoSize(recvInfo->numOps);
2388 CmiGNIRzvRdmaRecv_t *newRecvInfo = (CmiGNIRzvRdmaRecv_t *)malloc(recvInfoSize);
2389 memcpy(newRecvInfo, recvInfo, recvInfoSize);
2390 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2391 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2393 char *msg = (char *)newRecvInfo->msg;
2394 int size = CmiGetMsgSize(msg);
2396 handleOneRecvedMsg(size, msg);
2399 for(i=0; i<newRecvInfo->numOps;i++){
2400 CmiGNIRzvRdmaRecvOp_t * recvOp = &newRecvInfo->rdmaOp[i];
2401 // Deregister registered receiver memory used for PUT
2402 status = GNI_MemDeregister(nic_hndl, &(recvOp->remote_mem_hndl));
2403 GNI_RC_CHECK("GNI_MemDeregister on Receiver for PUT operation", status);
2405 // free newRecvInfo as it is no longer used
2409 case RDMA_PUT_DONE_DIRECT_TAG:
2411 // Direct API when PUT is used instead of a GET
2412 // This tag implies the completion of an indirect PUT operation used for the Direct API
2413 CmiGNIRzvRdmaDirectInfo_t *putOp = (CmiGNIRzvRdmaDirectInfo_t *)header;
2414 void *token = (void *)putOp->ref;
2415 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2416 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2418 // Invoke the ack handler function
2419 CmiInvokeNcpyAck(token);
2422 case RDMA_DEREG_DIRECT_TAG:
2425 // This tag implies a request to free the memory handle local to this node
2426 gni_mem_handle_t *memhndl = (gni_mem_handle_t *)header;
2427 status = GNI_MemDeregister(nic_hndl, memhndl);
2428 GNI_RC_CHECK("GNI_MemDeregister Failed!", status);
2429 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2430 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2433 case RDMA_REG_AND_PUT_MD_DIRECT_TAG:
2435 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)header;
2437 // copy into a new object
2438 NcpyOperationInfo *newNcpyOpInfo = (NcpyOperationInfo *)CmiAlloc(ncpyOpInfo->ncpyOpInfoSize);
2439 memcpy(newNcpyOpInfo, ncpyOpInfo, ncpyOpInfo->ncpyOpInfoSize);
2441 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2442 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2444 resetNcpyOpInfoPointers(newNcpyOpInfo);
2446 // Register source buffer
2447 ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->srcLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl =
2448 registerDirectMem(newNcpyOpInfo->srcPtr,
2449 newNcpyOpInfo->srcSize,
2452 post_rdma((uint64_t)newNcpyOpInfo->destPtr,
2453 ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->destLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl,
2454 (uint64_t)newNcpyOpInfo->srcPtr,
2455 ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->srcLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl,
2456 newNcpyOpInfo->srcSize,
2457 (uint64_t)newNcpyOpInfo,
2458 CmiNodeOf(newNcpyOpInfo->destPe),
2464 case RDMA_REG_AND_GET_MD_DIRECT_TAG:
2466 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)header;
2468 // copy into a new object
2469 NcpyOperationInfo *newNcpyOpInfo = (NcpyOperationInfo *)CmiAlloc(ncpyOpInfo->ncpyOpInfoSize);
2470 memcpy(newNcpyOpInfo, ncpyOpInfo, ncpyOpInfo->ncpyOpInfoSize);
2472 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2473 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2475 resetNcpyOpInfoPointers(newNcpyOpInfo);
2477 ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->destLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl =
2478 registerDirectMem(newNcpyOpInfo->destPtr,
2479 newNcpyOpInfo->srcSize,
2482 post_rdma((uint64_t)newNcpyOpInfo->srcPtr,
2483 ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->srcLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl,
2484 (uint64_t)newNcpyOpInfo->destPtr,
2485 ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->destLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl,
2486 newNcpyOpInfo->srcSize,
2487 (uint64_t)newNcpyOpInfo,
2488 CmiNodeOf(newNcpyOpInfo->srcPe),
2495 case BIG_MSG_TAG: //big msg, de-register, transfer next seg
2497 #if MULTI_THREAD_SEND
2498 MallocControlMsg(header_tmp);
2499 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2500 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2502 header_tmp = (CONTROL_MSG *) header;
2504 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2505 #if CMI_EXERT_SEND_LARGE_CAP
2506 SEND_large_pending--;
2508 void *msg = (void*)(header_tmp->source_addr);
2509 int cur_seq = CmiGetMsgSeq(msg);
2510 int offset = ONE_SEG*(cur_seq+1);
2511 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2512 buffered_send_msg -= header_tmp->length;
2513 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2514 if (remain_size < 0) remain_size = 0;
2515 CmiSetMsgSize(msg, remain_size);
2516 if(remain_size <= 0) //transaction done
2519 }else if (header_tmp->total_length > offset)
2521 CmiSetMsgSeq(msg, cur_seq+1);
2522 control_msg_tmp = construct_control_msg(header_tmp->total_length, (char *)msg, cur_seq+1+1);
2523 control_msg_tmp->dest_addr = header_tmp->dest_addr;
2525 send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2527 if (header_tmp->seq_id == 1) {
2529 for (i=1; i<BIG_MSG_PIPELINE; i++) {
2530 int seq = cur_seq+i+2;
2531 CmiSetMsgSeq(msg, seq-1);
2532 control_msg_tmp = construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2533 control_msg_tmp->dest_addr = header_tmp->dest_addr;
2534 send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2535 if (header_tmp->total_length <= ONE_SEG*seq) break;
2539 #if MULTI_THREAD_SEND
2540 FreeControlMsg(header_tmp);
2542 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2546 #if CMK_PERSISTENT_COMM_PUT && !REMOTE_EVENT && !CQWRITE
2547 case PUT_DONE_TAG: { //persistent message
2548 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2549 int size = ((CONTROL_MSG *) header)->length;
2550 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2551 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2553 CMI_CHECK_CHECKSUM(msg, size);
2554 handleOneRecvedMsg(size, msg);
2556 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2562 case DIRECT_PUT_DONE_TAG: //cmi direct
2563 //create a trigger message
2564 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2565 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2566 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2567 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2568 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2569 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2570 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2574 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2575 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2576 printf("weird tag problem %d \n", msg_tag);
2577 CmiAbort("Unknown tag\n");
2580 printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2583 msg_tag = GNI_SMSG_ANY_TAG;
2584 } //endwhile GNI_SmsgGetNextWTag
2585 } //end while GetEvent
2586 if(status == GNI_RC_ERROR_RESOURCE)
2588 printf("charm> Please use +useRecvQueue %d in your command line, if the error comes again, increase this number\n", REMOTE_QUEUE_ENTRIES*2);
2589 GNI_RC_CHECK("Smsg_rx_cq full", status);
2593 static void printDesc(gni_post_descriptor_t *pd)
2595 printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length);
2599 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2601 gni_post_descriptor_t *pd;
2602 gni_return_t status = GNI_RC_SUCCESS;
2605 pd->type = GNI_POST_CQWRITE;
2606 pd->cq_mode = GNI_CQMODE_SILENT;
2607 //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2608 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2609 pd->cqwrite_value = data;
2610 pd->remote_mem_hndl = mem_hndl;
2611 status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2612 GNI_RC_CHECK("GNI_PostCqWrite", status);
2616 // register memory for a message
2617 // return mem handle
2618 static gni_return_t registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2620 gni_return_t status = GNI_RC_SUCCESS;
2622 if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2624 #if CMK_PERSISTENT_COMM_PUT
2625 // persistent message is always registered
2626 // BIG_MSG small pieces do not have malloc chunk header
2627 if (IS_PERSISTENT_MEMORY(msg)) {
2628 *memh = GetMemHndl(msg);
2629 return GNI_RC_SUCCESS;
2633 #if CMK_PERSISTENT_COMM_PUT
2634 || seqno == PERSIST_SEQ
2638 if(IsMemHndlZero((GetMemHndl(msg))))
2641 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2642 if(status == GNI_RC_SUCCESS)
2643 *memh = GetMemHndl(msg);
2646 *memh = GetMemHndl(msg);
2650 //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2651 status = registerMemory(msg, size, memh, NULL);
2656 static void getPersistentMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2658 #if PERSISTENT_GET_BASE
2659 CONTROL_MSG *request_msg;
2660 gni_return_t status;
2661 gni_post_descriptor_t *pd;
2662 request_msg = (CONTROL_MSG *) header;
2665 pd->cqwrite_value = request_msg->seq_id;
2666 pd->first_operand = ALIGN64(request_msg->length); // total length
2667 if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD)
2668 pd->type = GNI_POST_FMA_GET;
2670 pd->type = GNI_POST_RDMA_GET;
2671 pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT ;
2672 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2673 pd->length = ALIGN64(request_msg->length);
2674 pd->local_addr = (uint64_t) request_msg->dest_addr;
2675 pd->local_mem_hndl = request_msg->dest_mem_hndl;
2676 pd->remote_addr = (uint64_t) request_msg->source_addr;
2677 pd->remote_mem_hndl = request_msg->source_mem_hndl;
2678 pd->src_cq_hndl = 0;
2680 pd->amo_cmd = (gni_fma_cmd_type_t)0;
2682 bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index);
2684 bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1);
2690 // for BIG_MSG called on receiver side for receiving control message
2692 static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2694 #if USE_LRTS_MEMPOOL
2695 CONTROL_MSG *request_msg;
2696 gni_return_t status = GNI_RC_SUCCESS;
2698 gni_post_descriptor_t *pd;
2699 gni_mem_handle_t msg_mem_hndl;
2700 int size, transaction_size, offset = 0;
2701 size_t register_size = 0;
2703 // initial a get to transfer data from the sender side */
2704 request_msg = (CONTROL_MSG *) header;
2705 size = request_msg->total_length;
2706 MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size);
2709 pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2711 if(request_msg->seq_id < 2) {
2712 MACHSTATE2(8, "%d seq id in get large msg requrest %d\n", CmiMyRank(), request_msg->seq_id);
2713 #if CMK_SMP_TRACE_COMMTHREAD
2714 pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2716 msg_data = CmiAlloc(size);
2717 CmiSetMsgSeq(msg_data, 0);
2718 _MEMCHECK(msg_data);
2721 offset = ONE_SEG*(request_msg->seq_id-1);
2722 msg_data = (char*)request_msg->dest_addr + offset;
2725 pd->cqwrite_value = request_msg->seq_id;
2727 transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2728 SetMemHndlZero(pd->local_mem_hndl);
2729 status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2730 if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2731 if(NoMsgInRecv( (void*)(msg_data)))
2732 register_size = GetMempoolsize((void*)(msg_data));
2735 pd->first_operand = ALIGN64(size); // total length
2737 if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2738 pd->type = GNI_POST_FMA_GET;
2740 pd->type = GNI_POST_RDMA_GET;
2741 pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT;
2742 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2743 pd->length = transaction_size;
2744 pd->local_addr = (uint64_t) msg_data;
2745 pd->remote_addr = request_msg->source_addr + offset;
2746 pd->remote_mem_hndl = request_msg->source_mem_hndl;
2748 if (tag == LMSG_OOB_INIT_TAG)
2749 pd->src_cq_hndl = highprior_rdma_tx_cqh;
2752 #if MULTI_THREAD_SEND
2753 pd->src_cq_hndl = rdma_tx_cqh;
2755 pd->src_cq_hndl = 0;
2759 pd->rdma_mode = (gni_fma_cmd_type_t)0;
2760 pd->amo_cmd = (gni_fma_cmd_type_t)0;
2761 #if CMI_EXERT_RECV_RDMA_CAP
2762 if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE;
2764 //memory registration success
2765 if(status == GNI_RC_SUCCESS && tag == LMSG_OOB_INIT_TAG )
2767 CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2770 if( request_msg->seq_id == 0)
2772 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2773 int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2774 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2779 RDMA_TRY_SEND(pd->type)
2781 if(pd->type == GNI_POST_RDMA_GET)
2783 status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2787 status = GNI_PostFma(ep_hndl_array[inst_id], pd);
2789 CMI_GNI_UNLOCK(lock)
2791 if(status == GNI_RC_SUCCESS )
2793 #if CMI_EXERT_RECV_RDMA_CAP
2796 if(pd->cqwrite_value == 0)
2798 #if MACHINE_DEBUG_LOG
2799 buffered_recv_msg += register_size;
2800 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size);
2802 IncreaseMsgInRecv(msg_data);
2803 #if CMK_SMP_TRACE_COMMTHREAD
2804 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2808 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2809 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2812 }else if (status != GNI_RC_SUCCESS)
2814 SetMemHndlZero((pd->local_mem_hndl));
2816 if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM || tag != LMSG_OOB_INIT_TAG)
2819 bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index);
2821 bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1);
2823 }else if (status != GNI_RC_SUCCESS) {
2824 // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2825 GNI_RC_CHECK("GetLargeAFter posting", status);
2828 CONTROL_MSG *request_msg;
2829 gni_return_t status;
2831 gni_post_descriptor_t *pd;
2832 RDMA_REQUEST *rdma_request_msg;
2833 gni_mem_handle_t msg_mem_hndl;
2835 // initial a get to transfer data from the sender side */
2836 request_msg = (CONTROL_MSG *) header;
2837 msg_data = CmiAlloc(request_msg->length);
2838 _MEMCHECK(msg_data);
2840 MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL, status)
2842 if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
2844 GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2848 if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD)
2849 pd->type = GNI_POST_FMA_GET;
2851 pd->type = GNI_POST_RDMA_GET;
2852 pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT;// | GNI_CQMODE_REMOTE_EVENT;
2853 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2854 pd->length = ALIGN64(request_msg->length);
2855 pd->local_addr = (uint64_t) msg_data;
2856 pd->remote_addr = request_msg->source_addr;
2857 pd->remote_mem_hndl = request_msg->source_mem_hndl;
2858 if (tag == LMSG_OOB_INIT_TAG)
2859 pd->src_cq_hndl = highprior_rdma_tx_cqh;
2862 #if MULTI_THREAD_SEND
2863 pd->src_cq_hndl = rdma_tx_cqh;
2865 pd->src_cq_hndl = 0;
2871 //memory registration successful
2872 if(status == GNI_RC_SUCCESS)
2874 pd->local_mem_hndl = msg_mem_hndl;
2876 if(pd->type == GNI_POST_RDMA_GET)
2878 CMI_GNI_LOCK(rdma_tx_cq_lock)
2879 status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2880 CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2884 CMI_GNI_LOCK(default_tx_cq_lock)
2885 status = GNI_PostFma(ep_hndl_array[inst_id], pd);
2886 CMI_GNI_UNLOCK(default_tx_cq_lock)
2891 SetMemHndlZero(pd->local_mem_hndl);
2893 if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2895 MallocRdmaRequest(rdma_request_msg);
2896 rdma_request_msg->next = 0;
2897 rdma_request_msg->destNode = inst_id;
2898 rdma_request_msg->pd = pd;
2899 PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2901 GNI_RC_CHECK("AFter posting", status);
2907 static void PumpCqWriteTransactions(void)
2911 gni_return_t status;
2915 //CMI_GNI_LOCK(my_cq_lock)
2916 status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2917 //CMI_GNI_UNLOCK(my_cq_lock)
2918 if(status != GNI_RC_SUCCESS) break;
2919 msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2920 #if CMK_PERSISTENT_COMM_PUT
2922 printf(" %d CQ write event %p\n", myrank, msg);
2924 if (!IsMemHndlZero(MEMHFIELD(msg))) {
2926 printf(" %d Persistent CQ write event %p\n", myrank, msg);
2929 msg_size = CmiGetMsgSize(msg);
2930 CMI_CHECK_CHECKSUM(msg, msg_size);
2931 handleOneRecvedMsg(msg_size, msg);
2935 #if ! USE_LRTS_MEMPOOL
2936 // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2938 DecreaseMsgInSend(msg);
2940 if(NoMsgInSend(msg))
2941 buffered_send_msg -= GetMempoolsize(msg);
2944 if(status == GNI_RC_ERROR_RESOURCE)
2946 GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2952 static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh)
2955 gni_return_t status;
2957 int inst_id, index, type, size;
2959 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2963 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2964 if (pump_count > PumpRemoteTransactions_cap) break;
2966 CMI_GNI_LOCK(global_gni_lock)
2967 // CMI_GNI_LOCK(rdma_tx_cq_lock)
2968 status = GNI_CqGetEvent(rx_cqh, &ev);
2969 // CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2970 CMI_GNI_UNLOCK(global_gni_lock)
2972 if(status != GNI_RC_SUCCESS) break;
2974 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2978 inst_id = GNI_CQ_GET_REM_INST_ID(ev);
2979 index = GET_INDEX(inst_id);
2980 type = GET_TYPE(inst_id);
2983 CmiAssert(index>=0 && index<ackPool.size);
2984 CMI_GNI_LOCK(ackPool.lock);
2985 //CmiAssert(GetIndexType(ackPool, index) == 1);
2986 msg = GetIndexAddress(ackPool, index);
2987 CMI_GNI_UNLOCK(ackPool.lock);
2989 MACHSTATE4(8,"[%d] PumpRemoteTransactions: ack: %lld index: %d type: %d.\n", myrank, msg, index, type);
2991 #if ! USE_LRTS_MEMPOOL
2992 // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2994 DecreaseMsgInSend(msg);
2996 if(NoMsgInSend(msg))
2997 buffered_send_msg -= GetMempoolsize(msg);
2999 IndexPool_freeslot(&ackPool, index);
3000 #if CMI_EXERT_SEND_LARGE_CAP
3001 SEND_large_pending--;
3004 #if CMK_PERSISTENT_COMM_PUT
3005 case 1: { // PERSISTENT
3006 CmiLock(persistPool.lock);
3007 CmiAssert(GetIndexType(persistPool, index) == NONCHARM_SMSG);
3008 PersistentReceivesTable *slot = (PersistentReceivesTable *)GetIndexAddress(persistPool, index);
3009 CmiUnlock(persistPool.lock);
3010 msg = slot->destBuf[slot->addrIndex].destAddress;
3011 size = CmiGetMsgSize(msg);
3013 CMI_CHECK_CHECKSUM(msg, size);
3014 handleOneRecvedMsg(size, (char*)msg);
3019 fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
3020 CmiAbort("PumpRemoteTransactions: unknown type");
3023 if(status == GNI_RC_ERROR_RESOURCE)
3025 printf("charm> Please use +useRecvQueue %d in your command line, if the error comes again, increase this number\n", REMOTE_QUEUE_ENTRIES*2);
3026 GNI_RC_CHECK("PumpRemoteTransactions: rx_cqh full", status);
3031 /* This code overlaps with code in machine-onesided.c in PumpOneSidedRDMATransactions() */
3032 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
3035 gni_return_t status;
3036 uint64_t type, inst_id;
3037 gni_post_descriptor_t *tmp_pd;
3039 CONTROL_MSG *ack_msg_tmp;
3043 CMK_DIRECT_HEADER *cmk_direct_done_msg;
3045 SMSG_QUEUE *queue = &smsg_queue;
3046 #if CMI_PUMPLOCALTRANSACTIONS_CAP
3048 while(pump_count < PumpLocalTransactions_cap) {
3053 CMI_GNI_LOCK(my_cq_lock)
3054 status = GNI_CqGetEvent(my_tx_cqh, &ev);
3055 CMI_GNI_UNLOCK(my_cq_lock)
3056 if(status != GNI_RC_SUCCESS) break;
3058 type = GNI_CQ_GET_TYPE(ev);
3059 if (type == GNI_CQ_EVENT_TYPE_POST)
3062 #if CMI_EXERT_RECV_RDMA_CAP
3063 if(RDMA_pending <=0) CmiAbort(" pending error\n");
3066 inst_id = GNI_CQ_GET_INST_ID(ev);
3068 printf("[%d] LocalTransactions localdone=%d\n", myrank, lrts_local_done_msg);
3070 CMI_GNI_LOCK(my_cq_lock)
3071 status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
3072 CMI_GNI_UNLOCK(my_cq_lock)
3074 switch (tmp_pd->type) {
3075 #if CMK_PERSISTENT_COMM_PUT || CMK_DIRECT
3076 case GNI_POST_RDMA_PUT:
3077 #if CMK_PERSISTENT_COMM_PUT && ! USE_LRTS_MEMPOOL
3078 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
3080 case GNI_POST_FMA_PUT:
3081 if(tmp_pd->amo_cmd == 1) {
3083 //sender ACK to receiver to trigger it is done
3084 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
3085 cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
3086 msg_tag = DIRECT_PUT_DONE_TAG;
3090 CmiFree((void *)tmp_pd->local_addr);
3092 FreePostDesc(tmp_pd);
3095 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
3096 FreePostDesc(tmp_pd);
3099 MallocControlMsg(ack_msg_tmp);
3100 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
3101 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
3102 ack_msg_tmp->length = tmp_pd->length;
3103 msg_tag = PUT_DONE_TAG;
3108 case GNI_POST_RDMA_GET:
3109 case GNI_POST_FMA_GET: {
3110 MACHSTATE2(8, "PumpLocal Get done %lld=>%lld\n", tmp_pd->local_addr, tmp_pd->remote_addr);
3111 #if ! USE_LRTS_MEMPOOL
3112 MallocControlMsg(ack_msg_tmp);
3113 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
3114 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
3115 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
3119 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
3121 int seq_id = tmp_pd->cqwrite_value;
3122 if(seq_id > 0) // BIG_MSG
3124 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
3125 MallocControlMsg(ack_msg_tmp);
3126 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
3127 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
3128 ack_msg_tmp->seq_id = seq_id;
3129 ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
3130 ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
3131 ack_msg_tmp->length = tmp_pd->length;
3132 ack_msg_tmp->total_length = tmp_pd->first_operand; // total size
3133 msg_tag = BIG_MSG_TAG;
3138 CmiReference((void*)tmp_pd->local_addr);
3140 #if !REMOTE_EVENT && !CQWRITE
3141 MallocAckMsg(ack_msg);
3142 ack_msg->source_addr = tmp_pd->remote_addr;
3148 case GNI_POST_CQWRITE:
3149 FreePostDesc(tmp_pd);
3152 CmiPrintf("type=%d\n", tmp_pd->type);
3153 CmiAbort("PumpLocalTransactions: unknown type!");
3154 } /* end of switch */
3157 if (tmp_pd->amo_cmd == 1) {
3158 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL, NONCHARM_SMSG, 0);
3159 #if ! CMK_SMSGS_FREE_AFTER_EVENT
3160 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg);
3165 if (msg_tag == ACK_TAG) {
3168 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL, NONCHARM_SMSG, 0);
3169 #if !CMK_SMSGS_FREE_AFTER_EVENT
3170 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
3173 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
3178 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL, NONCHARM_SMSG, 0);
3179 #if !CMK_SMSGS_FREE_AFTER_EVENT
3180 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
3183 #if CMK_PERSISTENT_COMM_PUT
3184 if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
3187 if( msg_tag == ACK_TAG){ //msg fit in mempool
3189 printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
3191 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (char*)tmp_pd->local_addr);
3192 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (char*)tmp_pd->local_addr);
3194 //CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
3195 DecreaseMsgInRecv((void*)tmp_pd->local_addr);
3196 #if MACHINE_DEBUG_LOG
3197 if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
3198 buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
3199 MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag);
3201 CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, CmiGetMsgSize(tmp_pd->local_addr));
3202 handleOneRecvedMsg(CmiGetMsgSize(tmp_pd->local_addr), (char *)tmp_pd->local_addr);
3203 }else if(msg_tag == BIG_MSG_TAG){
3204 void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
3205 CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
3206 if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
3209 printf("Pipeline msg done [%d]\n", myrank);
3211 #if CMK_SMP_TRACE_COMMTHREAD
3212 if( tmp_pd->cqwrite_value == 1)
3213 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (char*)tmp_pd->local_addr);
3215 CMI_CHECK_CHECKSUM(msg, CmiGetMsgSize(msg));
3216 handleOneRecvedMsg(CmiGetMsgSize(msg), (char *)msg);
3220 FreePostDesc(tmp_pd);
3222 #if CMK_SMSGS_FREE_AFTER_EVENT
3223 else if (type == GNI_CQ_EVENT_TYPE_SMSG) {
3224 // a SmsgsSend is done
3225 int msgid = GNI_CQ_GET_MSG_ID(ev);
3226 int type = GetIndexType(smsgsPool, msgid);
3227 void *addr = GetIndexAddress(smsgsPool, msgid);
3235 case NONCHARM_SMSG_DONT_FREE:
3238 CmiAbort("Invalid SmsgsIndex");
3240 IndexPool_freeslot(&smsgsPool, msgid);
3244 if(status == GNI_RC_ERROR_RESOURCE)
3246 printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");
3247 GNI_RC_CHECK("Smsg_tx_cq full", status);
3251 static void SendRdmaMsg( BufferList sendqueue)
3253 gni_return_t status = GNI_RC_SUCCESS;
3254 gni_mem_handle_t msg_mem_hndl;
3255 RDMA_REQUEST *ptr = 0, *tmp_ptr;
3256 RDMA_REQUEST *pre = 0;
3257 uint64_t register_size = 0;
3261 int len = PCQueueLength(sendqueue);
3262 for (i=0; i<len; i++)
3264 #if CMI_EXERT_RECV_RDMA_CAP
3265 if( RDMA_pending >= RDMA_cap) break;
3267 CMI_PCQUEUEPOP_LOCK( sendqueue)
3268 ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
3269 CMI_PCQUEUEPOP_UNLOCK( sendqueue)
3270 if (ptr == NULL) break;
3272 gni_post_descriptor_t *pd = ptr->pd;
3274 msg = (void*)(pd->local_addr);
3275 status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
3277 if(pd->cqwrite_value == 0) {
3278 if(NoMsgInRecv(msg))
3279 register_size = GetMempoolsize(msg);
3282 if(status == GNI_RC_SUCCESS) //mem register good
3284 int destNode = ptr->destNode;
3285 CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
3288 if( pd->cqwrite_value == 0 || pd->cqwrite_value == -1) {
3289 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3290 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
3291 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3293 #if CMK_PERSISTENT_COMM_PUT
3294 else if (pd->cqwrite_value == PERSIST_SEQ) {
3295 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3296 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
3297 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3301 else if (pd->cqwrite_value == DIRECT_SEQ) {
3302 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3303 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, DIRECT_EVENT(ptr->ack_index));
3304 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3310 RDMA_TRY_SEND(pd->type)
3312 #if CMK_SMP_TRACE_COMMTHREAD
3313 if(IS_PUT(pd->type))
3316 TRACE_COMM_CREATION(EVENT_TIME(), (char*)pd->local_addr);//based on assumption, post always succeeds on first try
3319 if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT)
3321 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
3325 status = GNI_PostFma(ep_hndl_array[destNode], pd);
3327 CMI_GNI_UNLOCK(lock);
3329 if(status == GNI_RC_SUCCESS) //post good
3331 MACHSTATE4(8, "post noempty-rdma %d (%lld==%lld,%d) \n", ptr->destNode, pd->local_addr, pd->remote_addr, register_memory_size);
3332 #if CMI_EXERT_RECV_RDMA_CAP
3335 if(pd->cqwrite_value <= 0)
3337 #if CMK_SMP_TRACE_COMMTHREAD
3338 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3340 IncreaseMsgInRecv(((void*)(pd->local_addr)));
3343 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3344 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3346 #if MACHINE_DEBUG_LOG
3347 buffered_recv_msg += register_size;
3348 MACHSTATE(8, "GO request from buffered\n");
3351 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
3353 FreeRdmaRequest(ptr);
3354 }else // cannot post
3356 PCQueuePush(sendRdmaBuf, (char*)ptr);
3358 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3362 } else //memory registration fails
3364 PCQueuePush(sendqueue, (char*)ptr);
3370 INLINE_KEYWORD gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
3372 CONTROL_MSG *control_msg_tmp;
3373 gni_return_t status = GNI_RC_ERROR_RESOURCE;
3374 int numRdmaOps, recvInfoSize, msgSize;
3375 NcpyOperationInfo *ncpyOpInfo;
3377 MACHSTATE5(8, "noempty-smsg %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag);
3378 if (useDynamicSMSG && smsg_connected_flag[ptr->destNode] != 2) {
3379 /* connection not exists yet */
3381 /* non-smp case, connect is issued in send_smsg_message */
3382 if (smsg_connected_flag[ptr->destNode] == 0)
3383 connect_to(ptr->destNode);
3389 case SMALL_DATA_TAG:
3390 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, CHARM_SMSG, 0);
3391 #if !CMK_SMSGS_FREE_AFTER_EVENT
3392 if(status == GNI_RC_SUCCESS)
3398 case LMSG_PERSISTENT_INIT_TAG:
3400 case LMSG_OOB_INIT_TAG:
3401 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3402 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1, ptr, ptr->tag);
3404 #if !REMOTE_EVENT && !CQWRITE
3406 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, NONCHARM_SMSG, 0);
3407 #if !CMK_SMSGS_FREE_AFTER_EVENT
3408 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3413 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, NONCHARM_SMSG, 0);
3414 #if !CMK_SMSGS_FREE_AFTER_EVENT
3415 if(status == GNI_RC_SUCCESS)
3417 FreeControlMsg((CONTROL_MSG*)ptr->msg);
3421 #if CMK_PERSISTENT_COMM_PUT && !REMOTE_EVENT && !CQWRITE
3423 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, NONCHARM_SMSG, 0);
3424 #if !CMK_SMSGS_FREE_AFTER_EVENT
3425 if(status == GNI_RC_SUCCESS)
3427 FreeControlMsg((CONTROL_MSG*)ptr->msg);
3433 case DIRECT_PUT_DONE_TAG:
3434 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr, NONCHARM_SMSG, 0);
3435 #if !CMK_SMSGS_FREE_AFTER_EVENT
3436 if(status == GNI_RC_SUCCESS)
3438 free((CMK_DIRECT_HEADER*)ptr->msg);
3444 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CmiGNIAckOp_t), ptr->tag, 1, ptr, NONCHARM_SMSG, 1);
3445 #if !CMK_SMSGS_FREE_AFTER_EVENT
3446 if(status == GNI_RC_SUCCESS) {
3452 case RDMA_PUT_MD_TAG:
3453 numRdmaOps = ((CmiGNIRzvRdmaRecv_t *)(ptr->msg))->numOps;
3454 recvInfoSize = LrtsGetRdmaRecvInfoSize(numRdmaOps);
3455 status = send_smsg_message(queue, ptr->destNode, ptr->msg, recvInfoSize, ptr->tag, 1, ptr, NONCHARM_SMSG_DONT_FREE, 0);
3458 case RDMA_PUT_DONE_TAG:
3459 numRdmaOps = ((CmiGNIRzvRdmaRecv_t *)(ptr->msg))->numOps;
3460 recvInfoSize = LrtsGetRdmaRecvInfoSize(numRdmaOps);
3461 status = send_smsg_message(queue, ptr->destNode, ptr->msg, recvInfoSize, ptr->tag, 1, ptr, NONCHARM_SMSG, 1);
3462 #if !CMK_SMSGS_FREE_AFTER_EVENT
3463 if(status == GNI_RC_SUCCESS) {
3469 case RDMA_PUT_MD_DIRECT_TAG:
3470 ncpyOpInfo = (NcpyOperationInfo *)(ptr->msg);
3471 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ncpyOpInfo->ncpyOpInfoSize, ptr->tag, 1, ptr, CHARM_SMSG, 1);
3472 #if !CMK_SMSGS_FREE_AFTER_EVENT
3473 if(status == GNI_RC_SUCCESS) {
3479 case RDMA_REG_AND_GET_MD_DIRECT_TAG:
3480 case RDMA_REG_AND_PUT_MD_DIRECT_TAG:
3481 //msgSize = sizeof(CmiGNIRzvRdmaReverseOp_t) + 2*(((CmiGNIRzvRdmaReverseOp_t *)(ptr->msg))->ackSize);
3482 ncpyOpInfo = (NcpyOperationInfo *)(ptr->msg);
3483 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ncpyOpInfo->ncpyOpInfoSize, ptr->tag, 1, ptr, CHARM_SMSG, 1);
3484 #if !CMK_SMSGS_FREE_AFTER_EVENT
3485 if(status == GNI_RC_SUCCESS) {
3492 printf("Weird tag: %d\n", ptr->tag);
3493 CmiAbort("should not happen\n");
3498 // return 1 if all messages are sent
3502 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3504 MSG_LIST *ptr, *tmp_ptr, *pre=0, *current_head;
3505 CONTROL_MSG *control_msg_tmp;
3506 gni_return_t status;
3508 uint64_t register_size;
3509 void *register_addr;
3510 int index_previous = -1;
3511 #if CMI_SENDBUFFERSMSG_CAP
3512 int sent_length = 0;
3515 memset(destpe_avail, 0, mysize * sizeof(char));
3516 for (index=0; index<1; index++)
3519 CmiPrintf("[%d] Called SendBufferMsg\n", CmiMyPe());
3522 #if CMK_LOCKLESS_QUEUE
3523 len = MPMCQueueLength(queue->sendMsgBuf);
3525 len = PCQueueLength(queue->sendMsgBuf);
3527 for (i=0; i<len; i++)
3529 #if CMK_LOCKLESS_QUEUE
3530 ptr = (MSG_LIST*)MPMCQueuePop(queue->sendMsgBuf);
3532 CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3533 ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3534 CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3536 if(ptr == NULL) break;
3537 if (destpe_avail[ptr->destNode] == 1) { /* can't send to this pe */
3538 #if CMK_LOCKLESS_QUEUE
3539 MPMCQueuePush(queue->sendMsgBuf, (char*)ptr);
3541 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3546 if (ptr->tag == RDMA_COMM_PERFORM_GET_TAG) {
3547 // Comm thread performing GET on behalf of worker thread for Direct API
3548 _performOneRgetForWorkerThread(ptr);
3552 else if (ptr->tag == RDMA_COMM_PERFORM_PUT_TAG) {
3553 // Comm thread performing PUT on behalf of worker thread for Direct API
3554 _performOneRputForWorkerThread(ptr);
3559 status = _sendOneBufferedSmsg(queue, ptr);
3560 #if CMI_SENDBUFFERSMSG_CAP
3563 if(status == GNI_RC_SUCCESS)
3566 buffered_smsg_counter--;
3567 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3571 #if CMK_LOCKLESS_QUEUE
3572 MPMCQueuePush(queue->sendMsgBuf, (char*)ptr);
3574 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3577 if(status == GNI_RC_ERROR_RESOURCE)
3579 destpe_avail[ptr->destNode] = 1;
3583 } // end pooling for all cores
3587 #else /* ! ONE_SEND_QUEUE */
3589 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3592 gni_return_t status;
3594 #if CMI_SENDBUFFERSMSG_CAP
3595 int sent_length = 0;
3600 int nonempty = PCQueueLength(queue->nonEmptyQueues);
3601 for(idx =0; idx<nonempty; idx++)
3603 index++; if (index >= nonempty) index = 0;
3604 #if CMI_SENDBUFFERSMSG_CAP
3605 if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3607 CMI_PCQUEUEPOP_LOCK(queue->nonEmptyQueues)
3608 MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(queue->nonEmptyQueues);
3609 CMI_PCQUEUEPOP_UNLOCK(queue->nonEmptyQueues)
3610 if(current_list == NULL) break;
3611 if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[current_list->destpe].sendSmsgBuf) != 0) {
3612 PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3615 PCQueue current_queue= current_list->sendSmsgBuf;
3616 CmiLock(current_list->lock);
3617 int i, len = PCQueueLength(current_queue);
3618 current_list->pushed = 0;
3619 CmiUnlock(current_list->lock);
3620 #else /* ! SMP_LOCKS */
3621 static int index = -1;
3622 for(idx =0; idx<mysize; idx++)
3624 index++; if (index == mysize) index = 0;
3625 #if CMI_SENDBUFFERSMSG_CAP
3626 if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3628 if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[index].sendSmsgBuf) != 0) continue; // check urgent queue
3629 //if (index == myrank) continue;
3630 PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3631 int i, len = PCQueueLength(current_queue);
3633 for (i=0; i<len; i++) {
3634 CMI_PCQUEUEPOP_LOCK(current_queue)
3635 ptr = (MSG_LIST*)PCQueuePop(current_queue);
3636 CMI_PCQUEUEPOP_UNLOCK(current_queue)
3637 if (ptr == 0) break;
3639 if (ptr->tag == RDMA_COMM_PERFORM_GET_TAG) {
3640 // Comm thread performing GET on behalf of worker thread for Direct API
3641 _performOneRgetForWorkerThread(ptr);
3645 else if (ptr->tag == RDMA_COMM_PERFORM_PUT_TAG) {
3646 // Comm thread performing PUT on behalf of worker thread for Direct API
3647 _performOneRputForWorkerThread(ptr);
3652 status = _sendOneBufferedSmsg(queue, ptr);
3653 #if CMI_SENDBUFFERSMSG_CAP
3656 if(status == GNI_RC_SUCCESS)
3659 buffered_smsg_counter--;
3660 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3664 PCQueuePush(current_queue, (char*)ptr);
3666 if(status == GNI_RC_ERROR_RESOURCE)
3673 CmiLock(current_list->lock);
3674 if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3676 current_list->pushed = 1;
3677 PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3679 CmiUnlock(current_list->lock);
3681 } // end pooling for all cores
3687 static void ProcessDeadlock(void);
3688 void LrtsAdvanceCommunication(int whileidle)
3690 static int count = 0;
3691 /* Receive Msg first */
3692 #if CMK_SMP_TRACE_COMMTHREAD
3693 double startT, endT;
3695 if (useDynamicSMSG && whileidle)
3697 #if CMK_SMP_TRACE_COMMTHREAD
3698 startT = CmiWallTimer();
3700 STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3701 #if CMK_SMP_TRACE_COMMTHREAD
3702 endT = CmiWallTimer();
3703 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3707 SEND_OOB_SMSG(smsg_oob_queue)
3708 PUMP_REMOTE_HIGHPRIORITY
3709 PUMP_LOCAL_HIGHPRIORITY
3710 POST_HIGHPRIORITY_RDMA
3711 // Receiving small messages and persistent
3712 #if CMK_SMP_TRACE_COMMTHREAD
3713 startT = CmiWallTimer();
3715 STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3716 #if CMK_SMP_TRACE_COMMTHREAD
3717 endT = CmiWallTimer();
3718 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3721 SEND_OOB_SMSG(smsg_oob_queue)
3722 PUMP_REMOTE_HIGHPRIORITY
3723 PUMP_LOCAL_HIGHPRIORITY
3724 POST_HIGHPRIORITY_RDMA
3726 ///* Send buffered Message */
3727 #if CMK_SMP_TRACE_COMMTHREAD
3728 startT = CmiWallTimer();
3731 STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
3733 STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
3735 #if CMK_SMP_TRACE_COMMTHREAD
3736 endT = CmiWallTimer();
3737 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3740 SEND_OOB_SMSG(smsg_oob_queue)
3741 PUMP_REMOTE_HIGHPRIORITY
3742 PUMP_LOCAL_HIGHPRIORITY
3743 POST_HIGHPRIORITY_RDMA
3745 //Pump Get messages or PUT messages
3746 #if CMK_SMP_TRACE_COMMTHREAD
3747 startT = CmiWallTimer();
3749 PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3750 #if MULTI_THREAD_SEND
3751 STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock));
3753 #if CMK_SMP_TRACE_COMMTHREAD
3754 endT = CmiWallTimer();
3755 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3758 SEND_OOB_SMSG(smsg_oob_queue)
3759 PUMP_REMOTE_HIGHPRIORITY
3760 PUMP_LOCAL_HIGHPRIORITY
3761 POST_HIGHPRIORITY_RDMA
3763 #if CMK_SMP_TRACE_COMMTHREAD
3764 startT = CmiWallTimer();
3767 PumpCqWriteTransactions();
3769 #if CMK_ONESIDED_IMPL
3770 PumpOneSidedRDMATransactions(rdma_onesided_cqh, rdma_onesided_cq_lock);
3773 STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
3775 #if CMK_SMP_TRACE_COMMTHREAD
3776 endT = CmiWallTimer();
3777 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3780 SEND_OOB_SMSG(smsg_oob_queue)
3781 PUMP_REMOTE_HIGHPRIORITY
3782 PUMP_LOCAL_HIGHPRIORITY
3783 POST_HIGHPRIORITY_RDMA
3785 #if CMK_SMP_TRACE_COMMTHREAD
3786 startT = CmiWallTimer();
3788 STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
3789 #if CMK_SMP_TRACE_COMMTHREAD
3790 endT = CmiWallTimer();
3791 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3794 #if CMK_SMP && ! LARGEPAGE
3795 if (_detected_hang) ProcessDeadlock();
3799 static void set_smsg_max(void)
3805 SMSG_MAX_MSG = 1024;
3806 }else if (mysize <= 4096)
3808 SMSG_MAX_MSG = 1024;
3809 }else if (mysize <= 16384)
3816 env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3817 if (env) SMSG_MAX_MSG = atoi(env);
3818 CmiAssert(SMSG_MAX_MSG > 0);
3821 /* useDynamicSMSG */
3822 static void _init_dynamic_smsg(void)
3824 gni_return_t status;
3825 uint32_t vmdh_index = -1;
3828 smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3829 smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3830 smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3831 for(i=0; i<mysize; i++) {
3832 smsg_connected_flag[i] = 0;
3833 smsg_attr_vector_local[i] = NULL;
3834 smsg_attr_vector_remote[i] = NULL;
3839 send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3840 send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3841 send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3842 status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3843 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3845 mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3846 mailbox_list->size = smsg_memlen*avg_smsg_connection;
3847 posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3848 memset(mailbox_list->mailbox_base, 0, mailbox_list->size);
3849 mailbox_list->offset = 0;
3850 mailbox_list->next = 0;
3852 status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3853 mailbox_list->size, smsg_rx_cqh,
3856 &(mailbox_list->mem_hndl));
3857 GNI_RC_CHECK("MEMORY registration for smsg", status);
3859 status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3860 GNI_RC_CHECK("Unbound EP", status);
3862 alloc_smsg_attr(&send_smsg_attr);
3864 status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr, SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3865 GNI_RC_CHECK("post unbound datagram", status);
3867 /* always pre-connect to proc 0 */
3868 //if (myrank != 0) connect_to(0);
3870 status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3871 GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3874 static void _init_static_smsg(void)
3876 gni_smsg_attr_t *smsg_attr;
3877 gni_smsg_attr_t remote_smsg_attr;
3878 gni_smsg_attr_t *smsg_attr_vec;
3879 gni_mem_handle_t my_smsg_mdh_mailbox;
3881 gni_return_t status;
3882 uint32_t vmdh_index = -1;
3883 mdh_addr_t base_infor;
3884 mdh_addr_t *base_addr_vec;
3888 smsg_attr = (gni_smsg_attr_t *)malloc(mysize * sizeof(gni_smsg_attr_t));
3890 smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3891 smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3892 smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3893 status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3894 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3895 ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3896 CmiAssert(ret == 0);
3897 memset(smsg_mailbox_base, 0, smsg_memlen*(mysize));
3899 status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3900 smsg_memlen*(mysize), smsg_rx_cqh,
3903 &my_smsg_mdh_mailbox);
3904 register_memory_size += smsg_memlen*(mysize);
3905 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3907 if (myrank == 0 && !quietMode) printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3908 if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX to a larger value or use Dynamic smsg\n");
3910 base_infor.addr = (uint64_t)smsg_mailbox_base;
3911 base_infor.mdh = my_smsg_mdh_mailbox;
3912 base_addr_vec = (mdh_addr_t *)malloc(mysize * sizeof(mdh_addr_t));
3914 allgather(&base_infor, base_addr_vec, sizeof(mdh_addr_t));
3916 for(i=0; i<mysize; i++)
3920 smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3921 smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3922 smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3923 smsg_attr[i].mbox_offset = i*smsg_memlen;
3924 smsg_attr[i].buff_size = smsg_memlen;
3925 smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3926 smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3929 for(i=0; i<mysize; i++)
3931 if (myrank == i) continue;
3933 remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3934 remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3935 remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3936 remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3937 remote_smsg_attr.buff_size = smsg_memlen;
3938 remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3939 remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3941 /* initialize the smsg channel */
3942 status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3943 GNI_RC_CHECK("SMSG Init", status);
3944 } //end initialization
3946 free(base_addr_vec);
3949 status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3950 GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3954 static void _init_send_queue(SMSG_QUEUE *queue)
3958 #if CMK_LOCKLESS_QUEUE
3959 queue->sendMsgBuf = MPMCQueueCreate();
3961 queue->sendMsgBuf = PCQueueCreate();
3963 destpe_avail = (char*)malloc(mysize * sizeof(char));
3965 queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3967 queue->nonEmptyQueues = PCQueueCreate();
3969 for(i =0; i<mysize; i++)
3971 queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3973 queue->smsg_msglist_index[i].pushed = 0;
3974 queue->smsg_msglist_index[i].lock = CmiCreateLock();
3975 queue->smsg_msglist_index[i].destpe = i;
3982 static void _init_smsg(void)
3986 _init_dynamic_smsg();
3988 _init_static_smsg();
3991 _init_send_queue(&smsg_queue);
3993 _init_send_queue(&smsg_oob_queue);
3997 static void _init_static_msgq(void)
3999 gni_return_t status;
4000 /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The performance scales by the node count rather than rank count */
4001 msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
4002 msgq_attrs.smsg_q_sz = 1;
4003 msgq_attrs.rcv_pool_sz = 1;
4004 msgq_attrs.num_msgq_eps = 2;
4005 msgq_attrs.nloc_insts = 8;
4006 msgq_attrs.modes = 0;
4007 msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
4009 status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
4010 GNI_RC_CHECK("MSGQ Init", status);
4016 static CmiUInt8 total_mempool_size = 0;
4017 static CmiUInt8 total_mempool_calls = 0;
4019 #if USE_LRTS_MEMPOOL
4022 static void *_alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag, gni_cq_handle_t cqh)
4026 gni_return_t status = GNI_RC_SUCCESS;
4028 size_t default_size = expand_flag? _expand_mem : _mempool_size;
4029 if (*size < default_size) *size = default_size;
4031 // round up to be multiple of _tlbpagesize
4032 //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
4033 *size = ALIGNHUGEPAGE(*size);
4035 total_mempool_size += *size;
4036 total_mempool_calls += 1;
4038 if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag)
4040 printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
4041 CmiAbort("alloc_mempool_block");
4045 pool = my_get_huge_pages(*size);
4048 ret = posix_memalign(&pool, ALIGNBUF, *size);
4051 printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
4053 CmiAbort("alloc_mempool_block: out of memory.");
4055 CmiAbort("alloc_mempool_block: posix_memalign failed");
4060 MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, cqh, status);
4062 if(status != GNI_RC_SUCCESS) {
4063 printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
4064 sweep_mempool(CpvAccess(mempool));
4066 GNI_RC_CHECK("MEMORY_REGISTER", status);
4068 SetMemHndlZero((*mem_hndl));
4074 static void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
4076 return _alloc_mempool_block(size, mem_hndl, expand_flag, rdma_rx_cqh);
4079 #if CMK_PERSISTENT_COMM_PUT
4081 static void *alloc_persistent_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
4083 return _alloc_mempool_block(size, mem_hndl, expand_flag, highpriority_rx_cqh);
4087 // ptr is a block head pointer
4088 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
4090 block_header *bh = (block_header *)ptr;
4091 if(!(IsMemHndlZero(mem_hndl)))
4093 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(bh));
4096 my_free_huge_pages(ptr, GetSizeFromBlockHeader(bh));
4103 void LrtsPreCommonInit(int everReturn){
4104 #if USE_LRTS_MEMPOOL
4105 CpvInitialize(mempool_type*, mempool);
4106 CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
4107 #if CMK_PERSISTENT_COMM_PUT
4108 CpvInitialize(mempool_type*, persistent_mempool);
4109 CpvAccess(persistent_mempool) = mempool_init(_mempool_size, alloc_persistent_mempool_block, free_mempool_block, _mempool_size_limit);
4111 MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ;
4115 CMI_EXTERNC_VARIABLE int quietMode;
4117 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
4122 unsigned int remote_addr;
4123 gni_cdm_handle_t cdm_hndl;
4124 gni_return_t status = GNI_RC_SUCCESS;
4125 uint32_t vmdh_index = -1;
4127 unsigned int local_addr, *MPID_UGNI_AllAddr;
4132 //void (*local_event_handler)(gni_cq_entry_t *, void *) = &LocalEventHandle;
4133 //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
4134 //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *) = &RemoteBteEventHandle;
4136 if(!CharmLibInterOperate || userDrivenMode) {
4137 status = (gni_return_t)PMI_Init(&first_spawned);
4138 GNI_RC_CHECK("PMI_Init", status);
4141 status = (gni_return_t)PMI_Get_size(&mysize);
4142 GNI_RC_CHECK("PMI_Getsize", status);
4144 status = (gni_return_t)PMI_Get_rank(&myrank);
4145 GNI_RC_CHECK("PMI_getrank", status);
4147 //physicalID = CmiPhysicalNodeID(myrank);
4149 //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
4154 #if MULTI_THREAD_SEND
4155 /* Currently, we only consider the case that comm. thread will only recv msgs */
4156 Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
4159 #if CMI_EXERT_SEND_LARGE_CAP
4160 CmiGetArgInt(*argv,"+useSendLargeCap", &SEND_large_cap);
4163 #if CMI_EXERT_RECV_RDMA_CAP
4164 CmiGetArgInt(*argv,"+useRecvRdmaCap", &RDMA_cap);
4167 #if CMI_SENDBUFFERSMSG_CAP
4168 CmiGetArgInt(*argv,"+useSendBufferCap", &SendBufferMsg_cap);
4171 #if CMI_PUMPNETWORKSMSG_CAP
4172 CmiGetArgInt(*argv,"+usePumpSmsgCap", &PumpNetworkSmsg_cap);
4175 CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
4177 env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
4178 if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
4179 CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
4181 env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
4182 if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
4183 CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
4185 env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
4186 if (env) useDynamicSMSG = 1;
4187 if (!useDynamicSMSG)
4188 useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
4189 CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
4190 if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
4191 //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
4193 if ((myrank == 0) && (!quietMode))
4195 printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
4196 printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
4199 onesided_init(NULL, &onesided_hnd);
4201 // this is a GNI test, so use the libonesided bypass functionality
4202 onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
4203 local_addr = gniGetNicAddress();
4206 cookie = get_cookie();
4208 modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
4210 //Create and attach to the communication domain */
4211 status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
4212 GNI_RC_CHECK("GNI_CdmCreate", status);
4213 //* device id The device id is the minor number for the device
4214 //that is assigned to the device by the system when the device is created.
4215 //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
4216 //where X is the device number 0 default
4217 // GNI_CdmAttach adds about 1GB memory usage
4218 status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
4219 GNI_RC_CHECK("GNI_CdmAttach", status);
4220 local_addr = get_gni_nic_address(0);
4222 MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
4223 _MEMCHECK(MPID_UGNI_AllAddr);
4224 allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
4225 /* create the local completion queue */
4226 /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
4227 status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
4228 GNI_RC_CHECK("GNI_CqCreate (tx)", status);
4229 #if MULTI_THREAD_SEND
4230 status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
4231 GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
4235 status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &highprior_rdma_tx_cqh);
4236 GNI_RC_CHECK("GNI_CqCreate high priority RDMA (tx)", status);
4238 /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
4240 status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
4241 GNI_RC_CHECK("Create CQ (rx)", status);
4243 status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
4244 GNI_RC_CHECK("Create Post CQ (rx)", status);
4246 #if CMK_PERSISTENT_COMM_PUT
4247 status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &highpriority_rx_cqh);
4248 GNI_RC_CHECK("Create Post CQ (rx)", status);
4251 #if CMK_ONESIDED_IMPL
4254 //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
4255 //GNI_RC_CHECK("Create BTE CQ", status);
4257 /* create the endpoints. they need to be bound to allow later CQWrites to them */
4258 ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
4259 _MEMCHECK(ep_hndl_array);
4260 #if MULTI_THREAD_SEND
4261 rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
4262 //default_tx_cq_lock = CmiCreateLock();
4263 rdma_tx_cq_lock = CmiCreateLock();
4264 smsg_rx_cq_lock = CmiCreateLock();
4265 //global_gni_lock = CmiCreateLock();
4266 //rx_cq_lock = CmiCreateLock();
4268 for (i=0; i<mysize; i++) {
4269 if(i == myrank) continue;
4270 status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
4271 GNI_RC_CHECK("GNI_EpCreate ", status);
4272 remote_addr = MPID_UGNI_AllAddr[i];
4273 status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
4274 GNI_RC_CHECK("GNI_EpBind ", status);
4277 /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
4281 #if USE_LRTS_MEMPOOL
4282 env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
4284 _totalmem = CmiReadSize(env);
4285 if ((myrank == 0) && (!quietMode))
4286 printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
4289 env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
4290 if (env) _mempool_size = CmiReadSize(env);
4291 if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size"))
4292 _mempool_size = CmiReadSize(env);
4295 env = getenv("CHARM_UGNI_MEMPOOL_MAX");
4297 MAX_REG_MEM = CmiReadSize(env);
4300 if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size")) {
4301 MAX_REG_MEM = CmiReadSize(env);
4305 env = getenv("CHARM_UGNI_SEND_MAX");
4307 MAX_BUFF_SEND = CmiReadSize(env);
4310 if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send")) {
4311 MAX_BUFF_SEND = CmiReadSize(env);
4315 env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
4317 _mempool_size_limit = CmiReadSize(env);
4320 if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
4321 if (MAX_BUFF_SEND > MAX_REG_MEM) MAX_BUFF_SEND = MAX_REG_MEM;
4324 if (!quietMode) printf("Charm++> memory pool init block size: %1.fMB, total memory pool limit %1.fMB (0 means no limit)\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
4325 if (!quietMode) printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
4326 if (MAX_REG_MEM < BIG_MSG * 2 + oneMB) {
4327 /* memblock can expand to BIG_MSG * 2 size */
4328 if (!quietMode) printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n", BIG_MSG * 2.0/1024/1024 + 1);
4329 CmiAbort("mempool maximum size is too small. \n");
4331 #if MULTI_THREAD_SEND
4332 if (!quietMode) printf("Charm++> worker thread sending messages\n");
4333 #elif COMM_THREAD_SEND
4334 if (!quietMode) printf("Charm++> only comm thread send/recv messages\n");
4338 #endif /* end of USE_LRTS_MEMPOOL */
4340 env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
4342 BIG_MSG = CmiReadSize(env);
4343 if (BIG_MSG < ONE_SEG)
4344 CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
4346 env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
4348 BIG_MSG_PIPELINE = atoi(env);
4351 env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
4352 if (env) _checkProgress = 0;
4353 if (mysize == 1) _checkProgress = 0;
4355 #if CMI_EXERT_RECV_RDMA_CAP
4356 env = getenv("CHARM_UGNI_RDMA_MAX");
4358 RDMA_pending = atoi(env);
4359 if ((myrank == 0) && (!quietMode))
4360 printf("Charm++> Max pending RDMA set to: %d\n", RDMA_pending);
4365 env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
4367 _tlbpagesize = CmiReadSize(env);
4369 /* real gethugepagesize() is only available when hugetlb module linked */
4371 _tlbpagesize = gethugepagesize();
4373 _tlbpagesize = getpagesize();
4375 if ((myrank == 0) && (!quietMode)) {
4376 printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
4380 if (_tlbpagesize == 4096) {
4381 CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
4385 /* stats related arguments */
4387 CmiGetArgStringDesc(*argv,"+gni_stats_root",&counters_dirname,"counter directory name, default counters");
4389 print_stats = CmiGetArgFlag(*argv, "+print_stats");
4391 stats_off = CmiGetArgFlag(*argv, "+stats_off");
4397 if (CmiGetArgFlag(*argv,"+checksum")) {
4398 #if CMK_ERROR_CHECKING
4400 if (myrank == 0) CmiPrintf("Charm++> CheckSum checking enabled! \n");
4402 if (myrank == 0) CmiPrintf("Charm++> +checksum ignored in optimized version! \n");
4406 /* init DMA buffer for medium message */
4408 //_init_DMA_buffer();
4410 free(MPID_UGNI_AllAddr);
4412 sendRdmaBuf = PCQueueCreate();
4413 sendHighPriorBuf = PCQueueCreate();
4416 // ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
4418 #if CMK_SMSGS_FREE_AFTER_EVENT
4419 int smsgPoolSize = 16384;
4420 if(mysize*SMSG_MAX_CREDIT < smsgPoolSize)
4421 smsgPoolSize = mysize*SMSG_MAX_CREDIT;
4422 IndexPool_init(&smsgsPool, smsgPoolSize, 1u<<31-1);
4427 while (1<<SHIFT < mysize) SHIFT++;
4428 CmiAssert(SHIFT < 31);
4429 if ( (1<<(31-SHIFT)) < POOL_INIT_SIZE) CmiAbort("IndexPool_init: pool initial size is too big.");
4430 IndexPool_init(&ackPool, POOL_INIT_SIZE, 1u<<(31-SHIFT));
4431 #if CMK_PERSISTENT_COMM_PUT
4432 IndexPool_init(&persistPool, POOL_INIT_SIZE, 1u<<(31-SHIFT));
4438 void* LrtsRdmaAlloc(int n_bytes, int header)
4441 int alignbuf = ALIGNBUF;
4442 CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
4443 #if USE_LRTS_MEMPOOL
4444 n_bytes = ALIGN64(n_bytes);
4445 int val = ALIGNBUF+n_bytes-sizeof(mempool_header);
4446 if(n_bytes < BIG_MSG)
4448 char *res = (char *)mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
4449 if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
4453 n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
4454 char *res = (char *)my_get_huge_pages(n_bytes);
4456 char *res = (char *)memalign(ALIGNBUF, n_bytes+ALIGNBUF);
4458 if (res) ptr = res + ALIGNBUF - header;
4461 n_bytes = ALIGN64(n_bytes); /* make sure size if 4 aligned */
4462 char *res = (char *)memalign(ALIGNBUF, n_bytes+ALIGNBUF);
4463 ptr = res + ALIGNBUF - header;
4469 void LrtsRdmaFree(void *msg)
4471 int headersize = sizeof(CmiChunkHeader);
4472 char *aligned_addr = (char *)msg + headersize - ALIGNBUF;
4473 CmiUInt4 size = SIZEFIELD((char*)msg+headersize);
4474 size = ALIGN64(size);
4478 int s = ALIGNHUGEPAGE(size+ALIGNBUF);
4479 my_free_huge_pages(aligned_addr, s);
4481 free((char*)msg + headersize - ALIGNBUF);
4485 #if USE_LRTS_MEMPOOL
4487 mempool_free_thread(aligned_addr + sizeof(mempool_header));
4489 mempool_free(CpvAccess(mempool), aligned_addr + sizeof(mempool_header));
4498 void* LrtsAlloc(int n_bytes, int header)
4502 printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
4504 if(n_bytes <= SMSG_MAX_MSG)
4506 int totalsize = n_bytes+header;
4507 ptr = memalign(ALIGNBUF, totalsize);
4510 CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
4511 #if USE_LRTS_MEMPOOL
4512 n_bytes = ALIGN64(n_bytes);
4513 if(n_bytes < BIG_MSG)
4515 char *res = (char *)mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
4516 if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
4520 //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
4521 n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
4522 char *res = (char *)my_get_huge_pages(n_bytes);
4524 char *res = (char *)memalign(ALIGNBUF, n_bytes+ALIGNBUF);
4526 if (res) ptr = res + ALIGNBUF - header;
4529 n_bytes = ALIGN64(n_bytes); /* make sure size if 4 aligned */
4530 char *res = (char *)memalign(ALIGNBUF, n_bytes+ALIGNBUF);
4531 ptr = res + ALIGNBUF - header;
4538 void LrtsFree(void *msg)
4540 int headersize = sizeof(CmiChunkHeader);
4541 char *aligned_addr = (char *)msg + headersize - ALIGNBUF;
4542 CmiUInt4 size = SIZEFIELD((char*)msg+headersize);
4543 #if CMK_PERSISTENT_COMM_PUT
4544 if (IS_PERSISTENT_MEMORY(msg)) return;
4546 if (size <= SMSG_MAX_MSG)
4549 size = ALIGN64(size);
4553 int s = ALIGNHUGEPAGE(size+ALIGNBUF);
4554 my_free_huge_pages(aligned_addr, s);
4556 free((char*)msg + headersize - ALIGNBUF);
4560 #if USE_LRTS_MEMPOOL
4562 mempool_free_thread(aligned_addr + sizeof(mempool_header));
4564 mempool_free(CpvAccess(mempool), aligned_addr + sizeof(mempool_header));
4573 void LrtsExit(int exitcode)
4577 if(CmiMyRank() == CmiMyNodeSize())
4579 if (print_stats) print_comm_stats();
4582 #if USE_LRTS_MEMPOOL
4583 //printf("FINAL [%d, %d] register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg);
4584 mempool_destroy(CpvAccess(mempool));
4586 if(!CharmLibInterOperate || userDrivenMode) {
4593 void LrtsDrainResources(void)
4595 if(mysize == 1) return;
4598 !SendBufferMsg(&smsg_oob_queue, NULL) ||
4600 !SendBufferMsg(&smsg_queue, NULL)
4604 PumpDatagramConnection();
4606 PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
4608 #if MULTI_THREAD_SEND
4609 PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
4613 PumpLocalTransactions(highprior_rdma_tx_cqh, rdma_tx_cq_lock);
4617 PumpRemoteTransactions(rdma_rx_cqh);
4619 SendRdmaMsg(sendRdmaBuf);
4620 SendRdmaMsg(sendHighPriorBuf);
4625 void LrtsAbort(const char *message) {
4626 PMI_Abort(-1, message);
4627 CMI_NORETURN_FUNCTION_END
4630 /************************** TIMER FUNCTIONS **************************/
4631 #if CMK_TIMER_USE_SPECIAL
4632 /* MPI calls are not threadsafe, even the timer on some machines */
4633 static CmiNodeLock timerLock = 0;
4634 static int _absoluteTime = 0;
4635 static int _is_global = 0;
4636 static struct timespec start_ts;
4638 int CmiTimerIsSynchronized(void) {
4642 int CmiTimerAbsolute(void) {
4643 return _absoluteTime;
4646 double CmiStartTimer(void) {
4650 double CmiInitTime(void) {
4651 return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
4654 void CmiTimerInit(char **argv) {
4655 _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
4656 if (_absoluteTime && CmiMyPe() == 0)
4657 printf("Charm++> absolute timer is used\n");
4659 _is_global = CmiTimerIsSynchronized();
4663 if (CmiMyRank() == 0) {
4664 clock_gettime(CLOCK_MONOTONIC, &start_ts);
4666 } else { /* we don't have a synchronous timer, set our own start time */
4670 clock_gettime(CLOCK_MONOTONIC, &start_ts);
4672 CmiNodeAllBarrier(); /* for smp */
4676 * Since the timerLock is never created, and is
4677 * always NULL, then all the if-condition inside
4678 * the timer functions could be disabled right
4679 * now in the case of SMP.
4681 double CmiTimer(void) {
4682 struct timespec now_ts;
4683 clock_gettime(CLOCK_MONOTONIC, &now_ts);
4684 return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
4685 : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec) / 1000000000.0);
4688 double CmiWallTimer(void) {
4689 struct timespec now_ts;
4690 clock_gettime(CLOCK_MONOTONIC, &now_ts);
4691 return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
4692 : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec) / 1000000000.0);
4695 double CmiCpuTimer(void) {
4696 struct timespec now_ts;
4697 clock_gettime(CLOCK_MONOTONIC, &now_ts);
4698 return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
4699 : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec) / 1000000000.0);
4703 /************Barrier Related Functions****************/
4705 void LrtsBarrier(void)
4707 gni_return_t status;
4709 status = (gni_return_t)PMI_Barrier();
4710 GNI_RC_CHECK("PMI_Barrier", status);
4712 #if CMK_ONESIDED_IMPL
4713 #include "machine-onesided.c"
4716 #include "machine-cmidirect.c"
4718 #if CMK_PERSISTENT_COMM
4719 #include "machine-persistent.c"