1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "jingle/glue/pseudotcp_adapter.h"
7 #include "base/compiler_specific.h"
8 #include "base/logging.h"
9 #include "base/time/time.h"
10 #include "net/base/address_list.h"
11 #include "net/base/completion_callback.h"
12 #include "net/base/io_buffer.h"
13 #include "net/base/net_errors.h"
14 #include "net/base/net_util.h"
16 using cricket::PseudoTcp
;
19 const int kReadBufferSize
= 65536; // Maximum size of a packet.
20 const uint16 kDefaultMtu
= 1280;
23 namespace jingle_glue
{
25 class PseudoTcpAdapter::Core
: public cricket::IPseudoTcpNotify
,
26 public base::RefCounted
<Core
> {
28 Core(net::Socket
* socket
);
30 // Functions used to implement net::StreamSocket.
31 int Read(net::IOBuffer
* buffer
, int buffer_size
,
32 const net::CompletionCallback
& callback
);
33 int Write(net::IOBuffer
* buffer
, int buffer_size
,
34 const net::CompletionCallback
& callback
);
35 int Connect(const net::CompletionCallback
& callback
);
37 bool IsConnected() const;
39 // cricket::IPseudoTcpNotify interface.
40 // These notifications are triggered from NotifyPacket.
41 virtual void OnTcpOpen(cricket::PseudoTcp
* tcp
) OVERRIDE
;
42 virtual void OnTcpReadable(cricket::PseudoTcp
* tcp
) OVERRIDE
;
43 virtual void OnTcpWriteable(cricket::PseudoTcp
* tcp
) OVERRIDE
;
44 // This is triggered by NotifyClock or NotifyPacket.
45 virtual void OnTcpClosed(cricket::PseudoTcp
* tcp
, uint32 error
) OVERRIDE
;
46 // This is triggered by NotifyClock, NotifyPacket, Recv and Send.
47 virtual WriteResult
TcpWritePacket(cricket::PseudoTcp
* tcp
,
48 const char* buffer
, size_t len
) OVERRIDE
;
50 void SetAckDelay(int delay_ms
);
51 void SetNoDelay(bool no_delay
);
52 void SetReceiveBufferSize(int32 size
);
53 void SetSendBufferSize(int32 size
);
54 void SetWriteWaitsForSend(bool write_waits_for_send
);
59 friend class base::RefCounted
<Core
>;
62 // These are invoked by the underlying Socket, and may trigger callbacks.
63 // They hold a reference to |this| while running, to protect from deletion.
64 void OnRead(int result
);
65 void OnWritten(int result
);
67 // These may trigger callbacks, so the holder must hold a reference on
68 // the stack while calling them.
69 void DoReadFromSocket();
70 void HandleReadResults(int result
);
71 void HandleTcpClock();
73 // Checks if current write has completed in the write-waits-for-send
75 void CheckWriteComplete();
77 // This re-sets |timer| without triggering callbacks.
80 net::CompletionCallback connect_callback_
;
81 net::CompletionCallback read_callback_
;
82 net::CompletionCallback write_callback_
;
84 cricket::PseudoTcp pseudo_tcp_
;
85 scoped_ptr
<net::Socket
> socket_
;
87 scoped_refptr
<net::IOBuffer
> read_buffer_
;
88 int read_buffer_size_
;
89 scoped_refptr
<net::IOBuffer
> write_buffer_
;
90 int write_buffer_size_
;
92 // Whether we need to wait for data to be sent before completing write.
93 bool write_waits_for_send_
;
95 // Set to true in the write-waits-for-send mode when we've
96 // successfully writtend data to the send buffer and waiting for the
97 // data to be sent to the remote end.
98 bool waiting_write_position_
;
100 // Number of the bytes written by the last write stored while we wait
101 // for the data to be sent (i.e. when waiting_write_position_ = true).
102 int last_write_result_
;
104 bool socket_write_pending_
;
105 scoped_refptr
<net::IOBuffer
> socket_read_buffer_
;
107 base::OneShotTimer
<Core
> timer_
;
109 DISALLOW_COPY_AND_ASSIGN(Core
);
113 PseudoTcpAdapter::Core::Core(net::Socket
* socket
)
114 : pseudo_tcp_(this, 0),
116 write_waits_for_send_(false),
117 waiting_write_position_(false),
118 socket_write_pending_(false) {
119 // Doesn't trigger callbacks.
120 pseudo_tcp_
.NotifyMTU(kDefaultMtu
);
123 PseudoTcpAdapter::Core::~Core() {
126 int PseudoTcpAdapter::Core::Read(net::IOBuffer
* buffer
, int buffer_size
,
127 const net::CompletionCallback
& callback
) {
128 DCHECK(read_callback_
.is_null());
130 // Reference the Core in case a callback deletes the adapter.
131 scoped_refptr
<Core
> core(this);
133 int result
= pseudo_tcp_
.Recv(buffer
->data(), buffer_size
);
135 result
= net::MapSystemError(pseudo_tcp_
.GetError());
139 if (result
== net::ERR_IO_PENDING
) {
140 read_buffer_
= buffer
;
141 read_buffer_size_
= buffer_size
;
142 read_callback_
= callback
;
150 int PseudoTcpAdapter::Core::Write(net::IOBuffer
* buffer
, int buffer_size
,
151 const net::CompletionCallback
& callback
) {
152 DCHECK(write_callback_
.is_null());
154 // Reference the Core in case a callback deletes the adapter.
155 scoped_refptr
<Core
> core(this);
157 int result
= pseudo_tcp_
.Send(buffer
->data(), buffer_size
);
159 result
= net::MapSystemError(pseudo_tcp_
.GetError());
165 if (result
== net::ERR_IO_PENDING
) {
166 write_buffer_
= buffer
;
167 write_buffer_size_
= buffer_size
;
168 write_callback_
= callback
;
175 // Need to wait until the data is sent to the peer when
176 // send-confirmation mode is enabled.
177 if (write_waits_for_send_
&& pseudo_tcp_
.GetBytesBufferedNotSent() > 0) {
178 DCHECK(!waiting_write_position_
);
179 waiting_write_position_
= true;
180 last_write_result_
= result
;
181 write_buffer_
= buffer
;
182 write_buffer_size_
= buffer_size
;
183 write_callback_
= callback
;
184 return net::ERR_IO_PENDING
;
190 int PseudoTcpAdapter::Core::Connect(const net::CompletionCallback
& callback
) {
191 DCHECK_EQ(pseudo_tcp_
.State(), cricket::PseudoTcp::TCP_LISTEN
);
193 // Reference the Core in case a callback deletes the adapter.
194 scoped_refptr
<Core
> core(this);
196 // Start the connection attempt.
197 int result
= pseudo_tcp_
.Connect();
199 return net::ERR_FAILED
;
203 connect_callback_
= callback
;
206 return net::ERR_IO_PENDING
;
209 void PseudoTcpAdapter::Core::Disconnect() {
210 // Don't dispatch outstanding callbacks, as mandated by net::StreamSocket.
211 read_callback_
.Reset();
213 write_callback_
.Reset();
214 write_buffer_
= NULL
;
215 connect_callback_
.Reset();
217 // TODO(wez): Connect should succeed if called after Disconnect, which
218 // PseudoTcp doesn't support, so we need to teardown the internal PseudoTcp
219 // and create a new one in Connect.
220 // TODO(wez): Close sets a shutdown flag inside PseudoTcp but has no other
221 // effect. This should be addressed in PseudoTcp, really.
222 // In the meantime we can fake OnTcpClosed notification and tear down the
224 pseudo_tcp_
.Close(true);
227 bool PseudoTcpAdapter::Core::IsConnected() const {
228 return pseudo_tcp_
.State() == PseudoTcp::TCP_ESTABLISHED
;
231 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp
* tcp
) {
232 DCHECK(tcp
== &pseudo_tcp_
);
234 if (!connect_callback_
.is_null()) {
235 net::CompletionCallback callback
= connect_callback_
;
236 connect_callback_
.Reset();
237 callback
.Run(net::OK
);
244 void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp
* tcp
) {
245 DCHECK_EQ(tcp
, &pseudo_tcp_
);
246 if (read_callback_
.is_null())
249 int result
= pseudo_tcp_
.Recv(read_buffer_
->data(), read_buffer_size_
);
251 result
= net::MapSystemError(pseudo_tcp_
.GetError());
253 if (result
== net::ERR_IO_PENDING
)
259 net::CompletionCallback callback
= read_callback_
;
260 read_callback_
.Reset();
262 callback
.Run(result
);
265 void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp
* tcp
) {
266 DCHECK_EQ(tcp
, &pseudo_tcp_
);
267 if (write_callback_
.is_null())
270 if (waiting_write_position_
) {
271 CheckWriteComplete();
275 int result
= pseudo_tcp_
.Send(write_buffer_
->data(), write_buffer_size_
);
277 result
= net::MapSystemError(pseudo_tcp_
.GetError());
279 if (result
== net::ERR_IO_PENDING
)
285 if (write_waits_for_send_
&& pseudo_tcp_
.GetBytesBufferedNotSent() > 0) {
286 DCHECK(!waiting_write_position_
);
287 waiting_write_position_
= true;
288 last_write_result_
= result
;
292 net::CompletionCallback callback
= write_callback_
;
293 write_callback_
.Reset();
294 write_buffer_
= NULL
;
295 callback
.Run(result
);
298 void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp
* tcp
, uint32 error
) {
299 DCHECK_EQ(tcp
, &pseudo_tcp_
);
301 if (!connect_callback_
.is_null()) {
302 net::CompletionCallback callback
= connect_callback_
;
303 connect_callback_
.Reset();
304 callback
.Run(net::MapSystemError(error
));
307 if (!read_callback_
.is_null()) {
308 net::CompletionCallback callback
= read_callback_
;
309 read_callback_
.Reset();
310 callback
.Run(net::MapSystemError(error
));
313 if (!write_callback_
.is_null()) {
314 net::CompletionCallback callback
= write_callback_
;
315 write_callback_
.Reset();
316 callback
.Run(net::MapSystemError(error
));
320 void PseudoTcpAdapter::Core::SetAckDelay(int delay_ms
) {
321 pseudo_tcp_
.SetOption(cricket::PseudoTcp::OPT_ACKDELAY
, delay_ms
);
324 void PseudoTcpAdapter::Core::SetNoDelay(bool no_delay
) {
325 pseudo_tcp_
.SetOption(cricket::PseudoTcp::OPT_NODELAY
, no_delay
? 1 : 0);
328 void PseudoTcpAdapter::Core::SetReceiveBufferSize(int32 size
) {
329 pseudo_tcp_
.SetOption(cricket::PseudoTcp::OPT_RCVBUF
, size
);
332 void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size
) {
333 pseudo_tcp_
.SetOption(cricket::PseudoTcp::OPT_SNDBUF
, size
);
336 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send
) {
337 write_waits_for_send_
= write_waits_for_send
;
340 void PseudoTcpAdapter::Core::DeleteSocket() {
344 cricket::IPseudoTcpNotify::WriteResult
PseudoTcpAdapter::Core::TcpWritePacket(
348 DCHECK_EQ(tcp
, &pseudo_tcp_
);
350 // If we already have a write pending, we behave like a congested network,
351 // returning success for the write, but dropping the packet. PseudoTcp will
352 // back-off and retransmit, adjusting for the perceived congestion.
353 if (socket_write_pending_
)
354 return IPseudoTcpNotify::WR_SUCCESS
;
356 scoped_refptr
<net::IOBuffer
> write_buffer
= new net::IOBuffer(len
);
357 memcpy(write_buffer
->data(), buffer
, len
);
359 // Our underlying socket is datagram-oriented, which means it should either
360 // send exactly as many bytes as we requested, or fail.
363 result
= socket_
->Write(
366 base::Bind(&PseudoTcpAdapter::Core::OnWritten
, base::Unretained(this)));
368 result
= net::ERR_CONNECTION_CLOSED
;
370 if (result
== net::ERR_IO_PENDING
) {
371 socket_write_pending_
= true;
372 return IPseudoTcpNotify::WR_SUCCESS
;
373 } else if (result
== net::ERR_MSG_TOO_BIG
) {
374 return IPseudoTcpNotify::WR_TOO_LARGE
;
375 } else if (result
< 0) {
376 return IPseudoTcpNotify::WR_FAIL
;
378 return IPseudoTcpNotify::WR_SUCCESS
;
382 void PseudoTcpAdapter::Core::DoReadFromSocket() {
383 if (!socket_read_buffer_
.get())
384 socket_read_buffer_
= new net::IOBuffer(kReadBufferSize
);
387 while (socket_
.get() && result
> 0) {
388 result
= socket_
->Read(
389 socket_read_buffer_
.get(),
391 base::Bind(&PseudoTcpAdapter::Core::OnRead
, base::Unretained(this)));
392 if (result
!= net::ERR_IO_PENDING
)
393 HandleReadResults(result
);
397 void PseudoTcpAdapter::Core::HandleReadResults(int result
) {
399 LOG(ERROR
) << "Read returned " << result
;
403 // TODO(wez): Disconnect on failure of NotifyPacket?
404 pseudo_tcp_
.NotifyPacket(socket_read_buffer_
->data(), result
);
407 CheckWriteComplete();
410 void PseudoTcpAdapter::Core::OnRead(int result
) {
411 // Reference the Core in case a callback deletes the adapter.
412 scoped_refptr
<Core
> core(this);
414 HandleReadResults(result
);
419 void PseudoTcpAdapter::Core::OnWritten(int result
) {
420 // Reference the Core in case a callback deletes the adapter.
421 scoped_refptr
<Core
> core(this);
423 socket_write_pending_
= false;
425 LOG(WARNING
) << "Write failed. Error code: " << result
;
429 void PseudoTcpAdapter::Core::AdjustClock() {
431 if (pseudo_tcp_
.GetNextClock(PseudoTcp::Now(), timeout
)) {
433 timer_
.Start(FROM_HERE
,
434 base::TimeDelta::FromMilliseconds(std::max(timeout
, 0L)), this,
435 &PseudoTcpAdapter::Core::HandleTcpClock
);
439 void PseudoTcpAdapter::Core::HandleTcpClock() {
440 // Reference the Core in case a callback deletes the adapter.
441 scoped_refptr
<Core
> core(this);
443 pseudo_tcp_
.NotifyClock(PseudoTcp::Now());
446 CheckWriteComplete();
449 void PseudoTcpAdapter::Core::CheckWriteComplete() {
450 if (!write_callback_
.is_null() && waiting_write_position_
) {
451 if (pseudo_tcp_
.GetBytesBufferedNotSent() == 0) {
452 waiting_write_position_
= false;
454 net::CompletionCallback callback
= write_callback_
;
455 write_callback_
.Reset();
456 write_buffer_
= NULL
;
457 callback
.Run(last_write_result_
);
462 // Public interface implemention.
464 PseudoTcpAdapter::PseudoTcpAdapter(net::Socket
* socket
)
465 : core_(new Core(socket
)) {
468 PseudoTcpAdapter::~PseudoTcpAdapter() {
471 // Make sure that the underlying socket is destroyed before PseudoTcp.
472 core_
->DeleteSocket();
475 int PseudoTcpAdapter::Read(net::IOBuffer
* buffer
, int buffer_size
,
476 const net::CompletionCallback
& callback
) {
477 DCHECK(CalledOnValidThread());
478 return core_
->Read(buffer
, buffer_size
, callback
);
481 int PseudoTcpAdapter::Write(net::IOBuffer
* buffer
, int buffer_size
,
482 const net::CompletionCallback
& callback
) {
483 DCHECK(CalledOnValidThread());
484 return core_
->Write(buffer
, buffer_size
, callback
);
487 int PseudoTcpAdapter::SetReceiveBufferSize(int32 size
) {
488 DCHECK(CalledOnValidThread());
490 core_
->SetReceiveBufferSize(size
);
494 int PseudoTcpAdapter::SetSendBufferSize(int32 size
) {
495 DCHECK(CalledOnValidThread());
497 core_
->SetSendBufferSize(size
);
501 int PseudoTcpAdapter::Connect(const net::CompletionCallback
& callback
) {
502 DCHECK(CalledOnValidThread());
504 // net::StreamSocket requires that Connect return OK if already connected.
508 return core_
->Connect(callback
);
511 void PseudoTcpAdapter::Disconnect() {
512 DCHECK(CalledOnValidThread());
516 bool PseudoTcpAdapter::IsConnected() const {
517 return core_
->IsConnected();
520 bool PseudoTcpAdapter::IsConnectedAndIdle() const {
521 DCHECK(CalledOnValidThread());
526 int PseudoTcpAdapter::GetPeerAddress(net::IPEndPoint
* address
) const {
527 DCHECK(CalledOnValidThread());
529 // We don't have a meaningful peer address, but we can't return an
530 // error, so we return a INADDR_ANY instead.
531 net::IPAddressNumber
ip_address(net::kIPv4AddressSize
);
532 *address
= net::IPEndPoint(ip_address
, 0);
536 int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint
* address
) const {
537 DCHECK(CalledOnValidThread());
539 return net::ERR_FAILED
;
542 const net::BoundNetLog
& PseudoTcpAdapter::NetLog() const {
543 DCHECK(CalledOnValidThread());
547 void PseudoTcpAdapter::SetSubresourceSpeculation() {
548 DCHECK(CalledOnValidThread());
552 void PseudoTcpAdapter::SetOmniboxSpeculation() {
553 DCHECK(CalledOnValidThread());
557 bool PseudoTcpAdapter::WasEverUsed() const {
558 DCHECK(CalledOnValidThread());
563 bool PseudoTcpAdapter::UsingTCPFastOpen() const {
564 DCHECK(CalledOnValidThread());
568 bool PseudoTcpAdapter::WasNpnNegotiated() const {
569 DCHECK(CalledOnValidThread());
573 net::NextProto
PseudoTcpAdapter::GetNegotiatedProtocol() const {
574 DCHECK(CalledOnValidThread());
575 return net::kProtoUnknown
;
578 bool PseudoTcpAdapter::GetSSLInfo(net::SSLInfo
* ssl_info
) {
579 DCHECK(CalledOnValidThread());
583 void PseudoTcpAdapter::SetAckDelay(int delay_ms
) {
584 DCHECK(CalledOnValidThread());
585 core_
->SetAckDelay(delay_ms
);
588 void PseudoTcpAdapter::SetNoDelay(bool no_delay
) {
589 DCHECK(CalledOnValidThread());
590 core_
->SetNoDelay(no_delay
);
593 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send
) {
594 DCHECK(CalledOnValidThread());
595 core_
->SetWriteWaitsForSend(write_waits_for_send
);
598 } // namespace jingle_glue