Test script to transfer data over GTP without a tracker
[VCS-Git-Torrent.git] / lib / VCS / Git / Torrent / Peer / Async / Connection.pm
blob170b4e7881e799824be83065a6f6f2a0451e3bf4
2 package VCS::Git::Torrent::Peer::Async::Connection;
4 =head1 NAME
6 VCS::Git::Torrent::Peer::Async::Connection - Coro-based connection to a remote node
8 =head1 SYNOPSIS
10 # incoming.
11 my $conn = VCS::Git::Torrent::Peer::Async::Connection->new
12 ( local => $local_peer,
13 socket => $socket,
16 # outgoing.
17 $conn = VCS::Git::Torrent::Peer::Async::Connection->new
18 ( local => $local_peer,
19 remote => $remote_peer,
21 $conn = VCS::Git::Torrent::Peer::Async::Connection->new
22 ( local => $local_peer,
23 remote_name => $address,
24 remote_port => $port,
27 =head1 DESCRIPTION
29 This class is used internally by L<VCS::Git::Torrent::Peer::Async>; it
30 is a class that implements the L<VCS::Git::Torrent::Peer::Connection>
31 role using Coro.
33 A co-routine continually waits for incoming messages and processes
34 them. Processing a message usually involves sending some kind of
35 response, which may (though does not always) start a new Coro
36 to do so. This usually does not involve much real blocking, due to OS
37 buffering of sockets etc.
39 =cut
41 use Moose;
43 with 'VCS::Git::Torrent::Peer::Connection';
44 with 'VCS::Git::Torrent::Peer::Async::Socket';
46 use strict;
47 use warnings;
48 use Carp;
50 use VCS::Git::Torrent::PWP qw(:pwp_constants pack_hex unpack_hex);
51 use constant RANDOM_BEHAVIOUR_BITS => "\0" x 8;
=head2 socket_args

Returns a flat key/value list describing how to reach the remote peer:
C<PeerHost> and C<PeerPort> are taken from the C<remote> object, and
C<Proto> is always C<"tcp">.

=cut

sub socket_args {
    my ($self) = @_;
    my $peer = $self->remote;

    return (
        PeerHost => $peer->address,
        PeerPort => $peer->port,
        Proto    => "tcp",
    );
}
# Runs one connection from start to finish.
#
# Handshakes are exchanged first: when the remote has a from_addr (they
# called us) we read their handshake and then send ours; otherwise we
# send ours and then read theirs.  Only if both steps succeed are the
# receive and send locks released and the message loop entered.
# NOTE(review): the locks are presumably taken before _start runs --
# confirm against the code that creates the connection.
#
# Whatever happened, the connection is finally removed from the local
# peer's connection list; that call's result is the return value.
sub _start {
    my $self = shift;

    my $incoming = $self->remote->from_addr;
    $self->trace(sub {
        my $direction = $incoming
            ? "call from $incoming"
            : "call to " . $self->remote->address;
        return "processing " . $direction;
    });

    my $shook_hands = $incoming
        ? ( $self->handshake      && $self->send_handshake )
        : ( $self->send_handshake && $self->handshake );

    if ($shook_hands) {
        $self->recv_lock->unlock;
        $self->send_lock->unlock;
        $self->loop;
    }

    $self->local->connections_remove($self);
}
# Read/write string attributes recording the most recent failure:
# error_when is the phase it happened in, error_desc what went wrong.
# Both are set by fail().
has 'error_when' => (
    is  => "rw",
    isa => "Str",
);

has 'error_desc' => (
    is  => "rw",
    isa => "Str",
);
=head2 fail($when, $description)

Records a failure. The failure is traced, C<$when> and C<$description>
are stored in the C<error_when> and C<error_desc> attributes, and
C<cancel> is called on the connection.

Always returns nothing (an empty list, or undef in scalar context), so
callers can write C<< return $self->fail(...) >> to report failure.

=cut

sub fail {
    my ($self, $when, $description) = @_;

    $self->trace(sub { "failed: $description during $when" });

    # Remember what went wrong before tearing the connection down.
    $self->error_when($when);
    $self->error_desc($description);
    $self->cancel;

    return;
}
=head2 loop

Main async/Coro message-processing loop.

First sends a C<GTP_PWP_REELS> and a C<GTP_PWP_REFERENCES> message to
the remote peer via the local peer. It then repeatedly takes the
receive lock, reads one message from the socket with
C<< VCS::Git::Torrent::PWP::Message->create_io >>, releases the lock and
hands the message to C<< $self->local->process >>. The loop ends as soon
as C<create_io> returns a false value, after which C<shutdown> is
called; its result is the return value.

=cut

sub loop {
    my $self   = shift;
    my $socket = $self->socket;

    $self->local->send_message($self->remote, GTP_PWP_REELS);
    $self->local->send_message($self->remote, GTP_PWP_REFERENCES);

    $self->trace(sub {"main loop"});
    while (1) {
        # Hold the receive lock only while a message is being read.
        $self->recv_lock->wrlock;
        $self->trace(sub {"listening"});
        # Keep the message in a lexical: assigning to the global $_ would
        # clobber the caller's $_, and the trace closure below would then
        # report whatever $_ holds when it is eventually invoked.
        my $message = VCS::Git::Torrent::PWP::Message->create_io($socket);
        $self->recv_lock->unlock;

        last unless $message;

        $self->trace(sub {"got a message: $message (self = $self)"});
        $self->local->process($self, $message);
    }

    $self->shutdown;
}
=head2 send_handshake

Send a handshake to whatever we're connected to.

The handshake is the concatenation of: one byte holding the length of
C<GTP_PWP_PROTO_NAME>, the protocol name itself, the eight
C<RANDOM_BEHAVIOUR_BITS> bytes, the packed form of the local repository
hash, and the local peer id.

Must be called from the connection's own coro, otherwise it dies with a
stack trace. Returns 1 when the whole handshake was written; otherwise
reports the failure through C<fail> and returns nothing.

=cut

sub send_handshake {
    my $self = shift;
    confess "called from wrong coro" unless Coro::current() == $self->coro;
    my $socket = $self->socket;

    $self->trace(sub {"got a socket - $socket"});

    my $handshake = join "",
        chr(length(GTP_PWP_PROTO_NAME)),
        GTP_PWP_PROTO_NAME,
        RANDOM_BEHAVIOUR_BITS,
        pack_hex($self->local->repo_hash),
        $self->local->peer_id;

    $self->trace(sub {"sending handshake: $handshake"});

    # send() yields undef on error; count that as zero bytes so neither
    # the comparison nor the failure text touches an undefined value.
    my $wrote = $socket->send($handshake);
    $wrote = 0 unless defined $wrote;

    return 1 if $wrote == length($handshake);
    return $self->fail("handshake", "wrote only $wrote bytes");
}
=head2 handshake

Wait for a handshake from whatever we're connected to.

Reads and checks, in order: the protocol-name length byte, the protocol
name (must equal C<GTP_PWP_PROTO_NAME>), the reserved bytes (must equal
C<RANDOM_BEHAVIOUR_BITS>), the 20-byte repository hash (must match the
local repository hash once unpacked) and the 20-byte peer id (must not
belong to a peer we are already connected to).

Must be called from the connection's own coro, otherwise it dies with a
stack trace. On success the peer id is stored on the remote object and
the result of that call is returned. On any mismatch or short read the
failure is reported through C<fail> and nothing is returned.

=cut

sub handshake {
    my $self = shift;
    confess "called from wrong coro" unless Coro::current() == $self->coro;
    my $socket = $self->socket;

    $self->trace(sub {"awaiting handshake"});
    my $buf;

    # Read exactly $want bytes into $buf.  False on a short read or when
    # read() reports an error by returning undef.
    my $read_exactly = sub {
        my $want = shift;
        my $got  = $socket->read($buf, $want);
        return defined $got && $got == $want;
    };

    # Field widths come from the same constants send_handshake uses, so
    # both directions of the handshake always agree.
    my $name_len     = length(GTP_PWP_PROTO_NAME);
    my $reserved_len = length(RANDOM_BEHAVIOUR_BITS);
    my $id_len       = 20;    # width of both the repo hash and the peer id

    $socket->read($buf, 1)
        or return $self->fail("handshake", "Nothing received");

    if ( ord($buf) != $name_len ) {
        return $self->fail("handshake", "Protocol mismatch");
    }
    $read_exactly->($name_len)
        or return $self->fail("handshake", "Short read");

    if ( $buf ne GTP_PWP_PROTO_NAME ) {
        return $self->fail("handshake", "Protocol mismatch");
    }

    $read_exactly->($reserved_len)
        or return $self->fail("handshake", "Short read");
    if ( $buf ne RANDOM_BEHAVIOUR_BITS ) {
        return $self->fail
            ("handshake",
             "This implementation is deterministic");
    }

    $read_exactly->($id_len)
        or return $self->fail("handshake", "Short read");
    if ( unpack_hex($buf) ne $self->local->repo_hash ) {
        return $self->fail("handshake", "Repository mismatch");
    }

    $read_exactly->($id_len)
        or return $self->fail("handshake", "Short read");

    if ( $self->local->connected_to($buf) ) {
        return $self->fail("handshake", "duplicate peer id");
    }
    $self->trace(sub {"now connected to $buf"});
    return $self->remote->peer_id($buf);
}
=head2 send_message

Pack a message and send it off.

Takes the send lock, packs C<$message> with its C<pack> method, writes
the result to the socket and releases the lock again, whether or not
the write was complete.

Returns 1 when every byte was written. On a short or failed write the
failure is reported through C<fail> and nothing is returned.

=cut

sub send_message {
    my $self    = shift;
    my $message = shift;

    my $lock = $self->send_lock;
    $lock->wrlock;
    my $packed  = $message->pack;
    my $written = $self->socket->write($packed);
    # Release the lock before looking at the result: returning through
    # fail() while still holding it would leave the lock taken forever.
    $lock->unlock;

    # A failed write may yield undef; report it as zero bytes.
    $written = 0 unless defined $written;
    return 1 if $written == length($packed);

    return $self->fail
        ("send_message",
         "Short write - only $written of "
         .length($packed)." bytes written");
}
238 =head2 shutdown
240 Your socket has performed an illegal operation and must be shut down.
242 =cut
244 sub shutdown {
245 my $self = shift;
246 if ( $self->{socket} ) {
247 $self->socket->shutdown;