Add google appengine to repo
[frozenviper.git] / google_appengine / google / appengine / api / labs / taskqueue / taskqueue_stub.py
blob94aad5120ecfaf1e58eb3ca315c09b61c9e0ba35
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 """Stub version of the Task Queue API.
20 This stub stores tasks and runs them via dev_appserver's AddEvent capability.
21 It also validates the tasks by checking their queue name against the queue.yaml.
23 As well as implementing Task Queue API functions, the stub exposes various other
24 functions that are used by the dev_appserver's admin console to display the
25 application's queues and tasks.
26 """
31 import StringIO
32 import base64
33 import bisect
34 import datetime
35 import logging
36 import os
37 import random
38 import string
39 import time
41 import taskqueue_service_pb
43 from google.appengine.api import api_base_pb
44 from google.appengine.api import apiproxy_stub
45 from google.appengine.api import apiproxy_stub_map
46 from google.appengine.api import queueinfo
47 from google.appengine.runtime import apiproxy_errors
# Rate and bucket size reported for the default queue when queue.yaml does
# not define it (bucket size is also the fallback for entries without one).
DEFAULT_RATE = '5.00/s'
DEFAULT_BUCKET_SIZE = 5

# Tasks may not be scheduled further than this many days into the future.
MAX_ETA_DELTA_DAYS = 30

# Maps (app_id, queue_name) to the _DummyTaskStore backing the admin console.
admin_console_dummy_tasks = {}

# Lower-cased header names that the stub generates itself; user-supplied
# headers with these names are dropped when a task is rendered.
BUILT_IN_HEADERS = set([
    'x-appengine-queuename',
    'x-appengine-taskname',
    'x-appengine-taskretrycount',
    'x-appengine-development-payload',
    'content-length',
])

# Queue names that are always valid, whether or not queue.yaml lists them.
DEFAULT_QUEUE_NAME = 'default'
CRON_QUEUE_NAME = '__cron'
69 class _DummyTaskStore(object):
70 """A class that encapsulates a sorted store of tasks.
72 Used for testing the admin console.
73 """
75 def __init__(self):
76 """Constructor."""
77 self._sorted_by_name = []
78 self._sorted_by_eta = []
80 def _InsertTask(self, task):
81 """Insert a task into the dummy store, keeps lists sorted.
83 Args:
84 task: the new task.
85 """
86 eta = task.eta_usec()
87 name = task.task_name()
88 bisect.insort_left(self._sorted_by_eta, (eta, name, task))
89 bisect.insort_left(self._sorted_by_name, (name, task))
91 def Lookup(self, maximum, name=None, eta=None):
92 """Lookup a number of sorted tasks from the store.
94 If 'eta' is specified, the tasks are looked up in a list sorted by 'eta',
95 then 'name'. Otherwise they are sorted by 'name'. We need to be able to
96 sort by 'eta' and 'name' because tasks can have identical eta. If you had
97 20 tasks with the same ETA, you wouldn't be able to page past them, since
98 the 'next eta' would give the first one again. Names are unique, though.
100 Args:
101 maximum: the maximum number of tasks to return.
102 name: a task name to start with.
103 eta: an eta to start with.
105 Returns:
106 A list of up to 'maximum' tasks.
108 Raises:
109 ValueError: if the task store gets corrupted.
111 if eta is None:
112 pos = bisect.bisect_left(self._sorted_by_name, (name,))
113 tasks = (x[1] for x in self._sorted_by_name[pos:pos + maximum])
114 return list(tasks)
115 if name is None:
116 raise ValueError('must supply name or eta')
117 pos = bisect.bisect_left(self._sorted_by_eta, (eta, name))
118 tasks = (x[2] for x in self._sorted_by_eta[pos:pos + maximum])
119 return list(tasks)
121 def Count(self):
122 """Returns the number of tasks in the store."""
123 return len(self._sorted_by_name)
125 def Oldest(self):
126 """Returns the oldest eta in the store, or None if no tasks."""
127 if self._sorted_by_eta:
128 return self._sorted_by_eta[0][0]
129 return None
131 def Add(self, request):
132 """Inserts a new task into the store.
134 Args:
135 request: A taskqueue_service_pb.TaskQueueAddRequest.
137 Raises:
138 apiproxy_errors.ApplicationError: If a task with the same name is already
139 in the store.
141 pos = bisect.bisect_left(self._sorted_by_name, (request.task_name(),))
142 if (pos < len(self._sorted_by_name) and
143 self._sorted_by_name[pos][0] == request.task_name()):
144 raise apiproxy_errors.ApplicationError(
145 taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS)
147 now = datetime.datetime.utcnow()
148 now_sec = time.mktime(now.timetuple())
149 task = taskqueue_service_pb.TaskQueueQueryTasksResponse_Task()
150 task.set_task_name(request.task_name())
151 task.set_eta_usec(request.eta_usec())
152 task.set_creation_time_usec(now_sec * 1e6)
153 task.set_url(request.url())
154 task.set_method(request.method())
155 for keyvalue in task.header_list():
156 header = task.add_header()
157 header.set_key(keyvalue.key())
158 header.set_value(keyvalue.value())
159 if request.has_description():
160 task.set_description(request.description())
161 if request.has_body():
162 task.set_body(request.body())
163 if request.has_crontimetable():
164 task.mutable_crontimetable().set_schedule(
165 request.crontimetable().schedule())
166 task.mutable_crontimetable().set_timezone(
167 request.crontimetable().timezone())
168 self._InsertTask(task)
170 def Delete(self, name):
171 """Deletes a task from the store by name.
173 Args:
174 name: the name of the task to delete.
176 Returns:
177 TaskQueueServiceError.UNKNOWN_TASK: if the task is unknown.
178 TaskQueueServiceError.INTERNAL_ERROR: if the store is corrupted.
179 TaskQueueServiceError.OK: otherwise.
181 pos = bisect.bisect_left(self._sorted_by_name, (name,))
182 if pos >= len(self._sorted_by_name):
183 return taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_TASK
184 if self._sorted_by_name[pos][1].task_name() != name:
185 logging.info('looking for task name %s, got task name %s', name,
186 self._sorted_by_name[pos][1].task_name())
187 return taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_TASK
188 old_task = self._sorted_by_name.pop(pos)[1]
189 eta = old_task.eta_usec()
190 pos = bisect.bisect_left(self._sorted_by_eta, (eta, name, None))
191 if self._sorted_by_eta[pos][2] is not old_task:
192 logging.error('task store corrupted')
193 return taskqueue_service_pb.TaskQueueServiceError.INTERNAL_ERRROR
194 self._sorted_by_eta.pop(pos)
195 return taskqueue_service_pb.TaskQueueServiceError.OK
197 def Populate(self, num_tasks):
198 """Populates the store with a number of tasks.
200 Args:
201 num_tasks: the number of tasks to insert.
203 now = datetime.datetime.utcnow()
204 now_sec = time.mktime(now.timetuple())
206 def RandomTask():
207 """Creates a new task and randomly populates values."""
208 task = taskqueue_service_pb.TaskQueueQueryTasksResponse_Task()
209 task.set_task_name(''.join(random.choice(string.ascii_lowercase)
210 for x in range(20)))
211 task.set_eta_usec(int(now_sec * 1e6) + random.randint(-10e6, 600e6))
213 task.set_creation_time_usec(min(now_sec * 1e6, task.eta_usec()) -
214 random.randint(0, 2e7))
216 task.set_url(random.choice(['/a', '/b', '/c', '/d']))
217 if random.random() < 0.2:
218 task.set_method(
219 taskqueue_service_pb.TaskQueueQueryTasksResponse_Task.POST)
220 task.set_body('A' * 2000)
221 else:
222 task.set_method(
223 taskqueue_service_pb.TaskQueueQueryTasksResponse_Task.GET)
224 task.set_retry_count(max(0, random.randint(-10, 5)))
225 if random.random() < 0.3:
226 random_headers = [('nexus', 'one'),
227 ('foo', 'bar'),
228 ('content-type', 'text/plain'),
229 ('from', 'user@email.com')]
230 for _ in xrange(random.randint(1, 4)):
231 elem = random.randint(0, len(random_headers)-1)
232 key, value = random_headers.pop(elem)
233 header_proto = task.add_header()
234 header_proto.set_key(key)
235 header_proto.set_value(value)
236 return task
238 for _ in range(num_tasks):
239 self._InsertTask(RandomTask())
242 def _ParseQueueYaml(unused_self, root_path):
243 """Loads the queue.yaml file and parses it.
245 Args:
246 unused_self: Allows this function to be bound to a class member. Not used.
247 root_path: Directory containing queue.yaml. Not used.
249 Returns:
250 None if queue.yaml doesn't exist, otherwise a queueinfo.QueueEntry object
251 populated from the queue.yaml.
253 if root_path is None:
254 return None
255 for queueyaml in ('queue.yaml', 'queue.yml'):
256 try:
257 fh = open(os.path.join(root_path, queueyaml), 'r')
258 except IOError:
259 continue
260 try:
261 queue_info = queueinfo.LoadSingleQueue(fh)
262 return queue_info
263 finally:
264 fh.close()
265 return None
268 def _CompareTasksByEta(a, b):
269 """Python sort comparator for tasks by estimated time of arrival (ETA).
271 Args:
272 a: A taskqueue_service_pb.TaskQueueAddRequest.
273 b: A taskqueue_service_pb.TaskQueueAddRequest.
275 Returns:
276 Standard 1/0/-1 comparison result.
278 if a.eta_usec() > b.eta_usec():
279 return 1
280 if a.eta_usec() < b.eta_usec():
281 return -1
282 return 0
285 def _FormatEta(eta_usec):
286 """Formats a task ETA as a date string in UTC."""
287 eta = datetime.datetime.fromtimestamp(eta_usec/1000000)
288 return eta.strftime('%Y/%m/%d %H:%M:%S')
291 def _EtaDelta(eta_usec):
292 """Formats a task ETA as a relative time string."""
293 eta = datetime.datetime.fromtimestamp(eta_usec/1000000)
294 now = datetime.datetime.utcnow()
295 if eta > now:
296 return str(eta - now) + ' from now'
297 else:
298 return str(now - eta) + ' ago'
301 class TaskQueueServiceStub(apiproxy_stub.APIProxyStub):
302 """Python only task queue service stub.
304 This stub executes tasks when enabled by using the dev_appserver's AddEvent
305 capability. When task running is disabled this stub will store tasks for
306 display on a console, where the user may manually execute the tasks.
309 queue_yaml_parser = _ParseQueueYaml
311 def __init__(self,
312 service_name='taskqueue',
313 root_path=None,
314 auto_task_running=False,
315 task_retry_seconds=30,
316 _all_queues_valid=False):
317 """Constructor.
319 Args:
320 service_name: Service name expected for all calls.
321 root_path: Root path to the directory of the application which may contain
322 a queue.yaml file. If None, then it's assumed no queue.yaml file is
323 available.
324 auto_task_running: When True, the dev_appserver should automatically
325 run tasks after they are enqueued.
326 task_retry_seconds: How long to wait between task executions after a
327 task fails.
329 super(TaskQueueServiceStub, self).__init__(service_name)
330 self._taskqueues = {}
331 self._next_task_id = 1
332 self._root_path = root_path
333 self._all_queues_valid = _all_queues_valid
335 self._add_event = None
336 self._auto_task_running = auto_task_running
337 self._task_retry_seconds = task_retry_seconds
339 self._app_queues = {}
341 class _QueueDetails(taskqueue_service_pb.TaskQueueUpdateQueueRequest):
342 def __init__(self, paused=False):
343 self.paused = paused
345 def _ChooseTaskName(self):
346 """Returns a string containing a unique task name."""
347 self._next_task_id += 1
348 return 'task%d' % (self._next_task_id - 1)
350 def _VerifyTaskQueueAddRequest(self, request):
351 """Checks that a TaskQueueAddRequest is valid.
353 Checks that a TaskQueueAddRequest specifies a valid eta and a valid queue.
355 Args:
356 request: The taskqueue_service_pb.TaskQueueAddRequest to validate.
358 Returns:
359 A taskqueue_service_pb.TaskQueueServiceError indicating any problems with
360 the request or taskqueue_service_pb.TaskQueueServiceError.OK if it is
361 valid.
363 if request.eta_usec() < 0:
364 return taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA
366 eta = datetime.datetime.utcfromtimestamp(request.eta_usec() / 1e6)
367 max_eta = (datetime.datetime.utcnow() +
368 datetime.timedelta(days=MAX_ETA_DELTA_DAYS))
369 if eta > max_eta:
370 return taskqueue_service_pb.TaskQueueServiceError.INVALID_ETA
372 return taskqueue_service_pb.TaskQueueServiceError.OK
374 def _Dynamic_Add(self, request, response):
375 bulk_request = taskqueue_service_pb.TaskQueueBulkAddRequest()
376 bulk_response = taskqueue_service_pb.TaskQueueBulkAddResponse()
378 bulk_request.add_add_request().CopyFrom(request)
379 self._Dynamic_BulkAdd(bulk_request, bulk_response)
381 assert bulk_response.taskresult_size() == 1
382 result = bulk_response.taskresult(0).result()
384 if result != taskqueue_service_pb.TaskQueueServiceError.OK:
385 raise apiproxy_errors.ApplicationError(result)
386 elif bulk_response.taskresult(0).has_chosen_task_name():
387 response.set_chosen_task_name(
388 bulk_response.taskresult(0).chosen_task_name())
390 def _Dynamic_BulkAdd(self, request, response):
391 """Add many tasks to a queue using a single request.
393 Args:
394 request: The taskqueue_service_pb.TaskQueueBulkAddRequest. See
395 taskqueue_service.proto.
396 response: The taskqueue_service_pb.TaskQueueBulkAddResponse. See
397 taskqueue_service.proto.
400 assert request.add_request_size(), 'taskqueue should prevent empty requests'
402 if not self._IsValidQueue(request.add_request(0).queue_name()):
403 raise apiproxy_errors.ApplicationError(
404 taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
406 error_found = False
407 task_results_with_chosen_names = []
409 for add_request in request.add_request_list():
410 task_result = response.add_taskresult()
411 error = self._VerifyTaskQueueAddRequest(add_request)
412 if error == taskqueue_service_pb.TaskQueueServiceError.OK:
413 if not add_request.task_name():
414 chosen_name = self._ChooseTaskName()
415 add_request.set_task_name(chosen_name)
416 task_results_with_chosen_names.append(task_result)
417 task_result.set_result(
418 taskqueue_service_pb.TaskQueueServiceError.SKIPPED)
419 else:
420 error_found = True
421 task_result.set_result(error)
423 if error_found:
424 return
426 if request.add_request(0).has_transaction():
427 self._TransactionalBulkAdd(request)
428 elif request.add_request(0).has_app_id():
429 self._DummyTaskStoreBulkAdd(request, response)
430 else:
431 self._NonTransactionalBulkAdd(request, response)
433 for add_request, task_result in zip(request.add_request_list(),
434 response.taskresult_list()):
435 if (task_result.result() ==
436 taskqueue_service_pb.TaskQueueServiceError.SKIPPED):
437 task_result.set_result(taskqueue_service_pb.TaskQueueServiceError.OK)
438 if task_result in task_results_with_chosen_names:
439 task_result.set_chosen_task_name(add_request.task_name())
441 def _TransactionalBulkAdd(self, request):
442 """Uses datastore.AddActions to associate tasks with a transaction.
444 Args:
445 request: The taskqueue_service_pb.TaskQueueBulkAddRequest containing the
446 tasks to add. N.B. all tasks in the request have been validated and
447 assigned unique names.
449 try:
450 apiproxy_stub_map.MakeSyncCall(
451 'datastore_v3', 'AddActions', request, api_base_pb.VoidProto())
452 except apiproxy_errors.ApplicationError, e:
453 raise apiproxy_errors.ApplicationError(
454 e.application_error +
455 taskqueue_service_pb.TaskQueueServiceError.DATASTORE_ERROR,
456 e.error_detail)
458 def _DummyTaskStoreBulkAdd(self, request, response):
459 """Adds tasks to the appropriate DummyTaskStore.
461 Args:
462 request: The taskqueue_service_pb.TaskQueueBulkAddRequest containing the
463 tasks to add. N.B. all tasks in the request have been validated and
464 those with empty names have been assigned unique names.
465 response: The taskqueue_service_pb.TaskQueueBulkAddResponse to populate
466 with the results. N.B. the chosen_task_name field in the response will
467 not be filled-in.
469 store = self.GetDummyTaskStore(request.add_request(0).app_id(),
470 request.add_request(0).queue_name())
471 for add_request, task_result in zip(request.add_request_list(),
472 response.taskresult_list()):
473 try:
474 store.Add(add_request)
475 except apiproxy_errors.ApplicationError, e:
476 task_result.set_result(e.application_error)
477 else:
478 task_result.set_result(taskqueue_service_pb.TaskQueueServiceError.OK)
480 def _NonTransactionalBulkAdd(self, request, response):
481 """Adds tasks to the appropriate list in in self._taskqueues.
483 Args:
484 request: The taskqueue_service_pb.TaskQueueBulkAddRequest containing the
485 tasks to add. N.B. all tasks in the request have been validated and
486 those with empty names have been assigned unique names.
487 response: The taskqueue_service_pb.TaskQueueBulkAddResponse to populate
488 with the results. N.B. the chosen_task_name field in the response will
489 not be filled-in.
491 existing_tasks = self._taskqueues.setdefault(
492 request.add_request(0).queue_name(), [])
493 existing_task_names = set(task.task_name() for task in existing_tasks)
495 for add_request, task_result in zip(request.add_request_list(),
496 response.taskresult_list()):
497 if add_request.task_name() in existing_task_names:
498 task_result.set_result(
499 taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS)
500 else:
501 existing_tasks.append(add_request)
503 if self._add_event and self._auto_task_running:
504 self._add_event(
505 add_request.eta_usec() / 1000000.0,
506 lambda: self._RunTask(
507 add_request.queue_name(), add_request.task_name()))
509 existing_tasks.sort(_CompareTasksByEta)
511 def _IsValidQueue(self, queue_name):
512 """Determines whether a queue is valid, i.e. tasks can be added to it.
514 Valid queues are the 'default' queue, plus any queues in the queue.yaml
515 file.
517 Args:
518 queue_name: the name of the queue to validate.
520 Returns:
521 True iff queue is valid.
523 if self._all_queues_valid:
524 return True
525 if queue_name == DEFAULT_QUEUE_NAME or queue_name == CRON_QUEUE_NAME:
526 return True
527 queue_info = self.queue_yaml_parser(self._root_path)
528 if queue_info and queue_info.queue:
529 for entry in queue_info.queue:
530 if entry.name == queue_name:
531 return True
532 return False
534 def _RunTask(self, queue_name, task_name):
535 """Returns a fake request for running a task in the dev_appserver.
537 Args:
538 queue_name: The queue the task is in.
539 task_name: The name of the task to run.
541 Returns:
542 None if this task no longer exists or tuple (connection, addrinfo) of
543 a fake connection and address information used to run this task. The
544 task will be deleted after it runs or re-enqueued in the future on
545 failure.
547 task_list = self.GetTasks(queue_name)
548 for task in task_list:
549 if task['name'] == task_name:
550 break
551 else:
552 return None
554 class FakeConnection(object):
555 def __init__(self, input_buffer):
556 self.rfile = StringIO.StringIO(input_buffer)
557 self.wfile = StringIO.StringIO()
558 self.wfile_close = self.wfile.close
559 self.wfile.close = self.connection_done
561 def connection_done(myself):
562 result = myself.wfile.getvalue()
563 myself.wfile_close()
564 first_line, rest = (result.split('\n', 1) + ['', ''])[:2]
565 version, code, rest = (first_line.split(' ', 2) + ['', '500', ''])[:3]
566 if 200 <= int(code) <= 299:
567 self.DeleteTask(queue_name, task_name)
568 return
570 logging.warning('Task named "%s" on queue "%s" failed with code %s; '
571 'will retry in %d seconds',
572 task_name, queue_name, code, self._task_retry_seconds)
573 self._add_event(
574 time.time() + self._task_retry_seconds,
575 lambda: self._RunTask(queue_name, task_name))
577 def close(self):
578 pass
580 def makefile(self, mode, buffsize):
581 if mode.startswith('w'):
582 return self.wfile
583 else:
584 return self.rfile
586 payload = StringIO.StringIO()
587 payload.write('%s %s HTTP/1.1\r\n' % (task['method'], task['url']))
588 for key, value in task['headers']:
589 payload.write('%s: %s\r\n' % (key, value))
590 payload.write('\r\n')
591 payload.write(task['body'])
593 return FakeConnection(payload.getvalue()), ('0.1.0.2', 80)
595 def GetQueues(self):
596 """Gets all the applications's queues.
598 Returns:
599 A list of dictionaries, where each dictionary contains one queue's
600 attributes. E.g.:
601 [{'name': 'some-queue',
602 'max_rate': '1/s',
603 'bucket_size': 5,
604 'oldest_task': '2009/02/02 05:37:42',
605 'eta_delta': '0:00:06.342511 ago',
606 'tasks_in_queue': 12}, ...]
607 The list of queues always includes the default queue.
609 queues = []
610 queue_info = self.queue_yaml_parser(self._root_path)
611 has_default = False
612 if queue_info and queue_info.queue:
613 for entry in queue_info.queue:
614 if entry.name == DEFAULT_QUEUE_NAME:
615 has_default = True
616 queue = {}
617 queues.append(queue)
618 queue['name'] = entry.name
619 queue['max_rate'] = entry.rate
620 if entry.bucket_size:
621 queue['bucket_size'] = entry.bucket_size
622 else:
623 queue['bucket_size'] = DEFAULT_BUCKET_SIZE
625 tasks = self._taskqueues.setdefault(entry.name, [])
626 if tasks:
627 queue['oldest_task'] = _FormatEta(tasks[0].eta_usec())
628 queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec())
629 else:
630 queue['oldest_task'] = ''
631 queue['tasks_in_queue'] = len(tasks)
633 if not has_default:
634 queue = {}
635 queues.append(queue)
636 queue['name'] = DEFAULT_QUEUE_NAME
637 queue['max_rate'] = DEFAULT_RATE
638 queue['bucket_size'] = DEFAULT_BUCKET_SIZE
640 tasks = self._taskqueues.get(DEFAULT_QUEUE_NAME, [])
641 if tasks:
642 queue['oldest_task'] = _FormatEta(tasks[0].eta_usec())
643 queue['eta_delta'] = _EtaDelta(tasks[0].eta_usec())
644 else:
645 queue['oldest_task'] = ''
646 queue['tasks_in_queue'] = len(tasks)
647 return queues
649 def GetTasks(self, queue_name):
650 """Gets a queue's tasks.
652 Args:
653 queue_name: Queue's name to return tasks for.
655 Returns:
656 A list of dictionaries, where each dictionary contains one task's
657 attributes. E.g.
658 [{'name': 'task-123',
659 'url': '/update',
660 'method': 'GET',
661 'eta': '2009/02/02 05:37:42',
662 'eta_delta': '0:00:06.342511 ago',
663 'body': '',
664 'headers': [('user-header', 'some-value')
665 ('X-AppEngine-QueueName': 'update-queue'),
666 ('X-AppEngine-TaskName': 'task-123'),
667 ('X-AppEngine-TaskRetryCount': '0'),
668 ('X-AppEngine-Development-Payload': '1'),
669 ('Content-Length': 0),
670 ('Content-Type': 'application/octet-stream')]
672 Raises:
673 ValueError: A task request contains an unknown HTTP method type.
675 tasks = self._taskqueues.get(queue_name, [])
676 result_tasks = []
677 for task_request in tasks:
678 task = {}
679 result_tasks.append(task)
680 task['name'] = task_request.task_name()
681 task['url'] = task_request.url()
682 method = task_request.method()
683 if method == taskqueue_service_pb.TaskQueueAddRequest.GET:
684 task['method'] = 'GET'
685 elif method == taskqueue_service_pb.TaskQueueAddRequest.POST:
686 task['method'] = 'POST'
687 elif method == taskqueue_service_pb.TaskQueueAddRequest.HEAD:
688 task['method'] = 'HEAD'
689 elif method == taskqueue_service_pb.TaskQueueAddRequest.PUT:
690 task['method'] = 'PUT'
691 elif method == taskqueue_service_pb.TaskQueueAddRequest.DELETE:
692 task['method'] = 'DELETE'
693 else:
694 raise ValueError('Unexpected method: %d' % method)
696 task['eta'] = _FormatEta(task_request.eta_usec())
697 task['eta_delta'] = _EtaDelta(task_request.eta_usec())
698 task['body'] = base64.b64encode(task_request.body())
700 headers = [(header.key(), header.value())
701 for header in task_request.header_list()
702 if header.key().lower() not in BUILT_IN_HEADERS]
704 headers.append(('X-AppEngine-QueueName', queue_name))
705 headers.append(('X-AppEngine-TaskName', task['name']))
706 headers.append(('X-AppEngine-TaskRetryCount', '0'))
707 headers.append(('X-AppEngine-Development-Payload', '1'))
708 headers.append(('Content-Length', len(task['body'])))
709 if 'content-type' not in frozenset(key.lower() for key, _ in headers):
710 headers.append(('Content-Type', 'application/octet-stream'))
711 task['headers'] = headers
713 return result_tasks
715 def DeleteTask(self, queue_name, task_name):
716 """Deletes a task from a queue.
718 Args:
719 queue_name: the name of the queue to delete the task from.
720 task_name: the name of the task to delete.
722 tasks = self._taskqueues.get(queue_name, [])
723 for task in tasks:
724 if task.task_name() == task_name:
725 tasks.remove(task)
726 return
728 def FlushQueue(self, queue_name):
729 """Removes all tasks from a queue.
731 Args:
732 queue_name: the name of the queue to remove tasks from.
734 self._taskqueues[queue_name] = []
736 def _Dynamic_UpdateQueue(self, request, unused_response):
737 """Local implementation of the UpdateQueue RPC in TaskQueueService.
739 Must adhere to the '_Dynamic_' naming convention for stubbing to work.
740 See taskqueue_service.proto for a full description of the RPC.
742 Args:
743 request: A taskqueue_service_pb.TaskQueueUpdateQueueRequest.
744 unused_response: A taskqueue_service_pb.TaskQueueUpdateQueueResponse.
745 Not used.
747 queues = self._app_queues.setdefault(request.app_id(), {})
748 if request.queue_name() in queues and queues[request.queue_name()] is None:
749 raise apiproxy_errors.ApplicationError(
750 taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE)
752 defensive_copy = self._QueueDetails()
753 defensive_copy.CopyFrom(request)
755 queues[request.queue_name()] = defensive_copy
757 def _Dynamic_FetchQueues(self, request, response):
758 """Local implementation of the FetchQueues RPC in TaskQueueService.
760 Must adhere to the '_Dynamic_' naming convention for stubbing to work.
761 See taskqueue_service.proto for a full description of the RPC.
763 Args:
764 request: A taskqueue_service_pb.TaskQueueFetchQueuesRequest.
765 response: A taskqueue_service_pb.TaskQueueFetchQueuesResponse.
767 queues = self._app_queues.get(request.app_id(), {})
768 for unused_key, queue in sorted(queues.items()):
769 if request.max_rows() == response.queue_size():
770 break
772 if queue is None:
773 continue
775 response_queue = response.add_queue()
776 response_queue.set_queue_name(queue.queue_name())
777 response_queue.set_bucket_refill_per_second(
778 queue.bucket_refill_per_second())
779 response_queue.set_bucket_capacity(queue.bucket_capacity())
780 response_queue.set_user_specified_rate(queue.user_specified_rate())
781 response_queue.set_paused(queue.paused)
783 def _Dynamic_FetchQueueStats(self, request, response):
784 """Local 'random' implementation of the TaskQueueService.FetchQueueStats.
786 This implementation loads some stats from the dummy store,
787 the rest with random numbers.
788 Must adhere to the '_Dynamic_' naming convention for stubbing to work.
789 See taskqueue_service.proto for a full description of the RPC.
791 Args:
792 request: A taskqueue_service_pb.TaskQueueFetchQueueStatsRequest.
793 response: A taskqueue_service_pb.TaskQueueFetchQueueStatsResponse.
795 for queue in request.queue_name_list():
796 store = self.GetDummyTaskStore(request.app_id(), queue)
797 stats = response.add_queuestats()
798 stats.set_num_tasks(store.Count())
799 if stats.num_tasks() == 0:
800 stats.set_oldest_eta_usec(-1)
801 else:
802 stats.set_oldest_eta_usec(store.Oldest())
804 if random.randint(0, 9) > 0:
805 scanner_info = stats.mutable_scanner_info()
806 scanner_info.set_executed_last_minute(random.randint(0, 10))
807 scanner_info.set_executed_last_hour(scanner_info.executed_last_minute()
808 + random.randint(0, 100))
809 scanner_info.set_sampling_duration_seconds(random.random() * 10000.0)
811 def GetDummyTaskStore(self, app_id, queue_name):
812 """Get the dummy task store for this app_id/queue_name pair.
814 Creates an entry and populates it, if there's not already an entry.
816 Args:
817 app_id: the app_id.
818 queue_name: the queue_name.
820 Returns:
821 the existing or the new dummy store.
823 task_store_key = (app_id, queue_name)
824 if task_store_key not in admin_console_dummy_tasks:
825 store = _DummyTaskStore()
826 if not self._all_queues_valid and queue_name != CRON_QUEUE_NAME:
827 store.Populate(random.randint(10, 100))
828 admin_console_dummy_tasks[task_store_key] = store
829 else:
830 store = admin_console_dummy_tasks[task_store_key]
831 return store
833 def _Dynamic_QueryTasks(self, request, response):
834 """Local implementation of the TaskQueueService.QueryTasks RPC.
836 Uses the dummy store, creating tasks if this is the first time the
837 queue has been seen.
839 Args:
840 request: A taskqueue_service_pb.TaskQueueQueryTasksRequest.
841 response: A taskqueue_service_pb.TaskQueueQueryTasksResponse.
843 store = self.GetDummyTaskStore(request.app_id(), request.queue_name())
845 if request.has_start_eta_usec():
846 tasks = store.Lookup(request.max_rows(), name=request.start_task_name(),
847 eta=request.start_eta_usec())
848 else:
849 tasks = store.Lookup(request.max_rows(), name=request.start_task_name())
850 for task in tasks:
851 response.add_task().MergeFrom(task)
853 def _Dynamic_Delete(self, request, response):
854 """Local delete implementation of TaskQueueService.Delete.
856 Deletes tasks from the dummy store. A 1/20 chance of a transient error.
858 Args:
859 request: A taskqueue_service_pb.TaskQueueDeleteRequest.
860 response: A taskqueue_service_pb.TaskQueueDeleteResponse.
862 task_store_key = (request.app_id(), request.queue_name())
863 if task_store_key not in admin_console_dummy_tasks:
864 for _ in request.task_name_list():
865 response.add_result(
866 taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
867 return
869 store = admin_console_dummy_tasks[task_store_key]
870 for taskname in request.task_name_list():
871 if random.random() <= 0.05:
872 response.add_result(
873 taskqueue_service_pb.TaskQueueServiceError.TRANSIENT_ERROR)
874 else:
875 response.add_result(store.Delete(taskname))
877 def _Dynamic_DeleteQueue(self, request, response):
878 """Local delete implementation of TaskQueueService.DeleteQueue.
880 Args:
881 request: A taskqueue_service_pb.TaskQueueDeleteQueueRequest.
882 response: A taskqueue_service_pb.TaskQueueDeleteQueueResponse.
884 if not request.queue_name():
885 raise apiproxy_errors.ApplicationError(
886 taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_NAME)
888 queues = self._app_queues.get(request.app_id(), {})
889 if request.queue_name() not in queues:
890 raise apiproxy_errors.ApplicationError(
891 taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
892 elif queues[request.queue_name()] is None:
893 raise apiproxy_errors.ApplicationError(
894 taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE)
896 queues[request.queue_name()] = None
898 def _Dynamic_PauseQueue(self, request, response):
899 """Local pause implementation of TaskQueueService.PauseQueue.
901 Args:
902 request: A taskqueue_service_pb.TaskQueuePauseQueueRequest.
903 response: A taskqueue_service_pb.TaskQueuePauseQueueResponse.
905 if not request.queue_name():
906 raise apiproxy_errors.ApplicationError(
907 taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_NAME)
909 queues = self._app_queues.get(request.app_id(), {})
910 if request.queue_name() != DEFAULT_QUEUE_NAME:
911 if request.queue_name() not in queues:
912 raise apiproxy_errors.ApplicationError(
913 taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
914 elif queues[request.queue_name()] is None:
915 raise apiproxy_errors.ApplicationError(
916 taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE)
918 queues[request.queue_name()].paused = request.pause()
920 def _Dynamic_PurgeQueue(self, request, response):
921 """Local purge implementation of TaskQueueService.PurgeQueue.
923 Args:
924 request: A taskqueue_service_pb.TaskQueuePurgeQueueRequest.
925 response: A taskqueue_service_pb.TaskQueuePurgeQueueResponse.
927 if not request.queue_name():
928 raise apiproxy_errors.ApplicationError(
929 taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_NAME)
931 queues = self._app_queues.get(request.app_id(), {})
932 if request.queue_name() != DEFAULT_QUEUE_NAME:
933 if request.queue_name() not in queues:
934 raise apiproxy_errors.ApplicationError(
935 taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
936 elif queues[request.queue_name()] is None:
937 raise apiproxy_errors.ApplicationError(
938 taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE)
940 store = self.GetDummyTaskStore(request.app_id(), request.queue_name())
941 for task in store.Lookup(store.Count()):
942 store.Delete(task.task_name())
944 self.FlushQueue(request.queue_name())
946 def _Dynamic_UpdateStorageLimit(self, request, response):
947 """Local implementation of TaskQueueService.UpdateStorageLimit.
948 Args:
949 request: A taskqueue_service_pb.TaskQueueUpdateStorageLimitRequest.
950 response: A taskqueue_service_pb.TaskQueueUpdateStorageLimitResponse.
952 if request.limit() < 0 or request.limit() > 1000 * (1024 ** 4):
953 raise apiproxy_errors.ApplicationError(
954 taskqueue_service_pb.TaskQueueServiceError.INVALID_REQUEST)
956 response.set_new_limit(request.limit())