netlrts: broadcasts are now handled by the common core code
[charm.git] / src / arch / netlrts / machine-gm.c
blobbaf0ee082b79c3210658887e909ae7b645edd61d
1 /** @file
2 * Myrinet API GM implementation of Converse NET version
3 * @ingroup NET
4 * contains only GM API specific code for:
5 * - CmiMachineInit()
6 * - CmiCommunicationInit()
7 * - CmiNotifyIdle()
8 * - DeliverViaNetwork()
9 * - CommunicationServer()
10 * - MachineExit()
12 written by
13 Gengbin Zheng, gzheng@uiuc.edu 4/22/2001
15 ChangeLog:
16 * 3/7/2004, Gengbin Zheng
17 implemented fault tolerant gm layer. When GM detects a catastrophic error,
18 it temporarily disables the delivery of all messages with the same sender
19 port, target port, and priority as the message that experienced the error.
20 This layer needs to properly handle the error message of GM and resume
21 the port.
23 TODO:
24 1. DMAable buffer reuse;
27 /**
28 * @addtogroup NET
29 * @{
32 #ifdef GM_API_VERSION_2_0
33 #if GM_API_VERSION >= GM_API_VERSION_2_0
34 #define CMK_USE_GM2 1
35 #endif
36 #endif
38 /* default as in busywaiting mode */
39 #undef CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT
40 #undef CMK_WHEN_PROCESSOR_IDLE_USLEEP
41 #define CMK_WHEN_PROCESSOR_IDLE_BUSYWAIT 1
42 #define CMK_WHEN_PROCESSOR_IDLE_USLEEP 0
44 #ifdef __ONESIDED_IMPL
45 #ifdef __ONESIDED_GM_HARDWARE
46 #include "conv-onesided.h"
47 int getSrcHandler;
48 int getDestHandler;
49 void handleGetSrc(void *msg);
50 void handleGetDest(void *msg);
51 #endif
52 #endif
54 static gm_alarm_t gmalarm;
56 /*#define CMK_USE_CHECKSUM*/
58 /******************************************************************************
60 * GM layer network statistics collection
62 *****************************************************************************/
64 #define GM_STATS 0
66 #if GM_STATS
67 static FILE *gmf; /* one file per processor */
68 static int *gm_stats; /* send count for each size */
69 static int possible_streamed = 0; /* possible streaming counts */
70 static int defrag = 0; /* number of defragment */
71 static int maxQueueLength = 0; /* maximum send queue length */
72 #endif
74 /******************************************************************************
76 * Send messages pending queue (used internally)
78 *****************************************************************************/
81 /* max length of pending messages */
82 #define MAXPENDINGSEND 500
/* Descriptor for one outgoing datagram.  enqueue_sending() fills it in and
   links it into the destination node's send queue through `next`; the same
   pointer is later passed to GM as the send-callback context.  Freed
   descriptors are chained on pend_freelist through `next` as well. */
84 typedef struct PendingMsgStruct
86 void *msg; /* buffer handed to the GM send call */
87 int length; /* length of message */
88 int size; /* size of message, usually around log2(length) */
89 int mach_id; /* receiver machine id */
90 int dataport; /* receiver data port */
91 int node_idx; /* index of the receiver in nodes[] (set from node-nodes) */
92 struct PendingMsgStruct *next; /* next descriptor in the queue / free list */
94 *PendingMsg;
96 static int pendinglen = 0;
98 /* reuse PendingMsg memory */
99 static PendingMsg pend_freelist=NULL;
/* Return descriptor d to the free list so MallocPendingMsg() can reuse it.
   Wrapped in do/while(0) so the macro behaves as exactly one statement. */
#define FreePendingMsg(d) \
  do { \
    (d)->next = pend_freelist; \
    pend_freelist = (d); \
  } while (0)

/* Set d to a descriptor: the head of the free list when one is available,
   otherwise a fresh malloc()ed one (checked with _MEMCHECK). */
#define MallocPendingMsg(d) \
  do { \
    (d) = pend_freelist; \
    if ((d) == NULL) { \
      (d) = (PendingMsg)malloc(sizeof(struct PendingMsgStruct)); \
      _MEMCHECK(d); \
    } else { \
      pend_freelist = (d)->next; \
    } \
  } while (0)
111 void enqueue_sending(char *msg, int length, OtherNode node, int size)
113 PendingMsg pm;
114 MallocPendingMsg(pm);
115 pm->msg = msg;
116 pm->length = length;
117 pm->mach_id = node->mach_id;
118 pm->dataport = node->dataport;
119 pm->node_idx = node-nodes;
120 pm->size = size;
121 pm->next = NULL;
122 if (node->sendhead == NULL) {
123 node->sendhead = node->sendtail = pm;
125 else {
126 node->sendtail->next = pm;
127 node->sendtail = pm;
129 pendinglen ++;
130 #if GM_STATS
131 if (pendinglen > maxQueueLength) maxQueueLength = pendinglen;
132 #endif
/* Head of node's send queue (NULL when the queue is empty). */
#define peek_sending(node) ((node)->sendhead)

/* Unlink the head of node's send queue, if any, and decrement pendinglen.
   The descriptor itself is not freed here.  Wrapped in do/while(0) so the
   macro is a single statement and cannot capture a following `else`. */
#define dequeue_sending(node) \
  do { \
    if ((node)->sendhead != NULL) { \
      (node)->sendhead = (node)->sendhead->next; \
      pendinglen --; \
    } \
  } while (0)
143 static void alarmcallback (void *context) {
144 MACHSTATE(4,"GM Alarm callback executed")
146 static int processEvent(gm_recv_event_t *e);
147 static void send_progress();
148 static void alarmInterrupt(int arg);
149 static int gmExit(int code,const char *msg);
150 static char *getErrorMsg(gm_status_t status);
152 /******************************************************************************
154 * DMA message pool
156 *****************************************************************************/
158 #define CMK_MSGPOOL 1
160 #define MAXMSGLEN 200
162 static char* msgpool[MAXMSGLEN];
163 static int msgNums = 0;
165 static int maxMsgSize = 0;
/* Return a DMA buffer to the pool; when the pool already holds MAXMSGLEN
   buffers the buffer is released back to GM instead.  do/while(0) keeps the
   macro a single statement so it is safe inside if/else. */
#define putPool(msg) \
  do { \
    if (msgNums == MAXMSGLEN) gm_dma_free(gmport, (msg)); \
    else msgpool[msgNums++] = (msg); \
  } while (0)

/* Set msg to a DMA buffer: the most recently pooled one, or a freshly
   gm_dma_malloc()ed buffer of maxMsgSize bytes when the pool is empty.
   Note: `len` is accepted for symmetry but not used; every buffer is
   maxMsgSize bytes. */
#define getPool(msg, len) \
  do { \
    if (msgNums == 0) (msg) = gm_dma_malloc(gmport, maxMsgSize); \
    else (msg) = msgpool[--msgNums]; \
  } while (0)
177 /******************************************************************************
179 * CmiNotifyIdle()-- wait until a packet comes in
181 *****************************************************************************/
/* Idle-state record for this machine layer; it carries no real data. */
typedef struct {
  char none;
} CmiIdleState;

/* No per-thread idle state is kept: always returns NULL. */
static CmiIdleState *CmiNotifyGetState(void)
{
  return NULL;
}

static void CmiNotifyStillIdle(CmiIdleState *s);

/* Entering the idle state is handled exactly like remaining idle. */
static void CmiNotifyBeginIdle(CmiIdleState *s)
{
  CmiNotifyStillIdle(s);
}
/* Idle hook.  Without shared variables (no communication thread) it fetches
   one GM event and processes it under the comm lock; otherwise it just sleeps
   on the idle lock.  The argument s is not used. */
197 static void CmiNotifyStillIdle(CmiIdleState *s)
199 #if CMK_SHARED_VARS_UNAVAILABLE
200 /* No comm. thread -- this thread polls the GM port for an event itself */
201 int nreadable;
202 gm_recv_event_t *e;
203 int pollMs = 4;
/* SLEEP_USING_ALARM is hard-wired to 0 here, so the non-blocking
   gm_receive() branch below is the one that gets compiled. */
205 #define SLEEP_USING_ALARM 0
206 #if SLEEP_USING_ALARM /*Enable the alarm, so we don't sleep forever*/
207 gm_set_alarm (gmport, &gmalarm, (gm_u64_t) pollMs*1000, alarmcallback,
208 (void *)NULL );
209 #endif
211 #if SLEEP_USING_ALARM
212 MACHSTATE(3,"Blocking on receive {")
213 e = gm_blocking_receive_no_spin(gmport);
214 MACHSTATE(3,"} receive returned");
215 #else
216 MACHSTATE(3,"CmiNotifyStillIdle NonBlocking on receive {")
217 e = gm_receive(gmport);
218 MACHSTATE(3,"} CmiNotifyStillIdle nonblocking receive returned");
219 #endif
221 #if SLEEP_USING_ALARM /*Cancel the alarm*/
222 gm_cancel_alarm (&gmalarm);
223 #endif
225 /* have to handle this event now */
226 CmiCommLock();
/* processEvent() returns nonzero when an event was handled */
227 nreadable = processEvent(e);
228 CmiCommUnlock();
229 if (nreadable) {
230 return;
232 #else
233 /* A comm. thread does the polling -- just sleep on the idle lock */
234 CmiIdleLock_sleep(&CmiGetState()->idle,5);
235 #endif
/* Public idle notification: delegates to CmiNotifyStillIdle() with no
   idle-state record. */
void CmiNotifyIdle(void)
{
  CmiNotifyStillIdle(NULL);
}
242 /****************************************************************************
244 * CheckSocketsReady
246 * Checks both sockets to see which are readable and which are writeable.
247 * We check all these things at the same time since this can be done for
248 * free with ``select.'' The result is stored in global variables, since
249 * this is essentially global state information and several routines need it.
251 ***************************************************************************/
/* Poll the stdout-forwarding descriptors and the charmrun control socket.
 *   withDelayMs - handed to CMK_PIPE_DECL (presumably the wait time in ms;
 *                 the macro is defined elsewhere -- confirm)
 * Returns the value produced by CMK_PIPE_CALL().  ctrlskt_ready_read,
 * dataskt_ready_read and dataskt_ready_write are reset on every call; only
 * ctrlskt_ready_read is ever set again here. */
253 int CheckSocketsReady(int withDelayMs)
255 int nreadable;
256 CMK_PIPE_DECL(withDelayMs);
258 CmiStdoutAdd(CMK_PIPE_SUB);
/* the charmrun socket is only watched when a connection exists */
259 if (Cmi_charmrun_fd!=-1) CMK_PIPE_ADDREAD(Cmi_charmrun_fd);
261 nreadable=CMK_PIPE_CALL();
262 ctrlskt_ready_read = 0;
263 dataskt_ready_read = 0;
264 dataskt_ready_write = 0;
266 if (nreadable == 0) {
267 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
268 return nreadable;
/* -1: after the error check, retry once more without any delay */
270 if (nreadable==-1) {
271 CMK_PIPE_CHECKERR();
272 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
273 return CheckSocketsReady(0);
275 CmiStdoutCheck(CMK_PIPE_SUB);
276 if (Cmi_charmrun_fd!=-1)
277 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
278 MACHSTATE(1,"} CheckSocketsReady")
279 return nreadable;
282 /***********************************************************************
283 * CommunicationServer()
285 * This function does the scheduling of the tasks related to the
286 * message sends and receives.
287 * It first checks the charmrun port for messages, then polls the GM event
288 * queue for send completions and incoming messages.
290 ***********************************************************************/
292 /* always called from interrupt */
293 static void ServiceCharmrun_nolock()
295 int again = 1;
296 MACHSTATE(2,"ServiceCharmrun_nolock begin {")
297 while (again)
299 again = 0;
300 CheckSocketsReady(0);
301 if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
302 if (CmiStdoutNeedsService()) { CmiStdoutService(); }
304 MACHSTATE(2,"} ServiceCharmrun_nolock end")
307 static void CommunicationServer_nolock(int withDelayMs) {
308 gm_recv_event_t *e;
310 MACHSTATE(2,"CommunicationServer_nolock start {")
311 while (1) {
312 MACHSTATE(3,"Non-blocking receive {")
313 e = gm_receive(gmport);
314 MACHSTATE(3,"} Non-blocking receive")
315 if (!processEvent(e)) break;
317 MACHSTATE(2,"}CommunicationServer_nolock end")
321 0: from smp thread
322 1: from interrupt
323 2: from worker thread
324 Note in netpoll mode, charmrun service is only performed in interrupt,
325 pingCharmrun is from sig alarm, so it is lock free
327 static void CommunicationServerNet(int withDelayMs, int where)
329 /* standalone mode */
330 if (Cmi_charmrun_pid == 0 && gmport == NULL) return;
332 MACHSTATE2(2,"CommunicationServer(%d) from %d {",withDelayMs, where)
334 if (where == 1) {
335 /* don't service charmrun if converse exits, this fixed a hang bug */
336 if (!machine_initiated_shutdown) ServiceCharmrun_nolock();
337 return;
340 LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
342 CmiCommLock();
343 CommunicationServer_nolock(withDelayMs);
344 CmiCommUnlock();
346 #if CMK_IMMEDIATE_MSG
347 if (where == 0)
348 CmiHandleImmediate();
349 #endif
351 MACHSTATE(2,"} CommunicationServer")
354 static void processMessage(char *msg, int len)
356 char *newmsg;
357 int rank, srcpe, seqno, magic, i;
358 unsigned int broot;
359 int size;
360 unsigned char checksum;
362 if (len >= DGRAM_HEADER_SIZE) {
363 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
364 #ifdef CMK_USE_CHECKSUM
365 checksum = computeCheckSum(msg, len);
366 if (checksum == 0)
367 #else
368 if (magic == (Cmi_charmrun_pid&DGRAM_MAGIC_MASK))
369 #endif
371 OtherNode node = nodes_by_pe[srcpe];
372 /* check seqno */
373 if (seqno == node->recv_expect) {
374 node->recv_expect = ((seqno+1)&DGRAM_SEQNO_MASK);
376 else if (seqno < node->recv_expect) {
377 CmiPrintf("[%d] Warning: Past packet received from PE %d, something wrong with GM hardware? (expecting: %d seqno: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
378 CmiPrintf("\n\n\t\t[%d] packet ignored!\n\n");
379 return;
381 else {
382 CmiPrintf("[%d] Error detected - Packet out of order from PE %d, something wrong with GM hardware? (expecting: %d got: %d)\n", CmiMyPe(), srcpe, node->recv_expect, seqno);
383 CmiAbort("\n\n\t\tPacket out of order!!\n\n");
385 newmsg = node->asm_msg;
386 if (newmsg == 0) {
387 size = CmiMsgHeaderGetLength(msg);
388 if (len > size) {
389 CmiPrintf("size: %d, len:%d.\n", size, len);
390 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
392 newmsg = (char *)CmiAlloc(size);
393 _MEMCHECK(newmsg);
394 memcpy(newmsg, msg, len);
395 node->asm_rank = rank;
396 node->asm_total = size;
397 node->asm_fill = len;
398 node->asm_msg = newmsg;
399 } else {
400 size = len - DGRAM_HEADER_SIZE;
401 if (node->asm_fill+size > node->asm_total) {
402 CmiPrintf("asm_total: %d, asm_fill: %d, len:%d.\n", node->asm_total, node->asm_fill, len);
403 CmiAbort("\n\n\t\tLength mismatch!!\n\n");
405 memcpy(newmsg + node->asm_fill, msg+DGRAM_HEADER_SIZE, size);
406 node->asm_fill += size;
408 /* get a full packet */
409 if (node->asm_fill == node->asm_total) {
410 int total_size = node->asm_total;
411 node->asm_msg = 0;
413 /* do it after integration - the following function may re-entrant */
414 #if CMK_BROADCAST_SPANNING_TREE
415 if (rank == DGRAM_BROADCAST
416 #if CMK_NODE_QUEUE_AVAILABLE
417 || rank == DGRAM_NODEBROADCAST
418 #endif
420 SendSpanningChildren(NULL, 0, total_size, newmsg, broot, rank);
421 #elif CMK_BROADCAST_HYPERCUBE
422 if (rank == DGRAM_BROADCAST
423 #if CMK_NODE_QUEUE_AVAILABLE
424 || rank == DGRAM_NODEBROADCAST
425 #endif
427 SendHypercube(NULL, 0, total_size, newmsg, broot, rank);
428 #endif
430 switch (rank) {
431 case DGRAM_BROADCAST: {
432 for (i=1; i<_Cmi_mynodesize; i++)
433 CmiPushPE(i, CopyMsg(newmsg, total_size));
434 CmiPushPE(0, newmsg);
435 break;
437 #if CMK_NODE_QUEUE_AVAILABLE
438 case DGRAM_NODEBROADCAST:
439 case DGRAM_NODEMESSAGE: {
440 CmiPushNode(newmsg);
441 break;
443 #endif
444 default:
445 CmiPushPE(rank, newmsg);
446 } /* end of switch */
449 else {
450 #ifdef CMK_USE_CHECKSUM
451 CmiPrintf("[%d] message ignored: checksum (%d) not 0!\n", CmiMyPe(), checksum);
452 #else
453 CmiPrintf("[%d] message ignored: magic not agree:%d != %d!\n",
454 CmiMyPe(), magic, Cmi_charmrun_pid&DGRAM_MAGIC_MASK);
455 #endif
456 CmiPrintf("recved: rank:%d src:%d mag:%d seqno:%d len:%d\n", rank, srcpe, magic, seqno, len);
459 else {
460 CmiPrintf("[%d] message ignored: size is too small: %d!\n", CmiMyPe(), len);
461 CmiPrintf("[%d] possible size: %d\n", CmiMsgHeaderGetLength(msg));
465 /* return 1 - recv'ed 0 - no msg */
/* Dispatch one GM event.
 *   e - event obtained from gm_receive() (or a blocking variant)
 * Returns 0 for GM_NO_RECV_EVENT and for GM_ALARM_EVENT, 1 for everything
 * else, including events that are simply passed on to gm_unknown(). */
466 static int processEvent(gm_recv_event_t *e)
468 int size, len;
469 char *msg, *buf;
470 int status = 1;
471 switch (gm_ntohc(e->recv.type))
473 /* avoid copy from message to buffer */
/* FAST events: the data is read through e->recv.message and no receive
   buffer is handed back to GM on this path. */
474 case GM_FAST_PEER_RECV_EVENT:
475 case GM_FAST_RECV_EVENT:
476 MACHSTATE(4,"Incoming message")
477 msg = gm_ntohp(e->recv.message);
478 len = gm_ntohl(e->recv.length);
479 processMessage(msg, len);
480 break;
/* NOTE(review): the embedded line numbering skips 481 and 484 around the two
   labels below, so they may be commented out upstream -- confirm against the
   repository before relying on them sharing the body that follows. */
482 case GM_FAST_HIGH_PEER_RECV_EVENT:
483 case GM_FAST_HIGH_RECV_EVENT:
/* Regular events: the data lives in the receive buffer, which is given back
   to GM with the same size value once the message has been processed. */
485 case GM_HIGH_RECV_EVENT:
486 case GM_RECV_EVENT:
487 MACHSTATE(4,"Incoming message")
488 size = gm_ntohc(e->recv.size);
489 msg = gm_ntohp(e->recv.buffer);
490 len = gm_ntohl(e->recv.length);
491 processMessage(msg, len);
492 gm_provide_receive_buffer(gmport, msg, size, GM_HIGH_PRIORITY);
493 break;
494 case GM_NO_RECV_EVENT:
495 return 0;
/* no break visible here: after setting status the alarm event continues
   into the default case and is passed to gm_unknown() */
496 case GM_ALARM_EVENT:
497 status = 0;
498 default:
499 MACHSTATE1(3,"Unrecognized GM event %d", gm_ntohc(e->recv.type))
500 gm_unknown(gmport, e);
502 return status;
506 #ifdef __FAULT__
507 void drop_send_callback(struct gm_port *p, void *context, gm_status_t status)
509 PendingMsg out = (PendingMsg)context;
510 void *msg = out->msg;
512 printf("[%d] drop_send_callback dropped msg: %p\n", CmiMyPe(), msg);
513 #if !CMK_MSGPOOL
514 gm_dma_free(gmport, msg);
515 #else
516 putPool(msg);
517 #endif
519 FreePendingMsg(out);
521 #else
522 void send_callback(struct gm_port *p, void *context, gm_status_t status);
523 void drop_send_callback(struct gm_port *p, void *context, gm_status_t status)
525 PendingMsg out = (PendingMsg)context;
526 void *msg = out->msg;
527 gm_send_with_callback(gmport, msg, out->size, out->length,
528 GM_HIGH_PRIORITY, out->mach_id, out->dataport,
529 send_callback, out);
531 #endif
/* Completion callback for every datagram sent by send_progress().
 *   p       - GM port (unused; the global gmport is used instead)
 *   context - the PendingMsg descriptor of the completed send
 *   status  - GM completion status
 * On success the buffer is released (pool or gm_dma_free), the send token and
 * the descriptor are freed, the node's gm_pending count is decremented and
 * send_progress() is called to start the next queued send.  On failure the
 * error is printed and the send is retried, dropped via gm_drop_sends(), or
 * the program aborts, depending on status and on __FAULT__. */
533 void send_callback(struct gm_port *p, void *context, gm_status_t status)
535 PendingMsg out = (PendingMsg)context;
536 void *msg = out->msg;
537 unsigned char cksum;
538 OtherNode node = nodes+out->node_idx;
540 if (status != GM_SUCCESS) {
541 int srcpe, seqno, magic, broot;
542 char rank;
543 char *errmsg;
/* decode the datagram header only to make the error report informative */
544 DgramHeaderBreak(msg, rank, srcpe, magic, seqno, broot);
545 errmsg = getErrorMsg(status);
546 CmiPrintf("GM Error> PE:%d send to msg %p node %d rank %d mach_id %d port %d len %d size %d failed to complete (error %d): %s\n", srcpe, msg, out->node_idx, rank, out->mach_id, out->dataport, out->length, out->size, status, errmsg);
547 switch (status) {
548 #ifdef __FAULT__
549 case GM_SEND_DROPPED: {
550 OtherNode node = nodes + out->node_idx;
/* resend only if the node table still lists the address this message was
   queued for; otherwise (as far as this listing shows) control continues
   into the default case below -- confirm against the repository */
551 if (out->mach_id == node->mach_id && out->dataport == node->dataport) {
552 /* the destination has not changed: resend */
553 gm_send_with_callback(gmport, msg, out->size, out->length,
554 GM_HIGH_PRIORITY, out->mach_id, out->dataport,
555 send_callback, out);
556 return;
559 default: {
560 gm_drop_sends (gmport, GM_HIGH_PRIORITY, out->mach_id, out->dataport,
561 drop_send_callback, out);
562 return;
564 #else
565 case GM_SEND_TIMED_OUT: {
566 CmiPrintf("gm send_callback timeout, drop sends and re-enable send. \n");
567 gm_drop_sends (gmport, GM_HIGH_PRIORITY, out->mach_id, out->dataport,
568 drop_send_callback, out);
/* send_progress() skips nodes with disable set; the flag is cleared at the
   end of this function once gm_pending drops back to 0 */
569 node->disable=1; /* disable normal send */
570 return;
572 case GM_SEND_DROPPED:
573 CmiPrintf("Got DROPPED_SEND notification, resend\n");
574 gm_send_with_callback(gmport, msg, out->size, out->length,
575 GM_HIGH_PRIORITY, out->mach_id, out->dataport,
576 send_callback, out);
577 return;
578 default:
579 CmiAbort("gm send_callback failed");
580 #endif
/* NOTE(review): the embedded numbering skips 585-586 and 592-593 around the
   checksum code below, so part of it may be commented out upstream --
   confirm against the repository. */
584 #ifdef CMK_USE_CHECKSUM
587 cksum = computeCheckSum((unsigned char*)msg, out->length);
588 if (cksum != 0) {
589 CmiPrintf("[%d] Message altered during send, checksum (%d) does not agree!\n", CmiMyPe(), cksum);
590 CmiAbort("Myrinet error was detected!\n");
594 #endif
/* successful completion: recycle the send buffer */
596 #if !CMK_MSGPOOL
597 gm_dma_free(gmport, msg);
598 #else
599 putPool(msg);
600 #endif
602 gm_free_send_token (gmport, GM_HIGH_PRIORITY);
603 FreePendingMsg(out);
605 /* one fewer message of this node pending in gm firmware */
606 node->gm_pending --;
607 if (node->disable && node->gm_pending == 0) node->disable = 0;
609 /* since we have one free send token, start next send */
610 send_progress();
613 static void send_progress()
615 static int nextnode = 0;
616 int skip;
617 OtherNode node;
618 PendingMsg out;
620 while (1)
622 int sent = 0;
623 for (skip=0; skip<_Cmi_numnodes; skip++) {
624 node = nodes+nextnode;
625 nextnode = (nextnode + 1) % _Cmi_numnodes;
626 if (node->disable) continue;
627 out = peek_sending(node);
628 if (!out) continue;
629 if (gm_alloc_send_token(gmport, GM_HIGH_PRIORITY)) {
630 if (dataport == out->dataport) {
631 gm_send_to_peer_with_callback(gmport, out->msg, out->size,
632 out->length, GM_HIGH_PRIORITY, out->mach_id,
633 send_callback, out);
635 else {
636 gm_send_with_callback(gmport, out->msg, out->size, out->length,
637 GM_HIGH_PRIORITY, out->mach_id, out->dataport,
638 send_callback, out);
640 node->gm_pending ++;
641 sent=1;
642 /* dequeue out, but not free it, used at callback */
643 dequeue_sending(node);
644 #if GM_STATS
645 gm_stats[out->size] ++;
646 /* if we streaming, count how many message we possibly can combine */
648 PendingMsg curout;
649 curout = peek_sending(node);
650 if (curout) {
651 out = curout->next;
652 while (out) {
653 possible_streamed ++;
654 out = out->next;
658 #endif
660 else return; /* no send token */
661 } /* end for */
662 if (sent==0) return;
667 /***********************************************************************
668 * DeliverViaNetwork()
670 * This function is responsible for all non-local transmission. It
671 * first tries to allocate a send token; if that fails, the message is put
672 * on the pending message queue, otherwise the GM send is invoked.
673 ***********************************************************************/
675 void EnqueueOutgoingDgram
676 (OutgoingMsg ogm, char *ptr, int dlen, OtherNode node, int rank, int broot)
678 char *buf;
679 int size, len, seqno;
680 int alloclen, allocSize;
682 /* CmiPrintf("DeliverViaNetwork: size:%d\n", size); */
684 /* don't have to worry about ref count because we do copy, and
685 ogm can be free'ed right away */
686 /* ogm->refcount++; */
688 len = dlen + DGRAM_HEADER_SIZE;
690 /* allocate DMAable memory to prepare sending */
691 /* FIXME: another memory copy here from user buffer to DMAable buffer */
692 /* which however means the user buffer is untouched and can be reused */
693 #if !CMK_MSGPOOL
694 buf = (char *)gm_dma_malloc(gmport, len);
695 #else
696 getPool(buf, len);
697 #endif
698 _MEMCHECK(buf);
700 seqno = node->send_next;
701 DgramHeaderMake(buf, rank, ogm->src, Cmi_charmrun_pid, seqno, broot);
702 node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
703 memcpy(buf+DGRAM_HEADER_SIZE, ptr, dlen);
704 #ifdef CMK_USE_CHECKSUM
706 DgramHeader *head = (DgramHeader *)buf;
707 head->magic ^= computeCheckSum(buf, len);
709 #endif
710 size = gm_min_size_for_length(len);
712 /* if queue is not empty, enqueue msg. this is to guarantee the order */
713 if (pendinglen != 0) {
714 /* this potential screw up broadcast, because bcast packets can not be
715 interrupted by other sends in CommunicationServer_nolock */
716 enqueue_sending(buf, len, node, size);
717 return;
719 enqueue_sending(buf, len, node, size);
720 send_progress();
723 /* copy is ignored, since we always copy */
724 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy)
726 int size; char *data;
728 size = ogm->size - DGRAM_HEADER_SIZE;
729 data = ogm->data + DGRAM_HEADER_SIZE;
730 #if GM_STATS
731 if (size > Cmi_dgram_max_data) defrag ++;
732 #endif
733 while (size > Cmi_dgram_max_data) {
734 EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot);
735 data += Cmi_dgram_max_data;
736 size -= Cmi_dgram_max_data;
738 if (size>0) EnqueueOutgoingDgram(ogm, data, size, node, rank, broot);
740 /* a simple flow control */
741 while (pendinglen >= MAXPENDINGSEND) {
742 /* pending max len exceeded, busy wait until get a token
743 Doing this surprisingly improve the performance by 2s for 200MB msg */
744 MACHSTATE(4,"Polling until token available")
745 CommunicationServer_nolock(0);
749 /* simple barrier at machine layer */
750 /* assuming no other flying messages */
751 static void send_callback_nothing(struct gm_port *p, void *context, gm_status_t status)
753 gm_dma_free(gmport, context);
756 static void sendBarrierMessage(int pe)
758 int len = 32;
759 char *buf = (char *)gm_dma_malloc(gmport, len);
760 int size = gm_min_size_for_length(len);
761 OtherNode node = nodes + pe;
762 CmiAssert(buf);
763 gm_send_with_callback(gmport, buf, size, len,
764 GM_HIGH_PRIORITY, node->mach_id, node->dataport,
765 send_callback_nothing, buf);
768 static void recvBarrierMessage()
770 gm_recv_event_t *e;
771 int size, len;
772 char *msg;
773 while (1) {
774 e = gm_receive(gmport);
775 switch (gm_ntohc(e->recv.type))
777 case GM_HIGH_RECV_EVENT:
778 case GM_RECV_EVENT:
779 MACHSTATE(4,"Incoming message")
780 size = gm_ntohc(e->recv.size);
781 msg = gm_ntohp(e->recv.buffer);
782 len = gm_ntohl(e->recv.length);
783 gm_provide_receive_buffer(gmport, msg, size, GM_HIGH_PRIORITY);
784 return;
785 case GM_NO_RECV_EVENT:
786 continue ;
787 default:
788 MACHSTATE1(3,"Unrecognized GM event %d", gm_ntohc(e->recv.type))
789 gm_unknown(gmport, e);
794 /* happen at node level */
795 int CmiBarrier()
797 int len, size, i;
798 int status;
799 int count = 0;
800 OtherNode node;
801 int numnodes = CmiNumNodes();
802 if (CmiMyRank() == 0) {
803 /* every one send to pe 0 */
804 if (CmiMyNode() != 0) {
805 sendBarrierMessage(0);
807 /* printf("[%d] HERE\n", CmiMyPe()); */
808 if (CmiMyNode() == 0)
810 for (count = 1; count < numnodes; count ++)
812 recvBarrierMessage();
814 /* pe 0 broadcast */
815 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
816 int p = i;
817 if (p > numnodes - 1) break;
818 /* printf("[%d] BD => %d \n", CmiMyPe(), p); */
819 sendBarrierMessage(p);
822 /* non 0 node waiting */
823 if (CmiMyNode() != 0)
825 recvBarrierMessage();
826 for (i=1; i<=BROADCAST_SPANNING_FACTOR; i++) {
827 int p = CmiMyNode();
828 p = BROADCAST_SPANNING_FACTOR*p + i;
829 if (p > numnodes - 1) break;
830 p = p%numnodes;
831 /* printf("[%d] RELAY => %d \n", CmiMyPe(), p); */
832 sendBarrierMessage(p);
836 CmiNodeAllBarrier();
837 /* printf("[%d] OUT of barrier \n", CmiMyPe()); */
838 return 0;
841 /* everyone sends a message to pe 0 and go on */
/* One-way barrier towards node 0: rank 0 of every non-zero node sends one
   message to node 0 and carries on, while rank 0 of node 0 waits for
   CmiNumNodes()-1 messages.  All ranks then pass CmiNodeAllBarrier().
   Always returns 0. */
int CmiBarrierZero()
{
  int received;

  if (CmiMyRank() == 0) {
    if (CmiMyNode() == 0) {
      for (received = 0; received < CmiNumNodes()-1; received++) {
        recvBarrierMessage();
      }
    }
    else {
      sendBarrierMessage(0);
    }
  }
  CmiNodeAllBarrier();
  return 0;
}
861 /***********************************************************************
862 * CmiMachineInit()
864 * This function initializes the GM board and sets up the receive buffers.
866 ***********************************************************************/
868 static int maxsize;
/* Initialise the GM layer.
 *   argv - command line; "+gm_maxsize" overrides the largest GM size class
 * Steps: gm_init() with retries, open the first free GM port in
 * [2, dataport_max), fetch this node's id, install gmExit as the socket abort
 * handler, provide receive buffers for every size class, announce the
 * acceptable sizes, release the send tokens, seed the message pool and
 * initialise the alarm.  On failure the function simply returns (gmport
 * stays NULL when gm_init fails; dataport is set to 0 when no port opens). */
870 void CmiMachineInit(char **argv)
872 int dataport_max=16; /*number of largest GM port to check*/
873 gm_status_t status;
874 int device, i, j;
875 int retry = 10;
876 char *buf;
877 int mlen;
879 MACHSTATE(3,"CmiMachineInit {");
881 gmport = NULL;
882 /* standalone mode */
883 if (dataport == -1) return;
885 /* try a few times init gm */
886 for (i=0; i<retry; i++) {
887 status = gm_init();
888 if (status == GM_SUCCESS) break;
889 sleep(1);
891 if (status != GM_SUCCESS) {
892 printf("Cannot open GM library (does the machine have a GM card?)\n");
893 gm_perror("gm_init", status);
894 return;
897 device = 0;
/* probe ports 2 .. dataport_max-1; the global dataport keeps the one that
   opened */
898 for (dataport=2;dataport<dataport_max;dataport++) {
899 char portname[200];
900 sprintf(portname, "converse_port%d_%d", Cmi_charmrun_pid, _Cmi_mynode);
901 #if CMK_USE_GM2
902 status = gm_open(&gmport, device, dataport, portname, GM_API_VERSION_2_0);
903 #else
904 status = gm_open(&gmport, device, dataport, portname, GM_API_VERSION_1_1);
905 #endif
906 if (status == GM_SUCCESS) { break; }
908 if (dataport==dataport_max)
909 { /* Couldn't open any GM port... */
910 dataport=0;
911 return;
914 /* get our node id */
915 status = gm_get_node_id(gmport, (unsigned int *)&Cmi_mach_id);
916 if (status != GM_SUCCESS) { gm_perror("gm_get_node_id", status); return; }
917 #if CMK_USE_GM2
/* with GM2 the id stored in Cmi_mach_id is the global id, not the node id */
918 gm_node_id_to_global_id(gmport, (unsigned int)Cmi_mach_id, (unsigned int*)&Cmi_mach_id);
919 #endif
921 /* default abort will take care of gm clean up */
922 skt_set_abort(gmExit);
924 /* set up recv buffer */
/* NOTE(review): the embedded numbering skips 925 and 928 around the next two
   assignments, so they may be commented out upstream; either way both values
   are overwritten further down. */
926 maxsize = gm_min_size_for_length(4096);
927 Cmi_dgram_max_data = 4096 - DGRAM_HEADER_SIZE;
929 maxsize = 16;
930 CmiGetArgIntDesc(argv,"+gm_maxsize",&maxsize,"maximum packet size in rank (2^maxsize)");
932 #if GM_STATS
933 gm_stats = (int*)malloc((maxsize+1) * sizeof(int));
934 for (i=0; i<=maxsize; i++) gm_stats[i] = 0;
935 #endif
/* provide `num` receive buffers for every size class 1..maxsize; after the
   loop maxMsgSize holds the buffer length of the largest class */
937 for (i=1; i<=maxsize; i++) {
938 int len = gm_max_length_for_size(i);
939 int num = 2;
941 maxMsgSize = len;
/* buffer count per class; classes 17..22 keep the default of 2 */
943 if (i<5) num = 0;
944 else if (i<7) num = 4;
945 else if (i<13) num = 20;
946 else if (i<17) num = 10;
947 else if (i>22) num = 1;
948 for (j=0; j<num; j++) {
949 buf = gm_dma_malloc(gmport, len);
950 _MEMCHECK(buf);
951 gm_provide_receive_buffer(gmport, buf, i, GM_HIGH_PRIORITY);
954 Cmi_dgram_max_data = maxMsgSize - DGRAM_HEADER_SIZE;
/* bit mask with bits 0..maxsize set; the returned status is not checked.
   NOTE(review): the shift overflows an int if +gm_maxsize is 30 or more. */
956 status = gm_set_acceptable_sizes (gmport, GM_HIGH_PRIORITY, (1<<(maxsize+1))-1);
958 gm_free_send_tokens (gmport, GM_HIGH_PRIORITY,
959 gm_num_send_tokens (gmport));
961 #if CMK_MSGPOOL
/* seed the send-buffer pool with one buffer */
962 msgpool[msgNums++] = gm_dma_malloc(gmport, maxMsgSize);
963 #endif
965 /* alarm will ping charmrun */
966 gm_initialize_alarm(&gmalarm);
968 MACHSTATE(3,"} CmiMachineInit");
/* Nothing to set up for the GM layer at this stage; argv is unused. */
void CmiCommunicationInit(char **argv)
{
}
/* Machine-layer shutdown hook.  With GM_STATS enabled it writes the collected
 * send statistics to "gm-stats.<pe>"; otherwise it does nothing.
 *
 * Fixes over the previous version: the result of fopen() is checked before
 * the stream is used, and the file name is built with a bounded snprintf(). */
void MachineExit()
{
#if GM_STATS
  int i;
  int mype = CmiMyPe();
  char fname[128];

  snprintf(fname, sizeof fname, "gm-stats.%d", mype);
  gmf = fopen(fname, "w");
  if (gmf == NULL) {
    /* statistics are best effort: report and carry on with the shutdown */
    CmiPrintf("[%d] cannot open %s to write GM statistics\n", mype, fname);
    return;
  }
  for (i=5; i<=maxsize; i++) {
    fprintf(gmf, "[%d] size:%d count:%d\n", mype, i, gm_stats[i]);
  }
  fprintf(gmf, "[%d] max quelen: %d possible streaming: %d defrag: %d \n", mype, maxQueueLength, possible_streamed, defrag);
  fclose(gmf);
#endif
}
/* Convert a GM global id into the local GM node id, in place (GM2 only).
 *   mach_id - in: global id; out: node id when the lookup succeeds, otherwise
 *             left unchanged
 * Does nothing without GM2 or when running standalone (no charmrun, no port).
 *
 * Fix over the previous version: the temporary is an unsigned int, matching
 * *mach_id and the unsigned out-parameter used by the sibling call
 * gm_node_id_to_global_id() in CmiMachineInit(). */
void CmiGmConvertMachineID(unsigned int *mach_id)
{
#if CMK_USE_GM2
  gm_status_t status;
  unsigned int newid;
  /* skip if running without charmrun */
  if (Cmi_charmrun_pid == 0 && gmport == NULL) return;
  status = gm_global_id_to_node_id(gmport, *mach_id, &newid);
  if (status == GM_SUCCESS) *mach_id = newid;
#endif
}
1004 /* make sure other gm nodes are accessible in routing table */
1005 void CmiCheckGmStatus()
1007 int i;
1008 int doabort = 0;
1009 if (Cmi_charmrun_pid == 0 && gmport == NULL) return;
1010 if (gmport == NULL) machine_exit(1);
1011 for (i=0; i<_Cmi_numnodes; i++) {
1012 gm_status_t status;
1013 char uid[6], str[100];
1014 unsigned int mach_id=nodes[i].mach_id;
1015 status = gm_node_id_to_unique_id(gmport, mach_id, uid);
1016 if (status != GM_SUCCESS || ( uid[0]==0 && uid[1]== 0
1017 && uid[2]==0 && uid[3]==0 && uid[4]==0 && uid[5]==0)) {
1018 CmiPrintf("Error> gm node %d can't contact node %d. \n", CmiMyPe(), i);
1019 doabort = 1;
1021 /*CmiPrintf("[%d]: %d mach:%d ip:%d %d %d %d\n", CmiMyPe(), i, mach_id, nodes[i].IP,uid[0], uid[3], uid[5]);*/
1023 if (doabort) CmiAbort("");
/* Abort handler installed with skt_set_abort(): reports the socket error on
 * stderr and shuts the machine layer down with the given code.
 *
 * Fix over the previous version: the function is declared to return int but
 * had no return statement; it now returns `code` should machine_exit() ever
 * return (it is presumably not expected to -- confirm). */
static int gmExit(int code,const char *msg)
{
  fprintf(stderr,"Fatal socket error: code %d-- %s\n",code,msg);
  machine_exit(code);
  return code;
}
1033 static char *getErrorMsg(gm_status_t status)
1035 char *errmsg;
1036 switch (status) {
1037 case GM_SEND_TIMED_OUT:
1038 errmsg = "send time out"; break;
1039 case GM_SEND_REJECTED:
1040 errmsg = "send rejected"; break;
1041 case GM_SEND_TARGET_NODE_UNREACHABLE:
1042 errmsg = "target node unreachable"; break;
1043 case GM_SEND_TARGET_PORT_CLOSED:
1044 errmsg = "target port closed"; break;
1045 case GM_SEND_DROPPED:
1046 errmsg = "send dropped"; break;
1047 default:
1048 errmsg = "unknown error"; break;
1050 return errmsg;
1053 #ifdef __ONESIDED_IMPL
1054 #ifdef __ONESIDED_GM_HARDWARE
/* User completion callback: fn is invoked as fn(param). */
1056 struct CmiCb {
1057 CmiRdmaCallbackFn fn;
1058 void *param;
1060 typedef struct CmiCb CmiCb;
/* Completion handle of a one-sided operation.  type == 1 selects the polling
   form (ready.completed is set to 1 on completion); any other value selects
   the callback form (ready.cb is invoked, then freed with the handle). */
1062 struct CmiRMA {
1063 int type;
1064 union {
1065 int completed;
1066 CmiCb *cb;
1067 } ready;
1069 typedef struct CmiRMA CmiRMA;
/* Converse message carrying a completion handle; consumed by handleGetSrc(). */
1071 struct CmiRMAMsg {
1072 char core[CmiMsgHeaderSizeBytes];
1073 CmiRMA* stat;
1075 typedef struct CmiRMAMsg CmiRMAMsg;
/* Context of one put: everything put_callback() needs to resend the transfer
   and to signal completion through stat. */
1077 struct RMAPutMsg {
1078 char core[CmiMsgHeaderSizeBytes];
1079 void *Saddr; /* local source address */
1080 void *Taddr; /* address on the target, used as gm_remote_ptr_t */
1081 unsigned int size; /* length passed to the directed send */
1082 unsigned int targetId; /* index into nodes_by_pe[] of the target */
1083 unsigned int sourceId;
1084 unsigned int dataport; /* target's GM port */
1085 CmiRMA *stat;
1087 typedef struct RMAPutMsg RMAPutMsg;
1089 void *CmiDMAAlloc(int size) {
1090 void *addr = gm_dma_calloc(gmport, 1, gm_max_length_for_size(size));
1091 //gm_allow_remote_memory_access(gmport);
1092 return addr;
1095 int CmiRegisterMemory(void *addr, unsigned int size) {
1096 gm_status_t status;
1097 status = gm_register_memory(gmport, addr, size);
1098 if (status != GM_SUCCESS) {
1099 CmiPrintf("Cannot register memory at %p of size %d\n",addr,size);
1100 gm_perror("registerMemory", status);
1101 return 0;
1103 //gm_allow_remote_memory_access(gmport);
1104 return 1;
1107 int CmiUnRegisterMemory(void *addr, unsigned int size) {
1108 gm_status_t status;
1109 status = gm_deregister_memory(gmport, addr, size);
1110 if (status != GM_SUCCESS) {
1111 CmiPrintf("Cannot unregister memory at %p of size %d\n",addr,size);
1112 gm_perror("UnregisterMemory", status);
1113 return 0;
1115 return 1;
1118 void put_callback(struct gm_port *p, void *context, gm_status_t status)
1120 RMAPutMsg *out = (RMAPutMsg*)context;
1121 unsigned int destMachId = (nodes_by_pe[out->targetId])->mach_id;
1122 if (status != GM_SUCCESS) {
1123 switch (status) {
1124 case GM_SEND_DROPPED:
1125 CmiPrintf("Got DROPPED_PUT notification, resend\n");
1126 gm_directed_send_with_callback(gmport,out->Saddr,(gm_remote_ptr_t)(out->Taddr),
1127 out->size,GM_HIGH_PRIORITY,
1128 destMachId, out->dataport,
1129 put_callback, out);
1130 return;
1131 default:
1132 CmiAbort("gm send_callback failed");
1135 if(out->stat->type==1) {
1136 out->stat->ready.completed = 1;
1137 //the handle is active, and the user will clean it
1139 else {
1140 (*(out->stat->ready.cb->fn))(out->stat->ready.cb->param);
1141 //I do not undestand, on turing the following two frees become double frees??
1142 CmiFree(out->stat->ready.cb);
1143 CmiFree(out->stat); //clean up the internal handle
1145 gm_free_send_token (gmport, GM_HIGH_PRIORITY);
1146 return;
1149 void *CmiPut(unsigned int sourceId, unsigned int targetId, void *Saddr, void *Taddr, unsigned int size) {
1150 unsigned int dataport = (nodes_by_pe[targetId])->dataport;
1151 unsigned int destMachId = (nodes_by_pe[targetId])->mach_id;
1152 RMAPutMsg *context = (RMAPutMsg*)CmiAlloc(sizeof(RMAPutMsg));
1153 context->Saddr = Saddr;
1154 context->Taddr = Taddr;
1155 context->size = size;
1156 context->targetId = targetId;
1157 context->sourceId = sourceId;
1158 context->dataport = dataport;
1159 context->stat = (CmiRMA*)CmiAlloc(sizeof(CmiRMA));
1160 context->stat->type = 1;
1161 context->stat->ready.completed = 0;
1162 void *stat = (void*)(context->stat);
1163 //get a token before the put
1164 if (gm_alloc_send_token(gmport, GM_HIGH_PRIORITY)) {
1165 //perform the put
1166 gm_directed_send_with_callback(gmport,Saddr,(gm_remote_ptr_t)Taddr,size,
1167 GM_HIGH_PRIORITY,destMachId,dataport,
1168 put_callback,(void*)context);
1170 return stat;
1173 void CmiPutCb(unsigned int sourceId, unsigned int targetId, void *Saddr, void *Taddr, unsigned int size, CmiRdmaCallbackFn fn, void *param) {
1174 unsigned int dataport = (nodes_by_pe[targetId])->dataport;
1175 unsigned int destMachId = (nodes_by_pe[targetId])->mach_id;
1176 RMAPutMsg *context = (RMAPutMsg*)CmiAlloc(sizeof(RMAPutMsg));
1178 context->Saddr = Saddr;
1179 context->Taddr = Taddr;
1180 context->size = size;
1181 context->targetId = targetId;
1182 context->sourceId = sourceId;
1183 context->dataport = dataport;
1184 context->stat = (CmiRMA*)CmiAlloc(sizeof(CmiRMA));
1185 context->stat->type = 0;
1186 context->stat->ready.cb = (CmiCb*)CmiAlloc(sizeof(CmiCb));
1187 context->stat->ready.cb->fn = fn;
1188 context->stat->ready.cb->param = param;
1189 //get a token before the put
1190 if (gm_alloc_send_token(gmport, GM_HIGH_PRIORITY)) {
1191 //perform the put
1192 gm_directed_send_with_callback(gmport,Saddr,(gm_remote_ptr_t)Taddr,size,
1193 GM_HIGH_PRIORITY,destMachId,dataport,
1194 put_callback,(void*)context);
1196 return;
1199 void handleGetSrc(void *msg) {
1200 CmiRMA* stat = ((CmiRMAMsg*)msg)->stat;
1201 if(stat->type==1) {
1202 stat->ready.completed = 1;
1203 //the handle is active, and the user will clean it
1205 else {
1206 (*(stat->ready.cb->fn))(stat->ready.cb->param);
1207 //I do not undestand, on turing the following two frees become double frees??
1208 CmiFree(stat->ready.cb);
1209 CmiFree(stat); //clean up the internal handle
1211 CmiFree(msg);
1212 return;
1215 void get_callback_dest(struct gm_port *p, void *context, gm_status_t status)
1217 RMAPutMsg *out = (RMAPutMsg*)context;
1218 int sizeRmaStat = sizeof(CmiRMAMsg);
1219 char *msgRmaStat;
1220 unsigned int srcMachId = (nodes_by_pe[out->targetId])->mach_id;
1221 if (status != GM_SUCCESS) {
1222 switch (status) {
1223 case GM_SEND_DROPPED:
1224 CmiPrintf("Got DROPPED_PUT notification, resend\n");
1225 gm_directed_send_with_callback(gmport,out->Saddr,(int)(out->Taddr),
1226 out->size,GM_HIGH_PRIORITY,
1227 out->targetId, out->dataport,
1228 get_callback_dest, out);
1229 return;
1230 default:
1231 CmiAbort("gm send_callback failed");
1234 gm_free_send_token (gmport, GM_HIGH_PRIORITY);
1235 //send a message to update the context status on the source node
1236 msgRmaStat = (void*)CmiAlloc(sizeRmaStat);
1237 ((CmiRMAMsg*)msgRmaStat)->stat = out->stat;
1238 CmiSetHandler(msgRmaStat,getSrcHandler);
1239 CmiSyncSendAndFree(out->targetId,sizeRmaStat,msgRmaStat);
1240 return;
1243 void handleGetDest(void *msg) {
1244 RMAPutMsg *context1 = (RMAPutMsg*)msg;
1245 RMAPutMsg *context = (RMAPutMsg*)CmiAlloc(sizeof(RMAPutMsg));
1246 unsigned int srcMachId = (nodes_by_pe[context1->sourceId])->mach_id;
1247 context->Saddr = context1->Taddr;
1248 context->Taddr = context1->Saddr;
1249 context->size = context1->size;
1250 context->targetId = context1->sourceId;
1251 context->sourceId = context1->targetId;
1252 context->dataport = context1->dataport;
1253 context->stat = context1->stat;
1254 //get a token before the put
1255 if (gm_alloc_send_token(gmport, GM_HIGH_PRIORITY)) {
1256 //perform the put
1257 gm_directed_send_with_callback(gmport,context->Saddr,(gm_remote_ptr_t)context->Taddr,context->size,
1258 GM_HIGH_PRIORITY,srcMachId,context->dataport,
1259 get_callback_dest,(void*)context);
1261 CmiFree(msg);
1262 return;
1264 /* Send a converse message to the destination to perform a get from destination.
1265 * The destination then sends a put to write the actual message
1266 * Then it sends a converse message to the sender to update the completed flag
1268 void *CmiGet(unsigned int sourceId, unsigned int targetId, void *Saddr, void *Taddr, unsigned int size) {
1269 int dataport = (nodes_by_pe[targetId])->dataport;
1270 int sizeRma;
1271 char *msgRma;
1272 RMAPutMsg *context;
1273 sizeRma = sizeof(RMAPutMsg);
1274 msgRma = (void*)CmiAlloc(sizeRma);
1276 context = (RMAPutMsg*)msgRma;
1277 context->Saddr = Saddr;
1278 context->Taddr = Taddr;
1279 context->size = size;
1280 context->targetId = targetId;
1281 context->sourceId = sourceId;
1282 context->dataport = dataport;
1283 context->stat = (CmiRMA*)CmiAlloc(sizeof(CmiRMA));
1284 context->stat->type = 1;
1285 context->stat->ready.completed = 0;
1286 void *stat = (void*)(context->stat);
1288 CmiSetHandler(msgRma,getDestHandler);
1289 CmiSyncSendAndFree(targetId,sizeRma,msgRma);
1291 return stat;
1294 void CmiGetCb(unsigned int sourceId, unsigned int targetId, void *Saddr, void *Taddr, unsigned int size, CmiRdmaCallbackFn fn, void *param) {
1295 int dataport = (nodes_by_pe[targetId])->dataport;
1296 int sizeRma;
1297 char *msgRma;
1298 RMAPutMsg *context;
1299 sizeRma = sizeof(RMAPutMsg);
1300 msgRma = (void*)CmiAlloc(sizeRma);
1302 context = (RMAPutMsg*)msgRma;
1303 context->Saddr = Saddr;
1304 context->Taddr = Taddr;
1305 context->size = size;
1306 context->targetId = targetId;
1307 context->sourceId = sourceId;
1308 context->dataport = dataport;
1309 context->stat = (CmiRMA*)CmiAlloc(sizeof(CmiRMA));
1310 context->stat->type = 0;
1311 context->stat->ready.cb = (CmiCb*)CmiAlloc(sizeof(CmiCb));
1312 context->stat->ready.cb->fn = fn;
1313 context->stat->ready.cb->param = param;
1315 CmiSetHandler(msgRma,getDestHandler);
1316 CmiSyncSendAndFree(targetId,sizeRma,msgRma);
1317 return;
1321 int CmiWaitTest(void *obj){
1322 CmiRMA *stat = (CmiRMA*)obj;
1323 return stat->ready.completed;
1326 #endif
1327 #endif
1329 /*@}*/