Cleanup #960: Replace CmiEnforce(0) calls in verbs and net layer
[charm.git] / src / arch / verbs / machine-ibverbs.c
blobd94a70e58ae5a469d5da2761495d631619c79d5e
/** @file
 * Ibverbs (infiniband) implementation of Converse NET version
 * @ingroup NET
 * contains only Ibverbs specific code for:
 * - CmiMachineInit()
 * - CmiCommunicationInit()
 * - CmiNotifyStillIdle()
 * - DeliverViaNetwork()
 * - CommunicationServer()
 * - MachineExit()
 *
 * created by
 * Sayantan Chakravorty, sayantan@gmail.com, 21st March 2007
 */

/**
 * @addtogroup NET
 * @{
 */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#if CMK_HAS_MALLOC_H
#include <malloc.h>
#endif
#include <getopt.h>
#include <time.h>

#include <infiniband/verbs.h>
35 #if ! QLOGIC
36 enum ibv_mtu mtu = IBV_MTU_2048;
37 #else
38 enum ibv_mtu mtu = IBV_MTU_4096;
39 #endif
40 static int page_size;
41 static int mtu_size;
42 static int packetSize;
43 static int dataSize;
44 static int rdma;
45 static int rdmaThreshold;
46 static int firstBinSize;
47 static int blockAllocRatio;
48 static int blockThreshold;
51 static int maxRecvBuffers;
52 static int maxTokens;
53 //static int tokensPerProcessor; /*number of outstanding sends and receives between any two nodes*/
54 static int sendPacketPoolSize; /*total number of send buffers created*/
56 static double _startTime=0;
57 static int regCount;
59 static int pktCount;
60 static int msgCount;
61 static int minTokensLeft;
64 static double regTime;
66 static double processBufferedTime;
67 static int processBufferedCount;
69 #define CMK_IBVERBS_STATS 0
70 #define CMK_IBVERBS_TOKENS_FLOW 1
71 #define CMK_IBVERBS_INCTOKENS 0 //never turn this on
72 #define CMK_IBVERBS_DEBUG 0
73 #define CMI_DIRECT_DEBUG 0
74 #define WC_LIST_SIZE 32
75 /*#define WC_BUFFER_SIZE 100*/
77 #if CMK_IBVERBS_STATS
78 static int numReg=0;
79 static int numUnReg=0;
80 static int numCurReg=0;
81 static int numAlloc=0;
82 static int numFree=0;
83 static int numMultiSendUnreg=0;
84 static int numMultiSend=0;
85 static int numMultiSendFree=0;
86 #endif
89 #define INCTOKENS_FRACTION 0.04
90 #define INCTOKENS_INCREASE .50
92 // flag for using a pool for every thread in SMP mode
93 #if CMK_SMP
94 #define THREAD_MULTI_POOL 1
95 #endif
97 #if THREAD_MULTI_POOL
98 #include "pcqueue.h"
99 PCQueue **queuePool;
100 void infi_CmiFreeDirect(void *ptr);
101 static inline void fillBufferPools();
102 #endif
104 #define INFIBARRIERPACKET 128
/* Body of an increase-tokens acknowledgement.
 * NOTE(review): the meaning of `a` is not visible in this part of the file. */
struct infiIncTokenAckPacket{
	int a;
};
110 /***************
111 Data Structures
112 ***********************/
114 /******
115 This is a header attached to the beginning of every infiniband packet
116 *******/
/* Values for infiPacketHeader.code. They are distinct bits, so several can be
 * OR-ed into one header (EnqueuePacket does this for INFIPACKETCODE_INCTOKENS). */
#define INFIPACKETCODE_DATA 1
#define INFIPACKETCODE_INCTOKENS 2
#define INFIRDMA_START 4
#define INFIRDMA_ACK 8
#define INFIDIRECT_REQUEST 16
#define INFIPACKETCODE_INCTOKENSACK 32
#define INFIDUMMYPACKET 64

struct infiPacketHeader{
	char code;	/* INFI* packet code bits (see above) */
	int nodeNo;	/* number of the sending node */
#if CMK_IBVERBS_DEBUG
	int psn;	/* per-destination sequence number, debug builds only */
#endif
};
134 Types of rdma packets
136 #define INFI_MESG 1
137 #define INFI_DIRECT 2
139 struct infiRdmaPacket{
140 int fromNodeNo;
141 int type;
142 struct ibv_mr key;
143 struct ibv_mr *keyPtr;
144 int remoteSize;
145 char *remoteBuf;
146 void *localBuffer;
147 OutgoingMsg ogm;
148 struct infiRdmaPacket *next,*prev;
/** Represents a buffer that is used to receive messages
*/
#define BUFFER_RECV 1
#define BUFFER_RDMA 2
struct infiBuffer{
	int type;		/* BUFFER_RECV or BUFFER_RDMA */
	char *buf;		/* start of the buffer's memory */
	int size;		/* size of the buffer in bytes */
	struct ibv_mr *key;	/* memory region the buffer is registered under */
};
/** At the moment it is a simple pool with just a list of buffers
	* TODO; extend it to make it an element in a linklist of pools
*/
struct infiBufferPool{
	int numBuffers;			/* number of entries in buffers[] */
	struct infiBuffer *buffers;	/* array of numBuffers buffer descriptors */
	struct infiBufferPool *next;
};
174 /*****
175 It is the structure for the send buffers that are used
176 to send messages to other nodes
177 ********/
180 typedef struct infiPacketStruct {
181 char *buf;
182 int size;
183 struct infiPacketHeader header;
184 struct ibv_mr *keyHeader;
185 struct OtherNodeStruct *destNode;
186 struct infiPacketStruct *next;
187 OutgoingMsg ogm;
188 struct ibv_sge elemList[2];
189 struct ibv_send_wr wr;
190 }* infiPacket;
193 typedef struct infiBufferedWCStruct{
194 struct ibv_wc wcList[WC_BUFFER_SIZE];
195 int count;
196 struct infiBufferedWCStruct *next,*prev;
197 } * infiBufferedWC;
/* Number of broadcast records stored in one pool block. */
#define BCASTLIST_SIZE 50

/* One buffered broadcast message. */
struct infiBufferedBcastStruct{
	char *msg;
	int size;
	int broot;
	int asm_rank;
	int valid;
};

/* Doubly linked block holding up to BCASTLIST_SIZE buffered broadcasts. */
typedef struct infiBufferedBcastPoolStruct{
	struct infiBufferedBcastPoolStruct *next,*prev;
	struct infiBufferedBcastStruct bcastList[BCASTLIST_SIZE];
	int count;

} *infiBufferedBcastPool;
220 /***
221 This structure represents the data needed by the infiniband
222 communication routines of a node
223 TODO: add locking for the smp version
225 struct infiContext {
226 struct ibv_context *context;
228 fd_set asyncFds;
229 struct timeval tmo;
231 int ibPort;
232 // struct ibv_comp_channel *channel;
233 struct ibv_pd *pd;
234 struct ibv_cq *sendCq;
235 struct ibv_cq *recvCq;
236 struct ibv_srq *srq;
238 struct ibv_qp **qp; //Array of qps (numNodes long) to temporarily store the queue pairs
239 //It is used between CmiMachineInit and the call to node_addresses_store
240 //when the qps are stored in the corresponding OtherNodes
242 struct infiAddr *localAddr; //store the lid,qpn,msn address of ur qpair until they are sent
244 infiPacket infiPacketFreeList;
246 struct infiBufferPool *recvBufferPool;
248 struct infiPacketHeader header;
250 int srqSize;
251 int sendCqSize,recvCqSize;
252 int tokensLeft;
254 infiBufferedBcastPool bufferedBcastList;
256 struct infiRdmaPacket *bufferedRdmaAcks;
258 struct infiRdmaPacket *bufferedRdmaRequests;
259 /* infiBufferedWC infiBufferedRecvList;*/
261 int insideProcessBufferedBcasts;
264 static struct infiContext *context = NULL;
/** Represents a qp used to send messages to another node
 There is one for each remote node
*/
struct infiAddr {
	int lid,qpn,psn;	/* local id, queue pair number, initial packet sequence number */
};
/**
 Stored in the OtherNode structure in machine-dgram.c
 Store the per node data for ibverbs layer
*/
enum { INFI_HEADER_DATA=21,INFI_DATA};

struct infiOtherNodeData{
	struct ibv_qp *qp ;	/* queue pair connected to that node */
	int state;// does it expect a packet with a header (first packet) or one without
	int totalTokens;
	int tokensLeft;
	int nodeNo;		/* number of the remote node */

	int postedRecvs;
	int broot;//needed to store the root of a multi-packet broadcast sent along a spanning tree or hypercube
#if CMK_IBVERBS_DEBUG
	int psn;		/* sequence number of the last packet sent, debug builds only */
	int recvPsn;
#endif
};
298 /********************************
299 Memory management structures and types
300 *****************/
struct infiCmiChunkHeaderStruct;

/* Bookkeeping record reachable from a buffer through METADATAFIELD(). */
typedef struct infiCmiChunkMetaDataStruct {
	struct ibv_mr *key;	/* memory region the buffer is registered under */
	int poolIdx;		/* pool index, or INFIMULTIPOOL (see infi_unregAndFreeMeta) */
	void *nextBuf;
	struct infiCmiChunkHeaderStruct *owner;
	int count;

#if THREAD_MULTI_POOL
	int parentPe;                     // the PE that allocated the buffer and must release it
#endif
} infiCmiChunkMetaData;
/* Metadata pointer stored in the chunk header that sits immediately in front
 * of buffer `m`. The argument is parenthesised so that an expression such as
 * `base + off` is cast as a whole rather than only its first operand. */
#define METADATAFIELD(m) (((infiCmiChunkHeader *)(m))[-1].metaData)
/* One size class of pooled buffers. */
typedef struct {
	int size;//without infiCmiChunkHeader
	void *startBuf;
	int count;
} infiCmiChunkPool;

#define INFINUMPOOLS 14
#define INFIMAXPERPOOL 100
/* Marker stored in poolIdx; infi_unregAndFreeMeta only releases metadata
 * carrying this value. */
#define INFIMULTIPOOL 0xDEAFB00D

#if THREAD_MULTI_POOL
static infiCmiChunkPool **infiCmiChunkPools;
//TODO Find proper place to dispose the memory acquired by infiCmiChunkPool
#else
static infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
#endif
338 static void initInfiCmiChunkPools();
341 static inline infiPacket newPacket(){
342 infiPacket pkt = (infiPacket )CmiAlloc(sizeof(struct infiPacketStruct));
343 pkt->size = -1;
344 pkt->header = context->header;
345 pkt->next = NULL;
346 pkt->destNode = NULL;
347 pkt->keyHeader = METADATAFIELD(pkt)->key;
348 pkt->ogm=NULL;
349 CmiAssert(pkt->keyHeader!=NULL);
350 pkt->buf=NULL;
352 pkt->elemList[0].addr = (uintptr_t)&(pkt->header);
353 pkt->elemList[0].length = sizeof(struct infiPacketHeader);
354 pkt->elemList[0].lkey = pkt->keyHeader->lkey;
356 pkt->wr.wr_id = (uint64_t)pkt;
357 pkt->wr.sg_list = &(pkt->elemList[0]);
358 pkt->wr.num_sge = 2;
359 pkt->wr.opcode = IBV_WR_SEND;
360 pkt->wr.send_flags = IBV_SEND_SIGNALED;
361 pkt->wr.next = NULL;
363 return pkt;
/* Reset `pkt` and push it onto context->infiPacketFreeList.
 * `pkt` is evaluated several times, so pass an expression without side
 * effects. The argument is parenthesised so expressions such as `base + i`
 * expand correctly. */
#define FreeInfiPacket(pkt){ \
	(pkt)->size = -1;\
	(pkt)->ogm=NULL;\
	(pkt)->buf=NULL;\
	(pkt)->next = context->infiPacketFreeList; \
	context->infiPacketFreeList = (pkt); \
}
/* Store a send packet in `pkt`: pop one off context->infiPacketFreeList, or
 * build a new one with newPacket() when the list is empty.
 * The temporary has a reserved-looking name so that a caller variable called
 * `p` is no longer shadowed (the old expansion then assigned `p = p` and left
 * the caller's variable untouched). */
#define MallocInfiPacket(pkt) { \
	infiPacket infiPoppedPkt_ = context->infiPacketFreeList; \
	if(infiPoppedPkt_ == NULL){ infiPoppedPkt_ = newPacket();} \
	else{context->infiPacketFreeList = infiPoppedPkt_->next; } \
	(pkt) = infiPoppedPkt_;\
}
383 void infi_unregAndFreeMeta(void *md)
385 if(md!=NULL && (((infiCmiChunkMetaData *)md)->poolIdx == INFIMULTIPOOL))
387 int unregstat=ibv_dereg_mr(((infiCmiChunkMetaData*)md)->key);
388 CmiAssert(unregstat==0);
389 free(((infiCmiChunkMetaData *)md));
390 #if CMK_IBVERBS_STATS
391 numUnReg++;
392 numCurReg--;
393 numMultiSendUnreg++;
394 #endif
399 /******************CmiMachineInit and its helper functions*/
400 static inline int pollSendCq(const int toBuffer);
402 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr);
403 static uint16_t getLocalLid(struct ibv_context *context, int port);
404 static int checkQp(struct ibv_qp *qp){
405 struct ibv_qp_attr attr;
406 struct ibv_qp_init_attr init_attr;
408 ibv_query_qp(qp, &attr, IBV_QP_STATE | IBV_QP_CUR_STATE|IBV_QP_CAP ,&init_attr);
409 if(attr.cur_qp_state != IBV_QPS_RTS){
410 MACHSTATE2(3,"CHECKQP failed cap wr %d sge %d",attr.cap.max_send_wr,attr.cap.max_send_sge);
411 return 0;
413 return 1;
415 static void checkAllQps(){
416 int i;
417 for(i=0;i<CmiNumNodesGlobal();i++){
418 if(i != CmiMyNodeGlobal()){
419 if(!checkQp(nodes[i].infiData->qp)){
420 pollSendCq(0);
421 CmiAbort("Queue pair check failed");
427 #if CMK_IBVERBS_FAST_START
428 static void send_partial_init();
429 #endif
431 static void CmiMachineInit(char **argv){
432 struct ibv_device **devList;
433 struct ibv_device *dev;
434 int ibPort;
435 int i;
436 int calcMaxSize;
437 infiPacket *pktPtrs;
438 struct infiRdmaPacket **rdmaPktPtrs;
439 int num_devices, idev;
441 #if CMK_SMP
442 ibv_fork_init();
443 #endif
445 MACHSTATE(3,"CmiMachineInit {");
446 MACHSTATE2(3,"Lrts_numNodes %d CmiNumNodes() %d",Lrts_numNodes,CmiNumNodes());
447 MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
449 //TODO: make the device and ibport configureable by commandline parameter
450 //Check example for how to do that
451 devList = ibv_get_device_list(&num_devices);
452 CmiEnforce(num_devices > 0);
453 CmiEnforce(devList != NULL);
455 context = (struct infiContext *)malloc(sizeof(struct infiContext));
456 MACHSTATE1(3,"context allocated %p",context);
458 //localAddr will store the local addresses of all the qps
459 context->localAddr = (struct infiAddr *)malloc(sizeof(struct infiAddr)*Lrts_numNodes);
460 MACHSTATE1(3,"context->localAddr allocated %p",context->localAddr);
462 idev = 0;
464 // try all devices, can't assume device 0 is IB, it may be ethernet
465 loop:
466 dev = devList[idev];
467 CmiEnforce(dev != NULL);
469 MACHSTATE1(3,"device name %s",ibv_get_device_name(dev));
471 //the context for this infiniband device
472 context->context = ibv_open_device(dev);
473 CmiEnforce(context->context != NULL);
475 // test ibPort
476 int MAXPORT = 8;
477 for (ibPort = 1; ibPort < MAXPORT; ibPort++) {
478 struct ibv_port_attr attr;
479 if (ibv_query_port(context->context, ibPort, &attr) != 0) continue;
480 #if CMK_IBV_PORT_ATTR_HAS_LINK_LAYER
481 if (attr.link_layer == IBV_LINK_LAYER_INFINIBAND) break;
482 #else
483 break;
484 #endif
487 if (ibPort == MAXPORT) {
488 if (++idev == num_devices)
489 CmiAbort("No valid IB port found!");
490 else
491 goto loop;
493 context->ibPort = ibPort;
494 MACHSTATE1(3,"use port %d", ibPort);
496 MACHSTATE1(3,"device opened %p",context->context);
498 /* FD_ZERO(&context->asyncFds);
499 FD_SET(context->context->async_fd,&context->asyncFds);
500 context->tmo.tv_sec=0;
501 context->tmo.tv_usec=0;
503 MACHSTATE(3,"asyncFds zeroed and set");*/
505 //protection domain
506 context->pd = ibv_alloc_pd(context->context);
507 CmiEnforce(context->pd != NULL);
508 MACHSTATE2(3,"pd %p pd->handle %d",context->pd,context->pd->handle);
510 /******** At this point we know that this node is more or less serviceable
511 So, this is a good point for sending the partial init message for the fast
512 start case
513 Moreover, no work dependent on the number of nodes has started yet.
514 ************/
516 #if CMK_IBVERBS_FAST_START
517 send_partial_init();
518 #endif
521 context->header.nodeNo = Lrts_myNode;
523 mtu_size=1200;
524 packetSize = mtu_size*4;
525 dataSize = packetSize-sizeof(struct infiPacketHeader);
527 calcMaxSize=8000;
528 /* if(Lrts_numNodes*50 > calcMaxSize){
529 calcMaxSize = Lrts_numNodes*50;
530 if(calcMaxSize > 10000){
531 calcMaxSize = 10000;
534 maxRecvBuffers=calcMaxSize;
535 if (CmiGetArgIntDesc(argv,"+IBVMaxSendTokens",&maxTokens,"User set IBV Max Outstanding Send Tokens") == 0)
536 maxTokens = 1000; // this value may need to be tweaked later
537 context->tokensLeft=maxTokens;
538 context->qp=NULL;
539 //tokensPerProcessor=4;
540 if(Lrts_numNodes > 1){
541 #if !CMK_IBVERBS_FAST_START
542 /* a barrier to make sure all nodes initialized the device */
543 ChMessage msg;
544 ctrl_sendone_nolock("barrier",NULL,0,NULL,0);
545 ChMessage_recv(Cmi_charmrun_fd,&msg);
546 #endif
547 createLocalQps(dev,ibPort,Lrts_myNode,Lrts_numNodes,context->localAddr);
550 if (Cmi_charmrun_fd == -1) return;
552 //TURN ON RDMA
553 rdma=1;
554 // rdmaThreshold=32768;
555 rdmaThreshold=22000;
556 firstBinSize = 128;
557 CmiAssert(rdmaThreshold > firstBinSize);
558 /* blockAllocRatio=16;
559 blockThreshold=8;*/
561 blockAllocRatio=128;
562 blockThreshold=9;
566 #if !THREAD_MULTI_POOL
567 initInfiCmiChunkPools();
568 #endif
570 /*create the pool of send packets*/
571 sendPacketPoolSize = maxTokens/2;
572 if(sendPacketPoolSize > 2000){
573 sendPacketPoolSize = 2000;
576 context->infiPacketFreeList=NULL;
577 pktPtrs = malloc(sizeof(infiPacket)*sendPacketPoolSize);
579 //Silly way of allocating the memory buffers (slow as well) but simplifies the code
580 #if !THREAD_MULTI_POOL
581 for(i=0;i<sendPacketPoolSize;i++){
582 MallocInfiPacket(pktPtrs[i]);
585 for(i=0;i<sendPacketPoolSize;i++){
586 FreeInfiPacket(pktPtrs[i]);
588 free(pktPtrs);
589 #endif
591 context->bufferedBcastList=NULL;
592 context->bufferedRdmaAcks = NULL;
593 context->bufferedRdmaRequests = NULL;
594 context->insideProcessBufferedBcasts=0;
597 if(rdma){
598 /* int numPkts;
599 int k;
600 if( Lrts_numNodes*4 < maxRecvBuffers/4){
601 numPkts = Lrts_numNodes*4;
602 }else{
603 numPkts = maxRecvBuffers/4;
606 rdmaPktPtrs = (struct infiRdmaPacket **)malloc(numPkts*sizeof(struct infiRdmaPacket));
607 for(k=0;k<numPkts;k++){
608 rdmaPktPtrs[k] = CmiAlloc(sizeof(struct infiRdmaPacket));
611 for(k=0;k<numPkts;k++){
612 CmiFree(rdmaPktPtrs[k]);
614 free(rdmaPktPtrs);*/
617 /* context->infiBufferedRecvList = NULL;*/
618 #if CMK_IBVERBS_STATS
619 regCount =0;
620 regTime = 0;
622 pktCount=0;
623 msgCount=0;
625 processBufferedCount=0;
626 processBufferedTime=0;
628 minTokensLeft = maxTokens;
629 #endif
633 MACHSTATE(3,"} CmiMachineInit");
/* With per-thread pools enabled, set up the chunk pools and fill the buffer
 * pools; otherwise there is nothing to do here (the pools are initialised in
 * CmiMachineInit in that configuration). argv is unused. */
void CmiCommunicationInit(char **argv)
{
#if THREAD_MULTI_POOL
	initInfiCmiChunkPools();
	fillBufferPools();
#endif
}
644 /*********
645 Open a qp for every processor
646 *****/
647 void createLocalQps(struct ibv_device *dev,int ibPort, int myNode,int numNodes,struct infiAddr *localAddr){
648 int myLid;
649 int i;
652 //find my lid
653 myLid = getLocalLid(context->context,ibPort);
655 MACHSTATE2(3,"myLid %d numNodes %d",myLid,numNodes);
657 context->sendCqSize = maxTokens+2;
658 context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
659 CmiAssert(context->sendCq != NULL);
661 MACHSTATE1(3,"sendCq created %p",context->sendCq);
664 context->recvCqSize = maxRecvBuffers;
665 context->recvCq = ibv_create_cq(context->context,context->recvCqSize,NULL,NULL,0);
667 MACHSTATE2(3,"recvCq created %p %d",context->recvCq,context->recvCqSize);
668 CmiAssert(context->recvCq != NULL);
670 //array of queue pairs
672 context->qp = (struct ibv_qp **)malloc(sizeof(struct ibv_qp *)*numNodes);
674 if(numNodes > 1)
676 context->srqSize = (maxRecvBuffers+2);
677 struct ibv_srq_init_attr srqAttr = {
678 .attr = {
679 .max_wr = context->srqSize,
680 .max_sge = 1
683 context->srq = ibv_create_srq(context->pd,&srqAttr);
684 CmiAssert(context->srq != NULL);
686 struct ibv_qp_init_attr initAttr = {
687 .qp_type = IBV_QPT_RC,
688 .send_cq = context->sendCq,
689 .recv_cq = context->recvCq,
690 .srq = context->srq,
691 .sq_sig_all = 0,
692 .qp_context = NULL,
693 .cap = {
694 .max_send_wr = maxTokens,
695 .max_send_sge = 2,
698 struct ibv_qp_attr attr;
700 attr.qp_state = IBV_QPS_INIT;
701 attr.pkey_index = 0;
702 attr.port_num = ibPort;
703 attr.qp_access_flags = IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
705 /* MACHSTATE1(3,"context->pd %p",context->pd);
706 struct ibv_qp *qp = ibv_create_qp(context->pd,&initAttr);
707 MACHSTATE1(3,"TEST QP %p",qp);*/
709 for( i=1;i<numNodes;i++){
710 int n = (myNode + i)%numNodes;
711 if(n == myNode){
712 }else{
713 localAddr[n].lid = myLid;
714 context->qp[n] = ibv_create_qp(context->pd,&initAttr);
716 MACHSTATE2(3,"qp[%d] created %p",n,context->qp[n]);
717 CmiAssert(context->qp[n] != NULL);
719 ibv_modify_qp(context->qp[n], &attr,
720 IBV_QP_STATE |
721 IBV_QP_PKEY_INDEX |
722 IBV_QP_PORT |
723 IBV_QP_ACCESS_FLAGS);
725 localAddr[n].qpn = context->qp[n]->qp_num;
726 localAddr[n].psn = lrand48() & 0xffffff;
727 MACHSTATE4(3,"i %d lid Ox%x qpn 0x%x psn 0x%x",n,localAddr[n].lid,localAddr[n].qpn,localAddr[n].psn);
731 MACHSTATE(3,"qps created");
734 void copyInfiAddr(ChInfiAddr *qpList){
735 int qpListIdx=0;
736 int i;
737 MACHSTATE1(3,"copyInfiAddr Lrts_myNode %d",Lrts_myNode);
738 for(i=0;i<Lrts_numNodes;i++){
739 if(i == Lrts_myNode){
740 }else{
741 qpList[qpListIdx].lid = ChMessageInt_new(context->localAddr[i].lid);
742 qpList[qpListIdx].qpn = ChMessageInt_new(context->localAddr[i].qpn);
743 qpList[qpListIdx].psn = ChMessageInt_new(context->localAddr[i].psn);
744 qpListIdx++;
750 static uint16_t getLocalLid(struct ibv_context *dev_context, int port){
751 struct ibv_port_attr attr;
753 if (ibv_query_port(dev_context, port, &attr))
754 return 0;
756 return attr.lid;
759 /**************** END OF CmiMachineInit and its helper functions*/
761 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer);
762 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer);
764 /* Initial the infiniband specific data for a remote node
765 1. connect the qp and store it in and return it
767 struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]){
768 struct infiOtherNodeData * ret = malloc(sizeof(struct infiOtherNodeData));
769 int err;
770 ret->state = INFI_HEADER_DATA;
771 ret->qp = context->qp[node];
772 // ret->totalTokens = tokensPerProcessor;
773 // ret->tokensLeft = tokensPerProcessor;
774 ret->nodeNo = node;
775 // ret->postedRecvs = tokensPerProcessor;
776 #if CMK_IBVERBS_DEBUG
777 ret->psn = 0;
778 ret->recvPsn = 0;
779 #endif
781 struct ibv_qp_attr attr = {
782 .qp_state = IBV_QPS_RTR,
783 .path_mtu = mtu,
784 .dest_qp_num = addr[1],
785 .rq_psn = addr[2],
786 .max_dest_rd_atomic = 1,
787 .min_rnr_timer = 31,
788 .ah_attr = {
789 .is_global = 0,
790 .dlid = addr[0],
791 .sl = 0,
792 .src_path_bits = 0,
793 .port_num = context->ibPort
797 MACHSTATE2(3,"initInfiOtherNodeData %d{ qp %p",node,ret->qp);
798 MACHSTATE3(3,"dlid 0x%x qp 0x%x psn 0x%x",attr.ah_attr.dlid,attr.dest_qp_num,attr.rq_psn);
800 if (err = ibv_modify_qp(ret->qp, &attr,
801 IBV_QP_STATE |
802 IBV_QP_AV |
803 IBV_QP_PATH_MTU |
804 IBV_QP_DEST_QPN |
805 IBV_QP_RQ_PSN |
806 IBV_QP_MAX_DEST_RD_ATOMIC |
807 IBV_QP_MIN_RNR_TIMER)) {
808 MACHSTATE1(3,"ERROR %d",err);
809 CmiAbort("failed to change qp state to RTR");
812 MACHSTATE(3,"qp state changed to RTR");
814 attr.qp_state = IBV_QPS_RTS;
815 #if ! QLOGIC
816 attr.timeout = 26;
817 attr.retry_cnt = 20;
818 #else
819 attr.timeout = 14;
820 attr.retry_cnt = 7;
821 #endif
822 attr.rnr_retry = 7;
823 attr.sq_psn = context->localAddr[node].psn;
824 attr.max_rd_atomic = 1;
826 if (err = ibv_modify_qp(ret->qp, &attr,
827 IBV_QP_STATE |
828 IBV_QP_TIMEOUT |
829 IBV_QP_RETRY_CNT |
830 IBV_QP_RNR_RETRY |
831 IBV_QP_SQ_PSN |
832 IBV_QP_MAX_QP_RD_ATOMIC)) {
833 MACHSTATE1(3,"ERROR changing qp state to RTS %d: will retry",err);
836 // Error code 22 means that there was an invalid parameter when calling to this verbs, try with alternate parameters
837 if(err == 22) {
838 //use inverted logic
839 #if QLOGIC
840 mtu = IBV_MTU_2048;
841 attr.path_mtu = mtu;
842 attr.timeout = 26;
843 attr.retry_cnt = 20;
844 #else
845 mtu = IBV_MTU_4096;
846 attr.path_mtu = mtu;
847 attr.timeout = 14;
848 attr.retry_cnt = 7;
849 #endif
851 MACHSTATE3(3,"Retry:dlid 0x%x qp 0x%x psn 0x%x",attr.ah_attr.dlid,attr.dest_qp_num,attr.sq_psn);
852 if (err = ibv_modify_qp(ret->qp, &attr,
853 IBV_QP_STATE |
854 IBV_QP_TIMEOUT |
855 IBV_QP_RETRY_CNT |
856 IBV_QP_RNR_RETRY |
857 IBV_QP_SQ_PSN |
858 IBV_QP_MAX_QP_RD_ATOMIC)) {
859 MACHSTATE1(3,"ERROR changing qp state to RTS %d",err);
860 CmiAbort("Failed to change qp state to RTS: you may need some device-specific parameters in machine-ibverbs");
862 } else if(err) {
863 CmiAbort("Failed to change qp state to RTS");
866 MACHSTATE(3,"qp state changed to RTS");
868 MACHSTATE(3,"} initInfiOtherNodeData");
869 return ret;
873 void infiPostInitialRecvs(){
874 //create the pool and post the receives
875 int numPosts;
876 /* if(tokensPerProcessor*(Lrts_numNodes-1) <= maxRecvBuffers){
877 numPosts = tokensPerProcessor*(Lrts_numNodes-1);
878 }else{
879 numPosts = maxRecvBuffers;
882 if(Lrts_numNodes > 1){
883 numPosts = maxRecvBuffers;
884 }else{
885 numPosts = 0;
887 if(numPosts > 0){
888 context->recvBufferPool = allocateInfiBufferPool(numPosts,packetSize);
889 postInitialRecvs(context->recvBufferPool,numPosts,packetSize);
893 if (context->qp) {
894 free(context->qp);
895 context->qp = NULL;
897 free(context->localAddr);
898 context->localAddr= NULL;
901 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
902 int numBuffers;
903 int i;
904 int bigSize;
905 char *bigBuf;
906 struct infiBufferPool *ret;
907 struct ibv_mr *bigKey;
909 MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
911 page_size = sysconf(_SC_PAGESIZE);
912 ret = malloc(sizeof(struct infiBufferPool));
913 ret->next = NULL;
914 numBuffers=ret->numBuffers = numRecvs;
916 ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
918 bigSize = numBuffers*sizePerBuffer;
919 bigBuf=malloc(bigSize);
920 bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
921 #if CMK_IBVERBS_STATS
922 numCurReg++;
923 numReg++;
924 #endif
926 CmiAssert(bigKey != NULL);
928 for(i=0;i<numBuffers;i++){
929 struct infiBuffer *buffer = &(ret->buffers[i]);
930 buffer->type = BUFFER_RECV;
931 buffer->size = sizePerBuffer;
934 buffer->buf = &bigBuf[i*sizePerBuffer];
935 buffer->key = bigKey;
937 if(buffer->key == NULL){
938 MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
939 CmiAssert(buffer->key != NULL);
942 return ret;
948 Post the buffers as recv work requests
950 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer){
951 int j,err;
952 struct ibv_recv_wr *workRequests = malloc(sizeof(struct ibv_recv_wr)*numRecvs);
953 struct ibv_sge *sgElements = malloc(sizeof(struct ibv_sge)*numRecvs);
954 struct ibv_recv_wr *bad_wr;
956 int startBufferIdx=0;
957 MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
958 for(j=0;j<numRecvs;j++){
961 sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
962 sgElements[j].length = sizePerBuffer;
963 sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
965 workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
966 workRequests[j].sg_list = &sgElements[j];
967 workRequests[j].num_sge = 1;
968 if(j != numRecvs-1){
969 workRequests[j].next = &workRequests[j+1];
973 workRequests[numRecvs-1].next = NULL;
974 MACHSTATE(3,"About to call ibv_post_srq_recv");
975 if(ibv_post_srq_recv(context->srq,workRequests,&bad_wr)){
976 CmiAbort("ibv_post_srq_recv failed");
979 free(workRequests);
980 free(sgElements);
986 static inline void CommunicationServer_nolock(int toBuffer); //if buffer ==1 recvd messages are buffered but not processed
/* Machine-layer exit hook; it only prints the collected counters when the
 * layer was built with CMK_IBVERBS_STATS. */
void MachineExit()
{
#if CMK_IBVERBS_STATS
	printf("[%d] numReg %d numUnReg %d numCurReg %d msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",Lrts_myNode,numReg, numUnReg, numCurReg, msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft);
#endif
}
995 static void ServiceCharmrun_nolock();
997 static inline void increaseTokens(OtherNode node);
999 static inline int pollRecvCq(const int toBuffer);
1000 static inline int pollSendCq(const int toBuffer);
/* When token flow control is compiled in and no send tokens are left, run the
 * communication server in buffering mode until context->tokensLeft becomes
 * non-zero. infiData is unused (the per-node check is commented out). */
static inline void getFreeTokens(struct infiOtherNodeData *infiData){
#if !CMK_IBVERBS_TOKENS_FLOW
	return;
#else
	//if(infiData->tokensLeft == 0){
	if(context->tokensLeft != 0){
		return;
	}
	MACHSTATE(3,"GET FREE TOKENS {{{");
	while(context->tokensLeft == 0){
		CommunicationServer_nolock(1);
	}
	MACHSTATE1(3,"}}} GET FREE TOKENS %d",context->tokensLeft);
#endif
}
1022 Packetize this data and send it
1027 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
1028 int incTokens=0;
1029 int retval;
1030 #if CMK_IBVERBS_DEBUG
1031 packet->header.psn = (++node->infiData->psn);
1032 #endif
1036 packet->elemList[1].addr = (uintptr_t)packet->buf;
1037 packet->elemList[1].length = size;
1038 packet->elemList[1].lkey = dataKey->lkey;
1041 packet->destNode = node;
1043 #if CMK_IBVERBS_STATS
1044 pktCount++;
1045 #endif
1047 getFreeTokens(node->infiData);
1049 #if CMK_IBVERBS_INCTOKENS
1050 if((node->infiData->tokensLeft < INCTOKENS_FRACTION*node->infiData->totalTokens || node->infiData->tokensLeft < 2) && node->infiData->totalTokens < maxTokens){
1051 packet->header.code |= INFIPACKETCODE_INCTOKENS;
1052 incTokens=1;
1054 #endif
1056 if(!checkQp(node->infiData->qp)){
1057 pollSendCq(1);
1058 CmiEnforce(0);
1061 struct ibv_send_wr *bad_wr=NULL;
1062 if(retval = ibv_post_send(node->infiData->qp,&(packet->wr),&bad_wr)){
1063 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",Lrts_myNode,node->infiData->nodeNo,retval);
1064 CmiAbort("ibv_post_send failed");
1066 #if CMK_IBVERBS_TOKENS_FLOW
1067 context->tokensLeft--;
1068 #if CMK_IBVERBS_STATS
1069 if(context->tokensLeft < minTokensLeft){
1070 minTokensLeft = context->tokensLeft;
1072 #endif
1073 #endif
1075 /* if(!checkQp(node->infiData->qp)){
1076 pollSendCq(1);
1077 CmiEnforce(0);
1080 #if CMK_IBVERBS_INCTOKENS
1081 if(incTokens){
1082 increaseTokens(node);
1084 #endif
1087 #if CMK_IBVERBS_DEBUG
1088 MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d psn %d",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->header.psn);
1089 #else
1090 MACHSTATE4(3,"Packet send size %d node %d tokensLeft %d packet->buf %p",size,packet->destNode->infiData->nodeNo,context->tokensLeft,packet->buf);
1091 #endif
1096 static void inline EnqueueDummyPacket(OtherNode node,int size){
1097 infiPacket packet;
1098 MallocInfiPacket(packet);
1099 packet->size = size;
1100 packet->buf = CmiAlloc(size);
1102 packet->header.code = INFIDUMMYPACKET;
1104 struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
1106 MACHSTATE2(3,"Dummy packet to %d size %d",node->infiData->nodeNo,size);
1107 EnqueuePacket(node,packet,size,key);
1115 static void inline EnqueueDataPacket(OutgoingMsg ogm, OtherNode node, int rank,char *data,int size,int broot,int copy){
1116 infiPacket packet;
1117 MallocInfiPacket(packet);
1118 packet->size = size;
1119 packet->buf=data;
1121 //the nodeNo is added at time of packet allocation
1122 packet->header.code = INFIPACKETCODE_DATA;
1124 ogm->refcount++;
1125 packet->ogm = ogm;
1127 struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1128 CmiAssert(key != NULL);
1130 EnqueuePacket(node,packet,size,key);
1133 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node);
1134 static inline void processAllBufferedMsgs();
1136 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy){
1137 int size; char *data;
1138 // processAllBufferedMsgs();
1141 ogm->refcount++;
1142 size = ogm->size;
1143 data = ogm->data;
1145 #if CMK_IBVERBS_STATS
1146 msgCount++;
1147 #endif
1149 MACHSTATE3(3,"Sending ogm %p of size %d to %d",ogm,size,node->infiData->nodeNo);
1150 //First packet has dgram header, other packets dont
1152 DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, 1, broot);
1154 CMI_MSG_SIZE(ogm->data)=ogm->size;
1156 if(rdma && size > rdmaThreshold){
1157 EnqueueRdmaPacket(ogm,node);
1158 }else{
1160 while(size > dataSize){
1161 EnqueueDataPacket(ogm,node,rank,data,dataSize,broot,copy);
1162 size -= dataSize;
1163 data += dataSize;
1165 if(size > 0){
1166 EnqueueDataPacket(ogm,node,rank,data,size,broot,copy);
1169 //#if !CMK_SMP
1170 processAllBufferedMsgs();
1171 //#endif
1172 ogm->refcount--;
1173 MACHSTATE3(3,"DONE Sending ogm %p of size %d to %d",ogm,ogm->size,node->infiData->nodeNo);
1177 static inline void EnqueueRdmaPacket(OutgoingMsg ogm, OtherNode node){
1178 infiPacket packet;
1180 ogm->refcount++;
1182 MallocInfiPacket(packet);
1185 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1188 packet->size = sizeof(struct infiRdmaPacket);
1189 packet->buf = (char *)rdmaPacket;
1191 struct ibv_mr *key = METADATAFIELD(ogm->data)->key;
1193 CmiAssert(key!=NULL);
1195 MACHSTATE3(3,"ogm->data %p metadata %p key %p",ogm->data,METADATAFIELD(ogm->data),key);
1197 packet->header.code = INFIRDMA_START;
1198 packet->header.nodeNo = CmiMyNodeGlobal();
1199 packet->ogm = NULL;
1201 rdmaPacket->type = INFI_MESG;
1202 rdmaPacket->ogm = ogm;
1203 rdmaPacket->key = *key;
1204 rdmaPacket->keyPtr = key;
1205 rdmaPacket->remoteBuf = ogm->data;
1206 rdmaPacket->remoteSize = ogm->size;
1209 struct ibv_mr *packetKey = METADATAFIELD((void *)rdmaPacket)->key;
1211 MACHSTATE3(3,"rdmaRequest being sent to node %d buf %p size %d",node->infiData->nodeNo,ogm->data,ogm->size);
1212 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1218 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer);
1219 static inline void processSendWC(struct ibv_wc *sendWC);
1220 static unsigned int _count=0;
1221 extern int errno;
1222 static int _countAsync=0;
1223 static inline void processAsyncEvents(){
1224 struct ibv_async_event event;
1225 int ready;
1226 _countAsync++;
1227 if(_countAsync < 1){
1228 return;
1230 _countAsync=0;
1231 FD_SET(context->context->async_fd,&context->asyncFds);
1232 CmiAssert(FD_ISSET(context->context->async_fd,&context->asyncFds));
1233 ready = select(1, &context->asyncFds,NULL,NULL,&context->tmo);
1235 if(ready==0){
1236 return;
1238 if(ready == -1){
1239 // printf("[%d] strerror %s \n",Lrts_myNode,strerror(errno));
1240 return;
1243 if (ibv_get_async_event(context->context, &event)){
1244 return;
1245 CmiAbort("get async event failed");
1247 printf("[%d] async event %d \n",CmiMyNodeGlobal(), event.event_type);
1248 ibv_ack_async_event(&event);
1253 static void pollCmiDirectQ();
/**
 * One pass of the communication server: poll the CmiDirect queue, then the
 * receive and send completion queues, and finally process buffered messages.
 *
 * @param toBuffer  passed on to both completion queue polls; when non-zero
 *                  the buffered messages are not processed in this pass.
 *
 * With a single node only the CmiDirect queue is polled.
 */
static inline void CommunicationServer_nolock(int toBuffer) {
	int processed;
	if(CmiNumNodesGlobal() <= 1){
		pollCmiDirectQ();
		return;
	}
	MACHSTATE(2,"CommServer_nolock{");

//	processAsyncEvents();

//	checkAllQps();

	pollCmiDirectQ();

	processed = pollRecvCq(toBuffer);
	processed += pollSendCq(toBuffer);

	if(toBuffer == 0){
//		if(processed != 0)
		processAllBufferedMsgs();
	}

//	checkAllQps();
//	_count--;

	MACHSTATE(2,"} CommServer_nolock ne");
}
1286 static inline infiBufferedWC createInfiBufferedWC(){
1287 infiBufferedWC ret = malloc(sizeof(struct infiBufferedWCStruct));
1288 ret->count = 0;
1289 ret->next = ret->prev =NULL;
1290 return ret;
1293 /****
1294 The buffered recvWC are stored in a doubly linked list of
1295 arrays or blocks of wcs.
1296 To keep the average insert cost low, a new block is added
1297 to the top of the list. (resulting in a reverse seq of blocks)
1298 Within a block however wc are stored in a sequence
1299 *****/
1300 /*static void insertBufferedRecv(struct ibv_wc *wc){
1301 infiBufferedWC block;
1302 MACHSTATE(3,"Insert Buffered Recv called");
1303 if( context->infiBufferedRecvList ==NULL){
1304 context->infiBufferedRecvList = createInfiBufferedWC();
1305 block = context->infiBufferedRecvList;
1306 }else{
1307 if(context->infiBufferedRecvList->count == WC_BUFFER_SIZE){
1308 block = createInfiBufferedWC();
1309 context->infiBufferedRecvList->prev = block;
1310 block->next = context->infiBufferedRecvList;
1311 context->infiBufferedRecvList = block;
1312 }else{
1313 block = context->infiBufferedRecvList;
1317 block->wcList[block->count] = *wc;
1318 block->count++;
1323 /********
1324 go through the blocks of bufferedWC. Process the last block first.
1325 Then the next one and so on. (Processing within a block should happen
1326 in sequence).
1327 Leave the last block in place to avoid having to allocate again
1328 ******/
1329 /*static inline void processBufferedRecvList(){
1330 infiBufferedWC start;
1331 start = context->infiBufferedRecvList;
1332 while(start->next != NULL){
1333 start = start->next;
1335 while(start != NULL){
1336 int i=0;
1337 infiBufferedWC tmp;
1338 for(i=0;i<start->count;i++){
1339 processRecvWC(&start->wcList[i]);
1341 if(start != context->infiBufferedRecvList){
1342 //not the first one
1343 tmp = start;
1344 start = start->prev;
1345 free(tmp);
1346 start->next = NULL;
1347 }else{
1348 start = start->prev;
1351 context->infiBufferedRecvList->next = NULL;
1352 context->infiBufferedRecvList->prev = NULL;
1353 context->infiBufferedRecvList->count = 0;
1357 static inline int pollRecvCq(const int toBuffer){
1358 int i;
1359 int ne;
1360 struct ibv_wc wc[WC_LIST_SIZE];
1362 MACHSTATE1(2,"pollRecvCq %d (((",toBuffer);
1363 ne = ibv_poll_cq(context->recvCq,WC_LIST_SIZE,&wc[0]);
1364 // CmiAssert(ne >=0);
1366 if(ne != 0){
1367 MACHSTATE1(3,"pollRecvCq ne %d",ne);
1370 for(i=0;i<ne;i++){
1371 if(wc[i].status != IBV_WC_SUCCESS){
1372 CmiAbort("Work completion error in recvCq");
1374 switch(wc[i].opcode){
1375 case IBV_WC_RECV:
1376 processRecvWC(&wc[i],toBuffer);
1377 break;
1378 default:
1379 CmiAbort("Wrong type of work completion object in recvCq");
1380 break;
1384 MACHSTATE1(2,"))) pollRecvCq %d",toBuffer);
1385 return ne;
1389 static inline void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer);
1391 static inline int pollSendCq(const int toBuffer){
1392 int i;
1393 int ne;
1394 struct ibv_wc wc[WC_LIST_SIZE];
1396 ne = ibv_poll_cq(context->sendCq,WC_LIST_SIZE,&wc[0]);
1397 // CmiAssert(ne >=0);
1400 for(i=0;i<ne;i++){
1401 if(wc[i].status != IBV_WC_SUCCESS){
1402 printf("[%d] wc[%d] status %d wc[i].opcode %d\n",CmiMyNodeGlobal(),i,wc[i].status,wc[i].opcode);
1403 #if CMK_IBVERBS_STATS
1404 printf("[%d] msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d minTokensLeft %d \n",CmiMyNodeGlobal(),msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft,minTokensLeft);
1405 #endif
1406 CmiAbort("Work completion error in sendCq");
1408 switch(wc[i].opcode){
1409 case IBV_WC_SEND:{
1410 //message received
1411 processSendWC(&wc[i]);
1413 break;
1415 case IBV_WC_RDMA_READ:
1417 // processRdmaWC(&wc[i],toBuffer);
1418 processRdmaWC(&wc[i],1);
1419 break;
1421 case IBV_WC_RDMA_WRITE:
1423 /*** used for CmiDirect puts
1424 Nothing needs to be done on the sender side once send is done **/
1425 break;
1427 default:
1428 CmiAbort("Wrong type of work completion object in sendCq");
1429 break;
1433 return ne;
1437 /******************
1438 Check the communication server socket and
1440 *****************/
1441 int CheckSocketsReady(int withDelayMs)
1443 int nreadable;
1444 CMK_PIPE_DECL(withDelayMs);
1446 CmiStdoutAdd(CMK_PIPE_SUB);
1447 if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
1449 nreadable=CMK_PIPE_CALL();
1450 ctrlskt_ready_read = 0;
1451 dataskt_ready_read = 0;
1452 dataskt_ready_write = 0;
1454 if (nreadable == 0) {
1455 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
1456 return nreadable;
1458 if (nreadable==-1) {
1459 CMK_PIPE_CHECKERR();
1460 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
1461 return CheckSocketsReady(0);
1463 CmiStdoutCheck(CMK_PIPE_SUB);
1464 if (Cmi_charmrun_fd!=-1)
1465 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
1466 MACHSTATE(1,"} CheckSocketsReady")
1467 return nreadable;
1471 /*** Service the charmrun socket
1472 *************/
1474 static void ServiceCharmrun_nolock()
1476 int again = 1;
1477 MACHSTATE(2,"ServiceCharmrun_nolock begin {")
1478 while (again)
1480 again = 0;
1481 CheckSocketsReady(0);
1482 if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
1483 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
1485 MACHSTATE(2,"} ServiceCharmrun_nolock end")
1490 static void CommunicationServerNet(int sleepTime, int where){
1491 if( where == COMM_SERVER_FROM_INTERRUPT){
1492 #if CMK_IMMEDIATE_MSG
1493 CmiHandleImmediate();
1494 #endif
1495 return;
1497 #if CMK_SMP
1498 if(where == COMM_SERVER_FROM_WORKER){
1499 return;
1501 CmiCommLock();
1502 inProgress[CmiMyRank()] += 1;
1503 if(where == COMM_SERVER_FROM_SMP){
1504 #endif
1505 ServiceCharmrun_nolock();
1506 #if CMK_SMP
1508 #endif
1509 CommunicationServer_nolock(0);
1510 #if CMK_SMP
1511 CmiCommUnlock();
1512 inProgress[CmiMyRank()] -= 1;
1513 #endif
1515 /* when called by communication thread or in interrupt */
1516 #if CMK_IMMEDIATE_MSG
1517 if (where == COMM_SERVER_FROM_SMP) {
1518 CmiHandleImmediate();
1520 #endif
1524 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank);
1527 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer);
1529 static inline void processMessage(int nodeNo,int len,char *msg,const int toBuffer){
1530 char *newmsg;
1532 MACHSTATE2(3,"Processing packet from node %d len %d",nodeNo,len);
1534 OtherNode node = &nodes[nodeNo];
1535 newmsg = node->asm_msg;
1537 /// This simple state machine determines if this packet marks the beginning of a new message
1538 // from another node, or if this is another in a sequence of packets
1539 switch(node->infiData->state){
1540 case INFI_HEADER_DATA:
1542 int size;
1543 int rank, srcpe, seqno, magic, i;
1544 unsigned int broot;
1545 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
1546 size = CMI_MSG_SIZE(msg);
1547 MACHSTATE2(3,"START of a new message from node %d of total size %d",nodeNo,size);
1548 // CmiAssert(size > 0);
1549 // CmiAssert(nodes_by_pe[srcpe] == node);
1551 // CmiAssert(newmsg == NULL);
1552 if(len > size){
1553 //CmiPrintf("size: %d, len:%d.\n", size, len);
1554 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
1556 newmsg = (char *)CmiAlloc(size);
1557 _MEMCHECK(newmsg);
1558 memcpy(newmsg, msg, len);
1559 node->asm_rank = rank;
1560 node->asm_total = size;
1561 node->asm_fill = len;
1562 node->asm_msg = newmsg;
1563 node->infiData->broot = broot;
1564 if(len == size){
1565 //this is the only packet for this message
1566 node->infiData->state = INFI_HEADER_DATA;
1567 }else{
1568 //there are more packets following
1569 node->infiData->state = INFI_DATA;
1571 break;
1573 case INFI_DATA:
1575 if(node->asm_fill + len < node->asm_total && len != dataSize){
1576 //CmiPrintf("from node %d asm_total: %d, asm_fill: %d, len:%d.\n",node->infiData->nodeNo, node->asm_total, node->asm_fill, len);
1577 CmiAbort("packet in the middle does not have expected length");
1579 if(node->asm_fill+len > node->asm_total){
1580 //CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
1581 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
1583 //tODO: remove this
1584 memcpy(newmsg + node->asm_fill,msg,len);
1585 node->asm_fill += len;
1586 if(node->asm_fill == node->asm_total){
1587 node->infiData->state = INFI_HEADER_DATA;
1588 }else{
1589 node->infiData->state = INFI_DATA;
1591 break;
1594 /// if this packet was the last packet in a message ie state was
1595 /// reset to infi_header_data
1597 if(node->infiData->state == INFI_HEADER_DATA){
1598 int total_size = node->asm_total;
1599 node->asm_msg = NULL;
1600 handoverMessage(newmsg,total_size,node->asm_rank,node->infiData->broot,1);
1601 MACHSTATE3(3,"Message from node %d of length %d completely received msg %p",nodeNo,total_size,newmsg);
/**
 * Deliver a fully received message.
 *
 * In buffering mode a message with a non-zero broadcast root is stored via
 * insertBufferedBcast(); every other message goes straight to
 * handleOneRecvedMsg(). Outside buffering mode all buffered messages are
 * processed right after delivery.
 */
void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer){
	if(toBuffer && (CMI_BROADCAST_ROOT(newmsg)!=0)){
		insertBufferedBcast(newmsg,total_size,broot,rank);
	}else{
		handleOneRecvedMsg(total_size, newmsg);
	}

	if(!toBuffer){
		processAllBufferedMsgs();
	}
}
1623 static inline void increasePostedRecvs(int nodeNo);
1624 static inline void processRdmaRequest(struct infiRdmaPacket *rdmaPacket,int fromNodeNo,int isBuffered);
1625 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket);
1627 //struct infiDirectRequestPacket;
1628 //static inline void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket);
1630 static inline void processRecvWC(struct ibv_wc *recvWC,const int toBuffer){
1631 struct infiBuffer *buffer = (struct infiBuffer *) recvWC->wr_id;
1632 struct infiPacketHeader *header = (struct infiPacketHeader *)buffer->buf;
1633 int nodeNo = header->nodeNo;
1634 #if CMK_IBVERBS_DEBUG
1635 OtherNode node = &nodes[nodeNo];
1636 #endif
1638 int len = recvWC->byte_len-sizeof(struct infiPacketHeader);
1639 #if CMK_IBVERBS_DEBUG
1641 if(node->infiData->recvPsn == 0){
1642 node->infiData->recvPsn = header->psn;
1643 }else{
1644 CmiAssert(header->psn == (node->infiData->recvPsn)+1);
1645 * node->infiData->recvPsn++;
1648 MACHSTATE3(3,"packet from node %d len %d psn %d",nodeNo,len,header->psn);
1649 #else
1650 MACHSTATE2(3,"packet from node %d len %d",nodeNo,len);
1651 #endif
1653 if(header->code & INFIPACKETCODE_DATA){
1655 processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
1657 if(header->code & INFIDUMMYPACKET){
1658 MACHSTATE(3,"Dummy packet");
1660 if(header->code & INFIBARRIERPACKET){
1661 MACHSTATE(3,"Barrier packet");
1662 CmiAbort("Should not receive Barrier packet in normal polling loop. Your Barrier is broken");
1665 #if CMK_IBVERBS_INCTOKENS
1666 if(header->code & INFIPACKETCODE_INCTOKENS){
1667 increasePostedRecvs(nodeNo);
1669 #endif
1670 if(rdma && header->code & INFIRDMA_START){
1671 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1672 // if(toBuffer){
1673 //TODO: make a function of this and use for both acks and requests
1674 struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
1675 struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
1676 *copyPacket = *rdmaPacket;
1677 copyPacket->fromNodeNo = nodeNo;
1678 MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
1679 context->bufferedRdmaRequests = copyPacket;
1680 copyPacket->next = tmp;
1681 copyPacket->prev = NULL;
1682 if(tmp != NULL){
1683 tmp->prev = copyPacket;
1685 /* }else{
1686 processRdmaRequest(rdmaPacket,nodeNo,0);
1689 if(rdma && header->code & INFIRDMA_ACK){
1690 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
1691 processRdmaAck(rdmaPacket);
1693 /* if(header->code & INFIDIRECT_REQUEST){
1694 struct infiDirectRequestPacket *directRequestPacket = (struct infiDirectRequestPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
1695 processDirectRequest(directRequestPacket);
1698 struct ibv_sge list = {
1699 .addr = (uintptr_t) buffer->buf,
1700 .length = buffer->size,
1701 .lkey = buffer->key->lkey
1704 struct ibv_recv_wr wr = {
1705 .wr_id = (uint64_t)buffer,
1706 .sg_list = &list,
1707 .num_sge = 1,
1708 .next = NULL
1710 struct ibv_recv_wr *bad_wr;
1712 if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
1713 CmiAbort("ibv_post_srq_recv failed");
1722 static inline void processSendWC(struct ibv_wc *sendWC){
1724 infiPacket packet = (infiPacket )sendWC->wr_id;
1725 #if CMK_IBVERBS_TOKENS_FLOW
1726 // packet->destNode->infiData->tokensLeft++;
1727 context->tokensLeft++;
1728 #endif
1730 MACHSTATE2(3,"Packet send complete node %d tokensLeft %d",packet->destNode->infiData->nodeNo,context->tokensLeft);
1731 if(packet->ogm != NULL){
1732 packet->ogm->refcount--;
1733 if(packet->ogm->refcount == 0){
1734 GarbageCollectMsg(packet->ogm);
1736 }else{
1737 if(packet->header.code == INFIRDMA_START || packet->header.code == INFIRDMA_ACK || packet->header.code == INFIDUMMYPACKET){
1738 if (packet->buf) CmiFree(packet->buf); /* gzheng */
1742 FreeInfiPacket(packet);
1747 /********************************************************************/
1748 static inline void processRdmaRequest(struct infiRdmaPacket *_rdmaPacket,int fromNodeNo,int isBuffered){
1749 int nodeNo = fromNodeNo;
1750 OtherNode node = &nodes[nodeNo];
1751 struct infiRdmaPacket *rdmaPacket;
1753 getFreeTokens(node->infiData);
1754 #if CMK_IBVERBS_TOKENS_FLOW
1755 // node->infiData->tokensLeft--;
1756 context->tokensLeft--;
1757 #if CMK_IBVERBS_STATS
1758 if(context->tokensLeft < minTokensLeft){
1759 minTokensLeft = context->tokensLeft;
1761 #endif
1762 #endif
1764 struct infiBuffer *buffer = malloc(sizeof(struct infiBuffer));
1765 // CmiAssert(buffer != NULL);
1768 if(isBuffered){
1769 rdmaPacket = _rdmaPacket;
1770 }else{
1771 rdmaPacket = malloc(sizeof(struct infiRdmaPacket));
1772 *rdmaPacket = *_rdmaPacket;
1776 rdmaPacket->fromNodeNo = fromNodeNo;
1777 rdmaPacket->localBuffer = (void *)buffer;
1779 buffer->type = BUFFER_RDMA;
1780 buffer->size = rdmaPacket->remoteSize;
1782 buffer->buf = (char *)CmiAlloc(rdmaPacket->remoteSize);
1783 // CmiAssert(buffer->buf != NULL);
1785 buffer->key = METADATAFIELD(buffer->buf)->key;
1788 MACHSTATE3(3,"received rdma request from node %d for remoteBuffer %p keyPtr %p",nodeNo,rdmaPacket->remoteBuf,rdmaPacket->keyPtr);
1789 MACHSTATE3(3,"Local buffer->buf %p buffer->key %p rdmaPacket %p",buffer->buf,buffer->key,rdmaPacket);
1790 // CmiAssert(buffer->key != NULL);
1793 struct ibv_sge list = {
1794 .addr = (uintptr_t )buffer->buf,
1795 .length = buffer->size,
1796 .lkey = buffer->key->lkey
1799 struct ibv_send_wr *bad_wr;
1800 struct ibv_send_wr wr = {
1801 .wr_id = (uint64_t )rdmaPacket,
1802 .sg_list = &list,
1803 .num_sge = 1,
1804 .opcode = IBV_WR_RDMA_READ,
1805 .send_flags = IBV_SEND_SIGNALED,
1806 .wr.rdma = {
1807 .remote_addr = (uint64_t )rdmaPacket->remoteBuf,
1808 .rkey = rdmaPacket->key.rkey
1811 /** post and rdma_read that is a rdma get*/
1812 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
1813 CmiAbort("ibv_post_send failed");
1819 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket);
1820 static inline void processDirectWC(struct infiRdmaPacket *rdmaPacket);
1822 static inline void processRdmaWC(struct ibv_wc *rdmaWC,const int toBuffer){
1823 //rdma get done
1824 #if CMK_IBVERBS_STATS
1825 double _startRegTime;
1826 #endif
1828 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *) rdmaWC->wr_id;
1829 /* if(rdmaPacket->type == INFI_DIRECT){
1830 processDirectWC(rdmaPacket);
1831 return;
1833 // CmiAssert(rdmaPacket->type == INFI_MESG);
1834 struct infiBuffer *buffer = (struct infiBuffer *)rdmaPacket->localBuffer;
1836 /*TODO: remove this
1837 memcpy(buffer->buf,rdmaInBuf,rdmaWC->byte_len);*/
1839 /* CmiAssert(buffer->type == BUFFER_RDMA);
1840 CmiAssert(rdmaWC->byte_len == buffer->size);*/
1843 int size;
1844 int rank, srcpe, seqno, magic, i;
1845 unsigned int broot;
1846 char *msg = buffer->buf;
1847 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
1848 size = CMI_MSG_SIZE(msg);
1849 /* CmiAssert(size == buffer->size);*/
1850 handoverMessage(buffer->buf,size,rank,broot,toBuffer);
1852 MACHSTATE2(3,"Rdma done for buffer->buf %p buffer->key %p",buffer->buf,buffer->key);
1855 free(buffer);
1857 OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1858 //we are sending this ack as a response to a successful
1859 // rdma_Read.. the token for that rdma_Read needs to be freed
1860 #if CMK_IBVERBS_TOKENS_FLOW
1861 //node->infiData->tokensLeft++;
1862 context->tokensLeft++;
1863 #endif
1865 //send ack to sender if toBuffer is off otherwise buffer it
1866 if(toBuffer){
1867 MACHSTATE1(3,"Buffering Rdma Ack %p",rdmaPacket);
1868 struct infiRdmaPacket *tmp = context->bufferedRdmaAcks;
1869 context->bufferedRdmaAcks = rdmaPacket;
1870 rdmaPacket->next = tmp;
1871 rdmaPacket->prev = NULL;
1872 if(tmp != NULL){
1873 tmp->prev = rdmaPacket;
1875 }else{
1876 EnqueueRdmaAck(rdmaPacket);
1877 free(rdmaPacket);
1881 static inline void EnqueueRdmaAck(struct infiRdmaPacket *rdmaPacket){
1882 infiPacket packet;
1883 OtherNode node=&nodes[rdmaPacket->fromNodeNo];
1886 MallocInfiPacket(packet);
1888 struct infiRdmaPacket *ackPacket = (struct infiRdmaPacket *)CmiAlloc(sizeof(struct infiRdmaPacket));
1889 *ackPacket = *rdmaPacket;
1890 packet->size = sizeof(struct infiRdmaPacket);
1891 packet->buf = (char *)ackPacket;
1892 packet->header.code = INFIRDMA_ACK;
1893 packet->ogm=NULL;
1895 struct ibv_mr *packetKey = METADATAFIELD((void *)ackPacket)->key;
1898 EnqueuePacket(node,packet,sizeof(struct infiRdmaPacket),packetKey);
1903 static inline void processRdmaAck(struct infiRdmaPacket *rdmaPacket){
1904 MACHSTATE2(3,"rdma ack received for remoteBuf %p size %d",rdmaPacket->remoteBuf,rdmaPacket->remoteSize);
1905 rdmaPacket->ogm->refcount--;
1906 GarbageCollectMsg(rdmaPacket->ogm);
1910 /****************************
1911 Deal with all the buffered (delayed) messages
1912 such as processing recvd broadcasts, sending
1913 rdma acks and processing recvd rdma requests
1914 ******************************/
1917 static inline infiBufferedBcastPool createBcastPool(){
1918 int i;
1919 infiBufferedBcastPool ret = malloc(sizeof(struct infiBufferedBcastPoolStruct));
1920 ret->count = 0;
1921 ret->next = ret->prev = NULL;
1922 for(i=0;i<BCASTLIST_SIZE;i++){
1923 ret->bcastList[i].valid = 0;
1925 return ret;
1927 /****
1928 The buffered bcast messages are stored in a doubly linked list of
1929 arrays or blocks.
1930 To keep the average insert cost low, a new block is added
1931 to the top of the list. (resulting in a reverse seq of blocks)
1932 Within a block however bcast are stored in increasing order sequence
1933 *****/
1935 static void insertBufferedBcast(char *msg,int size,int broot,int asm_rank){
1936 if(context->bufferedBcastList == NULL){
1937 context->bufferedBcastList = createBcastPool();
1938 }else{
1939 if(context->bufferedBcastList->count == BCASTLIST_SIZE){
1940 infiBufferedBcastPool tmp;
1941 tmp = createBcastPool();
1942 context->bufferedBcastList->prev = tmp;
1943 tmp->next = context->bufferedBcastList;
1944 context->bufferedBcastList = tmp;
1947 context->bufferedBcastList->bcastList[context->bufferedBcastList->count].msg = msg;
1948 context->bufferedBcastList->bcastList[context->bufferedBcastList->count].size = size;
1949 context->bufferedBcastList->bcastList[context->bufferedBcastList->count].broot = broot;
1950 context->bufferedBcastList->bcastList[context->bufferedBcastList->count].asm_rank = asm_rank;
1951 context->bufferedBcastList->bcastList[context->bufferedBcastList->count].valid = 1;
1953 MACHSTATE3(3,"Broadcast msg %p of size %d being buffered at count %d ",msg,size,context->bufferedBcastList->count);
1955 context->bufferedBcastList->count++;
1958 /*********
1959 Go through the blocks of buffered bcast messages. process last block first
1960 processign within a block is in sequence though
1961 *********/
1962 static inline void processBufferedBcast(){
1963 infiBufferedBcastPool start;
1965 if(context->bufferedBcastList == NULL){
1966 return;
1968 start = context->bufferedBcastList;
1969 if(context->insideProcessBufferedBcasts==1){
1970 return;
1972 context->insideProcessBufferedBcasts=1;
1974 while(start->next != NULL){
1975 start = start->next;
1978 while(start != NULL){
1979 int i=0;
1980 infiBufferedBcastPool tmp;
1981 if(start->count != 0){
1982 MACHSTATE2(3,"start %p start->count %d[[[",start,start->count);
1984 for(i=0;i<start->count;i++){
1985 if(start->bcastList[i].valid == 0){
1986 continue;
1988 start->bcastList[i].valid=0;
1989 MACHSTATE3(3,"Buffered broadcast msg %p of size %d being processed at %d",start->bcastList[i].msg,start->bcastList[i].size,i);
1991 handleOneRecvedMsg(start->bcastList[i].size, start->bcastList[i].msg);
1992 start->bcastList[i].msg = NULL;
1995 if(start->count != 0){
1996 MACHSTATE2(3,"]]] start %p start->count %d",start,start->count);
1999 tmp = start;
2000 start = start->prev;
2001 free(tmp);
2002 if(start != NULL){
2003 //not the first one
2004 start->next = NULL;
2008 context->bufferedBcastList = NULL;
2009 /* context->bufferedBcastList->prev = NULL;
2010 context->bufferedBcastList->count =0; */
2011 context->insideProcessBufferedBcasts=0;
2012 MACHSTATE(2,"processBufferedBcast done ");
2016 static inline void processBufferedRdmaAcks(){
2017 struct infiRdmaPacket *start = context->bufferedRdmaAcks;
2018 if(start == NULL){
2019 return;
2021 while(start->next != NULL){
2022 start = start->next;
2024 while(start != NULL){
2025 struct infiRdmaPacket *rdmaPacket=start;
2026 MACHSTATE1(3,"Processing Buffered Rdma Ack %p",rdmaPacket);
2027 EnqueueRdmaAck(rdmaPacket);
2028 start = start->prev;
2029 free(rdmaPacket);
2031 context->bufferedRdmaAcks=NULL;
2036 static inline void processBufferedRdmaRequests(){
2037 struct infiRdmaPacket *start = context->bufferedRdmaRequests;
2038 if(start == NULL){
2039 return;
2043 while(start->next != NULL){
2044 start = start->next;
2046 while(start != NULL){
2047 struct infiRdmaPacket *rdmaPacket=start;
2048 MACHSTATE1(3,"Processing Buffered Rdma Request %p",rdmaPacket);
2049 processRdmaRequest(rdmaPacket,rdmaPacket->fromNodeNo,1);
2050 start = start->prev;
2053 context->bufferedRdmaRequests=NULL;
/**
 * Process everything that was deferred: buffered broadcasts first, then
 * buffered rdma acks, then buffered rdma requests. With CMK_IBVERBS_STATS
 * the call count and the time spent here are accumulated.
 */
static inline void processAllBufferedMsgs(){
#if CMK_IBVERBS_STATS
	double _startTime = CmiWallTimer();
	processBufferedCount++;
#endif

	processBufferedBcast();
	processBufferedRdmaAcks();
	processBufferedRdmaRequests();

#if CMK_IBVERBS_STATS
	processBufferedTime += (CmiWallTimer()-_startTime);
#endif
}
2075 /*************************
2076 Increase tokens when short of them
2077 **********/
2078 static inline void increaseTokens(OtherNode node){
2079 int err;
2080 int increase = node->infiData->totalTokens*INCTOKENS_INCREASE;
2081 if(node->infiData->totalTokens + increase > maxTokens){
2082 increase = maxTokens-node->infiData->totalTokens;
2084 node->infiData->totalTokens += increase;
2085 node->infiData->tokensLeft += increase;
2086 MACHSTATE3(3,"Increasing tokens for node %d to %d by %d",node->infiData->nodeNo,node->infiData->totalTokens,increase);
2087 //increase the size of the sendCq
2088 int currentCqSize = context->sendCqSize;
2089 if(ibv_resize_cq(context->sendCq,currentCqSize+increase)){
2090 fprintf(stderr,"[%d] failed to increase cq by %d from %d totalTokens %d \n",CmiMyNodeGlobal() ,increase,currentCqSize, node->infiData->totalTokens);
2091 CmiAbort("ibv_resize_cq failed");
2093 context->sendCqSize+= increase;
2096 static void increasePostedRecvs(int nodeNo){
2097 OtherNode node = &nodes[nodeNo];
2098 int tokenIncrease = node->infiData->postedRecvs*INCTOKENS_INCREASE;
2099 int recvIncrease = tokenIncrease;
2100 if(tokenIncrease+node->infiData->postedRecvs > maxTokens){
2101 tokenIncrease = maxTokens - node->infiData->postedRecvs;
2103 if(tokenIncrease+context->srqSize > maxRecvBuffers){
2104 recvIncrease = maxRecvBuffers-context->srqSize;
2106 node->infiData->postedRecvs+= recvIncrease;
2107 context->srqSize += recvIncrease;
2108 MACHSTATE3(3,"Increase tokens by %d to %d for node %d ",tokenIncrease,node->infiData->postedRecvs,nodeNo);
2109 //increase the size of the recvCq
2110 int currentCqSize = context->recvCqSize;
2111 if(ibv_resize_cq(context->recvCq,currentCqSize+tokenIncrease)){
2112 CmiAbort("ibv_resize_cq failed");
2114 context->recvCqSize += tokenIncrease;
2115 if(recvIncrease > 0){
2116 //create another bufferPool and attach it to the top of the current one
2117 struct infiBufferPool *newPool = allocateInfiBufferPool(recvIncrease,packetSize);
2118 newPool->next = context->recvBufferPool;
2119 context->recvBufferPool = newPool;
2120 postInitialRecvs(newPool,recvIncrease,packetSize);
/*********************************************
	Memory management routines for RDMA
************************************************/

/**
	There are INFINUMPOOLS pools of memory.
	The first pool holds buffers of size firstBinSize.
	The ith pool holds buffers of size firstBinSize*2^i.
*/
2139 static void initInfiCmiChunkPools(){
2140 int i,j;
2141 int size = firstBinSize;
2142 int nodeSize;
2144 #if THREAD_MULTI_POOL
2145 nodeSize = CmiMyNodeSize() + 1;
2146 infiCmiChunkPools = malloc(sizeof(infiCmiChunkPool *) * nodeSize);
2147 for(i = 0; i < nodeSize; i++){
2148 infiCmiChunkPools[i] = malloc(sizeof(infiCmiChunkPool) * INFINUMPOOLS);
2150 for(j = 0; j < nodeSize; j++){
2151 size = firstBinSize;
2152 for(i=0;i<INFINUMPOOLS;i++){
2153 infiCmiChunkPools[j][i].size = size;
2154 infiCmiChunkPools[j][i].startBuf = NULL;
2155 infiCmiChunkPools[j][i].count = 0;
2156 size *= 2;
2160 // creating the n^2 system of queues
2161 queuePool = malloc(sizeof(PCQueue *) * nodeSize);
2162 for(i = 0; i < nodeSize; i++){
2163 queuePool[i] = malloc(sizeof(PCQueue) * nodeSize);
2165 for(i = 0; i < nodeSize; i++)
2166 for(j = 0; j < nodeSize; j++)
2167 queuePool[i][j] = PCQueueCreate();
2169 #else
2171 size = firstBinSize;
2172 for(i=0;i<INFINUMPOOLS;i++){
2173 infiCmiChunkPools[i].size = size;
2174 infiCmiChunkPools[i].startBuf = NULL;
2175 infiCmiChunkPools[i].count = 0;
2176 size *= 2;
2178 #endif
2182 /***********
2183 Register memory for a part of a received multisend message
2184 *************/
2185 infiCmiChunkMetaData *registerMultiSendMesg(char *msg,int size){
2186 infiCmiChunkMetaData *metaData = malloc(sizeof(infiCmiChunkMetaData));
2187 char *res=msg-sizeof(infiCmiChunkHeader);
2188 metaData->key = ibv_reg_mr(context->pd,res,(size+sizeof(infiCmiChunkHeader)),IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2189 #if CMK_IBVERBS_STATS
2190 numCurReg++;
2191 numReg++;
2192 numMultiSend++;
2193 #endif
2194 CmiAssert(metaData->key!=NULL);
2195 metaData->owner = NULL;
2196 metaData->poolIdx = INFIMULTIPOOL;
2198 return metaData;
2202 #if THREAD_MULTI_POOL
2204 // Fills up the buffer pools for every thread in the node
2205 static inline void fillBufferPools(){
2206 int nodeSize, poolIdx, thread;
2207 infiCmiChunkMetaData *metaData;
2208 infiCmiChunkHeader *hdr;
2209 int allocSize;
2210 int count=1;
2211 int i;
2212 struct ibv_mr *key;
2213 void *res;
2215 // initializing values
2216 nodeSize = CmiMyNodeSize() + 1;
2218 // iterating over all threads and all pools
2219 for(thread = 0; thread < nodeSize; thread++){
2220 for(poolIdx = 0; poolIdx < INFINUMPOOLS; poolIdx++){
2221 allocSize = infiCmiChunkPools[thread][poolIdx].size;
2222 if(poolIdx < blockThreshold){
2223 count = blockAllocRatio;
2224 }else{
2225 count = 1;
2227 posix_memalign(&res, ALIGN_BYTES, (allocSize+sizeof(infiCmiChunkHeader))*count);
2228 hdr = res;
2229 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2230 CmiAssert(key != NULL);
2231 #if CMK_IBVERBS_STATS
2232 numCurReg++;
2233 numReg++;
2234 #endif
2235 res += sizeof(infiCmiChunkHeader);
2236 for(i=0;i<count;i++){
2237 metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2238 metaData->key = key;
2239 metaData->owner = hdr;
2240 metaData->poolIdx = poolIdx;
2241 metaData->parentPe = thread; // setting the parent PE
2242 if(i == 0){
2243 metaData->owner->metaData->count = count;
2244 metaData->nextBuf = NULL;
2245 infiCmiChunkPools[thread][poolIdx].startBuf = res - sizeof(infiCmiChunkHeader);
2246 infiCmiChunkPools[thread][poolIdx].count++;
2247 }else{
2248 void *startBuf = res - sizeof(infiCmiChunkHeader);
2249 metaData->nextBuf = infiCmiChunkPools[thread][poolIdx].startBuf;
2250 infiCmiChunkPools[thread][poolIdx].startBuf = startBuf;
2251 infiCmiChunkPools[thread][poolIdx].count++;
2253 if(i != count-1){
2254 res += (allocSize+sizeof(infiCmiChunkHeader));
2261 static inline void *getInfiCmiChunkThread(int dataSize){
2262 //find out to which pool this dataSize belongs to
2263 // poolIdx = floor(log2(dataSize/firstBinSize))+1
2264 int ratio = dataSize/firstBinSize;
2265 int poolIdx=0;
2266 void *res;
2267 int i,j,nodeSize;
2268 void *pointer;
2270 //printf("Hi\n");
2271 MACHSTATE1(2,"Rank=%d",CmiMyRank());
2272 MACHSTATE1(3,"INFI_ALLOC %d",CmiMyRank());
2274 while(ratio > 0){
2275 ratio = ratio >> 1;
2276 poolIdx++;
2278 MACHSTATE1(2,"This is %d",CmiMyRank());
2279 MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2281 // checking whether to analyze the free queues to reuse buffers
2282 nodeSize = CmiMyNodeSize() + 1;
2283 if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL){
2284 MACHSTATE1(3,"Disposing memory %d",CmiMyRank());
2285 for(i = 0; i < nodeSize; i++){
2286 if(!PCQueueEmpty(queuePool[CmiMyRank()][i])){
2287 for(j = 0; j < PCQueueLength(queuePool[CmiMyRank()][i]); j++){
2288 pointer = (void *)PCQueuePop(queuePool[CmiMyRank()][i]);
2289 infi_CmiFreeDirect(pointer);
2295 if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2296 infiCmiChunkMetaData *metaData;
2297 infiCmiChunkHeader *hdr;
2298 int allocSize;
2299 int count=1;
2300 int i;
2301 struct ibv_mr *key;
2302 void *origres;
2305 if(poolIdx < INFINUMPOOLS ){
2306 allocSize = infiCmiChunkPools[CmiMyRank()][poolIdx].size;
2307 }else{
2308 allocSize = dataSize;
2311 if(poolIdx < blockThreshold){
2312 count = blockAllocRatio;
2314 posix_memalign(&res, ALIGN_BYTES, (allocSize+sizeof(infiCmiChunkHeader))*count);
2315 _MEMCHECK(res);
2316 hdr = res;
2318 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2319 if(key == NULL)
2320 CmiAbort("ibv_reg_mr failed to pin memory\n");
2321 #if CMK_IBVERBS_STATS
2322 numCurReg++;
2323 numReg++;
2324 #endif
2326 origres = (res += sizeof(infiCmiChunkHeader));
2328 for(i=0;i<count;i++){
2329 metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2330 _MEMCHECK(metaData);
2331 metaData->key = key;
2332 metaData->owner = hdr;
2333 metaData->poolIdx = poolIdx;
2334 metaData->parentPe = CmiMyRank(); // setting the parent PE
2336 if(i == 0){
2337 metaData->owner->metaData->count = count;
2338 metaData->nextBuf = NULL;
2339 }else{
2340 void *startBuf = res - sizeof(infiCmiChunkHeader);
2341 metaData->nextBuf = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2342 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = startBuf;
2343 infiCmiChunkPools[CmiMyRank()][poolIdx].count++;
2346 if(i != count-1){
2347 res += (allocSize+sizeof(infiCmiChunkHeader));
2352 MACHSTATE3(3,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2354 return origres;
2356 if(poolIdx < INFINUMPOOLS){
2357 infiCmiChunkMetaData *metaData;
2359 res = infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf;
2360 res += sizeof(infiCmiChunkHeader);
2362 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2363 metaData = METADATAFIELD(res);
2365 infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf = metaData->nextBuf;
2366 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[CmiMyRank()][poolIdx].startBuf);
2368 metaData->nextBuf = NULL;
2369 // CmiAssert(metaData->poolIdx == poolIdx);
2371 infiCmiChunkPools[CmiMyRank()][poolIdx].count--;
2372 return res;
2375 CmiAbort("getInfiCmiChunkThread failed");
2379 #else /* not MULTIPOOL case */
2380 static inline void *getInfiCmiChunk(int dataSize){
2381 //find out to which pool this dataSize belongs to
2382 // poolIdx = floor(log2(dataSize/firstBinSize))+1
2383 int ratio = dataSize/firstBinSize;
2384 int poolIdx=0;
2385 char *res;
2386 #if CMK_IBVERBS_STATS
2387 if(numAlloc>10000 && numAlloc%1000==0)
2389 printf("[%d] numReg %d numUnReg %d numCurReg %d numAlloc %d numFree %d msgCount %d pktCount %d packetSize %d total Time %.6lf s processBufferedCount %d processBufferedTime %.6lf s maxTokens %d tokensLeft %d \n",CmiMyNodeGlobal(),numReg, numUnReg, numCurReg, numAlloc, numFree, msgCount,pktCount,packetSize,CmiTimer(),processBufferedCount,processBufferedTime,maxTokens,context->tokensLeft);
2390 /* printf("[%d] numMultiSendUnreg %d numMultiSend %d numMultiSendFree %d\n", CmiMyNodeGlobal(), numMultiSendUnreg, numMultiSend, numMultiSendFree);*/
2392 #endif
2393 while(ratio > 0){
2394 ratio = ratio >> 1;
2395 poolIdx++;
2397 MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
2398 if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
2399 infiCmiChunkMetaData *metaData;
2400 infiCmiChunkHeader *hdr;
2401 int allocSize;
2402 int count=1;
2403 int i;
2404 struct ibv_mr *key;
2405 void *origres;
2408 if(poolIdx < INFINUMPOOLS ){
2409 allocSize = infiCmiChunkPools[poolIdx].size;
2410 }else{
2411 allocSize = dataSize;
2414 if(poolIdx < blockThreshold){
2415 count = blockAllocRatio;
2417 posix_memalign(&res, ALIGN_BYTES, (allocSize+sizeof(infiCmiChunkHeader))*count);
2418 hdr = (infiCmiChunkHeader *)res;
2420 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2421 CmiAssert(key != NULL);
2422 #if CMK_IBVERBS_STATS
2423 numCurReg++;
2424 numReg++;
2425 #endif
2426 origres = (res += sizeof(infiCmiChunkHeader));
2428 for(i=0;i<count;i++){
2429 metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
2430 metaData->key = key;
2431 metaData->owner = hdr;
2432 metaData->poolIdx = poolIdx;
2434 if(i == 0){
2435 metaData->owner->metaData->count = count;
2436 metaData->nextBuf = NULL;
2437 }else{
2438 void *startBuf = res - sizeof(infiCmiChunkHeader);
2439 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2440 infiCmiChunkPools[poolIdx].startBuf = startBuf;
2441 infiCmiChunkPools[poolIdx].count++;
2444 if(i != count-1){
2445 res += (allocSize+sizeof(infiCmiChunkHeader));
2450 MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
2452 return origres;
2454 if(poolIdx < INFINUMPOOLS){
2455 infiCmiChunkMetaData *metaData;
2457 res = infiCmiChunkPools[poolIdx].startBuf;
2458 res += sizeof(infiCmiChunkHeader);
2460 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
2461 metaData = METADATAFIELD(res);
2463 infiCmiChunkPools[poolIdx].startBuf = metaData->nextBuf;
2464 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[poolIdx].startBuf);
2466 metaData->nextBuf = NULL;
2467 // CmiAssert(metaData->poolIdx == poolIdx);
2469 infiCmiChunkPools[poolIdx].count--;
2470 return res;
2473 CmiAbort("getInfiCmiChunk failed");
2477 #endif
2480 void * infi_CmiAlloc(int size){
2481 char *res;
2482 #if CMK_IBVERBS_STATS
2483 numAlloc++;
2484 #endif
2485 if (Cmi_charmrun_fd == -1) {
2486 posix_memalign(&res, ALIGN_BYTES, size + sizeof(void*));
2487 res += sizeof(void*);
2488 return res;
2490 #if THREAD_MULTI_POOL
2491 res = getInfiCmiChunkThread(size-sizeof(CmiChunkHeader));
2492 res -= sizeof(CmiChunkHeader);
2494 return res;
2495 #else
2496 #if CMK_SMP
2497 CmiMemLock();
2498 #endif
2499 /*( if(size-sizeof(CmiChunkHeader) > firstBinSize){*/
2500 MACHSTATE1(1,"infi_CmiAlloc for dataSize %d",size-sizeof(CmiChunkHeader));
2502 res = (char*)getInfiCmiChunk(size-sizeof(CmiChunkHeader));
2503 res -= sizeof(CmiChunkHeader);
2504 #if CMK_SMP
2505 CmiMemUnlock();
2506 #endif
2507 /* }else{
2508 res = malloc(size);
2511 return res;
2512 #endif
2515 #if THREAD_MULTI_POOL
2516 //Note: this function receives a pointer to the data, so that it is not necessary to add any sizeof(CmiChunkHeader) to it.
2517 void infi_CmiFreeDirect(void *ptr){
2518 int size;
2519 int parentPe;
2520 void *freePtr = ptr;
2521 #if CMK_IBVERBS_STATS
2522 numFree++;
2523 #endif
2525 //ptr += sizeof(CmiChunkHeader);
2526 size = SIZEFIELD (ptr);
2527 /* if(size > firstBinSize){*/
2528 infiCmiChunkMetaData *metaData;
2529 int poolIdx;
2530 //there is a infiniband specific header
2531 freePtr = ptr - sizeof(infiCmiChunkHeader);
2532 metaData = METADATAFIELD(ptr);
2533 poolIdx = metaData->poolIdx;
2534 infiCmiChunkPool *pool = infiCmiChunkPools[CmiMyRank()] + poolIdx;
2535 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2536 // CmiAssert(poolIdx >= 0);
2537 if(poolIdx < INFINUMPOOLS && pool->count < INFIMAXPERPOOL &&
2538 pool->count < ((1 << INFINUMPOOLS) >> poolIdx) ){
2539 metaData->nextBuf = pool->startBuf;
2540 pool->startBuf = freePtr;
2541 pool->count++;
2542 MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,pool->startBuf,pool->count);
2543 }else{
2544 MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2545 metaData->owner->metaData->count--;
2546 if(metaData->owner->metaData == metaData){
2547 //I am the owner
2548 if(metaData->owner->metaData->count == 0){
2549 //all the chunks have been freed
2550 int unregstat=ibv_dereg_mr(metaData->key);
2551 #if CMK_IBVERBS_STATS
2552 numUnReg++;
2553 numCurReg--;
2554 #endif
2556 CmiAssert(unregstat==0);
2557 free(freePtr);
2558 free(metaData);
2560 //if I am the owner and all the chunks have not been
2561 // freed dont free my metaData. will need later
2562 }else{
2563 if(metaData->owner->metaData->count == 0){
2564 //need to free the owner's buffer and metadata
2565 freePtr = metaData->owner;
2566 int unregstat=ibv_dereg_mr(metaData->key);
2567 #if CMK_IBVERBS_STATS
2568 numUnReg++;
2569 numCurReg--;
2570 #endif
2572 CmiAssert(unregstat==0);
2573 free(metaData->owner->metaData);
2574 free(freePtr);
2576 free(metaData);
2582 void infi_CmiFree(void *ptr){
2584 int i,j;
2585 int size;
2586 int parentPe;
2587 int nodeSize;
2588 void *pointer;
2589 void *freePtr = ptr;
2590 nodeSize = CmiMyNodeSize() + 1;
2592 MACHSTATE(3,"Freeing");
2594 if (Cmi_charmrun_fd == -1) { char *res = ptr; res -= sizeof(void*); free(res); return; }
2595 ptr += sizeof(CmiChunkHeader);
2596 size = SIZEFIELD (ptr);
2597 /* if(size > firstBinSize){*/
2598 infiCmiChunkMetaData *metaData;
2599 int poolIdx;
2600 //there is a infiniband specific header
2601 freePtr = ptr - sizeof(infiCmiChunkHeader);
2602 metaData = METADATAFIELD(ptr);
2603 poolIdx = metaData->poolIdx;
2605 if(poolIdx == INFIMULTIPOOL){
2606 /** this is a part of a received mult message
2607 it will be freed correctly later
2609 #if CMK_IBVERBS_STATS
2610 numMultiSendFree++;
2611 #endif
2612 return;
2616 // checking if this free operation is my responsibility
2617 parentPe = metaData->parentPe;
2618 if(parentPe != CmiMyRank()){
2619 PCQueuePush(queuePool[parentPe][CmiMyRank()],(char *)ptr);
2620 return;
2624 infi_CmiFreeDirect(ptr);
2628 #else
2629 void infi_CmiFree(void *ptr){
2630 int size;
2631 void *freePtr = ptr;
2632 #if CMK_IBVERBS_STATS
2633 numFree++;
2634 #endif
2636 if (Cmi_charmrun_fd == -1) { char *res = ptr; res -= sizeof(void*); free(res); return; }
2637 #if CMK_SMP
2638 CmiMemLock();
2639 #endif
2640 ptr += sizeof(CmiChunkHeader);
2641 size = SIZEFIELD (ptr);
2642 /* if(size > firstBinSize){*/
2643 infiCmiChunkMetaData *metaData;
2644 int poolIdx;
2645 //there is a infiniband specific header
2646 freePtr = (char*)ptr - sizeof(infiCmiChunkHeader);
2647 metaData = METADATAFIELD(ptr);
2648 poolIdx = metaData->poolIdx;
2649 if(poolIdx == INFIMULTIPOOL){
2650 /** this is a part of a received mult message
2651 it will be freed correctly later
2653 #if CMK_IBVERBS_STATS
2654 numMultiSendFree++;
2655 #endif
2656 return;
2658 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
2659 // CmiAssert(poolIdx >= 0);
2660 if(poolIdx < INFINUMPOOLS &&
2661 infiCmiChunkPools[poolIdx].count <= INFIMAXPERPOOL &&
2662 infiCmiChunkPools[poolIdx].count < ((1 << INFINUMPOOLS) >> poolIdx) ){
2663 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
2664 infiCmiChunkPools[poolIdx].startBuf = freePtr;
2665 infiCmiChunkPools[poolIdx].count++;
2667 MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[poolIdx].startBuf,infiCmiChunkPools[poolIdx].count);
2668 }else{
2669 MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
2670 metaData->owner->metaData->count--;
2671 if(metaData->owner->metaData == metaData){
2672 //I am the owner
2673 if(metaData->owner->metaData->count == 0){
2674 //all the chunks have been freed
2675 int unregstat=ibv_dereg_mr(metaData->key);
2676 #if CMK_IBVERBS_STATS
2677 numUnReg++;
2678 numCurReg--;
2679 #endif
2681 CmiAssert(unregstat==0);
2682 free(freePtr);
2683 free(metaData);
2685 //if I am the owner and all the chunks have not been
2686 // freed dont free my metaData. will need later
2687 }else{
2688 if(metaData->owner->metaData->count == 0){
2689 //need to free the owner's buffer and metadata
2690 freePtr = metaData->owner;
2691 int unregstat=ibv_dereg_mr(metaData->key);
2692 #if CMK_IBVERBS_STATS
2693 numUnReg++;
2694 numCurReg--;
2695 #endif
2697 CmiAssert(unregstat==0);
2698 free(metaData->owner->metaData);
2699 free(freePtr);
2701 free(metaData);
2704 #if CMK_SMP
2705 CmiMemUnlock();
2706 #endif
2707 /* }else{
2708 free(freePtr);
2711 #endif
2713 /*********************************************************************************************
2714 This section is for CmiDirect. This is a variant of the persistent communication in which
2715 the user can transfer data between processors without using Charm++ messages. This lets the user
2716 send and receive data from the middle of his arrays without any copying on either send or receive
2717 side
2718 *********************************************************************************************/
/* Descriptor of a sender-side CmiDirect buffer.  In the visible part of this
 * file it is only referenced from commented-out code. */
2720 struct infiDirectRequestPacket{
2721 int senderProc; /* sending side's identifier */
2722 int handle; /* CmiDirect handle number */
2723 struct ibv_mr senderKey; /* copy of the sender buffer's memory-region descriptor */
2724 void *senderBuf; /* address of the sender's buffer */
2725 int senderBufSize; /* size of the sender's buffer in bytes */
2728 #include "cmidirect.h"
2730 #define MAXHANDLES 512
2732 struct infiDirectHandleStruct;
/* Node of the singly linked polling queue (headDirectPollingQ ..
 * tailDirectPollingQ).  The node is embedded in infiDirectHandle rather than
 * allocated separately (see addHandleToPollingQ). */
2735 typedef struct directPollingQNodeStruct {
2736 struct infiDirectHandleStruct *handle; /* handle this node belongs to */
2737 struct directPollingQNodeStruct *next; /* next queued node, NULL at the tail */
2738 double *lastDouble; /* address of the last double in the handle's buffer; compared against the initial value while polling */
2739 } directPollingQNode;
/* Per-handle CmiDirect state, used for both receive-side handles
 * (CmiDirect_createHandle) and send-side handles
 * (CmiDirect_assocLocalBuffer). */
2741 typedef struct infiDirectHandleStruct{
2742 int id; /* handle number */
2743 void *buf; /* local buffer attached to the handle */
2744 int size; /* size of buf in bytes */
2745 struct ibv_mr *key; /* result of ibv_reg_mr() on buf */
2746 void (*callbackFnPtr)(void *); /* completion callback; NULL on send-side handles */
2747 void *callbackData; /* argument passed to callbackFnPtr */
2748 // struct infiDirectRequestPacket *packet;
2749 struct infiDirectUserHandle userHandle; /* copy of the user-visible handle */
2750 struct infiRdmaPacket *rdmaPacket; /* set by CmiDirect_assocLocalBuffer; used as wr_id in CmiDirect_put */
2751 directPollingQNode pollingQNode; /* embedded polling-queue node */
2752 } infiDirectHandle;
/* One segment of a chained handle table.  Handle n is stored in segment
 * n/MAXHANDLES at slot n%MAXHANDLES (see calcHandleTableIdx). */
2754 typedef struct infiDirectHandleTableStruct{
2755 infiDirectHandle handles[MAXHANDLES]; /* slots of this segment */
2756 struct infiDirectHandleTableStruct *next; /* following segment, or NULL */
2757 } infiDirectHandleTable;
2760 // data structures
/* Head and tail of the polling queue of receive handles; both NULL when the
 * queue is empty. */
2762 directPollingQNode *headDirectPollingQ=NULL,*tailDirectPollingQ=NULL;
/* Per-node chained handle tables, created lazily by createHandleTable():
 * sendHandleTable is indexed by receiver node, recvHandleTable by sender
 * node. */
2764 static infiDirectHandleTable **sendHandleTable=NULL;
2765 static infiDirectHandleTable **recvHandleTable=NULL;
/* Highest receive handle number issued so far per sender node; entries start
 * at -1 (see CmiDirect_createHandle). */
2767 static int *recvHandleCount=NULL;
2769 void addHandleToPollingQ(infiDirectHandle *handle){
2770 // directPollingQNode *newNode = malloc(sizeof(directPollingQNode));
2771 directPollingQNode *newNode = &(handle->pollingQNode);
2772 newNode->handle = handle;
2773 newNode->next = NULL;
2774 if(headDirectPollingQ==NULL){
2775 /*empty pollingQ*/
2776 headDirectPollingQ = newNode;
2777 tailDirectPollingQ = newNode;
2778 }else{
2779 tailDirectPollingQ->next = newNode;
2780 tailDirectPollingQ = newNode;
2784 infiDirectHandle *removeHandleFromPollingQ(){
2785 if(headDirectPollingQ == NULL){
2786 //polling Q is empty
2787 return NULL;
2789 directPollingQNode *retNode = headDirectPollingQ;
2790 if(headDirectPollingQ == tailDirectPollingQ){
2791 //PollingQ has one node
2792 headDirectPollingQ = tailDirectPollingQ = NULL;
2793 }else{
2794 headDirectPollingQ = headDirectPollingQ->next;
2796 infiDirectHandle *retHandle = retNode->handle;
2797 free(retNode);
2798 return retHandle;
2801 static inline infiDirectHandleTable **createHandleTable(){
2802 infiDirectHandleTable **table = malloc(Lrts_numNodes*sizeof(infiDirectHandleTable *));
2803 int i;
2804 for(i=0;i<Lrts_numNodes;i++){
2805 table[i] = NULL;
2807 return table;
2810 static inline void calcHandleTableIdx(int handle,int *tableIdx,int *idx){
2811 *tableIdx = handle/MAXHANDLES;
2812 *idx = handle%MAXHANDLES;
/**
 * Store initialValue into the last sizeof(double) bytes of a buffer.
 *
 * The value is written with memcpy so the store is valid even when
 * recvBuf + recvBufSize - sizeof(double) is not suitably aligned for a
 * double.
 *
 * @param recvBuf      start of the buffer.
 * @param recvBufSize  size of the buffer in bytes; must be at least
 *                     sizeof(double).
 * @param initialValue value to place at the end of the buffer.
 */
static inline void initializeLastDouble(void *recvBuf,int recvBufSize,double initialValue)
{
	/* byte offset of the last double in the buffer */
	int index = recvBufSize - sizeof(double);

	memcpy((char *)recvBuf + index, &initialValue, sizeof initialValue);
}
2825 To be called on the receiver to create a handle and return its number
2827 struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue){
2828 int newHandle;
2829 int tableIdx,idx;
2830 int i;
2831 infiDirectHandleTable *table;
2832 struct infiDirectUserHandle userHandle;
2834 CmiAssert(recvBufSize > sizeof(double));
2836 if(recvHandleTable == NULL){
2837 recvHandleTable = createHandleTable();
2838 recvHandleCount = malloc(sizeof(int)*CmiNumNodesGlobal());
2839 for(i=0;i<CmiNumNodesGlobal();i++){
2840 recvHandleCount[i] = -1;
2843 if(recvHandleTable[senderNode] == NULL){
2844 recvHandleTable[senderNode] = malloc(sizeof(infiDirectHandleTable));
2845 recvHandleTable[senderNode]->next = NULL;
2848 newHandle = ++recvHandleCount[senderNode];
2849 CmiAssert(newHandle >= 0);
2851 calcHandleTableIdx(newHandle,&tableIdx,&idx);
2853 table = recvHandleTable[senderNode];
2854 for(i=0;i<tableIdx;i++){
2855 if(table->next ==NULL){
2856 table->next = malloc(sizeof(infiDirectHandleTable));
2857 table->next->next = NULL;
2859 table = table->next;
2861 table->handles[idx].id = newHandle;
2862 table->handles[idx].buf = recvBuf;
2863 table->handles[idx].size = recvBufSize;
2864 #if CMI_DIRECT_DEBUG
2865 CmiPrintf("[%d] RDMA create addr %p %d sizeof(struct ibv_mr) %d\n",CmiMyNodeGlobal(),table->handles[idx].buf,recvBufSize,sizeof(struct ibv_mr));
2866 #endif
2867 table->handles[idx].callbackFnPtr = callbackFnPtr;
2868 table->handles[idx].callbackData = callbackData;
2869 table->handles[idx].key = ibv_reg_mr(context->pd, recvBuf, recvBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2870 CmiAssert(table->handles[idx].key != NULL);
2871 #if CMK_IBVERBS_STATS
2872 numCurReg++;
2873 numReg++;
2874 #endif
2875 /* table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2876 table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2877 table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);*/
2879 userHandle.handle = newHandle;
2880 userHandle.recverNode = CmiMyNodeGlobal();
2881 userHandle.senderNode = senderNode;
2882 userHandle.recverBuf = recvBuf;
2883 userHandle.recverBufSize = recvBufSize;
2884 memcpy(userHandle.recverKey,table->handles[idx].key,sizeof(struct ibv_mr));
2885 userHandle.initialValue = initialValue;
2887 table->handles[idx].userHandle = userHandle;
2889 initializeLastDouble(recvBuf,recvBufSize,initialValue);
2892 int index = table->handles[idx].size - sizeof(double);
2893 table->handles[idx].pollingQNode.lastDouble = (double *)(((char *)table->handles[idx].buf)+index);
2896 addHandleToPollingQ(&(table->handles[idx]));
2898 // MACHSTATE4(3," Newhandle created %d senderProc %d recvBuf %p recvBufSize %d",newHandle,senderProc,recvBuf,recvBufSize);
2900 return userHandle;
2903 /****
2904 To be called on the sender to attach the sender's buffer to this handle
2905 ******/
2906 void CmiDirect_assocLocalBuffer(struct infiDirectUserHandle *userHandle,void *sendBuf,int sendBufSize){
2907 int tableIdx,idx;
2908 int i;
2909 int handle = userHandle->handle;
2910 int recverNode = userHandle->recverNode;
2912 infiDirectHandleTable *table;
2914 if(sendHandleTable == NULL){
2915 sendHandleTable = createHandleTable();
2917 if(sendHandleTable[recverNode] == NULL){
2918 sendHandleTable[recverNode] = malloc(sizeof(infiDirectHandleTable));
2919 sendHandleTable[recverNode]->next = NULL;
2922 CmiAssert(handle >= 0);
2923 calcHandleTableIdx(handle,&tableIdx,&idx);
2925 table = sendHandleTable[recverNode];
2926 for(i=0;i<tableIdx;i++){
2927 if(table->next ==NULL){
2928 table->next = malloc(sizeof(infiDirectHandleTable));
2929 table->next->next = NULL;
2931 table = table->next;
2934 table->handles[idx].id = handle;
2935 table->handles[idx].buf = sendBuf;
2937 table->handles[idx].size = sendBufSize;
2938 #if CMI_DIRECT_DEBUG
2939 CmiPrintf("[%d] RDMA assoc addr %p %d remote addr %p \n",CmiMyPe(),table->handles[idx].buf,sendBufSize,userHandle->recverBuf);
2940 #endif
2941 table->handles[idx].callbackFnPtr = NULL;
2942 table->handles[idx].callbackData = NULL;
2943 table->handles[idx].key = ibv_reg_mr(context->pd, sendBuf, sendBufSize,IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
2944 CmiAssert(table->handles[idx].key != NULL);
2945 #if CMK_IBVERBS_STATS
2946 numCurReg++;
2947 numReg++;
2948 #endif
2949 table->handles[idx].userHandle = *userHandle;
2950 CmiAssert(sendBufSize == table->handles[idx].userHandle.recverBufSize);
2952 table->handles[idx].rdmaPacket = CmiAlloc(sizeof(struct infiRdmaPacket));
2953 table->handles[idx].rdmaPacket->type = INFI_DIRECT;
2954 table->handles[idx].rdmaPacket->localBuffer = &(table->handles[idx]);
2957 /* table->handles[idx].packet = (struct infiDirectRequestPacket *)CmiAlloc(sizeof(struct infiDirectRequestPacket));
2958 table->handles[idx].packet->senderProc = Lrts_myNode;
2959 table->handles[idx].packet->handle = handle;
2960 table->handles[idx].packet->senderKey = *(table->handles[idx].key);
2961 table->handles[idx].packet->senderBuf = sendBuf;
2962 table->handles[idx].packet->senderBufSize = sendBufSize;*/
2964 MACHSTATE4(3,"idx %d recverProc %d handle %d sendBuf %p",idx,recverNode,handle,sendBuf);
2971 /****
2972 To be called on the sender to do the actual data transfer
2973 ******/
2974 void CmiDirect_put(struct infiDirectUserHandle *userHandle){
2975 int handle = userHandle->handle;
2976 int recverNode = userHandle->recverNode;
2977 if(recverNode == CmiMyNodeGlobal()){
2978 /*when the sender and receiver are on the same
2979 processor, just look up the sender and receiver
2980 buffers and do a memcpy*/
2982 infiDirectHandleTable *senderTable;
2983 infiDirectHandleTable *recverTable;
2985 int tableIdx,idx,i;
2988 /*find entry for this handle in sender table*/
2989 calcHandleTableIdx(handle,&tableIdx,&idx);
2990 CmiAssert(sendHandleTable!= NULL);
2991 senderTable = sendHandleTable[CmiMyNodeGlobal()];
2992 CmiAssert(senderTable != NULL);
2993 for(i=0;i<tableIdx;i++){
2994 senderTable = senderTable->next;
2997 /**find entry for this handle in recver table*/
2998 recverTable = recvHandleTable[recverNode];
2999 CmiAssert(recverTable != NULL);
3000 for(i=0;i< tableIdx;i++){
3001 recverTable = recverTable->next;
3004 CmiAssert(senderTable->handles[idx].size == recverTable->handles[idx].size);
3005 memcpy(recverTable->handles[idx].buf,senderTable->handles[idx].buf,senderTable->handles[idx].size);
3006 #if CMI_DIRECT_DEBUG
3007 CmiPrintf("[%d] RDMA memcpy put addr %p receiver %p, size %d\n",CmiMyPe(),senderTable->handles[idx].buf,recverTable->handles[idx].buf,senderTable->handles[idx].size);
3008 #endif
3009 // The polling Q should find you and handle the callback and pollingq entry
3010 // (*(recverTable->handles[idx].callbackFnPtr))(recverTable->handles[idx].callbackData);
3013 }else{
3014 infiPacket packet;
3015 int tableIdx,idx;
3016 int i;
3017 OtherNode node;
3018 infiDirectHandleTable *table;
3020 calcHandleTableIdx(handle,&tableIdx,&idx);
3022 table = sendHandleTable[recverNode];
3023 CmiAssert(table != NULL);
3024 for(i=0;i<tableIdx;i++){
3025 table = table->next;
3028 // MACHSTATE2(3,"CmiDirect_put to recverProc %d handle %d",recverProc,handle);
3029 #if CMI_DIRECT_DEBUG
3030 CmiPrintf("[%d] RDMA put addr %p\n",CmiMyPe(),table->handles[idx].buf);
3031 #endif
3036 OtherNode node = &nodes[table->handles[idx].userHandle.recverNode];
3037 struct ibv_sge list = {
3038 .addr = (uintptr_t )table->handles[idx].buf,
3039 .length = table->handles[idx].size,
3040 .lkey = table->handles[idx].key->lkey
3043 struct ibv_mr *remoteKey = (struct ibv_mr *)table->handles[idx].userHandle.recverKey;
3045 struct ibv_send_wr *bad_wr;
3046 struct ibv_send_wr wr = {
3047 .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
3048 .sg_list = &list,
3049 .num_sge = 1,
3050 .opcode = IBV_WR_RDMA_WRITE,
3051 .send_flags = IBV_SEND_SIGNALED,
3053 .wr.rdma = {
3054 .remote_addr = (uint64_t )table->handles[idx].userHandle.recverBuf,
3055 .rkey = remoteKey->rkey
3058 /** post and rdma_read that is a rdma get*/
3059 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
3060 CmiAbort("ibv_post_send failed");
3064 /* MallocInfiPacket (packet);
3066 packet->size = sizeof(struct infiDirectRequestPacket);
3067 packet->buf = (char *)(table->handles[idx].packet);
3068 struct ibv_mr *packetKey = METADATAFIELD((void *)table->handles[idx].packet)->key;
3069 EnqueuePacket(node,packet,sizeof(struct infiDirectRequestPacket),packetKey);
3075 /**** need not be called the first time *********/
3076 void CmiDirect_readyMark(struct infiDirectUserHandle *userHandle){
3077 initializeLastDouble(userHandle->recverBuf,userHandle->recverBufSize,userHandle->initialValue);
3080 /**** need not be called the first time *********/
3081 void CmiDirect_readyPollQ(struct infiDirectUserHandle *userHandle){
3082 int handle = userHandle->handle;
3083 int tableIdx,idx,i;
3084 infiDirectHandleTable *table;
3085 calcHandleTableIdx(handle,&tableIdx,&idx);
3087 table = recvHandleTable[userHandle->senderNode];
3088 CmiAssert(table != NULL);
3089 for(i=0;i<tableIdx;i++){
3090 table = table->next;
3092 #if CMI_DIRECT_DEBUG
3093 CmiPrintf("[%d] CmiDirect_ready receiver %p\n",CmiMyNodeGlobal(),userHandle->recverBuf);
3094 #endif
3095 addHandleToPollingQ(&(table->handles[idx]));
/**
 * Make a receive handle ready for the next transfer: re-arm the sentinel in
 * the buffer, then put the handle back on the polling queue.  Need not be
 * called the first time.
 *
 * @param userHandle receive-side handle.
 */
void CmiDirect_ready(struct infiDirectUserHandle *userHandle){
	/* the two steps are exactly what the dedicated entry points perform, in
	 * the same order */
	CmiDirect_readyMark(userHandle);
	CmiDirect_readyPollQ(userHandle);
}
3123 static int receivedDirectMessage(infiDirectHandle *handle){
3124 // int index = handle->size - sizeof(double);
3125 // double *lastDouble = (double *)(((char *)handle->buf)+index);
3126 if(*(handle->pollingQNode.lastDouble) == handle->userHandle.initialValue){
3127 return 0;
3128 }else{
3129 (*(handle->callbackFnPtr))(handle->callbackData);
3130 return 1;
3136 static void pollCmiDirectQ(){
3137 directPollingQNode *ptr = headDirectPollingQ, *prevPtr=NULL;
3138 while(ptr != NULL){
3139 if(receivedDirectMessage(ptr->handle)){
3140 #if CMI_DIRECT_DEBUG
3141 CmiPrintf("[%d] polling detected recvd message at buf %p\n",CmiMyNodeGlobal(),ptr->handle->userHandle.recverBuf);
3142 #endif
3143 directPollingQNode *delPtr = ptr;
3144 /** has been received and delete this node***/
3145 if(prevPtr == NULL){
3146 /** first in the pollingQ**/
3147 if(headDirectPollingQ == tailDirectPollingQ){
3148 /**only node in pollingQ****/
3149 headDirectPollingQ = tailDirectPollingQ = NULL;
3150 }else{
3151 headDirectPollingQ = headDirectPollingQ->next;
3153 }else{
3154 if(ptr == tailDirectPollingQ){
3155 /**last node is being deleted**/
3156 tailDirectPollingQ = prevPtr;
3158 prevPtr->next = ptr->next;
3160 ptr = ptr->next;
3161 // free(delPtr);
3162 }else{
3163 prevPtr = ptr;
3164 ptr = ptr->next;
3169 void CmiMachineCleanup(){
3170 MACHSTATE(3, "CmiMachineCleanup")
3171 int num_devices;
3172 struct ibv_device **devList;
3173 ibv_dealloc_pd(context->pd);
3174 ibv_close_device(context->context);
3175 devList = ibv_get_device_list(&num_devices);
3176 ibv_free_device_list(devList);
3177 MACHSTATE(3, "CmiMachineCleanup END")
/* Idle-notification entry points; all three are intentionally empty in this
 * file. */
3180 void LrtsNotifyIdle() {}
3181 void LrtsBeginIdle() {}
3182 void LrtsStillIdle() {}
3184 /*void processDirectRequest(struct infiDirectRequestPacket *directRequestPacket){
3185 int senderProc = directRequestPacket->senderProc;
3186 int handle = directRequestPacket->handle;
3187 int tableIdx,idx,i;
3188 infiDirectHandleTable *table;
3189 OtherNode node = nodes_by_pe[senderProc];
3191 MACHSTATE2(3,"processDirectRequest from proc %d handle %d",senderProc,handle);
3193 calcHandleTableIdx(handle,&tableIdx,&idx);
3195 table = recvHandleTable[senderProc];
3196 CmiAssert(table != NULL);
3197 for(i=0;i<tableIdx;i++){
3198 table = table->next;
3201 CmiAssert(table->handles[idx].size == directRequestPacket->senderBufSize);
3204 struct ibv_sge list = {
3205 .addr = (uintptr_t )table->handles[idx].buf,
3206 .length = table->handles[idx].size,
3207 .lkey = table->handles[idx].key->lkey
3210 struct ibv_send_wr *bad_wr;
3211 struct ibv_send_wr wr = {
3212 .wr_id = (uint64_t)table->handles[idx].rdmaPacket,
3213 .sg_list = &list,
3214 .num_sge = 1,
3215 .opcode = IBV_WR_RDMA_READ,
3216 .send_flags = IBV_SEND_SIGNALED,
3217 .wr.rdma = {
3218 .remote_addr = (uint64_t )directRequestPacket->senderBuf,
3219 .rkey = directRequestPacket->senderKey.rkey
3222 // post and rdma_read that is a rdma get
3223 if(ibv_post_send(node->infiData->qp,&wr,&bad_wr)){
3224 CmiEnforce(0);
3229 };*/
3231 void processDirectWC(struct infiRdmaPacket *rdmaPacket){
3232 MACHSTATE(3,"processDirectWC");
3233 infiDirectHandle *handle = (infiDirectHandle *)rdmaPacket->localBuffer;
3234 (*(handle->callbackFnPtr))(handle->callbackData);
3238 #if 0
3240 // use the common one
3242 static void sendBarrierMessage(int pe)
3244 /* we will only need one packet */
3245 int size=32;
3246 OtherNode node = nodes + pe;
3247 infiPacket packet;
3248 MallocInfiPacket(packet);
3249 packet->size = size;
3250 packet->buf = CmiAlloc(size);
3251 packet->header.code = INFIBARRIERPACKET;
3252 struct ibv_mr *key = METADATAFIELD(packet->buf)->key;
3253 MACHSTATE2(3,"Barrier packet to %d size %d",node->infiData->nodeNo,size);
3254 /* pollSendCq(0);*/
3255 EnqueuePacket(node,packet,size,key);
3258 static void recvBarrierMessage()
3260 int i;
3261 int ne;
3262 /* struct ibv_wc wc[WC_LIST_SIZE];*/
3263 struct ibv_wc wc[1];
3264 struct ibv_wc *recvWC;
3265 /* block on the recvq, this is lazy and evil in the general case because we abuse buffers but should be ok for startup barriers */
3266 int toBuffer=1; // buffer without processing recvd messages
3267 int barrierReached=0;
3268 struct infiBuffer *buffer = NULL;
3269 struct infiPacketHeader *header = NULL;
3270 int nodeNo=-1;
3271 int len=-1;
3272 while(!barrierReached)
3274 /* gengbin's semantic will implode if more than one q is polled at a time */
3275 ne = ibv_poll_cq(context->recvCq,1,&wc[0]);
3276 // CmiAssert(ne >=0);
3277 if(ne != 0){
3278 MACHSTATE1(3,"recvBarrier ne %d",ne);
3280 pollSendCq(1);
3281 for(i=0;i<ne;i++){
3282 if(wc[i].status != IBV_WC_SUCCESS){
3283 CmiEnforce(0);
3285 switch(wc[i].opcode){
3286 case IBV_WC_RECV: /* we have something to consider*/
3287 recvWC=&wc[i];
3288 buffer = (struct infiBuffer *) recvWC->wr_id;
3289 header = (struct infiPacketHeader *)buffer->buf;
3290 nodeNo = header->nodeNo;
3291 len = recvWC->byte_len-sizeof(struct infiPacketHeader);
3293 if(header->code & INFIPACKETCODE_DATA){
3294 processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
3296 if(header->code & INFIDUMMYPACKET){
3297 MACHSTATE(3,"Dummy packet");
3299 if(header->code & INFIBARRIERPACKET){
3300 MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo,len);
3301 // now we are done
3302 barrierReached=1;
3303 /* semantically questionable */
3304 //processAllBufferedMsgs();
3305 //return;
3307 if(rdma && header->code & INFIRDMA_START){
3308 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader));
3309 // if(toBuffer){
3310 //TODO: make a function of this and use for both acks and requests
3311 struct infiRdmaPacket *copyPacket = malloc(sizeof(struct infiRdmaPacket));
3312 struct infiRdmaPacket *tmp=context->bufferedRdmaRequests;
3313 *copyPacket = *rdmaPacket;
3314 copyPacket->fromNodeNo = nodeNo;
3315 MACHSTATE1(3,"Buffering Rdma Request %p",copyPacket);
3316 context->bufferedRdmaRequests = copyPacket;
3317 copyPacket->next = tmp;
3318 copyPacket->prev = NULL;
3319 if(tmp != NULL){
3320 tmp->prev = copyPacket;
3322 /* }else{
3323 processRdmaRequest(rdmaPacket,nodeNo,0);
3326 if(rdma && header->code & INFIRDMA_ACK){
3327 struct infiRdmaPacket *rdmaPacket = (struct infiRdmaPacket *)(buffer->buf+sizeof(struct infiPacketHeader)) ;
3328 processRdmaAck(rdmaPacket);
3331 struct ibv_sge list = {
3332 .addr = (uintptr_t) buffer->buf,
3333 .length = buffer->size,
3334 .lkey = buffer->key->lkey
3337 struct ibv_recv_wr wr = {
3338 .wr_id = (uint64_t)buffer,
3339 .sg_list = &list,
3340 .num_sge = 1,
3341 .next = NULL
3343 struct ibv_recv_wr *bad_wr;
3345 if(ibv_post_srq_recv(context->srq,&wr,&bad_wr)){
3346 CmiEnforce(0);
3350 break;
3351 default:
3352 CmiAbort("Wrong type of work completion object in recvq");
3353 break;
3357 /* semantically questionable */
3358 // processAllBufferedMsgs();
3362 /* happen at node level */
3363 int CmiBarrier()
3365 int len, size, i;
3366 int status;
3367 int count = 0;
3368 OtherNode node;
3369 int numnodes = CmiNumNodesGlobal();
3370 if (CmiMyRank() == 0) {
3371 /* every one send to pe 0 */
3372 if (CmiMyNodeGlobal() != 0) {
3373 sendBarrierMessage(0);
3375 /* printf("[%d] HERE\n", CmiMyPe()); */
3376 if (CmiMyNodeGlobal() == 0)
3378 for (count = 1; count < numnodes; count ++)
3380 recvBarrierMessage();
3382 /* pe 0 broadcast */
3383 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
3384 int p = i;
3385 if (p > numnodes - 1) break;
3386 /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
3387 sendBarrierMessage(p);
3390 /* non 0 node waiting */
3391 if (CmiMyNodeGlobal() != 0)
3393 recvBarrierMessage();
3394 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
3395 int p = CmiMyNodeGlobal();
3396 p = BROADCAST_SPANNING_FACTOR*p + i;
3397 if (p > numnodes - 1) break;
3398 p = p%numnodes;
3399 /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
3400 sendBarrierMessage(p);
3404 CmiNodeAllBarrier();
3405 processAllBufferedMsgs();
3406 /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
/**
 * One-sided barrier: everyone sends a message to node 0 and goes on; only
 * node 0 waits for the numnodes-1 barrier packets.  All ranks then meet in
 * CmiNodeAllBarrier() and the messages buffered meanwhile are processed.
 *
 * @return 0.
 */
int CmiBarrierZero()
{
  int i;

  if (CmiMyRank() == 0) {
    if (CmiMyNodeGlobal()) {
      sendBarrierMessage(0);
    }
    else {
      for (i=0; i<CmiNumNodesGlobal()-1; i++)
	{
	  recvBarrierMessage();
	}
    }
  }
  CmiNodeAllBarrier();
  processAllBufferedMsgs();
  /* the function is declared int: give callers a defined value */
  return 0;
}
3430 #endif