App Engine Java SDK version 1.7.0
[gae.git] / java / src / main / com / google / appengine / tools / development / ApiProxyLocalImpl.java
blob790af9d416c70026ee6b0d05e85546ed85051b3a
1 // Copyright 2008 Google Inc. All Rights Reserved.
3 package com.google.appengine.tools.development;
5 import com.google.appengine.api.capabilities.CapabilityStatus;
6 import com.google.appengine.tools.development.LocalRpcService.Status;
7 import com.google.apphosting.api.ApiProxy;
8 import com.google.apphosting.api.ApiProxy.CallNotFoundException;
9 import com.google.apphosting.api.ApiProxy.Environment;
10 import com.google.apphosting.api.ApiProxy.LogRecord;
11 import com.google.apphosting.api.ApiProxy.RequestTooLargeException;
12 import com.google.apphosting.api.ApiProxy.UnknownException;
13 import com.google.io.protocol.ProtocolMessage;
14 import com.google.protobuf.Message;
16 import java.lang.reflect.InvocationTargetException;
17 import java.lang.reflect.Method;
18 import java.security.AccessController;
19 import java.security.PrivilegedAction;
20 import java.util.Arrays;
21 import java.util.Formatter;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.CancellationException;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.Semaphore;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37 import java.util.logging.Level;
38 import java.util.logging.Logger;
/**
 * Implements ApiProxy.Delegate such that the requests are dispatched to local service
 * implementations. Used for both the {@link com.google.appengine.tools.development.DevAppServer}
 * and for unit testing services.
 */
46 class ApiProxyLocalImpl implements ApiProxyLocal {
48 private static final Class<?> BYTE_ARRAY_CLASS = byte[].class;
50 /**
51 * The maximum size of any given API request.
53 private static final int MAX_API_REQUEST_SIZE = 1048576;
55 private static final String API_DEADLINE_KEY =
56 "com.google.apphosting.api.ApiProxy.api_deadline_key";
58 static final String IS_OFFLINE_REQUEST_KEY = "com.google.appengine.request.offline";
60 /**
61 * Implementation of the {@link LocalServiceContext} interface
63 private class LocalServiceContextImpl implements LocalServiceContext {
65 /**
66 * The local server environment
68 private final LocalServerEnvironment localServerEnvironment;
70 private final LocalCapabilitiesEnvironment localCapabilitiesEnvironment =
71 new LocalCapabilitiesEnvironment(System.getProperties());
73 /**
74 * Creates a new context, for the given application.
76 * @param localServerEnvironment The environment for the local server.
78 public LocalServiceContextImpl(LocalServerEnvironment localServerEnvironment) {
79 this.localServerEnvironment = localServerEnvironment;
82 @Override
83 public LocalServerEnvironment getLocalServerEnvironment() {
84 return localServerEnvironment;
87 @Override
88 public LocalCapabilitiesEnvironment getLocalCapabilitiesEnvironment() {
89 return localCapabilitiesEnvironment;
92 @Override
93 public Clock getClock() {
94 return clock;
97 @Override
98 public LocalRpcService getLocalService(String packageName) {
99 return ApiProxyLocalImpl.this.getService(packageName);
103 private static final Logger logger = Logger.getLogger(ApiProxyLocalImpl.class.getName());
105 private final Map<String, LocalRpcService> serviceCache =
106 new ConcurrentHashMap<String, LocalRpcService>();
108 private final Map<String, Method> methodCache = new ConcurrentHashMap<String, Method>();
109 final Map<Method, LatencySimulator> latencySimulatorCache =
110 new ConcurrentHashMap<Method, LatencySimulator>();
112 private final Map<String, String> properties = new HashMap<String, String>();
114 private final ExecutorService apiExecutor = Executors.newCachedThreadPool(
115 new DaemonThreadFactory(Executors.defaultThreadFactory()));
117 private final LocalServiceContext context;
119 private Clock clock = Clock.DEFAULT;
/**
 * Creates the local proxy in a given context.
 *
 * @param environment the local server environment.
 */
protected ApiProxyLocalImpl(LocalServerEnvironment environment) {
  LocalServiceContext serviceContext = new LocalServiceContextImpl(environment);
  this.context = serviceContext;
}
130 @Override
131 public void log(Environment environment, LogRecord record) {
132 logger.log(toJavaLevel(record.getLevel()), record.getMessage());
135 @Override
136 public void flushLogs(Environment environment) {
137 System.err.flush();
140 @Override
141 public byte[] makeSyncCall(ApiProxy.Environment environment, String packageName,
142 String methodName, byte[] requestBytes) {
143 ApiProxy.ApiConfig apiConfig = null;
144 Double deadline = (Double) environment.getAttributes().get(API_DEADLINE_KEY);
145 if (deadline != null) {
146 apiConfig = new ApiProxy.ApiConfig();
147 apiConfig.setDeadlineInSeconds(deadline);
150 Future<byte[]> future =
151 doAsyncCall(environment, packageName, methodName, requestBytes, apiConfig);
152 try {
153 return future.get();
154 } catch (InterruptedException ex) {
155 throw new ApiProxy.CancelledException(packageName, methodName);
156 } catch (CancellationException ex) {
157 throw new ApiProxy.CancelledException(packageName, methodName);
158 } catch (ExecutionException ex) {
159 if (ex.getCause() instanceof RuntimeException) {
160 throw (RuntimeException) ex.getCause();
161 } else if (ex.getCause() instanceof Error) {
162 throw (Error) ex.getCause();
163 } else {
164 throw new ApiProxy.UnknownException(packageName, methodName, ex.getCause());
169 @Override
170 public Future<byte[]> makeAsyncCall(Environment environment, String packageName,
171 String methodName, byte[] requestBytes, ApiProxy.ApiConfig apiConfig) {
172 return doAsyncCall(environment, packageName, methodName, requestBytes, apiConfig);
175 @Override
176 public List<Thread> getRequestThreads(Environment environment) {
177 return Arrays.asList(new Thread[]{Thread.currentThread()});
180 private Future<byte[]> doAsyncCall(Environment environment, final String packageName,
181 final String methodName, byte[] requestBytes, ApiProxy.ApiConfig apiConfig) {
182 Semaphore semaphore = (Semaphore) environment.getAttributes().get(
183 LocalEnvironment.API_CALL_SEMAPHORE);
184 if (semaphore != null) {
185 try {
186 semaphore.acquire();
187 } catch (InterruptedException ex) {
188 throw new RuntimeException("Interrupted while waiting on semaphore:", ex);
191 boolean offline = environment.getAttributes().get(IS_OFFLINE_REQUEST_KEY) != null;
192 AsyncApiCall asyncApiCall = new AsyncApiCall(
193 environment, packageName, methodName, requestBytes, semaphore);
194 boolean success = false;
195 try {
196 Callable<byte[]> callable = Executors.privilegedCallable(asyncApiCall);
198 Future<byte[]> resultFuture = AccessController.doPrivileged(
199 new PrivilegedApiAction(callable, asyncApiCall));
200 success = true;
201 if (context.getLocalServerEnvironment().enforceApiDeadlines()) {
202 long deadlineMillis = (long) (1000.0 * resolveDeadline(packageName, apiConfig, offline));
203 resultFuture = new TimedFuture<byte[]>(resultFuture, deadlineMillis, clock) {
204 @Override
205 protected RuntimeException createDeadlineException() {
206 throw new ApiProxy.ApiDeadlineExceededException(packageName, methodName);
210 return resultFuture;
211 } finally {
212 if (!success) {
213 asyncApiCall.tryReleaseSemaphore();
218 private double resolveDeadline(String packageName, ApiProxy.ApiConfig apiConfig,
219 boolean isOffline) {
220 LocalRpcService service = getService(packageName);
221 Double deadline = null;
222 if (apiConfig != null) {
223 deadline = apiConfig.getDeadlineInSeconds();
225 if (deadline == null && service != null) {
226 deadline = service.getDefaultDeadline(isOffline);
228 if (deadline == null) {
229 deadline = 5.0;
232 Double maxDeadline = null;
233 if (service != null) {
234 maxDeadline = service.getMaximumDeadline(isOffline);
236 if (maxDeadline == null) {
237 maxDeadline = 10.0;
239 return Math.min(deadline, maxDeadline);
242 private class PrivilegedApiAction implements PrivilegedAction<Future<byte[]>> {
244 private final Callable<byte[]> callable;
245 private final AsyncApiCall asyncApiCall;
247 PrivilegedApiAction(Callable<byte[]> callable, AsyncApiCall asyncApiCall) {
248 this.callable = callable;
249 this.asyncApiCall = asyncApiCall;
252 @Override
253 public Future<byte[]> run() {
254 final Future<byte[]> result = apiExecutor.submit(callable);
255 return new Future<byte[]>() {
256 @Override
257 public boolean cancel(final boolean mayInterruptIfRunning) {
258 return AccessController.doPrivileged(
259 new PrivilegedAction<Boolean>() {
260 @Override
261 public Boolean run() {
262 asyncApiCall.tryReleaseSemaphore();
263 return result.cancel(mayInterruptIfRunning);
268 @Override
269 public boolean isCancelled() {
270 return result.isCancelled();
273 @Override
274 public boolean isDone() {
275 return result.isDone();
278 @Override
279 public byte[] get() throws InterruptedException, ExecutionException {
280 return result.get();
283 @Override
284 public byte[] get(long timeout, TimeUnit unit)
285 throws InterruptedException, ExecutionException, TimeoutException {
286 return result.get(timeout, unit);
293 * Convert the specified byte array to a protocol buffer representation of the specified type.
294 * This type can either be a subclass of {@link ProtocolMessage} (a legacy protocol buffer
295 * implementation), or {@link Message} (the open-sourced protocol buffer implementation).
297 private <T> T convertBytesToPb(byte[] bytes, Class<T> requestClass)
298 throws IllegalAccessException, InstantiationException,
299 InvocationTargetException, NoSuchMethodException {
300 if (ProtocolMessage.class.isAssignableFrom(requestClass)) {
301 ProtocolMessage<?> proto = (ProtocolMessage<?>) requestClass.newInstance();
302 proto.mergeFrom(bytes);
303 return requestClass.cast(proto);
305 if (Message.class.isAssignableFrom(requestClass)) {
306 Method method = requestClass.getMethod("parseFrom", BYTE_ARRAY_CLASS);
307 return requestClass.cast(method.invoke(null, bytes));
309 throw new UnsupportedOperationException(format("Cannot assign %s to either %s or %s",
310 classDescription(requestClass), ProtocolMessage.class, Message.class));
314 * Convert the protocol buffer representation to a byte array. The object can either be an
315 * instance of {@link ProtocolMessage} (a legacy protocol buffer implementation), or {@link
316 * Message} (the open-sourced protocol buffer implementation).
318 private byte[] convertPbToBytes(Object object) {
319 if (object instanceof ProtocolMessage) {
320 return ((ProtocolMessage<?>) object).toByteArray();
322 if (object instanceof Message) {
323 return ((Message) object).toByteArray();
325 throw new UnsupportedOperationException(format("%s is neither %s nor %s",
326 classDescription(object.getClass()), ProtocolMessage.class, Message.class));
330 * Create a textual description of a class that is appropriate for
331 * troubleshooting problems with {@link #convertBytesToPb(byte[], Class)}
332 * or {@link #convertPbToBytes(Object)}.
334 * @param klass The class to create a description for.
335 * @return A string description.
337 private static String classDescription(Class<?> klass) {
338 return format("(%s extends %s loaded from %s)",
339 klass, klass.getSuperclass(),
340 klass.getProtectionDomain().getCodeSource().getLocation());
344 * A helper utility that abstracts the ceremony of using a one time
345 * formatter. The arguments are passed unmodified to Formatter.format()
346 * and the return is the unmodified Formatter.toString();
348 private static String format(String format, Object ... args) {
349 return new Formatter().format(format, args).toString();
352 @Override
353 public void setProperty(String property, String value) {
354 properties.put(property, value);
358 * Resets the service properties to {@code properties}.
360 * @param properties a maybe {@code null} set of properties for local services.
362 @Override
363 public void setProperties(Map<String, String> properties) {
364 this.properties.clear();
365 if (properties != null) {
366 this.appendProperties(properties);
371 * Appends the given service properties to {@code properties}.
373 * @param properties a set of properties to append for local services.
375 @Override
376 public void appendProperties(Map<String, String> properties) {
377 this.properties.putAll(properties);
381 * Stops all services started by this ApiProxy and releases all of its resources.
383 @Override
384 public void stop() {
385 for (LocalRpcService service : serviceCache.values()) {
386 service.stop();
389 serviceCache.clear();
390 methodCache.clear();
391 latencySimulatorCache.clear();
395 int getMaxApiRequestSize(LocalRpcService rpcService) {
396 Integer size = rpcService.getMaxApiRequestSize();
397 if (size == null) {
398 return MAX_API_REQUEST_SIZE;
400 return size;
403 private Method getDispatchMethod(LocalRpcService service, String packageName, String methodName) {
404 String dispatchName = Character.toLowerCase(methodName.charAt(0)) + methodName.substring(1);
405 String methodId = packageName + "." + dispatchName;
406 Method method = methodCache.get(methodId);
407 if (method != null) {
408 return method;
410 for (Method candidate : service.getClass().getMethods()) {
411 if (dispatchName.equals(candidate.getName())) {
412 methodCache.put(methodId, candidate);
413 LatencyPercentiles latencyPercentiles = candidate.getAnnotation(LatencyPercentiles.class);
414 if (latencyPercentiles == null) {
416 latencyPercentiles = service.getClass().getAnnotation(LatencyPercentiles.class);
418 if (latencyPercentiles != null) {
419 latencySimulatorCache.put(candidate, new LatencySimulator(latencyPercentiles));
421 return candidate;
424 throw new CallNotFoundException(packageName, methodName);
427 private class AsyncApiCall implements Callable<byte[]> {
429 private final Environment environment;
430 private final String packageName;
431 private final String methodName;
432 private final byte[] requestBytes;
433 private final Semaphore semaphore;
434 private boolean released;
436 public AsyncApiCall(Environment environment, String packageName, String methodName,
437 byte[] requestBytes, Semaphore semaphore) {
438 this.environment = environment;
439 this.packageName = packageName;
440 this.methodName = methodName;
441 this.requestBytes = requestBytes;
442 this.semaphore = semaphore;
445 @Override
446 public byte[] call() {
447 try {
448 return callInternal();
449 } finally {
450 tryReleaseSemaphore();
454 private byte[] callInternal() {
455 ApiProxy.setEnvironmentForCurrentThread(environment);
456 try {
457 LocalRpcService service = getService(packageName);
458 if (service == null) {
459 throw new CallNotFoundException(packageName, methodName);
461 LocalCapabilitiesEnvironment capEnv = context.getLocalCapabilitiesEnvironment();
462 CapabilityStatus capabilityStatus = capEnv
463 .getStatusFromMethodName(packageName, methodName);
464 if (!CapabilityStatus.ENABLED.equals(capabilityStatus)) {
465 throw new ApiProxy.CapabilityDisabledException(
466 "Setup in local configuration.", packageName, methodName);
469 if (requestBytes.length > getMaxApiRequestSize(service)) {
470 throw new RequestTooLargeException(packageName, methodName);
473 Method method = getDispatchMethod(service, packageName, methodName);
474 Status status = new Status();
475 Class<?> requestClass = method.getParameterTypes()[1];
476 Object request = convertBytesToPb(requestBytes, requestClass);
478 long start = clock.getCurrentTime();
479 try {
480 return convertPbToBytes(method.invoke(service, status, request));
481 } finally {
482 LatencySimulator latencySimulator = latencySimulatorCache.get(method);
483 if (latencySimulator != null) {
484 if (context.getLocalServerEnvironment().simulateProductionLatencies()) {
485 latencySimulator.simulateLatency(clock.getCurrentTime() - start, service, request);
489 } catch (IllegalAccessException e) {
490 throw new UnknownException(packageName, methodName, e);
491 } catch (InstantiationException e) {
492 throw new UnknownException(packageName, methodName, e);
493 } catch (NoSuchMethodException e) {
494 throw new UnknownException(packageName, methodName, e);
495 } catch (InvocationTargetException e) {
496 if (e.getCause() instanceof RuntimeException) {
497 throw (RuntimeException) e.getCause();
499 throw new UnknownException(packageName, methodName, e.getCause());
500 } finally {
501 ApiProxy.clearEnvironmentForCurrentThread();
506 * Synchronized method that ensures the semaphore that was claimed for this API call only gets
507 * released once.
509 synchronized void tryReleaseSemaphore() {
510 if (!released && semaphore != null) {
511 semaphore.release();
512 released = true;
518 * Method needs to be synchronized to ensure that we don't end up starting multiple instances of
519 * the same service. As an example, we've seen a race condition where the local datastore service
520 * has not yet been initialized and two datastore requests come in at the exact same time. The
521 * first request looks in the service cache, doesn't find it, starts a new local datastore
522 * service, registers it in the service cache, and uses that local datastore service to handle the
523 * first request. Meanwhile the second request looks in the service cache, doesn't find it,
524 * starts a new local datastore service, registers it in the service cache (stomping on the
525 * original one), and uses that local datastore service to handle the second request. If both of
526 * these requests start txns we can end up with 2 requests receiving the same txn id, and that
527 * yields all sorts of exciting behavior. So, we synchronize this method to ensure that we only
528 * register a single instance of each service type.
530 @Override
531 public synchronized final LocalRpcService getService(final String pkg) {
532 LocalRpcService cachedService = serviceCache.get(pkg);
533 if (cachedService != null) {
534 return cachedService;
537 return AccessController.doPrivileged(
538 new PrivilegedAction<LocalRpcService>() {
539 @Override
540 public LocalRpcService run() {
541 return startServices(pkg);
546 private LocalRpcService startServices(String pkg) {
547 @SuppressWarnings({"unchecked", "sunapi"})
548 Iterator<LocalRpcService> services = sun.misc.Service.providers(LocalRpcService.class,
549 ApiProxyLocalImpl.class.getClassLoader());
550 while (services.hasNext()) {
551 LocalRpcService service = services.next();
552 if (service.getPackage().equals(pkg)) {
553 service.init(context, properties);
554 service.start();
555 serviceCache.put(pkg, service);
556 return service;
559 return null;
562 private static Level toJavaLevel(ApiProxy.LogRecord.Level apiProxyLevel) {
563 switch (apiProxyLevel) {
564 case debug:
565 return Level.FINE;
566 case info:
567 return Level.INFO;
568 case warn:
569 return Level.WARNING;
570 case error:
571 return Level.SEVERE;
572 case fatal:
573 return Level.SEVERE;
574 default:
575 return Level.WARNING;
579 @Override
580 public Clock getClock() {
581 return clock;
584 @Override
585 public void setClock(Clock clock) {
586 this.clock = clock;
589 private static class DaemonThreadFactory implements ThreadFactory {
591 private final ThreadFactory parent;
593 public DaemonThreadFactory(ThreadFactory parent) {
594 this.parent = parent;
597 @Override
598 public Thread newThread(Runnable r) {
599 Thread thread = parent.newThread(r);
600 thread.setDaemon(true);
601 return thread;