AMPI: fix mismatched free/delete[] identified by valgrind
[charm.git] / src / arch / net / machine-eth.c
blobcfe6dfc013b65167ce8b4ad72dfe90f163ba5dc1
/** @file
 * UDP implementation of the Converse NET version.
 * @ingroup NET
 * Contains only the UDP-specific code for:
 * - CmiMachineInit()
 * - CmiCommunicationInit()
 * - CmiNotifyIdle()
 * - DeliverViaNetwork()
 * - CommunicationServer()
 *
 * Moved from machine.c by
 *   Gengbin Zheng, gzheng@uiuc.edu  4/22/2001
 */
/**
 * @addtogroup NET
 * @{
 */
20 /******************************************************************************
22 * CmiNotifyIdle()-- wait until a packet comes in
24 *****************************************************************************/
26 typedef struct {
27 int sleepMs; /*Milliseconds to sleep while idle*/
28 int nIdles; /*Number of times we've been idle in a row*/
29 CmiState cs; /*Machine state*/
30 } CmiIdleState;
32 static CmiIdleState *CmiNotifyGetState(void)
34 CmiIdleState *s=(CmiIdleState *)malloc(sizeof(CmiIdleState));
35 s->sleepMs=0;
36 s->nIdles=0;
37 s->cs=CmiGetState();
38 return s;
41 static void CmiNotifyBeginIdle(CmiIdleState *s)
43 s->sleepMs=0;
44 s->nIdles=0;
46 MACHSTATE(3,"begin idle")
49 static void CmiNotifyStillIdle(CmiIdleState *s)
51 #if CMK_SHARED_VARS_UNAVAILABLE
52 /*No comm. thread-- listen on sockets for incoming messages*/
53 MACHSTATE(1,"idle commserver {")
54 CommunicationServer(Cmi_idlepoll?0:10, COMM_SERVER_FROM_SMP);
55 MACHSTATE(1,"} idle commserver")
56 #else
57 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
58 if(_Cmi_sleepOnIdle ){
59 #endif
60 int nSpins=20; /*Number of times to spin before sleeping*/
61 s->nIdles++;
62 if (s->nIdles>nSpins) { /*Start giving some time back to the OS*/
63 s->sleepMs+=2;
64 if (s->sleepMs>10) s->sleepMs=10;
66 /*Comm. thread will listen on sockets-- just sleep*/
67 if (s->sleepMs>0) {
68 MACHSTATE1(3,"idle lock(%d) {",CmiMyPe())
69 CmiIdleLock_sleep(&s->cs->idle,s->sleepMs);
70 CsdResetPeriodic(); /* check ccd callbacks when I am awakened */
71 MACHSTATE1(3,"} idle lock(%d)",CmiMyPe())
73 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
75 #endif
76 #endif
79 void CmiNotifyIdle(void) {
80 CmiIdleState s;
81 s.sleepMs=5;
82 CmiNotifyStillIdle(&s);
85 /****************************************************************************
87 * CheckSocketsReady
89 * Checks both sockets to see which are readable and which are writeable.
90 * We check all these things at the same time since this can be done for
91 * free with ``select.'' The result is stored in global variables, since
92 * this is essentially global state information and several routines need it.
94 ***************************************************************************/
96 int CheckSocketsReady(int withDelayMs)
98 int nreadable,dataWrite=writeableDgrams || writeableAcks;
99 CMK_PIPE_DECL(withDelayMs);
102 #if CMK_USE_KQUEUE && 0
103 // This implementation doesn't yet work, but potentially is much faster
105 /* Only setup the CMK_PIPE structures the first time they are used.
106 This makes the kqueue implementation much faster.
108 static int first = 1;
109 if(first){
110 first = 0;
111 CmiStdoutAdd(CMK_PIPE_SUB);
112 if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); }
113 else return 0; /* If there's no charmrun, none of this matters. */
114 if (dataskt!=-1) {
115 CMK_PIPE_ADDREAD(dataskt);
116 CMK_PIPE_ADDWRITE(dataskt);
120 #else
121 CmiStdoutAdd(CMK_PIPE_SUB);
122 if (Cmi_charmrun_fd!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd); }
123 else return 0; /* If there's no charmrun, none of this matters. */
124 if (dataskt!=-1) {
125 { CMK_PIPE_ADDREAD(dataskt); }
126 if (dataWrite)
127 CMK_PIPE_ADDWRITE(dataskt);
129 #endif
131 nreadable=CMK_PIPE_CALL();
132 ctrlskt_ready_read = 0;
133 dataskt_ready_read = 0;
134 dataskt_ready_write = 0;
136 if (nreadable == 0) {
137 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
138 return nreadable;
140 if (nreadable==-1) {
141 CMK_PIPE_CHECKERR();
142 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
143 return CheckSocketsReady(0);
146 CmiStdoutCheck(CMK_PIPE_SUB);
147 if (Cmi_charmrun_fd!=-1)
148 ctrlskt_ready_read = CMK_PIPE_CHECKREAD(Cmi_charmrun_fd);
149 if (dataskt!=-1) {
150 dataskt_ready_read = CMK_PIPE_CHECKREAD(dataskt);
151 if (dataWrite)
152 dataskt_ready_write = CMK_PIPE_CHECKWRITE(dataskt);
154 return nreadable;
157 /***********************************************************************
158 * TransmitAckDatagram
160 * This function sends the ack datagram, after setting the window
161 * array to show which of the datagrams in the current window have been
162 * received. The sending side will then use this information to resend
163 * packets, mark packets as received, etc. This system also prevents
164 * multiple retransmissions/acks when acks are lost.
165 ***********************************************************************/
166 void TransmitAckDatagram(OtherNode node)
168 DgramAck ack; int i, seqno, slot; ExplicitDgram dg;
169 int retval;
171 seqno = node->recv_next;
172 MACHSTATE2(3," TransmitAckDgram [seq %d to 'pe' %d]",seqno,node->nodestart)
173 DgramHeaderMake(&ack, DGRAM_ACKNOWLEDGE, Cmi_nodestart, Cmi_net_magic, seqno, 0);
174 LOG(Cmi_clock, Cmi_nodestart, 'A', node->nodestart, seqno);
175 for (i=0; i<Cmi_window_size; i++) {
176 slot = seqno % Cmi_window_size;
177 dg = node->recv_window[slot];
178 ack.window[i] = (dg && (dg->seqno == seqno));
179 seqno = ((seqno+1) & DGRAM_SEQNO_MASK);
181 memcpy(&ack.window[Cmi_window_size], &(node->send_ack_seqno),
182 sizeof(unsigned int));
183 node->send_ack_seqno = ((node->send_ack_seqno + 1) & DGRAM_SEQNO_MASK);
184 retval = (-1);
185 #ifdef CMK_USE_CHECKSUM
186 DgramHeader *head = (DgramHeader *)(&ack);
187 head->magic ^= computeCheckSum((unsigned char*)&ack, DGRAM_HEADER_SIZE + Cmi_window_size + sizeof(unsigned int));
188 #endif
189 while(retval==(-1))
190 retval = sendto(dataskt, (char *)&ack,
191 DGRAM_HEADER_SIZE + Cmi_window_size + sizeof(unsigned int), 0,
192 (struct sockaddr *)&(node->addr),
193 sizeof(struct sockaddr_in));
194 node->stat_send_ack++;
198 /***********************************************************************
199 * TransmitImplicitDgram
200 * TransmitImplicitDgram1
202 * These functions do the actual work of sending a UDP datagram.
203 ***********************************************************************/
204 void TransmitImplicitDgram(ImplicitDgram dg)
206 char *data; DgramHeader *head; int len; DgramHeader temp;
207 OtherNode dest;
208 int retval;
210 MACHSTATE3(3," TransmitImplicitDgram (%d bytes) [seq %d to 'pe' %d]",
211 dg->datalen,dg->seqno,dg->dest->nodestart)
212 len = dg->datalen;
213 data = dg->dataptr;
214 head = (DgramHeader *)(data - DGRAM_HEADER_SIZE);
215 temp = *head;
216 dest = dg->dest;
217 DgramHeaderMake(head, dg->rank, dg->srcpe, Cmi_net_magic, dg->seqno, dg->broot);
218 #ifdef CMK_USE_CHECKSUM
219 head->magic ^= computeCheckSum((unsigned char*)head, len + DGRAM_HEADER_SIZE);
220 #endif
221 LOG(Cmi_clock, Cmi_nodestart, 'T', dest->nodestart, dg->seqno);
222 retval = (-1);
223 while(retval==(-1))
224 retval = sendto(dataskt, (char *)head, len + DGRAM_HEADER_SIZE, 0,
225 (struct sockaddr *)&(dest->addr), sizeof(struct sockaddr_in));
226 *head = temp;
227 dest->stat_send_pkt++;
230 void TransmitImplicitDgram1(ImplicitDgram dg)
232 char *data; DgramHeader *head; int len; DgramHeader temp;
233 OtherNode dest;
234 int retval;
236 MACHSTATE3(4," RETransmitImplicitDgram (%d bytes) [seq %d to 'pe' %d]",
237 dg->datalen,dg->seqno,dg->dest->nodestart)
238 len = dg->datalen;
239 data = dg->dataptr;
240 head = (DgramHeader *)(data - DGRAM_HEADER_SIZE);
241 temp = *head;
242 dest = dg->dest;
243 DgramHeaderMake(head, dg->rank, dg->srcpe, Cmi_net_magic, dg->seqno, dg->broot);
244 #ifdef CMK_USE_CHECKSUM
245 head->magic ^= computeCheckSum((unsigned char *)head, len + DGRAM_HEADER_SIZE);
246 #endif
247 LOG(Cmi_clock, Cmi_nodestart, 'P', dest->nodestart, dg->seqno);
248 retval = (-1);
249 while (retval == (-1))
250 retval = sendto(dataskt, (char *)head, len + DGRAM_HEADER_SIZE, 0,
251 (struct sockaddr *)&(dest->addr), sizeof(struct sockaddr_in));
252 *head = temp;
253 dest->stat_resend_pkt++;
/***********************************************************************
 * TransmitAcknowledgement
 *
 * Visits the nodes round-robin and sends an ack datagram to the first
 * node that has un-acknowledged datagrams, provided either more than
 * half a window's worth are pending or that node's ack timer has
 * expired.  Afterwards, if the node's receive window is non-empty, the
 * un-acked count is reset to 1 and the ack timer is re-armed; otherwise
 * both are cleared.  Returns 1 if an ack was sent, 0 otherwise.
 ***********************************************************************/
265 int TransmitAcknowledgement(void)
267 int skip; static int nextnode=0; OtherNode node;
268 for (skip=0; skip<_Cmi_numnodes; skip++) {
269 node = nodes+nextnode;
270 nextnode = (nextnode + 1) % _Cmi_numnodes;
271 if (node->recv_ack_cnt) {
272 if ((node->recv_ack_cnt > Cmi_half_window) ||
273 (Cmi_clock >= node->recv_ack_time)) {
274 TransmitAckDatagram(node);
275 if (node->recv_winsz) {
276 node->recv_ack_cnt = 1;
277 node->recv_ack_time = Cmi_clock + Cmi_ack_delay;
278 } else {
279 node->recv_ack_cnt = 0;
280 node->recv_ack_time = 0.0;
282 return 1;
286 return 0;
290 /***********************************************************************
291 * TransmitDatagram()
293 * This function fills up the Send Window with the contents of the
294 * Send Queue. It also sets the node->send_primer variable, which
295 * indicates when a retransmission will be attempted.
296 ***********************************************************************/
297 int TransmitDatagram(void)
299 ImplicitDgram dg; OtherNode node;
300 static int nextnode=0; int skip, count, slot;
301 unsigned int seqno;
303 for (skip=0; skip<_Cmi_numnodes; skip++) {
304 node = nodes+nextnode;
305 nextnode = (nextnode + 1) % _Cmi_numnodes;
306 dg = node->send_queue_h;
307 if (dg) {
308 seqno = dg->seqno;
309 slot = seqno % Cmi_window_size;
310 if (node->send_window[slot] == 0) {
311 node->send_queue_h = dg->next;
312 node->send_window[slot] = dg;
313 TransmitImplicitDgram(dg);
314 if (seqno == ((node->send_last+1)&DGRAM_SEQNO_MASK))
315 node->send_last = seqno;
316 node->send_primer = Cmi_clock + Cmi_delay_retransmit;
317 return 1;
320 if (Cmi_clock > node->send_primer) {
321 slot = (node->send_last % Cmi_window_size);
322 for (count=0; count<Cmi_window_size; count++) {
323 dg = node->send_window[slot];
324 if (dg) break;
325 slot = ((slot+Cmi_window_size-1) % Cmi_window_size);
327 if (dg) {
328 TransmitImplicitDgram1(node->send_window[slot]);
329 node->send_primer = Cmi_clock + Cmi_delay_retransmit;
330 return 1;
334 return 0;
/***********************************************************************
 * EnqueueOutgoingDgram()
 *
 * Wraps one fragment of an outgoing message in a datagram descriptor
 * (destination node, source PE, rank, next send sequence number, broot,
 * payload pointer and length), takes a reference on the outgoing message,
 * and appends the descriptor to the destination node's send queue.
 ***********************************************************************/
344 void EnqueueOutgoingDgram
345 (OutgoingMsg ogm, char *ptr, int len, OtherNode node, int rank, int broot)
347 int seqno, dst, src; ImplicitDgram dg;
348 src = ogm->src;
349 dst = ogm->dst;
350 seqno = node->send_next;
351 node->send_next = ((seqno+1)&DGRAM_SEQNO_MASK);
352 MallocImplicitDgram(dg);
353 dg->dest = node;
354 dg->srcpe = src;
355 dg->rank = rank;
356 dg->seqno = seqno;
357 dg->broot = broot;
358 dg->dataptr = ptr;
359 dg->datalen = len;
360 dg->ogm = ogm;
361 ogm->refcount++;
362 dg->next = 0;
363 if (node->send_queue_h == 0) {
364 node->send_queue_h = dg;
365 node->send_queue_t = dg;
366 } else {
367 node->send_queue_t->next = dg;
368 node->send_queue_t = dg;
373 /***********************************************************************
374 * DeliverViaNetwork()
376 * This function is responsible for all non-local transmission. This
377 * function takes the outgoing messages, splits it into datagrams and
378 * enqueues them into the Send Queue.
379 ***********************************************************************/
380 void DeliverViaNetwork(OutgoingMsg ogm, OtherNode node, int rank, unsigned int broot, int copy)
382 int size; char *data;
383 OtherNode myNode = nodes+CmiMyNode();
385 MACHSTATE2(3,"DeliverViaNetwork %d-byte message to pe %d",
386 ogm->size,node->nodestart+rank);
387 size = ogm->size - DGRAM_HEADER_SIZE;
388 data = ogm->data + DGRAM_HEADER_SIZE;
389 writeableDgrams++;
390 while (size > Cmi_dgram_max_data) {
391 EnqueueOutgoingDgram(ogm, data, Cmi_dgram_max_data, node, rank, broot);
392 data += Cmi_dgram_max_data;
393 size -= Cmi_dgram_max_data;
395 EnqueueOutgoingDgram(ogm, data, size, node, rank, broot);
397 myNode->sent_msgs++;
398 myNode->sent_bytes += ogm->size;
399 /*Try to immediately send the packets off*/
400 writeableDgrams=1;
404 /***********************************************************************
405 * AssembleDatagram()
407 * This function does the actual assembly of datagrams into a
408 * message. node->asm_msg holds the current message being
409 * assembled. Once the message assemble is complete (known by checking
410 * if the total number of datagrams is equal to the number of datagrams
411 * constituting the assembled message), the message is pushed into the
412 * Producer-Consumer queue
413 ***********************************************************************/
414 void AssembleDatagram(OtherNode node, ExplicitDgram dg)
416 int i;
417 unsigned int size; char *msg;
418 OtherNode myNode = nodes+CmiMyNode();
420 MACHSTATE3(2," AssembleDatagram [seq %d from 'pe' %d, packet len %d]",
421 dg->seqno,node->nodestart,dg->len)
422 LOG(Cmi_clock, Cmi_nodestart, 'X', dg->srcpe, dg->seqno);
423 msg = node->asm_msg;
424 if (msg == 0) {
425 size = CmiMsgHeaderGetLength(dg->data);
426 MACHSTATE3(4," Assemble new datagram seq %d from 'pe' %d, len %d",
427 dg->seqno,node->nodestart,size)
428 msg = (char *)CmiAlloc(size);
429 if (!msg)
430 fprintf(stderr, "%d: Out of mem\n", _Cmi_mynode);
431 if (size < dg->len) KillEveryoneCode(4559312);
432 #if CMK_CHARMDEBUG
433 setMemoryTypeMessage(msg);
434 #endif
435 memcpy(msg, (char*)(dg->data), dg->len);
436 node->asm_rank = dg->rank;
437 node->asm_total = size;
438 node->asm_fill = dg->len;
439 node->asm_msg = msg;
440 } else {
441 size = dg->len - DGRAM_HEADER_SIZE;
442 memcpy(msg + node->asm_fill, ((char*)(dg->data))+DGRAM_HEADER_SIZE, size);
443 node->asm_fill += size;
445 MACHSTATE3(2," AssembleDatagram: now have %d of %d bytes from %d",
446 node->asm_fill, node->asm_total, node->nodestart)
447 if (node->asm_fill > node->asm_total) {
448 fprintf(stderr, "\n\n\t\tLength mismatch!!\n\n");
449 fflush(stderr);
450 MACHSTATE4(5,"Length mismatch seq %d, from 'pe' %d, fill %d, total %d\n", dg->seqno,node->nodestart,node->asm_fill,node->asm_total)
451 KillEveryoneCode(4559313);
453 if (node->asm_fill == node->asm_total) {
454 /* spanning tree broadcast - send first to avoid invalid msg ptr */
455 #if CMK_BROADCAST_SPANNING_TREE
456 if (node->asm_rank == DGRAM_BROADCAST
457 #if CMK_NODE_QUEUE_AVAILABLE
458 || node->asm_rank == DGRAM_NODEBROADCAST
459 #endif
461 SendSpanningChildren(NULL, 0, node->asm_total, msg, dg->broot, dg->rank);
462 #elif CMK_BROADCAST_HYPERCUBE
463 if (node->asm_rank == DGRAM_BROADCAST
464 #if CMK_NODE_QUEUE_AVAILABLE
465 || node->asm_rank == DGRAM_NODEBROADCAST
466 #endif
468 SendHypercube(NULL, 0, node->asm_total, msg, dg->broot, dg->rank);
469 #endif
470 if (node->asm_rank == DGRAM_BROADCAST) {
471 int len = node->asm_total;
472 for (i=1; i<_Cmi_mynodesize; i++)
473 CmiPushPE(i, CopyMsg(msg, len));
474 CmiPushPE(0, msg);
475 } else {
476 #if CMK_NODE_QUEUE_AVAILABLE
477 if (node->asm_rank==DGRAM_NODEMESSAGE ||
478 node->asm_rank==DGRAM_NODEBROADCAST)
480 CmiPushNode(msg);
482 else
483 #endif
484 CmiPushPE(node->asm_rank, msg);
486 node->asm_msg = 0;
487 myNode->recd_msgs++;
488 myNode->recd_bytes += node->asm_total;
490 FreeExplicitDgram(dg);
494 /***********************************************************************
495 * AssembleReceivedDatagrams()
497 * This function assembles the datagrams received so far, into a
498 * single message. This also results in part of the Receive Window being
499 * freed.
500 ***********************************************************************/
501 void AssembleReceivedDatagrams(OtherNode node)
503 unsigned int next, slot; ExplicitDgram dg;
504 next = node->recv_next;
505 while (1) {
506 slot = (next % Cmi_window_size);
507 dg = node->recv_window[slot];
508 if (dg == 0) break;
509 AssembleDatagram(node, dg);
510 node->recv_window[slot] = 0;
511 node->recv_winsz--;
512 next = ((next + 1) & DGRAM_SEQNO_MASK);
514 node->recv_next = next;
520 /************************************************************************
521 * IntegrateMessageDatagram()
523 * This function integrates the received datagrams. It first
524 * increments the count of un-acked datagrams. (This is to aid the
525 * heuristic that an ack should be sent when the Receive window is half
526 * full). If the current datagram is the first missing packet, then this
527 * means that the datagram that was missing in the incomplete sequence
528 * of datagrams so far, has arrived, and hence the datagrams can be
529 * assembled.
530 ************************************************************************/
532 void IntegrateMessageDatagram(ExplicitDgram dg)
534 int seqno;
535 unsigned int slot; OtherNode node;
537 LOG(Cmi_clock, Cmi_nodestart, 'M', dg->srcpe, dg->seqno);
538 MACHSTATE2(2," IntegrateMessageDatagram [seq %d from pe %d]", dg->seqno,dg->srcpe)
540 node = nodes_by_pe[dg->srcpe];
541 node->stat_recv_pkt++;
542 seqno = dg->seqno;
543 writeableAcks=1;
544 node->recv_ack_cnt++;
545 if (node->recv_ack_time == 0.0)
546 node->recv_ack_time = Cmi_clock + Cmi_ack_delay;
547 if (((seqno - node->recv_next) & DGRAM_SEQNO_MASK) < Cmi_window_size) {
548 slot = (seqno % Cmi_window_size);
549 if (node->recv_window[slot] == 0) {
550 node->recv_window[slot] = dg;
551 node->recv_winsz++;
552 if (seqno == node->recv_next)
553 AssembleReceivedDatagrams(node);
554 if (seqno > node->recv_expect)
555 node->recv_ack_time = 0.0;
556 if (seqno >= node->recv_expect)
557 node->recv_expect = ((seqno+1)&DGRAM_SEQNO_MASK);
558 LOG(Cmi_clock, Cmi_nodestart, 'Y', node->recv_next, dg->seqno);
559 return;
562 LOG(Cmi_clock, Cmi_nodestart, 'y', node->recv_next, dg->seqno);
563 FreeExplicitDgram(dg);
/***********************************************************************
 * IntegrateAckDatagram()
 *
 * This function is called on the message-sending side on receipt of an
 * ack for datagrams it sent.  Since both messages and acks can be lost,
 * the protocol lets acks for higher-sequence-numbered packets act as
 * implicit acks for lower-numbered packets whose own acks were lost.
 *
 * Recall that the Send and Receive windows are circular queues, while
 * the sequence numbers of the packets (datagrams) increase monotonically
 * (modulo DGRAM_SEQNO_MASK).  Hence it is important to know which
 * sequence number an ack refers to, and to relate that to the actual
 * packet sitting in the Send window.  Since every Cmi_window_size-th
 * packet occupies the same slot in the windows, a number of sanity
 * checks are required for the protocol to work:
 *
 * 1. Every ack carries its own serial number after the window flags.
 *    If that serial number is not newer than the last one received
 *    (allowing for wrap-around), the ack is stale and is ignored.
 *
 * 2. Otherwise, the last ack serial number received is updated to this
 *    ack's serial number.
 *
 * 3. The whole Send window is then examined in reverse order, starting
 *    from sequence number (first missing packet + Cmi_window_size - 1)
 *    and counting down.  For each sequence number, the corresponding
 *    Send-window slot is checked for a datagram.  If the slot is empty,
 *    the search advances.  Otherwise the datagram's sequence number is
 *    compared with the expected sequence number for the current
 *    iteration.
 *
 *    If the sequence numbers do not match, the slot's packet is
 *    discarded only when it is older than the first missing packet by
 *    at most Cmi_window_size (such a packet must have been received).
 *    The first missing packet itself, and packets outside that range,
 *    are kept.
 *
 *    If the sequence numbers DO match, the flag 'rxing' is consulted.
 *    'rxing' becomes 1 once any packet with a greater sequence number
 *    (i.e. one visited earlier in the loop) has been acked.  From then
 *    on, every lower-numbered packet whose ack->window[] flag shows it
 *    was NOT received is moved back to the head of the send queue to be
 *    retransmitted.
 *
 ***********************************************************************/
616 void IntegrateAckDatagram(ExplicitDgram dg)
618 OtherNode node; DgramAck *ack; ImplicitDgram idg;
619 int i; unsigned int slot, rxing, dgseqno, seqno, ackseqno;
620 int diff;
621 unsigned int tmp;
623 node = nodes_by_pe[dg->srcpe];
624 ack = ((DgramAck*)(dg->data));
625 memcpy(&ackseqno, &(ack->window[Cmi_window_size]), sizeof(unsigned int));
626 dgseqno = dg->seqno;
627 seqno = (dgseqno + Cmi_window_size) & DGRAM_SEQNO_MASK;
628 slot = seqno % Cmi_window_size;
629 rxing = 0;
630 node->stat_recv_ack++;
631 LOG(Cmi_clock, Cmi_nodestart, 'R', node->nodestart, dg->seqno);
633 tmp = node->recv_ack_seqno;
634 /* check that the ack being received is actually appropriate */
635 if ( !((node->recv_ack_seqno >=
636 ((DGRAM_SEQNO_MASK >> 1) + (DGRAM_SEQNO_MASK >> 2))) &&
637 (ackseqno < (DGRAM_SEQNO_MASK >> 1))) &&
638 (ackseqno <= node->recv_ack_seqno))
640 FreeExplicitDgram(dg);
641 return;
643 /* higher ack so adjust */
644 node->recv_ack_seqno = ackseqno;
645 writeableDgrams=1; /* May have freed up some send slots */
647 for (i=Cmi_window_size-1; i>=0; i--) {
648 slot--; if (slot== ((unsigned int)-1)) slot+=Cmi_window_size;
649 seqno = (seqno-1) & DGRAM_SEQNO_MASK;
650 idg = node->send_window[slot];
651 if (idg) {
652 if (idg->seqno == seqno) {
653 if (ack->window[i]) {
654 /* remove those that have been received and are within a window
655 of the first missing packet */
656 node->stat_ack_pkts++;
657 LOG(Cmi_clock, Cmi_nodestart, 'r', node->nodestart, seqno);
658 node->send_window[slot] = 0;
659 DiscardImplicitDgram(idg);
660 rxing = 1;
661 } else if (rxing) {
662 node->send_window[slot] = 0;
663 idg->next = node->send_queue_h;
664 if (node->send_queue_h == 0) {
665 node->send_queue_t = idg;
667 node->send_queue_h = idg;
669 } else {
670 diff = dgseqno >= idg->seqno ?
671 ((dgseqno - idg->seqno) & DGRAM_SEQNO_MASK) :
672 ((dgseqno + (DGRAM_SEQNO_MASK - idg->seqno) + 1) & DGRAM_SEQNO_MASK);
674 if ((diff <= 0) || (diff > Cmi_window_size))
676 continue;
679 /* if ack is really less than our packet seq (consider wrap around) */
680 if (dgseqno < idg->seqno && (idg->seqno - dgseqno <= Cmi_window_size))
682 continue;
684 if (dgseqno == idg->seqno)
686 continue;
688 node->stat_ack_pkts++;
689 LOG(Cmi_clock, Cmi_nodestart, 'o', node->nodestart, idg->seqno);
690 node->send_window[slot] = 0;
691 DiscardImplicitDgram(idg);
695 FreeExplicitDgram(dg);
698 void ReceiveDatagram(void)
700 ExplicitDgram dg; int ok, magic;
701 MACHLOCK_ASSERT(comm_flag,"ReceiveDatagram")
702 MallocExplicitDgram(dg);
703 ok = recv(dataskt,(char*)(dg->data),Cmi_max_dgram_size,0);
704 /*ok = recvfrom(dataskt,(char*)(dg->data),Cmi_max_dgram_size,0, 0, 0);*/
705 /* if (ok<0) { perror("recv"); KillEveryoneCode(37489437); } */
706 if (ok < 0) {
707 MACHSTATE1(4," recv dgram failed (errno=%d)",errno)
708 FreeExplicitDgram(dg);
709 if (errno == EINTR) return; /* A SIGIO interrupted the receive */
710 if (errno == EAGAIN) return; /* Just try again later */
711 #if !defined(_WIN32)
712 if (errno == EWOULDBLOCK) return; /* No more messages on that socket. */
713 if (errno == ECONNREFUSED) return; /* A "Host unreachable" ICMP packet came in */
714 #endif
715 CmiPrintf("ReceiveDatagram: recv: %s(%d)\n", strerror(errno), errno) ;
716 KillEveryoneCode(37489437);
718 dg->len = ok;
719 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
720 /* randomly corrupt data and ack datagrams */
721 randomCorrupt((char*)dg->data, dg->len);
722 #endif
724 if (ok >= DGRAM_HEADER_SIZE) {
725 DgramHeaderBreak(dg->data, dg->rank, dg->srcpe, magic, dg->seqno, dg->broot);
726 MACHSTATE3(2," recv dgram [seq %d, for rank %d, from pe %d]",
727 dg->seqno,dg->rank,dg->srcpe)
728 #ifdef CMK_USE_CHECKSUM
729 if (computeCheckSum((unsigned char*)dg->data, dg->len) == 0)
730 #else
731 if (magic == (Cmi_net_magic&DGRAM_MAGIC_MASK))
732 #endif
734 if (dg->rank == DGRAM_ACKNOWLEDGE)
735 IntegrateAckDatagram(dg);
736 else IntegrateMessageDatagram(dg);
737 } else FreeExplicitDgram(dg);
738 } else {
739 MACHSTATE1(4," recv dgram failed (len=%d)",ok)
740 FreeExplicitDgram(dg);
745 /***********************************************************************
746 * CommunicationServer()
748 * This function does the scheduling of the tasks related to the
749 * message sends and receives. It is called from the CmiGeneralSend()
750 * function, and periodically from the CommunicationInterrupt() (in case
751 * of the single processor version), and from the comm_thread (for the
752 * SMP version). Based on which of the data/control read/write sockets
753 * are ready, the corresponding tasks are called
755 ***********************************************************************/
756 void CmiHandleImmediate(void);
758 static void CommunicationServer(int sleepTime, int where)
760 unsigned int nTimes=0; /* Loop counter */
761 LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
762 MACHSTATE2(1,"CommunicationsServer(%d,%d)",
763 sleepTime,writeableAcks||writeableDgrams)
764 #if !CMK_SHARED_VARS_UNAVAILABLE /*SMP mode: comm. lock is precious*/
765 if (sleepTime!=0) {/*Sleep *without* holding the comm. lock*/
766 MACHSTATE(1,"CommServer going to sleep (NO LOCK)");
767 if (CheckSocketsReady(sleepTime)<=0) {
768 MACHSTATE(1,"CommServer finished without anything happening.");
771 sleepTime=0;
772 #endif
773 CmiCommLock();
774 inProgress[CmiMyRank()] += 1;
775 /* in netpoll mode, only perform service to stdout */
776 if (Cmi_netpoll && where == COMM_SERVER_FROM_INTERRUPT) {
777 if (CmiStdoutNeedsService()) {CmiStdoutService();}
778 CmiCommUnlock();
779 inProgress[CmiMyRank()] -= 1;
780 return;
782 CommunicationsClock();
783 /*Don't sleep if a signal has stored messages for us*/
784 if (sleepTime&&CmiGetState()->idle.hasMessages) sleepTime=0;
785 while (CheckSocketsReady(sleepTime)>0) {
786 int again=0;
787 MACHSTATE(2,"CheckSocketsReady returned true");
788 sleepTime=0;
789 if (ctrlskt_ready_read) {again=1;ctrl_getone();}
790 if (dataskt_ready_read) {again=1;ReceiveDatagram();}
791 if (dataskt_ready_write) {
792 if (writeableAcks)
793 if (0!=(writeableAcks=TransmitAcknowledgement())) again=1;
794 if (writeableDgrams)
795 if (0!=(writeableDgrams=TransmitDatagram())) again=1;
797 if (CmiStdoutNeedsService()) {CmiStdoutService();}
798 if (!again) break; /* Nothing more to do */
799 if ((nTimes++ &16)==15) {
800 /*We just grabbed a whole pile of packets-- try to retire a few*/
801 CommunicationsClock();
804 CmiCommUnlock();
805 inProgress[CmiMyRank()] -= 1;
807 /* when called by communication thread or in interrupt */
808 if (where == COMM_SERVER_FROM_SMP || where == COMM_SERVER_FROM_INTERRUPT) {
809 #if CMK_IMMEDIATE_MSG
810 CmiHandleImmediate();
811 #endif
812 #if CMK_PERSISTENT_COMM
813 PumpPersistent();
814 #endif
817 MACHSTATE(1,"} CommunicationServer")
/* The UDP layer needs no machine-specific setup; intentionally empty. */
void CmiMachineInit(char **argv)
{
}

/* No UDP-specific communication setup is performed here; intentionally empty. */
void CmiCommunicationInit(char **argv)
{
}
828 void CmiMachineExit(void)