From 6a516dfdd38c05c7e535717cae766250a05b2471 Mon Sep 17 00:00:00 2001 From: Roger Dingledine Date: Sun, 21 Nov 2004 07:43:12 +0000 Subject: [PATCH] be more greedy about filling up all relay cells. this may have some bugs in it still. and it may end up not being what we want to do. svn:r2928 --- src/or/circuitbuild.c | 8 ++++--- src/or/config.c | 4 ++-- src/or/connection.c | 55 ++++++++++++++++++++++++++++++++++-------------- src/or/connection_edge.c | 7 +++--- src/or/or.h | 12 ++++++----- src/or/relay.c | 15 +++++++------ src/or/rendclient.c | 8 +++---- 7 files changed, 70 insertions(+), 39 deletions(-) diff --git a/src/or/circuitbuild.c b/src/or/circuitbuild.c index a98bca273f..9b5c834316 100644 --- a/src/or/circuitbuild.c +++ b/src/or/circuitbuild.c @@ -298,11 +298,13 @@ circuit_t *circuit_establish_circuit(uint8_t purpose, /** Find circuits that are waiting on or_conn to become open, * if any, and get them to send their create cells forward. + * + * Status is 1 if connect succeeded, or 0 if connect failed. */ -void circuit_n_conn_done(connection_t *or_conn, int success) { +void circuit_n_conn_done(connection_t *or_conn, int status) { circuit_t *circ; - log_fn(LOG_DEBUG,"or_conn to %s, success=%d", or_conn->nickname, success); + log_fn(LOG_DEBUG,"or_conn to %s, status=%d", or_conn->nickname, status); for(circ=global_circuitlist;circ;circ = circ->next) { if (circ->marked_for_close) @@ -312,7 +314,7 @@ void circuit_n_conn_done(connection_t *or_conn, int success) { circ->n_port == or_conn->port && !memcmp(or_conn->identity_digest, circ->n_conn_id_digest, DIGEST_LEN)) { tor_assert(circ->state == CIRCUIT_STATE_OR_WAIT); - if(!success) { /* or_conn failed; close circ */ + if(!status) { /* or_conn failed; close circ */ log_fn(LOG_INFO,"or_conn failed. Closing circ."); circuit_mark_for_close(circ); continue; diff --git a/src/or/config.c b/src/or/config.c index 0c1f669b10..cc5760f309 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -95,8 +95,8 @@ static config_var_t config_vars[] = { VAR("Address", STRING, Address, NULL), VAR("AllowUnverifiedNodes",CSV, AllowUnverifiedNodes, "middle,rendezvous"), VAR("AuthoritativeDirectory",BOOL, AuthoritativeDir, "0"), - VAR("BandwidthRate", MEMUNIT, BandwidthRate, "780 KB"), - VAR("BandwidthBurst", MEMUNIT, BandwidthBurst, "48 MB"), + VAR("BandwidthRate", MEMUNIT, BandwidthRate, "780 KB"), + VAR("BandwidthBurst", MEMUNIT, BandwidthBurst, "48 MB"), VAR("ClientOnly", BOOL, ClientOnly, "0"), VAR("ContactInfo", STRING, ContactInfo, NULL), VAR("ControlPort", UINT, ControlPort, "0"), diff --git a/src/or/connection.c b/src/or/connection.c index cec3773570..e9d39bc509 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -87,8 +87,8 @@ static int connection_handle_listener_read(connection_t *conn, int new_type); static int connection_receiver_bucket_should_increase(connection_t *conn); static int connection_finished_flushing(connection_t *conn); static int connection_finished_connecting(connection_t *conn); -static int connection_read_to_buf(connection_t *conn); -static int connection_process_inbuf(connection_t *conn); +static int connection_read_to_buf(connection_t *conn, int *max_to_read); +static int connection_process_inbuf(connection_t *conn, int package_partial); static int connection_bucket_read_limit(connection_t *conn); /**************************************************************/ @@ -803,6 +803,7 @@ static int connection_receiver_bucket_should_increase(connection_t *conn) { * return 0. */ int connection_handle_read(connection_t *conn) { + int max_to_read=-1, try_to_read; conn->timestamp_lastread = time(NULL); @@ -817,16 +818,19 @@ int connection_handle_read(connection_t *conn) { return connection_handle_listener_read(conn, CONN_TYPE_CONTROL); } - if(connection_read_to_buf(conn) < 0) { +loop_again: + try_to_read = max_to_read; + tor_assert(!conn->marked_for_close); + if (connection_read_to_buf(conn, &max_to_read) < 0) { /* There's a read error; kill the connection.*/ connection_close_immediate(conn); /* Don't flush; connection is dead. */ - if(conn->type == CONN_TYPE_AP || conn->type == CONN_TYPE_EXIT) { + if (conn->type == CONN_TYPE_AP || conn->type == CONN_TYPE_EXIT) { connection_edge_end(conn, (char)(connection_state_is_open(conn) ? END_STREAM_REASON_MISC : END_STREAM_REASON_CONNECTFAILED), conn->cpath_layer); } connection_mark_for_close(conn); - if(conn->type == CONN_TYPE_DIR && + if (conn->type == CONN_TYPE_DIR && conn->state == DIR_CONN_STATE_CONNECTING) { /* it's a directory server and connecting failed: forget about this router */ /* XXX I suspect pollerr may make Windows not get to this point. :( */ @@ -839,8 +843,17 @@ int connection_handle_read(connection_t *conn) { } return -1; } - if(connection_process_inbuf(conn) < 0) { -// log_fn(LOG_DEBUG,"connection_process_inbuf returned -1."); + if (CONN_IS_EDGE(conn) && + try_to_read != max_to_read) { + /* instruct it not to try to package partial cells. */ + if (connection_process_inbuf(conn, 0) < 0) { + return -1; + } + if (connection_is_reading(conn) && !conn->inbuf_reached_eof) + goto loop_again; /* try reading again, in case more is here now */ + } + /* one last try, packaging partial cells and all. */ + if (connection_process_inbuf(conn, 1) < 0) { return -1; } return 0; @@ -850,14 +863,19 @@ int connection_handle_read(connection_t *conn) { * directly or via TLS. Reduce the token buckets by the number of * bytes read. * + * If *max_to_read is -1, then decide it ourselves, else go with the + * value passed to us. When returning, if it's changed, subtract the + * number of bytes we read from *max_to_read. + * * Return -1 if we want to break conn, else return 0. */ -static int connection_read_to_buf(connection_t *conn) { - int result; - int at_most; +static int connection_read_to_buf(connection_t *conn, int *max_to_read) { + int result, at_most = *max_to_read; - /* how many bytes are we allowed to read? */ - at_most = connection_bucket_read_limit(conn); + if(at_most == -1) { /* we need to initialize it */ + /* how many bytes are we allowed to read? */ + at_most = connection_bucket_read_limit(conn); + } if(connection_speaks_cells(conn) && conn->state != OR_CONN_STATE_CONNECTING) { if(conn->state == OR_CONN_STATE_HANDSHAKING) { @@ -898,7 +916,11 @@ static int connection_read_to_buf(connection_t *conn) { return -1; } - if(result > 0 && !is_local_IP(conn->addr)) { /* remember it */ + if (result > 0) { /* change *max_to_read */ + *max_to_read = at_most - result; + } + + if (result > 0 && !is_local_IP(conn->addr)) { /* remember it */ rep_hist_note_bytes_read(result, time(NULL)); connection_read_bucket_decrement(conn, result); } @@ -1250,9 +1272,10 @@ int connection_send_destroy(uint16_t circ_id, connection_t *conn) { /** Process new bytes that have arrived on conn-\>inbuf. * * This function just passes conn to the connection-specific - * connection_*_process_inbuf() function. + * connection_*_process_inbuf() function. It also passes in + * package_partial if wanted. */ -static int connection_process_inbuf(connection_t *conn) { +static int connection_process_inbuf(connection_t *conn, int package_partial) { tor_assert(conn); @@ -1261,7 +1284,7 @@ static int connection_process_inbuf(connection_t *conn) { return connection_or_process_inbuf(conn); case CONN_TYPE_EXIT: case CONN_TYPE_AP: - return connection_edge_process_inbuf(conn); + return connection_edge_process_inbuf(conn, package_partial); case CONN_TYPE_DIR: return connection_dir_process_inbuf(conn); case CONN_TYPE_DNSWORKER: diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c index 07e85b11b3..7a79bab63e 100644 --- a/src/or/connection_edge.c +++ b/src/or/connection_edge.c @@ -32,7 +32,7 @@ static int connection_ap_handshake_process_socks(connection_t *conn); * Mark and return -1 if there was an unexpected error with the conn, * else return 0. */ -int connection_edge_process_inbuf(connection_t *conn) { +int connection_edge_process_inbuf(connection_t *conn, int package_partial) { tor_assert(conn); tor_assert(conn->type == CONN_TYPE_AP || conn->type == CONN_TYPE_EXIT); @@ -81,7 +81,7 @@ int connection_edge_process_inbuf(connection_t *conn) { log_fn(LOG_WARN,"called with package_window %d. Tell Roger.", conn->package_window); return 0; } - if(connection_edge_package_raw_inbuf(conn) < 0) { + if(connection_edge_package_raw_inbuf(conn, package_partial) < 0) { connection_edge_end(conn, END_STREAM_REASON_MISC, conn->cpath_layer); connection_mark_for_close(conn); return -1; @@ -221,7 +221,8 @@ int connection_edge_finished_connecting(connection_t *conn) return 0; /* circuit is closed, don't continue */ } tor_assert(conn->package_window > 0); - return connection_edge_process_inbuf(conn); /* in case the server has written anything */ + /* in case the server has written anything */ + return connection_edge_process_inbuf(conn, 1); } /** How many times do we retry a general-purpose stream (detach it from diff --git a/src/or/or.h b/src/or/or.h index 2be3b1a5e3..d5825f3b75 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -180,6 +180,8 @@ typedef enum { #define CONN_TYPE_CONTROL 13 #define _CONN_TYPE_MAX 13 +#define CONN_IS_EDGE(x) ((x)->type == CONN_TYPE_EXIT || (x)->type == CONN_TYPE_AP) + /** State for any listener connection. */ #define LISTENER_STATE_READY 0 @@ -1034,7 +1036,7 @@ void circuit_rep_hist_note_result(circuit_t *circ); void circuit_dump_by_conn(connection_t *conn, int severity); circuit_t *circuit_establish_circuit(uint8_t purpose, const char *exit_digest); -void circuit_n_conn_done(connection_t *or_conn, int success); +void circuit_n_conn_done(connection_t *or_conn, int status); int circuit_send_next_onion_skin(circuit_t *circ); int circuit_extend(cell_t *cell, circuit_t *circ); int circuit_init_cpath_crypto(crypt_path_t *cpath, char *key_data, int reverse); @@ -1193,7 +1195,7 @@ int connection_or_nonopen_was_started_here(connection_t *conn); /********************************* connection_edge.c ***************************/ -int connection_edge_process_inbuf(connection_t *conn); +int connection_edge_process_inbuf(connection_t *conn, int package_partial); int connection_edge_destroy(uint16_t circ_id, connection_t *conn); int connection_edge_end(connection_t *conn, char reason, crypt_path_t *cpath_layer); int connection_edge_finished_flushing(connection_t *conn); @@ -1204,7 +1206,7 @@ int connection_ap_handshake_send_resolve(connection_t *ap_conn, circuit_t *circ) int connection_ap_make_bridge(char *address, uint16_t port); void connection_ap_handshake_socks_reply(connection_t *conn, char *reply, - size_t replylen, int success); + size_t replylen, int status); void connection_ap_handshake_socks_resolved(connection_t *conn, int answer_type, size_t answer_len, @@ -1405,7 +1407,7 @@ void relay_header_unpack(relay_header_t *dest, const char *src); int connection_edge_send_command(connection_t *fromconn, circuit_t *circ, int relay_command, const char *payload, size_t payload_len, crypt_path_t *cpath_layer); -int connection_edge_package_raw_inbuf(connection_t *conn); +int connection_edge_package_raw_inbuf(connection_t *conn, int package_partial); void connection_edge_consider_sending_sendme(connection_t *conn); extern uint64_t stats_n_data_cells_packaged; @@ -1439,7 +1441,7 @@ void rend_client_refetch_renddesc(const char *query); int rend_client_remove_intro_point(char *failed_intro, const char *query); int rend_client_rendezvous_acked(circuit_t *circ, const char *request, size_t request_len); int rend_client_receive_rendezvous(circuit_t *circ, const char *request, size_t request_len); -void rend_client_desc_fetched(char *query, int success); +void rend_client_desc_fetched(char *query, int status); char *rend_client_get_random_intro(char *query); int rend_parse_rendezvous_address(char *address); diff --git a/src/or/relay.c b/src/or/relay.c index 5ce9eaa35e..69ca8ce85a 100644 --- a/src/or/relay.c +++ b/src/or/relay.c @@ -590,7 +590,7 @@ connection_edge_process_relay_cell_not_open( connection_ap_handshake_socks_reply(conn, NULL, 0, 1); conn->socks_request->has_finished = 1; /* handle anything that might have queued */ - if (connection_edge_package_raw_inbuf(conn) < 0) { + if (connection_edge_package_raw_inbuf(conn, 1) < 0) { connection_edge_end(conn, END_STREAM_REASON_MISC, conn->cpath_layer); connection_mark_for_close(conn); return 0; @@ -803,7 +803,7 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, conn->package_window += STREAMWINDOW_INCREMENT; log_fn(LOG_DEBUG,"stream-level sendme, packagewindow now %d.", conn->package_window); connection_start_reading(conn); - connection_edge_package_raw_inbuf(conn); /* handle whatever might still be on the inbuf */ + connection_edge_package_raw_inbuf(conn, 1); /* handle whatever might still be on the inbuf */ return 0; case RELAY_COMMAND_RESOLVE: if (layer_hint) { @@ -854,7 +854,7 @@ uint64_t stats_n_data_bytes_received = 0; * * Return -1 if conn should be marked for close, else return 0. */ -int connection_edge_package_raw_inbuf(connection_t *conn) { +int connection_edge_package_raw_inbuf(connection_t *conn, int package_partial) { size_t amount_to_process, length; char payload[CELL_PAYLOAD_SIZE]; circuit_t *circ; @@ -881,10 +881,13 @@ repeat_connection_edge_package_raw_inbuf: amount_to_process = buf_datalen(conn->inbuf); - if(!amount_to_process) + if (!amount_to_process) return 0; - if(amount_to_process > RELAY_PAYLOAD_SIZE) { + if (!package_partial && amount_to_process < RELAY_PAYLOAD_SIZE) + return 0; + + if (amount_to_process > RELAY_PAYLOAD_SIZE) { length = RELAY_PAYLOAD_SIZE; } else { length = amount_to_process; @@ -982,7 +985,7 @@ circuit_resume_edge_reading_helper(connection_t *conn, (layer_hint && conn->package_window > 0 && conn->cpath_layer == layer_hint)) { connection_start_reading(conn); /* handle whatever might still be on the inbuf */ - connection_edge_package_raw_inbuf(conn); + connection_edge_package_raw_inbuf(conn, 1); /* If the circuit won't accept any more data, return without looking * at any more of the streams. Any connections that should be stopped diff --git a/src/or/rendclient.c b/src/or/rendclient.c index d4bd9f6bfe..8c7519853b 100644 --- a/src/or/rendclient.c +++ b/src/or/rendclient.c @@ -369,10 +369,10 @@ rend_client_receive_rendezvous(circuit_t *circ, const char *request, size_t requ } /** Find all the apconns in state AP_CONN_STATE_RENDDESC_WAIT that - * are waiting on query. If success==1, move them to the next state. - * If success==0, fail them. + * are waiting on query. If status==1, move them to the next state. + * If status==0, fail them. */ -void rend_client_desc_fetched(char *query, int success) { +void rend_client_desc_fetched(char *query, int status) { connection_t **carray; connection_t *conn; int n, i; @@ -388,7 +388,7 @@ void rend_client_desc_fetched(char *query, int success) { if (rend_cmp_service_ids(conn->rend_query, query)) continue; /* great, this guy was waiting */ - if(success || + if(status!=0 || rend_cache_lookup_entry(conn->rend_query, &entry) == 1) { /* either this fetch worked, or it failed but there was a * valid entry from before which we should reuse */ -- 2.11.4.GIT