fix memory registration bug with persistent message
[charm.git] / src / arch / gemini_gni / machine.c
blob0f93a816cc26dabc2b07dc13499cba32bf3fdaa8
2 /** @file
3 * Gemini GNI machine layer
5 * Author: Yanhua Sun
6 Gengbin Zheng
7 * Date: 07-01-2011
9 * Flow control by mem pool using environment variables:
11 # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12 # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13 export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14 export CHARM_UGNI_MEMPOOL_MAX=20M
15 export CHARM_UGNI_SEND_MAX=10M
# limit on the total mempool size allocated, to prevent the mempool
# from using too much memory
19 export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M
21 other environment variables:
23 export CHARM_UGNI_NO_DEADLOCK_CHECK=yes # disable checking deadlock
24 export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G # max memory per node for mempool
25 export CHARM_UGNI_BIG_MSG_SIZE=4M # set big message size protocol
26 export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4 # set big message pipe len
28 /*@{*/
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <stdint.h>
33 #include <errno.h>
34 #include <malloc.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <sys/dir.h>
38 #include <sys/stat.h>
39 #include <gni_pub.h>
40 #include <pmi.h>
41 //#include <numatoolkit.h>
43 #include "converse.h"
45 #if CMK_DIRECT
46 #include "cmidirect.h"
47 #endif
/* ---- Compile-time feature switches for this machine layer ---- */

/* Non-zero: back the mempool with hugetlbfs pages (see my_get_huge_pages). */
#define LARGEPAGE                0

#if CMK_SMP
#  define MULTI_THREAD_SEND      0
#  define COMM_THREAD_SEND       1
#endif

#if MULTI_THREAD_SEND
#  define CMK_WORKER_SINGLE_TASK 0
#endif

/* Completion-notification modes, both off. With REMOTE_EVENT on, the RDMA
   threshold (LRTS_GNI_RDMA_THRESHOLD) drops to 0. */
#define REMOTE_EVENT             0
#define CQWRITE                  0

/* Optional caps; SEND_CAP / RECV_CAP only exist when the switch is on. */
#define CMI_EXERT_SEND_CAP       0
#define CMI_EXERT_RECV_CAP       0
#if CMI_EXERT_SEND_CAP
#  define SEND_CAP               16
#endif
#if CMI_EXERT_RECV_CAP
#  define RECV_CAP               2
#endif

/* Enables the LRTS mempool configuration (sizes and flow-control limits). */
#define USE_LRTS_MEMPOOL         1

/* Non-zero: compile the lrts_* debug counters. */
#define PRINT_SYH                0
/* ---- Tracing of the communication thread ---- */
#if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
#  define TRACE_THRESHOLD            0.00005
#  define CMI_MPI_TRACE_MOREDETAILED 0
#  undef  CMI_MPI_TRACE_USEREVENTS
#  define CMI_MPI_TRACE_USEREVENTS   1
#else
/* Comm-thread tracing not enabled: force the switch to 0. */
#  undef  CMK_SMP_TRACE_COMMTHREAD
#  define CMK_SMP_TRACE_COMMTHREAD   0
#endif

/* ---- Tracing of communication overhead (off by default) ---- */
#define CMK_TRACE_COMMOVERHEAD 0
#if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
#  undef  CMI_MPI_TRACE_USEREVENTS
#  define CMI_MPI_TRACE_USEREVENTS   1
#else
#  undef  CMK_TRACE_COMMOVERHEAD
#  define CMK_TRACE_COMMOVERHEAD     0
#endif

/* START_EVENT()/END_EVENT(x) bracket a region with traceUserBracketEvent
   when user-event tracing is active; otherwise they expand to nothing. */
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
CpvStaticDeclare(double, projTraceStart);
#  define START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
#  define END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
#else
#  define START_EVENT()
#  define END_EVENT(x)
#endif
#if USE_LRTS_MEMPOOL

#define oneMB (1024ll*1024)
#define oneGB (1024ll*1024*1024)

/* Mempool sizing. The file header's CHARM_UGNI_* variables presumably
   override these defaults -- confirm at the initialisation site. */
static CmiInt8 _mempool_size       = 8*oneMB;
static CmiInt8 _expand_mem         = 4*oneMB;
static CmiInt8 _mempool_size_limit = 0;
static CmiInt8 _totalmem           = 0.8*oneGB;

/* Big-message protocol: size threshold and segment size. */
#if LARGEPAGE
static CmiInt8 BIG_MSG = 16*oneMB;
static CmiInt8 ONE_SEG = 4*oneMB;
#else
static CmiInt8 BIG_MSG = 4*oneMB;
static CmiInt8 ONE_SEG = 2*oneMB;
#endif

#if MULTI_THREAD_SEND
static int BIG_MSG_PIPELINE = 1;
#else
static int BIG_MSG_PIPELINE = 4;
#endif

/* Dynamic flow control: running totals compared against the caps below.
   register_memory_size is maintained by MEMORY_REGISTER/MEMORY_DEREGISTER. */
static CmiInt8 buffered_send_msg    = 0;
static CmiInt8 register_memory_size = 0;

#if LARGEPAGE
static CmiInt8 MAX_BUFF_SEND  = 100000*oneMB;
static CmiInt8 MAX_REG_MEM    = 200000*oneMB;
static CmiInt8 register_count = 0;
#elif CMK_SMP && COMM_THREAD_SEND
static CmiInt8 MAX_BUFF_SEND  = 100*oneMB;
static CmiInt8 MAX_REG_MEM    = 200*oneMB;
#else
static CmiInt8 MAX_BUFF_SEND  = 16*oneMB;
static CmiInt8 MAX_REG_MEM    = 25*oneMB;
#endif

#endif /* USE_LRTS_MEMPOOL */
/* Locking helpers for GNI resources and PCQueue pops. Without
   MULTI_THREAD_SEND they compile away entirely; with it they become real
   CmiLock/CmiUnlock calls (note: those forms already end in ';'). */
#if !MULTI_THREAD_SEND
#  define CMI_GNI_LOCK(x)
#  define CMI_GNI_UNLOCK(x)
#  define CMI_PCQUEUEPOP_LOCK(Q)
#  define CMI_PCQUEUEPOP_UNLOCK(Q)
#else
#  define CMI_GNI_LOCK(x)           CmiLock(x);
#  define CMI_GNI_UNLOCK(x)         CmiUnlock(x);
#  define CMI_PCQUEUEPOP_LOCK(Q)    CmiLock((Q)->lock);
#  define CMI_PCQUEUEPOP_UNLOCK(Q)  CmiUnlock((Q)->lock);
#endif
165 static int _tlbpagesize = 4096;
167 //static int _smpd_count = 0;
169 static int user_set_flag = 0;
171 static int _checkProgress = 1; /* check deadlock */
172 static int _detected_hang = 0;
174 #define SMSG_ATTR_SIZE sizeof(gni_smsg_attr_t)
176 // dynamic SMSG
177 static int useDynamicSMSG =0; /* dynamic smsgs setup */
179 static int avg_smsg_connection = 32;
180 static int *smsg_connected_flag= 0;
181 static gni_smsg_attr_t **smsg_attr_vector_local;
182 static gni_smsg_attr_t **smsg_attr_vector_remote;
183 static gni_ep_handle_t ep_hndl_unbound;
184 static gni_smsg_attr_t send_smsg_attr;
185 static gni_smsg_attr_t recv_smsg_attr;
187 typedef struct _dynamic_smsg_mailbox{
188 void *mailbox_base;
189 int size;
190 int offset;
191 gni_mem_handle_t mem_hndl;
192 struct _dynamic_smsg_mailbox *next;
193 }dynamic_smsg_mailbox_t;
195 static dynamic_smsg_mailbox_t *mailbox_list;
197 static CmiUInt8 smsg_send_count = 0, last_smsg_send_count = 0;
198 static CmiUInt8 smsg_recv_count = 0, last_smsg_recv_count = 0;
200 #if PRINT_SYH
201 int lrts_send_msg_id = 0;
202 int lrts_local_done_msg = 0;
203 int lrts_send_rdma_success = 0;
204 #endif
206 #include "machine.h"
208 #include "pcqueue.h"
210 #include "mempool.h"
212 #if CMK_PERSISTENT_COMM
213 #include "machine-persistent.h"
214 #endif
//#define     USE_ONESIDED 1
#ifdef USE_ONESIDED
/* NOTE: the onesided implementation is wrong, since there is no place to
   restore omdh. These variants also take fewer arguments than the GNI
   variants below, which is what the call sites in this file pass. */
#include "onesided.h"
onesided_hnd_t   onesided_hnd;
onesided_md_t    omdh;
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh)

#define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)

#else
/* Placeholder handles: call sites pass onesided_hnd and &omdh
   unconditionally, and the GNI macros below ignore them. */
uint8_t   onesided_hnd, omdh;

/*
 * MEMORY_REGISTER: register [msg, msg + size) with the NIC under the
 * MAX_REG_MEM budget tracked by register_memory_size.
 *   - If register_memory_size + size would reach MAX_REG_MEM, nothing is
 *     registered and status is set to GNI_RC_ERROR_NOMEM.
 *   - Otherwise status receives the GNI_MemRegister result, and
 *     register_memory_size grows by size only on GNI_RC_SUCCESS.
 * handler and myomdh are unused. The body is a single do { } while (0)
 * statement so it is safe inside an unbraced if/else, and every argument is
 * parenthesized so expressions (e.g. `a ? b : c`) can be passed as size.
 */
#if REMOTE_EVENT || CQWRITE
/* cqh is handed to GNI_MemRegister (presumably so remote events / CQ writes
   targeting this region are delivered there -- confirm). */
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
    do {                                                                         \
        if (register_memory_size + (size) >= MAX_REG_MEM) {                      \
            (status) = GNI_RC_ERROR_NOMEM;                                       \
        } else {                                                                 \
            (status) = GNI_MemRegister((nic_hndl), (uint64_t)(msg), (uint64_t)(size), \
                                       (cqh), GNI_MEM_READWRITE, -1, (mem_hndl)); \
            if ((status) == GNI_RC_SUCCESS) register_memory_size += (size);      \
        }                                                                        \
    } while (0)
#else
#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
    do {                                                                         \
        if (register_memory_size + (size) >= MAX_REG_MEM) {                      \
            (status) = GNI_RC_ERROR_NOMEM;                                       \
        } else {                                                                 \
            (status) = GNI_MemRegister((nic_hndl), (uint64_t)(msg), (uint64_t)(size), \
                                       NULL, GNI_MEM_READWRITE, -1, (mem_hndl)); \
            if ((status) == GNI_RC_SUCCESS) register_memory_size += (size);      \
        }                                                                        \
    } while (0)
#endif

/* MEMORY_DEREGISTER: release a registration and subtract size from
   register_memory_size; aborts if GNI_MemDeregister fails. */
#define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)          \
    do {                                                                         \
        if (GNI_MemDeregister((nic_hndl), (mem_hndl)) == GNI_RC_SUCCESS)         \
            register_memory_size -= (size);                                      \
        else                                                                     \
            CmiAbort("MEM_DEregister");                                          \
    } while (0)
#endif
/*
 * Mempool bookkeeping accessors. A mempool-allocated message x is preceded,
 * ALIGNBUF bytes earlier, by a mempool_header whose block_ptr points at the
 * owning block_header. Every argument and every expression-like expansion
 * is parenthesized so the macros compose safely inside larger expressions
 * (e.g. NoMsgInSend(x) next to a relational operator, or CmiGetMsgSize(p + off)).
 */
#define ALIGNBUF 64

#define GetMempoolBlockPtr(x)  (((mempool_header*)((char*)(x)-ALIGNBUF))->block_ptr)
#define GetMempoolPtr(x)       (GetMempoolBlockPtr(x)->mptr)
#define GetMempoolsize(x)      (GetMempoolBlockPtr(x)->size)
#define GetMemHndl(x)          (GetMempoolBlockPtr(x)->mem_hndl)
#define IncreaseMsgInRecv(x)   ((GetMempoolBlockPtr(x)->msgs_in_recv)++)
#define DecreaseMsgInRecv(x)   ((GetMempoolBlockPtr(x)->msgs_in_recv)--)
#define IncreaseMsgInSend(x)   ((GetMempoolBlockPtr(x)->msgs_in_send)++)
#define DecreaseMsgInSend(x)   ((GetMempoolBlockPtr(x)->msgs_in_send)--)
#define NoMsgInSend(x)         (GetMempoolBlockPtr(x)->msgs_in_send == 0)
#define NoMsgInRecv(x)         (GetMempoolBlockPtr(x)->msgs_in_recv == 0)
#define NoMsgInFlight(x)       (GetMempoolBlockPtr(x)->msgs_in_send + GetMempoolBlockPtr(x)->msgs_in_recv == 0)
#define IsMemHndlZero(x)       ((x).qword1 == 0 && (x).qword2 == 0)
#define SetMemHndlZero(x)      do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
#define NotRegistered(x)       IsMemHndlZero(((block_header*)(x))->mem_hndl)

#define GetMemHndlFromBlockHeader(x) (((block_header*)(x))->mem_hndl)
#define GetSizeFromBlockHeader(x)    (((block_header*)(x))->size)

/* Accessors for the extended Converse message header. */
#define CmiGetMsgSize(m)       (((CmiMsgHeaderExt*)(m))->size)
#define CmiSetMsgSize(m,s)     ((((CmiMsgHeaderExt*)(m))->size)=(s))
#define CmiGetMsgSeq(m)        (((CmiMsgHeaderExt*)(m))->seq)
#define CmiSetMsgSeq(m, s)     ((((CmiMsgHeaderExt*)(m))->seq) = (s))
/* =======Beginning of Definitions of Performance-Specific Macros =======*/
/* If SMSG is not used */

#define FMA_PER_CORE     1024
#define FMA_BUFFER_SIZE  1024

/* If SMSG is used */
static int SMSG_MAX_MSG = 1024;
#define SMSG_MAX_CREDIT  72

#define MSGQ_MAXSIZE     2048

/* large message transfer with FMA or BTE */
#if ! REMOTE_EVENT
#define LRTS_GNI_RDMA_THRESHOLD  1024
#else
/* remote events only work with RDMA */
#define LRTS_GNI_RDMA_THRESHOLD  0
#endif

#if CMK_SMP
static int REMOTE_QUEUE_ENTRIES=163840;
static int LOCAL_QUEUE_ENTRIES=163840;
#else
static int REMOTE_QUEUE_ENTRIES=20480;
static int LOCAL_QUEUE_ENTRIES=20480;
#endif

#define BIG_MSG_TAG          0x26
#define PUT_DONE_TAG         0x28
#define DIRECT_PUT_DONE_TAG  0x29
#define ACK_TAG              0x30
/* SMSG is data message */
#define SMALL_DATA_TAG       0x31
/* SMSG is a control message to initialize a BTE */
#define LMSG_INIT_TAG        0x39

#define DEBUG
#ifdef GNI_RC_CHECK
#undef GNI_RC_CHECK
#endif
#ifdef DEBUG
/* Abort with a diagnostic when a uGNI call did not return GNI_RC_SUCCESS.
   rc is evaluated exactly once (it may be a call expression). It must be a
   gni_return_t value: it is used to index gni_err_str. */
#define GNI_RC_CHECK(msg,rc) \
    do { \
        gni_return_t gni_rc_check_status_ = (rc); \
        if (gni_rc_check_status_ != GNI_RC_SUCCESS) { \
            printf("[%d] %s; err=%s\n", CmiMyPe(), (msg), gni_err_str[gni_rc_check_status_]); \
            CmiAbort("GNI_RC_CHECK"); \
        } \
    } while(0)
#else
#define GNI_RC_CHECK(msg,rc)
#endif

/* Round x up to a multiple of 64 / of the huge-page size. */
#define ALIGN64(x)         ((size_t)((~63)&((x)+63)))
//#define ALIGN4(x)        (size_t)((~3)&((x)+3))
#define ALIGNHUGEPAGE(x)   ((size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1)))
326 static int useStaticMSGQ = 0;
327 static int useStaticFMA = 0;
328 static int mysize, myrank;
329 static gni_nic_handle_t nic_hndl;
331 typedef struct {
332 gni_mem_handle_t mdh;
333 uint64_t addr;
334 } mdh_addr_t ;
335 // this is related to dynamic SMSG
337 typedef struct mdh_addr_list{
338 gni_mem_handle_t mdh;
339 void *addr;
340 struct mdh_addr_list *next;
341 }mdh_addr_list_t;
343 static unsigned int smsg_memlen;
344 gni_smsg_attr_t **smsg_local_attr_vec = 0;
345 mdh_addr_t setup_mem;
346 mdh_addr_t *smsg_connection_vec = 0;
347 gni_mem_handle_t smsg_connection_memhndl;
348 static int smsg_expand_slots = 10;
349 static int smsg_available_slot = 0;
350 static void *smsg_mailbox_mempool = 0;
351 mdh_addr_list_t *smsg_dynamic_list = 0;
353 static void *smsg_mailbox_base;
354 gni_msgq_attr_t msgq_attrs;
355 gni_msgq_handle_t msgq_handle;
356 gni_msgq_ep_attr_t msgq_ep_attrs;
357 gni_msgq_ep_attr_t msgq_ep_attrs_size;
359 /* =====Beginning of Declarations of Machine Specific Variables===== */
360 static int cookie;
361 static int modes = 0;
362 static gni_cq_handle_t smsg_rx_cqh = NULL;
363 static gni_cq_handle_t default_tx_cqh = NULL;
364 static gni_cq_handle_t rdma_tx_cqh = NULL;
365 static gni_cq_handle_t rdma_rx_cqh = NULL;
366 static gni_cq_handle_t post_tx_cqh = NULL;
367 static gni_ep_handle_t *ep_hndl_array;
369 static CmiNodeLock *ep_lock_array;
370 static CmiNodeLock default_tx_cq_lock;
371 static CmiNodeLock rdma_tx_cq_lock;
372 static CmiNodeLock global_gni_lock;
373 static CmiNodeLock rx_cq_lock;
374 static CmiNodeLock smsg_mailbox_lock;
375 static CmiNodeLock smsg_rx_cq_lock;
376 static CmiNodeLock *mempool_lock;
377 //#define CMK_WITH_STATS 0
378 typedef struct msg_list
380 uint32_t destNode;
381 uint32_t size;
382 void *msg;
383 uint8_t tag;
384 #if !CMK_SMP
385 struct msg_list *next;
386 #endif
387 #if CMK_WITH_STATS
388 double creation_time;
389 #endif
390 }MSG_LIST;
393 typedef struct control_msg
395 uint64_t source_addr; /* address from the start of buffer */
396 uint64_t dest_addr; /* address from the start of buffer */
397 int total_length; /* total length */
398 int length; /* length of this packet */
399 #if REMOTE_EVENT
400 int ack_index; /* index from integer to address */
401 #endif
402 uint8_t seq_id; //big message 0 meaning single message
403 gni_mem_handle_t source_mem_hndl;
404 struct control_msg *next;
405 } CONTROL_MSG;
407 #define CONTROL_MSG_SIZE (sizeof(CONTROL_MSG)-sizeof(void*))
409 typedef struct ack_msg
411 uint64_t source_addr; /* address from the start of buffer */
412 #if ! USE_LRTS_MEMPOOL
413 gni_mem_handle_t source_mem_hndl;
414 int length; /* total length */
415 #endif
416 struct ack_msg *next;
417 } ACK_MSG;
419 #define ACK_MSG_SIZE (sizeof(ACK_MSG)-sizeof(void*))
#if CMK_DIRECT
typedef struct {
    uint64_t handler_addr;
} CMK_DIRECT_HEADER;

typedef struct {
    char     core[CmiMsgHeaderSizeBytes];
    uint64_t handler;
} cmidirectMsg;

CpvDeclare(int, CmiHandleDirectIdx);

/* Converse handler for CmiDirect notifications: run the user callback whose
   CmiDirectUserHandle address travels in msg->handler, then free msg. */
void CmiHandleDirectMsg(cmidirectMsg* msg)
{
    CmiDirectUserHandle *user = (CmiDirectUserHandle *)(msg->handler);
    user->callbackFnPtr(user->callbackData);
    CmiFree(msg);
}

/* Register CmiHandleDirectMsg and store its handler index. */
void CmiDirectInit()
{
    CpvInitialize(int, CmiHandleDirectIdx);
    CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler((CmiHandler)CmiHandleDirectMsg);
}
#endif
448 typedef struct rmda_msg
450 int destNode;
451 #if REMOTE_EVENT
452 int ack_index;
453 #endif
454 gni_post_descriptor_t *pd;
455 #if !CMK_SMP
456 struct rmda_msg *next;
457 #endif
458 }RDMA_REQUEST;
461 #if CMK_SMP
462 #define SMP_LOCKS 0
463 #define ONE_SEND_QUEUE 0
464 PCQueue sendRdmaBuf;
465 typedef struct msg_list_index
467 PCQueue sendSmsgBuf;
468 int pushed;
469 CmiNodeLock lock;
470 } MSG_LIST_INDEX;
471 char *destpe_avail;
472 #if !ONE_SEND_QUEUE && SMP_LOCKS
473 PCQueue nonEmptyQueues;
474 #endif
475 #else /* non-smp */
477 static RDMA_REQUEST *sendRdmaBuf = 0;
478 static RDMA_REQUEST *sendRdmaTail = 0;
479 typedef struct msg_list_index
481 int next;
482 MSG_LIST *sendSmsgBuf;
483 MSG_LIST *tail;
484 } MSG_LIST_INDEX;
486 #endif
488 // buffered send queue
489 #if ! ONE_SEND_QUEUE
490 typedef struct smsg_queue
492 MSG_LIST_INDEX *smsg_msglist_index;
493 int smsg_head_index;
494 } SMSG_QUEUE;
495 #else
496 typedef struct smsg_queue
498 PCQueue sendMsgBuf;
499 } SMSG_QUEUE;
500 #endif
502 SMSG_QUEUE smsg_queue;
503 #if CMK_USE_OOB
504 SMSG_QUEUE smsg_oob_queue;
505 #endif
507 #if CMK_SMP
509 #define FreeMsgList(d) free(d);
510 #define MallocMsgList(d) d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
512 #else
514 static MSG_LIST *msglist_freelist=0;
516 #define FreeMsgList(d) \
517 do { \
518 (d)->next = msglist_freelist;\
519 msglist_freelist = d; \
520 } while (0)
522 #define MallocMsgList(d) \
523 do { \
524 d = msglist_freelist;\
525 if (d==0) {d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));\
526 _MEMCHECK(d);\
527 } else msglist_freelist = d->next; \
528 d->next =0; \
529 } while (0)
531 #endif
533 #if CMK_SMP
535 #define FreeControlMsg(d) free(d);
536 #define MallocControlMsg(d) d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
538 #else
540 static CONTROL_MSG *control_freelist=0;
542 #define FreeControlMsg(d) \
543 do { \
544 (d)->next = control_freelist;\
545 control_freelist = d; \
546 } while (0);
548 #define MallocControlMsg(d) \
549 do { \
550 d = control_freelist;\
551 if (d==0) {d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));\
552 _MEMCHECK(d);\
553 } else control_freelist = d->next; \
554 } while (0);
556 #endif
558 #if CMK_SMP
560 #define FreeAckMsg(d) free(d);
561 #define MallocAckMsg(d) d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
563 #else
565 static ACK_MSG *ack_freelist=0;
567 #define FreeAckMsg(d) \
568 do { \
569 (d)->next = ack_freelist;\
570 ack_freelist = d; \
571 } while (0)
573 #define MallocAckMsg(d) \
574 do { \
575 d = ack_freelist;\
576 if (d==0) {d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));\
577 _MEMCHECK(d);\
578 } else ack_freelist = d->next; \
579 } while (0)
581 #endif
584 # if CMK_SMP
585 #define FreeRdmaRequest(d) free(d);
586 #define MallocRdmaRequest(d) d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));
587 #else
589 static RDMA_REQUEST *rdma_freelist = NULL;
591 #define FreeRdmaRequest(d) \
592 do { \
593 (d)->next = rdma_freelist;\
594 rdma_freelist = d; \
595 } while (0)
597 #define MallocRdmaRequest(d) \
598 do { \
599 d = rdma_freelist;\
600 if (d==0) {d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));\
601 _MEMCHECK(d);\
602 } else rdma_freelist = d->next; \
603 d->next =0; \
604 } while (0)
605 #endif
607 /* reuse gni_post_descriptor_t */
608 static gni_post_descriptor_t *post_freelist=0;
610 #if !CMK_SMP
611 #define FreePostDesc(d) \
612 (d)->next_descr = post_freelist;\
613 post_freelist = d;
615 #define MallocPostDesc(d) \
616 d = post_freelist;\
617 if (d==0) { \
618 d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t)));\
619 d->next_descr = 0;\
620 _MEMCHECK(d);\
621 } else post_freelist = d->next_descr;
622 #else
624 #define FreePostDesc(d) free(d);
625 #define MallocPostDesc(d) d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
627 #endif
630 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
631 static int buffered_smsg_counter = 0;
633 /* SmsgSend return success but message sent is not confirmed by remote side */
634 static MSG_LIST *buffered_fma_head = 0;
635 static MSG_LIST *buffered_fma_tail = 0;
/* Bit helpers on an integer mask `a`; ind is the bit index.
   IsFree(a, ind) is true when bit ind is clear; SET_BITS and Reset set and
   clear it in place (a must be an lvalue). Arguments are parenthesized so
   an expression such as `x ^ y` can be passed as `a`. */
#define IsFree(a,ind)   (!((a) & (1<<(ind))))
#define SET_BITS(a,ind) ((a) = ((a) | (1<<(ind))))
#define Reset(a,ind)    ((a) = ((a) & (~(1<<(ind)))))
642 CpvDeclare(mempool_type*, mempool);
#if REMOTE_EVENT
/* ack pool for remote events */

/* A remote-event value packs a pool index above ACK_SHIFT and the sender's
   rank (myrank) in the low ACK_SHIFT bits. */
#define ACK_SHIFT                  19
#define ACK_EVENT(idx)             (((idx)<<ACK_SHIFT) | myrank)
#define ACK_GET_RANK(evt)          ((evt) & ((1<<ACK_SHIFT)-1))
#define ACK_GET_INDEX(evt)         ((evt) >> ACK_SHIFT)

struct IndexStruct {
    void *addr;
    int next;
    int type;            // 1: ACK   2: Persistent
};

/* Growable slot array with an embedded free list: freehead is the first
   free slot, each free slot's `next` names the following one, -1 ends it. */
typedef struct IndexPool {
    struct IndexStruct *indexes;
    int size;
    int freehead;
    CmiNodeLock lock;
} IndexPool;

static IndexPool ackPool;

#define  GetIndexType(s)             (ackPool.indexes[s].type)
#define  GetIndexAddress(s)          (ackPool.indexes[s].addr)

/* Chain slots [first, last) into a free list ending with -1 at last-1.
   Requires first < last. */
static void IndexPool_link_free(struct IndexStruct *indexes, int first, int last)
{
    int i;
    for (i = first; i < last - 1; i++)
        indexes[i].next = i + 1;
    indexes[last - 1].next = -1;
}

/* Allocate 2048 free slots. Aborts if the rank field cannot hold mysize or
   if allocation fails. */
static void IndexPool_init(IndexPool *pool)
{
    if ((1<<ACK_SHIFT) < mysize)
        CmiAbort("Charm++ Error: Remote event's rank field overflow.");
    pool->size = 2048;
    pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
    if (pool->indexes == NULL)
        CmiAbort("IndexPool_init: out of memory");
    IndexPool_link_free(pool->indexes, 0, pool->size);
    pool->freehead = 0;
#if MULTI_THREAD_SEND
    pool->lock = CmiCreateLock();
#endif
}

/* Take a free slot, store (addr, type) in it and return its index. When the
   pool is exhausted it doubles, up to the 1<<(32-ACK_SHIFT) indices that
   fit in an event; beyond that, or on allocation failure, it aborts. */
static inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
{
    int s;
    CMI_GNI_LOCK(pool->lock);
    s = pool->freehead;
    if (s == -1) {
        int newsize = pool->size * 2;
        struct IndexStruct *grown;
        printf("[%d] AckPool_getslot expand to: %d\n", myrank, newsize);
        if (newsize > (1<<(32-ACK_SHIFT))) CmiAbort("AckPool too large");
        /* realloc keeps the live slots [0, size) intact */
        grown = (struct IndexStruct *)realloc(pool->indexes, newsize*sizeof(struct IndexStruct));
        if (grown == NULL) CmiAbort("AckPool_getslot: out of memory");
        pool->indexes = grown;
        IndexPool_link_free(pool->indexes, pool->size, newsize);
        s = pool->size;
        pool->size = newsize;
    }
    pool->freehead = pool->indexes[s].next;
    pool->indexes[s].addr = addr;
    pool->indexes[s].type = type;
    CMI_GNI_UNLOCK(pool->lock);
    return s;
}

/* Return slot s to the free list. The range check runs under the lock so it
   cannot race with a concurrent expansion of pool->size. */
static inline void IndexPool_freeslot(IndexPool *pool, int s)
{
    CMI_GNI_LOCK(pool->lock);
    CmiAssert(s>=0 && s<pool->size);
    pool->indexes[s].next = pool->freehead;
    pool->freehead = s;
    CMI_GNI_UNLOCK(pool->lock);
}

#endif
/* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
#define CMI_MAGIC(msg)                   (((CmiMsgHeaderBasic *)(msg))->magic)
#define CHARM_MAGIC_NUMBER               126

#if CMK_ERROR_CHECKING
extern unsigned char computeCheckSum(unsigned char *data, int len);
static int checksum_flag = 0;
/* When checksum_flag is set, CMI_SET_CHECKSUM zeroes the header's cksum and
   stores computeCheckSum(msg, len) there. CMI_CHECK_CHECKSUM aborts unless
   computeCheckSum over the stamped message yields 0 (presumably an
   XOR-style sum -- confirm). Both are single do { } while (0) statements,
   so they are safe in an unbraced if/else (no dangling else). */
#define CMI_SET_CHECKSUM(msg, len)      \
    do { \
        if (checksum_flag) { \
            ((CmiMsgHeaderBasic *)(msg))->cksum = 0; \
            ((CmiMsgHeaderBasic *)(msg))->cksum = computeCheckSum((unsigned char*)(msg), (len)); \
        } \
    } while (0)
#define CMI_CHECK_CHECKSUM(msg, len)    \
    do { \
        if (checksum_flag && computeCheckSum((unsigned char*)(msg), (len)) != 0) \
            CmiAbort("Fatal error: checksum doesn't agree!\n"); \
    } while (0)
#else
#define CMI_SET_CHECKSUM(msg, len)
#define CMI_CHECK_CHECKSUM(msg, len)
#endif
/* =====End of Definitions of Message-Corruption Related Macros=====*/
/* Statistics switches: the CMK_WITH_STATS macros record only while
   print_stats is set and stats_off is clear. */
static int print_stats = 0;
static int stats_off = 0;

/* Single place that flips the pause flag. */
static void set_stats_paused(int paused)
{
    stats_off = paused;
}

void CmiTurnOnStats()  { set_stats_paused(0); }

void CmiTurnOffStats() { set_stats_paused(1); }
#if CMK_WITH_STATS
FILE *counterLog = NULL;
typedef struct comm_thread_stats
{
    uint64_t  smsg_data_count;
    uint64_t  lmsg_init_count;
    uint64_t  ack_count;
    uint64_t  big_msg_ack_count;
    uint64_t  smsg_count;
    //times of calling SmsgSend
    uint64_t  try_smsg_data_count;
    uint64_t  try_lmsg_init_count;
    uint64_t  try_ack_count;
    uint64_t  try_big_msg_ack_count;
    uint64_t  try_smsg_count;

    double    max_time_in_send_buffered_smsg;
    double    all_time_in_send_buffered_smsg;

    uint64_t  rdma_count;
    uint64_t  try_rdma_count;
    double    max_time_from_control_to_rdma_init;
    double    all_time_from_control_to_rdma_init;

    double    max_time_from_rdma_init_to_rdma_done;
    double    all_time_from_rdma_init_to_rdma_done;
} Comm_Thread_Stats;

static Comm_Thread_Stats  comm_stats;

static void init_comm_stats()
{
    memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
}

#define SMSG_CREATION( x ) if(print_stats && !stats_off) { x->creation_time = CmiWallTimer(); }

#define SMSG_SENT_DONE(creation_time, tag)  \
        if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
            else  if( tag == LMSG_INIT_TAG) comm_stats.lmsg_init_count++;  \
            else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
            else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
            comm_stats.smsg_count++; \
            double inbuff_time = CmiWallTimer() - creation_time;   \
            if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
            comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
        }

#define SMSG_TRY_SEND(tag)  \
        if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
            else  if( tag == LMSG_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
            else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
            else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
            comm_stats.try_smsg_count++; \
        }

#define  RDMA_TRY_SEND()        if (print_stats && !stats_off) {comm_stats.try_rdma_count++;}

#define  RDMA_TRANS_DONE(x)      \
         if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
             if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
             comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
         }

#define  RDMA_TRANS_INIT(x)      \
         if (print_stats && !stats_off) {   comm_stats.rdma_count++;  \
             double rdma_trans_time = CmiWallTimer() - x ; \
             if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
             comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
         }

/* Average of `total` seconds over `count` events, in milliseconds.
   Returns 0 when nothing was recorded instead of printing NaN. */
static double stats_avg_ms(double total, uint64_t count)
{
    return count ? 1000.0 * total / (double)count : 0.0;
}

/* Write this node's communication-thread statistics to counterLog.
   uint64_t is not guaranteed to be long long, so every counter is cast
   to unsigned long long to match its %llu conversion. */
static void print_comm_stats()
{
    fprintf(counterLog, "Node[%d]SMSG time in buffer\t[max:%f\tAverage:%f](milisecond)\n", myrank,
            1000*comm_stats.max_time_in_send_buffered_smsg,
            stats_avg_ms(comm_stats.all_time_in_send_buffered_smsg, comm_stats.smsg_count));
    fprintf(counterLog, "Node[%d]Smsg Msgs \t[Total:%llu\t Data:%llu\t Lmsg_Init:%llu\t ACK:%llu\t BIG_MSG_ACK:%llu]\n", myrank,
            (unsigned long long)comm_stats.smsg_count, (unsigned long long)comm_stats.smsg_data_count,
            (unsigned long long)comm_stats.lmsg_init_count, (unsigned long long)comm_stats.ack_count,
            (unsigned long long)comm_stats.big_msg_ack_count);

    fprintf(counterLog, "Node[%d]SmsgSendCalls\t[Total:%llu\t Data:%llu\t Lmsg_Init:%llu\t ACK:%llu\t BIG_MSG_ACK:%llu]\n\n", myrank,
            (unsigned long long)comm_stats.try_smsg_count, (unsigned long long)comm_stats.try_smsg_data_count,
            (unsigned long long)comm_stats.try_lmsg_init_count, (unsigned long long)comm_stats.try_ack_count,
            (unsigned long long)comm_stats.try_big_msg_ack_count);

    fprintf(counterLog, "Node[%d]Rdma Transaction [count:%llu\t calls:%llu]\n", myrank,
            (unsigned long long)comm_stats.rdma_count, (unsigned long long)comm_stats.try_rdma_count);
    fprintf(counterLog, "Node[%d]Rdma time from control arrives to rdma init [MAX:%f\t Average:%f](milisecond)\n", myrank,
            1000.0*comm_stats.max_time_from_control_to_rdma_init,
            stats_avg_ms(comm_stats.all_time_from_control_to_rdma_init, comm_stats.rdma_count));
    fprintf(counterLog, "Node[%d]Rdma time from init to rdma done [MAX:%f\t Average:%f](milisecond)\n\n", myrank,
            1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done,
            stats_avg_ms(comm_stats.all_time_from_rdma_init_to_rdma_done, comm_stats.rdma_count));
}
#else
#define STATS_ACK_TIME(x)            x
#define STATS_SEND_SMSGS_TIME(x)     x
#endif
858 static void
859 allgather(void *in,void *out, int len)
861 static int *ivec_ptr=NULL,already_called=0,job_size=0;
862 int i,rc;
863 int my_rank;
864 char *tmp_buf,*out_ptr;
866 if(!already_called) {
868 rc = PMI_Get_size(&job_size);
869 CmiAssert(rc == PMI_SUCCESS);
870 rc = PMI_Get_rank(&my_rank);
871 CmiAssert(rc == PMI_SUCCESS);
873 ivec_ptr = (int *)malloc(sizeof(int) * job_size);
874 CmiAssert(ivec_ptr != NULL);
876 rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
877 CmiAssert(rc == PMI_SUCCESS);
879 already_called = 1;
883 tmp_buf = (char *)malloc(job_size * len);
884 CmiAssert(tmp_buf);
886 rc = PMI_Allgather(in,tmp_buf,len);
887 CmiAssert(rc == PMI_SUCCESS);
889 out_ptr = out;
891 for(i=0;i<job_size;i++) {
893 memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
897 free(tmp_buf);
900 static void
901 allgather_2(void *in,void *out, int len)
903 //PMI_Allgather is out of order
904 int i,rc, extend_len;
905 int rank_index;
906 char *out_ptr, *out_ref;
907 char *in2;
909 extend_len = sizeof(int) + len;
910 in2 = (char*)malloc(extend_len);
912 memcpy(in2, &myrank, sizeof(int));
913 memcpy(in2+sizeof(int), in, len);
915 out_ptr = (char*)malloc(mysize*extend_len);
917 rc = PMI_Allgather(in2, out_ptr, extend_len);
918 GNI_RC_CHECK("allgather", rc);
920 out_ref = out;
922 for(i=0;i<mysize;i++) {
923 //rank index
924 memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
925 //copy to the rank index slot
926 memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
929 free(out_ptr);
930 free(in2);
934 static unsigned int get_gni_nic_address(int device_id)
936 unsigned int address, cpu_id;
937 gni_return_t status;
938 int i, alps_dev_id=-1,alps_address=-1;
939 char *token, *p_ptr;
941 p_ptr = getenv("PMI_GNI_DEV_ID");
942 if (!p_ptr) {
943 status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
945 GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
946 } else {
947 while ((token = strtok(p_ptr,":")) != NULL) {
948 alps_dev_id = atoi(token);
949 if (alps_dev_id == device_id) {
950 break;
952 p_ptr = NULL;
954 CmiAssert(alps_dev_id != -1);
955 p_ptr = getenv("PMI_GNI_LOC_ADDR");
956 CmiAssert(p_ptr != NULL);
957 i = 0;
958 while ((token = strtok(p_ptr,":")) != NULL) {
959 if (i == alps_dev_id) {
960 alps_address = atoi(token);
961 break;
963 p_ptr = NULL;
964 ++i;
966 CmiAssert(alps_address != -1);
967 address = alps_address;
969 return address;
972 static uint8_t get_ptag(void)
974 char *p_ptr, *token;
975 uint8_t ptag;
977 p_ptr = getenv("PMI_GNI_PTAG");
978 CmiAssert(p_ptr != NULL);
979 token = strtok(p_ptr, ":");
980 ptag = (uint8_t)atoi(token);
981 return ptag;
985 static uint32_t get_cookie(void)
987 uint32_t cookie;
988 char *p_ptr, *token;
990 p_ptr = getenv("PMI_GNI_COOKIE");
991 CmiAssert(p_ptr != NULL);
992 token = strtok(p_ptr, ":");
993 cookie = (uint32_t)atoi(token);
995 return cookie;
#if LARGEPAGE

/* directly mmap memory from hugetlbfs for large pages */

#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <hugetlbfs.h>

/* Map `size` bytes (must be _tlbpagesize aligned) of huge-page memory backed
   by a temporary file on the hugetlbfs mount for _tlbpagesize. The file is
   closed and unlinked right after mmap, so the mapping is its only remaining
   reference. Returns NULL if mmap fails. Aborts if there is no mount for the
   page size (the path used to be passed to "%s" unchecked) or if the file
   cannot be opened. */
void *my_get_huge_pages(size_t size)
{
    char filename[512];
    int fd;
    mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
    void *ptr = NULL;
    const char *mount = hugetlbfs_find_path_for_size(_tlbpagesize);

    if (mount == NULL)
        CmiAbort("my_get_huge_pages: no hugetlbfs mount for the page size");
    snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", mount, (int)getpid(), rand());
    fd = open(filename, O_RDWR | O_CREAT, mode);
    if (fd == -1) {
        CmiAbort("my_get_huge_pages: open failed");
    }
    ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
    if (ptr == MAP_FAILED) ptr = NULL;
    close(fd);
    unlink(filename);
    return ptr;
}

/* Unmap memory obtained from my_get_huge_pages; aborts if munmap fails. */
void my_free_huge_pages(void *ptr, int size)
{
    int ret = munmap(ptr, size);
    if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
}

#endif
1037 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1038 /* TODO: add any that are related */
1039 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1042 #include "machine-lrts.h"
1043 #include "machine-common-core.c"
/* Network progress hook: polls the network for incoming messages (on some
   machine layers this also flushes receive buffers). This layer's
   implementation does nothing. */
#if CMK_MACHINE_PROGRESS_DEFINED
void CmiMachineProgressImpl()
{
    /* intentionally empty */
}
#endif
1052 static int SendBufferMsg(SMSG_QUEUE *queue);
1053 static void SendRdmaMsg();
1054 static void PumpNetworkSmsg();
1055 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1056 #if CQWRITE
1057 static void PumpCqWriteTransactions();
1058 #endif
1059 #if REMOTE_EVENT
1060 static void PumpRemoteTransactions();
1061 #endif
1063 #if MACHINE_DEBUG_LOG
1064 FILE *debugLog = NULL;
1065 static CmiInt8 buffered_recv_msg = 0;
1066 int lrts_smsg_success = 0;
1067 int lrts_received_msg = 0;
1068 #endif
1070 static void sweep_mempool(mempool_type *mptr)
1072 int n = 0;
1073 block_header *current = &(mptr->block_head);
1075 printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1076 while( current!= NULL) {
1077 printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1078 current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1080 printf("[n %d] sweep_mempool slot END.\n", myrank);
1083 inline
1084 static gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1086 block_header *current = *from;
1088 //while(register_memory_size>= MAX_REG_MEM)
1090 while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1091 current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1093 *from = current;
1094 if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1095 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1096 SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1098 return GNI_RC_SUCCESS;
1101 inline
1102 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t *memhndl, gni_cq_handle_t cqh )
1104 gni_return_t status = GNI_RC_SUCCESS;
1105 //int size = GetMempoolsize(msg);
1106 //void *blockaddr = GetMempoolBlockPtr(msg);
1107 //gni_mem_handle_t *memhndl = &(GetMemHndl(msg));
1109 block_header *current = &(mptr->block_head);
1110 while(register_memory_size>= MAX_REG_MEM)
1112 status = deregisterMemory(mptr, &current);
1113 if (status != GNI_RC_SUCCESS) break;
1115 if(register_memory_size>= MAX_REG_MEM) return status;
1117 MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size);
1118 while(1)
1120 MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1121 if(status == GNI_RC_SUCCESS)
1123 break;
1125 else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1127 CmiAbort("Memory registor for mempool fails\n");
1129 else
1131 status = deregisterMemory(mptr, &current);
1132 if (status != GNI_RC_SUCCESS) break;
1135 return status;
1138 inline
1139 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1141 static int rank = -1;
1142 int i;
1143 gni_return_t status;
1144 mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1145 //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1146 mempool_type *mptr;
1148 status = registerFromMempool(mptr1, msg, size, t, cqh);
1149 if (status == GNI_RC_SUCCESS) return status;
1150 #if CMK_SMP
1151 for (i=0; i<CmiMyNodeSize()+1; i++) {
1152 rank = (rank+1)%(CmiMyNodeSize()+1);
1153 mptr = CpvAccessOther(mempool, rank);
1154 if (mptr == mptr1) continue;
1155 status = registerFromMempool(mptr, msg, size, t, cqh);
1156 if (status == GNI_RC_SUCCESS) return status;
1158 #endif
1159 return GNI_RC_ERROR_RESOURCE;
1162 inline
1163 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1165 MSG_LIST *msg_tmp;
1166 MallocMsgList(msg_tmp);
1167 msg_tmp->destNode = destNode;
1168 msg_tmp->size = size;
1169 msg_tmp->msg = msg;
1170 msg_tmp->tag = tag;
1171 #if CMK_WITH_STATS
1172 SMSG_CREATION(msg_tmp)
1173 #endif
1174 #if !CMK_SMP
1175 if (queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 ) {
1176 queue->smsg_msglist_index[destNode].next = queue->smsg_head_index;
1177 queue->smsg_head_index = destNode;
1178 queue->smsg_msglist_index[destNode].tail = queue->smsg_msglist_index[destNode].sendSmsgBuf = msg_tmp;
1179 }else
1181 queue->smsg_msglist_index[destNode].tail->next = msg_tmp;
1182 queue->smsg_msglist_index[destNode].tail = msg_tmp;
1184 #else
1185 #if ONE_SEND_QUEUE
1186 PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1187 #else
1188 #if SMP_LOCKS
1189 CmiLock(queue->smsg_msglist_index[destNode].lock);
1190 if(queue->smsg_msglist_index[destNode].pushed == 0)
1192 PCQueuePush(nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1194 PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1195 CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1196 #else
1197 PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1198 #endif
1199 #endif
1200 #endif
1201 #if PRINT_SYH
1202 buffered_smsg_counter++;
1203 #endif
1206 inline static void print_smsg_attr(gni_smsg_attr_t *a)
1208 printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1211 inline
1212 static void setup_smsg_connection(int destNode)
1214 mdh_addr_list_t *new_entry = 0;
1215 gni_post_descriptor_t *pd;
1216 gni_smsg_attr_t *smsg_attr;
1217 gni_return_t status = GNI_RC_NOT_DONE;
1218 RDMA_REQUEST *rdma_request_msg;
1220 if(smsg_available_slot == smsg_expand_slots)
1222 new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1223 new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1224 bzero(new_entry->addr, smsg_memlen*smsg_expand_slots);
1226 status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1227 smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1228 GNI_MEM_READWRITE,
1230 &(new_entry->mdh));
1231 smsg_available_slot = 0;
1232 new_entry->next = smsg_dynamic_list;
1233 smsg_dynamic_list = new_entry;
1235 smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1236 smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1237 smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1238 smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1239 smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1240 smsg_attr->buff_size = smsg_memlen;
1241 smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1242 smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1243 smsg_local_attr_vec[destNode] = smsg_attr;
1244 smsg_available_slot++;
1245 MallocPostDesc(pd);
1246 pd->type = GNI_POST_FMA_PUT;
1247 //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT;
1248 pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT ;
1249 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
1250 pd->length = sizeof(gni_smsg_attr_t);
1251 pd->local_addr = (uint64_t) smsg_attr;
1252 pd->remote_addr = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1253 pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1254 pd->src_cq_hndl = rdma_tx_cqh;
1255 pd->rdma_mode = 0;
1256 status = GNI_PostFma(ep_hndl_array[destNode], pd);
1257 print_smsg_attr(smsg_attr);
1258 if(status == GNI_RC_ERROR_RESOURCE )
1260 MallocRdmaRequest(rdma_request_msg);
1261 rdma_request_msg->destNode = destNode;
1262 rdma_request_msg->pd = pd;
1263 /* buffer this request */
1265 #if PRINT_SYH
1266 if(status != GNI_RC_SUCCESS)
1267 printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1268 else
1269 printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1270 #endif
1273 /* useDynamicSMSG */
1274 inline
1275 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1277 gni_return_t status = GNI_RC_NOT_DONE;
1279 if(mailbox_list->offset == mailbox_list->size)
1281 dynamic_smsg_mailbox_t *new_mailbox_entry;
1282 new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1283 new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1284 new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1285 bzero(new_mailbox_entry->mailbox_base, new_mailbox_entry->size);
1286 new_mailbox_entry->offset = 0;
1288 status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1289 new_mailbox_entry->size, smsg_rx_cqh,
1290 GNI_MEM_READWRITE,
1292 &(new_mailbox_entry->mem_hndl));
1294 GNI_RC_CHECK("register", status);
1295 new_mailbox_entry->next = mailbox_list;
1296 mailbox_list = new_mailbox_entry;
1298 local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1299 local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1300 local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1301 local_smsg_attr->mbox_offset = mailbox_list->offset;
1302 mailbox_list->offset += smsg_memlen;
1303 local_smsg_attr->buff_size = smsg_memlen;
1304 local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1305 local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1308 /* useDynamicSMSG */
1309 inline
1310 static int connect_to(int destNode)
1312 gni_return_t status = GNI_RC_NOT_DONE;
1313 CmiAssert(smsg_connected_flag[destNode] == 0);
1314 CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1315 smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1316 alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1317 smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1319 CMI_GNI_LOCK(global_gni_lock)
1320 status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1321 CMI_GNI_UNLOCK(global_gni_lock)
1322 if (status == GNI_RC_ERROR_RESOURCE) {
1323 /* possibly destNode is making connection at the same time */
1324 free(smsg_attr_vector_local[destNode]);
1325 smsg_attr_vector_local[destNode] = NULL;
1326 free(smsg_attr_vector_remote[destNode]);
1327 smsg_attr_vector_remote[destNode] = NULL;
1328 mailbox_list->offset -= smsg_memlen;
1329 return 0;
1331 GNI_RC_CHECK("GNI_Post", status);
1332 smsg_connected_flag[destNode] = 1;
1333 return 1;
1336 inline
1337 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr )
1339 unsigned int remote_address;
1340 uint32_t remote_id;
1341 gni_return_t status = GNI_RC_ERROR_RESOURCE;
1342 gni_smsg_attr_t *smsg_attr;
1343 gni_post_descriptor_t *pd;
1344 gni_post_state_t post_state;
1345 char *real_data;
1347 if (useDynamicSMSG) {
1348 switch (smsg_connected_flag[destNode]) {
1349 case 0:
1350 connect_to(destNode); /* continue to case 1 */
1351 case 1: /* pending connection, do nothing */
1352 status = GNI_RC_NOT_DONE;
1353 if(inbuff ==0)
1354 buffer_small_msgs(queue, msg, size, destNode, tag);
1355 return status;
1358 #if CMK_SMP
1359 #if ! ONE_SEND_QUEUE
1360 if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1361 #endif
1363 #else
1364 if(queue->smsg_msglist_index[destNode].sendSmsgBuf == 0 || inbuff==1)
1366 #endif
1367 //CMI_GNI_LOCK(smsg_mailbox_lock)
1368 CMI_GNI_LOCK(default_tx_cq_lock)
1369 #if CMK_SMP_TRACE_COMMTHREAD
1370 int oldpe = -1;
1371 int oldeventid = -1;
1372 if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG)
1374 START_EVENT();
1375 if ( tag == SMALL_DATA_TAG)
1376 real_data = (char*)msg;
1377 else
1378 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1379 TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1380 TRACE_COMM_SET_COMM_MSGID(real_data);
1382 #endif
1383 #if REMOTE_EVENT
1384 if (tag == LMSG_INIT_TAG) {
1385 CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1386 if (control_msg_tmp->seq_id == 0 && control_msg_tmp->ack_index == -1)
1387 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1389 // GNI_EpSetEventData(ep_hndl_array[destNode], destNode, myrank);
1390 #endif
1391 #if CMK_WITH_STATS
1392 SMSG_TRY_SEND(tag)
1393 #endif
1394 #if CMK_WITH_STATS
1395 double creation_time;
1396 if (ptr == NULL)
1397 creation_time = CmiWallTimer();
1398 else
1399 creation_time = ptr->creation_time;
1400 #endif
1402 status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, 0, tag);
1403 #if CMK_SMP_TRACE_COMMTHREAD
1404 if (oldpe != -1) TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1405 #endif
1406 CMI_GNI_UNLOCK(default_tx_cq_lock)
1407 //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1408 if(status == GNI_RC_SUCCESS)
1410 #if CMK_WITH_STATS
1411 SMSG_SENT_DONE(creation_time,tag)
1412 #endif
1413 #if CMK_SMP_TRACE_COMMTHREAD
1414 if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG )
1416 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1418 #endif
1419 }else
1420 status = GNI_RC_ERROR_RESOURCE;
1422 if(status != GNI_RC_SUCCESS && inbuff ==0)
1423 buffer_small_msgs(queue, msg, size, destNode, tag);
1424 return status;
1427 inline
1428 static CONTROL_MSG* construct_control_msg(int size, char *msg, uint8_t seqno)
1430 /* construct a control message and send */
1431 CONTROL_MSG *control_msg_tmp;
1432 MallocControlMsg(control_msg_tmp);
1433 control_msg_tmp->source_addr = (uint64_t)msg;
1434 control_msg_tmp->seq_id = seqno;
1435 control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned
1436 #if REMOTE_EVENT
1437 control_msg_tmp->ack_index = -1;
1438 #endif
1439 #if USE_LRTS_MEMPOOL
1440 if(size < BIG_MSG)
1442 control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1444 else
1446 SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1447 control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1448 if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1450 #else
1451 SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1452 #endif
1453 return control_msg_tmp;
1456 #define BLOCKING_SEND_CONTROL 0
1458 // Large message, send control to receiver, receiver register memory and do a GET,
1459 // return 1 - send no success
1460 inline static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr)
1462 gni_return_t status = GNI_RC_ERROR_NOMEM;
1463 uint32_t vmdh_index = -1;
1464 int size;
1465 int offset = 0;
1466 uint64_t source_addr;
1467 int register_size;
1468 void *msg;
1470 size = control_msg_tmp->total_length;
1471 source_addr = control_msg_tmp->source_addr;
1472 register_size = control_msg_tmp->length;
1474 #if USE_LRTS_MEMPOOL
1475 if( control_msg_tmp->seq_id == 0 ){
1476 #if BLOCKING_SEND_CONTROL
1477 if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1478 while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1479 LrtsAdvanceCommunication(0);
1481 #endif
1482 if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1484 msg = (void*)source_addr;
1485 if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1487 if(!inbuff)
1488 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1489 return GNI_RC_ERROR_NOMEM;
1491 //register the corresponding mempool
1492 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1493 if(status == GNI_RC_SUCCESS)
1495 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1497 }else
1499 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1500 status = GNI_RC_SUCCESS;
1502 if(NoMsgInSend(source_addr))
1503 register_size = GetMempoolsize((void*)(source_addr));
1504 else
1505 register_size = 0;
1506 }else if(control_msg_tmp->seq_id >0) // BIG_MSG
1508 int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1509 source_addr += offset;
1510 size = control_msg_tmp->length;
1511 #if BLOCKING_SEND_CONTROL
1512 if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1513 while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1514 LrtsAdvanceCommunication(0);
1516 #endif
1517 if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1518 if(buffered_send_msg + size >= MAX_BUFF_SEND)
1520 if(!inbuff)
1521 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1522 return GNI_RC_ERROR_NOMEM;
1524 status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1525 if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1527 else
1529 status = GNI_RC_SUCCESS;
1531 register_size = 0;
1534 if(status == GNI_RC_SUCCESS)
1536 status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, inbuff, smsg_ptr);
1537 if(status == GNI_RC_SUCCESS)
1539 buffered_send_msg += register_size;
1540 if(control_msg_tmp->seq_id == 0)
1542 IncreaseMsgInSend(source_addr);
1544 FreeControlMsg(control_msg_tmp);
1545 MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG);
1546 }else
1547 status = GNI_RC_ERROR_RESOURCE;
1549 } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1551 CmiAbort("Memory registor for large msg\n");
1552 }else
1554 status = GNI_RC_ERROR_NOMEM;
1555 if(!inbuff)
1556 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1558 return status;
1559 #else
1560 MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1561 if(status == GNI_RC_SUCCESS)
1563 status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, LMSG_INIT_TAG, 0, NULL);
1564 if(status == GNI_RC_SUCCESS)
1566 FreeControlMsg(control_msg_tmp);
1568 } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1570 CmiAbort("Memory registor for large msg\n");
1571 }else
1573 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1575 return status;
1576 #endif
/*
 * Stamp msg's Converse header with its total size (CmiSetMsgSize), then
 * apply CMI_SET_CHECKSUM over the same size, before the message is sent.
 */
inline void LrtsPrepareEnvelope(char *msg, int size)
{
    CmiSetMsgSize(msg, size);
    CMI_SET_CHECKSUM(msg, size);
}
1585 CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
1587 gni_return_t status = GNI_RC_SUCCESS;
1588 uint8_t tag;
1589 CONTROL_MSG *control_msg_tmp;
1590 int oob = ( mode & OUT_OF_BAND);
1591 SMSG_QUEUE *queue;
1593 MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size);
1594 #if CMK_USE_OOB
1595 queue = oob? &smsg_oob_queue : &smsg_queue;
1596 #else
1597 queue = &smsg_queue;
1598 #endif
1600 LrtsPrepareEnvelope(msg, size);
1602 #if PRINT_SYH
1603 printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1604 #endif
1605 #if CMK_SMP
1606 if(size <= SMSG_MAX_MSG)
1607 buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1608 else if (size < BIG_MSG) {
1609 control_msg_tmp = construct_control_msg(size, msg, 0);
1610 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1612 else {
1613 CmiSetMsgSeq(msg, 0);
1614 control_msg_tmp = construct_control_msg(size, msg, 1);
1615 buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, LMSG_INIT_TAG);
1617 #else //non-smp, smp(worker sending)
1618 if(size <= SMSG_MAX_MSG)
1620 if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode, msg, size, SMALL_DATA_TAG, 0, NULL))
1621 CmiFree(msg);
1623 else if (size < BIG_MSG) {
1624 control_msg_tmp = construct_control_msg(size, msg, 0);
1625 send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1627 else {
1628 #if USE_LRTS_MEMPOOL
1629 CmiSetMsgSeq(msg, 0);
1630 control_msg_tmp = construct_control_msg(size, msg, 1);
1631 send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1632 #else
1633 control_msg_tmp = construct_control_msg(size, msg, 0);
1634 send_large_messages(queue, destNode, control_msg_tmp, 0, NULL);
1635 #endif
1637 #endif
1638 return 0;
static void PumpDatagramConnection();

/*
 * Projections user-event IDs for the machine layer's progress phases.
 * These are default values; registerUserTraceEvents() overwrites them with
 * registered IDs when machine-level user-event tracing is compiled in.
 * NOTE(review): event_SendBufferSmsg shares the default 444 with
 * event_PumpRdmaTransaction, which looks like a typo; confirm before relying
 * on the defaults.
 */
static int event_SetupConnect         = 111;
static int event_PumpSmsg             = 222;
static int event_PumpTransaction      = 333;
static int event_PumpRdmaTransaction  = 444;
static int event_SendBufferSmsg       = 444;
static int event_SendFmaRdmaMsg       = 555;
static int event_AdvanceCommunication = 666;
/*
 * Register the machine layer's Projections user events and store the
 * returned IDs in the event_* variables. This is an empty function unless
 * CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM.
 */
static void registerUserTraceEvents() {
#if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
    event_SetupConnect         = traceRegisterUserEvent("setting up connections", -1 );
    event_PumpSmsg             = traceRegisterUserEvent("Pump network small msgs", -1);
    event_PumpTransaction      = traceRegisterUserEvent("Pump FMA local transaction" , -1);
    event_PumpRdmaTransaction  = traceRegisterUserEvent("Pump RDMA local transaction" , -1);
    event_SendBufferSmsg       = traceRegisterUserEvent("Sending buffered small msgs", -1);
    event_SendFmaRdmaMsg       = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
    event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
#endif
}
1662 static void ProcessDeadlock()
1664 static CmiUInt8 *ptr = NULL;
1665 static CmiUInt8 last = 0, mysum, sum;
1666 static int count = 0;
1667 gni_return_t status;
1668 int i;
1670 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1671 //sweep_mempool(CpvAccess(mempool));
1672 if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1673 mysum = smsg_send_count + smsg_recv_count;
1674 MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d) (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count);
1675 status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1676 GNI_RC_CHECK("PMI_Allgather", status);
1677 sum = 0;
1678 for (i=0; i<mysize; i++) sum+= ptr[i];
1679 if (last == 0 || sum == last)
1680 count++;
1681 else
1682 count = 0;
1683 last = sum;
1684 MACHSTATE5(9,"Progress Deadlock (%d,%d) (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count);
1685 if (count == 2) {
1686 /* detected twice, it is a real deadlock */
1687 if (myrank == 0) {
1688 CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low. Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1689 CmiAbort("Fatal> Deadlock detected.");
1693 _detected_hang = 0;
1696 static void CheckProgress()
1698 if (smsg_send_count == last_smsg_send_count &&
1699 smsg_recv_count == last_smsg_recv_count )
1701 _detected_hang = 1;
1702 #if !CMK_SMP
1703 if (_detected_hang) ProcessDeadlock();
1704 #endif
1707 else {
1708 //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count);
1709 last_smsg_send_count = smsg_send_count;
1710 last_smsg_recv_count = smsg_recv_count;
1711 _detected_hang = 0;
1715 static void set_limit()
1717 //if (!user_set_flag && CmiMyRank() == 0) {
1718 if (CmiMyRank() == 0) {
1719 int mynode = CmiPhysicalNodeID(CmiMyPe());
1720 int numpes = CmiNumPesOnPhysicalNode(mynode);
1721 int numprocesses = numpes / CmiMyNodeSize();
1722 MAX_REG_MEM = _totalmem / numprocesses;
1723 MAX_BUFF_SEND = MAX_REG_MEM / 2;
1724 if (CmiMyPe() == 0)
1725 printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1726 if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND || smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1728 printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1729 CmiAbort("memory registration\n");
1734 void LrtsPostCommonInit(int everReturn)
1736 #if CMK_DIRECT
1737 CmiDirectInit();
1738 #endif
1739 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1740 CpvInitialize(double, projTraceStart);
1741 /* only PE 0 needs to care about registration (to generate sts file). */
1742 //if (CmiMyPe() == 0)
1744 registerMachineUserEventsFunction(&registerUserTraceEvents);
1746 #endif
1748 #if CMK_SMP
1749 CmiIdleState *s=CmiNotifyGetState();
1750 CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,(CcdVoidFn)CmiNotifyBeginIdle,(void *)s);
1751 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,(void *)s);
1752 #else
1753 CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,(CcdVoidFn)CmiNotifyStillIdle,NULL);
1754 if (useDynamicSMSG)
1755 CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
1756 #endif
1758 #if ! LARGEPAGE
1759 if (_checkProgress)
1760 #if CMK_SMP
1761 if (CmiMyRank() == 0)
1762 #endif
1763 CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
1764 #endif
1766 #if !LARGEPAGE
1767 CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
1768 #endif
/* this is called by worker thread */
/*
 * Worker-thread progress hook. It is an empty function unless
 * MULTI_THREAD_SEND is enabled, and returns immediately when mysize == 1.
 *
 * The calling worker:
 *  - polls incoming SMSGs;
 *  - drains the default and RDMA transmit CQs (and remote events with
 *    REMOTE_EVENT);
 *  - flushes the buffered SMSG queue (with CMK_USE_OOB, the OOB queue first,
 *    and the normal queue only when that returns 1);
 *  - retries buffered RDMA transactions.
 * With CMK_WORKER_SINGLE_TASK each worker runs only the task selected by
 * CmiMyRank() % 6. With comm-thread tracing the work is reported as a
 * Projections user event outside idle time.
 */
void LrtsPostNonLocal(){
#if MULTI_THREAD_SEND
#if CMK_SMP_TRACE_COMMTHREAD
    double t_begin, t_end;
#endif

    if (mysize == 1)
        return;

#if CMK_SMP_TRACE_COMMTHREAD
    traceEndIdle();
    t_begin = CmiWallTimer();
#endif

    /* task 0: incoming small messages */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 0)
#endif
    PumpNetworkSmsg();

    /* task 1: default transmit CQ */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 1)
#endif
    PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);

    /* task 2: RDMA transmit CQ */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 2)
#endif
    PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);

#if REMOTE_EVENT
    /* task 3: remote events */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 3)
#endif
    PumpRemoteTransactions();
#endif

    /* task 4: buffered small messages */
#if CMK_USE_OOB
    if (SendBufferMsg(&smsg_oob_queue) == 1)
#endif
    {
#if CMK_WORKER_SINGLE_TASK
        if (CmiMyRank() % 6 == 4)
#endif
        SendBufferMsg(&smsg_queue);
    }

    /* task 5: buffered RDMA transactions */
#if CMK_WORKER_SINGLE_TASK
    if (CmiMyRank() % 6 == 5)
#endif
    SendRdmaMsg();

#if CMK_SMP_TRACE_COMMTHREAD
    t_end = CmiWallTimer();
    traceUserBracketEvent(event_AdvanceCommunication, t_begin, t_end);
    traceBeginIdle();
#endif
#endif
}
1833 /* useDynamicSMSG */
1834 static void PumpDatagramConnection()
1836 uint32_t remote_address;
1837 uint32_t remote_id;
1838 gni_return_t status;
1839 gni_post_state_t post_state;
1840 uint64_t datagram_id;
1841 int i;
1843 while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
1845 if (datagram_id >= mysize) { /* bound endpoint */
1846 int pe = datagram_id - mysize;
1847 CMI_GNI_LOCK(global_gni_lock)
1848 status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
1849 CMI_GNI_UNLOCK(global_gni_lock)
1850 if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1852 CmiAssert(remote_id == pe);
1853 status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
1854 GNI_RC_CHECK("Dynamic SMSG Init", status);
1855 #if PRINT_SYH
1856 printf("++ Dynamic SMSG setup [%d===>%d] done\n", myrank, pe);
1857 #endif
1858 CmiAssert(smsg_connected_flag[pe] == 1);
1859 smsg_connected_flag[pe] = 2;
1862 else { /* unbound ep */
1863 status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
1864 if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
1866 CmiAssert(remote_id<mysize);
1867 CmiAssert(smsg_connected_flag[remote_id] <= 0);
1868 status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
1869 GNI_RC_CHECK("Dynamic SMSG Init", status);
1870 #if PRINT_SYH
1871 printf("++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, remote_id);
1872 #endif
1873 smsg_connected_flag[remote_id] = 2;
1875 alloc_smsg_attr(&send_smsg_attr);
1876 status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr, SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
1877 GNI_RC_CHECK("post unbound datagram", status);
/*
 * Placeholder for polling a CQ for incoming RDMA messages. The body is
 * currently empty, so calling it has no effect.
 */
static void PumpNetworkRdmaMsgs()
{
}
1891 inline
1892 static void bufferRdmaMsg(int inst_id, gni_post_descriptor_t *pd, int ack_index)
1894 RDMA_REQUEST *rdma_request_msg;
1895 MallocRdmaRequest(rdma_request_msg);
1896 rdma_request_msg->destNode = inst_id;
1897 rdma_request_msg->pd = pd;
1898 #if REMOTE_EVENT
1899 rdma_request_msg->ack_index = ack_index;
1900 #endif
1901 #if CMK_SMP
1902 PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
1903 #else
1904 if(sendRdmaBuf == 0)
1906 sendRdmaBuf = sendRdmaTail = rdma_request_msg;
1907 }else{
1908 sendRdmaTail->next = rdma_request_msg;
1909 sendRdmaTail = rdma_request_msg;
1911 #endif
/* Forward declaration: PumpNetworkSmsg() hands LMSG_INIT_TAG control
 * messages to this routine. */
static void getLargeMsgRequest(void* header, uint64_t inst_id);
1917 static void PumpNetworkSmsg()
1919 uint64_t inst_id;
1920 int ret;
1921 gni_cq_entry_t event_data;
1922 gni_return_t status, status2;
1923 void *header;
1924 uint8_t msg_tag;
1925 int msg_nbytes;
1926 void *msg_data;
1927 gni_mem_handle_t msg_mem_hndl;
1928 gni_smsg_attr_t *smsg_attr;
1929 gni_smsg_attr_t *remote_smsg_attr;
1930 int init_flag;
1931 CONTROL_MSG *control_msg_tmp, *header_tmp;
1932 uint64_t source_addr;
1933 SMSG_QUEUE *queue = &smsg_queue;
1934 #if CMK_DIRECT
1935 cmidirectMsg *direct_msg;
1936 #endif
1937 while(1)
1939 CMI_GNI_LOCK(smsg_rx_cq_lock)
1940 status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
1941 CMI_GNI_UNLOCK(smsg_rx_cq_lock)
1942 if(status != GNI_RC_SUCCESS)
1943 break;
1944 inst_id = GNI_CQ_GET_INST_ID(event_data);
1945 #if REMOTE_EVENT
1946 inst_id = ACK_GET_RANK(inst_id);
1947 #endif
1948 // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
1949 #if PRINT_SYH
1950 printf("[%d] %d PumpNetworkMsgs is received from PE: %d, status=%s\n", myrank, CmiMyRank(), inst_id, gni_err_str[status]);
1951 #endif
1952 if (useDynamicSMSG) {
1953 /* subtle: smsg may come before connection is setup */
1954 while (smsg_connected_flag[inst_id] != 2)
1955 PumpDatagramConnection();
1957 msg_tag = GNI_SMSG_ANY_TAG;
1958 while(1) {
1959 CMI_GNI_LOCK(smsg_mailbox_lock)
1960 status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
1961 if (status != GNI_RC_SUCCESS)
1963 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1964 break;
1966 #if PRINT_SYH
1967 printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
1968 #endif
1969 /* copy msg out and then put into queue (small message) */
1970 switch (msg_tag) {
1971 case SMALL_DATA_TAG:
1973 START_EVENT();
1974 msg_nbytes = CmiGetMsgSize(header);
1975 msg_data = CmiAlloc(msg_nbytes);
1976 memcpy(msg_data, (char*)header, msg_nbytes);
1977 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1978 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1979 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg_data);
1980 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
1981 handleOneRecvedMsg(msg_nbytes, msg_data);
1982 break;
1984 case LMSG_INIT_TAG:
1986 #if MULTI_THREAD_SEND
1987 MallocControlMsg(control_msg_tmp);
1988 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
1989 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1990 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1991 getLargeMsgRequest(control_msg_tmp, inst_id);
1992 FreeControlMsg(control_msg_tmp);
1993 #else
1994 CMI_GNI_UNLOCK(smsg_mailbox_lock)
1995 getLargeMsgRequest(header, inst_id);
1996 GNI_SmsgRelease(ep_hndl_array[inst_id]);
1997 #endif
1998 break;
2000 case ACK_TAG: //msg fit into mempool
2002 /* Get is done, release message . Now put is not used yet*/
2003 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2004 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2005 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2006 #if ! USE_LRTS_MEMPOOL
2007 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2008 #else
2009 DecreaseMsgInSend(msg);
2010 #endif
2011 if(NoMsgInSend(msg))
2012 buffered_send_msg -= GetMempoolsize(msg);
2013 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag);
2014 CmiFree(msg);
2015 break;
2017 case BIG_MSG_TAG: //big msg, de-register, transfer next seg
2019 #if MULTI_THREAD_SEND
2020 MallocControlMsg(header_tmp);
2021 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2022 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2023 #else
2024 header_tmp = (CONTROL_MSG *) header;
2025 #endif
2026 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2027 void *msg = (void*)(header_tmp->source_addr);
2028 int cur_seq = CmiGetMsgSeq(msg);
2029 int offset = ONE_SEG*(cur_seq+1);
2030 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2031 buffered_send_msg -= header_tmp->length;
2032 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2033 if (remain_size < 0) remain_size = 0;
2034 CmiSetMsgSize(msg, remain_size);
2035 if(remain_size <= 0) //transaction done
2037 CmiFree(msg);
2038 }else if (header_tmp->total_length > offset)
2040 CmiSetMsgSeq(msg, cur_seq+1);
2041 control_msg_tmp = construct_control_msg(header_tmp->total_length, msg, cur_seq+1+1);
2042 control_msg_tmp->dest_addr = header_tmp->dest_addr;
2043 //send next seg
2044 send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2045 // pipelining
2046 if (header_tmp->seq_id == 1) {
2047 int i;
2048 for (i=1; i<BIG_MSG_PIPELINE; i++) {
2049 int seq = cur_seq+i+2;
2050 CmiSetMsgSeq(msg, seq-1);
2051 control_msg_tmp = construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2052 control_msg_tmp->dest_addr = header_tmp->dest_addr;
2053 send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL);
2054 if (header_tmp->total_length <= ONE_SEG*seq) break;
2058 #if MULTI_THREAD_SEND
2059 FreeControlMsg(header_tmp);
2060 #else
2061 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2062 #endif
2063 break;
2065 #if CMK_PERSISTENT_COMM
2066 case PUT_DONE_TAG: { //persistent message
2067 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2068 int size = ((CONTROL_MSG *) header)->length;
2069 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2070 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2071 CmiReference(msg);
2072 CMI_CHECK_CHECKSUM(msg, size);
2073 handleOneRecvedMsg(size, msg);
2074 #if PRINT_SYH
2075 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2076 #endif
2077 break;
2079 #endif
2080 #if CMK_DIRECT
2081 case DIRECT_PUT_DONE_TAG: //cmi direct
2082 //create a trigger message
2083 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2084 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2085 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2086 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2087 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2088 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2089 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2090 break;
2091 #endif
2092 default: {
2093 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2094 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2095 printf("weird tag problem\n");
2096 CmiAbort("Unknown tag\n");
2098 } // end switch
2099 #if PRINT_SYH
2100 printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2101 #endif
2102 smsg_recv_count ++;
2103 msg_tag = GNI_SMSG_ANY_TAG;
2104 } //endwhile getNext
2105 } //end while GetEvent
2106 if(status == GNI_RC_ERROR_RESOURCE)
2108 printf("charm> Please use +useRecvQueue 204800 in your command line, if the error comes again, increase this number\n");
2109 GNI_RC_CHECK("Smsg_rx_cq full", status);
2113 static void printDesc(gni_post_descriptor_t *pd)
2115 printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length);
#if CQWRITE
/* Post a GNI CQ-write to node destNode carrying the 64-bit value `data`,
 * targeting the remote memory handle `mem_hndl`. The descriptor is posted
 * with GNI_CQMODE_SILENT and GNI_DLVMODE_PERFORMANCE. Any failure of
 * GNI_PostCqWrite is reported (fatally) through GNI_RC_CHECK. */
static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
{
    gni_post_descriptor_t *desc;
    gni_return_t rc;

    MallocPostDesc(desc);
    desc->type            = GNI_POST_CQWRITE;
    desc->cq_mode         = GNI_CQMODE_SILENT;
    desc->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
    desc->cqwrite_value   = data;
    desc->remote_mem_hndl = mem_hndl;

    rc = GNI_PostCqWrite(ep_hndl_array[destNode], desc);
    GNI_RC_CHECK("GNI_PostCqWrite", rc);
}
#endif
2136 // register memory for a message
2137 // return mem handle
2138 static gni_return_t registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2140 gni_return_t status = GNI_RC_SUCCESS;
2142 if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2144 #if CMK_PERSISTENT_COMM
2145 // persistent message is always registered
2146 // BIG_MSG small pieces do not have malloc chunk header
2147 if ((seqno <= 1 || seqno == PERSIST_SEQ) && !IsMemHndlZero(MEMHFIELD(msg))) {
2148 *memh = MEMHFIELD(msg);
2149 return GNI_RC_SUCCESS;
2151 #endif
2152 if(seqno == 0
2153 #if CMK_PERSISTENT_COMM
2154 || seqno == PERSIST_SEQ
2155 #endif
2158 if(IsMemHndlZero((GetMemHndl(msg))))
2160 msg = (void*)(msg);
2161 status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2162 if(status == GNI_RC_SUCCESS)
2163 *memh = GetMemHndl(msg);
2165 else {
2166 *memh = GetMemHndl(msg);
2169 else {
2170 //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2171 status = registerMemory(msg, size, memh, NULL);
2173 return status;
2176 // for BIG_MSG called on receiver side for receiving control message
2177 // LMSG_INIT_TAG
// getLargeMsgRequest: receiver-side handler for a CONTROL_MSG announcing a
// large message from node inst_id. It obtains a local destination buffer,
// registers it (registerMessage in the mempool build, MEMORY_REGISTER
// otherwise) and posts an FMA or RDMA GET that pulls the data from
// request_msg->source_addr on the sender. If registration or posting fails
// with GNI_RC_ERROR_RESOURCE / GNI_RC_ERROR_NOMEM, the descriptor is queued
// (bufferRdmaMsg, or sendRdmaBuf directly in the non-mempool build) for a
// later retry, presumably by SendRdmaMsg, which drains sendRdmaBuf. Any other
// failure is reported through GNI_RC_CHECK.
2178 static void getLargeMsgRequest(void* header, uint64_t inst_id )
2180 #if USE_LRTS_MEMPOOL
2181 CONTROL_MSG *request_msg;
2182 gni_return_t status = GNI_RC_SUCCESS;
2183 void *msg_data;
2184 gni_post_descriptor_t *pd;
2185 gni_mem_handle_t msg_mem_hndl;
// NOTE(review): msg_mem_hndl is not used in this (mempool) branch.
2186 int size, transaction_size, offset = 0;
2187 size_t register_size = 0;
2189 // initial a get to transfer data from the sender side */
2190 request_msg = (CONTROL_MSG *) header;
2191 size = request_msg->total_length;
2192 MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size);
2193 MallocPostDesc(pd);
2194 #if CMK_WITH_STATS
2195 pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2196 #endif
// seq_id 0: the whole message arrives in one GET. seq_id 1: first segment of
// a pipelined big message, so the full-size (total_length) buffer is
// allocated here. Later segments (seq_id > 1) land in the buffer recorded in
// dest_addr, at offset ONE_SEG*(seq_id-1).
2197 if(request_msg->seq_id < 2) {
2198 #if CMK_SMP_TRACE_COMMTHREAD
2199 pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2200 #endif
2201 msg_data = CmiAlloc(size);
2202 CmiSetMsgSeq(msg_data, 0);
2203 _MEMCHECK(msg_data);
2205 else {
2206 offset = ONE_SEG*(request_msg->seq_id-1);
2207 msg_data = (char*)request_msg->dest_addr + offset;
// cqwrite_value carries seq_id to PumpLocalTransactions, which uses it to
// tell a big-message segment (seq_id > 0) from a whole message.
2210 pd->cqwrite_value = request_msg->seq_id;
2212 transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2213 SetMemHndlZero(pd->local_mem_hndl);
2214 status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2215 if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2216 if(NoMsgInRecv( (void*)(msg_data)))
2217 register_size = GetMempoolsize((void*)(msg_data));
2220 pd->first_operand = ALIGN64(size); // total length
// NOTE(review): FMA vs RDMA is chosen from total_length, not from the size
// of this particular transaction (a single segment when seq_id > 0).
2222 if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2223 pd->type = GNI_POST_FMA_GET;
2224 else
2225 pd->type = GNI_POST_RDMA_GET;
2226 pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT;
2227 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2228 pd->length = transaction_size;
2229 pd->local_addr = (uint64_t) msg_data;
2230 pd->remote_addr = request_msg->source_addr + offset;
2231 pd->remote_mem_hndl = request_msg->source_mem_hndl;
2232 pd->src_cq_hndl = rdma_tx_cqh;
2233 pd->rdma_mode = 0;
2234 pd->amo_cmd = 0;
2236 //memory registration success
2237 if(status == GNI_RC_SUCCESS)
2239 CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2240 CMI_GNI_LOCK(lock)
2241 #if REMOTE_EVENT
2242 if( request_msg->seq_id == 0)
2244 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2245 int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2246 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2248 #endif
2250 #if CMK_WITH_STATS
2251 RDMA_TRY_SEND()
2252 #endif
2253 if(pd->type == GNI_POST_RDMA_GET)
2255 status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2257 else
2259 status = GNI_PostFma(ep_hndl_array[inst_id], pd);
2261 CMI_GNI_UNLOCK(lock)
2263 if(status == GNI_RC_SUCCESS )
2265 if(pd->cqwrite_value == 0)
2267 #if MACHINE_DEBUG_LOG
2268 buffered_recv_msg += register_size;
2269 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size);
2270 #endif
2271 IncreaseMsgInRecv(msg_data);
2272 #if CMK_SMP_TRACE_COMMTHREAD
2273 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2274 #endif
2276 #if CMK_WITH_STATS
2277 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2278 RDMA_TRANS_INIT(pd->sync_flag_addr/1000000.0)
2279 #endif
2281 }else
2283 SetMemHndlZero((pd->local_mem_hndl));
// Appears to run after both a failed registration and a failed post:
// resource exhaustion is buffered for retry, anything else is fatal.
2285 if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2287 #if REMOTE_EVENT
2288 bufferRdmaMsg(inst_id, pd, request_msg->ack_index);
2289 #else
2290 bufferRdmaMsg(inst_id, pd, -1);
2291 #endif
2292 }else if (status != GNI_RC_SUCCESS) {
2293 // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2294 GNI_RC_CHECK("GetLargeAFter posting", status);
2296 #else
// Non-mempool build: the whole message is fetched in one GET into a freshly
// CmiAlloc'd buffer that is registered explicitly with MEMORY_REGISTER.
2297 CONTROL_MSG *request_msg;
2298 gni_return_t status;
2299 void *msg_data;
2300 gni_post_descriptor_t *pd;
2301 RDMA_REQUEST *rdma_request_msg;
2302 gni_mem_handle_t msg_mem_hndl;
2303 //int source;
2304 // initial a get to transfer data from the sender side */
2305 request_msg = (CONTROL_MSG *) header;
2306 msg_data = CmiAlloc(request_msg->length);
2307 _MEMCHECK(msg_data);
2309 MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL, status)
2311 if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
2313 GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2316 MallocPostDesc(pd);
2317 if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD)
2318 pd->type = GNI_POST_FMA_GET;
2319 else
2320 pd->type = GNI_POST_RDMA_GET;
2321 pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT;// | GNI_CQMODE_REMOTE_EVENT;
2322 pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2323 pd->length = ALIGN64(request_msg->length);
2324 pd->local_addr = (uint64_t) msg_data;
2325 pd->remote_addr = request_msg->source_addr;
2326 pd->remote_mem_hndl = request_msg->source_mem_hndl;
2327 pd->src_cq_hndl = rdma_tx_cqh;
2328 pd->rdma_mode = 0;
2329 pd->amo_cmd = 0;
2331 //memory registration successful
2332 if(status == GNI_RC_SUCCESS)
2334 pd->local_mem_hndl = msg_mem_hndl;
2336 if(pd->type == GNI_POST_RDMA_GET)
2338 CMI_GNI_LOCK(rdma_tx_cq_lock)
2339 status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2340 CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2342 else
2344 CMI_GNI_LOCK(default_tx_cq_lock)
2345 status = GNI_PostFma(ep_hndl_array[inst_id], pd);
2346 CMI_GNI_UNLOCK(default_tx_cq_lock)
2349 }else
2351 SetMemHndlZero(pd->local_mem_hndl);
// Resource exhaustion: queue the descriptor on sendRdmaBuf for a later retry.
2353 if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2355 MallocRdmaRequest(rdma_request_msg);
2356 rdma_request_msg->next = 0;
2357 rdma_request_msg->destNode = inst_id;
2358 rdma_request_msg->pd = pd;
2359 PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2360 }else {
2361 GNI_RC_CHECK("AFter posting", status);
2363 #endif
#if CQWRITE
/* Drain CQ-write notifications from rdma_rx_cqh.
 * The low 48 bits of each event's data word are taken as a message address.
 * With CMK_PERSISTENT_COMM, a message whose MEMHFIELD handle is non-zero is
 * treated as a persistent-message arrival and handed to handleOneRecvedMsg.
 * Otherwise the event acknowledges a send: the message's in-send count is
 * decremented exactly once (matching one acknowledged transfer), its mempool
 * size is removed from buffered_send_msg once no send is outstanding, and
 * the message is freed. A GNI_RC_ERROR_RESOURCE from the CQ is fatal. */
static void PumpCqWriteTransactions()
{
    gni_cq_entry_t ev;
    gni_return_t status;
    void *msg;
    int msg_size;
    while(1) {
        //CMI_GNI_LOCK(my_cq_lock)
        status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
        //CMI_GNI_UNLOCK(my_cq_lock)
        if(status != GNI_RC_SUCCESS) break;
        msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
#if CMK_PERSISTENT_COMM
#if PRINT_SYH
        printf(" %d CQ write event %p\n", myrank, msg);
#endif
        if (!IsMemHndlZero(MEMHFIELD(msg))) {
#if PRINT_SYH
            printf(" %d Persistent CQ write event %p\n", myrank, msg);
#endif
            CmiReference(msg);
            msg_size = CmiGetMsgSize(msg);
            CMI_CHECK_CHECKSUM(msg, msg_size);
            handleOneRecvedMsg(msg_size, msg);
            continue;
        }
#endif
        /* One completion releases exactly one in-flight send. Previously a
         * second DecreaseMsgInSend ran under USE_LRTS_MEMPOOL, which drove
         * the counter down twice per acknowledged message. */
        DecreaseMsgInSend(msg);
        if(NoMsgInSend(msg))
            buffered_send_msg -= GetMempoolsize(msg);
        CmiFree(msg);
    }
    /* status is the GNI_CqGetEvent result that ended the loop */
    if(status == GNI_RC_ERROR_RESOURCE)
    {
        GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
    }
}
#endif
#if REMOTE_EVENT
/* Drain remote-event completions from rdma_rx_cqh (under global_gni_lock).
 * The event's instance id encodes an ack-pool slot (ACK_GET_INDEX). The
 * slot's type selects the action:
 *   1 (ACK)        - a send finished: drop the message's in-send count once,
 *                    update buffered_send_msg when no send is outstanding,
 *                    free the message and release the slot;
 *   2 (PERSISTENT) - the slot holds a PersistentReceivesTable; its first
 *                    destination buffer is delivered via handleOneRecvedMsg.
 * Unknown types abort. A GNI_RC_ERROR_RESOURCE from the CQ is fatal. */
static void PumpRemoteTransactions()
{
    gni_cq_entry_t ev;
    gni_return_t status;
    void *msg;
    int slot, type, size;

    while(1) {
        CMI_GNI_LOCK(global_gni_lock)
        status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
        CMI_GNI_UNLOCK(global_gni_lock)
        if(status != GNI_RC_SUCCESS) {
            break;
        }

        slot = GNI_CQ_GET_INST_ID(ev);
        slot = ACK_GET_INDEX(slot);
        //slot = GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFL;

        //CMI_GNI_LOCK(ackpool_lock);
        type = GetIndexType(slot);
        msg = GetIndexAddress(slot);
        //CMI_GNI_UNLOCK(ackpool_lock);

        switch (type) {
        case 1:  // ACK
            /* exactly one decrement per acknowledged send (a duplicate
             * DecreaseMsgInSend used to run under USE_LRTS_MEMPOOL) */
            DecreaseMsgInSend(msg);
            if(NoMsgInSend(msg))
                buffered_send_msg -= GetMempoolsize(msg);
            CmiFree(msg);
            IndexPool_freeslot(&ackPool, slot);
            break;
#if CMK_PERSISTENT_COMM
        case 2:  // PERSISTENT
            msg = ((PersistentReceivesTable*)msg)->destBuf[0].destAddress;
            size = CmiGetMsgSize(msg);
            CmiReference(msg);
            CMI_CHECK_CHECKSUM(msg, size);
            handleOneRecvedMsg(size, msg);
            break;
#endif
        default: {
            fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
            CmiAbort("PumpRemoteTransactions: unknown type");
        }
        }
    }
    /* status is the GNI_CqGetEvent result that ended the loop */
    if(status == GNI_RC_ERROR_RESOURCE)
    {
        GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
    }
}
#endif
// PumpLocalTransactions: drain local completion events from my_tx_cqh
// (my_cq_lock guards the CQ calls). For each completed post descriptor:
//  - PUT (persistent / CmiDirect builds): notify the remote side with a
//    DIRECT_PUT_DONE_TAG message, a PUT_DONE_TAG control message, or a CQ
//    write, depending on amo_cmd and the build flags;
//  - GET of a whole message (cqwrite_value == 0): acknowledge the sender
//    (ACK_TAG message or CQ write, depending on build flags) and hand the
//    received buffer to handleOneRecvedMsg;
//  - GET of a big-message segment (cqwrite_value > 0): deregister the
//    segment, send BIG_MSG_TAG back to the sender, and deliver the whole
//    message once its segment count covers first_operand (total length).
// The descriptor is freed (FreePostDesc) on every handled path.
2472 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
2474 gni_cq_entry_t ev;
2475 gni_return_t status;
2476 uint64_t type, inst_id;
2477 gni_post_descriptor_t *tmp_pd;
2478 MSG_LIST *ptr;
2479 CONTROL_MSG *ack_msg_tmp;
2480 ACK_MSG *ack_msg;
2481 uint8_t msg_tag;
2482 #if CMK_DIRECT
2483 CMK_DIRECT_HEADER *cmk_direct_done_msg;
2484 #endif
2485 SMSG_QUEUE *queue = &smsg_queue;
2487 while(1) {
2488 CMI_GNI_LOCK(my_cq_lock)
2489 status = GNI_CqGetEvent(my_tx_cqh, &ev);
2490 CMI_GNI_UNLOCK(my_cq_lock)
2491 if(status != GNI_RC_SUCCESS) break;
2493 type = GNI_CQ_GET_TYPE(ev);
2494 if (type == GNI_CQ_EVENT_TYPE_POST)
2496 inst_id = GNI_CQ_GET_INST_ID(ev);
2497 #if PRINT_SYH
2498 printf("[%d] LocalTransactions localdone=%d\n", myrank, lrts_local_done_msg);
2499 #endif
2500 CMI_GNI_LOCK(my_cq_lock)
2501 status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
2502 CMI_GNI_UNLOCK(my_cq_lock)
// NOTE(review): the status of GNI_GetCompleted is not checked before
// tmp_pd is dereferenced below.
2504 switch (tmp_pd->type) {
2505 #if CMK_PERSISTENT_COMM || CMK_DIRECT
2506 case GNI_POST_RDMA_PUT:
2507 #if CMK_PERSISTENT_COMM && ! USE_LRTS_MEMPOOL
2508 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2509 #endif
// RDMA_PUT falls through to the FMA_PUT handling.
2510 case GNI_POST_FMA_PUT:
2511 if(tmp_pd->amo_cmd == 1) {
2512 #if CMK_DIRECT
2513 //sender ACK to receiver to trigger it is done
2514 cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
2515 cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
2516 msg_tag = DIRECT_PUT_DONE_TAG;
2517 #endif
2519 else {
2520 CmiFree((void *)tmp_pd->local_addr);
2521 #if REMOTE_EVENT
2522 FreePostDesc(tmp_pd);
2523 continue;
2524 #elif CQWRITE
2525 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2526 FreePostDesc(tmp_pd);
2527 continue;
2528 #else
2529 MallocControlMsg(ack_msg_tmp);
2530 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2531 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
2532 ack_msg_tmp->length = tmp_pd->length;
2533 msg_tag = PUT_DONE_TAG;
2534 #endif
2536 break;
2537 #endif
2538 case GNI_POST_RDMA_GET:
2539 case GNI_POST_FMA_GET: {
2540 #if ! USE_LRTS_MEMPOOL
2541 MallocControlMsg(ack_msg_tmp);
2542 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2543 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
2544 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
2545 msg_tag = ACK_TAG;
2546 #else
2547 #if CMK_WITH_STATS
2548 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
2549 #endif
// cqwrite_value holds the seq_id stored when the GET was posted.
2550 int seq_id = tmp_pd->cqwrite_value;
2551 if(seq_id > 0) // BIG_MSG
2553 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
2554 MallocControlMsg(ack_msg_tmp);
2555 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
2556 ack_msg_tmp->source_mem_hndl = tmp_pd->remote_mem_hndl;
2557 ack_msg_tmp->seq_id = seq_id;
// Rewind both addresses from this segment back to the message starts.
2558 ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
2559 ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
2560 ack_msg_tmp->length = tmp_pd->length;
2561 ack_msg_tmp->total_length = tmp_pd->first_operand; // total size
2562 msg_tag = BIG_MSG_TAG;
2564 else
2566 msg_tag = ACK_TAG;
2567 #if !REMOTE_EVENT && !CQWRITE
2568 MallocAckMsg(ack_msg);
2569 ack_msg->source_addr = tmp_pd->remote_addr;
2570 #endif
2572 #endif
2573 break;
2575 case GNI_POST_CQWRITE:
2576 FreePostDesc(tmp_pd);
2577 continue;
2578 default:
2579 CmiPrintf("type=%d\n", tmp_pd->type);
2580 CmiAbort("PumpLocalTransactions: unknown type!");
2581 } /* end of switch */
// Notify the peer. send_smsg_message is called with inbuff == 0, and the
// message is freed here only when the send succeeds immediately.
2583 #if CMK_DIRECT
2584 if (tmp_pd->amo_cmd == 1) {
2585 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL);
2586 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg);
2588 else
2589 #endif
2590 if (msg_tag == ACK_TAG) {
2591 #if !REMOTE_EVENT
2592 #if !CQWRITE
2593 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL);
2594 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
2595 #else
2596 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
2597 #endif
2598 #endif
2600 else {
2601 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL);
2602 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
2604 #if CMK_PERSISTENT_COMM
2605 if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
2606 #endif
// Deliver received data to the runtime.
2608 if( msg_tag == ACK_TAG){ //msg fit in mempool
2609 #if PRINT_SYH
2610 printf("Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
2611 #endif
2612 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (void*)tmp_pd->local_addr);
2613 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (void*)tmp_pd->local_addr);
2615 START_EVENT();
2616 CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
2617 DecreaseMsgInRecv((void*)tmp_pd->local_addr);
2618 #if MACHINE_DEBUG_LOG
2619 if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
2620 buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
2621 MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag);
2622 #endif
2623 TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
2624 CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, tmp_pd->length);
2625 handleOneRecvedMsg(tmp_pd->length, (void*)tmp_pd->local_addr);
2626 }else if(msg_tag == BIG_MSG_TAG){
// Recover the start of the whole message from this segment's address, count
// the finished segment, and deliver once all first_operand bytes are covered.
2627 void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
2628 CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
2629 if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
2630 START_EVENT();
2631 #if PRINT_SYH
2632 printf("Pipeline msg done [%d]\n", myrank);
2633 #endif
2634 #if CMK_SMP_TRACE_COMMTHREAD
2635 if( tmp_pd->cqwrite_value == 1)
2636 TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (void*)tmp_pd->local_addr);
2637 #endif
2638 TRACE_COMM_CREATION(CpvAccess(projTraceStart), msg);
2639 CMI_CHECK_CHECKSUM(msg, tmp_pd->first_operand);
2640 handleOneRecvedMsg(tmp_pd->first_operand, msg);
2644 FreePostDesc(tmp_pd);
2646 } //end while
// The loop exits only on a failed GNI_CqGetEvent, whose code is in status.
2647 if(status == GNI_RC_ERROR_RESOURCE)
2649 printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");
2650 GNI_RC_CHECK("Smsg_tx_cq full", status);
2654 static void SendRdmaMsg()
2656 gni_return_t status = GNI_RC_SUCCESS;
2657 gni_mem_handle_t msg_mem_hndl;
2658 RDMA_REQUEST *ptr = 0, *tmp_ptr;
2659 RDMA_REQUEST *pre = 0;
2660 uint64_t register_size = 0;
2661 void *msg;
2662 int i;
2663 #if CMK_SMP
2664 int len = PCQueueLength(sendRdmaBuf);
2665 for (i=0; i<len; i++)
2667 CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
2668 ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
2669 CMI_PCQUEUEPOP_UNLOCK(sendRdmaBuf)
2670 if (ptr == NULL) break;
2671 #else
2672 ptr = sendRdmaBuf;
2673 while (ptr!=0)
2675 #endif
2676 MACHSTATE4(8, "noempty-rdma %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size);
2677 gni_post_descriptor_t *pd = ptr->pd;
2679 msg = (void*)(pd->local_addr);
2680 status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
2681 register_size = 0;
2682 if(pd->cqwrite_value == 0) {
2683 if(NoMsgInRecv(msg))
2684 register_size = GetMempoolsize(msg);
2687 if(status == GNI_RC_SUCCESS) //mem register good
2689 CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
2690 CMI_GNI_LOCK(lock);
2691 #if REMOTE_EVENT
2692 if( pd->cqwrite_value == 0
2693 #if CMK_PERSISTENT_COMM
2694 || pd->cqwrite_value == PERSIST_SEQ
2695 #endif
2698 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2699 int sts = GNI_EpSetEventData(ep_hndl_array[ptr->destNode], ptr->destNode, ACK_EVENT(ptr->ack_index));
2700 GNI_RC_CHECK("GNI_EpSetEventData", sts);
2702 #endif
2703 #if CMK_WITH_STATS
2704 RDMA_TRY_SEND()
2705 #endif
2706 if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT)
2708 status = GNI_PostRdma(ep_hndl_array[ptr->destNode], pd);
2710 else
2712 status = GNI_PostFma(ep_hndl_array[ptr->destNode], pd);
2714 CMI_GNI_UNLOCK(lock);
2716 if(status == GNI_RC_SUCCESS) //post good
2718 #if !CMK_SMP
2719 tmp_ptr = ptr;
2720 if(pre != 0) {
2721 pre->next = ptr->next;
2723 else {
2724 sendRdmaBuf = ptr->next;
2726 ptr = ptr->next;
2727 FreeRdmaRequest(tmp_ptr);
2728 #endif
2729 if(pd->cqwrite_value == 0)
2731 #if CMK_SMP_TRACE_COMMTHREAD
2732 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2733 #endif
2734 IncreaseMsgInRecv(((void*)(pd->local_addr)));
2736 #if CMK_WITH_STATS
2737 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2738 RDMA_TRANS_INIT(pd->sync_flag_addr/1000000.0)
2739 #endif
2740 #if MACHINE_DEBUG_LOG
2741 buffered_recv_msg += register_size;
2742 MACHSTATE(8, "GO request from buffered\n");
2743 #endif
2744 #if PRINT_SYH
2745 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
2746 #endif
2747 }else // cannot post
2749 #if CMK_SMP
2750 PCQueuePush(sendRdmaBuf, (char*)ptr);
2751 #else
2752 pre = ptr;
2753 ptr = ptr->next;
2754 #endif
2755 #if PRINT_SYH
2756 printf("[%d] SendRdmaMsg: post failed. seqno: %d\n", myrank, pd->cqwrite_value);
2757 #endif
2758 break;
2760 } else //memory registration fails
2762 #if CMK_SMP
2763 PCQueuePush(sendRdmaBuf, (char*)ptr);
2764 #else
2765 pre = ptr;
2766 ptr = ptr->next;
2767 #endif
2769 } //end while
2770 #if ! CMK_SMP
2771 if(ptr == 0)
2772 sendRdmaTail = pre;
2773 #endif
2776 // return 1 if all messages are sent
// SendBufferMsg: retry the SMSG sends buffered in `queue`. Each buffered
// MSG_LIST entry is re-sent according to its tag. Entries that succeed are
// freed. Entries that fail are kept (pushed back in SMP builds, left in the
// per-destination list otherwise) and make the function return 0.
2777 static int SendBufferMsg(SMSG_QUEUE *queue)
2779 MSG_LIST *ptr, *tmp_ptr, *pre=0, *current_head;
2780 CONTROL_MSG *control_msg_tmp;
2781 gni_return_t status;
2782 int done = 1;
2783 uint64_t register_size;
2784 void *register_addr;
// NOTE(review): current_head, register_size and register_addr are unused.
2785 int index_previous = -1;
2786 #if CMI_EXERT_SEND_CAP
2787 int sent_cnt = 0;
2788 #endif
2790 #if CMK_SMP
2791 int index = 0;
2792 #if ONE_SEND_QUEUE
2793 memset(destpe_avail, 0, mysize * sizeof(char));
2794 for (index=0; index<1; index++)
2796 int i, len = PCQueueLength(queue->sendMsgBuf);
2797 for (i=0; i<len; i++)
2799 CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
2800 ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
2801 CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
2802 if(ptr == NULL) break;
2803 if (destpe_avail[ptr->destNode] == 1) { /* can't send to this pe */
2804 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2805 continue;
2807 #else
2808 #if SMP_LOCKS
2809 int nonempty = PCQueueLength(nonEmptyQueues);
2810 for(index =0; index<nonempty; index++)
2812 CMI_PCQUEUEPOP_LOCK(nonEmptyQueues)
2813 MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(nonEmptyQueues);
2814 CMI_PCQUEUEPOP_UNLOCK(nonEmptyQueues)
2815 if(current_list == NULL) break;
2816 PCQueue current_queue= current_list-> sendSmsgBuf;
2817 CmiLock(current_list->lock);
2818 int i, len = PCQueueLength(current_queue);
2819 current_list->pushed = 0;
2820 CmiUnlock(current_list->lock);
2821 #else
2822 for(index =0; index<mysize; index++)
2824 PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
2825 int i, len = PCQueueLength(current_queue);
2826 #endif
2827 for (i=0; i<len; i++)
2829 CMI_PCQUEUEPOP_LOCK(current_queue)
2830 ptr = (MSG_LIST*)PCQueuePop(current_queue);
2831 CMI_PCQUEUEPOP_UNLOCK(current_queue)
2832 if (ptr == 0) break;
2833 #endif
2834 #else
2835 int index = queue->smsg_head_index;
2836 while(index != -1)
2838 ptr = queue->smsg_msglist_index[index].sendSmsgBuf;
2839 pre = 0;
2840 while(ptr != 0)
2842 #endif
2843 MACHSTATE5(8, "noempty-smsg %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag);
// A not-yet-connected destination leaves status == GNI_RC_ERROR_RESOURCE,
// so the entry is kept for a later pass.
2844 status = GNI_RC_ERROR_RESOURCE;
// NOTE(review): smsg_connected_flag is indexed by `index`. In the non-SMP
// build and the per-destination SMP loop (index < mysize), that is the
// destination queue. Under ONE_SEND_QUEUE `index` is always 0, and under
// SMP_LOCKS it counts popped queues, so ptr->destNode may be what is meant.
2845 if (useDynamicSMSG && smsg_connected_flag[index] != 2) {
2846 /* connection not exists yet */
2848 else
2849 switch(ptr->tag)
2851 case SMALL_DATA_TAG:
2852 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);
2853 if(status == GNI_RC_SUCCESS)
2855 CmiFree(ptr->msg);
2857 break;
2858 case LMSG_INIT_TAG:
2859 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
2860 status = send_large_messages( queue, ptr->destNode, control_msg_tmp, 1, ptr);
2861 break;
2862 case ACK_TAG:
2863 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);
2864 if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
2865 break;
2866 case BIG_MSG_TAG:
2867 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);
2868 if(status == GNI_RC_SUCCESS)
2870 FreeControlMsg((CONTROL_MSG*)ptr->msg);
2872 break;
2873 #if CMK_PERSISTENT_COMM
2874 case PUT_DONE_TAG:
2875 status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr);
2876 if(status == GNI_RC_SUCCESS)
2878 FreeControlMsg((CONTROL_MSG*)ptr->msg);
2880 break;
2881 #endif
2882 #if CMK_DIRECT
2883 case DIRECT_PUT_DONE_TAG:
2884 status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr);
2885 if(status == GNI_RC_SUCCESS)
2887 free((CMK_DIRECT_HEADER*)ptr->msg);
2889 break;
2891 #endif
2892 default:
2893 printf("Weird tag\n");
2894 CmiAbort("should not happen\n");
2895 } // end switch
2896 if(status == GNI_RC_SUCCESS)
2898 #if PRINT_SYH
2899 buffered_smsg_counter--;
2900 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
2901 #endif
2902 #if !CMK_SMP
2903 tmp_ptr = ptr;
2904 if(pre)
2906 ptr = pre ->next = ptr->next;
2907 }else
2909 ptr = queue->smsg_msglist_index[index].sendSmsgBuf = queue->smsg_msglist_index[index].sendSmsgBuf->next;
2911 FreeMsgList(tmp_ptr);
2912 #else
2913 FreeMsgList(ptr);
2914 #endif
2915 #if CMI_EXERT_SEND_CAP
2916 sent_cnt++;
2917 if(sent_cnt == SEND_CAP)
2918 break;
2919 #endif
2920 }else {
2921 #if CMK_SMP
2922 #if ONE_SEND_QUEUE
2923 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
2924 #else
2925 PCQueuePush(current_queue, (char*)ptr);
2926 #endif
2927 #else
2928 pre = ptr;
2929 ptr=ptr->next;
2930 #endif
2931 done = 0;
// Out of SMSG resources: stop on this destination (or, with ONE_SEND_QUEUE,
// mark it unavailable so its remaining entries are skipped this pass).
2932 if(status == GNI_RC_ERROR_RESOURCE)
2934 #if CMK_SMP && ONE_SEND_QUEUE
2935 destpe_avail[ptr->destNode] = 1;
2936 #else
2937 break;
2938 #endif
2941 } //end while
// Non-SMP: fix the list tail, and unlink this destination from the chain of
// non-empty queues once its list is empty.
2942 #if !CMK_SMP
2943 if(ptr == 0)
2944 queue->smsg_msglist_index[index].tail = pre;
2945 if(queue->smsg_msglist_index[index].sendSmsgBuf == 0)
2947 if(index_previous != -1)
2948 queue->smsg_msglist_index[index_previous].next = queue->smsg_msglist_index[index].next;
2949 else
2950 queue->smsg_head_index = queue->smsg_msglist_index[index].next;
2952 else {
2953 index_previous = index;
2955 index = queue->smsg_msglist_index[index].next;
2956 #else
// SMP_LOCKS: re-register the queue as non-empty if entries remain.
2957 #if !ONE_SEND_QUEUE && SMP_LOCKS
2958 CmiLock(current_list->lock);
2959 if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
2961 current_list->pushed = 1;
2962 PCQueuePush(nonEmptyQueues, current_list);
2964 CmiUnlock(current_list->lock);
2965 #endif
2966 #endif
2968 #if CMI_EXERT_SEND_CAP
2969 if(sent_cnt == SEND_CAP)
2970 break;
2971 #endif
2972 } // end pooling for all cores
2973 return done;
2976 static void ProcessDeadlock();
2977 void LrtsAdvanceCommunication(int whileidle)
2979 static int count = 0;
2980 /* Receive Msg first */
2981 #if CMK_SMP_TRACE_COMMTHREAD
2982 double startT, endT;
2983 #endif
2984 if (useDynamicSMSG && whileidle)
2986 #if CMK_SMP_TRACE_COMMTHREAD
2987 startT = CmiWallTimer();
2988 #endif
2989 PumpDatagramConnection();
2990 #if CMK_SMP_TRACE_COMMTHREAD
2991 endT = CmiWallTimer();
2992 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
2993 #endif
2996 #if CMK_SMP_TRACE_COMMTHREAD
2997 startT = CmiWallTimer();
2998 #endif
2999 PumpNetworkSmsg();
3000 //MACHSTATE(8, "after PumpNetworkSmsg \n") ;
3001 #if CMK_SMP_TRACE_COMMTHREAD
3002 endT = CmiWallTimer();
3003 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3004 #endif
3006 #if CMK_SMP_TRACE_COMMTHREAD
3007 startT = CmiWallTimer();
3008 #endif
3009 PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3010 //MACHSTATE(8, "after PumpLocalTransactions\n") ;
3011 #if CMK_SMP_TRACE_COMMTHREAD
3012 endT = CmiWallTimer();
3013 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3014 #endif
3016 #if CMK_SMP_TRACE_COMMTHREAD
3017 startT = CmiWallTimer();
3018 #endif
3019 PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
3021 #if CQWRITE
3022 PumpCqWriteTransactions();
3023 #endif
3025 #if REMOTE_EVENT
3026 PumpRemoteTransactions();
3027 #endif
3029 //MACHSTATE(8, "after PumpLocalTransactions\n") ;
3030 #if CMK_SMP_TRACE_COMMTHREAD
3031 endT = CmiWallTimer();
3032 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3033 #endif
3035 /* Send buffered Message */
3036 #if CMK_SMP_TRACE_COMMTHREAD
3037 startT = CmiWallTimer();
3038 #endif
3039 #if CMK_USE_OOB
3040 if (SendBufferMsg(&smsg_oob_queue) == 1)
3041 #endif
3043 SendBufferMsg(&smsg_queue);
3045 //MACHSTATE(8, "after SendBufferMsg\n") ;
3046 #if CMK_SMP_TRACE_COMMTHREAD
3047 endT = CmiWallTimer();
3048 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3049 #endif
3051 #if CMK_SMP_TRACE_COMMTHREAD
3052 startT = CmiWallTimer();
3053 #endif
3054 SendRdmaMsg();
3055 //MACHSTATE(8, "after SendRdmaMsg\n") ;
3056 #if CMK_SMP_TRACE_COMMTHREAD
3057 endT = CmiWallTimer();
3058 if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3059 #endif
3061 #if CMK_SMP && ! LARGEPAGE
3062 if (_detected_hang) ProcessDeadlock();
3063 #endif
3066 /* useDynamicSMSG */
3067 static void _init_dynamic_smsg()
3069 gni_return_t status;
3070 uint32_t vmdh_index = -1;
3071 int i;
3073 smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3074 smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3075 smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3076 for(i=0; i<mysize; i++) {
3077 smsg_connected_flag[i] = 0;
3078 smsg_attr_vector_local[i] = NULL;
3079 smsg_attr_vector_remote[i] = NULL;
3081 if(mysize <=512)
3083 SMSG_MAX_MSG = 4096;
3084 }else if (mysize <= 4096)
3086 SMSG_MAX_MSG = 4096/mysize * 1024;
3087 }else if (mysize <= 16384)
3089 SMSG_MAX_MSG = 512;
3090 }else {
3091 SMSG_MAX_MSG = 256;
3094 send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3095 send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3096 send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3097 status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3098 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3100 mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3101 mailbox_list->size = smsg_memlen*avg_smsg_connection;
3102 posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3103 bzero(mailbox_list->mailbox_base, mailbox_list->size);
3104 mailbox_list->offset = 0;
3105 mailbox_list->next = 0;
3107 status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3108 mailbox_list->size, smsg_rx_cqh,
3109 GNI_MEM_READWRITE,
3110 vmdh_index,
3111 &(mailbox_list->mem_hndl));
3112 GNI_RC_CHECK("MEMORY registration for smsg", status);
3114 status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3115 GNI_RC_CHECK("Unbound EP", status);
3117 alloc_smsg_attr(&send_smsg_attr);
3119 status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr, SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3120 GNI_RC_CHECK("post unbound datagram", status);
3122 /* always pre-connect to proc 0 */
3123 //if (myrank != 0) connect_to(0);
3126 static void _init_static_smsg()
3128 gni_smsg_attr_t *smsg_attr;
3129 gni_smsg_attr_t remote_smsg_attr;
3130 gni_smsg_attr_t *smsg_attr_vec;
3131 gni_mem_handle_t my_smsg_mdh_mailbox;
3132 int ret, i;
3133 gni_return_t status;
3134 uint32_t vmdh_index = -1;
3135 mdh_addr_t base_infor;
3136 mdh_addr_t *base_addr_vec;
3137 char *env;
3139 if(mysize <=512)
3141 SMSG_MAX_MSG = 1024;
3142 }else if (mysize <= 4096)
3144 SMSG_MAX_MSG = 1024;
3145 }else if (mysize <= 16384)
3147 SMSG_MAX_MSG = 512;
3148 }else {
3149 SMSG_MAX_MSG = 256;
3152 env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3153 if (env) SMSG_MAX_MSG = atoi(env);
3154 CmiAssert(SMSG_MAX_MSG > 0);
3156 smsg_attr = malloc(mysize * sizeof(gni_smsg_attr_t));
3158 smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3159 smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3160 smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3161 status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3162 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3163 ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3164 CmiAssert(ret == 0);
3165 bzero(smsg_mailbox_base, smsg_memlen*(mysize));
3167 status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3168 smsg_memlen*(mysize), smsg_rx_cqh,
3169 GNI_MEM_READWRITE,
3170 vmdh_index,
3171 &my_smsg_mdh_mailbox);
3172 register_memory_size += smsg_memlen*(mysize);
3173 GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3175 if (myrank == 0) printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3176 if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX to a larger value or use Dynamic smsg\n");
3178 base_infor.addr = (uint64_t)smsg_mailbox_base;
3179 base_infor.mdh = my_smsg_mdh_mailbox;
3180 base_addr_vec = malloc(mysize * sizeof(mdh_addr_t));
3182 allgather(&base_infor, base_addr_vec, sizeof(mdh_addr_t));
3184 for(i=0; i<mysize; i++)
3186 if(i==myrank)
3187 continue;
3188 smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3189 smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3190 smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3191 smsg_attr[i].mbox_offset = i*smsg_memlen;
3192 smsg_attr[i].buff_size = smsg_memlen;
3193 smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3194 smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3197 for(i=0; i<mysize; i++)
3199 if (myrank == i) continue;
3201 remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3202 remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3203 remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3204 remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3205 remote_smsg_attr.buff_size = smsg_memlen;
3206 remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3207 remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3209 /* initialize the smsg channel */
3210 status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3211 GNI_RC_CHECK("SMSG Init", status);
3212 } //end initialization
3214 free(base_addr_vec);
3215 free(smsg_attr);
3217 status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3218 GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3221 inline
3222 static void _init_send_queue(SMSG_QUEUE *queue)
3224 int i;
3225 #if ONE_SEND_QUEUE
3226 queue->sendMsgBuf = PCQueueCreate();
3227 destpe_avail = (char*)malloc(mysize * sizeof(char));
3228 #else
3229 queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3230 #if CMK_SMP && SMP_LOCKS
3231 nonEmptyQueues = PCQueueCreate();
3232 #endif
3233 for(i =0; i<mysize; i++)
3235 #if CMK_SMP
3236 queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3237 #if SMP_LOCKS
3238 queue->smsg_msglist_index[i].pushed = 0;
3239 queue->smsg_msglist_index[i].lock = CmiCreateLock();
3240 #endif
3241 #else
3242 queue->smsg_msglist_index[i].sendSmsgBuf = 0;
3243 queue->smsg_msglist_index[i].next = -1;
3244 queue->smsg_head_index = -1;
3245 #endif
3248 #endif
3251 inline
3252 static void _init_smsg()
3254 if(mysize > 1) {
3255 if (useDynamicSMSG)
3256 _init_dynamic_smsg();
3257 else
3258 _init_static_smsg();
3261 _init_send_queue(&smsg_queue);
3262 #if CMK_USE_OOB
3263 _init_send_queue(&smsg_oob_queue);
3264 #endif
3267 static void _init_static_msgq()
3269 gni_return_t status;
3270 /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The performance scales by the node count rather than rank count */
3271 msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
3272 msgq_attrs.smsg_q_sz = 1;
3273 msgq_attrs.rcv_pool_sz = 1;
3274 msgq_attrs.num_msgq_eps = 2;
3275 msgq_attrs.nloc_insts = 8;
3276 msgq_attrs.modes = 0;
3277 msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
3279 status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
3280 GNI_RC_CHECK("MSGQ Init", status);
3286 static CmiUInt8 total_mempool_size = 0;
3287 static CmiUInt8 total_mempool_calls = 0;
3289 #if USE_LRTS_MEMPOOL
3290 void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
3292 void *pool;
3293 int ret;
3294 gni_return_t status = GNI_RC_SUCCESS;
3296 size_t default_size = expand_flag? _expand_mem : _mempool_size;
3297 if (*size < default_size) *size = default_size;
3298 #if LARGEPAGE
3299 // round up to be multiple of _tlbpagesize
3300 //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
3301 *size = ALIGNHUGEPAGE(*size);
3302 #endif
3303 total_mempool_size += *size;
3304 total_mempool_calls += 1;
3305 #if !LARGEPAGE
3306 if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag)
3308 printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
3309 CmiAbort("alloc_mempool_block");
3311 #endif
3312 #if LARGEPAGE
3313 pool = my_get_huge_pages(*size);
3314 ret = pool==NULL;
3315 #else
3316 ret = posix_memalign(&pool, ALIGNBUF, *size);
3317 #endif
3318 if (ret != 0) {
3319 #if CMK_SMP && STEAL_MEMPOOL
3320 pool = steal_mempool_block(size, mem_hndl);
3321 if (pool != NULL) return pool;
3322 #endif
3323 printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
3324 if (ret == ENOMEM)
3325 CmiAbort("alloc_mempool_block: out of memory.");
3326 else
3327 CmiAbort("alloc_mempool_block: posix_memalign failed");
3329 #if LARGEPAGE
3330 CmiMemLock();
3331 register_count++;
3332 MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, rdma_rx_cqh, status);
3333 CmiMemUnlock();
3334 if(status != GNI_RC_SUCCESS) {
3335 printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
3336 sweep_mempool(CpvAccess(mempool));
3338 GNI_RC_CHECK("MEMORY_REGISTER", status);
3339 #else
3340 SetMemHndlZero((*mem_hndl));
3341 #endif
3342 return pool;
3345 // ptr is a block head pointer
3346 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
3348 if(!(IsMemHndlZero(mem_hndl)))
3350 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(ptr));
3352 #if LARGEPAGE
3353 my_free_huge_pages(ptr, GetSizeFromBlockHeader(ptr));
3354 #else
3355 free(ptr);
3356 #endif
3358 #endif
3360 void LrtsPreCommonInit(int everReturn){
3361 #if USE_LRTS_MEMPOOL
3362 CpvInitialize(mempool_type*, mempool);
3363 CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
3364 MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ;
3365 #endif
3368 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
3370 register int i;
3371 int rc;
3372 int device_id = 0;
3373 unsigned int remote_addr;
3374 gni_cdm_handle_t cdm_hndl;
3375 gni_return_t status = GNI_RC_SUCCESS;
3376 uint32_t vmdh_index = -1;
3377 uint8_t ptag;
3378 unsigned int local_addr, *MPID_UGNI_AllAddr;
3379 int first_spawned;
3380 int physicalID;
3381 char *env;
3383 //void (*local_event_handler)(gni_cq_entry_t *, void *) = &LocalEventHandle;
3384 //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
3385 //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *) = &RemoteBteEventHandle;
3387 status = PMI_Init(&first_spawned);
3388 GNI_RC_CHECK("PMI_Init", status);
3390 status = PMI_Get_size(&mysize);
3391 GNI_RC_CHECK("PMI_Getsize", status);
3393 status = PMI_Get_rank(&myrank);
3394 GNI_RC_CHECK("PMI_getrank", status);
3396 //physicalID = CmiPhysicalNodeID(myrank);
3398 //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
3400 *myNodeID = myrank;
3401 *numNodes = mysize;
3403 #if MULTI_THREAD_SEND
3404 /* Currently, we only consider the case that comm. thread will only recv msgs */
3405 Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
3406 #endif
3408 env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
3409 if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
3410 CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
3412 env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
3413 if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
3414 CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
3416 env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
3417 if (env) useDynamicSMSG = 1;
3418 if (!useDynamicSMSG)
3419 useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
3420 CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
3421 if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
3422 //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
3424 if(myrank == 0)
3426 printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
3427 printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
3429 #ifdef USE_ONESIDED
3430 onesided_init(NULL, &onesided_hnd);
3432 // this is a GNI test, so use the libonesided bypass functionality
3433 onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
3434 local_addr = gniGetNicAddress();
3435 #else
3436 ptag = get_ptag();
3437 cookie = get_cookie();
3438 #if 0
3439 modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
3440 #endif
3441 //Create and attach to the communication domain */
3442 status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
3443 GNI_RC_CHECK("GNI_CdmCreate", status);
3444 //* device id The device id is the minor number for the device
3445 //that is assigned to the device by the system when the device is created.
3446 //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
3447 //where X is the device number 0 default
3448 status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
3449 GNI_RC_CHECK("GNI_CdmAttach", status);
3450 local_addr = get_gni_nic_address(0);
3451 #endif
3452 MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
3453 _MEMCHECK(MPID_UGNI_AllAddr);
3454 allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
3455 /* create the local completion queue */
3456 /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
3457 status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
3458 GNI_RC_CHECK("GNI_CqCreate (tx)", status);
3460 status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
3461 GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
3462 /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
3464 status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
3465 GNI_RC_CHECK("Create CQ (rx)", status);
3467 status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
3468 GNI_RC_CHECK("Create Post CQ (rx)", status);
3470 //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
3471 //GNI_RC_CHECK("Create BTE CQ", status);
3473 /* create the endpoints. they need to be bound to allow later CQWrites to them */
3474 ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
3475 _MEMCHECK(ep_hndl_array);
3476 #if MULTI_THREAD_SEND
3477 rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
3478 //default_tx_cq_lock = CmiCreateLock();
3479 rdma_tx_cq_lock = CmiCreateLock();
3480 smsg_rx_cq_lock = CmiCreateLock();
3481 //global_gni_lock = CmiCreateLock();
3482 //rx_cq_lock = CmiCreateLock();
3483 #endif
3484 for (i=0; i<mysize; i++) {
3485 if(i == myrank) continue;
3486 status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
3487 GNI_RC_CHECK("GNI_EpCreate ", status);
3488 remote_addr = MPID_UGNI_AllAddr[i];
3489 status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
3490 GNI_RC_CHECK("GNI_EpBind ", status);
3493 /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
3494 _init_smsg();
3495 PMI_Barrier();
3497 #if USE_LRTS_MEMPOOL
3498 env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
3499 if (env) {
3500 _totalmem = CmiReadSize(env);
3501 if (myrank == 0)
3502 printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
3505 env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
3506 if (env) _mempool_size = CmiReadSize(env);
3507 if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size"))
3508 _mempool_size = CmiReadSize(env);
3511 env = getenv("CHARM_UGNI_MEMPOOL_MAX");
3512 if (env) {
3513 MAX_REG_MEM = CmiReadSize(env);
3514 user_set_flag = 1;
3516 if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size")) {
3517 MAX_REG_MEM = CmiReadSize(env);
3518 user_set_flag = 1;
3521 env = getenv("CHARM_UGNI_SEND_MAX");
3522 if (env) {
3523 MAX_BUFF_SEND = CmiReadSize(env);
3524 user_set_flag = 1;
3526 if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send")) {
3527 MAX_BUFF_SEND = CmiReadSize(env);
3528 user_set_flag = 1;
3531 env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
3532 if (env) {
3533 _mempool_size_limit = CmiReadSize(env);
3536 if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
3537 if (MAX_BUFF_SEND > MAX_REG_MEM) MAX_BUFF_SEND = MAX_REG_MEM;
3539 if (myrank==0) {
3540 printf("Charm++> memory pool init block size: %1.fMB, total registered memory per node: %1.fMB\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
3541 printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
3542 if (MAX_REG_MEM < BIG_MSG * 2 + oneMB) {
3543 /* memblock can expand to BIG_MSG * 2 size */
3544 printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n", BIG_MSG * 2.0/1024/1024 + 1);
3545 CmiAbort("mempool maximum size is too small. \n");
3547 #if MULTI_THREAD_SEND
3548 printf("Charm++> worker thread sending messages\n");
3549 #elif COMM_THREAD_SEND
3550 printf("Charm++> only comm thread send/recv messages\n");
3551 #endif
3554 #endif /* end of USE_LRTS_MEMPOOL */
3556 env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
3557 if (env) {
3558 BIG_MSG = CmiReadSize(env);
3559 if (BIG_MSG < ONE_SEG)
3560 CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
3562 env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
3563 if (env) {
3564 BIG_MSG_PIPELINE = atoi(env);
3567 env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
3568 if (env) _checkProgress = 0;
3569 if (mysize == 1) _checkProgress = 0;
3573 env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
3574 if (env)
3575 _tlbpagesize = CmiReadSize(env);
3577 /* real gethugepagesize() is only available when hugetlb module linked */
3578 _tlbpagesize = gethugepagesize();
3579 if (myrank == 0) {
3580 printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
3583 #if LARGEPAGE
3584 if (_tlbpagesize == 4096) {
3585 CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
3587 #endif
3589 print_stats = CmiGetArgFlag(*argv, "+print_stats");
3591 stats_off = CmiGetArgFlag(*argv, "+stats_off");
3593 /* init DMA buffer for medium message */
3595 //_init_DMA_buffer();
3597 free(MPID_UGNI_AllAddr);
3598 #if CMK_SMP
3599 sendRdmaBuf = PCQueueCreate();
3600 #else
3601 sendRdmaBuf = 0;
3602 #endif
3603 #if MACHINE_DEBUG_LOG
3604 char ln[200];
3605 sprintf(ln,"debugLog.%d",myrank);
3606 debugLog=fopen(ln,"w");
3607 #endif
3609 #if CMK_WITH_STATS
3610 if (print_stats){
3611 char ln[200];
3612 int code = mkdir("counters", 00777);
3613 sprintf(ln,"counters/statistics.%d.%d", mysize, myrank);
3614 counterLog=fopen(ln,"w");
3616 #endif
3617 // NTK_Init();
3618 // ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
3620 #if REMOTE_EVENT
3621 IndexPool_init(&ackPool);
3622 #endif
3624 #if CMK_WITH_STATS
3625 init_comm_stats();
3626 #endif
3629 void* LrtsAlloc(int n_bytes, int header)
3631 void *ptr = NULL;
3632 #if 0
3633 printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
3634 #endif
3635 if(n_bytes <= SMSG_MAX_MSG)
3637 int totalsize = n_bytes+header;
3638 ptr = malloc(totalsize);
3640 else {
3641 CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
3642 #if USE_LRTS_MEMPOOL
3643 n_bytes = ALIGN64(n_bytes);
3644 if(n_bytes < BIG_MSG)
3646 char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
3647 if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
3648 }else
3650 #if LARGEPAGE
3651 //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
3652 n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
3653 char *res = my_get_huge_pages(n_bytes);
3654 #else
3655 char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3656 #endif
3657 if (res) ptr = res + ALIGNBUF - header;
3659 #else
3660 n_bytes = ALIGN64(n_bytes); /* make sure size if 4 aligned */
3661 char *res = memalign(ALIGNBUF, n_bytes+ALIGNBUF);
3662 ptr = res + ALIGNBUF - header;
3663 #endif
3665 #if CMK_PERSISTENT_COMM
3666 if (ptr) SetMemHndlZero(MEMHFIELD((char*)ptr+header));
3667 #endif
3668 return ptr;
3671 void LrtsFree(void *msg)
3673 CmiUInt4 size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
3674 #if CMK_PERSISTENT_COMM
3675 if (!IsMemHndlZero(MEMHFIELD((char*)msg+sizeof(CmiChunkHeader)))) return;
3676 #endif
3677 if (size <= SMSG_MAX_MSG)
3678 free(msg);
3679 else {
3680 size = ALIGN64(size);
3681 if(size>=BIG_MSG)
3683 #if LARGEPAGE
3684 int s = ALIGNHUGEPAGE(size+ALIGNBUF);
3685 my_free_huge_pages((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF, s);
3686 #else
3687 free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3688 #endif
3690 else {
3691 #if USE_LRTS_MEMPOOL
3692 #if CMK_SMP
3693 mempool_free_thread((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3694 #else
3695 mempool_free(CpvAccess(mempool), (char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF + sizeof(mempool_header));
3696 #endif
3697 #else
3698 free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);
3699 #endif
3704 void LrtsExit()
3706 #if CMK_WITH_STATS
3707 #if CMK_SMP
3708 if(CmiMyRank() == CmiMyNodeSize())
3709 #endif
3710 if (print_stats) print_comm_stats();
3711 #endif
3712 /* free memory ? */
3713 #if USE_LRTS_MEMPOOL
3714 //printf("FINAL [%d, %d] register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg);
3715 mempool_destroy(CpvAccess(mempool));
3716 #endif
3717 PMI_Finalize();
3718 exit(0);
3721 void LrtsDrainResources()
3723 if(mysize == 1) return;
3724 while (
3725 #if CMK_USE_OOB
3726 !SendBufferMsg(&smsg_oob_queue) ||
3727 #endif
3728 !SendBufferMsg(&smsg_queue)
3731 if (useDynamicSMSG)
3732 PumpDatagramConnection();
3733 PumpNetworkSmsg();
3734 PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3735 PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
3736 SendRdmaMsg();
3738 PMI_Barrier();
3741 void LrtsAbort(const char *message) {
3742 fprintf(stderr, "[%d] CmiAbort: %s\n", myrank, message);
3743 CmiPrintStackTrace(0);
3744 PMI_Abort(-1, message);
3747 /************************** TIMER FUNCTIONS **************************/
3748 #if CMK_TIMER_USE_SPECIAL
3749 /* MPI calls are not threadsafe, even the timer on some machines */
3750 static CmiNodeLock timerLock = 0;
3751 static int _absoluteTime = 0;
3752 static int _is_global = 0;
3753 static struct timespec start_ts;
3755 inline int CmiTimerIsSynchronized() {
3756 return 0;
3759 inline int CmiTimerAbsolute() {
3760 return _absoluteTime;
3763 double CmiStartTimer() {
3764 return 0.0;
3767 double CmiInitTime() {
3768 return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
3771 void CmiTimerInit(char **argv) {
3772 _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
3773 if (_absoluteTime && CmiMyPe() == 0)
3774 printf("Charm++> absolute timer is used\n");
3776 _is_global = CmiTimerIsSynchronized();
3779 if (_is_global) {
3780 if (CmiMyRank() == 0) {
3781 clock_gettime(CLOCK_MONOTONIC, &start_ts);
3783 } else { /* we don't have a synchronous timer, set our own start time */
3784 CmiBarrier();
3785 CmiBarrier();
3786 CmiBarrier();
3787 clock_gettime(CLOCK_MONOTONIC, &start_ts);
3789 CmiNodeAllBarrier(); /* for smp */
3793 * Since the timerLock is never created, and is
3794 * always NULL, then all the if-condition inside
3795 * the timer functions could be disabled right
3796 * now in the case of SMP.
3798 double CmiTimer(void) {
3799 struct timespec now_ts;
3800 clock_gettime(CLOCK_MONOTONIC, &now_ts);
3801 return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3802 : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec) / 1000000000.0);
3805 double CmiWallTimer(void) {
3806 struct timespec now_ts;
3807 clock_gettime(CLOCK_MONOTONIC, &now_ts);
3808 return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3809 : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec) / 1000000000.0);
3812 double CmiCpuTimer(void) {
3813 struct timespec now_ts;
3814 clock_gettime(CLOCK_MONOTONIC, &now_ts);
3815 return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
3816 : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec) / 1000000000.0);
3819 #endif
3820 /************Barrier Related Functions****************/
3822 int CmiBarrier()
3824 gni_return_t status;
3826 #if CMK_SMP
3827 /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks */
3828 CmiNodeAllBarrier();
3829 if (CmiMyRank() == CmiMyNodeSize())
3830 #else
3831 if (CmiMyRank() == 0)
3832 #endif
3835 * The call of CmiBarrier is usually before the initialization
3836 * of trace module of Charm++, therefore, the START_EVENT
3837 * and END_EVENT are disabled here. -Chao Mei
3839 /*START_EVENT();*/
3840 status = PMI_Barrier();
3841 GNI_RC_CHECK("PMI_Barrier", status);
3842 /*END_EVENT(10);*/
3844 CmiNodeAllBarrier();
3845 return 0;
3848 #if CMK_DIRECT
3849 #include "machine-cmidirect.c"
3850 #endif
3851 #if CMK_PERSISTENT_COMM
3852 #include "machine-persistent.c"
3853 #endif