// Copyright 2012 Google Inc. All Rights Reserved.
3 package com
.google
.appengine
.api
.socket
;
5 import com
.google
.appengine
.api
.socket
.AppEngineSocketOptions
.Option
;
6 import com
.google
.appengine
.api
.socket
.SocketServicePb
.AcceptReply
;
7 import com
.google
.appengine
.api
.socket
.SocketServicePb
.AcceptRequest
;
8 import com
.google
.appengine
.api
.socket
.SocketServicePb
.AddressPort
;
9 import com
.google
.appengine
.api
.socket
.SocketServicePb
.CloseReply
;
10 import com
.google
.appengine
.api
.socket
.SocketServicePb
.CloseRequest
;
11 import com
.google
.appengine
.api
.socket
.SocketServicePb
.ConnectReply
;
12 import com
.google
.appengine
.api
.socket
.SocketServicePb
.ConnectRequest
;
13 import com
.google
.appengine
.api
.socket
.SocketServicePb
.CreateSocketReply
;
14 import com
.google
.appengine
.api
.socket
.SocketServicePb
.CreateSocketRequest
;
15 import com
.google
.appengine
.api
.socket
.SocketServicePb
.CreateSocketRequest
.SocketFamily
;
16 import com
.google
.appengine
.api
.socket
.SocketServicePb
.CreateSocketRequest
.SocketProtocol
;
17 import com
.google
.appengine
.api
.socket
.SocketServicePb
.GetSocketNameReply
;
18 import com
.google
.appengine
.api
.socket
.SocketServicePb
.GetSocketNameRequest
;
19 import com
.google
.appengine
.api
.socket
.SocketServicePb
.GetSocketOptionsReply
;
20 import com
.google
.appengine
.api
.socket
.SocketServicePb
.GetSocketOptionsRequest
;
21 import com
.google
.appengine
.api
.socket
.SocketServicePb
.ListenReply
;
22 import com
.google
.appengine
.api
.socket
.SocketServicePb
.ListenRequest
;
23 import com
.google
.appengine
.api
.socket
.SocketServicePb
.ReceiveReply
;
24 import com
.google
.appengine
.api
.socket
.SocketServicePb
.ReceiveRequest
;
25 import com
.google
.appengine
.api
.socket
.SocketServicePb
.RemoteSocketServiceError
;
26 import com
.google
.appengine
.api
.socket
.SocketServicePb
.RemoteSocketServiceError
.ErrorCode
;
27 import com
.google
.appengine
.api
.socket
.SocketServicePb
.RemoteSocketServiceError
.SystemError
;
28 import com
.google
.appengine
.api
.socket
.SocketServicePb
.SendReply
;
29 import com
.google
.appengine
.api
.socket
.SocketServicePb
.SendRequest
;
30 import com
.google
.appengine
.api
.socket
.SocketServicePb
.SetSocketOptionsReply
;
31 import com
.google
.appengine
.api
.socket
.SocketServicePb
.SetSocketOptionsRequest
;
32 import com
.google
.appengine
.api
.socket
.SocketServicePb
.ShutDownReply
;
33 import com
.google
.appengine
.api
.socket
.SocketServicePb
.ShutDownRequest
;
34 import com
.google
.appengine
.api
.socket
.SocketServicePb
.SocketOption
.SocketOptionName
;
36 import java
.io
.IOException
;
37 import java
.io
.InputStream
;
38 import java
.io
.OutputStream
;
39 import java
.io
.Serializable
;
40 import java
.net
.ConnectException
;
41 import java
.net
.InetAddress
;
42 import java
.net
.InetSocketAddress
;
43 import java
.net
.SocketAddress
;
44 import java
.net
.SocketException
;
45 import java
.net
.SocketImpl
;
46 import java
.net
.SocketOptions
;
47 import java
.net
.SocketTimeoutException
;
48 import java
.net
.UnknownHostException
;
49 import java
.util
.Arrays
;
50 import java
.util
.concurrent
.atomic
.AtomicLong
;
/**
 * Implements the {@link SocketImpl} interface for App Engine based sockets.
 */
56 class AppEngineSocketImpl
extends SocketImpl
implements AppEngineSocketOptionsClient
, Serializable
{
57 private static final long serialVersionUID
= -2683691443688405980L;
// Lifecycle states of this socket implementation; the current one is held in currentState.
// NOTE(review): only documentation fragments are visible below. The enum constants themselves
// (code in this file references UNINITIALIZED, CREATE_CALLED, BOUND, CONNECTED, LISTEN and
// CLOSED) and the closing brace are not visible in this view.
59 static enum SocketState
{
61 * Socket is uninitialized.
66 * {@link #create(boolean)} has been called.
67 * Note that this does not relate to remote socket create.
77 * Socket is connected.
82 * Socket is listening.
92 SocketState currentState
= SocketState
.UNINITIALIZED
;
94 private SocketApiHelper socketHelper
= null;
96 String descriptor
= null;
100 private boolean readsShutdown
= false;
101 private boolean writesShutdown
= false;
103 private final AtomicLong sendOffset
= new AtomicLong(0);
105 private AppEngineSocketInputStream socketInputStream
= null;
108 * True indicates that this socket is for a (TCP) stream socket otherwise
109 * it is a datagram socket (UDP).
111 private boolean stream
;
113 private InetAddress localAddress
;
/**
 * Creates a socket implementation without an explicit {@link SocketApiHelper}; the helper is
 * then resolved lazily by {@code getSocketApiHelper()}.
 */
AppEngineSocketImpl() {
}
/**
 * Creates a socket implementation that issues its remote calls through the given helper.
 *
 * @param socketHelper helper used for all socket service calls; if {@code null}, the
 *     factory-wide default is used instead
 */
AppEngineSocketImpl(SocketApiHelper socketHelper) {
  this.socketHelper = socketHelper;
}
122 SocketApiHelper
getSocketApiHelper() {
123 return socketHelper
== null ? AppEngineSocketImplFactory
.SOCKET_API_HELPER
: socketHelper
;
126 protected InetAddress
toInetAddress(AddressPort addressPort
) throws UnknownHostException
{
127 return InetAddress
.getByAddress(addressPort
.getPackedAddressAsBytes());
// Parameter list and body of an option setter taking an already-resolved Option: it hands the
// value to option.validateAndApply(this, value).
// NOTE(review): presumably this is the setOption(Option, Object) overload that
// setOption(int, Object) calls. The line opening the declaration (modifiers, return type,
// name) and one line between the signature and the delegate call are not visible here.
131 AppEngineSocketOptions
.Option option
, Object value
) throws SocketException
{
133 option
.validateAndApply(this, value
);
// While the socket is in state CREATE_CALLED, creates the remote socket with no bind address,
// no remote address and connect=false. A SocketTimeoutException raised by createSocket is
// rethrown as a SocketException carrying only its message.
// NOTE(review): the `try {` opener and everything after the throw are not visible here;
// presumably the remaining branch verifies the socket is not closed - confirm.
// NOTE(review): new SocketException(e.getMessage()) drops the original exception as cause.
136 private void createSocketOrCheckNotClosed() throws SocketException
{
137 if (currentState
== SocketState
.CREATE_CALLED
) {
139 createSocket(null, 0, null, 0, false);
140 } catch (SocketTimeoutException e
) {
141 throw new SocketException(e
.getMessage());
149 * @see java.net.SocketOptions#setOption(int, java.lang.Object)
152 public void setOption(int opt
, Object value
) throws SocketException
{
153 AppEngineSocketOptions
.Option option
= AppEngineSocketOptions
.getOptionById(opt
);
154 if (option
== null) {
155 throw new SocketException("unrecognized socket option: " + opt
);
157 setOption(option
, value
);
// Renders the given states as one string: for every state it appends `prefix` and then the
// state's toString() to a StringBuilder, and returns the builder's content.
// NOTE(review): the declaration and update of `prefix` are not visible here; presumably it is
// a separator that is empty before the first element - confirm.
160 String
statesToString(SocketState
...socketStates
) {
161 StringBuilder builder
= new StringBuilder();
163 for (SocketState state
: socketStates
) {
164 builder
.append(prefix
);
166 builder
.append(state
.toString());
168 return builder
.toString();
171 void checkStateIsOneOf(String errorMsg
, SocketState
...socketStates
) throws SocketException
{
172 for (SocketState state
: socketStates
) {
173 if (currentState
== state
) {
177 throw new SocketException(errorMsg
+ ": Expected to be in one of states : (" +
178 statesToString(socketStates
) + "), was in state " + currentState
);
181 void checkStateIsNotOneOf(String errorMsg
, SocketState
...socketStates
) throws SocketException
{
182 for (SocketState state
: socketStates
) {
183 if (currentState
== state
) {
184 throw new SocketException(errorMsg
+ ": Expected to not be in one of (" +
185 statesToString(socketStates
) + "), was in state " + currentState
);
191 * Checks that the socket is not closed.
193 private void checkNotClosed() throws SocketException
{
194 checkStateIsNotOneOf("Socket is closed", SocketState
.UNINITIALIZED
, SocketState
.CLOSED
);
198 * Implements the remote service SetSocketOptions call.
// Sends one option (level, option name, raw byte value) for this socket's descriptor to the
// service through the synchronous "SetSocketOptions" call, after
// createSocketOrCheckNotClosed() has run.
// NOTE(review): the lines between reading the option name and the
// createSocketOrCheckNotClosed() call are not visible here; confirm what they do before
// relying on `name` being non-null.
201 public void setSocketOptionAsBytes(AppEngineSocketOptions
.Option opt
, byte[] value
)
202 throws SocketException
{
203 SocketOptionName name
= opt
.getOptName();
207 createSocketOrCheckNotClosed();
208 SetSocketOptionsRequest request
= new SetSocketOptionsRequest();
209 SetSocketOptionsReply response
= new SetSocketOptionsReply();
210 request
.setSocketDescriptor(descriptor
);
211 request
.addOptions().setLevel(opt
.getLevel()).setOption(name
).setValueAsBytes(value
);
// A null error holder is passed here, unlike the Connect/Send/Receive calls in this file.
213 getSocketApiHelper().makeSyncCall("SetSocketOptions", request
, response
, null);
// Receives a timeout value for this socket.
// NOTE(review): the body is not visible here; presumably it stores the value in soTimeout,
// which getOption, send and receive read - confirm.
217 public void setTimeout(int timeout
) {
222 * @see java.net.SocketOptions#getOption(int)
// Returns the value of the socket option with the given numeric id.
// Visible handling: SO_BINDADDR triggers createSocketOrCheckNotClosed() when no local address
// is cached yet; SO_TIMEOUT reports soTimeout, mapping non-positive values to 0; any other id
// is resolved through AppEngineSocketOptions.getOptionById and rejected with a
// SocketException when unknown, otherwise read through getOption(Option).
// NOTE(review): several lines are not visible here (the rest of the SO_BINDADDR branch, the
// `try {` opener, and the SO_LINGER handling inside the catch) - confirm against the full file.
225 public Object
getOption(int optID
) throws SocketException
{
226 if (SocketOptions
.SO_BINDADDR
== optID
) {
227 if (localAddress
== null) {
228 createSocketOrCheckNotClosed();
233 if (SocketOptions
.SO_TIMEOUT
== optID
) {
234 return soTimeout
<= 0 ?
0 : soTimeout
;
237 AppEngineSocketOptions
.Option option
= AppEngineSocketOptions
.getOptionById(optID
);
238 if (option
== null) {
239 throw new SocketException("unrecognized socket option: " + optID
);
243 return getOption(option
);
244 } catch (SocketException e
) {
// SO_LINGER gets special treatment when the lookup fails; the treatment itself is not visible.
245 if (SocketOptions
.SO_LINGER
== optID
) {
253 * Returns the value of the given option.
254 * @throws SocketException
256 private Object
getOption(Option option
) throws SocketException
{
257 createSocketOrCheckNotClosed();
258 return option
.getOption(this);
262 * Called by {@link Option#getOption(AppEngineSocketImpl)}.
263 * @see AppEngineSocketOptionsClient#getSocketOptionAsBytes(Option)
266 public byte[] getSocketOptionAsBytes(Option option
) throws SocketException
{
267 GetSocketOptionsRequest request
= new GetSocketOptionsRequest();
268 GetSocketOptionsReply response
= new GetSocketOptionsReply();
269 request
.setSocketDescriptor(descriptor
);
270 request
.addOptions().setLevel(option
.getLevel()).setOption(option
.getOptName());
271 getSocketApiHelper().makeSyncCall("GetSocketOptions", request
, response
, null);
272 return response
.optionss().get(0).getValueAsBytes();
276 * @see java.net.SocketImpl#create(boolean)
279 protected synchronized void create(boolean stream
) throws IOException
{
280 checkStateIsOneOf("Socket is already created", SocketState
.UNINITIALIZED
);
281 currentState
= SocketState
.CREATE_CALLED
;
282 this.stream
= stream
;
286 * @see java.net.SocketImpl#connect(java.lang.String, int)
// Connects to a host given by name: resolves it with InetAddress.getByName and delegates to
// connectToAddress with a null timeout.
// NOTE(review): the `try {` opener and the body of the UnknownHostException handler are not
// visible here.
289 protected synchronized void connect(String host
, int port
) throws IOException
{
291 InetAddress address
= InetAddress
.getByName(host
);
292 connectToAddress(address
, port
, null);
293 } catch (UnknownHostException e
) {
300 * @see java.net.SocketImpl#connect(java.net.InetAddress, int)
303 protected synchronized void connect(InetAddress address
, int port
) throws IOException
{
304 if (address
== null) {
305 throw new IllegalArgumentException("null address is illegal for connect");
308 checkStateIsOneOf("socket is not created", SocketState
.CREATE_CALLED
, SocketState
.BOUND
);
310 connectToAddress(address
, port
, null);
314 * @see java.net.SocketImpl#connect(java.net.SocketAddress, int)
317 protected synchronized void connect(SocketAddress socketAddress
, int timeout
) throws IOException
{
318 if (socketAddress
== null) {
319 throw new IllegalArgumentException("null address is illegal for connect");
322 if (!(socketAddress
instanceof InetSocketAddress
)) {
323 throw new IllegalArgumentException("Address must be of type InetSocketAddress");
326 InetSocketAddress addr
= (InetSocketAddress
) socketAddress
;
328 if (addr
.isUnresolved()) {
329 throw new UnknownHostException(addr
.getHostName());
332 connectToAddress(addr
.getAddress(), addr
.getPort(), timeout
== 0 ?
null : timeout
);
// Connects to the given address/port, choosing the call sequence from the current state.
// In state CREATE_CALLED: without a timeout the socket is created and connected by a single
// createSocket call (connect=true); with a timeout it is created unconnected and then
// connected through connectSocket, which carries the timeout.
// NOTE(review): the `else`, the label of the second branch (which only calls connectSocket),
// the default handling and the lines around the final address assignment are not visible
// here - confirm against the full file.
335 private void connectToAddress(InetAddress address
, int port
, Integer timeoutMillis
)
336 throws SocketException
, SocketTimeoutException
{
337 switch (currentState
) {
338 case CREATE_CALLED
: {
339 if (timeoutMillis
== null) {
340 createSocket(null, 0, address
, port
, true);
342 createSocket(null, 0, address
, port
, false);
343 connectSocket(address
, port
, timeoutMillis
);
348 connectSocket(address
, port
, timeoutMillis
);
// Remember the remote address in the inherited `address` field.
357 this.address
= address
;
360 private void processConnectError(RemoteSocketServiceError serviceError
)
361 throws SocketException
, SocketTimeoutException
{
363 if (SystemError
.valueOf(serviceError
.getSystemError()) == SystemError
.SYS_EINPROGRESS
) {
364 throw new SocketTimeoutException();
366 if (SystemError
.valueOf(serviceError
.getSystemError()) == SystemError
.SYS_ETIMEDOUT
) {
367 throw new SocketTimeoutException();
369 if (SystemError
.valueOf(serviceError
.getSystemError()) == SystemError
.SYS_ECONNREFUSED
) {
370 throw new ConnectException(serviceError
.getErrorDetail());
372 throw SocketApiHelper
.translateError(
373 ErrorCode
.SYSTEM_ERROR
.getValue(),
374 "errno: " + serviceError
.getSystemError() + ", detail:" + serviceError
.getErrorDetail());
378 * Performs connect given a timeout.
// Connects the already-created remote socket (identified by descriptor) to remoteAddress
// through the synchronous "Connect" call. The address is sent as IPv6-form bytes; a non-null
// timeout is converted from milliseconds to seconds. A failed call is handed to
// processConnectError; afterwards the state becomes CONNECTED.
// NOTE(review): remotePort is not used in the visible lines, so the line of the builder chain
// between getMutableRemoteIp() and setPackedAddressAsBytes(...) is presumably missing from
// this view (likely the port setter) - confirm.
380 private void connectSocket(InetAddress remoteAddress
, int remotePort
, Integer timeoutMillis
)
381 throws SocketException
, SocketTimeoutException
{
382 if (remoteAddress
== null) {
383 throw new IllegalArgumentException("remoteAddress must not be null if connect requested");
386 ConnectRequest request
= new ConnectRequest().setSocketDescriptor(descriptor
);
387 ConnectReply response
= new ConnectReply();
389 request
.getMutableRemoteIp()
391 .setPackedAddressAsBytes(AppEngineSocketUtils
.addrAsIpv6Bytes(remoteAddress
));
392 if (timeoutMillis
!= null) {
// Milliseconds to seconds.
393 request
.setTimeoutSeconds(0.001D
* timeoutMillis
);
396 RemoteSocketServiceError serviceError
= new RemoteSocketServiceError();
397 if (!getSocketApiHelper().makeSyncCall("Connect", request
, response
, serviceError
)) {
398 processConnectError(serviceError
);
401 currentState
= SocketState
.CONNECTED
;
406 * Releases the socket and absorbs any exceptions.
// Resets both shutdown flags, ignores any IOException from the guarded statement, and marks
// the socket CLOSED.
// NOTE(review): the `try {` opener and the guarded statement are not visible here
// (presumably a close() call - confirm).
408 private void releaseSocket() {
409 readsShutdown
= false;
410 writesShutdown
= false;
413 } catch (IOException ignored
) {
415 currentState
= SocketState
.CLOSED
;
418 private void fixLocalAddress() throws SocketException
{
419 GetSocketNameRequest request
= new GetSocketNameRequest().setSocketDescriptor(descriptor
);
420 GetSocketNameReply response
= new GetSocketNameReply();
422 getSocketApiHelper().makeSyncCall("GetSocketName", request
, response
, null);
423 AddressPort externalIp
= response
.getProxyExternalIp();
424 this.localport
= externalIp
.getPort();
427 localAddress
= toInetAddress(externalIp
);
428 } catch (UnknownHostException e
) {
429 throw new SocketException(e
.toString());
434 * Create the remote socket with an optional bind and connect. Either the
435 * remoteAddress or bindAddress or both parameters must be provided.
436 * It is used to determine if socket is to be created with an IPV4 or IPV6
438 * @throws SocketException
439 * @throws SocketTimeoutException
// Creates the remote socket through the synchronous "CreateSocket" call and stores the
// returned descriptor. The request always uses the IPv6 family; the protocol is TCP for
// stream sockets and UDP otherwise. A non-null bindAddress is sent as the proxy external IP
// and a remote address as the remote IP, both as IPv6-form bytes. Afterwards the state
// becomes CONNECTED when connect is true and BOUND otherwise.
// NOTE(review): the branch structure around the remote-address handling is not visible here
// (presumably `if (connect) { ... } else { ... }`, with processConnectError used only on the
// connect path), and bindPort/remotePort are not used in the visible lines, so the port
// setters in the two builder chains are presumably missing from this view - confirm.
441 private void createSocket(
442 InetAddress bindAddress
, int bindPort
, InetAddress remoteAddress
,
443 int remotePort
, boolean connect
)
444 throws SocketException
, SocketTimeoutException
{
446 CreateSocketRequest request
= new CreateSocketRequest();
447 CreateSocketReply response
= new CreateSocketReply();
449 request
.setFamily(SocketFamily
.IPv6
);
450 request
.setProtocol(stream ? SocketProtocol
.TCP
: SocketProtocol
.UDP
);
452 if (bindAddress
!= null) {
453 request
.getMutableProxyExternalIp()
455 .setPackedAddressAsBytes(AppEngineSocketUtils
.addrAsIpv6Bytes(bindAddress
));
459 if (remoteAddress
== null) {
460 throw new IllegalArgumentException("remoteAddress must not be null if connect requested");
462 request
.getMutableRemoteIp()
464 .setPackedAddressAsBytes(AppEngineSocketUtils
.addrAsIpv6Bytes(remoteAddress
));
465 RemoteSocketServiceError serviceError
= new RemoteSocketServiceError();
466 if (!getSocketApiHelper().makeSyncCall("CreateSocket", request
, response
, serviceError
)) {
467 processConnectError(serviceError
);
// Second call site, with a null error holder.
470 getSocketApiHelper().makeSyncCall("CreateSocket", request
, response
, null);
473 descriptor
= response
.getSocketDescriptor();
474 currentState
= connect ? SocketState
.CONNECTED
: SocketState
.BOUND
;
482 * @see java.net.SocketImpl#bind(java.net.InetAddress, int)
485 protected synchronized void bind(InetAddress host
, int port
) throws IOException
{
486 createSocket(host
, port
, null, 0, false);
487 currentState
= SocketState
.BOUND
;
491 * @see java.net.SocketImpl#listen(int)
494 protected void listen(int backlog
) throws IOException
{
495 checkStateIsNotOneOf("Socket must be in a bound state.", SocketState
.UNINITIALIZED
,
496 SocketState
.CREATE_CALLED
, SocketState
.CONNECTED
, SocketState
.CLOSED
);
497 ListenRequest request
= new ListenRequest().setSocketDescriptor(descriptor
).setBacklog(backlog
);
498 getSocketApiHelper().makeSyncCall("Listen", request
, new ListenReply(), null);
499 currentState
= SocketState
.LISTEN
;
503 * @see java.net.SocketImpl#accept(java.net.SocketImpl)
506 protected void accept(SocketImpl s
) throws IOException
{
507 checkStateIsOneOf("Socket is not in passive (accepting) mode.", SocketState
.LISTEN
);
508 if (!(s
instanceof AppEngineSocketImpl
)) {
509 throw new IllegalStateException(
510 "Expected a SocketImpl compatable with '" + this.getClass().getName() +
511 "'. A '" + s
.getClass().getName() + "' was received.");
514 AppEngineSocketImpl acceptingSocket
= (AppEngineSocketImpl
) s
;
515 AcceptReply response
= new AcceptReply();
516 AcceptRequest request
= new AcceptRequest().setSocketDescriptor(descriptor
);
517 getSocketApiHelper().makeSyncCall("Accept", request
, response
, null);
518 acceptingSocket
.doAccept(response
);
522 * Initializes an AppEngineSocketImpl from an AcceptReply message.
523 * @throws SocketException
// Initializes this socket from an Accept reply: takes the new socket descriptor, the remote
// address (built from the packed address bytes) and the remote port from the reply, then
// marks the socket CONNECTED. An UnknownHostException from the address conversion is
// rethrown as a SocketException carrying only its message.
// NOTE(review): the body of the switch on currentState and the `try {` opener are not
// visible here - confirm which states are accepted.
// NOTE(review): new SocketException(e.getMessage()) drops the original exception as cause.
525 private void doAccept(AcceptReply response
) throws SocketException
{
526 switch (currentState
) {
538 descriptor
= response
.getNewSocketDescriptor();
539 AddressPort addressPort
= response
.getRemoteAddress();
541 address
= InetAddress
.getByAddress(addressPort
.getPackedAddressAsBytes());
542 } catch (UnknownHostException e
) {
543 throw new SocketException(e
.getMessage());
546 port
= addressPort
.getPort();
547 currentState
= SocketState
.CONNECTED
;
551 * @see java.net.SocketImpl#getInputStream()
554 protected InputStream
getInputStream() throws IOException
{
555 if (currentState
== SocketState
.CLOSED
) {
556 throw new IOException("Socket closed.");
560 throw new IOException("Socket input is shutdown.");
563 if (socketInputStream
== null) {
564 socketInputStream
= new AppEngineSocketInputStream(this);
566 return socketInputStream
;
570 * @see java.net.SocketImpl#getOutputStream()
573 protected OutputStream
getOutputStream() throws IOException
{
574 if (currentState
== SocketState
.CLOSED
) {
575 throw new IOException("Socket closed.");
578 if (writesShutdown
) {
579 throw new IOException("Socket output is shutdown.");
582 return new AppEngineSocketOutputStream(this);
586 * @see java.net.SocketImpl#available()
// Reports how many bytes can be read without blocking (contract of SocketImpl#available()).
// NOTE(review): the body is not visible here, so the value actually returned cannot be
// confirmed from this view.
589 protected int available() throws IOException
{
594 * @see java.net.SocketImpl#close()
// Closes the socket: when a remote descriptor exists, sends a synchronous "Close" call that
// carries the descriptor and the current send offset, and sets the state to CLOSED.
// NOTE(review): the try/finally structure and the lines after the state change are not
// visible here (presumably the state is set in a finally block and the descriptor is
// cleared - confirm).
597 protected void close() throws IOException
{
598 if (descriptor
!= null) {
600 CloseRequest request
= new CloseRequest().setSocketDescriptor(descriptor
)
601 .setSendOffset(sendOffset
.get());
602 getSocketApiHelper().makeSyncCall("Close", request
, new CloseReply(), null);
604 currentState
= SocketState
.CLOSED
;
611 * @see java.net.SocketImpl#sendUrgentData(int)
614 protected void sendUrgentData(int data
) throws IOException
{
615 throw new IllegalStateException(
616 "AppEngineSocketImpl#sendUrgentData() function is unimplemented.");
620 * @see java.net.SocketImpl#shutdownInput
623 protected void shutdownInput() throws IOException
{
624 if (currentState
== SocketState
.CONNECTED
&& !readsShutdown
) {
625 ShutDownRequest request
= new ShutDownRequest().setSocketDescriptor(descriptor
)
626 .setSendOffset(sendOffset
.get())
627 .setHow(ShutDownRequest
.How
.SOCKET_SHUT_RD
);
628 getSocketApiHelper().makeSyncCall("ShutDown", request
, new ShutDownReply(), null);
629 readsShutdown
= true;
634 * @see java.net.SocketImpl#shutdownOutput
637 protected void shutdownOutput() throws IOException
{
638 if (currentState
== SocketState
.CONNECTED
&& !writesShutdown
) {
639 ShutDownRequest request
= new ShutDownRequest().setSocketDescriptor(descriptor
)
640 .setSendOffset(sendOffset
.get())
641 .setHow(ShutDownRequest
.How
.SOCKET_SHUT_WR
);
642 getSocketApiHelper().makeSyncCall("ShutDown", request
, new ShutDownReply(), null);
643 writesShutdown
= true;
648 * Used by {@link AppEngineSocketOutputStream} to send data.
650 * @param buf Buffer of bytes to write.
651 * @param off Offset into the buffer.
652 * @param len The length of bytes to write.
653 * @throws IOException On error conditions
// Sends len bytes of buf starting at off through the synchronous "Send" call.
// The socket must not be UNINITIALIZED, CREATE_CALLED or CLOSED, and writes must not have
// been shut down. The bytes are copied, and sendOffset is advanced by the copy's length while
// its previous value is sent as the stream offset. On failure, SYS_EAGAIN is reported as a
// SocketTimeoutException("Write timed out"); any other error goes through
// SocketApiHelper.translateError as a system error.
// NOTE(review): the line directly before setTimeoutSeconds is not visible here; presumably
// the timeout is only set when soTimeout is positive - confirm.
655 protected void send(byte buf
[], int off
, int len
) throws IOException
{
656 checkStateIsNotOneOf("Socket is closed.",
657 SocketState
.UNINITIALIZED
, SocketState
.CREATE_CALLED
, SocketState
.CLOSED
);
658 if (writesShutdown
) {
659 throw new IOException("Socket output is shutdown.");
662 byte copy
[] = Arrays
.copyOfRange(buf
, off
, off
+ len
);
663 SendRequest request
= new SendRequest().setSocketDescriptor(descriptor
)
664 .setStreamOffset(sendOffset
.getAndAdd(copy
.length
))
665 .setDataAsBytes(copy
);
// Milliseconds to seconds.
667 request
.setTimeoutSeconds(soTimeout
* 0.001);
670 RemoteSocketServiceError serviceError
= new RemoteSocketServiceError();
671 if (!getSocketApiHelper().makeSyncCall("Send", request
, new SendReply(), serviceError
)) {
672 if (serviceError
.getSystemError() == SystemError
.SYS_EAGAIN
.getValue()) {
673 throw new SocketTimeoutException("Write timed out");
675 throw SocketApiHelper
.translateError(
676 ErrorCode
.SYSTEM_ERROR
.getValue(),
677 "errno: " + serviceError
.getSystemError() +
678 ", detail:" + serviceError
.getErrorDetail());
684 * This is wrapped by {@link AppEngineSocketInputStream} and has the
685 * same semantics as {@link java.io.InputStream#read(byte[], int, int)}.
686 * @throws IOException On error conditions
// Reads data into buf at off through the synchronous "Receive" call and returns the number
// of bytes received. The socket must not be UNINITIALIZED, CREATE_CALLED or CLOSED; negative
// or out-of-range off/len values raise ArrayIndexOutOfBoundsException. On failure, SYS_EAGAIN
// is reported as a SocketTimeoutException("Read timed out"); any other error goes through
// SocketApiHelper.translateError as a system error. Received bytes are copied into buf.
// NOTE(review): several lines are not visible here - the guard in front of the "Socket input
// is shutdown." throw, the rest of the ReceiveRequest builder chain (len is otherwise unused,
// so presumably the requested size), the guard in front of setTimeoutSeconds, and the
// handling of an empty reply. The end of the method also lies beyond this view.
688 protected int receive(byte[] buf
, int off
, int len
) throws IOException
{
689 checkStateIsNotOneOf("Socket is closed.",
690 SocketState
.UNINITIALIZED
, SocketState
.CREATE_CALLED
, SocketState
.CLOSED
);
693 throw new IOException("Socket input is shutdown.");
696 if (len
< 0 || off
< 0 || buf
.length
- off
< len
) {
700 throw new ArrayIndexOutOfBoundsException();
703 ReceiveReply response
= new ReceiveReply();
704 ReceiveRequest request
= new ReceiveRequest().setSocketDescriptor(descriptor
)
// Milliseconds to seconds.
707 request
.setTimeoutSeconds(soTimeout
* 0.001);
709 RemoteSocketServiceError serviceError
= new RemoteSocketServiceError();
710 if (!getSocketApiHelper().makeSyncCall("Receive", request
, response
, serviceError
)) {
711 if (serviceError
.getSystemError() == SystemError
.SYS_EAGAIN
.getValue()) {
712 throw new SocketTimeoutException("Read timed out");
714 throw SocketApiHelper
.translateError(
715 ErrorCode
.SYSTEM_ERROR
.getValue(),
716 "errno: " + serviceError
.getSystemError() +
717 ", detail:" + serviceError
.getErrorDetail());
721 byte readBytes
[] = response
.getDataAsBytes();
723 if (readBytes
.length
> 0) {
724 System
.arraycopy(readBytes
, 0, buf
, off
, readBytes
.length
);
729 return readBytes
.length
;