3 * Myrinet MX API implementation of Converse NET version
5 * contains only MX API specific code for:
7 * - CmiCommunicationInit()
9 * - DeliverViaNetwork()
10 * - CommunicationServer()
14 Yan Shi, yanshi@uiuc.edu 2/1/2006
15 Gengbin Zheng, gzheng@uiuc.edu 2/3/2006
18 * 2/3/2006: Gengbin Zheng
19 implemented packetization and fixed a bug related to buffer reuse/change
21 * 2/7/2006: Gengbin Zheng
22 implement active message mode using callback
23 short message pingpong time improved by 0.5us
24 * 2/8/2006: Gengbin Zheng
25 implement buffering of future messages (I haven't seen out-of-order
26 messages so far, but it may happen)
27 Using POOL relies on charmrun setting MX_MONOTHREAD=1.
35 /* use unexp callback */
36 #define MX_ACTIVE_MESSAGE 1
38 /*#define CMK_USE_CHECKSUM 0*/
40 /* default to busy-waiting mode */
41 #undef CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
42 #undef CMK_WHEN_PROCESSOR_IDLE_USLEEP
43 #define CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT 1
44 #define CMK_WHEN_PROCESSOR_IDLE_USLEEP 0
47 /******************************************************************************
49 * Send messages pending queue (used internally)
51 *****************************************************************************/
53 typedef struct PendingSentMsgStruct
57 struct PendingSentMsgStruct
*next
;
59 int flag
; /* used for active message mode */
67 static PendingSentMsg pmpool
[MAXPMS
];
68 static int pmNums
= 0;
/* putPool(pm): pooled variant. Hands a PendingSentMsg back to the pmpool
   free list by storing it at pmpool[pmNums] and incrementing pmNums; when
   the pool already holds MAXPMS entries the object is released with free()
   instead. */
70 #define putPool(pm) { \
71 if (pmNums == MAXPMS) free(pm); \
72 else pmpool[pmNums++] = pm; }
74 #define getPool(pm) { \
75 if (pmNums == 0) {pm = (PendingSentMsg)malloc(sizeof(struct PendingSentMsgStruct));} \
76 else { pm = pmpool[--pmNums]; }\
/* putPool(pm): non-pooled variant. Simply releases pm with free(). */
79 #define putPool(pm) { free(pm); }
/* getPool(pm): non-pooled variant. Allocates a fresh PendingSentMsgStruct
   with malloc(), assigns it to pm, and passes the result to _MEMCHECK. */
80 #define getPool(pm) { pm = (PendingSentMsg)malloc(sizeof(struct PendingSentMsgStruct)); _MEMCHECK(pm);}
83 static PendingSentMsg sent_handles
=NULL
; /* head of queue */
84 static PendingSentMsg sent_handles_end
=NULL
; /* end of the queue */
86 #define NewPendingSentMsg(pm, ogm) \
88 pm->next=NULL; pm->ogm=ogm; pm->data=data; \
89 MACHSTATE1(1,"alloc msg %p",pm);\
/* InsertPendingSentMsg(pm): append pm to the tail of the pending-send queue.
   If the queue is empty (sent_handles_end == NULL) pm becomes the head
   (sent_handles); otherwise it is linked after the current tail. In both
   cases pm becomes the new tail (sent_handles_end). The macro does not
   touch pm->next itself. */
92 #define InsertPendingSentMsg(pm) \
93 { if(sent_handles_end==NULL) {sent_handles=pm;} \
94 else {sent_handles_end->next=pm;} \
95 sent_handles_end=pm; MACHSTATE(1,"Insert done");}
97 #define FreePendingSentMsg(pm) \
98 { sent_handles=pm->next; \
99 if (sent_handles == NULL) sent_handles_end = NULL; \
100 if (pm->ogm) {pm->ogm->refcount--; GarbageCollectMsg(pm->ogm);} \
101 else CmiFree(pm->data); \
104 CmiUInt8 MATCH_FILTER
= 0x11111111FFFFFFFFLL
;
105 CmiUInt8 MATCH_MASK
= 0xffffffffffffffffLL
;
107 static int processMessage(char *msg
, int len
);
108 static const char *getErrorMsg(mx_return_t rc
);
109 static void processStatusCode(mx_status_t status
);
110 static void PumpMsgs(int getone
);
111 static void ReleaseSentMsgs(void);
112 #if MX_ACTIVE_MESSAGE
113 static void PumpEvents(int getone
);
114 static volatile int gotone
= 0;
117 /******************************************************************************
119 * CmiNotifyIdle()-- wait until a packet comes in
121 *****************************************************************************/
/* CmiNotifyGetState: returns the idle-state pointer; this implementation
   always returns NULL. */
126 static CmiIdleState
*CmiNotifyGetState(void) { return NULL
; }
128 static void CmiNotifyStillIdle(CmiIdleState
*s
)
131 MACHSTATE(1,"CmiNotifyStillIdle {");
133 CommunicationServer(0, COMM_SERVER_FROM_WORKER
);
135 #if MX_ACTIVE_MESSAGE
140 #if CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
145 PumpMsgs(sleep
); /* busy waiting */
149 MACHSTATE(1,"} CmiNotifyStillIdle");
152 void CmiNotifyIdle(void) {
153 CmiNotifyStillIdle(NULL
);
156 static void CmiNotifyBeginIdle(CmiIdleState
*s
)
158 CmiNotifyStillIdle(s
);
161 /****************************************************************************
165 * Checks both sockets to see which are readable and which are writeable.
166 * We check all these things at the same time since this can be done for
167 * free with ``select.'' The result is stored in global variables, since
168 * this is essentially global state information and several routines need it.
170 ***************************************************************************/
172 int CheckSocketsReady(int withDelayMs
)
175 CMK_PIPE_DECL(withDelayMs
);
177 CmiStdoutAdd(CMK_PIPE_SUB
);
178 if (Cmi_charmrun_fd
!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd
);
180 nreadable
=CMK_PIPE_CALL();
181 ctrlskt_ready_read
= 0;
182 dataskt_ready_read
= 0;
183 dataskt_ready_write
= 0;
185 if (nreadable
== 0) {
186 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
191 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
192 return CheckSocketsReady(0);
194 CmiStdoutCheck(CMK_PIPE_SUB
);
195 if (Cmi_charmrun_fd
!=-1)
196 ctrlskt_ready_read
= CMK_PIPE_CHECKREAD(Cmi_charmrun_fd
);
197 MACHSTATE(1,"} CheckSocketsReady")
201 /***********************************************************************
202 * CommunicationServer()
204 * This function does the scheduling of the tasks related to the
205 * message sends and receives.
206 * It first checks the charmrun port for messages, then polls MX events
207 * for send completions and incoming messages.
209 ***********************************************************************/
211 /* always called from interrupt */
212 static void ServiceCharmrun_nolock()
215 MACHSTATE(2,"ServiceCharmrun_nolock begin {")
219 CheckSocketsReady(0);
220 if (ctrlskt_ready_read
) { ctrl_getone(); again
=1; }
221 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
223 MACHSTATE(2,"} ServiceCharmrun_nolock end")
226 static void PumpMsgs(int getone
) {
230 mx_segment_t buffer_desc
;
231 mx_request_t recv_handle
;
233 MACHSTATE1(2,"PumpMsgs(%d) {", getone
);
236 rc
= mx_probe(endpoint
, 1, MATCH_FILTER
, MATCH_MASK
, &status
, &result
);
238 rc
= mx_iprobe(endpoint
, MATCH_FILTER
, MATCH_MASK
, &status
, &result
);
239 if(rc
!= MX_SUCCESS
) {
240 const char *errmsg
= getErrorMsg(rc
);
241 CmiPrintf("mx_iprobe error: %s\n", errmsg
);
242 CmiAbort("mx_iprobe Abort");
244 if (result
== 0) { /* no incoming */
247 MACHSTATE(2,"PumpMsgs recv one");
248 buffer_desc
.segment_length
= status
.msg_length
;
249 buffer_desc
.segment_ptr
= (char *) CmiAlloc(status
.msg_length
);
250 MACHSTATE(1,"Non-blocking receive {")
251 MACHSTATE1(1," size %d", status
.msg_length
);
252 rc
= mx_irecv(endpoint
, &buffer_desc
, 1, MATCH_FILTER
, MATCH_MASK
, NULL
, &recv_handle
);
253 if (rc
!= MX_SUCCESS
) {
254 const char *errmsg
= getErrorMsg(rc
);
255 CmiPrintf("mx_irecv error: %s\n", errmsg
);
258 MACHSTATE1(1,"} Non-blocking receive return %d", rc
);
260 rc
= mx_wait(endpoint
, &recv_handle
, MX_INFINITE
, &status
, &result
);
261 /*rc = mx_test(endpoint, &recv_handle, &status, &result);*/
262 MACHSTATE3(1,"mx_wait return rc=%d result=%d status=%d", rc
, result
, status
.code
);
263 if (rc
!= MX_SUCCESS
) {
264 const char *errmsg
= getErrorMsg(rc
);
265 CmiPrintf("mx_wait error: %s\n", errmsg
);
269 CmiPrintf("mx_wait error: TIME OUT\n");
273 processMessage(buffer_desc
.segment_ptr
, buffer_desc
.segment_length
);
277 MACHSTATE1(2,"} PumpMsgs(%d)", getone
);
280 #if MX_ACTIVE_MESSAGE
281 static void PumpEvents(int getone
) {
285 mx_segment_t buffer_desc
;
286 mx_request_t recv_handle
;
289 rc
= mx_ipeek(endpoint
, &recv_handle
, &result
);
290 if (rc
!= MX_SUCCESS
) {
291 const char *errmsg
= getErrorMsg(rc
);
292 CmiPrintf("mx_ipeek error: %s\n", errmsg
);
295 if (result
== 0) break;
296 rc
= mx_test(endpoint
, &recv_handle
, &status
, &result
);
297 /*rc = mx_wait(endpoint, &recv_handle, MX_INFINITE, &status, &result);*/
298 if (rc
!= MX_SUCCESS
) {
299 const char *errmsg
= getErrorMsg(rc
);
300 CmiPrintf("mx_wait error: %s\n", errmsg
);
304 CmiAbort("mx_test or wait: TIME OUT\n"); /* this should never happen */
307 PendingSentMsg pm
= (PendingSentMsg
)status
.context
;
308 if (pm
->flag
== 1) { /* send */
309 if (pm
->ogm
) {pm
->ogm
->refcount
--; GarbageCollectMsg(pm
->ogm
);}
310 else CmiFree(pm
->data
);
312 else if (pm
->flag
== 2) { /* recv */
313 #if MX_ACTIVE_MESSAGE
314 if (status
.msg_length
== 4) {
320 processMessage(pm
->data
, status
.msg_length
);
323 CmiAbort("Invalid PendingSentMsg!");
331 /* active message model, default */
332 void recv_callback(void * context
, uint64_t match_info
, int length
)
334 mx_segment_t buffer_desc
;
335 mx_request_t recv_handle
;
341 buffer_desc
.segment_length
= length
;
342 buffer_desc
.segment_ptr
= (char *) CmiAlloc(length
);
345 pm
->data
= buffer_desc
.segment_ptr
;
346 if (MATCH_FILTER
!= match_info
) {
347 CmiAbort("Invalid match_info");
349 rc
= mx_irecv(endpoint
, &buffer_desc
, 1, MATCH_FILTER
, MATCH_MASK
, pm
, &recv_handle
);
350 if (rc
!= MX_SUCCESS
) {
351 const char *errmsg
= getErrorMsg(rc
);
352 CmiPrintf("mx_irecv error: %s\n", errmsg
);
356 rc
= mx_test(endpoint
, &recv_handle
, &status
, &result
);
357 if (rc
!= MX_SUCCESS
) {
358 const char *errmsg
= getErrorMsg(rc
);
359 CmiPrintf("mx_wait error: %s\n", errmsg
);
366 processStatusCode(status
);
367 CmiPrintf("PUSH HERE\n");
368 processMessage(pm
->data
, status
.msg_length
);
375 #define test_send_complete(handle, status, result) \
378 rc = mx_test(endpoint, &(handle), &status, &result); \
379 if (rc != MX_SUCCESS) { \
380 MACHSTATE1(3," mx_test returns %d", rc); \
381 CmiAbort("mx_test failed\n"); \
385 static void ReleaseSentMsgs(void) {
386 MACHSTATE(2,"ReleaseSentMsgs {");
390 PendingSentMsg next
, pm
= sent_handles
;
392 test_send_complete(pm
->handle
, status
, result
);
394 if(result
!=0 && status
.code
== MX_STATUS_SUCCESS
) {
395 MACHSTATE1(2,"Sent complete. Free sent msg size %d", status
.msg_length
);
396 FreePendingSentMsg(pm
);
402 MACHSTATE(2,"} ReleaseSentMsgs");
405 static void CommunicationServer_nolock(int withDelayMs
) {
406 if (endpoint
== NULL
) return;
407 MACHSTATE(2,"CommunicationServer_nolock start {")
408 #if MX_ACTIVE_MESSAGE
414 MACHSTATE(2,"}CommunicationServer_nolock end");
420 2: from worker thread
421 Note in netpoll mode, charmrun service is only performed in interrupt,
422 pingCharmrun is from sig alarm, so it is lock free
424 static void CommunicationServerNet(int withDelayMs
, int where
)
426 /* standalone mode */
427 if (Cmi_charmrun_pid
== 0 && endpoint
== NULL
) return;
429 MACHSTATE2(2,"CommunicationServerNet(%d) from %d {",withDelayMs
, where
)
431 if (where
== COMM_SERVER_FROM_WORKER
&& machine_initiated_shutdown
) {
432 /* Converse exit, wait for pingCharm to quit */
436 if (where
== COMM_SERVER_FROM_INTERRUPT
) {
437 /* don't service charmrun if converse exits, this fixed a hang bug */
438 if (!machine_initiated_shutdown
) ServiceCharmrun_nolock();
441 else if (where
== COMM_SERVER_FROM_SMP
|| where
== COMM_SERVER_FROM_WORKER
) {
442 if (machine_initiated_shutdown
) ServiceCharmrun_nolock(); /* to exit */
445 LOG(GetClock(), Cmi_nodestart
, 'I', 0, 0);
448 CommunicationServer_nolock(withDelayMs
);
451 #if CMK_IMMEDIATE_MSG
452 if (where
== COMM_SERVER_FROM_SMP
)
453 CmiHandleImmediate();
456 MACHSTATE(2,"} CommunicationServerNet")
459 void processFutureMessages(OtherNode node
)
461 if (!CdsFifo_Empty(node
->futureMsgs
)) {
462 int len
= CdsFifo_Length(node
->futureMsgs
);
463 CmiPrintf("[%d] processFutureMessages %d\n", CmiMyPe(), len
);
466 FutureMessage f
= (FutureMessage
)CdsFifo_Dequeue(node
->futureMsgs
);
467 int status
= processMessage(f
->msg
, f
->len
);
474 static int processMessage(char *msg
, int len
)
477 int rank
, srcpe
, seqno
, magic
, i
;
480 unsigned char checksum
;
482 if (len
>= DGRAM_HEADER_SIZE
) {
483 DgramHeaderBreak(msg
, rank
, srcpe
, magic
, seqno
, broot
);
485 MACHSTATE2(1, "Break header Cmi-charmrun_id=%d, magic=%d", Cmi_charmrun_pid
, magic
);
486 MACHSTATE3(1, "srcpe=%d, seqno=%d, rank=%d", srcpe
, seqno
, rank
);
488 #ifdef CMK_USE_CHECKSUM
489 checksum
= computeCheckSum(msg
, len
);
492 if (magic
== (Cmi_charmrun_pid
&DGRAM_MAGIC_MASK
))
495 OtherNode node
= nodes_by_pe
[srcpe
];
497 if (seqno
== node
->recv_expect
) {
498 node
->recv_expect
= ((seqno
+1)&DGRAM_SEQNO_MASK
);
500 else if (seqno
< node
->recv_expect
) {
501 CmiPrintf("[%d] Warning: Past packet received from PE %d (expecting: %d seqno: %d)\n", CmiMyPe(), srcpe
, node
->recv_expect
, seqno
);
502 CmiPrintf("\n\n\t\t[%d] packet ignored!\n\n");
506 CmiPrintf("[%d] Error detected - Packet out of order from PE %d (expecting: %d got: %d)\n", CmiMyPe(), srcpe
, node
->recv_expect
, seqno
);
508 CmiAbort("\n\n\t\tPacket out of order!!\n\n");
510 FutureMessage f
= (FutureMessage
)malloc(sizeof(struct FutureMessageStruct
));
513 CdsFifo_Enqueue(node
->futureMsgs
, f
);
517 newmsg
= node
->asm_msg
;
519 size
= CmiMsgHeaderGetLength(msg
);
521 newmsg
= (char *)CmiAlloc(size
);
524 CmiPrintf("size: %d, len:%d.\n", size
, len
);
525 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
527 memcpy(newmsg
, msg
, len
);
528 CmiFree(msg
); /* free original msg */
532 node
->asm_rank
= rank
;
533 node
->asm_total
= size
;
534 node
->asm_fill
= len
;
535 node
->asm_msg
= newmsg
;
538 size
= len
- DGRAM_HEADER_SIZE
;
539 if (node
->asm_fill
+size
> node
->asm_total
) {
540 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node
->asm_total
, node
->asm_fill
, len
);
541 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
543 memcpy(newmsg
+ node
->asm_fill
, msg
+DGRAM_HEADER_SIZE
, size
);
544 CmiFree(msg
); /* free original msg */
545 node
->asm_fill
+= size
;
548 /* get a full packet */
549 if (node
->asm_fill
== node
->asm_total
) {
551 case DGRAM_BROADCAST
: {
552 for (i
=1; i
<_Cmi_mynodesize
; i
++)
553 CmiPushPE(i
, CopyMsg(newmsg
, node
->asm_total
));
554 CmiPushPE(0, newmsg
);
557 #if CMK_NODE_QUEUE_AVAILABLE
558 case DGRAM_NODEBROADCAST
:
559 case DGRAM_NODEMESSAGE
: {
565 CmiPushPE(rank
, newmsg
);
568 /* do it after integration - the following function may be re-entrant */
569 #if CMK_BROADCAST_SPANNING_TREE
570 if (rank
== DGRAM_BROADCAST
571 #if CMK_NODE_QUEUE_AVAILABLE
572 || rank
== DGRAM_NODEBROADCAST
575 SendSpanningChildren(NULL
, 0, node
->asm_total
, newmsg
, broot
, rank
);
576 #elif CMK_BROADCAST_HYPERCUBE
577 if (rank
== DGRAM_BROADCAST
578 #if CMK_NODE_QUEUE_AVAILABLE
579 || rank
== DGRAM_NODEBROADCAST
582 SendHypercube(NULL
, 0, node
->asm_total
, newmsg
, broot
, rank
);
585 processFutureMessages(node
);
587 else { /* checksum failed */
588 #ifdef CMK_USE_CHECKSUM
589 CmiPrintf("[%d] message ignored: checksum (%d) not 0!\n", CmiMyPe(), checksum
);
591 CmiPrintf("[%d] message ignored: magic not agree:%d != %d!\n",
592 CmiMyPe(), magic
, Cmi_charmrun_pid
&DGRAM_MAGIC_MASK
);
594 CmiPrintf("[%d] recved: rank:%d src:%d magic:%d seqno:%d len:%d\n", CmiMyPe(), rank
, srcpe
, magic
, seqno
,
599 CmiPrintf("[%d] message ignored: size is too small: %d!\n", CmiMyPe(), len
);
600 CmiPrintf("[%d] possible size: %d\n", CmiMsgHeaderGetLength(msg
));
606 /***********************************************************************
607 * DeliverViaNetwork()
609 * This function is responsible for all non-local transmission. It
610 * first allocates a send token; if that fails, it puts the message on the
611 * pending message queue, otherwise it invokes the GM send.
612 ***********************************************************************/
614 void EnqueueOutgoingDgram
615 (OutgoingMsg ogm
, char *ptr
, int dlen
, OtherNode node
, int rank
, int broot
, int copy
)
617 int size
, len
, seqno
;
619 mx_request_t sent_handle
;
620 mx_segment_t buffer_desc
;
624 len
= dlen
+ DGRAM_HEADER_SIZE
;;
627 data
= CopyMsg(ptr
-DGRAM_HEADER_SIZE
, len
);
630 data
= ptr
-DGRAM_HEADER_SIZE
;
634 seqno
= node
->send_next
;
635 MACHSTATE5(1, "[%d] SEQNO: %d to node %d rank: %d %d", CmiMyPe(), seqno
, node
-nodes
, rank
, broot
);
636 DgramHeaderMake(data
, rank
, ogm
->src
, Cmi_charmrun_pid
, seqno
, broot
);
637 node
->send_next
= ((seqno
+1)&DGRAM_SEQNO_MASK
);
638 #ifdef CMK_USE_CHECKSUM
640 DgramHeader
*head
= (DgramHeader
*)data
;
641 head
->magic
^= computeCheckSum(data
, len
);
645 MACHSTATE1(2, "EnqueueOutgoingDgram { len=%d", len
);
646 /* MX will put outgoing message in queue and progress to send */
647 /* Note: Assume that MX provides unlimited buffers
648 so no user-side buffer maintenance is required */
649 buffer_desc
.segment_ptr
= data
;
650 buffer_desc
.segment_length
= len
;
652 if (copy
) ogm
= NULL
;
653 NewPendingSentMsg(pm
, ogm
);
655 rc
= mx_isend(endpoint
, &buffer_desc
, 1, node
->endpoint_addr
, MATCH_FILTER
, pm
, &(pm
->handle
));
656 if (rc
!= MX_SUCCESS
) {
657 MACHSTATE1(3," mx_isend returns %d", rc
);
658 CmiAbort("mx_isend failed\n");
660 #if !MX_ACTIVE_MESSAGE
661 InsertPendingSentMsg(pm
);
663 MACHSTATE(2, "} EnqueueOutgoingDgram");
666 /* can not guarantee that buffer is not altered after return, so it is not
668 void DeliverViaNetwork(OutgoingMsg ogm
, OtherNode node
, int rank
, unsigned int broot
, int copy
)
670 int size
; char *data
;
672 size
= ogm
->size
- DGRAM_HEADER_SIZE
;
673 data
= ogm
->data
+ DGRAM_HEADER_SIZE
;
675 MACHSTATE3(2, "DeliverViaNetwork { : size:%d, to node mach_id=%d, nic=%ld", ogm
->size
, node
->mach_id
, node
->nic_id
);
677 while (size
> Cmi_dgram_max_data
) {
678 copy
= 1; /* since we are packetizing, we need to copy anyway now */
679 EnqueueOutgoingDgram(ogm
, data
, Cmi_dgram_max_data
, node
, rank
, broot
, copy
);
680 data
+= Cmi_dgram_max_data
;
681 size
-= Cmi_dgram_max_data
;
683 if (size
>0) EnqueueOutgoingDgram(ogm
, data
, size
, node
, rank
, broot
, copy
);
685 MACHSTATE(2, "} DeliverViaNetwork");
688 static void sendBarrierMessage(int pe
)
690 mx_request_t send_handle
;
691 mx_segment_t buffer_desc
;
697 OtherNode node
= nodes
+ pe
;
698 buffer_desc
.segment_ptr
= msg
;
699 buffer_desc
.segment_length
= 4;
700 rc
= mx_isend(endpoint
, &buffer_desc
, 1, node
->endpoint_addr
, MATCH_FILTER
, NULL
, &send_handle
);
702 rc
= mx_test(endpoint
, &send_handle
, &status
, &result
);
703 } while (rc
!= MX_SUCCESS
|| result
==0);
706 static void recvBarrierMessage()
708 mx_segment_t buffer_desc
;
712 mx_request_t recv_handle
;
715 #if MX_ACTIVE_MESSAGE
716 while (gotone
== 0) {
717 mx_progress(endpoint
);
723 rc
= mx_probe(endpoint
, 100, MATCH_FILTER
, MATCH_MASK
, &status
, &result
);
724 } while (result
== 0);
725 CmiAssert(status
.msg_length
== 4);
727 buffer_desc
.segment_length
= 4;
728 buffer_desc
.segment_ptr
= msg
;
729 rc
= mx_irecv(endpoint
, &buffer_desc
, 1, MATCH_FILTER
, MATCH_MASK
, NULL
, &recv_handle
);
731 rc
= mx_wait(endpoint
, &recv_handle
, MX_INFINITE
, &status
, &result
);
732 } while (rc
!=MX_SUCCESS
|| result
== 0);
736 /* happen at node level */
743 int numnodes
= CmiNumNodes();
744 if (CmiMyRank() == 0) {
745 /* every one send to pe 0 */
746 if (CmiMyNode() != 0) {
747 sendBarrierMessage(0);
749 /* printf("[%d] HERE\n", CmiMyPe()); */
750 if (CmiMyNode() == 0)
752 for (count
= 1; count
< numnodes
; count
++)
754 recvBarrierMessage();
757 for (i
=1; i
<=BROADCAST_SPANNING_FACTOR
; i
++) {
759 if (p
> numnodes
- 1) break;
760 /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
761 sendBarrierMessage(p
);
764 /* non 0 node waiting */
765 if (CmiMyNode() != 0)
767 recvBarrierMessage();
768 for (i
=1; i
<=BROADCAST_SPANNING_FACTOR
; i
++) {
770 p
= BROADCAST_SPANNING_FACTOR
*p
+ i
;
771 if (p
> numnodes
- 1) break;
773 /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
774 sendBarrierMessage(p
);
779 /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
783 /* everyone sends a message to pe 0 and go on */
788 if (CmiMyRank() == 0) {
790 sendBarrierMessage(0);
793 for (i
=0; i
<CmiNumNodes()-1; i
++)
795 recvBarrierMessage();
803 /***********************************************************************
806 * This function initializes the GM board. Set receive buffer
808 ***********************************************************************/
812 void CmiMachineInit(char **argv
)
814 MACHSTATE(3,"CmiMachineInit {");
817 /* standalone mode */
818 if (dataport
== -1) return;
821 if (rc
!= MX_SUCCESS
) {
822 MACHSTATE1(3," mx_init returns %d", rc
);
823 printf("Cannot open MX library (does the machine have a GM card?)\n");
827 rc
= mx_open_endpoint(MX_ANY_NIC
, MX_ANY_ENDPOINT
, MX_FILTER
, 0, 0, &endpoint
);
828 if (rc
!= MX_SUCCESS
) {
829 MACHSTATE1(3," open endpoint address returns %d", rc
);
830 printf("Cannot open endpoint address\n");
834 /* get endpoint address of local endpoint */
835 rc
= mx_get_endpoint_addr(endpoint
, &endpoint_addr
);
836 if (rc
!= MX_SUCCESS
) {
837 MACHSTATE1(3," get endpoint address returns %d", rc
);
838 printf("Cannot get endpoint address\n");
842 /* get NIC id and endpoint id */
843 rc
= mx_decompose_endpoint_addr(endpoint_addr
, &Cmi_nic_id
, (uint32_t*)&Cmi_mach_id
);
844 if (rc
!= MX_SUCCESS
) {
845 MACHSTATE1(3," mx_decompose_endpoint returns %d", rc
);
846 printf("Cannot decompose endpoint address\n");
850 dataport
= 1; /* fake it so that charmrun checking won't fail */
852 MATCH_FILTER
&= Cmi_charmrun_pid
;
854 Cmi_dgram_max_data
= 1024-DGRAM_HEADER_SIZE
;
856 #if MX_ACTIVE_MESSAGE
857 mx_register_unexp_callback(endpoint
, recv_callback
, NULL
);
860 MACHSTATE(3,"} CmiMachineInit");
863 void CmiMXMakeConnection();
865 void CmiCommunicationInit(char **argv
)
867 CmiMXMakeConnection();
872 MACHSTATE(3, "MachineExit {");
875 rc
= mx_close_endpoint(endpoint
);
877 MACHSTATE1(3, "mx_close_endpoint returns %d", rc
);
878 printf("Can't do mx_close_endpoint\n");
884 MACHSTATE1(3, "mx_finalize returns %d", rc
);
885 printf("Can't do mx_finalize\n");
889 MACHSTATE(3, "} MachineExit");
892 /* make sure other gm nodes are accessible in routing table */
893 void CmiMXMakeConnection()
897 if (Cmi_charmrun_pid
== 0 && endpoint
== NULL
) return;
898 if (endpoint
== NULL
) machine_exit(1);
899 MACHSTATE(3,"CmiMXMakeConnection {");
900 for (i
=0; i
<_Cmi_numnodes
; i
++) {
903 skt_print_ip(ip_str
, nodes
[i
].IP
);
904 rc
= mx_connect(endpoint
, nodes
[i
].nic_id
, nodes
[i
].mach_id
, MX_FILTER
, MX_INFINITE
, &nodes
[i
].endpoint_addr
);
905 if (rc
!= MX_SUCCESS
) {
906 CmiPrintf("Error> mx node %d can't contact node %d. \n", CmiMyPe(), i
);
910 if (doabort
) CmiAbort("CmiMXMakeConnection");
911 MACHSTATE(3,"}CmiMXMakeConnection");
914 static const char *getErrorMsg(mx_return_t rc
)
919 case MX_SUCCESS: return "MX_SUCCESS";
920 case MX_NO_RESOURCES: return "MX_NO_RESOURCES";
922 return "Unknown MX error message";
924 return mx_strerror(rc
);
927 static void processStatusCode(mx_status_t status
){
928 const char *str
= mx_strstatus(status
.code
);
929 CmiPrintf("processStatusCode: %s\n", str
);
930 MACHSTATE1(4, "%s", str
);