1 #include "sockRoutines.h"
3 #ifndef CMK_NO_SOCKETS /*<- for ASCI Red*/
17 #include <sys/bproc.h>
20 #ifndef CMK_NOT_USE_CONVERSE
21 # include "converse.h" /* use real CmiTmpAlloc/Free */
22 #elif CMK_NOT_USE_CONVERSE /* fake CmiTmpAlloc/Free via malloc */
24 # define CmiTmpAlloc(size) malloc(size)
25 # define CmiTmpFree(ptr) free(ptr)
29 typedef int socklen_t
;
32 #if CMK_HAS_GETIFADDRS
33 #include <netinet/in.h> /* for sockaddr_in */
34 #include <ifaddrs.h> /* for getifaddrs */
35 #include <net/if.h> /* for IFF_RUNNING */
38 /*Just print out error message and exit*/
/* Default abort handler (the initial value of skt_abort): reports the numeric
 * error code and the message text on stderr. The skt argument is not used by
 * the statements visible here; whatever follows the fprintf is not shown. */
39 static int default_skt_abort(SOCKET skt
, int code
, const char *msg
)
41 fprintf(stderr
,"Fatal socket error: code %d-- %s\n",code
,msg
);
46 static skt_idleFn idleFunc
=NULL
;
47 static skt_abortFn skt_abort
=default_skt_abort
;
48 void skt_set_idle(skt_idleFn f
) {idleFunc
=f
;}
/* Replace the abort handler used by the socket routines. The handler that is
 * current on entry is captured in `old` before anything is changed.
 * NOTE(review): the statements that install `f` and return a value are not
 * visible here -- presumably `old` is returned; confirm. */
49 skt_abortFn
skt_set_abort(skt_abortFn f
)
51 skt_abortFn old
=skt_abort
;
56 /* These little flags are used to ignore the SIGPIPE signal
57 * while we're inside one of our socket calls.
58 * This lets us only handle SIGPIPEs we generated. */
59 static int skt_ignore_SIGPIPE
=0;
61 #if defined(_WIN32) /*Windows systems:*/
62 static void doCleanup(void)
64 static int skt_inited
=0;
65 /*Initialization routine (Windows only)*/
70 if (skt_inited
) return;
72 WSAStartup(version
, &WSAData
);
76 void skt_close(SOCKET fd
)
80 #else /*UNIX Systems:*/
82 typedef void (*skt_signal_handler_fn
)(int sig
);
83 static skt_signal_handler_fn skt_fallback_SIGPIPE
=NULL
;
84 static struct sigaction sa
;
/* SIGPIPE handler. While skt_ignore_SIGPIPE is set (i.e. the signal arrived
 * during one of our own socket calls) it logs the event and re-installs the
 * handler described by `sa`. The skt_fallback_SIGPIPE call hands the signal
 * to the handler that was installed before ours.
 * NOTE(review): the lines between the sigaction call and the fallback call
 * are not visible; presumably the fallback sits in an else branch -- confirm.
 * NOTE(review): fprintf is not async-signal-safe.
 * NOTE(review): skt_fallback_SIGPIPE is taken from old_action.sa_handler, so
 * it may be SIG_DFL or SIG_IGN rather than a callable function; no guard is
 * visible here. */
85 static void skt_SIGPIPE_handler(int sig
) {
86 if (skt_ignore_SIGPIPE
) {
87 fprintf(stderr
,"Caught SIGPIPE.\n");
/* Re-arm our own handler for the next SIGPIPE. */
88 sigaction(SIGPIPE
, &sa
, NULL
);
/* Forward to whatever handler was installed before ours. */
91 skt_fallback_SIGPIPE(sig
);
96 /* Install a SIGPIPE signal handler.
97 This prevents us from dying when one of our network
100 struct sigaction old_action
;
101 sa
.sa_handler
= skt_SIGPIPE_handler
;
102 sigemptyset(&sa
.sa_mask
);
103 sa
.sa_flags
= SA_RESTART
;
104 sigaction(SIGPIPE
, &sa
, &old_action
);
105 skt_fallback_SIGPIPE
= old_action
.sa_handler
;
107 void skt_close(SOCKET fd
)
109 skt_ignore_SIGPIPE
=1;
111 skt_ignore_SIGPIPE
=0;
115 #ifndef SKT_HAS_BUFFER_BEGIN
116 void skt_buffer_begin(SOCKET sk
) {}
117 void skt_buffer_end(SOCKET sk
) {}
121 static int ERRNO
= -1;
123 /*Called when a socket or select routine returns
124 an error-- determines how to respond.
125 Return 1 if the last call was interrupted
126 by, e.g., an alarm and should be retried.
/* Classify the error left behind by the most recent failed socket/select call
 * and tell the caller whether to retry.
 *   - interrupt or transient error: run idleFunc (when one is registered);
 *     these cases fall through to the final `return 1`.
 *   - unrecognized error: `return 0`, which callers treat as fatal.
 * NOTE(review): several lines are not visible here, including the UNIX
 * declaration of `err`, the statements that set istransient, and the branch
 * that consumes istimeout -- confirm against the full file. */
128 static int skt_should_retry(void)
130 int isinterrupt
=0,istransient
=0,istimeout
=0;
131 #if defined(_WIN32) /*Windows systems-- check Windows Sockets Error*/
132 int err
=WSAGetLastError();
133 if (err
==WSAEINTR
) isinterrupt
=1;
134 if (err
==WSATRY_AGAIN
||err
==WSAECONNREFUSED
)
136 #else /*UNIX systems-- check errno*/
138 if (err
==EINTR
) isinterrupt
=1;
139 if (err
==ETIMEDOUT
) istimeout
=1;
/* Presumably these codes mark the error as transient (the assignment itself
 * is not visible). */
140 if (err
==EAGAIN
||err
==ECONNREFUSED
141 ||err
==EWOULDBLOCK
||err
==ENOBUFS
148 /*We were interrupted by an alarm. Schedule, then retry.*/
149 if (idleFunc
!=NULL
) idleFunc();
151 else if (istransient
)
152 { /*A transient error-- idle a while, then try again later.*/
153 if (idleFunc
!=NULL
) idleFunc();
157 return 0; /*Some unrecognized problem-- abort!*/
158 /* timeout is used by send() for example to terminate node
159 program normally, when charmrun dies */
160 return 1;/*Otherwise, we recognized it*/
164 /* Set the TCP_NODELAY option (disables Nagle's algorithm) on fd, which must be a TCP socket.
       The result of setsockopt is stored in `ok`; the declaration and value of
       `flag`, and what is done with `ok`, are not visible here. */
165 int skt_tcp_no_nagle(SOCKET fd
)
169 ok
= setsockopt(fd
, IPPROTO_TCP
, TCP_NODELAY
, (char *)&flag
, sizeof(int));
174 int skt_select1(SOCKET fd
,int msec
)
176 struct pollfd fds
[1];
177 int begin
, nreadable
;
182 fds
[0].events
=POLLIN
;
184 if (msec
>0) begin
= time(0);
187 skt_ignore_SIGPIPE
=1;
188 nreadable
= poll(fds
, 1, msec
);
189 skt_ignore_SIGPIPE
=0;
192 if (skt_should_retry()) continue;
193 else skt_abort(fd
, 93200, "Fatal error in poll");
195 if (nreadable
>0) return 1; /*We gotta good socket*/
197 while(msec
>0 && ((secLeft
= sec
- (time(0) - begin
))>0));
199 return 0;/*Timed out*/
204 /*Sleep on the given read socket until it becomes readable or msec milliseconds elapse*/
205 int skt_select1(SOCKET fd
,int msec
)
211 int begin
, nreadable
;
216 if (msec
>0) begin
= time(0);
220 tmo
.tv_usec
= (msec
-1000*sec
)*1000;
221 skt_ignore_SIGPIPE
=1;
222 nreadable
= select(1+fd
, &rfds
, NULL
, NULL
, &tmo
);
223 skt_ignore_SIGPIPE
=0;
226 if (skt_should_retry()) continue;
227 else skt_abort(fd
, 93200, "Fatal error in select");
229 if (nreadable
>0) return 1; /*We gotta good socket*/
231 while(msec
>0 && ((secLeft
= sec
- (time(0) - begin
))>0));
233 return 0;/*Timed out*/
237 /******* DNS *********/
238 skt_ip_t _skt_invalid_ip
={{0}};
240 skt_ip_t
skt_my_ip(void)
243 skt_ip_t ip
= _skt_invalid_ip
;
245 #if CMK_HAS_GETIFADDRS
246 /* Code snippet from Jens Alfke
247 * http://lists.apple.com/archives/macnetworkprog/2008/May/msg00013.html */
248 struct ifaddrs
*ifaces
=0;
249 if( getifaddrs(&ifaces
) == 0 ) {
250 struct ifaddrs
*iface
;
251 for( iface
=ifaces
; iface
; iface
=iface
->ifa_next
) {
252 if( (iface
->ifa_flags
& IFF_UP
) && ! (iface
->ifa_flags
& IFF_LOOPBACK
) ) {
253 const struct sockaddr_in
*addr
= (const struct sockaddr_in
*)iface
->ifa_addr
;
254 if( addr
&& addr
->sin_family
==AF_INET
) {
256 if ( ifcount
==1 ) memcpy(&ip
, &addr
->sin_addr
, sizeof(ip
));
262 /* fprintf(stderr, "My IP is %d.%d.%d.%d\n", ip.data[0],ip.data[1],ip.data[2],ip.data[3]); */
263 if (ifcount
==1) return ip
;
266 if (gethostname(hostname
, 999)==0) {
267 skt_ip_t ip2
= skt_lookup_ip(hostname
);
268 if ( ip2
.data
[0] != 127 ) return ip2
;
269 else if ( ifcount
!= 0 ) return ip
;
272 return _skt_invalid_ip
;
/* Parse `str` as a dotted-decimal address with exactly sizeof(skt_ip_t)
 * components, each in 0..255, storing one byte per component in ret->data.
 * *ret is reset to _skt_invalid_ip first. Returns 0 as soon as a component is
 * missing, out of range, not followed by '.', or (for the last component)
 * not followed by the end of the string.
 * NOTE(review): the success return and the step that advances past the
 * separator are not visible here; the caller treats a non-zero result as
 * success.
 * NOTE(review): isdigit() is given a plain char; for negative char values
 * that is undefined behavior -- a cast to unsigned char would be safer. */
275 static int skt_parse_dotted(const char *str
,skt_ip_t
*ret
)
278 *ret
=_skt_invalid_ip
;
279 for (i
=0;i
<sizeof(skt_ip_t
);i
++) {
/* Read the next decimal component and range-check it as one byte. */
280 if (1!=sscanf(str
,"%d",&v
)) return 0;
281 if (v
<0 || v
>255) return 0;
282 while (isdigit(*str
)) str
++; /* Advance over number */
283 if (i
!=sizeof(skt_ip_t
)-1) { /*Not last time:*/
284 if (*str
!='.') return 0; /*Check for dot*/
285 } else { /*Last time:*/
286 if (*str
!=0) return 0; /*Check for end-of-string*/
289 ret
->data
[i
]=(unsigned char)v
;
294 /* this is NOT thread safe ! */
/* Resolve `name` to an address. A dotted-decimal string is parsed directly;
 * anything else goes through gethostbyname(), and the first entry of the
 * returned address list is copied into the result. Returns _skt_invalid_ip
 * when the DNS lookup fails.
 * NOTE(review): the memcpy length is h->h_length, which is not checked
 * against sizeof(ret) in the lines visible here. */
295 skt_ip_t
skt_lookup_ip(const char *name
)
297 skt_ip_t ret
=_skt_invalid_ip
;
298 /*First try to parse the name as dotted decimal*/
299 if (skt_parse_dotted(name
,&ret
))
301 else {/*Try a DNS lookup*/
302 struct hostent
*h
= gethostbyname(name
); /* not thread safe */
303 if (h
==0) return _skt_invalid_ip
;
304 memcpy(&ret
,h
->h_addr_list
[0],h
->h_length
);
309 /* these 2 functions will return the inner node IP, special for
310 Linux Scyld. G. Zheng
312 skt_ip_t
skt_innode_my_ip(void)
315 /* on Scyld, the hostname is just the node number */
317 sprintf(hostname
, "%d", bproc_currnode());
318 return skt_innode_lookup_ip(hostname
);
324 skt_ip_t
skt_innode_lookup_ip(const char *name
)
327 struct sockaddr_in addr
;
328 int len
= sizeof(struct sockaddr_in
);
329 if (-1 == bproc_nodeaddr(atoi(name
), &addr
, &len
)) {
330 return _skt_invalid_ip
;
334 memcpy(&ret
,&addr
.sin_addr
.s_addr
,sizeof(ret
));
338 return skt_lookup_ip(name
);
342 /*Write addr as dotted decimal: each byte of addr.data is printed in
      decimal, with a '.' after every byte except the last.
      NOTE(review): the output is produced with unbounded sprintf calls, so the
      caller must supply a large enough buffer. How `o` is initialised and
      advanced, and the return value, are not visible here (presumably `o`
      walks through dest and dest is returned -- confirm).*/
343 char *skt_print_ip(char *dest
,skt_ip_t addr
)
347 for (i
=0;i
<sizeof(addr
);i
++) {
348 const char *trail
=".";
349 if (i
==sizeof(addr
)-1) trail
=""; /*No trailing separator dot*/
350 sprintf(o
,"%d%s",(int)addr
.data
[i
],trail
);
/* Return non-zero when a and b hold identical bytes (a raw memcmp over the
 * whole skt_ip_t), and 0 otherwise. */
355 int skt_ip_match(skt_ip_t a
,skt_ip_t b
)
357 return 0==memcmp(&a
,&b
,sizeof(a
));
/* Build an AF_INET socket address from an IP and a host-order port. The
 * structure starts zeroed, the port is converted with htons(), and sizeof(IP)
 * bytes of IP are copied into sin_addr. The return statement is not visible
 * here; presumably `ret` is returned.
 * NOTE(review): the port is narrowed through (short) before htons(); ports
 * above 32767 pass through a signed type on the way -- confirm this is
 * intended. */
359 struct sockaddr_in
skt_build_addr(skt_ip_t IP
,int port
)
361 struct sockaddr_in ret
={0};
362 ret
.sin_family
=AF_INET
;
363 ret
.sin_port
= htons((short)port
);
364 memcpy(&ret
.sin_addr
,&IP
,sizeof(IP
));
368 SOCKET
skt_datagram(unsigned int *port
, int bufsize
)
370 int connPort
=(port
==NULL
)?0:*port
;
371 struct sockaddr_in addr
=skt_build_addr(_skt_invalid_ip
,connPort
);
376 ret
= socket(AF_INET
,SOCK_DGRAM
,0);
377 if (ret
== SOCKET_ERROR
) {
378 if (skt_should_retry()) goto retry
;
379 return skt_abort(-1, 93490, "Error creating datagram socket.");
381 if (bind(ret
, (struct sockaddr
*)&addr
, sizeof(addr
)) == SOCKET_ERROR
)
382 return skt_abort(-1, 93491, "Error binding datagram socket.");
385 if (getsockname(ret
, (struct sockaddr
*)&addr
, &len
))
386 return skt_abort(-1, 93492, "Error getting address on datagram socket.");
391 if (setsockopt(ret
, SOL_SOCKET
, SO_RCVBUF
, (char *)&bufsize
, len
) == SOCKET_ERROR
)
392 return skt_abort(-1, 93495, "Error on RCVBUF sockopt for datagram socket.");
393 if (setsockopt(ret
, SOL_SOCKET
, SO_SNDBUF
, (char *)&bufsize
, len
) == SOCKET_ERROR
)
394 return skt_abort(-1, 93496, "Error on SNDBUF sockopt for datagram socket.");
397 if (port
!=NULL
) *port
= (int)ntohs(addr
.sin_port
);
400 SOCKET
skt_server(unsigned int *port
)
402 return skt_server_ip(port
,NULL
);
405 SOCKET
skt_server_ip(unsigned int *port
,skt_ip_t
*ip
)
409 int on
= 1; /* for setsockopt */
410 int connPort
=(port
==NULL
)?0:*port
;
411 struct sockaddr_in addr
=skt_build_addr((ip
==NULL
)?_skt_invalid_ip
:*ip
,connPort
);
414 ret
= socket(PF_INET
, SOCK_STREAM
, 0);
416 if (ret
== SOCKET_ERROR
) {
417 if (skt_should_retry()) goto retry
;
418 else return skt_abort(-1, 93483, "Error creating server socket.");
420 setsockopt(ret
, SOL_SOCKET
, SO_REUSEADDR
, (char *) &on
, sizeof(on
));
422 if (bind(ret
, (struct sockaddr
*)&addr
, sizeof(addr
)) == SOCKET_ERROR
)
423 return skt_abort(-1, 93484, "Error binding server socket.");
424 if (listen(ret
,5) == SOCKET_ERROR
)
425 return skt_abort(-1, 93485, "Error listening on server socket.");
427 if (getsockname(ret
, (struct sockaddr
*)&addr
, &len
) == SOCKET_ERROR
)
428 return skt_abort(-1, 93486, "Error getting name on server socket.");
430 if (port
!=NULL
) *port
= (int)ntohs(addr
.sin_port
);
431 if (ip
!=NULL
) memcpy(ip
, &addr
.sin_addr
, sizeof(*ip
));
435 SOCKET
skt_accept(SOCKET src_fd
,skt_ip_t
*pip
, unsigned int *port
)
438 struct sockaddr_in addr
={0};
442 ret
= accept(src_fd
, (struct sockaddr
*)&addr
, &len
);
443 if (ret
== SOCKET_ERROR
) {
444 if (skt_should_retry()) goto retry
;
445 else return skt_abort(-1, 93523, "Error in accept.");
448 if (port
!=NULL
) *port
=ntohs(addr
.sin_port
);
449 if (pip
!=NULL
) memcpy(pip
,&addr
.sin_addr
,sizeof(*pip
));
454 SOCKET
skt_connect(skt_ip_t ip
, int port
, int timeout
)
456 struct sockaddr_in addr
=skt_build_addr(ip
,port
);
461 while (time(0)-begin
< timeout
)
463 ret
= socket(AF_INET
, SOCK_STREAM
, 0);
464 if (ret
==SOCKET_ERROR
)
466 if (skt_should_retry()) continue;
467 else return skt_abort(-1, 93512, "Error creating socket");
469 ok
= connect(ret
, (struct sockaddr
*)&(addr
), sizeof(addr
));
470 if (ok
!= SOCKET_ERROR
)
471 return ret
;/*Good connect*/
472 else { /*Bad connect*/
474 if (skt_should_retry()) continue;
476 #if ! defined(_WIN32)
477 if (ERRNO
== ETIMEDOUT
) continue; /* time out is fine */
479 return skt_abort(-1, 93515, "Error connecting to socket\n");
485 return skt_abort(-1, 93517, "Timeout in socket connect\n");
486 return INVALID_SOCKET
;
/* Request `bufsize` bytes for both the send (SO_SNDBUF) and receive
 * (SO_RCVBUF) buffers of `skt`; skt_abort is called if either setsockopt
 * fails.
 * NOTE(review): both failure paths report the same code (93496), pass -1
 * rather than `skt` to skt_abort, and say "datagram socket" although nothing
 * here restricts `skt` to datagram sockets. */
489 void skt_setSockBuf(SOCKET skt
, int bufsize
)
491 int len
= sizeof(int);
492 if (setsockopt(skt
, SOL_SOCKET
, SO_SNDBUF
, (char *)&bufsize
, len
) == SOCKET_ERROR
)
493 skt_abort(-1, 93496, "Error on SNDBUF sockopt for datagram socket.");
494 if (setsockopt(skt
, SOL_SOCKET
, SO_RCVBUF
, (char *)&bufsize
, len
) == SOCKET_ERROR
)
495 skt_abort(-1, 93496, "Error on RCVBUF sockopt for datagram socket.");
498 int skt_recvN(SOCKET hSocket
,void *buff
,int nBytes
)
501 char *pBuff
=(char *)buff
;
506 if (0==skt_select1(hSocket
,600*1000))
507 return skt_abort(hSocket
, 93610, "Timeout on socket recv!");
508 skt_ignore_SIGPIPE
=1;
509 nRead
= recv(hSocket
,pBuff
,nLeft
,0);
510 skt_ignore_SIGPIPE
=0;
513 if (nRead
==0) return skt_abort(hSocket
, 93620, "Socket closed before recv.");
514 if (skt_should_retry()) continue;/*Try again*/
515 else return skt_abort(hSocket
, 93650+hSocket
, "Error on socket recv!");
526 int skt_sendN(SOCKET hSocket
,const void *buff
,int nBytes
)
529 const char *pBuff
=(const char *)buff
;
534 skt_ignore_SIGPIPE
=1;
535 nWritten
= send(hSocket
,pBuff
,nLeft
,0);
536 skt_ignore_SIGPIPE
=0;
539 if (nWritten
==0) return skt_abort(hSocket
, 93720, "Socket closed before send.");
540 if (skt_should_retry()) continue;/*Try again*/
541 else return skt_abort(hSocket
, 93700+hSocket
, "Error on socket send!");
552 /*Cheesy vector send:
553 really should use writev on machines where it's available.
555 #define skt_sendV_max (16*1024)
557 int skt_sendV(SOCKET fd
,int nBuffers
,const void **bufs
,int *lens
)
560 for (b
=0;b
<nBuffers
;b
++) len
+=lens
[b
];
561 if (len
<=skt_sendV_max
) { /*Short message: Copy and do one big send*/
562 char *buf
=(char *)CmiTmpAlloc(skt_sendV_max
);
565 for (b
=0;b
<nBuffers
;b
++) {
566 memcpy(dest
,bufs
[b
],lens
[b
]);
569 ret
=skt_sendN(fd
,buf
,len
);
573 else { /*Big message: Just send one-by-one as usual*/
575 for (b
=0;b
<nBuffers
;b
++)
576 if (0!=(ret
=skt_sendN(fd
,bufs
[b
],lens
[b
])))
/* Send the scatter/gather message described by `mh` (num_bufs iovec entries,
 * nBytes bytes in total) with sendmsg(). SIGPIPE is flagged as ours for the
 * duration of the call. A result <= 0 goes straight to skt_abort. After a
 * short send the iovec array is edited in place so that it describes only the
 * unsent remainder: fully sent buffers get length 0, the partially sent one
 * has its base advanced and its length reduced, and nBytes is decreased by
 * the amount sent.
 * NOTE(review): the enclosing retry loop, the declaration and update of
 * `offset`, and the final return are not visible here -- confirm.
 * NOTE(review): unlike other routines in this file, no skt_should_retry()
 * call is visible between the sendmsg result and the abort. */
583 int skt_sendmsg(SOCKET hSocket
, struct msghdr
*mh
, int num_bufs
, int nBytes
)
586 skt_ignore_SIGPIPE
= 1;
587 int bytes_sent
= sendmsg(hSocket
, mh
, 0);
588 skt_ignore_SIGPIPE
= 0;
589 if (bytes_sent
<= 0) {
590 return skt_abort(hSocket
, 93700+hSocket
, "Error on socket send!");
591 } else if (bytes_sent
< nBytes
) {
592 // this should happen very rarely
/* Walk the buffers, comparing the running byte offset with bytes_sent. */
595 for (i
=0; i
< num_bufs
; i
++) {
596 int l
= mh
->msg_iov
[i
].iov_len
;
597 if ((offset
+ l
) <= bytes_sent
) {
598 mh
->msg_iov
[i
].iov_len
= 0; // all bytes from this buffer have been sent
601 // bytes from this buffer have been partially sent
602 int buf_transmitted
= (bytes_sent
- offset
);
603 mh
->msg_iov
[i
].iov_len
-= buf_transmitted
;
604 mh
->msg_iov
[i
].iov_base
= (char*)mh
->msg_iov
[i
].iov_base
+ buf_transmitted
;
609 nBytes
-= bytes_sent
;
614 int skt_sendmsg(SOCKET hSocket
, WSABUF
*buffers
, int num_bufs
, int nBytes
)
616 DWORD bytes_sent
= 0;
618 skt_ignore_SIGPIPE
= 1;
619 rc
= WSASend(hSocket
, buffers
, num_bufs
, &bytes_sent
, 0, NULL
, NULL
);
620 skt_ignore_SIGPIPE
= 0;
621 if ((rc
!= 0) || (bytes_sent
!= nBytes
)) {
622 return skt_abort(hSocket
, 93700+hSocket
, "Error on socket send!");
628 /***********************************************
629 Routines for manipulating simple binary messages,
630 e.g., to/from conv-host.
632 A conv-host message with header has the following format
634 ChMessage---------------------------------------------
635 /--ChMessageHeader---------------------------- ^
636 |12 bytes | Message type field ^ |
637 | | (ASCII, Null-terminated) | |
638 +----------------------------------- 24 bytes |
639 | 4 bytes | Message data length d | |
640 | | (big-endian binary integer) | |
641 +----------------------------------- | |
642 | 4 bytes | Return IP address | |
643 | 4 bytes | Return TCP port number | |
644 | | (big-endian binary integers) v 24+d bytes
645 \--------------------------------------------- |
646 d bytes | User data v
647 ------------------------------------------------------
649 For completeness, a big-endian (network byte order) 4 byte
650 integer has this format on the network:
651 ChMessageInt---------------------------------
652 1 byte | Most significant byte (&0xff000000; <<24)
653 1 byte | More significant byte (&0x00ff0000; <<16)
654 1 byte | Less significant byte (&0x0000ff00; <<8)
655 1 byte | Least significant byte (&0x000000ff; <<0)
656 ----------------------------------------------
/* Encode `src` as 4 bytes, most significant byte first (data[0] holds bits
 * 31..24, data[3] holds bits 7..0). The return statement is not visible here;
 * presumably `ret` is returned. */
659 ChMessageInt_t
ChMessageInt_new(unsigned int src
)
660 { /*Convert integer to bytes*/
661 int i
; ChMessageInt_t ret
;
662 for (i
=0;i
<4;i
++) ret
.data
[i
]=(unsigned char)(src
>>(8*(3-i
)));
/* Decode 4 bytes stored most-significant-first into an unsigned int by
 * shifting the accumulator left 8 bits and adding each byte in turn. The
 * return statement is not visible here; presumably `ret` is returned. */
665 unsigned int ChMessageInt(ChMessageInt_t src
)
666 { /*Convert bytes to integer*/
667 int i
; unsigned int ret
=0;
668 for (i
=0;i
<4;i
++) {ret
<<=8;ret
+=src
.data
[i
];}
/* Encode `src` as 8 bytes, most significant byte first (data[0] holds bits
 * 63..56, data[7] holds bits 7..0). The return statement is not visible here;
 * presumably `ret` is returned. */
672 ChMessageLong_t
ChMessageLong_new(CMK_TYPEDEF_UINT8 src
)
673 { /*Convert long integer to bytes*/
674 int i
; ChMessageLong_t ret
;
675 for (i
=0;i
<8;i
++) ret
.data
[i
]=(unsigned char)(src
>>(8*(7-i
)));
/* Decode 8 bytes stored most-significant-first into a CMK_TYPEDEF_UINT8 by
 * shifting the accumulator left 8 bits and adding each byte in turn. */
678 CMK_TYPEDEF_UINT8
ChMessageLong(ChMessageLong_t src
)
679 { /*Convert bytes to long integer*/
680 int i
; CMK_TYPEDEF_UINT8 ret
=0;
681 for (i
=0;i
<8;i
++) {ret
<<=8;ret
+=src
.data
[i
];}
682 return (CMK_TYPEDEF_UINT8
)ret
;
/* Receive one complete message from fd into *dst: first the header, then the
 * payload. Returns -1 as soon as either step reports failure; the success
 * return is not visible here. */
685 int ChMessage_recv(SOCKET fd
,ChMessage
*dst
)
687 /*Get the binary header*/
688 if (0!=ChMessageHeader_recv(fd
,dst
)) return -1;
/*Then the payload that the header describes*/
689 if (0!=ChMessageData_recv(fd
,dst
)) return -1;
/* Read sizeof(dst->header) bytes from fd into dst->header and decode the
 * big-endian length field into dst->len. Returns -1 when the read fails; the
 * success return is not visible here. No buffer is allocated by the
 * statements shown. */
693 int ChMessageHeader_recv(SOCKET fd
,ChMessage
*dst
)
695 /*Get the binary header*/
696 if (0!=skt_recvN(fd
,(char *)&dst
->header
,sizeof(dst
->header
))) return -1;
697 /*Decode the payload length from the header (the receive buffer itself is allocated in ChMessageData_recv)*/
698 dst
->len
=ChMessageInt(dst
->header
.len
);
/* Allocate dst->len bytes for dst->data and fill them from fd. Returns -1
 * when the read fails; the success return is not visible here.
 * NOTE(review): the malloc result is passed to skt_recvN without a NULL
 * check, and dst->len was decoded from the wire by ChMessageHeader_recv, so
 * the allocation size is controlled by the peer.
 * NOTE(review): on the -1 path dst->data is left allocated -- presumably the
 * caller is expected to release it; confirm. */
702 int ChMessageData_recv(SOCKET fd
,ChMessage
*dst
)
704 dst
->data
=(char *)malloc(dst
->len
);
705 /*Get the actual data*/
706 if (0!=skt_recvN(fd
,dst
->data
,dst
->len
)) return -1;
/* Dispose of a message. The visible statement overwrites the header's type
 * field with the marker string "Free'd" (at most CH_TYPELEN bytes).
 * NOTE(review): the statements that release doomed->data are not visible
 * here -- confirm against the full file. */
710 void ChMessage_free(ChMessage
*doomed
)
713 strncpy(doomed
->header
.type
,"Free'd",CH_TYPELEN
);
/* Fill in a message header: `len` is stored in big-endian form via
 * ChMessageInt_new, and `type` (or "default" when type is NULL) is copied
 * into the fixed CH_TYPELEN-byte type field.
 * NOTE(review): strncpy does not NUL-terminate when strlen(type) is
 * CH_TYPELEN or more, while the format description in this file calls the
 * type field null-terminated -- confirm callers always pass shorter names. */
717 void ChMessageHeader_new(const char *type
,int len
,ChMessageHeader
*dst
)
719 dst
->len
=ChMessageInt_new(len
);
720 if (type
==NULL
) type
="default";
721 strncpy(dst
->type
,type
,CH_TYPELEN
);
/* Initialise *dst as a fresh message of the given type: the header is built
 * with ChMessageHeader_new and a data buffer of dst->len bytes is allocated
 * with malloc.
 * NOTE(review): the statement that sets dst->len is not visible here
 * (presumably dst->len = len), and no check of the malloc result is visible. */
723 void ChMessage_new(const char *type
,int len
,ChMessage
*dst
)
725 ChMessageHeader_new(type
,len
,&dst
->header
);
727 dst
->data
=(char *)malloc(dst
->len
);
/* Send a message on fd as two buffers handed to skt_sendV: the fixed-size
 * header followed by src->len bytes of payload. Returns whatever skt_sendV
 * returns. The message is not released here; see the trailing comment. */
729 int ChMessage_send(SOCKET fd
,const ChMessage
*src
)
731 const void *bufs
[2]; int lens
[2];
/* Buffer 0: the header; buffer 1: the payload. */
732 bufs
[0]=&src
->header
; lens
[0]=sizeof(src
->header
);
733 bufs
[1]=src
->data
; lens
[1]=src
->len
;
734 return skt_sendV(fd
,2,bufs
,lens
);
735 } /*You must free after send*/
739 skt_ip_t _skt_invalid_ip
={{0}};
741 skt_ip_t
skt_my_ip(void)
743 return _skt_invalid_ip
;
746 #endif /*!CMK_NO_SOCKETS*/