/* libutp-c: utp.c */
#define _GNU_SOURCE

#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <errno.h>
#include <limits.h> /* for UINT_MAX */

#include <StdAfx.h>

#include "utp.h"
#include "templates.h"
#ifdef WIN32
#include "win32_inet_ntop.h"

/* newer versions of MSVC define these in errno.h */
#ifndef ECONNRESET
#define ECONNRESET WSAECONNRESET
#define EMSGSIZE WSAEMSGSIZE
#define ECONNREFUSED WSAECONNREFUSED
#define ETIMEDOUT WSAETIMEDOUT
#endif
#endif

#ifdef POSIX
typedef struct sockaddr_storage SOCKADDR_STORAGE;
#endif /* POSIX */
/* number of bytes to increase max window size by, per RTT. This is
   scaled down linearly proportional to off_target. i.e. if all packets
   in one window have 0 delay, window size will increase by this number.
   Typically it's less. TCP increases one MSS per RTT, which is 1500 */
#define MAX_CWND_INCREASE_BYTES_PER_RTT 3000
#define CUR_DELAY_SIZE 3
/* experiments suggest that a clock skew of 10 ms per 325 seconds
   is not impossible. Reset delay_base every 13 minutes. The clock
   skew is dealt with by observing the delay base in the other
   direction, and adjusting our own upwards if the opposite direction
   delay base keeps going down */
#define DELAY_BASE_HISTORY 13
#define MAX_WINDOW_DECAY 100 /* ms */

#define REORDER_BUFFER_SIZE 32
#define REORDER_BUFFER_MAX_SIZE 511
#define OUTGOING_BUFFER_MAX_SIZE 511

#define PACKET_SIZE 350

/* this is the minimum max_window value. It can never drop below this */
#define MIN_WINDOW_SIZE 10

/* when window sizes are smaller than one packet_size, this
   will pace the packets to average at the given window size.
   If it's not set, it will simply not send anything until
   there's a timeout */
#define USE_PACKET_PACING 1

/* if we receive 4 or more duplicate acks, we resend the packet
   that hasn't been acked yet */
#define DUPLICATE_ACKS_BEFORE_RESEND 3

#define DELAYED_ACK_BYTE_THRESHOLD 2400 /* bytes */
#define DELAYED_ACK_TIME_THRESHOLD 100 /* milliseconds */

#define RST_INFO_TIMEOUT 10000
#define RST_INFO_LIMIT 1000
/* 29 seconds determined from measuring many home NAT devices */
#define KEEPALIVE_INTERVAL 29000

#define SEQ_NR_MASK 0xFFFF
#define ACK_NR_MASK 0xFFFF

#define DIV_ROUND_UP(num, denom) (((num) + (denom) - 1) / (denom))
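/* e.g. with PACKET_SIZE 350, a 1000-byte receive window is advertised
   as DIV_ROUND_UP(1000, 350) == (1000 + 349) / 350 == 3 chunks; the
   parentheses around the arguments keep an expression like `a + b`
   from expanding with the wrong precedence. */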
#include "utp_utils.h"
#include "utp_config.h"

#define LOG_UTP if (g_log_utp) utp_log
#define LOG_UTPV if (g_log_utp_verbose) utp_log

uint32 g_current_ms;

/* The totals are derived from the following data:
   45: IPv6 address including embedded IPv4 address
   11: Scope Id
    2: Brackets around IPv6 address when port is present
    6: Port (including colon)
    1: Terminating null byte */
char addrbuf[65];
char addrbuf2[65];
#define addrfmt(x, s) psa_fmt(x, s, sizeof(s))
#if (defined(__SVR4) && defined(__sun))
#pragma pack(1)
#else
#pragma pack(push,1)
#endif

/*----------------------------------------------------------------------------*/
/* packed socket address */
union PACKED_ATTRIBUTE psa_in {
	/* The values are always stored here in network byte order */
	byte _in6[16];            /* IPv6 */
	uint16 _in6w[8];          /* IPv6, word based (for convenience) */
	uint32 _in6d[4];          /* Dword access */
	struct in6_addr _in6addr; /* For convenience */
};

struct PACKED_ATTRIBUTE ALIGNED_ATTRIBUTE(4) psa {
	union psa_in _in;
	/* Host byte order */
	uint16 _port;
};
static byte psa_get_family(struct psa *psa)
{
	return (IN6_IS_ADDR_V4MAPPED(&psa->_in._in6addr) != 0) ? AF_INET : AF_INET6;
}

static bool psa_is_equal(struct psa *lhs, struct psa *rhs)
{
	if (rhs == lhs)
		return true;
	if (lhs->_port != rhs->_port)
		return false;
	return memcmp(&lhs->_in._in6[0], &rhs->_in._in6[0], sizeof(lhs->_in._in6)) == 0;
}

static bool psa_is_not_equal(struct psa *lhs, struct psa *rhs)
{
	return !(psa_is_equal(lhs, rhs));
}
static void psa_init(struct psa *psa, SOCKADDR_STORAGE *sa, socklen_t len)
{
	if (sa->ss_family == AF_INET) {
		struct sockaddr_in *sin;

		assert(len >= sizeof(struct sockaddr_in));

		psa->_in._in6w[0] = 0;
		psa->_in._in6w[1] = 0;
		psa->_in._in6w[2] = 0;
		psa->_in._in6w[3] = 0;
		psa->_in._in6w[4] = 0;
		psa->_in._in6w[5] = 0xffff;

		sin = (struct sockaddr_in*)sa;

		psa->_in._in6d[3] = sin->sin_addr.s_addr;
		psa->_port = ntohs(sin->sin_port);
	} else {
		struct sockaddr_in6 *sin6;

		assert(len >= sizeof(struct sockaddr_in6));

		sin6 = (struct sockaddr_in6*)sa;

		psa->_in._in6addr = sin6->sin6_addr;
		psa->_port = ntohs(sin6->sin6_port);
	}
}
/* structure SOCKADDR_STORAGE is returned by value */
/* len may be NULL */
static SOCKADDR_STORAGE psa_get_sockaddr_storage(struct psa *psa, socklen_t *len)
{
	SOCKADDR_STORAGE sa;
	byte family;

	family = psa_get_family(psa);
	if (family == AF_INET) {
		struct sockaddr_in *sin;

		sin = (struct sockaddr_in*)&sa;

		if (len) *len = sizeof(struct sockaddr_in);
		memset(sin, 0, sizeof(struct sockaddr_in));
		sin->sin_family = family;
		sin->sin_port = htons(psa->_port);
		sin->sin_addr.s_addr = psa->_in._in6d[3];
	} else {
		struct sockaddr_in6 *sin6;

		sin6 = (struct sockaddr_in6*)&sa;

		memset(sin6, 0, sizeof(struct sockaddr_in6));
		if (len) *len = sizeof(struct sockaddr_in6);
		sin6->sin6_family = family;
		sin6->sin6_addr = psa->_in._in6addr;
		sin6->sin6_port = htons(psa->_port);
	}
	return sa;
}
static str psa_fmt(struct psa *psa, str s, size_t len)
{
	byte family;
	str i;

	memset(s, 0, len);

	family = psa_get_family(psa);
	if (family == AF_INET) {
		inet_ntop(family, (uint32*)&psa->_in._in6d[3], s, len);
		i = s;
		while (*++i) {}
	} else {
		i = s;
		*i++ = '[';
		inet_ntop(family, (struct in6_addr*)&psa->_in._in6addr, i, len-1);
		while (*++i) {}
		*i++ = ']';
	}
	snprintf(i, len - (i-s), ":%u", psa->_port);
	return s;
}
/*----------------------------------------------------------------------------*/
struct PACKED_ATTRIBUTE RST_Info {
	struct psa addr;
	uint32 connid;
	uint32 timestamp;
	uint16 ack_nr;
};

/* these packet sizes include the uTP header, which
   is either 20 or 23 bytes depending on version */
#define PACKET_SIZE_EMPTY_BUCKET 0
#define PACKET_SIZE_EMPTY 23
#define PACKET_SIZE_SMALL_BUCKET 1
#define PACKET_SIZE_SMALL 373
#define PACKET_SIZE_MID_BUCKET 2
#define PACKET_SIZE_MID 723
#define PACKET_SIZE_BIG_BUCKET 3
#define PACKET_SIZE_BIG 1400
#define PACKET_SIZE_HUGE_BUCKET 4
/* 23 bytes */
struct PACKED_ATTRIBUTE pf { /* Packet Format */
	/* connection ID */
	uint32_big connid;
	uint32_big tv_sec;
	uint32_big tv_usec;
	uint32_big reply_micro;
	/* receive window size in PACKET_SIZE chunks */
	byte windowsize;
	/* Type of the first extension header */
	byte ext;
	/* Flags */
	byte flags;
	/* Sequence number */
	uint16_big seq_nr;
	/* Acknowledgment number */
	uint16_big ack_nr;
};
struct PACKED_ATTRIBUTE pfa { /* Packet Format Ack */
	struct pf pf;
	byte ext_next;
	byte ext_len;
	byte acks[4];
};

struct PACKED_ATTRIBUTE pfe { /* Packet Format Extensions */
	struct pf pf;
	byte ext_next;
	byte ext_len;
	byte extensions[8];
};

/*----------------------------------------------------------------------------*/
/* 20 bytes */
struct PACKED_ATTRIBUTE pf1 { /* Packet Format V1 */
	/* packet_type (4 high bits) */
	/* protocol version (4 low bits) */
	byte ver_type;

	/* Type of the first extension header */
	byte ext;
	/* connection ID */
	uint16_big connid;
	uint32_big tv_usec;
	uint32_big reply_micro;
	/* receive window size in bytes */
	uint32_big windowsize;
	/* Sequence number */
	uint16_big seq_nr;
	/* Acknowledgment number */
	uint16_big ack_nr;
};
static byte pf1_version(struct pf1 *pf1)
{
	return pf1->ver_type & 0xf;
}

static byte pf1_type(struct pf1 *pf1)
{
	return pf1->ver_type >> 4;
}

static void pf1_version_set(struct pf1 *pf1, byte v)
{
	pf1->ver_type = (pf1->ver_type & 0xf0) | (v & 0x0f);
}

static void pf1_type_set(struct pf1 *pf1, byte t)
{
	pf1->ver_type = (pf1->ver_type & 0x0f) | (t << 4);
}
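/* e.g. a v1 SYN has ver_type == (ST_SYN << 4) | 1 == 0x41; calling
   pf1_type_set(pf1, ST_STATE) rewrites the high nibble, giving 0x21,
   while pf1_version(pf1) still returns 1. */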
/*----------------------------------------------------------------------------*/

struct PACKED_ATTRIBUTE pfa1 { /* Packet Format Ack V1 */
	struct pf1 pf;
	byte ext_next;
	byte ext_len;
	byte acks[4];
};

struct PACKED_ATTRIBUTE pfe1 { /* Packet Format Extensions V1 */
	struct pf1 pf;
	byte ext_next;
	byte ext_len;
	byte extensions[8];
};
/* XXX: careful, not every compiler supports pragma pack */
#if (defined(__SVR4) && defined(__sun))
#pragma pack(0)
#else
#pragma pack(pop)
#endif
/* TODO: should be cpp defines */
enum {
	ST_DATA = 0,  /* Data packet. */
	ST_FIN = 1,   /* Finalize the connection. This is the last packet. */
	ST_STATE = 2, /* State packet. Used to transmit an ACK with no data. */
	ST_RESET = 3, /* Terminate connection forcefully. */
	ST_SYN = 4,   /* Connect SYN */
	ST_NUM_STATES /* used for bounds checking */
};

/* XXX: there is no lib-global initialization function, so either keep
   these static initializers or replace them all */
static const cstr flagnames[] = {
	"ST_DATA","ST_FIN","ST_STATE","ST_RESET","ST_SYN"
};
/* TODO: should be cpp defines */
enum CONN_STATE {
	CS_IDLE = 0,
	CS_SYN_SENT = 1,
	CS_CONNECTED = 2,
	CS_CONNECTED_FULL = 3,
	CS_GOT_FIN = 4,
	CS_DESTROY_DELAY = 5,
	CS_FIN_SENT = 6,
	CS_RESET = 7,
	CS_DESTROY = 8
};

/* XXX: there is no lib-global initialization function, so either keep
   these static initializers or replace them all */
static const cstr statenames[] = {
	"IDLE","SYN_SENT","CONNECTED","CONNECTED_FULL","GOT_FIN","DESTROY_DELAY","FIN_SENT","RESET","DESTROY"
};
struct op { /* Outgoing Packet */
	size_t length;
	size_t payload;
	uint64 time_sent; /* microseconds */
	uint transmissions;
	bool need_resend;
	byte data[1];
};
static void no_read(void *socket, const byte *bytes, size_t count)
{
	(void)socket;
	(void)bytes;
	(void)count;
}

static void no_write(void *socket, byte *bytes, size_t count)
{
	(void)socket;
	(void)bytes;
	(void)count;
}

static size_t no_rb_size(void *socket)
{
	(void)socket;
	return 0;
}

static void no_state(void *socket, int state)
{
	(void)socket;
	(void)state;
}

static void no_error(void *socket, int errcode)
{
	(void)socket;
	(void)errcode;
}

static void no_overhead(void *socket, bool send, size_t count, int type)
{
	(void)socket;
	(void)send;
	(void)count;
	(void)type;
}

static struct UTPFunctionTable zero_funcs = {
	&no_read,
	&no_write,
	&no_rb_size,
	&no_state,
	&no_error,
	&no_overhead,
};
/*----------------------------------------------------------------------------*/
struct scb { /* Sizable Circular Buffer */
	/* This is the mask. Since it's always a power of 2, adding 1 to this value will return the size. */
	size_t mask;
	/* These are the elements that the circular buffer points to */
	void **elements;
};

static void *scb_get(struct scb *scb, size_t i)
{
	assert(scb->elements);
	return scb->elements ? scb->elements[i & scb->mask] : NULL;
}

static void scb_put(struct scb *scb, size_t i, void *data)
{
	assert(scb->elements);
	scb->elements[i & scb->mask] = data;
}
/* Item contains the element we want to make space for,
   index is the index in the list. */
static void scb_grow(struct scb *scb, size_t item, size_t index)
{
	size_t size;
	void **buf;
	size_t i;

	/* Figure out the new size. */
	size = scb->mask + 1;
	do size *= 2; while (index >= size);

	/* Allocate the new buffer */
	buf = calloc(size, sizeof(void*));

	size--;

	/* XXX: hope the resize op is rarely called and with little data */

	/* Copy elements from the old buffer to the new buffer */
	for (i = 0; i <= scb->mask; ++i) {
		buf[(item - index + i) & size] = scb_get(scb, item - index + i);
	}

	/* Swap to the newly allocated buffer */
	scb->mask = size;
	free(scb->elements);
	scb->elements = buf;
}

static void scb_ensure_size(struct scb *scb, size_t item, size_t index)
{
	if (index > scb->mask)
		scb_grow(scb, item, index);
}

static size_t scb_size(struct scb *scb)
{
	return scb->mask + 1; /* remember: power of 2 magic */
}
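/* e.g. with mask == 15 the buffer holds 16 slots and scb_get(scb, 17)
   reads elements[17 & 15] == elements[1]; sequence numbers index the
   buffer directly and the mask performs the wrap-around. scb_grow
   doubles the slot count and re-indexes each element under the new
   mask, so the relative order of in-flight items is preserved. */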
/*----------------------------------------------------------------------------*/

static struct UTPGlobalStats _global_stats;

/* compare if lhs is less than rhs, taking wrapping
   into account. if lhs is close to UINT_MAX and rhs
   is close to 0, lhs is assumed to have wrapped and
   considered smaller */
static bool wrapping_compare_less(uint32 lhs, uint32 rhs)
{
	uint32 dist_down;
	uint32 dist_up;

	/* distance walking from lhs to rhs, downwards */
	dist_down = lhs - rhs;
	/* distance walking from lhs to rhs, upwards */
	dist_up = rhs - lhs;

	/* if the distance walking up is shorter, lhs
	   is less than rhs. If the distance walking down
	   is shorter, then rhs is less than lhs */
	return dist_up < dist_down;
}
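/* e.g. lhs == 0xFFFFFFFE and rhs == 0x00000001: dist_up == rhs - lhs
   == 3 while dist_down == lhs - rhs == 0xFFFFFFFD, so lhs counts as
   the smaller value; rhs is treated as having wrapped past zero. */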
/*----------------------------------------------------------------------------*/
struct dh { /* Delay History */
	uint32 delay_base;

	/* this is the history of delay samples,
	   normalized by using the delay_base. These
	   values are always greater than 0 and measure
	   the queuing delay in microseconds */
	uint32 cur_delay_hist[CUR_DELAY_SIZE];
	size_t cur_delay_idx;

	/* this is the history of delay_base. It's
	   a number that doesn't have an absolute meaning,
	   only a relative one. It doesn't make sense to initialize
	   it to anything other than values relative to
	   what's been seen in the real world. */
	uint32 delay_base_hist[DELAY_BASE_HISTORY];
	size_t delay_base_idx;
	/* the time when we last stepped the delay_base_idx */
	uint32 delay_base_time;

	bool delay_base_initialized;
};
static void dh_clear(struct dh *dh)
{
	size_t i;

	dh->delay_base_initialized = false;
	dh->delay_base = 0;
	dh->cur_delay_idx = 0;
	dh->delay_base_idx = 0;
	dh->delay_base_time = g_current_ms;

	for (i = 0; i < CUR_DELAY_SIZE; i++) {
		dh->cur_delay_hist[i] = 0;
	}

	for (i = 0; i < DELAY_BASE_HISTORY; i++) {
		dh->delay_base_hist[i] = 0;
	}
}
static void dh_shift(struct dh *dh, uint32 offset)
{
	size_t i;

	/* the offset should never be "negative"
	   assert(offset < 0x10000000); */

	/* increase all of our base delays by this amount
	   this is used to take clock skew into account
	   by observing the other side's changes in its base_delay */
	for (i = 0; i < DELAY_BASE_HISTORY; i++) {
		dh->delay_base_hist[i] += offset;
	}
	dh->delay_base += offset;
}
static void dh_add_sample(struct dh *dh, uint32 sample)
{
	/* The two clocks (in the two peers) are assumed not to
	   progress at the exact same rate. They are assumed to be
	   drifting, which causes the delay samples to contain
	   a systematic error, either they are under-
	   estimated or over-estimated. This is why we update the
	   delay_base every two minutes, to adjust for this.

	   This means the values will keep drifting and eventually wrap.
	   We can cross the wrapping boundary in two directions, either
	   going up, crossing the highest value, or going down, crossing 0.

	   if the delay_base is close to the max value and sample actually
	   wrapped on the other end we would see something like this:
	   delay_base = 0xffffff00, sample = 0x00000400
	   sample - delay_base = 0x500 which is the correct difference

	   if the delay_base is instead close to 0, and we got an even lower
	   sample (that will eventually update the delay_base), we may see
	   something like this:
	   delay_base = 0x00000400, sample = 0xffffff00
	   sample - delay_base = 0xfffffb00
	   this needs to be interpreted as a negative number and the actual
	   recorded delay should be 0.

	   It is important that all arithmetic that assumes wrapping
	   is done with unsigned integers. Signed integers are not guaranteed
	   to wrap the way unsigned integers do. At least GCC takes advantage
	   of this relaxed rule and won't necessarily wrap signed ints.

	   remove the clock offset and propagation delay.
	   delay base is min of the sample and the current
	   delay base. This min-operation is subject to wrapping
	   and care needs to be taken to correctly choose the
	   true minimum.

	   specifically the problem case is when delay_base is very small
	   and sample is very large (because it wrapped past zero), sample
	   needs to be considered the smaller */

	uint32 delay;

	if (!dh->delay_base_initialized) {
		size_t i;

		/* delay_base being 0 suggests that we haven't initialized
		   it or its history with any real measurements yet. Initialize
		   everything with this sample. */
		for (i = 0; i < DELAY_BASE_HISTORY; ++i) {
			/* if we don't have a value, set it to the current sample */
			dh->delay_base_hist[i] = sample;
			continue;
		}
		dh->delay_base = sample;
		dh->delay_base_initialized = true;
	}

	if (wrapping_compare_less(sample, dh->delay_base_hist[dh->delay_base_idx])) {
		/* sample is smaller than the current delay_base_hist entry,
		   update it */
		dh->delay_base_hist[dh->delay_base_idx] = sample;
	}

	/* is sample lower than delay_base? If so, update delay_base */
	if (wrapping_compare_less(sample, dh->delay_base)) {
		/* sample is smaller than the current delay_base,
		   update it */
		dh->delay_base = sample;
	}

	/* this operation may wrap, and is supposed to */
	delay = sample - dh->delay_base;
	/* sanity check. If this is triggered, something fishy is going on
	   it means the measured sample was greater than 32 seconds! */
	/* assert(delay < 0x2000000); */

	dh->cur_delay_hist[dh->cur_delay_idx] = delay;
	dh->cur_delay_idx = (dh->cur_delay_idx + 1) % CUR_DELAY_SIZE;

	/* once every minute */
	if (g_current_ms - dh->delay_base_time > 60 * 1000) {
		size_t i;

		dh->delay_base_time = g_current_ms;
		dh->delay_base_idx = (dh->delay_base_idx + 1) % DELAY_BASE_HISTORY;
		/* clear up the new delay base history spot by initializing
		   it to the current sample, then update it */
		dh->delay_base_hist[dh->delay_base_idx] = sample;
		dh->delay_base = dh->delay_base_hist[0];
		/* Assign the lowest delay in the last 2 minutes to delay_base */
		for (i = 0; i < DELAY_BASE_HISTORY; ++i) {
			if (wrapping_compare_less(dh->delay_base_hist[i], dh->delay_base))
				dh->delay_base = dh->delay_base_hist[i];
		}
	}
}
static uint32 dh_get_value(struct dh *dh)
{
	uint32 value;
	size_t i;

	value = UINT_MAX;
	for (i = 0; i < CUR_DELAY_SIZE; ++i) {
		value = uint32_min(dh->cur_delay_hist[i], value);
	}
	/* value could be UINT_MAX if we have no samples yet... */
	return value;
}
/*----------------------------------------------------------------------------*/

/*----------------------------------------------------------------------------*/
struct UTPSocket {
	struct psa addr;

	size_t idx;

	uint16 reorder_count;
	byte duplicate_ack;

	/* the number of bytes we've received but not acked yet */
	size_t bytes_since_ack;

	/* the number of packets in the send queue. Packets that haven't
	   yet been sent count as well as packets marked as needing resend.
	   the oldest un-acked packet in the send queue is seq_nr - cur_window_packets */
	uint16 cur_window_packets;

	/* how much of the window is used, number of bytes in-flight.
	   packets that have not yet been sent do not count, packets
	   that are marked as needing to be re-sent (due to a timeout)
	   don't count either */
	size_t cur_window;
	/* maximum window size, in bytes */
	size_t max_window;
	/* SO_SNDBUF setting, in bytes */
	size_t opt_sndbuf;
	/* SO_RCVBUF setting, in bytes */
	size_t opt_rcvbuf;

	/* Is a FIN packet in the reassembly buffer? */
	bool got_fin;
	/* Timeout procedure */
	bool fast_timeout;

	/* max receive window for other end, in bytes */
	size_t max_window_user;
	/* 0 = original uTP header, 1 = second revision */
	byte version;
	enum CONN_STATE state;
	/* TickCount when we last decayed window (wraps) */
	int32 last_rwin_decay;

	/* the sequence number of the FIN packet. This field is only set
	   when we have received a FIN, and the flag field has the FIN flag set.
	   it is used to know when it is safe to destroy the socket, we must have
	   received all packets up to this sequence number first. */
	uint16 eof_pkt;

	/* All sequence numbers up to and including this have been properly received
	   by us */
	uint16 ack_nr;
	/* This is the sequence number for the next packet to be sent. */
	uint16 seq_nr;

	uint16 timeout_seq_nr;

	/* This is the sequence number of the next packet we're allowed to
	   do a fast resend with. This makes sure we only do a fast-resend
	   once per packet. We can resend the packet with this sequence number
	   or any later packet (with a higher sequence number). */
	uint16 fast_resend_seq_nr;

	uint32 reply_micro;

	/* the time when we need to send another ack. If there's
	   nothing to ack, this is a very large number */
	uint32 ack_time;

	uint32 last_got_packet;
	uint32 last_sent_packet;
	uint32 last_measured_delay;
	uint32 last_maxed_out_window;

	/* the last time we added send quota to the connection.
	   when adding send quota, this is subtracted from the
	   current time multiplied by max_window / rtt
	   which is the current allowed send rate. */
	int32 last_send_quota;

	/* the number of bytes we are allowed to send on
	   this connection. If this is more than one packet
	   size when we run out of data to send, it is clamped
	   to the packet size.
	   this value is multiplied by 100 in order to get
	   higher accuracy when dealing with low rates */
	int32 send_quota;

	SendToProc *send_to_proc;
	void *send_to_userdata;
	struct UTPFunctionTable func;
	void *userdata;

	/* Round trip time */
	uint rtt;
	/* Round trip time variance */
	uint rtt_var;
	/* Round trip timeout */
	uint rto;
	struct dh rtt_hist;
	uint retransmit_timeout;
	/* The RTO timer will timeout here. */
	uint rto_timeout;
	/* When the window size is set to zero, start this timer. It will send a new packet every 30secs. */
	uint32 zerowindow_time;

	uint32 conn_seed;
	/* Connection ID for packets I receive */
	uint32 conn_id_recv;
	/* Connection ID for packets I send */
	uint32 conn_id_send;
	/* Last rcv window we advertised, in bytes */
	size_t last_rcv_win;

	struct dh our_hist;
	struct dh their_hist;

	/* extension bytes from SYN packet */
	byte extensions[8];

	struct scb inbuf;
	struct scb outbuf;

#ifdef _DEBUG
	/* Public stats, returned by UTP_GetStats(). See utp.h */
	struct UTPStats _stats;
#endif /* _DEBUG */
};
static size_t us_get_udp_mtu(struct UTPSocket *us)
{
	socklen_t len;
	SOCKADDR_STORAGE sa;

	sa = psa_get_sockaddr_storage(&us->addr, &len);
	return UTP_GetUDPMTU((struct sockaddr *)&sa, len);
}
/* returns the max number of bytes of payload the uTP
   connection is allowed to send */
static size_t us_get_packet_size(struct UTPSocket *us)
{
	int header_size;
	size_t mtu;

	header_size = us->version == 1 ? sizeof(struct pf1) : sizeof(struct pf);

	mtu = us_get_udp_mtu(us);

	if (DYNAMIC_PACKET_SIZE_ENABLED) {
		SOCKADDR_STORAGE sa;
		size_t max_packet_size;

		sa = psa_get_sockaddr_storage(&us->addr, NULL);
		max_packet_size = UTP_sockaddr_GetPacketSize((struct sockaddr*)&sa);
		return size_t_min(mtu - header_size, max_packet_size);
	}
	else
		return mtu - header_size;
}
/* Calculates the current receive window */
static size_t us_get_rcv_window(struct UTPSocket *us)
{
	size_t numbuf;

	/* If we don't have a connection (such as during connection
	   establishment), always act as if we have an empty buffer. */
	if (!us->userdata)
		return us->opt_rcvbuf;

	/* Trim window down according to what's already in buffer. */
	numbuf = us->func.get_rb_size(us->userdata);
	assert((int)numbuf >= 0);
	return us->opt_rcvbuf > numbuf ? us->opt_rcvbuf - numbuf : 0;
}
/* Test if we're ready to decay max_window
   XXX this breaks when spaced by > INT_MAX/2, which is 49
   days; the failure mode in that case is we do an extra decay
   or fail to do one when we really shouldn't. */
static bool us_can_decay_win(struct UTPSocket *us, int32 msec)
{
	return msec - us->last_rwin_decay >= MAX_WINDOW_DECAY;
}

/* If we can, decay the max window */
static void us_maybe_decay_win(struct UTPSocket *us)
{
	if (us_can_decay_win(us, g_current_ms)) {
		/* TCP uses 0.5 */
		us->max_window = (size_t)(us->max_window * .5);
		us->last_rwin_decay = g_current_ms;
		if (us->max_window < MIN_WINDOW_SIZE)
			us->max_window = MIN_WINDOW_SIZE;
	}
}
static size_t us_get_header_size(struct UTPSocket *us)
{
	return (us->version ? sizeof(struct pf1) : sizeof(struct pf));
}

static size_t us_get_header_extensions_size(struct UTPSocket *us)
{
	return (us->version ? sizeof(struct pfe1) : sizeof(struct pfe));
}

static void us_sent_ack(struct UTPSocket *us)
{
	us->ack_time = g_current_ms + 0x70000000;
	us->bytes_since_ack = 0;
}
static size_t us_get_udp_overhead(struct UTPSocket *us)
{
	socklen_t len;
	SOCKADDR_STORAGE sa;

	sa = psa_get_sockaddr_storage(&us->addr, &len);
	return UTP_GetUDPOverhead((struct sockaddr *)&sa, len);
}

#if 0
/* we keep this function around but it's not used */
static uint64 us_get_global_utp_bytes_sent(struct UTPSocket *us)
{
	socklen_t len;
	SOCKADDR_STORAGE sa;

	sa = psa_get_sockaddr_storage(&us->addr, &len);
	return UTP_GetGlobalUTPBytesSent((struct sockaddr *)&sa, len);
}
#endif

static size_t us_get_overhead(struct UTPSocket *us)
{
	return us_get_udp_overhead(us) + us_get_header_size(us);
}
/*----------------------------------------------------------------------------*/

struct array g_rst_info = {NULL, 0, 0, sizeof(struct RST_Info)};
struct array g_utp_sockets = {NULL, 0, 0, sizeof(struct UTPSocket*)};

static void UTP_RegisterSentPacket(size_t length) {
	if (length <= PACKET_SIZE_MID) {
		if (length <= PACKET_SIZE_EMPTY) {
			_global_stats._nraw_send[PACKET_SIZE_EMPTY_BUCKET]++;
		} else if (length <= PACKET_SIZE_SMALL) {
			_global_stats._nraw_send[PACKET_SIZE_SMALL_BUCKET]++;
		} else
			_global_stats._nraw_send[PACKET_SIZE_MID_BUCKET]++;
	} else {
		if (length <= PACKET_SIZE_BIG) {
			_global_stats._nraw_send[PACKET_SIZE_BIG_BUCKET]++;
		} else
			_global_stats._nraw_send[PACKET_SIZE_HUGE_BUCKET]++;
	}
}
static void send_to_addr(SendToProc *send_to_proc, void *send_to_userdata, byte *p, size_t len, struct psa *addr)
{
	socklen_t tolen;
	SOCKADDR_STORAGE to;

	to = psa_get_sockaddr_storage(addr, &tolen);
	UTP_RegisterSentPacket(len);
	send_to_proc(send_to_userdata, p, len, (const struct sockaddr *)&to, tolen);
}
static void us_send_data(struct UTPSocket *us, struct pf *b, size_t length, enum bandwidth_type_t type)
{
	uint64 time;
	struct pf1* b1;
#if g_log_utp_verbose
	int flags;
	uint16 seq_nr;
	uint16 ack_nr;
#endif

	/* time stamp this packet with local time, the stamp goes into
	   the header of every packet at the 8th byte for 8 bytes:
	   two integers, check packet.h for more */
	time = UTP_GetMicroseconds();

	b1 = (struct pf1*)b;
	if (us->version == 0) {
		b->tv_sec = htonl((uint32)(time / 1000000));
		b->tv_usec = htonl(time % 1000000);
		b->reply_micro = htonl(us->reply_micro);
	} else {
		b1->tv_usec = htonl((uint32)time);
		b1->reply_micro = htonl(us->reply_micro);
	}

	us->last_sent_packet = g_current_ms;

#ifdef _DEBUG
	us->_stats._nbytes_xmit += length;
	++us->_stats._nxmit;
#endif
	if (us->userdata) {
		size_t n;

		if (type == payload_bandwidth) {
			/* if this packet carries payload, just
			   count the header as overhead */
			type = header_overhead;
			n = us_get_overhead(us);
		} else
			n = length + us_get_udp_overhead(us);
		us->func.on_overhead(us->userdata, true, n, type);
	}
#if g_log_utp_verbose
	flags = us->version == 0 ? b->flags : pf1_type(b1);
	seq_nr = us->version == 0 ? ntohs(b->seq_nr) : ntohs(b1->seq_nr);
	ack_nr = us->version == 0 ? ntohs(b->ack_nr) : ntohs(b1->ack_nr);
	LOG_UTPV("0x%08x: send %s len:%u id:%u timestamp:"I64u" reply_micro:%u flags:%s seq_nr:%u ack_nr:%u", us, addrfmt(&us->addr, addrbuf), (uint)length, us->conn_id_send, time, us->reply_micro, flagnames[flags], seq_nr, ack_nr);
#endif
	send_to_addr(us->send_to_proc, us->send_to_userdata, (byte*)b, length, &us->addr);
}
/* XXX: careful, synack defaulted to false in the C++ original */
static void us_send_ack(struct UTPSocket *us, bool synack)
{
	/* all following structs fit in PacketFormatExtensions */
	struct pfe pfe;
	struct pfe1 *pfe1;
	struct pfa *pfa;
	struct pfa1 *pfa1;
	size_t len;

	memset(&pfe, 0, sizeof(pfe));

	pfe1 = (struct pfe1*)&pfe;
	pfa = (struct pfa*)pfe1;
	pfa1 = (struct pfa1*)pfe1;

	us->last_rcv_win = us_get_rcv_window(us);
	if (us->version == 0) {
		pfa->pf.connid = htonl(us->conn_id_send);
		pfa->pf.ack_nr = htons(us->ack_nr);
		pfa->pf.seq_nr = htons(us->seq_nr);
		pfa->pf.flags = ST_STATE;
		pfa->pf.ext = 0;
		pfa->pf.windowsize = (byte)DIV_ROUND_UP(us->last_rcv_win, PACKET_SIZE);
		len = sizeof(struct pf);
	} else {
		pf1_version_set(&pfa1->pf, 1);
		pf1_type_set(&pfa1->pf, ST_STATE);
		pfa1->pf.ext = 0;
		pfa1->pf.connid = htons((uint16)us->conn_id_send);
		pfa1->pf.ack_nr = htons(us->ack_nr);
		pfa1->pf.seq_nr = htons(us->seq_nr);
		pfa1->pf.windowsize = htonl((uint32)us->last_rcv_win);
		len = sizeof(struct pf1);
	}

	/* we never need to send EACK for connections
	   that are shutting down */
	if (us->reorder_count != 0 && us->state < CS_GOT_FIN) {
		uint m;
		size_t window;
		size_t i;

		/* if reorder count > 0, send an EACK.
		   reorder count should always be 0
		   for synacks, so this should never
		   happen for a synack */
		assert(!synack);
		if (us->version == 0) {
			pfa->pf.ext = 1;
			pfa->ext_next = 0;
			pfa->ext_len = 4;
		} else {
			pfa1->pf.ext = 1;
			pfa1->ext_next = 0;
			pfa1->ext_len = 4;
		}

		m = 0;

		/* reorder count should only be non-zero
		   if the packet ack_nr + 1 has not yet
		   been received */
		assert(scb_get(&us->inbuf, us->ack_nr + 1) == NULL);
		window = size_t_min(14 + 16, scb_size(&us->inbuf));
		/* Generate bit mask of segments received. */
		for (i = 0; i < window; ++i) {
			if (scb_get(&us->inbuf, us->ack_nr + i + 2) != NULL) {
				m |= 1 << i;
				LOG_UTPV("0x%08x: EACK packet [%u]", us, us->ack_nr + i + 2);
			}
		}
		if (us->version == 0) {
			pfa->acks[0] = (byte)m;
			pfa->acks[1] = (byte)(m >> 8);
			pfa->acks[2] = (byte)(m >> 16);
			pfa->acks[3] = (byte)(m >> 24);
		} else {
			pfa1->acks[0] = (byte)m;
			pfa1->acks[1] = (byte)(m >> 8);
			pfa1->acks[2] = (byte)(m >> 16);
			pfa1->acks[3] = (byte)(m >> 24);
		}
		len += 4 + 2;
		LOG_UTPV("0x%08x: Sending EACK %u [%u] bits:[%032b]", us, us->ack_nr, us->conn_id_send, m);
	} else if (synack) {
		/* we only send "extensions" in response to SYN
		   and the reorder count is 0 in that state */

		LOG_UTPV("0x%08x: Sending ACK %u [%u] with extension bits", us, us->ack_nr, us->conn_id_send);
		if (us->version == 0) {
			pfe.pf.ext = 2;
			pfe.ext_next = 0;
			pfe.ext_len = 8;
			memset(&pfe.extensions[0], 0, 8);
		} else {
			pfe1->pf.ext = 2;
			pfe1->ext_next = 0;
			pfe1->ext_len = 8;
			memset(&pfe1->extensions[0], 0, 8);
		}
		len += 8 + 2;
	} else {
		LOG_UTPV("0x%08x: Sending ACK %u [%u]", us, us->ack_nr, us->conn_id_send);
	}

	us_sent_ack(us);
	us_send_data(us, (struct pf*)&pfe, len, ack_overhead);
}
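/* note on the keep-alive below: it is an ordinary ACK sent with ack_nr
   temporarily decremented, i.e. a duplicate of an ACK the peer has
   already seen. The peer ignores it, but the datagram refreshes NAT
   mappings (see KEEPALIVE_INTERVAL above). */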
static void us_send_keep_alive(struct UTPSocket *us)
{
	us->ack_nr--;
	LOG_UTPV("0x%08x: Sending KeepAlive ACK %u [%u]", us, us->ack_nr, us->conn_id_send);
	us_send_ack(us, false);
	us->ack_nr++;
}
/* XXX: was a static class member function in the C++ original */
static void us_send_rst(SendToProc *send_to_proc, void *send_to_userdata, struct psa *addr, uint32 conn_id_send, uint16 ack_nr, uint16 seq_nr, byte version)
{
	struct pf pf;    /* 23 bytes */
	struct pf1 *pf1; /* 20 bytes */
	size_t len;

	memset(&pf, 0, sizeof(pf));
	pf1 = (struct pf1*)&pf;

	if (version == 0) {
		pf.connid = htonl(conn_id_send);
		pf.ack_nr = htons(ack_nr);
		pf.seq_nr = htons(seq_nr);
		pf.flags = ST_RESET;
		pf.ext = 0;
		pf.windowsize = 0;
		len = sizeof(pf);
	} else {
		pf1_version_set(pf1, 1);
		pf1_type_set(pf1, ST_RESET);
		pf1->ext = 0;
		pf1->connid = htons((uint16)conn_id_send);
		pf1->ack_nr = htons(ack_nr);
		pf1->seq_nr = htons(seq_nr);
		pf1->windowsize = 0;
		len = sizeof(*pf1);
	}

	LOG_UTPV("%s: Sending RST id:%u seq_nr:%u ack_nr:%u", addrfmt(addr, addrbuf), conn_id_send, seq_nr, ack_nr);
	LOG_UTPV("send %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, conn_id_send);
	send_to_addr(send_to_proc, send_to_userdata, (byte*)pf1, len, addr);
}
static void us_send_packet(struct UTPSocket *us, struct op *pkt)
{
	size_t max_send;
	size_t packet_size;
	struct pf1* p1;
	struct pf* p;

	/* only count against the quota the first time we
	   send the packet. Don't enforce quota when closing
	   a socket. Only enforce the quota when we're sending
	   at slow rates (max window < packet size) */
	max_send = size_t_min3(us->max_window, us->opt_sndbuf, us->max_window_user);

	if (pkt->transmissions == 0 || pkt->need_resend)
		us->cur_window += pkt->payload;

	packet_size = us_get_packet_size(us);
	if (pkt->transmissions == 0 && max_send < packet_size) {
		assert(us->state == CS_FIN_SENT || (int32)pkt->payload <= us->send_quota / 100);
		us->send_quota = us->send_quota - (int32)(pkt->payload * 100);
	}

	pkt->need_resend = false;

	p1 = (struct pf1*)pkt->data;
	p = (struct pf*)pkt->data;
	if (us->version == 0) {
		p->ack_nr = htons(us->ack_nr);
	} else {
		p1->ack_nr = htons(us->ack_nr);
	}
	pkt->time_sent = UTP_GetMicroseconds();
	pkt->transmissions++;
	us_sent_ack(us);
	us_send_data(us, (struct pf*)pkt->data, pkt->length, (us->state == CS_SYN_SENT) ? connect_overhead : (pkt->transmissions == 1) ? payload_bandwidth : retransmit_overhead);
}
static bool us_is_writable(struct UTPSocket *us, size_t to_write)
{
	size_t max_send;
	size_t packet_size;

	/* return true if it's OK to stuff another packet into the
	   outgoing queue. Since we may be using packet pacing, we
	   might not actually send the packet right away to affect the
	   cur_window. The only thing that happens when we add another
	   packet is that cur_window_packets is increased. */
	max_send = size_t_min3(us->max_window, us->opt_sndbuf, us->max_window_user);

	packet_size = us_get_packet_size(us);

	if (us->cur_window + packet_size >= us->max_window)
		us->last_maxed_out_window = g_current_ms;

	/* if we don't have enough quota, we can't write regardless */
	if (USE_PACKET_PACING)
		if (us->send_quota / 100 < (int32)to_write)
			return false;

	/* subtract one to save space for the FIN packet */
	if (us->cur_window_packets >= OUTGOING_BUFFER_MAX_SIZE - 1)
		return false;

	/* if sending another packet would not make the window exceed
	   the max_window, we can write */
	if (us->cur_window + packet_size <= max_send)
		return true;

	/* if the window size is less than a packet, and we have enough
	   quota to send a packet, we can write, even though it would
	   make the window exceed the max size.
	   the last condition is needed to not put too many packets
	   in the send buffer. cur_window isn't updated until we flush
	   the send buffer, so we need to take the number of packets
	   into account */
	if (USE_PACKET_PACING)
		if (us->max_window < to_write && us->cur_window < us->max_window && us->cur_window_packets == 0)
			return true;
	return false;
}
static bool us_flush_packets(struct UTPSocket *us)
{
	size_t packet_size;
	uint16 i;

	packet_size = us_get_packet_size(us);

	/* send packets that are waiting on the pacer to be sent.
	   i has to be an unsigned 16 bit counter to wrap correctly;
	   signed types are not guaranteed to wrap the way you expect */
	for (i = us->seq_nr - us->cur_window_packets; i != us->seq_nr; ++i) {
		struct op *pkt;

		pkt = (struct op*)scb_get(&us->outbuf, i);
		if (pkt == 0 || (pkt->transmissions > 0 && pkt->need_resend == false))
			continue;
		/* have we run out of quota? */
		if (!us_is_writable(us, pkt->payload))
			return true;

		/* Nagle check
		   don't send the last packet if we have one packet in-flight
		   and the current packet is still smaller than packet_size. */
		if (i != ((us->seq_nr - 1) & ACK_NR_MASK) || us->cur_window_packets == 1 || pkt->payload >= packet_size) {
			us_send_packet(us, pkt);

			/* No need to send another ack if there is nothing to reorder. */
			if (us->reorder_count == 0)
				us_sent_ack(us);
		}
	}
	return false;
}
static void us_write_outgoing_packet(struct UTPSocket *us, size_t payload, uint flags)
{
	size_t packet_size;

	/* Setup initial timeout timer */
	if (us->cur_window_packets == 0) {
		us->retransmit_timeout = us->rto;
		us->rto_timeout = g_current_ms + us->retransmit_timeout;
		assert(us->cur_window == 0);
	}

	packet_size = us_get_packet_size(us);
	do {
		size_t added;
		struct op *pkt;
		size_t header_size;
		bool append;
		struct pf* p;
		struct pf1* p1;

		assert(us->cur_window_packets < OUTGOING_BUFFER_MAX_SIZE);
		assert(flags == ST_DATA || flags == ST_FIN);

		added = 0;
		pkt = NULL;

		if (us->cur_window_packets > 0)
			pkt = (struct op*)scb_get(&us->outbuf, us->seq_nr - 1);

		header_size = us_get_header_size(us);
		append = true;

		/* if there's any room left in the last packet in the window
		   and it hasn't been sent yet, fill that frame first */
		if (payload && pkt && !pkt->transmissions && pkt->payload < packet_size) {
			/* Use the previous unsent packet */
			added = size_t_min(payload + pkt->payload, size_t_max(packet_size, pkt->payload)) - pkt->payload;
			pkt = realloc(pkt, (sizeof(struct op) - 1) + header_size + pkt->payload + added);
			scb_put(&us->outbuf, us->seq_nr - 1, pkt);
			append = false;
			assert(!pkt->need_resend);
		} else {
			/* Create the packet to send. */
			added = payload;
			pkt = malloc((sizeof(struct op) - 1) + header_size + added);
			pkt->payload = 0;
			pkt->transmissions = 0;
			pkt->need_resend = false;
		}

		if (added)
			/* Fill it with data from the upper layer. */
			us->func.on_write(us->userdata, pkt->data + header_size + pkt->payload, added);
		pkt->payload += added;
		pkt->length = header_size + pkt->payload;

		us->last_rcv_win = us_get_rcv_window(us);

		p = (struct pf*)pkt->data;
		p1 = (struct pf1*)pkt->data;
		if (us->version == 0) {
			p->connid = htonl(us->conn_id_send);
			p->ext = 0;
			p->windowsize = (byte)DIV_ROUND_UP(us->last_rcv_win, PACKET_SIZE);
			p->ack_nr = htons(us->ack_nr);
			p->flags = flags;
		} else {
			pf1_version_set(p1, 1);
			pf1_type_set(p1, flags);
			p1->ext = 0;
			p1->connid = htons((uint16)us->conn_id_send);
			p1->windowsize = htonl((uint32)us->last_rcv_win);
			p1->ack_nr = htons(us->ack_nr);
		}

		if (append) {
			/* Remember the message in the outgoing queue. */
			scb_ensure_size(&us->outbuf, us->seq_nr, us->cur_window_packets);
			scb_put(&us->outbuf, us->seq_nr, pkt);
			if (us->version == 0)
				p->seq_nr = htons(us->seq_nr);
			else
				p1->seq_nr = htons(us->seq_nr);
			us->seq_nr++;
			us->cur_window_packets++;
		}

		payload -= added;

	} while (payload);

	us_flush_packets(us);
}
static void us_update_send_quota(struct UTPSocket *us)
{
	int dt;
	size_t add;

	dt = g_current_ms - us->last_send_quota;

	if (dt == 0)
		return;
	us->last_send_quota = g_current_ms;
	add = us->max_window * dt * 100 / (us->rtt_hist.delay_base ? us->rtt_hist.delay_base : 50);
	if (add > us->max_window * 100 && add > MAX_CWND_INCREASE_BYTES_PER_RTT * 100)
		add = us->max_window;
	us->send_quota += (int32)add;
	/* LOG_UTPV("0x%08x: UTPSocket::update_send_quota dt:%d rtt:%u max_window:%u quota:%d", this, dt, rtt, (uint)max_window, send_quota / 100); */
}
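/* quota arithmetic above, worked through: send_quota is scaled by 100,
   and rtt_hist.delay_base holds the RTT estimate in ms (50 if no
   samples yet). With max_window == 35000 bytes, rtt == 50 ms and
   dt == 10 ms, add == 35000 * 10 * 100 / 50 == 700000, i.e. 7000
   sendable bytes, which works out to max_window bytes per RTT. */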
#ifdef _DEBUG
static void us_check_invariant(struct UTPSocket *us)
{
	size_t outstanding_bytes;
	int i;

	if (us->reorder_count > 0)
		assert(scb_get(&us->inbuf, us->ack_nr + 1) == NULL);

	outstanding_bytes = 0;
	for (i = 0; i < us->cur_window_packets; ++i) {
		struct op *pkt;

		pkt = (struct op*)scb_get(&us->outbuf, us->seq_nr - i - 1);
		if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend)
			continue;
		outstanding_bytes += pkt->payload;
	}
	assert(outstanding_bytes == us->cur_window);
}
#endif
static void us_check_timeouts(struct UTPSocket *us)
{
	int32 limit;
#ifdef _DEBUG
	us_check_invariant(us);
#endif

	/* this invariant should always be true */
	assert(us->cur_window_packets == 0 || scb_get(&us->outbuf, us->seq_nr - us->cur_window_packets));

	LOG_UTPV("0x%08x: CheckTimeouts timeout:%d max_window:%u cur_window:%u quota:%d " "state:%s cur_window_packets:%u bytes_since_ack:%u ack_time:%d", us, (int)(us->rto_timeout - g_current_ms), (uint)us->max_window, (uint)us->cur_window, us->send_quota / 100, statenames[us->state], us->cur_window_packets, (uint)us->bytes_since_ack, (int)(g_current_ms - us->ack_time));

	us_update_send_quota(us);
	us_flush_packets(us);

	if (USE_PACKET_PACING) {
		/* In case the new send quota made it possible to send another packet,
		   mark the socket as writable. If we don't use pacing, the send
		   quota does not affect whether the socket is writable;
		   without packet pacing, the writable event is triggered
		   whenever the cur_window falls below the max_window, so we don't
		   need this check then */
		if (us->state == CS_CONNECTED_FULL && us_is_writable(us, us_get_packet_size(us))) {
			us->state = CS_CONNECTED;
			LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u", us, (uint)us->max_window, (uint)us->cur_window, us->send_quota / 100, (uint)us_get_packet_size(us));
			us->func.on_state(us->userdata, UTP_STATE_WRITABLE);
		}
	}

	switch (us->state) {
	case CS_SYN_SENT:
	case CS_CONNECTED_FULL:
	case CS_CONNECTED:
	case CS_FIN_SENT: {

		/* Reset max window... */
		if ((int)(g_current_ms - us->zerowindow_time) >= 0 && us->max_window_user == 0)
			us->max_window_user = PACKET_SIZE;

		if ((int)(g_current_ms - us->rto_timeout) >= 0 && (!(USE_PACKET_PACING) || us->cur_window_packets > 0) && us->rto_timeout > 0) {
			uint new_timeout;
			int i;

			/*
			OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);

			// If there were a lot of retransmissions, force recomputation of round trip time
			if (pkt->transmissions >= 4)
				rtt = 0;
			*/

			/* Increase RTO */
			new_timeout = us->retransmit_timeout * 2;
			if (new_timeout >= 30000 || (us->state == CS_SYN_SENT && new_timeout > 6000)) {
				/* more than 30 seconds with no reply. kill it.
				   if we haven't even connected yet, give up sooner. 6 seconds
				   means 2 tries at the following timeouts: 3, 6 seconds */
				if (us->state == CS_FIN_SENT)
					us->state = CS_DESTROY;
				else
					us->state = CS_RESET;
				us->func.on_error(us->userdata, ETIMEDOUT);
				goto getout;
			}

			us->retransmit_timeout = new_timeout;
			us->rto_timeout = g_current_ms + new_timeout;

			/* On Timeout */
			us->duplicate_ack = 0;

			/* rate = min_rate */
			us->max_window = us_get_packet_size(us);
			us->send_quota = int32_max((int32)us->max_window * 100, us->send_quota);

			/* every packet should be considered lost */
			for (i = 0; i < us->cur_window_packets; ++i) {
				struct op *pkt;

				pkt = (struct op*)scb_get(&us->outbuf, us->seq_nr - i - 1);
				if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend)
					continue;
				pkt->need_resend = true;
				assert(us->cur_window >= pkt->payload);
				us->cur_window -= pkt->payload;
			}

			/* used in parse_log.py */
			LOG_UTP("0x%08x: Packet timeout. Resend. seq_nr:%u. timeout:%u max_window:%u", us, us->seq_nr - us->cur_window_packets, us->retransmit_timeout, (uint)us->max_window);

			us->fast_timeout = true;
			us->timeout_seq_nr = us->seq_nr;

			if (us->cur_window_packets > 0) {
				struct op *pkt;

				pkt = (struct op*)scb_get(&us->outbuf, us->seq_nr - us->cur_window_packets);
				assert(pkt);
				us->send_quota = int32_max((int32)pkt->length * 100, us->send_quota);

				/* Re-send the packet. */
				us_send_packet(us, pkt);
			}
		}

		/* Mark the socket as writable */
		if (us->state == CS_CONNECTED_FULL && us_is_writable(us, us_get_packet_size(us))) {
			us->state = CS_CONNECTED;
			LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u", us, (uint)us->max_window, (uint)us->cur_window, us->send_quota / 100, (uint)us_get_packet_size(us));
			us->func.on_state(us->userdata, UTP_STATE_WRITABLE);
		}

		if (us->state >= CS_CONNECTED && us->state <= CS_FIN_SENT) {
			/* Send acknowledgment packets periodically, or when the threshold is reached */
			if (us->bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD || (int)(g_current_ms - us->ack_time) >= 0)
				us_send_ack(us, false);

			if ((int)(g_current_ms - us->last_sent_packet) >= KEEPALIVE_INTERVAL)
				us_send_keep_alive(us);
		}
		break;
	}

	/* Close? */
	case CS_GOT_FIN:
	case CS_DESTROY_DELAY:
		if ((int)(g_current_ms - us->rto_timeout) >= 0) {
			us->state = (us->state == CS_DESTROY_DELAY) ? CS_DESTROY : CS_RESET;
			if (us->cur_window_packets > 0 && us->userdata)
				us->func.on_error(us->userdata, ECONNRESET);
		}
		break;
	/* prevent warning */
	case CS_IDLE:
	case CS_RESET:
	case CS_DESTROY:
		break;
	}

getout:
	/* make sure we don't accumulate quota when we don't have
	   anything to send */
	limit = int32_max((int32)us->max_window / 2, 5 * (int32)us_get_packet_size(us)) * 100;
	if (us->send_quota > limit) us->send_quota = limit;
}
/* returns:
   0: the packet was acked.
   1: it means that the packet had already been acked
   2: the packet has not been sent yet */
static int us_ack_packet(struct UTPSocket *us, uint16 seq)
{
	struct op *pkt;

	pkt = (struct op*)scb_get(&us->outbuf, seq);

	/* the packet has already been acked (or not sent) */
	if (pkt == NULL) {
		LOG_UTPV("0x%08x: got ack for:%u (already acked, or never sent)", us, seq);
		return 1;
	}

	/* can't ack packets that haven't been sent yet! */
	if (pkt->transmissions == 0) {
		LOG_UTPV("0x%08x: got ack for:%u (never sent, pkt_size:%u need_resend:%u)", us, seq, (uint)pkt->payload, pkt->need_resend);
		return 2;
	}

	LOG_UTPV("0x%08x: got ack for:%u (pkt_size:%u need_resend:%u)", us, seq, (uint)pkt->payload, pkt->need_resend);

	scb_put(&us->outbuf, seq, NULL);

	/* if we never re-sent the packet, update the RTT estimate */
	if (pkt->transmissions == 1) {
		uint32 ertt;

		/* Estimate the round trip time. */
		ertt = (uint32)((UTP_GetMicroseconds() - pkt->time_sent) / 1000);

		if (us->rtt == 0) {
			/* First round trip time sample */
			us->rtt = ertt;
			us->rtt_var = ertt / 2;
			/* sanity check. rtt should never be more than 6 seconds */
			/* assert(rtt < 6000); */
		} else {
			int delta;

			/* Compute new round trip times */
			delta = (int)us->rtt - ertt;
			us->rtt_var = us->rtt_var + (int)(abs(delta) - us->rtt_var) / 4;
			us->rtt = us->rtt - us->rtt/8 + ertt/8;
			/* sanity check. rtt should never be more than 6 seconds */
			/* assert(rtt < 6000); */
			dh_add_sample(&us->rtt_hist, ertt);
		}
		us->rto = uint_max(us->rtt + us->rtt_var * 4, 500);
		LOG_UTPV("0x%08x: rtt:%u avg:%u var:%u rto:%u", us, ertt, us->rtt, us->rtt_var, us->rto);
	}
	us->retransmit_timeout = us->rto;
	us->rto_timeout = g_current_ms + us->rto;
	/* if need_resend is set, this packet has already
	   been considered timed-out, and is not included in
	   the cur_window anymore */
	if (!pkt->need_resend) {
		assert(us->cur_window >= pkt->payload);
		us->cur_window -= pkt->payload;
	}
	free(pkt);
	return 0;
}
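/* the RTT update above is the classic Jacobson/Karels estimator
   (cf. RFC 6298): rtt_var += (|delta| - rtt_var) / 4, rtt = 7/8 * rtt
   + 1/8 * ertt, rto = max(rtt + 4 * rtt_var, 500). Worked through in
   integer math: rtt == 100, rtt_var == 25, ertt == 60 gives delta ==
   40, rtt_var == 25 + (40 - 25) / 4 == 28, rtt == 100 - 12 + 7 == 95,
   and rto == max(95 + 112, 500) == 500, so the 500 ms floor wins. */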
/* count the number of bytes that were acked by the EACK header */
/* XXX: careful, min_rtt was a reference parameter in the C++ original */
static size_t us_selective_ack_bytes(struct UTPSocket *us, uint base, byte* mask, byte len, int64 *min_rtt)
{
	size_t acked_bytes;
	int bits;

	if (us->cur_window_packets == 0)
		return 0;

	acked_bytes = 0;
	bits = len * 8;

	do {
		uint v;
		struct op *pkt;

		v = base + bits;

		/* ignore bits that haven't been sent yet
		   see comment in UTPSocket::selective_ack */
		if (((us->seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(us->cur_window_packets - 1))
			continue;

		/* ignore bits that represent packets we haven't sent yet
		   or packets that have already been acked */
		pkt = (struct op*)scb_get(&us->outbuf, v);
		if (!pkt || pkt->transmissions == 0)
			continue;

		/* Count the number of segments that were successfully received past it. */
		if (bits >= 0 && mask[bits>>3] & (1 << (bits & 7))) {
			assert((int)(pkt->payload) >= 0);
			acked_bytes += pkt->payload;
			*min_rtt = int64_min(*min_rtt, UTP_GetMicroseconds() - pkt->time_sent);
			continue;
		}
	} while (--bits >= -1);
	return acked_bytes;
}
#define MAX_EACK 128

static void us_selective_ack(struct UTPSocket *us, uint base, byte *mask, byte len)
{
	int bits;
	int count;
	int resends[MAX_EACK];
	int nr;
	bool back_off;
	int i;

	if (us->cur_window_packets == 0)
		return;

	/* the range is inclusive [0, 31] bits */
	bits = len * 8 - 1;

	count = 0;

	/* resends is a stack of sequence numbers we need to resend. Since we
	   iterate in reverse over the acked packets, at the end, the top packets
	   are the ones we want to resend */
	/* resends[MAX_EACK]; */
	nr = 0;

	LOG_UTPV("0x%08x: Got EACK [%032b] base:%u", us, *(uint32*)mask, base);
	do {
		uint v;
		bool bit_set;
		struct op *pkt;

		/* we're iterating over the bits from higher sequence numbers
		   to lower (kind of in reverse order, which might not be very
		   intuitive) */
		v = base + bits;

		/* ignore bits that haven't been sent yet
		   and bits that fall below the ACKed sequence number
		   this can happen if an EACK message gets
		   reordered and arrives after a packet that ACKs up past
		   the base for this EACK message

		   this is essentially the same as:
		   if v >= seq_nr || v <= seq_nr - cur_window_packets
		   but it takes wrapping into account

		   if v == seq_nr the -1 will make it wrap. if v > seq_nr
		   it will also wrap (since it will fall further below 0)
		   and be > cur_window_packets.
		   if v == seq_nr - cur_window_packets, the result will be
		   seq_nr - (seq_nr - cur_window_packets) - 1
		   == seq_nr - seq_nr + cur_window_packets - 1
		   == cur_window_packets - 1 which will be caught by the
		   test. If v < seq_nr - cur_window_packets the result will
		   fall further outside of the cur_window_packets range.

		   sequence number space:

		   rejected <   accepted   > rejected
		   <============+--------------+============>
		   (seq_nr-wnd)            seq_nr */

		if (((us->seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(us->cur_window_packets - 1))
			continue;

		/* this counts as a duplicate ack, even though we might have
		   received an ack for this packet previously (in another EACK
		   message for instance) */
		bit_set = bits >= 0 && mask[bits>>3] & (1 << (bits & 7));

		/* if this packet is acked, it counts towards the duplicate ack counter */
		if (bit_set)
			count++;

		/* ignore bits that represent packets we haven't sent yet
		   or packets that have already been acked */
		pkt = (struct op*)scb_get(&us->outbuf, v);
		if (!pkt || pkt->transmissions == 0) {
			LOG_UTPV("0x%08x: skipping %u. pkt:%08x transmissions:%u %s", us, v, pkt, pkt ? pkt->transmissions : 0, pkt ? "(not sent yet?)" : "(already acked?)");
			continue;
		}

		/* Count the number of segments that were successfully received past it. */
		if (bit_set) {
			/* the selective ack should never ACK the packet we're waiting for to decrement cur_window_packets */
			assert((v & us->outbuf.mask) != ((us->seq_nr - us->cur_window_packets) & us->outbuf.mask));
			us_ack_packet(us, v);
			continue;
		}

		/* Resend segments
		   if count is less than our re-send limit, we haven't seen enough
		   acked packets in front of this one to warrant a re-send.
		   if count == 0, we're still going through the tail of zeroes */
		if (((v - us->fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE && count >= DUPLICATE_ACKS_BEFORE_RESEND && us->duplicate_ack < DUPLICATE_ACKS_BEFORE_RESEND) {
			/* resends is a stack, and we're mostly interested in the top of it
			   if we're full, just throw away the lower half */
			if (nr >= MAX_EACK - 2) {
				memmove(resends, &resends[MAX_EACK/2], MAX_EACK/2 * sizeof(resends[0]));
				nr -= MAX_EACK / 2;
			}
			resends[nr++] = v;
			LOG_UTPV("0x%08x: no ack for %u", us, v);
		} else {
			LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u", us, v, count, us->duplicate_ack, us->fast_resend_seq_nr);
		}
	} while (--bits >= -1);

	if (((base - 1 - us->fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE && count >= DUPLICATE_ACKS_BEFORE_RESEND) {
		/* if we get enough duplicate acks to start
		   resending, the first packet we should resend
		   is base-1 */
		resends[nr++] = (base - 1) & ACK_NR_MASK;
	} else {
		LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u", us, base - 1, count, us->duplicate_ack, us->fast_resend_seq_nr);
	}

	back_off = false;
	i = 0;
	while (nr > 0) {
		uint v;
		struct op *pkt;

		v = resends[--nr];
		/* don't consider the tail of 0:es to be lost packets
		   only unacked packets with acked packets after should
		   be considered lost */
		pkt = (struct op*)scb_get(&us->outbuf, v);

		/* this may be an old (re-ordered) packet, and some of the
		   packets in here may have been acked already. In which
		   case they will not be in the send queue anymore */
		if (!pkt)
			continue;

		/* used in parse_log.py */
		LOG_UTP("0x%08x: Packet %u lost. Resending", us, v);

		/* On Loss */
		back_off = true;
#ifdef _DEBUG
		++us->_stats._rexmit;
#endif
		us_send_packet(us, pkt);
		us->fast_resend_seq_nr = v + 1;

		/* Re-send max 4 packets. */
		if (++i >= 4)
			break;
	}

	if (back_off)
		us_maybe_decay_win(us);

	us->duplicate_ack = count;
}
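/* EACK mask semantics, by example: bit k of the mask refers to sequence
   number base + k. With base == 1002 and mask[0] == 0x05 (bits 0 and 2
   set), packets 1002 and 1004 are acked selectively while 1003 is not;
   once enough set bits have been counted in front of it
   (DUPLICATE_ACKS_BEFORE_RESEND), 1003 becomes a fast-resend candidate. */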
1815 static void us_apply_ledbat_ccontrol(struct UTPSocket *us, size_t bytes_acked, uint32 actual_delay, int64 min_rtt)
1817 int32 our_delay;
1818 SOCKADDR_STORAGE sa;
1819 int target;
1820 double off_target;
1821 double window_factor;
1822 double delay_factor;
1823 double scaled_gain;
1825 /* the delay can never be greater than the rtt. The min_rtt
1826 variable is the RTT in microseconds */
1828 assert(min_rtt >= 0);
1829 our_delay = uint32_min(dh_get_value(&us->our_hist), (uint32)min_rtt);
1830 assert(our_delay != INT_MAX);
1831 assert(our_delay >= 0);
1833 sa = psa_get_sockaddr_storage(&us->addr, NULL);
1834 UTP_DelaySample((struct sockaddr*)&sa, our_delay / 1000);
1836 /* This test the connection under heavy load from foreground
1837 traffic. Pretend that our delays are very high to force the
1838 connection to use sub-packet size window sizes */
1839 /*our_delay *= 4; */
1841 /* target is microseconds */
1842 target = CCONTROL_TARGET;
1843 if (target <= 0)
1844 target = 100000;
1846 off_target = target - our_delay;
/* this is the same as:

   (min(off_target, target) / target) * (bytes_acked / max_window) * MAX_CWND_INCREASE_BYTES_PER_RTT

   so, it scales the maximum increase by the fraction of the window this ack represents, and by
   the fraction of the target delay the current delay represents.
   The min() around off_target protects against crazy values of our_delay, which may happen when
   the timestamps wrap, or when a malicious peer sends garbage. This caps the increase of the
   window size to MAX_CWND_INCREASE_BYTES_PER_RTT per rtt.
   As for large negative numbers, that direction is already capped at the minimum packet size
   further down.
   The min around bytes_acked protects against the case where the window size was recently
   shrunk and the number of acked bytes exceeds it. That is counted as no more than one full
   window, in order to keep the gain within sane boundaries. */
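/* Worked example (ours): with target = 100000 us and our_delay = 50000 us,
   delay_factor = 0.5; acking 1500 bytes against a 30000 byte max_window
   gives window_factor = 0.05, so this ack grows the window by
   scaled_gain = 3000 * 0.5 * 0.05 = 75 bytes */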
1862 assert(bytes_acked > 0);
1863 window_factor = (double)size_t_min(bytes_acked, us->max_window) / (double)size_t_max(us->max_window, bytes_acked);
1864 delay_factor = off_target / target;
1865 scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor;
/* since MAX_CWND_INCREASE_BYTES_PER_RTT is a cap on how much the window size (max_window)
   may increase per RTT, we may not increase the window size by more than that, proportional
   to the number of bytes that were acked, so that once one full window has been acked (one rtt)
   the increase limit is not exceeded.
   the +1. allows for floating point imprecision */
1872 assert(scaled_gain <= 1. + MAX_CWND_INCREASE_BYTES_PER_RTT * (int)size_t_min(bytes_acked, us->max_window) / (double)size_t_max(us->max_window, bytes_acked));
1874 if (scaled_gain > 0 && g_current_ms - us->last_maxed_out_window > 300)
1875 /* if it was more than 300 milliseconds since we tried to send a packet
1876 and stopped because we hit the max window, we're most likely rate
1877 limited (which prevents us from ever hitting the window size)
1878 if this is the case, we cannot let the max_window grow indefinitely */
1879 scaled_gain = 0;
1881 if (scaled_gain + us->max_window < MIN_WINDOW_SIZE)
1882 us->max_window = MIN_WINDOW_SIZE;
1883 else
1884 us->max_window = (size_t)(us->max_window + scaled_gain);
1886 /* make sure that the congestion window is below max
1887 make sure that we don't shrink our window too small */
1888 us->max_window = size_t_clamp(us->max_window, MIN_WINDOW_SIZE, us->opt_sndbuf);
1890 /* used in parse_log.py */
LOG_UTP("0x%08x: actual_delay:%u our_delay:%d their_delay:%u off_target:%d max_window:%u "
	"delay_base:%u delay_sum:%d target_delay:%d acked_bytes:%u cur_window:%u "
	"scaled_gain:%f rtt:%u rate:%u quota:%d wnduser:%u rto:%u timeout:%d get_microseconds:"I64u" "
	"cur_window_packets:%u packet_size:%u their_delay_base:%u their_actual_delay:%u",
	us, actual_delay, our_delay / 1000, dh_get_value(&us->their_hist) / 1000,
	(int)off_target / 1000, (uint)(us->max_window), us->our_hist.delay_base,
	(our_delay + dh_get_value(&us->their_hist)) / 1000, target / 1000, (uint)bytes_acked,
	(uint)(us->cur_window - bytes_acked), (float)(scaled_gain), us->rtt,
	(uint)(us->max_window * 1000 / (us->rtt_hist.delay_base?us->rtt_hist.delay_base:50)),
	us->send_quota / 100, (uint)us->max_window_user, us->rto, (int)(us->rto_timeout - g_current_ms),
	UTP_GetMicroseconds(), us->cur_window_packets, (uint)us_get_packet_size(us),
	us->their_hist.delay_base, us->their_hist.delay_base + dh_get_value(&us->their_hist));
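#if 0
/* Illustrative sketch only (never compiled): the gain computation above,
   restated as a standalone function. The name and the standalone form are
   ours; the formula and MAX_CWND_INCREASE_BYTES_PER_RTT come from this file. */
static double ledbat_gain_sketch(size_t bytes_acked, size_t max_window, int32 our_delay, int target)
{
	/* fraction of a full window this ack represents, capped at 1.0 */
	double window_factor = (double)size_t_min(bytes_acked, max_window) / (double)size_t_max(max_window, bytes_acked);
	/* fraction of the target delay still unused; negative when over target */
	double delay_factor = (double)(target - our_delay) / (double)target;
	return MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor;
}
#endif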
1894 static void UTP_RegisterRecvPacket(struct UTPSocket *conn, size_t len)
1896 #ifdef _DEBUG
1897 ++conn->_stats._nrecv;
1898 conn->_stats._nbytes_recv += len;
1899 #else
1900 (void)conn;
1901 #endif
1903 if (len <= PACKET_SIZE_MID)
1904 if (len <= PACKET_SIZE_EMPTY)
1905 _global_stats._nraw_recv[PACKET_SIZE_EMPTY_BUCKET]++;
1906 else if (len <= PACKET_SIZE_SMALL)
1907 _global_stats._nraw_recv[PACKET_SIZE_SMALL_BUCKET]++;
1908 else
1909 _global_stats._nraw_recv[PACKET_SIZE_MID_BUCKET]++;
1910 else
1911 if (len <= PACKET_SIZE_BIG)
1912 _global_stats._nraw_recv[PACKET_SIZE_BIG_BUCKET]++;
1913 else
1914 _global_stats._nraw_recv[PACKET_SIZE_HUGE_BUCKET]++;
/* Process an incoming packet.
   syn is true if this is the first packet received; parsing is then
   cut off as soon as the header is done */
/* XXX: syn defaults to false */
1921 static size_t UTP_ProcessIncoming(struct UTPSocket *conn, byte *packet, size_t len, bool syn)
1923 struct pf *pf;
1924 struct pf1 *pf1;
1925 byte *packet_end;
1926 uint16 pk_seq_nr;
1927 uint16 pk_ack_nr;
1928 uint8 pk_flags;
1929 uint64 time;
1930 byte *selack_ptr;
1931 byte *data;
1932 uint extension;
1933 uint seqnr;
1934 int acks;
1935 size_t acked_bytes;
1936 int64 min_rtt;
1937 int i;
1938 uint64 p;
1939 uint32 their_delay;
1940 uint32 prev_delay_base;
1941 uint32 actual_delay;
1942 byte *buf;
1943 byte *mem;
1945 UTP_RegisterRecvPacket(conn, len);
1947 g_current_ms = UTP_GetMilliseconds();
1949 us_update_send_quota(conn);
1951 pf = (struct pf*)packet;
1952 pf1 = (struct pf1*)packet;
1953 packet_end = packet + len;
1955 if (conn->version == 0) {
1956 pk_seq_nr = ntohs(pf->seq_nr);
1957 pk_ack_nr = ntohs(pf->ack_nr);
1958 pk_flags = pf->flags;
1959 } else {
1960 pk_seq_nr = ntohs(pf1->seq_nr);
1961 pk_ack_nr = ntohs(pf1->ack_nr);
1962 pk_flags = pf1_type(pf1);
1965 if (pk_flags >= ST_NUM_STATES)
1966 return 0;
1968 LOG_UTPV("0x%08x: Got %s. seq_nr:%u ack_nr:%u state:%s version:%u timestamp:"I64u" reply_micro:%u", conn, flagnames[pk_flags], pk_seq_nr, pk_ack_nr, statenames[conn->state], conn->version, conn->version == 0?(uint64)(ntohl(pf->tv_sec)) * 1000000 + ntohl(pf->tv_usec):(uint64)(ntohl(pf1->tv_usec)), conn->version == 0?(uint32)(ntohl(pf->reply_micro)):(uint32)(ntohl(pf1->reply_micro)));
1970 /* mark receipt time */
1971 time = UTP_GetMicroseconds();
1973 /* RSTs are handled earlier, since the connid matches the send id not the recv id */
1974 assert(pk_flags != ST_RESET);
1976 /* TODO: maybe send a ST_RESET if we're in CS_RESET? */
1978 selack_ptr = NULL;
1980 /* Unpack UTP packet options
1981 Data pointer */
1982 data = (byte*)pf + us_get_header_size(conn);
1983 if (us_get_header_size(conn) > len) {
1984 LOG_UTPV("0x%08x: Invalid packet size (less than header size)", conn);
1985 return 0;
1987 /* Skip the extension headers */
1988 extension = conn->version == 0 ? pf->ext : pf1->ext;
1989 if (extension != 0) {
1990 do {
1991 /* Verify that the packet is valid. */
1992 data += 2;
1994 if ((int)(packet_end - data) < 0 || (int)(packet_end - data) < data[-1]) {
1995 LOG_UTPV("0x%08x: Invalid len of extensions", conn);
1996 return 0;
1999 switch(extension) {
2000 case 1: /* Selective Acknowledgment */
2001 selack_ptr = data;
2002 break;
2003 case 2: /* extension bits */
2004 if (data[-1] != 8) {
2005 LOG_UTPV("0x%08x: Invalid len of extension bits header", conn);
2006 return 0;
2008 memcpy(conn->extensions, data, 8);
2009 LOG_UTPV("0x%08x: got extension bits:%02x%02x%02x%02x%02x%02x%02x%02x", conn, conn->extensions[0], conn->extensions[1], conn->extensions[2], conn->extensions[3], conn->extensions[4], conn->extensions[5], conn->extensions[6], conn->extensions[7]);
2011 extension = data[-2];
2012 data += data[-1];
2013 } while (extension);
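/* Each extension block on the wire, as walked above, is laid out as
       [next-extension type: 1 byte][payload length: 1 byte][payload]
   the type of the first block is carried in the header's ext field, and a
   next-type of 0 terminates the chain */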
2016 if (conn->state == CS_SYN_SENT)
2017 /* if this is a syn-ack, initialize our ack_nr
2018 to match the sequence number we got from
2019 the other end */
2020 conn->ack_nr = (pk_seq_nr - 1) & SEQ_NR_MASK;
2022 g_current_ms = UTP_GetMilliseconds();
2023 conn->last_got_packet = g_current_ms;
2025 if (syn)
2026 return 0;
/* seqnr is the number of packets past the expected
   packet this is. ack_nr is the last acked, seq_nr is the
   current. Subtracting 1 makes 0 mean "this is the next
   expected packet". */
2032 seqnr = (pk_seq_nr - conn->ack_nr - 1) & SEQ_NR_MASK;
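/* Worked example (ours): the 16-bit arithmetic wraps safely. With
   ack_nr == 0xFFFF and pk_seq_nr == 0,
   (0 - 0xFFFF - 1) & SEQ_NR_MASK == 0, i.e. exactly the next expected
   packet despite the sequence number wrap */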
2034 /* Getting an invalid sequence number? */
2035 if (seqnr >= REORDER_BUFFER_MAX_SIZE) {
2036 if (seqnr >= (SEQ_NR_MASK + 1) - REORDER_BUFFER_MAX_SIZE && pk_flags != ST_STATE)
2037 conn->ack_time = g_current_ms + uint_min(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
2038 LOG_UTPV(" Got old Packet/Ack (%u/%u)=%u!", pk_seq_nr, conn->ack_nr, seqnr);
2039 return 0;
/* Process acknowledgment
   acks is the number of packets that were acked */
2044 acks = (pk_ack_nr - (conn->seq_nr - 1 - conn->cur_window_packets)) & ACK_NR_MASK;
2046 /* this happens when we receive an old ack nr */
2047 if (acks > conn->cur_window_packets)
2048 acks = 0;
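/* Worked example (ours): with seq_nr == 105 and cur_window_packets == 4,
   the oldest outstanding packet is 101; pk_ack_nr == 102 gives
   acks = (102 - (105 - 1 - 4)) & ACK_NR_MASK == 2, i.e. packets 101
   and 102 are now cumulatively acked */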
2050 /* if we get the same ack_nr as in the last packet
2051 increase the duplicate_ack counter, otherwise reset
2052 it to 0 */
2053 if (conn->cur_window_packets > 0) {
2054 if (pk_ack_nr == ((conn->seq_nr - conn->cur_window_packets - 1) & ACK_NR_MASK) && conn->cur_window_packets > 0) {
2055 /*++conn->duplicate_ack; */
2056 } else
2057 conn->duplicate_ack = 0;
/* TODO: if duplicate_ack == DUPLICATE_ACKS_BEFORE_RESEND
   and fast_resend_seq_nr <= ack_nr + 1
   resend ack_nr + 1 */
2064 /* figure out how many bytes were acked */
2065 acked_bytes = 0;
2067 /* the minimum rtt of all acks
2068 this is the upper limit on the delay we get back
2069 from the other peer. Our delay cannot exceed
2070 the rtt of the packet. If it does, clamp it.
2071 this is done in apply_ledbat_ccontrol() */
2072 min_rtt = INT64_MAX;
2074 for (i = 0; i < acks; ++i) {
2075 int seq;
2076 struct op *pkt;
2078 seq = conn->seq_nr - conn->cur_window_packets + i;
2079 pkt = (struct op*)scb_get(&conn->outbuf, seq);
2080 if (pkt == 0 || pkt->transmissions == 0)
2081 continue;
2082 assert((int)(pkt->payload) >= 0);
2083 acked_bytes += pkt->payload;
2084 min_rtt = int64_min(min_rtt, UTP_GetMicroseconds() - pkt->time_sent);
2087 /* count bytes acked by EACK */
2088 if (selack_ptr != NULL)
2089 acked_bytes += us_selective_ack_bytes(conn, (pk_ack_nr + 2) & ACK_NR_MASK, selack_ptr, selack_ptr[-1], &min_rtt);
2091 LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%d cur_window:%u cur_window_packets:%u relative_seqnr:%u max_window:%u min_rtt:%u rtt:%u", conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets, seqnr, (uint)conn->max_window, (uint)(min_rtt / 1000), conn->rtt);
2093 if (conn->version == 0)
2094 p = (uint64)ntohl(pf->tv_sec) * 1000000 + ntohl(pf->tv_usec);
2095 else
2096 p = ntohl(pf1->tv_usec);
2098 conn->last_measured_delay = g_current_ms;
2100 /* get delay in both directions
2101 record the delay to report back */
2102 their_delay = (uint32)(p == 0 ? 0 : time - p);
2103 conn->reply_micro = their_delay;
2104 prev_delay_base = conn->their_hist.delay_base;
2105 if (their_delay != 0)
2106 dh_add_sample(&conn->their_hist, their_delay);
2108 /* if their new delay base is less than their previous one
2109 we should shift our delay base in the other direction in order
2110 to take the clock skew into account */
2111 if (prev_delay_base != 0 && wrapping_compare_less(conn->their_hist.delay_base, prev_delay_base))
2112 /* never adjust more than 10 milliseconds */
2113 if (prev_delay_base - conn->their_hist.delay_base <= 10000)
2114 dh_shift(&conn->our_hist, prev_delay_base - conn->their_hist.delay_base);
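/* Worked example (ours): if their delay_base drops by 3000 us, the likely
   cause is clock drift rather than a faster path, so our own history is
   shifted up by the same 3000 us; drops larger than 10 ms are assumed to
   be bogus and are ignored by the check above */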
2116 actual_delay = conn->version==0 ? (ntohl(pf->reply_micro)==INT_MAX?0:(uint32)(ntohl(pf->reply_micro))) : ((uint32)(ntohl(pf1->reply_micro))==INT_MAX?0:(uint32)(ntohl(pf1->reply_micro)));
/* if the actual delay is 0, it means the other end
   hasn't received a sample from us yet, and doesn't
   know what it is. We can't update our history unless
   we have a true measured sample */
2122 prev_delay_base = conn->our_hist.delay_base;
2123 if (actual_delay != 0)
2124 dh_add_sample(&conn->our_hist, actual_delay);
/* if our new delay base is less than our previous one
   we should shift the other end's delay base in the other
   direction in order to take the clock skew into account.
   This is commented out because it creates bad interactions
   with our adjustment in the other direction. We don't really
   need our estimates of the other peer to be very accurate
   anyway. The problem with shifting here is that we're more
   likely to shift it back later because of a low latency. That
   second shift back would cause us to shift our delay base,
   which then gets into a death spiral of shifting delay bases */
2136 /* if (prev_delay_base != 0 &&
2137 wrapping_compare_less(conn->our_hist.delay_base, prev_delay_base)) {
2138 // never adjust more than 10 milliseconds
2139 if (prev_delay_base - conn->our_hist.delay_base <= 10000) {
2140 conn->their_hist.Shift(prev_delay_base - conn->our_hist.delay_base);
2145 /* if the delay estimate exceeds the RTT, adjust the base_delay to
2146 compensate */
2147 if (dh_get_value(&conn->our_hist) > (uint32)(min_rtt)) {
2148 dh_shift(&conn->our_hist, dh_get_value(&conn->our_hist) - min_rtt);
2151 /* only apply the congestion controller on acks
2152 if we don't have a delay measurement, there's
2153 no point in invoking the congestion control */
2154 if (actual_delay != 0 && acked_bytes >= 1)
2155 us_apply_ledbat_ccontrol(conn, acked_bytes, actual_delay, min_rtt);
2157 /* sanity check, the other end should never ack packets
2158 past the point we've sent */
2159 if (acks <= conn->cur_window_packets) {
2160 conn->max_window_user = conn->version == 0 ? pf->windowsize * PACKET_SIZE : ntohl(pf1->windowsize);
/* If the max user window is set to 0, we start a timer
   that will reset it to 1 after 15 seconds. */
2164 if (conn->max_window_user == 0)
2165 /* Reset max_window_user to 1 every 15 seconds. */
2166 conn->zerowindow_time = g_current_ms + 15000;
2168 /* Respond to connect message
2169 Switch to CONNECTED state. */
2170 if (conn->state == CS_SYN_SENT) {
2171 conn->state = CS_CONNECTED;
2172 conn->func.on_state(conn->userdata, UTP_STATE_CONNECT);
2174 /* We've sent a fin, and everything was ACKed (including the FIN),
2175 it's safe to destroy the socket. cur_window_packets == acks
2176 means that this packet acked all the remaining packets that
2177 were in-flight. */
2178 } else if (conn->state == CS_FIN_SENT && conn->cur_window_packets == acks)
2179 conn->state = CS_DESTROY;
2181 /* Update fast resend counter */
2182 if (wrapping_compare_less(conn->fast_resend_seq_nr, (pk_ack_nr + 1) & ACK_NR_MASK))
2183 conn->fast_resend_seq_nr = pk_ack_nr + 1;
2185 LOG_UTPV("0x%08x: fast_resend_seq_nr:%u", conn, conn->fast_resend_seq_nr);
2187 for (i = 0; i < acks; ++i) {
2188 int ack_status;
2190 ack_status = us_ack_packet(conn, conn->seq_nr - conn->cur_window_packets);
/* if ack_status is 0, the packet was acked.
   if ack_status is 1, the packet had already been acked.
   if it's 2, the packet has not been sent yet.
   We need to break the loop in the latter case. This could potentially
   happen if we get an ack_nr that does not exceed what we have stuffed
   into the outgoing buffer, but does exceed what we have sent */
2197 if (ack_status == 2) {
2198 #ifdef _DEBUG
2199 struct op* pkt;
2201 pkt = (struct op*)scb_get(&conn->outbuf, conn->seq_nr - conn->cur_window_packets);
2202 assert(pkt->transmissions == 0);
2203 #endif
2204 break;
2206 conn->cur_window_packets--;
2208 #ifdef _DEBUG
2209 if (conn->cur_window_packets == 0)
2210 assert(conn->cur_window == 0);
2211 #endif
/* packets in front of this one may have been acked by a
   selective ack (EACK). Keep decreasing the window packet count
   until we hit a packet that is still waiting to be acked
   in the send queue.
   this is especially likely to happen when the other end
   has the EACK send bug that older versions of uTP had */
2219 while (conn->cur_window_packets > 0 && !scb_get(&conn->outbuf, conn->seq_nr - conn->cur_window_packets))
2220 conn->cur_window_packets--;
2222 #ifdef _DEBUG
2223 if (conn->cur_window_packets == 0)
2224 assert(conn->cur_window == 0);
2225 #endif
2227 /* this invariant should always be true */
2228 assert(conn->cur_window_packets == 0 || scb_get(&conn->outbuf, conn->seq_nr - conn->cur_window_packets));
2230 /* flush Nagle */
2231 if (conn->cur_window_packets == 1) {
2232 struct op *pkt;
2234 pkt = (struct op*)scb_get(&conn->outbuf, conn->seq_nr - 1);
2235 /* do we still have quota? */
2236 if (pkt->transmissions == 0 && (!(USE_PACKET_PACING) || conn->send_quota / 100 >= (int32)(pkt->length))) {
2237 us_send_packet(conn, pkt);
2239 /* No need to send another ack if there is nothing to reorder. */
2240 if (conn->reorder_count == 0)
2241 us_sent_ack(conn);
2245 /* Fast timeout-retry */
2246 if (conn->fast_timeout) {
2247 LOG_UTPV("Fast timeout %u,%u,%u?", (uint)conn->cur_window, conn->seq_nr - conn->timeout_seq_nr, conn->timeout_seq_nr);
2248 /* if the fast_resend_seq_nr is not pointing to the oldest outstanding packet, it suggests that we've already
2249 resent the packet that timed out, and we should leave the fast-timeout mode. */
2250 if (((conn->seq_nr - conn->cur_window_packets) & ACK_NR_MASK) != conn->fast_resend_seq_nr)
2251 conn->fast_timeout = false;
2252 else {
2253 struct op *pkt;
2254 /* resend the oldest packet and increment fast_resend_seq_nr
2255 to not allow another fast resend on it again */
2256 pkt = (struct op*)scb_get(&conn->outbuf, conn->seq_nr - conn->cur_window_packets);
2257 if (pkt && pkt->transmissions > 0) {
2258 LOG_UTPV("0x%08x: Packet %u fast timeout-retry.", conn, conn->seq_nr - conn->cur_window_packets);
2259 #ifdef _DEBUG
2260 ++conn->_stats._fastrexmit;
2261 #endif
2262 conn->fast_resend_seq_nr++;
2263 us_send_packet(conn, pkt);
/* Process selective acknowledgment */
2270 if (selack_ptr != NULL)
2271 us_selective_ack(conn, pk_ack_nr + 2, selack_ptr, selack_ptr[-1]);
2273 /* this invariant should always be true */
2274 assert(conn->cur_window_packets == 0 || scb_get(&conn->outbuf, conn->seq_nr - conn->cur_window_packets));
2276 LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%u cur_window:%u cur_window_packets:%u quota:%d", conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window, conn->cur_window_packets, conn->send_quota / 100);
/* In case the ack dropped the current window below
   the max_window size, mark the socket as writable */
2280 if (conn->state == CS_CONNECTED_FULL && us_is_writable(conn, us_get_packet_size(conn))) {
2281 conn->state = CS_CONNECTED;
2282 LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u", conn, (uint)conn->max_window, (uint)conn->cur_window, conn->send_quota / 100, (uint)us_get_packet_size(conn));
2283 conn->func.on_state(conn->userdata, UTP_STATE_WRITABLE);
2286 if (pk_flags == ST_STATE)
2287 /* This is a state packet only. */
2288 return 0;
2290 /* The connection is not in a state that can accept data? */
2291 if (conn->state != CS_CONNECTED && conn->state != CS_CONNECTED_FULL && conn->state != CS_FIN_SENT)
2292 return 0;
2294 /* Is this a finalize packet? */
2295 if (pk_flags == ST_FIN && !conn->got_fin) {
2296 LOG_UTPV("Got FIN eof_pkt:%u", pk_seq_nr);
2297 conn->got_fin = true;
2298 conn->eof_pkt = pk_seq_nr;
2299 /* at this point, it is possible for the
2300 other end to have sent packets with
2301 sequence numbers higher than seq_nr.
2302 if this is the case, our reorder_count
2303 is out of sync. This case is dealt with
2304 when we re-order and hit the eof_pkt.
2305 we'll just ignore any packets with
2306 sequence numbers past this */
2309 /* Getting an in-order packet? */
2310 if (seqnr == 0) {
2311 size_t count;
2313 count = packet_end - data;
2314 if (count > 0 && conn->state != CS_FIN_SENT) {
2315 LOG_UTPV("0x%08x: Got Data len:%u (rb:%u)", conn, (uint)count, (uint)conn->func.get_rb_size(conn->userdata));
2316 /* Post bytes to the upper layer */
2317 conn->func.on_read(conn->userdata, data, count);
2319 conn->ack_nr++;
2320 conn->bytes_since_ack += count;
2322 /* Check if the next packet has been received too, but waiting
2323 in the reorder buffer. */
2324 for (;;) {
2325 if (conn->got_fin && conn->eof_pkt == conn->ack_nr) {
2326 if (conn->state != CS_FIN_SENT) {
2327 conn->state = CS_GOT_FIN;
2328 conn->rto_timeout = g_current_ms + uint_min(conn->rto * 3, 60);
2330 LOG_UTPV("0x%08x: Posting EOF", conn);
2331 conn->func.on_state(conn->userdata, UTP_STATE_EOF);
2334 /* if the other end wants to close, ack immediately */
2335 us_send_ack(conn, false);
/* reorder_count is not necessarily 0 at this point.
   even though it is most of the time, the other end
   may have sent packets with sequence numbers higher
   than what later ended up being eof_pkt.
   since we have received all packets up to eof_pkt,
   just ignore the ones after it. */
2343 conn->reorder_count = 0;
2346 /* Quick get-out in case there is nothing to reorder */
2347 if (conn->reorder_count == 0)
2348 break;
/* Check if there are additional buffers in the reorder buffer
   that need delivery. */
2352 buf = (byte*)scb_get(&conn->inbuf, conn->ack_nr+1);
2353 if (buf == NULL)
2354 break;
2355 scb_put(&conn->inbuf, conn->ack_nr+1, NULL);
2356 count = *(uint*)buf;
2357 if (count > 0 && conn->state != CS_FIN_SENT)
2358 /* Pass the bytes to the upper layer */
2359 conn->func.on_read(conn->userdata, buf + sizeof(uint), count);
2360 conn->ack_nr++;
2361 conn->bytes_since_ack += count;
2363 /* Free the element from the reorder buffer */
2364 free(buf);
2365 assert(conn->reorder_count > 0);
2366 conn->reorder_count--;
2369 /* start the delayed ACK timer */
2370 conn->ack_time = g_current_ms + uint_min(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
2371 } else {
2372 /* Getting an out of order packet.
2373 The packet needs to be remembered and rearranged later. */
/* if we have received a FIN packet, and the EOF sequence number
   is lower than the sequence number of the packet we just received,
   something is wrong. */
2378 if (conn->got_fin && pk_seq_nr > conn->eof_pkt) {
2379 LOG_UTPV("0x%08x: Got an invalid packet sequence number, past EOF " "reorder_count:%u len:%u (rb:%u)", conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
2380 return 0;
2383 /* if the sequence number is entirely off the expected
2384 one, just drop it. We can't allocate buffer space in
2385 the inbuf entirely based on untrusted input */
2386 if (seqnr > 0x3ff) {
2387 LOG_UTPV("0x%08x: Got an invalid packet sequence number, too far off " "reorder_count:%u len:%u (rb:%u)", conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
2388 return 0;
/* we need to grow the circular buffer before we
   check if the packet is already in here, so that
   we don't end up looking at an older packet (since
   the indices wrap around). */
2395 scb_ensure_size(&conn->inbuf, pk_seq_nr + 1, seqnr + 1);
2397 /* Has this packet already been received? (i.e. a duplicate)
2398 If that is the case, just discard it. */
2399 if (scb_get(&conn->inbuf, pk_seq_nr) != NULL) {
2400 #ifdef _DEBUG
2401 ++conn->_stats._nduprecv;
2402 #endif
2403 return 0;
/* Allocate memory to fit the packet that needs to be re-ordered */
2407 mem = malloc((packet_end - data) + sizeof(uint));
2408 *(uint*)mem = (uint)(packet_end - data);
2409 memcpy(mem + sizeof(uint), data, packet_end - data);
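/* layout of a reorder-buffer element, as written here and read back by the
   in-order delivery loop above: [uint byte count][payload bytes] */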
/* Insert into the reorder buffer and increment the count
   of packets to be reordered.
   we add one to seqnr (the size argument) in order to leave
   the last entry empty, so that the assert in send_ack
   holds. we also add one to pk_seq_nr (the item argument)
   in order to make the circular buffer grow around the
   correct point (which is conn->ack_nr + 1). */
2418 assert(scb_get(&conn->inbuf, pk_seq_nr) == NULL);
2419 assert((pk_seq_nr & conn->inbuf.mask) != ((conn->ack_nr+1) & conn->inbuf.mask));
2420 scb_put(&conn->inbuf, pk_seq_nr, mem);
2421 conn->reorder_count++;
2423 LOG_UTPV("0x%08x: Got out of order data reorder_count:%u len:%u (rb:%u)", conn, conn->reorder_count, (uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
/* Set things up so the partial ACK message will get sent immediately. */
2426 conn->ack_time = g_current_ms + uint_min(conn->ack_time - g_current_ms, 1);
/* If ack_time or bytes_since_ack indicate that we need to send an ack, send one
   here instead of waiting for the timer to trigger */
2431 LOG_UTPV("bytes_since_ack:%u ack_time:%d", (uint)conn->bytes_since_ack, (int)(g_current_ms - conn->ack_time));
2432 if (conn->state == CS_CONNECTED || conn->state == CS_CONNECTED_FULL)
2433 if (conn->bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD || (int)(g_current_ms - conn->ack_time) >= 0)
2434 us_send_ack(conn, false);
2435 return (size_t)(packet_end - data);
2438 static __inline__ bool UTP_IsV1(struct pf1 *pf1)
2440 return pf1_version(pf1) == 1 && pf1_type(pf1) < ST_NUM_STATES && pf1->ext < 3;
2443 static void UTP_Free(struct UTPSocket *conn)
2445 struct UTPSocket **last_ptr;
2446 struct UTPSocket *last;
2447 size_t i;
2449 LOG_UTPV("0x%08x: Killing socket", conn);
2451 conn->func.on_state(conn->userdata, UTP_STATE_DESTROYING);
2452 UTP_SetCallbacks(conn, NULL, NULL);
2454 assert(conn->idx < ar_GetCount(&g_utp_sockets));
2455 assert({last_ptr = ar_get(&g_utp_sockets, conn->idx); *last_ptr == conn;});
2457 /* Unlink object from the global list */
2458 assert(ar_GetCount(&g_utp_sockets) > 0);
2460 last_ptr = ar_get(&g_utp_sockets, ar_GetCount(&g_utp_sockets) - 1);
2461 last = *last_ptr;
2463 assert(last->idx < ar_GetCount(&g_utp_sockets));
2464 assert({last_ptr = ar_get(&g_utp_sockets, last->idx); *last_ptr == last;});
2466 last->idx = conn->idx;
2468 memcpy(ar_get(&g_utp_sockets, conn->idx), &last, sizeof(last));
2470 /* Decrease the count */
2471 ar_SetCount(&g_utp_sockets, ar_GetCount(&g_utp_sockets) - 1);
2473 /* Free all memory occupied by the socket object. */
2474 for (i = 0; i <= conn->inbuf.mask; i++) {
2475 free(conn->inbuf.elements[i]);
2477 for (i = 0; i <= conn->outbuf.mask; i++) {
2478 free(conn->outbuf.elements[i]);
2480 free(conn->inbuf.elements);
2481 free(conn->outbuf.elements);
2483 /* Finally free the socket object */
2484 free(conn);
2487 /******************************************************************************/
2488 /******************************************************************************/
2489 /******************************************************************************/
2490 /******************************************************************************/
2491 /* public functions */
2492 /******************************************************************************/
2493 /******************************************************************************/
2494 /******************************************************************************/
2495 /******************************************************************************/
2497 /* Create a UTP socket */
2498 struct UTPSocket *UTP_Create(SendToProc *send_to_proc, void *send_to_userdata, const struct sockaddr *addr, socklen_t addrlen)
2500 struct UTPSocket *conn;
2502 conn = calloc(1, sizeof(*conn));
2504 g_current_ms = UTP_GetMilliseconds();
2506 UTP_SetCallbacks(conn, NULL, NULL);
2507 dh_clear(&conn->our_hist);
2508 dh_clear(&conn->their_hist);
2509 conn->rto = 3000;
2510 conn->rtt_var = 800;
2511 conn->seq_nr = 1;
2512 conn->ack_nr = 0;
2513 conn->max_window_user = 255 * PACKET_SIZE;
2514 psa_init(&conn->addr, (SOCKADDR_STORAGE*)addr, addrlen);
2515 conn->send_to_proc = send_to_proc;
2516 conn->send_to_userdata = send_to_userdata;
2517 conn->ack_time = g_current_ms + 0x70000000;
2518 conn->last_got_packet = g_current_ms;
2519 conn->last_sent_packet = g_current_ms;
2520 conn->last_measured_delay = g_current_ms + 0x70000000;
2521 conn->last_rwin_decay = (int32)(g_current_ms) - MAX_WINDOW_DECAY;
2522 conn->last_send_quota = g_current_ms;
2523 conn->send_quota = PACKET_SIZE * 100;
2524 conn->cur_window_packets = 0;
2525 conn->fast_resend_seq_nr = conn->seq_nr;
2527 /* default to version 1 */
2528 UTP_SetSockopt(conn, SO_UTPVERSION, 1);
2530 /* we need to fit one packet in the window
2531 when we start the connection */
2532 conn->max_window = us_get_packet_size(conn);
2533 conn->state = CS_IDLE;
2535 conn->outbuf.mask = 15;
2536 conn->inbuf.mask = 15;
2538 conn->outbuf.elements = calloc(16, sizeof(void*));
2539 conn->inbuf.elements = calloc(16, sizeof(void*));
2541 conn->idx = ar_Append(&g_utp_sockets, &conn);
2543 LOG_UTPV("0x%08x: UTP_Create", conn);
2544 return conn;
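#if 0
/* Illustrative usage sketch only (never compiled): setting up an outgoing
   connection. my_send_to, my_funcs, ctx and sin are assumed to be supplied
   by the embedding application. */
struct UTPSocket *s = UTP_Create(my_send_to, ctx, (struct sockaddr*)&sin, sizeof(sin));
UTP_SetCallbacks(s, &my_funcs, s);        /* register on_read/on_state/... callbacks */
UTP_SetSockopt(s, SO_UTPVERSION, 1);      /* optional; version 1 is already the default */
UTP_Connect(s);                           /* sends the SYN and starts the handshake */
#endif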
2547 void UTP_SetCallbacks(struct UTPSocket *conn, struct UTPFunctionTable *funcs, void *userdata)
2549 assert(conn);
2551 if (funcs == NULL)
2552 funcs = &zero_funcs;
2553 conn->func = *funcs;
2554 conn->userdata = userdata;
2557 bool UTP_SetSockopt(struct UTPSocket* conn, int opt, int val)
2559 assert(conn);
2561 switch (opt) {
2562 case SO_SNDBUF:
2563 assert(val >= 1);
2564 conn->opt_sndbuf = val;
2565 return true;
2566 case SO_RCVBUF:
2567 conn->opt_rcvbuf = val;
2568 return true;
2569 case SO_UTPVERSION:
2570 assert(conn->state == CS_IDLE);
2571 if (conn->state != CS_IDLE)
2572 /* too late */
2573 return false;
2574 if (conn->version == 1 && val == 0) {
2575 conn->reply_micro = INT_MAX;
2576 conn->opt_rcvbuf = 200 * 1024;
2577 conn->opt_sndbuf = OUTGOING_BUFFER_MAX_SIZE * PACKET_SIZE;
2578 } else if (conn->version == 0 && val == 1) {
2579 conn->reply_micro = 0;
2580 conn->opt_rcvbuf = 3 * 1024 * 1024 + 512 * 1024;
2581 conn->opt_sndbuf = conn->opt_rcvbuf;
2583 conn->version = val;
2584 return true;
2586 return false;
/* Try to connect to the remote host the socket was created for.
   The connect packet carries no data bytes in this version. */
2591 void UTP_Connect(struct UTPSocket *conn)
2593 uint32 conn_seed;
2594 size_t header_ext_size;
2595 struct op *pkt;
2596 struct pfe* p;
2597 struct pfe1* p1;
2599 assert(conn);
2601 assert(conn->state == CS_IDLE);
2602 assert(conn->cur_window_packets == 0);
2603 assert(scb_get(&conn->outbuf, conn->seq_nr) == NULL);
2604 assert(sizeof(struct pf1) == 20);
2606 conn->state = CS_SYN_SENT;
2608 g_current_ms = UTP_GetMilliseconds();
2610 /* Create and send a connect message */
2611 conn_seed = UTP_Random();
2613 /* we identify newer versions by setting the
2614 first two bytes to 0x0001 */
2615 if (conn->version > 0)
2616 conn_seed &= 0xffff;
2618 /* used in parse_log.py */
2619 LOG_UTP("0x%08x: UTP_Connect conn_seed:%u packet_size:%u (B) " "target_delay:%u (ms) delay_history:%u " "delay_base_history:%u (minutes)", conn, conn_seed, PACKET_SIZE, CCONTROL_TARGET / 1000, CUR_DELAY_SIZE, DELAY_BASE_HISTORY);
/* Set up the initial timeout timer. */
2622 conn->retransmit_timeout = 3000;
2623 conn->rto_timeout = g_current_ms + conn->retransmit_timeout;
2624 conn->last_rcv_win = us_get_rcv_window(conn);
2626 conn->conn_seed = conn_seed;
2627 conn->conn_id_recv = conn_seed;
2628 conn->conn_id_send = conn_seed+1;
/* if you need compatibility with 1.8.1, use this. it makes the connection easier to attack, though.
   conn->seq_nr = 1; */
2631 conn->seq_nr = UTP_Random();
2633 /* Create the connect packet. */
2634 header_ext_size = us_get_header_extensions_size(conn);
2636 pkt = malloc(sizeof(struct op) - 1 + header_ext_size);
2638 p = (struct pfe*)pkt->data;
2639 p1 = (struct pfe1*)pkt->data;
2641 memset(p, 0, header_ext_size);
2642 /* SYN packets are special, and have the receive ID in the connid field,
2643 instead of conn_id_send. */
2644 if (conn->version == 0) {
2645 p->pf.connid = htonl(conn->conn_id_recv);
2646 p->pf.ext = 2;
2647 p->pf.windowsize = (byte)DIV_ROUND_UP(conn->last_rcv_win, PACKET_SIZE);
2648 p->pf.seq_nr = htons(conn->seq_nr);
2649 p->pf.flags = ST_SYN;
2650 p->ext_next = 0;
2651 p->ext_len = 8;
2652 memset(p->extensions, 0, 8);
2653 } else {
2654 pf1_version_set(&p1->pf, 1);
2655 pf1_type_set(&p1->pf, ST_SYN);
2656 p1->pf.ext = 2;
2657 p1->pf.connid = htons((uint16)conn->conn_id_recv);
2658 p1->pf.windowsize = htonl((uint32)conn->last_rcv_win);
2659 p1->pf.seq_nr = htons(conn->seq_nr);
2660 p1->ext_next = 0;
2661 p1->ext_len = 8;
2662 memset(p1->extensions, 0, 8);
2664 pkt->transmissions = 0;
2665 pkt->length = header_ext_size;
2666 pkt->payload = 0;
2668 /*LOG_UTPV("0x%08x: Sending connect %s [%u].",
2669 conn, addrfmt(conn->addr, addrbuf), conn_seed); */
2671 /* Remember the message in the outgoing queue. */
2672 scb_ensure_size(&conn->outbuf, conn->seq_nr, conn->cur_window_packets);
2673 scb_put(&conn->outbuf, conn->seq_nr, pkt);
2674 conn->seq_nr++;
2675 conn->cur_window_packets++;
2677 us_send_packet(conn, pkt);
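/* Connection-id pairing (summarized from this function and the accept path
   in UTP_IsIncomingUTP below): the initiator picks a seed and uses
   conn_id_recv == seed, conn_id_send == seed + 1; the acceptor mirrors it
   with conn_id_send == seed and conn_id_recv == seed + 1, so each side
   sends with the id the other side receives on */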
2680 bool UTP_IsIncomingUTP(UTPGotIncomingConnection *incoming_proc, SendToProc *send_to_proc, void *send_to_userdata, const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
2682 struct psa addr;
2683 struct pf* p;
2684 struct pf1* p1;
2685 byte version;
2686 uint32 id;
2687 struct pf *pf;
2688 struct pf1 *pf1;
2689 byte flags;
2690 size_t i;
2691 uint32 seq_nr;
2693 psa_init(&addr, (SOCKADDR_STORAGE*)to, tolen);
2695 if (len < sizeof(struct pf) && len < sizeof(struct pf1)) {
2696 LOG_UTPV("recv %s len:%u too small", addrfmt(&addr, addrbuf), (uint)len);
2697 return false;
2700 p = (struct pf*)buffer;
2701 p1 = (struct pf1*)buffer;
2703 version = UTP_IsV1(p1);
2704 id = (version == 0) ? ntohl(p->connid) : (uint32)(ntohs(p1->connid));
2706 if (version == 0 && len < sizeof(struct pf)) {
2707 LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(&addr, addrbuf), (uint)len, version);
2708 return false;
2711 if (version == 1 && len < sizeof(struct pf1)) {
2712 LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(&addr, addrbuf), (uint)len, version);
2713 return false;
2716 LOG_UTPV("recv %s len:%u id:%u", addrfmt(&addr, addrbuf), (uint)len, id);
2718 pf = (struct pf*)p;
2719 pf1 = (struct pf1*)p;
2721 if (version == 0) {
2722 LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)ntohs(pf->seq_nr), (uint)ntohs(pf->ack_nr));
2723 } else {
2724 LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)ntohs(pf1->seq_nr), (uint)ntohs(pf1->ack_nr));
2727 flags = version == 0 ? pf->flags : pf1_type(pf1);
2729 for (i = 0; i < ar_GetCount(&g_utp_sockets); i++) {
2730 struct UTPSocket **conn_ptr;
2731 struct UTPSocket *conn;
2733 conn_ptr = ar_get(&g_utp_sockets, i);
2734 conn = *conn_ptr;
2735 /*LOG_UTPV("Examining UTPSocket %s for %s and (seed:%u s:%u r:%u) for %u",
2736 addrfmt(conn->addr, addrbuf), addrfmt(addr, addrbuf2), conn->conn_seed, conn->conn_id_send, conn->conn_id_recv, id); */
2737 if (psa_is_not_equal(&conn->addr, &addr))
2738 continue;
2740 if (flags == ST_RESET && (conn->conn_id_send == id || conn->conn_id_recv == id)) {
2741 LOG_UTPV("0x%08x: recv RST for existing connection", conn);
2742 if (!conn->userdata || conn->state == CS_FIN_SENT)
2743 conn->state = CS_DESTROY;
2744 else
2745 conn->state = CS_RESET;
2746 if (conn->userdata) {
2747 int err;
2749 conn->func.on_overhead(conn->userdata, false, len + us_get_udp_overhead(conn), close_overhead);
2750 err = conn->state == CS_SYN_SENT ? ECONNREFUSED : ECONNRESET;
2751 conn->func.on_error(conn->userdata, err);
2753 return true;
2754 } else if (flags != ST_SYN && conn->conn_id_recv == id) {
2755 size_t read;
2757 LOG_UTPV("0x%08x: recv processing", conn);
2758 read = UTP_ProcessIncoming(conn, (byte*)buffer, len, false);
2759 if (conn->userdata)
2760 conn->func.on_overhead(conn->userdata, false, (len - read) + us_get_udp_overhead(conn), header_overhead);
2761 return true;
2765 if (flags == ST_RESET) {
2766 LOG_UTPV("recv RST for unknown connection");
2767 return true;
2770 seq_nr = version == 0 ? ntohs(pf->seq_nr) : ntohs(pf1->seq_nr);
2771 if (flags != ST_SYN) {
2772 size_t i;
2773 struct RST_Info *r;
2775 for (i = 0; i < ar_GetCount(&g_rst_info); i++) {
2776 struct RST_Info *cur;
2778 cur = ar_get(&g_rst_info,i);
2780 if (cur->connid != id)
2781 continue;
2782 if (psa_is_not_equal(&cur->addr, &addr))
2783 continue;
2784 if (seq_nr != cur->ack_nr)
2785 continue;
2786 cur->timestamp = UTP_GetMilliseconds();
2787 LOG_UTPV("recv not sending RST to non-SYN (stored)");
2788 return true;
2790 if (ar_GetCount(&g_rst_info) > RST_INFO_LIMIT) {
2791 LOG_UTPV("recv not sending RST to non-SYN (limit at %u stored)", (uint)ar_GetCount(&g_rst_info));
2792 return true;
2794 LOG_UTPV("recv send RST to non-SYN (%u stored)", (uint)ar_GetCount(&g_rst_info));
2795 r = ar_Append_new(&g_rst_info);
2796 r->addr = addr;
2797 r->connid = id;
2798 r->ack_nr = seq_nr;
2799 r->timestamp = UTP_GetMilliseconds();
2801 us_send_rst(send_to_proc, send_to_userdata, &addr, id, seq_nr, UTP_Random(), version);
2802 return true;
2805 if (incoming_proc) {
2806 struct UTPSocket *conn;
2807 size_t read;
2809 LOG_UTPV("Incoming connection from %s uTP version:%u", addrfmt(&addr, addrbuf), version);
2811 /* Create a new UTP socket to handle this new connection */
2812 conn = UTP_Create(send_to_proc, send_to_userdata, to, tolen);
2813 /* Need to track this value to be able to detect duplicate CONNECTs */
2814 conn->conn_seed = id;
/* This is the value that identifies this connection for them. */
2816 conn->conn_id_send = id;
/* This is the value that identifies this connection for us. */
2818 conn->conn_id_recv = id+1;
2819 conn->ack_nr = seq_nr;
2820 conn->seq_nr = UTP_Random();
2821 conn->fast_resend_seq_nr = conn->seq_nr;
2823 UTP_SetSockopt(conn, SO_UTPVERSION, version);
2824 conn->state = CS_CONNECTED;
2826 read = UTP_ProcessIncoming(conn, (byte*)buffer, len, true);
2828 LOG_UTPV("0x%08x: recv send connect ACK", conn);
2829 us_send_ack(conn, true);
2831 incoming_proc(send_to_userdata, conn);
/* we report overhead after incoming_proc, because the callbacks are set up now */
2834 if (conn->userdata) {
2835 /* SYN */
2836 conn->func.on_overhead(conn->userdata, false, (len - read) + us_get_udp_overhead(conn), header_overhead);
2837 /* SYNACK */
2838 conn->func.on_overhead(conn->userdata, true, us_get_overhead(conn), ack_overhead);
2841 return true;
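#if 0
/* Illustrative usage sketch only (never compiled): the receive path. Every
   UDP datagram is offered to UTP_IsIncomingUTP first; a false return means
   it was not recognizable uTP traffic. my_incoming, my_send_to, ctx, buf,
   len, from and fromlen are assumed to come from the application's socket
   loop. */
if (!UTP_IsIncomingUTP(my_incoming, my_send_to, ctx, buf, len, (struct sockaddr*)&from, fromlen)) {
	/* not uTP; hand the datagram to whatever else shares the port */
}
#endif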
2844 bool UTP_HandleICMP(const byte* buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
2846 struct psa addr;
2847 struct pf* p;
2848 struct pf1* p1;
2849 byte version;
2850 uint32 id;
2851 size_t i;
2853 psa_init(&addr, (SOCKADDR_STORAGE*)to, tolen);
/* Want the whole header so we have the connection ID */
2856 if (len < sizeof(struct pf))
2857 return false;
2859 p = (struct pf*)buffer;
2860 p1 = (struct pf1*)buffer;
2862 version = UTP_IsV1(p1);
2863 id = (version == 0) ? ntohl(p->connid) : (uint32)(ntohs(p1->connid));
2865 for (i = 0; i < ar_GetCount(&g_utp_sockets); ++i) {
2866 struct UTPSocket **conn_ptr;
2867 struct UTPSocket *conn;
2869 conn_ptr = ar_get(&g_utp_sockets,i);
2870 conn = *conn_ptr;
2871 if (psa_is_equal(&conn->addr,&addr) && conn->conn_id_recv == id) {
2872 /* Don't pass on errors for idle/closed connections */
2873 if (conn->state != CS_IDLE) {
2874 if (!conn->userdata || conn->state == CS_FIN_SENT) {
2875 LOG_UTPV("0x%08x: icmp packet causing socket destruction", conn);
2876 conn->state = CS_DESTROY;
2877 } else
2878 conn->state = CS_RESET;
2879 if (conn->userdata) {
2880 int err;
2882 err = conn->state == CS_SYN_SENT ? ECONNREFUSED : ECONNRESET;
2883 LOG_UTPV("0x%08x: icmp packet causing error on socket:%d", conn, err);
2884 conn->func.on_error(conn->userdata, err);
2887 return true;
2890 return false;
2893 /* Write bytes to the UTP socket.
2894 Returns true if the socket is still writable. */
2895 bool UTP_Write(struct UTPSocket *conn, size_t bytes)
2897 size_t packet_size;
2898 size_t num_to_send;
2899 #ifdef g_log_utp_verbose
2900 size_t param;
2901 #endif
2902 assert(conn);
2904 #ifdef g_log_utp_verbose
2905 param = bytes;
2906 #endif
2908 if (conn->state != CS_CONNECTED) {
2909 LOG_UTPV("0x%08x: UTP_Write %u bytes = false (not CS_CONNECTED)", conn, (uint)bytes);
2910 return false;
2913 g_current_ms = UTP_GetMilliseconds();
2915 us_update_send_quota(conn);
2917 /* don't send unless it will all fit in the window */
2918 packet_size = us_get_packet_size(conn);
2919 num_to_send = size_t_min(bytes, packet_size);
2920 while (us_is_writable(conn, num_to_send)) {
/* Send an outgoing packet.
   Also add it to the outgoing queue of packets that have been sent but not ACKed. */
2924 if (num_to_send == 0) {
2925 LOG_UTPV("0x%08x: UTP_Write %u bytes = true", conn, (uint)param);
2926 return true;
2928 bytes -= num_to_send;
2930 LOG_UTPV("0x%08x: Sending packet. seq_nr:%u ack_nr:%u wnd:%u/%u/%u rcv_win:%u size:%u quota:%d cur_window_packets:%u", conn, conn->seq_nr, conn->ack_nr, (uint)(conn->cur_window + num_to_send), (uint)conn->max_window, (uint)conn->max_window_user, (uint)conn->last_rcv_win, num_to_send, conn->send_quota / 100, conn->cur_window_packets);
2931 us_write_outgoing_packet(conn, num_to_send, ST_DATA);
2932 num_to_send = size_t_min(bytes, packet_size);
2935 /* mark the socket as not being writable. */
2936 conn->state = CS_CONNECTED_FULL;
2937 LOG_UTPV("0x%08x: UTP_Write %u bytes = false", conn, (uint)bytes);
2938 return false;
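#if 0
/* Illustrative usage sketch only (never compiled): a false return means the
   window filled up and the socket moved to CS_CONNECTED_FULL; the
   application should wait for on_state(userdata, UTP_STATE_WRITABLE) (sent
   from UTP_ProcessIncoming above) before offering the bytes again.
   pending_bytes is an assumed application-side counter. */
if (!UTP_Write(s, pending_bytes)) {
	/* window full; retry after UTP_STATE_WRITABLE fires */
}
#endif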
2941 void UTP_RBDrained(struct UTPSocket *conn)
2943 size_t rcvwin;
2945 assert(conn);
2947 rcvwin = us_get_rcv_window(conn);
2949 if (rcvwin > conn->last_rcv_win) {
/* If the last window was 0, send an ACK immediately; otherwise just arm the delayed-ACK timer */
2951 if (conn->last_rcv_win == 0)
2952 us_send_ack(conn, false);
2953 else
2954 conn->ack_time = g_current_ms + uint_min(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
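/* The embedding application is expected to call UTP_CheckTimeouts()
   periodically (upstream libutp suggests a few times per second); the
   retransmit, keepalive and RST-info expiry handled below only fire from here */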
2958 void UTP_CheckTimeouts(void)
2960 size_t i;
2962 g_current_ms = UTP_GetMilliseconds();
2964 for (i = 0; i < ar_GetCount(&g_rst_info); i++) {
2965 struct RST_Info *cur;
2967 cur = ar_get(&g_rst_info, i);
2968 if ((int)(g_current_ms - cur->timestamp) >= RST_INFO_TIMEOUT) {
2969 ar_MoveUpLast(&g_rst_info, i);
2970 i--;
2973 if (ar_GetCount(&g_rst_info) != ar_GetAlloc(&g_rst_info))
2974 ar_Compact(&g_rst_info);
2976 for (i = 0; i != ar_GetCount(&g_utp_sockets); i++) {
2977 struct UTPSocket **conn_ptr;
2978 struct UTPSocket *conn;
2980 conn_ptr = ar_get(&g_utp_sockets, i);
2981 conn = *conn_ptr;
2982 us_check_timeouts(conn);
/* Destroy the object if it has been marked for destruction */
2985 if (conn->state == CS_DESTROY) {
2986 LOG_UTPV("0x%08x: Destroying", conn);
2987 UTP_Free(conn);
2988 i--;
2993 size_t UTP_GetPacketSize(struct UTPSocket *socket)
2995 return us_get_packet_size(socket);
2998 void UTP_GetPeerName(struct UTPSocket *conn, struct sockaddr *addr, socklen_t *addrlen)
3000 socklen_t len;
3001 SOCKADDR_STORAGE sa;
3003 assert(conn);
3005 sa = psa_get_sockaddr_storage(&conn->addr, &len);
3006 *addrlen = socklen_t_min(len, *addrlen);
3007 memcpy(addr, &sa, *addrlen);
3010 void UTP_GetDelays(struct UTPSocket *conn, int32 *ours, int32 *theirs, uint32 *age)
3012 assert(conn);
3014 if (ours)
3015 *ours = dh_get_value(&conn->our_hist);
3016 if (theirs)
3017 *theirs = dh_get_value(&conn->their_hist);
3018 if (age)
3019 *age = g_current_ms - conn->last_measured_delay;
3022 #ifdef _DEBUG
3023 void UTP_GetStats(struct UTPSocket *conn, struct UTPStats *stats)
3025 assert(conn);
3027 *stats = conn->_stats;
3029 #endif /* _DEBUG */
3031 void UTP_GetGlobalStats(struct UTPGlobalStats *stats)
3033 *stats = _global_stats;
/* Close the UTP socket.
   It is not valid for the upper layer to refer to the socket after it is closed.
   Delivery of any remaining data will still be attempted after the close. */
3039 void UTP_Close(struct UTPSocket *conn)
3041 assert(conn);
3043 assert(conn->state != CS_DESTROY_DELAY && conn->state != CS_FIN_SENT && conn->state != CS_DESTROY);
3045 LOG_UTPV("0x%08x: UTP_Close in state:%s", conn, statenames[conn->state]);
3047 switch(conn->state) {
3048 case CS_CONNECTED:
3049 case CS_CONNECTED_FULL:
3050 conn->state = CS_FIN_SENT;
3051 us_write_outgoing_packet(conn, 0, ST_FIN);
3052 break;
3054 case CS_SYN_SENT:
3055 conn->rto_timeout = UTP_GetMilliseconds() + uint_min(conn->rto * 2, 60);
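/* fall through */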
3056 case CS_GOT_FIN:
3057 conn->state = CS_DESTROY_DELAY;
3058 break;
3060 default:
3061 conn->state = CS_DESTROY;
3062 break;