1 // Copyright 2009 Google Inc. All Rights Reserved.
2 package com
.google
.appengine
.tools
.development
.testing
;
4 import com
.google
.appengine
.api
.NamespaceManager
;
5 import com
.google
.appengine
.api
.taskqueue
.DeferredTask
;
6 import com
.google
.appengine
.api
.taskqueue
.DeferredTaskContext
;
7 import com
.google
.appengine
.api
.taskqueue
.TaskOptions
;
8 import com
.google
.appengine
.api
.taskqueue
.dev
.LocalTaskQueue
;
9 import com
.google
.appengine
.api
.taskqueue
.dev
.LocalTaskQueueCallback
;
10 import com
.google
.appengine
.api
.urlfetch
.URLFetchServicePb
;
11 import com
.google
.appengine
.api
.urlfetch
.URLFetchServicePb
.URLFetchRequest
.Header
;
12 import com
.google
.appengine
.tools
.development
.ApiProxyLocal
;
13 import com
.google
.common
.collect
.Maps
;
14 import com
.google
.protobuf
.ByteString
;
16 import java
.io
.ByteArrayInputStream
;
17 import java
.io
.ObjectInputStream
;
18 import java
.io
.UnsupportedEncodingException
;
20 import java
.net
.URLDecoder
;
22 import java
.util
.Map
.Entry
;
23 import java
.util
.concurrent
.CountDownLatch
;
24 import java
.util
.concurrent
.TimeUnit
;
25 import java
.util
.logging
.Level
;
26 import java
.util
.logging
.Logger
;
28 import javax
.servlet
.http
.HttpServlet
;
29 import javax
.servlet
.http
.HttpServletResponse
;
/**
 * Config for accessing the local task queue in tests. Default behavior is to
 * configure the local task queue to not automatically execute any tasks.
 * {@link #tearDown()} wipes out all in-memory state so all queues are empty at
 * the end of every test. LocalTaskQueue configuration is not restored:
 * {@link #tearDown()} does not restore default configuration values modified
 * using:
 * <ul>
 * <li>{@link #setDisableAutoTaskExecution(boolean)}</li>
 * <li>{@link #setQueueXmlPath(String)}</li>
 * <li>{@link #setCallbackClass(Class)}</li>
 * <li>{@link #setShouldCopyApiProxyEnvironment(boolean)}</li>
 * <li>{@link #setTaskExecutionLatch(CountDownLatch)}</li>
 * </ul>
 */
47 public final class LocalTaskQueueTestConfig
implements LocalServiceTestConfig
{
48 private static final Logger logger
= Logger
.getLogger(LocalTaskQueueTestConfig
.class.getName());
49 private Boolean disableAutoTaskExecution
= true;
50 private String queueXmlPath
;
51 private Class
<?
extends LocalTaskQueueCallback
> callbackClass
;
52 private boolean shouldCopyApiProxyEnvironment
= false;
53 private CountDownLatch taskExecutionLatch
;
56 * Disables/enables automatic task execution. If you enable automatic task
57 * execution, keep in mind that the default behavior is to hit the url that
58 * was provided when the {@link TaskOptions} was constructed. If you do not
59 * have a servlet engine running, this will fail. As an alternative to
60 * launching a servlet engine, instead consider providing a
61 * {@link LocalTaskQueueCallback} via {@link #setCallbackClass(Class)} so that
62 * you can assert on the properties of the URLFetchServicePb.URLFetchRequest.
64 * Once set, this value is persistent across tests. If this value needs to be
65 * set for any one test, it should be appropriately configured in the setup
66 * stage for all tests.
68 * @param disableAutoTaskExecution
69 * @return {@code this} (for chaining)
71 public LocalTaskQueueTestConfig
setDisableAutoTaskExecution(boolean disableAutoTaskExecution
) {
72 this.disableAutoTaskExecution
= disableAutoTaskExecution
;
77 * Overrides the location of queue.xml. Must be a full path, e.g.
78 * /usr/local/dev/myapp/test/queue.xml
80 * Once set, this value is persistent across tests. If this value needs to be
81 * set for an operation specific to any one test, it should appropriately
82 * configured in the setup stage for all tests.
85 * @return {@code this} (for chaining)
87 public LocalTaskQueueTestConfig
setQueueXmlPath(String queueXmlPath
) {
88 this.queueXmlPath
= queueXmlPath
;
93 * Overrides the callback implementation used by the local task queue for
94 * async task execution.
96 * Once set, this value is persistent across tests. If this value needs to be
97 * set for any one test, it should be appropriately configured in the setup
98 * stage for all tests.
100 * @param callbackClass fully-qualified name of a class with a public, default
101 * constructor that implements {@link LocalTaskQueueCallback}.
102 * @return {@code this} (for chaining)
104 public LocalTaskQueueTestConfig
setCallbackClass(
105 Class
<?
extends LocalTaskQueueCallback
> callbackClass
) {
106 this.callbackClass
= callbackClass
;
111 * Enables copying of the {@code ApiProxy.Environment} to task handler
112 * threads. This setting is ignored unless both
114 * <li>a {@link #setCallbackClass(Class) callback} class has been set, and
115 * <li>automatic task execution has been
116 * {@link #setDisableAutoTaskExecution(boolean) enabled.}
118 * In this case tasks will be handled locally by new threads and it may be
119 * useful for those threads to use the same environment data as the main test
120 * thread. Properties such as the
121 * {@link LocalServiceTestHelper#setEnvAppId(String) appID}, and the user
122 * {@link LocalServiceTestHelper#setEnvEmail(String) email} will be copied
123 * into the environment of the task threads. Be aware that
124 * {@link LocalServiceTestHelper#setEnvAttributes(java.util.Map) attribute
125 * map} will be shallow-copied to the task thread environents, so that any
126 * mutable objects used as values of the map should be thread safe. If this
127 * property is {@code false} then the task handler threads will have an empty
128 * {@code ApiProxy.Environment}. This property is {@code false} by default.
130 * Once set, this value is persistent across tests. If this value needs to be
131 * set for any one test, it should be appropriately configured in the setup
132 * stage for all tests.
134 * @param b should the {@code ApiProxy.Environment} be pushed to task handler
136 * @return {@code this} (for chaining)
138 public LocalTaskQueueTestConfig
setShouldCopyApiProxyEnvironment(boolean b
) {
139 this.shouldCopyApiProxyEnvironment
= b
;
144 * Sets a {@link CountDownLatch} that the thread executing the task will
145 * decrement after a {@link LocalTaskQueueCallback} finishes execution. This
146 * makes it easy for tests to block until a task queue task runs. Note that
147 * the latch is only used when a callback class is provided (via
148 * {@link #setCallbackClass(Class)}) and when automatic task execution is
149 * enabled (via {@link #setDisableAutoTaskExecution(boolean)}). Also note
150 * that a {@link CountDownLatch} cannot be reused, so if you have a test that
151 * requires the ability to "reset" a CountDownLatch you can pass an instance
152 * of {@link TaskCountDownLatch}, which exposes additional methods that help
155 * Once set, this value is persistent across tests. If this value needs to be
156 * set for any one test, it should be appropriately configured in the setup
157 * stage for all tests.
159 * @param latch The latch.
160 * @return {@code this} (for chaining)
162 public LocalTaskQueueTestConfig
setTaskExecutionLatch(CountDownLatch latch
) {
163 this.taskExecutionLatch
= latch
;
168 public void setUp() {
169 ApiProxyLocal proxy
= LocalServiceTestHelper
.getApiProxyLocal();
171 LocalTaskQueue
.DISABLE_AUTO_TASK_EXEC_PROP
, disableAutoTaskExecution
.toString());
172 if (queueXmlPath
!= null) {
173 proxy
.setProperty(LocalTaskQueue
.QUEUE_XML_PATH_PROP
, queueXmlPath
);
175 if (callbackClass
!= null) {
177 if (!disableAutoTaskExecution
) {
178 EnvSettingTaskqueueCallback
.setProxyProperties(
179 proxy
, callbackClass
, shouldCopyApiProxyEnvironment
);
180 if (taskExecutionLatch
!= null) {
181 EnvSettingTaskqueueCallback
.setTaskExecutionLatch(taskExecutionLatch
);
183 callbackName
= EnvSettingTaskqueueCallback
.class.getName();
185 callbackName
= callbackClass
.getName();
187 proxy
.setProperty(LocalTaskQueue
.CALLBACK_CLASS_PROP
, callbackName
);
192 public void tearDown() {
193 LocalTaskQueue ltq
= getLocalTaskQueue();
195 for (String queueName
: ltq
.getQueueStateInfo().keySet()) {
196 ltq
.flushQueue(queueName
);
202 public static LocalTaskQueue
getLocalTaskQueue() {
203 return (LocalTaskQueue
) LocalServiceTestHelper
.getLocalService(LocalTaskQueue
.PACKAGE
);
207 * A {@link LocalTaskQueueCallback} implementation that automatically detects
208 * and runs tasks with a {@link DeferredTask} payload.
210 * Requests with a payload that is not a {@link DeferredTask} are dispatched
211 * to {@link #executeNonDeferredRequest}, which by default does nothing.
212 * If you need to handle a payload like this you can extend the class and
213 * override this method to do what you need.
215 public static class DeferredTaskCallback
implements LocalTaskQueueCallback
{
216 private static final String CURRENT_NAMESPACE_HEADER
= "X-AppEngine-Current-Namespace";
219 public void initialize(Map
<String
, String
> properties
) {
223 public int execute(URLFetchServicePb
.URLFetchRequest req
) {
224 String currentNamespace
= NamespaceManager
.get();
225 String requestNamespace
= null;
226 ByteString payload
= null;
227 for (URLFetchServicePb
.URLFetchRequest
.Header header
: req
.getHeaderList()) {
228 if (header
.getKey().equals("content-type") &&
229 DeferredTaskContext
.RUNNABLE_TASK_CONTENT_TYPE
.equals(header
.getValue())) {
230 payload
= req
.getPayload();
231 } else if (CURRENT_NAMESPACE_HEADER
.equals(header
.getKey())) {
232 requestNamespace
= header
.getValue();
235 boolean namespacesDiffer
=
236 requestNamespace
!= null && !requestNamespace
.equals(currentNamespace
);
237 if (namespacesDiffer
) {
238 NamespaceManager
.set(requestNamespace
);
242 if (payload
!= null) {
243 ByteArrayInputStream bais
= new ByteArrayInputStream(payload
.toByteArray());
244 ObjectInputStream ois
;
246 ois
= new ObjectInputStream(bais
);
247 DeferredTask deferredTask
= (DeferredTask
) ois
.readObject();
250 } catch (Exception e
) {
251 logger
.log(Level
.WARNING
, e
.getMessage(), e
);
255 return executeNonDeferredRequest(req
);
257 if (namespacesDiffer
) {
258 NamespaceManager
.set(currentNamespace
);
264 * Broken out to make it easy for subclasses to provide their own behavior
265 * when the request payload is not a {@link DeferredTask}.
267 protected int executeNonDeferredRequest(URLFetchServicePb
.URLFetchRequest req
) {
273 * A class to delegate incoming task queue callbacks to HttpServlets based on a provided mapping.
275 public abstract static class ServletInvokingTaskCallback
extends DeferredTaskCallback
{
278 public void initialize(Map
<String
, String
> properties
) {
282 * @return A mapping from url path to HttpServlet. Where url path is a string that looks like
283 * "/foo/bar" (It must start with a '/' and should not contain characters that are not
284 * allowed in the path portion of a url.)
286 protected abstract Map
<String
, ?
extends HttpServlet
> getServletMap();
289 * @return A servlet that will be used if none of the ones from {@link #getServletMap()} match.
291 protected abstract HttpServlet
getDefaultServlet();
293 private static Map
<String
, String
> extractParamValues(final String body
) {
294 Map
<String
, String
> params
= Maps
.newHashMap();
295 if (body
.length() > 0) {
296 for (String keyValue
: body
.split("&")) {
297 String
[] split
= keyValue
.split("=");
299 params
.put(split
[0], URLDecoder
.decode(split
[1], "utf-8"));
300 } catch (UnsupportedEncodingException e
) {
301 throw new RuntimeException("Could not decode param " + split
[1]);
309 protected int executeNonDeferredRequest(URLFetchServicePb
.URLFetchRequest req
) {
311 FakeHttpServletResponse response
= new FakeHttpServletResponse();
312 response
.setCharacterEncoding("utf-8");
314 URL url
= new URL(req
.getUrl());
315 FakeHttpServletRequest request
= new FakeHttpServletRequest();
316 request
.setMethod(req
.getMethod().name());
317 request
.setHostName(url
.getHost());
318 request
.setPort(url
.getPort());
319 request
.setParametersFromQueryString(url
.getQuery());
321 for (Header header
: req
.getHeaderList()) {
322 request
.setHeader(header
.getKey(), header
.getValue());
325 String payload
= req
.getPayload().toStringUtf8();
326 for (Map
.Entry
<String
, String
> entry
: extractParamValues(payload
).entrySet()) {
327 request
.addParameter(entry
.getKey(), entry
.getValue());
329 String servletPath
= null;
330 HttpServlet servlet
= null;
331 for (Entry
<String
, ?
extends HttpServlet
> entry
: getServletMap().entrySet()) {
332 if (url
.getPath().startsWith(entry
.getKey())) {
333 servletPath
= entry
.getKey();
334 servlet
= entry
.getValue();
337 if (servlet
== null) {
338 servlet
= getDefaultServlet();
339 request
.setPathInfo(url
.getPath());
341 int servletPathStart
= servletPath
.lastIndexOf('/');
342 if (servletPathStart
== -1) {
343 throw new IllegalArgumentException("The servlet path was configured as: "
344 + servletPath
+ " which does not contan a '/'");
346 request
.setContextPath(servletPath
.substring(0, servletPathStart
));
347 request
.setSerletPath(servletPath
.substring(servletPathStart
));
348 request
.setPathInfo(url
.getPath().substring(servletPath
.length()));
350 servlet
.service(request
, response
);
351 int result
= response
.getStatus();
353 } catch (Exception ex
) {
354 logger
.log(Level
.WARNING
, ex
.getMessage(), ex
);
355 return HttpServletResponse
.SC_INTERNAL_SERVER_ERROR
;
361 * A {@link CountDownLatch} extension that can be reset. Pass an instance of
362 * this class to {@link LocalTaskQueueTestConfig#setTaskExecutionLatch)} when
363 * you need to reuse the latch within or across tests. Only one thread at a
364 * time should ever call any of the {@link #await} or {@link #reset} methods.
366 public static final class TaskCountDownLatch
extends CountDownLatch
{
367 private int initialCount
;
368 private CountDownLatch latch
;
370 public TaskCountDownLatch(int count
) {
376 public long getCount() {
377 return latch
.getCount();
381 public String
toString() {
382 return latch
.toString();
388 * Only one thread at a time should call this.
390 public void await() throws InterruptedException
{
397 * Only one thread at a time should call this.
399 public boolean await(long timeout
, TimeUnit unit
) throws InterruptedException
{
400 return latch
.await(timeout
, unit
);
404 public void countDown() {
409 * Shorthand for calling {@link #await()} followed by {@link #reset()}.
410 * Only one thread at a time should call this.
412 public void awaitAndReset() throws InterruptedException
{
413 awaitAndReset(initialCount
);
417 * Shorthand for calling {@link #await()} followed by {@link #reset(int)}.
418 * Only one thread at a time should call this.
420 public void awaitAndReset(int count
) throws InterruptedException
{
426 * Shorthand for calling {@link #await(long, java.util.concurrent.TimeUnit)} followed by
427 * {@link #reset()}. Only one thread at a time should call this.
429 public boolean awaitAndReset(long timeout
, TimeUnit unit
)
430 throws InterruptedException
{
431 return awaitAndReset(timeout
, unit
, initialCount
);
435 * Shorthand for calling {@link #await(long, java.util.concurrent.TimeUnit)} followed by
436 * {@link #reset(int)}. Only one thread at a time should call this.
438 public boolean awaitAndReset(long timeout
, TimeUnit unit
, int count
)
439 throws InterruptedException
{
440 boolean result
= await(timeout
, unit
);
446 * Resets the latch to its most recent initial count. Only one thread at a
447 * time should call this.
449 public void reset() {
454 * Resets the latch to the provided count. Only one thread at a time
457 public void reset(int count
) {
458 this.initialCount
= count
;
459 this.latch
= new CountDownLatch(count
);