Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / socket / AppEngineSocketImpl.java
blob4c94b19d66a8ff948d14081ba96c957d8c5439c4
1 // Copyright 2012 Google Inc. All Rights Reserved.
3 package com.google.appengine.api.socket;
5 import com.google.appengine.api.socket.AppEngineSocketOptions.Option;
6 import com.google.appengine.api.socket.SocketServicePb.AcceptReply;
7 import com.google.appengine.api.socket.SocketServicePb.AcceptRequest;
8 import com.google.appengine.api.socket.SocketServicePb.AddressPort;
9 import com.google.appengine.api.socket.SocketServicePb.CloseReply;
10 import com.google.appengine.api.socket.SocketServicePb.CloseRequest;
11 import com.google.appengine.api.socket.SocketServicePb.ConnectReply;
12 import com.google.appengine.api.socket.SocketServicePb.ConnectRequest;
13 import com.google.appengine.api.socket.SocketServicePb.CreateSocketReply;
14 import com.google.appengine.api.socket.SocketServicePb.CreateSocketRequest;
15 import com.google.appengine.api.socket.SocketServicePb.CreateSocketRequest.SocketFamily;
16 import com.google.appengine.api.socket.SocketServicePb.CreateSocketRequest.SocketProtocol;
17 import com.google.appengine.api.socket.SocketServicePb.GetSocketNameReply;
18 import com.google.appengine.api.socket.SocketServicePb.GetSocketNameRequest;
19 import com.google.appengine.api.socket.SocketServicePb.GetSocketOptionsReply;
20 import com.google.appengine.api.socket.SocketServicePb.GetSocketOptionsRequest;
21 import com.google.appengine.api.socket.SocketServicePb.ListenReply;
22 import com.google.appengine.api.socket.SocketServicePb.ListenRequest;
23 import com.google.appengine.api.socket.SocketServicePb.ReceiveReply;
24 import com.google.appengine.api.socket.SocketServicePb.ReceiveRequest;
25 import com.google.appengine.api.socket.SocketServicePb.RemoteSocketServiceError;
26 import com.google.appengine.api.socket.SocketServicePb.RemoteSocketServiceError.ErrorCode;
27 import com.google.appengine.api.socket.SocketServicePb.RemoteSocketServiceError.SystemError;
28 import com.google.appengine.api.socket.SocketServicePb.SendReply;
29 import com.google.appengine.api.socket.SocketServicePb.SendRequest;
30 import com.google.appengine.api.socket.SocketServicePb.SetSocketOptionsReply;
31 import com.google.appengine.api.socket.SocketServicePb.SetSocketOptionsRequest;
32 import com.google.appengine.api.socket.SocketServicePb.ShutDownReply;
33 import com.google.appengine.api.socket.SocketServicePb.ShutDownRequest;
34 import com.google.appengine.api.socket.SocketServicePb.SocketOption.SocketOptionName;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.io.OutputStream;
39 import java.io.Serializable;
40 import java.net.ConnectException;
41 import java.net.InetAddress;
42 import java.net.InetSocketAddress;
43 import java.net.SocketAddress;
44 import java.net.SocketException;
45 import java.net.SocketImpl;
46 import java.net.SocketOptions;
47 import java.net.SocketTimeoutException;
48 import java.net.UnknownHostException;
49 import java.util.Arrays;
50 import java.util.concurrent.atomic.AtomicLong;
52 /**
53 * Implements the {@link SocketImpl} interface for App Engine based sockets.
56 class AppEngineSocketImpl extends SocketImpl implements AppEngineSocketOptionsClient, Serializable {
57 private static final long serialVersionUID = -2683691443688405980L;
59 static enum SocketState {
60 /**
61 * Socket is uninitialized.
63 UNINITIALIZED,
65 /**
66 * {@link #create(boolean)} has been called.
67 * Note that this does not relate to remote socket create.
69 CREATE_CALLED,
71 /**
72 * Socket is bound.
74 BOUND,
76 /**
77 * Socket is connected.
79 CONNECTED,
81 /**
82 * Socket is listening.
84 LISTEN,
86 /**
87 * Socket is closed.
89 CLOSED
92 SocketState currentState = SocketState.UNINITIALIZED;
94 private SocketApiHelper socketHelper = null;
96 String descriptor = null;
98 int soTimeout = -1;
100 private boolean readsShutdown = false;
101 private boolean writesShutdown = false;
103 private final AtomicLong sendOffset = new AtomicLong(0);
105 private AppEngineSocketInputStream socketInputStream = null;
108 * True indicates that this socket is for a (TCP) stream socket otherwise
109 * it is a datagram socket (UDP).
111 private boolean stream;
113 private InetAddress localAddress;
115 AppEngineSocketImpl() {
118 AppEngineSocketImpl(SocketApiHelper socketHelper) {
119 this.socketHelper = socketHelper;
122 SocketApiHelper getSocketApiHelper() {
123 return socketHelper == null ? AppEngineSocketImplFactory.SOCKET_API_HELPER : socketHelper;
126 protected InetAddress toInetAddress(AddressPort addressPort) throws UnknownHostException {
127 return InetAddress.getByAddress(addressPort.getPackedAddressAsBytes());
130 void setOption(
131 AppEngineSocketOptions.Option option, Object value) throws SocketException {
132 checkNotClosed();
133 option.validateAndApply(this, value);
136 private void createSocketOrCheckNotClosed() throws SocketException {
137 if (currentState == SocketState.CREATE_CALLED) {
138 try {
139 createSocket(null, 0, null, 0, false);
140 } catch (SocketTimeoutException e) {
141 throw new SocketException(e.getMessage());
143 } else {
144 checkNotClosed();
149 * @see java.net.SocketOptions#setOption(int, java.lang.Object)
151 @Override
152 public void setOption(int opt, Object value) throws SocketException {
153 AppEngineSocketOptions.Option option = AppEngineSocketOptions.getOptionById(opt);
154 if (option == null) {
155 throw new SocketException("unrecognized socket option: " + opt);
157 setOption(option, value);
160 String statesToString(SocketState...socketStates) {
161 StringBuilder builder = new StringBuilder();
162 String prefix = "";
163 for (SocketState state : socketStates) {
164 builder.append(prefix);
165 prefix = ", ";
166 builder.append(state.toString());
168 return builder.toString();
171 void checkStateIsOneOf(String errorMsg, SocketState...socketStates) throws SocketException {
172 for (SocketState state : socketStates) {
173 if (currentState == state) {
174 return;
177 throw new SocketException(errorMsg + ": Expected to be in one of states : (" +
178 statesToString(socketStates) + "), was in state " + currentState);
181 void checkStateIsNotOneOf(String errorMsg, SocketState...socketStates) throws SocketException {
182 for (SocketState state : socketStates) {
183 if (currentState == state) {
184 throw new SocketException(errorMsg + ": Expected to not be in one of (" +
185 statesToString(socketStates) + "), was in state " + currentState);
191 * Checks that the socket is not closed.
193 private void checkNotClosed() throws SocketException {
194 checkStateIsNotOneOf("Socket is closed", SocketState.UNINITIALIZED, SocketState.CLOSED);
198 * Implements the remote service SetSocketOptions call.
200 @Override
201 public void setSocketOptionAsBytes(AppEngineSocketOptions.Option opt, byte[] value)
202 throws SocketException {
203 SocketOptionName name = opt.getOptName();
204 if (name == null) {
205 return;
207 createSocketOrCheckNotClosed();
208 SetSocketOptionsRequest request = new SetSocketOptionsRequest();
209 SetSocketOptionsReply response = new SetSocketOptionsReply();
210 request.setSocketDescriptor(descriptor);
211 request.addOptions().setLevel(opt.getLevel()).setOption(name).setValueAsBytes(value);
213 getSocketApiHelper().makeSyncCall("SetSocketOptions", request, response, null);
216 @Override
217 public void setTimeout(int timeout) {
218 soTimeout = timeout;
222 * @see java.net.SocketOptions#getOption(int)
224 @Override
225 public Object getOption(int optID) throws SocketException {
226 if (SocketOptions.SO_BINDADDR == optID) {
227 if (localAddress == null) {
228 createSocketOrCheckNotClosed();
229 fixLocalAddress();
231 return localAddress;
233 if (SocketOptions.SO_TIMEOUT == optID) {
234 return soTimeout <= 0 ? 0 : soTimeout;
237 AppEngineSocketOptions.Option option = AppEngineSocketOptions.getOptionById(optID);
238 if (option == null) {
239 throw new SocketException("unrecognized socket option: " + optID);
242 try {
243 return getOption(option);
244 } catch (SocketException e) {
245 if (SocketOptions.SO_LINGER == optID) {
246 return -1;
248 throw e;
253 * Returns the value of the given option.
254 * @throws SocketException
256 private Object getOption(Option option) throws SocketException {
257 createSocketOrCheckNotClosed();
258 return option.getOption(this);
262 * Called by {@link Option#getOption(AppEngineSocketImpl)}.
263 * @see AppEngineSocketOptionsClient#getSocketOptionAsBytes(Option)
265 @Override
266 public byte[] getSocketOptionAsBytes(Option option) throws SocketException {
267 GetSocketOptionsRequest request = new GetSocketOptionsRequest();
268 GetSocketOptionsReply response = new GetSocketOptionsReply();
269 request.setSocketDescriptor(descriptor);
270 request.addOptions().setLevel(option.getLevel()).setOption(option.getOptName());
271 getSocketApiHelper().makeSyncCall("GetSocketOptions", request, response, null);
272 return response.optionss().get(0).getValueAsBytes();
276 * @see java.net.SocketImpl#create(boolean)
278 @Override
279 protected synchronized void create(boolean stream) throws IOException {
280 checkStateIsOneOf("Socket is already created", SocketState.UNINITIALIZED);
281 currentState = SocketState.CREATE_CALLED;
282 this.stream = stream;
286 * @see java.net.SocketImpl#connect(java.lang.String, int)
288 @Override
289 protected synchronized void connect(String host, int port) throws IOException {
290 try {
291 InetAddress address = InetAddress.getByName(host);
292 connectToAddress(address, port, null);
293 } catch (UnknownHostException e) {
294 releaseSocket();
295 throw e;
300 * @see java.net.SocketImpl#connect(java.net.InetAddress, int)
302 @Override
303 protected synchronized void connect(InetAddress address, int port) throws IOException {
304 if (address == null) {
305 throw new IllegalArgumentException("null address is illegal for connect");
308 checkStateIsOneOf("socket is not created", SocketState.CREATE_CALLED, SocketState.BOUND);
310 connectToAddress(address, port, null);
314 * @see java.net.SocketImpl#connect(java.net.SocketAddress, int)
316 @Override
317 protected synchronized void connect(SocketAddress socketAddress, int timeout) throws IOException {
318 if (socketAddress == null) {
319 throw new IllegalArgumentException("null address is illegal for connect");
322 if (!(socketAddress instanceof InetSocketAddress)) {
323 throw new IllegalArgumentException("Address must be of type InetSocketAddress");
326 InetSocketAddress addr = (InetSocketAddress) socketAddress;
328 if (addr.isUnresolved()) {
329 throw new UnknownHostException(addr.getHostName());
332 connectToAddress(addr.getAddress(), addr.getPort(), timeout == 0 ? null : timeout);
335 private void connectToAddress(InetAddress address, int port, Integer timeoutMillis)
336 throws SocketException, SocketTimeoutException {
337 switch (currentState) {
338 case CREATE_CALLED: {
339 if (timeoutMillis == null) {
340 createSocket(null, 0, address, port, true);
341 } else {
342 createSocket(null, 0, address, port, false);
343 connectSocket(address, port, timeoutMillis);
345 break;
347 case BOUND: {
348 connectSocket(address, port, timeoutMillis);
349 break;
351 default: {
352 break;
356 this.port = port;
357 this.address = address;
360 private void processConnectError(RemoteSocketServiceError serviceError)
361 throws SocketException, SocketTimeoutException {
362 releaseSocket();
363 if (SystemError.valueOf(serviceError.getSystemError()) == SystemError.SYS_EINPROGRESS) {
364 throw new SocketTimeoutException();
366 if (SystemError.valueOf(serviceError.getSystemError()) == SystemError.SYS_ETIMEDOUT) {
367 throw new SocketTimeoutException();
369 if (SystemError.valueOf(serviceError.getSystemError()) == SystemError.SYS_ECONNREFUSED) {
370 throw new ConnectException(serviceError.getErrorDetail());
372 throw SocketApiHelper.translateError(
373 ErrorCode.SYSTEM_ERROR.getValue(),
374 "errno: " + serviceError.getSystemError() + ", detail:" + serviceError.getErrorDetail());
378 * Performs connect given a timeout.
380 private void connectSocket(InetAddress remoteAddress, int remotePort, Integer timeoutMillis)
381 throws SocketException, SocketTimeoutException {
382 if (remoteAddress == null) {
383 throw new IllegalArgumentException("remoteAddress must not be null if connect requested");
386 ConnectRequest request = new ConnectRequest().setSocketDescriptor(descriptor);
387 ConnectReply response = new ConnectReply();
389 request.getMutableRemoteIp()
390 .setPort(remotePort)
391 .setPackedAddressAsBytes(AppEngineSocketUtils.addrAsIpv6Bytes(remoteAddress));
392 if (timeoutMillis != null) {
393 request.setTimeoutSeconds(0.001D * timeoutMillis);
396 RemoteSocketServiceError serviceError = new RemoteSocketServiceError();
397 if (!getSocketApiHelper().makeSyncCall("Connect", request, response, serviceError)) {
398 processConnectError(serviceError);
401 currentState = SocketState.CONNECTED;
402 fixLocalAddress();
406 * Releases the socket and absorbs any exceptions.
408 private void releaseSocket() {
409 readsShutdown = false;
410 writesShutdown = false;
411 try {
412 close();
413 } catch (IOException ignored) {
415 currentState = SocketState.CLOSED;
418 private void fixLocalAddress() throws SocketException {
419 GetSocketNameRequest request = new GetSocketNameRequest().setSocketDescriptor(descriptor);
420 GetSocketNameReply response = new GetSocketNameReply();
422 getSocketApiHelper().makeSyncCall("GetSocketName", request, response, null);
423 AddressPort externalIp = response.getProxyExternalIp();
424 this.localport = externalIp.getPort();
426 try {
427 localAddress = toInetAddress(externalIp);
428 } catch (UnknownHostException e) {
429 throw new SocketException(e.toString());
434 * Create the remote socket with an optional bind and connect. Either the
435 * remoteAddress or bindAddress or both parameters must be provided.
436 * It is used to determine if socket is to be created with an IPV4 or IPV6
437 * address family.
438 * @throws SocketException
439 * @throws SocketTimeoutException
441 private void createSocket(
442 InetAddress bindAddress, int bindPort, InetAddress remoteAddress,
443 int remotePort, boolean connect)
444 throws SocketException, SocketTimeoutException {
446 CreateSocketRequest request = new CreateSocketRequest();
447 CreateSocketReply response = new CreateSocketReply();
449 request.setFamily(SocketFamily.IPv6);
450 request.setProtocol(stream ? SocketProtocol.TCP : SocketProtocol.UDP);
452 if (bindAddress != null) {
453 request.getMutableProxyExternalIp()
454 .setPort(bindPort)
455 .setPackedAddressAsBytes(AppEngineSocketUtils.addrAsIpv6Bytes(bindAddress));
458 if (connect) {
459 if (remoteAddress == null) {
460 throw new IllegalArgumentException("remoteAddress must not be null if connect requested");
462 request.getMutableRemoteIp()
463 .setPort(remotePort)
464 .setPackedAddressAsBytes(AppEngineSocketUtils.addrAsIpv6Bytes(remoteAddress));
465 RemoteSocketServiceError serviceError = new RemoteSocketServiceError();
466 if (!getSocketApiHelper().makeSyncCall("CreateSocket", request, response, serviceError)) {
467 processConnectError(serviceError);
469 } else {
470 getSocketApiHelper().makeSyncCall("CreateSocket", request, response, null);
473 descriptor = response.getSocketDescriptor();
474 currentState = connect ? SocketState.CONNECTED : SocketState.BOUND;
476 if (connect) {
477 fixLocalAddress();
482 * @see java.net.SocketImpl#bind(java.net.InetAddress, int)
484 @Override
485 protected synchronized void bind(InetAddress host, int port) throws IOException {
486 createSocket(host, port, null, 0, false);
487 currentState = SocketState.BOUND;
491 * @see java.net.SocketImpl#listen(int)
493 @Override
494 protected void listen(int backlog) throws IOException {
495 checkStateIsNotOneOf("Socket must be in a bound state.", SocketState.UNINITIALIZED,
496 SocketState.CREATE_CALLED, SocketState.CONNECTED, SocketState.CLOSED);
497 ListenRequest request = new ListenRequest().setSocketDescriptor(descriptor).setBacklog(backlog);
498 getSocketApiHelper().makeSyncCall("Listen", request, new ListenReply(), null);
499 currentState = SocketState.LISTEN;
503 * @see java.net.SocketImpl#accept(java.net.SocketImpl)
505 @Override
506 protected void accept(SocketImpl s) throws IOException {
507 checkStateIsOneOf("Socket is not in passive (accepting) mode.", SocketState.LISTEN);
508 if (!(s instanceof AppEngineSocketImpl)) {
509 throw new IllegalStateException(
510 "Expected a SocketImpl compatable with '" + this.getClass().getName() +
511 "'. A '" + s.getClass().getName() + "' was received.");
514 AppEngineSocketImpl acceptingSocket = (AppEngineSocketImpl) s;
515 AcceptReply response = new AcceptReply();
516 AcceptRequest request = new AcceptRequest().setSocketDescriptor(descriptor);
517 getSocketApiHelper().makeSyncCall("Accept", request, response, null);
518 acceptingSocket.doAccept(response);
522 * Initializes an AppEngineSocketImpl from an AcceptReply message.
523 * @throws SocketException
525 private void doAccept(AcceptReply response) throws SocketException {
526 switch (currentState) {
527 case UNINITIALIZED:
528 case CREATE_CALLED:
529 case CLOSED: {
530 break;
532 default: {
533 releaseSocket();
537 stream = true;
538 descriptor = response.getNewSocketDescriptor();
539 AddressPort addressPort = response.getRemoteAddress();
540 try {
541 address = InetAddress.getByAddress(addressPort.getPackedAddressAsBytes());
542 } catch (UnknownHostException e) {
543 throw new SocketException(e.getMessage());
546 port = addressPort.getPort();
547 currentState = SocketState.CONNECTED;
551 * @see java.net.SocketImpl#getInputStream()
553 @Override
554 protected InputStream getInputStream() throws IOException {
555 if (currentState == SocketState.CLOSED) {
556 throw new IOException("Socket closed.");
559 if (readsShutdown) {
560 throw new IOException("Socket input is shutdown.");
563 if (socketInputStream == null) {
564 socketInputStream = new AppEngineSocketInputStream(this);
566 return socketInputStream;
570 * @see java.net.SocketImpl#getOutputStream()
572 @Override
573 protected OutputStream getOutputStream() throws IOException {
574 if (currentState == SocketState.CLOSED) {
575 throw new IOException("Socket closed.");
578 if (writesShutdown) {
579 throw new IOException("Socket output is shutdown.");
582 return new AppEngineSocketOutputStream(this);
586 * @see java.net.SocketImpl#available()
588 @Override
589 protected int available() throws IOException {
590 return 0;
594 * @see java.net.SocketImpl#close()
596 @Override
597 protected void close() throws IOException {
598 if (descriptor != null) {
599 try {
600 CloseRequest request = new CloseRequest().setSocketDescriptor(descriptor)
601 .setSendOffset(sendOffset.get());
602 getSocketApiHelper().makeSyncCall("Close", request, new CloseReply(), null);
603 } finally {
604 currentState = SocketState.CLOSED;
605 descriptor = null;
611 * @see java.net.SocketImpl#sendUrgentData(int)
613 @Override
614 protected void sendUrgentData(int data) throws IOException {
615 throw new IllegalStateException(
616 "AppEngineSocketImpl#sendUrgentData() function is unimplemented.");
620 * @see java.net.SocketImpl#shutdownInput
622 @Override
623 protected void shutdownInput() throws IOException {
624 if (currentState == SocketState.CONNECTED && !readsShutdown) {
625 ShutDownRequest request = new ShutDownRequest().setSocketDescriptor(descriptor)
626 .setSendOffset(sendOffset.get())
627 .setHow(ShutDownRequest.How.SOCKET_SHUT_RD);
628 getSocketApiHelper().makeSyncCall("ShutDown", request, new ShutDownReply(), null);
629 readsShutdown = true;
634 * @see java.net.SocketImpl#shutdownOutput
636 @Override
637 protected void shutdownOutput() throws IOException {
638 if (currentState == SocketState.CONNECTED && !writesShutdown) {
639 ShutDownRequest request = new ShutDownRequest().setSocketDescriptor(descriptor)
640 .setSendOffset(sendOffset.get())
641 .setHow(ShutDownRequest.How.SOCKET_SHUT_WR);
642 getSocketApiHelper().makeSyncCall("ShutDown", request, new ShutDownReply(), null);
643 writesShutdown = true;
648 * Used by {@link AppEngineSocketOutputStream} to send data.
650 * @param buf Buffer of bytes to write.
651 * @param off Offset into the buffer.
652 * @param len The length of bytes to write.
653 * @throws IOException On error conditions
655 protected void send(byte buf[], int off, int len) throws IOException {
656 checkStateIsNotOneOf("Socket is closed.",
657 SocketState.UNINITIALIZED, SocketState.CREATE_CALLED, SocketState.CLOSED);
658 if (writesShutdown) {
659 throw new IOException("Socket output is shutdown.");
662 byte copy[] = Arrays.copyOfRange(buf, off, off + len);
663 SendRequest request = new SendRequest().setSocketDescriptor(descriptor)
664 .setStreamOffset(sendOffset.getAndAdd(copy.length))
665 .setDataAsBytes(copy);
666 if (soTimeout > 0) {
667 request.setTimeoutSeconds(soTimeout * 0.001);
670 RemoteSocketServiceError serviceError = new RemoteSocketServiceError();
671 if (!getSocketApiHelper().makeSyncCall("Send", request, new SendReply(), serviceError)) {
672 if (serviceError.getSystemError() == SystemError.SYS_EAGAIN.getValue()) {
673 throw new SocketTimeoutException("Write timed out");
674 } else {
675 throw SocketApiHelper.translateError(
676 ErrorCode.SYSTEM_ERROR.getValue(),
677 "errno: " + serviceError.getSystemError() +
678 ", detail:" + serviceError.getErrorDetail());
684 * This is wrapped by {@link AppEngineSocketInputStream} and has the
685 * same semantics as {@link java.io.InputStream#read(byte[], int, int)}.
686 * @throws IOException On error conditions
688 protected int receive(byte[] buf, int off, int len) throws IOException {
689 checkStateIsNotOneOf("Socket is closed.",
690 SocketState.UNINITIALIZED, SocketState.CREATE_CALLED, SocketState.CLOSED);
692 if (readsShutdown) {
693 throw new IOException("Socket input is shutdown.");
696 if (len < 0 || off < 0 || buf.length - off < len) {
697 if (len == 0) {
698 return 0;
700 throw new ArrayIndexOutOfBoundsException();
703 ReceiveReply response = new ReceiveReply();
704 ReceiveRequest request = new ReceiveRequest().setSocketDescriptor(descriptor)
705 .setDataSize(len);
706 if (soTimeout > 0) {
707 request.setTimeoutSeconds(soTimeout * 0.001);
709 RemoteSocketServiceError serviceError = new RemoteSocketServiceError();
710 if (!getSocketApiHelper().makeSyncCall("Receive", request, response, serviceError)) {
711 if (serviceError.getSystemError() == SystemError.SYS_EAGAIN.getValue()) {
712 throw new SocketTimeoutException("Read timed out");
713 } else {
714 throw SocketApiHelper.translateError(
715 ErrorCode.SYSTEM_ERROR.getValue(),
716 "errno: " + serviceError.getSystemError() +
717 ", detail:" + serviceError.getErrorDetail());
721 byte readBytes[] = response.getDataAsBytes();
723 if (readBytes.length > 0) {
724 System.arraycopy(readBytes, 0, buf, off, readBytes.length);
725 } else {
726 return -1;
729 return readBytes.length;