1 /* @(#) udpxrec utility: main module
3 * Copyright 2008-2011 Pavel V. Cherenkov (pcherenkov@gmail.com)
5 * This file is part of udpxy.
7 * udpxy is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
12 * udpxy is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with udpxy. If not, see <http://www.gnu.org/licenses/>.
30 #include <sys/socket.h>
31 #include <arpa/inet.h>
32 #include <sys/types.h>
35 #ifndef __USE_LARGEFILE64
36 #define __USE_LARGEFILE64
39 #ifndef __USE_FILE_OFFSET64
40 #define __USE_FILE_OFFSET64
45 #include "osdef.h" /* os-specific definitions */
56 /* external globals */
59 extern volatile sig_atomic_t g_quit
;
60 extern const char g_udpxrec_app
[];
62 extern const char COMPILE_MODE
[];
63 extern const char VERSION
[];
64 extern const int BUILDNUM
;
66 static volatile sig_atomic_t g_alarm
= 0;
71 struct udpxrec_opt g_recopt
;
74 /* handler for signals requesting application exit
77 handle_quitsigs(int signo
)
80 if( SIGALRM
== signo
) g_alarm
= 1;
82 TRACE( (void)tmfprintf( g_flog
,
83 "*** Caught SIGNAL [%d] in process=[%d] ***\n",
89 /* return 1 if the application must gracefully quit
92 must_quit() { return g_quit
; }
96 usage( const char* app
, FILE* fp
)
98 extern const char VERSION
[];
99 extern const int BUILDNUM
;
100 extern const char UDPXY_COPYRIGHT_NOTICE
[];
101 extern const char UDPXY_CONTACT
[];
102 extern const char COMPILE_MODE
[];
104 (void) fprintf(fp
, "%s %s (build %d) %s\n", app
, VERSION
, BUILDNUM
,
106 (void) fprintf(fp
, "usage: %s [-v] [-b begin_time] [-e end_time] "
107 "[-M maxfilesize] [-p pidfile] [-B bufsizeK] [-n nice_incr] "
108 "[-m mcast_ifc_addr] [-l logfile] "
109 "-c src_addr:port dstfile\n",
113 "\t-v : enable verbose output [default = disabled]\n"
114 "\t-b : begin recording at [+]dd:hh24:mi.ss\n"
115 "\t-e : stop recording at [+]dd:hh24:mi.ss\n"
116 "\t-M : maximum size of destination file\n"
117 "\t-p : pidfile for this process [MUST be specified if daemon]\n"
118 "\t-B : buffer size for inbound (multicast) data [default = %ld bytes]\n"
119 "\t-R : maximum messages to store in buffer (-1 = all) "
121 (long)DEFAULT_CACHE_LEN
);
123 "\t-T : do NOT run as a daemon [default = daemon if root]\n"
124 "\t-n : nice value increment [default = %d]\n"
125 "\t-m : name or address of multicast interface to read from\n"
126 "\t-c : multicast channel to record - ipv4addr:port\n"
127 "\t-l : write output into the logfile\n"
128 "\t-u : seconds to wait before updating on how long till recording starts\n",
129 g_recopt
.nice_incr
);
131 (void) fprintf( fp
, "Examples:\n"
132 " %s -b 15:45.00 -e +2:00.00 -M 1.5Gb -n 2 -B 64K -c 224.0.11.31:5050 "
133 " /opt/video/tv5.mpg \n"
134 "\tbegin recording multicast channel 224.0.11.31:5050 at 15:45 today,\n"
135 "\tfinish recording in two hours or if destination file size >= 1.5 Gb;\n"
136 "\tset socket buffer to 64Kb; increment nice value by 2;\n"
137 "\twrite captured video to /opt/video/tv5.mpg\n",
139 (void) fprintf( fp
, "\n %s\n", UDPXY_COPYRIGHT_NOTICE
);
140 (void) fprintf( fp
, " %s\n\n", UDPXY_CONTACT
);
144 /* update wait status
148 update_waitstat( FILE* log
, time_t end_time
)
150 /* TODO: make sure to distinguish if log is a terminal
151 * terminal gets the progress bar, etc. */
153 double tdiff
= difftime( end_time
, time(NULL
) );
154 (void) tmfprintf( log
, "%.0f\tseconds till recording begins\n",
159 /* wait till the given time, update wait status
160 * every update_sec seconds
164 wait_till( time_t endtime
, int update_sec
)
168 time_t now
= time(NULL
);
169 sig_atomic_t quit
= 0;
171 TRACE( (void)tmfprintf( g_flog
, "%s: waiting till time=[%ld], now=[%ld]\n",
172 __func__
, (long)endtime
, (long)now
) );
174 if( now
>= endtime
) return 0;
176 (void) tmfprintf( g_flog
, "[%ld] seconds before recording begins\n",
177 (long)(endtime
- now
) );
179 if( (update_sec
<= 0) ||
180 ((now
+ update_sec
) > endtime
) ) {
181 unslept
= sleep( endtime
- now
);
184 while( !(quit
= must_quit()) && (now
< endtime
) ) {
185 sec2wait
= (now
+ update_sec
) <= endtime
186 ? update_sec
: (endtime
- now
);
187 update_waitstat( g_flog
, endtime
);
189 TRACE( (void)tmfprintf( g_flog
, "Waiting for [%u] more seconds.\n",
192 unslept
= sleep( sec2wait
);
197 TRACE( (void)tmfprintf( g_flog
, "[%u] seconds unslept, quit=[%d]\n", unslept
,
199 return (unslept
? ERR_INTERNAL
: 0);
204 calc_buf_settings( ssize_t
* bufmsgs
, size_t* sock_buflen
)
206 ssize_t nmsgs
= -1, max_buf_used
= -1, env_snd_buflen
= -1;
209 /* how many messages should we process? */
210 nmsgs
= (g_recopt
.rbuf_msgs
> 0) ? g_recopt
.rbuf_msgs
:
211 (int)g_recopt
.bufsize
/ ETHERNET_MTU
;
213 /* how many bytes could be written at once
214 * to the send socket */
215 max_buf_used
= (g_recopt
.rbuf_msgs
> 0)
216 ? (ssize_t
)(nmsgs
* ETHERNET_MTU
) : g_recopt
.bufsize
;
217 if (max_buf_used
> g_recopt
.bufsize
) {
218 max_buf_used
= g_recopt
.bufsize
;
221 assert( max_buf_used
>= 0 );
223 env_snd_buflen
= get_sizeval( "UDPXREC_SOCKBUF_LEN", 0);
224 buflen
= (env_snd_buflen
> 0) ? (size_t)env_snd_buflen
: (size_t)max_buf_used
;
226 if (buflen
< (size_t) MIN_SOCKBUF_LEN
) {
227 buflen
= (size_t) MIN_SOCKBUF_LEN
;
230 /* cannot go below the size of effective usage */
231 if( buflen
< (size_t)max_buf_used
) {
232 buflen
= (size_t)max_buf_used
;
235 if (bufmsgs
) *bufmsgs
= nmsgs
;
236 if (sock_buflen
) *sock_buflen
= buflen
;
238 TRACE( (void)tmfprintf( g_flog
,
239 "min socket buffer = [%ld], "
240 "max space to use = [%ld], "
242 (long)buflen
, (long)max_buf_used
, (long)nmsgs
) );
249 /* subscribe to the (configured) multicast channel
252 subscribe( int* sockfd
, struct in_addr
* mcast_inaddr
)
254 struct sockaddr_in sa
;
255 const char* ipaddr
= g_recopt
.rec_channel
;
256 size_t rcvbuf_len
= 0;
259 assert( sockfd
&& mcast_inaddr
);
261 if( 1 != inet_aton( ipaddr
, &sa
.sin_addr
) ) {
262 mperror( g_flog
, errno
,
263 "%s: Invalid subscription [%s:%d]: inet_aton",
264 __func__
, ipaddr
, g_recopt
.rec_port
);
268 sa
.sin_family
= AF_INET
;
269 sa
.sin_port
= htons( (uint16_t)g_recopt
.rec_port
);
271 if( 1 != inet_aton( g_recopt
.mcast_addr
, mcast_inaddr
) ) {
272 mperror( g_flog
, errno
,
273 "%s: Invalid multicast interface: [%s]: inet_aton",
274 __func__
, g_recopt
.mcast_addr
);
278 rc
= calc_buf_settings( NULL
, &rcvbuf_len
);
279 if (0 != rc
) return rc
;
281 return setup_mcast_listener( &sa
, mcast_inaddr
,
282 sockfd
, (g_recopt
.nosync_sbuf
? 0 : rcvbuf_len
) );
286 /* record network stream as per spec in opt
291 int rsock
= -1, destfd
= -1, rc
= 0, wtime_sec
= 0;
292 struct in_addr raddr
;
294 struct dstream_ctx ds
;
296 ssize_t nrcv
= -1, lrcv
= -1, t_delta
= 0;
298 ssize_t nwr
= -1, lwr
= -1;
299 sig_atomic_t quit
= 0;
300 struct rdata_opt ropt
;
305 static const u_short RSOCK_TIMEOUT
= 5;
306 extern const char CMD_UDP
[];
308 /* NOPs to eliminate warnings in lean version */
309 t_delta
= lrcv
= lwr
= 0; quit
=0;
311 check_fragments( NULL
, 0, 0, 0, 0, g_flog
);
316 data
= malloc( g_recopt
.bufsize
);
318 mperror(g_flog
, errno
, "%s: cannot allocate [%ld] bytes",
319 __func__
, (long)g_recopt
.bufsize
);
324 rc
= subscribe( &rsock
, &raddr
);
327 rtv
.tv_sec
= RSOCK_TIMEOUT
;
330 rc
= setsockopt( rsock
, SOL_SOCKET
, SO_RCVTIMEO
, &rtv
, sizeof(rtv
) );
332 mperror(g_flog
, errno
, "%s: setsockopt - SO_RCVTIMEO",
338 oflags
= O_CREAT
| O_TRUNC
| O_WRONLY
|
339 S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
;
340 # if defined(O_LARGEFILE)
341 /* O_LARGEFILE is not defined under FreeBSD ??-7.1 */
342 oflags
|= O_LARGEFILE
;
344 destfd
= open( g_recopt
.dstfile
, oflags
,
345 (mode_t
)(S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
));
347 mperror( g_flog
, errno
, "%s: cannot create destination file [%s]",
348 __func__
, g_recopt
.dstfile
);
353 rc
= calc_buf_settings( &nmsgs
, NULL
);
354 if (0 != rc
) return -1;
356 if( nmsgs
< (ssize_t
)1 ) {
357 (void) tmfprintf( g_flog
, "Buffer for inbound data is too small [%ld] bytes; "
358 "the minimum size is [%ld] bytes\n",
359 (long)g_recopt
.bufsize
, (long)ETHERNET_MTU
);
364 TRACE( (void)tmfprintf( g_flog
, "Inbound buffer set to "
365 "[%d] messages\n", nmsgs
) );
367 rc
= init_dstream_ctx( &ds
, CMD_UDP
, NULL
, nmsgs
);
368 if( 0 != rc
) return -1;
370 (void) set_nice( g_recopt
.nice_incr
, g_flog
);
372 /* set up alarm to break main loop */
373 if( 0 != g_recopt
.end_time
) {
374 wtime_sec
= (int)difftime( g_recopt
.end_time
, time(NULL
) );
375 assert( wtime_sec
>= 0 );
377 (void) alarm( wtime_sec
);
379 (void)tmfprintf( g_flog
, "Recording will end in [%d] seconds\n",
385 ropt
.max_frgs
= g_recopt
.rbuf_msgs
;
388 for( n_total
= 0; (0 == rc
) && !(quit
= must_quit()); ) {
389 nrcv
= read_data( &ds
, rsock
, data
, g_recopt
.bufsize
, &ropt
);
390 if( -1 == nrcv
) { rc
= ERR_INTERNAL
; break; }
393 (void) tmfprintf( g_flog
, "Recording to file=[%s] started.\n",
397 TRACE( check_fragments( "received new", g_recopt
.bufsize
,
398 lrcv
, nrcv
, t_delta
, g_flog
) );
402 if( g_recopt
.max_fsize
&&
403 ((n_total
+ nrcv
) >= g_recopt
.max_fsize
) ) {
407 nwr
= write_data( &ds
, data
, nrcv
, destfd
);
408 if( -1 == nwr
) { rc
= ERR_INTERNAL
; break; }
412 TRACE( tmfprintf( g_flog, "Wrote [%ld] to file, total=[%ld]\n",
413 (long)nwr, (long)n_total ) );
416 TRACE( check_fragments( "wrote to file",
417 nrcv
, lwr
, nwr
, t_delta
, g_flog
) );
421 if( ds
.flags
& F_SCATTERED
) reset_pkt_registry( &ds
);
425 (void) tmfprintf( g_flog
, "Recording to file=[%s] stopped at filesize=[%ld] bytes\n",
426 g_recopt
.dstfile
, (long)n_total
);
432 TRACE( (void)tmfprintf( g_flog
, "Exited record loop: wrote [%ld] bytes to file [%s], "
433 "rc=[%d], alarm=[%ld], quit=[%ld]\n",
434 (long)n_total
, g_recopt
.dstfile
, rc
, g_alarm
, (long)quit
) );
436 free_dstream_ctx( &ds
);
437 if( data
) free( data
);
439 close_mcast_listener( rsock
, &raddr
);
440 if( destfd
>= 0 ) (void) close( destfd
);
443 TRACE( (void)tmfprintf( g_flog
, "%s process must quit\n",
450 /* make sure the channel is valid: subscribe/close
455 struct in_addr mcast_inaddr
;
456 int sockfd
= -1, rc
= -1;
461 static const time_t MSOCK_TMOUT_SEC
= 2;
463 rc
= subscribe( &sockfd
, &mcast_inaddr
);
467 rtv
.tv_sec
= MSOCK_TMOUT_SEC
;
469 rc
= setsockopt( sockfd
, SOL_SOCKET
, SO_RCVTIMEO
, &rtv
, sizeof(rtv
) );
471 mperror(g_flog
, errno
, "%s: setsockopt - SO_RCVTIMEO",
473 rc
= ERR_INTERNAL
; break;
476 /* attempt to read from the socket to
477 * make sure the channel is alive
479 nrd
= read( sockfd
, buf
, sizeof(buf
) );
482 mperror( g_flog
, errno
, "channel read" );
484 (void) tmfprintf( g_flog
,
485 "failed to read from [%s:%d]\n",
486 g_recopt
.rec_channel
, g_recopt
.rec_port
);
488 rc
= ERR_INTERNAL
; break;
491 TRACE( (void)tmfprintf( g_flog
, "%s: read [%ld] bytes "
492 "from source channel\n", __func__
, nrd
) );
496 close_mcast_listener( sockfd
, &mcast_inaddr
);
503 /* set up signal handling
508 struct sigaction qact
, oldact
;
510 qact
.sa_handler
= handle_quitsigs
;
511 sigemptyset(&qact
.sa_mask
);
514 if( (sigaction(SIGTERM
, &qact
, &oldact
) < 0) ||
515 (sigaction(SIGQUIT
, &qact
, &oldact
) < 0) ||
516 (sigaction(SIGINT
, &qact
, &oldact
) < 0) ||
517 (sigaction(SIGALRM
, &qact
, &oldact
) < 0) ||
518 (sigaction(SIGPIPE
, &qact
, &oldact
) < 0)) {
519 perror("sigaction-quit");
/* entry point of the udpxrec module (defined below) */
extern int udpxrec_main(int argc, char *const argv[]);
530 /* main() for udpxrec module
532 int udpxrec_main( int argc
, char* const argv
[] )
534 int rc
= 0, ch
= 0, custom_log
= 0, no_daemon
= 0;
535 static const char OPTMASK
[] = "vb:e:M:p:B:n:m:l:c:R:u:T";
536 time_t now
= time(NULL
);
537 char now_buf
[ 32 ] = {0}, sel_buf
[ 32 ] = {0}, app_finfo
[80] = {0};
539 extern int optind
, optopt
;
540 extern const char IPv4_ALL
[];
543 usage( argv
[0], stderr
);
547 rc
= init_recopt( &g_recopt
);
548 while( (0 == rc
) && (-1 != (ch
= getopt( argc
, argv
, OPTMASK
))) ) {
550 case 'T': no_daemon
= 1; break;
551 case 'v': set_verbose( &g_recopt
.is_verbose
); break;
553 if( (time_t)0 != g_recopt
.end_time
) {
554 (void) fprintf( stderr
, "Cannot specify start-recording "
555 "time after end-recording time has been set\n" );
558 rc
= a2time( optarg
, &g_recopt
.bg_time
, time(NULL
) );
560 (void) fprintf( stderr
, "Invalid time: [%s]\n", optarg
);
564 if( g_recopt
.bg_time
< now
) {
565 (void)strncpy( now_buf
, Zasctime(localtime( &now
)),
567 (void)strncpy( sel_buf
,
568 Zasctime(localtime( &g_recopt
.bg_time
)),
571 (void) fprintf( stderr
,
572 "Selected %s time is in the past, "
573 "now=[%s], selected=[%s]\n", "start",
581 if( (time_t)0 == g_recopt
.bg_time
) {
582 g_recopt
.bg_time
= time(NULL
);
583 (void)fprintf( stderr
,
584 "Start-recording time defaults to now [%s]\n",
585 Zasctime( localtime( &g_recopt
.bg_time
) ) );
588 rc
= a2time( optarg
, &g_recopt
.end_time
, g_recopt
.bg_time
);
590 (void) fprintf( stderr
, "Invalid time: [%s]\n", optarg
);
594 if( g_recopt
.end_time
< now
) {
595 (void)strncpy( now_buf
, Zasctime(localtime( &now
)),
597 (void)strncpy( sel_buf
,
598 Zasctime(localtime( &g_recopt
.end_time
)),
601 (void) fprintf( stderr
,
602 "Selected %s time is in the past, "
603 "now=[%s], selected=[%s]\n", "end",
611 rc
= a2int64( optarg
, &g_recopt
.max_fsize
);
613 (void) fprintf( stderr
, "Invalid file size: [%s]\n",
619 g_recopt
.pidfile
= strdup(optarg
);
623 rc
= a2size( optarg
, &g_recopt
.bufsize
);
625 (void) fprintf( stderr
, "Invalid buffer size: [%s]\n",
629 else if( (g_recopt
.bufsize
< MIN_MCACHE_LEN
) ||
630 (g_recopt
.bufsize
> MAX_MCACHE_LEN
)) {
631 (void) fprintf( stderr
,
632 "Buffer size must be in [%ld-%ld] bytes range\n",
633 (long)MIN_MCACHE_LEN
, (long)MAX_MCACHE_LEN
);
639 g_recopt
.nice_incr
= atoi( optarg
);
640 if( 0 == g_recopt
.nice_incr
) {
641 (void) fprintf( stderr
,
642 "Invalid nice-value increment: [%s]\n", optarg
);
647 rc
= get_ipv4_address( optarg
, g_recopt
.mcast_addr
,
648 sizeof(g_recopt
.mcast_addr
) );
650 (void) fprintf( stderr
, "Invalid multicast address: [%s]\n",
656 g_flog
= fopen( optarg
, "a" );
657 if( NULL
== g_flog
) {
659 (void) fprintf( stderr
, "Error opening logfile [%s]: %s\n",
660 optarg
, strerror(rc
) );
661 rc
= ERR_PARAM
; break;
664 Setlinebuf( g_flog
);
669 rc
= get_addrport( optarg
, g_recopt
.rec_channel
,
670 sizeof( g_recopt
.rec_channel
),
671 &g_recopt
.rec_port
);
672 if( 0 != rc
) rc
= ERR_PARAM
;
676 g_recopt
.rbuf_msgs
= atoi( optarg
);
677 if( (g_recopt
.rbuf_msgs
<= 0) && (-1 != g_recopt
.rbuf_msgs
) ) {
678 (void) fprintf( stderr
,
679 "Invalid rcache size: [%s]\n", optarg
);
685 g_recopt
.waitupd_sec
= atoi(optarg
);
686 if( g_recopt
.waitupd_sec
<= 0 ) {
687 (void) fprintf( stderr
, "Invalid wait-update value [%s] "
688 "(must be a number > 0)\n", optarg
);
694 (void) fprintf( stderr
, "Option [-%c] requires an argument\n",
696 rc
= ERR_PARAM
; break;
698 (void) fprintf( stderr
, "Unrecognized option: [-%c]\n", optopt
);
699 rc
= ERR_PARAM
; break;
701 usage( argv
[0], stderr
);
702 rc
= ERR_PARAM
; break;
708 if( optind
>= argc
) {
709 (void) fputs( "Missing destination file parameter\n", stderr
);
713 g_recopt
.dstfile
= strdup( argv
[optind
] );
716 if( !(g_recopt
.max_fsize
> 0 || g_recopt
.end_time
) ) {
717 (void) fputs( "Must specify either max file [-M] size "
718 "or end time [-e]\n", stderr
);
722 if( !g_recopt
.rec_channel
[0] || !g_recopt
.rec_port
) {
723 (void) fputs( "Must specify multicast channel to record from\n",
730 free_recopt( &g_recopt
);
735 if( '\0' == g_recopt
.mcast_addr
[0] ) {
736 (void) strncpy( g_recopt
.mcast_addr
, IPv4_ALL
,
737 sizeof(g_recopt
.mcast_addr
) - 1 );
741 /* in debug mode output goes to stderr, otherwise to /dev/null */
742 g_flog
= ((uf_TRUE
== g_recopt
.is_verbose
)
744 : fopen( "/dev/null", "a" ));
745 if( NULL
== g_flog
) {
747 rc
= ERR_INTERNAL
; break;
751 if( 0 == geteuid() ) {
753 if( stderr
== g_flog
) {
754 (void) fprintf( stderr
,
755 "Logfile must be specified to run "
756 "in verbose mode in background\n" );
757 rc
= ERR_PARAM
; break;
760 if( NULL
== g_recopt
.pidfile
) {
761 (void) fprintf( stderr
, "pidfile must be specified "
762 "to run as daemon\n" );
763 rc
= ERR_PARAM
; break;
766 if( 0 != (rc
= daemonize(0, g_flog
)) ) {
767 rc
= ERR_INTERNAL
; break;
771 } /* 0 == geteuid() */
773 if( NULL
!= g_recopt
.pidfile
) {
774 rc
= make_pidfile( g_recopt
.pidfile
, getpid(), g_flog
);
778 (void) set_nice( g_recopt
.nice_incr
, g_flog
);
780 if( 0 != (rc
= setup_signals()) ) break;
782 TRACE( fprint_recopt( g_flog
, &g_recopt
) );
784 (void) snprintf( app_finfo
, sizeof(app_finfo
),
785 "%s %s (build %d) %s", g_udpxrec_app
, VERSION
, BUILDNUM
,
788 TRACE( printcmdln( g_flog
, app_finfo
, argc
, argv
) );
790 if( g_recopt
.bg_time
) {
791 if( 0 != (rc
= verify_channel()) || g_quit
)
794 rc
= wait_till( g_recopt
.bg_time
, g_recopt
.waitupd_sec
);
795 if( rc
|| g_quit
) break;
800 if( NULL
!= g_recopt
.pidfile
) {
801 if( -1 == unlink(g_recopt
.pidfile
) ) {
802 mperror( g_flog
, errno
, "unlink [%s]", g_recopt
.pidfile
);
809 (void)tmfprintf( g_flog
, "%s is exiting with rc=[%d]\n",
813 if( g_flog
&& (stderr
!= g_flog
) ) {
814 (void) fclose(g_flog
);
817 free_recopt( &g_recopt
);