Apply minor cleanup to mempool
[charm.git] / src / arch / gni / machine.C
blob3e0fd388e4f2a7ba69c4d8553b5c28fe24984468
2 /** @file
3  * GNI machine layer
4  *
5  * Author:   Yanhua Sun
6              Gengbin Zheng
7  * Date:   07-01-2011
8  *
9  *  Flow control by mem pool using environment variables:
11     # CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
12     # CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
13     export CHARM_UGNI_MEMPOOL_INIT_SIZE=8M
14     export CHARM_UGNI_MEMPOOL_MAX=20M
15     export CHARM_UGNI_SEND_MAX=10M
17     # limit on total mempool size allocated, this is to prevent mempool
18     # uses too much memory
19     export CHARM_UGNI_MEMPOOL_SIZE_LIMIT=512M 
21     other environment variables:
23     export CHARM_UGNI_NO_DEADLOCK_CHECK=yes    # disable checking deadlock
24     export CHARM_UGNI_MAX_MEMORY_ON_NODE=0.8G  # max memory per node for mempool
25     export CHARM_UGNI_BIG_MSG_SIZE=4M          # set big message size protocol
26     export CHARM_UGNI_BIG_MSG_PIPELINE_LEN=4   # set big message pipe len
27     export CHARM_UGNI_RDMA_MAX=100             # max pending RDMA operations
28  */
29 /*@{*/
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <errno.h>
35 #include <malloc.h>
36 #include <unistd.h>
37 #include <time.h>
38 #include <sys/dir.h>
39 #include <sys/stat.h>
40 #include <gni_pub.h>
41 #include <pmi.h>
42 //#include <numatoolkit.h>
44 #include "converse.h"
46 #if CMK_DIRECT
47 #define DIRECT_SEQ 0xFFFFFFE 
48 #include "cmidirect.h"
49 #endif
51 #if REGULARPAGE 
52 #define     LARGEPAGE              0
53 #else
54 #define     LARGEPAGE              1
55 #endif
57 #if CMK_SMP
58 #define MULTI_THREAD_SEND          0
59 #define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
60 #endif
62 #if MULTI_THREAD_SEND
63 #define CMK_WORKER_SINGLE_TASK     1
64 #endif
66 #define REMOTE_EVENT               1
68 #define CQWRITE                    0
70 #define CMI_EXERT_SEND_LARGE_CAP   0
71 #define CMI_EXERT_RECV_RDMA_CAP    0
74 #define CMI_SENDBUFFERSMSG_CAP            0
75 #define CMI_PUMPNETWORKSMSG_CAP           0
76 #define CMI_PUMPREMOTETRANSACTIONS_CAP    0
77 #define CMI_PUMPLOCALTRANSACTIONS_CAP     0
79 #if CMI_SENDBUFFERSMSG_CAP
80 int     SendBufferMsg_cap  = 20;
81 #endif
83 #if CMI_PUMPNETWORKSMSG_CAP
84 int     PumpNetworkSmsg_cap = 20;
85 #endif
87 #if CMI_PUMPREMOTETRANSACTIONS_CAP
88 int     PumpRemoteTransactions_cap = 20;
89 #endif
91 #if CMI_PUMPREMOTETRANSACTIONS_CAP
92 int     PumpLocalTransactions_cap = 15;
93 #endif
95 #if CMI_EXERT_SEND_LARGE_CAP
96 static int SEND_large_cap = 20;
97 static int SEND_large_pending = 0;
98 #endif
100 #if CMI_EXERT_RECV_RDMA_CAP
101 static int   RDMA_cap =   10;
102 static int   RDMA_pending = 0;
103 #endif
105 enum CMK_SMSG_TYPE {
106   CHARM_SMSG=1,
107   NONCHARM_SMSG,
108   NONCHARM_SMSG_DONT_FREE
111 #define USE_LRTS_MEMPOOL                  1
113 #define PRINT_SYH                         0
115 // Trace communication thread
116 #if CMK_TRACE_ENABLED && CMK_SMP_TRACE_COMMTHREAD
117 #define TRACE_THRESHOLD     0.00001
118 #undef CMI_MACH_TRACE_USEREVENTS
119 #define CMI_MACH_TRACE_USEREVENTS 1
120 #else
121 #undef CMK_SMP_TRACE_COMMTHREAD
122 #define CMK_SMP_TRACE_COMMTHREAD 0
123 #endif
125 #define CMK_TRACE_COMMOVERHEAD 0
126 #if CMK_TRACE_ENABLED && CMK_TRACE_COMMOVERHEAD
127 #undef CMI_MACH_TRACE_USEREVENTS
128 #define CMI_MACH_TRACE_USEREVENTS 1
129 #else
130 #undef CMK_TRACE_COMMOVERHEAD
131 #define CMK_TRACE_COMMOVERHEAD 0
132 #endif
134 #if CMI_MACH_TRACE_USEREVENTS && CMK_TRACE_ENABLED && ! CMK_TRACE_IN_CHARM
135 CpvStaticDeclare(double, projTraceStart);
136 #define  START_EVENT()  CpvAccess(projTraceStart) = CmiWallTimer();
137 #define  END_EVENT(x)   traceUserBracketEvent(x, CpvAccess(projTraceStart), CmiWallTimer());
138 #define  EVENT_TIME()   CpvAccess(projTraceStart)
139 #else
140 #define  START_EVENT()
141 #define  END_EVENT(x)
142 #define  EVENT_TIME()   (0.0)
143 #endif
145 #if USE_LRTS_MEMPOOL
147 #define oneMB (1024ll*1024)
148 #define oneGB (1024ll*1024*1024)
150 static CmiInt8 _mempool_size = 8*oneMB;
151 static CmiInt8 _expand_mem =  4*oneMB;
152 static CmiInt8 _mempool_size_limit = 0;
154 static CmiInt8 _totalmem = 0.8*oneGB;
156 #if LARGEPAGE
157 static CmiInt8 BIG_MSG  =  16*oneMB;
158 static CmiInt8 ONE_SEG  =  4*oneMB;
159 #else
160 static CmiInt8 BIG_MSG  =  4*oneMB;
161 static CmiInt8 ONE_SEG  =  2*oneMB;
162 #endif
163 #if MULTI_THREAD_SEND
164 static int BIG_MSG_PIPELINE = 1;
165 #else
166 static int BIG_MSG_PIPELINE = 4;
167 #endif
169 // dynamic flow control
170 static CmiInt8 buffered_send_msg = 0;
171 static CmiInt8 register_memory_size = 0;
173 #if LARGEPAGE
174 static CmiInt8  MAX_BUFF_SEND  =  100000*oneMB;
175 static CmiInt8  MAX_REG_MEM    =  200000*oneMB;
176 static CmiInt8  register_count = 0;
177 #else
178 #if CMK_SMP && COMM_THREAD_SEND 
179 static CmiInt8  MAX_BUFF_SEND  =  100*oneMB;
180 static CmiInt8  MAX_REG_MEM    =  200*oneMB;
181 #else
182 static CmiInt8  MAX_BUFF_SEND  =  16*oneMB;
183 static CmiInt8  MAX_REG_MEM    =  25*oneMB;
184 #endif
187 #endif
189 #endif     /* end USE_LRTS_MEMPOOL */
191 #if MULTI_THREAD_SEND 
192 #define     CMI_GNI_LOCK(x)       CmiLock(x);
193 #define     CMI_GNI_TRYLOCK(x)       CmiTryLock(x)
194 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
195 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
196 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
197 #else
198 #define     CMI_GNI_LOCK(x)
199 #define     CMI_GNI_TRYLOCK(x)         (0)
200 #define     CMI_GNI_UNLOCK(x)
201 #define     CMI_PCQUEUEPOP_LOCK(Q)   
202 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
203 #endif
205 static int _tlbpagesize = 4096;
207 //static int _smpd_count  = 0;
209 static int   user_set_flag  = 0;
211 static int _checkProgress = 1;             /* check deadlock */
212 static int _detected_hang = 0;
214 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
216 // dynamic SMSG
217 static int useDynamicSMSG = 0;               /* dynamic smsgs setup */
219 static int avg_smsg_connection = 32;
220 static int                 *smsg_connected_flag= 0;
221 static gni_smsg_attr_t     **smsg_attr_vector_local;
222 static gni_smsg_attr_t     **smsg_attr_vector_remote;
223 static gni_ep_handle_t     ep_hndl_unbound;
224 static gni_smsg_attr_t     send_smsg_attr;
225 static gni_smsg_attr_t     recv_smsg_attr;
227 typedef struct _dynamic_smsg_mailbox{
228    void     *mailbox_base;
229    int      size;
230    int      offset;
231    gni_mem_handle_t  mem_hndl;
232    struct      _dynamic_smsg_mailbox  *next;
233 }dynamic_smsg_mailbox_t;
235 static dynamic_smsg_mailbox_t  *mailbox_list;
237 static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
238 static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
240 #if PRINT_SYH
241 int         lrts_send_msg_id = 0;
242 int         lrts_local_done_msg = 0;
243 int         lrts_send_rdma_success = 0;
244 #endif
246 #include "machine.h"
248 #include "pcqueue.h"
250 #include "mempool.h"
252 #if CMK_PERSISTENT_COMM
253 #define PERSISTENT_GET_BASE 0 
254 #if !PERSISTENT_GET_BASE
255 #define CMK_PERSISTENT_COMM_PUT 1 
256 #endif
257 #include "machine-persistent.h"
258 #define  POST_HIGHPRIORITY_RDMA    STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendHighPriorBuf));
259 #else  
260 #define  POST_HIGHPRIORITY_RDMA   
261 #endif
263 #if REMOTE_EVENT && (CMK_USE_OOB || CMK_PERSISTENT_COMM_PUT) 
264 #define  PUMP_REMOTE_HIGHPRIORITY    STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(highpriority_rx_cqh) );
265 #else
266 #define  PUMP_REMOTE_HIGHPRIORITY
267 #endif
269 //#define  USE_ONESIDED 1
270 #ifdef USE_ONESIDED
271 //onesided implementation is wrong, since no place to restore omdh
272 #include "onesided.h"
273 onesided_hnd_t   onesided_hnd;
274 onesided_md_t    omdh;
275 #define MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh)  omdh. onesided_mem_register(handler, (uint64_t)msg, size, 0, myomdh) 
277 #define MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh) onesided_mem_deregister(handler, myomdh)
279 #else
280 uint8_t   onesided_hnd, omdh;
282 #if REMOTE_EVENT || CQWRITE 
283 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdhh, cqh, status) \
284     if(register_memory_size+size>= MAX_REG_MEM) { \
285         status = GNI_RC_ERROR_NOMEM;} \
286     else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
287         if(status == GNI_RC_SUCCESS) register_memory_size += size; }  
288 #else
289 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, cqh, status ) \
290         if (register_memory_size + size >= MAX_REG_MEM) { \
291             status = GNI_RC_ERROR_NOMEM; \
292         } else { status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
293             if(status == GNI_RC_SUCCESS) register_memory_size += size; } 
294 #endif
296 #define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  \
297     do { if (GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) \
298              register_memory_size -= size; \
299          else CmiAbort("MEM_DEregister");  \
300     } while (0)
301 #endif
303 #define   GetMempoolBlockPtr(x)   MEMPOOL_GetBlockPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
304 #define   GetMempoolPtr(x)        MEMPOOL_GetMempoolPtr(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
305 #define   GetMempoolsize(x)       MEMPOOL_GetSize(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
306 #define   GetMemHndl(x)           MEMPOOL_GetMemHndl(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
307 #define   IncreaseMsgInRecv(x)    MEMPOOL_IncMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
308 #define   DecreaseMsgInRecv(x)    MEMPOOL_DecMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
309 #define   IncreaseMsgInSend(x)    MEMPOOL_IncMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
310 #define   DecreaseMsgInSend(x)    MEMPOOL_DecMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF))
311 #define   NoMsgInSend(x)          MEMPOOL_GetMsgInSend(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
312 #define   NoMsgInRecv(x)          MEMPOOL_GetMsgInRecv(MEMPOOL_GetMempoolHeader(x,ALIGNBUF)) == 0
313 #define   NoMsgInFlight(x)        (NoMsgInSend(x) && NoMsgInRecv(x))
314 #define   IsMemHndlZero(x)        ((x).qword1 == 0 && (x).qword2 == 0)
315 #define   SetMemHndlZero(x)       do {(x).qword1 = 0;(x).qword2 = 0;} while (0)
316 #define   NotRegistered(x)        IsMemHndlZero(GetMemHndl(x))
318 #define   GetMemHndlFromBlockHeader(x) MEMPOOL_GetBlockMemHndl(x)
319 #define   GetSizeFromBlockHeader(x)    MEMPOOL_GetBlockSize(x)
321 #define CmiGetMsgSize(m)     ((CmiMsgHeaderExt*)m)->size
322 #define CmiSetMsgSize(m,s)   ((((CmiMsgHeaderExt*)m)->size)=(s))
323 #define CmiGetMsgSeq(m)      ((CmiMsgHeaderExt*)m)->seq
324 #define CmiSetMsgSeq(m, s)   ((((CmiMsgHeaderExt*)m)->seq) = (s))
326 #define ALIGNBUF                64
328 /* =======Beginning of Definitions of Performance-Specific Macros =======*/
329 /* If SMSG is not used */
331 #define FMA_PER_CORE  1024
332 #define FMA_BUFFER_SIZE 1024
334 /* If SMSG is used */
335 static int  SMSG_MAX_MSG = 1024;
336 #define SMSG_MAX_CREDIT    72
338 #define MSGQ_MAXSIZE       2048
340 /* large message transfer with FMA or BTE */
341 #if ! REMOTE_EVENT
342 #define LRTS_GNI_RDMA_THRESHOLD  1024 
343 #else
344    /* remote events only work with RDMA */
345 #define LRTS_GNI_RDMA_THRESHOLD  0 
346 #endif
348 #if CMK_SMP
349 static int  REMOTE_QUEUE_ENTRIES=163840; 
350 static int LOCAL_QUEUE_ENTRIES=163840; 
351 #else
352 static int  REMOTE_QUEUE_ENTRIES=20480;
353 static int LOCAL_QUEUE_ENTRIES=20480; 
354 #endif
356 #define BIG_MSG_TAG             0x26
357 #define PUT_DONE_TAG            0x28
358 #define DIRECT_PUT_DONE_TAG     0x29
359 #define ACK_TAG                 0x30
360 /* SMSG is data message */
361 #define SMALL_DATA_TAG          0x31
362 /* SMSG is a control message to initialize a BTE */
363 #define LMSG_INIT_TAG           0x33 
364 #define LMSG_PERSISTENT_INIT_TAG           0x34 
365 #define LMSG_OOB_INIT_TAG       0x35
366 #define RDMA_ACK_TAG            0x36
367 #define RDMA_PUT_MD_TAG         0x37
368 #define RDMA_PUT_DONE_TAG       0x38
370 #define RDMA_PUT_MD_DIRECT_TAG  0x39
371 #define RDMA_PUT_DONE_DIRECT_TAG 0x40
372 #define RDMA_DEREG_DIRECT_TAG    0x41
374 #define RDMA_REG_AND_PUT_MD_DIRECT_TAG  0x42
375 #define RDMA_REG_AND_GET_MD_DIRECT_TAG  0x43
377 #if CMK_SMP
378 #define RDMA_COMM_PERFORM_GET_TAG     0x44
379 #define RDMA_COMM_PERFORM_PUT_TAG     0x45
380 #endif
382 #define DEBUG
383 #ifdef GNI_RC_CHECK
384 #undef GNI_RC_CHECK
385 #endif
386 #ifdef DEBUG
387 #define GNI_RC_CHECK(msg,rc) do { if(rc != GNI_RC_SUCCESS) {           printf("[%d] %s; err=%s\n",CmiMyPe(),msg,gni_err_str[rc]); fflush(stdout); CmiAbort("GNI_RC_CHECK"); } } while(0)
388 #else
389 #define GNI_RC_CHECK(msg,rc)
390 #endif
392 #define ALIGN64(x)       (size_t)((~63)&((x)+63))
393 //#define ALIGN4(x)        (size_t)((~3)&((x)+3)) 
394 #define ALIGNHUGEPAGE(x)   (size_t)((~(_tlbpagesize-1))&((x)+_tlbpagesize-1))
396 static int useStaticMSGQ = 0;
397 static int useStaticFMA = 0;
398 static int mysize, myrank;
399 static gni_nic_handle_t   nic_hndl;
401 typedef struct {
402     gni_mem_handle_t mdh;
403     uint64_t addr;
404 } mdh_addr_t ;
405 // this is related to dynamic SMSG
407 typedef struct mdh_addr_list{
408     gni_mem_handle_t mdh;
409    void *addr;
410     struct mdh_addr_list *next;
411 }mdh_addr_list_t;
413 static unsigned int         smsg_memlen;
414 gni_smsg_attr_t    **smsg_local_attr_vec = 0;
415 mdh_addr_t          setup_mem;
416 mdh_addr_t          *smsg_connection_vec = 0;
417 gni_mem_handle_t    smsg_connection_memhndl;
418 static int          smsg_expand_slots = 10;
419 static int          smsg_available_slot = 0;
420 static void         *smsg_mailbox_mempool = 0;
421 mdh_addr_list_t     *smsg_dynamic_list = 0;
423 static void             *smsg_mailbox_base;
424 gni_msgq_attr_t         msgq_attrs;
425 gni_msgq_handle_t       msgq_handle;
426 gni_msgq_ep_attr_t      msgq_ep_attrs;
427 gni_msgq_ep_attr_t      msgq_ep_attrs_size;
429 /* =====Beginning of Declarations of Machine Specific Variables===== */
430 static int cookie;
431 static int modes = 0;
432 static gni_cq_handle_t       smsg_rx_cqh = NULL;      // smsg send
433 static gni_cq_handle_t       default_tx_cqh = NULL;   // bind to endpoint
434 static gni_cq_handle_t       rdma_tx_cqh = NULL;      // rdma - local event
435 static gni_cq_handle_t       highprior_rdma_tx_cqh = NULL;      // rdma - local event
436 static gni_cq_handle_t       rdma_rx_cqh = NULL;      // mempool - remote event
437 static gni_cq_handle_t       highpriority_rx_cqh = NULL;      // mempool - remote event
438 static gni_ep_handle_t       *ep_hndl_array;
440 static CmiNodeLock           *ep_lock_array;
441 static CmiNodeLock           default_tx_cq_lock; 
442 static CmiNodeLock           rdma_tx_cq_lock; 
443 static CmiNodeLock           global_gni_lock; 
444 static CmiNodeLock           rx_cq_lock;
445 static CmiNodeLock           smsg_mailbox_lock;
446 static CmiNodeLock           smsg_rx_cq_lock;
447 static CmiNodeLock           *mempool_lock;
449 #if CMK_ONESIDED_IMPL
450 static gni_cq_handle_t       rdma_onesided_cqh = NULL;
451 static CmiNodeLock           rdma_onesided_cq_lock;
452 #endif
454 //#define     CMK_WITH_STATS      1
455 typedef struct msg_list
457     uint32_t destNode;
458     uint32_t size;
459     void *msg;
460     uint8_t tag;
461 #if CMK_WITH_STATS
462     double  creation_time;
463 #endif
464 }MSG_LIST;
467 typedef struct control_msg
469     uint64_t            source_addr;    /* address from the start of buffer  */
470     uint64_t            dest_addr;      /* address from the start of buffer */
471     int                 total_length;   /* total length */
472     int                 length;         /* length of this packet */
473 #if REMOTE_EVENT
474     int                 ack_index;      /* index from integer to address */
475 #endif
476     int     seq_id;         //big message   0 meaning single message
477     gni_mem_handle_t    source_mem_hndl;
478 #if PERSISTENT_GET_BASE
479     gni_mem_handle_t    dest_mem_hndl;
480 #endif
481     struct control_msg  *next;
482 } CONTROL_MSG;
484 #define CONTROL_MSG_SIZE       (sizeof(CONTROL_MSG)-sizeof(void*))
486 typedef struct ack_msg
488     uint64_t            source_addr;    /* address from the start of buffer  */
489 #if ! USE_LRTS_MEMPOOL
490     gni_mem_handle_t    source_mem_hndl;
491     int                 length;          /* total length */
492 #endif
493     struct ack_msg     *next;
494 } ACK_MSG;
496 #define ACK_MSG_SIZE       (sizeof(ACK_MSG)-sizeof(void*))
498 #if CMK_DIRECT
499 typedef struct{
500     uint64_t    handler_addr;
501 }CMK_DIRECT_HEADER;
503 typedef struct {
504     char core[CmiMsgHeaderSizeBytes];
505     uint64_t handler;
506 }cmidirectMsg;
508 //SYH
509 CpvDeclare(int, CmiHandleDirectIdx);
510 void CmiHandleDirectMsg(cmidirectMsg* msg)
513     CmiDirectUserHandle *_handle= (CmiDirectUserHandle*)(msg->handler);
514    (*(_handle->callbackFnPtr))(_handle->callbackData);
515    CmiFree(msg);
518 void CmiDirectInit()
520     CpvInitialize(int,  CmiHandleDirectIdx);
521     CpvAccess(CmiHandleDirectIdx) = CmiRegisterHandler( (CmiHandler) CmiHandleDirectMsg);
524 #endif
525 typedef struct  rmda_msg
527     int                   destNode;
528 #if REMOTE_EVENT
529     int                   ack_index;
530 #endif
531     gni_post_descriptor_t *pd;
532 }RDMA_REQUEST;
535 #if CMK_SMP
536 #define SMP_LOCKS                       1
537 #else
538 #define SMP_LOCKS                       0
539 #endif
540 #if CMK_LOCKLESS_QUEUE
541 #define ONE_SEND_QUEUE                  1
542 #else
543 #define ONE_SEND_QUEUE                  0
544 #endif
546 typedef PCQueue BufferList;
547 typedef struct  msg_list_index
549     PCQueue       sendSmsgBuf;
550 #if  SMP_LOCKS
551     CmiNodeLock   lock;
552     int           pushed;
553     int           destpe;
554 #endif
555 } MSG_LIST_INDEX;
556 char                *destpe_avail;
557 PCQueue sendRdmaBuf;
558 PCQueue sendHighPriorBuf;
559 // buffered send queue
560 #if ! ONE_SEND_QUEUE
561 typedef struct smsg_queue
563     MSG_LIST_INDEX   *smsg_msglist_index;
564     int               smsg_head_index;
565 #if  SMP_LOCKS
566     PCQueue     nonEmptyQueues;
567 #endif
568 } SMSG_QUEUE;
569 #else
570 typedef struct smsg_queue
572 #if CMK_LOCKLESS_QUEUE
573     MPMCQueue       sendMsgBuf;
574 #else
575     PCQueue       sendMsgBuf;
576 #endif
577 }  SMSG_QUEUE;
578 #endif
580 SMSG_QUEUE                  smsg_queue;
581 #if CMK_USE_OOB
582 SMSG_QUEUE                  smsg_oob_queue;
583 #define SEND_OOB_SMSG(x)            SendBufferMsg(&x, NULL);
584 #define PUMP_LOCAL_HIGHPRIORITY    STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(highprior_rdma_tx_cqh,  rdma_tx_cq_lock)); 
585 #else
586 #define SEND_OOB_SMSG(x)            
587 #define PUMP_LOCAL_HIGHPRIORITY     
588 #endif
590 #define FreeMsgList(d)   free(d);
591 #define MallocMsgList(d)  d = ((MSG_LIST*)malloc(sizeof(MSG_LIST)));
593 #define FreeControlMsg(d)      free(d);
594 #define MallocControlMsg(d)    d = ((CONTROL_MSG*)malloc(sizeof(CONTROL_MSG)));
596 #define FreeAckMsg(d)      free(d);
597 #define MallocAckMsg(d)    d = ((ACK_MSG*)malloc(sizeof(ACK_MSG)));
599 #define FreeRdmaRequest(d)       free(d);
600 #define MallocRdmaRequest(d)     d = ((RDMA_REQUEST*)malloc(sizeof(RDMA_REQUEST)));   
601 /* reuse gni_post_descriptor_t */
602 static gni_post_descriptor_t *post_freelist=0;
604 #define FreePostDesc(d)     free(d);
605 #define MallocPostDesc(d)   d = ((gni_post_descriptor_t*)malloc(sizeof(gni_post_descriptor_t))); _MEMCHECK(d);
608 /* LrtsSent is called but message can not be sent by SMSGSend because of mailbox full or no credit */
609 static int      buffered_smsg_counter = 0;
611 /* SmsgSend return success but message sent is not confirmed by remote side */
612 static MSG_LIST *buffered_fma_head = 0;
613 static MSG_LIST *buffered_fma_tail = 0;
615 /* functions  */
616 #define IsFree(a,ind)  !( a& (1<<(ind) ))
617 #define SET_BITS(a,ind) a = ( a | (1<<(ind )) )
618 #define Reset(a,ind) a = ( a & (~(1<<(ind))) )
620 CpvDeclare(mempool_type*, mempool);
622 #if CMK_PERSISTENT_COMM_PUT
623 CpvDeclare(mempool_type*, persistent_mempool);
624 #endif
626 #if REMOTE_EVENT || CMK_SMSGS_FREE_AFTER_EVENT 
627 struct IndexStruct {
628 void *addr;
629 int next;
630 int type;    
633 typedef struct IndexPool {
634     struct IndexStruct   *indexes;
635     int                   size;
636     int                   freehead;
637     CmiNodeLock           lock;
638     int                   maxsize;
639 } IndexPool;
641 #define  GetIndexType(pool, s)             (pool.indexes[s].type)
642 #define  GetIndexAddress(pool, s)          (pool.indexes[s].addr)
644 static void IndexPool_init(IndexPool *pool, int initsize, int maxsize)
646     int i;
647     pool->size = initsize;
648     pool->maxsize = maxsize;
649     pool->indexes = (struct IndexStruct *)malloc(pool->size*sizeof(struct IndexStruct));
650     if(pool->indexes == NULL) {
651         CmiAbort("malloc of pool is null\n");
652     }
653     for (i=0; i<pool->size-1; i++) {
654         pool->indexes[i].next = i+1;
655     }
656     pool->indexes[i].next = -1;
657     pool->freehead = 0;
658 #if MULTI_THREAD_SEND || CMK_PERSISTENT_COMM_PUT
659     pool->lock  = CmiCreateLock();
660 #else
661     pool->lock  = 0;
662 #endif
665 static int IndexPool_getslot(IndexPool *pool, void *addr, int type)
667     int s, i;
668 #if MULTI_THREAD_SEND  
669     CmiLock(pool->lock);
670 #endif
671     s = pool->freehead;
672     if (s == -1) {
673         int newsize = pool->size * 2;
674         //printf("[%d] IndexPool_getslot %p expand to: %d\n", myrank, pool, newsize);
675         if (newsize > pool->maxsize) {
676             static int indexpool_overflow = 0;
677             if(!indexpool_overflow){
678                 printf("[%d] Warning: IndexPool_getslot %p overflow when expanding to: %d\n", myrank, pool, newsize);
679                 indexpool_overflow = 1;
680             }
681             return -1;
682         }
683         struct IndexStruct *old_ackpool = pool->indexes;
684         pool->indexes = (struct IndexStruct *)malloc(newsize*sizeof(struct IndexStruct));
685         if(pool->indexes == NULL) {
686             CmiAbort("malloc of pool is null\n");
687         }
688         memcpy(pool->indexes, old_ackpool, pool->size*sizeof(struct IndexStruct));
689         for (i=pool->size; i<newsize-1; i++) {
690             pool->indexes[i].next = i+1;
691         }
692         pool->indexes[i].next = -1;
693         pool->freehead = pool->size;
694         s = pool->size;
695         pool->size = newsize;
696         free(old_ackpool);
697     }
698     pool->freehead = pool->indexes[s].next;
699     pool->indexes[s].addr = addr;
700     pool->indexes[s].type = type;
701 #if MULTI_THREAD_SEND
702     CmiUnlock(pool->lock);
703 #endif
704     return s;
707 static void IndexPool_freeslot(IndexPool *pool, int s)
709     CmiAssert(s>=0 && s<pool->size);
710 #if MULTI_THREAD_SEND
711     CmiLock(pool->lock);
712 #endif
713     pool->indexes[s].next = pool->freehead;
714     pool->freehead = s;
715 #if MULTI_THREAD_SEND
716     CmiUnlock(pool->lock);
717 #endif
720 #endif  
722 #if CMK_SMSGS_FREE_AFTER_EVENT
723 /* 
724   SMSGS pool 
725   the pool is to buffer sending smsgs until it can be free'ed .
727 static IndexPool smsgsPool;
728 #endif
730 #if REMOTE_EVENT
731 /* ack pool for remote events */
733 static int  SHIFT   =           18;
734 #define INDEX_MASK              ((1<<(32-SHIFT-1)) - 1)
735 #define RANK_MASK               ((1<<SHIFT) - 1)
736 #define ACK_EVENT(idx)          ((((idx) & INDEX_MASK)<<SHIFT) | myrank)
738 #define GET_TYPE(evt)           (((evt) >> 31) & 1)
739 #define GET_RANK(evt)           ((evt) & RANK_MASK)
740 #define GET_INDEX(evt)          (((evt) >> SHIFT) & INDEX_MASK)
742 #define PERSIST_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
743 #define DIRECT_EVENT(idx)      ( (1<<31) | (((idx) & INDEX_MASK)<<SHIFT) | myrank)
745 #if CMK_SMP
746 #define POOL_INIT_SIZE                4096
747 #else
748 #define POOL_INIT_SIZE                1024
749 #endif
751 static IndexPool  ackPool;
752 #if CMK_PERSISTENT_COMM_PUT
753 static IndexPool  persistPool;
754 #else
755 #define persistPool ackPool 
756 #endif
758 #endif
762 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
763 #define CMI_MAGIC(msg)                   ((CmiMsgHeaderBasic *)msg)->magic
764 #define CHARM_MAGIC_NUMBER               126
766 #if CMK_ERROR_CHECKING
767 CMI_EXTERNC
768 unsigned char computeCheckSum(unsigned char *data, int len);
769 static int checksum_flag = 0;
770 #define CMI_SET_CHECKSUM(msg, len)      \
771         if (checksum_flag)  {   \
772           ((CmiMsgHeaderBasic *)msg)->cksum = 0;        \
773           ((CmiMsgHeaderBasic *)msg)->cksum = computeCheckSum((unsigned char*)msg, len);        \
774         }
775 #define CMI_CHECK_CHECKSUM(msg, len)    \
776         if (checksum_flag)      \
777           if (computeCheckSum((unsigned char*)msg, len) != 0)   \
778             CmiAbort("Fatal error: checksum doesn't agree!\n");
779 #else
780 #define CMI_SET_CHECKSUM(msg, len)
781 #define CMI_CHECK_CHECKSUM(msg, len)
782 #endif
783 /* =====End of Definitions of Message-Corruption Related Macros=====*/
785 static int print_stats = 0;
786 static int stats_off = 0;
787 void CmiTurnOnStats(void)
789     stats_off = 0;
790     //CmiPrintf("[%d][%d:%d]+++++++++++ turning on stats \n", CmiMyNode(), CmiMyPe(), CmiMyRank());
793 void CmiTurnOffStats(void)
795     stats_off = 1;
798 #define IS_PUT(type)    (type == GNI_POST_FMA_PUT || type == GNI_POST_RDMA_PUT)
800 #if CMK_WITH_STATS
801 FILE *counterLog = NULL;
802 typedef struct comm_thread_stats
804     uint64_t  smsg_data_count;
805     uint64_t  lmsg_init_count;
806     uint64_t  ack_count;
807     uint64_t  big_msg_ack_count;
808     uint64_t  smsg_count;
809     uint64_t  direct_put_done_count;
810     uint64_t  put_done_count;
811     //times of calling SmsgSend
812     uint64_t  try_smsg_data_count;
813     uint64_t  try_lmsg_init_count;
814     uint64_t  try_ack_count;
815     uint64_t  try_big_msg_ack_count;
816     uint64_t  try_direct_put_done_count;
817     uint64_t  try_put_done_count;
818     uint64_t  try_smsg_count;
819     
820     double    max_time_in_send_buffered_smsg;
821     double    all_time_in_send_buffered_smsg;
823     uint64_t  rdma_get_count, rdma_put_count;
824     uint64_t  try_rdma_get_count, try_rdma_put_count;
825     double    max_time_from_control_to_rdma_init;
826     double    all_time_from_control_to_rdma_init;
828     double    max_time_from_rdma_init_to_rdma_done;
829     double    all_time_from_rdma_init_to_rdma_done;
831     int      count_in_PumpNetwork;
832     double   time_in_PumpNetwork;
833     double   max_time_in_PumpNetwork;
834     int      count_in_SendBufferMsg_smsg;
835     double   time_in_SendBufferMsg_smsg;
836     double   max_time_in_SendBufferMsg_smsg;
837     int      count_in_SendRdmaMsg;
838     double   time_in_SendRdmaMsg;
839     double   max_time_in_SendRdmaMsg;
840     int      count_in_PumpRemoteTransactions;
841     double   time_in_PumpRemoteTransactions;
842     double   max_time_in_PumpRemoteTransactions;
843     int      count_in_PumpLocalTransactions_rdma;
844     double   time_in_PumpLocalTransactions_rdma;
845     double   max_time_in_PumpLocalTransactions_rdma;
846     int      count_in_PumpDatagramConnection;
847     double   time_in_PumpDatagramConnection;
848     double   max_time_in_PumpDatagramConnection;
849 } Comm_Thread_Stats;
851 static Comm_Thread_Stats   comm_stats;
853 static char *counters_dirname = "counters";
855 static void init_comm_stats(void)
857   memset(&comm_stats, 0, sizeof(Comm_Thread_Stats));
858   if (print_stats){
859       char ln[200];
860       int code = mkdir(counters_dirname, 00777); 
861       sprintf(ln,"%s/statistics.%d.%d", counters_dirname, mysize, myrank);
862       counterLog=fopen(ln,"w");
863       if (counterLog == NULL) CmiAbort("Counter files open failed");
864   }
867 #define SMSG_CREATION( x ) if(print_stats) { x->creation_time = CmiWallTimer(); }
869 #define SMSG_SENT_DONE(creation_time, tag)  \
870         if (print_stats && !stats_off) {   if( tag == SMALL_DATA_TAG) comm_stats.smsg_data_count++;  \
871             else  if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.lmsg_init_count++;  \
872             else  if( tag == ACK_TAG) comm_stats.ack_count++;  \
873             else  if( tag == BIG_MSG_TAG) comm_stats.big_msg_ack_count++;  \
874             else  if( tag == PUT_DONE_TAG ) comm_stats.put_done_count++;  \
875             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.direct_put_done_count++;  \
876             comm_stats.smsg_count++; \
877             double inbuff_time = CmiWallTimer() - creation_time;   \
878             if(inbuff_time > comm_stats.max_time_in_send_buffered_smsg) comm_stats.max_time_in_send_buffered_smsg= inbuff_time; \
879             comm_stats.all_time_in_send_buffered_smsg += inbuff_time;  \
880         }
882 #define SMSG_TRY_SEND(tag)  \
883         if (print_stats && !stats_off){   if( tag == SMALL_DATA_TAG) comm_stats.try_smsg_data_count++;  \
884             else  if( tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG) comm_stats.try_lmsg_init_count++;  \
885             else  if( tag == ACK_TAG) comm_stats.try_ack_count++;  \
886             else  if( tag == BIG_MSG_TAG) comm_stats.try_big_msg_ack_count++;  \
887             else  if( tag == PUT_DONE_TAG ) comm_stats.try_put_done_count++;  \
888             else  if( tag == DIRECT_PUT_DONE_TAG ) comm_stats.try_direct_put_done_count++;  \
889             comm_stats.try_smsg_count++; \
890         }
892 #define  RDMA_TRY_SEND(type)        if (print_stats && !stats_off) {IS_PUT(type)?comm_stats.try_rdma_put_count++:comm_stats.try_rdma_get_count++;}
894 #define  RDMA_TRANS_DONE(x)      \
895          if (print_stats && !stats_off) {  double rdma_trans_time = CmiWallTimer() - x ; \
896              if(rdma_trans_time > comm_stats.max_time_from_rdma_init_to_rdma_done) comm_stats.max_time_from_rdma_init_to_rdma_done = rdma_trans_time; \
897              comm_stats.all_time_from_rdma_init_to_rdma_done += rdma_trans_time; \
898          }
900 #define  RDMA_TRANS_INIT(type, x)      \
901          if (print_stats && !stats_off) {   IS_PUT(type)?comm_stats.rdma_put_count++:comm_stats.rdma_get_count++;  \
902              double rdma_trans_time = CmiWallTimer() - x ; \
903              if(rdma_trans_time > comm_stats.max_time_from_control_to_rdma_init) comm_stats.max_time_from_control_to_rdma_init = rdma_trans_time; \
904              comm_stats.all_time_from_control_to_rdma_init += rdma_trans_time; \
905          }
907 #define STATS_PUMPNETWORK_TIME(x)   \
908         { double t = CmiWallTimer(); \
909           x;        \
910           t = CmiWallTimer() - t;          \
911           comm_stats.count_in_PumpNetwork++;        \
912           comm_stats.time_in_PumpNetwork += t;   \
913           if (t>comm_stats.max_time_in_PumpNetwork)      \
914               comm_stats.max_time_in_PumpNetwork = t;    \
915         }
917 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)   \
918         { double t = CmiWallTimer(); \
919           x;        \
920           t = CmiWallTimer() - t;          \
921           comm_stats.count_in_PumpRemoteTransactions ++;        \
922           comm_stats.time_in_PumpRemoteTransactions += t;   \
923           if (t>comm_stats.max_time_in_PumpRemoteTransactions)      \
924               comm_stats.max_time_in_PumpRemoteTransactions = t;    \
925         }
927 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   \
928         { double t = CmiWallTimer(); \
929           x;        \
930           t = CmiWallTimer() - t;          \
931           comm_stats.count_in_PumpLocalTransactions_rdma ++;        \
932           comm_stats.time_in_PumpLocalTransactions_rdma += t;   \
933           if (t>comm_stats.max_time_in_PumpLocalTransactions_rdma)      \
934               comm_stats.max_time_in_PumpLocalTransactions_rdma = t;    \
935         }
937 #define STATS_SEND_SMSGS_TIME(x)   \
938         { double t = CmiWallTimer(); \
939           x;        \
940           t = CmiWallTimer() - t;          \
941           comm_stats.count_in_SendBufferMsg_smsg ++;        \
942           comm_stats.time_in_SendBufferMsg_smsg += t;   \
943           if (t>comm_stats.max_time_in_SendBufferMsg_smsg)      \
944               comm_stats.max_time_in_SendBufferMsg_smsg = t;    \
945         }
947 #define STATS_SENDRDMAMSG_TIME(x)   \
948         { double t = CmiWallTimer(); \
949           x;        \
950           t = CmiWallTimer() - t;          \
951           comm_stats.count_in_SendRdmaMsg ++;        \
952           comm_stats.time_in_SendRdmaMsg += t;   \
953           if (t>comm_stats.max_time_in_SendRdmaMsg)      \
954               comm_stats.max_time_in_SendRdmaMsg = t;    \
955         }
957 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)   \
958         { double t = CmiWallTimer(); \
959           x;        \
960           t = CmiWallTimer() - t;          \
961           comm_stats.count_in_PumpDatagramConnection ++;        \
962           comm_stats.time_in_PumpDatagramConnection += t;   \
963           if (t>comm_stats.max_time_in_PumpDatagramConnection)      \
964               comm_stats.max_time_in_PumpDatagramConnection = t;    \
965         }
967 static void print_comm_stats(void)
969     fprintf(counterLog, "Node[%d] SMSG time in buffer\t[total:%f\tmax:%f\tAverage:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_in_send_buffered_smsg, 1000.0*comm_stats.max_time_in_send_buffered_smsg, 1000.0*comm_stats.all_time_in_send_buffered_smsg/comm_stats.smsg_count);
970     fprintf(counterLog, "Node[%d] Smsg  Msgs  \t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n", myrank, 
971             comm_stats.smsg_count, comm_stats.smsg_data_count, comm_stats.lmsg_init_count, 
972             comm_stats.ack_count, comm_stats.big_msg_ack_count, comm_stats.direct_put_done_count, comm_stats.put_done_count);
973     
974     fprintf(counterLog, "Node[%d] SmsgSendCalls\t[Total:%lld\t Data:%lld\t Lmsg_Init:%lld\t ACK:%lld\t BIG_MSG_ACK:%lld Direct_put_done:%lld\t Persistent_put_done:%lld]\n\n", myrank, 
975             comm_stats.try_smsg_count, comm_stats.try_smsg_data_count, comm_stats.try_lmsg_init_count, 
976             comm_stats.try_ack_count, comm_stats.try_big_msg_ack_count, comm_stats.try_direct_put_done_count, comm_stats.try_put_done_count);
978     fprintf(counterLog, "Node[%d] Rdma Transaction [count (GET/PUT):%lld %lld\t calls (GET/PUT):%lld %lld]\n", myrank, comm_stats.rdma_get_count, comm_stats.rdma_put_count, comm_stats.try_rdma_get_count, comm_stats.try_rdma_put_count);
979     fprintf(counterLog, "Node[%d] Rdma time from control arrives to rdma init [Total:%f\tMAX:%f\t Average:%f](milisecond)\n", myrank, 1000.0*comm_stats.all_time_from_control_to_rdma_init, 1000.0*comm_stats.max_time_from_control_to_rdma_init, 1000.0*comm_stats.all_time_from_control_to_rdma_init/(comm_stats.rdma_get_count+comm_stats.rdma_put_count)); 
980     fprintf(counterLog, "Node[%d] Rdma time from init to rdma done [Total:%f\tMAX:%f\t Average:%f](milisecond)\n\n", myrank,1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.max_time_from_rdma_init_to_rdma_done, 1000.0*comm_stats.all_time_from_rdma_init_to_rdma_done/(comm_stats.rdma_get_count+comm_stats.rdma_put_count));
983     fprintf(counterLog, "                             count\ttotal(s)\tmax(s)\taverage(us)\n");
984     fprintf(counterLog, "PumpNetworkSmsg:              %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpNetwork, comm_stats.time_in_PumpNetwork, comm_stats.max_time_in_PumpNetwork, comm_stats.time_in_PumpNetwork*1e6/comm_stats.count_in_PumpNetwork);
985     fprintf(counterLog, "PumpRemoteTransactions:       %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions, comm_stats.max_time_in_PumpRemoteTransactions, comm_stats.time_in_PumpRemoteTransactions*1e6/comm_stats.count_in_PumpRemoteTransactions);
986     fprintf(counterLog, "PumpLocalTransactions(RDMA):  %d\t%.6f\t%.6f\t%.6f\n", comm_stats.count_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma, comm_stats.max_time_in_PumpLocalTransactions_rdma, comm_stats.time_in_PumpLocalTransactions_rdma*1e6/comm_stats.count_in_PumpLocalTransactions_rdma);
987     fprintf(counterLog, "SendBufferMsg (SMSG):         %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg, comm_stats.max_time_in_SendBufferMsg_smsg, comm_stats.time_in_SendBufferMsg_smsg*1e6/comm_stats.count_in_SendBufferMsg_smsg);
988     fprintf(counterLog, "SendRdmaMsg:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg, comm_stats.max_time_in_SendRdmaMsg, comm_stats.time_in_SendRdmaMsg*1e6/comm_stats.count_in_SendRdmaMsg);
989     if (useDynamicSMSG)
990     fprintf(counterLog, "PumpDatagramConnection:                  %d\t%.6f\t%.6f\t%.6f\n",  comm_stats.count_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection, comm_stats.max_time_in_PumpDatagramConnection, comm_stats.time_in_PumpDatagramConnection*1e6/comm_stats.count_in_PumpDatagramConnection);
992     fclose(counterLog);
995 #else
996 #define STATS_PUMPNETWORK_TIME(x)                  x
997 #define STATS_SEND_SMSGS_TIME(x)                   x
998 #define STATS_PUMPREMOTETRANSACTIONS_TIME(x)       x
999 #define STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(x)   x
1000 #define STATS_SENDRDMAMSG_TIME(x)                  x
1001 #define STATS_PUMPDATAGRAMCONNECTION_TIME(x)       x
1002 #endif
1004 static void
1005 allgather(void *in,void *out, int len)
1007     static int *ivec_ptr=NULL,already_called=0,job_size=0;
1008     int i,rc;
1009     int my_rank;
1010     char *tmp_buf,*out_ptr;
1012     if(!already_called) {
1014         rc = PMI_Get_size(&job_size);
1015         CmiAssert(rc == PMI_SUCCESS);
1016         rc = PMI_Get_rank(&my_rank);
1017         CmiAssert(rc == PMI_SUCCESS);
1019         ivec_ptr = (int *)malloc(sizeof(int) * job_size);
1020         CmiAssert(ivec_ptr != NULL);
1022         rc = PMI_Allgather(&my_rank,ivec_ptr,sizeof(int));
1023         CmiAssert(rc == PMI_SUCCESS);
1025         already_called = 1;
1027     }
1029     tmp_buf = (char *)malloc(job_size * len);
1030     CmiAssert(tmp_buf);
1032     rc = PMI_Allgather(in,tmp_buf,len);
1033     CmiAssert(rc == PMI_SUCCESS);
1035     out_ptr = (char *)out;
1037     for(i=0;i<job_size;i++) {
1039         memcpy(&out_ptr[len * ivec_ptr[i]],&tmp_buf[i * len],len);
1041     }
1043     free(tmp_buf);
1046 static void
1047 allgather_2(void *in,void *out, int len)
1049     //PMI_Allgather is out of order
1050     int i,rc, extend_len;
1051     int  rank_index;
1052     char *out_ptr, *out_ref;
1053     char *in2;
1055     extend_len = sizeof(int) + len;
1056     in2 = (char*)malloc(extend_len);
1058     memcpy(in2, &myrank, sizeof(int));
1059     memcpy(in2+sizeof(int), in, len);
1061     out_ptr = (char*)malloc(mysize*extend_len);
1063     rc = PMI_Allgather(in2, out_ptr, extend_len);
1064     GNI_RC_CHECK("allgather", rc);
1066     out_ref = (char *)out;
1068     for(i=0;i<mysize;i++) {
1069         //rank index 
1070         memcpy(&rank_index, &(out_ptr[extend_len*i]), sizeof(int));
1071         //copy to the rank index slot
1072         memcpy(&out_ref[rank_index*len], &out_ptr[extend_len*i+sizeof(int)], len);
1073     }
1075     free(out_ptr);
1076     free(in2);
1080 static unsigned int get_gni_nic_address(int device_id)
1082     unsigned int address, cpu_id;
1083     gni_return_t status;
1084     int i, alps_dev_id=-1,alps_address=-1;
1085     char *token, *p_ptr;
1087     p_ptr = getenv("PMI_GNI_DEV_ID");
1088     if (!p_ptr) {
1089         status = GNI_CdmGetNicAddress(device_id, &address, &cpu_id);
1090        
1091         GNI_RC_CHECK("GNI_CdmGetNicAddress", status);
1092     } else {
1093         while ((token = strtok(p_ptr,":")) != NULL) {
1094             alps_dev_id = atoi(token);
1095             if (alps_dev_id == device_id) {
1096                 break;
1097             }
1098             p_ptr = NULL;
1099         }
1100         CmiAssert(alps_dev_id != -1);
1101         p_ptr = getenv("PMI_GNI_LOC_ADDR");
1102         CmiAssert(p_ptr != NULL);
1103         i = 0;
1104         while ((token = strtok(p_ptr,":")) != NULL) {
1105             if (i == alps_dev_id) {
1106                 alps_address = atoi(token);
1107                 break;
1108             }
1109             p_ptr = NULL;
1110             ++i;
1111         }
1112         CmiAssert(alps_address != -1);
1113         address = alps_address;
1114     }
1115     return address;
1118 static uint8_t get_ptag(void)
1120     char *p_ptr, *token;
1121     uint8_t ptag;
1123     p_ptr = getenv("PMI_GNI_PTAG");
1124     CmiAssert(p_ptr != NULL);
1125     token = strtok(p_ptr, ":");
1126     ptag = (uint8_t)atoi(token);
1127     return ptag;
1128         
1131 static uint32_t get_cookie(void)
1133     uint32_t cookie;
1134     char *p_ptr, *token;
1136     p_ptr = getenv("PMI_GNI_COOKIE");
1137     CmiAssert(p_ptr != NULL);
1138     token = strtok(p_ptr, ":");
1139     cookie = (uint32_t)atoi(token);
1141     return cookie;
1144 #if LARGEPAGE
1146 /* directly mmap memory from hugetlbfs for large pages */
1148 #include <sys/stat.h>
1149 #include <fcntl.h>
1150 #include <sys/mman.h>
1152 #ifdef __cplusplus
1153 extern "C" {
1154 #endif
1155 #include <hugetlbfs.h>
1156 #ifdef __cplusplus
1158 #endif
1160 // size must be _tlbpagesize aligned
1161 void *my_get_huge_pages(size_t size)
1163     char filename[512];
1164     int fd;
1165     mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
1166     void *ptr = NULL;
1168     snprintf(filename, sizeof(filename), "%s/charm_mempool.%d.%d", hugetlbfs_find_path_for_size(_tlbpagesize), getpid(), rand());
1169     fd = open(filename, O_RDWR | O_CREAT, mode);
1170     if (fd == -1) {
1171         CmiAbort("my_get_huge_pages: open filed");
1172     }
1173     ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
1174     if (ptr == MAP_FAILED) ptr = NULL;
1175 //printf("[%d] my_get_huge_pages: %s %d %p\n", myrank, filename, size, ptr);
1176     close(fd);
1177     unlink(filename);
1178     return ptr;
1181 void my_free_huge_pages(void *ptr, int size)
1183 //printf("[%d] my_free_huge_pages: %p %d\n", myrank, ptr, size);
1184     int ret = munmap(ptr, size);
1185     if (ret == -1) CmiAbort("munmap failed in my_free_huge_pages");
1188 #endif
1190 /* =====Beginning of Definitions of Message-Corruption Related Macros=====*/
1191 /* TODO: add any that are related */
1192 /* =====End of Definitions of Message-Corruption Related Macros=====*/
1195 #include "machine-lrts.h"
1197 #include "machine-common-core.C"
1199 #include "machine-rdma.h"
1202 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *urgent_queue);
1203 static void SendRdmaMsg(PCQueue );
1204 static void PumpNetworkSmsg(void);
1205 static void PumpLocalTransactions(gni_cq_handle_t tx_cqh, CmiNodeLock cq_lock);
1206 #if CQWRITE
1207 static void PumpCqWriteTransactions(void);
1208 #endif
1209 #if REMOTE_EVENT
1210 static void PumpRemoteTransactions(gni_cq_handle_t);
1211 #endif
1213 #if MACHINE_DEBUG_LOG
1214 static CmiInt8 buffered_recv_msg = 0;
1215 int         lrts_smsg_success = 0;
1216 int         lrts_received_msg = 0;
1217 #endif
1219 static void sweep_mempool(mempool_type *mptr)
1221     int n = 0;
1222     block_header *current = &(mptr->block_head);
1224     printf("[n %d %d] sweep_mempool slot START.\n", myrank, n++);
1225     while( current!= NULL) {
1226         printf("[n %d %d] sweep_mempool slot %p size: %lld used: %d (%d %d) %lld %lld.\n", myrank, n++, current, current->size, 1<<current->used, current->msgs_in_send, current->msgs_in_recv, current->mem_hndl.qword1, current->mem_hndl.qword2);
1227         current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1228     }
1229     printf("[n %d] sweep_mempool slot END.\n", myrank);
1233 static INLINE_KEYWORD  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
1235     block_header *current = *from;
1237     //while(register_memory_size>= MAX_REG_MEM)
1238     //{
1239         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
1240             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
1242         *from = current;
1243         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
1244         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromBlockHeader(current)) , &omdh, GetSizeFromBlockHeader(current));
1245         SetMemHndlZero(GetMemHndlFromBlockHeader(current));
1246     //}
1247     return GNI_RC_SUCCESS;
1250 static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, size_t size, gni_mem_handle_t  *memhndl, gni_cq_handle_t cqh )
1252     gni_return_t status = GNI_RC_SUCCESS;
1253     //int size = GetMempoolsize(msg);
1254     //void *blockaddr = GetMempoolBlockPtr(msg);
1255     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
1256    
1257     block_header *current = &(mptr->block_head);
1258     while(register_memory_size>= MAX_REG_MEM)
1259     {
1260         status = deregisterMemory(mptr, &current);
1261         if (status != GNI_RC_SUCCESS) break;
1262     }
1263     if(register_memory_size>= MAX_REG_MEM) return status;
1265     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
1266     while(1)
1267     {
1268         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, cqh, status);
1269         if(status == GNI_RC_SUCCESS)
1270         {
1271             break;
1272         }
1273         else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1274         {
1275             GNI_RC_CHECK("registerFromMempool", status);
1276         }
1277         else
1278         {
1279             status = deregisterMemory(mptr, &current);
1280             if (status != GNI_RC_SUCCESS) break;
1281         }
1282     }; 
1283     return status;
1286 INLINE_KEYWORD
1287 static gni_return_t registerMemory(void *msg, size_t size, gni_mem_handle_t *t, gni_cq_handle_t cqh )
1289     static int rank = -1;
1290     int i;
1291     gni_return_t status;
1292     mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
1293     //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
1294     mempool_type *mptr;
1296     status = registerFromMempool(mptr1, msg, size, t, cqh);
1297     if (status == GNI_RC_SUCCESS) return status;
1298 #if CMK_SMP 
1299     for (i=0; i<CmiMyNodeSize()+1; i++) {
1300       rank = (rank+1)%(CmiMyNodeSize()+1);
1301       mptr = CpvAccessOther(mempool, rank);
1302       if (mptr == mptr1) continue;
1303       status = registerFromMempool(mptr, msg, size, t, cqh);
1304       if (status == GNI_RC_SUCCESS) return status;
1305     }
1306 #endif
1307     return  GNI_RC_ERROR_RESOURCE;
1310 #if CMK_ONESIDED_IMPL
1311 #include "machine-onesided.h"
1312 #endif
1314 INLINE_KEYWORD
1315 static void buffer_small_msgs(SMSG_QUEUE *queue, void *msg, int size, int destNode, uint8_t tag)
1317     MSG_LIST        *msg_tmp;
1318     MallocMsgList(msg_tmp);
1319     msg_tmp->destNode = destNode;
1320     msg_tmp->size   = size;
1321     msg_tmp->msg    = msg;
1322     msg_tmp->tag    = tag;
1323 #if CMK_WITH_STATS
1324     SMSG_CREATION(msg_tmp)
1325 #endif
1327 #if ONE_SEND_QUEUE
1328 #if CMK_LOCKLESS_QUEUE
1329     MPMCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1330 #else
1331     PCQueuePush(queue->sendMsgBuf, (char*)msg_tmp);
1332 #endif
1333 #else
1334 #if SMP_LOCKS
1335     CmiLock(queue->smsg_msglist_index[destNode].lock);
1336     if(queue->smsg_msglist_index[destNode].pushed == 0)
1337     {
1338         PCQueuePush(queue->nonEmptyQueues, (char*)&(queue->smsg_msglist_index[destNode]));
1339     }
1340     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1341     CmiUnlock(queue->smsg_msglist_index[destNode].lock);
1342 #else
1343     PCQueuePush(queue->smsg_msglist_index[destNode].sendSmsgBuf, (char*)msg_tmp);
1344 #endif
1345 #endif
1347 #if PRINT_SYH
1348     buffered_smsg_counter++;
1349 #endif
1352 INLINE_KEYWORD static void print_smsg_attr(gni_smsg_attr_t     *a)
1354     printf("type=%d\n, credit=%d\n, size=%d\n, buf=%p, offset=%d\n", a->msg_type, a->mbox_maxcredit, a->buff_size, a->msg_buffer, a->mbox_offset);
1356 #if 0
1357 INLINE_KEYWORD
1358 static void setup_smsg_connection(int destNode)
1360     mdh_addr_list_t  *new_entry = 0;
1361     gni_post_descriptor_t *pd;
1362     gni_smsg_attr_t      *smsg_attr;
1363     gni_return_t status = GNI_RC_NOT_DONE;
1364     RDMA_REQUEST        *rdma_request_msg;
1365     
1366     if(smsg_available_slot == smsg_expand_slots)
1367     {
1368         new_entry = (mdh_addr_list_t*)malloc(sizeof(mdh_addr_list_t));
1369         new_entry->addr = memalign(64, smsg_memlen*smsg_expand_slots);
1370         memset(new_entry->addr, 0, smsg_memlen*smsg_expand_slots);
1372         status = GNI_MemRegister(nic_hndl, (uint64_t)new_entry->addr,
1373             smsg_memlen*smsg_expand_slots, smsg_rx_cqh,
1374             GNI_MEM_READWRITE,   
1375             -1,
1376             &(new_entry->mdh));
1377         smsg_available_slot = 0; 
1378         new_entry->next = smsg_dynamic_list;
1379         smsg_dynamic_list = new_entry;
1380     }
1381     smsg_attr = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1382     smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1383     smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1384     smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1385     smsg_attr->mbox_offset = smsg_available_slot * smsg_memlen;
1386     smsg_attr->buff_size = smsg_memlen;
1387     smsg_attr->msg_buffer = smsg_dynamic_list->addr;
1388     smsg_attr->mem_hndl = smsg_dynamic_list->mdh;
1389     smsg_local_attr_vec[destNode] = smsg_attr;
1390     smsg_available_slot++;
1391     MallocPostDesc(pd);
1392     pd->type            = GNI_POST_FMA_PUT;
1393     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
1394     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
1395     pd->length          = sizeof(gni_smsg_attr_t);
1396     pd->local_addr      = (uint64_t) smsg_attr;
1397     pd->remote_addr     = (uint64_t)&((((gni_smsg_attr_t*)(smsg_connection_vec[destNode].addr))[myrank]));
1398     pd->remote_mem_hndl = smsg_connection_vec[destNode].mdh;
1399     pd->src_cq_hndl     = 0;
1401     pd->rdma_mode       = 0;
1402     status = GNI_PostFma(ep_hndl_array[destNode],  pd);
1403     print_smsg_attr(smsg_attr);
1404     if(status == GNI_RC_ERROR_RESOURCE )
1405     {
1406         MallocRdmaRequest(rdma_request_msg);
1407         rdma_request_msg->destNode = destNode;
1408         rdma_request_msg->pd = pd;
1409         /* buffer this request */
1410     }
1411 #if PRINT_SYH
1412     if(status != GNI_RC_SUCCESS)
1413        printf("[%d=%d] send post FMA %s\n", myrank, destNode, gni_err_str[status]);
1414     else
1415         printf("[%d=%d]OK send post FMA \n", myrank, destNode);
1416 #endif
1418 #endif
1419 /* useDynamicSMSG */
1420 INLINE_KEYWORD
1421 static void alloc_smsg_attr( gni_smsg_attr_t *local_smsg_attr)
1423     gni_return_t status = GNI_RC_NOT_DONE;
1425     if(mailbox_list->offset == mailbox_list->size)
1426     {
1427         dynamic_smsg_mailbox_t *new_mailbox_entry;
1428         new_mailbox_entry = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
1429         new_mailbox_entry->size = smsg_memlen*avg_smsg_connection;
1430         new_mailbox_entry->mailbox_base = malloc(new_mailbox_entry->size);
1431         memset(new_mailbox_entry->mailbox_base, 0, new_mailbox_entry->size);
1432         new_mailbox_entry->offset = 0;
1433         
1434         status = GNI_MemRegister(nic_hndl, (uint64_t)new_mailbox_entry->mailbox_base,
1435             new_mailbox_entry->size, smsg_rx_cqh,
1436             GNI_MEM_READWRITE,   
1437             -1,
1438             &(new_mailbox_entry->mem_hndl));
1440         GNI_RC_CHECK("register", status);
1441         new_mailbox_entry->next = mailbox_list;
1442         mailbox_list = new_mailbox_entry;
1443     }
1444     local_smsg_attr->msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
1445     local_smsg_attr->mbox_maxcredit = SMSG_MAX_CREDIT;
1446     local_smsg_attr->msg_maxsize = SMSG_MAX_MSG;
1447     local_smsg_attr->mbox_offset = mailbox_list->offset;
1448     mailbox_list->offset += smsg_memlen;
1449     local_smsg_attr->buff_size = smsg_memlen;
1450     local_smsg_attr->msg_buffer = mailbox_list->mailbox_base;
1451     local_smsg_attr->mem_hndl = mailbox_list->mem_hndl;
1454 /* useDynamicSMSG */
1455 INLINE_KEYWORD
1456 static int connect_to(int destNode)
1458     gni_return_t status = GNI_RC_NOT_DONE;
1459     CmiAssert(smsg_connected_flag[destNode] == 0);
1460     CmiAssert (smsg_attr_vector_local[destNode] == NULL);
1461     smsg_attr_vector_local[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1462     alloc_smsg_attr(smsg_attr_vector_local[destNode]);
1463     smsg_attr_vector_remote[destNode] = (gni_smsg_attr_t*) malloc (sizeof(gni_smsg_attr_t));
1464     
1465     CMI_GNI_LOCK(global_gni_lock)
1466     status = GNI_EpPostDataWId (ep_hndl_array[destNode], smsg_attr_vector_local[destNode], sizeof(gni_smsg_attr_t),smsg_attr_vector_remote[destNode] ,sizeof(gni_smsg_attr_t), destNode+mysize);
1467     CMI_GNI_UNLOCK(global_gni_lock)
1468     if (status == GNI_RC_ERROR_RESOURCE) {
1469       /* possibly destNode is making connection at the same time */
1470       free(smsg_attr_vector_local[destNode]);
1471       smsg_attr_vector_local[destNode] = NULL;
1472       free(smsg_attr_vector_remote[destNode]);
1473       smsg_attr_vector_remote[destNode] = NULL;
1474       mailbox_list->offset -= smsg_memlen;
1475 #if PRINT_SYH
1476     printf("[%d] send connect_to request to %d failed\n", myrank, destNode);
1477 #endif
1478       return 0;
1479     }
1480     GNI_RC_CHECK("GNI_Post", status);
1481     smsg_connected_flag[destNode] = 1;
1482 #if PRINT_SYH
1483     printf("[%d] send connect_to request to %d done\n", myrank, destNode);
1484 #endif
1485     return 1;
1488 INLINE_KEYWORD
1489 static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, void *msg, int size, uint8_t tag, int inbuff, MSG_LIST *ptr, int smsgType, int useHeader)
1491     unsigned int          remote_address;
1492     uint32_t              remote_id;
1493     gni_return_t          status = GNI_RC_ERROR_RESOURCE;
1494     gni_smsg_attr_t       *smsg_attr;
1495     gni_post_descriptor_t *pd;
1496     gni_post_state_t      post_state;
1497     char                  *real_data; 
1498     int                   msgid = 0;
1500     if (useDynamicSMSG) {
1501         switch (smsg_connected_flag[destNode]) {
1502         case 0: 
1503             connect_to(destNode);         /* continue to case 1 */
1504         case 1:                           /* pending connection, do nothing */
1505             status = GNI_RC_NOT_DONE;
1506             if(inbuff ==0)
1507                 buffer_small_msgs(queue, msg, size, destNode, tag);
1508             return status;
1509         }
1510     }
1511 #if ! ONE_SEND_QUEUE
1512     if(PCQueueEmpty(queue->smsg_msglist_index[destNode].sendSmsgBuf) || inbuff==1)
1513 #endif
1514     {
1515         //CMI_GNI_LOCK(smsg_mailbox_lock)
1516         CMI_GNI_LOCK(default_tx_cq_lock)
1517 #if CMK_SMP_TRACE_COMMTHREAD
1518         int oldpe = -1;
1519         int oldeventid = -1;
1520         if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG)
1521         { 
1522             START_EVENT();
1523             if ( tag == SMALL_DATA_TAG)
1524                 real_data = (char*)msg; 
1525             else 
1526                 real_data = (char*)(((CONTROL_MSG*)msg)->source_addr);
1527             TRACE_COMM_GET_MSGID(real_data, &oldpe, &oldeventid);
1528             TRACE_COMM_SET_COMM_MSGID(real_data);
1529         }
1530 #endif
1531 #if REMOTE_EVENT
1532         if (tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG) {
1533             CONTROL_MSG *control_msg_tmp = (CONTROL_MSG*)msg;
1534             if (control_msg_tmp->seq_id <= 0 && control_msg_tmp->ack_index == -1)
1535             {
1536                 control_msg_tmp->ack_index = IndexPool_getslot(&ackPool, (void*)control_msg_tmp->source_addr, 1);
1537                 if (control_msg_tmp->ack_index == -1) {    /* table overflow */
1538                     status = GNI_RC_NOT_DONE;
1539                     if (inbuff ==0)
1540                         buffer_small_msgs(queue, msg, size, destNode, tag);
1541                     return status;
1542                 }
1543             }
1544         }
1545 #endif
1546 #if     CMK_WITH_STATS
1547         SMSG_TRY_SEND(tag)
1548 #endif
1549 #if CMK_WITH_STATS
1550         double              creation_time;
1551         if (ptr == NULL)
1552             creation_time = CmiWallTimer();
1553         else
1554             creation_time = ptr->creation_time;
1555 #endif
1557 #if CMK_SMSGS_FREE_AFTER_EVENT
1558         msgid = IndexPool_getslot(&smsgsPool, msg, smsgType);
1559         if(msgid == -1)
1560             CmiAbort("IndexPool for SMSG overflows.");
1561 #endif
1562         if(!useHeader)
1563           status = GNI_SmsgSendWTag(ep_hndl_array[destNode], NULL, 0, msg, size, msgid, tag);
1564         else
1565           status = GNI_SmsgSendWTag(ep_hndl_array[destNode], msg, size, NULL, 0, msgid, tag);
1566 #if CMK_SMP_TRACE_COMMTHREAD
1567         if (oldpe != -1)  TRACE_COMM_SET_MSGID(real_data, oldpe, oldeventid);
1568 #endif
1569         CMI_GNI_UNLOCK(default_tx_cq_lock)
1570         //CMI_GNI_UNLOCK(smsg_mailbox_lock)
1571         if(status == GNI_RC_SUCCESS)
1572         {
1573 #if     CMK_WITH_STATS
1574             SMSG_SENT_DONE(creation_time,tag) 
1575 #endif
1576 #if CMK_SMP_TRACE_COMMTHREAD
1577             if(tag == SMALL_DATA_TAG || tag == LMSG_INIT_TAG || tag == LMSG_OOB_INIT_TAG || tag == LMSG_PERSISTENT_INIT_TAG )
1578             { 
1579                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
1580             }
1581 #endif
1582         }else {
1583             status = GNI_RC_ERROR_RESOURCE;
1584 #if CMK_SMSGS_FREE_AFTER_EVENT
1585             IndexPool_freeslot(&smsgsPool, msgid);
1586 #endif
1587         }
1588     }
1589     if(status != GNI_RC_SUCCESS && inbuff ==0)
1590         buffer_small_msgs(queue, msg, size, destNode, tag);
1591     return status;
1594 INLINE_KEYWORD
1595 static CONTROL_MSG* construct_control_msg(int size, char *msg, int seqno)
1597     /* construct a control message and send */
1598     CONTROL_MSG         *control_msg_tmp;
1599     MallocControlMsg(control_msg_tmp);
1600     control_msg_tmp->source_addr = (uint64_t)msg;
1601     control_msg_tmp->seq_id    = seqno;
1602     control_msg_tmp->total_length = control_msg_tmp->length = ALIGN64(size); //for GET 4 bytes aligned 
1603 #if REMOTE_EVENT
1604     control_msg_tmp->ack_index    =  -1;
1605 #endif
1606 #if     USE_LRTS_MEMPOOL
1607     if(size < BIG_MSG)
1608     {
1609         control_msg_tmp->source_mem_hndl = GetMemHndl(msg);
1610     }
1611     else
1612     {
1613         SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1614         control_msg_tmp->length = size - (seqno-1)*ONE_SEG;
1615         if (control_msg_tmp->length > ONE_SEG) control_msg_tmp->length = ONE_SEG;
1616     }
1617 #else
1618     SetMemHndlZero(control_msg_tmp->source_mem_hndl);
1619 #endif
1620     return control_msg_tmp;
1623 #define BLOCKING_SEND_CONTROL    0
1625 // Large message, send control to receiver, receiver register memory and do a GET, 
1626 // return 1 - send no success
1627 INLINE_KEYWORD static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL_MSG  *control_msg_tmp, int inbuff, MSG_LIST *smsg_ptr, uint8_t lmsg_tag)
1629     gni_return_t        status  =  GNI_RC_ERROR_NOMEM;
1630     uint32_t            vmdh_index  = -1;
1631     int                 size;
1632     int                 offset = 0;
1633     uint64_t            source_addr;
1634     int                 register_size; 
1635     void                *msg;
1637     size    =   control_msg_tmp->total_length;
1638     source_addr = control_msg_tmp->source_addr;
1639     register_size = control_msg_tmp->length;
1641 #if  USE_LRTS_MEMPOOL
1642     if( control_msg_tmp->seq_id <=0 ){
1643 #if BLOCKING_SEND_CONTROL
1644         if (inbuff == 0 && IsMemHndlZero(GetMemHndl(source_addr))) {
1645             while (IsMemHndlZero(GetMemHndl(source_addr)) && buffered_send_msg + GetMempoolsize((void*)source_addr) >= MAX_BUFF_SEND)
1646                 LrtsAdvanceCommunication(0);
1647         }
1648 #endif
1649         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
1650         {
1651             msg = (void*)source_addr;
1652             if(buffered_send_msg + GetMempoolsize(msg) >= MAX_BUFF_SEND)
1653             {
1654                 if(!inbuff)
1655                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1656                 return GNI_RC_ERROR_NOMEM;
1657             }
1658             //register the corresponding mempool
1659             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
1660             if(status == GNI_RC_SUCCESS)
1661             {
1662                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1663             }
1664         }else
1665         {
1666             control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
1667             status = GNI_RC_SUCCESS;
1668         }
1669         if(NoMsgInSend(source_addr))
1670             register_size = GetMempoolsize((void*)(source_addr));
1671         else
1672             register_size = 0;
1673     }else if(control_msg_tmp->seq_id >0)    // BIG_MSG
1674     {
1675         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
1676         source_addr += offset;
1677         size = control_msg_tmp->length;
1678 #if BLOCKING_SEND_CONTROL
1679         if (inbuff == 0 && IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1680             while (IsMemHndlZero(control_msg_tmp->source_mem_hndl) && buffered_send_msg + size >= MAX_BUFF_SEND)
1681                 LrtsAdvanceCommunication(0);
1682         }
1683 #endif
1684         if (IsMemHndlZero(control_msg_tmp->source_mem_hndl)) {
1685             if(buffered_send_msg + size >= MAX_BUFF_SEND)
1686             {
1687                 if(!inbuff)
1688                     buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1689                 return GNI_RC_ERROR_NOMEM;
1690             }
1691             status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), NULL);
1692             if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
1693         }
1694         else
1695         {
1696             status = GNI_RC_SUCCESS;
1697         }
1698         register_size = 0;  
1699     }
1701 #if CMI_EXERT_SEND_LARGE_CAP
1702     if(SEND_large_pending >= SEND_large_cap)
1703     {
1704         status = GNI_RC_ERROR_NOMEM;
1705     }
1706 #endif
1708     if(status == GNI_RC_SUCCESS)
1709     {
1710        status = send_smsg_message( queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, inbuff, smsg_ptr, NONCHARM_SMSG, 0); 
1711         if(status == GNI_RC_SUCCESS)
1712         {
1713 #if CMI_EXERT_SEND_LARGE_CAP
1714             SEND_large_pending++;
1715 #endif
1716             buffered_send_msg += register_size;
1717             if(control_msg_tmp->seq_id == 0)
1718             {
1719                 IncreaseMsgInSend(source_addr);
1720             }
1721 #if ! CMK_SMSGS_FREE_AFTER_EVENT
1722             FreeControlMsg(control_msg_tmp);
1723 #endif
1724             MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, lmsg_tag); 
1725         }else
1726             status = GNI_RC_ERROR_RESOURCE;
1728     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1729     {
1730         CmiAbort("Memory registor for large msg\n");
1731     }else 
1732     {
1733         status = GNI_RC_ERROR_NOMEM; 
1734         if(!inbuff)
1735             buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1736     }
1737     return status;
1738 #else
1739     MEMORY_REGISTER(onesided_hnd, nic_hndl,msg, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, NULL, status)
1740     if(status == GNI_RC_SUCCESS)
1741     {
1742         status = send_smsg_message(queue, destNode, control_msg_tmp, CONTROL_MSG_SIZE, lmsg_tag, 0, NULL, NONCHARM_SMSG, 0);
1743 #if ! CMK_SMSGS_FREE_AFTER_EVENT
1744         if(status == GNI_RC_SUCCESS)
1745         {
1746             FreeControlMsg(control_msg_tmp);
1747         }
1748 #endif
1749     } else if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR)
1750     {
1751         CmiAbort("Memory registor for large msg\n");
1752     }else 
1753     {
1754         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, lmsg_tag);
1755     }
1756     return status;
1757 #endif
1760 INLINE_KEYWORD void LrtsBeginIdle(void) {}
1762 INLINE_KEYWORD void LrtsStillIdle(void) {}
1764 INLINE_KEYWORD void LrtsNotifyIdle(void) {}
1766 INLINE_KEYWORD void LrtsPrepareEnvelope(char *msg, int size)
1768     CmiSetMsgSize(msg, size);
1769     CMI_SET_CHECKSUM(msg, size);
1772 CmiCommHandle LrtsSendFunc(int destNode, int destPE, int size, char *msg, int mode)
1774     gni_return_t        status  =   GNI_RC_SUCCESS;
1775     uint8_t tag;
1776     CONTROL_MSG         *control_msg_tmp;
1777     int                 oob = ( mode & OUT_OF_BAND);
1778     SMSG_QUEUE          *queue;
1780     MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
1781 #if CMK_USE_OOB
1782     queue = oob? &smsg_oob_queue : &smsg_queue;
1783     tag = oob? LMSG_OOB_INIT_TAG: LMSG_INIT_TAG;
1784 #else
1785     queue = &smsg_queue;
1786     tag = LMSG_INIT_TAG;
1787 #endif
1789     LrtsPrepareEnvelope(msg, size);
1791 #if PRINT_SYH
1792     printf("LrtsSendFn %d==>%d, size=%d\n", myrank, destNode, size);
1793 #endif 
1795 #if CMK_SMP 
1796     if(size <= SMSG_MAX_MSG)
1797         buffer_small_msgs(queue, msg, size, destNode, SMALL_DATA_TAG);
1798     else if (size < BIG_MSG) {
1799         control_msg_tmp =  construct_control_msg(size, msg, 0);
1800         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1801     }
1802     else {
1803         CmiSetMsgSeq(msg, 0);
1804         control_msg_tmp =  construct_control_msg(size, msg, 1);
1805         buffer_small_msgs(queue, control_msg_tmp, CONTROL_MSG_SIZE, destNode, tag);
1806     }
1807 #else   //non-smp, smp(worker sending)
1808     if(size <= SMSG_MAX_MSG)
1809     {
1810         if (GNI_RC_SUCCESS == send_smsg_message(queue, destNode,  msg, size, SMALL_DATA_TAG, 0, NULL, CHARM_SMSG, 0))
1811         {  
1812 #if !CMK_SMSGS_FREE_AFTER_EVENT
1813             CmiFree(msg); 
1814 #endif
1815         }
1816     }
1817     else if (size < BIG_MSG) {
1818         control_msg_tmp =  construct_control_msg(size, msg, 0);
1819         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1820     }
1821     else {
1822 #if     USE_LRTS_MEMPOOL
1823         CmiSetMsgSeq(msg, 0);
1824         control_msg_tmp =  construct_control_msg(size, msg, 1);
1825         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1826 #else
1827         control_msg_tmp =  construct_control_msg(size, msg, 0);
1828         send_large_messages(queue, destNode, control_msg_tmp, 0, NULL, tag);
1829 #endif
1830     }
1831 #endif
1832     return 0;
1835 #if 0
1836 // this is no different from the common code
1837 void LrtsSyncListSendFn(int npes, int *pes, int len, char *msg)
1839   int i;
1840 #if CMK_BROADCAST_USE_CMIREFERENCE
1841   for(i=0;i<npes;i++) {
1842     if (pes[i] == CmiMyPe())
1843       CmiSyncSend(pes[i], len, msg);
1844     else {
1845       CmiReference(msg);
1846       CmiSyncSendAndFree(pes[i], len, msg);
1847     }
1848   }
1849 #else
1850   for(i=0;i<npes;i++) {
1851     CmiSyncSend(pes[i], len, msg);
1852   }
1853 #endif
1856 CmiCommHandle LrtsAsyncListSendFn(int npes, int *pes, int len, char *msg)
1858   /* A better asynchronous implementation may be wanted, but at least it works */
1859   CmiSyncListSendFn(npes, pes, len, msg);
1860   return (CmiCommHandle) 0;
1863 void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
1865   if (npes == 1) {
1866       CmiSyncSendAndFree(pes[0], len, msg);
1867       return;
1868   }
1869 #if CMK_PERSISTENT_COMM
1870   if (CpvAccess(phs) && len > PERSIST_MIN_SIZE 
1871 #if CMK_SMP
1872             && IS_PERSISTENT_MEMORY(msg)
1873 #endif
1874      ){
1875       int i;
1876       for(i=0;i<npes;i++) {
1877         if (pes[i] == CmiMyPe())
1878           CmiSyncSend(pes[i], len, msg);
1879         else {
1880           CmiReference(msg);
1881           CmiSyncSendAndFree(pes[i], len, msg);
1882         }
1883       }
1884       CmiFree(msg);
1885       return;
1886   }
1887 #endif
1888   
1889 #if CMK_BROADCAST_USE_CMIREFERENCE
1890   CmiSyncListSendFn(npes, pes, len, msg);
1891   CmiFree(msg);
1892 #else
1893   int i;
1894   for(i=0;i<npes-1;i++) {
1895     CmiSyncSend(pes[i], len, msg);
1896   }
1897   if (npes>0)
1898     CmiSyncSendAndFree(pes[npes-1], len, msg);
1899   else 
1900     CmiFree(msg);
1901 #endif
1903 #endif
1905 static void    PumpDatagramConnection(void);
1906 static      int         event_SetupConnect = 111;
1907 static      int         event_PumpSmsg = 222 ;
1908 static      int         event_PumpTransaction = 333;
1909 static      int         event_PumpRdmaTransaction = 444;
1910 static      int         event_SendBufferSmsg = 484;
1911 static      int         event_SendFmaRdmaMsg = 555;
1912 static      int         event_AdvanceCommunication = 666;
1914 static void registerUserTraceEvents(void) {
1915 #if CMI_MACH_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
1916     event_SetupConnect = traceRegisterUserEvent("setting up connections", -1 );
1917     event_PumpSmsg = traceRegisterUserEvent("Pump network small msgs", -1);
1918     event_PumpTransaction = traceRegisterUserEvent("Pump FMA/RDMA local transaction" , -1);
1919     event_PumpRdmaTransaction = traceRegisterUserEvent("Pump RDMA remote event" , -1);
1920     event_SendBufferSmsg = traceRegisterUserEvent("Sending buffered small msgs", -1);
1921     event_SendFmaRdmaMsg = traceRegisterUserEvent("Sending buffered fma/rdma transactions", -1);
1922     event_AdvanceCommunication = traceRegisterUserEvent("Worker thread in sending/receiving", -1);
1923 #endif
1926 static void ProcessDeadlock(void)
1928     static CmiUInt8 *ptr = NULL;
1929     static CmiUInt8  last = 0, mysum, sum;
1930     static int count = 0;
1931     gni_return_t status;
1932     int i;
1934 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
1935 //sweep_mempool(CpvAccess(mempool));
1936     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
1937     mysum = smsg_send_count + smsg_recv_count;
1938     MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1939     status = (gni_return_t)PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
1940     GNI_RC_CHECK("PMI_Allgather", status);
1941     sum = 0;
1942     for (i=0; i<mysize; i++)  sum+= ptr[i];
1943     if (last == 0 || sum == last) 
1944         count++;
1945     else
1946         count = 0;
1947     last = sum;
1948     MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
1949     if (count == 2) { 
1950         /* detected twice, it is a real deadlock */
1951         if (myrank == 0)  {
1952             CmiPrintf("Charm++> Network progress engine appears to have stalled, possibly because registered memory limits have been exceeded or are too low.  Try adjusting environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX (current limits are %lld and %lld).\n", MAX_REG_MEM, MAX_BUFF_SEND);
1953             CmiAbort("Fatal> Deadlock detected.");
1954         }
1956     }
1957     _detected_hang = 0;
1960 static void CheckProgress(void)
1962     if (smsg_send_count == last_smsg_send_count &&
1963         smsg_recv_count == last_smsg_recv_count ) 
1964     {
1965         _detected_hang = 1;
1966 #if !CMK_SMP
1967         if (_detected_hang) ProcessDeadlock();
1968 #endif
1970     }
1971     else {
1972         //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
1973         last_smsg_send_count = smsg_send_count;
1974         last_smsg_recv_count = smsg_recv_count;
1975         _detected_hang = 0;
1976     }
1979 static void set_limit(void)
1981     //if (!user_set_flag && CmiMyRank() == 0) {
1982     if (CmiMyRank() == 0) {
1983         int mynode = CmiPhysicalNodeID(CmiMyPe());
1984         int numpes = CmiNumPesOnPhysicalNode(mynode);
1985         int numprocesses = numpes / CmiMyNodeSize();
1986         MAX_REG_MEM  = _totalmem / numprocesses;
1987         MAX_BUFF_SEND = MAX_REG_MEM / 2;
1988         if (CmiMyPe() == 0)
1989            printf("mem_max = %.2fM, send_max =%.2fM\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024./1024);
1990         if(CmiMyPe() == 0 && (smsg_memlen*mysize + _expand_mem > MAX_BUFF_SEND ||  smsg_memlen*mysize + _mempool_size > MAX_BUFF_SEND))
1991         {
1992              printf("Charm++> FATAL ERROR your program has risk of hanging \n please try large page or use Dynamic smsg +useDynamicSmsg or contact Charm++ developers\n");
1993              CmiAbort("memory registration\n");
1994         }
1995     }
1998 void LrtsPostCommonInit(int everReturn)
2000 #if CMK_DIRECT
2001     CmiDirectInit();
2002 #endif
2003 #if CMI_MACH_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
2004     CpvInitialize(double, projTraceStart);
2005     /* only PE 0 needs to care about registration (to generate sts file). */
2006     //if (CmiMyPe() == 0) 
2007     {
2008         registerMachineUserEventsFunction(&registerUserTraceEvents);
2009     }
2010 #endif
2012 #if !CMK_SMP
2013     if (useDynamicSMSG)
2014     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
2015 #endif
2017 #if ! LARGEPAGE
2018     if (_checkProgress)
2019 #if CMK_SMP
2020     if (CmiMyRank() == 0)
2021 #endif
2022     CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
2023 #endif
2025 #if !LARGEPAGE
2026     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)set_limit, NULL);
2027 #endif
2030 /* this is called by worker thread */
2031 void LrtsPostNonLocal(void)
2033 #if 1
2035 #if CMK_SMP_TRACE_COMMTHREAD
2036     double startT, endT;
2037 #endif
2039 #if MULTI_THREAD_SEND
2040     if(mysize == 1) return;
2042     if (CmiMyRank() % 6 != 3) return;
2044 #if CMK_SMP_TRACE_COMMTHREAD
2045     traceEndIdle();
2046     startT = CmiWallTimer();
2047 #endif
2049     CmiMachineProgressImpl();
2051 #if CMK_SMP_TRACE_COMMTHREAD
2052     endT = CmiWallTimer();
2053     traceUserBracketEvent(event_AdvanceCommunication, startT, endT);
2054     traceBeginIdle();
2055 #endif
2057 #endif
2058 #endif
2061 /* Network progress function is used to poll the network when for
2062    messages. This flushes receive buffers on some  implementations*/
2063 #if CMK_MACHINE_PROGRESS_DEFINED
2064 void CmiMachineProgressImpl(void) {
2065 #if ! CMK_SMP || MULTI_THREAD_SEND
2067     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
2068     SEND_OOB_SMSG(smsg_oob_queue)
2069     PUMP_REMOTE_HIGHPRIORITY
2070     PUMP_LOCAL_HIGHPRIORITY
2071     POST_HIGHPRIORITY_RDMA
2073 #if 0
2074 #if CMK_WORKER_SINGLE_TASK
2075     if (CmiMyRank() % 6 == 0)
2076 #endif
2077     PumpNetworkSmsg();
2079 #if CMK_WORKER_SINGLE_TASK
2080     if (CmiMyRank() % 6 == 1)
2081 #endif
2082     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
2084 #if CMK_WORKER_SINGLE_TASK
2085     if (CmiMyRank() % 6 == 2)
2086 #endif
2087     PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
2089 #if REMOTE_EVENT
2090 #if CMK_WORKER_SINGLE_TASK
2091     if (CmiMyRank() % 6 == 3)
2092 #endif
2093     PumpRemoteTransactions(rdma_rx_cqh);         // rdma_rx_cqh
2094 #endif
2096 #if CMK_WORKER_SINGLE_TASK
2097     if (CmiMyRank() % 6 == 4)
2098 #endif
2099     {
2100 #if CMK_USE_OOB
2101     SendBufferMsg(&smsg_oob_queue, NULL);
2102     SendBufferMsg(&smsg_queue, &smsg_oob_queue);
2103 #else
2104     SendBufferMsg(&smsg_queue, NULL);
2105 #endif
2106     }
2108 #if CMK_WORKER_SINGLE_TASK
2109     if (CmiMyRank() % 6 == 5)
2110 #endif
2111 #if CMK_SMP
2112     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
2113 #else
2114     STATS_SENDRDMAMSG_TIME(SendRdmaMsg());
2115 #endif
2117 #endif
2118 #endif
2120 #endif
2123 /* useDynamicSMSG */
2124 static void    PumpDatagramConnection(void)
2126     uint32_t          remote_address;
2127     uint32_t          remote_id;
2128     gni_return_t status;
2129     gni_post_state_t  post_state;
2130     uint64_t          datagram_id;
2131     int i;
2133    while ((status = GNI_PostDataProbeById(nic_hndl, &datagram_id)) == GNI_RC_SUCCESS)
2134    {
2135        if (datagram_id >= mysize) {           /* bound endpoint */
2136            int pe = datagram_id - mysize;
2137            CMI_GNI_LOCK(global_gni_lock)
2138            status = GNI_EpPostDataTestById( ep_hndl_array[pe], datagram_id, &post_state, &remote_address, &remote_id);
2139            CMI_GNI_UNLOCK(global_gni_lock)
2140            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2141            {
2142                CmiAssert(remote_id == pe);
2143                status = GNI_SmsgInit(ep_hndl_array[pe], smsg_attr_vector_local[pe], smsg_attr_vector_remote[pe]);
2144                GNI_RC_CHECK("Dynamic SMSG Init", status);
2145 #if PRINT_SYH
2146                printf("[%d] ++ Dynamic SMSG setup [%d===>%d] done\n", myrank, myrank, pe);
2147 #endif
2148                CmiAssert(smsg_connected_flag[pe] == 1);
2149                smsg_connected_flag[pe] = 2;
2150            }
2151        }
2152        else {         /* unbound ep */
2153            status = GNI_EpPostDataTestById( ep_hndl_unbound, datagram_id, &post_state, &remote_address, &remote_id);
2154            if(status == GNI_RC_SUCCESS && post_state == GNI_POST_COMPLETED)
2155            {
2156                CmiAssert(remote_id<mysize);
2157                CmiAssert(smsg_connected_flag[remote_id] <= 0);
2158                status = GNI_SmsgInit(ep_hndl_array[remote_id], &send_smsg_attr, &recv_smsg_attr);
2159                GNI_RC_CHECK("Dynamic SMSG Init", status);
2160 #if PRINT_SYH
2161                printf("[%d] ++ Dynamic SMSG setup2 [%d===>%d] done\n", myrank, myrank, remote_id);
2162 #endif
2163                smsg_connected_flag[remote_id] = 2;
2165                alloc_smsg_attr(&send_smsg_attr);
2166                status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
2167                GNI_RC_CHECK("post unbound datagram", status);
2168            }
2169        }
2170    }
2173 /* pooling CQ to receive network message */
2174 static void PumpNetworkRdmaMsgs(void)
2176     gni_cq_entry_t      event_data;
2177     gni_return_t        status;
2181 INLINE_KEYWORD
2182 static void bufferRdmaMsg(PCQueue bufferqueue, int inst_id, gni_post_descriptor_t *pd, int ack_index)
2184     RDMA_REQUEST        *rdma_request_msg;
2185     MallocRdmaRequest(rdma_request_msg);
2186     rdma_request_msg->destNode = inst_id;
2187     rdma_request_msg->pd = pd;
2188 #if REMOTE_EVENT
2189     rdma_request_msg->ack_index = ack_index;
2190 #endif
2191     PCQueuePush(bufferqueue, (char*)rdma_request_msg);
2194 static void getLargeMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
2195 static void getPersistentMsgRequest(void* header, uint64_t inst_id,  uint8_t tag, PCQueue);
2196 static void PRINT_CONTROL(void *header)
2198     CONTROL_MSG *control_msg = (CONTROL_MSG *) header;
2200     printf(" length=%d , seq_id = %d, addr = %lld:%lld:%lld  \n", control_msg->length, control_msg->seq_id, control_msg->source_addr, (control_msg->source_mem_hndl).qword1, (control_msg->source_mem_hndl).qword2 );
2201 #if PERSISTENT_GET_BASE
2202     printf(" memhdl = %lld:%lld:%lld \n", control_msg->dest_addr, (control_msg->dest_mem_hndl).qword1, (control_msg->dest_mem_hndl).qword2);
2203 #endif
2205 static void PumpNetworkSmsg()
2207     uint64_t            inst_id;
2208     gni_cq_entry_t      event_data;
2209     gni_return_t        status, deregStatus;
2210     void                *header;
2211     uint8_t             msg_tag;
2212     int                 msg_nbytes;
2213     void                *msg_data;
2214     gni_mem_handle_t    msg_mem_hndl;
2215     gni_smsg_attr_t     *smsg_attr;
2216     gni_smsg_attr_t     *remote_smsg_attr;
2217     int                 init_flag;
2218     CONTROL_MSG         *control_msg_tmp, *header_tmp;
2219     uint64_t            source_addr;
2220     SMSG_QUEUE         *queue = &smsg_queue;
2221     PCQueue             tmp_queue;
2222     int                 recvInfoSize;
2223 #if  CMK_DIRECT
2224     cmidirectMsg        *direct_msg;
2225 #endif
2226 #if CMI_PUMPNETWORKSMSG_CAP 
2227     int                  recv_cnt = 0;
2228     while(recv_cnt< PumpNetworkSmsg_cap) {
2229 #else
2230     while(1) {
2231 #endif
2232         CMI_GNI_LOCK(smsg_rx_cq_lock)
2233         status =GNI_CqGetEvent(smsg_rx_cqh, &event_data);
2234         CMI_GNI_UNLOCK(smsg_rx_cq_lock)
2235         if(status != GNI_RC_SUCCESS) break;
2237         inst_id = GNI_CQ_GET_REM_INST_ID(event_data);
2238 #if REMOTE_EVENT
2239         inst_id = GET_RANK(inst_id);      /* important */
2240 #endif
2241         // GetEvent returns success but GetNext return not_done. caused by Smsg out-of-order transfer
2242 #if PRINT_SYH
2243         printf("[%d] %d PumpNetworkMsgs is received from PE: %d,  status=%s\n", myrank, CmiMyRank(), inst_id,  gni_err_str[status]);
2244 #endif
2245         if (useDynamicSMSG) {
2246             /* subtle: smsg may come before connection is setup */
2247             while (smsg_connected_flag[inst_id] != 2) 
2248                PumpDatagramConnection();
2249         }
2250         msg_tag = GNI_SMSG_ANY_TAG;
2251         while(1) {
2252             CMI_GNI_LOCK(smsg_mailbox_lock)
2253             status = GNI_SmsgGetNextWTag(ep_hndl_array[inst_id], &header, &msg_tag);
2254             if (status != GNI_RC_SUCCESS)
2255             {
2256                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2257                 break;
2258             }
2259 #if         CMI_PUMPNETWORKSMSG_CAP
2260             recv_cnt++; 
2261 #endif
2262 #if PRINT_SYH
2263             printf("[%d] from %d smsg msg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2264 #endif
2265             /* copy msg out and then put into queue (small message) */
2266             switch (msg_tag) {
2267             case SMALL_DATA_TAG:
2268             {
2269                 msg_nbytes = CmiGetMsgSize(header);
2270                 CmiAssert(msg_nbytes > 0);
2271                 msg_data    = CmiAlloc(msg_nbytes);
2272                 memcpy(msg_data, (char*)header, msg_nbytes);
2273                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2274                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2275                 CMI_CHECK_CHECKSUM(msg_data, msg_nbytes);
2276                 handleOneRecvedMsg(msg_nbytes, (char *)msg_data);
2277                 break;
2278             }
2279             case LMSG_PERSISTENT_INIT_TAG:
2280             {   CMI_GNI_UNLOCK(smsg_mailbox_lock)
2281                 getPersistentMsgRequest(header, inst_id, msg_tag, sendRdmaBuf);
2282                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2283                 break;
2284             }
2285             case LMSG_INIT_TAG:
2286             case LMSG_OOB_INIT_TAG:
2287             {
2288                 tmp_queue = (msg_tag == LMSG_INIT_TAG)? sendRdmaBuf : sendHighPriorBuf; 
2289 #if MULTI_THREAD_SEND
2290                 MallocControlMsg(control_msg_tmp);
2291                 memcpy(control_msg_tmp, header, CONTROL_MSG_SIZE);
2292                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2293                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2294                 getLargeMsgRequest(control_msg_tmp, inst_id, msg_tag, tmp_queue);
2295                 FreeControlMsg(control_msg_tmp);
2296 #else
2297                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2298                 getLargeMsgRequest(header, inst_id, msg_tag, tmp_queue);
2299                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2300 #endif
2301                 break;
2302             }
2303 #if !REMOTE_EVENT && !CQWRITE
2304             case ACK_TAG:   //msg fit into mempool
2305             {
2306                 /* Get is done, release message . Now put is not used yet*/
2307                 void *msg = (void*)(((ACK_MSG *)header)->source_addr);
2308                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2309                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2310 #if ! USE_LRTS_MEMPOOL
2311                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2312 #else
2313                 DecreaseMsgInSend(msg);
2314 #endif
2315                 if(NoMsgInSend(msg))
2316                     buffered_send_msg -= GetMempoolsize(msg);
2317                 MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
2318                 CmiFree(msg);
2319 #if CMI_EXERT_SEND_LARGE_CAP
2320                 SEND_large_pending--;
2321 #endif
2322                 break;
2323             }
2324 #endif
2325 #if CMK_ONESIDED_IMPL
2326             case RDMA_ACK_TAG:
2327             {
2328                 CmiGNIAckOp_t *ack_data = (CmiGNIAckOp_t *)header;
2329                 CmiRdmaAck * ack = ack_data->ack;
2330                 gni_mem_handle_t mem_hndl = ack_data->mem_hndl;
2331                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2332                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2334                 // call the fnPtr to handle the ack
2335                 ack->fnPtr(ack->token);
2337                 // free callback structure, CmiRdmaAck allocated in CmiSetRdmaAck
2338                 free(ack);
2340                 // Deregister registered sender memory used for GET
2341                 status = GNI_MemDeregister(nic_hndl, &mem_hndl);
2342                 GNI_RC_CHECK("GNI_MemDeregister on Sender for GET operation", status);
2343                 break;
2344             }
2345             case RDMA_PUT_MD_TAG:
2346             {
2347                 CmiGNIRzvRdmaRecv_t *recvInfo = (CmiGNIRzvRdmaRecv_t *)header;
2348                 recvInfoSize = LrtsGetRdmaRecvInfoSize(recvInfo->numOps);
2349                 CmiGNIRzvRdmaRecv_t *newRecvInfo = (CmiGNIRzvRdmaRecv_t *)malloc(recvInfoSize);
2350                 memcpy(newRecvInfo, recvInfo, recvInfoSize);
2351                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2352                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2354                 LrtsIssueRputs((void *)newRecvInfo, recvInfo->destNode);
2355                 break;
2356             }
2357             case RDMA_PUT_MD_DIRECT_TAG:
2358             {
2359                 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)header;
2361                 // copy into a new object
2362                 NcpyOperationInfo *newNcpyOpInfo = (NcpyOperationInfo *)CmiAlloc(ncpyOpInfo->ncpyOpInfoSize);
2363                 memcpy(newNcpyOpInfo, ncpyOpInfo, ncpyOpInfo->ncpyOpInfoSize);
2365                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2366                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2368                 post_rdma((uint64_t)newNcpyOpInfo->destPtr,
2369                           ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->destLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl,
2370                           (uint64_t)newNcpyOpInfo->srcPtr,
2371                           ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->srcLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl,
2372                           newNcpyOpInfo->srcSize,
2373                           (uint64_t)newNcpyOpInfo,
2374                           CmiNodeOf(newNcpyOpInfo->destPe),
2375                           GNI_POST_RDMA_PUT,
2376                           DIRECT_SEND_RECV);
2378                 break;
2379             }
2380             case RDMA_PUT_DONE_TAG:
2381             {
2382                 CmiGNIRzvRdmaRecv_t *recvInfo = (CmiGNIRzvRdmaRecv_t *)header;
2384                 /* copy the received message (recvInfo) into newRecvInfo as
2385                  * recvInfo is invalid after SmsgRelease call
2386                  */
2387                 recvInfoSize = LrtsGetRdmaRecvInfoSize(recvInfo->numOps);
2388                 CmiGNIRzvRdmaRecv_t *newRecvInfo = (CmiGNIRzvRdmaRecv_t *)malloc(recvInfoSize);
2389                 memcpy(newRecvInfo, recvInfo, recvInfoSize);
2390                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2391                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2393                 char *msg = (char *)newRecvInfo->msg;
2394                 int size = CmiGetMsgSize(msg);
2396                 handleOneRecvedMsg(size, msg);
2398                 int i;
2399                 for(i=0; i<newRecvInfo->numOps;i++){
2400                     CmiGNIRzvRdmaRecvOp_t * recvOp = &newRecvInfo->rdmaOp[i];
2401                     // Deregister registered receiver memory used for PUT
2402                     status = GNI_MemDeregister(nic_hndl, &(recvOp->remote_mem_hndl));
2403                     GNI_RC_CHECK("GNI_MemDeregister on Receiver for PUT operation", status);
2404                 }
2405                 // free newRecvInfo as it is no longer used
2406                 free(newRecvInfo);
2407                 break;
2408             }
2409             case RDMA_PUT_DONE_DIRECT_TAG:
2410             {
2411                 // Direct API when PUT is used instead of a GET
2412                 // This tag implies the completion of an indirect PUT operation used for the Direct API
2413                 CmiGNIRzvRdmaDirectInfo_t *putOp = (CmiGNIRzvRdmaDirectInfo_t *)header;
2414                 void *token = (void *)putOp->ref;
2415                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2416                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2418                 // Invoke the ack handler function
2419                 CmiInvokeNcpyAck(token);
2420                 break;
2421             }
2422             case RDMA_DEREG_DIRECT_TAG:
2423             {
2424                 // Direct API
2425                 // This tag implies a request to free the memory handle local to this node
2426                 gni_mem_handle_t *memhndl = (gni_mem_handle_t *)header;
2427                 status = GNI_MemDeregister(nic_hndl, memhndl);
2428                 GNI_RC_CHECK("GNI_MemDeregister Failed!", status);
2429                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2430                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2431                 break;
2432             }
2433             case RDMA_REG_AND_PUT_MD_DIRECT_TAG:
2434             {
2435                 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)header;
2437                 // copy into a new object
2438                 NcpyOperationInfo *newNcpyOpInfo = (NcpyOperationInfo *)CmiAlloc(ncpyOpInfo->ncpyOpInfoSize);
2439                 memcpy(newNcpyOpInfo, ncpyOpInfo, ncpyOpInfo->ncpyOpInfoSize);
2441                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2442                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2444                 resetNcpyOpInfoPointers(newNcpyOpInfo);
2446                 // Register source buffer
2447                 ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->srcLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl = 
2448                                             registerDirectMem(newNcpyOpInfo->srcPtr,
2449                                                                 newNcpyOpInfo->srcSize,
2450                                                                 GNI_MEM_READ_ONLY);
2452                 post_rdma((uint64_t)newNcpyOpInfo->destPtr,
2453                           ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->destLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl,
2454                           (uint64_t)newNcpyOpInfo->srcPtr,
2455                           ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->srcLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl,
2456                           newNcpyOpInfo->srcSize,
2457                           (uint64_t)newNcpyOpInfo,
2458                           CmiNodeOf(newNcpyOpInfo->destPe),
2459                           GNI_POST_RDMA_PUT,
2460                           DIRECT_SEND_RECV);
2462                 break;
2463             }
2464             case RDMA_REG_AND_GET_MD_DIRECT_TAG:
2465             {
2466                 NcpyOperationInfo *ncpyOpInfo = (NcpyOperationInfo *)header;
2468                 // copy into a new object
2469                 NcpyOperationInfo *newNcpyOpInfo = (NcpyOperationInfo *)CmiAlloc(ncpyOpInfo->ncpyOpInfoSize);
2470                 memcpy(newNcpyOpInfo, ncpyOpInfo, ncpyOpInfo->ncpyOpInfoSize);
2472                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2473                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2475                 resetNcpyOpInfoPointers(newNcpyOpInfo);
2477                 ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->destLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl =
2478                                               registerDirectMem(newNcpyOpInfo->destPtr,
2479                                                                 newNcpyOpInfo->srcSize,
2480                                                                 GNI_MEM_READWRITE);
2482                 post_rdma((uint64_t)newNcpyOpInfo->srcPtr,
2483                           ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->srcLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl,
2484                           (uint64_t)newNcpyOpInfo->destPtr,
2485                           ((CmiGNIRzvRdmaPtr_t *)((char *)(newNcpyOpInfo->destLayerInfo) + CmiGetRdmaCommonInfoSize()))->mem_hndl,
2486                           newNcpyOpInfo->srcSize,
2487                           (uint64_t)newNcpyOpInfo,
2488                           CmiNodeOf(newNcpyOpInfo->srcPe),
2489                           GNI_POST_RDMA_GET,
2490                           DIRECT_SEND_RECV);
2492                  break;
2493             }
2494 #endif
2495             case BIG_MSG_TAG:  //big msg, de-register, transfer next seg
2496             {
2497 #if MULTI_THREAD_SEND
2498                 MallocControlMsg(header_tmp);
2499                 memcpy(header_tmp, header, CONTROL_MSG_SIZE);
2500                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2501 #else
2502                 header_tmp = (CONTROL_MSG *) header;
2503 #endif
2504                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2505 #if CMI_EXERT_SEND_LARGE_CAP
2506                 SEND_large_pending--;
2507 #endif
2508                 void *msg = (void*)(header_tmp->source_addr);
2509                 int cur_seq = CmiGetMsgSeq(msg);
2510                 int offset = ONE_SEG*(cur_seq+1);
2511                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
2512                 buffered_send_msg -= header_tmp->length;
2513                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
2514                 if (remain_size < 0) remain_size = 0;
2515                 CmiSetMsgSize(msg, remain_size);
2516                 if(remain_size <= 0) //transaction done
2517                 {
2518                     CmiFree(msg);
2519                 }else if (header_tmp->total_length > offset)
2520                 {
2521                     CmiSetMsgSeq(msg, cur_seq+1);
2522                     control_msg_tmp = construct_control_msg(header_tmp->total_length, (char *)msg, cur_seq+1+1);
2523                     control_msg_tmp->dest_addr = header_tmp->dest_addr;
2524                     //send next seg
2525                     send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2526                          // pipelining
2527                     if (header_tmp->seq_id == 1) {
2528                       int i;
2529                       for (i=1; i<BIG_MSG_PIPELINE; i++) {
2530                         int seq = cur_seq+i+2;
2531                         CmiSetMsgSeq(msg, seq-1);
2532                         control_msg_tmp =  construct_control_msg(header_tmp->total_length, (char *)msg, seq);
2533                         control_msg_tmp->dest_addr = header_tmp->dest_addr;
2534                         send_large_messages( queue, inst_id, control_msg_tmp, 0, NULL, LMSG_INIT_TAG);
2535                         if (header_tmp->total_length <= ONE_SEG*seq) break;
2536                       }
2537                     }
2538                 }
2539 #if MULTI_THREAD_SEND
2540                 FreeControlMsg(header_tmp);
2541 #else
2542                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2543 #endif
2544                 break;
2545             }
2546 #if CMK_PERSISTENT_COMM_PUT && !REMOTE_EVENT && !CQWRITE
2547             case PUT_DONE_TAG:  {   //persistent message
2548                 void *msg = (void *)(((CONTROL_MSG *) header)->source_addr);
2549                 int size = ((CONTROL_MSG *) header)->length;
2550                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2551                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2552                 CmiReference(msg);
2553                 CMI_CHECK_CHECKSUM(msg, size);
2554                 handleOneRecvedMsg(size, msg); 
2555 #if PRINT_SYH
2556                 printf("[%d] PUT_DONE_TAG hand over one message, size: %d. \n", myrank, size);
2557 #endif
2558                 break;
2559             }
2560 #endif
2561 #if CMK_DIRECT
2562             case DIRECT_PUT_DONE_TAG:  //cmi direct 
2563                 //create a trigger message
2564                 direct_msg = (cmidirectMsg*)CmiAlloc(sizeof(cmidirectMsg));
2565                 direct_msg->handler = ((CMK_DIRECT_HEADER*)header)->handler_addr;
2566                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2567                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2568                 CmiSetHandler(direct_msg, CpvAccess(CmiHandleDirectIdx));
2569                 CmiPushPE(((CmiDirectUserHandle*)direct_msg->handler)->remoteRank, direct_msg);
2570                 //(*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
2571                 break;
2572 #endif
2573             default:
2574                 GNI_SmsgRelease(ep_hndl_array[inst_id]);
2575                 CMI_GNI_UNLOCK(smsg_mailbox_lock)
2576                 printf("weird tag problem %d \n", msg_tag);
2577                 CmiAbort("Unknown tag\n");
2578             }               // end switch
2579 #if PRINT_SYH
2580             printf("[%d] from %d after switch request for smsg is received, messageid: tag=%d\n", myrank, inst_id, msg_tag);
2581 #endif
2582             smsg_recv_count ++;
2583             msg_tag = GNI_SMSG_ANY_TAG;
2584         } //endwhile GNI_SmsgGetNextWTag
2585     }   //end while GetEvent
2586     if(status == GNI_RC_ERROR_RESOURCE)
2587     {
2588         printf("charm> Please use +useRecvQueue %d in your command line, if the error comes again, increase this number\n", REMOTE_QUEUE_ENTRIES*2);
2589         GNI_RC_CHECK("Smsg_rx_cq full", status);
2590     }
2593 static void printDesc(gni_post_descriptor_t *pd)
2595     printf(" Descriptor (%p===>%p)(%d)\n", pd->local_addr, pd->remote_addr, pd->length); 
2598 #if CQWRITE
2599 static void sendCqWrite(int destNode, uint64_t data, gni_mem_handle_t mem_hndl)
2601     gni_post_descriptor_t *pd;
2602     gni_return_t        status = GNI_RC_SUCCESS;
2603     
2604     MallocPostDesc(pd);
2605     pd->type = GNI_POST_CQWRITE;
2606     pd->cq_mode = GNI_CQMODE_SILENT;
2607     //pd->cq_mode = GNI_CQMODE_GLOBAL_EVENT | GNI_CQMODE_REMOTE_EVENT ;
2608     pd->dlvr_mode = GNI_DLVMODE_PERFORMANCE;
2609     pd->cqwrite_value = data;
2610     pd->remote_mem_hndl = mem_hndl;
2611     status = GNI_PostCqWrite(ep_hndl_array[destNode], pd);
2612     GNI_RC_CHECK("GNI_PostCqWrite", status);
2614 #endif
2616 // register memory for a message
2617 // return mem handle
2618 static gni_return_t  registerMessage(void *msg, int size, int seqno, gni_mem_handle_t *memh)
2620     gni_return_t status = GNI_RC_SUCCESS;
2622     if (!IsMemHndlZero(*memh)) return GNI_RC_SUCCESS;
2624 #if CMK_PERSISTENT_COMM_PUT
2625       // persistent message is always registered
2626       // BIG_MSG small pieces do not have malloc chunk header
2627     if (IS_PERSISTENT_MEMORY(msg)) {
2628         *memh = GetMemHndl(msg);
2629         return GNI_RC_SUCCESS;
2630     }
2631 #endif
2632     if(seqno == 0 
2633 #if CMK_PERSISTENT_COMM_PUT
2634          || seqno == PERSIST_SEQ
2635 #endif
2636       )
2637     {
2638         if(IsMemHndlZero((GetMemHndl(msg))))
2639         {
2640             msg = (void*)(msg);
2641             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)), rdma_rx_cqh);
2642             if(status == GNI_RC_SUCCESS)
2643                 *memh = GetMemHndl(msg);
2644         }
2645         else {
2646             *memh = GetMemHndl(msg);
2647         }
2648     }
2649     else {
2650         //big msg, can not fit into memory pool, or CmiDirect Msg (which is not from mempool)
2651         status = registerMemory(msg, size, memh, NULL); 
2652     }
2653     return status;
2656 static void getPersistentMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2658 #if   PERSISTENT_GET_BASE
2659     CONTROL_MSG         *request_msg;
2660     gni_return_t        status;
2661     gni_post_descriptor_t *pd;
2662     request_msg = (CONTROL_MSG *) header;
2664     MallocPostDesc(pd);
2665     pd->cqwrite_value = request_msg->seq_id;
2666     pd->first_operand = ALIGN64(request_msg->length); //  total length
2667     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2668         pd->type            = GNI_POST_FMA_GET;
2669     else
2670         pd->type            = GNI_POST_RDMA_GET;
2671     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT ;
2672     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2673     pd->length          = ALIGN64(request_msg->length);
2674     pd->local_addr      = (uint64_t) request_msg->dest_addr;
2675     pd->local_mem_hndl  = request_msg->dest_mem_hndl;
2676     pd->remote_addr     = (uint64_t) request_msg->source_addr;
2677     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2678     pd->src_cq_hndl     = 0;
2679     pd->rdma_mode       = 0;
2680     pd->amo_cmd         = (gni_fma_cmd_type_t)0;
2681 #if REMOTE_EVENT
2682     bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
2683 #else
2684     bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
2685 #endif
2687 #endif
2690 // for BIG_MSG called on receiver side for receiving control message
2691 // LMSG_INIT_TAG
2692 static void getLargeMsgRequest(void* header, uint64_t inst_id, uint8_t tag, PCQueue bufferRdmaQueue )
2694 #if     USE_LRTS_MEMPOOL
2695     CONTROL_MSG         *request_msg;
2696     gni_return_t        status = GNI_RC_SUCCESS;
2697     void                *msg_data;
2698     gni_post_descriptor_t *pd;
2699     gni_mem_handle_t    msg_mem_hndl;
2700     int                 size, transaction_size, offset = 0;
2701     size_t              register_size = 0;
2703     // initial a get to transfer data from the sender side */
2704     request_msg = (CONTROL_MSG *) header;
2705     size = request_msg->total_length;
2706     MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2707     MallocPostDesc(pd);
2708 #if CMK_WITH_STATS 
2709     pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2710 #endif
2711     if(request_msg->seq_id < 2)   {
2712         MACHSTATE2(8, "%d seq id in get large msg requrest %d\n", CmiMyRank(), request_msg->seq_id);
2713 #if CMK_SMP_TRACE_COMMTHREAD 
2714         pd->sync_flag_addr = 1000000 * CmiWallTimer(); //microsecond
2715 #endif
2716         msg_data = CmiAlloc(size);
2717         CmiSetMsgSeq(msg_data, 0);
2718         _MEMCHECK(msg_data);
2719     }
2720     else {
2721         offset = ONE_SEG*(request_msg->seq_id-1);
2722         msg_data = (char*)request_msg->dest_addr + offset;
2723     }
2724    
2725     pd->cqwrite_value = request_msg->seq_id;
2727     transaction_size = request_msg->seq_id == 0? ALIGN64(size) : ALIGN64(request_msg->length);
2728     SetMemHndlZero(pd->local_mem_hndl);
2729     status = registerMessage(msg_data, transaction_size, request_msg->seq_id, &pd->local_mem_hndl);
2730     if (status == GNI_RC_SUCCESS && request_msg->seq_id == 0) {
2731         if(NoMsgInRecv( (void*)(msg_data)))
2732             register_size = GetMempoolsize((void*)(msg_data));
2733     }
2735     pd->first_operand = ALIGN64(size);                   //  total length
2737     if(request_msg->total_length <= LRTS_GNI_RDMA_THRESHOLD)
2738         pd->type            = GNI_POST_FMA_GET;
2739     else
2740         pd->type            = GNI_POST_RDMA_GET;
2741     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;
2742     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2743     pd->length          = transaction_size;
2744     pd->local_addr      = (uint64_t) msg_data;
2745     pd->remote_addr     = request_msg->source_addr + offset;
2746     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2748     if (tag == LMSG_OOB_INIT_TAG) 
2749         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2750     else
2751     {
2752 #if MULTI_THREAD_SEND
2753         pd->src_cq_hndl     = rdma_tx_cqh;
2754 #else
2755         pd->src_cq_hndl     = 0;
2756 #endif
2757     }
2759     pd->rdma_mode       = (gni_fma_cmd_type_t)0;
2760     pd->amo_cmd         = (gni_fma_cmd_type_t)0;
2761 #if CMI_EXERT_RECV_RDMA_CAP
2762     if(status == GNI_RC_SUCCESS && RDMA_pending >= RDMA_cap ) status = GNI_RC_ERROR_RESOURCE; 
2763 #endif
2764     //memory registration success
2765     if(status == GNI_RC_SUCCESS && tag == LMSG_OOB_INIT_TAG )
2766     {
2767         CmiNodeLock lock = pd->type == GNI_POST_RDMA_GET?rdma_tx_cq_lock:default_tx_cq_lock;
2768         CMI_GNI_LOCK(lock)
2769 #if REMOTE_EVENT
2770         if( request_msg->seq_id == 0)
2771         {
2772             pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
2773             int sts = GNI_EpSetEventData(ep_hndl_array[inst_id], inst_id, ACK_EVENT(request_msg->ack_index));
2774             GNI_RC_CHECK("GNI_EpSetEventData", sts);
2775         }
2776 #endif
2778 #if CMK_WITH_STATS
2779         RDMA_TRY_SEND(pd->type)
2780 #endif
2781         if(pd->type == GNI_POST_RDMA_GET) 
2782         {
2783             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2784         }
2785         else
2786         {
2787             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2788         }
2789         CMI_GNI_UNLOCK(lock)
2791         if(status == GNI_RC_SUCCESS )
2792         {
2793 #if CMI_EXERT_RECV_RDMA_CAP
2794             RDMA_pending++;
2795 #endif
2796             if(pd->cqwrite_value == 0)
2797             {
2798 #if MACHINE_DEBUG_LOG
2799                 buffered_recv_msg += register_size;
2800                 MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
2801 #endif
2802                 IncreaseMsgInRecv(msg_data);
2803 #if CMK_SMP_TRACE_COMMTHREAD 
2804                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2805 #endif
2806             }
2807 #if  CMK_WITH_STATS
2808             pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
2809             RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
2810 #endif
2811         }
2812     }else if (status != GNI_RC_SUCCESS)
2813     {
2814         SetMemHndlZero((pd->local_mem_hndl));
2815     }
2816         if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM || tag != LMSG_OOB_INIT_TAG)
2817     {
2818 #if REMOTE_EVENT
2819         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, request_msg->ack_index); 
2820 #else
2821         bufferRdmaMsg(bufferRdmaQueue, inst_id, pd, -1); 
2822 #endif
2823     }else if (status != GNI_RC_SUCCESS) {
2824         // printf("source: %d pd:(%p,%p)(%p,%p) len:%d local:%x remote:%x\n", (int)inst_id, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2, pd->length, pd->local_addr, pd->remote_addr);
2825         GNI_RC_CHECK("GetLargeAFter posting", status);
2826     }
2827 #else
2828     CONTROL_MSG         *request_msg;
2829     gni_return_t        status;
2830     void                *msg_data;
2831     gni_post_descriptor_t *pd;
2832     RDMA_REQUEST        *rdma_request_msg;
2833     gni_mem_handle_t    msg_mem_hndl;
2834     //int source;
2835     // initial a get to transfer data from the sender side */
2836     request_msg = (CONTROL_MSG *) header;
2837     msg_data = CmiAlloc(request_msg->length);
2838     _MEMCHECK(msg_data);
2840     MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, request_msg->length, &msg_mem_hndl, &omdh, NULL,  status)
2842     if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
2843     {
2844         GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
2845     }
2847     MallocPostDesc(pd);
2848     if(request_msg->length <= LRTS_GNI_RDMA_THRESHOLD) 
2849         pd->type            = GNI_POST_FMA_GET;
2850     else
2851         pd->type            = GNI_POST_RDMA_GET;
2852     pd->cq_mode         = GNI_CQMODE_GLOBAL_EVENT;// |  GNI_CQMODE_REMOTE_EVENT;
2853     pd->dlvr_mode       = GNI_DLVMODE_PERFORMANCE;
2854     pd->length          = ALIGN64(request_msg->length);
2855     pd->local_addr      = (uint64_t) msg_data;
2856     pd->remote_addr     = request_msg->source_addr;
2857     pd->remote_mem_hndl = request_msg->source_mem_hndl;
2858     if (tag == LMSG_OOB_INIT_TAG) 
2859         pd->src_cq_hndl     = highprior_rdma_tx_cqh;
2860     else
2861     {
2862 #if MULTI_THREAD_SEND
2863         pd->src_cq_hndl     = rdma_tx_cqh;
2864 #else
2865         pd->src_cq_hndl     = 0;
2866 #endif
2867     }
2868     pd->rdma_mode       = 0;
2869     pd->amo_cmd         = 0;
2871     //memory registration successful
2872     if(status == GNI_RC_SUCCESS)
2873     {
2874         pd->local_mem_hndl  = msg_mem_hndl;
2875        
2876         if(pd->type == GNI_POST_RDMA_GET) 
2877         {
2878             CMI_GNI_LOCK(rdma_tx_cq_lock)
2879             status = GNI_PostRdma(ep_hndl_array[inst_id], pd);
2880             CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2881         }
2882         else
2883         {
2884             CMI_GNI_LOCK(default_tx_cq_lock)
2885             status = GNI_PostFma(ep_hndl_array[inst_id],  pd);
2886             CMI_GNI_UNLOCK(default_tx_cq_lock)
2887         }
2889     }else
2890     {
2891         SetMemHndlZero(pd->local_mem_hndl);
2892     }
2893     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
2894     {
2895         MallocRdmaRequest(rdma_request_msg);
2896         rdma_request_msg->next = 0;
2897         rdma_request_msg->destNode = inst_id;
2898         rdma_request_msg->pd = pd;
2899         PCQueuePush(sendRdmaBuf, (char*)rdma_request_msg);
2900     }else {
2901         GNI_RC_CHECK("AFter posting", status);
2902     }
2903 #endif
2906 #if CQWRITE
2907 static void PumpCqWriteTransactions(void)
2910     gni_cq_entry_t          ev;
2911     gni_return_t            status;
2912     void                    *msg;  
2913     int                     msg_size;
2914     while(1) {
2915         //CMI_GNI_LOCK(my_cq_lock) 
2916         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
2917         //CMI_GNI_UNLOCK(my_cq_lock)
2918         if(status != GNI_RC_SUCCESS) break;
2919         msg = (void*) ( GNI_CQ_GET_DATA(ev) & 0xFFFFFFFFFFFFL);
2920 #if CMK_PERSISTENT_COMM_PUT
2921 #if PRINT_SYH
2922         printf(" %d CQ write event %p\n", myrank, msg);
2923 #endif
2924         if (!IsMemHndlZero(MEMHFIELD(msg))) {
2925 #if PRINT_SYH
2926             printf(" %d Persistent CQ write event %p\n", myrank, msg);
2927 #endif
2928             CmiReference(msg);
2929             msg_size = CmiGetMsgSize(msg);
2930             CMI_CHECK_CHECKSUM(msg, msg_size);
2931             handleOneRecvedMsg(msg_size, msg); 
2932             continue;
2933         }
2934 #endif
2935 #if ! USE_LRTS_MEMPOOL
2936        // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2937 #else
2938         DecreaseMsgInSend(msg);
2939 #endif
2940         if(NoMsgInSend(msg))
2941             buffered_send_msg -= GetMempoolsize(msg);
2942         CmiFree(msg);
2943     };
2944     if(status == GNI_RC_ERROR_RESOURCE)
2945     {
2946         GNI_RC_CHECK("rdma_rx_cq full too many ack", status);
2947     }
2949 #endif
2951 #if REMOTE_EVENT
2952 static void PumpRemoteTransactions(gni_cq_handle_t rx_cqh)
2954     gni_cq_entry_t          ev;
2955     gni_return_t            status;
2956     void                    *msg;   
2957     int                     inst_id, index, type, size;
2959 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2960     int                     pump_count = 0;
2961 #endif
2962     while(1) {
2963 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2964         if (pump_count > PumpRemoteTransactions_cap) break;
2965 #endif
2966         CMI_GNI_LOCK(global_gni_lock)
2967 //        CMI_GNI_LOCK(rdma_tx_cq_lock)
2968         status = GNI_CqGetEvent(rx_cqh, &ev);
2969 //        CMI_GNI_UNLOCK(rdma_tx_cq_lock)
2970         CMI_GNI_UNLOCK(global_gni_lock)
2972         if(status != GNI_RC_SUCCESS) break;
2974 #if CMI_PUMPREMOTETRANSACTIONS_CAP
2975         pump_count ++;
2976 #endif
2978         inst_id = GNI_CQ_GET_REM_INST_ID(ev);
2979         index = GET_INDEX(inst_id);
2980         type = GET_TYPE(inst_id);
2981         switch (type) {
2982         case 0:    // ACK
2983             CmiAssert(index>=0 && index<ackPool.size);
2984             CMI_GNI_LOCK(ackPool.lock);
2985             //CmiAssert(GetIndexType(ackPool, index) == 1);
2986             msg = GetIndexAddress(ackPool, index);
2987             CMI_GNI_UNLOCK(ackPool.lock);
2988 #if PRINT_SYH
2989             MACHSTATE4(8,"[%d] PumpRemoteTransactions: ack: %lld index: %d type: %d.\n", myrank, msg, index, type);
2990 #endif
2991 #if ! USE_LRTS_MEMPOOL
2992            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
2993 #else
2994             DecreaseMsgInSend(msg);
2995 #endif
2996             if(NoMsgInSend(msg))
2997                 buffered_send_msg -= GetMempoolsize(msg);
2998             CmiFree(msg);
2999             IndexPool_freeslot(&ackPool, index);
3000 #if CMI_EXERT_SEND_LARGE_CAP
3001             SEND_large_pending--;
3002 #endif
3003             break;
3004 #if CMK_PERSISTENT_COMM_PUT
3005         case 1:  {    // PERSISTENT
3006             CmiLock(persistPool.lock);
3007             CmiAssert(GetIndexType(persistPool, index) == NONCHARM_SMSG);
3008             PersistentReceivesTable *slot = (PersistentReceivesTable *)GetIndexAddress(persistPool, index);
3009             CmiUnlock(persistPool.lock);
3010             msg = slot->destBuf[slot->addrIndex].destAddress;
3011             size = CmiGetMsgSize(msg);
3012             CmiReference(msg);
3013             CMI_CHECK_CHECKSUM(msg, size);
3014             handleOneRecvedMsg(size, (char*)msg); 
3015             break;
3016             }
3017 #endif
3018         default:
3019             fprintf(stderr, "[%d] PumpRemoteTransactions: unknown type: %d\n", myrank, type);
3020             CmiAbort("PumpRemoteTransactions: unknown type");
3021         }
3022     }
3023     if(status == GNI_RC_ERROR_RESOURCE)
3024     {
3025         printf("charm> Please use +useRecvQueue %d in your command line, if the error comes again, increase this number\n", REMOTE_QUEUE_ENTRIES*2);
3026         GNI_RC_CHECK("PumpRemoteTransactions: rx_cqh full", status);
3027     }
3029 #endif
3031 /* This code overlaps with code in machine-onesided.c in PumpOneSidedRDMATransactions() */
3032 static void PumpLocalTransactions(gni_cq_handle_t my_tx_cqh, CmiNodeLock my_cq_lock)
3034     gni_cq_entry_t          ev;
3035     gni_return_t            status;
3036     uint64_t                type, inst_id;
3037     gni_post_descriptor_t   *tmp_pd;
3038     MSG_LIST                *ptr;
3039     CONTROL_MSG             *ack_msg_tmp;
3040     ACK_MSG                 *ack_msg;
3041     uint8_t                 msg_tag;
3042 #if CMK_DIRECT
3043     CMK_DIRECT_HEADER       *cmk_direct_done_msg;
3044 #endif
3045     SMSG_QUEUE         *queue = &smsg_queue;
3046 #if CMI_PUMPLOCALTRANSACTIONS_CAP
3047     int         pump_count = 0;
3048     while(pump_count < PumpLocalTransactions_cap) {
3049         pump_count++;
3050 #else
3051     while(1) {
3052 #endif
3053         CMI_GNI_LOCK(my_cq_lock) 
3054         status = GNI_CqGetEvent(my_tx_cqh, &ev);
3055         CMI_GNI_UNLOCK(my_cq_lock)
3056         if(status != GNI_RC_SUCCESS) break;
3057         
3058         type = GNI_CQ_GET_TYPE(ev);
3059         if (type == GNI_CQ_EVENT_TYPE_POST)
3060         {
3062 #if CMI_EXERT_RECV_RDMA_CAP
3063             if(RDMA_pending <=0) CmiAbort(" pending error\n");
3064             RDMA_pending--;
3065 #endif
3066             inst_id     = GNI_CQ_GET_INST_ID(ev);
3067 #if PRINT_SYH
3068             printf("[%d] LocalTransactions localdone=%d\n", myrank,  lrts_local_done_msg);
3069 #endif
3070             CMI_GNI_LOCK(my_cq_lock)
3071             status = GNI_GetCompleted(my_tx_cqh, ev, &tmp_pd);
3072             CMI_GNI_UNLOCK(my_cq_lock)
3074             switch (tmp_pd->type) {
3075 #if CMK_PERSISTENT_COMM_PUT  || CMK_DIRECT
3076             case GNI_POST_RDMA_PUT:
3077 #if CMK_PERSISTENT_COMM_PUT && ! USE_LRTS_MEMPOOL
3078                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
3079 #endif
3080             case GNI_POST_FMA_PUT:
3081                 if(tmp_pd->amo_cmd == 1) {
3082 #if CMK_DIRECT
3083                     //sender ACK to receiver to trigger it is done
3084                     cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
3085                     cmk_direct_done_msg->handler_addr = tmp_pd->first_operand;
3086                     msg_tag = DIRECT_PUT_DONE_TAG;
3087 #endif
3088                 }
3089                 else {
3090                     CmiFree((void *)tmp_pd->local_addr);
3091 #if REMOTE_EVENT
3092                     FreePostDesc(tmp_pd);
3093                     continue;
3094 #elif CQWRITE
3095                     sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl);
3096                     FreePostDesc(tmp_pd);
3097                     continue;
3098 #else
3099                     MallocControlMsg(ack_msg_tmp);
3100                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
3101                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
3102                     ack_msg_tmp->length  = tmp_pd->length;
3103                     msg_tag = PUT_DONE_TAG;
3104 #endif
3105                 }
3106                 break;
3107 #endif
3108             case GNI_POST_RDMA_GET:
3109             case GNI_POST_FMA_GET:  {
3110                 MACHSTATE2(8, "PumpLocal Get done %lld=>%lld\n", tmp_pd->local_addr, tmp_pd->remote_addr);
3111 #if  ! USE_LRTS_MEMPOOL
3112                 MallocControlMsg(ack_msg_tmp);
3113                 ack_msg_tmp->source_addr = tmp_pd->remote_addr;
3114                 ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
3115                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
3116                 msg_tag = ACK_TAG;  
3117 #else
3118 #if CMK_WITH_STATS
3119                 RDMA_TRANS_DONE(tmp_pd->sync_flag_value/1000000.0)
3120 #endif
3121                 int seq_id = tmp_pd->cqwrite_value;
3122                 if(seq_id > 0)      // BIG_MSG
3123                 {
3124                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length);
3125                     MallocControlMsg(ack_msg_tmp);
3126                     ack_msg_tmp->source_addr = tmp_pd->remote_addr;
3127                     ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
3128                     ack_msg_tmp->seq_id = seq_id;
3129                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
3130                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
3131                     ack_msg_tmp->length = tmp_pd->length;
3132                     ack_msg_tmp->total_length = tmp_pd->first_operand;     // total size
3133                     msg_tag = BIG_MSG_TAG; 
3134                 } 
3135                 else
3136                 {
3137                     if(seq_id < 0)
3138                         CmiReference((void*)tmp_pd->local_addr);
3139                     msg_tag = ACK_TAG; 
3140 #if  !REMOTE_EVENT && !CQWRITE
3141                     MallocAckMsg(ack_msg);
3142                     ack_msg->source_addr = tmp_pd->remote_addr;
3143 #endif
3144                 }
3145 #endif
3146                 break;
3147             }
3148             case  GNI_POST_CQWRITE:
3149                    FreePostDesc(tmp_pd);
3150                    continue;
3151             default:
3152                 CmiPrintf("type=%d\n", tmp_pd->type);
3153                 CmiAbort("PumpLocalTransactions: unknown type!");
3154             }      /* end of switch */
3156 #if CMK_DIRECT
3157             if (tmp_pd->amo_cmd == 1) {
3158                 status = send_smsg_message(queue, inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0, NULL, NONCHARM_SMSG, 0);
3159 #if ! CMK_SMSGS_FREE_AFTER_EVENT
3160                 if (status == GNI_RC_SUCCESS) free(cmk_direct_done_msg); 
3161 #endif
3162             }
3163             else
3164 #endif
3165             if (msg_tag == ACK_TAG) {
3166 #if !REMOTE_EVENT
3167 #if   !CQWRITE
3168                 status = send_smsg_message(queue, inst_id, ack_msg, ACK_MSG_SIZE, msg_tag, 0, NULL, NONCHARM_SMSG, 0); 
3169 #if !CMK_SMSGS_FREE_AFTER_EVENT
3170                 if (status == GNI_RC_SUCCESS) FreeAckMsg(ack_msg);
3171 #endif
3172 #else
3173                 sendCqWrite(inst_id, tmp_pd->remote_addr, tmp_pd->remote_mem_hndl); 
3174 #endif
3175 #endif
3176             }
3177             else {
3178                 status = send_smsg_message(queue, inst_id, ack_msg_tmp, CONTROL_MSG_SIZE, msg_tag, 0, NULL, NONCHARM_SMSG, 0); 
3179 #if !CMK_SMSGS_FREE_AFTER_EVENT
3180                 if (status == GNI_RC_SUCCESS) FreeControlMsg(ack_msg_tmp);
3181 #endif
3182             }
3183 #if CMK_PERSISTENT_COMM_PUT
3184             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
3185 #endif
3186             {
3187                 if( msg_tag == ACK_TAG){    //msg fit in mempool 
3188 #if PRINT_SYH
3189                     printf("PumpLocalTransactions: Normal msg transaction PE:%d==>%d\n", myrank, inst_id);
3190 #endif
3191                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (char*)tmp_pd->local_addr); 
3192                     TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_value/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (double)((tmp_pd->sync_flag_value+1)/1000000.0), (char*)tmp_pd->local_addr); 
3194                     //CmiAssert(SIZEFIELD((void*)(tmp_pd->local_addr)) <= tmp_pd->length);
3195                     DecreaseMsgInRecv((void*)tmp_pd->local_addr);
3196 #if MACHINE_DEBUG_LOG
3197                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
3198                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
3199                     MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
3200 #endif
3201                     CMI_CHECK_CHECKSUM((void*)tmp_pd->local_addr, CmiGetMsgSize(tmp_pd->local_addr));
3202                     handleOneRecvedMsg(CmiGetMsgSize(tmp_pd->local_addr), (char *)tmp_pd->local_addr);
3203                 }else if(msg_tag == BIG_MSG_TAG){
3204                     void *msg = (char*)tmp_pd->local_addr-(tmp_pd->cqwrite_value-1)*ONE_SEG;
3205                     CmiSetMsgSeq(msg, CmiGetMsgSeq(msg)+1);
3206                     if (tmp_pd->first_operand <= ONE_SEG*CmiGetMsgSeq(msg)) {
3207                         START_EVENT();
3208 #if PRINT_SYH
3209                         printf("Pipeline msg done [%d]\n", myrank);
3210 #endif
3211 #if     CMK_SMP_TRACE_COMMTHREAD
3212                         if( tmp_pd->cqwrite_value == 1)
3213                             TRACE_COMM_CONTROL_CREATION((double)(tmp_pd->sync_flag_addr/1000000.0), (double)((tmp_pd->sync_flag_addr+1)/1000000.0), (double)((tmp_pd->sync_flag_addr+2)/1000000.0), (char*)tmp_pd->local_addr); 
3214 #endif
3215                         CMI_CHECK_CHECKSUM(msg, CmiGetMsgSize(msg));
3216                         handleOneRecvedMsg(CmiGetMsgSize(msg), (char *)msg);
3217                     }
3218                 }
3219             }
3220             FreePostDesc(tmp_pd);
3221         }
3222 #if CMK_SMSGS_FREE_AFTER_EVENT
3223         else if  (type == GNI_CQ_EVENT_TYPE_SMSG) {
3224             // a SmsgsSend is done
3225             int msgid = GNI_CQ_GET_MSG_ID(ev);
3226             int type = GetIndexType(smsgsPool, msgid);
3227             void *addr = GetIndexAddress(smsgsPool, msgid);
3228             switch (type) {
3229             case CHARM_SMSG:
3230                 CmiFree(addr);
3231                 break;
3232             case NONCHARM_SMSG:
3233                 free(addr);
3234                 break;
3235             case NONCHARM_SMSG_DONT_FREE:
3236                 break;
3237             default:
3238                 CmiAbort("Invalid SmsgsIndex");
3239             }
3240             IndexPool_freeslot(&smsgsPool, msgid);
3241         }
3242 #endif
3243     } //end while
3244     if(status == GNI_RC_ERROR_RESOURCE)
3245     {
3246         printf("charm> Please use +useSendQueue 204800 in your command line, if the error comes again, increase this number\n");  
3247         GNI_RC_CHECK("Smsg_tx_cq full", status);
3248     }
3251 static void  SendRdmaMsg( BufferList sendqueue)
3253     gni_return_t            status = GNI_RC_SUCCESS;
3254     gni_mem_handle_t        msg_mem_hndl;
3255     RDMA_REQUEST            *ptr = 0, *tmp_ptr;
3256     RDMA_REQUEST            *pre = 0;
3257     uint64_t                register_size = 0;
3258     void                    *msg;
3259     int                     i;
3261     int len = PCQueueLength(sendqueue);
3262     for (i=0; i<len; i++)
3263     {
3264 #if CMI_EXERT_RECV_RDMA_CAP
3265         if( RDMA_pending >= RDMA_cap) break;
3266 #endif
3267         CMI_PCQUEUEPOP_LOCK( sendqueue)
3268         ptr = (RDMA_REQUEST*)PCQueuePop(sendqueue);
3269         CMI_PCQUEUEPOP_UNLOCK( sendqueue)
3270         if (ptr == NULL) break;
3271         
3272         gni_post_descriptor_t *pd = ptr->pd;
3273         
3274         msg = (void*)(pd->local_addr);
3275         status = registerMessage(msg, pd->length, pd->cqwrite_value, &pd->local_mem_hndl);
3276         register_size = 0;
3277         if(pd->cqwrite_value == 0) {
3278             if(NoMsgInRecv(msg))
3279                 register_size = GetMempoolsize(msg);
3280         }
3282         if(status == GNI_RC_SUCCESS)        //mem register good
3283         {
3284             int destNode = ptr->destNode;
3285             CmiNodeLock lock = (pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) ? rdma_tx_cq_lock:default_tx_cq_lock;
3286             CMI_GNI_LOCK(lock);
3287 #if REMOTE_EVENT
3288             if( pd->cqwrite_value == 0 || pd->cqwrite_value == -1) {
3289                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3290                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, ACK_EVENT(ptr->ack_index));
3291                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3292             }
3293 #if CMK_PERSISTENT_COMM_PUT
3294             else if (pd->cqwrite_value == PERSIST_SEQ) {
3295                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3296                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, PERSIST_EVENT(ptr->ack_index));
3297                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3298             }
3299 #endif
3300 #if CMK_DIRECT
3301             else if (pd->cqwrite_value == DIRECT_SEQ) {
3302                 pd->cq_mode |= GNI_CQMODE_REMOTE_EVENT;
3303                 int sts = GNI_EpSetEventData(ep_hndl_array[destNode], destNode, DIRECT_EVENT(ptr->ack_index));
3304                 GNI_RC_CHECK("GNI_EpSetEventData", sts);
3305             }
3306 #endif
3308 #endif
3309 #if CMK_WITH_STATS
3310             RDMA_TRY_SEND(pd->type)
3311 #endif
3312 #if CMK_SMP_TRACE_COMMTHREAD
3313             if(IS_PUT(pd->type))
3314             {
3315                  START_EVENT();
3316                  TRACE_COMM_CREATION(EVENT_TIME(), (char*)pd->local_addr);//based on assumption, post always succeeds on first try
3317             }
3318 #endif
3319             if(pd->type == GNI_POST_RDMA_GET || pd->type == GNI_POST_RDMA_PUT) 
3320             {
3321                 status = GNI_PostRdma(ep_hndl_array[destNode], pd);
3322             }
3323             else
3324             {
3325                 status = GNI_PostFma(ep_hndl_array[destNode],  pd);
3326             }
3327             CMI_GNI_UNLOCK(lock);
3328             
3329             if(status == GNI_RC_SUCCESS)    //post good
3330             {
3331                 MACHSTATE4(8, "post noempty-rdma  %d (%lld==%lld,%d) \n", ptr->destNode, pd->local_addr, pd->remote_addr,  register_memory_size); 
3332 #if CMI_EXERT_RECV_RDMA_CAP
3333                 RDMA_pending ++;
3334 #endif
3335                 if(pd->cqwrite_value <= 0)
3336                 {
3337 #if CMK_SMP_TRACE_COMMTHREAD 
3338                     pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3339 #endif
3340                     IncreaseMsgInRecv(((void*)(pd->local_addr)));
3341                 }
3342 #if  CMK_WITH_STATS
3343                 pd->sync_flag_value = 1000000 * CmiWallTimer(); //microsecond
3344                 RDMA_TRANS_INIT(pd->type, pd->sync_flag_addr/1000000.0)
3345 #endif
3346 #if MACHINE_DEBUG_LOG
3347                 buffered_recv_msg += register_size;
3348                 MACHSTATE(8, "GO request from buffered\n"); 
3349 #endif
3350 #if PRINT_SYH
3351                 printf("[%d] SendRdmaMsg: post succeed. seqno: %d\n", myrank, pd->cqwrite_value);
3352 #endif
3353                 FreeRdmaRequest(ptr);
3354             }else           // cannot post
3355             {
3356                 PCQueuePush(sendRdmaBuf, (char*)ptr);
3357 #if PRINT_SYH
3358                 printf("[%d] SendRdmaMsg: post failed. seqno: %x dest: %d local mhdl: %lld %lld remote mhdl: %lld %lld connect: %d\n", myrank, pd->cqwrite_value, destNode, pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2, smsg_connected_flag[destNode]);
3359 #endif
3360                 break;
3361             }
3362         } else          //memory registration fails
3363         {
3364             PCQueuePush(sendqueue, (char*)ptr);
3365         }
3366     } //end while
3369 static 
3370 INLINE_KEYWORD gni_return_t _sendOneBufferedSmsg(SMSG_QUEUE *queue, MSG_LIST *ptr)
3372     CONTROL_MSG         *control_msg_tmp;
3373     gni_return_t        status = GNI_RC_ERROR_RESOURCE;
3374     int                 numRdmaOps, recvInfoSize, msgSize;
3375     NcpyOperationInfo *ncpyOpInfo;
3377     MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
3378     if (useDynamicSMSG && smsg_connected_flag[ptr->destNode] != 2) {   
3379             /* connection not exists yet */
3380 #if CMK_SMP
3381             /* non-smp case, connect is issued in send_smsg_message */
3382         if (smsg_connected_flag[ptr->destNode] == 0)
3383             connect_to(ptr->destNode); 
3384 #endif
3385     }
3386     else
3387     switch(ptr->tag)
3388     {
3389     case SMALL_DATA_TAG:
3390         status = send_smsg_message(queue, ptr->destNode,  ptr->msg, ptr->size, ptr->tag, 1, ptr, CHARM_SMSG, 0);  
3391 #if !CMK_SMSGS_FREE_AFTER_EVENT
3392         if(status == GNI_RC_SUCCESS)
3393         {
3394             CmiFree(ptr->msg);
3395         }
3396 #endif
3397         break;
3398     case LMSG_PERSISTENT_INIT_TAG:
3399     case LMSG_INIT_TAG:
3400     case LMSG_OOB_INIT_TAG:
3401         control_msg_tmp = (CONTROL_MSG*)ptr->msg;
3402         status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1, ptr, ptr->tag);
3403         break;
3404 #if !REMOTE_EVENT && !CQWRITE
3405     case ACK_TAG:
3406         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, NONCHARM_SMSG, 0);  
3407 #if !CMK_SMSGS_FREE_AFTER_EVENT
3408         if(status == GNI_RC_SUCCESS) FreeAckMsg((ACK_MSG*)ptr->msg);
3409 #endif
3410         break;
3411 #endif
3412     case BIG_MSG_TAG:
3413         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, NONCHARM_SMSG, 0);  
3414 #if !CMK_SMSGS_FREE_AFTER_EVENT
3415         if(status == GNI_RC_SUCCESS)
3416         {
3417             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3418         }
3419 #endif
3420         break;
3421 #if CMK_PERSISTENT_COMM_PUT && !REMOTE_EVENT && !CQWRITE 
3422     case PUT_DONE_TAG:
3423         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ptr->size, ptr->tag, 1, ptr, NONCHARM_SMSG, 0);
3424 #if !CMK_SMSGS_FREE_AFTER_EVENT
3425         if(status == GNI_RC_SUCCESS)
3426         {
3427             FreeControlMsg((CONTROL_MSG*)ptr->msg);
3428         }
3429 #endif
3430         break;
3431 #endif
3432 #if CMK_DIRECT
3433     case DIRECT_PUT_DONE_TAG:
3434         status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1, ptr, NONCHARM_SMSG, 0);
3435 #if !CMK_SMSGS_FREE_AFTER_EVENT
3436         if(status == GNI_RC_SUCCESS)
3437         {
3438             free((CMK_DIRECT_HEADER*)ptr->msg);
3439         }
3440 #endif
3441         break;
3442 #endif
3443     case RDMA_ACK_TAG:
3444         status = send_smsg_message(queue, ptr->destNode, ptr->msg, sizeof(CmiGNIAckOp_t), ptr->tag, 1, ptr, NONCHARM_SMSG, 1);
3445 #if !CMK_SMSGS_FREE_AFTER_EVENT
3446         if(status == GNI_RC_SUCCESS) {
3447           free(ptr->msg);
3448         }
3449 #endif
3450         break;
3452     case RDMA_PUT_MD_TAG:
3453         numRdmaOps = ((CmiGNIRzvRdmaRecv_t *)(ptr->msg))->numOps;
3454         recvInfoSize = LrtsGetRdmaRecvInfoSize(numRdmaOps);
3455         status = send_smsg_message(queue, ptr->destNode, ptr->msg, recvInfoSize, ptr->tag, 1, ptr, NONCHARM_SMSG_DONT_FREE, 0);
3456         break;
3458      case RDMA_PUT_DONE_TAG:
3459         numRdmaOps = ((CmiGNIRzvRdmaRecv_t *)(ptr->msg))->numOps;
3460         recvInfoSize = LrtsGetRdmaRecvInfoSize(numRdmaOps);
3461         status = send_smsg_message(queue, ptr->destNode, ptr->msg, recvInfoSize, ptr->tag, 1, ptr, NONCHARM_SMSG, 1);
3462 #if !CMK_SMSGS_FREE_AFTER_EVENT
3463         if(status == GNI_RC_SUCCESS) {
3464           free(ptr->msg);
3465         }
3466 #endif
3467         break;
3469      case RDMA_PUT_MD_DIRECT_TAG:
3470         ncpyOpInfo = (NcpyOperationInfo *)(ptr->msg);
3471         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ncpyOpInfo->ncpyOpInfoSize, ptr->tag, 1, ptr, CHARM_SMSG, 1);
3472 #if !CMK_SMSGS_FREE_AFTER_EVENT
3473         if(status == GNI_RC_SUCCESS) {
3474           CmiFree(ptr->msg);
3475         }
3476 #endif
3477         break;
3479      case RDMA_REG_AND_GET_MD_DIRECT_TAG:
3480      case RDMA_REG_AND_PUT_MD_DIRECT_TAG:
3481         //msgSize = sizeof(CmiGNIRzvRdmaReverseOp_t) + 2*(((CmiGNIRzvRdmaReverseOp_t *)(ptr->msg))->ackSize);
3482         ncpyOpInfo = (NcpyOperationInfo *)(ptr->msg);
3483         status = send_smsg_message(queue, ptr->destNode, ptr->msg, ncpyOpInfo->ncpyOpInfoSize, ptr->tag, 1, ptr, CHARM_SMSG, 1);
3484 #if !CMK_SMSGS_FREE_AFTER_EVENT
3485         if(status == GNI_RC_SUCCESS) {
3486           CmiFree(ptr->msg);
3487         }
3488 #endif
3489         break;
3491     default:
3492         printf("Weird tag: %d\n", ptr->tag);
3493         CmiAbort("should not happen\n");
3494     }       // end switch
3495     return status;
3498 // return 1 if all messages are sent
3500 #if ONE_SEND_QUEUE
3502 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3504     MSG_LIST            *ptr, *tmp_ptr, *pre=0, *current_head;
3505     CONTROL_MSG         *control_msg_tmp;
3506     gni_return_t        status;
3507     int                 done = 1;
3508     uint64_t            register_size;
3509     void                *register_addr;
3510     int                 index_previous = -1;
3511 #if     CMI_SENDBUFFERSMSG_CAP
3512     int                 sent_length = 0;
3513 #endif
3514     int          index = 0;
3515     memset(destpe_avail, 0, mysize * sizeof(char));
3516     for (index=0; index<1; index++)
3517     {
3518 #if CMK_DEBUG
3519         CmiPrintf("[%d] Called SendBufferMsg\n", CmiMyPe());
3520 #endif
3521         int i, len;
3522 #if CMK_LOCKLESS_QUEUE
3523         len = MPMCQueueLength(queue->sendMsgBuf);
3524 #else
3525         len = PCQueueLength(queue->sendMsgBuf);
3526 #endif
3527         for (i=0; i<len; i++) 
3528         {
3529 #if CMK_LOCKLESS_QUEUE
3530             ptr = (MSG_LIST*)MPMCQueuePop(queue->sendMsgBuf);
3531 #else
3532             CMI_PCQUEUEPOP_LOCK(queue->sendMsgBuf)
3533             ptr = (MSG_LIST*)PCQueuePop(queue->sendMsgBuf);
3534             CMI_PCQUEUEPOP_UNLOCK(queue->sendMsgBuf)
3535 #endif
3536             if(ptr == NULL) break;
3537             if (destpe_avail[ptr->destNode] == 1) {       /* can't send to this pe */
3538 #if CMK_LOCKLESS_QUEUE
3539                 MPMCQueuePush(queue->sendMsgBuf, (char*)ptr);
3540 #else
3541                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3542 #endif
3543                 continue;
3544             }
3545 #if CMK_SMP
3546             if (ptr->tag == RDMA_COMM_PERFORM_GET_TAG) {
3547                 // Comm thread performing GET on behalf of worker thread for Direct API
3548                 _performOneRgetForWorkerThread(ptr);
3549                 FreeMsgList(ptr);
3550                 continue;
3551             }
3552             else if (ptr->tag == RDMA_COMM_PERFORM_PUT_TAG) {
3553                 // Comm thread performing PUT on behalf of worker thread for Direct API
3554                 _performOneRputForWorkerThread(ptr);
3555                 FreeMsgList(ptr);
3556                 continue;
3557             }
3558 #endif
3559             status = _sendOneBufferedSmsg(queue, ptr);
3560 #if CMI_SENDBUFFERSMSG_CAP
3561             sent_length++;
3562 #endif
3563             if(status == GNI_RC_SUCCESS)
3564             {
3565 #if PRINT_SYH
3566                 buffered_smsg_counter--;
3567                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3568 #endif
3569                 FreeMsgList(ptr);
3570             }else {
3571 #if CMK_LOCKLESS_QUEUE
3572                 MPMCQueuePush(queue->sendMsgBuf, (char*)ptr);
3573 #else
3574                 PCQueuePush(queue->sendMsgBuf, (char*)ptr);
3575 #endif
3576                 done = 0;
3577                 if(status == GNI_RC_ERROR_RESOURCE)
3578                 {
3579                     destpe_avail[ptr->destNode] = 1;
3580                 }
3581             } 
3582         } //end while
3583     }   // end pooling for all cores
3584     return done;
3587 #else   /*  ! ONE_SEND_QUEUE  */
3589 static int SendBufferMsg(SMSG_QUEUE *queue, SMSG_QUEUE *prio_queue)
3591     MSG_LIST            *ptr;
3592     gni_return_t        status;
3593     int                 done = 1;
3594 #if     CMI_SENDBUFFERSMSG_CAP
3595     int                 sent_length = 0;
3596 #endif
3597     int idx;
3598 #if SMP_LOCKS
3599     int          index = -1;
3600     int nonempty = PCQueueLength(queue->nonEmptyQueues);
3601     for(idx =0; idx<nonempty; idx++) 
3602     {
3603         index++;  if (index >= nonempty) index = 0;
3604 #if CMI_SENDBUFFERSMSG_CAP
3605         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3606 #endif
3607         CMI_PCQUEUEPOP_LOCK(queue->nonEmptyQueues)
3608         MSG_LIST_INDEX *current_list = (MSG_LIST_INDEX *)PCQueuePop(queue->nonEmptyQueues);
3609         CMI_PCQUEUEPOP_UNLOCK(queue->nonEmptyQueues)
3610         if(current_list == NULL) break; 
3611         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[current_list->destpe].sendSmsgBuf) != 0) {
3612             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3613             continue;
3614         }
3615         PCQueue current_queue= current_list->sendSmsgBuf;
3616         CmiLock(current_list->lock);
3617         int i, len = PCQueueLength(current_queue);
3618         current_list->pushed = 0;
3619         CmiUnlock(current_list->lock);
3620 #else      /* ! SMP_LOCKS */
3621     static int          index = -1;
3622     for(idx =0; idx<mysize; idx++) 
3623     {
3624         index++;  if (index == mysize) index = 0;
3625 #if CMI_SENDBUFFERSMSG_CAP
3626         if ( sent_length >= SendBufferMsg_cap) { done = 0; return done;}
3627 #endif
3628         if (prio_queue && PCQueueLength(prio_queue->smsg_msglist_index[index].sendSmsgBuf) != 0) continue;             // check urgent queue
3629         //if (index == myrank) continue;
3630         PCQueue current_queue = queue->smsg_msglist_index[index].sendSmsgBuf;
3631         int i, len = PCQueueLength(current_queue);
3632 #endif
3633         for (i=0; i<len; i++)  {
3634             CMI_PCQUEUEPOP_LOCK(current_queue)
3635             ptr = (MSG_LIST*)PCQueuePop(current_queue);
3636             CMI_PCQUEUEPOP_UNLOCK(current_queue)
3637             if (ptr == 0) break;
3638 #if CMK_SMP
3639             if (ptr->tag == RDMA_COMM_PERFORM_GET_TAG) {
3640                 // Comm thread performing GET on behalf of worker thread for Direct API
3641                 _performOneRgetForWorkerThread(ptr);
3642                 FreeMsgList(ptr);
3643                 continue;
3644             }
3645             else if (ptr->tag == RDMA_COMM_PERFORM_PUT_TAG) {
3646                 // Comm thread performing PUT on behalf of worker thread for Direct API
3647                 _performOneRputForWorkerThread(ptr);
3648                 FreeMsgList(ptr);
3649                 continue;
3650             }
3651 #endif
3652             status = _sendOneBufferedSmsg(queue, ptr);
3653 #if CMI_SENDBUFFERSMSG_CAP
3654             sent_length++;
3655 #endif
3656             if(status == GNI_RC_SUCCESS)
3657             {
3658 #if PRINT_SYH
3659                 buffered_smsg_counter--;
3660                 printf("[%d==>%d] buffered smsg sending done\n", myrank, ptr->destNode);
3661 #endif
3662                 FreeMsgList(ptr);
3663             }else {
3664                 PCQueuePush(current_queue, (char*)ptr);
3665                 done = 0;
3666                 if(status == GNI_RC_ERROR_RESOURCE)
3667                 {
3668                     break;
3669                 }
3670             } 
3671         } //end for i
3672 #if SMP_LOCKS
3673         CmiLock(current_list->lock);
3674         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
3675         {
3676             current_list->pushed = 1;
3677             PCQueuePush(queue->nonEmptyQueues, (char*)current_list);
3678         }
3679         CmiUnlock(current_list->lock); 
3680 #endif
3681     }   // end pooling for all cores
3682     return done;
3685 #endif
3687 static void ProcessDeadlock(void);
3688 void LrtsAdvanceCommunication(int whileidle)
3690     static int count = 0;
3691     /*  Receive Msg first */
3692 #if CMK_SMP_TRACE_COMMTHREAD
3693     double startT, endT;
3694 #endif
3695     if (useDynamicSMSG && whileidle)
3696     {
3697 #if CMK_SMP_TRACE_COMMTHREAD
3698         startT = CmiWallTimer();
3699 #endif
3700         STATS_PUMPDATAGRAMCONNECTION_TIME(PumpDatagramConnection());
3701 #if CMK_SMP_TRACE_COMMTHREAD
3702         endT = CmiWallTimer();
3703         if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SetupConnect, startT, endT);
3704 #endif
3705     }
3707     SEND_OOB_SMSG(smsg_oob_queue)
3708     PUMP_REMOTE_HIGHPRIORITY
3709     PUMP_LOCAL_HIGHPRIORITY
3710     POST_HIGHPRIORITY_RDMA
3711     // Receiving small messages and persistent
3712 #if CMK_SMP_TRACE_COMMTHREAD
3713     startT = CmiWallTimer();
3714 #endif
3715     STATS_PUMPNETWORK_TIME(PumpNetworkSmsg());
3716 #if CMK_SMP_TRACE_COMMTHREAD
3717     endT = CmiWallTimer();
3718     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpSmsg, startT, endT);
3719 #endif
3721     SEND_OOB_SMSG(smsg_oob_queue)
3722     PUMP_REMOTE_HIGHPRIORITY
3723     PUMP_LOCAL_HIGHPRIORITY
3724     POST_HIGHPRIORITY_RDMA
3725     
3726     ///* Send buffered Message */
3727 #if CMK_SMP_TRACE_COMMTHREAD
3728     startT = CmiWallTimer();
3729 #endif
3730 #if CMK_USE_OOB
3731     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, &smsg_oob_queue));
3732 #else
3733     STATS_SEND_SMSGS_TIME(SendBufferMsg(&smsg_queue, NULL));
3734 #endif
3735 #if CMK_SMP_TRACE_COMMTHREAD
3736     endT = CmiWallTimer();
3737     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendBufferSmsg, startT, endT);
3738 #endif
3740     SEND_OOB_SMSG(smsg_oob_queue)
3741     PUMP_REMOTE_HIGHPRIORITY
3742     PUMP_LOCAL_HIGHPRIORITY
3743     POST_HIGHPRIORITY_RDMA
3745     //Pump Get messages or PUT messages
3746 #if CMK_SMP_TRACE_COMMTHREAD
3747     startT = CmiWallTimer();
3748 #endif
3749     PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
3750 #if MULTI_THREAD_SEND
3751     STATS_PUMPLOCALTRANSACTIONS_RDMA_TIME(PumpLocalTransactions(rdma_tx_cqh,  rdma_tx_cq_lock));
3752 #endif
3753 #if CMK_SMP_TRACE_COMMTHREAD
3754     endT = CmiWallTimer();
3755     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpTransaction, startT, endT);
3756 #endif
3757     
3758     SEND_OOB_SMSG(smsg_oob_queue)
3759     PUMP_REMOTE_HIGHPRIORITY
3760     PUMP_LOCAL_HIGHPRIORITY
3761     POST_HIGHPRIORITY_RDMA
3762     //Pump Remote event
3763 #if CMK_SMP_TRACE_COMMTHREAD
3764     startT = CmiWallTimer();
3765 #endif
3766 #if CQWRITE
3767     PumpCqWriteTransactions();
3768 #endif
3769 #if CMK_ONESIDED_IMPL
3770     PumpOneSidedRDMATransactions(rdma_onesided_cqh, rdma_onesided_cq_lock);
3771 #endif
3772 #if REMOTE_EVENT
3773     STATS_PUMPREMOTETRANSACTIONS_TIME(PumpRemoteTransactions(rdma_rx_cqh));
3774 #endif
3775 #if CMK_SMP_TRACE_COMMTHREAD
3776     endT = CmiWallTimer();
3777     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_PumpRdmaTransaction, startT, endT);
3778 #endif
3780     SEND_OOB_SMSG(smsg_oob_queue)
3781     PUMP_REMOTE_HIGHPRIORITY
3782     PUMP_LOCAL_HIGHPRIORITY
3783     POST_HIGHPRIORITY_RDMA
3785 #if CMK_SMP_TRACE_COMMTHREAD
3786     startT = CmiWallTimer();
3787 #endif
3788     STATS_SENDRDMAMSG_TIME(SendRdmaMsg(sendRdmaBuf));
3789 #if CMK_SMP_TRACE_COMMTHREAD
3790     endT = CmiWallTimer();
3791     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(event_SendFmaRdmaMsg, startT, endT);
3792 #endif
3794 #if CMK_SMP && ! LARGEPAGE
3795     if (_detected_hang)  ProcessDeadlock();
3796 #endif
3799 static void set_smsg_max(void)
3801     char *env;
3803     if(mysize <=512)
3804     {
3805         SMSG_MAX_MSG = 1024;
3806     }else if (mysize <= 4096)
3807     {
3808         SMSG_MAX_MSG = 1024;
3809     }else if (mysize <= 16384)
3810     {
3811         SMSG_MAX_MSG = 512;
3812     }else {
3813         SMSG_MAX_MSG = 256;
3814     }
3816     env = getenv("CHARM_UGNI_SMSG_MAX_SIZE");
3817     if (env) SMSG_MAX_MSG = atoi(env);
3818     CmiAssert(SMSG_MAX_MSG > 0);
3819 }    
3821 /* useDynamicSMSG */
3822 static void _init_dynamic_smsg(void)
3824     gni_return_t status;
3825     uint32_t     vmdh_index = -1;
3826     int i;
3828     smsg_attr_vector_local = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3829     smsg_attr_vector_remote = (gni_smsg_attr_t**)malloc(mysize * sizeof(gni_smsg_attr_t*));
3830     smsg_connected_flag = (int*)malloc(sizeof(int)*mysize);
3831     for(i=0; i<mysize; i++) {
3832         smsg_connected_flag[i] = 0;
3833         smsg_attr_vector_local[i] = NULL;
3834         smsg_attr_vector_remote[i] = NULL;
3835     }
3837     set_smsg_max();
3839     send_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3840     send_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3841     send_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3842     status = GNI_SmsgBufferSizeNeeded(&send_smsg_attr, &smsg_memlen);
3843     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3845     mailbox_list = (dynamic_smsg_mailbox_t*)malloc(sizeof(dynamic_smsg_mailbox_t));
3846     mailbox_list->size = smsg_memlen*avg_smsg_connection;
3847     posix_memalign(&mailbox_list->mailbox_base, 64, mailbox_list->size);
3848     memset(mailbox_list->mailbox_base, 0, mailbox_list->size);
3849     mailbox_list->offset = 0;
3850     mailbox_list->next = 0;
3851     
3852     status = GNI_MemRegister(nic_hndl, (uint64_t)(mailbox_list->mailbox_base),
3853         mailbox_list->size, smsg_rx_cqh,
3854         GNI_MEM_READWRITE,   
3855         vmdh_index,
3856         &(mailbox_list->mem_hndl));
3857     GNI_RC_CHECK("MEMORY registration for smsg", status);
3859     status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_unbound);
3860     GNI_RC_CHECK("Unbound EP", status);
3861     
3862     alloc_smsg_attr(&send_smsg_attr);
3864     status = GNI_EpPostDataWId (ep_hndl_unbound, &send_smsg_attr,  SMSG_ATTR_SIZE, &recv_smsg_attr, SMSG_ATTR_SIZE, myrank);
3865     GNI_RC_CHECK("post unbound datagram", status);
3867       /* always pre-connect to proc 0 */
3868     //if (myrank != 0) connect_to(0);
3870     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3871     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3874 static void _init_static_smsg(void)
3876     gni_smsg_attr_t      *smsg_attr;
3877     gni_smsg_attr_t      remote_smsg_attr;
3878     gni_smsg_attr_t      *smsg_attr_vec;
3879     gni_mem_handle_t     my_smsg_mdh_mailbox;
3880     int      ret, i;
3881     gni_return_t status;
3882     uint32_t              vmdh_index = -1;
3883     mdh_addr_t            base_infor;
3884     mdh_addr_t            *base_addr_vec;
3886     set_smsg_max();
3887     
3888     smsg_attr = (gni_smsg_attr_t *)malloc(mysize * sizeof(gni_smsg_attr_t));
3889     
3890     smsg_attr[0].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3891     smsg_attr[0].mbox_maxcredit = SMSG_MAX_CREDIT;
3892     smsg_attr[0].msg_maxsize = SMSG_MAX_MSG;
3893     status = GNI_SmsgBufferSizeNeeded(&smsg_attr[0], &smsg_memlen);
3894     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3895     ret = posix_memalign(&smsg_mailbox_base, 64, smsg_memlen*(mysize));
3896     CmiAssert(ret == 0);
3897     memset(smsg_mailbox_base, 0, smsg_memlen*(mysize));
3898     
3899     status = GNI_MemRegister(nic_hndl, (uint64_t)smsg_mailbox_base,
3900             smsg_memlen*(mysize), smsg_rx_cqh,
3901             GNI_MEM_READWRITE,   
3902             vmdh_index,
3903             &my_smsg_mdh_mailbox);
3904     register_memory_size += smsg_memlen*(mysize);
3905     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
3907     if (myrank == 0 && !quietMode)  printf("Charm++> SMSG memory: %1.1fKB\n", 1.0*smsg_memlen*(mysize)/1024);
3908     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
3910     base_infor.addr =  (uint64_t)smsg_mailbox_base;
3911     base_infor.mdh =  my_smsg_mdh_mailbox;
3912     base_addr_vec = (mdh_addr_t *)malloc(mysize * sizeof(mdh_addr_t));
3914     allgather(&base_infor, base_addr_vec,  sizeof(mdh_addr_t));
3916     for(i=0; i<mysize; i++)
3917     {
3918         if(i==myrank)
3919             continue;
3920         smsg_attr[i].msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3921         smsg_attr[i].mbox_maxcredit = SMSG_MAX_CREDIT;
3922         smsg_attr[i].msg_maxsize = SMSG_MAX_MSG;
3923         smsg_attr[i].mbox_offset = i*smsg_memlen;
3924         smsg_attr[i].buff_size = smsg_memlen;
3925         smsg_attr[i].msg_buffer = smsg_mailbox_base ;
3926         smsg_attr[i].mem_hndl = my_smsg_mdh_mailbox;
3927     }
3929     for(i=0; i<mysize; i++)
3930     {
3931         if (myrank == i) continue;
3933         remote_smsg_attr.msg_type = GNI_SMSG_TYPE_MBOX_AUTO_RETRANSMIT;
3934         remote_smsg_attr.mbox_maxcredit = SMSG_MAX_CREDIT;
3935         remote_smsg_attr.msg_maxsize = SMSG_MAX_MSG;
3936         remote_smsg_attr.mbox_offset = myrank*smsg_memlen;
3937         remote_smsg_attr.buff_size = smsg_memlen;
3938         remote_smsg_attr.msg_buffer = (void*)base_addr_vec[i].addr;
3939         remote_smsg_attr.mem_hndl = base_addr_vec[i].mdh;
3941         /* initialize the smsg channel */
3942         status = GNI_SmsgInit(ep_hndl_array[i], &smsg_attr[i], &remote_smsg_attr);
3943         GNI_RC_CHECK("SMSG Init", status);
3944     } //end initialization
3946     free(base_addr_vec);
3947     free(smsg_attr);
3949     status = GNI_SmsgSetMaxRetrans(nic_hndl, 4096);
3950     GNI_RC_CHECK("SmsgSetMaxRetrans Init", status);
3953 INLINE_KEYWORD
3954 static void _init_send_queue(SMSG_QUEUE *queue)
3956      int i;
3957 #if ONE_SEND_QUEUE
3958 #if CMK_LOCKLESS_QUEUE
3959      queue->sendMsgBuf = MPMCQueueCreate();
3960 #else
3961      queue->sendMsgBuf = PCQueueCreate();
3962 #endif
3963      destpe_avail = (char*)malloc(mysize * sizeof(char));
3964 #else
3965      queue->smsg_msglist_index = (MSG_LIST_INDEX*)malloc(mysize*sizeof(MSG_LIST_INDEX));
3966 #if SMP_LOCKS
3967      queue->nonEmptyQueues = PCQueueCreate();
3968 #endif
3969      for(i =0; i<mysize; i++)
3970      {
3971          queue->smsg_msglist_index[i].sendSmsgBuf = PCQueueCreate();
3972 #if SMP_LOCKS
3973          queue->smsg_msglist_index[i].pushed = 0;
3974          queue->smsg_msglist_index[i].lock = CmiCreateLock();
3975          queue->smsg_msglist_index[i].destpe = i;
3976 #endif
3977      }
3978 #endif
3981 INLINE_KEYWORD
3982 static void _init_smsg(void)
3984     if(mysize > 1) {
3985         if (useDynamicSMSG)
3986             _init_dynamic_smsg();
3987         else
3988             _init_static_smsg();
3989     }
3991     _init_send_queue(&smsg_queue);
3992 #if CMK_USE_OOB
3993     _init_send_queue(&smsg_oob_queue);
3994 #endif
3997 static void _init_static_msgq(void)
3999     gni_return_t status;
4000     /* MSGQ is to send and receive short messages for large jobs (exceeding 200,000 ranks). The          performance scales by the node count rather than rank count */
4001     msgq_attrs.max_msg_sz = MSGQ_MAXSIZE;
4002     msgq_attrs.smsg_q_sz = 1;
4003     msgq_attrs.rcv_pool_sz = 1;
4004     msgq_attrs.num_msgq_eps = 2;
4005     msgq_attrs.nloc_insts = 8;
4006     msgq_attrs.modes = 0;
4007     msgq_attrs.rcv_cq_sz = REMOTE_QUEUE_ENTRIES ;
4009     status = GNI_MsgqInit(nic_hndl, NULL, NULL, NULL, &msgq_attrs, &msgq_handle);
4010     GNI_RC_CHECK("MSGQ Init", status);
4016 static CmiUInt8 total_mempool_size = 0;
4017 static CmiUInt8 total_mempool_calls = 0;
4019 #if USE_LRTS_MEMPOOL
4021 INLINE_KEYWORD
4022 static void *_alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag, gni_cq_handle_t cqh)
4024     void *pool;
4025     int ret;
4026     gni_return_t status = GNI_RC_SUCCESS;
4028     size_t default_size =  expand_flag? _expand_mem : _mempool_size;
4029     if (*size < default_size) *size = default_size;
4030 #if LARGEPAGE
4031     // round up to be multiple of _tlbpagesize
4032     //*size = (*size + _tlbpagesize - 1)/_tlbpagesize*_tlbpagesize;
4033     *size = ALIGNHUGEPAGE(*size);
4034 #endif
4035     total_mempool_size += *size;
4036     total_mempool_calls += 1;
4037 #if   !LARGEPAGE
4038     if ((*size > MAX_REG_MEM || *size > MAX_BUFF_SEND) && expand_flag) 
4039     {
4040         printf("Error: A mempool block with size %lld is allocated, which is greater than the maximum mempool allowed.\n Please increase the max pool size by using +gni-mempool-max or set enviorment variable CHARM_UGNI_MEMPOOL_MAX. (current=%lld, %lld)\n", *size, MAX_REG_MEM, MAX_BUFF_SEND);
4041         CmiAbort("alloc_mempool_block");
4042     }
4043 #endif
4044 #if LARGEPAGE
4045     pool = my_get_huge_pages(*size);
4046     ret = pool==NULL;
4047 #else
4048     ret = posix_memalign(&pool, ALIGNBUF, *size);
4049 #endif
4050     if (ret != 0) {
4051       printf("Charm++> can not allocate memory pool of size %.2fMB. \n", 1.0*(*size)/1024/1024);
4052       if (ret == ENOMEM)
4053         CmiAbort("alloc_mempool_block: out of memory.");
4054       else
4055         CmiAbort("alloc_mempool_block: posix_memalign failed");
4056     }
4057 #if LARGEPAGE
4058     CmiMemLock();
4059     register_count++;
4060     MEMORY_REGISTER(onesided_hnd, nic_hndl, pool, *size, mem_hndl, &omdh, cqh, status);
4061     CmiMemUnlock();
4062     if(status != GNI_RC_SUCCESS) {
4063         printf("[%d, %d] memory reigstration %f G (%lld) ask for %lld\n", myrank, CmiMyRank(), register_memory_size/(1024*1024.0*1024),register_count, *size);
4064 sweep_mempool(CpvAccess(mempool));
4065     }
4066     GNI_RC_CHECK("MEMORY_REGISTER", status);
4067 #else
4068     SetMemHndlZero((*mem_hndl));
4069 #endif
4070     return pool;
4073 INLINE_KEYWORD
4074 static void *alloc_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
4076     return _alloc_mempool_block(size, mem_hndl, expand_flag, rdma_rx_cqh);
4079 #if CMK_PERSISTENT_COMM_PUT
4080 INLINE_KEYWORD
4081 static void *alloc_persistent_mempool_block(size_t *size, gni_mem_handle_t *mem_hndl, int expand_flag)
4083     return _alloc_mempool_block(size, mem_hndl, expand_flag, highpriority_rx_cqh);
4085 #endif
4087 // ptr is a block head pointer
4088 void free_mempool_block(void *ptr, gni_mem_handle_t mem_hndl)
4090     block_header *bh = (block_header *)ptr;
4091     if(!(IsMemHndlZero(mem_hndl)))
4092     {
4093         MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &mem_hndl, &omdh, GetSizeFromBlockHeader(bh));
4094     }
4095 #if LARGEPAGE
4096     my_free_huge_pages(ptr, GetSizeFromBlockHeader(bh));
4097 #else
4098     free(ptr);
4099 #endif
4101 #endif
4103 void LrtsPreCommonInit(int everReturn){
4104 #if USE_LRTS_MEMPOOL
4105     CpvInitialize(mempool_type*, mempool);
4106     CpvAccess(mempool) = mempool_init(_mempool_size, alloc_mempool_block, free_mempool_block, _mempool_size_limit);
4107 #if CMK_PERSISTENT_COMM_PUT
4108     CpvInitialize(mempool_type*, persistent_mempool);
4109     CpvAccess(persistent_mempool) = mempool_init(_mempool_size, alloc_persistent_mempool_block, free_mempool_block, _mempool_size_limit);
4110 #endif
4111     MACHSTATE2(8, "mempool_init %d %p\n", CmiMyRank(), CpvAccess(mempool)) ; 
4112 #endif
4115 CMI_EXTERNC_VARIABLE int quietMode;
4117 void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
4119     register int            i;
4120     int                     rc;
4121     int                     device_id = 0;
4122     unsigned int            remote_addr;
4123     gni_cdm_handle_t        cdm_hndl;
4124     gni_return_t            status = GNI_RC_SUCCESS;
4125     uint32_t                vmdh_index = -1;
4126     uint8_t                 ptag;
4127     unsigned int            local_addr, *MPID_UGNI_AllAddr;
4128     int                     first_spawned;
4129     int                     physicalID;
4130     char                   *env;
4132     //void (*local_event_handler)(gni_cq_entry_t *, void *)       = &LocalEventHandle;
4133     //void (*remote_smsg_event_handler)(gni_cq_entry_t *, void *) = &RemoteSmsgEventHandle;
4134     //void (*remote_bte_event_handler)(gni_cq_entry_t *, void *)  = &RemoteBteEventHandle;
4135   
4136     if(!CharmLibInterOperate || userDrivenMode) {
4137       status = (gni_return_t)PMI_Init(&first_spawned);
4138       GNI_RC_CHECK("PMI_Init", status);
4139     }
4141     status = (gni_return_t)PMI_Get_size(&mysize);
4142     GNI_RC_CHECK("PMI_Getsize", status);
4144     status = (gni_return_t)PMI_Get_rank(&myrank);
4145     GNI_RC_CHECK("PMI_getrank", status);
4147     //physicalID = CmiPhysicalNodeID(myrank);
4148     
4149     //printf("Pysical Node ID:%d for PE:%d\n", physicalID, myrank);
4151     *myNodeID = myrank;
4152     *numNodes = mysize;
4153   
4154 #if MULTI_THREAD_SEND
4155     /* Currently, we only consider the case that comm. thread will only recv msgs */
4156     Cmi_smp_mode_setting = COMM_WORK_THREADS_SEND_RECV;
4157 #endif
4159 #if CMI_EXERT_SEND_LARGE_CAP
4160     CmiGetArgInt(*argv,"+useSendLargeCap", &SEND_large_cap);
4161 #endif
4163 #if CMI_EXERT_RECV_RDMA_CAP 
4164     CmiGetArgInt(*argv,"+useRecvRdmaCap", &RDMA_cap);
4165 #endif
4166   
4167 #if CMI_SENDBUFFERSMSG_CAP
4168     CmiGetArgInt(*argv,"+useSendBufferCap", &SendBufferMsg_cap);
4169 #endif
4171 #if CMI_PUMPNETWORKSMSG_CAP 
4172     CmiGetArgInt(*argv,"+usePumpSmsgCap", &PumpNetworkSmsg_cap);
4173 #endif
4175     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
4176     
4177     env = getenv("CHARM_UGNI_REMOTE_QUEUE_SIZE");
4178     if (env) REMOTE_QUEUE_ENTRIES = atoi(env);
4179     CmiGetArgInt(*argv,"+useRecvQueue", &REMOTE_QUEUE_ENTRIES);
4181     env = getenv("CHARM_UGNI_LOCAL_QUEUE_SIZE");
4182     if (env) LOCAL_QUEUE_ENTRIES = atoi(env);
4183     CmiGetArgInt(*argv,"+useSendQueue", &LOCAL_QUEUE_ENTRIES);
4185     env = getenv("CHARM_UGNI_DYNAMIC_SMSG");
4186     if (env) useDynamicSMSG = 1;
4187     if (!useDynamicSMSG)
4188       useDynamicSMSG = CmiGetArgFlag(*argv, "+useDynamicSmsg");
4189     CmiGetArgIntDesc(*argv, "+smsgConnection", &avg_smsg_connection,"Initial number of SMSGS connection per code");
4190     if (avg_smsg_connection>mysize) avg_smsg_connection = mysize;
4191     //useStaticMSGQ = CmiGetArgFlag(*argv, "+useStaticMsgQ");
4192     
4193     if ((myrank == 0) && (!quietMode))
4194     {
4195         printf("Charm++> Running on Gemini (GNI) with %d processes\n", mysize);
4196         printf("Charm++> %s SMSG\n", useDynamicSMSG?"dynamic":"static");
4197     }
4198 #ifdef USE_ONESIDED
4199     onesided_init(NULL, &onesided_hnd);
4201     // this is a GNI test, so use the libonesided bypass functionality
4202     onesided_gni_bypass_get_nih(onesided_hnd, &nic_hndl);
4203     local_addr = gniGetNicAddress();
4204 #else
4205     ptag = get_ptag();
4206     cookie = get_cookie();
4207 #if 0
4208     modes = GNI_CDM_MODE_CQ_NIC_LOCAL_PLACEMENT;
4209 #endif
4210     //Create and attach to the communication  domain */
4211     status = GNI_CdmCreate(myrank, ptag, cookie, modes, &cdm_hndl);
4212     GNI_RC_CHECK("GNI_CdmCreate", status);
4213     //* device id The device id is the minor number for the device
4214     //that is assigned to the device by the system when the device is created.
4215     //To determine the device number, look in the /dev directory, which contains a list of devices. For a NIC, the device is listed as kgniX
4216     //where X is the device number 0 default 
4217     // GNI_CdmAttach adds about 1GB memory usage
4218     status = GNI_CdmAttach(cdm_hndl, device_id, &local_addr, &nic_hndl);
4219     GNI_RC_CHECK("GNI_CdmAttach", status);
4220     local_addr = get_gni_nic_address(0);
4221 #endif
4222     MPID_UGNI_AllAddr = (unsigned int *)malloc(sizeof(unsigned int) * mysize);
4223     _MEMCHECK(MPID_UGNI_AllAddr);
4224     allgather(&local_addr, MPID_UGNI_AllAddr, sizeof(unsigned int));
4225     /* create the local completion queue */
4226     /* the third parameter : The number of events the NIC allows before generating an interrupt. Setting this parameter to zero results in interrupt delivery with every event. When using this parameter, the mode parameter must be set to GNI_CQ_BLOCKING*/
4227     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &default_tx_cqh);
4228     GNI_RC_CHECK("GNI_CqCreate (tx)", status);
4229 #if MULTI_THREAD_SEND
4230     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_tx_cqh);
4231     GNI_RC_CHECK("GNI_CqCreate RDMA (tx)", status);
4232 #endif
4234 #if CMK_USE_OOB
4235     status = GNI_CqCreate(nic_hndl, LOCAL_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &highprior_rdma_tx_cqh);
4236     GNI_RC_CHECK("GNI_CqCreate high priority RDMA (tx)", status);
4237 #endif
4238     /* create the destination completion queue for receiving micro-messages, make this queue considerably larger than the number of transfers */
4240     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &smsg_rx_cqh);
4241     GNI_RC_CHECK("Create CQ (rx)", status);
4242     
4243     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_rx_cqh);
4244     GNI_RC_CHECK("Create Post CQ (rx)", status);
4245    
4246 #if CMK_PERSISTENT_COMM_PUT
4247     status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &highpriority_rx_cqh);
4248     GNI_RC_CHECK("Create Post CQ (rx)", status);
4249 #endif
4251 #if CMK_ONESIDED_IMPL
4252     _initOnesided();
4253 #endif
4254     //status = GNI_CqCreate(nic_hndl, REMOTE_QUEUE_ENTRIES, 0, GNI_CQ_NOBLOCK, NULL, NULL, &rdma_cqh);
4255     //GNI_RC_CHECK("Create BTE CQ", status);
4257     /* create the endpoints. they need to be bound to allow later CQWrites to them */
4258     ep_hndl_array = (gni_ep_handle_t*)malloc(mysize * sizeof(gni_ep_handle_t));
4259     _MEMCHECK(ep_hndl_array);
4260 #if MULTI_THREAD_SEND 
4261     rx_cq_lock = global_gni_lock = default_tx_cq_lock = smsg_mailbox_lock = CmiCreateLock();
4262     //default_tx_cq_lock = CmiCreateLock();
4263     rdma_tx_cq_lock = CmiCreateLock();
4264     smsg_rx_cq_lock = CmiCreateLock();
4265     //global_gni_lock  = CmiCreateLock();
4266     //rx_cq_lock  = CmiCreateLock();
4267 #endif
4268     for (i=0; i<mysize; i++) {
4269         if(i == myrank) continue;
4270         status = GNI_EpCreate(nic_hndl, default_tx_cqh, &ep_hndl_array[i]);
4271         GNI_RC_CHECK("GNI_EpCreate ", status);   
4272         remote_addr = MPID_UGNI_AllAddr[i];
4273         status = GNI_EpBind(ep_hndl_array[i], remote_addr, i);
4274         GNI_RC_CHECK("GNI_EpBind ", status);   
4275     }
4277     /* SMSG is fastest but not scale; Msgq is scalable, FMA is own implementation for small message */
4278     _init_smsg();
4279     PMI_Barrier();
4281 #if     USE_LRTS_MEMPOOL
4282     env = getenv("CHARM_UGNI_MAX_MEMORY_ON_NODE");
4283     if (env) {
4284         _totalmem = CmiReadSize(env);
4285         if ((myrank == 0) && (!quietMode))
4286             printf("Charm++> total registered memory available per node is %.1fGB\n", (float)(_totalmem*1.0/oneGB));
4287     }
4289     env = getenv("CHARM_UGNI_MEMPOOL_INIT_SIZE");
4290     if (env) _mempool_size = CmiReadSize(env);
4291     if (CmiGetArgStringDesc(*argv,"+gni-mempool-init-size",&env,"Set the memory pool size")) 
4292         _mempool_size = CmiReadSize(env);
4295     env = getenv("CHARM_UGNI_MEMPOOL_MAX");
4296     if (env) {
4297         MAX_REG_MEM = CmiReadSize(env);
4298         user_set_flag = 1;
4299     }
4300     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max",&env,"Set the memory pool max size"))  {
4301         MAX_REG_MEM = CmiReadSize(env);
4302         user_set_flag = 1;
4303     }
4305     env = getenv("CHARM_UGNI_SEND_MAX");
4306     if (env) {
4307         MAX_BUFF_SEND = CmiReadSize(env);
4308         user_set_flag = 1;
4309     }
4310     if (CmiGetArgStringDesc(*argv,"+gni-mempool-max-send",&env,"Set the memory pool max size for send"))  {
4311         MAX_BUFF_SEND = CmiReadSize(env);
4312         user_set_flag = 1;
4313     }
4315     env = getenv("CHARM_UGNI_MEMPOOL_SIZE_LIMIT");
4316     if (env) {
4317         _mempool_size_limit = CmiReadSize(env);
4318     }
4320     if (MAX_REG_MEM < _mempool_size) MAX_REG_MEM = _mempool_size;
4321     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
4323     if (myrank==0) {
4324         if (!quietMode) printf("Charm++> memory pool init block size: %1.fMB, total memory pool limit %1.fMB (0 means no limit)\n", _mempool_size/1024.0/1024, _mempool_size_limit/1024.0/1024);
4325         if (!quietMode) printf("Charm++> memory pool registered memory limit: %1.fMB, send limit: %1.fMB\n", MAX_REG_MEM/1024.0/1024, MAX_BUFF_SEND/1024.0/1024);
4326         if (MAX_REG_MEM < BIG_MSG * 2 + oneMB)  {
4327             /* memblock can expand to BIG_MSG * 2 size */
4328             if (!quietMode) printf("Charm++ Error: The mempool maximum size is too small, please use command line option +gni-mempool-max or environment variable CHARM_UGNI_MEMPOOL_MAX to increase the value to at least %1.fMB.\n",  BIG_MSG * 2.0/1024/1024 + 1);
4329             CmiAbort("mempool maximum size is too small. \n");
4330         }
4331 #if MULTI_THREAD_SEND
4332         if (!quietMode) printf("Charm++> worker thread sending messages\n");
4333 #elif COMM_THREAD_SEND
4334         if (!quietMode) printf("Charm++> only comm thread send/recv messages\n");
4335 #endif
4336     }
4338 #endif     /* end of USE_LRTS_MEMPOOL */
4340     env = getenv("CHARM_UGNI_BIG_MSG_SIZE");
4341     if (env) {
4342         BIG_MSG = CmiReadSize(env);
4343         if (BIG_MSG < ONE_SEG)
4344           CmiAbort("BIG_MSG size is too small in the environment variable CHARM_UGNI_BIG_MSG_SIZE.");
4345     }
4346     env = getenv("CHARM_UGNI_BIG_MSG_PIPELINE_LEN");
4347     if (env) {
4348         BIG_MSG_PIPELINE = atoi(env);
4349     }
4351     env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
4352     if (env) _checkProgress = 0;
4353     if (mysize == 1) _checkProgress = 0;
4355 #if CMI_EXERT_RECV_RDMA_CAP
4356     env = getenv("CHARM_UGNI_RDMA_MAX");
4357     if (env)  {
4358         RDMA_pending = atoi(env);
4359         if ((myrank == 0) && (!quietMode))
4360             printf("Charm++> Max pending RDMA set to: %d\n", RDMA_pending);
4361     }
4362 #endif
4363     
4364     /*
4365     env = getenv("HUGETLB_DEFAULT_PAGE_SIZE");
4366     if (env) 
4367         _tlbpagesize = CmiReadSize(env);
4368     */
4369     /* real gethugepagesize() is only available when hugetlb module linked */
4370 #if LARGEPAGE
4371     _tlbpagesize = gethugepagesize();
4372 #else
4373     _tlbpagesize = getpagesize();
4374 #endif
4375     if ((myrank == 0) && (!quietMode)) {
4376         printf("Charm++> Cray TLB page size: %1.fK\n", _tlbpagesize/1024.0);
4377     }
4379 #if LARGEPAGE
4380     if (_tlbpagesize == 4096) {
4381         CmiAbort("Hugepage module, e.g. craype-hugepages8M must be loaded.");
4382     }
4383 #endif
4385       /* stats related arguments */
4386 #if CMK_WITH_STATS
4387     CmiGetArgStringDesc(*argv,"+gni_stats_root",&counters_dirname,"counter directory name, default counters");
4389     print_stats = CmiGetArgFlag(*argv, "+print_stats");
4390     
4391     stats_off = CmiGetArgFlag(*argv, "+stats_off");
4393     init_comm_stats();
4394 #endif
4396     /* checksum flag */
4397     if (CmiGetArgFlag(*argv,"+checksum")) {
4398 #if CMK_ERROR_CHECKING
4399         checksum_flag = 1;
4400         if (myrank == 0) CmiPrintf("Charm++> CheckSum checking enabled! \n");
4401 #else
4402         if (myrank == 0) CmiPrintf("Charm++> +checksum ignored in optimized version! \n");
4403 #endif
4404     }
4406     /* init DMA buffer for medium message */
4408     //_init_DMA_buffer();
4409     
4410     free(MPID_UGNI_AllAddr);
4412     sendRdmaBuf = PCQueueCreate();
4413     sendHighPriorBuf = PCQueueCreate();
4415 //    NTK_Init();
4416 //    ntk_return_t sts = NTK_System_GetSmpdCount(&_smpd_count);
4418 #if CMK_SMSGS_FREE_AFTER_EVENT
4419     int smsgPoolSize = 16384;
4420     if(mysize*SMSG_MAX_CREDIT < smsgPoolSize)
4421         smsgPoolSize = mysize*SMSG_MAX_CREDIT;
4422     IndexPool_init(&smsgsPool, smsgPoolSize, 1u<<31-1);
4423 #endif
4425 #if  REMOTE_EVENT
4426     SHIFT = 1;
4427     while (1<<SHIFT < mysize) SHIFT++;
4428     CmiAssert(SHIFT < 31);
4429     if ( (1<<(31-SHIFT)) < POOL_INIT_SIZE) CmiAbort("IndexPool_init: pool initial size is too big.");
4430     IndexPool_init(&ackPool, POOL_INIT_SIZE, 1u<<(31-SHIFT));
4431 #if CMK_PERSISTENT_COMM_PUT
4432     IndexPool_init(&persistPool, POOL_INIT_SIZE, 1u<<(31-SHIFT));
4433 #endif
4434 #endif
4437 CMI_EXTERNC
4438 void* LrtsRdmaAlloc(int n_bytes, int header)
4440     void *ptr = NULL;
4441     int alignbuf = ALIGNBUF;
4442     CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
4443 #if     USE_LRTS_MEMPOOL
4444     n_bytes = ALIGN64(n_bytes);
4445     int val = ALIGNBUF+n_bytes-sizeof(mempool_header);
4446     if(n_bytes < BIG_MSG)
4447     {
4448         char *res = (char *)mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
4449         if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
4450     }else
4451     {
4452 #if LARGEPAGE
4453         n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
4454         char *res = (char *)my_get_huge_pages(n_bytes);
4455 #else
4456         char *res = (char *)memalign(ALIGNBUF, n_bytes+ALIGNBUF);
4457 #endif
4458         if (res) ptr = res + ALIGNBUF - header;
4459     }
4460 #else
4461     n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
4462     char *res = (char *)memalign(ALIGNBUF, n_bytes+ALIGNBUF);
4463     ptr = res + ALIGNBUF - header;
4464 #endif
4465     return ptr;
4468 CMI_EXTERNC
4469 void  LrtsRdmaFree(void *msg)
4471     int headersize = sizeof(CmiChunkHeader);
4472     char *aligned_addr = (char *)msg + headersize - ALIGNBUF;
4473     CmiUInt4 size = SIZEFIELD((char*)msg+headersize);
4474     size = ALIGN64(size);
4475     if(size>=BIG_MSG)
4476     {
4477 #if LARGEPAGE
4478         int s = ALIGNHUGEPAGE(size+ALIGNBUF);
4479         my_free_huge_pages(aligned_addr, s);
4480 #else
4481         free((char*)msg + headersize - ALIGNBUF);
4482 #endif
4483     }
4484     else {
4485 #if    USE_LRTS_MEMPOOL
4486 #if CMK_SMP
4487         mempool_free_thread(aligned_addr + sizeof(mempool_header));
4488 #else
4489         mempool_free(CpvAccess(mempool), aligned_addr + sizeof(mempool_header));
4490 #endif
4491 #else
4492         free(aligned_addr);
4493 #endif
4494     }
4497 CMI_EXTERNC
4498 void* LrtsAlloc(int n_bytes, int header)
4500     void *ptr = NULL;
4501 #if 0
4502     printf("\n[PE:%d]Alloc Lrts for bytes=%d, head=%d %d\n", CmiMyPe(), n_bytes, header, SMSG_MAX_MSG);
4503 #endif
4504     if(n_bytes <= SMSG_MAX_MSG)
4505     {
4506         int totalsize = n_bytes+header;
4507         ptr = memalign(ALIGNBUF, totalsize);
4508     }
4509     else {
4510         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
4511 #if     USE_LRTS_MEMPOOL
4512         n_bytes = ALIGN64(n_bytes);
4513         if(n_bytes < BIG_MSG)
4514         {
4515             char *res = (char *)mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
4516             if (res) ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
4517         }else 
4518         {
4519 #if LARGEPAGE
4520             //printf("[%d] LrtsAlloc a big_msg: %d %d\n", myrank, n_bytes, ALIGNHUGEPAGE(n_bytes+ALIGNBUF));
4521             n_bytes = ALIGNHUGEPAGE(n_bytes+ALIGNBUF);
4522             char *res = (char *)my_get_huge_pages(n_bytes);
4523 #else
4524             char *res = (char *)memalign(ALIGNBUF, n_bytes+ALIGNBUF);
4525 #endif
4526             if (res) ptr = res + ALIGNBUF - header;
4527         }
4528 #else
4529         n_bytes = ALIGN64(n_bytes);           /* make sure size if 4 aligned */
4530         char *res = (char *)memalign(ALIGNBUF, n_bytes+ALIGNBUF);
4531         ptr = res + ALIGNBUF - header;
4532 #endif
4533     }
4534     return ptr;
4537 CMI_EXTERNC
4538 void  LrtsFree(void *msg)
4540     int headersize = sizeof(CmiChunkHeader);
4541     char *aligned_addr = (char *)msg + headersize - ALIGNBUF;
4542     CmiUInt4 size = SIZEFIELD((char*)msg+headersize);
4543 #if CMK_PERSISTENT_COMM_PUT
4544     if (IS_PERSISTENT_MEMORY(msg)) return;
4545 #endif
4546     if (size <= SMSG_MAX_MSG)
4547         free(msg);
4548     else {
4549         size = ALIGN64(size);
4550         if(size>=BIG_MSG)
4551         {
4552 #if LARGEPAGE
4553             int s = ALIGNHUGEPAGE(size+ALIGNBUF);
4554             my_free_huge_pages(aligned_addr, s);
4555 #else
4556             free((char*)msg + headersize - ALIGNBUF);
4557 #endif
4558         }
4559         else {
4560 #if    USE_LRTS_MEMPOOL
4561 #if CMK_SMP
4562             mempool_free_thread(aligned_addr + sizeof(mempool_header));
4563 #else
4564             mempool_free(CpvAccess(mempool), aligned_addr + sizeof(mempool_header));
4565 #endif
4566 #else
4567             free(aligned_addr);
4568 #endif
4569         }
4570     }
4573 void LrtsExit(int exitcode)
4575 #if CMK_WITH_STATS
4576 #if CMK_SMP
4577     if(CmiMyRank() == CmiMyNodeSize())
4578 #endif
4579     if (print_stats) print_comm_stats();
4580 #endif
4581     /* free memory ? */
4582 #if USE_LRTS_MEMPOOL
4583     //printf("FINAL [%d, %d]  register=%lld, send=%lld\n", myrank, CmiMyRank(), register_memory_size, buffered_send_msg); 
4584     mempool_destroy(CpvAccess(mempool));
4585 #endif
4586     if(!CharmLibInterOperate || userDrivenMode) {
4587       PMI_Barrier();
4588       PMI_Finalize();
4589       exit(exitcode);
4590     }
4593 void LrtsDrainResources(void)
4595     if(mysize == 1) return;
4596     while (
4597 #if CMK_USE_OOB
4598            !SendBufferMsg(&smsg_oob_queue, NULL) ||
4599 #endif
4600            !SendBufferMsg(&smsg_queue, NULL) 
4601           )
4602     {
4603         if (useDynamicSMSG)
4604             PumpDatagramConnection();
4605         PumpNetworkSmsg();
4606         PumpLocalTransactions(default_tx_cqh, default_tx_cq_lock);
4608 #if MULTI_THREAD_SEND
4609         PumpLocalTransactions(rdma_tx_cqh, rdma_tx_cq_lock);
4610 #endif
4612 #if CMK_USE_OOB 
4613         PumpLocalTransactions(highprior_rdma_tx_cqh, rdma_tx_cq_lock);
4614 #endif
4616 #if REMOTE_EVENT
4617         PumpRemoteTransactions(rdma_rx_cqh);
4618 #endif
4619         SendRdmaMsg(sendRdmaBuf);
4620         SendRdmaMsg(sendHighPriorBuf);
4621     }
4622     PMI_Barrier();
4625 void LrtsAbort(const char *message) {
4626     PMI_Abort(-1, message);
4627     CMI_NORETURN_FUNCTION_END
4630 /**************************  TIMER FUNCTIONS **************************/
4631 #if CMK_TIMER_USE_SPECIAL
4632 /* MPI calls are not threadsafe, even the timer on some machines */
4633 static CmiNodeLock  timerLock = 0;
4634 static int _absoluteTime = 0;
4635 static int _is_global = 0;
4636 static struct timespec start_ts;
4638 int CmiTimerIsSynchronized(void) {
4639     return 0;
4642 int CmiTimerAbsolute(void) {
4643     return _absoluteTime;
4646 double CmiStartTimer(void) {
4647     return 0.0;
4650 double CmiInitTime(void) {
4651     return (double)(start_ts.tv_sec)+(double)start_ts.tv_nsec/1000000000.0;
4654 void CmiTimerInit(char **argv) {
4655     _absoluteTime = CmiGetArgFlagDesc(argv,"+useAbsoluteTime", "Use system's absolute time as wallclock time.");
4656     if (_absoluteTime && CmiMyPe() == 0)
4657         printf("Charm++> absolute  timer is used\n");
4658     
4659     _is_global = CmiTimerIsSynchronized();
4662     if (_is_global) {
4663         if (CmiMyRank() == 0) {
4664             clock_gettime(CLOCK_MONOTONIC, &start_ts);
4665         }
4666     } else { /* we don't have a synchronous timer, set our own start time */
4667         CmiBarrier();
4668         CmiBarrier();
4669         CmiBarrier();
4670         clock_gettime(CLOCK_MONOTONIC, &start_ts);
4671     }
4672     CmiNodeAllBarrier();          /* for smp */
4676  * Since the timerLock is never created, and is
4677  * always NULL, then all the if-condition inside
4678  * the timer functions could be disabled right
4679  * now in the case of SMP.
4680  */
4681 double CmiTimer(void) {
4682     struct timespec now_ts;
4683     clock_gettime(CLOCK_MONOTONIC, &now_ts);
4684     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
4685         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
4688 double CmiWallTimer(void) {
4689     struct timespec now_ts;
4690     clock_gettime(CLOCK_MONOTONIC, &now_ts);
4691     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
4692         : ( now_ts.tv_sec - start_ts.tv_sec ) + ((now_ts.tv_nsec - start_ts.tv_nsec)  / 1000000000.0);
4695 double CmiCpuTimer(void) {
4696     struct timespec now_ts;
4697     clock_gettime(CLOCK_MONOTONIC, &now_ts);
4698     return _absoluteTime?((double)(now_ts.tv_sec)+(double)now_ts.tv_nsec/1000000000.0)
4699         : (double)( now_ts.tv_sec - start_ts.tv_sec ) + (((double) now_ts.tv_nsec - (double) start_ts.tv_nsec)  / 1000000000.0);
4702 #endif
4703 /************Barrier Related Functions****************/
4705 void LrtsBarrier(void)
4707     gni_return_t status;
4709     status = (gni_return_t)PMI_Barrier();
4710     GNI_RC_CHECK("PMI_Barrier", status);
4712 #if CMK_ONESIDED_IMPL
4713 #include "machine-onesided.c"
4714 #endif
4715 #if CMK_DIRECT
4716 #include "machine-cmidirect.c"
4717 #endif
4718 #if CMK_PERSISTENT_COMM
4719 #include "machine-persistent.c"
4720 #endif