AUTO_LT_SYNC
[tore.git] / libtorrent / src / peer_connection.cpp
blob6753b18318ea53df888c2bdf265d6cdd809e9b44
1 /*
3 Copyright (c) 2003, Arvid Norberg
4 All rights reserved.
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/pch.hpp"
35 #include <vector>
36 #include <iostream>
37 #include <iomanip>
38 #include <limits>
39 #include <boost/bind.hpp>
41 #include "libtorrent/peer_connection.hpp"
42 #include "libtorrent/identify_client.hpp"
43 #include "libtorrent/entry.hpp"
44 #include "libtorrent/bencode.hpp"
45 #include "libtorrent/alert_types.hpp"
46 #include "libtorrent/invariant_check.hpp"
47 #include "libtorrent/io.hpp"
48 #include "libtorrent/file.hpp"
49 #include "libtorrent/version.hpp"
50 #include "libtorrent/extensions.hpp"
51 #include "libtorrent/aux_/session_impl.hpp"
52 #include "libtorrent/policy.hpp"
53 #include "libtorrent/socket_type.hpp"
54 #include "libtorrent/assert.hpp"
56 //#define TORRENT_CORRUPT_DATA
58 using boost::bind;
59 using boost::shared_ptr;
60 using libtorrent::aux::session_impl;
62 namespace libtorrent
64 // outbound connection
65 peer_connection::peer_connection(
66 session_impl& ses
67 , boost::weak_ptr<torrent> tor
68 , shared_ptr<socket_type> s
69 , tcp::endpoint const& endp
70 , policy::peer* peerinfo)
72 #ifndef NDEBUG
73 m_last_choke(time_now() - hours(1))
75 #endif
76 m_ses(ses)
77 , m_max_out_request_queue(m_ses.settings().max_out_request_queue)
78 , m_last_piece(time_now())
79 , m_last_request(time_now())
80 , m_last_incoming_request(min_time())
81 , m_last_unchoke(min_time())
82 , m_last_receive(time_now())
83 , m_last_sent(time_now())
84 , m_requested(min_time())
85 , m_timeout_extend(0)
86 , m_remote_dl_update(time_now())
87 , m_connect(time_now())
88 , m_became_uninterested(time_now())
89 , m_became_uninteresting(time_now())
90 , m_free_upload(0)
91 , m_downloaded_at_last_unchoke(0)
92 , m_disk_recv_buffer(ses, 0)
93 , m_socket(s)
94 , m_remote(endp)
95 , m_torrent(tor)
96 , m_num_pieces(0)
97 , m_timeout(m_ses.settings().peer_timeout)
98 , m_packet_size(0)
99 , m_recv_pos(0)
100 , m_disk_recv_buffer_size(0)
101 , m_reading_bytes(0)
102 , m_num_invalid_requests(0)
103 , m_priority(1)
104 , m_upload_limit(bandwidth_limit::inf)
105 , m_download_limit(bandwidth_limit::inf)
106 , m_peer_info(peerinfo)
107 , m_speed(slow)
108 , m_connection_ticket(-1)
109 , m_remote_bytes_dled(0)
110 , m_remote_dl_rate(0)
111 , m_outstanding_writing_bytes(0)
112 , m_download_rate_peak(0)
113 , m_upload_rate_peak(0)
114 , m_rtt(0)
115 , m_prefer_whole_pieces(0)
116 , m_desired_queue_size(2)
117 , m_fast_reconnect(false)
118 , m_active(true)
119 , m_peer_interested(false)
120 , m_peer_choked(true)
121 , m_interesting(false)
122 , m_choked(true)
123 , m_failed(false)
124 , m_ignore_bandwidth_limits(false)
125 , m_have_all(false)
126 , m_disconnecting(false)
127 , m_connecting(true)
128 , m_queued(true)
129 , m_request_large_blocks(false)
130 , m_upload_only(false)
131 , m_snubbed(false)
132 , m_bitfield_received(false)
133 #ifndef NDEBUG
134 , m_in_constructor(true)
135 , m_disconnect_started(false)
136 , m_initialized(false)
137 #endif
139 m_channel_state[upload_channel] = peer_info::bw_idle;
140 m_channel_state[download_channel] = peer_info::bw_idle;
142 TORRENT_ASSERT(peerinfo == 0 || peerinfo->banned == false);
143 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
144 std::fill(m_country, m_country + 2, 0);
145 #ifndef TORRENT_DISABLE_GEO_IP
146 if (m_ses.has_country_db())
148 char const *country = m_ses.country_for_ip(m_remote.address());
149 if (country != 0)
151 m_country[0] = country[0];
152 m_country[1] = country[1];
155 #endif
156 #endif
157 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
158 m_logger = m_ses.create_log(m_remote.address().to_string() + "_"
159 + boost::lexical_cast<std::string>(m_remote.port()), m_ses.listen_port());
160 (*m_logger) << "*** OUTGOING CONNECTION\n";
161 #endif
162 #ifndef NDEBUG
163 piece_failed = false;
164 #endif
165 #ifndef TORRENT_DISABLE_GEO_IP
166 m_inet_as_name = m_ses.as_name_for_ip(m_remote.address());
167 #endif
169 std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
172 // incoming connection
173 peer_connection::peer_connection(
174 session_impl& ses
175 , shared_ptr<socket_type> s
176 , tcp::endpoint const& endp
177 , policy::peer* peerinfo)
179 #ifndef NDEBUG
180 m_last_choke(time_now() - hours(1))
182 #endif
183 m_ses(ses)
184 , m_max_out_request_queue(m_ses.settings().max_out_request_queue)
185 , m_last_piece(time_now())
186 , m_last_request(time_now())
187 , m_last_incoming_request(min_time())
188 , m_last_unchoke(min_time())
189 , m_last_receive(time_now())
190 , m_last_sent(time_now())
191 , m_requested(min_time())
192 , m_timeout_extend(0)
193 , m_remote_dl_update(time_now())
194 , m_connect(time_now())
195 , m_became_uninterested(time_now())
196 , m_became_uninteresting(time_now())
197 , m_free_upload(0)
198 , m_downloaded_at_last_unchoke(0)
199 , m_disk_recv_buffer(ses, 0)
200 , m_socket(s)
201 , m_remote(endp)
202 , m_num_pieces(0)
203 , m_timeout(m_ses.settings().peer_timeout)
204 , m_packet_size(0)
205 , m_recv_pos(0)
206 , m_disk_recv_buffer_size(0)
207 , m_reading_bytes(0)
208 , m_num_invalid_requests(0)
209 , m_priority(1)
210 , m_upload_limit(bandwidth_limit::inf)
211 , m_download_limit(bandwidth_limit::inf)
212 , m_peer_info(peerinfo)
213 , m_speed(slow)
214 , m_connection_ticket(-1)
215 , m_remote_bytes_dled(0)
216 , m_remote_dl_rate(0)
217 , m_outstanding_writing_bytes(0)
218 , m_download_rate_peak(0)
219 , m_upload_rate_peak(0)
220 , m_rtt(0)
221 , m_prefer_whole_pieces(0)
222 , m_desired_queue_size(2)
223 , m_fast_reconnect(false)
224 , m_active(false)
225 , m_peer_interested(false)
226 , m_peer_choked(true)
227 , m_interesting(false)
228 , m_choked(true)
229 , m_failed(false)
230 , m_ignore_bandwidth_limits(false)
231 , m_have_all(false)
232 , m_disconnecting(false)
233 , m_connecting(false)
234 , m_queued(false)
235 , m_request_large_blocks(false)
236 , m_upload_only(false)
237 , m_snubbed(false)
238 , m_bitfield_received(false)
239 #ifndef NDEBUG
240 , m_in_constructor(true)
241 , m_disconnect_started(false)
242 , m_initialized(false)
243 #endif
245 m_channel_state[upload_channel] = peer_info::bw_idle;
246 m_channel_state[download_channel] = peer_info::bw_idle;
248 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
249 std::fill(m_country, m_country + 2, 0);
250 #ifndef TORRENT_DISABLE_GEO_IP
251 if (m_ses.has_country_db())
253 char const *country = m_ses.country_for_ip(m_remote.address());
254 if (country != 0)
256 m_country[0] = country[0];
257 m_country[1] = country[1];
260 #endif
261 #endif
263 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
264 error_code ec;
265 TORRENT_ASSERT(m_socket->remote_endpoint(ec) == m_remote || ec);
266 m_logger = m_ses.create_log(remote().address().to_string(ec) + "_"
267 + boost::lexical_cast<std::string>(remote().port()), m_ses.listen_port());
268 (*m_logger) << "*** INCOMING CONNECTION\n";
269 #endif
271 #ifndef TORRENT_DISABLE_GEO_IP
272 m_inet_as_name = m_ses.as_name_for_ip(m_remote.address());
273 #endif
274 #ifndef NDEBUG
275 piece_failed = false;
276 #endif
277 std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
280 bool peer_connection::unchoke_compare(boost::intrusive_ptr<peer_connection const> const& p) const
282 TORRENT_ASSERT(p);
283 peer_connection const& rhs = *p;
285 // first compare how many bytes they've sent us
286 size_type c1 = m_statistics.total_payload_download() - m_downloaded_at_last_unchoke;
287 size_type c2 = rhs.m_statistics.total_payload_download() - rhs.m_downloaded_at_last_unchoke;
288 if (c1 > c2) return true;
289 if (c1 < c2) return false;
291 // if they are equal, compare how much we have uploaded
292 if (m_peer_info) c1 = m_peer_info->total_upload();
293 else c1 = m_statistics.total_payload_upload();
294 if (rhs.m_peer_info) c2 = rhs.m_peer_info->total_upload();
295 else c2 = rhs.m_statistics.total_payload_upload();
297 return c1 < c2;
300 void peer_connection::reset_choke_counters()
302 m_downloaded_at_last_unchoke = m_statistics.total_payload_download();
305 void peer_connection::start()
307 TORRENT_ASSERT(m_peer_info == 0 || m_peer_info->connection == this);
308 boost::shared_ptr<torrent> t = m_torrent.lock();
310 if (!t)
312 tcp::socket::non_blocking_io ioc(true);
313 error_code ec;
314 m_socket->io_control(ioc, ec);
315 if (ec)
317 disconnect(ec.message().c_str());
318 return;
320 m_remote = m_socket->remote_endpoint(ec);
321 if (ec)
323 disconnect(ec.message().c_str());
324 return;
326 if (m_remote.address().is_v4())
327 m_socket->set_option(type_of_service(m_ses.settings().peer_tos), ec);
329 else if (t->ready_for_connections())
331 init();
335 void peer_connection::update_interest()
337 boost::shared_ptr<torrent> t = m_torrent.lock();
338 TORRENT_ASSERT(t);
340 // if m_have_piece is 0, it means the connections
341 // have not been initialized yet. The interested
342 // flag will be updated once they are.
343 if (m_have_piece.size() == 0) return;
344 if (!t->ready_for_connections()) return;
346 bool interested = false;
347 if (!t->is_finished())
349 piece_picker const& p = t->picker();
350 int num_pieces = p.num_pieces();
351 for (int j = 0; j != num_pieces; ++j)
353 if (!p.have_piece(j)
354 && t->piece_priority(j) > 0
355 && m_have_piece[j])
357 interested = true;
358 break;
364 if (!interested) send_not_interested();
365 else t->get_policy().peer_is_interesting(*this);
367 // may throw an asio error if socket has disconnected
368 catch (std::exception&) {}
370 TORRENT_ASSERT(in_handshake() || is_interesting() == interested);
373 #ifndef TORRENT_DISABLE_EXTENSIONS
374 void peer_connection::add_extension(boost::shared_ptr<peer_plugin> ext)
376 m_extensions.push_back(ext);
378 #endif
380 void peer_connection::send_allowed_set()
382 INVARIANT_CHECK;
384 boost::shared_ptr<torrent> t = m_torrent.lock();
385 TORRENT_ASSERT(t);
387 int num_allowed_pieces = m_ses.settings().allowed_fast_set_size;
388 int num_pieces = t->torrent_file().num_pieces();
390 if (num_allowed_pieces >= num_pieces)
392 for (int i = 0; i < num_pieces; ++i)
394 #ifdef TORRENT_VERBOSE_LOGGING
395 (*m_logger) << time_now_string()
396 << " ==> ALLOWED_FAST [ " << i << " ]\n";
397 #endif
398 write_allow_fast(i);
399 m_accept_fast.insert(i);
401 return;
404 std::string x;
405 address const& addr = m_remote.address();
406 if (addr.is_v4())
408 address_v4::bytes_type bytes = addr.to_v4().to_bytes();
409 x.assign((char*)&bytes[0], bytes.size());
411 else
413 address_v6::bytes_type bytes = addr.to_v6().to_bytes();
414 x.assign((char*)&bytes[0], bytes.size());
416 x.append((char*)&t->torrent_file().info_hash()[0], 20);
418 sha1_hash hash = hasher(&x[0], x.size()).final();
419 for (;;)
421 char* p = (char*)&hash[0];
422 for (int i = 0; i < 5; ++i)
424 int piece = detail::read_uint32(p) % num_pieces;
425 if (m_accept_fast.find(piece) == m_accept_fast.end())
427 #ifdef TORRENT_VERBOSE_LOGGING
428 (*m_logger) << time_now_string()
429 << " ==> ALLOWED_FAST [ " << piece << " ]\n";
430 #endif
431 write_allow_fast(piece);
432 m_accept_fast.insert(piece);
433 if (int(m_accept_fast.size()) >= num_allowed_pieces
434 || int(m_accept_fast.size()) == num_pieces) return;
437 hash = hasher((char*)&hash[0], 20).final();
441 void peer_connection::on_metadata_impl()
443 boost::shared_ptr<torrent> t = associated_torrent().lock();
444 m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all);
445 m_num_pieces = m_have_piece.count();
446 if (m_num_pieces == int(m_have_piece.size()))
448 #ifdef TORRENT_VERBOSE_LOGGING
449 (*m_logger) << time_now_string()
450 << " *** on_metadata(): THIS IS A SEED ***\n";
451 #endif
452 // if this is a web seed. we don't have a peer_info struct
453 if (m_peer_info) m_peer_info->seed = true;
454 m_upload_only = true;
456 t->peer_has_all();
457 disconnect_if_redundant();
458 if (m_disconnecting) return;
460 on_metadata();
461 if (m_disconnecting) return;
463 if (!t->is_finished())
464 t->get_policy().peer_is_interesting(*this);
466 return;
468 TORRENT_ASSERT(!m_have_all);
470 on_metadata();
471 if (m_disconnecting) return;
473 // let the torrent know which pieces the
474 // peer has
475 // if we're a seed, we don't keep track of piece availability
476 bool interesting = false;
477 if (!t->is_seed())
479 t->peer_has(m_have_piece);
481 for (int i = 0; i < (int)m_have_piece.size(); ++i)
483 if (m_have_piece[i])
485 if (!t->have_piece(i) && t->picker().piece_priority(i) != 0)
486 interesting = true;
491 if (interesting) t->get_policy().peer_is_interesting(*this);
492 else if (upload_only()) disconnect("upload to upload connections");
495 void peer_connection::init()
497 INVARIANT_CHECK;
499 boost::shared_ptr<torrent> t = m_torrent.lock();
500 TORRENT_ASSERT(t);
501 TORRENT_ASSERT(t->valid_metadata());
502 TORRENT_ASSERT(t->ready_for_connections());
504 m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all);
506 if (m_have_all) m_num_pieces = t->torrent_file().num_pieces();
507 #ifndef NDEBUG
508 m_initialized = true;
509 #endif
510 // now that we have a piece_picker,
511 // update it with this peer's pieces
513 TORRENT_ASSERT(m_num_pieces == m_have_piece.count());
515 if (m_num_pieces == int(m_have_piece.size()))
517 #ifdef TORRENT_VERBOSE_LOGGING
518 (*m_logger) << " *** THIS IS A SEED ***\n";
519 #endif
520 // if this is a web seed. we don't have a peer_info struct
521 if (m_peer_info) m_peer_info->seed = true;
522 m_upload_only = true;
524 t->peer_has_all();
525 if (t->is_finished()) send_not_interested();
526 else t->get_policy().peer_is_interesting(*this);
527 return;
530 // if we're a seed, we don't keep track of piece availability
531 if (!t->is_seed())
533 t->peer_has(m_have_piece);
534 bool interesting = false;
535 for (int i = 0; i < int(m_have_piece.size()); ++i)
537 if (m_have_piece[i])
539 // if the peer has a piece and we don't, the peer is interesting
540 if (!t->have_piece(i)
541 && t->picker().piece_priority(i) != 0)
542 interesting = true;
545 if (interesting) t->get_policy().peer_is_interesting(*this);
546 else send_not_interested();
548 else
550 update_interest();
554 peer_connection::~peer_connection()
556 // INVARIANT_CHECK;
557 TORRENT_ASSERT(!m_in_constructor);
558 TORRENT_ASSERT(m_disconnecting);
559 TORRENT_ASSERT(m_disconnect_started);
561 m_disk_recv_buffer_size = 0;
563 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
564 if (m_logger)
566 (*m_logger) << time_now_string()
567 << " *** CONNECTION CLOSED\n";
569 #endif
570 TORRENT_ASSERT(!m_ses.has_peer(this));
571 #ifndef NDEBUG
572 for (aux::session_impl::torrent_map::const_iterator i = m_ses.m_torrents.begin()
573 , end(m_ses.m_torrents.end()); i != end; ++i)
574 TORRENT_ASSERT(!i->second->has_peer(this));
575 if (m_peer_info)
576 TORRENT_ASSERT(m_peer_info->connection == 0);
578 boost::shared_ptr<torrent> t = m_torrent.lock();
579 #endif
582 int peer_connection::picker_options() const
584 int ret = 0;
585 boost::shared_ptr<torrent> t = m_torrent.lock();
586 TORRENT_ASSERT(t);
587 if (!t) return 0;
589 if (t->is_sequential_download())
591 ret |= piece_picker::sequential;
593 else if (t->num_have() < t->settings().initial_picker_threshold)
595 // if we have fewer pieces than a certain threshols
596 // don't pick rare pieces, just pick random ones,
597 // and prioritize finishing them
598 ret |= piece_picker::prioritize_partials;
600 else
602 ret |= piece_picker::rarest_first;
605 if (m_snubbed)
607 // snubbed peers should request
608 // the common pieces first, just to make
609 // it more likely for all snubbed peers to
610 // request blocks from the same piece
611 ret |= piece_picker::reverse;
614 if (t->settings().prioritize_partial_pieces)
615 ret |= piece_picker::prioritize_partials;
617 if (on_parole()) ret |= piece_picker::on_parole
618 | piece_picker::prioritize_partials;
620 // only one of rarest_first, common_first and sequential can be set.
621 TORRENT_ASSERT(bool(ret & piece_picker::rarest_first)
622 + bool(ret & piece_picker::sequential) <= 1);
623 return ret;
626 void peer_connection::fast_reconnect(bool r)
628 if (!peer_info_struct() || peer_info_struct()->fast_reconnects > 1)
629 return;
630 m_fast_reconnect = r;
631 peer_info_struct()->connected = time_now()
632 - seconds(m_ses.settings().min_reconnect_time
633 * m_ses.settings().max_failcount);
634 ++peer_info_struct()->fast_reconnects;
637 void peer_connection::announce_piece(int index)
639 // dont announce during handshake
640 if (in_handshake()) return;
642 // remove suggested pieces that we have
643 std::vector<int>::iterator i = std::find(
644 m_suggested_pieces.begin(), m_suggested_pieces.end(), index);
645 if (i != m_suggested_pieces.end()) m_suggested_pieces.erase(i);
647 if (has_piece(index))
649 // if we got a piece that this peer has
650 // it might have been the last interesting
651 // piece this peer had. We might not be
652 // interested anymore
653 update_interest();
654 if (is_disconnecting()) return;
656 // optimization, don't send have messages
657 // to peers that already have the piece
658 if (!m_ses.settings().send_redundant_have)
660 #ifdef TORRENT_VERBOSE_LOGGING
661 (*m_logger) << time_now_string()
662 << " ==> HAVE [ piece: " << index << " ] SUPRESSED\n";
663 #endif
664 return;
668 #ifdef TORRENT_VERBOSE_LOGGING
669 (*m_logger) << time_now_string()
670 << " ==> HAVE [ piece: " << index << "]\n";
671 #endif
672 write_have(index);
673 #ifndef NDEBUG
674 boost::shared_ptr<torrent> t = m_torrent.lock();
675 TORRENT_ASSERT(t);
676 TORRENT_ASSERT(t->have_piece(index));
677 #endif
680 bool peer_connection::has_piece(int i) const
682 boost::shared_ptr<torrent> t = m_torrent.lock();
683 TORRENT_ASSERT(t);
684 TORRENT_ASSERT(t->valid_metadata());
685 TORRENT_ASSERT(i >= 0);
686 TORRENT_ASSERT(i < t->torrent_file().num_pieces());
687 return m_have_piece[i];
690 std::deque<piece_block> const& peer_connection::request_queue() const
692 return m_request_queue;
695 std::deque<pending_block> const& peer_connection::download_queue() const
697 return m_download_queue;
700 std::deque<peer_request> const& peer_connection::upload_queue() const
702 return m_requests;
705 void peer_connection::add_stat(size_type downloaded, size_type uploaded)
707 m_statistics.add_stat(downloaded, uploaded);
710 bitfield const& peer_connection::get_bitfield() const
712 return m_have_piece;
715 void peer_connection::received_valid_data(int index)
717 INVARIANT_CHECK;
719 #ifndef TORRENT_DISABLE_EXTENSIONS
720 for (extension_list_t::iterator i = m_extensions.begin()
721 , end(m_extensions.end()); i != end; ++i)
723 #ifdef BOOST_NO_EXCEPTIONS
724 (*i)->on_piece_pass(index);
725 #else
726 try { (*i)->on_piece_pass(index); } catch (std::exception&) {}
727 #endif
729 #endif
732 void peer_connection::received_invalid_data(int index)
734 INVARIANT_CHECK;
736 #ifndef TORRENT_DISABLE_EXTENSIONS
737 for (extension_list_t::iterator i = m_extensions.begin()
738 , end(m_extensions.end()); i != end; ++i)
740 #ifdef BOOST_NO_EXCEPTIONS
741 (*i)->on_piece_failed(index);
742 #else
743 try { (*i)->on_piece_failed(index); } catch (std::exception&) {}
744 #endif
746 #endif
747 if (is_disconnecting()) return;
749 if (peer_info_struct())
751 if (m_ses.settings().use_parole_mode)
752 peer_info_struct()->on_parole = true;
754 ++peer_info_struct()->hashfails;
755 boost::int8_t& trust_points = peer_info_struct()->trust_points;
757 // we decrease more than we increase, to keep the
758 // allowed failed/passed ratio low.
759 // TODO: make this limit user settable
760 trust_points -= 2;
761 if (trust_points < -7) trust_points = -7;
765 size_type peer_connection::total_free_upload() const
767 return m_free_upload;
770 void peer_connection::add_free_upload(size_type free_upload)
772 INVARIANT_CHECK;
774 m_free_upload += free_upload;
777 // verifies a piece to see if it is valid (is within a valid range)
778 // and if it can correspond to a request generated by libtorrent.
779 bool peer_connection::verify_piece(const peer_request& p) const
781 INVARIANT_CHECK;
783 boost::shared_ptr<torrent> t = m_torrent.lock();
784 TORRENT_ASSERT(t);
786 TORRENT_ASSERT(t->valid_metadata());
787 torrent_info const& ti = t->torrent_file();
789 return p.piece >= 0
790 && p.piece < t->torrent_file().num_pieces()
791 && p.length > 0
792 && p.start >= 0
793 && (p.length == t->block_size()
794 || (p.length < t->block_size()
795 && p.piece == ti.num_pieces()-1
796 && p.start + p.length == ti.piece_size(p.piece))
797 || (m_request_large_blocks
798 && p.length <= ti.piece_length() * m_prefer_whole_pieces == 0 ?
799 1 : m_prefer_whole_pieces))
800 && p.piece * size_type(ti.piece_length()) + p.start + p.length
801 <= ti.total_size()
802 && (p.start % t->block_size() == 0);
805 void peer_connection::attach_to_torrent(sha1_hash const& ih)
807 INVARIANT_CHECK;
809 TORRENT_ASSERT(!m_disconnecting);
810 TORRENT_ASSERT(m_torrent.expired());
811 boost::weak_ptr<torrent> wpt = m_ses.find_torrent(ih);
812 boost::shared_ptr<torrent> t = wpt.lock();
814 if (t && t->is_aborted())
816 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
817 (*m_logger) << " *** the torrent has been aborted\n";
818 #endif
819 t.reset();
822 if (!t)
824 // we couldn't find the torrent!
825 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
826 (*m_logger) << " *** couldn't find a torrent with the given info_hash: " << ih << "\n";
827 (*m_logger) << " torrents:\n";
828 session_impl::torrent_map const& torrents = m_ses.m_torrents;
829 for (session_impl::torrent_map::const_iterator i = torrents.begin()
830 , end(torrents.end()); i != end; ++i)
832 (*m_logger) << " " << i->second->torrent_file().info_hash() << "\n";
834 #endif
835 disconnect("got invalid info-hash", 2);
836 return;
839 if (t->is_paused())
841 // paused torrents will not accept
842 // incoming connections
843 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
844 (*m_logger) << " rejected connection to paused torrent\n";
845 #endif
846 disconnect("connection rejected bacause torrent is paused");
847 return;
850 TORRENT_ASSERT(m_torrent.expired());
851 // check to make sure we don't have another connection with the same
852 // info_hash and peer_id. If we do. close this connection.
853 #ifndef NDEBUG
856 #endif
857 t->attach_peer(this);
858 #ifndef NDEBUG
860 catch (std::exception& e)
862 std::cout << e.what() << std::endl;
863 TORRENT_ASSERT(false);
865 #endif
866 if (m_disconnecting) return;
867 m_torrent = wpt;
869 TORRENT_ASSERT(!m_torrent.expired());
871 // if the torrent isn't ready to accept
872 // connections yet, we'll have to wait with
873 // our initialization
874 if (t->ready_for_connections()) init();
876 TORRENT_ASSERT(!m_torrent.expired());
878 // assume the other end has no pieces
879 // if we don't have valid metadata yet,
880 // leave the vector unallocated
881 TORRENT_ASSERT(m_num_pieces == 0);
882 m_have_piece.clear_all();
883 TORRENT_ASSERT(!m_torrent.expired());
886 // message handlers
888 // -----------------------------
889 // --------- KEEPALIVE ---------
890 // -----------------------------
892 void peer_connection::incoming_keepalive()
894 INVARIANT_CHECK;
896 #ifdef TORRENT_VERBOSE_LOGGING
897 (*m_logger) << time_now_string() << " <== KEEPALIVE\n";
898 #endif
901 // -----------------------------
902 // ----------- CHOKE -----------
903 // -----------------------------
905 void peer_connection::incoming_choke()
907 INVARIANT_CHECK;
909 boost::shared_ptr<torrent> t = m_torrent.lock();
910 TORRENT_ASSERT(t);
912 #ifndef TORRENT_DISABLE_EXTENSIONS
913 for (extension_list_t::iterator i = m_extensions.begin()
914 , end(m_extensions.end()); i != end; ++i)
916 if ((*i)->on_choke()) return;
918 #endif
919 if (is_disconnecting()) return;
921 #ifdef TORRENT_VERBOSE_LOGGING
922 (*m_logger) << time_now_string() << " <== CHOKE\n";
923 #endif
924 m_peer_choked = true;
926 if (peer_info_struct() == 0 || !peer_info_struct()->on_parole)
928 // if the peer is not in parole mode, clear the queued
929 // up block requests
930 if (!t->is_seed())
932 piece_picker& p = t->picker();
933 for (std::deque<piece_block>::const_iterator i = m_request_queue.begin()
934 , end(m_request_queue.end()); i != end; ++i)
936 // since this piece was skipped, clear it and allow it to
937 // be requested from other peers
938 p.abort_download(*i);
941 m_request_queue.clear();
945 bool match_request(peer_request const& r, piece_block const& b, int block_size)
947 if (b.piece_index != r.piece) return false;
948 if (b.block_index != r.start / block_size) return false;
949 if (r.start % block_size != 0) return false;
950 return true;
953 // -----------------------------
954 // -------- REJECT PIECE -------
955 // -----------------------------
957 void peer_connection::incoming_reject_request(peer_request const& r)
959 INVARIANT_CHECK;
961 boost::shared_ptr<torrent> t = m_torrent.lock();
962 TORRENT_ASSERT(t);
964 #ifndef TORRENT_DISABLE_EXTENSIONS
965 for (extension_list_t::iterator i = m_extensions.begin()
966 , end(m_extensions.end()); i != end; ++i)
968 if ((*i)->on_reject(r)) return;
970 #endif
972 if (is_disconnecting()) return;
974 std::deque<pending_block>::iterator i = std::find_if(
975 m_download_queue.begin(), m_download_queue.end()
976 , bind(match_request, boost::cref(r), bind(&pending_block::block, _1)
977 , t->block_size()));
979 #ifdef TORRENT_VERBOSE_LOGGING
980 (*m_logger) << time_now_string()
981 << " <== REJECT_PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
982 #endif
984 piece_block b(-1, 0);
985 if (i != m_download_queue.end())
987 b = i->block;
988 m_download_queue.erase(i);
990 // if the peer is in parole mode, keep the request
991 if (peer_info_struct() && peer_info_struct()->on_parole)
993 m_request_queue.push_front(b);
995 else if (!t->is_seed())
997 piece_picker& p = t->picker();
998 p.abort_download(b);
1001 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1002 else
1004 (*m_logger) << time_now_string()
1005 << " *** PIECE NOT IN REQUEST QUEUE\n";
1007 #endif
1008 if (has_peer_choked())
1010 // if we're choked and we got a rejection of
1011 // a piece in the allowed fast set, remove it
1012 // from the allow fast set.
1013 std::vector<int>::iterator i = std::find(
1014 m_allowed_fast.begin(), m_allowed_fast.end(), r.piece);
1015 if (i != m_allowed_fast.end()) m_allowed_fast.erase(i);
1017 else
1019 std::vector<int>::iterator i = std::find(m_suggested_pieces.begin()
1020 , m_suggested_pieces.end(), r.piece);
1021 if (i != m_suggested_pieces.end())
1022 m_suggested_pieces.erase(i);
1025 if (m_request_queue.empty() && m_download_queue.size() < 2)
1027 request_a_block(*t, *this);
1028 send_block_requests();
1032 // -----------------------------
1033 // ------- SUGGEST PIECE -------
1034 // -----------------------------
1036 void peer_connection::incoming_suggest(int index)
1038 INVARIANT_CHECK;
1040 #ifdef TORRENT_VERBOSE_LOGGING
1041 (*m_logger) << time_now_string()
1042 << " <== SUGGEST_PIECE [ piece: " << index << " ]\n";
1043 #endif
1044 boost::shared_ptr<torrent> t = m_torrent.lock();
1045 if (!t) return;
1047 #ifndef TORRENT_DISABLE_EXTENSIONS
1048 for (extension_list_t::iterator i = m_extensions.begin()
1049 , end(m_extensions.end()); i != end; ++i)
1051 if ((*i)->on_suggest(index)) return;
1053 #endif
1055 if (is_disconnecting()) return;
1056 if (t->have_piece(index)) return;
1058 if (m_suggested_pieces.size() > 9)
1059 m_suggested_pieces.erase(m_suggested_pieces.begin());
1060 m_suggested_pieces.push_back(index);
1062 #ifdef TORRENT_VERBOSE_LOGGING
1063 (*m_logger) << time_now_string()
1064 << " ** SUGGEST_PIECE [ piece: " << index << " added to set: " << m_suggested_pieces.size() << " ]\n";
1065 #endif
1068 // -----------------------------
1069 // ---------- UNCHOKE ----------
1070 // -----------------------------
1072 void peer_connection::incoming_unchoke()
1074 INVARIANT_CHECK;
1076 boost::shared_ptr<torrent> t = m_torrent.lock();
1077 TORRENT_ASSERT(t);
1079 #ifndef TORRENT_DISABLE_EXTENSIONS
1080 for (extension_list_t::iterator i = m_extensions.begin()
1081 , end(m_extensions.end()); i != end; ++i)
1083 if ((*i)->on_unchoke()) return;
1085 #endif
1087 #ifdef TORRENT_VERBOSE_LOGGING
1088 (*m_logger) << time_now_string() << " <== UNCHOKE\n";
1089 #endif
1090 m_peer_choked = false;
1091 if (is_disconnecting()) return;
1093 t->get_policy().unchoked(*this);
1096 // -----------------------------
1097 // -------- INTERESTED ---------
1098 // -----------------------------
1100 void peer_connection::incoming_interested()
1102 INVARIANT_CHECK;
1104 boost::shared_ptr<torrent> t = m_torrent.lock();
1105 TORRENT_ASSERT(t);
1107 #ifndef TORRENT_DISABLE_EXTENSIONS
1108 for (extension_list_t::iterator i = m_extensions.begin()
1109 , end(m_extensions.end()); i != end; ++i)
1111 if ((*i)->on_interested()) return;
1113 #endif
1115 #ifdef TORRENT_VERBOSE_LOGGING
1116 (*m_logger) << time_now_string() << " <== INTERESTED\n";
1117 #endif
1118 m_peer_interested = true;
1119 if (is_disconnecting()) return;
1120 t->get_policy().interested(*this);
1123 // -----------------------------
1124 // ------ NOT INTERESTED -------
1125 // -----------------------------
1127 void peer_connection::incoming_not_interested()
1129 INVARIANT_CHECK;
1131 #ifndef TORRENT_DISABLE_EXTENSIONS
1132 for (extension_list_t::iterator i = m_extensions.begin()
1133 , end(m_extensions.end()); i != end; ++i)
1135 if ((*i)->on_not_interested()) return;
1137 #endif
1139 m_became_uninterested = time_now();
1141 #ifdef TORRENT_VERBOSE_LOGGING
1142 (*m_logger) << time_now_string() << " <== NOT_INTERESTED\n";
1143 #endif
1144 m_peer_interested = false;
1145 if (is_disconnecting()) return;
1147 boost::shared_ptr<torrent> t = m_torrent.lock();
1148 TORRENT_ASSERT(t);
1150 t->get_policy().not_interested(*this);
1153 // -----------------------------
1154 // ----------- HAVE ------------
1155 // -----------------------------
1157 void peer_connection::incoming_have(int index)
1159 INVARIANT_CHECK;
1161 boost::shared_ptr<torrent> t = m_torrent.lock();
1162 TORRENT_ASSERT(t);
1164 #ifndef TORRENT_DISABLE_EXTENSIONS
1165 for (extension_list_t::iterator i = m_extensions.begin()
1166 , end(m_extensions.end()); i != end; ++i)
1168 if ((*i)->on_have(index)) return;
1170 #endif
1172 if (is_disconnecting()) return;
1174 // if we haven't received a bitfield, it was
1175 // probably omitted, which is the same as 'have_none'
1176 if (!m_bitfield_received) incoming_have_none();
1178 #ifdef TORRENT_VERBOSE_LOGGING
1179 (*m_logger) << time_now_string()
1180 << " <== HAVE [ piece: " << index << "]\n";
1181 #endif
1183 if (is_disconnecting()) return;
1185 if (!t->valid_metadata() && index > int(m_have_piece.size()))
1187 if (index < 65536)
1189 // if we don't have metadata
1190 // and we might not have received a bitfield
1191 // extend the bitmask to fit the new
1192 // have message
1193 m_have_piece.resize(index + 1, false);
1195 else
1197 // unless the index > 64k, in which case
1198 // we just ignore it
1199 return;
1203 // if we got an invalid message, abort
1204 if (index >= int(m_have_piece.size()) || index < 0)
1206 disconnect("got 'have'-message with higher index than the number of pieces", 2);
1207 return;
1210 if (m_have_piece[index])
1212 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1213 (*m_logger) << " got redundant HAVE message for index: " << index << "\n";
1214 #endif
1216 else
1218 m_have_piece.set_bit(index);
1219 ++m_num_pieces;
1221 // only update the piece_picker if
1222 // we have the metadata and if
1223 // we're not a seed (in which case
1224 // we won't have a piece picker)
1225 if (t->valid_metadata())
1227 t->peer_has(index);
1229 if (!t->have_piece(index)
1230 && !t->is_seed()
1231 && !is_interesting()
1232 && t->picker().piece_priority(index) != 0)
1233 t->get_policy().peer_is_interesting(*this);
1235 // this will disregard all have messages we get within
1236 // the first two seconds. Since some clients implements
1237 // lazy bitfields, these will not be reliable to use
1238 // for an estimated peer download rate.
1239 if (!peer_info_struct() || time_now() - peer_info_struct()->connected > seconds(2))
1241 // update bytes downloaded since last timer
1242 m_remote_bytes_dled += t->torrent_file().piece_size(index);
1246 if (is_seed())
1248 m_peer_info->seed = true;
1249 m_upload_only = true;
1250 disconnect_if_redundant();
1251 if (is_disconnecting()) return;
1256 // -----------------------------
1257 // --------- BITFIELD ----------
1258 // -----------------------------
1260 void peer_connection::incoming_bitfield(bitfield const& bits)
1262 INVARIANT_CHECK;
1264 boost::shared_ptr<torrent> t = m_torrent.lock();
1265 TORRENT_ASSERT(t);
1267 #ifndef TORRENT_DISABLE_EXTENSIONS
1268 for (extension_list_t::iterator i = m_extensions.begin()
1269 , end(m_extensions.end()); i != end; ++i)
1271 if ((*i)->on_bitfield(bits)) return;
1273 #endif
1275 if (is_disconnecting()) return;
1277 #ifdef TORRENT_VERBOSE_LOGGING
1278 (*m_logger) << time_now_string() << " <== BITFIELD ";
1280 for (int i = 0; i < int(bits.size()); ++i)
1282 if (bits[i]) (*m_logger) << "1";
1283 else (*m_logger) << "0";
1285 (*m_logger) << "\n";
1286 #endif
1288 // if we don't have the metedata, we cannot
1289 // verify the bitfield size
1290 if (t->valid_metadata()
1291 && (bits.size() + 7) / 8 != (m_have_piece.size() + 7) / 8)
1293 std::stringstream msg;
1294 msg << "got bitfield with invalid size: " << ((bits.size() + 7) / 8)
1295 << "bytes. expected: " << ((m_have_piece.size() + 7) / 8)
1296 << " bytes";
1297 disconnect(msg.str().c_str(), 2);
1298 return;
1301 m_bitfield_received = true;
1303 // if we don't have metadata yet
1304 // just remember the bitmask
1305 // don't update the piecepicker
1306 // (since it doesn't exist yet)
1307 if (!t->ready_for_connections())
1309 m_have_piece = bits;
1310 m_num_pieces = bits.count();
1311 if (m_peer_info) m_peer_info->seed = (m_num_pieces == int(bits.size()));
1312 return;
1315 TORRENT_ASSERT(t->valid_metadata());
1317 int num_pieces = bits.count();
1318 if (num_pieces == int(m_have_piece.size()))
1320 #ifdef TORRENT_VERBOSE_LOGGING
1321 (*m_logger) << " *** THIS IS A SEED ***\n";
1322 #endif
1323 // if this is a web seed. we don't have a peer_info struct
1324 if (m_peer_info) m_peer_info->seed = true;
1325 m_upload_only = true;
1327 m_have_piece.set_all();
1328 m_num_pieces = num_pieces;
1329 t->peer_has_all();
1330 if (!t->is_finished())
1331 t->get_policy().peer_is_interesting(*this);
1333 disconnect_if_redundant();
1335 return;
1338 // let the torrent know which pieces the
1339 // peer has
1340 // if we're a seed, we don't keep track of piece availability
1341 bool interesting = false;
1342 if (!t->is_seed())
1344 t->peer_has(bits);
1346 for (int i = 0; i < (int)m_have_piece.size(); ++i)
1348 bool have = bits[i];
1349 if (have && !m_have_piece[i])
1351 if (!t->have_piece(i) && t->picker().piece_priority(i) != 0)
1352 interesting = true;
1354 else if (!have && m_have_piece[i])
1356 // this should probably not be allowed
1357 t->peer_lost(i);
1362 m_have_piece = bits;
1363 m_num_pieces = num_pieces;
1365 if (interesting) t->get_policy().peer_is_interesting(*this);
1366 else if (upload_only()) disconnect("upload to upload connections");
1369 void peer_connection::disconnect_if_redundant()
1371 if (!m_ses.settings().close_redundant_connections) return;
1373 boost::shared_ptr<torrent> t = m_torrent.lock();
1374 TORRENT_ASSERT(t);
1375 if (m_upload_only && t->is_finished())
1376 disconnect("seed to seed");
1378 if (m_upload_only
1379 && !m_interesting
1380 && m_bitfield_received
1381 && t->are_files_checked())
1382 disconnect("uninteresting upload-only peer");
1385 // -----------------------------
1386 // ---------- REQUEST ----------
1387 // -----------------------------
1389 void peer_connection::incoming_request(peer_request const& r)
1391 INVARIANT_CHECK;
1393 boost::shared_ptr<torrent> t = m_torrent.lock();
1394 TORRENT_ASSERT(t);
1396 // if we haven't received a bitfield, it was
1397 // probably omitted, which is the same as 'have_none'
1398 if (!m_bitfield_received) incoming_have_none();
1399 if (is_disconnecting()) return;
1401 #ifndef TORRENT_DISABLE_EXTENSIONS
1402 for (extension_list_t::iterator i = m_extensions.begin()
1403 , end(m_extensions.end()); i != end; ++i)
1405 if ((*i)->on_request(r)) return;
1407 #endif
1408 if (is_disconnecting()) return;
1410 if (!t->valid_metadata())
1412 // if we don't have valid metadata yet,
1413 // we shouldn't get a request
1414 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1415 (*m_logger) << time_now_string()
1416 << " <== UNEXPECTED_REQUEST [ "
1417 "piece: " << r.piece << " | "
1418 "s: " << r.start << " | "
1419 "l: " << r.length << " | "
1420 "i: " << m_peer_interested << " | "
1421 "t: " << t->torrent_file().piece_size(r.piece) << " | "
1422 "n: " << t->torrent_file().num_pieces() << " ]\n";
1424 (*m_logger) << time_now_string()
1425 << " ==> REJECT_PIECE [ "
1426 "piece: " << r.piece << " | "
1427 "s: " << r.start << " | "
1428 "l: " << r.length << " ]\n";
1429 #endif
1430 write_reject_request(r);
1431 return;
1434 if (int(m_requests.size()) > m_ses.settings().max_allowed_in_request_queue)
1436 // don't allow clients to abuse our
1437 // memory consumption.
1438 // ignore requests if the client
1439 // is making too many of them.
1440 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1441 (*m_logger) << time_now_string()
1442 << " <== TOO MANY REQUESTS [ "
1443 "piece: " << r.piece << " | "
1444 "s: " << r.start << " | "
1445 "l: " << r.length << " | "
1446 "i: " << m_peer_interested << " | "
1447 "t: " << t->torrent_file().piece_size(r.piece) << " | "
1448 "n: " << t->torrent_file().num_pieces() << " ]\n";
1450 (*m_logger) << time_now_string()
1451 << " ==> REJECT_PIECE [ "
1452 "piece: " << r.piece << " | "
1453 "s: " << r.start << " | "
1454 "l: " << r.length << " ]\n";
1455 #endif
1456 write_reject_request(r);
1457 return;
1460 // make sure this request
1461 // is legal and that the peer
1462 // is not choked
1463 if (r.piece >= 0
1464 && r.piece < t->torrent_file().num_pieces()
1465 && t->have_piece(r.piece)
1466 && r.start >= 0
1467 && r.start < t->torrent_file().piece_size(r.piece)
1468 && r.length > 0
1469 && r.length + r.start <= t->torrent_file().piece_size(r.piece)
1470 && m_peer_interested
1471 && r.length <= t->block_size())
1473 #ifdef TORRENT_VERBOSE_LOGGING
1474 (*m_logger) << time_now_string()
1475 << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
1476 #endif
1477 // if we have choked the client
1478 // ignore the request
1479 if (m_choked && m_accept_fast.find(r.piece) == m_accept_fast.end())
1481 write_reject_request(r);
1482 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1483 (*m_logger) << time_now_string()
1484 << " *** REJECTING REQUEST [ peer choked and piece not in allowed fast set ]\n";
1485 (*m_logger) << time_now_string()
1486 << " ==> REJECT_PIECE [ "
1487 "piece: " << r.piece << " | "
1488 "s: " << r.start << " | "
1489 "l: " << r.length << " ]\n";
1490 #endif
1492 else
1494 m_requests.push_back(r);
1495 m_last_incoming_request = time_now();
1496 fill_send_buffer();
1499 else
1501 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1502 (*m_logger) << time_now_string()
1503 << " <== INVALID_REQUEST [ "
1504 "piece: " << r.piece << " | "
1505 "s: " << r.start << " | "
1506 "l: " << r.length << " | "
1507 "i: " << m_peer_interested << " | "
1508 "t: " << t->torrent_file().piece_size(r.piece) << " | "
1509 "n: " << t->torrent_file().num_pieces() << " | "
1510 "h: " << t->have_piece(r.piece) << " | "
1511 "block_limit: " << t->block_size() << " ]\n";
1513 (*m_logger) << time_now_string()
1514 << " ==> REJECT_PIECE [ "
1515 "piece: " << r.piece << " | "
1516 "s: " << r.start << " | "
1517 "l: " << r.length << " ]\n";
1518 #endif
1520 write_reject_request(r);
1521 ++m_num_invalid_requests;
1523 if (t->alerts().should_post<invalid_request_alert>())
1525 t->alerts().post_alert(invalid_request_alert(
1526 t->get_handle(), m_remote, m_peer_id, r));
1531 void peer_connection::incoming_piece_fragment()
1533 m_last_piece = time_now();
1536 #ifndef NDEBUG
1537 struct check_postcondition
1539 check_postcondition(boost::shared_ptr<torrent> const& t_
1540 , bool init_check = true): t(t_) { if (init_check) check(); }
1542 ~check_postcondition() { check(); }
1544 void check()
1546 if (!t->is_seed())
1548 const int blocks_per_piece = static_cast<int>(
1549 t->torrent_file().piece_length() / t->block_size());
1551 std::vector<piece_picker::downloading_piece> const& dl_queue
1552 = t->picker().get_download_queue();
1554 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
1555 dl_queue.begin(); i != dl_queue.end(); ++i)
1557 TORRENT_ASSERT(i->finished <= blocks_per_piece);
1562 shared_ptr<torrent> t;
1564 #endif
1567 // -----------------------------
1568 // ----------- PIECE -----------
1569 // -----------------------------
1571 void peer_connection::incoming_piece(peer_request const& p, char const* data)
1573 char* buffer = m_ses.allocate_disk_buffer();
1574 if (buffer == 0)
1576 disconnect("out of memory");
1577 return;
1579 disk_buffer_holder holder(m_ses, buffer);
1580 std::memcpy(buffer, data, p.length);
1581 incoming_piece(p, holder);
1584 void peer_connection::incoming_piece(peer_request const& p, disk_buffer_holder& data)
1586 INVARIANT_CHECK;
1588 boost::shared_ptr<torrent> t = m_torrent.lock();
1589 TORRENT_ASSERT(t);
1591 TORRENT_ASSERT(!m_disk_recv_buffer);
1592 TORRENT_ASSERT(m_disk_recv_buffer_size == 0);
1594 #ifdef TORRENT_CORRUPT_DATA
1595 // corrupt all pieces from certain peers
1596 if (m_remote.address().is_v4()
1597 && (m_remote.address().to_v4().to_ulong() & 0xf) == 0)
1599 data.get()[0] = ~data.get()[0];
1601 #endif
1603 // if we haven't received a bitfield, it was
1604 // probably omitted, which is the same as 'have_none'
1605 if (!m_bitfield_received) incoming_have_none();
1606 if (is_disconnecting()) return;
1608 #ifndef TORRENT_DISABLE_EXTENSIONS
1609 for (extension_list_t::iterator i = m_extensions.begin()
1610 , end(m_extensions.end()); i != end; ++i)
1612 if ((*i)->on_piece(p, data)) return;
1614 #endif
1615 if (is_disconnecting()) return;
1617 #ifndef NDEBUG
1618 check_postcondition post_checker_(t);
1619 #if !defined TORRENT_DISABLE_INVARIANT_CHECKS
1620 t->check_invariant();
1621 #endif
1622 #endif
1624 #ifdef TORRENT_VERBOSE_LOGGING
1625 (*m_logger) << time_now_string()
1626 << " <== PIECE [ piece: " << p.piece << " | "
1627 "s: " << p.start << " | "
1628 "l: " << p.length << " | "
1629 "ds: " << statistics().download_rate() << " | "
1630 "qs: " << int(m_desired_queue_size) << " ]\n";
1631 #endif
1633 if (p.length == 0)
1635 if (t->alerts().should_post<peer_error_alert>())
1637 t->alerts().post_alert(peer_error_alert(t->get_handle(), m_remote
1638 , m_peer_id, "peer sent 0 length piece"));
1640 return;
1643 if (!verify_piece(p))
1645 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1646 (*m_logger) << time_now_string()
1647 << " <== INVALID_PIECE [ piece: " << p.piece << " | "
1648 "start: " << p.start << " | "
1649 "length: " << p.length << " ]\n";
1650 #endif
1651 disconnect("got invalid piece packet", 2);
1652 return;
1655 // if we're already seeding, don't bother,
1656 // just ignore it
1657 if (t->is_seed())
1659 t->add_redundant_bytes(p.length);
1660 return;
1663 ptime now = time_now();
1665 piece_picker& picker = t->picker();
1666 piece_manager& fs = t->filesystem();
1668 std::vector<piece_block> finished_blocks;
1669 piece_block block_finished(p.piece, p.start / t->block_size());
1670 TORRENT_ASSERT(p.start % t->block_size() == 0);
1671 TORRENT_ASSERT(p.length == t->block_size()
1672 || p.length == t->torrent_file().total_size() % t->block_size());
1674 std::deque<pending_block>::iterator b
1675 = std::find_if(
1676 m_download_queue.begin()
1677 , m_download_queue.end()
1678 , has_block(block_finished));
1680 if (b == m_download_queue.end())
1682 if (t->alerts().should_post<unwanted_block_alert>())
1684 t->alerts().post_alert(unwanted_block_alert(t->get_handle(), m_remote
1685 , m_peer_id, block_finished.block_index, block_finished.piece_index));
1687 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1688 (*m_logger) << " *** The block we just got was not in the "
1689 "request queue ***\n";
1690 #endif
1691 t->add_redundant_bytes(p.length);
1692 request_a_block(*t, *this);
1693 send_block_requests();
1694 return;
1696 #ifndef NDEBUG
1697 pending_block pending_b = *b;
1698 #endif
1700 int block_index = b - m_download_queue.begin() - 1;
1701 for (int i = 0; i < block_index; ++i)
1703 pending_block& qe = m_download_queue[i];
1705 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1706 (*m_logger) << time_now_string()
1707 << " *** SKIPPED_PIECE [ piece: " << qe.block.piece_index << " | "
1708 "b: " << qe.block.block_index << " ] ***\n";
1709 #endif
1711 ++qe.skipped;
1712 // if the number of times a block is skipped by out of order
1713 // blocks exceeds the size of the outstanding queue, assume that
1714 // the other end dropped the request.
1715 if (qe.skipped > m_desired_queue_size)
1717 if (m_ses.m_alerts.should_post<request_dropped_alert>())
1718 m_ses.m_alerts.post_alert(request_dropped_alert(t->get_handle()
1719 , remote(), pid(), qe.block.block_index, qe.block.piece_index));
1720 picker.abort_download(qe.block);
1721 TORRENT_ASSERT(m_download_queue.begin() + i != b);
1722 m_download_queue.erase(m_download_queue.begin() + i);
1723 --i;
1724 --block_index;
1727 TORRENT_ASSERT(int(m_download_queue.size()) > block_index + 1);
1728 b = m_download_queue.begin() + (block_index + 1);
1729 TORRENT_ASSERT(b->block == pending_b.block);
1731 // if the block we got is already finished, then ignore it
1732 if (picker.is_downloaded(block_finished))
1734 t->add_redundant_bytes(p.length);
1736 m_download_queue.erase(b);
1737 m_timeout_extend = 0;
1739 if (!m_download_queue.empty())
1740 m_requested = now;
1742 request_a_block(*t, *this);
1743 send_block_requests();
1744 return;
1747 if (total_seconds(now - m_requested)
1748 < m_ses.settings().request_timeout
1749 && m_snubbed)
1751 m_snubbed = false;
1752 if (m_ses.m_alerts.should_post<peer_unsnubbed_alert>())
1754 m_ses.m_alerts.post_alert(peer_unsnubbed_alert(t->get_handle()
1755 , m_remote, m_peer_id));
1759 fs.async_write(p, data, bind(&peer_connection::on_disk_write_complete
1760 , self(), _1, _2, p, t));
1761 m_outstanding_writing_bytes += p.length;
1762 TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle);
1763 m_download_queue.erase(b);
1765 if (m_outstanding_writing_bytes >= m_ses.settings().max_outstanding_disk_bytes_per_connection
1766 && t->alerts().should_post<performance_alert>())
1768 t->alerts().post_alert(performance_alert(t->get_handle()
1769 , performance_alert::outstanding_disk_buffer_limit_reached));
1772 if (!m_download_queue.empty())
1774 m_timeout_extend = (std::max)(m_timeout_extend
1775 - m_ses.settings().request_timeout, 0);
1776 m_requested += seconds(m_ses.settings().request_timeout);
1777 if (m_requested > now) m_requested = now;
1779 else
1781 m_timeout_extend = 0;
1784 // did we request this block from any other peers?
1785 bool multi = picker.num_peers(block_finished) > 1;
1786 picker.mark_as_writing(block_finished, peer_info_struct());
1788 TORRENT_ASSERT(picker.num_peers(block_finished) == 0);
1789 // if we requested this block from other peers, cancel it now
1790 if (multi) t->cancel_block(block_finished);
1792 TORRENT_ASSERT(picker.num_peers(block_finished) == 0);
1794 #if !defined NDEBUG && !defined TORRENT_DISABLE_INVARIANT_CHECKS
1795 t->check_invariant();
1796 #endif
1797 request_a_block(*t, *this);
1798 send_block_requests();
1801 void peer_connection::on_disk_write_complete(int ret, disk_io_job const& j
1802 , peer_request p, boost::shared_ptr<torrent> t)
1804 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
1806 INVARIANT_CHECK;
1808 m_outstanding_writing_bytes -= p.length;
1809 TORRENT_ASSERT(m_outstanding_writing_bytes >= 0);
1811 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
1812 // (*m_ses.m_logger) << time_now_string() << " *** DISK_WRITE_COMPLETE [ p: "
1813 // << p.piece << " o: " << p.start << " ]\n";
1814 #endif
1815 // in case the outstanding bytes just dropped down
1816 // to allow to receive more data
1817 setup_receive();
1819 piece_block block_finished(p.piece, p.start / t->block_size());
1821 if (ret == -1 || !t)
1823 if (t->has_picker()) t->picker().write_failed(block_finished);
1825 if (!t)
1827 disconnect(j.str.c_str());
1828 return;
1831 if (t->alerts().should_post<file_error_alert>())
1832 t->alerts().post_alert(file_error_alert(j.error_file, t->get_handle(), j.str));
1833 t->set_error(j.str);
1834 t->pause();
1835 return;
1838 if (t->is_seed()) return;
1840 piece_picker& picker = t->picker();
1842 TORRENT_ASSERT(p.piece == j.piece);
1843 TORRENT_ASSERT(p.start == j.offset);
1844 TORRENT_ASSERT(picker.num_peers(block_finished) == 0);
1845 picker.mark_as_finished(block_finished, peer_info_struct());
1846 if (t->alerts().should_post<block_finished_alert>())
1848 t->alerts().post_alert(block_finished_alert(t->get_handle(),
1849 remote(), pid(), block_finished.block_index, block_finished.piece_index));
1852 // did we just finish the piece?
1853 if (picker.is_piece_finished(p.piece))
1855 #ifndef NDEBUG
1856 check_postcondition post_checker2_(t, false);
1857 #endif
1858 t->async_verify_piece(p.piece, bind(&torrent::piece_finished, t
1859 , p.piece, _1));
1862 if (!t->is_seed() && !m_torrent.expired())
1864 // this is a free function defined in policy.cpp
1865 request_a_block(*t, *this);
1866 send_block_requests();
1871 // -----------------------------
1872 // ---------- CANCEL -----------
1873 // -----------------------------
1875 void peer_connection::incoming_cancel(peer_request const& r)
1877 INVARIANT_CHECK;
1879 #ifndef TORRENT_DISABLE_EXTENSIONS
1880 for (extension_list_t::iterator i = m_extensions.begin()
1881 , end(m_extensions.end()); i != end; ++i)
1883 if ((*i)->on_cancel(r)) return;
1885 #endif
1886 if (is_disconnecting()) return;
1888 #ifdef TORRENT_VERBOSE_LOGGING
1889 (*m_logger) << time_now_string()
1890 << " <== CANCEL [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
1891 #endif
1893 std::deque<peer_request>::iterator i
1894 = std::find(m_requests.begin(), m_requests.end(), r);
1896 if (i != m_requests.end())
1898 m_requests.erase(i);
1899 #ifdef TORRENT_VERBOSE_LOGGING
1900 (*m_logger) << time_now_string()
1901 << " ==> REJECT_PIECE [ "
1902 "piece: " << r.piece << " | "
1903 "s: " << r.start << " | "
1904 "l: " << r.length << " ]\n";
1905 #endif
1906 write_reject_request(r);
1908 else
1910 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1911 (*m_logger) << time_now_string() << " *** GOT CANCEL NOT IN THE QUEUE\n";
1912 #endif
1916 // -----------------------------
1917 // --------- DHT PORT ----------
1918 // -----------------------------
1920 void peer_connection::incoming_dht_port(int listen_port)
1922 INVARIANT_CHECK;
1924 #ifdef TORRENT_VERBOSE_LOGGING
1925 (*m_logger) << time_now_string()
1926 << " <== DHT_PORT [ p: " << listen_port << " ]\n";
1927 #endif
1928 #ifndef TORRENT_DISABLE_DHT
1929 m_ses.add_dht_node(udp::endpoint(
1930 m_remote.address(), listen_port));
1931 #endif
1934 // -----------------------------
1935 // --------- HAVE ALL ----------
1936 // -----------------------------
1938 void peer_connection::incoming_have_all()
1940 INVARIANT_CHECK;
1942 boost::shared_ptr<torrent> t = m_torrent.lock();
1943 TORRENT_ASSERT(t);
1945 #ifdef TORRENT_VERBOSE_LOGGING
1946 (*m_logger) << time_now_string() << " <== HAVE_ALL\n";
1947 #endif
1949 #ifndef TORRENT_DISABLE_EXTENSIONS
1950 for (extension_list_t::iterator i = m_extensions.begin()
1951 , end(m_extensions.end()); i != end; ++i)
1953 if ((*i)->on_have_all()) return;
1955 #endif
1956 if (is_disconnecting()) return;
1958 m_have_all = true;
1960 if (m_peer_info) m_peer_info->seed = true;
1961 m_upload_only = true;
1962 m_bitfield_received = true;
1964 #ifdef TORRENT_VERBOSE_LOGGING
1965 (*m_logger) << " *** THIS IS A SEED ***\n";
1966 #endif
1968 // if we don't have metadata yet
1969 // just remember the bitmask
1970 // don't update the piecepicker
1971 // (since it doesn't exist yet)
1972 if (!t->ready_for_connections())
1974 // assume seeds are interesting when we
1975 // don't even have the metadata
1976 t->get_policy().peer_is_interesting(*this);
1978 disconnect_if_redundant();
1979 // TODO: this might need something more
1980 // so that once we have the metadata
1981 // we can construct a full bitfield
1982 return;
1985 TORRENT_ASSERT(!m_have_piece.empty());
1986 m_have_piece.set_all();
1987 m_num_pieces = m_have_piece.size();
1989 t->peer_has_all();
1991 // if we're finished, we're not interested
1992 if (t->is_finished()) send_not_interested();
1993 else t->get_policy().peer_is_interesting(*this);
1995 disconnect_if_redundant();
1998 // -----------------------------
1999 // --------- HAVE NONE ---------
2000 // -----------------------------
2002 void peer_connection::incoming_have_none()
2004 INVARIANT_CHECK;
2006 #ifdef TORRENT_VERBOSE_LOGGING
2007 (*m_logger) << time_now_string() << " <== HAVE_NONE\n";
2008 #endif
2010 boost::shared_ptr<torrent> t = m_torrent.lock();
2011 TORRENT_ASSERT(t);
2013 #ifndef TORRENT_DISABLE_EXTENSIONS
2014 for (extension_list_t::iterator i = m_extensions.begin()
2015 , end(m_extensions.end()); i != end; ++i)
2017 if ((*i)->on_have_none()) return;
2019 #endif
2020 if (is_disconnecting()) return;
2021 if (m_peer_info) m_peer_info->seed = false;
2022 m_bitfield_received = true;
2024 // we're never interested in a peer that doesn't have anything
2025 send_not_interested();
2027 TORRENT_ASSERT(!m_have_piece.empty() || !t->ready_for_connections());
2028 disconnect_if_redundant();
2031 // -----------------------------
2032 // ------- ALLOWED FAST --------
2033 // -----------------------------
2035 void peer_connection::incoming_allowed_fast(int index)
2037 INVARIANT_CHECK;
2039 boost::shared_ptr<torrent> t = m_torrent.lock();
2040 TORRENT_ASSERT(t);
2042 #ifdef TORRENT_VERBOSE_LOGGING
2043 (*m_logger) << time_now_string() << " <== ALLOWED_FAST [ " << index << " ]\n";
2044 #endif
2046 #ifndef TORRENT_DISABLE_EXTENSIONS
2047 for (extension_list_t::iterator i = m_extensions.begin()
2048 , end(m_extensions.end()); i != end; ++i)
2050 if ((*i)->on_allowed_fast(index)) return;
2052 #endif
2053 if (is_disconnecting()) return;
2055 if (t->valid_metadata())
2057 if (index < 0 || index >= int(m_have_piece.size()))
2059 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2060 (*m_logger) << time_now_string() << " <== INVALID_ALLOWED_FAST [ " << index << " | s: "
2061 << int(m_have_piece.size()) << " ]\n";
2062 #endif
2063 return;
2066 // if we already have the piece, we can
2067 // ignore this message
2068 if (t->have_piece(index))
2069 return;
2072 m_allowed_fast.push_back(index);
2074 // if the peer has the piece and we want
2075 // to download it, request it
2076 if (int(m_have_piece.size()) > index
2077 && m_have_piece[index]
2078 && t->valid_metadata()
2079 && t->has_picker()
2080 && t->picker().piece_priority(index) > 0)
2082 t->get_policy().peer_is_interesting(*this);
2086 std::vector<int> const& peer_connection::allowed_fast()
2088 boost::shared_ptr<torrent> t = m_torrent.lock();
2089 TORRENT_ASSERT(t);
2091 m_allowed_fast.erase(std::remove_if(m_allowed_fast.begin()
2092 , m_allowed_fast.end(), bind(&torrent::have_piece, t, _1))
2093 , m_allowed_fast.end());
2095 // TODO: sort the allowed fast set in priority order
2096 return m_allowed_fast;
2099 void peer_connection::add_request(piece_block const& block)
2101 // INVARIANT_CHECK;
2103 boost::shared_ptr<torrent> t = m_torrent.lock();
2104 TORRENT_ASSERT(t);
2106 TORRENT_ASSERT(t->valid_metadata());
2107 TORRENT_ASSERT(block.piece_index >= 0);
2108 TORRENT_ASSERT(block.piece_index < t->torrent_file().num_pieces());
2109 TORRENT_ASSERT(block.block_index >= 0);
2110 TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
2111 TORRENT_ASSERT(!t->picker().is_requested(block) || (t->picker().num_peers(block) > 0));
2112 TORRENT_ASSERT(!t->have_piece(block.piece_index));
2113 TORRENT_ASSERT(std::find_if(m_download_queue.begin(), m_download_queue.end()
2114 , has_block(block)) == m_download_queue.end());
2115 TORRENT_ASSERT(std::find(m_request_queue.begin(), m_request_queue.end()
2116 , block) == m_request_queue.end());
2118 piece_picker::piece_state_t state;
2119 peer_speed_t speed = peer_speed();
2120 char const* speedmsg = 0;
2121 if (speed == fast)
2123 speedmsg = "fast";
2124 state = piece_picker::fast;
2126 else if (speed == medium)
2128 speedmsg = "medium";
2129 state = piece_picker::medium;
2131 else
2133 speedmsg = "slow";
2134 state = piece_picker::slow;
2137 if (!t->picker().mark_as_downloading(block, peer_info_struct(), state))
2138 return;
2140 if (t->alerts().should_post<block_downloading_alert>())
2142 t->alerts().post_alert(block_downloading_alert(t->get_handle(),
2143 remote(), pid(), speedmsg, block.block_index, block.piece_index));
2146 m_request_queue.push_back(block);
2149 void peer_connection::cancel_request(piece_block const& block)
2151 INVARIANT_CHECK;
2153 boost::shared_ptr<torrent> t = m_torrent.lock();
2154 // this peer might be disconnecting
2155 if (!t) return;
2157 TORRENT_ASSERT(t->valid_metadata());
2159 TORRENT_ASSERT(block.piece_index >= 0);
2160 TORRENT_ASSERT(block.piece_index < t->torrent_file().num_pieces());
2161 TORRENT_ASSERT(block.block_index >= 0);
2162 TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
2164 // if all the peers that requested this block has been
2165 // cancelled, then just ignore the cancel.
2166 if (!t->picker().is_requested(block)) return;
2168 std::deque<pending_block>::iterator it
2169 = std::find_if(m_download_queue.begin(), m_download_queue.end(), has_block(block));
2170 if (it == m_download_queue.end())
2172 std::deque<piece_block>::iterator rit = std::find(m_request_queue.begin()
2173 , m_request_queue.end(), block);
2175 // when a multi block is received, it is cancelled
2176 // from all peers, so if this one hasn't requested
2177 // the block, just ignore to cancel it.
2178 if (rit == m_request_queue.end()) return;
2180 t->picker().abort_download(block);
2181 m_request_queue.erase(rit);
2182 // since we found it in the request queue, it means it hasn't been
2183 // sent yet, so we don't have to send a cancel.
2184 return;
2187 int block_offset = block.block_index * t->block_size();
2188 int block_size
2189 = (std::min)(t->torrent_file().piece_size(block.piece_index)-block_offset,
2190 t->block_size());
2191 TORRENT_ASSERT(block_size > 0);
2192 TORRENT_ASSERT(block_size <= t->block_size());
2194 peer_request r;
2195 r.piece = block.piece_index;
2196 r.start = block_offset;
2197 r.length = block_size;
2199 #ifdef TORRENT_VERBOSE_LOGGING
2200 (*m_logger) << time_now_string()
2201 << " ==> CANCEL [ piece: " << block.piece_index << " | s: "
2202 << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n";
2203 #endif
2204 write_cancel(r);
2207 void peer_connection::send_choke()
2209 INVARIANT_CHECK;
2211 TORRENT_ASSERT(!m_peer_info || !m_peer_info->optimistically_unchoked);
2213 if (m_choked) return;
2214 write_choke();
2215 m_choked = true;
2217 #ifdef TORRENT_VERBOSE_LOGGING
2218 (*m_logger) << time_now_string() << " ==> CHOKE\n";
2219 #endif
2220 #ifndef NDEBUG
2221 m_last_choke = time_now();
2222 #endif
2223 m_num_invalid_requests = 0;
2225 // reject the requests we have in the queue
2226 // except the allowed fast pieces
2227 for (std::deque<peer_request>::iterator i = m_requests.begin();
2228 i != m_requests.end();)
2230 if (m_accept_fast.count(i->piece))
2232 ++i;
2233 continue;
2236 peer_request const& r = *i;
2237 write_reject_request(r);
2239 #ifdef TORRENT_VERBOSE_LOGGING
2240 (*m_logger) << time_now_string()
2241 << " ==> REJECT_PIECE [ "
2242 "piece: " << r.piece << " | "
2243 "s: " << r.start << " | "
2244 "l: " << r.length << " ]\n";
2245 #endif
2246 i = m_requests.erase(i);
2250 bool peer_connection::send_unchoke()
2252 INVARIANT_CHECK;
2254 if (!m_choked) return false;
2255 boost::shared_ptr<torrent> t = m_torrent.lock();
2256 if (!t->ready_for_connections()) return false;
2257 m_last_unchoke = time_now();
2258 write_unchoke();
2259 m_choked = false;
2261 #ifdef TORRENT_VERBOSE_LOGGING
2262 (*m_logger) << time_now_string() << " ==> UNCHOKE\n";
2263 #endif
2264 return true;
2267 void peer_connection::send_interested()
2269 if (m_interesting) return;
2270 boost::shared_ptr<torrent> t = m_torrent.lock();
2271 if (!t->ready_for_connections()) return;
2272 m_interesting = true;
2273 write_interested();
2275 #ifdef TORRENT_VERBOSE_LOGGING
2276 (*m_logger) << time_now_string() << " ==> INTERESTED\n";
2277 #endif
2280 void peer_connection::send_not_interested()
2282 if (!m_interesting) return;
2283 boost::shared_ptr<torrent> t = m_torrent.lock();
2284 if (!t->ready_for_connections()) return;
2285 m_interesting = false;
2286 write_not_interested();
2288 m_became_uninteresting = time_now();
2290 #ifdef TORRENT_VERBOSE_LOGGING
2291 (*m_logger) << time_now_string() << " ==> NOT_INTERESTED\n";
2292 #endif
2293 disconnect_if_redundant();
2296 void peer_connection::send_block_requests()
2298 INVARIANT_CHECK;
2300 boost::shared_ptr<torrent> t = m_torrent.lock();
2301 TORRENT_ASSERT(t);
2303 if ((int)m_download_queue.size() >= m_desired_queue_size) return;
2305 bool empty_download_queue = m_download_queue.empty();
2307 while (!m_request_queue.empty()
2308 && (int)m_download_queue.size() < m_desired_queue_size)
2310 piece_block block = m_request_queue.front();
2312 int block_offset = block.block_index * t->block_size();
2313 int block_size = (std::min)(t->torrent_file().piece_size(
2314 block.piece_index) - block_offset, t->block_size());
2315 TORRENT_ASSERT(block_size > 0);
2316 TORRENT_ASSERT(block_size <= t->block_size());
2318 peer_request r;
2319 r.piece = block.piece_index;
2320 r.start = block_offset;
2321 r.length = block_size;
2323 m_request_queue.pop_front();
2324 if (t->is_seed()) continue;
2325 // this can happen if a block times out, is re-requested and
2326 // then arrives "unexpectedly"
2327 if (t->picker().is_finished(block) || t->picker().is_downloaded(block))
2328 continue;
2330 m_download_queue.push_back(block);
2332 #ifdef TORRENT_VERBOSE_LOGGING
2333 (*m_logger) << time_now_string()
2334 << " *** REQUEST-QUEUE** [ "
2335 "piece: " << block.piece_index << " | "
2336 "block: " << block.block_index << " ]\n";
2337 #endif
2339 // if we are requesting large blocks, merge the smaller
2340 // blocks that are in the same piece into larger requests
2341 if (m_request_large_blocks)
2343 int blocks_per_piece = t->torrent_file().piece_length() / t->block_size();
2345 while (!m_request_queue.empty())
2347 // check to see if this block is connected to the previous one
2348 // if it is, merge them, otherwise, break this merge loop
2349 piece_block const& front = m_request_queue.front();
2350 if (front.piece_index * blocks_per_piece + front.block_index
2351 != block.piece_index * blocks_per_piece + block.block_index + 1)
2352 break;
2353 block = m_request_queue.front();
2354 m_request_queue.pop_front();
2355 m_download_queue.push_back(block);
2357 #ifdef TORRENT_VERBOSE_LOGGING
2358 (*m_logger) << time_now_string()
2359 << " *** MERGING REQUEST ** [ "
2360 "piece: " << block.piece_index << " | "
2361 "block: " << block.block_index << " ]\n";
2362 #endif
2364 block_offset = block.block_index * t->block_size();
2365 block_size = (std::min)(t->torrent_file().piece_size(
2366 block.piece_index) - block_offset, t->block_size());
2367 TORRENT_ASSERT(block_size > 0);
2368 TORRENT_ASSERT(block_size <= t->block_size());
2370 r.length += block_size;
2374 TORRENT_ASSERT(verify_piece(r));
2376 #ifndef TORRENT_DISABLE_EXTENSIONS
2377 bool handled = false;
2378 for (extension_list_t::iterator i = m_extensions.begin()
2379 , end(m_extensions.end()); i != end; ++i)
2381 if (handled = (*i)->write_request(r)) break;
2383 if (is_disconnecting()) return;
2384 if (!handled)
2386 write_request(r);
2387 m_last_request = time_now();
2389 #else
2390 write_request(r);
2391 m_last_request = time_now();
2392 #endif
2394 #ifdef TORRENT_VERBOSE_LOGGING
2395 (*m_logger) << time_now_string()
2396 << " ==> REQUEST [ "
2397 "piece: " << r.piece << " | "
2398 "s: " << r.start << " | "
2399 "l: " << r.length << " | "
2400 "ds: " << statistics().download_rate() << " B/s | "
2401 "qs: " << int(m_desired_queue_size) << " "
2402 "blk: " << (m_request_large_blocks?"large":"single") << " ]\n";
2403 #endif
2405 m_last_piece = time_now();
2407 if (!m_download_queue.empty()
2408 && empty_download_queue)
2410 // This means we just added a request to this connection
2411 m_requested = time_now();
2415 void peer_connection::timed_out()
2417 TORRENT_ASSERT(m_connecting);
2418 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
2419 (*m_ses.m_logger) << time_now_string() << " CONNECTION TIMED OUT: " << m_remote.address().to_string()
2420 << "\n";
2421 #endif
2422 disconnect("timed out: connect", 1);
2425 // the error argument defaults to 0, which means deliberate disconnect
2426 // 1 means unexpected disconnect/error
2427 // 2 protocol error (client sent something invalid)
2428 void peer_connection::disconnect(char const* message, int error)
2430 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2432 #ifndef NDEBUG
2433 m_disconnect_started = true;
2434 #endif
2436 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2437 switch (error)
2439 case 0:
2440 (*m_logger) << "*** CONNECTION CLOSED " << message << "\n";
2441 break;
2442 case 1:
2443 (*m_logger) << "*** CONNECTION FAILED " << message << "\n";
2444 break;
2445 case 2:
2446 (*m_logger) << "*** PEER ERROR " << message << "\n";
2447 break;
2449 #endif
2450 // we cannot do this in a constructor
2451 TORRENT_ASSERT(m_in_constructor == false);
2452 if (error > 0) m_failed = true;
2453 if (m_disconnecting) return;
2454 boost::intrusive_ptr<peer_connection> me(this);
2456 INVARIANT_CHECK;
2458 if (m_connecting && m_connection_ticket >= 0)
2460 m_ses.m_half_open.done(m_connection_ticket);
2461 m_connection_ticket = -1;
2464 boost::shared_ptr<torrent> t = m_torrent.lock();
2465 torrent_handle handle;
2466 if (t) handle = t->get_handle();
2468 if (message)
2470 if (error > 1 && m_ses.m_alerts.should_post<peer_error_alert>())
2472 m_ses.m_alerts.post_alert(
2473 peer_error_alert(handle, remote(), pid(), message));
2475 else if (error <= 1 && m_ses.m_alerts.should_post<peer_disconnected_alert>())
2477 m_ses.m_alerts.post_alert(
2478 peer_disconnected_alert(handle, remote(), pid(), message));
2482 if (t)
2484 // make sure we keep all the stats!
2485 calc_ip_overhead();
2486 t->add_stats(statistics());
2488 if (t->has_picker())
2490 piece_picker& picker = t->picker();
2492 while (!m_download_queue.empty())
2494 picker.abort_download(m_download_queue.back().block);
2495 m_download_queue.pop_back();
2497 while (!m_request_queue.empty())
2499 picker.abort_download(m_request_queue.back());
2500 m_request_queue.pop_back();
2504 t->remove_peer(this);
2505 m_torrent.reset();
2508 #ifndef NDEBUG
2509 // since this connection doesn't have a torrent reference
2510 // no torrent should have a reference to this connection either
2511 for (aux::session_impl::torrent_map::const_iterator i = m_ses.m_torrents.begin()
2512 , end(m_ses.m_torrents.end()); i != end; ++i)
2513 TORRENT_ASSERT(!i->second->has_peer(this));
2514 #endif
2516 m_disconnecting = true;
2517 error_code ec;
2518 m_socket->close(ec);
2519 m_ses.close_connection(this, message);
2522 void peer_connection::set_upload_limit(int limit)
2524 TORRENT_ASSERT(limit >= -1);
2525 if (limit == -1) limit = (std::numeric_limits<int>::max)();
2526 if (limit < 10) limit = 10;
2527 m_upload_limit = limit;
2528 m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
2531 void peer_connection::set_download_limit(int limit)
2533 TORRENT_ASSERT(limit >= -1);
2534 if (limit == -1) limit = (std::numeric_limits<int>::max)();
2535 if (limit < 10) limit = 10;
2536 m_download_limit = limit;
2537 m_bandwidth_limit[download_channel].throttle(m_download_limit);
2540 size_type peer_connection::share_diff() const
2542 INVARIANT_CHECK;
2544 boost::shared_ptr<torrent> t = m_torrent.lock();
2545 TORRENT_ASSERT(t);
2547 float ratio = t->ratio();
2549 // if we have an infinite ratio, just say we have downloaded
2550 // much more than we have uploaded. And we'll keep uploading.
2551 if (ratio == 0.f)
2552 return (std::numeric_limits<size_type>::max)();
2554 return m_free_upload
2555 + static_cast<size_type>(m_statistics.total_payload_download() * ratio)
2556 - m_statistics.total_payload_upload();
2559 // defined in upnp.cpp
2560 bool is_local(address const& a);
2562 bool peer_connection::on_local_network() const
2564 if (libtorrent::is_local(m_remote.address())
2565 || is_loopback(m_remote.address())) return true;
2566 return false;
2569 void peer_connection::get_peer_info(peer_info& p) const
2571 TORRENT_ASSERT(!associated_torrent().expired());
2573 ptime now = time_now();
2575 p.download_rate_peak = m_download_rate_peak;
2576 p.upload_rate_peak = m_upload_rate_peak;
2577 p.rtt = m_rtt;
2578 p.down_speed = statistics().download_rate();
2579 p.up_speed = statistics().upload_rate();
2580 p.payload_down_speed = statistics().download_payload_rate();
2581 p.payload_up_speed = statistics().upload_payload_rate();
2582 p.pid = pid();
2583 p.ip = remote();
2584 p.pending_disk_bytes = m_outstanding_writing_bytes;
2585 p.send_quota = m_bandwidth_limit[upload_channel].quota_left();
2586 p.receive_quota = m_bandwidth_limit[download_channel].quota_left();
2587 if (m_download_queue.empty()) p.request_timeout = -1;
2588 else p.request_timeout = total_seconds(m_requested - now) + m_ses.settings().request_timeout
2589 + m_timeout_extend;
2590 #ifndef TORRENT_DISABLE_GEO_IP
2591 p.inet_as_name = m_inet_as_name;
2592 #endif
2594 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
2595 p.country[0] = m_country[0];
2596 p.country[1] = m_country[1];
2597 #endif
2599 p.total_download = statistics().total_payload_download();
2600 p.total_upload = statistics().total_payload_upload();
2602 if (m_bandwidth_limit[upload_channel].throttle() == bandwidth_limit::inf)
2603 p.upload_limit = -1;
2604 else
2605 p.upload_limit = m_bandwidth_limit[upload_channel].throttle();
2607 if (m_bandwidth_limit[download_channel].throttle() == bandwidth_limit::inf)
2608 p.download_limit = -1;
2609 else
2610 p.download_limit = m_bandwidth_limit[download_channel].throttle();
2612 p.load_balancing = total_free_upload();
2614 p.download_queue_length = int(download_queue().size() + m_request_queue.size());
2615 p.requests_in_buffer = int(m_requests_in_buffer.size());
2616 p.target_dl_queue_length = int(desired_queue_size());
2617 p.upload_queue_length = int(upload_queue().size());
2619 if (boost::optional<piece_block_progress> ret = downloading_piece_progress())
2621 p.downloading_piece_index = ret->piece_index;
2622 p.downloading_block_index = ret->block_index;
2623 p.downloading_progress = ret->bytes_downloaded;
2624 p.downloading_total = ret->full_block_bytes;
2626 else
2628 p.downloading_piece_index = -1;
2629 p.downloading_block_index = -1;
2630 p.downloading_progress = 0;
2631 p.downloading_total = 0;
2634 p.pieces = get_bitfield();
2635 p.last_request = now - m_last_request;
2636 p.last_active = now - (std::max)(m_last_sent, m_last_receive);
2638 // this will set the flags so that we can update them later
2639 p.flags = 0;
2640 get_specific_peer_info(p);
2642 p.flags |= is_seed() ? peer_info::seed : 0;
2643 p.flags |= m_snubbed ? peer_info::snubbed : 0;
2644 p.flags |= m_upload_only ? peer_info::upload_only : 0;
2645 if (peer_info_struct())
2647 policy::peer* pi = peer_info_struct();
2648 p.source = pi->source;
2649 p.failcount = pi->failcount;
2650 p.num_hashfails = pi->hashfails;
2651 p.flags |= pi->on_parole ? peer_info::on_parole : 0;
2652 p.flags |= pi->optimistically_unchoked ? peer_info::optimistic_unchoke : 0;
2653 #ifndef TORRENT_DISABLE_GEO_IP
2654 p.inet_as = pi->inet_as->first;
2655 #endif
2657 else
2659 p.source = 0;
2660 p.failcount = 0;
2661 p.num_hashfails = 0;
2662 p.remote_dl_rate = 0;
2663 #ifndef TORRENT_DISABLE_GEO_IP
2664 p.inet_as = 0xffff;
2665 #endif
2668 p.remote_dl_rate = m_remote_dl_rate;
2669 p.send_buffer_size = m_send_buffer.capacity();
2670 p.used_send_buffer = m_send_buffer.size();
2671 p.receive_buffer_size = m_recv_buffer.capacity() + m_disk_recv_buffer_size;
2672 p.used_receive_buffer = m_recv_pos;
2673 p.write_state = m_channel_state[upload_channel];
2674 p.read_state = m_channel_state[download_channel];
2676 p.progress = (float)p.pieces.count() / (float)p.pieces.size();
2679 // allocates a disk buffer of size 'disk_buffer_size' and replaces the
2680 // end of the current receive buffer with it. i.e. the receive pos
2681 // must be <= packet_size - disk_buffer_size
2682 // the disk buffer can be accessed through release_disk_receive_buffer()
2683 // when it is queried, the responsibility to free it is transferred
2684 // to the caller
2685 bool peer_connection::allocate_disk_receive_buffer(int disk_buffer_size)
2687 INVARIANT_CHECK;
2689 TORRENT_ASSERT(m_packet_size > 0);
2690 TORRENT_ASSERT(m_recv_pos <= m_packet_size - disk_buffer_size);
2691 TORRENT_ASSERT(!m_disk_recv_buffer);
2692 TORRENT_ASSERT(disk_buffer_size <= 16 * 1024);
2694 if (disk_buffer_size > 16 * 1024)
2696 disconnect("invalid piece size", 2);
2697 return false;
2700 m_disk_recv_buffer.reset(m_ses.allocate_disk_buffer());
2701 if (!m_disk_recv_buffer)
2703 disconnect("out of memory");
2704 return false;
2706 m_disk_recv_buffer_size = disk_buffer_size;
2707 return true;
2710 char* peer_connection::release_disk_receive_buffer()
2712 m_disk_recv_buffer_size = 0;
2713 return m_disk_recv_buffer.release();
2716 void peer_connection::cut_receive_buffer(int size, int packet_size)
2718 INVARIANT_CHECK;
2720 TORRENT_ASSERT(packet_size > 0);
2721 TORRENT_ASSERT(int(m_recv_buffer.size()) >= size);
2722 TORRENT_ASSERT(int(m_recv_buffer.size()) >= m_recv_pos);
2723 TORRENT_ASSERT(m_recv_pos >= size);
2725 if (size > 0)
2726 std::memmove(&m_recv_buffer[0], &m_recv_buffer[0] + size, m_recv_pos - size);
2728 m_recv_pos -= size;
2730 #ifndef NDEBUG
2731 std::fill(m_recv_buffer.begin() + m_recv_pos, m_recv_buffer.end(), 0);
2732 #endif
2734 m_packet_size = packet_size;
2737 void peer_connection::calc_ip_overhead()
2739 m_statistics.calc_ip_overhead();
2742 void peer_connection::second_tick(float tick_interval)
2744 ptime now(time_now());
2745 boost::intrusive_ptr<peer_connection> me(self());
2747 // the invariant check must be run before me is destructed
2748 // in case the peer got disconnected
2749 INVARIANT_CHECK;
2751 boost::shared_ptr<torrent> t = m_torrent.lock();
2752 if (!t || m_disconnecting)
2754 m_ses.m_half_open.done(m_connection_ticket);
2755 m_connecting = false;
2756 disconnect("torrent aborted");
2757 return;
2760 on_tick();
2762 #ifndef TORRENT_DISABLE_EXTENSIONS
2763 for (extension_list_t::iterator i = m_extensions.begin()
2764 , end(m_extensions.end()); i != end; ++i)
2766 (*i)->tick();
2768 if (is_disconnecting()) return;
2769 #endif
2771 // if the peer hasn't said a thing for a certain
2772 // time, it is considered to have timed out
2773 time_duration d;
2774 d = now - m_last_receive;
2775 if (d > seconds(m_timeout) && !m_connecting)
2777 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2778 (*m_logger) << time_now_string() << " *** LAST ACTIVITY [ "
2779 << total_seconds(d) << " seconds ago ] ***\n";
2780 #endif
2781 disconnect("timed out: inactivity");
2782 return;
2785 // do not stall waiting for a handshake
2786 if (!m_connecting
2787 && in_handshake()
2788 && d > seconds(m_ses.settings().handshake_timeout))
2790 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2791 (*m_logger) << time_now_string() << " *** NO HANDSHAKE [ waited "
2792 << total_seconds(d) << " seconds ] ***\n";
2793 #endif
2794 disconnect("timed out: no handshake");
2795 return;
2798 // disconnect peers that we unchoked, but
2799 // they didn't send a request within 20 seconds.
2800 // but only if we're a seed
2801 d = now - (std::max)(m_last_unchoke, m_last_incoming_request);
2802 if (!m_connecting
2803 && m_requests.empty()
2804 && !m_choked
2805 && m_peer_interested
2806 && t && t->is_finished()
2807 && d > seconds(20))
2809 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2810 (*m_logger) << time_now_string() << " *** NO REQUEST [ t: "
2811 << total_seconds(d) << " ] ***\n";
2812 #endif
2813 disconnect("timed out: no request when unchoked");
2814 return;
2817 // if the peer hasn't become interested and we haven't
2818 // become interested in the peer for 10 minutes, it
2819 // has also timed out.
2820 time_duration d1;
2821 time_duration d2;
2822 d1 = now - m_became_uninterested;
2823 d2 = now - m_became_uninteresting;
2824 time_duration time_limit = seconds(
2825 m_ses.settings().inactivity_timeout);
2827 // don't bother disconnect peers we haven't been interested
2828 // in (and that hasn't been interested in us) for a while
2829 // unless we have used up all our connection slots
2830 if (!m_interesting
2831 && !m_peer_interested
2832 && d1 > time_limit
2833 && d2 > time_limit
2834 && (m_ses.num_connections() >= m_ses.max_connections()
2835 || (t && t->num_peers() >= t->max_connections())))
2837 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2838 (*m_logger) << time_now_string() << " *** MUTUAL NO INTEREST [ "
2839 "t1: " << total_seconds(d1) << " | "
2840 "t2: " << total_seconds(d2) << " ] ***\n";
2841 #endif
2842 disconnect("timed out: no interest");
2843 return;
2846 if (!m_download_queue.empty()
2847 && now > m_requested + seconds(m_ses.settings().request_timeout
2848 + m_timeout_extend))
2850 snub_peer();
2853 // if we haven't sent something in too long, send a keep-alive
2854 keep_alive();
2856 m_ignore_bandwidth_limits = m_ses.settings().ignore_limits_on_local_network
2857 && on_local_network();
2859 m_statistics.second_tick(tick_interval);
2861 if (m_statistics.upload_payload_rate() > m_upload_rate_peak)
2863 m_upload_rate_peak = m_statistics.upload_payload_rate();
2865 if (m_statistics.download_payload_rate() > m_download_rate_peak)
2867 m_download_rate_peak = m_statistics.download_payload_rate();
2868 #ifndef TORRENT_DISABLE_GEO_IP
2869 if (peer_info_struct())
2871 std::pair<const int, int>* as_stats = peer_info_struct()->inet_as;
2872 if (as_stats && as_stats->second < m_download_rate_peak)
2873 as_stats->second = m_download_rate_peak;
2875 #endif
2877 if (is_disconnecting()) return;
2879 if (!t->ready_for_connections()) return;
2881 // calculate the desired download queue size
2882 const float queue_time = m_ses.settings().request_queue_time;
2883 // (if the latency is more than this, the download will stall)
2884 // so, the queue size is queue_time * down_rate / 16 kiB
2885 // (16 kB is the size of each request)
2886 // the minimum number of requests is 2 and the maximum is 48
2887 // the block size doesn't have to be 16. So we first query the
2888 // torrent for it
2889 const int block_size = m_request_large_blocks
2890 ? t->torrent_file().piece_length() : t->block_size();
2891 TORRENT_ASSERT(block_size > 0);
2893 if (m_snubbed)
2895 m_desired_queue_size = 1;
2897 else
2899 m_desired_queue_size = static_cast<int>(queue_time
2900 * statistics().download_rate() / block_size);
2901 if (m_desired_queue_size > m_max_out_request_queue)
2902 m_desired_queue_size = m_max_out_request_queue;
2903 if (m_desired_queue_size < min_request_queue)
2904 m_desired_queue_size = min_request_queue;
2906 if (m_desired_queue_size == m_max_out_request_queue
2907 && t->alerts().should_post<performance_alert>())
2909 t->alerts().post_alert(performance_alert(t->get_handle()
2910 , performance_alert::outstanding_request_limit_reached));
2914 if (!m_download_queue.empty()
2915 && now - m_last_piece > seconds(m_ses.settings().piece_timeout
2916 + m_timeout_extend))
2918 // this peer isn't sending the pieces we've
2919 // requested (this has been observed by BitComet)
2920 // in this case we'll clear our download queue and
2921 // re-request the blocks.
2922 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2923 (*m_logger) << time_now_string()
2924 << " *** PIECE_REQUESTS TIMED OUT [ " << (int)m_download_queue.size()
2925 << " " << total_seconds(now - m_last_piece) << "] ***\n";
2926 #endif
2928 snub_peer();
2931 // If the client sends more data
2932 // we send it data faster, otherwise, slower.
2933 // It will also depend on how much data the
2934 // client has sent us. This is the mean to
2935 // maintain the share ratio given by m_ratio
2936 // with all peers.
2938 if (t->is_finished() || is_choked() || t->ratio() == 0.0f)
2940 // if we have downloaded more than one piece more
2941 // than we have uploaded OR if we are a seed
2942 // have an unlimited upload rate
2943 m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
2945 else
2947 size_type bias = 0x10000 + 2 * t->block_size() + m_free_upload;
2949 double break_even_time = 15; // seconds.
2950 size_type have_uploaded = m_statistics.total_payload_upload();
2951 size_type have_downloaded = m_statistics.total_payload_download();
2952 double download_speed = m_statistics.download_rate();
2954 size_type soon_downloaded =
2955 have_downloaded + (size_type)(download_speed * break_even_time*1.5);
2957 if (t->ratio() != 1.f)
2958 soon_downloaded = (size_type)(soon_downloaded*(double)t->ratio());
2960 double upload_speed_limit = (std::min)((soon_downloaded - have_uploaded
2961 + bias) / break_even_time, double(m_upload_limit));
2963 upload_speed_limit = (std::min)(upload_speed_limit,
2964 (double)(std::numeric_limits<int>::max)());
2966 m_bandwidth_limit[upload_channel].throttle(
2967 (std::min)((std::max)((int)upload_speed_limit, 20)
2968 , m_upload_limit));
2971 // update once every minute
2972 if (now - m_remote_dl_update >= seconds(60))
2974 float factor = 0.6666666666667f;
2976 if (m_remote_dl_rate == 0) factor = 0.0f;
2978 m_remote_dl_rate = int((m_remote_dl_rate * factor) +
2979 ((m_remote_bytes_dled * (1.0f-factor)) / 60.f));
2981 m_remote_bytes_dled = 0;
2982 m_remote_dl_update = now;
2985 fill_send_buffer();
2988 void peer_connection::snub_peer()
2990 INVARIANT_CHECK;
2992 boost::shared_ptr<torrent> t = m_torrent.lock();
2993 TORRENT_ASSERT(t);
2995 if (!m_snubbed)
2997 m_snubbed = true;
2998 if (m_ses.m_alerts.should_post<peer_snubbed_alert>())
3000 m_ses.m_alerts.post_alert(peer_snubbed_alert(t->get_handle()
3001 , m_remote, m_peer_id));
3004 m_desired_queue_size = 1;
3006 if (on_parole())
3008 m_timeout_extend += m_ses.settings().request_timeout;
3009 return;
3011 if (!t->has_picker()) return;
3012 piece_picker& picker = t->picker();
3014 piece_block r(-1, -1);
3015 // time out the last request in the queue
3016 if (!m_request_queue.empty())
3018 r = m_request_queue.back();
3019 m_request_queue.pop_back();
3021 else
3023 TORRENT_ASSERT(!m_download_queue.empty());
3024 r = m_download_queue.back().block;
3026 // only time out a request if it blocks the piece
3027 // from being completed (i.e. no free blocks to
3028 // request from it)
3029 piece_picker::downloading_piece p;
3030 picker.piece_info(r.piece_index, p);
3031 int free_blocks = picker.blocks_in_piece(r.piece_index)
3032 - p.finished - p.writing - p.requested;
3033 if (free_blocks > 0)
3035 m_timeout_extend += m_ses.settings().request_timeout;
3036 return;
3039 if (m_ses.m_alerts.should_post<block_timeout_alert>())
3041 m_ses.m_alerts.post_alert(block_timeout_alert(t->get_handle()
3042 , remote(), pid(), r.block_index, r.piece_index));
3044 m_download_queue.pop_back();
3046 if (!m_download_queue.empty() || !m_request_queue.empty())
3047 m_timeout_extend += m_ses.settings().request_timeout;
3049 m_desired_queue_size = 2;
3050 request_a_block(*t, *this);
3051 m_desired_queue_size = 1;
3053 // abort the block after the new one has
3054 // been requested in order to prevent it from
3055 // picking the same block again, stalling the
3056 // same piece indefinitely.
3057 if (r != piece_block(-1, -1))
3058 picker.abort_download(r);
3060 send_block_requests();
3063 void peer_connection::fill_send_buffer()
3065 INVARIANT_CHECK;
3067 boost::shared_ptr<torrent> t = m_torrent.lock();
3068 if (!t) return;
3070 // only add new piece-chunks if the send buffer is small enough
3071 // otherwise there will be no end to how large it will be!
3073 int buffer_size_watermark = int(m_statistics.upload_rate()) / 2;
3074 if (buffer_size_watermark < 512) buffer_size_watermark = 512;
3075 else if (buffer_size_watermark > m_ses.settings().send_buffer_watermark)
3076 buffer_size_watermark = m_ses.settings().send_buffer_watermark;
3078 while (!m_requests.empty()
3079 && (send_buffer_size() + m_reading_bytes < buffer_size_watermark))
3081 TORRENT_ASSERT(t->ready_for_connections());
3082 peer_request& r = m_requests.front();
3084 TORRENT_ASSERT(r.piece >= 0);
3085 TORRENT_ASSERT(r.piece < (int)m_have_piece.size());
3086 TORRENT_ASSERT(t->have_piece(r.piece));
3087 TORRENT_ASSERT(r.start + r.length <= t->torrent_file().piece_size(r.piece));
3088 TORRENT_ASSERT(r.length > 0 && r.start >= 0);
3090 t->filesystem().async_read(r, bind(&peer_connection::on_disk_read_complete
3091 , self(), _1, _2, r));
3092 m_reading_bytes += r.length;
3094 m_requests.erase(m_requests.begin());
3098 void peer_connection::on_disk_read_complete(int ret, disk_io_job const& j, peer_request r)
3100 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3102 m_reading_bytes -= r.length;
3104 disk_buffer_holder buffer(m_ses, j.buffer);
3106 if (ret != r.length || m_torrent.expired())
3108 boost::shared_ptr<torrent> t = m_torrent.lock();
3109 if (!t)
3111 disconnect(j.str.c_str());
3112 return;
3115 if (t->alerts().should_post<file_error_alert>())
3116 t->alerts().post_alert(file_error_alert(j.error_file, t->get_handle(), j.str));
3117 t->set_error(j.str);
3118 t->pause();
3119 return;
3122 #ifdef TORRENT_VERBOSE_LOGGING
3123 (*m_logger) << time_now_string()
3124 << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start
3125 << " | l: " << r.length << " ]\n";
3126 #endif
3128 write_piece(r, buffer);
3129 setup_send();
3132 void peer_connection::assign_bandwidth(int channel, int amount)
3134 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3136 #ifdef TORRENT_VERBOSE_LOGGING
3137 (*m_logger) << "bandwidth [ " << channel << " ] + " << amount << "\n";
3138 #endif
3140 m_bandwidth_limit[channel].assign(amount);
3141 TORRENT_ASSERT(m_channel_state[channel] == peer_info::bw_global);
3142 m_channel_state[channel] = peer_info::bw_idle;
3143 if (channel == upload_channel)
3145 setup_send();
3147 else if (channel == download_channel)
3149 setup_receive();
3153 void peer_connection::expire_bandwidth(int channel, int amount)
3155 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3157 m_bandwidth_limit[channel].expire(amount);
3158 if (channel == upload_channel)
3160 setup_send();
3162 else if (channel == download_channel)
3164 setup_receive();
3168 void peer_connection::setup_send()
3170 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3172 if (m_channel_state[upload_channel] != peer_info::bw_idle) return;
3174 shared_ptr<torrent> t = m_torrent.lock();
3176 if (m_bandwidth_limit[upload_channel].quota_left() == 0
3177 && !m_send_buffer.empty()
3178 && !m_connecting
3179 && t
3180 && !m_ignore_bandwidth_limits)
3182 // in this case, we have data to send, but no
3183 // bandwidth. So, we simply request bandwidth
3184 // from the torrent
3185 TORRENT_ASSERT(t);
3186 if (m_bandwidth_limit[upload_channel].max_assignable() > 0)
3188 int priority = is_interesting() * 2 + m_requests_in_buffer.size();
3189 // peers that we are not interested in are non-prioritized
3190 m_channel_state[upload_channel] = peer_info::bw_torrent;
3191 t->request_bandwidth(upload_channel, self()
3192 , m_send_buffer.size(), priority);
3193 #ifdef TORRENT_VERBOSE_LOGGING
3194 (*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ upload prio: "
3195 << priority << "]\n";
3196 #endif
3199 return;
3202 if (!can_write())
3204 #ifdef TORRENT_VERBOSE_LOGGING
3205 (*m_logger) << time_now_string() << " *** CANNOT WRITE ["
3206 " quota: " << m_bandwidth_limit[download_channel].quota_left() <<
3207 " ignore: " << (m_ignore_bandwidth_limits?"yes":"no") <<
3208 " buf: " << m_send_buffer.size() <<
3209 " connecting: " << (m_connecting?"yes":"no") <<
3210 " ]\n";
3211 #endif
3212 return;
3215 // send the actual buffer
3216 if (!m_send_buffer.empty())
3218 int amount_to_send = m_send_buffer.size();
3219 int quota_left = m_bandwidth_limit[upload_channel].quota_left();
3220 if (!m_ignore_bandwidth_limits && amount_to_send > quota_left)
3221 amount_to_send = quota_left;
3223 TORRENT_ASSERT(amount_to_send > 0);
3225 #ifdef TORRENT_VERBOSE_LOGGING
3226 (*m_logger) << time_now_string() << " *** ASYNC_WRITE [ bytes: " << amount_to_send << " ]\n";
3227 #endif
3228 std::list<asio::const_buffer> const& vec = m_send_buffer.build_iovec(amount_to_send);
3229 m_socket->async_write_some(vec, bind(&peer_connection::on_send_data, self(), _1, _2));
3231 m_channel_state[upload_channel] = peer_info::bw_network;
3235 void peer_connection::setup_receive()
3237 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3239 INVARIANT_CHECK;
3241 if (m_channel_state[download_channel] != peer_info::bw_idle) return;
3243 shared_ptr<torrent> t = m_torrent.lock();
3245 if (m_bandwidth_limit[download_channel].quota_left() == 0
3246 && !m_connecting
3247 && t
3248 && !m_ignore_bandwidth_limits)
3250 if (m_bandwidth_limit[download_channel].max_assignable() > 0)
3252 #ifdef TORRENT_VERBOSE_LOGGING
3253 (*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ download ]\n";
3254 #endif
3255 TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle);
3256 m_channel_state[download_channel] = peer_info::bw_torrent;
3257 t->request_bandwidth(download_channel, self()
3258 , m_download_queue.size() * 16 * 1024 + 30, m_priority);
3260 return;
3263 if (!can_read())
3265 #ifdef TORRENT_VERBOSE_LOGGING
3266 (*m_logger) << time_now_string() << " *** CANNOT READ ["
3267 " quota: " << m_bandwidth_limit[download_channel].quota_left() <<
3268 " ignore: " << (m_ignore_bandwidth_limits?"yes":"no") <<
3269 " outstanding: " << m_outstanding_writing_bytes <<
3270 " outstanding-limit: " << m_ses.settings().max_outstanding_disk_bytes_per_connection <<
3271 " ]\n";
3272 #endif
3273 return;
3276 TORRENT_ASSERT(m_packet_size > 0);
3277 int max_receive = m_packet_size - m_recv_pos;
3278 int quota_left = m_bandwidth_limit[download_channel].quota_left();
3279 if (!m_ignore_bandwidth_limits && max_receive > quota_left)
3280 max_receive = quota_left;
3282 if (max_receive == 0) return;
3284 TORRENT_ASSERT(m_recv_pos >= 0);
3285 TORRENT_ASSERT(m_packet_size > 0);
3286 TORRENT_ASSERT(can_read());
3287 #ifdef TORRENT_VERBOSE_LOGGING
3288 (*m_logger) << time_now_string() << " *** ASYNC_READ [ max: " << max_receive << " bytes ]\n";
3289 #endif
3291 int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
3293 if (int(m_recv_buffer.size()) < regular_buffer_size)
3294 m_recv_buffer.resize(regular_buffer_size);
3296 if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive)
3298 // only receive into regular buffer
3299 TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size()));
3300 m_socket->async_read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
3301 , max_receive), bind(&peer_connection::on_receive_data, self(), _1, _2));
3303 else if (m_recv_pos >= regular_buffer_size)
3305 // only receive into disk buffer
3306 TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0);
3307 TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size);
3308 m_socket->async_read_some(asio::buffer(m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size
3309 , max_receive)
3310 , bind(&peer_connection::on_receive_data, self(), _1, _2));
3312 else
3314 // receive into both regular and disk buffer
3315 TORRENT_ASSERT(max_receive + m_recv_pos > regular_buffer_size);
3316 TORRENT_ASSERT(m_recv_pos < regular_buffer_size);
3317 TORRENT_ASSERT(max_receive - regular_buffer_size
3318 + m_recv_pos <= m_disk_recv_buffer_size);
3320 boost::array<asio::mutable_buffer, 2> vec;
3321 vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos]
3322 , regular_buffer_size - m_recv_pos);
3323 vec[1] = asio::buffer(m_disk_recv_buffer.get()
3324 , max_receive - regular_buffer_size + m_recv_pos);
3325 m_socket->async_read_some(vec, bind(&peer_connection::on_receive_data
3326 , self(), _1, _2));
3328 m_channel_state[download_channel] = peer_info::bw_network;
3331 #ifndef TORRENT_DISABLE_ENCRYPTION
3333 // returns the last 'bytes' from the receive buffer
3334 std::pair<buffer::interval, buffer::interval> peer_connection::wr_recv_buffers(int bytes)
3336 TORRENT_ASSERT(bytes <= m_recv_pos);
3338 std::pair<buffer::interval, buffer::interval> vec;
3339 int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
3340 TORRENT_ASSERT(regular_buffer_size >= 0);
3341 if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos)
3343 vec.first = buffer::interval(&m_recv_buffer[0]
3344 + m_recv_pos - bytes, &m_recv_buffer[0] + m_recv_pos);
3345 vec.second = buffer::interval(0,0);
3347 else if (m_recv_pos - bytes >= regular_buffer_size)
3349 vec.first = buffer::interval(m_disk_recv_buffer.get() + m_recv_pos
3350 - regular_buffer_size - bytes, m_disk_recv_buffer.get() + m_recv_pos
3351 - regular_buffer_size);
3352 vec.second = buffer::interval(0,0);
3354 else
3356 TORRENT_ASSERT(m_recv_pos - bytes < regular_buffer_size);
3357 TORRENT_ASSERT(m_recv_pos > regular_buffer_size);
3358 vec.first = buffer::interval(&m_recv_buffer[0] + m_recv_pos - bytes
3359 , &m_recv_buffer[0] + regular_buffer_size);
3360 vec.second = buffer::interval(m_disk_recv_buffer.get()
3361 , m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size);
3363 TORRENT_ASSERT(vec.first.left() + vec.second.left() == bytes);
3364 return vec;
3366 #endif
3368 void peer_connection::reset_recv_buffer(int packet_size)
3370 TORRENT_ASSERT(packet_size > 0);
3371 if (m_recv_pos > m_packet_size)
3373 cut_receive_buffer(m_packet_size, packet_size);
3374 return;
3376 m_recv_pos = 0;
3377 m_packet_size = packet_size;
3380 void peer_connection::send_buffer(char const* buf, int size, int flags)
3382 if (flags == message_type_request)
3383 m_requests_in_buffer.push_back(m_send_buffer.size() + size);
3385 int free_space = m_send_buffer.space_in_last_buffer();
3386 if (free_space > size) free_space = size;
3387 if (free_space > 0)
3389 m_send_buffer.append(buf, free_space);
3390 size -= free_space;
3391 buf += free_space;
3392 #ifdef TORRENT_STATS
3393 m_ses.m_buffer_usage_logger << log_time() << " send_buffer: "
3394 << free_space << std::endl;
3395 m_ses.log_buffer_usage();
3396 #endif
3398 if (size <= 0) return;
3400 std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
3401 if (buffer.first == 0)
3403 disconnect("out of memory");
3404 return;
3406 TORRENT_ASSERT(buffer.second >= size);
3407 std::memcpy(buffer.first, buf, size);
3408 m_send_buffer.append_buffer(buffer.first, buffer.second, size
3409 , bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
3410 #ifdef TORRENT_STATS
3411 m_ses.m_buffer_usage_logger << log_time() << " send_buffer_alloc: " << size << std::endl;
3412 m_ses.log_buffer_usage();
3413 #endif
3414 setup_send();
3417 // TODO: change this interface to automatically call setup_send() when the
3418 // return value is destructed
3419 buffer::interval peer_connection::allocate_send_buffer(int size)
3421 TORRENT_ASSERT(size > 0);
3422 char* insert = m_send_buffer.allocate_appendix(size);
3423 if (insert == 0)
3425 std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
3426 if (buffer.first == 0)
3428 disconnect("out of memory");
3429 return buffer::interval(0, 0);
3431 TORRENT_ASSERT(buffer.second >= size);
3432 m_send_buffer.append_buffer(buffer.first, buffer.second, size
3433 , bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
3434 buffer::interval ret(buffer.first, buffer.first + size);
3435 #ifdef TORRENT_STATS
3436 m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer_alloc: " << size << std::endl;
3437 m_ses.log_buffer_usage();
3438 #endif
3439 return ret;
3441 else
3443 #ifdef TORRENT_STATS
3444 m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer: " << size << std::endl;
3445 m_ses.log_buffer_usage();
3446 #endif
3447 buffer::interval ret(insert, insert + size);
3448 return ret;
3452 template<class T>
3453 struct set_to_zero
3455 set_to_zero(T& v, bool cond): m_val(v), m_cond(cond) {}
3456 void fire() { if (!m_cond) return; m_cond = false; m_val = 0; }
3457 ~set_to_zero() { if (m_cond) m_val = 0; }
3458 private:
3459 T& m_val;
3460 bool m_cond;
3463 // --------------------------
3464 // RECEIVE DATA
3465 // --------------------------
3467 // throws exception when the client should be disconnected
3468 void peer_connection::on_receive_data(const error_code& error
3469 , std::size_t bytes_transferred)
3471 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3473 INVARIANT_CHECK;
3475 TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_network);
3476 m_channel_state[download_channel] = peer_info::bw_idle;
3478 if (error)
3480 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3481 (*m_logger) << time_now_string() << " **ERROR**: "
3482 << error.message() << "[in peer_connection::on_receive_data]\n";
3483 #endif
3484 on_receive(error, bytes_transferred);
3485 disconnect(error.message().c_str());
3486 return;
3489 int max_receive = 0;
3492 #ifdef TORRENT_VERBOSE_LOGGING
3493 (*m_logger) << "read " << bytes_transferred << " bytes\n";
3494 #endif
3495 // correct the dl quota usage, if not all of the buffer was actually read
3496 if (!m_ignore_bandwidth_limits)
3497 m_bandwidth_limit[download_channel].use_quota(bytes_transferred);
3499 if (m_disconnecting) return;
3501 TORRENT_ASSERT(m_packet_size > 0);
3502 TORRENT_ASSERT(bytes_transferred > 0);
3504 m_last_receive = time_now();
3505 m_recv_pos += bytes_transferred;
3506 TORRENT_ASSERT(m_recv_pos <= int(m_recv_buffer.size()
3507 + m_disk_recv_buffer_size));
3509 #ifndef NDEBUG
3510 size_type cur_payload_dl = m_statistics.last_payload_downloaded();
3511 size_type cur_protocol_dl = m_statistics.last_protocol_downloaded();
3512 #endif
3513 on_receive(error, bytes_transferred);
3514 #ifndef NDEBUG
3515 TORRENT_ASSERT(m_statistics.last_payload_downloaded() - cur_payload_dl >= 0);
3516 TORRENT_ASSERT(m_statistics.last_protocol_downloaded() - cur_protocol_dl >= 0);
3517 size_type stats_diff = m_statistics.last_payload_downloaded() - cur_payload_dl +
3518 m_statistics.last_protocol_downloaded() - cur_protocol_dl;
3519 TORRENT_ASSERT(stats_diff == bytes_transferred);
3520 #endif
3522 TORRENT_ASSERT(m_packet_size > 0);
3524 if (m_peer_choked
3525 && m_recv_pos == 0
3526 && (m_recv_buffer.capacity() - m_packet_size) > 128)
3528 buffer(m_packet_size).swap(m_recv_buffer);
3531 max_receive = m_packet_size - m_recv_pos;
3532 int quota_left = m_bandwidth_limit[download_channel].quota_left();
3533 if (!m_ignore_bandwidth_limits && max_receive > quota_left)
3534 max_receive = quota_left;
3536 if (max_receive == 0) break;
3538 int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
3540 if (int(m_recv_buffer.size()) < regular_buffer_size)
3541 m_recv_buffer.resize(regular_buffer_size);
3543 error_code ec;
3544 if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive)
3546 // only receive into regular buffer
3547 TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size()));
3548 bytes_transferred = m_socket->read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
3549 , max_receive), ec);
3551 else if (m_recv_pos >= regular_buffer_size)
3553 // only receive into disk buffer
3554 TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0);
3555 TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size);
3556 bytes_transferred = m_socket->read_some(asio::buffer(m_disk_recv_buffer.get()
3557 + m_recv_pos - regular_buffer_size, (std::min)(m_packet_size
3558 - m_recv_pos, max_receive)), ec);
3560 else
3562 // receive into both regular and disk buffer
3563 TORRENT_ASSERT(max_receive + m_recv_pos > regular_buffer_size);
3564 TORRENT_ASSERT(m_recv_pos < regular_buffer_size);
3565 TORRENT_ASSERT(max_receive - regular_buffer_size
3566 + m_recv_pos <= m_disk_recv_buffer_size);
3568 boost::array<asio::mutable_buffer, 2> vec;
3569 vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos]
3570 , regular_buffer_size - m_recv_pos);
3571 vec[1] = asio::buffer(m_disk_recv_buffer.get()
3572 , (std::min)(m_disk_recv_buffer_size
3573 , max_receive - regular_buffer_size + m_recv_pos));
3574 bytes_transferred = m_socket->read_some(vec, ec);
3576 if (ec && ec != asio::error::would_block)
3578 disconnect(ec.message().c_str());
3579 return;
3581 if (ec == asio::error::would_block) break;
3583 while (bytes_transferred > 0);
3585 setup_receive();
3588 bool peer_connection::can_write() const
3590 // if we have requests or pending data to be sent or announcements to be made
3591 // we want to send data
3592 return !m_send_buffer.empty()
3593 && (m_bandwidth_limit[upload_channel].quota_left() > 0
3594 || m_ignore_bandwidth_limits)
3595 && !m_connecting;
3598 bool peer_connection::can_read() const
3600 bool ret = (m_bandwidth_limit[download_channel].quota_left() > 0
3601 || m_ignore_bandwidth_limits)
3602 && !m_connecting
3603 && m_outstanding_writing_bytes <
3604 m_ses.settings().max_outstanding_disk_bytes_per_connection;
3606 return ret;
3609 void peer_connection::connect(int ticket)
3611 INVARIANT_CHECK;
3613 error_code ec;
3614 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3615 (*m_ses.m_logger) << time_now_string() << " CONNECTING: " << m_remote.address().to_string(ec)
3616 << ":" << m_remote.port() << "\n";
3617 #endif
3619 m_connection_ticket = ticket;
3620 boost::shared_ptr<torrent> t = m_torrent.lock();
3622 m_queued = false;
3623 TORRENT_ASSERT(m_connecting);
3625 if (!t)
3627 disconnect("torrent aborted");
3628 return;
3631 m_socket->open(t->get_interface().protocol(), ec);
3632 if (ec)
3634 disconnect(ec.message().c_str());
3635 return;
3638 // set the socket to non-blocking, so that we can
3639 // read the entire buffer on each read event we get
3640 tcp::socket::non_blocking_io ioc(true);
3641 m_socket->io_control(ioc, ec);
3642 if (ec)
3644 disconnect(ec.message().c_str());
3645 return;
3648 tcp::endpoint bind_interface = t->get_interface();
3650 std::pair<int, int> const& out_ports = m_ses.settings().outgoing_ports;
3651 if (out_ports.first > 0 && out_ports.second >= out_ports.first)
3653 m_socket->set_option(socket_acceptor::reuse_address(true), ec);
3654 if (ec)
3656 disconnect(ec.message().c_str());
3657 return;
3659 bind_interface.port(m_ses.next_port());
3662 m_socket->bind(bind_interface, ec);
3663 if (ec)
3665 disconnect(ec.message().c_str());
3666 return;
3668 m_socket->async_connect(m_remote
3669 , bind(&peer_connection::on_connection_complete, self(), _1));
3670 m_connect = time_now();
3671 m_statistics.sent_syn();
3673 if (t->alerts().should_post<peer_connect_alert>())
3675 t->alerts().post_alert(peer_connect_alert(
3676 t->get_handle(), remote(), pid()));
3680 void peer_connection::on_connection_complete(error_code const& e)
3682 ptime completed = time_now();
3684 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3686 INVARIANT_CHECK;
3688 m_rtt = total_milliseconds(completed - m_connect);
3690 if (m_disconnecting) return;
3692 m_connecting = false;
3693 m_ses.m_half_open.done(m_connection_ticket);
3695 if (e)
3697 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3698 (*m_ses.m_logger) << time_now_string() << " CONNECTION FAILED: " << m_remote.address().to_string()
3699 << ": " << e.message() << "\n";
3700 #endif
3701 disconnect(e.message().c_str(), 1);
3702 return;
3705 if (m_disconnecting) return;
3706 m_last_receive = time_now();
3708 // this means the connection just succeeded
3710 m_statistics.received_synack();
3712 TORRENT_ASSERT(m_socket);
3713 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING
3714 (*m_ses.m_logger) << time_now_string() << " COMPLETED: " << m_remote.address().to_string()
3715 << " rtt = " << m_rtt << "\n";
3716 #endif
3718 error_code ec;
3719 if (m_remote == m_socket->local_endpoint(ec))
3721 // if the remote endpoint is the same as the local endpoint, we're connected
3722 // to ourselves
3723 disconnect("connected to ourselves", 1);
3724 return;
3727 if (m_remote.address().is_v4())
3729 error_code ec;
3730 m_socket->set_option(type_of_service(m_ses.settings().peer_tos), ec);
3733 on_connected();
3734 setup_send();
3735 setup_receive();
3738 // --------------------------
3739 // SEND DATA
3740 // --------------------------
3742 // throws exception when the client should be disconnected
3743 void peer_connection::on_send_data(error_code const& error
3744 , std::size_t bytes_transferred)
3746 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3748 INVARIANT_CHECK;
3750 TORRENT_ASSERT(m_channel_state[upload_channel] == peer_info::bw_network);
3752 m_send_buffer.pop_front(bytes_transferred);
3754 for (std::vector<int>::iterator i = m_requests_in_buffer.begin()
3755 , end(m_requests_in_buffer.end()); i != end; ++i)
3756 *i -= bytes_transferred;
3758 while (!m_requests_in_buffer.empty()
3759 && m_requests_in_buffer.front() <= 0)
3760 m_requests_in_buffer.erase(m_requests_in_buffer.begin());
3762 m_channel_state[upload_channel] = peer_info::bw_idle;
3764 if (!m_ignore_bandwidth_limits)
3765 m_bandwidth_limit[upload_channel].use_quota(bytes_transferred);
3767 #ifdef TORRENT_VERBOSE_LOGGING
3768 (*m_logger) << "wrote " << bytes_transferred << " bytes\n";
3769 #endif
3771 if (error)
3773 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3774 (*m_logger) << "**ERROR**: " << error.message() << " [in peer_connection::on_send_data]\n";
3775 #endif
3776 disconnect(error.message().c_str());
3777 return;
3779 if (m_disconnecting) return;
3781 TORRENT_ASSERT(!m_connecting);
3782 TORRENT_ASSERT(bytes_transferred > 0);
3784 m_last_sent = time_now();
3786 #ifndef NDEBUG
3787 size_type cur_payload_ul = m_statistics.last_payload_uploaded();
3788 size_type cur_protocol_ul = m_statistics.last_protocol_uploaded();
3789 #endif
3790 on_sent(error, bytes_transferred);
3791 #ifndef NDEBUG
3792 TORRENT_ASSERT(m_statistics.last_payload_uploaded() - cur_payload_ul >= 0);
3793 TORRENT_ASSERT(m_statistics.last_protocol_uploaded() - cur_protocol_ul >= 0);
3794 size_type stats_diff = m_statistics.last_payload_uploaded() - cur_payload_ul
3795 + m_statistics.last_protocol_uploaded() - cur_protocol_ul;
3796 TORRENT_ASSERT(stats_diff == bytes_transferred);
3797 #endif
3799 fill_send_buffer();
3801 setup_send();
3804 #ifndef NDEBUG
3805 void peer_connection::check_invariant() const
3807 TORRENT_ASSERT(bool(m_disk_recv_buffer) == (m_disk_recv_buffer_size > 0));
3809 boost::shared_ptr<torrent> t = m_torrent.lock();
3810 if (m_disconnecting)
3812 TORRENT_ASSERT(!t);
3813 TORRENT_ASSERT(m_disconnect_started);
3815 else if (!m_in_constructor)
3817 TORRENT_ASSERT(m_ses.has_peer((peer_connection*)this));
3821 // this assertion correct most of the time, but sometimes right when the
3822 // limit is changed it might break
3823 for (int i = 0; i < 2; ++i)
3825 // this peer is in the bandwidth history iff max_assignable < limit
3826 TORRENT_ASSERT((m_bandwidth_limit[i].max_assignable() < m_bandwidth_limit[i].throttle())
3827 == m_ses.m_bandwidth_manager[i]->is_in_history(this)
3828 || m_bandwidth_limit[i].throttle() == bandwidth_limit::inf);
3832 if (m_channel_state[download_channel] == peer_info::bw_torrent
3833 || m_channel_state[download_channel] == peer_info::bw_global)
3834 TORRENT_ASSERT(m_bandwidth_limit[download_channel].quota_left() == 0);
3835 if (m_channel_state[upload_channel] == peer_info::bw_torrent
3836 || m_channel_state[upload_channel] == peer_info::bw_global)
3837 TORRENT_ASSERT(m_bandwidth_limit[upload_channel].quota_left() == 0);
3839 std::set<piece_block> unique;
3840 std::transform(m_download_queue.begin(), m_download_queue.end()
3841 , std::inserter(unique, unique.begin()), boost::bind(&pending_block::block, _1));
3842 std::copy(m_request_queue.begin(), m_request_queue.end(), std::inserter(unique, unique.begin()));
3843 TORRENT_ASSERT(unique.size() == m_download_queue.size() + m_request_queue.size());
3844 if (m_peer_info)
3846 TORRENT_ASSERT(m_peer_info->prev_amount_upload == 0);
3847 TORRENT_ASSERT(m_peer_info->prev_amount_download == 0);
3848 TORRENT_ASSERT(m_peer_info->connection == this
3849 || m_peer_info->connection == 0);
3851 if (m_peer_info->optimistically_unchoked)
3852 TORRENT_ASSERT(!is_choked());
3855 TORRENT_ASSERT(m_have_piece.count() == m_num_pieces);
3857 if (!t)
3859 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3860 // since this connection doesn't have a torrent reference
3861 // no torrent should have a reference to this connection either
3862 for (aux::session_impl::torrent_map::const_iterator i = m_ses.m_torrents.begin()
3863 , end(m_ses.m_torrents.end()); i != end; ++i)
3864 TORRENT_ASSERT(!i->second->has_peer((peer_connection*)this));
3865 #endif
3866 return;
3869 if (t->ready_for_connections() && m_initialized)
3870 TORRENT_ASSERT(t->torrent_file().num_pieces() == int(m_have_piece.size()));
3872 if (m_ses.settings().close_redundant_connections)
3874 // make sure upload only peers are disconnected
3875 if (t->is_finished() && m_upload_only)
3876 TORRENT_ASSERT(m_disconnect_started);
3877 if (m_upload_only
3878 && !m_interesting
3879 && m_bitfield_received
3880 && t->are_files_checked())
3881 TORRENT_ASSERT(m_disconnect_started);
3884 if (!m_disconnect_started)
3886 // none of this matters if we're disconnecting anyway
3887 if (t->is_finished())
3888 TORRENT_ASSERT(!m_interesting);
3889 if (is_seed())
3890 TORRENT_ASSERT(m_upload_only);
3893 if (t->has_picker())
3895 std::map<piece_block, int> num_requests;
3896 for (torrent::const_peer_iterator i = t->begin(); i != t->end(); ++i)
3898 // make sure this peer is not a dangling pointer
3899 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3900 TORRENT_ASSERT(m_ses.has_peer(*i));
3901 #endif
3902 peer_connection const& p = *(*i);
3903 for (std::deque<piece_block>::const_iterator i = p.request_queue().begin()
3904 , end(p.request_queue().end()); i != end; ++i)
3905 ++num_requests[*i];
3906 for (std::deque<pending_block>::const_iterator i = p.download_queue().begin()
3907 , end(p.download_queue().end()); i != end; ++i)
3908 ++num_requests[i->block];
3910 for (std::map<piece_block, int>::iterator i = num_requests.begin()
3911 , end(num_requests.end()); i != end; ++i)
3913 if (!t->picker().is_downloaded(i->first))
3914 TORRENT_ASSERT(t->picker().num_peers(i->first) == i->second);
3917 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3918 if (m_peer_info)
3920 policy::const_iterator i = t->get_policy().begin_peer();
3921 policy::const_iterator end = t->get_policy().end_peer();
3922 for (; i != end; ++i)
3924 if (&i->second == m_peer_info) break;
3926 TORRENT_ASSERT(i != end);
3928 #endif
3929 if (t->has_picker() && !t->is_aborted())
3931 // make sure that pieces that have completed the download
3932 // of all their blocks are in the disk io thread's queue
3933 // to be checked.
3934 const std::vector<piece_picker::downloading_piece>& dl_queue
3935 = t->picker().get_download_queue();
3936 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
3937 dl_queue.begin(); i != dl_queue.end(); ++i)
3939 const int blocks_per_piece = t->picker().blocks_in_piece(i->index);
3941 bool complete = true;
3942 for (int j = 0; j < blocks_per_piece; ++j)
3944 if (i->info[j].state == piece_picker::block_info::state_finished)
3945 continue;
3946 complete = false;
3947 break;
3950 // this invariant is not valid anymore since the completion event
3951 // might be queued in the io service
3952 if (complete && !piece_failed)
3954 disk_io_job ret = m_ses.m_disk_thread.find_job(
3955 &t->filesystem(), -1, i->index);
3956 TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write);
3957 TORRENT_ASSERT(ret.piece == i->index);
3963 // extremely expensive invariant check
3965 if (!t->is_seed())
3967 piece_picker& p = t->picker();
3968 const std::vector<piece_picker::downloading_piece>& dlq = p.get_download_queue();
3969 const int blocks_per_piece = static_cast<int>(
3970 t->torrent_file().piece_length() / t->block_size());
3972 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
3973 dlq.begin(); i != dlq.end(); ++i)
3975 for (int j = 0; j < blocks_per_piece; ++j)
3977 if (std::find(m_request_queue.begin(), m_request_queue.end()
3978 , piece_block(i->index, j)) != m_request_queue.end()
3980 std::find(m_download_queue.begin(), m_download_queue.end()
3981 , piece_block(i->index, j)) != m_download_queue.end())
3983 TORRENT_ASSERT(i->info[j].peer == m_remote);
3985 else
3987 TORRENT_ASSERT(i->info[j].peer != m_remote || i->info[j].finished);
3994 #endif
3996 peer_connection::peer_speed_t peer_connection::peer_speed()
3998 shared_ptr<torrent> t = m_torrent.lock();
3999 TORRENT_ASSERT(t);
4001 int download_rate = int(statistics().download_payload_rate());
4002 int torrent_download_rate = int(t->statistics().download_payload_rate());
4004 if (download_rate > 512 && download_rate > torrent_download_rate / 16)
4005 m_speed = fast;
4006 else if (download_rate > 4096 && download_rate > torrent_download_rate / 64)
4007 m_speed = medium;
4008 else if (download_rate < torrent_download_rate / 15 && m_speed == fast)
4009 m_speed = medium;
4010 else
4011 m_speed = slow;
4013 return m_speed;
4016 void peer_connection::keep_alive()
4018 INVARIANT_CHECK;
4020 time_duration d;
4021 d = time_now() - m_last_sent;
4022 if (total_seconds(d) < m_timeout / 2) return;
4024 if (m_connecting) return;
4025 if (in_handshake()) return;
4027 // if the last send has not completed yet, do not send a keep
4028 // alive
4029 if (m_channel_state[upload_channel] != peer_info::bw_idle) return;
4031 #ifdef TORRENT_VERBOSE_LOGGING
4032 (*m_logger) << time_now_string() << " ==> KEEPALIVE\n";
4033 #endif
4035 m_last_sent = time_now();
4036 write_keepalive();
4039 bool peer_connection::is_seed() const
4041 // if m_num_pieces == 0, we probably don't have the
4042 // metadata yet.
4043 boost::shared_ptr<torrent> t = m_torrent.lock();
4044 return m_num_pieces == (int)m_have_piece.size() && m_num_pieces > 0 && t && t->valid_metadata();