src/core/or/congestion_control_flow.c (blob 90b1927ef9fe22dee57f1c9606cfdc746f1e7ba2)
/* Copyright (c) 2019-2021, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file congestion_control_flow.c
 * \brief Code that implements flow control for congestion controlled
 * circuits.
 */

#define TOR_CONGESTION_CONTROL_FLOW_PRIVATE

#include "core/or/or.h"

#include "core/or/relay.h"
#include "core/mainloop/connection.h"
#include "core/or/connection_edge.h"
#include "core/mainloop/mainloop.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_flow.h"
#include "core/or/congestion_control_st.h"
#include "core/or/circuitlist.h"
#include "core/or/trace_probes_cc.h"
#include "feature/nodelist/networkstatus.h"
#include "trunnel/flow_control_cells.h"
#include "feature/control/control_events.h"
#include "lib/math/stats.h"

#include "core/or/connection_st.h"
#include "core/or/cell_st.h"
#include "app/config/config.h"
/** Cache consensus parameters */
static uint32_t xoff_client;
static uint32_t xoff_exit;

static uint32_t xon_change_pct;
static uint32_t xon_ewma_cnt;
static uint32_t xon_rate_bytes;

/** MetricsPort stats */
uint64_t cc_stats_flow_num_xoff_sent;
uint64_t cc_stats_flow_num_xon_sent;
double cc_stats_flow_xoff_outbuf_ma = 0;
double cc_stats_flow_xon_outbuf_ma = 0;
/* In normal operation, we can get a burst of up to 32 cells before returning
 * to libevent to flush the outbuf. This is a heuristic from hardcoded values
 * and strange logic in connection_bucket_get_share(). */
#define MAX_EXPECTED_CELL_BURST 32
/* The following three are for dropmark rate limiting. They define when we
 * scale down our XON, XOFF, and xmit byte counts. Early scaling is beneficial
 * because it limits the ability of spurious XON/XOFF to be sent after large
 * amounts of data without XON/XOFF. At these limits, after 10MB of data (or
 * more), an adversary can only inject (log2(10MB)-log2(200*500))*100 ~= 1000
 * cells of fake XOFF/XON before the xmit byte count is halved enough to
 * trigger a limit. */
#define XON_COUNT_SCALE_AT 200
#define XOFF_COUNT_SCALE_AT 200
#define ONE_MEGABYTE (UINT64_C(1) << 20)
#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
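
/* A rough worked example of the bound above (illustrative only, not used by
 * the code): log2(10MB) = log2(10 * 2^20) ~= 23.3 and log2(200*500) ~= 16.6,
 * so an adversary gets roughly (23.3 - 16.6) * 100 ~= 670 fake cells, the
 * same order of magnitude as the ~1000 cited above, before halving drives
 * the xmit byte count below the dropmark limits. */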
/**
 * Return the congestion control object of the given edge connection.
 *
 * Returns NULL if the edge connection is not attached to a circuit and has
 * no cpath_layer, or if neither the circuit nor the cpath_layer has a
 * congestion control object.
 */
static inline const congestion_control_t *
edge_get_ccontrol(const edge_connection_t *edge)
{
  congestion_control_t *ccontrol = NULL;

  if (edge->on_circuit && edge->on_circuit->ccontrol) {
    ccontrol = edge->on_circuit->ccontrol;
  } else if (edge->cpath_layer && edge->cpath_layer->ccontrol) {
    ccontrol = edge->cpath_layer->ccontrol;
  }

  return ccontrol;
}
/**
 * Update global congestion control related consensus parameter values, every
 * consensus update.
 *
 * More details for each of the parameters can be found in proposal 324,
 * section 6.5 including tuning notes.
 */
void
flow_control_new_consensus_params(const networkstatus_t *ns)
{
#define CC_XOFF_CLIENT_DFLT 500
#define CC_XOFF_CLIENT_MIN 1
#define CC_XOFF_CLIENT_MAX 10000
  xoff_client = networkstatus_get_param(ns, "cc_xoff_client",
                                        CC_XOFF_CLIENT_DFLT,
                                        CC_XOFF_CLIENT_MIN,
                                        CC_XOFF_CLIENT_MAX)*RELAY_PAYLOAD_SIZE;

#define CC_XOFF_EXIT_DFLT 500
#define CC_XOFF_EXIT_MIN 1
#define CC_XOFF_EXIT_MAX 10000
  xoff_exit = networkstatus_get_param(ns, "cc_xoff_exit",
                                      CC_XOFF_EXIT_DFLT,
                                      CC_XOFF_EXIT_MIN,
                                      CC_XOFF_EXIT_MAX)*RELAY_PAYLOAD_SIZE;

#define CC_XON_CHANGE_PCT_DFLT 25
#define CC_XON_CHANGE_PCT_MIN 1
#define CC_XON_CHANGE_PCT_MAX 99
  xon_change_pct = networkstatus_get_param(ns, "cc_xon_change_pct",
                                           CC_XON_CHANGE_PCT_DFLT,
                                           CC_XON_CHANGE_PCT_MIN,
                                           CC_XON_CHANGE_PCT_MAX);

#define CC_XON_RATE_BYTES_DFLT (500)
#define CC_XON_RATE_BYTES_MIN (1)
#define CC_XON_RATE_BYTES_MAX (5000)
  xon_rate_bytes = networkstatus_get_param(ns, "cc_xon_rate",
                                           CC_XON_RATE_BYTES_DFLT,
                                           CC_XON_RATE_BYTES_MIN,
                                           CC_XON_RATE_BYTES_MAX)
                   *RELAY_PAYLOAD_SIZE;

#define CC_XON_EWMA_CNT_DFLT (2)
#define CC_XON_EWMA_CNT_MIN (2)
#define CC_XON_EWMA_CNT_MAX (100)
  xon_ewma_cnt = networkstatus_get_param(ns, "cc_xon_ewma_cnt",
                                         CC_XON_EWMA_CNT_DFLT,
                                         CC_XON_EWMA_CNT_MIN,
                                         CC_XON_EWMA_CNT_MAX);
}
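
/* For scale (illustrative only): the cc_xoff_* and cc_xon_rate params are
 * specified in relay cell payloads and converted to bytes above. Assuming
 * RELAY_PAYLOAD_SIZE is 498 bytes, the defaults work out to:
 *
 *   xoff_client    = 500 * 498 = 249000 bytes (~243 KiB)
 *   xoff_exit      = 500 * 498 = 249000 bytes (~243 KiB)
 *   xon_rate_bytes = 500 * 498 = 249000 bytes (~243 KiB)
 */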
/**
 * Send an XOFF for this stream, and note that we sent one.
 */
static void
circuit_send_stream_xoff(edge_connection_t *stream)
{
  xoff_cell_t xoff;
  uint8_t payload[CELL_PAYLOAD_SIZE];
  ssize_t xoff_size;

  memset(&xoff, 0, sizeof(xoff));
  memset(payload, 0, sizeof(payload));

  xoff_cell_set_version(&xoff, 0);

  if ((xoff_size = xoff_cell_encode(payload, CELL_PAYLOAD_SIZE, &xoff)) < 0) {
    log_warn(LD_BUG, "Failed to encode xoff cell");
    return;
  }

  if (connection_edge_send_command(stream, RELAY_COMMAND_XOFF,
                                   (char*)payload, (size_t)xoff_size) == 0) {
    stream->xoff_sent = true;
    cc_stats_flow_num_xoff_sent++;

    /* If this is an entry conn, notify control port */
    if (TO_CONN(stream)->type == CONN_TYPE_AP) {
      control_event_stream_status(TO_ENTRY_CONN(TO_CONN(stream)),
                                  STREAM_EVENT_XOFF_SENT,
                                  0);
    }
  }
}
/**
 * Compute the recent drain rate (write rate) for this edge
 * connection and return it, in KB/sec (1000 bytes/sec).
 *
 * Returns 0 if the monotime clock is busted.
 */
static inline uint32_t
compute_drain_rate(const edge_connection_t *stream)
{
  if (BUG(!is_monotime_clock_reliable())) {
    log_warn(LD_BUG, "Computing drain rate with stalled monotime clock");
    return 0;
  }

  uint64_t delta = monotime_absolute_usec() - stream->drain_start_usec;

  if (delta == 0) {
    log_warn(LD_BUG, "Computing stream drain rate with zero time delta");
    return 0;
  }

  /* Overflow checks */
  if (stream->prev_drained_bytes > INT32_MAX/1000 || /* Intermediate */
      stream->prev_drained_bytes/delta > INT32_MAX/1000) { /* full value */
    return INT32_MAX;
  }

  /* kb/sec = bytes/usec * 1000 usec/msec * 1000 msec/sec * kb/1000bytes */
  return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta);
}
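
/* A worked example of the conversion above (illustrative only): if
 * prev_drained_bytes is 50000 and delta is 100000 usec (0.1 sec), then
 * 50000 * 1000 / 100000 = 500, i.e. 500 KB/sec, which matches
 * 50000 bytes / 0.1 sec = 500000 bytes/sec. */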
/**
 * Send an XON for this stream, with appropriate advisory rate information.
 *
 * Reverts the xoff sent status, and stores the rate information we sent,
 * in case it changes.
 */
static void
circuit_send_stream_xon(edge_connection_t *stream)
{
  xon_cell_t xon;
  uint8_t payload[CELL_PAYLOAD_SIZE];
  ssize_t xon_size;

  memset(&xon, 0, sizeof(xon));
  memset(payload, 0, sizeof(payload));

  xon_cell_set_version(&xon, 0);
  xon_cell_set_kbps_ewma(&xon, stream->ewma_drain_rate);

  if ((xon_size = xon_cell_encode(payload, CELL_PAYLOAD_SIZE, &xon)) < 0) {
    log_warn(LD_BUG, "Failed to encode xon cell");
    return;
  }

  /* Store the advisory rate information, to send advisory updates if
   * it changes */
  stream->ewma_rate_last_sent = stream->ewma_drain_rate;

  if (connection_edge_send_command(stream, RELAY_COMMAND_XON, (char*)payload,
                                   (size_t)xon_size) == 0) {
    /* Revert the xoff sent status, so we can send another one if need be */
    stream->xoff_sent = false;

    cc_stats_flow_num_xon_sent++;

    /* If it's an entry conn, notify control port */
    if (TO_CONN(stream)->type == CONN_TYPE_AP) {
      control_event_stream_status(TO_ENTRY_CONN(TO_CONN(stream)),
                                  STREAM_EVENT_XON_SENT,
                                  0);
    }
  }
}
/**
 * Process a stream XOFF, parsing it, and then stopping reading on
 * the edge connection.
 *
 * Record that we have received an xoff, so we know not to resume
 * reading on this edge conn until we get an XON.
 *
 * Returns false if the XOFF did not validate; true if it does.
 */
bool
circuit_process_stream_xoff(edge_connection_t *conn,
                            const crypt_path_t *layer_hint,
                            const cell_t *cell)
{
  (void)cell;
  bool retval = true;

  if (BUG(!conn)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XOFF on invalid stream?");
    return false;
  }

  /* Make sure this XOFF came from the right hop */
  if (layer_hint && layer_hint != conn->cpath_layer) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XOFF from wrong hop.");
    return false;
  }

  if (edge_get_ccontrol(conn) == NULL) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XOFF for non-congestion control circuit");
    return false;
  }

  if (conn->xoff_received) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got multiple XOFF on connection");
    return false;
  }

  /* If we are near the max, scale everything down */
  if (conn->num_xoff_recv == XOFF_COUNT_SCALE_AT) {
    log_info(LD_EDGE, "Scaling down for XOFF count: %d %d %d",
             conn->total_bytes_xmit,
             conn->num_xoff_recv,
             conn->num_xon_recv);
    conn->total_bytes_xmit /= 2;
    conn->num_xoff_recv /= 2;
    conn->num_xon_recv /= 2;
  }

  conn->num_xoff_recv++;

  /* Client-side check to make sure that XOFF is not sent too early,
   * for dropmark attacks. The main sidechannel risk is early cells,
   * but we also check to make sure that we have not received more XOFFs
   * than could have been generated by the bytes we sent.
   */
  if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
    uint32_t limit = 0;
    if (conn->hs_ident)
      limit = xoff_client;
    else
      limit = xoff_exit;

    if (conn->total_bytes_xmit < limit*conn->num_xoff_recv) {
      log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
             "Got extra XOFF for bytes sent. Got %d, expected max %d",
             conn->num_xoff_recv, conn->total_bytes_xmit/limit);
      /* We still process this, because the only dropmark defenses
       * in C tor are via the vanguards addon's use of the read valid
       * cells. So just signal that we think this is not valid protocol
       * data and proceed. */
      retval = false;
    }
  }

  log_info(LD_EDGE, "Got XOFF!");
  connection_stop_reading(TO_CONN(conn));
  conn->xoff_received = true;

  /* If this is an entry conn, notify control port */
  if (TO_CONN(conn)->type == CONN_TYPE_AP) {
    control_event_stream_status(TO_ENTRY_CONN(TO_CONN(conn)),
                                STREAM_EVENT_XOFF_RECV,
                                0);
  }

  return retval;
}
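
/* Worked example of the dropmark limit above (illustrative only): for a
 * client-side stream to an exit (no hs_ident), the limit is the default
 * xoff_exit of 500 * 498 = 249000 bytes. A client that has sent 1 MB
 * (1048576 bytes) accepts at most 1048576 / 249000 ~= 4 XOFFs as consistent
 * with its own traffic; a 5th XOFF makes 249000 * 5 = 1245000 > 1048576,
 * which flags the stream as invalid protocol data. */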
/**
 * Process a stream XON, and if it validates, clear the xoff
 * flag and resume reading on this edge connection.
 *
 * Also, use provided rate information to rate limit
 * reading on this edge (or packaging from it onto
 * the circuit), to avoid XON/XOFF chatter.
 *
 * Returns true if the XON validates, false otherwise.
 */
bool
circuit_process_stream_xon(edge_connection_t *conn,
                           const crypt_path_t *layer_hint,
                           const cell_t *cell)
{
  xon_cell_t *xon;
  bool retval = true;

  if (BUG(!conn)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XON on invalid stream?");
    return false;
  }

  /* Make sure this XON came from the right hop */
  if (layer_hint && layer_hint != conn->cpath_layer) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XON from wrong hop.");
    return false;
  }

  if (edge_get_ccontrol(conn) == NULL) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XON for non-congestion control circuit");
    return false;
  }

  if (xon_cell_parse(&xon, cell->payload+RELAY_HEADER_SIZE,
                     CELL_PAYLOAD_SIZE-RELAY_HEADER_SIZE) < 0) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Received malformed XON cell.");
    return false;
  }

  /* If we are near the max, scale everything down */
  if (conn->num_xon_recv == XON_COUNT_SCALE_AT) {
    log_info(LD_EDGE, "Scaling down for XON count: %d %d %d",
             conn->total_bytes_xmit,
             conn->num_xoff_recv,
             conn->num_xon_recv);
    conn->total_bytes_xmit /= 2;
    conn->num_xoff_recv /= 2;
    conn->num_xon_recv /= 2;
  }

  conn->num_xon_recv++;

  /* Client-side check to make sure that XON is not sent too early,
   * for dropmark attacks. The main sidechannel risk is early cells,
   * but we also check to see that we did not get more XONs than make
   * sense for the number of bytes we sent.
   */
  if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
    uint32_t limit = 0;

    if (conn->hs_ident)
      limit = MIN(xoff_client, xon_rate_bytes);
    else
      limit = MIN(xoff_exit, xon_rate_bytes);

    if (conn->total_bytes_xmit < limit*conn->num_xon_recv) {
      log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
             "Got extra XON for bytes sent. Got %d, expected max %d",
             conn->num_xon_recv, conn->total_bytes_xmit/limit);

      /* We still process this, because the only dropmark defenses
       * in C tor are via the vanguards addon's use of the read valid
       * cells. So just signal that we think this is not valid protocol
       * data and proceed. */
      retval = false;
    }
  }

  log_info(LD_EDGE, "Got XON: %d", xon->kbps_ewma);
  /* Adjust the token bucket of this edge connection with the drain rate in
   * the XON. The rate field is in kilobytes/sec (1000 bytes/sec), so
   * multiply by 1000 to get bytes/sec. */
  uint64_t rate = ((uint64_t) xon_cell_get_kbps_ewma(xon) * 1000);
  if (rate == 0 || INT32_MAX < rate) {
    /* No rate, or a rate too large to represent: remove the limit. */
    rate = INT32_MAX;
  }

  token_bucket_rw_adjust(&conn->bucket, (uint32_t) rate, (uint32_t) rate);
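
  /* For example (illustrative only): an XON carrying kbps_ewma = 500 means
   * 500 KB/sec, so the bucket is refilled at 500 * 1000 = 500000 bytes/sec,
   * with an equal burst. A value of 0, or anything above INT32_MAX, clamps
   * to INT32_MAX, i.e. effectively no rate limit. */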
  if (conn->xoff_received) {
    /* Clear the fact that we got an XOFF, so that this edge can
     * start and stop reading normally */
    conn->xoff_received = false;
    connection_start_reading(TO_CONN(conn));
  }

  /* If this is an entry conn, notify control port */
  if (TO_CONN(conn)->type == CONN_TYPE_AP) {
    control_event_stream_status(TO_ENTRY_CONN(TO_CONN(conn)),
                                STREAM_EVENT_XON_RECV,
                                0);
  }

  xon_cell_free(xon);

  return retval;
}
/**
 * Called from sendme_stream_data_received(), when data arrives
 * from a circuit to our edge's outbuf, to decide if we need to send
 * an XOFF.
 *
 * Returns the amount of cells remaining until the buffer is full, at
 * which point it sends an XOFF, and returns 0.
 *
 * Returns less than 0 if we have queued more than a congestion window
 * worth of data and need to close the circuit.
 */
int
flow_control_decide_xoff(edge_connection_t *stream)
{
  size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
  uint32_t buffer_limit_xoff = 0;

  if (BUG(edge_get_ccontrol(stream) == NULL)) {
    log_err(LD_BUG, "Flow control called for non-congestion control circuit");
    return -1;
  }

  /* Onion services and clients are typically localhost edges, so they
   * need different buffering limits than exits do */
  if (TO_CONN(stream)->type == CONN_TYPE_AP || stream->hs_ident != NULL) {
    buffer_limit_xoff = xoff_client;
  } else {
    buffer_limit_xoff = xoff_exit;
  }

  if (total_buffered > buffer_limit_xoff) {
    if (!stream->xoff_sent) {
      log_info(LD_EDGE, "Sending XOFF: %"TOR_PRIuSZ" %d",
               total_buffered, buffer_limit_xoff);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream);

      cc_stats_flow_xoff_outbuf_ma =
        stats_update_running_avg(cc_stats_flow_xoff_outbuf_ma,
                                 total_buffered);

      circuit_send_stream_xoff(stream);

      /* Clear the drain rate. It is considered wrong if we
       * got all the way to XOFF */
      stream->ewma_drain_rate = 0;
    }
  }

  /* If the outbuf has accumulated more than the expected burst limit of
   * cells, then assume it is not draining, and call decide_xon. We must
   * do this because writes only happen when the socket unblocks, so
   * we may not otherwise notice accumulation of data in the outbuf for
   * advisory XONs. */
  if (total_buffered > MAX_EXPECTED_CELL_BURST*RELAY_PAYLOAD_SIZE) {
    flow_control_decide_xon(stream, 0);
  }

  /* Flow control always takes more data; we rely on the oomkiller to
   * handle misbehavior. */
  return 0;
}
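
/* For scale (illustrative only): with MAX_EXPECTED_CELL_BURST of 32 and a
 * RELAY_PAYLOAD_SIZE of 498 bytes, the "not draining" threshold above is
 * 32 * 498 = 15936 bytes (~15.6 KiB) sitting in the outbuf. */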
/**
 * Returns true if the stream's drain rate has changed significantly.
 *
 * Returns false if the monotime clock is stalled, or if we have
 * no previous drain rate information.
 */
static bool
stream_drain_rate_changed(const edge_connection_t *stream)
{
  if (!is_monotime_clock_reliable()) {
    return false;
  }

  if (!stream->ewma_rate_last_sent) {
    return false;
  }

  if (stream->ewma_drain_rate >
      (100+(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) {
    return true;
  }

  if (stream->ewma_drain_rate <
      (100-(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) {
    return true;
  }

  return false;
}
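
/* Worked example (illustrative only): with the default cc_xon_change_pct of
 * 25 and a last-sent rate of 400 KB/sec, the change is "significant" when
 * the current EWMA rate rises above 400*125/100 = 500 KB/sec or falls below
 * 400*75/100 = 300 KB/sec, at which point an advisory XON is warranted. */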
/**
 * Called whenever we drain an edge connection outbuf by writing on
 * its socket, to decide if it is time to send an XON.
 *
 * The n_written parameter tells us how many bytes we have written
 * this time, which is used to compute the advisory drain rate fields.
 */
void
flow_control_decide_xon(edge_connection_t *stream, size_t n_written)
{
  size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));

  /* Bounds check the number of drained bytes, and scale */
  if (stream->drained_bytes >= UINT32_MAX - n_written) {
    /* Cut the bytes in half, and move the start time up halfway to now
     * (if we have one). */
    stream->drained_bytes /= 2;

    if (stream->drain_start_usec) {
      uint64_t now = monotime_absolute_usec();

      stream->drain_start_usec = now - (now-stream->drain_start_usec)/2;
    }
  }

  /* Accumulate drained bytes since last rate computation */
  stream->drained_bytes += n_written;

  tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written);

  /* Check for bad monotime clock and bytecount wrap */
  if (!is_monotime_clock_reliable()) {
    /* If the monotime clock ever goes wrong, the safest thing to do
     * is just clear our short-term rate info and wait for the clock to
     * become reliable again. */
    stream->drain_start_usec = 0;
    stream->drained_bytes = 0;
  } else {
    /* If we have no drain start timestamp, and we still have
     * remaining buffer, start the buffering counter */
    if (!stream->drain_start_usec && total_buffered > 0) {
      log_debug(LD_EDGE, "Began edge buffering: %d %d %"TOR_PRIuSZ,
                stream->ewma_rate_last_sent,
                stream->ewma_drain_rate,
                total_buffered);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start),
                stream);
      stream->drain_start_usec = monotime_absolute_usec();
      stream->drained_bytes = 0;
    }

    if (stream->drain_start_usec) {
      /* If we have spent enough time in a queued state, update our drain
       * rate. */
      if (stream->drained_bytes > xon_rate_bytes) {
        /* No previous drained bytes means it is the first time we are
         * computing it, so use the value we just drained onto the socket
         * as a baseline. It won't be accurate, but it will be a start
         * towards the right value.
         *
         * We have to do this in order to have a drain rate, else we could
         * be sending a drain rate of 0 in an XON, which would be
         * undesirable and basically like sending an XOFF. */
        if (stream->prev_drained_bytes == 0) {
          stream->prev_drained_bytes = stream->drained_bytes;
        }

        uint32_t drain_rate = compute_drain_rate(stream);
        /* Once the drain rate has been computed, note how many bytes we just
         * drained so it can be used at the next calculation. We do this here
         * because it gets reset once the rate is changed. */
        stream->prev_drained_bytes = stream->drained_bytes;

        if (drain_rate) {
          stream->ewma_drain_rate =
            (uint32_t)n_count_ewma(drain_rate,
                                   stream->ewma_drain_rate,
                                   xon_ewma_cnt);
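          /* Worked example (illustrative; assumes n_count_ewma() computes
           * (2*curr + (N-1)*prev) / (N+1), the N-count EWMA used elsewhere
           * in congestion control): with the default cc_xon_ewma_cnt N = 2,
           * a previous EWMA of 300 KB/sec and a new sample of 600 KB/sec
           * give (2*600 + 300) / 3 = 500 KB/sec. */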
          log_debug(LD_EDGE, "Updating drain rate: %d %d %"TOR_PRIuSZ,
                    drain_rate,
                    stream->ewma_drain_rate,
                    total_buffered);
          tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update),
                    stream, drain_rate);
          /* Reset recent byte counts. This prevents us from sending advisory
           * XONs more frequently than every xon_rate_bytes. */
          stream->drained_bytes = 0;
          stream->drain_start_usec = 0;
        }
      }
    }
  }

  /* If we don't have an XOFF outstanding, consider updating an
   * old rate */
  if (!stream->xoff_sent) {
    if (stream_drain_rate_changed(stream)) {
      /* If we are still buffering and the rate changed, update
       * advisory XON */
      log_info(LD_EDGE, "Sending rate-change XON: %d %d %"TOR_PRIuSZ,
               stream->ewma_rate_last_sent,
               stream->ewma_drain_rate,
               total_buffered);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream);

      cc_stats_flow_xon_outbuf_ma =
        stats_update_running_avg(cc_stats_flow_xon_outbuf_ma,
                                 total_buffered);

      circuit_send_stream_xon(stream);
    } else if (total_buffered == 0) {
      log_info(LD_EDGE, "Sending XON: %d %d %"TOR_PRIuSZ,
               stream->ewma_rate_last_sent,
               stream->ewma_drain_rate,
               total_buffered);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream);
      circuit_send_stream_xon(stream);
    }
  }

  /* If the buffer has fully emptied, clear the drain timestamp,
   * so we can total only bytes drained while outbuf is 0. */
  if (total_buffered == 0) {
    stream->drain_start_usec = 0;

    /* After we've spent 'xon_rate_bytes' with the queue fully drained,
     * double any rate we sent. */
    if (stream->drained_bytes >= xon_rate_bytes &&
        stream->ewma_rate_last_sent) {
      stream->ewma_drain_rate = MIN(INT32_MAX, 2*stream->ewma_drain_rate);

      log_debug(LD_EDGE,
                "Queue empty for xon_rate_limit bytes: %d %d",
                stream->ewma_rate_last_sent,
                stream->ewma_drain_rate);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream);
      /* Reset the drained bytes count, but keep its value as the previous,
       * so the drain rate calculation takes into account what was actually
       * drained the last time. */
      stream->prev_drained_bytes = stream->drained_bytes;
      stream->drained_bytes = 0;
    }
  }

  return;
}
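
/* Worked example of the doubling above (illustrative only): a stream whose
 * last advertised rate was 200 KB/sec, and whose queue stays empty while
 * another xon_rate_bytes (500 * 498 = 249000 bytes by default) are written,
 * has its ewma_drain_rate doubled to 400 KB/sec. Since that exceeds the
 * default 25% change threshold, the next flow_control_decide_xon() call
 * sends a rate-change advisory XON. */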
/**
 * Note that we packaged some data on this stream. Used to enforce
 * client-side dropmark limits.
 */
void
flow_control_note_sent_data(edge_connection_t *stream, size_t len)
{
  /* If we are near the max, scale everything down */
  if (stream->total_bytes_xmit >= TOTAL_XMIT_SCALE_AT-len) {
    log_info(LD_EDGE, "Scaling down for flow control xmit bytes: %d %d %d",
             stream->total_bytes_xmit,
             stream->num_xoff_recv,
             stream->num_xon_recv);

    stream->total_bytes_xmit /= 2;
    stream->num_xoff_recv /= 2;
    stream->num_xon_recv /= 2;
  }

  stream->total_bytes_xmit += len;
}
/** Returns true if an edge connection uses flow control */
bool
edge_uses_flow_control(const edge_connection_t *stream)
{
  bool ret = (stream->on_circuit && stream->on_circuit->ccontrol) ||
             (stream->cpath_layer && stream->cpath_layer->ccontrol);

  /* All circuits with congestion control use flow control */
  return ret;
}
/**
 * Returns the max RTT for the circuit that carries this stream,
 * as observed by congestion control.
 */
uint64_t
edge_get_max_rtt(const edge_connection_t *stream)
{
  if (stream->on_circuit && stream->on_circuit->ccontrol)
    return stream->on_circuit->ccontrol->max_rtt_usec;
  else if (stream->cpath_layer && stream->cpath_layer->ccontrol)
    return stream->cpath_layer->ccontrol->max_rtt_usec;

  return 0;
}
/** Returns true if a connection is an edge conn that uses flow control */
bool
conn_uses_flow_control(connection_t *conn)
{
  bool ret = false;

  if (CONN_IS_EDGE(conn)) {
    edge_connection_t *edge = TO_EDGE_CONN(conn);

    if (edge_uses_flow_control(edge)) {
      ret = true;
    }
  }

  return ret;
}