3 * Gemini GNI machine layer
9 * Flow control by mem pool using environment variables:
11 # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12 # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13 export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14 export CHARM_UGNI_MEMPOOL_MAX=20M
15 export CHARM_UGNI_SEND_MAX=10M
17 # limit on the total mempool size allocated; this prevents the mempool
18 # from using too much memory
19 export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M
21 other environment variables:
23 export CHARM_UGNI_NO_DEADLOCK_CHECK=yes # disable checking deadlock
24 export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G # max memory per node for mempool
25 export CHARM_UGNI_BIG_MSG_SIZE=4M # set big message size protocol
26 export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4 # set big message pipe len
27 export CHARM_UGNI_RDMA_MAX=100 # max pending RDMA operations
42 //#include <numatoolkit.h>
47 #include "cmidirect.h"
53 #define MULTI_THREAD_SEND 0
54 #define COMM_THREAD_SEND (!MULTI_THREAD_SEND)
58 #define CMK_WORKER_SINGLE_TASK 1
61 #define REMOTE_EVENT 1
64 #define CMI_EXERT_SEND_LARGE_CAP 0
65 #define CMI_EXERT_RECV_RDMA_CAP 0
68 #define CMI_SENDBUFFERSMSG_CAP 0
69 #define CMI_PUMPNETWORKSMSG_CAP 0
70 #define CMI_PUMPREMOTETRANSACTIONS_CAP 0
71 #define CMI_PUMPLOCALTRANSACTIONS_CAP 0
73 #if CMI_SENDBUFFERSMSG_CAP
74 int SendBufferMsg_cap
= 20;
77 #if CMI_PUMPNETWORKSMSG_CAP
78 int PumpNetworkSmsg_cap
= 20;
81 #if CMI_PUMPREMOTETRANSACTIONS_CAP
82 int PumpRemoteTransactions_cap
= 20;
85 #if CMI_PUMPREMOTETRANSACTIONS_CAP
86 int PumpLocalTransactions_cap
= 20;
89 #if CMI_EXERT_SEND_LARGE_CAP
90 static int SEND_large_cap
= 20;
91 static int SEND_large_pending
= 0;
94 #if CMI_EXERT_RECV_RDMA_CAP
95 static int RDMA_cap
= 10;
96 static int RDMA_pending
= 0;
99 #define USE_LRTS_MEMPOOL 1
103 // Trace communication thread
104 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
105 #define TRACE_THRESHOLD 0.00001
106 #define CMI_MPI_TRACE_MOREDETAILED 0
107 #undef CMI_MPI_TRACE_USEREVENTS
108 #define CMI_MPI_TRACE_USEREVENTS 1
110 #undef CMK_SMP_TRACE_COMMTHREAD
111 #define CMK_SMP_TRACE_COMMTHREAD 0
114 #define CMK_TRACE_COMMOVERHEAD 0
115 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
116 #undef CMI_MPI_TRACE_USEREVENTS
117 #define CMI_MPI_TRACE_USEREVENTS 1
119 #undef CMK_TRACE_COMMOVERHEAD
120 #define CMK_TRACE_COMMOVERHEAD 0
123 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
124 CpvStaticDeclare(double, projTraceStart
);
125 #define START_EVENT() CpvAccess(projTraceStart) = CmiWallTimer();
126 #define END_EVENT(x) traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
127 #define EVENT_TIME() CpvAccess(projTraceStart)
129 #define START_EVENT()
131 #define EVENT_TIME() (0.0)
136 #define oneMB (1024ll*1024)
137 #define oneGB (1024ll*1024*1024)
139 static CmiInt8 _mempool_size
= 8*oneMB
;
140 static CmiInt8 _expand_mem
= 4*oneMB
;
141 static CmiInt8 _mempool_size_limit
= 0;
143 static CmiInt8 _totalmem
= 0.8*oneGB
;
146 static CmiInt8 BIG_MSG
= 16*oneMB
;
147 static CmiInt8 ONE_SEG
= 4*oneMB
;
149 static CmiInt8 BIG_MSG
= 4*oneMB
;
150 static CmiInt8 ONE_SEG
= 2*oneMB
;
152 #if MULTI_THREAD_SEND
153 static int BIG_MSG_PIPELINE
= 1;
155 static int BIG_MSG_PIPELINE
= 4;
158 // dynamic flow control
159 static CmiInt8 buffered_send_msg
= 0;
160 static CmiInt8 register_memory_size
= 0;
163 static CmiInt8 MAX_BUFF_SEND
= 100000*oneMB
;
164 static CmiInt8 MAX_REG_MEM
= 200000*oneMB
;
165 static CmiInt8 register_count
= 0;
167 #if CMK_SMP && COMM_THREAD_SEND
168 static CmiInt8 MAX_BUFF_SEND
= 100*oneMB
;
169 static CmiInt8 MAX_REG_MEM
= 200*oneMB
;
171 static CmiInt8 MAX_BUFF_SEND
= 16*oneMB
;
172 static CmiInt8 MAX_REG_MEM
= 25*oneMB
;
178 #endif /* end USE_LRTS_MEMPOOL */
180 #if MULTI_THREAD_SEND
181 #define CMI_GNI_LOCK(x) CmiLock(x);
182 #define CMI_GNI_TRYLOCK(x) CmiTryLock(x)
183 #define CMI_GNI_UNLOCK(x) CmiUnlock(x);
184 #define CMI_PCQUEUEPOP_LOCK(Q) CmiLock((Q)->lock);
185 #define CMI_PCQUEUEPOP_UNLOCK(Q) CmiUnlock((Q)->lock);
187 #define CMI_GNI_LOCK(x)
188 #define CMI_GNI_TRYLOCK(x) (0)
189 #define CMI_GNI_UNLOCK(x)
190 #define CMI_PCQUEUEPOP_LOCK(Q)
191 #define CMI_PCQUEUEPOP_UNLOCK(Q)
194 static int _tlbpagesize
= 4096;
196 //static int _smpd_count = 0;
198 static int user_set_flag
= 0;
200 static int _checkProgress
= 1; /* check deadlock */
201 static int _detected_hang
= 0;
203 #define SMSG_ATTR_SIZE sizeof(gni_smsg_attr_t)
206 static int useDynamicSMSG
= 0; /* dynamic smsgs setup */
208 static int avg_smsg_connection
= 32;
209 static int *smsg_connected_flag
= 0;
210 static gni_smsg_attr_t
**smsg_attr_vector_local
;
211 static gni_smsg_attr_t
**smsg_attr_vector_remote
;
212 static gni_ep_handle_t ep_hndl_unbound
;
213 static gni_smsg_attr_t send_smsg_attr
;
214 static gni_smsg_attr_t recv_smsg_attr
;
216 typedef struct _dynamic_smsg_mailbox
{
220 gni_mem_handle_t mem_hndl
;
221 struct _dynamic_smsg_mailbox
*next
;
222 }dynamic_smsg_mailbox_t
;
224 static dynamic_smsg_mailbox_t
*mailbox_list
;
226 static CmiUInt8 smsg_send_count
= 0, last_smsg_send_count
= 0;
227 static CmiUInt8 smsg_recv_count
= 0, last_smsg_recv_count
= 0;
230 int lrts_send_msg_id
= 0;
231 int lrts_local_done_msg
= 0;
232 int lrts_send_rdma_success
= 0;
241 #if CMK_PERSISTENT_COMM
242 #include "machine-persistent.h"
243 #define POST_HIGHPRIORITY_RDMA STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendHighPriorBuf));
245 #define POST_HIGHPRIORITY_RDMA
248 #if REMOTE_EVENT && (CMK_USE_OOB || CMK_PERSISTENT_COMM)
249 #define PUMP_REMOTE_HIGHPRIORITY STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(highpriority_rx_cqh) );
251 #define PUMP_REMOTE_HIGHPRIORITY
254 //#define USE_ONESIDED 1
256 // the onesided implementation is wrong, since there is no place to restore omdh
257 #include "onesided.h"
258 onesided_hnd_t onesided_hnd
;
260 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh)
262 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
265 uint8_t onesided_hnd
, omdh
;
267 #if REMOTE_EVENT || CQWRITE
268 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
269 if(register_memory_size+size>= MAX_REG_MEM) { \
270 status = GNI_RC_ERROR_NOMEM;} \
271 else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg, (uint64_t)size, cqh, GNI_MEM_READWRITE, -1, mem_hndl); \
272 if(status == GNI_RC_SUCCESS) register_memory_size += size; }
274 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
275 if (register_memory_size + size >= MAX_REG_MEM) { \
276 status = GNI_RC_ERROR_NOMEM; \
277 } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg, (uint64_t)size, NULL, GNI_MEM_READWRITE, -1, mem_hndl); \
278 if(status == GNI_RC_SUCCESS) register_memory_size += size; }
281 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size) \
282 do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
283 register_memory_size -= size; \
284 else CmiAbort("MEM_DEregister"); \
/* ------------------------------------------------------------------------
 * Accessors mapping a mempool-allocated message pointer (x) to the
 * metadata kept in its mempool header, which sits ALIGNBUF bytes before
 * the payload.
 * ------------------------------------------------------------------------ */
#define GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))

/* In-flight message counters tracked per mempool block. */
#define IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
#define NoMsgInSend(x)          MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
#define NoMsgInRecv(x)          MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
#define NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))

/* A handle with qword1 and qword2 both zero is treated as "not registered"
 * (see NotRegistered below). */
#define IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
#define SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
#define NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))

/* Variants taking a block header directly instead of a payload pointer. */
#define GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
#define GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)

/* Message-header field accessors (size / sequence number). */
#define CmiGetMsgSize(m)        ((CmiMsgHeaderExt*)m)->size
#define CmiSetMsgSize(m,s)      ((((CmiMsgHeaderExt*)m)->size)=(s))
#define CmiGetMsgSeq(m)         ((CmiMsgHeaderExt*)m)->seq
#define CmiSetMsgSeq(m, s)      ((((CmiMsgHeaderExt*)m)->seq) = (s))
313 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
314 /* If SMSG is not used */
316 #define FMA_PER_CORE 1024
317 #define FMA_BUFFER_SIZE 1024
319 /* If SMSG is used */
320 static int SMSG_MAX_MSG
= 1024;
321 #define SMSG_MAX_CREDIT 72
323 #define MSGQ_MAXSIZE 2048
325 /* large message transfer with FMA or BTE */
327 #define LRTS_GNI_RDMA_THRESHOLD 1024
329 /* remote events only work with RDMA */
330 #define LRTS_GNI_RDMA_THRESHOLD 0
334 static int REMOTE_QUEUE_ENTRIES
=163840;
335 static int LOCAL_QUEUE_ENTRIES
=163840;
337 static int REMOTE_QUEUE_ENTRIES
=20480;
338 static int LOCAL_QUEUE_ENTRIES
=20480;
341 #define BIG_MSG_TAG 0x26
342 #define PUT_DONE_TAG 0x28
343 #define DIRECT_PUT_DONE_TAG 0x29
345 /* SMSG is data message */
346 #define SMALL_DATA_TAG 0x31
347 /* SMSG is a control message to initialize a BTE */
348 #define LMSG_INIT_TAG 0x33
349 #define LMSG_OOB_INIT_TAG 0x35
356 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) { printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
358 #define GNI_RC_CHECK(msg,rc)
/* Round x up to the next multiple of 64, yielding a size_t. */
#define ALIGN64(x)        (size_t)(((x)+63) & (~63))
//#define ALIGN4(x)        (size_t)((~3)&((x)+3))
/* Round x up to the next multiple of the TLB (huge) page size. */
#define ALIGNHUGEPAGE(x)  (size_t)(((x)+_tlbpagesize-1) & (~(_tlbpagesize-1)))
365 static int useStaticMSGQ
= 0;
366 static int useStaticFMA
= 0;
367 static int mysize
, myrank
;
368 static gni_nic_handle_t nic_hndl
;
371 gni_mem_handle_t mdh
;
374 // this is related to dynamic SMSG
376 typedef struct mdh_addr_list
{
377 gni_mem_handle_t mdh
;
379 struct mdh_addr_list
*next
;
382 static unsigned int smsg_memlen
;
383 gni_smsg_attr_t
**smsg_local_attr_vec
= 0;
384 mdh_addr_t setup_mem
;
385 mdh_addr_t
*smsg_connection_vec
= 0;
386 gni_mem_handle_t smsg_connection_memhndl
;
387 static int smsg_expand_slots
= 10;
388 static int smsg_available_slot
= 0;
389 static void *smsg_mailbox_mempool
= 0;
390 mdh_addr_list_t
*smsg_dynamic_list
= 0;
392 static void *smsg_mailbox_base
;
393 gni_msgq_attr_t msgq_attrs
;
394 gni_msgq_handle_t msgq_handle
;
395 gni_msgq_ep_attr_t msgq_ep_attrs
;
396 gni_msgq_ep_attr_t msgq_ep_attrs_size
;
398 /* =====Beginning of Declarations of Machine Specific Variables===== */
400 static int modes
= 0;
401 static gni_cq_handle_t smsg_rx_cqh
= NULL
; // smsg send
402 static gni_cq_handle_t default_tx_cqh
= NULL
; // bind to endpoint
403 static gni_cq_handle_t rdma_tx_cqh
= NULL
; // rdma - local event
404 static gni_cq_handle_t highprior_rdma_tx_cqh
= NULL
; // rdma - local event
405 static gni_cq_handle_t rdma_rx_cqh
= NULL
; // mempool - remote event
406 static gni_cq_handle_t highpriority_rx_cqh
= NULL
; // mempool - remote event
407 static gni_ep_handle_t
*ep_hndl_array
;
409 static CmiNodeLock
*ep_lock_array
;
410 static CmiNodeLock default_tx_cq_lock
;
411 static CmiNodeLock rdma_tx_cq_lock
;
412 static CmiNodeLock global_gni_lock
;
413 static CmiNodeLock rx_cq_lock
;
414 static CmiNodeLock smsg_mailbox_lock
;
415 static CmiNodeLock smsg_rx_cq_lock
;
416 static CmiNodeLock
*mempool_lock
;
417 //#define CMK_WITH_STATS 1
418 typedef struct msg_list
425 double creation_time
;
430 typedef struct control_msg
432 uint64_t source_addr
; /* address from the start of buffer */
433 uint64_t dest_addr
; /* address from the start of buffer */
434 int total_length
; /* total length */
435 int length
; /* length of this packet */
437 int ack_index
; /* index from integer to address */
439 uint8_t seq_id
; //big message 0 meaning single message
440 gni_mem_handle_t source_mem_hndl
;
441 struct control_msg
*next
;
444 #define CONTROL_MSG_SIZE (sizeof(CONTROL_MSG)-sizeof(void*))
446 typedef struct ack_msg
448 uint64_t source_addr
; /* address from the start of buffer */
449 #if ! USE_LRTS_MEMPOOL
450 gni_mem_handle_t source_mem_hndl
;
451 int length
; /* total length */
453 struct ack_msg
*next
;
456 #define ACK_MSG_SIZE (sizeof(ACK_MSG)-sizeof(void*))
460 uint64_t handler_addr
;
464 char core
[CmiMsgHeaderSizeBytes
];
469 CpvDeclare(int, CmiHandleDirectIdx
);
470 void CmiHandleDirectMsg(cmidirectMsg
* msg
)
473 CmiDirectUserHandle
*_handle
= (CmiDirectUserHandle
*)(msg
->handler
);
474 (*(_handle
->callbackFnPtr
))(_handle
->callbackData
);
480 CpvInitialize(int, CmiHandleDirectIdx
);
481 CpvAccess(CmiHandleDirectIdx
) = CmiRegisterHandler( (CmiHandler
) CmiHandleDirectMsg
);
485 typedef struct rmda_msg
491 gni_post_descriptor_t
*pd
;
496 #define ONE_SEND_QUEUE 0
497 typedef PCQueue BufferList
;
498 typedef struct msg_list_index
509 PCQueue sendHighPriorBuf
;
510 // buffered send queue
512 typedef struct smsg_queue
514 MSG_LIST_INDEX
*smsg_msglist_index
;
517 PCQueue nonEmptyQueues
;
521 typedef struct smsg_queue
527 SMSG_QUEUE smsg_queue
;
529 SMSG_QUEUE smsg_oob_queue
;
530 #define SEND_OOB_SMSG(x) SendBufferMsg(&x, NULL);
531 #define PUMP_LOCAL_HIGHPRIORITY STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(highprior_rdma_tx_cqh, rdma_tx_cq_lock));
533 #define SEND_OOB_SMSG(x)
534 #define PUMP_LOCAL_HIGHPRIORITY
537 #define FreeMsgList(d) free(d);
538 #define MallocMsgList(d) d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
540 #define FreeControlMsg(d) free(d);
541 #define MallocControlMsg(d) d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
543 #define FreeAckMsg(d) free(d);
544 #define MallocAckMsg(d) d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
546 #define FreeRdmaRequest(d) free(d);
547 #define MallocRdmaRequest(d) d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));
548 /* reuse gni_post_descriptor_t */
549 static gni_post_descriptor_t
*post_freelist
=0;
551 #define FreePostDesc(d) free(d);
552 #define MallocPostDesc(d) d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
555 /* LrtsSent is called but the message cannot be sent by SMSGSend because the mailbox is full or there is no credit */
556 static int buffered_smsg_counter
= 0;
558 /* SmsgSend return success but message sent is not confirmed by remote side */
559 static MSG_LIST
*buffered_fma_head
= 0;
560 static MSG_LIST
*buffered_fma_tail
= 0;
/* Bit helpers for slot bitmasks.
 * Fixed macro hygiene: the argument `a` and each whole expansion are now
 * fully parenthesized (the originals expanded `a` bare, so any compound
 * expression argument would bind by surrounding-operator precedence),
 * and the shifted constant is unsigned so ind == 31 does not left-shift
 * into the sign bit of a signed int (undefined behavior; bit 31 is used
 * elsewhere in this file, e.g. PERSIST_EVENT). */
#define IsFree(a,ind)   (!((a) & (1u<<(ind))))   /* nonzero when bit ind of a is clear */
#define SET_BITS(a,ind) (a) = ((a) | (1u<<(ind)))    /* set bit ind of a   */
#define Reset(a,ind)    (a) = ((a) & (~(1u<<(ind)))) /* clear bit ind of a */
567 CpvDeclare(mempool_type
*, mempool
);
569 #if CMK_PERSISTENT_COMM
570 CpvDeclare(mempool_type
*, persistent_mempool
);
574 /* ack pool for remote events */
576 static int SHIFT
= 18;
577 #define INDEX_MASK ((1<<(32-SHIFT-1)) - 1)
578 #define RANK_MASK ((1<<SHIFT) - 1)
579 #define ACK_EVENT(idx) ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
581 #define GET_TYPE(evt) (((evt) >> 31) & 1)
582 #define GET_RANK(evt) ((evt) & RANK_MASK)
583 #define GET_INDEX(evt) (((evt) >> SHIFT) & INDEX_MASK)
585 #define PERSIST_EVENT(idx) ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
588 #define INIT_SIZE 4096
590 #define INIT_SIZE 1024
596 int type
; // 1: ACK 2: Persistent
599 typedef struct IndexPool
{
600 struct IndexStruct
*indexes
;
606 static IndexPool ackPool
;
607 #if CMK_PERSISTENT_COMM
608 static IndexPool persistPool
;
611 #define GetIndexType(pool, s) (pool.indexes[s].type)
612 #define GetIndexAddress(pool, s) (pool.indexes[s].addr)
614 static void IndexPool_init(IndexPool
*pool
)
617 if ((1<<SHIFT
) < mysize
)
618 CmiAbort("Charm++ Error: Remote event's rank field overflow.");
619 pool
->size
= INIT_SIZE
;
620 if ( (1<<(31-SHIFT
)) < pool
->size
) CmiAbort("IndexPool_init: pool initial size is too big.");
621 pool
->indexes
= (struct IndexStruct
*)malloc(pool
->size
*sizeof(struct IndexStruct
));
622 for (i
=0; i
<pool
->size
-1; i
++) {
623 pool
->indexes
[i
].next
= i
+1;
624 pool
->indexes
[i
].type
= 0;
626 pool
->indexes
[i
].next
= -1;
628 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM
629 pool
->lock
= CmiCreateLock();
636 inline int IndexPool_getslot(IndexPool
*pool
, void *addr
, int type
)
639 #if MULTI_THREAD_SEND
644 int newsize
= pool
->size
* 2;
645 //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
646 if (newsize
> (1<<(32-SHIFT
-1))) CmiAbort("IndexPool too large");
647 struct IndexStruct
*old_ackpool
= pool
->indexes
;
648 pool
->indexes
= (struct IndexStruct
*)malloc(newsize
*sizeof(struct IndexStruct
));
649 memcpy(pool
->indexes
, old_ackpool
, pool
->size
*sizeof(struct IndexStruct
));
650 for (i
=pool
->size
; i
<newsize
-1; i
++) {
651 pool
->indexes
[i
].next
= i
+1;
652 pool
->indexes
[i
].type
= 0;
654 pool
->indexes
[i
].next
= -1;
655 pool
->indexes
[i
].type
= 0;
656 pool
->freehead
= pool
->size
;
658 pool
->size
= newsize
;
661 pool
->freehead
= pool
->indexes
[s
].next
;
662 pool
->indexes
[s
].addr
= addr
;
663 CmiAssert(pool
->indexes
[s
].type
== 0 && (type
== 1 || type
== 2));
664 pool
->indexes
[s
].type
= type
;
665 #if MULTI_THREAD_SEND
666 CmiUnlock(pool
->lock
);
672 inline void IndexPool_freeslot(IndexPool
*pool
, int s
)
674 CmiAssert(s
>=0 && s
<pool
->size
);
675 #if MULTI_THREAD_SEND
678 pool
->indexes
[s
].next
= pool
->freehead
;
679 pool
->indexes
[s
].type
= 0;
681 #if MULTI_THREAD_SEND
682 CmiUnlock(pool
->lock
);
689 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
690 #define CMI_MAGIC(msg) ((CmiMsgHeaderBasic *)msg)->magic
691 #define CHARM_MAGIC_NUMBER 126
693 #if CMK_ERROR_CHECKING
694 extern unsigned char computeCheckSum(unsigned char *data
, int len
);
695 static int checksum_flag
= 0;
696 #define CMI_SET_CHECKSUM(msg, len) \
697 if (checksum_flag) { \
698 ((CmiMsgHeaderBasic *)msg)->cksum = 0; \
699 ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len); \
701 #define CMI_CHECK_CHECKSUM(msg, len) \
703 if (computeCheckSum((unsigned char*)msg, len) != 0) \
704 CmiAbort("Fatal error: checksum doesn't agree!\n");
706 #define CMI_SET_CHECKSUM(msg, len)
707 #define CMI_CHECK_CHECKSUM(msg, len)
709 /* =====End of Definitions of Message-Corruption Related Macros=====*/
711 static int print_stats
= 0;
712 static int stats_off
= 0;
713 void CmiTurnOnStats()
716 //CmiPrintf("[%d][%d:%d]+++++++++++ turning on stats \n", CmiMyNode(), CmiMyPe(), CmiMyRank());
719 void CmiTurnOffStats()
724 #define IS_PUT(type) (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
727 FILE *counterLog
= NULL
;
728 typedef struct comm_thread_stats
730 uint64_t smsg_data_count
;
731 uint64_t lmsg_init_count
;
733 uint64_t big_msg_ack_count
;
735 uint64_t direct_put_done_count
;
736 uint64_t put_done_count
;
737 //times of calling SmsgSend
738 uint64_t try_smsg_data_count
;
739 uint64_t try_lmsg_init_count
;
740 uint64_t try_ack_count
;
741 uint64_t try_big_msg_ack_count
;
742 uint64_t try_direct_put_done_count
;
743 uint64_t try_put_done_count
;
744 uint64_t try_smsg_count
;
746 double max_time_in_send_buffered_smsg
;
747 double all_time_in_send_buffered_smsg
;
749 uint64_t rdma_get_count
, rdma_put_count
;
750 uint64_t try_rdma_get_count
, try_rdma_put_count
;
751 double max_time_from_control_to_rdma_init
;
752 double all_time_from_control_to_rdma_init
;
754 double max_time_from_rdma_init_to_rdma_done
;
755 double all_time_from_rdma_init_to_rdma_done
;
757 int count_in_PumpNetwork
;
758 double time_in_PumpNetwork
;
759 double max_time_in_PumpNetwork
;
760 int count_in_SendBufferMsg_smsg
;
761 double time_in_SendBufferMsg_smsg
;
762 double max_time_in_SendBufferMsg_smsg
;
763 int count_in_SendRdmaMsg
;
764 double time_in_SendRdmaMsg
;
765 double max_time_in_SendRdmaMsg
;
766 int count_in_PumpRemoteTransactions
;
767 double time_in_PumpRemoteTransactions
;
768 double max_time_in_PumpRemoteTransactions
;
769 int count_in_PumpLocalTransactions_rdma
;
770 double time_in_PumpLocalTransactions_rdma
;
771 double max_time_in_PumpLocalTransactions_rdma
;
772 int count_in_PumpDatagramConnection
;
773 double time_in_PumpDatagramConnection
;
774 double max_time_in_PumpDatagramConnection
;
777 static Comm_Thread_Stats comm_stats
;
779 static char *counters_dirname
= "counters";
781 static void init_comm_stats()
783 memset(&comm_stats
, 0, sizeof(Comm_Thread_Stats
));
786 int code
= mkdir(counters_dirname
, 00777);
787 sprintf(ln
,"%s/statistics.%d.%d", counters_dirname
, mysize
, myrank
);
788 counterLog
=fopen(ln
,"w");
789 if (counterLog
== NULL
) CmiAbort("Counter files open failed");
793 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
795 #define SMSG_SENT_DONE(creation_time, tag) \
796 if (print_stats && !stats_off) { if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++; \
797 else if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.lmsg_init_count++; \
798 else if( tag == ACK_TAG) comm_stats.ack_count++; \
799 else if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++; \
800 else if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++; \
801 else if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++; \
802 comm_stats.smsg_count++; \
803 double inbuff_time = CmiWallTimer() - creation_time; \
804 if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
805 comm_stats.all_time_in_send_buffered_smsg += inbuff_time; \
808 #define SMSG_TRY_SEND(tag) \
809 if (print_stats && !stats_off){ if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++; \
810 else if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.try_lmsg_init_count++; \
811 else if( tag == ACK_TAG) comm_stats.try_ack_count++; \
812 else if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++; \
813 else if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++; \
814 else if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++; \
815 comm_stats.try_smsg_count++; \
818 #define RDMA_TRY_SEND(type) if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
820 #define RDMA_TRANS_DONE(x) \
821 if (print_stats && !stats_off) { double rdma_trans_time = CmiWallTimer() - x ; \
822 if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
823 comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
826 #define RDMA_TRANS_INIT(type, x) \
827 if (print_stats && !stats_off) { IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++; \
828 double rdma_trans_time = CmiWallTimer() - x ; \
829 if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
830 comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
833 #define STATS_PUMPNETWORK_TIME(x) \
834 { double t = CmiWallTimer(); \
836 t = CmiWallTimer() - t; \
837 comm_stats.count_in_PumpNetwork++; \
838 comm_stats.time_in_PumpNetwork += t; \
839 if (t>comm_stats.max_time_in_PumpNetwork) \
840 comm_stats.max_time_in_PumpNetwork = t; \
843 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x) \
844 { double t = CmiWallTimer(); \
846 t = CmiWallTimer() - t; \
847 comm_stats.count_in_PumpRemoteTransactions ++; \
848 comm_stats.time_in_PumpRemoteTransactions += t; \
849 if (t>comm_stats.max_time_in_PumpRemoteTransactions) \
850 comm_stats.max_time_in_PumpRemoteTransactions = t; \
853 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x) \
854 { double t = CmiWallTimer(); \
856 t = CmiWallTimer() - t; \
857 comm_stats.count_in_PumpLocalTransactions_rdma ++; \
858 comm_stats.time_in_PumpLocalTransactions_rdma += t; \
859 if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma) \
860 comm_stats.max_time_in_PumpLocalTransactions_rdma = t; \
863 #define STATS_SEND_SMSGS_TIME(x) \
864 { double t = CmiWallTimer(); \
866 t = CmiWallTimer() - t; \
867 comm_stats.count_in_SendBufferMsg_smsg ++; \
868 comm_stats.time_in_SendBufferMsg_smsg += t; \
869 if (t>comm_stats.max_time_in_SendBufferMsg_smsg) \
870 comm_stats.max_time_in_SendBufferMsg_smsg = t; \
873 #define STATS_SENDRDMAMSG_TIME(x) \
874 { double t = CmiWallTimer(); \
876 t = CmiWallTimer() - t; \
877 comm_stats.count_in_SendRdmaMsg ++; \
878 comm_stats.time_in_SendRdmaMsg += t; \
879 if (t>comm_stats.max_time_in_SendRdmaMsg) \
880 comm_stats.max_time_in_SendRdmaMsg = t; \
883 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x) \
884 { double t = CmiWallTimer(); \
886 t = CmiWallTimer() - t; \
887 comm_stats.count_in_PumpDatagramConnection ++; \
888 comm_stats.time_in_PumpDatagramConnection += t; \
889 if (t>comm_stats.max_time_in_PumpDatagramConnection) \
890 comm_stats.max_time_in_PumpDatagramConnection = t; \
893 static void print_comm_stats()
895 fprintf(counterLog
, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank
, 1000.0*comm_stats
.all_time_in_send_buffered_smsg
, 1000.0*comm_stats
.max_time_in_send_buffered_smsg
, 1000.0*comm_stats
.all_time_in_send_buffered_smsg
/comm_stats
.smsg_count
);
896 fprintf(counterLog
, "Node[%d] Smsg Msgs \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank
,
897 comm_stats
.smsg_count
, comm_stats
.smsg_data_count
, comm_stats
.lmsg_init_count
,
898 comm_stats
.ack_count
, comm_stats
.big_msg_ack_count
, comm_stats
.direct_put_done_count
, comm_stats
.put_done_count
);
900 fprintf(counterLog
, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank
,
901 comm_stats
.try_smsg_count
, comm_stats
.try_smsg_data_count
, comm_stats
.try_lmsg_init_count
,
902 comm_stats
.try_ack_count
, comm_stats
.try_big_msg_ack_count
, comm_stats
.try_direct_put_done_count
, comm_stats
.try_put_done_count
);
904 fprintf(counterLog
, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank
, comm_stats
.rdma_get_count
, comm_stats
.rdma_put_count
, comm_stats
.try_rdma_get_count
, comm_stats
.try_rdma_put_count
);
905 fprintf(counterLog
, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank
, 1000.0*comm_stats
.all_time_from_control_to_rdma_init
, 1000.0*comm_stats
.max_time_from_control_to_rdma_init
, 1000.0*comm_stats
.all_time_from_control_to_rdma_init
/(comm_stats
.rdma_get_count
+comm_stats
.rdma_put_count
));
906 fprintf(counterLog
, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank
,1000.0*comm_stats
.all_time_from_rdma_init_to_rdma_done
, 1000.0*comm_stats
.max_time_from_rdma_init_to_rdma_done
, 1000.0*comm_stats
.all_time_from_rdma_init_to_rdma_done
/(comm_stats
.rdma_get_count
+comm_stats
.rdma_put_count
));
909 fprintf(counterLog
, " count\ttotal(s)\tmax(s)\taverage(us)\n");
910 fprintf(counterLog
, "PumpNetworkSmsg: %d\t%.6f\t%.6f\t%.6f\n", comm_stats
.count_in_PumpNetwork
, comm_stats
.time_in_PumpNetwork
, comm_stats
.max_time_in_PumpNetwork
, comm_stats
.time_in_PumpNetwork
*1e6
/comm_stats
.count_in_PumpNetwork
);
911 fprintf(counterLog
, "PumpRemoteTransactions: %d\t%.6f\t%.6f\t%.6f\n", comm_stats
.count_in_PumpRemoteTransactions
, comm_stats
.time_in_PumpRemoteTransactions
, comm_stats
.max_time_in_PumpRemoteTransactions
, comm_stats
.time_in_PumpRemoteTransactions
*1e6
/comm_stats
.count_in_PumpRemoteTransactions
);
912 fprintf(counterLog
, "PumpLocalTransactions(RDMA): %d\t%.6f\t%.6f\t%.6f\n", comm_stats
.count_in_PumpLocalTransactions_rdma
, comm_stats
.time_in_PumpLocalTransactions_rdma
, comm_stats
.max_time_in_PumpLocalTransactions_rdma
, comm_stats
.time_in_PumpLocalTransactions_rdma
*1e6
/comm_stats
.count_in_PumpLocalTransactions_rdma
);
913 fprintf(counterLog
, "SendBufferMsg (SMSG): %d\t%.6f\t%.6f\t%.6f\n", comm_stats
.count_in_SendBufferMsg_smsg
, comm_stats
.time_in_SendBufferMsg_smsg
, comm_stats
.max_time_in_SendBufferMsg_smsg
, comm_stats
.time_in_SendBufferMsg_smsg
*1e6
/comm_stats
.count_in_SendBufferMsg_smsg
);
914 fprintf(counterLog
, "SendRdmaMsg: %d\t%.6f\t%.6f\t%.6f\n", comm_stats
.count_in_SendRdmaMsg
, comm_stats
.time_in_SendRdmaMsg
, comm_stats
.max_time_in_SendRdmaMsg
, comm_stats
.time_in_SendRdmaMsg
*1e6
/comm_stats
.count_in_SendRdmaMsg
);
916 fprintf(counterLog
, "PumpDatagramConnection: %d\t%.6f\t%.6f\t%.6f\n", comm_stats
.count_in_PumpDatagramConnection
, comm_stats
.time_in_PumpDatagramConnection
, comm_stats
.max_time_in_PumpDatagramConnection
, comm_stats
.time_in_PumpDatagramConnection
*1e6
/comm_stats
.count_in_PumpDatagramConnection
);
922 #define STATS_PUMPNETWORK_TIME(x) x
923 #define STATS_SEND_SMSGS_TIME(x) x
924 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x) x
925 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x) x
926 #define STATS_SENDRDMAMSG_TIME(x) x
927 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x) x
931 allgather(void *in
,void *out
, int len
)
933 static int *ivec_ptr
=NULL
,already_called
=0,job_size
=0;
936 char *tmp_buf
,*out_ptr
;
938 if(!already_called
) {
940 rc
= PMI_Get_size(&job_size
);
941 CmiAssert(rc
== PMI_SUCCESS
);
942 rc
= PMI_Get_rank(&my_rank
);
943 CmiAssert(rc
== PMI_SUCCESS
);
945 ivec_ptr
= (int *)malloc(sizeof(int) * job_size
);
946 CmiAssert(ivec_ptr
!= NULL
);
948 rc
= PMI_Allgather(&my_rank
,ivec_ptr
,sizeof(int));
949 CmiAssert(rc
== PMI_SUCCESS
);
955 tmp_buf
= (char *)malloc(job_size
* len
);
958 rc
= PMI_Allgather(in
,tmp_buf
,len
);
959 CmiAssert(rc
== PMI_SUCCESS
);
963 for(i
=0;i
<job_size
;i
++) {
965 memcpy(&out_ptr
[len
* ivec_ptr
[i
]],&tmp_buf
[i
* len
],len
);
973 allgather_2(void *in
,void *out
, int len
)
975 //PMI_Allgather is out of order
976 int i
,rc
, extend_len
;
978 char *out_ptr
, *out_ref
;
981 extend_len
= sizeof(int) + len
;
982 in2
= (char*)malloc(extend_len
);
984 memcpy(in2
, &myrank
, sizeof(int));
985 memcpy(in2
+sizeof(int), in
, len
);
987 out_ptr
= (char*)malloc(mysize
*extend_len
);
989 rc
= PMI_Allgather(in2
, out_ptr
, extend_len
);
990 GNI_RC_CHECK("allgather", rc
);
994 for(i
=0;i
<mysize
;i
++) {
996 memcpy(&rank_index
, &(out_ptr
[extend_len
*i
]), sizeof(int));
997 //copy to the rank index slot
998 memcpy(&out_ref
[rank_index
*len
], &out_ptr
[extend_len
*i
+sizeof(int)], len
);
1006 static unsigned int get_gni_nic_address(int device_id
)
1008 unsigned int address
, cpu_id
;
1009 gni_return_t status
;
1010 int i
, alps_dev_id
=-1,alps_address
=-1;
1011 char *token
, *p_ptr
;
1013 p_ptr
= getenv("PMI_GNI_DEV_ID");
1015 status
= GNI_CdmGetNicAddress(device_id
, &address
, &cpu_id
);
1017 GNI_RC_CHECK("GNI_CdmGetNicAddress", status
);
1019 while ((token
= strtok(p_ptr
,":")) != NULL
) {
1020 alps_dev_id
= atoi(token
);
1021 if (alps_dev_id
== device_id
) {
1026 CmiAssert(alps_dev_id
!= -1);
1027 p_ptr
= getenv("PMI_GNI_LOC_ADDR");
1028 CmiAssert(p_ptr
!= NULL
);
1030 while ((token
= strtok(p_ptr
,":")) != NULL
) {
1031 if (i
== alps_dev_id
) {
1032 alps_address
= atoi(token
);
1038 CmiAssert(alps_address
!= -1);
1039 address
= alps_address
;
1044 static uint8_t get_ptag(void)
1046 char *p_ptr
, *token
;
1049 p_ptr
= getenv("PMI_GNI_PTAG");
1050 CmiAssert(p_ptr
!= NULL
);
1051 token
= strtok(p_ptr
, ":");
1052 ptag
= (uint8_t)atoi(token
);
1057 static uint32_t get_cookie(void)
1060 char *p_ptr
, *token
;
1062 p_ptr
= getenv("PMI_GNI_COOKIE");
1063 CmiAssert(p_ptr
!= NULL
);
1064 token
= strtok(p_ptr
, ":");
1065 cookie
= (uint32_t)atoi(token
);
1072 /* directly mmap memory from hugetlbfs for large pages */
1074 #include <sys/stat.h>
1076 #include <sys/mman.h>
1077 #include <hugetlbfs.h>
1079 // size must be _tlbpagesize aligned
1080 void *my_get_huge_pages(size_t size
)
1084 mode_t mode
= S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
;
1087 snprintf(filename
, sizeof(filename
), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize
), getpid(), rand());
1088 fd
= open(filename
, O_RDWR
| O_CREAT
, mode
);
1090 CmiAbort("my_get_huge_pages: open filed");
1092 ptr
= mmap(NULL
, size
, PROT_READ
| PROT_WRITE
, MAP_PRIVATE
, fd
, 0);
1093 if (ptr
== MAP_FAILED
) ptr
= NULL
;
1094 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
/* Release a large-page region previously obtained from
 * my_get_huge_pages().  `size` must match the mapped length. */
void my_free_huge_pages(void *ptr, int size)
{
    //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
    if (munmap(ptr, size) == -1)
        CmiAbort("munmap failed in my_free_huge_pages");
}
1109 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1110 /* TODO: add any that are related */
1111 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1114 #include "machine-lrts.h"
1115 #include "machine-common-core.c"
1118 static int SendBufferMsg(SMSG_QUEUE
*queue
, SMSG_QUEUE
*urgent_queue
);
1119 static void SendRdmaMsg(PCQueue
);
1120 static void PumpNetworkSmsg();
1121 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh
, CmiNodeLock cq_lock
);
1123 static void PumpCqWriteTransactions();
1126 static void PumpRemoteTransactions(gni_cq_handle_t
);
1129 #if MACHINE_DEBUG_LOG
1130 FILE *debugLog
= NULL
;
1131 static CmiInt8 buffered_recv_msg
= 0;
1132 int lrts_smsg_success
= 0;
1133 int lrts_received_msg
= 0;
/* Debugging aid: walk every block header in a mempool and print its
 * size, in-use state, outstanding send/recv message counts, and GNI
 * memory handle.  block_next is stored as a byte offset from the pool
 * base, hence the (char*)mptr + block_next arithmetic. */
1136 static void sweep_mempool(mempool_type
*mptr
)
1139 block_header
*current
= &(mptr
->block_head
);
1141 printf("[n %d %d] sweep_mempool slot START.\n", myrank
, n
++);
1142 while( current
!= NULL
) {
1143 printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank
, n
++, current
, current
->size
, 1<<current
->used
, current
->msgs_in_send
, current
->msgs_in_recv
, current
->mem_hndl
.qword1
, current
->mem_hndl
.qword2
);
/* Offset 0 marks the end of the block chain. */
1144 current
= current
->block_next
?(block_header
*)((char*)mptr
+current
->block_next
):NULL
;
1146 printf("[n %d] sweep_mempool slot END.\n", myrank
);
/* Find the first block at/after *from that is idle (no in-flight send
 * or recv messages) and still registered, deregister it with the NIC,
 * and zero its handle so it can be re-registered later.
 * Returns GNI_RC_ERROR_RESOURCE when no such block exists.
 * NOTE(review): *from does not appear to be advanced here — presumably
 * updated on an elided line or by the caller; confirm against full
 * source. */
1150 static gni_return_t
deregisterMemory(mempool_type
*mptr
, block_header
**from
)
1152 block_header
*current
= *from
;
1154 //while(register_memory_size>= MAX_REG_MEM)
/* Skip blocks that are busy (messages in flight) or already
 * deregistered (zero handle). */
1156 while( current
!= NULL
&& ((current
->msgs_in_send
+current
->msgs_in_recv
)>0 || IsMemHndlZero(current
->mem_hndl
) ))
1157 current
= current
->block_next
?(block_header
*)((char*)mptr
+current
->block_next
):NULL
;
1160 if(current
== NULL
) return GNI_RC_ERROR_RESOURCE
;
1161 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &(GetMemHndlFromBlockHeader(current
)) , &omdh
, GetSizeFromBlockHeader(current
));
/* Mark the block as unregistered. */
1162 SetMemHndlZero(GetMemHndlFromBlockHeader(current
));
1164 return GNI_RC_SUCCESS
;
1168 static gni_return_t
registerFromMempool(mempool_type
*mptr
, void *blockaddr
, size_t size
, gni_mem_handle_t
*memhndl
, gni_cq_handle_t cqh
)
1170 gni_return_t status
= GNI_RC_SUCCESS
;
1171 //int size = GetMempoolsize(msg);
1172 //void *blockaddr = GetMempoolBlockPtr(msg);
1173 //gni_mem_handle_t *memhndl = &(GetMemHndl(msg));
1175 block_header
*current
= &(mptr
->block_head
);
1176 while(register_memory_size
>= MAX_REG_MEM
)
1178 status
= deregisterMemory(mptr
, ¤t
);
1179 if (status
!= GNI_RC_SUCCESS
) break;
1181 if(register_memory_size
>= MAX_REG_MEM
) return status
;
1183 MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
1186 MEMORY_REGISTER(onesided_hnd
, nic_hndl
, blockaddr
, size
, memhndl
, &omdh
, cqh
, status
);
1187 if(status
== GNI_RC_SUCCESS
)
1191 else if (status
== GNI_RC_INVALID_PARAM
|| status
== GNI_RC_PERMISSION_ERROR
)
1193 GNI_RC_CHECK("registerFromMempool", status
);
1197 status
= deregisterMemory(mptr
, ¤t
);
1198 if (status
!= GNI_RC_SUCCESS
) break;
/* Register [msg, msg+size) with the NIC.  First tries this rank's own
 * mempool; on failure, round-robins over every other rank's mempool on
 * the node (CpvAccessOther), evicting from whichever pool can make
 * room.  `rank` is static so successive calls resume the round-robin
 * where the last one stopped.  Returns GNI_RC_ERROR_RESOURCE when no
 * pool can accommodate the registration. */
1205 static gni_return_t
registerMemory(void *msg
, size_t size
, gni_mem_handle_t
*t
, gni_cq_handle_t cqh
)
1207 static int rank
= -1;
1209 gni_return_t status
;
1210 mempool_type
*mptr1
= CpvAccess(mempool
);//mempool_type*)GetMempoolPtr(msg);
1211 //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1214 status
= registerFromMempool(mptr1
, msg
, size
, t
, cqh
);
1215 if (status
== GNI_RC_SUCCESS
) return status
;
/* Own pool failed: try the mempools of the other PEs on this node
 * (node size + 1 covers the comm thread's pool as well). */
1217 for (i
=0; i
<CmiMyNodeSize()+1; i
++) {
1218 rank
= (rank
+1)%(CmiMyNodeSize()+1);
1219 mptr
= CpvAccessOther(mempool
, rank
);
1220 if (mptr
== mptr1
) continue;
1221 status
= registerFromMempool(mptr
, msg
, size
, t
, cqh
);
1222 if (status
== GNI_RC_SUCCESS
) return status
;
1225 return GNI_RC_ERROR_RESOURCE
;
/* Queue a message that could not be sent immediately (SMSG mailbox
 * full, connection pending, or over budget) onto the per-destination
 * send buffer of `queue`; SendBufferMsg drains it later.  The three
 * PCQueuePush paths correspond to different queue layouts selected by
 * elided #if branches (ONE_SEND_QUEUE / SMP locking / default).
 * NOTE(review): msg_tmp->msg and ->tag are presumably assigned on
 * elided lines — confirm against full source. */
1229 static void buffer_small_msgs(SMSG_QUEUE
*queue
, void *msg
, int size
, int destNode
, uint8_t tag
)
1232 MallocMsgList(msg_tmp
);
1233 msg_tmp
->destNode
= destNode
;
1234 msg_tmp
->size
= size
;
1238 SMSG_CREATION(msg_tmp
)
/* Layout 1: one global send queue. */
1242 PCQueuePush(queue
->sendMsgBuf
, (char*)msg_tmp
);
/* Layout 2: per-destination queues with locking; the destination's
 * slot is published to nonEmptyQueues the first time it goes
 * non-empty. */
1245 CmiLock(queue
->smsg_msglist_index
[destNode
].lock
);
1246 if(queue
->smsg_msglist_index
[destNode
].pushed
== 0)
1248 PCQueuePush(queue
->nonEmptyQueues
, (char*)&(queue
->smsg_msglist_index
[destNode
]));
1250 PCQueuePush(queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
, (char*)msg_tmp
);
1251 CmiUnlock(queue
->smsg_msglist_index
[destNode
].lock
);
/* Layout 3: per-destination queue, no lock needed. */
1253 PCQueuePush(queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
, (char*)msg_tmp
);
1258 buffered_smsg_counter
++;
1262 inline static void print_smsg_attr(gni_smsg_attr_t
*a
)
1264 printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a
->msg_type
, a
->mbox_maxcredit
, a
->buff_size
, a
->msg_buffer
, a
->mbox_offset
);
/* Establish an SMSG mailbox connection to destNode: carve a mailbox
 * slot out of the (possibly newly expanded and GNI-registered) slab,
 * record the local attributes, then FMA-PUT those attributes into the
 * slot reserved for this rank in destNode's connection table so the
 * peer can complete the handshake. */
1268 static void setup_smsg_connection(int destNode
)
1270 mdh_addr_list_t
*new_entry
= 0;
1271 gni_post_descriptor_t
*pd
;
1272 gni_smsg_attr_t
*smsg_attr
;
1273 gni_return_t status
= GNI_RC_NOT_DONE
;
1274 RDMA_REQUEST
*rdma_request_msg
;
/* Slab exhausted: allocate and register a fresh batch of
 * smsg_expand_slots mailboxes. */
1276 if(smsg_available_slot
== smsg_expand_slots
)
1278 new_entry
= (mdh_addr_list_t
*)malloc(sizeof(mdh_addr_list_t
));
1279 new_entry
->addr
= memalign(64, smsg_memlen
*smsg_expand_slots
);
1280 bzero(new_entry
->addr
, smsg_memlen
*smsg_expand_slots
);
1282 status
= GNI_MemRegister(nic_hndl
, (uint64_t)new_entry
->addr
,
1283 smsg_memlen
*smsg_expand_slots
, smsg_rx_cqh
,
1287 smsg_available_slot
= 0;
1288 new_entry
->next
= smsg_dynamic_list
;
1289 smsg_dynamic_list
= new_entry
;
/* Fill in the local mailbox attributes for this connection. */
1291 smsg_attr
= (gni_smsg_attr_t
*) malloc (sizeof(gni_smsg_attr_t
));
1292 smsg_attr
->msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
1293 smsg_attr
->mbox_maxcredit
= SMSG_MAX_CREDIT
;
1294 smsg_attr
->msg_maxsize
= SMSG_MAX_MSG
;
1295 smsg_attr
->mbox_offset
= smsg_available_slot
* smsg_memlen
;
1296 smsg_attr
->buff_size
= smsg_memlen
;
1297 smsg_attr
->msg_buffer
= smsg_dynamic_list
->addr
;
1298 smsg_attr
->mem_hndl
= smsg_dynamic_list
->mdh
;
1299 smsg_local_attr_vec
[destNode
] = smsg_attr
;
1300 smsg_available_slot
++;
/* FMA-PUT our attribute block into destNode's connection table at
 * index myrank.  (pd is presumably allocated on an elided line —
 * confirm against full source.) */
1302 pd
->type
= GNI_POST_FMA_PUT
;
1303 pd
->cq_mode
= GNI_CQMODE_GLOBAL_EVENT
;
1304 pd
->dlvr_mode
= GNI_DLVMODE_PERFORMANCE
;
1305 pd
->length
= sizeof(gni_smsg_attr_t
);
1306 pd
->local_addr
= (uint64_t) smsg_attr
;
1307 pd
->remote_addr
= (uint64_t)&((((gni_smsg_attr_t
*)(smsg_connection_vec
[destNode
].addr
))[myrank
]));
1308 pd
->remote_mem_hndl
= smsg_connection_vec
[destNode
].mdh
;
1309 pd
->src_cq_hndl
= 0;
1312 status
= GNI_PostFma(ep_hndl_array
[destNode
], pd
);
1313 print_smsg_attr(smsg_attr
);
/* FMA resources exhausted: buffer the descriptor for a later retry. */
1314 if(status
== GNI_RC_ERROR_RESOURCE
)
1316 MallocRdmaRequest(rdma_request_msg
);
1317 rdma_request_msg
->destNode
= destNode
;
1318 rdma_request_msg
->pd
= pd
;
1319 /* buffer this request */
1322 if(status
!= GNI_RC_SUCCESS
)
1323 printf("[%d=%d] send post FMA %s\n", myrank
, destNode
, gni_err_str
[status
]);
1325 printf("[%d=%d]OK send post FMA \n", myrank
, destNode
);
1329 /* useDynamicSMSG */
/* (useDynamicSMSG) Carve the next mailbox out of the current dynamic
 * mailbox slab and fill *local_smsg_attr to describe it.  When the
 * slab is exhausted, a new slab sized for avg_smsg_connection
 * mailboxes is allocated, zeroed, GNI-registered against smsg_rx_cqh,
 * and pushed on the front of mailbox_list. */
1331 static void alloc_smsg_attr( gni_smsg_attr_t
*local_smsg_attr
)
1333 gni_return_t status
= GNI_RC_NOT_DONE
;
/* Current slab full — grow. */
1335 if(mailbox_list
->offset
== mailbox_list
->size
)
1337 dynamic_smsg_mailbox_t
*new_mailbox_entry
;
1338 new_mailbox_entry
= (dynamic_smsg_mailbox_t
*)malloc(sizeof(dynamic_smsg_mailbox_t
));
1339 new_mailbox_entry
->size
= smsg_memlen
*avg_smsg_connection
;
1340 new_mailbox_entry
->mailbox_base
= malloc(new_mailbox_entry
->size
);
1341 bzero(new_mailbox_entry
->mailbox_base
, new_mailbox_entry
->size
);
1342 new_mailbox_entry
->offset
= 0;
1344 status
= GNI_MemRegister(nic_hndl
, (uint64_t)new_mailbox_entry
->mailbox_base
,
1345 new_mailbox_entry
->size
, smsg_rx_cqh
,
1348 &(new_mailbox_entry
->mem_hndl
));
1350 GNI_RC_CHECK("register", status
);
1351 new_mailbox_entry
->next
= mailbox_list
;
1352 mailbox_list
= new_mailbox_entry
;
/* Describe the next free mailbox slot and advance the slab cursor. */
1354 local_smsg_attr
->msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
1355 local_smsg_attr
->mbox_maxcredit
= SMSG_MAX_CREDIT
;
1356 local_smsg_attr
->msg_maxsize
= SMSG_MAX_MSG
;
1357 local_smsg_attr
->mbox_offset
= mailbox_list
->offset
;
1358 mailbox_list
->offset
+= smsg_memlen
;
1359 local_smsg_attr
->buff_size
= smsg_memlen
;
1360 local_smsg_attr
->msg_buffer
= mailbox_list
->mailbox_base
;
1361 local_smsg_attr
->mem_hndl
= mailbox_list
->mem_hndl
;
/* (useDynamicSMSG) Initiate a dynamic SMSG connection to destNode:
 * allocate local mailbox attributes and post a bound datagram
 * exchanging attribute blocks (id = destNode + mysize distinguishes
 * bound from unbound datagrams).  On GNI_RC_ERROR_RESOURCE the attempt
 * is rolled back (the peer may be connecting to us simultaneously);
 * otherwise smsg_connected_flag moves to 1 (pending) and
 * PumpDatagramConnection completes the handshake later. */
1366 static int connect_to(int destNode
)
1368 gni_return_t status
= GNI_RC_NOT_DONE
;
1369 CmiAssert(smsg_connected_flag
[destNode
] == 0);
1370 CmiAssert (smsg_attr_vector_local
[destNode
] == NULL
);
1371 smsg_attr_vector_local
[destNode
] = (gni_smsg_attr_t
*) malloc (sizeof(gni_smsg_attr_t
));
1372 alloc_smsg_attr(smsg_attr_vector_local
[destNode
]);
1373 smsg_attr_vector_remote
[destNode
] = (gni_smsg_attr_t
*) malloc (sizeof(gni_smsg_attr_t
));
1375 CMI_GNI_LOCK(global_gni_lock
)
1376 status
= GNI_EpPostDataWId (ep_hndl_array
[destNode
], smsg_attr_vector_local
[destNode
], sizeof(gni_smsg_attr_t
),smsg_attr_vector_remote
[destNode
] ,sizeof(gni_smsg_attr_t
), destNode
+mysize
);
1377 CMI_GNI_UNLOCK(global_gni_lock
)
1378 if (status
== GNI_RC_ERROR_RESOURCE
) {
1379 /* possibly destNode is making connection at the same time */
1380 free(smsg_attr_vector_local
[destNode
]);
1381 smsg_attr_vector_local
[destNode
] = NULL
;
1382 free(smsg_attr_vector_remote
[destNode
]);
1383 smsg_attr_vector_remote
[destNode
] = NULL
;
/* Return the mailbox slot consumed by alloc_smsg_attr above. */
1384 mailbox_list
->offset
-= smsg_memlen
;
1386 printf("[%d] send connect_to request to %d failed\n", myrank
, destNode
);
1390 GNI_RC_CHECK("GNI_Post", status
);
/* 1 == connection pending; PumpDatagramConnection sets 2 when done. */
1391 smsg_connected_flag
[destNode
] = 1;
1393 printf("[%d] send connect_to request to %d done\n", myrank
, destNode
);
/* Try to deliver one small message (or control message, per `tag`) to
 * destNode over SMSG.  With dynamic SMSG, a missing connection is
 * initiated and the message buffered until the handshake completes.
 * Messages are only sent directly when the per-destination buffer is
 * empty (to preserve ordering) or when we are draining that buffer
 * (inbuff==1).  On any failure with inbuff==0 the message is buffered
 * for a later retry.  Returns the GNI status of the send attempt. */
1399 static gni_return_t
send_smsg_message(SMSG_QUEUE
*queue
, int destNode
, void *msg
, int size
, uint8_t tag
, int inbuff
, MSG_LIST
*ptr
)
1401 unsigned int remote_address
;
1403 gni_return_t status
= GNI_RC_ERROR_RESOURCE
;
1404 gni_smsg_attr_t
*smsg_attr
;
1405 gni_post_descriptor_t
*pd
;
1406 gni_post_state_t post_state
;
/* Dynamic SMSG: 0 = not connected (start connecting), 1 = pending —
 * either way the message must be buffered, not sent. */
1409 if (useDynamicSMSG
) {
1410 switch (smsg_connected_flag
[destNode
]) {
1412 connect_to(destNode
); /* continue to case 1 */
1413 case 1: /* pending connection, do nothing */
1414 status
= GNI_RC_NOT_DONE
;
1416 buffer_small_msgs(queue
, msg
, size
, destNode
, tag
);
1420 #if ! ONE_SEND_QUEUE
/* Send directly only if nothing is already queued for this
 * destination (ordering) or we are the drain path. */
1421 if(PCQueueEmpty(queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
) || inbuff
==1)
1424 //CMI_GNI_LOCK(smsg_mailbox_lock)
1425 CMI_GNI_LOCK(default_tx_cq_lock
)
1426 #if CMK_SMP_TRACE_COMMTHREAD
/* For tracing, tag the payload (or the large-message source buffer)
 * with a comm-thread message id for the duration of the send. */
1428 int oldeventid
= -1;
1429 if(tag
== SMALL_DATA_TAG
|| tag
== LMSG_INIT_TAG
|| tag
== LMSG_OOB_INIT_TAG
)
1432 if ( tag
== SMALL_DATA_TAG
)
1433 real_data
= (char*)msg
;
1435 real_data
= (char*)(((CONTROL_MSG
*)msg
)->source_addr
);
1436 TRACE_COMM_GET_MSGID(real_data
, &oldpe
, &oldeventid
);
1437 TRACE_COMM_SET_COMM_MSGID(real_data
);
/* Large-message control msgs get an ack-pool slot on first send. */
1441 if (tag
== LMSG_INIT_TAG
|| tag
== LMSG_OOB_INIT_TAG
) {
1442 CONTROL_MSG
*control_msg_tmp
= (CONTROL_MSG
*)msg
;
1443 if (control_msg_tmp
->seq_id
== 0 && control_msg_tmp
->ack_index
== -1)
1444 control_msg_tmp
->ack_index
= IndexPool_getslot(&ackPool
, (void*)control_msg_tmp
->source_addr
, 1);
1446 // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1452 double creation_time
;
1454 creation_time
= CmiWallTimer();
1456 creation_time
= ptr
->creation_time
;
1459 status
= GNI_SmsgSendWTag(ep_hndl_array
[destNode
], NULL
, 0, msg
, size
, 0, tag
);
1460 #if CMK_SMP_TRACE_COMMTHREAD
1461 if (oldpe
!= -1) TRACE_COMM_SET_MSGID(real_data
, oldpe
, oldeventid
);
1463 CMI_GNI_UNLOCK(default_tx_cq_lock
)
1464 //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1465 if(status
== GNI_RC_SUCCESS
)
1468 SMSG_SENT_DONE(creation_time
,tag
)
1470 #if CMK_SMP_TRACE_COMMTHREAD
1471 if(tag
== SMALL_DATA_TAG
|| tag
== LMSG_INIT_TAG
|| tag
== LMSG_OOB_INIT_TAG
)
1473 TRACE_COMM_CREATION(CpvAccess(projTraceStart
), real_data
);
/* Could not send directly (buffer non-empty): report resource. */
1477 status
= GNI_RC_ERROR_RESOURCE
;
/* Failed and not already buffered: queue for retry. */
1479 if(status
!= GNI_RC_SUCCESS
&& inbuff
==0)
1480 buffer_small_msgs(queue
, msg
, size
, destNode
, tag
);
/* Build the control message that initiates a large-message transfer of
 * `msg` (`size` bytes).  seqno == 0 means a single-shot transfer out
 * of the mempool (source handle taken from the mempool block); seqno
 * >= 1 means segment `seqno` of a BIG_MSG pipeline, where each segment
 * is at most ONE_SEG bytes and is registered later.  Lengths are
 * rounded up with ALIGN64 because GNI GET requires aligned lengths. */
1485 static CONTROL_MSG
* construct_control_msg(int size
, char *msg
, uint8_t seqno
)
1487 /* construct a control message and send */
1488 CONTROL_MSG
*control_msg_tmp
;
1489 MallocControlMsg(control_msg_tmp
);
1490 control_msg_tmp
->source_addr
= (uint64_t)msg
;
1491 control_msg_tmp
->seq_id
= seqno
;
1492 control_msg_tmp
->total_length
= control_msg_tmp
->length
= ALIGN64(size
); //for GET 4 bytes aligned
/* -1 = no ack-pool slot assigned yet (see send_smsg_message). */
1494 control_msg_tmp
->ack_index
= -1;
1496 #if USE_LRTS_MEMPOOL
/* seqno==0: whole message lives in the mempool — reuse its handle. */
1499 control_msg_tmp
->source_mem_hndl
= GetMemHndl(msg
);
/* seqno>=1: BIG_MSG segment — handle filled in at send time; length
 * is clamped to one ONE_SEG-sized slice. */
1503 SetMemHndlZero(control_msg_tmp
->source_mem_hndl
);
1504 control_msg_tmp
->length
= size
- (seqno
-1)*ONE_SEG
;
1505 if (control_msg_tmp
->length
> ONE_SEG
) control_msg_tmp
->length
= ONE_SEG
;
/* Non-mempool build: always register lazily. */
1508 SetMemHndlZero(control_msg_tmp
->source_mem_hndl
);
1510 return control_msg_tmp
;
1513 #define BLOCKING_SEND_CONTROL 0
1515 // Large message, send control to receiver, receiver register memory and do a GET,
1516 // return 1 - send no success
/* Send the control message for a large transfer: make sure the source
 * region is registered with the NIC (whole mempool block for seq_id==0,
 * one ONE_SEG slice for BIG_MSG segments with seq_id>0), enforce the
 * MAX_BUFF_SEND flow-control budget, then ship the control message via
 * send_smsg_message so the receiver can GET the data.  On budget or
 * registration failure the control message is buffered for retry
 * (unless already in the buffer path).  Returns the resulting status. */
1517 inline static gni_return_t
send_large_messages(SMSG_QUEUE
*queue
, int destNode
, CONTROL_MSG
*control_msg_tmp
, int inbuff
, MSG_LIST
*smsg_ptr
, uint8_t lmsg_tag
)
1519 gni_return_t status
= GNI_RC_ERROR_NOMEM
;
1520 uint32_t vmdh_index
= -1;
1523 uint64_t source_addr
;
1527 size
= control_msg_tmp
->total_length
;
1528 source_addr
= control_msg_tmp
->source_addr
;
1529 register_size
= control_msg_tmp
->length
;
1531 #if USE_LRTS_MEMPOOL
/* --- single-shot transfer from the mempool --- */
1532 if( control_msg_tmp
->seq_id
== 0 ){
1533 #if BLOCKING_SEND_CONTROL
/* Optionally spin the progress engine until budget/registration allow. */
1534 if (inbuff
== 0 && IsMemHndlZero(GetMemHndl(source_addr
))) {
1535 while (IsMemHndlZero(GetMemHndl(source_addr
)) && buffered_send_msg
+ GetMempoolsize((void*)source_addr
) >= MAX_BUFF_SEND
)
1536 LrtsAdvanceCommunication(0);
1539 if(IsMemHndlZero(GetMemHndl(source_addr
))) //it is in mempool, it is possible to be de-registered by others
1541 msg
= (void*)source_addr
;
/* Over the send budget: buffer and bail out. */
1542 if(buffered_send_msg
+ GetMempoolsize(msg
) >= MAX_BUFF_SEND
)
1545 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, lmsg_tag
);
1546 return GNI_RC_ERROR_NOMEM
;
1548 //register the corresponding mempool
1549 status
= registerMemory(GetMempoolBlockPtr(msg
), GetMempoolsize(msg
), &(GetMemHndl(msg
)), rdma_rx_cqh
);
1550 if(status
== GNI_RC_SUCCESS
)
1552 control_msg_tmp
->source_mem_hndl
= GetMemHndl(source_addr
);
/* Block already registered — just copy its handle. */
1556 control_msg_tmp
->source_mem_hndl
= GetMemHndl(source_addr
);
1557 status
= GNI_RC_SUCCESS
;
/* Only the first in-flight message of a block charges its size to
 * the send budget. */
1559 if(NoMsgInSend(source_addr
))
1560 register_size
= GetMempoolsize((void*)(source_addr
));
/* --- BIG_MSG pipeline segment --- */
1563 }else if(control_msg_tmp
->seq_id
>0) // BIG_MSG
1565 int offset
= ONE_SEG
*(control_msg_tmp
->seq_id
-1);
1566 source_addr
+= offset
;
1567 size
= control_msg_tmp
->length
;
1568 #if BLOCKING_SEND_CONTROL
1569 if (inbuff
== 0 && IsMemHndlZero(control_msg_tmp
->source_mem_hndl
)) {
1570 while (IsMemHndlZero(control_msg_tmp
->source_mem_hndl
) && buffered_send_msg
+ size
>= MAX_BUFF_SEND
)
1571 LrtsAdvanceCommunication(0);
1574 if (IsMemHndlZero(control_msg_tmp
->source_mem_hndl
)) {
1575 if(buffered_send_msg
+ size
>= MAX_BUFF_SEND
)
1578 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, lmsg_tag
);
1579 return GNI_RC_ERROR_NOMEM
;
/* Register just this segment of the big message. */
1581 status
= registerMemory((void*)source_addr
, ALIGN64(size
), &(control_msg_tmp
->source_mem_hndl
), NULL
);
1582 if(status
== GNI_RC_SUCCESS
) buffered_send_msg
+= ALIGN64(size
);
1586 status
= GNI_RC_SUCCESS
;
1591 #if CMI_EXERT_SEND_LARGE_CAP
/* Optional cap on concurrently outstanding large sends. */
1592 if(SEND_large_pending
>= SEND_large_cap
)
1594 status
= GNI_RC_ERROR_NOMEM
;
/* Registration succeeded: ship the control message. */
1598 if(status
== GNI_RC_SUCCESS
)
1600 status
= send_smsg_message( queue
, destNode
, control_msg_tmp
, CONTROL_MSG_SIZE
, lmsg_tag
, inbuff
, smsg_ptr
);
1601 if(status
== GNI_RC_SUCCESS
)
1603 #if CMI_EXERT_SEND_LARGE_CAP
1604 SEND_large_pending
++;
1606 buffered_send_msg
+= register_size
;
1607 if(control_msg_tmp
->seq_id
== 0)
1609 IncreaseMsgInSend(source_addr
);
1611 FreeControlMsg(control_msg_tmp
);
1612 MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
, lmsg_tag
);
1614 status
= GNI_RC_ERROR_RESOURCE
;
1616 } else if (status
== GNI_RC_INVALID_PARAM
|| status
== GNI_RC_PERMISSION_ERROR
)
1618 CmiAbort("Memory registor for large msg\n");
/* Registration failed recoverably: buffer for retry. */
1621 status
= GNI_RC_ERROR_NOMEM
;
1623 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, lmsg_tag
);
/* --- non-mempool (USE_LRTS_MEMPOOL off) path --- */
1627 MEMORY_REGISTER(onesided_hnd
, nic_hndl
,msg
, ALIGN64(size
), &(control_msg_tmp
->source_mem_hndl
), &omdh
, NULL
, status
)
1628 if(status
== GNI_RC_SUCCESS
)
1630 status
= send_smsg_message(queue
, destNode
, control_msg_tmp
, CONTROL_MSG_SIZE
, lmsg_tag
, 0, NULL
);
1631 if(status
== GNI_RC_SUCCESS
)
1633 FreeControlMsg(control_msg_tmp
);
1635 } else if (status
== GNI_RC_INVALID_PARAM
|| status
== GNI_RC_PERMISSION_ERROR
)
1637 CmiAbort("Memory registor for large msg\n");
1640 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, lmsg_tag
);
/* Stamp the outgoing message envelope: record the total size and
 * (when checksumming is enabled) the checksum used by the receiver's
 * CMI_CHECK_CHECKSUM to detect corruption. */
1646 inline void LrtsPrepareEnvelope(char *msg
, int size
)
1648 CmiSetMsgSize(msg
, size
);
1649 CMI_SET_CHECKSUM(msg
, size
);
/* LRTS entry point for sending `msg` (`size` bytes) to destNode.
 * Chooses the transfer protocol by size: SMSG for <= SMSG_MAX_MSG,
 * a single control-message/GET for medium sizes (< BIG_MSG), and the
 * segmented BIG_MSG pipeline otherwise.  OUT_OF_BAND mode routes
 * through the OOB queue/tags.  In the comm-thread-send SMP build the
 * worker only buffers; the comm thread performs the actual sends. */
1652 CmiCommHandle
LrtsSendFunc(int destNode
, int size
, char *msg
, int mode
)
1654 gni_return_t status
= GNI_RC_SUCCESS
;
1656 CONTROL_MSG
*control_msg_tmp
;
1657 int oob
= ( mode
& OUT_OF_BAND
);
1660 MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode
, size
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
/* Queue/tag selection: OOB-aware build vs. plain build. */
1662 queue
= oob
? &smsg_oob_queue
: &smsg_queue
;
1663 tag
= oob
? LMSG_OOB_INIT_TAG
: LMSG_INIT_TAG
;
1665 queue
= &smsg_queue
;
1666 tag
= LMSG_INIT_TAG
;
1669 LrtsPrepareEnvelope(msg
, size
);
1672 printf("LrtsSendFn %d==>%d, size=%d\n", myrank
, destNode
, size
);
/* Comm-thread-send build: worker only buffers. */
1676 if(size
<= SMSG_MAX_MSG
)
1677 buffer_small_msgs(queue
, msg
, size
, destNode
, SMALL_DATA_TAG
);
1678 else if (size
< BIG_MSG
) {
1679 control_msg_tmp
= construct_control_msg(size
, msg
, 0);
1680 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, tag
);
/* BIG_MSG: seed the pipeline with segment 1. */
1683 CmiSetMsgSeq(msg
, 0);
1684 control_msg_tmp
= construct_control_msg(size
, msg
, 1);
1685 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, tag
);
1687 #else //non-smp, smp(worker sending)
1688 if(size
<= SMSG_MAX_MSG
)
1690 if (GNI_RC_SUCCESS
== send_smsg_message(queue
, destNode
, msg
, size
, SMALL_DATA_TAG
, 0, NULL
))
1693 else if (size
< BIG_MSG
) {
1694 control_msg_tmp
= construct_control_msg(size
, msg
, 0);
1695 send_large_messages(queue
, destNode
, control_msg_tmp
, 0, NULL
, tag
);
1698 #if USE_LRTS_MEMPOOL
1699 CmiSetMsgSeq(msg
, 0);
1700 control_msg_tmp
= construct_control_msg(size
, msg
, 1);
1701 send_large_messages(queue
, destNode
, control_msg_tmp
, 0, NULL
, tag
);
/* Without the mempool there is no pipeline: send in one shot. */
1703 control_msg_tmp
= construct_control_msg(size
, msg
, 0);
1704 send_large_messages(queue
, destNode
, control_msg_tmp
, 0, NULL
, tag
);
/* Synchronously send `msg` to each PE in `pes`.  With
 * CMK_BROADCAST_USE_CMIREFERENCE the last destination can take
 * ownership via CmiSyncSendAndFree (self-sends are copied); otherwise
 * every destination gets a copy and the caller keeps ownership. */
1711 void LrtsSyncListSendFn(int npes
, int *pes
, int len
, char *msg
)
1714 #if CMK_BROADCAST_USE_CMIREFERENCE
1715 for(i
=0;i
<npes
;i
++) {
/* A self-send must copy; the free variant would recycle msg. */
1716 if (pes
[i
] == CmiMyPe())
1717 CmiSyncSend(pes
[i
], len
, msg
);
1720 CmiSyncSendAndFree(pes
[i
], len
, msg
);
/* Reference counting off: plain copy to every destination. */
1724 for(i
=0;i
<npes
;i
++) {
1725 CmiSyncSend(pes
[i
], len
, msg
);
1730 CmiCommHandle
LrtsAsyncListSendFn(int npes
, int *pes
, int len
, char *msg
)
1732 /* A better asynchronous implementation may be wanted, but at least it works */
1733 CmiSyncListSendFn(npes
, pes
, len
, msg
);
1734 return (CmiCommHandle
) 0;
/* List send that transfers ownership of `msg` to the machine layer.
 * Special cases: a single destination frees directly; persistent-
 * channel messages (CMK_PERSISTENT_COMM) are sent per-PE so the last
 * one frees; otherwise delegate to the list-send path and free via the
 * final destination. */
1737 void LrtsFreeListSendFn(int npes
, int *pes
, int len
, char *msg
)
/* Single destination: hand the buffer straight over. */
1740 CmiSyncSendAndFree(pes
[0], len
, msg
);
1743 #if CMK_PERSISTENT_COMM
/* Persistent channels available and message qualifies: per-PE sends,
 * last one frees. */
1744 if (CpvAccess(phs
) && len
> PERSIST_MIN_SIZE
1746 && IS_PERSISTENT_MEMORY(msg
)
1750 for(i
=0;i
<npes
;i
++) {
1751 if (pes
[i
] == CmiMyPe())
1752 CmiSyncSend(pes
[i
], len
, msg
);
1755 CmiSyncSendAndFree(pes
[i
], len
, msg
);
1763 #if CMK_BROADCAST_USE_CMIREFERENCE
/* Reference-counting build: the list path already frees msg. */
1764 CmiSyncListSendFn(npes
, pes
, len
, msg
);
/* Copy to all but the last destination, which takes ownership. */
1768 for(i
=0;i
<npes
-1;i
++) {
1769 CmiSyncSend(pes
[i
], len
, msg
);
1772 CmiSyncSendAndFree(pes
[npes
-1], len
, msg
);
1778 static void PumpDatagramConnection();
1779 static int event_SetupConnect
= 111;
1780 static int event_PumpSmsg
= 222 ;
1781 static int event_PumpTransaction
= 333;
1782 static int event_PumpRdmaTransaction
= 444;
1783 static int event_SendBufferSmsg
= 484;
1784 static int event_SendFmaRdmaMsg
= 555;
1785 static int event_AdvanceCommunication
= 666;
/* Register the comm-thread user events with the Projections tracing
 * framework so that the event_* ids above render with readable names.
 * No-op unless user-event tracing is compiled in. */
1787 static void registerUserTraceEvents() {
1788 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1789 event_SetupConnect
= traceRegisterUserEvent("setting up connections", -1 );
1790 event_PumpSmsg
= traceRegisterUserEvent("Pump network small msgs", -1);
1791 event_PumpTransaction
= traceRegisterUserEvent("Pump FMA/RDMA local transaction" , -1);
1792 event_PumpRdmaTransaction
= traceRegisterUserEvent("Pump RDMA remote event" , -1);
1793 event_SendBufferSmsg
= traceRegisterUserEvent("Sending buffered small msgs", -1);
1794 event_SendFmaRdmaMsg
= traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1795 event_AdvanceCommunication
= traceRegisterUserEvent("Worker thread in sending/receiving", -1);
/* Deadlock detector, invoked when CheckProgress sees no SMSG traffic.
 * All ranks allgather their send+recv counters; if the global sum is
 * unchanged across two consecutive checks the network is considered
 * genuinely stalled (usually a registered-memory limit problem) and
 * the job aborts with tuning advice.  State is static so successive
 * invocations can compare against the previous sum. */
1799 static void ProcessDeadlock()
1801 static CmiUInt8
*ptr
= NULL
;
1802 static CmiUInt8 last
= 0, mysum
, sum
;
1803 static int count
= 0;
1804 gni_return_t status
;
1807 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1808 //sweep_mempool(CpvAccess(mempool));
/* Lazily allocate the allgather receive buffer (one slot per rank). */
1809 if (ptr
== NULL
) ptr
= (CmiUInt8
*)malloc(mysize
* sizeof(CmiUInt8
));
1810 mysum
= smsg_send_count
+ smsg_recv_count
;
1811 MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d) (%d,%d)(%d)\n", buffered_send_msg
, register_memory_size
, last
, sum
, count
);
1812 status
= PMI_Allgather(&mysum
,ptr
,sizeof(CmiUInt8
));
1813 GNI_RC_CHECK("PMI_Allgather", status
);
1815 for (i
=0; i
<mysize
; i
++) sum
+= ptr
[i
];
/* Unchanged global counter since the last check => suspected hang. */
1816 if (last
== 0 || sum
== last
)
1821 MACHSTATE5(9,"Progress Deadlock (%d,%d) (%d,%d)(%d)\n", buffered_send_msg
, register_memory_size
, last
, sum
, count
);
1823 /* detected twice, it is a real deadlock */
1825 CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low. Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM
, MAX_BUFF_SEND
);
1826 CmiAbort("Fatal> Deadlock detected.");
/* Periodic (2-minute) watchdog: if neither the SMSG send nor the recv
 * counter has advanced since the last check, run the deadlock detector
 * (unless disabled via CHARM_UGNI_NO_DEADLOCK_CHECK); then snapshot
 * the counters for the next round. */
1833 static void CheckProgress()
1835 if (smsg_send_count
== last_smsg_send_count
&&
1836 smsg_recv_count
== last_smsg_recv_count
)
1840 if (_detected_hang
) ProcessDeadlock();
1845 //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count);
1846 last_smsg_send_count
= smsg_send_count
;
1847 last_smsg_recv_count
= smsg_recv_count
;
/* Once topology information is available, derive the per-process
 * registered-memory budget: total registrable memory divided by the
 * number of processes on the physical node, with the send budget set
 * to half of that.  Aborts early if the static SMSG mailboxes plus the
 * mempool already exceed the send budget (a known hang risk). */
1852 static void set_limit()
1854 //if (!user_set_flag && CmiMyRank() == 0) {
1855 if (CmiMyRank() == 0) {
1856 int mynode
= CmiPhysicalNodeID(CmiMyPe());
1857 int numpes
= CmiNumPesOnPhysicalNode(mynode
);
1858 int numprocesses
= numpes
/ CmiMyNodeSize();
1859 MAX_REG_MEM
= _totalmem
/ numprocesses
;
1860 MAX_BUFF_SEND
= MAX_REG_MEM
/ 2;
1862 printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM
/1024.0/1024, MAX_BUFF_SEND
/1024./1024);
/* Sanity check: static mailbox memory must fit under the budget. */
1863 if(CmiMyPe() == 0 && (smsg_memlen
*mysize
+ _expand_mem
> MAX_BUFF_SEND
|| smsg_memlen
*mysize
+ _mempool_size
> MAX_BUFF_SEND
))
1865 printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1866 CmiAbort("memory registration\n");
/* Post-initialization hook: wire up tracing user events, idle-time
 * progress callbacks, the periodic dynamic-SMSG connection pump, the
 * 2-minute progress watchdog, and the topology-dependent memory-limit
 * setup.
 * NOTE(review): `®isterUserTraceEvents` below is extraction mojibake
 * for `&registerUserTraceEvents` (HTML entity &reg;) — restore the
 * original byte sequence when repairing this file. */
1871 void LrtsPostCommonInit(int everReturn
)
1876 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1877 CpvInitialize(double, projTraceStart
);
1878 /* only PE 0 needs to care about registration (to generate sts file). */
1879 //if (CmiMyPe() == 0)
1881 registerMachineUserEventsFunction(®isterUserTraceEvents
);
/* Drive communication progress while the scheduler is idle. */
1886 CmiIdleState
*s
=CmiNotifyGetState();
1887 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE
,(CcdVoidFn
)CmiNotifyBeginIdle
,(void *)s
);
1888 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE
,(CcdVoidFn
)CmiNotifyStillIdle
,(void *)s
);
1890 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE
,(CcdVoidFn
)CmiNotifyStillIdle
,NULL
);
/* Dynamic SMSG handshakes are completed from a periodic callback. */
1892 CcdCallOnConditionKeep(CcdPERIODIC_10ms
, (CcdVoidFn
) PumpDatagramConnection
, NULL
);
1898 if (CmiMyRank() == 0)
1900 CcdCallOnConditionKeep(CcdPERIODIC_2minute
, (CcdVoidFn
) CheckProgress
, NULL
);
/* Memory limits need topology info, so defer until it is available. */
1904 CcdCallOnCondition(CcdTOPOLOGY_AVAIL
, (CcdVoidFn
)set_limit
, NULL
);
1908 /* this is called by worker thread */
/* Called by worker threads when they run out of local work: advance
 * network progress.  In the MULTI_THREAD_SEND build only every sixth
 * rank (offset 3) participates, to limit lock contention; the work is
 * optionally bracketed with a trace event. */
1909 void LrtsPostNonLocal()
1913 #if CMK_SMP_TRACE_COMMTHREAD
1914 double startT
, endT
;
1917 #if MULTI_THREAD_SEND
1918 if(mysize
== 1) return;
/* Only one worker in six polls the network in this build. */
1920 if (CmiMyRank() % 6 != 3) return;
1922 #if CMK_SMP_TRACE_COMMTHREAD
1924 startT
= CmiWallTimer();
1927 CmiMachineProgressImpl();
1929 #if CMK_SMP_TRACE_COMMTHREAD
1930 endT
= CmiWallTimer();
1931 traceUserBracketEvent(event_AdvanceCommunication
, startT
, endT
);
1939 /* Network progress function is used to poll the network when for
1940 messages. This flushes receive buffers on some implementations*/
1941 #if CMK_MACHINE_PROGRESS_DEFINED
/* Poll the network and drain all pending work: incoming SMSGs,
 * local/remote transaction completions, buffered small messages, and
 * buffered FMA/RDMA requests.  With CMK_WORKER_SINGLE_TASK each worker
 * rank (mod 6) handles exactly one of the six task classes, spreading
 * the progress work across threads. */
1942 void CmiMachineProgressImpl() {
1943 #if ! CMK_SMP || MULTI_THREAD_SEND
/* High-priority/OOB work is drained first. */
1945 STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
1946 SEND_OOB_SMSG(smsg_oob_queue
)
1947 PUMP_REMOTE_HIGHPRIORITY
1948 PUMP_LOCAL_HIGHPRIORITY
1949 POST_HIGHPRIORITY_RDMA
/* Task 0: receive incoming small messages. */
1952 #if CMK_WORKER_SINGLE_TASK
1953 if (CmiMyRank() % 6 == 0)
/* Task 1: completions on the default transmit CQ. */
1957 #if CMK_WORKER_SINGLE_TASK
1958 if (CmiMyRank() % 6 == 1)
1960 PumpLocalTransactions(default_tx_cqh
, default_tx_cq_lock
);
/* Task 2: completions on the RDMA transmit CQ. */
1962 #if CMK_WORKER_SINGLE_TASK
1963 if (CmiMyRank() % 6 == 2)
1965 PumpLocalTransactions(rdma_tx_cqh
, rdma_tx_cq_lock
);
/* Task 3: remote-event completions (REMOTE_EVENT builds). */
1968 #if CMK_WORKER_SINGLE_TASK
1969 if (CmiMyRank() % 6 == 3)
1971 PumpRemoteTransactions(rdma_rx_cqh
); // rdma_rx_cqh
/* Task 4: flush buffered small messages (OOB queue first). */
1974 #if CMK_WORKER_SINGLE_TASK
1975 if (CmiMyRank() % 6 == 4)
1979 SendBufferMsg(&smsg_oob_queue
, NULL
);
1980 SendBufferMsg(&smsg_queue
, &smsg_oob_queue
);
1982 SendBufferMsg(&smsg_queue
, NULL
);
/* Task 5: post buffered FMA/RDMA transactions. */
1986 #if CMK_WORKER_SINGLE_TASK
1987 if (CmiMyRank() % 6 == 5)
1990 STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf
));
1992 STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
2001 /* useDynamicSMSG */
/* (useDynamicSMSG) Complete pending SMSG connection handshakes.
 * Datagram ids >= mysize identify bound endpoints (connections we
 * initiated via connect_to); smaller ids arrive on the unbound
 * endpoint from peers connecting to us.  Each completed exchange is
 * finished with GNI_SmsgInit and the peer's connected flag is set to 2
 * (fully connected); the unbound endpoint is then re-armed. */
2002 static void PumpDatagramConnection()
2004 uint32_t remote_address
;
2006 gni_return_t status
;
2007 gni_post_state_t post_state
;
2008 uint64_t datagram_id
;
2011 while ((status
= GNI_PostDataProbeById(nic_hndl
, &datagram_id
)) == GNI_RC_SUCCESS
)
2013 if (datagram_id
>= mysize
) { /* bound endpoint */
2014 int pe
= datagram_id
- mysize
;
2015 CMI_GNI_LOCK(global_gni_lock
)
2016 status
= GNI_EpPostDataTestById( ep_hndl_array
[pe
], datagram_id
, &post_state
, &remote_address
, &remote_id
);
2017 CMI_GNI_UNLOCK(global_gni_lock
)
2018 if(status
== GNI_RC_SUCCESS
&& post_state
== GNI_POST_COMPLETED
)
2020 CmiAssert(remote_id
== pe
);
/* Attribute exchange done — initialize the mailbox pair. */
2021 status
= GNI_SmsgInit(ep_hndl_array
[pe
], smsg_attr_vector_local
[pe
], smsg_attr_vector_remote
[pe
]);
2022 GNI_RC_CHECK("Dynamic SMSG Init", status
);
2024 printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank
, myrank
, pe
);
2026 CmiAssert(smsg_connected_flag
[pe
] == 1);
2027 smsg_connected_flag
[pe
] = 2;
2030 else { /* unbound ep */
2031 status
= GNI_EpPostDataTestById( ep_hndl_unbound
, datagram_id
, &post_state
, &remote_address
, &remote_id
);
2032 if(status
== GNI_RC_SUCCESS
&& post_state
== GNI_POST_COMPLETED
)
2034 CmiAssert(remote_id
<mysize
);
2035 CmiAssert(smsg_connected_flag
[remote_id
] <= 0);
/* Peer-initiated connection: use the attrs exchanged through the
 * unbound datagram. */
2036 status
= GNI_SmsgInit(ep_hndl_array
[remote_id
], &send_smsg_attr
, &recv_smsg_attr
);
2037 GNI_RC_CHECK("Dynamic SMSG Init", status
);
2039 printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank
, myrank
, remote_id
);
2041 smsg_connected_flag
[remote_id
] = 2;
/* Re-arm the unbound endpoint for the next incoming connection. */
2043 alloc_smsg_attr(&send_smsg_attr
);
2044 status
= GNI_EpPostDataWId (ep_hndl_unbound
, &send_smsg_attr
, SMSG_ATTR_SIZE
, &recv_smsg_attr
, SMSG_ATTR_SIZE
, myrank
);
2045 GNI_RC_CHECK("post unbound datagram", status
);
2051 /* pooling CQ to receive network message */
/* Poll the network for RDMA-related CQ events.  Only the local
 * declarations are visible in this view; the polling body is on
 * elided lines — confirm against full source. */
2052 static void PumpNetworkRdmaMsgs()
2054 gni_cq_entry_t event_data
;
2055 gni_return_t status
;
/* Buffer an RDMA post that could not be issued immediately (e.g. FMA
 * resources exhausted): wrap the post descriptor, destination, and ack
 * slot in an RDMA_REQUEST and push it on `bufferqueue` for SendRdmaMsg
 * to retry later. */
2060 static void bufferRdmaMsg(PCQueue bufferqueue
, int inst_id
, gni_post_descriptor_t
*pd
, int ack_index
)
2062 RDMA_REQUEST
*rdma_request_msg
;
2063 MallocRdmaRequest(rdma_request_msg
);
2064 rdma_request_msg
->destNode
= inst_id
;
2065 rdma_request_msg
->pd
= pd
;
/* ack_index is only meaningful in REMOTE_EVENT builds. */
2067 rdma_request_msg
->ack_index
= ack_index
;
2069 PCQueuePush(bufferqueue
, (char*)rdma_request_msg
);
2072 static void getLargeMsgRequest(void* header
, uint64_t inst_id
, uint8_t tag
, PCQueue
);
2074 static void PumpNetworkSmsg()
2077 gni_cq_entry_t event_data
;
2078 gni_return_t status
;
2083 gni_mem_handle_t msg_mem_hndl
;
2084 gni_smsg_attr_t
*smsg_attr
;
2085 gni_smsg_attr_t
*remote_smsg_attr
;
2087 CONTROL_MSG
*control_msg_tmp
, *header_tmp
;
2088 uint64_t source_addr
;
2089 SMSG_QUEUE
*queue
= &smsg_queue
;
2092 cmidirectMsg
*direct_msg
;
2094 #if CMI_PUMPNETWORKSMSG_CAP
2096 while(recv_cnt
< PumpNetworkSmsg_cap
) {
2100 CMI_GNI_LOCK(smsg_rx_cq_lock
)
2101 status
=GNI_CqGetEvent(smsg_rx_cqh
, &event_data
);
2102 CMI_GNI_UNLOCK(smsg_rx_cq_lock
)
2103 if(status
!= GNI_RC_SUCCESS
) break;
2105 inst_id
= GNI_CQ_GET_INST_ID(event_data
);
2107 inst_id
= GET_RANK(inst_id
); /* important */
2109 // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2111 printf("[%d] %d PumpNetworkMsgs is received from PE: %d, status=%s\n", myrank
, CmiMyRank(), inst_id
, gni_err_str
[status
]);
2113 if (useDynamicSMSG
) {
2114 /* subtle: smsg may come before connection is setup */
2115 while (smsg_connected_flag
[inst_id
] != 2)
2116 PumpDatagramConnection();
2118 msg_tag
= GNI_SMSG_ANY_TAG
;
2120 CMI_GNI_LOCK(smsg_mailbox_lock
)
2121 status
= GNI_SmsgGetNextWTag(ep_hndl_array
[inst_id
], &header
, &msg_tag
);
2122 if (status
!= GNI_RC_SUCCESS
)
2124 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
2127 #if CMI_PUMPNETWORKSMSG_CAP
2131 printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank
, inst_id
, msg_tag
);
2133 /* copy msg out and then put into queue (small message) */
2135 case SMALL_DATA_TAG
:
2138 msg_nbytes
= CmiGetMsgSize(header
);
2139 msg_data
= CmiAlloc(msg_nbytes
);
2140 memcpy(msg_data
, (char*)header
, msg_nbytes
);
2141 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2142 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
2143 TRACE_COMM_CREATION(EVENT_TIME(), msg_data
);
2144 CMI_CHECK_CHECKSUM(msg_data
, msg_nbytes
);
2145 handleOneRecvedMsg(msg_nbytes
, msg_data
);
2149 case LMSG_OOB_INIT_TAG
:
2151 tmp_queue
= (msg_tag
== LMSG_INIT_TAG
)? sendRdmaBuf
: sendHighPriorBuf
;
2152 #if MULTI_THREAD_SEND
2153 MallocControlMsg(control_msg_tmp
);
2154 memcpy(control_msg_tmp
, header
, CONTROL_MSG_SIZE
);
2155 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2156 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
2157 getLargeMsgRequest(control_msg_tmp
, inst_id
, msg_tag
, tmp_queue
);
2158 FreeControlMsg(control_msg_tmp
);
2160 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
2161 getLargeMsgRequest(header
, inst_id
, msg_tag
, tmp_queue
);
2162 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2166 #if !REMOTE_EVENT && !CQWRITE
2167 case ACK_TAG
: //msg fit into mempool
2169 /* Get is done, release message . Now put is not used yet*/
2170 void *msg
= (void*)(((ACK_MSG
*)header
)->source_addr
);
2171 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2172 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
2173 #if ! USE_LRTS_MEMPOOL
2174 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &(((ACK_MSG
*)header
)->source_mem_hndl
), &omdh
, ((ACK_MSG
*)header
)->length
);
2176 DecreaseMsgInSend(msg
);
2178 if(NoMsgInSend(msg
))
2179 buffered_send_msg
-= GetMempoolsize(msg
);
2180 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
, msg_tag
);
2182 #if CMI_EXERT_SEND_LARGE_CAP
2183 SEND_large_pending
--;
2188 case BIG_MSG_TAG
: //big msg, de-register, transfer next seg
2190 #if MULTI_THREAD_SEND
2191 MallocControlMsg(header_tmp
);
2192 memcpy(header_tmp
, header
, CONTROL_MSG_SIZE
);
2193 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2195 header_tmp
= (CONTROL_MSG
*) header
;
2197 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
2198 #if CMI_EXERT_SEND_LARGE_CAP
2199 SEND_large_pending
--;
2201 void *msg
= (void*)(header_tmp
->source_addr
);
2202 int cur_seq
= CmiGetMsgSeq(msg
);
2203 int offset
= ONE_SEG
*(cur_seq
+1);
2204 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &(header_tmp
->source_mem_hndl
), &omdh
, header_tmp
->length
);
2205 buffered_send_msg
-= header_tmp
->length
;
2206 int remain_size
= CmiGetMsgSize(msg
) - header_tmp
->length
;
2207 if (remain_size
< 0) remain_size
= 0;
2208 CmiSetMsgSize(msg
, remain_size
);
2209 if(remain_size
<= 0) //transaction done
2212 }else if (header_tmp
->total_length
> offset
)
2214 CmiSetMsgSeq(msg
, cur_seq
+1);
2215 control_msg_tmp
= construct_control_msg(header_tmp
->total_length
, msg
, cur_seq
+1+1);
2216 control_msg_tmp
->dest_addr
= header_tmp
->dest_addr
;
2218 send_large_messages( queue
, inst_id
, control_msg_tmp
, 0, NULL
, LMSG_INIT_TAG
);
2220 if (header_tmp
->seq_id
== 1) {
2222 for (i
=1; i
<BIG_MSG_PIPELINE
; i
++) {
2223 int seq
= cur_seq
+i
+2;
2224 CmiSetMsgSeq(msg
, seq
-1);
2225 control_msg_tmp
= construct_control_msg(header_tmp
->total_length
, (char *)msg
, seq
);
2226 control_msg_tmp
->dest_addr
= header_tmp
->dest_addr
;
2227 send_large_messages( queue
, inst_id
, control_msg_tmp
, 0, NULL
, LMSG_INIT_TAG
);
2228 if (header_tmp
->total_length
<= ONE_SEG
*seq
) break;
2232 #if MULTI_THREAD_SEND
2233 FreeControlMsg(header_tmp
);
2235 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2239 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
2240 case PUT_DONE_TAG
: { //persistent message
2241 void *msg
= (void *)(((CONTROL_MSG
*) header
)->source_addr
);
2242 int size
= ((CONTROL_MSG
*) header
)->length
;
2243 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2244 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
2246 CMI_CHECK_CHECKSUM(msg
, size
);
2247 handleOneRecvedMsg(size
, msg
);
2249 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank
, size
);
2255 case DIRECT_PUT_DONE_TAG
: //cmi direct
2256 //create a trigger message
2257 direct_msg
= (cmidirectMsg
*)CmiAlloc(sizeof(cmidirectMsg
));
2258 direct_msg
->handler
= ((CMK_DIRECT_HEADER
*)header
)->handler_addr
;
2259 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2260 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
2261 CmiSetHandler(direct_msg
, CpvAccess(CmiHandleDirectIdx
));
2262 CmiPushPE(((CmiDirectUserHandle
*)direct_msg
->handler
)->remoteRank
, direct_msg
);
2263 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2267 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2268 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
2269 printf("weird tag problem\n");
2270 CmiAbort("Unknown tag\n");
2273 printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank
, inst_id
, msg_tag
);
2276 msg_tag
= GNI_SMSG_ANY_TAG
;
2277 } //endwhile GNI_SmsgGetNextWTag
2278 } //end while GetEvent
2279 if(status
== GNI_RC_ERROR_RESOURCE
)
2281 printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");
2282 GNI_RC_CHECK("Smsg_rx_cq full", status
);
2286 static void printDesc(gni_post_descriptor_t
*pd
)
2288 printf(" Descriptor (%p===>%p)(%d)\n", pd
->local_addr
, pd
->remote_addr
, pd
->length
);
/*
 * sendCqWrite(): post a GNI_POST_CQWRITE descriptor to destNode's endpoint.
 * No message payload is transferred; the post only raises a completion-queue
 * event on the remote node carrying `data` as the event value, with mem_hndl
 * as the remote memory handle.  Used in this layer to signal ACK/completion
 * of one-sided transfers without sending an SMSG.
 *
 * NOTE(review): this extract shows pd declared but never allocated before its
 * fields are assigned -- the allocating line (a MallocPostDesc-style call in
 * this layer) appears to have been lost from this view; confirm against the
 * full file.
 */
2292 static void sendCqWrite(int destNode
, uint64_t data
, gni_mem_handle_t mem_hndl
)
2294 gni_post_descriptor_t
*pd
;
2295 gni_return_t status
= GNI_RC_SUCCESS
;
/* SILENT: suppress the local completion event for this post. */
2298 pd
->type
= GNI_POST_CQWRITE
;
2299 pd
->cq_mode
= GNI_CQMODE_SILENT
;
2300 //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2301 pd
->dlvr_mode
= GNI_DLVMODE_PERFORMANCE
;
/* The remote CQ event delivers this 64-bit value to the peer. */
2302 pd
->cqwrite_value
= data
;
2303 pd
->remote_mem_hndl
= mem_hndl
;
2304 status
= GNI_PostCqWrite(ep_hndl_array
[destNode
], pd
);
/* Abort with a descriptive message if the post failed. */
2305 GNI_RC_CHECK("GNI_PostCqWrite", status
);
/*
 * registerMessage(): ensure `msg` has a registered memory handle, writing it
 * into *memh.
 *  - If *memh is already non-zero, the message is considered registered and
 *    GNI_RC_SUCCESS is returned immediately.
 *  - Persistent-communication buffers (IS_PERSISTENT_MEMORY) are always
 *    pre-registered: their handle is fetched with GetMemHndl().
 *  - For mempool-backed messages, the whole mempool block is registered
 *    (GetMempoolBlockPtr/GetMempoolsize) against rdma_rx_cqh and the block's
 *    handle is reused for the message.
 *  - Otherwise (big message outside the mempool, or a CmiDirect buffer) the
 *    message region itself is registered with registerMemory().
 * Returns the gni_return_t status of the registration attempt.
 *
 * NOTE(review): several interior lines (branch conditions / #else / #endif /
 * closing braces) are missing from this extract; the control flow annotated
 * above is inferred from the visible fragments -- confirm against the full
 * file.
 */
2309 // register memory for a message
2310 // return mem handle
2311 static gni_return_t
registerMessage(void *msg
, int size
, int seqno
, gni_mem_handle_t
*memh
)
2313 gni_return_t status
= GNI_RC_SUCCESS
;
/* Already registered: nothing to do. */
2315 if (!IsMemHndlZero(*memh
)) return GNI_RC_SUCCESS
;
2317 #if CMK_PERSISTENT_COMM
2318 // persistent message is always registered
2319 // BIG_MSG small pieces do not have malloc chunk header
2320 if (IS_PERSISTENT_MEMORY(msg
)) {
2321 *memh
= GetMemHndl(msg
);
2322 return GNI_RC_SUCCESS
;
2326 #if CMK_PERSISTENT_COMM
2327 || seqno
== PERSIST_SEQ
/* Mempool block not yet registered: register the whole block once. */
2331 if(IsMemHndlZero((GetMemHndl(msg
))))
2334 status
= registerMemory(GetMempoolBlockPtr(msg
), GetMempoolsize(msg
), &(GetMemHndl(msg
)), rdma_rx_cqh
);
2335 if(status
== GNI_RC_SUCCESS
)
2336 *memh
= GetMemHndl(msg
);
/* Block already registered: reuse its handle for this message. */
2339 *memh
= GetMemHndl(msg
);
2343 //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2344 status
= registerMemory(msg
, size
, memh
, NULL
);
2349 // for BIG_MSG called on receiver side for receiving control message
2351 static void getLargeMsgRequest(void* header
, uint64_t inst_id
, uint8_t tag
, PCQueue bufferRdmaQueue
)
2353 #if USE_LRTS_MEMPOOL
2354 CONTROL_MSG
*request_msg
;
2355 gni_return_t status
= GNI_RC_SUCCESS
;
2357 gni_post_descriptor_t
*pd
;
2358 gni_mem_handle_t msg_mem_hndl
;
2359 int size
, transaction_size
, offset
= 0;
2360 size_t register_size
= 0;
2362 // initial a get to transfer data from the sender side */
2363 request_msg
= (CONTROL_MSG
*) header
;
2364 size
= request_msg
->total_length
;
2365 MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
2368 pd
->sync_flag_addr
= 1000000 * CmiWallTimer(); //microsecond
2370 if(request_msg
->seq_id
< 2) {
2371 #if CMK_SMP_TRACE_COMMTHREAD
2372 pd
->sync_flag_addr
= 1000000 * CmiWallTimer(); //microsecond
2374 msg_data
= CmiAlloc(size
);
2375 CmiSetMsgSeq(msg_data
, 0);
2376 _MEMCHECK(msg_data
);
2379 offset
= ONE_SEG
*(request_msg
->seq_id
-1);
2380 msg_data
= (char*)request_msg
->dest_addr
+ offset
;
2383 pd
->cqwrite_value
= request_msg
->seq_id
;
2385 transaction_size
= request_msg
->seq_id
== 0? ALIGN64(size
) : ALIGN64(request_msg
->length
);
2386 SetMemHndlZero(pd
->local_mem_hndl
);
2387 status
= registerMessage(msg_data
, transaction_size
, request_msg
->seq_id
, &pd
->local_mem_hndl
);
2388 if (status
== GNI_RC_SUCCESS
&& request_msg
->seq_id
== 0) {
2389 if(NoMsgInRecv( (void*)(msg_data
)))
2390 register_size
= GetMempoolsize((void*)(msg_data
));
2393 pd
->first_operand
= ALIGN64(size
); // total length
2395 if(request_msg
->total_length
<= LRTS_GNI_RDMA_THRESHOLD
)
2396 pd
->type
= GNI_POST_FMA_GET
;
2398 pd
->type
= GNI_POST_RDMA_GET
;
2399 pd
->cq_mode
= GNI_CQMODE_GLOBAL_EVENT
;
2400 pd
->dlvr_mode
= GNI_DLVMODE_PERFORMANCE
;
2401 pd
->length
= transaction_size
;
2402 pd
->local_addr
= (uint64_t) msg_data
;
2403 pd
->remote_addr
= request_msg
->source_addr
+ offset
;
2404 pd
->remote_mem_hndl
= request_msg
->source_mem_hndl
;
2406 if (tag
== LMSG_OOB_INIT_TAG
)
2407 pd
->src_cq_hndl
= highprior_rdma_tx_cqh
;
2410 #if MULTI_THREAD_SEND
2411 pd
->src_cq_hndl
= rdma_tx_cqh
;
2413 pd
->src_cq_hndl
= 0;
2419 #if CMI_EXERT_RECV_RDMA_CAP
2420 if(status
== GNI_RC_SUCCESS
&& RDMA_pending
>= RDMA_cap
) status
= GNI_RC_ERROR_RESOURCE
;
2422 //memory registration success
2423 if(status
== GNI_RC_SUCCESS
&& tag
== LMSG_OOB_INIT_TAG
)
2425 CmiNodeLock lock
= pd
->type
== GNI_POST_RDMA_GET
?rdma_tx_cq_lock
:default_tx_cq_lock
;
2428 if( request_msg
->seq_id
== 0)
2430 pd
->cq_mode
|= GNI_CQMODE_REMOTE_EVENT
;
2431 int sts
= GNI_EpSetEventData(ep_hndl_array
[inst_id
], inst_id
, ACK_EVENT(request_msg
->ack_index
));
2432 GNI_RC_CHECK("GNI_EpSetEventData", sts
);
2437 RDMA_TRY_SEND(pd
->type
)
2439 if(pd
->type
== GNI_POST_RDMA_GET
)
2441 status
= GNI_PostRdma(ep_hndl_array
[inst_id
], pd
);
2445 status
= GNI_PostFma(ep_hndl_array
[inst_id
], pd
);
2447 CMI_GNI_UNLOCK(lock
)
2449 if(status
== GNI_RC_SUCCESS
)
2451 #if CMI_EXERT_RECV_RDMA_CAP
2454 if(pd
->cqwrite_value
== 0)
2456 #if MACHINE_DEBUG_LOG
2457 buffered_recv_msg
+= register_size
;
2458 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
2460 IncreaseMsgInRecv(msg_data
);
2461 #if CMK_SMP_TRACE_COMMTHREAD
2462 pd
->sync_flag_value
= 1000000 * CmiWallTimer(); //microsecond
2466 pd
->sync_flag_value
= 1000000 * CmiWallTimer(); //microsecond
2467 RDMA_TRANS_INIT(pd
->type
, pd
->sync_flag_addr
/1000000.0)
2470 }else if (status
!= GNI_RC_SUCCESS
)
2472 SetMemHndlZero((pd
->local_mem_hndl
));
2474 if(status
== GNI_RC_ERROR_RESOURCE
|| status
== GNI_RC_ERROR_NOMEM
|| tag
!= LMSG_OOB_INIT_TAG
)
2477 bufferRdmaMsg(bufferRdmaQueue
, inst_id
, pd
, request_msg
->ack_index
);
2479 bufferRdmaMsg(bufferRdmaQueue
, inst_id
, pd
, -1);
2481 }else if (status
!= GNI_RC_SUCCESS
) {
2482 // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2483 GNI_RC_CHECK("GetLargeAFter posting", status
);
2486 CONTROL_MSG
*request_msg
;
2487 gni_return_t status
;
2489 gni_post_descriptor_t
*pd
;
2490 RDMA_REQUEST
*rdma_request_msg
;
2491 gni_mem_handle_t msg_mem_hndl
;
2493 // initial a get to transfer data from the sender side */
2494 request_msg
= (CONTROL_MSG
*) header
;
2495 msg_data
= CmiAlloc(request_msg
->length
);
2496 _MEMCHECK(msg_data
);
2498 MEMORY_REGISTER(onesided_hnd
, nic_hndl
, msg_data
, request_msg
->length
, &msg_mem_hndl
, &omdh
, NULL
, status
)
2500 if (status
== GNI_RC_INVALID_PARAM
|| status
== GNI_RC_PERMISSION_ERROR
)
2502 GNI_RC_CHECK("Invalid/permission Mem Register in post", status
);
2506 if(request_msg
->length
<= LRTS_GNI_RDMA_THRESHOLD
)
2507 pd
->type
= GNI_POST_FMA_GET
;
2509 pd
->type
= GNI_POST_RDMA_GET
;
2510 pd
->cq_mode
= GNI_CQMODE_GLOBAL_EVENT
;// | GNI_CQMODE_REMOTE_EVENT;
2511 pd
->dlvr_mode
= GNI_DLVMODE_PERFORMANCE
;
2512 pd
->length
= ALIGN64(request_msg
->length
);
2513 pd
->local_addr
= (uint64_t) msg_data
;
2514 pd
->remote_addr
= request_msg
->source_addr
;
2515 pd
->remote_mem_hndl
= request_msg
->source_mem_hndl
;
2516 if (tag
== LMSG_OOB_INIT_TAG
)
2517 pd
->src_cq_hndl
= highprior_rdma_tx_cqh
;
2520 #if MULTI_THREAD_SEND
2521 pd
->src_cq_hndl
= rdma_tx_cqh
;
2523 pd
->src_cq_hndl
= 0;
2529 //memory registration successful
2530 if(status
== GNI_RC_SUCCESS
)
2532 pd
->local_mem_hndl
= msg_mem_hndl
;
2534 if(pd
->type
== GNI_POST_RDMA_GET
)
2536 CMI_GNI_LOCK(rdma_tx_cq_lock
)
2537 status
= GNI_PostRdma(ep_hndl_array
[inst_id
], pd
);
2538 CMI_GNI_UNLOCK(rdma_tx_cq_lock
)
2542 CMI_GNI_LOCK(default_tx_cq_lock
)
2543 status
= GNI_PostFma(ep_hndl_array
[inst_id
], pd
);
2544 CMI_GNI_UNLOCK(default_tx_cq_lock
)
2549 SetMemHndlZero(pd
->local_mem_hndl
);
2551 if(status
== GNI_RC_ERROR_RESOURCE
|| status
== GNI_RC_ERROR_NOMEM
)
2553 MallocRdmaRequest(rdma_request_msg
);
2554 rdma_request_msg
->next
= 0;
2555 rdma_request_msg
->destNode
= inst_id
;
2556 rdma_request_msg
->pd
= pd
;
2557 PCQueuePush(sendRdmaBuf
, (char*)rdma_request_msg
);
2559 GNI_RC_CHECK("AFter posting", status
);
/*
 * PumpCqWriteTransactions(): drain CQWRITE completion events from
 * rdma_rx_cqh.  Each event's 48-bit payload (masked with 0xFFFFFFFFFFFF) is
 * the sender-side address of a message whose one-sided transfer finished:
 *  - persistent-comm buffers (non-zero MEMHFIELD) are handed to the Charm
 *    scheduler via handleOneRecvedMsg();
 *  - otherwise the event is an ACK for a mempool message: decrement its
 *    in-flight count and shrink buffered_send_msg when the block drains.
 * Loops until GNI_CqGetEvent() stops returning GNI_RC_SUCCESS; aborts if the
 * receive CQ overflowed (GNI_RC_ERROR_RESOURCE).
 *
 * NOTE(review): the loop header and the declarations of ev / msg / msg_size
 * are missing from this extract -- confirm against the full file.
 */
2565 static void PumpCqWriteTransactions()
2569 gni_return_t status
;
2573 //CMI_GNI_LOCK(my_cq_lock)
2574 status
= GNI_CqGetEvent(rdma_rx_cqh
, &ev
);
2575 //CMI_GNI_UNLOCK(my_cq_lock)
/* No more events pending: leave the pump loop. */
2576 if(status
!= GNI_RC_SUCCESS
) break;
/* Event data carries the message address in its low 48 bits. */
2577 msg
= (void*) ( GNI_CQ_GET_DATA(ev
) & 0xFFFFFFFFFFFFL
);
2578 #if CMK_PERSISTENT_COMM
2580 printf(" %d CQ write event %p\n", myrank
, msg
);
/* Persistent buffer: deliver the received message directly. */
2582 if (!IsMemHndlZero(MEMHFIELD(msg
))) {
2584 printf(" %d Persistent CQ write event %p\n", myrank
, msg
);
2587 msg_size
= CmiGetMsgSize(msg
);
2588 CMI_CHECK_CHECKSUM(msg
, msg_size
);
2589 handleOneRecvedMsg(msg_size
, msg
);
2593 #if ! USE_LRTS_MEMPOOL
2594 // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
/* ACK path: one fewer outstanding send referencing this mempool block. */
2596 DecreaseMsgInSend(msg
);
2598 if(NoMsgInSend(msg
))
2599 buffered_send_msg
-= GetMempoolsize(msg
);
/* CQ overflow is fatal: too many ACK events queued. */
2602 if(status
== GNI_RC_ERROR_RESOURCE
)
2604 GNI_RC_CHECK("rdma_rx_cq full too many ack", status
);
/*
 * PumpRemoteTransactions(): drain remote-event completions from rx_cqh.
 * Each event's instance id encodes an index and a type (GET_INDEX/GET_TYPE):
 *  - ack-pool entries (asserted GetIndexType()==1): the sender-side message
 *    finished transferring; decrement its in-flight count, shrink
 *    buffered_send_msg when the mempool block drains, and free the ack slot;
 *  - persistent slots (CMK_PERSISTENT_COMM, asserted GetIndexType()==2):
 *    look up the receive buffer in persistPool and deliver it via
 *    handleOneRecvedMsg();
 *  - any other type aborts.
 * An optional cap (CMI_PUMPREMOTETRANSACTIONS_CAP) bounds how many events are
 * processed per call.  Aborts if the receive CQ overflowed.
 *
 * NOTE(review): the loop header, the switch(type) line, the type-0 case
 * label and several closing braces are missing from this extract; the
 * structure annotated above is inferred from the visible fragments --
 * confirm against the full file.
 */
2610 static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh
)
2613 gni_return_t status
;
2615 int inst_id
, index
, type
, size
;
2617 #if CMI_PUMPREMOTETRANSACTIONS_CAP
/* Per-call cap on processed events, when enabled. */
2621 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2622 if (pump_count
> PumpRemoteTransactions_cap
) break;
2624 CMI_GNI_LOCK(global_gni_lock
)
2625 // CMI_GNI_LOCK(rdma_tx_cq_lock)
2626 status
= GNI_CqGetEvent(rx_cqh
, &ev
);
2627 // CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2628 CMI_GNI_UNLOCK(global_gni_lock
)
/* No more events pending: leave the pump loop. */
2630 if(status
!= GNI_RC_SUCCESS
) break;
2632 #if CMI_PUMPREMOTETRANSACTIONS_CAP
/* The instance id packs a pool index and an entry type. */
2636 inst_id
= GNI_CQ_GET_INST_ID(ev
);
2637 index
= GET_INDEX(inst_id
);
2638 type
= GET_TYPE(inst_id
);
/* ACK path: look up the sender-side message in the ack pool. */
2641 CmiAssert(index
>=0 && index
<ackPool
.size
);
2642 CMI_GNI_LOCK(ackPool
.lock
);
2643 CmiAssert(GetIndexType(ackPool
, index
) == 1);
2644 msg
= GetIndexAddress(ackPool
, index
);
2645 CMI_GNI_UNLOCK(ackPool
.lock
);
2647 printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank
, GetMempoolBlockPtr(msg
), index
, type
);
2649 #if ! USE_LRTS_MEMPOOL
2650 // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
/* One fewer outstanding send referencing this mempool block. */
2652 DecreaseMsgInSend(msg
);
2654 if(NoMsgInSend(msg
))
2655 buffered_send_msg
-= GetMempoolsize(msg
);
/* Slot can be reused for a future ACK. */
2657 IndexPool_freeslot(&ackPool
, index
);
2658 #if CMI_EXERT_SEND_LARGE_CAP
2659 SEND_large_pending
--;
2662 #if CMK_PERSISTENT_COMM
/* Persistent path: deliver the message from the persistent slot. */
2663 case 1: { // PERSISTENT
2664 CmiLock(persistPool
.lock
);
2665 CmiAssert(GetIndexType(persistPool
, index
) == 2);
2666 PersistentReceivesTable
*slot
= GetIndexAddress(persistPool
, index
);
2667 CmiUnlock(persistPool
.lock
);
2669 msg
= slot
->destBuf
[0].destAddress
;
2670 size
= CmiGetMsgSize(msg
);
2672 CMI_CHECK_CHECKSUM(msg
, size
);
2673 TRACE_COMM_CREATION(EVENT_TIME(), msg
);
2674 handleOneRecvedMsg(size
, msg
);
/* Unknown entry type: fatal. */
2679 fprintf(stderr
, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank
, type
);
2680 CmiAbort("PumpRemoteTransactions: unknown type");
/* CQ overflow is fatal: too many ACK events queued. */
2683 if(status
== GNI_RC_ERROR_RESOURCE
)
2685 GNI_RC_CHECK("rdma_rx_cq full too many ack", status
);
2690 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh
, CmiNodeLock my_cq_lock
)
2693 gni_return_t status
;
2694 uint64_t type
, inst_id
;
2695 gni_post_descriptor_t
*tmp_pd
;
2697 CONTROL_MSG
*ack_msg_tmp
;
2701 CMK_DIRECT_HEADER
*cmk_direct_done_msg
;
2703 SMSG_QUEUE
*queue
= &smsg_queue
;
2704 #if CMI_PUMPLOCALTRANSACTIONS_CAP
2706 while(pump_count
< PumpLocalTransactions_cap
) {
2711 CMI_GNI_LOCK(my_cq_lock
)
2712 status
= GNI_CqGetEvent(my_tx_cqh
, &ev
);
2713 CMI_GNI_UNLOCK(my_cq_lock
)
2714 if(status
!= GNI_RC_SUCCESS
) break;
2716 type
= GNI_CQ_GET_TYPE(ev
);
2717 if (type
== GNI_CQ_EVENT_TYPE_POST
)
2720 #if CMI_EXERT_RECV_RDMA_CAP
2721 if(RDMA_pending
<=0) CmiAbort(" pending error\n");
2724 inst_id
= GNI_CQ_GET_INST_ID(ev
);
2726 printf("[%d] LocalTransactions localdone=%d\n", myrank
, lrts_local_done_msg
);
2728 CMI_GNI_LOCK(my_cq_lock
)
2729 status
= GNI_GetCompleted(my_tx_cqh
, ev
, &tmp_pd
);
2730 CMI_GNI_UNLOCK(my_cq_lock
)
2732 switch (tmp_pd
->type
) {
2733 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2734 case GNI_POST_RDMA_PUT
:
2735 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2736 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &tmp_pd
->local_mem_hndl
, &omdh
, tmp_pd
->length
);
2738 case GNI_POST_FMA_PUT
:
2739 if(tmp_pd
->amo_cmd
== 1) {
2741 //sender ACK to receiver to trigger it is done
2742 cmk_direct_done_msg
= (CMK_DIRECT_HEADER
*) malloc(sizeof(CMK_DIRECT_HEADER
));
2743 cmk_direct_done_msg
->handler_addr
= tmp_pd
->first_operand
;
2744 msg_tag
= DIRECT_PUT_DONE_TAG
;
2748 CmiFree((void *)tmp_pd
->local_addr
);
2750 FreePostDesc(tmp_pd
);
2753 sendCqWrite(inst_id
, tmp_pd
->remote_addr
, tmp_pd
->remote_mem_hndl
);
2754 FreePostDesc(tmp_pd
);
2757 MallocControlMsg(ack_msg_tmp
);
2758 ack_msg_tmp
->source_addr
= tmp_pd
->remote_addr
;
2759 ack_msg_tmp
->source_mem_hndl
= tmp_pd
->remote_mem_hndl
;
2760 ack_msg_tmp
->length
= tmp_pd
->length
;
2761 msg_tag
= PUT_DONE_TAG
;
2766 case GNI_POST_RDMA_GET
:
2767 case GNI_POST_FMA_GET
: {
2768 #if ! USE_LRTS_MEMPOOL
2769 MallocControlMsg(ack_msg_tmp
);
2770 ack_msg_tmp
->source_addr
= tmp_pd
->remote_addr
;
2771 ack_msg_tmp
->source_mem_hndl
= tmp_pd
->remote_mem_hndl
;
2772 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &tmp_pd
->local_mem_hndl
, &omdh
, tmp_pd
->length
)
2776 RDMA_TRANS_DONE(tmp_pd
->sync_flag_value
/1000000.0)
2778 int seq_id
= tmp_pd
->cqwrite_value
;
2779 if(seq_id
> 0) // BIG_MSG
2781 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &tmp_pd
->local_mem_hndl
, &omdh
, tmp_pd
->length
);
2782 MallocControlMsg(ack_msg_tmp
);
2783 ack_msg_tmp
->source_addr
= tmp_pd
->remote_addr
;
2784 ack_msg_tmp
->source_mem_hndl
= tmp_pd
->remote_mem_hndl
;
2785 ack_msg_tmp
->seq_id
= seq_id
;
2786 ack_msg_tmp
->dest_addr
= tmp_pd
->local_addr
- ONE_SEG
*(ack_msg_tmp
->seq_id
-1);
2787 ack_msg_tmp
->source_addr
-= ONE_SEG
*(ack_msg_tmp
->seq_id
-1);
2788 ack_msg_tmp
->length
= tmp_pd
->length
;
2789 ack_msg_tmp
->total_length
= tmp_pd
->first_operand
; // total size
2790 msg_tag
= BIG_MSG_TAG
;
2795 #if !REMOTE_EVENT && !CQWRITE
2796 MallocAckMsg(ack_msg
);
2797 ack_msg
->source_addr
= tmp_pd
->remote_addr
;
2803 case GNI_POST_CQWRITE
:
2804 FreePostDesc(tmp_pd
);
2807 CmiPrintf("type=%d\n", tmp_pd
->type
);
2808 CmiAbort("PumpLocalTransactions: unknown type!");
2809 } /* end of switch */
2812 if (tmp_pd
->amo_cmd
== 1) {
2813 status
= send_smsg_message(queue
, inst_id
, cmk_direct_done_msg
, sizeof(CMK_DIRECT_HEADER
), msg_tag
, 0, NULL
);
2814 if (status
== GNI_RC_SUCCESS
) free(cmk_direct_done_msg
);
2818 if (msg_tag
== ACK_TAG
) {
2821 status
= send_smsg_message(queue
, inst_id
, ack_msg
, ACK_MSG_SIZE
, msg_tag
, 0, NULL
);
2822 if (status
== GNI_RC_SUCCESS
) FreeAckMsg(ack_msg
);
2824 sendCqWrite(inst_id
, tmp_pd
->remote_addr
, tmp_pd
->remote_mem_hndl
);
2829 status
= send_smsg_message(queue
, inst_id
, ack_msg_tmp
, CONTROL_MSG_SIZE
, msg_tag
, 0, NULL
);
2830 if (status
== GNI_RC_SUCCESS
) FreeControlMsg(ack_msg_tmp
);
2832 #if CMK_PERSISTENT_COMM
2833 if (tmp_pd
->type
== GNI_POST_RDMA_GET
|| tmp_pd
->type
== GNI_POST_FMA_GET
)
2836 if( msg_tag
== ACK_TAG
){ //msg fit in mempool
2838 printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank
, inst_id
);
2840 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd
->sync_flag_addr
/1000000.0), (double)((tmp_pd
->sync_flag_addr
+1)/1000000.0), (double)((tmp_pd
->sync_flag_addr
+1)/1000000.0), (void*)tmp_pd
->local_addr
);
2841 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd
->sync_flag_value
/1000000.0), (double)((tmp_pd
->sync_flag_value
+1)/1000000.0), (double)((tmp_pd
->sync_flag_value
+1)/1000000.0), (void*)tmp_pd
->local_addr
);
2844 CmiAssert(SIZEFIELD((void*)(tmp_pd
->local_addr
)) <= tmp_pd
->length
);
2845 DecreaseMsgInRecv((void*)tmp_pd
->local_addr
);
2846 #if MACHINE_DEBUG_LOG
2847 if(NoMsgInRecv((void*)(tmp_pd
->local_addr
)))
2848 buffered_recv_msg
-= GetMempoolsize((void*)(tmp_pd
->local_addr
));
2849 MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
, msg_tag
);
2851 TRACE_COMM_CREATION(EVENT_TIME(), (void*)tmp_pd
->local_addr
);
2852 CMI_CHECK_CHECKSUM((void*)tmp_pd
->local_addr
, tmp_pd
->length
);
2853 handleOneRecvedMsg(tmp_pd
->length
, (void*)tmp_pd
->local_addr
);
2854 }else if(msg_tag
== BIG_MSG_TAG
){
2855 void *msg
= (char*)tmp_pd
->local_addr
-(tmp_pd
->cqwrite_value
-1)*ONE_SEG
;
2856 CmiSetMsgSeq(msg
, CmiGetMsgSeq(msg
)+1);
2857 if (tmp_pd
->first_operand
<= ONE_SEG
*CmiGetMsgSeq(msg
)) {
2860 printf("Pipeline msg done [%d]\n", myrank
);
2862 #if CMK_SMP_TRACE_COMMTHREAD
2863 if( tmp_pd
->cqwrite_value
== 1)
2864 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd
->sync_flag_addr
/1000000.0), (double)((tmp_pd
->sync_flag_addr
+1)/1000000.0), (double)((tmp_pd
->sync_flag_addr
+2)/1000000.0), (void*)tmp_pd
->local_addr
);
2866 TRACE_COMM_CREATION(EVENT_TIME(), msg
);
2867 CMI_CHECK_CHECKSUM(msg
, tmp_pd
->first_operand
);
2868 handleOneRecvedMsg(tmp_pd
->first_operand
, msg
);
2872 FreePostDesc(tmp_pd
);
2875 if(status
== GNI_RC_ERROR_RESOURCE
)
2877 printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");
2878 GNI_RC_CHECK("Smsg_tx_cq full", status
);
2882 static void SendRdmaMsg( BufferList sendqueue
)
2884 gni_return_t status
= GNI_RC_SUCCESS
;
2885 gni_mem_handle_t msg_mem_hndl
;
2886 RDMA_REQUEST
*ptr
= 0, *tmp_ptr
;
2887 RDMA_REQUEST
*pre
= 0;
2888 uint64_t register_size
= 0;
2892 int len
= PCQueueLength(sendqueue
);
2893 for (i
=0; i
<len
; i
++)
2895 #if CMI_EXERT_RECV_RDMA_CAP
2896 if( RDMA_pending
>= RDMA_cap
) break;
2898 CMI_PCQUEUEPOP_LOCK( sendqueue
)
2899 ptr
= (RDMA_REQUEST
*)PCQueuePop(sendqueue
);
2900 CMI_PCQUEUEPOP_UNLOCK( sendqueue
)
2901 if (ptr
== NULL
) break;
2903 MACHSTATE4(8, "noempty-rdma %d (%lld,%lld,%d) \n", ptr
->destNode
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
2904 gni_post_descriptor_t
*pd
= ptr
->pd
;
2906 msg
= (void*)(pd
->local_addr
);
2907 status
= registerMessage(msg
, pd
->length
, pd
->cqwrite_value
, &pd
->local_mem_hndl
);
2909 if(pd
->cqwrite_value
== 0) {
2910 if(NoMsgInRecv(msg
))
2911 register_size
= GetMempoolsize(msg
);
2914 if(status
== GNI_RC_SUCCESS
) //mem register good
2916 int destNode
= ptr
->destNode
;
2917 CmiNodeLock lock
= (pd
->type
== GNI_POST_RDMA_GET
|| pd
->type
== GNI_POST_RDMA_PUT
) ? rdma_tx_cq_lock
:default_tx_cq_lock
;
2920 if( pd
->cqwrite_value
== 0) {
2921 pd
->cq_mode
|= GNI_CQMODE_REMOTE_EVENT
;
2922 int sts
= GNI_EpSetEventData(ep_hndl_array
[destNode
], destNode
, ACK_EVENT(ptr
->ack_index
));
2923 GNI_RC_CHECK("GNI_EpSetEventData", sts
);
2925 #if CMK_PERSISTENT_COMM
2926 else if (pd
->cqwrite_value
== PERSIST_SEQ
) {
2927 pd
->cq_mode
|= GNI_CQMODE_REMOTE_EVENT
;
2928 int sts
= GNI_EpSetEventData(ep_hndl_array
[destNode
], destNode
, PERSIST_EVENT(ptr
->ack_index
));
2929 GNI_RC_CHECK("GNI_EpSetEventData", sts
);
2934 RDMA_TRY_SEND(pd
->type
)
2936 #if CMK_SMP_TRACE_COMMTHREAD
2937 if(IS_PUT(pd
->type
))
2940 TRACE_COMM_CREATION(EVENT_TIME(), (void*)pd
->local_addr
);//based on assumption, post always succeeds on first try
2944 if(pd
->type
== GNI_POST_RDMA_GET
|| pd
->type
== GNI_POST_RDMA_PUT
)
2946 status
= GNI_PostRdma(ep_hndl_array
[destNode
], pd
);
2950 status
= GNI_PostFma(ep_hndl_array
[destNode
], pd
);
2952 CMI_GNI_UNLOCK(lock
);
2954 if(status
== GNI_RC_SUCCESS
) //post good
2956 #if CMI_EXERT_RECV_RDMA_CAP
2959 if(pd
->cqwrite_value
== 0)
2961 #if CMK_SMP_TRACE_COMMTHREAD
2962 pd
->sync_flag_value
= 1000000 * CmiWallTimer(); //microsecond
2964 IncreaseMsgInRecv(((void*)(pd
->local_addr
)));
2967 pd
->sync_flag_value
= 1000000 * CmiWallTimer(); //microsecond
2968 RDMA_TRANS_INIT(pd
->type
, pd
->sync_flag_addr
/1000000.0)
2970 #if MACHINE_DEBUG_LOG
2971 buffered_recv_msg
+= register_size
;
2972 MACHSTATE(8, "GO request from buffered\n");
2975 printf("[%d] SendRdmaMsg: post succeed. seqno: %x\n", myrank
, pd
->cqwrite_value
);
2977 }else // cannot post
2979 PCQueuePush(sendRdmaBuf
, (char*)ptr
);
2981 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank
, pd
->cqwrite_value
, destNode
, pd
->local_mem_hndl
.qword1
, pd
->local_mem_hndl
.qword2
, pd
->remote_mem_hndl
.qword1
, pd
->remote_mem_hndl
.qword2
, smsg_connected_flag
[destNode
]);
2985 } else //memory registration fails
2987 PCQueuePush(sendqueue
, (char*)ptr
);
2993 inline gni_return_t
_sendOneBufferedSmsg(SMSG_QUEUE
*queue
, MSG_LIST
*ptr
)
2995 CONTROL_MSG
*control_msg_tmp
;
2996 gni_return_t status
= GNI_RC_ERROR_RESOURCE
;
2998 MACHSTATE5(8, "noempty-smsg %d (%d,%d,%d) tag=%d \n", ptr
->destNode
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
, ptr
->tag
);
2999 if (useDynamicSMSG
&& smsg_connected_flag
[ptr
->destNode
] != 2) {
3000 /* connection not exists yet */
3002 /* non-smp case, connect is issued in send_smsg_message */
3003 if (smsg_connected_flag
[ptr
->destNode
] == 0)
3004 connect_to(ptr
->destNode
);
3010 case SMALL_DATA_TAG
:
3011 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, ptr
->size
, ptr
->tag
, 1, ptr
);
3012 if(status
== GNI_RC_SUCCESS
)
3018 case LMSG_OOB_INIT_TAG
:
3019 control_msg_tmp
= (CONTROL_MSG
*)ptr
->msg
;
3020 status
= send_large_messages(queue
, ptr
->destNode
, control_msg_tmp
, 1, ptr
, ptr
->tag
);
3022 #if !REMOTE_EVENT && !CQWRITE
3024 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, ptr
->size
, ptr
->tag
, 1, ptr
);
3025 if(status
== GNI_RC_SUCCESS
) FreeAckMsg((ACK_MSG
*)ptr
->msg
);
3029 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, ptr
->size
, ptr
->tag
, 1, ptr
);
3030 if(status
== GNI_RC_SUCCESS
)
3032 FreeControlMsg((CONTROL_MSG
*)ptr
->msg
);
3035 #if CMK_PERSISTENT_COMM && !REMOTE_EVENT && !CQWRITE
3037 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, ptr
->size
, ptr
->tag
, 1, ptr
);
3038 if(status
== GNI_RC_SUCCESS
)
3040 FreeControlMsg((CONTROL_MSG
*)ptr
->msg
);
3045 case DIRECT_PUT_DONE_TAG
:
3046 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, sizeof(CMK_DIRECT_HEADER
), ptr
->tag
, 1, ptr
);
3047 if(status
== GNI_RC_SUCCESS
)
3049 free((CMK_DIRECT_HEADER
*)ptr
->msg
);
3054 printf("Weird tag\n");
3055 CmiAbort("should not happen\n");
3060 // return 1 if all messages are sent
3064 static int SendBufferMsg(SMSG_QUEUE
*queue
, SMSG_QUEUE
*prio_queue
)
3066 MSG_LIST
*ptr
, *tmp_ptr
, *pre
=0, *current_head
;
3067 CONTROL_MSG
*control_msg_tmp
;
3068 gni_return_t status
;
3070 uint64_t register_size
;
3071 void *register_addr
;
3072 int index_previous
= -1;
3073 #if CMI_SENDBUFFERSMSG_CAP
3074 int sent_length
= 0;
3077 memset(destpe_avail
, 0, mysize
* sizeof(char));
3078 for (index
=0; index
<1; index
++)
3080 int i
, len
= PCQueueLength(queue
->sendMsgBuf
);
3081 for (i
=0; i
<len
; i
++)
3083 CMI_PCQUEUEPOP_LOCK(queue
->sendMsgBuf
)
3084 ptr
= (MSG_LIST
*)PCQueuePop(queue
->sendMsgBuf
);
3085 CMI_PCQUEUEPOP_UNLOCK(queue
->sendMsgBuf
)
3086 if(ptr
== NULL
) break;
3087 if (destpe_avail
[ptr
->destNode
] == 1) { /* can't send to this pe */
3088 PCQueuePush(queue
->sendMsgBuf
, (char*)ptr
);
3091 status
= _sendOneBufferedSmsg(queue
, ptr
);
3092 #if CMI_SENDBUFFERSMSG_CAP
3095 if(status
== GNI_RC_SUCCESS
)
3098 buffered_smsg_counter
--;
3099 printf("[%d==>%d] buffered smsg sending done\n", myrank
, ptr
->destNode
);
3103 PCQueuePush(queue
->sendMsgBuf
, (char*)ptr
);
3105 if(status
== GNI_RC_ERROR_RESOURCE
)
3107 destpe_avail
[ptr
->destNode
] = 1;
3111 } // end pooling for all cores
3115 #else /* ! ONE_SEND_QUEUE */
3117 static int SendBufferMsg(SMSG_QUEUE
*queue
, SMSG_QUEUE
*prio_queue
)
3120 gni_return_t status
;
3122 #if CMI_SENDBUFFERSMSG_CAP
3123 int sent_length
= 0;
3128 int nonempty
= PCQueueLength(queue
->nonEmptyQueues
);
3129 for(idx
=0; idx
<nonempty
; idx
++)
3131 index
++; if (index
>= nonempty
) index
= 0;
3132 #if CMI_SENDBUFFERSMSG_CAP
3133 if ( sent_length
>= SendBufferMsg_cap
) { done
= 0; return done
;}
3135 CMI_PCQUEUEPOP_LOCK(queue
->nonEmptyQueues
)
3136 MSG_LIST_INDEX
*current_list
= (MSG_LIST_INDEX
*)PCQueuePop(queue
->nonEmptyQueues
);
3137 CMI_PCQUEUEPOP_UNLOCK(queue
->nonEmptyQueues
)
3138 if(current_list
== NULL
) break;
3139 if (prio_queue
&& PCQueueLength(prio_queue
->smsg_msglist_index
[current_list
->destpe
].sendSmsgBuf
) != 0) {
3140 PCQueuePush(queue
->nonEmptyQueues
, (char*)current_list
);
3143 PCQueue current_queue
= current_list
->sendSmsgBuf
;
3144 CmiLock(current_list
->lock
);
3145 int i
, len
= PCQueueLength(current_queue
);
3146 current_list
->pushed
= 0;
3147 CmiUnlock(current_list
->lock
);
3148 #else /* ! SMP_LOCKS */
3149 static int index
= -1;
3150 for(idx
=0; idx
<mysize
; idx
++)
3152 index
++; if (index
== mysize
) index
= 0;
3153 #if CMI_SENDBUFFERSMSG_CAP
3154 if ( sent_length
>= SendBufferMsg_cap
) { done
= 0; return done
;}
3156 if (prio_queue
&& PCQueueLength(prio_queue
->smsg_msglist_index
[index
].sendSmsgBuf
) != 0) continue; // check urgent queue
3157 //if (index == myrank) continue;
3158 PCQueue current_queue
= queue
->smsg_msglist_index
[index
].sendSmsgBuf
;
3159 int i
, len
= PCQueueLength(current_queue
);
3161 for (i
=0; i
<len
; i
++) {
3162 CMI_PCQUEUEPOP_LOCK(current_queue
)
3163 ptr
= (MSG_LIST
*)PCQueuePop(current_queue
);
3164 CMI_PCQUEUEPOP_UNLOCK(current_queue
)
3165 if (ptr
== 0) break;
3167 status
= _sendOneBufferedSmsg(queue
, ptr
);
3168 #if CMI_SENDBUFFERSMSG_CAP
3171 if(status
== GNI_RC_SUCCESS
)
3174 buffered_smsg_counter
--;
3175 printf("[%d==>%d] buffered smsg sending done\n", myrank
, ptr
->destNode
);
3179 PCQueuePush(current_queue
, (char*)ptr
);
3181 if(status
== GNI_RC_ERROR_RESOURCE
)
3188 CmiLock(current_list
->lock
);
3189 if(!PCQueueEmpty(current_queue
) && current_list
->pushed
== 0)
3191 current_list
->pushed
= 1;
3192 PCQueuePush(queue
->nonEmptyQueues
, (char*)current_list
);
3194 CmiUnlock(current_list
->lock
);
3196 } // end pooling for all cores
/* NOTE(review): this chunk is a lossy extraction -- braces, #endif lines and
 * some guards were dropped by the extraction tool.  Comments below document
 * intent; the surviving token stream is preserved unchanged. */

/* Forward declaration of the deadlock detector invoked at the end of the
 * progress sweep when a hang has been flagged. */
static void ProcessDeadlock();

/* One sweep of the GNI progress engine.  Phases: (0) pump dynamic-SMSG
 * connection datagrams (only while idle), (1) drain incoming small messages,
 * (2) flush buffered SMSG sends, (3) reap local transaction completions,
 * (4) reap remote/CQ-write completions, (5) post deferred RDMA.  Between
 * every phase the out-of-band and high-priority work is serviced again so
 * urgent traffic is never starved by a long phase.
 * whileidle: nonzero when called from the idle loop rather than the
 * regular scheduler path. */
void LrtsAdvanceCommunication(int whileidle)
static int count = 0;
/* Receive Msg first */
#if CMK_SMP_TRACE_COMMTHREAD
double startT, endT;
/* Phase 0: dynamic SMSG connection handshakes, done only when idle. */
if (useDynamicSMSG && whileidle)
#if CMK_SMP_TRACE_COMMTHREAD
startT = CmiWallTimer();
STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
#if CMK_SMP_TRACE_COMMTHREAD
endT = CmiWallTimer();
/* Only emit a trace bracket when the phase was long enough to matter. */
if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);

/* Service urgent traffic between phases. */
SEND_OOB_SMSG(smsg_oob_queue)
PUMP_REMOTE_HIGHPRIORITY
PUMP_LOCAL_HIGHPRIORITY
POST_HIGHPRIORITY_RDMA

/* Phase 1: drain incoming SMSG (small-message) traffic. */
// Receiving small messages and persistent
#if CMK_SMP_TRACE_COMMTHREAD
startT = CmiWallTimer();
STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
#if CMK_SMP_TRACE_COMMTHREAD
endT = CmiWallTimer();
if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);

SEND_OOB_SMSG(smsg_oob_queue)
PUMP_REMOTE_HIGHPRIORITY
PUMP_LOCAL_HIGHPRIORITY
POST_HIGHPRIORITY_RDMA

/* Phase 2: flush sends that were queued for lack of SMSG credits.
 * The two SendBufferMsg calls are the OOB / non-OOB build variants
 * (one is selected by a dropped preprocessor guard). */
///* Send buffered Message */
#if CMK_SMP_TRACE_COMMTHREAD
startT = CmiWallTimer();
STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
#if CMK_SMP_TRACE_COMMTHREAD
endT = CmiWallTimer();
if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);

SEND_OOB_SMSG(smsg_oob_queue)
PUMP_REMOTE_HIGHPRIORITY
PUMP_LOCAL_HIGHPRIORITY
POST_HIGHPRIORITY_RDMA

/* Phase 3: reap completions of locally initiated GET/PUT transactions. */
//Pump Get messages or PUT messages
#if CMK_SMP_TRACE_COMMTHREAD
startT = CmiWallTimer();
PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
#if MULTI_THREAD_SEND
/* Worker threads post RDMA on a separate CQ; reap that one too. */
STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock));
#if CMK_SMP_TRACE_COMMTHREAD
endT = CmiWallTimer();
if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);

SEND_OOB_SMSG(smsg_oob_queue)
PUMP_REMOTE_HIGHPRIORITY
PUMP_LOCAL_HIGHPRIORITY
POST_HIGHPRIORITY_RDMA

/* Phase 4: remote-side completion events (CQ writes, remote RDMA). */
#if CMK_SMP_TRACE_COMMTHREAD
startT = CmiWallTimer();
PumpCqWriteTransactions();
STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
#if CMK_SMP_TRACE_COMMTHREAD
endT = CmiWallTimer();
if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);

SEND_OOB_SMSG(smsg_oob_queue)
PUMP_REMOTE_HIGHPRIORITY
PUMP_LOCAL_HIGHPRIORITY
POST_HIGHPRIORITY_RDMA

/* Phase 5: post RDMA transactions deferred earlier (e.g. for lack of
 * registered memory or CQ space). */
#if CMK_SMP_TRACE_COMMTHREAD
startT = CmiWallTimer();
STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
#if CMK_SMP_TRACE_COMMTHREAD
endT = CmiWallTimer();
if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);

/* Deadlock detection only applies in SMP mode without hugepages. */
#if CMK_SMP && ! LARGEPAGE
if (_detected_hang) ProcessDeadlock();
/* Pick SMSG_MAX_MSG (the per-mailbox message-size cap) from the job size:
 * larger jobs get smaller mailboxes so the total pinned SMSG memory stays
 * bounded.  CHARM_UGNI_SMSG_MAX_SIZE overrides the heuristic.
 * NOTE(review): the extraction dropped the first branch and the braces of
 * this if/else ladder; surviving tokens are kept unchanged below. */
static void set_smsg_max()
SMSG_MAX_MSG = 1024;
}else if (mysize <= 4096)
SMSG_MAX_MSG = 1024;
}else if (mysize <= 16384)
/* Environment override takes precedence over the size-based default. */
env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
if (env) SMSG_MAX_MSG = atoi(env);
/* A non-positive cap would break every SMSG send -- fail fast. */
CmiAssert(SMSG_MAX_MSG > 0);
/* useDynamicSMSG */
/* Lazy (on-demand) SMSG setup: allocate per-rank attribute tables, size
 * and register one shared mailbox pool, and post an unbound datagram so
 * any peer can initiate an SMSG handshake with this rank later. */
static void _init_dynamic_smsg()
gni_return_t status;
uint32_t vmdh_index = -1;
/* Per-peer state starts unconnected; attrs are filled in on first contact. */
smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
for(i=0; i<mysize; i++) {
smsg_connected_flag[i] = 0;
smsg_attr_vector_local[i] = NULL;
smsg_attr_vector_remote[i] = NULL;
/* Ask GNI how much mailbox memory a single connection needs. */
send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
/* First mailbox block holds avg_smsg_connection connections; further
 * blocks are chained through ->next as more connections are made. */
mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
mailbox_list->size = smsg_memlen*avg_smsg_connection;
/* NOTE(review): posix_memalign return value is unchecked here -- a failure
 * would leave mailbox_base undefined; verify against upstream intent. */
posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
bzero(mailbox_list->mailbox_base, mailbox_list->size);
mailbox_list->offset = 0;
mailbox_list->next = 0;
/* Register the block with the NIC; arriving SMSGs raise events on smsg_rx_cqh. */
status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
mailbox_list->size, smsg_rx_cqh,
&(mailbox_list->mem_hndl));
GNI_RC_CHECK("MEMORY registration for smsg", status);
/* Unbound endpoint: accepts connection datagrams from any rank. */
status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
GNI_RC_CHECK("Unbound EP", status);
alloc_smsg_attr(&send_smsg_attr);
/* Post a wildcard datagram carrying our SMSG attrs, tagged with our rank. */
status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr, SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
GNI_RC_CHECK("post unbound datagram", status);
/* always pre-connect to proc 0 */
//if (myrank != 0) connect_to(0);
status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
/* Fully-connected SMSG setup: carve one mailbox slot per peer out of a
 * single registered region, allgather every rank's (address, mem-handle)
 * pair, then initialize an SMSG channel to every other rank.  Memory cost
 * is smsg_memlen * mysize per process, so this does not scale to very
 * large jobs (use dynamic SMSG there). */
static void _init_static_smsg()
gni_smsg_attr_t *smsg_attr;
gni_smsg_attr_t remote_smsg_attr;
gni_smsg_attr_t *smsg_attr_vec;
gni_mem_handle_t my_smsg_mdh_mailbox;
gni_return_t status;
uint32_t vmdh_index = -1;
mdh_addr_t base_infor;
mdh_addr_t *base_addr_vec;
smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
/* Size one mailbox, then allocate and register mysize of them contiguously. */
smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
CmiAssert(ret == 0);
bzero(smsg_mailbox_base, smsg_memlen*(mysize));
status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
smsg_memlen*(mysize), smsg_rx_cqh,
&my_smsg_mdh_mailbox);
register_memory_size += smsg_memlen*(mysize);
GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
if (myrank == 0) printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
/* Warn when the mailbox region alone already exhausts the registration
 * budget -- the job would likely hang later when RDMA needs memory. */
if (myrank == 0 && register_memory_size>=MAX_REG_MEM) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX to a larger value or use Dynamic smsg\n");
/* Exchange every rank's mailbox base address and memory handle. */
base_infor.addr = (uint64_t)smsg_mailbox_base;
base_infor.mdh = my_smsg_mdh_mailbox;
base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
allgather(&base_infor, base_addr_vec, sizeof(mdh_addr_t));
/* Local view: slot i of my region is where rank i's messages land. */
for(i=0; i<mysize; i++)
smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
smsg_attr[i].mbox_offset = i*smsg_memlen;
smsg_attr[i].buff_size = smsg_memlen;
smsg_attr[i].msg_buffer = smsg_mailbox_base;
smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
/* Remote view: my slot inside each peer's region, at offset myrank. */
for(i=0; i<mysize; i++)
if (myrank == i) continue;
remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
remote_smsg_attr.buff_size = smsg_memlen;
remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
/* initialize the smsg channel */
status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
GNI_RC_CHECK("SMSG Init", status);
} //end initialization
/* The gathered table is only needed during setup. */
free(base_addr_vec);
status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
/* Initialize one send-queue structure: the aggregate buffered-send queue,
 * one per-destination PCQueue (with lock and "pushed" flag used by the
 * nonEmptyQueues work list), and the availability bitmap.
 * queue: the SMSG_QUEUE (regular or out-of-band) being set up. */
static void _init_send_queue(SMSG_QUEUE *queue)
queue->sendMsgBuf = PCQueueCreate();
destpe_avail = (char*)malloc(mysize * sizeof(char));
queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
/* Work list of destinations that currently have pending messages. */
queue->nonEmptyQueues = PCQueueCreate();
for(i=0; i<mysize; i++)
queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
/* pushed==0 means this per-destination list is not yet on nonEmptyQueues. */
queue->smsg_msglist_index[i].pushed = 0;
queue->smsg_msglist_index[i].lock = CmiCreateLock();
queue->smsg_msglist_index[i].destpe = i;
/* Top-level SMSG initialization: set up either dynamic or static SMSG
 * (the if/else selecting on useDynamicSMSG was dropped by the extraction),
 * then create the regular and out-of-band send queues. */
static void _init_smsg()
_init_dynamic_smsg();
_init_static_smsg();
_init_send_queue(&smsg_queue);
/* OOB queue presumably guarded by a CMK_USE_OOB-style #if -- TODO confirm. */
_init_send_queue(&smsg_oob_queue);
/* Initialize the GNI shared message queue (MSGQ) subsystem.  MSGQ memory
 * scales with node count instead of rank count, so it is the scalable
 * alternative to per-rank SMSG mailboxes on very large jobs. */
static void _init_static_msgq()
gni_return_t status;
/* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The performance scales by the node count rather than rank count */
msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
msgq_attrs.smsg_q_sz = 1;
msgq_attrs.rcv_pool_sz = 1;
msgq_attrs.num_msgq_eps = 2;
msgq_attrs.nloc_insts = 8;
msgq_attrs.modes = 0;
msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES;
/* No callbacks are registered (NULL handlers); completions are polled. */
status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
GNI_RC_CHECK("MSGQ Init", status);
/* Diagnostics: cumulative bytes handed out by the mempool block allocators
 * and the number of times a block was requested. */
static CmiUInt8 total_mempool_size = 0;
static CmiUInt8 total_mempool_calls = 0;
3527 #if USE_LRTS_MEMPOOL
3529 #if CMK_PERSISTENT_COMM
/* Allocate and NIC-register one block for the persistent-message mempool.
 * Identical to alloc_mempool_block() except registration targets
 * highpriority_rx_cqh so persistent traffic raises high-priority events.
 * size: in/out -- rounded up to the default/expand block size and hugepage
 * alignment; mem_hndl: out -- GNI handle for the registered block;
 * expand_flag: nonzero when growing an existing pool. */
void *alloc_persistent_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
gni_return_t status = GNI_RC_SUCCESS;
size_t default_size = expand_flag? _expand_mem : _mempool_size;
if (*size < default_size) *size = default_size;
// round up to be multiple of _tlbpagesize
//*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
*size = ALIGNHUGEPAGE(*size);
total_mempool_size += *size;
total_mempool_calls += 1;
/* Refuse to expand past the registration/send budget -- growing further
 * risks the deadlock described in the file header. */
if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag)
/* NOTE(review): "%lld" with a size_t argument is a format mismatch on
 * platforms where size_t != long long -- should be %zu; verify upstream. */
printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
CmiAbort("alloc_mempool_block");
/* LARGEPAGE builds take hugepages; otherwise aligned malloc (the #if/#else
 * around these two lines was dropped by the extraction). */
pool = my_get_huge_pages(*size);
ret = posix_memalign(&pool, ALIGNBUF, *size);
printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
CmiAbort("alloc_mempool_block: out of memory.");
CmiAbort("alloc_mempool_block: posix_memalign failed");
/* Register with the NIC; completions route to the high-priority RX CQ. */
MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, highpriority_rx_cqh, status);
if(status != GNI_RC_SUCCESS) {
printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
/* Dump the pool state to aid debugging a registration failure. */
sweep_mempool(CpvAccess(mempool));
GNI_RC_CHECK("MEMORY_REGISTER", status);
/* Unregistered fallback path: mark the handle as empty. */
SetMemHndlZero((*mem_hndl));
/* Allocate and NIC-register one block for the general-purpose mempool.
 * Near-duplicate of alloc_persistent_mempool_block(); the only difference
 * is that registration targets rdma_rx_cqh instead of the high-priority CQ.
 * size: in/out -- rounded up to the default/expand block size and hugepage
 * alignment; mem_hndl: out -- GNI handle for the registered block;
 * expand_flag: nonzero when growing an existing pool. */
void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
gni_return_t status = GNI_RC_SUCCESS;
size_t default_size = expand_flag? _expand_mem : _mempool_size;
if (*size < default_size) *size = default_size;
// round up to be multiple of _tlbpagesize
//*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
*size = ALIGNHUGEPAGE(*size);
total_mempool_size += *size;
total_mempool_calls += 1;
/* Refuse to expand past the registration/send budget. */
if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag)
/* NOTE(review): "%lld" with a size_t argument -- should be %zu on LP64. */
printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
CmiAbort("alloc_mempool_block");
/* LARGEPAGE builds take hugepages; otherwise aligned malloc (the #if/#else
 * around these two lines was dropped by the extraction). */
pool = my_get_huge_pages(*size);
ret = posix_memalign(&pool, ALIGNBUF, *size);
printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
CmiAbort("alloc_mempool_block: out of memory.");
CmiAbort("alloc_mempool_block: posix_memalign failed");
/* Register with the NIC; completions route to the RDMA RX CQ. */
MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
if(status != GNI_RC_SUCCESS) {
printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
/* Dump the pool state to aid debugging a registration failure. */
sweep_mempool(CpvAccess(mempool));
GNI_RC_CHECK("MEMORY_REGISTER", status);
/* Unregistered fallback path: mark the handle as empty. */
SetMemHndlZero((*mem_hndl));
// ptr is a block head pointer
/* Release one mempool block: deregister it from the NIC first (only if it
 * was actually registered -- a zero handle means the unregistered fallback
 * path allocated it), then return the memory.  The non-LARGEPAGE free(ptr)
 * branch was dropped by the extraction. */
void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
if(!(IsMemHndlZero(mem_hndl)))
MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
/* Per-rank initialization run before converse common init: create this
 * rank's registered-memory pool (and, when persistent communication is
 * compiled in, a separate pool whose blocks register against the
 * high-priority CQ).  Both pools share the same size limits and the same
 * free routine. */
void LrtsPreCommonInit(int everReturn){
#if USE_LRTS_MEMPOOL
CpvInitialize(mempool_type*, mempool);
CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
#if CMK_PERSISTENT_COMM
CpvInitialize(mempool_type*, persistent_mempool);
CpvAccess(persistent_mempool) = mempool_init(_mempool_size, alloc_persistent_mempool_block, free_mempool_block, _mempool_size_limit);
MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ;
/* Machine-layer startup: bring up PMI, parse tuning knobs (command line
 * and CHARM_UGNI_* environment variables), create the GNI communication
 * domain and completion queues, create+bind an endpoint per peer, and
 * size the mempool / big-message protocol parameters.
 * argc/argv: in/out program arguments (consumed flags are removed);
 * numNodes/myNodeID: out -- not visibly written in the surviving lines
 * (presumably set from mysize/myrank in dropped lines -- TODO confirm). */
void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
unsigned int remote_addr;
gni_cdm_handle_t cdm_hndl;
gni_return_t status = GNI_RC_SUCCESS;
uint32_t vmdh_index = -1;
unsigned int local_addr, *MPID_UGNI_AllAddr;
//void (*local_event_handler)(gni_cq_entry_t *, void *) = &LocalEventHandle;
//void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
//void (*remote_bte_event_handler)(gni_cq_entry_t *, void *) = &RemoteBteEventHandle;
/* --- Process management: rank and size come from Cray PMI. --- */
status = PMI_Init(&first_spawned);
GNI_RC_CHECK("PMI_Init", status);
status = PMI_Get_size(&mysize);
GNI_RC_CHECK("PMI_Getsize", status);
status = PMI_Get_rank(&myrank);
GNI_RC_CHECK("PMI_getrank", status);
//physicalID = CmiPhysicalNodeID(myrank);
//printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
#if MULTI_THREAD_SEND
/* Currently, we only consider the case that comm. thread will only recv msgs */
Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
/* --- Optional throttling caps (compile-time gated). --- */
#if CMI_EXERT_SEND_LARGE_CAP
CmiGetArgInt(*argv,"+useSendLargeCap", &SEND_large_cap);
#if CMI_SENDBUFFERSMSG_CAP
CmiGetArgInt(*argv,"+useSendBufferCap", &SendBufferMsg_cap);
#if CMI_PUMPNETWORKSMSG_CAP
CmiGetArgInt(*argv,"+usePumpSmsgCap", &PumpNetworkSmsg_cap);
/* --- CQ depths: env var overrides, then command line. --- */
CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
/* NOTE(review): "+useRecvQueue" is parsed twice (above and here); the
 * second parse wins when both are present -- looks redundant. */
CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
/* --- SMSG mode selection: dynamic (lazy) vs static (all-pairs). --- */
env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
if (env) useDynamicSMSG = 1;
if (!useDynamicSMSG)
useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
//useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
/* --- NIC handle acquisition via the libonesided bypass path. --- */
onesided_init(NULL, &onesided_hnd);
// this is a GNI test, so use the libonesided bypass functionality
onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
local_addr = gniGetNicAddress();
cookie = get_cookie();
modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
//Create and attach to the communication domain */
status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
GNI_RC_CHECK("GNI_CdmCreate", status);
//* device id The device id is the minor number for the device
//that is assigned to the device by the system when the device is created.
//To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
//where X is the device number 0 default
status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
GNI_RC_CHECK("GNI_CdmAttach", status);
local_addr = get_gni_nic_address(0);
/* --- Gather every rank's NIC address for endpoint binding. --- */
MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
_MEMCHECK(MPID_UGNI_AllAddr);
allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
/* create the local completion queue */
/* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
GNI_RC_CHECK("GNI_CqCreate (tx)", status);
#if MULTI_THREAD_SEND
/* Separate TX CQs so worker-thread RDMA does not contend on the default CQ. */
status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &highprior_rdma_tx_cqh);
GNI_RC_CHECK("GNI_CqCreate high priority RDMA (tx)", status);
/* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
GNI_RC_CHECK("Create CQ (rx)", status);
status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
GNI_RC_CHECK("Create Post CQ (rx)", status);
#if CMK_PERSISTENT_COMM
status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &highpriority_rx_cqh);
GNI_RC_CHECK("Create Post CQ (rx)", status);
//status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
//GNI_RC_CHECK("Create BTE CQ", status);
/* create the endpoints. they need to be bound to allow later CQWrites to them */
ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
_MEMCHECK(ep_hndl_array);
#if MULTI_THREAD_SEND
/* Several logical locks intentionally alias one CmiLock here. */
rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
//default_tx_cq_lock = CmiCreateLock();
rdma_tx_cq_lock = CmiCreateLock();
smsg_rx_cq_lock = CmiCreateLock();
//global_gni_lock = CmiCreateLock();
//rx_cq_lock = CmiCreateLock();
/* One bound endpoint per peer (none for self). */
for (i=0; i<mysize; i++) {
if(i == myrank) continue;
status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
GNI_RC_CHECK("GNI_EpCreate ", status);
remote_addr = MPID_UGNI_AllAddr[i];
status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
GNI_RC_CHECK("GNI_EpBind ", status);
/* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
/* --- Mempool sizing knobs (see file-header documentation). --- */
#if USE_LRTS_MEMPOOL
env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
_totalmem = CmiReadSize(env);
printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
if (env) _mempool_size = CmiReadSize(env);
if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size"))
_mempool_size = CmiReadSize(env);
env = getenv("CHARM_UGNI_MEMPOOL_MAX");
MAX_REG_MEM = CmiReadSize(env);
if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size")) {
MAX_REG_MEM = CmiReadSize(env);
env = getenv("CHARM_UGNI_SEND_MAX");
MAX_BUFF_SEND = CmiReadSize(env);
if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send")) {
MAX_BUFF_SEND = CmiReadSize(env);
env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
_mempool_size_limit = CmiReadSize(env);
/* Keep the limits mutually consistent. */
if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
if (MAX_BUFF_SEND > MAX_REG_MEM) MAX_BUFF_SEND = MAX_REG_MEM;
printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
if (MAX_REG_MEM < BIG_MSG * 2 + oneMB) {
/* memblock can expand to BIG_MSG * 2 size */
printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n", BIG_MSG * 2.0/1024/1024 + 1);
CmiAbort("mempool maximum size is too small. \n");
#if MULTI_THREAD_SEND
printf("Charm++> worker thread sending messages\n");
#elif COMM_THREAD_SEND
printf("Charm++> only comm thread send/recv messages\n");
#endif /* end of USE_LRTS_MEMPOOL */
/* --- Big-message (pipelined RDMA) protocol parameters. --- */
env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
BIG_MSG = CmiReadSize(env);
if (BIG_MSG < ONE_SEG)
CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
BIG_MSG_PIPELINE = atoi(env);
/* Deadlock checking is pointless for a single-process job. */
env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
if (env) _checkProgress = 0;
if (mysize == 1) _checkProgress = 0;
#if CMI_EXERT_RECV_RDMA_CAP
env = getenv("CHARM_UGNI_RDMA_MAX");
/* NOTE(review): this assigns the in-flight counter RDMA_pending from the
 * env var; the cap variable RDMA_cap looks like the intended target
 * (see declarations at the top of the file) -- verify against upstream. */
RDMA_pending = atoi(env);
printf("Charm++> Max pending RDMA set to: %d\n", RDMA_pending);
/* --- Hugepage (TLB) page size detection. --- */
env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
_tlbpagesize = CmiReadSize(env);
/* real gethugepagesize() is only available when hugetlb module linked */
_tlbpagesize = gethugepagesize();
printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
/* 4K pages mean no hugepage module was loaded -- required for LARGEPAGE. */
if (_tlbpagesize == 4096) {
CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
/* stats related arguments */
CmiGetArgStringDesc(*argv,"+gni_stats_root",&counters_dirname,"counter directory name, default counters");
print_stats = CmiGetArgFlag(*argv, "+print_stats");
stats_off = CmiGetArgFlag(*argv, "+stats_off");
/* init DMA buffer for medium message */
//_init_DMA_buffer();
/* Addresses are captured in the bound endpoints; the table can go. */
free(MPID_UGNI_AllAddr);
sendRdmaBuf = PCQueueCreate();
sendHighPriorBuf = PCQueueCreate();
#if MACHINE_DEBUG_LOG
sprintf(ln,"debugLog.%d",myrank);
debugLog=fopen(ln,"w");
// ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
/* SHIFT = ceil(log2(mysize)); used to pack rank+index into event data. */
while (1<<SHIFT < mysize) SHIFT++;
CmiAssert(SHIFT < 31);
IndexPool_init(&ackPool);
#if CMK_PERSISTENT_COMM
IndexPool_init(&persistPool);
/* Allocate a message buffer of n_bytes payload plus `header` bytes of
 * envelope.  Three regimes: (a) fits in an SMSG -> plain malloc; (b) medium
 * (< BIG_MSG) -> carved from the registered mempool so it can be RDMA'd
 * without extra registration; (c) big -> its own hugepage/memalign region
 * used by the pipelined big-message protocol.  Returns a pointer offset so
 * that payload+header lands ALIGNBUF-aligned.  (The #if/#else lines
 * selecting among these paths were dropped by the extraction.) */
void* LrtsAlloc(int n_bytes, int header)
printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
if(n_bytes <= SMSG_MAX_MSG)
int totalsize = n_bytes+header;
ptr = malloc(totalsize);
/* The mempool header must fit inside the ALIGNBUF slack before payload. */
CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
#if USE_LRTS_MEMPOOL
n_bytes = ALIGN64(n_bytes);
if(n_bytes < BIG_MSG)
char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
//printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
/* Big message: hugepage path vs memalign fallback (guard lines dropped). */
n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
char *res = my_get_huge_pages(n_bytes);
char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
if (res) ptr = res + ALIGNBUF - header;
/* Non-mempool build: plain aligned allocation. */
n_bytes = ALIGN64(n_bytes); /* make sure size if 4 aligned */
char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
ptr = res + ALIGNBUF - header;
/* Free a buffer obtained from LrtsAlloc, dispatching on the recorded size
 * to undo the matching allocation path (small malloc / mempool / hugepage
 * or memalign big-message region).  Persistent-channel memory is owned by
 * the persistent layer and must not be freed here.  The #if/#else lines
 * selecting among the branches were dropped by the extraction. */
void LrtsFree(void *msg)
CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
#if CMK_PERSISTENT_COMM
if (IS_PERSISTENT_MEMORY(msg)) return;
/* Small message: was a plain malloc (free happens in dropped lines). */
if (size <= SMSG_MAX_MSG)
/* Recompute the same rounded size LrtsAlloc used, so the pointer
 * arithmetic below lands back on the true allocation base. */
size = ALIGN64(size);
int s = ALIGNHUGEPAGE(size+ALIGNBUF);
my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
#if USE_LRTS_MEMPOOL
/* SMP build defers the free to the owning thread's pool. */
mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
4029 if(CmiMyRank() == CmiMyNodeSize())
4031 if (print_stats
) print_comm_stats();
4034 #if USE_LRTS_MEMPOOL
4035 //printf("FINAL [%d, %d] register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg);
4036 mempool_destroy(CpvAccess(mempool
));
/* Before shutdown/checkpoint: keep pumping the progress engine until all
 * buffered sends have gone out and outstanding transactions completed.
 * (The `while (` header wrapping the SendBufferMsg conditions was dropped
 * by the extraction.)  A single-process job has nothing to drain. */
void LrtsDrainResources()
if(mysize == 1) return;
!SendBufferMsg(&smsg_oob_queue, NULL) ||
!SendBufferMsg(&smsg_queue, NULL)
PumpDatagramConnection();
PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
#if MULTI_THREAD_SEND
PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
PumpLocalTransactions(highprior_rdma_tx_cqh, rdma_tx_cq_lock);
PumpRemoteTransactions(rdma_rx_cqh);
SendRdmaMsg(sendRdmaBuf);
SendRdmaMsg(sendHighPriorBuf);
4074 void LrtsAbort(const char *message
) {
4075 fprintf(stderr
, "[%d] CmiAbort: %s\n", myrank
, message
);
4076 CmiPrintStackTrace(0);
4077 PMI_Abort(-1, message
);
/************************** TIMER FUNCTIONS **************************/
#if CMK_TIMER_USE_SPECIAL
/* MPI calls are not threadsafe, even the timer on some machines */
/* Lock is declared but never created on this layer (see comment below). */
static CmiNodeLock timerLock = 0;
/* Nonzero when +useAbsoluteTime was given: report raw clock readings. */
static int _absoluteTime = 0;
/* Nonzero when the platform timer is globally synchronized across nodes. */
static int _is_global = 0;
/* Epoch captured in CmiTimerInit(); timers report offsets from it. */
static struct timespec start_ts;
/* Whether the timer is synchronized across nodes (body dropped by the
 * extraction; presumably returns a constant -- TODO confirm upstream). */
inline int CmiTimerIsSynchronized() {
/* Whether timers report absolute clock readings (set by +useAbsoluteTime). */
inline int CmiTimerAbsolute() {
return _absoluteTime;
/* Timer origin helpers (CmiStartTimer body dropped by the extraction). */
double CmiStartTimer() {
double CmiInitTime() {
/* Convert the recorded epoch to seconds. */
return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
/* Initialize the timer subsystem: parse +useAbsoluteTime, record whether
 * the platform timer is globally synchronized, and capture the start
 * epoch with CLOCK_MONOTONIC.  In SMP mode rank 0 of the node captures
 * the epoch and the node barrier publishes it to the other ranks (the
 * branch structure here is partly lost to the extraction). */
void CmiTimerInit(char **argv) {
_absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
if (_absoluteTime && CmiMyPe() == 0)
printf("Charm++> absolute timer is used\n");
_is_global = CmiTimerIsSynchronized();
if (CmiMyRank() == 0) {
clock_gettime(CLOCK_MONOTONIC, &start_ts);
} else { /* we don't have a synchronous timer, set our own start time */
clock_gettime(CLOCK_MONOTONIC, &start_ts);
CmiNodeAllBarrier(); /* for smp */
4126 * Since the timerLock is never created, and is
4127 * always NULL, then all the if-condition inside
4128 * the timer functions could be disabled right
4129 * now in the case of SMP.
4131 double CmiTimer(void) {
4132 struct timespec now_ts
;
4133 clock_gettime(CLOCK_MONOTONIC
, &now_ts
);
4134 return _absoluteTime
?((double)(now_ts
.tv_sec
)+(double)now_ts
.tv_nsec
/1000000000.0)
4135 : (double)( now_ts
.tv_sec
- start_ts
.tv_sec
) + (((double) now_ts
.tv_nsec
- (double) start_ts
.tv_nsec
) / 1000000000.0);
4138 double CmiWallTimer(void) {
4139 struct timespec now_ts
;
4140 clock_gettime(CLOCK_MONOTONIC
, &now_ts
);
4141 return _absoluteTime
?((double)(now_ts
.tv_sec
)+(double)now_ts
.tv_nsec
/1000000000.0)
4142 : ( now_ts
.tv_sec
- start_ts
.tv_sec
) + ((now_ts
.tv_nsec
- start_ts
.tv_nsec
) / 1000000000.0);
4145 double CmiCpuTimer(void) {
4146 struct timespec now_ts
;
4147 clock_gettime(CLOCK_MONOTONIC
, &now_ts
);
4148 return _absoluteTime
?((double)(now_ts
.tv_sec
)+(double)now_ts
.tv_nsec
/1000000000.0)
4149 : (double)( now_ts
.tv_sec
- start_ts
.tv_sec
) + (((double) now_ts
.tv_nsec
- (double) start_ts
.tv_nsec
) / 1000000000.0);
4153 /************Barrier Related Functions****************/
4157 gni_return_t status
;
4160 /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks */
4161 CmiNodeAllBarrier();
4162 if (CmiMyRank() == CmiMyNodeSize())
4164 if (CmiMyRank() == 0)
4168 * The call of CmiBarrier is usually before the initialization
4169 * of trace module of Charm++, therefore, the START_EVENT
4170 * and END_EVENT are disabled here. -Chao Mei
4173 status
= PMI_Barrier();
4174 GNI_RC_CHECK("PMI_Barrier", status
);
4177 CmiNodeAllBarrier();
4182 #include "machine-cmidirect.c"
4184 #if CMK_PERSISTENT_COMM
4185 #include "machine-persistent.c"