Udpxy v1.0-Chipmunk-build21 (from Teaman-ND branch)
[tomato.git] / release / src / router / udpxy / dpkt.c
blob9fbbeed1c11a404fd81c9fd71363f8bf4b84b418
1 /* @(#) implementation of packet-io functions for udpxy
3 * Copyright 2008-2011 Pavel V. Cherenkov (pcherenkov@gmail.com)
5 * This file is part of udpxy.
7 * udpxy is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
12 * udpxy is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with udpxy. If not, see <http://www.gnu.org/licenses/>.
21 #include <sys/types.h>
22 #include <unistd.h>
23 #include <stdio.h>
24 #include <assert.h>
25 #include <errno.h>
26 #include <stdlib.h>
27 #include <sys/uio.h>
28 #include <string.h>
29 #include <strings.h>
30 #include <time.h>
32 #include "udpxy.h"
33 #include "dpkt.h"
34 #include "rtp.h"
35 #include "util.h"
36 #include "mtrace.h"
38 extern FILE* g_flog;
40 static const size_t TS_SEG_LEN = 188;
42 static const char* upxfmt_NAME[] = {
43 "UNKNOWN", "MPEG-TS", "RTP-TS", "UDPXY-UDS"
47 const char*
48 fmt2str( upxfmt_t fmt )
50 int ifmt = fmt;
52 assert( (ifmt > (int)DT_FIRST) && (ifmt < (int)DT_LAST) );
53 return upxfmt_NAME[ ifmt ];
57 /* check for MPEG TS signature, complain if not found
59 static int
60 ts_sigcheck( const int c, off_t offset, ssize_t len,
61 FILE* log, const char* func )
64 assert( len );
65 (void)(len); /* NOP to avoid warnings */
67 if( c == MPEG_TS_SIG ) return 0;
69 if( log && func ) {
70 (void) offset; /* get rid of a warning if TRACE is disabled */
71 TRACE( (void)tmfprintf( log, "%s: TS signature mismatch TS=[0x%02X], found=[0x%02X]; "
72 "offset [0x%X(%u)] of packet len=[%lu]\n",
73 func, MPEG_TS_SIG, c, offset, offset, (u_long)len ) );
76 return -1;
80 /* determine type of stream in memory
82 upxfmt_t
83 get_mstream_type( const char* data, size_t len, FILE* log )
85 int sig = 0;
86 size_t hdrlen = 0;
87 ssize_t n = -1;
89 assert( data && len );
91 if( len < (RTP_HDR_SIZE + 1) ) {
92 (void) tmfprintf( log, "%s: read [%ld] bytes,"
93 " not enough for RTP header\n", __func__,
94 (long)n );
95 return DT_UNKNOWN;
98 /* if the 1st byte has MPEG-TS signature - skip further checks */
99 sig = data[0] & 0xFF;
100 if( 0 == ts_sigcheck( sig, 0, 1, NULL /*log*/, __func__ ) )
101 return DT_TS;
103 /* if not RTP - quit */
104 if( 0 == RTP_verify( data, RTP_HDR_SIZE, log ) )
105 return DT_UNKNOWN;
107 /* check the first byte after RTP header - should be
108 * TS signature to be RTP over TS */
110 if( (0 != RTP_hdrlen( data, len, &hdrlen, log )) ||
111 (len < hdrlen) ) {
112 return DT_UNKNOWN;
115 sig = data[ hdrlen ];
116 if( 0 != ts_sigcheck( sig, 0, 1, log, __func__ ) )
117 return DT_UNKNOWN;
119 return DT_RTP_TS;
124 /* determine type of stream saved in file
126 upxfmt_t
127 get_fstream_type( int fd, FILE* log )
129 ssize_t n = 0;
130 off_t offset = 0, where = 0;
131 upxfmt_t dtype = DT_UNKNOWN;
132 char* data = NULL;
134 /* read in enough data to contain extended header
135 * and beginning of payload segment */
136 size_t len = TS_SEG_LEN + RTP_XTHDRLEN;
138 assert( (fd > 0) && log );
140 if( NULL == (data = malloc( len )) ) {
141 mperror( log, errno, "%s: malloc", __func__ );
142 return DT_UNKNOWN;
145 do {
146 /* check if it is a MPEG TS stream
148 n = read( fd, data, len );
149 if( 0 != sizecheck( "Not enough space for stream data",
150 len, n, log, __func__ ) ) break;
151 offset += n;
153 dtype = get_mstream_type( data, len, log );
154 if( DT_UNKNOWN == dtype ) {
155 TRACE( (void)tmfprintf( log, "%s: file type is not recognized\n",
156 __func__ ) );
157 dtype = DT_UNKNOWN;
158 break;
160 } while(0);
162 if( NULL != data ) free( data );
164 if( n <= 0 ) {
165 mperror( log, errno, "%s", __func__ );
166 return DT_UNKNOWN;
169 where = lseek( fd, (-1) * offset, SEEK_CUR );
170 if( -1 == where ) {
171 mperror( log, errno, "%s: lseek", __func__ );
172 return DT_UNKNOWN;
176 TRACE( (void)tmfprintf( log, "%s: stream type = [%d]=[%s]\n",
177 __func__, (int)dtype, fmt2str(dtype) ) );
180 return dtype;
184 /* read a sequence of MPEG TS packets (to fit into the given buffer)
186 static ssize_t
187 read_ts_file( int fd, char* data, const size_t len, FILE* log )
189 const size_t pkt_len = ((len - 1) / TS_SEG_LEN) * TS_SEG_LEN;
190 off_t k = 0;
191 u_int bad_frg = 0;
192 ssize_t n = -1;
194 assert( (fd > 0) && data && len && log);
196 assert( !buf_overrun( data, len, 0, pkt_len, log ) );
197 n = read_buf( fd, data, pkt_len, log );
198 if( n <= 0 ) return n;
200 if( 0 != sizecheck( "Bad TS packet stream",
201 pkt_len, n, log, __func__ ) )
202 return -1;
204 /* make sure we've read TS records, not random data
206 for( k = 0; k < (off_t)pkt_len; k += TS_SEG_LEN ) {
207 if( -1 == ts_sigcheck( data[k], k, (u_long)pkt_len,
208 log, __func__ ) ) {
209 ++bad_frg;
213 return (bad_frg ? -1 : n);
217 /* read an RTP packet
219 static ssize_t
220 read_rtp_file( int fd, char* data, const size_t len, FILE* log )
222 ssize_t nrd = -1, offset = 0;
223 size_t hdrlen = 0, rdlen = 0;
224 u_int frg = 0;
225 int rtp_end = 0, rc = 0;
226 off_t where = 0;
228 assert( (fd > 0) && data && len && log);
230 assert( !buf_overrun( data, len, 0, RTP_HDR_SIZE, log ) );
231 nrd = read_buf( fd, data, RTP_HDR_SIZE, log );
232 if( nrd <= 0 ) return nrd;
233 offset += nrd;
235 if( -1 == sizecheck( "Bad RTP header", RTP_HDR_SIZE, nrd,
236 log, __func__ ) )
237 return -1;
239 if( 0 == RTP_verify( data, nrd, log ) )
240 return -1;
242 if( -1 == (rc = RTP_hdrlen( data, nrd, &hdrlen, log )) )
243 return -1;
245 /* if there is an extended header, read it in */
246 if( ENOMEM == rc ) {
247 assert( !buf_overrun( data, len, offset,
248 RTP_XTHDRLEN - RTP_HDR_SIZE, log ) );
249 nrd = read_buf( fd, data + offset,
250 RTP_XTHDRLEN - RTP_HDR_SIZE, log );
251 if( (nrd <= 0) ||
252 (-1 == sizecheck("Bad RTP x-header",
253 RTP_XTHDRLEN - RTP_HDR_SIZE, nrd,
254 log, __func__ )) ) {
255 return -1;
257 if( 0 == nrd ) return nrd;
259 offset += nrd;
260 rc = RTP_hdrlen( data, offset, &hdrlen, log );
261 if( 0 != rc ) {
262 TRACE( (void)tmfprintf( log, "%s: bad RTP header - quitting\n",
263 __func__ ) );
264 return -1;
268 TRACE( (void)tmfprintf( log, "%s: RTP x-header length=[%lu]\n",
269 __func__, (u_long)hdrlen ) );
272 if( (size_t)offset > hdrlen ) {
273 /* read more than needed: step back */
275 where = lseek( fd, (-1)*((size_t)offset - hdrlen), SEEK_CUR );
276 if( -1 == where ) {
277 mperror( log, errno, "%s: lseek", __func__ );
278 return -1;
281 offset -= ((size_t)offset - hdrlen);
282 assert( (size_t)offset == hdrlen );
285 TRACE( (void)tmfprintf( log, "%s: back to fpos=[0x%X], "
286 "offset=[%ld]\n", __func__, (u_int)where, (long)offset ) );
289 else if( hdrlen > (size_t)offset ) {
290 /* read remainder of the header in */
292 assert( !buf_overrun( data, len, offset,
293 (hdrlen - (size_t)offset), log ) );
294 nrd = read_buf( fd, data + offset,
295 (hdrlen - (size_t)offset), log );
296 if( nrd <= 0 ||
297 (-1 == sizecheck("Bad RTP x-header tail",
298 (hdrlen - (size_t)offset), nrd,
299 log, __func__ )) ) {
300 return -1;
302 if( 0 == nrd ) return nrd;
304 offset += nrd;
305 assert( (size_t)offset == hdrlen );
307 } /* read extended header */
310 /* read TS records until there is another RTP header or EOF */
311 for( frg = 0; (ssize_t)len > offset; ++frg ) {
313 rdlen = ( (len - offset) < TS_SEG_LEN
314 ? (len - offset)
315 : TS_SEG_LEN );
318 TRACE( (void)tmfprintf( log, "%s: reading [%lu] more bytes\n",
319 __func__, (u_long)rdlen ) );
322 assert( !buf_overrun( data, len, offset, rdlen, log ) );
323 nrd = read_buf( fd, data + offset, rdlen, log );
324 if( nrd <= 0 ) break;
326 /* if it's an RTP header, roll back and return
328 rtp_end = RTP_verify( data + offset, nrd, log );
329 if( 1 == rtp_end ) {
330 if( -1 == lseek( fd, (-1) * nrd, SEEK_CUR ) ) {
331 mperror( log, errno, "%s: lseek", __func__ );
332 return -1;
335 break;
338 /* check if it is a TS packet and it's of the right size
340 if( (-1 == ts_sigcheck( data[offset], offset, (u_long)TS_SEG_LEN,
341 log, __func__ )) ||
342 (-1 == sizecheck( "Bad TS segment size", TS_SEG_LEN, nrd,
343 log, __func__ )) ) {
344 TRACE( hex_dump( "Data in question", data, offset + nrd, log ) );
345 return -1;
348 offset += nrd;
349 } /* for */
352 /* If it is not EOF and no RTP header for the next message is found,
353 * it is either our buffer is too small (to fit the whole message)
354 * or the stream is invalid
356 if( !rtp_end && (0 != nrd) ) {
357 (void)tmfprintf( log, "%s: no RTP end after reading [%ld] bytes\n",
358 __func__, (long)offset );
359 return -1;
362 return (nrd < 0) ? nrd : offset;
366 /* read record of one of the supported types from file
368 ssize_t
369 read_frecord( int fd, char* data, const size_t len,
370 upxfmt_t* stream_type, FILE* log )
372 upxfmt_t stype;
373 /* off_t where = -1, endmark = -1; */
374 ssize_t nrd = -1;
376 assert( fd > 0 );
377 assert( data && len );
378 assert( stream_type && log );
380 stype = *stream_type;
383 where = lseek( fd, 0, SEEK_CUR );
384 TRACE( (void)tmfprintf( log, "%s: BEGIN reading at pos=[0x%X:%u]\n",
385 __func__, (u_int)where, (u_int)where ) );
388 if( DT_UNKNOWN == *stream_type ) {
389 stype = get_fstream_type( fd, log );
391 if( DT_UNKNOWN == stype ) {
392 (void)tmfprintf( log, "%s: Unsupported type\n", __func__ );
393 return -1;
396 *stream_type = stype;
397 } /* DT_UNKNOWN */
399 if( DT_TS == stype ) {
400 nrd = read_ts_file( fd, data, len, log );
402 else if( DT_RTP_TS == stype ) {
403 nrd = read_rtp_file( fd, data, len, log );
405 else {
406 (void)tmfprintf( log, "%s: unknown stream type [%d]\n",
407 __func__, stype );
408 return -1;
412 if( nrd >= 0 ) {
413 endmark = lseek( fd, 0, SEEK_CUR );
415 TRACE( (void)tmfprintf( log, "%s: END reading [%ld] bytes at pos=[0x%X:%u]\n",
416 __func__, (long)nrd, (u_int)endmark, (u_int)endmark ) );
418 TRACE( sizecheck( "WARNING: Read file discrepancy",
419 where + nrd, endmark,
420 log, __func__ ) );
424 return nrd;
428 /* write data as a UDS record
430 static ssize_t
431 write_uds_record( int fd, const char* data, size_t len, FILE* log )
433 assert( (fd > 0) && data && len );
434 (void)(data && len && fd);
435 (void)tmfprintf( log, "%s: UDS conversion not yet implemented\n",
436 __func__ );
437 return -1;
441 /* write RTP record into TS stream
443 static ssize_t
444 write_rtp2ts( int fd, const char* data, size_t len, FILE* log )
446 void* buf = (void*)data;
447 size_t pldlen = len;
448 const int NO_VERIFY = 0;
449 int rc = 0;
451 assert( (fd > 0) && data && len && log );
453 rc = RTP_process( &buf, &pldlen, NO_VERIFY, log );
454 if( -1 == rc ) return -1;
456 assert( !buf_overrun( buf, len, 0, pldlen, log ) );
457 return write_buf( fd, buf, pldlen, log );
461 /* write record after converting it from source into destination
462 * format
464 ssize_t
465 write_frecord( int fd, const char* data, size_t len,
466 upxfmt_t sfmt, upxfmt_t dfmt, FILE* log )
468 ssize_t nwr = -1;
469 int fmt_ok = 0;
470 const char *str_from = NULL, *str_to = NULL;
472 if( DT_UDS == dfmt ) {
473 fmt_ok = 1;
474 nwr = write_uds_record( fd, data, len, log );
476 else if( DT_TS == dfmt ) {
477 if( DT_RTP_TS == sfmt ) {
478 fmt_ok = 1;
479 nwr = write_rtp2ts( fd, data, len, log );
483 if( !fmt_ok ) {
484 str_from = fmt2str(sfmt);
485 str_to = fmt2str(dfmt);
486 (void)tmfprintf( log, "Conversion from [%s] into [%s] is not supported\n",
487 str_from, str_to );
488 return -1;
491 return nwr;
495 /* reset packet-buffer registry in stream spec
497 void
498 reset_pkt_registry( struct dstream_ctx* ds )
500 assert( ds );
502 ds->flags &= ~F_DROP_PACKET;
503 ds->pkt_count = 0;
507 /* release resources allocated for stream spec
509 void
510 free_dstream_ctx( struct dstream_ctx* ds )
512 assert( ds );
514 if( NULL != ds->pkt ) {
515 free( ds->pkt );
516 ds->pkt = NULL;
518 ds->pkt_count = ds->max_pkt = 0;
524 /* register received packet into registry (for scattered output)
526 static int
527 register_packet( struct dstream_ctx* spc, char* buf, size_t len )
529 struct iovec* new_pkt = NULL;
530 static const int DO_VERIFY = 1;
532 void* new_buf = buf;
533 size_t new_len = len;
535 assert( spc->max_pkt > 0 );
537 /* enlarge packet registry if needed */
538 if( spc->pkt_count >= spc->max_pkt ) {
539 spc->max_pkt <<= 1;
540 spc->pkt = realloc( spc->pkt, spc->max_pkt * sizeof(spc->pkt[0]) );
541 if( NULL == spc->pkt ) {
542 mperror( g_flog, errno, "%s: realloc", __func__ );
543 return -1;
546 TRACE( (void)tmfprintf( g_flog, "RTP packet registry "
547 "expanded to [%lu] records\n", (u_long)spc->max_pkt ) );
550 /* put packet info into registry */
552 new_pkt = &(spc->pkt[ spc->pkt_count ]);
555 TRACE( (void)tmfprintf( stderr, "IN: packet [%lu]: buf=[%p], len=[%lu]\n",
556 (u_long)spc->pkt_count, (void*)buf, (u_long)len ) );
559 if( 0 != RTP_process( &new_buf, &new_len, DO_VERIFY, g_flog ) ) {
560 TRACE( (void)tmfputs("register packet: dropping\n", g_flog) );
561 spc->flags |= F_DROP_PACKET;
563 return 0;
566 new_pkt->iov_base = new_buf;
567 new_pkt->iov_len = new_len;
570 TRACE( (void)tmfprintf( stderr, "OUT: packet [%lu]: buf=[%p], len=[%lu]\n",
571 (u_long)spc->pkt_count, new_pkt->iov_base,
572 (u_long)new_pkt->iov_len ) );
575 spc->pkt_count++;
576 return 0;
580 /* read data from source, determine underlying protocol
581 * (if not already known); and process the packet
582 * if needed (for RTP - register packet)
584 * return the number of octets read from the source
585 * into the buffer
587 static ssize_t
588 read_packet( struct dstream_ctx* spc, int fd, char* buf, size_t len )
590 ssize_t n = -1;
591 size_t chunk_len = len;
593 assert( spc && buf && len );
594 assert( fd > 0 );
596 /* if *RAW* data specified - read AS IS
597 * and exit */
598 if( DT_RAW == spc->stype ) {
599 return read_buf( fd, buf, len, g_flog );
602 /* if it is (or *could* be) RTP, read only MTU bytes
604 if( (spc->stype == DT_RTP_TS) || (spc->flags & F_CHECK_FMT) )
605 chunk_len = (len > spc->mtu) ? spc->mtu : len;
607 if( spc->flags & F_FILE_INPUT ) {
608 assert( !buf_overrun( buf, len, 0, chunk_len, g_flog ) );
609 n = read_frecord( fd, buf, chunk_len, &(spc->stype), g_flog );
610 if( n <= 0 ) return n;
612 else {
613 assert( !buf_overrun(buf, len, 0, chunk_len, g_flog) );
614 n = read_buf( fd, buf, chunk_len, g_flog );
615 if( n <= 0 ) return n;
617 spc->stype = get_mstream_type( buf, n, g_flog );
620 if( spc->flags & F_CHECK_FMT ) {
621 if( DT_RTP_TS == spc->stype ) {
622 /* scattered data (to exclude RTP headers):
623 * use packet registry */
624 spc->flags |= F_SCATTERED;
626 else if( DT_TS == spc->stype ) {
627 spc->flags &= ~F_SCATTERED;
629 else {
630 spc->stype = DT_RAW;
631 TRACE( (void)tmfputs( "Unrecognized stream type\n", g_flog ) );
634 TRACE( (void)tmfprintf( g_flog, "Established stream as [%s]\n",
635 fmt2str( spc->stype ) ) );
637 spc->flags &= ~F_CHECK_FMT;
640 if( spc->flags & F_SCATTERED )
641 if( -1 == register_packet( spc, buf, n ) ) return -1;
643 return n;
647 /* read data from source of specified type (UDP socket or otherwise);
648 * read as many fragments as specified (max_frgs) into the buffer
650 ssize_t
651 read_data( struct dstream_ctx* spc, int fd, char* data,
652 const ssize_t data_len, const struct rdata_opt* opt )
654 int m = 0;
655 ssize_t n = 0, nrcv = -1;
656 time_t start_tm = time(NULL), cur_tm = 0;
657 time_t buftm_sec = 0;
659 assert( spc && (data_len > 0) && opt );
661 /* if max_frgs < 0, read as many packets as can fit in the buffer,
662 * otherwise read no more than max_frgs packets
665 for( m = 0, n = 0; ((opt->max_frgs < 0) ? 1 : (m < opt->max_frgs)); ++m ) {
666 nrcv = read_packet( spc, fd, data + n, data_len - n );
667 if( nrcv <= 0 ) {
668 if( EAGAIN == errno ) {
669 (void)tmfprintf( g_flog,
670 "Receive on socket/file [%d] timed out\n",
671 fd);
674 if( 0 == nrcv ) (void)tmfprintf(g_flog, "%s - EOF\n",__func__);
675 else {
676 mperror(g_flog, errno, "%s: read/recv", __func__);
679 break;
682 if( spc->flags & F_DROP_PACKET ) {
683 spc->flags &= ~F_DROP_PACKET;
684 continue;
687 n += nrcv;
688 if( n >= (data_len - nrcv) ) break;
690 if( -1 != opt->buf_tmout ) {
691 cur_tm = time(NULL);
692 buftm_sec = cur_tm - start_tm;
693 if( buftm_sec >= opt->buf_tmout ) {
694 TRACE( (void)tmfprintf( g_flog, "%s: Buffer timed out "
695 "after [%ld] seconds\n", __func__,
696 (long)buftm_sec ) );
697 break;
700 else {
701 (void) tmfprintf( g_flog, "%s: Skip\n", __func__ );
705 } /* for */
707 if( (nrcv > 0) && !n ) {
708 TRACE( (void)tmfprintf( g_flog, "%s: no data to send "
709 "out of [%d] packets\n", __func__, m ) );
710 return -1;
713 return (nrcv > 0) ? n : -1;
717 /* write data to destination(s)
719 ssize_t
720 write_data( const struct dstream_ctx* spc,
721 const char* data,
722 const ssize_t len,
723 int fd )
725 ssize_t n = 0, error = IO_ERR;
726 int32_t n_count = -1;
728 assert( spc && data && len );
729 if( fd <= 0 ) return 0;
731 if( spc->flags & F_SCATTERED ) {
732 n_count = spc->pkt_count;
733 n = writev( fd, spc->pkt, n_count );
734 if( n <= 0 ) {
735 if( EAGAIN == errno ) {
736 (void)tmfprintf( g_flog, "Write on fd=[%d] timed out\n", fd);
737 error = IO_BLK;
739 mperror( g_flog, errno, "%s: writev", __func__ );
740 return error;
743 else {
744 n = write_buf( fd, data, len, g_flog );
745 if( n < 0 )
746 error = n;
749 return (n > 0) ? n : error;
753 /* initialize incoming-stream context:
754 * set data type (if possible) and flags
757 init_dstream_ctx( struct dstream_ctx* ds, const char* cmd, const char* fname,
758 ssize_t nmsgs )
760 extern const char CMD_UDP[];
761 extern const size_t CMD_UDP_LEN;
762 extern const char CMD_RTP[];
763 extern const size_t CMD_RTP_LEN;
765 assert( ds && cmd && (nmsgs > 0) );
767 ds->flags = 0;
768 ds->pkt = NULL;
769 ds->max_pkt = ds->pkt_count = 0;
770 ds->mtu = ETHERNET_MTU;
772 if( NULL != fname ) {
773 ds->stype = DT_UNKNOWN;
774 ds->flags |= (F_CHECK_FMT | F_FILE_INPUT);
775 TRACE( (void)tmfputs( "File stream, RTP check enabled\n", g_flog ) );
777 else if( 0 == strncmp( cmd, CMD_UDP, CMD_UDP_LEN ) ) {
778 ds->stype = DT_UNKNOWN;
779 ds->flags |= F_CHECK_FMT;
780 TRACE( (void)tmfputs( "UDP stream, RTP check enabled\n", g_flog ) );
782 else if( 0 == strncmp( cmd, CMD_RTP, CMD_RTP_LEN ) ) {
783 ds->stype = DT_RTP_TS;
784 ds->flags |= F_SCATTERED;
785 TRACE( (void)tmfputs( "RTP (over UDP) stream assumed,"
786 " no checks\n", g_flog ) );
788 else {
789 TRACE( (void)tmfprintf( g_flog, "%s: "
790 "Irrelevant command [%s]\n", __func__, cmd) );
791 return -1;
794 ds->pkt = calloc( nmsgs, sizeof(ds->pkt[0]) );
795 if( NULL == ds->pkt ) {
796 mperror( g_flog, errno, "%s: calloc", __func__ );
797 return -1;
800 ds->max_pkt = nmsgs;
801 return 0;
806 /* __EOF__ */