1 // Copyright 2008 Google Inc. All Rights Reserved.
3 package com
.google
.appengine
.tools
.development
;
5 import com
.google
.appengine
.api
.capabilities
.CapabilityStatus
;
6 import com
.google
.appengine
.tools
.development
.LocalRpcService
.Status
;
7 import com
.google
.apphosting
.api
.ApiProxy
;
8 import com
.google
.apphosting
.api
.ApiProxy
.CallNotFoundException
;
9 import com
.google
.apphosting
.api
.ApiProxy
.Environment
;
10 import com
.google
.apphosting
.api
.ApiProxy
.LogRecord
;
11 import com
.google
.apphosting
.api
.ApiProxy
.RequestTooLargeException
;
12 import com
.google
.apphosting
.api
.ApiProxy
.UnknownException
;
13 import com
.google
.io
.protocol
.ProtocolMessage
;
14 import com
.google
.protobuf
.Message
;
16 import java
.lang
.reflect
.InvocationTargetException
;
17 import java
.lang
.reflect
.Method
;
18 import java
.security
.AccessController
;
19 import java
.security
.PrivilegedAction
;
20 import java
.util
.Arrays
;
21 import java
.util
.HashMap
;
22 import java
.util
.List
;
24 import java
.util
.ServiceLoader
;
25 import java
.util
.concurrent
.Callable
;
26 import java
.util
.concurrent
.CancellationException
;
27 import java
.util
.concurrent
.ConcurrentHashMap
;
28 import java
.util
.concurrent
.ExecutionException
;
29 import java
.util
.concurrent
.ExecutorService
;
30 import java
.util
.concurrent
.Executors
;
31 import java
.util
.concurrent
.Future
;
32 import java
.util
.concurrent
.Semaphore
;
33 import java
.util
.concurrent
.ThreadFactory
;
34 import java
.util
.concurrent
.TimeUnit
;
35 import java
.util
.concurrent
.TimeoutException
;
36 import java
.util
.logging
.Level
;
37 import java
.util
.logging
.Logger
;
40 * Implements ApiProxy.Delegate such that the requests are dispatched to local service
41 * implementations. Used for both the {@link com.google.appengine.tools.development.DevAppServer}
42 * and for unit testing services.
45 class ApiProxyLocalImpl
implements ApiProxyLocal
, DevServices
{
47 private static final Class
<?
> BYTE_ARRAY_CLASS
= byte[].class;
50 * The maximum size of any given API request.
52 private static final int MAX_API_REQUEST_SIZE
= 1048576;
54 private static final String API_DEADLINE_KEY
=
55 "com.google.apphosting.api.ApiProxy.api_deadline_key";
57 static final String IS_OFFLINE_REQUEST_KEY
= "com.google.appengine.request.offline";
60 * Implementation of the {@link LocalServiceContext} interface
62 private class LocalServiceContextImpl
implements LocalServiceContext
{
65 * The local server environment
67 private final LocalServerEnvironment localServerEnvironment
;
69 private final LocalCapabilitiesEnvironment localCapabilitiesEnvironment
=
70 new LocalCapabilitiesEnvironment(System
.getProperties());
73 * Creates a new context, for the given application.
75 * @param localServerEnvironment The environment for the local server.
77 public LocalServiceContextImpl(LocalServerEnvironment localServerEnvironment
) {
78 this.localServerEnvironment
= localServerEnvironment
;
82 public LocalServerEnvironment
getLocalServerEnvironment() {
83 return localServerEnvironment
;
87 public LocalCapabilitiesEnvironment
getLocalCapabilitiesEnvironment() {
88 return localCapabilitiesEnvironment
;
92 public Clock
getClock() {
97 public LocalRpcService
getLocalService(String packageName
) {
98 return ApiProxyLocalImpl
.this.getService(packageName
);
102 private static final Logger logger
= Logger
.getLogger(ApiProxyLocalImpl
.class.getName());
104 private final Map
<String
, LocalRpcService
> serviceCache
=
105 new ConcurrentHashMap
<String
, LocalRpcService
>();
107 private final Map
<String
, Method
> methodCache
= new ConcurrentHashMap
<String
, Method
>();
108 final Map
<Method
, LatencySimulator
> latencySimulatorCache
=
109 new ConcurrentHashMap
<Method
, LatencySimulator
>();
111 private final Map
<String
, String
> properties
= new HashMap
<String
, String
>();
113 private final ExecutorService apiExecutor
= Executors
.newCachedThreadPool(
114 new DaemonThreadFactory(Executors
.defaultThreadFactory()));
116 private final LocalServiceContext context
;
118 private Clock clock
= Clock
.DEFAULT
;
/**
 * Creates the local proxy for the given server environment. The environment is wrapped in a
 * {@link LocalServiceContextImpl}, which becomes the context shared with local services.
 *
 * @param environment the local server environment.
 */
protected ApiProxyLocalImpl(LocalServerEnvironment environment) {
  LocalServiceContext serviceContext = new LocalServiceContextImpl(environment);
  this.context = serviceContext;
}
130 public void log(Environment environment
, LogRecord record
) {
131 logger
.log(toJavaLevel(record
.getLevel()), record
.getMessage());
135 public void flushLogs(Environment environment
) {
140 public byte[] makeSyncCall(ApiProxy
.Environment environment
, String packageName
,
141 String methodName
, byte[] requestBytes
) {
142 ApiProxy
.ApiConfig apiConfig
= null;
143 Double deadline
= (Double
) environment
.getAttributes().get(API_DEADLINE_KEY
);
144 if (deadline
!= null) {
145 apiConfig
= new ApiProxy
.ApiConfig();
146 apiConfig
.setDeadlineInSeconds(deadline
);
149 Future
<byte[]> future
=
150 makeAsyncCall(environment
, packageName
, methodName
, requestBytes
, apiConfig
);
153 } catch (InterruptedException ex
) {
154 throw new ApiProxy
.CancelledException(packageName
, methodName
);
155 } catch (CancellationException ex
) {
156 throw new ApiProxy
.CancelledException(packageName
, methodName
);
157 } catch (ExecutionException ex
) {
158 if (ex
.getCause() instanceof RuntimeException
) {
159 throw (RuntimeException
) ex
.getCause();
160 } else if (ex
.getCause() instanceof Error
) {
161 throw (Error
) ex
.getCause();
163 throw new ApiProxy
.UnknownException(packageName
, methodName
, ex
.getCause());
169 public Future
<byte[]> makeAsyncCall(Environment environment
, final String packageName
,
170 final String methodName
, byte[] requestBytes
, ApiProxy
.ApiConfig apiConfig
) {
171 Semaphore semaphore
= (Semaphore
) environment
.getAttributes().get(
172 LocalEnvironment
.API_CALL_SEMAPHORE
);
173 if (semaphore
!= null) {
176 } catch (InterruptedException ex
) {
177 throw new RuntimeException("Interrupted while waiting on semaphore:", ex
);
180 boolean offline
= environment
.getAttributes().get(IS_OFFLINE_REQUEST_KEY
) != null;
181 AsyncApiCall asyncApiCall
= new AsyncApiCall(
182 environment
, packageName
, methodName
, requestBytes
, semaphore
);
183 boolean success
= false;
185 Callable
<byte[]> callable
= Executors
.privilegedCallable(asyncApiCall
);
187 Future
<byte[]> resultFuture
= AccessController
.doPrivileged(
188 new PrivilegedApiAction(callable
, asyncApiCall
));
190 if (context
.getLocalServerEnvironment().enforceApiDeadlines()) {
191 long deadlineMillis
= (long) (1000.0 * resolveDeadline(packageName
, apiConfig
, offline
));
192 resultFuture
= new TimedFuture
<byte[]>(resultFuture
, deadlineMillis
, clock
) {
194 protected RuntimeException
createDeadlineException() {
195 return new ApiProxy
.ApiDeadlineExceededException(packageName
, methodName
);
202 asyncApiCall
.tryReleaseSemaphore();
208 public List
<Thread
> getRequestThreads(Environment environment
) {
209 return Arrays
.asList(new Thread
[]{Thread
.currentThread()});
212 private double resolveDeadline(String packageName
, ApiProxy
.ApiConfig apiConfig
,
214 LocalRpcService service
= getService(packageName
);
215 Double deadline
= null;
216 if (apiConfig
!= null) {
217 deadline
= apiConfig
.getDeadlineInSeconds();
219 if (deadline
== null && service
!= null) {
220 deadline
= service
.getDefaultDeadline(isOffline
);
222 if (deadline
== null) {
226 Double maxDeadline
= null;
227 if (service
!= null) {
228 maxDeadline
= service
.getMaximumDeadline(isOffline
);
230 if (maxDeadline
== null) {
233 return Math
.min(deadline
, maxDeadline
);
236 private class PrivilegedApiAction
implements PrivilegedAction
<Future
<byte[]>> {
238 private final Callable
<byte[]> callable
;
239 private final AsyncApiCall asyncApiCall
;
241 PrivilegedApiAction(Callable
<byte[]> callable
, AsyncApiCall asyncApiCall
) {
242 this.callable
= callable
;
243 this.asyncApiCall
= asyncApiCall
;
247 public Future
<byte[]> run() {
248 final Future
<byte[]> result
= apiExecutor
.submit(callable
);
249 return new Future
<byte[]>() {
251 public boolean cancel(final boolean mayInterruptIfRunning
) {
252 return AccessController
.doPrivileged(
253 new PrivilegedAction
<Boolean
>() {
255 public Boolean
run() {
256 asyncApiCall
.tryReleaseSemaphore();
257 return result
.cancel(mayInterruptIfRunning
);
263 public boolean isCancelled() {
264 return result
.isCancelled();
268 public boolean isDone() {
269 return result
.isDone();
273 public byte[] get() throws InterruptedException
, ExecutionException
{
278 public byte[] get(long timeout
, TimeUnit unit
)
279 throws InterruptedException
, ExecutionException
, TimeoutException
{
280 return result
.get(timeout
, unit
);
287 * Convert the specified byte array to a protocol buffer representation of the specified type.
288 * This type can either be a subclass of {@link ProtocolMessage} (a legacy protocol buffer
289 * implementation), or {@link Message} (the open-sourced protocol buffer implementation).
291 private <T
> T
convertBytesToPb(byte[] bytes
, Class
<T
> requestClass
)
292 throws IllegalAccessException
, InstantiationException
,
293 InvocationTargetException
, NoSuchMethodException
{
294 if (ProtocolMessage
.class.isAssignableFrom(requestClass
)) {
295 ProtocolMessage
<?
> proto
= (ProtocolMessage
<?
>) requestClass
.newInstance();
296 proto
.mergeFrom(bytes
);
297 return requestClass
.cast(proto
);
299 if (Message
.class.isAssignableFrom(requestClass
)) {
300 Method method
= requestClass
.getMethod("parseFrom", BYTE_ARRAY_CLASS
);
301 return requestClass
.cast(method
.invoke(null, bytes
));
303 throw new UnsupportedOperationException(String
.format("Cannot assign %s to either %s or %s",
304 classDescription(requestClass
), ProtocolMessage
.class, Message
.class));
308 * Convert the protocol buffer representation to a byte array. The object can either be an
309 * instance of {@link ProtocolMessage} (a legacy protocol buffer implementation), or {@link
310 * Message} (the open-sourced protocol buffer implementation).
312 private byte[] convertPbToBytes(Object object
) {
313 if (object
instanceof ProtocolMessage
) {
314 return ((ProtocolMessage
<?
>) object
).toByteArray();
316 if (object
instanceof Message
) {
317 return ((Message
) object
).toByteArray();
319 throw new UnsupportedOperationException(String
.format("%s is neither %s nor %s",
320 classDescription(object
.getClass()), ProtocolMessage
.class, Message
.class));
324 * Create a textual description of a class that is appropriate for
325 * troubleshooting problems with {@link #convertBytesToPb(byte[], Class)}
326 * or {@link #convertPbToBytes(Object)}.
328 * @param klass The class to create a description for.
329 * @return A string description.
331 private static String
classDescription(Class
<?
> klass
) {
332 return String
.format("(%s extends %s loaded from %s)",
333 klass
, klass
.getSuperclass(),
334 klass
.getProtectionDomain().getCodeSource().getLocation());
338 public void setProperty(String property
, String value
) {
339 properties
.put(property
, value
);
343 * Resets the service properties to {@code properties}.
345 * @param properties a maybe {@code null} set of properties for local services.
348 public void setProperties(Map
<String
, String
> properties
) {
349 this.properties
.clear();
350 if (properties
!= null) {
351 this.appendProperties(properties
);
356 * Appends the given service properties to {@code properties}.
358 * @param properties a set of properties to append for local services.
361 public void appendProperties(Map
<String
, String
> properties
) {
362 this.properties
.putAll(properties
);
366 * Stops all services started by this ApiProxy and releases all of its resources.
370 for (LocalRpcService service
: serviceCache
.values()) {
374 serviceCache
.clear();
376 latencySimulatorCache
.clear();
380 int getMaxApiRequestSize(LocalRpcService rpcService
) {
381 Integer size
= rpcService
.getMaxApiRequestSize();
383 return MAX_API_REQUEST_SIZE
;
388 private Method
getDispatchMethod(LocalRpcService service
, String packageName
, String methodName
) {
389 String dispatchName
= Character
.toLowerCase(methodName
.charAt(0)) + methodName
.substring(1);
390 String methodId
= packageName
+ "." + dispatchName
;
391 Method method
= methodCache
.get(methodId
);
392 if (method
!= null) {
395 for (Method candidate
: service
.getClass().getMethods()) {
396 if (dispatchName
.equals(candidate
.getName())) {
397 methodCache
.put(methodId
, candidate
);
398 LatencyPercentiles latencyPercentiles
= candidate
.getAnnotation(LatencyPercentiles
.class);
399 if (latencyPercentiles
== null) {
401 latencyPercentiles
= service
.getClass().getAnnotation(LatencyPercentiles
.class);
403 if (latencyPercentiles
!= null) {
404 latencySimulatorCache
.put(candidate
, new LatencySimulator(latencyPercentiles
));
409 throw new CallNotFoundException(packageName
, methodName
);
412 private class AsyncApiCall
implements Callable
<byte[]> {
414 private final Environment environment
;
415 private final String packageName
;
416 private final String methodName
;
417 private final byte[] requestBytes
;
418 private final Semaphore semaphore
;
419 private boolean released
;
421 public AsyncApiCall(Environment environment
, String packageName
, String methodName
,
422 byte[] requestBytes
, Semaphore semaphore
) {
423 this.environment
= environment
;
424 this.packageName
= packageName
;
425 this.methodName
= methodName
;
426 this.requestBytes
= requestBytes
;
427 this.semaphore
= semaphore
;
431 public byte[] call() {
433 return callInternal();
435 tryReleaseSemaphore();
439 private byte[] callInternal() {
440 ApiProxy
.setEnvironmentForCurrentThread(environment
);
442 LocalRpcService service
= getService(packageName
);
443 if (service
== null) {
444 throw new CallNotFoundException(packageName
, methodName
);
446 LocalCapabilitiesEnvironment capEnv
= context
.getLocalCapabilitiesEnvironment();
447 CapabilityStatus capabilityStatus
= capEnv
448 .getStatusFromMethodName(packageName
, methodName
);
449 if (!CapabilityStatus
.ENABLED
.equals(capabilityStatus
)) {
450 throw new ApiProxy
.CapabilityDisabledException(
451 "Setup in local configuration.", packageName
, methodName
);
454 if (requestBytes
.length
> getMaxApiRequestSize(service
)) {
455 throw new RequestTooLargeException(packageName
, methodName
);
458 Method method
= getDispatchMethod(service
, packageName
, methodName
);
459 Status status
= new Status();
460 Class
<?
> requestClass
= method
.getParameterTypes()[1];
461 Object request
= convertBytesToPb(requestBytes
, requestClass
);
463 long start
= clock
.getCurrentTime();
465 return convertPbToBytes(method
.invoke(service
, status
, request
));
467 LatencySimulator latencySimulator
= latencySimulatorCache
.get(method
);
468 if (latencySimulator
!= null) {
469 if (context
.getLocalServerEnvironment().simulateProductionLatencies()) {
470 latencySimulator
.simulateLatency(clock
.getCurrentTime() - start
, service
, request
);
474 } catch (IllegalAccessException e
) {
475 throw new UnknownException(packageName
, methodName
, e
);
476 } catch (InstantiationException e
) {
477 throw new UnknownException(packageName
, methodName
, e
);
478 } catch (NoSuchMethodException e
) {
479 throw new UnknownException(packageName
, methodName
, e
);
480 } catch (InvocationTargetException e
) {
481 if (e
.getCause() instanceof RuntimeException
) {
482 throw (RuntimeException
) e
.getCause();
484 throw new UnknownException(packageName
, methodName
, e
.getCause());
486 ApiProxy
.clearEnvironmentForCurrentThread();
491 * Synchronized method that ensures the semaphore that was claimed for this API call only gets
494 synchronized void tryReleaseSemaphore() {
495 if (!released
&& semaphore
!= null) {
503 * Method needs to be synchronized to ensure that we don't end up starting multiple instances of
504 * the same service. As an example, we've seen a race condition where the local datastore service
505 * has not yet been initialized and two datastore requests come in at the exact same time. The
506 * first request looks in the service cache, doesn't find it, starts a new local datastore
507 * service, registers it in the service cache, and uses that local datastore service to handle the
508 * first request. Meanwhile the second request looks in the service cache, doesn't find it,
509 * starts a new local datastore service, registers it in the service cache (stomping on the
510 * original one), and uses that local datastore service to handle the second request. If both of
511 * these requests start txns we can end up with 2 requests receiving the same txn id, and that
512 * yields all sorts of exciting behavior. So, we synchronize this method to ensure that we only
513 * register a single instance of each service type.
516 public final synchronized LocalRpcService
getService(final String pkg
) {
517 LocalRpcService cachedService
= serviceCache
.get(pkg
);
518 if (cachedService
!= null) {
519 return cachedService
;
522 return AccessController
.doPrivileged(
523 new PrivilegedAction
<LocalRpcService
>() {
525 public LocalRpcService
run() {
526 return startServices(pkg
);
532 public DevLogService
getLogService() {
533 return (DevLogService
) getService(DevLogService
.PACKAGE
);
536 private LocalRpcService
startServices(String pkg
) {
537 for (LocalRpcService service
: ServiceLoader
.load(LocalRpcService
.class,
538 ApiProxyLocalImpl
.class.getClassLoader())) {
539 if (service
.getPackage().equals(pkg
)) {
540 service
.init(context
, properties
);
542 serviceCache
.put(pkg
, service
);
549 private static Level
toJavaLevel(ApiProxy
.LogRecord
.Level apiProxyLevel
) {
550 switch (apiProxyLevel
) {
556 return Level
.WARNING
;
562 return Level
.WARNING
;
567 public Clock
getClock() {
572 public void setClock(Clock clock
) {
576 private static class DaemonThreadFactory
implements ThreadFactory
{
578 private final ThreadFactory parent
;
580 public DaemonThreadFactory(ThreadFactory parent
) {
581 this.parent
= parent
;
585 public Thread
newThread(Runnable r
) {
586 Thread thread
= parent
.newThread(r
);
587 thread
.setDaemon(true);