Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / tools / development / testing / LocalTaskQueueTestConfig.java
blob52b715e31da51946d68fef03a8bc57606d152dc7
1 // Copyright 2009 Google Inc. All Rights Reserved.
2 package com.google.appengine.tools.development.testing;
4 import com.google.appengine.api.NamespaceManager;
5 import com.google.appengine.api.taskqueue.DeferredTask;
6 import com.google.appengine.api.taskqueue.DeferredTaskContext;
7 import com.google.appengine.api.taskqueue.TaskOptions;
8 import com.google.appengine.api.taskqueue.dev.LocalTaskQueue;
9 import com.google.appengine.api.taskqueue.dev.LocalTaskQueueCallback;
10 import com.google.appengine.api.urlfetch.URLFetchServicePb;
11 import com.google.appengine.api.urlfetch.URLFetchServicePb.URLFetchRequest.Header;
12 import com.google.appengine.tools.development.ApiProxyLocal;
13 import com.google.common.collect.Maps;
14 import com.google.protobuf.ByteString;
16 import java.io.ByteArrayInputStream;
17 import java.io.ObjectInputStream;
18 import java.io.UnsupportedEncodingException;
19 import java.net.URL;
20 import java.net.URLDecoder;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.TimeUnit;
25 import java.util.logging.Level;
26 import java.util.logging.Logger;
28 import javax.servlet.http.HttpServlet;
29 import javax.servlet.http.HttpServletResponse;
/**
 * Config for accessing the local task queue in tests. Default behavior is to
 * configure the local task queue to not automatically execute any tasks.
 * {@link #tearDown()} wipes out all in-memory state so all queues are empty at
 * the end of every test.
 */
38 public final class LocalTaskQueueTestConfig implements LocalServiceTestConfig {
39 private static final Logger logger = Logger.getLogger(LocalTaskQueueTestConfig.class.getName());
40 private Boolean disableAutoTaskExecution = true;
41 private String queueXmlPath;
42 private Class<? extends LocalTaskQueueCallback> callbackClass;
43 private boolean shouldCopyApiProxyEnvironment = false;
44 private CountDownLatch taskExecutionLatch;
46 /**
47 * Disables/enables automatic task execution. If you enable automatic task
48 * execution, keep in mind that the default behavior is to hit the url that
49 * was provided when the {@link TaskOptions} was constructed. If you do not
50 * have a servlet engine running, this will fail. As an alternative to
51 * launching a servlet engine, instead consider providing a
52 * {@link LocalTaskQueueCallback} via {@link #setCallbackClass(Class)} so that
53 * you can assert on the properties of the URLFetchServicePb.URLFetchRequest.
55 * @param disableAutoTaskExecution
56 * @return {@code this} (for chaining)
58 public LocalTaskQueueTestConfig setDisableAutoTaskExecution(boolean disableAutoTaskExecution) {
59 this.disableAutoTaskExecution = disableAutoTaskExecution;
60 return this;
63 /**
64 * Overrides the location of queue.xml. Must be a full path, e.g.
65 * /usr/local/dev/myapp/test/queue.xml
67 * @param queueXmlPath
68 * @return {@code this} (for chaining)
70 public LocalTaskQueueTestConfig setQueueXmlPath(String queueXmlPath) {
71 this.queueXmlPath = queueXmlPath;
72 return this;
75 /**
76 * Overrides the callback implementation used by the local task queue for
77 * async task execution.
79 * @param callbackClass fully-qualified name of a class with a public, default
80 * constructor that implements {@link LocalTaskQueueCallback}.
81 * @return {@code this} (for chaining)
83 public LocalTaskQueueTestConfig setCallbackClass(
84 Class<? extends LocalTaskQueueCallback> callbackClass) {
85 this.callbackClass = callbackClass;
86 return this;
89 /**
90 * Enables copying of the {@code ApiProxy.Environment} to task handler
91 * threads. This setting is ignored unless both
92 * <ol>
93 * <li>a {@link #setCallbackClass(Class) callback} class has been set, and
94 * <li>automatic task execution has been
95 * {@link #setDisableAutoTaskExecution(boolean) enabled.}
96 * </ol>
97 * In this case tasks will be handled locally by new threads and it may be
98 * useful for those threads to use the same environment data as the main test
99 * thread. Properties such as the
100 * {@link LocalServiceTestHelper#setEnvAppId(String) appID}, and the user
101 * {@link LocalServiceTestHelper#setEnvEmail(String) email} will be copied
102 * into the environment of the task threads. Be aware that
103 * {@link LocalServiceTestHelper#setEnvAttributes(java.util.Map) attribute
104 * map} will be shallow-copied to the task thread environents, so that any
105 * mutable objects used as values of the map should be thread safe. If this
106 * property is {@code false} then the task handler threads will have an empty
107 * {@code ApiProxy.Environment}. This property is {@code false} by default.
109 * @param b should the {@code ApiProxy.Environment} be pushed to task handler
110 * threads
111 * @return {@code this} (for chaining)
113 public LocalTaskQueueTestConfig setShouldCopyApiProxyEnvironment(boolean b) {
114 this.shouldCopyApiProxyEnvironment = b;
115 return this;
119 * Sets a {@link CountDownLatch} that the thread executing the task will
120 * decrement after a {@link LocalTaskQueueCallback} finishes execution. This
121 * makes it easy for tests to block until a task queue task runs. Note that
122 * the latch is only used when a callback class is provided (via
123 * {@link #setCallbackClass(Class)}) and when automatic task execution is
124 * enabled (via {@link #setDisableAutoTaskExecution(boolean)}). Also note
125 * that a {@link CountDownLatch} cannot be reused, so if you have a test that
126 * requires the ability to "reset" a CountDownLatch you can pass an instance
127 * of {@link TaskCountDownLatch}, which exposes additional methods that help
128 * with this.
130 * @param latch The latch.
131 * @return {@code this} (for chaining)
133 public LocalTaskQueueTestConfig setTaskExecutionLatch(CountDownLatch latch) {
134 this.taskExecutionLatch = latch;
135 return this;
138 @Override
139 public void setUp() {
140 ApiProxyLocal proxy = LocalServiceTestHelper.getApiProxyLocal();
141 proxy.setProperty(
142 LocalTaskQueue.DISABLE_AUTO_TASK_EXEC_PROP, disableAutoTaskExecution.toString());
143 if (queueXmlPath != null) {
144 proxy.setProperty(LocalTaskQueue.QUEUE_XML_PATH_PROP, queueXmlPath);
146 if (callbackClass != null) {
147 String callbackName;
148 if (!disableAutoTaskExecution) {
149 EnvSettingTaskqueueCallback.setProxyProperties(
150 proxy, callbackClass, shouldCopyApiProxyEnvironment);
151 if (taskExecutionLatch != null) {
152 EnvSettingTaskqueueCallback.setTaskExecutionLatch(taskExecutionLatch);
154 callbackName = EnvSettingTaskqueueCallback.class.getName();
155 } else {
156 callbackName = callbackClass.getName();
158 proxy.setProperty(LocalTaskQueue.CALLBACK_CLASS_PROP, callbackName);
162 @Override
163 public void tearDown() {
164 LocalTaskQueue ltq = getLocalTaskQueue();
165 if (ltq != null) {
166 for (String queueName : ltq.getQueueStateInfo().keySet()) {
167 ltq.flushQueue(queueName);
169 ltq.stop();
173 public static LocalTaskQueue getLocalTaskQueue() {
174 return (LocalTaskQueue) LocalServiceTestHelper.getLocalService(LocalTaskQueue.PACKAGE);
178 * A {@link LocalTaskQueueCallback} implementation that automatically detects
179 * and runs tasks with a {@link DeferredTask} payload.
181 * Requests with a payload that is not a {@link DeferredTask} are dispatched
182 * to {@link #executeNonDeferredRequest}, which by default does nothing.
183 * If you need to handle a payload like this you can extend the class and
184 * override this method to do what you need.
186 public static class DeferredTaskCallback implements LocalTaskQueueCallback {
187 private static final String CURRENT_NAMESPACE_HEADER = "X-AppEngine-Current-Namespace";
189 @Override
190 public void initialize(Map<String, String> properties) {
193 @Override
194 public int execute(URLFetchServicePb.URLFetchRequest req) {
195 String currentNamespace = NamespaceManager.get();
196 String requestNamespace = null;
197 ByteString payload = null;
198 for (URLFetchServicePb.URLFetchRequest.Header header : req.getHeaderList()) {
199 if (header.getKey().equals("content-type") &&
200 DeferredTaskContext.RUNNABLE_TASK_CONTENT_TYPE.equals(header.getValue())) {
201 payload = req.getPayload();
202 } else if (CURRENT_NAMESPACE_HEADER.equals(header.getKey())) {
203 requestNamespace = header.getValue();
206 boolean namespacesDiffer =
207 requestNamespace != null && !requestNamespace.equals(currentNamespace);
208 if (namespacesDiffer) {
209 NamespaceManager.set(requestNamespace);
212 try {
213 if (payload != null) {
214 ByteArrayInputStream bais = new ByteArrayInputStream(payload.toByteArray());
215 ObjectInputStream ois;
216 try {
217 ois = new ObjectInputStream(bais);
218 DeferredTask deferredTask = (DeferredTask) ois.readObject();
219 deferredTask.run();
220 return 200;
221 } catch (Exception e) {
222 logger.log(Level.WARNING, e.getMessage(), e);
223 return 500;
226 return executeNonDeferredRequest(req);
227 } finally {
228 if (namespacesDiffer) {
229 NamespaceManager.set(currentNamespace);
235 * Broken out to make it easy for subclasses to provide their own behavior
236 * when the request payload is not a {@link DeferredTask}.
238 protected int executeNonDeferredRequest(URLFetchServicePb.URLFetchRequest req) {
239 return 200;
244 * A class to delegate incoming task queue callbacks to HttpServlets based on a provided mapping.
246 public abstract static class ServletInvokingTaskCallback extends DeferredTaskCallback {
248 @Override
249 public void initialize(Map<String, String> properties) {
253 * @return A mapping from url path to HttpServlet. Where url path is a string that looks like
254 * "/foo/bar" (It must start with a '/' and should not contain characters that are not
255 * allowed in the path portion of a url.)
257 protected abstract Map<String, ? extends HttpServlet> getServletMap();
260 * @return A servlet that will be used if none of the ones from {@link #getServletMap()} match.
262 protected abstract HttpServlet getDefaultServlet();
264 private static Map<String, String> extractParamValues(final String body) {
265 Map<String, String> params = Maps.newHashMap();
266 if (body.length() > 0) {
267 for (String keyValue : body.split("&")) {
268 String[] split = keyValue.split("=");
269 try {
270 params.put(split[0], URLDecoder.decode(split[1], "utf-8"));
271 } catch (UnsupportedEncodingException e) {
272 throw new RuntimeException("Could not decode param " + split[1]);
276 return params;
279 @Override
280 protected int executeNonDeferredRequest(URLFetchServicePb.URLFetchRequest req) {
281 try {
282 FakeHttpServletResponse response = new FakeHttpServletResponse();
283 response.setCharacterEncoding("utf-8");
285 URL url = new URL(req.getUrl());
286 FakeHttpServletRequest request = new FakeHttpServletRequest();
287 request.setMethod(req.getMethod().name());
288 request.setHostName(url.getHost());
289 request.setPort(url.getPort());
290 request.setParametersFromQueryString(url.getQuery());
292 for (Header header : req.getHeaderList()) {
293 request.setHeader(header.getKey(), header.getValue());
296 String payload = req.getPayload().toStringUtf8();
297 for (Map.Entry<String, String> entry : extractParamValues(payload).entrySet()) {
298 request.addParameter(entry.getKey(), entry.getValue());
300 String servletPath = null;
301 HttpServlet servlet = null;
302 for (Entry<String, ? extends HttpServlet> entry : getServletMap().entrySet()) {
303 if (url.getPath().startsWith(entry.getKey())) {
304 servletPath = entry.getKey();
305 servlet = entry.getValue();
308 if (servlet == null) {
309 servlet = getDefaultServlet();
310 request.setPathInfo(url.getPath());
311 } else {
312 int servletPathStart = servletPath.lastIndexOf('/');
313 if (servletPathStart == -1) {
314 throw new IllegalArgumentException("The servlet path was configured as: "
315 + servletPath + " which does not contan a '/'");
317 request.setContextPath(servletPath.substring(0, servletPathStart));
318 request.setSerletPath(servletPath.substring(servletPathStart));
319 request.setPathInfo(url.getPath().substring(servletPath.length()));
321 servlet.service(request, response);
322 int result = response.getStatus();
323 return result;
324 } catch (Exception ex) {
325 logger.log(Level.WARNING, ex.getMessage(), ex);
326 return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
332 * A {@link CountDownLatch} extension that can be reset. Pass an instance of
333 * this class to {@link LocalTaskQueueTestConfig#setTaskExecutionLatch)} when
334 * you need to reuse the latch within or across tests. Only one thread at a
335 * time should ever call any of the {@link #await} or {@link #reset} methods.
337 public static final class TaskCountDownLatch extends CountDownLatch {
338 private int initialCount;
339 private CountDownLatch latch;
341 public TaskCountDownLatch(int count) {
342 super(count);
343 reset(count);
346 @Override
347 public long getCount() {
348 return latch.getCount();
351 @Override
352 public String toString() {
353 return latch.toString();
356 @Override
358 * {@inheritDoc}
359 * Only one thread at a time should call this.
361 public void await() throws InterruptedException {
362 latch.await();
365 @Override
367 * {@inheritDoc}
368 * Only one thread at a time should call this.
370 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
371 return latch.await(timeout, unit);
374 @Override
375 public void countDown() {
376 latch.countDown();
380 * Shorthand for calling {@link #await()} followed by {@link #reset()}.
381 * Only one thread at a time should call this.
383 public void awaitAndReset() throws InterruptedException {
384 awaitAndReset(initialCount);
388 * Shorthand for calling {@link #await()} followed by {@link #reset(int)}.
389 * Only one thread at a time should call this.
391 public void awaitAndReset(int count) throws InterruptedException {
392 await();
393 reset(count);
397 * Shorthand for calling {@link #await(long, java.util.concurrent.TimeUnit)} followed by
398 * {@link #reset()}. Only one thread at a time should call this.
400 public boolean awaitAndReset(long timeout, TimeUnit unit)
401 throws InterruptedException {
402 return awaitAndReset(timeout, unit, initialCount);
406 * Shorthand for calling {@link #await(long, java.util.concurrent.TimeUnit)} followed by
407 * {@link #reset(int)}. Only one thread at a time should call this.
409 public boolean awaitAndReset(long timeout, TimeUnit unit, int count)
410 throws InterruptedException {
411 boolean result = await(timeout, unit);
412 reset(count);
413 return result;
417 * Resets the latch to its most recent initial count. Only one thread at a
418 * time should call this.
420 public void reset() {
421 reset(initialCount);
425 * Resets the latch to the provided count. Only one thread at a time
426 * should call this.
428 public void reset(int count) {
429 this.initialCount = count;
430 this.latch = new CountDownLatch(count);