nvram defaults now country=SG, txpwr=auto
[tomato.git] / release / src / router / udpxy / ctx.c
blob53022167054d989059af0b71bdb006f7b78a1e8e
/* @(#) client/server context implementation
 *
 * Copyright 2008-2011 Pavel V. Cherenkov (pcherenkov@gmail.com)
 *
 *  This file is part of udpxy.
 *
 *  udpxy is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  udpxy is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with udpxy.  If not, see <http://www.gnu.org/licenses/>.
 */
21 #include "osdef.h" /* os-specific definitions */
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #include <sys/stat.h>
26 #include <arpa/inet.h>
27 #include <net/if.h>
28 #include <netinet/in.h>
30 #include <string.h>
31 #include <assert.h>
32 #include <stdlib.h>
33 #include <stdio.h>
34 #include <errno.h>
35 #include <fcntl.h>
36 #include <unistd.h>
37 #include <time.h>
39 #include "ctx.h"
40 #include "udpxy.h"
41 #include "util.h"
42 #include "mtrace.h"
44 extern FILE* g_flog;
45 extern const char IPv4_ALL[];
47 /* initialize server context data
49 int
50 init_server_ctx( struct server_ctx* ctx,
51 const size_t max,
52 const char* laddr, uint16_t lport,
53 const char* mifc_addr )
55 int flags = -1;
57 assert( lport && mifc_addr && ctx && max );
59 ctx->lsockfd = 0;
60 (void) strncpy( ctx->listen_addr, (laddr ? laddr : IPv4_ALL),
61 IPADDR_STR_SIZE );
62 ctx->listen_addr[ IPADDR_STR_SIZE - 1 ] = '\0';
64 ctx->listen_port = lport;
66 (void) strncpy( ctx->mcast_ifc_addr, mifc_addr, IPADDR_STR_SIZE );
67 ctx->mcast_ifc_addr[ IPADDR_STR_SIZE - 1 ] = '\0';
69 ctx->cl = calloc(max, sizeof(struct client_ctx));
70 if( NULL == ctx->cl ) {
71 mperror( g_flog, errno, "%s: client_t - calloc", __func__ );
72 return ERR_INTERNAL;
75 (void) memset( ctx->cl, 0, max * sizeof(struct client_ctx) );
76 ctx->clfree = ctx->clmax = max;
78 if( 0 != pipe(ctx->cpipe) ) {
79 mperror( g_flog, errno, "%s: pipe", __func__ );
80 return ERR_INTERNAL;
83 /* make reading end of pipe non-blocking (we don't want to
84 * block on pipe read on the server side)
86 if( -1 == (flags = fcntl( ctx->cpipe[0], F_GETFL )) ||
87 -1 == fcntl( ctx->cpipe[0], F_SETFL, flags | O_NONBLOCK ) ) {
88 mperror( g_flog, errno, "%s: fcntl", __func__ );
89 return ERR_INTERNAL;
92 return 0;
96 void
97 free_server_ctx( struct server_ctx* ctx )
99 int i = -1;
101 assert(ctx);
102 free( ctx->cl );
104 for( i = 0; i < 2; ++i ) {
105 if( ctx->cpipe[i] < 0 ) continue;
107 if( 0 != close( ctx->cpipe[i] ) ) {
108 mperror( g_flog, errno,
109 "%s: close(pipe)", __func__ );
113 (void) memset( ctx, 0, sizeof(struct server_ctx) );
114 return;
118 /* find index of the first client with the given pid
121 find_client( const struct server_ctx* ctx, pid_t pid )
123 int i = -1;
125 assert( ctx && (pid >= 0) );
127 for( i = 0; (size_t)i < ctx->clmax; ++i ) {
128 if( ctx->cl[ i ].pid == pid )
129 return i;
132 return -1;
135 /* populate connection's source info in client context
137 static int
138 get_src_info( struct client_ctx* cl, int sockfd )
140 struct stat st;
141 struct sockaddr_in addr;
142 a_socklen_t len = 0;
143 int rc = 0;
145 assert( cl );
146 if( -1 == (rc = fstat( sockfd, &st )) ) {
147 mperror( g_flog, errno, "%s: fstat", __func__ );
148 return ERR_INTERNAL;
151 if( S_ISREG( st.st_mode ) ) {
152 (void) strncpy( cl->src_addr, "File", sizeof(cl->src_addr) );
153 cl->src_addr[ sizeof(cl->src_addr) - 1 ] = '\0';
154 cl->src_port = 0;
156 else if( S_ISSOCK( st.st_mode ) ) {
157 len = sizeof(addr);
158 rc = getpeername( sockfd, (struct sockaddr*)&addr, &len );
159 if( 0 == rc ) {
160 (void)strncpy( cl->src_addr, inet_ntoa(addr.sin_addr),
161 sizeof(cl->src_addr) - 1 );
162 cl->src_addr[ sizeof(cl->src_addr) - 1 ] = '\0';
164 cl->src_port = ntohs(addr.sin_port);
166 else {
167 mperror( g_flog, errno, "%s: getpeername", __func__ );
168 return ERR_INTERNAL;
170 } /* S_ISSOCK */
172 return rc;
176 /* add client to server context
179 add_client( struct server_ctx* ctx,
180 pid_t cpid, const char* maddr, uint16_t mport,
181 int sockfd )
183 struct client_ctx* client = NULL;
184 int index = -1;
185 int rc = 0;
187 assert( ctx && maddr && mport );
189 index = find_client( ctx, 0 );
190 if( -1 == index )
191 return ERR_INTERNAL;
193 client = &(ctx->cl[ index ]);
194 client->pid = cpid;
196 (void) strncpy( client->mcast_addr, maddr, IPADDR_STR_SIZE );
197 client->mcast_addr[ IPADDR_STR_SIZE - 1 ] = '\0';
199 client->mcast_port = mport;
201 rc = get_src_info( client, sockfd );
202 if( 0 != rc ) {
203 client->pid = 0;
204 return ERR_INTERNAL;
207 /* init sender id: 0 indicates no stats */
208 client->tstat.sender_id = 0;
210 ctx->clfree--;
212 if( g_flog ) {
213 (void)tmfprintf( g_flog, "Added client: pid=[%d], maddr=[%s], mport=[%d], "
214 "saddr=[%s], sport=[%d]\n",
215 (int)cpid, maddr, (int)mport, client->src_addr, client->src_port );
217 return 0;
221 /* delete client from server context
224 delete_client( struct server_ctx* ctx, pid_t cpid )
226 struct client_ctx* client = NULL;
227 int index = -1;
229 assert( ctx && (cpid > 0) );
231 index = find_client( ctx, cpid );
232 if( -1 == index ) {
233 return ERR_INTERNAL;
236 client = &(ctx->cl[ index ]);
237 client->pid = 0;
239 ctx->clfree++;
241 if( g_flog ) {
242 TRACE( (void)tmfprintf( g_flog, "Deleted client: pid=[%d]\n", cpid) );
245 return 0;
249 /* init traffic relay statistics
251 void
252 tpstat_init( struct tps_data* d, int setpid )
254 assert( d );
256 if( setpid )
257 d->pid = getpid();
259 d->tm_from = time(NULL);
260 d->niter = 0;
261 d->nbytes = 0;
263 return;
267 /* send statistics update to server (if it's time)
269 void
270 tpstat_update( struct server_ctx* ctx,
271 struct tps_data* d, ssize_t nbytes )
273 static const double MAX_NOUPDATE_ITER = 1000.0;
274 static const double MAX_SEC = 10.0;
276 struct tput_stat ts;
277 int writefd = -1, rc = 0;
278 const int DO_NOT_SET_PID = 0;
279 ssize_t nwr = -1;
280 double nsec;
282 assert( ctx && d );
284 d->nbytes += nbytes;
286 nsec = difftime( time(NULL), d->tm_from );
287 d->niter++;
288 if( !((d->niter >= MAX_NOUPDATE_ITER) || (nsec >= MAX_SEC)) )
289 return;
291 /* attempt to send update to server */
292 d->niter = 0;
294 ts.sender_id = d->pid;
295 ts.nbytes = d->nbytes;
296 ts.nsec = nsec;
298 writefd = ctx->cpipe[1];
299 do {
300 nwr = write( writefd, &ts, sizeof(ts) );
301 if( nwr <= 0 ) {
302 if( (EINTR != errno) && (EAGAIN != errno) ) {
303 mperror( g_flog, errno, "%s: write", __func__ );
304 rc = -1;
305 break;
307 /* if it's an interrupt or pipe full - ignore */
308 TRACE( (void)tmfprintf( g_flog, "%s - write error [%s] - ignored\n",
309 __func__, strerror(errno)) );
310 break;
312 if( sizeof(ts) != (size_t)nwr ) {
313 (void)tmfprintf( g_flog, "%s - wrote [%d] bytes to pipe, "
314 "expected [%u]\n",
315 __func__, nwr, (int)sizeof(ts) );
316 rc = -1;
317 break;
321 TRACE( (void)tmfprintf( g_flog,
322 "Sent TSTAT={ sender=[%ld], bytes=[%f], seconds=[%f] }\n",
323 (long)ts.sender_id, ts.nbytes, ts.nsec) );
325 } while(0);
328 if( 0 == rc ) {
329 tpstat_init( d, DO_NOT_SET_PID );
332 return;
336 /* read client statistics data and update the context
339 tpstat_read( struct server_ctx* ctx )
341 int cindex = -1;
342 int readfd = -1;
343 ssize_t nread = 0;
344 struct tput_stat ts;
345 struct client_ctx* client = NULL;
347 assert( ctx );
349 readfd = ctx->cpipe[0];
350 assert( readfd > 0 );
352 (void)memset( &ts, 0, sizeof(ts) );
354 nread = read( readfd, &ts, sizeof(ts) );
355 if( nread <= 0 ) {
356 if( (EINTR != errno) && (EAGAIN != errno) ) {
357 mperror( g_flog, errno, "%s: read", __func__ );
358 return ERR_INTERNAL;
360 /* if it's an interrupt or no data available - ignore */
361 TRACE( (void)tmfprintf( g_flog, "%s - read error [%s] - ingored\n",
362 __func__, strerror(errno)) );
363 return 0;
365 if( sizeof(ts) != (size_t)nread ) {
366 (void)tmfprintf( g_flog, "%s - read [%d] bytes from pipe, expected [%u]\n",
367 __func__, nread, (int)sizeof(ts) );
368 return ERR_INTERNAL;
371 TRACE( (void)tmfprintf( g_flog,
372 "Received TSTAT={ sender=[%ld], bytes=[%f], seconds=[%f] }\n",
373 (long)ts.sender_id, ts.nbytes, ts.nsec) );
375 cindex = find_client( ctx, (pid_t)ts.sender_id );
376 if( -1 == cindex ) {
377 (void)tmfprintf( g_flog, "%s - cannot find client [%ld]\n",
378 __func__, (long)ts.sender_id );
379 /* ignore invalid client id's */
380 return 0;
383 client = &(ctx->cl[ cindex ]);
384 client->tstat = ts;
386 TRACE( (void)tmfprintf( g_flog, "Updated context for pid=[%d]; "
387 "[%.1f] Kb/sec\n", client->pid, (ts.nbytes / 1024) / ts.nsec ) );
388 return 0;
393 /* __EOF__ */