2 * Ibverbs (InfiniBand) implementation of the Converse NET version
4 * contains only ibverbs-specific code for:
6 * - CmiCommunicationInit()
7 * - CmiNotifyStillIdle()
8 * - DeliverViaNetwork()
9 * - CommunicationServer()
13 Sayantan Chakravorty, sayantan@gmail.com ,21st March 2007
24 #include <sys/types.h>
25 #include <sys/socket.h>
32 #include <infiniband/verbs.h>
36 enum ibv_mtu mtu
= IBV_MTU_2048
;
38 enum ibv_mtu mtu
= IBV_MTU_4096
;
42 static int packetSize
;
45 static int rdmaThreshold
;
46 static int firstBinSize
;
47 static int blockAllocRatio
;
48 static int blockThreshold
;
51 static int maxRecvBuffers
;
53 //static int tokensPerProcessor; /*number of outstanding sends and receives between any two nodes*/
54 static int sendPacketPoolSize
; /*total number of send buffers created*/
56 static double _startTime
=0;
61 static int minTokensLeft
;
64 static double regTime
;
66 static double processBufferedTime
;
67 static int processBufferedCount
;
69 #define CMK_IBVERBS_STATS 0
70 #define CMK_IBVERBS_TOKENS_FLOW 1
71 #define CMK_IBVERBS_INCTOKENS 0 //never turn this on
72 #define CMK_IBVERBS_DEBUG 0
73 #define CMI_DIRECT_DEBUG 0
74 #define WC_LIST_SIZE 32
75 /*#define WC_BUFFER_SIZE 100*/
79 static int numUnReg
=0;
80 static int numCurReg
=0;
81 static int numAlloc
=0;
83 static int numMultiSendUnreg
=0;
84 static int numMultiSend
=0;
85 static int numMultiSendFree
=0;
89 #define INCTOKENS_FRACTION 0.04
90 #define INCTOKENS_INCREASE .50
92 // flag for using a pool for every thread in SMP mode
94 #define THREAD_MULTI_POOL 1
100 void infi_CmiFreeDirect(void *ptr
);
101 static inline void fillBufferPools();
104 #define INFIBARRIERPACKET 128
106 struct infiIncTokenAckPacket
{
112 ***********************/
115 This is a header attached to the beginning of every infiniband packet
117 #define INFIPACKETCODE_DATA 1
118 #define INFIPACKETCODE_INCTOKENS 2
119 #define INFIRDMA_START 4
120 #define INFIRDMA_ACK 8
121 #define INFIDIRECT_REQUEST 16
122 #define INFIPACKETCODE_INCTOKENSACK 32
123 #define INFIDUMMYPACKET 64
125 struct infiPacketHeader
{
128 #if CMK_IBVERBS_DEBUG
134 Types of rdma packets
137 #define INFI_DIRECT 2
139 struct infiRdmaPacket
{
143 struct ibv_mr
*keyPtr
;
148 struct infiRdmaPacket
*next
,*prev
;
152 /** Represents a buffer that is used to receive messages
154 #define BUFFER_RECV 1
155 #define BUFFER_RDMA 2
165 /** At the moment it is a simple pool with just a list of buffers
166 * TODO: extend it to make it an element in a linked list of pools
168 struct infiBufferPool
{
170 struct infiBuffer
*buffers
;
171 struct infiBufferPool
*next
;
175 It is the structure for the send buffers that are used
176 to send messages to other nodes
180 typedef struct infiPacketStruct
{
183 struct infiPacketHeader header
;
184 struct ibv_mr
*keyHeader
;
185 struct OtherNodeStruct
*destNode
;
186 struct infiPacketStruct
*next
;
188 struct ibv_sge elemList
[2];
189 struct ibv_send_wr wr
;
193 typedef struct infiBufferedWCStruct{
194 struct ibv_wc wcList[WC_BUFFER_SIZE];
196 struct infiBufferedWCStruct *next,*prev;
200 #define BCASTLIST_SIZE 50
202 struct infiBufferedBcastStruct
{
210 typedef struct infiBufferedBcastPoolStruct
{
211 struct infiBufferedBcastPoolStruct
*next
,*prev
;
212 struct infiBufferedBcastStruct bcastList
[BCASTLIST_SIZE
];
215 } *infiBufferedBcastPool
;
221 This structure represents the data needed by the InfiniBand
222 communication routines of a node
223 TODO: add locking for the SMP version
226 struct ibv_context
*context
;
232 // struct ibv_comp_channel *channel;
234 struct ibv_cq
*sendCq
;
235 struct ibv_cq
*recvCq
;
238 struct ibv_qp
**qp
; //Array of qps (numNodes long) to temporarily store the queue pairs
239 //It is used between CmiMachineInit and the call to node_addresses_store
240 //when the qps are stored in the corresponding OtherNodes
242 struct infiAddr
*localAddr
; //store the lid,qpn,msn address of ur qpair until they are sent
244 infiPacket infiPacketFreeList
;
246 struct infiBufferPool
*recvBufferPool
;
248 struct infiPacketHeader header
;
251 int sendCqSize
,recvCqSize
;
254 infiBufferedBcastPool bufferedBcastList
;
256 struct infiRdmaPacket
*bufferedRdmaAcks
;
258 struct infiRdmaPacket
*bufferedRdmaRequests
;
259 /* infiBufferedWC infiBufferedRecvList;*/
261 int insideProcessBufferedBcasts
;
264 static struct infiContext
*context
= NULL
;
269 /** Represents a qp used to send messages to another node
270 There is one for each remote node
277 Stored in the OtherNode structure in machine-dgram.c
278 Store the per node data for ibverbs layer
280 enum { INFI_HEADER_DATA
=21,INFI_DATA
};
282 struct infiOtherNodeData
{
284 int state
;// does it expect a packet with a header (first packet) or one without
290 int broot
;//needed to store the root of a multi-packet broadcast sent along a spanning tree or hypercube
291 #if CMK_IBVERBS_DEBUG
298 /********************************
299 Memory management structures and types
302 struct infiCmiChunkHeaderStruct
;
304 typedef struct infiCmiChunkMetaDataStruct
{
308 struct infiCmiChunkHeaderStruct
*owner
;
311 #if THREAD_MULTI_POOL
312 int parentPe
; // the PE that allocated the buffer and must release it
314 } infiCmiChunkMetaData
;
319 #define METADATAFIELD(m) (((infiCmiChunkHeader *)m)[-1].metaData)
322 int size
;//without infiCmiChunkHeader
327 #define INFINUMPOOLS 14
328 #define INFIMAXPERPOOL 100
329 #define INFIMULTIPOOL 0xDEAFB00D
331 #if THREAD_MULTI_POOL
332 static infiCmiChunkPool
**infiCmiChunkPools
;
333 //TODO Find proper place to dispose the memory acquired by infiCmiChunkPool
335 static infiCmiChunkPool infiCmiChunkPools
[INFINUMPOOLS
];
338 static void initInfiCmiChunkPools();
341 static inline infiPacket
newPacket(){
342 infiPacket pkt
= (infiPacket
)CmiAlloc(sizeof(struct infiPacketStruct
));
344 pkt
->header
= context
->header
;
346 pkt
->destNode
= NULL
;
347 pkt
->keyHeader
= METADATAFIELD(pkt
)->key
;
349 CmiAssert(pkt
->keyHeader
!=NULL
);
352 pkt
->elemList
[0].addr
= (uintptr_t)&(pkt
->header
);
353 pkt
->elemList
[0].length
= sizeof(struct infiPacketHeader
);
354 pkt
->elemList
[0].lkey
= pkt
->keyHeader
->lkey
;
356 pkt
->wr
.wr_id
= (uint64_t)pkt
;
357 pkt
->wr
.sg_list
= &(pkt
->elemList
[0]);
359 pkt
->wr
.opcode
= IBV_WR_SEND
;
360 pkt
->wr
.send_flags
= IBV_SEND_SIGNALED
;
366 #define FreeInfiPacket(pkt){ \
370 pkt->next = context->infiPacketFreeList; \
371 context->infiPacketFreeList = pkt; \
374 #define MallocInfiPacket(pkt) { \
375 infiPacket p = context->infiPacketFreeList; \
376 if(p == NULL){ p = newPacket();} \
377 else{context->infiPacketFreeList = p->next; } \
383 void infi_unregAndFreeMeta(void *md
)
385 if(md
!=NULL
&& (((infiCmiChunkMetaData
*)md
)->poolIdx
== INFIMULTIPOOL
))
387 int unregstat
=ibv_dereg_mr(((infiCmiChunkMetaData
*)md
)->key
);
388 CmiAssert(unregstat
==0);
389 free(((infiCmiChunkMetaData
*)md
));
390 #if CMK_IBVERBS_STATS
399 /******************CmiMachineInit and its helper functions*/
400 static inline int pollSendCq(const int toBuffer
);
402 void createLocalQps(struct ibv_device
*dev
,int ibPort
, int myNode
,int numNodes
,struct infiAddr
*localAddr
);
403 static uint16_t getLocalLid(struct ibv_context
*context
, int port
);
404 static int checkQp(struct ibv_qp
*qp
){
405 struct ibv_qp_attr attr
;
406 struct ibv_qp_init_attr init_attr
;
408 ibv_query_qp(qp
, &attr
, IBV_QP_STATE
| IBV_QP_CUR_STATE
|IBV_QP_CAP
,&init_attr
);
409 if(attr
.cur_qp_state
!= IBV_QPS_RTS
){
410 MACHSTATE2(3,"CHECKQP failed cap wr %d sge %d",attr
.cap
.max_send_wr
,attr
.cap
.max_send_sge
);
415 static void checkAllQps(){
417 for(i
=0;i
<CmiNumNodesGlobal();i
++){
418 if(i
!= CmiMyNodeGlobal()){
419 if(!checkQp(nodes
[i
].infiData
->qp
)){
421 CmiAbort("Queue pair check failed");
427 #if CMK_IBVERBS_FAST_START
428 static void send_partial_init();
431 static void CmiMachineInit(char **argv
){
432 struct ibv_device
**devList
;
433 struct ibv_device
*dev
;
438 struct infiRdmaPacket
**rdmaPktPtrs
;
439 int num_devices
, idev
;
445 MACHSTATE(3,"CmiMachineInit {");
446 MACHSTATE2(3,"Lrts_numNodes %d CmiNumNodes() %d",Lrts_numNodes
,CmiNumNodes());
447 MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
449 //TODO: make the device and ibport configureable by commandline parameter
450 //Check example for how to do that
451 devList
= ibv_get_device_list(&num_devices
);
452 CmiEnforce(num_devices
> 0);
453 CmiEnforce(devList
!= NULL
);
455 context
= (struct infiContext
*)malloc(sizeof(struct infiContext
));
456 MACHSTATE1(3,"context allocated %p",context
);
458 //localAddr will store the local addresses of all the qps
459 context
->localAddr
= (struct infiAddr
*)malloc(sizeof(struct infiAddr
)*Lrts_numNodes
);
460 MACHSTATE1(3,"context->localAddr allocated %p",context
->localAddr
);
464 // try all devices, can't assume device 0 is IB, it may be ethernet
467 CmiEnforce(dev
!= NULL
);
469 MACHSTATE1(3,"device name %s",ibv_get_device_name(dev
));
471 //the context for this infiniband device
472 context
->context
= ibv_open_device(dev
);
473 CmiEnforce(context
->context
!= NULL
);
477 for (ibPort
= 1; ibPort
< MAXPORT
; ibPort
++) {
478 struct ibv_port_attr attr
;
479 if (ibv_query_port(context
->context
, ibPort
, &attr
) != 0) continue;
480 #if CMK_IBV_PORT_ATTR_HAS_LINK_LAYER
481 if (attr
.link_layer
== IBV_LINK_LAYER_INFINIBAND
) break;
487 if (ibPort
== MAXPORT
) {
488 if (++idev
== num_devices
)
489 CmiAbort("No valid IB port found!");
493 context
->ibPort
= ibPort
;
494 MACHSTATE1(3,"use port %d", ibPort
);
496 MACHSTATE1(3,"device opened %p",context
->context
);
498 /* FD_ZERO(&context->asyncFds);
499 FD_SET(context->context->async_fd,&context->asyncFds);
500 context->tmo.tv_sec=0;
501 context->tmo.tv_usec=0;
503 MACHSTATE(3,"asyncFds zeroed and set");*/
506 context
->pd
= ibv_alloc_pd(context
->context
);
507 CmiEnforce(context
->pd
!= NULL
);
508 MACHSTATE2(3,"pd %p pd->handle %d",context
->pd
,context
->pd
->handle
);
510 /******** At this point we know that this node is more or less serviceable
511 So, this is a good point for sending the partial init message for the fast
513 Moreover, no work dependent on the number of nodes has started yet.
516 #if CMK_IBVERBS_FAST_START
521 context
->header
.nodeNo
= Lrts_myNode
;
524 packetSize
= mtu_size
*4;
525 dataSize
= packetSize
-sizeof(struct infiPacketHeader
);
528 /* if(Lrts_numNodes*50 > calcMaxSize){
529 calcMaxSize = Lrts_numNodes*50;
530 if(calcMaxSize > 10000){
534 maxRecvBuffers
=calcMaxSize
;
535 if (CmiGetArgIntDesc(argv
,"+IBVMaxSendTokens",&maxTokens
,"User set IBV Max Outstanding Send Tokens") == 0)
536 maxTokens
= 1000; // this value may need to be tweaked later
537 context
->tokensLeft
=maxTokens
;
539 //tokensPerProcessor=4;
540 if(Lrts_numNodes
> 1){
541 #if !CMK_IBVERBS_FAST_START
542 /* a barrier to make sure all nodes initialized the device */
544 ctrl_sendone_nolock("barrier",NULL
,0,NULL
,0);
545 ChMessage_recv(Cmi_charmrun_fd
,&msg
);
547 createLocalQps(dev
,ibPort
,Lrts_myNode
,Lrts_numNodes
,context
->localAddr
);
550 if (Cmi_charmrun_fd
== -1) return;
554 // rdmaThreshold=32768;
557 CmiAssert(rdmaThreshold
> firstBinSize
);
558 /* blockAllocRatio=16;
566 #if !THREAD_MULTI_POOL
567 initInfiCmiChunkPools();
570 /*create the pool of send packets*/
571 sendPacketPoolSize
= maxTokens
/2;
572 if(sendPacketPoolSize
> 2000){
573 sendPacketPoolSize
= 2000;
576 context
->infiPacketFreeList
=NULL
;
577 pktPtrs
= malloc(sizeof(infiPacket
)*sendPacketPoolSize
);
579 //Silly way of allocating the memory buffers (slow as well) but simplifies the code
580 #if !THREAD_MULTI_POOL
581 for(i
=0;i
<sendPacketPoolSize
;i
++){
582 MallocInfiPacket(pktPtrs
[i
]);
585 for(i
=0;i
<sendPacketPoolSize
;i
++){
586 FreeInfiPacket(pktPtrs
[i
]);
591 context
->bufferedBcastList
=NULL
;
592 context
->bufferedRdmaAcks
= NULL
;
593 context
->bufferedRdmaRequests
= NULL
;
594 context
->insideProcessBufferedBcasts
=0;
600 if( Lrts_numNodes*4 < maxRecvBuffers/4){
601 numPkts = Lrts_numNodes*4;
603 numPkts = maxRecvBuffers/4;
606 rdmaPktPtrs = (struct infiRdmaPacket **)malloc(numPkts*sizeof(struct infiRdmaPacket));
607 for(k=0;k<numPkts;k++){
608 rdmaPktPtrs[k] = CmiAlloc(sizeof(struct infiRdmaPacket));
611 for(k=0;k<numPkts;k++){
612 CmiFree(rdmaPktPtrs[k]);
617 /* context->infiBufferedRecvList = NULL;*/
618 #if CMK_IBVERBS_STATS
625 processBufferedCount
=0;
626 processBufferedTime
=0;
628 minTokensLeft
= maxTokens
;
633 MACHSTATE(3,"} CmiMachineInit");
636 void CmiCommunicationInit(char **argv
)
638 #if THREAD_MULTI_POOL
639 initInfiCmiChunkPools();
645 Open a qp for every other node (one per remote node, none for this node)
647 void createLocalQps(struct ibv_device
*dev
,int ibPort
, int myNode
,int numNodes
,struct infiAddr
*localAddr
){
653 myLid
= getLocalLid(context
->context
,ibPort
);
655 MACHSTATE2(3,"myLid %d numNodes %d",myLid
,numNodes
);
657 context
->sendCqSize
= maxTokens
+2;
658 context
->sendCq
= ibv_create_cq(context
->context
,context
->sendCqSize
,NULL
,NULL
,0);
659 CmiAssert(context
->sendCq
!= NULL
);
661 MACHSTATE1(3,"sendCq created %p",context
->sendCq
);
664 context
->recvCqSize
= maxRecvBuffers
;
665 context
->recvCq
= ibv_create_cq(context
->context
,context
->recvCqSize
,NULL
,NULL
,0);
667 MACHSTATE2(3,"recvCq created %p %d",context
->recvCq
,context
->recvCqSize
);
668 CmiAssert(context
->recvCq
!= NULL
);
670 //array of queue pairs
672 context
->qp
= (struct ibv_qp
**)malloc(sizeof(struct ibv_qp
*)*numNodes
);
676 context
->srqSize
= (maxRecvBuffers
+2);
677 struct ibv_srq_init_attr srqAttr
= {
679 .max_wr
= context
->srqSize
,
683 context
->srq
= ibv_create_srq(context
->pd
,&srqAttr
);
684 CmiAssert(context
->srq
!= NULL
);
686 struct ibv_qp_init_attr initAttr
= {
687 .qp_type
= IBV_QPT_RC
,
688 .send_cq
= context
->sendCq
,
689 .recv_cq
= context
->recvCq
,
694 .max_send_wr
= maxTokens
,
698 struct ibv_qp_attr attr
;
700 attr
.qp_state
= IBV_QPS_INIT
;
702 attr
.port_num
= ibPort
;
703 attr
.qp_access_flags
= IBV_ACCESS_REMOTE_READ
| IBV_ACCESS_REMOTE_WRITE
;
705 /* MACHSTATE1(3,"context->pd %p",context->pd);
706 struct ibv_qp *qp = ibv_create_qp(context->pd,&initAttr);
707 MACHSTATE1(3,"TEST QP %p",qp);*/
709 for( i
=1;i
<numNodes
;i
++){
710 int n
= (myNode
+ i
)%numNodes
;
713 localAddr
[n
].lid
= myLid
;
714 context
->qp
[n
] = ibv_create_qp(context
->pd
,&initAttr
);
716 MACHSTATE2(3,"qp[%d] created %p",n
,context
->qp
[n
]);
717 CmiAssert(context
->qp
[n
] != NULL
);
719 ibv_modify_qp(context
->qp
[n
], &attr
,
723 IBV_QP_ACCESS_FLAGS
);
725 localAddr
[n
].qpn
= context
->qp
[n
]->qp_num
;
726 localAddr
[n
].psn
= lrand48() & 0xffffff;
727 MACHSTATE4(3,"i %d lid Ox%x qpn 0x%x psn 0x%x",n
,localAddr
[n
].lid
,localAddr
[n
].qpn
,localAddr
[n
].psn
);
731 MACHSTATE(3,"qps created");
734 void copyInfiAddr(ChInfiAddr
*qpList
){
737 MACHSTATE1(3,"copyInfiAddr Lrts_myNode %d",Lrts_myNode
);
738 for(i
=0;i
<Lrts_numNodes
;i
++){
739 if(i
== Lrts_myNode
){
741 qpList
[qpListIdx
].lid
= ChMessageInt_new(context
->localAddr
[i
].lid
);
742 qpList
[qpListIdx
].qpn
= ChMessageInt_new(context
->localAddr
[i
].qpn
);
743 qpList
[qpListIdx
].psn
= ChMessageInt_new(context
->localAddr
[i
].psn
);
750 static uint16_t getLocalLid(struct ibv_context
*dev_context
, int port
){
751 struct ibv_port_attr attr
;
753 if (ibv_query_port(dev_context
, port
, &attr
))
759 /**************** END OF CmiMachineInit and its helper functions*/
761 struct infiBufferPool
* allocateInfiBufferPool(int numRecvs
,int sizePerBuffer
);
762 void postInitialRecvs(struct infiBufferPool
*recvBufferPool
,int numRecvs
,int sizePerBuffer
);
764 /* Initialize the InfiniBand-specific data for a remote node
765 1. connect the qp, store it in the per-node data, and return that data
767 struct infiOtherNodeData
*initInfiOtherNodeData(int node
,int addr
[3]){
768 struct infiOtherNodeData
* ret
= malloc(sizeof(struct infiOtherNodeData
));
770 ret
->state
= INFI_HEADER_DATA
;
771 ret
->qp
= context
->qp
[node
];
772 // ret->totalTokens = tokensPerProcessor;
773 // ret->tokensLeft = tokensPerProcessor;
775 // ret->postedRecvs = tokensPerProcessor;
776 #if CMK_IBVERBS_DEBUG
781 struct ibv_qp_attr attr
= {
782 .qp_state
= IBV_QPS_RTR
,
784 .dest_qp_num
= addr
[1],
786 .max_dest_rd_atomic
= 1,
793 .port_num
= context
->ibPort
797 MACHSTATE2(3,"initInfiOtherNodeData %d{ qp %p",node
,ret
->qp
);
798 MACHSTATE3(3,"dlid 0x%x qp 0x%x psn 0x%x",attr
.ah_attr
.dlid
,attr
.dest_qp_num
,attr
.rq_psn
);
800 if (err
= ibv_modify_qp(ret
->qp
, &attr
,
806 IBV_QP_MAX_DEST_RD_ATOMIC
|
807 IBV_QP_MIN_RNR_TIMER
)) {
808 MACHSTATE1(3,"ERROR %d",err
);
809 CmiAbort("failed to change qp state to RTR");
812 MACHSTATE(3,"qp state changed to RTR");
814 attr
.qp_state
= IBV_QPS_RTS
;
823 attr
.sq_psn
= context
->localAddr
[node
].psn
;
824 attr
.max_rd_atomic
= 1;
826 if (err
= ibv_modify_qp(ret
->qp
, &attr
,
832 IBV_QP_MAX_QP_RD_ATOMIC
)) {
833 MACHSTATE1(3,"ERROR changing qp state to RTS %d: will retry",err
);
836 // Error code 22 means that there was an invalid parameter when calling to this verbs, try with alternate parameters
851 MACHSTATE3(3,"Retry:dlid 0x%x qp 0x%x psn 0x%x",attr
.ah_attr
.dlid
,attr
.dest_qp_num
,attr
.sq_psn
);
852 if (err
= ibv_modify_qp(ret
->qp
, &attr
,
858 IBV_QP_MAX_QP_RD_ATOMIC
)) {
859 MACHSTATE1(3,"ERROR changing qp state to RTS %d",err
);
860 CmiAbort("Failed to change qp state to RTS: you may need some device-specific parameters in machine-ibverbs");
863 CmiAbort("Failed to change qp state to RTS");
866 MACHSTATE(3,"qp state changed to RTS");
868 MACHSTATE(3,"} initInfiOtherNodeData");
873 void infiPostInitialRecvs(){
874 //create the pool and post the receives
876 /* if(tokensPerProcessor*(Lrts_numNodes-1) <= maxRecvBuffers){
877 numPosts = tokensPerProcessor*(Lrts_numNodes-1);
879 numPosts = maxRecvBuffers;
882 if(Lrts_numNodes
> 1){
883 numPosts
= maxRecvBuffers
;
888 context
->recvBufferPool
= allocateInfiBufferPool(numPosts
,packetSize
);
889 postInitialRecvs(context
->recvBufferPool
,numPosts
,packetSize
);
897 free(context
->localAddr
);
898 context
->localAddr
= NULL
;
901 struct infiBufferPool
* allocateInfiBufferPool(int numRecvs
,int sizePerBuffer
){
906 struct infiBufferPool
*ret
;
907 struct ibv_mr
*bigKey
;
909 MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs
,sizePerBuffer
);
911 page_size
= sysconf(_SC_PAGESIZE
);
912 ret
= malloc(sizeof(struct infiBufferPool
));
914 numBuffers
=ret
->numBuffers
= numRecvs
;
916 ret
->buffers
= malloc(sizeof(struct infiBuffer
)*numBuffers
);
918 bigSize
= numBuffers
*sizePerBuffer
;
919 bigBuf
=malloc(bigSize
);
920 bigKey
= ibv_reg_mr(context
->pd
,bigBuf
,bigSize
,IBV_ACCESS_LOCAL_WRITE
);
921 #if CMK_IBVERBS_STATS
926 CmiAssert(bigKey
!= NULL
);
928 for(i
=0;i
<numBuffers
;i
++){
929 struct infiBuffer
*buffer
= &(ret
->buffers
[i
]);
930 buffer
->type
= BUFFER_RECV
;
931 buffer
->size
= sizePerBuffer
;
934 buffer
->buf
= &bigBuf
[i
*sizePerBuffer
];
935 buffer
->key
= bigKey
;
937 if(buffer
->key
== NULL
){
938 MACHSTATE2(3,"i %d buffer->buf %p",i
,buffer
->buf
);
939 CmiAssert(buffer
->key
!= NULL
);
948 Post the buffers as recv work requests
950 void postInitialRecvs(struct infiBufferPool
*recvBufferPool
,int numRecvs
,int sizePerBuffer
){
952 struct ibv_recv_wr
*workRequests
= malloc(sizeof(struct ibv_recv_wr
)*numRecvs
);
953 struct ibv_sge
*sgElements
= malloc(sizeof(struct ibv_sge
)*numRecvs
);
954 struct ibv_recv_wr
*bad_wr
;
956 int startBufferIdx
=0;
957 MACHSTATE2(3,"posting %d receives of size %d",numRecvs
,sizePerBuffer
);
958 for(j
=0;j
<numRecvs
;j
++){
961 sgElements
[j
].addr
= (uint64_t) recvBufferPool
->buffers
[startBufferIdx
+j
].buf
;
962 sgElements
[j
].length
= sizePerBuffer
;
963 sgElements
[j
].lkey
= recvBufferPool
->buffers
[startBufferIdx
+j
].key
->lkey
;
965 workRequests
[j
].wr_id
= (uint64_t)&(recvBufferPool
->buffers
[startBufferIdx
+j
]);
966 workRequests
[j
].sg_list
= &sgElements
[j
];
967 workRequests
[j
].num_sge
= 1;
969 workRequests
[j
].next
= &workRequests
[j
+1];
973 workRequests
[numRecvs
-1].next
= NULL
;
974 MACHSTATE(3,"About to call ibv_post_srq_recv");
975 if(ibv_post_srq_recv(context
->srq
,workRequests
,&bad_wr
)){
976 CmiAbort("ibv_post_srq_recv failed");
986 static inline void CommunicationServer_nolock(int toBuffer
); //if buffer ==1 recvd messages are buffered but not processed
990 #if CMK_IBVERBS_STATS
991 printf("[%d] numReg %d numUnReg %d numCurReg %d msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",Lrts_myNode
,numReg
, numUnReg
, numCurReg
, msgCount
,pktCount
,packetSize
,CmiTimer(),processBufferedCount
,processBufferedTime
,maxTokens
,context
->tokensLeft
);
995 static void ServiceCharmrun_nolock();
997 static inline void increaseTokens(OtherNode node
);
999 static inline int pollRecvCq(const int toBuffer
);
1000 static inline int pollSendCq(const int toBuffer
);
1003 static inline void getFreeTokens(struct infiOtherNodeData
*infiData
){
1004 #if !CMK_IBVERBS_TOKENS_FLOW
1007 //if(infiData->tokensLeft == 0){
1008 if(context
->tokensLeft
== 0){
1009 MACHSTATE(3,"GET FREE TOKENS {{{");
1013 while(context
->tokensLeft
== 0){
1014 CommunicationServer_nolock(1);
1016 MACHSTATE1(3,"}}} GET FREE TOKENS %d",context
->tokensLeft
);
1022 Packetize this data and send it
1027 static void inline EnqueuePacket(OtherNode node
,infiPacket packet
,int size
,struct ibv_mr
*dataKey
){
1030 #if CMK_IBVERBS_DEBUG
1031 packet
->header
.psn
= (++node
->infiData
->psn
);
1036 packet
->elemList
[1].addr
= (uintptr_t)packet
->buf
;
1037 packet
->elemList
[1].length
= size
;
1038 packet
->elemList
[1].lkey
= dataKey
->lkey
;
1041 packet
->destNode
= node
;
1043 #if CMK_IBVERBS_STATS
1047 getFreeTokens(node
->infiData
);
1049 #if CMK_IBVERBS_INCTOKENS
1050 if((node
->infiData
->tokensLeft
< INCTOKENS_FRACTION
*node
->infiData
->totalTokens
|| node
->infiData
->tokensLeft
< 2) && node
->infiData
->totalTokens
< maxTokens
){
1051 packet
->header
.code
|= INFIPACKETCODE_INCTOKENS
;
1056 if(!checkQp(node->infiData->qp)){
1061 struct ibv_send_wr
*bad_wr
=NULL
;
1062 if(retval
= ibv_post_send(node
->infiData
->qp
,&(packet
->wr
),&bad_wr
)){
1063 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",Lrts_myNode
,node
->infiData
->nodeNo
,retval
);
1064 CmiAbort("ibv_post_send failed");
1066 #if CMK_IBVERBS_TOKENS_FLOW
1067 context
->tokensLeft
--;
1068 #if CMK_IBVERBS_STATS
1069 if(context
->tokensLeft
< minTokensLeft
){
1070 minTokensLeft
= context
->tokensLeft
;
1075 /* if(!checkQp(node->infiData->qp)){
1080 #if CMK_IBVERBS_INCTOKENS
1082 increaseTokens(node
);
1087 #if CMK_IBVERBS_DEBUG
1088 MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d psn %d",size
,packet
->destNode
->infiData
->nodeNo
,context
->tokensLeft
,packet
->header
.psn
);
1090 MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d packet->buf %p",size
,packet
->destNode
->infiData
->nodeNo
,context
->tokensLeft
,packet
->buf
);
1096 static void inline EnqueueDummyPacket(OtherNode node
,int size
){
1098 MallocInfiPacket(packet
);
1099 packet
->size
= size
;
1100 packet
->buf
= CmiAlloc(size
);
1102 packet
->header
.code
= INFIDUMMYPACKET
;
1104 struct ibv_mr
*key
= METADATAFIELD(packet
->buf
)->key
;
1106 MACHSTATE2(3,"Dummy packet to %d size %d",node
->infiData
->nodeNo
,size
);
1107 EnqueuePacket(node
,packet
,size
,key
);
1115 static void inline EnqueueDataPacket(OutgoingMsg ogm
, OtherNode node
, int rank
,char *data
,int size
,int broot
,int copy
){
1117 MallocInfiPacket(packet
);
1118 packet
->size
= size
;
1121 //the nodeNo is added at time of packet allocation
1122 packet
->header
.code
= INFIPACKETCODE_DATA
;
1127 struct ibv_mr
*key
= METADATAFIELD(ogm
->data
)->key
;
1128 CmiAssert(key
!= NULL
);
1130 EnqueuePacket(node
,packet
,size
,key
);
1133 static inline void EnqueueRdmaPacket(OutgoingMsg ogm
, OtherNode node
);
1134 static inline void processAllBufferedMsgs();
1136 void DeliverViaNetwork(OutgoingMsg ogm
, OtherNode node
, int rank
, unsigned int broot
, int copy
){
1137 int size
; char *data
;
1138 // processAllBufferedMsgs();
1145 #if CMK_IBVERBS_STATS
1149 MACHSTATE3(3,"Sending ogm %p of size %d to %d",ogm
,size
,node
->infiData
->nodeNo
);
1150 //First packet has dgram header, other packets dont
1152 DgramHeaderMake(data
, rank
, ogm
->src
, Cmi_charmrun_pid
, 1, broot
);
1154 CMI_MSG_SIZE(ogm
->data
)=ogm
->size
;
1156 if(rdma
&& size
> rdmaThreshold
){
1157 EnqueueRdmaPacket(ogm
,node
);
1160 while(size
> dataSize
){
1161 EnqueueDataPacket(ogm
,node
,rank
,data
,dataSize
,broot
,copy
);
1166 EnqueueDataPacket(ogm
,node
,rank
,data
,size
,broot
,copy
);
1170 processAllBufferedMsgs();
1173 MACHSTATE3(3,"DONE Sending ogm %p of size %d to %d",ogm
,ogm
->size
,node
->infiData
->nodeNo
);
1177 static inline void EnqueueRdmaPacket(OutgoingMsg ogm
, OtherNode node
){
1182 MallocInfiPacket(packet
);
1185 struct infiRdmaPacket
*rdmaPacket
= (struct infiRdmaPacket
*)CmiAlloc(sizeof(struct infiRdmaPacket
));
1188 packet
->size
= sizeof(struct infiRdmaPacket
);
1189 packet
->buf
= (char *)rdmaPacket
;
1191 struct ibv_mr
*key
= METADATAFIELD(ogm
->data
)->key
;
1193 CmiAssert(key
!=NULL
);
1195 MACHSTATE3(3,"ogm->data %p metadata %p key %p",ogm
->data
,METADATAFIELD(ogm
->data
),key
);
1197 packet
->header
.code
= INFIRDMA_START
;
1198 packet
->header
.nodeNo
= CmiMyNodeGlobal();
1201 rdmaPacket
->type
= INFI_MESG
;
1202 rdmaPacket
->ogm
= ogm
;
1203 rdmaPacket
->key
= *key
;
1204 rdmaPacket
->keyPtr
= key
;
1205 rdmaPacket
->remoteBuf
= ogm
->data
;
1206 rdmaPacket
->remoteSize
= ogm
->size
;
1209 struct ibv_mr
*packetKey
= METADATAFIELD((void *)rdmaPacket
)->key
;
1211 MACHSTATE3(3,"rdmaRequest being sent to node %d buf %p size %d",node
->infiData
->nodeNo
,ogm
->data
,ogm
->size
);
1212 EnqueuePacket(node
,packet
,sizeof(struct infiRdmaPacket
),packetKey
);
1218 static inline void processRecvWC(struct ibv_wc
*recvWC
,const int toBuffer
);
1219 static inline void processSendWC(struct ibv_wc
*sendWC
);
1220 static unsigned int _count
=0;
1222 static int _countAsync
=0;
1223 static inline void processAsyncEvents(){
1224 struct ibv_async_event event
;
1227 if(_countAsync
< 1){
1231 FD_SET(context
->context
->async_fd
,&context
->asyncFds
);
1232 CmiAssert(FD_ISSET(context
->context
->async_fd
,&context
->asyncFds
));
1233 ready
= select(1, &context
->asyncFds
,NULL
,NULL
,&context
->tmo
);
1239 // printf("[%d] strerror %s \n",Lrts_myNode,strerror(errno));
1243 if (ibv_get_async_event(context
->context
, &event
)){
1245 CmiAbort("get async event failed");
1247 printf("[%d] async event %d \n",CmiMyNodeGlobal(), event
.event_type
);
1248 ibv_ack_async_event(&event
);
1253 static void pollCmiDirectQ();
1255 static inline void CommunicationServer_nolock(int toBuffer
) {
1257 if(CmiNumNodesGlobal() <= 1){
1261 MACHSTATE(2,"CommServer_nolock{");
1263 // processAsyncEvents();
1269 processed
= pollRecvCq(toBuffer
);
1272 processed
+= pollSendCq(toBuffer
);
1275 // if(processed != 0)
1276 processAllBufferedMsgs();
1282 MACHSTATE(2,"} CommServer_nolock ne");
1286 static inline infiBufferedWC createInfiBufferedWC(){
1287 infiBufferedWC ret = malloc(sizeof(struct infiBufferedWCStruct));
1289 ret->next = ret->prev =NULL;
1294 The buffered recv WCs are stored in a doubly linked list of
1295 arrays (blocks) of WCs.
1296 To keep the average insert cost low, a new block is added
1297 to the head of the list, so the blocks end up in reverse order.
1298 Within a block, however, WCs are stored in insertion order.
1300 /*static void insertBufferedRecv(struct ibv_wc *wc){
1301 infiBufferedWC block;
1302 MACHSTATE(3,"Insert Buffered Recv called");
1303 if( context->infiBufferedRecvList ==NULL){
1304 context->infiBufferedRecvList = createInfiBufferedWC();
1305 block = context->infiBufferedRecvList;
1307 if(context->infiBufferedRecvList->count == WC_BUFFER_SIZE){
1308 block = createInfiBufferedWC();
1309 context->infiBufferedRecvList->prev = block;
1310 block->next = context->infiBufferedRecvList;
1311 context->infiBufferedRecvList = block;
1313 block = context->infiBufferedRecvList;
1317 block->wcList[block->count] = *wc;
1324 go through the blocks of bufferedWC. Process the last block first.
1325 Then the next one and so on. (Processing within a block should happen
1327 Leave the last block in place to avoid having to allocate again
1329 /*static inline void processBufferedRecvList(){
1330 infiBufferedWC start;
1331 start = context->infiBufferedRecvList;
1332 while(start->next != NULL){
1333 start = start->next;
1335 while(start != NULL){
1338 for(i=0;i<start->count;i++){
1339 processRecvWC(&start->wcList[i]);
1341 if(start != context->infiBufferedRecvList){
1344 start = start->prev;
1348 start = start->prev;
1351 context->infiBufferedRecvList->next = NULL;
1352 context->infiBufferedRecvList->prev = NULL;
1353 context->infiBufferedRecvList->count = 0;
1357 static inline int pollRecvCq(const int toBuffer
){
1360 struct ibv_wc wc
[WC_LIST_SIZE
];
1362 MACHSTATE1(2,"pollRecvCq %d (((",toBuffer
);
1363 ne
= ibv_poll_cq(context
->recvCq
,WC_LIST_SIZE
,&wc
[0]);
1364 // CmiAssert(ne >=0);
1367 MACHSTATE1(3,"pollRecvCq ne %d",ne
);
1371 if(wc
[i
].status
!= IBV_WC_SUCCESS
){
1372 CmiAbort("Work completion error in recvCq");
1374 switch(wc
[i
].opcode
){
1376 processRecvWC(&wc
[i
],toBuffer
);
1379 CmiAbort("Wrong type of work completion object in recvCq");
1384 MACHSTATE1(2,"))) pollRecvCq %d",toBuffer
);
1389 static inline void processRdmaWC(struct ibv_wc
*rdmaWC
,const int toBuffer
);
1391 static inline int pollSendCq(const int toBuffer
){
1394 struct ibv_wc wc
[WC_LIST_SIZE
];
1396 ne
= ibv_poll_cq(context
->sendCq
,WC_LIST_SIZE
,&wc
[0]);
1397 // CmiAssert(ne >=0);
1401 if(wc
[i
].status
!= IBV_WC_SUCCESS
){
1402 printf("[%d] wc[%d] status %d wc[i].opcode %d\n",CmiMyNodeGlobal(),i
,wc
[i
].status
,wc
[i
].opcode
);
1403 #if CMK_IBVERBS_STATS
1404 printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d minTokensLeft %d \n",CmiMyNodeGlobal(),msgCount
,pktCount
,packetSize
,CmiTimer(),processBufferedCount
,processBufferedTime
,maxTokens
,context
->tokensLeft
,minTokensLeft
);
1406 CmiAbort("Work completion error in sendCq");
1408 switch(wc
[i
].opcode
){
1411 processSendWC(&wc
[i
]);
1415 case IBV_WC_RDMA_READ
:
1417 // processRdmaWC(&wc[i],toBuffer);
1418 processRdmaWC(&wc
[i
],1);
1421 case IBV_WC_RDMA_WRITE
:
1423 /*** used for CmiDirect puts
1424 Nothing needs to be done on the sender side once send is done **/
1428 CmiAbort("Wrong type of work completion object in sendCq");
1438 Check the communication server socket and
1441 int CheckSocketsReady(int withDelayMs
)
1444 CMK_PIPE_DECL(withDelayMs
);
1446 CmiStdoutAdd(CMK_PIPE_SUB
);
1447 if (Cmi_charmrun_fd
!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd
);
1449 nreadable
=CMK_PIPE_CALL();
1450 ctrlskt_ready_read
= 0;
1451 dataskt_ready_read
= 0;
1452 dataskt_ready_write
= 0;
1454 if (nreadable
== 0) {
1455 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
1458 if (nreadable
==-1) {
1459 CMK_PIPE_CHECKERR();
1460 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
1461 return CheckSocketsReady(0);
1463 CmiStdoutCheck(CMK_PIPE_SUB
);
1464 if (Cmi_charmrun_fd
!=-1)
1465 ctrlskt_ready_read
= CMK_PIPE_CHECKREAD(Cmi_charmrun_fd
);
1466 MACHSTATE(1,"} CheckSocketsReady")
1471 /*** Service the charmrun socket
1474 static void ServiceCharmrun_nolock()
1477 MACHSTATE(2,"ServiceCharmrun_nolock begin {")
1481 CheckSocketsReady(0);
1482 if (ctrlskt_ready_read
) { ctrl_getone(); again
=1; }
1483 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
1485 MACHSTATE(2,"} ServiceCharmrun_nolock end")
// CommunicationServerNet: progress entry point, dispatched on `where`.
//  - COMM_SERVER_FROM_INTERRUPT: handles immediate messages (CMK_IMMEDIATE_MSG
//    builds).
//  - COMM_SERVER_FROM_WORKER: presumably returns early -- confirm.
//  - Otherwise: inProgress[CmiMyRank()] is incremented around the call to
//    CommunicationServer_nolock(0). COMM_SERVER_FROM_SMP also services the
//    charmrun socket first and handles immediate messages afterwards.
// NOTE(review): sleepTime does not appear to be used -- confirm.
1490 static void CommunicationServerNet(int sleepTime
, int where
){
1491 if( where
== COMM_SERVER_FROM_INTERRUPT
){
1492 #if CMK_IMMEDIATE_MSG
1493 CmiHandleImmediate();
1498 if(where
== COMM_SERVER_FROM_WORKER
){
1502 inProgress
[CmiMyRank()] += 1;
1503 if(where
== COMM_SERVER_FROM_SMP
){
1505 ServiceCharmrun_nolock();
1509 CommunicationServer_nolock(0);
1512 inProgress
[CmiMyRank()] -= 1;
1515 /* when called by communication thread or in interrupt */
1516 #if CMK_IMMEDIATE_MSG
1517 if (where
== COMM_SERVER_FROM_SMP
) {
1518 CmiHandleImmediate();
// Forward declarations for helpers defined later in this file:
//  - buffering of a received broadcast;
//  - handover of a fully assembled message.
1524 static void insertBufferedBcast(char *msg
,int size
,int broot
,int asm_rank
);
1527 void static inline handoverMessage(char *newmsg
,int total_size
,int rank
,int broot
,int toBuffer
);
// processMessage: reassembles a Converse message from the packet(s) received
// from node nodeNo. msg/len are the payload that follows the infiPacketHeader.
// node->infiData->state drives a small state machine:
//  - INFI_HEADER_DATA: the packet starts a new message.
//    - Its Converse header is decoded (DgramHeaderBreak).
//    - A CMI_MSG_SIZE(msg)-byte buffer is CmiAlloc'd and the packet copied in.
//    - asm_rank, asm_total, asm_fill, asm_msg and broot are recorded.
//    - State stays INFI_HEADER_DATA if this was the only packet; otherwise it
//      becomes INFI_DATA.
//  - INFI_DATA: the packet is appended at asm_fill.
// If the state is INFI_HEADER_DATA at the end, the message is complete:
// asm_msg is cleared and the buffer is passed to handoverMessage.
1529 static inline void processMessage(int nodeNo
,int len
,char *msg
,const int toBuffer
){
1532 MACHSTATE2(3,"Processing packet from node %d len %d",nodeNo
,len
);
1534 OtherNode node
= &nodes
[nodeNo
];
1535 newmsg
= node
->asm_msg
;
1537 /// This simple state machine determines if this packet marks the beginning of a new message
1538 // from another node, or if this is another in a sequence of packets
1539 switch(node
->infiData
->state
){
1540 case INFI_HEADER_DATA
:
1543 int rank
, srcpe
, seqno
, magic
, i
;
1545 DgramHeaderBreak(msg
, rank
, srcpe
, magic
, seqno
, broot
);
1546 size
= CMI_MSG_SIZE(msg
);
1547 MACHSTATE2(3,"START of a new message from node %d of total size %d",nodeNo
,size
);
1548 // CmiAssert(size > 0);
1549 // CmiAssert(nodes_by_pe[srcpe] == node);
1551 // CmiAssert(newmsg == NULL);
1553 //CmiPrintf("size: %d, len:%d.\n", size, len);
1554 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
1556 newmsg
= (char *)CmiAlloc(size
);
1558 memcpy(newmsg
, msg
, len
);
1559 node
->asm_rank
= rank
;
1560 node
->asm_total
= size
;
1561 node
->asm_fill
= len
;
1562 node
->asm_msg
= newmsg
;
1563 node
->infiData
->broot
= broot
;
1565 //this is the only packet for this message
1566 node
->infiData
->state
= INFI_HEADER_DATA
;
1568 //there are more packets following
1569 node
->infiData
->state
= INFI_DATA
;
// Continuation packet checks. A packet that does not finish the message must
// carry exactly dataSize bytes, and no packet may overrun asm_total.
1575 if(node
->asm_fill
+ len
< node
->asm_total
&& len
!= dataSize
){
1576 //CmiPrintf("from node %d asm_total: %d, asm_fill: %d, len:%d.\n",node->infiData->nodeNo, node->asm_total, node->asm_fill, len);
1577 CmiAbort("packet in the middle does not have expected length");
1579 if(node
->asm_fill
+len
> node
->asm_total
){
1580 //CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
1581 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
1584 memcpy(newmsg
+ node
->asm_fill
,msg
,len
);
1585 node
->asm_fill
+= len
;
1586 if(node
->asm_fill
== node
->asm_total
){
1587 node
->infiData
->state
= INFI_HEADER_DATA
;
1589 node
->infiData
->state
= INFI_DATA
;
1594 /// if this packet was the last packet in a message ie state was
1595 /// reset to infi_header_data
1597 if(node
->infiData
->state
== INFI_HEADER_DATA
){
1598 int total_size
= node
->asm_total
;
1599 node
->asm_msg
= NULL
;
// NOTE(review): this function receives toBuffer, but the handover below passes
// a literal 1, whereas processRdmaWC forwards its toBuffer -- confirm intended.
1600 handoverMessage(newmsg
,total_size
,node
->asm_rank
,node
->infiData
->broot
,1);
1601 MACHSTATE3(3,"Message from node %d of length %d completely received msg %p",nodeNo
,total_size
,newmsg
);
// handoverMessage: delivers a fully received message of total_size bytes.
//  - If CMI_BROADCAST_ROOT(newmsg) is non-zero, the message is queued with
//    insertBufferedBcast, together with broot and rank.
//  - Otherwise it goes straight to handleOneRecvedMsg.
// processAllBufferedMsgs() is also reached from here, presumably only when
// toBuffer is 0 -- confirm.
1606 void static inline handoverMessage(char *newmsg
,int total_size
,int rank
,int broot
,int toBuffer
){
1609 if((CMI_BROADCAST_ROOT(newmsg
)!=0))
1610 insertBufferedBcast(newmsg
,total_size
,broot
,rank
);
1611 else handleOneRecvedMsg(total_size
, newmsg
);
1613 else handleOneRecvedMsg(total_size
, newmsg
);
1617 processAllBufferedMsgs();
// Forward declarations for the token and RDMA helpers that processRecvWC
// (below) dispatches to.
1623 static inline void increasePostedRecvs(int nodeNo
);
1624 static inline void processRdmaRequest(struct infiRdmaPacket
*rdmaPacket
,int fromNodeNo
,int isBuffered
);
1625 static inline void processRdmaAck(struct infiRdmaPacket
*rdmaPacket
);
1627 //struct infiDirectRequestPacket;
1628 //static inline void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket);
// processRecvWC: handles one receive work completion.
// recvWC->wr_id carries the struct infiBuffer that was posted. Its buf starts
// with an infiPacketHeader (sender nodeNo, code flags), followed by
// byte_len - sizeof(struct infiPacketHeader) bytes of payload.
// Each code bit is tested independently:
//  - INFIPACKETCODE_DATA: payload goes to processMessage.
//  - INFIDUMMYPACKET: only logged.
//  - INFIBARRIERPACKET: aborts; barriers must not reach the normal poll loop.
//  - INFIPACKETCODE_INCTOKENS (CMK_IBVERBS_INCTOKENS builds only):
//    increasePostedRecvs.
//  - INFIRDMA_START (rdma enabled): either a heap copy is pushed onto
//    context->bufferedRdmaRequests, or processRdmaRequest runs immediately
//    (presumably selected by toBuffer -- confirm).
//  - INFIRDMA_ACK (rdma enabled): processRdmaAck.
// Finally the same buffer is re-posted to the shared receive queue.
1630 static inline void processRecvWC(struct ibv_wc
*recvWC
,const int toBuffer
){
1631 struct infiBuffer
*buffer
= (struct infiBuffer
*) recvWC
->wr_id
;
1632 struct infiPacketHeader
*header
= (struct infiPacketHeader
*)buffer
->buf
;
1633 int nodeNo
= header
->nodeNo
;
1634 #if CMK_IBVERBS_DEBUG
1635 OtherNode node
= &nodes
[nodeNo
];
1638 int len
= recvWC
->byte_len
-sizeof(struct infiPacketHeader
);
1639 #if CMK_IBVERBS_DEBUG
1641 if(node->infiData->recvPsn == 0){
1642 node->infiData->recvPsn = header->psn;
1644 CmiAssert(header->psn == (node->infiData->recvPsn)+1);
1645 * node->infiData->recvPsn++;
1648 MACHSTATE3(3,"packet from node %d len %d psn %d",nodeNo
,len
,header
->psn
);
1650 MACHSTATE2(3,"packet from node %d len %d",nodeNo
,len
);
1653 if(header
->code
& INFIPACKETCODE_DATA
){
1655 processMessage(nodeNo
,len
,(buffer
->buf
+sizeof(struct infiPacketHeader
)),toBuffer
);
1657 if(header
->code
& INFIDUMMYPACKET
){
1658 MACHSTATE(3,"Dummy packet");
1660 if(header
->code
& INFIBARRIERPACKET
){
1661 MACHSTATE(3,"Barrier packet");
1662 CmiAbort("Should not receive Barrier packet in normal polling loop. Your Barrier is broken");
1665 #if CMK_IBVERBS_INCTOKENS
1666 if(header
->code
& INFIPACKETCODE_INCTOKENS
){
1667 increasePostedRecvs(nodeNo
);
1670 if(rdma
&& header
->code
& INFIRDMA_START
){
1671 struct infiRdmaPacket
*rdmaPacket
= (struct infiRdmaPacket
*)(buffer
->buf
+sizeof(struct infiPacketHeader
));
1673 //TODO: make a function of this and use for both acks and requests
// NOTE(review): the malloc result is not checked before *copyPacket is written.
1674 struct infiRdmaPacket
*copyPacket
= malloc(sizeof(struct infiRdmaPacket
));
1675 struct infiRdmaPacket
*tmp
=context
->bufferedRdmaRequests
;
1676 *copyPacket
= *rdmaPacket
;
1677 copyPacket
->fromNodeNo
= nodeNo
;
1678 MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket
);
1679 context
->bufferedRdmaRequests
= copyPacket
;
1680 copyPacket
->next
= tmp
;
1681 copyPacket
->prev
= NULL
;
1683 tmp
->prev
= copyPacket
;
1686 processRdmaRequest(rdmaPacket,nodeNo,0);
1689 if(rdma
&& header
->code
& INFIRDMA_ACK
){
1690 struct infiRdmaPacket
*rdmaPacket
= (struct infiRdmaPacket
*)(buffer
->buf
+sizeof(struct infiPacketHeader
)) ;
1691 processRdmaAck(rdmaPacket
);
1693 /* if(header->code & INFIDIRECT_REQUEST){
1694 struct infiDirectRequestPacket *directRequestPacket = (struct infiDirectRequestPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1695 processDirectRequest(directRequestPacket);
// Recycle the receive buffer by posting it back to context->srq.
// A posting failure aborts.
1698 struct ibv_sge list
= {
1699 .addr
= (uintptr_t) buffer
->buf
,
1700 .length
= buffer
->size
,
1701 .lkey
= buffer
->key
->lkey
1704 struct ibv_recv_wr wr
= {
1705 .wr_id
= (uint64_t)buffer
,
1710 struct ibv_recv_wr
*bad_wr
;
1712 if(ibv_post_srq_recv(context
->srq
,&wr
,&bad_wr
)){
1713 CmiAbort("ibv_post_srq_recv failed");
// processSendWC: handles completion of a send posted with an infiPacket as
// wr_id.
//  - With CMK_IBVERBS_TOKENS_FLOW, one token is returned to
//    context->tokensLeft.
//  - If the packet belongs to an outgoing message (ogm), its refcount is
//    dropped, and the message is garbage-collected once the count reaches
//    zero.
//  - The infiPacket itself is then released with FreeInfiPacket.
1722 static inline void processSendWC(struct ibv_wc
*sendWC
){
1724 infiPacket packet
= (infiPacket
)sendWC
->wr_id
;
1725 #if CMK_IBVERBS_TOKENS_FLOW
1726 // packet->destNode->infiData->tokensLeft++;
1727 context
->tokensLeft
++;
1730 MACHSTATE2(3,"Packet send complete node %d tokensLeft %d",packet
->destNode
->infiData
->nodeNo
,context
->tokensLeft
);
1731 if(packet
->ogm
!= NULL
){
1732 packet
->ogm
->refcount
--;
1733 if(packet
->ogm
->refcount
== 0){
1734 GarbageCollectMsg(packet
->ogm
);
// For RDMA start/ack and dummy packets, packet->buf is released with CmiFree.
// For acks this is the CmiAlloc'd copy made in EnqueueRdmaAck.
1737 if(packet
->header
.code
== INFIRDMA_START
|| packet
->header
.code
== INFIRDMA_ACK
|| packet
->header
.code
== INFIDUMMYPACKET
){
1738 if (packet
->buf
) CmiFree(packet
->buf
); /* gzheng */
1742 FreeInfiPacket(packet
);
1747 /********************************************************************/
// processRdmaRequest: services a peer's RDMA request by pulling the message
// with an RDMA read.
// Parameters:
//  - _rdmaPacket: describes the remote buffer (remoteBuf, remoteSize, key).
//  - fromNodeNo: the requesting node.
//  - isBuffered: 1 when called from processBufferedRdmaRequests, 0 when called
//    directly from processRecvWC.
// Steps:
//  1. Acquire a send token (getFreeTokens; with CMK_IBVERBS_TOKENS_FLOW also
//     decrement context->tokensLeft).
//  2. Allocate a BUFFER_RDMA infiBuffer with a CmiAlloc'd remoteSize-byte body,
//     whose memory key is taken from METADATAFIELD.
//  3. Post a signaled IBV_WR_RDMA_READ on the node's QP, with the packet as
//     wr_id. Completion is handled by processRdmaWC; a posting failure aborts.
1748 static inline void processRdmaRequest(struct infiRdmaPacket
*_rdmaPacket
,int fromNodeNo
,int isBuffered
){
1749 int nodeNo
= fromNodeNo
;
1750 OtherNode node
= &nodes
[nodeNo
];
1751 struct infiRdmaPacket
*rdmaPacket
;
1753 getFreeTokens(node
->infiData
);
1754 #if CMK_IBVERBS_TOKENS_FLOW
1755 // node->infiData->tokensLeft--;
1756 context
->tokensLeft
--;
1757 #if CMK_IBVERBS_STATS
1758 if(context
->tokensLeft
< minTokensLeft
){
1759 minTokensLeft
= context
->tokensLeft
;
// NOTE(review): this malloc (and the CmiAlloc below) is not checked; the
// matching CmiAsserts are commented out.
1764 struct infiBuffer
*buffer
= malloc(sizeof(struct infiBuffer
));
1765 // CmiAssert(buffer != NULL);
// Choose which packet object becomes the wr_id: the caller's own, or a private
// heap copy. Unbuffered requests presumably need a copy because their packet
// lives inside a receive buffer that processRecvWC re-posts right after this
// call -- confirm the selecting condition.
1769 rdmaPacket
= _rdmaPacket
;
1771 rdmaPacket
= malloc(sizeof(struct infiRdmaPacket
));
1772 *rdmaPacket
= *_rdmaPacket
;
1776 rdmaPacket
->fromNodeNo
= fromNodeNo
;
1777 rdmaPacket
->localBuffer
= (void *)buffer
;
1779 buffer
->type
= BUFFER_RDMA
;
1780 buffer
->size
= rdmaPacket
->remoteSize
;
1782 buffer
->buf
= (char *)CmiAlloc(rdmaPacket
->remoteSize
);
1783 // CmiAssert(buffer->buf != NULL);
1785 buffer
->key
= METADATAFIELD(buffer
->buf
)->key
;
1788 MACHSTATE3(3,"received rdma request from node %d for remoteBuffer %p keyPtr %p",nodeNo
,rdmaPacket
->remoteBuf
,rdmaPacket
->keyPtr
);
1789 MACHSTATE3(3,"Local buffer->buf %p buffer->key %p rdmaPacket %p",buffer
->buf
,buffer
->key
,rdmaPacket
);
1790 // CmiAssert(buffer->key != NULL);
1793 struct ibv_sge list
= {
1794 .addr
= (uintptr_t )buffer
->buf
,
1795 .length
= buffer
->size
,
1796 .lkey
= buffer
->key
->lkey
1799 struct ibv_send_wr
*bad_wr
;
1800 struct ibv_send_wr wr
= {
1801 .wr_id
= (uint64_t )rdmaPacket
,
1804 .opcode
= IBV_WR_RDMA_READ
,
1805 .send_flags
= IBV_SEND_SIGNALED
,
1807 .remote_addr
= (uint64_t )rdmaPacket
->remoteBuf
,
1808 .rkey
= rdmaPacket
->key
.rkey
1811 /** post and rdma_read that is a rdma get*/
1812 if(ibv_post_send(node
->infiData
->qp
,&wr
,&bad_wr
)){
1813 CmiAbort("ibv_post_send failed");
// Forward declarations for the RDMA completion helpers used by processRdmaWC.
1819 static inline void EnqueueRdmaAck(struct infiRdmaPacket
*rdmaPacket
);
1820 static inline void processDirectWC(struct infiRdmaPacket
*rdmaPacket
);
// processRdmaWC: handles completion of an RDMA read issued by
// processRdmaRequest.
//  - rdmaWC->wr_id is that request's infiRdmaPacket; its localBuffer now holds
//    the complete message.
//  - The message's Converse header is decoded, and the buffer is passed to
//    handoverMessage with the caller's toBuffer.
//  - The token taken for the read is returned (CMK_IBVERBS_TOKENS_FLOW).
//  - The ack to the sender is either pushed onto context->bufferedRdmaAcks or
//    sent now via EnqueueRdmaAck; per the comment below, it is buffered when
//    toBuffer is set.
1822 static inline void processRdmaWC(struct ibv_wc
*rdmaWC
,const int toBuffer
){
1824 #if CMK_IBVERBS_STATS
1825 double _startRegTime
;
1828 struct infiRdmaPacket
*rdmaPacket
= (struct infiRdmaPacket
*) rdmaWC
->wr_id
;
1829 /* if(rdmaPacket->type == INFI_DIRECT){
1830 processDirectWC(rdmaPacket);
1833 // CmiAssert(rdmaPacket->type == INFI_MESG);
1834 struct infiBuffer
*buffer
= (struct infiBuffer
*)rdmaPacket
->localBuffer
;
1837 memcpy(buffer->buf,rdmaInBuf,rdmaWC->byte_len);*/
1839 /* CmiAssert(buffer->type == BUFFER_RDMA);
1840 CmiAssert(rdmaWC->byte_len == buffer->size);*/
1844 int rank
, srcpe
, seqno
, magic
, i
;
1846 char *msg
= buffer
->buf
;
1847 DgramHeaderBreak(msg
, rank
, srcpe
, magic
, seqno
, broot
);
1848 size
= CMI_MSG_SIZE(msg
);
1849 /* CmiAssert(size == buffer->size);*/
1850 handoverMessage(buffer
->buf
,size
,rank
,broot
,toBuffer
);
1852 MACHSTATE2(3,"Rdma done for buffer->buf %p buffer->key %p",buffer
->buf
,buffer
->key
);
// Release the token that processRdmaRequest took for this RDMA read.
1857 OtherNode node
=&nodes
[rdmaPacket
->fromNodeNo
];
1858 //we are sending this ack as a response to a successful
1859 // rdma_Read.. the token for that rdma_Read needs to be freed
1860 #if CMK_IBVERBS_TOKENS_FLOW
1861 //node->infiData->tokensLeft++;
1862 context
->tokensLeft
++;
1865 //send ack to sender if toBuffer is off otherwise buffer it
1867 MACHSTATE1(3,"Buffering Rdma Ack %p",rdmaPacket
);
1868 struct infiRdmaPacket
*tmp
= context
->bufferedRdmaAcks
;
1869 context
->bufferedRdmaAcks
= rdmaPacket
;
1870 rdmaPacket
->next
= tmp
;
1871 rdmaPacket
->prev
= NULL
;
1873 tmp
->prev
= rdmaPacket
;
1876 EnqueueRdmaAck(rdmaPacket
);
// EnqueueRdmaAck: queues an INFIRDMA_ACK to node rdmaPacket->fromNodeNo.
// The ack payload is a CmiAlloc'd copy of *rdmaPacket; its memory key is read
// from METADATAFIELD. The copy is released with CmiFree when the send
// completes (processSendWC, INFIRDMA_ACK case).
1881 static inline void EnqueueRdmaAck(struct infiRdmaPacket
*rdmaPacket
){
1883 OtherNode node
=&nodes
[rdmaPacket
->fromNodeNo
];
1886 MallocInfiPacket(packet
);
1888 struct infiRdmaPacket
*ackPacket
= (struct infiRdmaPacket
*)CmiAlloc(sizeof(struct infiRdmaPacket
));
1889 *ackPacket
= *rdmaPacket
;
1890 packet
->size
= sizeof(struct infiRdmaPacket
);
1891 packet
->buf
= (char *)ackPacket
;
1892 packet
->header
.code
= INFIRDMA_ACK
;
1895 struct ibv_mr
*packetKey
= METADATAFIELD((void *)ackPacket
)->key
;
1898 EnqueuePacket(node
,packet
,sizeof(struct infiRdmaPacket
),packetKey
);
// processRdmaAck: sender side of an RDMA transfer.
// The receiver echoes back the request packet (EnqueueRdmaAck copies it
// wholesale), so ogm is presumably the sender's own outgoing message. Its
// reference is dropped, and GarbageCollectMsg is called.
// NOTE(review): unlike processSendWC, GarbageCollectMsg is called without
// checking refcount == 0 -- confirm that GarbageCollectMsg performs that check
// itself.
1903 static inline void processRdmaAck(struct infiRdmaPacket
*rdmaPacket
){
1904 MACHSTATE2(3,"rdma ack received for remoteBuf %p size %d",rdmaPacket
->remoteBuf
,rdmaPacket
->remoteSize
);
1905 rdmaPacket
->ogm
->refcount
--;
1906 GarbageCollectMsg(rdmaPacket
->ogm
);
1910 /****************************
1911 Deal with all the buffered (delayed) messages
1912 such as processing recvd broadcasts, sending
1913 rdma acks and processing recvd rdma requests
1914 ******************************/
// createBcastPool: allocates one block of the buffered-broadcast list.
// The block has no neighbours (next/prev NULL), and all BCASTLIST_SIZE slots
// are marked invalid.
// NOTE(review): the malloc result is not checked. insertBufferedBcast reads
// ret->count, so confirm it is initialised to 0 here.
1917 static inline infiBufferedBcastPool
createBcastPool(){
1919 infiBufferedBcastPool ret
= malloc(sizeof(struct infiBufferedBcastPoolStruct
));
1921 ret
->next
= ret
->prev
= NULL
;
1922 for(i
=0;i
<BCASTLIST_SIZE
;i
++){
1923 ret
->bcastList
[i
].valid
= 0;
1928 The buffered bcast messages are stored in a doubly linked list of
1930 To keep the average insert cost low, a new block is added
1931 to the top of the list. (resulting in a reverse seq of blocks)
1932 Within a block, however, broadcasts are stored in increasing sequence order
// insertBufferedBcast: records a received broadcast (msg, size, broot,
// asm_rank) for later delivery by processBufferedBcast.
// Entries go into the head block, context->bufferedBcastList:
//  - the list is created on first use;
//  - when the head block already holds BCASTLIST_SIZE entries, a fresh block
//    is pushed in front of it (old head's prev = new block, new block's
//    next = old head).
1935 static void insertBufferedBcast(char *msg
,int size
,int broot
,int asm_rank
){
1936 if(context
->bufferedBcastList
== NULL
){
1937 context
->bufferedBcastList
= createBcastPool();
1939 if(context
->bufferedBcastList
->count
== BCASTLIST_SIZE
){
1940 infiBufferedBcastPool tmp
;
1941 tmp
= createBcastPool();
1942 context
->bufferedBcastList
->prev
= tmp
;
1943 tmp
->next
= context
->bufferedBcastList
;
1944 context
->bufferedBcastList
= tmp
;
1947 context
->bufferedBcastList
->bcastList
[context
->bufferedBcastList
->count
].msg
= msg
;
1948 context
->bufferedBcastList
->bcastList
[context
->bufferedBcastList
->count
].size
= size
;
1949 context
->bufferedBcastList
->bcastList
[context
->bufferedBcastList
->count
].broot
= broot
;
1950 context
->bufferedBcastList
->bcastList
[context
->bufferedBcastList
->count
].asm_rank
= asm_rank
;
1951 context
->bufferedBcastList
->bcastList
[context
->bufferedBcastList
->count
].valid
= 1;
1953 MACHSTATE3(3,"Broadcast msg %p of size %d being buffered at count %d ",msg
,size
,context
->bufferedBcastList
->count
);
1955 context
->bufferedBcastList
->count
++;
1959 Go through the blocks of buffered bcast messages. process last block first
1960 processing within a block is in sequence, though
// processBufferedBcast: delivers every buffered broadcast in arrival order.
//  - Blocks are pushed at the head, so it first follows `next` to the oldest
//    block, then visits blocks through `prev`.
//  - Each valid entry is marked invalid, passed to handleOneRecvedMsg, and has
//    its msg pointer cleared.
//  - context->insideProcessBufferedBcasts is checked and set to guard against
//    re-entry (presumably from code reached via handleOneRecvedMsg -- confirm).
//  - The list head is reset to NULL at the end.
1962 static inline void processBufferedBcast(){
1963 infiBufferedBcastPool start
;
1965 if(context
->bufferedBcastList
== NULL
){
1968 start
= context
->bufferedBcastList
;
1969 if(context
->insideProcessBufferedBcasts
==1){
1972 context
->insideProcessBufferedBcasts
=1;
1974 while(start
->next
!= NULL
){
1975 start
= start
->next
;
1978 while(start
!= NULL
){
1980 infiBufferedBcastPool tmp
;
1981 if(start
->count
!= 0){
1982 MACHSTATE2(3,"start %p start->count %d[[[",start
,start
->count
);
1984 for(i
=0;i
<start
->count
;i
++){
1985 if(start
->bcastList
[i
].valid
== 0){
1988 start
->bcastList
[i
].valid
=0;
1989 MACHSTATE3(3,"Buffered broadcast msg %p of size %d being processed at %d",start
->bcastList
[i
].msg
,start
->bcastList
[i
].size
,i
);
1991 handleOneRecvedMsg(start
->bcastList
[i
].size
, start
->bcastList
[i
].msg
);
1992 start
->bcastList
[i
].msg
= NULL
;
1995 if(start
->count
!= 0){
1996 MACHSTATE2(3,"]]] start %p start->count %d",start
,start
->count
);
2000 start
= start
->prev
;
2008 context
->bufferedBcastList
= NULL
;
2009 /* context->bufferedBcastList->prev = NULL;
2010 context->bufferedBcastList->count =0; */
2011 context
->insideProcessBufferedBcasts
=0;
2012 MACHSTATE(2,"processBufferedBcast done ");
// processBufferedRdmaAcks: sends every ack buffered by processRdmaWC, oldest
// first. The list is built by pushing at the head, so this walks `next` to the
// tail and then follows `prev`. Afterwards context->bufferedRdmaAcks is
// emptied.
2016 static inline void processBufferedRdmaAcks(){
2017 struct infiRdmaPacket
*start
= context
->bufferedRdmaAcks
;
2021 while(start
->next
!= NULL
){
2022 start
= start
->next
;
2024 while(start
!= NULL
){
2025 struct infiRdmaPacket
*rdmaPacket
=start
;
2026 MACHSTATE1(3,"Processing Buffered Rdma Ack %p",rdmaPacket
);
2027 EnqueueRdmaAck(rdmaPacket
);
2028 start
= start
->prev
;
2031 context
->bufferedRdmaAcks
=NULL
;
// processBufferedRdmaRequests: starts every RDMA request buffered by
// processRecvWC, oldest first.
//  - It walks `next` to the tail, then follows `prev`.
//  - Each request goes to processRdmaRequest with isBuffered = 1; the packets
//    here are the heap copies made in processRecvWC.
//  - Afterwards context->bufferedRdmaRequests is emptied.
2036 static inline void processBufferedRdmaRequests(){
2037 struct infiRdmaPacket
*start
= context
->bufferedRdmaRequests
;
2043 while(start
->next
!= NULL
){
2044 start
= start
->next
;
2046 while(start
!= NULL
){
2047 struct infiRdmaPacket
*rdmaPacket
=start
;
2048 MACHSTATE1(3,"Processing Buffered Rdma Request %p",rdmaPacket
);
2049 processRdmaRequest(rdmaPacket
,rdmaPacket
->fromNodeNo
,1);
2050 start
= start
->prev
;
2053 context
->bufferedRdmaRequests
=NULL
;
// processAllBufferedMsgs: drains all deferred work in a fixed order:
// buffered broadcasts, then RDMA acks, then RDMA requests.
// With CMK_IBVERBS_STATS it counts calls and accumulates the wall time spent.
// NOTE(review): the local _startTime shadows the file-scope static _startTime.
2060 static inline void processAllBufferedMsgs(){
2061 #if CMK_IBVERBS_STATS
2062 double _startTime
= CmiWallTimer();
2063 processBufferedCount
++;
2065 processBufferedBcast();
2067 processBufferedRdmaAcks();
2068 processBufferedRdmaRequests();
2069 #if CMK_IBVERBS_STATS
2070 processBufferedTime
+= (CmiWallTimer()-_startTime
);
2075 /*************************
2076 Increase tokens when short of them
// increaseTokens: grows a peer's send-token budget.
//  - The increase is totalTokens * INCTOKENS_INCREASE, truncated to int, and
//    capped so that totalTokens never exceeds maxTokens.
//  - tokensLeft grows by the same amount.
//  - The shared send CQ is resized to match. A resize failure is reported on
//    stderr and aborts.
2078 static inline void increaseTokens(OtherNode node
){
2080 int increase
= node
->infiData
->totalTokens
*INCTOKENS_INCREASE
;
2081 if(node
->infiData
->totalTokens
+ increase
> maxTokens
){
2082 increase
= maxTokens
-node
->infiData
->totalTokens
;
2084 node
->infiData
->totalTokens
+= increase
;
2085 node
->infiData
->tokensLeft
+= increase
;
2086 MACHSTATE3(3,"Increasing tokens for node %d to %d by %d",node
->infiData
->nodeNo
,node
->infiData
->totalTokens
,increase
);
2087 //increase the size of the sendCq
2088 int currentCqSize
= context
->sendCqSize
;
2089 if(ibv_resize_cq(context
->sendCq
,currentCqSize
+increase
)){
2090 fprintf(stderr
,"[%d] failed to increase cq by %d from %d totalTokens %d \n",CmiMyNodeGlobal() ,increase
,currentCqSize
, node
->infiData
->totalTokens
);
2091 CmiAbort("ibv_resize_cq failed");
2093 context
->sendCqSize
+= increase
;
// increasePostedRecvs: grows the receive capacity for peer nodeNo.
//  - tokenIncrease = postedRecvs * INCTOKENS_INCREASE, capped against
//    maxTokens. The receive CQ grows by tokenIncrease; a resize failure aborts.
//  - recvIncrease is the number of receive buffers added to the SRQ; it is
//    capped by maxRecvBuffers.
//  - When recvIncrease is positive, a new buffer pool (presumably recvIncrease
//    buffers of packetSize bytes) is pushed onto context->recvBufferPool and
//    posted.
2096 static void increasePostedRecvs(int nodeNo
){
2097 OtherNode node
= &nodes
[nodeNo
];
2098 int tokenIncrease
= node
->infiData
->postedRecvs
*INCTOKENS_INCREASE
;
2099 int recvIncrease
= tokenIncrease
;
2100 if(tokenIncrease
+node
->infiData
->postedRecvs
> maxTokens
){
2101 tokenIncrease
= maxTokens
- node
->infiData
->postedRecvs
;
// NOTE(review): recvIncrease was copied from tokenIncrease BEFORE the
// maxTokens clamp above. Unless the maxRecvBuffers limit below applies,
// postedRecvs grows by the unclamped amount and can exceed maxTokens --
// confirm whether recvIncrease should be re-derived after the clamp.
2103 if(tokenIncrease
+context
->srqSize
> maxRecvBuffers
){
2104 recvIncrease
= maxRecvBuffers
-context
->srqSize
;
2106 node
->infiData
->postedRecvs
+= recvIncrease
;
2107 context
->srqSize
+= recvIncrease
;
2108 MACHSTATE3(3,"Increase tokens by %d to %d for node %d ",tokenIncrease
,node
->infiData
->postedRecvs
,nodeNo
);
2109 //increase the size of the recvCq
2110 int currentCqSize
= context
->recvCqSize
;
2111 if(ibv_resize_cq(context
->recvCq
,currentCqSize
+tokenIncrease
)){
2112 CmiAbort("ibv_resize_cq failed");
2114 context
->recvCqSize
+= tokenIncrease
;
2115 if(recvIncrease
> 0){
2116 //create another bufferPool and attach it to the top of the current one
2117 struct infiBufferPool
*newPool
= allocateInfiBufferPool(recvIncrease
,packetSize
);
2118 newPool
->next
= context
->recvBufferPool
;
2119 context
->recvBufferPool
= newPool
;
2120 postInitialRecvs(newPool
,recvIncrease
,packetSize
);
2128 /*********************************************
2129 Memory management routines for RDMA
2131 ************************************************/
2134 There are INFINUMPOOLS of memory.
2135 The first pool is of size firstBinSize.
2136 The ith pool is of size firstBinSize*2^i
// initInfiCmiChunkPools: initialises the registered-memory chunk pools as empty
// free lists (startBuf NULL, count 0). Pool i holds chunks of
// firstBinSize*2^i bytes, as described in the comment above.
// With THREAD_MULTI_POOL:
//  - each of the CmiMyNodeSize()+1 ranks (presumably the workers plus the
//    communication thread) gets its own INFINUMPOOLS pools;
//  - a nodeSize x nodeSize matrix of PCQueues is created for returning chunks
//    freed by a rank other than their owner.
// NOTE(review): none of the malloc results are checked.
2139 static void initInfiCmiChunkPools(){
2141 int size
= firstBinSize
;
2144 #if THREAD_MULTI_POOL
2145 nodeSize
= CmiMyNodeSize() + 1;
2146 infiCmiChunkPools
= malloc(sizeof(infiCmiChunkPool
*) * nodeSize
);
2147 for(i
= 0; i
< nodeSize
; i
++){
2148 infiCmiChunkPools
[i
] = malloc(sizeof(infiCmiChunkPool
) * INFINUMPOOLS
);
2150 for(j
= 0; j
< nodeSize
; j
++){
2151 size
= firstBinSize
;
2152 for(i
=0;i
<INFINUMPOOLS
;i
++){
2153 infiCmiChunkPools
[j
][i
].size
= size
;
2154 infiCmiChunkPools
[j
][i
].startBuf
= NULL
;
2155 infiCmiChunkPools
[j
][i
].count
= 0;
// Layout is queuePool[owner][freer]:
//  - infi_CmiFree pushes onto queuePool[parentPe][CmiMyRank()];
//  - getInfiCmiChunkThread drains queuePool[CmiMyRank()][i].
2160 // creating the n^2 system of queues
2161 queuePool
= malloc(sizeof(PCQueue
*) * nodeSize
);
2162 for(i
= 0; i
< nodeSize
; i
++){
2163 queuePool
[i
] = malloc(sizeof(PCQueue
) * nodeSize
);
2165 for(i
= 0; i
< nodeSize
; i
++)
2166 for(j
= 0; j
< nodeSize
; j
++)
2167 queuePool
[i
][j
] = PCQueueCreate();
// Single set of pools for the whole process (presumably the
// non-THREAD_MULTI_POOL branch).
2171 size
= firstBinSize
;
2172 for(i
=0;i
<INFINUMPOOLS
;i
++){
2173 infiCmiChunkPools
[i
].size
= size
;
2174 infiCmiChunkPools
[i
].startBuf
= NULL
;
2175 infiCmiChunkPools
[i
].count
= 0;
2183 Register memory for a part of a received multisend message
// registerMultiSendMesg: registers the memory of one part of a received
// multisend message.
//  - The registered region starts sizeof(infiCmiChunkHeader) bytes before msg
//    and spans size plus that header.
//  - Access is local write plus remote read/write.
// Returns freshly malloc'd metadata with no owner and
// poolIdx = INFIMULTIPOOL, which infi_CmiFree treats specially.
// NOTE(review): the metadata malloc is unchecked, and a failed ibv_reg_mr is
// only caught by CmiAssert -- confirm assertions are enabled in the builds that
// rely on this.
2185 infiCmiChunkMetaData
*registerMultiSendMesg(char *msg
,int size
){
2186 infiCmiChunkMetaData
*metaData
= malloc(sizeof(infiCmiChunkMetaData
));
2187 char *res
=msg
-sizeof(infiCmiChunkHeader
);
2188 metaData
->key
= ibv_reg_mr(context
->pd
,res
,(size
+sizeof(infiCmiChunkHeader
)),IBV_ACCESS_REMOTE_READ
| IBV_ACCESS_LOCAL_WRITE
| IBV_ACCESS_REMOTE_WRITE
);
2189 #if CMK_IBVERBS_STATS
2194 CmiAssert(metaData
->key
!=NULL
);
2195 metaData
->owner
= NULL
;
2196 metaData
->poolIdx
= INFIMULTIPOOL
;
2202 #if THREAD_MULTI_POOL
2204 // Fills up the buffer pools for every thread in the node
// fillBufferPools: pre-populates the free lists of every rank
// (CmiMyNodeSize()+1) and every pool index.
//  - For each pool, one region of
//    (allocSize + sizeof(infiCmiChunkHeader)) * count bytes is
//    posix_memalign'd and registered with ibv_reg_mr.
//  - The region is carved into `count` chunks; count = blockAllocRatio when
//    poolIdx < blockThreshold.
//  - Each chunk gets malloc'd metadata that shares the region's key and owner,
//    records its poolIdx and parentPe = thread, and is linked onto
//    infiCmiChunkPools[thread][poolIdx].
2205 static inline void fillBufferPools(){
2206 int nodeSize
, poolIdx
, thread
;
2207 infiCmiChunkMetaData
*metaData
;
2208 infiCmiChunkHeader
*hdr
;
2215 // initializing values
2216 nodeSize
= CmiMyNodeSize() + 1;
2218 // iterating over all threads and all pools
2219 for(thread
= 0; thread
< nodeSize
; thread
++){
2220 for(poolIdx
= 0; poolIdx
< INFINUMPOOLS
; poolIdx
++){
2221 allocSize
= infiCmiChunkPools
[thread
][poolIdx
].size
;
2222 if(poolIdx
< blockThreshold
){
2223 count
= blockAllocRatio
;
// NOTE(review): posix_memalign's return value is ignored, so an allocation
// failure goes unnoticed.
2227 posix_memalign(&res
, ALIGN_BYTES
, (allocSize
+sizeof(infiCmiChunkHeader
))*count
);
2229 key
= ibv_reg_mr(context
->pd
,res
,(allocSize
+sizeof(infiCmiChunkHeader
))*count
,IBV_ACCESS_REMOTE_READ
| IBV_ACCESS_LOCAL_WRITE
| IBV_ACCESS_REMOTE_WRITE
);
2230 CmiAssert(key
!= NULL
);
2231 #if CMK_IBVERBS_STATS
2235 res
+= sizeof(infiCmiChunkHeader
);
2236 for(i
=0;i
<count
;i
++){
2237 metaData
= METADATAFIELD(res
) = malloc(sizeof(infiCmiChunkMetaData
));
2238 metaData
->key
= key
;
2239 metaData
->owner
= hdr
;
2240 metaData
->poolIdx
= poolIdx
;
2241 metaData
->parentPe
= thread
; // setting the parent PE
2243 metaData
->owner
->metaData
->count
= count
;
2244 metaData
->nextBuf
= NULL
;
2245 infiCmiChunkPools
[thread
][poolIdx
].startBuf
= res
- sizeof(infiCmiChunkHeader
);
2246 infiCmiChunkPools
[thread
][poolIdx
].count
++;
2248 void *startBuf
= res
- sizeof(infiCmiChunkHeader
);
2249 metaData
->nextBuf
= infiCmiChunkPools
[thread
][poolIdx
].startBuf
;
2250 infiCmiChunkPools
[thread
][poolIdx
].startBuf
= startBuf
;
2251 infiCmiChunkPools
[thread
][poolIdx
].count
++;
2254 res
+= (allocSize
+sizeof(infiCmiChunkHeader
));
// getInfiCmiChunkThread: obtains a registered chunk able to hold dataSize bytes
// from the calling rank's pools (infiCmiChunkPools[CmiMyRank()]). The result
// is the data pointer, just past an infiCmiChunkHeader.
//  1. If the matching pool is empty, chunks that other ranks freed on this
//     rank's behalf (queuePool[CmiMyRank()][i]) are reclaimed via
//     infi_CmiFreeDirect.
//  2. If the pool is still empty, or dataSize exceeds the largest pool, a new
//     region is posix_memalign'd, registered (aborting on failure) and carved
//     into `count` chunks with parentPe = CmiMyRank().
//  3. For pooled sizes, the head of the pool's free list is then popped.
2261 static inline void *getInfiCmiChunkThread(int dataSize
){
2262 //find out to which pool this dataSize belongs to
2263 // poolIdx = floor(log2(dataSize/firstBinSize))+1
2264 int ratio
= dataSize
/firstBinSize
;
2271 MACHSTATE1(2,"Rank=%d",CmiMyRank());
2272 MACHSTATE1(3,"INFI_ALLOC %d",CmiMyRank());
2278 MACHSTATE1(2,"This is %d",CmiMyRank());
2279 MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize
,poolIdx
);
2281 // checking whether to analyze the free queues to reuse buffers
2282 nodeSize
= CmiMyNodeSize() + 1;
2283 if(poolIdx
< INFINUMPOOLS
&& infiCmiChunkPools
[CmiMyRank()][poolIdx
].startBuf
== NULL
){
2284 MACHSTATE1(3,"Disposing memory %d",CmiMyRank());
2285 for(i
= 0; i
< nodeSize
; i
++){
2286 if(!PCQueueEmpty(queuePool
[CmiMyRank()][i
])){
// NOTE(review): PCQueueLength is re-evaluated on every iteration while
// PCQueuePop shrinks the queue, so this loop presumably drains only about half
// of the queued chunks per call. Consider looping until PCQueueEmpty instead.
2287 for(j
= 0; j
< PCQueueLength(queuePool
[CmiMyRank()][i
]); j
++){
2288 pointer
= (void *)PCQueuePop(queuePool
[CmiMyRank()][i
]);
2289 infi_CmiFreeDirect(pointer
);
2295 if((poolIdx
< INFINUMPOOLS
&& infiCmiChunkPools
[CmiMyRank()][poolIdx
].startBuf
== NULL
) || poolIdx
>= INFINUMPOOLS
){
2296 infiCmiChunkMetaData
*metaData
;
2297 infiCmiChunkHeader
*hdr
;
2305 if(poolIdx
< INFINUMPOOLS
){
2306 allocSize
= infiCmiChunkPools
[CmiMyRank()][poolIdx
].size
;
2308 allocSize
= dataSize
;
2311 if(poolIdx
< blockThreshold
){
2312 count
= blockAllocRatio
;
// NOTE(review): posix_memalign's return value is ignored.
2314 posix_memalign(&res
, ALIGN_BYTES
, (allocSize
+sizeof(infiCmiChunkHeader
))*count
);
2318 key
= ibv_reg_mr(context
->pd
,res
,(allocSize
+sizeof(infiCmiChunkHeader
))*count
,IBV_ACCESS_REMOTE_READ
| IBV_ACCESS_LOCAL_WRITE
| IBV_ACCESS_REMOTE_WRITE
);
2320 CmiAbort("ibv_reg_mr failed to pin memory\n");
2321 #if CMK_IBVERBS_STATS
2326 origres
= (res
+= sizeof(infiCmiChunkHeader
));
2328 for(i
=0;i
<count
;i
++){
2329 metaData
= METADATAFIELD(res
) = malloc(sizeof(infiCmiChunkMetaData
));
2330 _MEMCHECK(metaData
);
2331 metaData
->key
= key
;
2332 metaData
->owner
= hdr
;
2333 metaData
->poolIdx
= poolIdx
;
2334 metaData
->parentPe
= CmiMyRank(); // setting the parent PE
2337 metaData
->owner
->metaData
->count
= count
;
2338 metaData
->nextBuf
= NULL
;
2340 void *startBuf
= res
- sizeof(infiCmiChunkHeader
);
2341 metaData
->nextBuf
= infiCmiChunkPools
[CmiMyRank()][poolIdx
].startBuf
;
2342 infiCmiChunkPools
[CmiMyRank()][poolIdx
].startBuf
= startBuf
;
2343 infiCmiChunkPools
[CmiMyRank()][poolIdx
].count
++;
2347 res
+= (allocSize
+sizeof(infiCmiChunkHeader
));
2352 MACHSTATE3(3,"AllocSize %d buf %p key %p",allocSize
,res
,metaData
->key
);
2356 if(poolIdx
< INFINUMPOOLS
){
2357 infiCmiChunkMetaData
*metaData
;
2359 res
= infiCmiChunkPools
[CmiMyRank()][poolIdx
].startBuf
;
2360 res
+= sizeof(infiCmiChunkHeader
);
2362 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx
,res
);
2363 metaData
= METADATAFIELD(res
);
2365 infiCmiChunkPools
[CmiMyRank()][poolIdx
].startBuf
= metaData
->nextBuf
;
2366 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx
,infiCmiChunkPools
[CmiMyRank()][poolIdx
].startBuf
);
2368 metaData
->nextBuf
= NULL
;
2369 // CmiAssert(metaData->poolIdx == poolIdx);
2371 infiCmiChunkPools
[CmiMyRank()][poolIdx
].count
--;
2375 CmiAbort("getInfiCmiChunkThread failed");
2379 #else /* not MULTIPOOL case */
// getInfiCmiChunk: single-pool-set counterpart of getInfiCmiChunkThread.
// It obtains a registered chunk for dataSize bytes from
// infiCmiChunkPools[poolIdx]; the result is the data pointer, past an
// infiCmiChunkHeader.
//  - If that pool is empty, or dataSize exceeds the largest pool, a new region
//    of `count` chunks is posix_memalign'd and registered. Each chunk gets
//    metadata sharing the region's key and owner header.
//  - Otherwise the free-list head is popped.
// With CMK_IBVERBS_STATS, allocation statistics are printed every 1000
// allocations once numAlloc exceeds 10000.
2380 static inline void *getInfiCmiChunk(int dataSize
){
2381 //find out to which pool this dataSize belongs to
2382 // poolIdx = floor(log2(dataSize/firstBinSize))+1
2383 int ratio
= dataSize
/firstBinSize
;
2386 #if CMK_IBVERBS_STATS
2387 if(numAlloc
>10000 && numAlloc
%1000==0)
2389 printf("[%d] numReg %d numUnReg %d numCurReg %d numAlloc %d numFree %d msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",CmiMyNodeGlobal(),numReg
, numUnReg
, numCurReg
, numAlloc
, numFree
, msgCount
,pktCount
,packetSize
,CmiTimer(),processBufferedCount
,processBufferedTime
,maxTokens
,context
->tokensLeft
);
2390 /* printf("[%d] numMultiSendUnreg %d numMultiSend %d numMultiSendFree %d\n", CmiMyNodeGlobal(), numMultiSendUnreg, numMultiSend, numMultiSendFree);*/
2397 MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize
,poolIdx
);
2398 if((poolIdx
< INFINUMPOOLS
&& infiCmiChunkPools
[poolIdx
].startBuf
== NULL
) || poolIdx
>= INFINUMPOOLS
){
2399 infiCmiChunkMetaData
*metaData
;
2400 infiCmiChunkHeader
*hdr
;
2408 if(poolIdx
< INFINUMPOOLS
){
2409 allocSize
= infiCmiChunkPools
[poolIdx
].size
;
2411 allocSize
= dataSize
;
2414 if(poolIdx
< blockThreshold
){
2415 count
= blockAllocRatio
;
// NOTE(review): posix_memalign's return value is ignored.
2417 posix_memalign(&res
, ALIGN_BYTES
, (allocSize
+sizeof(infiCmiChunkHeader
))*count
);
2418 hdr
= (infiCmiChunkHeader
*)res
;
2420 key
= ibv_reg_mr(context
->pd
,res
,(allocSize
+sizeof(infiCmiChunkHeader
))*count
,IBV_ACCESS_REMOTE_READ
| IBV_ACCESS_LOCAL_WRITE
| IBV_ACCESS_REMOTE_WRITE
);
2421 CmiAssert(key
!= NULL
);
2422 #if CMK_IBVERBS_STATS
2426 origres
= (res
+= sizeof(infiCmiChunkHeader
));
2428 for(i
=0;i
<count
;i
++){
2429 metaData
= METADATAFIELD(res
) = malloc(sizeof(infiCmiChunkMetaData
));
2430 metaData
->key
= key
;
2431 metaData
->owner
= hdr
;
2432 metaData
->poolIdx
= poolIdx
;
2435 metaData
->owner
->metaData
->count
= count
;
2436 metaData
->nextBuf
= NULL
;
2438 void *startBuf
= res
- sizeof(infiCmiChunkHeader
);
2439 metaData
->nextBuf
= infiCmiChunkPools
[poolIdx
].startBuf
;
2440 infiCmiChunkPools
[poolIdx
].startBuf
= startBuf
;
2441 infiCmiChunkPools
[poolIdx
].count
++;
2445 res
+= (allocSize
+sizeof(infiCmiChunkHeader
));
2450 MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize
,res
,metaData
->key
);
2454 if(poolIdx
< INFINUMPOOLS
){
2455 infiCmiChunkMetaData
*metaData
;
2457 res
= infiCmiChunkPools
[poolIdx
].startBuf
;
2458 res
+= sizeof(infiCmiChunkHeader
);
2460 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx
,res
);
2461 metaData
= METADATAFIELD(res
);
2463 infiCmiChunkPools
[poolIdx
].startBuf
= metaData
->nextBuf
;
2464 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx
,infiCmiChunkPools
[poolIdx
].startBuf
);
2466 metaData
->nextBuf
= NULL
;
2467 // CmiAssert(metaData->poolIdx == poolIdx);
2469 infiCmiChunkPools
[poolIdx
].count
--;
2473 CmiAbort("getInfiCmiChunk failed");
// infi_CmiAlloc: allocator for Converse message buffers of `size` bytes,
// including the CmiChunkHeader.
//  - When Cmi_charmrun_fd == -1, it uses plain posix_memalign with a
//    pointer-sized prefix; infi_CmiFree undoes that offset.
//  - Otherwise it obtains a registered chunk for size - sizeof(CmiChunkHeader)
//    data bytes (per-rank pools when THREAD_MULTI_POOL is set), then steps back
//    sizeof(CmiChunkHeader) so the header precedes the data.
// NOTE(review): posix_memalign's return value is ignored. The MACHSTATE1 below
// prints the size_t expression size - sizeof(CmiChunkHeader) with %d.
2480 void * infi_CmiAlloc(int size
){
2482 #if CMK_IBVERBS_STATS
2485 if (Cmi_charmrun_fd
== -1) {
2486 posix_memalign(&res
, ALIGN_BYTES
, size
+ sizeof(void*));
2487 res
+= sizeof(void*);
2490 #if THREAD_MULTI_POOL
2491 res
= getInfiCmiChunkThread(size
-sizeof(CmiChunkHeader
));
2492 res
-= sizeof(CmiChunkHeader
);
2499 /*( if(size-sizeof(CmiChunkHeader) > firstBinSize){*/
2500 MACHSTATE1(1,"infi_CmiAlloc for dataSize %d",size
-sizeof(CmiChunkHeader
));
2502 res
= (char*)getInfiCmiChunk(size
-sizeof(CmiChunkHeader
));
2503 res
-= sizeof(CmiChunkHeader
);
2515 #if THREAD_MULTI_POOL
2516 //Note: this function receives a pointer to the data, so that it is not necessary to add any sizeof(CmiChunkHeader) to it.
// infi_CmiFreeDirect: releases a chunk on the calling rank, given its data
// pointer (already past CmiChunkHeader).
//  - The chunk goes back onto infiCmiChunkPools[CmiMyRank()][poolIdx] when the
//    pool index is in range and the pool holds fewer than INFIMAXPERPOOL and
//    fewer than (1 << INFINUMPOOLS) >> poolIdx chunks.
//  - Otherwise the owning region's live-chunk count is decremented. When it
//    reaches zero, the region's registration is released with ibv_dereg_mr.
//    In the non-owner path the owner's metadata is freed as well (presumably
//    along with the region -- confirm).
2517 void infi_CmiFreeDirect(void *ptr
){
2520 void *freePtr
= ptr
;
2521 #if CMK_IBVERBS_STATS
2525 //ptr += sizeof(CmiChunkHeader);
2526 size
= SIZEFIELD (ptr
);
2527 /* if(size > firstBinSize){*/
2528 infiCmiChunkMetaData
*metaData
;
2530 //there is a infiniband specific header
// NOTE(review): subtracting from a void * relies on the GNU void-pointer
// arithmetic extension; a (char *) cast would make this standard C.
2531 freePtr
= ptr
- sizeof(infiCmiChunkHeader
);
2532 metaData
= METADATAFIELD(ptr
);
2533 poolIdx
= metaData
->poolIdx
;
// NOTE(review): this pointer is formed before poolIdx is range-checked. It is
// only dereferenced after the check below, but forming an out-of-range pointer
// is itself undefined.
2534 infiCmiChunkPool
*pool
= infiCmiChunkPools
[CmiMyRank()] + poolIdx
;
2535 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr
,poolIdx
);
2536 // CmiAssert(poolIdx >= 0);
2537 if(poolIdx
< INFINUMPOOLS
&& pool
->count
< INFIMAXPERPOOL
&&
2538 pool
->count
< ((1 << INFINUMPOOLS
) >> poolIdx
) ){
2539 metaData
->nextBuf
= pool
->startBuf
;
2540 pool
->startBuf
= freePtr
;
2542 MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx
,pool
->startBuf
,pool
->count
);
2544 MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr
,poolIdx
);
2545 metaData
->owner
->metaData
->count
--;
2546 if(metaData
->owner
->metaData
== metaData
){
2548 if(metaData
->owner
->metaData
->count
== 0){
2549 //all the chunks have been freed
2550 int unregstat
=ibv_dereg_mr(metaData
->key
);
2551 #if CMK_IBVERBS_STATS
2556 CmiAssert(unregstat
==0);
2560 //if I am the owner and all the chunks have not been
2561 // freed dont free my metaData. will need later
2563 if(metaData
->owner
->metaData
->count
== 0){
2564 //need to free the owner's buffer and metadata
2565 freePtr
= metaData
->owner
;
2566 int unregstat
=ibv_dereg_mr(metaData
->key
);
2567 #if CMK_IBVERBS_STATS
2572 CmiAssert(unregstat
==0);
2573 free(metaData
->owner
->metaData
);
// infi_CmiFree (THREAD_MULTI_POOL): frees a buffer obtained from
// infi_CmiAlloc; ptr points at its CmiChunkHeader.
//  - If Cmi_charmrun_fd == -1, the buffer came from posix_memalign: the
//    pointer-sized prefix is undone and the block is free()d.
//  - Otherwise ptr is advanced to the data, and then:
//    - chunks of a received multisend (INFIMULTIPOOL) are left to be freed
//      later;
//    - chunks whose parentPe is another rank are pushed onto
//      queuePool[parentPe][CmiMyRank()] for that rank to reclaim;
//    - the rest are released directly with infi_CmiFreeDirect.
// NOTE(review): `ptr += ...` and `ptr - ...` on a void * rely on the GNU
// void-pointer arithmetic extension.
2582 void infi_CmiFree(void *ptr
){
2589 void *freePtr
= ptr
;
2590 nodeSize
= CmiMyNodeSize() + 1;
2592 MACHSTATE(3,"Freeing");
2594 if (Cmi_charmrun_fd
== -1) { char *res
= ptr
; res
-= sizeof(void*); free(res
); return; }
2595 ptr
+= sizeof(CmiChunkHeader
);
2596 size
= SIZEFIELD (ptr
);
2597 /* if(size > firstBinSize){*/
2598 infiCmiChunkMetaData
*metaData
;
2600 //there is a infiniband specific header
2601 freePtr
= ptr
- sizeof(infiCmiChunkHeader
);
2602 metaData
= METADATAFIELD(ptr
);
2603 poolIdx
= metaData
->poolIdx
;
2605 if(poolIdx
== INFIMULTIPOOL
){
2606 /** this is a part of a received mult message
2607 it will be freed correctly later
2609 #if CMK_IBVERBS_STATS
2616 // checking if this free operation is my responsibility
2617 parentPe
= metaData
->parentPe
;
2618 if(parentPe
!= CmiMyRank()){
2619 PCQueuePush(queuePool
[parentPe
][CmiMyRank()],(char *)ptr
);
2624 infi_CmiFreeDirect(ptr
);
// infi_CmiFree (single pool set): frees a buffer obtained from infi_CmiAlloc;
// ptr points at its CmiChunkHeader.
//  - Standalone buffers (Cmi_charmrun_fd == -1) are free()d directly after
//    undoing the pointer-sized prefix.
//  - Otherwise ptr is advanced to the data. Multisend parts (INFIMULTIPOOL) are
//    left to be freed later.
//  - Pooled chunks are pushed back onto infiCmiChunkPools[poolIdx] while the
//    pool is under its limits.
//  - Anything else decrements the owning region's live-chunk count. The
//    region's registration is released (ibv_dereg_mr) once that count reaches
//    zero.
2629 void infi_CmiFree(void *ptr
){
2631 void *freePtr
= ptr
;
2632 #if CMK_IBVERBS_STATS
2636 if (Cmi_charmrun_fd
== -1) { char *res
= ptr
; res
-= sizeof(void*); free(res
); return; }
2640 ptr
+= sizeof(CmiChunkHeader
);
2641 size
= SIZEFIELD (ptr
);
2642 /* if(size > firstBinSize){*/
2643 infiCmiChunkMetaData
*metaData
;
2645 //there is a infiniband specific header
2646 freePtr
= (char*)ptr
- sizeof(infiCmiChunkHeader
);
2647 metaData
= METADATAFIELD(ptr
);
2648 poolIdx
= metaData
->poolIdx
;
2649 if(poolIdx
== INFIMULTIPOOL
){
2650 /** this is a part of a received mult message
2651 it will be freed correctly later
2653 #if CMK_IBVERBS_STATS
2658 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr
,poolIdx
);
2659 // CmiAssert(poolIdx >= 0);
// NOTE(review): this path admits count <= INFIMAXPERPOOL, while
// infi_CmiFreeDirect (multi-pool) uses count < INFIMAXPERPOOL, so this variant
// can keep one extra chunk per pool -- confirm which limit is intended.
2660 if(poolIdx
< INFINUMPOOLS
&&
2661 infiCmiChunkPools
[poolIdx
].count
<= INFIMAXPERPOOL
&&
2662 infiCmiChunkPools
[poolIdx
].count
< ((1 << INFINUMPOOLS
) >> poolIdx
) ){
2663 metaData
->nextBuf
= infiCmiChunkPools
[poolIdx
].startBuf
;
2664 infiCmiChunkPools
[poolIdx
].startBuf
= freePtr
;
2665 infiCmiChunkPools
[poolIdx
].count
++;
2667 MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx
,infiCmiChunkPools
[poolIdx
].startBuf
,infiCmiChunkPools
[poolIdx
].count
);
2669 MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr
,poolIdx
);
2670 metaData
->owner
->metaData
->count
--;
2671 if(metaData
->owner
->metaData
== metaData
){
2673 if(metaData
->owner
->metaData
->count
== 0){
2674 //all the chunks have been freed
2675 int unregstat
=ibv_dereg_mr(metaData
->key
);
2676 #if CMK_IBVERBS_STATS
2681 CmiAssert(unregstat
==0);
2685 //if I am the owner and all the chunks have not been
2686 // freed dont free my metaData. will need later
2688 if(metaData
->owner
->metaData
->count
== 0){
2689 //need to free the owner's buffer and metadata
2690 freePtr
= metaData
->owner
;
2691 int unregstat
=ibv_dereg_mr(metaData
->key
);
2692 #if CMK_IBVERBS_STATS
2697 CmiAssert(unregstat
==0);
2698 free(metaData
->owner
->metaData
);
2713 /*********************************************************************************************
2714 This section is for CmiDirect. This is a variant of the persistent communication in which
2715 the user can transfer data between processors without using Charm++ messages. This lets the user
2716 send and receive data from the middle of his arrays without any copying on either send or receive
2718 *********************************************************************************************/
2720 struct infiDirectRequestPacket
{
2723 struct ibv_mr senderKey
;
2728 #include "cmidirect.h"
2730 #define MAXHANDLES 512
2732 struct infiDirectHandleStruct
;
2735 typedef struct directPollingQNodeStruct
{
2736 struct infiDirectHandleStruct
*handle
;
2737 struct directPollingQNodeStruct
*next
;
2739 } directPollingQNode
;
2741 typedef struct infiDirectHandleStruct
{
2746 void (*callbackFnPtr
)(void *);
2748 // struct infiDirectRequestPacket *packet;
2749 struct infiDirectUserHandle userHandle
;
2750 struct infiRdmaPacket
*rdmaPacket
;
2751 directPollingQNode pollingQNode
;
2754 typedef struct infiDirectHandleTableStruct
{
2755 infiDirectHandle handles
[MAXHANDLES
];
2756 struct infiDirectHandleTableStruct
*next
;
2757 } infiDirectHandleTable
;
2762 directPollingQNode
*headDirectPollingQ
=NULL
,*tailDirectPollingQ
=NULL
;
2764 static infiDirectHandleTable
**sendHandleTable
=NULL
;
2765 static infiDirectHandleTable
**recvHandleTable
=NULL
;
2767 static int *recvHandleCount
=NULL
;
2769 void addHandleToPollingQ(infiDirectHandle
*handle
){
2770 // directPollingQNode *newNode = malloc(sizeof(directPollingQNode));
2771 directPollingQNode
*newNode
= &(handle
->pollingQNode
);
2772 newNode
->handle
= handle
;
2773 newNode
->next
= NULL
;
2774 if(headDirectPollingQ
==NULL
){
2776 headDirectPollingQ
= newNode
;
2777 tailDirectPollingQ
= newNode
;
2779 tailDirectPollingQ
->next
= newNode
;
2780 tailDirectPollingQ
= newNode
;
2784 infiDirectHandle *removeHandleFromPollingQ(){
2785 if(headDirectPollingQ == NULL){
2786 //polling Q is empty
2789 directPollingQNode *retNode = headDirectPollingQ;
2790 if(headDirectPollingQ == tailDirectPollingQ){
2791 //PollingQ has one node
2792 headDirectPollingQ = tailDirectPollingQ = NULL;
2794 headDirectPollingQ = headDirectPollingQ->next;
2796 infiDirectHandle *retHandle = retNode->handle;
2801 static inline infiDirectHandleTable
**createHandleTable(){
2802 infiDirectHandleTable
**table
= malloc(Lrts_numNodes
*sizeof(infiDirectHandleTable
*));
2804 for(i
=0;i
<Lrts_numNodes
;i
++){
2810 static inline void calcHandleTableIdx(int handle
,int *tableIdx
,int *idx
){
2811 *tableIdx
= handle
/MAXHANDLES
;
2812 *idx
= handle
%MAXHANDLES
;
2815 static inline void initializeLastDouble(void *recvBuf
,int recvBufSize
,double initialValue
)
2817 /** initialize the last double in the buffer to bufize***/
2818 int index
= recvBufSize
- sizeof(double);
2819 double *lastDouble
= (double *)(((char *)recvBuf
)+index
);
2820 *lastDouble
= initialValue
;
2825 To be called on the receiver to create a handle and return its number
2827 struct infiDirectUserHandle
CmiDirect_createHandle(int senderNode
,void *recvBuf
, int recvBufSize
, void (*callbackFnPtr
)(void *), void *callbackData
,double initialValue
){
2831 infiDirectHandleTable
*table
;
2832 struct infiDirectUserHandle userHandle
;
2834 CmiAssert(recvBufSize
> sizeof(double));
2836 if(recvHandleTable
== NULL
){
2837 recvHandleTable
= createHandleTable();
2838 recvHandleCount
= malloc(sizeof(int)*CmiNumNodesGlobal());
2839 for(i
=0;i
<CmiNumNodesGlobal();i
++){
2840 recvHandleCount
[i
] = -1;
2843 if(recvHandleTable
[senderNode
] == NULL
){
2844 recvHandleTable
[senderNode
] = malloc(sizeof(infiDirectHandleTable
));
2845 recvHandleTable
[senderNode
]->next
= NULL
;
2848 newHandle
= ++recvHandleCount
[senderNode
];
2849 CmiAssert(newHandle
>= 0);
2851 calcHandleTableIdx(newHandle
,&tableIdx
,&idx
);
2853 table
= recvHandleTable
[senderNode
];
2854 for(i
=0;i
<tableIdx
;i
++){
2855 if(table
->next
==NULL
){
2856 table
->next
= malloc(sizeof(infiDirectHandleTable
));
2857 table
->next
->next
= NULL
;
2859 table
= table
->next
;
2861 table
->handles
[idx
].id
= newHandle
;
2862 table
->handles
[idx
].buf
= recvBuf
;
2863 table
->handles
[idx
].size
= recvBufSize
;
2864 #if CMI_DIRECT_DEBUG
2865 CmiPrintf("[%d] RDMA create addr %p %d sizeof(struct ibv_mr) %d\n",CmiMyNodeGlobal(),table
->handles
[idx
].buf
,recvBufSize
,sizeof(struct ibv_mr
));
2867 table
->handles
[idx
].callbackFnPtr
= callbackFnPtr
;
2868 table
->handles
[idx
].callbackData
= callbackData
;
2869 table
->handles
[idx
].key
= ibv_reg_mr(context
->pd
, recvBuf
, recvBufSize
,IBV_ACCESS_REMOTE_READ
| IBV_ACCESS_LOCAL_WRITE
| IBV_ACCESS_REMOTE_WRITE
);
2870 CmiAssert(table
->handles
[idx
].key
!= NULL
);
2871 #if CMK_IBVERBS_STATS
2875 /* table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2876 table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2877 table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);*/
2879 userHandle
.handle
= newHandle
;
2880 userHandle
.recverNode
= CmiMyNodeGlobal();
2881 userHandle
.senderNode
= senderNode
;
2882 userHandle
.recverBuf
= recvBuf
;
2883 userHandle
.recverBufSize
= recvBufSize
;
2884 memcpy(userHandle
.recverKey
,table
->handles
[idx
].key
,sizeof(struct ibv_mr
));
2885 userHandle
.initialValue
= initialValue
;
2887 table
->handles
[idx
].userHandle
= userHandle
;
2889 initializeLastDouble(recvBuf
,recvBufSize
,initialValue
);
2892 int index
= table
->handles
[idx
].size
- sizeof(double);
2893 table
->handles
[idx
].pollingQNode
.lastDouble
= (double *)(((char *)table
->handles
[idx
].buf
)+index
);
2896 addHandleToPollingQ(&(table
->handles
[idx
]));
2898 // MACHSTATE4(3," Newhandle created %d senderProc %d recvBuf %p recvBufSize %d",newHandle,senderProc,recvBuf,recvBufSize);
2904 To be called on the sender to attach the sender's buffer to this handle
2906 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle
*userHandle
,void *sendBuf
,int sendBufSize
){
2909 int handle
= userHandle
->handle
;
2910 int recverNode
= userHandle
->recverNode
;
2912 infiDirectHandleTable
*table
;
2914 if(sendHandleTable
== NULL
){
2915 sendHandleTable
= createHandleTable();
2917 if(sendHandleTable
[recverNode
] == NULL
){
2918 sendHandleTable
[recverNode
] = malloc(sizeof(infiDirectHandleTable
));
2919 sendHandleTable
[recverNode
]->next
= NULL
;
2922 CmiAssert(handle
>= 0);
2923 calcHandleTableIdx(handle
,&tableIdx
,&idx
);
2925 table
= sendHandleTable
[recverNode
];
2926 for(i
=0;i
<tableIdx
;i
++){
2927 if(table
->next
==NULL
){
2928 table
->next
= malloc(sizeof(infiDirectHandleTable
));
2929 table
->next
->next
= NULL
;
2931 table
= table
->next
;
2934 table
->handles
[idx
].id
= handle
;
2935 table
->handles
[idx
].buf
= sendBuf
;
2937 table
->handles
[idx
].size
= sendBufSize
;
2938 #if CMI_DIRECT_DEBUG
2939 CmiPrintf("[%d] RDMA assoc addr %p %d remote addr %p \n",CmiMyPe(),table
->handles
[idx
].buf
,sendBufSize
,userHandle
->recverBuf
);
2941 table
->handles
[idx
].callbackFnPtr
= NULL
;
2942 table
->handles
[idx
].callbackData
= NULL
;
2943 table
->handles
[idx
].key
= ibv_reg_mr(context
->pd
, sendBuf
, sendBufSize
,IBV_ACCESS_REMOTE_READ
| IBV_ACCESS_LOCAL_WRITE
| IBV_ACCESS_REMOTE_WRITE
);
2944 CmiAssert(table
->handles
[idx
].key
!= NULL
);
2945 #if CMK_IBVERBS_STATS
2949 table
->handles
[idx
].userHandle
= *userHandle
;
2950 CmiAssert(sendBufSize
== table
->handles
[idx
].userHandle
.recverBufSize
);
2952 table
->handles
[idx
].rdmaPacket
= CmiAlloc(sizeof(struct infiRdmaPacket
));
2953 table
->handles
[idx
].rdmaPacket
->type
= INFI_DIRECT
;
2954 table
->handles
[idx
].rdmaPacket
->localBuffer
= &(table
->handles
[idx
]);
2957 /* table->handles[idx].packet = (struct infiDirectRequestPacket *)CmiAlloc(sizeof(struct infiDirectRequestPacket));
2958 table->handles[idx].packet->senderProc = Lrts_myNode;
2959 table->handles[idx].packet->handle = handle;
2960 table->handles[idx].packet->senderKey = *(table->handles[idx].key);
2961 table->handles[idx].packet->senderBuf = sendBuf;
2962 table->handles[idx].packet->senderBufSize = sendBufSize;*/
2964 MACHSTATE4(3,"idx %d recverProc %d handle %d sendBuf %p",idx
,recverNode
,handle
,sendBuf
);
2972 To be called on the sender to do the actual data transfer
2974 void CmiDirect_put(struct infiDirectUserHandle
*userHandle
){
2975 int handle
= userHandle
->handle
;
2976 int recverNode
= userHandle
->recverNode
;
2977 if(recverNode
== CmiMyNodeGlobal()){
2978 /*when the sender and receiver are on the same
2979 processor, just look up the sender and receiver
2980 buffers and do a memcpy*/
2982 infiDirectHandleTable
*senderTable
;
2983 infiDirectHandleTable
*recverTable
;
2988 /*find entry for this handle in sender table*/
2989 calcHandleTableIdx(handle
,&tableIdx
,&idx
);
2990 CmiAssert(sendHandleTable
!= NULL
);
2991 senderTable
= sendHandleTable
[CmiMyNodeGlobal()];
2992 CmiAssert(senderTable
!= NULL
);
2993 for(i
=0;i
<tableIdx
;i
++){
2994 senderTable
= senderTable
->next
;
2997 /**find entry for this handle in recver table*/
2998 recverTable
= recvHandleTable
[recverNode
];
2999 CmiAssert(recverTable
!= NULL
);
3000 for(i
=0;i
< tableIdx
;i
++){
3001 recverTable
= recverTable
->next
;
3004 CmiAssert(senderTable
->handles
[idx
].size
== recverTable
->handles
[idx
].size
);
3005 memcpy(recverTable
->handles
[idx
].buf
,senderTable
->handles
[idx
].buf
,senderTable
->handles
[idx
].size
);
3006 #if CMI_DIRECT_DEBUG
3007 CmiPrintf("[%d] RDMA memcpy put addr %p receiver %p, size %d\n",CmiMyPe(),senderTable
->handles
[idx
].buf
,recverTable
->handles
[idx
].buf
,senderTable
->handles
[idx
].size
);
3009 // The polling Q should find you and handle the callback and pollingq entry
3010 // (*(recverTable->handles[idx].callbackFnPtr))(recverTable->handles[idx].callbackData);
3018 infiDirectHandleTable
*table
;
3020 calcHandleTableIdx(handle
,&tableIdx
,&idx
);
3022 table
= sendHandleTable
[recverNode
];
3023 CmiAssert(table
!= NULL
);
3024 for(i
=0;i
<tableIdx
;i
++){
3025 table
= table
->next
;
3028 // MACHSTATE2(3,"CmiDirect_put to recverProc %d handle %d",recverProc,handle);
3029 #if CMI_DIRECT_DEBUG
3030 CmiPrintf("[%d] RDMA put addr %p\n",CmiMyPe(),table
->handles
[idx
].buf
);
3036 OtherNode node
= &nodes
[table
->handles
[idx
].userHandle
.recverNode
];
3037 struct ibv_sge list
= {
3038 .addr
= (uintptr_t )table
->handles
[idx
].buf
,
3039 .length
= table
->handles
[idx
].size
,
3040 .lkey
= table
->handles
[idx
].key
->lkey
3043 struct ibv_mr
*remoteKey
= (struct ibv_mr
*)table
->handles
[idx
].userHandle
.recverKey
;
3045 struct ibv_send_wr
*bad_wr
;
3046 struct ibv_send_wr wr
= {
3047 .wr_id
= (uint64_t)table
->handles
[idx
].rdmaPacket
,
3050 .opcode
= IBV_WR_RDMA_WRITE
,
3051 .send_flags
= IBV_SEND_SIGNALED
,
3054 .remote_addr
= (uint64_t )table
->handles
[idx
].userHandle
.recverBuf
,
3055 .rkey
= remoteKey
->rkey
3058 /** post and rdma_read that is a rdma get*/
3059 if(ibv_post_send(node
->infiData
->qp
,&wr
,&bad_wr
)){
3060 CmiAbort("ibv_post_send failed");
3064 /* MallocInfiPacket (packet);
3066 packet->size = sizeof(struct infiDirectRequestPacket);
3067 packet->buf = (char *)(table->handles[idx].packet);
3068 struct ibv_mr *packetKey = METADATAFIELD((void *)table->handles[idx].packet)->key;
3069 EnqueuePacket(node,packet,sizeof(struct infiDirectRequestPacket),packetKey);
3075 /**** need not be called the first time *********/
3076 void CmiDirect_readyMark(struct infiDirectUserHandle
*userHandle
){
3077 initializeLastDouble(userHandle
->recverBuf
,userHandle
->recverBufSize
,userHandle
->initialValue
);
3080 /**** need not be called the first time *********/
3081 void CmiDirect_readyPollQ(struct infiDirectUserHandle
*userHandle
){
3082 int handle
= userHandle
->handle
;
3084 infiDirectHandleTable
*table
;
3085 calcHandleTableIdx(handle
,&tableIdx
,&idx
);
3087 table
= recvHandleTable
[userHandle
->senderNode
];
3088 CmiAssert(table
!= NULL
);
3089 for(i
=0;i
<tableIdx
;i
++){
3090 table
= table
->next
;
3092 #if CMI_DIRECT_DEBUG
3093 CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNodeGlobal(),userHandle
->recverBuf
);
3095 addHandleToPollingQ(&(table
->handles
[idx
]));
3100 /**** need not be called the first time *********/
3101 void CmiDirect_ready(struct infiDirectUserHandle
*userHandle
){
3102 int handle
= userHandle
->handle
;
3104 infiDirectHandleTable
*table
;
3106 initializeLastDouble(userHandle
->recverBuf
,userHandle
->recverBufSize
,userHandle
->initialValue
);
3108 calcHandleTableIdx(handle
,&tableIdx
,&idx
);
3110 table
= recvHandleTable
[userHandle
->senderNode
];
3111 CmiAssert(table
!= NULL
);
3112 for(i
=0;i
<tableIdx
;i
++){
3113 table
= table
->next
;
3115 #if CMI_DIRECT_DEBUG
3116 CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNodeGlobal(),userHandle
->recverBuf
);
3118 addHandleToPollingQ(&(table
->handles
[idx
]));
3123 static int receivedDirectMessage(infiDirectHandle
*handle
){
3124 // int index = handle->size - sizeof(double);
3125 // double *lastDouble = (double *)(((char *)handle->buf)+index);
3126 if(*(handle
->pollingQNode
.lastDouble
) == handle
->userHandle
.initialValue
){
3129 (*(handle
->callbackFnPtr
))(handle
->callbackData
);
3136 static void pollCmiDirectQ(){
3137 directPollingQNode
*ptr
= headDirectPollingQ
, *prevPtr
=NULL
;
3139 if(receivedDirectMessage(ptr
->handle
)){
3140 #if CMI_DIRECT_DEBUG
3141 CmiPrintf("[%d] polling detected recvd message at buf %p\n",CmiMyNodeGlobal(),ptr
->handle
->userHandle
.recverBuf
);
3143 directPollingQNode
*delPtr
= ptr
;
3144 /** has been received and delete this node***/
3145 if(prevPtr
== NULL
){
3146 /** first in the pollingQ**/
3147 if(headDirectPollingQ
== tailDirectPollingQ
){
3148 /**only node in pollingQ****/
3149 headDirectPollingQ
= tailDirectPollingQ
= NULL
;
3151 headDirectPollingQ
= headDirectPollingQ
->next
;
3154 if(ptr
== tailDirectPollingQ
){
3155 /**last node is being deleted**/
3156 tailDirectPollingQ
= prevPtr
;
3158 prevPtr
->next
= ptr
->next
;
3169 void CmiMachineCleanup(){
3170 MACHSTATE(3, "CmiMachineCleanup")
3172 struct ibv_device
**devList
;
3173 ibv_dealloc_pd(context
->pd
);
3174 ibv_close_device(context
->context
);
3175 devList
= ibv_get_device_list(&num_devices
);
3176 ibv_free_device_list(devList
);
3177 MACHSTATE(3, "CmiMachineCleanup END")
3180 void LrtsNotifyIdle() {}
3181 void LrtsBeginIdle() {}
3182 void LrtsStillIdle() {}
3184 /*void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket){
3185 int senderProc = directRequestPacket->senderProc;
3186 int handle = directRequestPacket->handle;
3188 infiDirectHandleTable *table;
3189 OtherNode node = nodes_by_pe[senderProc];
3191 MACHSTATE2(3,"processDirectRequest from proc %d handle %d",senderProc,handle);
3193 calcHandleTableIdx(handle,&tableIdx,&idx);
3195 table = recvHandleTable[senderProc];
3196 CmiAssert(table != NULL);
3197 for(i=0;i<tableIdx;i++){
3198 table = table->next;
3201 CmiAssert(table->handles[idx].size == directRequestPacket->senderBufSize);
3204 struct ibv_sge list = {
3205 .addr = (uintptr_t )table->handles[idx].buf,
3206 .length = table->handles[idx].size,
3207 .lkey = table->handles[idx].key->lkey
3210 struct ibv_send_wr *bad_wr;
3211 struct ibv_send_wr wr = {
3212 .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
3215 .opcode = IBV_WR_RDMA_READ,
3216 .send_flags = IBV_SEND_SIGNALED,
3218 .remote_addr = (uint64_t )directRequestPacket->senderBuf,
3219 .rkey = directRequestPacket->senderKey.rkey
3222 // post and rdma_read that is a rdma get
3223 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
3231 void processDirectWC(struct infiRdmaPacket *rdmaPacket){
3232 MACHSTATE(3,"processDirectWC");
3233 infiDirectHandle *handle = (infiDirectHandle *)rdmaPacket->localBuffer;
3234 (*(handle->callbackFnPtr))(handle->callbackData);
3240 // use the common one
3242 static void sendBarrierMessage(int pe
)
3244 /* we will only need one packet */
3246 OtherNode node
= nodes
+ pe
;
3248 MallocInfiPacket(packet
);
3249 packet
->size
= size
;
3250 packet
->buf
= CmiAlloc(size
);
3251 packet
->header
.code
= INFIBARRIERPACKET
;
3252 struct ibv_mr
*key
= METADATAFIELD(packet
->buf
)->key
;
3253 MACHSTATE2(3,"Barrier packet to %d size %d",node
->infiData
->nodeNo
,size
);
3255 EnqueuePacket(node
,packet
,size
,key
);
3258 static void recvBarrierMessage()
3262 /* struct ibv_wc wc[WC_LIST_SIZE];*/
3263 struct ibv_wc wc
[1];
3264 struct ibv_wc
*recvWC
;
3265 /* block on the recvq, this is lazy and evil in the general case because we abuse buffers but should be ok for startup barriers */
3266 int toBuffer
=1; // buffer without processing recvd messages
3267 int barrierReached
=0;
3268 struct infiBuffer
*buffer
= NULL
;
3269 struct infiPacketHeader
*header
= NULL
;
3272 while(!barrierReached
)
3274 /* gengbin's semantic will implode if more than one q is polled at a time */
3275 ne
= ibv_poll_cq(context
->recvCq
,1,&wc
[0]);
3276 // CmiAssert(ne >=0);
3278 MACHSTATE1(3,"recvBarrier ne %d",ne
);
3282 if(wc
[i
].status
!= IBV_WC_SUCCESS
){
3285 switch(wc
[i
].opcode
){
3286 case IBV_WC_RECV
: /* we have something to consider*/
3288 buffer
= (struct infiBuffer
*) recvWC
->wr_id
;
3289 header
= (struct infiPacketHeader
*)buffer
->buf
;
3290 nodeNo
= header
->nodeNo
;
3291 len
= recvWC
->byte_len
-sizeof(struct infiPacketHeader
);
3293 if(header
->code
& INFIPACKETCODE_DATA
){
3294 processMessage(nodeNo
,len
,(buffer
->buf
+sizeof(struct infiPacketHeader
)),toBuffer
);
3296 if(header
->code
& INFIDUMMYPACKET
){
3297 MACHSTATE(3,"Dummy packet");
3299 if(header
->code
& INFIBARRIERPACKET
){
3300 MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo
,len
);
3303 /* semantically questionable */
3304 //processAllBufferedMsgs();
3307 if(rdma
&& header
->code
& INFIRDMA_START
){
3308 struct infiRdmaPacket
*rdmaPacket
= (struct infiRdmaPacket
*)(buffer
->buf
+sizeof(struct infiPacketHeader
));
3310 //TODO: make a function of this and use for both acks and requests
3311 struct infiRdmaPacket
*copyPacket
= malloc(sizeof(struct infiRdmaPacket
));
3312 struct infiRdmaPacket
*tmp
=context
->bufferedRdmaRequests
;
3313 *copyPacket
= *rdmaPacket
;
3314 copyPacket
->fromNodeNo
= nodeNo
;
3315 MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket
);
3316 context
->bufferedRdmaRequests
= copyPacket
;
3317 copyPacket
->next
= tmp
;
3318 copyPacket
->prev
= NULL
;
3320 tmp
->prev
= copyPacket
;
3323 processRdmaRequest(rdmaPacket,nodeNo,0);
3326 if(rdma
&& header
->code
& INFIRDMA_ACK
){
3327 struct infiRdmaPacket
*rdmaPacket
= (struct infiRdmaPacket
*)(buffer
->buf
+sizeof(struct infiPacketHeader
)) ;
3328 processRdmaAck(rdmaPacket
);
3331 struct ibv_sge list
= {
3332 .addr
= (uintptr_t) buffer
->buf
,
3333 .length
= buffer
->size
,
3334 .lkey
= buffer
->key
->lkey
3337 struct ibv_recv_wr wr
= {
3338 .wr_id
= (uint64_t)buffer
,
3343 struct ibv_recv_wr
*bad_wr
;
3345 if(ibv_post_srq_recv(context
->srq
,&wr
,&bad_wr
)){
3352 CmiAbort("Wrong type of work completion object in recvq");
3357 /* semantically questionable */
3358 // processAllBufferedMsgs();
3362 /* happen at node level */
3369 int numnodes
= CmiNumNodesGlobal();
3370 if (CmiMyRank() == 0) {
3371 /* every one send to pe 0 */
3372 if (CmiMyNodeGlobal() != 0) {
3373 sendBarrierMessage(0);
3375 /* printf("[%d] HERE\n", CmiMyPe()); */
3376 if (CmiMyNodeGlobal() == 0)
3378 for (count
= 1; count
< numnodes
; count
++)
3380 recvBarrierMessage();
3382 /* pe 0 broadcast */
3383 for (i
=1; i
<=BROADCAST_SPANNING_FACTOR
; i
++) {
3385 if (p
> numnodes
- 1) break;
3386 /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
3387 sendBarrierMessage(p
);
3390 /* non 0 node waiting */
3391 if (CmiMyNodeGlobal() != 0)
3393 recvBarrierMessage();
3394 for (i
=1; i
<=BROADCAST_SPANNING_FACTOR
; i
++) {
3395 int p
= CmiMyNodeGlobal();
3396 p
= BROADCAST_SPANNING_FACTOR
*p
+ i
;
3397 if (p
> numnodes
- 1) break;
3399 /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
3400 sendBarrierMessage(p
);
3404 CmiNodeAllBarrier();
3405 processAllBufferedMsgs();
3406 /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
3409 /* everyone sends a message to pe 0 and go on */
3410 int CmiBarrierZero()
3414 if (CmiMyRank() == 0) {
3415 if (CmiMyNodeGlobal()) {
3416 sendBarrierMessage(0);
3419 for (i
=0; i
<CmiNumNodesGlobal()-1; i
++)
3421 recvBarrierMessage();
3425 CmiNodeAllBarrier();
3426 processAllBufferedMsgs();