# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

A library for working with QueueInfo records, describing task queue entries
for an application. Supports loading the records from queue.yaml.

A queue has two required parameters and various optional ones. The required
parameters are 'name' (must be unique for an appid) and 'rate' (the rate
at which jobs in the queue are run). There is an optional parameter
'bucket_size' that will allow tokens to be 'saved up' (for more on the
algorithm, see http://en.wikipedia.org/wiki/Token_Bucket). rate is expressed
as number/unit, with number being an int or a float, and unit being one of
's' (seconds), 'm' (minutes), 'h' (hours) or 'd' (days). bucket_size is
an integer.

An example of the use of bucket_size with rate: the free email quota is 2000/d,
and the maximum you can send in a single minute is 11. So we can define a
queue for sending email like this:

If this queue had been idle for a while before some jobs were submitted to it,
the first 10 jobs submitted would be run immediately, then subsequent ones
would be run once every 43s or so (86400s / 2000). The limit of 2000 per day
would still apply.
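
The burst-then-steady behaviour described above can be sketched as follows.
This is a minimal illustration only, not how App Engine schedules tasks: the
helper names rate_per_second and run_times are hypothetical, and the sketch
assumes all jobs are submitted at t=0 to an idle queue whose bucket is full.

```python
# Hypothetical helpers for illustration; not part of this module.
SECONDS_PER_UNIT = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}


def rate_per_second(rate):
    # Convert a 'number/unit' string such as '2000/d' to tasks per second.
    number, unit = rate.split('/')
    return float(number) / SECONDS_PER_UNIT[unit]


def run_times(rate, bucket_size, n_jobs):
    # Times (in seconds) at which n_jobs would be run, assuming they are all
    # submitted at t=0 and the bucket starts full.
    per_second = rate_per_second(rate)
    tokens = float(bucket_size)
    now = 0.0
    times = []
    for _ in range(n_jobs):
        if tokens >= 1:
            tokens -= 1  # spend a saved-up token and run immediately
        else:
            # Wait until one whole token has accrued, then spend it.
            now += (1 - tokens) / per_second
            tokens = 0.0
        times.append(now)
    return times
```

With rate '2000/d' and a bucket_size of 10, the first ten entries of
run_times('2000/d', 10, 12) are 0.0 and the later jobs are spaced about 43.2
seconds apart.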

Another optional parameter is 'max_concurrent_requests', which pertains to the
requests being made by the queue. It specifies the maximum number of requests
that may be in-flight at any one time. An example:

  max_concurrent_requests: 5
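
As a rough analogy for what an in-flight limit means, the sketch below bounds
concurrency with a semaphore. It is an assumption-laden illustration using
local threads (run_with_limit is a hypothetical name); it says nothing about
how App Engine itself enforces max_concurrent_requests.

```python
import threading


def run_with_limit(tasks, max_concurrent_requests):
    # Run each callable on its own thread, letting at most
    # max_concurrent_requests of them execute at the same time.
    # Returns the peak number of tasks observed in flight.
    slots = threading.Semaphore(max_concurrent_requests)
    lock = threading.Lock()
    state = {'in_flight': 0, 'peak': 0}

    def worker(task):
        with slots:  # blocks while the limit is reached
            with lock:
                state['in_flight'] += 1
                state['peak'] = max(state['peak'], state['in_flight'])
            try:
                task()
            finally:
                with lock:
                    state['in_flight'] -= 1

    threads = [threading.Thread(target=worker, args=(t,)) for t in tasks]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state['peak']
```

Calling run_with_limit with twenty tasks and a limit of 5 never reports a peak
above 5, whatever order the threads happen to run in.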

Each queue has an optional 'mode' parameter with legal values 'push' and 'pull'.
If mode is not specified, it defaults to 'push'. Tasks in queues with mode
'push' are invoked (pushed) at the specified rate. Tasks in queues with mode
'pull' are not directly invoked by App Engine. These tasks are leased for a
period by client code, and deleted by client code when the task's work is
finished. If not deleted before the expiry of the lease, the tasks are available

Each queue has an optional 'target' parameter. If specified, all tasks inserted
into the queue will be executed on the specified alternate version/server

A queue may also optionally specify retry_parameters.

    min_backoff_seconds: 0.1
    max_backoff_seconds: 3600

Each task in the queue that fails during execution will be retried using these
parameters. All these fields are optional.

  task_retry_limit: A non-negative integer. Tasks will be retried a maximum of
    task_retry_limit times before failing permanently. If task_age_limit is also
    specified, both task_retry_limit and task_age_limit must be exceeded before
    a task fails permanently.

  task_age_limit: A non-negative floating point number followed by a suffix s
    (seconds), m (minutes), h (hours) or d (days). If the time since a task was
    first tried exceeds task_age_limit, it will fail permanently. If
    task_retry_limit is also specified, both task_retry_limit and task_age_limit
    must be exceeded before a task fails permanently.

  min_backoff_seconds: A non-negative floating point number. This is the minimum
    interval between the first failure and the first retry of a task. If
    max_backoff_seconds is also specified, min_backoff_seconds must not be
    greater than max_backoff_seconds.

  max_backoff_seconds: A non-negative floating point number. This is the maximum
    allowed interval between successive retries of a failed task. If
    min_backoff_seconds is also specified, min_backoff_seconds must not be
    greater than max_backoff_seconds.

  max_doublings: A non-negative integer. On successive failures, the retry
    backoff interval will be successively doubled up to max_doublings times,
    starting at min_backoff_seconds and not exceeding max_backoff_seconds. For
    retries after max_doublings, the retry backoff will increase by the value of
    the backoff when doubling ceased. E.g. for min_backoff_seconds of 1 and
    max_doublings of 5, we have successive retry backoffs of
    1, 2, 4, 8, 16, 32, 64, 96, 128, ... not exceeding max_backoff_seconds.
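
The backoff rule and the "both limits must be exceeded" rule described above
can be sketched as below. This is an illustration of the documented behaviour,
not the service's implementation; backoff_schedule and fails_permanently are
hypothetical names, and treating "retries >= task_retry_limit" as "exceeded" is
an assumption about the boundary case.

```python
def backoff_schedule(min_backoff, max_backoff, max_doublings, count):
    # Return the first `count` retry intervals: double the interval up to
    # max_doublings times, then grow linearly by the last doubled value,
    # never exceeding max_backoff.
    schedule = []
    for n in range(count):
        if n <= max_doublings:
            interval = min_backoff * 2 ** n
        else:
            step = min_backoff * 2 ** max_doublings
            interval = step * (n - max_doublings + 1)
        schedule.append(min(interval, max_backoff))
    return schedule


def fails_permanently(retries, age_sec, retry_limit=None, age_limit_sec=None):
    # A task fails permanently only when every limit that is specified has
    # been exceeded; with no limits specified it is retried indefinitely.
    exceeded = []
    if retry_limit is not None:
        exceeded.append(retries >= retry_limit)  # boundary is an assumption
    if age_limit_sec is not None:
        exceeded.append(age_sec > age_limit_sec)
    return bool(exceeded) and all(exceeded)
```

For min_backoff_seconds of 1, max_doublings of 5 and max_backoff_seconds of
3600, backoff_schedule(1, 3600, 5, 9) reproduces the sequence quoted above:
[1, 2, 4, 8, 16, 32, 64, 96, 128].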

A queue may optionally specify an acl (Access Control List).

  - user_email: a@foo.com
  - writer_email: b@gmail.com

Each email must correspond to an account hosted by Google. The acl is
enforced for queue access from outside AppEngine.

An app's queues are also subject to storage quota limits for their stored tasks,
i.e. those tasks that have been added to queues but not yet executed. This quota
is part of their total storage quota (including datastore and blobstore quota).
We allow an app to override the default portion of this quota available for
taskqueue storage (100M) with a top level field "total_storage_limit".

total_storage_limit: 1.2G

If no suffix is specified, the number is interpreted as bytes. Supported
suffixes are B (bytes), K (kilobytes), M (megabytes), G (gigabytes) and
T (terabytes). If total_storage_limit exceeds the total disk storage
available to an app, it is clamped.
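
A minimal sketch of how such a limit string maps to bytes, assuming each
suffix step is a factor of 1024. storage_limit_bytes is a hypothetical helper
written for this illustration, and it omits the error reporting a real parser
would need.

```python
BYTE_SUFFIXES = 'BKMGT'


def storage_limit_bytes(limit):
    # Convert a string such as '1.2G' or '4096' to a whole number of bytes.
    limit = limit.strip()
    if not limit:
        raise ValueError('limit must not be empty')
    if limit[-1] in BYTE_SUFFIXES:
        # B -> 1024**0, K -> 1024**1, M -> 1024**2, and so on.
        exponent = BYTE_SUFFIXES.index(limit[-1])
        return int(float(limit[:-1]) * 1024 ** exponent)
    return int(limit)  # no suffix: the number is already in bytes
```

Under these assumptions the default of '100M' is 104857600 bytes and '1.2G'
is 1288490188 bytes (the fractional byte is truncated).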

from google.appengine.api import appinfo
from google.appengine.api import validation
from google.appengine.api import yaml_builder
from google.appengine.api import yaml_listener
from google.appengine.api import yaml_object
from google.appengine.api.taskqueue import taskqueue_service_pb

_NAME_REGEX = r'^[A-Za-z0-9-]{0,499}$'
_RATE_REGEX = r'^(0|[0-9]+(\.[0-9]*)?/[smhd])'
_TOTAL_STORAGE_LIMIT_REGEX = r'^([0-9]+(\.[0-9]*)?[BKMGT]?)'
_MODE_REGEX = r'(pull)|(push)'

MODULE_ID_RE_STRING = r'(?!-)[a-z\d\-]{1,63}'

MODULE_VERSION_RE_STRING = r'(?!-)[a-z\d\-]{1,100}'
_VERSION_REGEX = r'^(?:(?:(%s):)?)(%s)$' % (MODULE_ID_RE_STRING,
                                            MODULE_VERSION_RE_STRING)

BUCKET_SIZE = 'bucket_size'
MAX_CONCURRENT_REQUESTS = 'max_concurrent_requests'
TOTAL_STORAGE_LIMIT = 'total_storage_limit'

BYTE_SUFFIXES = 'BKMGT'

RETRY_PARAMETERS = 'retry_parameters'
TASK_RETRY_LIMIT = 'task_retry_limit'
TASK_AGE_LIMIT = 'task_age_limit'
MIN_BACKOFF_SECONDS = 'min_backoff_seconds'
MAX_BACKOFF_SECONDS = 'max_backoff_seconds'
MAX_DOUBLINGS = 'max_doublings'

USER_EMAIL = 'user_email'
WRITER_EMAIL = 'writer_email'


class MalformedQueueConfiguration(Exception):
  """Configuration file for Task Queue is malformed."""


class RetryParameters(validation.Validated):
  """Retry parameters for a single task queue."""
  ATTRIBUTES = {
      TASK_RETRY_LIMIT: validation.Optional(validation.TYPE_INT),
      TASK_AGE_LIMIT: validation.Optional(validation.TimeValue()),
      MIN_BACKOFF_SECONDS: validation.Optional(validation.TYPE_FLOAT),
      MAX_BACKOFF_SECONDS: validation.Optional(validation.TYPE_FLOAT),
      MAX_DOUBLINGS: validation.Optional(validation.TYPE_INT),
  }


class Acl(validation.Validated):
  """Access control list for a single task queue."""
  ATTRIBUTES = {
      USER_EMAIL: validation.Optional(validation.TYPE_STR),
      WRITER_EMAIL: validation.Optional(validation.TYPE_STR),
  }


class QueueEntry(validation.Validated):
  """A queue entry describes a single task queue."""
  ATTRIBUTES = {
      RATE: validation.Optional(_RATE_REGEX),
      MODE: validation.Optional(_MODE_REGEX),
      BUCKET_SIZE: validation.Optional(validation.TYPE_INT),
      MAX_CONCURRENT_REQUESTS: validation.Optional(validation.TYPE_INT),
      RETRY_PARAMETERS: validation.Optional(RetryParameters),
      ACL: validation.Optional(validation.Repeated(Acl)),
      TARGET: validation.Optional(_VERSION_REGEX),
  }


class QueueInfoExternal(validation.Validated):
  """QueueInfoExternal describes all queue entries for an application."""
  ATTRIBUTES = {
      appinfo.APPLICATION: validation.Optional(appinfo.APPLICATION_RE_STRING),
      TOTAL_STORAGE_LIMIT: validation.Optional(_TOTAL_STORAGE_LIMIT_REGEX),
      QUEUE: validation.Optional(validation.Repeated(QueueEntry)),
  }


def LoadSingleQueue(queue_info, open_fn=None):
  """Load a queue.yaml file or string and return a QueueInfoExternal object.

  Args:
    queue_info: the contents of a queue.yaml file, as a string.
    open_fn: Function for opening files. Unused.

  Returns:
    A QueueInfoExternal object.
  """
  builder = yaml_object.ObjectBuilder(QueueInfoExternal)
  handler = yaml_builder.BuilderHandler(builder)
  listener = yaml_listener.EventListener(handler)
  listener.Parse(queue_info)

  # The handler collects one result per YAML document; exactly one is allowed.
  queue_info = handler.GetResults()
  if len(queue_info) < 1:
    raise MalformedQueueConfiguration('Empty queue configuration.')
  if len(queue_info) > 1:
    raise MalformedQueueConfiguration('Multiple queue: sections '


def ParseRate(rate):
  """Parses a rate string in the form number/unit, or the literal 0.

  The unit is one of s (seconds), m (minutes), h (hours) or d (days).

  Args:
    rate: the rate string.

  Returns:
    a floating point number representing the rate/second.

  Raises:
    MalformedQueueConfiguration: if the rate is invalid
  """
  if rate == '0':
    return 0.0
  elements = rate.split('/')
  if len(elements) != 2:
    raise MalformedQueueConfiguration('Rate "%s" is invalid.' % rate)
  number, unit = elements
  try:
    number = float(number)
  except ValueError:
    raise MalformedQueueConfiguration('Rate "%s" is invalid:'
                                      ' "%s" is not a number.' %
                                      (rate, number))
  if unit not in 'smhd':
    raise MalformedQueueConfiguration('Rate "%s" is invalid:'
                                      ' "%s" is not one of s, m, h, d.' %
                                      (rate, unit))
  # Normalize to a per-second rate.
  if unit == 's':
    return number
  if unit == 'm':
    return number/60
  if unit == 'h':
    return number/(60 * 60)
  if unit == 'd':
    return number/(24 * 60 * 60)


def ParseTotalStorageLimit(limit):
  """Parses a string representing the storage bytes limit.

  Optional limit suffixes are:
      B (bytes), K (kilobytes), M (megabytes), G (gigabytes), T (terabytes)

  Args:
    limit: The storage bytes limit string.

  Returns:
    An int representing the storage limit in bytes.

  Raises:
    MalformedQueueConfiguration: if the limit argument isn't a valid python
    double followed by an optional suffix.
  """
  limit = limit.strip()
  if not limit:
    raise MalformedQueueConfiguration('Total Storage Limit must not be empty.')
  try:
    if limit[-1] in BYTE_SUFFIXES:
      number = float(limit[0:-1])
      # Multiply by 1024 once for each suffix that precedes the one given.
      for c in BYTE_SUFFIXES:
        if limit[-1] != c:
          number = number * 1024
        else:
          return int(number)
    else:
      # No suffix: the value is already in bytes.
      return int(limit)
  except ValueError:
    raise MalformedQueueConfiguration('Total Storage Limit "%s" is invalid.' %
                                      limit)


def ParseTaskAgeLimit(age_limit):
  """Parses a string representing the task's age limit (maximum allowed age).

  The string must be a non-negative integer or floating point number followed by
  one of s, m, h, or d (seconds, minutes, hours or days respectively).

  Args:
    age_limit: The task age limit string.

  Returns:
    An int representing the age limit in seconds.

  Raises:
    MalformedQueueConfiguration: if the limit argument isn't a valid python
    double followed by a required suffix.
  """
  age_limit = age_limit.strip()
  if not age_limit:
    raise MalformedQueueConfiguration('Task Age Limit must not be empty.')
  unit = age_limit[-1]
  if unit not in "smhd":
    raise MalformedQueueConfiguration('Task Age_Limit must be in s (seconds), '
                                      'm (minutes), h (hours) or d (days)')
  try:
    number = float(age_limit[0:-1])
    if unit == 's':
      return int(number)
    if unit == 'm':
      return int(number * 60)
    if unit == 'h':
      return int(number * 3600)
    if unit == 'd':
      return int(number * 86400)
  except ValueError:
    raise MalformedQueueConfiguration('Task Age_Limit "%s" is invalid.' %
                                      age_limit)


def TranslateRetryParameters(retry):
  """Populates a TaskQueueRetryParameters from a queueinfo.RetryParameters.

  Args:
    retry: A queueinfo.RetryParameters read from queue.yaml that describes the
        queue's retry parameters.

  Returns:
    A taskqueue_service_pb.TaskQueueRetryParameters proto populated with the
    values from retry.

  Raises:
    MalformedQueueConfiguration: if the retry parameters are invalid.
  """
  params = taskqueue_service_pb.TaskQueueRetryParameters()
  if retry.task_retry_limit is not None:
    params.set_retry_limit(int(retry.task_retry_limit))
  if retry.task_age_limit is not None:
    # task_age_limit is a string such as '2d'; store it as whole seconds.
    params.set_age_limit_sec(ParseTaskAgeLimit(retry.task_age_limit))
  if retry.min_backoff_seconds is not None:
    params.set_min_backoff_sec(float(retry.min_backoff_seconds))
  if retry.max_backoff_seconds is not None:
    params.set_max_backoff_sec(float(retry.max_backoff_seconds))
  if retry.max_doublings is not None:
    params.set_max_doublings(int(retry.max_doublings))

  # When only one backoff bound is given, the other still reads as its proto
  # default. Move the unset bound if needed so that min <= max holds.
  if params.has_min_backoff_sec() and not params.has_max_backoff_sec():
    if params.min_backoff_sec() > params.max_backoff_sec():
      params.set_max_backoff_sec(params.min_backoff_sec())

  if not params.has_min_backoff_sec() and params.has_max_backoff_sec():
    if params.min_backoff_sec() > params.max_backoff_sec():
      params.set_min_backoff_sec(params.max_backoff_sec())

  # Validate the values that were explicitly set.
  if params.has_retry_limit() and params.retry_limit() < 0:
    raise MalformedQueueConfiguration(
        'Task retry limit must not be less than zero.')

  if params.has_age_limit_sec() and not params.age_limit_sec() > 0:
    raise MalformedQueueConfiguration(
        'Task age limit must be greater than zero.')

  if params.has_min_backoff_sec() and params.min_backoff_sec() < 0:
    raise MalformedQueueConfiguration(
        'Min backoff seconds must not be less than zero.')

  if params.has_max_backoff_sec() and params.max_backoff_sec() < 0:
    raise MalformedQueueConfiguration(
        'Max backoff seconds must not be less than zero.')

  if params.has_max_doublings() and params.max_doublings() < 0:
    raise MalformedQueueConfiguration(
        'Max doublings must not be less than zero.')

  if (params.has_min_backoff_sec() and params.has_max_backoff_sec() and
      params.min_backoff_sec() > params.max_backoff_sec()):
    raise MalformedQueueConfiguration(
        'Min backoff sec must not be greater than max backoff sec.')