2 * ibverbs unreliable datagram implementation of Converse NET version
4 * contains only ibverbs specific code for:
6 * - CmiCommunicationInit()
8 * - DeliverViaNetwork()
9 * - CommunicationServer()
11 Eric Shook and Esteban Meneses - Jul 22, 2008
20 - Every packet received over InfiniBand's unreliable datagram (UD) transport is preceded by room for the GRH (Global Routing Header): the first 40 bytes of every receive buffer are reserved for it. The machine layer has no responsibility for the GRH other than reserving that space in each receive buffer.
24 // FIXME: Charm does not guarantee in-order message delivery, so this layer could exploit that for better performance
27 #include <infiniband/verbs.h>
// Maximum number of work completions drained by one ibv_poll_cq() call (see pollCq).
29 #define WC_LIST_SIZE 32
// Packet header codes. Receivers test them as bit flags (header->code & ...).
31 #define INFIPACKETCODE_DATA 1
32 #define INFIDUMMYPACKET 64
33 #define INFIBARRIERPACKET 128
// Metadata pointer stored in the infiCmiChunkHeader that sits directly in front
// of the buffer m.
35 #define METADATAFIELD(m) (((infiCmiChunkHeader *)m)[-1].metaData)
// Requested path MTU. NOTE(review): not referenced in the statements here; confirm where it is used.
38 enum ibv_mtu mtu
= IBV_MTU_2048
;
// Number of receive buffers to post (set from calcmaxsize in CmiMachineInit).
40 static int maxrecvbuffers
;
// Chunk size of pool 0. Larger pools presumably double it (confirm in initInfiCmiChunkPools).
42 static int firstBinSize
;
// Pools whose index is below blockThreshold allocate blockAllocRatio chunks per
// registered region (see getInfiCmiChunk).
43 static int blockThreshold
;
44 static int blockAllocRatio
;
// Largest packet (header + data). Receive buffers are packetsize + 40 bytes to
// leave room for the GRH.
45 static int packetsize
;
// Members of struct ibudstruct: the list from ibv_get_device_list() and the
// device that is opened (see CmiMachineInit).
49 struct ibv_device
**devlist
;
50 struct ibv_device
*dev
;
53 struct ibudstruct ibud
;
// Header sent in front of the data of every packet. Receivers read its code and
// nodeNo fields (see processRecvWC).
55 struct infiPacketHeader
{
60 /** Represents a qp used to send messages to another node
61 There is one for each remote node */
// One outgoing packet: header, the memory region covering it, the destination,
// a free-list link and a prebuilt send work request (filled in by newPacket).
66 typedef struct infiPacketStruct
{
69 char extra
[40]; // FIXME: 40 bytes, the size of the UD GRH; confirm this padding is still needed
70 struct infiPacketHeader header
;
71 struct ibv_mr
*keyHeader
;
72 struct OtherNodeStruct
*destNode
;
73 struct infiPacketStruct
*next
;
// elemList[0] describes the header (newPacket), elemList[1] the data (EnqueuePacket).
75 struct ibv_sge elemList
[2];
76 struct ibv_send_wr wr
;
// Per-process InfiniBand state (members of struct infiContext).
80 struct ibv_context
*context
;
// Separate completion queues for send and receive completions.
87 struct ibv_cq
*sendCq
;
88 struct ibv_cq
*recvCq
;
95 struct infiAddr localAddr
; // lid, qpn and psn of our own queue pair, kept until they are sent to the other nodes
97 struct infiBufferPool
*recvBufferPool
;
// Recycled packets (see MallocInfiPacket / FreeInfiPacket).
99 infiPacket infiPacketFreeList
;
// Template header copied into every new packet; nodeNo is set in CmiMachineInit.
101 struct infiPacketHeader header
;
102 int sendCqSize
,recvCqSize
;
104 void *buffer
; // Registered memory buffer for messages
// The single context of this process (allocated in CmiMachineInit).
108 static struct infiContext
*context
;
// Receive-side reassembly state kept for each remote node.
112 struct infiOtherNodeData
{
113 int state
;// INFI_HEADER_DATA: next packet starts a new message; INFI_DATA: next packet continues one
119 int broot
;//needed to store the root of a multi-packet broadcast sent along a spanning tree or hypercube
// Values of infiOtherNodeData.state (see processMessage).
123 enum { INFI_HEADER_DATA
=21,INFI_DATA
};
// Idle-loop bookkeeping (presumably members of CmiIdleState; confirm).
126 int sleepMs
; /*Milliseconds to sleep while idle*/
127 int nIdles
; /*Number of times we've been idle in a row*/
128 CmiState cs
; /*Machine state*/
// infiBuffer.type value for receive buffers (set in allocateInfiBufferPool).
131 #define BUFFER_RECV 1
139 // FIXME: This is defined in converse.h for ibverbs
140 //struct infiCmiChunkMetaDataStruct;
// Bookkeeping for a pooled chunk: registration key, pool index, free-list link
// and the owner of a multi-chunk allocation (fields used in getInfiCmiChunk).
142 typedef struct infiCmiChunkMetaDataStruct
{
146 struct infiCmiChunkHeaderStruct
*owner
;
148 } infiCmiChunkMetaData
;
// Receive buffers carved out of one registered region (see allocateInfiBufferPool).
150 struct infiBufferPool
{
152 struct infiBuffer
*buffers
;
153 struct infiBufferPool
*next
;
// Header in front of every chunk: the metadata pointer, then the regular
// Converse chunk header.
158 typedef struct infiCmiChunkHeaderStruct{
159 struct infiCmiChunkMetaDataStruct *metaData;
160 CmiChunkHeader chunkHeader;
161 } infiCmiChunkHeader;
163 struct infiCmiChunkMetaDataStruct *registerMultiSendMesg(char *msg,int msgSize);
166 // FIXME: temp for error reading
// Text for enum ibv_wc_status values, indexed by status code.
// NOTE(review): identifiers beginning with "__" are reserved for the implementation.
167 static const char *const __ibv_wc_status_str
[] = {
169 "Local Length Error",
170 "Local QP Operation Error",
171 "Local EE Context Operation Error",
172 "Local Protection Error",
173 "Work Request Flushed Error",
174 "Memory Management Operation Error",
175 "Bad Response Error",
176 "Local Access Error",
177 "Remote Invalid Request Error",
178 "Remote Access Error",
179 "Remote Operation Error",
180 "Transport Retry Counter Exceeded",
181 "RNR Retry Counter Exceeded",
182 "Local RDD Violation Error",
183 "Remote Invalid RD Request",
185 "Invalid EE Context Number",
186 "Invalid EE Context State",
188 "Response Timeout Error",
// Returns the text for a work-completion status. Values outside
// [IBV_WC_SUCCESS, IBV_WC_GENERAL_ERR] are reported as IBV_WC_GENERAL_ERR.
// NOTE(review): recent libibverbs/rdma-core already declare ibv_wc_status_str()
// in <infiniband/verbs.h>; confirm this definition does not clash with it.
191 const char *ibv_wc_status_str(enum ibv_wc_status status
) {
192 if (status
< IBV_WC_SUCCESS
|| status
> IBV_WC_GENERAL_ERR
)
193 status
= IBV_WC_GENERAL_ERR
;
194 return (__ibv_wc_status_str
[status
]);
197 /***** BEGIN MEMORY MANAGEMENT STUFF *****/
// Size-class pools of registered chunks used by infi_CmiAlloc / infi_CmiFree.
199 int size
;//without infiCmiChunkHeader
// poolIdx of a chunk that is part of a received multi-packet message rather
// than of a size-class pool (infi_CmiFree leaves these alone).
205 #define INFIMULTIPOOL -5
206 #define INFINUMPOOLS 20
// infi_CmiFree returns a chunk to its pool only while the pool count is at most this.
207 #define INFIMAXPERPOOL 100
209 infiCmiChunkPool infiCmiChunkPools
[INFINUMPOOLS
];
// Packet free list. FreeInfiPacket pushes pkt onto context->infiPacketFreeList;
// MallocInfiPacket pops the head, or calls newPacket() when the list is empty.
211 #define FreeInfiPacket(pkt){ \
214 pkt->next = context->infiPacketFreeList; \
215 context->infiPacketFreeList = pkt; \
218 #define MallocInfiPacket(pkt) { \
219 infiPacket p = context->infiPacketFreeList; \
220 if(p == NULL){ p = newPacket();} \
221 else{context->infiPacketFreeList = p->next; } \
225 void infi_unregAndFreeMeta(void *md
) {
// Releases chunk metadata that belongs to INFIMULTIPOOL: deregisters its memory
// region and frees the metadata. NULL or metadata of any other pool is ignored.
226 if(md
!=NULL
&& (((infiCmiChunkMetaData
*)md
)->poolIdx
== INFIMULTIPOOL
)) {
227 ibv_dereg_mr(((infiCmiChunkMetaData
*)md
)->key
);
228 free(((infiCmiChunkMetaData
*)md
));
// Finds a registered chunk for dataSize bytes of payload.
// The pool index is derived from dataSize/firstBinSize. When that pool is empty,
// or the size is beyond the last pool, a new region is malloc'ed and registered
// with ibv_reg_mr(). Pools below blockThreshold carve blockAllocRatio chunks out
// of one region and push each onto the pool's free list. Otherwise the head of
// the pool's free list is reused. The chunk pointer handed on points just past
// its infiCmiChunkHeader (infi_CmiAlloc relies on that).
232 static inline void *getInfiCmiChunk(int dataSize
){
233 //find out to which pool this dataSize belongs to
234 // poolIdx = floor(log2(dataSize/firstBinSize))+1
235 int ratio
= dataSize
/firstBinSize
;
243 MACHSTATE2(2,"getInfiCmiChunk for size %d in poolIdx %d",dataSize
,poolIdx
);
244 if((poolIdx
< INFINUMPOOLS
&& infiCmiChunkPools
[poolIdx
].startBuf
== NULL
) || poolIdx
>= INFINUMPOOLS
){
// Allocation path: the pool is empty or no pool is large enough.
245 infiCmiChunkMetaData
*metaData
;
246 infiCmiChunkHeader
*hdr
;
254 if(poolIdx
< INFINUMPOOLS
){
255 allocSize
= infiCmiChunkPools
[poolIdx
].size
;
256 // CmiAssert(allocSize>=dataSize); // FIXME: added this assertion
258 allocSize
= dataSize
;
261 if(poolIdx
< blockThreshold
){
262 count
= blockAllocRatio
;
// One allocation and one registration cover all count chunks, each preceded by its header.
264 res
= malloc((allocSize
+sizeof(infiCmiChunkHeader
))*count
);
267 key
= ibv_reg_mr(context
->pd
,res
,(allocSize
+sizeof(infiCmiChunkHeader
))*count
,IBV_ACCESS_LOCAL_WRITE
);
268 CmiAssert(key
!= NULL
);
// res now points just past the first chunk's header.
270 origres
= (res
+= sizeof(infiCmiChunkHeader
));
// Give every chunk its own metadata; chunks are chained onto the pool's free list.
272 for(i
=0;i
<count
;i
++){
273 metaData
= METADATAFIELD(res
) = malloc(sizeof(infiCmiChunkMetaData
));
275 metaData
->owner
= hdr
;
276 metaData
->poolIdx
= poolIdx
;
279 metaData
->owner
->metaData
->count
= count
;
280 metaData
->nextBuf
= NULL
;
282 void *startBuf
= res
- sizeof(infiCmiChunkHeader
);
283 metaData
->nextBuf
= infiCmiChunkPools
[poolIdx
].startBuf
;
284 infiCmiChunkPools
[poolIdx
].startBuf
= startBuf
;
285 infiCmiChunkPools
[poolIdx
].count
++;
289 res
+= (allocSize
+sizeof(infiCmiChunkHeader
));
292 MACHSTATE3(2,"AllocSize %d buf %p key %p",allocSize
,res
,metaData
->key
);
// Reuse path: take the chunk at the head of the pool's free list.
296 if(poolIdx
< INFINUMPOOLS
){
297 infiCmiChunkMetaData
*metaData
;
299 res
= infiCmiChunkPools
[poolIdx
].startBuf
;
300 res
+= sizeof(infiCmiChunkHeader
);
302 MACHSTATE2(2,"Reusing old pool %d buf %p",poolIdx
,res
);
303 metaData
= METADATAFIELD(res
);
305 infiCmiChunkPools
[poolIdx
].startBuf
= metaData
->nextBuf
;
306 MACHSTATE2(1,"Pool %d now has startBuf at %p",poolIdx
,infiCmiChunkPools
[poolIdx
].startBuf
);
308 metaData
->nextBuf
= NULL
;
309 // CmiAssert(metaData->poolIdx == poolIdx);
311 infiCmiChunkPools
[poolIdx
].count
--;
// Allocator for registered memory (presumably the CmiAlloc backend; newPacket
// expects CmiAlloc'ed memory to carry a registration key).
// size includes the CmiChunkHeader. The payload comes from getInfiCmiChunk, and
// the pointer is moved back by sizeof(CmiChunkHeader) to the Converse chunk header.
318 void * infi_CmiAlloc(int size
){
324 MACHSTATE1(1,"infi_CmiAlloc for dataSize %d",size
-sizeof(CmiChunkHeader
));
326 res
= getInfiCmiChunk(size
-sizeof(CmiChunkHeader
));
327 res
-= sizeof(CmiChunkHeader
);
// Counterpart of infi_CmiAlloc; ptr points at the Converse chunk header.
// - INFIMULTIPOOL chunks are left alone here.
// - Other chunks go back onto their pool's free list while the pool holds at
//   most INFIMAXPERPOOL chunks.
// - Otherwise the owner's count of live chunks is decremented. When it reaches
//   zero, the shared region is deregistered and the owner's metadata released.
335 void infi_CmiFree(void *ptr
){
338 infiCmiChunkMetaData
*metaData
;
344 ptr
+= sizeof(CmiChunkHeader
);
345 size
= SIZEFIELD (ptr
);
346 //there is an InfiniBand-specific header in front of the chunk
347 freePtr
= ptr
- sizeof(infiCmiChunkHeader
);
348 metaData
= METADATAFIELD(ptr
);
349 poolIdx
= metaData
->poolIdx
;
350 if(poolIdx
== INFIMULTIPOOL
){
351 /** this is a part of a received mult message
352 it will be freed correctly later **/
355 MACHSTATE2(1,"CmiFree buf %p goes back to pool %d",ptr
,poolIdx
);
356 if(poolIdx
< INFINUMPOOLS
&& infiCmiChunkPools
[poolIdx
].count
<= INFIMAXPERPOOL
){
357 metaData
->nextBuf
= infiCmiChunkPools
[poolIdx
].startBuf
;
358 infiCmiChunkPools
[poolIdx
].startBuf
= freePtr
;
359 infiCmiChunkPools
[poolIdx
].count
++;
360 MACHSTATE3(2,"Pool %d now has startBuf at %p count %d",poolIdx
,infiCmiChunkPools
[poolIdx
].startBuf
,infiCmiChunkPools
[poolIdx
].count
);
362 MACHSTATE2(2,"Freeing up buf %p poolIdx %d",ptr
,poolIdx
);
363 metaData
->owner
->metaData
->count
--;
364 if(metaData
->owner
->metaData
== metaData
){
366 if(metaData
->owner
->metaData
->count
== 0){
367 //all the chunks have been freed
368 ibv_dereg_mr(metaData
->key
);
372 //if I am the owner and not all the chunks have been
373 // freed, don't free my metaData: it will be needed later
375 if(metaData
->owner
->metaData
->count
== 0){
376 //need to free the owner's buffer and metadata
377 freePtr
= metaData
->owner
;
378 ibv_dereg_mr(metaData
->key
);
379 free(metaData
->owner
->metaData
);
// Empties all INFINUMPOOLS chunk pools and records each pool's chunk size,
// starting with firstBinSize for pool 0.
391 static void initInfiCmiChunkPools(){
393 int size
= firstBinSize
;
397 for(i
=0;i
<INFINUMPOOLS
;i
++){
398 infiCmiChunkPools
[i
].size
= size
;
399 infiCmiChunkPools
[i
].startBuf
= NULL
;
400 infiCmiChunkPools
[i
].count
= 0;
408 /***** END MEMORY MANAGEMENT STUFF *****/
412 // Post the buffers as recv work requests
// Builds one receive work request per buffer of recvBufferPool, chains them, and
// posts the chain with a single ibv_post_recv() on the UD queue pair.
// Each scatter element covers sizePerBuffer + 40 bytes: the extra 40 bytes hold
// the GRH, which the HCA writes at the start of every UD receive buffer.
// The post is no longer wrapped in CmiAssert. When Converse error checking is
// off, CmiAssert expands to nothing and the receives would never be posted.
413 void postInitialRecvs(struct infiBufferPool
*recvBufferPool
,int numRecvs
,int sizePerBuffer
){
415 struct ibv_recv_wr
*workRequests
= malloc(sizeof(struct ibv_recv_wr
)*numRecvs
);
416 struct ibv_sge
*sgElements
= malloc(sizeof(struct ibv_sge
)*numRecvs
);
417 struct ibv_recv_wr
*bad_wr
;
419 int startBufferIdx
=0;
420 MACHSTATE2(3,"posting %d receives of size %d",numRecvs
,sizePerBuffer
);
421 for(j
=0;j
<numRecvs
;j
++){
422 sgElements
[j
].addr
= (uint64_t) recvBufferPool
->buffers
[startBufferIdx
+j
].buf
;
423 sgElements
[j
].length
= sizePerBuffer
+ 40; // we add the 40 bytes of the GRH
424 sgElements
[j
].lkey
= recvBufferPool
->buffers
[startBufferIdx
+j
].key
->lkey
;
// wr_id carries the buffer, so completions can find and repost it (processRecvWC).
425 workRequests
[j
].wr_id
= (uint64_t)&(recvBufferPool
->buffers
[startBufferIdx
+j
]);
426 workRequests
[j
].sg_list
= &sgElements
[j
];
427 workRequests
[j
].num_sge
= 1;
429 workRequests
[j
].next
= &workRequests
[j
+1];
432 workRequests
[numRecvs
-1].next
= NULL
;
433 MACHSTATE(3,"About to call ibv_post_recv");
434 if(ibv_post_recv(context
->qp
,workRequests
,&bad_wr
)!=0) CmiAbort("ibv_post_recv failed while posting the initial receive buffers");
// Creates a pool of numRecvs receive buffers of sizePerBuffer bytes each.
// All buffers are slices of one page-aligned region registered once with
// ibv_reg_mr(), so they share the same memory-region key.
// NOTE(review): the malloc()/memalign() results are not checked before use.
441 struct infiBufferPool
* allocateInfiBufferPool(int numRecvs
,int sizePerBuffer
){
446 struct infiBufferPool
*ret
;
447 struct ibv_mr
*bigKey
;
450 MACHSTATE2(3,"allocateInfiBufferPool numRecvs %d sizePerBuffer%d ",numRecvs
,sizePerBuffer
);
452 page_size
= sysconf(_SC_PAGESIZE
);
453 ret
= malloc(sizeof(struct infiBufferPool
));
455 numBuffers
=ret
->numBuffers
= numRecvs
;
456 ret
->buffers
= malloc(sizeof(struct infiBuffer
)*numBuffers
);
457 bigSize
= numBuffers
*sizePerBuffer
;
458 bigBuf
= memalign(page_size
,bigSize
);
459 bigKey
= ibv_reg_mr(context
->pd
,bigBuf
,bigSize
,IBV_ACCESS_LOCAL_WRITE
);
460 CmiAssert(bigKey
!= NULL
);
// Slice the region into consecutive sizePerBuffer-byte receive buffers.
462 for(i
=0;i
<numBuffers
;i
++){
463 struct infiBuffer
*buffer
= &(ret
->buffers
[i
]);
464 buffer
->type
= BUFFER_RECV
;
465 buffer
->size
= sizePerBuffer
;
466 buffer
->buf
= &bigBuf
[i
*sizePerBuffer
];
467 buffer
->key
= bigKey
;
469 if(buffer
->key
== NULL
){
470 MACHSTATE2(3,"i %d buffer->buf %p",i
,buffer
->buf
);
471 CmiAssert(buffer
->key
!= NULL
);
// Creates the receive buffer pool (maxrecvbuffers buffers) and posts all of them.
479 void infiPostInitialRecvs(){
480 //create the pool and post the receives
483 // each buffer is packetsize + 40 bytes so the 40-byte GRH fits in front of the packet
484 context
->recvBufferPool
= allocateInfiBufferPool(maxrecvbuffers
, packetsize
+ 40); // we add 40 bytes to hold the GRH of Infiniband
485 postInitialRecvs(context
->recvBufferPool
,maxrecvbuffers
,packetsize
);
// Scheduler idle hooks. Each idle notification runs one pass of the communication
// server (CommunicationServer_lock or CommunicationServer_nolock; the choice is
// presumably made by a build-time switch, confirm).
495 static CmiIdleState
*CmiNotifyGetState(void) {
499 static void CmiNotifyStillIdle(CmiIdleState
*s
);
500 static void CmiNotifyBeginIdle(CmiIdleState
*s
) {
501 CmiNotifyStillIdle(s
);
505 static inline void CommunicationServer_lock(int toBuffer
);
506 static inline void CommunicationServer_nolock(int toBuffer
);
508 static void CmiNotifyStillIdle(CmiIdleState
*s
) {
510 CommunicationServer_lock(0);
512 CommunicationServer_nolock(0);
516 /******************************************************************************
518 * CmiNotifyIdle()-- wait until a packet comes in
520 *****************************************************************************/
522 void CmiNotifyIdle(void) {
523 CmiNotifyStillIdle(NULL
);
526 /****************************************************************************
530 * Checks both sockets to see which are readable and which are writeable.
531 * We check all these things at the same time since this can be done for
532 * free with ``select.'' The result is stored in global variables, since
533 * this is essentially global state information and several routines need it.
535 ***************************************************************************/
// Results go to ctrlskt_ready_read (charmrun control socket), dataskt_ready_read
// and dataskt_ready_write. The stdout pipe is checked as well. An interrupted
// wait is retried with a zero delay.
537 int CheckSocketsReady(int withDelayMs
)
539 int nreadable
,dataWrite
=writeableDgrams
|| writeableAcks
;
540 CMK_PIPE_DECL(withDelayMs
);
543 #if CMK_USE_KQUEUE && 0
544 // This implementation doesn't yet work, but potentially is much faster
546 /* Only setup the CMK_PIPE structures the first time they are used.
547 This makes the kqueue implementation much faster.
549 static int first
= 1;
552 CmiStdoutAdd(CMK_PIPE_SUB
);
553 if (Cmi_charmrun_fd
!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd
); }
554 else return 0; /* If there's no charmrun, none of this matters. */
556 CMK_PIPE_ADDREAD(dataskt
);
557 CMK_PIPE_ADDWRITE(dataskt
);
562 CmiStdoutAdd(CMK_PIPE_SUB
);
563 if (Cmi_charmrun_fd
!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd
); }
564 else return 0; /* If there's no charmrun, none of this matters. */
566 { CMK_PIPE_ADDREAD(dataskt
); }
568 CMK_PIPE_ADDWRITE(dataskt
);
572 nreadable
=CMK_PIPE_CALL();
573 ctrlskt_ready_read
= 0;
574 dataskt_ready_read
= 0;
575 dataskt_ready_write
= 0;
577 if (nreadable
== 0) {
578 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
583 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
584 return CheckSocketsReady(0);
587 CmiStdoutCheck(CMK_PIPE_SUB
);
588 if (Cmi_charmrun_fd
!=-1)
589 ctrlskt_ready_read
= CMK_PIPE_CHECKREAD(Cmi_charmrun_fd
);
591 dataskt_ready_read
= CMK_PIPE_CHECKREAD(dataskt
);
593 dataskt_ready_write
= CMK_PIPE_CHECKWRITE(dataskt
);
// Allocates a fresh packet with CmiAlloc, so the packet (and its header) lies in
// registered memory whose key is taken from the chunk metadata.
// elemList[0] is set to describe the header. A signaled IBV_WR_SEND request is
// prepared whose wr_id is the packet itself, which processSendWC uses to recycle it.
600 static inline infiPacket
newPacket(){
601 infiPacket pkt
=(infiPacket
)CmiAlloc(sizeof(struct infiPacketStruct
));
604 pkt
->header
= context
->header
;
606 pkt
->destNode
= NULL
;
607 pkt
->keyHeader
= METADATAFIELD(pkt
)->key
;
609 CmiAssert(pkt
->keyHeader
!=NULL
);
611 pkt
->elemList
[0].addr
= (uintptr_t)&(pkt
->header
);
612 pkt
->elemList
[0].length
= sizeof(struct infiPacketHeader
);
613 pkt
->elemList
[0].lkey
= pkt
->keyHeader
->lkey
;
615 pkt
->wr
.wr_id
= (uint64_t)pkt
;
616 pkt
->wr
.sg_list
= &(pkt
->elemList
[0]);
617 pkt
->wr
.num_sge
= 2; // header SGE + data SGE (elemList[1] is filled in by EnqueuePacket)
618 pkt
->wr
.opcode
= IBV_WR_SEND
;
619 pkt
->wr
.send_flags
= IBV_SEND_SIGNALED
;
// Posts one packet to node.
// elemList[1] is pointed at packet->buf (size bytes, registered under dataKey).
// The packet's prebuilt send work request (header in elemList[0], see newPacket)
// is then posted on the UD queue pair.
// The ibv_post_send() result is now stored in retval, so the failure message
// reports the real error code instead of an unrelated value.
// NOTE(review): the UD address handle, remote qpn and qkey are not set in this
// function. sendBarrierMessage sets them before calling; confirm data packets
// get them too.
625 static void inline EnqueuePacket(OtherNode node
,infiPacket packet
,int size
,struct ibv_mr
*dataKey
){
// Alternative send path that builds its own work request and registers a
// separate 128-byte region per send (presumably compiled out; confirm).
627 struct ibv_send_wr wr,*bad_wr=NULL;
631 int pe=node->infiData->nodeNo;
635 mr=ibv_reg_mr(context->pd, buffer, 128, IBV_ACCESS_LOCAL_WRITE);
637 //memset(&list, 0, sizeof(struct ibv_sge));
638 list.addr = (uintptr_t) buffer + 40;
640 list.lkey = mr->lkey;
642 memset(&wr, 0, sizeof(struct ibv_send_wr));
644 wr.wr_id = (uint64_t)packet;
647 wr.opcode = IBV_WR_SEND;
648 //wr.send_flags = IBV_SEND_SIGNALED | IBV_SEND_SOLICITED;
649 wr.send_flags = IBV_SEND_SIGNALED;
650 wr.wr.ud.ah = context->ah[pe];
651 wr.wr.ud.remote_qpn = nodes[pe].infiData->qp.qpn;
652 wr.wr.ud.remote_qkey = 0;
654 MACHSTATE3(3," wr_id=%i qp_num=%i lkey=%p",wr.wr_id,wr.wr.ud.remote_qpn,mr->lkey);
656 if(retval = ibv_post_send(context->qp,&wr,&bad_wr)){
657 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode,node->infiData->nodeNo,retval);
665 struct ibv_send_wr
*bad_wr
=NULL
;
666 MACHSTATE(2," here");
668 // elemList[0] (the header) is prepared by newPacket; elemList[1] carries the data.
669 packet
->elemList
[1].addr
= (uintptr_t)packet
->buf
; // NOTE(review): an old FIXME said "it works if I add 40 here"; the 40-byte GRH is only reserved in receive buffers, so no send-side offset should be needed. Confirm.
670 packet
->elemList
[1].length
= size
;
671 packet
->elemList
[1].lkey
= dataKey
->lkey
;
672 MACHSTATE(2," here");
674 packet
->destNode
= node
;
676 MACHSTATE(2," here1");
677 MACHSTATE1(2," here qp=%i",context
->qp
);
678 MACHSTATE1(2," here wr=%i",&(packet
->wr
));
679 MACHSTATE1(2," here wr=%i",&bad_wr
);
// Keep the return code so the error message below can report it.
682 if((retval = ibv_post_send(context
->qp
,&(packet
->wr
),&bad_wr
))){
683 MACHSTATE(2," problem sending");
684 CmiPrintf("[%d] Sending to node %d failed with return value %d\n",_Cmi_mynode
,node
->infiData
->nodeNo
,retval
);
687 MACHSTATE(2," here");
688 MACHSTATE2(3,"Packet send size %d node %d ",size
,packet
->destNode
->infiData
->nodeNo
);
689 MACHSTATE2(2," addr %p lkey %p ",(uintptr_t)packet
->buf
,dataKey
->lkey
);
// Sends one piece (data, size bytes) of the outgoing message ogm to node as a
// DATA packet. The registration key is looked up from ogm->data, so data is
// expected to lie inside that registered message buffer.
// NOTE(review): rank and broot are not used in these statements; confirm.
693 static void inline EnqueueDataPacket(OutgoingMsg ogm
, char *data
, int size
, OtherNode node
, int rank
, int broot
) {
696 MallocInfiPacket(packet
);
700 //header.nodeNo was already copied from context->header when the packet was allocated (newPacket)
701 packet
->header
.code
= INFIPACKETCODE_DATA
;
706 key
= METADATAFIELD(ogm
->data
)->key
;
707 CmiAssert(key
!= NULL
);
709 EnqueuePacket(node
,packet
,size
,key
);
715 /***********************************************************************
716 * DeliverViaNetwork()
718 * This function is responsible for all non-local transmission. It
719 * takes an outgoing message, splits it into packets of at most
720 * Cmi_dgram_max_data bytes and enqueues them on the send queue.
721 ***********************************************************************/
// NOTE(review): the copy parameter is not used in these statements; confirm.
722 void DeliverViaNetwork(OutgoingMsg ogm
, OtherNode node
, int rank
, unsigned int broot
, int copy
) {
723 int size
; char *data
;
724 MACHSTATE(3,"DeliverViaNetwork");
727 DgramHeaderMake(data
, rank
, ogm
->src
, Cmi_charmrun_pid
, 1, broot
); // May not be needed
728 CmiMsgHeaderSetLength(data
,size
);
// Full-sized pieces first; the remainder (possibly the whole message) goes last.
729 while(size
>Cmi_dgram_max_data
) {
730 EnqueueDataPacket(ogm
, data
, Cmi_dgram_max_data
, node
, rank
, broot
);
731 size
-= Cmi_dgram_max_data
;
732 data
+= Cmi_dgram_max_data
;
736 EnqueueDataPacket(ogm
, data
, size
, node
, rank
, broot
);
// Checks the charmrun control socket and the stdout pipe once with zero delay
// (CheckSocketsReady(0)) and services whichever needs attention.
741 static void ServiceCharmrun_nolock() {
743 MACHSTATE(2,"ServiceCharmrun_nolock begin {")
746 CheckSocketsReady(0);
747 if (ctrlskt_ready_read
) { // FIXME: ctrlskt_ready_read is set as a side effect of CheckSocketsReady() above
751 if (CmiStdoutNeedsService())
754 MACHSTATE(2,"} ServiceCharmrun_nolock end")
// Delivers a fully reassembled message of total_size bytes.
// Broadcasts (DGRAM_BROADCAST, and DGRAM_NODEBROADCAST with node queues) are
// first buffered via insertBufferedBcast and forwarded along the spanning tree
// or hypercube rooted at broot. The message is then pushed to local PE queues
// according to rank: a broadcast pushes a copy to every local PE.
// NOTE(review): toBuffer is not used in these statements; confirm.
758 void static inline handoverMessage(char *newmsg
,int total_size
,int rank
,int broot
,int toBuffer
){
759 #if CMK_BROADCAST_SPANNING_TREE | CMK_BROADCAST_HYPERCUBE
760 if (rank
== DGRAM_BROADCAST
761 #if CMK_NODE_QUEUE_AVAILABLE
762 || rank
== DGRAM_NODEBROADCAST
766 insertBufferedBcast(CopyMsg(newmsg
,total_size
),total_size
,broot
,rank
);
768 #if CMK_BROADCAST_SPANNING_TREE
769 SendSpanningChildren(NULL
, 0, total_size
, newmsg
,broot
,rank
);
771 SendHypercube(NULL
, 0, total_size
, newmsg
,broot
,rank
);
778 case DGRAM_BROADCAST
: {
// PEs 1..n-1 get copies; PE 0 receives the original buffer.
780 for (i
=1; i
<_Cmi_mynodesize
; i
++){
781 CmiPushPE(i
, CopyMsg(newmsg
, total_size
));
783 CmiPushPE(0, newmsg
);
786 #if CMK_NODE_QUEUE_AVAILABLE
787 case DGRAM_NODEBROADCAST
:
788 case DGRAM_NODEMESSAGE
: {
794 CmiPushPE(rank
, newmsg
);
796 } /* end of switch */
798 // processAllBufferedMsgs();
// Reassembles messages from the packets of node nodeNo (msg/len is one packet's payload).
// In state INFI_HEADER_DATA the packet starts a new message. Its Converse header
// gives the total size; a buffer is CmiAlloc'ed and the state moves to INFI_DATA
// when more packets follow. In state INFI_DATA the payload is appended. Every
// middle packet must be exactly Cmi_dgram_max_data bytes, and overruns abort.
// When the message is complete the state returns to INFI_HEADER_DATA.
804 static inline void processMessage(int nodeNo
,int len
,char *msg
,const int toBuffer
){
806 OtherNode node
= &nodes
[nodeNo
];
807 newmsg
= node
->asm_msg
;
809 MACHSTATE2(3,"Processing packet from node %d len %d",nodeNo
,len
);
811 switch(node
->infiData
->state
){
812 case INFI_HEADER_DATA
: {
814 int rank
, srcpe
, seqno
, magic
, i
;
816 DgramHeaderBreak(msg
, rank
, srcpe
, magic
, seqno
, broot
); // presumably unpacks rank, srcpe, magic, seqno and broot from the datagram header at msg (confirm)
817 size
= CmiMsgHeaderGetLength(msg
);
818 MACHSTATE2(3,"START of a new message from node %d of total size %d",nodeNo
,size
);
819 newmsg
= (char *)CmiAlloc(size
); // FIXME: is there a better way than to do an alloc?
821 memcpy(newmsg
, msg
, len
);
822 node
->asm_rank
= rank
;
823 node
->asm_total
= size
;
824 node
->asm_fill
= len
;
825 node
->asm_msg
= newmsg
;
826 node
->infiData
->broot
= broot
;
828 //there are more packets following
829 node
->infiData
->state
= INFI_DATA
;
830 } else if(len
== size
){
831 //this is the only packet for this message
832 node
->infiData
->state
= INFI_HEADER_DATA
;
833 } else { //len < size
834 CmiPrintf("size: %d, len:%d.\n", size
, len
);
835 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
// Continuation packet: every packet except the last must be full-sized.
841 if(node
->asm_fill
+len
<node
->asm_total
&&len
!=Cmi_dgram_max_data
){
842 CmiPrintf("from node %d asm_total: %d, asm_fill: %d, len:%d.\n",node
->infiData
->nodeNo
, node
->asm_total
, node
->asm_fill
, len
);
843 CmiAbort("packet in the middle does not have expected length");
845 if(node
->asm_fill
+len
> node
->asm_total
){
846 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node
->asm_total
, node
->asm_fill
, len
);
847 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
850 memcpy(newmsg
+ node
->asm_fill
,msg
,len
);
851 node
->asm_fill
+= len
;
852 if(node
->asm_fill
== node
->asm_total
){
853 node
->infiData
->state
= INFI_HEADER_DATA
;
855 node
->infiData
->state
= INFI_DATA
;
861 if(node
->infiData
->state
== INFI_HEADER_DATA
){ // then the entire message is ready so hand it over
862 int total_size
= node
->asm_total
;
863 node
->asm_msg
= NULL
;
// NOTE(review): with handoverMessage() commented out, the completed message is
// neither delivered nor freed here unless that happens elsewhere; confirm.
864 // handoverMessage(newmsg,total_size,node->asm_rank,node->infiData->broot,1); // FIXME: handover the message!
865 MACHSTATE3(3,"Message from node %d of length %d completely received msg %p",nodeNo
,total_size
,newmsg
);
// Handles a send completion. wr_id holds the infiPacket that was posted (set in
// newPacket), which is returned to the packet free list.
870 void processSendWC(struct ibv_wc
*sendWC
) {
871 MACHSTATE(3,"processSendWC {");
872 infiPacket packet
= (infiPacket
)sendWC
->wr_id
;
873 FreeInfiPacket(packet
);
874 MACHSTATE(3,"} processSendWC ");
// Handles one receive completion. It parses the packet header, passes DATA
// payloads to processMessage(), then reposts the same receive buffer.
// Every UD receive buffer starts with 40 bytes reserved for the GRH (Global
// Routing Header), and recvWC->byte_len counts those bytes. The packet header
// therefore starts at buf + 40 (as in recvBarrierMessage), and the payload
// length excludes both the GRH and the header.
877 void processRecvWC(struct ibv_wc
*recvWC
,const int toBuffer
) {
879 struct infiBuffer
*buffer
= (struct infiBuffer
*) recvWC
->wr_id
;
880 struct infiPacketHeader
*header
= (struct infiPacketHeader
*)(buffer
->buf
+ 40); // skip the 40-byte GRH
881 int nodeNo
= header
->nodeNo
;
883 int len
= recvWC
->byte_len
- 40 // byte_len also counts the GRH
-sizeof(struct infiPacketHeader
);
884 MACHSTATE(3,"processRecvWC {");
885 MACHSTATE2(3,"packet from node %d len %d",nodeNo
,len
);
887 if(header
->code
& INFIPACKETCODE_DATA
){
888 processMessage(nodeNo
,len
,(buffer
->buf
+ 40
+sizeof(struct infiPacketHeader
)),toBuffer
);
890 else if(header
->code
& INFIDUMMYPACKET
){
891 MACHSTATE(3,"Dummy packet");
893 else if(header
->code
& INFIBARRIERPACKET
){
894 MACHSTATE(3,"Barrier packet");
895 CmiAbort("Should not receive Barrier packet in normal polling loop. Your Barrier is broken");
// Repost the whole buffer (GRH space included) so it can receive again.
899 struct ibv_sge list
= {
900 .addr
= (uintptr_t) buffer
->buf
,
901 .length
= buffer
->size
,
902 .lkey
= buffer
->key
->lkey
,
905 struct ibv_recv_wr wr
= {
906 .wr_id
= (uint64_t)buffer
,
911 struct ibv_recv_wr
*bad_wr
;
// Not wrapped in CmiAssert: the repost must happen even when assertions are compiled out.
913 if(ibv_post_recv(context
->qp
,&wr
,&bad_wr
)!=0) CmiAbort("ibv_post_recv failed while reposting a receive buffer");
915 MACHSTATE(3,"} processRecvWC ");
// Drains up to WC_LIST_SIZE completions from cq and dispatches each one by opcode.
// Send completions recycle their packet (processSendWC); receive completions are
// processed and their buffer reposted (processRecvWC).
// A completion with an error status now always aborts. Previously only a
// CmiAssert rejected it; when assertions are compiled out, the failed completion
// would be dispatched on its opcode, which ibv_poll_cq() does not define for
// failed completions.
919 static inline int pollCq(const int toBuffer
,struct ibv_cq
*cq
) {
920 /* toBuffer ignored for sendCq and is used for recvCq */
923 struct ibv_wc wc
[WC_LIST_SIZE
];
925 MACHSTATE1(2,"pollCq %d (((",toBuffer
);
926 ne
= ibv_poll_cq(cq
,WC_LIST_SIZE
,&wc
[0]);
929 MACHSTATE1(3,"pollCq ne %d",ne
);
931 CmiAbort("ibv_poll_cq error");
935 // CmiAssert(wc[i].status==IBV_WC_SUCCESS);
936 if(wc
[i
].status
!=IBV_WC_SUCCESS
) {
937 MACHSTATE3(3,"wc[%i].status=%i (%s)",i
,wc
[i
].status
,ibv_wc_status_str(wc
[i
].status
));
938 MACHSTATE3(3," wr_id=%i qp_num=%i vendor_err=%i",wc
[i
].wr_id
,wc
[i
].qp_num
,wc
[i
].vendor_err
);
// NOTE(review): on the send CQ wr_id holds an infiPacket (see newPacket), not an infiBuffer.
939 MACHSTATE1(3," key=%p ",
940 ((struct infiBuffer
*)((&wc
[i
])->wr_id
))->key
);
941 /* MACHSTATE4(3," lkey=%p buffer=%d length=%d end=%d",
942 ((struct infiBuffer *)((&wc[i])->wr_id))->key->lkey,
943 ((struct infiBuffer *)((&wc[i])->wr_id))->buf,
944 ((struct infiBuffer *)((&wc[i])->wr_id))->size,
945 ((struct infiBuffer *)((&wc[i])->wr_id))->buf+((struct infiBuffer *)((&wc[i])->wr_id))->size);
947 */ CmiAbort("ibv_poll_cq returned a work completion with an error status");
951 switch(wc
[i
].opcode
){
952 case IBV_WC_SEND
: //sending message
953 processSendWC(&wc
[i
]);
955 case IBV_WC_RECV
: // receiving message
956 processRecvWC(&wc
[i
],toBuffer
);
959 CmiAbort("Wrong type of work completion object in cq");
964 MACHSTATE1(2,"))) pollCq %d",toBuffer
);
// Locking variant of CommunicationServer_nolock (presumably takes the comm lock
// around it; confirm). NOTE(review): its toBuffer argument is ignored and 0 is passed on.
969 static inline void CommunicationServer_lock(int toBuffer
) {
971 CommunicationServer_nolock(0);
// One communication pass: with more than one node, drains the send CQ and then
// the receive CQ once each (the single-node case is handled first).
975 static inline void CommunicationServer_nolock(int toBuffer
) {
977 if(_Cmi_numnodes <= 1){
982 MACHSTATE(2,"CommServer_nolock{");
984 // pollCmiDirectQ(); // FIXME: not sure what this does...
986 pollCq(toBuffer
,context
->sendCq
);
987 pollCq(toBuffer
,context
->recvCq
);
990 // processAllBufferedMsgs(); // FIXME : I don't think we need buffered msgs
992 MACHSTATE(2,"} CommServer_nolock ne");
// Returns the local identifier (LID) of port on dev_context, read with ibv_query_port().
998 static uint16_t getLocalLid(struct ibv_context
*dev_context
, int port
){
999 struct ibv_port_attr attr
;
1000 if (ibv_query_port(dev_context
, port
, &attr
))
// Allocates an infiAddr for node, presumably filled with the given lid, qpn and psn (confirm).
1007 struct infiAddr
* initinfiAddr(int node
,int lid
,int qpn
,int psn
) {
1008 struct infiAddr
*addr
=malloc(sizeof(struct infiAddr
));
// Allocates the per-remote-node state for node. Reassembly starts in
// INFI_HEADER_DATA, i.e. the next packet from that node begins a new message.
1017 struct infiOtherNodeData
*initinfiData(int node
,int lid
,int qpn
,int psn
) {
1018 //struct infiOtherNodeData *initInfiOtherNodeData(int node,int addr[3]){
1019 struct infiOtherNodeData
*ret
=malloc(sizeof(struct infiOtherNodeData
));
1026 ret
->state
= INFI_HEADER_DATA
;
1028 MACHSTATE4(3,"Storing node[%i] (lid=%i qpn=%i psn=%i)",node
,lid
,qpn
,psn
);
1030 // ret->qp = context->qp;
1031 // ret->totalTokens = tokensPerProcessor;
1032 // ret->tokensLeft = tokensPerProcessor;
1033 // ret->postedRecvs = tokensPerProcessor;
1038 /***********************************************************************
1039 * CommunicationServer()
1041 * This function does the scheduling of the tasks related to the
1042 * message sends and receives. It is called from the CmiGeneralSend()
1043 * function, and periodically from the CommunicationInterrupt() (in case
1044 * of the single processor version), and from the comm_thread (for the
1045 * SMP version). Based on which of the data/control read/write sockets
1046 * are ready, the corresponding tasks are called
1048 ***********************************************************************/
// where selects the caller (COMM_SERVER_FROM_SMP / _INTERRUPT / _WORKER). The SMP
// path services charmrun and runs the locking communication pass; the other path
// services charmrun and runs the lock-free pass.
1049 void CmiHandleImmediate();
1050 static void CommunicationServer(int sleepTime
, int where
) {
1051 /* 0: from smp thread
1053 2: from worker thread
1056 if(where
==COMM_SERVER_FROM_INTERRUPT
)
1059 if(where
== COMM_SERVER_FROM_WORKER
)
1061 if(where
== COMM_SERVER_FROM_SMP
) {
1062 ServiceCharmrun_nolock();
1064 CommunicationServer_lock(0);
1066 ServiceCharmrun_nolock();
1067 CommunicationServer_nolock(0);
// Sends a single INFIBARRIERPACKET to node pe.
// Unlike data packets, the UD destination (address handle, remote qpn and
// qkey 0x11111111, the qkey createqp gives the local QP) is set here directly.
// NOTE(review): packet->buf is CmiAlloc'ed here while FreeInfiPacket only relinks
// the packet; confirm who frees this buffer.
1077 static void sendBarrierMessage(int pe
) {
1078 /* we will only need one packet */
1080 OtherNode node
=nodes
+pe
;
1082 MallocInfiPacket(packet
);
1083 packet
->size
= size
;
1084 packet
->buf
= CmiAlloc(size
);
1085 packet
->header
.code
=INFIBARRIERPACKET
;
1086 packet
->wr
.wr
.ud
.ah
=context
->ah
[pe
];
1087 packet
->wr
.wr
.ud
.remote_qpn
=nodes
[pe
].infiData
->qp
.qpn
;
1088 packet
->wr
.wr
.ud
.remote_qkey
= 0x11111111;
1090 MACHSTATE1(3,"HERE -> %d",packet
->header
.code
);
1091 MACHSTATE2(3,"sending to qpn=%i pe=%i",nodes
[pe
].infiData
->qp
.qpn
,pe
);
1092 struct ibv_mr
*key
=METADATAFIELD(packet
->buf
)->key
;
1093 MACHSTATE3(3,"Barrier packet to %d size %d wr_id %d",node
->infiData
->nodeNo
,size
,packet
->wr
.wr_id
);
1094 EnqueuePacket(node
,packet
,size
,key
);
// Blocks until a barrier packet arrives. It polls the receive CQ one completion
// at a time, processing or logging each packet, and reposts every consumed buffer.
// Fixes:
// - the DATA payload now starts after both the 40-byte GRH and the packet header;
// - the payload length no longer counts the GRH (byte_len includes it);
// - the repost is no longer hidden inside CmiAssert, which may be compiled out.
1097 // FIXME: haven't looked at yet
1098 static void recvBarrierMessage() {
1101 /* struct ibv_wc wc[WC_LIST_SIZE];*/
1102 struct ibv_wc wc
[1];
1103 struct ibv_wc
*recvWC
;
1104 /* block on the recvq, this is lazy and evil in the general case because we abuse buffers but should be ok for startup barriers */
1105 int toBuffer
=1; // buffer without processing recvd messages
1106 int barrierReached
=0;
1107 struct infiBuffer
*buffer
= NULL
;
1108 struct infiPacketHeader
*header
= NULL
;
1111 int count
=0; // FIXME: remove debug
1112 MACHSTATE(3,"recvBarrierMessage 0"); // FIXME: REMOVE this debug
1113 while(!barrierReached
) {
1114 /* gengbin's semantic will implode if more than one q is polled at a time */
1115 pollCq(toBuffer
,context
->sendCq
); // FIXME: just put this in to fix pollSendCq req - not sure if its correct
1116 ne
= ibv_poll_cq(context
->recvCq
,1,&wc
[0]);
1118 MACHSTATE1(3,"recvBarrier ne %d",ne
);
1122 if(wc
[i
].status
!= IBV_WC_SUCCESS
){
1123 MACHSTATE3(3,"wc[%i].status=%i (%s)",i
,wc
[i
].status
,ibv_wc_status_str(wc
[i
].status
));
1124 MACHSTATE3(3," wr_id=%i qp_num=%i vendor_err=%i",wc
[i
].wr_id
,wc
[i
].qp_num
,wc
[i
].vendor_err
);
1126 MACHSTATE4(3," lkey=%d buffer=%d length=%d end=%d",
1127 ((struct infiBuffer *)((&wc[i])->wr_id))->key->lkey,
1128 ((struct infiBuffer *)((&wc[i])->wr_id))->buf,
1129 ((struct infiBuffer *)((&wc[i])->wr_id))->size,
1130 ((struct infiBuffer *)((&wc[i])->wr_id))->buf+((struct infiBuffer *)((&wc[i])->wr_id))->size);
1131 MACHSTATE1(3," key=%p ",
1132 ((struct infiBuffer *)((&wc[i])->wr_id))->key);
1135 CmiAbort("wc.status !=IBV_WC_SUCCESS");
1137 switch(wc
[i
].opcode
){
1138 case IBV_WC_RECV
: /* we have something to consider*/
1139 MACHSTATE(3," IN HERE !!!!!!!!!!");
1142 buffer
= (struct infiBuffer
*) recvWC
->wr_id
;
1143 header
= (struct infiPacketHeader
*)(buffer
->buf
+ 40); // add 40 bytes to skip the GRH
1145 nodeNo
= header
->nodeNo
;
1146 len
= recvWC
->byte_len
- 40 // byte_len also counts the GRH
-sizeof(struct infiPacketHeader
);
1147 if(header
->code
& INFIPACKETCODE_DATA
){
1148 processMessage(nodeNo
,len
,(buffer
->buf
+ 40
+sizeof(struct infiPacketHeader
)),toBuffer
);
1149 } else if(header
->code
& INFIDUMMYPACKET
){
1150 MACHSTATE(3,"Dummy packet");
1151 } else if(header
->code
& INFIBARRIERPACKET
){
1152 MACHSTATE2(3,"Barrier packet from node %d len %d",nodeNo
,len
);
1154 }else // FIXME: erase this else clause
1155 MACHSTATE2(3,"Ups... %d %d",header
->code
,nodeNo
);
// Repost the whole buffer (GRH space included) so it can receive again.
1157 struct ibv_sge list
= {
1158 .addr
= (uintptr_t) buffer
->buf
,
1159 .length
= buffer
->size
,
1160 .lkey
= buffer
->key
->lkey
1163 struct ibv_recv_wr wr
= {
1164 .wr_id
= (uint64_t)buffer
,
1169 struct ibv_recv_wr
*bad_wr
;
1171 if(ibv_post_recv(context
->qp
,&wr
,&bad_wr
)!=0) CmiAbort("ibv_post_recv failed while reposting a barrier receive buffer");
1176 CmiAbort("Wrong type of work completion object in recvq");
1181 /* semantically questionable */
1182 // processAllBufferedMsgs();
1187 // FIXME: haven't looked at yet
1188 /* happen at node level */
// Node-level barrier (presumably the body of CmiBarrier; confirm).
// - Rank 0 of every other node sends a barrier packet to node 0 and waits for
//   the release, then relays the release to its BROADCAST_SPANNING_FACTOR
//   children in the spanning tree.
// - Node 0 waits for numnodes-1 arrivals, then starts the release.
// - All ranks finally meet in CmiNodeAllBarrier().
1194 int numnodes
= CmiNumNodes();
1195 MACHSTATE1(3,"Barrier 1 rank=%i",CmiMyRank());
1196 if (CmiMyRank() == 0) { /* every one send to pe 0 */
1197 if (CmiMyNode() != 0) {
1199 MACHSTATE(3,"Barrier sendmsg");
1200 sendBarrierMessage(0);
1201 recvBarrierMessage();
1202 for (i
=1; i
<=BROADCAST_SPANNING_FACTOR
; i
++) {
1203 int p
= CmiMyNode();
1204 p
= BROADCAST_SPANNING_FACTOR
*p
+ i
;
1205 if (p
> numnodes
- 1) break;
1207 /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
1208 sendBarrierMessage(p
);
1211 MACHSTATE(3,"Barrier else");
1212 for (count
= 1; count
< numnodes
; count
++) {
1213 recvBarrierMessage();
1215 /* pe 0 broadcast */
1216 for (i
=1; i
<=BROADCAST_SPANNING_FACTOR
; i
++) {
1218 if (p
> numnodes
- 1) break;
1219 /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
1220 sendBarrierMessage(p
);
1223 MACHSTATE(3,"Barrier 3");
1225 MACHSTATE(3,"Barrier 4");
1226 CmiNodeAllBarrier();
1227 // processAllBufferedMsgs();
1228 /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
1229 MACHSTATE(3,"Barrier e");
1232 // FIXME: haven't looked at yet
1233 /* everyone sends a message to pe 0 and goes on */
// On rank 0 of each node: non-zero nodes send one barrier packet to node 0, and
// node 0 receives CmiNumNodes()-1 of them. All local ranks then meet in
// CmiNodeAllBarrier().
1234 int CmiBarrierZero() {
1237 if (CmiMyRank() == 0) {
1239 sendBarrierMessage(0);
1241 for (i
=0; i
<CmiNumNodes()-1; i
++) {
1242 recvBarrierMessage();
1246 CmiNodeAllBarrier();
1247 // processAllBufferedMsgs();
// Creates the send/receive completion queues and the UD queue pair, moves the QP
// through INIT -> RTR -> RTS, records the local lid/qpn/psn in context->localAddr,
// and creates one address handle per node.
// Fix: the initial send PSN is now chosen before the RTS transition that
// programs it into sq_psn. Previously sq_psn was read from localAddr.psn before
// that field was assigned (context is malloc'ed, so the value was indeterminate).
// The value stored in localAddr.psn is the one actually programmed into the QP.
1251 void createqp(struct ibv_device
*dev
){
1253 context
->sendCq
= ibv_create_cq(context
->context
,context
->sendCqSize
,NULL
,NULL
,0);
1254 CmiAssert(context
->sendCq
!= NULL
);
1255 MACHSTATE1(3,"sendCq created %p",context
->sendCq
);
1257 context
->recvCq
= ibv_create_cq(context
->context
,context
->recvCqSize
,NULL
,NULL
,0);
1258 CmiAssert(context
->recvCq
!= NULL
);
1259 MACHSTATE2(3,"recvCq created %p %d",context
->recvCq
,context
->recvCqSize
);
1262 struct ibv_qp_init_attr attr
= {
1263 .qp_context
= context
->context
,
1264 .qp_type
= IBV_QPT_UD
,
1265 .send_cq
= context
->sendCq
,
1266 .recv_cq
= context
->recvCq
,
1270 .max_send_wr
= context
->sendCqSize
, // FIXME: this isn't right - need to make a smaller number
1271 .max_recv_wr
= context
->recvCqSize
, // FIXME: this isn't right - need to make a smaller number
1276 context
->qp
= ibv_create_qp(context
->pd
,&attr
);
1277 CmiAssert(context
->qp
!= NULL
);
1278 MACHSTATE1(3,"qp created %p",context
->qp
);
// INIT: bind the QP to the port and set the qkey that UD senders must present.
1281 struct ibv_qp_attr attr
;
1282 attr
.qp_state
= IBV_QPS_INIT
;
1283 attr
.pkey_index
= 0;
1284 attr
.port_num
= context
->ibPort
;
1285 attr
.qkey
= 0x11111111;
1286 if(ibv_modify_qp(context
->qp
, &attr
,
1291 CmiAbort("Could not modify QP to INIT");
1294 struct ibv_qp_attr attr
;
1295 attr
.qp_state
= IBV_QPS_RTR
;
1296 if(ibv_modify_qp(context
->qp
, &attr
, IBV_QP_STATE
))
1297 CmiAbort("Could not modify QP to RTR");
1300 struct ibv_qp_attr attr
;
// Choose the 24-bit initial send PSN before it is programmed into sq_psn below.
context
->localAddr
.psn
= lrand48() & 0xffffff;
1301 attr
.qp_state
= IBV_QPS_RTS
;
1302 attr
.sq_psn
=context
->localAddr
.psn
;
1303 if(ibv_modify_qp(context
->qp
, &attr
, IBV_QP_STATE
|IBV_QP_SQ_PSN
))
1304 CmiAbort("Could not modify QP to RTS");
1307 context
->localAddr
.lid
=getLocalLid(context
->context
,context
->ibPort
);
1308 context
->localAddr
.qpn
= context
->qp
->qp_num
;
// localAddr.psn was already set before the RTS transition above and must not be regenerated here.
1311 MACHSTATE3(4,"qp information (lid=%i qpn=%i psn=%i)",context
->localAddr
.lid
,context
->localAddr
.qpn
,context
->localAddr
.psn
);
// One address handle per node, built from that node's lid, for UD sends.
1317 numnodes
=_Cmi_numnodes
;
1318 context
->ah
=(struct ibv_ah
**)malloc(sizeof(struct ibv_ah
*)*numnodes
);
1320 for(i
=0;i
<numnodes
;i
++) {
1321 // if(i!=_Cmi_mynode) {
1323 struct ibv_ah_attr ah_attr
= {
1325 .dlid
= nodes
[i
].infiData
->qp
.lid
,
1328 .port_num
= context
->ibPort
,
1330 context
->ah
[i
]=ibv_create_ah(context
->pd
,&ah_attr
);
1331 CmiAssert(context
->ah
[i
]!=0);
1332 MACHSTATE2(4,"ah for node %i lid=%i ",i
,ah_attr
.dlid
);
1339 void CmiMachineInit(char **argv
)
1345 MACHSTATE(3,"CmiMachineInit {");
1346 MACHSTATE2(3,"_Cmi_numnodes %d CmiNumNodes() %d",_Cmi_numnodes
,CmiNumNodes());
1347 MACHSTATE1(3,"CmiMyNodeSize() %d",CmiMyNodeSize());
1349 /* copied from ibverbs.c */
1355 packetsize
= mtu_size
*4;
1356 Cmi_dgram_max_data
=packetsize
-sizeof(struct infiPacketHeader
);
1357 CmiAssert(Cmi_dgram_max_data
>1);
1361 maxrecvbuffers
=calcmaxsize
;
1362 maxtokens
= calcmaxsize
;
1364 initInfiCmiChunkPools();
1366 ibud
.devlist
= ibv_get_device_list(NULL
);
1367 CmiAssert(ibud
.devlist
!= NULL
);
1369 ibud
.dev
= *(ibud
.devlist
);
1370 CmiAssert(ibud
.dev
!= NULL
);
1372 MACHSTATE1(3,"device name %s",ibv_get_device_name(ibud
.dev
));
1374 context
= (struct infiContext
*)malloc(sizeof(struct infiContext
));
1376 MACHSTATE1(3,"context allocated %p",context
);
1378 context
->sendCqSize
= 2; // FIXME: 1?
1379 context
->recvCqSize
= calcmaxsize
+1;
1380 context
->ibPort
= 1;
1381 context
->context
= ibv_open_device(ibud
.dev
); //the context for this infiniband device
1382 CmiAssert(context
->context
!= NULL
);
1384 MACHSTATE1(3,"device opened %p",context
->context
);
1386 context
->pd
= ibv_alloc_pd(context
->context
); //protection domain
1387 CmiAssert(context
->pd
!= NULL
);
1389 context
->header
.nodeNo
= _Cmi_mynode
;
1391 if(_Cmi_numnodes
>1) {
1393 //MACHSTATE1(3,"pp post recv=%i",pp_post_recv());
1396 MACHSTATE(3,"} CmiMachineInit");
// With more than one node, creates and posts the receive buffers.
1399 void CmiCommunicationInit(char **argv
) {
1400 MACHSTATE(3,"CmiCommunicationInit {");
1401 if(_Cmi_numnodes
>1) {
1402 infiPostInitialRecvs();
1405 MACHSTATE(3,"} CmiCommunicationInit");
// Teardown: destroy the queue pair, release the protection domain, close the
// device and free the device list.
// NOTE(review): the CQs and address handles created in createqp are not released
// by these statements; confirm they are freed elsewhere.
1410 ibv_destroy_qp(context
->qp
);
1411 ibv_dealloc_pd(context
->pd
);
1412 ibv_close_device(context
->context
);
1413 ibv_free_device_list(ibud
.devlist
);