1 // Copyright 2010 Google Inc. All rights reserved.
2 package com
.google
.appengine
.api
.taskqueue
;
4 import static com
.google
.appengine
.api
.taskqueue
.QueueApiHelper
.getInternal
;
6 import com
.google
.appengine
.api
.NamespaceManager
;
7 import com
.google
.appengine
.api
.datastore
.DatastoreService
;
8 import com
.google
.appengine
.api
.datastore
.DatastoreServiceFactory
;
9 import com
.google
.appengine
.api
.taskqueue
.TaskOptions
.Param
;
10 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueAddRequest
;
11 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueAddRequest
.Header
;
12 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueBulkAddRequest
;
13 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueBulkAddResponse
;
14 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueDeleteRequest
;
15 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueDeleteResponse
;
16 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueMode
;
17 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueModifyTaskLeaseRequest
;
18 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueModifyTaskLeaseResponse
;
19 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueuePurgeQueueRequest
;
20 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueuePurgeQueueResponse
;
21 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueQueryAndOwnTasksRequest
;
22 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueQueryAndOwnTasksResponse
;
23 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueRetryParameters
;
24 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueServiceError
;
25 import com
.google
.apphosting
.api
.ApiProxy
.ApiConfig
;
26 import com
.google
.io
.protocol
.ProtocolMessage
;
27 import java
.io
.UnsupportedEncodingException
;
29 import java
.net
.URISyntaxException
;
30 import java
.util
.ArrayList
;
31 import java
.util
.Arrays
;
32 import java
.util
.Collections
;
33 import java
.util
.HashMap
;
34 import java
.util
.HashSet
;
35 import java
.util
.List
;
36 import java
.util
.Map
.Entry
;
38 import java
.util
.concurrent
.Future
;
39 import java
.util
.concurrent
.TimeUnit
;
42 * Implements the {@link Queue} interface.
43 * {@link QueueImpl} is thread safe.
46 class QueueImpl
implements Queue
{
47 private final String queueName
;
48 private final DatastoreService datastoreService
= DatastoreServiceFactory
.getDatastoreService();
49 private final QueueApiHelper apiHelper
;
52 * The name of the HTTP header specifying the default namespace
55 static final String DEFAULT_NAMESPACE_HEADER
= "X-AppEngine-Default-Namespace";
56 static final String CURRENT_NAMESPACE_HEADER
= "X-AppEngine-Current-Namespace";
58 static final double DEFAULT_LEASE_TASKS_DEADLINE_SECONDS
= 10.0;
59 static final double DEFAULT_FETCH_STATISTICS_DEADLINE_SECONDS
= 10.0;
61 QueueImpl(String queueName
, QueueApiHelper apiHelper
) {
62 QueueApiHelper
.validateQueueName(queueName
);
64 this.apiHelper
= apiHelper
;
65 this.queueName
= queueName
;
69 * Transform a future returning a single-entry list into a future returning that entry.
70 * @param future A future whose result is a singleton list.
71 * @return A future whose result is the only element of the list.
73 private <T
> Future
<T
> extractSingleEntry(Future
<List
<T
>> future
) {
74 return new FutureAdapter
<List
<T
>, T
>(future
) {
76 protected T
wrap(List
<T
> key
) throws Exception
{
77 if (key
.size() != 1) {
78 throw new InternalFailureException(
79 "An internal error occurred while accessing queue '" + queueName
+ "'");
87 * See {@link Queue#add()}
90 public TaskHandle
add() {
91 return getInternal(addAsync());
95 * See {@link Queue#addAsync()}
98 public Future
<TaskHandle
> addAsync() {
100 getDatastoreService().getCurrentTransaction(null), TaskOptions
.Builder
.withDefaults());
104 * Returns a {@link URI} validated to only contain legal components.
105 * <p>The "scheme", "authority" and "fragment" components of a URI
106 * must not be specified. The path component must be absolute
107 * (i.e. start with "/").
109 * @param urlString The "url" specified by the client.
110 * @throws IllegalArgumentException The provided urlString is null, too long or does not have
113 private URI
parsePartialUrl(String urlString
) {
114 if (urlString
== null) {
115 throw new IllegalArgumentException("url must not be null");
118 if (urlString
.length() > QueueConstants
.maxUrlLength()) {
119 throw new IllegalArgumentException(
120 "url is longer than " + ((Integer
) QueueConstants
.maxUrlLength()).toString() + ".");
125 uri
= new URI(urlString
);
126 } catch (URISyntaxException exception
) {
127 throw new IllegalArgumentException("URL syntax error", exception
);
130 uriCheckNull(uri
.getScheme(), "scheme");
131 uriCheckNull(uri
.getRawAuthority(), "authority");
132 uriCheckNull(uri
.getRawFragment(), "fragment");
133 String path
= uri
.getPath();
135 if (path
== null || path
.length() == 0 || path
.charAt(0) != '/') {
138 } else if (path
.length() == 0) {
139 path
= "<empty string>";
141 throw new IllegalArgumentException(
142 "url must contain a path starting with '/' part - contains :" + path
);
148 private void uriCheckNull(String value
, String valueName
) {
150 throw new IllegalArgumentException(
151 "url must not contain a '" + valueName
+ "' part - contains :" + value
);
155 private void checkPullTask(String url
,
156 HashMap
<String
, List
<String
>> headers
,
158 RetryOptions retryOptions
) {
159 if (url
!= null && !url
.isEmpty()) {
160 throw new IllegalArgumentException("May not specify url in tasks that have method PULL");
162 if (!headers
.isEmpty()) {
163 throw new IllegalArgumentException(
164 "May not specify any header in tasks that have method PULL");
166 if (retryOptions
!= null) {
167 throw new IllegalArgumentException(
168 "May not specify retry options in tasks that have method PULL");
170 if (payload
== null) {
171 throw new IllegalArgumentException("payload must be specified for tasks with method PULL");
175 private void checkPostTask(List
<Param
> params
, byte[] payload
, String query
) {
176 if (query
!= null && query
.length() != 0) {
177 throw new IllegalArgumentException(
178 "POST method may not have a query string; use setParamater(s) instead");
183 * Construct a byte array data from params if payload is not specified.
184 * If it sees payload is specified, return null.
185 * @throws IllegalArgumentException if params and payload both exist
187 private byte[] constructPayloadFromParams(List
<Param
> params
, byte[] payload
) {
188 if (!params
.isEmpty() && payload
!= null) {
189 throw new IllegalArgumentException(
190 "Message body and parameters may not both be present; "
191 + "only one of these may be supplied");
193 return payload
!= null ?
null : encodeParamsPost(params
);
197 private void validateAndFillAddRequest(com
.google
.appengine
.api
.datastore
.Transaction txn
,
198 TaskOptions taskOptions
,
199 TaskQueueAddRequest addRequest
) {
200 boolean useUrlEncodedContentType
= false;
202 HashMap
<String
, List
<String
>> headers
= taskOptions
.getHeaders();
203 String url
= taskOptions
.getUrl();
204 byte[] payload
= taskOptions
.getPayload();
205 List
<Param
> params
= taskOptions
.getParams();
206 RetryOptions retryOptions
= taskOptions
.getRetryOptions();
207 TaskOptions
.Method method
= taskOptions
.getMethod();
211 parsedUrl
= parsePartialUrl(defaultUrl());
213 parsedUrl
= parsePartialUrl(url
);
215 String query
= parsedUrl
.getQuery();
216 StringBuilder relativeUrl
= new StringBuilder(parsedUrl
.getRawPath());
217 if (query
!= null && query
.length() != 0 && !params
.isEmpty()) {
218 throw new IllegalArgumentException(
219 "Query string and parameters both present; only one of these may be supplied");
222 byte[] constructedPayload
;
223 if (method
== TaskOptions
.Method
.PULL
) {
224 constructedPayload
= constructPayloadFromParams(params
, payload
);
225 if (constructedPayload
!= null) {
226 payload
= constructedPayload
;
228 checkPullTask(url
, headers
, payload
, retryOptions
);
229 } else if (method
== TaskOptions
.Method
.POST
) {
230 constructedPayload
= constructPayloadFromParams(params
, payload
);
231 if (constructedPayload
!= null) {
232 payload
= constructedPayload
;
233 useUrlEncodedContentType
= true;
235 checkPostTask(params
, payload
, query
);
237 if (!params
.isEmpty()) {
238 query
= encodeParamsUrlEncoded(params
);
240 if (query
!= null && query
.length() != 0) {
241 relativeUrl
.append("?").append(query
);
244 if (payload
!= null && payload
.length
!= 0 && !taskOptions
.getMethod().supportsBody()) {
245 throw new IllegalArgumentException(
246 taskOptions
.getMethod().toString() + " method may not specify a payload.");
251 taskOptions
.getTaskName(),
252 determineEta(taskOptions
),
254 relativeUrl
.toString(),
258 useUrlEncodedContentType
,
259 taskOptions
.getTagAsBytes(),
263 private void fillAddRequest(com
.google
.appengine
.api
.datastore
.Transaction txn
,
267 TaskOptions
.Method method
,
270 HashMap
<String
, List
<String
>> headers
,
271 RetryOptions retryOptions
,
272 boolean useUrlEncodedContentType
,
274 TaskQueueAddRequest addRequest
) {
275 addRequest
.setQueueName(queueName
);
276 addRequest
.setTaskName(taskName
== null ?
"" : taskName
);
278 if (method
== TaskOptions
.Method
.PULL
) {
279 addRequest
.setMode(TaskQueueMode
.Mode
.PULL
.getValue());
281 addRequest
.setUrl(relativeUrl
.toString());
282 addRequest
.setMode(TaskQueueMode
.Mode
.PUSH
.getValue());
283 addRequest
.setMethod(method
.getPbMethod());
286 if (payload
!= null) {
287 addRequest
.setBodyAsBytes(payload
);
290 addRequest
.setEtaUsec(etaMillis
* 1000);
292 if (taskName
!= null && !taskName
.equals("") && txn
!= null) {
293 throw new IllegalArgumentException(
294 "transactional tasks cannot be named: " + taskName
);
297 addRequest
.setTransaction(localTxnToRemoteTxn(txn
));
300 if (retryOptions
!= null) {
301 fillRetryParameters(retryOptions
, addRequest
.getMutableRetryParameters());
304 if (NamespaceManager
.getGoogleAppsNamespace().length() != 0) {
305 if (!headers
.containsKey(DEFAULT_NAMESPACE_HEADER
)) {
306 headers
.put(DEFAULT_NAMESPACE_HEADER
,
307 Arrays
.asList(NamespaceManager
.getGoogleAppsNamespace()));
310 if (!headers
.containsKey(CURRENT_NAMESPACE_HEADER
)) {
311 String namespace
= NamespaceManager
.get();
312 headers
.put(CURRENT_NAMESPACE_HEADER
, Arrays
.asList(namespace
== null ?
"" : namespace
));
314 for (Entry
<String
, List
<String
>> entry
: headers
.entrySet()) {
315 if (useUrlEncodedContentType
&& entry
.getKey().toLowerCase().equals("content-type")) {
319 for (String value
: entry
.getValue()) {
320 Header header
= addRequest
.addHeader();
321 header
.setKey(entry
.getKey());
322 header
.setValue(value
);
325 if (useUrlEncodedContentType
) {
326 Header contentTypeHeader
= addRequest
.addHeader();
327 contentTypeHeader
.setKey("content-type");
328 contentTypeHeader
.setValue("application/x-www-form-urlencoded");
332 if (method
!= TaskOptions
.Method
.PULL
) {
333 throw new IllegalArgumentException("Only PULL tasks can have a tag.");
335 if (tag
.length
> QueueConstants
.maxTaskTagLength()) {
336 throw new IllegalArgumentException(
337 "Task tag must be no more than " + QueueConstants
.maxTaskTagLength() + " bytes.");
339 addRequest
.setTagAsBytes(tag
);
342 if (method
== TaskOptions
.Method
.PULL
) {
343 if (addRequest
.encodingSize() > QueueConstants
.maxPullTaskSizeBytes()) {
344 throw new IllegalArgumentException("Task size too large");
347 if (addRequest
.encodingSize() > QueueConstants
.maxPushTaskSizeBytes()) {
348 throw new IllegalArgumentException("Task size too large");
354 * Translates a local transaction to the Datastore PB.
355 * Due to pb dependency issues, Transaction pb is redefined for TaskQueue.
356 * Keep in sync with DatastoreServiceImpl.localTxnToRemoteTxn.
358 private static Transaction
localTxnToRemoteTxn(
359 com
.google
.appengine
.api
.datastore
.Transaction local
) {
360 Transaction remote
= new Transaction();
361 remote
.setApp(local
.getApp());
362 remote
.setHandle(Long
.parseLong(local
.getId()));
367 * Translates from RetryOptions to TaskQueueRetryParameters.
368 * Also checks ensures minBackoffSeconds and maxBackoffSeconds are ordered
371 private static void fillRetryParameters(
372 RetryOptions retryOptions
,
373 TaskQueueRetryParameters retryParameters
) {
374 if (retryOptions
.getTaskRetryLimit() != null) {
375 retryParameters
.setRetryLimit(retryOptions
.getTaskRetryLimit());
377 if (retryOptions
.getTaskAgeLimitSeconds() != null) {
378 retryParameters
.setAgeLimitSec(retryOptions
.getTaskAgeLimitSeconds());
380 if (retryOptions
.getMinBackoffSeconds() != null) {
381 retryParameters
.setMinBackoffSec(retryOptions
.getMinBackoffSeconds());
383 if (retryOptions
.getMaxBackoffSeconds() != null) {
384 retryParameters
.setMaxBackoffSec(retryOptions
.getMaxBackoffSeconds());
386 if (retryOptions
.getMaxDoublings() != null) {
387 retryParameters
.setMaxDoublings(retryOptions
.getMaxDoublings());
390 if (retryParameters
.hasMinBackoffSec() && retryParameters
.hasMaxBackoffSec()) {
391 if (retryParameters
.getMinBackoffSec() > retryParameters
.getMaxBackoffSec()) {
392 throw new IllegalArgumentException(
393 "minBackoffSeconds must not be greater than maxBackoffSeconds.");
395 } else if (retryParameters
.hasMinBackoffSec()) {
396 if (retryParameters
.getMinBackoffSec() > retryParameters
.getMaxBackoffSec()) {
397 retryParameters
.setMaxBackoffSec(retryParameters
.getMinBackoffSec());
399 } else if (retryParameters
.hasMaxBackoffSec()) {
400 if (retryParameters
.getMinBackoffSec() > retryParameters
.getMaxBackoffSec()) {
401 retryParameters
.setMinBackoffSec(retryParameters
.getMaxBackoffSec());
407 * See {@link Queue#add(TaskOptions)}.
410 public TaskHandle
add(TaskOptions taskOptions
) {
411 return getInternal(addAsync(taskOptions
));
415 * See {@link Queue#addAsync(TaskOptions)}.
418 public Future
<TaskHandle
> addAsync(TaskOptions taskOptions
) {
419 return addAsync(getDatastoreService().getCurrentTransaction(null), taskOptions
);
423 * See {@link Queue#add(Iterable)}.
426 public List
<TaskHandle
> add(Iterable
<TaskOptions
> taskOptions
) {
427 return getInternal(addAsync(taskOptions
));
431 * See {@link Queue#addAsync(Iterable)}.
434 public Future
<List
<TaskHandle
>> addAsync(Iterable
<TaskOptions
> taskOptions
) {
435 return addAsync(getDatastoreService().getCurrentTransaction(null), taskOptions
);
439 * See {@link Queue#add(com.google.appengine.api.datastore.Transaction, TaskOptions)}.
442 public TaskHandle
add(com
.google
.appengine
.api
.datastore
.Transaction txn
,
443 TaskOptions taskOptions
) {
444 return getInternal(addAsync(txn
, taskOptions
));
448 * See {@link Queue#addAsync(com.google.appengine.api.datastore.Transaction, TaskOptions)}.
451 public Future
<TaskHandle
> addAsync(
452 com
.google
.appengine
.api
.datastore
.Transaction txn
, TaskOptions taskOptions
) {
453 return extractSingleEntry(addAsync(txn
, Collections
.singletonList(taskOptions
)));
458 * Queue#add(com.google.appengine.api.datastore.Transaction, Iterable)}.
461 public List
<TaskHandle
> add(com
.google
.appengine
.api
.datastore
.Transaction txn
,
462 Iterable
<TaskOptions
> taskOptions
) {
463 return getInternal(addAsync(txn
, taskOptions
));
468 * Queue#addAsync(com.google.appengine.api.datastore.Transaction, Iterable)}.
471 public Future
<List
<TaskHandle
>> addAsync(
472 com
.google
.appengine
.api
.datastore
.Transaction txn
, Iterable
<TaskOptions
> taskOptions
) {
473 final List
<TaskOptions
> taskOptionsList
= new ArrayList
<TaskOptions
>();
474 Set
<String
> taskNames
= new HashSet
<String
>();
476 final TaskQueueBulkAddRequest bulkAddRequest
= new TaskQueueBulkAddRequest();
478 boolean hasPushTask
= false;
479 boolean hasPullTask
= false;
480 for (TaskOptions option
: taskOptions
) {
481 TaskQueueAddRequest addRequest
= bulkAddRequest
.addAddRequest();
482 validateAndFillAddRequest(txn
, option
, addRequest
);
483 if (addRequest
.getMode() == TaskQueueMode
.Mode
.PULL
.getValue()) {
489 taskOptionsList
.add(option
);
490 if (option
.getTaskName() != null && !option
.getTaskName().equals("")) {
491 if (!taskNames
.add(option
.getTaskName())) {
492 throw new IllegalArgumentException(
493 String
.format("Identical task names in request : \"%s\" duplicated",
494 option
.getTaskName()));
498 if (bulkAddRequest
.addRequestSize() > QueueConstants
.maxTasksPerAdd()) {
499 throw new IllegalArgumentException(
500 String
.format("No more than %d tasks can be added in a single add call",
501 QueueConstants
.maxTasksPerAdd()));
504 if (hasPullTask
&& hasPushTask
) {
505 throw new IllegalArgumentException(
506 "May not add both push tasks and pull tasks in the same call.");
510 bulkAddRequest
.encodingSize() > QueueConstants
.maxTransactionalRequestSizeBytes()) {
511 throw new IllegalArgumentException(
512 String
.format("Transactional add may not be larger than %d bytes: %d bytes requested.",
513 QueueConstants
.maxTransactionalRequestSizeBytes(),
514 bulkAddRequest
.encodingSize()));
517 Future
<TaskQueueBulkAddResponse
> responseFuture
= makeAsyncCall(
518 "BulkAdd", bulkAddRequest
, new TaskQueueBulkAddResponse());
519 return new FutureAdapter
<TaskQueueBulkAddResponse
, List
<TaskHandle
>>(responseFuture
) {
521 protected List
<TaskHandle
> wrap(TaskQueueBulkAddResponse bulkAddResponse
) {
522 if (bulkAddResponse
.taskResultSize() != bulkAddRequest
.addRequestSize()) {
523 throw new InternalFailureException(
524 String
.format("expected %d results from BulkAdd(), got %d",
525 bulkAddRequest
.addRequestSize(), bulkAddResponse
.taskResultSize()));
528 List
<TaskHandle
> tasks
= new ArrayList
<TaskHandle
>();
529 for (int i
= 0; i
< bulkAddResponse
.taskResultSize(); ++i
) {
530 TaskQueueBulkAddResponse
.TaskResult taskResult
= bulkAddResponse
.taskResults().get(i
);
531 TaskQueueAddRequest addRequest
= bulkAddRequest
.getAddRequest(i
);
532 TaskOptions options
= taskOptionsList
.get(i
);
534 if (taskResult
.getResult() == TaskQueueServiceError
.ErrorCode
.OK
.getValue()) {
535 String taskName
= options
.getTaskName();
536 if (taskResult
.hasChosenTaskName()) {
537 taskName
= taskResult
.getChosenTaskName();
539 TaskOptions taskResultOptions
= new TaskOptions(options
);
540 taskResultOptions
.taskName(taskName
).payload(addRequest
.getBodyAsBytes());
541 TaskHandle handle
= new TaskHandle(taskResultOptions
, queueName
);
542 tasks
.add(handle
.etaUsec(addRequest
.getEtaUsec()));
543 } else if (taskResult
.getResult() != TaskQueueServiceError
.ErrorCode
.SKIPPED
.getValue()) {
544 throw QueueApiHelper
.translateError(taskResult
.getResult(), options
.getTaskName());
552 long currentTimeMillis() {
553 return System
.currentTimeMillis();
556 private long determineEta(TaskOptions taskOptions
) {
557 Long etaMillis
= taskOptions
.getEtaMillis();
558 Long countdownMillis
= taskOptions
.getCountdownMillis();
559 if (etaMillis
== null) {
560 if (countdownMillis
== null) {
561 return currentTimeMillis();
563 if (countdownMillis
> QueueConstants
.getMaxEtaDeltaMillis()) {
564 throw new IllegalArgumentException("ETA too far into the future");
566 if (countdownMillis
< 0) {
567 throw new IllegalArgumentException("Negative countdown is not allowed");
569 return currentTimeMillis() + countdownMillis
;
572 if (countdownMillis
== null) {
573 if (etaMillis
- currentTimeMillis() > QueueConstants
.getMaxEtaDeltaMillis()) {
574 throw new IllegalArgumentException("ETA too far into the future");
577 throw new IllegalArgumentException("Negative ETA is invalid");
581 throw new IllegalArgumentException(
582 "Only one or neither of EtaMillis and CountdownMillis may be specified");
587 byte[] encodeParamsPost(List
<Param
> params
) {
590 payload
= encodeParamsUrlEncoded(params
).getBytes("UTF-8");
591 } catch (UnsupportedEncodingException exception
) {
592 throw new UnsupportedTranslationException(exception
);
598 String
encodeParamsUrlEncoded(List
<Param
> params
) {
599 StringBuilder result
= new StringBuilder();
601 String appender
= "";
602 for (Param param
: params
) {
603 result
.append(appender
);
605 result
.append(param
.getURLEncodedName());
607 result
.append(param
.getURLEncodedValue());
609 } catch (UnsupportedEncodingException exception
) {
610 throw new UnsupportedTranslationException(exception
);
612 return result
.toString();
615 private String
defaultUrl() {
616 return DEFAULT_QUEUE_PATH
+ "/" + queueName
;
620 * See {@link Queue#getQueueName()}.
623 public String
getQueueName() {
627 DatastoreService
getDatastoreService() {
628 return datastoreService
;
632 * See {@link Queue#purge()}.
635 public void purge() {
636 TaskQueuePurgeQueueRequest purgeRequest
= new TaskQueuePurgeQueueRequest();
637 TaskQueuePurgeQueueResponse purgeResponse
= new TaskQueuePurgeQueueResponse();
639 purgeRequest
.setQueueName(queueName
);
640 apiHelper
.makeSyncCall("PurgeQueue", purgeRequest
, purgeResponse
);
644 * See {@link Queue#deleteTask(String)}.
647 public boolean deleteTask(String taskName
) {
648 return getInternal(deleteTaskAsync(taskName
));
652 * See {@link Queue#deleteTaskAsync(String)}.
655 public Future
<Boolean
> deleteTaskAsync(String taskName
) {
656 TaskHandle
.validateTaskName(taskName
);
657 return deleteTaskAsync(new TaskHandle(TaskOptions
.Builder
.withTaskName(taskName
),
662 * See {@link Queue#deleteTask(TaskHandle)}.
665 public boolean deleteTask(TaskHandle taskHandle
) {
666 return getInternal(deleteTaskAsync(taskHandle
));
670 * See {@link Queue#deleteTaskAsync(TaskHandle)}.
673 public Future
<Boolean
> deleteTaskAsync(TaskHandle taskHandle
) {
674 return extractSingleEntry(deleteTaskAsync(Collections
.singletonList(taskHandle
)));
678 * See {@link Queue#deleteTask(List<TaskHandle>)}.
681 public List
<Boolean
> deleteTask(List
<TaskHandle
> taskHandles
) {
682 return getInternal(deleteTaskAsync(taskHandles
));
686 * See {@link Queue#deleteTaskAsync(List<TaskHandle>)}.
689 public Future
<List
<Boolean
>> deleteTaskAsync(List
<TaskHandle
> taskHandles
) {
691 final TaskQueueDeleteRequest deleteRequest
= new TaskQueueDeleteRequest();
692 deleteRequest
.setQueueName(queueName
);
694 for (TaskHandle taskHandle
: taskHandles
) {
695 if (taskHandle
.getQueueName().equals(this.queueName
)) {
696 deleteRequest
.addTaskName(taskHandle
.getName());
698 throw new QueueNameMismatchException(
699 String
.format("The task %s is associated with the queue named %s "
700 + "and cannot be deleted from the queue named %s.",
701 taskHandle
.getName(), taskHandle
.getQueueName(), this.queueName
));
705 Future
<TaskQueueDeleteResponse
> responseFuture
= makeAsyncCall(
706 "Delete", deleteRequest
, new TaskQueueDeleteResponse());
707 return new FutureAdapter
<TaskQueueDeleteResponse
, List
<Boolean
>>(responseFuture
) {
709 protected List
<Boolean
> wrap(TaskQueueDeleteResponse deleteResponse
) {
710 List
<Boolean
> result
= new ArrayList
<Boolean
>(deleteResponse
.resultSize());
712 for (int i
= 0; i
< deleteResponse
.resultSize(); ++i
) {
713 int errorCode
= deleteResponse
.getResult(i
);
714 if (errorCode
!= TaskQueueServiceError
.ErrorCode
.OK
.getValue() &&
715 errorCode
!= TaskQueueServiceError
.ErrorCode
.TOMBSTONED_TASK
.getValue() &&
716 errorCode
!= TaskQueueServiceError
.ErrorCode
.UNKNOWN_TASK
.getValue()) {
717 throw QueueApiHelper
.translateError(errorCode
, deleteRequest
.getTaskName(i
));
719 result
.add(errorCode
== TaskQueueServiceError
.ErrorCode
.OK
.getValue());
727 private Future
<List
<TaskHandle
>> leaseTasksInternal(LeaseOptions options
) {
728 long leaseMillis
= options
.getUnit().toMillis(options
.getLease());
729 if (leaseMillis
> QueueConstants
.maxLease(TimeUnit
.MILLISECONDS
)) {
730 throw new IllegalArgumentException(
731 String
.format("A lease period can be no longer than %d seconds",
732 QueueConstants
.maxLease(TimeUnit
.SECONDS
)));
735 if (options
.getCountLimit() > QueueConstants
.maxLeaseCount()) {
736 throw new IllegalArgumentException(
737 String
.format("No more than %d tasks can be leased in one call",
738 QueueConstants
.maxLeaseCount()));
741 TaskQueueQueryAndOwnTasksRequest leaseRequest
= new TaskQueueQueryAndOwnTasksRequest();
743 leaseRequest
.setQueueName(queueName
);
744 leaseRequest
.setLeaseSeconds(leaseMillis
/ 1000.0);
745 leaseRequest
.setMaxTasks(options
.getCountLimit());
746 if (options
.getGroupByTag()) {
747 leaseRequest
.setGroupByTag(true);
748 if (options
.getTag() != null) {
749 leaseRequest
.setTagAsBytes(options
.getTag());
753 ApiConfig apiConfig
= new ApiConfig();
754 if (options
.getDeadlineInSeconds() == null) {
755 apiConfig
.setDeadlineInSeconds(DEFAULT_LEASE_TASKS_DEADLINE_SECONDS
);
757 apiConfig
.setDeadlineInSeconds(options
.getDeadlineInSeconds());
760 Future
<TaskQueueQueryAndOwnTasksResponse
> responseFuture
= apiHelper
.makeAsyncCall(
761 "QueryAndOwnTasks", leaseRequest
, new TaskQueueQueryAndOwnTasksResponse(), apiConfig
);
762 return new FutureAdapter
<TaskQueueQueryAndOwnTasksResponse
, List
<TaskHandle
>>(responseFuture
) {
764 protected List
<TaskHandle
> wrap(TaskQueueQueryAndOwnTasksResponse leaseResponse
) {
765 List
<TaskHandle
> result
= new ArrayList
<TaskHandle
>();
766 for (TaskQueueQueryAndOwnTasksResponse
.Task response
: leaseResponse
.tasks()) {
767 TaskOptions taskOptions
= TaskOptions
.Builder
.withTaskName(response
.getTaskName())
768 .payload(response
.getBodyAsBytes())
769 .method(TaskOptions
.Method
.PULL
);
770 if (response
.hasTag()) {
771 taskOptions
.tag(response
.getTagAsBytes());
773 TaskHandle handle
= new TaskHandle(taskOptions
, queueName
, response
.getRetryCount());
774 result
.add(handle
.etaUsec(response
.getEtaUsec()));
783 * See {@link Queue#leaseTasks(long, TimeUnit, long)}.
786 public List
<TaskHandle
> leaseTasks(long lease
, TimeUnit unit
, long countLimit
) {
787 return getInternal(leaseTasksAsync(lease
, unit
, countLimit
));
791 * See {@link Queue#leaseTasksAsync(long, TimeUnit, long)}.
794 public Future
<List
<TaskHandle
>> leaseTasksAsync(
795 long lease
, TimeUnit unit
, long countLimit
) {
796 return leaseTasksInternal(LeaseOptions
.Builder
.withLeasePeriod(lease
, unit
)
797 .countLimit(countLimit
));
801 * See {@link Queue#leaseTasksByTagBytes(long, TimeUnit, long, byte[])}.
804 public List
<TaskHandle
> leaseTasksByTagBytes(
805 long lease
, TimeUnit unit
, long countLimit
, byte[] tag
) {
806 return getInternal(leaseTasksByTagBytesAsync(lease
, unit
, countLimit
, tag
));
810 * See {@link Queue#leaseTasksByTagBytesAsync(long, TimeUnit, long, byte[])}.
813 public Future
<List
<TaskHandle
>> leaseTasksByTagBytesAsync(
814 long lease
, TimeUnit unit
, long countLimit
, byte[] tag
) {
815 LeaseOptions options
= LeaseOptions
.Builder
.withLeasePeriod(lease
, unit
)
816 .countLimit(countLimit
);
820 options
.groupByTag();
822 return leaseTasksInternal(options
);
826 * See {@link Queue#leaseTasksByTag(long, TimeUnit, long, String)}.
829 public List
<TaskHandle
> leaseTasksByTag(long lease
, TimeUnit unit
,
830 long countLimit
, String tag
) {
831 return getInternal(leaseTasksByTagAsync(lease
, unit
, countLimit
, tag
));
835 * See {@link Queue#leaseTasksByTagAsync(long, TimeUnit, long, String)}.
838 public Future
<List
<TaskHandle
>> leaseTasksByTagAsync(
839 long lease
, TimeUnit unit
, long countLimit
, String tag
) {
840 LeaseOptions options
= LeaseOptions
.Builder
.withLeasePeriod(lease
, unit
)
841 .countLimit(countLimit
);
845 options
.groupByTag();
847 return leaseTasksInternal(options
);
851 * See {@link Queue#leaseTasks(LeaseOptions)}.
854 public List
<TaskHandle
> leaseTasks(LeaseOptions options
) {
855 return getInternal(leaseTasksAsync(options
));
859 * See {@link Queue#leaseTasksAsync(LeaseOptions)}.
862 public Future
<List
<TaskHandle
>> leaseTasksAsync(LeaseOptions options
) {
863 if (options
.getLease() == null) {
864 throw new IllegalArgumentException("The lease period must be specified");
866 if (options
.getCountLimit() == null) {
867 throw new IllegalArgumentException("The count limit must be specified");
869 return leaseTasksInternal(options
);
873 * See {@link Queue#modifyTaskLease(TaskHandle, long, TimeUnit)}.
876 public TaskHandle
modifyTaskLease(TaskHandle taskHandle
, long lease
, TimeUnit unit
) {
877 long leaseMillis
= unit
.toMillis(lease
);
878 if (leaseMillis
> QueueConstants
.maxLease(TimeUnit
.MILLISECONDS
)) {
879 throw new IllegalArgumentException(
880 String
.format("The lease time specified (%s seconds) is too large. " +
881 "Lease period can be no longer than %d seconds.",
882 formatLeaseTimeInSeconds(leaseMillis
),
883 QueueConstants
.maxLease(TimeUnit
.SECONDS
)));
885 if (leaseMillis
< 0) {
886 throw new IllegalArgumentException(
887 String
.format("The lease time must not be negative. " +
888 "Specified lease time was %s seconds.",
889 formatLeaseTimeInSeconds(leaseMillis
)));
892 TaskQueueModifyTaskLeaseRequest request
= new TaskQueueModifyTaskLeaseRequest();
893 TaskQueueModifyTaskLeaseResponse response
= new TaskQueueModifyTaskLeaseResponse();
895 request
.setQueueName(this.queueName
);
896 request
.setTaskName(taskHandle
.getName());
897 request
.setLeaseSeconds(leaseMillis
/ 1000.0);
898 request
.setEtaUsec(taskHandle
.getEtaUsec());
900 apiHelper
.makeSyncCall("ModifyTaskLease", request
, response
);
901 taskHandle
.etaUsec(response
.getUpdatedEtaUsec());
905 private String
formatLeaseTimeInSeconds(long milliSeconds
) {
906 long seconds
= TimeUnit
.SECONDS
.convert(milliSeconds
, TimeUnit
.MILLISECONDS
);
907 long remainder
= milliSeconds
- TimeUnit
.MILLISECONDS
.convert(seconds
, TimeUnit
.SECONDS
);
908 String formatString
= milliSeconds
< 0 ?
"-%01d.%03d" : "%01d.%03d";
909 return String
.format(formatString
, Math
.abs(seconds
), Math
.abs(remainder
));
913 * See {@link Queue#fetchStatistics()}.
916 public QueueStatistics
fetchStatistics() {
917 return getInternal(fetchStatisticsAsync(null));
921 * See {@link Queue#fetchStatisticsAsync(Double)}.
924 public Future
<QueueStatistics
> fetchStatisticsAsync( Double deadlineInSeconds
) {
925 if (deadlineInSeconds
== null) {
926 deadlineInSeconds
= DEFAULT_FETCH_STATISTICS_DEADLINE_SECONDS
;
929 if (deadlineInSeconds
<= 0.0) {
930 throw new IllegalArgumentException("Deadline must be > 0, got " +
934 List
<Queue
> queues
= Collections
.<Queue
>singletonList(this);
935 Future
<List
<QueueStatistics
>> future
= QueueStatistics
.fetchForQueuesAsync(
936 queues
, apiHelper
, deadlineInSeconds
);
937 return extractSingleEntry(future
);
940 <T
extends ProtocolMessage
<T
>> Future
<T
> makeAsyncCall(
941 String methodName
, ProtocolMessage
<?
> request
, T response
) {
942 return apiHelper
.makeAsyncCall(methodName
, request
, response
, new ApiConfig());