1 /* @(#) client/server context implementation
3 * Copyright 2008-2011 Pavel V. Cherenkov (pcherenkov@gmail.com)
5 * This file is part of udpxy.
7 * udpxy is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
12 * udpxy is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with udpxy. If not, see <http://www.gnu.org/licenses/>.
21 #include "osdef.h" /* os-specific definitions */
23 #include <sys/types.h>
24 #include <sys/socket.h>
26 #include <arpa/inet.h>
28 #include <netinet/in.h>
45 extern const char IPv4_ALL
[];
47 /* initialize server context data
50 init_server_ctx( struct server_ctx
* ctx
,
52 const char* laddr
, uint16_t lport
,
53 const char* mifc_addr
)
57 assert( lport
&& mifc_addr
&& ctx
&& max
);
60 (void) strncpy( ctx
->listen_addr
, (laddr
? laddr
: IPv4_ALL
),
62 ctx
->listen_addr
[ IPADDR_STR_SIZE
- 1 ] = '\0';
64 ctx
->listen_port
= lport
;
66 (void) strncpy( ctx
->mcast_ifc_addr
, mifc_addr
, IPADDR_STR_SIZE
);
67 ctx
->mcast_ifc_addr
[ IPADDR_STR_SIZE
- 1 ] = '\0';
69 ctx
->cl
= calloc(max
, sizeof(struct client_ctx
));
70 if( NULL
== ctx
->cl
) {
71 mperror( g_flog
, errno
, "%s: client_t - calloc", __func__
);
75 (void) memset( ctx
->cl
, 0, max
* sizeof(struct client_ctx
) );
76 ctx
->clfree
= ctx
->clmax
= max
;
78 if( 0 != pipe(ctx
->cpipe
) ) {
79 mperror( g_flog
, errno
, "%s: pipe", __func__
);
83 /* make reading end of pipe non-blocking (we don't want to
84 * block on pipe read on the server side)
86 if( -1 == (flags
= fcntl( ctx
->cpipe
[0], F_GETFL
)) ||
87 -1 == fcntl( ctx
->cpipe
[0], F_SETFL
, flags
| O_NONBLOCK
) ) {
88 mperror( g_flog
, errno
, "%s: fcntl", __func__
);
97 free_server_ctx( struct server_ctx
* ctx
)
104 for( i
= 0; i
< 2; ++i
) {
105 if( ctx
->cpipe
[i
] < 0 ) continue;
107 if( 0 != close( ctx
->cpipe
[i
] ) ) {
108 mperror( g_flog
, errno
,
109 "%s: close(pipe)", __func__
);
113 (void) memset( ctx
, 0, sizeof(struct server_ctx
) );
118 /* find index of the first client with the given pid
121 find_client( const struct server_ctx
* ctx
, pid_t pid
)
125 assert( ctx
&& (pid
>= 0) );
127 for( i
= 0; (size_t)i
< ctx
->clmax
; ++i
) {
128 if( ctx
->cl
[ i
].pid
== pid
)
135 /* populate connection's source info in client context
138 get_src_info( struct client_ctx
* cl
, int sockfd
)
141 struct sockaddr_in addr
;
146 if( -1 == (rc
= fstat( sockfd
, &st
)) ) {
147 mperror( g_flog
, errno
, "%s: fstat", __func__
);
151 if( S_ISREG( st
.st_mode
) ) {
152 (void) strncpy( cl
->src_addr
, "File", sizeof(cl
->src_addr
) );
153 cl
->src_addr
[ sizeof(cl
->src_addr
) - 1 ] = '\0';
156 else if( S_ISSOCK( st
.st_mode
) ) {
158 rc
= getpeername( sockfd
, (struct sockaddr
*)&addr
, &len
);
160 (void)strncpy( cl
->src_addr
, inet_ntoa(addr
.sin_addr
),
161 sizeof(cl
->src_addr
) - 1 );
162 cl
->src_addr
[ sizeof(cl
->src_addr
) - 1 ] = '\0';
164 cl
->src_port
= ntohs(addr
.sin_port
);
167 mperror( g_flog
, errno
, "%s: getpeername", __func__
);
176 /* add client to server context
179 add_client( struct server_ctx
* ctx
,
180 pid_t cpid
, const char* maddr
, uint16_t mport
,
183 struct client_ctx
* client
= NULL
;
187 assert( ctx
&& maddr
&& mport
);
189 index
= find_client( ctx
, 0 );
193 client
= &(ctx
->cl
[ index
]);
196 (void) strncpy( client
->mcast_addr
, maddr
, IPADDR_STR_SIZE
);
197 client
->mcast_addr
[ IPADDR_STR_SIZE
- 1 ] = '\0';
199 client
->mcast_port
= mport
;
201 rc
= get_src_info( client
, sockfd
);
207 /* init sender id: 0 indicates no stats */
208 client
->tstat
.sender_id
= 0;
213 (void)tmfprintf( g_flog
, "Added client: pid=[%d], maddr=[%s], mport=[%d], "
214 "saddr=[%s], sport=[%d]\n",
215 (int)cpid
, maddr
, (int)mport
, client
->src_addr
, client
->src_port
);
221 /* delete client from server context
224 delete_client( struct server_ctx
* ctx
, pid_t cpid
)
226 struct client_ctx
* client
= NULL
;
229 assert( ctx
&& (cpid
> 0) );
231 index
= find_client( ctx
, cpid
);
236 client
= &(ctx
->cl
[ index
]);
242 TRACE( (void)tmfprintf( g_flog
, "Deleted client: pid=[%d]\n", cpid
) );
249 /* init traffic relay statistics
252 tpstat_init( struct tps_data
* d
, int setpid
)
259 d
->tm_from
= time(NULL
);
267 /* send statistics update to server (if it's time)
270 tpstat_update( struct server_ctx
* ctx
,
271 struct tps_data
* d
, ssize_t nbytes
)
273 static const double MAX_NOUPDATE_ITER
= 1000.0;
274 static const double MAX_SEC
= 10.0;
277 int writefd
= -1, rc
= 0;
278 const int DO_NOT_SET_PID
= 0;
286 nsec
= difftime( time(NULL
), d
->tm_from
);
288 if( !((d
->niter
>= MAX_NOUPDATE_ITER
) || (nsec
>= MAX_SEC
)) )
291 /* attempt to send update to server */
294 ts
.sender_id
= d
->pid
;
295 ts
.nbytes
= d
->nbytes
;
298 writefd
= ctx
->cpipe
[1];
300 nwr
= write( writefd
, &ts
, sizeof(ts
) );
302 if( (EINTR
!= errno
) && (EAGAIN
!= errno
) ) {
303 mperror( g_flog
, errno
, "%s: write", __func__
);
307 /* if it's an interrupt or pipe full - ignore */
308 TRACE( (void)tmfprintf( g_flog
, "%s - write error [%s] - ignored\n",
309 __func__
, strerror(errno
)) );
312 if( sizeof(ts
) != (size_t)nwr
) {
313 (void)tmfprintf( g_flog
, "%s - wrote [%d] bytes to pipe, "
315 __func__
, nwr
, (int)sizeof(ts
) );
321 TRACE( (void)tmfprintf( g_flog,
322 "Sent TSTAT={ sender=[%ld], bytes=[%f], seconds=[%f] }\n",
323 (long)ts.sender_id, ts.nbytes, ts.nsec) );
329 tpstat_init( d
, DO_NOT_SET_PID
);
336 /* read client statistics data and update the context
339 tpstat_read( struct server_ctx
* ctx
)
345 struct client_ctx
* client
= NULL
;
349 readfd
= ctx
->cpipe
[0];
350 assert( readfd
> 0 );
352 (void)memset( &ts
, 0, sizeof(ts
) );
354 nread
= read( readfd
, &ts
, sizeof(ts
) );
356 if( (EINTR
!= errno
) && (EAGAIN
!= errno
) ) {
357 mperror( g_flog
, errno
, "%s: read", __func__
);
360 /* if it's an interrupt or no data available - ignore */
361 TRACE( (void)tmfprintf( g_flog
, "%s - read error [%s] - ingored\n",
362 __func__
, strerror(errno
)) );
365 if( sizeof(ts
) != (size_t)nread
) {
366 (void)tmfprintf( g_flog
, "%s - read [%d] bytes from pipe, expected [%u]\n",
367 __func__
, nread
, (int)sizeof(ts
) );
371 TRACE( (void)tmfprintf( g_flog
,
372 "Received TSTAT={ sender=[%ld], bytes=[%f], seconds=[%f] }\n",
373 (long)ts
.sender_id
, ts
.nbytes
, ts
.nsec
) );
375 cindex
= find_client( ctx
, (pid_t
)ts
.sender_id
);
377 (void)tmfprintf( g_flog
, "%s - cannot find client [%ld]\n",
378 __func__
, (long)ts
.sender_id
);
379 /* ignore invalid client id's */
383 client
= &(ctx
->cl
[ cindex
]);
386 TRACE( (void)tmfprintf( g_flog
, "Updated context for pid=[%d]; "
387 "[%.1f] Kb/sec\n", client
->pid
, (ts
.nbytes
/ 1024) / ts
.nsec
) );