App Engine Python SDK version 1.7.4 (2)
[gae.git] / java / src / main / com / google / appengine / api / taskqueue / QueueImpl.java
blobabe583c872e0cfbbfe0d36feed55f478f7a2ac53
1 // Copyright 2010 Google Inc. All rights reserved.
2 package com.google.appengine.api.taskqueue;
4 import com.google.appengine.api.NamespaceManager;
5 import com.google.appengine.api.datastore.DatastoreService;
6 import com.google.appengine.api.datastore.DatastoreServiceFactory;
7 import com.google.appengine.api.taskqueue.TaskOptions.Param;
8 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueAddRequest;
9 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueAddRequest.Header;
10 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueBulkAddRequest;
11 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueBulkAddResponse;
12 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueDeleteRequest;
13 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueDeleteResponse;
14 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueMode;
15 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueModifyTaskLeaseRequest;
16 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueModifyTaskLeaseResponse;
17 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueuePurgeQueueRequest;
18 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueuePurgeQueueResponse;
19 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueQueryAndOwnTasksRequest;
20 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueQueryAndOwnTasksResponse;
21 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueRetryParameters;
22 import com.google.appengine.api.taskqueue.TaskQueuePb.TaskQueueServiceError;
23 import com.google.apphosting.api.ApiProxy.ApiConfig;
25 import java.io.UnsupportedEncodingException;
26 import java.net.URI;
27 import java.net.URISyntaxException;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map.Entry;
35 import java.util.Set;
36 import java.util.concurrent.TimeUnit;
38 /**
39 * Implements the {@link Queue} interface.
40 * {@link QueueImpl} is thread safe.
43 class QueueImpl implements Queue {
44 private final String queueName;
45 private final DatastoreService datastoreService = DatastoreServiceFactory.getDatastoreService();
46 private final QueueApiHelper apiHelper;
48 /**
49 * The name of the HTTP header specifying the default namespace
50 * for API calls.
52 static final String DEFAULT_NAMESPACE_HEADER = "X-AppEngine-Default-Namespace";
53 static final String CURRENT_NAMESPACE_HEADER = "X-AppEngine-Current-Namespace";
55 static final double DEFAULT_LEASE_TASKS_DEADLINE_SECONDS = 10.0;
57 QueueImpl(String queueName, QueueApiHelper apiHelper) {
58 QueueApiHelper.validateQueueName(queueName);
60 this.apiHelper = apiHelper;
61 this.queueName = queueName;
64 /**
65 * See {@link Queue#add()}
67 public TaskHandle add() {
68 return add(
69 getDatastoreService().getCurrentTransaction(null), TaskOptions.Builder.withDefaults());
72 /**
73 * Returns a {@link URI} validated to only contain legal components.
74 * <p>The "scheme", "authority" and "fragment" components of a URI
75 * must not be specified. The path component must be absolute
76 * (i.e. start with "/").
78 * @param urlString The "url" specified by the client.
79 * @throws IllegalArgumentException The provided urlString is null, too long or does not have
80 * correct syntax.
82 private URI parsePartialUrl(String urlString) {
83 if (urlString == null) {
84 throw new IllegalArgumentException("url must not be null");
87 if (urlString.length() > QueueConstants.maxUrlLength()) {
88 throw new IllegalArgumentException(
89 "url is longer than " + ((Integer) QueueConstants.maxUrlLength()).toString() + ".");
92 URI uri;
93 try {
94 uri = new URI(urlString);
95 } catch (URISyntaxException exception) {
96 throw new IllegalArgumentException("URL syntax error", exception);
99 uriCheckNull(uri.getScheme(), "scheme");
100 uriCheckNull(uri.getRawAuthority(), "authority");
101 uriCheckNull(uri.getRawFragment(), "fragment");
102 String path = uri.getPath();
104 if (path == null || path.length() == 0 || path.charAt(0) != '/') {
105 if (path == null) {
106 path = "(null)";
107 } else if (path.length() == 0) {
108 path = "<empty string>";
110 throw new IllegalArgumentException(
111 "url must contain a path starting with '/' part - contains :" + path);
114 return uri;
117 private void uriCheckNull(String value, String valueName) {
118 if (value != null) {
119 throw new IllegalArgumentException(
120 "url must not contain a '" + valueName + "' part - contains :" + value);
124 private void checkPullTask(String url,
125 HashMap<String, List<String>> headers,
126 byte[] payload,
127 RetryOptions retryOptions) {
128 if (url != null && !url.isEmpty()) {
129 throw new IllegalArgumentException("May not specify url in tasks that have method PULL");
131 if (!headers.isEmpty()) {
132 throw new IllegalArgumentException(
133 "May not specify any header in tasks that have method PULL");
135 if (retryOptions != null) {
136 throw new IllegalArgumentException(
137 "May not specify retry options in tasks that have method PULL");
139 if (payload == null) {
140 throw new IllegalArgumentException("payload must be specified for tasks with method PULL");
144 private void checkPostTask(List<Param> params, byte[] payload, String query) {
145 if (query != null && query.length() != 0) {
146 throw new IllegalArgumentException(
147 "POST method may not have a query string; use setParamater(s) instead");
152 * Construct a byte array data from params if payload is not specified.
153 * If it sees payload is specified, return null.
154 * @throws IllegalArgumentException if params and payload both exist
156 private byte[] constructPayloadFromParams(List<Param> params, byte[] payload) {
157 if (!params.isEmpty() && payload != null) {
158 throw new IllegalArgumentException(
159 "Message body and parameters may not both be present; "
160 + "only one of these may be supplied");
162 return payload != null ? null : encodeParamsPost(params);
166 private void validateAndFillAddRequest(com.google.appengine.api.datastore.Transaction txn,
167 TaskOptions taskOptions,
168 TaskQueueAddRequest addRequest) {
169 boolean useUrlEncodedContentType = false;
171 HashMap<String, List<String>> headers = taskOptions.getHeaders();
172 String url = taskOptions.getUrl();
173 byte[] payload = taskOptions.getPayload();
174 List<Param> params = taskOptions.getParams();
175 RetryOptions retryOptions = taskOptions.getRetryOptions();
176 TaskOptions.Method method = taskOptions.getMethod();
178 URI parsedUrl;
179 if (url == null) {
180 parsedUrl = parsePartialUrl(defaultUrl());
181 } else {
182 parsedUrl = parsePartialUrl(url);
184 String query = parsedUrl.getQuery();
185 StringBuilder relativeUrl = new StringBuilder(parsedUrl.getRawPath());
186 if (query != null && query.length() != 0 && !params.isEmpty()) {
187 throw new IllegalArgumentException(
188 "Query string and parameters both present; only one of these may be supplied");
191 byte[] constructedPayload;
192 if (method == TaskOptions.Method.PULL) {
193 constructedPayload = constructPayloadFromParams(params, payload);
194 if (constructedPayload != null) {
195 payload = constructedPayload;
197 checkPullTask(url, headers, payload, retryOptions);
198 } else if (method == TaskOptions.Method.POST) {
199 constructedPayload = constructPayloadFromParams(params, payload);
200 if (constructedPayload != null) {
201 payload = constructedPayload;
202 useUrlEncodedContentType = true;
204 checkPostTask(params, payload, query);
205 } else {
206 if (!params.isEmpty()) {
207 query = encodeParamsUrlEncoded(params);
209 if (query != null && query.length() != 0) {
210 relativeUrl.append("?").append(query);
213 if (payload != null && payload.length != 0 && !taskOptions.getMethod().supportsBody()) {
214 throw new IllegalArgumentException(
215 taskOptions.getMethod().toString() + " method may not specify a payload.");
218 fillAddRequest(txn,
219 queueName,
220 taskOptions.getTaskName(),
221 determineEta(taskOptions),
222 method,
223 relativeUrl.toString(),
224 payload,
225 headers,
226 retryOptions,
227 useUrlEncodedContentType,
228 taskOptions.getTagAsBytes(),
229 addRequest);
232 private void fillAddRequest(com.google.appengine.api.datastore.Transaction txn,
233 String queueName,
234 String taskName,
235 long etaMillis,
236 TaskOptions.Method method,
237 String relativeUrl,
238 byte[] payload,
239 HashMap<String, List<String>> headers,
240 RetryOptions retryOptions,
241 boolean useUrlEncodedContentType,
242 byte[] tag,
243 TaskQueueAddRequest addRequest) {
244 addRequest.setQueueName(queueName);
245 addRequest.setTaskName(taskName == null ? "" : taskName);
247 if (method == TaskOptions.Method.PULL) {
248 addRequest.setMode(TaskQueueMode.Mode.PULL.getValue());
249 } else {
250 addRequest.setUrl(relativeUrl.toString());
251 addRequest.setMode(TaskQueueMode.Mode.PUSH.getValue());
252 addRequest.setMethod(method.getPbMethod());
255 if (payload != null) {
256 addRequest.setBodyAsBytes(payload);
259 addRequest.setEtaUsec(etaMillis * 1000);
261 if (taskName != null && !taskName.equals("") && txn != null) {
262 throw new IllegalArgumentException(
263 "transactional tasks cannot be named: " + taskName);
265 if (txn != null) {
266 addRequest.setTransaction(localTxnToRemoteTxn(txn));
269 if (retryOptions != null) {
270 fillRetryParameters(retryOptions, addRequest.getMutableRetryParameters());
273 if (NamespaceManager.getGoogleAppsNamespace().length() != 0) {
274 if (!headers.containsKey(DEFAULT_NAMESPACE_HEADER)) {
275 headers.put(DEFAULT_NAMESPACE_HEADER,
276 Arrays.asList(NamespaceManager.getGoogleAppsNamespace()));
279 if (!headers.containsKey(CURRENT_NAMESPACE_HEADER)) {
280 String namespace = NamespaceManager.get();
281 headers.put(CURRENT_NAMESPACE_HEADER, Arrays.asList(namespace == null ? "" : namespace));
283 for (Entry<String, List<String>> entry : headers.entrySet()) {
284 if (useUrlEncodedContentType && entry.getKey().toLowerCase().equals("content-type")) {
285 continue;
288 for (String value : entry.getValue()) {
289 Header header = addRequest.addHeader();
290 header.setKey(entry.getKey());
291 header.setValue(value);
294 if (useUrlEncodedContentType) {
295 Header contentTypeHeader = addRequest.addHeader();
296 contentTypeHeader.setKey("content-type");
297 contentTypeHeader.setValue("application/x-www-form-urlencoded");
300 if (tag != null) {
301 if (method != TaskOptions.Method.PULL) {
302 throw new IllegalArgumentException("Only PULL tasks can have a tag.");
304 if (tag.length > QueueConstants.maxTaskTagLength()) {
305 throw new IllegalArgumentException(
306 "Task tag must be no more than " + QueueConstants.maxTaskTagLength() + " bytes.");
308 addRequest.setTagAsBytes(tag);
311 if (method == TaskOptions.Method.PULL) {
312 if (addRequest.encodingSize() > QueueConstants.maxPullTaskSizeBytes()) {
313 throw new IllegalArgumentException("Task size too large");
315 } else {
316 if (addRequest.encodingSize() > QueueConstants.maxPushTaskSizeBytes()) {
317 throw new IllegalArgumentException("Task size too large");
323 * Translates a local transaction to the Datastore PB.
324 * Due to pb dependency issues, Transaction pb is redefined for TaskQueue.
325 * Keep in sync with DatastoreServiceImpl.localTxnToRemoteTxn.
327 private static Transaction localTxnToRemoteTxn(
328 com.google.appengine.api.datastore.Transaction local) {
329 Transaction remote = new Transaction();
330 remote.setApp(local.getApp());
331 remote.setHandle(Long.parseLong(local.getId()));
332 return remote;
336 * Translates from RetryOptions to TaskQueueRetryParameters.
337 * Also checks ensures minBackoffSeconds and maxBackoffSeconds are ordered
338 * correctly.
340 private static void fillRetryParameters(
341 RetryOptions retryOptions,
342 TaskQueueRetryParameters retryParameters) {
343 if (retryOptions.getTaskRetryLimit() != null) {
344 retryParameters.setRetryLimit(retryOptions.getTaskRetryLimit());
346 if (retryOptions.getTaskAgeLimitSeconds() != null) {
347 retryParameters.setAgeLimitSec(retryOptions.getTaskAgeLimitSeconds());
349 if (retryOptions.getMinBackoffSeconds() != null) {
350 retryParameters.setMinBackoffSec(retryOptions.getMinBackoffSeconds());
352 if (retryOptions.getMaxBackoffSeconds() != null) {
353 retryParameters.setMaxBackoffSec(retryOptions.getMaxBackoffSeconds());
355 if (retryOptions.getMaxDoublings() != null) {
356 retryParameters.setMaxDoublings(retryOptions.getMaxDoublings());
359 if (retryParameters.hasMinBackoffSec() && retryParameters.hasMaxBackoffSec()) {
360 if (retryParameters.getMinBackoffSec() > retryParameters.getMaxBackoffSec()) {
361 throw new IllegalArgumentException(
362 "minBackoffSeconds must not be greater than maxBackoffSeconds.");
364 } else if (retryParameters.hasMinBackoffSec()) {
365 if (retryParameters.getMinBackoffSec() > retryParameters.getMaxBackoffSec()) {
366 retryParameters.setMaxBackoffSec(retryParameters.getMinBackoffSec());
368 } else if (retryParameters.hasMaxBackoffSec()) {
369 if (retryParameters.getMinBackoffSec() > retryParameters.getMaxBackoffSec()) {
370 retryParameters.setMinBackoffSec(retryParameters.getMaxBackoffSec());
376 * See {@link Queue#add(TaskOptions)}.
378 public TaskHandle add(TaskOptions taskOptions) {
379 return add(getDatastoreService().getCurrentTransaction(null), taskOptions);
383 * See {@link Queue#add(Iterable)}.
385 public List<TaskHandle> add(Iterable<TaskOptions> taskOptions) {
386 return add(getDatastoreService().getCurrentTransaction(null), taskOptions);
390 * See {@link Queue#add(com.google.appengine.api.datastore.Transaction, TaskOptions)}.
392 public TaskHandle add(com.google.appengine.api.datastore.Transaction txn,
393 TaskOptions taskOptions) {
394 return add(txn, Collections.singletonList(taskOptions)).get(0);
398 * See {@link
399 * Queue#add(com.google.appengine.api.datastore.Transaction, Iterable)}.
401 public List<TaskHandle> add(com.google.appengine.api.datastore.Transaction txn,
402 Iterable<TaskOptions> taskOptions) {
403 List<TaskOptions> taskOptionsList = new ArrayList<TaskOptions>();
404 Set<String> taskNames = new HashSet<String>();
406 TaskQueueBulkAddRequest bulkAddRequest = new TaskQueueBulkAddRequest();
407 TaskQueueBulkAddResponse bulkAddResponse = new TaskQueueBulkAddResponse();
409 boolean hasPushTask = false;
410 boolean hasPullTask = false;
411 for (TaskOptions option : taskOptions) {
412 TaskQueueAddRequest addRequest = bulkAddRequest.addAddRequest();
413 validateAndFillAddRequest(txn, option, addRequest);
414 if (addRequest.getMode() == TaskQueueMode.Mode.PULL.getValue()) {
415 hasPullTask = true;
416 } else {
417 hasPushTask = true;
420 taskOptionsList.add(option);
421 if (option.getTaskName() != null && !option.getTaskName().equals("")) {
422 if (!taskNames.add(option.getTaskName())) {
423 throw new IllegalArgumentException(
424 String.format("Identical task names in request : \"%s\" duplicated",
425 option.getTaskName()));
429 if (bulkAddRequest.addRequestSize() > QueueConstants.maxTasksPerAdd()) {
430 throw new IllegalArgumentException(
431 String.format("No more than %d tasks can be added in a single add call",
432 QueueConstants.maxTasksPerAdd()));
435 if (hasPullTask && hasPushTask) {
436 throw new IllegalArgumentException(
437 "May not add both push tasks and pull tasks in the same call.");
440 if (txn != null &&
441 bulkAddRequest.encodingSize() > QueueConstants.maxTransactionalRequestSizeBytes()) {
442 throw new IllegalArgumentException(
443 String.format("Transactional add may not be larger than %d bytes: %d bytes requested.",
444 QueueConstants.maxTransactionalRequestSizeBytes(),
445 bulkAddRequest.encodingSize()));
448 apiHelper.makeSyncCall("BulkAdd", bulkAddRequest, bulkAddResponse);
450 if (bulkAddResponse.taskResultSize() != bulkAddRequest.addRequestSize()) {
451 throw new InternalFailureException(
452 String.format("expected %d results from BulkAdd(), got %d",
453 bulkAddRequest.addRequestSize(), bulkAddResponse.taskResultSize()));
456 List<TaskHandle> tasks = new ArrayList<TaskHandle>();
457 for (int i = 0; i < bulkAddResponse.taskResultSize(); ++i) {
458 TaskQueueBulkAddResponse.TaskResult taskResult = bulkAddResponse.taskResults().get(i);
459 TaskQueueAddRequest addRequest = bulkAddRequest.getAddRequest(i);
460 TaskOptions options = taskOptionsList.get(i);
462 if (taskResult.getResult() == TaskQueueServiceError.ErrorCode.OK.getValue()) {
463 String taskName = options.getTaskName();
464 if (taskResult.hasChosenTaskName()) {
465 taskName = taskResult.getChosenTaskName();
467 TaskOptions taskResultOptions = new TaskOptions(options);
468 taskResultOptions.taskName(taskName).payload(addRequest.getBodyAsBytes());
469 TaskHandle handle = new TaskHandle(taskResultOptions, queueName);
470 tasks.add(handle.etaUsec(addRequest.getEtaUsec()));
471 } else if (taskResult.getResult() != TaskQueueServiceError.ErrorCode.SKIPPED.getValue()) {
472 throw QueueApiHelper.translateError(taskResult.getResult(), "");
475 return tasks;
478 long currentTimeMillis() {
479 return System.currentTimeMillis();
482 private long determineEta(TaskOptions taskOptions) {
483 Long etaMillis = taskOptions.getEtaMillis();
484 Long countdownMillis = taskOptions.getCountdownMillis();
485 if (etaMillis == null) {
486 if (countdownMillis == null) {
487 return currentTimeMillis();
488 } else {
489 if (countdownMillis > QueueConstants.getMaxEtaDeltaMillis()) {
490 throw new IllegalArgumentException("ETA too far into the future");
492 if (countdownMillis < 0) {
493 throw new IllegalArgumentException("Negative countdown is not allowed");
495 return currentTimeMillis() + countdownMillis;
497 } else {
498 if (countdownMillis == null) {
499 if (etaMillis - currentTimeMillis() > QueueConstants.getMaxEtaDeltaMillis()) {
500 throw new IllegalArgumentException("ETA too far into the future");
502 if (etaMillis < 0) {
503 throw new IllegalArgumentException("Negative ETA is invalid");
505 return etaMillis;
506 } else {
507 throw new IllegalArgumentException(
508 "Only one or neither of EtaMillis and CountdownMillis may be specified");
513 byte[] encodeParamsPost(List<Param> params) {
514 byte[] payload;
515 try {
516 payload = encodeParamsUrlEncoded(params).getBytes("UTF-8");
517 } catch (UnsupportedEncodingException exception) {
518 throw new UnsupportedTranslationException(exception);
521 return payload;
524 String encodeParamsUrlEncoded(List<Param> params) {
525 StringBuilder result = new StringBuilder();
526 try {
527 String appender = "";
528 for (Param param : params) {
529 result.append(appender);
530 appender = "&";
531 result.append(param.getURLEncodedName());
532 result.append("=");
533 result.append(param.getURLEncodedValue());
535 } catch (UnsupportedEncodingException exception) {
536 throw new UnsupportedTranslationException(exception);
538 return result.toString();
541 private String defaultUrl() {
542 return DEFAULT_QUEUE_PATH + "/" + queueName;
546 * See {@link Queue#getQueueName()}.
548 public String getQueueName() {
549 return queueName;
552 DatastoreService getDatastoreService() {
553 return datastoreService;
557 * See {@link Queue#purge()}.
559 public void purge() {
560 TaskQueuePurgeQueueRequest purgeRequest = new TaskQueuePurgeQueueRequest();
561 TaskQueuePurgeQueueResponse purgeResponse = new TaskQueuePurgeQueueResponse();
563 purgeRequest.setQueueName(queueName);
564 apiHelper.makeSyncCall("PurgeQueue", purgeRequest, purgeResponse);
568 * See {@link Queue#deleteTask(String)}.
570 @Override
571 public boolean deleteTask(String taskName) {
572 TaskHandle.validateTaskName(taskName);
573 return deleteTask(new TaskHandle(TaskOptions.Builder.withTaskName(taskName),
574 queueName));
578 * See {@link Queue#deleteTask(TaskHandle)}.
580 @Override
581 public boolean deleteTask(TaskHandle taskHandle) {
582 List<TaskHandle> taskHandles = new ArrayList<TaskHandle>(1);
583 taskHandles.add(taskHandle);
584 List<Boolean> result = deleteTask(taskHandles);
585 return result.get(0);
589 * See {@link Queue#deleteTask(List<TaskHandle>)}.
591 @Override
592 public List<Boolean> deleteTask(List<TaskHandle> taskHandles) {
594 TaskQueueDeleteRequest deleteRequest = new TaskQueueDeleteRequest();
595 TaskQueueDeleteResponse deleteResponse = new TaskQueueDeleteResponse();
596 deleteRequest.setQueueName(queueName);
598 for (TaskHandle taskHandle : taskHandles) {
599 if (taskHandle.getQueueName().equals(this.queueName)) {
600 deleteRequest.addTaskName(taskHandle.getName());
601 } else {
602 throw new QueueNameMismatchException(
603 String.format("The task %s is associated with the queue named %s "
604 + "and cannot be deleted from the queue named %s.",
605 taskHandle.getName(), taskHandle.getQueueName(), this.queueName));
609 apiHelper.makeSyncCall("Delete", deleteRequest, deleteResponse);
611 List<Boolean> result = new ArrayList<Boolean>(deleteResponse.resultSize());
613 for (int i = 0; i < deleteResponse.resultSize(); ++i) {
614 int errorCode = deleteResponse.getResult(i);
615 if (errorCode != TaskQueueServiceError.ErrorCode.OK.getValue() &&
616 errorCode != TaskQueueServiceError.ErrorCode.TOMBSTONED_TASK.getValue() &&
617 errorCode != TaskQueueServiceError.ErrorCode.UNKNOWN_TASK.getValue()) {
618 throw QueueApiHelper.translateError(errorCode, "");
620 result.add(errorCode == TaskQueueServiceError.ErrorCode.OK.getValue());
623 return result;
626 private List<TaskHandle> leaseTasksInternal(LeaseOptions options) {
627 long leaseMillis = options.getUnit().toMillis(options.getLease());
628 if (leaseMillis > QueueConstants.maxLease(TimeUnit.MILLISECONDS)) {
629 throw new IllegalArgumentException(
630 String.format("A lease period can be no longer than %d seconds",
631 QueueConstants.maxLease(TimeUnit.SECONDS)));
634 if (options.getCountLimit() > QueueConstants.maxLeaseCount()) {
635 throw new IllegalArgumentException(
636 String.format("No more than %d tasks can be leased in one call",
637 QueueConstants.maxLeaseCount()));
640 TaskQueueQueryAndOwnTasksRequest leaseRequest = new TaskQueueQueryAndOwnTasksRequest();
641 TaskQueueQueryAndOwnTasksResponse leaseResponse = new TaskQueueQueryAndOwnTasksResponse();
643 leaseRequest.setQueueName(queueName);
644 leaseRequest.setLeaseSeconds(leaseMillis / 1000.0);
645 leaseRequest.setMaxTasks(options.getCountLimit());
646 if (options.getGroupByTag()) {
647 leaseRequest.setGroupByTag(true);
648 if (options.getTag() != null) {
649 leaseRequest.setTagAsBytes(options.getTag());
653 ApiConfig apiConfig = new ApiConfig();
654 if (options.getDeadlineInSeconds() == null) {
655 apiConfig.setDeadlineInSeconds(DEFAULT_LEASE_TASKS_DEADLINE_SECONDS);
656 } else {
657 apiConfig.setDeadlineInSeconds(options.getDeadlineInSeconds());
659 apiHelper.makeSyncCall("QueryAndOwnTasks", leaseRequest, leaseResponse, apiConfig);
661 List<TaskHandle> result = new ArrayList<TaskHandle>();
662 for (TaskQueueQueryAndOwnTasksResponse.Task response : leaseResponse.tasks()) {
663 TaskOptions taskOptions = TaskOptions.Builder.withTaskName(response.getTaskName())
664 .payload(response.getBodyAsBytes())
665 .method(TaskOptions.Method.PULL);
666 if (response.hasTag()) {
667 taskOptions.tag(response.getTagAsBytes());
669 TaskHandle handle = new TaskHandle(taskOptions, queueName, response.getRetryCount());
670 result.add(handle.etaUsec(response.getEtaUsec()));
673 return result;
677 * See {@link Queue#leaseTasks(long, TimeUnit, long)}.
679 @Override
680 public List<TaskHandle> leaseTasks(long lease, TimeUnit unit, long countLimit) {
681 return leaseTasksInternal(LeaseOptions.Builder.withLeasePeriod(lease, unit)
682 .countLimit(countLimit));
686 * See {@link Queue#leaseTasksByTagBytes(long, TimeUnit, long, byte[])}.
688 @Override
689 public List<TaskHandle> leaseTasksByTagBytes(
690 long lease, TimeUnit unit, long countLimit, byte[] tag) {
691 LeaseOptions options = LeaseOptions.Builder.withLeasePeriod(lease, unit)
692 .countLimit(countLimit);
693 if (tag != null) {
694 options.tag(tag);
695 } else {
696 options.groupByTag();
698 return leaseTasksInternal(options);
702 * See {@link Queue#leaseTasksByTag(long, TimeUnit, long, String)}.
704 @Override
705 public List<TaskHandle> leaseTasksByTag(long lease, TimeUnit unit,
706 long countLimit, String tag) {
707 LeaseOptions options = LeaseOptions.Builder.withLeasePeriod(lease, unit)
708 .countLimit(countLimit);
709 if (tag != null) {
710 options.tag(tag);
711 } else {
712 options.groupByTag();
714 return leaseTasksInternal(options);
718 * See {@link Queue#leaseTasks(LeaseOptions)}.
720 @Override
721 public List<TaskHandle> leaseTasks(LeaseOptions options) {
722 if (options.getLease() == null) {
723 throw new IllegalArgumentException("The lease period must be specified");
725 if (options.getCountLimit() == null) {
726 throw new IllegalArgumentException("The count limit must be specified");
728 return leaseTasksInternal(options);
732 * See {@link Queue#modifyTaskLease(TaskHandle, long, TimeUnit)}.
734 @Override
735 public TaskHandle modifyTaskLease(TaskHandle taskHandle, long lease, TimeUnit unit) {
736 long leaseMillis = unit.toMillis(lease);
737 if (leaseMillis > QueueConstants.maxLease(TimeUnit.MILLISECONDS)) {
738 throw new IllegalArgumentException(
739 String.format("The lease time specified (%s seconds) is too large. " +
740 "Lease period can be no longer than %d seconds.",
741 formatLeaseTimeInSeconds(leaseMillis),
742 QueueConstants.maxLease(TimeUnit.SECONDS)));
744 if (leaseMillis < 0) {
745 throw new IllegalArgumentException(
746 String.format("The lease time must not be negative. " +
747 "Specified lease time was %s seconds.",
748 formatLeaseTimeInSeconds(leaseMillis)));
751 TaskQueueModifyTaskLeaseRequest request = new TaskQueueModifyTaskLeaseRequest();
752 TaskQueueModifyTaskLeaseResponse response = new TaskQueueModifyTaskLeaseResponse();
754 request.setQueueName(this.queueName);
755 request.setTaskName(taskHandle.getName());
756 request.setLeaseSeconds(leaseMillis / 1000.0);
757 request.setEtaUsec(taskHandle.getEtaUsec());
759 apiHelper.makeSyncCall("ModifyTaskLease", request, response);
760 taskHandle.etaUsec(response.getUpdatedEtaUsec());
761 return taskHandle;
764 private String formatLeaseTimeInSeconds(long milliSeconds) {
765 long seconds = TimeUnit.SECONDS.convert(milliSeconds, TimeUnit.MILLISECONDS);
766 long remainder = milliSeconds - TimeUnit.MILLISECONDS.convert(seconds, TimeUnit.SECONDS);
767 String formatString = milliSeconds < 0 ? "-%01d.%03d" : "%01d.%03d";
768 return String.format(formatString, Math.abs(seconds), Math.abs(remainder));
772 * See {@link Queue#fetchStatistics()}.
774 @Override
775 public QueueStatistics fetchStatistics() {
776 List<Queue> queues = Collections.<Queue>singletonList(this);
777 List<QueueStatistics> stats = QueueStatistics.fetchForQueues(queues, apiHelper);
778 if (stats.size() != 1) {
779 throw new IllegalStateException(
780 "An internal error occurred while obtaining queue statistics");
782 return stats.get(0);