verbs: verbs seperated from netlrts build
[charm.git] / src / arch / verbs / machine-ibud.c
blobfad8d16ba572f28df00fc3a4a44ecf479b3f7b5e
1 /** @file
2 * ibverbs unreliable datagram implementation of Converse NET version
3 * @ingroup NET
4 * contains only ibverbs specific code for:
5 * - CmiMachineInit()
6 * - CmiCommunicationInit()
7 * - CmiNotifyIdle()
8 * - DeliverViaNetwork()
9 * - CommunicationServer()
11 Eric Shook and Esteban Meneses - Jul 22, 2008
14 /**
15 * @addtogroup NET
16 * @{
19 /** NOTES
20 - Every message sent using the unreliable layer of infiniband must include the GRH (Global Routing Header). The GRH occupies the first 40 bytes of every packet, and the machine layer has no responsibility for it other than reserving that space in the packet.
24 // FIXME: Note: Charm does not guarantee in-order messages - can use for better performance
27 #include <infiniband/verbs.h>
29 #define WC_LIST_SIZE 32
31 #define INFIPACKETCODE_DATA 1
32 #define INFIDUMMYPACKET 64
33 #define INFIBARRIERPACKET 128
/* Metadata pointer stored in the infiCmiChunkHeader that sits immediately
 * before buffer m. The argument is parenthesized so that expression
 * arguments (e.g. base + offset) are cast as a whole. */
#define METADATAFIELD(m) (((infiCmiChunkHeader *)(m))[-1].metaData)
38 enum ibv_mtu mtu = IBV_MTU_2048;
39 static int mtu_size;
40 static int maxrecvbuffers;
41 static int maxtokens;
42 static int firstBinSize;
43 static int blockThreshold;
44 static int blockAllocRatio;
45 static int packetsize;
/* Holder for the verbs device list and the selected device. */
struct ibudstruct {
	struct ibv_device **devlist;
	struct ibv_device *dev;
};

/* Single global instance. */
struct ibudstruct ibud;
/* Header placed in front of every packet's payload. */
struct infiPacketHeader{
	char code;	/* one of the INFI*PACKET* codes, tested with bitwise AND by receivers */
	int nodeNo;	/* read by receivers to identify the sending node */
};
/** Address triple (lid, qpn, psn) of a queue pair used to send
    messages to another node. There is one for each remote node. */
struct infiAddr {
	int lid;
	int qpn;
	int psn;
};
66 typedef struct infiPacketStruct {
67 char *buf;
68 int size;
69 char extra[40]; // FIXME: check this 40 extra stuff
70 struct infiPacketHeader header;
71 struct ibv_mr *keyHeader;
72 struct OtherNodeStruct *destNode;
73 struct infiPacketStruct *next;
74 OutgoingMsg ogm;
75 struct ibv_sge elemList[2];
76 struct ibv_send_wr wr;
77 }* infiPacket;
79 struct infiContext {
80 struct ibv_context *context;
82 fd_set asyncFds;
83 struct timeval tmo;
85 int ibPort;
86 struct ibv_pd *pd;
87 struct ibv_cq *sendCq;
88 struct ibv_cq *recvCq;
89 struct ibv_srq *srq;
90 struct ibv_mr *mr;
91 struct ibv_ah **ah;
93 struct ibv_qp *qp;
95 struct infiAddr localAddr; //store the lid,qpn,msn address of ur qpair until they are sent
97 struct infiBufferPool *recvBufferPool;
99 infiPacket infiPacketFreeList;
101 struct infiPacketHeader header;
102 int sendCqSize,recvCqSize;
104 void *buffer; // Registered memory buffer for msg's
108 static struct infiContext *context;
112 struct infiOtherNodeData{
113 int state;// does it expect a packet with a header (first packet) or one without
114 int totalTokens;
115 int tokensLeft;
116 int nodeNo;
118 int postedRecvs;
119 int broot;//needed to store the root of a multi-packet broadcast sent along a spanning tree or hypercube
120 struct infiAddr qp;
123 enum { INFI_HEADER_DATA=21,INFI_DATA};
125 typedef struct {
126 int sleepMs; /*Milliseconds to sleep while idle*/
127 int nIdles; /*Number of times we've been idle in a row*/
128 CmiState cs; /*Machine state*/
129 } CmiIdleState;
/* Buffer type tag stored in infiBuffer.type. */
#define BUFFER_RECV 1

/* One slot of a registered receive pool. */
struct infiBuffer{
	int type;		/* BUFFER_RECV for receive buffers */
	char *buf;		/* start of this slot inside the pool's big allocation */
	int size;		/* slot size in bytes */
	struct ibv_mr *key;	/* memory region covering the slot */
};
// FIXME: This is defined in converse.h for ibverbs
//struct infiCmiChunkMetaDataStruct;

/* Side record describing one chunk handed out by the pooled allocator. */
typedef struct infiCmiChunkMetaDataStruct {
	struct ibv_mr *key;				/* registration covering the chunk */
	int poolIdx;					/* pool index, or INFIMULTIPOOL */
	void *nextBuf;					/* next free chunk in the pool list */
	struct infiCmiChunkHeaderStruct *owner;		/* header of the first chunk of the malloc'ed block */
	int count;					/* maintained on the owner's record only: live chunks in the block */
} infiCmiChunkMetaData;
/* A set of receive buffers; pools can be chained through next. */
struct infiBufferPool{
	int numBuffers;
	struct infiBuffer *buffers;	/* array of numBuffers entries */
	struct infiBufferPool *next;
};
158 typedef struct infiCmiChunkHeaderStruct{
159 struct infiCmiChunkMetaDataStruct *metaData;
160 CmiChunkHeader chunkHeader;
161 } infiCmiChunkHeader;
163 struct infiCmiChunkMetaDataStruct *registerMultiSendMesg(char *msg,int msgSize);
// FIXME: temp for error reading
/* Human-readable names, indexed by the numeric value of enum ibv_wc_status. */
static const char *const __ibv_wc_status_str[] = {
	"Success",
	"Local Length Error",
	"Local QP Operation Error",
	"Local EE Context Operation Error",
	"Local Protection Error",
	"Work Request Flushed Error",
	"Memory Management Operation Error",
	"Bad Response Error",
	"Local Access Error",
	"Remote Invalid Request Error",
	"Remote Access Error",
	"Remote Operation Error",
	"Transport Retry Counter Exceeded",
	"RNR Retry Counter Exceeded",
	"Local RDD Violation Error",
	"Remote Invalid RD Request",
	"Aborted Error",
	"Invalid EE Context Number",
	"Invalid EE Context State",
	"Fatal Error",
	"Response Timeout Error",
	"General Error"
};
191 const char *ibv_wc_status_str(enum ibv_wc_status status) {
192 if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)
193 status = IBV_WC_GENERAL_ERR;
194 return (__ibv_wc_status_str[status]);
/***** BEGIN MEMORY MANAGEMENT STUFF *****/
/* One size class of the chunk allocator. */
typedef struct {
	int size;	// payload size, without infiCmiChunkHeader
	void *startBuf;	/* head of the free list (points at a chunk header), or NULL */
	int count;	/* number of chunks currently on the free list */
} infiCmiChunkPool;

#define INFIMULTIPOOL -5	/* poolIdx marker tested by infi_CmiFree / infi_unregAndFreeMeta */
#define INFINUMPOOLS 20		/* number of size classes */
#define INFIMAXPERPOOL 100	/* a freed chunk is kept only while the pool count is <= this */

infiCmiChunkPool infiCmiChunkPools[INFINUMPOOLS];
/* Push pkt onto the head of context->infiPacketFreeList after resetting
 * its size to -1 and its ogm to NULL. The argument is parenthesized so
 * expression arguments expand safely. Brace form kept for existing callers. */
#define FreeInfiPacket(pkt){ \
	(pkt)->size = -1;\
	(pkt)->ogm = NULL;\
	(pkt)->next = context->infiPacketFreeList; \
	context->infiPacketFreeList = (pkt); \
}
/* Store a packet in pkt: pop the head of context->infiPacketFreeList, or
 * call newPacket() when the list is empty. The temporary has a
 * macro-private name so a caller variable called `p` is not shadowed. */
#define MallocInfiPacket(pkt) { \
	infiPacket infiMallocPkt_ = context->infiPacketFreeList; \
	if(infiMallocPkt_ == NULL){ infiMallocPkt_ = newPacket();} \
	else{context->infiPacketFreeList = infiMallocPkt_->next; } \
	(pkt) = infiMallocPkt_;\
}
225 void infi_unregAndFreeMeta(void *md) {
226 if(md!=NULL && (((infiCmiChunkMetaData *)md)->poolIdx == INFIMULTIPOOL)) {
227 ibv_dereg_mr(((infiCmiChunkMetaData*)md)->key);
228 free(((infiCmiChunkMetaData *)md));
232 static inline void *getInfiCmiChunk(int dataSize){
233 //find out to which pool this dataSize belongs to
234 // poolIdx = floor(log2(dataSize/firstBinSize))+1
235 int ratio = dataSize/firstBinSize;
236 int poolIdx=0;
237 void *res;
239 while(ratio > 0){
240 ratio = ratio >> 1;
241 poolIdx++;
243 MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize,poolIdx);
244 if((poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].startBuf == NULL) || poolIdx >= INFINUMPOOLS){
245 infiCmiChunkMetaData *metaData;
246 infiCmiChunkHeader *hdr;
247 int allocSize;
248 int count=1;
249 int i;
250 struct ibv_mr *key;
251 void *origres;
254 if(poolIdx < INFINUMPOOLS ){
255 allocSize = infiCmiChunkPools[poolIdx].size;
256 // CmiAssert(allocSize>=dataSize); // FIXME: added this assertion
257 }else{
258 allocSize = dataSize;
261 if(poolIdx < blockThreshold){
262 count = blockAllocRatio;
264 res = malloc((allocSize+sizeof(infiCmiChunkHeader))*count);
265 hdr = res;
267 key = ibv_reg_mr(context->pd,res,(allocSize+sizeof(infiCmiChunkHeader))*count,IBV_ACCESS_LOCAL_WRITE);
268 CmiAssert(key != NULL);
270 origres = (res += sizeof(infiCmiChunkHeader));
272 for(i=0;i<count;i++){
273 metaData = METADATAFIELD(res) = malloc(sizeof(infiCmiChunkMetaData));
274 metaData->key = key;
275 metaData->owner = hdr;
276 metaData->poolIdx = poolIdx;
278 if(i == 0){
279 metaData->owner->metaData->count = count;
280 metaData->nextBuf = NULL;
281 }else{
282 void *startBuf = res - sizeof(infiCmiChunkHeader);
283 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
284 infiCmiChunkPools[poolIdx].startBuf = startBuf;
285 infiCmiChunkPools[poolIdx].count++;
288 if(i != count-1){
289 res += (allocSize+sizeof(infiCmiChunkHeader));
292 MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize,res,metaData->key);
294 return origres;
296 if(poolIdx < INFINUMPOOLS){
297 infiCmiChunkMetaData *metaData;
299 res = infiCmiChunkPools[poolIdx].startBuf;
300 res += sizeof(infiCmiChunkHeader);
302 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx,res);
303 metaData = METADATAFIELD(res);
305 infiCmiChunkPools[poolIdx].startBuf = metaData->nextBuf;
306 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx,infiCmiChunkPools[poolIdx].startBuf);
308 metaData->nextBuf = NULL;
309 // CmiAssert(metaData->poolIdx == poolIdx);
311 infiCmiChunkPools[poolIdx].count--;
312 return res;
315 CmiAssert(0);
318 void * infi_CmiAlloc(int size){
319 void *res;
321 #if CMK_SMP
322 CmiMemLock();
323 #endif
324 MACHSTATE1(1,"infi_CmiAlloc for dataSize %d",size-sizeof(CmiChunkHeader));
326 res = getInfiCmiChunk(size-sizeof(CmiChunkHeader));
327 res -= sizeof(CmiChunkHeader);
328 #if CMK_SMP
329 CmiMemUnlock();
330 #endif
331 return res;
335 void infi_CmiFree(void *ptr){
336 int size;
337 void *freePtr = ptr;
338 infiCmiChunkMetaData *metaData;
339 int poolIdx;
341 #if CMK_SMP
342 CmiMemLock();
343 #endif
344 ptr += sizeof(CmiChunkHeader);
345 size = SIZEFIELD (ptr);
346 //there is a infiniband specific header
347 freePtr = ptr - sizeof(infiCmiChunkHeader);
348 metaData = METADATAFIELD(ptr);
349 poolIdx = metaData->poolIdx;
350 if(poolIdx == INFIMULTIPOOL){
351 /** this is a part of a received mult message
352 it will be freed correctly later **/
353 return;
355 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr,poolIdx);
356 if(poolIdx < INFINUMPOOLS && infiCmiChunkPools[poolIdx].count <= INFIMAXPERPOOL){
357 metaData->nextBuf = infiCmiChunkPools[poolIdx].startBuf;
358 infiCmiChunkPools[poolIdx].startBuf = freePtr;
359 infiCmiChunkPools[poolIdx].count++;
360 MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx,infiCmiChunkPools[poolIdx].startBuf,infiCmiChunkPools[poolIdx].count);
361 }else{
362 MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr,poolIdx);
363 metaData->owner->metaData->count--;
364 if(metaData->owner->metaData == metaData){
365 //I am the owner
366 if(metaData->owner->metaData->count == 0){
367 //all the chunks have been freed
368 ibv_dereg_mr(metaData->key);
369 free(freePtr);
370 free(metaData);
372 //if I am the owner and all the chunks have not been
373 // freed dont free my metaData. will need later
374 }else {
375 if(metaData->owner->metaData->count == 0){
376 //need to free the owner's buffer and metadata
377 freePtr = metaData->owner;
378 ibv_dereg_mr(metaData->key);
379 free(metaData->owner->metaData);
380 free(freePtr);
382 free(metaData);
385 #if CMK_SMP
386 CmiMemUnlock();
387 #endif
391 static void initInfiCmiChunkPools(){
392 int i,j;
393 int size = firstBinSize;
394 int nodeSize;
396 size = firstBinSize;
397 for(i=0;i<INFINUMPOOLS;i++){
398 infiCmiChunkPools[i].size = size;
399 infiCmiChunkPools[i].startBuf = NULL;
400 infiCmiChunkPools[i].count = 0;
401 size *= 2;
408 /***** END MEMORY MANAGEMENT STUFF *****/
412 // Post the buffers as recv work requests
413 void postInitialRecvs(struct infiBufferPool *recvBufferPool,int numRecvs,int sizePerBuffer){
414 int j,err;
415 struct ibv_recv_wr *workRequests = malloc(sizeof(struct ibv_recv_wr)*numRecvs);
416 struct ibv_sge *sgElements = malloc(sizeof(struct ibv_sge)*numRecvs);
417 struct ibv_recv_wr *bad_wr;
419 int startBufferIdx=0;
420 MACHSTATE2(3,"posting %d receives of size %d",numRecvs,sizePerBuffer);
421 for(j=0;j<numRecvs;j++){
422 sgElements[j].addr = (uint64_t) recvBufferPool->buffers[startBufferIdx+j].buf;
423 sgElements[j].length = sizePerBuffer + 40; // we add the 40 bytes of the GRH
424 sgElements[j].lkey = recvBufferPool->buffers[startBufferIdx+j].key->lkey;
425 workRequests[j].wr_id = (uint64_t)&(recvBufferPool->buffers[startBufferIdx+j]);
426 workRequests[j].sg_list = &sgElements[j];
427 workRequests[j].num_sge = 1;
428 if(j != numRecvs-1){
429 workRequests[j].next = &workRequests[j+1];
432 workRequests[numRecvs-1].next = NULL;
433 MACHSTATE(3,"About to call ibv_post_recv");
434 CmiAssert(ibv_post_recv(context->qp,workRequests,&bad_wr)==0);
436 free(workRequests);
437 free(sgElements);
441 struct infiBufferPool * allocateInfiBufferPool(int numRecvs,int sizePerBuffer){
442 int numBuffers;
443 int i;
444 int bigSize;
445 char *bigBuf;
446 struct infiBufferPool *ret;
447 struct ibv_mr *bigKey;
448 int page_size;
450 MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs,sizePerBuffer);
452 page_size = sysconf(_SC_PAGESIZE);
453 ret = malloc(sizeof(struct infiBufferPool));
454 ret->next = NULL;
455 numBuffers=ret->numBuffers = numRecvs;
456 ret->buffers = malloc(sizeof(struct infiBuffer)*numBuffers);
457 bigSize = numBuffers*sizePerBuffer;
458 bigBuf = memalign(page_size,bigSize);
459 bigKey = ibv_reg_mr(context->pd,bigBuf,bigSize,IBV_ACCESS_LOCAL_WRITE);
460 CmiAssert(bigKey != NULL);
462 for(i=0;i<numBuffers;i++){
463 struct infiBuffer *buffer = &(ret->buffers[i]);
464 buffer->type = BUFFER_RECV;
465 buffer->size = sizePerBuffer;
466 buffer->buf = &bigBuf[i*sizePerBuffer];
467 buffer->key = bigKey;
469 if(buffer->key == NULL){
470 MACHSTATE2(3,"i %d buffer->buf %p",i,buffer->buf);
471 CmiAssert(buffer->key != NULL);
474 return ret;
479 void infiPostInitialRecvs(){
480 //create the pool and post the receives
481 int numPosts;
483 // we add 40 to the buffer size to handle administrative information
484 context->recvBufferPool = allocateInfiBufferPool(maxrecvbuffers, packetsize + 40); // we add 40 bytes to hold the GRH of Infiniband
485 postInitialRecvs(context->recvBufferPool,maxrecvbuffers,packetsize);
495 static CmiIdleState *CmiNotifyGetState(void) {
496 return NULL;
499 static void CmiNotifyStillIdle(CmiIdleState *s);
500 static void CmiNotifyBeginIdle(CmiIdleState *s) {
501 CmiNotifyStillIdle(s);
505 static inline void CommunicationServer_lock(int toBuffer);
506 static inline void CommunicationServer_nolock(int toBuffer);
508 static void CmiNotifyStillIdle(CmiIdleState *s) {
509 #if CMK_SMP
510 CommunicationServer_lock(0);
511 #else
512 CommunicationServer_nolock(0);
513 #endif
516 /******************************************************************************
518 * CmiNotifyIdle()-- wait until a packet comes in
520 *****************************************************************************/
/* Public idle hook: delegates to CmiNotifyStillIdle with no state. */
void CmiNotifyIdle(void)
{
	CmiNotifyStillIdle(NULL);
}
526 /****************************************************************************
528 * CheckSocketsReady
530 * Checks both sockets to see which are readable and which are writeable.
531 * We check all these things at the same time since this can be done for
532 * free with ``select.'' The result is stored in global variables, since
533 * this is essentially global state information and several routines need it.
535 ***************************************************************************/
537 int CheckSocketsReady(int withDelayMs)
539 int nreadable,dataWrite=writeableDgrams || writeableAcks;
540 CMK_PIPE_DECL(withDelayMs);
543 #if CMK_USE_KQUEUE && 0
544 // This implementation doesn't yet work, but potentially is much faster
546 /* Only setup the CMK_PIPE structures the first time they are used.
547 This makes the kqueue implementation much faster.
549 static int first = 1;
550 if(first){
551 first = 0;
552 CmiStdoutAdd(CMK_PIPE_SUB);
553 if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); }
554 else return 0; /* If there's no charmrun, none of this matters. */
555 if (dataskt!=-1) {
556 CMK_PIPE_ADDREAD(dataskt);
557 CMK_PIPE_ADDWRITE(dataskt);
561 #else
562 CmiStdoutAdd(CMK_PIPE_SUB);
563 if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); }
564 else return 0; /* If there's no charmrun, none of this matters. */
565 if (dataskt!=-1) {
566 { CMK_PIPE_ADDREAD(dataskt); }
567 if (dataWrite)
568 CMK_PIPE_ADDWRITE(dataskt);
570 #endif
572 nreadable=CMK_PIPE_CALL();
573 ctrlskt_ready_read = 0;
574 dataskt_ready_read = 0;
575 dataskt_ready_write = 0;
577 if (nreadable == 0) {
578 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
579 return nreadable;
581 if (nreadable==-1) {
582 CMK_PIPE_CHECKERR();
583 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
584 return CheckSocketsReady(0);
587 CmiStdoutCheck(CMK_PIPE_SUB);
588 if (Cmi_charmrun_fd!=-1)
589 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
590 if (dataskt!=-1) {
591 dataskt_ready_read = CMK_PIPE_CHECKREAD(dataskt);
592 if (dataWrite)
593 dataskt_ready_write = CMK_PIPE_CHECKWRITE(dataskt);
595 return nreadable;
600 static inline infiPacket newPacket(){
601 infiPacket pkt=(infiPacket )CmiAlloc(sizeof(struct infiPacketStruct));
603 pkt->size = -1;
604 pkt->header = context->header;
605 pkt->next = NULL;
606 pkt->destNode = NULL;
607 pkt->keyHeader = METADATAFIELD(pkt)->key;
608 pkt->ogm=NULL;
609 CmiAssert(pkt->keyHeader!=NULL);
611 pkt->elemList[0].addr = (uintptr_t)&(pkt->header);
612 pkt->elemList[0].length = sizeof(struct infiPacketHeader);
613 pkt->elemList[0].lkey = pkt->keyHeader->lkey;
615 pkt->wr.wr_id = (uint64_t)pkt;
616 pkt->wr.sg_list = &(pkt->elemList[0]);
617 pkt->wr.num_sge = 2; //FIXME: should be 2 here
618 pkt->wr.opcode = IBV_WR_SEND;
619 pkt->wr.send_flags = IBV_SEND_SIGNALED;
620 pkt->wr.next = NULL;
622 return pkt;
625 static void inline EnqueuePacket(OtherNode node,infiPacket packet,int size,struct ibv_mr *dataKey){
627 struct ibv_send_wr wr,*bad_wr=NULL;
628 struct ibv_sge list;
629 void *buffer;
630 struct ibv_mr *mr;
631 int pe=node->infiData->nodeNo;
632 int retval;
634 buffer=malloc(128);
635 mr=ibv_reg_mr(context->pd, buffer, 128, IBV_ACCESS_LOCAL_WRITE);
637 //memset(&list, 0, sizeof(struct ibv_sge));
638 list.addr = (uintptr_t) buffer + 40;
639 list.length = 128;
640 list.lkey = mr->lkey;
642 memset(&wr, 0, sizeof(struct ibv_send_wr));
643 //wr.wr_id = 1234;
644 wr.wr_id = (uint64_t)packet;
645 wr.sg_list = &list;
646 wr.num_sge = 1;
647 wr.opcode = IBV_WR_SEND;
648 //wr.send_flags = IBV_SEND_SIGNALED | IBV_SEND_SOLICITED;
649 wr.send_flags = IBV_SEND_SIGNALED;
650 wr.wr.ud.ah = context->ah[pe];
651 wr.wr.ud.remote_qpn = nodes[pe].infiData->qp.qpn;
652 wr.wr.ud.remote_qkey = 0;
654 MACHSTATE3(3," wr_id=%i qp_num=%i lkey=%p",wr.wr_id,wr.wr.ud.remote_qpn,mr->lkey);
656 if(retval = ibv_post_send(context->qp,&wr,&bad_wr)){
657 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
658 CmiAssert(0);
660 ibv_dereg_mr(mr);
661 free(buffer);
664 int retval;
665 struct ibv_send_wr *bad_wr=NULL;
666 MACHSTATE(2," here");
668 // FIXME: these were originally [1], but I don't know why
669 packet->elemList[1].addr = (uintptr_t)packet->buf; // FIXME: It works if I add 40 here
670 packet->elemList[1].length = size;
671 packet->elemList[1].lkey = dataKey->lkey;
672 MACHSTATE(2," here");
674 packet->destNode = node;
676 MACHSTATE(2," here1");
677 MACHSTATE1(2," here qp=%i",context->qp);
678 MACHSTATE1(2," here wr=%i",&(packet->wr));
679 MACHSTATE1(2," here wr=%i",&bad_wr);
682 if(ibv_post_send(context->qp,&(packet->wr),&bad_wr)){
683 MACHSTATE(2," problem sending");
684 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
685 CmiAssert(0);
687 MACHSTATE(2," here");
688 MACHSTATE2(3,"Packet send size %d node %d ",size,packet->destNode->infiData->nodeNo);
689 MACHSTATE2(2," addr %p lkey %p ",(uintptr_t)packet->buf,dataKey->lkey);
693 static void inline EnqueueDataPacket(OutgoingMsg ogm, char *data, int size, OtherNode node, int rank, int broot) {
694 struct ibv_mr *key;
695 infiPacket packet;
696 MallocInfiPacket(packet);
697 packet->size = size;
698 packet->buf=data;
700 //the nodeNo is added at time of packet allocation
701 packet->header.code = INFIPACKETCODE_DATA;
703 ogm->refcount++;
704 packet->ogm = ogm;
706 key = METADATAFIELD(ogm->data)->key;
707 CmiAssert(key != NULL);
709 EnqueuePacket(node,packet,size,key);
715 /***********************************************************************
716 * DeliverViaNetwork()
718 * This function is responsible for all non-local transmission. This
719 * function takes the outgoing messages, splits it into datagrams and
720 * enqueues them into the Send Queue.
721 ***********************************************************************/
722 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy) {
723 int size; char *data;
724 MACHSTATE(3,"DeliverViaNetwork");
725 size=ogm->size;
726 data=ogm->data;
727 DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, 1, broot); // May not be needed
728 CmiMsgHeaderSetLength(data,size);
729 while(size>Cmi_dgram_max_data) {
730 EnqueueDataPacket(ogm, data, Cmi_dgram_max_data, node, rank, broot);
731 size -= Cmi_dgram_max_data;
732 data += Cmi_dgram_max_data;
735 if(size>0)
736 EnqueueDataPacket(ogm, data, size, node, rank, broot);
741 static void ServiceCharmrun_nolock() {
742 int again = 1;
743 MACHSTATE(2,"ServiceCharmrun_nolock begin {")
744 while (again) {
745 again = 0;
746 CheckSocketsReady(0);
747 if (ctrlskt_ready_read) { // FIXME: this is set in another call
748 ctrl_getone();
749 again=1;
751 if (CmiStdoutNeedsService())
752 CmiStdoutService();
754 MACHSTATE(2,"} ServiceCharmrun_nolock end")
758 void static inline handoverMessage(char *newmsg,int total_size,int rank,int broot,int toBuffer){
759 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
760 if (rank == DGRAM_BROADCAST
761 #if CMK_NODE_QUEUE_AVAILABLE
762 || rank == DGRAM_NODEBROADCAST
763 #endif
765 if(toBuffer){
766 insertBufferedBcast(CopyMsg(newmsg,total_size),total_size,broot,rank);
767 }else{
768 #if CMK_BROADCAST_SPANNING_TREE
769 SendSpanningChildren(NULL, 0, total_size, newmsg,broot,rank);
770 #else
771 SendHypercube(NULL, 0, total_size, newmsg,broot,rank);
772 #endif
775 #endif
777 switch (rank) {
778 case DGRAM_BROADCAST: {
779 int i;
780 for (i=1; i<_Cmi_mynodesize; i++){
781 CmiPushPE(i, CopyMsg(newmsg, total_size));
783 CmiPushPE(0, newmsg);
784 break;
786 #if CMK_NODE_QUEUE_AVAILABLE
787 case DGRAM_NODEBROADCAST:
788 case DGRAM_NODEMESSAGE: {
789 CmiPushNode(newmsg);
790 break;
792 #endif
793 default: {
794 CmiPushPE(rank, newmsg);
796 } /* end of switch */
797 // if(!toBuffer){
798 // processAllBufferedMsgs();
799 // }
804 static inline void processMessage(int nodeNo,int len,char *msg,const int toBuffer){
805 char *newmsg;
806 OtherNode node = &nodes[nodeNo];
807 newmsg = node->asm_msg;
809 MACHSTATE2(3,"Processing packet from node %d len %d",nodeNo,len);
811 switch(node->infiData->state){
812 case INFI_HEADER_DATA: {
813 int size;
814 int rank, srcpe, seqno, magic, i;
815 unsigned int broot;
816 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot); //FIXME: what does this do?
817 size = CmiMsgHeaderGetLength(msg);
818 MACHSTATE2(3,"START of a new message from node %d of total size %d",nodeNo,size);
819 newmsg = (char *)CmiAlloc(size); // FIXME: is there a better way than to do an alloc?
820 _MEMCHECK(newmsg);
821 memcpy(newmsg, msg, len);
822 node->asm_rank = rank;
823 node->asm_total = size;
824 node->asm_fill = len;
825 node->asm_msg = newmsg;
826 node->infiData->broot = broot;
827 if(len>size) {
828 //there are more packets following
829 node->infiData->state = INFI_DATA;
830 } else if(len == size){
831 //this is the only packet for this message
832 node->infiData->state = INFI_HEADER_DATA;
833 } else { //len < size
834 CmiPrintf("size: %d, len:%d.\n", size, len);
835 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
838 break;
840 case INFI_DATA: {
841 if(node->asm_fill+len<node->asm_total&&len!=Cmi_dgram_max_data){
842 CmiPrintf("from node %d asm_total: %d, asm_fill: %d, len:%d.\n",node->infiData->nodeNo, node->asm_total, node->asm_fill, len);
843 CmiAbort("packet in the middle does not have expected length");
845 if(node->asm_fill+len > node->asm_total){
846 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
847 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
849 //tODO: remove this
850 memcpy(newmsg + node->asm_fill,msg,len);
851 node->asm_fill += len;
852 if(node->asm_fill == node->asm_total){
853 node->infiData->state = INFI_HEADER_DATA;
854 }else{
855 node->infiData->state = INFI_DATA;
858 break;
861 if(node->infiData->state == INFI_HEADER_DATA){ // then the entire message is ready so hand it over
862 int total_size = node->asm_total;
863 node->asm_msg = NULL;
864 // handoverMessage(newmsg,total_size,node->asm_rank,node->infiData->broot,1); // FIXME: handover the message!
865 MACHSTATE3(3,"Message from node %d of length %d completely received msg %p",nodeNo,total_size,newmsg);
870 void processSendWC(struct ibv_wc *sendWC) {
871 MACHSTATE(3,"processSendWC {");
872 infiPacket packet = (infiPacket )sendWC->wr_id;
873 FreeInfiPacket(packet);
874 MACHSTATE(3,"} processSendWC ");
877 void processRecvWC(struct ibv_wc *recvWC,const int toBuffer) {
878 //ibv_post_recv ...
879 struct infiBuffer *buffer = (struct infiBuffer *) recvWC->wr_id;
880 struct infiPacketHeader *header = (struct infiPacketHeader *)buffer->buf;
881 int nodeNo = header->nodeNo;
883 int len = recvWC->byte_len-sizeof(struct infiPacketHeader);
884 MACHSTATE(3,"processRecvWC {");
885 MACHSTATE2(3,"packet from node %d len %d",nodeNo,len);
887 if(header->code & INFIPACKETCODE_DATA){
888 processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
890 else if(header->code & INFIDUMMYPACKET){
891 MACHSTATE(3,"Dummy packet");
893 else if(header->code & INFIBARRIERPACKET){
894 MACHSTATE(3,"Barrier packet");
895 CmiAbort("Should not receive Barrier packet in normal polling loop. Your Barrier is broken");
899 struct ibv_sge list = {
900 .addr = (uintptr_t) buffer->buf,
901 .length = buffer->size,
902 .lkey = buffer->key->lkey,
905 struct ibv_recv_wr wr = {
906 .wr_id = (uint64_t)buffer,
907 .sg_list = &list,
908 .num_sge = 1,
909 .next = NULL
911 struct ibv_recv_wr *bad_wr;
913 CmiAssert(ibv_post_recv(context->qp,&wr,&bad_wr)==0);
915 MACHSTATE(3,"} processRecvWC ");
919 static inline int pollCq(const int toBuffer,struct ibv_cq *cq) {
920 /* toBuffer ignored for sendCq and is used for recvCq */
921 int i;
922 int ne;
923 struct ibv_wc wc[WC_LIST_SIZE];
925 MACHSTATE1(2,"pollCq %d (((",toBuffer);
926 ne = ibv_poll_cq(cq,WC_LIST_SIZE,&wc[0]);
928 if(ne != 0){
929 MACHSTATE1(3,"pollCq ne %d",ne);
930 if(ne<0)
931 CmiAbort("ibv_poll_cq error");
934 for(i=0;i<ne;i++){
935 // CmiAssert(wc[i].status==IBV_WC_SUCCESS);
936 if(wc[i].status!=IBV_WC_SUCCESS) {
937 MACHSTATE3(3,"wc[%i].status=%i (%s)",i,wc[i].status,ibv_wc_status_str(wc[i].status));
938 MACHSTATE3(3," wr_id=%i qp_num=%i vendor_err=%i",wc[i].wr_id,wc[i].qp_num,wc[i].vendor_err);
939 MACHSTATE1(3," key=%p ",
940 ((struct infiBuffer *)((&wc[i])->wr_id))->key);
941 /* MACHSTATE4(3," lkey=%p buffer=%d length=%d end=%d",
942 ((struct infiBuffer *)((&wc[i])->wr_id))->key->lkey,
943 ((struct infiBuffer *)((&wc[i])->wr_id))->buf,
944 ((struct infiBuffer *)((&wc[i])->wr_id))->size,
945 ((struct infiBuffer *)((&wc[i])->wr_id))->buf+((struct infiBuffer *)((&wc[i])->wr_id))->size);
947 */ CmiAssert(wc[i].status==IBV_WC_SUCCESS);
951 switch(wc[i].opcode){
952 case IBV_WC_SEND: //sending message
953 processSendWC(&wc[i]);
954 break;
955 case IBV_WC_RECV: // recving message
956 processRecvWC(&wc[i],toBuffer);
957 break;
958 default:
959 CmiAbort("Wrong type of work completion object in cq");
960 break;
964 MACHSTATE1(2,"))) pollCq %d",toBuffer);
965 return ne;
/* Run CommunicationServer_nolock(toBuffer) while holding the comm lock.
 * The caller's toBuffer is forwarded; every caller visible in this file
 * passes 0. */
static inline void CommunicationServer_lock(int toBuffer) {
	CmiCommLock();
	CommunicationServer_nolock(toBuffer);
	CmiCommUnlock();
}
975 static inline void CommunicationServer_nolock(int toBuffer) {
977 if(_Cmi_numnodes <= 1){
978 pollCmiDirectQ();
979 return;
982 MACHSTATE(2,"CommServer_nolock{");
984 // pollCmiDirectQ(); // FIXME: not sure what this does...
986 pollCq(toBuffer,context->sendCq);
987 pollCq(toBuffer,context->recvCq);
989 // if(toBuffer == 0)
990 // processAllBufferedMsgs(); // FIXME : I don't think we need buf'ed msgs
992 MACHSTATE(2,"} CommServer_nolock ne");
998 static uint16_t getLocalLid(struct ibv_context *dev_context, int port){
999 struct ibv_port_attr attr;
1000 if (ibv_query_port(dev_context, port, &attr))
1001 return 0;
1003 return attr.lid;
1007 struct infiAddr* initinfiAddr(int node,int lid,int qpn,int psn) {
1008 struct infiAddr *addr=malloc(sizeof(struct infiAddr));
1010 addr->lid=lid;
1011 addr->qpn=qpn;
1012 addr->psn=psn;
1014 return addr;
1017 struct infiOtherNodeData *initinfiData(int node,int lid,int qpn,int psn) {
1018 //struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]){
1019 struct infiOtherNodeData *ret=malloc(sizeof(struct infiOtherNodeData));
1020 //set qp
1021 ret->qp.lid=lid;
1022 ret->qp.qpn=qpn;
1023 ret->qp.psn=psn;
1025 ret->nodeNo = node;
1026 ret->state = INFI_HEADER_DATA;
1028 MACHSTATE4(3,"Storing node[%i] (lid=%i qpn=%i psn=%i)",node,lid,qpn,psn);
1030 // ret->qp = context->qp;
1031 // ret->totalTokens = tokensPerProcessor;
1032 // ret->tokensLeft = tokensPerProcessor;
1033 // ret->postedRecvs = tokensPerProcessor;
1034 return ret;
1038 /***********************************************************************
1039 * CommunicationServer()
1041 * This function does the scheduling of the tasks related to the
1042 * message sends and receives. It is called from the CmiGeneralSend()
1043 * function, and periodically from the CommunicationInterrupt() (in case
1044 * of the single processor version), and from the comm_thread (for the
1045 * SMP version). Based on which of the data/control read/write sockets
1046 * are ready, the corresponding tasks are called
1048 ***********************************************************************/
1049 void CmiHandleImmediate();
1050 static void CommunicationServer(int sleepTime, int where) {
1051 /* 0: from smp thread
1052 1: from interrupt
1053 2: from worker thread
1056 if(where==COMM_SERVER_FROM_INTERRUPT)
1057 return;
1058 #if CMK_SMP
1059 if(where == COMM_SERVER_FROM_WORKER)
1060 return;
1061 if(where == COMM_SERVER_FROM_SMP) {
1062 ServiceCharmrun_nolock();
1064 CommunicationServer_lock(0);
1065 #else
1066 ServiceCharmrun_nolock();
1067 CommunicationServer_nolock(0);
1068 #endif
1077 static void sendBarrierMessage(int pe) {
1078 /* we will only need one packet */
1079 int size=32;
1080 OtherNode node=nodes+pe;
1081 infiPacket packet;
1082 MallocInfiPacket(packet);
1083 packet->size = size;
1084 packet->buf = CmiAlloc(size);
1085 packet->header.code=INFIBARRIERPACKET;
1086 packet->wr.wr.ud.ah=context->ah[pe];
1087 packet->wr.wr.ud.remote_qpn=nodes[pe].infiData->qp.qpn;
1088 packet->wr.wr.ud.remote_qkey = 0x11111111;
1090 MACHSTATE1(3,"HERE -> %d",packet->header.code);
1091 MACHSTATE2(3,"sending to qpn=%i pe=%i",nodes[pe].infiData->qp.qpn,pe);
1092 struct ibv_mr *key=METADATAFIELD(packet->buf)->key;
1093 MACHSTATE3(3,"Barrier packet to %d size %d wr_id %d",node->infiData->nodeNo,size,packet->wr.wr_id);
1094 EnqueuePacket(node,packet,size,key);
1097 // FIXME: haven't looked at yet
1098 static void recvBarrierMessage() {
1099 int i;
1100 int ne;
1101 /* struct ibv_wc wc[WC_LIST_SIZE];*/
1102 struct ibv_wc wc[1];
1103 struct ibv_wc *recvWC;
1104 /* block on the recvq, this is lazy and evil in the general case because we abuse buffers but should be ok for startup barriers */
1105 int toBuffer=1; // buffer without processing recvd messages
1106 int barrierReached=0;
1107 struct infiBuffer *buffer = NULL;
1108 struct infiPacketHeader *header = NULL;
1109 int nodeNo=-1;
1110 int len=-1;
1111 int count=0; // FIXME: remove debug
1112 MACHSTATE(3,"recvBarrierMessage 0"); // FIXME: REMOVE this debug
1113 while(!barrierReached) {
1114 /* gengbin's semantic will implode if more than one q is polled at a time */
1115 pollCq(toBuffer,context->sendCq); // FIXME: just put this in to fix pollSendCq req - not sure if its correct
1116 ne = ibv_poll_cq(context->recvCq,1,&wc[0]);
1117 if(ne!=0){
1118 MACHSTATE1(3,"recvBarrier ne %d",ne);
1119 CmiAssert(ne>0);
1121 for(i=0;i<ne;i++){
1122 if(wc[i].status != IBV_WC_SUCCESS){
1123 MACHSTATE3(3,"wc[%i].status=%i (%s)",i,wc[i].status,ibv_wc_status_str(wc[i].status));
1124 MACHSTATE3(3," wr_id=%i qp_num=%i vendor_err=%i",wc[i].wr_id,wc[i].qp_num,wc[i].vendor_err);
1126 MACHSTATE4(3," lkey=%d buffer=%d length=%d end=%d",
1127 ((struct infiBuffer *)((&wc[i])->wr_id))->key->lkey,
1128 ((struct infiBuffer *)((&wc[i])->wr_id))->buf,
1129 ((struct infiBuffer *)((&wc[i])->wr_id))->size,
1130 ((struct infiBuffer *)((&wc[i])->wr_id))->buf+((struct infiBuffer *)((&wc[i])->wr_id))->size);
1131 MACHSTATE1(3," key=%p ",
1132 ((struct infiBuffer *)((&wc[i])->wr_id))->key);
1135 CmiAbort("wc.status !=IBV_WC_SUCCESS");
1137 switch(wc[i].opcode){
1138 case IBV_WC_RECV: /* we have something to consider*/
1139 MACHSTATE(3," IN HERE !!!!!!!!!!");
1140 recvWC=&wc[i];
1142 buffer = (struct infiBuffer *) recvWC->wr_id;
1143 header = (struct infiPacketHeader *)(buffer->buf + 40); // add 40 bytes to skip the GRH
1145 nodeNo = header->nodeNo;
1146 len = recvWC->byte_len-sizeof(struct infiPacketHeader);
1147 if(header->code & INFIPACKETCODE_DATA){
1148 processMessage(nodeNo,len,(buffer->buf+sizeof(struct infiPacketHeader)),toBuffer);
1149 } else if(header->code & INFIDUMMYPACKET){
1150 MACHSTATE(3,"Dummy packet");
1151 } else if(header->code & INFIBARRIERPACKET){
1152 MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo,len);
1153 barrierReached=1;
1154 }else // FIXME: erase this else clause
1155 MACHSTATE2(3,"Ups... %d %d",header->code,nodeNo);
1157 struct ibv_sge list = {
1158 .addr = (uintptr_t) buffer->buf,
1159 .length = buffer->size,
1160 .lkey = buffer->key->lkey
1163 struct ibv_recv_wr wr = {
1164 .wr_id = (uint64_t)buffer,
1165 .sg_list = &list,
1166 .num_sge = 1,
1167 .next = NULL
1169 struct ibv_recv_wr *bad_wr;
1171 CmiAssert(ibv_post_recv(context->qp,&wr,&bad_wr)==0);
1174 break;
1175 default:
1176 CmiAbort("Wrong type of work completion object in recvq");
1177 break;
1181 /* semantically questionable */
1182 // processAllBufferedMsgs();
1187 // FIXME: haven't looked at yet
1188 /* happen at node level */
1189 int CmiBarrier() {
1190 int len, size, i;
1191 int status;
1192 int count = 0;
1193 OtherNode node;
1194 int numnodes = CmiNumNodes();
1195 MACHSTATE1(3,"Barrier 1 rank=%i",CmiMyRank());
1196 if (CmiMyRank() == 0) { /* every one send to pe 0 */
1197 if (CmiMyNode() != 0) {
1199 MACHSTATE(3,"Barrier sendmsg");
1200 sendBarrierMessage(0);
1201 recvBarrierMessage();
1202 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1203 int p = CmiMyNode();
1204 p = BROADCAST_SPANNING_FACTOR*p + i;
1205 if (p > numnodes - 1) break;
1206 p = p%numnodes;
1207 /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
1208 sendBarrierMessage(p);
1210 } else {
1211 MACHSTATE(3,"Barrier else");
1212 for (count = 1; count < numnodes; count ++) {
1213 recvBarrierMessage();
1215 /* pe 0 broadcast */
1216 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
1217 int p = i;
1218 if (p > numnodes - 1) break;
1219 /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
1220 sendBarrierMessage(p);
1223 MACHSTATE(3,"Barrier 3");
1225 MACHSTATE(3,"Barrier 4");
1226 CmiNodeAllBarrier();
1227 // processAllBufferedMsgs();
1228 /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
1229 MACHSTATE(3,"Barrier e");
/**
 * One-way barrier towards node 0 (FIXME: not fully reviewed yet).
 *
 * Everyone sends a message to pe 0 and goes on: rank 0 of every non-zero node
 * sends a single barrier message to node 0, while rank 0 of node 0 waits for
 * CmiNumNodes()-1 barrier messages.  All ranks then meet in
 * CmiNodeAllBarrier().
 *
 * @return always 0.
 */
int CmiBarrierZero() {
  if (CmiMyRank() == 0) {
    if (CmiMyNode()) {
      sendBarrierMessage(0);
    } else {
      const int expected = CmiNumNodes()-1;
      int received;

      for (received = 0; received < expected; received++) {
        recvBarrierMessage();
      }
    }
  }
  CmiNodeAllBarrier();
  /* the processAllBufferedMsgs() call is currently disabled */
  return 0;
}
1251 void createqp(struct ibv_device *dev){
1253 context->sendCq = ibv_create_cq(context->context,context->sendCqSize,NULL,NULL,0);
1254 CmiAssert(context->sendCq != NULL);
1255 MACHSTATE1(3,"sendCq created %p",context->sendCq);
1257 context->recvCq = ibv_create_cq(context->context,context->recvCqSize,NULL,NULL,0);
1258 CmiAssert(context->recvCq != NULL);
1259 MACHSTATE2(3,"recvCq created %p %d",context->recvCq,context->recvCqSize);
1262 struct ibv_qp_init_attr attr = {
1263 .qp_context = context->context,
1264 .qp_type = IBV_QPT_UD,
1265 .send_cq = context->sendCq,
1266 .recv_cq = context->recvCq,
1267 .srq = NULL,
1268 .sq_sig_all=0,
1269 .cap = {
1270 .max_send_wr = context->sendCqSize, // FIXME: this isn't right - need to make a smaller number
1271 .max_recv_wr = context->recvCqSize, // FIXME: this isn't right - need to make a smaller number
1272 .max_send_sge = 1,
1273 .max_recv_sge = 1,
1276 context->qp = ibv_create_qp(context->pd,&attr);
1277 CmiAssert(context->qp != NULL);
1278 MACHSTATE1(3,"qp created %p",context->qp);
1281 struct ibv_qp_attr attr;
1282 attr.qp_state = IBV_QPS_INIT;
1283 attr.pkey_index = 0;
1284 attr.port_num = context->ibPort;
1285 attr.qkey = 0x11111111;
1286 if(ibv_modify_qp(context->qp, &attr,
1287 IBV_QP_STATE |
1288 IBV_QP_PKEY_INDEX |
1289 IBV_QP_PORT |
1290 IBV_QP_QKEY))
1291 CmiAbort("Could not modify QP to INIT");
1294 struct ibv_qp_attr attr;
1295 attr.qp_state = IBV_QPS_RTR;
1296 if(ibv_modify_qp(context->qp, &attr, IBV_QP_STATE))
1297 CmiAbort("Could not modify QP to RTR");
1300 struct ibv_qp_attr attr;
1301 attr.qp_state = IBV_QPS_RTS;
1302 attr.sq_psn=context->localAddr.psn;
1303 if(ibv_modify_qp(context->qp, &attr, IBV_QP_STATE|IBV_QP_SQ_PSN))
1304 CmiAbort("Could not modify QP to RTS");
1307 context->localAddr.lid=getLocalLid(context->context,context->ibPort);
1308 context->localAddr.qpn = context->qp->qp_num;
1309 context->localAddr.psn = lrand48() & 0xffffff;
1311 MACHSTATE3(4,"qp information (lid=%i qpn=%i psn=%i)",context->localAddr.lid,context->localAddr.qpn,context->localAddr.psn);
1314 void createah() {
1315 int i,numnodes;
1317 numnodes=_Cmi_numnodes;
1318 context->ah=(struct ibv_ah **)malloc(sizeof(struct ibv_ah *)*numnodes);
1320 for(i=0;i<numnodes;i++) {
1321 // if(i!=_Cmi_mynode) {
1323 struct ibv_ah_attr ah_attr = {
1324 .is_global = 0,
1325 .dlid = nodes[i].infiData->qp.lid,
1326 .sl = 0,
1327 .src_path_bits = 0,
1328 .port_num = context->ibPort,
1330 context->ah[i]=ibv_create_ah(context->pd,&ah_attr);
1331 CmiAssert(context->ah[i]!=0);
1332 MACHSTATE2(4,"ah for node %i lid=%i ",i,ah_attr.dlid);
1334 // }
1339 void CmiMachineInit(char **argv)
1341 int i;
1342 int calcmaxsize;
1343 int lid;
1345 MACHSTATE(3,"CmiMachineInit {");
1346 MACHSTATE2(3,"_Cmi_numnodes %d CmiNumNodes() %d",_Cmi_numnodes,CmiNumNodes());
1347 MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
1349 /* copied from ibverbs.c */
1350 firstBinSize = 120;
1351 blockThreshold=8;
1352 blockAllocRatio=16;
1354 mtu_size=1200;
1355 packetsize = mtu_size*4;
1356 Cmi_dgram_max_data=packetsize-sizeof(struct infiPacketHeader);
1357 CmiAssert(Cmi_dgram_max_data>1);
1359 calcmaxsize=8000;
1361 maxrecvbuffers=calcmaxsize;
1362 maxtokens = calcmaxsize;
1364 initInfiCmiChunkPools();
1366 ibud.devlist = ibv_get_device_list(NULL);
1367 CmiAssert(ibud.devlist != NULL);
1369 ibud.dev = *(ibud.devlist);
1370 CmiAssert(ibud.dev != NULL);
1372 MACHSTATE1(3,"device name %s",ibv_get_device_name(ibud.dev));
1374 context = (struct infiContext *)malloc(sizeof(struct infiContext));
1376 MACHSTATE1(3,"context allocated %p",context);
1378 context->sendCqSize = 2; // FIXME: 1?
1379 context->recvCqSize = calcmaxsize+1;
1380 context->ibPort = 1;
1381 context->context = ibv_open_device(ibud.dev); //the context for this infiniband device
1382 CmiAssert(context->context != NULL);
1384 MACHSTATE1(3,"device opened %p",context->context);
1386 context->pd = ibv_alloc_pd(context->context); //protection domain
1387 CmiAssert(context->pd != NULL);
1389 context->header.nodeNo = _Cmi_mynode;
1391 if(_Cmi_numnodes>1) {
1392 createqp(ibud.dev);
1393 //MACHSTATE1(3,"pp post recv=%i",pp_post_recv());
1396 MACHSTATE(3,"} CmiMachineInit");
1399 void CmiCommunicationInit(char **argv) {
1400 MACHSTATE(3,"CmiCommunicationInit {");
1401 if(_Cmi_numnodes>1) {
1402 infiPostInitialRecvs();
1403 createah();
1405 MACHSTATE(3,"} CmiCommunicationInit");
1408 void MachineExit()
1410 ibv_destroy_qp(context->qp);
1411 ibv_dealloc_pd(context->pd);
1412 ibv_close_device(context->context);
1413 ibv_free_device_list(ibud.devlist);