1.9.22 release.
[gae.git] / java / src / main / com / google / appengine / tools / development / testing / LocalTaskQueueTestConfig.java
blob9bc25b05cb9d20500f59770d8b4903d143ca81bb
1 // Copyright 2009 Google Inc. All Rights Reserved.
2 package com.google.appengine.tools.development.testing;
4 import com.google.appengine.api.NamespaceManager;
5 import com.google.appengine.api.taskqueue.DeferredTask;
6 import com.google.appengine.api.taskqueue.DeferredTaskContext;
7 import com.google.appengine.api.taskqueue.TaskOptions;
8 import com.google.appengine.api.taskqueue.dev.LocalTaskQueue;
9 import com.google.appengine.api.taskqueue.dev.LocalTaskQueueCallback;
10 import com.google.appengine.api.urlfetch.URLFetchServicePb;
11 import com.google.appengine.api.urlfetch.URLFetchServicePb.URLFetchRequest.Header;
12 import com.google.appengine.tools.development.ApiProxyLocal;
13 import com.google.common.collect.Maps;
14 import com.google.protobuf.ByteString;
16 import java.io.ByteArrayInputStream;
17 import java.io.ObjectInputStream;
18 import java.io.UnsupportedEncodingException;
19 import java.net.URL;
20 import java.net.URLDecoder;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.TimeUnit;
25 import java.util.logging.Level;
26 import java.util.logging.Logger;
28 import javax.servlet.http.HttpServlet;
29 import javax.servlet.http.HttpServletResponse;
/**
 * Config for accessing the local task queue in tests. Default behavior is to
 * configure the local task queue to not automatically execute any tasks.
 * {@link #tearDown()} wipes out all in-memory state so all queues are empty at
 * the end of every test. LocalTaskQueue configuration is not restored, however:
 * {@link #tearDown()} does not restore default configuration values modified
 * using:
 * <ul>
 * <li>{@link #setDisableAutoTaskExecution(boolean)}</li>
 * <li>{@link #setQueueXmlPath(String)}</li>
 * <li>{@link #setCallbackClass(Class)}</li>
 * <li>{@link #setShouldCopyApiProxyEnvironment(boolean)}</li>
 * <li>{@link #setTaskExecutionLatch(CountDownLatch)}</li>
 * </ul>
 */
47 public final class LocalTaskQueueTestConfig implements LocalServiceTestConfig {
48 private static final Logger logger = Logger.getLogger(LocalTaskQueueTestConfig.class.getName());
49 private Boolean disableAutoTaskExecution = true;
50 private String queueXmlPath;
51 private Class<? extends LocalTaskQueueCallback> callbackClass;
52 private boolean shouldCopyApiProxyEnvironment = false;
53 private CountDownLatch taskExecutionLatch;
55 /**
56 * Disables/enables automatic task execution. If you enable automatic task
57 * execution, keep in mind that the default behavior is to hit the url that
58 * was provided when the {@link TaskOptions} was constructed. If you do not
59 * have a servlet engine running, this will fail. As an alternative to
60 * launching a servlet engine, instead consider providing a
61 * {@link LocalTaskQueueCallback} via {@link #setCallbackClass(Class)} so that
62 * you can assert on the properties of the URLFetchServicePb.URLFetchRequest.
64 * Once set, this value is persistent across tests. If this value needs to be
65 * set for any one test, it should be appropriately configured in the setup
66 * stage for all tests.
68 * @param disableAutoTaskExecution
69 * @return {@code this} (for chaining)
71 public LocalTaskQueueTestConfig setDisableAutoTaskExecution(boolean disableAutoTaskExecution) {
72 this.disableAutoTaskExecution = disableAutoTaskExecution;
73 return this;
76 /**
77 * Overrides the location of queue.xml. Must be a full path, e.g.
78 * /usr/local/dev/myapp/test/queue.xml
80 * Once set, this value is persistent across tests. If this value needs to be
81 * set for an operation specific to any one test, it should appropriately
82 * configured in the setup stage for all tests.
84 * @param queueXmlPath
85 * @return {@code this} (for chaining)
87 public LocalTaskQueueTestConfig setQueueXmlPath(String queueXmlPath) {
88 this.queueXmlPath = queueXmlPath;
89 return this;
92 /**
93 * Overrides the callback implementation used by the local task queue for
94 * async task execution.
96 * Once set, this value is persistent across tests. If this value needs to be
97 * set for any one test, it should be appropriately configured in the setup
98 * stage for all tests.
100 * @param callbackClass fully-qualified name of a class with a public, default
101 * constructor that implements {@link LocalTaskQueueCallback}.
102 * @return {@code this} (for chaining)
104 public LocalTaskQueueTestConfig setCallbackClass(
105 Class<? extends LocalTaskQueueCallback> callbackClass) {
106 this.callbackClass = callbackClass;
107 return this;
111 * Enables copying of the {@code ApiProxy.Environment} to task handler
112 * threads. This setting is ignored unless both
113 * <ol>
114 * <li>a {@link #setCallbackClass(Class) callback} class has been set, and
115 * <li>automatic task execution has been
116 * {@link #setDisableAutoTaskExecution(boolean) enabled.}
117 * </ol>
118 * In this case tasks will be handled locally by new threads and it may be
119 * useful for those threads to use the same environment data as the main test
120 * thread. Properties such as the
121 * {@link LocalServiceTestHelper#setEnvAppId(String) appID}, and the user
122 * {@link LocalServiceTestHelper#setEnvEmail(String) email} will be copied
123 * into the environment of the task threads. Be aware that
124 * {@link LocalServiceTestHelper#setEnvAttributes(java.util.Map) attribute
125 * map} will be shallow-copied to the task thread environents, so that any
126 * mutable objects used as values of the map should be thread safe. If this
127 * property is {@code false} then the task handler threads will have an empty
128 * {@code ApiProxy.Environment}. This property is {@code false} by default.
130 * Once set, this value is persistent across tests. If this value needs to be
131 * set for any one test, it should be appropriately configured in the setup
132 * stage for all tests.
134 * @param b should the {@code ApiProxy.Environment} be pushed to task handler
135 * threads
136 * @return {@code this} (for chaining)
138 public LocalTaskQueueTestConfig setShouldCopyApiProxyEnvironment(boolean b) {
139 this.shouldCopyApiProxyEnvironment = b;
140 return this;
144 * Sets a {@link CountDownLatch} that the thread executing the task will
145 * decrement after a {@link LocalTaskQueueCallback} finishes execution. This
146 * makes it easy for tests to block until a task queue task runs. Note that
147 * the latch is only used when a callback class is provided (via
148 * {@link #setCallbackClass(Class)}) and when automatic task execution is
149 * enabled (via {@link #setDisableAutoTaskExecution(boolean)}). Also note
150 * that a {@link CountDownLatch} cannot be reused, so if you have a test that
151 * requires the ability to "reset" a CountDownLatch you can pass an instance
152 * of {@link TaskCountDownLatch}, which exposes additional methods that help
153 * with this.
155 * Once set, this value is persistent across tests. If this value needs to be
156 * set for any one test, it should be appropriately configured in the setup
157 * stage for all tests.
159 * @param latch The latch.
160 * @return {@code this} (for chaining)
162 public LocalTaskQueueTestConfig setTaskExecutionLatch(CountDownLatch latch) {
163 this.taskExecutionLatch = latch;
164 return this;
167 @Override
168 public void setUp() {
169 ApiProxyLocal proxy = LocalServiceTestHelper.getApiProxyLocal();
170 proxy.setProperty(
171 LocalTaskQueue.DISABLE_AUTO_TASK_EXEC_PROP, disableAutoTaskExecution.toString());
172 if (queueXmlPath != null) {
173 proxy.setProperty(LocalTaskQueue.QUEUE_XML_PATH_PROP, queueXmlPath);
175 if (callbackClass != null) {
176 String callbackName;
177 if (!disableAutoTaskExecution) {
178 EnvSettingTaskqueueCallback.setProxyProperties(
179 proxy, callbackClass, shouldCopyApiProxyEnvironment);
180 if (taskExecutionLatch != null) {
181 EnvSettingTaskqueueCallback.setTaskExecutionLatch(taskExecutionLatch);
183 callbackName = EnvSettingTaskqueueCallback.class.getName();
184 } else {
185 callbackName = callbackClass.getName();
187 proxy.setProperty(LocalTaskQueue.CALLBACK_CLASS_PROP, callbackName);
191 @Override
192 public void tearDown() {
193 LocalTaskQueue ltq = getLocalTaskQueue();
194 if (ltq != null) {
195 for (String queueName : ltq.getQueueStateInfo().keySet()) {
196 ltq.flushQueue(queueName);
198 ltq.stop();
202 public static LocalTaskQueue getLocalTaskQueue() {
203 return (LocalTaskQueue) LocalServiceTestHelper.getLocalService(LocalTaskQueue.PACKAGE);
207 * A {@link LocalTaskQueueCallback} implementation that automatically detects
208 * and runs tasks with a {@link DeferredTask} payload.
210 * Requests with a payload that is not a {@link DeferredTask} are dispatched
211 * to {@link #executeNonDeferredRequest}, which by default does nothing.
212 * If you need to handle a payload like this you can extend the class and
213 * override this method to do what you need.
215 public static class DeferredTaskCallback implements LocalTaskQueueCallback {
216 private static final String CURRENT_NAMESPACE_HEADER = "X-AppEngine-Current-Namespace";
218 @Override
219 public void initialize(Map<String, String> properties) {
222 @Override
223 public int execute(URLFetchServicePb.URLFetchRequest req) {
224 String currentNamespace = NamespaceManager.get();
225 String requestNamespace = null;
226 ByteString payload = null;
227 for (URLFetchServicePb.URLFetchRequest.Header header : req.getHeaderList()) {
228 if (header.getKey().equals("content-type") &&
229 DeferredTaskContext.RUNNABLE_TASK_CONTENT_TYPE.equals(header.getValue())) {
230 payload = req.getPayload();
231 } else if (CURRENT_NAMESPACE_HEADER.equals(header.getKey())) {
232 requestNamespace = header.getValue();
235 boolean namespacesDiffer =
236 requestNamespace != null && !requestNamespace.equals(currentNamespace);
237 if (namespacesDiffer) {
238 NamespaceManager.set(requestNamespace);
241 try {
242 if (payload != null) {
243 ByteArrayInputStream bais = new ByteArrayInputStream(payload.toByteArray());
244 ObjectInputStream ois;
245 try {
246 ois = new ObjectInputStream(bais);
247 DeferredTask deferredTask = (DeferredTask) ois.readObject();
248 deferredTask.run();
249 return 200;
250 } catch (Exception e) {
251 logger.log(Level.WARNING, e.getMessage(), e);
252 return 500;
255 return executeNonDeferredRequest(req);
256 } finally {
257 if (namespacesDiffer) {
258 NamespaceManager.set(currentNamespace);
264 * Broken out to make it easy for subclasses to provide their own behavior
265 * when the request payload is not a {@link DeferredTask}.
267 protected int executeNonDeferredRequest(URLFetchServicePb.URLFetchRequest req) {
268 return 200;
273 * A class to delegate incoming task queue callbacks to HttpServlets based on a provided mapping.
275 public abstract static class ServletInvokingTaskCallback extends DeferredTaskCallback {
277 @Override
278 public void initialize(Map<String, String> properties) {
282 * @return A mapping from url path to HttpServlet. Where url path is a string that looks like
283 * "/foo/bar" (It must start with a '/' and should not contain characters that are not
284 * allowed in the path portion of a url.)
286 protected abstract Map<String, ? extends HttpServlet> getServletMap();
289 * @return A servlet that will be used if none of the ones from {@link #getServletMap()} match.
291 protected abstract HttpServlet getDefaultServlet();
293 private static Map<String, String> extractParamValues(final String body) {
294 Map<String, String> params = Maps.newHashMap();
295 if (body.length() > 0) {
296 for (String keyValue : body.split("&")) {
297 String[] split = keyValue.split("=");
298 try {
299 params.put(split[0], URLDecoder.decode(split[1], "utf-8"));
300 } catch (UnsupportedEncodingException e) {
301 throw new RuntimeException("Could not decode param " + split[1]);
305 return params;
308 @Override
309 protected int executeNonDeferredRequest(URLFetchServicePb.URLFetchRequest req) {
310 try {
311 FakeHttpServletResponse response = new FakeHttpServletResponse();
312 response.setCharacterEncoding("utf-8");
314 URL url = new URL(req.getUrl());
315 FakeHttpServletRequest request = new FakeHttpServletRequest();
316 request.setMethod(req.getMethod().name());
317 request.setHostName(url.getHost());
318 request.setPort(url.getPort());
319 request.setParametersFromQueryString(url.getQuery());
321 for (Header header : req.getHeaderList()) {
322 request.setHeader(header.getKey(), header.getValue());
325 String payload = req.getPayload().toStringUtf8();
326 for (Map.Entry<String, String> entry : extractParamValues(payload).entrySet()) {
327 request.addParameter(entry.getKey(), entry.getValue());
329 String servletPath = null;
330 HttpServlet servlet = null;
331 for (Entry<String, ? extends HttpServlet> entry : getServletMap().entrySet()) {
332 if (url.getPath().startsWith(entry.getKey())) {
333 servletPath = entry.getKey();
334 servlet = entry.getValue();
337 if (servlet == null) {
338 servlet = getDefaultServlet();
339 request.setPathInfo(url.getPath());
340 } else {
341 int servletPathStart = servletPath.lastIndexOf('/');
342 if (servletPathStart == -1) {
343 throw new IllegalArgumentException("The servlet path was configured as: "
344 + servletPath + " which does not contan a '/'");
346 request.setContextPath(servletPath.substring(0, servletPathStart));
347 request.setSerletPath(servletPath.substring(servletPathStart));
348 request.setPathInfo(url.getPath().substring(servletPath.length()));
350 servlet.service(request, response);
351 int result = response.getStatus();
352 return result;
353 } catch (Exception ex) {
354 logger.log(Level.WARNING, ex.getMessage(), ex);
355 return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
361 * A {@link CountDownLatch} extension that can be reset. Pass an instance of
362 * this class to {@link LocalTaskQueueTestConfig#setTaskExecutionLatch)} when
363 * you need to reuse the latch within or across tests. Only one thread at a
364 * time should ever call any of the {@link #await} or {@link #reset} methods.
366 public static final class TaskCountDownLatch extends CountDownLatch {
367 private int initialCount;
368 private CountDownLatch latch;
370 public TaskCountDownLatch(int count) {
371 super(count);
372 reset(count);
375 @Override
376 public long getCount() {
377 return latch.getCount();
380 @Override
381 public String toString() {
382 return latch.toString();
385 @Override
387 * {@inheritDoc}
388 * Only one thread at a time should call this.
390 public void await() throws InterruptedException {
391 latch.await();
394 @Override
396 * {@inheritDoc}
397 * Only one thread at a time should call this.
399 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
400 return latch.await(timeout, unit);
403 @Override
404 public void countDown() {
405 latch.countDown();
409 * Shorthand for calling {@link #await()} followed by {@link #reset()}.
410 * Only one thread at a time should call this.
412 public void awaitAndReset() throws InterruptedException {
413 awaitAndReset(initialCount);
417 * Shorthand for calling {@link #await()} followed by {@link #reset(int)}.
418 * Only one thread at a time should call this.
420 public void awaitAndReset(int count) throws InterruptedException {
421 await();
422 reset(count);
426 * Shorthand for calling {@link #await(long, java.util.concurrent.TimeUnit)} followed by
427 * {@link #reset()}. Only one thread at a time should call this.
429 public boolean awaitAndReset(long timeout, TimeUnit unit)
430 throws InterruptedException {
431 return awaitAndReset(timeout, unit, initialCount);
435 * Shorthand for calling {@link #await(long, java.util.concurrent.TimeUnit)} followed by
436 * {@link #reset(int)}. Only one thread at a time should call this.
438 public boolean awaitAndReset(long timeout, TimeUnit unit, int count)
439 throws InterruptedException {
440 boolean result = await(timeout, unit);
441 reset(count);
442 return result;
446 * Resets the latch to its most recent initial count. Only one thread at a
447 * time should call this.
449 public void reset() {
450 reset(initialCount);
454 * Resets the latch to the provided count. Only one thread at a time
455 * should call this.
457 public void reset(int count) {
458 this.initialCount = count;
459 this.latch = new CountDownLatch(count);