1 // Copyright 2008 Google Inc. All Rights Reserved.
3 package com
.google
.appengine
.tools
.development
;
5 import com
.google
.appengine
.api
.capabilities
.CapabilityStatus
;
6 import com
.google
.appengine
.tools
.development
.LocalRpcService
.Status
;
7 import com
.google
.apphosting
.api
.ApiProxy
;
8 import com
.google
.apphosting
.api
.ApiProxy
.CallNotFoundException
;
9 import com
.google
.apphosting
.api
.ApiProxy
.Environment
;
10 import com
.google
.apphosting
.api
.ApiProxy
.LogRecord
;
11 import com
.google
.apphosting
.api
.ApiProxy
.RequestTooLargeException
;
12 import com
.google
.apphosting
.api
.ApiProxy
.UnknownException
;
13 import com
.google
.io
.protocol
.ProtocolMessage
;
14 import com
.google
.protobuf
.Message
;
16 import java
.lang
.reflect
.InvocationTargetException
;
17 import java
.lang
.reflect
.Method
;
18 import java
.security
.AccessController
;
19 import java
.security
.PrivilegedAction
;
20 import java
.util
.Arrays
;
21 import java
.util
.Formatter
;
22 import java
.util
.HashMap
;
23 import java
.util
.Iterator
;
24 import java
.util
.List
;
26 import java
.util
.concurrent
.Callable
;
27 import java
.util
.concurrent
.CancellationException
;
28 import java
.util
.concurrent
.ConcurrentHashMap
;
29 import java
.util
.concurrent
.ExecutionException
;
30 import java
.util
.concurrent
.ExecutorService
;
31 import java
.util
.concurrent
.Executors
;
32 import java
.util
.concurrent
.Future
;
33 import java
.util
.concurrent
.Semaphore
;
34 import java
.util
.concurrent
.ThreadFactory
;
35 import java
.util
.concurrent
.TimeUnit
;
36 import java
.util
.concurrent
.TimeoutException
;
37 import java
.util
.logging
.Level
;
38 import java
.util
.logging
.Logger
;
41 * Implements ApiProxy.Delegate such that the requests are dispatched to local service
42 * implementations. Used for both the {@link com.google.appengine.tools.development.DevAppServer}
43 * and for unit testing services.
46 class ApiProxyLocalImpl
implements ApiProxyLocal
{
48 private static final Class
<?
> BYTE_ARRAY_CLASS
= byte[].class;
51 * The maximum size of any given API request.
53 private static final int MAX_API_REQUEST_SIZE
= 1048576;
55 private static final String API_DEADLINE_KEY
=
56 "com.google.apphosting.api.ApiProxy.api_deadline_key";
58 static final String IS_OFFLINE_REQUEST_KEY
= "com.google.appengine.request.offline";
61 * Implementation of the {@link LocalServiceContext} interface
63 private class LocalServiceContextImpl
implements LocalServiceContext
{
66 * The local server environment
68 private final LocalServerEnvironment localServerEnvironment
;
70 private final LocalCapabilitiesEnvironment localCapabilitiesEnvironment
=
71 new LocalCapabilitiesEnvironment(System
.getProperties());
/**
 * Creates a new context, for the given application.
 *
 * @param localServerEnvironment the environment for the local server; stored as-is and handed
 *     back by {@code getLocalServerEnvironment()}.
 */
public LocalServiceContextImpl(LocalServerEnvironment localServerEnvironment) {
  this.localServerEnvironment = localServerEnvironment;
}
83 public LocalServerEnvironment
getLocalServerEnvironment() {
84 return localServerEnvironment
;
88 public LocalCapabilitiesEnvironment
getLocalCapabilitiesEnvironment() {
89 return localCapabilitiesEnvironment
;
93 public Clock
getClock() {
98 public LocalRpcService
getLocalService(String packageName
) {
99 return ApiProxyLocalImpl
.this.getService(packageName
);
103 private static final Logger logger
= Logger
.getLogger(ApiProxyLocalImpl
.class.getName());
105 private final Map
<String
, LocalRpcService
> serviceCache
=
106 new ConcurrentHashMap
<String
, LocalRpcService
>();
108 private final Map
<String
, Method
> methodCache
= new ConcurrentHashMap
<String
, Method
>();
109 final Map
<Method
, LatencySimulator
> latencySimulatorCache
=
110 new ConcurrentHashMap
<Method
, LatencySimulator
>();
112 private final Map
<String
, String
> properties
= new HashMap
<String
, String
>();
114 private final ExecutorService apiExecutor
= Executors
.newCachedThreadPool(
115 new DaemonThreadFactory(Executors
.defaultThreadFactory()));
117 private final LocalServiceContext context
;
119 private Clock clock
= Clock
.DEFAULT
;
/**
 * Creates the local proxy in a given context.
 *
 * @param environment the local server environment, wrapped in a new
 *     {@code LocalServiceContextImpl} that becomes this proxy's service context.
 */
protected ApiProxyLocalImpl(LocalServerEnvironment environment) {
  LocalServiceContext serviceContext = new LocalServiceContextImpl(environment);
  this.context = serviceContext;
}
131 public void log(Environment environment
, LogRecord record
) {
132 logger
.log(toJavaLevel(record
.getLevel()), record
.getMessage());
136 public void flushLogs(Environment environment
) {
141 public byte[] makeSyncCall(ApiProxy
.Environment environment
, String packageName
,
142 String methodName
, byte[] requestBytes
) {
143 ApiProxy
.ApiConfig apiConfig
= null;
144 Double deadline
= (Double
) environment
.getAttributes().get(API_DEADLINE_KEY
);
145 if (deadline
!= null) {
146 apiConfig
= new ApiProxy
.ApiConfig();
147 apiConfig
.setDeadlineInSeconds(deadline
);
150 Future
<byte[]> future
=
151 doAsyncCall(environment
, packageName
, methodName
, requestBytes
, apiConfig
);
154 } catch (InterruptedException ex
) {
155 throw new ApiProxy
.CancelledException(packageName
, methodName
);
156 } catch (CancellationException ex
) {
157 throw new ApiProxy
.CancelledException(packageName
, methodName
);
158 } catch (ExecutionException ex
) {
159 if (ex
.getCause() instanceof RuntimeException
) {
160 throw (RuntimeException
) ex
.getCause();
161 } else if (ex
.getCause() instanceof Error
) {
162 throw (Error
) ex
.getCause();
164 throw new ApiProxy
.UnknownException(packageName
, methodName
, ex
.getCause());
170 public Future
<byte[]> makeAsyncCall(Environment environment
, String packageName
,
171 String methodName
, byte[] requestBytes
, ApiProxy
.ApiConfig apiConfig
) {
172 return doAsyncCall(environment
, packageName
, methodName
, requestBytes
, apiConfig
);
176 public List
<Thread
> getRequestThreads(Environment environment
) {
177 return Arrays
.asList(new Thread
[]{Thread
.currentThread()});
180 private Future
<byte[]> doAsyncCall(Environment environment
, final String packageName
,
181 final String methodName
, byte[] requestBytes
, ApiProxy
.ApiConfig apiConfig
) {
182 Semaphore semaphore
= (Semaphore
) environment
.getAttributes().get(
183 LocalEnvironment
.API_CALL_SEMAPHORE
);
184 if (semaphore
!= null) {
187 } catch (InterruptedException ex
) {
188 throw new RuntimeException("Interrupted while waiting on semaphore:", ex
);
191 boolean offline
= environment
.getAttributes().get(IS_OFFLINE_REQUEST_KEY
) != null;
192 AsyncApiCall asyncApiCall
= new AsyncApiCall(
193 environment
, packageName
, methodName
, requestBytes
, semaphore
);
194 boolean success
= false;
196 Callable
<byte[]> callable
= Executors
.privilegedCallable(asyncApiCall
);
198 Future
<byte[]> resultFuture
= AccessController
.doPrivileged(
199 new PrivilegedApiAction(callable
, asyncApiCall
));
201 if (context
.getLocalServerEnvironment().enforceApiDeadlines()) {
202 long deadlineMillis
= (long) (1000.0 * resolveDeadline(packageName
, apiConfig
, offline
));
203 resultFuture
= new TimedFuture
<byte[]>(resultFuture
, deadlineMillis
, clock
) {
205 protected RuntimeException
createDeadlineException() {
206 throw new ApiProxy
.ApiDeadlineExceededException(packageName
, methodName
);
213 asyncApiCall
.tryReleaseSemaphore();
218 private double resolveDeadline(String packageName
, ApiProxy
.ApiConfig apiConfig
,
220 LocalRpcService service
= getService(packageName
);
221 Double deadline
= null;
222 if (apiConfig
!= null) {
223 deadline
= apiConfig
.getDeadlineInSeconds();
225 if (deadline
== null && service
!= null) {
226 deadline
= service
.getDefaultDeadline(isOffline
);
228 if (deadline
== null) {
232 Double maxDeadline
= null;
233 if (service
!= null) {
234 maxDeadline
= service
.getMaximumDeadline(isOffline
);
236 if (maxDeadline
== null) {
239 return Math
.min(deadline
, maxDeadline
);
242 private class PrivilegedApiAction
implements PrivilegedAction
<Future
<byte[]>> {
244 private final Callable
<byte[]> callable
;
245 private final AsyncApiCall asyncApiCall
;
247 PrivilegedApiAction(Callable
<byte[]> callable
, AsyncApiCall asyncApiCall
) {
248 this.callable
= callable
;
249 this.asyncApiCall
= asyncApiCall
;
253 public Future
<byte[]> run() {
254 final Future
<byte[]> result
= apiExecutor
.submit(callable
);
255 return new Future
<byte[]>() {
257 public boolean cancel(final boolean mayInterruptIfRunning
) {
258 return AccessController
.doPrivileged(
259 new PrivilegedAction
<Boolean
>() {
261 public Boolean
run() {
262 asyncApiCall
.tryReleaseSemaphore();
263 return result
.cancel(mayInterruptIfRunning
);
269 public boolean isCancelled() {
270 return result
.isCancelled();
274 public boolean isDone() {
275 return result
.isDone();
279 public byte[] get() throws InterruptedException
, ExecutionException
{
284 public byte[] get(long timeout
, TimeUnit unit
)
285 throws InterruptedException
, ExecutionException
, TimeoutException
{
286 return result
.get(timeout
, unit
);
293 * Convert the specified byte array to a protocol buffer representation of the specified type.
294 * This type can either be a subclass of {@link ProtocolMessage} (a legacy protocol buffer
295 * implementation), or {@link Message} (the open-sourced protocol buffer implementation).
297 private <T
> T
convertBytesToPb(byte[] bytes
, Class
<T
> requestClass
)
298 throws IllegalAccessException
, InstantiationException
,
299 InvocationTargetException
, NoSuchMethodException
{
300 if (ProtocolMessage
.class.isAssignableFrom(requestClass
)) {
301 ProtocolMessage
<?
> proto
= (ProtocolMessage
<?
>) requestClass
.newInstance();
302 proto
.mergeFrom(bytes
);
303 return requestClass
.cast(proto
);
305 if (Message
.class.isAssignableFrom(requestClass
)) {
306 Method method
= requestClass
.getMethod("parseFrom", BYTE_ARRAY_CLASS
);
307 return requestClass
.cast(method
.invoke(null, bytes
));
309 throw new UnsupportedOperationException(format("Cannot assign %s to either %s or %s",
310 classDescription(requestClass
), ProtocolMessage
.class, Message
.class));
314 * Convert the protocol buffer representation to a byte array. The object can either be an
315 * instance of {@link ProtocolMessage} (a legacy protocol buffer implementation), or {@link
316 * Message} (the open-sourced protocol buffer implementation).
318 private byte[] convertPbToBytes(Object object
) {
319 if (object
instanceof ProtocolMessage
) {
320 return ((ProtocolMessage
<?
>) object
).toByteArray();
322 if (object
instanceof Message
) {
323 return ((Message
) object
).toByteArray();
325 throw new UnsupportedOperationException(format("%s is neither %s nor %s",
326 classDescription(object
.getClass()), ProtocolMessage
.class, Message
.class));
330 * Create a textual description of a class that is appropriate for
331 * troubleshooting problems with {@link #convertBytesToPb(byte[], Class)}
332 * or {@link #convertPbToBytes(Object)}.
334 * @param klass The class to create a description for.
335 * @return A string description.
337 private static String
classDescription(Class
<?
> klass
) {
338 return format("(%s extends %s loaded from %s)",
339 klass
, klass
.getSuperclass(),
340 klass
.getProtectionDomain().getCodeSource().getLocation());
344 * A helper utility that abstracts the ceremony of using a one time
345 * formatter. The arguments are passed unmodified to Formatter.format()
346 * and the return is the unmodified Formatter.toString();
348 private static String
format(String format
, Object
... args
) {
349 return new Formatter().format(format
, args
).toString();
353 public void setProperty(String property
, String value
) {
354 properties
.put(property
, value
);
358 * Resets the service properties to {@code properties}.
360 * @param properties a maybe {@code null} set of properties for local services.
363 public void setProperties(Map
<String
, String
> properties
) {
364 this.properties
.clear();
365 if (properties
!= null) {
366 this.appendProperties(properties
);
371 * Appends the given service properties to {@code properties}.
373 * @param properties a set of properties to append for local services.
376 public void appendProperties(Map
<String
, String
> properties
) {
377 this.properties
.putAll(properties
);
381 * Stops all services started by this ApiProxy and releases all of its resources.
385 for (LocalRpcService service
: serviceCache
.values()) {
389 serviceCache
.clear();
391 latencySimulatorCache
.clear();
395 int getMaxApiRequestSize(LocalRpcService rpcService
) {
396 Integer size
= rpcService
.getMaxApiRequestSize();
398 return MAX_API_REQUEST_SIZE
;
403 private Method
getDispatchMethod(LocalRpcService service
, String packageName
, String methodName
) {
404 String dispatchName
= Character
.toLowerCase(methodName
.charAt(0)) + methodName
.substring(1);
405 String methodId
= packageName
+ "." + dispatchName
;
406 Method method
= methodCache
.get(methodId
);
407 if (method
!= null) {
410 for (Method candidate
: service
.getClass().getMethods()) {
411 if (dispatchName
.equals(candidate
.getName())) {
412 methodCache
.put(methodId
, candidate
);
413 LatencyPercentiles latencyPercentiles
= candidate
.getAnnotation(LatencyPercentiles
.class);
414 if (latencyPercentiles
== null) {
416 latencyPercentiles
= service
.getClass().getAnnotation(LatencyPercentiles
.class);
418 if (latencyPercentiles
!= null) {
419 latencySimulatorCache
.put(candidate
, new LatencySimulator(latencyPercentiles
));
424 throw new CallNotFoundException(packageName
, methodName
);
427 private class AsyncApiCall
implements Callable
<byte[]> {
429 private final Environment environment
;
430 private final String packageName
;
431 private final String methodName
;
432 private final byte[] requestBytes
;
433 private final Semaphore semaphore
;
434 private boolean released
;
436 public AsyncApiCall(Environment environment
, String packageName
, String methodName
,
437 byte[] requestBytes
, Semaphore semaphore
) {
438 this.environment
= environment
;
439 this.packageName
= packageName
;
440 this.methodName
= methodName
;
441 this.requestBytes
= requestBytes
;
442 this.semaphore
= semaphore
;
446 public byte[] call() {
448 return callInternal();
450 tryReleaseSemaphore();
454 private byte[] callInternal() {
455 ApiProxy
.setEnvironmentForCurrentThread(environment
);
457 LocalRpcService service
= getService(packageName
);
458 if (service
== null) {
459 throw new CallNotFoundException(packageName
, methodName
);
461 LocalCapabilitiesEnvironment capEnv
= context
.getLocalCapabilitiesEnvironment();
462 CapabilityStatus capabilityStatus
= capEnv
463 .getStatusFromMethodName(packageName
, methodName
);
464 if (!CapabilityStatus
.ENABLED
.equals(capabilityStatus
)) {
465 throw new ApiProxy
.CapabilityDisabledException(
466 "Setup in local configuration.", packageName
, methodName
);
469 if (requestBytes
.length
> getMaxApiRequestSize(service
)) {
470 throw new RequestTooLargeException(packageName
, methodName
);
473 Method method
= getDispatchMethod(service
, packageName
, methodName
);
474 Status status
= new Status();
475 Class
<?
> requestClass
= method
.getParameterTypes()[1];
476 Object request
= convertBytesToPb(requestBytes
, requestClass
);
478 long start
= clock
.getCurrentTime();
480 return convertPbToBytes(method
.invoke(service
, status
, request
));
482 LatencySimulator latencySimulator
= latencySimulatorCache
.get(method
);
483 if (latencySimulator
!= null) {
484 if (context
.getLocalServerEnvironment().simulateProductionLatencies()) {
485 latencySimulator
.simulateLatency(clock
.getCurrentTime() - start
, service
, request
);
489 } catch (IllegalAccessException e
) {
490 throw new UnknownException(packageName
, methodName
, e
);
491 } catch (InstantiationException e
) {
492 throw new UnknownException(packageName
, methodName
, e
);
493 } catch (NoSuchMethodException e
) {
494 throw new UnknownException(packageName
, methodName
, e
);
495 } catch (InvocationTargetException e
) {
496 if (e
.getCause() instanceof RuntimeException
) {
497 throw (RuntimeException
) e
.getCause();
499 throw new UnknownException(packageName
, methodName
, e
.getCause());
501 ApiProxy
.clearEnvironmentForCurrentThread();
506 * Synchronized method that ensures the semaphore that was claimed for this API call only gets
509 synchronized void tryReleaseSemaphore() {
510 if (!released
&& semaphore
!= null) {
518 * Method needs to be synchronized to ensure that we don't end up starting multiple instances of
519 * the same service. As an example, we've seen a race condition where the local datastore service
520 * has not yet been initialized and two datastore requests come in at the exact same time. The
521 * first request looks in the service cache, doesn't find it, starts a new local datastore
522 * service, registers it in the service cache, and uses that local datastore service to handle the
523 * first request. Meanwhile the second request looks in the service cache, doesn't find it,
524 * starts a new local datastore service, registers it in the service cache (stomping on the
525 * original one), and uses that local datastore service to handle the second request. If both of
526 * these requests start txns we can end up with 2 requests receiving the same txn id, and that
527 * yields all sorts of exciting behavior. So, we synchronize this method to ensure that we only
528 * register a single instance of each service type.
531 public synchronized final LocalRpcService
getService(final String pkg
) {
532 LocalRpcService cachedService
= serviceCache
.get(pkg
);
533 if (cachedService
!= null) {
534 return cachedService
;
537 return AccessController
.doPrivileged(
538 new PrivilegedAction
<LocalRpcService
>() {
540 public LocalRpcService
run() {
541 return startServices(pkg
);
546 private LocalRpcService
startServices(String pkg
) {
547 @SuppressWarnings({"unchecked", "sunapi"})
548 Iterator
<LocalRpcService
> services
= sun
.misc
.Service
.providers(LocalRpcService
.class,
549 ApiProxyLocalImpl
.class.getClassLoader());
550 while (services
.hasNext()) {
551 LocalRpcService service
= services
.next();
552 if (service
.getPackage().equals(pkg
)) {
553 service
.init(context
, properties
);
555 serviceCache
.put(pkg
, service
);
562 private static Level
toJavaLevel(ApiProxy
.LogRecord
.Level apiProxyLevel
) {
563 switch (apiProxyLevel
) {
569 return Level
.WARNING
;
575 return Level
.WARNING
;
580 public Clock
getClock() {
585 public void setClock(Clock clock
) {
589 private static class DaemonThreadFactory
implements ThreadFactory
{
591 private final ThreadFactory parent
;
593 public DaemonThreadFactory(ThreadFactory parent
) {
594 this.parent
= parent
;
598 public Thread
newThread(Runnable r
) {
599 Thread thread
= parent
.newThread(r
);
600 thread
.setDaemon(true);