1 /*****************************************************************************
2 * rist.c: RIST (Reliable Internet Stream Transport) input module
3 *****************************************************************************
4 * Copyright (C) 2018, DVEO, the Broadcast Division of Computer Modules, Inc.
5 * Copyright (C) 2018, SipRadius LLC
7 * Authors: Sergio Ammirata <sergio@ammirata.net>
8 * Daniele Lacamera <root@danielinux.net>
10 * This program is free software; you can redistribute it and/or modify it
11 * under the terms of the GNU Lesser General Public License as published by
12 * the Free Software Foundation; either version 2.1 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Lesser General Public License for more details.
20 * You should have received a copy of the GNU Lesser General Public License
21 * along with this program; if not, write to the Free Software Foundation,
22 * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
23 *****************************************************************************/
29 #include <vlc_common.h>
30 #include <vlc_interrupt.h>
31 #include <vlc_plugin.h>
32 #include <vlc_access.h>
33 #include <vlc_threads.h>
34 #include <vlc_network.h>
35 #include <vlc_block.h>
40 #include <bitstream/ietf/rtcp_rr.h>
41 #include <bitstream/ietf/rtcp_sdes.h>
42 #include <bitstream/ietf/rtcp_fb.h>
43 #include <bitstream/ietf/rtp.h>
47 /* The default latency is 1000 ms */
48 #define RIST_DEFAULT_LATENCY 1000
49 /* The default nack retry interval */
50 #define RIST_DEFAULT_RETRY_INTERVAL 132
51 /* The default packet re-ordering buffer */
52 #define RIST_DEFAULT_REORDER_BUFFER 70
53 /* The default max packet size */
54 #define RIST_MAX_PACKET_SIZE 1472
55 /* The default timeout is 5 ms */
56 #define RIST_DEFAULT_POLL_TIMEOUT 5
57 /* The max retry count for nacks */
58 #define RIST_MAX_RETRIES 10
59 /* The rate at which we process and send nack requests */
60 #define NACK_INTERVAL 5 /*ms*/
61 /* Calculate and print stats once per second */
62 #define STATS_INTERVAL 1000 /*ms*/
64 static const int nack_type
[] = {
68 static const char *const nack_type_names
[] = {
69 N_("Range"), N_("Bitmask"),
79 struct rist_flow
*flow
;
80 char sender_name
[MAX_CNAME
];
81 enum NACK_TYPE nack_type
;
82 uint64_t last_data_rx
;
83 uint64_t last_nack_tx
;
85 int i_max_packet_size
;
87 int i_poll_timeout_current
;
91 uint64_t last_message
;
94 uint32_t i_poll_timeout_zero_count
;
95 uint32_t i_poll_timeout_nonzero_count
;
98 uint16_t vbr_ratio_count
;
99 uint32_t i_lost_packets
;
100 uint32_t i_recovered_packets
;
101 uint32_t i_reordered_packets
;
102 uint32_t i_total_packets
;
105 static int Control(stream_t
*p_access
, int i_query
, va_list args
)
109 case STREAM_CAN_SEEK
:
110 case STREAM_CAN_FASTSEEK
:
111 case STREAM_CAN_PAUSE
:
112 case STREAM_CAN_CONTROL_PACE
:
113 *va_arg( args
, bool * ) = false;
116 case STREAM_GET_PTS_DELAY
:
117 *va_arg( args
, vlc_tick_t
* ) = VLC_TICK_FROM_MS(
118 var_InheritInteger(p_access
, "network-caching") );
128 static struct rist_flow
*rist_init_rx(void)
130 struct rist_flow
*flow
= calloc(1, sizeof(struct rist_flow
));
135 flow
->buffer
= calloc(RIST_QUEUE_SIZE
, sizeof(struct rtp_pkt
));
137 if ( unlikely( flow
->buffer
== NULL
) )
145 static void rist_WriteTo_i11e_Locked(vlc_mutex_t lock
, int fd
, const void *buf
, size_t len
,
146 const struct sockaddr
*peer
, socklen_t slen
)
148 vlc_mutex_lock( &lock
);
149 rist_WriteTo_i11e(fd
, buf
, len
, peer
, slen
);
150 vlc_mutex_unlock( &lock
);
153 static struct rist_flow
*rist_udp_receiver(stream_t
*p_access
, vlc_url_t
*parsed_url
)
155 stream_sys_t
*p_sys
= p_access
->p_sys
;
156 msg_Info( p_access
, "Opening Rist Flow Receiver at %s:%d and %s:%d",
157 parsed_url
->psz_host
, parsed_url
->i_port
,
158 parsed_url
->psz_host
, parsed_url
->i_port
+1);
160 p_sys
->flow
= rist_init_rx();
164 p_sys
->flow
->fd_in
= net_OpenDgram(p_access
, parsed_url
->psz_host
, parsed_url
->i_port
, NULL
,
166 if (p_sys
->flow
->fd_in
< 0)
168 msg_Err( p_access
, "cannot open input socket" );
172 p_sys
->flow
->fd_nack
= net_OpenDgram(p_access
, parsed_url
->psz_host
, parsed_url
->i_port
+ 1,
173 NULL
, 0, IPPROTO_UDP
);
174 if (p_sys
->flow
->fd_nack
< 0)
176 msg_Err( p_access
, "cannot open nack socket" );
180 populate_cname(p_sys
->flow
->fd_nack
, p_sys
->flow
->cname
);
181 msg_Info(p_access
, "our cname is %s", p_sys
->flow
->cname
);
186 static int is_index_in_range(struct rist_flow
*flow
, uint16_t idx
)
188 if (flow
->ri
<= flow
->wi
) {
189 return ((idx
> flow
->ri
) && (idx
<= flow
->wi
));
191 return ((idx
> flow
->ri
) || (idx
<= flow
->wi
));
195 static void send_rtcp_feedback(stream_t
*p_access
, struct rist_flow
*flow
)
197 stream_sys_t
*p_sys
= p_access
->p_sys
;
198 int namelen
= strlen(flow
->cname
) + 1;
200 /* we need to make sure it is a multiple of 4, pad if necessary */
201 if ((namelen
- 2) & 0x3)
202 namelen
= ((((namelen
- 2) >> 2) + 1) << 2) + 2;
204 int rtcp_feedback_size
= RTCP_EMPTY_RR_SIZE
+ RTCP_SDES_SIZE
+ namelen
;
205 uint8_t *buf
= malloc(rtcp_feedback_size
);
206 if ( unlikely( buf
== NULL
) )
213 rtcp_set_length(rr
, 1);
214 rtcp_fb_set_int_ssrc_pkt_sender(rr
, 0);
217 uint8_t *p_sdes
= (buf
+ RTCP_EMPTY_RR_SIZE
);
219 rtp_set_cc(p_sdes
, 1); /* Actually it is source count in this case */
220 rtcp_sdes_set_pt(p_sdes
);
221 rtcp_set_length(p_sdes
, (namelen
>> 2) + 2);
222 rtcp_sdes_set_cname(p_sdes
, 1);
223 rtcp_sdes_set_name_length(p_sdes
, strlen(flow
->cname
));
224 uint8_t *p_sdes_name
= (buf
+ RTCP_EMPTY_RR_SIZE
+ RTCP_SDES_SIZE
);
225 strlcpy((char *)p_sdes_name
, flow
->cname
, namelen
);
227 /* Write to Socket */
228 rist_WriteTo_i11e_Locked(p_sys
->lock
, flow
->fd_nack
, buf
, rtcp_feedback_size
,
229 (struct sockaddr
*)&flow
->peer_sockaddr
, flow
->peer_socklen
);
234 static void send_bbnack(stream_t
*p_access
, int fd_nack
, block_t
*pkt_nacks
, uint16_t nack_count
)
236 stream_sys_t
*p_sys
= p_access
->p_sys
;
237 struct rist_flow
*flow
= p_sys
->flow
;
240 int bbnack_bufsize
= RTCP_FB_HEADER_SIZE
+
241 RTCP_FB_FCI_GENERIC_NACK_SIZE
* nack_count
;
242 uint8_t *buf
= malloc(bbnack_bufsize
);
243 if ( unlikely( buf
== NULL
) )
249 rtcp_fb_set_fmt(nack
, NACK_FMT_BITMASK
);
250 rtcp_set_pt(nack
, RTCP_PT_RTPFB
);
251 rtcp_set_length(nack
, 2 + nack_count
);
252 /*uint8_t name[4] = "RIST";*/
253 /*rtcp_fb_set_ssrc_media_src(nack, name);*/
254 len
+= RTCP_FB_HEADER_SIZE
;
255 /* TODO : group together */
256 uint16_t nacks
[MAX_NACKS
];
257 memcpy(nacks
, pkt_nacks
->p_buffer
, pkt_nacks
->i_buffer
);
258 for (int i
= 0; i
< nack_count
; i
++) {
259 uint8_t *nack_record
= buf
+ len
+ RTCP_FB_FCI_GENERIC_NACK_SIZE
*i
;
260 rtcp_fb_nack_set_packet_id(nack_record
, nacks
[i
]);
261 rtcp_fb_nack_set_bitmask_lost(nack_record
, 0);
263 len
+= RTCP_FB_FCI_GENERIC_NACK_SIZE
* nack_count
;
265 /* Write to Socket */
266 rist_WriteTo_i11e_Locked(p_sys
->lock
, fd_nack
, buf
, len
,
267 (struct sockaddr
*)&flow
->peer_sockaddr
, flow
->peer_socklen
);
272 static void send_rbnack(stream_t
*p_access
, int fd_nack
, block_t
*pkt_nacks
, uint16_t nack_count
)
274 stream_sys_t
*p_sys
= p_access
->p_sys
;
275 struct rist_flow
*flow
= p_sys
->flow
;
278 int rbnack_bufsize
= RTCP_FB_HEADER_SIZE
+
279 RTCP_FB_FCI_GENERIC_NACK_SIZE
* nack_count
;
280 uint8_t *buf
= malloc(rbnack_bufsize
);
281 if ( unlikely( buf
== NULL
) )
287 rtcp_fb_set_fmt(nack
, NACK_FMT_RANGE
);
288 rtcp_set_pt(nack
, RTCP_PT_RTPFR
);
289 rtcp_set_length(nack
, 2 + nack_count
);
290 uint8_t name
[4] = "RIST";
291 rtcp_fb_set_ssrc_media_src(nack
, name
);
292 len
+= RTCP_FB_HEADER_SIZE
;
293 /* TODO : group together */
294 uint16_t nacks
[MAX_NACKS
];
295 memcpy(nacks
, pkt_nacks
->p_buffer
, pkt_nacks
->i_buffer
);
296 for (int i
= 0; i
< nack_count
; i
++)
298 uint8_t *nack_record
= buf
+ len
+ RTCP_FB_FCI_GENERIC_NACK_SIZE
*i
;
299 rtcp_fb_nack_set_range_start(nack_record
, nacks
[i
]);
300 rtcp_fb_nack_set_range_extra(nack_record
, 0);
302 len
+= RTCP_FB_FCI_GENERIC_NACK_SIZE
* nack_count
;
304 /* Write to Socket */
305 rist_WriteTo_i11e_Locked(p_sys
->lock
, fd_nack
, buf
, len
,
306 (struct sockaddr
*)&flow
->peer_sockaddr
, flow
->peer_socklen
);
311 static void send_nacks(stream_t
*p_access
, struct rist_flow
*flow
)
313 stream_sys_t
*p_sys
= p_access
->p_sys
;
316 uint64_t last_ts
= 0;
317 uint16_t null_count
= 0;
319 uint16_t nacks
[MAX_NACKS
];
322 while(idx
++ != flow
->wi
)
324 pkt
= &(flow
->buffer
[idx
]);
325 if (pkt
->buffer
== NULL
)
327 if (nacks_len
+ 1 >= MAX_NACKS
)
334 /* TODO: after adding average spacing calculation, change this formula
335 to extrapolated_ts = last_ts + null_count * avg_delta_ts; */
336 uint64_t extrapolated_ts
= last_ts
;
337 /* Find out the age and add it only if necessary */
338 int retry_count
= flow
->nacks_retries
[idx
];
339 uint64_t age
= flow
->hi_timestamp
- extrapolated_ts
;
341 if (retry_count
== 0){
342 expiration
= flow
->reorder_buffer
;
344 expiration
= (uint64_t)flow
->nacks_retries
[idx
] * (uint64_t)flow
->retry_interval
;
346 if (age
> expiration
&& retry_count
<= flow
->max_retries
)
348 flow
->nacks_retries
[idx
]++;
349 nacks
[nacks_len
++] = idx
;
350 msg_Dbg(p_access
, "Sending NACK for seq %d, age %"PRId64
" ms, retry %u, " \
351 "expiration %"PRId64
" ms", idx
, ts_get_from_rtp(age
)/1000,
352 flow
->nacks_retries
[idx
], ts_get_from_rtp(expiration
)/1000);
358 last_ts
= pkt
->rtp_ts
;
364 block_t
*pkt_nacks
= block_Alloc(nacks_len
* 2);
367 memcpy(pkt_nacks
->p_buffer
, nacks
, nacks_len
* 2);
368 pkt_nacks
->i_buffer
= nacks_len
* 2;
369 block_FifoPut( p_sys
->p_fifo
, pkt_nacks
);
374 static int sockaddr_cmp(struct sockaddr
*x
, struct sockaddr
*y
)
376 #define CMP(a, b) if (a != b) return a < b ? -1 : 1
378 CMP(x
->sa_family
, y
->sa_family
);
380 if (x
->sa_family
== AF_INET
)
382 struct sockaddr_in
*xin
= (void*)x
, *yin
= (void*)y
;
383 CMP(ntohl(xin
->sin_addr
.s_addr
), ntohl(yin
->sin_addr
.s_addr
));
384 CMP(ntohs(xin
->sin_port
), ntohs(yin
->sin_port
));
386 else if (x
->sa_family
== AF_INET6
)
388 struct sockaddr_in6
*xin6
= (void*)x
, *yin6
= (void*)y
;
389 int r
= memcmp(xin6
->sin6_addr
.s6_addr
, yin6
->sin6_addr
.s6_addr
,
390 sizeof(xin6
->sin6_addr
.s6_addr
));
393 CMP(ntohs(xin6
->sin6_port
), ntohs(yin6
->sin6_port
));
394 CMP(xin6
->sin6_flowinfo
, yin6
->sin6_flowinfo
);
395 CMP(xin6
->sin6_scope_id
, yin6
->sin6_scope_id
);
402 static void print_sockaddr_info_change(stream_t
*p_access
, struct sockaddr
*x
, struct sockaddr
*y
)
404 if (x
->sa_family
== AF_INET
)
406 struct sockaddr_in
*xin
= (void*)x
, *yin
= (void*)y
;
407 msg_Info(p_access
, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
408 inet_ntoa(xin
->sin_addr
), ntohs(xin
->sin_port
), inet_ntoa(yin
->sin_addr
),
409 ntohs(yin
->sin_port
));
411 else if (x
->sa_family
== AF_INET6
)
413 struct sockaddr_in6
*xin6
= (void*)x
, *yin6
= (void*)y
;
414 char oldstr
[INET6_ADDRSTRLEN
];
415 char newstr
[INET6_ADDRSTRLEN
];
416 inet_ntop(xin6
->sin6_family
, &xin6
->sin6_addr
, oldstr
, sizeof(struct in6_addr
));
417 inet_ntop(yin6
->sin6_family
, &yin6
->sin6_addr
, newstr
, sizeof(struct in6_addr
));
418 msg_Info(p_access
, "Peer IP:Port change detected: old IP:Port %s:%d, new IP:Port %s:%d",
419 oldstr
, ntohs(xin6
->sin6_port
), newstr
, ntohs(yin6
->sin6_port
));
423 static void print_sockaddr_info(stream_t
*p_access
, struct sockaddr
*x
)
425 if (x
->sa_family
== AF_INET
)
427 struct sockaddr_in
*xin
= (void*)x
;
428 msg_Info(p_access
, "Peer IP:Port %s:%d", inet_ntoa(xin
->sin_addr
), ntohs(xin
->sin_port
));
430 else if (x
->sa_family
== AF_INET6
)
432 struct sockaddr_in6
*xin6
= (void*)x
;
433 char str
[INET6_ADDRSTRLEN
];
434 inet_ntop(xin6
->sin6_family
, &xin6
->sin6_addr
, str
, sizeof(struct in6_addr
));
435 msg_Info(p_access
, "Peer IP:Port %s:%d", str
, ntohs(xin6
->sin6_port
));
439 static void rtcp_input(stream_t
*p_access
, struct rist_flow
*flow
, uint8_t *buf_in
, size_t len
,
440 struct sockaddr
*peer
, socklen_t slen
)
442 stream_sys_t
*p_sys
= p_access
->p_sys
;
444 uint16_t processed_bytes
= 0;
446 char new_sender_name
[MAX_CNAME
];
449 while (processed_bytes
< len
) {
450 buf
= buf_in
+ processed_bytes
;
452 uint16_t bytes_left
= len
- processed_bytes
+ 1;
453 if ( bytes_left
< 4 )
455 /* we must have at least 4 bytes */
456 msg_Err(p_access
, "Rist rtcp packet must have at least 4 bytes, we have %d",
460 else if (!rtp_check_hdr(buf
))
462 /* check for a valid rtp header */
463 msg_Err(p_access
, "Malformed rtcp packet starting with %02x, ignoring.", buf
[0]);
467 ptype
= rtcp_get_pt(buf
);
468 records
= rtcp_get_length(buf
);
469 uint16_t bytes
= (uint16_t)(4 * (1 + records
));
470 if (bytes
> bytes_left
)
472 /* check for a sane number of bytes */
473 msg_Err(p_access
, "Malformed rtcp packet, wrong len %d, expecting %u bytes in the " \
474 "packet, got a buffer of %u bytes.", rtcp_get_length(buf
), bytes
, bytes_left
);
488 /* Check for changes in source IP address or port */
489 int8_t name_length
= rtcp_sdes_get_name_length(buf
);
490 if (name_length
> bytes_left
)
492 /* check for a sane number of bytes */
493 msg_Err(p_access
, "Malformed SDES packet, wrong cname len %u, got a " \
494 "buffer of %u bytes.", name_length
, bytes_left
);
497 bool ip_port_changed
= false;
498 if (sockaddr_cmp((struct sockaddr
*)&flow
->peer_sockaddr
, peer
) != 0)
500 ip_port_changed
= true;
501 if(flow
->peer_socklen
> 0)
502 print_sockaddr_info_change(p_access
,
503 (struct sockaddr
*)&flow
->peer_sockaddr
, peer
);
505 print_sockaddr_info(p_access
, peer
);
506 vlc_mutex_lock( &p_sys
->lock
);
507 memcpy(&flow
->peer_sockaddr
, peer
, sizeof(struct sockaddr_storage
));
508 flow
->peer_socklen
= slen
;
509 vlc_mutex_unlock( &p_sys
->lock
);
512 /* Check for changes in cname */
513 bool peer_name_changed
= false;
514 memset(new_sender_name
, 0, MAX_CNAME
);
515 memcpy(new_sender_name
, buf
+ RTCP_SDES_SIZE
, name_length
);
516 if (memcmp(new_sender_name
, p_sys
->sender_name
, name_length
) != 0)
518 peer_name_changed
= true;
519 if (strcmp(p_sys
->sender_name
, "") == 0)
520 msg_Info(p_access
, "Peer Name: %s", new_sender_name
);
522 msg_Info(p_access
, "Peer Name change detected: old Name: %s, new " \
523 "Name: %s", p_sys
->sender_name
, new_sender_name
);
524 memset(p_sys
->sender_name
, 0, MAX_CNAME
);
525 memcpy(p_sys
->sender_name
, buf
+ RTCP_SDES_SIZE
, name_length
);
528 /* Reset the buffer as the source must have been restarted */
529 if (peer_name_changed
|| ip_port_changed
)
531 /* reset the buffer */
541 msg_Err(p_access
, " Unrecognized RTCP packet with PTYPE=%02x!!", ptype
);
543 processed_bytes
+= (4 * (1 + records
));
547 static bool rist_input(stream_t
*p_access
, struct rist_flow
*flow
, uint8_t *buf
, size_t len
)
549 stream_sys_t
*p_sys
= p_access
->p_sys
;
552 if ( len
< RTP_HEADER_SIZE
)
554 /* check if packet size >= rtp header size */
555 msg_Err(p_access
, "Rist rtp packet must have at least 12 bytes, we have %lu", len
);
558 else if (!rtp_check_hdr(buf
))
560 /* check for a valid rtp header */
561 msg_Err(p_access
, "Malformed rtp packet header starting with %02x, ignoring.", buf
[0]);
565 uint16_t idx
= rtp_get_seqnum(buf
);
566 uint32_t pkt_ts
= rtp_get_timestamp(buf
);
567 bool retrasnmitted
= false;
569 uint64_t now
= vlc_tick_now();
571 if (flow
->reset
== 2)
573 if ((uint64_t)(now
- p_sys
->last_message
) > (uint64_t)VLC_TICK_FROM_MS(flow
->latency
) ) {
574 msg_Info(p_access
, "Waiting for Sender's Coordinates, i.e. rtcp handshake ...");
576 p_sys
->last_message
= now
;
579 else if (flow
->reset
== 1)
581 msg_Info(p_access
, "Traffic detected after buffer reset");
582 /* First packet in the queue */
583 flow
->hi_timestamp
= pkt_ts
;
584 msg_Info(p_access
, "ts@%u", flow
->hi_timestamp
);
590 /* Check to see if this is a retransmission or a regular packet */
591 if (buf
[11] & (1 << 0))
593 msg_Dbg(p_access
, "Packet %d RECOVERED, Window: [%d:%d-->%d]", idx
, flow
->ri
, flow
->wi
,
595 p_sys
->i_recovered_packets
++;
596 retrasnmitted
= true;
598 else if (flow
->wi
!= flow
->ri
)
600 /* Reset counter to 0 on incoming holes */
601 /* Regular packets only as retransmits are expected to come in out of order */
602 uint16_t idxnext
= (uint16_t)(flow
->wi
+ 1);
606 msg_Dbg(p_access
, "Gap, got %d, expected %d, %d packet gap, Window: [%d:%d-->%d]",
607 idx
, idxnext
, idx
- idxnext
, flow
->ri
, flow
->wi
, (uint16_t)(flow
->wi
-flow
->ri
));
609 p_sys
->i_reordered_packets
++;
610 msg_Dbg(p_access
, "Out of order, got %d, expected %d, Window: [%d:%d-->%d]", idx
,
611 idxnext
, flow
->ri
, flow
->wi
, (uint16_t)(flow
->wi
-flow
->ri
));
613 uint16_t zero_counter
= (uint16_t)(flow
->wi
+ 1);
614 while(zero_counter
++ != idx
) {
615 flow
->nacks_retries
[zero_counter
] = 0;
617 /*msg_Dbg(p_access, "Gap, reseting %d packets as zero nacks %d to %d",
618 idx - flow->wi - 1, (uint16_t)(flow->wi + 1), idx);*/
622 /* Always replace the existing one with the new one */
624 pkt
= &(flow
->buffer
[idx
]);
625 if (pkt
->buffer
&& pkt
->buffer
->i_buffer
> 0)
627 block_Release(pkt
->buffer
);
630 pkt
->buffer
= block_Alloc(len
);
634 pkt
->buffer
->i_buffer
= len
;
635 memcpy(pkt
->buffer
->p_buffer
, buf
, len
);
636 pkt
->rtp_ts
= pkt_ts
;
637 p_sys
->last_data_rx
= vlc_tick_now();
638 /* Reset the try counter regardless of wether it was a retransmit or not */
639 flow
->nacks_retries
[idx
] = 0;
644 p_sys
->i_total_packets
++;
645 /* Perform discontinuity checks and udpdate counters */
646 if (!is_index_in_range(flow
, idx
) && pkt_ts
>= flow
->hi_timestamp
)
648 if ((pkt_ts
- flow
->hi_timestamp
) > flow
->hi_timestamp
/10)
650 msg_Info(p_access
, "Forward stream discontinuity idx@%d/%d/%d ts@%u/%u", flow
->ri
, idx
,
651 flow
->wi
, pkt_ts
, flow
->hi_timestamp
);
658 flow
->hi_timestamp
= pkt_ts
;
661 else if (!is_index_in_range(flow
, idx
))
663 /* incoming timestamp just jumped back in time or index is outside of scope */
664 msg_Info(p_access
, "Backwards stream discontinuity idx@%d/%d/%d ts@%u/%u", flow
->ri
, idx
,
665 flow
->wi
, pkt_ts
, flow
->hi_timestamp
);
673 static block_t
*rist_dequeue(stream_t
*p_access
, struct rist_flow
*flow
)
675 stream_sys_t
*p_sys
= p_access
->p_sys
;
676 block_t
*pktout
= NULL
;
679 if (flow
->ri
== flow
->wi
|| flow
->reset
> 0)
683 bool found_data
= false;
684 uint16_t loss_amount
= 0;
685 while(idx
++ != flow
->wi
) {
687 pkt
= &(flow
->buffer
[idx
]);
690 /*msg_Info(p_access, "Possible packet loss on index #%d", idx);*/
692 /* We move ahead until we find a timestamp but we do not move the cursor.
693 * None of them are guaranteed packet loss because we do not really
694 * know their timestamps. They might still arrive on the next loop.
695 * We can confirm the loss only if we get a valid packet in the loop below. */
699 /*printf("IDX=%d, flow->hi_timestamp: %u, (ts + flow->rtp_latency): %u\n", idx,
700 flow->hi_timestamp, (ts - 100 * flow->qdelay));*/
701 if (flow
->hi_timestamp
> (uint32_t)(pkt
->rtp_ts
+ flow
->rtp_latency
))
703 /* Populate output packet now but remove rtp header from source */
704 int newSize
= pkt
->buffer
->i_buffer
- RTP_HEADER_SIZE
;
705 pktout
= block_Alloc(newSize
);
708 pktout
->i_buffer
= newSize
;
709 memcpy(pktout
->p_buffer
, pkt
->buffer
->p_buffer
+ RTP_HEADER_SIZE
, newSize
);
710 /* free the buffer and increase the read index */
712 /* TODO: calculate average duration using buffer average (bring from sender) */
715 block_Release(pkt
->buffer
);
722 if (loss_amount
> 0 && found_data
== true)
724 /* Packet loss confirmed, we found valid data after the holes */
725 msg_Dbg(p_access
, "Packet NOT RECOVERED, %d packet(s), Window: [%d:%d]", loss_amount
,
727 p_sys
->i_lost_packets
+= loss_amount
;
733 static void *rist_thread(void *data
)
735 stream_t
*p_access
= data
;
736 stream_sys_t
*p_sys
= p_access
->p_sys
;
738 /* Process nacks every 5ms */
739 /* We only ask for the relevant ones */
741 block_t
*pkt_nacks
= block_FifoGet(p_sys
->p_fifo
);
743 int canc
= vlc_savecancel();
745 /* there are two bytes per nack */
746 uint16_t nack_count
= (uint16_t)pkt_nacks
->i_buffer
/2;
747 switch(p_sys
->nack_type
) {
748 case NACK_FMT_BITMASK
:
749 send_bbnack(p_access
, p_sys
->flow
->fd_nack
, pkt_nacks
, nack_count
);
753 send_rbnack(p_access
, p_sys
->flow
->fd_nack
, pkt_nacks
, nack_count
);
757 msg_Dbg(p_access
, "Sent %u NACKs !!!", nack_count
);
758 block_Release(pkt_nacks
);
760 vlc_restorecancel (canc
);
766 static block_t
*BlockRIST(stream_t
*p_access
, bool *restrict eof
)
768 stream_sys_t
*p_sys
= p_access
->p_sys
;
771 block_t
*pktout
= NULL
;
772 struct pollfd pfd
[2];
775 struct sockaddr_storage peer
;
776 socklen_t slen
= sizeof(struct sockaddr_storage
);
777 struct rist_flow
*flow
= p_sys
->flow
;
779 if (vlc_killed() || (flow
->reset
== 1 && p_sys
->eof_on_reset
))
785 pfd
[0].fd
= flow
->fd_in
;
786 pfd
[0].events
= POLLIN
;
787 pfd
[1].fd
= flow
->fd_nack
;
788 pfd
[1].events
= POLLIN
;
790 /* The protocol uses a fifo buffer with a fixed time delay.
791 * That buffer needs to be emptied at a rate that is determined by the rtp timestamps of the
792 * packets. If I waited indefinitely for data coming in, the rate and delay of output packets
793 * would be wrong. I am calling the rist_dequeue function every time a data packet comes in
794 * and also every time we get a poll timeout. The configurable poll timeout is for controling
795 * the maximum jitter of output data coming out of the buffer. The default 5ms timeout covers
798 ret
= vlc_poll_i11e(pfd
, 2, p_sys
->i_poll_timeout_current
);
799 if (unlikely(ret
< 0))
803 /* Poll timeout, check the queue for the next packet that needs to be delivered */
804 pktout
= rist_dequeue(p_access
, flow
);
805 /* if there is data, we need to come back faster to finish emptying it */
807 p_sys
->i_poll_timeout_current
= 0;
808 p_sys
->i_poll_timeout_zero_count
++;
810 p_sys
->i_poll_timeout_current
= p_sys
->i_poll_timeout
;
811 p_sys
->i_poll_timeout_nonzero_count
++;
817 uint8_t *buf
= malloc(p_sys
->i_max_packet_size
);
818 if ( unlikely( buf
== NULL
) )
821 /* Process rctp incoming data */
822 if (pfd
[1].revents
& POLLIN
)
824 r
= rist_ReadFrom_i11e(flow
->fd_nack
, buf
, p_sys
->i_max_packet_size
,
825 (struct sockaddr
*)&peer
, &slen
);
826 if (unlikely(r
== -1)) {
827 msg_Err(p_access
, "socket %d error: %s\n", flow
->fd_nack
, gai_strerror(errno
));
830 rtcp_input(p_access
, flow
, buf
, r
, (struct sockaddr
*)&peer
, slen
);
834 /* Process regular incoming data */
835 if (pfd
[0].revents
& POLLIN
)
837 r
= rist_Read_i11e(flow
->fd_in
, buf
, p_sys
->i_max_packet_size
);
838 if (unlikely(r
== -1)) {
839 msg_Err(p_access
, "socket %d error: %s\n", flow
->fd_in
, gai_strerror(errno
));
843 /* rist_input will process and queue the pkt */
844 if (rist_input(p_access
, flow
, buf
, r
))
846 /* Check the queue for the next packet that needs to be delivered */
847 pktout
= rist_dequeue(p_access
, flow
);
849 p_sys
->i_poll_timeout_current
= 0;
850 p_sys
->i_poll_timeout_zero_count
++;
852 p_sys
->i_poll_timeout_current
= p_sys
->i_poll_timeout
;
853 p_sys
->i_poll_timeout_nonzero_count
++;
858 if (p_sys
->eof_on_reset
)
868 now
= vlc_tick_now();
870 /* Process stats and print them out */
871 /* We need to measure some items every 70ms */
872 uint64_t interval
= (now
- flow
->feedback_time
);
873 if ( interval
> VLC_TICK_FROM_MS(RTCP_INTERVAL
) )
875 if (p_sys
->i_poll_timeout_nonzero_count
> 0)
877 float ratio
= (float)p_sys
->i_poll_timeout_zero_count
878 / (float)p_sys
->i_poll_timeout_nonzero_count
;
880 p_sys
->vbr_ratio
+= 1 - ratio
;
882 p_sys
->vbr_ratio
+= ratio
- 1;
883 p_sys
->vbr_ratio_count
++;
884 /*msg_Dbg(p_access, "zero poll %u, non-zero poll %u, ratio %.2f",
885 p_sys->i_poll_timeout_zero_count, p_sys->i_poll_timeout_nonzero_count, ratio);*/
886 p_sys
->i_poll_timeout_zero_count
= 0;
887 p_sys
->i_poll_timeout_nonzero_count
= 0;
890 /* We print out the stats once per second */
891 interval
= (now
- p_sys
->i_last_stat
);
892 if ( interval
> VLC_TICK_FROM_MS(STATS_INTERVAL
) )
894 if ( p_sys
->i_lost_packets
> 0)
895 msg_Err(p_access
, "We have %d lost packets", p_sys
->i_lost_packets
);
897 if (p_sys
->vbr_ratio_count
> 0)
898 ratio
= p_sys
->vbr_ratio
/ (float)p_sys
->vbr_ratio_count
;
900 if (p_sys
->i_total_packets
> 0)
901 quality
-= (float)100*(float)(p_sys
->i_lost_packets
+ p_sys
->i_recovered_packets
+
902 p_sys
->i_reordered_packets
)/(float)p_sys
->i_total_packets
;
904 msg_Info(p_access
, "STATS: Total %u, Recovered %u, Reordered %u, Lost %u, VBR Score " \
905 "%.2f, Link Quality %.2f%%", p_sys
->i_total_packets
, p_sys
->i_recovered_packets
,
906 p_sys
->i_reordered_packets
, p_sys
->i_lost_packets
, ratio
, quality
);
907 p_sys
->i_last_stat
= now
;
908 p_sys
->vbr_ratio
= 0;
909 p_sys
->vbr_ratio_count
= 0;
910 p_sys
->i_lost_packets
= 0;
911 p_sys
->i_recovered_packets
= 0;
912 p_sys
->i_reordered_packets
= 0;
913 p_sys
->i_total_packets
= 0;
916 /* Send rtcp feedback every RTCP_INTERVAL */
917 interval
= (now
- flow
->feedback_time
);
918 if ( interval
> VLC_TICK_FROM_MS(RTCP_INTERVAL
) )
920 /* msg_Dbg(p_access, "Calling RTCP Feedback %lu<%d ms using timer", interval,
921 VLC_TICK_FROM_MS(RTCP_INTERVAL)); */
922 send_rtcp_feedback(p_access
, flow
);
923 flow
->feedback_time
= now
;
926 /* Send nacks every NACK_INTERVAL (only the ones that have matured, if any) */
927 interval
= (now
- p_sys
->last_nack_tx
);
928 if ( interval
> VLC_TICK_FROM_MS(NACK_INTERVAL
) )
930 send_nacks(p_access
, p_sys
->flow
);
931 p_sys
->last_nack_tx
= now
;
934 /* Safety check for when the input stream stalls */
935 if ( p_sys
->last_data_rx
> 0 && now
> p_sys
->last_data_rx
&&
936 (uint64_t)(now
- p_sys
->last_data_rx
) > (uint64_t)VLC_TICK_FROM_MS(flow
->latency
) &&
937 (uint64_t)(now
- p_sys
->last_reset
) > (uint64_t)VLC_TICK_FROM_MS(flow
->latency
) )
939 msg_Err(p_access
, "No data received for %"PRId64
" ms, resetting buffers",
940 (int64_t)(now
- p_sys
->last_data_rx
)/1000);
941 p_sys
->last_reset
= now
;
951 static void Clean( stream_t
*p_access
)
953 stream_sys_t
*p_sys
= p_access
->p_sys
;
955 if( likely(p_sys
->p_fifo
!= NULL
) )
956 block_FifoRelease( p_sys
->p_fifo
);
960 if (p_sys
->flow
->fd_in
>= 0)
961 net_Close (p_sys
->flow
->fd_in
);
962 if (p_sys
->flow
->fd_nack
>= 0)
963 net_Close (p_sys
->flow
->fd_nack
);
964 for (int i
=0; i
<RIST_QUEUE_SIZE
; i
++) {
965 struct rtp_pkt
*pkt
= &(p_sys
->flow
->buffer
[i
]);
966 if (pkt
->buffer
&& pkt
->buffer
->i_buffer
> 0) {
967 block_Release(pkt
->buffer
);
971 free(p_sys
->flow
->buffer
);
975 vlc_mutex_destroy( &p_sys
->lock
);
978 static void Close(vlc_object_t
*p_this
)
980 stream_t
*p_access
= (stream_t
*)p_this
;
981 stream_sys_t
*p_sys
= p_access
->p_sys
;
983 vlc_cancel(p_sys
->thread
);
984 vlc_join(p_sys
->thread
, NULL
);
989 static int Open(vlc_object_t
*p_this
)
991 stream_t
*p_access
= (stream_t
*)p_this
;
992 stream_sys_t
*p_sys
= NULL
;
993 vlc_url_t parsed_url
= { 0 };
995 p_sys
= vlc_obj_calloc( p_this
, 1, sizeof( *p_sys
) );
996 if( unlikely( p_sys
== NULL
) )
999 p_access
->p_sys
= p_sys
;
1001 vlc_mutex_init( &p_sys
->lock
);
1003 if ( vlc_UrlParse( &parsed_url
, p_access
->psz_url
) == -1 )
1005 msg_Err( p_access
, "Failed to parse input URL (%s)",
1006 p_access
->psz_url
);
1010 /* Initialize rist flow */
1011 p_sys
->flow
= rist_udp_receiver(p_access
, &parsed_url
);
1012 vlc_UrlClean( &parsed_url
);
1015 msg_Err( p_access
, "Failed to open rist flow (%s)",
1016 p_access
->psz_url
);
1020 p_sys
->nack_type
= var_InheritInteger( p_access
, "nack-type" );
1021 p_sys
->i_max_packet_size
= var_InheritInteger( p_access
, "packet-size" );
1022 p_sys
->i_poll_timeout
= var_InheritInteger( p_access
, "maximum-jitter" );
1023 p_sys
->eof_on_reset
= var_InheritBool( p_access
, "eof-on-reset" );
1024 p_sys
->flow
->retry_interval
= var_InheritInteger( p_access
, "retry-interval" );
1025 p_sys
->flow
->reorder_buffer
= var_InheritInteger( p_access
, "reorder-buffer" );
1026 p_sys
->flow
->max_retries
= var_InheritInteger( p_access
, "max-retries" );
1027 p_sys
->flow
->latency
= var_InheritInteger( p_access
, "latency" );
1028 msg_Info(p_access
, "Setting queue latency to %d ms", p_sys
->flow
->latency
);
1030 /* Convert to rtp times */
1031 p_sys
->flow
->rtp_latency
= rtp_get_ts(VLC_TICK_FROM_MS(p_sys
->flow
->latency
));
1032 p_sys
->flow
->retry_interval
= rtp_get_ts(VLC_TICK_FROM_MS(p_sys
->flow
->retry_interval
));
1033 p_sys
->flow
->reorder_buffer
= rtp_get_ts(VLC_TICK_FROM_MS(p_sys
->flow
->reorder_buffer
));
1035 p_sys
->p_fifo
= block_FifoNew();
1036 if( unlikely(p_sys
->p_fifo
== NULL
) )
1039 /* This extra thread is for sending feedback/nack packets even when no data comes in */
1040 if (vlc_clone(&p_sys
->thread
, rist_thread
, p_access
, VLC_THREAD_PRIORITY_INPUT
))
1042 msg_Err(p_access
, "Failed to create worker thread.");
1046 p_access
->pf_block
= BlockRIST
;
1047 p_access
->pf_control
= Control
;
1053 return VLC_EGENERIC
;
1056 /* Module descriptor */
1059 set_shortname( N_("RIST") )
1060 set_description( N_("RIST input") )
1061 set_category( CAT_INPUT
)
1062 set_subcategory( SUBCAT_INPUT_ACCESS
)
1064 add_integer( "packet-size", RIST_MAX_PACKET_SIZE
,
1065 N_("RIST maximum packet size (bytes)"), NULL
, true )
1066 add_integer( "maximum-jitter", RIST_DEFAULT_POLL_TIMEOUT
,
1067 N_("RIST demux/decode maximum jitter (default is 5ms)"),
1068 N_("This controls the maximum jitter that will be passed to the demux/decode chain. "
1069 "The lower the value, the more CPU cycles the algorithm will consume"), true )
1070 add_integer( "latency", RIST_DEFAULT_LATENCY
, N_("RIST latency (ms)"), NULL
, true )
1071 add_integer( "retry-interval", RIST_DEFAULT_RETRY_INTERVAL
, N_("RIST nack retry interval (ms)"),
1073 add_integer( "reorder-buffer", RIST_DEFAULT_REORDER_BUFFER
, N_("RIST reorder buffer (ms)"),
1075 add_integer( "max-retries", RIST_MAX_RETRIES
, N_("RIST maximum retry count"), NULL
, true )
1076 add_bool( "eof-on-reset", false, "Trigger an EOF event when a buffer reset is triggered",
1077 "This is probably useful when you are decoding but not so much if you are streaming", true )
1078 add_integer( "nack-type", NACK_FMT_RANGE
,
1079 N_("RIST nack type, 0 = range, 1 = bitmask. Default is range"), NULL
, true )
1080 change_integer_list( nack_type
, nack_type_names
)
1082 set_capability( "access", 0 )
1083 add_shortcut( "rist", "tr06" )
1085 set_callbacks( Open
, Close
)