1 // Copyright 2008 Google Inc. All Rights Reserved.
3 package com
.google
.appengine
.tools
.development
;
5 import com
.google
.appengine
.api
.capabilities
.CapabilityStatus
;
6 import com
.google
.appengine
.tools
.development
.LocalRpcService
.Status
;
7 import com
.google
.apphosting
.api
.ApiProxy
;
8 import com
.google
.apphosting
.api
.ApiProxy
.CallNotFoundException
;
9 import com
.google
.apphosting
.api
.ApiProxy
.Environment
;
10 import com
.google
.apphosting
.api
.ApiProxy
.LogRecord
;
11 import com
.google
.apphosting
.api
.ApiProxy
.RequestTooLargeException
;
12 import com
.google
.apphosting
.api
.ApiProxy
.UnknownException
;
13 import com
.google
.io
.protocol
.ProtocolMessage
;
14 import com
.google
.protobuf
.Message
;
16 import java
.lang
.reflect
.InvocationTargetException
;
17 import java
.lang
.reflect
.Method
;
18 import java
.security
.AccessController
;
19 import java
.security
.PrivilegedAction
;
20 import java
.util
.Arrays
;
21 import java
.util
.HashMap
;
22 import java
.util
.List
;
24 import java
.util
.ServiceLoader
;
25 import java
.util
.concurrent
.Callable
;
26 import java
.util
.concurrent
.CancellationException
;
27 import java
.util
.concurrent
.ConcurrentHashMap
;
28 import java
.util
.concurrent
.ExecutionException
;
29 import java
.util
.concurrent
.ExecutorService
;
30 import java
.util
.concurrent
.Executors
;
31 import java
.util
.concurrent
.Future
;
32 import java
.util
.concurrent
.Semaphore
;
33 import java
.util
.concurrent
.ThreadFactory
;
34 import java
.util
.concurrent
.TimeUnit
;
35 import java
.util
.concurrent
.TimeoutException
;
36 import java
.util
.logging
.Level
;
37 import java
.util
.logging
.Logger
;
40 * Implements ApiProxy.Delegate such that the requests are dispatched to local service
41 * implementations. Used for both the {@link com.google.appengine.tools.development.DevAppServer}
42 * and for unit testing services.
45 class ApiProxyLocalImpl
implements ApiProxyLocal
{
47 private static final Class
<?
> BYTE_ARRAY_CLASS
= byte[].class;
50 * The maximum size of any given API request.
52 private static final int MAX_API_REQUEST_SIZE
= 1048576;
54 private static final String API_DEADLINE_KEY
=
55 "com.google.apphosting.api.ApiProxy.api_deadline_key";
57 static final String IS_OFFLINE_REQUEST_KEY
= "com.google.appengine.request.offline";
60 * Implementation of the {@link LocalServiceContext} interface
62 private class LocalServiceContextImpl
implements LocalServiceContext
{
65 * The local server environment
67 private final LocalServerEnvironment localServerEnvironment
;
69 private final LocalCapabilitiesEnvironment localCapabilitiesEnvironment
=
70 new LocalCapabilitiesEnvironment(System
.getProperties());
73 * Creates a new context, for the given application.
75 * @param localServerEnvironment The environment for the local server.
77 public LocalServiceContextImpl(LocalServerEnvironment localServerEnvironment
) {
78 this.localServerEnvironment
= localServerEnvironment
;
82 public LocalServerEnvironment
getLocalServerEnvironment() {
83 return localServerEnvironment
;
87 public LocalCapabilitiesEnvironment
getLocalCapabilitiesEnvironment() {
88 return localCapabilitiesEnvironment
;
92 public Clock
getClock() {
97 public LocalRpcService
getLocalService(String packageName
) {
98 return ApiProxyLocalImpl
.this.getService(packageName
);
102 private static final Logger logger
= Logger
.getLogger(ApiProxyLocalImpl
.class.getName());
104 private final Map
<String
, LocalRpcService
> serviceCache
=
105 new ConcurrentHashMap
<String
, LocalRpcService
>();
107 private final Map
<String
, Method
> methodCache
= new ConcurrentHashMap
<String
, Method
>();
108 final Map
<Method
, LatencySimulator
> latencySimulatorCache
=
109 new ConcurrentHashMap
<Method
, LatencySimulator
>();
111 private final Map
<String
, String
> properties
= new HashMap
<String
, String
>();
113 private final ExecutorService apiExecutor
= Executors
.newCachedThreadPool(
114 new DaemonThreadFactory(Executors
.defaultThreadFactory()));
116 private final LocalServiceContext context
;
118 private Clock clock
= Clock
.DEFAULT
;
121 * Creates the local proxy in a given context
123 * @param environment the local server environment.
125 protected ApiProxyLocalImpl(LocalServerEnvironment environment
) {
126 this.context
= new LocalServiceContextImpl(environment
);
130 public void log(Environment environment
, LogRecord record
) {
131 logger
.log(toJavaLevel(record
.getLevel()), record
.getMessage());
135 public void flushLogs(Environment environment
) {
140 public byte[] makeSyncCall(ApiProxy
.Environment environment
, String packageName
,
141 String methodName
, byte[] requestBytes
) {
142 ApiProxy
.ApiConfig apiConfig
= null;
143 Double deadline
= (Double
) environment
.getAttributes().get(API_DEADLINE_KEY
);
144 if (deadline
!= null) {
145 apiConfig
= new ApiProxy
.ApiConfig();
146 apiConfig
.setDeadlineInSeconds(deadline
);
149 Future
<byte[]> future
=
150 doAsyncCall(environment
, packageName
, methodName
, requestBytes
, apiConfig
);
153 } catch (InterruptedException ex
) {
154 throw new ApiProxy
.CancelledException(packageName
, methodName
);
155 } catch (CancellationException ex
) {
156 throw new ApiProxy
.CancelledException(packageName
, methodName
);
157 } catch (ExecutionException ex
) {
158 if (ex
.getCause() instanceof RuntimeException
) {
159 throw (RuntimeException
) ex
.getCause();
160 } else if (ex
.getCause() instanceof Error
) {
161 throw (Error
) ex
.getCause();
163 throw new ApiProxy
.UnknownException(packageName
, methodName
, ex
.getCause());
169 public Future
<byte[]> makeAsyncCall(Environment environment
, String packageName
,
170 String methodName
, byte[] requestBytes
, ApiProxy
.ApiConfig apiConfig
) {
171 return doAsyncCall(environment
, packageName
, methodName
, requestBytes
, apiConfig
);
175 public List
<Thread
> getRequestThreads(Environment environment
) {
176 return Arrays
.asList(new Thread
[]{Thread
.currentThread()});
179 private Future
<byte[]> doAsyncCall(Environment environment
, final String packageName
,
180 final String methodName
, byte[] requestBytes
, ApiProxy
.ApiConfig apiConfig
) {
181 Semaphore semaphore
= (Semaphore
) environment
.getAttributes().get(
182 LocalEnvironment
.API_CALL_SEMAPHORE
);
183 if (semaphore
!= null) {
186 } catch (InterruptedException ex
) {
187 throw new RuntimeException("Interrupted while waiting on semaphore:", ex
);
190 boolean offline
= environment
.getAttributes().get(IS_OFFLINE_REQUEST_KEY
) != null;
191 AsyncApiCall asyncApiCall
= new AsyncApiCall(
192 environment
, packageName
, methodName
, requestBytes
, semaphore
);
193 boolean success
= false;
195 Callable
<byte[]> callable
= Executors
.privilegedCallable(asyncApiCall
);
197 Future
<byte[]> resultFuture
= AccessController
.doPrivileged(
198 new PrivilegedApiAction(callable
, asyncApiCall
));
200 if (context
.getLocalServerEnvironment().enforceApiDeadlines()) {
201 long deadlineMillis
= (long) (1000.0 * resolveDeadline(packageName
, apiConfig
, offline
));
202 resultFuture
= new TimedFuture
<byte[]>(resultFuture
, deadlineMillis
, clock
) {
204 protected RuntimeException
createDeadlineException() {
205 throw new ApiProxy
.ApiDeadlineExceededException(packageName
, methodName
);
212 asyncApiCall
.tryReleaseSemaphore();
217 private double resolveDeadline(String packageName
, ApiProxy
.ApiConfig apiConfig
,
219 LocalRpcService service
= getService(packageName
);
220 Double deadline
= null;
221 if (apiConfig
!= null) {
222 deadline
= apiConfig
.getDeadlineInSeconds();
224 if (deadline
== null && service
!= null) {
225 deadline
= service
.getDefaultDeadline(isOffline
);
227 if (deadline
== null) {
231 Double maxDeadline
= null;
232 if (service
!= null) {
233 maxDeadline
= service
.getMaximumDeadline(isOffline
);
235 if (maxDeadline
== null) {
238 return Math
.min(deadline
, maxDeadline
);
241 private class PrivilegedApiAction
implements PrivilegedAction
<Future
<byte[]>> {
243 private final Callable
<byte[]> callable
;
244 private final AsyncApiCall asyncApiCall
;
246 PrivilegedApiAction(Callable
<byte[]> callable
, AsyncApiCall asyncApiCall
) {
247 this.callable
= callable
;
248 this.asyncApiCall
= asyncApiCall
;
252 public Future
<byte[]> run() {
253 final Future
<byte[]> result
= apiExecutor
.submit(callable
);
254 return new Future
<byte[]>() {
256 public boolean cancel(final boolean mayInterruptIfRunning
) {
257 return AccessController
.doPrivileged(
258 new PrivilegedAction
<Boolean
>() {
260 public Boolean
run() {
261 asyncApiCall
.tryReleaseSemaphore();
262 return result
.cancel(mayInterruptIfRunning
);
268 public boolean isCancelled() {
269 return result
.isCancelled();
273 public boolean isDone() {
274 return result
.isDone();
278 public byte[] get() throws InterruptedException
, ExecutionException
{
283 public byte[] get(long timeout
, TimeUnit unit
)
284 throws InterruptedException
, ExecutionException
, TimeoutException
{
285 return result
.get(timeout
, unit
);
292 * Convert the specified byte array to a protocol buffer representation of the specified type.
293 * This type can either be a subclass of {@link ProtocolMessage} (a legacy protocol buffer
294 * implementation), or {@link Message} (the open-sourced protocol buffer implementation).
296 private <T
> T
convertBytesToPb(byte[] bytes
, Class
<T
> requestClass
)
297 throws IllegalAccessException
, InstantiationException
,
298 InvocationTargetException
, NoSuchMethodException
{
299 if (ProtocolMessage
.class.isAssignableFrom(requestClass
)) {
300 ProtocolMessage
<?
> proto
= (ProtocolMessage
<?
>) requestClass
.newInstance();
301 proto
.mergeFrom(bytes
);
302 return requestClass
.cast(proto
);
304 if (Message
.class.isAssignableFrom(requestClass
)) {
305 Method method
= requestClass
.getMethod("parseFrom", BYTE_ARRAY_CLASS
);
306 return requestClass
.cast(method
.invoke(null, bytes
));
308 throw new UnsupportedOperationException(String
.format("Cannot assign %s to either %s or %s",
309 classDescription(requestClass
), ProtocolMessage
.class, Message
.class));
313 * Convert the protocol buffer representation to a byte array. The object can either be an
314 * instance of {@link ProtocolMessage} (a legacy protocol buffer implementation), or {@link
315 * Message} (the open-sourced protocol buffer implementation).
317 private byte[] convertPbToBytes(Object object
) {
318 if (object
instanceof ProtocolMessage
) {
319 return ((ProtocolMessage
<?
>) object
).toByteArray();
321 if (object
instanceof Message
) {
322 return ((Message
) object
).toByteArray();
324 throw new UnsupportedOperationException(String
.format("%s is neither %s nor %s",
325 classDescription(object
.getClass()), ProtocolMessage
.class, Message
.class));
329 * Create a textual description of a class that is appropriate for
330 * troubleshooting problems with {@link #convertBytesToPb(byte[], Class)}
331 * or {@link #convertPbToBytes(Object)}.
333 * @param klass The class to create a description for.
334 * @return A string description.
336 private static String
classDescription(Class
<?
> klass
) {
337 return String
.format("(%s extends %s loaded from %s)",
338 klass
, klass
.getSuperclass(),
339 klass
.getProtectionDomain().getCodeSource().getLocation());
343 public void setProperty(String property
, String value
) {
344 properties
.put(property
, value
);
348 * Resets the service properties to {@code properties}.
350 * @param properties a maybe {@code null} set of properties for local services.
353 public void setProperties(Map
<String
, String
> properties
) {
354 this.properties
.clear();
355 if (properties
!= null) {
356 this.appendProperties(properties
);
361 * Appends the given service properties to {@code properties}.
363 * @param properties a set of properties to append for local services.
366 public void appendProperties(Map
<String
, String
> properties
) {
367 this.properties
.putAll(properties
);
371 * Stops all services started by this ApiProxy and releases all of its resources.
375 for (LocalRpcService service
: serviceCache
.values()) {
379 serviceCache
.clear();
381 latencySimulatorCache
.clear();
385 int getMaxApiRequestSize(LocalRpcService rpcService
) {
386 Integer size
= rpcService
.getMaxApiRequestSize();
388 return MAX_API_REQUEST_SIZE
;
393 private Method
getDispatchMethod(LocalRpcService service
, String packageName
, String methodName
) {
394 String dispatchName
= Character
.toLowerCase(methodName
.charAt(0)) + methodName
.substring(1);
395 String methodId
= packageName
+ "." + dispatchName
;
396 Method method
= methodCache
.get(methodId
);
397 if (method
!= null) {
400 for (Method candidate
: service
.getClass().getMethods()) {
401 if (dispatchName
.equals(candidate
.getName())) {
402 methodCache
.put(methodId
, candidate
);
403 LatencyPercentiles latencyPercentiles
= candidate
.getAnnotation(LatencyPercentiles
.class);
404 if (latencyPercentiles
== null) {
406 latencyPercentiles
= service
.getClass().getAnnotation(LatencyPercentiles
.class);
408 if (latencyPercentiles
!= null) {
409 latencySimulatorCache
.put(candidate
, new LatencySimulator(latencyPercentiles
));
414 throw new CallNotFoundException(packageName
, methodName
);
417 private class AsyncApiCall
implements Callable
<byte[]> {
419 private final Environment environment
;
420 private final String packageName
;
421 private final String methodName
;
422 private final byte[] requestBytes
;
423 private final Semaphore semaphore
;
424 private boolean released
;
426 public AsyncApiCall(Environment environment
, String packageName
, String methodName
,
427 byte[] requestBytes
, Semaphore semaphore
) {
428 this.environment
= environment
;
429 this.packageName
= packageName
;
430 this.methodName
= methodName
;
431 this.requestBytes
= requestBytes
;
432 this.semaphore
= semaphore
;
436 public byte[] call() {
438 return callInternal();
440 tryReleaseSemaphore();
444 private byte[] callInternal() {
445 ApiProxy
.setEnvironmentForCurrentThread(environment
);
447 LocalRpcService service
= getService(packageName
);
448 if (service
== null) {
449 throw new CallNotFoundException(packageName
, methodName
);
451 LocalCapabilitiesEnvironment capEnv
= context
.getLocalCapabilitiesEnvironment();
452 CapabilityStatus capabilityStatus
= capEnv
453 .getStatusFromMethodName(packageName
, methodName
);
454 if (!CapabilityStatus
.ENABLED
.equals(capabilityStatus
)) {
455 throw new ApiProxy
.CapabilityDisabledException(
456 "Setup in local configuration.", packageName
, methodName
);
459 if (requestBytes
.length
> getMaxApiRequestSize(service
)) {
460 throw new RequestTooLargeException(packageName
, methodName
);
463 Method method
= getDispatchMethod(service
, packageName
, methodName
);
464 Status status
= new Status();
465 Class
<?
> requestClass
= method
.getParameterTypes()[1];
466 Object request
= convertBytesToPb(requestBytes
, requestClass
);
468 long start
= clock
.getCurrentTime();
470 return convertPbToBytes(method
.invoke(service
, status
, request
));
472 LatencySimulator latencySimulator
= latencySimulatorCache
.get(method
);
473 if (latencySimulator
!= null) {
474 if (context
.getLocalServerEnvironment().simulateProductionLatencies()) {
475 latencySimulator
.simulateLatency(clock
.getCurrentTime() - start
, service
, request
);
479 } catch (IllegalAccessException e
) {
480 throw new UnknownException(packageName
, methodName
, e
);
481 } catch (InstantiationException e
) {
482 throw new UnknownException(packageName
, methodName
, e
);
483 } catch (NoSuchMethodException e
) {
484 throw new UnknownException(packageName
, methodName
, e
);
485 } catch (InvocationTargetException e
) {
486 if (e
.getCause() instanceof RuntimeException
) {
487 throw (RuntimeException
) e
.getCause();
489 throw new UnknownException(packageName
, methodName
, e
.getCause());
491 ApiProxy
.clearEnvironmentForCurrentThread();
496 * Synchronized method that ensures the semaphore that was claimed for this API call only gets
499 synchronized void tryReleaseSemaphore() {
500 if (!released
&& semaphore
!= null) {
508 * Method needs to be synchronized to ensure that we don't end up starting multiple instances of
509 * the same service. As an example, we've seen a race condition where the local datastore service
510 * has not yet been initialized and two datastore requests come in at the exact same time. The
511 * first request looks in the service cache, doesn't find it, starts a new local datastore
512 * service, registers it in the service cache, and uses that local datastore service to handle the
513 * first request. Meanwhile the second request looks in the service cache, doesn't find it,
514 * starts a new local datastore service, registers it in the service cache (stomping on the
515 * original one), and uses that local datastore service to handle the second request. If both of
516 * these requests start txns we can end up with 2 requests receiving the same txn id, and that
517 * yields all sorts of exciting behavior. So, we synchronize this method to ensure that we only
518 * register a single instance of each service type.
521 public synchronized final LocalRpcService
getService(final String pkg
) {
522 LocalRpcService cachedService
= serviceCache
.get(pkg
);
523 if (cachedService
!= null) {
524 return cachedService
;
527 return AccessController
.doPrivileged(
528 new PrivilegedAction
<LocalRpcService
>() {
530 public LocalRpcService
run() {
531 return startServices(pkg
);
536 private LocalRpcService
startServices(String pkg
) {
537 for (LocalRpcService service
: ServiceLoader
.load(LocalRpcService
.class,
538 ApiProxyLocalImpl
.class.getClassLoader())) {
539 if (service
.getPackage().equals(pkg
)) {
540 service
.init(context
, properties
);
542 serviceCache
.put(pkg
, service
);
549 private static Level
toJavaLevel(ApiProxy
.LogRecord
.Level apiProxyLevel
) {
550 switch (apiProxyLevel
) {
556 return Level
.WARNING
;
562 return Level
.WARNING
;
567 public Clock
getClock() {
572 public void setClock(Clock clock
) {
576 private static class DaemonThreadFactory
implements ThreadFactory
{
578 private final ThreadFactory parent
;
580 public DaemonThreadFactory(ThreadFactory parent
) {
581 this.parent
= parent
;
585 public Thread
newThread(Runnable r
) {
586 Thread thread
= parent
.newThread(r
);
587 thread
.setDaemon(true);