1 /* @(#) implementation of packet-io functions for udpxy
3 * Copyright 2008-2011 Pavel V. Cherenkov (pcherenkov@gmail.com)
5 * This file is part of udpxy.
7 * udpxy is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
12 * udpxy is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with udpxy. If not, see <http://www.gnu.org/licenses/>.
21 #include <sys/types.h>
40 static const size_t TS_SEG_LEN
= 188;
42 static const char* upxfmt_NAME
[] = {
43 "UNKNOWN", "MPEG-TS", "RTP-TS", "UDPXY-UDS"
48 fmt2str( upxfmt_t fmt
)
52 assert( (ifmt
> (int)DT_FIRST
) && (ifmt
< (int)DT_LAST
) );
53 return upxfmt_NAME
[ ifmt
];
57 /* check for MPEG TS signature, complain if not found
60 ts_sigcheck( const int c
, off_t offset
, ssize_t len
,
61 FILE* log
, const char* func
)
65 (void)(len
); /* NOP to avoid warnings */
67 if( c
== MPEG_TS_SIG
) return 0;
70 (void) offset
; /* get rid of a warning if TRACE is disabled */
71 TRACE( (void)tmfprintf( log
, "%s: TS signature mismatch TS=[0x%02X], found=[0x%02X]; "
72 "offset [0x%X(%u)] of packet len=[%lu]\n",
73 func
, MPEG_TS_SIG
, c
, offset
, offset
, (u_long
)len
) );
80 /* determine type of stream in memory
83 get_mstream_type( const char* data
, size_t len
, FILE* log
)
89 assert( data
&& len
);
91 if( len
< (RTP_HDR_SIZE
+ 1) ) {
92 (void) tmfprintf( log
, "%s: read [%ld] bytes,"
93 " not enough for RTP header\n", __func__
,
98 /* if the 1st byte has MPEG-TS signature - skip further checks */
100 if( 0 == ts_sigcheck( sig
, 0, 1, NULL
/*log*/, __func__
) )
103 /* if not RTP - quit */
104 if( 0 == RTP_verify( data
, RTP_HDR_SIZE
, log
) )
107 /* check the first byte after RTP header - should be
108 * TS signature to be RTP over TS */
110 if( (0 != RTP_hdrlen( data
, len
, &hdrlen
, log
)) ||
115 sig
= data
[ hdrlen
];
116 if( 0 != ts_sigcheck( sig
, 0, 1, log
, __func__
) )
124 /* determine type of stream saved in file
127 get_fstream_type( int fd
, FILE* log
)
130 off_t offset
= 0, where
= 0;
131 upxfmt_t dtype
= DT_UNKNOWN
;
134 /* read in enough data to contain extended header
135 * and beginning of payload segment */
136 size_t len
= TS_SEG_LEN
+ RTP_XTHDRLEN
;
138 assert( (fd
> 0) && log
);
140 if( NULL
== (data
= malloc( len
)) ) {
141 mperror( log
, errno
, "%s: malloc", __func__
);
146 /* check if it is a MPEG TS stream
148 n
= read( fd
, data
, len
);
149 if( 0 != sizecheck( "Not enough space for stream data",
150 len
, n
, log
, __func__
) ) break;
153 dtype
= get_mstream_type( data
, len
, log
);
154 if( DT_UNKNOWN
== dtype
) {
155 TRACE( (void)tmfprintf( log
, "%s: file type is not recognized\n",
162 if( NULL
!= data
) free( data
);
165 mperror( log
, errno
, "%s", __func__
);
169 where
= lseek( fd
, (-1) * offset
, SEEK_CUR
);
171 mperror( log
, errno
, "%s: lseek", __func__
);
176 TRACE( (void)tmfprintf( log, "%s: stream type = [%d]=[%s]\n",
177 __func__, (int)dtype, fmt2str(dtype) ) );
184 /* read a sequence of MPEG TS packets (to fit into the given buffer)
187 read_ts_file( int fd
, char* data
, const size_t len
, FILE* log
)
189 const size_t pkt_len
= ((len
- 1) / TS_SEG_LEN
) * TS_SEG_LEN
;
194 assert( (fd
> 0) && data
&& len
&& log
);
196 assert( !buf_overrun( data
, len
, 0, pkt_len
, log
) );
197 n
= read_buf( fd
, data
, pkt_len
, log
);
198 if( n
<= 0 ) return n
;
200 if( 0 != sizecheck( "Bad TS packet stream",
201 pkt_len
, n
, log
, __func__
) )
204 /* make sure we've read TS records, not random data
206 for( k
= 0; k
< (off_t
)pkt_len
; k
+= TS_SEG_LEN
) {
207 if( -1 == ts_sigcheck( data
[k
], k
, (u_long
)pkt_len
,
213 return (bad_frg
? -1 : n
);
217 /* read an RTP packet
220 read_rtp_file( int fd
, char* data
, const size_t len
, FILE* log
)
222 ssize_t nrd
= -1, offset
= 0;
223 size_t hdrlen
= 0, rdlen
= 0;
225 int rtp_end
= 0, rc
= 0;
228 assert( (fd
> 0) && data
&& len
&& log
);
230 assert( !buf_overrun( data
, len
, 0, RTP_HDR_SIZE
, log
) );
231 nrd
= read_buf( fd
, data
, RTP_HDR_SIZE
, log
);
232 if( nrd
<= 0 ) return nrd
;
235 if( -1 == sizecheck( "Bad RTP header", RTP_HDR_SIZE
, nrd
,
239 if( 0 == RTP_verify( data
, nrd
, log
) )
242 if( -1 == (rc
= RTP_hdrlen( data
, nrd
, &hdrlen
, log
)) )
245 /* if there is an extended header, read it in */
247 assert( !buf_overrun( data
, len
, offset
,
248 RTP_XTHDRLEN
- RTP_HDR_SIZE
, log
) );
249 nrd
= read_buf( fd
, data
+ offset
,
250 RTP_XTHDRLEN
- RTP_HDR_SIZE
, log
);
252 (-1 == sizecheck("Bad RTP x-header",
253 RTP_XTHDRLEN
- RTP_HDR_SIZE
, nrd
,
257 if( 0 == nrd
) return nrd
;
260 rc
= RTP_hdrlen( data
, offset
, &hdrlen
, log
);
262 TRACE( (void)tmfprintf( log
, "%s: bad RTP header - quitting\n",
268 TRACE( (void)tmfprintf( log, "%s: RTP x-header length=[%lu]\n",
269 __func__, (u_long)hdrlen ) );
272 if( (size_t)offset
> hdrlen
) {
273 /* read more than needed: step back */
275 where
= lseek( fd
, (-1)*((size_t)offset
- hdrlen
), SEEK_CUR
);
277 mperror( log
, errno
, "%s: lseek", __func__
);
281 offset
-= ((size_t)offset
- hdrlen
);
282 assert( (size_t)offset
== hdrlen
);
285 TRACE( (void)tmfprintf( log, "%s: back to fpos=[0x%X], "
286 "offset=[%ld]\n", __func__, (u_int)where, (long)offset ) );
289 else if( hdrlen
> (size_t)offset
) {
290 /* read remainder of the header in */
292 assert( !buf_overrun( data
, len
, offset
,
293 (hdrlen
- (size_t)offset
), log
) );
294 nrd
= read_buf( fd
, data
+ offset
,
295 (hdrlen
- (size_t)offset
), log
);
297 (-1 == sizecheck("Bad RTP x-header tail",
298 (hdrlen
- (size_t)offset
), nrd
,
302 if( 0 == nrd
) return nrd
;
305 assert( (size_t)offset
== hdrlen
);
307 } /* read extended header */
310 /* read TS records until there is another RTP header or EOF */
311 for( frg
= 0; (ssize_t
)len
> offset
; ++frg
) {
313 rdlen
= ( (len
- offset
) < TS_SEG_LEN
318 TRACE( (void)tmfprintf( log, "%s: reading [%lu] more bytes\n",
319 __func__, (u_long)rdlen ) );
322 assert( !buf_overrun( data
, len
, offset
, rdlen
, log
) );
323 nrd
= read_buf( fd
, data
+ offset
, rdlen
, log
);
324 if( nrd
<= 0 ) break;
326 /* if it's an RTP header, roll back and return
328 rtp_end
= RTP_verify( data
+ offset
, nrd
, log
);
330 if( -1 == lseek( fd
, (-1) * nrd
, SEEK_CUR
) ) {
331 mperror( log
, errno
, "%s: lseek", __func__
);
338 /* check if it is a TS packet and it's of the right size
340 if( (-1 == ts_sigcheck( data
[offset
], offset
, (u_long
)TS_SEG_LEN
,
342 (-1 == sizecheck( "Bad TS segment size", TS_SEG_LEN
, nrd
,
344 TRACE( hex_dump( "Data in question", data
, offset
+ nrd
, log
) );
352 /* If it is not EOF and no RTP header for the next message is found,
353 * it is either our buffer is too small (to fit the whole message)
354 * or the stream is invalid
356 if( !rtp_end
&& (0 != nrd
) ) {
357 (void)tmfprintf( log
, "%s: no RTP end after reading [%ld] bytes\n",
358 __func__
, (long)offset
);
362 return (nrd
< 0) ? nrd
: offset
;
366 /* read record of one of the supported types from file
369 read_frecord( int fd
, char* data
, const size_t len
,
370 upxfmt_t
* stream_type
, FILE* log
)
373 /* off_t where = -1, endmark = -1; */
377 assert( data
&& len
);
378 assert( stream_type
&& log
);
380 stype
= *stream_type
;
383 where = lseek( fd, 0, SEEK_CUR );
384 TRACE( (void)tmfprintf( log, "%s: BEGIN reading at pos=[0x%X:%u]\n",
385 __func__, (u_int)where, (u_int)where ) );
388 if( DT_UNKNOWN
== *stream_type
) {
389 stype
= get_fstream_type( fd
, log
);
391 if( DT_UNKNOWN
== stype
) {
392 (void)tmfprintf( log
, "%s: Unsupported type\n", __func__
);
396 *stream_type
= stype
;
399 if( DT_TS
== stype
) {
400 nrd
= read_ts_file( fd
, data
, len
, log
);
402 else if( DT_RTP_TS
== stype
) {
403 nrd
= read_rtp_file( fd
, data
, len
, log
);
406 (void)tmfprintf( log
, "%s: unknown stream type [%d]\n",
413 endmark = lseek( fd, 0, SEEK_CUR );
415 TRACE( (void)tmfprintf( log, "%s: END reading [%ld] bytes at pos=[0x%X:%u]\n",
416 __func__, (long)nrd, (u_int)endmark, (u_int)endmark ) );
418 TRACE( sizecheck( "WARNING: Read file discrepancy",
419 where + nrd, endmark,
428 /* write data as a UDS record
431 write_uds_record( int fd
, const char* data
, size_t len
, FILE* log
)
433 assert( (fd
> 0) && data
&& len
);
434 (void)(data
&& len
&& fd
);
435 (void)tmfprintf( log
, "%s: UDS conversion not yet implemented\n",
441 /* write RTP record into TS stream
444 write_rtp2ts( int fd
, const char* data
, size_t len
, FILE* log
)
446 void* buf
= (void*)data
;
448 const int NO_VERIFY
= 0;
451 assert( (fd
> 0) && data
&& len
&& log
);
453 rc
= RTP_process( &buf
, &pldlen
, NO_VERIFY
, log
);
454 if( -1 == rc
) return -1;
456 assert( !buf_overrun( buf
, len
, 0, pldlen
, log
) );
457 return write_buf( fd
, buf
, pldlen
, log
);
461 /* write record after converting it from source into destination
465 write_frecord( int fd
, const char* data
, size_t len
,
466 upxfmt_t sfmt
, upxfmt_t dfmt
, FILE* log
)
470 const char *str_from
= NULL
, *str_to
= NULL
;
472 if( DT_UDS
== dfmt
) {
474 nwr
= write_uds_record( fd
, data
, len
, log
);
476 else if( DT_TS
== dfmt
) {
477 if( DT_RTP_TS
== sfmt
) {
479 nwr
= write_rtp2ts( fd
, data
, len
, log
);
484 str_from
= fmt2str(sfmt
);
485 str_to
= fmt2str(dfmt
);
486 (void)tmfprintf( log
, "Conversion from [%s] into [%s] is not supported\n",
495 /* reset packet-buffer registry in stream spec
498 reset_pkt_registry( struct dstream_ctx
* ds
)
502 ds
->flags
&= ~F_DROP_PACKET
;
507 /* release resources allocated for stream spec
510 free_dstream_ctx( struct dstream_ctx
* ds
)
514 if( NULL
!= ds
->pkt
) {
518 ds
->pkt_count
= ds
->max_pkt
= 0;
524 /* register received packet into registry (for scattered output)
527 register_packet( struct dstream_ctx
* spc
, char* buf
, size_t len
)
529 struct iovec
* new_pkt
= NULL
;
530 static const int DO_VERIFY
= 1;
533 size_t new_len
= len
;
535 assert( spc
->max_pkt
> 0 );
537 /* enlarge packet registry if needed */
538 if( spc
->pkt_count
>= spc
->max_pkt
) {
540 spc
->pkt
= realloc( spc
->pkt
, spc
->max_pkt
* sizeof(spc
->pkt
[0]) );
541 if( NULL
== spc
->pkt
) {
542 mperror( g_flog
, errno
, "%s: realloc", __func__
);
546 TRACE( (void)tmfprintf( g_flog
, "RTP packet registry "
547 "expanded to [%lu] records\n", (u_long
)spc
->max_pkt
) );
550 /* put packet info into registry */
552 new_pkt
= &(spc
->pkt
[ spc
->pkt_count
]);
555 TRACE( (void)tmfprintf( stderr, "IN: packet [%lu]: buf=[%p], len=[%lu]\n",
556 (u_long)spc->pkt_count, (void*)buf, (u_long)len ) );
559 if( 0 != RTP_process( &new_buf
, &new_len
, DO_VERIFY
, g_flog
) ) {
560 TRACE( (void)tmfputs("register packet: dropping\n", g_flog
) );
561 spc
->flags
|= F_DROP_PACKET
;
566 new_pkt
->iov_base
= new_buf
;
567 new_pkt
->iov_len
= new_len
;
570 TRACE( (void)tmfprintf( stderr, "OUT: packet [%lu]: buf=[%p], len=[%lu]\n",
571 (u_long)spc->pkt_count, new_pkt->iov_base,
572 (u_long)new_pkt->iov_len ) );
580 /* read data from source, determine underlying protocol
581 * (if not already known); and process the packet
582 * if needed (for RTP - register packet)
584 * return the number of octets read from the source
588 read_packet( struct dstream_ctx
* spc
, int fd
, char* buf
, size_t len
)
591 size_t chunk_len
= len
;
593 assert( spc
&& buf
&& len
);
596 /* if *RAW* data specified - read AS IS
598 if( DT_RAW
== spc
->stype
) {
599 return read_buf( fd
, buf
, len
, g_flog
);
602 /* if it is (or *could* be) RTP, read only MTU bytes
604 if( (spc
->stype
== DT_RTP_TS
) || (spc
->flags
& F_CHECK_FMT
) )
605 chunk_len
= (len
> spc
->mtu
) ? spc
->mtu
: len
;
607 if( spc
->flags
& F_FILE_INPUT
) {
608 assert( !buf_overrun( buf
, len
, 0, chunk_len
, g_flog
) );
609 n
= read_frecord( fd
, buf
, chunk_len
, &(spc
->stype
), g_flog
);
610 if( n
<= 0 ) return n
;
613 assert( !buf_overrun(buf
, len
, 0, chunk_len
, g_flog
) );
614 n
= read_buf( fd
, buf
, chunk_len
, g_flog
);
615 if( n
<= 0 ) return n
;
617 spc
->stype
= get_mstream_type( buf
, n
, g_flog
);
620 if( spc
->flags
& F_CHECK_FMT
) {
621 if( DT_RTP_TS
== spc
->stype
) {
622 /* scattered data (to exclude RTP headers):
623 * use packet registry */
624 spc
->flags
|= F_SCATTERED
;
626 else if( DT_TS
== spc
->stype
) {
627 spc
->flags
&= ~F_SCATTERED
;
631 TRACE( (void)tmfputs( "Unrecognized stream type\n", g_flog
) );
634 TRACE( (void)tmfprintf( g_flog
, "Established stream as [%s]\n",
635 fmt2str( spc
->stype
) ) );
637 spc
->flags
&= ~F_CHECK_FMT
;
640 if( spc
->flags
& F_SCATTERED
)
641 if( -1 == register_packet( spc
, buf
, n
) ) return -1;
647 /* read data from source of specified type (UDP socket or otherwise);
648 * read as many fragments as specified (max_frgs) into the buffer
651 read_data( struct dstream_ctx
* spc
, int fd
, char* data
,
652 const ssize_t data_len
, const struct rdata_opt
* opt
)
655 ssize_t n
= 0, nrcv
= -1;
656 time_t start_tm
= time(NULL
), cur_tm
= 0;
657 time_t buftm_sec
= 0;
659 assert( spc
&& (data_len
> 0) && opt
);
661 /* if max_frgs < 0, read as many packets as can fit in the buffer,
662 * otherwise read no more than max_frgs packets
665 for( m
= 0, n
= 0; ((opt
->max_frgs
< 0) ? 1 : (m
< opt
->max_frgs
)); ++m
) {
666 nrcv
= read_packet( spc
, fd
, data
+ n
, data_len
- n
);
668 if( EAGAIN
== errno
) {
669 (void)tmfprintf( g_flog
,
670 "Receive on socket/file [%d] timed out\n",
674 if( 0 == nrcv
) (void)tmfprintf(g_flog
, "%s - EOF\n",__func__
);
676 mperror(g_flog
, errno
, "%s: read/recv", __func__
);
682 if( spc
->flags
& F_DROP_PACKET
) {
683 spc
->flags
&= ~F_DROP_PACKET
;
688 if( n
>= (data_len
- nrcv
) ) break;
690 if( -1 != opt
->buf_tmout
) {
692 buftm_sec
= cur_tm
- start_tm
;
693 if( buftm_sec
>= opt
->buf_tmout
) {
694 TRACE( (void)tmfprintf( g_flog
, "%s: Buffer timed out "
695 "after [%ld] seconds\n", __func__
,
701 (void) tmfprintf( g_flog, "%s: Skip\n", __func__ );
707 if( (nrcv
> 0) && !n
) {
708 TRACE( (void)tmfprintf( g_flog
, "%s: no data to send "
709 "out of [%d] packets\n", __func__
, m
) );
713 return (nrcv
> 0) ? n
: -1;
717 /* write data to destination(s)
720 write_data( const struct dstream_ctx
* spc
,
725 ssize_t n
= 0, error
= IO_ERR
;
726 int32_t n_count
= -1;
728 assert( spc
&& data
&& len
);
729 if( fd
<= 0 ) return 0;
731 if( spc
->flags
& F_SCATTERED
) {
732 n_count
= spc
->pkt_count
;
733 n
= writev( fd
, spc
->pkt
, n_count
);
735 if( EAGAIN
== errno
) {
736 (void)tmfprintf( g_flog
, "Write on fd=[%d] timed out\n", fd
);
739 mperror( g_flog
, errno
, "%s: writev", __func__
);
744 n
= write_buf( fd
, data
, len
, g_flog
);
749 return (n
> 0) ? n
: error
;
753 /* initialize incoming-stream context:
754 * set data type (if possible) and flags
757 init_dstream_ctx( struct dstream_ctx
* ds
, const char* cmd
, const char* fname
,
760 extern const char CMD_UDP
[];
761 extern const size_t CMD_UDP_LEN
;
762 extern const char CMD_RTP
[];
763 extern const size_t CMD_RTP_LEN
;
765 assert( ds
&& cmd
&& (nmsgs
> 0) );
769 ds
->max_pkt
= ds
->pkt_count
= 0;
770 ds
->mtu
= ETHERNET_MTU
;
772 if( NULL
!= fname
) {
773 ds
->stype
= DT_UNKNOWN
;
774 ds
->flags
|= (F_CHECK_FMT
| F_FILE_INPUT
);
775 TRACE( (void)tmfputs( "File stream, RTP check enabled\n", g_flog
) );
777 else if( 0 == strncmp( cmd
, CMD_UDP
, CMD_UDP_LEN
) ) {
778 ds
->stype
= DT_UNKNOWN
;
779 ds
->flags
|= F_CHECK_FMT
;
780 TRACE( (void)tmfputs( "UDP stream, RTP check enabled\n", g_flog
) );
782 else if( 0 == strncmp( cmd
, CMD_RTP
, CMD_RTP_LEN
) ) {
783 ds
->stype
= DT_RTP_TS
;
784 ds
->flags
|= F_SCATTERED
;
785 TRACE( (void)tmfputs( "RTP (over UDP) stream assumed,"
786 " no checks\n", g_flog
) );
789 TRACE( (void)tmfprintf( g_flog
, "%s: "
790 "Irrelevant command [%s]\n", __func__
, cmd
) );
794 ds
->pkt
= calloc( nmsgs
, sizeof(ds
->pkt
[0]) );
795 if( NULL
== ds
->pkt
) {
796 mperror( g_flog
, errno
, "%s: calloc", __func__
);