2 package VCS
::Git
::Torrent
::Peer
::Async
::Connection
;
6 VCS::Git::Torrent::Peer::Async::Connection - Coro-based connection to a remote node
11 my $conn = VCS::Git::Torrent::Peer::Async::Connection->new
12 ( local => $local_peer,
17 $conn = VCS::Git::Torrent::Peer::Async::Connection->new
18 ( local => $local_peer,
19 remote => $remote_peer,
21 $conn = VCS::Git::Torrent::Peer::Async::Connection->new
22 ( local => $local_peer,
23 remote_name => $address,
29 This class is used internally by L<VCS::Git::Torrent::Peer::Async>; it
30 is a class that implements the L<VCS::Git::Torrent::Peer::Connection>
33 A co-routine continually waits for incoming messages and processes
34 them. Processing a message usually involves some kind of response,
35 possibly (though not always) starting a new Coro to do so. This
36 usually does not involve much real blocking, due to OS buffering of
37 sockets etc.
43 with
'VCS::Git::Torrent::Peer::Connection';
44 with
'VCS::Git::Torrent::Peer::Async::Socket';
50 use VCS
::Git
::Torrent
::PWP
qw(:pwp_constants pack_hex unpack_hex);
51 use constant RANDOM_BEHAVIOUR_BITS
=> "\0" x
8;
55 Return a hash consisting of the remote address, port, and protocol used.
61 ( PeerHost
=> $self->remote->address,
62 PeerPort
=> $self->remote->port,
70 my $incoming = $self->remote->from_addr;
73 ($incoming?
"call from $incoming"
74 :"call to ".$self->remote->address)) });
75 ( ( $self->remote->from_addr
76 ?
$self->handshake && $self->send_handshake
77 : $self->send_handshake && $self->handshake )
78 && do { $self->recv_lock->unlock; 1 }
79 && do { $self->send_lock->unlock; 1 }
82 $self->local->connections_remove($self);
93 =head2 fail($when, $description)
102 my $description = shift;
103 $self->trace(sub {"failed: $description during $when"});
104 $self->error_when($when);
105 $self->error_desc($description);
112 Main async/Coro message-processing loop.
118 my $socket = $self->socket;
120 $self->local->send_message($self->remote, GTP_PWP_REELS
);
121 $self->local->send_message($self->remote, GTP_PWP_REFERENCES
);
123 $self->trace(sub {"main loop"});
124 while ( my $message = do {
125 $self->recv_lock->wrlock;
126 $self->trace(sub {"listening"});
127 $_ = VCS
::Git
::Torrent
::PWP
::Message
->create_io($socket);
128 $self->recv_lock->unlock;
132 $self->trace(sub {"got a message: $_ (self = $self)"});
133 $self->local->process($self, $message);
140 =head2 send_handshake
142 Send a handshake to whatever we're connected to.
148 confess
"called from wrong coro" unless Coro
::current
() == $self->coro;
149 my $socket = $self->socket;
151 $self->trace(sub {"got a socket - $socket"});
153 my $handshake = join "",
154 (chr(length(GTP_PWP_PROTO_NAME
)),
156 RANDOM_BEHAVIOUR_BITS
,
157 pack_hex
($self->local->repo_hash),
158 $self->local->peer_id);
160 $self->trace(sub {"sending handshake: $handshake"});
161 my $wrote = $socket->send($handshake);
162 $wrote == length($handshake)
163 or return $self->fail("handshake", "wrote only $wrote bytes");
168 Wait for a handshake from whatever we're connected to.
174 confess
"called from wrong coro" unless Coro
::current
() == $self->coro;
175 my $socket = $self->socket;
177 $self->trace(sub {"awaiting handshake"});
180 $socket->read($buf, 1)
181 or return $self->fail("handshake", "Nothing received");
183 my $proto_len = ord($buf);
184 if ( $proto_len != 7 ) {
185 return $self->fail("handshake", "Protocol mismatch");
187 $socket->read($buf, 7) == 7
188 or return $self->fail("handshake", "Short read");
190 if ( $buf ne GTP_PWP_PROTO_NAME
) {
191 return $self->fail("handshake", "Protocol mismatch");
194 $socket->read($buf, 8) == 8
195 or return $self->fail("handshake", "Short read");
196 if ( $buf ne RANDOM_BEHAVIOUR_BITS
) {
199 "This implementation is deterministic");
202 $socket->read($buf, 20) == 20
203 or return $self->fail("handshake", "Short read");
204 if ( unpack_hex
($buf) ne $self->local->repo_hash ) {
205 return $self->fail("handshake", "Repository mismatch");
208 $socket->read($buf, 20) == 20
209 or return $self->fail("handshake", "Short read");
211 if ( $self->local->connected_to($buf) ) {
212 return $self->fail("handshake", "duplicate peer id");
214 $self->trace(sub {"now connected to $buf"});
215 $self->remote->peer_id($buf);
220 Pack a message and send it off.
227 $self->send_lock->wrlock;
228 $message = $message->pack;
229 my $written = $self->socket->write($message);
230 ($written == length($message))
231 or return $self->fail
233 "Short write - only $written of "
234 .length($message)." bytes written");
235 $self->send_lock->unlock;
240 Your socket has performed an illegal operation and must be shut down.
246 if ( $self->{socket} ) {
247 $self->socket->shutdown;