Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / tools / development / ApiProxyLocalImpl.java
blob4be2777890795e704da92510d30886679a63ae3f
1 // Copyright 2008 Google Inc. All Rights Reserved.
3 package com.google.appengine.tools.development;
5 import com.google.appengine.api.capabilities.CapabilityStatus;
6 import com.google.appengine.tools.development.LocalRpcService.Status;
7 import com.google.apphosting.api.ApiProxy;
8 import com.google.apphosting.api.ApiProxy.CallNotFoundException;
9 import com.google.apphosting.api.ApiProxy.Environment;
10 import com.google.apphosting.api.ApiProxy.LogRecord;
11 import com.google.apphosting.api.ApiProxy.RequestTooLargeException;
12 import com.google.apphosting.api.ApiProxy.UnknownException;
13 import com.google.io.protocol.ProtocolMessage;
14 import com.google.protobuf.Message;
16 import java.lang.reflect.InvocationTargetException;
17 import java.lang.reflect.Method;
18 import java.security.AccessController;
19 import java.security.PrivilegedAction;
20 import java.util.Arrays;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.ServiceLoader;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.CancellationException;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.Semaphore;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36 import java.util.logging.Level;
37 import java.util.logging.Logger;
/**
 * Implements ApiProxy.Delegate such that the requests are dispatched to local service
 * implementations. Used for both the {@link com.google.appengine.tools.development.DevAppServer}
 * and for unit testing services.
 */
45 class ApiProxyLocalImpl implements ApiProxyLocal {
47 private static final Class<?> BYTE_ARRAY_CLASS = byte[].class;
49 /**
50 * The maximum size of any given API request.
52 private static final int MAX_API_REQUEST_SIZE = 1048576;
54 private static final String API_DEADLINE_KEY =
55 "com.google.apphosting.api.ApiProxy.api_deadline_key";
57 static final String IS_OFFLINE_REQUEST_KEY = "com.google.appengine.request.offline";
59 /**
60 * Implementation of the {@link LocalServiceContext} interface
62 private class LocalServiceContextImpl implements LocalServiceContext {
64 /**
65 * The local server environment
67 private final LocalServerEnvironment localServerEnvironment;
69 private final LocalCapabilitiesEnvironment localCapabilitiesEnvironment =
70 new LocalCapabilitiesEnvironment(System.getProperties());
72 /**
73 * Creates a new context, for the given application.
75 * @param localServerEnvironment The environment for the local server.
77 public LocalServiceContextImpl(LocalServerEnvironment localServerEnvironment) {
78 this.localServerEnvironment = localServerEnvironment;
81 @Override
82 public LocalServerEnvironment getLocalServerEnvironment() {
83 return localServerEnvironment;
86 @Override
87 public LocalCapabilitiesEnvironment getLocalCapabilitiesEnvironment() {
88 return localCapabilitiesEnvironment;
91 @Override
92 public Clock getClock() {
93 return clock;
96 @Override
97 public LocalRpcService getLocalService(String packageName) {
98 return ApiProxyLocalImpl.this.getService(packageName);
102 private static final Logger logger = Logger.getLogger(ApiProxyLocalImpl.class.getName());
104 private final Map<String, LocalRpcService> serviceCache =
105 new ConcurrentHashMap<String, LocalRpcService>();
107 private final Map<String, Method> methodCache = new ConcurrentHashMap<String, Method>();
108 final Map<Method, LatencySimulator> latencySimulatorCache =
109 new ConcurrentHashMap<Method, LatencySimulator>();
111 private final Map<String, String> properties = new HashMap<String, String>();
113 private final ExecutorService apiExecutor = Executors.newCachedThreadPool(
114 new DaemonThreadFactory(Executors.defaultThreadFactory()));
116 private final LocalServiceContext context;
118 private Clock clock = Clock.DEFAULT;
/**
 * Creates the local proxy in a given context.
 *
 * @param environment the local server environment; wrapped in the service context that is
 *     later passed to every started service.
 */
protected ApiProxyLocalImpl(LocalServerEnvironment environment) {
  context = new LocalServiceContextImpl(environment);
}
129 @Override
130 public void log(Environment environment, LogRecord record) {
131 logger.log(toJavaLevel(record.getLevel()), record.getMessage());
134 @Override
135 public void flushLogs(Environment environment) {
136 System.err.flush();
139 @Override
140 public byte[] makeSyncCall(ApiProxy.Environment environment, String packageName,
141 String methodName, byte[] requestBytes) {
142 ApiProxy.ApiConfig apiConfig = null;
143 Double deadline = (Double) environment.getAttributes().get(API_DEADLINE_KEY);
144 if (deadline != null) {
145 apiConfig = new ApiProxy.ApiConfig();
146 apiConfig.setDeadlineInSeconds(deadline);
149 Future<byte[]> future =
150 doAsyncCall(environment, packageName, methodName, requestBytes, apiConfig);
151 try {
152 return future.get();
153 } catch (InterruptedException ex) {
154 throw new ApiProxy.CancelledException(packageName, methodName);
155 } catch (CancellationException ex) {
156 throw new ApiProxy.CancelledException(packageName, methodName);
157 } catch (ExecutionException ex) {
158 if (ex.getCause() instanceof RuntimeException) {
159 throw (RuntimeException) ex.getCause();
160 } else if (ex.getCause() instanceof Error) {
161 throw (Error) ex.getCause();
162 } else {
163 throw new ApiProxy.UnknownException(packageName, methodName, ex.getCause());
168 @Override
169 public Future<byte[]> makeAsyncCall(Environment environment, String packageName,
170 String methodName, byte[] requestBytes, ApiProxy.ApiConfig apiConfig) {
171 return doAsyncCall(environment, packageName, methodName, requestBytes, apiConfig);
174 @Override
175 public List<Thread> getRequestThreads(Environment environment) {
176 return Arrays.asList(new Thread[]{Thread.currentThread()});
179 private Future<byte[]> doAsyncCall(Environment environment, final String packageName,
180 final String methodName, byte[] requestBytes, ApiProxy.ApiConfig apiConfig) {
181 Semaphore semaphore = (Semaphore) environment.getAttributes().get(
182 LocalEnvironment.API_CALL_SEMAPHORE);
183 if (semaphore != null) {
184 try {
185 semaphore.acquire();
186 } catch (InterruptedException ex) {
187 throw new RuntimeException("Interrupted while waiting on semaphore:", ex);
190 boolean offline = environment.getAttributes().get(IS_OFFLINE_REQUEST_KEY) != null;
191 AsyncApiCall asyncApiCall = new AsyncApiCall(
192 environment, packageName, methodName, requestBytes, semaphore);
193 boolean success = false;
194 try {
195 Callable<byte[]> callable = Executors.privilegedCallable(asyncApiCall);
197 Future<byte[]> resultFuture = AccessController.doPrivileged(
198 new PrivilegedApiAction(callable, asyncApiCall));
199 success = true;
200 if (context.getLocalServerEnvironment().enforceApiDeadlines()) {
201 long deadlineMillis = (long) (1000.0 * resolveDeadline(packageName, apiConfig, offline));
202 resultFuture = new TimedFuture<byte[]>(resultFuture, deadlineMillis, clock) {
203 @Override
204 protected RuntimeException createDeadlineException() {
205 throw new ApiProxy.ApiDeadlineExceededException(packageName, methodName);
209 return resultFuture;
210 } finally {
211 if (!success) {
212 asyncApiCall.tryReleaseSemaphore();
217 private double resolveDeadline(String packageName, ApiProxy.ApiConfig apiConfig,
218 boolean isOffline) {
219 LocalRpcService service = getService(packageName);
220 Double deadline = null;
221 if (apiConfig != null) {
222 deadline = apiConfig.getDeadlineInSeconds();
224 if (deadline == null && service != null) {
225 deadline = service.getDefaultDeadline(isOffline);
227 if (deadline == null) {
228 deadline = 5.0;
231 Double maxDeadline = null;
232 if (service != null) {
233 maxDeadline = service.getMaximumDeadline(isOffline);
235 if (maxDeadline == null) {
236 maxDeadline = 10.0;
238 return Math.min(deadline, maxDeadline);
241 private class PrivilegedApiAction implements PrivilegedAction<Future<byte[]>> {
243 private final Callable<byte[]> callable;
244 private final AsyncApiCall asyncApiCall;
246 PrivilegedApiAction(Callable<byte[]> callable, AsyncApiCall asyncApiCall) {
247 this.callable = callable;
248 this.asyncApiCall = asyncApiCall;
251 @Override
252 public Future<byte[]> run() {
253 final Future<byte[]> result = apiExecutor.submit(callable);
254 return new Future<byte[]>() {
255 @Override
256 public boolean cancel(final boolean mayInterruptIfRunning) {
257 return AccessController.doPrivileged(
258 new PrivilegedAction<Boolean>() {
259 @Override
260 public Boolean run() {
261 asyncApiCall.tryReleaseSemaphore();
262 return result.cancel(mayInterruptIfRunning);
267 @Override
268 public boolean isCancelled() {
269 return result.isCancelled();
272 @Override
273 public boolean isDone() {
274 return result.isDone();
277 @Override
278 public byte[] get() throws InterruptedException, ExecutionException {
279 return result.get();
282 @Override
283 public byte[] get(long timeout, TimeUnit unit)
284 throws InterruptedException, ExecutionException, TimeoutException {
285 return result.get(timeout, unit);
292 * Convert the specified byte array to a protocol buffer representation of the specified type.
293 * This type can either be a subclass of {@link ProtocolMessage} (a legacy protocol buffer
294 * implementation), or {@link Message} (the open-sourced protocol buffer implementation).
296 private <T> T convertBytesToPb(byte[] bytes, Class<T> requestClass)
297 throws IllegalAccessException, InstantiationException,
298 InvocationTargetException, NoSuchMethodException {
299 if (ProtocolMessage.class.isAssignableFrom(requestClass)) {
300 ProtocolMessage<?> proto = (ProtocolMessage<?>) requestClass.newInstance();
301 proto.mergeFrom(bytes);
302 return requestClass.cast(proto);
304 if (Message.class.isAssignableFrom(requestClass)) {
305 Method method = requestClass.getMethod("parseFrom", BYTE_ARRAY_CLASS);
306 return requestClass.cast(method.invoke(null, bytes));
308 throw new UnsupportedOperationException(String.format("Cannot assign %s to either %s or %s",
309 classDescription(requestClass), ProtocolMessage.class, Message.class));
313 * Convert the protocol buffer representation to a byte array. The object can either be an
314 * instance of {@link ProtocolMessage} (a legacy protocol buffer implementation), or {@link
315 * Message} (the open-sourced protocol buffer implementation).
317 private byte[] convertPbToBytes(Object object) {
318 if (object instanceof ProtocolMessage) {
319 return ((ProtocolMessage<?>) object).toByteArray();
321 if (object instanceof Message) {
322 return ((Message) object).toByteArray();
324 throw new UnsupportedOperationException(String.format("%s is neither %s nor %s",
325 classDescription(object.getClass()), ProtocolMessage.class, Message.class));
329 * Create a textual description of a class that is appropriate for
330 * troubleshooting problems with {@link #convertBytesToPb(byte[], Class)}
331 * or {@link #convertPbToBytes(Object)}.
333 * @param klass The class to create a description for.
334 * @return A string description.
336 private static String classDescription(Class<?> klass) {
337 return String.format("(%s extends %s loaded from %s)",
338 klass, klass.getSuperclass(),
339 klass.getProtectionDomain().getCodeSource().getLocation());
342 @Override
343 public void setProperty(String property, String value) {
344 properties.put(property, value);
348 * Resets the service properties to {@code properties}.
350 * @param properties a maybe {@code null} set of properties for local services.
352 @Override
353 public void setProperties(Map<String, String> properties) {
354 this.properties.clear();
355 if (properties != null) {
356 this.appendProperties(properties);
361 * Appends the given service properties to {@code properties}.
363 * @param properties a set of properties to append for local services.
365 @Override
366 public void appendProperties(Map<String, String> properties) {
367 this.properties.putAll(properties);
371 * Stops all services started by this ApiProxy and releases all of its resources.
373 @Override
374 public void stop() {
375 for (LocalRpcService service : serviceCache.values()) {
376 service.stop();
379 serviceCache.clear();
380 methodCache.clear();
381 latencySimulatorCache.clear();
385 int getMaxApiRequestSize(LocalRpcService rpcService) {
386 Integer size = rpcService.getMaxApiRequestSize();
387 if (size == null) {
388 return MAX_API_REQUEST_SIZE;
390 return size;
393 private Method getDispatchMethod(LocalRpcService service, String packageName, String methodName) {
394 String dispatchName = Character.toLowerCase(methodName.charAt(0)) + methodName.substring(1);
395 String methodId = packageName + "." + dispatchName;
396 Method method = methodCache.get(methodId);
397 if (method != null) {
398 return method;
400 for (Method candidate : service.getClass().getMethods()) {
401 if (dispatchName.equals(candidate.getName())) {
402 methodCache.put(methodId, candidate);
403 LatencyPercentiles latencyPercentiles = candidate.getAnnotation(LatencyPercentiles.class);
404 if (latencyPercentiles == null) {
406 latencyPercentiles = service.getClass().getAnnotation(LatencyPercentiles.class);
408 if (latencyPercentiles != null) {
409 latencySimulatorCache.put(candidate, new LatencySimulator(latencyPercentiles));
411 return candidate;
414 throw new CallNotFoundException(packageName, methodName);
417 private class AsyncApiCall implements Callable<byte[]> {
419 private final Environment environment;
420 private final String packageName;
421 private final String methodName;
422 private final byte[] requestBytes;
423 private final Semaphore semaphore;
424 private boolean released;
426 public AsyncApiCall(Environment environment, String packageName, String methodName,
427 byte[] requestBytes, Semaphore semaphore) {
428 this.environment = environment;
429 this.packageName = packageName;
430 this.methodName = methodName;
431 this.requestBytes = requestBytes;
432 this.semaphore = semaphore;
435 @Override
436 public byte[] call() {
437 try {
438 return callInternal();
439 } finally {
440 tryReleaseSemaphore();
444 private byte[] callInternal() {
445 ApiProxy.setEnvironmentForCurrentThread(environment);
446 try {
447 LocalRpcService service = getService(packageName);
448 if (service == null) {
449 throw new CallNotFoundException(packageName, methodName);
451 LocalCapabilitiesEnvironment capEnv = context.getLocalCapabilitiesEnvironment();
452 CapabilityStatus capabilityStatus = capEnv
453 .getStatusFromMethodName(packageName, methodName);
454 if (!CapabilityStatus.ENABLED.equals(capabilityStatus)) {
455 throw new ApiProxy.CapabilityDisabledException(
456 "Setup in local configuration.", packageName, methodName);
459 if (requestBytes.length > getMaxApiRequestSize(service)) {
460 throw new RequestTooLargeException(packageName, methodName);
463 Method method = getDispatchMethod(service, packageName, methodName);
464 Status status = new Status();
465 Class<?> requestClass = method.getParameterTypes()[1];
466 Object request = convertBytesToPb(requestBytes, requestClass);
468 long start = clock.getCurrentTime();
469 try {
470 return convertPbToBytes(method.invoke(service, status, request));
471 } finally {
472 LatencySimulator latencySimulator = latencySimulatorCache.get(method);
473 if (latencySimulator != null) {
474 if (context.getLocalServerEnvironment().simulateProductionLatencies()) {
475 latencySimulator.simulateLatency(clock.getCurrentTime() - start, service, request);
479 } catch (IllegalAccessException e) {
480 throw new UnknownException(packageName, methodName, e);
481 } catch (InstantiationException e) {
482 throw new UnknownException(packageName, methodName, e);
483 } catch (NoSuchMethodException e) {
484 throw new UnknownException(packageName, methodName, e);
485 } catch (InvocationTargetException e) {
486 if (e.getCause() instanceof RuntimeException) {
487 throw (RuntimeException) e.getCause();
489 throw new UnknownException(packageName, methodName, e.getCause());
490 } finally {
491 ApiProxy.clearEnvironmentForCurrentThread();
496 * Synchronized method that ensures the semaphore that was claimed for this API call only gets
497 * released once.
499 synchronized void tryReleaseSemaphore() {
500 if (!released && semaphore != null) {
501 semaphore.release();
502 released = true;
508 * Method needs to be synchronized to ensure that we don't end up starting multiple instances of
509 * the same service. As an example, we've seen a race condition where the local datastore service
510 * has not yet been initialized and two datastore requests come in at the exact same time. The
511 * first request looks in the service cache, doesn't find it, starts a new local datastore
512 * service, registers it in the service cache, and uses that local datastore service to handle the
513 * first request. Meanwhile the second request looks in the service cache, doesn't find it,
514 * starts a new local datastore service, registers it in the service cache (stomping on the
515 * original one), and uses that local datastore service to handle the second request. If both of
516 * these requests start txns we can end up with 2 requests receiving the same txn id, and that
517 * yields all sorts of exciting behavior. So, we synchronize this method to ensure that we only
518 * register a single instance of each service type.
520 @Override
521 public synchronized final LocalRpcService getService(final String pkg) {
522 LocalRpcService cachedService = serviceCache.get(pkg);
523 if (cachedService != null) {
524 return cachedService;
527 return AccessController.doPrivileged(
528 new PrivilegedAction<LocalRpcService>() {
529 @Override
530 public LocalRpcService run() {
531 return startServices(pkg);
536 private LocalRpcService startServices(String pkg) {
537 for (LocalRpcService service : ServiceLoader.load(LocalRpcService.class,
538 ApiProxyLocalImpl.class.getClassLoader())) {
539 if (service.getPackage().equals(pkg)) {
540 service.init(context, properties);
541 service.start();
542 serviceCache.put(pkg, service);
543 return service;
546 return null;
549 private static Level toJavaLevel(ApiProxy.LogRecord.Level apiProxyLevel) {
550 switch (apiProxyLevel) {
551 case debug:
552 return Level.FINE;
553 case info:
554 return Level.INFO;
555 case warn:
556 return Level.WARNING;
557 case error:
558 return Level.SEVERE;
559 case fatal:
560 return Level.SEVERE;
561 default:
562 return Level.WARNING;
566 @Override
567 public Clock getClock() {
568 return clock;
571 @Override
572 public void setClock(Clock clock) {
573 this.clock = clock;
576 private static class DaemonThreadFactory implements ThreadFactory {
578 private final ThreadFactory parent;
580 public DaemonThreadFactory(ThreadFactory parent) {
581 this.parent = parent;
584 @Override
585 public Thread newThread(Runnable r) {
586 Thread thread = parent.newThread(r);
587 thread.setDaemon(true);
588 return thread;