2 * UDP implementation of Converse NET version
4 * contains only UDP specific code for:
6 * - CmiCommunicationInit()
8 * - DeliverViaNetwork()
9 * - CommunicationServer()
11 moved from machine.c by
12 Gengbin Zheng, gzheng@uiuc.edu 4/22/2001
20 /******************************************************************************
22 * CmiNotifyIdle()-- wait until a packet comes in
24 *****************************************************************************/
/* Members of the idle-tracking record that the CmiNotify* routines in this
 * file receive as CmiIdleState pointers.
 * NOTE(review): the enclosing struct/typedef lines are not visible here. */
27 int sleepMs
; /*Milliseconds to sleep while idle*/
28 int nIdles
; /*Number of times we've been idle in a row*/
29 CmiState cs
; /*Machine state*/
/* Allocates one CmiIdleState record on the heap with malloc.
 * NOTE(review): no check of the malloc result is visible in this excerpt;
 * the statements that fill in the fields and return the pointer are also
 * not shown, so confirm against the full file. */
32 static CmiIdleState
*CmiNotifyGetState(void)
34 CmiIdleState
*s
=(CmiIdleState
*)malloc(sizeof(CmiIdleState
));
/* Idle-begin hook taking the idle record s; the only visible action is a
 * level-3 MACHSTATE trace.
 * NOTE(review): the statements between the signature and the trace are not
 * visible in this excerpt. */
41 static void CmiNotifyBeginIdle(CmiIdleState
*s
)
46 MACHSTATE(3,"begin idle")
/* Idle-continue hook, called with the idle record s.
 *
 * Two compile-time variants are visible:
 *  - CMK_SHARED_VARS_UNAVAILABLE: this processor runs CommunicationServer
 *    itself, passing 0 as the sleep time when Cmi_idlepoll is set and 10
 *    otherwise.
 *  - otherwise: the processor sleeps on its idle lock for s->sleepMs
 *    (capped at 10) and then calls CsdResetPeriodic.
 * NOTE(review): the lines that increment nIdles and sleepMs, plus the
 * else/endif lines, are not visible in this excerpt. */
49 static void CmiNotifyStillIdle(CmiIdleState
*s
)
51 #if CMK_SHARED_VARS_UNAVAILABLE
52 /*No comm. thread-- listen on sockets for incoming messages*/
53 MACHSTATE(1,"idle commserver {")
54 CommunicationServer(Cmi_idlepoll
?0:10, COMM_SERVER_FROM_SMP
);
55 MACHSTATE(1,"} idle commserver")
/* With POSIX-threads SMP the sleeping logic below is additionally gated on
 * the _Cmi_sleepOnIdle flag. */
57 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
58 if(_Cmi_sleepOnIdle
){
60 int nSpins
=20; /*Number of times to spin before sleeping*/
62 if (s
->nIdles
>nSpins
) { /*Start giving some time back to the OS*/
/* Clamp the sleep interval to at most 10. */
64 if (s
->sleepMs
>10) s
->sleepMs
=10;
66 /*Comm. thread will listen on sockets-- just sleep*/
68 MACHSTATE1(3,"idle lock(%d) {",CmiMyPe())
69 CmiIdleLock_sleep(&s
->cs
->idle
,s
->sleepMs
);
70 CsdResetPeriodic(); /* check ccd callbacks when I am awakened */
71 MACHSTATE1(3,"} idle lock(%d)",CmiMyPe())
73 #if CMK_SHARED_VARS_POSIX_THREADS_SMP
/* Public idle notification: forwards the address of an idle record s to
 * CmiNotifyStillIdle.
 * NOTE(review): the declaration and initialisation of s are not visible in
 * this excerpt. */
79 void CmiNotifyIdle(void) {
82 CmiNotifyStillIdle(&s
);
85 /****************************************************************************
89 * Checks both sockets to see which are readable and which are writeable.
90 * We check all these things at the same time since this can be done for
91 * free with ``select.'' The result is stored in global variables, since
92 * this is essentially global state information and several routines need it.
94 ***************************************************************************/
/* Polls the charmrun control descriptor and the data socket.
 *
 * withDelayMs is handed to CMK_PIPE_DECL.
 * Side effects: overwrites the globals ctrlskt_ready_read,
 * dataskt_ready_read and dataskt_ready_write.
 * Visible returns: 0 when Cmi_charmrun_fd is -1, and the result of a
 * recursive zero-delay call after an interrupted poll.
 * NOTE(review): the remaining return statements are not visible here. */
96 int CheckSocketsReady(int withDelayMs
)
98 int nreadable
,dataWrite
=writeableDgrams
|| writeableAcks
;
99 CMK_PIPE_DECL(withDelayMs
);
102 #if CMK_USE_KQUEUE && 0
103 // This implementation doesn't yet work, but potentially is much faster
105 /* Only setup the CMK_PIPE structures the first time they are used.
106 This makes the kqueue implementation much faster.
108 static int first
= 1;
111 CmiStdoutAdd(CMK_PIPE_SUB
);
112 if (Cmi_charmrun_fd
!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd
); }
113 else return 0; /* If there's no charmrun, none of this matters. */
115 CMK_PIPE_ADDREAD(dataskt
);
116 CMK_PIPE_ADDWRITE(dataskt
);
121 CmiStdoutAdd(CMK_PIPE_SUB
);
122 if (Cmi_charmrun_fd
!=-1) { CMK_PIPE_ADDREAD(Cmi_charmrun_fd
); }
123 else return 0; /* If there's no charmrun, none of this matters. */
125 { CMK_PIPE_ADDREAD(dataskt
); }
127 CMK_PIPE_ADDWRITE(dataskt
);
/* Run the poll, then reset all three readiness flags before inspecting
 * the result. */
131 nreadable
=CMK_PIPE_CALL();
132 ctrlskt_ready_read
= 0;
133 dataskt_ready_read
= 0;
134 dataskt_ready_write
= 0;
136 if (nreadable
== 0) {
137 MACHSTATE(1,"} CheckSocketsReady (nothing readable)")
/* Interrupted poll: retry immediately with a zero delay. */
142 MACHSTATE(2,"} CheckSocketsReady (INTERRUPTED!)")
143 return CheckSocketsReady(0);
/* Record which descriptors the poll reported as ready. */
146 CmiStdoutCheck(CMK_PIPE_SUB
);
147 if (Cmi_charmrun_fd
!=-1)
148 ctrlskt_ready_read
= CMK_PIPE_CHECKREAD(Cmi_charmrun_fd
);
150 dataskt_ready_read
= CMK_PIPE_CHECKREAD(dataskt
);
152 dataskt_ready_write
= CMK_PIPE_CHECKWRITE(dataskt
);
157 /***********************************************************************
158 * TransmitAckDatagram
160 * This function sends the ack datagram, after setting the window
161 * array to show which of the datagrams in the current window have been
162 * received. The sending side will then use this information to resend
163 * packets, mark packets as received, etc. This system also prevents
164 * multiple retransmissions/acks when acks are lost.
165 ***********************************************************************/
/* Builds one DGRAM_ACKNOWLEDGE datagram for node and sends it on dataskt.
 *
 * Payload layout as written below: the datagram header, Cmi_window_size
 * window flags, then one unsigned int holding node->send_ack_seqno.
 * Side effects: advances node->send_ack_seqno (masked with
 * DGRAM_SEQNO_MASK) and increments node->stat_send_ack.
 * NOTE(review): the sendto result is stored in retval but no check of it
 * is visible in this excerpt. */
166 void TransmitAckDatagram(OtherNode node
)
168 DgramAck ack
; int i
, seqno
, slot
; ExplicitDgram dg
;
/* The ack is keyed on the next sequence number expected from node. */
171 seqno
= node
->recv_next
;
172 MACHSTATE2(3," TransmitAckDgram [seq %d to 'pe' %d]",seqno
,node
->nodestart
)
173 DgramHeaderMake(&ack
, DGRAM_ACKNOWLEDGE
, Cmi_nodestart
, Cmi_net_magic
, seqno
, 0);
174 LOG(Cmi_clock
, Cmi_nodestart
, 'A', node
->nodestart
, seqno
);
/* window[i] is nonzero only when the receive-window slot for
 * recv_next + i holds a datagram carrying exactly that sequence number. */
175 for (i
=0; i
<Cmi_window_size
; i
++) {
176 slot
= seqno
% Cmi_window_size
;
177 dg
= node
->recv_window
[slot
];
178 ack
.window
[i
] = (dg
&& (dg
->seqno
== seqno
));
179 seqno
= ((seqno
+1) & DGRAM_SEQNO_MASK
);
/* Append the running ack counter directly after the window flags, then
 * advance it. */
181 memcpy(&ack
.window
[Cmi_window_size
], &(node
->send_ack_seqno
),
182 sizeof(unsigned int));
183 node
->send_ack_seqno
= ((node
->send_ack_seqno
+ 1) & DGRAM_SEQNO_MASK
);
/* Optional checksum, folded into the magic field of the header by XOR. */
185 #ifdef CMK_USE_CHECKSUM
186 DgramHeader
*head
= (DgramHeader
*)(&ack
);
187 head
->magic
^= computeCheckSum((unsigned char*)&ack
, DGRAM_HEADER_SIZE
+ Cmi_window_size
+ sizeof(unsigned int));
/* The transmitted length equals the length that was checksummed above. */
190 retval
= sendto(dataskt
, (char *)&ack
,
191 DGRAM_HEADER_SIZE
+ Cmi_window_size
+ sizeof(unsigned int), 0,
192 (struct sockaddr
*)&(node
->addr
),
193 sizeof(struct sockaddr_in
));
194 node
->stat_send_ack
++;
198 /***********************************************************************
199 * TransmitImplicitDgram
200 * TransmitImplicitDgram1
202 * These functions do the actual work of sending a UDP datagram.
203 ***********************************************************************/
/* Sends the datagram dg once over dataskt (first transmission).
 *
 * The header is written into the DGRAM_HEADER_SIZE bytes that sit
 * immediately before the payload pointer, so header and payload go out in
 * one sendto call. Increments dest->stat_send_pkt.
 * NOTE(review): the assignments of data, len and dest, and any use of the
 * local temp, are not visible in this excerpt; the sendto result is stored
 * in retval with no visible check. */
204 void TransmitImplicitDgram(ImplicitDgram dg
)
206 char *data
; DgramHeader
*head
; int len
; DgramHeader temp
;
210 MACHSTATE3(3," TransmitImplicitDgram (%d bytes) [seq %d to 'pe' %d]",
211 dg
->datalen
,dg
->seqno
,dg
->dest
->nodestart
)
/* Header location: directly in front of the payload bytes. */
214 head
= (DgramHeader
*)(data
- DGRAM_HEADER_SIZE
);
217 DgramHeaderMake(head
, dg
->rank
, dg
->srcpe
, Cmi_net_magic
, dg
->seqno
, dg
->broot
);
/* Optional checksum over header plus payload, XORed into the magic field. */
218 #ifdef CMK_USE_CHECKSUM
219 head
->magic
^= computeCheckSum((unsigned char*)head
, len
+ DGRAM_HEADER_SIZE
);
221 LOG(Cmi_clock
, Cmi_nodestart
, 'T', dest
->nodestart
, dg
->seqno
);
224 retval
= sendto(dataskt
, (char *)head
, len
+ DGRAM_HEADER_SIZE
, 0,
225 (struct sockaddr
*)&(dest
->addr
), sizeof(struct sockaddr_in
));
227 dest
->stat_send_pkt
++;
/* Retransmission variant of TransmitImplicitDgram: builds the same header
 * in front of the payload, logs with tag P, and increments
 * dest->stat_resend_pkt instead of stat_send_pkt.
 *
 * NOTE(review): the send is repeated for as long as sendto returns -1, with
 * no visible bound, delay or errno inspection, so a persistent socket error
 * would spin here. The initial value of retval and the assignments of data,
 * len and dest are not visible in this excerpt. */
230 void TransmitImplicitDgram1(ImplicitDgram dg
)
232 char *data
; DgramHeader
*head
; int len
; DgramHeader temp
;
236 MACHSTATE3(4," RETransmitImplicitDgram (%d bytes) [seq %d to 'pe' %d]",
237 dg
->datalen
,dg
->seqno
,dg
->dest
->nodestart
)
/* Header location: directly in front of the payload bytes. */
240 head
= (DgramHeader
*)(data
- DGRAM_HEADER_SIZE
);
243 DgramHeaderMake(head
, dg
->rank
, dg
->srcpe
, Cmi_net_magic
, dg
->seqno
, dg
->broot
);
/* Optional checksum over header plus payload, XORed into the magic field. */
244 #ifdef CMK_USE_CHECKSUM
245 head
->magic
^= computeCheckSum((unsigned char *)head
, len
+ DGRAM_HEADER_SIZE
);
247 LOG(Cmi_clock
, Cmi_nodestart
, 'P', dest
->nodestart
, dg
->seqno
);
/* Keep resending until sendto stops reporting failure. */
249 while (retval
== (-1))
250 retval
= sendto(dataskt
, (char *)head
, len
+ DGRAM_HEADER_SIZE
, 0,
251 (struct sockaddr
*)&(dest
->addr
), sizeof(struct sockaddr_in
));
253 dest
->stat_resend_pkt
++;
257 /***********************************************************************
258 * TransmitAcknowledgement
260 * This function sends the ack datagrams, after checking to see if the
261 * Recv Window is at least half full. After that, if the Recv window size
262 * is 0, then the count of un-acked datagrams, and the time at which
263 * the ack should be sent is reset.
264 ***********************************************************************/
/* Scans the nodes round-robin and sends an ack to a node that is due one.
 *
 * The static nextnode keeps the scan position between calls, so successive
 * calls start from different nodes. A node is due when it has un-acked
 * datagrams (recv_ack_cnt nonzero) and either that count exceeds
 * Cmi_half_window or Cmi_clock has reached recv_ack_time.
 * NOTE(review): the return statements are not visible in this excerpt. The
 * caller in this file stores the result in writeableAcks and treats nonzero
 * as more work pending. */
265 int TransmitAcknowledgement(void)
267 int skip
; static int nextnode
=0; OtherNode node
;
268 for (skip
=0; skip
<_Cmi_numnodes
; skip
++) {
269 node
= nodes
+nextnode
;
270 nextnode
= (nextnode
+ 1) % _Cmi_numnodes
;
271 if (node
->recv_ack_cnt
) {
272 if ((node
->recv_ack_cnt
> Cmi_half_window
) ||
273 (Cmi_clock
>= node
->recv_ack_time
)) {
274 TransmitAckDatagram(node
);
/* When recv_winsz is nonzero, re-arm the ack timer with a count of 1. */
275 if (node
->recv_winsz
) {
276 node
->recv_ack_cnt
= 1;
277 node
->recv_ack_time
= Cmi_clock
+ Cmi_ack_delay
;
/* These two assignments clear the pending-ack state instead. */
279 node
->recv_ack_cnt
= 0;
280 node
->recv_ack_time
= 0.0;
290 /***********************************************************************
293 * This function fills up the Send Window with the contents of the
294 * Send Queue. It also sets the node->send_primer variable, which
295 * indicates when a retransmission will be attempted.
296 ***********************************************************************/
/* Moves queued datagrams into the send window and transmits them; also
 * retransmits from the window once the retransmission timer has expired.
 *
 * Nodes are visited round-robin using the static nextnode.
 * NOTE(review): the declaration and assignment of seqno, the null check on
 * dg, and the return statements are not visible in this excerpt. The caller
 * in this file stores the result in writeableDgrams. */
297 int TransmitDatagram(void)
299 ImplicitDgram dg
; OtherNode node
;
300 static int nextnode
=0; int skip
, count
, slot
;
303 for (skip
=0; skip
<_Cmi_numnodes
; skip
++) {
304 node
= nodes
+nextnode
;
305 nextnode
= (nextnode
+ 1) % _Cmi_numnodes
;
306 dg
= node
->send_queue_h
;
/* A datagram may only be sent when its window slot (seqno modulo the
 * window size) is free. */
309 slot
= seqno
% Cmi_window_size
;
310 if (node
->send_window
[slot
] == 0) {
311 node
->send_queue_h
= dg
->next
;
312 node
->send_window
[slot
] = dg
;
313 TransmitImplicitDgram(dg
);
/* send_last only advances when this datagram is its direct successor. */
314 if (seqno
== ((node
->send_last
+1)&DGRAM_SEQNO_MASK
))
315 node
->send_last
= seqno
;
316 node
->send_primer
= Cmi_clock
+ Cmi_delay_retransmit
;
/* Retransmission timer expired: walk the window backwards starting from
 * the slot of send_last. */
320 if (Cmi_clock
> node
->send_primer
) {
321 slot
= (node
->send_last
% Cmi_window_size
);
322 for (count
=0; count
<Cmi_window_size
; count
++) {
323 dg
= node
->send_window
[slot
];
/* Step to the previous slot, wrapping around the window. */
325 slot
= ((slot
+Cmi_window_size
-1) % Cmi_window_size
);
328 TransmitImplicitDgram1(node
->send_window
[slot
]);
329 node
->send_primer
= Cmi_clock
+ Cmi_delay_retransmit
;
337 /***********************************************************************
338 * EnqueueOutgoingDgram()
340 * This function enqueues the datagrams onto the Send queue of the
341 * sender, after setting appropriate data values into each of the
343 ***********************************************************************/
/* Allocates an ImplicitDgram, assigns it the next send sequence number of
 * node, and appends it to the send queue of node.
 *
 * Parameters: ogm is the owning outgoing message, ptr and len delimit the
 * payload slice, node is the destination, rank and broot are header fields.
 * NOTE(review): the statements that copy these parameters into dg are not
 * visible in this excerpt. */
344 void EnqueueOutgoingDgram
345 (OutgoingMsg ogm
, char *ptr
, int len
, OtherNode node
, int rank
, int broot
)
347 int seqno
, dst
, src
; ImplicitDgram dg
;
/* Take the next sequence number and advance the counter under the mask. */
350 seqno
= node
->send_next
;
351 node
->send_next
= ((seqno
+1)&DGRAM_SEQNO_MASK
);
352 MallocImplicitDgram(dg
);
/* Empty queue: dg becomes both head and tail. */
363 if (node
->send_queue_h
== 0) {
364 node
->send_queue_h
= dg
;
365 node
->send_queue_t
= dg
;
/* These two assignments link dg after the current tail. */
367 node
->send_queue_t
->next
= dg
;
368 node
->send_queue_t
= dg
;
373 /***********************************************************************
374 * DeliverViaNetwork()
376 * This function is responsible for all non-local transmission. This
377 * function takes the outgoing messages, splits it into datagrams and
378 * enqueues them into the Send Queue.
379 ***********************************************************************/
/* Splits the outgoing message ogm into datagrams and queues them for node.
 *
 * The first DGRAM_HEADER_SIZE bytes of ogm->data are skipped; the rest is
 * cut into slices of at most Cmi_dgram_max_data bytes, each handed to
 * EnqueueOutgoingDgram. Adds ogm->size to the sent_bytes counter of the
 * local node.
 * NOTE(review): no use of the copy parameter is visible in this excerpt. */
380 void DeliverViaNetwork(OutgoingMsg ogm
, OtherNode node
, int rank
, unsigned int broot
, int copy
)
382 int size
; char *data
;
383 OtherNode myNode
= nodes
+CmiMyNode();
385 MACHSTATE2(3,"DeliverViaNetwork %d-byte message to pe %d",
386 ogm
->size
,node
->nodestart
+rank
);
387 size
= ogm
->size
- DGRAM_HEADER_SIZE
;
388 data
= ogm
->data
+ DGRAM_HEADER_SIZE
;
/* Emit full-sized slices while more than one slice remains. */
390 while (size
> Cmi_dgram_max_data
) {
391 EnqueueOutgoingDgram(ogm
, data
, Cmi_dgram_max_data
, node
, rank
, broot
);
392 data
+= Cmi_dgram_max_data
;
393 size
-= Cmi_dgram_max_data
;
/* Final slice carries whatever is left. */
395 EnqueueOutgoingDgram(ogm
, data
, size
, node
, rank
, broot
);
398 myNode
->sent_bytes
+= ogm
->size
;
399 /*Try to immediately send the packets off*/
404 /***********************************************************************
407 * This function does the actual assembly of datagrams into a
408 * message. node->asm_msg holds the current message being
409 * assembled. Once the message assembly is complete (known by checking
410 * if the total number of datagrams is equal to the number of datagrams
411 * constituting the assembled message), the message is pushed into the
412 * Producer-Consumer queue
413 ***********************************************************************/
/* Adds the received datagram dg to the message being assembled for node
 * and delivers the message once it is complete; dg is freed at the end.
 *
 * First datagram of a message: the total length is read from the message
 * header, a buffer of that size is obtained from CmiAlloc, and the whole
 * datagram is copied in. Later datagrams: the payload after the datagram
 * header is appended at asm_fill.
 * NOTE(review): the condition choosing between those two paths, and the
 * lines that load and store node->asm_msg, are not visible in this excerpt.
 * NOTE(review): after the out-of-memory message no abort is visible before
 * the memcpy into msg; confirm whether the full file stops here. */
414 void AssembleDatagram(OtherNode node
, ExplicitDgram dg
)
417 unsigned int size
; char *msg
;
418 OtherNode myNode
= nodes
+CmiMyNode();
420 MACHSTATE3(2," AssembleDatagram [seq %d from 'pe' %d, packet len %d]",
421 dg
->seqno
,node
->nodestart
,dg
->len
)
422 LOG(Cmi_clock
, Cmi_nodestart
, 'X', dg
->srcpe
, dg
->seqno
);
/* New message: size is the full message length taken from its header. */
425 size
= CmiMsgHeaderGetLength(dg
->data
);
426 MACHSTATE3(4," Assemble new datagram seq %d from 'pe' %d, len %d",
427 dg
->seqno
,node
->nodestart
,size
)
428 msg
= (char *)CmiAlloc(size
);
430 fprintf(stderr
, "%d: Out of mem\n", _Cmi_mynode
);
/* A declared length shorter than this single datagram is fatal. */
431 if (size
< dg
->len
) KillEveryoneCode(4559312);
433 setMemoryTypeMessage(msg
);
435 memcpy(msg
, (char*)(dg
->data
), dg
->len
);
436 node
->asm_rank
= dg
->rank
;
437 node
->asm_total
= size
;
438 node
->asm_fill
= dg
->len
;
/* Continuation datagram: append only the bytes after the datagram header. */
441 size
= dg
->len
- DGRAM_HEADER_SIZE
;
442 memcpy(msg
+ node
->asm_fill
, ((char*)(dg
->data
))+DGRAM_HEADER_SIZE
, size
);
443 node
->asm_fill
+= size
;
445 MACHSTATE3(2," AssembleDatagram: now have %d of %d bytes from %d",
446 node
->asm_fill
, node
->asm_total
, node
->nodestart
)
/* Receiving more bytes than the declared total is fatal. */
447 if (node
->asm_fill
> node
->asm_total
) {
448 fprintf(stderr
, "\n\n\t\tLength mismatch!!\n\n");
450 MACHSTATE4(5,"Length mismatch seq %d, from 'pe' %d, fill %d, total %d\n", dg
->seqno
,node
->nodestart
,node
->asm_fill
,node
->asm_total
)
451 KillEveryoneCode(4559313);
/* Message complete: forward broadcasts first, then deliver locally. */
453 if (node
->asm_fill
== node
->asm_total
) {
454 /* spanning tree broadcast - send first to avoid invalid msg ptr */
455 #if CMK_BROADCAST_SPANNING_TREE
456 if (node
->asm_rank
== DGRAM_BROADCAST
457 #if CMK_NODE_QUEUE_AVAILABLE
458 || node
->asm_rank
== DGRAM_NODEBROADCAST
461 SendSpanningChildren(NULL
, 0, node
->asm_total
, msg
, dg
->broot
, dg
->rank
);
462 #elif CMK_BROADCAST_HYPERCUBE
463 if (node
->asm_rank
== DGRAM_BROADCAST
464 #if CMK_NODE_QUEUE_AVAILABLE
465 || node
->asm_rank
== DGRAM_NODEBROADCAST
468 SendHypercube(NULL
, 0, node
->asm_total
, msg
, dg
->broot
, dg
->rank
);
/* PE broadcast: ranks 1 and up on this node each get their own copy. */
470 if (node
->asm_rank
== DGRAM_BROADCAST
) {
471 int len
= node
->asm_total
;
472 for (i
=1; i
<_Cmi_mynodesize
; i
++)
473 CmiPushPE(i
, CopyMsg(msg
, len
));
476 #if CMK_NODE_QUEUE_AVAILABLE
477 if (node
->asm_rank
==DGRAM_NODEMESSAGE
||
478 node
->asm_rank
==DGRAM_NODEBROADCAST
)
/* NOTE(review): the node-queue push and the else between these branches
 * are not visible in this excerpt. */
484 CmiPushPE(node
->asm_rank
, msg
);
488 myNode
->recd_bytes
+= node
->asm_total
;
490 FreeExplicitDgram(dg
);
494 /***********************************************************************
495 * AssembleReceivedDatagrams()
497 * This function assembles the datagrams received so far, into a
498 * single message. This also results in part of the Receive Window being
500 ***********************************************************************/
/* Drains in-order datagrams from the receive window of node, starting at
 * recv_next.
 *
 * Each drained slot is passed to AssembleDatagram and then cleared, and the
 * sequence number advances under DGRAM_SEQNO_MASK. The final value is
 * written back to node->recv_next.
 * NOTE(review): the loop construct and its stop condition are not visible
 * in this excerpt. */
501 void AssembleReceivedDatagrams(OtherNode node
)
503 unsigned int next
, slot
; ExplicitDgram dg
;
504 next
= node
->recv_next
;
506 slot
= (next
% Cmi_window_size
);
507 dg
= node
->recv_window
[slot
];
509 AssembleDatagram(node
, dg
);
510 node
->recv_window
[slot
] = 0;
512 next
= ((next
+ 1) & DGRAM_SEQNO_MASK
);
514 node
->recv_next
= next
;
520 /************************************************************************
521 * IntegrateMessageDatagram()
523 * This function integrates the received datagrams. It first
524 * increments the count of un-acked datagrams. (This is to aid the
525 * heuristic that an ack should be sent when the Receive window is half
526 * full). If the current datagram is the first missing packet, then this
527 * means that the datagram that was missing in the incomplete sequence
528 * of datagrams so far, has arrived, and hence the datagrams can be
530 ************************************************************************/
/* Files a received data datagram into the receive window of its sender.
 *
 * The sender is looked up through nodes_by_pe using dg->srcpe. A datagram
 * is accepted only when its sequence number lies within Cmi_window_size of
 * recv_next (modulo the sequence mask) and its slot is empty; a datagram
 * that is not kept is freed at the end of the function.
 * NOTE(review): the declaration and assignment of seqno are not visible in
 * this excerpt.
 * NOTE(review): the comparisons of seqno against recv_expect are plain
 * relational tests with no masking; confirm how sequence wrap-around is
 * meant to be handled. */
532 void IntegrateMessageDatagram(ExplicitDgram dg
)
535 unsigned int slot
; OtherNode node
;
537 LOG(Cmi_clock
, Cmi_nodestart
, 'M', dg
->srcpe
, dg
->seqno
);
538 MACHSTATE2(2," IntegrateMessageDatagram [seq %d from pe %d]", dg
->seqno
,dg
->srcpe
)
540 node
= nodes_by_pe
[dg
->srcpe
];
541 node
->stat_recv_pkt
++;
/* Count the datagram as un-acked and arm the ack timer if it is idle. */
544 node
->recv_ack_cnt
++;
545 if (node
->recv_ack_time
== 0.0)
546 node
->recv_ack_time
= Cmi_clock
+ Cmi_ack_delay
;
/* Window test: distance from recv_next, modulo the sequence space. */
547 if (((seqno
- node
->recv_next
) & DGRAM_SEQNO_MASK
) < Cmi_window_size
) {
548 slot
= (seqno
% Cmi_window_size
);
549 if (node
->recv_window
[slot
] == 0) {
550 node
->recv_window
[slot
] = dg
;
/* The first missing datagram has arrived: assemble what is now in order. */
552 if (seqno
== node
->recv_next
)
553 AssembleReceivedDatagrams(node
);
/* A sequence number beyond the expected one zeroes recv_ack_time. */
554 if (seqno
> node
->recv_expect
)
555 node
->recv_ack_time
= 0.0;
556 if (seqno
>= node
->recv_expect
)
557 node
->recv_expect
= ((seqno
+1)&DGRAM_SEQNO_MASK
);
558 LOG(Cmi_clock
, Cmi_nodestart
, 'Y', node
->recv_next
, dg
->seqno
);
/* Datagram not stored: log it with tag y and release it. */
562 LOG(Cmi_clock
, Cmi_nodestart
, 'y', node
->recv_next
, dg
->seqno
);
563 FreeExplicitDgram(dg
);
568 /***********************************************************************
569 * IntegrateAckDatagram()
571 * This function is called on the message sending side, on receipt of
572 * an ack for a message that it sent. Since messages and acks could be
573 * lost, our protocol works in such a way that acks for higher sequence
574 * numbered packets act as implicit acks for lower sequence numbered
575 * packets, in case the acks for the lower sequence numbered packets
578 * Recall that the Send and Receive windows are circular queues, and the
579 * sequence numbers of the packets (datagrams) are monotonically
580 * increasing. Hence it is important to know for which sequence number
581 * the ack is for, and to correspondingly relate that to the actual packet
582 * sitting in the Send window. Since every 20th packet occupies the same
583 * slot in the windows, a number of sanity checks are required for our
585 * 1. If the ack number (first missing packet sequence number) is less
586 * than the last ack number received then this ack can be ignored.
588 * 2. The last ack number received must be set to the current ack
589 * sequence number (This is done only if 1. is not true).
591 * 3. Now the whole Send window is examined, in a kind of reverse
592 * order. The check starts from a sequence number = 20 + the first
593 * missing packet's sequence number. For each of these sequence numbers,
594 * the slot in the Send window is checked for existence of a datagram
595 * that should have been sent. If there is no datagram, then the search
596 * advances. If there is a datagram, then the sequence number of that is
597 * checked with the expected sequence number for the current iteration
598 * (This is decremented in each iteration of the loop).
600 * If the sequence numbers do not match, then checks are made (for
601 * the unlikely scenarios where the current slot sequence number is
602 * equal to the first missing packet's sequence number, and where
603 * somehow, packets which have greater sequence numbers than allowed for
604 * the current window)
606 * If the sequence numbers DO match, then the flag 'rxing' is
607 * checked. The semantics for this flag is that : If any packet with a
608 * greater sequence number than the current packet (and hence in the
609 * previous iteration of the for loop) has been acked, then the 'rxing'
610 * flag is set to 1, to imply that all the packets of lower sequence
611 * number, for which the ack->window[] element does not indicate that the
612 * packet has been received, must be retransmitted.
614 ***********************************************************************/
/* Processes a received ack datagram dg against the send window of the node
 * that sent it, then frees dg.
 *
 * The ack carries per-slot window flags followed by an unsigned ack
 * counter, which is extracted with memcpy from the position just after the
 * flags. Stale acks are dropped. Otherwise the send window is walked from
 * the highest covered sequence number downwards; matching datagrams whose
 * flag is set are discarded as delivered.
 * NOTE(review): the assignment of dgseqno, the declarations of tmp and
 * diff, the handling of rxing, and several else/return lines are not
 * visible in this excerpt. */
616 void IntegrateAckDatagram(ExplicitDgram dg
)
618 OtherNode node
; DgramAck
*ack
; ImplicitDgram idg
;
619 int i
; unsigned int slot
, rxing
, dgseqno
, seqno
, ackseqno
;
623 node
= nodes_by_pe
[dg
->srcpe
];
624 ack
= ((DgramAck
*)(dg
->data
));
625 memcpy(&ackseqno
, &(ack
->window
[Cmi_window_size
]), sizeof(unsigned int));
/* Start one past the top of the acked window; the loop below decrements
 * before use. */
627 seqno
= (dgseqno
+ Cmi_window_size
) & DGRAM_SEQNO_MASK
;
628 slot
= seqno
% Cmi_window_size
;
630 node
->stat_recv_ack
++;
631 LOG(Cmi_clock
, Cmi_nodestart
, 'R', node
->nodestart
, dg
->seqno
);
633 tmp
= node
->recv_ack_seqno
;
634 /* check that the ack being received is actually appropriate */
/* Stale-ack test: the ack counter is not newer than the last one seen,
 * except when the stored counter is in the top quarter of the sequence
 * space and the new one is in the lower half (treated as wrap-around). */
635 if ( !((node
->recv_ack_seqno
>=
636 ((DGRAM_SEQNO_MASK
>> 1) + (DGRAM_SEQNO_MASK
>> 2))) &&
637 (ackseqno
< (DGRAM_SEQNO_MASK
>> 1))) &&
638 (ackseqno
<= node
->recv_ack_seqno
))
640 FreeExplicitDgram(dg
);
643 /* higher ack so adjust */
644 node
->recv_ack_seqno
= ackseqno
;
645 writeableDgrams
=1; /* May have freed up some send slots */
/* Walk the window from the highest index down; slot wraps back to the top
 * of the window when it underflows. */
647 for (i
=Cmi_window_size
-1; i
>=0; i
--) {
648 slot
--; if (slot
== ((unsigned int)-1)) slot
+=Cmi_window_size
;
649 seqno
= (seqno
-1) & DGRAM_SEQNO_MASK
;
650 idg
= node
->send_window
[slot
];
652 if (idg
->seqno
== seqno
) {
653 if (ack
->window
[i
]) {
654 /* remove those that have been received and are within a window
655 of the first missing packet */
656 node
->stat_ack_pkts
++;
657 LOG(Cmi_clock
, Cmi_nodestart
, 'r', node
->nodestart
, seqno
);
658 node
->send_window
[slot
] = 0;
659 DiscardImplicitDgram(idg
);
/* The statements below pull the datagram out of the window and push it
 * onto the head of the send queue.
 * NOTE(review): the condition guarding this path is not visible here. */
662 node
->send_window
[slot
] = 0;
663 idg
->next
= node
->send_queue_h
;
664 if (node
->send_queue_h
== 0) {
665 node
->send_queue_t
= idg
;
667 node
->send_queue_h
= idg
;
/* Sequence numbers differ: diff is the distance from the datagram in the
 * slot to the acked sequence number, with wrap-around. */
670 diff
= dgseqno
>= idg
->seqno
?
671 ((dgseqno
- idg
->seqno
) & DGRAM_SEQNO_MASK
) :
672 ((dgseqno
+ (DGRAM_SEQNO_MASK
- idg
->seqno
) + 1) & DGRAM_SEQNO_MASK
);
674 if ((diff
<= 0) || (diff
> Cmi_window_size
))
679 /* if ack is really less than our packet seq (consider wrap around) */
680 if (dgseqno
< idg
->seqno
&& (idg
->seqno
- dgseqno
<= Cmi_window_size
))
684 if (dgseqno
== idg
->seqno
)
/* The datagram in the slot is treated as acknowledged: count, clear,
 * discard. */
688 node
->stat_ack_pkts
++;
689 LOG(Cmi_clock
, Cmi_nodestart
, 'o', node
->nodestart
, idg
->seqno
);
690 node
->send_window
[slot
] = 0;
691 DiscardImplicitDgram(idg
);
695 FreeExplicitDgram(dg
);
/* Reads one datagram from dataskt and dispatches it.
 *
 * Up to Cmi_max_dgram_size bytes are received into a freshly allocated
 * ExplicitDgram. On a receive failure the datagram is freed; EINTR, EAGAIN,
 * EWOULDBLOCK and ECONNREFUSED return quietly, and any other errno is
 * printed and is fatal. A datagram of at least DGRAM_HEADER_SIZE bytes has
 * its header unpacked and is routed by rank to IntegrateAckDatagram or
 * IntegrateMessageDatagram; shorter ones are logged and freed.
 * NOTE(review): the failure test on ok and the assignment of dg->len are
 * not visible in this excerpt.
 * NOTE(review): errno is examined after MACHSTATE1 and FreeExplicitDgram
 * have run; if either can change errno the tests below may see a different
 * value. Confirm, or capture errno directly after recv. */
698 void ReceiveDatagram(void)
700 ExplicitDgram dg
; int ok
, magic
;
701 MACHLOCK_ASSERT(comm_flag
,"ReceiveDatagram")
702 MallocExplicitDgram(dg
);
703 ok
= recv(dataskt
,(char*)(dg
->data
),Cmi_max_dgram_size
,0);
704 /*ok = recvfrom(dataskt,(char*)(dg->data),Cmi_max_dgram_size,0, 0, 0);*/
705 /* if (ok<0) { perror("recv"); KillEveryoneCode(37489437); } */
707 MACHSTATE1(4," recv dgram failed (errno=%d)",errno
)
708 FreeExplicitDgram(dg
);
709 if (errno
== EINTR
) return; /* A SIGIO interrupted the receive */
710 if (errno
== EAGAIN
) return; /* Just try again later */
712 if (errno
== EWOULDBLOCK
) return; /* No more messages on that socket. */
713 if (errno
== ECONNREFUSED
) return; /* A "Host unreachable" ICMP packet came in */
715 CmiPrintf("ReceiveDatagram: recv: %s(%d)\n", strerror(errno
), errno
) ;
716 KillEveryoneCode(37489437);
719 #ifdef CMK_RANDOMLY_CORRUPT_MESSAGES
720 /* randomly corrupt data and ack datagrams */
721 randomCorrupt((char*)dg
->data
, dg
->len
);
/* Only datagrams long enough to hold a header are parsed. */
724 if (ok
>= DGRAM_HEADER_SIZE
) {
725 DgramHeaderBreak(dg
->data
, dg
->rank
, dg
->srcpe
, magic
, dg
->seqno
, dg
->broot
);
726 MACHSTATE3(2," recv dgram [seq %d, for rank %d, from pe %d]",
727 dg
->seqno
,dg
->rank
,dg
->srcpe
)
/* Validity test: a zero checksum when checksums are compiled in, otherwise
 * a match against the masked network magic. */
728 #ifdef CMK_USE_CHECKSUM
729 if (computeCheckSum((unsigned char*)dg
->data
, dg
->len
) == 0)
731 if (magic
== (Cmi_net_magic
&DGRAM_MAGIC_MASK
))
734 if (dg
->rank
== DGRAM_ACKNOWLEDGE
)
735 IntegrateAckDatagram(dg
);
736 else IntegrateMessageDatagram(dg
);
737 } else FreeExplicitDgram(dg
);
739 MACHSTATE1(4," recv dgram failed (len=%d)",ok
)
740 FreeExplicitDgram(dg
);
745 /***********************************************************************
746 * CommunicationServer()
748 * This function does the scheduling of the tasks related to the
749 * message sends and receives. It is called from the CmiGeneralSend()
750 * function, and periodically from the CommunicationInterrupt() (in case
751 * of the single processor version), and from the comm_thread (for the
752 * SMP version). Based on which of the data/control read/write sockets
753 * are ready, the corresponding tasks are called
755 ***********************************************************************/
756 void CmiHandleImmediate(void);
/* Main communication pump: polls the sockets and services whatever is
 * ready until nothing more can be done.
 *
 * sleepTime is the delay handed to CheckSocketsReady. where identifies the
 * caller and is compared below against COMM_SERVER_FROM_INTERRUPT and
 * COMM_SERVER_FROM_SMP.
 * inProgress for the current rank is incremented on entry and decremented
 * on each visible exit path.
 * NOTE(review): the lock acquire/release calls, the declaration of again,
 * and the return/break/endif lines are not visible in this excerpt. */
758 static void CommunicationServer(int sleepTime
, int where
)
760 unsigned int nTimes
=0; /* Loop counter */
761 LOG(GetClock(), Cmi_nodestart
, 'I', 0, 0);
762 MACHSTATE2(1,"CommunicationsServer(%d,%d)",
763 sleepTime
,writeableAcks
||writeableDgrams
)
764 #if !CMK_SHARED_VARS_UNAVAILABLE /*SMP mode: comm. lock is precious*/
765 if (sleepTime
!=0) {/*Sleep *without* holding the comm. lock*/
766 MACHSTATE(1,"CommServer going to sleep (NO LOCK)");
767 if (CheckSocketsReady(sleepTime
)<=0) {
768 MACHSTATE(1,"CommServer finished without anything happening.");
774 inProgress
[CmiMyRank()] += 1;
775 /* in netpoll mode, only perform service to stdout */
776 if (Cmi_netpoll
&& where
== COMM_SERVER_FROM_INTERRUPT
) {
777 if (CmiStdoutNeedsService()) {CmiStdoutService();}
779 inProgress
[CmiMyRank()] -= 1;
782 CommunicationsClock();
783 /*Don't sleep if a signal has stored messages for us*/
784 if (sleepTime
&&CmiGetState()->idle
.hasMessages
) sleepTime
=0;
/* Service loop: again records whether any step did work this pass. */
785 while (CheckSocketsReady(sleepTime
)>0) {
787 MACHSTATE(2,"CheckSocketsReady returned true");
789 if (ctrlskt_ready_read
) {again
=1;ctrl_getone();}
790 if (dataskt_ready_read
) {again
=1;ReceiveDatagram();}
791 if (dataskt_ready_write
) {
/* Each transmit call refreshes its own pending-work flag. */
793 if (0!=(writeableAcks
=TransmitAcknowledgement())) again
=1;
795 if (0!=(writeableDgrams
=TransmitDatagram())) again
=1;
797 if (CmiStdoutNeedsService()) {CmiStdoutService();}
798 if (!again
) break; /* Nothing more to do */
/* NOTE(review): masking with 16 yields only 0 or 16, so the comparison
 * with 15 below can never be true and this branch is unreachable as
 * written. A mask of 15 (every 16th pass) looks like the intent; confirm
 * before changing. */
799 if ((nTimes
++ &16)==15) {
800 /*We just grabbed a whole pile of packets-- try to retire a few*/
801 CommunicationsClock();
805 inProgress
[CmiMyRank()] -= 1;
807 /* when called by communication thread or in interrupt */
808 if (where
== COMM_SERVER_FROM_SMP
|| where
== COMM_SERVER_FROM_INTERRUPT
) {
809 #if CMK_IMMEDIATE_MSG
810 CmiHandleImmediate();
812 #if CMK_PERSISTENT_COMM
817 MACHSTATE(1,"} CommunicationServer")
820 void CmiMachineInit(char **argv
)
824 void CmiCommunicationInit(char **argv
)
828 void CmiMachineExit(void)