[Sync] Fix invalidations on Android.
[chromium-blink-merge.git] / mojo / services / network / tcp_connected_socket_impl.cc
blobbc8c98b568aa2e4f8d31e2b068a655c6262fa256
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/services/network/tcp_connected_socket_impl.h"
7 #include "base/message_loop/message_loop.h"
8 #include "mojo/services/network/net_adapters.h"
9 #include "net/base/net_errors.h"
11 namespace mojo {
13 TCPConnectedSocketImpl::TCPConnectedSocketImpl(
14 scoped_ptr<net::TCPSocket> socket,
15 ScopedDataPipeConsumerHandle send_stream,
16 ScopedDataPipeProducerHandle receive_stream,
17 InterfaceRequest<TCPConnectedSocket> request)
18 : socket_(socket.Pass()),
19 send_stream_(send_stream.Pass()),
20 receive_stream_(receive_stream.Pass()),
21 binding_(this, request.Pass()),
22 weak_ptr_factory_(this) {
23 // Queue up async communication.
24 binding_.set_error_handler(this);
25 ListenForReceivePeerClosed();
26 ListenForSendPeerClosed();
27 ReceiveMore();
28 SendMore();
31 TCPConnectedSocketImpl::~TCPConnectedSocketImpl() {
34 void TCPConnectedSocketImpl::OnConnectionError() {
35 binding_.Close();
36 DeleteIfNeeded();
39 void TCPConnectedSocketImpl::ReceiveMore() {
40 DCHECK(!pending_receive_.get());
42 uint32_t num_bytes;
43 MojoResult result = NetToMojoPendingBuffer::BeginWrite(
44 &receive_stream_, &pending_receive_, &num_bytes);
45 if (result == MOJO_RESULT_SHOULD_WAIT) {
46 // The pipe is full. We need to wait for it to have more space.
47 receive_handle_watcher_.Start(
48 receive_stream_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
49 MOJO_DEADLINE_INDEFINITE,
50 base::Bind(&TCPConnectedSocketImpl::OnReceiveStreamReady,
51 weak_ptr_factory_.GetWeakPtr()));
52 return;
55 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
56 // It's valid that the user of this class consumed the data they care about
57 // and closed their data pipe handles after writing data. This class should
58 // still write out all the data.
59 ShutdownReceive();
60 // TODO(johnmccutchan): Notify socket direction is closed along with
61 // net_result and mojo_result.
62 return;
65 if (result != MOJO_RESULT_OK) {
66 // The receive stream is in a bad state.
67 ShutdownReceive();
68 // TODO(johnmccutchan): Notify socket direction is closed along with
69 // net_result and mojo_result.
70 return;
73 // Mojo is ready for the receive.
74 CHECK_GT(static_cast<uint32_t>(std::numeric_limits<int>::max()), num_bytes);
75 scoped_refptr<net::IOBuffer> buf(
76 new NetToMojoIOBuffer(pending_receive_.get()));
77 int read_result =
78 socket_->Read(buf.get(), static_cast<int>(num_bytes),
79 base::Bind(&TCPConnectedSocketImpl::DidReceive,
80 weak_ptr_factory_.GetWeakPtr(), false));
81 if (read_result == net::ERR_IO_PENDING) {
82 // Pending I/O, wait for result in DidReceive().
83 } else if (read_result > 0) {
84 // Synchronous data ready.
85 DidReceive(true, read_result);
86 } else {
87 // read_result == 0 indicates EOF.
88 // read_result < 0 indicates error.
89 ShutdownReceive();
90 // TODO(johnmccutchan): Notify socket direction is closed along with
91 // net_result and mojo_result.
95 void TCPConnectedSocketImpl::OnReceiveStreamReady(MojoResult result) {
96 if (result != MOJO_RESULT_OK) {
97 ShutdownReceive();
98 // TODO(johnmccutchan): Notify socket direction is closed along with
99 // net_result and mojo_result.
100 return;
102 ListenForReceivePeerClosed();
103 ReceiveMore();
106 void TCPConnectedSocketImpl::DidReceive(bool completed_synchronously,
107 int result) {
108 if (!pending_receive_)
109 return;
111 if (result < 0) {
112 // Error.
113 ShutdownReceive();
114 // TODO(johnmccutchan): Notify socket direction is closed along with
115 // net_result and mojo_result.
116 return;
119 receive_stream_ = pending_receive_->Complete(result);
120 pending_receive_ = nullptr;
122 // Schedule more reading.
123 if (completed_synchronously) {
124 // Don't recursively call ReceiveMore if this is a sync read.
125 base::MessageLoop::current()->PostTask(
126 FROM_HERE, base::Bind(&TCPConnectedSocketImpl::ReceiveMore,
127 weak_ptr_factory_.GetWeakPtr()));
128 } else {
129 ReceiveMore();
133 void TCPConnectedSocketImpl::ShutdownReceive() {
134 receive_handle_watcher_.Stop();
135 pending_receive_ = nullptr;
136 receive_stream_.reset();
137 DeleteIfNeeded();
140 void TCPConnectedSocketImpl::ListenForReceivePeerClosed() {
141 receive_handle_watcher_.Start(
142 receive_stream_.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
143 MOJO_DEADLINE_INDEFINITE,
144 base::Bind(&TCPConnectedSocketImpl::OnReceiveDataPipeClosed,
145 weak_ptr_factory_.GetWeakPtr()));
148 void TCPConnectedSocketImpl::OnReceiveDataPipeClosed(MojoResult result) {
149 ShutdownReceive();
152 void TCPConnectedSocketImpl::SendMore() {
153 uint32_t num_bytes = 0;
154 MojoResult result = MojoToNetPendingBuffer::BeginRead(
155 &send_stream_, &pending_send_, &num_bytes);
156 if (result == MOJO_RESULT_SHOULD_WAIT) {
157 // Data not ready, wait for it.
158 send_handle_watcher_.Start(
159 send_stream_.get(), MOJO_HANDLE_SIGNAL_READABLE,
160 MOJO_DEADLINE_INDEFINITE,
161 base::Bind(&TCPConnectedSocketImpl::OnSendStreamReady,
162 weak_ptr_factory_.GetWeakPtr()));
163 return;
164 } else if (result != MOJO_RESULT_OK) {
165 ShutdownSend();
166 // TODO(johnmccutchan): Notify socket direction is closed along with
167 // net_result and mojo_result.
168 return;
171 // Got a buffer from Mojo, give it to the socket. Note that the sockets may
172 // do partial writes.
173 scoped_refptr<net::IOBuffer> buf(new MojoToNetIOBuffer(pending_send_.get()));
174 int write_result =
175 socket_->Write(buf.get(), static_cast<int>(num_bytes),
176 base::Bind(&TCPConnectedSocketImpl::DidSend,
177 weak_ptr_factory_.GetWeakPtr(), false));
178 if (write_result == net::ERR_IO_PENDING) {
179 // Pending I/O, wait for result in DidSend().
180 } else if (write_result >= 0) {
181 // Synchronous data consumed.
182 DidSend(true, write_result);
183 } else {
184 // write_result < 0 indicates error.
185 ShutdownSend();
186 // TODO(johnmccutchan): Notify socket direction is closed along with
187 // net_result and mojo_result.
191 void TCPConnectedSocketImpl::OnSendStreamReady(MojoResult result) {
192 if (result != MOJO_RESULT_OK) {
193 ShutdownSend();
194 // TODO(johnmccutchan): Notify socket direction is closed along with
195 // net_result and mojo_result.
196 return;
198 ListenForSendPeerClosed();
199 SendMore();
202 void TCPConnectedSocketImpl::DidSend(bool completed_synchronously, int result) {
203 if (!pending_send_)
204 return;
206 if (result < 0) {
207 ShutdownSend();
208 // TODO(johnmccutchan): Notify socket direction is closed along with
209 // net_result and mojo_result.
210 return;
213 // Take back ownership of the stream and free the IOBuffer.
214 send_stream_ = pending_send_->Complete(result);
215 pending_send_ = nullptr;
217 // Schedule more writing.
218 if (completed_synchronously) {
219 // Don't recursively call SendMore if this is a sync read.
220 base::MessageLoop::current()->PostTask(
221 FROM_HERE, base::Bind(&TCPConnectedSocketImpl::SendMore,
222 weak_ptr_factory_.GetWeakPtr()));
223 } else {
224 SendMore();
228 void TCPConnectedSocketImpl::ShutdownSend() {
229 send_handle_watcher_.Stop();
230 pending_send_ = nullptr;
231 send_stream_.reset();
232 DeleteIfNeeded();
235 void TCPConnectedSocketImpl::ListenForSendPeerClosed() {
236 send_handle_watcher_.Start(
237 send_stream_.get(), MOJO_HANDLE_SIGNAL_PEER_CLOSED,
238 MOJO_DEADLINE_INDEFINITE,
239 base::Bind(&TCPConnectedSocketImpl::OnSendDataPipeClosed,
240 weak_ptr_factory_.GetWeakPtr()));
243 void TCPConnectedSocketImpl::OnSendDataPipeClosed(MojoResult result) {
244 ShutdownSend();
247 void TCPConnectedSocketImpl::DeleteIfNeeded() {
248 bool has_send = pending_send_ || send_stream_.is_valid();
249 bool has_receive = pending_receive_ || receive_stream_.is_valid();
250 if (!binding_.is_bound() && !has_send && !has_receive)
251 delete this;
254 } // namespace mojo