1 /* Copyright (c) 2001 Matej Pfajfar.
2 * Copyright (c) 2001-2004, Roger Dingledine.
3 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
4 * Copyright (c) 2007-2010, The Tor Project, Inc. */
5 /* See LICENSE for licensing information */
9 * \brief Handle relay cell encryption/decryption, plus packaging and
10 * receiving from circuits, plus queuing on circuits.
16 #include "circuitbuild.h"
17 #include "circuitlist.h"
19 #include "connection.h"
20 #include "connection_edge.h"
21 #include "connection_or.h"
26 #include "networkstatus.h"
30 #include "rendcommon.h"
31 #include "routerlist.h"
32 #include "routerparse.h"
/* Forward declarations of this module's static helpers: relay cell
 * crypt/lookup/processing, and the circuit-level flow-control helpers
 * (sendme accounting, edge read pause/resume, queue-blocked checks).
 *
 * NOTE(review): this block was recovered from a garbled extraction; the
 * original file's line numbers are fused into the text and wrapping is
 * broken.  Kept byte-identical -- restore formatting from upstream
 * relay.c before editing. */
34 static int relay_crypt(circuit_t
*circ
, cell_t
*cell
,
35 cell_direction_t cell_direction
,
36 crypt_path_t
**layer_hint
, char *recognized
);
37 static edge_connection_t
*relay_lookup_conn(circuit_t
*circ
, cell_t
*cell
,
38 cell_direction_t cell_direction
,
39 crypt_path_t
*layer_hint
);
41 static int connection_edge_process_relay_cell(cell_t
*cell
, circuit_t
*circ
,
42 edge_connection_t
*conn
,
43 crypt_path_t
*layer_hint
);
44 static void circuit_consider_sending_sendme(circuit_t
*circ
,
45 crypt_path_t
*layer_hint
);
46 static void circuit_resume_edge_reading(circuit_t
*circ
,
47 crypt_path_t
*layer_hint
);
48 static int circuit_resume_edge_reading_helper(edge_connection_t
*conn
,
50 crypt_path_t
*layer_hint
);
51 static int circuit_consider_stop_edge_reading(circuit_t
*circ
,
52 crypt_path_t
*layer_hint
);
53 static int circuit_queue_streams_are_blocked(circuit_t
*circ
);
/* NOTE(review): garbled extraction -- original line numbers fused into the
 * text; kept byte-identical.  cached_time_hires uses tv_sec==0 as the
 * "cache empty" sentinel (see tor_gettimeofday_cached below); the two
 * CELL_QUEUE_* watermarks bound when edge reading is paused/resumed. */
55 /** Cache the current hi-res time; the cache gets reset when libevent
58 static struct timeval cached_time_hires
= {0, 0};
60 /** Stop reading on edge connections when we have this many cells
61 * waiting on the appropriate queue. */
62 #define CELL_QUEUE_HIGHWATER_SIZE 256
63 /** Start reading from edge connections again when we get down to this many
65 #define CELL_QUEUE_LOWWATER_SIZE 64
68 tor_gettimeofday_cached(struct timeval
*tv
)
70 if (cached_time_hires
.tv_sec
== 0) {
71 tor_gettimeofday(&cached_time_hires
);
73 *tv
= cached_time_hires
;
77 tor_gettimeofday_cache_clear(void)
79 cached_time_hires
.tv_sec
= 0;
/* NOTE(review): garbled extraction; kept byte-identical.  These counters
 * are bumped in circuit_receive_relay_cell()/circuit_package_relay_cell()
 * below; exported (non-static), presumably for the stats/heartbeat code
 * elsewhere -- confirm against callers. */
82 /** Stats: how many relay cells have originated at this hop, or have
83 * been relayed onward (not recognized at this hop)?
85 uint64_t stats_n_relay_cells_relayed
= 0;
86 /** Stats: how many relay cells have been delivered to streams at this
89 uint64_t stats_n_relay_cells_delivered
= 0;
/* NOTE(review): recovered from a garbled extraction -- the return type,
 * local declarations (a relay_header_t and a 4-byte integrity buffer,
 * judging from the visible uses) and braces are missing, and original
 * line numbers are fused into the text.  Kept byte-identical; restore
 * from upstream relay.c before editing.
 *
 * Visible behavior: fold the cell payload into the running digest, take
 * the first 4 digest bytes, and write them into the integrity field of
 * the packed relay header. */
91 /** Update digest from the payload of cell. Assign integrity part to
95 relay_set_digest(crypto_digest_env_t
*digest
, cell_t
*cell
)
100 crypto_digest_add_bytes(digest
, cell
->payload
, CELL_PAYLOAD_SIZE
);
101 crypto_digest_get_digest(digest
, integrity
, 4);
102 // log_fn(LOG_DEBUG,"Putting digest of %u %u %u %u into relay cell.",
103 // integrity[0], integrity[1], integrity[2], integrity[3]);
104 relay_header_unpack(&rh
, cell
->payload
);
105 memcpy(rh
.integrity
, integrity
, 4);
106 relay_header_pack(cell
->payload
, &rh
);
109 /** Does the digest for this circuit indicate that this cell is for us?
111 * Update digest from the payload of cell (with the integrity part set
112 * to 0). If the integrity part is valid, return 1, else restore digest
113 * and cell to their original state and return 0.
116 relay_digest_matches(crypto_digest_env_t
*digest
, cell_t
*cell
)
118 char received_integrity
[4], calculated_integrity
[4];
120 crypto_digest_env_t
*backup_digest
=NULL
;
122 backup_digest
= crypto_digest_dup(digest
);
124 relay_header_unpack(&rh
, cell
->payload
);
125 memcpy(received_integrity
, rh
.integrity
, 4);
126 memset(rh
.integrity
, 0, 4);
127 relay_header_pack(cell
->payload
, &rh
);
129 // log_fn(LOG_DEBUG,"Reading digest of %u %u %u %u from relay cell.",
130 // received_integrity[0], received_integrity[1],
131 // received_integrity[2], received_integrity[3]);
133 crypto_digest_add_bytes(digest
, cell
->payload
, CELL_PAYLOAD_SIZE
);
134 crypto_digest_get_digest(digest
, calculated_integrity
, 4);
136 if (memcmp(received_integrity
, calculated_integrity
, 4)) {
137 // log_fn(LOG_INFO,"Recognized=0 but bad digest. Not recognizing.");
138 // (%d vs %d).", received_integrity, calculated_integrity);
139 /* restore digest to its old form */
140 crypto_digest_assign(digest
, backup_digest
);
141 /* restore the relay header */
142 memcpy(rh
.integrity
, received_integrity
, 4);
143 relay_header_pack(cell
->payload
, &rh
);
144 crypto_free_digest_env(backup_digest
);
147 crypto_free_digest_env(backup_digest
);
151 /** Apply <b>cipher</b> to CELL_PAYLOAD_SIZE bytes of <b>in</b>
154 * If <b>encrypt_mode</b> is 1 then encrypt, else decrypt.
156 * Return -1 if the crypto fails, else return 0.
159 relay_crypt_one_payload(crypto_cipher_env_t
*cipher
, char *in
,
164 r
= crypto_cipher_crypt_inplace(cipher
, in
, CELL_PAYLOAD_SIZE
);
167 log_warn(LD_BUG
,"Error during relay encryption");
/* NOTE(review): recovered from a garbled extraction -- original file line
 * numbers are fused into the text and many source lines are missing
 * (local declarations such as `reason`/`recognized`, several braces and
 * returns, and whole statements, judging from the gaps in the fused
 * numbering).  Kept byte-identical; restore from upstream relay.c before
 * changing logic. */
173 /** Receive a relay cell:
174 * - Crypt it (encrypt if headed toward the origin or if we <b>are</b> the
175 * origin; decrypt if we're headed toward the exit).
176 * - Check if recognized (if exitward).
177 * - If recognized and the digest checks out, then find if there's a stream
178 * that the cell is intended for, and deliver it to the right
180 * - If not recognized, then we need to relay it: append it to the appropriate
181 * cell_queue on <b>circ</b>.
183 * Return -<b>reason</b> on failure.
186 circuit_receive_relay_cell(cell_t
*cell
, circuit_t
*circ
,
187 cell_direction_t cell_direction
)
189 or_connection_t
*or_conn
=NULL
;
190 crypt_path_t
*layer_hint
=NULL
;
196 tor_assert(cell_direction
== CELL_DIRECTION_OUT
||
197 cell_direction
== CELL_DIRECTION_IN
);
198 if (circ
->marked_for_close
)
201 if (relay_crypt(circ
, cell
, cell_direction
, &layer_hint
, &recognized
) < 0) {
202 log_warn(LD_BUG
,"relay crypt failed. Dropping connection.");
203 return -END_CIRC_REASON_INTERNAL
;
207 edge_connection_t
*conn
= relay_lookup_conn(circ
, cell
, cell_direction
,
209 if (cell_direction
== CELL_DIRECTION_OUT
) {
210 ++stats_n_relay_cells_delivered
;
211 log_debug(LD_OR
,"Sending away from origin.");
212 if ((reason
=connection_edge_process_relay_cell(cell
, circ
, conn
, NULL
))
214 log_fn(LOG_PROTOCOL_WARN
, LD_PROTOCOL
,
215 "connection_edge_process_relay_cell (away from origin) "
220 if (cell_direction
== CELL_DIRECTION_IN
) {
221 ++stats_n_relay_cells_delivered
;
222 log_debug(LD_OR
,"Sending to origin.");
223 if ((reason
= connection_edge_process_relay_cell(cell
, circ
, conn
,
226 "connection_edge_process_relay_cell (at origin) failed.");
233 /* not recognized. pass it on. */
234 if (cell_direction
== CELL_DIRECTION_OUT
) {
235 cell
->circ_id
= circ
->n_circ_id
; /* switch it */
236 or_conn
= circ
->n_conn
;
237 } else if (! CIRCUIT_IS_ORIGIN(circ
)) {
238 cell
->circ_id
= TO_OR_CIRCUIT(circ
)->p_circ_id
; /* switch it */
239 or_conn
= TO_OR_CIRCUIT(circ
)->p_conn
;
241 log_fn(LOG_PROTOCOL_WARN
, LD_OR
,
242 "Dropping unrecognized inbound cell on origin circuit.");
247 // XXXX Can this splice stuff be done more cleanly?
248 if (! CIRCUIT_IS_ORIGIN(circ
) &&
249 TO_OR_CIRCUIT(circ
)->rend_splice
&&
250 cell_direction
== CELL_DIRECTION_OUT
) {
251 or_circuit_t
*splice
= TO_OR_CIRCUIT(circ
)->rend_splice
;
252 tor_assert(circ
->purpose
== CIRCUIT_PURPOSE_REND_ESTABLISHED
);
253 tor_assert(splice
->_base
.purpose
== CIRCUIT_PURPOSE_REND_ESTABLISHED
);
254 cell
->circ_id
= splice
->p_circ_id
;
255 cell
->command
= CELL_RELAY
; /* can't be relay_early anyway */
256 if ((reason
= circuit_receive_relay_cell(cell
, TO_CIRCUIT(splice
),
257 CELL_DIRECTION_IN
)) < 0) {
258 log_warn(LD_REND
, "Error relaying cell across rendezvous; closing "
260 /* XXXX Do this here, or just return -1? */
261 circuit_mark_for_close(circ
, -reason
);
266 log_fn(LOG_PROTOCOL_WARN
, LD_PROTOCOL
,
267 "Didn't recognize cell, but circ stops here! Closing circ.");
268 return -END_CIRC_REASON_TORPROTOCOL
;
271 log_debug(LD_OR
,"Passing on unrecognized cell.");
273 ++stats_n_relay_cells_relayed
; /* XXXX no longer quite accurate {cells}
274 * we might kill the circ before we relay
277 append_cell_to_circuit_queue(circ
, or_conn
, cell
, cell_direction
, 0);
/* NOTE(review): recovered from a garbled extraction -- original line
 * numbers fused into the text; the return type, the relay_header_t
 * local, several braces and all the `return` statements are missing.
 * Kept byte-identical; restore from upstream relay.c before editing.
 * The visible doc comment describes the contract: set *recognized and
 * *layer_hint on a match; return -1 to close the circuit. */
281 /** Do the appropriate en/decryptions for <b>cell</b> arriving on
282 * <b>circ</b> in direction <b>cell_direction</b>.
284 * If cell_direction == CELL_DIRECTION_IN:
285 * - If we're at the origin (we're the OP), for hops 1..N,
286 * decrypt cell. If recognized, stop.
287 * - Else (we're not the OP), encrypt one hop. Cell is not recognized.
289 * If cell_direction == CELL_DIRECTION_OUT:
290 * - decrypt one hop. Check if recognized.
292 * If cell is recognized, set *recognized to 1, and set
293 * *layer_hint to the hop that recognized it.
295 * Return -1 to indicate that we should mark the circuit for close,
299 relay_crypt(circuit_t
*circ
, cell_t
*cell
, cell_direction_t cell_direction
,
300 crypt_path_t
**layer_hint
, char *recognized
)
306 tor_assert(recognized
);
307 tor_assert(cell_direction
== CELL_DIRECTION_IN
||
308 cell_direction
== CELL_DIRECTION_OUT
);
310 if (cell_direction
== CELL_DIRECTION_IN
) {
311 if (CIRCUIT_IS_ORIGIN(circ
)) { /* We're at the beginning of the circuit.
312 * We'll want to do layered decrypts. */
313 crypt_path_t
*thishop
, *cpath
= TO_ORIGIN_CIRCUIT(circ
)->cpath
;
315 if (thishop
->state
!= CPATH_STATE_OPEN
) {
316 log_fn(LOG_PROTOCOL_WARN
, LD_PROTOCOL
,
317 "Relay cell before first created cell? Closing.");
320 do { /* Remember: cpath is in forward order, that is, first hop first. */
323 if (relay_crypt_one_payload(thishop
->b_crypto
, cell
->payload
, 0) < 0)
326 relay_header_unpack(&rh
, cell
->payload
);
327 if (rh
.recognized
== 0) {
328 /* it's possibly recognized. have to check digest to be sure. */
329 if (relay_digest_matches(thishop
->b_digest
, cell
)) {
331 *layer_hint
= thishop
;
336 thishop
= thishop
->next
;
337 } while (thishop
!= cpath
&& thishop
->state
== CPATH_STATE_OPEN
);
338 log_fn(LOG_PROTOCOL_WARN
, LD_OR
,
339 "Incoming cell at client not recognized. Closing.");
341 } else { /* we're in the middle. Just one crypt. */
342 if (relay_crypt_one_payload(TO_OR_CIRCUIT(circ
)->p_crypto
,
343 cell
->payload
, 1) < 0)
345 // log_fn(LOG_DEBUG,"Skipping recognized check, because we're not "
348 } else /* cell_direction == CELL_DIRECTION_OUT */ {
349 /* we're in the middle. Just one crypt. */
351 if (relay_crypt_one_payload(TO_OR_CIRCUIT(circ
)->n_crypto
,
352 cell
->payload
, 0) < 0)
355 relay_header_unpack(&rh
, cell
->payload
);
356 if (rh
.recognized
== 0) {
357 /* it's possibly recognized. have to check digest to be sure. */
358 if (relay_digest_matches(TO_OR_CIRCUIT(circ
)->n_digest
, cell
)) {
/* NOTE(review): recovered from a garbled extraction -- original line
 * numbers fused into the text; the return type, the assignment of
 * `conn` from circ->n_conn on the OUT path, the do/while opener and
 * several braces/returns are missing.  Kept byte-identical; restore from
 * upstream relay.c before editing.  Visible behavior: set the digest at
 * the destination layer, onion-encrypt from farthest to nearest hop
 * (OUT) or once (IN), then queue the cell on the circuit. */
367 /** Package a relay cell from an edge:
368 * - Encrypt it to the right layer
369 * - Append it to the appropriate cell_queue on <b>circ</b>.
372 circuit_package_relay_cell(cell_t
*cell
, circuit_t
*circ
,
373 cell_direction_t cell_direction
,
374 crypt_path_t
*layer_hint
, streamid_t on_stream
)
376 or_connection_t
*conn
; /* where to send the cell */
378 if (cell_direction
== CELL_DIRECTION_OUT
) {
379 crypt_path_t
*thishop
; /* counter for repeated crypts */
381 if (!CIRCUIT_IS_ORIGIN(circ
) || !conn
) {
382 log_warn(LD_BUG
,"outgoing relay cell has n_conn==NULL. Dropping.");
383 return 0; /* just drop it */
386 relay_set_digest(layer_hint
->f_digest
, cell
);
388 thishop
= layer_hint
;
389 /* moving from farthest to nearest hop */
392 /* XXXX RD This is a bug, right? */
393 log_debug(LD_OR
,"crypting a layer of the relay cell.");
394 if (relay_crypt_one_payload(thishop
->f_crypto
, cell
->payload
, 1) < 0) {
398 thishop
= thishop
->prev
;
399 } while (thishop
!= TO_ORIGIN_CIRCUIT(circ
)->cpath
->prev
);
401 } else { /* incoming cell */
402 or_circuit_t
*or_circ
;
403 if (CIRCUIT_IS_ORIGIN(circ
)) {
404 /* We should never package an _incoming_ cell from the circuit
405 * origin; that means we messed up somewhere. */
406 log_warn(LD_BUG
,"incoming relay cell at origin circuit. Dropping.");
407 assert_circuit_ok(circ
);
408 return 0; /* just drop it */
410 or_circ
= TO_OR_CIRCUIT(circ
);
411 conn
= or_circ
->p_conn
;
412 relay_set_digest(or_circ
->p_digest
, cell
);
413 if (relay_crypt_one_payload(or_circ
->p_crypto
, cell
->payload
, 1) < 0)
416 ++stats_n_relay_cells_relayed
;
418 append_cell_to_circuit_queue(circ
, conn
, cell
, cell_direction
, on_stream
);
/* NOTE(review): recovered from a garbled extraction -- original line
 * numbers fused into the text; the relay_header_t local, the stream_id==0
 * early-out, the `return tmpconn;` statements inside the loops, and
 * several braces are missing.  Kept byte-identical; restore from upstream
 * relay.c before editing.  Visible behavior: scan p_streams (origin) or
 * n_streams/resolving_streams (OR) for a live stream whose stream_id
 * matches the relay header's. */
422 /** If cell's stream_id matches the stream_id of any conn that's
423 * attached to circ, return that conn, else return NULL.
425 static edge_connection_t
*
426 relay_lookup_conn(circuit_t
*circ
, cell_t
*cell
,
427 cell_direction_t cell_direction
, crypt_path_t
*layer_hint
)
429 edge_connection_t
*tmpconn
;
432 relay_header_unpack(&rh
, cell
->payload
);
437 /* IN or OUT cells could have come from either direction, now
438 * that we allow rendezvous *to* an OP.
441 if (CIRCUIT_IS_ORIGIN(circ
)) {
442 for (tmpconn
= TO_ORIGIN_CIRCUIT(circ
)->p_streams
; tmpconn
;
443 tmpconn
=tmpconn
->next_stream
) {
444 if (rh
.stream_id
== tmpconn
->stream_id
&&
445 !tmpconn
->_base
.marked_for_close
&&
446 tmpconn
->cpath_layer
== layer_hint
) {
447 log_debug(LD_APP
,"found conn for stream %d.", rh
.stream_id
);
452 for (tmpconn
= TO_OR_CIRCUIT(circ
)->n_streams
; tmpconn
;
453 tmpconn
=tmpconn
->next_stream
) {
454 if (rh
.stream_id
== tmpconn
->stream_id
&&
455 !tmpconn
->_base
.marked_for_close
) {
456 log_debug(LD_EXIT
,"found conn for stream %d.", rh
.stream_id
);
457 if (cell_direction
== CELL_DIRECTION_OUT
||
458 connection_edge_is_rendezvous_stream(tmpconn
))
462 for (tmpconn
= TO_OR_CIRCUIT(circ
)->resolving_streams
; tmpconn
;
463 tmpconn
=tmpconn
->next_stream
) {
464 if (rh
.stream_id
== tmpconn
->stream_id
&&
465 !tmpconn
->_base
.marked_for_close
) {
466 log_debug(LD_EXIT
,"found conn for stream %d.", rh
.stream_id
);
471 return NULL
; /* probably a begin relay cell */
474 /** Pack the relay_header_t host-order structure <b>src</b> into
475 * network-order in the buffer <b>dest</b>. See tor-spec.txt for details
476 * about the wire format.
479 relay_header_pack(char *dest
, const relay_header_t
*src
)
481 *(uint8_t*)(dest
) = src
->command
;
483 set_uint16(dest
+1, htons(src
->recognized
));
484 set_uint16(dest
+3, htons(src
->stream_id
));
485 memcpy(dest
+5, src
->integrity
, 4);
486 set_uint16(dest
+9, htons(src
->length
));
489 /** Unpack the network-order buffer <b>src</b> into a host-order
490 * relay_header_t structure <b>dest</b>.
493 relay_header_unpack(relay_header_t
*dest
, const char *src
)
495 dest
->command
= *(uint8_t*)(src
);
497 dest
->recognized
= ntohs(get_uint16(src
+1));
498 dest
->stream_id
= ntohs(get_uint16(src
+3));
499 memcpy(dest
->integrity
, src
+5, 4);
500 dest
->length
= ntohs(get_uint16(src
+9));
503 /** Convert the relay <b>command</b> into a human-readable string. */
505 relay_command_to_string(uint8_t command
)
508 case RELAY_COMMAND_BEGIN
: return "BEGIN";
509 case RELAY_COMMAND_DATA
: return "DATA";
510 case RELAY_COMMAND_END
: return "END";
511 case RELAY_COMMAND_CONNECTED
: return "CONNECTED";
512 case RELAY_COMMAND_SENDME
: return "SENDME";
513 case RELAY_COMMAND_EXTEND
: return "EXTEND";
514 case RELAY_COMMAND_EXTENDED
: return "EXTENDED";
515 case RELAY_COMMAND_TRUNCATE
: return "TRUNCATE";
516 case RELAY_COMMAND_TRUNCATED
: return "TRUNCATED";
517 case RELAY_COMMAND_DROP
: return "DROP";
518 case RELAY_COMMAND_RESOLVE
: return "RESOLVE";
519 case RELAY_COMMAND_RESOLVED
: return "RESOLVED";
520 case RELAY_COMMAND_BEGIN_DIR
: return "BEGIN_DIR";
521 case RELAY_COMMAND_ESTABLISH_INTRO
: return "ESTABLISH_INTRO";
522 case RELAY_COMMAND_ESTABLISH_RENDEZVOUS
: return "ESTABLISH_RENDEZVOUS";
523 case RELAY_COMMAND_INTRODUCE1
: return "INTRODUCE1";
524 case RELAY_COMMAND_INTRODUCE2
: return "INTRODUCE2";
525 case RELAY_COMMAND_RENDEZVOUS1
: return "RENDEZVOUS1";
526 case RELAY_COMMAND_RENDEZVOUS2
: return "RENDEZVOUS2";
527 case RELAY_COMMAND_INTRO_ESTABLISHED
: return "INTRO_ESTABLISHED";
528 case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED
:
529 return "RENDEZVOUS_ESTABLISHED";
530 case RELAY_COMMAND_INTRODUCE_ACK
: return "INTRODUCE_ACK";
531 default: return "(unrecognized)";
/* NOTE(review): recovered from a garbled extraction -- original line
 * numbers fused into the text; the cell/rh locals, the CIRCUIT_IS_ORIGIN
 * condition opening the circ_id selection, the loop-index declaration,
 * several braces and the final returns are missing.  Kept byte-identical;
 * restore from upstream relay.c before editing. */
535 /** Make a relay cell out of <b>relay_command</b> and <b>payload</b>, and send
536 * it onto the open circuit <b>circ</b>. <b>stream_id</b> is the ID on
537 * <b>circ</b> for the stream that's sending the relay cell, or 0 if it's a
538 * control cell. <b>cpath_layer</b> is NULL for OR->OP cells, or the
539 * destination hop for OP->OR cells.
541 * If you can't send the cell, mark the circuit for close and return -1. Else
545 relay_send_command_from_edge(streamid_t stream_id
, circuit_t
*circ
,
546 uint8_t relay_command
, const char *payload
,
547 size_t payload_len
, crypt_path_t
*cpath_layer
)
551 cell_direction_t cell_direction
;
552 /* XXXX NM Split this function into a separate versions per circuit type? */
555 tor_assert(payload_len
<= RELAY_PAYLOAD_SIZE
);
557 memset(&cell
, 0, sizeof(cell_t
));
558 cell
.command
= CELL_RELAY
;
560 cell
.circ_id
= circ
->n_circ_id
;
561 cell_direction
= CELL_DIRECTION_OUT
;
562 } else if (! CIRCUIT_IS_ORIGIN(circ
)) {
563 cell
.circ_id
= TO_OR_CIRCUIT(circ
)->p_circ_id
;
564 cell_direction
= CELL_DIRECTION_IN
;
569 memset(&rh
, 0, sizeof(rh
));
570 rh
.command
= relay_command
;
571 rh
.stream_id
= stream_id
;
572 rh
.length
= payload_len
;
573 relay_header_pack(cell
.payload
, &rh
);
575 memcpy(cell
.payload
+RELAY_HEADER_SIZE
, payload
, payload_len
);
577 log_debug(LD_OR
,"delivering %d cell %s.", relay_command
,
578 cell_direction
== CELL_DIRECTION_OUT
? "forward" : "backward");
580 /* If we are sending an END cell and this circuit is used for a tunneled
581 * directory request, advance its state. */
582 if (relay_command
== RELAY_COMMAND_END
&& circ
->dirreq_id
)
583 geoip_change_dirreq_state(circ
->dirreq_id
, DIRREQ_TUNNELED
,
584 DIRREQ_END_CELL_SENT
);
586 if (cell_direction
== CELL_DIRECTION_OUT
&& circ
->n_conn
) {
587 /* if we're using relaybandwidthrate, this conn wants priority */
588 circ
->n_conn
->client_used
= approx_time();
591 if (cell_direction
== CELL_DIRECTION_OUT
) {
592 origin_circuit_t
*origin_circ
= TO_ORIGIN_CIRCUIT(circ
);
593 if (origin_circ
->remaining_relay_early_cells
> 0 &&
594 (relay_command
== RELAY_COMMAND_EXTEND
||
595 (cpath_layer
!= origin_circ
->cpath
&&
596 !CIRCUIT_PURPOSE_IS_ESTABLISHED_REND(circ
->purpose
)))) {
597 /* If we've got any relay_early cells left, and we're sending
598 * an extend cell or (we're not talking to the first hop and we're
599 * not talking to a rendezvous circuit), use one of them.
600 * Don't worry about the conn protocol version:
601 * append_cell_to_circuit_queue will fix it up. */
602 /* XXX For now, clients don't use RELAY_EARLY cells when sending
603 * relay cells on rendezvous circuits. See bug 1038. Once no relays
604 * (and thus no rendezvous points) are running 0.2.1.3-alpha through
605 * 0.2.1.18, we can take out that exception. -RD */
606 cell
.command
= CELL_RELAY_EARLY
;
607 --origin_circ
->remaining_relay_early_cells
;
608 log_debug(LD_OR
, "Sending a RELAY_EARLY cell; %d remaining.",
609 (int)origin_circ
->remaining_relay_early_cells
);
610 /* Memorize the command that is sent as RELAY_EARLY cell; helps debug
612 origin_circ
->relay_early_commands
[
613 origin_circ
->relay_early_cells_sent
++] = relay_command
;
614 } else if (relay_command
== RELAY_COMMAND_EXTEND
) {
615 /* If no RELAY_EARLY cells can be sent over this circuit, log which
616 * commands have been sent as RELAY_EARLY cells before; helps debug
618 smartlist_t
*commands_list
= smartlist_create();
620 char *commands
= NULL
;
621 for (; i
< origin_circ
->relay_early_cells_sent
; i
++)
622 smartlist_add(commands_list
, (char *)
623 relay_command_to_string(origin_circ
->relay_early_commands
[i
]));
624 commands
= smartlist_join_strings(commands_list
, ",", 0, NULL
);
625 log_warn(LD_BUG
, "Uh-oh. We're sending a RELAY_COMMAND_EXTEND cell, "
626 "but we have run out of RELAY_EARLY cells on that circuit. "
627 "Commands sent before: %s", commands
);
629 smartlist_free(commands_list
);
633 if (circuit_package_relay_cell(&cell
, circ
, cell_direction
, cpath_layer
,
635 log_warn(LD_BUG
,"circuit_package_relay_cell failed. Closing.");
636 circuit_mark_for_close(circ
, END_CIRC_REASON_INTERNAL
);
/* NOTE(review): recovered from a garbled extraction -- original line
 * numbers fused into the text; the return type, the payload_len
 * parameter's type line, the circuit_t local declaration, the log_warn
 * opening the marked-for-close branch, and several braces/returns are
 * missing.  Kept byte-identical; restore from upstream relay.c before
 * editing. */
642 /** Make a relay cell out of <b>relay_command</b> and <b>payload</b>, and
643 * send it onto the open circuit <b>circ</b>. <b>fromconn</b> is the stream
644 * that's sending the relay cell, or NULL if it's a control cell.
645 * <b>cpath_layer</b> is NULL for OR->OP cells, or the destination hop
648 * If you can't send the cell, mark the circuit for close and
649 * return -1. Else return 0.
652 connection_edge_send_command(edge_connection_t
*fromconn
,
653 uint8_t relay_command
, const char *payload
,
656 /* XXXX NM Split this function into a separate versions per circuit type? */
658 tor_assert(fromconn
);
659 circ
= fromconn
->on_circuit
;
661 if (fromconn
->_base
.marked_for_close
) {
663 "called on conn that's already marked for close at %s:%d.",
664 fromconn
->_base
.marked_for_close_file
,
665 fromconn
->_base
.marked_for_close
);
670 if (fromconn
->_base
.type
== CONN_TYPE_AP
) {
671 log_info(LD_APP
,"no circ. Closing conn.");
672 connection_mark_unattached_ap(fromconn
, END_STREAM_REASON_INTERNAL
);
674 log_info(LD_EXIT
,"no circ. Closing conn.");
675 fromconn
->edge_has_sent_end
= 1; /* no circ to send to */
676 fromconn
->end_reason
= END_STREAM_REASON_INTERNAL
;
677 connection_mark_for_close(TO_CONN(fromconn
));
682 return relay_send_command_from_edge(fromconn
->stream_id
, circ
,
683 relay_command
, payload
,
684 payload_len
, fromconn
->cpath_layer
);
/* NOTE(review): garbled extraction; kept byte-identical.  Used below in
 * connection_ap_process_end_not_open() to cap retry attempts counted by
 * client_dns_incr_failures(). */
687 /** How many times will I retry a stream that fails due to DNS
688 * resolve failure or misc error?
690 #define MAX_RESOLVE_FAILURES 3
692 /** Return 1 if reason is something that you should retry if you
693 * get the end cell before you've connected; else return 0. */
695 edge_reason_is_retriable(int reason
)
697 return reason
== END_STREAM_REASON_HIBERNATING
||
698 reason
== END_STREAM_REASON_RESOURCELIMIT
||
699 reason
== END_STREAM_REASON_EXITPOLICY
||
700 reason
== END_STREAM_REASON_RESOLVEFAILED
||
701 reason
== END_STREAM_REASON_MISC
||
702 reason
== END_STREAM_REASON_NOROUTE
;
/* NOTE(review): recovered from a garbled extraction -- original line
 * numbers fused into the text; the return type, the switch(reason)
 * opener, the `struct in_addr in;` local, several log_warn openers,
 * numerous braces and the `return 0;` statements after the
 * detach_retriable calls are missing.  Kept byte-identical; restore from
 * upstream relay.c before editing.  Visible behavior: on a retriable END
 * reason, possibly punish the exit, rewrite the address map, stop
 * insisting on a specific exit, and reattach; otherwise close the AP
 * connection. */
705 /** Called when we receive an END cell on a stream that isn't open yet,
706 * from the client side.
707 * Arguments are as for connection_edge_process_relay_cell().
710 connection_ap_process_end_not_open(
711 relay_header_t
*rh
, cell_t
*cell
, origin_circuit_t
*circ
,
712 edge_connection_t
*conn
, crypt_path_t
*layer_hint
)
715 routerinfo_t
*exitrouter
;
716 int reason
= *(cell
->payload
+RELAY_HEADER_SIZE
);
717 int control_reason
= reason
| END_STREAM_REASON_FLAG_REMOTE
;
718 (void) layer_hint
; /* unused */
720 if (rh
->length
> 0 && edge_reason_is_retriable(reason
) &&
721 !connection_edge_is_rendezvous_stream(conn
) /* avoid retry if rend */
723 log_info(LD_APP
,"Address '%s' refused due to '%s'. Considering retrying.",
724 safe_str(conn
->socks_request
->address
),
725 stream_end_reason_to_string(reason
));
727 router_get_by_digest(circ
->build_state
->chosen_exit
->identity_digest
);
729 case END_STREAM_REASON_EXITPOLICY
:
730 if (rh
->length
>= 5) {
731 uint32_t addr
= ntohl(get_uint32(cell
->payload
+RELAY_HEADER_SIZE
+1));
734 log_info(LD_APP
,"Address '%s' resolved to 0.0.0.0. Closing,",
735 safe_str(conn
->socks_request
->address
));
736 connection_mark_unattached_ap(conn
, END_STREAM_REASON_TORPROTOCOL
);
740 ttl
= (int)ntohl(get_uint32(cell
->payload
+RELAY_HEADER_SIZE
+5));
744 if (get_options()->ClientDNSRejectInternalAddresses
&&
745 is_internal_IP(addr
, 0)) {
746 log_info(LD_APP
,"Address '%s' resolved to internal. Closing,",
747 safe_str(conn
->socks_request
->address
));
748 connection_mark_unattached_ap(conn
, END_STREAM_REASON_TORPROTOCOL
);
751 client_dns_set_addressmap(conn
->socks_request
->address
, addr
,
752 conn
->chosen_exit_name
, ttl
);
754 /* check if he *ought* to have allowed it */
757 (tor_inet_aton(conn
->socks_request
->address
, &in
) &&
758 !conn
->chosen_exit_name
))) {
760 "Exitrouter '%s' seems to be more restrictive than its exit "
761 "policy. Not using this router as exit for now.",
762 exitrouter
->nickname
);
763 policies_set_router_exitpolicy_to_reject_all(exitrouter
);
765 /* rewrite it to an IP if we learned one. */
766 if (addressmap_rewrite(conn
->socks_request
->address
,
767 sizeof(conn
->socks_request
->address
),
769 control_event_stream_status(conn
, STREAM_EVENT_REMAP
, 0);
771 if (conn
->chosen_exit_optional
||
772 conn
->chosen_exit_retries
) {
773 /* stop wanting a specific exit */
774 conn
->chosen_exit_optional
= 0;
775 /* A non-zero chosen_exit_retries can happen if we set a
776 * TrackHostExits for this address under a port that the exit
777 * relay allows, but then try the same address with a different
778 * port that it doesn't allow to exit. We shouldn't unregister
779 * the mapping, since it is probably still wanted on the
780 * original port. But now we give away to the exit relay that
781 * we probably have a TrackHostExits on it. So be it. */
782 conn
->chosen_exit_retries
= 0;
783 tor_free(conn
->chosen_exit_name
); /* clears it */
785 if (connection_ap_detach_retriable(conn
, circ
, control_reason
) >= 0)
787 /* else, conn will get closed below */
789 case END_STREAM_REASON_CONNECTREFUSED
:
790 if (!conn
->chosen_exit_optional
)
791 break; /* break means it'll close, below */
792 /* Else fall through: expire this circuit, clear the
793 * chosen_exit_name field, and try again. */
794 case END_STREAM_REASON_RESOLVEFAILED
:
795 case END_STREAM_REASON_TIMEOUT
:
796 case END_STREAM_REASON_MISC
:
797 case END_STREAM_REASON_NOROUTE
:
798 if (client_dns_incr_failures(conn
->socks_request
->address
)
799 < MAX_RESOLVE_FAILURES
) {
800 /* We haven't retried too many times; reattach the connection. */
801 circuit_log_path(LOG_INFO
,LD_APP
,circ
);
802 tor_assert(circ
->_base
.timestamp_dirty
);
803 circ
->_base
.timestamp_dirty
-= get_options()->MaxCircuitDirtiness
;
805 if (conn
->chosen_exit_optional
) {
806 /* stop wanting a specific exit */
807 conn
->chosen_exit_optional
= 0;
808 tor_free(conn
->chosen_exit_name
); /* clears it */
810 if (connection_ap_detach_retriable(conn
, circ
, control_reason
) >= 0)
812 /* else, conn will get closed below */
815 "Have tried resolving or connecting to address '%s' "
816 "at %d different places. Giving up.",
817 safe_str(conn
->socks_request
->address
),
818 MAX_RESOLVE_FAILURES
);
819 /* clear the failures, so it will have a full try next time */
820 client_dns_clear_failures(conn
->socks_request
->address
);
823 case END_STREAM_REASON_HIBERNATING
:
824 case END_STREAM_REASON_RESOURCELIMIT
:
826 policies_set_router_exitpolicy_to_reject_all(exitrouter
);
828 if (conn
->chosen_exit_optional
) {
829 /* stop wanting a specific exit */
830 conn
->chosen_exit_optional
= 0;
831 tor_free(conn
->chosen_exit_name
); /* clears it */
833 if (connection_ap_detach_retriable(conn
, circ
, control_reason
) >= 0)
835 /* else, will close below */
838 log_info(LD_APP
,"Giving up on retrying; conn can't be handled.");
842 "Edge got end (%s) before we're connected. Marking for close.",
843 stream_end_reason_to_string(rh
->length
> 0 ? reason
: -1));
844 circuit_log_path(LOG_INFO
,LD_APP
,circ
);
845 /* need to test because of detach_retriable */
846 if (!conn
->_base
.marked_for_close
)
847 connection_mark_unattached_ap(conn
, control_reason
);
851 /** Helper: change the socks_request->address field on conn to the
852 * dotted-quad representation of <b>new_addr</b> (given in host order),
853 * and send an appropriate REMAP event. */
855 remap_event_helper(edge_connection_t
*conn
, uint32_t new_addr
)
859 in
.s_addr
= htonl(new_addr
);
860 tor_inet_ntoa(&in
, conn
->socks_request
->address
,
861 sizeof(conn
->socks_request
->address
));
862 control_event_stream_status(conn
, STREAM_EVENT_REMAP
,
863 REMAP_STREAM_SOURCE_EXIT
);
/* NOTE(review): recovered from a garbled extraction -- original line
 * numbers fused into the text; the return type, several locals
 * (answer_len/answer_type/ttl and a `struct in_addr a`, judging from the
 * visible uses), many braces, `break`s in the dirconn switch, and the
 * intermediate `return 0;` statements are missing.  Kept byte-identical;
 * restore from upstream relay.c before editing.  Visible behavior:
 * handle END / CONNECTED / RESOLVED cells arriving on a not-yet-open
 * edge stream, validating addresses against
 * ClientDNSRejectInternalAddresses, and drop anything unexpected. */
866 /** An incoming relay cell has arrived from circuit <b>circ</b> to
867 * stream <b>conn</b>.
869 * The arguments here are the same as in
870 * connection_edge_process_relay_cell() below; this function is called
871 * from there when <b>conn</b> is defined and not in an open state.
874 connection_edge_process_relay_cell_not_open(
875 relay_header_t
*rh
, cell_t
*cell
, circuit_t
*circ
,
876 edge_connection_t
*conn
, crypt_path_t
*layer_hint
)
878 if (rh
->command
== RELAY_COMMAND_END
) {
879 if (CIRCUIT_IS_ORIGIN(circ
) && conn
->_base
.type
== CONN_TYPE_AP
) {
880 return connection_ap_process_end_not_open(rh
, cell
,
881 TO_ORIGIN_CIRCUIT(circ
), conn
,
884 /* we just got an 'end', don't need to send one */
885 conn
->edge_has_sent_end
= 1;
886 conn
->end_reason
= *(cell
->payload
+RELAY_HEADER_SIZE
) |
887 END_STREAM_REASON_FLAG_REMOTE
;
888 connection_mark_for_close(TO_CONN(conn
));
893 if (conn
->_base
.type
== CONN_TYPE_AP
&&
894 rh
->command
== RELAY_COMMAND_CONNECTED
) {
895 tor_assert(CIRCUIT_IS_ORIGIN(circ
));
896 if (conn
->_base
.state
!= AP_CONN_STATE_CONNECT_WAIT
) {
897 log_fn(LOG_PROTOCOL_WARN
, LD_APP
,
898 "Got 'connected' while not in state connect_wait. Dropping.");
901 conn
->_base
.state
= AP_CONN_STATE_OPEN
;
902 log_info(LD_APP
,"'connected' received after %d seconds.",
903 (int)(time(NULL
) - conn
->_base
.timestamp_lastread
));
904 if (rh
->length
>= 4) {
905 uint32_t addr
= ntohl(get_uint32(cell
->payload
+RELAY_HEADER_SIZE
));
907 if (!addr
|| (get_options()->ClientDNSRejectInternalAddresses
&&
908 is_internal_IP(addr
, 0))) {
909 char buf
[INET_NTOA_BUF_LEN
];
911 a
.s_addr
= htonl(addr
);
912 tor_inet_ntoa(&a
, buf
, sizeof(buf
));
914 "...but it claims the IP address was %s. Closing.", buf
);
915 connection_edge_end(conn
, END_STREAM_REASON_TORPROTOCOL
);
916 connection_mark_unattached_ap(conn
, END_STREAM_REASON_TORPROTOCOL
);
920 ttl
= (int)ntohl(get_uint32(cell
->payload
+RELAY_HEADER_SIZE
+4));
923 client_dns_set_addressmap(conn
->socks_request
->address
, addr
,
924 conn
->chosen_exit_name
, ttl
);
926 remap_event_helper(conn
, addr
);
928 circuit_log_path(LOG_INFO
,LD_APP
,TO_ORIGIN_CIRCUIT(circ
));
929 /* don't send a socks reply to transparent conns */
930 if (!conn
->socks_request
->has_finished
)
931 connection_ap_handshake_socks_reply(conn
, NULL
, 0, 0);
933 /* Was it a linked dir conn? If so, a dir request just started to
934 * fetch something; this could be a bootstrap status milestone. */
935 log_debug(LD_APP
, "considering");
936 if (TO_CONN(conn
)->linked_conn
&&
937 TO_CONN(conn
)->linked_conn
->type
== CONN_TYPE_DIR
) {
938 connection_t
*dirconn
= TO_CONN(conn
)->linked_conn
;
939 log_debug(LD_APP
, "it is! %d", dirconn
->purpose
);
940 switch (dirconn
->purpose
) {
941 case DIR_PURPOSE_FETCH_CERTIFICATE
:
942 if (consensus_is_waiting_for_certs())
943 control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_KEYS
, 0);
945 case DIR_PURPOSE_FETCH_CONSENSUS
:
946 control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_STATUS
, 0);
948 case DIR_PURPOSE_FETCH_SERVERDESC
:
949 control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_DESCRIPTORS
,
950 count_loading_descriptors_progress());
955 /* handle anything that might have queued */
956 if (connection_edge_package_raw_inbuf(conn
, 1, NULL
) < 0) {
957 /* (We already sent an end cell if possible) */
958 connection_mark_for_close(TO_CONN(conn
));
963 if (conn
->_base
.type
== CONN_TYPE_AP
&&
964 rh
->command
== RELAY_COMMAND_RESOLVED
) {
968 if (conn
->_base
.state
!= AP_CONN_STATE_RESOLVE_WAIT
) {
969 log_fn(LOG_PROTOCOL_WARN
, LD_APP
, "Got a 'resolved' cell while "
970 "not in state resolve_wait. Dropping.");
973 tor_assert(SOCKS_COMMAND_IS_RESOLVE(conn
->socks_request
->command
));
974 answer_len
= cell
->payload
[RELAY_HEADER_SIZE
+1];
975 if (rh
->length
< 2 || answer_len
+2>rh
->length
) {
976 log_fn(LOG_PROTOCOL_WARN
, LD_PROTOCOL
,
977 "Dropping malformed 'resolved' cell");
978 connection_mark_unattached_ap(conn
, END_STREAM_REASON_TORPROTOCOL
);
981 answer_type
= cell
->payload
[RELAY_HEADER_SIZE
];
982 if (rh
->length
>= answer_len
+6)
983 ttl
= (int)ntohl(get_uint32(cell
->payload
+RELAY_HEADER_SIZE
+
987 if (answer_type
== RESOLVED_TYPE_IPV4
&& answer_len
== 4) {
988 uint32_t addr
= ntohl(get_uint32(cell
->payload
+RELAY_HEADER_SIZE
+2));
989 if (get_options()->ClientDNSRejectInternalAddresses
&&
990 is_internal_IP(addr
, 0)) {
991 char buf
[INET_NTOA_BUF_LEN
];
993 a
.s_addr
= htonl(addr
);
994 tor_inet_ntoa(&a
, buf
, sizeof(buf
));
995 log_info(LD_APP
,"Got a resolve with answer %s. Rejecting.", buf
);
996 connection_ap_handshake_socks_resolved(conn
,
997 RESOLVED_TYPE_ERROR_TRANSIENT
,
998 0, NULL
, 0, TIME_MAX
);
999 connection_mark_unattached_ap(conn
, END_STREAM_REASON_TORPROTOCOL
);
1003 connection_ap_handshake_socks_resolved(conn
,
1005 cell
->payload
[RELAY_HEADER_SIZE
+1], /*answer_len*/
1006 cell
->payload
+RELAY_HEADER_SIZE
+2, /*answer*/
1009 if (answer_type
== RESOLVED_TYPE_IPV4
&& answer_len
== 4) {
1010 uint32_t addr
= ntohl(get_uint32(cell
->payload
+RELAY_HEADER_SIZE
+2));
1011 remap_event_helper(conn
, addr
);
1013 connection_mark_unattached_ap(conn
,
1014 END_STREAM_REASON_DONE
|
1015 END_STREAM_REASON_FLAG_ALREADY_SOCKS_REPLIED
);
1019 log_fn(LOG_PROTOCOL_WARN
, LD_PROTOCOL
,
1020 "Got an unexpected relay command %d, in state %d (%s). Dropping.",
1021 rh
->command
, conn
->_base
.state
,
1022 conn_state_to_string(conn
->_base
.type
, conn
->_base
.state
));
1023 return 0; /* for forward compatibility, don't kill the circuit */
1024 // connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
1025 // connection_mark_for_close(conn);
1029 /** An incoming relay cell has arrived on circuit <b>circ</b>. If
1030 * <b>conn</b> is NULL this is a control cell, else <b>cell</b> is
1031 * destined for <b>conn</b>.
1033 * If <b>layer_hint</b> is defined, then we're the origin of the
1034 * circuit, and it specifies the hop that packaged <b>cell</b>.
1036 * Return -reason if you want to warn and tear down the circuit, else 0.
1039 connection_edge_process_relay_cell(cell_t
*cell
, circuit_t
*circ
,
1040 edge_connection_t
*conn
,
1041 crypt_path_t
*layer_hint
)
1043 static int num_seen
=0;
1045 unsigned domain
= layer_hint
?LD_APP
:LD_EXIT
;
1051 relay_header_unpack(&rh
, cell
->payload
);
1052 // log_fn(LOG_DEBUG,"command %d stream %d", rh.command, rh.stream_id);
1054 log_debug(domain
, "Now seen %d relay cells here (command %d, stream %d).",
1055 num_seen
, rh
.command
, rh
.stream_id
);
1057 if (rh
.length
> RELAY_PAYLOAD_SIZE
) {
1058 log_fn(LOG_PROTOCOL_WARN
, LD_PROTOCOL
,
1059 "Relay cell length field too long. Closing circuit.");
1060 return - END_CIRC_REASON_TORPROTOCOL
;
1063 /* either conn is NULL, in which case we've got a control cell, or else
1064 * conn points to the recognized stream. */
1066 if (conn
&& !connection_state_is_open(TO_CONN(conn
)))
1067 return connection_edge_process_relay_cell_not_open(
1068 &rh
, cell
, circ
, conn
, layer_hint
);
1070 switch (rh
.command
) {
1071 case RELAY_COMMAND_DROP
:
1072 // log_info(domain,"Got a relay-level padding cell. Dropping.");
1074 case RELAY_COMMAND_BEGIN
:
1075 case RELAY_COMMAND_BEGIN_DIR
:
1077 circ
->purpose
!= CIRCUIT_PURPOSE_S_REND_JOINED
) {
1078 log_fn(LOG_PROTOCOL_WARN
, LD_APP
,
1079 "Relay begin request unsupported at AP. Dropping.");
1082 if (circ
->purpose
== CIRCUIT_PURPOSE_S_REND_JOINED
&&
1083 layer_hint
!= TO_ORIGIN_CIRCUIT(circ
)->cpath
->prev
) {
1084 log_fn(LOG_PROTOCOL_WARN
, LD_APP
,
1085 "Relay begin request to Hidden Service "
1086 "from intermediary node. Dropping.");
1090 log_fn(LOG_PROTOCOL_WARN
, domain
,
1091 "Begin cell for known stream. Dropping.");
1094 if (rh
.command
== RELAY_COMMAND_BEGIN_DIR
) {
1095 /* Assign this circuit and its app-ward OR connection a unique ID,
1096 * so that we can measure download times. The local edge and dir
1097 * connection will be assigned the same ID when they are created
1099 static uint64_t next_id
= 0;
1100 circ
->dirreq_id
= ++next_id
;
1101 TO_CONN(TO_OR_CIRCUIT(circ
)->p_conn
)->dirreq_id
= circ
->dirreq_id
;
1104 return connection_exit_begin_conn(cell
, circ
);
1105 case RELAY_COMMAND_DATA
:
1106 ++stats_n_data_cells_received
;
1107 if (( layer_hint
&& --layer_hint
->deliver_window
< 0) ||
1108 (!layer_hint
&& --circ
->deliver_window
< 0)) {
1109 log_fn(LOG_PROTOCOL_WARN
, LD_PROTOCOL
,
1110 "(relay data) circ deliver_window below 0. Killing.");
1111 connection_edge_end(conn
, END_STREAM_REASON_TORPROTOCOL
);
1112 connection_mark_for_close(TO_CONN(conn
));
1113 return -END_CIRC_REASON_TORPROTOCOL
;
1115 log_debug(domain
,"circ deliver_window now %d.", layer_hint
?
1116 layer_hint
->deliver_window
: circ
->deliver_window
);
1118 circuit_consider_sending_sendme(circ
, layer_hint
);
1121 log_info(domain
,"data cell dropped, unknown stream (streamid %d).",
1126 if (--conn
->deliver_window
< 0) { /* is it below 0 after decrement? */
1127 log_fn(LOG_PROTOCOL_WARN
, LD_PROTOCOL
,
1128 "(relay data) conn deliver_window below 0. Killing.");
1129 return -END_CIRC_REASON_TORPROTOCOL
;
1132 stats_n_data_bytes_received
+= rh
.length
;
1133 connection_write_to_buf(cell
->payload
+ RELAY_HEADER_SIZE
,
1134 rh
.length
, TO_CONN(conn
));
1135 connection_edge_consider_sending_sendme(conn
);
1137 case RELAY_COMMAND_END
:
1138 reason
= rh
.length
> 0 ?
1139 *(uint8_t *)(cell
->payload
+RELAY_HEADER_SIZE
) : END_STREAM_REASON_MISC
;
1141 log_info(domain
,"end cell (%s) dropped, unknown stream.",
1142 stream_end_reason_to_string(reason
));
1145 /* XXX add to this log_fn the exit node's nickname? */
1146 log_info(domain
,"%d: end cell (%s) for stream %d. Removing stream.",
1148 stream_end_reason_to_string(reason
),
1150 if (conn
->socks_request
&& !conn
->socks_request
->has_finished
)
1152 "open stream hasn't sent socks answer yet? Closing.");
1153 /* We just *got* an end; no reason to send one. */
1154 conn
->edge_has_sent_end
= 1;
1155 if (!conn
->end_reason
)
1156 conn
->end_reason
= reason
| END_STREAM_REASON_FLAG_REMOTE
;
1157 if (!conn
->_base
.marked_for_close
) {
1158 /* only mark it if not already marked. it's possible to
1159 * get the 'end' right around when the client hangs up on us. */
1160 connection_mark_for_close(TO_CONN(conn
));
1161 conn
->_base
.hold_open_until_flushed
= 1;
1164 case RELAY_COMMAND_EXTEND
:
1166 log_fn(LOG_PROTOCOL_WARN
, domain
,
1167 "'extend' cell received for non-zero stream. Dropping.");
1170 return circuit_extend(cell
, circ
);
1171 case RELAY_COMMAND_EXTENDED
:
1173 log_fn(LOG_PROTOCOL_WARN
, LD_PROTOCOL
,
1174 "'extended' unsupported at non-origin. Dropping.");
1177 log_debug(domain
,"Got an extended cell! Yay.");
1178 if ((reason
= circuit_finish_handshake(TO_ORIGIN_CIRCUIT(circ
),
1180 cell
->payload
+RELAY_HEADER_SIZE
)) < 0) {
1181 log_warn(domain
,"circuit_finish_handshake failed.");
1184 if ((reason
=circuit_send_next_onion_skin(TO_ORIGIN_CIRCUIT(circ
)))<0) {
1185 log_info(domain
,"circuit_send_next_onion_skin() failed.");
1189 case RELAY_COMMAND_TRUNCATE
:
1191 log_fn(LOG_PROTOCOL_WARN
, LD_APP
,
1192 "'truncate' unsupported at origin. Dropping.");
1196 uint8_t trunc_reason
= *(uint8_t*)(cell
->payload
+ RELAY_HEADER_SIZE
);
1197 circuit_clear_cell_queue(circ
, circ
->n_conn
);
1198 connection_or_send_destroy(circ
->n_circ_id
, circ
->n_conn
,
1200 circuit_set_n_circid_orconn(circ
, 0, NULL
);
1202 log_debug(LD_EXIT
, "Processed 'truncate', replying.");
1205 payload
[0] = (char)END_CIRC_REASON_REQUESTED
;
1206 relay_send_command_from_edge(0, circ
, RELAY_COMMAND_TRUNCATED
,
1207 payload
, sizeof(payload
), NULL
);
1210 case RELAY_COMMAND_TRUNCATED
:
1212 log_fn(LOG_PROTOCOL_WARN
, LD_EXIT
,
1213 "'truncated' unsupported at non-origin. Dropping.");
1216 circuit_truncated(TO_ORIGIN_CIRCUIT(circ
), layer_hint
);
1218 case RELAY_COMMAND_CONNECTED
:
1220 log_fn(LOG_PROTOCOL_WARN
, LD_PROTOCOL
,
1221 "'connected' unsupported while open. Closing circ.");
1222 return -END_CIRC_REASON_TORPROTOCOL
;
1225 "'connected' received, no conn attached anymore. Ignoring.");
1227 case RELAY_COMMAND_SENDME
:
1230 layer_hint
->package_window
+= CIRCWINDOW_INCREMENT
;
1231 log_debug(LD_APP
,"circ-level sendme at origin, packagewindow %d.",
1232 layer_hint
->package_window
);
1233 circuit_resume_edge_reading(circ
, layer_hint
);
1235 circ
->package_window
+= CIRCWINDOW_INCREMENT
;
1237 "circ-level sendme at non-origin, packagewindow %d.",
1238 circ
->package_window
);
1239 circuit_resume_edge_reading(circ
, layer_hint
);
1243 conn
->package_window
+= STREAMWINDOW_INCREMENT
;
1244 log_debug(domain
,"stream-level sendme, packagewindow now %d.",
1245 conn
->package_window
);
1246 if (circuit_queue_streams_are_blocked(circ
)) {
1247 /* Still waiting for queue to flush; don't touch conn */
1250 connection_start_reading(TO_CONN(conn
));
1251 /* handle whatever might still be on the inbuf */
1252 if (connection_edge_package_raw_inbuf(conn
, 1, NULL
) < 0) {
1253 /* (We already sent an end cell if possible) */
1254 connection_mark_for_close(TO_CONN(conn
));
1258 case RELAY_COMMAND_RESOLVE
:
1260 log_fn(LOG_PROTOCOL_WARN
, LD_APP
,
1261 "resolve request unsupported at AP; dropping.");
1264 log_fn(LOG_PROTOCOL_WARN
, domain
,
1265 "resolve request for known stream; dropping.");
1267 } else if (circ
->purpose
!= CIRCUIT_PURPOSE_OR
) {
1268 log_fn(LOG_PROTOCOL_WARN
, domain
,
1269 "resolve request on circ with purpose %d; dropping",
1273 connection_exit_begin_resolve(cell
, TO_OR_CIRCUIT(circ
));
1275 case RELAY_COMMAND_RESOLVED
:
1277 log_fn(LOG_PROTOCOL_WARN
, domain
,
1278 "'resolved' unsupported while open. Closing circ.");
1279 return -END_CIRC_REASON_TORPROTOCOL
;
1282 "'resolved' received, no conn attached anymore. Ignoring.");
1284 case RELAY_COMMAND_ESTABLISH_INTRO
:
1285 case RELAY_COMMAND_ESTABLISH_RENDEZVOUS
:
1286 case RELAY_COMMAND_INTRODUCE1
:
1287 case RELAY_COMMAND_INTRODUCE2
:
1288 case RELAY_COMMAND_INTRODUCE_ACK
:
1289 case RELAY_COMMAND_RENDEZVOUS1
:
1290 case RELAY_COMMAND_RENDEZVOUS2
:
1291 case RELAY_COMMAND_INTRO_ESTABLISHED
:
1292 case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED
:
1293 rend_process_relay_cell(circ
, layer_hint
,
1294 rh
.command
, rh
.length
,
1295 cell
->payload
+RELAY_HEADER_SIZE
);
1298 log_fn(LOG_PROTOCOL_WARN
, LD_PROTOCOL
,
1299 "Received unknown relay command %d. Perhaps the other side is using "
1300 "a newer version of Tor? Dropping.",
1302 return 0; /* for forward compatibility, don't kill the circuit */
/** How many relay_data cells have we built, ever? */
uint64_t stats_n_data_cells_packaged = 0;
/** How many bytes of data have we put in relay_data cells, ever? This would
 * be RELAY_PAYLOAD_SIZE*stats_n_data_cells_packaged if every relay cell we
 * ever sent were completely full of data. */
uint64_t stats_n_data_bytes_packaged = 0;
/** How many relay_data cells have we received, ever? */
uint64_t stats_n_data_cells_received = 0;
/** How many bytes of data have we received in relay_data cells, ever? This
 * would be RELAY_PAYLOAD_SIZE*stats_n_data_cells_received if every relay cell
 * we ever received were completely full of data. */
uint64_t stats_n_data_bytes_received = 0;
1318 /** If <b>conn</b> has an entire relay payload of bytes on its inbuf (or
1319 * <b>package_partial</b> is true), and the appropriate package windows aren't
1320 * empty, grab a cell and send it down the circuit.
1322 * If *<b>max_cells</b> is given, package no more than max_cells. Decrement
1323 * *<b>max_cells</b> by the number of cells packaged.
1325 * Return -1 (and send a RELAY_COMMAND_END cell if necessary) if conn should
1326 * be marked for close, else return 0.
1329 connection_edge_package_raw_inbuf(edge_connection_t
*conn
, int package_partial
,
1332 size_t amount_to_process
, length
;
1333 char payload
[CELL_PAYLOAD_SIZE
];
1335 unsigned domain
= conn
->cpath_layer
? LD_APP
: LD_EXIT
;
1339 if (conn
->_base
.marked_for_close
) {
1341 "called on conn that's already marked for close at %s:%d.",
1342 conn
->_base
.marked_for_close_file
, conn
->_base
.marked_for_close
);
1346 if (max_cells
&& *max_cells
<= 0)
1349 repeat_connection_edge_package_raw_inbuf
:
1351 circ
= circuit_get_by_edge_conn(conn
);
1353 log_info(domain
,"conn has no circuit! Closing.");
1354 conn
->end_reason
= END_STREAM_REASON_CANT_ATTACH
;
1358 if (circuit_consider_stop_edge_reading(circ
, conn
->cpath_layer
))
1361 if (conn
->package_window
<= 0) {
1362 log_info(domain
,"called with package_window %d. Skipping.",
1363 conn
->package_window
);
1364 connection_stop_reading(TO_CONN(conn
));
1368 amount_to_process
= buf_datalen(conn
->_base
.inbuf
);
1370 if (!amount_to_process
)
1373 if (!package_partial
&& amount_to_process
< RELAY_PAYLOAD_SIZE
)
1376 if (amount_to_process
> RELAY_PAYLOAD_SIZE
) {
1377 length
= RELAY_PAYLOAD_SIZE
;
1379 length
= amount_to_process
;
1381 stats_n_data_bytes_packaged
+= length
;
1382 stats_n_data_cells_packaged
+= 1;
1384 connection_fetch_from_buf(payload
, length
, TO_CONN(conn
));
1386 log_debug(domain
,"(%d) Packaging %d bytes (%d waiting).", conn
->_base
.s
,
1387 (int)length
, (int)buf_datalen(conn
->_base
.inbuf
));
1389 if (connection_edge_send_command(conn
, RELAY_COMMAND_DATA
,
1390 payload
, length
) < 0 )
1391 /* circuit got marked for close, don't continue, don't need to mark conn */
1394 if (!conn
->cpath_layer
) { /* non-rendezvous exit */
1395 tor_assert(circ
->package_window
> 0);
1396 circ
->package_window
--;
1397 } else { /* we're an AP, or an exit on a rendezvous circ */
1398 tor_assert(conn
->cpath_layer
->package_window
> 0);
1399 conn
->cpath_layer
->package_window
--;
1402 if (--conn
->package_window
<= 0) { /* is it 0 after decrement? */
1403 connection_stop_reading(TO_CONN(conn
));
1404 log_debug(domain
,"conn->package_window reached 0.");
1405 circuit_consider_stop_edge_reading(circ
, conn
->cpath_layer
);
1406 return 0; /* don't process the inbuf any more */
1408 log_debug(domain
,"conn->package_window is now %d",conn
->package_window
);
1412 if (*max_cells
<= 0)
1416 /* handle more if there's more, or return 0 if there isn't */
1417 goto repeat_connection_edge_package_raw_inbuf
;
1420 /** Called when we've just received a relay data cell, or when
1421 * we've just finished flushing all bytes to stream <b>conn</b>.
1423 * If conn->outbuf is not too full, and our deliver window is
1424 * low, send back a suitable number of stream-level sendme cells.
1427 connection_edge_consider_sending_sendme(edge_connection_t
*conn
)
1431 if (connection_outbuf_too_full(TO_CONN(conn
)))
1434 circ
= circuit_get_by_edge_conn(conn
);
1436 /* this can legitimately happen if the destroy has already
1437 * arrived and torn down the circuit */
1438 log_info(LD_APP
,"No circuit associated with conn. Skipping.");
1442 while (conn
->deliver_window
<= STREAMWINDOW_START
- STREAMWINDOW_INCREMENT
) {
1443 log_debug(conn
->cpath_layer
?LD_APP
:LD_EXIT
,
1444 "Outbuf %d, Queuing stream sendme.",
1445 (int)conn
->_base
.outbuf_flushlen
);
1446 conn
->deliver_window
+= STREAMWINDOW_INCREMENT
;
1447 if (connection_edge_send_command(conn
, RELAY_COMMAND_SENDME
,
1449 log_warn(LD_APP
,"connection_edge_send_command failed. Skipping.");
1450 return; /* the circuit's closed, don't continue */
1455 /** The circuit <b>circ</b> has received a circuit-level sendme
1456 * (on hop <b>layer_hint</b>, if we're the OP). Go through all the
1457 * attached streams and let them resume reading and packaging, if
1458 * their stream windows allow it.
1461 circuit_resume_edge_reading(circuit_t
*circ
, crypt_path_t
*layer_hint
)
1463 if (circuit_queue_streams_are_blocked(circ
)) {
1464 log_debug(layer_hint
?LD_APP
:LD_EXIT
,"Too big queue, no resuming");
1467 log_debug(layer_hint
?LD_APP
:LD_EXIT
,"resuming");
1469 if (CIRCUIT_IS_ORIGIN(circ
))
1470 circuit_resume_edge_reading_helper(TO_ORIGIN_CIRCUIT(circ
)->p_streams
,
1473 circuit_resume_edge_reading_helper(TO_OR_CIRCUIT(circ
)->n_streams
,
1477 /** A helper function for circuit_resume_edge_reading() above.
1478 * The arguments are the same, except that <b>conn</b> is the head
1479 * of a linked list of edge streams that should each be considered.
1482 circuit_resume_edge_reading_helper(edge_connection_t
*first_conn
,
1484 crypt_path_t
*layer_hint
)
1486 edge_connection_t
*conn
;
1487 int n_streams
, n_streams_left
;
1488 int packaged_this_round
;
1492 /* How many cells do we have space for? It will be the minimum of
1493 * the number needed to exhaust the package window, and the minimum
1494 * needed to fill the cell queue. */
1495 int max_to_package
= circ
->package_window
;
1496 if (CIRCUIT_IS_ORIGIN(circ
)) {
1497 cells_on_queue
= circ
->n_conn_cells
.n
;
1499 or_circuit_t
*or_circ
= TO_OR_CIRCUIT(circ
);
1500 cells_on_queue
= or_circ
->p_conn_cells
.n
;
1502 if (CELL_QUEUE_HIGHWATER_SIZE
- cells_on_queue
< max_to_package
)
1503 max_to_package
= CELL_QUEUE_HIGHWATER_SIZE
- cells_on_queue
;
1505 /* Count how many non-marked streams there are that have anything on
1506 * their inbuf, and enable reading on all of the connections. */
1508 for (conn
=first_conn
; conn
; conn
=conn
->next_stream
) {
1509 if (conn
->_base
.marked_for_close
|| conn
->package_window
<= 0)
1511 if (!layer_hint
|| conn
->cpath_layer
== layer_hint
) {
1512 connection_start_reading(TO_CONN(conn
));
1514 if (buf_datalen(conn
->_base
.inbuf
) > 0)
1519 if (n_streams
== 0) /* avoid divide-by-zero */
1524 cells_per_conn
= CEIL_DIV(max_to_package
, n_streams
);
1526 packaged_this_round
= 0;
1529 /* Iterate over all connections. Package up to cells_per_conn cells on
1530 * each. Update packaged_this_round with the total number of cells
1531 * packaged, and n_streams_left with the number that still have data to
1534 for (conn
=first_conn
; conn
; conn
=conn
->next_stream
) {
1535 if (conn
->_base
.marked_for_close
|| conn
->package_window
<= 0)
1537 if (!layer_hint
|| conn
->cpath_layer
== layer_hint
) {
1538 int n
= cells_per_conn
, r
;
1539 /* handle whatever might still be on the inbuf */
1540 r
= connection_edge_package_raw_inbuf(conn
, 1, &n
);
1542 /* Note how many we packaged */
1543 packaged_this_round
+= (cells_per_conn
-n
);
1546 /* Problem while packaging. (We already sent an end cell if
1548 connection_mark_for_close(TO_CONN(conn
));
1552 /* If there's still data to read, we'll be coming back to this stream. */
1553 if (buf_datalen(conn
->_base
.inbuf
))
1556 /* If the circuit won't accept any more data, return without looking
1557 * at any more of the streams. Any connections that should be stopped
1558 * have already been stopped by connection_edge_package_raw_inbuf. */
1559 if (circuit_consider_stop_edge_reading(circ
, layer_hint
))
1561 /* XXXX should we also stop immediately if we fill up the cell queue?
1566 /* If we made progress, and we are willing to package more, and there are
1567 * any streams left that want to package stuff... try again!
1569 if (packaged_this_round
&& packaged_this_round
< max_to_package
&&
1571 max_to_package
-= packaged_this_round
;
1572 n_streams
= n_streams_left
;
1579 /** Check if the package window for <b>circ</b> is empty (at
1580 * hop <b>layer_hint</b> if it's defined).
1582 * If yes, tell edge streams to stop reading and return 1.
1586 circuit_consider_stop_edge_reading(circuit_t
*circ
, crypt_path_t
*layer_hint
)
1588 edge_connection_t
*conn
= NULL
;
1589 unsigned domain
= layer_hint
? LD_APP
: LD_EXIT
;
1592 or_circuit_t
*or_circ
= TO_OR_CIRCUIT(circ
);
1593 log_debug(domain
,"considering circ->package_window %d",
1594 circ
->package_window
);
1595 if (circ
->package_window
<= 0) {
1596 log_debug(domain
,"yes, not-at-origin. stopped.");
1597 for (conn
= or_circ
->n_streams
; conn
; conn
=conn
->next_stream
)
1598 connection_stop_reading(TO_CONN(conn
));
1603 /* else, layer hint is defined, use it */
1604 log_debug(domain
,"considering layer_hint->package_window %d",
1605 layer_hint
->package_window
);
1606 if (layer_hint
->package_window
<= 0) {
1607 log_debug(domain
,"yes, at-origin. stopped.");
1608 for (conn
= TO_ORIGIN_CIRCUIT(circ
)->p_streams
; conn
;
1609 conn
=conn
->next_stream
)
1610 if (conn
->cpath_layer
== layer_hint
)
1611 connection_stop_reading(TO_CONN(conn
));
1617 /** Check if the deliver_window for circuit <b>circ</b> (at hop
1618 * <b>layer_hint</b> if it's defined) is low enough that we should
1619 * send a circuit-level sendme back down the circuit. If so, send
1620 * enough sendmes that the window would be overfull if we sent any
1624 circuit_consider_sending_sendme(circuit_t
*circ
, crypt_path_t
*layer_hint
)
1626 // log_fn(LOG_INFO,"Considering: layer_hint is %s",
1627 // layer_hint ? "defined" : "null");
1628 while ((layer_hint
? layer_hint
->deliver_window
: circ
->deliver_window
) <=
1629 CIRCWINDOW_START
- CIRCWINDOW_INCREMENT
) {
1630 log_debug(LD_CIRC
,"Queuing circuit sendme.");
1632 layer_hint
->deliver_window
+= CIRCWINDOW_INCREMENT
;
1634 circ
->deliver_window
+= CIRCWINDOW_INCREMENT
;
1635 if (relay_send_command_from_edge(0, circ
, RELAY_COMMAND_SENDME
,
1636 NULL
, 0, layer_hint
) < 0) {
1638 "relay_send_command_from_edge failed. Circuit's closed.");
1639 return; /* the circuit's closed, don't continue */
/* When ACTIVE_CIRCUITS_PARANOIA is defined, re-verify the active-circuit
 * list invariants at every call site; otherwise the check compiles away. */
#ifdef ACTIVE_CIRCUITS_PARANOIA
#define assert_active_circuits_ok_paranoid(conn) \
     assert_active_circuits_ok(conn)
#else
#define assert_active_circuits_ok_paranoid(conn)
#endif
1651 /** The total number of cells we have allocated from the memory pool. */
1652 static int total_cells_allocated
= 0;
1654 /** A memory pool to allocate packed_cell_t objects. */
1655 static mp_pool_t
*cell_pool
= NULL
;
1657 /** Memory pool to allocate insertion_time_elem_t objects used for cell
1659 static mp_pool_t
*it_pool
= NULL
;
1661 /** Allocate structures to hold cells. */
1663 init_cell_pool(void)
1665 tor_assert(!cell_pool
);
1666 cell_pool
= mp_pool_new(sizeof(packed_cell_t
), 128*1024);
1669 /** Free all storage used to hold cells (and insertion times if we measure
1670 * cell statistics). */
1672 free_cell_pool(void)
1674 /* Maybe we haven't called init_cell_pool yet; need to check for it. */
1676 mp_pool_destroy(cell_pool
);
1680 mp_pool_destroy(it_pool
);
1685 /** Free excess storage in cell pool. */
1687 clean_cell_pool(void)
1689 tor_assert(cell_pool
);
1690 mp_pool_clean(cell_pool
, 0, 1);
1693 /** Release storage held by <b>cell</b>. */
1695 packed_cell_free_unchecked(packed_cell_t
*cell
)
1697 --total_cells_allocated
;
1698 mp_pool_release(cell
);
1701 /** Allocate and return a new packed_cell_t. */
1702 static INLINE packed_cell_t
*
1703 packed_cell_alloc(void)
1705 ++total_cells_allocated
;
1706 return mp_pool_get(cell_pool
);
1709 /** Log current statistics for cell pool allocation at log level
1710 * <b>severity</b>. */
1712 dump_cell_pool_usage(int severity
)
1717 for (c
= _circuit_get_global_list(); c
; c
= c
->next
) {
1718 n_cells
+= c
->n_conn_cells
.n
;
1719 if (!CIRCUIT_IS_ORIGIN(c
))
1720 n_cells
+= TO_OR_CIRCUIT(c
)->p_conn_cells
.n
;
1723 log(severity
, LD_MM
, "%d cells allocated on %d circuits. %d cells leaked.",
1724 n_cells
, n_circs
, total_cells_allocated
- n_cells
);
1725 mp_pool_log_status(cell_pool
, severity
);
1728 /** Allocate a new copy of packed <b>cell</b>. */
1729 static INLINE packed_cell_t
*
1730 packed_cell_copy(const cell_t
*cell
)
1732 packed_cell_t
*c
= packed_cell_alloc();
1738 /** Append <b>cell</b> to the end of <b>queue</b>. */
1740 cell_queue_append(cell_queue_t
*queue
, packed_cell_t
*cell
)
1743 tor_assert(!queue
->tail
->next
);
1744 queue
->tail
->next
= cell
;
1753 /** Append a newly allocated copy of <b>cell</b> to the end of <b>queue</b> */
1755 cell_queue_append_packed_copy(cell_queue_t
*queue
, const cell_t
*cell
)
1757 packed_cell_t
*copy
= packed_cell_copy(cell
);
1758 /* Remember the time when this cell was put in the queue. */
1759 if (get_options()->CellStatistics
) {
1762 insertion_time_queue_t
*it_queue
= queue
->insertion_times
;
1764 it_pool
= mp_pool_new(sizeof(insertion_time_elem_t
), 1024);
1765 tor_gettimeofday_cached(&now
);
1766 #define SECONDS_IN_A_DAY 86400L
1767 added
= (uint32_t)(((now
.tv_sec
% SECONDS_IN_A_DAY
) * 100L)
1768 + ((uint32_t)now
.tv_usec
/ (uint32_t)10000L));
1770 it_queue
= tor_malloc_zero(sizeof(insertion_time_queue_t
));
1771 queue
->insertion_times
= it_queue
;
1773 if (it_queue
->last
&& it_queue
->last
->insertion_time
== added
) {
1774 it_queue
->last
->counter
++;
1776 insertion_time_elem_t
*elem
= mp_pool_get(it_pool
);
1778 elem
->insertion_time
= added
;
1780 if (it_queue
->last
) {
1781 it_queue
->last
->next
= elem
;
1782 it_queue
->last
= elem
;
1784 it_queue
->first
= it_queue
->last
= elem
;
1788 cell_queue_append(queue
, copy
);
1791 /** Remove and free every cell in <b>queue</b>. */
1793 cell_queue_clear(cell_queue_t
*queue
)
1795 packed_cell_t
*cell
, *next
;
1799 packed_cell_free_unchecked(cell
);
1802 queue
->head
= queue
->tail
= NULL
;
1804 if (queue
->insertion_times
) {
1805 while (queue
->insertion_times
->first
) {
1806 insertion_time_elem_t
*elem
= queue
->insertion_times
->first
;
1807 queue
->insertion_times
->first
= elem
->next
;
1808 mp_pool_release(elem
);
1810 tor_free(queue
->insertion_times
);
1814 /** Extract and return the cell at the head of <b>queue</b>; return NULL if
1815 * <b>queue</b> is empty. */
1816 static INLINE packed_cell_t
*
1817 cell_queue_pop(cell_queue_t
*queue
)
1819 packed_cell_t
*cell
= queue
->head
;
1822 queue
->head
= cell
->next
;
1823 if (cell
== queue
->tail
) {
1824 tor_assert(!queue
->head
);
1831 /** Return a pointer to the "next_active_on_{n,p}_conn" pointer of <b>circ</b>,
1832 * depending on whether <b>conn</b> matches n_conn or p_conn. */
1833 static INLINE circuit_t
**
1834 next_circ_on_conn_p(circuit_t
*circ
, or_connection_t
*conn
)
1838 if (conn
== circ
->n_conn
) {
1839 return &circ
->next_active_on_n_conn
;
1841 or_circuit_t
*orcirc
= TO_OR_CIRCUIT(circ
);
1842 tor_assert(conn
== orcirc
->p_conn
);
1843 return &orcirc
->next_active_on_p_conn
;
1847 /** Return a pointer to the "prev_active_on_{n,p}_conn" pointer of <b>circ</b>,
1848 * depending on whether <b>conn</b> matches n_conn or p_conn. */
1849 static INLINE circuit_t
**
1850 prev_circ_on_conn_p(circuit_t
*circ
, or_connection_t
*conn
)
1854 if (conn
== circ
->n_conn
) {
1855 return &circ
->prev_active_on_n_conn
;
1857 or_circuit_t
*orcirc
= TO_OR_CIRCUIT(circ
);
1858 tor_assert(conn
== orcirc
->p_conn
);
1859 return &orcirc
->prev_active_on_p_conn
;
1863 /** Helper for sorting cell_ewma_t values in their priority queue. */
1865 compare_cell_ewma_counts(const void *p1
, const void *p2
)
1867 const cell_ewma_t
*e1
=p1
, *e2
=p2
;
1868 if (e1
->cell_count
< e2
->cell_count
)
1870 else if (e1
->cell_count
> e2
->cell_count
)
1876 /** Given a cell_ewma_t, return a pointer to the circuit containing it. */
1878 cell_ewma_to_circuit(cell_ewma_t
*ewma
)
1880 if (ewma
->is_for_p_conn
) {
1881 /* This is an or_circuit_t's p_cell_ewma. */
1882 or_circuit_t
*orcirc
= SUBTYPE_P(ewma
, or_circuit_t
, p_cell_ewma
);
1883 return TO_CIRCUIT(orcirc
);
1885 /* This is some circuit's n_cell_ewma. */
1886 return SUBTYPE_P(ewma
, circuit_t
, n_cell_ewma
);
1890 /* ==== Functions for scaling cell_ewma_t ====
1892 When choosing which cells to relay first, we favor circuits that have been
1893 quiet recently. This gives better latency on connections that aren't
1894 pushing lots of data, and makes the network feel more interactive.
1896 Conceptually, we take an exponentially weighted mean average of the number
1897 of cells a circuit has sent, and allow active circuits (those with cells to
1898 relay) to send cells in reverse order of their exponentially-weighted mean
1899 average (EWMA) cell count. [That is, a cell sent N seconds ago 'counts'
1900 F^N times as much as a cell sent now, for 0<F<1.0, and we favor the
1901 circuit that has sent the fewest cells]
1903 If 'double' had infinite precision, we could do this simply by counting a
1904 cell sent at startup as having weight 1.0, and a cell sent N seconds later
1905 as having weight F^-N. This way, we would never need to re-scale
1906 any already-sent cells.
1908 To prevent double from overflowing, we could count a cell sent now as
1909 having weight 1.0 and a cell sent N seconds ago as having weight F^N.
1910 This, however, would mean we'd need to re-scale *ALL* old circuits every
1911 time we wanted to send a cell.
1913 So as a compromise, we divide time into 'ticks' (currently, 10-second
1914 increments) and say that a cell sent at the start of a current tick is
1915 worth 1.0, a cell sent N seconds before the start of the current tick is
1916 worth F^N, and a cell sent N seconds after the start of the current tick is
1917 worth F^-N. This way we don't overflow, and we don't need to constantly
1921 /** How long does a tick last (seconds)? */
1922 #define EWMA_TICK_LEN 10
1924 /** The default per-tick scale factor, if it hasn't been overridden by a
1925 * consensus or a configuration setting. zero means "disabled". */
1926 #define EWMA_DEFAULT_HALFLIFE 0.0
1928 /** Given a timeval <b>now</b>, compute the cell_ewma tick in which it occurs
1929 * and the fraction of the tick that has elapsed between the start of the tick
1930 * and <b>now</b>. Return the former and store the latter in
1931 * *<b>remainder_out</b>.
1933 * These tick values are not meant to be shared between Tor instances, or used
1934 * for other purposes. */
1936 cell_ewma_tick_from_timeval(const struct timeval
*now
,
1937 double *remainder_out
)
1939 unsigned res
= (unsigned) (now
->tv_sec
/ EWMA_TICK_LEN
);
1941 double rem
= (now
->tv_sec
% EWMA_TICK_LEN
) +
1942 ((double)(now
->tv_usec
)) / 1.0e6
;
1943 *remainder_out
= rem
/ EWMA_TICK_LEN
;
1947 /** Compute and return the current cell_ewma tick. */
1949 cell_ewma_get_tick(void)
1951 return ((unsigned)approx_time() / EWMA_TICK_LEN
);
/** The per-tick scale factor to be used when computing cell-count EWMA
 * values. (A cell sent N ticks before the start of the current tick
 * has value ewma_scale_factor ** N.)
 */
static double ewma_scale_factor = 0.1;
/** Nonzero iff the cell EWMA circuit-priority algorithm is enabled. */
static int ewma_enabled = 0;

/** Tolerance used when comparing configured halflife values to zero. */
#define EPSILON 0.00001
/** Precomputed ln(1/2), used to derive the per-tick scale factor. */
#define LOG_ONEHALF -0.69314718055994529
1964 /** Adjust the global cell scale factor based on <b>options</b> */
1966 cell_ewma_set_scale_factor(or_options_t
*options
, networkstatus_t
*consensus
)
1968 int32_t halflife_ms
;
1971 if (options
&& options
->CircuitPriorityHalflife
>= -EPSILON
) {
1972 halflife
= options
->CircuitPriorityHalflife
;
1973 source
= "CircuitPriorityHalflife in configuration";
1974 } else if (consensus
&&
1975 (halflife_ms
= networkstatus_get_param(
1976 consensus
, "CircuitPriorityHalflifeMsec", -1)) >= 0) {
1977 halflife
= ((double)halflife_ms
)/1000.0;
1978 source
= "CircuitPriorityHalflifeMsec in consensus";
1980 halflife
= EWMA_DEFAULT_HALFLIFE
;
1981 source
= "Default value";
1984 if (halflife
<= EPSILON
) {
1985 /* The cell EWMA algorithm is disabled. */
1986 ewma_scale_factor
= 0.1;
1989 "Disabled cell_ewma algorithm because of value in %s",
1992 /* convert halflife into halflife-per-tick. */
1993 halflife
/= EWMA_TICK_LEN
;
1994 /* compute per-tick scale factor. */
1995 ewma_scale_factor
= exp( LOG_ONEHALF
/ halflife
);
1998 "Enabled cell_ewma algorithm because of value in %s; "
1999 "scale factor is %lf per %d seconds",
2000 source
, ewma_scale_factor
, EWMA_TICK_LEN
);
2004 /** Return the multiplier necessary to convert the value of a cell sent in
2005 * 'from_tick' to one sent in 'to_tick'. */
2006 static INLINE
double
2007 get_scale_factor(unsigned from_tick
, unsigned to_tick
)
2009 /* This math can wrap around, but that's okay: unsigned overflow is
2011 int diff
= (int)(to_tick
- from_tick
);
2012 return pow(ewma_scale_factor
, diff
);
2015 /** Adjust the cell count of <b>ewma</b> so that it is scaled with respect to
2016 * <b>cur_tick</b> */
2018 scale_single_cell_ewma(cell_ewma_t
*ewma
, unsigned cur_tick
)
2020 double factor
= get_scale_factor(ewma
->last_adjusted_tick
, cur_tick
);
2021 ewma
->cell_count
*= factor
;
2022 ewma
->last_adjusted_tick
= cur_tick
;
2025 /** Adjust the cell count of every active circuit on <b>conn</b> so
2026 * that they are scaled with respect to <b>cur_tick</b> */
2028 scale_active_circuits(or_connection_t
*conn
, unsigned cur_tick
)
2031 double factor
= get_scale_factor(
2032 conn
->active_circuit_pqueue_last_recalibrated
,
2034 /** Ordinarily it isn't okay to change the value of an element in a heap,
2035 * but it's okay here, since we are preserving the order. */
2036 SMARTLIST_FOREACH(conn
->active_circuit_pqueue
, cell_ewma_t
*, e
, {
2037 tor_assert(e
->last_adjusted_tick
==
2038 conn
->active_circuit_pqueue_last_recalibrated
);
2039 e
->cell_count
*= factor
;
2040 e
->last_adjusted_tick
= cur_tick
;
2042 conn
->active_circuit_pqueue_last_recalibrated
= cur_tick
;
2045 /** Rescale <b>ewma</b> to the same scale as <b>conn</b>, and add it to
2046 * <b>conn</b>'s priority queue of active circuits */
2048 add_cell_ewma_to_conn(or_connection_t
*conn
, cell_ewma_t
*ewma
)
2050 tor_assert(ewma
->heap_index
== -1);
2051 scale_single_cell_ewma(ewma
,
2052 conn
->active_circuit_pqueue_last_recalibrated
);
2054 smartlist_pqueue_add(conn
->active_circuit_pqueue
,
2055 compare_cell_ewma_counts
,
2056 STRUCT_OFFSET(cell_ewma_t
, heap_index
),
2060 /** Remove <b>ewma</b> from <b>conn</b>'s priority queue of active circuits */
2062 remove_cell_ewma_from_conn(or_connection_t
*conn
, cell_ewma_t
*ewma
)
2064 tor_assert(ewma
->heap_index
!= -1);
2065 smartlist_pqueue_remove(conn
->active_circuit_pqueue
,
2066 compare_cell_ewma_counts
,
2067 STRUCT_OFFSET(cell_ewma_t
, heap_index
),
2071 /** Remove and return the first cell_ewma_t from conn's priority queue of
2072 * active circuits. Requires that the priority queue is nonempty. */
2073 static cell_ewma_t
*
2074 pop_first_cell_ewma_from_conn(or_connection_t
*conn
)
2076 return smartlist_pqueue_pop(conn
->active_circuit_pqueue
,
2077 compare_cell_ewma_counts
,
2078 STRUCT_OFFSET(cell_ewma_t
, heap_index
));
2081 /** Add <b>circ</b> to the list of circuits with pending cells on
2082 * <b>conn</b>. No effect if <b>circ</b> is already linked. */
2084 make_circuit_active_on_conn(circuit_t
*circ
, or_connection_t
*conn
)
2086 circuit_t
**nextp
= next_circ_on_conn_p(circ
, conn
);
2087 circuit_t
**prevp
= prev_circ_on_conn_p(circ
, conn
);
2089 if (*nextp
&& *prevp
) {
2090 /* Already active. */
2094 assert_active_circuits_ok_paranoid(conn
);
2096 if (! conn
->active_circuits
) {
2097 conn
->active_circuits
= circ
;
2098 *prevp
= *nextp
= circ
;
2100 circuit_t
*head
= conn
->active_circuits
;
2101 circuit_t
*old_tail
= *prev_circ_on_conn_p(head
, conn
);
2102 *next_circ_on_conn_p(old_tail
, conn
) = circ
;
2104 *prev_circ_on_conn_p(head
, conn
) = circ
;
2108 if (circ
->n_conn
== conn
) {
2109 add_cell_ewma_to_conn(conn
, &circ
->n_cell_ewma
);
2111 or_circuit_t
*orcirc
= TO_OR_CIRCUIT(circ
);
2112 tor_assert(conn
== orcirc
->p_conn
);
2113 add_cell_ewma_to_conn(conn
, &orcirc
->p_cell_ewma
);
2116 assert_active_circuits_ok_paranoid(conn
);
2119 /** Remove <b>circ</b> from the list of circuits with pending cells on
2120 * <b>conn</b>. No effect if <b>circ</b> is already unlinked. */
2122 make_circuit_inactive_on_conn(circuit_t
*circ
, or_connection_t
*conn
)
2124 circuit_t
**nextp
= next_circ_on_conn_p(circ
, conn
);
2125 circuit_t
**prevp
= prev_circ_on_conn_p(circ
, conn
);
2126 circuit_t
*next
= *nextp
, *prev
= *prevp
;
2128 if (!next
&& !prev
) {
2129 /* Already inactive. */
2133 assert_active_circuits_ok_paranoid(conn
);
2135 tor_assert(next
&& prev
);
2136 tor_assert(*prev_circ_on_conn_p(next
, conn
) == circ
);
2137 tor_assert(*next_circ_on_conn_p(prev
, conn
) == circ
);
2140 conn
->active_circuits
= NULL
;
2142 *prev_circ_on_conn_p(next
, conn
) = prev
;
2143 *next_circ_on_conn_p(prev
, conn
) = next
;
2144 if (conn
->active_circuits
== circ
)
2145 conn
->active_circuits
= next
;
2147 *prevp
= *nextp
= NULL
;
2149 if (circ
->n_conn
== conn
) {
2150 remove_cell_ewma_from_conn(conn
, &circ
->n_cell_ewma
);
2152 or_circuit_t
*orcirc
= TO_OR_CIRCUIT(circ
);
2153 tor_assert(conn
== orcirc
->p_conn
);
2154 remove_cell_ewma_from_conn(conn
, &orcirc
->p_cell_ewma
);
2157 assert_active_circuits_ok_paranoid(conn
);
2160 /** Remove all circuits from the list of circuits with pending cells on
2163 connection_or_unlink_all_active_circs(or_connection_t
*orconn
)
2165 circuit_t
*head
= orconn
->active_circuits
;
2166 circuit_t
*cur
= head
;
2170 circuit_t
*next
= *next_circ_on_conn_p(cur
, orconn
);
2171 *prev_circ_on_conn_p(cur
, orconn
) = NULL
;
2172 *next_circ_on_conn_p(cur
, orconn
) = NULL
;
2174 } while (cur
!= head
);
2175 orconn
->active_circuits
= NULL
;
2177 SMARTLIST_FOREACH(orconn
->active_circuit_pqueue
, cell_ewma_t
*, e
,
2178 e
->heap_index
= -1);
2179 smartlist_clear(orconn
->active_circuit_pqueue
);
2182 /** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
2183 * every edge connection that is using <b>circ</b> to write to <b>orconn</b>,
2184 * and start or stop reading as appropriate.
2186 * If <b>stream_id</b> is nonzero, block only the edge connection whose
2187 * stream_id matches it.
2189 * Returns the number of streams whose status we changed.
2192 set_streams_blocked_on_circ(circuit_t
*circ
, or_connection_t
*orconn
,
2193 int block
, streamid_t stream_id
)
2195 edge_connection_t
*edge
= NULL
;
2197 if (circ
->n_conn
== orconn
) {
2198 circ
->streams_blocked_on_n_conn
= block
;
2199 if (CIRCUIT_IS_ORIGIN(circ
))
2200 edge
= TO_ORIGIN_CIRCUIT(circ
)->p_streams
;
2202 circ
->streams_blocked_on_p_conn
= block
;
2203 tor_assert(!CIRCUIT_IS_ORIGIN(circ
));
2204 edge
= TO_OR_CIRCUIT(circ
)->n_streams
;
2207 for (; edge
; edge
= edge
->next_stream
) {
2208 connection_t
*conn
= TO_CONN(edge
);
2209 if (stream_id
&& edge
->stream_id
!= stream_id
)
2212 if (edge
->edge_blocked_on_circ
!= block
) {
2214 edge
->edge_blocked_on_circ
= block
;
2217 if (!conn
->read_event
) {
2218 /* This connection is a placeholder for something; probably a DNS
2219 * request. It can't actually stop or start reading.*/
2224 if (connection_is_reading(conn
))
2225 connection_stop_reading(conn
);
2227 /* Is this right? */
2228 if (!connection_is_reading(conn
))
2229 connection_start_reading(conn
);
2236 /** Pull as many cells as possible (but no more than <b>max</b>) from the
2237 * queue of the first active circuit on <b>conn</b>, and write them to
2238 * <b>conn</b>->outbuf. Return the number of cells written. Advance
2239 * the active circuit pointer to the next active circuit in the ring. */
2241 connection_or_flush_from_first_active_circuit(or_connection_t
*conn
, int max
,
2245 cell_queue_t
*queue
;
2247 int streams_blocked
;
2249 /* The current (hi-res) time */
2250 struct timeval now_hires
;
2252 /* The EWMA cell counter for the circuit we're flushing. */
2253 cell_ewma_t
*cell_ewma
= NULL
;
2254 double ewma_increment
= -1;
2256 circ
= conn
->active_circuits
;
2257 if (!circ
) return 0;
2258 assert_active_circuits_ok_paranoid(conn
);
2260 /* See if we're doing the ewma circuit selection algorithm. */
2263 double fractional_tick
;
2264 tor_gettimeofday_cached(&now_hires
);
2265 tick
= cell_ewma_tick_from_timeval(&now_hires
, &fractional_tick
);
2267 if (tick
!= conn
->active_circuit_pqueue_last_recalibrated
) {
2268 scale_active_circuits(conn
, tick
);
2271 ewma_increment
= pow(ewma_scale_factor
, -fractional_tick
);
2273 cell_ewma
= smartlist_get(conn
->active_circuit_pqueue
, 0);
2274 circ
= cell_ewma_to_circuit(cell_ewma
);
2277 if (circ
->n_conn
== conn
) {
2278 queue
= &circ
->n_conn_cells
;
2279 streams_blocked
= circ
->streams_blocked_on_n_conn
;
2281 queue
= &TO_OR_CIRCUIT(circ
)->p_conn_cells
;
2282 streams_blocked
= circ
->streams_blocked_on_p_conn
;
2284 tor_assert(*next_circ_on_conn_p(circ
,conn
));
2286 for (n_flushed
= 0; n_flushed
< max
&& queue
->head
; ) {
2287 packed_cell_t
*cell
= cell_queue_pop(queue
);
2288 tor_assert(*next_circ_on_conn_p(circ
,conn
));
2290 /* Calculate the exact time that this cell has spent in the queue. */
2291 if (get_options()->CellStatistics
&& !CIRCUIT_IS_ORIGIN(circ
)) {
2294 uint32_t cell_waiting_time
;
2295 insertion_time_queue_t
*it_queue
= queue
->insertion_times
;
2296 tor_gettimeofday_cached(&now
);
2297 flushed
= (uint32_t)((now
.tv_sec
% SECONDS_IN_A_DAY
) * 100L +
2298 (uint32_t)now
.tv_usec
/ (uint32_t)10000L);
2299 if (!it_queue
|| !it_queue
->first
) {
2300 log_info(LD_GENERAL
, "Cannot determine insertion time of cell. "
2301 "Looks like the CellStatistics option was "
2302 "recently enabled.");
2304 or_circuit_t
*orcirc
= TO_OR_CIRCUIT(circ
);
2305 insertion_time_elem_t
*elem
= it_queue
->first
;
2307 (uint32_t)((flushed
* 10L + SECONDS_IN_A_DAY
* 1000L -
2308 elem
->insertion_time
* 10L) %
2309 (SECONDS_IN_A_DAY
* 1000L));
2310 #undef SECONDS_IN_A_DAY
2312 if (elem
->counter
< 1) {
2313 it_queue
->first
= elem
->next
;
2314 if (elem
== it_queue
->last
)
2315 it_queue
->last
= NULL
;
2316 mp_pool_release(elem
);
2318 orcirc
->total_cell_waiting_time
+= cell_waiting_time
;
2319 orcirc
->processed_cells
++;
2323 /* If we just flushed our queue and this circuit is used for a
2324 * tunneled directory request, possibly advance its state. */
2325 if (queue
->n
== 0 && TO_CONN(conn
)->dirreq_id
)
2326 geoip_change_dirreq_state(TO_CONN(conn
)->dirreq_id
,
2328 DIRREQ_CIRC_QUEUE_FLUSHED
);
2330 connection_write_to_buf(cell
->body
, CELL_NETWORK_SIZE
, TO_CONN(conn
));
2332 packed_cell_free_unchecked(cell
);
2336 cell_ewma
->cell_count
+= ewma_increment
;
2337 /* We pop and re-add the cell_ewma_t here, not above, since we need to
2338 * re-add it immediately to keep the priority queue consistent with
2339 * the linked-list implementation */
2340 tmp
= pop_first_cell_ewma_from_conn(conn
);
2341 tor_assert(tmp
== cell_ewma
);
2342 add_cell_ewma_to_conn(conn
, cell_ewma
);
2344 if (circ
!= conn
->active_circuits
) {
2345 /* If this happens, the current circuit just got made inactive by
2346 * a call in connection_write_to_buf(). That's nothing to worry about:
2347 * circuit_make_inactive_on_conn() already advanced conn->active_circuits
2350 assert_active_circuits_ok_paranoid(conn
);
2354 tor_assert(*next_circ_on_conn_p(circ
,conn
));
2355 assert_active_circuits_ok_paranoid(conn
);
2356 conn
->active_circuits
= *next_circ_on_conn_p(circ
, conn
);
2358 /* Is the cell queue low enough to unblock all the streams that are waiting
2359 * to write to this circuit? */
2360 if (streams_blocked
&& queue
->n
<= CELL_QUEUE_LOWWATER_SIZE
)
2361 set_streams_blocked_on_circ(circ
, conn
, 0, 0); /* unblock streams */
2363 /* Did we just run out of cells on this circuit's queue? */
2364 if (queue
->n
== 0) {
2365 log_debug(LD_GENERAL
, "Made a circuit inactive.");
2366 make_circuit_inactive_on_conn(circ
, conn
);
2370 conn
->timestamp_last_added_nonpadding
= now
;
2374 /** Add <b>cell</b> to the queue of <b>circ</b> writing to <b>orconn</b>
2375 * transmitting in <b>direction</b>. */
2377 append_cell_to_circuit_queue(circuit_t
*circ
, or_connection_t
*orconn
,
2378 cell_t
*cell
, cell_direction_t direction
,
2379 streamid_t fromstream
)
2381 cell_queue_t
*queue
;
2382 int streams_blocked
;
2383 if (circ
->marked_for_close
)
2386 if (direction
== CELL_DIRECTION_OUT
) {
2387 queue
= &circ
->n_conn_cells
;
2388 streams_blocked
= circ
->streams_blocked_on_n_conn
;
2390 or_circuit_t
*orcirc
= TO_OR_CIRCUIT(circ
);
2391 queue
= &orcirc
->p_conn_cells
;
2392 streams_blocked
= circ
->streams_blocked_on_p_conn
;
2394 if (cell
->command
== CELL_RELAY_EARLY
&& orconn
->link_proto
< 2) {
2395 /* V1 connections don't understand RELAY_EARLY. */
2396 cell
->command
= CELL_RELAY
;
2399 cell_queue_append_packed_copy(queue
, cell
);
2401 /* If we have too many cells on the circuit, we should stop reading from
2402 * the edge streams for a while. */
2403 if (!streams_blocked
&& queue
->n
>= CELL_QUEUE_HIGHWATER_SIZE
)
2404 set_streams_blocked_on_circ(circ
, orconn
, 1, 0); /* block streams */
2406 if (streams_blocked
&& fromstream
) {
2407 /* This edge connection is apparently not blocked; block it. */
2408 set_streams_blocked_on_circ(circ
, orconn
, 1, fromstream
);
2411 if (queue
->n
== 1) {
2412 /* This was the first cell added to the queue. We need to make this
2413 * circuit active. */
2414 log_debug(LD_GENERAL
, "Made a circuit active.");
2415 make_circuit_active_on_conn(circ
, orconn
);
2418 if (! buf_datalen(orconn
->_base
.outbuf
)) {
2419 /* There is no data at all waiting to be sent on the outbuf. Add a
2420 * cell, so that we can notice when it gets flushed, flushed_some can
2421 * get called, and we can start putting more data onto the buffer then.
2423 log_debug(LD_GENERAL
, "Primed a buffer.");
2424 connection_or_flush_from_first_active_circuit(orconn
, 1, approx_time());
2428 /** Append an encoded value of <b>addr</b> to <b>payload_out</b>, which must
2429 * have at least 18 bytes of free space. The encoding is, as specified in
2431 * RESOLVED_TYPE_IPV4 or RESOLVED_TYPE_IPV6 [1 byte]
2433 * ADDRESS [length bytes]
2434 * Return the number of bytes added, or -1 on error */
2436 append_address_to_payload(char *payload_out
, const tor_addr_t
*addr
)
2439 switch (tor_addr_family(addr
)) {
2441 payload_out
[0] = RESOLVED_TYPE_IPV4
;
2443 a
= tor_addr_to_ipv4n(addr
);
2444 memcpy(payload_out
+2, &a
, 4);
2447 payload_out
[0] = RESOLVED_TYPE_IPV6
;
2448 payload_out
[1] = 16;
2449 memcpy(payload_out
+2, tor_addr_to_in6_addr8(addr
), 16);
2457 /** Given <b>payload_len</b> bytes at <b>payload</b>, starting with an address
2458 * encoded as by append_address_to_payload(), try to decode the address into
2459 * *<b>addr_out</b>. Return the next byte in the payload after the address on
2460 * success, or NULL on failure. */
2462 decode_address_from_payload(tor_addr_t
*addr_out
, const char *payload
,
2465 if (payload_len
< 2)
2467 if (payload_len
< 2+(uint8_t)payload
[1])
2470 switch (payload
[0]) {
2471 case RESOLVED_TYPE_IPV4
:
2472 if (payload
[1] != 4)
2474 tor_addr_from_ipv4n(addr_out
, get_uint32(payload
+2));
2476 case RESOLVED_TYPE_IPV6
:
2477 if (payload
[1] != 16)
2479 tor_addr_from_ipv6_bytes(addr_out
, payload
+2);
2482 tor_addr_make_unspec(addr_out
);
2485 return payload
+ 2 + (uint8_t)payload
[1];
2488 /** Remove all the cells queued on <b>circ</b> for <b>orconn</b>. */
2490 circuit_clear_cell_queue(circuit_t
*circ
, or_connection_t
*orconn
)
2492 cell_queue_t
*queue
;
2493 if (circ
->n_conn
== orconn
) {
2494 queue
= &circ
->n_conn_cells
;
2496 or_circuit_t
*orcirc
= TO_OR_CIRCUIT(circ
);
2497 tor_assert(orcirc
->p_conn
== orconn
);
2498 queue
= &orcirc
->p_conn_cells
;
2502 make_circuit_inactive_on_conn(circ
,orconn
);
2504 cell_queue_clear(queue
);
2507 /** Fail with an assert if the active circuits ring on <b>orconn</b> is
2510 assert_active_circuits_ok(or_connection_t
*orconn
)
2512 circuit_t
*head
= orconn
->active_circuits
;
2513 circuit_t
*cur
= head
;
2518 circuit_t
*next
= *next_circ_on_conn_p(cur
, orconn
);
2519 circuit_t
*prev
= *prev_circ_on_conn_p(cur
, orconn
);
2523 tor_assert(*next_circ_on_conn_p(prev
, orconn
) == cur
);
2524 tor_assert(*prev_circ_on_conn_p(next
, orconn
) == cur
);
2525 if (orconn
== cur
->n_conn
) {
2526 ewma
= &cur
->n_cell_ewma
;
2527 tor_assert(!ewma
->is_for_p_conn
);
2529 ewma
= &TO_OR_CIRCUIT(cur
)->p_cell_ewma
;
2530 tor_assert(ewma
->is_for_p_conn
);
2532 tor_assert(ewma
->heap_index
!= -1);
2533 tor_assert(ewma
== smartlist_get(orconn
->active_circuit_pqueue
,
2537 } while (cur
!= head
);
2539 tor_assert(n
== smartlist_len(orconn
->active_circuit_pqueue
));
2542 /** Return 1 if we shouldn't restart reading on this circuit, even if
2543 * we get a SENDME. Else return 0.
2546 circuit_queue_streams_are_blocked(circuit_t
*circ
)
2548 if (CIRCUIT_IS_ORIGIN(circ
)) {
2549 return circ
->streams_blocked_on_n_conn
;
2551 return circ
->streams_blocked_on_p_conn
;