1 // Copyright 2010 Google Inc. All rights reserved.
2 package com
.google
.appengine
.api
.taskqueue
;
4 import com
.google
.appengine
.api
.NamespaceManager
;
5 import com
.google
.appengine
.api
.datastore
.DatastoreService
;
6 import com
.google
.appengine
.api
.datastore
.DatastoreServiceFactory
;
7 import com
.google
.appengine
.api
.taskqueue
.TaskOptions
.Param
;
8 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueAddRequest
;
9 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueAddRequest
.Header
;
10 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueBulkAddRequest
;
11 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueBulkAddResponse
;
12 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueDeleteRequest
;
13 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueDeleteResponse
;
14 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueMode
;
15 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueModifyTaskLeaseRequest
;
16 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueModifyTaskLeaseResponse
;
17 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueuePurgeQueueRequest
;
18 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueuePurgeQueueResponse
;
19 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueQueryAndOwnTasksRequest
;
20 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueQueryAndOwnTasksResponse
;
21 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueRetryParameters
;
22 import com
.google
.appengine
.api
.taskqueue
.TaskQueuePb
.TaskQueueServiceError
;
23 import com
.google
.apphosting
.api
.ApiProxy
.ApiConfig
;
25 import java
.io
.UnsupportedEncodingException
;
27 import java
.net
.URISyntaxException
;
28 import java
.util
.ArrayList
;
29 import java
.util
.Arrays
;
30 import java
.util
.Collections
;
31 import java
.util
.HashMap
;
32 import java
.util
.HashSet
;
33 import java
.util
.List
;
34 import java
.util
.Map
.Entry
;
36 import java
.util
.concurrent
.TimeUnit
;
39 * Implements the {@link Queue} interface.
40 * {@link QueueImpl} is thread safe.
43 class QueueImpl
implements Queue
{
44 private final String queueName
;
45 private final DatastoreService datastoreService
= DatastoreServiceFactory
.getDatastoreService();
46 private final QueueApiHelper apiHelper
;
49 * The name of the HTTP header specifying the default namespace
52 static final String DEFAULT_NAMESPACE_HEADER
= "X-AppEngine-Default-Namespace";
53 static final String CURRENT_NAMESPACE_HEADER
= "X-AppEngine-Current-Namespace";
55 static final double DEFAULT_LEASE_TASKS_DEADLINE_SECONDS
= 10.0;
57 QueueImpl(String queueName
, QueueApiHelper apiHelper
) {
58 QueueApiHelper
.validateQueueName(queueName
);
60 this.apiHelper
= apiHelper
;
61 this.queueName
= queueName
;
65 * See {@link Queue#add()}
67 public TaskHandle
add() {
69 getDatastoreService().getCurrentTransaction(null), TaskOptions
.Builder
.withDefaults());
73 * Returns a {@link URI} validated to only contain legal components.
74 * <p>The "scheme", "authority" and "fragment" components of a URI
75 * must not be specified. The path component must be absolute
76 * (i.e. start with "/").
78 * @param urlString The "url" specified by the client.
79 * @throws IllegalArgumentException The provided urlString is null, too long or does not have
82 private URI
parsePartialUrl(String urlString
) {
83 if (urlString
== null) {
84 throw new IllegalArgumentException("url must not be null");
87 if (urlString
.length() > QueueConstants
.maxUrlLength()) {
88 throw new IllegalArgumentException(
89 "url is longer than " + ((Integer
) QueueConstants
.maxUrlLength()).toString() + ".");
94 uri
= new URI(urlString
);
95 } catch (URISyntaxException exception
) {
96 throw new IllegalArgumentException("URL syntax error", exception
);
99 uriCheckNull(uri
.getScheme(), "scheme");
100 uriCheckNull(uri
.getRawAuthority(), "authority");
101 uriCheckNull(uri
.getRawFragment(), "fragment");
102 String path
= uri
.getPath();
104 if (path
== null || path
.length() == 0 || path
.charAt(0) != '/') {
107 } else if (path
.length() == 0) {
108 path
= "<empty string>";
110 throw new IllegalArgumentException(
111 "url must contain a path starting with '/' part - contains :" + path
);
117 private void uriCheckNull(String value
, String valueName
) {
119 throw new IllegalArgumentException(
120 "url must not contain a '" + valueName
+ "' part - contains :" + value
);
124 private void checkPullTask(String url
,
125 HashMap
<String
, List
<String
>> headers
,
127 RetryOptions retryOptions
) {
128 if (url
!= null && !url
.isEmpty()) {
129 throw new IllegalArgumentException("May not specify url in tasks that have method PULL");
131 if (!headers
.isEmpty()) {
132 throw new IllegalArgumentException(
133 "May not specify any header in tasks that have method PULL");
135 if (retryOptions
!= null) {
136 throw new IllegalArgumentException(
137 "May not specify retry options in tasks that have method PULL");
139 if (payload
== null) {
140 throw new IllegalArgumentException("payload must be specified for tasks with method PULL");
144 private void checkPostTask(List
<Param
> params
, byte[] payload
, String query
) {
145 if (query
!= null && query
.length() != 0) {
146 throw new IllegalArgumentException(
147 "POST method may not have a query string; use setParamater(s) instead");
152 * Construct a byte array data from params if payload is not specified.
153 * If it sees payload is specified, return null.
154 * @throws IllegalArgumentException if params and payload both exist
156 private byte[] constructPayloadFromParams(List
<Param
> params
, byte[] payload
) {
157 if (!params
.isEmpty() && payload
!= null) {
158 throw new IllegalArgumentException(
159 "Message body and parameters may not both be present; "
160 + "only one of these may be supplied");
162 return payload
!= null ?
null : encodeParamsPost(params
);
166 private void validateAndFillAddRequest(com
.google
.appengine
.api
.datastore
.Transaction txn
,
167 TaskOptions taskOptions
,
168 TaskQueueAddRequest addRequest
) {
169 boolean useUrlEncodedContentType
= false;
171 HashMap
<String
, List
<String
>> headers
= taskOptions
.getHeaders();
172 String url
= taskOptions
.getUrl();
173 byte[] payload
= taskOptions
.getPayload();
174 List
<Param
> params
= taskOptions
.getParams();
175 RetryOptions retryOptions
= taskOptions
.getRetryOptions();
176 TaskOptions
.Method method
= taskOptions
.getMethod();
180 parsedUrl
= parsePartialUrl(defaultUrl());
182 parsedUrl
= parsePartialUrl(url
);
184 String query
= parsedUrl
.getQuery();
185 StringBuilder relativeUrl
= new StringBuilder(parsedUrl
.getRawPath());
186 if (query
!= null && query
.length() != 0 && !params
.isEmpty()) {
187 throw new IllegalArgumentException(
188 "Query string and parameters both present; only one of these may be supplied");
191 byte[] constructedPayload
;
192 if (method
== TaskOptions
.Method
.PULL
) {
193 constructedPayload
= constructPayloadFromParams(params
, payload
);
194 if (constructedPayload
!= null) {
195 payload
= constructedPayload
;
197 checkPullTask(url
, headers
, payload
, retryOptions
);
198 } else if (method
== TaskOptions
.Method
.POST
) {
199 constructedPayload
= constructPayloadFromParams(params
, payload
);
200 if (constructedPayload
!= null) {
201 payload
= constructedPayload
;
202 useUrlEncodedContentType
= true;
204 checkPostTask(params
, payload
, query
);
206 if (!params
.isEmpty()) {
207 query
= encodeParamsUrlEncoded(params
);
209 if (query
!= null && query
.length() != 0) {
210 relativeUrl
.append("?").append(query
);
213 if (payload
!= null && payload
.length
!= 0 && !taskOptions
.getMethod().supportsBody()) {
214 throw new IllegalArgumentException(
215 taskOptions
.getMethod().toString() + " method may not specify a payload.");
220 taskOptions
.getTaskName(),
221 determineEta(taskOptions
),
223 relativeUrl
.toString(),
227 useUrlEncodedContentType
,
228 taskOptions
.getTagAsBytes(),
232 private void fillAddRequest(com
.google
.appengine
.api
.datastore
.Transaction txn
,
236 TaskOptions
.Method method
,
239 HashMap
<String
, List
<String
>> headers
,
240 RetryOptions retryOptions
,
241 boolean useUrlEncodedContentType
,
243 TaskQueueAddRequest addRequest
) {
244 addRequest
.setQueueName(queueName
);
245 addRequest
.setTaskName(taskName
== null ?
"" : taskName
);
247 if (method
== TaskOptions
.Method
.PULL
) {
248 addRequest
.setMode(TaskQueueMode
.Mode
.PULL
.getValue());
250 addRequest
.setUrl(relativeUrl
.toString());
251 addRequest
.setMode(TaskQueueMode
.Mode
.PUSH
.getValue());
252 addRequest
.setMethod(method
.getPbMethod());
255 if (payload
!= null) {
256 addRequest
.setBodyAsBytes(payload
);
259 addRequest
.setEtaUsec(etaMillis
* 1000);
261 if (taskName
!= null && !taskName
.equals("") && txn
!= null) {
262 throw new IllegalArgumentException(
263 "transactional tasks cannot be named: " + taskName
);
266 addRequest
.setTransaction(localTxnToRemoteTxn(txn
));
269 if (retryOptions
!= null) {
270 fillRetryParameters(retryOptions
, addRequest
.getMutableRetryParameters());
273 if (NamespaceManager
.getGoogleAppsNamespace().length() != 0) {
274 if (!headers
.containsKey(DEFAULT_NAMESPACE_HEADER
)) {
275 headers
.put(DEFAULT_NAMESPACE_HEADER
,
276 Arrays
.asList(NamespaceManager
.getGoogleAppsNamespace()));
279 if (!headers
.containsKey(CURRENT_NAMESPACE_HEADER
)) {
280 String namespace
= NamespaceManager
.get();
281 headers
.put(CURRENT_NAMESPACE_HEADER
, Arrays
.asList(namespace
== null ?
"" : namespace
));
283 for (Entry
<String
, List
<String
>> entry
: headers
.entrySet()) {
284 if (useUrlEncodedContentType
&& entry
.getKey().toLowerCase().equals("content-type")) {
288 for (String value
: entry
.getValue()) {
289 Header header
= addRequest
.addHeader();
290 header
.setKey(entry
.getKey());
291 header
.setValue(value
);
294 if (useUrlEncodedContentType
) {
295 Header contentTypeHeader
= addRequest
.addHeader();
296 contentTypeHeader
.setKey("content-type");
297 contentTypeHeader
.setValue("application/x-www-form-urlencoded");
301 if (method
!= TaskOptions
.Method
.PULL
) {
302 throw new IllegalArgumentException("Only PULL tasks can have a tag.");
304 if (tag
.length
> QueueConstants
.maxTaskTagLength()) {
305 throw new IllegalArgumentException(
306 "Task tag must be no more than " + QueueConstants
.maxTaskTagLength() + " bytes.");
308 addRequest
.setTagAsBytes(tag
);
311 if (method
== TaskOptions
.Method
.PULL
) {
312 if (addRequest
.encodingSize() > QueueConstants
.maxPullTaskSizeBytes()) {
313 throw new IllegalArgumentException("Task size too large");
316 if (addRequest
.encodingSize() > QueueConstants
.maxPushTaskSizeBytes()) {
317 throw new IllegalArgumentException("Task size too large");
323 * Translates a local transaction to the Datastore PB.
324 * Due to pb dependency issues, Transaction pb is redefined for TaskQueue.
325 * Keep in sync with DatastoreServiceImpl.localTxnToRemoteTxn.
327 private static Transaction
localTxnToRemoteTxn(
328 com
.google
.appengine
.api
.datastore
.Transaction local
) {
329 Transaction remote
= new Transaction();
330 remote
.setApp(local
.getApp());
331 remote
.setHandle(Long
.parseLong(local
.getId()));
336 * Translates from RetryOptions to TaskQueueRetryParameters.
337 * Also checks ensures minBackoffSeconds and maxBackoffSeconds are ordered
340 private static void fillRetryParameters(
341 RetryOptions retryOptions
,
342 TaskQueueRetryParameters retryParameters
) {
343 if (retryOptions
.getTaskRetryLimit() != null) {
344 retryParameters
.setRetryLimit(retryOptions
.getTaskRetryLimit());
346 if (retryOptions
.getTaskAgeLimitSeconds() != null) {
347 retryParameters
.setAgeLimitSec(retryOptions
.getTaskAgeLimitSeconds());
349 if (retryOptions
.getMinBackoffSeconds() != null) {
350 retryParameters
.setMinBackoffSec(retryOptions
.getMinBackoffSeconds());
352 if (retryOptions
.getMaxBackoffSeconds() != null) {
353 retryParameters
.setMaxBackoffSec(retryOptions
.getMaxBackoffSeconds());
355 if (retryOptions
.getMaxDoublings() != null) {
356 retryParameters
.setMaxDoublings(retryOptions
.getMaxDoublings());
359 if (retryParameters
.hasMinBackoffSec() && retryParameters
.hasMaxBackoffSec()) {
360 if (retryParameters
.getMinBackoffSec() > retryParameters
.getMaxBackoffSec()) {
361 throw new IllegalArgumentException(
362 "minBackoffSeconds must not be greater than maxBackoffSeconds.");
364 } else if (retryParameters
.hasMinBackoffSec()) {
365 if (retryParameters
.getMinBackoffSec() > retryParameters
.getMaxBackoffSec()) {
366 retryParameters
.setMaxBackoffSec(retryParameters
.getMinBackoffSec());
368 } else if (retryParameters
.hasMaxBackoffSec()) {
369 if (retryParameters
.getMinBackoffSec() > retryParameters
.getMaxBackoffSec()) {
370 retryParameters
.setMinBackoffSec(retryParameters
.getMaxBackoffSec());
376 * See {@link Queue#add(TaskOptions)}.
378 public TaskHandle
add(TaskOptions taskOptions
) {
379 return add(getDatastoreService().getCurrentTransaction(null), taskOptions
);
383 * See {@link Queue#add(Iterable)}.
385 public List
<TaskHandle
> add(Iterable
<TaskOptions
> taskOptions
) {
386 return add(getDatastoreService().getCurrentTransaction(null), taskOptions
);
390 * See {@link Queue#add(com.google.appengine.api.datastore.Transaction, TaskOptions)}.
392 public TaskHandle
add(com
.google
.appengine
.api
.datastore
.Transaction txn
,
393 TaskOptions taskOptions
) {
394 return add(txn
, Collections
.singletonList(taskOptions
)).get(0);
399 * Queue#add(com.google.appengine.api.datastore.Transaction, Iterable)}.
401 public List
<TaskHandle
> add(com
.google
.appengine
.api
.datastore
.Transaction txn
,
402 Iterable
<TaskOptions
> taskOptions
) {
403 List
<TaskOptions
> taskOptionsList
= new ArrayList
<TaskOptions
>();
404 Set
<String
> taskNames
= new HashSet
<String
>();
406 TaskQueueBulkAddRequest bulkAddRequest
= new TaskQueueBulkAddRequest();
407 TaskQueueBulkAddResponse bulkAddResponse
= new TaskQueueBulkAddResponse();
409 boolean hasPushTask
= false;
410 boolean hasPullTask
= false;
411 for (TaskOptions option
: taskOptions
) {
412 TaskQueueAddRequest addRequest
= bulkAddRequest
.addAddRequest();
413 validateAndFillAddRequest(txn
, option
, addRequest
);
414 if (addRequest
.getMode() == TaskQueueMode
.Mode
.PULL
.getValue()) {
420 taskOptionsList
.add(option
);
421 if (option
.getTaskName() != null && !option
.getTaskName().equals("")) {
422 if (!taskNames
.add(option
.getTaskName())) {
423 throw new IllegalArgumentException(
424 String
.format("Identical task names in request : \"%s\" duplicated",
425 option
.getTaskName()));
429 if (bulkAddRequest
.addRequestSize() > QueueConstants
.maxTasksPerAdd()) {
430 throw new IllegalArgumentException(
431 String
.format("No more than %d tasks can be added in a single add call",
432 QueueConstants
.maxTasksPerAdd()));
435 if (hasPullTask
&& hasPushTask
) {
436 throw new IllegalArgumentException(
437 "May not add both push tasks and pull tasks in the same call.");
441 bulkAddRequest
.encodingSize() > QueueConstants
.maxTransactionalRequestSizeBytes()) {
442 throw new IllegalArgumentException(
443 String
.format("Transactional add may not be larger than %d bytes: %d bytes requested.",
444 QueueConstants
.maxTransactionalRequestSizeBytes(),
445 bulkAddRequest
.encodingSize()));
448 apiHelper
.makeSyncCall("BulkAdd", bulkAddRequest
, bulkAddResponse
);
450 if (bulkAddResponse
.taskResultSize() != bulkAddRequest
.addRequestSize()) {
451 throw new InternalFailureException(
452 String
.format("expected %d results from BulkAdd(), got %d",
453 bulkAddRequest
.addRequestSize(), bulkAddResponse
.taskResultSize()));
456 List
<TaskHandle
> tasks
= new ArrayList
<TaskHandle
>();
457 for (int i
= 0; i
< bulkAddResponse
.taskResultSize(); ++i
) {
458 TaskQueueBulkAddResponse
.TaskResult taskResult
= bulkAddResponse
.taskResults().get(i
);
459 TaskQueueAddRequest addRequest
= bulkAddRequest
.getAddRequest(i
);
460 TaskOptions options
= taskOptionsList
.get(i
);
462 if (taskResult
.getResult() == TaskQueueServiceError
.ErrorCode
.OK
.getValue()) {
463 String taskName
= options
.getTaskName();
464 if (taskResult
.hasChosenTaskName()) {
465 taskName
= taskResult
.getChosenTaskName();
467 TaskOptions taskResultOptions
= new TaskOptions(options
);
468 taskResultOptions
.taskName(taskName
).payload(addRequest
.getBodyAsBytes());
469 TaskHandle handle
= new TaskHandle(taskResultOptions
, queueName
);
470 tasks
.add(handle
.etaUsec(addRequest
.getEtaUsec()));
471 } else if (taskResult
.getResult() != TaskQueueServiceError
.ErrorCode
.SKIPPED
.getValue()) {
472 throw QueueApiHelper
.translateError(taskResult
.getResult(), "");
478 long currentTimeMillis() {
479 return System
.currentTimeMillis();
482 private long determineEta(TaskOptions taskOptions
) {
483 Long etaMillis
= taskOptions
.getEtaMillis();
484 Long countdownMillis
= taskOptions
.getCountdownMillis();
485 if (etaMillis
== null) {
486 if (countdownMillis
== null) {
487 return currentTimeMillis();
489 if (countdownMillis
> QueueConstants
.getMaxEtaDeltaMillis()) {
490 throw new IllegalArgumentException("ETA too far into the future");
492 if (countdownMillis
< 0) {
493 throw new IllegalArgumentException("Negative countdown is not allowed");
495 return currentTimeMillis() + countdownMillis
;
498 if (countdownMillis
== null) {
499 if (etaMillis
- currentTimeMillis() > QueueConstants
.getMaxEtaDeltaMillis()) {
500 throw new IllegalArgumentException("ETA too far into the future");
503 throw new IllegalArgumentException("Negative ETA is invalid");
507 throw new IllegalArgumentException(
508 "Only one or neither of EtaMillis and CountdownMillis may be specified");
513 byte[] encodeParamsPost(List
<Param
> params
) {
516 payload
= encodeParamsUrlEncoded(params
).getBytes("UTF-8");
517 } catch (UnsupportedEncodingException exception
) {
518 throw new UnsupportedTranslationException(exception
);
524 String
encodeParamsUrlEncoded(List
<Param
> params
) {
525 StringBuilder result
= new StringBuilder();
527 String appender
= "";
528 for (Param param
: params
) {
529 result
.append(appender
);
531 result
.append(param
.getURLEncodedName());
533 result
.append(param
.getURLEncodedValue());
535 } catch (UnsupportedEncodingException exception
) {
536 throw new UnsupportedTranslationException(exception
);
538 return result
.toString();
541 private String
defaultUrl() {
542 return DEFAULT_QUEUE_PATH
+ "/" + queueName
;
546 * See {@link Queue#getQueueName()}.
548 public String
getQueueName() {
552 DatastoreService
getDatastoreService() {
553 return datastoreService
;
557 * See {@link Queue#purge()}.
559 public void purge() {
560 TaskQueuePurgeQueueRequest purgeRequest
= new TaskQueuePurgeQueueRequest();
561 TaskQueuePurgeQueueResponse purgeResponse
= new TaskQueuePurgeQueueResponse();
563 purgeRequest
.setQueueName(queueName
);
564 apiHelper
.makeSyncCall("PurgeQueue", purgeRequest
, purgeResponse
);
568 * See {@link Queue#deleteTask(String)}.
571 public boolean deleteTask(String taskName
) {
572 TaskHandle
.validateTaskName(taskName
);
573 return deleteTask(new TaskHandle(TaskOptions
.Builder
.withTaskName(taskName
),
578 * See {@link Queue#deleteTask(TaskHandle)}.
581 public boolean deleteTask(TaskHandle taskHandle
) {
582 List
<TaskHandle
> taskHandles
= new ArrayList
<TaskHandle
>(1);
583 taskHandles
.add(taskHandle
);
584 List
<Boolean
> result
= deleteTask(taskHandles
);
585 return result
.get(0);
589 * See {@link Queue#deleteTask(List<TaskHandle>)}.
592 public List
<Boolean
> deleteTask(List
<TaskHandle
> taskHandles
) {
594 TaskQueueDeleteRequest deleteRequest
= new TaskQueueDeleteRequest();
595 TaskQueueDeleteResponse deleteResponse
= new TaskQueueDeleteResponse();
596 deleteRequest
.setQueueName(queueName
);
598 for (TaskHandle taskHandle
: taskHandles
) {
599 if (taskHandle
.getQueueName().equals(this.queueName
)) {
600 deleteRequest
.addTaskName(taskHandle
.getName());
602 throw new QueueNameMismatchException(
603 String
.format("The task %s is associated with the queue named %s "
604 + "and cannot be deleted from the queue named %s.",
605 taskHandle
.getName(), taskHandle
.getQueueName(), this.queueName
));
609 apiHelper
.makeSyncCall("Delete", deleteRequest
, deleteResponse
);
611 List
<Boolean
> result
= new ArrayList
<Boolean
>(deleteResponse
.resultSize());
613 for (int i
= 0; i
< deleteResponse
.resultSize(); ++i
) {
614 int errorCode
= deleteResponse
.getResult(i
);
615 if (errorCode
!= TaskQueueServiceError
.ErrorCode
.OK
.getValue() &&
616 errorCode
!= TaskQueueServiceError
.ErrorCode
.TOMBSTONED_TASK
.getValue() &&
617 errorCode
!= TaskQueueServiceError
.ErrorCode
.UNKNOWN_TASK
.getValue()) {
618 throw QueueApiHelper
.translateError(errorCode
, "");
620 result
.add(errorCode
== TaskQueueServiceError
.ErrorCode
.OK
.getValue());
626 private List
<TaskHandle
> leaseTasksInternal(LeaseOptions options
) {
627 long leaseMillis
= options
.getUnit().toMillis(options
.getLease());
628 if (leaseMillis
> QueueConstants
.maxLease(TimeUnit
.MILLISECONDS
)) {
629 throw new IllegalArgumentException(
630 String
.format("A lease period can be no longer than %d seconds",
631 QueueConstants
.maxLease(TimeUnit
.SECONDS
)));
634 if (options
.getCountLimit() > QueueConstants
.maxLeaseCount()) {
635 throw new IllegalArgumentException(
636 String
.format("No more than %d tasks can be leased in one call",
637 QueueConstants
.maxLeaseCount()));
640 TaskQueueQueryAndOwnTasksRequest leaseRequest
= new TaskQueueQueryAndOwnTasksRequest();
641 TaskQueueQueryAndOwnTasksResponse leaseResponse
= new TaskQueueQueryAndOwnTasksResponse();
643 leaseRequest
.setQueueName(queueName
);
644 leaseRequest
.setLeaseSeconds(leaseMillis
/ 1000.0);
645 leaseRequest
.setMaxTasks(options
.getCountLimit());
646 if (options
.getGroupByTag()) {
647 leaseRequest
.setGroupByTag(true);
648 if (options
.getTag() != null) {
649 leaseRequest
.setTagAsBytes(options
.getTag());
653 ApiConfig apiConfig
= new ApiConfig();
654 if (options
.getDeadlineInSeconds() == null) {
655 apiConfig
.setDeadlineInSeconds(DEFAULT_LEASE_TASKS_DEADLINE_SECONDS
);
657 apiConfig
.setDeadlineInSeconds(options
.getDeadlineInSeconds());
659 apiHelper
.makeSyncCall("QueryAndOwnTasks", leaseRequest
, leaseResponse
, apiConfig
);
661 List
<TaskHandle
> result
= new ArrayList
<TaskHandle
>();
662 for (TaskQueueQueryAndOwnTasksResponse
.Task response
: leaseResponse
.tasks()) {
663 TaskOptions taskOptions
= TaskOptions
.Builder
.withTaskName(response
.getTaskName())
664 .payload(response
.getBodyAsBytes())
665 .method(TaskOptions
.Method
.PULL
);
666 if (response
.hasTag()) {
667 taskOptions
.tag(response
.getTagAsBytes());
669 TaskHandle handle
= new TaskHandle(taskOptions
, queueName
, response
.getRetryCount());
670 result
.add(handle
.etaUsec(response
.getEtaUsec()));
677 * See {@link Queue#leaseTasks(long, TimeUnit, long)}.
680 public List
<TaskHandle
> leaseTasks(long lease
, TimeUnit unit
, long countLimit
) {
681 return leaseTasksInternal(LeaseOptions
.Builder
.withLeasePeriod(lease
, unit
)
682 .countLimit(countLimit
));
686 * See {@link Queue#leaseTasksByTagBytes(long, TimeUnit, long, byte[])}.
689 public List
<TaskHandle
> leaseTasksByTagBytes(
690 long lease
, TimeUnit unit
, long countLimit
, byte[] tag
) {
691 LeaseOptions options
= LeaseOptions
.Builder
.withLeasePeriod(lease
, unit
)
692 .countLimit(countLimit
);
696 options
.groupByTag();
698 return leaseTasksInternal(options
);
702 * See {@link Queue#leaseTasksByTag(long, TimeUnit, long, String)}.
705 public List
<TaskHandle
> leaseTasksByTag(long lease
, TimeUnit unit
,
706 long countLimit
, String tag
) {
707 LeaseOptions options
= LeaseOptions
.Builder
.withLeasePeriod(lease
, unit
)
708 .countLimit(countLimit
);
712 options
.groupByTag();
714 return leaseTasksInternal(options
);
718 * See {@link Queue#leaseTasks(LeaseOptions)}.
721 public List
<TaskHandle
> leaseTasks(LeaseOptions options
) {
722 if (options
.getLease() == null) {
723 throw new IllegalArgumentException("The lease period must be specified");
725 if (options
.getCountLimit() == null) {
726 throw new IllegalArgumentException("The count limit must be specified");
728 return leaseTasksInternal(options
);
732 * See {@link Queue#modifyTaskLease(TaskHandle, long, TimeUnit)}.
735 public TaskHandle
modifyTaskLease(TaskHandle taskHandle
, long lease
, TimeUnit unit
) {
736 long leaseMillis
= unit
.toMillis(lease
);
737 if (leaseMillis
> QueueConstants
.maxLease(TimeUnit
.MILLISECONDS
)) {
738 throw new IllegalArgumentException(
739 String
.format("The lease time specified (%s seconds) is too large. " +
740 "Lease period can be no longer than %d seconds.",
741 formatLeaseTimeInSeconds(leaseMillis
),
742 QueueConstants
.maxLease(TimeUnit
.SECONDS
)));
744 if (leaseMillis
< 0) {
745 throw new IllegalArgumentException(
746 String
.format("The lease time must not be negative. " +
747 "Specified lease time was %s seconds.",
748 formatLeaseTimeInSeconds(leaseMillis
)));
751 TaskQueueModifyTaskLeaseRequest request
= new TaskQueueModifyTaskLeaseRequest();
752 TaskQueueModifyTaskLeaseResponse response
= new TaskQueueModifyTaskLeaseResponse();
754 request
.setQueueName(this.queueName
);
755 request
.setTaskName(taskHandle
.getName());
756 request
.setLeaseSeconds(leaseMillis
/ 1000.0);
757 request
.setEtaUsec(taskHandle
.getEtaUsec());
759 apiHelper
.makeSyncCall("ModifyTaskLease", request
, response
);
760 taskHandle
.etaUsec(response
.getUpdatedEtaUsec());
764 private String
formatLeaseTimeInSeconds(long milliSeconds
) {
765 long seconds
= TimeUnit
.SECONDS
.convert(milliSeconds
, TimeUnit
.MILLISECONDS
);
766 long remainder
= milliSeconds
- TimeUnit
.MILLISECONDS
.convert(seconds
, TimeUnit
.SECONDS
);
767 String formatString
= milliSeconds
< 0 ?
"-%01d.%03d" : "%01d.%03d";
768 return String
.format(formatString
, Math
.abs(seconds
), Math
.abs(remainder
));
772 * See {@link Queue#fetchStatistics()}.
775 public QueueStatistics
fetchStatistics() {
776 List
<Queue
> queues
= Collections
.<Queue
>singletonList(this);
777 List
<QueueStatistics
> stats
= QueueStatistics
.fetchForQueues(queues
, apiHelper
);
778 if (stats
.size() != 1) {
779 throw new IllegalStateException(
780 "An internal error occurred while obtaining queue statistics");