Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / tools / development / ApiProxyLocalImpl.java
blob668a1bb2b5712695b7ba13fbec787f354df4c4f5
1 // Copyright 2008 Google Inc. All Rights Reserved.
3 package com.google.appengine.tools.development;
5 import com.google.appengine.api.capabilities.CapabilityStatus;
6 import com.google.appengine.tools.development.LocalRpcService.Status;
7 import com.google.apphosting.api.ApiProxy;
8 import com.google.apphosting.api.ApiProxy.CallNotFoundException;
9 import com.google.apphosting.api.ApiProxy.Environment;
10 import com.google.apphosting.api.ApiProxy.LogRecord;
11 import com.google.apphosting.api.ApiProxy.RequestTooLargeException;
12 import com.google.apphosting.api.ApiProxy.UnknownException;
13 import com.google.io.protocol.ProtocolMessage;
14 import com.google.protobuf.Message;
16 import java.lang.reflect.InvocationTargetException;
17 import java.lang.reflect.Method;
18 import java.security.AccessController;
19 import java.security.PrivilegedAction;
20 import java.util.Arrays;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.ServiceLoader;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.CancellationException;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.Semaphore;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36 import java.util.logging.Level;
37 import java.util.logging.Logger;
39 /**
40 * Implements ApiProxy.Delegate such that the requests are dispatched to local service
41 * implementations. Used for both the {@link com.google.appengine.tools.development.DevAppServer}
42 * and for unit testing services.
45 class ApiProxyLocalImpl implements ApiProxyLocal, DevServices {
47 private static final Class<?> BYTE_ARRAY_CLASS = byte[].class;
49 /**
50 * The maximum size of any given API request.
52 private static final int MAX_API_REQUEST_SIZE = 1048576;
54 private static final String API_DEADLINE_KEY =
55 "com.google.apphosting.api.ApiProxy.api_deadline_key";
57 static final String IS_OFFLINE_REQUEST_KEY = "com.google.appengine.request.offline";
59 /**
60 * Implementation of the {@link LocalServiceContext} interface
62 private class LocalServiceContextImpl implements LocalServiceContext {
64 /**
65 * The local server environment
67 private final LocalServerEnvironment localServerEnvironment;
69 private final LocalCapabilitiesEnvironment localCapabilitiesEnvironment =
70 new LocalCapabilitiesEnvironment(System.getProperties());
72 /**
73 * Creates a new context, for the given application.
75 * @param localServerEnvironment The environment for the local server.
77 public LocalServiceContextImpl(LocalServerEnvironment localServerEnvironment) {
78 this.localServerEnvironment = localServerEnvironment;
81 @Override
82 public LocalServerEnvironment getLocalServerEnvironment() {
83 return localServerEnvironment;
86 @Override
87 public LocalCapabilitiesEnvironment getLocalCapabilitiesEnvironment() {
88 return localCapabilitiesEnvironment;
91 @Override
92 public Clock getClock() {
93 return clock;
96 @Override
97 public LocalRpcService getLocalService(String packageName) {
98 return ApiProxyLocalImpl.this.getService(packageName);
102 private static final Logger logger = Logger.getLogger(ApiProxyLocalImpl.class.getName());
104 private final Map<String, LocalRpcService> serviceCache =
105 new ConcurrentHashMap<String, LocalRpcService>();
107 private final Map<String, Method> methodCache = new ConcurrentHashMap<String, Method>();
108 final Map<Method, LatencySimulator> latencySimulatorCache =
109 new ConcurrentHashMap<Method, LatencySimulator>();
111 private final Map<String, String> properties = new HashMap<String, String>();
113 private final ExecutorService apiExecutor = Executors.newCachedThreadPool(
114 new DaemonThreadFactory(Executors.defaultThreadFactory()));
116 private final LocalServiceContext context;
118 private Clock clock = Clock.DEFAULT;
121 * Creates the local proxy in a given context
123 * @param environment the local server environment.
125 protected ApiProxyLocalImpl(LocalServerEnvironment environment) {
126 this.context = new LocalServiceContextImpl(environment);
129 @Override
130 public void log(Environment environment, LogRecord record) {
131 logger.log(toJavaLevel(record.getLevel()), record.getMessage());
134 @Override
135 public void flushLogs(Environment environment) {
136 System.err.flush();
139 @Override
140 public byte[] makeSyncCall(ApiProxy.Environment environment, String packageName,
141 String methodName, byte[] requestBytes) {
142 ApiProxy.ApiConfig apiConfig = null;
143 Double deadline = (Double) environment.getAttributes().get(API_DEADLINE_KEY);
144 if (deadline != null) {
145 apiConfig = new ApiProxy.ApiConfig();
146 apiConfig.setDeadlineInSeconds(deadline);
149 Future<byte[]> future =
150 makeAsyncCall(environment, packageName, methodName, requestBytes, apiConfig);
151 try {
152 return future.get();
153 } catch (InterruptedException ex) {
154 throw new ApiProxy.CancelledException(packageName, methodName);
155 } catch (CancellationException ex) {
156 throw new ApiProxy.CancelledException(packageName, methodName);
157 } catch (ExecutionException ex) {
158 if (ex.getCause() instanceof RuntimeException) {
159 throw (RuntimeException) ex.getCause();
160 } else if (ex.getCause() instanceof Error) {
161 throw (Error) ex.getCause();
162 } else {
163 throw new ApiProxy.UnknownException(packageName, methodName, ex.getCause());
168 @Override
169 public Future<byte[]> makeAsyncCall(Environment environment, final String packageName,
170 final String methodName, byte[] requestBytes, ApiProxy.ApiConfig apiConfig) {
171 Semaphore semaphore = (Semaphore) environment.getAttributes().get(
172 LocalEnvironment.API_CALL_SEMAPHORE);
173 if (semaphore != null) {
174 try {
175 semaphore.acquire();
176 } catch (InterruptedException ex) {
177 throw new RuntimeException("Interrupted while waiting on semaphore:", ex);
180 boolean offline = environment.getAttributes().get(IS_OFFLINE_REQUEST_KEY) != null;
181 AsyncApiCall asyncApiCall = new AsyncApiCall(
182 environment, packageName, methodName, requestBytes, semaphore);
183 boolean success = false;
184 try {
185 Callable<byte[]> callable = Executors.privilegedCallable(asyncApiCall);
187 Future<byte[]> resultFuture = AccessController.doPrivileged(
188 new PrivilegedApiAction(callable, asyncApiCall));
189 success = true;
190 if (context.getLocalServerEnvironment().enforceApiDeadlines()) {
191 long deadlineMillis = (long) (1000.0 * resolveDeadline(packageName, apiConfig, offline));
192 resultFuture = new TimedFuture<byte[]>(resultFuture, deadlineMillis, clock) {
193 @Override
194 protected RuntimeException createDeadlineException() {
195 return new ApiProxy.ApiDeadlineExceededException(packageName, methodName);
199 return resultFuture;
200 } finally {
201 if (!success) {
202 asyncApiCall.tryReleaseSemaphore();
207 @Override
208 public List<Thread> getRequestThreads(Environment environment) {
209 return Arrays.asList(new Thread[]{Thread.currentThread()});
212 private double resolveDeadline(String packageName, ApiProxy.ApiConfig apiConfig,
213 boolean isOffline) {
214 LocalRpcService service = getService(packageName);
215 Double deadline = null;
216 if (apiConfig != null) {
217 deadline = apiConfig.getDeadlineInSeconds();
219 if (deadline == null && service != null) {
220 deadline = service.getDefaultDeadline(isOffline);
222 if (deadline == null) {
223 deadline = 5.0;
226 Double maxDeadline = null;
227 if (service != null) {
228 maxDeadline = service.getMaximumDeadline(isOffline);
230 if (maxDeadline == null) {
231 maxDeadline = 10.0;
233 return Math.min(deadline, maxDeadline);
236 private class PrivilegedApiAction implements PrivilegedAction<Future<byte[]>> {
238 private final Callable<byte[]> callable;
239 private final AsyncApiCall asyncApiCall;
241 PrivilegedApiAction(Callable<byte[]> callable, AsyncApiCall asyncApiCall) {
242 this.callable = callable;
243 this.asyncApiCall = asyncApiCall;
246 @Override
247 public Future<byte[]> run() {
248 final Future<byte[]> result = apiExecutor.submit(callable);
249 return new Future<byte[]>() {
250 @Override
251 public boolean cancel(final boolean mayInterruptIfRunning) {
252 return AccessController.doPrivileged(
253 new PrivilegedAction<Boolean>() {
254 @Override
255 public Boolean run() {
256 asyncApiCall.tryReleaseSemaphore();
257 return result.cancel(mayInterruptIfRunning);
262 @Override
263 public boolean isCancelled() {
264 return result.isCancelled();
267 @Override
268 public boolean isDone() {
269 return result.isDone();
272 @Override
273 public byte[] get() throws InterruptedException, ExecutionException {
274 return result.get();
277 @Override
278 public byte[] get(long timeout, TimeUnit unit)
279 throws InterruptedException, ExecutionException, TimeoutException {
280 return result.get(timeout, unit);
287 * Convert the specified byte array to a protocol buffer representation of the specified type.
288 * This type can either be a subclass of {@link ProtocolMessage} (a legacy protocol buffer
289 * implementation), or {@link Message} (the open-sourced protocol buffer implementation).
291 private <T> T convertBytesToPb(byte[] bytes, Class<T> requestClass)
292 throws IllegalAccessException, InstantiationException,
293 InvocationTargetException, NoSuchMethodException {
294 if (ProtocolMessage.class.isAssignableFrom(requestClass)) {
295 ProtocolMessage<?> proto = (ProtocolMessage<?>) requestClass.newInstance();
296 proto.mergeFrom(bytes);
297 return requestClass.cast(proto);
299 if (Message.class.isAssignableFrom(requestClass)) {
300 Method method = requestClass.getMethod("parseFrom", BYTE_ARRAY_CLASS);
301 return requestClass.cast(method.invoke(null, bytes));
303 throw new UnsupportedOperationException(String.format("Cannot assign %s to either %s or %s",
304 classDescription(requestClass), ProtocolMessage.class, Message.class));
308 * Convert the protocol buffer representation to a byte array. The object can either be an
309 * instance of {@link ProtocolMessage} (a legacy protocol buffer implementation), or {@link
310 * Message} (the open-sourced protocol buffer implementation).
312 private byte[] convertPbToBytes(Object object) {
313 if (object instanceof ProtocolMessage) {
314 return ((ProtocolMessage<?>) object).toByteArray();
316 if (object instanceof Message) {
317 return ((Message) object).toByteArray();
319 throw new UnsupportedOperationException(String.format("%s is neither %s nor %s",
320 classDescription(object.getClass()), ProtocolMessage.class, Message.class));
324 * Create a textual description of a class that is appropriate for
325 * troubleshooting problems with {@link #convertBytesToPb(byte[], Class)}
326 * or {@link #convertPbToBytes(Object)}.
328 * @param klass The class to create a description for.
329 * @return A string description.
331 private static String classDescription(Class<?> klass) {
332 return String.format("(%s extends %s loaded from %s)",
333 klass, klass.getSuperclass(),
334 klass.getProtectionDomain().getCodeSource().getLocation());
337 @Override
338 public void setProperty(String property, String value) {
339 properties.put(property, value);
343 * Resets the service properties to {@code properties}.
345 * @param properties a maybe {@code null} set of properties for local services.
347 @Override
348 public void setProperties(Map<String, String> properties) {
349 this.properties.clear();
350 if (properties != null) {
351 this.appendProperties(properties);
356 * Appends the given service properties to {@code properties}.
358 * @param properties a set of properties to append for local services.
360 @Override
361 public void appendProperties(Map<String, String> properties) {
362 this.properties.putAll(properties);
366 * Stops all services started by this ApiProxy and releases all of its resources.
368 @Override
369 public void stop() {
370 for (LocalRpcService service : serviceCache.values()) {
371 service.stop();
374 serviceCache.clear();
375 methodCache.clear();
376 latencySimulatorCache.clear();
380 int getMaxApiRequestSize(LocalRpcService rpcService) {
381 Integer size = rpcService.getMaxApiRequestSize();
382 if (size == null) {
383 return MAX_API_REQUEST_SIZE;
385 return size;
388 private Method getDispatchMethod(LocalRpcService service, String packageName, String methodName) {
389 String dispatchName = Character.toLowerCase(methodName.charAt(0)) + methodName.substring(1);
390 String methodId = packageName + "." + dispatchName;
391 Method method = methodCache.get(methodId);
392 if (method != null) {
393 return method;
395 for (Method candidate : service.getClass().getMethods()) {
396 if (dispatchName.equals(candidate.getName())) {
397 methodCache.put(methodId, candidate);
398 LatencyPercentiles latencyPercentiles = candidate.getAnnotation(LatencyPercentiles.class);
399 if (latencyPercentiles == null) {
401 latencyPercentiles = service.getClass().getAnnotation(LatencyPercentiles.class);
403 if (latencyPercentiles != null) {
404 latencySimulatorCache.put(candidate, new LatencySimulator(latencyPercentiles));
406 return candidate;
409 throw new CallNotFoundException(packageName, methodName);
412 private class AsyncApiCall implements Callable<byte[]> {
414 private final Environment environment;
415 private final String packageName;
416 private final String methodName;
417 private final byte[] requestBytes;
418 private final Semaphore semaphore;
419 private boolean released;
421 public AsyncApiCall(Environment environment, String packageName, String methodName,
422 byte[] requestBytes, Semaphore semaphore) {
423 this.environment = environment;
424 this.packageName = packageName;
425 this.methodName = methodName;
426 this.requestBytes = requestBytes;
427 this.semaphore = semaphore;
430 @Override
431 public byte[] call() {
432 try {
433 return callInternal();
434 } finally {
435 tryReleaseSemaphore();
439 private byte[] callInternal() {
440 ApiProxy.setEnvironmentForCurrentThread(environment);
441 try {
442 LocalRpcService service = getService(packageName);
443 if (service == null) {
444 throw new CallNotFoundException(packageName, methodName);
446 LocalCapabilitiesEnvironment capEnv = context.getLocalCapabilitiesEnvironment();
447 CapabilityStatus capabilityStatus = capEnv
448 .getStatusFromMethodName(packageName, methodName);
449 if (!CapabilityStatus.ENABLED.equals(capabilityStatus)) {
450 throw new ApiProxy.CapabilityDisabledException(
451 "Setup in local configuration.", packageName, methodName);
454 if (requestBytes.length > getMaxApiRequestSize(service)) {
455 throw new RequestTooLargeException(packageName, methodName);
458 Method method = getDispatchMethod(service, packageName, methodName);
459 Status status = new Status();
460 Class<?> requestClass = method.getParameterTypes()[1];
461 Object request = convertBytesToPb(requestBytes, requestClass);
463 long start = clock.getCurrentTime();
464 try {
465 return convertPbToBytes(method.invoke(service, status, request));
466 } finally {
467 LatencySimulator latencySimulator = latencySimulatorCache.get(method);
468 if (latencySimulator != null) {
469 if (context.getLocalServerEnvironment().simulateProductionLatencies()) {
470 latencySimulator.simulateLatency(clock.getCurrentTime() - start, service, request);
474 } catch (IllegalAccessException e) {
475 throw new UnknownException(packageName, methodName, e);
476 } catch (InstantiationException e) {
477 throw new UnknownException(packageName, methodName, e);
478 } catch (NoSuchMethodException e) {
479 throw new UnknownException(packageName, methodName, e);
480 } catch (InvocationTargetException e) {
481 if (e.getCause() instanceof RuntimeException) {
482 throw (RuntimeException) e.getCause();
484 throw new UnknownException(packageName, methodName, e.getCause());
485 } finally {
486 ApiProxy.clearEnvironmentForCurrentThread();
491 * Synchronized method that ensures the semaphore that was claimed for this API call only gets
492 * released once.
494 synchronized void tryReleaseSemaphore() {
495 if (!released && semaphore != null) {
496 semaphore.release();
497 released = true;
503 * Method needs to be synchronized to ensure that we don't end up starting multiple instances of
504 * the same service. As an example, we've seen a race condition where the local datastore service
505 * has not yet been initialized and two datastore requests come in at the exact same time. The
506 * first request looks in the service cache, doesn't find it, starts a new local datastore
507 * service, registers it in the service cache, and uses that local datastore service to handle the
508 * first request. Meanwhile the second request looks in the service cache, doesn't find it,
509 * starts a new local datastore service, registers it in the service cache (stomping on the
510 * original one), and uses that local datastore service to handle the second request. If both of
511 * these requests start txns we can end up with 2 requests receiving the same txn id, and that
512 * yields all sorts of exciting behavior. So, we synchronize this method to ensure that we only
513 * register a single instance of each service type.
515 @Override
516 public final synchronized LocalRpcService getService(final String pkg) {
517 LocalRpcService cachedService = serviceCache.get(pkg);
518 if (cachedService != null) {
519 return cachedService;
522 return AccessController.doPrivileged(
523 new PrivilegedAction<LocalRpcService>() {
524 @Override
525 public LocalRpcService run() {
526 return startServices(pkg);
531 @Override
532 public DevLogService getLogService() {
533 return (DevLogService) getService(DevLogService.PACKAGE);
536 private LocalRpcService startServices(String pkg) {
537 for (LocalRpcService service : ServiceLoader.load(LocalRpcService.class,
538 ApiProxyLocalImpl.class.getClassLoader())) {
539 if (service.getPackage().equals(pkg)) {
540 service.init(context, properties);
541 service.start();
542 serviceCache.put(pkg, service);
543 return service;
546 return null;
549 private static Level toJavaLevel(ApiProxy.LogRecord.Level apiProxyLevel) {
550 switch (apiProxyLevel) {
551 case debug:
552 return Level.FINE;
553 case info:
554 return Level.INFO;
555 case warn:
556 return Level.WARNING;
557 case error:
558 return Level.SEVERE;
559 case fatal:
560 return Level.SEVERE;
561 default:
562 return Level.WARNING;
566 @Override
567 public Clock getClock() {
568 return clock;
571 @Override
572 public void setClock(Clock clock) {
573 this.clock = clock;
576 private static class DaemonThreadFactory implements ThreadFactory {
578 private final ThreadFactory parent;
580 public DaemonThreadFactory(ThreadFactory parent) {
581 this.parent = parent;
584 @Override
585 public Thread newThread(Runnable r) {
586 Thread thread = parent.newThread(r);
587 thread.setDaemon(true);
588 return thread;