access: rist: fix misspellings on help text
[vlc.git] / modules / access / rist.c
blob660734004244053ffd9fe899678dc7c14944bfa1
1 /*****************************************************************************
2 * rist.c: RIST (Reliable Internet Stream Transport) input module
3 *****************************************************************************
4 * Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
5 * Copyright (C) 2018, SipRadius LLC
7 * Authors: Sergio Ammirata <sergio@ammirata.net>
8 * Daniele Lacamera <root@danielinux.net>
10 * This program is free software; you can redistribute it and/or modify it
11 * under the terms of the GNU Lesser General Public License as published by
12 * the Free Software Foundation; either version 2.1 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Lesser General Public License for more details.
20 * You should have received a copy of the GNU Lesser General Public License
21 * along with this program; if not, write to the Free Software Foundation,
22 * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
23 *****************************************************************************/
25 #ifdef HAVE_CONFIG_H
26 # include "config.h"
27 #endif
29 #include <vlc_common.h>
30 #include <vlc_interrupt.h>
31 #include <vlc_plugin.h>
32 #include <vlc_access.h>
33 #include <vlc_threads.h>
34 #include <vlc_network.h>
35 #include <vlc_block.h>
36 #include <vlc_url.h>
37 #ifdef HAVE_POLL
38 #include <poll.h>
39 #endif
40 #include <bitstream/ietf/rtcp_rr.h>
41 #include <bitstream/ietf/rtcp_sdes.h>
42 #include <bitstream/ietf/rtcp_fb.h>
43 #include <bitstream/ietf/rtp.h>
45 #include "rist.h"
47 /* The default latency is 1000 ms */
48 #define RIST_DEFAULT_LATENCY 1000
49 /* The default nack retry interval */
50 #define RIST_DEFAULT_RETRY_INTERVAL 132
51 /* The default packet re-ordering buffer */
52 #define RIST_DEFAULT_REORDER_BUFFER 70
53 /* The default max packet size */
54 #define RIST_MAX_PACKET_SIZE 1472
55 /* The default timeout is 5 ms */
56 #define RIST_DEFAULT_POLL_TIMEOUT 5
57 /* The max retry count for nacks */
58 #define RIST_MAX_RETRIES 10
59 /* The rate at which we process and send nack requests */
60 #define NACK_INTERVAL 5 /*ms*/
61 /* Calculate and print stats once per second */
62 #define STATS_INTERVAL 1000 /*ms*/
64 static const int nack_type[] = {
65 0, 1,
68 static const char *const nack_type_names[] = {
69 N_("Range"), N_("Bitmask"),
72 enum NACK_TYPE {
73 NACK_FMT_RANGE = 0,
74 NACK_FMT_BITMASK
77 typedef struct
79 struct rist_flow *flow;
80 char sender_name[MAX_CNAME];
81 enum NACK_TYPE nack_type;
82 uint64_t last_data_rx;
83 uint64_t last_nack_tx;
84 vlc_thread_t thread;
85 int i_max_packet_size;
86 int i_poll_timeout;
87 int i_poll_timeout_current;
88 bool eof_on_reset;
89 block_fifo_t *p_fifo;
90 vlc_mutex_t lock;
91 uint64_t last_message;
92 uint64_t last_reset;
93 /* stat variables */
94 uint32_t i_poll_timeout_zero_count;
95 uint32_t i_poll_timeout_nonzero_count;
96 uint64_t i_last_stat;
97 float vbr_ratio;
98 uint16_t vbr_ratio_count;
99 uint32_t i_lost_packets;
100 uint32_t i_recovered_packets;
101 uint32_t i_reordered_packets;
102 uint32_t i_total_packets;
103 } stream_sys_t;
105 static int Control(stream_t *p_access, int i_query, va_list args)
107 switch( i_query )
109 case STREAM_CAN_SEEK:
110 case STREAM_CAN_FASTSEEK:
111 case STREAM_CAN_PAUSE:
112 case STREAM_CAN_CONTROL_PACE:
113 *va_arg( args, bool * ) = false;
114 break;
116 case STREAM_GET_PTS_DELAY:
117 *va_arg( args, vlc_tick_t * ) = VLC_TICK_FROM_MS(
118 var_InheritInteger(p_access, "network-caching") );
119 break;
121 default:
122 return VLC_EGENERIC;
125 return VLC_SUCCESS;
128 static struct rist_flow *rist_init_rx(void)
130 struct rist_flow *flow = calloc(1, sizeof(struct rist_flow));
131 if (!flow)
132 return NULL;
134 flow->reset = 2;
135 flow->buffer = calloc(RIST_QUEUE_SIZE, sizeof(struct rtp_pkt));
137 if ( unlikely( flow->buffer == NULL ) )
139 free(flow);
140 return NULL;
142 return flow;
145 static void rist_WriteTo_i11e_Locked(vlc_mutex_t lock, int fd, const void *buf, size_t len,
146 const struct sockaddr *peer, socklen_t slen)
148 vlc_mutex_lock( &lock );
149 rist_WriteTo_i11e(fd, buf, len, peer, slen);
150 vlc_mutex_unlock( &lock );
153 static struct rist_flow *rist_udp_receiver(stream_t *p_access, vlc_url_t *parsed_url)
155 stream_sys_t *p_sys = p_access->p_sys;
156 msg_Info( p_access, "Opening Rist Flow Receiver at %s:%d and %s:%d",
157 parsed_url->psz_host, parsed_url->i_port,
158 parsed_url->psz_host, parsed_url->i_port+1);
160 p_sys->flow = rist_init_rx();
161 if (!p_sys->flow)
162 return NULL;
164 p_sys->flow->fd_in = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port, NULL,
165 0, IPPROTO_UDP);
166 if (p_sys->flow->fd_in < 0)
168 msg_Err( p_access, "cannot open input socket" );
169 return NULL;
172 p_sys->flow->fd_nack = net_OpenDgram(p_access, parsed_url->psz_host, parsed_url->i_port + 1,
173 NULL, 0, IPPROTO_UDP);
174 if (p_sys->flow->fd_nack < 0)
176 msg_Err( p_access, "cannot open nack socket" );
177 return NULL;
180 populate_cname(p_sys->flow->fd_nack, p_sys->flow->cname);
181 msg_Info(p_access, "our cname is %s", p_sys->flow->cname);
183 return p_sys->flow;
186 static int is_index_in_range(struct rist_flow *flow, uint16_t idx)
188 if (flow->ri <= flow->wi) {
189 return ((idx > flow->ri) && (idx <= flow->wi));
190 } else {
191 return ((idx > flow->ri) || (idx <= flow->wi));
195 static void send_rtcp_feedback(stream_t *p_access, struct rist_flow *flow)
197 stream_sys_t *p_sys = p_access->p_sys;
198 int namelen = strlen(flow->cname) + 1;
200 /* we need to make sure it is a multiple of 4, pad if necessary */
201 if ((namelen - 2) & 0x3)
202 namelen = ((((namelen - 2) >> 2) + 1) << 2) + 2;
204 int rtcp_feedback_size = RTCP_EMPTY_RR_SIZE + RTCP_SDES_SIZE + namelen;
205 uint8_t *buf = malloc(rtcp_feedback_size);
206 if ( unlikely( buf == NULL ) )
207 return;
209 /* Populate RR */
210 uint8_t *rr = buf;
211 rtp_set_hdr(rr);
212 rtcp_rr_set_pt(rr);
213 rtcp_set_length(rr, 1);
214 rtcp_fb_set_int_ssrc_pkt_sender(rr, 0);
216 /* Populate SDES */
217 uint8_t *p_sdes = (buf + RTCP_EMPTY_RR_SIZE);
218 rtp_set_hdr(p_sdes);
219 rtp_set_cc(p_sdes, 1); /* Actually it is source count in this case */
220 rtcp_sdes_set_pt(p_sdes);
221 rtcp_set_length(p_sdes, (namelen >> 2) + 2);
222 rtcp_sdes_set_cname(p_sdes, 1);
223 rtcp_sdes_set_name_length(p_sdes, strlen(flow->cname));
224 uint8_t *p_sdes_name = (buf + RTCP_EMPTY_RR_SIZE + RTCP_SDES_SIZE);
225 strlcpy((char *)p_sdes_name, flow->cname, namelen);
227 /* Write to Socket */
228 rist_WriteTo_i11e_Locked(p_sys->lock, flow->fd_nack, buf, rtcp_feedback_size,
229 (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
230 free(buf);
231 buf = NULL;
234 static void send_bbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uint16_t nack_count)
236 stream_sys_t *p_sys = p_access->p_sys;
237 struct rist_flow *flow = p_sys->flow;
238 int len = 0;
240 int bbnack_bufsize = RTCP_FB_HEADER_SIZE +
241 RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
242 uint8_t *buf = malloc(bbnack_bufsize);
243 if ( unlikely( buf == NULL ) )
244 return;
246 /* Populate NACKS */
247 uint8_t *nack = buf;
248 rtp_set_hdr(nack);
249 rtcp_fb_set_fmt(nack, NACK_FMT_BITMASK);
250 rtcp_set_pt(nack, RTCP_PT_RTPFB);
251 rtcp_set_length(nack, 2 + nack_count);
252 /*uint8_t name[4] = "RIST";*/
253 /*rtcp_fb_set_ssrc_media_src(nack, name);*/
254 len += RTCP_FB_HEADER_SIZE;
255 /* TODO : group together */
256 uint16_t nacks[MAX_NACKS];
257 memcpy(nacks, pkt_nacks->p_buffer, pkt_nacks->i_buffer);
258 for (int i = 0; i < nack_count; i++) {
259 uint8_t *nack_record = buf + len + RTCP_FB_FCI_GENERIC_NACK_SIZE*i;
260 rtcp_fb_nack_set_packet_id(nack_record, nacks[i]);
261 rtcp_fb_nack_set_bitmask_lost(nack_record, 0);
263 len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
265 /* Write to Socket */
266 rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
267 (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
268 free(buf);
269 buf = NULL;
272 static void send_rbnack(stream_t *p_access, int fd_nack, block_t *pkt_nacks, uint16_t nack_count)
274 stream_sys_t *p_sys = p_access->p_sys;
275 struct rist_flow *flow = p_sys->flow;
276 int len = 0;
278 int rbnack_bufsize = RTCP_FB_HEADER_SIZE +
279 RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
280 uint8_t *buf = malloc(rbnack_bufsize);
281 if ( unlikely( buf == NULL ) )
282 return;
284 /* Populate NACKS */
285 uint8_t *nack = buf;
286 rtp_set_hdr(nack);
287 rtcp_fb_set_fmt(nack, NACK_FMT_RANGE);
288 rtcp_set_pt(nack, RTCP_PT_RTPFR);
289 rtcp_set_length(nack, 2 + nack_count);
290 uint8_t name[4] = "RIST";
291 rtcp_fb_set_ssrc_media_src(nack, name);
292 len += RTCP_FB_HEADER_SIZE;
293 /* TODO : group together */
294 uint16_t nacks[MAX_NACKS];
295 memcpy(nacks, pkt_nacks->p_buffer, pkt_nacks->i_buffer);
296 for (int i = 0; i < nack_count; i++)
298 uint8_t *nack_record = buf + len + RTCP_FB_FCI_GENERIC_NACK_SIZE*i;
299 rtcp_fb_nack_set_range_start(nack_record, nacks[i]);
300 rtcp_fb_nack_set_range_extra(nack_record, 0);
302 len += RTCP_FB_FCI_GENERIC_NACK_SIZE * nack_count;
304 /* Write to Socket */
305 rist_WriteTo_i11e_Locked(p_sys->lock, fd_nack, buf, len,
306 (struct sockaddr *)&flow->peer_sockaddr, flow->peer_socklen);
307 free(buf);
308 buf = NULL;
311 static void send_nacks(stream_t *p_access, struct rist_flow *flow)
313 stream_sys_t *p_sys = p_access->p_sys;
314 struct rtp_pkt *pkt;
315 uint16_t idx;
316 uint64_t last_ts = 0;
317 uint16_t null_count = 0;
318 int nacks_len = 0;
319 uint16_t nacks[MAX_NACKS];
321 idx = flow->ri;
322 while(idx++ != flow->wi)
324 pkt = &(flow->buffer[idx]);
325 if (pkt->buffer == NULL)
327 if (nacks_len + 1 >= MAX_NACKS)
329 break;
331 else
333 null_count++;
334 /* TODO: after adding average spacing calculation, change this formula
335 to extrapolated_ts = last_ts + null_count * avg_delta_ts; */
336 uint64_t extrapolated_ts = last_ts;
337 /* Find out the age and add it only if necessary */
338 int retry_count = flow->nacks_retries[idx];
339 uint64_t age = flow->hi_timestamp - extrapolated_ts;
340 uint64_t expiration;
341 if (retry_count == 0){
342 expiration = flow->reorder_buffer;
343 } else {
344 expiration = (uint64_t)flow->nacks_retries[idx] * (uint64_t)flow->retry_interval;
346 if (age > expiration && retry_count <= flow->max_retries)
348 flow->nacks_retries[idx]++;
349 nacks[nacks_len++] = idx;
350 msg_Dbg(p_access, "Sending NACK for seq %d, age %"PRId64" ms, retry %u, " \
351 "expiration %"PRId64" ms", idx, ts_get_from_rtp(age)/1000,
352 flow->nacks_retries[idx], ts_get_from_rtp(expiration)/1000);
356 else
358 last_ts = pkt->rtp_ts;
359 null_count = 0;
362 if (nacks_len > 0)
364 block_t *pkt_nacks = block_Alloc(nacks_len * 2);
365 if (pkt_nacks)
367 memcpy(pkt_nacks->p_buffer, nacks, nacks_len * 2);
368 pkt_nacks->i_buffer = nacks_len * 2;
369 block_FifoPut( p_sys->p_fifo, pkt_nacks );
374 static int sockaddr_cmp(struct sockaddr *x, struct sockaddr *y)
376 #define CMP(a, b) if (a != b) return a < b ? -1 : 1
378 CMP(x->sa_family, y->sa_family);
380 if (x->sa_family == AF_INET)
382 struct sockaddr_in *xin = (void*)x, *yin = (void*)y;
383 CMP(ntohl(xin->sin_addr.s_addr), ntohl(yin->sin_addr.s_addr));
384 CMP(ntohs(xin->sin_port), ntohs(yin->sin_port));
386 else if (x->sa_family == AF_INET6)
388 struct sockaddr_in6 *xin6 = (void*)x, *yin6 = (void*)y;
389 int r = memcmp(xin6->sin6_addr.s6_addr, yin6->sin6_addr.s6_addr,
390 sizeof(xin6->sin6_addr.s6_addr));
391 if (r != 0)
392 return r;
393 CMP(ntohs(xin6->sin6_port), ntohs(yin6->sin6_port));
394 CMP(xin6->sin6_flowinfo, yin6->sin6_flowinfo);
395 CMP(xin6->sin6_scope_id, yin6->sin6_scope_id);
398 #undef CMP
399 return 0;
402 static void print_sockaddr_info_change(stream_t *p_access, struct sockaddr *x, struct sockaddr *y)
404 if (x->sa_family == AF_INET)
406 struct sockaddr_in *xin = (void*)x, *yin = (void*)y;
407 msg_Info(p_access, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
408 inet_ntoa(xin->sin_addr), ntohs(xin->sin_port), inet_ntoa(yin->sin_addr),
409 ntohs(yin->sin_port));
411 else if (x->sa_family == AF_INET6)
413 struct sockaddr_in6 *xin6 = (void*)x, *yin6 = (void*)y;
414 char oldstr[INET6_ADDRSTRLEN];
415 char newstr[INET6_ADDRSTRLEN];
416 inet_ntop(xin6->sin6_family, &xin6->sin6_addr, oldstr, sizeof(struct in6_addr));
417 inet_ntop(yin6->sin6_family, &yin6->sin6_addr, newstr, sizeof(struct in6_addr));
418 msg_Info(p_access, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
419 oldstr, ntohs(xin6->sin6_port), newstr, ntohs(yin6->sin6_port));
423 static void print_sockaddr_info(stream_t *p_access, struct sockaddr *x)
425 if (x->sa_family == AF_INET)
427 struct sockaddr_in *xin = (void*)x;
428 msg_Info(p_access, "Peer IP:Port %s:%d", inet_ntoa(xin->sin_addr), ntohs(xin->sin_port));
430 else if (x->sa_family == AF_INET6)
432 struct sockaddr_in6 *xin6 = (void*)x;
433 char str[INET6_ADDRSTRLEN];
434 inet_ntop(xin6->sin6_family, &xin6->sin6_addr, str, sizeof(struct in6_addr));
435 msg_Info(p_access, "Peer IP:Port %s:%d", str, ntohs(xin6->sin6_port));
439 static void rtcp_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf_in, size_t len,
440 struct sockaddr *peer, socklen_t slen)
442 stream_sys_t *p_sys = p_access->p_sys;
443 uint8_t ptype;
444 uint16_t processed_bytes = 0;
445 uint16_t records;
446 char new_sender_name[MAX_CNAME];
447 uint8_t *buf;
449 while (processed_bytes < len) {
450 buf = buf_in + processed_bytes;
451 /* safety checks */
452 uint16_t bytes_left = len - processed_bytes + 1;
453 if ( bytes_left < 4 )
455 /* we must have at least 4 bytes */
456 msg_Err(p_access, "Rist rtcp packet must have at least 4 bytes, we have %d",
457 bytes_left);
458 return;
460 else if (!rtp_check_hdr(buf))
462 /* check for a valid rtp header */
463 msg_Err(p_access, "Malformed rtcp packet starting with %02x, ignoring.", buf[0]);
464 return;
467 ptype = rtcp_get_pt(buf);
468 records = rtcp_get_length(buf);
469 uint16_t bytes = (uint16_t)(4 * (1 + records));
470 if (bytes > bytes_left)
472 /* check for a sane number of bytes */
473 msg_Err(p_access, "Malformed rtcp packet, wrong len %d, expecting %u bytes in the " \
474 "packet, got a buffer of %u bytes.", rtcp_get_length(buf), bytes, bytes_left);
475 return;
478 switch(ptype) {
479 case RTCP_PT_RTPFR:
480 case RTCP_PT_RTPFB:
481 break;
483 case RTCP_PT_RR:
484 break;
486 case RTCP_PT_SDES:
488 /* Check for changes in source IP address or port */
489 int8_t name_length = rtcp_sdes_get_name_length(buf);
490 if (name_length > bytes_left)
492 /* check for a sane number of bytes */
493 msg_Err(p_access, "Malformed SDES packet, wrong cname len %u, got a " \
494 "buffer of %u bytes.", name_length, bytes_left);
495 return;
497 bool ip_port_changed = false;
498 if (sockaddr_cmp((struct sockaddr *)&flow->peer_sockaddr, peer) != 0)
500 ip_port_changed = true;
501 if(flow->peer_socklen > 0)
502 print_sockaddr_info_change(p_access,
503 (struct sockaddr *)&flow->peer_sockaddr, peer);
504 else
505 print_sockaddr_info(p_access, peer);
506 vlc_mutex_lock( &p_sys->lock );
507 memcpy(&flow->peer_sockaddr, peer, sizeof(struct sockaddr_storage));
508 flow->peer_socklen = slen;
509 vlc_mutex_unlock( &p_sys->lock );
512 /* Check for changes in cname */
513 bool peer_name_changed = false;
514 memset(new_sender_name, 0, MAX_CNAME);
515 memcpy(new_sender_name, buf + RTCP_SDES_SIZE, name_length);
516 if (memcmp(new_sender_name, p_sys->sender_name, name_length) != 0)
518 peer_name_changed = true;
519 if (strcmp(p_sys->sender_name, "") == 0)
520 msg_Info(p_access, "Peer Name: %s", new_sender_name);
521 else
522 msg_Info(p_access, "Peer Name change detected: old Name: %s, new " \
523 "Name: %s", p_sys->sender_name, new_sender_name);
524 memset(p_sys->sender_name, 0, MAX_CNAME);
525 memcpy(p_sys->sender_name, buf + RTCP_SDES_SIZE, name_length);
528 /* Reset the buffer as the source must have been restarted */
529 if (peer_name_changed || ip_port_changed)
531 /* reset the buffer */
532 flow->reset = 1;
535 break;
537 case RTCP_PT_SR:
538 break;
540 default:
541 msg_Err(p_access, " Unrecognized RTCP packet with PTYPE=%02x!!", ptype);
543 processed_bytes += (4 * (1 + records));
547 static bool rist_input(stream_t *p_access, struct rist_flow *flow, uint8_t *buf, size_t len)
549 stream_sys_t *p_sys = p_access->p_sys;
551 /* safety checks */
552 if ( len < RTP_HEADER_SIZE )
554 /* check if packet size >= rtp header size */
555 msg_Err(p_access, "Rist rtp packet must have at least 12 bytes, we have %lu", len);
556 return false;
558 else if (!rtp_check_hdr(buf))
560 /* check for a valid rtp header */
561 msg_Err(p_access, "Malformed rtp packet header starting with %02x, ignoring.", buf[0]);
562 return false;
565 uint16_t idx = rtp_get_seqnum(buf);
566 uint32_t pkt_ts = rtp_get_timestamp(buf);
567 bool retrasnmitted = false;
568 bool success = true;
569 uint64_t now = vlc_tick_now();
571 if (flow->reset == 2)
573 if ((uint64_t)(now - p_sys->last_message) > (uint64_t)VLC_TICK_FROM_MS(flow->latency) ) {
574 msg_Info(p_access, "Waiting for Sender's Coordinates, i.e. rtcp handshake ...");
576 p_sys->last_message = now;
577 return success;
579 else if (flow->reset == 1)
581 msg_Info(p_access, "Traffic detected after buffer reset");
582 /* First packet in the queue */
583 flow->hi_timestamp = pkt_ts;
584 msg_Info(p_access, "ts@%u", flow->hi_timestamp);
585 flow->wi = idx;
586 flow->ri = idx;
587 flow->reset = 0;
590 /* Check to see if this is a retransmission or a regular packet */
591 if (buf[11] & (1 << 0))
593 msg_Dbg(p_access, "Packet %d RECOVERED, Window: [%d:%d-->%d]", idx, flow->ri, flow->wi,
594 flow->wi-flow->ri);
595 p_sys->i_recovered_packets++;
596 retrasnmitted = true;
598 else if (flow->wi != flow->ri)
600 /* Reset counter to 0 on incoming holes */
601 /* Regular packets only as retransmits are expected to come in out of order */
602 uint16_t idxnext = (uint16_t)(flow->wi + 1);
603 if (idx != idxnext)
605 if (idx > idxnext) {
606 msg_Dbg(p_access, "Gap, got %d, expected %d, %d packet gap, Window: [%d:%d-->%d]",
607 idx, idxnext, idx - idxnext, flow->ri, flow->wi, (uint16_t)(flow->wi-flow->ri));
608 } else {
609 p_sys->i_reordered_packets++;
610 msg_Dbg(p_access, "Out of order, got %d, expected %d, Window: [%d:%d-->%d]", idx,
611 idxnext, flow->ri, flow->wi, (uint16_t)(flow->wi-flow->ri));
613 uint16_t zero_counter = (uint16_t)(flow->wi + 1);
614 while(zero_counter++ != idx) {
615 flow->nacks_retries[zero_counter] = 0;
617 /*msg_Dbg(p_access, "Gap, reseting %d packets as zero nacks %d to %d",
618 idx - flow->wi - 1, (uint16_t)(flow->wi + 1), idx);*/
622 /* Always replace the existing one with the new one */
623 struct rtp_pkt *pkt;
624 pkt = &(flow->buffer[idx]);
625 if (pkt->buffer && pkt->buffer->i_buffer > 0)
627 block_Release(pkt->buffer);
628 pkt->buffer = NULL;
630 pkt->buffer = block_Alloc(len);
631 if (!pkt->buffer)
632 return false;
634 pkt->buffer->i_buffer = len;
635 memcpy(pkt->buffer->p_buffer, buf, len);
636 pkt->rtp_ts = pkt_ts;
637 p_sys->last_data_rx = vlc_tick_now();
638 /* Reset the try counter regardless of wether it was a retransmit or not */
639 flow->nacks_retries[idx] = 0;
641 if (retrasnmitted)
642 return success;
644 p_sys->i_total_packets++;
645 /* Perform discontinuity checks and udpdate counters */
646 if (!is_index_in_range(flow, idx) && pkt_ts >= flow->hi_timestamp)
648 if ((pkt_ts - flow->hi_timestamp) > flow->hi_timestamp/10)
650 msg_Info(p_access, "Forward stream discontinuity idx@%d/%d/%d ts@%u/%u", flow->ri, idx,
651 flow->wi, pkt_ts, flow->hi_timestamp);
652 flow->reset = 1;
653 success = false;
655 else
657 flow->wi = idx;
658 flow->hi_timestamp = pkt_ts;
661 else if (!is_index_in_range(flow, idx))
663 /* incoming timestamp just jumped back in time or index is outside of scope */
664 msg_Info(p_access, "Backwards stream discontinuity idx@%d/%d/%d ts@%u/%u", flow->ri, idx,
665 flow->wi, pkt_ts, flow->hi_timestamp);
666 flow->reset = 1;
667 success = false;
670 return success;
673 static block_t *rist_dequeue(stream_t *p_access, struct rist_flow *flow)
675 stream_sys_t *p_sys = p_access->p_sys;
676 block_t *pktout = NULL;
677 struct rtp_pkt *pkt;
678 uint16_t idx;
679 if (flow->ri == flow->wi || flow->reset > 0)
680 return NULL;
682 idx = flow->ri;
683 bool found_data = false;
684 uint16_t loss_amount = 0;
685 while(idx++ != flow->wi) {
687 pkt = &(flow->buffer[idx]);
688 if (!pkt->buffer)
690 /*msg_Info(p_access, "Possible packet loss on index #%d", idx);*/
691 loss_amount++;
692 /* We move ahead until we find a timestamp but we do not move the cursor.
693 * None of them are guaranteed packet loss because we do not really
694 * know their timestamps. They might still arrive on the next loop.
695 * We can confirm the loss only if we get a valid packet in the loop below. */
696 continue;
699 /*printf("IDX=%d, flow->hi_timestamp: %u, (ts + flow->rtp_latency): %u\n", idx,
700 flow->hi_timestamp, (ts - 100 * flow->qdelay));*/
701 if (flow->hi_timestamp > (uint32_t)(pkt->rtp_ts + flow->rtp_latency))
703 /* Populate output packet now but remove rtp header from source */
704 int newSize = pkt->buffer->i_buffer - RTP_HEADER_SIZE;
705 pktout = block_Alloc(newSize);
706 if (pktout)
708 pktout->i_buffer = newSize;
709 memcpy(pktout->p_buffer, pkt->buffer->p_buffer + RTP_HEADER_SIZE, newSize);
710 /* free the buffer and increase the read index */
711 flow->ri = idx;
712 /* TODO: calculate average duration using buffer average (bring from sender) */
713 found_data = true;
715 block_Release(pkt->buffer);
716 pkt->buffer = NULL;
717 break;
722 if (loss_amount > 0 && found_data == true)
724 /* Packet loss confirmed, we found valid data after the holes */
725 msg_Dbg(p_access, "Packet NOT RECOVERED, %d packet(s), Window: [%d:%d]", loss_amount,
726 flow->ri, flow->wi);
727 p_sys->i_lost_packets += loss_amount;
730 return pktout;
733 static void *rist_thread(void *data)
735 stream_t *p_access = data;
736 stream_sys_t *p_sys = p_access->p_sys;
738 /* Process nacks every 5ms */
739 /* We only ask for the relevant ones */
740 for (;;) {
741 block_t *pkt_nacks = block_FifoGet(p_sys->p_fifo);
743 int canc = vlc_savecancel();
745 /* there are two bytes per nack */
746 uint16_t nack_count = (uint16_t)pkt_nacks->i_buffer/2;
747 switch(p_sys->nack_type) {
748 case NACK_FMT_BITMASK:
749 send_bbnack(p_access, p_sys->flow->fd_nack, pkt_nacks, nack_count);
750 break;
752 default:
753 send_rbnack(p_access, p_sys->flow->fd_nack, pkt_nacks, nack_count);
756 if (nack_count > 1)
757 msg_Dbg(p_access, "Sent %u NACKs !!!", nack_count);
758 block_Release(pkt_nacks);
760 vlc_restorecancel (canc);
763 return NULL;
766 static block_t *BlockRIST(stream_t *p_access, bool *restrict eof)
768 stream_sys_t *p_sys = p_access->p_sys;
769 uint64_t now;
770 *eof = false;
771 block_t *pktout = NULL;
772 struct pollfd pfd[2];
773 int ret;
774 ssize_t r;
775 struct sockaddr_storage peer;
776 socklen_t slen = sizeof(struct sockaddr_storage);
777 struct rist_flow *flow = p_sys->flow;
779 if (vlc_killed() || (flow->reset == 1 && p_sys->eof_on_reset))
781 *eof = true;
782 return NULL;
785 pfd[0].fd = flow->fd_in;
786 pfd[0].events = POLLIN;
787 pfd[1].fd = flow->fd_nack;
788 pfd[1].events = POLLIN;
790 /* The protocol uses a fifo buffer with a fixed time delay.
791 * That buffer needs to be emptied at a rate that is determined by the rtp timestamps of the
792 * packets. If I waited indefinitely for data coming in, the rate and delay of output packets
793 * would be wrong. I am calling the rist_dequeue function every time a data packet comes in
794 * and also every time we get a poll timeout. The configurable poll timeout is for controling
795 * the maximum jitter of output data coming out of the buffer. The default 5ms timeout covers
796 * most cases. */
798 ret = vlc_poll_i11e(pfd, 2, p_sys->i_poll_timeout_current);
799 if (unlikely(ret < 0))
800 return NULL;
801 else if (ret == 0)
803 /* Poll timeout, check the queue for the next packet that needs to be delivered */
804 pktout = rist_dequeue(p_access, flow);
805 /* if there is data, we need to come back faster to finish emptying it */
806 if (pktout) {
807 p_sys->i_poll_timeout_current = 0;
808 p_sys->i_poll_timeout_zero_count++;
809 } else {
810 p_sys->i_poll_timeout_current = p_sys->i_poll_timeout;
811 p_sys->i_poll_timeout_nonzero_count++;
814 else
817 uint8_t *buf = malloc(p_sys->i_max_packet_size);
818 if ( unlikely( buf == NULL ) )
819 return NULL;
821 /* Process rctp incoming data */
822 if (pfd[1].revents & POLLIN)
824 r = rist_ReadFrom_i11e(flow->fd_nack, buf, p_sys->i_max_packet_size,
825 (struct sockaddr *)&peer, &slen);
826 if (unlikely(r == -1)) {
827 msg_Err(p_access, "socket %d error: %s\n", flow->fd_nack, gai_strerror(errno));
829 else {
830 rtcp_input(p_access, flow, buf, r, (struct sockaddr *)&peer, slen);
834 /* Process regular incoming data */
835 if (pfd[0].revents & POLLIN)
837 r = rist_Read_i11e(flow->fd_in, buf, p_sys->i_max_packet_size);
838 if (unlikely(r == -1)) {
839 msg_Err(p_access, "socket %d error: %s\n", flow->fd_in, gai_strerror(errno));
841 else
843 /* rist_input will process and queue the pkt */
844 if (rist_input(p_access, flow, buf, r))
846 /* Check the queue for the next packet that needs to be delivered */
847 pktout = rist_dequeue(p_access, flow);
848 if (pktout) {
849 p_sys->i_poll_timeout_current = 0;
850 p_sys->i_poll_timeout_zero_count++;
851 } else {
852 p_sys->i_poll_timeout_current = p_sys->i_poll_timeout;
853 p_sys->i_poll_timeout_nonzero_count++;
856 else
858 if (p_sys->eof_on_reset)
859 *eof = true;
864 free(buf);
865 buf = NULL;
868 now = vlc_tick_now();
870 /* Process stats and print them out */
871 /* We need to measure some items every 70ms */
872 uint64_t interval = (now - flow->feedback_time);
873 if ( interval > VLC_TICK_FROM_MS(RTCP_INTERVAL) )
875 if (p_sys->i_poll_timeout_nonzero_count > 0)
877 float ratio = (float)p_sys->i_poll_timeout_zero_count
878 / (float)p_sys->i_poll_timeout_nonzero_count;
879 if (ratio <= 1)
880 p_sys->vbr_ratio += 1 - ratio;
881 else
882 p_sys->vbr_ratio += ratio - 1;
883 p_sys->vbr_ratio_count++;
884 /*msg_Dbg(p_access, "zero poll %u, non-zero poll %u, ratio %.2f",
885 p_sys->i_poll_timeout_zero_count, p_sys->i_poll_timeout_nonzero_count, ratio);*/
886 p_sys->i_poll_timeout_zero_count = 0;
887 p_sys->i_poll_timeout_nonzero_count = 0;
890 /* We print out the stats once per second */
891 interval = (now - p_sys->i_last_stat);
892 if ( interval > VLC_TICK_FROM_MS(STATS_INTERVAL) )
894 if ( p_sys->i_lost_packets > 0)
895 msg_Err(p_access, "We have %d lost packets", p_sys->i_lost_packets);
896 float ratio = 1;
897 if (p_sys->vbr_ratio_count > 0)
898 ratio = p_sys->vbr_ratio / (float)p_sys->vbr_ratio_count;
899 float quality = 100;
900 if (p_sys->i_total_packets > 0)
901 quality -= (float)100*(float)(p_sys->i_lost_packets + p_sys->i_recovered_packets +
902 p_sys->i_reordered_packets)/(float)p_sys->i_total_packets;
903 if (quality != 100)
904 msg_Info(p_access, "STATS: Total %u, Recovered %u, Reordered %u, Lost %u, VBR Score " \
905 "%.2f, Link Quality %.2f%%", p_sys->i_total_packets, p_sys->i_recovered_packets,
906 p_sys->i_reordered_packets, p_sys->i_lost_packets, ratio, quality);
907 p_sys->i_last_stat = now;
908 p_sys->vbr_ratio = 0;
909 p_sys->vbr_ratio_count = 0;
910 p_sys->i_lost_packets = 0;
911 p_sys->i_recovered_packets = 0;
912 p_sys->i_reordered_packets = 0;
913 p_sys->i_total_packets = 0;
916 /* Send rtcp feedback every RTCP_INTERVAL */
917 interval = (now - flow->feedback_time);
918 if ( interval > VLC_TICK_FROM_MS(RTCP_INTERVAL) )
920 /* msg_Dbg(p_access, "Calling RTCP Feedback %lu<%d ms using timer", interval,
921 VLC_TICK_FROM_MS(RTCP_INTERVAL)); */
922 send_rtcp_feedback(p_access, flow);
923 flow->feedback_time = now;
926 /* Send nacks every NACK_INTERVAL (only the ones that have matured, if any) */
927 interval = (now - p_sys->last_nack_tx);
928 if ( interval > VLC_TICK_FROM_MS(NACK_INTERVAL) )
930 send_nacks(p_access, p_sys->flow);
931 p_sys->last_nack_tx = now;
934 /* Safety check for when the input stream stalls */
935 if ( p_sys->last_data_rx > 0 && now > p_sys->last_data_rx &&
936 (uint64_t)(now - p_sys->last_data_rx) > (uint64_t)VLC_TICK_FROM_MS(flow->latency) &&
937 (uint64_t)(now - p_sys->last_reset) > (uint64_t)VLC_TICK_FROM_MS(flow->latency) )
939 msg_Err(p_access, "No data received for %"PRId64" ms, resetting buffers",
940 (int64_t)(now - p_sys->last_data_rx)/1000);
941 p_sys->last_reset = now;
942 flow->reset = 1;
945 if (pktout)
946 return pktout;
947 else
948 return NULL;
951 static void Clean( stream_t *p_access )
953 stream_sys_t *p_sys = p_access->p_sys;
955 if( likely(p_sys->p_fifo != NULL) )
956 block_FifoRelease( p_sys->p_fifo );
958 if (p_sys->flow)
960 if (p_sys->flow->fd_in >= 0)
961 net_Close (p_sys->flow->fd_in);
962 if (p_sys->flow->fd_nack >= 0)
963 net_Close (p_sys->flow->fd_nack);
964 for (int i=0; i<RIST_QUEUE_SIZE; i++) {
965 struct rtp_pkt *pkt = &(p_sys->flow->buffer[i]);
966 if (pkt->buffer && pkt->buffer->i_buffer > 0) {
967 block_Release(pkt->buffer);
968 pkt->buffer = NULL;
971 free(p_sys->flow->buffer);
972 free(p_sys->flow);
975 vlc_mutex_destroy( &p_sys->lock );
978 static void Close(vlc_object_t *p_this)
980 stream_t *p_access = (stream_t*)p_this;
981 stream_sys_t *p_sys = p_access->p_sys;
983 vlc_cancel(p_sys->thread);
984 vlc_join(p_sys->thread, NULL);
986 Clean( p_access );
989 static int Open(vlc_object_t *p_this)
991 stream_t *p_access = (stream_t*)p_this;
992 stream_sys_t *p_sys = NULL;
993 vlc_url_t parsed_url = { 0 };
995 p_sys = vlc_obj_calloc( p_this, 1, sizeof( *p_sys ) );
996 if( unlikely( p_sys == NULL ) )
997 return VLC_ENOMEM;
999 p_access->p_sys = p_sys;
1001 vlc_mutex_init( &p_sys->lock );
1003 if ( vlc_UrlParse( &parsed_url, p_access->psz_url ) == -1 )
1005 msg_Err( p_access, "Failed to parse input URL (%s)",
1006 p_access->psz_url );
1007 goto failed;
1010 /* Initialize rist flow */
1011 p_sys->flow = rist_udp_receiver(p_access, &parsed_url);
1012 vlc_UrlClean( &parsed_url );
1013 if (!p_sys->flow)
1015 msg_Err( p_access, "Failed to open rist flow (%s)",
1016 p_access->psz_url );
1017 goto failed;
1020 p_sys->nack_type = var_InheritInteger( p_access, "nack-type" );
1021 p_sys->i_max_packet_size = var_InheritInteger( p_access, "packet-size" );
1022 p_sys->i_poll_timeout = var_InheritInteger( p_access, "maximum-jitter" );
1023 p_sys->eof_on_reset = var_InheritBool( p_access, "eof-on-reset" );
1024 p_sys->flow->retry_interval = var_InheritInteger( p_access, "retry-interval" );
1025 p_sys->flow->reorder_buffer = var_InheritInteger( p_access, "reorder-buffer" );
1026 p_sys->flow->max_retries = var_InheritInteger( p_access, "max-retries" );
1027 p_sys->flow->latency = var_InheritInteger( p_access, "latency" );
1028 msg_Info(p_access, "Setting queue latency to %d ms", p_sys->flow->latency);
1030 /* Convert to rtp times */
1031 p_sys->flow->rtp_latency = rtp_get_ts(VLC_TICK_FROM_MS(p_sys->flow->latency));
1032 p_sys->flow->retry_interval = rtp_get_ts(VLC_TICK_FROM_MS(p_sys->flow->retry_interval));
1033 p_sys->flow->reorder_buffer = rtp_get_ts(VLC_TICK_FROM_MS(p_sys->flow->reorder_buffer));
1035 p_sys->p_fifo = block_FifoNew();
1036 if( unlikely(p_sys->p_fifo == NULL) )
1037 goto failed;
1039 /* This extra thread is for sending feedback/nack packets even when no data comes in */
1040 if (vlc_clone(&p_sys->thread, rist_thread, p_access, VLC_THREAD_PRIORITY_INPUT))
1042 msg_Err(p_access, "Failed to create worker thread.");
1043 goto failed;
1046 p_access->pf_block = BlockRIST;
1047 p_access->pf_control = Control;
1049 return VLC_SUCCESS;
1051 failed:
1052 Clean( p_access );
1053 return VLC_EGENERIC;
1056 /* Module descriptor */
1057 vlc_module_begin ()
1059 set_shortname( N_("RIST") )
1060 set_description( N_("RIST input") )
1061 set_category( CAT_INPUT )
1062 set_subcategory( SUBCAT_INPUT_ACCESS )
1064 add_integer( "packet-size", RIST_MAX_PACKET_SIZE,
1065 N_("RIST maximum packet size (bytes)"), NULL, true )
1066 add_integer( "maximum-jitter", RIST_DEFAULT_POLL_TIMEOUT,
1067 N_("RIST demux/decode maximum jitter (default is 5ms)"),
1068 N_("This controls the maximum jitter that will be passed to the demux/decode chain. "
1069 "The lower the value, the more CPU cycles the algorithm will consume"), true )
1070 add_integer( "latency", RIST_DEFAULT_LATENCY, N_("RIST latency (ms)"), NULL, true )
1071 add_integer( "retry-interval", RIST_DEFAULT_RETRY_INTERVAL, N_("RIST nack retry interval (ms)"),
1072 NULL, true )
1073 add_integer( "reorder-buffer", RIST_DEFAULT_REORDER_BUFFER, N_("RIST reorder buffer (ms)"),
1074 NULL, true )
1075 add_integer( "max-retries", RIST_MAX_RETRIES, N_("RIST maximum retry count"), NULL, true )
1076 add_bool( "eof-on-reset", false, "Trigger an EOF event when a buffer reset is triggered",
1077 "This is probably useful when you are decoding but not so much if you are streaming", true )
1078 add_integer( "nack-type", NACK_FMT_RANGE,
1079 N_("RIST nack type, 0 = range, 1 = bitmask. Default is range"), NULL, true )
1080 change_integer_list( nack_type, nack_type_names )
1082 set_capability( "access", 0 )
1083 add_shortcut( "rist", "tr06" )
1085 set_callbacks( Open, Close )
1087 vlc_module_end ()