1 // Copyright 2009 Google Inc. All Rights Reserved.
2 package com
.google
.appengine
.tools
.development
.testing
;
4 import com
.google
.appengine
.api
.NamespaceManager
;
5 import com
.google
.appengine
.api
.taskqueue
.DeferredTask
;
6 import com
.google
.appengine
.api
.taskqueue
.DeferredTaskContext
;
7 import com
.google
.appengine
.api
.taskqueue
.TaskOptions
;
8 import com
.google
.appengine
.api
.taskqueue
.dev
.LocalTaskQueue
;
9 import com
.google
.appengine
.api
.taskqueue
.dev
.LocalTaskQueueCallback
;
10 import com
.google
.appengine
.api
.urlfetch
.URLFetchServicePb
;
11 import com
.google
.appengine
.api
.urlfetch
.URLFetchServicePb
.URLFetchRequest
.Header
;
12 import com
.google
.appengine
.tools
.development
.ApiProxyLocal
;
13 import com
.google
.common
.collect
.Maps
;
14 import com
.google
.protobuf
.ByteString
;
16 import java
.io
.ByteArrayInputStream
;
17 import java
.io
.ObjectInputStream
;
18 import java
.io
.UnsupportedEncodingException
;
20 import java
.net
.URLDecoder
;
22 import java
.util
.Map
.Entry
;
23 import java
.util
.concurrent
.CountDownLatch
;
24 import java
.util
.concurrent
.TimeUnit
;
25 import java
.util
.logging
.Level
;
26 import java
.util
.logging
.Logger
;
28 import javax
.servlet
.http
.HttpServlet
;
29 import javax
.servlet
.http
.HttpServletResponse
;
32 * Config for accessing the local task queue in tests. Default behavior is to
33 * configure the local task queue to not automatically execute any tasks.
34 * {@link #tearDown()} wipes out all in-memory state so all queues are empty at
35 * the end of every test.
38 public final class LocalTaskQueueTestConfig
implements LocalServiceTestConfig
{
39 private static final Logger logger
= Logger
.getLogger(LocalTaskQueueTestConfig
.class.getName());
40 private Boolean disableAutoTaskExecution
= true;
41 private String queueXmlPath
;
42 private Class
<?
extends LocalTaskQueueCallback
> callbackClass
;
43 private boolean shouldCopyApiProxyEnvironment
= false;
44 private CountDownLatch taskExecutionLatch
;
47 * Disables/enables automatic task execution. If you enable automatic task
48 * execution, keep in mind that the default behavior is to hit the url that
49 * was provided when the {@link TaskOptions} was constructed. If you do not
50 * have a servlet engine running, this will fail. As an alternative to
51 * launching a servlet engine, instead consider providing a
52 * {@link LocalTaskQueueCallback} via {@link #setCallbackClass(Class)} so that
53 * you can assert on the properties of the URLFetchServicePb.URLFetchRequest.
55 * @param disableAutoTaskExecution
56 * @return {@code this} (for chaining)
58 public LocalTaskQueueTestConfig
setDisableAutoTaskExecution(boolean disableAutoTaskExecution
) {
59 this.disableAutoTaskExecution
= disableAutoTaskExecution
;
64 * Overrides the location of queue.xml. Must be a full path, e.g.
65 * /usr/local/dev/myapp/test/queue.xml
68 * @return {@code this} (for chaining)
70 public LocalTaskQueueTestConfig
setQueueXmlPath(String queueXmlPath
) {
71 this.queueXmlPath
= queueXmlPath
;
76 * Overrides the callback implementation used by the local task queue for
77 * async task execution.
79 * @param callbackClass fully-qualified name of a class with a public, default
80 * constructor that implements {@link LocalTaskQueueCallback}.
81 * @return {@code this} (for chaining)
83 public LocalTaskQueueTestConfig
setCallbackClass(
84 Class
<?
extends LocalTaskQueueCallback
> callbackClass
) {
85 this.callbackClass
= callbackClass
;
90 * Enables copying of the {@code ApiProxy.Environment} to task handler
91 * threads. This setting is ignored unless both
93 * <li>a {@link #setCallbackClass(Class) callback} class has been set, and
94 * <li>automatic task execution has been
95 * {@link #setDisableAutoTaskExecution(boolean) enabled.}
97 * In this case tasks will be handled locally by new threads and it may be
98 * useful for those threads to use the same environment data as the main test
99 * thread. Properties such as the
100 * {@link LocalServiceTestHelper#setEnvAppId(String) appID}, and the user
101 * {@link LocalServiceTestHelper#setEnvEmail(String) email} will be copied
102 * into the environment of the task threads. Be aware that
103 * {@link LocalServiceTestHelper#setEnvAttributes(java.util.Map) attribute
104 * map} will be shallow-copied to the task thread environents, so that any
105 * mutable objects used as values of the map should be thread safe. If this
106 * property is {@code false} then the task handler threads will have an empty
107 * {@code ApiProxy.Environment}. This property is {@code false} by default.
109 * @param b should the {@code ApiProxy.Environment} be pushed to task handler
111 * @return {@code this} (for chaining)
113 public LocalTaskQueueTestConfig
setShouldCopyApiProxyEnvironment(boolean b
) {
114 this.shouldCopyApiProxyEnvironment
= b
;
119 * Sets a {@link CountDownLatch} that the thread executing the task will
120 * decrement after a {@link LocalTaskQueueCallback} finishes execution. This
121 * makes it easy for tests to block until a task queue task runs. Note that
122 * the latch is only used when a callback class is provided (via
123 * {@link #setCallbackClass(Class)}) and when automatic task execution is
124 * enabled (via {@link #setDisableAutoTaskExecution(boolean)}). Also note
125 * that a {@link CountDownLatch} cannot be reused, so if you have a test that
126 * requires the ability to "reset" a CountDownLatch you can pass an instance
127 * of {@link TaskCountDownLatch}, which exposes additional methods that help
130 * @param latch The latch.
131 * @return {@code this} (for chaining)
133 public LocalTaskQueueTestConfig
setTaskExecutionLatch(CountDownLatch latch
) {
134 this.taskExecutionLatch
= latch
;
139 public void setUp() {
140 ApiProxyLocal proxy
= LocalServiceTestHelper
.getApiProxyLocal();
142 LocalTaskQueue
.DISABLE_AUTO_TASK_EXEC_PROP
, disableAutoTaskExecution
.toString());
143 if (queueXmlPath
!= null) {
144 proxy
.setProperty(LocalTaskQueue
.QUEUE_XML_PATH_PROP
, queueXmlPath
);
146 if (callbackClass
!= null) {
148 if (!disableAutoTaskExecution
) {
149 EnvSettingTaskqueueCallback
.setProxyProperties(
150 proxy
, callbackClass
, shouldCopyApiProxyEnvironment
);
151 if (taskExecutionLatch
!= null) {
152 EnvSettingTaskqueueCallback
.setTaskExecutionLatch(taskExecutionLatch
);
154 callbackName
= EnvSettingTaskqueueCallback
.class.getName();
156 callbackName
= callbackClass
.getName();
158 proxy
.setProperty(LocalTaskQueue
.CALLBACK_CLASS_PROP
, callbackName
);
163 public void tearDown() {
164 LocalTaskQueue ltq
= getLocalTaskQueue();
166 for (String queueName
: ltq
.getQueueStateInfo().keySet()) {
167 ltq
.flushQueue(queueName
);
173 public static LocalTaskQueue
getLocalTaskQueue() {
174 return (LocalTaskQueue
) LocalServiceTestHelper
.getLocalService(LocalTaskQueue
.PACKAGE
);
178 * A {@link LocalTaskQueueCallback} implementation that automatically detects
179 * and runs tasks with a {@link DeferredTask} payload.
181 * Requests with a payload that is not a {@link DeferredTask} are dispatched
182 * to {@link #executeNonDeferredRequest}, which by default does nothing.
183 * If you need to handle a payload like this you can extend the class and
184 * override this method to do what you need.
186 public static class DeferredTaskCallback
implements LocalTaskQueueCallback
{
187 private static final String CURRENT_NAMESPACE_HEADER
= "X-AppEngine-Current-Namespace";
190 public void initialize(Map
<String
, String
> properties
) {
194 public int execute(URLFetchServicePb
.URLFetchRequest req
) {
195 String currentNamespace
= NamespaceManager
.get();
196 String requestNamespace
= null;
197 ByteString payload
= null;
198 for (URLFetchServicePb
.URLFetchRequest
.Header header
: req
.getHeaderList()) {
199 if (header
.getKey().equals("content-type") &&
200 DeferredTaskContext
.RUNNABLE_TASK_CONTENT_TYPE
.equals(header
.getValue())) {
201 payload
= req
.getPayload();
202 } else if (CURRENT_NAMESPACE_HEADER
.equals(header
.getKey())) {
203 requestNamespace
= header
.getValue();
206 boolean namespacesDiffer
=
207 requestNamespace
!= null && !requestNamespace
.equals(currentNamespace
);
208 if (namespacesDiffer
) {
209 NamespaceManager
.set(requestNamespace
);
213 if (payload
!= null) {
214 ByteArrayInputStream bais
= new ByteArrayInputStream(payload
.toByteArray());
215 ObjectInputStream ois
;
217 ois
= new ObjectInputStream(bais
);
218 DeferredTask deferredTask
= (DeferredTask
) ois
.readObject();
221 } catch (Exception e
) {
222 logger
.log(Level
.WARNING
, e
.getMessage(), e
);
226 return executeNonDeferredRequest(req
);
228 if (namespacesDiffer
) {
229 NamespaceManager
.set(currentNamespace
);
235 * Broken out to make it easy for subclasses to provide their own behavior
236 * when the request payload is not a {@link DeferredTask}.
238 protected int executeNonDeferredRequest(URLFetchServicePb
.URLFetchRequest req
) {
244 * A class to delegate incoming task queue callbacks to HttpServlets based on a provided mapping.
246 public abstract static class ServletInvokingTaskCallback
extends DeferredTaskCallback
{
249 public void initialize(Map
<String
, String
> properties
) {
253 * @return A mapping from url path to HttpServlet. Where url path is a string that looks like
254 * "/foo/bar" (It must start with a '/' and should not contain characters that are not
255 * allowed in the path portion of a url.)
257 protected abstract Map
<String
, ?
extends HttpServlet
> getServletMap();
260 * @return A servlet that will be used if none of the ones from {@link #getServletMap()} match.
262 protected abstract HttpServlet
getDefaultServlet();
264 private static Map
<String
, String
> extractParamValues(final String body
) {
265 Map
<String
, String
> params
= Maps
.newHashMap();
266 if (body
.length() > 0) {
267 for (String keyValue
: body
.split("&")) {
268 String
[] split
= keyValue
.split("=");
270 params
.put(split
[0], URLDecoder
.decode(split
[1], "utf-8"));
271 } catch (UnsupportedEncodingException e
) {
272 throw new RuntimeException("Could not decode param " + split
[1]);
280 protected int executeNonDeferredRequest(URLFetchServicePb
.URLFetchRequest req
) {
282 FakeHttpServletResponse response
= new FakeHttpServletResponse();
283 response
.setCharacterEncoding("utf-8");
285 URL url
= new URL(req
.getUrl());
286 FakeHttpServletRequest request
= new FakeHttpServletRequest();
287 request
.setMethod(req
.getMethod().name());
288 request
.setHostName(url
.getHost());
289 request
.setPort(url
.getPort());
290 request
.setParametersFromQueryString(url
.getQuery());
292 for (Header header
: req
.getHeaderList()) {
293 request
.setHeader(header
.getKey(), header
.getValue());
296 String payload
= req
.getPayload().toStringUtf8();
297 for (Map
.Entry
<String
, String
> entry
: extractParamValues(payload
).entrySet()) {
298 request
.addParameter(entry
.getKey(), entry
.getValue());
300 String servletPath
= null;
301 HttpServlet servlet
= null;
302 for (Entry
<String
, ?
extends HttpServlet
> entry
: getServletMap().entrySet()) {
303 if (url
.getPath().startsWith(entry
.getKey())) {
304 servletPath
= entry
.getKey();
305 servlet
= entry
.getValue();
308 if (servlet
== null) {
309 servlet
= getDefaultServlet();
310 request
.setPathInfo(url
.getPath());
312 int servletPathStart
= servletPath
.lastIndexOf('/');
313 if (servletPathStart
== -1) {
314 throw new IllegalArgumentException("The servlet path was configured as: "
315 + servletPath
+ " which does not contan a '/'");
317 request
.setContextPath(servletPath
.substring(0, servletPathStart
));
318 request
.setSerletPath(servletPath
.substring(servletPathStart
));
319 request
.setPathInfo(url
.getPath().substring(servletPath
.length()));
321 servlet
.service(request
, response
);
322 int result
= response
.getStatus();
324 } catch (Exception ex
) {
325 logger
.log(Level
.WARNING
, ex
.getMessage(), ex
);
326 return HttpServletResponse
.SC_INTERNAL_SERVER_ERROR
;
332 * A {@link CountDownLatch} extension that can be reset. Pass an instance of
333 * this class to {@link LocalTaskQueueTestConfig#setTaskExecutionLatch)} when
334 * you need to reuse the latch within or across tests. Only one thread at a
335 * time should ever call any of the {@link #await} or {@link #reset} methods.
337 public static final class TaskCountDownLatch
extends CountDownLatch
{
338 private int initialCount
;
339 private CountDownLatch latch
;
341 public TaskCountDownLatch(int count
) {
347 public long getCount() {
348 return latch
.getCount();
352 public String
toString() {
353 return latch
.toString();
359 * Only one thread at a time should call this.
361 public void await() throws InterruptedException
{
368 * Only one thread at a time should call this.
370 public boolean await(long timeout
, TimeUnit unit
) throws InterruptedException
{
371 return latch
.await(timeout
, unit
);
375 public void countDown() {
380 * Shorthand for calling {@link #await()} followed by {@link #reset()}.
381 * Only one thread at a time should call this.
383 public void awaitAndReset() throws InterruptedException
{
384 awaitAndReset(initialCount
);
388 * Shorthand for calling {@link #await()} followed by {@link #reset(int)}.
389 * Only one thread at a time should call this.
391 public void awaitAndReset(int count
) throws InterruptedException
{
397 * Shorthand for calling {@link #await(long, java.util.concurrent.TimeUnit)} followed by
398 * {@link #reset()}. Only one thread at a time should call this.
400 public boolean awaitAndReset(long timeout
, TimeUnit unit
)
401 throws InterruptedException
{
402 return awaitAndReset(timeout
, unit
, initialCount
);
406 * Shorthand for calling {@link #await(long, java.util.concurrent.TimeUnit)} followed by
407 * {@link #reset(int)}. Only one thread at a time should call this.
409 public boolean awaitAndReset(long timeout
, TimeUnit unit
, int count
)
410 throws InterruptedException
{
411 boolean result
= await(timeout
, unit
);
417 * Resets the latch to its most recent initial count. Only one thread at a
418 * time should call this.
420 public void reset() {
425 * Resets the latch to the provided count. Only one thread at a time
428 public void reset(int count
) {
429 this.initialCount
= count
;
430 this.latch
= new CountDownLatch(count
);