1.9.30 sync.
[gae.git] / java / src / main / com / google / appengine / tools / development / ApiProxyLocalImpl.java
blob798b8da7b7ab734b89a34a68f2b1886f35299b26
1 // Copyright 2008 Google Inc. All Rights Reserved.
3 package com.google.appengine.tools.development;
5 import com.google.appengine.api.capabilities.CapabilityStatus;
6 import com.google.appengine.tools.development.LocalRpcService.Status;
7 import com.google.apphosting.api.ApiProxy;
8 import com.google.apphosting.api.ApiProxy.CallNotFoundException;
9 import com.google.apphosting.api.ApiProxy.Environment;
10 import com.google.apphosting.api.ApiProxy.FeatureNotEnabledException;
11 import com.google.apphosting.api.ApiProxy.LogRecord;
12 import com.google.apphosting.api.ApiProxy.RequestTooLargeException;
13 import com.google.apphosting.api.ApiProxy.UnknownException;
14 import com.google.io.protocol.ProtocolMessage;
15 import com.google.protobuf.Message;
17 import java.lang.reflect.InvocationTargetException;
18 import java.lang.reflect.Method;
19 import java.security.AccessController;
20 import java.security.PrivilegedAction;
21 import java.util.Arrays;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.ServiceLoader;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.CancellationException;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.Semaphore;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37 import java.util.logging.Level;
38 import java.util.logging.Logger;
40 /**
41 * Implements ApiProxy.Delegate such that the requests are dispatched to local service
42 * implementations. Used for both the {@link com.google.appengine.tools.development.DevAppServer}
43 * and for unit testing services.
46 class ApiProxyLocalImpl implements ApiProxyLocal, DevServices {
48 private static final Class<?> BYTE_ARRAY_CLASS = byte[].class;
50 private static final String FILESAPI_DISABLED_MESSAGE =
51 "The Files API is disabled. Further information: "
52 + "https://cloud.google.com/appengine/docs/deprecations/files_api";
54 /**
55 * The maximum size of any given API request.
57 private static final int MAX_API_REQUEST_SIZE = 1048576;
59 private static final String API_DEADLINE_KEY =
60 "com.google.apphosting.api.ApiProxy.api_deadline_key";
62 static final String IS_OFFLINE_REQUEST_KEY = "com.google.appengine.request.offline";
64 /**
65 * Implementation of the {@link LocalServiceContext} interface
67 private class LocalServiceContextImpl implements LocalServiceContext {
69 /**
70 * The local server environment
72 private final LocalServerEnvironment localServerEnvironment;
74 private final LocalCapabilitiesEnvironment localCapabilitiesEnvironment =
75 new LocalCapabilitiesEnvironment(System.getProperties());
77 /**
78 * Creates a new context, for the given application.
80 * @param localServerEnvironment The environment for the local server.
82 public LocalServiceContextImpl(LocalServerEnvironment localServerEnvironment) {
83 this.localServerEnvironment = localServerEnvironment;
86 @Override
87 public LocalServerEnvironment getLocalServerEnvironment() {
88 return localServerEnvironment;
91 @Override
92 public LocalCapabilitiesEnvironment getLocalCapabilitiesEnvironment() {
93 return localCapabilitiesEnvironment;
96 @Override
97 public Clock getClock() {
98 return clock;
101 @Override
102 public LocalRpcService getLocalService(String packageName) {
103 return ApiProxyLocalImpl.this.getService(packageName);
107 private static final Logger logger = Logger.getLogger(ApiProxyLocalImpl.class.getName());
109 private final Map<String, LocalRpcService> serviceCache =
110 new ConcurrentHashMap<String, LocalRpcService>();
112 private final Map<String, Method> methodCache = new ConcurrentHashMap<String, Method>();
113 final Map<Method, LatencySimulator> latencySimulatorCache =
114 new ConcurrentHashMap<Method, LatencySimulator>();
116 private final Map<String, String> properties = new HashMap<String, String>();
118 private final ExecutorService apiExecutor = Executors.newCachedThreadPool(
119 new DaemonThreadFactory(Executors.defaultThreadFactory()));
121 private final LocalServiceContext context;
123 private Clock clock = Clock.DEFAULT;
126 * Creates the local proxy in a given context
128 * @param environment the local server environment.
130 protected ApiProxyLocalImpl(LocalServerEnvironment environment) {
131 this.context = new LocalServiceContextImpl(environment);
134 @Override
135 public void log(Environment environment, LogRecord record) {
136 logger.log(toJavaLevel(record.getLevel()), record.getMessage());
139 @Override
140 public void flushLogs(Environment environment) {
141 System.err.flush();
144 @Override
145 public byte[] makeSyncCall(ApiProxy.Environment environment, String packageName,
146 String methodName, byte[] requestBytes) {
147 ApiProxy.ApiConfig apiConfig = null;
148 Double deadline = (Double) environment.getAttributes().get(API_DEADLINE_KEY);
149 if (deadline != null) {
150 apiConfig = new ApiProxy.ApiConfig();
151 apiConfig.setDeadlineInSeconds(deadline);
154 Future<byte[]> future =
155 makeAsyncCall(environment, packageName, methodName, requestBytes, apiConfig);
156 try {
157 return future.get();
158 } catch (InterruptedException ex) {
159 throw new ApiProxy.CancelledException(packageName, methodName);
160 } catch (CancellationException ex) {
161 throw new ApiProxy.CancelledException(packageName, methodName);
162 } catch (ExecutionException ex) {
163 if (ex.getCause() instanceof RuntimeException) {
164 throw (RuntimeException) ex.getCause();
165 } else if (ex.getCause() instanceof Error) {
166 throw (Error) ex.getCause();
167 } else {
168 throw new ApiProxy.UnknownException(packageName, methodName, ex.getCause());
173 @Override
174 public Future<byte[]> makeAsyncCall(Environment environment, final String packageName,
175 final String methodName, byte[] requestBytes, ApiProxy.ApiConfig apiConfig) {
176 Semaphore semaphore = (Semaphore) environment.getAttributes().get(
177 LocalEnvironment.API_CALL_SEMAPHORE);
178 if (semaphore != null) {
179 try {
180 semaphore.acquire();
181 } catch (InterruptedException ex) {
182 throw new RuntimeException("Interrupted while waiting on semaphore:", ex);
185 boolean offline = environment.getAttributes().get(IS_OFFLINE_REQUEST_KEY) != null;
186 AsyncApiCall asyncApiCall = new AsyncApiCall(
187 environment, packageName, methodName, requestBytes, semaphore);
188 boolean success = false;
189 try {
190 Callable<byte[]> callable = Executors.privilegedCallable(asyncApiCall);
192 Future<byte[]> resultFuture = AccessController.doPrivileged(
193 new PrivilegedApiAction(callable, asyncApiCall));
194 success = true;
195 if (context.getLocalServerEnvironment().enforceApiDeadlines()) {
196 long deadlineMillis = (long) (1000.0 * resolveDeadline(packageName, apiConfig, offline));
197 resultFuture = new TimedFuture<byte[]>(resultFuture, deadlineMillis, clock) {
198 @Override
199 protected RuntimeException createDeadlineException() {
200 return new ApiProxy.ApiDeadlineExceededException(packageName, methodName);
204 return resultFuture;
205 } finally {
206 if (!success) {
207 asyncApiCall.tryReleaseSemaphore();
212 @Override
213 public List<Thread> getRequestThreads(Environment environment) {
214 return Arrays.asList(new Thread[]{Thread.currentThread()});
217 private double resolveDeadline(String packageName, ApiProxy.ApiConfig apiConfig,
218 boolean isOffline) {
219 LocalRpcService service = getService(packageName);
220 Double deadline = null;
221 if (apiConfig != null) {
222 deadline = apiConfig.getDeadlineInSeconds();
224 if (deadline == null && service != null) {
225 deadline = service.getDefaultDeadline(isOffline);
227 if (deadline == null) {
228 deadline = 5.0;
231 Double maxDeadline = null;
232 if (service != null) {
233 maxDeadline = service.getMaximumDeadline(isOffline);
235 if (maxDeadline == null) {
236 maxDeadline = 10.0;
238 return Math.min(deadline, maxDeadline);
241 private class PrivilegedApiAction implements PrivilegedAction<Future<byte[]>> {
243 private final Callable<byte[]> callable;
244 private final AsyncApiCall asyncApiCall;
246 PrivilegedApiAction(Callable<byte[]> callable, AsyncApiCall asyncApiCall) {
247 this.callable = callable;
248 this.asyncApiCall = asyncApiCall;
251 @Override
252 public Future<byte[]> run() {
253 final Future<byte[]> result = apiExecutor.submit(callable);
254 return new Future<byte[]>() {
255 @Override
256 public boolean cancel(final boolean mayInterruptIfRunning) {
257 return AccessController.doPrivileged(
258 new PrivilegedAction<Boolean>() {
259 @Override
260 public Boolean run() {
261 asyncApiCall.tryReleaseSemaphore();
262 return result.cancel(mayInterruptIfRunning);
267 @Override
268 public boolean isCancelled() {
269 return result.isCancelled();
272 @Override
273 public boolean isDone() {
274 return result.isDone();
277 @Override
278 public byte[] get() throws InterruptedException, ExecutionException {
279 return result.get();
282 @Override
283 public byte[] get(long timeout, TimeUnit unit)
284 throws InterruptedException, ExecutionException, TimeoutException {
285 return result.get(timeout, unit);
292 * Convert the specified byte array to a protocol buffer representation of the specified type.
293 * This type can either be a subclass of {@link ProtocolMessage} (a legacy protocol buffer
294 * implementation), or {@link Message} (the open-sourced protocol buffer implementation).
296 private <T> T convertBytesToPb(byte[] bytes, Class<T> requestClass)
297 throws IllegalAccessException, InstantiationException,
298 InvocationTargetException, NoSuchMethodException {
299 if (ProtocolMessage.class.isAssignableFrom(requestClass)) {
300 ProtocolMessage<?> proto = (ProtocolMessage<?>) requestClass.newInstance();
301 boolean parsed = proto.mergeFrom(bytes);
302 if (!parsed || !proto.isInitialized()) {
303 throw new RuntimeException(
304 "Could not parse request bytes into " + classDescription(requestClass));
306 return requestClass.cast(proto);
308 if (Message.class.isAssignableFrom(requestClass)) {
309 Method method = requestClass.getMethod("parseFrom", BYTE_ARRAY_CLASS);
310 return requestClass.cast(method.invoke(null, bytes));
312 throw new UnsupportedOperationException(String.format("Cannot assign %s to either %s or %s",
313 classDescription(requestClass), ProtocolMessage.class, Message.class));
317 * Convert the protocol buffer representation to a byte array. The object can either be an
318 * instance of {@link ProtocolMessage} (a legacy protocol buffer implementation), or {@link
319 * Message} (the open-sourced protocol buffer implementation).
321 private byte[] convertPbToBytes(Object object) {
322 if (object instanceof ProtocolMessage) {
323 return ((ProtocolMessage<?>) object).toByteArray();
325 if (object instanceof Message) {
326 return ((Message) object).toByteArray();
328 throw new UnsupportedOperationException(String.format("%s is neither %s nor %s",
329 classDescription(object.getClass()), ProtocolMessage.class, Message.class));
333 * Create a textual description of a class that is appropriate for
334 * troubleshooting problems with {@link #convertBytesToPb(byte[], Class)}
335 * or {@link #convertPbToBytes(Object)}.
337 * @param klass The class to create a description for.
338 * @return A string description.
340 private static String classDescription(Class<?> klass) {
341 return String.format("(%s extends %s loaded from %s)",
342 klass, klass.getSuperclass(),
343 klass.getProtectionDomain().getCodeSource().getLocation());
346 @Override
347 public void setProperty(String property, String value) {
348 properties.put(property, value);
352 * Resets the service properties to {@code properties}.
354 * @param properties a maybe {@code null} set of properties for local services.
356 @Override
357 public void setProperties(Map<String, String> properties) {
358 this.properties.clear();
359 if (properties != null) {
360 this.appendProperties(properties);
365 * Appends the given service properties to {@code properties}.
367 * @param properties a set of properties to append for local services.
369 @Override
370 public void appendProperties(Map<String, String> properties) {
371 this.properties.putAll(properties);
375 * Stops all services started by this ApiProxy and releases all of its resources.
377 @Override
378 public void stop() {
379 for (LocalRpcService service : serviceCache.values()) {
380 service.stop();
383 serviceCache.clear();
384 methodCache.clear();
385 latencySimulatorCache.clear();
389 int getMaxApiRequestSize(LocalRpcService rpcService) {
390 Integer size = rpcService.getMaxApiRequestSize();
391 if (size == null) {
392 return MAX_API_REQUEST_SIZE;
394 return size;
397 private Method getDispatchMethod(LocalRpcService service, String packageName, String methodName) {
398 String dispatchName = Character.toLowerCase(methodName.charAt(0)) + methodName.substring(1);
399 String methodId = packageName + "." + dispatchName;
400 Method method = methodCache.get(methodId);
401 if (method != null) {
402 return method;
404 for (Method candidate : service.getClass().getMethods()) {
405 if (dispatchName.equals(candidate.getName())) {
406 methodCache.put(methodId, candidate);
407 LatencyPercentiles latencyPercentiles = candidate.getAnnotation(LatencyPercentiles.class);
408 if (latencyPercentiles == null) {
410 latencyPercentiles = service.getClass().getAnnotation(LatencyPercentiles.class);
412 if (latencyPercentiles != null) {
413 latencySimulatorCache.put(candidate, new LatencySimulator(latencyPercentiles));
415 return candidate;
418 throw new CallNotFoundException(packageName, methodName);
421 private class AsyncApiCall implements Callable<byte[]> {
423 private final Environment environment;
424 private final String packageName;
425 private final String methodName;
426 private final byte[] requestBytes;
427 private final Semaphore semaphore;
428 private boolean released;
430 public AsyncApiCall(Environment environment, String packageName, String methodName,
431 byte[] requestBytes, Semaphore semaphore) {
432 this.environment = environment;
433 this.packageName = packageName;
434 this.methodName = methodName;
435 this.requestBytes = requestBytes;
436 this.semaphore = semaphore;
439 @Override
440 public byte[] call() {
441 try {
442 return callInternal();
443 } finally {
444 tryReleaseSemaphore();
448 private byte[] callInternal() {
449 ApiProxy.setEnvironmentForCurrentThread(environment);
450 try {
451 LocalRpcService service = getService(packageName);
452 if (service == null) {
453 throw new CallNotFoundException(packageName, methodName);
456 if ("file".equals(packageName)) {
457 if (Boolean.getBoolean("appengine.enableFilesApi")) {
458 environment.getAttributes().put(LocalEnvironment.FILESAPI_WAS_USED, true);
459 } else {
460 throw new FeatureNotEnabledException(
461 FILESAPI_DISABLED_MESSAGE, packageName, methodName);
465 LocalCapabilitiesEnvironment capEnv = context.getLocalCapabilitiesEnvironment();
466 CapabilityStatus capabilityStatus = capEnv
467 .getStatusFromMethodName(packageName, methodName);
468 if (!CapabilityStatus.ENABLED.equals(capabilityStatus)) {
469 throw new ApiProxy.CapabilityDisabledException(
470 "Setup in local configuration.", packageName, methodName);
473 if (requestBytes.length > getMaxApiRequestSize(service)) {
474 throw new RequestTooLargeException(packageName, methodName);
477 Method method = getDispatchMethod(service, packageName, methodName);
478 Status status = new Status();
479 Class<?> requestClass = method.getParameterTypes()[1];
480 Object request = convertBytesToPb(requestBytes, requestClass);
482 long start = clock.getCurrentTime();
483 try {
484 return convertPbToBytes(method.invoke(service, status, request));
485 } finally {
486 LatencySimulator latencySimulator = latencySimulatorCache.get(method);
487 if (latencySimulator != null) {
488 if (context.getLocalServerEnvironment().simulateProductionLatencies()) {
489 latencySimulator.simulateLatency(clock.getCurrentTime() - start, service, request);
493 } catch (IllegalAccessException e) {
494 throw new UnknownException(packageName, methodName, e);
495 } catch (InstantiationException e) {
496 throw new UnknownException(packageName, methodName, e);
497 } catch (NoSuchMethodException e) {
498 throw new UnknownException(packageName, methodName, e);
499 } catch (InvocationTargetException e) {
500 if (e.getCause() instanceof RuntimeException) {
501 throw (RuntimeException) e.getCause();
503 throw new UnknownException(packageName, methodName, e.getCause());
504 } finally {
505 ApiProxy.clearEnvironmentForCurrentThread();
510 * Synchronized method that ensures the semaphore that was claimed for this API call only gets
511 * released once.
513 synchronized void tryReleaseSemaphore() {
514 if (!released && semaphore != null) {
515 semaphore.release();
516 released = true;
522 * Method needs to be synchronized to ensure that we don't end up starting multiple instances of
523 * the same service. As an example, we've seen a race condition where the local datastore service
524 * has not yet been initialized and two datastore requests come in at the exact same time. The
525 * first request looks in the service cache, doesn't find it, starts a new local datastore
526 * service, registers it in the service cache, and uses that local datastore service to handle the
527 * first request. Meanwhile the second request looks in the service cache, doesn't find it,
528 * starts a new local datastore service, registers it in the service cache (stomping on the
529 * original one), and uses that local datastore service to handle the second request. If both of
530 * these requests start txns we can end up with 2 requests receiving the same txn id, and that
531 * yields all sorts of exciting behavior. So, we synchronize this method to ensure that we only
532 * register a single instance of each service type.
534 @Override
535 public final synchronized LocalRpcService getService(final String pkg) {
536 LocalRpcService cachedService = serviceCache.get(pkg);
537 if (cachedService != null) {
538 return cachedService;
541 return AccessController.doPrivileged(
542 new PrivilegedAction<LocalRpcService>() {
543 @Override
544 public LocalRpcService run() {
545 return startServices(pkg);
550 @Override
551 public DevLogService getLogService() {
552 return (DevLogService) getService(DevLogService.PACKAGE);
555 private LocalRpcService startServices(String pkg) {
556 for (LocalRpcService service : ServiceLoader.load(LocalRpcService.class,
557 ApiProxyLocalImpl.class.getClassLoader())) {
558 if (service.getPackage().equals(pkg)) {
559 service.init(context, properties);
560 service.start();
561 serviceCache.put(pkg, service);
562 return service;
565 return null;
568 private static Level toJavaLevel(ApiProxy.LogRecord.Level apiProxyLevel) {
569 switch (apiProxyLevel) {
570 case debug:
571 return Level.FINE;
572 case info:
573 return Level.INFO;
574 case warn:
575 return Level.WARNING;
576 case error:
577 return Level.SEVERE;
578 case fatal:
579 return Level.SEVERE;
580 default:
581 return Level.WARNING;
585 @Override
586 public Clock getClock() {
587 return clock;
590 @Override
591 public void setClock(Clock clock) {
592 this.clock = clock;
595 private static class DaemonThreadFactory implements ThreadFactory {
597 private final ThreadFactory parent;
599 public DaemonThreadFactory(ThreadFactory parent) {
600 this.parent = parent;
603 @Override
604 public Thread newThread(Runnable r) {
605 Thread thread = parent.newThread(r);
606 thread.setDaemon(true);
607 return thread;