fix a bug in error checking for overflow
[charm.git] / src / arch / gemini_gni / machine.c
blob5688ce715267f7ddaf68196af1f1213bd7c8eeed
2 /** @file
3 * Gemini GNI machine layer
5 * Author: Yanhua Sun
6 Gengbin Zheng
7 * Date: 07-01-2011
9 * Flow control by mem pool using environment variables:
11 # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12 # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13 export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14 export CHARM_UGNI_MEMPOOL_MAX=20M
15 export CHARM_UGNI_SEND_MAX=10M
17 # limit on the total mempool size allocated; this prevents the mempool
18 # from using too much memory
19 export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M
21 other environment variables:
23 export CHARM_UGNI_NO_DEADLOCK_CHECK=yes # disable checking deadlock
24 export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G # max memory per node for mempool
25 export CHARM_UGNI_BIG_MSG_SIZE=4M # set big message size protocol
26 export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4 # set big message pipe len
28 /*@{*/
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
38 #include <gni_pub.h>
39 #include <pmi.h>
40 //#include <numatoolkit.h>
42 #include "converse.h"
44 #if CMK_DIRECT
45 #include "cmidirect.h"
46 #endif
48 #define LARGEPAGE 0
50 #if CMK_SMP
51 #define MULTI_THREAD_SEND 0
52 #define COMM_THREAD_SEND 1
53 #endif
55 #if MULTI_THREAD_SEND
56 #define CMK_WORKER_SINGLE_TASK 0
57 #endif
59 #define REMOTE_EVENT 0
60 #define CQWRITE 0
62 #define CMI_EXERT_SEND_CAP 0
63 #define CMI_EXERT_RECV_CAP 0
65 #if CMI_EXERT_SEND_CAP
66 #define SEND_CAP 16
67 #endif
69 #if CMI_EXERT_RECV_CAP
70 #define RECV_CAP 2
71 #endif
73 #define USE_LRTS_MEMPOOL 1
75 #define PRINT_SYH 0
77 // Trace communication thread
78 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
79 #define TRACE_THRESHOLD 0.00005
80 #define CMI_MPI_TRACE_MOREDETAILED 0
81 #undef CMI_MPI_TRACE_USEREVENTS
82 #define CMI_MPI_TRACE_USEREVENTS 1
83 #else
84 #undef CMK_SMP_TRACE_COMMTHREAD
85 #define CMK_SMP_TRACE_COMMTHREAD 0
86 #endif
88 #define CMK_TRACE_COMMOVERHEAD 0
89 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
90 #undef CMI_MPI_TRACE_USEREVENTS
91 #define CMI_MPI_TRACE_USEREVENTS 1
92 #else
93 #undef CMK_TRACE_COMMOVERHEAD
94 #define CMK_TRACE_COMMOVERHEAD 0
95 #endif
97 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
98 CpvStaticDeclare(double, projTraceStart);
99 #define START_EVENT() CpvAccess(projTraceStart) = CmiWallTimer();
100 #define END_EVENT(x) traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
101 #else
102 #define START_EVENT()
103 #define END_EVENT(x)
104 #endif
106 #if USE_LRTS_MEMPOOL
108 #define oneMB (1024ll*1024)
109 #define oneGB (1024ll*1024*1024)
111 static CmiInt8 _mempool_size = 8*oneMB;
112 static CmiInt8 _expand_mem = 4*oneMB;
113 static CmiInt8 _mempool_size_limit = 0;
115 static CmiInt8 _totalmem = 0.8*oneGB;
117 #if LARGEPAGE
118 static CmiInt8 BIG_MSG = 16*oneMB;
119 static CmiInt8 ONE_SEG = 4*oneMB;
120 #else
121 static CmiInt8 BIG_MSG = 4*oneMB;
122 static CmiInt8 ONE_SEG = 2*oneMB;
123 #endif
124 #if MULTI_THREAD_SEND
125 static int BIG_MSG_PIPELINE = 1;
126 #else
127 static int BIG_MSG_PIPELINE = 4;
128 #endif
130 // dynamic flow control
131 static CmiInt8 buffered_send_msg = 0;
132 static CmiInt8 register_memory_size = 0;
134 #if LARGEPAGE
135 static CmiInt8 MAX_BUFF_SEND = 100000*oneMB;
136 static CmiInt8 MAX_REG_MEM = 200000*oneMB;
137 static CmiInt8 register_count = 0;
138 #else
139 #if CMK_SMP && COMM_THREAD_SEND
140 static CmiInt8 MAX_BUFF_SEND = 100*oneMB;
141 static CmiInt8 MAX_REG_MEM = 200*oneMB;
142 #else
143 static CmiInt8 MAX_BUFF_SEND = 16*oneMB;
144 static CmiInt8 MAX_REG_MEM = 25*oneMB;
145 #endif
148 #endif
150 #endif /* end USE_LRTS_MEMPOOL */
152 #if MULTI_THREAD_SEND
153 #define CMI_GNI_LOCK(x) CmiLock(x);
154 #define CMI_GNI_UNLOCK(x) CmiUnlock(x);
155 #define CMI_PCQUEUEPOP_LOCK(Q) CmiLock((Q)->lock);
156 #define CMI_PCQUEUEPOP_UNLOCK(Q) CmiUnlock((Q)->lock);
157 #else
158 #define CMI_GNI_LOCK(x)
159 #define CMI_GNI_UNLOCK(x)
160 #define CMI_PCQUEUEPOP_LOCK(Q)
161 #define CMI_PCQUEUEPOP_UNLOCK(Q)
162 #endif
164 static int _tlbpagesize = 4096;
166 //static int _smpd_count = 0;
168 static int user_set_flag = 0;
170 static int _checkProgress = 1; /* check deadlock */
171 static int _detected_hang = 0;
173 #define SMSG_ATTR_SIZE sizeof(gni_smsg_attr_t)
175 // dynamic SMSG
176 static int useDynamicSMSG =0; /* dynamic smsgs setup */
178 static int avg_smsg_connection = 32;
179 static int *smsg_connected_flag= 0;
180 static gni_smsg_attr_t **smsg_attr_vector_local;
181 static gni_smsg_attr_t **smsg_attr_vector_remote;
182 static gni_ep_handle_t ep_hndl_unbound;
183 static gni_smsg_attr_t send_smsg_attr;
184 static gni_smsg_attr_t recv_smsg_attr;
186 typedef struct _dynamic_smsg_mailbox{
187 void *mailbox_base;
188 int size;
189 int offset;
190 gni_mem_handle_t mem_hndl;
191 struct _dynamic_smsg_mailbox *next;
192 }dynamic_smsg_mailbox_t;
194 static dynamic_smsg_mailbox_t *mailbox_list;
196 static CmiUInt8 smsg_send_count = 0, last_smsg_send_count = 0;
197 static CmiUInt8 smsg_recv_count = 0, last_smsg_recv_count = 0;
199 #if PRINT_SYH
200 int lrts_send_msg_id = 0;
201 int lrts_local_done_msg = 0;
202 int lrts_send_rdma_success = 0;
203 #endif
205 #include "machine.h"
207 #include "pcqueue.h"
209 #include "mempool.h"
211 #if CMK_PERSISTENT_COMM
212 #include "machine-persistent.h"
213 #endif
215 //#define USE_ONESIDED 1
216 #ifdef USE_ONESIDED
217 //onesided implementation is wrong, since no place to restore omdh
218 #include "onesided.h"
219 onesided_hnd_t onesided_hnd;
220 onesided_md_t omdh;
221 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh) omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh)
223 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
225 #else
226 uint8_t onesided_hnd, omdh;
/*
 * MEMORY_REGISTER: register `size` bytes at `msg` with the NIC unless doing
 * so would reach MAX_REG_MEM, in which case `status` is set to
 * GNI_RC_ERROR_NOMEM without calling GNI.  On success the running total
 * register_memory_size is increased by `size`.
 *
 * The macro is wrapped in do { } while (0) so it behaves as one statement
 * (safe inside an unbraced if/else), and every argument is parenthesized.
 * `size` and `status` are evaluated more than once: pass side-effect-free
 * expressions.  `handler` and `myomdh` are unused in this variant.
 */
#if REMOTE_EVENT
#define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status) \
    do { \
        if (register_memory_size + (size) >= MAX_REG_MEM) { \
            (status) = GNI_RC_ERROR_NOMEM; \
        } else { \
            (status) = GNI_MemRegister((nic_hndl), (uint64_t)(msg), (uint64_t)(size), (cqh), GNI_MEM_READWRITE, -1, (mem_hndl)); \
            if ((status) == GNI_RC_SUCCESS) register_memory_size += (size); \
        } \
    } while (0)
#else
/* Without remote events no destination CQ is attached (cqh is ignored). */
#define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status) \
    do { \
        if (register_memory_size + (size) >= MAX_REG_MEM) { \
            (status) = GNI_RC_ERROR_NOMEM; \
        } else { \
            (status) = GNI_MemRegister((nic_hndl), (uint64_t)(msg), (uint64_t)(size), NULL, GNI_MEM_READWRITE, -1, (mem_hndl)); \
            if ((status) == GNI_RC_SUCCESS) register_memory_size += (size); \
        } \
    } while (0)
#endif

/*
 * MEMORY_DEREGISTER: release a NIC registration and subtract `size` from
 * register_memory_size; aborts if GNI refuses the deregistration.
 */
#define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size) \
    do { \
        if (GNI_MemDeregister((nic_hndl), (mem_hndl)) == GNI_RC_SUCCESS) \
            register_memory_size -= (size); \
        else \
            CmiAbort("MEM_DEregister"); \
    } while (0)
247 #endif
/*
 * Accessors for the mempool bookkeeping that precedes a message buffer.
 * `x` is a message pointer; its mempool_header sits ALIGNBUF bytes before it
 * and points at the owning block_header.
 *
 * Every expansion is fully parenthesized so the macros compose safely inside
 * larger expressions (e.g. `NoMsgInSend(a) == NoMsgInRecv(a)`), and the
 * member-access forms remain usable as lvalues.
 */
#define GetMempoolBlockPtr(x)   (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
#define GetMempoolPtr(x)        (GetMempoolBlockPtr(x)->mptr)
#define GetMempoolsize(x)       (GetMempoolBlockPtr(x)->size)
#define GetMemHndl(x)           (GetMempoolBlockPtr(x)->mem_hndl)
#define IncreaseMsgInRecv(x)    ((GetMempoolBlockPtr(x)->msgs_in_recv)++)
#define DecreaseMsgInRecv(x)    ((GetMempoolBlockPtr(x)->msgs_in_recv)--)
#define IncreaseMsgInSend(x)    ((GetMempoolBlockPtr(x)->msgs_in_send)++)
#define DecreaseMsgInSend(x)    ((GetMempoolBlockPtr(x)->msgs_in_send)--)
#define NoMsgInSend(x)          (GetMempoolBlockPtr(x)->msgs_in_send == 0)
#define NoMsgInRecv(x)          (GetMempoolBlockPtr(x)->msgs_in_recv == 0)
#define NoMsgInFlight(x)        (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv == 0)
/* A memory handle with both words zero is treated as "not registered". */
#define IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
#define SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
#define NotRegistered(x)        IsMemHndlZero(((block_header*)(x))->mem_hndl)

/* Same fields, but starting from a block_header pointer. */
#define GetMemHndlFromBlockHeader(x) (((block_header*)(x))->mem_hndl)
#define GetSizeFromBlockHeader(x)    (((block_header*)(x))->size)
267 #define CmiGetMsgSize(m) ((CmiMsgHeaderExt*)m)->size
268 #define CmiSetMsgSize(m,s) ((((CmiMsgHeaderExt*)m)->size)=(s))
269 #define CmiGetMsgSeq(m) ((CmiMsgHeaderExt*)m)->seq
270 #define CmiSetMsgSeq(m, s) ((((CmiMsgHeaderExt*)m)->seq) = (s))
272 #define ALIGNBUF 64
274 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
275 /* If SMSG is not used */
277 #define FMA_PER_CORE 1024
278 #define FMA_BUFFER_SIZE 1024
280 /* If SMSG is used */
281 static int SMSG_MAX_MSG = 1024;
282 #define SMSG_MAX_CREDIT 72
284 #define MSGQ_MAXSIZE 2048
285 /* large message transfer with FMA or BTE */
286 #define LRTS_GNI_RDMA_THRESHOLD 1024
288 #if CMK_SMP
289 static int REMOTE_QUEUE_ENTRIES=163840;
290 static int LOCAL_QUEUE_ENTRIES=163840;
291 #else
292 static int REMOTE_QUEUE_ENTRIES=20480;
293 static int LOCAL_QUEUE_ENTRIES=20480;
294 #endif
296 #define BIG_MSG_TAG 0x26
297 #define PUT_DONE_TAG 0x28
298 #define DIRECT_PUT_DONE_TAG 0x29
299 #define ACK_TAG 0x30
300 /* SMSG is data message */
301 #define SMALL_DATA_TAG 0x31
302 #define SMALL_DATA_ACK_TAG 0x32
303 /* SMSG is a control message to initialize a BTE */
304 #define LMSG_INIT_TAG 0x39
305 #define LMSG_INIT_ACK_TAG 0x3a
307 #define DEBUG
308 #ifdef GNI_RC_CHECK
309 #undef GNI_RC_CHECK
310 #endif
311 #ifdef DEBUG
312 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) { printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); CmiAbort("GNI_RC_CHECK"); } } while(0)
313 #else
314 #define GNI_RC_CHECK(msg,rc)
315 #endif
317 #define ALIGN64(x) (size_t)((~63)&((x)+63))
318 //#define ALIGN4(x) (size_t)((~3)&((x)+3))
319 #define ALIGNHUGEPAGE(x) (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
321 static int useStaticMSGQ = 0;
322 static int useStaticFMA = 0;
323 static int mysize, myrank;
324 static gni_nic_handle_t nic_hndl;
326 typedef struct {
327 gni_mem_handle_t mdh;
328 uint64_t addr;
329 } mdh_addr_t ;
330 // this is related to dynamic SMSG
332 typedef struct mdh_addr_list{
333 gni_mem_handle_t mdh;
334 void *addr;
335 struct mdh_addr_list *next;
336 }mdh_addr_list_t;
338 static unsigned int smsg_memlen;
339 gni_smsg_attr_t **smsg_local_attr_vec = 0;
340 mdh_addr_t setup_mem;
341 mdh_addr_t *smsg_connection_vec = 0;
342 gni_mem_handle_t smsg_connection_memhndl;
343 static int smsg_expand_slots = 10;
344 static int smsg_available_slot = 0;
345 static void *smsg_mailbox_mempool = 0;
346 mdh_addr_list_t *smsg_dynamic_list = 0;
348 static void *smsg_mailbox_base;
349 gni_msgq_attr_t msgq_attrs;
350 gni_msgq_handle_t msgq_handle;
351 gni_msgq_ep_attr_t msgq_ep_attrs;
352 gni_msgq_ep_attr_t msgq_ep_attrs_size;
354 /* =====Beginning of Declarations of Machine Specific Variables===== */
355 static int cookie;
356 static int modes = 0;
357 static gni_cq_handle_t smsg_rx_cqh = NULL;
358 static gni_cq_handle_t default_tx_cqh = NULL;
359 static gni_cq_handle_t rdma_tx_cqh = NULL;
360 static gni_cq_handle_t rdma_rx_cqh = NULL;
361 static gni_cq_handle_t post_tx_cqh = NULL;
362 static gni_ep_handle_t *ep_hndl_array;
364 static CmiNodeLock *ep_lock_array;
365 static CmiNodeLock default_tx_cq_lock;
366 static CmiNodeLock rdma_tx_cq_lock;
367 static CmiNodeLock global_gni_lock;
368 static CmiNodeLock rx_cq_lock;
369 static CmiNodeLock smsg_mailbox_lock;
370 static CmiNodeLock smsg_rx_cq_lock;
371 static CmiNodeLock *mempool_lock;
373 typedef struct msg_list
375 uint32_t destNode;
376 uint32_t size;
377 void *msg;
378 uint8_t tag;
379 #if !CMK_SMP
380 struct msg_list *next;
381 #endif
382 }MSG_LIST;
385 typedef struct control_msg
387 uint64_t source_addr; /* address from the start of buffer */
388 uint64_t dest_addr; /* address from the start of buffer */
389 int total_length; /* total length */
390 int length; /* length of this packet */
391 #if REMOTE_EVENT
392 int ack_index; /* index from integer to address */
393 #endif
394 uint8_t seq_id; //big message 0 meaning single message
395 gni_mem_handle_t source_mem_hndl;
396 struct control_msg *next;
397 } CONTROL_MSG;
399 #define CONTROL_MSG_SIZE (sizeof(CONTROL_MSG)-sizeof(void*))
401 typedef struct ack_msg
403 uint64_t source_addr; /* address from the start of buffer */
404 #if ! USE_LRTS_MEMPOOL
405 gni_mem_handle_t source_mem_hndl;
406 int length; /* total length */
407 #endif
408 struct ack_msg *next;
409 } ACK_MSG;
411 #define ACK_MSG_SIZE (sizeof(ACK_MSG)-sizeof(void*))
413 #if CMK_DIRECT
414 typedef struct{
415 uint64_t handler_addr;
416 }CMK_DIRECT_HEADER;
418 typedef struct {
419 char core[CmiMsgHeaderSizeBytes];
420 uint64_t handler;
421 }cmidirectMsg;
423 //SYH
424 CpvDeclare(int, CmiHandleDirectIdx);
425 void CmiHandleDirectMsg(cmidirectMsg* msg)
428 CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
429 (*(_handle->callbackFnPtr))(_handle->callbackData);
430 CmiFree(msg);
433 void CmiDirectInit()
435 CpvInitialize(int, CmiHandleDirectIdx);
436 CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
439 #endif
440 typedef struct rmda_msg
442 int destNode;
443 #if REMOTE_EVENT
444 int ack_index;
445 #endif
446 gni_post_descriptor_t *pd;
447 #if !CMK_SMP
448 struct rmda_msg *next;
449 #endif
450 }RDMA_REQUEST;
453 #if CMK_SMP
454 #define SMP_LOCKS 0
455 #define ONE_SEND_QUEUE 0
456 PCQueue sendRdmaBuf;
457 typedef struct msg_list_index
459 PCQueue sendSmsgBuf;
460 int pushed;
461 CmiNodeLock lock;
462 } MSG_LIST_INDEX;
463 char *destpe_avail;
464 #if !ONE_SEND_QUEUE && SMP_LOCKS
465 PCQueue nonEmptyQueues;
466 #endif
467 #else /* non-smp */
469 static RDMA_REQUEST *sendRdmaBuf = 0;
470 static RDMA_REQUEST *sendRdmaTail = 0;
471 typedef struct msg_list_index
473 int next;
474 MSG_LIST *sendSmsgBuf;
475 MSG_LIST *tail;
476 } MSG_LIST_INDEX;
478 #endif
480 // buffered send queue
481 #if ! ONE_SEND_QUEUE
482 typedef struct smsg_queue
484 MSG_LIST_INDEX *smsg_msglist_index;
485 int smsg_head_index;
486 } SMSG_QUEUE;
487 #else
488 typedef struct smsg_queue
490 PCQueue sendMsgBuf;
491 } SMSG_QUEUE;
492 #endif
494 SMSG_QUEUE smsg_queue;
495 #if CMK_USE_OOB
496 SMSG_QUEUE smsg_oob_queue;
497 #endif
499 #if CMK_SMP
501 #define FreeMsgList(d) free(d);
502 #define MallocMsgList(d) d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
504 #else
506 static MSG_LIST *msglist_freelist=0;
508 #define FreeMsgList(d) \
509 do { \
510 (d)->next = msglist_freelist;\
511 msglist_freelist = d; \
512 } while (0)
514 #define MallocMsgList(d) \
515 do { \
516 d = msglist_freelist;\
517 if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
518 _MEMCHECK(d);\
519 } else msglist_freelist = d->next; \
520 d->next =0; \
521 } while (0)
523 #endif
525 #if CMK_SMP
527 #define FreeControlMsg(d) free(d);
528 #define MallocControlMsg(d) d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
530 #else
532 static CONTROL_MSG *control_freelist=0;
534 #define FreeControlMsg(d) \
535 do { \
536 (d)->next = control_freelist;\
537 control_freelist = d; \
538 } while (0);
540 #define MallocControlMsg(d) \
541 do { \
542 d = control_freelist;\
543 if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
544 _MEMCHECK(d);\
545 } else control_freelist = d->next; \
546 } while (0);
548 #endif
550 #if CMK_SMP
552 #define FreeAckMsg(d) free(d);
553 #define MallocAckMsg(d) d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
555 #else
557 static ACK_MSG *ack_freelist=0;
559 #define FreeAckMsg(d) \
560 do { \
561 (d)->next = ack_freelist;\
562 ack_freelist = d; \
563 } while (0)
565 #define MallocAckMsg(d) \
566 do { \
567 d = ack_freelist;\
568 if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
569 _MEMCHECK(d);\
570 } else ack_freelist = d->next; \
571 } while (0)
573 #endif
576 # if CMK_SMP
577 #define FreeRdmaRequest(d) free(d);
578 #define MallocRdmaRequest(d) d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));
579 #else
581 static RDMA_REQUEST *rdma_freelist = NULL;
583 #define FreeRdmaRequest(d) \
584 do { \
585 (d)->next = rdma_freelist;\
586 rdma_freelist = d; \
587 } while (0)
589 #define MallocRdmaRequest(d) \
590 do { \
591 d = rdma_freelist;\
592 if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
593 _MEMCHECK(d);\
594 } else rdma_freelist = d->next; \
595 d->next =0; \
596 } while (0)
597 #endif
599 /* reuse gni_post_descriptor_t */
600 static gni_post_descriptor_t *post_freelist=0;
/*
 * Allocation helpers for gni_post_descriptor_t.
 *
 * Non-SMP builds recycle descriptors through post_freelist (chained via
 * ->next_descr); SMP builds use malloc/free directly.  Each macro expands
 * to a single statement and must be followed by `;` at the call site.
 */
#if !CMK_SMP
#define FreePostDesc(d) \
  do { \
    (d)->next_descr = post_freelist; \
    post_freelist = (d); \
  } while (0)

/* The allocation is checked BEFORE the new descriptor is written to. */
#define MallocPostDesc(d) \
  do { \
    (d) = post_freelist; \
    if ((d) == 0) { \
      (d) = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); \
      _MEMCHECK(d); \
      (d)->next_descr = 0; \
    } else { \
      post_freelist = (d)->next_descr; \
    } \
  } while (0)
#else

#define FreePostDesc(d)     free(d)
#define MallocPostDesc(d) \
  do { \
    (d) = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); \
    _MEMCHECK(d); \
  } while (0)

#endif
622 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
623 static int buffered_smsg_counter = 0;
625 /* SmsgSend return success but message sent is not confirmed by remote side */
626 static MSG_LIST *buffered_fma_head = 0;
627 static MSG_LIST *buffered_fma_tail = 0;
629 /* functions */
/*
 * Single-bit helpers on an integer bitmask `a`; `ind` is the bit position.
 *   IsFree   - non-zero when bit `ind` is clear
 *   SET_BITS - sets bit `ind` in `a` (a must be an lvalue)
 *   Reset    - clears bit `ind` in `a` (a must be an lvalue)
 * Arguments and results are parenthesized so the macros can be used with
 * compound expressions and inside larger expressions.
 */
#define IsFree(a,ind)   (!((a) & (1 << (ind))))
#define SET_BITS(a,ind) ((a) = ((a) | (1 << (ind))))
#define Reset(a,ind)    ((a) = ((a) & (~(1 << (ind)))))
634 CpvDeclare(mempool_type*, mempool);
636 #if REMOTE_EVENT
637 /* ack pool for remote events */
/*
 * Remote-event encoding: the ack-pool index is stored above bit ACK_SHIFT
 * and the sender's rank (myrank) in the low ACK_SHIFT bits.
 * Macro arguments are parenthesized so compound expressions decode
 * correctly.
 */
#define ACK_SHIFT 19
#define ACK_EVENT(idx)      (((idx) << ACK_SHIFT) | myrank)
#define ACK_GET_RANK(evt)   ((evt) & ((1 << ACK_SHIFT) - 1))
#define ACK_GET_INDEX(evt)  ((evt) >> ACK_SHIFT)
644 struct AckPool {
645 void *addr;
646 int next;
649 static struct AckPool *ackpool;
650 static int ackpoolsize;
651 static int ackpool_freehead;
652 static CmiNodeLock ackpool_lock;
654 #define GetAckAddress(s) (ackpool[s].addr)
656 static void AckPool_init()
658 int i;
659 ackpoolsize = 2048;
660 ackpool = (struct AckPool *)malloc(ackpoolsize*sizeof(struct AckPool));
661 for (i=0; i<ackpoolsize-1; i++) {
662 ackpool[i].next = i+1;
664 ackpool[i].next = -1;
665 ackpool_freehead = 0;
666 #if MULTI_THREAD_SEND
667 ackpool_lock = CmiCreateLock();
668 #endif
671 static
672 inline int AckPool_getslot(void *addr)
674 int i;
675 int s;
676 CMI_GNI_LOCK(ackpool_lock);
677 s = ackpool_freehead;
678 if (s == -1) {
679 // printf("[%d] AckPool_getslot expand: %d\n", myrank, ackpoolsize);
680 int newsize = ackpoolsize * 2;
681 if (newsize >= 1<<(32-ACK_SHIFT)) CmiAbort("AckPool too large");
682 struct AckPool *old_ackpool = ackpool;
683 ackpool = (struct AckPool *)malloc(newsize*sizeof(struct AckPool));
684 memcpy(ackpool, old_ackpool, ackpoolsize*sizeof(struct AckPool));
685 for (i=ackpoolsize; i<newsize-1; i++) {
686 ackpool[i].next = i+1;
688 ackpool[i].next = -1;
689 ackpool_freehead = ackpoolsize;
690 s = ackpoolsize;
691 ackpoolsize = newsize;
692 free(old_ackpool);
694 ackpool_freehead = ackpool[s].next;
695 ackpool[s].addr = addr;
696 CMI_GNI_UNLOCK(ackpool_lock);
697 return s;
700 static
701 inline void AckPool_freeslot(int s)
703 CmiAssert(s>=0 && s<ackpoolsize);
704 CMI_GNI_LOCK(ackpool_lock);
705 ackpool[s].next = ackpool_freehead;
706 ackpool_freehead = s;
707 CMI_GNI_UNLOCK(ackpool_lock);
711 #endif
713 #if CMK_WITH_STATS
714 typedef struct comm_thread_stats
716 int count_in_send_buffered_ack;
717 double time_in_send_buffered_ack;
718 double max_time_in_send_buffered_ack;
719 int count_in_send_buffered_smsg;
720 double time_in_send_buffered_smsg;
721 double max_time_in_send_buffered_smsg;
722 } Comm_Thread_Stats;
724 static Comm_Thread_Stats comm_stats;
726 static void init_comm_stats()
728 memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
731 #define STATS_ACK_TIME(x) \
732 { double t = CmiWallTimer(); \
733 x; \
734 t = CmiWallTimer() - t; \
735 comm_stats.count_in_send_buffered_ack ++; \
736 comm_stats.time_in_send_buffered_ack += t; \
737 if (t>comm_stats.max_time_in_send_buffered_ack) \
738 comm_stats.max_time_in_send_buffered_ack = t; \
741 #define STATS_SEND_SMSGS_TIME(x) \
742 { double t = CmiWallTimer(); \
743 x; \
744 t = CmiWallTimer() - t; \
745 comm_stats.count_in_send_buffered_smsg ++; \
746 comm_stats.time_in_send_buffered_smsg += t; \
747 if (t>comm_stats.max_time_in_send_buffered_smsg) \
748 comm_stats.max_time_in_send_buffered_smsg = t; \
751 static void print_comm_stats()
753 if (myrank == 0)
754 printf("PE[%d] count\ttime\tmax \n", myrank);
755 printf("PE[%d] send buffered ack: %d\t%f\t%f\n", myrank, comm_stats.count_in_send_buffered_ack, comm_stats.time_in_send_buffered_ack, comm_stats.max_time_in_send_buffered_ack);
756 printf("PE[%d] send smsgs: %d\t%f\t%f\n", myrank, comm_stats.count_in_send_buffered_smsg, comm_stats.time_in_send_buffered_smsg, comm_stats.max_time_in_send_buffered_smsg);
758 #else
759 #define STATS_ACK_TIME(x) x
760 #define STATS_SEND_SMSGS_TIME(x) x
761 #endif
763 static int print_stats = 0;
765 static void
766 allgather(void *in,void *out, int len)
768 static int *ivec_ptr=NULL,already_called=0,job_size=0;
769 int i,rc;
770 int my_rank;
771 char *tmp_buf,*out_ptr;
773 if(!already_called) {
775 rc = PMI_Get_size(&job_size);
776 CmiAssert(rc == PMI_SUCCESS);
777 rc = PMI_Get_rank(&my_rank);
778 CmiAssert(rc == PMI_SUCCESS);
780 ivec_ptr = (int *)malloc(sizeof(int) * job_size);
781 CmiAssert(ivec_ptr != NULL);
783 rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
784 CmiAssert(rc == PMI_SUCCESS);
786 already_called = 1;
790 tmp_buf = (char *)malloc(job_size * len);
791 CmiAssert(tmp_buf);
793 rc = PMI_Allgather(in,tmp_buf,len);
794 CmiAssert(rc == PMI_SUCCESS);
796 out_ptr = out;
798 for(i=0;i<job_size;i++) {
800 memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
804 free(tmp_buf);
807 static void
808 allgather_2(void *in,void *out, int len)
810 //PMI_Allgather is out of order
811 int i,rc, extend_len;
812 int rank_index;
813 char *out_ptr, *out_ref;
814 char *in2;
816 extend_len = sizeof(int) + len;
817 in2 = (char*)malloc(extend_len);
819 memcpy(in2, &myrank, sizeof(int));
820 memcpy(in2+sizeof(int), in, len);
822 out_ptr = (char*)malloc(mysize*extend_len);
824 rc = PMI_Allgather(in2, out_ptr, extend_len);
825 GNI_RC_CHECK("allgather", rc);
827 out_ref = out;
829 for(i=0;i<mysize;i++) {
830 //rank index
831 memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
832 //copy to the rank index slot
833 memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
836 free(out_ptr);
837 free(in2);
841 static unsigned int get_gni_nic_address(int device_id)
843 unsigned int address, cpu_id;
844 gni_return_t status;
845 int i, alps_dev_id=-1,alps_address=-1;
846 char *token, *p_ptr;
848 p_ptr = getenv("PMI_GNI_DEV_ID");
849 if (!p_ptr) {
850 status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
852 GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
853 } else {
854 while ((token = strtok(p_ptr,":")) != NULL) {
855 alps_dev_id = atoi(token);
856 if (alps_dev_id == device_id) {
857 break;
859 p_ptr = NULL;
861 CmiAssert(alps_dev_id != -1);
862 p_ptr = getenv("PMI_GNI_LOC_ADDR");
863 CmiAssert(p_ptr != NULL);
864 i = 0;
865 while ((token = strtok(p_ptr,":")) != NULL) {
866 if (i == alps_dev_id) {
867 alps_address = atoi(token);
868 break;
870 p_ptr = NULL;
871 ++i;
873 CmiAssert(alps_address != -1);
874 address = alps_address;
876 return address;
/*
 * Return the protection tag: the first non-empty ':'-separated field of the
 * PMI_GNI_PTAG environment variable, converted to uint8_t.
 *
 * The value is parsed in place; strtok() is avoided because it overwrites
 * the separators inside the live environment string.  An empty value now
 * aborts with a message instead of handing a NULL token to atoi().
 */
static uint8_t get_ptag(void)
{
    const char *p_ptr;

    p_ptr = getenv("PMI_GNI_PTAG");
    CmiAssert(p_ptr != NULL);
    while (*p_ptr == ':') p_ptr++;      /* skip empty leading fields */
    if (*p_ptr == '\0') {
        CmiAbort("get_ptag: PMI_GNI_PTAG has no value");
    }
    /* strtol stops at the ':' that ends the first field */
    return (uint8_t)strtol(p_ptr, NULL, 10);
}
/*
 * Return the job cookie: the first non-empty ':'-separated field of the
 * PMI_GNI_COOKIE environment variable, converted to uint32_t.
 *
 * The field is converted with strtoul() because the cookie is a 32-bit
 * unsigned quantity and atoi() has undefined behavior for values that do
 * not fit in an int.  The value is parsed in place; strtok() is avoided
 * because it overwrites the separators inside the live environment string.
 * An empty value aborts with a message instead of handing a NULL token to
 * atoi().
 */
static uint32_t get_cookie(void)
{
    const char *p_ptr;

    p_ptr = getenv("PMI_GNI_COOKIE");
    CmiAssert(p_ptr != NULL);
    while (*p_ptr == ':') p_ptr++;      /* skip empty leading fields */
    if (*p_ptr == '\0') {
        CmiAbort("get_cookie: PMI_GNI_COOKIE has no value");
    }
    /* strtoul stops at the ':' that ends the first field */
    return (uint32_t)strtoul(p_ptr, NULL, 10);
}
905 #if LARGEPAGE
907 /* directly mmap memory from hugetlbfs for large pages */
909 #include <sys/stat.h>
910 #include <fcntl.h>
911 #include <sys/mman.h>
912 #include <hugetlbfs.h>
914 // size must be _tlbpagesize aligned
915 void *my_get_huge_pages(size_t size)
917 char filename[512];
918 int fd;
919 mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
920 void *ptr = NULL;
922 snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
923 fd = open(filename, O_RDWR | O_CREAT, mode);
924 if (fd == -1) {
925 CmiAbort("my_get_huge_pages: open filed");
927 ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
928 if (ptr == MAP_FAILED) ptr = NULL;
929 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
930 close(fd);
931 unlink(filename);
932 return ptr;
/*
 * Unmap a region of `size` bytes starting at `ptr`.
 * Aborts if munmap reports an error.
 */
void my_free_huge_pages(void *ptr, int size)
{
    if (munmap(ptr, size) == -1) {
        CmiAbort("munmap failed in my_free_huge_pages");
    }
}
942 #endif
944 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
945 /* TODO: add any that are related */
946 /* =====End of Definitions of Message-Corruption Related Macros=====*/
949 #include "machine-lrts.h"
950 #include "machine-common-core.c"
952 /* Network progress function is used to poll the network for
953 messages. This flushes receive buffers on some implementations*/
954 #if CMK_MACHINE_PROGRESS_DEFINED
955 void CmiMachineProgressImpl() {
957 #endif
959 static int SendBufferMsg(SMSG_QUEUE *queue);
960 static void SendRdmaMsg();
961 static void PumpNetworkSmsg();
962 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
963 #if CQWRITE
964 static void PumpCqWriteTransactions();
965 #endif
966 #if REMOTE_EVENT
967 static void PumpRemoteTransactions();
968 #endif
970 #if MACHINE_DEBUG_LOG
971 FILE *debugLog = NULL;
972 static CmiInt8 buffered_recv_msg = 0;
973 int lrts_smsg_success = 0;
974 int lrts_received_msg = 0;
975 #endif
977 static void sweep_mempool(mempool_type *mptr)
979 int n = 0;
980 block_header *current = &(mptr->block_head);
982 printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
983 while( current!= NULL) {
984 printf("[n %d %d] sweep_mempool slot %p size: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
985 current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
987 printf("[n %d] sweep_mempool slot END.\n", myrank);
990 inline
991 static gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
993 block_header *current = *from;
995 //while(register_memory_size>= MAX_REG_MEM)
997 while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
998 current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1000 *from = current;
1001 if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1002 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1003 SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1005 return GNI_RC_SUCCESS;
1008 inline
1009 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t *memhndl, gni_cq_handle_t cqh )
1011 gni_return_t status = GNI_RC_SUCCESS;
1012 //int size = GetMempoolsize(msg);
1013 //void *blockaddr = GetMempoolBlockPtr(msg);
1014 //gni_mem_handle_t *memhndl = &(GetMemHndl(msg));
1016 block_header *current = &(mptr->block_head);
1017 while(register_memory_size>= MAX_REG_MEM)
1019 status = deregisterMemory(mptr, &current);
1020 if (status != GNI_RC_SUCCESS) break;
1022 if(register_memory_size>= MAX_REG_MEM) return status;
1024 MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size);
1025 while(1)
1027 MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1028 if(status == GNI_RC_SUCCESS)
1030 break;
1032 else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1034 CmiAbort("Memory registor for mempool fails\n");
1036 else
1038 status = deregisterMemory(mptr, &current);
1039 if (status != GNI_RC_SUCCESS) break;
1042 return status;
1045 inline
1046 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1048 static int rank = -1;
1049 int i;
1050 gni_return_t status;
1051 mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1052 //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1053 mempool_type *mptr;
1055 status = registerFromMempool(mptr1, msg, size, t, cqh);
1056 if (status == GNI_RC_SUCCESS) return status;
1057 #if CMK_SMP
1058 for (i=0; i<CmiMyNodeSize()+1; i++) {
1059 rank = (rank+1)%(CmiMyNodeSize()+1);
1060 mptr = CpvAccessOther(mempool, rank);
1061 if (mptr == mptr1) continue;
1062 status = registerFromMempool(mptr, msg, size, t, cqh);
1063 if (status == GNI_RC_SUCCESS) return status;
1065 #endif
1066 return GNI_RC_ERROR_RESOURCE;
1069 inline
1070 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1072 MSG_LIST *msg_tmp;
1073 MallocMsgList(msg_tmp);
1074 msg_tmp->destNode = destNode;
1075 msg_tmp->size = size;
1076 msg_tmp->msg = msg;
1077 msg_tmp->tag = tag;
1079 #if !CMK_SMP
1080 if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1081 queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1082 queue->smsg_head_index = destNode;
1083 queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1084 }else
1086 queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1087 queue->smsg_msglist_index[destNode].tail = msg_tmp;
1089 #else
1090 #if ONE_SEND_QUEUE
1091 PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1092 #else
1093 #if SMP_LOCKS
1094 CmiLock(queue->smsg_msglist_index[destNode].lock);
1095 if(queue->smsg_msglist_index[destNode].pushed == 0)
1097 PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1099 PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1100 CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1101 #else
1102 PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1103 #endif
1104 #endif
1105 #endif
1106 #if PRINT_SYH
1107 buffered_smsg_counter++;
1108 #endif
1111 inline static void print_smsg_attr(gni_smsg_attr_t *a)
1113 printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1116 inline
1117 static void setup_smsg_connection(int destNode)
1119 mdh_addr_list_t *new_entry = 0;
1120 gni_post_descriptor_t *pd;
1121 gni_smsg_attr_t *smsg_attr;
1122 gni_return_t status = GNI_RC_NOT_DONE;
1123 RDMA_REQUEST *rdma_request_msg;
1125 if(smsg_available_slot == smsg_expand_slots)
1127 new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1128 new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1129 bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1131 status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1132 smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1133 GNI_MEM_READWRITE,
1135 &(new_entry->mdh));
1136 smsg_available_slot = 0;
1137 new_entry->next = smsg_dynamic_list;
1138 smsg_dynamic_list = new_entry;
1140 smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1141 smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1142 smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1143 smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1144 smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1145 smsg_attr->buff_size = smsg_memlen;
1146 smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1147 smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1148 smsg_local_attr_vec[destNode] = smsg_attr;
1149 smsg_available_slot++;
1150 MallocPostDesc(pd);
1151 pd->type = GNI_POST_FMA_PUT;
1152 //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT;
1153 pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT ;
1154 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
1155 pd->length = sizeof(gni_smsg_attr_t);
1156 pd->local_addr = (uint64_t) smsg_attr;
1157 pd->remote_addr = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1158 pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1159 pd->src_cq_hndl = rdma_tx_cqh;
1160 pd->rdma_mode = 0;
1161 status = GNI_PostFma(ep_hndl_array[destNode], pd);
1162 print_smsg_attr(smsg_attr);
1163 if(status == GNI_RC_ERROR_RESOURCE )
1165 MallocRdmaRequest(rdma_request_msg);
1166 rdma_request_msg->destNode = destNode;
1167 rdma_request_msg->pd = pd;
1168 /* buffer this request */
1170 #if PRINT_SYH
1171 if(status != GNI_RC_SUCCESS)
1172 printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1173 else
1174 printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1175 #endif
1178 /* useDynamicSMSG */
1179 inline
1180 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1182 gni_return_t status = GNI_RC_NOT_DONE;
1184 if(mailbox_list->offset == mailbox_list->size)
1186 dynamic_smsg_mailbox_t *new_mailbox_entry;
1187 new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1188 new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1189 new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1190 bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1191 new_mailbox_entry->offset = 0;
1193 status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1194 new_mailbox_entry->size, smsg_rx_cqh,
1195 GNI_MEM_READWRITE,
1197 &(new_mailbox_entry->mem_hndl));
1199 GNI_RC_CHECK("register", status);
1200 new_mailbox_entry->next = mailbox_list;
1201 mailbox_list = new_mailbox_entry;
1203 local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1204 local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1205 local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1206 local_smsg_attr->mbox_offset = mailbox_list->offset;
1207 mailbox_list->offset += smsg_memlen;
1208 local_smsg_attr->buff_size = smsg_memlen;
1209 local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1210 local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1213 /* useDynamicSMSG */
1214 inline
1215 static int connect_to(int destNode)
1217 gni_return_t status = GNI_RC_NOT_DONE;
1218 CmiAssert(smsg_connected_flag[destNode] == 0);
1219 CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1220 smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1221 alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1222 smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1224 CMI_GNI_LOCK(global_gni_lock)
1225 status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1226 CMI_GNI_UNLOCK(global_gni_lock)
1227 if (status == GNI_RC_ERROR_RESOURCE) {
1228 /* possibly destNode is making connection at the same time */
1229 free(smsg_attr_vector_local[destNode]);
1230 smsg_attr_vector_local[destNode] = NULL;
1231 free(smsg_attr_vector_remote[destNode]);
1232 smsg_attr_vector_remote[destNode] = NULL;
1233 mailbox_list->offset -= smsg_memlen;
1234 return 0;
1236 GNI_RC_CHECK("GNI_Post", status);
1237 smsg_connected_flag[destNode] = 1;
1238 return 1;
/*
 * Try to send one message of `size` bytes with SMSG tag `tag` to destNode.
 *
 * Returns GNI_RC_SUCCESS when GNI_SmsgSendWTag accepted the message,
 * GNI_RC_NOT_DONE when dynamic SMSG is in use and the connection to destNode
 * is not established yet (flag 0 or 1), and GNI_RC_ERROR_RESOURCE otherwise
 * (any failed send is mapped to that value, and it is also the initial value
 * returned when no send was attempted).
 *
 * When the result is not success and inbuff == 0 the message is handed to
 * buffer_small_msgs(); with inbuff != 0 nothing is buffered here.
 */
1241 inline
1242 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff )
1244 unsigned int remote_address;
1245 uint32_t remote_id;
1246 gni_return_t status = GNI_RC_ERROR_RESOURCE;
1247 gni_smsg_attr_t *smsg_attr;
1248 gni_post_descriptor_t *pd;
1249 gni_post_state_t post_state;
1250 char *real_data;
// Dynamic SMSG: flag 0 = not connected (start connecting), 1 = connection
// pending.  In both cases the message is only buffered and NOT_DONE returned.
1252 if (useDynamicSMSG) {
1253 switch (smsg_connected_flag[destNode]) {
1254 case 0:
1255 connect_to(destNode); /* intentional fall through to case 1 */
1256 case 1: /* pending connection: buffer (if not already buffered) and return */
1257 status = GNI_RC_NOT_DONE;
1258 if(inbuff ==0)
1259 buffer_small_msgs(queue, msg, size, destNode, tag);
1260 return status;
// A direct send is attempted only when nothing is queued for destNode or the
// message itself comes from the buffer (inbuff == 1); presumably this keeps
// per-destination ordering.  With CMK_SMP and ONE_SEND_QUEUE the block below
// is entered unconditionally.
1263 #if CMK_SMP
1264 #if ! ONE_SEND_QUEUE
1265 if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1266 #endif
1268 #else
1269 if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1271 #endif
// No separate header is sent: buf/bufsize stay NULL/0 and the whole message
// goes in the data part of GNI_SmsgSendWTag.
1272 uint64_t *buf = NULL;
1273 int bufsize = 0;
1274 //CMI_GNI_LOCK(smsg_mailbox_lock)
1275 CMI_GNI_LOCK(default_tx_cq_lock)
1276 #if CMK_SMP_TRACE_COMMTHREAD
1277 int oldpe = -1;
1278 int oldeventid = -1;
// real_data is the payload the trace ids refer to: the message itself for
// SMALL_DATA_TAG, the source buffer named by the control message otherwise.
1279 if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1281 START_EVENT();
1282 if ( tag == SMALL_DATA_TAG)
1283 real_data = (char*)msg;
1284 else
1285 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1286 TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1287 TRACE_COMM_SET_COMM_MSGID(real_data);
1289 #endif
1290 #if REMOTE_EVENT
// First control message of a transfer (seq_id 0, no slot yet): reserve an ack
// slot for the source buffer.
1291 if (tag == LMSG_INIT_TAG) {
1292 CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1293 if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1294 control_msg_tmp->ack_index = AckPool_getslot((void*)control_msg_tmp->source_addr);
1296 // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1297 #endif
1298 status = GNI_SmsgSendWTag(ep_hndl_array[destNode], buf, bufsize, msg, size, 0, tag);
1299 #if CMK_SMP_TRACE_COMMTHREAD
// Restore the trace ids saved above (oldpe stays -1 when none were saved).
1300 if (oldpe != -1) TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1301 #endif
1302 CMI_GNI_UNLOCK(default_tx_cq_lock)
1303 //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1304 if(status == GNI_RC_SUCCESS)
1306 #if CMK_SMP_TRACE_COMMTHREAD
// NOTE(review): real_data is only assigned above for SMALL_DATA_TAG and
// LMSG_INIT_TAG, but this condition also admits the two *_ACK_TAG values, so
// real_data may be read uninitialized for those tags -- confirm.
1307 if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == SMALL_DATA_ACK_TAG || tag == LMSG_INIT_ACK_TAG)
1309 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1311 #endif
1312 smsg_send_count ++;
1313 }else
// Every failure code from the send is collapsed into ERROR_RESOURCE.
1314 status = GNI_RC_ERROR_RESOURCE;
// Not sent (or never attempted): queue it unless it already came from the queue.
1316 if(status != GNI_RC_SUCCESS && inbuff ==0)
1317 buffer_small_msgs(queue, msg, size, destNode, tag);
1318 return status;
1321 inline
1322 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1324 /* construct a control message and send */
1325 CONTROL_MSG *control_msg_tmp;
1326 MallocControlMsg(control_msg_tmp);
1327 control_msg_tmp->source_addr = (uint64_t)msg;
1328 control_msg_tmp->seq_id = seqno;
1329 control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned
1330 #if REMOTE_EVENT
1331 control_msg_tmp->ack_index = -1;
1332 #endif
1333 #if USE_LRTS_MEMPOOL
1334 if(size < BIG_MSG)
1336 control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1338 else
1340 SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1341 control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1342 if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1344 #else
1345 SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1346 #endif
1347 return control_msg_tmp;
// When set to 1, send_large_messages spins in LrtsAdvanceCommunication(0)
// while the send budget (MAX_BUFF_SEND) is exhausted instead of buffering.
1350 #define BLOCKING_SEND_CONTROL 0
1352 // Large message: send a control message to the receiver; the receiver registers memory and does a GET.
1353 // Returns a gni_return_t: GNI_RC_SUCCESS when the control message was sent, GNI_RC_ERROR_NOMEM or GNI_RC_ERROR_RESOURCE when it was not.
// inbuff != 0 means the control message is not re-buffered here on failure.
1354 inline
1355 static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG *control_msg_tmp, int inbuff)
1357 gni_return_t status = GNI_RC_ERROR_NOMEM;
1358 uint32_t vmdh_index = -1;
1359 int size;
1360 int offset = 0;
1361 uint64_t source_addr;
1362 int register_size;
1363 void *msg;
1365 size = control_msg_tmp->total_length;
1366 source_addr = control_msg_tmp->source_addr;
1367 register_size = control_msg_tmp->length;
1369 #if USE_LRTS_MEMPOOL
// seq_id == 0: the message is sent in one piece from its mempool block.
1370 if( control_msg_tmp->seq_id == 0 ){
1371 #if BLOCKING_SEND_CONTROL
1372 if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1373 while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1374 LrtsAdvanceCommunication(0);
1376 #endif
1377 if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1379 msg = (void*)source_addr;
// Over the send budget: defer the control message and report NOMEM.
1380 if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1382 if(!inbuff)
1383 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1384 return GNI_RC_ERROR_NOMEM;
1386 //register the corresponding mempool
1387 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1388 if(status == GNI_RC_SUCCESS)
1390 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1392 }else
// Block already registered: just pass its handle along.
1394 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1395 status = GNI_RC_SUCCESS;
// The block's size is charged to buffered_send_msg only when NoMsgInSend()
// holds for it, i.e. on the first outstanding send from that block.
1397 if(NoMsgInSend(source_addr))
1398 register_size = GetMempoolsize((void*)(source_addr));
1399 else
1400 register_size = 0;
1401 }else if(control_msg_tmp->seq_id >0) // BIG_MSG
// seq_id > 0: one segment of a BIG_MSG; segment k starts ONE_SEG*(k-1) bytes
// into the source buffer.  Note this `offset` shadows the outer one.
1403 int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1404 source_addr += offset;
1405 size = control_msg_tmp->length;
1406 #if BLOCKING_SEND_CONTROL
1407 if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1408 while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1409 LrtsAdvanceCommunication(0);
1411 #endif
1412 if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1413 if(buffered_send_msg + size >= MAX_BUFF_SEND)
1415 if(!inbuff)
1416 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1417 return GNI_RC_ERROR_NOMEM;
// Register just this segment; it is charged to the send budget immediately.
1419 status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1420 if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1422 else
1424 status = GNI_RC_SUCCESS;
// Segment accounting was done above, so nothing more is added after the send.
1426 register_size = 0;
// Registration succeeded (or was not needed): send the control message.
1429 if(status == GNI_RC_SUCCESS)
1431 status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff);
1432 if(status == GNI_RC_SUCCESS)
1434 buffered_send_msg += register_size;
1435 if(control_msg_tmp->seq_id == 0)
1437 IncreaseMsgInSend(source_addr);
1439 FreeControlMsg(control_msg_tmp);
1440 MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG);
1441 }else
// send_smsg_message already buffered the control message when inbuff == 0.
1442 status = GNI_RC_ERROR_RESOURCE;
1444 } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1446 CmiAbort("Memory registor for large msg\n");
1447 }else
// Any other registration failure is treated as out of registered memory.
1449 status = GNI_RC_ERROR_NOMEM;
1450 if(!inbuff)
1451 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1453 return status;
1454 #else
// NOTE(review): `msg` is never assigned on this (non-mempool) path before
// being passed to MEMORY_REGISTER -- confirm whether source_addr was intended.
1455 MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1456 if(status == GNI_RC_SUCCESS)
1458 status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0);
1459 if(status == GNI_RC_SUCCESS)
1461 FreeControlMsg(control_msg_tmp);
1463 } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1465 CmiAbort("Memory registor for large msg\n");
1466 }else
1468 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1470 return status;
1471 #endif
/* Record the message length in the message's own header before sending. */
inline void LrtsPrepareEnvelope(char *msg, int size)
{
    CmiSetMsgSize(msg, size);
}
1479 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1481 gni_return_t status = GNI_RC_SUCCESS;
1482 uint8_t tag;
1483 CONTROL_MSG *control_msg_tmp;
1484 int oob = ( mode & OUT_OF_BAND);
1485 SMSG_QUEUE *queue;
1487 MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size);
1488 #if CMK_USE_OOB
1489 queue = oob? &smsg_oob_queue : &smsg_queue;
1490 #else
1491 queue = &smsg_queue;
1492 #endif
1494 LrtsPrepareEnvelope(msg, size);
1496 #if PRINT_SYH
1497 printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1498 #endif
1499 #if CMK_SMP
1500 if(size <= SMSG_MAX_MSG)
1501 buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1502 else if (size < BIG_MSG) {
1503 control_msg_tmp = construct_control_msg(size, msg, 0);
1504 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1506 else {
1507 CmiSetMsgSeq(msg, 0);
1508 control_msg_tmp = construct_control_msg(size, msg, 1);
1509 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1511 #else //non-smp, smp(worker sending)
1512 if(size <= SMSG_MAX_MSG)
1514 if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode, msg, size, SMALL_DATA_TAG, 0))
1515 CmiFree(msg);
1517 else if (size < BIG_MSG) {
1518 control_msg_tmp = construct_control_msg(size, msg, 0);
1519 send_large_messages(queue, destNode, control_msg_tmp, 0);
1521 else {
1522 #if USE_LRTS_MEMPOOL
1523 CmiSetMsgSeq(msg, 0);
1524 control_msg_tmp = construct_control_msg(size, msg, 1);
1525 send_large_messages(queue, destNode, control_msg_tmp, 0);
1526 #else
1527 control_msg_tmp = construct_control_msg(size, msg, 0);
1528 send_large_messages(queue, destNode, control_msg_tmp, 0);
1529 #endif
1531 #endif
1532 return 0;
// Forward declaration: defined further down, used by LrtsPostCommonInit.
1535 static void PumpDatagramConnection();
// Default user-event ids for tracing the communication phases; they are
// replaced by registerUserTraceEvents() when that registration is compiled in.
1536 static int event_SetupConnect = 111;
1537 static int event_PumpSmsg = 222 ;
1538 static int event_PumpTransaction = 333;
// NOTE(review): event_PumpRdmaTransaction and event_SendBufferSmsg share the
// default id 444 -- confirm whether distinct defaults were intended.
1539 static int event_PumpRdmaTransaction = 444;
1540 static int event_SendBufferSmsg = 444;
1541 static int event_SendFmaRdmaMsg = 555;
1542 static int event_AdvanceCommunication = 666;
/*
 * Register one user trace event per communication phase and store the
 * returned ids in the event_* variables.  Compiles to an empty function
 * unless user-event tracing of the machine layer is enabled.
 */
static void registerUserTraceEvents()
{
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    event_SetupConnect =
        traceRegisterUserEvent("setting up connections", -1 );
    event_PumpSmsg =
        traceRegisterUserEvent("Pump network small msgs", -1);
    event_PumpTransaction =
        traceRegisterUserEvent("Pump FMA local transaction" , -1);
    event_PumpRdmaTransaction =
        traceRegisterUserEvent("Pump RDMA local transaction" , -1);
    event_SendBufferSmsg =
        traceRegisterUserEvent("Sending buffered small msgs", -1);
    event_SendFmaRdmaMsg =
        traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
    event_AdvanceCommunication =
        traceRegisterUserEvent("Worker thread in sending/receiving", -1);
#endif
}
1556 static void ProcessDeadlock()
1558 static CmiUInt8 *ptr = NULL;
1559 static CmiUInt8 last = 0, mysum, sum;
1560 static int count = 0;
1561 gni_return_t status;
1562 int i;
1564 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1565 //sweep_mempool(CpvAccess(mempool));
1566 if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1567 mysum = smsg_send_count + smsg_recv_count;
1568 MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d) (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count);
1569 status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1570 GNI_RC_CHECK("PMI_Allgather", status);
1571 sum = 0;
1572 for (i=0; i<mysize; i++) sum+= ptr[i];
1573 if (last == 0 || sum == last)
1574 count++;
1575 else
1576 count = 0;
1577 last = sum;
1578 MACHSTATE5(9,"Progress Deadlock (%d,%d) (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count);
1579 if (count == 2) {
1580 /* detected twice, it is a real deadlock */
1581 if (myrank == 0) {
1582 CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low. Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1583 CmiAbort("Fatal> Deadlock detected.");
1587 _detected_hang = 0;
1590 static void CheckProgress()
1592 if (smsg_send_count == last_smsg_send_count &&
1593 smsg_recv_count == last_smsg_recv_count )
1595 _detected_hang = 1;
1596 #if !CMK_SMP
1597 if (_detected_hang) ProcessDeadlock();
1598 #endif
1601 else {
1602 //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count);
1603 last_smsg_send_count = smsg_send_count;
1604 last_smsg_recv_count = smsg_recv_count;
1605 _detected_hang = 0;
1609 static void set_limit()
1611 //if (!user_set_flag && CmiMyRank() == 0) {
1612 if (CmiMyRank() == 0) {
1613 int mynode = CmiPhysicalNodeID(CmiMyPe());
1614 int numpes = CmiNumPesOnPhysicalNode(mynode);
1615 int numprocesses = numpes / CmiMyNodeSize();
1616 MAX_REG_MEM = _totalmem / numprocesses;
1617 MAX_BUFF_SEND = MAX_REG_MEM / 2;
1618 if (CmiMyPe() == 0)
1619 printf("mem_max = %lld, send_max =%lld\n", MAX_REG_MEM, MAX_BUFF_SEND);
1623 void LrtsPostCommonInit(int everReturn)
1625 #if CMK_DIRECT
1626 CmiDirectInit();
1627 #endif
1628 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1629 CpvInitialize(double, projTraceStart);
1630 /* only PE 0 needs to care about registration (to generate sts file). */
1631 //if (CmiMyPe() == 0)
1633 registerMachineUserEventsFunction(&registerUserTraceEvents);
1635 #endif
1637 #if CMK_SMP
1638 CmiIdleState *s=CmiNotifyGetState();
1639 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1640 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1641 #else
1642 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1643 if (useDynamicSMSG)
1644 CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1645 #endif
1647 #if ! LARGEPAGE
1648 if (_checkProgress)
1649 #if CMK_SMP
1650 if (CmiMyRank() == 0)
1651 #endif
1652 CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1653 #endif
1655 #if !LARGEPAGE
1656 CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1657 #endif
/*
 * Called by worker threads.  With MULTI_THREAD_SEND the worker drives the
 * network itself: receive small messages, reap local FMA and RDMA
 * completions (and remote events when enabled), then push out buffered small
 * messages and buffered RDMA requests.  With CMK_WORKER_SINGLE_TASK each
 * step is restricted to workers whose rank has a particular value modulo 6.
 * Without MULTI_THREAD_SEND the function does nothing.
 */
void LrtsPostNonLocal()
{
#if CMK_SMP_TRACE_COMMTHREAD
    double startT, endT;
#endif
#if MULTI_THREAD_SEND
    if (mysize == 1) return;

#if CMK_SMP_TRACE_COMMTHREAD
    traceEndIdle();
    startT = CmiWallTimer();
#endif

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 0)
#endif
        PumpNetworkSmsg();

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 1)
#endif
        PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 2)
#endif
        PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);

#if REMOTE_EVENT
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 3)
#endif
        PumpRemoteTransactions();
#endif

    /* With OOB support the regular queue is only served when the call on the
     * out-of-band queue returns 1. */
#if CMK_USE_OOB
    if (SendBufferMsg(&smsg_oob_queue) == 1)
#endif
    {
#if CMK_WORKER_SINGLE_TASK
        if (CmiMyRank() % 6 == 4)
#endif
            SendBufferMsg(&smsg_queue);
    }

#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 5)
#endif
        SendRdmaMsg();

#if CMK_SMP_TRACE_COMMTHREAD
    endT = CmiWallTimer();
    traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
    traceBeginIdle();
#endif
#endif
}
1722 /* useDynamicSMSG */
1723 static void PumpDatagramConnection()
1725 uint32_t remote_address;
1726 uint32_t remote_id;
1727 gni_return_t status;
1728 gni_post_state_t post_state;
1729 uint64_t datagram_id;
1730 int i;
1732 while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1734 if (datagram_id >= mysize) { /* bound endpoint */
1735 int pe = datagram_id - mysize;
1736 CMI_GNI_LOCK(global_gni_lock)
1737 status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1738 CMI_GNI_UNLOCK(global_gni_lock)
1739 if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1741 CmiAssert(remote_id == pe);
1742 status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1743 GNI_RC_CHECK("Dynamic SMSG Init", status);
1744 #if PRINT_SYH
1745 printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1746 #endif
1747 CmiAssert(smsg_connected_flag[pe] == 1);
1748 smsg_connected_flag[pe] = 2;
1751 else { /* unbound ep */
1752 status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1753 if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1755 CmiAssert(remote_id<mysize);
1756 CmiAssert(smsg_connected_flag[remote_id] <= 0);
1757 status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1758 GNI_RC_CHECK("Dynamic SMSG Init", status);
1759 #if PRINT_SYH
1760 printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1761 #endif
1762 smsg_connected_flag[remote_id] = 2;
1764 alloc_smsg_attr(&send_smsg_attr);
1765 status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr, SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1766 GNI_RC_CHECK("post unbound datagram", status);
/* Polling the CQ to receive network messages: currently an empty stub. */
static void PumpNetworkRdmaMsgs()
{
}
1780 inline
1781 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
1783 RDMA_REQUEST *rdma_request_msg;
1784 MallocRdmaRequest(rdma_request_msg);
1785 rdma_request_msg->destNode = inst_id;
1786 rdma_request_msg->pd = pd;
1787 #if REMOTE_EVENT
1788 rdma_request_msg->ack_index = ack_index;
1789 #endif
1790 #if CMK_SMP
1791 PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1792 #else
1793 if(sendRdmaBuf == 0)
1795 sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1796 }else{
1797 sendRdmaTail->next = rdma_request_msg;
1798 sendRdmaTail = rdma_request_msg;
1800 #endif
// Forward declaration: handler for LMSG_INIT_TAG control messages, defined below.
1804 static void getLargeMsgRequest(void* header, uint64_t inst_id);
/*
 * Drain the SMSG receive completion queue.
 *
 * For every event on smsg_rx_cqh the sender is taken from the event's
 * instance id and all messages waiting in that sender's mailbox are fetched
 * with GNI_SmsgGetNextWTag and dispatched on their tag.  smsg_recv_count is
 * incremented once per dispatched message.  If the queue poll ends with
 * GNI_RC_ERROR_RESOURCE a hint about +useRecvQueue is printed and the status
 * is passed to GNI_RC_CHECK.
 */
1806 static void PumpNetworkSmsg()
1808 uint64_t inst_id;
1809 int ret;
1810 gni_cq_entry_t event_data;
1811 gni_return_t status, status2;
1812 void *header;
1813 uint8_t msg_tag;
1814 int msg_nbytes;
1815 void *msg_data;
1816 gni_mem_handle_t msg_mem_hndl;
1817 gni_smsg_attr_t *smsg_attr;
1818 gni_smsg_attr_t *remote_smsg_attr;
1819 int init_flag;
1820 CONTROL_MSG *control_msg_tmp, *header_tmp;
1821 uint64_t source_addr;
1822 SMSG_QUEUE *queue = &smsg_queue;
1823 #if CMK_DIRECT
1824 cmidirectMsg *direct_msg;
1825 #endif
// Outer loop: one iteration per completion-queue event; ends on the first
// poll that does not return GNI_RC_SUCCESS.
1826 while(1)
1828 CMI_GNI_LOCK(smsg_rx_cq_lock)
1829 status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1830 CMI_GNI_UNLOCK(smsg_rx_cq_lock)
1831 if(status != GNI_RC_SUCCESS)
1832 break;
1833 inst_id = GNI_CQ_GET_INST_ID(event_data);
1834 #if REMOTE_EVENT
1835 inst_id = ACK_GET_RANK(inst_id);
1836 #endif
1837 // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1838 #if PRINT_SYH
1839 printf("[%d] %d PumpNetworkMsgs is received from PE: %d, status=%s\n", myrank, CmiMyRank(), inst_id, gni_err_str[status]);
1840 #endif
1841 if (useDynamicSMSG) {
1842 /* subtle: smsg may come before connection is setup */
// Spin on the datagram pump until the connection to the sender is complete.
1843 while (smsg_connected_flag[inst_id] != 2)
1844 PumpDatagramConnection();
1846 msg_tag = GNI_SMSG_ANY_TAG;
// Inner loop: fetch every message currently in this sender's mailbox.
// smsg_mailbox_lock is taken here and released inside each case.
1847 while(1) {
1848 CMI_GNI_LOCK(smsg_mailbox_lock)
1849 status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1850 if (status != GNI_RC_SUCCESS)
1852 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1853 break;
1855 #if PRINT_SYH
1856 printf("[%d] from %d request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1857 #endif
1858 /* copy msg out and then put into queue (small message) */
1859 switch (msg_tag) {
// Small message: copy it out of the mailbox, release the slot, deliver.
1860 case SMALL_DATA_TAG:
1862 START_EVENT();
1863 msg_nbytes = CmiGetMsgSize(header);
1864 msg_data = CmiAlloc(msg_nbytes);
1865 memcpy(msg_data, (char*)header, msg_nbytes);
1866 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1867 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1868 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1869 handleOneRecvedMsg(msg_nbytes, msg_data);
1870 break;
// Control message announcing a large transfer: hand it to getLargeMsgRequest.
// With MULTI_THREAD_SEND the header is copied and the slot released first;
// otherwise the slot is released only after the request has been processed.
1872 case LMSG_INIT_TAG:
1874 #if MULTI_THREAD_SEND
1875 MallocControlMsg(control_msg_tmp);
1876 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
1877 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1878 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1879 getLargeMsgRequest(control_msg_tmp, inst_id);
1880 FreeControlMsg(control_msg_tmp);
1881 #else
1882 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1883 getLargeMsgRequest(header, inst_id);
1884 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1885 #endif
1886 break;
1888 case ACK_TAG: //msg fit into mempool
1890 /* The GET is done; release the message. PUT is not used yet. */
1891 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
1892 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1893 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1894 #if ! USE_LRTS_MEMPOOL
// NOTE(review): header is dereferenced here after GNI_SmsgRelease above --
// confirm the mailbox slot is still valid at this point.
1895 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
1896 #else
1897 DecreaseMsgInSend(msg);
1898 #endif
// Last outstanding send from this mempool block: give its size back to the
// send budget.
1899 if(NoMsgInSend(msg))
1900 buffered_send_msg -= GetMempoolsize(msg);
1901 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag);
1902 CmiFree(msg);
1903 break;
1905 case BIG_MSG_TAG: //big msg, de-register, transfer next seg
1907 #if MULTI_THREAD_SEND
1908 MallocControlMsg(header_tmp);
1909 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
1910 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1911 #else
1912 header_tmp = (CONTROL_MSG *) header;
1913 #endif
1914 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1915 void *msg = (void*)(header_tmp->source_addr);
1916 int cur_seq = CmiGetMsgSeq(msg);
1917 int offset = ONE_SEG*(cur_seq+1);
// The acknowledged segment is deregistered and taken off the send budget.
1918 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
1919 buffered_send_msg -= header_tmp->length;
// The size stored in the message header is reused as a "bytes still to be
// acknowledged" counter.
1920 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
1921 if (remain_size < 0) remain_size = 0;
1922 CmiSetMsgSize(msg, remain_size);
1923 if(remain_size <= 0) //transaction done
1925 CmiFree(msg);
1926 }else if (header_tmp->total_length > offset)
1928 CmiSetMsgSeq(msg, cur_seq+1);
1929 control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
1930 control_msg_tmp->dest_addr = header_tmp->dest_addr;
1931 //send next seg
1932 send_large_messages(queue, inst_id, control_msg_tmp, 0);
1933 // pipelining
// After the ack for the first segment, up to BIG_MSG_PIPELINE-1 further
// segments are started at once, stopping when the message is covered.
1934 if (header_tmp->seq_id == 1) {
1935 int i;
1936 for (i=1; i<BIG_MSG_PIPELINE; i++) {
1937 int seq = cur_seq+i+2;
1938 CmiSetMsgSeq(msg, seq-1);
1939 control_msg_tmp = construct_control_msg(header_tmp->total_length, (char *)msg, seq);
1940 control_msg_tmp->dest_addr = header_tmp->dest_addr;
1941 send_large_messages(queue, inst_id, control_msg_tmp, 0);
1942 if (header_tmp->total_length <= ONE_SEG*seq) break;
1946 #if MULTI_THREAD_SEND
1947 FreeControlMsg(header_tmp);
1948 #else
1949 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1950 #endif
1951 break;
1953 #if CMK_PERSISTENT_COMM
1954 case PUT_DONE_TAG: //persistent message
1955 void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
1956 int size = ((CONTROL_MSG *) header)->length;
1957 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1958 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1959 CmiReference(msg);
1960 handleOneRecvedMsg(size, msg);
1961 break;
1962 #endif
1963 #if CMK_DIRECT
1964 case DIRECT_PUT_DONE_TAG: //cmi direct
1965 //create a trigger message
1966 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
1967 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
1968 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1969 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1970 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
1971 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
1972 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
1973 break;
1974 #endif
// Any other tag is a protocol error: release the slot and abort.
1975 default: {
1976 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1977 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1978 printf("weird tag problem\n");
1979 CmiAbort("Unknown tag\n");
1981 } // end switch
1982 #if PRINT_SYH
1983 printf("[%d] from %d after switch request for Large msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1984 #endif
1985 smsg_recv_count ++;
// Reset the tag filter so the next fetch accepts any tag again.
1986 msg_tag = GNI_SMSG_ANY_TAG;
1987 } //endwhile getNext
1988 } //end while GetEvent
// status here is the result of the last GNI_CqGetEvent call.
1989 if(status == GNI_RC_ERROR_RESOURCE)
1991 printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");
1992 GNI_RC_CHECK("Smsg_rx_cq full", status);
1996 static void printDesc(gni_post_descriptor_t *pd)
1998 printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length);
#if CQWRITE
/*
 * Post a CQ write to destNode: `data` is delivered as a completion-queue
 * entry through the memory handle `mem_hndl`.  A failing post is reported
 * through GNI_RC_CHECK.
 */
static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
{
    gni_return_t status = GNI_RC_SUCCESS;
    gni_post_descriptor_t *pd;

    MallocPostDesc(pd);
    pd->type            = GNI_POST_CQWRITE;
    pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
    pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    pd->cqwrite_value   = data;
    pd->remote_mem_hndl = mem_hndl;

    status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
    GNI_RC_CHECK("GNI_PostCqWrite", status);
}
#endif
2018 // for BIG_MSG called on receiver side for receiving control message
2019 // LMSG_INIT_TAG
2020 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2022 #if USE_LRTS_MEMPOOL
2023 CONTROL_MSG *request_msg;
2024 gni_return_t status = GNI_RC_SUCCESS;
2025 void *msg_data;
2026 gni_post_descriptor_t *pd;
2027 gni_mem_handle_t msg_mem_hndl;
2028 int source, size, transaction_size, offset = 0;
2029 size_t register_size = 0;
2031 // initial a get to transfer data from the sender side */
2032 request_msg = (CONTROL_MSG *) header;
2033 size = request_msg->total_length;
2034 MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size);
2035 MallocPostDesc(pd);
2036 if(request_msg->seq_id < 2) {
2037 msg_data = CmiAlloc(size);
2038 CmiSetMsgSeq(msg_data, 0);
2039 _MEMCHECK(msg_data);
2040 #if CMK_SMP_TRACE_COMMTHREAD
2041 pd->second_operand = 1000000 * CmiWallTimer(); //microsecond
2042 #endif
2044 else {
2045 offset = ONE_SEG*(request_msg->seq_id-1);
2046 msg_data = (char*)request_msg->dest_addr + offset;
2049 pd->cqwrite_value = request_msg->seq_id;
2050 if( request_msg->seq_id == 0)
2052 pd->local_mem_hndl= GetMemHndl(msg_data);
2053 transaction_size = ALIGN64(size);
2054 if(IsMemHndlZero(pd->local_mem_hndl))
2056 status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)), rdma_rx_cqh);
2057 if(status == GNI_RC_SUCCESS)
2059 pd->local_mem_hndl = GetMemHndl(msg_data);
2061 else
2063 SetMemHndlZero(pd->local_mem_hndl);
2066 if(NoMsgInRecv( (void*)(msg_data)))
2067 register_size = GetMempoolsize((void*)(msg_data));
2068 else
2069 register_size = 0;
2071 else{
2072 transaction_size = ALIGN64(request_msg->length);
2073 status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl), NULL);
2074 if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
2076 GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2079 pd->first_operand = ALIGN64(size); // total length
2081 if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2082 pd->type = GNI_POST_FMA_GET;
2083 else
2084 pd->type = GNI_POST_RDMA_GET;
2085 pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT;
2086 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2087 pd->length = transaction_size;
2088 pd->local_addr = (uint64_t) msg_data;
2089 pd->remote_addr = request_msg->source_addr + offset;
2090 pd->remote_mem_hndl = request_msg->source_mem_hndl;
2091 pd->src_cq_hndl = rdma_tx_cqh;
2092 pd->rdma_mode = 0;
2093 pd->amo_cmd = 0;
2095 //memory registration success
2096 if(status == GNI_RC_SUCCESS)
2098 CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2099 CMI_GNI_LOCK(lock)
2100 #if REMOTE_EVENT
2101 if( request_msg->seq_id == 0)
2103 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2104 int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2105 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2107 else {
2108 int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, myrank);
2109 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2111 #endif
2112 if(pd->type == GNI_POST_RDMA_GET)
2114 status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2116 else
2118 status = GNI_PostFma(ep_hndl_array[inst_id], pd);
2120 CMI_GNI_UNLOCK(lock)
2122 if(status == GNI_RC_SUCCESS )
2124 if(pd->cqwrite_value == 0)
2126 #if MACHINE_DEBUG_LOG
2127 buffered_recv_msg += register_size;
2128 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size);
2129 #endif
2130 IncreaseMsgInRecv(msg_data);
2133 }else
2135 SetMemHndlZero((pd->local_mem_hndl));
2137 if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2139 #if REMOTE_EVENT
2140 bufferRdmaMsg(inst_id, pd, request_msg->ack_index);
2141 #else
2142 bufferRdmaMsg(inst_id, pd, -1);
2143 #endif
2144 }else {
2145 //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
2146 GNI_RC_CHECK("GetLargeAFter posting", status);
2148 #else
2149 CONTROL_MSG *request_msg;
2150 gni_return_t status;
2151 void *msg_data;
2152 gni_post_descriptor_t *pd;
2153 RDMA_REQUEST *rdma_request_msg;
2154 gni_mem_handle_t msg_mem_hndl;
2155 //int source;
2156 // initial a get to transfer data from the sender side */
2157 request_msg = (CONTROL_MSG *) header;
2158 msg_data = CmiAlloc(request_msg->length);
2159 _MEMCHECK(msg_data);
2161 MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL, status)
2163 if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
2165 GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2168 MallocPostDesc(pd);
2169 if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD)
2170 pd->type = GNI_POST_FMA_GET;
2171 else
2172 pd->type = GNI_POST_RDMA_GET;
2173 pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT;// | GNI_CQMODE_REMOTE_EVENT;
2174 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2175 pd->length = ALIGN64(request_msg->length);
2176 pd->local_addr = (uint64_t) msg_data;
2177 pd->remote_addr = request_msg->source_addr;
2178 pd->remote_mem_hndl = request_msg->source_mem_hndl;
2179 pd->src_cq_hndl = rdma_tx_cqh;
2180 pd->rdma_mode = 0;
2181 pd->amo_cmd = 0;
2183 //memory registration successful
2184 if(status == GNI_RC_SUCCESS)
2186 pd->local_mem_hndl = msg_mem_hndl;
2188 if(pd->type == GNI_POST_RDMA_GET)
2190 CMI_GNI_LOCK(rdma_tx_cq_lock)
2191 status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2192 CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2194 else
2196 CMI_GNI_LOCK(default_tx_cq_lock)
2197 status = GNI_PostFma(ep_hndl_array[inst_id], pd);
2198 CMI_GNI_UNLOCK(default_tx_cq_lock)
2201 }else
2203 SetMemHndlZero(pd->local_mem_hndl);
2205 if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2207 MallocRdmaRequest(rdma_request_msg);
2208 rdma_request_msg->next = 0;
2209 rdma_request_msg->destNode = inst_id;
2210 rdma_request_msg->pd = pd;
2211 PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2212 }else {
2213 GNI_RC_CHECK("AFter posting", status);
2215 #endif
2218 #if CQWRITE
2219 static void PumpCqWriteTransactions()
2222 gni_cq_entry_t ev;
2223 gni_return_t status;
2224 void *msg;
2225 while(1) {
2226 //CMI_GNI_LOCK(my_cq_lock)
2227 status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2228 //CMI_GNI_UNLOCK(my_cq_lock)
2229 if(status != GNI_RC_SUCCESS) break;
2230 msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2232 DecreaseMsgInSend(msg);
2233 #if ! USE_LRTS_MEMPOOL
2234 // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2235 #else
2236 DecreaseMsgInSend(msg);
2237 #endif
2238 if(NoMsgInSend(msg))
2239 buffered_send_msg -= GetMempoolsize(msg);
2240 CmiFree(msg);
2242 if(status == GNI_RC_ERROR_RESOURCE)
2244 GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2247 #endif
2249 #if REMOTE_EVENT
2250 static void PumpRemoteTransactions()
2252 gni_cq_entry_t ev;
2253 gni_return_t status;
2254 void *msg;
2255 int slot;
2257 while(1) {
2258 CMI_GNI_LOCK(global_gni_lock)
2259 status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2260 CMI_GNI_UNLOCK(global_gni_lock)
2261 if(status != GNI_RC_SUCCESS) {
2262 break;
2265 slot = GNI_CQ_GET_INST_ID(ev);
2266 slot = ACK_GET_INDEX(slot);
2267 //slot = GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFL;
2269 //CMI_GNI_LOCK(ackpool_lock);
2270 msg = GetAckAddress(slot);
2271 //CMI_GNI_UNLOCK(ackpool_lock);
2273 DecreaseMsgInSend(msg);
2274 #if ! USE_LRTS_MEMPOOL
2275 // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2276 #else
2277 DecreaseMsgInSend(msg);
2278 #endif
2279 if(NoMsgInSend(msg))
2280 buffered_send_msg -= GetMempoolsize(msg);
2281 CmiFree(msg);
2282 AckPool_freeslot(slot);
2284 if(status == GNI_RC_ERROR_RESOURCE)
2286 GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2289 #endif
/*
 * Drain completion events from the transmit completion queue `my_tx_cqh`.
 *
 * my_tx_cqh   completion queue to poll
 * my_cq_lock  lock taken (via CMI_GNI_LOCK) around every access to that queue
 *
 * For every POST-type event the completed post descriptor is retrieved and,
 * depending on its type, a notification (ACK_TAG, BIG_MSG_TAG, PUT_DONE_TAG or
 * DIRECT_PUT_DONE_TAG) is sent to the event's instance id with
 * send_smsg_message(). Completed GETs are then passed to handleOneRecvedMsg().
 * The descriptor is released with FreePostDesc() at the end of each iteration.
 */
2291 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2293 gni_cq_entry_t ev;
2294 gni_return_t status;
2295 uint64_t type, inst_id;
2296 gni_post_descriptor_t *tmp_pd;
2297 MSG_LIST *ptr;
2298 CONTROL_MSG *ack_msg_tmp;
2299 ACK_MSG *ack_msg;
2300 uint8_t msg_tag;
2301 #if CMK_DIRECT
2302 CMK_DIRECT_HEADER *cmk_direct_done_msg;
2303 #endif
2304 SMSG_QUEUE *queue = &smsg_queue;
/* Poll until GNI_CqGetEvent() stops returning GNI_RC_SUCCESS. */
2306 while(1) {
2307 CMI_GNI_LOCK(my_cq_lock)
2308 status = GNI_CqGetEvent(my_tx_cqh, &ev);
2309 CMI_GNI_UNLOCK(my_cq_lock)
2310 if(status != GNI_RC_SUCCESS) break;
2312 type = GNI_CQ_GET_TYPE(ev);
2313 if (type == GNI_CQ_EVENT_TYPE_POST)
2315 inst_id = GNI_CQ_GET_INST_ID(ev);
2316 #if PRINT_SYH
2317 printf("[%d] LocalTransactions localdone=%d\n", myrank, lrts_local_done_msg);
2318 #endif
/* NOTE(review): the status returned by GNI_GetCompleted() is not checked
 * before tmp_pd is dereferenced below. */
2319 CMI_GNI_LOCK(my_cq_lock)
2320 status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2321 CMI_GNI_UNLOCK(my_cq_lock)
/* Choose the notification tag and build its payload from the descriptor type. */
2323 switch (tmp_pd->type) {
2324 #if CMK_PERSISTENT_COMM || CMK_DIRECT
/* RDMA_PUT has no break of its own: it continues into the FMA_PUT handling. */
2325 case GNI_POST_RDMA_PUT:
2326 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2327 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2328 #endif
2329 case GNI_POST_FMA_PUT:
/* amo_cmd == 1 selects the DIRECT_PUT_DONE_TAG notification; its handler address is read from first_operand. */
2330 if(tmp_pd->amo_cmd == 1) {
2331 //sender ACK to receiver to trigger it is done
2332 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2333 cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2334 msg_tag = DIRECT_PUT_DONE_TAG;
2336 else {
/* Any other PUT: the local source buffer is freed and a PUT_DONE_TAG control message is prepared. */
2337 CmiFree((void *)tmp_pd->local_addr);
2338 MallocControlMsg(ack_msg_tmp);
2339 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2340 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
2341 msg_tag = PUT_DONE_TAG;
2343 break;
2344 #endif
2345 case GNI_POST_RDMA_GET:
2346 case GNI_POST_FMA_GET: {
2347 #if ! USE_LRTS_MEMPOOL
2348 MallocControlMsg(ack_msg_tmp);
2349 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2350 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
2351 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2352 msg_tag = ACK_TAG;
2353 #else
/* cqwrite_value carries the segment sequence id: > 0 is treated as a BIG_MSG
 * segment, otherwise the completion is acknowledged with ACK_TAG. */
2354 int seq_id = tmp_pd->cqwrite_value;
2355 if(seq_id > 0) // BIG_MSG
2357 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2358 MallocControlMsg(ack_msg_tmp);
2359 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2360 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
2361 ack_msg_tmp->seq_id = seq_id;
/* Both addresses are rewound by (seq_id-1) segments of ONE_SEG bytes each. */
2362 ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2363 ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2364 ack_msg_tmp->length = tmp_pd->length;
2365 ack_msg_tmp->total_length = tmp_pd->first_operand; // total size
2366 msg_tag = BIG_MSG_TAG;
2368 else
2370 msg_tag = ACK_TAG;
/* An explicit ACK message is only built when neither REMOTE_EVENT nor CQWRITE is compiled in. */
2371 #if !REMOTE_EVENT && !CQWRITE
2372 MallocAckMsg(ack_msg);
2373 ack_msg->source_addr = tmp_pd->remote_addr;
2374 #endif
2376 #endif
2377 break;
/* CQ-write completions need no notification: free the descriptor and poll again. */
2379 case GNI_POST_CQWRITE:
2380 FreePostDesc(tmp_pd);
2381 continue;
2382 default:
2383 CmiPrintf("type=%d\n", tmp_pd->type);
2384 CmiAbort("PumpLocalTransactions: unknown type!");
2385 } /* end of switch */
/* Send the notification. The payload is freed here only when the send returns
 * GNI_RC_SUCCESS. NOTE(review): on any other status the payload is presumably
 * kept by send_smsg_message() for a later retry - confirm. */
2387 #if CMK_DIRECT
2388 if (tmp_pd->amo_cmd == 1) {
2389 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0);
2390 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg);
2392 else
2393 #endif
2394 if (msg_tag == ACK_TAG) {
2395 #if !REMOTE_EVENT
2396 #if !CQWRITE
2397 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0);
2398 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2399 #else
2400 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2401 #endif
2402 #endif
2404 else {
2405 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0);
2406 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
/* With persistent communication compiled in, delivery below is restricted to GET completions. */
2408 #if CMK_PERSISTENT_COMM
2409 if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2410 #endif
2412 if( msg_tag == ACK_TAG){ //msg fit in mempool
2413 #if PRINT_SYH
2414 printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2415 #endif
2416 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->second_operand/1000000.0), (double)((tmp_pd->second_operand+1)/1000000.0), (double)((tmp_pd->second_operand+1)/1000000.0), (void*)tmp_pd->local_addr);
2418 START_EVENT();
2419 CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2420 DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2421 #if MACHINE_DEBUG_LOG
2422 if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2423 buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2424 MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag);
2425 #endif
2426 TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2427 handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr);
2428 }else if(msg_tag == BIG_MSG_TAG){
/* Locate the start of the whole message from this segment's address and bump
 * its sequence counter; the message is delivered once ONE_SEG times that
 * counter reaches the total length stored in first_operand. */
2429 void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2430 CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2431 if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2432 START_EVENT();
2433 #if PRINT_SYH
2434 printf("Pipeline msg done [%d]\n", myrank);
2435 #endif
2436 #if CMK_SMP_TRACE_COMMTHREAD
2437 if( tmp_pd->cqwrite_value == 1)
2438 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->second_operand/1000000.0), (double)((tmp_pd->second_operand+1)/1000000.0), (double)((tmp_pd->second_operand+2)/1000000.0), (void*)tmp_pd->local_addr);
2439 #endif
2440 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2441 handleOneRecvedMsg(tmp_pd->first_operand, msg);
2445 FreePostDesc(tmp_pd);
2447 } //end while
/* GNI_RC_ERROR_RESOURCE after the loop is reported as a full transmit queue, with a hint to enlarge it. */
2448 if(status == GNI_RC_ERROR_RESOURCE)
2450 printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");
2451 GNI_RC_CHECK("Smsg_tx_cq full", status);
/*
 * Retry the RDMA/FMA requests that are waiting in sendRdmaBuf.
 *
 * For each buffered request the local memory is registered if its handle is
 * still zero, and the post descriptor is then submitted with GNI_PostRdma()
 * or GNI_PostFma(). A request that posts successfully leaves the buffer; one
 * that cannot be registered or posted stays buffered for a later call.
 *
 * In SMP builds sendRdmaBuf is a PCQueue and failed requests are pushed back;
 * otherwise it is a singly linked list walked with `pre`/`ptr`.
 */
2455 static void SendRdmaMsg()
2457 gni_return_t status = GNI_RC_SUCCESS;
2458 gni_mem_handle_t msg_mem_hndl;
2459 RDMA_REQUEST *ptr = 0, *tmp_ptr;
2460 RDMA_REQUEST *pre = 0;
2461 uint64_t register_size = 0;
2462 void *msg;
2463 int i;
2464 #if CMK_SMP
/* The queue length is sampled once, so requests pushed back during this call are not retried in the same pass. */
2465 int len = PCQueueLength(sendRdmaBuf);
2466 for (i=0; i<len; i++)
2468 CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2469 ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2470 CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2471 if (ptr == NULL) break;
2472 #else
2473 ptr = sendRdmaBuf;
2474 while (ptr!=0)
2476 #endif
2477 MACHSTATE4(8, "noempty-rdma %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size);
2478 gni_post_descriptor_t *pd = ptr->pd;
2479 status = GNI_RC_SUCCESS;
/* cqwrite_value == 0: the buffer's memory handle comes from its mempool block;
 * if that handle is still zero the whole block is registered first. */
2481 if(pd->cqwrite_value == 0)
2483 if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
2485 msg = (void*)(pd->local_addr);
2486 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2487 if(status == GNI_RC_SUCCESS)
2489 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
2491 }else
2493 pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
/* register_size is only consumed by the MACHINE_DEBUG_LOG accounting further down. */
2495 if(NoMsgInRecv( (void*)(pd->local_addr)))
2496 register_size = GetMempoolsize((void*)(pd->local_addr));
2497 else
2498 register_size = 0;
2499 }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2501 status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl), NULL);
2503 if(status == GNI_RC_SUCCESS) //mem register good
/* RDMA GETs are posted under rdma_tx_cq_lock, everything else under default_tx_cq_lock. */
2505 CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET? rdma_tx_cq_lock:default_tx_cq_lock;
2506 CMI_GNI_LOCK(lock);
2507 #if REMOTE_EVENT
2508 if( pd->cqwrite_value == 0)
2510 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2511 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
2512 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2514 else {
2515 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, myrank);
2516 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2518 #endif
2519 if(pd->type == GNI_POST_RDMA_GET)
2521 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2523 else
2525 status = GNI_PostFma(ep_hndl_array[ptr->destNode], pd);
2527 CMI_GNI_UNLOCK(lock);
2529 if(status == GNI_RC_SUCCESS) //post good
/* Non-SMP: unlink the posted request from the list and free it. In the SMP
 * build the request was already popped and is simply not pushed back. */
2531 #if !CMK_SMP
2532 tmp_ptr = ptr;
2533 if(pre != 0) {
2534 pre->next = ptr->next;
2536 else {
2537 sendRdmaBuf = ptr->next;
2539 ptr = ptr->next;
2540 FreeRdmaRequest(tmp_ptr);
2541 #endif
2542 if(pd->cqwrite_value == 0)
2544 IncreaseMsgInRecv(((void*)(pd->local_addr)));
2546 #if MACHINE_DEBUG_LOG
2547 buffered_recv_msg += register_size;
2548 MACHSTATE(8, "GO request from buffered\n");
2549 #endif
2550 }else // cannot post
/* Posting failed: keep the request buffered and stop processing for this call. */
2552 #if CMK_SMP
2553 PCQueuePush(sendRdmaBuf, (char*)ptr);
2554 #else
2555 pre = ptr;
2556 ptr = ptr->next;
2557 #endif
2558 break;
2560 } else //memory registration fails
/* Registration failed: keep the request buffered but continue with the next one. */
2562 #if CMK_SMP
2563 PCQueuePush(sendRdmaBuf, (char*)ptr);
2564 #else
2565 pre = ptr;
2566 ptr = ptr->next;
2567 #endif
2569 } //end while
/* Non-SMP: if the walk reached the end of the list, `pre` becomes the new tail. */
2570 #if ! CMK_SMP
2571 if(ptr == 0)
2572 sendRdmaTail = pre;
2573 #endif
/*
 * Retry the short messages buffered in `queue`.
 *
 * queue  send queue whose buffered messages should be retried
 *
 * Returns 1 when no send attempt failed during this pass and 0 otherwise
 * (`done` starts at 1 and is cleared on the first failed send).
 *
 * The traversal depends on the build configuration:
 *   - CMK_SMP && ONE_SEND_QUEUE: one PCQueue (queue->sendMsgBuf) for all
 *     destinations; destpe_avail[] marks destinations that returned
 *     GNI_RC_ERROR_RESOURCE in this pass so their messages are re-queued
 *     without another attempt.
 *   - CMK_SMP && SMP_LOCKS: per-destination PCQueues taken from nonEmptyQueues.
 *   - CMK_SMP otherwise: every per-destination PCQueue, index 0..mysize-1.
 *   - non-SMP: linked lists per destination, chained through smsg_head_index
 *     and smsg_msglist_index[].next.
 */
2576 // return 1 if all messages are sent
2577 static int SendBufferMsg(SMSG_QUEUE *queue)
2579 MSG_LIST *ptr, *tmp_ptr, *pre=0, *current_head;
2580 CONTROL_MSG *control_msg_tmp;
2581 gni_return_t status;
2582 int done = 1;
2583 uint64_t register_size;
2584 void *register_addr;
2585 int index_previous = -1;
2586 #if CMI_EXERT_SEND_CAP
2587 int sent_cnt = 0;
2588 #endif
2590 #if CMK_SMP
2591 int index = 0;
2592 #if ONE_SEND_QUEUE
/* Start the pass with every destination marked as available. */
2593 memset(destpe_avail, 0, mysize * sizeof(char));
2594 for (index=0; index<1; index++)
/* The queue length is sampled once, so messages pushed back below are not retried in this pass. */
2596 int i, len = PCQueueLength(queue->sendMsgBuf);
2597 for (i=0; i<len; i++)
2599 CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
2600 ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2601 CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
2602 if(ptr == NULL) break;
2603 if (destpe_avail[ptr->destNode] == 1) { /* can't send to this pe */
2604 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2605 continue;
2607 #else
2608 #if SMP_LOCKS
2609 int nonempty = PCQueueLength(nonEmptyQueues);
2610 for(index =0; index<nonempty; index++)
2612 CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
2613 MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2614 CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
2615 if(current_list == NULL) break;
2616 PCQueue current_queue= current_list-> sendSmsgBuf;
/* Under the list lock: sample the length and clear `pushed`, which records
 * whether this list is currently enqueued on nonEmptyQueues. */
2617 CmiLock(current_list->lock);
2618 int i, len = PCQueueLength(current_queue);
2619 current_list->pushed = 0;
2620 CmiUnlock(current_list->lock);
2621 #else
2622 for(index =0; index<mysize; index++)
2624 PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2625 int i, len = PCQueueLength(current_queue);
2626 #endif
2627 for (i=0; i<len; i++)
2629 CMI_PCQUEUEPOP_LOCK(current_queue)
2630 ptr = (MSG_LIST*)PCQueuePop(current_queue);
2631 CMI_PCQUEUEPOP_UNLOCK(current_queue)
2632 if (ptr == 0) break;
2633 #endif
2634 #else
/* Non-SMP: walk the chain of destinations starting at smsg_head_index. */
2635 int index = queue->smsg_head_index;
2636 while(index != -1)
2638 ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2639 pre = 0;
2640 while(ptr != 0)
2642 #endif
2643 MACHSTATE5(8, "noempty-smsg %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag);
/* Default to a resource error so a message that is skipped because its
 * dynamic SMSG connection is missing goes through the failure path below. */
2644 status = GNI_RC_ERROR_RESOURCE;
/* NOTE(review): in the ONE_SEND_QUEUE and SMP_LOCKS layouts `index` is a loop
 * counter rather than a destination rank, so smsg_connected_flag[index] may
 * not test the connection to ptr->destNode - confirm. */
2645 if (useDynamicSMSG && smsg_connected_flag[index] != 2) {
2646 /* connection not exists yet */
2648 else
/* Resend according to the tag; the payload is released only after a successful send. */
2649 switch(ptr->tag)
2651 case SMALL_DATA_TAG:
2652 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);
2653 if(status == GNI_RC_SUCCESS)
2655 CmiFree(ptr->msg);
2657 break;
2658 case LMSG_INIT_TAG:
2659 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2660 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
2661 break;
2662 case ACK_TAG:
2663 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);
2664 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2665 break;
2666 case BIG_MSG_TAG:
2667 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1);
2668 if(status == GNI_RC_SUCCESS)
2670 FreeControlMsg((CONTROL_MSG*)ptr->msg);
2672 break;
2673 #if CMK_DIRECT
2674 case DIRECT_PUT_DONE_TAG:
2675 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);
2676 if(status == GNI_RC_SUCCESS)
2678 free((CMK_DIRECT_HEADER*)ptr->msg);
2680 break;
2682 #endif
2683 default:
2684 printf("Weird tag\n");
2685 CmiAbort("should not happen\n");
2686 } // end switch
2687 if(status == GNI_RC_SUCCESS)
2689 #if PRINT_SYH
2690 buffered_smsg_counter--;
2691 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2692 #endif
/* Sent: in the non-SMP build unlink the node from the destination's list
 * before freeing it; in the SMP build it was already popped. */
2693 #if !CMK_SMP
2694 tmp_ptr = ptr;
2695 if(pre)
2697 ptr = pre ->next = ptr->next;
2698 }else
2700 ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2702 FreeMsgList(tmp_ptr);
2703 #else
2704 FreeMsgList(ptr);
2705 #endif
2706 #if CMI_EXERT_SEND_CAP
2707 sent_cnt++;
2708 if(sent_cnt == SEND_CAP)
2709 break;
2710 #endif
2711 }else {
/* Not sent: keep the message buffered (push it back, or advance past it in the list). */
2712 #if CMK_SMP
2713 #if ONE_SEND_QUEUE
2714 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2715 #else
2716 PCQueuePush(current_queue, (char*)ptr);
2717 #endif
2718 #else
2719 pre = ptr;
2720 ptr=ptr->next;
2721 #endif
2722 done = 0;
/* On a resource error either mark the destination unavailable (single-queue
 * SMP layout) or stop working on the current destination's queue. */
2723 if(status == GNI_RC_ERROR_RESOURCE)
2725 #if CMK_SMP && ONE_SEND_QUEUE
2726 destpe_avail[ptr->destNode] = 1;
2727 #else
2728 break;
2729 #endif
2732 } //end while
2733 #if !CMK_SMP
/* Non-SMP bookkeeping: refresh the tail when the walk reached the end of the
 * list, and drop this destination from the chain once its list is empty. */
2734 if(ptr == 0)
2735 queue->smsg_msglist_index[index].tail = pre;
2736 if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2738 if(index_previous != -1)
2739 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2740 else
2741 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2743 else {
2744 index_previous = index;
2746 index = queue->smsg_msglist_index[index].next;
2747 #else
2748 #if !ONE_SEND_QUEUE && SMP_LOCKS
/* Put the list back on nonEmptyQueues if it still holds messages and has not been re-enqueued in the meantime. */
2749 CmiLock(current_list->lock);
2750 if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2752 current_list->pushed = 1;
2753 PCQueuePush(nonEmptyQueues, current_list);
2755 CmiUnlock(current_list->lock);
2756 #endif
2757 #endif
2759 #if CMI_EXERT_SEND_CAP
2760 if(sent_cnt == SEND_CAP)
2761 break;
2762 #endif
2763 } // end pooling for all cores
2764 return done;
2767 static void ProcessDeadlock();
2768 void LrtsAdvanceCommunication(int whileidle)
2770 static int count = 0;
2771 /* Receive Msg first */
2772 #if CMK_SMP_TRACE_COMMTHREAD
2773 double startT, endT;
2774 #endif
2775 if (useDynamicSMSG && whileidle)
2777 #if CMK_SMP_TRACE_COMMTHREAD
2778 startT = CmiWallTimer();
2779 #endif
2780 PumpDatagramConnection();
2781 #if CMK_SMP_TRACE_COMMTHREAD
2782 endT = CmiWallTimer();
2783 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
2784 #endif
2787 #if CMK_SMP_TRACE_COMMTHREAD
2788 startT = CmiWallTimer();
2789 #endif
2790 PumpNetworkSmsg();
2791 //MACHSTATE(8, "after PumpNetworkSmsg \n") ;
2792 #if CMK_SMP_TRACE_COMMTHREAD
2793 endT = CmiWallTimer();
2794 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
2795 #endif
2797 #if CMK_SMP_TRACE_COMMTHREAD
2798 startT = CmiWallTimer();
2799 #endif
2800 PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2801 //MACHSTATE(8, "after PumpLocalTransactions\n") ;
2802 #if CMK_SMP_TRACE_COMMTHREAD
2803 endT = CmiWallTimer();
2804 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
2805 #endif
2807 #if CMK_SMP_TRACE_COMMTHREAD
2808 startT = CmiWallTimer();
2809 #endif
2810 PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
2812 #if CQWRITE
2813 PumpCqWriteTransactions();
2814 #endif
2816 #if REMOTE_EVENT
2817 PumpRemoteTransactions();
2818 #endif
2820 //MACHSTATE(8, "after PumpLocalTransactions\n") ;
2821 #if CMK_SMP_TRACE_COMMTHREAD
2822 endT = CmiWallTimer();
2823 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
2824 #endif
2826 /* Send buffered Message */
2827 #if CMK_SMP_TRACE_COMMTHREAD
2828 startT = CmiWallTimer();
2829 #endif
2830 #if CMK_USE_OOB
2831 if (SendBufferMsg(&smsg_oob_queue) == 1)
2832 #endif
2834 SendBufferMsg(&smsg_queue);
2836 //MACHSTATE(8, "after SendBufferMsg\n") ;
2837 #if CMK_SMP_TRACE_COMMTHREAD
2838 endT = CmiWallTimer();
2839 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
2840 #endif
2842 #if CMK_SMP_TRACE_COMMTHREAD
2843 startT = CmiWallTimer();
2844 #endif
2845 SendRdmaMsg();
2846 //MACHSTATE(8, "after SendRdmaMsg\n") ;
2847 #if CMK_SMP_TRACE_COMMTHREAD
2848 endT = CmiWallTimer();
2849 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
2850 #endif
2852 #if CMK_SMP && ! LARGEPAGE
2853 if (_detected_hang) ProcessDeadlock();
2854 #endif
2857 /* useDynamicSMSG */
2858 static void _init_dynamic_smsg()
2860 gni_return_t status;
2861 uint32_t vmdh_index = -1;
2862 int i;
2864 smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2865 smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
2866 smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
2867 for(i=0; i<mysize; i++) {
2868 smsg_connected_flag[i] = 0;
2869 smsg_attr_vector_local[i] = NULL;
2870 smsg_attr_vector_remote[i] = NULL;
2872 if(mysize <=512)
2874 SMSG_MAX_MSG = 4096;
2875 }else if (mysize <= 4096)
2877 SMSG_MAX_MSG = 4096/mysize * 1024;
2878 }else if (mysize <= 16384)
2880 SMSG_MAX_MSG = 512;
2881 }else {
2882 SMSG_MAX_MSG = 256;
2885 send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2886 send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2887 send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2888 status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
2889 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2891 mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
2892 mailbox_list->size = smsg_memlen*avg_smsg_connection;
2893 posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
2894 bzero(mailbox_list->mailbox_base, mailbox_list->size);
2895 mailbox_list->offset = 0;
2896 mailbox_list->next = 0;
2898 status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
2899 mailbox_list->size, smsg_rx_cqh,
2900 GNI_MEM_READWRITE,
2901 vmdh_index,
2902 &(mailbox_list->mem_hndl));
2903 GNI_RC_CHECK("MEMORY registration for smsg", status);
2905 status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
2906 GNI_RC_CHECK("Unbound EP", status);
2908 alloc_smsg_attr(&send_smsg_attr);
2910 status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr, SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2911 GNI_RC_CHECK("post unbound datagram", status);
2913 /* always pre-connect to proc 0 */
2914 //if (myrank != 0) connect_to(0);
2917 static void _init_static_smsg()
2919 gni_smsg_attr_t *smsg_attr;
2920 gni_smsg_attr_t remote_smsg_attr;
2921 gni_smsg_attr_t *smsg_attr_vec;
2922 gni_mem_handle_t my_smsg_mdh_mailbox;
2923 int ret, i;
2924 gni_return_t status;
2925 uint32_t vmdh_index = -1;
2926 mdh_addr_t base_infor;
2927 mdh_addr_t *base_addr_vec;
2928 char *env;
2930 if(mysize <=512)
2932 SMSG_MAX_MSG = 1024;
2933 }else if (mysize <= 4096)
2935 SMSG_MAX_MSG = 1024;
2936 }else if (mysize <= 16384)
2938 SMSG_MAX_MSG = 512;
2939 }else {
2940 SMSG_MAX_MSG = 256;
2943 env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
2944 if (env) SMSG_MAX_MSG = atoi(env);
2945 CmiAssert(SMSG_MAX_MSG > 0);
2947 smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
2949 smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2950 smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
2951 smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
2952 status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
2953 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2954 ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
2955 CmiAssert(ret == 0);
2956 bzero(smsg_mailbox_base, smsg_memlen*(mysize));
2958 status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
2959 smsg_memlen*(mysize), smsg_rx_cqh,
2960 GNI_MEM_READWRITE,
2961 vmdh_index,
2962 &my_smsg_mdh_mailbox);
2963 register_memory_size += smsg_memlen*(mysize);
2964 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
2966 if (myrank == 0) printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
2967 if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX to a larger value or use Dynamic smsg\n");
2969 base_infor.addr = (uint64_t)smsg_mailbox_base;
2970 base_infor.mdh = my_smsg_mdh_mailbox;
2971 base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
2973 allgather(&base_infor, base_addr_vec, sizeof(mdh_addr_t));
2975 for(i=0; i<mysize; i++)
2977 if(i==myrank)
2978 continue;
2979 smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2980 smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
2981 smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
2982 smsg_attr[i].mbox_offset = i*smsg_memlen;
2983 smsg_attr[i].buff_size = smsg_memlen;
2984 smsg_attr[i].msg_buffer = smsg_mailbox_base ;
2985 smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
2988 for(i=0; i<mysize; i++)
2990 if (myrank == i) continue;
2992 remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
2993 remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
2994 remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
2995 remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
2996 remote_smsg_attr.buff_size = smsg_memlen;
2997 remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
2998 remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3000 /* initialize the smsg channel */
3001 status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3002 GNI_RC_CHECK("SMSG Init", status);
3003 } //end initialization
3005 free(base_addr_vec);
3006 free(smsg_attr);
3008 status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3009 GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3012 inline
3013 static void _init_send_queue(SMSG_QUEUE *queue)
3015 int i;
3016 #if ONE_SEND_QUEUE
3017 queue->sendMsgBuf = PCQueueCreate();
3018 destpe_avail = (char*)malloc(mysize * sizeof(char));
3019 #else
3020 queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3021 #if CMK_SMP && SMP_LOCKS
3022 nonEmptyQueues = PCQueueCreate();
3023 #endif
3024 for(i =0; i<mysize; i++)
3026 #if CMK_SMP
3027 queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3028 #if SMP_LOCKS
3029 queue->smsg_msglist_index[i].pushed = 0;
3030 queue->smsg_msglist_index[i].lock = CmiCreateLock();
3031 #endif
3032 #else
3033 queue->smsg_msglist_index[i].sendSmsgBuf = 0;
3034 queue->smsg_msglist_index[i].next = -1;
3035 queue->smsg_head_index = -1;
3036 #endif
3039 #endif
3042 inline
3043 static void _init_smsg()
3045 if(mysize > 1) {
3046 if (useDynamicSMSG)
3047 _init_dynamic_smsg();
3048 else
3049 _init_static_smsg();
3052 _init_send_queue(&smsg_queue);
3053 #if CMK_USE_OOB
3054 _init_send_queue(&smsg_oob_queue);
3055 #endif
3058 static void _init_static_msgq()
3060 gni_return_t status;
3061 /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The performance scales by the node count rather than rank count */
3062 msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3063 msgq_attrs.smsg_q_sz = 1;
3064 msgq_attrs.rcv_pool_sz = 1;
3065 msgq_attrs.num_msgq_eps = 2;
3066 msgq_attrs.nloc_insts = 8;
3067 msgq_attrs.modes = 0;
3068 msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3070 status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3071 GNI_RC_CHECK("MSGQ Init", status);
/* Running sum of the block sizes requested through alloc_mempool_block(). */
3077 static CmiUInt8 total_mempool_size = 0;
/* Number of times alloc_mempool_block() has been called. */
3078 static CmiUInt8 total_mempool_calls = 0;
3080 #if USE_LRTS_MEMPOOL
3081 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3083 void *pool;
3084 int ret;
3085 gni_return_t status = GNI_RC_SUCCESS;
3087 size_t default_size = expand_flag? _expand_mem : _mempool_size;
3088 if (*size < default_size) *size = default_size;
3089 #if LARGEPAGE
3090 // round up to be multiple of _tlbpagesize
3091 //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3092 *size = ALIGNHUGEPAGE(*size);
3093 #endif
3094 total_mempool_size += *size;
3095 total_mempool_calls += 1;
3096 #if !LARGEPAGE
3097 if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag)
3099 printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3100 CmiAbort("alloc_mempool_block");
3102 #endif
3103 #if LARGEPAGE
3104 pool = my_get_huge_pages(*size);
3105 ret = pool==NULL;
3106 #else
3107 ret = posix_memalign(&pool, ALIGNBUF, *size);
3108 #endif
3109 if (ret != 0) {
3110 #if CMK_SMP && STEAL_MEMPOOL
3111 pool = steal_mempool_block(size, mem_hndl);
3112 if (pool != NULL) return pool;
3113 #endif
3114 printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3115 if (ret == ENOMEM)
3116 CmiAbort("alloc_mempool_block: out of memory.");
3117 else
3118 CmiAbort("alloc_mempool_block: posix_memalign failed");
3120 #if LARGEPAGE
3121 CmiMemLock();
3122 register_count++;
3123 MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3124 CmiMemUnlock();
3125 if(status != GNI_RC_SUCCESS) {
3126 printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3127 sweep_mempool(CpvAccess(mempool));
3129 GNI_RC_CHECK("MEMORY_REGISTER", status);
3130 #else
3131 SetMemHndlZero((*mem_hndl));
3132 #endif
3133 return pool;
3136 // ptr is a block head pointer
3137 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3139 if(!(IsMemHndlZero(mem_hndl)))
3141 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3143 #if LARGEPAGE
3144 my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3145 #else
3146 free(ptr);
3147 #endif
3149 #endif
/*
 * Machine-layer hook that creates the per-PE memory pool when
 * USE_LRTS_MEMPOOL is enabled. The pool starts at _mempool_size, grows through
 * alloc_mempool_block()/free_mempool_block() and is bounded by
 * _mempool_size_limit. `everReturn` is not used here.
 */
void LrtsPreCommonInit(int everReturn)
{
#if USE_LRTS_MEMPOOL
    CpvInitialize(mempool_type*, mempool);
    CpvAccess(mempool) = mempool_init(_mempool_size,
                                      alloc_mempool_block,
                                      free_mempool_block,
                                      _mempool_size_limit);
    MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ;
#endif
}
/*
 * LrtsInit: bring up the Gemini/uGNI machine layer for this process.
 *
 * Steps visible in this function, in order:
 *   - initialise PMI and read the job size and this process's rank into the
 *     globals mysize/myrank, reporting them through *numNodes and *myNodeID;
 *   - read queue sizes and the SMSG mode from CHARM_UGNI_* environment
 *     variables and "+..." command-line options;
 *   - create and attach the communication domain (or take the libonesided
 *     path when USE_ONESIDED is defined), allgather one NIC address per
 *     process, create the completion queues and one bound endpoint per
 *     remote rank;
 *   - call _init_smsg() and then PMI_Barrier();
 *   - read the mempool limits, big-message settings and huge page size;
 *   - set up sendRdmaBuf and, depending on build flags, the debug log,
 *     the ack pool and the communication statistics.
 *
 * argc      not referenced in this function.
 * argv      argument vector handed to the CmiGetArg* helpers.
 * numNodes  out: set to mysize.
 * myNodeID  out: set to myrank.
 */
3159 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3161 register int i;
3162 int rc;
3163 int device_id = 0;
3164 unsigned int remote_addr;
3165 gni_cdm_handle_t cdm_hndl;
3166 gni_return_t status = GNI_RC_SUCCESS;
3167 uint32_t vmdh_index = -1;
3168 uint8_t ptag;
3169 unsigned int local_addr, *MPID_UGNI_AllAddr;
3170 int first_spawned;
3171 int physicalID;
3172 char *env;
3174 //void (*local_event_handler)(gni_cq_entry_t *, void *) = &LocalEventHandle;
3175 //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3176 //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *) = &RemoteBteEventHandle;
/* PMI results are stored in a gni_return_t and checked with GNI_RC_CHECK.
 * NOTE(review): this presumably relies on the PMI and GNI success codes
 * having the same value -- confirm against GNI_RC_CHECK's definition. */
3178 status = PMI_Init(&first_spawned);
3179 GNI_RC_CHECK("PMI_Init", status);
3181 status = PMI_Get_size(&mysize);
3182 GNI_RC_CHECK("PMI_Getsize", status);
3184 status = PMI_Get_rank(&myrank);
3185 GNI_RC_CHECK("PMI_getrank", status);
3187 //physicalID = CmiPhysicalNodeID(myrank);
3189 //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
/* Report this process's rank and the job size to the caller. */
3191 *myNodeID = myrank;
3192 *numNodes = mysize;
3194 #if MULTI_THREAD_SEND
3195 /* Currently, we only consider the case that comm. thread will only recv msgs */
3196 Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3197 #endif
/* Queue sizes: the environment variable is applied first and the
 * command-line option is parsed afterwards into the same variable
 * (presumably it only overwrites the value when the option is present). */
3199 env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3200 if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3201 CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3203 env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3204 if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3205 CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
/* Dynamic SMSG is enabled by the mere presence of the environment variable
 * (its value is not inspected) or by the +useDynamicSmsg flag. */
3207 env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3208 if (env) useDynamicSMSG = 1;
3209 if (!useDynamicSMSG)
3210 useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3211 CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
/* Clamp the initial connection count to the number of processes. */
3212 if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3213 //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3215 if(myrank == 0)
3217 printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3218 printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3220 #ifdef USE_ONESIDED
3221 onesided_init(NULL, &onesided_hnd);
3223 // this is a GNI test, so use the libonesided bypass functionality
3224 onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3225 local_addr = gniGetNicAddress();
3226 #else
3227 ptag = get_ptag();
3228 cookie = get_cookie();
3229 #if 0
3230 modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3231 #endif
3232 //Create and attach to the communication domain */
3233 status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3234 GNI_RC_CHECK("GNI_CdmCreate", status);
3235 //* device id The device id is the minor number for the device
3236 //that is assigned to the device by the system when the device is created.
3237 //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3238 //where X is the device number 0 default
3239 status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3240 GNI_RC_CHECK("GNI_CdmAttach", status);
/* The address written by GNI_CdmAttach is replaced by the value returned
 * from get_gni_nic_address(0) before it is shared with the other ranks. */
3241 local_addr = get_gni_nic_address(0);
3242 #endif
/* Gather one NIC address per process into a table with mysize entries. */
3243 MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3244 _MEMCHECK(MPID_UGNI_AllAddr);
3245 allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3246 /* create the local completion queue */
3247 /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3248 status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3249 GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3251 status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3252 GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3253 /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3255 status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3256 GNI_RC_CHECK("Create CQ (rx)", status);
3258 status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3259 GNI_RC_CHECK("Create Post CQ (rx)", status);
3261 //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3262 //GNI_RC_CHECK("Create BTE CQ", status);
3264 /* create the endpoints. they need to be bound to allow later CQWrites to them */
3265 ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3266 _MEMCHECK(ep_hndl_array);
3267 #if MULTI_THREAD_SEND
/* One lock object is shared by rx_cq_lock, global_gni_lock,
 * default_tx_cq_lock and smsg_mailbox_lock; rdma_tx_cq_lock and
 * smsg_rx_cq_lock each get their own. */
3268 rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3269 //default_tx_cq_lock = CmiCreateLock();
3270 rdma_tx_cq_lock = CmiCreateLock();
3271 smsg_rx_cq_lock = CmiCreateLock();
3272 //global_gni_lock = CmiCreateLock();
3273 //rx_cq_lock = CmiCreateLock();
3274 #endif
/* One endpoint per remote rank, bound to that rank's NIC address. The
 * slot for this process's own rank is skipped and is not written here. */
3275 for (i=0; i<mysize; i++) {
3276 if(i == myrank) continue;
3277 status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3278 GNI_RC_CHECK("GNI_EpCreate ", status);
3279 remote_addr = MPID_UGNI_AllAddr[i];
3280 status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3281 GNI_RC_CHECK("GNI_EpBind ", status);
3284 /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3285 _init_smsg();
3286 PMI_Barrier();
3288 #if USE_LRTS_MEMPOOL
/* Memory-pool tuning. Where a setting has both an environment variable and
 * a command-line option, the option is read second and so wins when both
 * are supplied. */
3289 env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3290 if (env) {
3291 _totalmem = CmiReadSize(env);
3292 if (myrank == 0)
3293 printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3296 env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3297 if (env) _mempool_size = CmiReadSize(env);
3298 if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size"))
3299 _mempool_size = CmiReadSize(env);
/* user_set_flag records that the user supplied a registration or send limit. */
3302 env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3303 if (env) {
3304 MAX_REG_MEM = CmiReadSize(env);
3305 user_set_flag = 1;
3307 if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size")) {
3308 MAX_REG_MEM = CmiReadSize(env);
3309 user_set_flag = 1;
3312 env = getenv("CHARM_UGNI_SEND_MAX");
3313 if (env) {
3314 MAX_BUFF_SEND = CmiReadSize(env);
3315 user_set_flag = 1;
3317 if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send")) {
3318 MAX_BUFF_SEND = CmiReadSize(env);
3319 user_set_flag = 1;
3322 env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3323 if (env) {
3324 _mempool_size_limit = CmiReadSize(env);
/* Keep the limits ordered: the registration limit is raised to at least the
 * initial pool size, and the send limit is lowered to at most the
 * registration limit. */
3327 if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3328 if (MAX_BUFF_SEND > MAX_REG_MEM) MAX_BUFF_SEND = MAX_REG_MEM;
3330 if (myrank==0) {
3331 printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3332 printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
/* NOTE(review): BIG_MSG is only read from CHARM_UGNI_BIG_MSG_SIZE further
 * down, so this check uses whatever value BIG_MSG holds on entry, and it is
 * performed on rank 0 only -- confirm that both are intended. */
3333 if (MAX_REG_MEM < BIG_MSG * 2 + oneMB) {
3334 /* memblock can expand to BIG_MSG * 2 size */
3335 printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n", BIG_MSG * 2.0/1024/1024 + 1);
3336 CmiAbort("mempool maximum size is too small. \n");
3338 #if MULTI_THREAD_SEND
3339 printf("Charm++> worker thread sending messages\n");
3340 #elif COMM_THREAD_SEND
3341 printf("Charm++> only comm thread send/recv messages\n");
3342 #endif
3345 #endif /* end of USE_LRTS_MEMPOOL */
/* Big-message protocol settings; a BIG_MSG below ONE_SEG is rejected. */
3347 env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3348 if (env) {
3349 BIG_MSG = CmiReadSize(env);
3350 if (BIG_MSG < ONE_SEG)
3351 CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3353 env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3354 if (env) {
3355 BIG_MSG_PIPELINE = atoi(env);
/* _checkProgress is cleared when the user asks for it or when the job has a
 * single process. */
3358 env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3359 if (env) _checkProgress = 0;
3360 if (mysize == 1) _checkProgress = 0;
/* NOTE(review): as the code reads here, the value taken from
 * HUGETLB_DEFAULT_PAGE_SIZE is overwritten unconditionally by the
 * gethugepagesize() call that follows -- confirm this is intended. */
3364 env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3365 if (env)
3366 _tlbpagesize = CmiReadSize(env);
3368 /* real gethugepagesize() is only available when hugetlb module linked */
3369 _tlbpagesize = gethugepagesize();
3370 if (myrank == 0) {
3371 printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3374 #if LARGEPAGE
3375 if (_tlbpagesize == 4096) {
3376 CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3378 #endif
3380 print_stats = CmiGetArgFlag(*argv, "+print_stats");
3382 /* init DMA buffer for medium message */
3384 //_init_DMA_buffer();
/* The gathered address table was only read while binding endpoints above. */
3386 free(MPID_UGNI_AllAddr);
3387 #if CMK_SMP
3388 sendRdmaBuf = PCQueueCreate();
3389 #else
3390 sendRdmaBuf = 0;
3391 #endif
3392 #if MACHINE_DEBUG_LOG
/* One debug log file per process, named debugLog.<rank>. */
3393 char ln[200];
3394 sprintf(ln,"debugLog.%d",myrank);
3395 debugLog=fopen(ln,"w");
3396 #endif
3398 // NTK_Init();
3399 // ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3401 #if REMOTE_EVENT
3402 AckPool_init();
3403 #endif
3405 #if CMK_WITH_STATS
3406 init_comm_stats();
3407 #endif
3410 void* LrtsAlloc(int n_bytes, int header)
3412 void *ptr = NULL;
3413 #if 0
3414 printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3415 #endif
3416 if(n_bytes <= SMSG_MAX_MSG)
3418 int totalsize = n_bytes+header;
3419 ptr = malloc(totalsize);
3421 else {
3422 CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3423 #if USE_LRTS_MEMPOOL
3424 n_bytes = ALIGN64(n_bytes);
3425 if(n_bytes < BIG_MSG)
3427 char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3428 if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3429 }else
3431 #if LARGEPAGE
3432 //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3433 n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3434 char *res = my_get_huge_pages(n_bytes);
3435 #else
3436 char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3437 #endif
3438 if (res) ptr = res + ALIGNBUF - header;
3440 #else
3441 n_bytes = ALIGN64(n_bytes); /* make sure size if 4 aligned */
3442 char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3443 ptr = res + ALIGNBUF - header;
3444 #endif
3446 return ptr;
3449 void LrtsFree(void *msg)
3451 CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3452 if (size <= SMSG_MAX_MSG)
3453 free(msg);
3454 else {
3455 size = ALIGN64(size);
3456 if(size>=BIG_MSG)
3458 #if LARGEPAGE
3459 int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3460 my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3461 #else
3462 free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3463 #endif
3465 else {
3466 #if USE_LRTS_MEMPOOL
3467 #if CMK_SMP
3468 mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3469 #else
3470 mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3471 #endif
3472 #else
3473 free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3474 #endif
/*
 * Shut the machine layer down and terminate the process with status 0.
 * Optionally prints communication statistics (CMK_WITH_STATS builds with
 * print_stats set), destroys the Cpv `mempool` when USE_LRTS_MEMPOOL is
 * enabled, finalises PMI and calls exit(); it does not return.
 */
void LrtsExit()
{
#if CMK_WITH_STATS
    if (print_stats) {
        print_comm_stats();
    }
#endif

    /* free memory ? */
#if USE_LRTS_MEMPOOL
    //printf("FINAL [%d, %d] register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg);
    mempool_destroy(CpvAccess(mempool));
#endif

    PMI_Finalize();
    exit(0);
}
3493 void LrtsDrainResources()
3495 if(mysize == 1) return;
3496 while (
3497 #if CMK_USE_OOB
3498 !SendBufferMsg(&smsg_oob_queue) ||
3499 #endif
3500 !SendBufferMsg(&smsg_queue)
3503 if (useDynamicSMSG)
3504 PumpDatagramConnection();
3505 PumpNetworkSmsg();
3506 PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3507 PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
3508 SendRdmaMsg();
3510 PMI_Barrier();
3513 void LrtsAbort(const char *message) {
3514 printf("CmiAbort is calling on PE:%d\n", myrank);
3515 CmiPrintStackTrace(0);
3516 PMI_Abort(-1, message);
3519 /************************** TIMER FUNCTIONS **************************/
3520 #if CMK_TIMER_USE_SPECIAL
3521 /* MPI calls are not threadsafe, even the timer on some machines */
3522 static CmiNodeLock timerLock = 0;
3523 static int _absoluteTime = 0;
3524 static int _is_global = 0;
3525 static struct timespec start_ts;
/*
 * Report whether the timer is synchronised across processes.
 * This machine layer always answers 0 (not synchronised), which makes
 * CmiTimerInit() record its own start time after a series of barriers.
 *
 * Deliberately not declared `inline`: under C99/C11 rules a non-static
 * inline definition does not by itself provide an external definition, so
 * calls that are not inlined could fail to link.
 */
int CmiTimerIsSynchronized() {
    return 0;
}
3531 inline int CmiTimerAbsolute() {
3532 return _absoluteTime;
/* Start value of the relative timer; this implementation always returns 0.0. */
double CmiStartTimer() {
    const double timer_origin = 0.0;
    return timer_origin;
}
3539 double CmiInitTime() {
3540 return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
3543 void CmiTimerInit(char **argv) {
3544 _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
3545 if (_absoluteTime && CmiMyPe() == 0)
3546 printf("Charm++> absolute timer is used\n");
3548 _is_global = CmiTimerIsSynchronized();
3551 if (_is_global) {
3552 if (CmiMyRank() == 0) {
3553 clock_gettime(CLOCK_MONOTONIC, &start_ts);
3555 } else { /* we don't have a synchronous timer, set our own start time */
3556 CmiBarrier();
3557 CmiBarrier();
3558 CmiBarrier();
3559 clock_gettime(CLOCK_MONOTONIC, &start_ts);
3561 CmiNodeAllBarrier(); /* for smp */
/**
 * Since timerLock is never created and is therefore always NULL, all of
 * the if-conditions inside the timer functions can be disabled for now
 * in the SMP case.
 */
3570 double CmiTimer(void) {
3571 struct timespec now_ts;
3572 clock_gettime(CLOCK_MONOTONIC, &now_ts);
3573 return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3574 : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec) / 1000000000.0);
3577 double CmiWallTimer(void) {
3578 struct timespec now_ts;
3579 clock_gettime(CLOCK_MONOTONIC, &now_ts);
3580 return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3581 : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec) / 1000000000.0);
3584 double CmiCpuTimer(void) {
3585 struct timespec now_ts;
3586 clock_gettime(CLOCK_MONOTONIC, &now_ts);
3587 return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3588 : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec) / 1000000000.0);
3591 #endif
3592 /************Barrier Related Functions****************/
3594 int CmiBarrier()
3596 gni_return_t status;
3598 #if CMK_SMP
3599 /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks */
3600 CmiNodeAllBarrier();
3601 if (CmiMyRank() == CmiMyNodeSize())
3602 #else
3603 if (CmiMyRank() == 0)
3604 #endif
3607 * The call of CmiBarrier is usually before the initialization
3608 * of trace module of Charm++, therefore, the START_EVENT
3609 * and END_EVENT are disabled here. -Chao Mei
3611 /*START_EVENT();*/
3612 status = PMI_Barrier();
3613 GNI_RC_CHECK("PMI_Barrier", status);
3614 /*END_EVENT(10);*/
3616 CmiNodeAllBarrier();
3617 return 0;
3620 #if CMK_DIRECT
3621 #include "machine-cmidirect.c"
3622 #endif
3623 #if CMK_PERSISTENT_COMM
3624 #include "machine-persistent.c"
3625 #endif