Fix behavior of adding a cell to a blocked queue.
[tor.git] / src / or / relay.c
blob794f448523085f1a26b4524d426a2aacaf5ace94
1 /* Copyright (c) 2001 Matej Pfajfar.
2 * Copyright (c) 2001-2004, Roger Dingledine.
3 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
4 * Copyright (c) 2007-2010, The Tor Project, Inc. */
5 /* See LICENSE for licensing information */
7 /**
8 * \file relay.c
9 * \brief Handle relay cell encryption/decryption, plus packaging and
10 * receiving from circuits, plus queuing on circuits.
11 **/
13 #include <math.h>
14 #include "or.h"
15 #include "buffers.h"
16 #include "circuitbuild.h"
17 #include "circuitlist.h"
18 #include "config.h"
19 #include "connection.h"
20 #include "connection_edge.h"
21 #include "connection_or.h"
22 #include "control.h"
23 #include "geoip.h"
24 #include "main.h"
25 #include "mempool.h"
26 #include "networkstatus.h"
27 #include "policies.h"
28 #include "reasons.h"
29 #include "relay.h"
30 #include "rendcommon.h"
31 #include "routerlist.h"
32 #include "routerparse.h"
34 static int relay_crypt(circuit_t *circ, cell_t *cell,
35 cell_direction_t cell_direction,
36 crypt_path_t **layer_hint, char *recognized);
37 static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
38 cell_direction_t cell_direction,
39 crypt_path_t *layer_hint);
41 static int
42 connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
43 edge_connection_t *conn,
44 crypt_path_t *layer_hint);
45 static void
46 circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint);
47 static void
48 circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint);
49 static int
50 circuit_resume_edge_reading_helper(edge_connection_t *conn,
51 circuit_t *circ,
52 crypt_path_t *layer_hint);
53 static int
54 circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint);
55 static int circuit_queue_streams_are_blocked(circuit_t *circ);
57 /** Cache the current hi-res time; the cache gets reset when libevent
58 * calls us. */
60 static struct timeval cached_time_hires = {0, 0};
62 static void
63 tor_gettimeofday_cached(struct timeval *tv)
65 if (cached_time_hires.tv_sec == 0) {
66 tor_gettimeofday(&cached_time_hires);
68 *tv = cached_time_hires;
71 void
72 tor_gettimeofday_cache_clear(void)
74 cached_time_hires.tv_sec = 0;
77 /** Stats: how many relay cells have originated at this hop, or have
78 * been relayed onward (not recognized at this hop)?
80 uint64_t stats_n_relay_cells_relayed = 0;
81 /** Stats: how many relay cells have been delivered to streams at this
82 * hop?
84 uint64_t stats_n_relay_cells_delivered = 0;
86 /** Update digest from the payload of cell. Assign integrity part to
87 * cell.
89 static void
90 relay_set_digest(crypto_digest_env_t *digest, cell_t *cell)
92 char integrity[4];
93 relay_header_t rh;
95 crypto_digest_add_bytes(digest, cell->payload, CELL_PAYLOAD_SIZE);
96 crypto_digest_get_digest(digest, integrity, 4);
97 // log_fn(LOG_DEBUG,"Putting digest of %u %u %u %u into relay cell.",
98 // integrity[0], integrity[1], integrity[2], integrity[3]);
99 relay_header_unpack(&rh, cell->payload);
100 memcpy(rh.integrity, integrity, 4);
101 relay_header_pack(cell->payload, &rh);
104 /** Does the digest for this circuit indicate that this cell is for us?
106 * Update digest from the payload of cell (with the integrity part set
107 * to 0). If the integrity part is valid, return 1, else restore digest
108 * and cell to their original state and return 0.
110 static int
111 relay_digest_matches(crypto_digest_env_t *digest, cell_t *cell)
113 char received_integrity[4], calculated_integrity[4];
114 relay_header_t rh;
115 crypto_digest_env_t *backup_digest=NULL;
117 backup_digest = crypto_digest_dup(digest);
119 relay_header_unpack(&rh, cell->payload);
120 memcpy(received_integrity, rh.integrity, 4);
121 memset(rh.integrity, 0, 4);
122 relay_header_pack(cell->payload, &rh);
124 // log_fn(LOG_DEBUG,"Reading digest of %u %u %u %u from relay cell.",
125 // received_integrity[0], received_integrity[1],
126 // received_integrity[2], received_integrity[3]);
128 crypto_digest_add_bytes(digest, cell->payload, CELL_PAYLOAD_SIZE);
129 crypto_digest_get_digest(digest, calculated_integrity, 4);
131 if (memcmp(received_integrity, calculated_integrity, 4)) {
132 // log_fn(LOG_INFO,"Recognized=0 but bad digest. Not recognizing.");
133 // (%d vs %d).", received_integrity, calculated_integrity);
134 /* restore digest to its old form */
135 crypto_digest_assign(digest, backup_digest);
136 /* restore the relay header */
137 memcpy(rh.integrity, received_integrity, 4);
138 relay_header_pack(cell->payload, &rh);
139 crypto_free_digest_env(backup_digest);
140 return 0;
142 crypto_free_digest_env(backup_digest);
143 return 1;
146 /** Apply <b>cipher</b> to CELL_PAYLOAD_SIZE bytes of <b>in</b>
147 * (in place).
149 * If <b>encrypt_mode</b> is 1 then encrypt, else decrypt.
151 * Return -1 if the crypto fails, else return 0.
153 static int
154 relay_crypt_one_payload(crypto_cipher_env_t *cipher, char *in,
155 int encrypt_mode)
157 int r;
158 (void)encrypt_mode;
159 r = crypto_cipher_crypt_inplace(cipher, in, CELL_PAYLOAD_SIZE);
161 if (r) {
162 log_warn(LD_BUG,"Error during relay encryption");
163 return -1;
165 return 0;
168 /** Receive a relay cell:
169 * - Crypt it (encrypt if headed toward the origin or if we <b>are</b> the
170 * origin; decrypt if we're headed toward the exit).
171 * - Check if recognized (if exitward).
172 * - If recognized and the digest checks out, then find if there's a stream
173 * that the cell is intended for, and deliver it to the right
174 * connection_edge.
175 * - If not recognized, then we need to relay it: append it to the appropriate
176 * cell_queue on <b>circ</b>.
178 * Return -<b>reason</b> on failure.
181 circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
182 cell_direction_t cell_direction)
184 or_connection_t *or_conn=NULL;
185 crypt_path_t *layer_hint=NULL;
186 char recognized=0;
187 int reason;
189 tor_assert(cell);
190 tor_assert(circ);
191 tor_assert(cell_direction == CELL_DIRECTION_OUT ||
192 cell_direction == CELL_DIRECTION_IN);
193 if (circ->marked_for_close)
194 return 0;
196 if (relay_crypt(circ, cell, cell_direction, &layer_hint, &recognized) < 0) {
197 log_warn(LD_BUG,"relay crypt failed. Dropping connection.");
198 return -END_CIRC_REASON_INTERNAL;
201 if (recognized) {
202 edge_connection_t *conn = relay_lookup_conn(circ, cell, cell_direction,
203 layer_hint);
204 if (cell_direction == CELL_DIRECTION_OUT) {
205 ++stats_n_relay_cells_delivered;
206 log_debug(LD_OR,"Sending away from origin.");
207 if ((reason=connection_edge_process_relay_cell(cell, circ, conn, NULL))
208 < 0) {
209 log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
210 "connection_edge_process_relay_cell (away from origin) "
211 "failed.");
212 return reason;
215 if (cell_direction == CELL_DIRECTION_IN) {
216 ++stats_n_relay_cells_delivered;
217 log_debug(LD_OR,"Sending to origin.");
218 if ((reason = connection_edge_process_relay_cell(cell, circ, conn,
219 layer_hint)) < 0) {
220 log_warn(LD_OR,
221 "connection_edge_process_relay_cell (at origin) failed.");
222 return reason;
225 return 0;
228 /* not recognized. pass it on. */
229 if (cell_direction == CELL_DIRECTION_OUT) {
230 cell->circ_id = circ->n_circ_id; /* switch it */
231 or_conn = circ->n_conn;
232 } else if (! CIRCUIT_IS_ORIGIN(circ)) {
233 cell->circ_id = TO_OR_CIRCUIT(circ)->p_circ_id; /* switch it */
234 or_conn = TO_OR_CIRCUIT(circ)->p_conn;
235 } else {
236 log_fn(LOG_PROTOCOL_WARN, LD_OR,
237 "Dropping unrecognized inbound cell on origin circuit.");
238 return 0;
241 if (!or_conn) {
242 // XXXX Can this splice stuff be done more cleanly?
243 if (! CIRCUIT_IS_ORIGIN(circ) &&
244 TO_OR_CIRCUIT(circ)->rend_splice &&
245 cell_direction == CELL_DIRECTION_OUT) {
246 or_circuit_t *splice = TO_OR_CIRCUIT(circ)->rend_splice;
247 tor_assert(circ->purpose == CIRCUIT_PURPOSE_REND_ESTABLISHED);
248 tor_assert(splice->_base.purpose == CIRCUIT_PURPOSE_REND_ESTABLISHED);
249 cell->circ_id = splice->p_circ_id;
250 cell->command = CELL_RELAY; /* can't be relay_early anyway */
251 if ((reason = circuit_receive_relay_cell(cell, TO_CIRCUIT(splice),
252 CELL_DIRECTION_IN)) < 0) {
253 log_warn(LD_REND, "Error relaying cell across rendezvous; closing "
254 "circuits");
255 /* XXXX Do this here, or just return -1? */
256 circuit_mark_for_close(circ, -reason);
257 return reason;
259 return 0;
261 log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
262 "Didn't recognize cell, but circ stops here! Closing circ.");
263 return -END_CIRC_REASON_TORPROTOCOL;
266 log_debug(LD_OR,"Passing on unrecognized cell.");
268 ++stats_n_relay_cells_relayed; /* XXXX no longer quite accurate {cells}
269 * we might kill the circ before we relay
270 * the cells. */
272 append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0);
273 return 0;
276 /** Do the appropriate en/decryptions for <b>cell</b> arriving on
277 * <b>circ</b> in direction <b>cell_direction</b>.
279 * If cell_direction == CELL_DIRECTION_IN:
280 * - If we're at the origin (we're the OP), for hops 1..N,
281 * decrypt cell. If recognized, stop.
282 * - Else (we're not the OP), encrypt one hop. Cell is not recognized.
284 * If cell_direction == CELL_DIRECTION_OUT:
285 * - decrypt one hop. Check if recognized.
287 * If cell is recognized, set *recognized to 1, and set
288 * *layer_hint to the hop that recognized it.
290 * Return -1 to indicate that we should mark the circuit for close,
291 * else return 0.
293 static int
294 relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction,
295 crypt_path_t **layer_hint, char *recognized)
297 relay_header_t rh;
299 tor_assert(circ);
300 tor_assert(cell);
301 tor_assert(recognized);
302 tor_assert(cell_direction == CELL_DIRECTION_IN ||
303 cell_direction == CELL_DIRECTION_OUT);
305 if (cell_direction == CELL_DIRECTION_IN) {
306 if (CIRCUIT_IS_ORIGIN(circ)) { /* We're at the beginning of the circuit.
307 * We'll want to do layered decrypts. */
308 crypt_path_t *thishop, *cpath = TO_ORIGIN_CIRCUIT(circ)->cpath;
309 thishop = cpath;
310 if (thishop->state != CPATH_STATE_OPEN) {
311 log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
312 "Relay cell before first created cell? Closing.");
313 return -1;
315 do { /* Remember: cpath is in forward order, that is, first hop first. */
316 tor_assert(thishop);
318 if (relay_crypt_one_payload(thishop->b_crypto, cell->payload, 0) < 0)
319 return -1;
321 relay_header_unpack(&rh, cell->payload);
322 if (rh.recognized == 0) {
323 /* it's possibly recognized. have to check digest to be sure. */
324 if (relay_digest_matches(thishop->b_digest, cell)) {
325 *recognized = 1;
326 *layer_hint = thishop;
327 return 0;
331 thishop = thishop->next;
332 } while (thishop != cpath && thishop->state == CPATH_STATE_OPEN);
333 log_fn(LOG_PROTOCOL_WARN, LD_OR,
334 "Incoming cell at client not recognized. Closing.");
335 return -1;
336 } else { /* we're in the middle. Just one crypt. */
337 if (relay_crypt_one_payload(TO_OR_CIRCUIT(circ)->p_crypto,
338 cell->payload, 1) < 0)
339 return -1;
340 // log_fn(LOG_DEBUG,"Skipping recognized check, because we're not "
341 // "the client.");
343 } else /* cell_direction == CELL_DIRECTION_OUT */ {
344 /* we're in the middle. Just one crypt. */
346 if (relay_crypt_one_payload(TO_OR_CIRCUIT(circ)->n_crypto,
347 cell->payload, 0) < 0)
348 return -1;
350 relay_header_unpack(&rh, cell->payload);
351 if (rh.recognized == 0) {
352 /* it's possibly recognized. have to check digest to be sure. */
353 if (relay_digest_matches(TO_OR_CIRCUIT(circ)->n_digest, cell)) {
354 *recognized = 1;
355 return 0;
359 return 0;
362 /** Package a relay cell from an edge:
363 * - Encrypt it to the right layer
364 * - Append it to the appropriate cell_queue on <b>circ</b>.
366 static int
367 circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
368 cell_direction_t cell_direction,
369 crypt_path_t *layer_hint, uint16_t on_stream)
371 or_connection_t *conn; /* where to send the cell */
373 if (cell_direction == CELL_DIRECTION_OUT) {
374 crypt_path_t *thishop; /* counter for repeated crypts */
375 conn = circ->n_conn;
376 if (!CIRCUIT_IS_ORIGIN(circ) || !conn) {
377 log_warn(LD_BUG,"outgoing relay cell has n_conn==NULL. Dropping.");
378 return 0; /* just drop it */
381 relay_set_digest(layer_hint->f_digest, cell);
383 thishop = layer_hint;
384 /* moving from farthest to nearest hop */
385 do {
386 tor_assert(thishop);
387 /* XXXX RD This is a bug, right? */
388 log_debug(LD_OR,"crypting a layer of the relay cell.");
389 if (relay_crypt_one_payload(thishop->f_crypto, cell->payload, 1) < 0) {
390 return -1;
393 thishop = thishop->prev;
394 } while (thishop != TO_ORIGIN_CIRCUIT(circ)->cpath->prev);
396 } else { /* incoming cell */
397 or_circuit_t *or_circ;
398 if (CIRCUIT_IS_ORIGIN(circ)) {
399 /* We should never package an _incoming_ cell from the circuit
400 * origin; that means we messed up somewhere. */
401 log_warn(LD_BUG,"incoming relay cell at origin circuit. Dropping.");
402 assert_circuit_ok(circ);
403 return 0; /* just drop it */
405 or_circ = TO_OR_CIRCUIT(circ);
406 conn = or_circ->p_conn;
407 relay_set_digest(or_circ->p_digest, cell);
408 if (relay_crypt_one_payload(or_circ->p_crypto, cell->payload, 1) < 0)
409 return -1;
411 ++stats_n_relay_cells_relayed;
413 append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream);
414 return 0;
417 /** If cell's stream_id matches the stream_id of any conn that's
418 * attached to circ, return that conn, else return NULL.
420 static edge_connection_t *
421 relay_lookup_conn(circuit_t *circ, cell_t *cell,
422 cell_direction_t cell_direction, crypt_path_t *layer_hint)
424 edge_connection_t *tmpconn;
425 relay_header_t rh;
427 relay_header_unpack(&rh, cell->payload);
429 if (!rh.stream_id)
430 return NULL;
432 /* IN or OUT cells could have come from either direction, now
433 * that we allow rendezvous *to* an OP.
436 if (CIRCUIT_IS_ORIGIN(circ)) {
437 for (tmpconn = TO_ORIGIN_CIRCUIT(circ)->p_streams; tmpconn;
438 tmpconn=tmpconn->next_stream) {
439 if (rh.stream_id == tmpconn->stream_id &&
440 !tmpconn->_base.marked_for_close &&
441 tmpconn->cpath_layer == layer_hint) {
442 log_debug(LD_APP,"found conn for stream %d.", rh.stream_id);
443 return tmpconn;
446 } else {
447 for (tmpconn = TO_OR_CIRCUIT(circ)->n_streams; tmpconn;
448 tmpconn=tmpconn->next_stream) {
449 if (rh.stream_id == tmpconn->stream_id &&
450 !tmpconn->_base.marked_for_close) {
451 log_debug(LD_EXIT,"found conn for stream %d.", rh.stream_id);
452 if (cell_direction == CELL_DIRECTION_OUT ||
453 connection_edge_is_rendezvous_stream(tmpconn))
454 return tmpconn;
457 for (tmpconn = TO_OR_CIRCUIT(circ)->resolving_streams; tmpconn;
458 tmpconn=tmpconn->next_stream) {
459 if (rh.stream_id == tmpconn->stream_id &&
460 !tmpconn->_base.marked_for_close) {
461 log_debug(LD_EXIT,"found conn for stream %d.", rh.stream_id);
462 return tmpconn;
466 return NULL; /* probably a begin relay cell */
469 /** Pack the relay_header_t host-order structure <b>src</b> into
470 * network-order in the buffer <b>dest</b>. See tor-spec.txt for details
471 * about the wire format.
473 void
474 relay_header_pack(char *dest, const relay_header_t *src)
476 *(uint8_t*)(dest) = src->command;
478 set_uint16(dest+1, htons(src->recognized));
479 set_uint16(dest+3, htons(src->stream_id));
480 memcpy(dest+5, src->integrity, 4);
481 set_uint16(dest+9, htons(src->length));
484 /** Unpack the network-order buffer <b>src</b> into a host-order
485 * relay_header_t structure <b>dest</b>.
487 void
488 relay_header_unpack(relay_header_t *dest, const char *src)
490 dest->command = *(uint8_t*)(src);
492 dest->recognized = ntohs(get_uint16(src+1));
493 dest->stream_id = ntohs(get_uint16(src+3));
494 memcpy(dest->integrity, src+5, 4);
495 dest->length = ntohs(get_uint16(src+9));
498 /** Convert the relay <b>command</b> into a human-readable string. */
499 static const char *
500 relay_command_to_string(uint8_t command)
502 switch (command) {
503 case RELAY_COMMAND_BEGIN: return "BEGIN";
504 case RELAY_COMMAND_DATA: return "DATA";
505 case RELAY_COMMAND_END: return "END";
506 case RELAY_COMMAND_CONNECTED: return "CONNECTED";
507 case RELAY_COMMAND_SENDME: return "SENDME";
508 case RELAY_COMMAND_EXTEND: return "EXTEND";
509 case RELAY_COMMAND_EXTENDED: return "EXTENDED";
510 case RELAY_COMMAND_TRUNCATE: return "TRUNCATE";
511 case RELAY_COMMAND_TRUNCATED: return "TRUNCATED";
512 case RELAY_COMMAND_DROP: return "DROP";
513 case RELAY_COMMAND_RESOLVE: return "RESOLVE";
514 case RELAY_COMMAND_RESOLVED: return "RESOLVED";
515 case RELAY_COMMAND_BEGIN_DIR: return "BEGIN_DIR";
516 case RELAY_COMMAND_ESTABLISH_INTRO: return "ESTABLISH_INTRO";
517 case RELAY_COMMAND_ESTABLISH_RENDEZVOUS: return "ESTABLISH_RENDEZVOUS";
518 case RELAY_COMMAND_INTRODUCE1: return "INTRODUCE1";
519 case RELAY_COMMAND_INTRODUCE2: return "INTRODUCE2";
520 case RELAY_COMMAND_RENDEZVOUS1: return "RENDEZVOUS1";
521 case RELAY_COMMAND_RENDEZVOUS2: return "RENDEZVOUS2";
522 case RELAY_COMMAND_INTRO_ESTABLISHED: return "INTRO_ESTABLISHED";
523 case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED:
524 return "RENDEZVOUS_ESTABLISHED";
525 case RELAY_COMMAND_INTRODUCE_ACK: return "INTRODUCE_ACK";
526 default: return "(unrecognized)";
530 /** Make a relay cell out of <b>relay_command</b> and <b>payload</b>, and send
531 * it onto the open circuit <b>circ</b>. <b>stream_id</b> is the ID on
532 * <b>circ</b> for the stream that's sending the relay cell, or 0 if it's a
533 * control cell. <b>cpath_layer</b> is NULL for OR->OP cells, or the
534 * destination hop for OP->OR cells.
536 * If you can't send the cell, mark the circuit for close and return -1. Else
537 * return 0.
540 relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ,
541 uint8_t relay_command, const char *payload,
542 size_t payload_len, crypt_path_t *cpath_layer)
544 cell_t cell;
545 relay_header_t rh;
546 cell_direction_t cell_direction;
547 /* XXXX NM Split this function into a separate versions per circuit type? */
549 tor_assert(circ);
550 tor_assert(payload_len <= RELAY_PAYLOAD_SIZE);
552 memset(&cell, 0, sizeof(cell_t));
553 cell.command = CELL_RELAY;
554 if (cpath_layer) {
555 cell.circ_id = circ->n_circ_id;
556 cell_direction = CELL_DIRECTION_OUT;
557 } else if (! CIRCUIT_IS_ORIGIN(circ)) {
558 cell.circ_id = TO_OR_CIRCUIT(circ)->p_circ_id;
559 cell_direction = CELL_DIRECTION_IN;
560 } else {
561 return -1;
564 memset(&rh, 0, sizeof(rh));
565 rh.command = relay_command;
566 rh.stream_id = stream_id;
567 rh.length = payload_len;
568 relay_header_pack(cell.payload, &rh);
569 if (payload_len)
570 memcpy(cell.payload+RELAY_HEADER_SIZE, payload, payload_len);
572 log_debug(LD_OR,"delivering %d cell %s.", relay_command,
573 cell_direction == CELL_DIRECTION_OUT ? "forward" : "backward");
575 /* If we are sending an END cell and this circuit is used for a tunneled
576 * directory request, advance its state. */
577 if (relay_command == RELAY_COMMAND_END && circ->dirreq_id)
578 geoip_change_dirreq_state(circ->dirreq_id, DIRREQ_TUNNELED,
579 DIRREQ_END_CELL_SENT);
581 if (cell_direction == CELL_DIRECTION_OUT && circ->n_conn) {
582 /* if we're using relaybandwidthrate, this conn wants priority */
583 circ->n_conn->client_used = approx_time();
586 if (cell_direction == CELL_DIRECTION_OUT) {
587 origin_circuit_t *origin_circ = TO_ORIGIN_CIRCUIT(circ);
588 if (origin_circ->remaining_relay_early_cells > 0 &&
589 (relay_command == RELAY_COMMAND_EXTEND ||
590 (cpath_layer != origin_circ->cpath &&
591 !CIRCUIT_PURPOSE_IS_ESTABLISHED_REND(circ->purpose)))) {
592 /* If we've got any relay_early cells left, and we're sending
593 * an extend cell or (we're not talking to the first hop and we're
594 * not talking to a rendezvous circuit), use one of them.
595 * Don't worry about the conn protocol version:
596 * append_cell_to_circuit_queue will fix it up. */
597 /* XXX For now, clients don't use RELAY_EARLY cells when sending
598 * relay cells on rendezvous circuits. See bug 1038. Once no relays
599 * (and thus no rendezvous points) are running 0.2.1.3-alpha through
600 * 0.2.1.18, we can take out that exception. -RD */
601 cell.command = CELL_RELAY_EARLY;
602 --origin_circ->remaining_relay_early_cells;
603 log_debug(LD_OR, "Sending a RELAY_EARLY cell; %d remaining.",
604 (int)origin_circ->remaining_relay_early_cells);
605 /* Memorize the command that is sent as RELAY_EARLY cell; helps debug
606 * task 878. */
607 origin_circ->relay_early_commands[
608 origin_circ->relay_early_cells_sent++] = relay_command;
609 } else if (relay_command == RELAY_COMMAND_EXTEND) {
610 /* If no RELAY_EARLY cells can be sent over this circuit, log which
611 * commands have been sent as RELAY_EARLY cells before; helps debug
612 * task 878. */
613 smartlist_t *commands_list = smartlist_create();
614 int i = 0;
615 char *commands = NULL;
616 for (; i < origin_circ->relay_early_cells_sent; i++)
617 smartlist_add(commands_list, (char *)
618 relay_command_to_string(origin_circ->relay_early_commands[i]));
619 commands = smartlist_join_strings(commands_list, ",", 0, NULL);
620 log_warn(LD_BUG, "Uh-oh. We're sending a RELAY_COMMAND_EXTEND cell, "
621 "but we have run out of RELAY_EARLY cells on that circuit. "
622 "Commands sent before: %s", commands);
623 tor_free(commands);
624 smartlist_free(commands_list);
628 if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer, 0)
629 < 0) {
630 log_warn(LD_BUG,"circuit_package_relay_cell failed. Closing.");
631 circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
632 return -1;
634 return 0;
637 /** Make a relay cell out of <b>relay_command</b> and <b>payload</b>, and
638 * send it onto the open circuit <b>circ</b>. <b>fromconn</b> is the stream
639 * that's sending the relay cell, or NULL if it's a control cell.
640 * <b>cpath_layer</b> is NULL for OR->OP cells, or the destination hop
641 * for OP->OR cells.
643 * If you can't send the cell, mark the circuit for close and
644 * return -1. Else return 0.
647 connection_edge_send_command(edge_connection_t *fromconn,
648 uint8_t relay_command, const char *payload,
649 size_t payload_len)
651 /* XXXX NM Split this function into a separate versions per circuit type? */
652 circuit_t *circ;
653 tor_assert(fromconn);
654 circ = fromconn->on_circuit;
656 if (fromconn->_base.marked_for_close) {
657 log_warn(LD_BUG,
658 "called on conn that's already marked for close at %s:%d.",
659 fromconn->_base.marked_for_close_file,
660 fromconn->_base.marked_for_close);
661 return 0;
664 if (!circ) {
665 if (fromconn->_base.type == CONN_TYPE_AP) {
666 log_info(LD_APP,"no circ. Closing conn.");
667 connection_mark_unattached_ap(fromconn, END_STREAM_REASON_INTERNAL);
668 } else {
669 log_info(LD_EXIT,"no circ. Closing conn.");
670 fromconn->edge_has_sent_end = 1; /* no circ to send to */
671 fromconn->end_reason = END_STREAM_REASON_INTERNAL;
672 connection_mark_for_close(TO_CONN(fromconn));
674 return -1;
677 return relay_send_command_from_edge(fromconn->stream_id, circ,
678 relay_command, payload,
679 payload_len, fromconn->cpath_layer);
682 /** How many times will I retry a stream that fails due to DNS
683 * resolve failure or misc error?
685 #define MAX_RESOLVE_FAILURES 3
687 /** Return 1 if reason is something that you should retry if you
688 * get the end cell before you've connected; else return 0. */
689 static int
690 edge_reason_is_retriable(int reason)
692 return reason == END_STREAM_REASON_HIBERNATING ||
693 reason == END_STREAM_REASON_RESOURCELIMIT ||
694 reason == END_STREAM_REASON_EXITPOLICY ||
695 reason == END_STREAM_REASON_RESOLVEFAILED ||
696 reason == END_STREAM_REASON_MISC ||
697 reason == END_STREAM_REASON_NOROUTE;
700 /** Called when we receive an END cell on a stream that isn't open yet,
701 * from the client side.
702 * Arguments are as for connection_edge_process_relay_cell().
704 static int
705 connection_ap_process_end_not_open(
706 relay_header_t *rh, cell_t *cell, origin_circuit_t *circ,
707 edge_connection_t *conn, crypt_path_t *layer_hint)
709 struct in_addr in;
710 routerinfo_t *exitrouter;
711 int reason = *(cell->payload+RELAY_HEADER_SIZE);
712 int control_reason = reason | END_STREAM_REASON_FLAG_REMOTE;
713 (void) layer_hint; /* unused */
715 if (rh->length > 0 && edge_reason_is_retriable(reason) &&
716 !connection_edge_is_rendezvous_stream(conn) /* avoid retry if rend */
718 log_info(LD_APP,"Address '%s' refused due to '%s'. Considering retrying.",
719 safe_str(conn->socks_request->address),
720 stream_end_reason_to_string(reason));
721 exitrouter =
722 router_get_by_digest(circ->build_state->chosen_exit->identity_digest);
723 switch (reason) {
724 case END_STREAM_REASON_EXITPOLICY:
725 if (rh->length >= 5) {
726 uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+1));
727 int ttl;
728 if (!addr) {
729 log_info(LD_APP,"Address '%s' resolved to 0.0.0.0. Closing,",
730 safe_str(conn->socks_request->address));
731 connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
732 return 0;
734 if (rh->length >= 9)
735 ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+5));
736 else
737 ttl = -1;
739 if (get_options()->ClientDNSRejectInternalAddresses &&
740 is_internal_IP(addr, 0)) {
741 log_info(LD_APP,"Address '%s' resolved to internal. Closing,",
742 safe_str(conn->socks_request->address));
743 connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
744 return 0;
746 client_dns_set_addressmap(conn->socks_request->address, addr,
747 conn->chosen_exit_name, ttl);
749 /* check if he *ought* to have allowed it */
750 if (exitrouter &&
751 (rh->length < 5 ||
752 (tor_inet_aton(conn->socks_request->address, &in) &&
753 !conn->chosen_exit_name))) {
754 log_info(LD_APP,
755 "Exitrouter '%s' seems to be more restrictive than its exit "
756 "policy. Not using this router as exit for now.",
757 exitrouter->nickname);
758 policies_set_router_exitpolicy_to_reject_all(exitrouter);
760 /* rewrite it to an IP if we learned one. */
761 if (addressmap_rewrite(conn->socks_request->address,
762 sizeof(conn->socks_request->address),
763 NULL)) {
764 control_event_stream_status(conn, STREAM_EVENT_REMAP, 0);
766 if (conn->chosen_exit_optional ||
767 conn->chosen_exit_retries) {
768 /* stop wanting a specific exit */
769 conn->chosen_exit_optional = 0;
770 /* A non-zero chosen_exit_retries can happen if we set a
771 * TrackHostExits for this address under a port that the exit
772 * relay allows, but then try the same address with a different
773 * port that it doesn't allow to exit. We shouldn't unregister
774 * the mapping, since it is probably still wanted on the
775 * original port. But now we give away to the exit relay that
776 * we probably have a TrackHostExits on it. So be it. */
777 conn->chosen_exit_retries = 0;
778 tor_free(conn->chosen_exit_name); /* clears it */
780 if (connection_ap_detach_retriable(conn, circ, control_reason) >= 0)
781 return 0;
782 /* else, conn will get closed below */
783 break;
784 case END_STREAM_REASON_CONNECTREFUSED:
785 if (!conn->chosen_exit_optional)
786 break; /* break means it'll close, below */
787 /* Else fall through: expire this circuit, clear the
788 * chosen_exit_name field, and try again. */
789 case END_STREAM_REASON_RESOLVEFAILED:
790 case END_STREAM_REASON_TIMEOUT:
791 case END_STREAM_REASON_MISC:
792 case END_STREAM_REASON_NOROUTE:
793 if (client_dns_incr_failures(conn->socks_request->address)
794 < MAX_RESOLVE_FAILURES) {
795 /* We haven't retried too many times; reattach the connection. */
796 circuit_log_path(LOG_INFO,LD_APP,circ);
797 tor_assert(circ->_base.timestamp_dirty);
798 circ->_base.timestamp_dirty -= get_options()->MaxCircuitDirtiness;
800 if (conn->chosen_exit_optional) {
801 /* stop wanting a specific exit */
802 conn->chosen_exit_optional = 0;
803 tor_free(conn->chosen_exit_name); /* clears it */
805 if (connection_ap_detach_retriable(conn, circ, control_reason) >= 0)
806 return 0;
807 /* else, conn will get closed below */
808 } else {
809 log_notice(LD_APP,
810 "Have tried resolving or connecting to address '%s' "
811 "at %d different places. Giving up.",
812 safe_str(conn->socks_request->address),
813 MAX_RESOLVE_FAILURES);
814 /* clear the failures, so it will have a full try next time */
815 client_dns_clear_failures(conn->socks_request->address);
817 break;
818 case END_STREAM_REASON_HIBERNATING:
819 case END_STREAM_REASON_RESOURCELIMIT:
820 if (exitrouter) {
821 policies_set_router_exitpolicy_to_reject_all(exitrouter);
823 if (conn->chosen_exit_optional) {
824 /* stop wanting a specific exit */
825 conn->chosen_exit_optional = 0;
826 tor_free(conn->chosen_exit_name); /* clears it */
828 if (connection_ap_detach_retriable(conn, circ, control_reason) >= 0)
829 return 0;
830 /* else, will close below */
831 break;
832 } /* end switch */
833 log_info(LD_APP,"Giving up on retrying; conn can't be handled.");
836 log_info(LD_APP,
837 "Edge got end (%s) before we're connected. Marking for close.",
838 stream_end_reason_to_string(rh->length > 0 ? reason : -1));
839 circuit_log_path(LOG_INFO,LD_APP,circ);
840 /* need to test because of detach_retriable */
841 if (!conn->_base.marked_for_close)
842 connection_mark_unattached_ap(conn, control_reason);
843 return 0;
846 /** Helper: change the socks_request-&gt;address field on conn to the
847 * dotted-quad representation of <b>new_addr</b> (given in host order),
848 * and send an appropriate REMAP event. */
849 static void
850 remap_event_helper(edge_connection_t *conn, uint32_t new_addr)
852 struct in_addr in;
854 in.s_addr = htonl(new_addr);
855 tor_inet_ntoa(&in, conn->socks_request->address,
856 sizeof(conn->socks_request->address));
857 control_event_stream_status(conn, STREAM_EVENT_REMAP,
858 REMAP_STREAM_SOURCE_EXIT);
861 /** An incoming relay cell has arrived from circuit <b>circ</b> to
862 * stream <b>conn</b>.
864 * The arguments here are the same as in
865 * connection_edge_process_relay_cell() below; this function is called
866 * from there when <b>conn</b> is defined and not in an open state.
868 static int
869 connection_edge_process_relay_cell_not_open(
870 relay_header_t *rh, cell_t *cell, circuit_t *circ,
871 edge_connection_t *conn, crypt_path_t *layer_hint)
873 if (rh->command == RELAY_COMMAND_END) {
874 if (CIRCUIT_IS_ORIGIN(circ) && conn->_base.type == CONN_TYPE_AP) {
875 return connection_ap_process_end_not_open(rh, cell,
876 TO_ORIGIN_CIRCUIT(circ), conn,
877 layer_hint);
878 } else {
879 /* we just got an 'end', don't need to send one */
880 conn->edge_has_sent_end = 1;
881 conn->end_reason = *(cell->payload+RELAY_HEADER_SIZE) |
882 END_STREAM_REASON_FLAG_REMOTE;
883 connection_mark_for_close(TO_CONN(conn));
884 return 0;
888 if (conn->_base.type == CONN_TYPE_AP &&
889 rh->command == RELAY_COMMAND_CONNECTED) {
890 tor_assert(CIRCUIT_IS_ORIGIN(circ));
891 if (conn->_base.state != AP_CONN_STATE_CONNECT_WAIT) {
892 log_fn(LOG_PROTOCOL_WARN, LD_APP,
893 "Got 'connected' while not in state connect_wait. Dropping.");
894 return 0;
896 conn->_base.state = AP_CONN_STATE_OPEN;
897 log_info(LD_APP,"'connected' received after %d seconds.",
898 (int)(time(NULL) - conn->_base.timestamp_lastread));
899 if (rh->length >= 4) {
900 uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE));
901 int ttl;
902 if (!addr || (get_options()->ClientDNSRejectInternalAddresses &&
903 is_internal_IP(addr, 0))) {
904 char buf[INET_NTOA_BUF_LEN];
905 struct in_addr a;
906 a.s_addr = htonl(addr);
907 tor_inet_ntoa(&a, buf, sizeof(buf));
908 log_info(LD_APP,
909 "...but it claims the IP address was %s. Closing.", buf);
910 connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
911 connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
912 return 0;
914 if (rh->length >= 8)
915 ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+4));
916 else
917 ttl = -1;
918 client_dns_set_addressmap(conn->socks_request->address, addr,
919 conn->chosen_exit_name, ttl);
921 remap_event_helper(conn, addr);
923 circuit_log_path(LOG_INFO,LD_APP,TO_ORIGIN_CIRCUIT(circ));
924 /* don't send a socks reply to transparent conns */
925 if (!conn->socks_request->has_finished)
926 connection_ap_handshake_socks_reply(conn, NULL, 0, 0);
928 /* Was it a linked dir conn? If so, a dir request just started to
929 * fetch something; this could be a bootstrap status milestone. */
930 log_debug(LD_APP, "considering");
931 if (TO_CONN(conn)->linked_conn &&
932 TO_CONN(conn)->linked_conn->type == CONN_TYPE_DIR) {
933 connection_t *dirconn = TO_CONN(conn)->linked_conn;
934 log_debug(LD_APP, "it is! %d", dirconn->purpose);
935 switch (dirconn->purpose) {
936 case DIR_PURPOSE_FETCH_CERTIFICATE:
937 if (consensus_is_waiting_for_certs())
938 control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_KEYS, 0);
939 break;
940 case DIR_PURPOSE_FETCH_CONSENSUS:
941 control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_STATUS, 0);
942 break;
943 case DIR_PURPOSE_FETCH_SERVERDESC:
944 control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_DESCRIPTORS,
945 count_loading_descriptors_progress());
946 break;
950 /* handle anything that might have queued */
951 if (connection_edge_package_raw_inbuf(conn, 1) < 0) {
952 /* (We already sent an end cell if possible) */
953 connection_mark_for_close(TO_CONN(conn));
954 return 0;
956 return 0;
958 if (conn->_base.type == CONN_TYPE_AP &&
959 rh->command == RELAY_COMMAND_RESOLVED) {
960 int ttl;
961 int answer_len;
962 uint8_t answer_type;
963 if (conn->_base.state != AP_CONN_STATE_RESOLVE_WAIT) {
964 log_fn(LOG_PROTOCOL_WARN, LD_APP, "Got a 'resolved' cell while "
965 "not in state resolve_wait. Dropping.");
966 return 0;
968 tor_assert(SOCKS_COMMAND_IS_RESOLVE(conn->socks_request->command));
969 answer_len = cell->payload[RELAY_HEADER_SIZE+1];
970 if (rh->length < 2 || answer_len+2>rh->length) {
971 log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
972 "Dropping malformed 'resolved' cell");
973 connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
974 return 0;
976 answer_type = cell->payload[RELAY_HEADER_SIZE];
977 if (rh->length >= answer_len+6)
978 ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+
979 2+answer_len));
980 else
981 ttl = -1;
982 if (answer_type == RESOLVED_TYPE_IPV4 && answer_len == 4) {
983 uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+2));
984 if (get_options()->ClientDNSRejectInternalAddresses &&
985 is_internal_IP(addr, 0)) {
986 char buf[INET_NTOA_BUF_LEN];
987 struct in_addr a;
988 a.s_addr = htonl(addr);
989 tor_inet_ntoa(&a, buf, sizeof(buf));
990 log_info(LD_APP,"Got a resolve with answer %s. Rejecting.", buf);
991 connection_ap_handshake_socks_resolved(conn,
992 RESOLVED_TYPE_ERROR_TRANSIENT,
993 0, NULL, 0, TIME_MAX);
994 connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
995 return 0;
998 connection_ap_handshake_socks_resolved(conn,
999 answer_type,
1000 cell->payload[RELAY_HEADER_SIZE+1], /*answer_len*/
1001 cell->payload+RELAY_HEADER_SIZE+2, /*answer*/
1002 ttl,
1003 -1);
1004 if (answer_type == RESOLVED_TYPE_IPV4 && answer_len == 4) {
1005 uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+2));
1006 remap_event_helper(conn, addr);
1008 connection_mark_unattached_ap(conn,
1009 END_STREAM_REASON_DONE |
1010 END_STREAM_REASON_FLAG_ALREADY_SOCKS_REPLIED);
1011 return 0;
1014 log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
1015 "Got an unexpected relay command %d, in state %d (%s). Dropping.",
1016 rh->command, conn->_base.state,
1017 conn_state_to_string(conn->_base.type, conn->_base.state));
1018 return 0; /* for forward compatibility, don't kill the circuit */
1019 // connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
1020 // connection_mark_for_close(conn);
1021 // return -1;
1024 /** An incoming relay cell has arrived on circuit <b>circ</b>. If
1025 * <b>conn</b> is NULL this is a control cell, else <b>cell</b> is
1026 * destined for <b>conn</b>.
1028 * If <b>layer_hint</b> is defined, then we're the origin of the
1029 * circuit, and it specifies the hop that packaged <b>cell</b>.
1031 * Return -reason if you want to warn and tear down the circuit, else 0.
1033 static int
1034 connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
1035 edge_connection_t *conn,
1036 crypt_path_t *layer_hint)
1038 static int num_seen=0;
1039 relay_header_t rh;
1040 unsigned domain = layer_hint?LD_APP:LD_EXIT;
1041 int reason;
1043 tor_assert(cell);
1044 tor_assert(circ);
1046 relay_header_unpack(&rh, cell->payload);
1047 // log_fn(LOG_DEBUG,"command %d stream %d", rh.command, rh.stream_id);
1048 num_seen++;
1049 log_debug(domain, "Now seen %d relay cells here (command %d, stream %d).",
1050 num_seen, rh.command, rh.stream_id);
1052 if (rh.length > RELAY_PAYLOAD_SIZE) {
1053 log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
1054 "Relay cell length field too long. Closing circuit.");
1055 return - END_CIRC_REASON_TORPROTOCOL;
1058 /* either conn is NULL, in which case we've got a control cell, or else
1059 * conn points to the recognized stream. */
1061 if (conn && !connection_state_is_open(TO_CONN(conn)))
1062 return connection_edge_process_relay_cell_not_open(
1063 &rh, cell, circ, conn, layer_hint);
1065 switch (rh.command) {
1066 case RELAY_COMMAND_DROP:
1067 // log_info(domain,"Got a relay-level padding cell. Dropping.");
1068 return 0;
1069 case RELAY_COMMAND_BEGIN:
1070 case RELAY_COMMAND_BEGIN_DIR:
1071 if (layer_hint &&
1072 circ->purpose != CIRCUIT_PURPOSE_S_REND_JOINED) {
1073 log_fn(LOG_PROTOCOL_WARN, LD_APP,
1074 "Relay begin request unsupported at AP. Dropping.");
1075 return 0;
1077 if (circ->purpose == CIRCUIT_PURPOSE_S_REND_JOINED &&
1078 layer_hint != TO_ORIGIN_CIRCUIT(circ)->cpath->prev) {
1079 log_fn(LOG_PROTOCOL_WARN, LD_APP,
1080 "Relay begin request to Hidden Service "
1081 "from intermediary node. Dropping.");
1082 return 0;
1084 if (conn) {
1085 log_fn(LOG_PROTOCOL_WARN, domain,
1086 "Begin cell for known stream. Dropping.");
1087 return 0;
1089 if (rh.command == RELAY_COMMAND_BEGIN_DIR) {
1090 /* Assign this circuit and its app-ward OR connection a unique ID,
1091 * so that we can measure download times. The local edge and dir
1092 * connection will be assigned the same ID when they are created
1093 * and linked. */
1094 static uint64_t next_id = 0;
1095 circ->dirreq_id = ++next_id;
1096 TO_CONN(TO_OR_CIRCUIT(circ)->p_conn)->dirreq_id = circ->dirreq_id;
1099 return connection_exit_begin_conn(cell, circ);
1100 case RELAY_COMMAND_DATA:
1101 ++stats_n_data_cells_received;
1102 if (( layer_hint && --layer_hint->deliver_window < 0) ||
1103 (!layer_hint && --circ->deliver_window < 0)) {
1104 log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
1105 "(relay data) circ deliver_window below 0. Killing.");
1106 connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
1107 connection_mark_for_close(TO_CONN(conn));
1108 return -END_CIRC_REASON_TORPROTOCOL;
1110 log_debug(domain,"circ deliver_window now %d.", layer_hint ?
1111 layer_hint->deliver_window : circ->deliver_window);
1113 circuit_consider_sending_sendme(circ, layer_hint);
1115 if (!conn) {
1116 log_info(domain,"data cell dropped, unknown stream (streamid %d).",
1117 rh.stream_id);
1118 return 0;
1121 if (--conn->deliver_window < 0) { /* is it below 0 after decrement? */
1122 log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
1123 "(relay data) conn deliver_window below 0. Killing.");
1124 return -END_CIRC_REASON_TORPROTOCOL;
1127 stats_n_data_bytes_received += rh.length;
1128 connection_write_to_buf(cell->payload + RELAY_HEADER_SIZE,
1129 rh.length, TO_CONN(conn));
1130 connection_edge_consider_sending_sendme(conn);
1131 return 0;
1132 case RELAY_COMMAND_END:
1133 reason = rh.length > 0 ?
1134 *(uint8_t *)(cell->payload+RELAY_HEADER_SIZE) : END_STREAM_REASON_MISC;
1135 if (!conn) {
1136 log_info(domain,"end cell (%s) dropped, unknown stream.",
1137 stream_end_reason_to_string(reason));
1138 return 0;
1140 /* XXX add to this log_fn the exit node's nickname? */
1141 log_info(domain,"%d: end cell (%s) for stream %d. Removing stream.",
1142 conn->_base.s,
1143 stream_end_reason_to_string(reason),
1144 conn->stream_id);
1145 if (conn->socks_request && !conn->socks_request->has_finished)
1146 log_warn(LD_BUG,
1147 "open stream hasn't sent socks answer yet? Closing.");
1148 /* We just *got* an end; no reason to send one. */
1149 conn->edge_has_sent_end = 1;
1150 if (!conn->end_reason)
1151 conn->end_reason = reason | END_STREAM_REASON_FLAG_REMOTE;
1152 if (!conn->_base.marked_for_close) {
1153 /* only mark it if not already marked. it's possible to
1154 * get the 'end' right around when the client hangs up on us. */
1155 connection_mark_for_close(TO_CONN(conn));
1156 conn->_base.hold_open_until_flushed = 1;
1158 return 0;
1159 case RELAY_COMMAND_EXTEND:
1160 if (conn) {
1161 log_fn(LOG_PROTOCOL_WARN, domain,
1162 "'extend' cell received for non-zero stream. Dropping.");
1163 return 0;
1165 return circuit_extend(cell, circ);
1166 case RELAY_COMMAND_EXTENDED:
1167 if (!layer_hint) {
1168 log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
1169 "'extended' unsupported at non-origin. Dropping.");
1170 return 0;
1172 log_debug(domain,"Got an extended cell! Yay.");
1173 if ((reason = circuit_finish_handshake(TO_ORIGIN_CIRCUIT(circ),
1174 CELL_CREATED,
1175 cell->payload+RELAY_HEADER_SIZE)) < 0) {
1176 log_warn(domain,"circuit_finish_handshake failed.");
1177 return reason;
1179 if ((reason=circuit_send_next_onion_skin(TO_ORIGIN_CIRCUIT(circ)))<0) {
1180 log_info(domain,"circuit_send_next_onion_skin() failed.");
1181 return reason;
1183 return 0;
1184 case RELAY_COMMAND_TRUNCATE:
1185 if (layer_hint) {
1186 log_fn(LOG_PROTOCOL_WARN, LD_APP,
1187 "'truncate' unsupported at origin. Dropping.");
1188 return 0;
1190 if (circ->n_conn) {
1191 uint8_t trunc_reason = *(uint8_t*)(cell->payload + RELAY_HEADER_SIZE);
1192 connection_or_send_destroy(circ->n_circ_id, circ->n_conn,
1193 trunc_reason);
1194 circuit_set_n_circid_orconn(circ, 0, NULL);
1196 log_debug(LD_EXIT, "Processed 'truncate', replying.");
1198 char payload[1];
1199 payload[0] = (char)END_CIRC_REASON_REQUESTED;
1200 relay_send_command_from_edge(0, circ, RELAY_COMMAND_TRUNCATED,
1201 payload, sizeof(payload), NULL);
1203 return 0;
1204 case RELAY_COMMAND_TRUNCATED:
1205 if (!layer_hint) {
1206 log_fn(LOG_PROTOCOL_WARN, LD_EXIT,
1207 "'truncated' unsupported at non-origin. Dropping.");
1208 return 0;
1210 circuit_truncated(TO_ORIGIN_CIRCUIT(circ), layer_hint);
1211 return 0;
1212 case RELAY_COMMAND_CONNECTED:
1213 if (conn) {
1214 log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
1215 "'connected' unsupported while open. Closing circ.");
1216 return -END_CIRC_REASON_TORPROTOCOL;
1218 log_info(domain,
1219 "'connected' received, no conn attached anymore. Ignoring.");
1220 return 0;
1221 case RELAY_COMMAND_SENDME:
1222 if (!conn) {
1223 if (layer_hint) {
1224 layer_hint->package_window += CIRCWINDOW_INCREMENT;
1225 log_debug(LD_APP,"circ-level sendme at origin, packagewindow %d.",
1226 layer_hint->package_window);
1227 circuit_resume_edge_reading(circ, layer_hint);
1228 } else {
1229 circ->package_window += CIRCWINDOW_INCREMENT;
1230 log_debug(LD_APP,
1231 "circ-level sendme at non-origin, packagewindow %d.",
1232 circ->package_window);
1233 circuit_resume_edge_reading(circ, layer_hint);
1235 return 0;
1237 conn->package_window += STREAMWINDOW_INCREMENT;
1238 log_debug(domain,"stream-level sendme, packagewindow now %d.",
1239 conn->package_window);
1240 if (circuit_queue_streams_are_blocked(circ)) {
1241 /* Still waiting for queue to flush; don't touch conn */
1242 return 0;
1244 connection_start_reading(TO_CONN(conn));
1245 /* handle whatever might still be on the inbuf */
1246 if (connection_edge_package_raw_inbuf(conn, 1) < 0) {
1247 /* (We already sent an end cell if possible) */
1248 connection_mark_for_close(TO_CONN(conn));
1249 return 0;
1251 return 0;
1252 case RELAY_COMMAND_RESOLVE:
1253 if (layer_hint) {
1254 log_fn(LOG_PROTOCOL_WARN, LD_APP,
1255 "resolve request unsupported at AP; dropping.");
1256 return 0;
1257 } else if (conn) {
1258 log_fn(LOG_PROTOCOL_WARN, domain,
1259 "resolve request for known stream; dropping.");
1260 return 0;
1261 } else if (circ->purpose != CIRCUIT_PURPOSE_OR) {
1262 log_fn(LOG_PROTOCOL_WARN, domain,
1263 "resolve request on circ with purpose %d; dropping",
1264 circ->purpose);
1265 return 0;
1267 connection_exit_begin_resolve(cell, TO_OR_CIRCUIT(circ));
1268 return 0;
1269 case RELAY_COMMAND_RESOLVED:
1270 if (conn) {
1271 log_fn(LOG_PROTOCOL_WARN, domain,
1272 "'resolved' unsupported while open. Closing circ.");
1273 return -END_CIRC_REASON_TORPROTOCOL;
1275 log_info(domain,
1276 "'resolved' received, no conn attached anymore. Ignoring.");
1277 return 0;
1278 case RELAY_COMMAND_ESTABLISH_INTRO:
1279 case RELAY_COMMAND_ESTABLISH_RENDEZVOUS:
1280 case RELAY_COMMAND_INTRODUCE1:
1281 case RELAY_COMMAND_INTRODUCE2:
1282 case RELAY_COMMAND_INTRODUCE_ACK:
1283 case RELAY_COMMAND_RENDEZVOUS1:
1284 case RELAY_COMMAND_RENDEZVOUS2:
1285 case RELAY_COMMAND_INTRO_ESTABLISHED:
1286 case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED:
1287 rend_process_relay_cell(circ, layer_hint,
1288 rh.command, rh.length,
1289 cell->payload+RELAY_HEADER_SIZE);
1290 return 0;
1292 log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
1293 "Received unknown relay command %d. Perhaps the other side is using "
1294 "a newer version of Tor? Dropping.",
1295 rh.command);
1296 return 0; /* for forward compatibility, don't kill the circuit */
1299 /** How many relay_data cells have we built, ever? */
1300 uint64_t stats_n_data_cells_packaged = 0;
1301 /** How many bytes of data have we put in relay_data cells have we built,
1302 * ever? This would be RELAY_PAYLOAD_SIZE*stats_n_data_cells_packaged if
1303 * every relay cell we ever sent were completely full of data. */
1304 uint64_t stats_n_data_bytes_packaged = 0;
1305 /** How many relay_data cells have we received, ever? */
1306 uint64_t stats_n_data_cells_received = 0;
1307 /** How many bytes of data have we received relay_data cells, ever? This would
1308 * be RELAY_PAYLOAD_SIZE*stats_n_data_cells_packaged if every relay cell we
1309 * ever received were completely full of data. */
1310 uint64_t stats_n_data_bytes_received = 0;
1312 /** While conn->inbuf has an entire relay payload of bytes on it,
1313 * and the appropriate package windows aren't empty, grab a cell
1314 * and send it down the circuit.
1316 * Return -1 (and send a RELAY_COMMAND_END cell if necessary) if conn should
1317 * be marked for close, else return 0.
1320 connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial)
1322 size_t amount_to_process, length;
1323 char payload[CELL_PAYLOAD_SIZE];
1324 circuit_t *circ;
1325 unsigned domain = conn->cpath_layer ? LD_APP : LD_EXIT;
1327 tor_assert(conn);
1329 if (conn->_base.marked_for_close) {
1330 log_warn(LD_BUG,
1331 "called on conn that's already marked for close at %s:%d.",
1332 conn->_base.marked_for_close_file, conn->_base.marked_for_close);
1333 return 0;
1336 repeat_connection_edge_package_raw_inbuf:
1338 circ = circuit_get_by_edge_conn(conn);
1339 if (!circ) {
1340 log_info(domain,"conn has no circuit! Closing.");
1341 conn->end_reason = END_STREAM_REASON_CANT_ATTACH;
1342 return -1;
1345 if (circuit_consider_stop_edge_reading(circ, conn->cpath_layer))
1346 return 0;
1348 if (conn->package_window <= 0) {
1349 log_info(domain,"called with package_window %d. Skipping.",
1350 conn->package_window);
1351 connection_stop_reading(TO_CONN(conn));
1352 return 0;
1355 amount_to_process = buf_datalen(conn->_base.inbuf);
1357 if (!amount_to_process)
1358 return 0;
1360 if (!package_partial && amount_to_process < RELAY_PAYLOAD_SIZE)
1361 return 0;
1363 if (amount_to_process > RELAY_PAYLOAD_SIZE) {
1364 length = RELAY_PAYLOAD_SIZE;
1365 } else {
1366 length = amount_to_process;
1368 stats_n_data_bytes_packaged += length;
1369 stats_n_data_cells_packaged += 1;
1371 connection_fetch_from_buf(payload, length, TO_CONN(conn));
1373 log_debug(domain,"(%d) Packaging %d bytes (%d waiting).", conn->_base.s,
1374 (int)length, (int)buf_datalen(conn->_base.inbuf));
1376 if (connection_edge_send_command(conn, RELAY_COMMAND_DATA,
1377 payload, length) < 0 )
1378 /* circuit got marked for close, don't continue, don't need to mark conn */
1379 return 0;
1381 if (!conn->cpath_layer) { /* non-rendezvous exit */
1382 tor_assert(circ->package_window > 0);
1383 circ->package_window--;
1384 } else { /* we're an AP, or an exit on a rendezvous circ */
1385 tor_assert(conn->cpath_layer->package_window > 0);
1386 conn->cpath_layer->package_window--;
1389 if (--conn->package_window <= 0) { /* is it 0 after decrement? */
1390 connection_stop_reading(TO_CONN(conn));
1391 log_debug(domain,"conn->package_window reached 0.");
1392 circuit_consider_stop_edge_reading(circ, conn->cpath_layer);
1393 return 0; /* don't process the inbuf any more */
1395 log_debug(domain,"conn->package_window is now %d",conn->package_window);
1397 /* handle more if there's more, or return 0 if there isn't */
1398 goto repeat_connection_edge_package_raw_inbuf;
1401 /** Called when we've just received a relay data cell, or when
1402 * we've just finished flushing all bytes to stream <b>conn</b>.
1404 * If conn->outbuf is not too full, and our deliver window is
1405 * low, send back a suitable number of stream-level sendme cells.
1407 void
1408 connection_edge_consider_sending_sendme(edge_connection_t *conn)
1410 circuit_t *circ;
1412 if (connection_outbuf_too_full(TO_CONN(conn)))
1413 return;
1415 circ = circuit_get_by_edge_conn(conn);
1416 if (!circ) {
1417 /* this can legitimately happen if the destroy has already
1418 * arrived and torn down the circuit */
1419 log_info(LD_APP,"No circuit associated with conn. Skipping.");
1420 return;
1423 while (conn->deliver_window <= STREAMWINDOW_START - STREAMWINDOW_INCREMENT) {
1424 log_debug(conn->cpath_layer?LD_APP:LD_EXIT,
1425 "Outbuf %d, Queuing stream sendme.",
1426 (int)conn->_base.outbuf_flushlen);
1427 conn->deliver_window += STREAMWINDOW_INCREMENT;
1428 if (connection_edge_send_command(conn, RELAY_COMMAND_SENDME,
1429 NULL, 0) < 0) {
1430 log_warn(LD_APP,"connection_edge_send_command failed. Skipping.");
1431 return; /* the circuit's closed, don't continue */
1436 /** The circuit <b>circ</b> has received a circuit-level sendme
1437 * (on hop <b>layer_hint</b>, if we're the OP). Go through all the
1438 * attached streams and let them resume reading and packaging, if
1439 * their stream windows allow it.
1441 static void
1442 circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
1444 if (circuit_queue_streams_are_blocked(circ)) {
1445 log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming");
1446 return;
1448 log_debug(layer_hint?LD_APP:LD_EXIT,"resuming");
1450 if (CIRCUIT_IS_ORIGIN(circ))
1451 circuit_resume_edge_reading_helper(TO_ORIGIN_CIRCUIT(circ)->p_streams,
1452 circ, layer_hint);
1453 else
1454 circuit_resume_edge_reading_helper(TO_OR_CIRCUIT(circ)->n_streams,
1455 circ, layer_hint);
1458 /** A helper function for circuit_resume_edge_reading() above.
1459 * The arguments are the same, except that <b>conn</b> is the head
1460 * of a linked list of edge streams that should each be considered.
1462 static int
1463 circuit_resume_edge_reading_helper(edge_connection_t *conn,
1464 circuit_t *circ,
1465 crypt_path_t *layer_hint)
1467 for ( ; conn; conn=conn->next_stream) {
1468 if (conn->_base.marked_for_close)
1469 continue;
1470 if ((!layer_hint && conn->package_window > 0) ||
1471 (layer_hint && conn->package_window > 0 &&
1472 conn->cpath_layer == layer_hint)) {
1473 connection_start_reading(TO_CONN(conn));
1474 /* handle whatever might still be on the inbuf */
1475 if (connection_edge_package_raw_inbuf(conn, 1)<0) {
1476 /* (We already sent an end cell if possible) */
1477 connection_mark_for_close(TO_CONN(conn));
1478 continue;
1481 /* If the circuit won't accept any more data, return without looking
1482 * at any more of the streams. Any connections that should be stopped
1483 * have already been stopped by connection_edge_package_raw_inbuf. */
1484 if (circuit_consider_stop_edge_reading(circ, layer_hint))
1485 return -1;
1488 return 0;
1491 /** Check if the package window for <b>circ</b> is empty (at
1492 * hop <b>layer_hint</b> if it's defined).
1494 * If yes, tell edge streams to stop reading and return 1.
1495 * Else return 0.
1497 static int
1498 circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
1500 edge_connection_t *conn = NULL;
1501 unsigned domain = layer_hint ? LD_APP : LD_EXIT;
1503 if (!layer_hint) {
1504 or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
1505 log_debug(domain,"considering circ->package_window %d",
1506 circ->package_window);
1507 if (circ->package_window <= 0) {
1508 log_debug(domain,"yes, not-at-origin. stopped.");
1509 for (conn = or_circ->n_streams; conn; conn=conn->next_stream)
1510 connection_stop_reading(TO_CONN(conn));
1511 return 1;
1513 return 0;
1515 /* else, layer hint is defined, use it */
1516 log_debug(domain,"considering layer_hint->package_window %d",
1517 layer_hint->package_window);
1518 if (layer_hint->package_window <= 0) {
1519 log_debug(domain,"yes, at-origin. stopped.");
1520 for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn;
1521 conn=conn->next_stream)
1522 if (conn->cpath_layer == layer_hint)
1523 connection_stop_reading(TO_CONN(conn));
1524 return 1;
1526 return 0;
1529 /** Check if the deliver_window for circuit <b>circ</b> (at hop
1530 * <b>layer_hint</b> if it's defined) is low enough that we should
1531 * send a circuit-level sendme back down the circuit. If so, send
1532 * enough sendmes that the window would be overfull if we sent any
1533 * more.
1535 static void
1536 circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint)
1538 // log_fn(LOG_INFO,"Considering: layer_hint is %s",
1539 // layer_hint ? "defined" : "null");
1540 while ((layer_hint ? layer_hint->deliver_window : circ->deliver_window) <=
1541 CIRCWINDOW_START - CIRCWINDOW_INCREMENT) {
1542 log_debug(LD_CIRC,"Queuing circuit sendme.");
1543 if (layer_hint)
1544 layer_hint->deliver_window += CIRCWINDOW_INCREMENT;
1545 else
1546 circ->deliver_window += CIRCWINDOW_INCREMENT;
1547 if (relay_send_command_from_edge(0, circ, RELAY_COMMAND_SENDME,
1548 NULL, 0, layer_hint) < 0) {
1549 log_warn(LD_CIRC,
1550 "relay_send_command_from_edge failed. Circuit's closed.");
1551 return; /* the circuit's closed, don't continue */
1556 /** Stop reading on edge connections when we have this many cells
1557 * waiting on the appropriate queue. */
1558 #define CELL_QUEUE_HIGHWATER_SIZE 256
1559 /** Start reading from edge connections again when we get down to this many
1560 * cells. */
1561 #define CELL_QUEUE_LOWWATER_SIZE 64
1563 #ifdef ACTIVE_CIRCUITS_PARANOIA
1564 #define assert_active_circuits_ok_paranoid(conn) \
1565 assert_active_circuits_ok(conn)
1566 #else
1567 #define assert_active_circuits_ok_paranoid(conn)
1568 #endif
1570 /** The total number of cells we have allocated from the memory pool. */
1571 static int total_cells_allocated = 0;
1573 /** A memory pool to allocate packed_cell_t objects. */
1574 static mp_pool_t *cell_pool = NULL;
1576 /** Memory pool to allocate insertion_time_elem_t objects used for cell
1577 * statistics. */
1578 static mp_pool_t *it_pool = NULL;
1580 /** Allocate structures to hold cells. */
1581 void
1582 init_cell_pool(void)
1584 tor_assert(!cell_pool);
1585 cell_pool = mp_pool_new(sizeof(packed_cell_t), 128*1024);
1588 /** Free all storage used to hold cells (and insertion times if we measure
1589 * cell statistics). */
1590 void
1591 free_cell_pool(void)
1593 /* Maybe we haven't called init_cell_pool yet; need to check for it. */
1594 if (cell_pool) {
1595 mp_pool_destroy(cell_pool);
1596 cell_pool = NULL;
1598 if (it_pool) {
1599 mp_pool_destroy(it_pool);
1600 it_pool = NULL;
1604 /** Free excess storage in cell pool. */
1605 void
1606 clean_cell_pool(void)
1608 tor_assert(cell_pool);
1609 mp_pool_clean(cell_pool, 0, 1);
1612 /** Release storage held by <b>cell</b>. */
1613 static INLINE void
1614 packed_cell_free_unchecked(packed_cell_t *cell)
1616 --total_cells_allocated;
1617 mp_pool_release(cell);
1620 /** Allocate and return a new packed_cell_t. */
1621 static INLINE packed_cell_t *
1622 packed_cell_alloc(void)
1624 ++total_cells_allocated;
1625 return mp_pool_get(cell_pool);
1628 /** Log current statistics for cell pool allocation at log level
1629 * <b>severity</b>. */
1630 void
1631 dump_cell_pool_usage(int severity)
1633 circuit_t *c;
1634 int n_circs = 0;
1635 int n_cells = 0;
1636 for (c = _circuit_get_global_list(); c; c = c->next) {
1637 n_cells += c->n_conn_cells.n;
1638 if (!CIRCUIT_IS_ORIGIN(c))
1639 n_cells += TO_OR_CIRCUIT(c)->p_conn_cells.n;
1640 ++n_circs;
1642 log(severity, LD_MM, "%d cells allocated on %d circuits. %d cells leaked.",
1643 n_cells, n_circs, total_cells_allocated - n_cells);
1644 mp_pool_log_status(cell_pool, severity);
1647 /** Allocate a new copy of packed <b>cell</b>. */
1648 static INLINE packed_cell_t *
1649 packed_cell_copy(const cell_t *cell)
1651 packed_cell_t *c = packed_cell_alloc();
1652 cell_pack(c, cell);
1653 c->next = NULL;
1654 return c;
1657 /** Append <b>cell</b> to the end of <b>queue</b>. */
1658 void
1659 cell_queue_append(cell_queue_t *queue, packed_cell_t *cell)
1661 if (queue->tail) {
1662 tor_assert(!queue->tail->next);
1663 queue->tail->next = cell;
1664 } else {
1665 queue->head = cell;
1667 queue->tail = cell;
1668 cell->next = NULL;
1669 ++queue->n;
1672 /** Append a newly allocated copy of <b>cell</b> to the end of <b>queue</b> */
1673 void
1674 cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell)
1676 packed_cell_t *copy = packed_cell_copy(cell);
1677 /* Remember the time when this cell was put in the queue. */
1678 if (get_options()->CellStatistics) {
1679 struct timeval now;
1680 uint32_t added;
1681 insertion_time_queue_t *it_queue = queue->insertion_times;
1682 if (!it_pool)
1683 it_pool = mp_pool_new(sizeof(insertion_time_elem_t), 1024);
1684 tor_gettimeofday_cached(&now);
1685 #define SECONDS_IN_A_DAY 86400L
1686 added = (uint32_t)(((now.tv_sec % SECONDS_IN_A_DAY) * 100L)
1687 + ((uint32_t)now.tv_usec / (uint32_t)10000L));
1688 if (!it_queue) {
1689 it_queue = tor_malloc_zero(sizeof(insertion_time_queue_t));
1690 queue->insertion_times = it_queue;
1692 if (it_queue->last && it_queue->last->insertion_time == added) {
1693 it_queue->last->counter++;
1694 } else {
1695 insertion_time_elem_t *elem = mp_pool_get(it_pool);
1696 elem->next = NULL;
1697 elem->insertion_time = added;
1698 elem->counter = 1;
1699 if (it_queue->last) {
1700 it_queue->last->next = elem;
1701 it_queue->last = elem;
1702 } else {
1703 it_queue->first = it_queue->last = elem;
1707 cell_queue_append(queue, copy);
1710 /** Remove and free every cell in <b>queue</b>. */
1711 void
1712 cell_queue_clear(cell_queue_t *queue)
1714 packed_cell_t *cell, *next;
1715 cell = queue->head;
1716 while (cell) {
1717 next = cell->next;
1718 packed_cell_free_unchecked(cell);
1719 cell = next;
1721 queue->head = queue->tail = NULL;
1722 queue->n = 0;
1723 if (queue->insertion_times) {
1724 while (queue->insertion_times->first) {
1725 insertion_time_elem_t *elem = queue->insertion_times->first;
1726 queue->insertion_times->first = elem->next;
1727 mp_pool_release(elem);
1729 tor_free(queue->insertion_times);
1733 /** Extract and return the cell at the head of <b>queue</b>; return NULL if
1734 * <b>queue</b> is empty. */
1735 static INLINE packed_cell_t *
1736 cell_queue_pop(cell_queue_t *queue)
1738 packed_cell_t *cell = queue->head;
1739 if (!cell)
1740 return NULL;
1741 queue->head = cell->next;
1742 if (cell == queue->tail) {
1743 tor_assert(!queue->head);
1744 queue->tail = NULL;
1746 --queue->n;
1747 return cell;
1750 /** Return a pointer to the "next_active_on_{n,p}_conn" pointer of <b>circ</b>,
1751 * depending on whether <b>conn</b> matches n_conn or p_conn. */
1752 static INLINE circuit_t **
1753 next_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
1755 tor_assert(circ);
1756 tor_assert(conn);
1757 if (conn == circ->n_conn) {
1758 return &circ->next_active_on_n_conn;
1759 } else {
1760 or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
1761 tor_assert(conn == orcirc->p_conn);
1762 return &orcirc->next_active_on_p_conn;
1766 /** Return a pointer to the "prev_active_on_{n,p}_conn" pointer of <b>circ</b>,
1767 * depending on whether <b>conn</b> matches n_conn or p_conn. */
1768 static INLINE circuit_t **
1769 prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
1771 tor_assert(circ);
1772 tor_assert(conn);
1773 if (conn == circ->n_conn) {
1774 return &circ->prev_active_on_n_conn;
1775 } else {
1776 or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
1777 tor_assert(conn == orcirc->p_conn);
1778 return &orcirc->prev_active_on_p_conn;
1782 /** Helper for sorting cell_ewma_t values in their priority queue. */
1783 static int
1784 compare_cell_ewma_counts(const void *p1, const void *p2)
1786 const cell_ewma_t *e1=p1, *e2=p2;
1787 if (e1->cell_count < e2->cell_count)
1788 return -1;
1789 else if (e1->cell_count > e2->cell_count)
1790 return 1;
1791 else
1792 return 0;
1795 /** Given a cell_ewma_t, return a pointer to the circuit containing it. */
1796 static circuit_t *
1797 cell_ewma_to_circuit(cell_ewma_t *ewma)
1799 if (ewma->is_for_p_conn) {
1800 /* This is an or_circuit_t's p_cell_ewma. */
1801 or_circuit_t *orcirc = SUBTYPE_P(ewma, or_circuit_t, p_cell_ewma);
1802 return TO_CIRCUIT(orcirc);
1803 } else {
1804 /* This is some circuit's n_cell_ewma. */
1805 return SUBTYPE_P(ewma, circuit_t, n_cell_ewma);
1809 /* ==== Functions for scaling cell_ewma_t ====
1811 When choosing which cells to relay first, we favor circuits that have been
1812 quiet recently. This gives better latency on connections that aren't
1813 pushing lots of data, and makes the network feel more interactive.
1815 Conceptually, we take an exponentially weighted mean average of the number
1816 of cells a circuit has sent, and allow active circuits (those with cells to
1817 relay) to send cells in reverse order of their exponentially-weighted mean
1818 average (EWMA) cell count. [That is, a cell sent N seconds ago 'counts'
1819 F^N times as much as a cell sent now, for 0<F<1.0, and we favor the
1820 circuit that has sent the fewest cells]
1822 If 'double' had infinite precision, we could do this simply by counting a
1823 cell sent at startup as having weight 1.0, and a cell sent N seconds later
1824 as having weight F^-N. This way, we would never need to re-scale
1825 any already-sent cells.
1827 To prevent double from overflowing, we could count a cell sent now as
1828 having weight 1.0 and a cell sent N seconds ago as having weight F^N.
1829 This, however, would mean we'd need to re-scale *ALL* old circuits every
1830 time we wanted to send a cell.
1832 So as a compromise, we divide time into 'ticks' (currently, 10-second
1833 increments) and say that a cell sent at the start of a current tick is
1834 worth 1.0, a cell sent N seconds before the start of the current tick is
1835 worth F^N, and a cell sent N seconds after the start of the current tick is
1836 worth F^-N. This way we don't overflow, and we don't need to constantly
1837 rescale.
1840 /** How long does a tick last (seconds)? */
1841 #define EWMA_TICK_LEN 10
1843 /** The default per-tick scale factor, if it hasn't been overridden by a
1844 * consensus or a configuration setting. zero means "disabled". */
1845 #define EWMA_DEFAULT_HALFLIFE 0.0
1847 /** Given a timeval <b>now</b>, compute the cell_ewma tick in which it occurs
1848 * and the fraction of the tick that has elapsed between the start of the tick
1849 * and <b>now</b>. Return the former and store the latter in
1850 * *<b>remainder_out</b>.
1852 * These tick values are not meant to be shared between Tor instances, or used
1853 * for other purposes. */
1854 static unsigned
1855 cell_ewma_tick_from_timeval(const struct timeval *now,
1856 double *remainder_out)
1858 unsigned res = (unsigned) (now->tv_sec / EWMA_TICK_LEN);
1859 /* rem */
1860 double rem = (now->tv_sec % EWMA_TICK_LEN) +
1861 ((double)(now->tv_usec)) / 1.0e6;
1862 *remainder_out = rem / EWMA_TICK_LEN;
1863 return res;
1866 /** Compute and return the current cell_ewma tick. */
1867 unsigned
1868 cell_ewma_get_tick(void)
1870 return ((unsigned)approx_time() / EWMA_TICK_LEN);
1873 /** The per-tick scale factor to be used when computing cell-count EWMA
1874 * values. (A cell sent N ticks before the start of the current tick
1875 * has value ewma_scale_factor ** N.)
1877 static double ewma_scale_factor = 0.1;
1878 static int ewma_enabled = 0;
1880 #define EPSILON 0.00001
1881 #define LOG_ONEHALF -0.69314718055994529
1883 /** Adjust the global cell scale factor based on <b>options</b> */
1884 void
1885 cell_ewma_set_scale_factor(or_options_t *options, networkstatus_t *consensus)
1887 int32_t halflife_ms;
1888 double halflife;
1889 const char *source;
1890 if (options && options->CircuitPriorityHalflife >= -EPSILON) {
1891 halflife = options->CircuitPriorityHalflife;
1892 source = "CircuitPriorityHalflife in configuration";
1893 } else if (consensus &&
1894 (halflife_ms = networkstatus_get_param(
1895 consensus, "CircuitPriorityHalflifeMsec", -1)) >= 0) {
1896 halflife = ((double)halflife_ms)/1000.0;
1897 source = "CircuitPriorityHalflifeMsec in consensus";
1898 } else {
1899 halflife = EWMA_DEFAULT_HALFLIFE;
1900 source = "Default value";
1903 if (halflife <= EPSILON) {
1904 /* The cell EWMA algorithm is disabled. */
1905 ewma_scale_factor = 0.1;
1906 ewma_enabled = 0;
1907 log_info(LD_OR,
1908 "Disabled cell_ewma algorithm because of value in %s",
1909 source);
1910 } else {
1911 /* convert halflife into halflife-per-tick. */
1912 halflife /= EWMA_TICK_LEN;
1913 /* compute per-tick scale factor. */
1914 ewma_scale_factor = exp( LOG_ONEHALF / halflife );
1915 ewma_enabled = 1;
1916 log_info(LD_OR,
1917 "Enabled cell_ewma algorithm because of value in %s; "
1918 "scale factor is %lf per %d seconds",
1919 source, ewma_scale_factor, EWMA_TICK_LEN);
1923 /** Return the multiplier necessary to convert the value of a cell sent in
1924 * 'from_tick' to one sent in 'to_tick'. */
1925 static INLINE double
1926 get_scale_factor(unsigned from_tick, unsigned to_tick)
1928 /* This math can wrap around, but that's okay: unsigned overflow is
1929 well-defined */
1930 int diff = (int)(to_tick - from_tick);
1931 return pow(ewma_scale_factor, diff);
1934 /** Adjust the cell count of <b>ewma</b> so that it is scaled with respect to
1935 * <b>cur_tick</b> */
1936 static void
1937 scale_single_cell_ewma(cell_ewma_t *ewma, unsigned cur_tick)
1939 double factor = get_scale_factor(ewma->last_adjusted_tick, cur_tick);
1940 ewma->cell_count *= factor;
1941 ewma->last_adjusted_tick = cur_tick;
1944 /** Adjust the cell count of every active circuit on <b>conn</b> so
1945 * that they are scaled with respect to <b>cur_tick</b> */
1946 static void
1947 scale_active_circuits(or_connection_t *conn, unsigned cur_tick)
1950 double factor = get_scale_factor(
1951 conn->active_circuit_pqueue_last_recalibrated,
1952 cur_tick);
1953 /** Ordinarily it isn't okay to change the value of an element in a heap,
1954 * but it's okay here, since we are preserving the order. */
1955 SMARTLIST_FOREACH(conn->active_circuit_pqueue, cell_ewma_t *, e, {
1956 tor_assert(e->last_adjusted_tick ==
1957 conn->active_circuit_pqueue_last_recalibrated);
1958 e->cell_count *= factor;
1959 e->last_adjusted_tick = cur_tick;
1961 conn->active_circuit_pqueue_last_recalibrated = cur_tick;
1964 /** Rescale <b>ewma</b> to the same scale as <b>conn</b>, and add it to
1965 * <b>conn</b>'s priority queue of active circuits */
1966 static void
1967 add_cell_ewma_to_conn(or_connection_t *conn, cell_ewma_t *ewma)
1969 tor_assert(ewma->heap_index == -1);
1970 scale_single_cell_ewma(ewma,
1971 conn->active_circuit_pqueue_last_recalibrated);
1973 smartlist_pqueue_add(conn->active_circuit_pqueue,
1974 compare_cell_ewma_counts,
1975 STRUCT_OFFSET(cell_ewma_t, heap_index),
1976 ewma);
1979 /** Remove <b>ewma</b> from <b>conn</b>'s priority queue of active circuits */
1980 static void
1981 remove_cell_ewma_from_conn(or_connection_t *conn, cell_ewma_t *ewma)
1983 tor_assert(ewma->heap_index != -1);
1984 smartlist_pqueue_remove(conn->active_circuit_pqueue,
1985 compare_cell_ewma_counts,
1986 STRUCT_OFFSET(cell_ewma_t, heap_index),
1987 ewma);
1990 /** Remove and return the first cell_ewma_t from conn's priority queue of
1991 * active circuits. Requires that the priority queue is nonempty. */
1992 static cell_ewma_t *
1993 pop_first_cell_ewma_from_conn(or_connection_t *conn)
1995 return smartlist_pqueue_pop(conn->active_circuit_pqueue,
1996 compare_cell_ewma_counts,
1997 STRUCT_OFFSET(cell_ewma_t, heap_index));
2000 /** Add <b>circ</b> to the list of circuits with pending cells on
2001 * <b>conn</b>. No effect if <b>circ</b> is already linked. */
2002 void
2003 make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
2005 circuit_t **nextp = next_circ_on_conn_p(circ, conn);
2006 circuit_t **prevp = prev_circ_on_conn_p(circ, conn);
2008 if (*nextp && *prevp) {
2009 /* Already active. */
2010 return;
2013 assert_active_circuits_ok_paranoid(conn);
2015 if (! conn->active_circuits) {
2016 conn->active_circuits = circ;
2017 *prevp = *nextp = circ;
2018 } else {
2019 circuit_t *head = conn->active_circuits;
2020 circuit_t *old_tail = *prev_circ_on_conn_p(head, conn);
2021 *next_circ_on_conn_p(old_tail, conn) = circ;
2022 *nextp = head;
2023 *prev_circ_on_conn_p(head, conn) = circ;
2024 *prevp = old_tail;
2027 if (circ->n_conn == conn) {
2028 add_cell_ewma_to_conn(conn, &circ->n_cell_ewma);
2029 } else {
2030 or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
2031 tor_assert(conn == orcirc->p_conn);
2032 add_cell_ewma_to_conn(conn, &orcirc->p_cell_ewma);
2035 assert_active_circuits_ok_paranoid(conn);
2038 /** Remove <b>circ</b> from the list of circuits with pending cells on
2039 * <b>conn</b>. No effect if <b>circ</b> is already unlinked. */
2040 void
2041 make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
2043 circuit_t **nextp = next_circ_on_conn_p(circ, conn);
2044 circuit_t **prevp = prev_circ_on_conn_p(circ, conn);
2045 circuit_t *next = *nextp, *prev = *prevp;
2047 if (!next && !prev) {
2048 /* Already inactive. */
2049 return;
2052 assert_active_circuits_ok_paranoid(conn);
2054 tor_assert(next && prev);
2055 tor_assert(*prev_circ_on_conn_p(next, conn) == circ);
2056 tor_assert(*next_circ_on_conn_p(prev, conn) == circ);
2058 if (next == circ) {
2059 conn->active_circuits = NULL;
2060 } else {
2061 *prev_circ_on_conn_p(next, conn) = prev;
2062 *next_circ_on_conn_p(prev, conn) = next;
2063 if (conn->active_circuits == circ)
2064 conn->active_circuits = next;
2066 *prevp = *nextp = NULL;
2068 if (circ->n_conn == conn) {
2069 remove_cell_ewma_from_conn(conn, &circ->n_cell_ewma);
2070 } else {
2071 or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
2072 tor_assert(conn == orcirc->p_conn);
2073 remove_cell_ewma_from_conn(conn, &orcirc->p_cell_ewma);
2076 assert_active_circuits_ok_paranoid(conn);
2079 /** Remove all circuits from the list of circuits with pending cells on
2080 * <b>conn</b>. */
2081 void
2082 connection_or_unlink_all_active_circs(or_connection_t *orconn)
2084 circuit_t *head = orconn->active_circuits;
2085 circuit_t *cur = head;
2086 if (! head)
2087 return;
2088 do {
2089 circuit_t *next = *next_circ_on_conn_p(cur, orconn);
2090 *prev_circ_on_conn_p(cur, orconn) = NULL;
2091 *next_circ_on_conn_p(cur, orconn) = NULL;
2092 cur = next;
2093 } while (cur != head);
2094 orconn->active_circuits = NULL;
2096 SMARTLIST_FOREACH(orconn->active_circuit_pqueue, cell_ewma_t *, e,
2097 e->heap_index = -1);
2098 smartlist_clear(orconn->active_circuit_pqueue);
2101 /** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
2102 * every edge connection that is using <b>circ</b> to write to <b>orconn</b>,
2103 * and start or stop reading as appropriate.
2105 * If <b>stream_id</b> is nonzero, block only the edge connection whose
2106 * stream_id matches it.
2108 * Returns the number of streams whose status we changed.
2110 static int
2111 set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
2112 int block, uint16_t stream_id)
2114 edge_connection_t *edge = NULL;
2115 int n = 0;
2116 if (circ->n_conn == orconn) {
2117 circ->streams_blocked_on_n_conn = block;
2118 if (CIRCUIT_IS_ORIGIN(circ))
2119 edge = TO_ORIGIN_CIRCUIT(circ)->p_streams;
2120 } else {
2121 circ->streams_blocked_on_p_conn = block;
2122 tor_assert(!CIRCUIT_IS_ORIGIN(circ));
2123 edge = TO_OR_CIRCUIT(circ)->n_streams;
2126 for (; edge; edge = edge->next_stream) {
2127 connection_t *conn = TO_CONN(edge);
2128 if (stream_id && edge->stream_id != stream_id)
2129 continue;
2131 if (edge->edge_blocked_on_circ != block) {
2132 ++n;
2133 edge->edge_blocked_on_circ = block;
2136 if (!conn->read_event) {
2137 /* This connection is a placeholder for something; probably a DNS
2138 * request. It can't actually stop or start reading.*/
2139 continue;
2142 if (block) {
2143 if (connection_is_reading(conn))
2144 connection_stop_reading(conn);
2145 } else {
2146 /* Is this right? */
2147 if (!connection_is_reading(conn))
2148 connection_start_reading(conn);
2152 return n;
2155 /** Pull as many cells as possible (but no more than <b>max</b>) from the
2156 * queue of the first active circuit on <b>conn</b>, and write them to
2157 * <b>conn</b>-&gt;outbuf. Return the number of cells written. Advance
2158 * the active circuit pointer to the next active circuit in the ring. */
2160 connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
2161 time_t now)
2163 int n_flushed;
2164 cell_queue_t *queue;
2165 circuit_t *circ;
2166 int streams_blocked;
2168 /* The current (hi-res) time */
2169 struct timeval now_hires;
2171 /* The EWMA cell counter for the circuit we're flushing. */
2172 cell_ewma_t *cell_ewma = NULL;
2173 double ewma_increment = -1;
2175 circ = conn->active_circuits;
2176 if (!circ) return 0;
2177 assert_active_circuits_ok_paranoid(conn);
2179 /* See if we're doing the ewma circuit selection algorithm. */
2180 if (ewma_enabled) {
2181 unsigned tick;
2182 double fractional_tick;
2183 tor_gettimeofday_cached(&now_hires);
2184 tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick);
2186 if (tick != conn->active_circuit_pqueue_last_recalibrated) {
2187 scale_active_circuits(conn, tick);
2190 ewma_increment = pow(ewma_scale_factor, -fractional_tick);
2192 cell_ewma = smartlist_get(conn->active_circuit_pqueue, 0);
2193 circ = cell_ewma_to_circuit(cell_ewma);
2196 if (circ->n_conn == conn) {
2197 queue = &circ->n_conn_cells;
2198 streams_blocked = circ->streams_blocked_on_n_conn;
2199 } else {
2200 queue = &TO_OR_CIRCUIT(circ)->p_conn_cells;
2201 streams_blocked = circ->streams_blocked_on_p_conn;
2203 tor_assert(*next_circ_on_conn_p(circ,conn));
2205 for (n_flushed = 0; n_flushed < max && queue->head; ) {
2206 packed_cell_t *cell = cell_queue_pop(queue);
2207 tor_assert(*next_circ_on_conn_p(circ,conn));
2209 /* Calculate the exact time that this cell has spent in the queue. */
2210 if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) {
2211 struct timeval now;
2212 uint32_t flushed;
2213 uint32_t cell_waiting_time;
2214 insertion_time_queue_t *it_queue = queue->insertion_times;
2215 tor_gettimeofday_cached(&now);
2216 flushed = (uint32_t)((now.tv_sec % SECONDS_IN_A_DAY) * 100L +
2217 (uint32_t)now.tv_usec / (uint32_t)10000L);
2218 if (!it_queue || !it_queue->first) {
2219 log_warn(LD_BUG, "Cannot determine insertion time of cell.");
2220 } else {
2221 or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
2222 insertion_time_elem_t *elem = it_queue->first;
2223 cell_waiting_time =
2224 (uint32_t)((flushed * 10L + SECONDS_IN_A_DAY * 1000L -
2225 elem->insertion_time * 10L) %
2226 (SECONDS_IN_A_DAY * 1000L));
2227 #undef SECONDS_IN_A_DAY
2228 elem->counter--;
2229 if (elem->counter < 1) {
2230 it_queue->first = elem->next;
2231 if (elem == it_queue->last)
2232 it_queue->last = NULL;
2233 mp_pool_release(elem);
2235 orcirc->total_cell_waiting_time += cell_waiting_time;
2236 orcirc->processed_cells++;
2240 /* If we just flushed our queue and this circuit is used for a
2241 * tunneled directory request, possibly advance its state. */
2242 if (queue->n == 0 && TO_CONN(conn)->dirreq_id)
2243 geoip_change_dirreq_state(TO_CONN(conn)->dirreq_id,
2244 DIRREQ_TUNNELED,
2245 DIRREQ_CIRC_QUEUE_FLUSHED);
2247 connection_write_to_buf(cell->body, CELL_NETWORK_SIZE, TO_CONN(conn));
2249 packed_cell_free_unchecked(cell);
2250 ++n_flushed;
2251 if (cell_ewma) {
2252 cell_ewma_t *tmp;
2253 cell_ewma->cell_count += ewma_increment;
2254 /* We pop and re-add the cell_ewma_t here, not above, since we need to
2255 * re-add it immediately to keep the priority queue consistent with
2256 * the linked-list implementation */
2257 tmp = pop_first_cell_ewma_from_conn(conn);
2258 tor_assert(tmp == cell_ewma);
2259 add_cell_ewma_to_conn(conn, cell_ewma);
2261 if (circ != conn->active_circuits) {
2262 /* If this happens, the current circuit just got made inactive by
2263 * a call in connection_write_to_buf(). That's nothing to worry about:
2264 * circuit_make_inactive_on_conn() already advanced conn->active_circuits
2265 * for us.
2267 assert_active_circuits_ok_paranoid(conn);
2268 goto done;
2271 tor_assert(*next_circ_on_conn_p(circ,conn));
2272 assert_active_circuits_ok_paranoid(conn);
2273 conn->active_circuits = *next_circ_on_conn_p(circ, conn);
2275 /* Is the cell queue low enough to unblock all the streams that are waiting
2276 * to write to this circuit? */
2277 if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
2278 set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */
2280 /* Did we just run out of cells on this circuit's queue? */
2281 if (queue->n == 0) {
2282 log_debug(LD_GENERAL, "Made a circuit inactive.");
2283 make_circuit_inactive_on_conn(circ, conn);
2285 done:
2286 if (n_flushed)
2287 conn->timestamp_last_added_nonpadding = now;
2288 return n_flushed;
2291 /** Add <b>cell</b> to the queue of <b>circ</b> writing to <b>orconn</b>
2292 * transmitting in <b>direction</b>. */
2293 void
2294 append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
2295 cell_t *cell, cell_direction_t direction,
2296 uint16_t fromstream)
2298 cell_queue_t *queue;
2299 int streams_blocked;
2300 if (direction == CELL_DIRECTION_OUT) {
2301 queue = &circ->n_conn_cells;
2302 streams_blocked = circ->streams_blocked_on_n_conn;
2303 } else {
2304 or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
2305 queue = &orcirc->p_conn_cells;
2306 streams_blocked = circ->streams_blocked_on_p_conn;
2308 if (cell->command == CELL_RELAY_EARLY && orconn->link_proto < 2) {
2309 /* V1 connections don't understand RELAY_EARLY. */
2310 cell->command = CELL_RELAY;
2313 cell_queue_append_packed_copy(queue, cell);
2315 /* If we have too many cells on the circuit, we should stop reading from
2316 * the edge streams for a while. */
2317 if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
2318 set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */
2320 if (streams_blocked && fromstream) {
2321 /* This edge connection is apparently not blocked; block it. */
2322 set_streams_blocked_on_circ(circ, orconn, 1, fromstream);
2325 if (queue->n == 1) {
2326 /* This was the first cell added to the queue. We need to make this
2327 * circuit active. */
2328 log_debug(LD_GENERAL, "Made a circuit active.");
2329 make_circuit_active_on_conn(circ, orconn);
2332 if (! buf_datalen(orconn->_base.outbuf)) {
2333 /* There is no data at all waiting to be sent on the outbuf. Add a
2334 * cell, so that we can notice when it gets flushed, flushed_some can
2335 * get called, and we can start putting more data onto the buffer then.
2337 log_debug(LD_GENERAL, "Primed a buffer.");
2338 connection_or_flush_from_first_active_circuit(orconn, 1, approx_time());
2342 /** Append an encoded value of <b>addr</b> to <b>payload_out</b>, which must
2343 * have at least 18 bytes of free space. The encoding is, as specified in
2344 * tor-spec.txt:
2345 * RESOLVED_TYPE_IPV4 or RESOLVED_TYPE_IPV6 [1 byte]
2346 * LENGTH [1 byte]
2347 * ADDRESS [length bytes]
2348 * Return the number of bytes added, or -1 on error */
2350 append_address_to_payload(char *payload_out, const tor_addr_t *addr)
2352 uint32_t a;
2353 switch (tor_addr_family(addr)) {
2354 case AF_INET:
2355 payload_out[0] = RESOLVED_TYPE_IPV4;
2356 payload_out[1] = 4;
2357 a = tor_addr_to_ipv4n(addr);
2358 memcpy(payload_out+2, &a, 4);
2359 return 6;
2360 case AF_INET6:
2361 payload_out[0] = RESOLVED_TYPE_IPV6;
2362 payload_out[1] = 16;
2363 memcpy(payload_out+2, tor_addr_to_in6_addr8(addr), 16);
2364 return 18;
2365 case AF_UNSPEC:
2366 default:
2367 return -1;
2371 /** Given <b>payload_len</b> bytes at <b>payload</b>, starting with an address
2372 * encoded as by append_address_to_payload(), try to decode the address into
2373 * *<b>addr_out</b>. Return the next byte in the payload after the address on
2374 * success, or NULL on failure. */
2375 const char *
2376 decode_address_from_payload(tor_addr_t *addr_out, const char *payload,
2377 int payload_len)
2379 if (payload_len < 2)
2380 return NULL;
2381 if (payload_len < 2+(uint8_t)payload[1])
2382 return NULL;
2384 switch (payload[0]) {
2385 case RESOLVED_TYPE_IPV4:
2386 if (payload[1] != 4)
2387 return NULL;
2388 tor_addr_from_ipv4n(addr_out, get_uint32(payload+2));
2389 break;
2390 case RESOLVED_TYPE_IPV6:
2391 if (payload[1] != 16)
2392 return NULL;
2393 tor_addr_from_ipv6_bytes(addr_out, payload+2);
2394 break;
2395 default:
2396 tor_addr_make_unspec(addr_out);
2397 break;
2399 return payload + 2 + (uint8_t)payload[1];
2402 /** Fail with an assert if the active circuits ring on <b>orconn</b> is
2403 * corrupt. */
2404 void
2405 assert_active_circuits_ok(or_connection_t *orconn)
2407 circuit_t *head = orconn->active_circuits;
2408 circuit_t *cur = head;
2409 int n = 0;
2410 if (! head)
2411 return;
2412 do {
2413 circuit_t *next = *next_circ_on_conn_p(cur, orconn);
2414 circuit_t *prev = *prev_circ_on_conn_p(cur, orconn);
2415 cell_ewma_t *ewma;
2416 tor_assert(next);
2417 tor_assert(prev);
2418 tor_assert(*next_circ_on_conn_p(prev, orconn) == cur);
2419 tor_assert(*prev_circ_on_conn_p(next, orconn) == cur);
2420 if (orconn == cur->n_conn) {
2421 ewma = &cur->n_cell_ewma;
2422 tor_assert(!ewma->is_for_p_conn);
2423 } else {
2424 ewma = &TO_OR_CIRCUIT(cur)->p_cell_ewma;
2425 tor_assert(ewma->is_for_p_conn);
2427 tor_assert(ewma->heap_index != -1);
2428 tor_assert(ewma == smartlist_get(orconn->active_circuit_pqueue,
2429 ewma->heap_index));
2430 n++;
2431 cur = next;
2432 } while (cur != head);
2434 tor_assert(n == smartlist_len(orconn->active_circuit_pqueue));
2437 /** Return 1 if we shouldn't restart reading on this circuit, even if
2438 * we get a SENDME. Else return 0.
2440 static int
2441 circuit_queue_streams_are_blocked(circuit_t *circ)
2443 if (CIRCUIT_IS_ORIGIN(circ)) {
2444 return circ->streams_blocked_on_n_conn;
2445 } else {
2446 return circ->streams_blocked_on_p_conn;