netlrts: broadcasts are now handled by the common core code
[charm.git] / src / arch / netlrts / machine-mx.c
blob5bb69ad280d2665be8c34b4ce62e5a114dcdcb4e
2 /** @file
3 * Myrinet MX API implementation of Converse NET version
4 * @ingroup NET
5 * contains only MX API specific code for:
6 * - CmiMachineInit()
7 * - CmiCommunicationInit()
8 * - CmiNotifyIdle()
9 * - DeliverViaNetwork()
10 * - CommunicationServer()
11 * - MachineExit()
13 written by
14 Yan Shi, yanshi@uiuc.edu 2/1/2006
15 Gengbin Zheng, gzheng@uiuc.edu 2/3/2006
17 ChangeLog:
18 * 2/3/2006: Gengbin Zheng
19 implemented packetization, and fix a bug related to buffer reuse/change
20 in pending send
21 * 2/7/2006: Gengbin Zheng
22 implement active message mode using callback
23 short message pingpong time improved by 0.5us
24 * 2/8/2006: Gengbin Zheng
25 implement buffering of future messages (I haven't seen out-of-order
26 messages so far, but it may happen)
27 Using POOL relies on charmrun set MX_MONOTHREAD=1.
30 /**
31 * @addtogroup NET
32 * @{
35 /* use unexp callback */
36 #define MX_ACTIVE_MESSAGE 1
38 /*#define CMK_USE_CHECKSUM 0*/
40 /* default as in busywaiting mode */
41 #undef CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
42 #undef CMK_WHEN_PROCESSOR_IDLE_USLEEP
43 #define CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT 1
44 #define CMK_WHEN_PROCESSOR_IDLE_USLEEP 0
47 /******************************************************************************
49 * Send messages pending queue (used internally)
51 *****************************************************************************/
53 typedef struct PendingSentMsgStruct
55 OutgoingMsg ogm;
56 char *data;
57 struct PendingSentMsgStruct *next;
58 mx_request_t handle;
59 int flag; /* used for active message mode */
61 *PendingSentMsg;
63 #define CMK_PMPOOL 1
65 #if CMK_PMPOOL
66 #define MAXPMS 200
67 static PendingSentMsg pmpool[MAXPMS];
68 static int pmNums = 0;
70 #define putPool(pm) { \
71 if (pmNums == MAXPMS) free(pm); \
72 else pmpool[pmNums++] = pm; }
74 #define getPool(pm) { \
75 if (pmNums == 0) {pm = (PendingSentMsg)malloc(sizeof(struct PendingSentMsgStruct));} \
76 else { pm = pmpool[--pmNums]; }\
78 #else
79 #define putPool(pm) { free(pm); }
80 #define getPool(pm) { pm = (PendingSentMsg)malloc(sizeof(struct PendingSentMsgStruct)); _MEMCHECK(pm);}
81 #endif
83 static PendingSentMsg sent_handles=NULL; /* head of queue */
84 static PendingSentMsg sent_handles_end=NULL; /* end of the queue */
86 #define NewPendingSentMsg(pm, ogm) \
87 { getPool(pm); \
88 pm->next=NULL; pm->ogm=ogm; pm->data=data; \
89 MACHSTATE1(1,"alloc msg %p",pm);\
92 #define InsertPendingSentMsg(pm) \
93 { if(sent_handles_end==NULL) {sent_handles=pm;} \
94 else {sent_handles_end->next=pm;} \
95 sent_handles_end=pm; MACHSTATE(1,"Insert done");}
97 #define FreePendingSentMsg(pm) \
98 { sent_handles=pm->next; \
99 if (sent_handles == NULL) sent_handles_end = NULL; \
100 if (pm->ogm) {pm->ogm->refcount--; GarbageCollectMsg(pm->ogm);} \
101 else CmiFree(pm->data); \
102 putPool(pm); }
104 CmiUInt8 MATCH_FILTER = 0x11111111FFFFFFFFLL;
105 CmiUInt8 MATCH_MASK = 0xffffffffffffffffLL;
107 static int processMessage(char *msg, int len);
108 static const char *getErrorMsg(mx_return_t rc);
109 static void processStatusCode(mx_status_t status);
110 static void PumpMsgs(int getone);
111 static void ReleaseSentMsgs(void);
112 #if MX_ACTIVE_MESSAGE
113 static void PumpEvents(int getone);
114 static volatile int gotone = 0;
115 #endif
117 /******************************************************************************
119 * CmiNotifyIdle()-- wait until a packet comes in
121 *****************************************************************************/
/* Idle-state placeholder: this machine layer keeps no per-idle state. */
typedef struct {
  char none;
} CmiIdleState;

/* There is no idle state to hand out, so callers always receive NULL. */
static CmiIdleState *CmiNotifyGetState(void)
{
  return NULL;
}
128 static void CmiNotifyStillIdle(CmiIdleState *s)
130 int sleep = 1;
131 MACHSTATE(1,"CmiNotifyStillIdle {");
132 #if 0
133 CommunicationServer(0, COMM_SERVER_FROM_WORKER);
134 #else
135 #if MX_ACTIVE_MESSAGE
136 CmiCommLock();
137 PumpEvents(1);
138 CmiCommUnlock();
139 #else
140 #if CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
141 sleep = 0;
142 #endif
143 CmiCommLock();
144 ReleaseSentMsgs();
145 PumpMsgs(sleep); /* busy waiting */
146 CmiCommUnlock();
147 #endif
148 #endif
149 MACHSTATE(1,"} CmiNotifyStillIdle");
/* Public idle hook: performs one idle-time communication step, stateless. */
void CmiNotifyIdle(void)
{
  CmiNotifyStillIdle(NULL);
}
156 static void CmiNotifyBeginIdle(CmiIdleState *s)
158 CmiNotifyStillIdle(s);
161 /****************************************************************************
163 * CheckSocketsReady
165 * Checks both sockets to see which are readable and which are writeable.
166 * We check all these things at the same time since this can be done for
167 * free with ``select.'' The result is stored in global variables, since
168 * this is essentially global state information and several routines need it.
170 ***************************************************************************/
172 int CheckSocketsReady(int withDelayMs)
174 int nreadable;
175 CMK_PIPE_DECL(withDelayMs);
177 CmiStdoutAdd(CMK_PIPE_SUB);
178 if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
180 nreadable=CMK_PIPE_CALL();
181 ctrlskt_ready_read = 0;
182 dataskt_ready_read = 0;
183 dataskt_ready_write = 0;
185 if (nreadable == 0) {
186 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
187 return nreadable;
189 if (nreadable==-1) {
190 CMK_PIPE_CHECKERR();
191 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
192 return CheckSocketsReady(0);
194 CmiStdoutCheck(CMK_PIPE_SUB);
195 if (Cmi_charmrun_fd!=-1)
196 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
197 MACHSTATE(1,"} CheckSocketsReady")
198 return nreadable;
201 /***********************************************************************
202 * CommunicationServer()
204 * This function does the scheduling of the tasks related to the
205 * message sends and receives.
206 * It first checks the charmrun port for messages, and then polls MX events
207 * for send completions and incoming messages.
209 ***********************************************************************/
211 /* always called from interrupt */
212 static void ServiceCharmrun_nolock()
214 int again = 1;
215 MACHSTATE(2,"ServiceCharmrun_nolock begin {")
216 while (again)
218 again = 0;
219 CheckSocketsReady(0);
220 if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
221 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
223 MACHSTATE(2,"} ServiceCharmrun_nolock end")
226 static void PumpMsgs(int getone) {
227 mx_return_t rc;
228 mx_status_t status;
229 uint32_t result;
230 mx_segment_t buffer_desc;
231 mx_request_t recv_handle;
233 MACHSTATE1(2,"PumpMsgs(%d) {", getone);
234 while (1) {
235 if (getone)
236 rc = mx_probe(endpoint, 1, MATCH_FILTER, MATCH_MASK, &status, &result);
237 else
238 rc = mx_iprobe(endpoint, MATCH_FILTER, MATCH_MASK, &status, &result);
239 if(rc != MX_SUCCESS) {
240 const char *errmsg = getErrorMsg(rc);
241 CmiPrintf("mx_iprobe error: %s\n", errmsg);
242 CmiAbort("mx_iprobe Abort");
244 if (result == 0) { /* no incoming */
245 break;
247 MACHSTATE(2,"PumpMsgs recv one");
248 buffer_desc.segment_length = status.msg_length;
249 buffer_desc.segment_ptr = (char *) CmiAlloc(status.msg_length);
250 MACHSTATE(1,"Non-blocking receive {")
251 MACHSTATE1(1," size %d", status.msg_length);
252 rc = mx_irecv(endpoint, &buffer_desc, 1, MATCH_FILTER, MATCH_MASK, NULL, &recv_handle);
253 if (rc != MX_SUCCESS) {
254 const char *errmsg = getErrorMsg(rc);
255 CmiPrintf("mx_irecv error: %s\n", errmsg);
256 CmiAbort("Abort");
258 MACHSTATE1(1,"} Non-blocking receive return %d", rc);
259 again:
260 rc = mx_wait(endpoint, &recv_handle, MX_INFINITE, &status, &result);
261 /*rc = mx_test(endpoint, &recv_handle, &status, &result);*/
262 MACHSTATE3(1,"mx_wait return rc=%d result=%d status=%d", rc, result, status.code);
263 if (rc != MX_SUCCESS) {
264 const char *errmsg = getErrorMsg(rc);
265 CmiPrintf("mx_wait error: %s\n", errmsg);
266 CmiAbort("Abort");
268 if(result==0) {
269 CmiPrintf("mx_wait error: TIME OUT\n");
270 goto again;
272 else {
273 processMessage(buffer_desc.segment_ptr, buffer_desc.segment_length);
275 if (getone) break;
276 } /* end while */
277 MACHSTATE1(2,"} PumpMsgs(%d)", getone);
280 #if MX_ACTIVE_MESSAGE
281 static void PumpEvents(int getone) {
282 mx_return_t rc;
283 mx_status_t status;
284 uint32_t result;
285 mx_segment_t buffer_desc;
286 mx_request_t recv_handle;
288 while (1) {
289 rc = mx_ipeek(endpoint, &recv_handle, &result);
290 if (rc != MX_SUCCESS) {
291 const char *errmsg = getErrorMsg(rc);
292 CmiPrintf("mx_ipeek error: %s\n", errmsg);
293 CmiAbort("Abort");
295 if (result == 0) break;
296 rc = mx_test(endpoint, &recv_handle, &status, &result);
297 /*rc = mx_wait(endpoint, &recv_handle, MX_INFINITE, &status, &result);*/
298 if (rc != MX_SUCCESS) {
299 const char *errmsg = getErrorMsg(rc);
300 CmiPrintf("mx_wait error: %s\n", errmsg);
301 CmiAbort("Abort");
303 if(result==0) {
304 CmiAbort("mx_test or wait: TIME OUT\n"); /* this should never happen */
306 else {
307 PendingSentMsg pm = (PendingSentMsg)status.context;
308 if (pm->flag == 1) { /* send */
309 if (pm->ogm) {pm->ogm->refcount--; GarbageCollectMsg(pm->ogm);}
310 else CmiFree(pm->data);
312 else if (pm->flag == 2) { /* recv */
313 #if MX_ACTIVE_MESSAGE
314 if (status.msg_length == 4) {
315 gotone ++;
316 CmiFree(pm->data);
318 else
319 #endif
320 processMessage(pm->data, status.msg_length);
322 else {
323 CmiAbort("Invalid PendingSentMsg!");
325 putPool(pm);
327 if (getone) break;
331 /* active message model, default */
332 void recv_callback(void * context, uint64_t match_info, int length)
334 mx_segment_t buffer_desc;
335 mx_request_t recv_handle;
336 mx_return_t rc;
337 mx_status_t status;
338 uint32_t result;
339 PendingSentMsg pm;
341 buffer_desc.segment_length = length;
342 buffer_desc.segment_ptr = (char *) CmiAlloc(length);
343 getPool(pm);
344 pm->flag = 2;
345 pm->data = buffer_desc.segment_ptr;
346 if (MATCH_FILTER != match_info) {
347 CmiAbort("Invalid match_info");
349 rc = mx_irecv(endpoint, &buffer_desc, 1, MATCH_FILTER, MATCH_MASK, pm, &recv_handle);
350 if (rc != MX_SUCCESS) {
351 const char *errmsg = getErrorMsg(rc);
352 CmiPrintf("mx_irecv error: %s\n", errmsg);
353 CmiAbort("Abort");
355 if (1) {
356 rc = mx_test(endpoint, &recv_handle, &status, &result);
357 if (rc != MX_SUCCESS) {
358 const char *errmsg = getErrorMsg(rc);
359 CmiPrintf("mx_wait error: %s\n", errmsg);
360 CmiAbort("Abort");
362 if(result==0) {
363 return;
365 else {
366 processStatusCode(status);
367 CmiPrintf("PUSH HERE\n");
368 processMessage(pm->data, status.msg_length);
369 putPool(pm);
373 #endif
/*
 * Non-blocking completion test of one MX request.
 * `status` and `result` are lvalues that receive the outcome; a failing
 * mx_test call is fatal.
 */
#define test_send_complete(handle, status, result) \
  { \
    mx_return_t test_rc_ = mx_test(endpoint, &(handle), &(status), &(result)); \
    if (test_rc_ != MX_SUCCESS) { \
      MACHSTATE1(3," mx_test returns %d", test_rc_); \
      CmiAbort("mx_test failed\n"); \
    } \
  }
385 static void ReleaseSentMsgs(void) {
386 MACHSTATE(2,"ReleaseSentMsgs {");
387 mx_return_t rc;
388 mx_status_t status;
389 unsigned int result;
390 PendingSentMsg next, pm = sent_handles;
391 while (pm!=NULL) {
392 test_send_complete(pm->handle, status, result);
393 next = pm->next;
394 if(result!=0 && status.code == MX_STATUS_SUCCESS) {
395 MACHSTATE1(2,"Sent complete. Free sent msg size %d", status.msg_length);
396 FreePendingSentMsg(pm);
398 else
399 break;
400 pm = next;
402 MACHSTATE(2,"} ReleaseSentMsgs");
405 static void CommunicationServer_nolock(int withDelayMs) {
406 if (endpoint == NULL) return;
407 MACHSTATE(2,"CommunicationServer_nolock start {")
408 #if MX_ACTIVE_MESSAGE
409 PumpEvents(0);
410 #else
411 PumpMsgs(0);
412 ReleaseSentMsgs();
413 #endif
414 MACHSTATE(2,"}CommunicationServer_nolock end");
418 0: from smp thread
419 1: from interrupt
420 2: from worker thread
421 Note in netpoll mode, charmrun service is only performed in interrupt,
422 pingCharmrun is from sig alarm, so it is lock free
424 static void CommunicationServerNet(int withDelayMs, int where)
426 /* standalone mode */
427 if (Cmi_charmrun_pid == 0 && endpoint == NULL) return;
429 MACHSTATE2(2,"CommunicationServerNet(%d) from %d {",withDelayMs, where)
431 if (where == COMM_SERVER_FROM_WORKER && machine_initiated_shutdown) {
432 /* Converse exit, wait for pingCharm to quit */
433 return;
436 if (where == COMM_SERVER_FROM_INTERRUPT) {
437 /* don't service charmrun if converse exits, this fixed a hang bug */
438 if (!machine_initiated_shutdown) ServiceCharmrun_nolock();
439 return;
441 else if (where == COMM_SERVER_FROM_SMP || where == COMM_SERVER_FROM_WORKER) {
442 if (machine_initiated_shutdown) ServiceCharmrun_nolock(); /* to exit */
445 LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
447 CmiCommLock();
448 CommunicationServer_nolock(withDelayMs);
449 CmiCommUnlock();
451 #if CMK_IMMEDIATE_MSG
452 if (where == COMM_SERVER_FROM_SMP)
453 CmiHandleImmediate();
454 #endif
456 MACHSTATE(2,"} CommunicationServerNet")
459 void processFutureMessages(OtherNode node)
461 if (!CdsFifo_Empty(node->futureMsgs)) {
462 int len = CdsFifo_Length(node->futureMsgs);
463 CmiPrintf("[%d] processFutureMessages %d\n", CmiMyPe(), len);
464 int i=0;
465 while (i<len) {
466 FutureMessage f = (FutureMessage)CdsFifo_Dequeue(node->futureMsgs);
467 int status = processMessage(f->msg, f->len);
468 free(f);
469 i++;
474 static int processMessage(char *msg, int len)
476 char *newmsg;
477 int rank, srcpe, seqno, magic, i;
478 unsigned int broot;
479 int size;
480 unsigned char checksum;
482 if (len >= DGRAM_HEADER_SIZE) {
483 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
485 MACHSTATE2(1, "Break header Cmi-charmrun_id=%d, magic=%d", Cmi_charmrun_pid, magic);
486 MACHSTATE3(1, "srcpe=%d, seqno=%d, rank=%d", srcpe, seqno, rank);
488 #ifdef CMK_USE_CHECKSUM
489 checksum = computeCheckSum(msg, len);
490 if (checksum == 0)
491 #else
492 if (magic == (Cmi_charmrun_pid&DGRAM_MAGIC_MASK))
493 #endif
495 OtherNode node = nodes_by_pe[srcpe];
496 /* check seqno */
497 if (seqno == node->recv_expect) {
498 node->recv_expect = ((seqno+1)&DGRAM_SEQNO_MASK);
500 else if (seqno < node->recv_expect) {
501 CmiPrintf("[%d] Warning: Past packet received from PE %d (expecting: %d seqno: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
502 CmiPrintf("\n\n\t\t[%d] packet ignored!\n\n");
503 return 0;
505 else {
506 CmiPrintf("[%d] Error detected - Packet out of order from PE %d (expecting: %d got: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
508 CmiAbort("\n\n\t\tPacket out of order!!\n\n");
510 FutureMessage f = (FutureMessage)malloc(sizeof(struct FutureMessageStruct));
511 f->msg = msg;
512 f->len = len;
513 CdsFifo_Enqueue(node->futureMsgs, f);
514 return 0;
517 newmsg = node->asm_msg;
518 if (newmsg == 0) {
519 size = CmiMsgHeaderGetLength(msg);
520 if (size != len) {
521 newmsg = (char *)CmiAlloc(size);
522 _MEMCHECK(newmsg);
523 if (len > size) {
524 CmiPrintf("size: %d, len:%d.\n", size, len);
525 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
527 memcpy(newmsg, msg, len);
528 CmiFree(msg); /* free original msg */
530 else
531 newmsg = msg;
532 node->asm_rank = rank;
533 node->asm_total = size;
534 node->asm_fill = len;
535 node->asm_msg = newmsg;
537 else {
538 size = len - DGRAM_HEADER_SIZE;
539 if (node->asm_fill+size > node->asm_total) {
540 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
541 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
543 memcpy(newmsg + node->asm_fill, msg+DGRAM_HEADER_SIZE, size);
544 CmiFree(msg); /* free original msg */
545 node->asm_fill += size;
548 /* get a full packet */
549 if (node->asm_fill == node->asm_total) {
550 switch (rank) {
551 case DGRAM_BROADCAST: {
552 for (i=1; i<_Cmi_mynodesize; i++)
553 CmiPushPE(i, CopyMsg(newmsg, node->asm_total));
554 CmiPushPE(0, newmsg);
555 break;
557 #if CMK_NODE_QUEUE_AVAILABLE
558 case DGRAM_NODEBROADCAST:
559 case DGRAM_NODEMESSAGE: {
560 CmiPushNode(newmsg);
561 break;
563 #endif
564 default:
565 CmiPushPE(rank, newmsg);
567 node->asm_msg = 0;
568 /* do it after integration - the following function may re-entrant */
569 #if CMK_BROADCAST_SPANNING_TREE
570 if (rank == DGRAM_BROADCAST
571 #if CMK_NODE_QUEUE_AVAILABLE
572 || rank == DGRAM_NODEBROADCAST
573 #endif
575 SendSpanningChildren(NULL, 0, node->asm_total, newmsg, broot, rank);
576 #elif CMK_BROADCAST_HYPERCUBE
577 if (rank == DGRAM_BROADCAST
578 #if CMK_NODE_QUEUE_AVAILABLE
579 || rank == DGRAM_NODEBROADCAST
580 #endif
582 SendHypercube(NULL, 0, node->asm_total, newmsg, broot, rank);
583 #endif
585 processFutureMessages(node);
587 else { /* checksum failed */
588 #ifdef CMK_USE_CHECKSUM
589 CmiPrintf("[%d] message ignored: checksum (%d) not 0!\n", CmiMyPe(), checksum);
590 #else
591 CmiPrintf("[%d] message ignored: magic not agree:%d != %d!\n",
592 CmiMyPe(), magic, Cmi_charmrun_pid&DGRAM_MAGIC_MASK);
593 #endif
594 CmiPrintf("[%d] recved: rank:%d src:%d magic:%d seqno:%d len:%d\n", CmiMyPe(), rank, srcpe, magic, seqno,
595 len);
598 else {
599 CmiPrintf("[%d] message ignored: size is too small: %d!\n", CmiMyPe(), len);
600 CmiPrintf("[%d] possible size: %d\n", CmiMsgHeaderGetLength(msg));
603 return 1;
606 /***********************************************************************
607 * DeliverViaNetwork()
609 * This function is responsible for all non-local transmission. It
610 * first allocates a send token; if that fails, it puts the message on the
611 * pending message queue, otherwise it invokes the GM send.
612 ***********************************************************************/
614 void EnqueueOutgoingDgram
615 (OutgoingMsg ogm, char *ptr, int dlen, OtherNode node, int rank, int broot, int copy)
617 int size, len, seqno;
618 mx_return_t rc;
619 mx_request_t sent_handle;
620 mx_segment_t buffer_desc;
621 uint32_t result;
622 char *data;
624 len = dlen + DGRAM_HEADER_SIZE;;
626 if (copy) {
627 data = CopyMsg(ptr-DGRAM_HEADER_SIZE, len);
629 else {
630 data = ptr-DGRAM_HEADER_SIZE;
631 ogm->refcount++;
634 seqno = node->send_next;
635 MACHSTATE5(1, "[%d] SEQNO: %d to node %d rank: %d %d", CmiMyPe(), seqno, node-nodes, rank, broot);
636 DgramHeaderMake(data, rank, ogm->src, Cmi_charmrun_pid, seqno, broot);
637 node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
638 #ifdef CMK_USE_CHECKSUM
640 DgramHeader *head = (DgramHeader *)data;
641 head->magic ^= computeCheckSum(data, len);
643 #endif
645 MACHSTATE1(2, "EnqueueOutgoingDgram { len=%d", len);
646 /* MX will put outgoing message in queue and progress to send */
647 /* Note: Assume that MX provides unlimited buffers
648 so no user maintain is required */
649 buffer_desc.segment_ptr = data;
650 buffer_desc.segment_length = len;
651 PendingSentMsg pm;
652 if (copy) ogm = NULL;
653 NewPendingSentMsg(pm, ogm);
654 pm->flag = 1;
655 rc = mx_isend(endpoint, &buffer_desc, 1, node->endpoint_addr, MATCH_FILTER, pm, &(pm->handle));
656 if (rc != MX_SUCCESS) {
657 MACHSTATE1(3," mx_isend returns %d", rc);
658 CmiAbort("mx_isend failed\n");
660 #if !MX_ACTIVE_MESSAGE
661 InsertPendingSentMsg(pm);
662 #endif
663 MACHSTATE(2, "} EnqueueOutgoingDgram");
666 /* can not guarantee that buffer is not altered after return, so it is not
667 safe */
668 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy)
670 int size; char *data;
672 size = ogm->size - DGRAM_HEADER_SIZE;
673 data = ogm->data + DGRAM_HEADER_SIZE;
675 MACHSTATE3(2, "DeliverViaNetwork { : size:%d, to node mach_id=%d, nic=%ld", ogm->size, node->mach_id, node->nic_id);
677 while (size > Cmi_dgram_max_data) {
678 copy = 1; /* since we are packetizing, we need to copy anyway now */
679 EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot, copy);
680 data += Cmi_dgram_max_data;
681 size -= Cmi_dgram_max_data;
683 if (size>0) EnqueueOutgoingDgram(ogm, data, size, node, rank, broot, copy);
685 MACHSTATE(2, "} DeliverViaNetwork");
688 static void sendBarrierMessage(int pe)
690 mx_request_t send_handle;
691 mx_segment_t buffer_desc;
692 mx_return_t rc;
693 mx_status_t status;
694 uint32_t result;
695 char msg[4];
697 OtherNode node = nodes + pe;
698 buffer_desc.segment_ptr = msg;
699 buffer_desc.segment_length = 4;
700 rc = mx_isend(endpoint, &buffer_desc, 1, node->endpoint_addr, MATCH_FILTER, NULL, &send_handle);
701 do {
702 rc = mx_test(endpoint, &send_handle, &status, &result);
703 } while (rc != MX_SUCCESS || result==0);
706 static void recvBarrierMessage()
708 mx_segment_t buffer_desc;
709 char msg[4];
710 mx_return_t rc;
711 mx_status_t status;
712 mx_request_t recv_handle;
713 uint32_t result;
715 #if MX_ACTIVE_MESSAGE
716 while (gotone == 0) {
717 mx_progress(endpoint);
718 PumpEvents(1);
720 gotone--;
721 #else
722 do {
723 rc = mx_probe(endpoint, 100, MATCH_FILTER, MATCH_MASK, &status, &result);
724 } while (result == 0);
725 CmiAssert(status.msg_length == 4);
727 buffer_desc.segment_length = 4;
728 buffer_desc.segment_ptr = msg;
729 rc = mx_irecv(endpoint, &buffer_desc, 1, MATCH_FILTER, MATCH_MASK, NULL, &recv_handle);
730 do {
731 rc = mx_wait(endpoint, &recv_handle, MX_INFINITE, &status, &result);
732 } while (rc!=MX_SUCCESS || result == 0);
733 #endif
736 /* happen at node level */
737 int CmiBarrier()
739 int len, size, i;
740 int status;
741 int count = 0;
742 OtherNode node;
743 int numnodes = CmiNumNodes();
744 if (CmiMyRank() == 0) {
745 /* every one send to pe 0 */
746 if (CmiMyNode() != 0) {
747 sendBarrierMessage(0);
749 /* printf("[%d] HERE\n", CmiMyPe()); */
750 if (CmiMyNode() == 0)
752 for (count = 1; count < numnodes; count ++)
754 recvBarrierMessage();
756 /* pe 0 broadcast */
757 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
758 int p = i;
759 if (p > numnodes - 1) break;
760 /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
761 sendBarrierMessage(p);
764 /* non 0 node waiting */
765 if (CmiMyNode() != 0)
767 recvBarrierMessage();
768 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
769 int p = CmiMyNode();
770 p = BROADCAST_SPANNING_FACTOR*p + i;
771 if (p > numnodes - 1) break;
772 p = p%numnodes;
773 /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
774 sendBarrierMessage(p);
778 CmiNodeAllBarrier();
779 /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
780 return 0;
/* everyone sends a message to pe 0 and go on */
/*
 * One-sided barrier: rank 0 of every non-zero node sends a token to node 0
 * and continues; rank 0 of node 0 waits for a token from each other node.
 * All ranks then meet in the node-local barrier.  Always returns 0.
 */
int CmiBarrierZero()
{
  if (CmiMyRank() == 0) {
    if (CmiMyNode() != 0) {
      sendBarrierMessage(0);
    }
    else {
      int received = 0;
      while (received < CmiNumNodes()-1) {
        recvBarrierMessage();
        received++;
      }
    }
  }
  CmiNodeAllBarrier();
  return 0;
}
803 /***********************************************************************
804 * CmiMachineInit()
806 * This function initializes the GM board and sets the receive buffer.
808 ***********************************************************************/
810 static int maxsize;
812 void CmiMachineInit(char **argv)
814 MACHSTATE(3,"CmiMachineInit {");
815 mx_return_t rc;
816 endpoint = NULL;
817 /* standalone mode */
818 if (dataport == -1) return;
820 rc = mx_init();
821 if (rc != MX_SUCCESS) {
822 MACHSTATE1(3," mx_init returns %d", rc);
823 printf("Cannot open MX library (does the machine have a GM card?)\n");
824 return;
827 rc = mx_open_endpoint(MX_ANY_NIC, MX_ANY_ENDPOINT, MX_FILTER, 0, 0, &endpoint);
828 if (rc != MX_SUCCESS) {
829 MACHSTATE1(3," open endpoint address returns %d", rc);
830 printf("Cannot open endpoint address\n");
831 return;
834 /* get endpoint address of local endpoint */
835 rc = mx_get_endpoint_addr(endpoint, &endpoint_addr);
836 if (rc != MX_SUCCESS) {
837 MACHSTATE1(3," get endpoint address returns %d", rc);
838 printf("Cannot get endpoint address\n");
839 return;
842 /* get NIC id and endpoint id */
843 rc = mx_decompose_endpoint_addr(endpoint_addr, &Cmi_nic_id, (uint32_t*)&Cmi_mach_id);
844 if (rc != MX_SUCCESS) {
845 MACHSTATE1(3," mx_decompose_endpoint returns %d", rc);
846 printf("Cannot decompose endpoint address\n");
847 return;
850 dataport = 1; /* fake it so that charmrun checking won't fail */
852 MATCH_FILTER &= Cmi_charmrun_pid;
854 Cmi_dgram_max_data = 1024-DGRAM_HEADER_SIZE;
856 #if MX_ACTIVE_MESSAGE
857 mx_register_unexp_callback(endpoint, recv_callback, NULL);
858 #endif
860 MACHSTATE(3,"} CmiMachineInit");
void CmiMXMakeConnection();

/*
 * Second initialisation stage: connect this node's endpoint to all peers.
 * argv is not used here.
 */
void CmiCommunicationInit(char **argv)
{
  CmiMXMakeConnection();
}
870 void MachineExit()
872 MACHSTATE(3, "MachineExit {");
873 mx_return_t rc;
874 if (endpoint) {
875 rc = mx_close_endpoint(endpoint);
876 if(rc!=MX_SUCCESS){
877 MACHSTATE1(3, "mx_close_endpoint returns %d", rc);
878 printf("Can't do mx_close_endpoint\n");
879 return;
881 endpoint = NULL;
882 rc = mx_finalize();
883 if(rc!=MX_SUCCESS){
884 MACHSTATE1(3, "mx_finalize returns %d", rc);
885 printf("Can't do mx_finalize\n");
886 return;
889 MACHSTATE(3, "} MachineExit");
892 /* make sure other gm nodes are accessible in routing table */
893 void CmiMXMakeConnection()
895 int i;
896 int doabort = 0;
897 if (Cmi_charmrun_pid == 0 && endpoint == NULL) return;
898 if (endpoint == NULL) machine_exit(1);
899 MACHSTATE(3,"CmiMXMakeConnection {");
900 for (i=0; i<_Cmi_numnodes; i++) {
901 mx_return_t rc;
902 char ip_str[128];
903 skt_print_ip(ip_str, nodes[i].IP);
904 rc = mx_connect(endpoint, nodes[i].nic_id, nodes[i].mach_id, MX_FILTER, MX_INFINITE, &nodes[i].endpoint_addr);
905 if (rc != MX_SUCCESS) {
906 CmiPrintf("Error> mx node %d can't contact node %d. \n", CmiMyPe(), i);
907 doabort = 1;
910 if (doabort) CmiAbort("CmiMXMakeConnection");
911 MACHSTATE(3,"}CmiMXMakeConnection");
914 static const char *getErrorMsg(mx_return_t rc)
917 char *errmsg;
918 switch (rc) {
919 case MX_SUCCESS: return "MX_SUCCESS";
920 case MX_NO_RESOURCES: return "MX_NO_RESOURCES";
922 return "Unknown MX error message";
924 return mx_strerror(rc);
927 static void processStatusCode(mx_status_t status){
928 const char *str = mx_strstatus(status.code);
929 CmiPrintf("processStatusCode: %s\n", str);
930 MACHSTATE1(4, "%s", str);
933 /*@}*/