Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / taskqueue / QueueImpl.java
blobe196a7f0f65bbafa0ce84eec03c2c83c830e2a47
1 // Copyright 2010 Google Inc. All rights reserved.
2 package com.google.appengine.api.taskqueue;
4 import static com.google.appengine.api.taskqueue.QueueApiHelper.getInternal;
6 import com.google.appengine.api.NamespaceManager;
7 import com.google.appengine.api.datastore.DatastoreService;
8 import com.google.appengine.api.datastore.DatastoreServiceFactory;
9 import com.google.appengine.api.taskqueue.TaskOptions.Param;
10 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueAddRequest;
11 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueAddRequest.Header;
12 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueBulkAddRequest;
13 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueBulkAddResponse;
14 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueDeleteRequest;
15 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueDeleteResponse;
16 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueMode;
17 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueModifyTaskLeaseRequest;
18 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueModifyTaskLeaseResponse;
19 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueuePurgeQueueRequest;
20 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueuePurgeQueueResponse;
21 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueQueryAndOwnTasksRequest;
22 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueQueryAndOwnTasksResponse;
23 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueRetryParameters;
24 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueServiceError;
25 import com.google.apphosting.api.ApiProxy.ApiConfig;
26 import com.google.io.protocol.ProtocolMessage;
27 import java.io.UnsupportedEncodingException;
28 import java.net.URI;
29 import java.net.URISyntaxException;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.List;
36 import java.util.Map.Entry;
37 import java.util.Set;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.TimeUnit;
41 /**
42 * Implements the {@link Queue} interface.
43 * {@link QueueImpl} is thread safe.
46 class QueueImpl implements Queue {
47 private final String queueName;
48 private final DatastoreService datastoreService = DatastoreServiceFactory.getDatastoreService();
49 private final QueueApiHelper apiHelper;
51 /**
52 * The name of the HTTP header specifying the default namespace
53 * for API calls.
55 static final String DEFAULT_NAMESPACE_HEADER = "X-AppEngine-Default-Namespace";
56 static final String CURRENT_NAMESPACE_HEADER = "X-AppEngine-Current-Namespace";
58 static final double DEFAULT_LEASE_TASKS_DEADLINE_SECONDS = 10.0;
59 static final double DEFAULT_FETCH_STATISTICS_DEADLINE_SECONDS = 10.0;
61 QueueImpl(String queueName, QueueApiHelper apiHelper) {
62 QueueApiHelper.validateQueueName(queueName);
64 this.apiHelper = apiHelper;
65 this.queueName = queueName;
68 /**
69 * Transform a future returning a single-entry list into a future returning that entry.
70 * @param future A future whose result is a singleton list.
71 * @return A future whose result is the only element of the list.
73 private <T> Future<T> extractSingleEntry(Future<List<T>> future) {
74 return new FutureAdapter<List<T>, T>(future) {
75 @Override
76 protected T wrap(List<T> key) throws Exception {
77 if (key.size() != 1) {
78 throw new InternalFailureException(
79 "An internal error occurred while accessing queue '" + queueName + "'");
81 return key.get(0);
86 /**
87 * See {@link Queue#add()}
89 @Override
90 public TaskHandle add() {
91 return getInternal(addAsync());
94 /**
95 * See {@link Queue#addAsync()}
97 @Override
98 public Future<TaskHandle> addAsync() {
99 return addAsync(
100 getDatastoreService().getCurrentTransaction(null), TaskOptions.Builder.withDefaults());
104 * Returns a {@link URI} validated to only contain legal components.
105 * <p>The "scheme", "authority" and "fragment" components of a URI
106 * must not be specified. The path component must be absolute
107 * (i.e. start with "/").
109 * @param urlString The "url" specified by the client.
110 * @throws IllegalArgumentException The provided urlString is null, too long or does not have
111 * correct syntax.
113 private URI parsePartialUrl(String urlString) {
114 if (urlString == null) {
115 throw new IllegalArgumentException("url must not be null");
118 if (urlString.length() > QueueConstants.maxUrlLength()) {
119 throw new IllegalArgumentException(
120 "url is longer than " + ((Integer) QueueConstants.maxUrlLength()).toString() + ".");
123 URI uri;
124 try {
125 uri = new URI(urlString);
126 } catch (URISyntaxException exception) {
127 throw new IllegalArgumentException("URL syntax error", exception);
130 uriCheckNull(uri.getScheme(), "scheme");
131 uriCheckNull(uri.getRawAuthority(), "authority");
132 uriCheckNull(uri.getRawFragment(), "fragment");
133 String path = uri.getPath();
135 if (path == null || path.length() == 0 || path.charAt(0) != '/') {
136 if (path == null) {
137 path = "(null)";
138 } else if (path.length() == 0) {
139 path = "<empty string>";
141 throw new IllegalArgumentException(
142 "url must contain a path starting with '/' part - contains :" + path);
145 return uri;
148 private void uriCheckNull(String value, String valueName) {
149 if (value != null) {
150 throw new IllegalArgumentException(
151 "url must not contain a '" + valueName + "' part - contains :" + value);
155 private void checkPullTask(String url,
156 HashMap<String, List<String>> headers,
157 byte[] payload,
158 RetryOptions retryOptions) {
159 if (url != null && !url.isEmpty()) {
160 throw new IllegalArgumentException("May not specify url in tasks that have method PULL");
162 if (!headers.isEmpty()) {
163 throw new IllegalArgumentException(
164 "May not specify any header in tasks that have method PULL");
166 if (retryOptions != null) {
167 throw new IllegalArgumentException(
168 "May not specify retry options in tasks that have method PULL");
170 if (payload == null) {
171 throw new IllegalArgumentException("payload must be specified for tasks with method PULL");
175 private void checkPostTask(List<Param> params, byte[] payload, String query) {
176 if (query != null && query.length() != 0) {
177 throw new IllegalArgumentException(
178 "POST method may not have a query string; use setParamater(s) instead");
183 * Construct a byte array data from params if payload is not specified.
184 * If it sees payload is specified, return null.
185 * @throws IllegalArgumentException if params and payload both exist
187 private byte[] constructPayloadFromParams(List<Param> params, byte[] payload) {
188 if (!params.isEmpty() && payload != null) {
189 throw new IllegalArgumentException(
190 "Message body and parameters may not both be present; "
191 + "only one of these may be supplied");
193 return payload != null ? null : encodeParamsPost(params);
197 private void validateAndFillAddRequest(com.google.appengine.api.datastore.Transaction txn,
198 TaskOptions taskOptions,
199 TaskQueueAddRequest addRequest) {
200 boolean useUrlEncodedContentType = false;
202 HashMap<String, List<String>> headers = taskOptions.getHeaders();
203 String url = taskOptions.getUrl();
204 byte[] payload = taskOptions.getPayload();
205 List<Param> params = taskOptions.getParams();
206 RetryOptions retryOptions = taskOptions.getRetryOptions();
207 TaskOptions.Method method = taskOptions.getMethod();
209 URI parsedUrl;
210 if (url == null) {
211 parsedUrl = parsePartialUrl(defaultUrl());
212 } else {
213 parsedUrl = parsePartialUrl(url);
215 String query = parsedUrl.getQuery();
216 StringBuilder relativeUrl = new StringBuilder(parsedUrl.getRawPath());
217 if (query != null && query.length() != 0 && !params.isEmpty()) {
218 throw new IllegalArgumentException(
219 "Query string and parameters both present; only one of these may be supplied");
222 byte[] constructedPayload;
223 if (method == TaskOptions.Method.PULL) {
224 constructedPayload = constructPayloadFromParams(params, payload);
225 if (constructedPayload != null) {
226 payload = constructedPayload;
228 checkPullTask(url, headers, payload, retryOptions);
229 } else if (method == TaskOptions.Method.POST) {
230 constructedPayload = constructPayloadFromParams(params, payload);
231 if (constructedPayload != null) {
232 payload = constructedPayload;
233 useUrlEncodedContentType = true;
235 checkPostTask(params, payload, query);
236 } else {
237 if (!params.isEmpty()) {
238 query = encodeParamsUrlEncoded(params);
240 if (query != null && query.length() != 0) {
241 relativeUrl.append("?").append(query);
244 if (payload != null && payload.length != 0 && !taskOptions.getMethod().supportsBody()) {
245 throw new IllegalArgumentException(
246 taskOptions.getMethod().toString() + " method may not specify a payload.");
249 fillAddRequest(txn,
250 queueName,
251 taskOptions.getTaskName(),
252 determineEta(taskOptions),
253 method,
254 relativeUrl.toString(),
255 payload,
256 headers,
257 retryOptions,
258 useUrlEncodedContentType,
259 taskOptions.getTagAsBytes(),
260 addRequest);
263 private void fillAddRequest(com.google.appengine.api.datastore.Transaction txn,
264 String queueName,
265 String taskName,
266 long etaMillis,
267 TaskOptions.Method method,
268 String relativeUrl,
269 byte[] payload,
270 HashMap<String, List<String>> headers,
271 RetryOptions retryOptions,
272 boolean useUrlEncodedContentType,
273 byte[] tag,
274 TaskQueueAddRequest addRequest) {
275 addRequest.setQueueName(queueName);
276 addRequest.setTaskName(taskName == null ? "" : taskName);
278 if (method == TaskOptions.Method.PULL) {
279 addRequest.setMode(TaskQueueMode.Mode.PULL.getValue());
280 } else {
281 addRequest.setUrl(relativeUrl.toString());
282 addRequest.setMode(TaskQueueMode.Mode.PUSH.getValue());
283 addRequest.setMethod(method.getPbMethod());
286 if (payload != null) {
287 addRequest.setBodyAsBytes(payload);
290 addRequest.setEtaUsec(etaMillis * 1000);
292 if (taskName != null && !taskName.equals("") && txn != null) {
293 throw new IllegalArgumentException(
294 "transactional tasks cannot be named: " + taskName);
296 if (txn != null) {
297 addRequest.setTransaction(localTxnToRemoteTxn(txn));
300 if (retryOptions != null) {
301 fillRetryParameters(retryOptions, addRequest.getMutableRetryParameters());
304 if (NamespaceManager.getGoogleAppsNamespace().length() != 0) {
305 if (!headers.containsKey(DEFAULT_NAMESPACE_HEADER)) {
306 headers.put(DEFAULT_NAMESPACE_HEADER,
307 Arrays.asList(NamespaceManager.getGoogleAppsNamespace()));
310 if (!headers.containsKey(CURRENT_NAMESPACE_HEADER)) {
311 String namespace = NamespaceManager.get();
312 headers.put(CURRENT_NAMESPACE_HEADER, Arrays.asList(namespace == null ? "" : namespace));
314 for (Entry<String, List<String>> entry : headers.entrySet()) {
315 if (useUrlEncodedContentType && entry.getKey().toLowerCase().equals("content-type")) {
316 continue;
319 for (String value : entry.getValue()) {
320 Header header = addRequest.addHeader();
321 header.setKey(entry.getKey());
322 header.setValue(value);
325 if (useUrlEncodedContentType) {
326 Header contentTypeHeader = addRequest.addHeader();
327 contentTypeHeader.setKey("content-type");
328 contentTypeHeader.setValue("application/x-www-form-urlencoded");
331 if (tag != null) {
332 if (method != TaskOptions.Method.PULL) {
333 throw new IllegalArgumentException("Only PULL tasks can have a tag.");
335 if (tag.length > QueueConstants.maxTaskTagLength()) {
336 throw new IllegalArgumentException(
337 "Task tag must be no more than " + QueueConstants.maxTaskTagLength() + " bytes.");
339 addRequest.setTagAsBytes(tag);
342 if (method == TaskOptions.Method.PULL) {
343 if (addRequest.encodingSize() > QueueConstants.maxPullTaskSizeBytes()) {
344 throw new IllegalArgumentException("Task size too large");
346 } else {
347 if (addRequest.encodingSize() > QueueConstants.maxPushTaskSizeBytes()) {
348 throw new IllegalArgumentException("Task size too large");
354 * Translates a local transaction to the Datastore PB.
355 * Due to pb dependency issues, Transaction pb is redefined for TaskQueue.
356 * Keep in sync with DatastoreServiceImpl.localTxnToRemoteTxn.
358 private static Transaction localTxnToRemoteTxn(
359 com.google.appengine.api.datastore.Transaction local) {
360 Transaction remote = new Transaction();
361 remote.setApp(local.getApp());
362 remote.setHandle(Long.parseLong(local.getId()));
363 return remote;
367 * Translates from RetryOptions to TaskQueueRetryParameters.
368 * Also checks ensures minBackoffSeconds and maxBackoffSeconds are ordered
369 * correctly.
371 private static void fillRetryParameters(
372 RetryOptions retryOptions,
373 TaskQueueRetryParameters retryParameters) {
374 if (retryOptions.getTaskRetryLimit() != null) {
375 retryParameters.setRetryLimit(retryOptions.getTaskRetryLimit());
377 if (retryOptions.getTaskAgeLimitSeconds() != null) {
378 retryParameters.setAgeLimitSec(retryOptions.getTaskAgeLimitSeconds());
380 if (retryOptions.getMinBackoffSeconds() != null) {
381 retryParameters.setMinBackoffSec(retryOptions.getMinBackoffSeconds());
383 if (retryOptions.getMaxBackoffSeconds() != null) {
384 retryParameters.setMaxBackoffSec(retryOptions.getMaxBackoffSeconds());
386 if (retryOptions.getMaxDoublings() != null) {
387 retryParameters.setMaxDoublings(retryOptions.getMaxDoublings());
390 if (retryParameters.hasMinBackoffSec() && retryParameters.hasMaxBackoffSec()) {
391 if (retryParameters.getMinBackoffSec() > retryParameters.getMaxBackoffSec()) {
392 throw new IllegalArgumentException(
393 "minBackoffSeconds must not be greater than maxBackoffSeconds.");
395 } else if (retryParameters.hasMinBackoffSec()) {
396 if (retryParameters.getMinBackoffSec() > retryParameters.getMaxBackoffSec()) {
397 retryParameters.setMaxBackoffSec(retryParameters.getMinBackoffSec());
399 } else if (retryParameters.hasMaxBackoffSec()) {
400 if (retryParameters.getMinBackoffSec() > retryParameters.getMaxBackoffSec()) {
401 retryParameters.setMinBackoffSec(retryParameters.getMaxBackoffSec());
407 * See {@link Queue#add(TaskOptions)}.
409 @Override
410 public TaskHandle add(TaskOptions taskOptions) {
411 return getInternal(addAsync(taskOptions));
415 * See {@link Queue#addAsync(TaskOptions)}.
417 @Override
418 public Future<TaskHandle> addAsync(TaskOptions taskOptions) {
419 return addAsync(getDatastoreService().getCurrentTransaction(null), taskOptions);
423 * See {@link Queue#add(Iterable)}.
425 @Override
426 public List<TaskHandle> add(Iterable<TaskOptions> taskOptions) {
427 return getInternal(addAsync(taskOptions));
431 * See {@link Queue#addAsync(Iterable)}.
433 @Override
434 public Future<List<TaskHandle>> addAsync(Iterable<TaskOptions> taskOptions) {
435 return addAsync(getDatastoreService().getCurrentTransaction(null), taskOptions);
439 * See {@link Queue#add(com.google.appengine.api.datastore.Transaction, TaskOptions)}.
441 @Override
442 public TaskHandle add(com.google.appengine.api.datastore.Transaction txn,
443 TaskOptions taskOptions) {
444 return getInternal(addAsync(txn, taskOptions));
448 * See {@link Queue#addAsync(com.google.appengine.api.datastore.Transaction, TaskOptions)}.
450 @Override
451 public Future<TaskHandle> addAsync(
452 com.google.appengine.api.datastore.Transaction txn, TaskOptions taskOptions) {
453 return extractSingleEntry(addAsync(txn, Collections.singletonList(taskOptions)));
457 * See {@link
458 * Queue#add(com.google.appengine.api.datastore.Transaction, Iterable)}.
460 @Override
461 public List<TaskHandle> add(com.google.appengine.api.datastore.Transaction txn,
462 Iterable<TaskOptions> taskOptions) {
463 return getInternal(addAsync(txn, taskOptions));
467 * See {@link
468 * Queue#addAsync(com.google.appengine.api.datastore.Transaction, Iterable)}.
470 @Override
471 public Future<List<TaskHandle>> addAsync(
472 com.google.appengine.api.datastore.Transaction txn, Iterable<TaskOptions> taskOptions) {
473 final List<TaskOptions> taskOptionsList = new ArrayList<TaskOptions>();
474 Set<String> taskNames = new HashSet<String>();
476 final TaskQueueBulkAddRequest bulkAddRequest = new TaskQueueBulkAddRequest();
478 boolean hasPushTask = false;
479 boolean hasPullTask = false;
480 for (TaskOptions option : taskOptions) {
481 TaskQueueAddRequest addRequest = bulkAddRequest.addAddRequest();
482 validateAndFillAddRequest(txn, option, addRequest);
483 if (addRequest.getMode() == TaskQueueMode.Mode.PULL.getValue()) {
484 hasPullTask = true;
485 } else {
486 hasPushTask = true;
489 taskOptionsList.add(option);
490 if (option.getTaskName() != null && !option.getTaskName().equals("")) {
491 if (!taskNames.add(option.getTaskName())) {
492 throw new IllegalArgumentException(
493 String.format("Identical task names in request : \"%s\" duplicated",
494 option.getTaskName()));
498 if (bulkAddRequest.addRequestSize() > QueueConstants.maxTasksPerAdd()) {
499 throw new IllegalArgumentException(
500 String.format("No more than %d tasks can be added in a single add call",
501 QueueConstants.maxTasksPerAdd()));
504 if (hasPullTask && hasPushTask) {
505 throw new IllegalArgumentException(
506 "May not add both push tasks and pull tasks in the same call.");
509 if (txn != null &&
510 bulkAddRequest.encodingSize() > QueueConstants.maxTransactionalRequestSizeBytes()) {
511 throw new IllegalArgumentException(
512 String.format("Transactional add may not be larger than %d bytes: %d bytes requested.",
513 QueueConstants.maxTransactionalRequestSizeBytes(),
514 bulkAddRequest.encodingSize()));
517 Future<TaskQueueBulkAddResponse> responseFuture = makeAsyncCall(
518 "BulkAdd", bulkAddRequest, new TaskQueueBulkAddResponse());
519 return new FutureAdapter<TaskQueueBulkAddResponse, List<TaskHandle>>(responseFuture) {
520 @Override
521 protected List<TaskHandle> wrap(TaskQueueBulkAddResponse bulkAddResponse) {
522 if (bulkAddResponse.taskResultSize() != bulkAddRequest.addRequestSize()) {
523 throw new InternalFailureException(
524 String.format("expected %d results from BulkAdd(), got %d",
525 bulkAddRequest.addRequestSize(), bulkAddResponse.taskResultSize()));
528 List<TaskHandle> tasks = new ArrayList<TaskHandle>();
529 for (int i = 0; i < bulkAddResponse.taskResultSize(); ++i) {
530 TaskQueueBulkAddResponse.TaskResult taskResult = bulkAddResponse.taskResults().get(i);
531 TaskQueueAddRequest addRequest = bulkAddRequest.getAddRequest(i);
532 TaskOptions options = taskOptionsList.get(i);
534 if (taskResult.getResult() == TaskQueueServiceError.ErrorCode.OK.getValue()) {
535 String taskName = options.getTaskName();
536 if (taskResult.hasChosenTaskName()) {
537 taskName = taskResult.getChosenTaskName();
539 TaskOptions taskResultOptions = new TaskOptions(options);
540 taskResultOptions.taskName(taskName).payload(addRequest.getBodyAsBytes());
541 TaskHandle handle = new TaskHandle(taskResultOptions, queueName);
542 tasks.add(handle.etaUsec(addRequest.getEtaUsec()));
543 } else if (taskResult.getResult() != TaskQueueServiceError.ErrorCode.SKIPPED.getValue()) {
544 throw QueueApiHelper.translateError(taskResult.getResult(), options.getTaskName());
547 return tasks;
552 long currentTimeMillis() {
553 return System.currentTimeMillis();
556 private long determineEta(TaskOptions taskOptions) {
557 Long etaMillis = taskOptions.getEtaMillis();
558 Long countdownMillis = taskOptions.getCountdownMillis();
559 if (etaMillis == null) {
560 if (countdownMillis == null) {
561 return currentTimeMillis();
562 } else {
563 if (countdownMillis > QueueConstants.getMaxEtaDeltaMillis()) {
564 throw new IllegalArgumentException("ETA too far into the future");
566 if (countdownMillis < 0) {
567 throw new IllegalArgumentException("Negative countdown is not allowed");
569 return currentTimeMillis() + countdownMillis;
571 } else {
572 if (countdownMillis == null) {
573 if (etaMillis - currentTimeMillis() > QueueConstants.getMaxEtaDeltaMillis()) {
574 throw new IllegalArgumentException("ETA too far into the future");
576 if (etaMillis < 0) {
577 throw new IllegalArgumentException("Negative ETA is invalid");
579 return etaMillis;
580 } else {
581 throw new IllegalArgumentException(
582 "Only one or neither of EtaMillis and CountdownMillis may be specified");
587 byte[] encodeParamsPost(List<Param> params) {
588 byte[] payload;
589 try {
590 payload = encodeParamsUrlEncoded(params).getBytes("UTF-8");
591 } catch (UnsupportedEncodingException exception) {
592 throw new UnsupportedTranslationException(exception);
595 return payload;
598 String encodeParamsUrlEncoded(List<Param> params) {
599 StringBuilder result = new StringBuilder();
600 try {
601 String appender = "";
602 for (Param param : params) {
603 result.append(appender);
604 appender = "&";
605 result.append(param.getURLEncodedName());
606 result.append("=");
607 result.append(param.getURLEncodedValue());
609 } catch (UnsupportedEncodingException exception) {
610 throw new UnsupportedTranslationException(exception);
612 return result.toString();
615 private String defaultUrl() {
616 return DEFAULT_QUEUE_PATH + "/" + queueName;
620 * See {@link Queue#getQueueName()}.
622 @Override
623 public String getQueueName() {
624 return queueName;
627 DatastoreService getDatastoreService() {
628 return datastoreService;
632 * See {@link Queue#purge()}.
634 @Override
635 public void purge() {
636 TaskQueuePurgeQueueRequest purgeRequest = new TaskQueuePurgeQueueRequest();
637 TaskQueuePurgeQueueResponse purgeResponse = new TaskQueuePurgeQueueResponse();
639 purgeRequest.setQueueName(queueName);
640 apiHelper.makeSyncCall("PurgeQueue", purgeRequest, purgeResponse);
644 * See {@link Queue#deleteTask(String)}.
646 @Override
647 public boolean deleteTask(String taskName) {
648 return getInternal(deleteTaskAsync(taskName));
652 * See {@link Queue#deleteTaskAsync(String)}.
654 @Override
655 public Future<Boolean> deleteTaskAsync(String taskName) {
656 TaskHandle.validateTaskName(taskName);
657 return deleteTaskAsync(new TaskHandle(TaskOptions.Builder.withTaskName(taskName),
658 queueName));
662 * See {@link Queue#deleteTask(TaskHandle)}.
664 @Override
665 public boolean deleteTask(TaskHandle taskHandle) {
666 return getInternal(deleteTaskAsync(taskHandle));
670 * See {@link Queue#deleteTaskAsync(TaskHandle)}.
672 @Override
673 public Future<Boolean> deleteTaskAsync(TaskHandle taskHandle) {
674 return extractSingleEntry(deleteTaskAsync(Collections.singletonList(taskHandle)));
678 * See {@link Queue#deleteTask(List<TaskHandle>)}.
680 @Override
681 public List<Boolean> deleteTask(List<TaskHandle> taskHandles) {
682 return getInternal(deleteTaskAsync(taskHandles));
686 * See {@link Queue#deleteTaskAsync(List<TaskHandle>)}.
688 @Override
689 public Future<List<Boolean>> deleteTaskAsync(List<TaskHandle> taskHandles) {
691 final TaskQueueDeleteRequest deleteRequest = new TaskQueueDeleteRequest();
692 deleteRequest.setQueueName(queueName);
694 for (TaskHandle taskHandle : taskHandles) {
695 if (taskHandle.getQueueName().equals(this.queueName)) {
696 deleteRequest.addTaskName(taskHandle.getName());
697 } else {
698 throw new QueueNameMismatchException(
699 String.format("The task %s is associated with the queue named %s "
700 + "and cannot be deleted from the queue named %s.",
701 taskHandle.getName(), taskHandle.getQueueName(), this.queueName));
705 Future<TaskQueueDeleteResponse> responseFuture = makeAsyncCall(
706 "Delete", deleteRequest, new TaskQueueDeleteResponse());
707 return new FutureAdapter<TaskQueueDeleteResponse, List<Boolean>>(responseFuture) {
708 @Override
709 protected List<Boolean> wrap(TaskQueueDeleteResponse deleteResponse) {
710 List<Boolean> result = new ArrayList<Boolean>(deleteResponse.resultSize());
712 for (int i = 0; i < deleteResponse.resultSize(); ++i) {
713 int errorCode = deleteResponse.getResult(i);
714 if (errorCode != TaskQueueServiceError.ErrorCode.OK.getValue() &&
715 errorCode != TaskQueueServiceError.ErrorCode.TOMBSTONED_TASK.getValue() &&
716 errorCode != TaskQueueServiceError.ErrorCode.UNKNOWN_TASK.getValue()) {
717 throw QueueApiHelper.translateError(errorCode, deleteRequest.getTaskName(i));
719 result.add(errorCode == TaskQueueServiceError.ErrorCode.OK.getValue());
722 return result;
727 private Future<List<TaskHandle>> leaseTasksInternal(LeaseOptions options) {
728 long leaseMillis = options.getUnit().toMillis(options.getLease());
729 if (leaseMillis > QueueConstants.maxLease(TimeUnit.MILLISECONDS)) {
730 throw new IllegalArgumentException(
731 String.format("A lease period can be no longer than %d seconds",
732 QueueConstants.maxLease(TimeUnit.SECONDS)));
735 if (options.getCountLimit() > QueueConstants.maxLeaseCount()) {
736 throw new IllegalArgumentException(
737 String.format("No more than %d tasks can be leased in one call",
738 QueueConstants.maxLeaseCount()));
741 TaskQueueQueryAndOwnTasksRequest leaseRequest = new TaskQueueQueryAndOwnTasksRequest();
743 leaseRequest.setQueueName(queueName);
744 leaseRequest.setLeaseSeconds(leaseMillis / 1000.0);
745 leaseRequest.setMaxTasks(options.getCountLimit());
746 if (options.getGroupByTag()) {
747 leaseRequest.setGroupByTag(true);
748 if (options.getTag() != null) {
749 leaseRequest.setTagAsBytes(options.getTag());
753 ApiConfig apiConfig = new ApiConfig();
754 if (options.getDeadlineInSeconds() == null) {
755 apiConfig.setDeadlineInSeconds(DEFAULT_LEASE_TASKS_DEADLINE_SECONDS);
756 } else {
757 apiConfig.setDeadlineInSeconds(options.getDeadlineInSeconds());
760 Future<TaskQueueQueryAndOwnTasksResponse> responseFuture = apiHelper.makeAsyncCall(
761 "QueryAndOwnTasks", leaseRequest, new TaskQueueQueryAndOwnTasksResponse(), apiConfig);
762 return new FutureAdapter<TaskQueueQueryAndOwnTasksResponse, List<TaskHandle>>(responseFuture) {
763 @Override
764 protected List<TaskHandle> wrap(TaskQueueQueryAndOwnTasksResponse leaseResponse) {
765 List<TaskHandle> result = new ArrayList<TaskHandle>();
766 for (TaskQueueQueryAndOwnTasksResponse.Task response : leaseResponse.tasks()) {
767 TaskOptions taskOptions = TaskOptions.Builder.withTaskName(response.getTaskName())
768 .payload(response.getBodyAsBytes())
769 .method(TaskOptions.Method.PULL);
770 if (response.hasTag()) {
771 taskOptions.tag(response.getTagAsBytes());
773 TaskHandle handle = new TaskHandle(taskOptions, queueName, response.getRetryCount());
774 result.add(handle.etaUsec(response.getEtaUsec()));
777 return result;
783 * See {@link Queue#leaseTasks(long, TimeUnit, long)}.
785 @Override
786 public List<TaskHandle> leaseTasks(long lease, TimeUnit unit, long countLimit) {
787 return getInternal(leaseTasksAsync(lease, unit, countLimit));
791 * See {@link Queue#leaseTasksAsync(long, TimeUnit, long)}.
793 @Override
794 public Future<List<TaskHandle>> leaseTasksAsync(
795 long lease, TimeUnit unit, long countLimit) {
796 return leaseTasksInternal(LeaseOptions.Builder.withLeasePeriod(lease, unit)
797 .countLimit(countLimit));
801 * See {@link Queue#leaseTasksByTagBytes(long, TimeUnit, long, byte[])}.
803 @Override
804 public List<TaskHandle> leaseTasksByTagBytes(
805 long lease, TimeUnit unit, long countLimit, byte[] tag) {
806 return getInternal(leaseTasksByTagBytesAsync(lease, unit, countLimit, tag));
810 * See {@link Queue#leaseTasksByTagBytesAsync(long, TimeUnit, long, byte[])}.
812 @Override
813 public Future<List<TaskHandle>> leaseTasksByTagBytesAsync(
814 long lease, TimeUnit unit, long countLimit, byte[] tag) {
815 LeaseOptions options = LeaseOptions.Builder.withLeasePeriod(lease, unit)
816 .countLimit(countLimit);
817 if (tag != null) {
818 options.tag(tag);
819 } else {
820 options.groupByTag();
822 return leaseTasksInternal(options);
826 * See {@link Queue#leaseTasksByTag(long, TimeUnit, long, String)}.
828 @Override
829 public List<TaskHandle> leaseTasksByTag(long lease, TimeUnit unit,
830 long countLimit, String tag) {
831 return getInternal(leaseTasksByTagAsync(lease, unit, countLimit, tag));
835 * See {@link Queue#leaseTasksByTagAsync(long, TimeUnit, long, String)}.
837 @Override
838 public Future<List<TaskHandle>> leaseTasksByTagAsync(
839 long lease, TimeUnit unit, long countLimit, String tag) {
840 LeaseOptions options = LeaseOptions.Builder.withLeasePeriod(lease, unit)
841 .countLimit(countLimit);
842 if (tag != null) {
843 options.tag(tag);
844 } else {
845 options.groupByTag();
847 return leaseTasksInternal(options);
851 * See {@link Queue#leaseTasks(LeaseOptions)}.
853 @Override
854 public List<TaskHandle> leaseTasks(LeaseOptions options) {
855 return getInternal(leaseTasksAsync(options));
859 * See {@link Queue#leaseTasksAsync(LeaseOptions)}.
861 @Override
862 public Future<List<TaskHandle>> leaseTasksAsync(LeaseOptions options) {
863 if (options.getLease() == null) {
864 throw new IllegalArgumentException("The lease period must be specified");
866 if (options.getCountLimit() == null) {
867 throw new IllegalArgumentException("The count limit must be specified");
869 return leaseTasksInternal(options);
873 * See {@link Queue#modifyTaskLease(TaskHandle, long, TimeUnit)}.
875 @Override
876 public TaskHandle modifyTaskLease(TaskHandle taskHandle, long lease, TimeUnit unit) {
877 long leaseMillis = unit.toMillis(lease);
878 if (leaseMillis > QueueConstants.maxLease(TimeUnit.MILLISECONDS)) {
879 throw new IllegalArgumentException(
880 String.format("The lease time specified (%s seconds) is too large. " +
881 "Lease period can be no longer than %d seconds.",
882 formatLeaseTimeInSeconds(leaseMillis),
883 QueueConstants.maxLease(TimeUnit.SECONDS)));
885 if (leaseMillis < 0) {
886 throw new IllegalArgumentException(
887 String.format("The lease time must not be negative. " +
888 "Specified lease time was %s seconds.",
889 formatLeaseTimeInSeconds(leaseMillis)));
892 TaskQueueModifyTaskLeaseRequest request = new TaskQueueModifyTaskLeaseRequest();
893 TaskQueueModifyTaskLeaseResponse response = new TaskQueueModifyTaskLeaseResponse();
895 request.setQueueName(this.queueName);
896 request.setTaskName(taskHandle.getName());
897 request.setLeaseSeconds(leaseMillis / 1000.0);
898 request.setEtaUsec(taskHandle.getEtaUsec());
900 apiHelper.makeSyncCall("ModifyTaskLease", request, response);
901 taskHandle.etaUsec(response.getUpdatedEtaUsec());
902 return taskHandle;
905 private String formatLeaseTimeInSeconds(long milliSeconds) {
906 long seconds = TimeUnit.SECONDS.convert(milliSeconds, TimeUnit.MILLISECONDS);
907 long remainder = milliSeconds - TimeUnit.MILLISECONDS.convert(seconds, TimeUnit.SECONDS);
908 String formatString = milliSeconds < 0 ? "-%01d.%03d" : "%01d.%03d";
909 return String.format(formatString, Math.abs(seconds), Math.abs(remainder));
913 * See {@link Queue#fetchStatistics()}.
915 @Override
916 public QueueStatistics fetchStatistics() {
917 return getInternal(fetchStatisticsAsync(null));
921 * See {@link Queue#fetchStatisticsAsync(Double)}.
923 @Override
924 public Future<QueueStatistics> fetchStatisticsAsync( Double deadlineInSeconds) {
925 if (deadlineInSeconds == null) {
926 deadlineInSeconds = DEFAULT_FETCH_STATISTICS_DEADLINE_SECONDS;
929 if (deadlineInSeconds <= 0.0) {
930 throw new IllegalArgumentException("Deadline must be > 0, got " +
931 deadlineInSeconds);
934 List<Queue> queues = Collections.<Queue>singletonList(this);
935 Future<List<QueueStatistics>> future = QueueStatistics.fetchForQueuesAsync(
936 queues, apiHelper, deadlineInSeconds);
937 return extractSingleEntry(future);
940 <T extends ProtocolMessage<T>> Future<T> makeAsyncCall(
941 String methodName, ProtocolMessage<?> request, T response) {
942 return apiHelper.makeAsyncCall(methodName, request, response, new ApiConfig());