2 * Myrinet API GM implementation of Converse NET version
4 * contains only GM API specific code for:
6 * - CmiCommunicationInit()
8 * - DeliverViaNetwork()
9 * - CommunicationServer()
13 Gengbin Zheng, gzheng@uiuc.edu 4/22/2001
16 * 3/7/2004, Gengbin Zheng
17 implemented fault tolerant gm layer. When GM detects a catastrophic error,
18 it temporarily disables the delivery of all messages with the same sender
19 port, target port, and priority as the message that experienced the error.
20 This layer needs to properly handle the error message of GM and resume
24 1. DMAable buffer reuse;
32 #ifdef GM_API_VERSION_2_0
33 #if GM_API_VERSION >= GM_API_VERSION_2_0
38 /* default as in busywaiting mode */
39 #undef CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
40 #undef CMK_WHEN_PROCESSOR_IDLE_USLEEP
41 #define CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT 1
42 #define CMK_WHEN_PROCESSOR_IDLE_USLEEP 0
44 #ifdef __ONESIDED_IMPL
45 #ifdef __ONESIDED_GM_HARDWARE
46 #include "conv-onesided.h"
49 void handleGetSrc(void *msg
);
50 void handleGetDest(void *msg
);
54 static gm_alarm_t gmalarm
;
56 /*#define CMK_USE_CHECKSUM*/
58 /******************************************************************************
60 * GM layer network statistics collection
62 *****************************************************************************/
67 static FILE *gmf
; /* one file per processor */
68 static int *gm_stats
; /* send count for each size */
69 static int possible_streamed
= 0; /* possible streaming counts */
70 static int defrag
= 0; /* number of defragment */
71 static int maxQueueLength
= 0; /* maximum send queue length */
74 /******************************************************************************
76 * Send messages pending queue (used internally)
78 *****************************************************************************/
81 /* max length of pending messages */
82 #define MAXPENDINGSEND 500
84 typedef struct PendingMsgStruct
87 int length
; /* length of message */
88 int size
; /* size of message, usually around log2(length) */
89 int mach_id
; /* receiver machine id */
90 int dataport
; /* receiver data port */
91 int node_idx
; /* receiver pe id */
92 struct PendingMsgStruct
*next
;
96 static int pendinglen
= 0;
98 /* reuse PendingMsg memory */
99 static PendingMsg pend_freelist
=NULL
;
101 #define FreePendingMsg(d) \
102 d->next = pend_freelist;\
105 #define MallocPendingMsg(d) \
107 if (d==0) {d = ((PendingMsg)malloc(sizeof(struct PendingMsgStruct)));\
109 } else pend_freelist = d->next;
111 void enqueue_sending(char *msg
, int length
, OtherNode node
, int size
)
114 MallocPendingMsg(pm
);
117 pm
->mach_id
= node
->mach_id
;
118 pm
->dataport
= node
->dataport
;
119 pm
->node_idx
= node
-nodes
;
122 if (node
->sendhead
== NULL
) {
123 node
->sendhead
= node
->sendtail
= pm
;
126 node
->sendtail
->next
= pm
;
131 if (pendinglen
> maxQueueLength
) maxQueueLength
= pendinglen
;
135 #define peek_sending(node) (node->sendhead)
137 #define dequeue_sending(node) \
138 if (node->sendhead != NULL) { \
139 node->sendhead = node->sendhead->next; \
143 static void alarmcallback (void *context
) {
144 MACHSTATE(4,"GM Alarm callback executed")
146 static int processEvent(gm_recv_event_t
*e
);
147 static void send_progress();
148 static void alarmInterrupt(int arg
);
149 static int gmExit(int code
,const char *msg
);
150 static char *getErrorMsg(gm_status_t status
);
152 /******************************************************************************
156 *****************************************************************************/
158 #define CMK_MSGPOOL 1
160 #define MAXMSGLEN 200
162 static char* msgpool
[MAXMSGLEN
];
163 static int msgNums
= 0;
165 static int maxMsgSize
= 0;
167 #define putPool(msg) { \
168 if (msgNums == MAXMSGLEN) gm_dma_free(gmport, msg); \
169 else msgpool[msgNums++] = msg; }
171 #define getPool(msg, len) { \
172 if (msgNums == 0) msg = gm_dma_malloc(gmport, maxMsgSize); \
173 else msg = msgpool[--msgNums]; \
177 /******************************************************************************
179 * CmiNotifyIdle()-- wait until a packet comes in
181 *****************************************************************************/
186 static CmiIdleState
*CmiNotifyGetState(void) { return NULL
; }
188 static void CmiNotifyStillIdle(CmiIdleState
*s
);
190 static void CmiNotifyBeginIdle(CmiIdleState
*s
)
192 CmiNotifyStillIdle(s
);
197 static void CmiNotifyStillIdle(CmiIdleState
*s
)
199 #if CMK_SHARED_VARS_UNAVAILABLE
200 /*No comm. thread-- listen on sockets for incoming messages*/
205 #define SLEEP_USING_ALARM 0
206 #if SLEEP_USING_ALARM /*Enable the alarm, so we don't sleep forever*/
207 gm_set_alarm (gmport
, &gmalarm
, (gm_u64_t
) pollMs
*1000, alarmcallback
,
211 #if SLEEP_USING_ALARM
212 MACHSTATE(3,"Blocking on receive {")
213 e
= gm_blocking_receive_no_spin(gmport
);
214 MACHSTATE(3,"} receive returned");
216 MACHSTATE(3,"CmiNotifyStillIdle NonBlocking on receive {")
217 e
= gm_receive(gmport
);
218 MACHSTATE(3,"} CmiNotifyStillIdle nonblocking receive returned");
221 #if SLEEP_USING_ALARM /*Cancel the alarm*/
222 gm_cancel_alarm (&gmalarm
);
225 /* have to handle this event now */
227 nreadable
= processEvent(e
);
233 /*Comm. thread will listen on sockets-- just sleep*/
234 CmiIdleLock_sleep(&CmiGetState()->idle
,5);
238 void CmiNotifyIdle(void) {
239 CmiNotifyStillIdle(NULL
);
242 /****************************************************************************
246 * Checks both sockets to see which are readable and which are writeable.
247 * We check all these things at the same time since this can be done for
248 * free with ``select.'' The result is stored in global variables, since
249 * this is essentially global state information and several routines need it.
251 ***************************************************************************/
253 int CheckSocketsReady(int withDelayMs
)
256 CMK_PIPE_DECL(withDelayMs
);
258 CmiStdoutAdd(CMK_PIPE_SUB
);
259 if (Cmi_charmrun_fd
!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd
);
261 nreadable
=CMK_PIPE_CALL();
262 ctrlskt_ready_read
= 0;
263 dataskt_ready_read
= 0;
264 dataskt_ready_write
= 0;
266 if (nreadable
== 0) {
267 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
272 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
273 return CheckSocketsReady(0);
275 CmiStdoutCheck(CMK_PIPE_SUB
);
276 if (Cmi_charmrun_fd
!=-1)
277 ctrlskt_ready_read
= CMK_PIPE_CHECKREAD(Cmi_charmrun_fd
);
278 MACHSTATE(1,"} CheckSocketsReady")
282 /***********************************************************************
283 * CommunicationServer()
285 * This function does the scheduling of the tasks related to the
286 * message sends and receives.
287 * It first check the charmrun port for message, and poll the gm event
288 * for send complete and outcoming messages.
290 ***********************************************************************/
292 /* always called from interrupt */
293 static void ServiceCharmrun_nolock()
296 MACHSTATE(2,"ServiceCharmrun_nolock begin {")
300 CheckSocketsReady(0);
301 if (ctrlskt_ready_read
) { ctrl_getone(); again
=1; }
302 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
304 MACHSTATE(2,"} ServiceCharmrun_nolock end")
307 static void CommunicationServer_nolock(int withDelayMs
) {
310 MACHSTATE(2,"CommunicationServer_nolock start {")
312 MACHSTATE(3,"Non-blocking receive {")
313 e
= gm_receive(gmport
);
314 MACHSTATE(3,"} Non-blocking receive")
315 if (!processEvent(e
)) break;
317 MACHSTATE(2,"}CommunicationServer_nolock end")
323 2: from worker thread
324 Note in netpoll mode, charmrun service is only performed in interrupt,
325 pingCharmrun is from sig alarm, so it is lock free
327 static void CommunicationServerNet(int withDelayMs
, int where
)
329 /* standalone mode */
330 if (Cmi_charmrun_pid
== 0 && gmport
== NULL
) return;
332 MACHSTATE2(2,"CommunicationServer(%d) from %d {",withDelayMs
, where
)
335 /* don't service charmrun if converse exits, this fixed a hang bug */
336 if (!machine_initiated_shutdown
) ServiceCharmrun_nolock();
340 LOG(GetClock(), Cmi_nodestart
, 'I', 0, 0);
343 CommunicationServer_nolock(withDelayMs
);
346 #if CMK_IMMEDIATE_MSG
348 CmiHandleImmediate();
351 MACHSTATE(2,"} CommunicationServer")
354 static void processMessage(char *msg
, int len
)
357 int rank
, srcpe
, seqno
, magic
, i
;
360 unsigned char checksum
;
362 if (len
>= DGRAM_HEADER_SIZE
) {
363 DgramHeaderBreak(msg
, rank
, srcpe
, magic
, seqno
, broot
);
364 #ifdef CMK_USE_CHECKSUM
365 checksum
= computeCheckSum(msg
, len
);
368 if (magic
== (Cmi_charmrun_pid
&DGRAM_MAGIC_MASK
))
371 OtherNode node
= nodes_by_pe
[srcpe
];
373 if (seqno
== node
->recv_expect
) {
374 node
->recv_expect
= ((seqno
+1)&DGRAM_SEQNO_MASK
);
376 else if (seqno
< node
->recv_expect
) {
377 CmiPrintf("[%d] Warning: Past packet received from PE %d, something wrong with GM hardware? (expecting: %d seqno: %d)\n", CmiMyPe(), srcpe
, node
->recv_expect
, seqno
);
378 CmiPrintf("\n\n\t\t[%d] packet ignored!\n\n");
382 CmiPrintf("[%d] Error detected - Packet out of order from PE %d, something wrong with GM hardware? (expecting: %d got: %d)\n", CmiMyPe(), srcpe
, node
->recv_expect
, seqno
);
383 CmiAbort("\n\n\t\tPacket out of order!!\n\n");
385 newmsg
= node
->asm_msg
;
387 size
= CmiMsgHeaderGetLength(msg
);
389 CmiPrintf("size: %d, len:%d.\n", size
, len
);
390 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
392 newmsg
= (char *)CmiAlloc(size
);
394 memcpy(newmsg
, msg
, len
);
395 node
->asm_rank
= rank
;
396 node
->asm_total
= size
;
397 node
->asm_fill
= len
;
398 node
->asm_msg
= newmsg
;
400 size
= len
- DGRAM_HEADER_SIZE
;
401 if (node
->asm_fill
+size
> node
->asm_total
) {
402 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node
->asm_total
, node
->asm_fill
, len
);
403 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
405 memcpy(newmsg
+ node
->asm_fill
, msg
+DGRAM_HEADER_SIZE
, size
);
406 node
->asm_fill
+= size
;
408 /* get a full packet */
409 if (node
->asm_fill
== node
->asm_total
) {
410 int total_size
= node
->asm_total
;
413 /* do it after integration - the following function may re-entrant */
414 #if CMK_BROADCAST_SPANNING_TREE
415 if (rank
== DGRAM_BROADCAST
416 #if CMK_NODE_QUEUE_AVAILABLE
417 || rank
== DGRAM_NODEBROADCAST
420 SendSpanningChildren(NULL
, 0, total_size
, newmsg
, broot
, rank
);
421 #elif CMK_BROADCAST_HYPERCUBE
422 if (rank
== DGRAM_BROADCAST
423 #if CMK_NODE_QUEUE_AVAILABLE
424 || rank
== DGRAM_NODEBROADCAST
427 SendHypercube(NULL
, 0, total_size
, newmsg
, broot
, rank
);
431 case DGRAM_BROADCAST
: {
432 for (i
=1; i
<_Cmi_mynodesize
; i
++)
433 CmiPushPE(i
, CopyMsg(newmsg
, total_size
));
434 CmiPushPE(0, newmsg
);
437 #if CMK_NODE_QUEUE_AVAILABLE
438 case DGRAM_NODEBROADCAST
:
439 case DGRAM_NODEMESSAGE
: {
445 CmiPushPE(rank
, newmsg
);
446 } /* end of switch */
450 #ifdef CMK_USE_CHECKSUM
451 CmiPrintf("[%d] message ignored: checksum (%d) not 0!\n", CmiMyPe(), checksum
);
453 CmiPrintf("[%d] message ignored: magic not agree:%d != %d!\n",
454 CmiMyPe(), magic
, Cmi_charmrun_pid
&DGRAM_MAGIC_MASK
);
456 CmiPrintf("recved: rank:%d src:%d mag:%d seqno:%d len:%d\n", rank
, srcpe
, magic
, seqno
, len
);
460 CmiPrintf("[%d] message ignored: size is too small: %d!\n", CmiMyPe(), len
);
461 CmiPrintf("[%d] possible size: %d\n", CmiMsgHeaderGetLength(msg
));
465 /* return 1 - recv'ed 0 - no msg */
466 static int processEvent(gm_recv_event_t
*e
)
471 switch (gm_ntohc(e
->recv
.type
))
473 /* avoid copy from message to buffer */
474 case GM_FAST_PEER_RECV_EVENT
:
475 case GM_FAST_RECV_EVENT
:
476 MACHSTATE(4,"Incoming message")
477 msg
= gm_ntohp(e
->recv
.message
);
478 len
= gm_ntohl(e
->recv
.length
);
479 processMessage(msg
, len
);
482 case GM_FAST_HIGH_PEER_RECV_EVENT:
483 case GM_FAST_HIGH_RECV_EVENT:
485 case GM_HIGH_RECV_EVENT
:
487 MACHSTATE(4,"Incoming message")
488 size
= gm_ntohc(e
->recv
.size
);
489 msg
= gm_ntohp(e
->recv
.buffer
);
490 len
= gm_ntohl(e
->recv
.length
);
491 processMessage(msg
, len
);
492 gm_provide_receive_buffer(gmport
, msg
, size
, GM_HIGH_PRIORITY
);
494 case GM_NO_RECV_EVENT
:
499 MACHSTATE1(3,"Unrecognized GM event %d", gm_ntohc(e
->recv
.type
))
500 gm_unknown(gmport
, e
);
507 void drop_send_callback(struct gm_port
*p
, void *context
, gm_status_t status
)
509 PendingMsg out
= (PendingMsg
)context
;
510 void *msg
= out
->msg
;
512 printf("[%d] drop_send_callback dropped msg: %p\n", CmiMyPe(), msg
);
514 gm_dma_free(gmport
, msg
);
522 void send_callback(struct gm_port
*p
, void *context
, gm_status_t status
);
523 void drop_send_callback(struct gm_port
*p
, void *context
, gm_status_t status
)
525 PendingMsg out
= (PendingMsg
)context
;
526 void *msg
= out
->msg
;
527 gm_send_with_callback(gmport
, msg
, out
->size
, out
->length
,
528 GM_HIGH_PRIORITY
, out
->mach_id
, out
->dataport
,
533 void send_callback(struct gm_port
*p
, void *context
, gm_status_t status
)
535 PendingMsg out
= (PendingMsg
)context
;
536 void *msg
= out
->msg
;
538 OtherNode node
= nodes
+out
->node_idx
;
540 if (status
!= GM_SUCCESS
) {
541 int srcpe
, seqno
, magic
, broot
;
544 DgramHeaderBreak(msg
, rank
, srcpe
, magic
, seqno
, broot
);
545 errmsg
= getErrorMsg(status
);
546 CmiPrintf("GM Error> PE:%d send to msg %p node %d rank %d mach_id %d port %d len %d size %d failed to complete (error %d): %s\n", srcpe
, msg
, out
->node_idx
, rank
, out
->mach_id
, out
->dataport
, out
->length
, out
->size
, status
, errmsg
);
549 case GM_SEND_DROPPED
: {
550 OtherNode node
= nodes
+ out
->node_idx
;
551 if (out
->mach_id
== node
->mach_id
&& out
->dataport
== node
->dataport
) {
552 /* it not crashed, resent */
553 gm_send_with_callback(gmport
, msg
, out
->size
, out
->length
,
554 GM_HIGH_PRIORITY
, out
->mach_id
, out
->dataport
,
560 gm_drop_sends (gmport
, GM_HIGH_PRIORITY
, out
->mach_id
, out
->dataport
,
561 drop_send_callback
, out
);
565 case GM_SEND_TIMED_OUT
: {
566 CmiPrintf("gm send_callback timeout, drop sends and re-enable send. \n");
567 gm_drop_sends (gmport
, GM_HIGH_PRIORITY
, out
->mach_id
, out
->dataport
,
568 drop_send_callback
, out
);
569 node
->disable
=1; /* disable normal send */
572 case GM_SEND_DROPPED
:
573 CmiPrintf("Got DROPPED_SEND notification, resend\n");
574 gm_send_with_callback(gmport
, msg
, out
->size
, out
->length
,
575 GM_HIGH_PRIORITY
, out
->mach_id
, out
->dataport
,
579 CmiAbort("gm send_callback failed");
584 #ifdef CMK_USE_CHECKSUM
587 cksum = computeCheckSum((unsigned char*)msg, out->length);
589 CmiPrintf("[%d] Message altered during send, checksum (%d) does not agree!\n", CmiMyPe(), cksum);
590 CmiAbort("Myrinet error was detected!\n");
597 gm_dma_free(gmport
, msg
);
602 gm_free_send_token (gmport
, GM_HIGH_PRIORITY
);
605 /* send message pending in gm firmware */
607 if (node
->disable
&& node
->gm_pending
== 0) node
->disable
= 0;
609 /* since we have one free send token, start next send */
613 static void send_progress()
615 static int nextnode
= 0;
623 for (skip
=0; skip
<_Cmi_numnodes
; skip
++) {
624 node
= nodes
+nextnode
;
625 nextnode
= (nextnode
+ 1) % _Cmi_numnodes
;
626 if (node
->disable
) continue;
627 out
= peek_sending(node
);
629 if (gm_alloc_send_token(gmport
, GM_HIGH_PRIORITY
)) {
630 if (dataport
== out
->dataport
) {
631 gm_send_to_peer_with_callback(gmport
, out
->msg
, out
->size
,
632 out
->length
, GM_HIGH_PRIORITY
, out
->mach_id
,
636 gm_send_with_callback(gmport
, out
->msg
, out
->size
, out
->length
,
637 GM_HIGH_PRIORITY
, out
->mach_id
, out
->dataport
,
642 /* dequeue out, but not free it, used at callback */
643 dequeue_sending(node
);
645 gm_stats
[out
->size
] ++;
646 /* if we streaming, count how many message we possibly can combine */
649 curout
= peek_sending(node
);
653 possible_streamed
++;
660 else return; /* no send token */
667 /***********************************************************************
668 * DeliverViaNetwork()
670 * This function is responsible for all non-local transmission. It
671 * first allocate a send token, if fails, put the send message to
672 * penging message queue, otherwise invoke the GM send.
673 ***********************************************************************/
675 void EnqueueOutgoingDgram
676 (OutgoingMsg ogm
, char *ptr
, int dlen
, OtherNode node
, int rank
, int broot
)
679 int size
, len
, seqno
;
680 int alloclen
, allocSize
;
682 /* CmiPrintf("DeliverViaNetwork: size:%d\n", size); */
684 /* don't have to worry about ref count because we do copy, and
685 ogm can be free'ed right away */
686 /* ogm->refcount++; */
688 len
= dlen
+ DGRAM_HEADER_SIZE
;
690 /* allocate DMAable memory to prepare sending */
691 /* FIXME: another memory copy here from user buffer to DMAable buffer */
692 /* which however means the user buffer is untouched and can be reused */
694 buf
= (char *)gm_dma_malloc(gmport
, len
);
700 seqno
= node
->send_next
;
701 DgramHeaderMake(buf
, rank
, ogm
->src
, Cmi_charmrun_pid
, seqno
, broot
);
702 node
->send_next
= ((seqno
+1)&DGRAM_SEQNO_MASK
);
703 memcpy(buf
+DGRAM_HEADER_SIZE
, ptr
, dlen
);
704 #ifdef CMK_USE_CHECKSUM
706 DgramHeader
*head
= (DgramHeader
*)buf
;
707 head
->magic
^= computeCheckSum(buf
, len
);
710 size
= gm_min_size_for_length(len
);
712 /* if queue is not empty, enqueue msg. this is to guarantee the order */
713 if (pendinglen
!= 0) {
714 /* this potential screw up broadcast, because bcast packets can not be
715 interrupted by other sends in CommunicationServer_nolock */
716 enqueue_sending(buf
, len
, node
, size
);
719 enqueue_sending(buf
, len
, node
, size
);
723 /* copy is ignored, since we always copy */
724 void DeliverViaNetwork(OutgoingMsg ogm
, OtherNode node
, int rank
, unsigned int broot
, int copy
)
726 int size
; char *data
;
728 size
= ogm
->size
- DGRAM_HEADER_SIZE
;
729 data
= ogm
->data
+ DGRAM_HEADER_SIZE
;
731 if (size
> Cmi_dgram_max_data
) defrag
++;
733 while (size
> Cmi_dgram_max_data
) {
734 EnqueueOutgoingDgram(ogm
, data
, Cmi_dgram_max_data
, node
, rank
, broot
);
735 data
+= Cmi_dgram_max_data
;
736 size
-= Cmi_dgram_max_data
;
738 if (size
>0) EnqueueOutgoingDgram(ogm
, data
, size
, node
, rank
, broot
);
740 /* a simple flow control */
741 while (pendinglen
>= MAXPENDINGSEND
) {
742 /* pending max len exceeded, busy wait until get a token
743 Doing this surprisingly improve the performance by 2s for 200MB msg */
744 MACHSTATE(4,"Polling until token available")
745 CommunicationServer_nolock(0);
749 /* simple barrier at machine layer */
750 /* assuming no other flying messages */
751 static void send_callback_nothing(struct gm_port
*p
, void *context
, gm_status_t status
)
753 gm_dma_free(gmport
, context
);
756 static void sendBarrierMessage(int pe
)
759 char *buf
= (char *)gm_dma_malloc(gmport
, len
);
760 int size
= gm_min_size_for_length(len
);
761 OtherNode node
= nodes
+ pe
;
763 gm_send_with_callback(gmport
, buf
, size
, len
,
764 GM_HIGH_PRIORITY
, node
->mach_id
, node
->dataport
,
765 send_callback_nothing
, buf
);
768 static void recvBarrierMessage()
774 e
= gm_receive(gmport
);
775 switch (gm_ntohc(e
->recv
.type
))
777 case GM_HIGH_RECV_EVENT
:
779 MACHSTATE(4,"Incoming message")
780 size
= gm_ntohc(e
->recv
.size
);
781 msg
= gm_ntohp(e
->recv
.buffer
);
782 len
= gm_ntohl(e
->recv
.length
);
783 gm_provide_receive_buffer(gmport
, msg
, size
, GM_HIGH_PRIORITY
);
785 case GM_NO_RECV_EVENT
:
788 MACHSTATE1(3,"Unrecognized GM event %d", gm_ntohc(e
->recv
.type
))
789 gm_unknown(gmport
, e
);
794 /* happen at node level */
801 int numnodes
= CmiNumNodes();
802 if (CmiMyRank() == 0) {
803 /* every one send to pe 0 */
804 if (CmiMyNode() != 0) {
805 sendBarrierMessage(0);
807 /* printf("[%d] HERE\n", CmiMyPe()); */
808 if (CmiMyNode() == 0)
810 for (count
= 1; count
< numnodes
; count
++)
812 recvBarrierMessage();
815 for (i
=1; i
<=BROADCAST_SPANNING_FACTOR
; i
++) {
817 if (p
> numnodes
- 1) break;
818 /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
819 sendBarrierMessage(p
);
822 /* non 0 node waiting */
823 if (CmiMyNode() != 0)
825 recvBarrierMessage();
826 for (i
=1; i
<=BROADCAST_SPANNING_FACTOR
; i
++) {
828 p
= BROADCAST_SPANNING_FACTOR
*p
+ i
;
829 if (p
> numnodes
- 1) break;
831 /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
832 sendBarrierMessage(p
);
837 /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
841 /* everyone sends a message to pe 0 and go on */
846 if (CmiMyRank() == 0) {
848 sendBarrierMessage(0);
851 for (i
=0; i
<CmiNumNodes()-1; i
++)
853 recvBarrierMessage();
861 /***********************************************************************
864 * This function intialize the GM board. Set receive buffer
866 ***********************************************************************/
870 void CmiMachineInit(char **argv
)
872 int dataport_max
=16; /*number of largest GM port to check*/
879 MACHSTATE(3,"CmiMachineInit {");
882 /* standalone mode */
883 if (dataport
== -1) return;
885 /* try a few times init gm */
886 for (i
=0; i
<retry
; i
++) {
888 if (status
== GM_SUCCESS
) break;
891 if (status
!= GM_SUCCESS
) {
892 printf("Cannot open GM library (does the machine have a GM card?)\n");
893 gm_perror("gm_init", status
);
898 for (dataport
=2;dataport
<dataport_max
;dataport
++) {
900 sprintf(portname
, "converse_port%d_%d", Cmi_charmrun_pid
, _Cmi_mynode
);
902 status
= gm_open(&gmport
, device
, dataport
, portname
, GM_API_VERSION_2_0
);
904 status
= gm_open(&gmport
, device
, dataport
, portname
, GM_API_VERSION_1_1
);
906 if (status
== GM_SUCCESS
) { break; }
908 if (dataport
==dataport_max
)
909 { /* Couldn't open any GM port... */
914 /* get our node id */
915 status
= gm_get_node_id(gmport
, (unsigned int *)&Cmi_mach_id
);
916 if (status
!= GM_SUCCESS
) { gm_perror("gm_get_node_id", status
); return; }
918 gm_node_id_to_global_id(gmport
, (unsigned int)Cmi_mach_id
, (unsigned int*)&Cmi_mach_id
);
921 /* default abort will take care of gm clean up */
922 skt_set_abort(gmExit
);
924 /* set up recv buffer */
926 maxsize = gm_min_size_for_length(4096);
927 Cmi_dgram_max_data = 4096 - DGRAM_HEADER_SIZE;
930 CmiGetArgIntDesc(argv
,"+gm_maxsize",&maxsize
,"maximum packet size in rank (2^maxsize)");
933 gm_stats
= (int*)malloc((maxsize
+1) * sizeof(int));
934 for (i
=0; i
<=maxsize
; i
++) gm_stats
[i
] = 0;
937 for (i
=1; i
<=maxsize
; i
++) {
938 int len
= gm_max_length_for_size(i
);
944 else if (i
<7) num
= 4;
945 else if (i
<13) num
= 20;
946 else if (i
<17) num
= 10;
947 else if (i
>22) num
= 1;
948 for (j
=0; j
<num
; j
++) {
949 buf
= gm_dma_malloc(gmport
, len
);
951 gm_provide_receive_buffer(gmport
, buf
, i
, GM_HIGH_PRIORITY
);
954 Cmi_dgram_max_data
= maxMsgSize
- DGRAM_HEADER_SIZE
;
956 status
= gm_set_acceptable_sizes (gmport
, GM_HIGH_PRIORITY
, (1<<(maxsize
+1))-1);
958 gm_free_send_tokens (gmport
, GM_HIGH_PRIORITY
,
959 gm_num_send_tokens (gmport
));
962 msgpool
[msgNums
++] = gm_dma_malloc(gmport
, maxMsgSize
);
965 /* alarm will ping charmrun */
966 gm_initialize_alarm(&gmalarm
);
968 MACHSTATE(3,"} CmiMachineInit");
971 void CmiCommunicationInit(char **argv
)
981 sprintf(fname
, "gm-stats.%d", CmiMyPe());
982 gmf
= fopen(fname
, "w");
984 for (i
=5; i
<=maxsize
; i
++) {
985 fprintf(gmf
, "[%d] size:%d count:%d\n", mype
, i
, gm_stats
[i
]);
987 fprintf(gmf
, "[%d] max quelen: %d possible streaming: %d defrag: %d \n", mype
, maxQueueLength
, possible_streamed
, defrag
);
992 void CmiGmConvertMachineID(unsigned int *mach_id
)
997 /* skip if running without charmrun */
998 if (Cmi_charmrun_pid
== 0 && gmport
== NULL
) return;
999 status
= gm_global_id_to_node_id(gmport
, *mach_id
, &newid
);
1000 if (status
== GM_SUCCESS
) *mach_id
= newid
;
1004 /* make sure other gm nodes are accessible in routing table */
1005 void CmiCheckGmStatus()
1009 if (Cmi_charmrun_pid
== 0 && gmport
== NULL
) return;
1010 if (gmport
== NULL
) machine_exit(1);
1011 for (i
=0; i
<_Cmi_numnodes
; i
++) {
1013 char uid
[6], str
[100];
1014 unsigned int mach_id
=nodes
[i
].mach_id
;
1015 status
= gm_node_id_to_unique_id(gmport
, mach_id
, uid
);
1016 if (status
!= GM_SUCCESS
|| ( uid
[0]==0 && uid
[1]== 0
1017 && uid
[2]==0 && uid
[3]==0 && uid
[4]==0 && uid
[5]==0)) {
1018 CmiPrintf("Error> gm node %d can't contact node %d. \n", CmiMyPe(), i
);
1021 /*CmiPrintf("[%d]: %d mach:%d ip:%d %d %d %d\n", CmiMyPe(), i, mach_id, nodes[i].IP,uid[0], uid[3], uid[5]);*/
1023 if (doabort
) CmiAbort("");
1026 static int gmExit(int code
,const char *msg
)
1028 fprintf(stderr
,"Fatal socket error: code %d-- %s\n",code
,msg
);
1033 static char *getErrorMsg(gm_status_t status
)
1037 case GM_SEND_TIMED_OUT
:
1038 errmsg
= "send time out"; break;
1039 case GM_SEND_REJECTED
:
1040 errmsg
= "send rejected"; break;
1041 case GM_SEND_TARGET_NODE_UNREACHABLE
:
1042 errmsg
= "target node unreachable"; break;
1043 case GM_SEND_TARGET_PORT_CLOSED
:
1044 errmsg
= "target port closed"; break;
1045 case GM_SEND_DROPPED
:
1046 errmsg
= "send dropped"; break;
1048 errmsg
= "unknown error"; break;
1053 #ifdef __ONESIDED_IMPL
1054 #ifdef __ONESIDED_GM_HARDWARE
1057 CmiRdmaCallbackFn fn
;
1060 typedef struct CmiCb CmiCb
;
1069 typedef struct CmiRMA CmiRMA
;
1072 char core
[CmiMsgHeaderSizeBytes
];
1075 typedef struct CmiRMAMsg CmiRMAMsg
;
1078 char core
[CmiMsgHeaderSizeBytes
];
1082 unsigned int targetId
;
1083 unsigned int sourceId
;
1084 unsigned int dataport
;
1087 typedef struct RMAPutMsg RMAPutMsg
;
1089 void *CmiDMAAlloc(int size
) {
1090 void *addr
= gm_dma_calloc(gmport
, 1, gm_max_length_for_size(size
));
1091 //gm_allow_remote_memory_access(gmport);
1095 int CmiRegisterMemory(void *addr
, unsigned int size
) {
1097 status
= gm_register_memory(gmport
, addr
, size
);
1098 if (status
!= GM_SUCCESS
) {
1099 CmiPrintf("Cannot register memory at %p of size %d\n",addr
,size
);
1100 gm_perror("registerMemory", status
);
1103 //gm_allow_remote_memory_access(gmport);
1107 int CmiUnRegisterMemory(void *addr
, unsigned int size
) {
1109 status
= gm_deregister_memory(gmport
, addr
, size
);
1110 if (status
!= GM_SUCCESS
) {
1111 CmiPrintf("Cannot unregister memory at %p of size %d\n",addr
,size
);
1112 gm_perror("UnregisterMemory", status
);
1118 void put_callback(struct gm_port
*p
, void *context
, gm_status_t status
)
1120 RMAPutMsg
*out
= (RMAPutMsg
*)context
;
1121 unsigned int destMachId
= (nodes_by_pe
[out
->targetId
])->mach_id
;
1122 if (status
!= GM_SUCCESS
) {
1124 case GM_SEND_DROPPED
:
1125 CmiPrintf("Got DROPPED_PUT notification, resend\n");
1126 gm_directed_send_with_callback(gmport
,out
->Saddr
,(gm_remote_ptr_t
)(out
->Taddr
),
1127 out
->size
,GM_HIGH_PRIORITY
,
1128 destMachId
, out
->dataport
,
1132 CmiAbort("gm send_callback failed");
1135 if(out
->stat
->type
==1) {
1136 out
->stat
->ready
.completed
= 1;
1137 //the handle is active, and the user will clean it
1140 (*(out
->stat
->ready
.cb
->fn
))(out
->stat
->ready
.cb
->param
);
1141 //I do not undestand, on turing the following two frees become double frees??
1142 CmiFree(out
->stat
->ready
.cb
);
1143 CmiFree(out
->stat
); //clean up the internal handle
1145 gm_free_send_token (gmport
, GM_HIGH_PRIORITY
);
1149 void *CmiPut(unsigned int sourceId
, unsigned int targetId
, void *Saddr
, void *Taddr
, unsigned int size
) {
1150 unsigned int dataport
= (nodes_by_pe
[targetId
])->dataport
;
1151 unsigned int destMachId
= (nodes_by_pe
[targetId
])->mach_id
;
1152 RMAPutMsg
*context
= (RMAPutMsg
*)CmiAlloc(sizeof(RMAPutMsg
));
1153 context
->Saddr
= Saddr
;
1154 context
->Taddr
= Taddr
;
1155 context
->size
= size
;
1156 context
->targetId
= targetId
;
1157 context
->sourceId
= sourceId
;
1158 context
->dataport
= dataport
;
1159 context
->stat
= (CmiRMA
*)CmiAlloc(sizeof(CmiRMA
));
1160 context
->stat
->type
= 1;
1161 context
->stat
->ready
.completed
= 0;
1162 void *stat
= (void*)(context
->stat
);
1163 //get a token before the put
1164 if (gm_alloc_send_token(gmport
, GM_HIGH_PRIORITY
)) {
1166 gm_directed_send_with_callback(gmport
,Saddr
,(gm_remote_ptr_t
)Taddr
,size
,
1167 GM_HIGH_PRIORITY
,destMachId
,dataport
,
1168 put_callback
,(void*)context
);
1173 void CmiPutCb(unsigned int sourceId
, unsigned int targetId
, void *Saddr
, void *Taddr
, unsigned int size
, CmiRdmaCallbackFn fn
, void *param
) {
1174 unsigned int dataport
= (nodes_by_pe
[targetId
])->dataport
;
1175 unsigned int destMachId
= (nodes_by_pe
[targetId
])->mach_id
;
1176 RMAPutMsg
*context
= (RMAPutMsg
*)CmiAlloc(sizeof(RMAPutMsg
));
1178 context
->Saddr
= Saddr
;
1179 context
->Taddr
= Taddr
;
1180 context
->size
= size
;
1181 context
->targetId
= targetId
;
1182 context
->sourceId
= sourceId
;
1183 context
->dataport
= dataport
;
1184 context
->stat
= (CmiRMA
*)CmiAlloc(sizeof(CmiRMA
));
1185 context
->stat
->type
= 0;
1186 context
->stat
->ready
.cb
= (CmiCb
*)CmiAlloc(sizeof(CmiCb
));
1187 context
->stat
->ready
.cb
->fn
= fn
;
1188 context
->stat
->ready
.cb
->param
= param
;
1189 //get a token before the put
1190 if (gm_alloc_send_token(gmport
, GM_HIGH_PRIORITY
)) {
1192 gm_directed_send_with_callback(gmport
,Saddr
,(gm_remote_ptr_t
)Taddr
,size
,
1193 GM_HIGH_PRIORITY
,destMachId
,dataport
,
1194 put_callback
,(void*)context
);
1199 void handleGetSrc(void *msg
) {
1200 CmiRMA
* stat
= ((CmiRMAMsg
*)msg
)->stat
;
1202 stat
->ready
.completed
= 1;
1203 //the handle is active, and the user will clean it
1206 (*(stat
->ready
.cb
->fn
))(stat
->ready
.cb
->param
);
1207 //I do not undestand, on turing the following two frees become double frees??
1208 CmiFree(stat
->ready
.cb
);
1209 CmiFree(stat
); //clean up the internal handle
1215 void get_callback_dest(struct gm_port
*p
, void *context
, gm_status_t status
)
1217 RMAPutMsg
*out
= (RMAPutMsg
*)context
;
1218 int sizeRmaStat
= sizeof(CmiRMAMsg
);
1220 unsigned int srcMachId
= (nodes_by_pe
[out
->targetId
])->mach_id
;
1221 if (status
!= GM_SUCCESS
) {
1223 case GM_SEND_DROPPED
:
1224 CmiPrintf("Got DROPPED_PUT notification, resend\n");
1225 gm_directed_send_with_callback(gmport
,out
->Saddr
,(int)(out
->Taddr
),
1226 out
->size
,GM_HIGH_PRIORITY
,
1227 out
->targetId
, out
->dataport
,
1228 get_callback_dest
, out
);
1231 CmiAbort("gm send_callback failed");
1234 gm_free_send_token (gmport
, GM_HIGH_PRIORITY
);
1235 //send a message to update the context status on the source node
1236 msgRmaStat
= (void*)CmiAlloc(sizeRmaStat
);
1237 ((CmiRMAMsg
*)msgRmaStat
)->stat
= out
->stat
;
1238 CmiSetHandler(msgRmaStat
,getSrcHandler
);
1239 CmiSyncSendAndFree(out
->targetId
,sizeRmaStat
,msgRmaStat
);
1243 void handleGetDest(void *msg
) {
1244 RMAPutMsg
*context1
= (RMAPutMsg
*)msg
;
1245 RMAPutMsg
*context
= (RMAPutMsg
*)CmiAlloc(sizeof(RMAPutMsg
));
1246 unsigned int srcMachId
= (nodes_by_pe
[context1
->sourceId
])->mach_id
;
1247 context
->Saddr
= context1
->Taddr
;
1248 context
->Taddr
= context1
->Saddr
;
1249 context
->size
= context1
->size
;
1250 context
->targetId
= context1
->sourceId
;
1251 context
->sourceId
= context1
->targetId
;
1252 context
->dataport
= context1
->dataport
;
1253 context
->stat
= context1
->stat
;
1254 //get a token before the put
1255 if (gm_alloc_send_token(gmport
, GM_HIGH_PRIORITY
)) {
1257 gm_directed_send_with_callback(gmport
,context
->Saddr
,(gm_remote_ptr_t
)context
->Taddr
,context
->size
,
1258 GM_HIGH_PRIORITY
,srcMachId
,context
->dataport
,
1259 get_callback_dest
,(void*)context
);
1264 /* Send a converse message to the destination to perform a get from destination.
1265 * The destination then sends a put to write the actual message
1266 * Then it sends a converse message to the sender to update the completed flag
1268 void *CmiGet(unsigned int sourceId
, unsigned int targetId
, void *Saddr
, void *Taddr
, unsigned int size
) {
1269 int dataport
= (nodes_by_pe
[targetId
])->dataport
;
1273 sizeRma
= sizeof(RMAPutMsg
);
1274 msgRma
= (void*)CmiAlloc(sizeRma
);
1276 context
= (RMAPutMsg
*)msgRma
;
1277 context
->Saddr
= Saddr
;
1278 context
->Taddr
= Taddr
;
1279 context
->size
= size
;
1280 context
->targetId
= targetId
;
1281 context
->sourceId
= sourceId
;
1282 context
->dataport
= dataport
;
1283 context
->stat
= (CmiRMA
*)CmiAlloc(sizeof(CmiRMA
));
1284 context
->stat
->type
= 1;
1285 context
->stat
->ready
.completed
= 0;
1286 void *stat
= (void*)(context
->stat
);
1288 CmiSetHandler(msgRma
,getDestHandler
);
1289 CmiSyncSendAndFree(targetId
,sizeRma
,msgRma
);
1294 void CmiGetCb(unsigned int sourceId
, unsigned int targetId
, void *Saddr
, void *Taddr
, unsigned int size
, CmiRdmaCallbackFn fn
, void *param
) {
1295 int dataport
= (nodes_by_pe
[targetId
])->dataport
;
1299 sizeRma
= sizeof(RMAPutMsg
);
1300 msgRma
= (void*)CmiAlloc(sizeRma
);
1302 context
= (RMAPutMsg
*)msgRma
;
1303 context
->Saddr
= Saddr
;
1304 context
->Taddr
= Taddr
;
1305 context
->size
= size
;
1306 context
->targetId
= targetId
;
1307 context
->sourceId
= sourceId
;
1308 context
->dataport
= dataport
;
1309 context
->stat
= (CmiRMA
*)CmiAlloc(sizeof(CmiRMA
));
1310 context
->stat
->type
= 0;
1311 context
->stat
->ready
.cb
= (CmiCb
*)CmiAlloc(sizeof(CmiCb
));
1312 context
->stat
->ready
.cb
->fn
= fn
;
1313 context
->stat
->ready
.cb
->param
= param
;
1315 CmiSetHandler(msgRma
,getDestHandler
);
1316 CmiSyncSendAndFree(targetId
,sizeRma
,msgRma
);
1321 int CmiWaitTest(void *obj
){
1322 CmiRMA
*stat
= (CmiRMA
*)obj
;
1323 return stat
->ready
.completed
;