contrib: cargo: use cargo/vendored-openssl if needed
[vlc.git] / modules / access / rist.c
blob009c17790da463d65aae8d104141a641e83cd074
1 /*****************************************************************************
2 * rist.c: RIST (Reliable Internet Stream Transport) input module
3 *****************************************************************************
4 * Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
5 * Copyright (C) 2018, SipRadius LLC
7 * Authors: Sergio Ammirata <sergio@ammirata.net>
8 * Daniele Lacamera <root@danielinux.net>
10 * This program is free software; you can redistribute it and/or modify it
11 * under the terms of the GNU Lesser General Public License as published by
12 * the Free Software Foundation; either version 2.1 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Lesser General Public License for more details.
20 * You should have received a copy of the GNU Lesser General Public License
21 * along with this program; if not, write to the Free Software Foundation,
22 * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
23 *****************************************************************************/
25 #ifdef HAVE_CONFIG_H
26 # include "config.h"
27 #endif
29 #include <vlc_common.h>
30 #include <vlc_interrupt.h>
31 #include <vlc_plugin.h>
32 #include <vlc_access.h>
33 #include <vlc_queue.h>
34 #include <vlc_threads.h>
35 #include <vlc_network.h>
36 #include <vlc_block.h>
37 #include <vlc_url.h>
38 #ifdef HAVE_POLL_H
39 #include <poll.h>
40 #endif
41 #include <bitstream/ietf/rtcp_rr.h>
42 #include <bitstream/ietf/rtcp_sdes.h>
43 #include <bitstream/ietf/rtcp_fb.h>
44 #include <bitstream/ietf/rtp.h>
46 #include "rist.h"
/* Default latency: 1000 ms */
#define RIST_DEFAULT_LATENCY 1000
/* Default interval between NACK retries */
#define RIST_DEFAULT_RETRY_INTERVAL 132
/* Default size of the packet re-ordering buffer */
#define RIST_DEFAULT_REORDER_BUFFER 70
/* Default maximum packet size */
#define RIST_MAX_PACKET_SIZE 1472
/* Default poll timeout: 5 ms */
#define RIST_DEFAULT_POLL_TIMEOUT 5
/* Maximum number of retries for a NACK */
#define RIST_MAX_RETRIES 10
/* Rate, in ms, at which NACK requests are processed and sent */
#define NACK_INTERVAL 5
/* Rate, in ms, at which stats are calculated and printed (once per second) */
#define STATS_INTERVAL 1000
65 static const int nack_type[] = {
66 0, 1,
69 static const char *const nack_type_names[] = {
70 N_("Range"), N_("Bitmask"),
73 enum NACK_TYPE {
74 NACK_FMT_RANGE = 0,
75 NACK_FMT_BITMASK
78 typedef struct
80 struct rist_flow *flow;
81 char sender_name[MAX_CNAME];
82 enum NACK_TYPE nack_type;
83 uint64_t last_data_rx;
84 uint64_t last_nack_tx;
85 vlc_thread_t thread;
86 int i_max_packet_size;
87 int i_poll_timeout;
88 int i_poll_timeout_current;
89 bool b_ismulticast;
90 bool b_sendnacks;
91 bool b_sendblindnacks;
92 bool b_disablenacks;
93 bool b_flag_discontinuity;
94 bool dead;
95 vlc_queue_t queue;
96 vlc_mutex_t lock;
97 uint64_t last_message;
98 uint64_t last_reset;
99 /* stat variables */
100 uint32_t i_poll_timeout_zero_count;
101 uint32_t i_poll_timeout_nonzero_count;
102 uint64_t i_last_stat;
103 float vbr_ratio;
104 uint16_t vbr_ratio_count;
105 uint32_t i_lost_packets;
106 uint32_t i_nack_packets;
107 uint32_t i_recovered_packets;
108 uint32_t i_reordered_packets;
109 uint32_t i_total_packets;
110 } stream_sys_t;
112 static int Control(stream_t *p_access, int i_query, va_list args)
114 switch( i_query )
116 case STREAM_CAN_SEEK:
117 case STREAM_CAN_FASTSEEK:
118 case STREAM_CAN_PAUSE:
119 case STREAM_CAN_CONTROL_PACE:
120 *va_arg( args, bool * ) = false;
121 break;
123 case STREAM_GET_PTS_DELAY:
124 *va_arg( args, vlc_tick_t * ) = VLC_TICK_FROM_MS(
125 var_InheritInteger(p_access, "network-caching") );
126 break;
128 default:
129 return VLC_EGENERIC;
132 return VLC_SUCCESS;
135 static struct rist_flow *rist_init_rx(void)
137 struct rist_flow *flow = calloc(1, sizeof(struct rist_flow));
138 if (!flow)
139 return NULL;
141 flow->reset = 1;
142 flow->buffer = calloc(RIST_QUEUE_SIZE, sizeof(struct rtp_pkt));
144 if ( unlikely( flow->buffer == NULL ) )
146 free(flow);
147 return NULL;
149 flow->fd_in = -1;
150 flow->fd_nack = -1;
151 flow->fd_rtcp_m = -1;
153 return flow;
156 static void rist_WriteTo_i11e_Locked(vlc_mutex_t lock, int fd, const void *buf, size_t len,
157 const struct sockaddr *peer, socklen_t slen)
159 vlc_mutex_lock( &lock );
160 rist_WriteTo_i11e(fd, buf, len, peer, slen);
161 vlc_mutex_unlock( &lock );
164 static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed_url, bool b_ismulticast)
166 stream_sys_t *p_sys = p_access->p_sys;
167 msg_Info( p_access, "Opening Rist Flow Receiver at %s:%d and %s:%d",
168 parsed_url->psz_host, parsed_url->i_port,
169 parsed_url->psz_host, parsed_url->i_port+1);
171 p_sys->flow = rist_init_rx();
172 if (!p_sys->flow)
173 return NULL;
175 p_sys->flow->fd_in = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port, NULL,
176 0, IPPROTO_UDP);
177 if (p_sys->flow->fd_in < 0)
179 msg_Err( p_access, "cannot open input socket" );
180 goto fail;
183 if (b_ismulticast)
185 p_sys->flow->fd_rtcp_m = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
186 NULL, 0, IPPROTO_UDP);
187 if (p_sys->flow->fd_rtcp_m < 0)
189 msg_Err( p_access, "cannot open multicast nack socket" );
190 goto fail;
192 p_sys->flow->fd_nack = net_ConnectDgram(p_access, parsed_url->psz_host,
193 parsed_url->i_port + 1, -1, IPPROTO_UDP );
195 else
197 p_sys->flow->fd_nack = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
198 NULL, 0, IPPROTO_UDP);
200 if (p_sys->flow->fd_nack < 0)
202 msg_Err( p_access, "cannot open nack socket" );
203 goto fail;
206 populate_cname(p_sys->flow->fd_nack, p_sys->flow->cname);
207 msg_Info(p_access, "our cname is %s", p_sys->flow->cname);
209 return p_sys->flow;
211 fail:
212 if (p_sys->flow->fd_in != -1)
213 vlc_close(p_sys->flow->fd_in);
214 if (p_sys->flow->fd_nack != -1)
215 vlc_close(p_sys->flow->fd_nack);
216 if (p_sys->flow->fd_rtcp_m != -1)
217 vlc_close(p_sys->flow->fd_rtcp_m);
218 free(p_sys->flow->buffer);
219 free(p_sys->flow);
220 return NULL;
223 static int is_index_in_range(struct rist_flow *flow, uint16_t idx)
225 if (flow->ri <= flow->wi) {
226 return ((idx > flow->ri) && (idx <= flow->wi));
227 } else {
228 return ((idx > flow->ri) || (idx <= flow->wi));
232 static void send_rtcp_feedback(stream_t *p_access, struct rist_flow *flow)
234 stream_sys_t *p_sys = p_access->p_sys;
235 int namelen = strlen(flow->cname) + 1;
237 /* we need to make sure it is a multiple of 4, pad if necessary */
238 if ((namelen - 2) & 0x3)
239 namelen = ((((namelen - 2) >> 2) + 1) << 2) + 2;
241 int rtcp_feedback_size = RTCP_EMPTY_RR_SIZE + RTCP_SDES_SIZE + namelen;
242 uint8_t *buf = malloc(rtcp_feedback_size);
243 if ( unlikely( buf == NULL ) )
244 return;
246 /* Populate RR */
247 uint8_t *rr = buf;
248 rtp_set_hdr(rr);
249 rtcp_rr_set_pt(rr);
250 rtcp_set_length(rr, 1);
251 rtcp_fb_set_int_ssrc_pkt_sender(rr, 0);
253 /* Populate SDES */
254 uint8_t *p_sdes = (buf + RTCP_EMPTY_RR_SIZE);
255 rtp_set_hdr(p_sdes);
256 rtp_set_cc(p_sdes, 1); /* Actually it is source count in this case */
257 rtcp_sdes_set_pt(p_sdes);
258 rtcp_set_length(p_sdes, (namelen >> 2) + 2);
259 rtcp_sdes_set_cname(p_sdes, 1);
260 rtcp_sdes_set_name_length(p_sdes, strlen(flow->cname));
261 uint8_t *p_sdes_name = (buf + RTCP_EMPTY_RR_SIZE + RTCP_SDES_SIZE);
262 strlcpy((char *)p_sdes_name, flow->cname, namelen);
264 /* Write to Socket */
265 rist_WriteTo_i11e_Locked(p_sys->lock, flow->fd_nack, buf, rtcp_feedback_size,
266 (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
267 free(buf);
268 buf = NULL;
271 static void send_bbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uint16_t nack_count)
273 stream_sys_t *p_sys = p_access->p_sys;
274 struct rist_flow *flow = p_sys->flow;
275 int len = 0;
277 int bbnack_bufsize = RTCP_FB_HEADER_SIZE +
278 RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
279 uint8_t *buf = malloc(bbnack_bufsize);
280 if ( unlikely( buf == NULL ) )
281 return;
283 /* Populate NACKS */
284 uint8_t *nack = buf;
285 rtp_set_hdr(nack);
286 rtcp_fb_set_fmt(nack, NACK_FMT_BITMASK);
287 rtcp_set_pt(nack, RTCP_PT_RTPFB);
288 rtcp_set_length(nack, 2 + nack_count);
289 /*uint8_t name[4] = "RIST";*/
290 /*rtcp_fb_set_ssrc_media_src(nack, name);*/
291 len += RTCP_FB_HEADER_SIZE;
292 /* TODO : group together */
293 uint16_t nacks[MAX_NACKS];
294 memcpy(nacks, pkt_nacks->p_buffer, pkt_nacks->i_buffer);
295 for (int i = 0; i < nack_count; i++) {
296 uint8_t *nack_record = buf + len + RTCP_FB_FCI_GENERIC_NACK_SIZE*i;
297 rtcp_fb_nack_set_packet_id(nack_record, nacks[i]);
298 rtcp_fb_nack_set_bitmask_lost(nack_record, 0);
300 len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
302 /* Write to Socket */
303 if (p_sys->b_sendnacks && p_sys->b_disablenacks == false)
304 rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
305 (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
306 free(buf);
307 buf = NULL;
310 static void send_rbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uint16_t nack_count)
312 stream_sys_t *p_sys = p_access->p_sys;
313 struct rist_flow *flow = p_sys->flow;
314 int len = 0;
316 int rbnack_bufsize = RTCP_FB_HEADER_SIZE +
317 RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
318 uint8_t *buf = malloc(rbnack_bufsize);
319 if ( unlikely( buf == NULL ) )
320 return;
322 /* Populate NACKS */
323 uint8_t *nack = buf;
324 rtp_set_hdr(nack);
325 rtcp_fb_set_fmt(nack, NACK_FMT_RANGE);
326 rtcp_set_pt(nack, RTCP_PT_RTPFR);
327 rtcp_set_length(nack, 2 + nack_count);
328 uint8_t name[4] = "RIST";
329 rtcp_fb_set_ssrc_media_src(nack, name);
330 len += RTCP_FB_HEADER_SIZE;
331 /* TODO : group together */
332 uint16_t nacks[MAX_NACKS];
333 memcpy(nacks, pkt_nacks->p_buffer, pkt_nacks->i_buffer);
334 for (int i = 0; i < nack_count; i++)
336 uint8_t *nack_record = buf + len + RTCP_FB_FCI_GENERIC_NACK_SIZE*i;
337 rtcp_fb_nack_set_range_start(nack_record, nacks[i]);
338 rtcp_fb_nack_set_range_extra(nack_record, 0);
340 len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
342 /* Write to Socket */
343 if (p_sys->b_sendnacks && p_sys->b_disablenacks == false)
344 rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
345 (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
346 free(buf);
347 buf = NULL;
350 static void send_nacks(stream_t *p_access, struct rist_flow *flow)
352 stream_sys_t *p_sys = p_access->p_sys;
353 struct rtp_pkt *pkt;
354 uint16_t idx;
355 uint64_t last_ts = 0;
356 uint16_t null_count = 0;
357 int nacks_len = 0;
358 uint16_t nacks[MAX_NACKS];
360 idx = flow->ri;
361 while(idx++ != flow->wi)
363 pkt = &(flow->buffer[idx]);
364 if (pkt->buffer == NULL)
366 if (nacks_len + 1 >= MAX_NACKS)
368 break;
370 else
372 null_count++;
373 /* TODO: after adding average spacing calculation, change this formula
374 to extrapolated_ts = last_ts + null_count * avg_delta_ts; */
375 uint64_t extrapolated_ts = last_ts;
376 /* Find out the age and add it only if necessary */
377 int retry_count = flow->nacks_retries[idx];
378 uint64_t age = flow->hi_timestamp - extrapolated_ts;
379 uint64_t expiration;
380 if (retry_count == 0){
381 expiration = flow->reorder_buffer;
382 } else {
383 expiration = (uint64_t)flow->nacks_retries[idx] * (uint64_t)flow->retry_interval;
385 if (age > expiration && retry_count <= flow->max_retries)
387 flow->nacks_retries[idx]++;
388 nacks[nacks_len++] = idx;
389 msg_Dbg(p_access, "Sending NACK for seq %d, age %"PRId64" ms, retry %u, " \
390 "expiration %"PRId64" ms", idx, ts_get_from_rtp(age)/1000,
391 flow->nacks_retries[idx], ts_get_from_rtp(expiration)/1000);
395 else
397 last_ts = pkt->rtp_ts;
398 null_count = 0;
401 if (nacks_len > 0)
403 p_sys->i_nack_packets += nacks_len;
404 block_t *pkt_nacks = block_Alloc(nacks_len * 2);
405 if (pkt_nacks)
407 memcpy(pkt_nacks->p_buffer, nacks, nacks_len * 2);
408 pkt_nacks->i_buffer = nacks_len * 2;
409 vlc_queue_Enqueue(&p_sys->queue, pkt_nacks);
/* Three-way comparison of two unsigned values: -1, 0 or 1. */
static int sockaddr_cmp_u32(uint32_t a, uint32_t b)
{
    if (a == b)
        return 0;
    return a < b ? -1 : 1;
}

/* Orders two socket addresses.
 *
 * Addresses are compared by family first. IPv4 addresses are then compared by
 * address and port (host byte order); IPv6 addresses by address bytes, port,
 * flow info and scope id. Two addresses of the same, other family compare
 * equal.
 *
 * Returns -1, 0 or 1 on every path. */
static int sockaddr_cmp(struct sockaddr *x, struct sockaddr *y)
{
    int r = sockaddr_cmp_u32(x->sa_family, y->sa_family);
    if (r != 0)
        return r;

    if (x->sa_family == AF_INET)
    {
        struct sockaddr_in *xin = (void*)x, *yin = (void*)y;
        r = sockaddr_cmp_u32(ntohl(xin->sin_addr.s_addr), ntohl(yin->sin_addr.s_addr));
        if (r != 0)
            return r;
        return sockaddr_cmp_u32(ntohs(xin->sin_port), ntohs(yin->sin_port));
    }

    if (x->sa_family == AF_INET6)
    {
        struct sockaddr_in6 *xin6 = (void*)x, *yin6 = (void*)y;
        r = memcmp(xin6->sin6_addr.s6_addr, yin6->sin6_addr.s6_addr,
                   sizeof(xin6->sin6_addr.s6_addr));
        if (r != 0)
            return r < 0 ? -1 : 1; /* memcmp only guarantees the sign */
        r = sockaddr_cmp_u32(ntohs(xin6->sin6_port), ntohs(yin6->sin6_port));
        if (r != 0)
            return r;
        r = sockaddr_cmp_u32(xin6->sin6_flowinfo, yin6->sin6_flowinfo);
        if (r != 0)
            return r;
        return sockaddr_cmp_u32(xin6->sin6_scope_id, yin6->sin6_scope_id);
    }

    return 0;
}
442 static void print_sockaddr_info_change(stream_t *p_access, struct sockaddr *x, struct sockaddr *y)
444 if (x->sa_family == AF_INET)
446 struct sockaddr_in *xin = (void*)x, *yin = (void*)y;
447 msg_Info(p_access, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
448 inet_ntoa(xin->sin_addr), ntohs(xin->sin_port), inet_ntoa(yin->sin_addr),
449 ntohs(yin->sin_port));
451 else if (x->sa_family == AF_INET6)
453 struct sockaddr_in6 *xin6 = (void*)x, *yin6 = (void*)y;
454 char oldstr[INET6_ADDRSTRLEN];
455 char newstr[INET6_ADDRSTRLEN];
456 inet_ntop(xin6->sin6_family, &xin6->sin6_addr, oldstr, sizeof(struct in6_addr));
457 inet_ntop(yin6->sin6_family, &yin6->sin6_addr, newstr, sizeof(struct in6_addr));
458 msg_Info(p_access, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
459 oldstr, ntohs(xin6->sin6_port), newstr, ntohs(yin6->sin6_port));
463 static void print_sockaddr_info(stream_t *p_access, struct sockaddr *x)
465 if (x->sa_family == AF_INET)
467 struct sockaddr_in *xin = (void*)x;
468 msg_Info(p_access, "Peer IP:Port %s:%d", inet_ntoa(xin->sin_addr), ntohs(xin->sin_port));
470 else if (x->sa_family == AF_INET6)
472 struct sockaddr_in6 *xin6 = (void*)x;
473 char str[INET6_ADDRSTRLEN];
474 inet_ntop(xin6->sin6_family, &xin6->sin6_addr, str, sizeof(struct in6_addr));
475 msg_Info(p_access, "Peer IP:Port %s:%d", str, ntohs(xin6->sin6_port));
479 static void rtcp_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf_in, size_t len,
480 struct sockaddr *peer, socklen_t slen)
482 stream_sys_t *p_sys = p_access->p_sys;
483 uint8_t ptype;
484 uint16_t processed_bytes = 0;
485 uint16_t records;
486 char new_sender_name[MAX_CNAME];
487 uint8_t *buf;
489 while (processed_bytes < len) {
490 buf = buf_in + processed_bytes;
491 /* safety checks */
492 uint16_t bytes_left = len - processed_bytes + 1;
493 if ( bytes_left < 4 )
495 /* we must have at least 4 bytes */
496 msg_Err(p_access, "Rist rtcp packet must have at least 4 bytes, we have %d",
497 bytes_left);
498 return;
500 else if (!rtp_check_hdr(buf))
502 /* check for a valid rtp header */
503 msg_Err(p_access, "Malformed rtcp packet starting with %02x, ignoring.", buf[0]);
504 return;
507 ptype = rtcp_get_pt(buf);
508 records = rtcp_get_length(buf);
509 uint16_t bytes = (uint16_t)(4 * (1 + records));
510 if (bytes > bytes_left)
512 /* check for a sane number of bytes */
513 msg_Err(p_access, "Malformed rtcp packet, wrong len %d, expecting %u bytes in the " \
514 "packet, got a buffer of %u bytes.", rtcp_get_length(buf), bytes, bytes_left);
515 return;
518 switch(ptype) {
519 case RTCP_PT_RTPFR:
520 case RTCP_PT_RTPFB:
521 break;
523 case RTCP_PT_RR:
524 break;
526 case RTCP_PT_SDES:
528 if (p_sys->b_sendnacks == false)
529 p_sys->b_sendnacks = true;
530 if (p_sys->b_ismulticast)
531 return;
532 /* Check for changes in source IP address or port */
533 int8_t name_length = rtcp_sdes_get_name_length(buf);
534 if (name_length > bytes_left || name_length <= 0 ||
535 (size_t)name_length > sizeof(new_sender_name))
537 /* check for a sane number of bytes */
538 msg_Err(p_access, "Malformed SDES packet, wrong cname len %d, got a " \
539 "buffer of %u bytes.", name_length, bytes_left);
540 return;
542 bool ip_port_changed = false;
543 if (sockaddr_cmp((struct sockaddr *)&flow->peer_sockaddr, peer) != 0)
545 ip_port_changed = true;
546 if(flow->peer_socklen > 0)
547 print_sockaddr_info_change(p_access,
548 (struct sockaddr *)&flow->peer_sockaddr, peer);
549 else
550 print_sockaddr_info(p_access, peer);
551 vlc_mutex_lock( &p_sys->lock );
552 memcpy(&flow->peer_sockaddr, peer, sizeof(struct sockaddr_storage));
553 flow->peer_socklen = slen;
554 vlc_mutex_unlock( &p_sys->lock );
557 /* Check for changes in cname */
558 bool peer_name_changed = false;
559 memset(new_sender_name, 0, MAX_CNAME);
560 memcpy(new_sender_name, buf + RTCP_SDES_SIZE, name_length);
561 if (memcmp(new_sender_name, p_sys->sender_name, name_length) != 0)
563 peer_name_changed = true;
564 if (strcmp(p_sys->sender_name, "") == 0)
565 msg_Info(p_access, "Peer Name: %s", new_sender_name);
566 else
567 msg_Info(p_access, "Peer Name change detected: old Name: %s, new " \
568 "Name: %s", p_sys->sender_name, new_sender_name);
569 memset(p_sys->sender_name, 0, MAX_CNAME);
570 memcpy(p_sys->sender_name, buf + RTCP_SDES_SIZE, name_length);
573 /* Reset the buffer as the source must have been restarted */
574 if (peer_name_changed || ip_port_changed)
576 /* reset the buffer */
577 flow->reset = 1;
580 break;
582 case RTCP_PT_SR:
583 if (p_sys->b_sendnacks == false)
584 p_sys->b_sendnacks = true;
585 if (p_sys->b_ismulticast)
586 return;
587 break;
589 default:
590 msg_Err(p_access, " Unrecognized RTCP packet with PTYPE=%02x!!", ptype);
592 processed_bytes += (4 * (1 + records));
596 static bool rist_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf, size_t len)
598 stream_sys_t *p_sys = p_access->p_sys;
600 /* safety checks */
601 if ( len < RTP_HEADER_SIZE )
603 /* check if packet size >= rtp header size */
604 msg_Err(p_access, "Rist rtp packet must have at least 12 bytes, we have %zu", len);
605 return false;
607 else if (!rtp_check_hdr(buf))
609 /* check for a valid rtp header */
610 msg_Err(p_access, "Malformed rtp packet header starting with %02x, ignoring.", buf[0]);
611 return false;
614 uint16_t idx = rtp_get_seqnum(buf);
615 uint32_t pkt_ts = rtp_get_timestamp(buf);
616 bool retrasnmitted = false;
617 bool success = true;
619 if (flow->reset == 1)
621 msg_Info(p_access, "Traffic detected after buffer reset");
622 /* First packet in the queue */
623 flow->hi_timestamp = pkt_ts;
624 msg_Info(p_access, "ts@%u", flow->hi_timestamp);
625 flow->wi = idx;
626 flow->ri = idx;
627 flow->reset = 0;
628 p_sys->b_flag_discontinuity = true;
631 /* Check to see if this is a retransmission or a regular packet */
632 if (buf[11] & (1 << 0))
634 msg_Dbg(p_access, "Packet %d RECOVERED, Window: [%d:%d-->%d]", idx, flow->ri, flow->wi,
635 flow->wi-flow->ri);
636 p_sys->i_recovered_packets++;
637 retrasnmitted = true;
639 else if (flow->wi != flow->ri)
641 /* Reset counter to 0 on incoming holes */
642 /* Regular packets only as retransmits are expected to come in out of order */
643 uint16_t idxnext = (uint16_t)(flow->wi + 1);
644 if (idx != idxnext)
646 if (idx > idxnext) {
647 msg_Dbg(p_access, "Gap, got %d, expected %d, %d packet gap, Window: [%d:%d-->%d]",
648 idx, idxnext, idx - idxnext, flow->ri, flow->wi, (uint16_t)(flow->wi-flow->ri));
649 } else {
650 p_sys->i_reordered_packets++;
651 msg_Dbg(p_access, "Out of order, got %d, expected %d, Window: [%d:%d-->%d]", idx,
652 idxnext, flow->ri, flow->wi, (uint16_t)(flow->wi-flow->ri));
654 uint16_t zero_counter = (uint16_t)(flow->wi + 1);
655 while(zero_counter++ != idx) {
656 flow->nacks_retries[zero_counter] = 0;
658 /*msg_Dbg(p_access, "Gap, reseting %d packets as zero nacks %d to %d",
659 idx - flow->wi - 1, (uint16_t)(flow->wi + 1), idx);*/
663 /* Always replace the existing one with the new one */
664 struct rtp_pkt *pkt;
665 pkt = &(flow->buffer[idx]);
666 if (pkt->buffer && pkt->buffer->i_buffer > 0)
668 block_Release(pkt->buffer);
669 pkt->buffer = NULL;
671 pkt->buffer = block_Alloc(len);
672 if (!pkt->buffer)
673 return false;
675 pkt->buffer->i_buffer = len;
676 memcpy(pkt->buffer->p_buffer, buf, len);
677 pkt->rtp_ts = pkt_ts;
678 p_sys->last_data_rx = vlc_tick_now();
679 /* Reset the try counter regardless of wether it was a retransmit or not */
680 flow->nacks_retries[idx] = 0;
682 if (retrasnmitted)
683 return success;
685 p_sys->i_total_packets++;
686 /* Perform discontinuity checks and udpdate counters */
687 if (!is_index_in_range(flow, idx) && pkt_ts >= flow->hi_timestamp)
689 if ((pkt_ts - flow->hi_timestamp) > flow->hi_timestamp/10)
691 msg_Info(p_access, "Forward stream discontinuity idx@%d/%d/%d ts@%u/%u", flow->ri, idx,
692 flow->wi, pkt_ts, flow->hi_timestamp);
693 flow->reset = 1;
694 success = false;
696 else
698 flow->wi = idx;
699 flow->hi_timestamp = pkt_ts;
702 else if (!is_index_in_range(flow, idx))
704 /* incoming timestamp just jumped back in time or index is outside of scope */
705 msg_Info(p_access, "Backwards stream discontinuity idx@%d/%d/%d ts@%u/%u", flow->ri, idx,
706 flow->wi, pkt_ts, flow->hi_timestamp);
707 flow->reset = 1;
708 success = false;
711 return success;
714 static block_t *rist_dequeue(stream_t *p_access, struct rist_flow *flow)
716 stream_sys_t *p_sys = p_access->p_sys;
717 block_t *pktout = NULL;
718 struct rtp_pkt *pkt;
719 uint16_t idx;
720 if (flow->ri == flow->wi || flow->reset > 0)
721 return NULL;
723 idx = flow->ri;
724 bool found_data = false;
725 uint16_t loss_amount = 0;
726 while(idx++ != flow->wi) {
728 pkt = &(flow->buffer[idx]);
729 if (!pkt->buffer)
731 /*msg_Info(p_access, "Possible packet loss on index #%d", idx);*/
732 loss_amount++;
733 /* We move ahead until we find a timestamp but we do not move the cursor.
734 * None of them are guaranteed packet loss because we do not really
735 * know their timestamps. They might still arrive on the next loop.
736 * We can confirm the loss only if we get a valid packet in the loop below. */
737 continue;
740 /*printf("IDX=%d, flow->hi_timestamp: %u, (ts + flow->rtp_latency): %u\n", idx,
741 flow->hi_timestamp, (ts - 100 * flow->qdelay));*/
742 if (flow->hi_timestamp > (uint32_t)(pkt->rtp_ts + flow->rtp_latency))
744 /* Populate output packet now but remove rtp header from source */
745 int newSize = pkt->buffer->i_buffer - RTP_HEADER_SIZE;
746 pktout = block_Alloc(newSize);
747 if (pktout)
749 pktout->i_buffer = newSize;
750 memcpy(pktout->p_buffer, pkt->buffer->p_buffer + RTP_HEADER_SIZE, newSize);
751 /* free the buffer and increase the read index */
752 flow->ri = idx;
753 /* TODO: calculate average duration using buffer average (bring from sender) */
754 found_data = true;
756 block_Release(pkt->buffer);
757 pkt->buffer = NULL;
758 break;
763 if (loss_amount > 0 && found_data == true)
765 /* Packet loss confirmed, we found valid data after the holes */
766 msg_Dbg(p_access, "Packet NOT RECOVERED, %d packet(s), Window: [%d:%d]", loss_amount,
767 flow->ri, flow->wi);
768 p_sys->i_lost_packets += loss_amount;
769 p_sys->b_flag_discontinuity = true;
772 return pktout;
775 static void *rist_thread(void *data)
777 stream_t *p_access = data;
778 stream_sys_t *p_sys = p_access->p_sys;
779 block_t *pkt_nacks;
781 /* Process nacks every 5ms */
782 /* We only ask for the relevant ones */
783 while ((pkt_nacks = vlc_queue_DequeueKillable(&p_sys->queue,
784 &p_sys->dead)) != NULL) {
785 /* there are two bytes per nack */
786 uint16_t nack_count = (uint16_t)pkt_nacks->i_buffer/2;
787 switch(p_sys->nack_type) {
788 case NACK_FMT_BITMASK:
789 send_bbnack(p_access, p_sys->flow->fd_nack, pkt_nacks, nack_count);
790 break;
792 default:
793 send_rbnack(p_access, p_sys->flow->fd_nack, pkt_nacks, nack_count);
796 if (nack_count > 1)
797 msg_Dbg(p_access, "Sent %u NACKs !!!", nack_count);
798 block_Release(pkt_nacks);
801 return NULL;
804 static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
806 stream_sys_t *p_sys = p_access->p_sys;
807 uint64_t now;
808 *eof = false;
809 block_t *pktout = NULL;
810 struct pollfd pfd[3];
811 int ret;
812 ssize_t r;
813 struct sockaddr_storage peer;
814 socklen_t slen = sizeof(struct sockaddr_storage);
815 struct rist_flow *flow = p_sys->flow;
817 if (vlc_killed())
819 *eof = true;
820 return NULL;
823 int poll_sockets = 2;
824 pfd[0].fd = flow->fd_in;
825 pfd[0].events = POLLIN;
826 pfd[1].fd = flow->fd_nack;
827 pfd[1].events = POLLIN;
828 if (p_sys->b_ismulticast)
830 pfd[2].fd = flow->fd_rtcp_m;
831 pfd[2].events = POLLIN;
832 poll_sockets++;
835 /* The protocol uses a fifo buffer with a fixed time delay.
836 * That buffer needs to be emptied at a rate that is determined by the rtp timestamps of the
837 * packets. If I waited indefinitely for data coming in, the rate and delay of output packets
838 * would be wrong. I am calling the rist_dequeue function every time a data packet comes in
839 * and also every time we get a poll timeout. The configurable poll timeout is for controling
840 * the maximum jitter of output data coming out of the buffer. The default 5ms timeout covers
841 * most cases. */
843 ret = vlc_poll_i11e(pfd, poll_sockets, p_sys->i_poll_timeout_current);
844 if (unlikely(ret < 0))
845 return NULL;
846 else if (ret == 0)
848 /* Poll timeout, check the queue for the next packet that needs to be delivered */
849 pktout = rist_dequeue(p_access, flow);
850 /* if there is data, we need to come back faster to finish emptying it */
851 if (pktout) {
852 p_sys->i_poll_timeout_current = 0;
853 p_sys->i_poll_timeout_zero_count++;
854 } else {
855 p_sys->i_poll_timeout_current = p_sys->i_poll_timeout;
856 p_sys->i_poll_timeout_nonzero_count++;
859 else
862 uint8_t *buf = malloc(p_sys->i_max_packet_size);
863 if ( unlikely( buf == NULL ) )
864 return NULL;
866 /* Process rctp incoming data */
867 if (pfd[1].revents & POLLIN)
869 r = rist_ReadFrom_i11e(flow->fd_nack, buf, p_sys->i_max_packet_size,
870 (struct sockaddr *)&peer, &slen);
871 if (unlikely(r == -1)) {
872 msg_Err(p_access, "socket %d error: %s\n", flow->fd_nack, gai_strerror(errno));
874 else {
875 if (p_sys->b_ismulticast == false)
876 rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
879 if (p_sys->b_ismulticast && pfd[2].revents & POLLIN)
881 r = rist_ReadFrom_i11e(flow->fd_rtcp_m, buf, p_sys->i_max_packet_size,
882 (struct sockaddr *)&peer, &slen);
883 if (unlikely(r == -1)) {
884 msg_Err(p_access, "mcast socket %d error: %s\n",flow->fd_rtcp_m, gai_strerror(errno));
886 else {
887 rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
891 /* Process regular incoming data */
892 if (pfd[0].revents & POLLIN)
894 r = rist_Read_i11e(flow->fd_in, buf, p_sys->i_max_packet_size);
895 if (unlikely(r == -1)) {
896 msg_Err(p_access, "socket %d error: %s\n", flow->fd_in, gai_strerror(errno));
898 else
900 /* rist_input will process and queue the pkt */
901 if (rist_input(p_access, flow, buf, r))
903 /* Check the queue for the next packet that needs to be delivered */
904 pktout = rist_dequeue(p_access, flow);
905 if (pktout) {
906 p_sys->i_poll_timeout_current = 0;
907 p_sys->i_poll_timeout_zero_count++;
908 } else {
909 p_sys->i_poll_timeout_current = p_sys->i_poll_timeout;
910 p_sys->i_poll_timeout_nonzero_count++;
916 free(buf);
917 buf = NULL;
920 now = vlc_tick_now();
922 /* Process stats and print them out */
923 /* We need to measure some items every 70ms */
924 uint64_t interval = (now - flow->feedback_time);
925 if ( interval > VLC_TICK_FROM_MS(RTCP_INTERVAL) )
927 if (p_sys->i_poll_timeout_nonzero_count > 0)
929 float ratio = (float)p_sys->i_poll_timeout_zero_count
930 / (float)p_sys->i_poll_timeout_nonzero_count;
931 if (ratio <= 1)
932 p_sys->vbr_ratio += 1 - ratio;
933 else
934 p_sys->vbr_ratio += ratio - 1;
935 p_sys->vbr_ratio_count++;
936 /*msg_Dbg(p_access, "zero poll %u, non-zero poll %u, ratio %.2f",
937 p_sys->i_poll_timeout_zero_count, p_sys->i_poll_timeout_nonzero_count, ratio);*/
938 p_sys->i_poll_timeout_zero_count = 0;
939 p_sys->i_poll_timeout_nonzero_count = 0;
942 /* We print out the stats once per second */
943 interval = (now - p_sys->i_last_stat);
944 if ( interval > VLC_TICK_FROM_MS(STATS_INTERVAL) )
946 if ( p_sys->i_lost_packets > 0)
947 msg_Err(p_access, "We have %d lost packets", p_sys->i_lost_packets);
948 float ratio = 1;
949 if (p_sys->vbr_ratio_count > 0)
950 ratio = p_sys->vbr_ratio / (float)p_sys->vbr_ratio_count;
951 float quality = 100;
952 if (p_sys->i_total_packets > 0)
953 quality -= (float)100*(float)(p_sys->i_lost_packets + p_sys->i_recovered_packets +
954 p_sys->i_reordered_packets)/(float)p_sys->i_total_packets;
955 if (quality != 100)
956 msg_Info(p_access, "STATS: Total %u, Recovered %u/%u, Reordered %u, Lost %u, VBR " \
957 "Score %.2f, Link Quality %.2f%%", p_sys->i_total_packets,
958 p_sys->i_recovered_packets, p_sys->i_nack_packets, p_sys->i_reordered_packets,
959 p_sys->i_lost_packets, ratio, quality);
960 p_sys->i_last_stat = now;
961 p_sys->vbr_ratio = 0;
962 p_sys->vbr_ratio_count = 0;
963 p_sys->i_lost_packets = 0;
964 p_sys->i_nack_packets = 0;
965 p_sys->i_recovered_packets = 0;
966 p_sys->i_reordered_packets = 0;
967 p_sys->i_total_packets = 0;
970 /* Send rtcp feedback every RTCP_INTERVAL */
971 interval = (now - flow->feedback_time);
972 if ( interval > VLC_TICK_FROM_MS(RTCP_INTERVAL) )
974 /* msg_Dbg(p_access, "Calling RTCP Feedback %lu<%d ms using timer", interval,
975 VLC_TICK_FROM_MS(RTCP_INTERVAL)); */
976 send_rtcp_feedback(p_access, flow);
977 flow->feedback_time = now;
980 /* Send nacks every NACK_INTERVAL (only the ones that have matured, if any) */
981 interval = (now - p_sys->last_nack_tx);
982 if ( interval > VLC_TICK_FROM_MS(NACK_INTERVAL) )
984 send_nacks(p_access, p_sys->flow);
985 p_sys->last_nack_tx = now;
988 /* Safety check for when the input stream stalls */
989 if ( p_sys->last_data_rx > 0 && now > p_sys->last_data_rx &&
990 (uint64_t)(now - p_sys->last_data_rx) > (uint64_t)VLC_TICK_FROM_MS(flow->latency) &&
991 (uint64_t)(now - p_sys->last_reset) > (uint64_t)VLC_TICK_FROM_MS(flow->latency) )
993 msg_Err(p_access, "No data received for %"PRId64" ms, resetting buffers",
994 (int64_t)(now - p_sys->last_data_rx)/1000);
995 p_sys->last_reset = now;
996 flow->reset = 1;
999 if (pktout)
1001 if (p_sys->b_flag_discontinuity) {
1002 pktout->i_flags |= BLOCK_FLAG_DISCONTINUITY;
1003 p_sys->b_flag_discontinuity = false;
1005 return pktout;
1007 else
1008 return NULL;
1011 static void Clean( stream_t *p_access )
1013 stream_sys_t *p_sys = p_access->p_sys;
1015 if (p_sys->flow)
1017 if (p_sys->flow->fd_in >= 0)
1018 net_Close (p_sys->flow->fd_in);
1019 if (p_sys->flow->fd_nack >= 0)
1020 net_Close (p_sys->flow->fd_nack);
1021 if (p_sys->flow->fd_rtcp_m >= 0)
1022 net_Close (p_sys->flow->fd_rtcp_m);
1023 for (int i=0; i<RIST_QUEUE_SIZE; i++) {
1024 struct rtp_pkt *pkt = &(p_sys->flow->buffer[i]);
1025 if (pkt->buffer && pkt->buffer->i_buffer > 0) {
1026 block_Release(pkt->buffer);
1027 pkt->buffer = NULL;
1030 free(p_sys->flow->buffer);
1031 free(p_sys->flow);
1035 static void Close(vlc_object_t *p_this)
1037 stream_t *p_access = (stream_t*)p_this;
1038 stream_sys_t *p_sys = p_access->p_sys;
1040 vlc_queue_Kill(&p_sys->queue, &p_sys->dead);
1041 vlc_join(p_sys->thread, NULL);
1043 Clean( p_access );
1046 static int Open(vlc_object_t *p_this)
1048 stream_t *p_access = (stream_t*)p_this;
1049 stream_sys_t *p_sys = NULL;
1050 vlc_url_t parsed_url = { 0 };
1052 p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
1053 if( unlikely( p_sys == NULL ) )
1054 return VLC_ENOMEM;
1056 p_access->p_sys = p_sys;
1058 vlc_mutex_init( &p_sys->lock );
1060 if ( vlc_UrlParse( &parsed_url, p_access->psz_url ) == -1 )
1062 msg_Err( p_access, "Failed to parse input URL (%s)",
1063 p_access->psz_url );
1064 goto failed;
1067 /* Initialize rist flow */
1068 p_sys->b_ismulticast = is_multicast_address(parsed_url.psz_host);
1069 p_sys->flow = rist_udp_receiver(p_access, &parsed_url, p_sys->b_ismulticast);
1070 vlc_UrlClean( &parsed_url );
1071 if (!p_sys->flow)
1073 msg_Err( p_access, "Failed to open rist flow (%s)",
1074 p_access->psz_url );
1075 goto failed;
1078 p_sys->b_flag_discontinuity = false;
1079 p_sys->b_disablenacks = var_InheritBool( p_access, "disable-nacks" );
1080 p_sys->b_sendblindnacks = var_InheritBool( p_access, "mcast-blind-nacks" );
1081 if (p_sys->b_sendblindnacks && p_sys->b_disablenacks == false)
1082 p_sys->b_sendnacks = true;
1083 else
1084 p_sys->b_sendnacks = false;
1085 p_sys->nack_type = var_InheritInteger( p_access, "nack-type" );
1086 p_sys->i_max_packet_size = var_InheritInteger( p_access, "packet-size" );
1087 p_sys->i_poll_timeout = var_InheritInteger( p_access, "maximum-jitter" );
1088 p_sys->flow->retry_interval = var_InheritInteger( p_access, "retry-interval" );
1089 p_sys->flow->max_retries = var_InheritInteger( p_access, "max-retries" );
1090 p_sys->flow->latency = var_InheritInteger( p_access, "latency" );
1091 if (p_sys->b_disablenacks)
1092 p_sys->flow->reorder_buffer = p_sys->flow->latency;
1093 else
1094 p_sys->flow->reorder_buffer = var_InheritInteger( p_access, "reorder-buffer" );
1095 msg_Info(p_access, "Setting queue latency to %d ms", p_sys->flow->latency);
1097 /* Convert to rtp times */
1098 p_sys->flow->rtp_latency = rtp_get_ts(VLC_TICK_FROM_MS(p_sys->flow->latency));
1099 p_sys->flow->retry_interval = rtp_get_ts(VLC_TICK_FROM_MS(p_sys->flow->retry_interval));
1100 p_sys->flow->reorder_buffer = rtp_get_ts(VLC_TICK_FROM_MS(p_sys->flow->reorder_buffer));
1102 p_sys->dead = false;
1103 vlc_queue_Init(&p_sys->queue, offsetof (block_t, p_next));
1105 /* This extra thread is for sending feedback/nack packets even when no data comes in */
1106 if (vlc_clone(&p_sys->thread, rist_thread, p_access, VLC_THREAD_PRIORITY_INPUT))
1108 msg_Err(p_access, "Failed to create worker thread.");
1109 goto failed;
1112 p_access->pf_block = BlockRIST;
1113 p_access->pf_control = Control;
1115 return VLC_SUCCESS;
1117 failed:
1118 Clean( p_access );
1119 return VLC_EGENERIC;
1122 /* Module descriptor */
1123 vlc_module_begin ()
1125 set_shortname( N_("RIST") )
1126 set_description( N_("RIST input") )
1127 set_category( CAT_INPUT )
1128 set_subcategory( SUBCAT_INPUT_ACCESS )
1130 add_integer( "packet-size", RIST_MAX_PACKET_SIZE,
1131 N_("RIST maximum packet size (bytes)"), NULL, true )
1132 add_integer( "maximum-jitter", RIST_DEFAULT_POLL_TIMEOUT,
1133 N_("RIST demux/decode maximum jitter (default is 5ms)"),
1134 N_("This controls the maximum jitter that will be passed to the demux/decode chain. "
1135 "The lower the value, the more CPU cycles the algorithm will consume"), true )
1136 add_integer( "latency", RIST_DEFAULT_LATENCY, N_("RIST latency (ms)"), NULL, true )
1137 add_integer( "retry-interval", RIST_DEFAULT_RETRY_INTERVAL, N_("RIST nack retry interval (ms)"),
1138 NULL, true )
1139 add_integer( "reorder-buffer", RIST_DEFAULT_REORDER_BUFFER, N_("RIST reorder buffer (ms)"),
1140 NULL, true )
1141 add_integer( "max-retries", RIST_MAX_RETRIES, N_("RIST maximum retry count"), NULL, true )
1142 add_integer( "nack-type", NACK_FMT_RANGE,
1143 N_("RIST nack type, 0 = range, 1 = bitmask. Default is range"), NULL, true )
1144 change_integer_list( nack_type, nack_type_names )
1145 add_bool( "disable-nacks", false, "Disable NACK output packets",
1146 "Use this to disable packet recovery", true )
1147 add_bool( "mcast-blind-nacks", false, "Do not check for a valid rtcp message from the encoder",
1148 "Send nack messages even when we have not confirmed that the encoder is on our local " \
1149 "network.", true )
1151 set_capability( "access", 0 )
1152 add_shortcut( "rist", "tr06" )
1154 set_callbacks( Open, Close )
1156 vlc_module_end ()