1 /*****************************************************************************
2 * rist.c: RIST (Reliable Internet Stream Transport) input module
3 *****************************************************************************
4 * Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
5 * Copyright (C) 2018, SipRadius LLC
7 * Authors: Sergio Ammirata <sergio@ammirata.net>
8 * Daniele Lacamera <root@danielinux.net>
10 * This program is free software; you can redistribute it and/or modify it
11 * under the terms of the GNU Lesser General Public License as published by
12 * the Free Software Foundation; either version 2.1 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Lesser General Public License for more details.
20 * You should have received a copy of the GNU Lesser General Public License
21 * along with this program; if not, write to the Free Software Foundation,
22 * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
23 *****************************************************************************/
29 #include <vlc_common.h>
30 #include <vlc_interrupt.h>
31 #include <vlc_plugin.h>
32 #include <vlc_access.h>
33 #include <vlc_queue.h>
34 #include <vlc_threads.h>
35 #include <vlc_network.h>
36 #include <vlc_block.h>
41 #include <bitstream/ietf/rtcp_rr.h>
42 #include <bitstream/ietf/rtcp_sdes.h>
43 #include <bitstream/ietf/rtcp_fb.h>
44 #include <bitstream/ietf/rtp.h>
48 /* The default latency is 1000 ms */
49 #define RIST_DEFAULT_LATENCY 1000
50 /* The default nack retry interval */
51 #define RIST_DEFAULT_RETRY_INTERVAL 132
52 /* The default packet re-ordering buffer */
53 #define RIST_DEFAULT_REORDER_BUFFER 70
54 /* The default max packet size */
55 #define RIST_MAX_PACKET_SIZE 1472
56 /* The default timeout is 5 ms */
57 #define RIST_DEFAULT_POLL_TIMEOUT 5
58 /* The max retry count for nacks */
59 #define RIST_MAX_RETRIES 10
60 /* The rate at which we process and send nack requests */
61 #define NACK_INTERVAL 5 /*ms*/
62 /* Calculate and print stats once per second */
63 #define STATS_INTERVAL 1000 /*ms*/
65 static const int nack_type
[] = {
69 static const char *const nack_type_names
[] = {
70 N_("Range"), N_("Bitmask"),
80 struct rist_flow
*flow
;
81 char sender_name
[MAX_CNAME
];
82 enum NACK_TYPE nack_type
;
83 uint64_t last_data_rx
;
84 uint64_t last_nack_tx
;
86 int i_max_packet_size
;
88 int i_poll_timeout_current
;
91 bool b_sendblindnacks
;
93 bool b_flag_discontinuity
;
97 uint64_t last_message
;
100 uint32_t i_poll_timeout_zero_count
;
101 uint32_t i_poll_timeout_nonzero_count
;
102 uint64_t i_last_stat
;
104 uint16_t vbr_ratio_count
;
105 uint32_t i_lost_packets
;
106 uint32_t i_nack_packets
;
107 uint32_t i_recovered_packets
;
108 uint32_t i_reordered_packets
;
109 uint32_t i_total_packets
;
112 static int Control(stream_t
*p_access
, int i_query
, va_list args
)
116 case STREAM_CAN_SEEK
:
117 case STREAM_CAN_FASTSEEK
:
118 case STREAM_CAN_PAUSE
:
119 case STREAM_CAN_CONTROL_PACE
:
120 *va_arg( args
, bool * ) = false;
123 case STREAM_GET_PTS_DELAY
:
124 *va_arg( args
, vlc_tick_t
* ) = VLC_TICK_FROM_MS(
125 var_InheritInteger(p_access
, "network-caching") );
135 static struct rist_flow
*rist_init_rx(void)
137 struct rist_flow
*flow
= calloc(1, sizeof(struct rist_flow
));
142 flow
->buffer
= calloc(RIST_QUEUE_SIZE
, sizeof(struct rtp_pkt
));
144 if ( unlikely( flow
->buffer
== NULL
) )
151 flow
->fd_rtcp_m
= -1;
156 static void rist_WriteTo_i11e_Locked(vlc_mutex_t lock
, int fd
, const void *buf
, size_t len
,
157 const struct sockaddr
*peer
, socklen_t slen
)
159 vlc_mutex_lock( &lock
);
160 rist_WriteTo_i11e(fd
, buf
, len
, peer
, slen
);
161 vlc_mutex_unlock( &lock
);
164 static struct rist_flow
*rist_udp_receiver(stream_t
*p_access
, vlc_url_t
*parsed_url
, bool b_ismulticast
)
166 stream_sys_t
*p_sys
= p_access
->p_sys
;
167 msg_Info( p_access
, "Opening Rist Flow Receiver at %s:%d and %s:%d",
168 parsed_url
->psz_host
, parsed_url
->i_port
,
169 parsed_url
->psz_host
, parsed_url
->i_port
+1);
171 p_sys
->flow
= rist_init_rx();
175 p_sys
->flow
->fd_in
= net_OpenDgram(p_access
, parsed_url
->psz_host
, parsed_url
->i_port
, NULL
,
177 if (p_sys
->flow
->fd_in
< 0)
179 msg_Err( p_access
, "cannot open input socket" );
185 p_sys
->flow
->fd_rtcp_m
= net_OpenDgram(p_access
, parsed_url
->psz_host
, parsed_url
->i_port
+ 1,
186 NULL
, 0, IPPROTO_UDP
);
187 if (p_sys
->flow
->fd_rtcp_m
< 0)
189 msg_Err( p_access
, "cannot open multicast nack socket" );
192 p_sys
->flow
->fd_nack
= net_ConnectDgram(p_access
, parsed_url
->psz_host
,
193 parsed_url
->i_port
+ 1, -1, IPPROTO_UDP
);
197 p_sys
->flow
->fd_nack
= net_OpenDgram(p_access
, parsed_url
->psz_host
, parsed_url
->i_port
+ 1,
198 NULL
, 0, IPPROTO_UDP
);
200 if (p_sys
->flow
->fd_nack
< 0)
202 msg_Err( p_access
, "cannot open nack socket" );
206 populate_cname(p_sys
->flow
->fd_nack
, p_sys
->flow
->cname
);
207 msg_Info(p_access
, "our cname is %s", p_sys
->flow
->cname
);
212 if (p_sys
->flow
->fd_in
!= -1)
213 vlc_close(p_sys
->flow
->fd_in
);
214 if (p_sys
->flow
->fd_nack
!= -1)
215 vlc_close(p_sys
->flow
->fd_nack
);
216 if (p_sys
->flow
->fd_rtcp_m
!= -1)
217 vlc_close(p_sys
->flow
->fd_rtcp_m
);
218 free(p_sys
->flow
->buffer
);
223 static int is_index_in_range(struct rist_flow
*flow
, uint16_t idx
)
225 if (flow
->ri
<= flow
->wi
) {
226 return ((idx
> flow
->ri
) && (idx
<= flow
->wi
));
228 return ((idx
> flow
->ri
) || (idx
<= flow
->wi
));
232 static void send_rtcp_feedback(stream_t
*p_access
, struct rist_flow
*flow
)
234 stream_sys_t
*p_sys
= p_access
->p_sys
;
235 int namelen
= strlen(flow
->cname
) + 1;
237 /* we need to make sure it is a multiple of 4, pad if necessary */
238 if ((namelen
- 2) & 0x3)
239 namelen
= ((((namelen
- 2) >> 2) + 1) << 2) + 2;
241 int rtcp_feedback_size
= RTCP_EMPTY_RR_SIZE
+ RTCP_SDES_SIZE
+ namelen
;
242 uint8_t *buf
= malloc(rtcp_feedback_size
);
243 if ( unlikely( buf
== NULL
) )
250 rtcp_set_length(rr
, 1);
251 rtcp_fb_set_int_ssrc_pkt_sender(rr
, 0);
254 uint8_t *p_sdes
= (buf
+ RTCP_EMPTY_RR_SIZE
);
256 rtp_set_cc(p_sdes
, 1); /* Actually it is source count in this case */
257 rtcp_sdes_set_pt(p_sdes
);
258 rtcp_set_length(p_sdes
, (namelen
>> 2) + 2);
259 rtcp_sdes_set_cname(p_sdes
, 1);
260 rtcp_sdes_set_name_length(p_sdes
, strlen(flow
->cname
));
261 uint8_t *p_sdes_name
= (buf
+ RTCP_EMPTY_RR_SIZE
+ RTCP_SDES_SIZE
);
262 strlcpy((char *)p_sdes_name
, flow
->cname
, namelen
);
264 /* Write to Socket */
265 rist_WriteTo_i11e_Locked(p_sys
->lock
, flow
->fd_nack
, buf
, rtcp_feedback_size
,
266 (struct sockaddr
*)&flow
->peer_sockaddr
, flow
->peer_socklen
);
271 static void send_bbnack(stream_t
*p_access
, int fd_nack
, block_t
*pkt_nacks
, uint16_t nack_count
)
273 stream_sys_t
*p_sys
= p_access
->p_sys
;
274 struct rist_flow
*flow
= p_sys
->flow
;
277 int bbnack_bufsize
= RTCP_FB_HEADER_SIZE
+
278 RTCP_FB_FCI_GENERIC_NACK_SIZE
* nack_count
;
279 uint8_t *buf
= malloc(bbnack_bufsize
);
280 if ( unlikely( buf
== NULL
) )
286 rtcp_fb_set_fmt(nack
, NACK_FMT_BITMASK
);
287 rtcp_set_pt(nack
, RTCP_PT_RTPFB
);
288 rtcp_set_length(nack
, 2 + nack_count
);
289 /*uint8_t name[4] = "RIST";*/
290 /*rtcp_fb_set_ssrc_media_src(nack, name);*/
291 len
+= RTCP_FB_HEADER_SIZE
;
292 /* TODO : group together */
293 uint16_t nacks
[MAX_NACKS
];
294 memcpy(nacks
, pkt_nacks
->p_buffer
, pkt_nacks
->i_buffer
);
295 for (int i
= 0; i
< nack_count
; i
++) {
296 uint8_t *nack_record
= buf
+ len
+ RTCP_FB_FCI_GENERIC_NACK_SIZE
*i
;
297 rtcp_fb_nack_set_packet_id(nack_record
, nacks
[i
]);
298 rtcp_fb_nack_set_bitmask_lost(nack_record
, 0);
300 len
+= RTCP_FB_FCI_GENERIC_NACK_SIZE
* nack_count
;
302 /* Write to Socket */
303 if (p_sys
->b_sendnacks
&& p_sys
->b_disablenacks
== false)
304 rist_WriteTo_i11e_Locked(p_sys
->lock
, fd_nack
, buf
, len
,
305 (struct sockaddr
*)&flow
->peer_sockaddr
, flow
->peer_socklen
);
310 static void send_rbnack(stream_t
*p_access
, int fd_nack
, block_t
*pkt_nacks
, uint16_t nack_count
)
312 stream_sys_t
*p_sys
= p_access
->p_sys
;
313 struct rist_flow
*flow
= p_sys
->flow
;
316 int rbnack_bufsize
= RTCP_FB_HEADER_SIZE
+
317 RTCP_FB_FCI_GENERIC_NACK_SIZE
* nack_count
;
318 uint8_t *buf
= malloc(rbnack_bufsize
);
319 if ( unlikely( buf
== NULL
) )
325 rtcp_fb_set_fmt(nack
, NACK_FMT_RANGE
);
326 rtcp_set_pt(nack
, RTCP_PT_RTPFR
);
327 rtcp_set_length(nack
, 2 + nack_count
);
328 uint8_t name
[4] = "RIST";
329 rtcp_fb_set_ssrc_media_src(nack
, name
);
330 len
+= RTCP_FB_HEADER_SIZE
;
331 /* TODO : group together */
332 uint16_t nacks
[MAX_NACKS
];
333 memcpy(nacks
, pkt_nacks
->p_buffer
, pkt_nacks
->i_buffer
);
334 for (int i
= 0; i
< nack_count
; i
++)
336 uint8_t *nack_record
= buf
+ len
+ RTCP_FB_FCI_GENERIC_NACK_SIZE
*i
;
337 rtcp_fb_nack_set_range_start(nack_record
, nacks
[i
]);
338 rtcp_fb_nack_set_range_extra(nack_record
, 0);
340 len
+= RTCP_FB_FCI_GENERIC_NACK_SIZE
* nack_count
;
342 /* Write to Socket */
343 if (p_sys
->b_sendnacks
&& p_sys
->b_disablenacks
== false)
344 rist_WriteTo_i11e_Locked(p_sys
->lock
, fd_nack
, buf
, len
,
345 (struct sockaddr
*)&flow
->peer_sockaddr
, flow
->peer_socklen
);
350 static void send_nacks(stream_t
*p_access
, struct rist_flow
*flow
)
352 stream_sys_t
*p_sys
= p_access
->p_sys
;
355 uint64_t last_ts
= 0;
356 uint16_t null_count
= 0;
358 uint16_t nacks
[MAX_NACKS
];
361 while(idx
++ != flow
->wi
)
363 pkt
= &(flow
->buffer
[idx
]);
364 if (pkt
->buffer
== NULL
)
366 if (nacks_len
+ 1 >= MAX_NACKS
)
373 /* TODO: after adding average spacing calculation, change this formula
374 to extrapolated_ts = last_ts + null_count * avg_delta_ts; */
375 uint64_t extrapolated_ts
= last_ts
;
376 /* Find out the age and add it only if necessary */
377 int retry_count
= flow
->nacks_retries
[idx
];
378 uint64_t age
= flow
->hi_timestamp
- extrapolated_ts
;
380 if (retry_count
== 0){
381 expiration
= flow
->reorder_buffer
;
383 expiration
= (uint64_t)flow
->nacks_retries
[idx
] * (uint64_t)flow
->retry_interval
;
385 if (age
> expiration
&& retry_count
<= flow
->max_retries
)
387 flow
->nacks_retries
[idx
]++;
388 nacks
[nacks_len
++] = idx
;
389 msg_Dbg(p_access
, "Sending NACK for seq %d, age %"PRId64
" ms, retry %u, " \
390 "expiration %"PRId64
" ms", idx
, ts_get_from_rtp(age
)/1000,
391 flow
->nacks_retries
[idx
], ts_get_from_rtp(expiration
)/1000);
397 last_ts
= pkt
->rtp_ts
;
403 p_sys
->i_nack_packets
+= nacks_len
;
404 block_t
*pkt_nacks
= block_Alloc(nacks_len
* 2);
407 memcpy(pkt_nacks
->p_buffer
, nacks
, nacks_len
* 2);
408 pkt_nacks
->i_buffer
= nacks_len
* 2;
409 vlc_queue_Enqueue(&p_sys
->queue
, pkt_nacks
);
414 static int sockaddr_cmp(struct sockaddr
*x
, struct sockaddr
*y
)
416 #define CMP(a, b) if (a != b) return a < b ? -1 : 1
418 CMP(x
->sa_family
, y
->sa_family
);
420 if (x
->sa_family
== AF_INET
)
422 struct sockaddr_in
*xin
= (void*)x
, *yin
= (void*)y
;
423 CMP(ntohl(xin
->sin_addr
.s_addr
), ntohl(yin
->sin_addr
.s_addr
));
424 CMP(ntohs(xin
->sin_port
), ntohs(yin
->sin_port
));
426 else if (x
->sa_family
== AF_INET6
)
428 struct sockaddr_in6
*xin6
= (void*)x
, *yin6
= (void*)y
;
429 int r
= memcmp(xin6
->sin6_addr
.s6_addr
, yin6
->sin6_addr
.s6_addr
,
430 sizeof(xin6
->sin6_addr
.s6_addr
));
433 CMP(ntohs(xin6
->sin6_port
), ntohs(yin6
->sin6_port
));
434 CMP(xin6
->sin6_flowinfo
, yin6
->sin6_flowinfo
);
435 CMP(xin6
->sin6_scope_id
, yin6
->sin6_scope_id
);
442 static void print_sockaddr_info_change(stream_t
*p_access
, struct sockaddr
*x
, struct sockaddr
*y
)
444 if (x
->sa_family
== AF_INET
)
446 struct sockaddr_in
*xin
= (void*)x
, *yin
= (void*)y
;
447 msg_Info(p_access
, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
448 inet_ntoa(xin
->sin_addr
), ntohs(xin
->sin_port
), inet_ntoa(yin
->sin_addr
),
449 ntohs(yin
->sin_port
));
451 else if (x
->sa_family
== AF_INET6
)
453 struct sockaddr_in6
*xin6
= (void*)x
, *yin6
= (void*)y
;
454 char oldstr
[INET6_ADDRSTRLEN
];
455 char newstr
[INET6_ADDRSTRLEN
];
456 inet_ntop(xin6
->sin6_family
, &xin6
->sin6_addr
, oldstr
, sizeof(struct in6_addr
));
457 inet_ntop(yin6
->sin6_family
, &yin6
->sin6_addr
, newstr
, sizeof(struct in6_addr
));
458 msg_Info(p_access
, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
459 oldstr
, ntohs(xin6
->sin6_port
), newstr
, ntohs(yin6
->sin6_port
));
463 static void print_sockaddr_info(stream_t
*p_access
, struct sockaddr
*x
)
465 if (x
->sa_family
== AF_INET
)
467 struct sockaddr_in
*xin
= (void*)x
;
468 msg_Info(p_access
, "Peer IP:Port %s:%d", inet_ntoa(xin
->sin_addr
), ntohs(xin
->sin_port
));
470 else if (x
->sa_family
== AF_INET6
)
472 struct sockaddr_in6
*xin6
= (void*)x
;
473 char str
[INET6_ADDRSTRLEN
];
474 inet_ntop(xin6
->sin6_family
, &xin6
->sin6_addr
, str
, sizeof(struct in6_addr
));
475 msg_Info(p_access
, "Peer IP:Port %s:%d", str
, ntohs(xin6
->sin6_port
));
479 static void rtcp_input(stream_t
*p_access
, struct rist_flow
*flow
, uint8_t *buf_in
, size_t len
,
480 struct sockaddr
*peer
, socklen_t slen
)
482 stream_sys_t
*p_sys
= p_access
->p_sys
;
484 uint16_t processed_bytes
= 0;
486 char new_sender_name
[MAX_CNAME
];
489 while (processed_bytes
< len
) {
490 buf
= buf_in
+ processed_bytes
;
492 uint16_t bytes_left
= len
- processed_bytes
+ 1;
493 if ( bytes_left
< 4 )
495 /* we must have at least 4 bytes */
496 msg_Err(p_access
, "Rist rtcp packet must have at least 4 bytes, we have %d",
500 else if (!rtp_check_hdr(buf
))
502 /* check for a valid rtp header */
503 msg_Err(p_access
, "Malformed rtcp packet starting with %02x, ignoring.", buf
[0]);
507 ptype
= rtcp_get_pt(buf
);
508 records
= rtcp_get_length(buf
);
509 uint16_t bytes
= (uint16_t)(4 * (1 + records
));
510 if (bytes
> bytes_left
)
512 /* check for a sane number of bytes */
513 msg_Err(p_access
, "Malformed rtcp packet, wrong len %d, expecting %u bytes in the " \
514 "packet, got a buffer of %u bytes.", rtcp_get_length(buf
), bytes
, bytes_left
);
528 if (p_sys
->b_sendnacks
== false)
529 p_sys
->b_sendnacks
= true;
530 if (p_sys
->b_ismulticast
)
532 /* Check for changes in source IP address or port */
533 int8_t name_length
= rtcp_sdes_get_name_length(buf
);
534 if (name_length
> bytes_left
|| name_length
<= 0 ||
535 (size_t)name_length
> sizeof(new_sender_name
))
537 /* check for a sane number of bytes */
538 msg_Err(p_access
, "Malformed SDES packet, wrong cname len %d, got a " \
539 "buffer of %u bytes.", name_length
, bytes_left
);
542 bool ip_port_changed
= false;
543 if (sockaddr_cmp((struct sockaddr
*)&flow
->peer_sockaddr
, peer
) != 0)
545 ip_port_changed
= true;
546 if(flow
->peer_socklen
> 0)
547 print_sockaddr_info_change(p_access
,
548 (struct sockaddr
*)&flow
->peer_sockaddr
, peer
);
550 print_sockaddr_info(p_access
, peer
);
551 vlc_mutex_lock( &p_sys
->lock
);
552 memcpy(&flow
->peer_sockaddr
, peer
, sizeof(struct sockaddr_storage
));
553 flow
->peer_socklen
= slen
;
554 vlc_mutex_unlock( &p_sys
->lock
);
557 /* Check for changes in cname */
558 bool peer_name_changed
= false;
559 memset(new_sender_name
, 0, MAX_CNAME
);
560 memcpy(new_sender_name
, buf
+ RTCP_SDES_SIZE
, name_length
);
561 if (memcmp(new_sender_name
, p_sys
->sender_name
, name_length
) != 0)
563 peer_name_changed
= true;
564 if (strcmp(p_sys
->sender_name
, "") == 0)
565 msg_Info(p_access
, "Peer Name: %s", new_sender_name
);
567 msg_Info(p_access
, "Peer Name change detected: old Name: %s, new " \
568 "Name: %s", p_sys
->sender_name
, new_sender_name
);
569 memset(p_sys
->sender_name
, 0, MAX_CNAME
);
570 memcpy(p_sys
->sender_name
, buf
+ RTCP_SDES_SIZE
, name_length
);
573 /* Reset the buffer as the source must have been restarted */
574 if (peer_name_changed
|| ip_port_changed
)
576 /* reset the buffer */
583 if (p_sys
->b_sendnacks
== false)
584 p_sys
->b_sendnacks
= true;
585 if (p_sys
->b_ismulticast
)
590 msg_Err(p_access
, " Unrecognized RTCP packet with PTYPE=%02x!!", ptype
);
592 processed_bytes
+= (4 * (1 + records
));
596 static bool rist_input(stream_t
*p_access
, struct rist_flow
*flow
, uint8_t *buf
, size_t len
)
598 stream_sys_t
*p_sys
= p_access
->p_sys
;
601 if ( len
< RTP_HEADER_SIZE
)
603 /* check if packet size >= rtp header size */
604 msg_Err(p_access
, "Rist rtp packet must have at least 12 bytes, we have %zu", len
);
607 else if (!rtp_check_hdr(buf
))
609 /* check for a valid rtp header */
610 msg_Err(p_access
, "Malformed rtp packet header starting with %02x, ignoring.", buf
[0]);
614 uint16_t idx
= rtp_get_seqnum(buf
);
615 uint32_t pkt_ts
= rtp_get_timestamp(buf
);
616 bool retrasnmitted
= false;
619 if (flow
->reset
== 1)
621 msg_Info(p_access
, "Traffic detected after buffer reset");
622 /* First packet in the queue */
623 flow
->hi_timestamp
= pkt_ts
;
624 msg_Info(p_access
, "ts@%u", flow
->hi_timestamp
);
628 p_sys
->b_flag_discontinuity
= true;
631 /* Check to see if this is a retransmission or a regular packet */
632 if (buf
[11] & (1 << 0))
634 msg_Dbg(p_access
, "Packet %d RECOVERED, Window: [%d:%d-->%d]", idx
, flow
->ri
, flow
->wi
,
636 p_sys
->i_recovered_packets
++;
637 retrasnmitted
= true;
639 else if (flow
->wi
!= flow
->ri
)
641 /* Reset counter to 0 on incoming holes */
642 /* Regular packets only as retransmits are expected to come in out of order */
643 uint16_t idxnext
= (uint16_t)(flow
->wi
+ 1);
647 msg_Dbg(p_access
, "Gap, got %d, expected %d, %d packet gap, Window: [%d:%d-->%d]",
648 idx
, idxnext
, idx
- idxnext
, flow
->ri
, flow
->wi
, (uint16_t)(flow
->wi
-flow
->ri
));
650 p_sys
->i_reordered_packets
++;
651 msg_Dbg(p_access
, "Out of order, got %d, expected %d, Window: [%d:%d-->%d]", idx
,
652 idxnext
, flow
->ri
, flow
->wi
, (uint16_t)(flow
->wi
-flow
->ri
));
654 uint16_t zero_counter
= (uint16_t)(flow
->wi
+ 1);
655 while(zero_counter
++ != idx
) {
656 flow
->nacks_retries
[zero_counter
] = 0;
658 /*msg_Dbg(p_access, "Gap, reseting %d packets as zero nacks %d to %d",
659 idx - flow->wi - 1, (uint16_t)(flow->wi + 1), idx);*/
663 /* Always replace the existing one with the new one */
665 pkt
= &(flow
->buffer
[idx
]);
666 if (pkt
->buffer
&& pkt
->buffer
->i_buffer
> 0)
668 block_Release(pkt
->buffer
);
671 pkt
->buffer
= block_Alloc(len
);
675 pkt
->buffer
->i_buffer
= len
;
676 memcpy(pkt
->buffer
->p_buffer
, buf
, len
);
677 pkt
->rtp_ts
= pkt_ts
;
678 p_sys
->last_data_rx
= vlc_tick_now();
679 /* Reset the try counter regardless of wether it was a retransmit or not */
680 flow
->nacks_retries
[idx
] = 0;
685 p_sys
->i_total_packets
++;
686 /* Perform discontinuity checks and udpdate counters */
687 if (!is_index_in_range(flow
, idx
) && pkt_ts
>= flow
->hi_timestamp
)
689 if ((pkt_ts
- flow
->hi_timestamp
) > flow
->hi_timestamp
/10)
691 msg_Info(p_access
, "Forward stream discontinuity idx@%d/%d/%d ts@%u/%u", flow
->ri
, idx
,
692 flow
->wi
, pkt_ts
, flow
->hi_timestamp
);
699 flow
->hi_timestamp
= pkt_ts
;
702 else if (!is_index_in_range(flow
, idx
))
704 /* incoming timestamp just jumped back in time or index is outside of scope */
705 msg_Info(p_access
, "Backwards stream discontinuity idx@%d/%d/%d ts@%u/%u", flow
->ri
, idx
,
706 flow
->wi
, pkt_ts
, flow
->hi_timestamp
);
714 static block_t
*rist_dequeue(stream_t
*p_access
, struct rist_flow
*flow
)
716 stream_sys_t
*p_sys
= p_access
->p_sys
;
717 block_t
*pktout
= NULL
;
720 if (flow
->ri
== flow
->wi
|| flow
->reset
> 0)
724 bool found_data
= false;
725 uint16_t loss_amount
= 0;
726 while(idx
++ != flow
->wi
) {
728 pkt
= &(flow
->buffer
[idx
]);
731 /*msg_Info(p_access, "Possible packet loss on index #%d", idx);*/
733 /* We move ahead until we find a timestamp but we do not move the cursor.
734 * None of them are guaranteed packet loss because we do not really
735 * know their timestamps. They might still arrive on the next loop.
736 * We can confirm the loss only if we get a valid packet in the loop below. */
740 /*printf("IDX=%d, flow->hi_timestamp: %u, (ts + flow->rtp_latency): %u\n", idx,
741 flow->hi_timestamp, (ts - 100 * flow->qdelay));*/
742 if (flow
->hi_timestamp
> (uint32_t)(pkt
->rtp_ts
+ flow
->rtp_latency
))
744 /* Populate output packet now but remove rtp header from source */
745 int newSize
= pkt
->buffer
->i_buffer
- RTP_HEADER_SIZE
;
746 pktout
= block_Alloc(newSize
);
749 pktout
->i_buffer
= newSize
;
750 memcpy(pktout
->p_buffer
, pkt
->buffer
->p_buffer
+ RTP_HEADER_SIZE
, newSize
);
751 /* free the buffer and increase the read index */
753 /* TODO: calculate average duration using buffer average (bring from sender) */
756 block_Release(pkt
->buffer
);
763 if (loss_amount
> 0 && found_data
== true)
765 /* Packet loss confirmed, we found valid data after the holes */
766 msg_Dbg(p_access
, "Packet NOT RECOVERED, %d packet(s), Window: [%d:%d]", loss_amount
,
768 p_sys
->i_lost_packets
+= loss_amount
;
769 p_sys
->b_flag_discontinuity
= true;
775 static void *rist_thread(void *data
)
777 stream_t
*p_access
= data
;
778 stream_sys_t
*p_sys
= p_access
->p_sys
;
781 /* Process nacks every 5ms */
782 /* We only ask for the relevant ones */
783 while ((pkt_nacks
= vlc_queue_DequeueKillable(&p_sys
->queue
,
784 &p_sys
->dead
)) != NULL
) {
785 /* there are two bytes per nack */
786 uint16_t nack_count
= (uint16_t)pkt_nacks
->i_buffer
/2;
787 switch(p_sys
->nack_type
) {
788 case NACK_FMT_BITMASK
:
789 send_bbnack(p_access
, p_sys
->flow
->fd_nack
, pkt_nacks
, nack_count
);
793 send_rbnack(p_access
, p_sys
->flow
->fd_nack
, pkt_nacks
, nack_count
);
797 msg_Dbg(p_access
, "Sent %u NACKs !!!", nack_count
);
798 block_Release(pkt_nacks
);
804 static block_t
*BlockRIST(stream_t
*p_access
, bool *restrict eof
)
806 stream_sys_t
*p_sys
= p_access
->p_sys
;
809 block_t
*pktout
= NULL
;
810 struct pollfd pfd
[3];
813 struct sockaddr_storage peer
;
814 socklen_t slen
= sizeof(struct sockaddr_storage
);
815 struct rist_flow
*flow
= p_sys
->flow
;
823 int poll_sockets
= 2;
824 pfd
[0].fd
= flow
->fd_in
;
825 pfd
[0].events
= POLLIN
;
826 pfd
[1].fd
= flow
->fd_nack
;
827 pfd
[1].events
= POLLIN
;
828 if (p_sys
->b_ismulticast
)
830 pfd
[2].fd
= flow
->fd_rtcp_m
;
831 pfd
[2].events
= POLLIN
;
835 /* The protocol uses a fifo buffer with a fixed time delay.
836 * That buffer needs to be emptied at a rate that is determined by the rtp timestamps of the
837 * packets. If I waited indefinitely for data coming in, the rate and delay of output packets
838 * would be wrong. I am calling the rist_dequeue function every time a data packet comes in
839 * and also every time we get a poll timeout. The configurable poll timeout is for controling
840 * the maximum jitter of output data coming out of the buffer. The default 5ms timeout covers
843 ret
= vlc_poll_i11e(pfd
, poll_sockets
, p_sys
->i_poll_timeout_current
);
844 if (unlikely(ret
< 0))
848 /* Poll timeout, check the queue for the next packet that needs to be delivered */
849 pktout
= rist_dequeue(p_access
, flow
);
850 /* if there is data, we need to come back faster to finish emptying it */
852 p_sys
->i_poll_timeout_current
= 0;
853 p_sys
->i_poll_timeout_zero_count
++;
855 p_sys
->i_poll_timeout_current
= p_sys
->i_poll_timeout
;
856 p_sys
->i_poll_timeout_nonzero_count
++;
862 uint8_t *buf
= malloc(p_sys
->i_max_packet_size
);
863 if ( unlikely( buf
== NULL
) )
866 /* Process rctp incoming data */
867 if (pfd
[1].revents
& POLLIN
)
869 r
= rist_ReadFrom_i11e(flow
->fd_nack
, buf
, p_sys
->i_max_packet_size
,
870 (struct sockaddr
*)&peer
, &slen
);
871 if (unlikely(r
== -1)) {
872 msg_Err(p_access
, "socket %d error: %s\n", flow
->fd_nack
, gai_strerror(errno
));
875 if (p_sys
->b_ismulticast
== false)
876 rtcp_input(p_access
, flow
, buf
, r
, (struct sockaddr
*)&peer
, slen
);
879 if (p_sys
->b_ismulticast
&& pfd
[2].revents
& POLLIN
)
881 r
= rist_ReadFrom_i11e(flow
->fd_rtcp_m
, buf
, p_sys
->i_max_packet_size
,
882 (struct sockaddr
*)&peer
, &slen
);
883 if (unlikely(r
== -1)) {
884 msg_Err(p_access
, "mcast socket %d error: %s\n",flow
->fd_rtcp_m
, gai_strerror(errno
));
887 rtcp_input(p_access
, flow
, buf
, r
, (struct sockaddr
*)&peer
, slen
);
891 /* Process regular incoming data */
892 if (pfd
[0].revents
& POLLIN
)
894 r
= rist_Read_i11e(flow
->fd_in
, buf
, p_sys
->i_max_packet_size
);
895 if (unlikely(r
== -1)) {
896 msg_Err(p_access
, "socket %d error: %s\n", flow
->fd_in
, gai_strerror(errno
));
900 /* rist_input will process and queue the pkt */
901 if (rist_input(p_access
, flow
, buf
, r
))
903 /* Check the queue for the next packet that needs to be delivered */
904 pktout
= rist_dequeue(p_access
, flow
);
906 p_sys
->i_poll_timeout_current
= 0;
907 p_sys
->i_poll_timeout_zero_count
++;
909 p_sys
->i_poll_timeout_current
= p_sys
->i_poll_timeout
;
910 p_sys
->i_poll_timeout_nonzero_count
++;
920 now
= vlc_tick_now();
922 /* Process stats and print them out */
923 /* We need to measure some items every 70ms */
924 uint64_t interval
= (now
- flow
->feedback_time
);
925 if ( interval
> VLC_TICK_FROM_MS(RTCP_INTERVAL
) )
927 if (p_sys
->i_poll_timeout_nonzero_count
> 0)
929 float ratio
= (float)p_sys
->i_poll_timeout_zero_count
930 / (float)p_sys
->i_poll_timeout_nonzero_count
;
932 p_sys
->vbr_ratio
+= 1 - ratio
;
934 p_sys
->vbr_ratio
+= ratio
- 1;
935 p_sys
->vbr_ratio_count
++;
936 /*msg_Dbg(p_access, "zero poll %u, non-zero poll %u, ratio %.2f",
937 p_sys->i_poll_timeout_zero_count, p_sys->i_poll_timeout_nonzero_count, ratio);*/
938 p_sys
->i_poll_timeout_zero_count
= 0;
939 p_sys
->i_poll_timeout_nonzero_count
= 0;
942 /* We print out the stats once per second */
943 interval
= (now
- p_sys
->i_last_stat
);
944 if ( interval
> VLC_TICK_FROM_MS(STATS_INTERVAL
) )
946 if ( p_sys
->i_lost_packets
> 0)
947 msg_Err(p_access
, "We have %d lost packets", p_sys
->i_lost_packets
);
949 if (p_sys
->vbr_ratio_count
> 0)
950 ratio
= p_sys
->vbr_ratio
/ (float)p_sys
->vbr_ratio_count
;
952 if (p_sys
->i_total_packets
> 0)
953 quality
-= (float)100*(float)(p_sys
->i_lost_packets
+ p_sys
->i_recovered_packets
+
954 p_sys
->i_reordered_packets
)/(float)p_sys
->i_total_packets
;
956 msg_Info(p_access
, "STATS: Total %u, Recovered %u/%u, Reordered %u, Lost %u, VBR " \
957 "Score %.2f, Link Quality %.2f%%", p_sys
->i_total_packets
,
958 p_sys
->i_recovered_packets
, p_sys
->i_nack_packets
, p_sys
->i_reordered_packets
,
959 p_sys
->i_lost_packets
, ratio
, quality
);
960 p_sys
->i_last_stat
= now
;
961 p_sys
->vbr_ratio
= 0;
962 p_sys
->vbr_ratio_count
= 0;
963 p_sys
->i_lost_packets
= 0;
964 p_sys
->i_nack_packets
= 0;
965 p_sys
->i_recovered_packets
= 0;
966 p_sys
->i_reordered_packets
= 0;
967 p_sys
->i_total_packets
= 0;
970 /* Send rtcp feedback every RTCP_INTERVAL */
971 interval
= (now
- flow
->feedback_time
);
972 if ( interval
> VLC_TICK_FROM_MS(RTCP_INTERVAL
) )
974 /* msg_Dbg(p_access, "Calling RTCP Feedback %lu<%d ms using timer", interval,
975 VLC_TICK_FROM_MS(RTCP_INTERVAL)); */
976 send_rtcp_feedback(p_access
, flow
);
977 flow
->feedback_time
= now
;
980 /* Send nacks every NACK_INTERVAL (only the ones that have matured, if any) */
981 interval
= (now
- p_sys
->last_nack_tx
);
982 if ( interval
> VLC_TICK_FROM_MS(NACK_INTERVAL
) )
984 send_nacks(p_access
, p_sys
->flow
);
985 p_sys
->last_nack_tx
= now
;
988 /* Safety check for when the input stream stalls */
989 if ( p_sys
->last_data_rx
> 0 && now
> p_sys
->last_data_rx
&&
990 (uint64_t)(now
- p_sys
->last_data_rx
) > (uint64_t)VLC_TICK_FROM_MS(flow
->latency
) &&
991 (uint64_t)(now
- p_sys
->last_reset
) > (uint64_t)VLC_TICK_FROM_MS(flow
->latency
) )
993 msg_Err(p_access
, "No data received for %"PRId64
" ms, resetting buffers",
994 (int64_t)(now
- p_sys
->last_data_rx
)/1000);
995 p_sys
->last_reset
= now
;
1001 if (p_sys
->b_flag_discontinuity
) {
1002 pktout
->i_flags
|= BLOCK_FLAG_DISCONTINUITY
;
1003 p_sys
->b_flag_discontinuity
= false;
1011 static void Clean( stream_t
*p_access
)
1013 stream_sys_t
*p_sys
= p_access
->p_sys
;
1017 if (p_sys
->flow
->fd_in
>= 0)
1018 net_Close (p_sys
->flow
->fd_in
);
1019 if (p_sys
->flow
->fd_nack
>= 0)
1020 net_Close (p_sys
->flow
->fd_nack
);
1021 if (p_sys
->flow
->fd_rtcp_m
>= 0)
1022 net_Close (p_sys
->flow
->fd_rtcp_m
);
1023 for (int i
=0; i
<RIST_QUEUE_SIZE
; i
++) {
1024 struct rtp_pkt
*pkt
= &(p_sys
->flow
->buffer
[i
]);
1025 if (pkt
->buffer
&& pkt
->buffer
->i_buffer
> 0) {
1026 block_Release(pkt
->buffer
);
1030 free(p_sys
->flow
->buffer
);
1035 static void Close(vlc_object_t
*p_this
)
1037 stream_t
*p_access
= (stream_t
*)p_this
;
1038 stream_sys_t
*p_sys
= p_access
->p_sys
;
1040 vlc_queue_Kill(&p_sys
->queue
, &p_sys
->dead
);
1041 vlc_join(p_sys
->thread
, NULL
);
1046 static int Open(vlc_object_t
*p_this
)
1048 stream_t
*p_access
= (stream_t
*)p_this
;
1049 stream_sys_t
*p_sys
= NULL
;
1050 vlc_url_t parsed_url
= { 0 };
1052 p_sys
= vlc_obj_calloc( p_this
, 1, sizeof( *p_sys
) );
1053 if( unlikely( p_sys
== NULL
) )
1056 p_access
->p_sys
= p_sys
;
1058 vlc_mutex_init( &p_sys
->lock
);
1060 if ( vlc_UrlParse( &parsed_url
, p_access
->psz_url
) == -1 )
1062 msg_Err( p_access
, "Failed to parse input URL (%s)",
1063 p_access
->psz_url
);
1067 /* Initialize rist flow */
1068 p_sys
->b_ismulticast
= is_multicast_address(parsed_url
.psz_host
);
1069 p_sys
->flow
= rist_udp_receiver(p_access
, &parsed_url
, p_sys
->b_ismulticast
);
1070 vlc_UrlClean( &parsed_url
);
1073 msg_Err( p_access
, "Failed to open rist flow (%s)",
1074 p_access
->psz_url
);
1078 p_sys
->b_flag_discontinuity
= false;
1079 p_sys
->b_disablenacks
= var_InheritBool( p_access
, "disable-nacks" );
1080 p_sys
->b_sendblindnacks
= var_InheritBool( p_access
, "mcast-blind-nacks" );
1081 if (p_sys
->b_sendblindnacks
&& p_sys
->b_disablenacks
== false)
1082 p_sys
->b_sendnacks
= true;
1084 p_sys
->b_sendnacks
= false;
1085 p_sys
->nack_type
= var_InheritInteger( p_access
, "nack-type" );
1086 p_sys
->i_max_packet_size
= var_InheritInteger( p_access
, "packet-size" );
1087 p_sys
->i_poll_timeout
= var_InheritInteger( p_access
, "maximum-jitter" );
1088 p_sys
->flow
->retry_interval
= var_InheritInteger( p_access
, "retry-interval" );
1089 p_sys
->flow
->max_retries
= var_InheritInteger( p_access
, "max-retries" );
1090 p_sys
->flow
->latency
= var_InheritInteger( p_access
, "latency" );
1091 if (p_sys
->b_disablenacks
)
1092 p_sys
->flow
->reorder_buffer
= p_sys
->flow
->latency
;
1094 p_sys
->flow
->reorder_buffer
= var_InheritInteger( p_access
, "reorder-buffer" );
1095 msg_Info(p_access
, "Setting queue latency to %d ms", p_sys
->flow
->latency
);
1097 /* Convert to rtp times */
1098 p_sys
->flow
->rtp_latency
= rtp_get_ts(VLC_TICK_FROM_MS(p_sys
->flow
->latency
));
1099 p_sys
->flow
->retry_interval
= rtp_get_ts(VLC_TICK_FROM_MS(p_sys
->flow
->retry_interval
));
1100 p_sys
->flow
->reorder_buffer
= rtp_get_ts(VLC_TICK_FROM_MS(p_sys
->flow
->reorder_buffer
));
1102 p_sys
->dead
= false;
1103 vlc_queue_Init(&p_sys
->queue
, offsetof (block_t
, p_next
));
1105 /* This extra thread is for sending feedback/nack packets even when no data comes in */
1106 if (vlc_clone(&p_sys
->thread
, rist_thread
, p_access
, VLC_THREAD_PRIORITY_INPUT
))
1108 msg_Err(p_access
, "Failed to create worker thread.");
1112 p_access
->pf_block
= BlockRIST
;
1113 p_access
->pf_control
= Control
;
1119 return VLC_EGENERIC
;
1122 /* Module descriptor */
1125 set_shortname( N_("RIST") )
1126 set_description( N_("RIST input") )
1127 set_category( CAT_INPUT
)
1128 set_subcategory( SUBCAT_INPUT_ACCESS
)
1130 add_integer( "packet-size", RIST_MAX_PACKET_SIZE
,
1131 N_("RIST maximum packet size (bytes)"), NULL
, true )
1132 add_integer( "maximum-jitter", RIST_DEFAULT_POLL_TIMEOUT
,
1133 N_("RIST demux/decode maximum jitter (default is 5ms)"),
1134 N_("This controls the maximum jitter that will be passed to the demux/decode chain. "
1135 "The lower the value, the more CPU cycles the algorithm will consume"), true )
1136 add_integer( "latency", RIST_DEFAULT_LATENCY
, N_("RIST latency (ms)"), NULL
, true )
1137 add_integer( "retry-interval", RIST_DEFAULT_RETRY_INTERVAL
, N_("RIST nack retry interval (ms)"),
1139 add_integer( "reorder-buffer", RIST_DEFAULT_REORDER_BUFFER
, N_("RIST reorder buffer (ms)"),
1141 add_integer( "max-retries", RIST_MAX_RETRIES
, N_("RIST maximum retry count"), NULL
, true )
1142 add_integer( "nack-type", NACK_FMT_RANGE
,
1143 N_("RIST nack type, 0 = range, 1 = bitmask. Default is range"), NULL
, true )
1144 change_integer_list( nack_type
, nack_type_names
)
1145 add_bool( "disable-nacks", false, "Disable NACK output packets",
1146 "Use this to disable packet recovery", true )
1147 add_bool( "mcast-blind-nacks", false, "Do not check for a valid rtcp message from the encoder",
1148 "Send nack messages even when we have not confirmed that the encoder is on our local " \
1151 set_capability( "access", 0 )
1152 add_shortcut( "rist", "tr06" )
1154 set_callbacks( Open
, Close
)