1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/services/network/tcp_connected_socket_impl.h"
7 #include "base/message_loop/message_loop.h"
8 #include "mojo/services/network/net_adapters.h"
9 #include "net/base/net_errors.h"
// Constructs the socket implementation.
//
// |socket|, |send_stream|, |receive_stream| and |request| are each handed
// over with Pass() into the matching member; |request| is bound to this
// object through |binding_|. |weak_ptr_factory_| is initialized with |this|
// and is what the asynchronous callbacks in this file are bound through.
13 TCPConnectedSocketImpl::TCPConnectedSocketImpl(
14 scoped_ptr
<net::TCPSocket
> socket
,
15 ScopedDataPipeConsumerHandle send_stream
,
16 ScopedDataPipeProducerHandle receive_stream
,
17 InterfaceRequest
<TCPConnectedSocket
> request
)
18 : socket_(socket
.Pass()),
19 send_stream_(send_stream
.Pass()),
20 receive_stream_(receive_stream
.Pass()),
21 binding_(this, request
.Pass()),
22 weak_ptr_factory_(this) {
23 // Queue up async communication.
// Register this object as the binding's error handler, then start watching
// both data pipes for MOJO_HANDLE_SIGNAL_PEER_CLOSED.
24 binding_
.set_error_handler(this);
25 ListenForReceivePeerClosed();
26 ListenForSendPeerClosed();
// Destructor.
31 TCPConnectedSocketImpl::~TCPConnectedSocketImpl() {
// NOTE(review): presumably the error-handler entry point for |binding_| (the
// constructor registers this object with binding_.set_error_handler(this)) --
// confirm against the class declaration.
34 void TCPConnectedSocketImpl::OnConnectionError() {
// Starts one receive cycle: asks NetToMojoPendingBuffer::BeginWrite() for a
// writable buffer on |receive_stream_| (stored in |pending_receive_|) and
// issues a socket Read() into that buffer.
//
// Must not be called while |pending_receive_| is still set (DCHECK below).
39 void TCPConnectedSocketImpl::ReceiveMore() {
40 DCHECK(!pending_receive_
.get());
43 MojoResult result
= NetToMojoPendingBuffer::BeginWrite(
44 &receive_stream_
, &pending_receive_
, &num_bytes
);
45 if (result
== MOJO_RESULT_SHOULD_WAIT
) {
46 // The pipe is full. We need to wait for it to have more space.
// OnReceiveStreamReady() is bound through a weak pointer and is the callback
// for the MOJO_HANDLE_SIGNAL_WRITABLE watch started here (no deadline).
47 receive_handle_watcher_
.Start(
48 receive_stream_
.get(), MOJO_HANDLE_SIGNAL_WRITABLE
,
49 MOJO_DEADLINE_INDEFINITE
,
50 base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady
,
51 weak_ptr_factory_
.GetWeakPtr()));
55 if (result
== MOJO_RESULT_FAILED_PRECONDITION
) {
56 // It's valid that the user of this class consumed the data they care about
57 // and closed their data pipe handles after writing data. This class should
58 // still write out all the data.
60 // TODO(johnmccutchan): Notify socket direction is closed along with
61 // net_result and mojo_result.
65 if (result
!= MOJO_RESULT_OK
) {
66 // The receive stream is in a bad state.
68 // TODO(johnmccutchan): Notify socket direction is closed along with
69 // net_result and mojo_result.
73 // Mojo is ready for the receive.
// |num_bytes| is narrowed to int for Read() below, so it is required to be
// smaller than std::numeric_limits<int>::max().
74 CHECK_GT(static_cast<uint32_t>(std::numeric_limits
<int>::max()), num_bytes
);
// Wrap the pending Mojo buffer in a net::IOBuffer so the socket can fill it.
75 scoped_refptr
<net::IOBuffer
> buf(
76 new NetToMojoIOBuffer(pending_receive_
.get()));
// DidReceive() is bound with completed_synchronously == false; that binding
// is the asynchronous completion path.
78 socket_
->Read(buf
.get(), static_cast<int>(num_bytes
),
79 base::Bind(&TCPConnectedSocketImpl::DidReceive
,
80 weak_ptr_factory_
.GetWeakPtr(), false));
81 if (read_result
== net::ERR_IO_PENDING
) {
82 // Pending I/O, wait for result in DidReceive().
83 } else if (read_result
> 0) {
84 // Synchronous data ready.
// Report the result directly, flagged as a synchronous completion.
85 DidReceive(true, read_result
);
87 // read_result == 0 indicates EOF.
88 // read_result < 0 indicates error.
90 // TODO(johnmccutchan): Notify socket direction is closed along with
91 // net_result and mojo_result.
// Callback for the MOJO_HANDLE_SIGNAL_WRITABLE watch that ReceiveMore() starts
// on |receive_stream_| when the pipe is full.
//
// |result| is the watch outcome; anything other than MOJO_RESULT_OK takes the
// branch below.
95 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result
) {
96 if (result
!= MOJO_RESULT_OK
) {
98 // TODO(johnmccutchan): Notify socket direction is closed along with
99 // net_result and mojo_result.
// Start the peer-closed watch on the receive pipe.
102 ListenForReceivePeerClosed();
// Completion handler for a socket Read() issued by ReceiveMore().
//
// |completed_synchronously| is true when ReceiveMore() calls this directly
// with a positive read result, and false when this runs as the callback bound
// to Read().
106 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously
,
108 if (!pending_receive_
)
114 // TODO(johnmccutchan): Notify socket direction is closed along with
115 // net_result and mojo_result.
// Complete(result) hands the data pipe handle back to |receive_stream_|;
// the pending buffer is then dropped.
119 receive_stream_
= pending_receive_
->Complete(result
);
120 pending_receive_
= nullptr;
122 // Schedule more reading.
123 if (completed_synchronously
) {
124 // Don't recursively call ReceiveMore if this is a sync read.
// ReceiveMore() is posted to the current message loop, bound through a weak
// pointer, instead of being invoked from this stack frame.
125 base::MessageLoop::current()->PostTask(
126 FROM_HERE
, base::Bind(&TCPConnectedSocketImpl::ReceiveMore
,
127 weak_ptr_factory_
.GetWeakPtr()));
// Tears down the receive direction: stops the receive handle watcher, drops
// any pending receive buffer, and resets the |receive_stream_| handle.
133 void TCPConnectedSocketImpl::ShutdownReceive() {
134 receive_handle_watcher_
.Stop();
135 pending_receive_
= nullptr;
136 receive_stream_
.reset();
// Starts |receive_handle_watcher_| on |receive_stream_| for
// MOJO_HANDLE_SIGNAL_PEER_CLOSED with an indefinite deadline.
// OnReceiveDataPipeClosed() is the callback, bound through a weak pointer.
140 void TCPConnectedSocketImpl::ListenForReceivePeerClosed() {
141 receive_handle_watcher_
.Start(
142 receive_stream_
.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED
,
143 MOJO_DEADLINE_INDEFINITE
,
144 base::Bind(&TCPConnectedSocketImpl::OnReceiveDataPipeClosed
,
145 weak_ptr_factory_
.GetWeakPtr()));
// Callback for the MOJO_HANDLE_SIGNAL_PEER_CLOSED watch started by
// ListenForReceivePeerClosed() on |receive_stream_|. |result| is the watch
// outcome.
148 void TCPConnectedSocketImpl::OnReceiveDataPipeClosed(MojoResult result
) {
// Starts one send cycle: asks MojoToNetPendingBuffer::BeginRead() for readable
// data on |send_stream_| (stored in |pending_send_|) and issues a socket
// Write() of that data.
152 void TCPConnectedSocketImpl::SendMore() {
153 uint32_t num_bytes
= 0;
154 MojoResult result
= MojoToNetPendingBuffer::BeginRead(
155 &send_stream_
, &pending_send_
, &num_bytes
);
156 if (result
== MOJO_RESULT_SHOULD_WAIT
) {
157 // Data not ready, wait for it.
// OnSendStreamReady() is bound through a weak pointer and is the callback
// for the MOJO_HANDLE_SIGNAL_READABLE watch started here (no deadline).
158 send_handle_watcher_
.Start(
159 send_stream_
.get(), MOJO_HANDLE_SIGNAL_READABLE
,
160 MOJO_DEADLINE_INDEFINITE
,
161 base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady
,
162 weak_ptr_factory_
.GetWeakPtr()));
164 } else if (result
!= MOJO_RESULT_OK
) {
166 // TODO(johnmccutchan): Notify socket direction is closed along with
167 // net_result and mojo_result.
171 // Got a buffer from Mojo, give it to the socket. Note that the sockets may
172 // do partial writes.
173 scoped_refptr
<net::IOBuffer
> buf(new MojoToNetIOBuffer(pending_send_
.get()));
// DidSend() is bound with completed_synchronously == false; that binding is
// the asynchronous completion path. |num_bytes| is narrowed to int here.
175 socket_
->Write(buf
.get(), static_cast<int>(num_bytes
),
176 base::Bind(&TCPConnectedSocketImpl::DidSend
,
177 weak_ptr_factory_
.GetWeakPtr(), false));
178 if (write_result
== net::ERR_IO_PENDING
) {
179 // Pending I/O, wait for result in DidSend().
180 } else if (write_result
>= 0) {
181 // Synchronous data consumed.
// Report the result directly, flagged as a synchronous completion. Unlike the
// receive path, a result of 0 is also routed here.
182 DidSend(true, write_result
);
184 // write_result < 0 indicates error.
186 // TODO(johnmccutchan): Notify socket direction is closed along with
187 // net_result and mojo_result.
// Callback for the MOJO_HANDLE_SIGNAL_READABLE watch that SendMore() starts on
// |send_stream_| when no data is ready.
//
// |result| is the watch outcome; anything other than MOJO_RESULT_OK takes the
// branch below.
191 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result
) {
192 if (result
!= MOJO_RESULT_OK
) {
194 // TODO(johnmccutchan): Notify socket direction is closed along with
195 // net_result and mojo_result.
// Start the peer-closed watch on the send pipe.
198 ListenForSendPeerClosed();
// Completion handler for a socket Write() issued by SendMore().
//
// |completed_synchronously| is true when SendMore() calls this directly with a
// non-negative write result, and false when this runs as the callback bound to
// Write(). |result| is the value passed on to pending_send_->Complete().
202 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously
, int result
) {
208 // TODO(johnmccutchan): Notify socket direction is closed along with
209 // net_result and mojo_result.
213 // Take back ownership of the stream and free the IOBuffer.
214 send_stream_
= pending_send_
->Complete(result
);
215 pending_send_
= nullptr;
217 // Schedule more writing.
218 if (completed_synchronously
) {
219 // Don't recursively call SendMore if this is a sync write.
// SendMore() is posted to the current message loop, bound through a weak
// pointer, instead of being invoked from this stack frame.
220 base::MessageLoop::current()->PostTask(
221 FROM_HERE
, base::Bind(&TCPConnectedSocketImpl::SendMore
,
222 weak_ptr_factory_
.GetWeakPtr()));
// Tears down the send direction: stops the send handle watcher, drops any
// pending send buffer, and resets the |send_stream_| handle.
228 void TCPConnectedSocketImpl::ShutdownSend() {
229 send_handle_watcher_
.Stop();
230 pending_send_
= nullptr;
231 send_stream_
.reset();
// Starts |send_handle_watcher_| on |send_stream_| for
// MOJO_HANDLE_SIGNAL_PEER_CLOSED with an indefinite deadline.
// OnSendDataPipeClosed() is the callback, bound through a weak pointer.
235 void TCPConnectedSocketImpl::ListenForSendPeerClosed() {
236 send_handle_watcher_
.Start(
237 send_stream_
.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED
,
238 MOJO_DEADLINE_INDEFINITE
,
239 base::Bind(&TCPConnectedSocketImpl::OnSendDataPipeClosed
,
240 weak_ptr_factory_
.GetWeakPtr()));
// Callback for the MOJO_HANDLE_SIGNAL_PEER_CLOSED watch started by
// ListenForSendPeerClosed() on |send_stream_|. |result| is the watch outcome.
243 void TCPConnectedSocketImpl::OnSendDataPipeClosed(MojoResult result
) {
247 void TCPConnectedSocketImpl::DeleteIfNeeded() {
248 bool has_send
= pending_send_
|| send_stream_
.is_valid();
249 bool has_receive
= pending_receive_
|| receive_stream_
.is_valid();
250 if (!binding_
.is_bound() && !has_send
&& !has_receive
)