3 * Gemini GNI machine layer
9 * Flow control by mem pool using environment variables:
11 # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12 # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13 export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14 export CHARM_UGNI_MEMPOOL_MAX=20M
15 export CHARM_UGNI_SEND_MAX=10M
17 # limit on the total mempool size allocated; this prevents the mempool
18 # from using too much memory
19 export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M
21 other environment variables:
23 export CHARM_UGNI_NO_DEADLOCK_CHECK=yes # disable deadlock checking
24 export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G # max memory per node for mempool
25 export CHARM_UGNI_BIG_MSG_SIZE=4M # set big message size protocol
26 export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4 # set big message pipeline length
40 //#include <numatoolkit.h>
45 #include "cmidirect.h"
51 #define MULTI_THREAD_SEND 0
52 #define COMM_THREAD_SEND 1
56 #define CMK_WORKER_SINGLE_TASK 0
59 #define REMOTE_EVENT 0
62 #define CMI_EXERT_SEND_CAP 0
63 #define CMI_EXERT_RECV_CAP 0
65 #if CMI_EXERT_SEND_CAP
69 #if CMI_EXERT_RECV_CAP
73 #define USE_LRTS_MEMPOOL 1
77 // Trace communication thread
78 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
79 #define TRACE_THRESHOLD 0.00005
80 #define CMI_MPI_TRACE_MOREDETAILED 0
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
84 #undef CMK_SMP_TRACE_COMMTHREAD
85 #define CMK_SMP_TRACE_COMMTHREAD 0
88 #define CMK_TRACE_COMMOVERHEAD 0
89 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
90 #undef CMI_MPI_TRACE_USEREVENTS
91 #define CMI_MPI_TRACE_USEREVENTS 1
93 #undef CMK_TRACE_COMMOVERHEAD
94 #define CMK_TRACE_COMMOVERHEAD 0
97 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
98 CpvStaticDeclare(double, projTraceStart
);
99 #define START_EVENT() CpvAccess(projTraceStart) = CmiWallTimer();
100 #define END_EVENT(x) traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
102 #define START_EVENT()
108 #define oneMB (1024ll*1024)
109 #define oneGB (1024ll*1024*1024)
111 static CmiInt8 _mempool_size
= 8*oneMB
;
112 static CmiInt8 _expand_mem
= 4*oneMB
;
113 static CmiInt8 _mempool_size_limit
= 0;
115 static CmiInt8 _totalmem
= 0.8*oneGB
;
118 static CmiInt8 BIG_MSG
= 16*oneMB
;
119 static CmiInt8 ONE_SEG
= 4*oneMB
;
121 static CmiInt8 BIG_MSG
= 4*oneMB
;
122 static CmiInt8 ONE_SEG
= 2*oneMB
;
124 #if MULTI_THREAD_SEND
125 static int BIG_MSG_PIPELINE
= 1;
127 static int BIG_MSG_PIPELINE
= 4;
130 // dynamic flow control
131 static CmiInt8 buffered_send_msg
= 0;
132 static CmiInt8 register_memory_size
= 0;
135 static CmiInt8 MAX_BUFF_SEND
= 100000*oneMB
;
136 static CmiInt8 MAX_REG_MEM
= 200000*oneMB
;
137 static CmiInt8 register_count
= 0;
139 #if CMK_SMP && COMM_THREAD_SEND
140 static CmiInt8 MAX_BUFF_SEND
= 100*oneMB
;
141 static CmiInt8 MAX_REG_MEM
= 200*oneMB
;
143 static CmiInt8 MAX_BUFF_SEND
= 16*oneMB
;
144 static CmiInt8 MAX_REG_MEM
= 25*oneMB
;
150 #endif /* end USE_LRTS_MEMPOOL */
152 #if MULTI_THREAD_SEND
153 #define CMI_GNI_LOCK(x) CmiLock(x);
154 #define CMI_GNI_UNLOCK(x) CmiUnlock(x);
155 #define CMI_PCQUEUEPOP_LOCK(Q) CmiLock((Q)->lock);
156 #define CMI_PCQUEUEPOP_UNLOCK(Q) CmiUnlock((Q)->lock);
158 #define CMI_GNI_LOCK(x)
159 #define CMI_GNI_UNLOCK(x)
160 #define CMI_PCQUEUEPOP_LOCK(Q)
161 #define CMI_PCQUEUEPOP_UNLOCK(Q)
164 static int _tlbpagesize
= 4096;
166 //static int _smpd_count = 0;
168 static int user_set_flag
= 0;
170 static int _checkProgress
= 1; /* check deadlock */
171 static int _detected_hang
= 0;
173 #define SMSG_ATTR_SIZE sizeof(gni_smsg_attr_t)
176 static int useDynamicSMSG
=0; /* dynamic smsgs setup */
178 static int avg_smsg_connection
= 32;
179 static int *smsg_connected_flag
= 0;
180 static gni_smsg_attr_t
**smsg_attr_vector_local
;
181 static gni_smsg_attr_t
**smsg_attr_vector_remote
;
182 static gni_ep_handle_t ep_hndl_unbound
;
183 static gni_smsg_attr_t send_smsg_attr
;
184 static gni_smsg_attr_t recv_smsg_attr
;
186 typedef struct _dynamic_smsg_mailbox
{
190 gni_mem_handle_t mem_hndl
;
191 struct _dynamic_smsg_mailbox
*next
;
192 }dynamic_smsg_mailbox_t
;
194 static dynamic_smsg_mailbox_t
*mailbox_list
;
196 static CmiUInt8 smsg_send_count
= 0, last_smsg_send_count
= 0;
197 static CmiUInt8 smsg_recv_count
= 0, last_smsg_recv_count
= 0;
200 int lrts_send_msg_id
= 0;
201 int lrts_local_done_msg
= 0;
202 int lrts_send_rdma_success
= 0;
211 #if CMK_PERSISTENT_COMM
212 #include "machine-persistent.h"
215 //#define USE_ONESIDED 1
217 // the onesided implementation is wrong, since there is no place to restore omdh
218 #include "onesided.h"
219 onesided_hnd_t onesided_hnd
;
221 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh)
223 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
226 uint8_t onesided_hnd
, omdh
;
229 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
230 if(register_memory_size+size>= MAX_REG_MEM) { \
231 status = GNI_RC_ERROR_NOMEM;} \
232 else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg, (uint64_t)size, cqh, GNI_MEM_READWRITE, -1, mem_hndl); \
233 if(status == GNI_RC_SUCCESS) register_memory_size += size; }
235 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
236 if (register_memory_size + size >= MAX_REG_MEM) { \
237 status = GNI_RC_ERROR_NOMEM; \
238 } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg, (uint64_t)size, NULL, GNI_MEM_READWRITE, -1, mem_hndl); \
239 if(status == GNI_RC_SUCCESS) register_memory_size += size; }
242 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size) \
243 do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
244 register_memory_size -= size; \
245 else CmiAbort("MEM_DEregister"); \
249 #define GetMempoolBlockPtr(x) (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
250 #define GetMempoolPtr(x) GetMempoolBlockPtr(x)->mptr
251 #define GetMempoolsize(x) GetMempoolBlockPtr(x)->size
252 #define GetMemHndl(x) GetMempoolBlockPtr(x)->mem_hndl
253 #define IncreaseMsgInRecv(x) (GetMempoolBlockPtr(x)->msgs_in_recv)++
254 #define DecreaseMsgInRecv(x) (GetMempoolBlockPtr(x)->msgs_in_recv)--
255 #define IncreaseMsgInSend(x) (GetMempoolBlockPtr(x)->msgs_in_send)++
256 #define DecreaseMsgInSend(x) (GetMempoolBlockPtr(x)->msgs_in_send)--
257 #define NoMsgInSend(x) GetMempoolBlockPtr(x)->msgs_in_send == 0
258 #define NoMsgInRecv(x) GetMempoolBlockPtr(x)->msgs_in_recv == 0
259 #define NoMsgInFlight(x) (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv == 0)
260 #define IsMemHndlZero(x) ((x).qword1 == 0 && (x).qword2 == 0)
261 #define SetMemHndlZero(x) do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
262 #define NotRegistered(x) IsMemHndlZero(((block_header*)x)->mem_hndl)
264 #define GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
265 #define GetSizeFromBlockHeader(x) ((block_header*)x)->size
267 #define CmiGetMsgSize(m) ((CmiMsgHeaderExt*)m)->size
268 #define CmiSetMsgSize(m,s) ((((CmiMsgHeaderExt*)m)->size)=(s))
269 #define CmiGetMsgSeq(m) ((CmiMsgHeaderExt*)m)->seq
270 #define CmiSetMsgSeq(m, s) ((((CmiMsgHeaderExt*)m)->seq) = (s))
274 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
275 /* If SMSG is not used */
277 #define FMA_PER_CORE 1024
278 #define FMA_BUFFER_SIZE 1024
280 /* If SMSG is used */
281 static int SMSG_MAX_MSG
= 1024;
282 #define SMSG_MAX_CREDIT 72
284 #define MSGQ_MAXSIZE 2048
285 /* large message transfer with FMA or BTE */
286 #define LRTS_GNI_RDMA_THRESHOLD 1024
289 static int REMOTE_QUEUE_ENTRIES
=163840;
290 static int LOCAL_QUEUE_ENTRIES
=163840;
292 static int REMOTE_QUEUE_ENTRIES
=20480;
293 static int LOCAL_QUEUE_ENTRIES
=20480;
296 #define BIG_MSG_TAG 0x26
297 #define PUT_DONE_TAG 0x28
298 #define DIRECT_PUT_DONE_TAG 0x29
300 /* SMSG is a data message */
301 #define SMALL_DATA_TAG 0x31
302 #define SMALL_DATA_ACK_TAG 0x32
303 /* SMSG is a control message to initialize a BTE */
304 #define LMSG_INIT_TAG 0x39
305 #define LMSG_INIT_ACK_TAG 0x3a
312 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) { printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
314 #define GNI_RC_CHECK(msg,rc)
317 #define ALIGN64(x) (size_t)((~63)&((x)+63))
318 //#define ALIGN4(x) (size_t)((~3)&((x)+3))
319 #define ALIGNHUGEPAGE(x) (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
321 static int useStaticMSGQ
= 0;
322 static int useStaticFMA
= 0;
323 static int mysize
, myrank
;
324 static gni_nic_handle_t nic_hndl
;
327 gni_mem_handle_t mdh
;
330 // this is related to dynamic SMSG
332 typedef struct mdh_addr_list
{
333 gni_mem_handle_t mdh
;
335 struct mdh_addr_list
*next
;
338 static unsigned int smsg_memlen
;
339 gni_smsg_attr_t
**smsg_local_attr_vec
= 0;
340 mdh_addr_t setup_mem
;
341 mdh_addr_t
*smsg_connection_vec
= 0;
342 gni_mem_handle_t smsg_connection_memhndl
;
343 static int smsg_expand_slots
= 10;
344 static int smsg_available_slot
= 0;
345 static void *smsg_mailbox_mempool
= 0;
346 mdh_addr_list_t
*smsg_dynamic_list
= 0;
348 static void *smsg_mailbox_base
;
349 gni_msgq_attr_t msgq_attrs
;
350 gni_msgq_handle_t msgq_handle
;
351 gni_msgq_ep_attr_t msgq_ep_attrs
;
352 gni_msgq_ep_attr_t msgq_ep_attrs_size
;
354 /* =====Beginning of Declarations of Machine Specific Variables===== */
356 static int modes
= 0;
357 static gni_cq_handle_t smsg_rx_cqh
= NULL
;
358 static gni_cq_handle_t default_tx_cqh
= NULL
;
359 static gni_cq_handle_t rdma_tx_cqh
= NULL
;
360 static gni_cq_handle_t rdma_rx_cqh
= NULL
;
361 static gni_cq_handle_t post_tx_cqh
= NULL
;
362 static gni_ep_handle_t
*ep_hndl_array
;
364 static CmiNodeLock
*ep_lock_array
;
365 static CmiNodeLock default_tx_cq_lock
;
366 static CmiNodeLock rdma_tx_cq_lock
;
367 static CmiNodeLock global_gni_lock
;
368 static CmiNodeLock rx_cq_lock
;
369 static CmiNodeLock smsg_mailbox_lock
;
370 static CmiNodeLock smsg_rx_cq_lock
;
371 static CmiNodeLock
*mempool_lock
;
373 typedef struct msg_list
380 struct msg_list
*next
;
385 typedef struct control_msg
387 uint64_t source_addr
; /* address from the start of buffer */
388 uint64_t dest_addr
; /* address from the start of buffer */
389 int total_length
; /* total length */
390 int length
; /* length of this packet */
392 int ack_index
; /* index from integer to address */
394 uint8_t seq_id
; //big message 0 meaning single message
395 gni_mem_handle_t source_mem_hndl
;
396 struct control_msg
*next
;
399 #define CONTROL_MSG_SIZE (sizeof(CONTROL_MSG)-sizeof(void*))
401 typedef struct ack_msg
403 uint64_t source_addr
; /* address from the start of buffer */
404 #if ! USE_LRTS_MEMPOOL
405 gni_mem_handle_t source_mem_hndl
;
406 int length
; /* total length */
408 struct ack_msg
*next
;
411 #define ACK_MSG_SIZE (sizeof(ACK_MSG)-sizeof(void*))
415 uint64_t handler_addr
;
419 char core
[CmiMsgHeaderSizeBytes
];
424 CpvDeclare(int, CmiHandleDirectIdx
);
425 void CmiHandleDirectMsg(cmidirectMsg
* msg
)
428 CmiDirectUserHandle
*_handle
= (CmiDirectUserHandle
*)(msg
->handler
);
429 (*(_handle
->callbackFnPtr
))(_handle
->callbackData
);
435 CpvInitialize(int, CmiHandleDirectIdx
);
436 CpvAccess(CmiHandleDirectIdx
) = CmiRegisterHandler( (CmiHandler
) CmiHandleDirectMsg
);
440 typedef struct rmda_msg
446 gni_post_descriptor_t
*pd
;
448 struct rmda_msg
*next
;
455 #define ONE_SEND_QUEUE 0
457 typedef struct msg_list_index
464 #if !ONE_SEND_QUEUE && SMP_LOCKS
465 PCQueue nonEmptyQueues
;
469 static RDMA_REQUEST
*sendRdmaBuf
= 0;
470 static RDMA_REQUEST
*sendRdmaTail
= 0;
471 typedef struct msg_list_index
474 MSG_LIST
*sendSmsgBuf
;
480 // buffered send queue
482 typedef struct smsg_queue
484 MSG_LIST_INDEX
*smsg_msglist_index
;
488 typedef struct smsg_queue
494 SMSG_QUEUE smsg_queue
;
496 SMSG_QUEUE smsg_oob_queue
;
501 #define FreeMsgList(d) free(d);
502 #define MallocMsgList(d) d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
506 static MSG_LIST
*msglist_freelist
=0;
508 #define FreeMsgList(d) \
510 (d)->next = msglist_freelist;\
511 msglist_freelist = d; \
514 #define MallocMsgList(d) \
516 d = msglist_freelist;\
517 if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
519 } else msglist_freelist = d->next; \
527 #define FreeControlMsg(d) free(d);
528 #define MallocControlMsg(d) d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
532 static CONTROL_MSG
*control_freelist
=0;
534 #define FreeControlMsg(d) \
536 (d)->next = control_freelist;\
537 control_freelist = d; \
540 #define MallocControlMsg(d) \
542 d = control_freelist;\
543 if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
545 } else control_freelist = d->next; \
552 #define FreeAckMsg(d) free(d);
553 #define MallocAckMsg(d) d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
557 static ACK_MSG
*ack_freelist
=0;
559 #define FreeAckMsg(d) \
561 (d)->next = ack_freelist;\
565 #define MallocAckMsg(d) \
568 if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
570 } else ack_freelist = d->next; \
577 #define FreeRdmaRequest(d) free(d);
578 #define MallocRdmaRequest(d) d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));
581 static RDMA_REQUEST
*rdma_freelist
= NULL
;
583 #define FreeRdmaRequest(d) \
585 (d)->next = rdma_freelist;\
589 #define MallocRdmaRequest(d) \
592 if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
594 } else rdma_freelist = d->next; \
599 /* reuse gni_post_descriptor_t */
600 static gni_post_descriptor_t
*post_freelist
=0;
603 #define FreePostDesc(d) \
604 (d)->next_descr = post_freelist;\
607 #define MallocPostDesc(d) \
610 d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
613 } else post_freelist = d->next_descr;
616 #define FreePostDesc(d) free(d);
617 #define MallocPostDesc(d) d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
622 /* LrtsSent is called but the message cannot be sent by SMSGSend because the mailbox is full or there is no credit */
623 static int buffered_smsg_counter
= 0;
625 /* SmsgSend returns success but the sent message is not yet confirmed by the remote side */
626 static MSG_LIST
*buffered_fma_head
= 0;
627 static MSG_LIST
*buffered_fma_tail
= 0;
630 #define IsFree(a,ind) !( a& (1<<(ind) ))
631 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
632 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
634 CpvDeclare(mempool_type
*, mempool
);
637 /* ack pool for remote events */
640 #define ACK_EVENT(idx) ((idx<<ACK_SHIFT) | myrank)
641 #define ACK_GET_RANK(evt) (evt & ((1<<ACK_SHIFT)-1))
642 #define ACK_GET_INDEX(evt) (evt >> ACK_SHIFT)
649 static struct AckPool
*ackpool
;
650 static int ackpoolsize
;
651 static int ackpool_freehead
;
652 static CmiNodeLock ackpool_lock
;
654 #define GetAckAddress(s) (ackpool[s].addr)
656 static void AckPool_init()
660 ackpool
= (struct AckPool
*)malloc(ackpoolsize
*sizeof(struct AckPool
));
661 for (i
=0; i
<ackpoolsize
-1; i
++) {
662 ackpool
[i
].next
= i
+1;
664 ackpool
[i
].next
= -1;
665 ackpool_freehead
= 0;
666 #if MULTI_THREAD_SEND
667 ackpool_lock
= CmiCreateLock();
672 inline int AckPool_getslot(void *addr
)
676 CMI_GNI_LOCK(ackpool_lock
);
677 s
= ackpool_freehead
;
679 // printf("[%d] AckPool_getslot expand: %d\n", myrank, ackpoolsize);
680 int newsize
= ackpoolsize
* 2;
681 if (newsize
>= 1<<(32-ACK_SHIFT
)) CmiAbort("AckPool too large");
682 struct AckPool
*old_ackpool
= ackpool
;
683 ackpool
= (struct AckPool
*)malloc(newsize
*sizeof(struct AckPool
));
684 memcpy(ackpool
, old_ackpool
, ackpoolsize
*sizeof(struct AckPool
));
685 for (i
=ackpoolsize
; i
<newsize
-1; i
++) {
686 ackpool
[i
].next
= i
+1;
688 ackpool
[i
].next
= -1;
689 ackpool_freehead
= ackpoolsize
;
691 ackpoolsize
= newsize
;
694 ackpool_freehead
= ackpool
[s
].next
;
695 ackpool
[s
].addr
= addr
;
696 CMI_GNI_UNLOCK(ackpool_lock
);
701 inline void AckPool_freeslot(int s
)
703 CmiAssert(s
>=0 && s
<ackpoolsize
);
704 CMI_GNI_LOCK(ackpool_lock
);
705 ackpool
[s
].next
= ackpool_freehead
;
706 ackpool_freehead
= s
;
707 CMI_GNI_UNLOCK(ackpool_lock
);
714 typedef struct comm_thread_stats
716 int count_in_send_buffered_ack
;
717 double time_in_send_buffered_ack
;
718 double max_time_in_send_buffered_ack
;
719 int count_in_send_buffered_smsg
;
720 double time_in_send_buffered_smsg
;
721 double max_time_in_send_buffered_smsg
;
724 static Comm_Thread_Stats comm_stats
;
726 static void init_comm_stats()
728 memset(&comm_stats
, 0, sizeof(Comm_Thread_Stats
));
731 #define STATS_ACK_TIME(x) \
732 { double t = CmiWallTimer(); \
734 t = CmiWallTimer() - t; \
735 comm_stats.count_in_send_buffered_ack ++; \
736 comm_stats.time_in_send_buffered_ack += t; \
737 if (t>comm_stats.max_time_in_send_buffered_ack) \
738 comm_stats.max_time_in_send_buffered_ack = t; \
741 #define STATS_SEND_SMSGS_TIME(x) \
742 { double t = CmiWallTimer(); \
744 t = CmiWallTimer() - t; \
745 comm_stats.count_in_send_buffered_smsg ++; \
746 comm_stats.time_in_send_buffered_smsg += t; \
747 if (t>comm_stats.max_time_in_send_buffered_smsg) \
748 comm_stats.max_time_in_send_buffered_smsg = t; \
751 static void print_comm_stats()
754 printf("PE[%d] count\ttime\tmax \n", myrank
);
755 printf("PE[%d] send buffered ack: %d\t%f\t%f\n", myrank
, comm_stats
.count_in_send_buffered_ack
, comm_stats
.time_in_send_buffered_ack
, comm_stats
.max_time_in_send_buffered_ack
);
756 printf("PE[%d] send smsgs: %d\t%f\t%f\n", myrank
, comm_stats
.count_in_send_buffered_smsg
, comm_stats
.time_in_send_buffered_smsg
, comm_stats
.max_time_in_send_buffered_smsg
);
759 #define STATS_ACK_TIME(x) x
760 #define STATS_SEND_SMSGS_TIME(x) x
763 static int print_stats
= 0;
766 allgather(void *in
,void *out
, int len
)
768 static int *ivec_ptr
=NULL
,already_called
=0,job_size
=0;
771 char *tmp_buf
,*out_ptr
;
773 if(!already_called
) {
775 rc
= PMI_Get_size(&job_size
);
776 CmiAssert(rc
== PMI_SUCCESS
);
777 rc
= PMI_Get_rank(&my_rank
);
778 CmiAssert(rc
== PMI_SUCCESS
);
780 ivec_ptr
= (int *)malloc(sizeof(int) * job_size
);
781 CmiAssert(ivec_ptr
!= NULL
);
783 rc
= PMI_Allgather(&my_rank
,ivec_ptr
,sizeof(int));
784 CmiAssert(rc
== PMI_SUCCESS
);
790 tmp_buf
= (char *)malloc(job_size
* len
);
793 rc
= PMI_Allgather(in
,tmp_buf
,len
);
794 CmiAssert(rc
== PMI_SUCCESS
);
798 for(i
=0;i
<job_size
;i
++) {
800 memcpy(&out_ptr
[len
* ivec_ptr
[i
]],&tmp_buf
[i
* len
],len
);
808 allgather_2(void *in
,void *out
, int len
)
810 // PMI_Allgather returns data out of rank order
811 int i
,rc
, extend_len
;
813 char *out_ptr
, *out_ref
;
816 extend_len
= sizeof(int) + len
;
817 in2
= (char*)malloc(extend_len
);
819 memcpy(in2
, &myrank
, sizeof(int));
820 memcpy(in2
+sizeof(int), in
, len
);
822 out_ptr
= (char*)malloc(mysize
*extend_len
);
824 rc
= PMI_Allgather(in2
, out_ptr
, extend_len
);
825 GNI_RC_CHECK("allgather", rc
);
829 for(i
=0;i
<mysize
;i
++) {
831 memcpy(&rank_index
, &(out_ptr
[extend_len
*i
]), sizeof(int));
832 //copy to the rank index slot
833 memcpy(&out_ref
[rank_index
*len
], &out_ptr
[extend_len
*i
+sizeof(int)], len
);
841 static unsigned int get_gni_nic_address(int device_id
)
843 unsigned int address
, cpu_id
;
845 int i
, alps_dev_id
=-1,alps_address
=-1;
848 p_ptr
= getenv("PMI_GNI_DEV_ID");
850 status
= GNI_CdmGetNicAddress(device_id
, &address
, &cpu_id
);
852 GNI_RC_CHECK("GNI_CdmGetNicAddress", status
);
854 while ((token
= strtok(p_ptr
,":")) != NULL
) {
855 alps_dev_id
= atoi(token
);
856 if (alps_dev_id
== device_id
) {
861 CmiAssert(alps_dev_id
!= -1);
862 p_ptr
= getenv("PMI_GNI_LOC_ADDR");
863 CmiAssert(p_ptr
!= NULL
);
865 while ((token
= strtok(p_ptr
,":")) != NULL
) {
866 if (i
== alps_dev_id
) {
867 alps_address
= atoi(token
);
873 CmiAssert(alps_address
!= -1);
874 address
= alps_address
;
879 static uint8_t get_ptag(void)
884 p_ptr
= getenv("PMI_GNI_PTAG");
885 CmiAssert(p_ptr
!= NULL
);
886 token
= strtok(p_ptr
, ":");
887 ptag
= (uint8_t)atoi(token
);
892 static uint32_t get_cookie(void)
897 p_ptr
= getenv("PMI_GNI_COOKIE");
898 CmiAssert(p_ptr
!= NULL
);
899 token
= strtok(p_ptr
, ":");
900 cookie
= (uint32_t)atoi(token
);
907 /* directly mmap memory from hugetlbfs for large pages */
909 #include <sys/stat.h>
911 #include <sys/mman.h>
912 #include <hugetlbfs.h>
914 // size must be _tlbpagesize aligned
915 void *my_get_huge_pages(size_t size
)
919 mode_t mode
= S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
;
922 snprintf(filename
, sizeof(filename
), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize
), getpid(), rand());
923 fd
= open(filename
, O_RDWR
| O_CREAT
, mode
);
925 CmiAbort("my_get_huge_pages: open filed");
927 ptr
= mmap(NULL
, size
, PROT_READ
| PROT_WRITE
, MAP_PRIVATE
, fd
, 0);
928 if (ptr
== MAP_FAILED
) ptr
= NULL
;
929 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
935 void my_free_huge_pages(void *ptr
, int size
)
937 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
938 int ret
= munmap(ptr
, size
);
939 if (ret
== -1) CmiAbort("munmap failed in my_free_huge_pages");
944 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
945 /* TODO: add any that are related */
946 /* =====End of Definitions of Message-Corruption Related Macros=====*/
949 #include "machine-lrts.h"
950 #include "machine-common-core.c"
952 /* Network progress function is used to poll the network for
953 messages. This flushes receive buffers on some implementations. */
954 #if CMK_MACHINE_PROGRESS_DEFINED
955 void CmiMachineProgressImpl() {
959 static int SendBufferMsg(SMSG_QUEUE
*queue
);
960 static void SendRdmaMsg();
961 static void PumpNetworkSmsg();
962 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh
, CmiNodeLock cq_lock
);
964 static void PumpCqWriteTransactions();
967 static void PumpRemoteTransactions();
970 #if MACHINE_DEBUG_LOG
971 FILE *debugLog
= NULL
;
972 static CmiInt8 buffered_recv_msg
= 0;
973 int lrts_smsg_success
= 0;
974 int lrts_received_msg
= 0;
977 static void sweep_mempool(mempool_type
*mptr
)
980 block_header
*current
= &(mptr
->block_head
);
982 printf("[n %d %d] sweep_mempool slot START.\n", myrank
, n
++);
983 while( current
!= NULL
) {
984 printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank
, n
++, current
, current
->size
, current
->msgs_in_send
, current
->msgs_in_recv
, current
->mem_hndl
.qword1
, current
->mem_hndl
.qword2
);
985 current
= current
->block_next
?(block_header
*)((char*)mptr
+current
->block_next
):NULL
;
987 printf("[n %d] sweep_mempool slot END.\n", myrank
);
991 static gni_return_t
deregisterMemory(mempool_type
*mptr
, block_header
**from
)
993 block_header
*current
= *from
;
995 //while(register_memory_size>= MAX_REG_MEM)
997 while( current
!= NULL
&& ((current
->msgs_in_send
+current
->msgs_in_recv
)>0 || IsMemHndlZero(current
->mem_hndl
) ))
998 current
= current
->block_next
?(block_header
*)((char*)mptr
+current
->block_next
):NULL
;
1001 if(current
== NULL
) return GNI_RC_ERROR_RESOURCE
;
1002 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &(GetMemHndlFromBlockHeader(current
)) , &omdh
, GetSizeFromBlockHeader(current
));
1003 SetMemHndlZero(GetMemHndlFromBlockHeader(current
));
1005 return GNI_RC_SUCCESS
;
1009 static gni_return_t
registerFromMempool(mempool_type
*mptr
, void *blockaddr
, size_t size
, gni_mem_handle_t
*memhndl
, gni_cq_handle_t cqh
)
1011 gni_return_t status
= GNI_RC_SUCCESS
;
1012 //int size = GetMempoolsize(msg);
1013 //void *blockaddr = GetMempoolBlockPtr(msg);
1014 //gni_mem_handle_t *memhndl = &(GetMemHndl(msg));
1016 block_header
*current
= &(mptr
->block_head
);
1017 while(register_memory_size
>= MAX_REG_MEM
)
1019 status
= deregisterMemory(mptr
, ¤t
);
1020 if (status
!= GNI_RC_SUCCESS
) break;
1022 if(register_memory_size
>= MAX_REG_MEM
) return status
;
1024 MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
1027 MEMORY_REGISTER(onesided_hnd
, nic_hndl
, blockaddr
, size
, memhndl
, &omdh
, cqh
, status
);
1028 if(status
== GNI_RC_SUCCESS
)
1032 else if (status
== GNI_RC_INVALID_PARAM
|| status
== GNI_RC_PERMISSION_ERROR
)
1034 CmiAbort("Memory registor for mempool fails\n");
1038 status
= deregisterMemory(mptr
, ¤t
);
1039 if (status
!= GNI_RC_SUCCESS
) break;
1046 static gni_return_t
registerMemory(void *msg
, size_t size
, gni_mem_handle_t
*t
, gni_cq_handle_t cqh
)
1048 static int rank
= -1;
1050 gni_return_t status
;
1051 mempool_type
*mptr1
= CpvAccess(mempool
);//mempool_type*)GetMempoolPtr(msg);
1052 //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1055 status
= registerFromMempool(mptr1
, msg
, size
, t
, cqh
);
1056 if (status
== GNI_RC_SUCCESS
) return status
;
1058 for (i
=0; i
<CmiMyNodeSize()+1; i
++) {
1059 rank
= (rank
+1)%(CmiMyNodeSize()+1);
1060 mptr
= CpvAccessOther(mempool
, rank
);
1061 if (mptr
== mptr1
) continue;
1062 status
= registerFromMempool(mptr
, msg
, size
, t
, cqh
);
1063 if (status
== GNI_RC_SUCCESS
) return status
;
1066 return GNI_RC_ERROR_RESOURCE
;
1070 static void buffer_small_msgs(SMSG_QUEUE
*queue
, void *msg
, int size
, int destNode
, uint8_t tag
)
1073 MallocMsgList(msg_tmp
);
1074 msg_tmp
->destNode
= destNode
;
1075 msg_tmp
->size
= size
;
1080 if (queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
== 0 ) {
1081 queue
->smsg_msglist_index
[destNode
].next
= queue
->smsg_head_index
;
1082 queue
->smsg_head_index
= destNode
;
1083 queue
->smsg_msglist_index
[destNode
].tail
= queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
= msg_tmp
;
1086 queue
->smsg_msglist_index
[destNode
].tail
->next
= msg_tmp
;
1087 queue
->smsg_msglist_index
[destNode
].tail
= msg_tmp
;
1091 PCQueuePush(queue
->sendMsgBuf
, (char*)msg_tmp
);
1094 CmiLock(queue
->smsg_msglist_index
[destNode
].lock
);
1095 if(queue
->smsg_msglist_index
[destNode
].pushed
== 0)
1097 PCQueuePush(nonEmptyQueues
, (char*)&(queue
->smsg_msglist_index
[destNode
]));
1099 PCQueuePush(queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
, (char*)msg_tmp
);
1100 CmiUnlock(queue
->smsg_msglist_index
[destNode
].lock
);
1102 PCQueuePush(queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
, (char*)msg_tmp
);
1107 buffered_smsg_counter
++;
1111 inline static void print_smsg_attr(gni_smsg_attr_t
*a
)
1113 printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a
->msg_type
, a
->mbox_maxcredit
, a
->buff_size
, a
->msg_buffer
, a
->mbox_offset
);
1117 static void setup_smsg_connection(int destNode
)
1119 mdh_addr_list_t
*new_entry
= 0;
1120 gni_post_descriptor_t
*pd
;
1121 gni_smsg_attr_t
*smsg_attr
;
1122 gni_return_t status
= GNI_RC_NOT_DONE
;
1123 RDMA_REQUEST
*rdma_request_msg
;
1125 if(smsg_available_slot
== smsg_expand_slots
)
1127 new_entry
= (mdh_addr_list_t
*)malloc(sizeof(mdh_addr_list_t
));
1128 new_entry
->addr
= memalign(64, smsg_memlen
*smsg_expand_slots
);
1129 bzero(new_entry
->addr
, smsg_memlen
*smsg_expand_slots
);
1131 status
= GNI_MemRegister(nic_hndl
, (uint64_t)new_entry
->addr
,
1132 smsg_memlen
*smsg_expand_slots
, smsg_rx_cqh
,
1136 smsg_available_slot
= 0;
1137 new_entry
->next
= smsg_dynamic_list
;
1138 smsg_dynamic_list
= new_entry
;
1140 smsg_attr
= (gni_smsg_attr_t
*) malloc (sizeof(gni_smsg_attr_t
));
1141 smsg_attr
->msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
1142 smsg_attr
->mbox_maxcredit
= SMSG_MAX_CREDIT
;
1143 smsg_attr
->msg_maxsize
= SMSG_MAX_MSG
;
1144 smsg_attr
->mbox_offset
= smsg_available_slot
* smsg_memlen
;
1145 smsg_attr
->buff_size
= smsg_memlen
;
1146 smsg_attr
->msg_buffer
= smsg_dynamic_list
->addr
;
1147 smsg_attr
->mem_hndl
= smsg_dynamic_list
->mdh
;
1148 smsg_local_attr_vec
[destNode
] = smsg_attr
;
1149 smsg_available_slot
++;
1151 pd
->type
= GNI_POST_FMA_PUT
;
1152 //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT;
1153 pd
->cq_mode
= GNI_CQMODE_GLOBAL_EVENT
;
1154 pd
->dlvr_mode
= GNI_DLVMODE_PERFORMANCE
;
1155 pd
->length
= sizeof(gni_smsg_attr_t
);
1156 pd
->local_addr
= (uint64_t) smsg_attr
;
1157 pd
->remote_addr
= (uint64_t)&((((gni_smsg_attr_t
*)(smsg_connection_vec
[destNode
].addr
))[myrank
]));
1158 pd
->remote_mem_hndl
= smsg_connection_vec
[destNode
].mdh
;
1159 pd
->src_cq_hndl
= rdma_tx_cqh
;
1161 status
= GNI_PostFma(ep_hndl_array
[destNode
], pd
);
1162 print_smsg_attr(smsg_attr
);
1163 if(status
== GNI_RC_ERROR_RESOURCE
)
1165 MallocRdmaRequest(rdma_request_msg
);
1166 rdma_request_msg
->destNode
= destNode
;
1167 rdma_request_msg
->pd
= pd
;
1168 /* buffer this request */
1171 if(status
!= GNI_RC_SUCCESS
)
1172 printf("[%d=%d] send post FMA %s\n", myrank
, destNode
, gni_err_str
[status
]);
1174 printf("[%d=%d]OK send post FMA \n", myrank
, destNode
);
1178 /* useDynamicSMSG */
1180 static void alloc_smsg_attr( gni_smsg_attr_t
*local_smsg_attr
)
1182 gni_return_t status
= GNI_RC_NOT_DONE
;
1184 if(mailbox_list
->offset
== mailbox_list
->size
)
1186 dynamic_smsg_mailbox_t
*new_mailbox_entry
;
1187 new_mailbox_entry
= (dynamic_smsg_mailbox_t
*)malloc(sizeof(dynamic_smsg_mailbox_t
));
1188 new_mailbox_entry
->size
= smsg_memlen
*avg_smsg_connection
;
1189 new_mailbox_entry
->mailbox_base
= malloc(new_mailbox_entry
->size
);
1190 bzero(new_mailbox_entry
->mailbox_base
, new_mailbox_entry
->size
);
1191 new_mailbox_entry
->offset
= 0;
1193 status
= GNI_MemRegister(nic_hndl
, (uint64_t)new_mailbox_entry
->mailbox_base
,
1194 new_mailbox_entry
->size
, smsg_rx_cqh
,
1197 &(new_mailbox_entry
->mem_hndl
));
1199 GNI_RC_CHECK("register", status
);
1200 new_mailbox_entry
->next
= mailbox_list
;
1201 mailbox_list
= new_mailbox_entry
;
1203 local_smsg_attr
->msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
1204 local_smsg_attr
->mbox_maxcredit
= SMSG_MAX_CREDIT
;
1205 local_smsg_attr
->msg_maxsize
= SMSG_MAX_MSG
;
1206 local_smsg_attr
->mbox_offset
= mailbox_list
->offset
;
1207 mailbox_list
->offset
+= smsg_memlen
;
1208 local_smsg_attr
->buff_size
= smsg_memlen
;
1209 local_smsg_attr
->msg_buffer
= mailbox_list
->mailbox_base
;
1210 local_smsg_attr
->mem_hndl
= mailbox_list
->mem_hndl
;
1213 /* useDynamicSMSG */
1215 static int connect_to(int destNode
)
1217 gni_return_t status
= GNI_RC_NOT_DONE
;
1218 CmiAssert(smsg_connected_flag
[destNode
] == 0);
1219 CmiAssert (smsg_attr_vector_local
[destNode
] == NULL
);
1220 smsg_attr_vector_local
[destNode
] = (gni_smsg_attr_t
*) malloc (sizeof(gni_smsg_attr_t
));
1221 alloc_smsg_attr(smsg_attr_vector_local
[destNode
]);
1222 smsg_attr_vector_remote
[destNode
] = (gni_smsg_attr_t
*) malloc (sizeof(gni_smsg_attr_t
));
1224 CMI_GNI_LOCK(global_gni_lock
)
1225 status
= GNI_EpPostDataWId (ep_hndl_array
[destNode
], smsg_attr_vector_local
[destNode
], sizeof(gni_smsg_attr_t
),smsg_attr_vector_remote
[destNode
] ,sizeof(gni_smsg_attr_t
), destNode
+mysize
);
1226 CMI_GNI_UNLOCK(global_gni_lock
)
1227 if (status
== GNI_RC_ERROR_RESOURCE
) {
1228 /* possibly destNode is making connection at the same time */
1229 free(smsg_attr_vector_local
[destNode
]);
1230 smsg_attr_vector_local
[destNode
] = NULL
;
1231 free(smsg_attr_vector_remote
[destNode
]);
1232 smsg_attr_vector_remote
[destNode
] = NULL
;
1233 mailbox_list
->offset
-= smsg_memlen
;
1236 GNI_RC_CHECK("GNI_Post", status
);
1237 smsg_connected_flag
[destNode
] = 1;
1242 static gni_return_t
send_smsg_message(SMSG_QUEUE
*queue
, int destNode
, void *msg
, int size
, uint8_t tag
, int inbuff
)
1244 unsigned int remote_address
;
1246 gni_return_t status
= GNI_RC_ERROR_RESOURCE
;
1247 gni_smsg_attr_t
*smsg_attr
;
1248 gni_post_descriptor_t
*pd
;
1249 gni_post_state_t post_state
;
1252 if (useDynamicSMSG
) {
1253 switch (smsg_connected_flag
[destNode
]) {
1255 connect_to(destNode
); /* continue to case 1 */
1256 case 1: /* pending connection, do nothing */
1257 status
= GNI_RC_NOT_DONE
;
1259 buffer_small_msgs(queue
, msg
, size
, destNode
, tag
);
1264 #if ! ONE_SEND_QUEUE
1265 if(PCQueueEmpty(queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
) || inbuff
==1)
1269 if(queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
== 0 || inbuff
==1)
1272 uint64_t *buf
= NULL
;
1274 //CMI_GNI_LOCK(smsg_mailbox_lock)
1275 CMI_GNI_LOCK(default_tx_cq_lock
)
1276 #if CMK_SMP_TRACE_COMMTHREAD
1278 int oldeventid
= -1;
1279 if(tag
== SMALL_DATA_TAG
|| tag
== LMSG_INIT_TAG
)
1282 if ( tag
== SMALL_DATA_TAG
)
1283 real_data
= (char*)msg
;
1285 real_data
= (char*)(((CONTROL_MSG
*)msg
)->source_addr
);
1286 TRACE_COMM_GET_MSGID(real_data
, &oldpe
, &oldeventid
);
1287 TRACE_COMM_SET_COMM_MSGID(real_data
);
1291 if (tag
== LMSG_INIT_TAG
) {
1292 CONTROL_MSG
*control_msg_tmp
= (CONTROL_MSG
*)msg
;
1293 if (control_msg_tmp
->seq_id
== 0 && control_msg_tmp
->ack_index
== -1)
1294 control_msg_tmp
->ack_index
= AckPool_getslot((void*)control_msg_tmp
->source_addr
);
1296 // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1298 status
= GNI_SmsgSendWTag(ep_hndl_array
[destNode
], buf
, bufsize
, msg
, size
, 0, tag
);
1299 #if CMK_SMP_TRACE_COMMTHREAD
1300 if (oldpe
!= -1) TRACE_COMM_SET_MSGID(real_data
, oldpe
, oldeventid
);
1302 CMI_GNI_UNLOCK(default_tx_cq_lock
)
1303 //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1304 if(status
== GNI_RC_SUCCESS
)
1306 #if CMK_SMP_TRACE_COMMTHREAD
1307 if(tag
== SMALL_DATA_TAG
|| tag
== LMSG_INIT_TAG
|| tag
== SMALL_DATA_ACK_TAG
|| tag
== LMSG_INIT_ACK_TAG
)
1309 TRACE_COMM_CREATION(CpvAccess(projTraceStart
), real_data
);
1314 status
= GNI_RC_ERROR_RESOURCE
;
1316 if(status
!= GNI_RC_SUCCESS
&& inbuff
==0)
1317 buffer_small_msgs(queue
, msg
, size
, destNode
, tag
);
1322 static CONTROL_MSG
* construct_control_msg(int size
, char *msg
, uint8_t seqno
)
1324 /* construct a control message and send */
1325 CONTROL_MSG
*control_msg_tmp
;
1326 MallocControlMsg(control_msg_tmp
);
1327 control_msg_tmp
->source_addr
= (uint64_t)msg
;
1328 control_msg_tmp
->seq_id
= seqno
;
1329 control_msg_tmp
->total_length
= control_msg_tmp
->length
= ALIGN64(size
); //for GET 4 bytes aligned
1331 control_msg_tmp
->ack_index
= -1;
1333 #if USE_LRTS_MEMPOOL
1336 control_msg_tmp
->source_mem_hndl
= GetMemHndl(msg
);
1340 SetMemHndlZero(control_msg_tmp
->source_mem_hndl
);
1341 control_msg_tmp
->length
= size
- (seqno
-1)*ONE_SEG
;
1342 if (control_msg_tmp
->length
> ONE_SEG
) control_msg_tmp
->length
= ONE_SEG
;
1345 SetMemHndlZero(control_msg_tmp
->source_mem_hndl
);
1347 return control_msg_tmp
;
1350 #define BLOCKING_SEND_CONTROL 0
1352 // Large message: send a control message to the receiver; the receiver registers memory and does a GET.
1353 // A non-success return status means the control message was not sent.
1355 static gni_return_t
send_large_messages(SMSG_QUEUE
*queue
, int destNode
, CONTROL_MSG
*control_msg_tmp
, int inbuff
)
1357 gni_return_t status
= GNI_RC_ERROR_NOMEM
;
1358 uint32_t vmdh_index
= -1;
1361 uint64_t source_addr
;
1365 size
= control_msg_tmp
->total_length
;
1366 source_addr
= control_msg_tmp
->source_addr
;
1367 register_size
= control_msg_tmp
->length
;
1369 #if USE_LRTS_MEMPOOL
1370 if( control_msg_tmp
->seq_id
== 0 ){
1371 #if BLOCKING_SEND_CONTROL
1372 if (inbuff
== 0 && IsMemHndlZero(GetMemHndl(source_addr
))) {
1373 while (IsMemHndlZero(GetMemHndl(source_addr
)) && buffered_send_msg
+ GetMempoolsize((void*)source_addr
) >= MAX_BUFF_SEND
)
1374 LrtsAdvanceCommunication(0);
1377 if(IsMemHndlZero(GetMemHndl(source_addr
))) //it is in mempool, it is possible to be de-registered by others
1379 msg
= (void*)source_addr
;
1380 if(buffered_send_msg
+ GetMempoolsize(msg
) >= MAX_BUFF_SEND
)
1383 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, LMSG_INIT_TAG
);
1384 return GNI_RC_ERROR_NOMEM
;
1386 //register the corresponding mempool
1387 status
= registerMemory(GetMempoolBlockPtr(msg
), GetMempoolsize(msg
), &(GetMemHndl(msg
)), rdma_rx_cqh
);
1388 if(status
== GNI_RC_SUCCESS
)
1390 control_msg_tmp
->source_mem_hndl
= GetMemHndl(source_addr
);
1394 control_msg_tmp
->source_mem_hndl
= GetMemHndl(source_addr
);
1395 status
= GNI_RC_SUCCESS
;
1397 if(NoMsgInSend(source_addr
))
1398 register_size
= GetMempoolsize((void*)(source_addr
));
1401 }else if(control_msg_tmp
->seq_id
>0) // BIG_MSG
1403 int offset
= ONE_SEG
*(control_msg_tmp
->seq_id
-1);
1404 source_addr
+= offset
;
1405 size
= control_msg_tmp
->length
;
1406 #if BLOCKING_SEND_CONTROL
1407 if (inbuff
== 0 && IsMemHndlZero(control_msg_tmp
->source_mem_hndl
)) {
1408 while (IsMemHndlZero(control_msg_tmp
->source_mem_hndl
) && buffered_send_msg
+ size
>= MAX_BUFF_SEND
)
1409 LrtsAdvanceCommunication(0);
1412 if (IsMemHndlZero(control_msg_tmp
->source_mem_hndl
)) {
1413 if(buffered_send_msg
+ size
>= MAX_BUFF_SEND
)
1416 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, LMSG_INIT_TAG
);
1417 return GNI_RC_ERROR_NOMEM
;
1419 status
= registerMemory((void*)source_addr
, ALIGN64(size
), &(control_msg_tmp
->source_mem_hndl
), NULL
);
1420 if(status
== GNI_RC_SUCCESS
) buffered_send_msg
+= ALIGN64(size
);
1424 status
= GNI_RC_SUCCESS
;
1429 if(status
== GNI_RC_SUCCESS
)
1431 status
= send_smsg_message(queue
, destNode
, control_msg_tmp
, CONTROL_MSG_SIZE
, LMSG_INIT_TAG
, inbuff
);
1432 if(status
== GNI_RC_SUCCESS
)
1434 buffered_send_msg
+= register_size
;
1435 if(control_msg_tmp
->seq_id
== 0)
1437 IncreaseMsgInSend(source_addr
);
1439 FreeControlMsg(control_msg_tmp
);
1440 MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
, LMSG_INIT_TAG
);
1442 status
= GNI_RC_ERROR_RESOURCE
;
1444 } else if (status
== GNI_RC_INVALID_PARAM
|| status
== GNI_RC_PERMISSION_ERROR
)
1446 CmiAbort("Memory registor for large msg\n");
1449 status
= GNI_RC_ERROR_NOMEM
;
1451 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, LMSG_INIT_TAG
);
1455 MEMORY_REGISTER(onesided_hnd
, nic_hndl
,msg
, ALIGN64(size
), &(control_msg_tmp
->source_mem_hndl
), &omdh
, NULL
, status
)
1456 if(status
== GNI_RC_SUCCESS
)
1458 status
= send_smsg_message(queue
, destNode
, control_msg_tmp
, CONTROL_MSG_SIZE
, LMSG_INIT_TAG
, 0);
1459 if(status
== GNI_RC_SUCCESS
)
1461 FreeControlMsg(control_msg_tmp
);
1463 } else if (status
== GNI_RC_INVALID_PARAM
|| status
== GNI_RC_PERMISSION_ERROR
)
1465 CmiAbort("Memory registor for large msg\n");
1468 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, LMSG_INIT_TAG
);
/*
 * Records `size` as the message size of `msg` by delegating to
 * CmiSetMsgSize().
 *
 * msg  - message buffer whose size field is set
 * size - message size to record
 */
inline void LrtsPrepareEnvelope(char *msg, int size)
{
    CmiSetMsgSize(msg, size);
}
1479 CmiCommHandle
LrtsSendFunc(int destNode
, int size
, char *msg
, int mode
)
1481 gni_return_t status
= GNI_RC_SUCCESS
;
1483 CONTROL_MSG
*control_msg_tmp
;
1484 int oob
= ( mode
& OUT_OF_BAND
);
1487 MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode
, size
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
1489 queue
= oob
? &smsg_oob_queue
: &smsg_queue
;
1491 queue
= &smsg_queue
;
1494 LrtsPrepareEnvelope(msg
, size
);
1497 printf("LrtsSendFn %d==>%d, size=%d\n", myrank
, destNode
, size
);
1500 if(size
<= SMSG_MAX_MSG
)
1501 buffer_small_msgs(queue
, msg
, size
, destNode
, SMALL_DATA_TAG
);
1502 else if (size
< BIG_MSG
) {
1503 control_msg_tmp
= construct_control_msg(size
, msg
, 0);
1504 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, LMSG_INIT_TAG
);
1507 CmiSetMsgSeq(msg
, 0);
1508 control_msg_tmp
= construct_control_msg(size
, msg
, 1);
1509 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, LMSG_INIT_TAG
);
1511 #else //non-smp, smp(worker sending)
1512 if(size
<= SMSG_MAX_MSG
)
1514 if (GNI_RC_SUCCESS
== send_smsg_message(queue
, destNode
, msg
, size
, SMALL_DATA_TAG
, 0))
1517 else if (size
< BIG_MSG
) {
1518 control_msg_tmp
= construct_control_msg(size
, msg
, 0);
1519 send_large_messages(queue
, destNode
, control_msg_tmp
, 0);
1522 #if USE_LRTS_MEMPOOL
1523 CmiSetMsgSeq(msg
, 0);
1524 control_msg_tmp
= construct_control_msg(size
, msg
, 1);
1525 send_large_messages(queue
, destNode
, control_msg_tmp
, 0);
1527 control_msg_tmp
= construct_control_msg(size
, msg
, 0);
1528 send_large_messages(queue
, destNode
, control_msg_tmp
, 0);
1535 static void PumpDatagramConnection();
/*
 * Default ids for the trace user events that bracket the phases of the
 * communication layer.  registerUserTraceEvents() replaces them with ids
 * returned by traceRegisterUserEvent() when that registration is compiled
 * in; when it is compiled out these initial values remain, so every id
 * must be distinct or two phases become indistinguishable in a trace.
 */
static int event_SetupConnect = 111;
static int event_PumpSmsg = 222;
static int event_PumpTransaction = 333;
static int event_PumpRdmaTransaction = 444;
static int event_SendBufferSmsg = 484;  /* must not reuse 444 (event_PumpRdmaTransaction) */
static int event_SendFmaRdmaMsg = 555;
static int event_AdvanceCommunication = 666;
/*
 * Registers the communication-layer trace user events and stores the ids
 * returned by traceRegisterUserEvent() in the file-scope event_* variables.
 * The body is compiled only when the #if condition below holds; otherwise
 * this function does nothing and the event_* variables keep their initial
 * values.
 */
static void registerUserTraceEvents() {
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    /* One row per event, listed in registration order. */
    static const struct {
        int *id;
        const char *label;
    } user_events[] = {
        { &event_SetupConnect,         "setting up connections" },
        { &event_PumpSmsg,             "Pump network small msgs" },
        { &event_PumpTransaction,      "Pump FMA local transaction" },
        { &event_PumpRdmaTransaction,  "Pump RDMA local transaction" },
        { &event_SendBufferSmsg,       "Sending buffered small msgs" },
        { &event_SendFmaRdmaMsg,       "Sending buffered fma/rdma transactions" },
        { &event_AdvanceCommunication, "Worker thread in sending/receiving" },
    };
    unsigned int k;

    for (k = 0; k < sizeof(user_events) / sizeof(user_events[0]); k++) {
        *user_events[k].id = traceRegisterUserEvent(user_events[k].label, -1);
    }
#endif
}
1556 static void ProcessDeadlock()
1558 static CmiUInt8
*ptr
= NULL
;
1559 static CmiUInt8 last
= 0, mysum
, sum
;
1560 static int count
= 0;
1561 gni_return_t status
;
1564 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1565 //sweep_mempool(CpvAccess(mempool));
1566 if (ptr
== NULL
) ptr
= (CmiUInt8
*)malloc(mysize
* sizeof(CmiUInt8
));
1567 mysum
= smsg_send_count
+ smsg_recv_count
;
1568 MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d) (%d,%d)(%d)\n", buffered_send_msg
, register_memory_size
, last
, sum
, count
);
1569 status
= PMI_Allgather(&mysum
,ptr
,sizeof(CmiUInt8
));
1570 GNI_RC_CHECK("PMI_Allgather", status
);
1572 for (i
=0; i
<mysize
; i
++) sum
+= ptr
[i
];
1573 if (last
== 0 || sum
== last
)
1578 MACHSTATE5(9,"Progress Deadlock (%d,%d) (%d,%d)(%d)\n", buffered_send_msg
, register_memory_size
, last
, sum
, count
);
1580 /* detected twice, it is a real deadlock */
1582 CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low. Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM
, MAX_BUFF_SEND
);
1583 CmiAbort("Fatal> Deadlock detected.");
1590 static void CheckProgress()
1592 if (smsg_send_count
== last_smsg_send_count
&&
1593 smsg_recv_count
== last_smsg_recv_count
)
1597 if (_detected_hang
) ProcessDeadlock();
1602 //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count);
1603 last_smsg_send_count
= smsg_send_count
;
1604 last_smsg_recv_count
= smsg_recv_count
;
1609 static void set_limit()
1611 //if (!user_set_flag && CmiMyRank() == 0) {
1612 if (CmiMyRank() == 0) {
1613 int mynode
= CmiPhysicalNodeID(CmiMyPe());
1614 int numpes
= CmiNumPesOnPhysicalNode(mynode
);
1615 int numprocesses
= numpes
/ CmiMyNodeSize();
1616 MAX_REG_MEM
= _totalmem
/ numprocesses
;
1617 MAX_BUFF_SEND
= MAX_REG_MEM
/ 2;
1619 printf("mem_max = %lld, send_max =%lld\n", MAX_REG_MEM
, MAX_BUFF_SEND
);
1623 void LrtsPostCommonInit(int everReturn
)
1628 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1629 CpvInitialize(double, projTraceStart
);
1630 /* only PE 0 needs to care about registration (to generate sts file). */
1631 //if (CmiMyPe() == 0)
1633 registerMachineUserEventsFunction(&registerUserTraceEvents
);
1638 CmiIdleState
*s
=CmiNotifyGetState();
1639 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE
,(CcdVoidFn
)CmiNotifyBeginIdle
,(void *)s
);
1640 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE
,(CcdVoidFn
)CmiNotifyStillIdle
,(void *)s
);
1642 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE
,(CcdVoidFn
)CmiNotifyStillIdle
,NULL
);
1644 CcdCallOnConditionKeep(CcdPERIODIC_10ms
, (CcdVoidFn
) PumpDatagramConnection
, NULL
);
1650 if (CmiMyRank() == 0)
1652 CcdCallOnConditionKeep(CcdPERIODIC_2minute
, (CcdVoidFn
) CheckProgress
, NULL
);
1656 CcdCallOnCondition(CcdTOPOLOGY_AVAIL
, (CcdVoidFn
)set_limit
, NULL
);
1660 /* this is called by worker thread */
1661 void LrtsPostNonLocal(){
1662 #if CMK_SMP_TRACE_COMMTHREAD
1663 double startT
, endT
;
1665 #if MULTI_THREAD_SEND
1666 if(mysize
== 1) return;
1667 #if CMK_SMP_TRACE_COMMTHREAD
1671 #if CMK_SMP_TRACE_COMMTHREAD
1672 startT
= CmiWallTimer();
1675 #if CMK_WORKER_SINGLE_TASK
1676 if (CmiMyRank() % 6 == 0)
1680 #if CMK_WORKER_SINGLE_TASK
1681 if (CmiMyRank() % 6 == 1)
1683 PumpLocalTransactions(default_tx_cqh
, default_tx_cq_lock
);
1685 #if CMK_WORKER_SINGLE_TASK
1686 if (CmiMyRank() % 6 == 2)
1688 PumpLocalTransactions(rdma_tx_cqh
, rdma_tx_cq_lock
);
1691 #if CMK_WORKER_SINGLE_TASK
1692 if (CmiMyRank() % 6 == 3)
1694 PumpRemoteTransactions();
1698 if (SendBufferMsg(&smsg_oob_queue
) == 1)
1701 #if CMK_WORKER_SINGLE_TASK
1702 if (CmiMyRank() % 6 == 4)
1704 SendBufferMsg(&smsg_queue
);
1707 #if CMK_WORKER_SINGLE_TASK
1708 if (CmiMyRank() % 6 == 5)
1712 #if CMK_SMP_TRACE_COMMTHREAD
1713 endT
= CmiWallTimer();
1714 traceUserBracketEvent(event_AdvanceCommunication
, startT
, endT
);
1716 #if CMK_SMP_TRACE_COMMTHREAD
1722 /* useDynamicSMSG */
1723 static void PumpDatagramConnection()
1725 uint32_t remote_address
;
1727 gni_return_t status
;
1728 gni_post_state_t post_state
;
1729 uint64_t datagram_id
;
1732 while ((status
= GNI_PostDataProbeById(nic_hndl
, &datagram_id
)) == GNI_RC_SUCCESS
)
1734 if (datagram_id
>= mysize
) { /* bound endpoint */
1735 int pe
= datagram_id
- mysize
;
1736 CMI_GNI_LOCK(global_gni_lock
)
1737 status
= GNI_EpPostDataTestById( ep_hndl_array
[pe
], datagram_id
, &post_state
, &remote_address
, &remote_id
);
1738 CMI_GNI_UNLOCK(global_gni_lock
)
1739 if(status
== GNI_RC_SUCCESS
&& post_state
== GNI_POST_COMPLETED
)
1741 CmiAssert(remote_id
== pe
);
1742 status
= GNI_SmsgInit(ep_hndl_array
[pe
], smsg_attr_vector_local
[pe
], smsg_attr_vector_remote
[pe
]);
1743 GNI_RC_CHECK("Dynamic SMSG Init", status
);
1745 printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank
, pe
);
1747 CmiAssert(smsg_connected_flag
[pe
] == 1);
1748 smsg_connected_flag
[pe
] = 2;
1751 else { /* unbound ep */
1752 status
= GNI_EpPostDataTestById( ep_hndl_unbound
, datagram_id
, &post_state
, &remote_address
, &remote_id
);
1753 if(status
== GNI_RC_SUCCESS
&& post_state
== GNI_POST_COMPLETED
)
1755 CmiAssert(remote_id
<mysize
);
1756 CmiAssert(smsg_connected_flag
[remote_id
] <= 0);
1757 status
= GNI_SmsgInit(ep_hndl_array
[remote_id
], &send_smsg_attr
, &recv_smsg_attr
);
1758 GNI_RC_CHECK("Dynamic SMSG Init", status
);
1760 printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank
, remote_id
);
1762 smsg_connected_flag
[remote_id
] = 2;
1764 alloc_smsg_attr(&send_smsg_attr
);
1765 status
= GNI_EpPostDataWId (ep_hndl_unbound
, &send_smsg_attr
, SMSG_ATTR_SIZE
, &recv_smsg_attr
, SMSG_ATTR_SIZE
, myrank
);
1766 GNI_RC_CHECK("post unbound datagram", status
);
1772 /* polling the CQ to receive network messages */
1773 static void PumpNetworkRdmaMsgs()
1775 gni_cq_entry_t event_data
;
1776 gni_return_t status
;
1781 static void bufferRdmaMsg(int inst_id
, gni_post_descriptor_t
*pd
, int ack_index
)
1783 RDMA_REQUEST
*rdma_request_msg
;
1784 MallocRdmaRequest(rdma_request_msg
);
1785 rdma_request_msg
->destNode
= inst_id
;
1786 rdma_request_msg
->pd
= pd
;
1788 rdma_request_msg
->ack_index
= ack_index
;
1791 PCQueuePush(sendRdmaBuf
, (char*)rdma_request_msg
);
1793 if(sendRdmaBuf
== 0)
1795 sendRdmaBuf
= sendRdmaTail
= rdma_request_msg
;
1797 sendRdmaTail
->next
= rdma_request_msg
;
1798 sendRdmaTail
= rdma_request_msg
;
1804 static void getLargeMsgRequest(void* header
, uint64_t inst_id
);
1806 static void PumpNetworkSmsg()
1810 gni_cq_entry_t event_data
;
1811 gni_return_t status
, status2
;
1816 gni_mem_handle_t msg_mem_hndl
;
1817 gni_smsg_attr_t
*smsg_attr
;
1818 gni_smsg_attr_t
*remote_smsg_attr
;
1820 CONTROL_MSG
*control_msg_tmp
, *header_tmp
;
1821 uint64_t source_addr
;
1822 SMSG_QUEUE
*queue
= &smsg_queue
;
1824 cmidirectMsg
*direct_msg
;
1828 CMI_GNI_LOCK(smsg_rx_cq_lock
)
1829 status
=GNI_CqGetEvent(smsg_rx_cqh
, &event_data
);
1830 CMI_GNI_UNLOCK(smsg_rx_cq_lock
)
1831 if(status
!= GNI_RC_SUCCESS
)
1833 inst_id
= GNI_CQ_GET_INST_ID(event_data
);
1835 inst_id
= ACK_GET_RANK(inst_id
);
1837 // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1839 printf("[%d] %d PumpNetworkMsgs is received from PE: %d, status=%s\n", myrank
, CmiMyRank(), inst_id
, gni_err_str
[status
]);
1841 if (useDynamicSMSG
) {
1842 /* subtle: smsg may come before connection is setup */
1843 while (smsg_connected_flag
[inst_id
] != 2)
1844 PumpDatagramConnection();
1846 msg_tag
= GNI_SMSG_ANY_TAG
;
1848 CMI_GNI_LOCK(smsg_mailbox_lock
)
1849 status
= GNI_SmsgGetNextWTag(ep_hndl_array
[inst_id
], &header
, &msg_tag
);
1850 if (status
!= GNI_RC_SUCCESS
)
1852 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
1856 printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank
, inst_id
, msg_tag
);
1858 /* copy msg out and then put into queue (small message) */
1860 case SMALL_DATA_TAG
:
1863 msg_nbytes
= CmiGetMsgSize(header
);
1864 msg_data
= CmiAlloc(msg_nbytes
);
1865 memcpy(msg_data
, (char*)header
, msg_nbytes
);
1866 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
1867 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
1868 TRACE_COMM_CREATION(CpvAccess(projTraceStart
), msg_data
);
1869 handleOneRecvedMsg(msg_nbytes
, msg_data
);
1874 #if MULTI_THREAD_SEND
1875 MallocControlMsg(control_msg_tmp
);
1876 memcpy(control_msg_tmp
, header
, CONTROL_MSG_SIZE
);
1877 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
1878 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
1879 getLargeMsgRequest(control_msg_tmp
, inst_id
);
1880 FreeControlMsg(control_msg_tmp
);
1882 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
1883 getLargeMsgRequest(header
, inst_id
);
1884 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
1888 case ACK_TAG
: //msg fit into mempool
1890 /* Get is done, release message . Now put is not used yet*/
1891 void *msg
= (void*)(((ACK_MSG
*)header
)->source_addr
);
1892 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
1893 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
1894 #if ! USE_LRTS_MEMPOOL
1895 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &(((ACK_MSG
*)header
)->source_mem_hndl
), &omdh
, ((ACK_MSG
*)header
)->length
);
1897 DecreaseMsgInSend(msg
);
1899 if(NoMsgInSend(msg
))
1900 buffered_send_msg
-= GetMempoolsize(msg
);
1901 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
, msg_tag
);
1905 case BIG_MSG_TAG
: //big msg, de-register, transfer next seg
1907 #if MULTI_THREAD_SEND
1908 MallocControlMsg(header_tmp
);
1909 memcpy(header_tmp
, header
, CONTROL_MSG_SIZE
);
1910 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
1912 header_tmp
= (CONTROL_MSG
*) header
;
1914 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
1915 void *msg
= (void*)(header_tmp
->source_addr
);
1916 int cur_seq
= CmiGetMsgSeq(msg
);
1917 int offset
= ONE_SEG
*(cur_seq
+1);
1918 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &(header_tmp
->source_mem_hndl
), &omdh
, header_tmp
->length
);
1919 buffered_send_msg
-= header_tmp
->length
;
1920 int remain_size
= CmiGetMsgSize(msg
) - header_tmp
->length
;
1921 if (remain_size
< 0) remain_size
= 0;
1922 CmiSetMsgSize(msg
, remain_size
);
1923 if(remain_size
<= 0) //transaction done
1926 }else if (header_tmp
->total_length
> offset
)
1928 CmiSetMsgSeq(msg
, cur_seq
+1);
1929 control_msg_tmp
= construct_control_msg(header_tmp
->total_length
, msg
, cur_seq
+1+1);
1930 control_msg_tmp
->dest_addr
= header_tmp
->dest_addr
;
1932 send_large_messages(queue
, inst_id
, control_msg_tmp
, 0);
1934 if (header_tmp
->seq_id
== 1) {
1936 for (i
=1; i
<BIG_MSG_PIPELINE
; i
++) {
1937 int seq
= cur_seq
+i
+2;
1938 CmiSetMsgSeq(msg
, seq
-1);
1939 control_msg_tmp
= construct_control_msg(header_tmp
->total_length
, (char *)msg
, seq
);
1940 control_msg_tmp
->dest_addr
= header_tmp
->dest_addr
;
1941 send_large_messages(queue
, inst_id
, control_msg_tmp
, 0);
1942 if (header_tmp
->total_length
<= ONE_SEG
*seq
) break;
1946 #if MULTI_THREAD_SEND
1947 FreeControlMsg(header_tmp
);
1949 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
1953 #if CMK_PERSISTENT_COMM
1954 case PUT_DONE_TAG
: //persistent message
1955 void *msg
= (void *)((CONTROL_MSG
*) header
)->source_addr
;
1956 int size
= ((CONTROL_MSG
*) header
)->length
;
1957 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
1958 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
1960 handleOneRecvedMsg(size
, msg
);
1964 case DIRECT_PUT_DONE_TAG
: //cmi direct
1965 //create a trigger message
1966 direct_msg
= (cmidirectMsg
*)CmiAlloc(sizeof(cmidirectMsg
));
1967 direct_msg
->handler
= ((CMK_DIRECT_HEADER
*)header
)->handler_addr
;
1968 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
1969 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
1970 CmiSetHandler(direct_msg
, CpvAccess(CmiHandleDirectIdx
));
1971 CmiPushPE(((CmiDirectUserHandle
*)direct_msg
->handler
)->remoteRank
, direct_msg
);
1972 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1976 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
1977 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
1978 printf("weird tag problem\n");
1979 CmiAbort("Unknown tag\n");
1983 printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank
, inst_id
, msg_tag
);
1986 msg_tag
= GNI_SMSG_ANY_TAG
;
1987 } //endwhile getNext
1988 } //end while GetEvent
1989 if(status
== GNI_RC_ERROR_RESOURCE
)
1991 printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");
1992 GNI_RC_CHECK("Smsg_rx_cq full", status
);
1996 static void printDesc(gni_post_descriptor_t
*pd
)
1998 printf(" Descriptor (%p===>%p)(%d)\n", pd
->local_addr
, pd
->remote_addr
, pd
->length
);
2002 static void sendCqWrite(int destNode
, uint64_t data
, gni_mem_handle_t mem_hndl
)
2004 gni_post_descriptor_t
*pd
;
2005 gni_return_t status
= GNI_RC_SUCCESS
;
2008 pd
->type
= GNI_POST_CQWRITE
;
2009 pd
->cq_mode
= GNI_CQMODE_GLOBAL_EVENT
| GNI_CQMODE_REMOTE_EVENT
;
2010 pd
->dlvr_mode
= GNI_DLVMODE_PERFORMANCE
;
2011 pd
->cqwrite_value
= data
;
2012 pd
->remote_mem_hndl
= mem_hndl
;
2013 status
= GNI_PostCqWrite(ep_hndl_array
[destNode
], pd
);
2014 GNI_RC_CHECK("GNI_PostCqWrite", status
);
2018 // for BIG_MSG called on receiver side for receiving control message
2020 static void getLargeMsgRequest(void* header
, uint64_t inst_id
)
2022 #if USE_LRTS_MEMPOOL
2023 CONTROL_MSG
*request_msg
;
2024 gni_return_t status
= GNI_RC_SUCCESS
;
2026 gni_post_descriptor_t
*pd
;
2027 gni_mem_handle_t msg_mem_hndl
;
2028 int source
, size
, transaction_size
, offset
= 0;
2029 size_t register_size
= 0;
2031 // initial a get to transfer data from the sender side */
2032 request_msg
= (CONTROL_MSG
*) header
;
2033 size
= request_msg
->total_length
;
2034 MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
2036 if(request_msg
->seq_id
< 2) {
2037 msg_data
= CmiAlloc(size
);
2038 CmiSetMsgSeq(msg_data
, 0);
2039 _MEMCHECK(msg_data
);
2040 #if CMK_SMP_TRACE_COMMTHREAD
2041 pd
->second_operand
= 1000000 * CmiWallTimer(); //microsecond
2045 offset
= ONE_SEG
*(request_msg
->seq_id
-1);
2046 msg_data
= (char*)request_msg
->dest_addr
+ offset
;
2049 pd
->cqwrite_value
= request_msg
->seq_id
;
2050 if( request_msg
->seq_id
== 0)
2052 pd
->local_mem_hndl
= GetMemHndl(msg_data
);
2053 transaction_size
= ALIGN64(size
);
2054 if(IsMemHndlZero(pd
->local_mem_hndl
))
2056 status
= registerMemory( GetMempoolBlockPtr(msg_data
), GetMempoolsize(msg_data
), &(GetMemHndl(msg_data
)), rdma_rx_cqh
);
2057 if(status
== GNI_RC_SUCCESS
)
2059 pd
->local_mem_hndl
= GetMemHndl(msg_data
);
2063 SetMemHndlZero(pd
->local_mem_hndl
);
2066 if(NoMsgInRecv( (void*)(msg_data
)))
2067 register_size
= GetMempoolsize((void*)(msg_data
));
2072 transaction_size
= ALIGN64(request_msg
->length
);
2073 status
= registerMemory(msg_data
, transaction_size
, &(pd
->local_mem_hndl
), NULL
);
2074 if (status
== GNI_RC_INVALID_PARAM
|| status
== GNI_RC_PERMISSION_ERROR
)
2076 GNI_RC_CHECK("Invalid/permission Mem Register in post", status
);
2079 pd
->first_operand
= ALIGN64(size
); // total length
2081 if(request_msg
->total_length
<= LRTS_GNI_RDMA_THRESHOLD
)
2082 pd
->type
= GNI_POST_FMA_GET
;
2084 pd
->type
= GNI_POST_RDMA_GET
;
2085 pd
->cq_mode
= GNI_CQMODE_GLOBAL_EVENT
;
2086 pd
->dlvr_mode
= GNI_DLVMODE_PERFORMANCE
;
2087 pd
->length
= transaction_size
;
2088 pd
->local_addr
= (uint64_t) msg_data
;
2089 pd
->remote_addr
= request_msg
->source_addr
+ offset
;
2090 pd
->remote_mem_hndl
= request_msg
->source_mem_hndl
;
2091 pd
->src_cq_hndl
= rdma_tx_cqh
;
2095 //memory registration success
2096 if(status
== GNI_RC_SUCCESS
)
2098 CmiNodeLock lock
= pd
->type
== GNI_POST_RDMA_GET
?rdma_tx_cq_lock
:default_tx_cq_lock
;
2101 if( request_msg
->seq_id
== 0)
2103 pd
->cq_mode
|= GNI_CQMODE_REMOTE_EVENT
;
2104 int sts
= GNI_EpSetEventData(ep_hndl_array
[inst_id
], inst_id
, ACK_EVENT(request_msg
->ack_index
));
2105 GNI_RC_CHECK("GNI_EpSetEventData", sts
);
2108 int sts
= GNI_EpSetEventData(ep_hndl_array
[inst_id
], inst_id
, myrank
);
2109 GNI_RC_CHECK("GNI_EpSetEventData", sts
);
2112 if(pd
->type
== GNI_POST_RDMA_GET
)
2114 status
= GNI_PostRdma(ep_hndl_array
[inst_id
], pd
);
2118 status
= GNI_PostFma(ep_hndl_array
[inst_id
], pd
);
2120 CMI_GNI_UNLOCK(lock
)
2122 if(status
== GNI_RC_SUCCESS
)
2124 if(pd
->cqwrite_value
== 0)
2126 #if MACHINE_DEBUG_LOG
2127 buffered_recv_msg
+= register_size
;
2128 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
2130 IncreaseMsgInRecv(msg_data
);
2135 SetMemHndlZero((pd
->local_mem_hndl
));
2137 if(status
== GNI_RC_ERROR_RESOURCE
|| status
== GNI_RC_ERROR_NOMEM
)
2140 bufferRdmaMsg(inst_id
, pd
, request_msg
->ack_index
);
2142 bufferRdmaMsg(inst_id
, pd
, -1);
2145 //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
2146 GNI_RC_CHECK("GetLargeAFter posting", status
);
2149 CONTROL_MSG
*request_msg
;
2150 gni_return_t status
;
2152 gni_post_descriptor_t
*pd
;
2153 RDMA_REQUEST
*rdma_request_msg
;
2154 gni_mem_handle_t msg_mem_hndl
;
2156 // initial a get to transfer data from the sender side */
2157 request_msg
= (CONTROL_MSG
*) header
;
2158 msg_data
= CmiAlloc(request_msg
->length
);
2159 _MEMCHECK(msg_data
);
2161 MEMORY_REGISTER(onesided_hnd
, nic_hndl
, msg_data
, request_msg
->length
, &msg_mem_hndl
, &omdh
, NULL
, status
)
2163 if (status
== GNI_RC_INVALID_PARAM
|| status
== GNI_RC_PERMISSION_ERROR
)
2165 GNI_RC_CHECK("Invalid/permission Mem Register in post", status
);
2169 if(request_msg
->length
<= LRTS_GNI_RDMA_THRESHOLD
)
2170 pd
->type
= GNI_POST_FMA_GET
;
2172 pd
->type
= GNI_POST_RDMA_GET
;
2173 pd
->cq_mode
= GNI_CQMODE_GLOBAL_EVENT
;// | GNI_CQMODE_REMOTE_EVENT;
2174 pd
->dlvr_mode
= GNI_DLVMODE_PERFORMANCE
;
2175 pd
->length
= ALIGN64(request_msg
->length
);
2176 pd
->local_addr
= (uint64_t) msg_data
;
2177 pd
->remote_addr
= request_msg
->source_addr
;
2178 pd
->remote_mem_hndl
= request_msg
->source_mem_hndl
;
2179 pd
->src_cq_hndl
= rdma_tx_cqh
;
2183 //memory registration successful
2184 if(status
== GNI_RC_SUCCESS
)
2186 pd
->local_mem_hndl
= msg_mem_hndl
;
2188 if(pd
->type
== GNI_POST_RDMA_GET
)
2190 CMI_GNI_LOCK(rdma_tx_cq_lock
)
2191 status
= GNI_PostRdma(ep_hndl_array
[inst_id
], pd
);
2192 CMI_GNI_UNLOCK(rdma_tx_cq_lock
)
2196 CMI_GNI_LOCK(default_tx_cq_lock
)
2197 status
= GNI_PostFma(ep_hndl_array
[inst_id
], pd
);
2198 CMI_GNI_UNLOCK(default_tx_cq_lock
)
2203 SetMemHndlZero(pd
->local_mem_hndl
);
2205 if(status
== GNI_RC_ERROR_RESOURCE
|| status
== GNI_RC_ERROR_NOMEM
)
2207 MallocRdmaRequest(rdma_request_msg
);
2208 rdma_request_msg
->next
= 0;
2209 rdma_request_msg
->destNode
= inst_id
;
2210 rdma_request_msg
->pd
= pd
;
2211 PCQueuePush(sendRdmaBuf
, (char*)rdma_request_msg
);
2213 GNI_RC_CHECK("AFter posting", status
);
2219 static void PumpCqWriteTransactions()
2223 gni_return_t status
;
2226 //CMI_GNI_LOCK(my_cq_lock)
2227 status
= GNI_CqGetEvent(rdma_rx_cqh
, &ev
);
2228 //CMI_GNI_UNLOCK(my_cq_lock)
2229 if(status
!= GNI_RC_SUCCESS
) break;
2230 msg
= (void*) ( GNI_CQ_GET_DATA(ev
) & 0xFFFFFFFFFFFFL
);
2232 DecreaseMsgInSend(msg
);
2233 #if ! USE_LRTS_MEMPOOL
2234 // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2236 DecreaseMsgInSend(msg
);
2238 if(NoMsgInSend(msg
))
2239 buffered_send_msg
-= GetMempoolsize(msg
);
2242 if(status
== GNI_RC_ERROR_RESOURCE
)
2244 GNI_RC_CHECK("rdma_rx_cq full too many ack", status
);
2250 static void PumpRemoteTransactions()
2253 gni_return_t status
;
2258 CMI_GNI_LOCK(global_gni_lock
)
2259 status
= GNI_CqGetEvent(rdma_rx_cqh
, &ev
);
2260 CMI_GNI_UNLOCK(global_gni_lock
)
2261 if(status
!= GNI_RC_SUCCESS
) {
2265 slot
= GNI_CQ_GET_INST_ID(ev
);
2266 slot
= ACK_GET_INDEX(slot
);
2267 //slot = GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFL;
2269 //CMI_GNI_LOCK(ackpool_lock);
2270 msg
= GetAckAddress(slot
);
2271 //CMI_GNI_UNLOCK(ackpool_lock);
2273 DecreaseMsgInSend(msg
);
2274 #if ! USE_LRTS_MEMPOOL
2275 // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2277 DecreaseMsgInSend(msg
);
2279 if(NoMsgInSend(msg
))
2280 buffered_send_msg
-= GetMempoolsize(msg
);
2282 AckPool_freeslot(slot
);
2284 if(status
== GNI_RC_ERROR_RESOURCE
)
2286 GNI_RC_CHECK("rdma_rx_cq full too many ack", status
);
2291 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh
, CmiNodeLock my_cq_lock
)
2294 gni_return_t status
;
2295 uint64_t type
, inst_id
;
2296 gni_post_descriptor_t
*tmp_pd
;
2298 CONTROL_MSG
*ack_msg_tmp
;
2302 CMK_DIRECT_HEADER
*cmk_direct_done_msg
;
2304 SMSG_QUEUE
*queue
= &smsg_queue
;
2307 CMI_GNI_LOCK(my_cq_lock
)
2308 status
= GNI_CqGetEvent(my_tx_cqh
, &ev
);
2309 CMI_GNI_UNLOCK(my_cq_lock
)
2310 if(status
!= GNI_RC_SUCCESS
) break;
2312 type
= GNI_CQ_GET_TYPE(ev
);
2313 if (type
== GNI_CQ_EVENT_TYPE_POST
)
2315 inst_id
= GNI_CQ_GET_INST_ID(ev
);
2317 printf("[%d] LocalTransactions localdone=%d\n", myrank
, lrts_local_done_msg
);
2319 CMI_GNI_LOCK(my_cq_lock
)
2320 status
= GNI_GetCompleted(my_tx_cqh
, ev
, &tmp_pd
);
2321 CMI_GNI_UNLOCK(my_cq_lock
)
2323 switch (tmp_pd
->type
) {
2324 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2325 case GNI_POST_RDMA_PUT
:
2326 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2327 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &tmp_pd
->local_mem_hndl
, &omdh
, tmp_pd
->length
);
2329 case GNI_POST_FMA_PUT
:
2330 if(tmp_pd
->amo_cmd
== 1) {
2331 //sender ACK to receiver to trigger it is done
2332 cmk_direct_done_msg
= (CMK_DIRECT_HEADER
*) malloc(sizeof(CMK_DIRECT_HEADER
));
2333 cmk_direct_done_msg
->handler_addr
= tmp_pd
->first_operand
;
2334 msg_tag
= DIRECT_PUT_DONE_TAG
;
2337 CmiFree((void *)tmp_pd
->local_addr
);
2338 MallocControlMsg(ack_msg_tmp
);
2339 ack_msg_tmp
->source_addr
= tmp_pd
->remote_addr
;
2340 ack_msg_tmp
->source_mem_hndl
= tmp_pd
->remote_mem_hndl
;
2341 msg_tag
= PUT_DONE_TAG
;
2345 case GNI_POST_RDMA_GET
:
2346 case GNI_POST_FMA_GET
: {
2347 #if ! USE_LRTS_MEMPOOL
2348 MallocControlMsg(ack_msg_tmp
);
2349 ack_msg_tmp
->source_addr
= tmp_pd
->remote_addr
;
2350 ack_msg_tmp
->source_mem_hndl
= tmp_pd
->remote_mem_hndl
;
2351 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &tmp_pd
->local_mem_hndl
, &omdh
, tmp_pd
->length
)
2354 int seq_id
= tmp_pd
->cqwrite_value
;
2355 if(seq_id
> 0) // BIG_MSG
2357 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &tmp_pd
->local_mem_hndl
, &omdh
, tmp_pd
->length
);
2358 MallocControlMsg(ack_msg_tmp
);
2359 ack_msg_tmp
->source_addr
= tmp_pd
->remote_addr
;
2360 ack_msg_tmp
->source_mem_hndl
= tmp_pd
->remote_mem_hndl
;
2361 ack_msg_tmp
->seq_id
= seq_id
;
2362 ack_msg_tmp
->dest_addr
= tmp_pd
->local_addr
- ONE_SEG
*(ack_msg_tmp
->seq_id
-1);
2363 ack_msg_tmp
->source_addr
-= ONE_SEG
*(ack_msg_tmp
->seq_id
-1);
2364 ack_msg_tmp
->length
= tmp_pd
->length
;
2365 ack_msg_tmp
->total_length
= tmp_pd
->first_operand
; // total size
2366 msg_tag
= BIG_MSG_TAG
;
2371 #if !REMOTE_EVENT && !CQWRITE
2372 MallocAckMsg(ack_msg
);
2373 ack_msg
->source_addr
= tmp_pd
->remote_addr
;
2379 case GNI_POST_CQWRITE
:
2380 FreePostDesc(tmp_pd
);
2383 CmiPrintf("type=%d\n", tmp_pd
->type
);
2384 CmiAbort("PumpLocalTransactions: unknown type!");
2385 } /* end of switch */
2388 if (tmp_pd
->amo_cmd
== 1) {
2389 status
= send_smsg_message(queue
, inst_id
, cmk_direct_done_msg
, sizeof(CMK_DIRECT_HEADER
), msg_tag
, 0);
2390 if (status
== GNI_RC_SUCCESS
) free(cmk_direct_done_msg
);
2394 if (msg_tag
== ACK_TAG
) {
2397 status
= send_smsg_message(queue
, inst_id
, ack_msg
, ACK_MSG_SIZE
, msg_tag
, 0);
2398 if (status
== GNI_RC_SUCCESS
) FreeAckMsg(ack_msg
);
2400 sendCqWrite(inst_id
, tmp_pd
->remote_addr
, tmp_pd
->remote_mem_hndl
);
2405 status
= send_smsg_message(queue
, inst_id
, ack_msg_tmp
, CONTROL_MSG_SIZE
, msg_tag
, 0);
2406 if (status
== GNI_RC_SUCCESS
) FreeControlMsg(ack_msg_tmp
);
2408 #if CMK_PERSISTENT_COMM
2409 if (tmp_pd
->type
== GNI_POST_RDMA_GET
|| tmp_pd
->type
== GNI_POST_FMA_GET
)
2412 if( msg_tag
== ACK_TAG
){ //msg fit in mempool
2414 printf("Normal msg transaction PE:%d==>%d\n", myrank
, inst_id
);
2416 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd
->second_operand
/1000000.0), (double)((tmp_pd
->second_operand
+1)/1000000.0), (double)((tmp_pd
->second_operand
+1)/1000000.0), (void*)tmp_pd
->local_addr
);
2419 CmiAssert(SIZEFIELD((void*)(tmp_pd
->local_addr
)) <= tmp_pd
->length
);
2420 DecreaseMsgInRecv((void*)tmp_pd
->local_addr
);
2421 #if MACHINE_DEBUG_LOG
2422 if(NoMsgInRecv((void*)(tmp_pd
->local_addr
)))
2423 buffered_recv_msg
-= GetMempoolsize((void*)(tmp_pd
->local_addr
));
2424 MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
, msg_tag
);
2426 TRACE_COMM_CREATION(CpvAccess(projTraceStart
), (void*)tmp_pd
->local_addr
);
2427 handleOneRecvedMsg(tmp_pd
->length
, (void*)tmp_pd
->local_addr
);
2428 }else if(msg_tag
== BIG_MSG_TAG
){
2429 void *msg
= (char*)tmp_pd
->local_addr
-(tmp_pd
->cqwrite_value
-1)*ONE_SEG
;
2430 CmiSetMsgSeq(msg
, CmiGetMsgSeq(msg
)+1);
2431 if (tmp_pd
->first_operand
<= ONE_SEG
*CmiGetMsgSeq(msg
)) {
2434 printf("Pipeline msg done [%d]\n", myrank
);
2436 #if CMK_SMP_TRACE_COMMTHREAD
2437 if( tmp_pd
->cqwrite_value
== 1)
2438 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd
->second_operand
/1000000.0), (double)((tmp_pd
->second_operand
+1)/1000000.0), (double)((tmp_pd
->second_operand
+2)/1000000.0), (void*)tmp_pd
->local_addr
);
2440 TRACE_COMM_CREATION(CpvAccess(projTraceStart
), msg
);
2441 handleOneRecvedMsg(tmp_pd
->first_operand
, msg
);
2445 FreePostDesc(tmp_pd
);
2448 if(status
== GNI_RC_ERROR_RESOURCE
)
2450 printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");
2451 GNI_RC_CHECK("Smsg_tx_cq full", status
);
/*
 * SendRdmaMsg: retry the RDMA/FMA requests that are buffered in sendRdmaBuf.
 *
 * NOTE(review): this listing is lossy -- several original lines (braces,
 * some declarations and preprocessor directives) are not visible, so the
 * comments only describe statements that can actually be seen here.
 *
 * For every request taken from sendRdmaBuf the visible code:
 *   - registers the local buffer when its memory handle is still zero
 *     (the enclosing mempool block when pd->cqwrite_value == 0, otherwise
 *     the buffer itself, pd->local_addr / pd->length);
 *   - after a successful registration posts the descriptor with
 *     GNI_PostRdma for GNI_POST_RDMA_GET, or with GNI_PostFma;
 *   - pushes the request back on sendRdmaBuf when the post or the
 *     registration did not succeed.
 */
2455 static void SendRdmaMsg()
2457 gni_return_t status
= GNI_RC_SUCCESS
;
2458 gni_mem_handle_t msg_mem_hndl
;
2459 RDMA_REQUEST
*ptr
= 0, *tmp_ptr
;
2460 RDMA_REQUEST
*pre
= 0;
2461 uint64_t register_size
= 0;
/* Only the entries present at entry are visited: len is sampled once. */
2465 int len
= PCQueueLength(sendRdmaBuf
);
2466 for (i
=0; i
<len
; i
++)
2468 CMI_PCQUEUEPOP_LOCK(sendRdmaBuf
)
2469 ptr
= (RDMA_REQUEST
*)PCQueuePop(sendRdmaBuf
);
2470 CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf
)
2471 if (ptr
== NULL
) break;
2477 MACHSTATE4(8, "noempty-rdma %d (%lld,%lld,%d) \n", ptr
->destNode
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
2478 gni_post_descriptor_t
*pd
= ptr
->pd
;
2479 status
= GNI_RC_SUCCESS
;
/* cqwrite_value == 0: the local buffer is looked up through the mempool helpers. */
2481 if(pd
->cqwrite_value
== 0)
2483 if(IsMemHndlZero((GetMemHndl(pd
->local_addr
))))
2485 msg
= (void*)(pd
->local_addr
);
2486 status
= registerMemory(GetMempoolBlockPtr(msg
), GetMempoolsize(msg
), &(GetMemHndl(msg
)), rdma_rx_cqh
);
2487 if(status
== GNI_RC_SUCCESS
)
2489 pd
->local_mem_hndl
= GetMemHndl((void*)(pd
->local_addr
));
2493 pd
->local_mem_hndl
= GetMemHndl((void*)(pd
->local_addr
));
2495 if(NoMsgInRecv( (void*)(pd
->local_addr
)))
2496 register_size
= GetMempoolsize((void*)(pd
->local_addr
));
2499 }else if( IsMemHndlZero(pd
->local_mem_hndl
)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2501 status
= registerMemory((void*)(pd
->local_addr
), pd
->length
, &(pd
->local_mem_hndl
), NULL
);
2503 if(status
== GNI_RC_SUCCESS
) //mem register good
/* RDMA GETs use rdma_tx_cq_lock; every other post type uses default_tx_cq_lock. */
2505 CmiNodeLock lock
= pd
->type
== GNI_POST_RDMA_GET
? rdma_tx_cq_lock
:default_tx_cq_lock
;
2508 if( pd
->cqwrite_value
== 0)
2510 pd
->cq_mode
|= GNI_CQMODE_REMOTE_EVENT
;
2511 int sts
= GNI_EpSetEventData(ep_hndl_array
[ptr
->destNode
], ptr
->destNode
, ACK_EVENT(ptr
->ack_index
));
2512 GNI_RC_CHECK("GNI_EpSetEventData", sts
);
2515 int sts
= GNI_EpSetEventData(ep_hndl_array
[ptr
->destNode
], ptr
->destNode
, myrank
);
2516 GNI_RC_CHECK("GNI_EpSetEventData", sts
);
2519 if(pd
->type
== GNI_POST_RDMA_GET
)
2521 status
= GNI_PostRdma(ep_hndl_array
[ptr
->destNode
], pd
);
2525 status
= GNI_PostFma(ep_hndl_array
[ptr
->destNode
], pd
);
2527 CMI_GNI_UNLOCK(lock
);
2529 if(status
== GNI_RC_SUCCESS
) //post good
2534 pre
->next
= ptr
->next
;
2537 sendRdmaBuf
= ptr
->next
;
2540 FreeRdmaRequest(tmp_ptr
);
2542 if(pd
->cqwrite_value
== 0)
2544 IncreaseMsgInRecv(((void*)(pd
->local_addr
)));
2546 #if MACHINE_DEBUG_LOG
2547 buffered_recv_msg
+= register_size
;
2548 MACHSTATE(8, "GO request from buffered\n");
2550 }else // cannot post
/* The request is queued again so that a later call can retry it. */
2553 PCQueuePush(sendRdmaBuf
, (char*)ptr
);
2560 } else //memory registration fails
2563 PCQueuePush(sendRdmaBuf
, (char*)ptr
);
/*
 * SendBufferMsg: try to send the SMSG messages that are buffered in `queue`.
 *
 * NOTE(review): this listing is lossy. Several build variants (a single
 * sendMsgBuf queue, a nonEmptyQueues list, per-destination sendSmsgBuf
 * queues and a linked-list form) appear interleaved, and the preprocessor
 * lines that select between them are mostly not visible. The return
 * statement is not visible either; only the comment below states the
 * return convention.
 *
 * Visible per-message handling: the message is dispatched on its tag and
 * sent with send_smsg_message() or send_large_messages(); after a
 * successful send the ACK, control and direct-put-done payloads are
 * released with FreeAckMsg(), FreeControlMsg() and free() respectively,
 * and the list entry is released with FreeMsgList(). Messages are pushed
 * back on their queue on the visible PCQueuePush() lines.
 */
2576 // return 1 if all messages are sent
2577 static int SendBufferMsg(SMSG_QUEUE
*queue
)
2579 MSG_LIST
*ptr
, *tmp_ptr
, *pre
=0, *current_head
;
2580 CONTROL_MSG
*control_msg_tmp
;
2581 gni_return_t status
;
2583 uint64_t register_size
;
2584 void *register_addr
;
2585 int index_previous
= -1;
2586 #if CMI_EXERT_SEND_CAP
/* destpe_avail[] is cleared here and set to 1 further down for a destination that returned GNI_RC_ERROR_RESOURCE. */
2593 memset(destpe_avail
, 0, mysize
* sizeof(char));
2594 for (index
=0; index
<1; index
++)
2596 int i
, len
= PCQueueLength(queue
->sendMsgBuf
);
2597 for (i
=0; i
<len
; i
++)
2599 CMI_PCQUEUEPOP_LOCK(queue
->sendMsgBuf
)
2600 ptr
= (MSG_LIST
*)PCQueuePop(queue
->sendMsgBuf
);
2601 CMI_PCQUEUEPOP_UNLOCK(queue
->sendMsgBuf
)
2602 if(ptr
== NULL
) break;
2603 if (destpe_avail
[ptr
->destNode
] == 1) { /* can't send to this pe */
2604 PCQueuePush(queue
->sendMsgBuf
, (char*)ptr
);
2609 int nonempty
= PCQueueLength(nonEmptyQueues
);
2610 for(index
=0; index
<nonempty
; index
++)
2612 CMI_PCQUEUEPOP_LOCK(nonEmptyQueues
)
2613 MSG_LIST_INDEX
*current_list
= (MSG_LIST_INDEX
*)PCQueuePop(nonEmptyQueues
);
2614 CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues
)
2615 if(current_list
== NULL
) break;
2616 PCQueue current_queue
= current_list
-> sendSmsgBuf
;
/* The length is sampled and the `pushed` flag cleared while holding the list lock. */
2617 CmiLock(current_list
->lock
);
2618 int i
, len
= PCQueueLength(current_queue
);
2619 current_list
->pushed
= 0;
2620 CmiUnlock(current_list
->lock
);
2622 for(index
=0; index
<mysize
; index
++)
2624 PCQueue current_queue
= queue
->smsg_msglist_index
[index
].sendSmsgBuf
;
2625 int i
, len
= PCQueueLength(current_queue
);
2627 for (i
=0; i
<len
; i
++)
2629 CMI_PCQUEUEPOP_LOCK(current_queue
)
2630 ptr
= (MSG_LIST
*)PCQueuePop(current_queue
);
2631 CMI_PCQUEUEPOP_UNLOCK(current_queue
)
2632 if (ptr
== 0) break;
2635 int index
= queue
->smsg_head_index
;
2638 ptr
= queue
->smsg_msglist_index
[index
].sendSmsgBuf
;
2643 MACHSTATE5(8, "noempty-smsg %d (%d,%d,%d) tag=%d \n", ptr
->destNode
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
, ptr
->tag
);
/* status starts as GNI_RC_ERROR_RESOURCE and is overwritten by the send calls below. */
2644 status
= GNI_RC_ERROR_RESOURCE
;
2645 if (useDynamicSMSG
&& smsg_connected_flag
[index
] != 2) {
2646 /* connection not exists yet */
2651 case SMALL_DATA_TAG
:
2652 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, ptr
->size
, ptr
->tag
, 1);
2653 if(status
== GNI_RC_SUCCESS
)
2659 control_msg_tmp
= (CONTROL_MSG
*)ptr
->msg
;
2660 status
= send_large_messages(queue
, ptr
->destNode
, control_msg_tmp
, 1);
2663 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, ptr
->size
, ptr
->tag
, 1);
2664 if(status
== GNI_RC_SUCCESS
) FreeAckMsg((ACK_MSG
*)ptr
->msg
);
2667 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, ptr
->size
, ptr
->tag
, 1);
2668 if(status
== GNI_RC_SUCCESS
)
2670 FreeControlMsg((CONTROL_MSG
*)ptr
->msg
);
2674 case DIRECT_PUT_DONE_TAG
:
2675 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, sizeof(CMK_DIRECT_HEADER
), ptr
->tag
, 1);
2676 if(status
== GNI_RC_SUCCESS
)
2678 free((CMK_DIRECT_HEADER
*)ptr
->msg
);
2684 printf("Weird tag\n");
2685 CmiAbort("should not happen\n");
2687 if(status
== GNI_RC_SUCCESS
)
2690 buffered_smsg_counter
--;
2691 printf("[%d==>%d] buffered smsg sending done\n", myrank
, ptr
->destNode
);
/* Linked-list form: unlink the sent entry, either after `pre` or at the head of the list. */
2697 ptr
= pre
->next
= ptr
->next
;
2700 ptr
= queue
->smsg_msglist_index
[index
].sendSmsgBuf
= queue
->smsg_msglist_index
[index
].sendSmsgBuf
->next
;
2702 FreeMsgList(tmp_ptr
);
2706 #if CMI_EXERT_SEND_CAP
2708 if(sent_cnt
== SEND_CAP
)
2714 PCQueuePush(queue
->sendMsgBuf
, (char*)ptr
);
2716 PCQueuePush(current_queue
, (char*)ptr
);
2723 if(status
== GNI_RC_ERROR_RESOURCE
)
2725 #if CMK_SMP && ONE_SEND_QUEUE
2726 destpe_avail
[ptr
->destNode
] = 1;
2735 queue
->smsg_msglist_index
[index
].tail
= pre
;
/* When a destination's list became empty it is unlinked from the chain of indices that starts at smsg_head_index. */
2736 if(queue
->smsg_msglist_index
[index
].sendSmsgBuf
== 0)
2738 if(index_previous
!= -1)
2739 queue
->smsg_msglist_index
[index_previous
].next
= queue
->smsg_msglist_index
[index
].next
;
2741 queue
->smsg_head_index
= queue
->smsg_msglist_index
[index
].next
;
2744 index_previous
= index
;
2746 index
= queue
->smsg_msglist_index
[index
].next
;
2748 #if !ONE_SEND_QUEUE && SMP_LOCKS
/* Put the list back on nonEmptyQueues if it still holds messages and is not already marked as pushed. */
2749 CmiLock(current_list
->lock
);
2750 if(!PCQueueEmpty(current_queue
) && current_list
->pushed
== 0)
2752 current_list
->pushed
= 1;
2753 PCQueuePush(nonEmptyQueues
, current_list
);
2755 CmiUnlock(current_list
->lock
);
2759 #if CMI_EXERT_SEND_CAP
2760 if(sent_cnt
== SEND_CAP
)
2763 } // end pooling for all cores
2767 static void ProcessDeadlock();
/*
 * LrtsAdvanceCommunication: one pass of the machine layer's progress engine.
 *
 * Visible calls, in order:
 *   - PumpDatagramConnection(), only when useDynamicSMSG and whileidle are set;
 *   - PumpLocalTransactions() on default_tx_cqh and then on rdma_tx_cqh;
 *   - PumpCqWriteTransactions() and PumpRemoteTransactions();
 *   - SendBufferMsg() on smsg_oob_queue and smsg_queue;
 *   - ProcessDeadlock() when _detected_hang is set (CMK_SMP && !LARGEPAGE).
 * When CMK_SMP_TRACE_COMMTHREAD is enabled each phase is timed with
 * CmiWallTimer() and reported through traceUserBracketEvent() if it took
 * at least TRACE_THRESHOLD.
 *
 * NOTE(review): this listing is lossy -- the #endif lines and the calls
 * that the "after PumpNetworkSmsg" / "after SendRdmaMsg" remarks refer to
 * are not visible, and neither are the preprocessor conditions guarding
 * some of the calls above.
 */
2768 void LrtsAdvanceCommunication(int whileidle
)
2770 static int count
= 0;
2771 /* Receive Msg first */
2772 #if CMK_SMP_TRACE_COMMTHREAD
2773 double startT
, endT
;
2775 if (useDynamicSMSG
&& whileidle
)
2777 #if CMK_SMP_TRACE_COMMTHREAD
2778 startT
= CmiWallTimer();
2780 PumpDatagramConnection();
2781 #if CMK_SMP_TRACE_COMMTHREAD
2782 endT
= CmiWallTimer();
2783 if (endT
-startT
>=TRACE_THRESHOLD
) traceUserBracketEvent(event_SetupConnect
, startT
, endT
);
2787 #if CMK_SMP_TRACE_COMMTHREAD
2788 startT
= CmiWallTimer();
2791 //MACHSTATE(8, "after PumpNetworkSmsg \n") ;
2792 #if CMK_SMP_TRACE_COMMTHREAD
2793 endT
= CmiWallTimer();
2794 if (endT
-startT
>=TRACE_THRESHOLD
) traceUserBracketEvent(event_PumpSmsg
, startT
, endT
);
2797 #if CMK_SMP_TRACE_COMMTHREAD
2798 startT
= CmiWallTimer();
/* Local completions of the default TX completion queue. */
2800 PumpLocalTransactions(default_tx_cqh
, default_tx_cq_lock
);
2801 //MACHSTATE(8, "after PumpLocalTransactions\n") ;
2802 #if CMK_SMP_TRACE_COMMTHREAD
2803 endT
= CmiWallTimer();
2804 if (endT
-startT
>=TRACE_THRESHOLD
) traceUserBracketEvent(event_PumpTransaction
, startT
, endT
);
2807 #if CMK_SMP_TRACE_COMMTHREAD
2808 startT
= CmiWallTimer();
/* Local completions of the RDMA TX completion queue. */
2810 PumpLocalTransactions(rdma_tx_cqh
, rdma_tx_cq_lock
);
2813 PumpCqWriteTransactions();
2817 PumpRemoteTransactions();
2820 //MACHSTATE(8, "after PumpLocalTransactions\n") ;
2821 #if CMK_SMP_TRACE_COMMTHREAD
2822 endT
= CmiWallTimer();
2823 if (endT
-startT
>=TRACE_THRESHOLD
) traceUserBracketEvent(event_PumpRdmaTransaction
, startT
, endT
);
2826 /* Send buffered Message */
2827 #if CMK_SMP_TRACE_COMMTHREAD
2828 startT
= CmiWallTimer();
2831 if (SendBufferMsg(&smsg_oob_queue
) == 1)
2834 SendBufferMsg(&smsg_queue
);
2836 //MACHSTATE(8, "after SendBufferMsg\n") ;
2837 #if CMK_SMP_TRACE_COMMTHREAD
2838 endT
= CmiWallTimer();
2839 if (endT
-startT
>=TRACE_THRESHOLD
) traceUserBracketEvent(event_SendBufferSmsg
, startT
, endT
);
2842 #if CMK_SMP_TRACE_COMMTHREAD
2843 startT
= CmiWallTimer();
2846 //MACHSTATE(8, "after SendRdmaMsg\n") ;
2847 #if CMK_SMP_TRACE_COMMTHREAD
2848 endT
= CmiWallTimer();
2849 if (endT
-startT
>=TRACE_THRESHOLD
) traceUserBracketEvent(event_SendFmaRdmaMsg
, startT
, endT
);
2852 #if CMK_SMP && ! LARGEPAGE
2853 if (_detected_hang
) ProcessDeadlock();
/*
 * _init_dynamic_smsg: set up the state used for dynamically established
 * SMSG connections.
 *
 * Visible steps:
 *   - allocate and clear the per-rank tables smsg_attr_vector_local,
 *     smsg_attr_vector_remote and smsg_connected_flag (mysize entries each);
 *   - choose SMSG_MAX_MSG from mysize (only part of the chain is visible);
 *   - query the mailbox size with GNI_SmsgBufferSizeNeeded(), allocate a
 *     64-byte aligned, zeroed mailbox area for avg_smsg_connection
 *     mailboxes and register it with GNI_MemRegister();
 *   - create the unbound endpoint ep_hndl_unbound and post a datagram on
 *     it with GNI_EpPostDataWId().
 *
 * NOTE(review): the malloc() results and the posix_memalign() return value
 * are not checked in the visible code.
 * NOTE(review): the GNI_RC_CHECK text after GNI_SmsgBufferSizeNeeded()
 * says "GNI_GNI_MemRegister mem buffer", which names a different call.
 */
2857 /* useDynamicSMSG */
2858 static void _init_dynamic_smsg()
2860 gni_return_t status
;
2861 uint32_t vmdh_index
= -1;
2864 smsg_attr_vector_local
= (gni_smsg_attr_t
**)malloc(mysize
* sizeof(gni_smsg_attr_t
*));
2865 smsg_attr_vector_remote
= (gni_smsg_attr_t
**)malloc(mysize
* sizeof(gni_smsg_attr_t
*));
2866 smsg_connected_flag
= (int*)malloc(sizeof(int)*mysize
);
2867 for(i
=0; i
<mysize
; i
++) {
2868 smsg_connected_flag
[i
] = 0;
2869 smsg_attr_vector_local
[i
] = NULL
;
2870 smsg_attr_vector_remote
[i
] = NULL
;
/* Maximum SMSG payload, chosen from the job size. */
2874 SMSG_MAX_MSG
= 4096;
2875 }else if (mysize
<= 4096)
2877 SMSG_MAX_MSG
= 4096/mysize
* 1024;
2878 }else if (mysize
<= 16384)
2885 send_smsg_attr
.msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
2886 send_smsg_attr
.mbox_maxcredit
= SMSG_MAX_CREDIT
;
2887 send_smsg_attr
.msg_maxsize
= SMSG_MAX_MSG
;
2888 status
= GNI_SmsgBufferSizeNeeded(&send_smsg_attr
, &smsg_memlen
);
2889 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status
);
/* First element of mailbox_list: room for avg_smsg_connection mailboxes of smsg_memlen bytes each. */
2891 mailbox_list
= (dynamic_smsg_mailbox_t
*)malloc(sizeof(dynamic_smsg_mailbox_t
));
2892 mailbox_list
->size
= smsg_memlen
*avg_smsg_connection
;
2893 posix_memalign(&mailbox_list
->mailbox_base
, 64, mailbox_list
->size
);
2894 bzero(mailbox_list
->mailbox_base
, mailbox_list
->size
);
2895 mailbox_list
->offset
= 0;
2896 mailbox_list
->next
= 0;
2898 status
= GNI_MemRegister(nic_hndl
, (uint64_t)(mailbox_list
->mailbox_base
),
2899 mailbox_list
->size
, smsg_rx_cqh
,
2902 &(mailbox_list
->mem_hndl
));
2903 GNI_RC_CHECK("MEMORY registration for smsg", status
);
2905 status
= GNI_EpCreate(nic_hndl
, default_tx_cqh
, &ep_hndl_unbound
);
2906 GNI_RC_CHECK("Unbound EP", status
);
2908 alloc_smsg_attr(&send_smsg_attr
);
2910 status
= GNI_EpPostDataWId (ep_hndl_unbound
, &send_smsg_attr
, SMSG_ATTR_SIZE
, &recv_smsg_attr
, SMSG_ATTR_SIZE
, myrank
);
2911 GNI_RC_CHECK("post unbound datagram", status
);
2913 /* always pre-connect to proc 0 */
2914 //if (myrank != 0) connect_to(0);
/*
 * _init_static_smsg: set up one statically allocated SMSG mailbox per rank.
 *
 * Visible steps:
 *   - choose SMSG_MAX_MSG (optionally overridden by the environment
 *     variable CHARM_UGNI_SMSG_MAX_SIZE) and assert it is positive;
 *   - query the per-mailbox size with GNI_SmsgBufferSizeNeeded(), allocate
 *     a 64-byte aligned, zeroed area of smsg_memlen * mysize bytes and
 *     register it with GNI_MemRegister(); the size is added to
 *     register_memory_size;
 *   - exchange each rank's mailbox base address and memory handle with
 *     allgather();
 *   - for every other rank i, call GNI_SmsgInit() on ep_hndl_array[i] with
 *     the local attributes for slot i and remote attributes that point at
 *     slot `myrank` of rank i's mailbox area;
 *   - set the maximum retransmit count with GNI_SmsgSetMaxRetrans().
 *
 * NOTE(review): this listing is lossy; parts of the SMSG_MAX_MSG selection
 * chain and some declarations are not visible.
 * NOTE(review): the results of the two malloc() calls are not checked in
 * the visible code, and smsg_attr is not freed on any visible line.
 */
2917 static void _init_static_smsg()
2919 gni_smsg_attr_t
*smsg_attr
;
2920 gni_smsg_attr_t remote_smsg_attr
;
2921 gni_smsg_attr_t
*smsg_attr_vec
;
2922 gni_mem_handle_t my_smsg_mdh_mailbox
;
2924 gni_return_t status
;
2925 uint32_t vmdh_index
= -1;
2926 mdh_addr_t base_infor
;
2927 mdh_addr_t
*base_addr_vec
;
2932 SMSG_MAX_MSG
= 1024;
2933 }else if (mysize
<= 4096)
2935 SMSG_MAX_MSG
= 1024;
2936 }else if (mysize
<= 16384)
/* The environment variable, when set, replaces the size chosen above. */
2943 env
= getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2944 if (env
) SMSG_MAX_MSG
= atoi(env
);
2945 CmiAssert(SMSG_MAX_MSG
> 0);
2947 smsg_attr
= malloc(mysize
* sizeof(gni_smsg_attr_t
));
/* Entry 0 is filled in first only to query the mailbox size. */
2949 smsg_attr
[0].msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
2950 smsg_attr
[0].mbox_maxcredit
= SMSG_MAX_CREDIT
;
2951 smsg_attr
[0].msg_maxsize
= SMSG_MAX_MSG
;
2952 status
= GNI_SmsgBufferSizeNeeded(&smsg_attr
[0], &smsg_memlen
);
2953 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status
);
2954 ret
= posix_memalign(&smsg_mailbox_base
, 64, smsg_memlen
*(mysize
));
2955 CmiAssert(ret
== 0);
2956 bzero(smsg_mailbox_base
, smsg_memlen
*(mysize
));
2958 status
= GNI_MemRegister(nic_hndl
, (uint64_t)smsg_mailbox_base
,
2959 smsg_memlen
*(mysize
), smsg_rx_cqh
,
2962 &my_smsg_mdh_mailbox
);
2963 register_memory_size
+= smsg_memlen
*(mysize
);
2964 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status
);
2966 if (myrank
== 0) printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen
*(mysize
)/1024);
2967 if (myrank
== 0 && register_memory_size
>=MAX_REG_MEM
) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX to a larger value or use Dynamic smsg\n");
/* Publish this rank's mailbox base address and memory handle to all ranks. */
2969 base_infor
.addr
= (uint64_t)smsg_mailbox_base
;
2970 base_infor
.mdh
= my_smsg_mdh_mailbox
;
2971 base_addr_vec
= malloc(mysize
* sizeof(mdh_addr_t
));
2973 allgather(&base_infor
, base_addr_vec
, sizeof(mdh_addr_t
));
/* Local attributes: slot i of the local mailbox area belongs to rank i. */
2975 for(i
=0; i
<mysize
; i
++)
2979 smsg_attr
[i
].msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
2980 smsg_attr
[i
].mbox_maxcredit
= SMSG_MAX_CREDIT
;
2981 smsg_attr
[i
].msg_maxsize
= SMSG_MAX_MSG
;
2982 smsg_attr
[i
].mbox_offset
= i
*smsg_memlen
;
2983 smsg_attr
[i
].buff_size
= smsg_memlen
;
2984 smsg_attr
[i
].msg_buffer
= smsg_mailbox_base
;
2985 smsg_attr
[i
].mem_hndl
= my_smsg_mdh_mailbox
;
2988 for(i
=0; i
<mysize
; i
++)
2990 if (myrank
== i
) continue;
/* Remote attributes: this rank's slot (offset myrank*smsg_memlen) inside rank i's mailbox area. */
2992 remote_smsg_attr
.msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
2993 remote_smsg_attr
.mbox_maxcredit
= SMSG_MAX_CREDIT
;
2994 remote_smsg_attr
.msg_maxsize
= SMSG_MAX_MSG
;
2995 remote_smsg_attr
.mbox_offset
= myrank
*smsg_memlen
;
2996 remote_smsg_attr
.buff_size
= smsg_memlen
;
2997 remote_smsg_attr
.msg_buffer
= (void*)base_addr_vec
[i
].addr
;
2998 remote_smsg_attr
.mem_hndl
= base_addr_vec
[i
].mdh
;
3000 /* initialize the smsg channel */
3001 status
= GNI_SmsgInit(ep_hndl_array
[i
], &smsg_attr
[i
], &remote_smsg_attr
);
3002 GNI_RC_CHECK("SMSG Init", status
);
3003 } //end initialization
3005 free(base_addr_vec
);
3008 status
= GNI_SmsgSetMaxRetrans(nic_hndl
, 4096);
3009 GNI_RC_CHECK("SmsgSetMaxRetrans Init", status
);
/*
 * _init_send_queue: initialise the buffering structures of one SMSG_QUEUE.
 *
 * The visible lines belong to several build variants whose selecting
 * preprocessor lines are mostly not shown in this listing:
 *   - a single queue->sendMsgBuf PCQueue plus the destpe_avail[] array;
 *   - a per-destination table queue->smsg_msglist_index[] of mysize
 *     entries, each with either a PCQueue (plus `pushed` flag and lock)
 *     or an empty linked list (sendSmsgBuf = 0, next = -1), with
 *     queue->smsg_head_index set to -1.
 *
 * NOTE(review): the malloc() results are not checked in the visible code.
 */
3013 static void _init_send_queue(SMSG_QUEUE
*queue
)
3017 queue
->sendMsgBuf
= PCQueueCreate();
3018 destpe_avail
= (char*)malloc(mysize
* sizeof(char));
3020 queue
->smsg_msglist_index
= (MSG_LIST_INDEX
*)malloc(mysize
*sizeof(MSG_LIST_INDEX
));
3021 #if CMK_SMP && SMP_LOCKS
3022 nonEmptyQueues
= PCQueueCreate();
3024 for(i
=0; i
<mysize
; i
++)
3027 queue
->smsg_msglist_index
[i
].sendSmsgBuf
= PCQueueCreate();
3029 queue
->smsg_msglist_index
[i
].pushed
= 0;
3030 queue
->smsg_msglist_index
[i
].lock
= CmiCreateLock();
3033 queue
->smsg_msglist_index
[i
].sendSmsgBuf
= 0;
3034 queue
->smsg_msglist_index
[i
].next
= -1;
3035 queue
->smsg_head_index
= -1;
/*
 * _init_smsg: top-level SMSG initialisation.
 *
 * The visible body calls _init_dynamic_smsg() and _init_static_smsg() and
 * then initialises the send queues smsg_queue and smsg_oob_queue.
 * NOTE(review): the conditions that select between these calls are not
 * visible in this listing.
 */
3043 static void _init_smsg()
3047 _init_dynamic_smsg();
3049 _init_static_smsg();
3052 _init_send_queue(&smsg_queue
);
3054 _init_send_queue(&smsg_oob_queue
);
3058 static void _init_static_msgq()
3060 gni_return_t status
;
3061 /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The performance scales by the node count rather than rank count */
3062 msgq_attrs
.max_msg_sz
= MSGQ_MAXSIZE
;
3063 msgq_attrs
.smsg_q_sz
= 1;
3064 msgq_attrs
.rcv_pool_sz
= 1;
3065 msgq_attrs
.num_msgq_eps
= 2;
3066 msgq_attrs
.nloc_insts
= 8;
3067 msgq_attrs
.modes
= 0;
3068 msgq_attrs
.rcv_cq_sz
= REMOTE_QUEUE_ENTRIES
;
3070 status
= GNI_MsgqInit(nic_hndl
, NULL
, NULL
, NULL
, &msgq_attrs
, &msgq_handle
);
3071 GNI_RC_CHECK("MSGQ Init", status
);
3077 static CmiUInt8 total_mempool_size
= 0;
3078 static CmiUInt8 total_mempool_calls
= 0;
3080 #if USE_LRTS_MEMPOOL
/*
 * alloc_mempool_block: allocate one block for the memory pool.
 *
 * size        in/out: requested size; raised to at least the default block
 *             size (_expand_mem when expand_flag is set, else
 *             _mempool_size) and rounded with ALIGNHUGEPAGE().
 * mem_hndl    out: memory handle of the block (the visible code either
 *             registers the block through MEMORY_REGISTER or clears the
 *             handle with SetMemHndlZero).
 * expand_flag non-zero when the pool is being expanded; only then does an
 *             oversized request (> MAX_REG_MEM or > MAX_BUFF_SEND) abort.
 *
 * The block comes from my_get_huge_pages() or posix_memalign(); with
 * CMK_SMP && STEAL_MEMPOOL a block returned by steal_mempool_block() is
 * handed back directly. Allocation failures end in CmiAbort().
 *
 * NOTE(review): this listing is lossy -- the conditions selecting between
 * the allocation and registration variants, and the return statement,
 * are not visible.
 * NOTE(review): the printf calls use %lld for *size, which is a size_t.
 */
3081 void *alloc_mempool_block(size_t *size
, gni_mem_handle_t
*mem_hndl
, int expand_flag
)
3085 gni_return_t status
= GNI_RC_SUCCESS
;
3087 size_t default_size
= expand_flag
? _expand_mem
: _mempool_size
;
3088 if (*size
< default_size
) *size
= default_size
;
3090 // round up to be multiple of _tlbpagesize
3091 //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3092 *size
= ALIGNHUGEPAGE(*size
);
/* Running totals across all calls. */
3094 total_mempool_size
+= *size
;
3095 total_mempool_calls
+= 1;
3097 if ((*size
> MAX_REG_MEM
|| *size
> MAX_BUFF_SEND
) && expand_flag
)
3099 printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size
, MAX_REG_MEM
, MAX_BUFF_SEND
);
3100 CmiAbort("alloc_mempool_block");
3104 pool
= my_get_huge_pages(*size
);
3107 ret
= posix_memalign(&pool
, ALIGNBUF
, *size
);
3110 #if CMK_SMP && STEAL_MEMPOOL
3111 pool
= steal_mempool_block(size
, mem_hndl
);
3112 if (pool
!= NULL
) return pool
;
3114 printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size
)/1024/1024);
3116 CmiAbort("alloc_mempool_block: out of memory.");
3118 CmiAbort("alloc_mempool_block: posix_memalign failed");
3123 MEMORY_REGISTER(onesided_hnd
, nic_hndl
, pool
, *size
, mem_hndl
, &omdh
, rdma_rx_cqh
, status
);
/* On a failed registration the state is printed and the pool is swept before GNI_RC_CHECK reports the status. */
3125 if(status
!= GNI_RC_SUCCESS
) {
3126 printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank
, CmiMyRank(), register_memory_size
/(1024*1024.0*1024),register_count
, *size
);
3127 sweep_mempool(CpvAccess(mempool
));
3129 GNI_RC_CHECK("MEMORY_REGISTER", status
);
3131 SetMemHndlZero((*mem_hndl
));
/*
 * free_mempool_block: release one memory pool block.
 *
 * ptr       block head pointer; the block size is read from it with
 *           GetSizeFromBlockHeader().
 * mem_hndl  memory handle of the block; when it is not zero the block is
 *           deregistered with MEMORY_DEREGISTER first.
 *
 * The visible release path is my_free_huge_pages().
 * NOTE(review): any alternative release path and its selecting
 * preprocessor lines are not visible in this listing.
 */
3136 // ptr is a block head pointer
3137 void free_mempool_block(void *ptr
, gni_mem_handle_t mem_hndl
)
3139 if(!(IsMemHndlZero(mem_hndl
)))
3141 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &mem_hndl
, &omdh
, GetSizeFromBlockHeader(ptr
));
3144 my_free_huge_pages(ptr
, GetSizeFromBlockHeader(ptr
));
/*
 * LrtsPreCommonInit: machine-layer setup hook.
 *
 * With USE_LRTS_MEMPOOL this initialises the Cpv variable `mempool` and
 * stores in it a pool created by mempool_init() with an initial block size
 * of _mempool_size, the block callbacks alloc_mempool_block /
 * free_mempool_block and the limit _mempool_size_limit.
 * The everReturn argument is not used here.
 */
void LrtsPreCommonInit(int everReturn){
#if USE_LRTS_MEMPOOL
    CpvInitialize(mempool_type*, mempool);
    CpvAccess(mempool) = mempool_init(_mempool_size,
                                      alloc_mempool_block,
                                      free_mempool_block,
                                      _mempool_size_limit);
    MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ;
#endif
}
/*
 * LrtsInit: initialise the uGNI machine layer.
 *
 * Visible steps:
 *   - PMI_Init / PMI_Get_size / PMI_Get_rank fill first_spawned, mysize
 *     and myrank;
 *   - queue sizes and the dynamic-SMSG switch are read from environment
 *     variables (CHARM_UGNI_REMOTE_QUEUE_SIZE, CHARM_UGNI_LOCAL_QUEUE_SIZE,
 *     CHARM_UGNI_DYNAMIC_SMSG) and command-line options (+useRecvQueue,
 *     +useSendQueue, +useDynamicSmsg, +smsgConnection);
 *   - the NIC handle is obtained either through the onesided library or
 *     through GNI_CdmCreate / GNI_CdmAttach, and all NIC addresses are
 *     exchanged with allgather();
 *   - two TX completion queues (default_tx_cqh, rdma_tx_cqh) and two RX
 *     completion queues (smsg_rx_cqh, rdma_rx_cqh) are created;
 *   - one endpoint per remote rank is created and bound;
 *   - memory pool limits, big-message settings, the deadlock check and the
 *     TLB page size are configured from the environment/command line;
 *   - sendRdmaBuf is created and, with MACHINE_DEBUG_LOG, a per-rank debug
 *     log file is opened.
 *
 * NOTE(review): this listing is lossy; many original lines (declarations,
 * braces, preprocessor directives, the writes to *numNodes / *myNodeID and
 * the SMSG/MSGQ initialisation calls) are not visible, so the list above
 * is not complete.
 */
3159 void LrtsInit(int *argc
, char ***argv
, int *numNodes
, int *myNodeID
)
3164 unsigned int remote_addr
;
3165 gni_cdm_handle_t cdm_hndl
;
3166 gni_return_t status
= GNI_RC_SUCCESS
;
3167 uint32_t vmdh_index
= -1;
3169 unsigned int local_addr
, *MPID_UGNI_AllAddr
;
3174 //void (*local_event_handler)(gni_cq_entry_t *, void *) = &LocalEventHandle;
3175 //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3176 //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *) = &RemoteBteEventHandle;
3178 status
= PMI_Init(&first_spawned
);
3179 GNI_RC_CHECK("PMI_Init", status
);
3181 status
= PMI_Get_size(&mysize
);
3182 GNI_RC_CHECK("PMI_Getsize", status
);
3184 status
= PMI_Get_rank(&myrank
);
3185 GNI_RC_CHECK("PMI_getrank", status
);
3187 //physicalID = CmiPhysicalNodeID(myrank);
3189 //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3194 #if MULTI_THREAD_SEND
3195 /* Currently, we only consider the case that comm. thread will only recv msgs */
3196 Cmi_smp_mode_setting
= COMM_WORK_THREADS_SEND_RECV
;
/* Queue sizes: the environment variable is read first, then the command-line option is applied to the same variable. */
3199 env
= getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3200 if (env
) REMOTE_QUEUE_ENTRIES
= atoi(env
);
3201 CmiGetArgInt(*argv
,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES
);
3203 env
= getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3204 if (env
) LOCAL_QUEUE_ENTRIES
= atoi(env
);
3205 CmiGetArgInt(*argv
,"+useSendQueue", &LOCAL_QUEUE_ENTRIES
);
3207 env
= getenv("CHARM_UGNI_DYNAMIC_SMSG");
3208 if (env
) useDynamicSMSG
= 1;
3209 if (!useDynamicSMSG
)
3210 useDynamicSMSG
= CmiGetArgFlag(*argv
, "+useDynamicSmsg");
3211 CmiGetArgIntDesc(*argv
, "+smsgConnection", &avg_smsg_connection
,"Initial number of SMSGS connection per code");
3212 if (avg_smsg_connection
>mysize
) avg_smsg_connection
= mysize
;
3213 //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3217 printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize
);
3218 printf("Charm++> %s SMSG\n", useDynamicSMSG
?"dynamic":"static");
3221 onesided_init(NULL
, &onesided_hnd
);
3223 // this is a GNI test, so use the libonesided bypass functionality
3224 onesided_gni_bypass_get_nih(onesided_hnd
, &nic_hndl
);
3225 local_addr
= gniGetNicAddress();
3228 cookie
= get_cookie();
3230 modes
= GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT
;
3232 //Create and attach to the communication domain */
3233 status
= GNI_CdmCreate(myrank
, ptag
, cookie
, modes
, &cdm_hndl
);
3234 GNI_RC_CHECK("GNI_CdmCreate", status
);
3235 //* device id The device id is the minor number for the device
3236 //that is assigned to the device by the system when the device is created.
3237 //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3238 //where X is the device number 0 default
3239 status
= GNI_CdmAttach(cdm_hndl
, device_id
, &local_addr
, &nic_hndl
);
3240 GNI_RC_CHECK("GNI_CdmAttach", status
);
3241 local_addr
= get_gni_nic_address(0);
/* Every rank contributes its NIC address; the table is used below to bind the endpoints. */
3243 MPID_UGNI_AllAddr
= (unsigned int *)malloc(sizeof(unsigned int) * mysize
);
3244 _MEMCHECK(MPID_UGNI_AllAddr
);
3245 allgather(&local_addr
, MPID_UGNI_AllAddr
, sizeof(unsigned int));
3246 /* create the local completion queue */
3247 /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3248 status
= GNI_CqCreate(nic_hndl
, LOCAL_QUEUE_ENTRIES
, 0, GNI_CQ_NOBLOCK
, NULL
, NULL
, &default_tx_cqh
);
3249 GNI_RC_CHECK("GNI_CqCreate (tx)", status
);
3251 status
= GNI_CqCreate(nic_hndl
, LOCAL_QUEUE_ENTRIES
, 0, GNI_CQ_NOBLOCK
, NULL
, NULL
, &rdma_tx_cqh
);
3252 GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status
);
3253 /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3255 status
= GNI_CqCreate(nic_hndl
, REMOTE_QUEUE_ENTRIES
, 0, GNI_CQ_NOBLOCK
, NULL
, NULL
, &smsg_rx_cqh
);
3256 GNI_RC_CHECK("Create CQ (rx)", status
);
3258 status
= GNI_CqCreate(nic_hndl
, REMOTE_QUEUE_ENTRIES
, 0, GNI_CQ_NOBLOCK
, NULL
, NULL
, &rdma_rx_cqh
);
3259 GNI_RC_CHECK("Create Post CQ (rx)", status
);
3261 //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3262 //GNI_RC_CHECK("Create BTE CQ", status);
3264 /* create the endpoints. they need to be bound to allow later CQWrites to them */
3265 ep_hndl_array
= (gni_ep_handle_t
*)malloc(mysize
* sizeof(gni_ep_handle_t
));
3266 _MEMCHECK(ep_hndl_array
);
3267 #if MULTI_THREAD_SEND
/* rx_cq_lock, global_gni_lock, default_tx_cq_lock and smsg_mailbox_lock all share one lock object. */
3268 rx_cq_lock
= global_gni_lock
= default_tx_cq_lock
= smsg_mailbox_lock
= CmiCreateLock();
3269 //default_tx_cq_lock = CmiCreateLock();
3270 rdma_tx_cq_lock
= CmiCreateLock();
3271 smsg_rx_cq_lock
= CmiCreateLock();
3272 //global_gni_lock = CmiCreateLock();
3273 //rx_cq_lock = CmiCreateLock();
/* No endpoint is created for this rank itself. */
3275 for (i
=0; i
<mysize
; i
++) {
3276 if(i
== myrank
) continue;
3277 status
= GNI_EpCreate(nic_hndl
, default_tx_cqh
, &ep_hndl_array
[i
]);
3278 GNI_RC_CHECK("GNI_EpCreate ", status
);
3279 remote_addr
= MPID_UGNI_AllAddr
[i
];
3280 status
= GNI_EpBind(ep_hndl_array
[i
], remote_addr
, i
);
3281 GNI_RC_CHECK("GNI_EpBind ", status
);
3284 /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3288 #if USE_LRTS_MEMPOOL
3289 env
= getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3291 _totalmem
= CmiReadSize(env
);
3293 printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem
*1.0/oneGB
));
3296 env
= getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3297 if (env
) _mempool_size
= CmiReadSize(env
);
3298 if (CmiGetArgStringDesc(*argv
,"+gni-mempool-init-size",&env
,"Set the memory pool size"))
3299 _mempool_size
= CmiReadSize(env
);
3302 env
= getenv("CHARM_UGNI_MEMPOOL_MAX");
3304 MAX_REG_MEM
= CmiReadSize(env
);
3307 if (CmiGetArgStringDesc(*argv
,"+gni-mempool-max",&env
,"Set the memory pool max size")) {
3308 MAX_REG_MEM
= CmiReadSize(env
);
3312 env
= getenv("CHARM_UGNI_SEND_MAX");
3314 MAX_BUFF_SEND
= CmiReadSize(env
);
3317 if (CmiGetArgStringDesc(*argv
,"+gni-mempool-max-send",&env
,"Set the memory pool max size for send")) {
3318 MAX_BUFF_SEND
= CmiReadSize(env
);
3322 env
= getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3324 _mempool_size_limit
= CmiReadSize(env
);
/* Keep the limits consistent: MAX_REG_MEM >= _mempool_size and MAX_BUFF_SEND <= MAX_REG_MEM. */
3327 if (MAX_REG_MEM
< _mempool_size
) MAX_REG_MEM
= _mempool_size
;
3328 if (MAX_BUFF_SEND
> MAX_REG_MEM
) MAX_BUFF_SEND
= MAX_REG_MEM
;
3331 printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size
/1024.0/1024, _mempool_size_limit
/1024.0/1024);
3332 printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM
/1024.0/1024, MAX_BUFF_SEND
/1024.0/1024);
3333 if (MAX_REG_MEM
< BIG_MSG
* 2 + oneMB
) {
3334 /* memblock can expand to BIG_MSG * 2 size */
3335 printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n", BIG_MSG
* 2.0/1024/1024 + 1);
3336 CmiAbort("mempool maximum size is too small. \n");
3338 #if MULTI_THREAD_SEND
3339 printf("Charm++> worker thread sending messages\n");
3340 #elif COMM_THREAD_SEND
3341 printf("Charm++> only comm thread send/recv messages\n");
3345 #endif /* end of USE_LRTS_MEMPOOL */
3347 env
= getenv("CHARM_UGNI_BIG_MSG_SIZE");
3349 BIG_MSG
= CmiReadSize(env
);
3350 if (BIG_MSG
< ONE_SEG
)
3351 CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3353 env
= getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3355 BIG_MSG_PIPELINE
= atoi(env
);
/* The progress check is switched off by the environment variable or when there is only one process. */
3358 env
= getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3359 if (env
) _checkProgress
= 0;
3360 if (mysize
== 1) _checkProgress
= 0;
3364 env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3366 _tlbpagesize = CmiReadSize(env);
3368 /* real gethugepagesize() is only available when hugetlb module linked */
3369 _tlbpagesize
= gethugepagesize();
3371 printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize
/1024.0);
3375 if (_tlbpagesize
== 4096) {
3376 CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3380 print_stats
= CmiGetArgFlag(*argv
, "+print_stats");
3382 /* init DMA buffer for medium message */
3384 //_init_DMA_buffer();
3386 free(MPID_UGNI_AllAddr
);
3388 sendRdmaBuf
= PCQueueCreate();
3392 #if MACHINE_DEBUG_LOG
3394 sprintf(ln
,"debugLog.%d",myrank
);
3395 debugLog
=fopen(ln
,"w");
/*
 * LrtsAlloc: allocate a message buffer with room for `header` bytes in
 * front of `n_bytes` of payload.
 *
 * Visible policy:
 *   - n_bytes <= SMSG_MAX_MSG: plain malloc(n_bytes + header);
 *   - otherwise header + sizeof(mempool_header) must fit in ALIGNBUF; with
 *     USE_LRTS_MEMPOOL, sizes below BIG_MSG (after ALIGN64) come from
 *     mempool_malloc(), larger ones from my_get_huge_pages() or memalign();
 *     without the mempool, memalign() is used.
 * For the non-malloc paths the returned pointer is placed `header` bytes
 * before the ALIGNBUF offset of the raw allocation (for the mempool path
 * measured from res - sizeof(mempool_header)).
 *
 * NOTE(review): this listing is lossy -- the preprocessor lines selecting
 * between the variants, the declaration of ptr and the return statement
 * are not visible.
 * NOTE(review): on the last memalign() path the result is used without a
 * NULL check in the visible code.
 */
3410 void* LrtsAlloc(int n_bytes
, int header
)
3414 printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes
, header
, SMSG_MAX_MSG
);
3416 if(n_bytes
<= SMSG_MAX_MSG
)
3418 int totalsize
= n_bytes
+header
;
3419 ptr
= malloc(totalsize
);
3422 CmiAssert(header
+sizeof(mempool_header
) <= ALIGNBUF
);
3423 #if USE_LRTS_MEMPOOL
3424 n_bytes
= ALIGN64(n_bytes
);
3425 if(n_bytes
< BIG_MSG
)
3427 char *res
= mempool_malloc(CpvAccess(mempool
), ALIGNBUF
+n_bytes
-sizeof(mempool_header
), 1);
3428 if (res
) ptr
= res
- sizeof(mempool_header
) + ALIGNBUF
- header
;
3432 //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3433 n_bytes
= ALIGNHUGEPAGE(n_bytes
+ALIGNBUF
);
3434 char *res
= my_get_huge_pages(n_bytes
);
3436 char *res
= memalign(ALIGNBUF
, n_bytes
+ALIGNBUF
);
3438 if (res
) ptr
= res
+ ALIGNBUF
- header
;
3441 n_bytes
= ALIGN64(n_bytes
); /* make sure size if 4 aligned */
3442 char *res
= memalign(ALIGNBUF
, n_bytes
+ALIGNBUF
);
3443 ptr
= res
+ ALIGNBUF
- header
;
/*
 * LrtsFree: release a message buffer.
 *
 * The message size is read with SIZEFIELD() at msg + sizeof(CmiChunkHeader)
 * and compared with SMSG_MAX_MSG. For larger messages the raw allocation
 * is taken to start at msg + sizeof(CmiChunkHeader) - ALIGNBUF and is
 * released with my_free_huge_pages(), free(), mempool_free_thread() or
 * mempool_free(); the two mempool calls are passed that address plus
 * sizeof(mempool_header).
 *
 * NOTE(review): this listing is lossy -- the release call for small
 * messages and the conditions selecting between the release variants are
 * not visible.
 */
3449 void LrtsFree(void *msg
)
3451 CmiUInt4 size
= SIZEFIELD((char*)msg
+sizeof(CmiChunkHeader
));
3452 if (size
<= SMSG_MAX_MSG
)
3455 size
= ALIGN64(size
);
3459 int s
= ALIGNHUGEPAGE(size
+ALIGNBUF
);
3460 my_free_huge_pages((char*)msg
+ sizeof(CmiChunkHeader
) - ALIGNBUF
, s
);
3462 free((char*)msg
+ sizeof(CmiChunkHeader
) - ALIGNBUF
);
3466 #if USE_LRTS_MEMPOOL
3468 mempool_free_thread((char*)msg
+ sizeof(CmiChunkHeader
) - ALIGNBUF
+ sizeof(mempool_header
));
3470 mempool_free(CpvAccess(mempool
), (char*)msg
+ sizeof(CmiChunkHeader
) - ALIGNBUF
+ sizeof(mempool_header
));
3473 free((char*)msg
+ sizeof(CmiChunkHeader
) - ALIGNBUF
);
3482 if (print_stats
) print_comm_stats();
3485 #if USE_LRTS_MEMPOOL
3486 //printf("FINAL [%d, %d] register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg);
3487 mempool_destroy(CpvAccess(mempool
));
/*
 * LrtsDrainResources: flush outstanding communication.
 *
 * Returns immediately when there is only one process (mysize == 1).
 * Otherwise the visible lines test !SendBufferMsg() on smsg_oob_queue and
 * smsg_queue as part of a condition, and call PumpDatagramConnection() and
 * PumpLocalTransactions() on both TX completion queues.
 * NOTE(review): the statement that owns the condition and any further
 * calls inside it are not visible in this listing.
 */
3493 void LrtsDrainResources()
3495 if(mysize
== 1) return;
3498 !SendBufferMsg(&smsg_oob_queue
) ||
3500 !SendBufferMsg(&smsg_queue
)
3504 PumpDatagramConnection();
3506 PumpLocalTransactions(default_tx_cqh
, default_tx_cq_lock
);
3507 PumpLocalTransactions(rdma_tx_cqh
, rdma_tx_cq_lock
);
3513 void LrtsAbort(const char *message
) {
3514 printf("CmiAbort is calling on PE:%d\n", myrank
);
3515 CmiPrintStackTrace(0);
3516 PMI_Abort(-1, message
);
3519 /************************** TIMER FUNCTIONS **************************/
3520 #if CMK_TIMER_USE_SPECIAL
3521 /* MPI calls are not threadsafe, even the timer on some machines */
/* Lock reserved for the timer functions; initialised to 0 here. */
3522 static CmiNodeLock timerLock
= 0;
/* Non-zero when +useAbsoluteTime was given (set in CmiTimerInit); the timers then return the raw clock reading. */
3523 static int _absoluteTime
= 0;
/* Result of CmiTimerIsSynchronized(), recorded in CmiTimerInit. */
3524 static int _is_global
= 0;
/* CLOCK_MONOTONIC sample taken in CmiTimerInit; the origin for relative timer readings. */
3525 static struct timespec start_ts
;
/* Timer-synchronisation query; CmiTimerInit stores its result in _is_global. */
3527 inline int CmiTimerIsSynchronized() {
3531 inline int CmiTimerAbsolute() {
3532 return _absoluteTime
;
/* Timer start query. NOTE(review): presumably returns the timer origin in seconds - confirm. */
3535 double CmiStartTimer() {
3539 double CmiInitTime() {
3540 return (double)(start_ts
.tv_sec
)+(double)start_ts
.tv_nsec
/1000000000.0;
/*
 * CmiTimerInit: initialise the timer state used by the Cmi*Timer functions.
 *
 *  - _absoluteTime is taken from the +useAbsoluteTime command-line flag;
 *    PE 0 prints a notice when it is set.
 *  - _is_global records the result of CmiTimerIsSynchronized().
 *  - start_ts is sampled from CLOCK_MONOTONIC.
 *
 * argv: command-line vector scanned by CmiGetArgFlagDesc.
 *
 * NOTE(review): there are two clock_gettime() sites, one guarded by
 * CmiMyRank() == 0 and one in an else arm for the "no synchronous timer"
 * case; confirm which condition that else belongs to (presumably a test of
 * _is_global).
 */
3543 void CmiTimerInit(char **argv
) {
3544 _absoluteTime
= CmiGetArgFlagDesc(argv
,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
/* Only PE 0 announces the setting. */
3545 if (_absoluteTime
&& CmiMyPe() == 0)
3546 printf("Charm++> absolute timer is used\n");
3548 _is_global
= CmiTimerIsSynchronized();
/* Rank 0 samples the shared start time. */
3552 if (CmiMyRank() == 0) {
3553 clock_gettime(CLOCK_MONOTONIC
, &start_ts
);
3555 } else { /* we don't have a synchronous timer, set our own start time */
3559 clock_gettime(CLOCK_MONOTONIC
, &start_ts
);
3561 CmiNodeAllBarrier(); /* for smp */
3565 * Since the timerLock is never created, and is
3566 * always NULL, then all the if-condition inside
3567 * the timer functions could be disabled right
3568 * now in the case of SMP.
3570 double CmiTimer(void) {
3571 struct timespec now_ts
;
3572 clock_gettime(CLOCK_MONOTONIC
, &now_ts
);
3573 return _absoluteTime
?((double)(now_ts
.tv_sec
)+(double)now_ts
.tv_nsec
/1000000000.0)
3574 : (double)( now_ts
.tv_sec
- start_ts
.tv_sec
) + (((double) now_ts
.tv_nsec
- (double) start_ts
.tv_nsec
) / 1000000000.0);
3577 double CmiWallTimer(void) {
3578 struct timespec now_ts
;
3579 clock_gettime(CLOCK_MONOTONIC
, &now_ts
);
3580 return _absoluteTime
?((double)(now_ts
.tv_sec
)+(double)now_ts
.tv_nsec
/1000000000.0)
3581 : ( now_ts
.tv_sec
- start_ts
.tv_sec
) + ((now_ts
.tv_nsec
- start_ts
.tv_nsec
) / 1000000000.0);
3584 double CmiCpuTimer(void) {
3585 struct timespec now_ts
;
3586 clock_gettime(CLOCK_MONOTONIC
, &now_ts
);
3587 return _absoluteTime
?((double)(now_ts
.tv_sec
)+(double)now_ts
.tv_nsec
/1000000000.0)
3588 : (double)( now_ts
.tv_sec
- start_ts
.tv_sec
) + (((double) now_ts
.tv_nsec
- (double) start_ts
.tv_nsec
) / 1000000000.0);
3592 /************Barrier Related Functions****************/
// Barrier body: a node-level barrier (CmiNodeAllBarrier), then PMI_Barrier()
// on the rank selected by one of the two CmiMyRank() tests below, then a
// second node-level barrier.
// NOTE(review): the two rank tests (== CmiMyNodeSize() and == 0) are
// presumably build-time alternatives - confirm.
// NOTE(review): the PMI_Barrier() result is stored in a gni_return_t and
// checked with GNI_RC_CHECK; confirm PMI return codes are compatible with
// that check.
3596 gni_return_t status
;
3599 /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks */
3600 CmiNodeAllBarrier();
3601 if (CmiMyRank() == CmiMyNodeSize())
3603 if (CmiMyRank() == 0)
3607 * The call of CmiBarrier is usually before the initialization
3608 * of trace module of Charm++, therefore, the START_EVENT
3609 * and END_EVENT are disabled here. -Chao Mei
3612 status
= PMI_Barrier();
3613 GNI_RC_CHECK("PMI_Barrier", status
);
3616 CmiNodeAllBarrier();
3621 #include "machine-cmidirect.c"
3623 #if CMK_PERSISTENT_COMM
3624 #include "machine-persistent.c"