1 /* @(#) implementation of packet-io functions for udpxy
3 * Copyright 2008-2011 Pavel V. Cherenkov (pcherenkov@gmail.com)
5 * This file is part of udpxy.
7 * udpxy is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
12 * udpxy is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with udpxy. If not, see <http://www.gnu.org/licenses/>.
21 #include <sys/types.h>
40 static const size_t TS_SEG_LEN
= 188;
42 /* data-stream format type */
44 UPXDT_UNKNOWN
= 0, /* no assumptions */
45 UPXDT_TS
, /* MPEG-TS */
46 UPXDT_RTP_TS
, /* RTP over MPEG-TS */
47 UPXDT_UDS
, /* UDS file format */
48 UPXDT_RAW
/* read AS-IS */
50 static const char* upxfmt_NAME
[] = {
57 static const int UPXDT_LEN
= sizeof(upxfmt_NAME
) / sizeof(upxfmt_NAME
[0]);
61 fmt2str( upxfmt_t fmt
)
65 assert( (ifmt
>= 0 ) && (ifmt
< UPXDT_LEN
) );
66 return upxfmt_NAME
[ ifmt
];
70 /* check for MPEG TS signature, complain if not found
73 ts_sigcheck( const int c
, off_t offset
, ssize_t len
,
74 FILE* log
, const char* func
)
78 (void)(len
); /* NOP to avoid warnings */
80 if( c
== MPEG_TS_SIG
) return 0;
83 (void) offset
; /* get rid of a warning if TRACE is disabled */
84 TRACE( (void)tmfprintf( log
, "%s: TS signature mismatch TS=[0x%02X], found=[0x%02X]; "
85 "offset [0x%X(%u)] of packet len=[%lu]\n",
86 func
, MPEG_TS_SIG
, c
, offset
, offset
, (u_long
)len
) );
93 /* determine type of stream in memory
96 get_mstream_type( const char* data
, size_t len
, FILE* log
)
102 assert( data
&& len
);
104 if( len
< (RTP_HDR_SIZE
+ 1) ) {
105 (void) tmfprintf( log
, "%s: read [%ld] bytes,"
106 " not enough for RTP header\n", __func__
,
108 return UPXDT_UNKNOWN
;
111 /* if the 1st byte has MPEG-TS signature - skip further checks */
112 sig
= data
[0] & 0xFF;
113 if( 0 == ts_sigcheck( sig
, 0, 1, NULL
/*log*/, __func__
) )
116 /* if not RTP - quit */
117 if( 0 == RTP_verify( data
, RTP_HDR_SIZE
, log
) )
118 return UPXDT_UNKNOWN
;
120 /* check the first byte after RTP header - should be
121 * TS signature to be RTP over TS */
123 if( (0 != RTP_hdrlen( data
, len
, &hdrlen
, log
)) ||
125 return UPXDT_UNKNOWN
;
128 sig
= data
[ hdrlen
];
129 if( 0 != ts_sigcheck( sig
, 0, 1, log
, __func__
) )
130 return UPXDT_UNKNOWN
;
137 /* determine type of stream saved in file
140 get_fstream_type( int fd
, FILE* log
)
143 off_t offset
= 0, where
= 0;
144 upxfmt_t dtype
= UPXDT_UNKNOWN
;
147 /* read in enough data to contain extended header
148 * and beginning of payload segment */
149 size_t len
= TS_SEG_LEN
+ RTP_XTHDRLEN
;
151 assert( (fd
> 0) && log
);
153 if( NULL
== (data
= malloc( len
)) ) {
154 mperror( log
, errno
, "%s: malloc", __func__
);
155 return UPXDT_UNKNOWN
;
159 /* check if it is a MPEG TS stream
161 n
= read( fd
, data
, len
);
162 if( 0 != sizecheck( "Not enough space for stream data",
163 len
, n
, log
, __func__
) ) break;
166 dtype
= get_mstream_type( data
, len
, log
);
167 if( UPXDT_UNKNOWN
== dtype
) {
168 TRACE( (void)tmfprintf( log
, "%s: file type is not recognized\n",
170 dtype
= UPXDT_UNKNOWN
;
175 if( NULL
!= data
) free( data
);
178 mperror( log
, errno
, "%s", __func__
);
179 return UPXDT_UNKNOWN
;
182 where
= lseek( fd
, (-1) * offset
, SEEK_CUR
);
184 mperror( log
, errno
, "%s: lseek", __func__
);
185 return UPXDT_UNKNOWN
;
189 TRACE( (void)tmfprintf( log, "%s: stream type = [%d]=[%s]\n",
190 __func__, (int)dtype, fmt2str(dtype) ) );
197 /* read a sequence of MPEG TS packets (to fit into the given buffer)
200 read_ts_file( int fd
, char* data
, const size_t len
, FILE* log
)
202 const size_t pkt_len
= ((len
- 1) / TS_SEG_LEN
) * TS_SEG_LEN
;
207 assert( (fd
> 0) && data
&& len
&& log
);
209 assert( !buf_overrun( data
, len
, 0, pkt_len
, log
) );
210 n
= read_buf( fd
, data
, pkt_len
, log
);
211 if( n
<= 0 ) return n
;
213 if( 0 != sizecheck( "Bad TS packet stream",
214 pkt_len
, n
, log
, __func__
) )
217 /* make sure we've read TS records, not random data
219 for( k
= 0; k
< (off_t
)pkt_len
; k
+= TS_SEG_LEN
) {
220 if( -1 == ts_sigcheck( data
[k
], k
, (u_long
)pkt_len
,
226 return (bad_frg
? -1 : n
);
230 /* read an RTP packet
233 read_rtp_file( int fd
, char* data
, const size_t len
, FILE* log
)
235 ssize_t nrd
= -1, offset
= 0;
236 size_t hdrlen
= 0, rdlen
= 0;
238 int rtp_end
= 0, rc
= 0;
241 assert( (fd
> 0) && data
&& len
&& log
);
243 assert( !buf_overrun( data
, len
, 0, RTP_HDR_SIZE
, log
) );
244 nrd
= read_buf( fd
, data
, RTP_HDR_SIZE
, log
);
245 if( nrd
<= 0 ) return nrd
;
248 if( -1 == sizecheck( "Bad RTP header", RTP_HDR_SIZE
, nrd
,
252 if( 0 == RTP_verify( data
, nrd
, log
) )
255 if( -1 == (rc
= RTP_hdrlen( data
, nrd
, &hdrlen
, log
)) )
258 /* if there is an extended header, read it in */
260 assert( !buf_overrun( data
, len
, offset
,
261 RTP_XTHDRLEN
- RTP_HDR_SIZE
, log
) );
262 nrd
= read_buf( fd
, data
+ offset
,
263 RTP_XTHDRLEN
- RTP_HDR_SIZE
, log
);
265 (-1 == sizecheck("Bad RTP x-header",
266 RTP_XTHDRLEN
- RTP_HDR_SIZE
, nrd
,
270 if( 0 == nrd
) return nrd
;
273 rc
= RTP_hdrlen( data
, offset
, &hdrlen
, log
);
275 TRACE( (void)tmfprintf( log
, "%s: bad RTP header - quitting\n",
281 TRACE( (void)tmfprintf( log, "%s: RTP x-header length=[%lu]\n",
282 __func__, (u_long)hdrlen ) );
285 if( (size_t)offset
> hdrlen
) {
286 /* read more than needed: step back */
288 where
= lseek( fd
, (-1)*((size_t)offset
- hdrlen
), SEEK_CUR
);
290 mperror( log
, errno
, "%s: lseek", __func__
);
294 offset
-= ((size_t)offset
- hdrlen
);
295 assert( (size_t)offset
== hdrlen
);
298 TRACE( (void)tmfprintf( log, "%s: back to fpos=[0x%X], "
299 "offset=[%ld]\n", __func__, (u_int)where, (long)offset ) );
302 else if( hdrlen
> (size_t)offset
) {
303 /* read remainder of the header in */
305 assert( !buf_overrun( data
, len
, offset
,
306 (hdrlen
- (size_t)offset
), log
) );
307 nrd
= read_buf( fd
, data
+ offset
,
308 (hdrlen
- (size_t)offset
), log
);
310 (-1 == sizecheck("Bad RTP x-header tail",
311 (hdrlen
- (size_t)offset
), nrd
,
315 if( 0 == nrd
) return nrd
;
318 assert( (size_t)offset
== hdrlen
);
320 } /* read extended header */
323 /* read TS records until there is another RTP header or EOF */
324 for( frg
= 0; (ssize_t
)len
> offset
; ++frg
) {
326 rdlen
= ( (len
- offset
) < TS_SEG_LEN
331 TRACE( (void)tmfprintf( log, "%s: reading [%lu] more bytes\n",
332 __func__, (u_long)rdlen ) );
335 assert( !buf_overrun( data
, len
, offset
, rdlen
, log
) );
336 nrd
= read_buf( fd
, data
+ offset
, rdlen
, log
);
337 if( nrd
<= 0 ) break;
339 /* if it's an RTP header, roll back and return
341 rtp_end
= RTP_verify( data
+ offset
, nrd
, log
);
343 if( -1 == lseek( fd
, (-1) * nrd
, SEEK_CUR
) ) {
344 mperror( log
, errno
, "%s: lseek", __func__
);
351 /* check if it is a TS packet and it's of the right size
353 if( (-1 == ts_sigcheck( data
[offset
], offset
, (u_long
)TS_SEG_LEN
,
355 (-1 == sizecheck( "Bad TS segment size", TS_SEG_LEN
, nrd
,
357 TRACE( hex_dump( "Data in question", data
, offset
+ nrd
, log
) );
365 /* If it is not EOF and no RTP header for the next message is found,
366 * it is either our buffer is too small (to fit the whole message)
367 * or the stream is invalid
369 if( !rtp_end
&& (0 != nrd
) ) {
370 (void)tmfprintf( log
, "%s: no RTP end after reading [%ld] bytes\n",
371 __func__
, (long)offset
);
375 return (nrd
< 0) ? nrd
: offset
;
379 /* read record of one of the supported types from file
382 read_frecord( int fd
, char* data
, const size_t len
,
383 upxfmt_t
* stream_type
, FILE* log
)
386 /* off_t where = -1, endmark = -1; */
390 assert( data
&& len
);
391 assert( stream_type
&& log
);
393 stype
= *stream_type
;
396 where = lseek( fd, 0, SEEK_CUR );
397 TRACE( (void)tmfprintf( log, "%s: BEGIN reading at pos=[0x%X:%u]\n",
398 __func__, (u_int)where, (u_int)where ) );
401 if( UPXDT_UNKNOWN
== *stream_type
) {
402 stype
= get_fstream_type( fd
, log
);
404 if( UPXDT_UNKNOWN
== stype
) {
405 (void)tmfprintf( log
, "%s: Unsupported type\n", __func__
);
409 *stream_type
= stype
;
410 } /* UPXDT_UNKNOWN */
412 if( UPXDT_TS
== stype
) {
413 nrd
= read_ts_file( fd
, data
, len
, log
);
415 else if( UPXDT_RTP_TS
== stype
) {
416 nrd
= read_rtp_file( fd
, data
, len
, log
);
419 (void)tmfprintf( log
, "%s: unknown stream type [%d]\n",
426 endmark = lseek( fd, 0, SEEK_CUR );
428 TRACE( (void)tmfprintf( log, "%s: END reading [%ld] bytes at pos=[0x%X:%u]\n",
429 __func__, (long)nrd, (u_int)endmark, (u_int)endmark ) );
431 TRACE( sizecheck( "WARNING: Read file discrepancy",
432 where + nrd, endmark,
441 /* write data as a UDS record
444 write_uds_record( int fd
, const char* data
, size_t len
, FILE* log
)
446 assert( (fd
> 0) && data
&& len
);
447 (void)(data
&& len
&& fd
);
448 (void)tmfprintf( log
, "%s: UDS conversion not yet implemented\n",
/* Write an RTP record into a TS stream: the record is passed through
 * RTP_process() (verification disabled) and the resulting payload is
 * written to the destination.
 *
 * @param fd    destination file descriptor.
 * @param data  RTP record.
 * @param len   length of the record.
 * @param log   log stream.
 *
 * @return result of write_buf() for the payload, or -1 if RTP_process()
 *         reported -1.
 *
 * NOTE(review): the return type and the initial payload length are not
 * visible in this listing; ssize_t and 'len' are assumed (the latter by
 * analogy with register_packet) - confirm.
 */
static ssize_t
write_rtp2ts( int fd, const char* data, size_t len, FILE* log )
{
    const int NO_VERIFY = 0;
    void*  payload     = (void*)data;
    size_t payload_len = len;

    assert( (fd > 0) && data && len && log );

    if( -1 == RTP_process( &payload, &payload_len, NO_VERIFY, log ) )
        return -1;

    assert( !buf_overrun( payload, len, 0, payload_len, log ) );

    return write_buf( fd, payload, payload_len, log );
}
474 /* write record after converting it from source into destination
478 write_frecord( int fd
, const char* data
, size_t len
,
479 upxfmt_t sfmt
, upxfmt_t dfmt
, FILE* log
)
483 const char *str_from
= NULL
, *str_to
= NULL
;
485 if( UPXDT_UDS
== dfmt
) {
487 nwr
= write_uds_record( fd
, data
, len
, log
);
489 else if( UPXDT_TS
== dfmt
) {
490 if( UPXDT_RTP_TS
== sfmt
) {
492 nwr
= write_rtp2ts( fd
, data
, len
, log
);
497 str_from
= fmt2str(sfmt
);
498 str_to
= fmt2str(dfmt
);
499 (void)tmfprintf( log
, "Conversion from [%s] into [%s] is not supported\n",
508 /* reset packet-buffer registry in stream spec
511 reset_pkt_registry( struct dstream_ctx
* ds
)
515 ds
->flags
&= ~F_DROP_PACKET
;
520 /* release resources allocated for stream spec
523 free_dstream_ctx( struct dstream_ctx
* ds
)
527 if( NULL
!= ds
->pkt
) {
531 ds
->pkt_count
= ds
->max_pkt
= 0;
537 /* register received packet into registry (for scattered output)
540 register_packet( struct dstream_ctx
* spc
, char* buf
, size_t len
)
542 struct iovec
* new_pkt
= NULL
;
543 static const int DO_VERIFY
= 1;
546 size_t new_len
= len
;
548 assert( spc
->max_pkt
> 0 );
550 /* enlarge packet registry if needed */
551 if( spc
->pkt_count
>= spc
->max_pkt
) {
553 spc
->pkt
= realloc( spc
->pkt
, spc
->max_pkt
* sizeof(spc
->pkt
[0]) );
554 if( NULL
== spc
->pkt
) {
555 mperror( g_flog
, errno
, "%s: realloc", __func__
);
559 TRACE( (void)tmfprintf( g_flog
, "RTP packet registry "
560 "expanded to [%lu] records\n", (u_long
)spc
->max_pkt
) );
563 /* put packet info into registry */
565 new_pkt
= &(spc
->pkt
[ spc
->pkt_count
]);
568 TRACE( (void)tmfprintf( stderr, "IN: packet [%lu]: buf=[%p], len=[%lu]\n",
569 (u_long)spc->pkt_count, (void*)buf, (u_long)len ) );
572 if( 0 != RTP_process( &new_buf
, &new_len
, DO_VERIFY
, g_flog
) ) {
573 TRACE( (void)tmfputs("register packet: dropping\n", g_flog
) );
574 spc
->flags
|= F_DROP_PACKET
;
579 new_pkt
->iov_base
= new_buf
;
580 new_pkt
->iov_len
= new_len
;
583 TRACE( (void)tmfprintf( stderr, "OUT: packet [%lu]: buf=[%p], len=[%lu]\n",
584 (u_long)spc->pkt_count, new_pkt->iov_base,
585 (u_long)new_pkt->iov_len ) );
593 /* read data from source, determine underlying protocol
594 * (if not already known); and process the packet
595 * if needed (for RTP - register packet)
597 * return the number of octets read from the source
601 read_packet( struct dstream_ctx
* spc
, int fd
, char* buf
, size_t len
)
604 size_t chunk_len
= len
;
606 assert( spc
&& buf
&& len
);
609 /* if *RAW* data specified - read AS IS
611 if( UPXDT_RAW
== spc
->stype
) {
612 return read_buf( fd
, buf
, len
, g_flog
);
615 /* if it is (or *could* be) RTP, read only MTU bytes
617 if( (spc
->stype
== UPXDT_RTP_TS
) || (spc
->flags
& F_CHECK_FMT
) )
618 chunk_len
= (len
> spc
->mtu
) ? spc
->mtu
: len
;
620 if( spc
->flags
& F_FILE_INPUT
) {
621 assert( !buf_overrun( buf
, len
, 0, chunk_len
, g_flog
) );
622 n
= read_frecord( fd
, buf
, chunk_len
, &(spc
->stype
), g_flog
);
623 if( n
<= 0 ) return n
;
626 assert( !buf_overrun(buf
, len
, 0, chunk_len
, g_flog
) );
627 n
= read_buf( fd
, buf
, chunk_len
, g_flog
);
628 if( n
<= 0 ) return n
;
631 if( spc
->flags
& F_CHECK_FMT
) {
632 spc
->stype
= get_mstream_type( buf
, n
, g_flog
);
633 switch (spc
->stype
) {
635 /* scattered: exclude RTP headers */
636 spc
->flags
|= F_SCATTERED
; break;
638 spc
->flags
&= ~F_SCATTERED
; break;
640 spc
->stype
= UPXDT_RAW
;
641 TRACE( (void)tmfputs( "Unrecognized stream type\n", g_flog
) );
645 TRACE( (void)tmfprintf( g_flog
, "Established stream as [%s]\n",
646 fmt2str( spc
->stype
) ) );
648 spc
->flags
&= ~F_CHECK_FMT
;
651 if( spc
->flags
& F_SCATTERED
)
652 if( -1 == register_packet( spc
, buf
, n
) ) return -1;
658 /* read data from source of specified type (UDP socket or otherwise);
659 * read as many fragments as specified (max_frgs) into the buffer
662 read_data( struct dstream_ctx
* spc
, int fd
, char* data
,
663 const ssize_t data_len
, const struct rdata_opt
* opt
)
666 ssize_t n
= 0, nrcv
= -1;
667 time_t start_tm
= time(NULL
), cur_tm
= 0;
668 time_t buftm_sec
= 0;
670 assert( spc
&& (data_len
> 0) && opt
);
672 /* if max_frgs < 0, read as many packets as can fit in the buffer,
673 * otherwise read no more than max_frgs packets
676 for( m
= 0, n
= 0; ((opt
->max_frgs
< 0) ? 1 : (m
< opt
->max_frgs
)); ++m
) {
677 nrcv
= read_packet( spc
, fd
, data
+ n
, data_len
- n
);
679 if( EAGAIN
== errno
) {
680 (void)tmfprintf( g_flog
,
681 "Receive on socket/file [%d] timed out\n",
685 if( 0 == nrcv
) (void)tmfprintf(g_flog
, "%s - EOF\n",__func__
);
687 mperror(g_flog
, errno
, "%s: read/recv", __func__
);
693 if( spc
->flags
& F_DROP_PACKET
) {
694 spc
->flags
&= ~F_DROP_PACKET
;
699 if( n
>= (data_len
- nrcv
) ) break;
701 if( -1 != opt
->buf_tmout
) {
703 buftm_sec
= cur_tm
- start_tm
;
704 if( buftm_sec
>= opt
->buf_tmout
) {
705 TRACE( (void)tmfprintf( g_flog
, "%s: Buffer timed out "
706 "after [%ld] seconds\n", __func__
,
712 (void) tmfprintf( g_flog, "%s: Skip\n", __func__ );
718 if( (nrcv
> 0) && !n
) {
719 TRACE( (void)tmfprintf( g_flog
, "%s: no data to send "
720 "out of [%d] packets\n", __func__
, m
) );
724 return (nrcv
> 0) ? n
: -1;
728 /* write data to destination(s)
731 write_data( const struct dstream_ctx
* spc
,
736 ssize_t n
= 0, error
= IO_ERR
;
737 int32_t n_count
= -1;
739 assert( spc
&& data
&& len
);
740 if( fd
<= 0 ) return 0;
742 if( spc
->flags
& F_SCATTERED
) {
743 n_count
= spc
->pkt_count
;
744 n
= writev( fd
, spc
->pkt
, n_count
);
746 if( EAGAIN
== errno
) {
747 (void)tmfprintf( g_flog
, "Write on fd=[%d] timed out\n", fd
);
750 mperror( g_flog
, errno
, "%s: writev", __func__
);
755 n
= write_buf( fd
, data
, len
, g_flog
);
760 return (n
> 0) ? n
: error
;
764 /* initialize incoming-stream context:
765 * set data type (if possible) and flags
768 init_dstream_ctx( struct dstream_ctx
* ds
, const char* cmd
, const char* fname
,
771 extern const char CMD_UDP
[];
772 extern const size_t CMD_UDP_LEN
;
773 extern const char CMD_RTP
[];
774 extern const size_t CMD_RTP_LEN
;
776 assert( ds
&& cmd
&& (nmsgs
> 0) );
780 ds
->max_pkt
= ds
->pkt_count
= 0;
781 ds
->mtu
= ETHERNET_MTU
;
783 if( NULL
!= fname
) {
784 ds
->stype
= UPXDT_UNKNOWN
;
785 ds
->flags
|= (F_CHECK_FMT
| F_FILE_INPUT
);
786 TRACE( (void)tmfputs( "File stream, RTP check enabled\n", g_flog
) );
788 else if( 0 == strncmp( cmd
, CMD_UDP
, CMD_UDP_LEN
) ) {
789 ds
->stype
= UPXDT_UNKNOWN
;
790 ds
->flags
|= F_CHECK_FMT
;
791 TRACE( (void)tmfputs( "UDP stream, RTP check enabled\n", g_flog
) );
793 else if( 0 == strncmp( cmd
, CMD_RTP
, CMD_RTP_LEN
) ) {
794 ds
->stype
= UPXDT_RTP_TS
;
795 ds
->flags
|= F_SCATTERED
;
796 TRACE( (void)tmfputs( "RTP (over UDP) stream assumed,"
797 " no checks\n", g_flog
) );
800 TRACE( (void)tmfprintf( g_flog
, "%s: "
801 "Irrelevant command [%s]\n", __func__
, cmd
) );
805 ds
->pkt
= calloc( nmsgs
, sizeof(ds
->pkt
[0]) );
806 if( NULL
== ds
->pkt
) {
807 mperror( g_flog
, errno
, "%s: calloc", __func__
);