1 // Copyright 2008 Google Inc. All Rights Reserved.
3 package com
.google
.appengine
.tools
.development
;
5 import com
.google
.appengine
.api
.capabilities
.CapabilityStatus
;
6 import com
.google
.appengine
.tools
.development
.LocalRpcService
.Status
;
7 import com
.google
.apphosting
.api
.ApiProxy
;
8 import com
.google
.apphosting
.api
.ApiProxy
.CallNotFoundException
;
9 import com
.google
.apphosting
.api
.ApiProxy
.Environment
;
10 import com
.google
.apphosting
.api
.ApiProxy
.FeatureNotEnabledException
;
11 import com
.google
.apphosting
.api
.ApiProxy
.LogRecord
;
12 import com
.google
.apphosting
.api
.ApiProxy
.RequestTooLargeException
;
13 import com
.google
.apphosting
.api
.ApiProxy
.UnknownException
;
14 import com
.google
.io
.protocol
.ProtocolMessage
;
15 import com
.google
.protobuf
.Message
;
17 import java
.lang
.reflect
.InvocationTargetException
;
18 import java
.lang
.reflect
.Method
;
19 import java
.security
.AccessController
;
20 import java
.security
.PrivilegedAction
;
21 import java
.util
.Arrays
;
22 import java
.util
.HashMap
;
23 import java
.util
.List
;
25 import java
.util
.ServiceLoader
;
26 import java
.util
.concurrent
.Callable
;
27 import java
.util
.concurrent
.CancellationException
;
28 import java
.util
.concurrent
.ConcurrentHashMap
;
29 import java
.util
.concurrent
.ExecutionException
;
30 import java
.util
.concurrent
.ExecutorService
;
31 import java
.util
.concurrent
.Executors
;
32 import java
.util
.concurrent
.Future
;
33 import java
.util
.concurrent
.Semaphore
;
34 import java
.util
.concurrent
.ThreadFactory
;
35 import java
.util
.concurrent
.TimeUnit
;
36 import java
.util
.concurrent
.TimeoutException
;
37 import java
.util
.logging
.Level
;
38 import java
.util
.logging
.Logger
;
41 * Implements ApiProxy.Delegate such that the requests are dispatched to local service
42 * implementations. Used for both the {@link com.google.appengine.tools.development.DevAppServer}
43 * and for unit testing services.
46 class ApiProxyLocalImpl
implements ApiProxyLocal
, DevServices
{
48 private static final Class
<?
> BYTE_ARRAY_CLASS
= byte[].class;
50 private static final String FILESAPI_DISABLED_MESSAGE
=
51 "The Files API is disabled. Further information: "
52 + "https://cloud.google.com/appengine/docs/deprecations/files_api";
55 * The maximum size of any given API request.
57 private static final int MAX_API_REQUEST_SIZE
= 1048576;
59 private static final String API_DEADLINE_KEY
=
60 "com.google.apphosting.api.ApiProxy.api_deadline_key";
62 static final String IS_OFFLINE_REQUEST_KEY
= "com.google.appengine.request.offline";
65 * Implementation of the {@link LocalServiceContext} interface
67 private class LocalServiceContextImpl
implements LocalServiceContext
{
70 * The local server environment
72 private final LocalServerEnvironment localServerEnvironment
;
74 private final LocalCapabilitiesEnvironment localCapabilitiesEnvironment
=
75 new LocalCapabilitiesEnvironment(System
.getProperties());
78 * Creates a new context, for the given application.
80 * @param localServerEnvironment The environment for the local server.
82 public LocalServiceContextImpl(LocalServerEnvironment localServerEnvironment
) {
83 this.localServerEnvironment
= localServerEnvironment
;
87 public LocalServerEnvironment
getLocalServerEnvironment() {
88 return localServerEnvironment
;
92 public LocalCapabilitiesEnvironment
getLocalCapabilitiesEnvironment() {
93 return localCapabilitiesEnvironment
;
97 public Clock
getClock() {
102 public LocalRpcService
getLocalService(String packageName
) {
103 return ApiProxyLocalImpl
.this.getService(packageName
);
107 private static final Logger logger
= Logger
.getLogger(ApiProxyLocalImpl
.class.getName());
109 private final Map
<String
, LocalRpcService
> serviceCache
=
110 new ConcurrentHashMap
<String
, LocalRpcService
>();
112 private final Map
<String
, Method
> methodCache
= new ConcurrentHashMap
<String
, Method
>();
113 final Map
<Method
, LatencySimulator
> latencySimulatorCache
=
114 new ConcurrentHashMap
<Method
, LatencySimulator
>();
116 private final Map
<String
, String
> properties
= new HashMap
<String
, String
>();
118 private final ExecutorService apiExecutor
= Executors
.newCachedThreadPool(
119 new DaemonThreadFactory(Executors
.defaultThreadFactory()));
121 private final LocalServiceContext context
;
123 private Clock clock
= Clock
.DEFAULT
;
126 * Creates the local proxy in a given context
128 * @param environment the local server environment.
130 protected ApiProxyLocalImpl(LocalServerEnvironment environment
) {
131 this.context
= new LocalServiceContextImpl(environment
);
135 public void log(Environment environment
, LogRecord record
) {
136 logger
.log(toJavaLevel(record
.getLevel()), record
.getMessage());
140 public void flushLogs(Environment environment
) {
145 public byte[] makeSyncCall(ApiProxy
.Environment environment
, String packageName
,
146 String methodName
, byte[] requestBytes
) {
147 ApiProxy
.ApiConfig apiConfig
= null;
148 Double deadline
= (Double
) environment
.getAttributes().get(API_DEADLINE_KEY
);
149 if (deadline
!= null) {
150 apiConfig
= new ApiProxy
.ApiConfig();
151 apiConfig
.setDeadlineInSeconds(deadline
);
154 Future
<byte[]> future
=
155 makeAsyncCall(environment
, packageName
, methodName
, requestBytes
, apiConfig
);
158 } catch (InterruptedException ex
) {
159 throw new ApiProxy
.CancelledException(packageName
, methodName
);
160 } catch (CancellationException ex
) {
161 throw new ApiProxy
.CancelledException(packageName
, methodName
);
162 } catch (ExecutionException ex
) {
163 if (ex
.getCause() instanceof RuntimeException
) {
164 throw (RuntimeException
) ex
.getCause();
165 } else if (ex
.getCause() instanceof Error
) {
166 throw (Error
) ex
.getCause();
168 throw new ApiProxy
.UnknownException(packageName
, methodName
, ex
.getCause());
174 public Future
<byte[]> makeAsyncCall(Environment environment
, final String packageName
,
175 final String methodName
, byte[] requestBytes
, ApiProxy
.ApiConfig apiConfig
) {
176 Semaphore semaphore
= (Semaphore
) environment
.getAttributes().get(
177 LocalEnvironment
.API_CALL_SEMAPHORE
);
178 if (semaphore
!= null) {
181 } catch (InterruptedException ex
) {
182 throw new RuntimeException("Interrupted while waiting on semaphore:", ex
);
185 boolean offline
= environment
.getAttributes().get(IS_OFFLINE_REQUEST_KEY
) != null;
186 AsyncApiCall asyncApiCall
= new AsyncApiCall(
187 environment
, packageName
, methodName
, requestBytes
, semaphore
);
188 boolean success
= false;
190 Callable
<byte[]> callable
= Executors
.privilegedCallable(asyncApiCall
);
192 Future
<byte[]> resultFuture
= AccessController
.doPrivileged(
193 new PrivilegedApiAction(callable
, asyncApiCall
));
195 if (context
.getLocalServerEnvironment().enforceApiDeadlines()) {
196 long deadlineMillis
= (long) (1000.0 * resolveDeadline(packageName
, apiConfig
, offline
));
197 resultFuture
= new TimedFuture
<byte[]>(resultFuture
, deadlineMillis
, clock
) {
199 protected RuntimeException
createDeadlineException() {
200 return new ApiProxy
.ApiDeadlineExceededException(packageName
, methodName
);
207 asyncApiCall
.tryReleaseSemaphore();
213 public List
<Thread
> getRequestThreads(Environment environment
) {
214 return Arrays
.asList(new Thread
[]{Thread
.currentThread()});
217 private double resolveDeadline(String packageName
, ApiProxy
.ApiConfig apiConfig
,
219 LocalRpcService service
= getService(packageName
);
220 Double deadline
= null;
221 if (apiConfig
!= null) {
222 deadline
= apiConfig
.getDeadlineInSeconds();
224 if (deadline
== null && service
!= null) {
225 deadline
= service
.getDefaultDeadline(isOffline
);
227 if (deadline
== null) {
231 Double maxDeadline
= null;
232 if (service
!= null) {
233 maxDeadline
= service
.getMaximumDeadline(isOffline
);
235 if (maxDeadline
== null) {
238 return Math
.min(deadline
, maxDeadline
);
241 private class PrivilegedApiAction
implements PrivilegedAction
<Future
<byte[]>> {
243 private final Callable
<byte[]> callable
;
244 private final AsyncApiCall asyncApiCall
;
246 PrivilegedApiAction(Callable
<byte[]> callable
, AsyncApiCall asyncApiCall
) {
247 this.callable
= callable
;
248 this.asyncApiCall
= asyncApiCall
;
252 public Future
<byte[]> run() {
253 final Future
<byte[]> result
= apiExecutor
.submit(callable
);
254 return new Future
<byte[]>() {
256 public boolean cancel(final boolean mayInterruptIfRunning
) {
257 return AccessController
.doPrivileged(
258 new PrivilegedAction
<Boolean
>() {
260 public Boolean
run() {
261 asyncApiCall
.tryReleaseSemaphore();
262 return result
.cancel(mayInterruptIfRunning
);
268 public boolean isCancelled() {
269 return result
.isCancelled();
273 public boolean isDone() {
274 return result
.isDone();
278 public byte[] get() throws InterruptedException
, ExecutionException
{
283 public byte[] get(long timeout
, TimeUnit unit
)
284 throws InterruptedException
, ExecutionException
, TimeoutException
{
285 return result
.get(timeout
, unit
);
292 * Convert the specified byte array to a protocol buffer representation of the specified type.
293 * This type can either be a subclass of {@link ProtocolMessage} (a legacy protocol buffer
294 * implementation), or {@link Message} (the open-sourced protocol buffer implementation).
296 private <T
> T
convertBytesToPb(byte[] bytes
, Class
<T
> requestClass
)
297 throws IllegalAccessException
, InstantiationException
,
298 InvocationTargetException
, NoSuchMethodException
{
299 if (ProtocolMessage
.class.isAssignableFrom(requestClass
)) {
300 ProtocolMessage
<?
> proto
= (ProtocolMessage
<?
>) requestClass
.newInstance();
301 boolean parsed
= proto
.mergeFrom(bytes
);
302 if (!parsed
|| !proto
.isInitialized()) {
303 throw new RuntimeException(
304 "Could not parse request bytes into " + classDescription(requestClass
));
306 return requestClass
.cast(proto
);
308 if (Message
.class.isAssignableFrom(requestClass
)) {
309 Method method
= requestClass
.getMethod("parseFrom", BYTE_ARRAY_CLASS
);
310 return requestClass
.cast(method
.invoke(null, bytes
));
312 throw new UnsupportedOperationException(String
.format("Cannot assign %s to either %s or %s",
313 classDescription(requestClass
), ProtocolMessage
.class, Message
.class));
317 * Convert the protocol buffer representation to a byte array. The object can either be an
318 * instance of {@link ProtocolMessage} (a legacy protocol buffer implementation), or {@link
319 * Message} (the open-sourced protocol buffer implementation).
321 private byte[] convertPbToBytes(Object object
) {
322 if (object
instanceof ProtocolMessage
) {
323 return ((ProtocolMessage
<?
>) object
).toByteArray();
325 if (object
instanceof Message
) {
326 return ((Message
) object
).toByteArray();
328 throw new UnsupportedOperationException(String
.format("%s is neither %s nor %s",
329 classDescription(object
.getClass()), ProtocolMessage
.class, Message
.class));
333 * Create a textual description of a class that is appropriate for
334 * troubleshooting problems with {@link #convertBytesToPb(byte[], Class)}
335 * or {@link #convertPbToBytes(Object)}.
337 * @param klass The class to create a description for.
338 * @return A string description.
340 private static String
classDescription(Class
<?
> klass
) {
341 return String
.format("(%s extends %s loaded from %s)",
342 klass
, klass
.getSuperclass(),
343 klass
.getProtectionDomain().getCodeSource().getLocation());
347 public void setProperty(String property
, String value
) {
348 properties
.put(property
, value
);
352 * Resets the service properties to {@code properties}.
354 * @param properties a maybe {@code null} set of properties for local services.
357 public void setProperties(Map
<String
, String
> properties
) {
358 this.properties
.clear();
359 if (properties
!= null) {
360 this.appendProperties(properties
);
365 * Appends the given service properties to {@code properties}.
367 * @param properties a set of properties to append for local services.
370 public void appendProperties(Map
<String
, String
> properties
) {
371 this.properties
.putAll(properties
);
375 * Stops all services started by this ApiProxy and releases all of its resources.
379 for (LocalRpcService service
: serviceCache
.values()) {
383 serviceCache
.clear();
385 latencySimulatorCache
.clear();
389 int getMaxApiRequestSize(LocalRpcService rpcService
) {
390 Integer size
= rpcService
.getMaxApiRequestSize();
392 return MAX_API_REQUEST_SIZE
;
397 private Method
getDispatchMethod(LocalRpcService service
, String packageName
, String methodName
) {
398 String dispatchName
= Character
.toLowerCase(methodName
.charAt(0)) + methodName
.substring(1);
399 String methodId
= packageName
+ "." + dispatchName
;
400 Method method
= methodCache
.get(methodId
);
401 if (method
!= null) {
404 for (Method candidate
: service
.getClass().getMethods()) {
405 if (dispatchName
.equals(candidate
.getName())) {
406 methodCache
.put(methodId
, candidate
);
407 LatencyPercentiles latencyPercentiles
= candidate
.getAnnotation(LatencyPercentiles
.class);
408 if (latencyPercentiles
== null) {
410 latencyPercentiles
= service
.getClass().getAnnotation(LatencyPercentiles
.class);
412 if (latencyPercentiles
!= null) {
413 latencySimulatorCache
.put(candidate
, new LatencySimulator(latencyPercentiles
));
418 throw new CallNotFoundException(packageName
, methodName
);
421 private class AsyncApiCall
implements Callable
<byte[]> {
423 private final Environment environment
;
424 private final String packageName
;
425 private final String methodName
;
426 private final byte[] requestBytes
;
427 private final Semaphore semaphore
;
428 private boolean released
;
430 public AsyncApiCall(Environment environment
, String packageName
, String methodName
,
431 byte[] requestBytes
, Semaphore semaphore
) {
432 this.environment
= environment
;
433 this.packageName
= packageName
;
434 this.methodName
= methodName
;
435 this.requestBytes
= requestBytes
;
436 this.semaphore
= semaphore
;
440 public byte[] call() {
442 return callInternal();
444 tryReleaseSemaphore();
448 private byte[] callInternal() {
449 ApiProxy
.setEnvironmentForCurrentThread(environment
);
451 LocalRpcService service
= getService(packageName
);
452 if (service
== null) {
453 throw new CallNotFoundException(packageName
, methodName
);
456 if ("file".equals(packageName
)) {
457 if (Boolean
.getBoolean("appengine.enableFilesApi")) {
458 environment
.getAttributes().put(LocalEnvironment
.FILESAPI_WAS_USED
, true);
460 throw new FeatureNotEnabledException(
461 FILESAPI_DISABLED_MESSAGE
, packageName
, methodName
);
465 LocalCapabilitiesEnvironment capEnv
= context
.getLocalCapabilitiesEnvironment();
466 CapabilityStatus capabilityStatus
= capEnv
467 .getStatusFromMethodName(packageName
, methodName
);
468 if (!CapabilityStatus
.ENABLED
.equals(capabilityStatus
)) {
469 throw new ApiProxy
.CapabilityDisabledException(
470 "Setup in local configuration.", packageName
, methodName
);
473 if (requestBytes
.length
> getMaxApiRequestSize(service
)) {
474 throw new RequestTooLargeException(packageName
, methodName
);
477 Method method
= getDispatchMethod(service
, packageName
, methodName
);
478 Status status
= new Status();
479 Class
<?
> requestClass
= method
.getParameterTypes()[1];
480 Object request
= convertBytesToPb(requestBytes
, requestClass
);
482 long start
= clock
.getCurrentTime();
484 return convertPbToBytes(method
.invoke(service
, status
, request
));
486 LatencySimulator latencySimulator
= latencySimulatorCache
.get(method
);
487 if (latencySimulator
!= null) {
488 if (context
.getLocalServerEnvironment().simulateProductionLatencies()) {
489 latencySimulator
.simulateLatency(clock
.getCurrentTime() - start
, service
, request
);
493 } catch (IllegalAccessException e
) {
494 throw new UnknownException(packageName
, methodName
, e
);
495 } catch (InstantiationException e
) {
496 throw new UnknownException(packageName
, methodName
, e
);
497 } catch (NoSuchMethodException e
) {
498 throw new UnknownException(packageName
, methodName
, e
);
499 } catch (InvocationTargetException e
) {
500 if (e
.getCause() instanceof RuntimeException
) {
501 throw (RuntimeException
) e
.getCause();
503 throw new UnknownException(packageName
, methodName
, e
.getCause());
505 ApiProxy
.clearEnvironmentForCurrentThread();
510 * Synchronized method that ensures the semaphore that was claimed for this API call only gets
513 synchronized void tryReleaseSemaphore() {
514 if (!released
&& semaphore
!= null) {
522 * Method needs to be synchronized to ensure that we don't end up starting multiple instances of
523 * the same service. As an example, we've seen a race condition where the local datastore service
524 * has not yet been initialized and two datastore requests come in at the exact same time. The
525 * first request looks in the service cache, doesn't find it, starts a new local datastore
526 * service, registers it in the service cache, and uses that local datastore service to handle the
527 * first request. Meanwhile the second request looks in the service cache, doesn't find it,
528 * starts a new local datastore service, registers it in the service cache (stomping on the
529 * original one), and uses that local datastore service to handle the second request. If both of
530 * these requests start txns we can end up with 2 requests receiving the same txn id, and that
531 * yields all sorts of exciting behavior. So, we synchronize this method to ensure that we only
532 * register a single instance of each service type.
535 public final synchronized LocalRpcService
getService(final String pkg
) {
536 LocalRpcService cachedService
= serviceCache
.get(pkg
);
537 if (cachedService
!= null) {
538 return cachedService
;
541 return AccessController
.doPrivileged(
542 new PrivilegedAction
<LocalRpcService
>() {
544 public LocalRpcService
run() {
545 return startServices(pkg
);
551 public DevLogService
getLogService() {
552 return (DevLogService
) getService(DevLogService
.PACKAGE
);
555 private LocalRpcService
startServices(String pkg
) {
556 for (LocalRpcService service
: ServiceLoader
.load(LocalRpcService
.class,
557 ApiProxyLocalImpl
.class.getClassLoader())) {
558 if (service
.getPackage().equals(pkg
)) {
559 service
.init(context
, properties
);
561 serviceCache
.put(pkg
, service
);
568 private static Level
toJavaLevel(ApiProxy
.LogRecord
.Level apiProxyLevel
) {
569 switch (apiProxyLevel
) {
575 return Level
.WARNING
;
581 return Level
.WARNING
;
586 public Clock
getClock() {
591 public void setClock(Clock clock
) {
595 private static class DaemonThreadFactory
implements ThreadFactory
{
597 private final ThreadFactory parent
;
599 public DaemonThreadFactory(ThreadFactory parent
) {
600 this.parent
= parent
;
604 public Thread
newThread(Runnable r
) {
605 Thread thread
= parent
.newThread(r
);
606 thread
.setDaemon(true);