3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
18 """Stub version of the Task Queue API.
20 This stub stores tasks and runs them via dev_appserver's AddEvent capability.
21 It also validates the tasks by checking their queue name against the queue.yaml.
23 As well as implementing Task Queue API functions, the stub exposes various other
24 functions that are used by the dev_appserver's admin console to display the
25 application's queues and tasks.
41 import taskqueue_service_pb
43 from google
.appengine
.api
import api_base_pb
44 from google
.appengine
.api
import apiproxy_stub
45 from google
.appengine
.api
import apiproxy_stub_map
46 from google
.appengine
.api
import queueinfo
47 from google
.appengine
.runtime
import apiproxy_errors
# Rate string reported as 'max_rate' for the default queue.
DEFAULT_RATE = '5.00/s'

# Bucket size reported for a queue that does not specify its own.
DEFAULT_BUCKET_SIZE = 5

# Number of days added to "now" to compute the latest ETA a task may carry.
MAX_ETA_DELTA_DAYS = 30

# Dummy task stores used by the admin console, keyed by
# (app_id, queue_name) tuples.
admin_console_dummy_tasks = dict()
58 BUILT_IN_HEADERS
= set(['x-appengine-queuename',
59 'x-appengine-taskname',
60 'x-appengine-taskretrycount',
61 'x-appengine-development-payload',
# Name of the default queue; it is always included in the list of queues.
DEFAULT_QUEUE_NAME = 'default'

# Queue name checked alongside the default queue when validating queue
# names; its dummy task store is never pre-populated with random tasks.
CRON_QUEUE_NAME = '__cron'
69 class _DummyTaskStore(object):
70 """A class that encapsulates a sorted store of tasks.
72 Used for testing the admin console.
77 self
._sorted
_by
_name
= []
78 self
._sorted
_by
_eta
= []
80 def _InsertTask(self
, task
):
81 """Insert a task into the dummy store, keeps lists sorted.
87 name
= task
.task_name()
88 bisect
.insort_left(self
._sorted
_by
_eta
, (eta
, name
, task
))
89 bisect
.insort_left(self
._sorted
_by
_name
, (name
, task
))
91 def Lookup(self
, maximum
, name
=None, eta
=None):
92 """Lookup a number of sorted tasks from the store.
94 If 'eta' is specified, the tasks are looked up in a list sorted by 'eta',
95 then 'name'. Otherwise they are sorted by 'name'. We need to be able to
96 sort by 'eta' and 'name' because tasks can have identical eta. If you had
97 20 tasks with the same ETA, you wouldn't be able to page past them, since
98 the 'next eta' would give the first one again. Names are unique, though.
101 maximum: the maximum number of tasks to return.
102 name: a task name to start with.
103 eta: an eta to start with.
106 A list of up to 'maximum' tasks.
109 ValueError: if the task store gets corrupted.
112 pos
= bisect
.bisect_left(self
._sorted
_by
_name
, (name
,))
113 tasks
= (x
[1] for x
in self
._sorted
_by
_name
[pos
:pos
+ maximum
])
116 raise ValueError('must supply name or eta')
117 pos
= bisect
.bisect_left(self
._sorted
_by
_eta
, (eta
, name
))
118 tasks
= (x
[2] for x
in self
._sorted
_by
_eta
[pos
:pos
+ maximum
])
122 """Returns the number of tasks in the store."""
123 return len(self
._sorted
_by
_name
)
126 """Returns the oldest eta in the store, or None if no tasks."""
127 if self
._sorted
_by
_eta
:
128 return self
._sorted
_by
_eta
[0][0]
def Add(self, request):
    """Inserts a new task into the store.

    Builds a TaskQueueQueryTasksResponse_Task from the add request (name,
    ETA, creation time, URL, method, headers and the optional description,
    body and cron timetable) and inserts it via _InsertTask.

    Args:
      request: A taskqueue_service_pb.TaskQueueAddRequest.

    Raises:
      apiproxy_errors.ApplicationError: If a task with the same name is already
        in the store (TASK_ALREADY_EXISTS).
    """
    task_name = request.task_name()

    # _sorted_by_name holds (name, task) tuples; probing with the 1-tuple
    # (name,) lands on the first entry whose name is >= task_name.
    pos = bisect.bisect_left(self._sorted_by_name, (task_name,))
    if (pos < len(self._sorted_by_name) and
            self._sorted_by_name[pos][0] == task_name):
        raise apiproxy_errors.ApplicationError(
            taskqueue_service_pb.TaskQueueServiceError.TASK_ALREADY_EXISTS)

    # NOTE(review): time.mktime() interprets its argument as local time while
    # utcnow() yields UTC, so this is only the true epoch when the process
    # runs with a UTC timezone -- confirm that is guaranteed by the caller.
    now = datetime.datetime.utcnow()
    now_sec = time.mktime(now.timetuple())

    task = taskqueue_service_pb.TaskQueueQueryTasksResponse_Task()
    task.set_task_name(task_name)
    task.set_eta_usec(request.eta_usec())
    task.set_creation_time_usec(now_sec * 1e6)
    task.set_url(request.url())
    task.set_method(request.method())

    # Copy the headers from the *request*. Iterating the freshly created
    # task's own header list (as before) never copied anything, because that
    # list is empty at this point.
    for keyvalue in request.header_list():
        header = task.add_header()
        header.set_key(keyvalue.key())
        header.set_value(keyvalue.value())

    if request.has_description():
        task.set_description(request.description())
    if request.has_body():
        task.set_body(request.body())
    if request.has_crontimetable():
        source_timetable = request.crontimetable()
        task.mutable_crontimetable().set_schedule(source_timetable.schedule())
        task.mutable_crontimetable().set_timezone(source_timetable.timezone())

    self._InsertTask(task)
def Delete(self, name):
    """Deletes a task from the store by name.

    The task is removed from both the name-sorted and the ETA-sorted list.

    Args:
      name: the name of the task to delete.

    Returns:
      TaskQueueServiceError.UNKNOWN_TASK: if the task is unknown.
      TaskQueueServiceError.INTERNAL_ERROR: if the store is corrupted.
      TaskQueueServiceError.OK: otherwise.
    """
    pos = bisect.bisect_left(self._sorted_by_name, (name,))
    if pos >= len(self._sorted_by_name):
        return taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_TASK

    found_name = self._sorted_by_name[pos][1].task_name()
    if found_name != name:
        logging.info('looking for task name %s, got task name %s', name,
                     found_name)
        return taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_TASK

    old_task = self._sorted_by_name.pop(pos)[1]
    eta = old_task.eta_usec()

    # Entries in _sorted_by_eta are (eta, name, task) tuples.
    # NOTE(review): the None placeholder presumably relies on Python 2
    # ordering None before any task object -- confirm before porting.
    pos = bisect.bisect_left(self._sorted_by_eta, (eta, name, None))

    # The bounds check keeps a missing entry from surfacing as an IndexError;
    # it is reported as store corruption just like a mismatching entry.
    if (pos >= len(self._sorted_by_eta) or
            self._sorted_by_eta[pos][2] is not old_task):
        logging.error('task store corrupted')
        # Was spelled INTERNAL_ERRROR, which is not the documented error code
        # and would fail with AttributeError on this path.
        return taskqueue_service_pb.TaskQueueServiceError.INTERNAL_ERROR

    self._sorted_by_eta.pop(pos)
    return taskqueue_service_pb.TaskQueueServiceError.OK
197 def Populate(self
, num_tasks
):
198 """Populates the store with a number of tasks.
201 num_tasks: the number of tasks to insert.
203 now
= datetime
.datetime
.utcnow()
204 now_sec
= time
.mktime(now
.timetuple())
207 """Creates a new task and randomly populates values."""
208 task
= taskqueue_service_pb
.TaskQueueQueryTasksResponse_Task()
209 task
.set_task_name(''.join(random
.choice(string
.ascii_lowercase
)
211 task
.set_eta_usec(int(now_sec
* 1e6
) + random
.randint(-10e6
, 600e6
))
213 task
.set_creation_time_usec(min(now_sec
* 1e6
, task
.eta_usec()) -
214 random
.randint(0, 2e7
))
216 task
.set_url(random
.choice(['/a', '/b', '/c', '/d']))
217 if random
.random() < 0.2:
219 taskqueue_service_pb
.TaskQueueQueryTasksResponse_Task
.POST
)
220 task
.set_body('A' * 2000)
223 taskqueue_service_pb
.TaskQueueQueryTasksResponse_Task
.GET
)
224 task
.set_retry_count(max(0, random
.randint(-10, 5)))
225 if random
.random() < 0.3:
226 random_headers
= [('nexus', 'one'),
228 ('content-type', 'text/plain'),
229 ('from', 'user@email.com')]
230 for _
in xrange(random
.randint(1, 4)):
231 elem
= random
.randint(0, len(random_headers
)-1)
232 key
, value
= random_headers
.pop(elem
)
233 header_proto
= task
.add_header()
234 header_proto
.set_key(key
)
235 header_proto
.set_value(value
)
238 for _
in range(num_tasks
):
239 self
._InsertTask
(RandomTask())
242 def _ParseQueueYaml(unused_self
, root_path
):
243 """Loads the queue.yaml file and parses it.
246 unused_self: Allows this function to be bound to a class member. Not used.
247 root_path: Directory containing queue.yaml. Not used.
250 None if queue.yaml doesn't exist, otherwise a queueinfo.QueueEntry object
251 populated from the queue.yaml.
253 if root_path
is None:
255 for queueyaml
in ('queue.yaml', 'queue.yml'):
257 fh
= open(os
.path
.join(root_path
, queueyaml
), 'r')
261 queue_info
= queueinfo
.LoadSingleQueue(fh
)
268 def _CompareTasksByEta(a
, b
):
269 """Python sort comparator for tasks by estimated time of arrival (ETA).
272 a: A taskqueue_service_pb.TaskQueueAddRequest.
273 b: A taskqueue_service_pb.TaskQueueAddRequest.
276 Standard 1/0/-1 comparison result.
278 if a
.eta_usec() > b
.eta_usec():
280 if a
.eta_usec() < b
.eta_usec():
def _FormatEta(eta_usec):
    """Formats a task ETA as a date string in UTC.

    Args:
      eta_usec: The ETA in microseconds since the Unix epoch.

    Returns:
      The ETA formatted as 'YYYY/MM/DD HH:MM:SS', expressed in UTC.
    """
    # utcfromtimestamp() rather than fromtimestamp(): the latter converts to
    # the machine's local timezone, which contradicts the UTC contract above.
    # Floor division keeps whole-second truncation on Python 2 and 3 alike.
    eta = datetime.datetime.utcfromtimestamp(eta_usec // 1000000)
    return eta.strftime('%Y/%m/%d %H:%M:%S')
def _EtaDelta(eta_usec):
    """Formats a task ETA as a relative time string.

    Args:
      eta_usec: The ETA in microseconds since the Unix epoch.

    Returns:
      '<timedelta> from now' when the ETA lies in the future, otherwise
      '<timedelta> ago', where <timedelta> is str() of the difference between
      the ETA and the current time.
    """
    # Both operands must be naive UTC datetimes. Using fromtimestamp() here
    # produced local time and skewed the delta by the machine's UTC offset.
    eta = datetime.datetime.utcfromtimestamp(eta_usec // 1000000)
    now = datetime.datetime.utcnow()
    if eta > now:
        return str(eta - now) + ' from now'
    return str(now - eta) + ' ago'
301 class TaskQueueServiceStub(apiproxy_stub
.APIProxyStub
):
302 """Python only task queue service stub.
304 This stub executes tasks when enabled by using the dev_appserver's AddEvent
305 capability. When task running is disabled this stub will store tasks for
306 display on a console, where the user may manually execute the tasks.
309 queue_yaml_parser
= _ParseQueueYaml
312 service_name
='taskqueue',
314 auto_task_running
=False,
315 task_retry_seconds
=30,
316 _all_queues_valid
=False):
320 service_name: Service name expected for all calls.
321 root_path: Root path to the directory of the application which may contain
322 a queue.yaml file. If None, then it's assumed no queue.yaml file is
324 auto_task_running: When True, the dev_appserver should automatically
325 run tasks after they are enqueued.
326 task_retry_seconds: How long to wait between task executions after a
329 super(TaskQueueServiceStub
, self
).__init
__(service_name
)
330 self
._taskqueues
= {}
331 self
._next
_task
_id
= 1
332 self
._root
_path
= root_path
333 self
._all
_queues
_valid
= _all_queues_valid
335 self
._add
_event
= None
336 self
._auto
_task
_running
= auto_task_running
337 self
._task
_retry
_seconds
= task_retry_seconds
339 self
._app
_queues
= {}
341 class _QueueDetails(taskqueue_service_pb
.TaskQueueUpdateQueueRequest
):
342 def __init__(self
, paused
=False):
def _ChooseTaskName(self):
    """Returns a string containing a unique task name.

    The name is 'task' followed by the current value of the internal task id
    counter; the counter is advanced by one on every call.
    """
    task_id = self._next_task_id
    self._next_task_id = task_id + 1
    return 'task%d' % task_id
350 def _VerifyTaskQueueAddRequest(self
, request
):
351 """Checks that a TaskQueueAddRequest is valid.
353 Checks that a TaskQueueAddRequest specifies a valid eta and a valid queue.
356 request: The taskqueue_service_pb.TaskQueueAddRequest to validate.
359 A taskqueue_service_pb.TaskQueueServiceError indicating any problems with
360 the request or taskqueue_service_pb.TaskQueueServiceError.OK if it is
363 if request
.eta_usec() < 0:
364 return taskqueue_service_pb
.TaskQueueServiceError
.INVALID_ETA
366 eta
= datetime
.datetime
.utcfromtimestamp(request
.eta_usec() / 1e6
)
367 max_eta
= (datetime
.datetime
.utcnow() +
368 datetime
.timedelta(days
=MAX_ETA_DELTA_DAYS
))
370 return taskqueue_service_pb
.TaskQueueServiceError
.INVALID_ETA
372 return taskqueue_service_pb
.TaskQueueServiceError
.OK
def _Dynamic_Add(self, request, response):
    """Adds a single task by delegating to the bulk-add implementation.

    The request is copied into a one-element TaskQueueBulkAddRequest and
    handed to _Dynamic_BulkAdd; the single task result is then translated
    back into this call's response.

    Args:
      request: The add request for one task; copied into the bulk request.
      response: The response object; its chosen_task_name is set when the
        bulk result carries one.

    Raises:
      apiproxy_errors.ApplicationError: With the task's result code when that
        code is not TaskQueueServiceError.OK.
    """
    bulk_request = taskqueue_service_pb.TaskQueueBulkAddRequest()
    bulk_request.add_add_request().CopyFrom(request)
    bulk_response = taskqueue_service_pb.TaskQueueBulkAddResponse()

    self._Dynamic_BulkAdd(bulk_request, bulk_response)

    # Exactly one task went in, so exactly one result must come back.
    assert bulk_response.taskresult_size() == 1
    status = bulk_response.taskresult(0).result()

    if status != taskqueue_service_pb.TaskQueueServiceError.OK:
        raise apiproxy_errors.ApplicationError(status)

    if bulk_response.taskresult(0).has_chosen_task_name():
        chosen_name = bulk_response.taskresult(0).chosen_task_name()
        response.set_chosen_task_name(chosen_name)
390 def _Dynamic_BulkAdd(self
, request
, response
):
391 """Add many tasks to a queue using a single request.
394 request: The taskqueue_service_pb.TaskQueueBulkAddRequest. See
395 taskqueue_service.proto.
396 response: The taskqueue_service_pb.TaskQueueBulkAddResponse. See
397 taskqueue_service.proto.
400 assert request
.add_request_size(), 'taskqueue should prevent empty requests'
402 if not self
._IsValidQueue
(request
.add_request(0).queue_name()):
403 raise apiproxy_errors
.ApplicationError(
404 taskqueue_service_pb
.TaskQueueServiceError
.UNKNOWN_QUEUE
)
407 task_results_with_chosen_names
= []
409 for add_request
in request
.add_request_list():
410 task_result
= response
.add_taskresult()
411 error
= self
._VerifyTaskQueueAddRequest
(add_request
)
412 if error
== taskqueue_service_pb
.TaskQueueServiceError
.OK
:
413 if not add_request
.task_name():
414 chosen_name
= self
._ChooseTaskName
()
415 add_request
.set_task_name(chosen_name
)
416 task_results_with_chosen_names
.append(task_result
)
417 task_result
.set_result(
418 taskqueue_service_pb
.TaskQueueServiceError
.SKIPPED
)
421 task_result
.set_result(error
)
426 if request
.add_request(0).has_transaction():
427 self
._TransactionalBulkAdd
(request
)
428 elif request
.add_request(0).has_app_id():
429 self
._DummyTaskStoreBulkAdd
(request
, response
)
431 self
._NonTransactionalBulkAdd
(request
, response
)
433 for add_request
, task_result
in zip(request
.add_request_list(),
434 response
.taskresult_list()):
435 if (task_result
.result() ==
436 taskqueue_service_pb
.TaskQueueServiceError
.SKIPPED
):
437 task_result
.set_result(taskqueue_service_pb
.TaskQueueServiceError
.OK
)
438 if task_result
in task_results_with_chosen_names
:
439 task_result
.set_chosen_task_name(add_request
.task_name())
441 def _TransactionalBulkAdd(self
, request
):
442 """Uses datastore.AddActions to associate tasks with a transaction.
445 request: The taskqueue_service_pb.TaskQueueBulkAddRequest containing the
446 tasks to add. N.B. all tasks in the request have been validated and
447 assigned unique names.
450 apiproxy_stub_map
.MakeSyncCall(
451 'datastore_v3', 'AddActions', request
, api_base_pb
.VoidProto())
452 except apiproxy_errors
.ApplicationError
, e
:
453 raise apiproxy_errors
.ApplicationError(
454 e
.application_error
+
455 taskqueue_service_pb
.TaskQueueServiceError
.DATASTORE_ERROR
,
458 def _DummyTaskStoreBulkAdd(self
, request
, response
):
459 """Adds tasks to the appropriate DummyTaskStore.
462 request: The taskqueue_service_pb.TaskQueueBulkAddRequest containing the
463 tasks to add. N.B. all tasks in the request have been validated and
464 those with empty names have been assigned unique names.
465 response: The taskqueue_service_pb.TaskQueueBulkAddResponse to populate
466 with the results. N.B. the chosen_task_name field in the response will
469 store
= self
.GetDummyTaskStore(request
.add_request(0).app_id(),
470 request
.add_request(0).queue_name())
471 for add_request
, task_result
in zip(request
.add_request_list(),
472 response
.taskresult_list()):
474 store
.Add(add_request
)
475 except apiproxy_errors
.ApplicationError
, e
:
476 task_result
.set_result(e
.application_error
)
478 task_result
.set_result(taskqueue_service_pb
.TaskQueueServiceError
.OK
)
480 def _NonTransactionalBulkAdd(self
, request
, response
):
481 """Adds tasks to the appropriate list in in self._taskqueues.
484 request: The taskqueue_service_pb.TaskQueueBulkAddRequest containing the
485 tasks to add. N.B. all tasks in the request have been validated and
486 those with empty names have been assigned unique names.
487 response: The taskqueue_service_pb.TaskQueueBulkAddResponse to populate
488 with the results. N.B. the chosen_task_name field in the response will
491 existing_tasks
= self
._taskqueues
.setdefault(
492 request
.add_request(0).queue_name(), [])
493 existing_task_names
= set(task
.task_name() for task
in existing_tasks
)
495 for add_request
, task_result
in zip(request
.add_request_list(),
496 response
.taskresult_list()):
497 if add_request
.task_name() in existing_task_names
:
498 task_result
.set_result(
499 taskqueue_service_pb
.TaskQueueServiceError
.TASK_ALREADY_EXISTS
)
501 existing_tasks
.append(add_request
)
503 if self
._add
_event
and self
._auto
_task
_running
:
505 add_request
.eta_usec() / 1000000.0,
506 lambda: self
._RunTask
(
507 add_request
.queue_name(), add_request
.task_name()))
509 existing_tasks
.sort(_CompareTasksByEta
)
511 def _IsValidQueue(self
, queue_name
):
512 """Determines whether a queue is valid, i.e. tasks can be added to it.
514 Valid queues are the 'default' queue, plus any queues in the queue.yaml
518 queue_name: the name of the queue to validate.
521 True iff queue is valid.
523 if self
._all
_queues
_valid
:
525 if queue_name
== DEFAULT_QUEUE_NAME
or queue_name
== CRON_QUEUE_NAME
:
527 queue_info
= self
.queue_yaml_parser(self
._root
_path
)
528 if queue_info
and queue_info
.queue
:
529 for entry
in queue_info
.queue
:
530 if entry
.name
== queue_name
:
534 def _RunTask(self
, queue_name
, task_name
):
535 """Returns a fake request for running a task in the dev_appserver.
538 queue_name: The queue the task is in.
539 task_name: The name of the task to run.
542 None if this task no longer exists or tuple (connection, addrinfo) of
543 a fake connection and address information used to run this task. The
544 task will be deleted after it runs or re-enqueued in the future on
547 task_list
= self
.GetTasks(queue_name
)
548 for task
in task_list
:
549 if task
['name'] == task_name
:
554 class FakeConnection(object):
555 def __init__(self
, input_buffer
):
556 self
.rfile
= StringIO
.StringIO(input_buffer
)
557 self
.wfile
= StringIO
.StringIO()
558 self
.wfile_close
= self
.wfile
.close
559 self
.wfile
.close
= self
.connection_done
561 def connection_done(myself
):
562 result
= myself
.wfile
.getvalue()
564 first_line
, rest
= (result
.split('\n', 1) + ['', ''])[:2]
565 version
, code
, rest
= (first_line
.split(' ', 2) + ['', '500', ''])[:3]
566 if 200 <= int(code
) <= 299:
567 self
.DeleteTask(queue_name
, task_name
)
570 logging
.warning('Task named "%s" on queue "%s" failed with code %s; '
571 'will retry in %d seconds',
572 task_name
, queue_name
, code
, self
._task
_retry
_seconds
)
574 time
.time() + self
._task
_retry
_seconds
,
575 lambda: self
._RunTask
(queue_name
, task_name
))
580 def makefile(self
, mode
, buffsize
):
581 if mode
.startswith('w'):
586 payload
= StringIO
.StringIO()
587 payload
.write('%s %s HTTP/1.1\r\n' % (task
['method'], task
['url']))
588 for key
, value
in task
['headers']:
589 payload
.write('%s: %s\r\n' % (key
, value
))
590 payload
.write('\r\n')
591 payload
.write(task
['body'])
593 return FakeConnection(payload
.getvalue()), ('0.1.0.2', 80)
596 """Gets all the applications's queues.
599 A list of dictionaries, where each dictionary contains one queue's
601 [{'name': 'some-queue',
604 'oldest_task': '2009/02/02 05:37:42',
605 'eta_delta': '0:00:06.342511 ago',
606 'tasks_in_queue': 12}, ...]
607 The list of queues always includes the default queue.
610 queue_info
= self
.queue_yaml_parser(self
._root
_path
)
612 if queue_info
and queue_info
.queue
:
613 for entry
in queue_info
.queue
:
614 if entry
.name
== DEFAULT_QUEUE_NAME
:
618 queue
['name'] = entry
.name
619 queue
['max_rate'] = entry
.rate
620 if entry
.bucket_size
:
621 queue
['bucket_size'] = entry
.bucket_size
623 queue
['bucket_size'] = DEFAULT_BUCKET_SIZE
625 tasks
= self
._taskqueues
.setdefault(entry
.name
, [])
627 queue
['oldest_task'] = _FormatEta(tasks
[0].eta_usec())
628 queue
['eta_delta'] = _EtaDelta(tasks
[0].eta_usec())
630 queue
['oldest_task'] = ''
631 queue
['tasks_in_queue'] = len(tasks
)
636 queue
['name'] = DEFAULT_QUEUE_NAME
637 queue
['max_rate'] = DEFAULT_RATE
638 queue
['bucket_size'] = DEFAULT_BUCKET_SIZE
640 tasks
= self
._taskqueues
.get(DEFAULT_QUEUE_NAME
, [])
642 queue
['oldest_task'] = _FormatEta(tasks
[0].eta_usec())
643 queue
['eta_delta'] = _EtaDelta(tasks
[0].eta_usec())
645 queue
['oldest_task'] = ''
646 queue
['tasks_in_queue'] = len(tasks
)
649 def GetTasks(self
, queue_name
):
650 """Gets a queue's tasks.
653 queue_name: Queue's name to return tasks for.
656 A list of dictionaries, where each dictionary contains one task's
658 [{'name': 'task-123',
661 'eta': '2009/02/02 05:37:42',
662 'eta_delta': '0:00:06.342511 ago',
664 'headers': [('user-header', 'some-value')
665 ('X-AppEngine-QueueName': 'update-queue'),
666 ('X-AppEngine-TaskName': 'task-123'),
667 ('X-AppEngine-TaskRetryCount': '0'),
668 ('X-AppEngine-Development-Payload': '1'),
669 ('Content-Length': 0),
670 ('Content-Type': 'application/octet-stream')]
673 ValueError: A task request contains an unknown HTTP method type.
675 tasks
= self
._taskqueues
.get(queue_name
, [])
677 for task_request
in tasks
:
679 result_tasks
.append(task
)
680 task
['name'] = task_request
.task_name()
681 task
['url'] = task_request
.url()
682 method
= task_request
.method()
683 if method
== taskqueue_service_pb
.TaskQueueAddRequest
.GET
:
684 task
['method'] = 'GET'
685 elif method
== taskqueue_service_pb
.TaskQueueAddRequest
.POST
:
686 task
['method'] = 'POST'
687 elif method
== taskqueue_service_pb
.TaskQueueAddRequest
.HEAD
:
688 task
['method'] = 'HEAD'
689 elif method
== taskqueue_service_pb
.TaskQueueAddRequest
.PUT
:
690 task
['method'] = 'PUT'
691 elif method
== taskqueue_service_pb
.TaskQueueAddRequest
.DELETE
:
692 task
['method'] = 'DELETE'
694 raise ValueError('Unexpected method: %d' % method
)
696 task
['eta'] = _FormatEta(task_request
.eta_usec())
697 task
['eta_delta'] = _EtaDelta(task_request
.eta_usec())
698 task
['body'] = base64
.b64encode(task_request
.body())
700 headers
= [(header
.key(), header
.value())
701 for header
in task_request
.header_list()
702 if header
.key().lower() not in BUILT_IN_HEADERS
]
704 headers
.append(('X-AppEngine-QueueName', queue_name
))
705 headers
.append(('X-AppEngine-TaskName', task
['name']))
706 headers
.append(('X-AppEngine-TaskRetryCount', '0'))
707 headers
.append(('X-AppEngine-Development-Payload', '1'))
708 headers
.append(('Content-Length', len(task
['body'])))
709 if 'content-type' not in frozenset(key
.lower() for key
, _
in headers
):
710 headers
.append(('Content-Type', 'application/octet-stream'))
711 task
['headers'] = headers
715 def DeleteTask(self
, queue_name
, task_name
):
716 """Deletes a task from a queue.
719 queue_name: the name of the queue to delete the task from.
720 task_name: the name of the task to delete.
722 tasks
= self
._taskqueues
.get(queue_name
, [])
724 if task
.task_name() == task_name
:
def FlushQueue(self, queue_name):
    """Removes all tasks from a queue.

    The queue's entry is rebound to a brand-new empty list; the previous
    list object is left untouched.

    Args:
      queue_name: the name of the queue to remove tasks from.
    """
    emptied = list()
    self._taskqueues[queue_name] = emptied
def _Dynamic_UpdateQueue(self, request, unused_response):
    """Local implementation of the UpdateQueue RPC in TaskQueueService.

    Must adhere to the '_Dynamic_' naming convention for stubbing to work.
    See taskqueue_service.proto for a full description of the RPC.

    Stores a copy of the request as the queue's details under the request's
    app id and queue name.

    Args:
      request: A taskqueue_service_pb.TaskQueueUpdateQueueRequest.
      unused_response: A taskqueue_service_pb.TaskQueueUpdateQueueResponse.
        Not used.

    Raises:
      apiproxy_errors.ApplicationError: TOMBSTONED_QUEUE if the queue's entry
        exists but is None.
    """
    app_queues = self._app_queues.setdefault(request.app_id(), {})
    queue_name = request.queue_name()

    # A None entry marks a queue that has been deleted (see DeleteQueue).
    tombstoned = queue_name in app_queues and app_queues[queue_name] is None
    if tombstoned:
        raise apiproxy_errors.ApplicationError(
            taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE)

    # Keep our own copy so later changes to the request do not leak in.
    details = self._QueueDetails()
    details.CopyFrom(request)
    app_queues[queue_name] = details
757 def _Dynamic_FetchQueues(self
, request
, response
):
758 """Local implementation of the FetchQueues RPC in TaskQueueService.
760 Must adhere to the '_Dynamic_' naming convention for stubbing to work.
761 See taskqueue_service.proto for a full description of the RPC.
764 request: A taskqueue_service_pb.TaskQueueFetchQueuesRequest.
765 response: A taskqueue_service_pb.TaskQueueFetchQueuesResponse.
767 queues
= self
._app
_queues
.get(request
.app_id(), {})
768 for unused_key
, queue
in sorted(queues
.items()):
769 if request
.max_rows() == response
.queue_size():
775 response_queue
= response
.add_queue()
776 response_queue
.set_queue_name(queue
.queue_name())
777 response_queue
.set_bucket_refill_per_second(
778 queue
.bucket_refill_per_second())
779 response_queue
.set_bucket_capacity(queue
.bucket_capacity())
780 response_queue
.set_user_specified_rate(queue
.user_specified_rate())
781 response_queue
.set_paused(queue
.paused
)
783 def _Dynamic_FetchQueueStats(self
, request
, response
):
784 """Local 'random' implementation of the TaskQueueService.FetchQueueStats.
786 This implementation loads some stats from the dummy store,
787 the rest with random numbers.
788 Must adhere to the '_Dynamic_' naming convention for stubbing to work.
789 See taskqueue_service.proto for a full description of the RPC.
792 request: A taskqueue_service_pb.TaskQueueFetchQueueStatsRequest.
793 response: A taskqueue_service_pb.TaskQueueFetchQueueStatsResponse.
795 for queue
in request
.queue_name_list():
796 store
= self
.GetDummyTaskStore(request
.app_id(), queue
)
797 stats
= response
.add_queuestats()
798 stats
.set_num_tasks(store
.Count())
799 if stats
.num_tasks() == 0:
800 stats
.set_oldest_eta_usec(-1)
802 stats
.set_oldest_eta_usec(store
.Oldest())
804 if random
.randint(0, 9) > 0:
805 scanner_info
= stats
.mutable_scanner_info()
806 scanner_info
.set_executed_last_minute(random
.randint(0, 10))
807 scanner_info
.set_executed_last_hour(scanner_info
.executed_last_minute()
808 + random
.randint(0, 100))
809 scanner_info
.set_sampling_duration_seconds(random
.random() * 10000.0)
811 def GetDummyTaskStore(self
, app_id
, queue_name
):
812 """Get the dummy task store for this app_id/queue_name pair.
814 Creates an entry and populates it, if there's not already an entry.
818 queue_name: the queue_name.
821 the existing or the new dummy store.
823 task_store_key
= (app_id
, queue_name
)
824 if task_store_key
not in admin_console_dummy_tasks
:
825 store
= _DummyTaskStore()
826 if not self
._all
_queues
_valid
and queue_name
!= CRON_QUEUE_NAME
:
827 store
.Populate(random
.randint(10, 100))
828 admin_console_dummy_tasks
[task_store_key
] = store
830 store
= admin_console_dummy_tasks
[task_store_key
]
833 def _Dynamic_QueryTasks(self
, request
, response
):
834 """Local implementation of the TaskQueueService.QueryTasks RPC.
836 Uses the dummy store, creating tasks if this is the first time the
840 request: A taskqueue_service_pb.TaskQueueQueryTasksRequest.
841 response: A taskqueue_service_pb.TaskQueueQueryTasksResponse.
843 store
= self
.GetDummyTaskStore(request
.app_id(), request
.queue_name())
845 if request
.has_start_eta_usec():
846 tasks
= store
.Lookup(request
.max_rows(), name
=request
.start_task_name(),
847 eta
=request
.start_eta_usec())
849 tasks
= store
.Lookup(request
.max_rows(), name
=request
.start_task_name())
851 response
.add_task().MergeFrom(task
)
853 def _Dynamic_Delete(self
, request
, response
):
854 """Local delete implementation of TaskQueueService.Delete.
856 Deletes tasks from the dummy store. A 1/20 chance of a transient error.
859 request: A taskqueue_service_pb.TaskQueueDeleteRequest.
860 response: A taskqueue_service_pb.TaskQueueDeleteResponse.
862 task_store_key
= (request
.app_id(), request
.queue_name())
863 if task_store_key
not in admin_console_dummy_tasks
:
864 for _
in request
.task_name_list():
866 taskqueue_service_pb
.TaskQueueServiceError
.UNKNOWN_QUEUE
)
869 store
= admin_console_dummy_tasks
[task_store_key
]
870 for taskname
in request
.task_name_list():
871 if random
.random() <= 0.05:
873 taskqueue_service_pb
.TaskQueueServiceError
.TRANSIENT_ERROR
)
875 response
.add_result(store
.Delete(taskname
))
def _Dynamic_DeleteQueue(self, request, response):
    """Local delete implementation of TaskQueueService.DeleteQueue.

    Marks the queue as deleted by replacing its entry with None.

    Args:
      request: A taskqueue_service_pb.TaskQueueDeleteQueueRequest.
      response: A taskqueue_service_pb.TaskQueueDeleteQueueResponse. Not
        touched by this implementation.

    Raises:
      apiproxy_errors.ApplicationError: INVALID_QUEUE_NAME if the request has
        an empty queue name, UNKNOWN_QUEUE if the app has no such queue, or
        TOMBSTONED_QUEUE if the queue's entry is already None.
    """
    queue_name = request.queue_name()
    if not queue_name:
        raise apiproxy_errors.ApplicationError(
            taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_NAME)

    app_queues = self._app_queues.get(request.app_id(), {})
    if queue_name not in app_queues:
        raise apiproxy_errors.ApplicationError(
            taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
    if app_queues[queue_name] is None:
        raise apiproxy_errors.ApplicationError(
            taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE)

    # None is the tombstone marker checked by the other queue RPCs.
    app_queues[queue_name] = None
def _Dynamic_PauseQueue(self, request, response):
    """Local pause implementation of TaskQueueService.PauseQueue.

    Sets the stored queue details' 'paused' attribute to the request's pause
    value.

    Args:
      request: A taskqueue_service_pb.TaskQueuePauseQueueRequest.
      response: A taskqueue_service_pb.TaskQueuePauseQueueResponse. Not
        touched by this implementation.

    Raises:
      apiproxy_errors.ApplicationError: INVALID_QUEUE_NAME if the request has
        an empty queue name; for any queue other than the default one,
        UNKNOWN_QUEUE if the app has no such queue or TOMBSTONED_QUEUE if
        the queue's entry is None.
    """
    queue_name = request.queue_name()
    if not queue_name:
        raise apiproxy_errors.ApplicationError(
            taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_NAME)

    app_queues = self._app_queues.get(request.app_id(), {})

    # The default queue skips the existence checks.
    # NOTE(review): that means the lookup below is unguarded for the default
    # queue -- confirm it is always registered before it can be paused.
    if queue_name != DEFAULT_QUEUE_NAME:
        if queue_name not in app_queues:
            raise apiproxy_errors.ApplicationError(
                taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
        if app_queues[queue_name] is None:
            raise apiproxy_errors.ApplicationError(
                taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE)

    queue_details = app_queues[queue_name]
    queue_details.paused = request.pause()
def _Dynamic_PurgeQueue(self, request, response):
    """Local purge implementation of TaskQueueService.PurgeQueue.

    Deletes every task from the queue's dummy task store and then flushes
    the queue's entry in self._taskqueues.

    Args:
      request: A taskqueue_service_pb.TaskQueuePurgeQueueRequest.
      response: A taskqueue_service_pb.TaskQueuePurgeQueueResponse. Not
        touched by this implementation.

    Raises:
      apiproxy_errors.ApplicationError: INVALID_QUEUE_NAME if the request has
        an empty queue name; for any queue other than the default one,
        UNKNOWN_QUEUE if the app has no such queue or TOMBSTONED_QUEUE if
        the queue's entry is None.
    """
    if not request.queue_name():
        raise apiproxy_errors.ApplicationError(
            taskqueue_service_pb.TaskQueueServiceError.INVALID_QUEUE_NAME)

    app_queues = self._app_queues.get(request.app_id(), {})

    # The default queue skips the existence checks.
    if request.queue_name() != DEFAULT_QUEUE_NAME:
        if request.queue_name() not in app_queues:
            raise apiproxy_errors.ApplicationError(
                taskqueue_service_pb.TaskQueueServiceError.UNKNOWN_QUEUE)
        if app_queues[request.queue_name()] is None:
            raise apiproxy_errors.ApplicationError(
                taskqueue_service_pb.TaskQueueServiceError.TOMBSTONED_QUEUE)

    dummy_store = self.GetDummyTaskStore(request.app_id(),
                                         request.queue_name())
    # Look up as many tasks as the store holds, then delete each by name.
    doomed_tasks = dummy_store.Lookup(dummy_store.Count())
    for doomed in doomed_tasks:
        dummy_store.Delete(doomed.task_name())

    self.FlushQueue(request.queue_name())
def _Dynamic_UpdateStorageLimit(self, request, response):
    """Local implementation of TaskQueueService.UpdateStorageLimit.

    Echoes the requested limit back as the new limit once it has been
    range-checked.

    Args:
      request: A taskqueue_service_pb.TaskQueueUpdateStorageLimitRequest.
      response: A taskqueue_service_pb.TaskQueueUpdateStorageLimitResponse.

    Raises:
      apiproxy_errors.ApplicationError: INVALID_REQUEST if the requested
        limit is negative or greater than 1000 * 1024 ** 4.
    """
    upper_bound = 1000 * (1024 ** 4)
    requested = request.limit()

    too_small = requested < 0
    too_large = requested > upper_bound
    if too_small or too_large:
        raise apiproxy_errors.ApplicationError(
            taskqueue_service_pb.TaskQueueServiceError.INVALID_REQUEST)

    response.set_new_limit(request.limit())