1.9.30 sync.
[gae.git] / python / google / appengine / api / queueinfo.py
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""QueueInfo tools.

A library for working with QueueInfo records, describing task queue entries
for an application. Supports loading the records from queue.yaml.

A queue has two required parameters and various optional ones. The required
parameters are 'name' (must be unique for an appid) and 'rate' (the rate
at which jobs in the queue are run). There is an optional parameter
'bucket_size' that will allow tokens to be 'saved up' (for more on the
algorithm, see http://en.wikipedia.org/wiki/Token_Bucket). rate is expressed
as number/unit, with number being an int or a float, and unit being one of
's' (seconds), 'm' (minutes), 'h' (hours) or 'd' (days). bucket_size is
an integer.

An example of using bucket_size together with rate: the free email quota is
2000/d, and the maximum you can send in a single minute is 11. So we can
define a queue for sending email like this:

queue:
- name: mail-queue
  rate: 2000/d
  bucket_size: 10

If this queue had been idle for a while before some jobs were submitted to it,
the first 10 jobs submitted would be run immediately, then subsequent ones
would be run once every 40s or so. The limit of 2000 per day would still apply.
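
The arithmetic behind that example can be sketched as follows. This is only
an illustration under simplifying assumptions (all tasks are submitted at
t=0 and the bucket is full after the idle period); simulate_token_bucket and
its parameters are hypothetical names, not part of this module, and the real
scheduler may behave differently.

```python
def simulate_token_bucket(rate_per_sec, bucket_size, num_tasks):
  # Returns hypothetical start times, in seconds, for tasks submitted at t=0.
  # Assumes the bucket is full when the tasks arrive.
  interval = 1.0 / rate_per_sec  # Seconds needed to earn one token.
  start_times = []
  for i in range(num_tasks):
    if i < bucket_size:
      # Saved-up tokens let these tasks run immediately.
      start_times.append(0.0)
    else:
      # Each later task waits for one freshly earned token.
      start_times.append((i - bucket_size + 1) * interval)
  return start_times


# 2000/d expressed per second, as in the mail-queue example.
times = simulate_token_bucket(2000.0 / 86400, 10, 12)
```

With these inputs the first ten start times are 0 and the following tasks are
spaced roughly 43 seconds apart (86400 / 2000), which is the "40s or so"
mentioned above.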

Another optional parameter is 'max_concurrent_requests', which pertains to the
requests being made by the queue. It specifies the maximum number of requests
that may be in-flight at any one time. An example:

queue:
- name: server-queue
  rate: 50/s
  max_concurrent_requests: 5

Each queue has an optional 'mode' parameter with legal values 'push' and 'pull'.
If mode is not specified, it defaults to 'push'. Tasks in queues with mode
'push' are invoked (pushed) at the specified rate. Tasks in queues with mode
'pull' are not directly invoked by App Engine. These tasks are leased for a
period by client code, and deleted by client code when the task's work is
finished. If not deleted before the expiry of the lease, the tasks are available
for lease again.

Each queue has an optional 'target' parameter. If specified, all tasks inserted
into the queue will be executed on the specified alternate version/server
instance.

A queue may also optionally specify retry_parameters.

  retry_parameters:
    task_retry_limit: 100
    task_age_limit: 1d
    min_backoff_seconds: 0.1
    max_backoff_seconds: 3600
    max_doublings: 10

Each task in the queue that fails during execution will be retried using these
parameters. All these fields are optional.

task_retry_limit: A non-negative integer. Tasks will be retried a maximum of
  task_retry_limit times before failing permanently. If task_age_limit is also
  specified, both task_retry_limit and task_age_limit must be exceeded before a
  task fails permanently.

task_age_limit: A non-negative floating point number followed by a suffix s
  (seconds), m (minutes), h (hours) or d (days). If the time since a task was
  first tried exceeds task_age_limit, it will fail permanently. If
  task_retry_limit is also specified, both task_retry_limit and task_age_limit
  must be exceeded before a task fails permanently.

min_backoff_seconds: A non-negative floating point number. This is the minimum
  interval between the first failure and the first retry of a task. If
  max_backoff_seconds is also specified, min_backoff_seconds must not be greater
  than max_backoff_seconds.

max_backoff_seconds: A non-negative floating point number. This is the maximum
  allowed interval between successive retries of a failed task. If
  min_backoff_seconds is also specified, min_backoff_seconds must not be greater
  than max_backoff_seconds.

max_doublings: A non-negative integer. On successive failures, the retry backoff
  interval will be successively doubled up to max_doublings times, starting at
  min_backoff_seconds and not exceeding max_backoff_seconds. For retries after
  max_doublings, the retry backoff will increase by the value of the backoff
  when doubling ceased. E.g. for min_backoff_seconds of 1 and max_doublings
  of 5, we have successive retry backoffs of 1, 2, 4, 8, 16, 32, 64, 96,
  128, ... not exceeding max_backoff_seconds.
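
The backoff progression described for max_doublings can be reproduced with a
short sketch. The helper backoff_schedule below is a hypothetical name used
only for illustration; it follows the description in this docstring and is
not the scheduler's actual implementation.

```python
def backoff_schedule(min_backoff, max_backoff, max_doublings, retries):
  # Returns the first `retries` backoff intervals, in seconds.
  # The interval reached once doubling stops; later retries grow by this step.
  step = min_backoff * 2 ** max_doublings
  schedule = []
  for attempt in range(retries):
    if attempt <= max_doublings:
      # Doubling phase: min_backoff, 2x, 4x, ...
      backoff = min_backoff * 2 ** attempt
    else:
      # Linear phase: keep adding the last doubled interval.
      backoff = step * (attempt - max_doublings + 1)
    # Never exceed the configured maximum.
    schedule.append(min(backoff, max_backoff))
  return schedule


schedule = backoff_schedule(1, 3600, 5, 9)
# schedule == [1, 2, 4, 8, 16, 32, 64, 96, 128]
```

Lowering max_backoff to 50 in the same call would cap the tail of the
sequence at 50 instead.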

A queue may optionally specify an acl (Access Control List).

  acl:
  - user_email: a@foo.com
  - writer_email: b@gmail.com

Each email must correspond to an account hosted by Google. The acl is
enforced for queue access from outside App Engine.

An app's queues are also subject to storage quota limits for their stored tasks,
i.e. those tasks that have been added to queues but not yet executed. This quota
is part of their total storage quota (including datastore and blobstore quota).
We allow an app to override the default portion of this quota available for
taskqueue storage (100M) with a top level field "total_storage_limit".

  total_storage_limit: 1.2G

If no suffix is specified, the number is interpreted as bytes. Supported
suffixes are B (bytes), K (kilobytes), M (megabytes), G (gigabytes) and
T (terabytes). If total_storage_limit exceeds the total disk storage
available to an app, it is clamped.
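
As a rough sketch of how such a limit string maps to a byte count, assuming
each suffix step is a factor of 1024 (the factor ParseTotalStorageLimit in
this module uses): storage_limit_to_bytes and SUFFIXES are hypothetical names
for illustration only, and clamping to the app's available storage is not
modelled.

```python
SUFFIXES = 'BKMGT'  # Same order as the suffixes listed above.


def storage_limit_to_bytes(limit):
  # Converts strings such as '100M' or '512' to a number of bytes.
  limit = limit.strip()
  if limit and limit[-1] in SUFFIXES:
    # B -> 1024**0, K -> 1024**1, M -> 1024**2, and so on.
    exponent = SUFFIXES.index(limit[-1])
    return int(float(limit[:-1]) * 1024 ** exponent)
  # No suffix: the value is already a plain byte count.
  return int(limit)


default_limit = storage_limit_to_bytes('100M')
# default_limit == 104857600
```

An empty or non-numeric string raises ValueError in this sketch, whereas the
module's own parser reports a MalformedQueueConfiguration.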
"""

from google.appengine.api import appinfo
from google.appengine.api import validation
from google.appengine.api import yaml_builder
from google.appengine.api import yaml_listener
from google.appengine.api import yaml_object
from google.appengine.api.taskqueue import taskqueue_service_pb

_NAME_REGEX = r'^[A-Za-z0-9-]{0,499}$'
_RATE_REGEX = r'^(0|[0-9]+(\.[0-9]*)?/[smhd])'
_TOTAL_STORAGE_LIMIT_REGEX = r'^([0-9]+(\.[0-9]*)?[BKMGT]?)'
_MODE_REGEX = r'(pull)|(push)'

MODULE_ID_RE_STRING = r'(?!-)[a-z\d\-]{1,63}'

MODULE_VERSION_RE_STRING = r'(?!-)[a-z\d\-]{1,100}'
_VERSION_REGEX = r'^(?:(?:(%s):)?)(%s)$' % (MODULE_ID_RE_STRING,
                                            MODULE_VERSION_RE_STRING)

QUEUE = 'queue'

NAME = 'name'
RATE = 'rate'
BUCKET_SIZE = 'bucket_size'
MODE = 'mode'
TARGET = 'target'
MAX_CONCURRENT_REQUESTS = 'max_concurrent_requests'
TOTAL_STORAGE_LIMIT = 'total_storage_limit'

BYTE_SUFFIXES = 'BKMGT'

RETRY_PARAMETERS = 'retry_parameters'
TASK_RETRY_LIMIT = 'task_retry_limit'
TASK_AGE_LIMIT = 'task_age_limit'
MIN_BACKOFF_SECONDS = 'min_backoff_seconds'
MAX_BACKOFF_SECONDS = 'max_backoff_seconds'
MAX_DOUBLINGS = 'max_doublings'

ACL = 'acl'
USER_EMAIL = 'user_email'
WRITER_EMAIL = 'writer_email'


class MalformedQueueConfiguration(Exception):
  """Configuration file for Task Queue is malformed."""


class RetryParameters(validation.Validated):
  """Retry parameters for a single task queue."""
  ATTRIBUTES = {
      TASK_RETRY_LIMIT: validation.Optional(validation.TYPE_INT),
      TASK_AGE_LIMIT: validation.Optional(validation.TimeValue()),
      MIN_BACKOFF_SECONDS: validation.Optional(validation.TYPE_FLOAT),
      MAX_BACKOFF_SECONDS: validation.Optional(validation.TYPE_FLOAT),
      MAX_DOUBLINGS: validation.Optional(validation.TYPE_INT),
  }


class Acl(validation.Validated):
  """Access control list for a single task queue."""
  ATTRIBUTES = {
      USER_EMAIL: validation.Optional(validation.TYPE_STR),
      WRITER_EMAIL: validation.Optional(validation.TYPE_STR),
  }


class QueueEntry(validation.Validated):
  """A queue entry describes a single task queue."""
  ATTRIBUTES = {
      NAME: _NAME_REGEX,
      RATE: validation.Optional(_RATE_REGEX),
      MODE: validation.Optional(_MODE_REGEX),
      BUCKET_SIZE: validation.Optional(validation.TYPE_INT),
      MAX_CONCURRENT_REQUESTS: validation.Optional(validation.TYPE_INT),
      RETRY_PARAMETERS: validation.Optional(RetryParameters),
      ACL: validation.Optional(validation.Repeated(Acl)),
      TARGET: validation.Optional(_VERSION_REGEX),
  }


class QueueInfoExternal(validation.Validated):
  """QueueInfoExternal describes all queue entries for an application."""
  ATTRIBUTES = {
      appinfo.APPLICATION: validation.Optional(appinfo.APPLICATION_RE_STRING),
      TOTAL_STORAGE_LIMIT: validation.Optional(_TOTAL_STORAGE_LIMIT_REGEX),
      QUEUE: validation.Optional(validation.Repeated(QueueEntry)),
  }


def LoadSingleQueue(queue_info, open_fn=None):
  """Load a queue.yaml file or string and return a QueueInfoExternal object.

  Args:
    queue_info: the contents of a queue.yaml file, as a string.
    open_fn: Function for opening files. Unused.

  Returns:
    A QueueInfoExternal object.
  """
  builder = yaml_object.ObjectBuilder(QueueInfoExternal)
  handler = yaml_builder.BuilderHandler(builder)
  listener = yaml_listener.EventListener(handler)
  listener.Parse(queue_info)

  # The parsed YAML must contain exactly one top-level document.
  queue_info = handler.GetResults()
  if len(queue_info) < 1:
    raise MalformedQueueConfiguration('Empty queue configuration.')
  if len(queue_info) > 1:
    raise MalformedQueueConfiguration('Multiple queue: sections '
                                      'in configuration.')
  return queue_info[0]


def ParseRate(rate):
  """Parses a rate string in the form number/unit, or the literal 0.

  The unit is one of s (seconds), m (minutes), h (hours) or d (days).

  Args:
    rate: the rate string.

  Returns:
    a floating point number representing the rate/second.

  Raises:
    MalformedQueueConfiguration: if the rate is invalid
  """
  if rate == "0":
    return 0.0
  elements = rate.split('/')
  if len(elements) != 2:
    raise MalformedQueueConfiguration('Rate "%s" is invalid.' % rate)
  number, unit = elements
  try:
    number = float(number)
  except ValueError:
    raise MalformedQueueConfiguration('Rate "%s" is invalid:'
                                      ' "%s" is not a number.' %
                                      (rate, number))
  # Compare against a tuple rather than the string 'smhd': a substring test
  # would also accept an empty unit or multi-character units such as 'sm'.
  if unit not in ('s', 'm', 'h', 'd'):
    raise MalformedQueueConfiguration('Rate "%s" is invalid:'
                                      ' "%s" is not one of s, m, h, d.' %
                                      (rate, unit))
  if unit == 's':
    return number
  if unit == 'm':
    return number/60
  if unit == 'h':
    return number/(60 * 60)
  if unit == 'd':
    return number/(24 * 60 * 60)


def ParseTotalStorageLimit(limit):
  """Parses a string representing the storage bytes limit.

  Optional limit suffixes are:
      B (bytes), K (kilobytes), M (megabytes), G (gigabytes), T (terabytes)

  Args:
    limit: The storage bytes limit string.

  Returns:
    An int representing the storage limit in bytes.

  Raises:
    MalformedQueueConfiguration: if the limit argument isn't a valid python
    double followed by an optional suffix.
  """
  limit = limit.strip()
  if not limit:
    raise MalformedQueueConfiguration('Total Storage Limit must not be empty.')
  try:
    if limit[-1] in BYTE_SUFFIXES:
      number = float(limit[0:-1])
      # Multiply by 1024 once for every suffix that precedes the given one.
      for c in BYTE_SUFFIXES:
        if limit[-1] != c:
          number = number * 1024
        else:
          return int(number)
    else:
      # No suffix: the value is a plain byte count.
      return int(limit)
  except ValueError:
    raise MalformedQueueConfiguration('Total Storage Limit "%s" is invalid.' %
                                      limit)


def ParseTaskAgeLimit(age_limit):
  """Parses a string representing the task's age limit (maximum allowed age).

  The string must be a non-negative integer or floating point number followed by
  one of s, m, h, or d (seconds, minutes, hours or days respectively).

  Args:
    age_limit: The task age limit string.

  Returns:
    An int representing the age limit in seconds.

  Raises:
    MalformedQueueConfiguration: if the limit argument isn't a valid python
    double followed by a required suffix.
  """
  age_limit = age_limit.strip()
  if not age_limit:
    raise MalformedQueueConfiguration('Task Age Limit must not be empty.')
  unit = age_limit[-1]
  if unit not in "smhd":
    raise MalformedQueueConfiguration('Task Age_Limit must be in s (seconds), '
                                      'm (minutes), h (hours) or d (days)')
  try:
    number = float(age_limit[0:-1])
    if unit == 's':
      return int(number)
    if unit == 'm':
      return int(number * 60)
    if unit == 'h':
      return int(number * 3600)
    if unit == 'd':
      return int(number * 86400)

  except ValueError:
    raise MalformedQueueConfiguration('Task Age_Limit "%s" is invalid.' %
                                      age_limit)


def TranslateRetryParameters(retry):
  """Populates a TaskQueueRetryParameters from a queueinfo.RetryParameters.

  Args:
    retry: A queueinfo.RetryParameters read from queue.yaml that describes the
        queue's retry parameters.

  Returns:
    A taskqueue_service_pb.TaskQueueRetryParameters proto populated with the
    data from "retry".

  Raises:
    MalformedQueueConfiguration: if the retry parameters are invalid.
  """
  params = taskqueue_service_pb.TaskQueueRetryParameters()
  if retry.task_retry_limit is not None:
    params.set_retry_limit(int(retry.task_retry_limit))
  if retry.task_age_limit is not None:
    params.set_age_limit_sec(ParseTaskAgeLimit(retry.task_age_limit))
  if retry.min_backoff_seconds is not None:
    params.set_min_backoff_sec(float(retry.min_backoff_seconds))
  if retry.max_backoff_seconds is not None:
    params.set_max_backoff_sec(float(retry.max_backoff_seconds))
  if retry.max_doublings is not None:
    params.set_max_doublings(int(retry.max_doublings))

  # When only one of min_backoff_sec and max_backoff_sec is set, the accessor
  # for the unset field presumably returns its default value. Adjust the unset
  # field so that min_backoff_sec <= max_backoff_sec still holds.
  if params.has_min_backoff_sec() and not params.has_max_backoff_sec():
    if params.min_backoff_sec() > params.max_backoff_sec():
      params.set_max_backoff_sec(params.min_backoff_sec())

  if not params.has_min_backoff_sec() and params.has_max_backoff_sec():
    if params.min_backoff_sec() > params.max_backoff_sec():
      params.set_min_backoff_sec(params.max_backoff_sec())

  # Validate the individual fields.
  if params.has_retry_limit() and params.retry_limit() < 0:
    raise MalformedQueueConfiguration(
        'Task retry limit must not be less than zero.')

  if params.has_age_limit_sec() and not params.age_limit_sec() > 0:
    raise MalformedQueueConfiguration(
        'Task age limit must be greater than zero.')

  if params.has_min_backoff_sec() and params.min_backoff_sec() < 0:
    raise MalformedQueueConfiguration(
        'Min backoff seconds must not be less than zero.')

  if params.has_max_backoff_sec() and params.max_backoff_sec() < 0:
    raise MalformedQueueConfiguration(
        'Max backoff seconds must not be less than zero.')

  if params.has_max_doublings() and params.max_doublings() < 0:
    raise MalformedQueueConfiguration(
        'Max doublings must not be less than zero.')

  if (params.has_min_backoff_sec() and params.has_max_backoff_sec() and
      params.min_backoff_sec() > params.max_backoff_sec()):
    raise MalformedQueueConfiguration(
        'Min backoff sec must not be greater than max backoff sec.')

  return params