3 * Gemini GNI machine layer
9 * Flow control by mem pool using environment variables:
11 # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12 # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13 export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14 export CHARM_UGNI_MEMPOOL_MAX=20M
15 export CHARM_UGNI_SEND_MAX=10M
17 # limit on the total mempool size allocated; this prevents the mempool
18 # from using too much memory
19 export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M
21 other environment variables:
23 export CHARM_UGNI_NO_DEADLOCK_CHECK=yes # disable checking deadlock
24 export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G # max memory per node for mempool
25 export CHARM_UGNI_BIG_MSG_SIZE=4M # set big message size protocol
26 export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4 # set big message pipe len
41 //#include <numatoolkit.h>
46 #include "cmidirect.h"
52 #define MULTI_THREAD_SEND 0
53 #define COMM_THREAD_SEND 1
57 #define CMK_WORKER_SINGLE_TASK 0
60 #define REMOTE_EVENT 0
63 #define CMI_EXERT_SEND_CAP 0
64 #define CMI_EXERT_RECV_CAP 0
66 #if CMI_EXERT_SEND_CAP
70 #if CMI_EXERT_RECV_CAP
74 #define USE_LRTS_MEMPOOL 1
78 // Trace communication thread
79 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
80 #define TRACE_THRESHOLD 0.00005
81 #define CMI_MPI_TRACE_MOREDETAILED 0
82 #undef CMI_MPI_TRACE_USEREVENTS
83 #define CMI_MPI_TRACE_USEREVENTS 1
85 #undef CMK_SMP_TRACE_COMMTHREAD
86 #define CMK_SMP_TRACE_COMMTHREAD 0
89 #define CMK_TRACE_COMMOVERHEAD 0
90 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
91 #undef CMI_MPI_TRACE_USEREVENTS
92 #define CMI_MPI_TRACE_USEREVENTS 1
94 #undef CMK_TRACE_COMMOVERHEAD
95 #define CMK_TRACE_COMMOVERHEAD 0
98 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
99 CpvStaticDeclare(double, projTraceStart
);
100 #define START_EVENT() CpvAccess(projTraceStart) = CmiWallTimer();
101 #define END_EVENT(x) traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
103 #define START_EVENT()
109 #define oneMB (1024ll*1024)
110 #define oneGB (1024ll*1024*1024)
112 static CmiInt8 _mempool_size
= 8*oneMB
;
113 static CmiInt8 _expand_mem
= 4*oneMB
;
114 static CmiInt8 _mempool_size_limit
= 0;
116 static CmiInt8 _totalmem
= 0.8*oneGB
;
119 static CmiInt8 BIG_MSG
= 16*oneMB
;
120 static CmiInt8 ONE_SEG
= 4*oneMB
;
122 static CmiInt8 BIG_MSG
= 4*oneMB
;
123 static CmiInt8 ONE_SEG
= 2*oneMB
;
125 #if MULTI_THREAD_SEND
126 static int BIG_MSG_PIPELINE
= 1;
128 static int BIG_MSG_PIPELINE
= 4;
131 // dynamic flow control
132 static CmiInt8 buffered_send_msg
= 0;
133 static CmiInt8 register_memory_size
= 0;
136 static CmiInt8 MAX_BUFF_SEND
= 100000*oneMB
;
137 static CmiInt8 MAX_REG_MEM
= 200000*oneMB
;
138 static CmiInt8 register_count
= 0;
140 #if CMK_SMP && COMM_THREAD_SEND
141 static CmiInt8 MAX_BUFF_SEND
= 100*oneMB
;
142 static CmiInt8 MAX_REG_MEM
= 200*oneMB
;
144 static CmiInt8 MAX_BUFF_SEND
= 16*oneMB
;
145 static CmiInt8 MAX_REG_MEM
= 25*oneMB
;
151 #endif /* end USE_LRTS_MEMPOOL */
153 #if MULTI_THREAD_SEND
154 #define CMI_GNI_LOCK(x) CmiLock(x);
155 #define CMI_GNI_UNLOCK(x) CmiUnlock(x);
156 #define CMI_PCQUEUEPOP_LOCK(Q) CmiLock((Q)->lock);
157 #define CMI_PCQUEUEPOP_UNLOCK(Q) CmiUnlock((Q)->lock);
159 #define CMI_GNI_LOCK(x)
160 #define CMI_GNI_UNLOCK(x)
161 #define CMI_PCQUEUEPOP_LOCK(Q)
162 #define CMI_PCQUEUEPOP_UNLOCK(Q)
165 static int _tlbpagesize
= 4096;
167 //static int _smpd_count = 0;
169 static int user_set_flag
= 0;
171 static int _checkProgress
= 1; /* check deadlock */
172 static int _detected_hang
= 0;
174 #define SMSG_ATTR_SIZE sizeof(gni_smsg_attr_t)
177 static int useDynamicSMSG
=0; /* dynamic smsgs setup */
179 static int avg_smsg_connection
= 32;
180 static int *smsg_connected_flag
= 0;
181 static gni_smsg_attr_t
**smsg_attr_vector_local
;
182 static gni_smsg_attr_t
**smsg_attr_vector_remote
;
183 static gni_ep_handle_t ep_hndl_unbound
;
184 static gni_smsg_attr_t send_smsg_attr
;
185 static gni_smsg_attr_t recv_smsg_attr
;
187 typedef struct _dynamic_smsg_mailbox
{
191 gni_mem_handle_t mem_hndl
;
192 struct _dynamic_smsg_mailbox
*next
;
193 }dynamic_smsg_mailbox_t
;
195 static dynamic_smsg_mailbox_t
*mailbox_list
;
197 static CmiUInt8 smsg_send_count
= 0, last_smsg_send_count
= 0;
198 static CmiUInt8 smsg_recv_count
= 0, last_smsg_recv_count
= 0;
201 int lrts_send_msg_id
= 0;
202 int lrts_local_done_msg
= 0;
203 int lrts_send_rdma_success
= 0;
212 #if CMK_PERSISTENT_COMM
213 #include "machine-persistent.h"
216 //#define USE_ONESIDED 1
218 //onesided implementation is wrong, since no place to restore omdh
219 #include "onesided.h"
220 onesided_hnd_t onesided_hnd
;
222 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh)
224 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
227 uint8_t onesided_hnd
, omdh
;
229 #if REMOTE_EVENT || CQWRITE
230 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
231 if(register_memory_size+size>= MAX_REG_MEM) { \
232 status = GNI_RC_ERROR_NOMEM;} \
233 else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg, (uint64_t)size, cqh, GNI_MEM_READWRITE, -1, mem_hndl); \
234 if(status == GNI_RC_SUCCESS) register_memory_size += size; }
236 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
237 if (register_memory_size + size >= MAX_REG_MEM) { \
238 status = GNI_RC_ERROR_NOMEM; \
239 } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg, (uint64_t)size, NULL, GNI_MEM_READWRITE, -1, mem_hndl); \
240 if(status == GNI_RC_SUCCESS) register_memory_size += size; }
243 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size) \
244 do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
245 register_memory_size -= size; \
246 else CmiAbort("MEM_DEregister"); \
/*
 * Mempool accessors.  `x` is a message pointer: the mempool_header sits
 * ALIGNBUF bytes in front of it and its block_ptr field leads to the block
 * that owns the message.  The remaining Get and Increase/Decrease macros
 * read or update fields of that block.
 */
250 #define GetMempoolBlockPtr(x) (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
251 #define GetMempoolPtr(x) GetMempoolBlockPtr(x)->mptr
252 #define GetMempoolsize(x) GetMempoolBlockPtr(x)->size
253 #define GetMemHndl(x) GetMempoolBlockPtr(x)->mem_hndl
254 #define IncreaseMsgInRecv(x) (GetMempoolBlockPtr(x)->msgs_in_recv)++
255 #define DecreaseMsgInRecv(x) (GetMempoolBlockPtr(x)->msgs_in_recv)--
256 #define IncreaseMsgInSend(x) (GetMempoolBlockPtr(x)->msgs_in_send)++
257 #define DecreaseMsgInSend(x) (GetMempoolBlockPtr(x)->msgs_in_send)--
/*
 * NOTE(review): NoMsgInSend and NoMsgInRecv expand to an unparenthesized
 * `... == 0` comparison, so they are only safe as a whole condition (e.g.
 * `!NoMsgInSend(p)` would negate the pointer expression, not the test).
 * NoMsgInFlight below is fully parenthesized.
 */
258 #define NoMsgInSend(x) GetMempoolBlockPtr(x)->msgs_in_send == 0
259 #define NoMsgInRecv(x) GetMempoolBlockPtr(x)->msgs_in_recv == 0
260 #define NoMsgInFlight(x) (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv == 0)
/* A memory handle counts as "zero" when both of its qwords are 0. */
261 #define IsMemHndlZero(x) ((x).qword1 == 0 && (x).qword2 == 0)
262 #define SetMemHndlZero(x) do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
/* These take a pointer to a block_header rather than a message pointer. */
263 #define NotRegistered(x) IsMemHndlZero(((block_header*)x)->mem_hndl)
265 #define GetMemHndlFromBlockHeader(x) ((block_header*)x)->mem_hndl
266 #define GetSizeFromBlockHeader(x) ((block_header*)x)->size
/* Size and sequence fields of the extended message header (CmiMsgHeaderExt). */
268 #define CmiGetMsgSize(m) ((CmiMsgHeaderExt*)m)->size
269 #define CmiSetMsgSize(m,s) ((((CmiMsgHeaderExt*)m)->size)=(s))
270 #define CmiGetMsgSeq(m) ((CmiMsgHeaderExt*)m)->seq
271 #define CmiSetMsgSeq(m, s) ((((CmiMsgHeaderExt*)m)->seq) = (s))
275 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
276 /* If SMSG is not used */
278 #define FMA_PER_CORE 1024
279 #define FMA_BUFFER_SIZE 1024
281 /* If SMSG is used */
282 static int SMSG_MAX_MSG
= 1024;
283 #define SMSG_MAX_CREDIT 72
285 #define MSGQ_MAXSIZE 2048
287 /* large message transfer with FMA or BTE */
289 #define LRTS_GNI_RDMA_THRESHOLD 1024
291 /* remote events only work with RDMA */
292 #define LRTS_GNI_RDMA_THRESHOLD 0
296 static int REMOTE_QUEUE_ENTRIES
=163840;
297 static int LOCAL_QUEUE_ENTRIES
=163840;
299 static int REMOTE_QUEUE_ENTRIES
=20480;
300 static int LOCAL_QUEUE_ENTRIES
=20480;
303 #define BIG_MSG_TAG 0x26
304 #define PUT_DONE_TAG 0x28
305 #define DIRECT_PUT_DONE_TAG 0x29
307 /* SMSG is data message */
308 #define SMALL_DATA_TAG 0x31
309 /* SMSG is a control message to initialize a BTE */
310 #define LMSG_INIT_TAG 0x39
317 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) { printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
319 #define GNI_RC_CHECK(msg,rc)
322 #define ALIGN64(x) (size_t)((~63)&((x)+63))
323 //#define ALIGN4(x) (size_t)((~3)&((x)+3))
324 #define ALIGNHUGEPAGE(x) (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
326 static int useStaticMSGQ
= 0;
327 static int useStaticFMA
= 0;
328 static int mysize
, myrank
;
329 static gni_nic_handle_t nic_hndl
;
332 gni_mem_handle_t mdh
;
335 // this is related to dynamic SMSG
337 typedef struct mdh_addr_list
{
338 gni_mem_handle_t mdh
;
340 struct mdh_addr_list
*next
;
343 static unsigned int smsg_memlen
;
344 gni_smsg_attr_t
**smsg_local_attr_vec
= 0;
345 mdh_addr_t setup_mem
;
346 mdh_addr_t
*smsg_connection_vec
= 0;
347 gni_mem_handle_t smsg_connection_memhndl
;
348 static int smsg_expand_slots
= 10;
349 static int smsg_available_slot
= 0;
350 static void *smsg_mailbox_mempool
= 0;
351 mdh_addr_list_t
*smsg_dynamic_list
= 0;
353 static void *smsg_mailbox_base
;
354 gni_msgq_attr_t msgq_attrs
;
355 gni_msgq_handle_t msgq_handle
;
356 gni_msgq_ep_attr_t msgq_ep_attrs
;
357 gni_msgq_ep_attr_t msgq_ep_attrs_size
;
359 /* =====Beginning of Declarations of Machine Specific Variables===== */
361 static int modes
= 0;
362 static gni_cq_handle_t smsg_rx_cqh
= NULL
;
363 static gni_cq_handle_t default_tx_cqh
= NULL
;
364 static gni_cq_handle_t rdma_tx_cqh
= NULL
;
365 static gni_cq_handle_t rdma_rx_cqh
= NULL
;
366 static gni_cq_handle_t post_tx_cqh
= NULL
;
367 static gni_ep_handle_t
*ep_hndl_array
;
369 static CmiNodeLock
*ep_lock_array
;
370 static CmiNodeLock default_tx_cq_lock
;
371 static CmiNodeLock rdma_tx_cq_lock
;
372 static CmiNodeLock global_gni_lock
;
373 static CmiNodeLock rx_cq_lock
;
374 static CmiNodeLock smsg_mailbox_lock
;
375 static CmiNodeLock smsg_rx_cq_lock
;
376 static CmiNodeLock
*mempool_lock
;
377 //#define CMK_WITH_STATS 0
378 typedef struct msg_list
385 struct msg_list
*next
;
388 double creation_time
;
393 typedef struct control_msg
395 uint64_t source_addr
; /* address from the start of buffer */
396 uint64_t dest_addr
; /* address from the start of buffer */
397 int total_length
; /* total length */
398 int length
; /* length of this packet */
400 int ack_index
; /* index from integer to address */
402 uint8_t seq_id
; //big message 0 meaning single message
403 gni_mem_handle_t source_mem_hndl
;
404 struct control_msg
*next
;
407 #define CONTROL_MSG_SIZE (sizeof(CONTROL_MSG)-sizeof(void*))
409 typedef struct ack_msg
411 uint64_t source_addr
; /* address from the start of buffer */
412 #if ! USE_LRTS_MEMPOOL
413 gni_mem_handle_t source_mem_hndl
;
414 int length
; /* total length */
416 struct ack_msg
*next
;
419 #define ACK_MSG_SIZE (sizeof(ACK_MSG)-sizeof(void*))
423 uint64_t handler_addr
;
427 char core
[CmiMsgHeaderSizeBytes
];
432 CpvDeclare(int, CmiHandleDirectIdx
);
/*
 * CmiHandleDirectMsg: handler for a cmidirect message.
 *
 * Interprets msg->handler as a pointer to a CmiDirectUserHandle and invokes
 * that handle's callbackFnPtr with its callbackData.
 */
433 void CmiHandleDirectMsg(cmidirectMsg
* msg
)
436 CmiDirectUserHandle
*_handle
= (CmiDirectUserHandle
*)(msg
->handler
);
/* Run the user callback stored in the handle. */
437 (*(_handle
->callbackFnPtr
))(_handle
->callbackData
);
443 CpvInitialize(int, CmiHandleDirectIdx
);
444 CpvAccess(CmiHandleDirectIdx
) = CmiRegisterHandler( (CmiHandler
) CmiHandleDirectMsg
);
448 typedef struct rmda_msg
454 gni_post_descriptor_t
*pd
;
456 struct rmda_msg
*next
;
463 #define ONE_SEND_QUEUE 0
465 typedef struct msg_list_index
472 #if !ONE_SEND_QUEUE && SMP_LOCKS
473 PCQueue nonEmptyQueues
;
477 static RDMA_REQUEST
*sendRdmaBuf
= 0;
478 static RDMA_REQUEST
*sendRdmaTail
= 0;
479 typedef struct msg_list_index
482 MSG_LIST
*sendSmsgBuf
;
488 // buffered send queue
490 typedef struct smsg_queue
492 MSG_LIST_INDEX
*smsg_msglist_index
;
496 typedef struct smsg_queue
502 SMSG_QUEUE smsg_queue
;
504 SMSG_QUEUE smsg_oob_queue
;
509 #define FreeMsgList(d) free(d);
510 #define MallocMsgList(d) d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
514 static MSG_LIST
*msglist_freelist
=0;
516 #define FreeMsgList(d) \
518 (d)->next = msglist_freelist;\
519 msglist_freelist = d; \
522 #define MallocMsgList(d) \
524 d = msglist_freelist;\
525 if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
527 } else msglist_freelist = d->next; \
535 #define FreeControlMsg(d) free(d);
536 #define MallocControlMsg(d) d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
540 static CONTROL_MSG
*control_freelist
=0;
542 #define FreeControlMsg(d) \
544 (d)->next = control_freelist;\
545 control_freelist = d; \
548 #define MallocControlMsg(d) \
550 d = control_freelist;\
551 if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
553 } else control_freelist = d->next; \
560 #define FreeAckMsg(d) free(d);
561 #define MallocAckMsg(d) d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
565 static ACK_MSG
*ack_freelist
=0;
567 #define FreeAckMsg(d) \
569 (d)->next = ack_freelist;\
573 #define MallocAckMsg(d) \
576 if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
578 } else ack_freelist = d->next; \
585 #define FreeRdmaRequest(d) free(d);
586 #define MallocRdmaRequest(d) d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));
589 static RDMA_REQUEST
*rdma_freelist
= NULL
;
591 #define FreeRdmaRequest(d) \
593 (d)->next = rdma_freelist;\
597 #define MallocRdmaRequest(d) \
600 if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
602 } else rdma_freelist = d->next; \
607 /* reuse gni_post_descriptor_t */
608 static gni_post_descriptor_t
*post_freelist
=0;
611 #define FreePostDesc(d) \
612 (d)->next_descr = post_freelist;\
615 #define MallocPostDesc(d) \
618 d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
621 } else post_freelist = d->next_descr;
624 #define FreePostDesc(d) free(d);
625 #define MallocPostDesc(d) d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
630 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
631 static int buffered_smsg_counter
= 0;
633 /* SmsgSend return success but message sent is not confirmed by remote side */
634 static MSG_LIST
*buffered_fma_head
= 0;
635 static MSG_LIST
*buffered_fma_tail
= 0;
/*
 * Single-bit helpers on an integer mask `a`: test whether bit `ind` is
 * clear, set it, or clear it.  SET_BITS and Reset assign back to `a`.
 * NOTE(review): `a` is not parenthesized in the expansions, so pass a plain
 * lvalue; `1<<(ind)` is an int shift, so `ind` must stay below the width
 * of int.
 */
638 #define IsFree(a,ind) !( a& (1<<(ind) ))
639 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
640 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
642 CpvDeclare(mempool_type
*, mempool
);
645 /* ack pool for remote events */
648 #define ACK_EVENT(idx) (((idx)<<ACK_SHIFT) | myrank)
649 #define ACK_GET_RANK(evt) ((evt) & ((1<<ACK_SHIFT)-1))
650 #define ACK_GET_INDEX(evt) ((evt) >> ACK_SHIFT)
655 int type
; // 1: ACK 2: Persistent
658 typedef struct IndexPool
{
659 struct IndexStruct
*indexes
;
665 static IndexPool ackPool
;
668 #define GetIndexType(s) (ackPool.indexes[s].type)
669 #define GetIndexAddress(s) (ackPool.indexes[s].addr)
/*
 * IndexPool_init: set up the slot array of an index pool.
 *
 * Aborts when the rank field of a remote event (1<<ACK_SHIFT values) is
 * smaller than mysize.  Then allocates pool->size IndexStruct entries and
 * chains them into a free list through their `next` fields: entry i points
 * to i+1 and the last entry is terminated with -1.
 * NOTE(review): the malloc result is used by the loop right away without a
 * NULL check.
 */
671 static void IndexPool_init(IndexPool
*pool
)
674 if ((1<<ACK_SHIFT
) < mysize
)
675 CmiAbort("Charm++ Error: Remote event's rank field overflow.");
677 pool
->indexes
= (struct IndexStruct
*)malloc(pool
->size
*sizeof(struct IndexStruct
));
/* Link every entry to its successor; the loop leaves i at the last entry. */
678 for (i
=0; i
<pool
->size
-1; i
++) {
679 pool
->indexes
[i
].next
= i
+1;
/* -1 marks the end of the free list. */
681 pool
->indexes
[i
].next
= -1;
/* Multi-threaded send builds also create the lock used by getslot/freeslot. */
683 #if MULTI_THREAD_SEND
684 pool
->lock
= CmiCreateLock();
/*
 * IndexPool_getslot: take a slot from the pool's free list and record
 * `addr` and `type` in it.  The pool lock is held for the whole operation
 * (CMI_GNI_LOCK is a no-op unless MULTI_THREAD_SEND is set).
 *
 * The growth path doubles the array, copies the old entries, chains the new
 * entries into a free list and makes the first new entry the free head.
 * NOTE(review): presumably growth is taken only when no free slot is left
 * and the function returns the slot index `s` -- confirm.  Also confirm
 * that old_ackpool is released after the copy and that the new malloc is
 * checked.
 */
689 inline int IndexPool_getslot(IndexPool
*pool
, void *addr
, int type
)
693 CMI_GNI_LOCK(pool
->lock
);
696 int newsize
= pool
->size
* 2;
697 printf("[%d] AckPool_getslot expand to: %d\n", myrank
, newsize
);
/* A slot index has to fit into the 32-ACK_SHIFT bits left beside the rank. */
698 if (newsize
> (1<<(32-ACK_SHIFT
))) CmiAbort("AckPool too large");
699 struct IndexStruct
*old_ackpool
= pool
->indexes
;
700 pool
->indexes
= (struct IndexStruct
*)malloc(newsize
*sizeof(struct IndexStruct
));
701 memcpy(pool
->indexes
, old_ackpool
, pool
->size
*sizeof(struct IndexStruct
));
/* Chain the newly added entries [size, newsize) into a free list. */
702 for (i
=pool
->size
; i
<newsize
-1; i
++) {
703 pool
->indexes
[i
].next
= i
+1;
705 pool
->indexes
[i
].next
= -1;
/* The first new entry becomes the head of the free list. */
706 pool
->freehead
= pool
->size
;
708 pool
->size
= newsize
;
/* Unlink slot s from the free list and fill it in. */
711 pool
->freehead
= pool
->indexes
[s
].next
;
712 pool
->indexes
[s
].addr
= addr
;
713 pool
->indexes
[s
].type
= type
;
714 CMI_GNI_UNLOCK(pool
->lock
);
/*
 * IndexPool_freeslot: give slot `s` back to the pool.
 *
 * Asserts that s lies in [0, pool->size) and, under the pool lock, links
 * the slot in front of the current free-list head.
 * NOTE(review): presumably pool->freehead is then set to s -- confirm.
 */
719 inline void IndexPool_freeslot(IndexPool
*pool
, int s
)
721 CmiAssert(s
>=0 && s
<pool
->size
);
722 CMI_GNI_LOCK(pool
->lock
);
723 pool
->indexes
[s
].next
= pool
->freehead
;
725 CMI_GNI_UNLOCK(pool
->lock
);
731 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
732 #define CMI_MAGIC(msg) ((CmiMsgHeaderBasic *)msg)->magic
733 #define CHARM_MAGIC_NUMBER 126
735 #if CMK_ERROR_CHECKING
736 extern unsigned char computeCheckSum(unsigned char *data
, int len
);
737 static int checksum_flag
= 0;
738 #define CMI_SET_CHECKSUM(msg, len) \
739 if (checksum_flag) { \
740 ((CmiMsgHeaderBasic *)msg)->cksum = 0; \
741 ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len); \
743 #define CMI_CHECK_CHECKSUM(msg, len) \
745 if (computeCheckSum((unsigned char*)msg, len) != 0) \
746 CmiAbort("Fatal error: checksum doesn't agree!\n");
748 #define CMI_SET_CHECKSUM(msg, len)
749 #define CMI_CHECK_CHECKSUM(msg, len)
751 /* =====End of Definitions of Message-Corruption Related Macros=====*/
753 static int print_stats
= 0;
754 static int stats_off
= 0;
755 void CmiTurnOnStats()
760 void CmiTurnOffStats()
766 FILE *counterLog
= NULL
;
767 typedef struct comm_thread_stats
769 uint64_t smsg_data_count
;
770 uint64_t lmsg_init_count
;
772 uint64_t big_msg_ack_count
;
774 // number of calls to SmsgSend
775 uint64_t try_smsg_data_count
;
776 uint64_t try_lmsg_init_count
;
777 uint64_t try_ack_count
;
778 uint64_t try_big_msg_ack_count
;
779 uint64_t try_smsg_count
;
781 double max_time_in_send_buffered_smsg
;
782 double all_time_in_send_buffered_smsg
;
785 uint64_t try_rdma_count
;
786 double max_time_from_control_to_rdma_init
;
787 double all_time_from_control_to_rdma_init
;
789 double max_time_from_rdma_init_to_rdma_done
;
790 double all_time_from_rdma_init_to_rdma_done
;
793 static Comm_Thread_Stats comm_stats
;
/* init_comm_stats: reset every counter and timer in comm_stats to zero. */
795 static void init_comm_stats()
797 memset(&comm_stats
, 0, sizeof(Comm_Thread_Stats
));
800 #define SMSG_CREATION( x ) if(print_stats && !stats_off) { x->creation_time = CmiWallTimer(); }
802 #define SMSG_SENT_DONE(creation_time, tag) \
803 if (print_stats && !stats_off) { if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++; \
804 else if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++; \
805 else if( tag == ACK_TAG) comm_stats.ack_count++; \
806 else if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++; \
807 comm_stats.smsg_count++; \
808 double inbuff_time = CmiWallTimer() - creation_time; \
809 if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
810 comm_stats.all_time_in_send_buffered_smsg += inbuff_time; \
813 #define SMSG_TRY_SEND(tag) \
814 if (print_stats && !stats_off){ if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++; \
815 else if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++; \
816 else if( tag == ACK_TAG) comm_stats.try_ack_count++; \
817 else if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++; \
818 comm_stats.try_smsg_count++; \
821 #define RDMA_TRY_SEND() if (print_stats && !stats_off) {comm_stats.try_rdma_count++;}
823 #define RDMA_TRANS_DONE(x) \
824 if (print_stats && !stats_off) { double rdma_trans_time = CmiWallTimer() - x ; \
825 if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
826 comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
829 #define RDMA_TRANS_INIT(x) \
830 if (print_stats && !stats_off) { comm_stats.rdma_count++; \
831 double rdma_trans_time = CmiWallTimer() - x ; \
832 if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
833 comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
/*
 * print_comm_stats: write the communication-thread statistics collected in
 * comm_stats to counterLog, prefixed with this node's rank.
 *
 * Times are stored in seconds and printed in milliseconds (scaled by 1000).
 * Averages are the accumulated time divided by smsg_count or rdma_count.
 * NOTE(review): the averages divide by counters that are zero until a
 * message has been counted, and counters declared uint64_t are printed with
 * %lld -- confirm both are acceptable.
 */
837 static void print_comm_stats()
/* Time SMSG messages spent in the send buffer: maximum and average. */
839 fprintf(counterLog
, "Node[%d]SMSG time in buffer\t[max:%f\tAverage:%f](milisecond)\n", myrank
, 1000*comm_stats
.max_time_in_send_buffered_smsg
, 1000.0*comm_stats
.all_time_in_send_buffered_smsg
/comm_stats
.smsg_count
);
/* Completed SMSG sends, total and per tag. */
840 fprintf(counterLog
, "Node[%d]Smsg Msgs \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld]\n", myrank
,
841 comm_stats
.smsg_count
, comm_stats
.smsg_data_count
, comm_stats
.lmsg_init_count
,
842 comm_stats
.ack_count
, comm_stats
.big_msg_ack_count
);
/* Attempted SMSG sends (the try_* counters), total and per tag. */
844 fprintf(counterLog
, "Node[%d]SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld]\n\n", myrank
,
845 comm_stats
.try_smsg_count
, comm_stats
.try_smsg_data_count
, comm_stats
.try_lmsg_init_count
,
846 comm_stats
.try_ack_count
, comm_stats
.try_big_msg_ack_count
);
/* RDMA transactions: counted transactions versus attempts. */
848 fprintf(counterLog
, "Node[%d]Rdma Transaction [count:%lld\t calls:%lld]\n", myrank
, comm_stats
.rdma_count
, comm_stats
.try_rdma_count
);
/* Latency from control-message arrival to RDMA initiation. */
849 fprintf(counterLog
, "Node[%d]Rdma time from control arrives to rdma init [MAX:%f\t Average:%f](milisecond)\n", myrank
, 1000.0*comm_stats
.max_time_from_control_to_rdma_init
, 1000.0*comm_stats
.all_time_from_control_to_rdma_init
/comm_stats
.rdma_count
);
/* Latency from RDMA initiation to RDMA completion. */
850 fprintf(counterLog
, "Node[%d]Rdma time from init to rdma done [MAX:%f\t Average:%f](milisecond)\n\n", myrank
, 1000.0*comm_stats
.max_time_from_rdma_init_to_rdma_done
, 1000.0*comm_stats
.all_time_from_rdma_init_to_rdma_done
/comm_stats
.rdma_count
);
854 #define STATS_ACK_TIME(x) x
855 #define STATS_SEND_SMSGS_TIME(x) x
/*
 * allgather: collect `len` bytes from every process, ordered by rank.
 *
 * On the first call (already_called == 0) the job size and this process's
 * rank are queried from PMI, and the ranks of all processes are gathered
 * into the static ivec_ptr.  ivec_ptr[i] is therefore the rank whose data
 * appears at position i of a PMI_Allgather result.  Every call gathers the
 * payloads into tmp_buf and copies chunk i to offset len * ivec_ptr[i] of
 * out_ptr.
 * NOTE(review): presumably out_ptr aliases `out`, already_called is set
 * after the first pass, and tmp_buf is checked and freed -- confirm.
 */
859 allgather(void *in
,void *out
, int len
)
861 static int *ivec_ptr
=NULL
,already_called
=0,job_size
=0;
864 char *tmp_buf
,*out_ptr
;
/* One-time setup: learn the job size and the rank order PMI_Allgather uses. */
866 if(!already_called
) {
868 rc
= PMI_Get_size(&job_size
);
869 CmiAssert(rc
== PMI_SUCCESS
);
870 rc
= PMI_Get_rank(&my_rank
);
871 CmiAssert(rc
== PMI_SUCCESS
);
873 ivec_ptr
= (int *)malloc(sizeof(int) * job_size
);
874 CmiAssert(ivec_ptr
!= NULL
);
876 rc
= PMI_Allgather(&my_rank
,ivec_ptr
,sizeof(int));
877 CmiAssert(rc
== PMI_SUCCESS
);
/* Staging buffer for the gathered payloads in PMI order. */
883 tmp_buf
= (char *)malloc(job_size
* len
);
886 rc
= PMI_Allgather(in
,tmp_buf
,len
);
887 CmiAssert(rc
== PMI_SUCCESS
);
/* Reorder: chunk i belongs to rank ivec_ptr[i]. */
891 for(i
=0;i
<job_size
;i
++) {
893 memcpy(&out_ptr
[len
* ivec_ptr
[i
]],&tmp_buf
[i
* len
],len
);
/*
 * allgather_2: rank-ordered allgather that tags each contribution.
 *
 * Each process sends a record of sizeof(int) + len bytes: its rank (myrank)
 * followed by the `len` payload bytes from `in`.  After PMI_Allgather, the
 * rank stored at the front of record i selects the slot (rank_index * len)
 * of out_ref that receives the payload.
 * NOTE(review): presumably out_ref aliases `out` and in2/out_ptr are freed
 * at the end -- confirm.  rc comes from PMI_Allgather but is checked with
 * GNI_RC_CHECK, which compares against GNI_RC_SUCCESS -- confirm intended.
 */
901 allgather_2(void *in
,void *out
, int len
)
903 //PMI_Allgather is out of order
904 int i
,rc
, extend_len
;
906 char *out_ptr
, *out_ref
;
/* Build the outgoing record: [rank | payload]. */
909 extend_len
= sizeof(int) + len
;
910 in2
= (char*)malloc(extend_len
);
912 memcpy(in2
, &myrank
, sizeof(int));
913 memcpy(in2
+sizeof(int), in
, len
);
/* Room for one record per process (mysize records). */
915 out_ptr
= (char*)malloc(mysize
*extend_len
);
917 rc
= PMI_Allgather(in2
, out_ptr
, extend_len
);
918 GNI_RC_CHECK("allgather", rc
);
/* Scatter the payloads to their rank-indexed positions. */
922 for(i
=0;i
<mysize
;i
++) {
924 memcpy(&rank_index
, &(out_ptr
[extend_len
*i
]), sizeof(int));
925 //copy to the rank index slot
926 memcpy(&out_ref
[rank_index
*len
], &out_ptr
[extend_len
*i
+sizeof(int)], len
);
/*
 * get_gni_nic_address: determine the NIC address for `device_id`.
 *
 * Two sources are visible: GNI_CdmGetNicAddress(), and the launcher
 * environment.  In the environment case, PMI_GNI_DEV_ID is scanned as a
 * ':'-separated list for device_id, then the token at the matching position
 * of PMI_GNI_LOC_ADDR is taken as the address.
 * NOTE(review): presumably GNI_CdmGetNicAddress is used only when
 * PMI_GNI_DEV_ID is unset, and the scans reset p_ptr to NULL and advance i
 * between strtok calls -- confirm.  strtok modifies the string returned by
 * getenv in place.
 */
934 static unsigned int get_gni_nic_address(int device_id
)
936 unsigned int address
, cpu_id
;
938 int i
, alps_dev_id
=-1,alps_address
=-1;
941 p_ptr
= getenv("PMI_GNI_DEV_ID");
/* Ask the GNI library for the address of this device. */
943 status
= GNI_CdmGetNicAddress(device_id
, &address
, &cpu_id
);
945 GNI_RC_CHECK("GNI_CdmGetNicAddress", status
);
/* Look for device_id in the ':'-separated device list. */
947 while ((token
= strtok(p_ptr
,":")) != NULL
) {
948 alps_dev_id
= atoi(token
);
949 if (alps_dev_id
== device_id
) {
954 CmiAssert(alps_dev_id
!= -1);
/* Pick the local address listed at the matching position. */
955 p_ptr
= getenv("PMI_GNI_LOC_ADDR");
956 CmiAssert(p_ptr
!= NULL
);
958 while ((token
= strtok(p_ptr
,":")) != NULL
) {
959 if (i
== alps_dev_id
) {
960 alps_address
= atoi(token
);
966 CmiAssert(alps_address
!= -1);
967 address
= alps_address
;
/*
 * get_ptag: read the protection tag from the PMI_GNI_PTAG environment
 * variable.  The variable must be set (asserted); the first ':'-separated
 * token is converted with atoi and narrowed to uint8_t.
 * NOTE(review): strtok modifies the environment string in place.
 */
972 static uint8_t get_ptag(void)
977 p_ptr
= getenv("PMI_GNI_PTAG");
978 CmiAssert(p_ptr
!= NULL
);
979 token
= strtok(p_ptr
, ":");
980 ptag
= (uint8_t)atoi(token
);
/*
 * get_cookie: read the cookie from the PMI_GNI_COOKIE environment variable.
 * The variable must be set (asserted); the first ':'-separated token is
 * converted with atoi and cast to uint32_t.
 * NOTE(review): atoi yields an int, so a cookie above INT_MAX would not
 * convert reliably -- confirm the expected value range.
 */
985 static uint32_t get_cookie(void)
990 p_ptr
= getenv("PMI_GNI_COOKIE");
991 CmiAssert(p_ptr
!= NULL
);
992 token
= strtok(p_ptr
, ":");
993 cookie
= (uint32_t)atoi(token
);
1000 /* directly mmap memory from hugetlbfs for large pages */
1002 #include <sys/stat.h>
1004 #include <sys/mman.h>
1005 #include <hugetlbfs.h>
1007 // size must be _tlbpagesize aligned
/*
 * my_get_huge_pages: map `size` bytes backed by a file on hugetlbfs.
 *
 * A file named charm_mempool.<pid>.<rand> is created under the hugetlbfs
 * mount that serves pages of _tlbpagesize bytes, then mapped read/write
 * with MAP_PRIVATE.  A failed mmap is reported as a NULL pointer; the
 * CmiAbort below reports a failed open.
 * NOTE(review): presumably the function returns ptr and closes/unlinks the
 * file afterwards -- confirm.
 */
1008 void *my_get_huge_pages(size_t size
)
/* Permission bits for the backing file: rw for owner, read for group/other. */
1012 mode_t mode
= S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
;
1015 snprintf(filename
, sizeof(filename
), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize
), getpid(), rand());
1016 fd
= open(filename
, O_RDWR
| O_CREAT
, mode
);
1018 CmiAbort("my_get_huge_pages: open filed");
1020 ptr
= mmap(NULL
, size
, PROT_READ
| PROT_WRITE
, MAP_PRIVATE
, fd
, 0);
/* Normalize the mmap failure value to NULL. */
1021 if (ptr
== MAP_FAILED
) ptr
= NULL
;
1022 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
/*
 * my_free_huge_pages: unmap a region of `size` bytes starting at `ptr`.
 * Aborts if munmap fails.
 * NOTE(review): `size` is an int here while my_get_huge_pages takes a
 * size_t -- confirm that regions never exceed INT_MAX bytes.
 */
1028 void my_free_huge_pages(void *ptr
, int size
)
1030 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1031 int ret
= munmap(ptr
, size
);
1032 if (ret
== -1) CmiAbort("munmap failed in my_free_huge_pages");
1037 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1038 /* TODO: add any that are related */
1039 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1042 #include "machine-lrts.h"
1043 #include "machine-common-core.c"
1045 /* Network progress function is used to poll the network for
1046 messages. This flushes receive buffers on some implementations */
1047 #if CMK_MACHINE_PROGRESS_DEFINED
1048 void CmiMachineProgressImpl() {
1052 static int SendBufferMsg(SMSG_QUEUE
*queue
);
1053 static void SendRdmaMsg();
1054 static void PumpNetworkSmsg();
1055 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh
, CmiNodeLock cq_lock
);
1057 static void PumpCqWriteTransactions();
1060 static void PumpRemoteTransactions();
1063 #if MACHINE_DEBUG_LOG
1064 FILE *debugLog
= NULL
;
1065 static CmiInt8 buffered_recv_msg
= 0;
1066 int lrts_smsg_success
= 0;
1067 int lrts_received_msg
= 0;
/*
 * sweep_mempool: debugging aid that prints one line per block of a mempool.
 *
 * Starts at the embedded head block (mptr->block_head) and follows
 * block_next, which is an offset from the start of the pool (0 ends the
 * list).  For each block it prints the address, size, `1<<used`, the
 * in-send and in-recv message counts, and both qwords of the memory handle.
 */
1070 static void sweep_mempool(mempool_type
*mptr
)
1073 block_header
*current
= &(mptr
->block_head
);
1075 printf("[n %d %d] sweep_mempool slot START.\n", myrank
, n
++);
1076 while( current
!= NULL
) {
1077 printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank
, n
++, current
, current
->size
, 1<<current
->used
, current
->msgs_in_send
, current
->msgs_in_recv
, current
->mem_hndl
.qword1
, current
->mem_hndl
.qword2
);
/* block_next is a byte offset relative to mptr; 0 means no further block. */
1078 current
= current
->block_next
?(block_header
*)((char*)mptr
+current
->block_next
):NULL
;
1080 printf("[n %d] sweep_mempool slot END.\n", myrank
);
/*
 * deregisterMemory: deregister one idle, registered block of a mempool.
 *
 * Scans from *from, skipping blocks that still have messages in flight
 * (msgs_in_send + msgs_in_recv > 0) or carry a zero memory handle (treated
 * as not registered).  The first remaining block is deregistered and its
 * handle is zeroed.
 *
 * Returns GNI_RC_SUCCESS after deregistering a block, or
 * GNI_RC_ERROR_RESOURCE when the scan reaches the end of the list.
 * NOTE(review): presumably *from is advanced to the deregistered block so
 * that the caller's next scan resumes there -- confirm.
 */
1084 static gni_return_t
deregisterMemory(mempool_type
*mptr
, block_header
**from
)
1086 block_header
*current
= *from
;
1088 //while(register_memory_size>= MAX_REG_MEM)
/* Skip blocks that are busy or have no registration to drop. */
1090 while( current
!= NULL
&& ((current
->msgs_in_send
+current
->msgs_in_recv
)>0 || IsMemHndlZero(current
->mem_hndl
) ))
1091 current
= current
->block_next
?(block_header
*)((char*)mptr
+current
->block_next
):NULL
;
1094 if(current
== NULL
) return GNI_RC_ERROR_RESOURCE
;
1095 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &(GetMemHndlFromBlockHeader(current
)) , &omdh
, GetSizeFromBlockHeader(current
));
/* Zero the handle so the block is recognized as unregistered afterwards. */
1096 SetMemHndlZero(GetMemHndlFromBlockHeader(current
));
1098 return GNI_RC_SUCCESS
;
/*
 * registerFromMempool: register `size` bytes at `blockaddr` with the NIC,
 * evicting idle registrations from the pool `mptr` when needed.
 *
 * While register_memory_size is at or above MAX_REG_MEM, idle blocks of
 * mptr are deregistered.  If that cannot bring usage below the cap, the
 * last deregisterMemory status is returned.  Otherwise MEMORY_REGISTER is
 * attempted, and GNI_RC_INVALID_PARAM or GNI_RC_PERMISSION_ERROR abort.
 * NOTE(review): presumably the registration is retried in a loop, with
 * another block deregistered after each other failure -- confirm.
 * NOTE(review): the `¤t` argument in the two deregisterMemory calls looks
 * like a mis-encoded `&current` (address of the scan cursor declared
 * below) -- confirm against the upstream file.
 */
1102 static gni_return_t
registerFromMempool(mempool_type
*mptr
, void *blockaddr
, size_t size
, gni_mem_handle_t
*memhndl
, gni_cq_handle_t cqh
)
1104 gni_return_t status
= GNI_RC_SUCCESS
;
1105 //int size = GetMempoolsize(msg);
1106 //void *blockaddr = GetMempoolBlockPtr(msg);
1107 //gni_mem_handle_t *memhndl = &(GetMemHndl(msg));
/* Scan cursor over the pool's blocks, starting at the embedded head block. */
1109 block_header
*current
= &(mptr
->block_head
);
/* Free registrations until usage drops below the cap or nothing is idle. */
1110 while(register_memory_size
>= MAX_REG_MEM
)
1112 status
= deregisterMemory(mptr
, ¤t
);
1113 if (status
!= GNI_RC_SUCCESS
) break;
1115 if(register_memory_size
>= MAX_REG_MEM
) return status
;
1117 MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
1120 MEMORY_REGISTER(onesided_hnd
, nic_hndl
, blockaddr
, size
, memhndl
, &omdh
, cqh
, status
);
1121 if(status
== GNI_RC_SUCCESS
)
/* These two failures cannot be cured by freeing registrations: give up. */
1125 else if (status
== GNI_RC_INVALID_PARAM
|| status
== GNI_RC_PERMISSION_ERROR
)
1127 CmiAbort("Memory registor for mempool fails\n");
/* Any other failure: drop another idle registration. */
1131 status
= deregisterMemory(mptr
, ¤t
);
1132 if (status
!= GNI_RC_SUCCESS
) break;
/*
 * registerMemory: register `size` bytes at `msg`, borrowing registration
 * room from other mempools on the node if the caller's own pool cannot
 * provide it.
 *
 * First tries the calling PE's pool (CpvAccess(mempool)).  On failure it
 * walks the pools of all CmiMyNodeSize()+1 ranks in round-robin order,
 * skipping the caller's own pool.  The static `rank` keeps the walk
 * position across calls, so successive calls start at different pools.
 *
 * Returns GNI_RC_SUCCESS as soon as one attempt succeeds, otherwise
 * GNI_RC_ERROR_RESOURCE.
 * NOTE(review): the static `rank` is shared by all callers in the process
 * -- confirm it is only touched by one thread, or that races are benign.
 */
1139 static gni_return_t
registerMemory(void *msg
, size_t size
, gni_mem_handle_t
*t
, gni_cq_handle_t cqh
)
1141 static int rank
= -1;
1143 gni_return_t status
;
1144 mempool_type
*mptr1
= CpvAccess(mempool
);//mempool_type*)GetMempoolPtr(msg);
1145 //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
/* Attempt 1: the caller's own pool. */
1148 status
= registerFromMempool(mptr1
, msg
, size
, t
, cqh
);
1149 if (status
== GNI_RC_SUCCESS
) return status
;
/* Attempt 2: every other pool on the node, continuing from the last rank. */
1151 for (i
=0; i
<CmiMyNodeSize()+1; i
++) {
1152 rank
= (rank
+1)%(CmiMyNodeSize()+1);
1153 mptr
= CpvAccessOther(mempool
, rank
);
1154 if (mptr
== mptr1
) continue;
1155 status
= registerFromMempool(mptr
, msg
, size
, t
, cqh
);
1156 if (status
== GNI_RC_SUCCESS
) return status
;
1159 return GNI_RC_ERROR_RESOURCE
;
/*
 * buffer_small_msgs: queue a small message for `destNode` on `queue` so it
 * can be sent later, and bump buffered_smsg_counter.
 *
 * A MSG_LIST node is taken from MallocMsgList and filled with the
 * destination and size; SMSG_CREATION stamps its creation time when
 * statistics are on.  Four enqueue variants follow: a per-destination
 * linked list with head and tail, a single PCQueue (queue->sendMsgBuf), a
 * per-destination PCQueue guarded by a lock plus nonEmptyQueues, and a
 * per-destination PCQueue without a lock.
 * NOTE(review): presumably the variants are selected by preprocessor
 * conditionals (CMK_SMP / ONE_SEND_QUEUE / SMP_LOCKS) and msg_tmp->msg and
 * msg_tmp->tag are also set from `msg` and `tag` -- confirm.
 */
1163 static void buffer_small_msgs(SMSG_QUEUE
*queue
, void *msg
, int size
, int destNode
, uint8_t tag
)
1166 MallocMsgList(msg_tmp
);
1167 msg_tmp
->destNode
= destNode
;
1168 msg_tmp
->size
= size
;
1172 SMSG_CREATION(msg_tmp
)
/* Linked-list variant: an empty per-destination list is first linked into
   the chain of non-empty destinations headed by smsg_head_index. */
1175 if (queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
== 0 ) {
1176 queue
->smsg_msglist_index
[destNode
].next
= queue
->smsg_head_index
;
1177 queue
->smsg_head_index
= destNode
;
1178 queue
->smsg_msglist_index
[destNode
].tail
= queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
= msg_tmp
;
/* Otherwise append behind the current tail. */
1181 queue
->smsg_msglist_index
[destNode
].tail
->next
= msg_tmp
;
1182 queue
->smsg_msglist_index
[destNode
].tail
= msg_tmp
;
/* Single-queue variant. */
1186 PCQueuePush(queue
->sendMsgBuf
, (char*)msg_tmp
);
/* Locked per-destination variant: the destination is published on
   nonEmptyQueues when its `pushed` flag is still 0. */
1189 CmiLock(queue
->smsg_msglist_index
[destNode
].lock
);
1190 if(queue
->smsg_msglist_index
[destNode
].pushed
== 0)
1192 PCQueuePush(nonEmptyQueues
, (char*)&(queue
->smsg_msglist_index
[destNode
]));
1194 PCQueuePush(queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
, (char*)msg_tmp
);
1195 CmiUnlock(queue
->smsg_msglist_index
[destNode
].lock
);
/* Lock-free per-destination variant. */
1197 PCQueuePush(queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
, (char*)msg_tmp
);
1202 buffered_smsg_counter
++;
/*
 * print_smsg_attr: debugging aid that prints the message type, maximum
 * credit, buffer size, buffer address and mailbox offset of an SMSG
 * attribute block to stdout.
 */
1206 inline static void print_smsg_attr(gni_smsg_attr_t
*a
)
1208 printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a
->msg_type
, a
->mbox_maxcredit
, a
->buff_size
, a
->msg_buffer
, a
->mbox_offset
);
/*
 * setup_smsg_connection: create the local SMSG mailbox attributes for
 * `destNode` and push them to that node with an FMA put.
 *
 * Steps visible here:
 *  1. When all smsg_expand_slots slots of the current mailbox chunk are in
 *     use, a new 64-byte-aligned chunk of smsg_memlen * smsg_expand_slots
 *     bytes is allocated, zeroed, registered with the NIC and pushed onto
 *     smsg_dynamic_list.
 *  2. A gni_smsg_attr_t describing the next free slot of the newest chunk
 *     is filled in and stored in smsg_local_attr_vec[destNode].
 *  3. A post descriptor writes those attributes into entry `myrank` of the
 *     destination's connection table (smsg_connection_vec[destNode]).
 *  4. On GNI_RC_ERROR_RESOURCE an RDMA_REQUEST is prepared so the post can
 *     be buffered; other outcomes are only printed.
 * NOTE(review): presumably `pd` is obtained from MallocPostDesc before it
 * is filled in, and the malloc/memalign results are checked -- confirm.
 */
1212 static void setup_smsg_connection(int destNode
)
1214 mdh_addr_list_t
*new_entry
= 0;
1215 gni_post_descriptor_t
*pd
;
1216 gni_smsg_attr_t
*smsg_attr
;
1217 gni_return_t status
= GNI_RC_NOT_DONE
;
1218 RDMA_REQUEST
*rdma_request_msg
;
/* Step 1: the current chunk is full -- add and register a fresh one. */
1220 if(smsg_available_slot
== smsg_expand_slots
)
1222 new_entry
= (mdh_addr_list_t
*)malloc(sizeof(mdh_addr_list_t
));
1223 new_entry
->addr
= memalign(64, smsg_memlen
*smsg_expand_slots
);
1224 bzero(new_entry
->addr
, smsg_memlen
*smsg_expand_slots
);
1226 status
= GNI_MemRegister(nic_hndl
, (uint64_t)new_entry
->addr
,
1227 smsg_memlen
*smsg_expand_slots
, smsg_rx_cqh
,
/* The new chunk becomes the head of the list; slot numbering restarts. */
1231 smsg_available_slot
= 0;
1232 new_entry
->next
= smsg_dynamic_list
;
1233 smsg_dynamic_list
= new_entry
;
/* Step 2: describe the next free mailbox slot of the newest chunk. */
1235 smsg_attr
= (gni_smsg_attr_t
*) malloc (sizeof(gni_smsg_attr_t
));
1236 smsg_attr
->msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
1237 smsg_attr
->mbox_maxcredit
= SMSG_MAX_CREDIT
;
1238 smsg_attr
->msg_maxsize
= SMSG_MAX_MSG
;
1239 smsg_attr
->mbox_offset
= smsg_available_slot
* smsg_memlen
;
1240 smsg_attr
->buff_size
= smsg_memlen
;
1241 smsg_attr
->msg_buffer
= smsg_dynamic_list
->addr
;
1242 smsg_attr
->mem_hndl
= smsg_dynamic_list
->mdh
;
1243 smsg_local_attr_vec
[destNode
] = smsg_attr
;
1244 smsg_available_slot
++;
/* Step 3: FMA put of the attribute block into the peer's connection table. */
1246 pd
->type
= GNI_POST_FMA_PUT
;
1247 //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT;
1248 pd
->cq_mode
= GNI_CQMODE_GLOBAL_EVENT
;
1249 pd
->dlvr_mode
= GNI_DLVMODE_PERFORMANCE
;
1250 pd
->length
= sizeof(gni_smsg_attr_t
);
1251 pd
->local_addr
= (uint64_t) smsg_attr
;
/* Remote target: entry `myrank` of the destination's attribute array. */
1252 pd
->remote_addr
= (uint64_t)&((((gni_smsg_attr_t
*)(smsg_connection_vec
[destNode
].addr
))[myrank
]));
1253 pd
->remote_mem_hndl
= smsg_connection_vec
[destNode
].mdh
;
1254 pd
->src_cq_hndl
= rdma_tx_cqh
;
1256 status
= GNI_PostFma(ep_hndl_array
[destNode
], pd
);
1257 print_smsg_attr(smsg_attr
);
/* Step 4: out of resources -- keep the request for a later retry. */
1258 if(status
== GNI_RC_ERROR_RESOURCE
)
1260 MallocRdmaRequest(rdma_request_msg
);
1261 rdma_request_msg
->destNode
= destNode
;
1262 rdma_request_msg
->pd
= pd
;
1263 /* buffer this request */
1266 if(status
!= GNI_RC_SUCCESS
)
1267 printf("[%d=%d] send post FMA %s\n", myrank
, destNode
, gni_err_str
[status
]);
1269 printf("[%d=%d]OK send post FMA \n", myrank
, destNode
);
1273 /* useDynamicSMSG */
/*
 * Fill *local_smsg_attr with a fresh mailbox slot taken from the head of
 * mailbox_list.
 *
 * local_smsg_attr - attribute struct to initialise (written, not allocated,
 *                   by this function).
 *
 * If the head mailbox is full (offset == size) a new dynamic_smsg_mailbox_t
 * of smsg_memlen*avg_smsg_connection bytes is malloc'ed, zeroed, registered
 * with GNI_MemRegister (result checked by GNI_RC_CHECK) and pushed onto the
 * front of mailbox_list.  The slot handed out starts at the head's current
 * offset, which is then advanced by smsg_memlen.
 *
 * NOTE(review): the malloc results are used without a visible NULL check.
 */
1275 static void alloc_smsg_attr( gni_smsg_attr_t
*local_smsg_attr
)
1277 gni_return_t status
= GNI_RC_NOT_DONE
;
/* Head mailbox has no room left: create and register a new one. */
1279 if(mailbox_list
->offset
== mailbox_list
->size
)
1281 dynamic_smsg_mailbox_t
*new_mailbox_entry
;
1282 new_mailbox_entry
= (dynamic_smsg_mailbox_t
*)malloc(sizeof(dynamic_smsg_mailbox_t
));
1283 new_mailbox_entry
->size
= smsg_memlen
*avg_smsg_connection
;
1284 new_mailbox_entry
->mailbox_base
= malloc(new_mailbox_entry
->size
);
1285 bzero(new_mailbox_entry
->mailbox_base
, new_mailbox_entry
->size
);
1286 new_mailbox_entry
->offset
= 0;
1288 status
= GNI_MemRegister(nic_hndl
, (uint64_t)new_mailbox_entry
->mailbox_base
,
1289 new_mailbox_entry
->size
, smsg_rx_cqh
,
1292 &(new_mailbox_entry
->mem_hndl
));
1294 GNI_RC_CHECK("register", status
);
1295 new_mailbox_entry
->next
= mailbox_list
;
1296 mailbox_list
= new_mailbox_entry
;
/* Hand out the next smsg_memlen-sized slot of the head mailbox. */
1298 local_smsg_attr
->msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
1299 local_smsg_attr
->mbox_maxcredit
= SMSG_MAX_CREDIT
;
1300 local_smsg_attr
->msg_maxsize
= SMSG_MAX_MSG
;
1301 local_smsg_attr
->mbox_offset
= mailbox_list
->offset
;
1302 mailbox_list
->offset
+= smsg_memlen
;
1303 local_smsg_attr
->buff_size
= smsg_memlen
;
1304 local_smsg_attr
->msg_buffer
= mailbox_list
->mailbox_base
;
1305 local_smsg_attr
->mem_hndl
= mailbox_list
->mem_hndl
;
1308 /* useDynamicSMSG */
/*
 * Start a dynamic SMSG connection to destNode by posting a datagram that
 * carries this node's mailbox attributes.
 *
 * destNode - rank of the peer to connect to; must currently be unconnected
 *            (smsg_connected_flag[destNode] == 0 and no local attribute
 *            struct, both asserted).
 *
 * The local attribute struct is malloc'ed and filled by alloc_smsg_attr(); a
 * second struct is malloc'ed to receive the peer's attributes.  The exchange
 * is posted with GNI_EpPostDataWId under global_gni_lock, using datagram id
 * destNode+mysize.  On GNI_RC_ERROR_RESOURCE both structs are freed, their
 * vector entries reset to NULL and the mailbox slot is returned by rewinding
 * mailbox_list->offset.  Otherwise the status is checked with GNI_RC_CHECK
 * and the connection is marked pending (flag = 1).
 *
 * The function is declared to return int; the return statements fall on
 * lines that are not visible in this listing.
 */
1310 static int connect_to(int destNode
)
1312 gni_return_t status
= GNI_RC_NOT_DONE
;
1313 CmiAssert(smsg_connected_flag
[destNode
] == 0);
1314 CmiAssert (smsg_attr_vector_local
[destNode
] == NULL
);
1315 smsg_attr_vector_local
[destNode
] = (gni_smsg_attr_t
*) malloc (sizeof(gni_smsg_attr_t
));
1316 alloc_smsg_attr(smsg_attr_vector_local
[destNode
]);
1317 smsg_attr_vector_remote
[destNode
] = (gni_smsg_attr_t
*) malloc (sizeof(gni_smsg_attr_t
));
1319 CMI_GNI_LOCK(global_gni_lock
)
1320 status
= GNI_EpPostDataWId (ep_hndl_array
[destNode
], smsg_attr_vector_local
[destNode
], sizeof(gni_smsg_attr_t
),smsg_attr_vector_remote
[destNode
] ,sizeof(gni_smsg_attr_t
), destNode
+mysize
);
1321 CMI_GNI_UNLOCK(global_gni_lock
)
/* Post refused: undo the allocations and give the mailbox slot back. */
1322 if (status
== GNI_RC_ERROR_RESOURCE
) {
1323 /* possibly destNode is making connection at the same time */
1324 free(smsg_attr_vector_local
[destNode
]);
1325 smsg_attr_vector_local
[destNode
] = NULL
;
1326 free(smsg_attr_vector_remote
[destNode
]);
1327 smsg_attr_vector_remote
[destNode
] = NULL
;
1328 mailbox_list
->offset
-= smsg_memlen
;
1331 GNI_RC_CHECK("GNI_Post", status
);
1332 smsg_connected_flag
[destNode
] = 1;
/*
 * Try to send one small message to destNode with GNI_SmsgSendWTag, falling
 * back to the per-destination buffer queue when it cannot be sent now.
 *
 * queue    - SMSG queue whose per-destination list is consulted / appended to.
 * destNode - destination rank.
 * msg,size - payload and its length in bytes.
 * tag      - SMSG tag sent with the message (SMALL_DATA_TAG, LMSG_INIT_TAG, ...).
 * inbuff   - when 1 the send is attempted even if the destination already has
 *            buffered messages, and a failed send is not re-buffered here.
 * ptr      - MSG_LIST entry whose creation_time is read in one of the
 *            timing branches (the selecting condition is not visible).
 *
 * With useDynamicSMSG the connection state in smsg_connected_flag[destNode]
 * is examined first: an unconnected peer triggers connect_to(), and a pending
 * connection yields GNI_RC_NOT_DONE with the message passed to
 * buffer_small_msgs (any guard on that call is not visible here).
 *
 * The send itself runs under default_tx_cq_lock.  For an LMSG_INIT_TAG
 * control message with seq_id == 0 and ack_index == -1 an ack slot is taken
 * from ackPool beforehand.  If the final status is not GNI_RC_SUCCESS and
 * inbuff == 0 the message is queued with buffer_small_msgs.
 *
 * The return statement is on a line not visible in this listing.
 */
1337 static gni_return_t
send_smsg_message(SMSG_QUEUE
*queue
, int destNode
, void *msg
, int size
, uint8_t tag
, int inbuff
, MSG_LIST
*ptr
)
1339 unsigned int remote_address
;
1341 gni_return_t status
= GNI_RC_ERROR_RESOURCE
;
1342 gni_smsg_attr_t
*smsg_attr
;
1343 gni_post_descriptor_t
*pd
;
1344 gni_post_state_t post_state
;
/* Dynamic SMSG: make sure a connection exists or is being set up. */
1347 if (useDynamicSMSG
) {
1348 switch (smsg_connected_flag
[destNode
]) {
1350 connect_to(destNode
); /* continue to case 1 */
1351 case 1: /* pending connection, do nothing */
1352 status
= GNI_RC_NOT_DONE
;
1354 buffer_small_msgs(queue
, msg
, size
, destNode
, tag
);
/* Only send directly when nothing is already queued for destNode, or when
 * the caller is draining the queue (inbuff == 1). */
1359 #if ! ONE_SEND_QUEUE
1360 if(PCQueueEmpty(queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
) || inbuff
==1)
1364 if(queue
->smsg_msglist_index
[destNode
].sendSmsgBuf
== 0 || inbuff
==1)
1367 //CMI_GNI_LOCK(smsg_mailbox_lock)
1368 CMI_GNI_LOCK(default_tx_cq_lock
)
1369 #if CMK_SMP_TRACE_COMMTHREAD
1371 int oldeventid
= -1;
1372 if(tag
== SMALL_DATA_TAG
|| tag
== LMSG_INIT_TAG
)
1375 if ( tag
== SMALL_DATA_TAG
)
1376 real_data
= (char*)msg
;
1378 real_data
= (char*)(((CONTROL_MSG
*)msg
)->source_addr
);
1379 TRACE_COMM_GET_MSGID(real_data
, &oldpe
, &oldeventid
);
1380 TRACE_COMM_SET_COMM_MSGID(real_data
);
/* First control message of a large send: reserve an ack slot for it. */
1384 if (tag
== LMSG_INIT_TAG
) {
1385 CONTROL_MSG
*control_msg_tmp
= (CONTROL_MSG
*)msg
;
1386 if (control_msg_tmp
->seq_id
== 0 && control_msg_tmp
->ack_index
== -1)
1387 control_msg_tmp
->ack_index
= IndexPool_getslot(&ackPool
, (void*)control_msg_tmp
->source_addr
, 1);
1389 // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1395 double creation_time
;
1397 creation_time
= CmiWallTimer();
1399 creation_time
= ptr
->creation_time
;
1402 status
= GNI_SmsgSendWTag(ep_hndl_array
[destNode
], NULL
, 0, msg
, size
, 0, tag
);
1403 #if CMK_SMP_TRACE_COMMTHREAD
1404 if (oldpe
!= -1) TRACE_COMM_SET_MSGID(real_data
, oldpe
, oldeventid
);
1406 CMI_GNI_UNLOCK(default_tx_cq_lock
)
1407 //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1408 if(status
== GNI_RC_SUCCESS
)
1411 SMSG_SENT_DONE(creation_time
,tag
)
1413 #if CMK_SMP_TRACE_COMMTHREAD
1414 if(tag
== SMALL_DATA_TAG
|| tag
== LMSG_INIT_TAG
)
1416 TRACE_COMM_CREATION(CpvAccess(projTraceStart
), real_data
);
1420 status
= GNI_RC_ERROR_RESOURCE
;
/* Could not send now and the message is not already queued: queue it. */
1422 if(status
!= GNI_RC_SUCCESS
&& inbuff
==0)
1423 buffer_small_msgs(queue
, msg
, size
, destNode
, tag
);
/*
 * Allocate (MallocControlMsg) and fill the CONTROL_MSG that announces a large
 * message to the receiver.
 *
 * size  - payload length in bytes.
 * msg   - start of the payload; stored as source_addr.
 * seqno - sequence id stored in seq_id (callers in this file pass 0 for a
 *         single-transfer message and >= 1 for a segment of a BIG_MSG).
 *
 * total_length and length are both initialised to ALIGN64(size) and
 * ack_index to -1.  Further down the memory handle is either copied from
 * GetMemHndl(msg) or zeroed, and for a segment length is recomputed as
 * size - (seqno-1)*ONE_SEG, capped at ONE_SEG; the conditions selecting
 * between these assignments are on lines not visible in this listing.
 *
 * Returns the newly allocated control message.
 */
1428 static CONTROL_MSG
* construct_control_msg(int size
, char *msg
, uint8_t seqno
)
1430 /* construct a control message and send */
1431 CONTROL_MSG
*control_msg_tmp
;
1432 MallocControlMsg(control_msg_tmp
);
1433 control_msg_tmp
->source_addr
= (uint64_t)msg
;
1434 control_msg_tmp
->seq_id
= seqno
;
1435 control_msg_tmp
->total_length
= control_msg_tmp
->length
= ALIGN64(size
); //for GET 4 bytes aligned
1437 control_msg_tmp
->ack_index
= -1;
1439 #if USE_LRTS_MEMPOOL
1442 control_msg_tmp
->source_mem_hndl
= GetMemHndl(msg
);
1446 SetMemHndlZero(control_msg_tmp
->source_mem_hndl
);
/* Segment length: what is left after the preceding segments, at most ONE_SEG. */
1447 control_msg_tmp
->length
= size
- (seqno
-1)*ONE_SEG
;
1448 if (control_msg_tmp
->length
> ONE_SEG
) control_msg_tmp
->length
= ONE_SEG
;
1451 SetMemHndlZero(control_msg_tmp
->source_mem_hndl
);
1453 return control_msg_tmp
;
1456 #define BLOCKING_SEND_CONTROL 0
1458 // Large message, send control to receiver, receiver register memory and do a GET,
1459 // return 1 - send no success
/*
 * Register the payload described by control_msg_tmp (if needed) and send the
 * control message to destNode so the receiver can fetch the data.
 *
 * queue           - SMSG queue used for sending / buffering the control message.
 * destNode        - destination rank.
 * control_msg_tmp - control message built by construct_control_msg(); it is
 *                   released with FreeControlMsg once sent, or handed to
 *                   buffer_small_msgs when it has to wait.
 * inbuff          - forwarded to send_smsg_message (1 when the control
 *                   message comes from the buffer queue).
 * smsg_ptr        - forwarded to send_smsg_message.
 *
 * USE_LRTS_MEMPOOL build:
 *  - seq_id == 0: the payload lives in a mempool block.  If that block has no
 *    memory handle yet it is registered with registerMemory(), unless
 *    buffered_send_msg + GetMempoolsize(msg) would reach MAX_BUFF_SEND, in
 *    which case the control message is queued and GNI_RC_ERROR_NOMEM returned.
 *  - seq_id > 0 (segment of a BIG_MSG): source_addr is advanced by
 *    ONE_SEG*(seq_id-1) and the segment itself is registered, subject to the
 *    same MAX_BUFF_SEND limit.
 *  After a successful registration the control message is sent with
 *  send_smsg_message(); on success buffered_send_msg grows by register_size
 *  and, for seq_id == 0, IncreaseMsgInSend() is applied to the payload.
 *  GNI_RC_INVALID_PARAM / GNI_RC_PERMISSION_ERROR from registration aborts;
 *  other failures queue the control message for a later retry.
 *
 * Non-mempool build: the payload is registered with MEMORY_REGISTER and the
 * same send / abort / queue handling follows.
 *
 * Several branch and brace lines, and the final return, are not visible in
 * this listing; the description above follows the visible statements only.
 */
1460 inline static gni_return_t
send_large_messages(SMSG_QUEUE
*queue
, int destNode
, CONTROL_MSG
*control_msg_tmp
, int inbuff
, MSG_LIST
*smsg_ptr
)
1462 gni_return_t status
= GNI_RC_ERROR_NOMEM
;
1463 uint32_t vmdh_index
= -1;
1466 uint64_t source_addr
;
1470 size
= control_msg_tmp
->total_length
;
1471 source_addr
= control_msg_tmp
->source_addr
;
1472 register_size
= control_msg_tmp
->length
;
1474 #if USE_LRTS_MEMPOOL
/* Whole message in one transfer: payload sits inside a mempool block. */
1475 if( control_msg_tmp
->seq_id
== 0 ){
1476 #if BLOCKING_SEND_CONTROL
1477 if (inbuff
== 0 && IsMemHndlZero(GetMemHndl(source_addr
))) {
1478 while (IsMemHndlZero(GetMemHndl(source_addr
)) && buffered_send_msg
+ GetMempoolsize((void*)source_addr
) >= MAX_BUFF_SEND
)
1479 LrtsAdvanceCommunication(0);
1482 if(IsMemHndlZero(GetMemHndl(source_addr
))) //it is in mempool, it is possible to be de-registered by others
1484 msg
= (void*)source_addr
;
/* Registering this block would exceed the send budget: defer the send. */
1485 if(buffered_send_msg
+ GetMempoolsize(msg
) >= MAX_BUFF_SEND
)
1488 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, LMSG_INIT_TAG
);
1489 return GNI_RC_ERROR_NOMEM
;
1491 //register the corresponding mempool
1492 status
= registerMemory(GetMempoolBlockPtr(msg
), GetMempoolsize(msg
), &(GetMemHndl(msg
)), rdma_rx_cqh
);
1493 if(status
== GNI_RC_SUCCESS
)
1495 control_msg_tmp
->source_mem_hndl
= GetMemHndl(source_addr
);
1499 control_msg_tmp
->source_mem_hndl
= GetMemHndl(source_addr
);
1500 status
= GNI_RC_SUCCESS
;
1502 if(NoMsgInSend(source_addr
))
1503 register_size
= GetMempoolsize((void*)(source_addr
));
/* Segment of a BIG_MSG: register just this ONE_SEG-sized piece. */
1506 }else if(control_msg_tmp
->seq_id
>0) // BIG_MSG
1508 int offset
= ONE_SEG
*(control_msg_tmp
->seq_id
-1);
1509 source_addr
+= offset
;
1510 size
= control_msg_tmp
->length
;
1511 #if BLOCKING_SEND_CONTROL
1512 if (inbuff
== 0 && IsMemHndlZero(control_msg_tmp
->source_mem_hndl
)) {
1513 while (IsMemHndlZero(control_msg_tmp
->source_mem_hndl
) && buffered_send_msg
+ size
>= MAX_BUFF_SEND
)
1514 LrtsAdvanceCommunication(0);
1517 if (IsMemHndlZero(control_msg_tmp
->source_mem_hndl
)) {
1518 if(buffered_send_msg
+ size
>= MAX_BUFF_SEND
)
1521 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, LMSG_INIT_TAG
);
1522 return GNI_RC_ERROR_NOMEM
;
1524 status
= registerMemory((void*)source_addr
, ALIGN64(size
), &(control_msg_tmp
->source_mem_hndl
), NULL
);
1525 if(status
== GNI_RC_SUCCESS
) buffered_send_msg
+= ALIGN64(size
);
1529 status
= GNI_RC_SUCCESS
;
/* Payload is registered: ship the control message. */
1534 if(status
== GNI_RC_SUCCESS
)
1536 status
= send_smsg_message( queue
, destNode
, control_msg_tmp
, CONTROL_MSG_SIZE
, LMSG_INIT_TAG
, inbuff
, smsg_ptr
);
1537 if(status
== GNI_RC_SUCCESS
)
1539 buffered_send_msg
+= register_size
;
1540 if(control_msg_tmp
->seq_id
== 0)
1542 IncreaseMsgInSend(source_addr
);
1544 FreeControlMsg(control_msg_tmp
);
1545 MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
, LMSG_INIT_TAG
);
1547 status
= GNI_RC_ERROR_RESOURCE
;
1549 } else if (status
== GNI_RC_INVALID_PARAM
|| status
== GNI_RC_PERMISSION_ERROR
)
1551 CmiAbort("Memory registor for large msg\n");
1554 status
= GNI_RC_ERROR_NOMEM
;
1556 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, LMSG_INIT_TAG
);
/* Non-mempool build: register the payload directly, then send. */
1560 MEMORY_REGISTER(onesided_hnd
, nic_hndl
,msg
, ALIGN64(size
), &(control_msg_tmp
->source_mem_hndl
), &omdh
, NULL
, status
)
1561 if(status
== GNI_RC_SUCCESS
)
1563 status
= send_smsg_message(queue
, destNode
, control_msg_tmp
, CONTROL_MSG_SIZE
, LMSG_INIT_TAG
, 0, NULL
);
1564 if(status
== GNI_RC_SUCCESS
)
1566 FreeControlMsg(control_msg_tmp
);
1568 } else if (status
== GNI_RC_INVALID_PARAM
|| status
== GNI_RC_PERMISSION_ERROR
)
1570 CmiAbort("Memory registor for large msg\n");
1573 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, LMSG_INIT_TAG
);
/*
 * Stamp the outgoing message before it is sent: records size in the message
 * via CmiSetMsgSize and then applies CMI_SET_CHECKSUM to it.
 *
 * msg  - start of the message.
 * size - total message length in bytes.
 */
1579 inline void LrtsPrepareEnvelope(char *msg
, int size
)
1581 CmiSetMsgSize(msg
, size
);
1582 CMI_SET_CHECKSUM(msg
, size
);
/*
 * Machine-layer entry point for sending msg (size bytes) to destNode.
 *
 * destNode - destination rank.
 * size     - message length in bytes.
 * msg      - message buffer; its envelope is stamped by LrtsPrepareEnvelope.
 * mode     - send flags; the OUT_OF_BAND bit is extracted into oob and, in
 *            one build variant, selects smsg_oob_queue instead of smsg_queue.
 *
 * The message is routed by size:
 *   size <= SMSG_MAX_MSG  - sent as a small message (SMALL_DATA_TAG);
 *   size <  BIG_MSG       - one control message with seq 0 (LMSG_INIT_TAG);
 *   otherwise             - CmiSetMsgSeq(msg, 0) and a control message with
 *                           seq 1 for the first segment (in the non-SMP
 *                           variant only when USE_LRTS_MEMPOOL is set, else
 *                           seq 0).
 *
 * Two variants are compiled: the first only queues via buffer_small_msgs;
 * the one after "#else //non-smp, smp(worker sending)" sends immediately
 * through send_smsg_message / send_large_messages.  The opening #if, the
 * return statement and the handling of a successful small send are on lines
 * not visible in this listing.
 */
1585 CmiCommHandle
LrtsSendFunc(int destNode
, int size
, char *msg
, int mode
)
1587 gni_return_t status
= GNI_RC_SUCCESS
;
1589 CONTROL_MSG
*control_msg_tmp
;
1590 int oob
= ( mode
& OUT_OF_BAND
);
1593 MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode
, size
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
1595 queue
= oob
? &smsg_oob_queue
: &smsg_queue
;
1597 queue
= &smsg_queue
;
1600 LrtsPrepareEnvelope(msg
, size
);
1603 printf("LrtsSendFn %d==>%d, size=%d\n", myrank
, destNode
, size
);
/* Queue-only variant: everything is handed to buffer_small_msgs. */
1606 if(size
<= SMSG_MAX_MSG
)
1607 buffer_small_msgs(queue
, msg
, size
, destNode
, SMALL_DATA_TAG
);
1608 else if (size
< BIG_MSG
) {
1609 control_msg_tmp
= construct_control_msg(size
, msg
, 0);
1610 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, LMSG_INIT_TAG
);
1613 CmiSetMsgSeq(msg
, 0);
1614 control_msg_tmp
= construct_control_msg(size
, msg
, 1);
1615 buffer_small_msgs(queue
, control_msg_tmp
, CONTROL_MSG_SIZE
, destNode
, LMSG_INIT_TAG
);
/* Direct-send variant. */
1617 #else //non-smp, smp(worker sending)
1618 if(size
<= SMSG_MAX_MSG
)
1620 if (GNI_RC_SUCCESS
== send_smsg_message(queue
, destNode
, msg
, size
, SMALL_DATA_TAG
, 0, NULL
))
1623 else if (size
< BIG_MSG
) {
1624 control_msg_tmp
= construct_control_msg(size
, msg
, 0);
1625 send_large_messages(queue
, destNode
, control_msg_tmp
, 0, NULL
);
1628 #if USE_LRTS_MEMPOOL
1629 CmiSetMsgSeq(msg
, 0);
1630 control_msg_tmp
= construct_control_msg(size
, msg
, 1);
1631 send_large_messages(queue
, destNode
, control_msg_tmp
, 0, NULL
);
1633 control_msg_tmp
= construct_control_msg(size
, msg
, 0);
1634 send_large_messages(queue
, destNode
, control_msg_tmp
, 0, NULL
);
1641 static void PumpDatagramConnection();
/*
 * Default user-event ids for tracing the machine layer.
 * registerUserTraceEvents() overwrites them with the ids returned by
 * traceRegisterUserEvent() when user-event tracing is compiled in; otherwise
 * these literals are what the trace calls see, so every event needs its own
 * value.
 */
static int event_SetupConnect = 111;
static int event_PumpSmsg = 222;
static int event_PumpTransaction = 333;
static int event_PumpRdmaTransaction = 444;
/* Previously 444, which made this event indistinguishable from
 * event_PumpRdmaTransaction; 555 and 666 are already taken below. */
static int event_SendBufferSmsg = 777;
static int event_SendFmaRdmaMsg = 555;
static int event_AdvanceCommunication = 666;
/*
 * Register the machine layer's user trace events and store the ids returned
 * by traceRegisterUserEvent() in the event_* variables.  The registrations
 * are compiled only when CMI_MPI_TRACE_USEREVENTS and CMK_TRACE_ENABLED are
 * set and CMK_TRACE_IN_CHARM is not; otherwise the function body is empty
 * and the event_* variables keep their static defaults.
 */
1650 static void registerUserTraceEvents() {
1651 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1652 event_SetupConnect
= traceRegisterUserEvent("setting up connections", -1 );
1653 event_PumpSmsg
= traceRegisterUserEvent("Pump network small msgs", -1);
1654 event_PumpTransaction
= traceRegisterUserEvent("Pump FMA local transaction" , -1);
1655 event_PumpRdmaTransaction
= traceRegisterUserEvent("Pump RDMA local transaction" , -1);
1656 event_SendBufferSmsg
= traceRegisterUserEvent("Sending buffered small msgs", -1);
1657 event_SendFmaRdmaMsg
= traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1658 event_AdvanceCommunication
= traceRegisterUserEvent("Worker thread in sending/receiving", -1);
/*
 * Collective deadlock check.  Each process contributes its own
 * smsg_send_count + smsg_recv_count through PMI_Allgather; the gathered
 * values are added into sum and compared with the total remembered in last.
 * The trailing comment and messages show that when the stall is detected a
 * second time the run is aborted with "Fatal> Deadlock detected." after
 * printing the current MAX_REG_MEM / MAX_BUFF_SEND limits.
 *
 * The gather buffer ptr is allocated once (mysize entries) and kept in a
 * static for later calls.
 *
 * NOTE(review): sum is static and no reset is visible before the
 * accumulation loop - confirm it is cleared on a line omitted from this
 * listing.  The PMI_Allgather result is stored in a gni_return_t and checked
 * with GNI_RC_CHECK; confirm the two status encodings agree on success.
 * The updates of last/count and the surrounding branch structure are not
 * visible here.
 */
1662 static void ProcessDeadlock()
1664 static CmiUInt8
*ptr
= NULL
;
1665 static CmiUInt8 last
= 0, mysum
, sum
;
1666 static int count
= 0;
1667 gni_return_t status
;
1670 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1671 //sweep_mempool(CpvAccess(mempool));
1672 if (ptr
== NULL
) ptr
= (CmiUInt8
*)malloc(mysize
* sizeof(CmiUInt8
));
1673 mysum
= smsg_send_count
+ smsg_recv_count
;
1674 MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d) (%d,%d)(%d)\n", buffered_send_msg
, register_memory_size
, last
, sum
, count
);
1675 status
= PMI_Allgather(&mysum
,ptr
,sizeof(CmiUInt8
));
1676 GNI_RC_CHECK("PMI_Allgather", status
);
/* Total message activity across all processes. */
1678 for (i
=0; i
<mysize
; i
++) sum
+= ptr
[i
];
1679 if (last
== 0 || sum
== last
)
1684 MACHSTATE5(9,"Progress Deadlock (%d,%d) (%d,%d)(%d)\n", buffered_send_msg
, register_memory_size
, last
, sum
, count
);
1686 /* detected twice, it is a real deadlock */
1688 CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low. Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM
, MAX_BUFF_SEND
);
1689 CmiAbort("Fatal> Deadlock detected.");
/*
 * Periodic progress check.  Compares the SMSG send/receive counters with the
 * values saved in last_smsg_send_count / last_smsg_recv_count, calls
 * ProcessDeadlock() when _detected_hang is set, and stores the current
 * counters as the new snapshot.
 *
 * How _detected_hang is updated, and whether the snapshot assignments run on
 * every call or only on one branch, depends on lines not visible in this
 * listing.
 */
1696 static void CheckProgress()
1698 if (smsg_send_count
== last_smsg_send_count
&&
1699 smsg_recv_count
== last_smsg_recv_count
)
1703 if (_detected_hang
) ProcessDeadlock();
1708 //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count);
1709 last_smsg_send_count
= smsg_send_count
;
1710 last_smsg_recv_count
= smsg_recv_count
;
/*
 * Derive the per-process registration limits from the node topology.
 * Executed only on rank 0 of each process (CmiMyRank() == 0):
 *   numprocesses  = PEs on this physical node / CmiMyNodeSize()
 *   MAX_REG_MEM   = _totalmem / numprocesses
 *   MAX_BUFF_SEND = MAX_REG_MEM / 2
 * PE 0 then aborts with "memory registration" if the SMSG mailboxes
 * (smsg_memlen*mysize) plus either _expand_mem or _mempool_size exceed
 * MAX_BUFF_SEND.
 *
 * NOTE(review): any guard on the "mem_max" printf is not visible in this
 * listing.
 */
1715 static void set_limit()
1717 //if (!user_set_flag && CmiMyRank() == 0) {
1718 if (CmiMyRank() == 0) {
1719 int mynode
= CmiPhysicalNodeID(CmiMyPe());
1720 int numpes
= CmiNumPesOnPhysicalNode(mynode
);
1721 int numprocesses
= numpes
/ CmiMyNodeSize();
1722 MAX_REG_MEM
= _totalmem
/ numprocesses
;
1723 MAX_BUFF_SEND
= MAX_REG_MEM
/ 2;
1725 printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM
/1024.0/1024, MAX_BUFF_SEND
/1024./1024);
/* Mailboxes plus the initial pool must fit inside the send budget. */
1726 if(CmiMyPe() == 0 && (smsg_memlen
*mysize
+ _expand_mem
> MAX_BUFF_SEND
|| smsg_memlen
*mysize
+ _mempool_size
> MAX_BUFF_SEND
))
1728 printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1729 CmiAbort("memory registration\n");
/*
 * Machine-layer initialisation run after the common Converse start-up.
 *
 * everReturn - not referenced in the visible statements.
 *
 * Visible work:
 *  - when user-event tracing is compiled in, initialises projTraceStart and
 *    registers registerUserTraceEvents() as the machine user-event hook;
 *  - installs the idle callbacks CmiNotifyBeginIdle / CmiNotifyStillIdle
 *    (with the idle state in one variant, NULL in the other);
 *  - schedules PumpDatagramConnection every 10 ms;
 *  - on rank 0, schedules CheckProgress every two minutes;
 *  - schedules set_limit once the topology is available.
 * The preprocessor / if conditions selecting between these variants are on
 * lines not visible in this listing.
 *
 * Fix: the argument of registerMachineUserEventsFunction had been corrupted
 * to a registered-trademark sign (the "&reg" of "&registerUserTraceEvents"
 * decoded as an HTML entity); the address-of expression is restored.
 */
1734 void LrtsPostCommonInit(int everReturn
)
1739 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1740 CpvInitialize(double, projTraceStart
);
1741 /* only PE 0 needs to care about registration (to generate sts file). */
1742 //if (CmiMyPe() == 0)
1744 registerMachineUserEventsFunction(&registerUserTraceEvents
);
1749 CmiIdleState
*s
=CmiNotifyGetState();
1750 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE
,(CcdVoidFn
)CmiNotifyBeginIdle
,(void *)s
);
1751 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE
,(CcdVoidFn
)CmiNotifyStillIdle
,(void *)s
);
1753 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE
,(CcdVoidFn
)CmiNotifyStillIdle
,NULL
);
1755 CcdCallOnConditionKeep(CcdPERIODIC_10ms
, (CcdVoidFn
) PumpDatagramConnection
, NULL
);
1761 if (CmiMyRank() == 0)
1763 CcdCallOnConditionKeep(CcdPERIODIC_2minute
, (CcdVoidFn
) CheckProgress
, NULL
);
1767 CcdCallOnCondition(CcdTOPOLOGY_AVAIL
, (CcdVoidFn
)set_limit
, NULL
);
1771 /* this is called by worker thread */
/*
 * Communication progress hook for worker threads.
 *
 * Under MULTI_THREAD_SEND the function returns at once when mysize == 1;
 * otherwise it pumps the local completion queues (default_tx_cqh and
 * rdma_tx_cqh via PumpLocalTransactions), the remote transactions, and
 * flushes the buffered small messages of smsg_oob_queue and smsg_queue with
 * SendBufferMsg.  With CMK_WORKER_SINGLE_TASK each step is restricted to the
 * worker whose CmiMyRank() % 6 matches that step.  With
 * CMK_SMP_TRACE_COMMTHREAD the elapsed time is recorded as
 * event_AdvanceCommunication.
 *
 * The steps belonging to rank classes 0 and 5, and the matching #endif /
 * brace lines, are not visible in this listing.
 */
1772 void LrtsPostNonLocal(){
1773 #if CMK_SMP_TRACE_COMMTHREAD
1774 double startT
, endT
;
1776 #if MULTI_THREAD_SEND
1777 if(mysize
== 1) return;
1778 #if CMK_SMP_TRACE_COMMTHREAD
1782 #if CMK_SMP_TRACE_COMMTHREAD
1783 startT
= CmiWallTimer();
1786 #if CMK_WORKER_SINGLE_TASK
1787 if (CmiMyRank() % 6 == 0)
1791 #if CMK_WORKER_SINGLE_TASK
1792 if (CmiMyRank() % 6 == 1)
1794 PumpLocalTransactions(default_tx_cqh
, default_tx_cq_lock
);
1796 #if CMK_WORKER_SINGLE_TASK
1797 if (CmiMyRank() % 6 == 2)
1799 PumpLocalTransactions(rdma_tx_cqh
, rdma_tx_cq_lock
);
1802 #if CMK_WORKER_SINGLE_TASK
1803 if (CmiMyRank() % 6 == 3)
1805 PumpRemoteTransactions();
1809 if (SendBufferMsg(&smsg_oob_queue
) == 1)
1812 #if CMK_WORKER_SINGLE_TASK
1813 if (CmiMyRank() % 6 == 4)
1815 SendBufferMsg(&smsg_queue
);
1818 #if CMK_WORKER_SINGLE_TASK
1819 if (CmiMyRank() % 6 == 5)
1823 #if CMK_SMP_TRACE_COMMTHREAD
1824 endT
= CmiWallTimer();
1825 traceUserBracketEvent(event_AdvanceCommunication
, startT
, endT
);
1827 #if CMK_SMP_TRACE_COMMTHREAD
1833 /* useDynamicSMSG */
/*
 * Progress the dynamic SMSG connection handshake.  Loops while
 * GNI_PostDataProbeById reports a datagram and handles it by id:
 *
 *  - datagram_id >= mysize: a datagram this node posted on the bound
 *    endpoint of pe = datagram_id - mysize (see connect_to).  It is tested
 *    with GNI_EpPostDataTestById under global_gni_lock; once completed the
 *    SMSG channel is initialised from the local/remote attribute vectors and
 *    smsg_connected_flag[pe] moves from 1 (asserted) to 2.
 *
 *  - otherwise: a datagram received on the unbound endpoint.  Once
 *    completed, the channel to remote_id is initialised from send_smsg_attr /
 *    recv_smsg_attr, the flag is set to 2, a fresh mailbox slot is obtained
 *    with alloc_smsg_attr() and a new unbound datagram is posted with id
 *    myrank so that further peers can connect.
 *
 * The declaration of remote_id and several brace / preprocessor lines are
 * not visible in this listing.
 */
1834 static void PumpDatagramConnection()
1836 uint32_t remote_address
;
1838 gni_return_t status
;
1839 gni_post_state_t post_state
;
1840 uint64_t datagram_id
;
1843 while ((status
= GNI_PostDataProbeById(nic_hndl
, &datagram_id
)) == GNI_RC_SUCCESS
)
1845 if (datagram_id
>= mysize
) { /* bound endpoint */
1846 int pe
= datagram_id
- mysize
;
1847 CMI_GNI_LOCK(global_gni_lock
)
1848 status
= GNI_EpPostDataTestById( ep_hndl_array
[pe
], datagram_id
, &post_state
, &remote_address
, &remote_id
);
1849 CMI_GNI_UNLOCK(global_gni_lock
)
1850 if(status
== GNI_RC_SUCCESS
&& post_state
== GNI_POST_COMPLETED
)
1852 CmiAssert(remote_id
== pe
);
1853 status
= GNI_SmsgInit(ep_hndl_array
[pe
], smsg_attr_vector_local
[pe
], smsg_attr_vector_remote
[pe
]);
1854 GNI_RC_CHECK("Dynamic SMSG Init", status
);
1856 printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank
, pe
);
1858 CmiAssert(smsg_connected_flag
[pe
] == 1);
1859 smsg_connected_flag
[pe
] = 2;
1862 else { /* unbound ep */
1863 status
= GNI_EpPostDataTestById( ep_hndl_unbound
, datagram_id
, &post_state
, &remote_address
, &remote_id
);
1864 if(status
== GNI_RC_SUCCESS
&& post_state
== GNI_POST_COMPLETED
)
1866 CmiAssert(remote_id
<mysize
);
1867 CmiAssert(smsg_connected_flag
[remote_id
] <= 0);
1868 status
= GNI_SmsgInit(ep_hndl_array
[remote_id
], &send_smsg_attr
, &recv_smsg_attr
);
1869 GNI_RC_CHECK("Dynamic SMSG Init", status
);
1871 printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank
, remote_id
);
1873 smsg_connected_flag
[remote_id
] = 2;
/* Re-arm the unbound endpoint with a fresh mailbox for the next peer. */
1875 alloc_smsg_attr(&send_smsg_attr
);
1876 status
= GNI_EpPostDataWId (ep_hndl_unbound
, &send_smsg_attr
, SMSG_ATTR_SIZE
, &recv_smsg_attr
, SMSG_ATTR_SIZE
, myrank
);
1877 GNI_RC_CHECK("post unbound datagram", status
);
1883 /* polling the CQ to receive network messages */
/*
 * Only two local declarations (a CQ entry and a status) are visible for this
 * function; no statement using them appears in this listing.
 */
1884 static void PumpNetworkRdmaMsgs()
1886 gni_cq_entry_t event_data
;
1887 gni_return_t status
;
/*
 * Queue a post descriptor that could not be issued so it can be retried.
 *
 * inst_id   - rank of the peer the transaction targets (stored as destNode).
 * pd        - post descriptor to retry; stored by pointer, not copied.
 * ack_index - ack slot associated with the request (the condition under
 *             which it is stored is not visible in this listing).
 *
 * An RDMA_REQUEST is allocated with MallocRdmaRequest and appended to the
 * pending list: with PCQueuePush(sendRdmaBuf, ...) in one build variant, or
 * by linking it at sendRdmaTail of a singly linked list (sendRdmaBuf is the
 * head) in the other.
 */
1892 static void bufferRdmaMsg(int inst_id
, gni_post_descriptor_t
*pd
, int ack_index
)
1894 RDMA_REQUEST
*rdma_request_msg
;
1895 MallocRdmaRequest(rdma_request_msg
);
1896 rdma_request_msg
->destNode
= inst_id
;
1897 rdma_request_msg
->pd
= pd
;
1899 rdma_request_msg
->ack_index
= ack_index
;
1902 PCQueuePush(sendRdmaBuf
, (char*)rdma_request_msg
);
/* Linked-list variant: first element becomes head and tail, later ones are
 * appended after the tail. */
1904 if(sendRdmaBuf
== 0)
1906 sendRdmaBuf
= sendRdmaTail
= rdma_request_msg
;
1908 sendRdmaTail
->next
= rdma_request_msg
;
1909 sendRdmaTail
= rdma_request_msg
;
1915 static void getLargeMsgRequest(void* header
, uint64_t inst_id
);
/*
 * Receive loop for small messages.  Takes completion events from smsg_rx_cqh
 * (under smsg_rx_cq_lock), derives the sender's instance id from each event,
 * and then drains that sender's mailbox with GNI_SmsgGetNextWTag (under
 * smsg_mailbox_lock), dispatching on the received tag:
 *
 *   SMALL_DATA_TAG      - payload is copied into a CmiAlloc'ed buffer, the
 *                         mailbox slot released, and the copy handed to
 *                         handleOneRecvedMsg.
 *   large-message request (case label not visible)
 *                       - forwarded to getLargeMsgRequest(); under
 *                         MULTI_THREAD_SEND the header is copied first so the
 *                         slot can be released before the call.
 *   ACK_TAG             - sender side: the receiver finished its GET;
 *                         send-side accounting is reduced (DecreaseMsgInSend,
 *                         buffered_send_msg).
 *   BIG_MSG_TAG         - sender side: a segment was fetched; it is
 *                         de-registered and the following segment(s) are
 *                         announced with new control messages, up to
 *                         BIG_MSG_PIPELINE at the start of a transfer.
 *   PUT_DONE_TAG        - (CMK_PERSISTENT_COMM) a persistent message has
 *                         arrived in place and is handed to handleOneRecvedMsg.
 *   DIRECT_PUT_DONE_TAG - a cmidirect trigger message is created and pushed
 *                         to the target rank with CmiPushPE.
 *   anything else       - "Unknown tag" abort.
 *
 * With useDynamicSMSG the loop first spins on PumpDatagramConnection() until
 * the sender's connection is fully set up.  After the loops, a
 * GNI_RC_ERROR_RESOURCE status prints a hint about +useRecvQueue and is
 * passed to GNI_RC_CHECK.
 *
 * Loop headers, case labels, breaks and several declarations are on lines
 * not visible in this listing.
 */
1917 static void PumpNetworkSmsg()
1921 gni_cq_entry_t event_data
;
1922 gni_return_t status
, status2
;
1927 gni_mem_handle_t msg_mem_hndl
;
1928 gni_smsg_attr_t
*smsg_attr
;
1929 gni_smsg_attr_t
*remote_smsg_attr
;
1931 CONTROL_MSG
*control_msg_tmp
, *header_tmp
;
1932 uint64_t source_addr
;
1933 SMSG_QUEUE
*queue
= &smsg_queue
;
1935 cmidirectMsg
*direct_msg
;
1939 CMI_GNI_LOCK(smsg_rx_cq_lock
)
1940 status
=GNI_CqGetEvent(smsg_rx_cqh
, &event_data
);
1941 CMI_GNI_UNLOCK(smsg_rx_cq_lock
)
1942 if(status
!= GNI_RC_SUCCESS
)
1944 inst_id
= GNI_CQ_GET_INST_ID(event_data
);
1946 inst_id
= ACK_GET_RANK(inst_id
);
1948 // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1950 printf("[%d] %d PumpNetworkMsgs is received from PE: %d, status=%s\n", myrank
, CmiMyRank(), inst_id
, gni_err_str
[status
]);
1952 if (useDynamicSMSG
) {
1953 /* subtle: smsg may come before connection is setup */
1954 while (smsg_connected_flag
[inst_id
] != 2)
1955 PumpDatagramConnection();
1957 msg_tag
= GNI_SMSG_ANY_TAG
;
1959 CMI_GNI_LOCK(smsg_mailbox_lock
)
1960 status
= GNI_SmsgGetNextWTag(ep_hndl_array
[inst_id
], &header
, &msg_tag
);
1961 if (status
!= GNI_RC_SUCCESS
)
1963 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
1967 printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank
, inst_id
, msg_tag
);
1969 /* copy msg out and then put into queue (small message) */
1971 case SMALL_DATA_TAG
:
1974 msg_nbytes
= CmiGetMsgSize(header
);
1975 msg_data
= CmiAlloc(msg_nbytes
);
1976 memcpy(msg_data
, (char*)header
, msg_nbytes
);
1977 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
1978 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
1979 TRACE_COMM_CREATION(CpvAccess(projTraceStart
), msg_data
);
1980 CMI_CHECK_CHECKSUM(msg_data
, msg_nbytes
);
1981 handleOneRecvedMsg(msg_nbytes
, msg_data
);
/* Large-message request: copy the header out first when other threads may
 * reuse the mailbox, otherwise process it in place and release afterwards. */
1986 #if MULTI_THREAD_SEND
1987 MallocControlMsg(control_msg_tmp
);
1988 memcpy(control_msg_tmp
, header
, CONTROL_MSG_SIZE
);
1989 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
1990 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
1991 getLargeMsgRequest(control_msg_tmp
, inst_id
);
1992 FreeControlMsg(control_msg_tmp
);
1994 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
1995 getLargeMsgRequest(header
, inst_id
);
1996 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2000 case ACK_TAG
: //msg fit into mempool
2002 /* Get is done, release message . Now put is not used yet*/
2003 void *msg
= (void*)(((ACK_MSG
*)header
)->source_addr
);
2004 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2005 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
/* NOTE(review): in the non-mempool build the fields of header are read
 * below after the mailbox slot was released above - confirm the slot cannot
 * be overwritten in between. */
2006 #if ! USE_LRTS_MEMPOOL
2007 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &(((ACK_MSG
*)header
)->source_mem_hndl
), &omdh
, ((ACK_MSG
*)header
)->length
);
2009 DecreaseMsgInSend(msg
);
2011 if(NoMsgInSend(msg
))
2012 buffered_send_msg
-= GetMempoolsize(msg
);
2013 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
, msg_tag
);
2017 case BIG_MSG_TAG
: //big msg, de-register, transfer next seg
2019 #if MULTI_THREAD_SEND
2020 MallocControlMsg(header_tmp
);
2021 memcpy(header_tmp
, header
, CONTROL_MSG_SIZE
);
2022 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2024 header_tmp
= (CONTROL_MSG
*) header
;
2026 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
2027 void *msg
= (void*)(header_tmp
->source_addr
);
2028 int cur_seq
= CmiGetMsgSeq(msg
);
2029 int offset
= ONE_SEG
*(cur_seq
+1);
/* The acknowledged segment is no longer needed by the NIC. */
2030 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &(header_tmp
->source_mem_hndl
), &omdh
, header_tmp
->length
);
2031 buffered_send_msg
-= header_tmp
->length
;
/* The size field of the message is reused as a "bytes remaining" counter. */
2032 int remain_size
= CmiGetMsgSize(msg
) - header_tmp
->length
;
2033 if (remain_size
< 0) remain_size
= 0;
2034 CmiSetMsgSize(msg
, remain_size
);
2035 if(remain_size
<= 0) //transaction done
2038 }else if (header_tmp
->total_length
> offset
)
2040 CmiSetMsgSeq(msg
, cur_seq
+1);
2041 control_msg_tmp
= construct_control_msg(header_tmp
->total_length
, msg
, cur_seq
+1+1);
2042 control_msg_tmp
->dest_addr
= header_tmp
->dest_addr
;
2044 send_large_messages( queue
, inst_id
, control_msg_tmp
, 0, NULL
);
/* First segment acknowledged: fill the pipeline with further segments. */
2046 if (header_tmp
->seq_id
== 1) {
2048 for (i
=1; i
<BIG_MSG_PIPELINE
; i
++) {
2049 int seq
= cur_seq
+i
+2;
2050 CmiSetMsgSeq(msg
, seq
-1);
2051 control_msg_tmp
= construct_control_msg(header_tmp
->total_length
, (char *)msg
, seq
);
2052 control_msg_tmp
->dest_addr
= header_tmp
->dest_addr
;
2053 send_large_messages( queue
, inst_id
, control_msg_tmp
, 0, NULL
);
2054 if (header_tmp
->total_length
<= ONE_SEG
*seq
) break;
2058 #if MULTI_THREAD_SEND
2059 FreeControlMsg(header_tmp
);
2061 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2065 #if CMK_PERSISTENT_COMM
2066 case PUT_DONE_TAG
: { //persistent message
2067 void *msg
= (void *)(((CONTROL_MSG
*) header
)->source_addr
);
2068 int size
= ((CONTROL_MSG
*) header
)->length
;
2069 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2070 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
2072 CMI_CHECK_CHECKSUM(msg
, size
);
2073 handleOneRecvedMsg(size
, msg
);
2075 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank
, size
);
2081 case DIRECT_PUT_DONE_TAG
: //cmi direct
2082 //create a trigger message
2083 direct_msg
= (cmidirectMsg
*)CmiAlloc(sizeof(cmidirectMsg
));
2084 direct_msg
->handler
= ((CMK_DIRECT_HEADER
*)header
)->handler_addr
;
2085 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2086 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
2087 CmiSetHandler(direct_msg
, CpvAccess(CmiHandleDirectIdx
));
2088 CmiPushPE(((CmiDirectUserHandle
*)direct_msg
->handler
)->remoteRank
, direct_msg
);
2089 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2093 GNI_SmsgRelease(ep_hndl_array
[inst_id
]);
2094 CMI_GNI_UNLOCK(smsg_mailbox_lock
)
2095 printf("weird tag problem\n");
2096 CmiAbort("Unknown tag\n");
2100 printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank
, inst_id
, msg_tag
);
2103 msg_tag
= GNI_SMSG_ANY_TAG
;
2104 } //endwhile getNext
2105 } //end while GetEvent
2106 if(status
== GNI_RC_ERROR_RESOURCE
)
2108 printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");
2109 GNI_RC_CHECK("Smsg_rx_cq full", status
);
2113 static void printDesc(gni_post_descriptor_t
*pd
)
2115 printf(" Descriptor (%p===>%p)(%d)\n", pd
->local_addr
, pd
->remote_addr
, pd
->length
);
/*
 * Post a CQ-write transaction to destNode.
 *
 * destNode - rank whose endpoint the transaction is posted on.
 * data     - 64-bit value stored in the descriptor's cqwrite_value.
 * mem_hndl - remote memory handle stored in the descriptor.
 *
 * The descriptor uses GNI_CQMODE_SILENT and GNI_DLVMODE_PERFORMANCE; the
 * result of GNI_PostCqWrite is checked with GNI_RC_CHECK.
 *
 * NOTE(review): the statement that allocates pd is not visible in this
 * listing.
 */
2119 static void sendCqWrite(int destNode
, uint64_t data
, gni_mem_handle_t mem_hndl
)
2121 gni_post_descriptor_t
*pd
;
2122 gni_return_t status
= GNI_RC_SUCCESS
;
2125 pd
->type
= GNI_POST_CQWRITE
;
2126 pd
->cq_mode
= GNI_CQMODE_SILENT
;
2127 //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2128 pd
->dlvr_mode
= GNI_DLVMODE_PERFORMANCE
;
2129 pd
->cqwrite_value
= data
;
2130 pd
->remote_mem_hndl
= mem_hndl
;
2131 status
= GNI_PostCqWrite(ep_hndl_array
[destNode
], pd
);
2132 GNI_RC_CHECK("GNI_PostCqWrite", status
);
2136 // register memory for a message
2137 // return mem handle
/*
 * Make sure the memory behind msg is registered and return its handle.
 *
 * msg   - start of the message (or of a segment of a big message).
 * size  - number of bytes to register when msg is registered on its own.
 * seqno - sequence id of the transfer; selects between the mempool path and
 *         direct registration (the full condition is only partly visible).
 * memh  - in/out: if already non-zero nothing is done; otherwise receives
 *         the handle.
 *
 * Visible paths:
 *  - CMK_PERSISTENT_COMM: for seqno <= 1 or PERSIST_SEQ, a non-zero
 *    MEMHFIELD(msg) is returned directly.
 *  - mempool path: if the enclosing mempool block has no handle yet, the
 *    whole block is registered with registerMemory(..., rdma_rx_cqh); the
 *    block's handle is copied to *memh.
 *  - otherwise (big message or CmiDirect buffer outside the mempool): msg
 *    itself is registered with registerMemory(msg, size, memh, NULL).
 *
 * The final return statement is on a line not visible in this listing.
 */
2138 static gni_return_t
registerMessage(void *msg
, int size
, int seqno
, gni_mem_handle_t
*memh
)
2140 gni_return_t status
= GNI_RC_SUCCESS
;
/* Already registered: nothing to do. */
2142 if (!IsMemHndlZero(*memh
)) return GNI_RC_SUCCESS
;
2144 #if CMK_PERSISTENT_COMM
2145 // persistent message is always registered
2146 // BIG_MSG small pieces do not have malloc chunk header
2147 if ((seqno
<= 1 || seqno
== PERSIST_SEQ
) && !IsMemHndlZero(MEMHFIELD(msg
))) {
2148 *memh
= MEMHFIELD(msg
);
2149 return GNI_RC_SUCCESS
;
2153 #if CMK_PERSISTENT_COMM
2154 || seqno
== PERSIST_SEQ
2158 if(IsMemHndlZero((GetMemHndl(msg
))))
2161 status
= registerMemory(GetMempoolBlockPtr(msg
), GetMempoolsize(msg
), &(GetMemHndl(msg
)), rdma_rx_cqh
);
2162 if(status
== GNI_RC_SUCCESS
)
2163 *memh
= GetMemHndl(msg
);
2166 *memh
= GetMemHndl(msg
);
2170 //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2171 status
= registerMemory(msg
, size
, memh
, NULL
);
2176 // for BIG_MSG called on receiver side for receiving control message
2178 static void getLargeMsgRequest(void* header
, uint64_t inst_id
)
2180 #if USE_LRTS_MEMPOOL
2181 CONTROL_MSG
*request_msg
;
2182 gni_return_t status
= GNI_RC_SUCCESS
;
2184 gni_post_descriptor_t
*pd
;
2185 gni_mem_handle_t msg_mem_hndl
;
2186 int size
, transaction_size
, offset
= 0;
2187 size_t register_size
= 0;
2189 // initial a get to transfer data from the sender side */
2190 request_msg
= (CONTROL_MSG
*) header
;
2191 size
= request_msg
->total_length
;
2192 MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
2195 pd
->sync_flag_addr
= 1000000 * CmiWallTimer(); //microsecond
2197 if(request_msg
->seq_id
< 2) {
2198 #if CMK_SMP_TRACE_COMMTHREAD
2199 pd
->sync_flag_addr
= 1000000 * CmiWallTimer(); //microsecond
2201 msg_data
= CmiAlloc(size
);
2202 CmiSetMsgSeq(msg_data
, 0);
2203 _MEMCHECK(msg_data
);
2206 offset
= ONE_SEG
*(request_msg
->seq_id
-1);
2207 msg_data
= (char*)request_msg
->dest_addr
+ offset
;
2210 pd
->cqwrite_value
= request_msg
->seq_id
;
2212 transaction_size
= request_msg
->seq_id
== 0? ALIGN64(size
) : ALIGN64(request_msg
->length
);
2213 SetMemHndlZero(pd
->local_mem_hndl
);
2214 status
= registerMessage(msg_data
, transaction_size
, request_msg
->seq_id
, &pd
->local_mem_hndl
);
2215 if (status
== GNI_RC_SUCCESS
&& request_msg
->seq_id
== 0) {
2216 if(NoMsgInRecv( (void*)(msg_data
)))
2217 register_size
= GetMempoolsize((void*)(msg_data
));
2220 pd
->first_operand
= ALIGN64(size
); // total length
2222 if(request_msg
->total_length
<= LRTS_GNI_RDMA_THRESHOLD
)
2223 pd
->type
= GNI_POST_FMA_GET
;
2225 pd
->type
= GNI_POST_RDMA_GET
;
2226 pd
->cq_mode
= GNI_CQMODE_GLOBAL_EVENT
;
2227 pd
->dlvr_mode
= GNI_DLVMODE_PERFORMANCE
;
2228 pd
->length
= transaction_size
;
2229 pd
->local_addr
= (uint64_t) msg_data
;
2230 pd
->remote_addr
= request_msg
->source_addr
+ offset
;
2231 pd
->remote_mem_hndl
= request_msg
->source_mem_hndl
;
2232 pd
->src_cq_hndl
= rdma_tx_cqh
;
2236 //memory registration success
2237 if(status
== GNI_RC_SUCCESS
)
2239 CmiNodeLock lock
= pd
->type
== GNI_POST_RDMA_GET
?rdma_tx_cq_lock
:default_tx_cq_lock
;
2242 if( request_msg
->seq_id
== 0)
2244 pd
->cq_mode
|= GNI_CQMODE_REMOTE_EVENT
;
2245 int sts
= GNI_EpSetEventData(ep_hndl_array
[inst_id
], inst_id
, ACK_EVENT(request_msg
->ack_index
));
2246 GNI_RC_CHECK("GNI_EpSetEventData", sts
);
2253 if(pd
->type
== GNI_POST_RDMA_GET
)
2255 status
= GNI_PostRdma(ep_hndl_array
[inst_id
], pd
);
2259 status
= GNI_PostFma(ep_hndl_array
[inst_id
], pd
);
2261 CMI_GNI_UNLOCK(lock
)
2263 if(status
== GNI_RC_SUCCESS
)
2265 if(pd
->cqwrite_value
== 0)
2267 #if MACHINE_DEBUG_LOG
2268 buffered_recv_msg
+= register_size
;
2269 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
2271 IncreaseMsgInRecv(msg_data
);
2272 #if CMK_SMP_TRACE_COMMTHREAD
2273 pd
->sync_flag_value
= 1000000 * CmiWallTimer(); //microsecond
2277 pd
->sync_flag_value
= 1000000 * CmiWallTimer(); //microsecond
2278 RDMA_TRANS_INIT(pd
->sync_flag_addr
/1000000.0)
2283 SetMemHndlZero((pd
->local_mem_hndl
));
2285 if(status
== GNI_RC_ERROR_RESOURCE
|| status
== GNI_RC_ERROR_NOMEM
)
2288 bufferRdmaMsg(inst_id
, pd
, request_msg
->ack_index
);
2290 bufferRdmaMsg(inst_id
, pd
, -1);
2292 }else if (status
!= GNI_RC_SUCCESS
) {
2293 // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2294 GNI_RC_CHECK("GetLargeAFter posting", status
);
2297 CONTROL_MSG
*request_msg
;
2298 gni_return_t status
;
2300 gni_post_descriptor_t
*pd
;
2301 RDMA_REQUEST
*rdma_request_msg
;
2302 gni_mem_handle_t msg_mem_hndl
;
2304 // initiate a get to transfer data from the sender side
2305 request_msg
= (CONTROL_MSG
*) header
;
2306 msg_data
= CmiAlloc(request_msg
->length
);
2307 _MEMCHECK(msg_data
);
2309 MEMORY_REGISTER(onesided_hnd
, nic_hndl
, msg_data
, request_msg
->length
, &msg_mem_hndl
, &omdh
, NULL
, status
)
2311 if (status
== GNI_RC_INVALID_PARAM
|| status
== GNI_RC_PERMISSION_ERROR
)
2313 GNI_RC_CHECK("Invalid/permission Mem Register in post", status
);
2317 if(request_msg
->length
<= LRTS_GNI_RDMA_THRESHOLD
)
2318 pd
->type
= GNI_POST_FMA_GET
;
2320 pd
->type
= GNI_POST_RDMA_GET
;
2321 pd
->cq_mode
= GNI_CQMODE_GLOBAL_EVENT
;// | GNI_CQMODE_REMOTE_EVENT;
2322 pd
->dlvr_mode
= GNI_DLVMODE_PERFORMANCE
;
2323 pd
->length
= ALIGN64(request_msg
->length
);
2324 pd
->local_addr
= (uint64_t) msg_data
;
2325 pd
->remote_addr
= request_msg
->source_addr
;
2326 pd
->remote_mem_hndl
= request_msg
->source_mem_hndl
;
2327 pd
->src_cq_hndl
= rdma_tx_cqh
;
2331 //memory registration successful
2332 if(status
== GNI_RC_SUCCESS
)
2334 pd
->local_mem_hndl
= msg_mem_hndl
;
2336 if(pd
->type
== GNI_POST_RDMA_GET
)
2338 CMI_GNI_LOCK(rdma_tx_cq_lock
)
2339 status
= GNI_PostRdma(ep_hndl_array
[inst_id
], pd
);
2340 CMI_GNI_UNLOCK(rdma_tx_cq_lock
)
2344 CMI_GNI_LOCK(default_tx_cq_lock
)
2345 status
= GNI_PostFma(ep_hndl_array
[inst_id
], pd
);
2346 CMI_GNI_UNLOCK(default_tx_cq_lock
)
2351 SetMemHndlZero(pd
->local_mem_hndl
);
2353 if(status
== GNI_RC_ERROR_RESOURCE
|| status
== GNI_RC_ERROR_NOMEM
)
2355 MallocRdmaRequest(rdma_request_msg
);
2356 rdma_request_msg
->next
= 0;
2357 rdma_request_msg
->destNode
= inst_id
;
2358 rdma_request_msg
->pd
= pd
;
2359 PCQueuePush(sendRdmaBuf
, (char*)rdma_request_msg
);
2361 GNI_RC_CHECK("AFter posting", status
);
/*
 * Drain CQ-write events from rdma_rx_cqh.
 *
 * Events are fetched one at a time with GNI_CqGetEvent; polling stops at the
 * first status other than GNI_RC_SUCCESS. The low 48 bits of the event data
 * are used as the address of a message. Under CMK_PERSISTENT_COMM a message
 * whose MEMHFIELD handle is non-zero is handed to handleOneRecvedMsg; the
 * send-side count of a message is dropped with DecreaseMsgInSend and
 * buffered_send_msg is reduced once NoMsgInSend reports true.
 * NOTE(review): how these steps are split between branches should be
 * confirmed against the surrounding else / endif lines.
 */
2367 static void PumpCqWriteTransactions()
2371 gni_return_t status
;
2375 //CMI_GNI_LOCK(my_cq_lock)
2376 status
= GNI_CqGetEvent(rdma_rx_cqh
, &ev
);
2377 //CMI_GNI_UNLOCK(my_cq_lock)
2378 if(status
!= GNI_RC_SUCCESS
) break;
/* keep only the low 48 bits of the event data as the message pointer */
2379 msg
= (void*) ( GNI_CQ_GET_DATA(ev
) & 0xFFFFFFFFFFFFL
);
2380 #if CMK_PERSISTENT_COMM
2382 printf(" %d CQ write event %p\n", myrank
, msg
);
/* a non-zero memory handle in the message header selects the persistent path */
2384 if (!IsMemHndlZero(MEMHFIELD(msg
))) {
2386 printf(" %d Persistent CQ write event %p\n", myrank
, msg
);
2389 msg_size
= CmiGetMsgSize(msg
);
2390 CMI_CHECK_CHECKSUM(msg
, msg_size
);
2391 handleOneRecvedMsg(msg_size
, msg
);
2395 DecreaseMsgInSend(msg
);
2396 #if ! USE_LRTS_MEMPOOL
2397 // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2399 DecreaseMsgInSend(msg
);
/* once no send references remain, stop counting this pool as buffered send memory */
2401 if(NoMsgInSend(msg
))
2402 buffered_send_msg
-= GetMempoolsize(msg
);
/* a GNI_RC_ERROR_RESOURCE status from the poll is escalated through GNI_RC_CHECK */
2405 if(status
== GNI_RC_ERROR_RESOURCE
)
2407 GNI_RC_CHECK("rdma_rx_cq full too many ack", status
);
/*
 * Drain remote-event completions from rdma_rx_cqh.
 *
 * Each event is fetched with GNI_CqGetEvent while holding global_gni_lock.
 * The instance id carried by the event is converted with ACK_GET_INDEX into
 * a slot; GetIndexType and GetIndexAddress give the kind of entry and the
 * pointer stored for that slot.
 *   - ack path: drops the send-side count of the message, adjusts
 *     buffered_send_msg when NoMsgInSend is true and returns the slot to
 *     ackPool with IndexPool_freeslot.
 *   - case 2 (PERSISTENT, only with CMK_PERSISTENT_COMM): the slot pointer is
 *     a PersistentReceivesTable; destBuf[0].destAddress is delivered through
 *     handleOneRecvedMsg.
 *   - any other type aborts.
 * NOTE(review): the case label of the ack path is not shown here - confirm.
 */
2413 static void PumpRemoteTransactions()
2416 gni_return_t status
;
2418 int slot
, type
, size
;
2421 CMI_GNI_LOCK(global_gni_lock
)
2422 status
= GNI_CqGetEvent(rdma_rx_cqh
, &ev
);
2423 CMI_GNI_UNLOCK(global_gni_lock
)
2424 if(status
!= GNI_RC_SUCCESS
) {
/* the event instance id encodes the index of the slot this event refers to */
2428 slot
= GNI_CQ_GET_INST_ID(ev
);
2429 slot
= ACK_GET_INDEX(slot
);
2430 //slot = GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFL;
2432 //CMI_GNI_LOCK(ackpool_lock);
2433 type
= GetIndexType(slot
);
2434 msg
= GetIndexAddress(slot
);
2435 //CMI_GNI_UNLOCK(ackpool_lock);
2439 DecreaseMsgInSend(msg
);
2440 #if ! USE_LRTS_MEMPOOL
2441 // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2443 DecreaseMsgInSend(msg
);
2445 if(NoMsgInSend(msg
))
2446 buffered_send_msg
-= GetMempoolsize(msg
);
/* the slot is no longer needed once the ack has been accounted for */
2448 IndexPool_freeslot(&ackPool
, slot
);
2450 #if CMK_PERSISTENT_COMM
2451 case 2: // PERSISTENT
/* the slot stores a receive table; the message sits in its first destination buffer */
2452 msg
= ((PersistentReceivesTable
*)msg
)->destBuf
[0].destAddress
;
2453 size
= CmiGetMsgSize(msg
);
2455 CMI_CHECK_CHECKSUM(msg
, size
);
2456 handleOneRecvedMsg(size
, msg
);
2460 fprintf(stderr
, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank
, type
);
2461 CmiAbort("PumpRemoteTransactions: unknown type");
/* a GNI_RC_ERROR_RESOURCE status from the poll is escalated through GNI_RC_CHECK */
2465 if(status
== GNI_RC_ERROR_RESOURCE
)
2467 GNI_RC_CHECK("rdma_rx_cq full too many ack", status
);
/*
 * Process local completion events of posted FMA/RDMA transactions.
 *
 * my_tx_cqh  - completion queue to poll.
 * my_cq_lock - lock taken around GNI_CqGetEvent and GNI_GetCompleted on that
 *              queue.
 *
 * For every GNI_CQ_EVENT_TYPE_POST event the finished post descriptor is
 * obtained with GNI_GetCompleted and dispatched on its type:
 *   - RDMA/FMA PUT (CMK_PERSISTENT_COMM or CMK_DIRECT builds): builds a
 *     DIRECT_PUT_DONE_TAG or PUT_DONE_TAG notification, or issues
 *     sendCqWrite.
 *   - RDMA/FMA GET: a positive cqwrite_value marks one segment of a pipelined
 *     big message and produces a BIG_MSG_TAG control message; otherwise an
 *     ack is prepared for the sender.
 *   - GNI_POST_CQWRITE: only the descriptor is released.
 *   - anything else aborts.
 * After the notification is sent with send_smsg_message, a completed GET
 * delivers the received data through handleOneRecvedMsg (for big messages
 * only once the last segment has arrived) and the descriptor is freed.
 * NOTE(review): several else / endif lines that decide which of the
 * alternatives below is compiled are not shown here - confirm before relying
 * on the exact pairing of statements.
 */
2472 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh
, CmiNodeLock my_cq_lock
)
2475 gni_return_t status
;
2476 uint64_t type
, inst_id
;
2477 gni_post_descriptor_t
*tmp_pd
;
2479 CONTROL_MSG
*ack_msg_tmp
;
2483 CMK_DIRECT_HEADER
*cmk_direct_done_msg
;
2485 SMSG_QUEUE
*queue
= &smsg_queue
;
/* poll one event while holding the lock of this completion queue */
2488 CMI_GNI_LOCK(my_cq_lock
)
2489 status
= GNI_CqGetEvent(my_tx_cqh
, &ev
);
2490 CMI_GNI_UNLOCK(my_cq_lock
)
2491 if(status
!= GNI_RC_SUCCESS
) break;
2493 type
= GNI_CQ_GET_TYPE(ev
);
2494 if (type
== GNI_CQ_EVENT_TYPE_POST
)
2496 inst_id
= GNI_CQ_GET_INST_ID(ev
);
2498 printf("[%d] LocalTransactions localdone=%d\n", myrank
, lrts_local_done_msg
);
/* map the event back to the post descriptor that completed */
2500 CMI_GNI_LOCK(my_cq_lock
)
2501 status
= GNI_GetCompleted(my_tx_cqh
, ev
, &tmp_pd
);
2502 CMI_GNI_UNLOCK(my_cq_lock
)
2504 switch (tmp_pd
->type
) {
2505 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2506 case GNI_POST_RDMA_PUT
:
2507 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2508 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &tmp_pd
->local_mem_hndl
, &omdh
, tmp_pd
->length
);
2510 case GNI_POST_FMA_PUT
:
/* amo_cmd == 1 marks a put whose completion is reported with DIRECT_PUT_DONE_TAG */
2511 if(tmp_pd
->amo_cmd
== 1) {
2513 //sender ACK to receiver to trigger it is done
2514 cmk_direct_done_msg
= (CMK_DIRECT_HEADER
*) malloc(sizeof(CMK_DIRECT_HEADER
));
2515 cmk_direct_done_msg
->handler_addr
= tmp_pd
->first_operand
;
2516 msg_tag
= DIRECT_PUT_DONE_TAG
;
2520 CmiFree((void *)tmp_pd
->local_addr
);
2522 FreePostDesc(tmp_pd
);
2525 sendCqWrite(inst_id
, tmp_pd
->remote_addr
, tmp_pd
->remote_mem_hndl
);
2526 FreePostDesc(tmp_pd
);
/* PUT_DONE_TAG control message echoing the remote address, handle and length */
2529 MallocControlMsg(ack_msg_tmp
);
2530 ack_msg_tmp
->source_addr
= tmp_pd
->remote_addr
;
2531 ack_msg_tmp
->source_mem_hndl
= tmp_pd
->remote_mem_hndl
;
2532 ack_msg_tmp
->length
= tmp_pd
->length
;
2533 msg_tag
= PUT_DONE_TAG
;
2538 case GNI_POST_RDMA_GET
:
2539 case GNI_POST_FMA_GET
: {
2540 #if ! USE_LRTS_MEMPOOL
2541 MallocControlMsg(ack_msg_tmp
);
2542 ack_msg_tmp
->source_addr
= tmp_pd
->remote_addr
;
2543 ack_msg_tmp
->source_mem_hndl
= tmp_pd
->remote_mem_hndl
;
2544 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &tmp_pd
->local_mem_hndl
, &omdh
, tmp_pd
->length
)
2548 RDMA_TRANS_DONE(tmp_pd
->sync_flag_value
/1000000.0)
/* cqwrite_value carries the segment sequence number; a positive value means big message */
2550 int seq_id
= tmp_pd
->cqwrite_value
;
2551 if(seq_id
> 0) // BIG_MSG
2553 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &tmp_pd
->local_mem_hndl
, &omdh
, tmp_pd
->length
);
2554 MallocControlMsg(ack_msg_tmp
);
2555 ack_msg_tmp
->source_addr
= tmp_pd
->remote_addr
;
2556 ack_msg_tmp
->source_mem_hndl
= tmp_pd
->remote_mem_hndl
;
2557 ack_msg_tmp
->seq_id
= seq_id
;
/* step back (seq_id-1) segments of ONE_SEG bytes so both addresses refer to the message start */
2558 ack_msg_tmp
->dest_addr
= tmp_pd
->local_addr
- ONE_SEG
*(ack_msg_tmp
->seq_id
-1);
2559 ack_msg_tmp
->source_addr
-= ONE_SEG
*(ack_msg_tmp
->seq_id
-1);
2560 ack_msg_tmp
->length
= tmp_pd
->length
;
2561 ack_msg_tmp
->total_length
= tmp_pd
->first_operand
; // total size
2562 msg_tag
= BIG_MSG_TAG
;
2567 #if !REMOTE_EVENT && !CQWRITE
2568 MallocAckMsg(ack_msg
);
2569 ack_msg
->source_addr
= tmp_pd
->remote_addr
;
2575 case GNI_POST_CQWRITE
:
2576 FreePostDesc(tmp_pd
);
2579 CmiPrintf("type=%d\n", tmp_pd
->type
);
2580 CmiAbort("PumpLocalTransactions: unknown type!");
2581 } /* end of switch */
/* send the notification chosen above; each message is released only when the send succeeded */
2584 if (tmp_pd
->amo_cmd
== 1) {
2585 status
= send_smsg_message(queue
, inst_id
, cmk_direct_done_msg
, sizeof(CMK_DIRECT_HEADER
), msg_tag
, 0, NULL
);
2586 if (status
== GNI_RC_SUCCESS
) free(cmk_direct_done_msg
);
2590 if (msg_tag
== ACK_TAG
) {
2593 status
= send_smsg_message(queue
, inst_id
, ack_msg
, ACK_MSG_SIZE
, msg_tag
, 0, NULL
);
2594 if (status
== GNI_RC_SUCCESS
) FreeAckMsg(ack_msg
);
2596 sendCqWrite(inst_id
, tmp_pd
->remote_addr
, tmp_pd
->remote_mem_hndl
);
2601 status
= send_smsg_message(queue
, inst_id
, ack_msg_tmp
, CONTROL_MSG_SIZE
, msg_tag
, 0, NULL
);
2602 if (status
== GNI_RC_SUCCESS
) FreeControlMsg(ack_msg_tmp
);
2604 #if CMK_PERSISTENT_COMM
2605 if (tmp_pd
->type
== GNI_POST_RDMA_GET
|| tmp_pd
->type
== GNI_POST_FMA_GET
)
2608 if( msg_tag
== ACK_TAG
){ //msg fit in mempool
2610 printf("Normal msg transaction PE:%d==>%d\n", myrank
, inst_id
);
2612 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd
->sync_flag_addr
/1000000.0), (double)((tmp_pd
->sync_flag_addr
+1)/1000000.0), (double)((tmp_pd
->sync_flag_addr
+1)/1000000.0), (void*)tmp_pd
->local_addr
);
2613 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd
->sync_flag_value
/1000000.0), (double)((tmp_pd
->sync_flag_value
+1)/1000000.0), (double)((tmp_pd
->sync_flag_value
+1)/1000000.0), (void*)tmp_pd
->local_addr
);
/* the size recorded in the message header must fit in the transferred length */
2616 CmiAssert(SIZEFIELD((void*)(tmp_pd
->local_addr
)) <= tmp_pd
->length
);
2617 DecreaseMsgInRecv((void*)tmp_pd
->local_addr
);
2618 #if MACHINE_DEBUG_LOG
2619 if(NoMsgInRecv((void*)(tmp_pd
->local_addr
)))
2620 buffered_recv_msg
-= GetMempoolsize((void*)(tmp_pd
->local_addr
));
2621 MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
, msg_tag
);
2623 TRACE_COMM_CREATION(CpvAccess(projTraceStart
), (void*)tmp_pd
->local_addr
);
2624 CMI_CHECK_CHECKSUM((void*)tmp_pd
->local_addr
, tmp_pd
->length
);
2625 handleOneRecvedMsg(tmp_pd
->length
, (void*)tmp_pd
->local_addr
);
2626 }else if(msg_tag
== BIG_MSG_TAG
){
/* recover the start of the whole message from the address of this segment */
2627 void *msg
= (char*)tmp_pd
->local_addr
-(tmp_pd
->cqwrite_value
-1)*ONE_SEG
;
/* the sequence field of the message counts the segments received so far */
2628 CmiSetMsgSeq(msg
, CmiGetMsgSeq(msg
)+1);
/* deliver once the counted segments cover the total size held in first_operand */
2629 if (tmp_pd
->first_operand
<= ONE_SEG
*CmiGetMsgSeq(msg
)) {
2632 printf("Pipeline msg done [%d]\n", myrank
);
2634 #if CMK_SMP_TRACE_COMMTHREAD
2635 if( tmp_pd
->cqwrite_value
== 1)
2636 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd
->sync_flag_addr
/1000000.0), (double)((tmp_pd
->sync_flag_addr
+1)/1000000.0), (double)((tmp_pd
->sync_flag_addr
+2)/1000000.0), (void*)tmp_pd
->local_addr
);
2638 TRACE_COMM_CREATION(CpvAccess(projTraceStart
), msg
);
2639 CMI_CHECK_CHECKSUM(msg
, tmp_pd
->first_operand
);
2640 handleOneRecvedMsg(tmp_pd
->first_operand
, msg
);
2644 FreePostDesc(tmp_pd
);
/* a GNI_RC_ERROR_RESOURCE status prints a hint about +useSendQueue before GNI_RC_CHECK */
2647 if(status
== GNI_RC_ERROR_RESOURCE
)
2649 printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");
2650 GNI_RC_CHECK("Smsg_tx_cq full", status
);
/*
 * Retry the RDMA/FMA requests that were buffered in sendRdmaBuf.
 *
 * For each buffered request the local buffer is registered with
 * registerMessage and, on success, the descriptor is posted with
 * GNI_PostRdma (RDMA GET/PUT) or GNI_PostFma (everything else). A request
 * whose registration or post fails is pushed back onto sendRdmaBuf.
 * NOTE(review): statements of an older linked-list variant (pre / next,
 * direct assignment to sendRdmaBuf) also appear below; which variant is
 * compiled depends on conditional lines not shown here - confirm.
 */
2654 static void SendRdmaMsg()
2656 gni_return_t status
= GNI_RC_SUCCESS
;
2657 gni_mem_handle_t msg_mem_hndl
;
2658 RDMA_REQUEST
*ptr
= 0, *tmp_ptr
;
2659 RDMA_REQUEST
*pre
= 0;
2660 uint64_t register_size
= 0;
/* the length is sampled once, so the loop visits at most the entries present at entry */
2664 int len
= PCQueueLength(sendRdmaBuf
);
2665 for (i
=0; i
<len
; i
++)
2667 CMI_PCQUEUEPOP_LOCK(sendRdmaBuf
)
2668 ptr
= (RDMA_REQUEST
*)PCQueuePop(sendRdmaBuf
);
2669 CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf
)
2670 if (ptr
== NULL
) break;
2676 MACHSTATE4(8, "noempty-rdma %d (%lld,%lld,%d) \n", ptr
->destNode
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
);
2677 gni_post_descriptor_t
*pd
= ptr
->pd
;
/* register the local buffer of the descriptor; the handle is written into the descriptor */
2679 msg
= (void*)(pd
->local_addr
);
2680 status
= registerMessage(msg
, pd
->length
, pd
->cqwrite_value
, &pd
->local_mem_hndl
);
2682 if(pd
->cqwrite_value
== 0) {
2683 if(NoMsgInRecv(msg
))
2684 register_size
= GetMempoolsize(msg
);
2687 if(status
== GNI_RC_SUCCESS
) //mem register good
/* RDMA transactions use rdma_tx_cq_lock, all others default_tx_cq_lock */
2689 CmiNodeLock lock
= (pd
->type
== GNI_POST_RDMA_GET
|| pd
->type
== GNI_POST_RDMA_PUT
) ? rdma_tx_cq_lock
:default_tx_cq_lock
;
2692 if( pd
->cqwrite_value
== 0
2693 #if CMK_PERSISTENT_COMM
2694 || pd
->cqwrite_value
== PERSIST_SEQ
/* request a remote event and attach the ack index as event data of the endpoint */
2698 pd
->cq_mode
|= GNI_CQMODE_REMOTE_EVENT
;
2699 int sts
= GNI_EpSetEventData(ep_hndl_array
[ptr
->destNode
], ptr
->destNode
, ACK_EVENT(ptr
->ack_index
));
2700 GNI_RC_CHECK("GNI_EpSetEventData", sts
);
2706 if(pd
->type
== GNI_POST_RDMA_GET
|| pd
->type
== GNI_POST_RDMA_PUT
)
2708 status
= GNI_PostRdma(ep_hndl_array
[ptr
->destNode
], pd
);
2712 status
= GNI_PostFma(ep_hndl_array
[ptr
->destNode
], pd
);
2714 CMI_GNI_UNLOCK(lock
);
2716 if(status
== GNI_RC_SUCCESS
) //post good
2721 pre
->next
= ptr
->next
;
2724 sendRdmaBuf
= ptr
->next
;
2727 FreeRdmaRequest(tmp_ptr
);
2729 if(pd
->cqwrite_value
== 0)
2731 #if CMK_SMP_TRACE_COMMTHREAD
2732 pd
->sync_flag_value
= 1000000 * CmiWallTimer(); //microsecond
2734 IncreaseMsgInRecv(((void*)(pd
->local_addr
)));
2737 pd
->sync_flag_value
= 1000000 * CmiWallTimer(); //microsecond
2738 RDMA_TRANS_INIT(pd
->sync_flag_addr
/1000000.0)
2740 #if MACHINE_DEBUG_LOG
2741 buffered_recv_msg
+= register_size
;
2742 MACHSTATE(8, "GO request from buffered\n");
2745 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank
, pd
->cqwrite_value
);
2747 }else // cannot post
/* keep the request for a later call */
2750 PCQueuePush(sendRdmaBuf
, (char*)ptr
);
2756 printf("[%d] SendRdmaMsg: post failed. seqno: %d\n", myrank
, pd
->cqwrite_value
);
2760 } else //memory registration fails
/* keep the request for a later call */
2763 PCQueuePush(sendRdmaBuf
, (char*)ptr
);
/*
 * Try to send the SMSG messages buffered in the given queue.
 *
 * queue - send queue to drain (smsg_queue or smsg_oob_queue are passed by
 *         LrtsAdvanceCommunication).
 *
 * Buffered entries are visited and sent according to their tag: plain data
 * and control messages go through send_smsg_message, large-message control
 * messages through send_large_messages. The payload of an entry is released
 * only after a successful send; an entry that could not be sent is pushed
 * back onto its queue. With CMK_SMP and ONE_SEND_QUEUE, a destination that
 * returned GNI_RC_ERROR_RESOURCE is marked in destpe_avail and its later
 * entries are re-queued without a send attempt.
 * NOTE(review): several alternative queue layouts (single sendMsgBuf,
 * nonEmptyQueues, per-destination sendSmsgBuf, linked list headed by
 * smsg_head_index) are interleaved below; the conditional lines that pick
 * one are not shown here - confirm against the full file.
 */
2776 // return 1 if all messages are sent
2777 static int SendBufferMsg(SMSG_QUEUE
*queue
)
2779 MSG_LIST
*ptr
, *tmp_ptr
, *pre
=0, *current_head
;
2780 CONTROL_MSG
*control_msg_tmp
;
2781 gni_return_t status
;
2783 uint64_t register_size
;
2784 void *register_addr
;
2785 int index_previous
= -1;
2786 #if CMI_EXERT_SEND_CAP
/* start with every destination marked as available */
2793 memset(destpe_avail
, 0, mysize
* sizeof(char));
2794 for (index
=0; index
<1; index
++)
2796 int i
, len
= PCQueueLength(queue
->sendMsgBuf
);
2797 for (i
=0; i
<len
; i
++)
2799 CMI_PCQUEUEPOP_LOCK(queue
->sendMsgBuf
)
2800 ptr
= (MSG_LIST
*)PCQueuePop(queue
->sendMsgBuf
);
2801 CMI_PCQUEUEPOP_UNLOCK(queue
->sendMsgBuf
)
2802 if(ptr
== NULL
) break;
2803 if (destpe_avail
[ptr
->destNode
] == 1) { /* can't send to this pe */
2804 PCQueuePush(queue
->sendMsgBuf
, (char*)ptr
);
2809 int nonempty
= PCQueueLength(nonEmptyQueues
);
2810 for(index
=0; index
<nonempty
; index
++)
2812 CMI_PCQUEUEPOP_LOCK(nonEmptyQueues
)
2813 MSG_LIST_INDEX
*current_list
= (MSG_LIST_INDEX
*)PCQueuePop(nonEmptyQueues
);
2814 CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues
)
2815 if(current_list
== NULL
) break;
2816 PCQueue current_queue
= current_list
-> sendSmsgBuf
;
/* sample the length and clear the pushed flag under the lock of this list */
2817 CmiLock(current_list
->lock
);
2818 int i
, len
= PCQueueLength(current_queue
);
2819 current_list
->pushed
= 0;
2820 CmiUnlock(current_list
->lock
);
2822 for(index
=0; index
<mysize
; index
++)
2824 PCQueue current_queue
= queue
->smsg_msglist_index
[index
].sendSmsgBuf
;
2825 int i
, len
= PCQueueLength(current_queue
);
2827 for (i
=0; i
<len
; i
++)
2829 CMI_PCQUEUEPOP_LOCK(current_queue
)
2830 ptr
= (MSG_LIST
*)PCQueuePop(current_queue
);
2831 CMI_PCQUEUEPOP_UNLOCK(current_queue
)
2832 if (ptr
== 0) break;
2835 int index
= queue
->smsg_head_index
;
2838 ptr
= queue
->smsg_msglist_index
[index
].sendSmsgBuf
;
2843 MACHSTATE5(8, "noempty-smsg %d (%d,%d,%d) tag=%d \n", ptr
->destNode
, buffered_send_msg
, buffered_recv_msg
, register_memory_size
, ptr
->tag
);
/* default to failure so an entry that is not attempted is treated as unsent */
2844 status
= GNI_RC_ERROR_RESOURCE
;
2845 if (useDynamicSMSG
&& smsg_connected_flag
[index
] != 2) {
2846 /* connection not exists yet */
2851 case SMALL_DATA_TAG
:
2852 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, ptr
->size
, ptr
->tag
, 1, ptr
);
2853 if(status
== GNI_RC_SUCCESS
)
2859 control_msg_tmp
= (CONTROL_MSG
*)ptr
->msg
;
2860 status
= send_large_messages( queue
, ptr
->destNode
, control_msg_tmp
, 1, ptr
);
2863 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, ptr
->size
, ptr
->tag
, 1, ptr
);
2864 if(status
== GNI_RC_SUCCESS
) FreeAckMsg((ACK_MSG
*)ptr
->msg
);
2867 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, ptr
->size
, ptr
->tag
, 1, ptr
);
2868 if(status
== GNI_RC_SUCCESS
)
2870 FreeControlMsg((CONTROL_MSG
*)ptr
->msg
);
2873 #if CMK_PERSISTENT_COMM
2875 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, ptr
->size
, ptr
->tag
, 1, ptr
);
2876 if(status
== GNI_RC_SUCCESS
)
2878 FreeControlMsg((CONTROL_MSG
*)ptr
->msg
);
2883 case DIRECT_PUT_DONE_TAG
:
2884 status
= send_smsg_message(queue
, ptr
->destNode
, ptr
->msg
, sizeof(CMK_DIRECT_HEADER
), ptr
->tag
, 1, ptr
);
2885 if(status
== GNI_RC_SUCCESS
)
2887 free((CMK_DIRECT_HEADER
*)ptr
->msg
);
2893 printf("Weird tag\n");
2894 CmiAbort("should not happen\n");
2896 if(status
== GNI_RC_SUCCESS
)
2899 buffered_smsg_counter
--;
2900 printf("[%d==>%d] buffered smsg sending done\n", myrank
, ptr
->destNode
);
/* linked-list layout: unlink the entry that was sent */
2906 ptr
= pre
->next
= ptr
->next
;
2909 ptr
= queue
->smsg_msglist_index
[index
].sendSmsgBuf
= queue
->smsg_msglist_index
[index
].sendSmsgBuf
->next
;
2911 FreeMsgList(tmp_ptr
);
2915 #if CMI_EXERT_SEND_CAP
2917 if(sent_cnt
== SEND_CAP
)
/* not sent: put the entry back on the queue it came from */
2923 PCQueuePush(queue
->sendMsgBuf
, (char*)ptr
);
2925 PCQueuePush(current_queue
, (char*)ptr
);
2932 if(status
== GNI_RC_ERROR_RESOURCE
)
2934 #if CMK_SMP && ONE_SEND_QUEUE
/* remember that this destination cannot accept more for the rest of this call */
2935 destpe_avail
[ptr
->destNode
] = 1;
2944 queue
->smsg_msglist_index
[index
].tail
= pre
;
/* linked-list layout: drop a destination whose list became empty from the index chain */
2945 if(queue
->smsg_msglist_index
[index
].sendSmsgBuf
== 0)
2947 if(index_previous
!= -1)
2948 queue
->smsg_msglist_index
[index_previous
].next
= queue
->smsg_msglist_index
[index
].next
;
2950 queue
->smsg_head_index
= queue
->smsg_msglist_index
[index
].next
;
2953 index_previous
= index
;
2955 index
= queue
->smsg_msglist_index
[index
].next
;
2957 #if !ONE_SEND_QUEUE && SMP_LOCKS
/* re-publish a list that still holds entries, at most once (pushed flag) */
2958 CmiLock(current_list
->lock
);
2959 if(!PCQueueEmpty(current_queue
) && current_list
->pushed
== 0)
2961 current_list
->pushed
= 1;
2962 PCQueuePush(nonEmptyQueues
, current_list
);
2964 CmiUnlock(current_list
->lock
);
2968 #if CMI_EXERT_SEND_CAP
2969 if(sent_cnt
== SEND_CAP
)
2972 } // end pooling for all cores
/* defined later in the file; called below when _detected_hang is set */
2976 static void ProcessDeadlock();
/*
 * One progress step of the machine layer.
 *
 * whileidle - non-zero enables the PumpDatagramConnection call when dynamic
 *             SMSG is in use.
 *
 * Order of work visible here: datagram connections (dynamic SMSG while
 * idle), local completions on default_tx_cqh and rdma_tx_cqh, CQ-write and
 * remote-event completions, then the buffered SMSG queues (smsg_queue is
 * tried after SendBufferMsg on smsg_oob_queue returned 1). With
 * CMK_SMP_TRACE_COMMTHREAD every stage is timed and reported through
 * traceUserBracketEvent when it took at least TRACE_THRESHOLD seconds.
 * NOTE(review): the calls that receive SMSG messages and send buffered RDMA
 * requests are only hinted at by the commented MACHSTATE lines - confirm.
 */
2977 void LrtsAdvanceCommunication(int whileidle
)
2979 static int count
= 0;
2980 /* Receive Msg first */
2981 #if CMK_SMP_TRACE_COMMTHREAD
2982 double startT
, endT
;
2984 if (useDynamicSMSG
&& whileidle
)
2986 #if CMK_SMP_TRACE_COMMTHREAD
2987 startT
= CmiWallTimer();
2989 PumpDatagramConnection();
2990 #if CMK_SMP_TRACE_COMMTHREAD
2991 endT
= CmiWallTimer();
2992 if (endT
-startT
>=TRACE_THRESHOLD
) traceUserBracketEvent(event_SetupConnect
, startT
, endT
);
2996 #if CMK_SMP_TRACE_COMMTHREAD
2997 startT
= CmiWallTimer();
3000 //MACHSTATE(8, "after PumpNetworkSmsg \n") ;
3001 #if CMK_SMP_TRACE_COMMTHREAD
3002 endT
= CmiWallTimer();
3003 if (endT
-startT
>=TRACE_THRESHOLD
) traceUserBracketEvent(event_PumpSmsg
, startT
, endT
);
3006 #if CMK_SMP_TRACE_COMMTHREAD
3007 startT
= CmiWallTimer();
/* completions of transactions posted on the default transmit CQ */
3009 PumpLocalTransactions(default_tx_cqh
, default_tx_cq_lock
);
3010 //MACHSTATE(8, "after PumpLocalTransactions\n") ;
3011 #if CMK_SMP_TRACE_COMMTHREAD
3012 endT
= CmiWallTimer();
3013 if (endT
-startT
>=TRACE_THRESHOLD
) traceUserBracketEvent(event_PumpTransaction
, startT
, endT
);
3016 #if CMK_SMP_TRACE_COMMTHREAD
3017 startT
= CmiWallTimer();
/* completions of transactions posted on the RDMA transmit CQ */
3019 PumpLocalTransactions(rdma_tx_cqh
, rdma_tx_cq_lock
);
3022 PumpCqWriteTransactions();
3026 PumpRemoteTransactions();
3029 //MACHSTATE(8, "after PumpLocalTransactions\n") ;
3030 #if CMK_SMP_TRACE_COMMTHREAD
3031 endT
= CmiWallTimer();
3032 if (endT
-startT
>=TRACE_THRESHOLD
) traceUserBracketEvent(event_PumpRdmaTransaction
, startT
, endT
);
3035 /* Send buffered Message */
3036 #if CMK_SMP_TRACE_COMMTHREAD
3037 startT
= CmiWallTimer();
/* the regular queue is attempted only after the out-of-band queue reported 1 */
3040 if (SendBufferMsg(&smsg_oob_queue
) == 1)
3043 SendBufferMsg(&smsg_queue
);
3045 //MACHSTATE(8, "after SendBufferMsg\n") ;
3046 #if CMK_SMP_TRACE_COMMTHREAD
3047 endT
= CmiWallTimer();
3048 if (endT
-startT
>=TRACE_THRESHOLD
) traceUserBracketEvent(event_SendBufferSmsg
, startT
, endT
);
3051 #if CMK_SMP_TRACE_COMMTHREAD
3052 startT
= CmiWallTimer();
3055 //MACHSTATE(8, "after SendRdmaMsg\n") ;
3056 #if CMK_SMP_TRACE_COMMTHREAD
3057 endT
= CmiWallTimer();
3058 if (endT
-startT
>=TRACE_THRESHOLD
) traceUserBracketEvent(event_SendFmaRdmaMsg
, startT
, endT
);
3061 #if CMK_SMP && ! LARGEPAGE
3062 if (_detected_hang
) ProcessDeadlock();
/*
 * Set up SMSG for on-demand (dynamic) connections.
 *
 * Allocates the per-rank attribute vectors and connection flags (all cleared),
 * chooses SMSG_MAX_MSG from the job size, asks GNI_SmsgBufferSizeNeeded for
 * the per-connection mailbox size and allocates one mailbox block of
 * smsg_memlen * avg_smsg_connection bytes aligned to 64 bytes. The block is
 * registered against smsg_rx_cqh, an unbound endpoint is created on
 * default_tx_cqh and a datagram carrying the local SMSG attributes is posted
 * on it with myrank as id.
 * NOTE(review): the result of posix_memalign is not checked before bzero,
 * whereas _init_static_smsg asserts ret == 0 - consider doing the same here.
 * NOTE(review): the GNI_RC_CHECK label after GNI_SmsgBufferSizeNeeded names a
 * different call than the one made.
 */
3066 /* useDynamicSMSG */
3067 static void _init_dynamic_smsg()
3069 gni_return_t status
;
3070 uint32_t vmdh_index
= -1;
3073 smsg_attr_vector_local
= (gni_smsg_attr_t
**)malloc(mysize
* sizeof(gni_smsg_attr_t
*));
3074 smsg_attr_vector_remote
= (gni_smsg_attr_t
**)malloc(mysize
* sizeof(gni_smsg_attr_t
*));
3075 smsg_connected_flag
= (int*)malloc(sizeof(int)*mysize
);
/* no connection exists yet to any rank */
3076 for(i
=0; i
<mysize
; i
++) {
3077 smsg_connected_flag
[i
] = 0;
3078 smsg_attr_vector_local
[i
] = NULL
;
3079 smsg_attr_vector_remote
[i
] = NULL
;
/* maximum SMSG payload shrinks as the number of ranks grows */
3083 SMSG_MAX_MSG
= 4096;
3084 }else if (mysize
<= 4096)
3086 SMSG_MAX_MSG
= 4096/mysize
* 1024;
3087 }else if (mysize
<= 16384)
3094 send_smsg_attr
.msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
3095 send_smsg_attr
.mbox_maxcredit
= SMSG_MAX_CREDIT
;
3096 send_smsg_attr
.msg_maxsize
= SMSG_MAX_MSG
;
3097 status
= GNI_SmsgBufferSizeNeeded(&send_smsg_attr
, &smsg_memlen
);
3098 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status
);
/* first mailbox block: room for avg_smsg_connection connections */
3100 mailbox_list
= (dynamic_smsg_mailbox_t
*)malloc(sizeof(dynamic_smsg_mailbox_t
));
3101 mailbox_list
->size
= smsg_memlen
*avg_smsg_connection
;
3102 posix_memalign(&mailbox_list
->mailbox_base
, 64, mailbox_list
->size
);
3103 bzero(mailbox_list
->mailbox_base
, mailbox_list
->size
);
3104 mailbox_list
->offset
= 0;
3105 mailbox_list
->next
= 0;
3107 status
= GNI_MemRegister(nic_hndl
, (uint64_t)(mailbox_list
->mailbox_base
),
3108 mailbox_list
->size
, smsg_rx_cqh
,
3111 &(mailbox_list
->mem_hndl
));
3112 GNI_RC_CHECK("MEMORY registration for smsg", status
);
3114 status
= GNI_EpCreate(nic_hndl
, default_tx_cqh
, &ep_hndl_unbound
);
3115 GNI_RC_CHECK("Unbound EP", status
);
3117 alloc_smsg_attr(&send_smsg_attr
);
/* post a datagram on the unbound endpoint, identified by this rank */
3119 status
= GNI_EpPostDataWId (ep_hndl_unbound
, &send_smsg_attr
, SMSG_ATTR_SIZE
, &recv_smsg_attr
, SMSG_ATTR_SIZE
, myrank
);
3120 GNI_RC_CHECK("post unbound datagram", status
);
3122 /* always pre-connect to proc 0 */
3123 //if (myrank != 0) connect_to(0);
/*
 * Set up SMSG with one statically allocated mailbox per rank.
 *
 * Chooses SMSG_MAX_MSG from the job size (CHARM_UGNI_SMSG_MAX_SIZE overrides
 * it), allocates and registers a single 64-byte aligned buffer of
 * smsg_memlen * mysize bytes against smsg_rx_cqh, exchanges its address and
 * memory handle with all ranks through allgather, and initialises the SMSG
 * channel to every other rank with GNI_SmsgInit. Locally rank i uses the
 * mailbox at offset i * smsg_memlen; in the buffer of the peer this rank
 * uses offset myrank * smsg_memlen. Finally the retransmit limit is set to
 * 4096 with GNI_SmsgSetMaxRetrans.
 */
3126 static void _init_static_smsg()
3128 gni_smsg_attr_t
*smsg_attr
;
3129 gni_smsg_attr_t remote_smsg_attr
;
3130 gni_smsg_attr_t
*smsg_attr_vec
;
3131 gni_mem_handle_t my_smsg_mdh_mailbox
;
3133 gni_return_t status
;
3134 uint32_t vmdh_index
= -1;
3135 mdh_addr_t base_infor
;
3136 mdh_addr_t
*base_addr_vec
;
3141 SMSG_MAX_MSG
= 1024;
3142 }else if (mysize
<= 4096)
3144 SMSG_MAX_MSG
= 1024;
3145 }else if (mysize
<= 16384)
/* the environment variable takes precedence over the size-based choice */
3152 env
= getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3153 if (env
) SMSG_MAX_MSG
= atoi(env
);
3154 CmiAssert(SMSG_MAX_MSG
> 0);
3156 smsg_attr
= malloc(mysize
* sizeof(gni_smsg_attr_t
));
/* entry 0 is filled first only to query the mailbox size needed per connection */
3158 smsg_attr
[0].msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
3159 smsg_attr
[0].mbox_maxcredit
= SMSG_MAX_CREDIT
;
3160 smsg_attr
[0].msg_maxsize
= SMSG_MAX_MSG
;
3161 status
= GNI_SmsgBufferSizeNeeded(&smsg_attr
[0], &smsg_memlen
);
3162 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status
);
/* one contiguous, zeroed buffer holds the mailboxes for all ranks */
3163 ret
= posix_memalign(&smsg_mailbox_base
, 64, smsg_memlen
*(mysize
));
3164 CmiAssert(ret
== 0);
3165 bzero(smsg_mailbox_base
, smsg_memlen
*(mysize
));
3167 status
= GNI_MemRegister(nic_hndl
, (uint64_t)smsg_mailbox_base
,
3168 smsg_memlen
*(mysize
), smsg_rx_cqh
,
3171 &my_smsg_mdh_mailbox
);
3172 register_memory_size
+= smsg_memlen
*(mysize
);
3173 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status
);
3175 if (myrank
== 0) printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen
*(mysize
)/1024);
3176 if (myrank
== 0 && register_memory_size
>=MAX_REG_MEM
) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX to a larger value or use Dynamic smsg\n");
/* publish base address and memory handle of the local mailbox buffer to every rank */
3178 base_infor
.addr
= (uint64_t)smsg_mailbox_base
;
3179 base_infor
.mdh
= my_smsg_mdh_mailbox
;
3180 base_addr_vec
= malloc(mysize
* sizeof(mdh_addr_t
));
3182 allgather(&base_infor
, base_addr_vec
, sizeof(mdh_addr_t
));
/* local side: rank i owns the slice starting at i * smsg_memlen */
3184 for(i
=0; i
<mysize
; i
++)
3188 smsg_attr
[i
].msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
3189 smsg_attr
[i
].mbox_maxcredit
= SMSG_MAX_CREDIT
;
3190 smsg_attr
[i
].msg_maxsize
= SMSG_MAX_MSG
;
3191 smsg_attr
[i
].mbox_offset
= i
*smsg_memlen
;
3192 smsg_attr
[i
].buff_size
= smsg_memlen
;
3193 smsg_attr
[i
].msg_buffer
= smsg_mailbox_base
;
3194 smsg_attr
[i
].mem_hndl
= my_smsg_mdh_mailbox
;
3197 for(i
=0; i
<mysize
; i
++)
3199 if (myrank
== i
) continue;
/* remote side: this rank owns the slice at myrank * smsg_memlen in the buffer of rank i */
3201 remote_smsg_attr
.msg_type
= GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT
;
3202 remote_smsg_attr
.mbox_maxcredit
= SMSG_MAX_CREDIT
;
3203 remote_smsg_attr
.msg_maxsize
= SMSG_MAX_MSG
;
3204 remote_smsg_attr
.mbox_offset
= myrank
*smsg_memlen
;
3205 remote_smsg_attr
.buff_size
= smsg_memlen
;
3206 remote_smsg_attr
.msg_buffer
= (void*)base_addr_vec
[i
].addr
;
3207 remote_smsg_attr
.mem_hndl
= base_addr_vec
[i
].mdh
;
3209 /* initialize the smsg channel */
3210 status
= GNI_SmsgInit(ep_hndl_array
[i
], &smsg_attr
[i
], &remote_smsg_attr
);
3211 GNI_RC_CHECK("SMSG Init", status
);
3212 } //end initialization
3214 free(base_addr_vec
);
3217 status
= GNI_SmsgSetMaxRetrans(nic_hndl
, 4096);
3218 GNI_RC_CHECK("SmsgSetMaxRetrans Init", status
);
/*
 * Initialise the buffering structures of one SMSG send queue.
 *
 * queue - queue to initialise.
 *
 * Depending on the build this creates a single sendMsgBuf queue together
 * with the destpe_avail array, or an array of mysize MSG_LIST_INDEX entries
 * whose sendSmsgBuf is either a PCQueue (with pushed flag and lock) or an
 * empty linked list (sendSmsgBuf = 0, next = -1) headed by smsg_head_index.
 * NOTE(review): the conditional lines choosing between these layouts are not
 * shown here - confirm against the full file.
 */
3222 static void _init_send_queue(SMSG_QUEUE
*queue
)
3226 queue
->sendMsgBuf
= PCQueueCreate();
3227 destpe_avail
= (char*)malloc(mysize
* sizeof(char));
3229 queue
->smsg_msglist_index
= (MSG_LIST_INDEX
*)malloc(mysize
*sizeof(MSG_LIST_INDEX
));
3230 #if CMK_SMP && SMP_LOCKS
3231 nonEmptyQueues
= PCQueueCreate();
3233 for(i
=0; i
<mysize
; i
++)
3236 queue
->smsg_msglist_index
[i
].sendSmsgBuf
= PCQueueCreate();
3238 queue
->smsg_msglist_index
[i
].pushed
= 0;
3239 queue
->smsg_msglist_index
[i
].lock
= CmiCreateLock();
/* linked-list layout: empty list for this destination, not chained to any other */
3242 queue
->smsg_msglist_index
[i
].sendSmsgBuf
= 0;
3243 queue
->smsg_msglist_index
[i
].next
= -1;
3244 queue
->smsg_head_index
= -1;
/*
 * Initialise SMSG: runs the dynamic or the static setup, then prepares both
 * send queues (smsg_queue and smsg_oob_queue).
 * NOTE(review): the condition selecting dynamic versus static setup is not
 * shown here; presumably it is useDynamicSMSG - confirm.
 */
3252 static void _init_smsg()
3256 _init_dynamic_smsg();
3258 _init_static_smsg();
3261 _init_send_queue(&smsg_queue
);
3263 _init_send_queue(&smsg_oob_queue
);
/*
 * Initialise the GNI message queue (MSGQ) facility.
 * Fills msgq_attrs with fixed settings (maximum message size MSGQ_MAXSIZE,
 * receive CQ size REMOTE_QUEUE_ENTRIES) and calls GNI_MsgqInit, storing the
 * result in msgq_handle; a failure is reported through GNI_RC_CHECK.
 */
3267 static void _init_static_msgq()
3269 gni_return_t status
;
3270 /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The performance scales by the node count rather than rank count */
3271 msgq_attrs
.max_msg_sz
= MSGQ_MAXSIZE
;
3272 msgq_attrs
.smsg_q_sz
= 1;
3273 msgq_attrs
.rcv_pool_sz
= 1;
3274 msgq_attrs
.num_msgq_eps
= 2;
3275 msgq_attrs
.nloc_insts
= 8;
3276 msgq_attrs
.modes
= 0;
3277 msgq_attrs
.rcv_cq_sz
= REMOTE_QUEUE_ENTRIES
;
3279 status
= GNI_MsgqInit(nic_hndl
, NULL
, NULL
, NULL
, &msgq_attrs
, &msgq_handle
);
3280 GNI_RC_CHECK("MSGQ Init", status
);
/* running total of bytes requested through alloc_mempool_block */
3286 static CmiUInt8 total_mempool_size
= 0;
/* number of alloc_mempool_block calls so far */
3287 static CmiUInt8 total_mempool_calls
= 0;
3289 #if USE_LRTS_MEMPOOL
/*
 * Block allocator handed to mempool_init (see LrtsPreCommonInit).
 *
 * size        - in: requested bytes; out: bytes actually allocated. The
 *               request is raised to _expand_mem (expand_flag set) or
 *               _mempool_size (otherwise) and then rounded with
 *               ALIGNHUGEPAGE.
 * mem_hndl    - out: memory handle of the block; set to zero with
 *               SetMemHndlZero on one of the paths below.
 * expand_flag - non-zero when the pool is being expanded; in that case a
 *               block larger than MAX_REG_MEM or MAX_BUFF_SEND aborts.
 *
 * The memory comes from my_get_huge_pages or posix_memalign depending on the
 * build; on failure the CMK_SMP and STEAL_MEMPOOL build first tries
 * steal_mempool_block and otherwise the function aborts. The block is then
 * registered with MEMORY_REGISTER against rdma_rx_cqh.
 * NOTE(review): the conditional lines deciding when registration happens and
 * when the handle is zeroed are not shown here - confirm. The return
 * statement of the normal path is also not shown; presumably pool.
 */
3290 void *alloc_mempool_block(size_t *size
, gni_mem_handle_t
*mem_hndl
, int expand_flag
)
3294 gni_return_t status
= GNI_RC_SUCCESS
;
/* never allocate less than the configured initial or expansion size */
3296 size_t default_size
= expand_flag
? _expand_mem
: _mempool_size
;
3297 if (*size
< default_size
) *size
= default_size
;
3299 // round up to be multiple of _tlbpagesize
3300 //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3301 *size
= ALIGNHUGEPAGE(*size
);
3303 total_mempool_size
+= *size
;
3304 total_mempool_calls
+= 1;
3306 if ((*size
> MAX_REG_MEM
|| *size
> MAX_BUFF_SEND
) && expand_flag
)
3308 printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size
, MAX_REG_MEM
, MAX_BUFF_SEND
);
3309 CmiAbort("alloc_mempool_block");
3313 pool
= my_get_huge_pages(*size
);
3316 ret
= posix_memalign(&pool
, ALIGNBUF
, *size
);
3319 #if CMK_SMP && STEAL_MEMPOOL
/* allocation failed: try to take over an existing block before giving up */
3320 pool
= steal_mempool_block(size
, mem_hndl
);
3321 if (pool
!= NULL
) return pool
;
3323 printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size
)/1024/1024);
3325 CmiAbort("alloc_mempool_block: out of memory.");
3327 CmiAbort("alloc_mempool_block: posix_memalign failed");
3332 MEMORY_REGISTER(onesided_hnd
, nic_hndl
, pool
, *size
, mem_hndl
, &omdh
, rdma_rx_cqh
, status
);
3334 if(status
!= GNI_RC_SUCCESS
) {
3335 printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank
, CmiMyRank(), register_memory_size
/(1024*1024.0*1024),register_count
, *size
);
3336 sweep_mempool(CpvAccess(mempool
));
3338 GNI_RC_CHECK("MEMORY_REGISTER", status
);
3340 SetMemHndlZero((*mem_hndl
));
/*
 * Block release callback handed to mempool_init (see LrtsPreCommonInit).
 *
 * ptr      - head of the block to release; its size is read from the block
 *            header with GetSizeFromBlockHeader.
 * mem_hndl - memory handle of the block; the block is deregistered only when
 *            the handle is non-zero.
 *
 * NOTE(review): my_free_huge_pages is presumably the huge-page variant of
 * the release; the alternative for other builds is not shown here - confirm.
 */
3345 // ptr is a block head pointer
3346 void free_mempool_block(void *ptr
, gni_mem_handle_t mem_hndl
)
3348 if(!(IsMemHndlZero(mem_hndl
)))
3350 MEMORY_DEREGISTER(onesided_hnd
, nic_hndl
, &mem_hndl
, &omdh
, GetSizeFromBlockHeader(ptr
));
3353 my_free_huge_pages(ptr
, GetSizeFromBlockHeader(ptr
));
/*
 * Machine-layer hook: with USE_LRTS_MEMPOOL it creates the mempool Cpv
 * variable and initialises it with mempool_init, using _mempool_size as
 * initial size, alloc_mempool_block / free_mempool_block as block callbacks
 * and _mempool_size_limit as limit.
 *
 * everReturn - not used in the lines shown here.
 */
3360 void LrtsPreCommonInit(int everReturn
){
3361 #if USE_LRTS_MEMPOOL
3362 CpvInitialize(mempool_type
*, mempool
);
3363 CpvAccess(mempool
) = mempool_init(_mempool_size
, alloc_mempool_block
, free_mempool_block
, _mempool_size_limit
);
3364 MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool
)) ;
3368 void LrtsInit(int *argc
, char ***argv
, int *numNodes
, int *myNodeID
)
3373 unsigned int remote_addr
;
3374 gni_cdm_handle_t cdm_hndl
;
3375 gni_return_t status
= GNI_RC_SUCCESS
;
3376 uint32_t vmdh_index
= -1;
3378 unsigned int local_addr
, *MPID_UGNI_AllAddr
;
3383 //void (*local_event_handler)(gni_cq_entry_t *, void *) = &LocalEventHandle;
3384 //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3385 //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *) = &RemoteBteEventHandle;
3387 status
= PMI_Init(&first_spawned
);
3388 GNI_RC_CHECK("PMI_Init", status
);
3390 status
= PMI_Get_size(&mysize
);
3391 GNI_RC_CHECK("PMI_Getsize", status
);
3393 status
= PMI_Get_rank(&myrank
);
3394 GNI_RC_CHECK("PMI_getrank", status
);
3396 //physicalID = CmiPhysicalNodeID(myrank);
3398 //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3403 #if MULTI_THREAD_SEND
3404 /* Currently, we only consider the case that comm. thread will only recv msgs */
3405 Cmi_smp_mode_setting
= COMM_WORK_THREADS_SEND_RECV
;
3408 env
= getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3409 if (env
) REMOTE_QUEUE_ENTRIES
= atoi(env
);
3410 CmiGetArgInt(*argv
,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES
);
3412 env
= getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3413 if (env
) LOCAL_QUEUE_ENTRIES
= atoi(env
);
3414 CmiGetArgInt(*argv
,"+useSendQueue", &LOCAL_QUEUE_ENTRIES
);
3416 env
= getenv("CHARM_UGNI_DYNAMIC_SMSG");
3417 if (env
) useDynamicSMSG
= 1;
3418 if (!useDynamicSMSG
)
3419 useDynamicSMSG
= CmiGetArgFlag(*argv
, "+useDynamicSmsg");
3420 CmiGetArgIntDesc(*argv
, "+smsgConnection", &avg_smsg_connection
,"Initial number of SMSGS connection per code");
3421 if (avg_smsg_connection
>mysize
) avg_smsg_connection
= mysize
;
3422 //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3426 printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize
);
3427 printf("Charm++> %s SMSG\n", useDynamicSMSG
?"dynamic":"static");
3430 onesided_init(NULL
, &onesided_hnd
);
3432 // this is a GNI test, so use the libonesided bypass functionality
3433 onesided_gni_bypass_get_nih(onesided_hnd
, &nic_hndl
);
3434 local_addr
= gniGetNicAddress();
3437 cookie
= get_cookie();
3439 modes
= GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT
;
3441 //Create and attach to the communication domain */
3442 status
= GNI_CdmCreate(myrank
, ptag
, cookie
, modes
, &cdm_hndl
);
3443 GNI_RC_CHECK("GNI_CdmCreate", status
);
3444 //* device id The device id is the minor number for the device
3445 //that is assigned to the device by the system when the device is created.
3446 //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3447 //where X is the device number 0 default
3448 status
= GNI_CdmAttach(cdm_hndl
, device_id
, &local_addr
, &nic_hndl
);
3449 GNI_RC_CHECK("GNI_CdmAttach", status
);
3450 local_addr
= get_gni_nic_address(0);
3452 MPID_UGNI_AllAddr
= (unsigned int *)malloc(sizeof(unsigned int) * mysize
);
3453 _MEMCHECK(MPID_UGNI_AllAddr
);
3454 allgather(&local_addr
, MPID_UGNI_AllAddr
, sizeof(unsigned int));
3455 /* create the local completion queue */
3456 /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3457 status
= GNI_CqCreate(nic_hndl
, LOCAL_QUEUE_ENTRIES
, 0, GNI_CQ_NOBLOCK
, NULL
, NULL
, &default_tx_cqh
);
3458 GNI_RC_CHECK("GNI_CqCreate (tx)", status
);
3460 status
= GNI_CqCreate(nic_hndl
, LOCAL_QUEUE_ENTRIES
, 0, GNI_CQ_NOBLOCK
, NULL
, NULL
, &rdma_tx_cqh
);
3461 GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status
);
3462 /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3464 status
= GNI_CqCreate(nic_hndl
, REMOTE_QUEUE_ENTRIES
, 0, GNI_CQ_NOBLOCK
, NULL
, NULL
, &smsg_rx_cqh
);
3465 GNI_RC_CHECK("Create CQ (rx)", status
);
3467 status
= GNI_CqCreate(nic_hndl
, REMOTE_QUEUE_ENTRIES
, 0, GNI_CQ_NOBLOCK
, NULL
, NULL
, &rdma_rx_cqh
);
3468 GNI_RC_CHECK("Create Post CQ (rx)", status
);
3470 //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3471 //GNI_RC_CHECK("Create BTE CQ", status);
3473 /* create the endpoints. they need to be bound to allow later CQWrites to them */
3474 ep_hndl_array
= (gni_ep_handle_t
*)malloc(mysize
* sizeof(gni_ep_handle_t
));
3475 _MEMCHECK(ep_hndl_array
);
3476 #if MULTI_THREAD_SEND
3477 rx_cq_lock
= global_gni_lock
= default_tx_cq_lock
= smsg_mailbox_lock
= CmiCreateLock();
3478 //default_tx_cq_lock = CmiCreateLock();
3479 rdma_tx_cq_lock
= CmiCreateLock();
3480 smsg_rx_cq_lock
= CmiCreateLock();
3481 //global_gni_lock = CmiCreateLock();
3482 //rx_cq_lock = CmiCreateLock();
3484 for (i
=0; i
<mysize
; i
++) {
3485 if(i
== myrank
) continue;
3486 status
= GNI_EpCreate(nic_hndl
, default_tx_cqh
, &ep_hndl_array
[i
]);
3487 GNI_RC_CHECK("GNI_EpCreate ", status
);
3488 remote_addr
= MPID_UGNI_AllAddr
[i
];
3489 status
= GNI_EpBind(ep_hndl_array
[i
], remote_addr
, i
);
3490 GNI_RC_CHECK("GNI_EpBind ", status
);
3493 /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3497 #if USE_LRTS_MEMPOOL
3498 env
= getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3500 _totalmem
= CmiReadSize(env
);
3502 printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem
*1.0/oneGB
));
3505 env
= getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3506 if (env
) _mempool_size
= CmiReadSize(env
);
3507 if (CmiGetArgStringDesc(*argv
,"+gni-mempool-init-size",&env
,"Set the memory pool size"))
3508 _mempool_size
= CmiReadSize(env
);
3511 env
= getenv("CHARM_UGNI_MEMPOOL_MAX");
3513 MAX_REG_MEM
= CmiReadSize(env
);
3516 if (CmiGetArgStringDesc(*argv
,"+gni-mempool-max",&env
,"Set the memory pool max size")) {
3517 MAX_REG_MEM
= CmiReadSize(env
);
3521 env
= getenv("CHARM_UGNI_SEND_MAX");
3523 MAX_BUFF_SEND
= CmiReadSize(env
);
3526 if (CmiGetArgStringDesc(*argv
,"+gni-mempool-max-send",&env
,"Set the memory pool max size for send")) {
3527 MAX_BUFF_SEND
= CmiReadSize(env
);
3531 env
= getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3533 _mempool_size_limit
= CmiReadSize(env
);
3536 if (MAX_REG_MEM
< _mempool_size
) MAX_REG_MEM
= _mempool_size
;
3537 if (MAX_BUFF_SEND
> MAX_REG_MEM
) MAX_BUFF_SEND
= MAX_REG_MEM
;
3540 printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size
/1024.0/1024, _mempool_size_limit
/1024.0/1024);
3541 printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM
/1024.0/1024, MAX_BUFF_SEND
/1024.0/1024);
3542 if (MAX_REG_MEM
< BIG_MSG
* 2 + oneMB
) {
3543 /* memblock can expand to BIG_MSG * 2 size */
3544 printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n", BIG_MSG
* 2.0/1024/1024 + 1);
3545 CmiAbort("mempool maximum size is too small. \n");
3547 #if MULTI_THREAD_SEND
3548 printf("Charm++> worker thread sending messages\n");
3549 #elif COMM_THREAD_SEND
3550 printf("Charm++> only comm thread send/recv messages\n");
3554 #endif /* end of USE_LRTS_MEMPOOL */
3556 env
= getenv("CHARM_UGNI_BIG_MSG_SIZE");
3558 BIG_MSG
= CmiReadSize(env
);
3559 if (BIG_MSG
< ONE_SEG
)
3560 CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3562 env
= getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3564 BIG_MSG_PIPELINE
= atoi(env
);
3567 env
= getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3568 if (env
) _checkProgress
= 0;
3569 if (mysize
== 1) _checkProgress
= 0;
3573 env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3575 _tlbpagesize = CmiReadSize(env);
3577 /* real gethugepagesize() is only available when hugetlb module linked */
3578 _tlbpagesize
= gethugepagesize();
3580 printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize
/1024.0);
3584 if (_tlbpagesize
== 4096) {
3585 CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3589 print_stats
= CmiGetArgFlag(*argv
, "+print_stats");
3591 stats_off
= CmiGetArgFlag(*argv
, "+stats_off");
3593 /* init DMA buffer for medium message */
3595 //_init_DMA_buffer();
3597 free(MPID_UGNI_AllAddr
);
3599 sendRdmaBuf
= PCQueueCreate();
3603 #if MACHINE_DEBUG_LOG
3605 sprintf(ln
,"debugLog.%d",myrank
);
3606 debugLog
=fopen(ln
,"w");
3612 int code
= mkdir("counters", 00777);
3613 sprintf(ln
,"counters/statistics.%d.%d", mysize
, myrank
);
3614 counterLog
=fopen(ln
,"w");
3618 // ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3621 IndexPool_init(&ackPool
);
/*
 * LrtsAlloc: allocate a message buffer with room for n_bytes of payload
 * preceded by `header` bytes, choosing the allocator by size.
 *
 * NOTE(review): this listing omits some lines of the function (braces,
 * #else/#endif, the declaration of ptr, the return), so the notes below
 * describe only what is visible.
 *
 *  - n_bytes <= SMSG_MAX_MSG: plain malloc(n_bytes + header).
 *  - otherwise header + sizeof(mempool_header) must fit in ALIGNBUF
 *    (asserted) and the resulting pointer is placed `header` bytes before
 *    an ALIGNBUF offset into the underlying allocation:
 *      * USE_LRTS_MEMPOOL, n_bytes < BIG_MSG: mempool_malloc();
 *      * USE_LRTS_MEMPOOL, larger sizes: my_get_huge_pages() or memalign()
 *        (the condition selecting between them is not visible here);
 *      * last visible branch (presumably the non-mempool build -- confirm):
 *        memalign().
 *  - with CMK_PERSISTENT_COMM, a non-NULL result has SetMemHndlZero applied
 *    to its MEMHFIELD.
 */
3629 void* LrtsAlloc(int n_bytes
, int header
)
3633 printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes
, header
, SMSG_MAX_MSG
);
3635 if(n_bytes
<= SMSG_MAX_MSG
)
3637 int totalsize
= n_bytes
+header
;
3638 ptr
= malloc(totalsize
);
3641 CmiAssert(header
+sizeof(mempool_header
) <= ALIGNBUF
);
3642 #if USE_LRTS_MEMPOOL
3643 n_bytes
= ALIGN64(n_bytes
);
3644 if(n_bytes
< BIG_MSG
)
3646 char *res
= mempool_malloc(CpvAccess(mempool
), ALIGNBUF
+n_bytes
-sizeof(mempool_header
), 1);
3647 if (res
) ptr
= res
- sizeof(mempool_header
) + ALIGNBUF
- header
;
3651 //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3652 n_bytes
= ALIGNHUGEPAGE(n_bytes
+ALIGNBUF
);
3653 char *res
= my_get_huge_pages(n_bytes
);
3655 char *res
= memalign(ALIGNBUF
, n_bytes
+ALIGNBUF
);
3657 if (res
) ptr
= res
+ ALIGNBUF
- header
;
/* NOTE(review): unlike the branches above, the memalign() result below is
 * used in pointer arithmetic without a NULL check. */
3660 n_bytes
= ALIGN64(n_bytes
); /* make sure size is aligned (via ALIGN64) */
3661 char *res
= memalign(ALIGNBUF
, n_bytes
+ALIGNBUF
);
3662 ptr
= res
+ ALIGNBUF
- header
;
3665 #if CMK_PERSISTENT_COMM
3666 if (ptr
) SetMemHndlZero(MEMHFIELD((char*)ptr
+header
));
/*
 * LrtsFree: release a message buffer.
 *
 * NOTE(review): this listing omits some lines of the function (braces,
 * #if/#else/#endif, the small-message release call), so only the visible
 * behaviour is described.
 *
 *  - The message size is read with SIZEFIELD at msg + sizeof(CmiChunkHeader).
 *  - With CMK_PERSISTENT_COMM, a message whose MEMHFIELD is not zero is left
 *    alone (early return).
 *  - Sizes above SMSG_MAX_MSG are rounded with ALIGN64 and the base of the
 *    underlying allocation is recovered as
 *    msg + sizeof(CmiChunkHeader) - ALIGNBUF, then released with
 *    my_free_huge_pages() / free(), or -- offset by sizeof(mempool_header) --
 *    with mempool_free_thread() / mempool_free().  The conditions choosing
 *    between these calls are not visible here.
 */
3671 void LrtsFree(void *msg
)
3673 CmiUInt4 size
= SIZEFIELD((char*)msg
+sizeof(CmiChunkHeader
));
3674 #if CMK_PERSISTENT_COMM
3675 if (!IsMemHndlZero(MEMHFIELD((char*)msg
+sizeof(CmiChunkHeader
)))) return;
3677 if (size
<= SMSG_MAX_MSG
)
3680 size
= ALIGN64(size
);
3684 int s
= ALIGNHUGEPAGE(size
+ALIGNBUF
);
3685 my_free_huge_pages((char*)msg
+ sizeof(CmiChunkHeader
) - ALIGNBUF
, s
);
3687 free((char*)msg
+ sizeof(CmiChunkHeader
) - ALIGNBUF
);
3691 #if USE_LRTS_MEMPOOL
3693 mempool_free_thread((char*)msg
+ sizeof(CmiChunkHeader
) - ALIGNBUF
+ sizeof(mempool_header
));
3695 mempool_free(CpvAccess(mempool
), (char*)msg
+ sizeof(CmiChunkHeader
) - ALIGNBUF
+ sizeof(mempool_header
));
3698 free((char*)msg
+ sizeof(CmiChunkHeader
) - ALIGNBUF
);
/*
 * Cleanup fragment (the enclosing function header is not shown in this
 * listing).  The first test compares this thread's rank with
 * CmiMyNodeSize() -- presumably selecting the communication thread; confirm.
 * Which statements it guards is not visible here.  Communication statistics
 * are printed when print_stats is set, and with USE_LRTS_MEMPOOL the mempool
 * obtained through CpvAccess(mempool) is destroyed.
 */
3708 if(CmiMyRank() == CmiMyNodeSize())
3710 if (print_stats
) print_comm_stats();
3713 #if USE_LRTS_MEMPOOL
3714 //printf("FINAL [%d, %d] register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg);
3715 mempool_destroy(CpvAccess(mempool
));
/*
 * LrtsDrainResources: push outstanding communication work through.
 *
 * Returns immediately when mysize == 1.  Otherwise the visible statements
 * call SendBufferMsg() on smsg_oob_queue and smsg_queue (their negated
 * results are OR-ed inside a condition whose enclosing statement is not
 * shown in this listing), PumpDatagramConnection(), and
 * PumpLocalTransactions() on both the default and the RDMA transmit
 * completion queues.
 */
3721 void LrtsDrainResources()
3723 if(mysize
== 1) return;
3726 !SendBufferMsg(&smsg_oob_queue
) ||
3728 !SendBufferMsg(&smsg_queue
)
3732 PumpDatagramConnection();
3734 PumpLocalTransactions(default_tx_cqh
, default_tx_cq_lock
);
3735 PumpLocalTransactions(rdma_tx_cqh
, rdma_tx_cq_lock
);
3741 void LrtsAbort(const char *message
) {
3742 fprintf(stderr
, "[%d] CmiAbort: %s\n", myrank
, message
);
3743 CmiPrintStackTrace(0);
3744 PMI_Abort(-1, message
);
3747 /************************** TIMER FUNCTIONS **************************/
3748 #if CMK_TIMER_USE_SPECIAL
3749 /* MPI calls are not threadsafe, even the timer on some machines */
/* Lock for the timer functions; initialised to 0 here and no assignment to
 * it is visible in this listing. */
3750 static CmiNodeLock timerLock
= 0;
/* Set by CmiTimerInit from the "+useAbsoluteTime" flag; when non-zero the
 * timer functions return the raw clock reading instead of the time elapsed
 * since start_ts. */
3751 static int _absoluteTime
= 0;
/* Set by CmiTimerInit from CmiTimerIsSynchronized(). */
3752 static int _is_global
= 0;
/* CLOCK_MONOTONIC reading captured in CmiTimerInit; the reference point for
 * the relative timers and the value reported by CmiInitTime(). */
3753 static struct timespec start_ts
;
/* CmiTimerIsSynchronized: CmiTimerInit stores this function's result in
 * _is_global.  NOTE(review): the body is not shown in this listing --
 * confirm the value returned. */
3755 inline int CmiTimerIsSynchronized() {
3759 inline int CmiTimerAbsolute() {
3760 return _absoluteTime
;
/* CmiStartTimer: returns a double.  NOTE(review): the body is not shown in
 * this listing -- confirm the value returned. */
3763 double CmiStartTimer() {
3767 double CmiInitTime() {
3768 return (double)(start_ts
.tv_sec
)+(double)start_ts
.tv_nsec
/1000000000.0;
/*
 * CmiTimerInit: initialise the timer state used by the timer functions.
 *
 * argv: command-line arguments, scanned for "+useAbsoluteTime".
 *
 * Records the flag in _absoluteTime (PE 0 prints a notice when it is set),
 * stores CmiTimerIsSynchronized() in _is_global, captures the start time
 * into start_ts with clock_gettime(CLOCK_MONOTONIC), and finishes with
 * CmiNodeAllBarrier().
 *
 * NOTE(review): lines between the visible statements are missing from this
 * listing (e.g. the condition opening the first branch); the visible
 * structure is that rank 0 captures start_ts in one branch and the else
 * branch captures it unconditionally.
 */
3771 void CmiTimerInit(char **argv
) {
3772 _absoluteTime
= CmiGetArgFlagDesc(argv
,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
3773 if (_absoluteTime
&& CmiMyPe() == 0)
3774 printf("Charm++> absolute timer is used\n");
3776 _is_global
= CmiTimerIsSynchronized();
3780 if (CmiMyRank() == 0) {
3781 clock_gettime(CLOCK_MONOTONIC
, &start_ts
);
3783 } else { /* we don't have a synchronous timer, set our own start time */
3787 clock_gettime(CLOCK_MONOTONIC
, &start_ts
);
3789 CmiNodeAllBarrier(); /* for smp */
3793 * Since the timerLock is never created, and is
3794 * always NULL, then all the if-condition inside
3795 * the timer functions could be disabled right
3796 * now in the case of SMP.
3798 double CmiTimer(void) {
3799 struct timespec now_ts
;
3800 clock_gettime(CLOCK_MONOTONIC
, &now_ts
);
3801 return _absoluteTime
?((double)(now_ts
.tv_sec
)+(double)now_ts
.tv_nsec
/1000000000.0)
3802 : (double)( now_ts
.tv_sec
- start_ts
.tv_sec
) + (((double) now_ts
.tv_nsec
- (double) start_ts
.tv_nsec
) / 1000000000.0);
3805 double CmiWallTimer(void) {
3806 struct timespec now_ts
;
3807 clock_gettime(CLOCK_MONOTONIC
, &now_ts
);
3808 return _absoluteTime
?((double)(now_ts
.tv_sec
)+(double)now_ts
.tv_nsec
/1000000000.0)
3809 : ( now_ts
.tv_sec
- start_ts
.tv_sec
) + ((now_ts
.tv_nsec
- start_ts
.tv_nsec
) / 1000000000.0);
3812 double CmiCpuTimer(void) {
3813 struct timespec now_ts
;
3814 clock_gettime(CLOCK_MONOTONIC
, &now_ts
);
3815 return _absoluteTime
?((double)(now_ts
.tv_sec
)+(double)now_ts
.tv_nsec
/1000000000.0)
3816 : (double)( now_ts
.tv_sec
- start_ts
.tv_sec
) + (((double) now_ts
.tv_nsec
- (double) start_ts
.tv_nsec
) / 1000000000.0);
3820 /************Barrier Related Functions****************/
/*
 * Barrier fragment (the enclosing function header is not shown in this
 * listing).  Visible sequence: a node-level CmiNodeAllBarrier(), rank tests
 * against CmiMyNodeSize() and 0 (the statements they guard are not all
 * visible), a job-wide PMI_Barrier(), and a closing CmiNodeAllBarrier().
 *
 * NOTE(review): the PMI_Barrier() result is stored in a gni_return_t and
 * checked with GNI_RC_CHECK -- confirm the PMI and GNI status codes are
 * compatible for this check.
 */
3824 gni_return_t status
;
3827 /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks */
3828 CmiNodeAllBarrier();
3829 if (CmiMyRank() == CmiMyNodeSize())
3831 if (CmiMyRank() == 0)
3835 * The call of CmiBarrier is usually before the initialization
3836 * of trace module of Charm++, therefore, the START_EVENT
3837 * and END_EVENT are disabled here. -Chao Mei
3840 status
= PMI_Barrier();
3841 GNI_RC_CHECK("PMI_Barrier", status
);
3844 CmiNodeAllBarrier();
3849 #include "machine-cmidirect.c"
3851 #if CMK_PERSISTENT_COMM
3852 #include "machine-persistent.c"