#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Utility functions shared between the file and sqlite datastore stubs.

This module is internal and should not be used by client applications.
"""

try:
  import hashlib
  _MD5_FUNC = hashlib.md5
except ImportError:
  import md5
  _MD5_FUNC = md5.new

import atexit
import collections
import itertools
import logging
import os
import random
import struct
import threading
import time
import weakref

from google.net.proto import ProtocolBuffer
from google.appengine.datastore import entity_pb

from google.appengine.api import api_base_pb
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import datastore_admin
from google.appengine.api import datastore_errors
from google.appengine.api import datastore_types
from google.appengine.api.taskqueue import taskqueue_service_pb
from google.appengine.datastore import datastore_index
from google.appengine.datastore import datastore_pb
from google.appengine.datastore import datastore_pbs
from google.appengine.datastore import datastore_query
from google.appengine.datastore import datastore_stub_index
from google.appengine.datastore import datastore_v4_pb
from google.appengine.runtime import apiproxy_errors


_MAXIMUM_RESULTS = 300

_MAXIMUM_QUERY_RESULT_BYTES = 2000000

_MAX_QUERY_OFFSET = 1000

_PROPERTY_TYPE_NAMES = {
    0: 'NULL',
    entity_pb.PropertyValue.kint64Value: 'INT64',
    entity_pb.PropertyValue.kbooleanValue: 'BOOLEAN',
    entity_pb.PropertyValue.kstringValue: 'STRING',
    entity_pb.PropertyValue.kdoubleValue: 'DOUBLE',
    entity_pb.PropertyValue.kPointValueGroup: 'POINT',
    entity_pb.PropertyValue.kUserValueGroup: 'USER',
    entity_pb.PropertyValue.kReferenceValueGroup: 'REFERENCE',
}

_SCATTER_PROPORTION = 32768

_MAX_EG_PER_TXN = 5

_BLOB_MEANINGS = frozenset((entity_pb.Property.BLOB,
                            entity_pb.Property.ENTITY_PROTO,
                            entity_pb.Property.TEXT))

_RETRIES = 3

_INITIAL_RETRY_DELAY_MS = 100

_RETRY_DELAY_MULTIPLIER = 2

_MAX_RETRY_DELAY_MS = 120000

SEQUENTIAL = 'sequential'
SCATTERED = 'scattered'

_MAX_SEQUENTIAL_BIT = 52

_MAX_SEQUENTIAL_COUNTER = (1 << _MAX_SEQUENTIAL_BIT) - 1

_MAX_SEQUENTIAL_ID = _MAX_SEQUENTIAL_COUNTER

_MAX_SCATTERED_COUNTER = (1 << (_MAX_SEQUENTIAL_BIT - 1)) - 1

_MAX_SCATTERED_ID = _MAX_SEQUENTIAL_ID + 1 + _MAX_SCATTERED_COUNTER

_SCATTER_SHIFT = 64 - _MAX_SEQUENTIAL_BIT + 1
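
# For illustration, with _MAX_SEQUENTIAL_BIT = 52 the constants above work
# out as follows:
#
#   _MAX_SEQUENTIAL_COUNTER = 2**52 - 1                (0xFFFFFFFFFFFFF)
#   _MAX_SEQUENTIAL_ID      = _MAX_SEQUENTIAL_COUNTER
#   _MAX_SCATTERED_COUNTER  = 2**51 - 1
#   _MAX_SCATTERED_ID       = 2**52 + 2**51 - 1        (0x17FFFFFFFFFFFF)
#   _SCATTER_SHIFT          = 64 - 52 + 1 = 13
#
# so sequential IDs occupy [1, 2**52 - 1] and scattered IDs occupy the
# disjoint range [2**52, 2**52 + 2**51 - 1].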


def _GetScatterProperty(entity_proto):
  """Gets the scatter property for an object.

  For ease of implementation, this is not synchronized with the actual
  value on the App Engine server, but should work equally well.

  Note: This property may change, either here or in production. No client
  other than the mapper framework should rely on it directly.

  Returns:
    The PropertyValue of the scatter property or None if this entity should not
    have a scatter property.
  """
  hash_obj = _MD5_FUNC()
  for element in entity_proto.key().path().element_list():
    if element.has_name():
      hash_obj.update(element.name())
    elif element.has_id():
      hash_obj.update(str(element.id()))
  hash_bytes = hash_obj.digest()[0:2]
  (hash_int,) = struct.unpack('H', hash_bytes)

  if hash_int >= _SCATTER_PROPORTION:
    return None

  scatter_property = entity_pb.Property()
  scatter_property.set_name(datastore_types.SCATTER_SPECIAL_PROPERTY)
  scatter_property.set_meaning(entity_pb.Property.BYTESTRING)
  scatter_property.set_multiple(False)
  property_value = scatter_property.mutable_value()
  property_value.set_stringvalue(hash_bytes)
  return scatter_property


_SPECIAL_PROPERTY_MAP = {
    datastore_types.SCATTER_SPECIAL_PROPERTY: (False, True, _GetScatterProperty)
}


def GetInvisibleSpecialPropertyNames():
  """Gets the names of all non user-visible special properties."""
  invisible_names = []
  for name, value in _SPECIAL_PROPERTY_MAP.items():
    is_visible, _, _ = value
    if not is_visible:
      invisible_names.append(name)
  return invisible_names


def _PrepareSpecialProperties(entity_proto, is_load):
  """Computes special properties for loading or storing.
  Strips other special properties."""
  for i in xrange(entity_proto.property_size() - 1, -1, -1):
    if entity_proto.property(i).name() in _SPECIAL_PROPERTY_MAP:
      del entity_proto.property_list()[i]

  for is_visible, is_stored, property_func in _SPECIAL_PROPERTY_MAP.values():
    if is_load:
      should_process = is_visible
    else:
      should_process = is_stored

    if should_process:
      special_property = property_func(entity_proto)
      if special_property:
        entity_proto.property_list().append(special_property)


def _GetGroupByKey(entity, property_names):
  """Computes a key value that uniquely identifies the 'group' of an entity.

  Args:
    entity: The entity_pb.EntityProto for which to create the group key.
    property_names: The names of the properties in the group by clause.

  Returns:
    A hashable value that uniquely identifies the entity's 'group'.
  """
  return frozenset((prop.name(), prop.value().SerializeToString())
                   for prop in entity.property_list()
                   if prop.name() in property_names)


def PrepareSpecialPropertiesForStore(entity_proto):
  """Computes special properties for storing.
  Strips other special properties."""
  _PrepareSpecialProperties(entity_proto, False)


def LoadEntity(entity, keys_only=False, property_names=None):
  """Prepares an entity to be returned to the user.

  Args:
    entity: an entity_pb.EntityProto or None
    keys_only: if a keys-only result should be produced
    property_names: if not None or empty, causes a projected entity
        to be produced with the given properties.

  Returns:
    A user friendly copy of entity or None.
  """
  if entity:
    clone = entity_pb.EntityProto()
    if property_names:
      clone.mutable_key().CopyFrom(entity.key())
      clone.mutable_entity_group()
      seen = set()
      for prop in entity.property_list():
        if prop.name() in property_names:
          Check(prop.name() not in seen,
                "datastore dev stub produced bad result",
                datastore_pb.Error.INTERNAL_ERROR)
          seen.add(prop.name())
          new_prop = clone.add_property()
          new_prop.set_name(prop.name())
          new_prop.set_meaning(entity_pb.Property.INDEX_VALUE)
          new_prop.mutable_value().CopyFrom(prop.value())
          new_prop.set_multiple(False)
    elif keys_only:
      clone.mutable_key().CopyFrom(entity.key())
      clone.mutable_entity_group()
    else:
      clone.CopyFrom(entity)
    PrepareSpecialPropertiesForLoad(clone)
    return clone


def StoreEntity(entity):
  """Prepares an entity for storing.

  Args:
    entity: an entity_pb.EntityProto to prepare

  Returns:
    A copy of entity that should be stored in its place.
  """
  clone = entity_pb.EntityProto()
  clone.CopyFrom(entity)

  PrepareSpecialPropertiesForStore(clone)
  return clone


def PrepareSpecialPropertiesForLoad(entity_proto):
  """Computes special properties that are user-visible.
  Strips other special properties."""
  _PrepareSpecialProperties(entity_proto, True)


def Check(test, msg='', error_code=datastore_pb.Error.BAD_REQUEST):
  """Raises an apiproxy_errors.ApplicationError if the condition is false.

  Args:
    test: A condition to test.
    msg: A string to return with the error.
    error_code: One of datastore_pb.Error to use as an error code.

  Raises:
    apiproxy_errors.ApplicationError: If test is false.
  """
  if not test:
    raise apiproxy_errors.ApplicationError(error_code, msg)


def CheckValidUTF8(string, desc):
  """Check that the given string is valid UTF-8.

  Args:
    string: the string to validate.
    desc: a description of the string being validated.

  Raises:
    apiproxy_errors.ApplicationError: if the string is not valid UTF-8.
  """
  try:
    string.decode('utf-8')
  except UnicodeDecodeError:
    Check(False, '%s is not valid UTF-8.' % desc)


def CheckAppId(request_trusted, request_app_id, app_id):
  """Check that this is the stub for app_id.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    app_id: An application ID.

  Raises:
    apiproxy_errors.ApplicationError: if this is not the stub for app_id.
  """
  assert app_id
  CheckValidUTF8(app_id, 'app id')
  Check(request_trusted or app_id == request_app_id,
        'app "%s" cannot access app "%s"\'s data' % (request_app_id, app_id))


def CheckReference(request_trusted,
                   request_app_id,
                   key,
                   require_id_or_name=True):
  """Check this key.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    key: entity_pb.Reference
    require_id_or_name: Boolean indicating if we should enforce the presence of
        an id or name in the last element of the key's path.

  Raises:
    apiproxy_errors.ApplicationError: if the key is invalid
  """
  assert isinstance(key, entity_pb.Reference)

  CheckAppId(request_trusted, request_app_id, key.app())

  Check(key.path().element_size() > 0, 'key\'s path cannot be empty')

  if require_id_or_name:
    last_element = key.path().element_list()[-1]
    has_id_or_name = ((last_element.has_id() and last_element.id() != 0) or
                      (last_element.has_name() and last_element.name() != ""))
    if not has_id_or_name:
      raise datastore_errors.BadRequestError('missing key id/name')

  for elem in key.path().element_list():
    Check(not elem.has_id() or not elem.has_name(),
          'each key path element should have id or name but not both: %r' % key)
    CheckValidUTF8(elem.type(), 'key path element type')
    if elem.has_name():
      CheckValidUTF8(elem.name(), 'key path element name')


def CheckEntity(request_trusted, request_app_id, entity):
  """Check if this entity can be stored.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    entity: entity_pb.EntityProto

  Raises:
    apiproxy_errors.ApplicationError: if the entity is invalid
  """
  CheckReference(request_trusted, request_app_id, entity.key(), False)
  for prop in entity.property_list():
    CheckProperty(request_trusted, request_app_id, prop)
  for prop in entity.raw_property_list():
    CheckProperty(request_trusted, request_app_id, prop, indexed=False)


def CheckProperty(request_trusted, request_app_id, prop, indexed=True):
  """Check if this property can be stored.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    prop: entity_pb.Property
    indexed: Whether the property is indexed.

  Raises:
    apiproxy_errors.ApplicationError: if the property is invalid
  """
  name = prop.name()
  value = prop.value()
  meaning = prop.meaning()
  CheckValidUTF8(name, 'property name')
  Check(request_trusted or
        not datastore_types.RESERVED_PROPERTY_NAME.match(name),
        'cannot store entity with reserved property name \'%s\'' % name)
  Check(prop.meaning() != entity_pb.Property.INDEX_VALUE,
        'Entities with incomplete properties cannot be written.')
  is_blob = meaning in _BLOB_MEANINGS
  if indexed:
    Check(not is_blob,
          'BLOB, ENTITY_PROTO or TEXT property ' + name +
          ' must be in a raw_property field')
    max_length = datastore_types._MAX_STRING_LENGTH
  else:
    if is_blob:
      Check(value.has_stringvalue(),
            'BLOB / ENTITY_PROTO / TEXT raw property ' + name +
            ' must have a string value')
    max_length = datastore_types._MAX_RAW_PROPERTY_BYTES
  if meaning == entity_pb.Property.ATOM_LINK:
    max_length = datastore_types._MAX_LINK_PROPERTY_LENGTH

  CheckPropertyValue(name, value, max_length, meaning)


def CheckPropertyValue(name, value, max_length, meaning):
  """Check if this property value can be stored.

  Args:
    name: name of the property
    value: entity_pb.PropertyValue
    max_length: maximum length for string values
    meaning: meaning of the property

  Raises:
    apiproxy_errors.ApplicationError: if the property is invalid
  """
  num_values = (value.has_int64value() +
                value.has_stringvalue() +
                value.has_booleanvalue() +
                value.has_doublevalue() +
                value.has_pointvalue() +
                value.has_uservalue() +
                value.has_referencevalue())
  Check(num_values <= 1, 'PropertyValue for ' + name +
        ' has multiple value fields set')

  if value.has_stringvalue():
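    # Length limits are enforced in UTF-16 code units: decode as UTF-8
    # (replacing invalid sequences), re-encode as UTF-16, drop the 2-byte
    # BOM, and divide by 2 to count code units.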
    s16 = value.stringvalue().decode('utf-8', 'replace').encode('utf-16')

    Check((len(s16) - 2) / 2 <= max_length,
          'Property %s is too long. Maximum length is %d.' % (name, max_length))
    if (meaning not in _BLOB_MEANINGS and
        meaning != entity_pb.Property.BYTESTRING):
      CheckValidUTF8(value.stringvalue(), 'String property value')


def CheckTransaction(request_trusted, request_app_id, transaction):
  """Check that this transaction is valid.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    transaction: datastore_pb.Transaction

  Raises:
    apiproxy_errors.ApplicationError: if the transaction is not valid.
  """
  assert isinstance(transaction, datastore_pb.Transaction)
  CheckAppId(request_trusted, request_app_id, transaction.app())


def CheckQuery(query, filters, orders, max_query_components):
  """Check a datastore query with normalized filters and orders.

  Raises an ApplicationError when any of the following conditions are violated:
  - transactional queries have an ancestor
  - queries are not too large
    (sum of filters, orders, ancestor <= max_query_components)
  - ancestor (if any) app and namespace match query app and namespace
  - kindless queries only filter on __key__ and only sort on __key__ ascending
  - multiple inequality (<, <=, >, >=) filters are all applied to the same
    property
  - filters on __key__ compare to a reference in the same app and namespace as
    the query
  - if an inequality filter on prop X is used, the first order (if any) must
    be on X

  Args:
    query: query to validate
    filters: normalized (by datastore_index.Normalize) filters from query
    orders: normalized (by datastore_index.Normalize) orders from query
    max_query_components: limit on query complexity
  """
  Check(query.property_name_size() == 0 or not query.keys_only(),
        'projection and keys_only cannot both be set')

  projected_properties = set(query.property_name_list())
  for prop_name in query.property_name_list():
    Check(not datastore_types.RESERVED_PROPERTY_NAME.match(prop_name),
          'projections are not supported for the property: ' + prop_name)
  Check(len(projected_properties) == len(query.property_name_list()),
        'cannot project a property multiple times')

  key_prop_name = datastore_types.KEY_SPECIAL_PROPERTY
  unapplied_log_timestamp_us_name = (
      datastore_types._UNAPPLIED_LOG_TIMESTAMP_SPECIAL_PROPERTY)

  if query.has_transaction():
    Check(query.has_ancestor(),
          'Only ancestor queries are allowed inside transactions.')

  num_components = len(filters) + len(orders)
  if query.has_ancestor():
    num_components += 1
  Check(num_components <= max_query_components,
        'query is too large. may not have more than %s filters'
        ' + sort orders + ancestor total' % max_query_components)

  if query.has_ancestor():
    ancestor = query.ancestor()
    Check(query.app() == ancestor.app(),
          'query app is %s but ancestor app is %s' %
          (query.app(), ancestor.app()))
    Check(query.name_space() == ancestor.name_space(),
          'query namespace is %s but ancestor namespace is %s' %
          (query.name_space(), ancestor.name_space()))

  if query.group_by_property_name_size():
    group_by_set = set(query.group_by_property_name_list())
    for order in orders:
      if not group_by_set:
        break
      Check(order.property() in group_by_set,
            'items in the group by clause must be specified first '
            'in the ordering')
      group_by_set.remove(order.property())

  ineq_prop_name = None
  for filter in filters:
    Check(filter.property_size() == 1,
          'Filter has %d properties, expected 1' % filter.property_size())

    prop = filter.property(0)
    prop_name = prop.name().decode('utf-8')

    if prop_name == key_prop_name:
      Check(prop.value().has_referencevalue(),
            '%s filter value must be a Key' % key_prop_name)
      ref_val = prop.value().referencevalue()
      Check(ref_val.app() == query.app(),
            '%s filter app is %s but query app is %s' %
            (key_prop_name, ref_val.app(), query.app()))
      Check(ref_val.name_space() == query.name_space(),
            '%s filter namespace is %s but query namespace is %s' %
            (key_prop_name, ref_val.name_space(), query.name_space()))

    if filter.op() in datastore_index.EQUALITY_OPERATORS:
      Check(prop_name not in projected_properties,
            'cannot use projection on a property with an equality filter')
    if (filter.op() in datastore_index.INEQUALITY_OPERATORS and
        prop_name != unapplied_log_timestamp_us_name):
      if ineq_prop_name is None:
        ineq_prop_name = prop_name
      else:
        Check(ineq_prop_name == prop_name,
              'Only one inequality filter per query is supported. '
              'Encountered both %s and %s' % (ineq_prop_name, prop_name))

  if (ineq_prop_name is not None
      and query.group_by_property_name_size() > 0
      and not orders):
    Check(ineq_prop_name in group_by_set,
          'Inequality filter on %s must also be a group by '
          'property when group by properties are set.'
          % (ineq_prop_name))

  if ineq_prop_name is not None and orders:
    first_order_prop = orders[0].property().decode('utf-8')
    Check(first_order_prop == ineq_prop_name,
          'The first sort property must be the same as the property '
          'to which the inequality filter is applied. In your query '
          'the first sort property is %s but the inequality filter '
          'is on %s' % (first_order_prop, ineq_prop_name))

  if not query.has_kind():
    for filter in filters:
      prop_name = filter.property(0).name().decode('utf-8')
      Check(prop_name == key_prop_name or
            prop_name == unapplied_log_timestamp_us_name,
            'kind is required for non-__key__ filters')
    for order in orders:
      prop_name = order.property().decode('utf-8')
      Check(prop_name == key_prop_name and
            order.direction() is datastore_pb.Query_Order.ASCENDING,
            'kind is required for all orders except __key__ ascending')


class ValueRange(object):
  """A range of values defined by its two extremes (inclusive or exclusive)."""

  def __init__(self):
    """Constructor.

    Creates an unlimited range.
    """
    self.__start = self.__end = None
    self.__start_inclusive = self.__end_inclusive = False

  def Update(self, rel_op, limit):
    """Filter the range by 'rel_op limit'.

    Args:
      rel_op: relational operator from datastore_pb.Query_Filter.
      limit: the value to limit the range by.
    """
    if rel_op == datastore_pb.Query_Filter.LESS_THAN:
      if self.__end is None or limit <= self.__end:
        self.__end = limit
        self.__end_inclusive = False
    elif (rel_op == datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL or
          rel_op == datastore_pb.Query_Filter.EQUAL):
      if self.__end is None or limit < self.__end:
        self.__end = limit
        self.__end_inclusive = True

    if rel_op == datastore_pb.Query_Filter.GREATER_THAN:
      if self.__start is None or limit >= self.__start:
        self.__start = limit
        self.__start_inclusive = False
    elif (rel_op == datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL or
          rel_op == datastore_pb.Query_Filter.EQUAL):
      if self.__start is None or limit > self.__start:
        self.__start = limit
        self.__start_inclusive = True

  def Contains(self, value):
    """Check if the range contains a specific value.

    Args:
      value: the value to check.
    Returns:
      True iff value is contained in this range.
    """
    if self.__start is not None:
      if self.__start_inclusive and value < self.__start: return False
      if not self.__start_inclusive and value <= self.__start: return False
    if self.__end is not None:
      if self.__end_inclusive and value > self.__end: return False
      if not self.__end_inclusive and value >= self.__end: return False
    return True

  def Remap(self, mapper):
    """Transforms the range extremes with a function.

    The function mapper must preserve order, i.e.
      x rel_op y iff mapper(x) rel_op mapper(y)

    Args:
      mapper: function to apply to the range extremes.
    """
    self.__start = self.__start and mapper(self.__start)
    self.__end = self.__end and mapper(self.__end)

  def MapExtremes(self, mapper):
    """Evaluate a function on the range extremes.

    Args:
      mapper: function to apply to the range extremes.
    Returns:
      (x, y) where x = None if the range has no start,
                       mapper(start, start_inclusive, False) otherwise
             y = None if the range has no end,
                       mapper(end, end_inclusive, True) otherwise
    """
    return (
        self.__start and mapper(self.__start, self.__start_inclusive, False),
        self.__end and mapper(self.__end, self.__end_inclusive, True))
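
# Illustrative use of ValueRange (an editor's sketch, not part of the stub's
# public API): accumulate comparison filters into a range, then test values.
#
#   key_range = ValueRange()
#   key_range.Update(datastore_pb.Query_Filter.GREATER_THAN, 'a')
#   key_range.Update(datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL, 'q')
#   key_range.Contains('b')  # True: 'a' < 'b' <= 'q'
#   key_range.Contains('a')  # False: the lower bound is exclusive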


def ParseKeyFilteredQuery(filters, orders):
  """Parse queries which only allow filters and ascending-orders on __key__.

  Raises exceptions for illegal queries.
  Args:
    filters: the normalized filters of a query.
    orders: the normalized orders of a query.
  Returns:
    The key range (a ValueRange over datastore_types.Key) requested in the
    query.
  """
  remaining_filters = []
  key_range = ValueRange()
  key_prop = datastore_types.KEY_SPECIAL_PROPERTY
  for f in filters:
    op = f.op()
    if not (f.property_size() == 1 and
            f.property(0).name() == key_prop and
            not (op == datastore_pb.Query_Filter.IN or
                 op == datastore_pb.Query_Filter.EXISTS)):
      remaining_filters.append(f)
      continue

    val = f.property(0).value()
    Check(val.has_referencevalue(), '__key__ kind must be compared to a key')
    limit = datastore_types.FromReferenceProperty(val)
    key_range.Update(op, limit)

  remaining_orders = []
  for o in orders:
    if not (o.direction() == datastore_pb.Query_Order.ASCENDING and
            o.property() == datastore_types.KEY_SPECIAL_PROPERTY):
      remaining_orders.append(o)
    else:
      break

  Check(not remaining_filters,
        'Only comparison filters on ' + key_prop + ' supported')
  Check(not remaining_orders,
        'Only ascending order on ' + key_prop + ' supported')

  return key_range


def ParseKindQuery(query, filters, orders):
  """Parse __kind__ (schema) queries.

  Raises exceptions for illegal queries.
  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.
  Returns:
    The kind range (a ValueRange over string) requested in the query.
  """
  Check(not query.has_ancestor(), 'ancestor queries on __kind__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(_KindKeyToString)

  return key_range


def _KindKeyToString(key):
  """Extract kind name from __kind__ key.

  Raises an ApplicationError if the key is not of the form '__kind__'/name.

  Args:
    key: a key for a __kind__ instance.
  Returns:
    kind specified by key.
  """
  key_path = key.to_path()
  if (len(key_path) == 2 and key_path[0] == '__kind__' and
      isinstance(key_path[1], basestring)):
    return key_path[1]
  Check(False, 'invalid Key for __kind__ table')


def ParseNamespaceQuery(query, filters, orders):
  """Parse __namespace__ queries.

  Raises exceptions for illegal queries.
  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.
  Returns:
    The namespace range (a ValueRange over string) requested in the query.
  """
  Check(not query.has_ancestor(),
        'ancestor queries on __namespace__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(_NamespaceKeyToString)

  return key_range


def _NamespaceKeyToString(key):
  """Extract namespace name from __namespace__ key.

  Raises an ApplicationError if the key is not of the form '__namespace__'/name
  or '__namespace__'/_EMPTY_NAMESPACE_ID.

  Args:
    key: a key for a __namespace__ instance.
  Returns:
    namespace specified by key.
  """
  key_path = key.to_path()
  if len(key_path) == 2 and key_path[0] == '__namespace__':
    if key_path[1] == datastore_types._EMPTY_NAMESPACE_ID:
      return ''
    if isinstance(key_path[1], basestring):
      return key_path[1]
  Check(False, 'invalid Key for __namespace__ table')


def ParsePropertyQuery(query, filters, orders):
  """Parse __property__ queries.

  Raises exceptions for illegal queries.
  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.
  Returns:
    The property range (a ValueRange over (kind, property) pairs) requested
    in the query.
  """
  Check(not query.has_transaction(),
        'transactional queries on __property__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(lambda x: _PropertyKeyToString(x, ''))

  if query.has_ancestor():
    ancestor = datastore_types.Key._FromPb(query.ancestor())
    ancestor_kind, ancestor_property = _PropertyKeyToString(ancestor, None)

    if ancestor_property is not None:
      key_range.Update(datastore_pb.Query_Filter.EQUAL,
                       (ancestor_kind, ancestor_property))
    else:
      key_range.Update(datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL,
                       (ancestor_kind, ''))
      key_range.Update(datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL,
                       (ancestor_kind + '\0', ''))
    query.clear_ancestor()

  return key_range


def _PropertyKeyToString(key, default_property):
  """Extract property name from __property__ key.

  Raises an ApplicationError if the key is not of the form
  '__kind__'/kind, '__property__'/property or '__kind__'/kind

  Args:
    key: a key for a __property__ instance.
    default_property: property value to return when key only has a kind.
  Returns:
    kind, property if key = '__kind__'/kind, '__property__'/property
    kind, default_property if key = '__kind__'/kind
  """
  key_path = key.to_path()
  if (len(key_path) == 2 and
      key_path[0] == '__kind__' and isinstance(key_path[1], basestring)):
    return (key_path[1], default_property)
  if (len(key_path) == 4 and
      key_path[0] == '__kind__' and isinstance(key_path[1], basestring) and
      key_path[2] == '__property__' and isinstance(key_path[3], basestring)):
    return (key_path[1], key_path[3])

  Check(False, 'invalid Key for __property__ table')


def SynthesizeUserId(email):
  """Return a synthetic user ID from an email address.

  Note that this is not the same user ID found in the production system.

  Args:
    email: An email address.

  Returns:
    A string userid derived from the email address.
  """
  user_id_digest = _MD5_FUNC(email.lower()).digest()
  user_id = '1' + ''.join(['%02d' % ord(x) for x in user_id_digest])[:20]
  return user_id
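
# For example, SynthesizeUserId('Bob@Example.Com') lower-cases the address,
# MD5-hashes it, renders each digest byte as zero-padded decimal, and returns
# '1' followed by the first 20 digits; production user IDs are derived
# differently.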


def FillUsersInQuery(filters):
  """Fill in a synthetic user ID for all user properties in a set of filters.

  Args:
    filters: The normalized filters from query.
  """
  for filter in filters:
    for property in filter.property_list():
      FillUser(property)


def FillUser(property):
  """Fill in a synthetic user ID for a user property.

  Args:
    property: A Property which may have a user value.
  """
  if property.value().has_uservalue():
    uid = SynthesizeUserId(property.value().uservalue().email())
    if uid:
      property.mutable_value().mutable_uservalue().set_obfuscated_gaiaid(uid)


class BaseCursor(object):
  """A base query cursor over a list of entities.

  Public properties:
    cursor: the integer cursor.
    app: the app for which this cursor was created.
    keys_only: whether the query is keys_only.

  Class attributes:
    _next_cursor: the next cursor to allocate.
    _next_cursor_lock: protects _next_cursor.
  """
  _next_cursor = 1
  _next_cursor_lock = threading.Lock()

  def __init__(self, query, dsquery, orders, index_list):
    """Constructor.

    Args:
      query: the query request proto.
      dsquery: a datastore_query.Query over query.
      orders: the orders of query as returned by _GuessOrders.
      index_list: the list of indexes used by the query.
    """
    self.keys_only = query.keys_only()
    self.property_names = set(query.property_name_list())
    self.group_by = set(query.group_by_property_name_list())
    self.app = query.app()
    self.cursor = self._AcquireCursorID()

    self.__order_compare_entities = dsquery._order.cmp_for_filter(
        dsquery._filter_predicate)
    if self.group_by:
      self.__cursor_properties = self.group_by
    else:
      self.__cursor_properties = set(order.property() for order in orders)
      self.__cursor_properties.add('__key__')
    self.__cursor_properties = frozenset(self.__cursor_properties)

    self.__first_sort_order = orders[0].direction()
    self.__index_list = index_list

  def _PopulateResultMetadata(self, query_result, compile,
                              first_result, last_result):
    query_result.set_keys_only(self.keys_only)
    if query_result.more_results():
      cursor = query_result.mutable_cursor()
      cursor.set_app(self.app)
      cursor.set_cursor(self.cursor)
    if compile:
      self._EncodeCompiledCursor(last_result,
                                 query_result.mutable_compiled_cursor())
    if first_result:
      query_result.index_list().extend(self.__index_list)

  @classmethod
  def _AcquireCursorID(cls):
    """Acquires the next cursor id in a thread safe manner."""
    cls._next_cursor_lock.acquire()
    try:
      cursor_id = cls._next_cursor
      cls._next_cursor += 1
    finally:
      cls._next_cursor_lock.release()
    return cursor_id

  def _IsBeforeCursor(self, entity, cursor):
    """True if entity is before cursor according to the current order.

    Args:
      entity: an entity_pb.EntityProto entity.
      cursor: a compiled cursor as returned by _DecodeCompiledCursor.
    """
    comparison_entity = entity_pb.EntityProto()
    for prop in entity.property_list():
      if prop.name() in self.__cursor_properties:
        comparison_entity.add_property().MergeFrom(prop)
    if cursor[0].has_key():
      comparison_entity.mutable_key().MergeFrom(entity.key())
    x = self.__order_compare_entities(comparison_entity, cursor[0])
    if cursor[1]:
      return x < 0
    else:
      return x <= 0

  def _DecodeCompiledCursor(self, compiled_cursor):
    """Converts a compiled_cursor into a cursor_entity.

    Args:
      compiled_cursor: The datastore_pb.CompiledCursor to decode.

    Returns:
      (cursor_entity, inclusive): an entity_pb.EntityProto and if it should
      be included in the result set.
    """
    assert compiled_cursor.has_position()

    position = compiled_cursor.position()

    remaining_properties = set(self.__cursor_properties)

    cursor_entity = entity_pb.EntityProto()
    if position.has_key():
      cursor_entity.mutable_key().CopyFrom(position.key())
      try:
        remaining_properties.remove('__key__')
      except KeyError:
        Check(False, 'Cursor does not match query: extra value __key__')
    for indexvalue in position.indexvalue_list():
      property = cursor_entity.add_property()
      property.set_name(indexvalue.property())
      property.mutable_value().CopyFrom(indexvalue.value())
      try:
        remaining_properties.remove(indexvalue.property())
      except KeyError:
        Check(False, 'Cursor does not match query: extra value %s' %
              indexvalue.property())
    Check(not remaining_properties,
          'Cursor does not match query: missing values for %r' %
          remaining_properties)

    return (cursor_entity, position.start_inclusive())

  def _EncodeCompiledCursor(self, last_result, compiled_cursor):
    """Converts the current state of the cursor into a compiled_cursor.

    Args:
      last_result: the last result returned by this query.
      compiled_cursor: an empty datastore_pb.CompiledCursor.
    """
    if last_result is not None:
      position = compiled_cursor.mutable_position()

      if '__key__' in self.__cursor_properties:
        position.mutable_key().MergeFrom(last_result.key())
      for prop in last_result.property_list():
        if prop.name() in self.__cursor_properties:
          indexvalue = position.add_indexvalue()
          indexvalue.set_property(prop.name())
          indexvalue.mutable_value().CopyFrom(prop.value())
      position.set_start_inclusive(False)
      _SetBeforeAscending(position, self.__first_sort_order)


class ListCursor(BaseCursor):
  """A query cursor over a list of entities.

  Public properties:
    keys_only: whether the query is keys_only
  """

  def __init__(self, query, dsquery, orders, index_list, results):
    """Constructor.

    Args:
      query: the query request proto
      dsquery: a datastore_query.Query over query.
      orders: the orders of query as returned by _GuessOrders.
      index_list: the list of indexes used by the query.
      results: list of entity_pb.EntityProto
    """
    super(ListCursor, self).__init__(query, dsquery, orders, index_list)

    if self.group_by:
      distincts = set()
      new_results = []
      for result in results:
        key_value = _GetGroupByKey(result, self.group_by)
        if key_value not in distincts:
          distincts.add(key_value)
          new_results.append(result)
      results = new_results

    if query.has_compiled_cursor() and query.compiled_cursor().has_position():
      start_cursor = self._DecodeCompiledCursor(query.compiled_cursor())
      self.__last_result = start_cursor[0]
      start_cursor_position = self._GetCursorOffset(results, start_cursor)
    else:
      self.__last_result = None
      start_cursor_position = 0

    if query.has_end_compiled_cursor():
      if query.end_compiled_cursor().has_position():
        end_cursor = self._DecodeCompiledCursor(query.end_compiled_cursor())
        end_cursor_position = self._GetCursorOffset(results, end_cursor)
      else:
        end_cursor_position = 0
    else:
      end_cursor_position = len(results)

    results = results[start_cursor_position:end_cursor_position]

    if query.has_limit():
      limit = query.limit()
      if query.offset():
        limit += query.offset()
      if limit >= 0 and limit < len(results):
        results = results[:limit]

    self.__results = results
    self.__offset = 0
    self.__count = len(self.__results)

  def _GetCursorOffset(self, results, cursor):
    """Converts a cursor into an offset into the result set even if the
    cursor's entity no longer exists.

    Args:
      results: the query's results (sequence of entity_pb.EntityProto)
      cursor: a compiled cursor as returned by _DecodeCompiledCursor
    Returns:
      the integer offset
    """
    lo = 0
    hi = len(results)
    while lo < hi:
      mid = (lo + hi) // 2
      if self._IsBeforeCursor(results[mid], cursor):
        lo = mid + 1
      else:
        hi = mid
    return lo
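
  # Note: _GetCursorOffset is a standard binary search (the equivalent of
  # bisect.bisect_left with _IsBeforeCursor as the ordering predicate), which
  # is why a cursor keeps working even after its entity has been deleted.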

  def PopulateQueryResult(self, result, count, offset,
                          compile=False, first_result=False):
    """Populates a QueryResult with this cursor and the given number of results.

    Args:
      result: datastore_pb.QueryResult
      count: integer of how many results to return
      offset: integer of how many results to skip
      compile: boolean, whether we are compiling this query
      first_result: whether the query result is the first for this query
    """
    Check(offset >= 0, 'Offset must be >= 0')

    offset = min(offset, self.__count - self.__offset)
    limited_offset = min(offset, _MAX_QUERY_OFFSET)
    if limited_offset:
      self.__offset += limited_offset
      result.set_skipped_results(limited_offset)

    if compile and result.skipped_results() > 0:
      self._EncodeCompiledCursor(
          self.__results[self.__offset - 1],
          result.mutable_skipped_results_compiled_cursor())
    if offset == limited_offset and count:
      if count > _MAXIMUM_RESULTS:
        count = _MAXIMUM_RESULTS
      results = self.__results[self.__offset:self.__offset + count]
      count = len(results)
      self.__offset += count

      result.result_list().extend(
          LoadEntity(entity, self.keys_only, self.property_names)
          for entity in results)
      if compile:
        for entity in results:
          self._EncodeCompiledCursor(entity,
                                     result.add_result_compiled_cursor())

    if self.__offset:
      self.__last_result = self.__results[self.__offset - 1]

    result.set_more_results(self.__offset < self.__count)
    self._PopulateResultMetadata(result, compile,
                                 first_result, self.__last_result)


def _SynchronizeTxn(function):
  """A decorator that locks a transaction during the function call."""

  def sync(txn, *args, **kwargs):
    txn._lock.acquire()
    try:
      Check(txn._state is LiveTxn.ACTIVE, 'transaction closed')
      return function(txn, *args, **kwargs)
    finally:
      txn._lock.release()
  return sync


def _GetEntityGroup(ref):
  """Returns the entity group key for the given reference."""
  entity_group = entity_pb.Reference()
  entity_group.CopyFrom(ref)
  assert (entity_group.path().element_list()[0].has_id() or
          entity_group.path().element_list()[0].has_name())
  del entity_group.path().element_list()[1:]
  return entity_group


def _GetKeyKind(key):
  """Return the kind of the given key."""
  return key.path().element_list()[-1].type()


def _FilterIndexesByKind(key, indexes):
  """Return only the indexes with the specified kind."""
  return filter((lambda index:
                 index.definition().entity_type() == _GetKeyKind(key)),
                indexes)


class LiveTxn(object):
  """An in flight transaction."""

  ACTIVE = 1
  COMMITED = 2
  ROLLEDBACK = 3
  FAILED = 4

  _state = ACTIVE
  _commit_time_s = None

  def __init__(self, txn_manager, app, allow_multiple_eg):
    assert isinstance(txn_manager, BaseTransactionManager)
    assert isinstance(app, basestring)

    self._txn_manager = txn_manager
    self._app = app
    self._allow_multiple_eg = allow_multiple_eg

    self._entity_groups = {}

    self._lock = threading.RLock()
    self._apply_lock = threading.Lock()

    self._actions = []
    self._cost = datastore_pb.Cost()

    self._kind_to_indexes = collections.defaultdict(list)

  def _GetTracker(self, reference):
    """Gets the entity group tracker for reference.

    If this is the first time reference's entity group is seen, creates a new
    tracker, checking that the transaction doesn't exceed the entity group
    limit.
    """
    entity_group = _GetEntityGroup(reference)
    key = datastore_types.ReferenceToKeyValue(entity_group)
    tracker = self._entity_groups.get(key, None)
    if tracker is None:
      Check(self._app == reference.app(),
            'Transactions cannot span applications (expected %s, got %s)' %
            (self._app, reference.app()))
      if self._allow_multiple_eg:
        Check(len(self._entity_groups) < _MAX_EG_PER_TXN,
              'operating on too many entity groups in a single transaction.')
      else:
        Check(len(self._entity_groups) < 1,
              'cross-group transactions need to be explicitly '
              'specified (xg=True)')
      tracker = EntityGroupTracker(entity_group)
      self._entity_groups[key] = tracker

    return tracker

  def _GetAllTrackers(self):
    """Get the trackers for the transaction's entity groups.

    If no entity group has been discovered returns a 'global' entity group
    tracker. This is possible if the txn only contains transactional tasks.

    Returns:
      The tracker list for the entity groups used in this txn.
    """
    if not self._entity_groups:
      self._GetTracker(datastore_types.Key.from_path(
          '__global__', 1, _app=self._app)._ToPb())
    return self._entity_groups.values()

  def _GrabSnapshot(self, reference):
    """Gets snapshot for this reference, creating it if necessary.

    If no snapshot has been set for reference's entity group, a snapshot is
    taken and stored for future reads (this also sets the read position),
    and a CONCURRENT_TRANSACTION exception is thrown if we no longer have
    a consistent snapshot.

    Args:
      reference: An entity_pb.Reference from which to extract the entity group.
    Raises:
      apiproxy_errors.ApplicationError if the snapshot is not consistent.
    """
    tracker = self._GetTracker(reference)
    check_contention = tracker._snapshot is None
    snapshot = tracker._GrabSnapshot(self._txn_manager)
    if check_contention:
      candidates = [other for other in self._entity_groups.values()
                    if other._snapshot is not None and other != tracker]
      meta_data_list = [other._meta_data for other in candidates]
      self._txn_manager._AcquireWriteLocks(meta_data_list)
      try:
        for other in candidates:
          if other._meta_data._log_pos != other._read_pos:
            self._state = self.FAILED
            raise apiproxy_errors.ApplicationError(
                datastore_pb.Error.CONCURRENT_TRANSACTION,
                'Concurrency exception.')
      finally:
        self._txn_manager._ReleaseWriteLocks(meta_data_list)
    return snapshot

  @_SynchronizeTxn
  def Get(self, reference):
    """Returns the entity associated with the given entity_pb.Reference or None.

    Does not see any modifications in the current txn.

    Args:
      reference: The entity_pb.Reference of the entity to look up.

    Returns:
      The associated entity_pb.EntityProto or None if no such entity exists.
    """
    snapshot = self._GrabSnapshot(reference)
    entity = snapshot.get(datastore_types.ReferenceToKeyValue(reference))
    return LoadEntity(entity)

  @_SynchronizeTxn
  def GetQueryCursor(self, query, filters, orders, index_list,
                     filter_predicate=None):
    """Runs the given datastore_pb.Query and returns a QueryCursor for it.

    Does not see any modifications in the current txn.

    Args:
      query: The datastore_pb.Query to run.
      filters: A list of filters that override the ones found on query.
      orders: A list of orders that override the ones found on query.
      index_list: A list of indexes used by the query.
      filter_predicate: an additional filter of type
          datastore_query.FilterPredicate. This is passed along to implement V4
          specific filters without changing the entire stub.

    Returns:
      A BaseCursor that can be used to fetch query results.
    """
    Check(query.has_ancestor(),
          'Query must have an ancestor when performed in a transaction.')
    snapshot = self._GrabSnapshot(query.ancestor())
    return _ExecuteQuery(snapshot.values(), query, filters, orders, index_list,
                         filter_predicate)

  @_SynchronizeTxn
  def Put(self, entity, insert, indexes):
    """Puts the given entity.

    Args:
      entity: The entity_pb.EntityProto to put.
      insert: A boolean that indicates if we should fail if the entity already
          exists.
      indexes: The composite indexes that apply to the entity.
    """
    tracker = self._GetTracker(entity.key())
    key = datastore_types.ReferenceToKeyValue(entity.key())
    tracker._delete.pop(key, None)
    tracker._put[key] = (entity, insert)
    self._kind_to_indexes[_GetKeyKind(entity.key())] = indexes

  @_SynchronizeTxn
  def Delete(self, reference, indexes):
    """Deletes the entity associated with the given reference.

    Args:
      reference: The entity_pb.Reference of the entity to delete.
      indexes: The composite indexes that apply to the entity.
    """
    tracker = self._GetTracker(reference)
    key = datastore_types.ReferenceToKeyValue(reference)
    tracker._put.pop(key, None)
    tracker._delete[key] = reference
    self._kind_to_indexes[_GetKeyKind(reference)] = indexes

  @_SynchronizeTxn
  def AddActions(self, actions, max_actions=None):
    """Adds the given actions to the current txn.

    Args:
      actions: A list of pbs to send to taskqueue.Add when the txn is applied.
      max_actions: A number that indicates the maximum number of actions to
          allow on this txn.
    """
    Check(not max_actions or len(self._actions) + len(actions) <= max_actions,
          'Too many messages, maximum allowed %s' % max_actions)
    self._actions.extend(actions)

  def Rollback(self):
    """Rollback the current txn."""
    self._lock.acquire()
    try:
      Check(self._state is self.ACTIVE or self._state is self.FAILED,
            'transaction closed')
      self._state = self.ROLLEDBACK
    finally:
      self._txn_manager._RemoveTxn(self)

      self._lock.release()

  @_SynchronizeTxn
  def Commit(self):
    """Commits the current txn.

    This function hands off the responsibility of calling _Apply to the owning
    TransactionManager.

    Returns:
      The cost of the transaction.
    """
    try:
      trackers = self._GetAllTrackers()
      empty = True
      for tracker in trackers:
        snapshot = tracker._GrabSnapshot(self._txn_manager)
        empty = empty and not tracker._put and not tracker._delete

        for entity, insert in tracker._put.itervalues():
          Check(not insert or self.Get(entity.key()) is None,
                'the id allocated for a new entity was already '
                'in use, please try again')

          old_entity = None
          key = datastore_types.ReferenceToKeyValue(entity.key())
          if key in snapshot:
            old_entity = snapshot[key]
          self._AddWriteOps(old_entity, entity)

        for reference in tracker._delete.itervalues():
          old_entity = None
          key = datastore_types.ReferenceToKeyValue(reference)
          if key in snapshot:
            old_entity = snapshot[key]
          if old_entity is not None:
            self._AddWriteOps(None, old_entity)

      if empty and not self._actions:
        self.Rollback()
        return datastore_pb.Cost()

      meta_data_list = [tracker._meta_data for tracker in trackers]
      self._txn_manager._AcquireWriteLocks(meta_data_list)
    except:
      self.Rollback()
      raise

    try:
      for tracker in trackers:
        Check(tracker._meta_data._log_pos == tracker._read_pos,
              'Concurrency exception.',
              datastore_pb.Error.CONCURRENT_TRANSACTION)

      for tracker in trackers:
        tracker._meta_data.Log(self)
      self._state = self.COMMITED
      self._commit_time_s = time.time()
    except:
      self.Rollback()
      raise
    else:
      for action in self._actions:
        try:
          apiproxy_stub_map.MakeSyncCall(
              'taskqueue', 'Add', action, api_base_pb.VoidProto())
        except apiproxy_errors.ApplicationError, e:
          logging.warning('Transactional task %s has been dropped, %s',
                          action, e)
      self._actions = []
    finally:
      self._txn_manager._RemoveTxn(self)

      self._txn_manager._ReleaseWriteLocks(meta_data_list)

    self._txn_manager._consistency_policy._OnCommit(self)
    return self._cost

  def _AddWriteOps(self, old_entity, new_entity):
    """Adds the cost of writing the new_entity to the _cost member.

    We assume that old_entity represents the current state of the Datastore.

    Args:
      old_entity: Entity representing the current state in the Datastore.
      new_entity: Entity representing the desired state in the Datastore.
    """
    composite_indexes = self._kind_to_indexes[_GetKeyKind(new_entity.key())]
    entity_writes, index_writes = _CalculateWriteOps(
        composite_indexes, old_entity, new_entity)
    _UpdateCost(self._cost, entity_writes, index_writes)

  def _Apply(self, meta_data):
    """Applies the current txn on the given entity group.

    This function blindly performs the operations contained in the current txn.
    The calling function must acquire the entity group write lock and ensure
    transactions are applied in order.
    """
    self._apply_lock.acquire()
    try:
      assert self._state == self.COMMITED
      for tracker in self._entity_groups.values():
        if tracker._meta_data is meta_data:
          break
      else:
        assert False
      assert tracker._read_pos != tracker.APPLIED

      for entity, insert in tracker._put.itervalues():
        self._txn_manager._Put(entity, insert)

      for key in tracker._delete.itervalues():
        self._txn_manager._Delete(key)

      tracker._read_pos = EntityGroupTracker.APPLIED

      tracker._meta_data.Unlog(self)
    finally:
      self._apply_lock.release()


class EntityGroupTracker(object):
  """An entity group involved in a transaction."""

  APPLIED = -2

  _read_pos = None

  _snapshot = None

  _meta_data = None

  def __init__(self, entity_group):
    self._entity_group = entity_group
    self._put = {}
    self._delete = {}

  def _GrabSnapshot(self, txn_manager):
    """Snapshot this entity group, remembering the read position."""
    if self._snapshot is None:
      self._meta_data, self._read_pos, self._snapshot = (
          txn_manager._GrabSnapshot(self._entity_group))
    return self._snapshot


class EntityGroupMetaData(object):
  """The meta_data associated with an entity group."""

  _log_pos = -1

  _snapshot = None

  def __init__(self, entity_group):
    self._entity_group = entity_group
    self._write_lock = threading.Lock()
    self._apply_queue = []

  def CatchUp(self):
    """Applies all outstanding txns."""

    assert self._write_lock.acquire(False) is False

    while self._apply_queue:
      self._apply_queue[0]._Apply(self)

  def Log(self, txn):
    """Add a pending transaction to this entity group.

    Requires that the caller hold the meta data lock.
    This also increments the current log position and clears the snapshot cache.
    """
    assert self._write_lock.acquire(False) is False
    self._apply_queue.append(txn)
    self._log_pos += 1
    self._snapshot = None

  def Unlog(self, txn):
    """Remove the first pending transaction from the apply queue.

    Requires that the caller hold the meta data lock.
    This checks that the first pending transaction is indeed txn.
    """
    assert self._write_lock.acquire(False) is False

    Check(self._apply_queue and self._apply_queue[0] is txn,
          'Transaction cannot be applied',
          datastore_pb.Error.INTERNAL_ERROR)
    self._apply_queue.pop(0)


class BaseConsistencyPolicy(object):
  """A base class for a consistency policy to be used with a transaction
  manager.
  """

  def _OnCommit(self, txn):
    """Called after a LiveTxn has been committed.

    This function can decide whether to apply the txn right away.

    Args:
      txn: A LiveTxn that has been committed
    """
    raise NotImplementedError

  def _OnGroom(self, meta_data_list):
    """Called once for every global query.

    This function must acquire the write lock for any meta data before applying
    any outstanding txns.

    Args:
      meta_data_list: A list of EntityGroupMetaData objects.
    """
    raise NotImplementedError


class MasterSlaveConsistencyPolicy(BaseConsistencyPolicy):
  """Enforces the Master / Slave consistency policy.

  Applies all txns on commit.
  """

  def _OnCommit(self, txn):
    for tracker in txn._GetAllTrackers():
      tracker._meta_data._write_lock.acquire()
      try:
        tracker._meta_data.CatchUp()
      finally:
        tracker._meta_data._write_lock.release()

    txn._txn_manager.Write()

  def _OnGroom(self, meta_data_list):
    pass


class BaseHighReplicationConsistencyPolicy(BaseConsistencyPolicy):
  """A base class for High Replication Datastore consistency policies.

  All txns are applied asynchronously.
  """

  def _OnCommit(self, txn):
    pass

  def _OnGroom(self, meta_data_list):
    for meta_data in meta_data_list:
      if not meta_data._apply_queue:
        continue

      meta_data._write_lock.acquire()
      try:
        while meta_data._apply_queue:
          txn = meta_data._apply_queue[0]
          if self._ShouldApply(txn, meta_data):
            txn._Apply(meta_data)
          else:
            break
      finally:
        meta_data._write_lock.release()

  def _ShouldApply(self, txn, meta_data):
    """Determines if the given transaction should be applied."""
    raise NotImplementedError


class TimeBasedHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy):
  """A High Replication Datastore consistency policy based on elapsed time.

  This class tries to simulate performance seen in the high replication
  datastore using estimated probabilities of a transaction committing after a
  given amount of time.
  """

  _classification_map = [(.98, 100),
                         (.99, 300),
                         (.995, 2000),
                         (1, 240000)
                         ]

  def SetClassificationMap(self, classification_map):
    """Set the probability a txn will be applied after a given amount of time.

    Args:
      classification_map: A list of tuples containing (float between 0 and 1,
          number of milliseconds) that define the probability of a transaction
          applying after a given amount of time.
    """
    for prob, delay in classification_map:
      if prob < 0 or prob > 1 or delay <= 0:
        raise TypeError(
            'classification_map must be a list of (probability, delay) tuples, '
            'found %r' % (classification_map,))

    self._classification_map = sorted(classification_map)

  def _ShouldApplyImpl(self, elapsed_ms, classification):
    for rate, ms in self._classification_map:
      if classification <= rate:
        break
    return elapsed_ms >= ms

  def _Classify(self, txn, meta_data):
    return random.Random(id(txn) ^ id(meta_data)).random()

  def _ShouldApply(self, txn, meta_data):
    elapsed_ms = (time.time() - txn._commit_time_s) * 1000
    classification = self._Classify(txn, meta_data)
    return self._ShouldApplyImpl(elapsed_ms, classification)
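
# Illustrative use (an editor's sketch): simulate a datastore where 80% of
# transactions apply within 100ms and the rest within 1 second:
#
#   policy = TimeBasedHRConsistencyPolicy()
#   policy.SetClassificationMap([(.8, 100), (1, 1000)])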


class PseudoRandomHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy):
  """A policy that always gives the same sequence of consistency decisions."""

  def __init__(self, probability=.5, seed=0):
    """Constructor.

    Args:
      probability: A number between 0 and 1 that is the likelihood of a
          transaction applying before a global query is executed.
      seed: A hashable object to use as a seed. Use None to use the current
          timestamp.
    """
    self.SetProbability(probability)
    self.SetSeed(seed)

  def SetProbability(self, probability):
    """Change the probability of a transaction applying.

    Args:
      probability: A number between 0 and 1 that determines the probability of
          a transaction applying before a global query is run.
    """
    if probability < 0 or probability > 1:
      raise TypeError('probability must be a number between 0 and 1, found %r' %
                      probability)
    self._probability = probability

  def SetSeed(self, seed):
    """Reset the seed."""
    self._random = random.Random(seed)

  def _ShouldApply(self, txn, meta_data):
    return self._random.random() < self._probability
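
# Illustrative use (an editor's sketch): with probability=0 a transaction is
# never applied before a global query runs, which forces tests to exercise
# eventual-consistency code paths deterministically:
#
#   policy = PseudoRandomHRConsistencyPolicy(probability=0)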
1912 class BaseTransactionManager(object):
1913 """A class that manages the state of transactions.
1915 This includes creating consistent snap shots for transactions.
1918 def __init__(self, consistency_policy=None):
1919 super(BaseTransactionManager, self).__init__()
1921 self._consistency_policy = (consistency_policy or
1922 MasterSlaveConsistencyPolicy())
1925 self._meta_data_lock = threading.Lock()
1926 BaseTransactionManager.Clear(self)
1928 def SetConsistencyPolicy(self, policy):
1929 """Set the consistency to use.
1931 Causes all data to be flushed.
1933 Args:
1934 policy: A obj inheriting from BaseConsistencyPolicy.
1936 if not isinstance(policy, BaseConsistencyPolicy):
1937 raise TypeError('policy should be of type '
1938 'datastore_stub_util.BaseConsistencyPolicy found %r.' %
1939 (policy,))
1940 self.Flush()
1941 self._consistency_policy = policy
1943 def Clear(self):
1944 """Discards any pending transactions and resets the meta data."""
1946 self._meta_data = {}
1948 self._txn_map = {}
1950 def BeginTransaction(self, app, allow_multiple_eg):
1951 """Start a transaction on the given app.
1953 Args:
1954 app: A string representing the app for which to start the transaction.
1955 allow_multiple_eg: True if transactions can span multiple entity groups.
1957 Returns:
1958 A datastore_pb.Transaction for the created transaction
1960 Check(not (allow_multiple_eg and isinstance(
1961 self._consistency_policy, MasterSlaveConsistencyPolicy)),
1962 'transactions on multiple entity groups only allowed with the '
1963 'High Replication datastore')
1964 txn = self._BeginTransaction(app, allow_multiple_eg)
1965 self._txn_map[id(txn)] = txn
1966 transaction = datastore_pb.Transaction()
1967 transaction.set_app(app)
1968 transaction.set_handle(id(txn))
1969 return transaction
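# Sketch (names assumed): the handle stored in the returned proto is id(txn),
# so the proto round-trips through GetTxn below:
#
#   t = manager.BeginTransaction('myapp', allow_multiple_eg=False)
#   live_txn = manager.GetTxn(t, request_trusted=False, request_app='myapp')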
1971 def GetTxn(self, transaction, request_trusted, request_app):
1972 """Gets the LiveTxn object associated with the given transaction.
1974 Args:
1975 transaction: The datastore_pb.Transaction to look up.
1976 request_trusted: A boolean indicating if the requesting app is trusted.
1977 request_app: A string representing the app making the request.
1979 Returns:
1980 The associated LiveTxn object.
1982 request_app = datastore_types.ResolveAppId(request_app)
1983 CheckTransaction(request_trusted, request_app, transaction)
1984 txn = self._txn_map.get(transaction.handle())
1985 Check(txn and txn._app == transaction.app(),
1986 'Transaction(<%s>) not found' % str(transaction).replace('\n', ', '))
1987 return txn
1989 def Groom(self):
1990 """Attempts to apply any outstanding transactions.
1992 The consistency policy determines if a transaction should be applied.
1994 self._meta_data_lock.acquire()
1995 try:
1996 self._consistency_policy._OnGroom(self._meta_data.itervalues())
1997 finally:
1998 self._meta_data_lock.release()
2000 def Flush(self):
2001 """Applies all outstanding transactions."""
2002 self._meta_data_lock.acquire()
2003 try:
2004 for meta_data in self._meta_data.itervalues():
2005 if not meta_data._apply_queue:
2006 continue
2009 meta_data._write_lock.acquire()
2010 try:
2011 meta_data.CatchUp()
2012 finally:
2013 meta_data._write_lock.release()
2014 finally:
2015 self._meta_data_lock.release()
2017 def _GetMetaData(self, entity_group):
2018 """Safely gets the EntityGroupMetaData object for the given entity_group.
2020 self._meta_data_lock.acquire()
2021 try:
2022 key = datastore_types.ReferenceToKeyValue(entity_group)
2024 meta_data = self._meta_data.get(key, None)
2025 if not meta_data:
2026 meta_data = EntityGroupMetaData(entity_group)
2027 self._meta_data[key] = meta_data
2028 return meta_data
2029 finally:
2030 self._meta_data_lock.release()
2032 def _BeginTransaction(self, app, allow_multiple_eg):
2033 """Starts a transaction without storing it in the txn_map."""
2034 return LiveTxn(self, app, allow_multiple_eg)
2036 def _GrabSnapshot(self, entity_group):
2037 """Grabs a consistent snapshot of the given entity group.
2039 Args:
2040 entity_group: An entity_pb.Reference of the entity group of which the
2041 snapshot should be taken.
2043 Returns:
2044 A tuple of (meta_data, log_pos, snapshot) where log_pos is the current log
2045 position and snapshot is a map of reference key value to
2046 entity_pb.EntityProto.
2049 meta_data = self._GetMetaData(entity_group)
2050 meta_data._write_lock.acquire()
2051 try:
2052 if not meta_data._snapshot:
2054 meta_data.CatchUp()
2055 meta_data._snapshot = self._GetEntitiesInEntityGroup(entity_group)
2056 return meta_data, meta_data._log_pos, meta_data._snapshot
2057 finally:
2059 meta_data._write_lock.release()
2061 def _AcquireWriteLocks(self, meta_data_list):
2062 """Acquire the write locks for the given entity group meta data.
2064 These locks must be released with _ReleaseWriteLocks before returning to the
2065 user.
2067 Args:
2068 meta_data_list: list of EntityGroupMetaData objects.
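# Locks are acquired in sorted order so that any two callers locking an
# overlapping set of entity groups always take them in the same global
# order, which prevents deadlock.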
2070 for meta_data in sorted(meta_data_list):
2071 meta_data._write_lock.acquire()
2073 def _ReleaseWriteLocks(self, meta_data_list):
2074 """Release the write locks of the given entity group meta data.
2076 Args:
2077 meta_data_list: list of EntityGroupMetaData objects.
2079 for meta_data in sorted(meta_data_list):
2080 meta_data._write_lock.release()
2082 def _RemoveTxn(self, txn):
2083 """Removes a LiveTxn from the txn_map (if present)."""
2084 self._txn_map.pop(id(txn), None)
2086 def _Put(self, entity, insert):
2087 """Put the given entity.
2089 This must be implemented by a sub-class. The sub-class can assume that any
2090 needed consistency is enforced at a higher level (and can just put blindly).
2092 Args:
2093 entity: The entity_pb.EntityProto to put.
2094 insert: A boolean that indicates if we should fail if the entity already
2095 exists.
2097 raise NotImplementedError
2099 def _Delete(self, reference):
2100 """Delete the entity associated with the specified reference.
2102 This must be implemented by a sub-class. The sub-class can assume that any
2103 needed consistency is enforced at a higher level (and can just delete
2104 blindly).
2106 Args:
2107 reference: The entity_pb.Reference of the entity to delete.
2109 raise NotImplementedError
2111 def _GetEntitiesInEntityGroup(self, entity_group):
2112 """Gets the contents of a specific entity group.
2114 This must be implemented by a sub-class. The sub-class can assume that any
2115 needed consistency is enforced at a higher level (and can just blindly read).
2117 Other entity groups may be modified concurrently.
2119 Args:
2120 entity_group: An entity_pb.Reference of the entity group to get.
2122 Returns:
2123 A dict mapping datastore_types.ReferenceToKeyValue(key) to EntityProto.
2125 raise NotImplementedError
2128 class BaseIndexManager(object):
2129 """A generic index manager that stores all data in memory."""
2138 WRITE_ONLY = entity_pb.CompositeIndex.WRITE_ONLY
2139 READ_WRITE = entity_pb.CompositeIndex.READ_WRITE
2140 DELETED = entity_pb.CompositeIndex.DELETED
2141 ERROR = entity_pb.CompositeIndex.ERROR
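# Legal state transitions; UpdateIndex rejects any state change that is not
# listed here (keeping the current state is always allowed).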
2143 _INDEX_STATE_TRANSITIONS = {
2144 WRITE_ONLY: frozenset((READ_WRITE, DELETED, ERROR)),
2145 READ_WRITE: frozenset((DELETED,)),
2146 ERROR: frozenset((DELETED,)),
2147 DELETED: frozenset((ERROR,)),
2150 def __init__(self):
2154 self.__indexes = collections.defaultdict(list)
2155 self.__indexes_lock = threading.Lock()
2156 self.__next_index_id = 1
2157 self.__index_id_lock = threading.Lock()
2159 def __FindIndex(self, index):
2160 """Finds an existing index by definition.
2162 Args:
2163 index: entity_pb.CompositeIndex
2165 Returns:
2166 entity_pb.CompositeIndex, if it exists; otherwise None
2168 app = index.app_id()
2169 if app in self.__indexes:
2170 for stored_index in self.__indexes[app]:
2171 if index.definition() == stored_index.definition():
2172 return stored_index
2174 return None
2176 def CreateIndex(self, index, trusted=False, calling_app=None):
2177 calling_app = datastore_types.ResolveAppId(calling_app)
2178 CheckAppId(trusted, calling_app, index.app_id())
2179 Check(index.id() == 0, 'New index id must be 0.')
2180 Check(not self.__FindIndex(index), 'Index already exists.')
2183 self.__index_id_lock.acquire()
2184 index.set_id(self.__next_index_id)
2185 self.__next_index_id += 1
2186 self.__index_id_lock.release()
2189 clone = entity_pb.CompositeIndex()
2190 clone.CopyFrom(index)
2191 app = index.app_id()
2192 clone.set_app_id(app)
2195 self.__indexes_lock.acquire()
2196 try:
2197 self.__indexes[app].append(clone)
2198 finally:
2199 self.__indexes_lock.release()
2201 self._OnIndexChange(index.app_id())
2203 return index.id()
2205 def GetIndexes(self, app, trusted=False, calling_app=None):
2206 """Get the CompositeIndex objects for the given app."""
2207 calling_app = datastore_types.ResolveAppId(calling_app)
2208 CheckAppId(trusted, calling_app, app)
2210 return self.__indexes[app]
2212 def UpdateIndex(self, index, trusted=False, calling_app=None):
2213 CheckAppId(trusted, calling_app, index.app_id())
2215 stored_index = self.__FindIndex(index)
2216 Check(stored_index, 'Index does not exist.')
2217 Check(index.state() == stored_index.state() or
2218 index.state() in self._INDEX_STATE_TRANSITIONS[stored_index.state()],
2219 'cannot move index state from %s to %s' %
2220 (entity_pb.CompositeIndex.State_Name(stored_index.state()),
2221 (entity_pb.CompositeIndex.State_Name(index.state()))))
2224 self.__indexes_lock.acquire()
2225 try:
2226 stored_index.set_state(index.state())
2227 finally:
2228 self.__indexes_lock.release()
2230 self._OnIndexChange(index.app_id())
2232 def DeleteIndex(self, index, trusted=False, calling_app=None):
2233 CheckAppId(trusted, calling_app, index.app_id())
2235 stored_index = self.__FindIndex(index)
2236 Check(stored_index, 'Index does not exist.')
2239 app = index.app_id()
2240 self.__indexes_lock.acquire()
2241 try:
2242 self.__indexes[app].remove(stored_index)
2243 finally:
2244 self.__indexes_lock.release()
2246 self._OnIndexChange(index.app_id())
2248 def _SideLoadIndex(self, index):
2249 self.__indexes[index.app_id()].append(index)
2251 def _OnIndexChange(self, app_id):
2252 pass
2255 class BaseDatastore(BaseTransactionManager, BaseIndexManager):
2256 """A base implemenation of a Datastore.
2258 This class implements common functions associated with a datastore and
2259 enforces security restrictions passed on by a stub or client. It is designed
2260 to be shared by any number of threads or clients serving any number of apps.
2262 If an app is not specified explicitly, it is pulled from the environment and
2263 assumed to be untrusted.
2268 _MAX_QUERY_COMPONENTS = 100
2272 _BATCH_SIZE = 20
2276 _MAX_ACTIONS_PER_TXN = 5
2278 def __init__(self, require_indexes=False, consistency_policy=None,
2279 use_atexit=True, auto_id_policy=SEQUENTIAL):
2280 BaseTransactionManager.__init__(self, consistency_policy=consistency_policy)
2281 BaseIndexManager.__init__(self)
2283 self._require_indexes = require_indexes
2284 self._pseudo_kinds = {}
2285 self.SetAutoIdPolicy(auto_id_policy)
2287 if use_atexit:
2292 atexit.register(self.Write)
2294 def Clear(self):
2295 """Clears out all stored values."""
2297 BaseTransactionManager.Clear(self)
2300 def _RegisterPseudoKind(self, kind):
2301 """Registers a pseudo kind to be used to satisfy a meta data query."""
2302 self._pseudo_kinds[kind.name] = kind
2303 kind._stub = weakref.proxy(self)
2308 def GetQueryCursor(self, raw_query, trusted=False, calling_app=None,
2309 filter_predicate=None):
2310 """Execute a query.
2312 Args:
2313 raw_query: The non-validated datastore_pb.Query to run.
2314 trusted: If the calling app is trusted.
2315 calling_app: The app requesting the results or None to pull the app from
2316 the environment.
2317 filter_predicate: an additional filter of type
2318 datastore_query.FilterPredicate. This is passed along to implement V4
2319 specific filters without changing the entire stub.
2321 Returns:
2322 A BaseCursor that can be used to retrieve results.
2325 calling_app = datastore_types.ResolveAppId(calling_app)
2326 CheckAppId(trusted, calling_app, raw_query.app())
2329 filters, orders = datastore_index.Normalize(raw_query.filter_list(),
2330 raw_query.order_list(),
2331 raw_query.property_name_list())
2334 CheckQuery(raw_query, filters, orders, self._MAX_QUERY_COMPONENTS)
2335 FillUsersInQuery(filters)
2337 index_list = []
2341 if filter_predicate is None:
2342 self._CheckHasIndex(raw_query, trusted, calling_app)
2345 index_list = self.__IndexListForQuery(raw_query)
2348 if raw_query.has_transaction():
2350 Check(raw_query.kind() not in self._pseudo_kinds,
2351 'transactional queries on "%s" not allowed' % raw_query.kind())
2352 txn = self.GetTxn(raw_query.transaction(), trusted, calling_app)
2353 return txn.GetQueryCursor(raw_query, filters, orders, index_list)
2355 if raw_query.has_ancestor() and raw_query.kind() not in self._pseudo_kinds:
2357 txn = self._BeginTransaction(raw_query.app(), False)
2358 return txn.GetQueryCursor(raw_query, filters, orders, index_list,
2359 filter_predicate)
2362 self.Groom()
2363 return self._GetQueryCursor(raw_query, filters, orders, index_list,
2364 filter_predicate)
2366 def __IndexListForQuery(self, query):
2367 """Get the single composite index pb used by the query, if any, as a list.
2369 Args:
2370 query: the datastore_pb.Query to compute the index list for
2372 Returns:
2373 A singleton list of the composite index pb used by the query, if any.
2376 required, kind, ancestor, props = (
2377 datastore_index.CompositeIndexForQuery(query))
2378 if not required:
2379 return []
2380 composite_index_pb = entity_pb.CompositeIndex()
2381 composite_index_pb.set_app_id(query.app())
2382 composite_index_pb.set_id(0)
2383 composite_index_pb.set_state(entity_pb.CompositeIndex.READ_WRITE)
2384 index_pb = composite_index_pb.mutable_definition()
2385 index_pb.set_entity_type(kind)
2386 index_pb.set_ancestor(bool(ancestor))
2387 for name, direction in datastore_index.GetRecommendedIndexProperties(props):
2388 prop_pb = entity_pb.Index_Property()
2389 prop_pb.set_name(name)
2390 prop_pb.set_direction(direction)
2391 index_pb.property_list().append(prop_pb)
2392 return [composite_index_pb]
2394 def Get(self, raw_keys, transaction=None, eventual_consistency=False,
2395 trusted=False, calling_app=None):
2396 """Get the entities for the given keys.
2398 Args:
2399 raw_keys: A list of unverified entity_pb.Reference objects.
2400 transaction: The datastore_pb.Transaction to use or None.
2401 eventual_consistency: If we should allow stale, potentially inconsistent
2402 results.
2403 trusted: If the calling app is trusted.
2404 calling_app: The app requesting the results or None to pull the app from
2405 the environment.
2407 Returns:
2408 A list containing the entity or None if no entity exists.
2411 if not raw_keys:
2412 return []
2414 calling_app = datastore_types.ResolveAppId(calling_app)
2416 if not transaction and eventual_consistency:
2418 result = []
2419 for key in raw_keys:
2420 CheckReference(trusted, calling_app, key)
2421 result.append(self._GetWithPseudoKinds(None, key))
2422 return result
2427 grouped_keys = collections.defaultdict(list)
2428 for i, key in enumerate(raw_keys):
2429 CheckReference(trusted, calling_app, key)
2430 entity_group = _GetEntityGroup(key)
2431 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2432 grouped_keys[entity_group_key].append((key, i))
2434 if transaction:
2436 txn = self.GetTxn(transaction, trusted, calling_app)
2437 return [self._GetWithPseudoKinds(txn, key) for key in raw_keys]
2438 else:
2441 result = [None] * len(raw_keys)
2443 def op(txn, v):
2444 key, i = v
2445 result[i] = self._GetWithPseudoKinds(txn, key)
2446 for keys in grouped_keys.itervalues():
2447 self._RunInTxn(keys, keys[0][0].app(), op)
2448 return result
2450 def _GetWithPseudoKinds(self, txn, key):
2451 """Fetch entity key in txn, taking account of pseudo-kinds."""
2452 pseudo_kind = self._pseudo_kinds.get(_GetKeyKind(key), None)
2453 if pseudo_kind:
2454 return pseudo_kind.Get(txn, key)
2455 elif txn:
2456 return txn.Get(key)
2457 else:
2458 return self._Get(key)
2460 def Put(self, raw_entities, cost, transaction=None,
2461 trusted=False, calling_app=None):
2462 """Writes the given given entities.
2464 Updates an entity's key and entity_group in place if needed
2466 Args:
2467 raw_entities: A list of unverified entity_pb.EntityProto objects.
2468 cost: Out param. The cost of putting the provided entities.
2469 transaction: The datastore_pb.Transaction to use or None.
2470 trusted: If the calling app is trusted.
2471 calling_app: The app requesting the results or None to pull the app from
2472 the environment.
2473 Returns:
2474 A list of entity_pb.Reference objects that indicates where each entity
2475 was stored.
2477 if not raw_entities:
2478 return []
2480 calling_app = datastore_types.ResolveAppId(calling_app)
2483 result = [None] * len(raw_entities)
2484 grouped_entities = collections.defaultdict(list)
2485 for i, raw_entity in enumerate(raw_entities):
2486 CheckEntity(trusted, calling_app, raw_entity)
2490 entity = entity_pb.EntityProto()
2491 entity.CopyFrom(raw_entity)
2494 for prop in itertools.chain(entity.property_list(),
2495 entity.raw_property_list()):
2496 FillUser(prop)
2498 last_element = entity.key().path().element_list()[-1]
2499 if not (last_element.id() or last_element.has_name()):
2500 insert = True
2503 if self._auto_id_policy == SEQUENTIAL:
2504 last_element.set_id(self._AllocateSequentialIds(entity.key())[0])
2505 else:
2506 full_key = self._AllocateIds([entity.key()])[0]
2507 last_element.set_id(full_key.path().element_list()[-1].id())
2508 else:
2509 insert = False
2511 entity_group = _GetEntityGroup(entity.key())
2512 entity.mutable_entity_group().CopyFrom(entity_group.path())
2513 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2514 grouped_entities[entity_group_key].append((entity, insert))
2518 key = entity_pb.Reference()
2519 key.CopyFrom(entity.key())
2520 result[i] = key
2522 if transaction:
2524 txn = self.GetTxn(transaction, trusted, calling_app)
2525 for group in grouped_entities.values():
2526 for entity, insert in group:
2528 indexes = _FilterIndexesByKind(entity.key(), self.GetIndexes(
2529 entity.key().app(), trusted, calling_app))
2530 txn.Put(entity, insert, indexes)
2531 else:
2533 for entities in grouped_entities.itervalues():
2534 txn_cost = self._RunInTxn(
2535 entities, entities[0][0].key().app(),
2537 lambda txn, v: txn.Put(v[0], v[1], _FilterIndexesByKind(
2538 v[0].key(),
2539 self.GetIndexes(v[0].key().app(), trusted, calling_app))))
2540 _UpdateCost(cost, txn_cost.entity_writes(), txn_cost.index_writes())
2541 return result
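# Editorial sketch of a non-transactional Put against a BaseDatastore
# subclass ('store' and 'entity_proto' are assumed, not defined here):
#
#   cost = datastore_pb.Cost()
#   keys = store.Put([entity_proto], cost)
#   # keys[0] is the entity_pb.Reference the entity was written under;
#   # cost now includes the entity and index write counts.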
2543 def Delete(self, raw_keys, cost, transaction=None,
2544 trusted=False, calling_app=None):
2545 """Deletes the entities associated with the given keys.
2547 Args:
2548 raw_keys: A list of unverified entity_pb.Reference objects.
2549 cost: Out param. The cost of deleting the provided entities.
2550 transaction: The datastore_pb.Transaction to use or None.
2551 trusted: If the calling app is trusted.
2552 calling_app: The app requesting the results or None to pull the app from
2553 the environment.
2555 if not raw_keys:
2556 return
2558 calling_app = datastore_types.ResolveAppId(calling_app)
2561 grouped_keys = collections.defaultdict(list)
2562 for key in raw_keys:
2563 CheckReference(trusted, calling_app, key)
2564 entity_group = _GetEntityGroup(key)
2565 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2566 grouped_keys[entity_group_key].append(key)
2568 if transaction:
2570 txn = self.GetTxn(transaction, trusted, calling_app)
2571 for key in raw_keys:
2573 indexes = _FilterIndexesByKind(key, self.GetIndexes(
2574 key.app(), trusted, calling_app))
2575 txn.Delete(key, indexes)
2576 else:
2578 for keys in grouped_keys.itervalues():
2580 txn_cost = self._RunInTxn(
2581 keys, keys[0].app(),
2582 lambda txn, key: txn.Delete(key, _FilterIndexesByKind(
2583 key, self.GetIndexes(key.app(), trusted, calling_app))))
2584 _UpdateCost(cost, txn_cost.entity_writes(), txn_cost.index_writes())
2586 def Touch(self, raw_keys, trusted=False, calling_app=None):
2587 """Applies all outstanding writes."""
2588 calling_app = datastore_types.ResolveAppId(calling_app)
2590 grouped_keys = collections.defaultdict(list)
2591 for key in raw_keys:
2592 CheckReference(trusted, calling_app, key)
2593 entity_group = _GetEntityGroup(key)
2594 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2595 grouped_keys[entity_group_key].append(key)
2597 for keys in grouped_keys.itervalues():
2598 self._RunInTxn(keys, keys[0].app(), lambda txn, key: None)
2600 def _RunInTxn(self, values, app, op):
2601 """Runs the given values in a separate Txn.
2603 Retries up to _RETRIES times on CONCURRENT_TRANSACTION errors.
2605 Args:
2606 values: A list of arguments to op.
2607 app: The app to create the Txn on.
2608 op: A function to run on each value in the Txn.
2610 Returns:
2611 The cost of the txn.
2613 retries = 0
2614 backoff = _INITIAL_RETRY_DELAY_MS / 1000.0
2615 while True:
2616 try:
2617 txn = self._BeginTransaction(app, False)
2618 for value in values:
2619 op(txn, value)
2620 return txn.Commit()
2621 except apiproxy_errors.ApplicationError, e:
2622 if e.application_error == datastore_pb.Error.CONCURRENT_TRANSACTION:
2624 retries += 1
2625 if retries <= _RETRIES:
2626 time.sleep(backoff)
2627 backoff *= _RETRY_DELAY_MULTIPLIER
2628 if backoff * 1000.0 > _MAX_RETRY_DELAY_MS:
2629 backoff = _MAX_RETRY_DELAY_MS / 1000.0
2630 continue
2631 raise
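# With the module-level defaults (_RETRIES=3, _INITIAL_RETRY_DELAY_MS=100,
# _RETRY_DELAY_MULTIPLIER=2), a txn that keeps hitting CONCURRENT_TRANSACTION
# sleeps 0.1s, 0.2s and 0.4s before the error is re-raised; each delay is
# capped at _MAX_RETRY_DELAY_MS.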
2633 def _CheckHasIndex(self, query, trusted=False, calling_app=None):
2634 """Checks if the query can be satisfied given the existing indexes.
2636 Args:
2637 query: the datastore_pb.Query to check
2638 trusted: True if the calling app is trusted (like dev_admin_console)
2639 calling_app: app_id of the current running application
2641 if query.kind() in self._pseudo_kinds or not self._require_indexes:
2642 return
2644 minimal_index = datastore_index.MinimalCompositeIndexForQuery(query,
2645 (datastore_index.ProtoToIndexDefinition(index)
2646 for index in self.GetIndexes(query.app(), trusted, calling_app)
2647 if index.state() == entity_pb.CompositeIndex.READ_WRITE))
2648 if minimal_index is not None:
2649 msg = ('This query requires a composite index that is not defined. '
2650 'You must update the index.yaml file in your application root.')
2651 is_most_efficient, kind, ancestor, properties = minimal_index
2652 if not is_most_efficient:
2654 yaml = datastore_index.IndexYamlForQuery(kind, ancestor,
2655 datastore_index.GetRecommendedIndexProperties(properties))
2656 msg += '\nThe following index is the minimum index required:\n' + yaml
2657 raise apiproxy_errors.ApplicationError(datastore_pb.Error.NEED_INDEX, msg)
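# The suggestion appended to msg is ordinary index.yaml syntax, for example
# (hypothetical kind and properties):
#
#   - kind: Greeting
#     ancestor: yes
#     properties:
#     - name: date
#       direction: desc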
2659 def SetAutoIdPolicy(self, auto_id_policy):
2660 """Set value of _auto_id_policy flag (default SEQUENTIAL).
2662 SEQUENTIAL auto ID assignment behavior will eventually be deprecated
2663 and the default will be SCATTERED.
2665 Args:
2666 auto_id_policy: string constant.
2667 Raises:
2668 TypeError: if auto_id_policy is not one of SEQUENTIAL or SCATTERED.
2670 valid_policies = (SEQUENTIAL, SCATTERED)
2671 if auto_id_policy not in valid_policies:
2672 raise TypeError('auto_id_policy must be in %s, found %s instead' %
2673 (valid_policies, auto_id_policy))
2674 self._auto_id_policy = auto_id_policy
2678 def Write(self):
2679 """Writes the datastore to disk."""
2680 self.Flush()
2682 def _GetQueryCursor(self, query, filters, orders, index_list,
2683 filter_predicate):
2684 """Runs the given datastore_pb.Query and returns a QueryCursor for it.
2686 This must be implemented by a sub-class. The sub-class does not need to
2687 enforce any consistency guarantees (and can just blindly read).
2689 Args:
2690 query: The datastore_pb.Query to run.
2691 filters: A list of filters that override the ones found on query.
2692 orders: A list of orders that override the ones found on query.
2693 index_list: A list of indexes used by the query.
2694 filter_predicate: an additional filter of type
2695 datastore_query.FilterPredicate. This is passed along to implement V4
2696 specific filters without changing the entire stub.
2698 Returns:
2699 A BaseCursor that can be used to fetch query results.
2701 raise NotImplementedError
2703 def _Get(self, reference):
2704 """Get the entity for the given reference or None.
2706 This must be implemented by a sub-class. The sub-class does not need to
2707 enforce any consistency guarantees (and can just blindly read).
2709 Args:
2710 reference: An entity_pb.Reference to look up.
2712 Returns:
2713 The entity_pb.EntityProto associated with the given reference or None.
2715 raise NotImplementedError
2717 def _AllocateSequentialIds(self, reference, size=1, max_id=None):
2718 """Allocate sequential ids for given reference.
2720 Args:
2721 reference: An entity_pb.Reference to allocate an id for.
2722 size: The size of the range to allocate.
2723 max_id: The upper bound of the range to allocate.
2725 Returns:
2726 A tuple containing (min, max) of the allocated range.
2728 raise NotImplementedError
2730 def _AllocateIds(self, references):
2731 """Allocate or reserves IDs for the v4 datastore API.
2733 Incomplete keys are allocated scattered IDs. Complete keys have every id in
2734 their paths reserved in the appropriate ID space.
2736 Args:
2737 references: a list of entity_pb.Reference objects to allocate or reserve
2739 Returns:
2740 a list of complete entity_pb.Reference objects corresponding to the
2741 incomplete keys in the input, with newly allocated ids.
2743 raise NotImplementedError
2746 def _NeedsIndexes(func):
2747 """A decorator for DatastoreStub methods that require or affect indexes.
2749 Updates indexes to match index.yaml before the call and updates index.yaml
2750 after the call if require_indexes is False. If root_path is not set, this is
2751 a no-op.
2754 def UpdateIndexesWrapper(self, *args, **kwargs):
2755 self._SetupIndexes()
2756 try:
2757 return func(self, *args, **kwargs)
2758 finally:
2759 self._UpdateIndexes()
2761 return UpdateIndexesWrapper
2764 class EntityGroupPseudoKind(object):
2765 """A common implementation of get() for the __entity_group__ pseudo-kind.
2767 Public properties:
2768 name: the pseudo-kind name
2770 name = '__entity_group__'
2780 base_version = int(time.time() * 1e6)
2782 def Get(self, txn, key):
2783 """Fetch key of this pseudo-kind within txn.
2785 Args:
2786 txn: transaction within which Get occurs, may be None if this is an
2787 eventually consistent Get.
2788 key: key of pseudo-entity to Get.
2790 Returns:
2791 An entity for key, or None if it doesn't exist.
2794 if not txn:
2795 txn = self._stub._BeginTransaction(key.app(), False)
2796 try:
2797 return self.Get(txn, key)
2798 finally:
2799 txn.Rollback()
2802 if isinstance(txn._txn_manager._consistency_policy,
2803 MasterSlaveConsistencyPolicy):
2804 return None
2811 path = key.path()
2812 if path.element_size() != 2 or path.element_list()[-1].id() != 1:
2813 return None
2815 tracker = txn._GetTracker(key)
2816 tracker._GrabSnapshot(txn._txn_manager)
2818 eg = entity_pb.EntityProto()
2819 eg.mutable_key().CopyFrom(key)
2820 eg.mutable_entity_group().CopyFrom(_GetEntityGroup(key).path())
2821 version = entity_pb.Property()
2822 version.set_name('__version__')
2823 version.set_multiple(False)
2824 version.mutable_value().set_int64value(
2825 tracker._read_pos + self.base_version)
2826 eg.property_list().append(version)
2827 return eg
2829 def Query(self, query, filters, orders):
2830 """Perform a query on this pseudo-kind.
2832 Args:
2833 query: the original datastore_pb.Query.
2834 filters: the filters from query.
2835 orders: the orders from query.
2837 Raises:
2838 ApplicationError: always, since queries are not supported on this kind.
2842 raise apiproxy_errors.ApplicationError(
2843 datastore_pb.Error.BAD_REQUEST, 'queries not supported on ' + self.name)
2846 class DatastoreStub(object):
2847 """A stub that maps datastore service calls on to a BaseDatastore.
2849 This class also keeps track of query cursors.
2852 def __init__(self,
2853 datastore,
2854 app_id=None,
2855 trusted=None,
2856 root_path=None):
2857 super(DatastoreStub, self).__init__()
2858 self._datastore = datastore
2859 self._app_id = datastore_types.ResolveAppId(app_id)
2860 self._trusted = trusted
2861 self._root_path = root_path
2864 self.__query_history = {}
2867 self.__query_ci_history = set()
2871 self._cached_yaml = (None, None, None)
2873 if self._require_indexes or root_path is None:
2875 self._index_yaml_updater = None
2876 else:
2878 self._index_yaml_updater = datastore_stub_index.IndexYamlUpdater(
2879 root_path)
2881 DatastoreStub.Clear(self)
2883 def Clear(self):
2884 """Clears out all stored values."""
2885 self._query_cursors = {}
2886 self.__query_history = {}
2887 self.__query_ci_history = set()
2889 def QueryHistory(self):
2890 """Returns a dict that maps Query PBs to times they've been run."""
2892 return dict((pb, times) for pb, times in self.__query_history.items()
2893 if pb.app() == self._app_id)
2895 def _QueryCompositeIndexHistoryLength(self):
2896 """Returns the length of the CompositeIndex set for query history."""
2897 return len(self.__query_ci_history)
2899 def SetTrusted(self, trusted):
2900 """Set/clear the trusted bit in the stub.
2902 This bit indicates that the app calling the stub is trusted. A
2903 trusted app can write to datastores of other apps.
2905 Args:
2906 trusted: boolean.
2908 self._trusted = trusted
2912 def _Dynamic_Get(self, req, res):
2915 transaction = req.has_transaction() and req.transaction() or None
2918 if req.allow_deferred() and req.key_size() > _MAXIMUM_RESULTS:
2922 keys_to_get = req.key_list()[-_MAXIMUM_RESULTS:]
2923 deferred_keys = req.key_list()[:-_MAXIMUM_RESULTS]
2924 res.deferred_list().extend(deferred_keys)
2925 else:
2927 keys_to_get = req.key_list()
2929 res.set_in_order(not req.allow_deferred())
2931 total_response_bytes = 0
2932 for index, entity in enumerate(self._datastore.Get(keys_to_get,
2933 transaction,
2934 req.has_failover_ms(),
2935 self._trusted,
2936 self._app_id)):
2937 entity_size = entity and entity.ByteSize() or 0
2940 if (req.allow_deferred()
2941 and index > 0
2942 and total_response_bytes + entity_size > _MAXIMUM_QUERY_RESULT_BYTES):
2944 res.deferred_list().extend(keys_to_get[index:])
2945 break
2946 elif entity:
2947 entity_result = res.add_entity()
2948 entity_result.mutable_entity().CopyFrom(entity)
2949 total_response_bytes += entity_size
2950 else:
2952 entity_result = res.add_entity()
2953 entity_result.mutable_key().CopyFrom(keys_to_get[index])
2955 def _Dynamic_Put(self, req, res):
2956 transaction = req.has_transaction() and req.transaction() or None
2957 res.key_list().extend(self._datastore.Put(req.entity_list(),
2958 res.mutable_cost(),
2959 transaction,
2960 self._trusted, self._app_id))
2962 def _Dynamic_Delete(self, req, res):
2963 transaction = req.has_transaction() and req.transaction() or None
2964 self._datastore.Delete(req.key_list(), res.mutable_cost(), transaction,
2965 self._trusted, self._app_id)
2967 def _Dynamic_Touch(self, req, _):
2968 self._datastore.Touch(req.key_list(), self._trusted, self._app_id)
2970 @_NeedsIndexes
2971 def _Dynamic_RunQuery(self, query, query_result, filter_predicate=None):
2972 self.__UpgradeCursors(query)
2973 cursor = self._datastore.GetQueryCursor(query, self._trusted, self._app_id,
2974 filter_predicate)
2976 if query.has_count():
2977 count = query.count()
2978 elif query.has_limit():
2979 count = query.limit()
2980 else:
2981 count = self._BATCH_SIZE
2983 cursor.PopulateQueryResult(query_result, count, query.offset(),
2984 query.compile(), first_result=True)
2985 if query_result.has_cursor():
2986 self._query_cursors[query_result.cursor().cursor()] = cursor
2989 if query.compile():
2992 compiled_query = query_result.mutable_compiled_query()
2993 compiled_query.set_keys_only(query.keys_only())
2994 compiled_query.mutable_primaryscan().set_index_name(query.Encode())
2995 self.__UpdateQueryHistory(query)
2997 def __UpgradeCursors(self, query):
2998 """Upgrades compiled cursors in place.
3000 If the cursor position does not specify before_ascending, populate it.
3001 If before_ascending is already populated, use it and the sort direction
3002 from the query to set an appropriate value for start_inclusive.
3004 Args:
3005 query: datastore_pb.Query
3007 first_sort_direction = None
3008 if query.order_list():
3009 first_sort_direction = query.order(0).direction()
3011 for compiled_cursor in [query.compiled_cursor(),
3012 query.end_compiled_cursor()]:
3013 self.__UpgradeCursor(compiled_cursor, first_sort_direction)
3015 def __UpgradeCursor(self, compiled_cursor, first_sort_direction):
3016 """Upgrades a compiled cursor in place.
3018 If the cursor position does not specify before_ascending, populate it.
3019 If before_ascending is already populated, use it and the provided direction
3020 to set an appropriate value for start_inclusive.
3022 Args:
3023 compiled_cursor: datastore_pb.CompiledCursor
3024 first_sort_direction: first sort direction from the query or None
3028 if not self.__IsPlannable(compiled_cursor):
3029 return
3030 elif compiled_cursor.position().has_before_ascending():
3031 _SetStartInclusive(compiled_cursor.position(), first_sort_direction)
3032 elif compiled_cursor.position().has_start_inclusive():
3033 _SetBeforeAscending(compiled_cursor.position(), first_sort_direction)
3035 def __IsPlannable(self, compiled_cursor):
3036 """Returns True if compiled_cursor is plannable.
3038 Args:
3039 compiled_cursor: datastore_pb.CompiledCursor
3041 position = compiled_cursor.position()
3042 return position.has_key() or position.indexvalue_list()
3044 def __UpdateQueryHistory(self, query):
3046 clone = datastore_pb.Query()
3047 clone.CopyFrom(query)
3048 clone.clear_hint()
3049 clone.clear_limit()
3050 clone.clear_offset()
3051 clone.clear_count()
3052 if clone in self.__query_history:
3053 self.__query_history[clone] += 1
3054 else:
3055 self.__query_history[clone] = 1
3056 if clone.app() == self._app_id:
3057 self.__query_ci_history.add(
3058 datastore_index.CompositeIndexForQuery(clone))
3060 def _Dynamic_Next(self, next_request, query_result):
3061 app = next_request.cursor().app()
3062 CheckAppId(self._trusted, self._app_id, app)
3064 cursor = self._query_cursors.get(next_request.cursor().cursor())
3065 Check(cursor and cursor.app == app,
3066 'Cursor %d not found' % next_request.cursor().cursor())
3068 count = self._BATCH_SIZE
3069 if next_request.has_count():
3070 count = next_request.count()
3072 cursor.PopulateQueryResult(query_result, count, next_request.offset(),
3073 next_request.compile(), first_result=False)
3075 if not query_result.has_cursor():
3076 del self._query_cursors[next_request.cursor().cursor()]
3078 def _Dynamic_AddActions(self, request, _):
3079 """Associates the creation of one or more tasks with a transaction.
3081 Args:
3082 request: A taskqueue_service_pb.TaskQueueBulkAddRequest containing the
3083 tasks that should be created when the transaction is committed.
3089 if not request.add_request_list():
3090 return
3092 transaction = request.add_request_list()[0].transaction()
3093 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3094 new_actions = []
3095 for add_request in request.add_request_list():
3099 Check(add_request.transaction() == transaction,
3100 'Cannot add requests to different transactions')
3101 clone = taskqueue_service_pb.TaskQueueAddRequest()
3102 clone.CopyFrom(add_request)
3103 clone.clear_transaction()
3104 new_actions.append(clone)
3106 txn.AddActions(new_actions, self._MAX_ACTIONS_PER_TXN)
3108 def _Dynamic_BeginTransaction(self, req, transaction):
3109 CheckAppId(self._trusted, self._app_id, req.app())
3110 transaction.CopyFrom(self._datastore.BeginTransaction(
3111 req.app(), req.allow_multiple_eg()))
3113 def _Dynamic_Commit(self, transaction, res):
3114 CheckAppId(self._trusted, self._app_id, transaction.app())
3115 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3116 res.mutable_cost().CopyFrom(txn.Commit())
3118 def _Dynamic_Rollback(self, transaction, _):
3119 CheckAppId(self._trusted, self._app_id, transaction.app())
3120 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3121 txn.Rollback()
3123 def _Dynamic_CreateIndex(self, index, id_response):
3124 id_response.set_value(self._datastore.CreateIndex(index,
3125 self._trusted,
3126 self._app_id))
3128 @_NeedsIndexes
3129 def _Dynamic_GetIndices(self, app_str, composite_indices):
3130 composite_indices.index_list().extend(self._datastore.GetIndexes(
3131 app_str.value(), self._trusted, self._app_id))
3133 def _Dynamic_UpdateIndex(self, index, _):
3134 self._datastore.UpdateIndex(index, self._trusted, self._app_id)
3136 def _Dynamic_DeleteIndex(self, index, _):
3137 self._datastore.DeleteIndex(index, self._trusted, self._app_id)
3139 def _Dynamic_AllocateIds(self, allocate_ids_request, allocate_ids_response):
3140 Check(not allocate_ids_request.has_model_key()
3141 or not allocate_ids_request.reserve_list(),
3142 'Cannot allocate and reserve IDs in the same request')
3143 if allocate_ids_request.reserve_list():
3144 Check(not allocate_ids_request.has_size(),
3145 'Cannot specify size when reserving IDs')
3146 Check(not allocate_ids_request.has_max(),
3147 'Cannot specify max when reserving IDs')
3149 if allocate_ids_request.has_model_key():
3150 CheckAppId(self._trusted, self._app_id,
3151 allocate_ids_request.model_key().app())
3153 reference = allocate_ids_request.model_key()
3155 (start, end) = self._datastore._AllocateSequentialIds(
3156 reference, allocate_ids_request.size(), allocate_ids_request.max())
3158 allocate_ids_response.set_start(start)
3159 allocate_ids_response.set_end(end)
3160 else:
3161 for reference in allocate_ids_request.reserve_list():
3162 CheckAppId(self._trusted, self._app_id, reference.app())
3163 self._datastore._AllocateIds(allocate_ids_request.reserve_list())
3164 allocate_ids_response.set_start(0)
3165 allocate_ids_response.set_end(0)
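# Summary of the two mutually exclusive request shapes handled above:
#   model_key (+ optional size/max) -> sequential allocation; the response
#                                      carries the allocated [start, end].
#   reserve_list                    -> every id in the given complete keys is
#                                      reserved; start and end are both 0.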
3167 def _SetupIndexes(self, _open=open):
3168 """Ensure that the set of existing composite indexes matches index.yaml.
3170 Note: this is similar to the algorithm used by the admin console for
3171 the same purpose.
3176 if not self._root_path:
3177 return
3178 index_yaml_file = os.path.join(self._root_path, 'index.yaml')
3179 if (self._cached_yaml[0] == index_yaml_file and
3180 os.path.exists(index_yaml_file) and
3181 os.path.getmtime(index_yaml_file) == self._cached_yaml[1]):
3182 requested_indexes = self._cached_yaml[2]
3183 else:
3184 try:
3185 index_yaml_mtime = os.path.getmtime(index_yaml_file)
3186 fh = _open(index_yaml_file, 'r')
3187 except (OSError, IOError):
3188 index_yaml_data = None
3189 else:
3190 try:
3191 index_yaml_data = fh.read()
3192 finally:
3193 fh.close()
3195 requested_indexes = []
3196 if index_yaml_data is not None:
3198 index_defs = datastore_index.ParseIndexDefinitions(index_yaml_data)
3199 if index_defs is not None and index_defs.indexes is not None:
3201 requested_indexes = datastore_index.IndexDefinitionsToProtos(
3202 self._app_id,
3203 index_defs.indexes)
3204 self._cached_yaml = (index_yaml_file, index_yaml_mtime,
3205 requested_indexes)
3208 existing_indexes = self._datastore.GetIndexes(
3209 self._app_id, self._trusted, self._app_id)
3212 requested = dict((x.definition().Encode(), x) for x in requested_indexes)
3213 existing = dict((x.definition().Encode(), x) for x in existing_indexes)
3216 created = 0
3217 for key, index in requested.iteritems():
3218 if key not in existing:
3219 new_index = entity_pb.CompositeIndex()
3220 new_index.CopyFrom(index)
3221 new_index.set_id(datastore_admin.CreateIndex(new_index))
3222 new_index.set_state(entity_pb.CompositeIndex.READ_WRITE)
3223 datastore_admin.UpdateIndex(new_index)
3224 created += 1
3227 deleted = 0
3228 for key, index in existing.iteritems():
3229 if key not in requested:
3230 datastore_admin.DeleteIndex(index)
3231 deleted += 1
3234 if created or deleted:
3235 logging.debug('Created %d and deleted %d index(es); total %d',
3236 created, deleted, len(requested))
3238 def _UpdateIndexes(self):
3239 if self._index_yaml_updater is not None:
3240 self._index_yaml_updater.UpdateIndexYaml()
3243 class StubQueryConverter(object):
3244 """Converter for v3 and v4 queries suitable for use in stubs."""
3246 def __init__(self, entity_converter):
3247 self._entity_converter = entity_converter
3249 def v4_to_v3_compiled_cursor(self, v4_cursor, v3_compiled_cursor):
3250 """Converts a v4 cursor string to a v3 CompiledCursor.
3252 Args:
3253 v4_cursor: a string representing a v4 query cursor
3254 v3_compiled_cursor: a datastore_pb.CompiledCursor to populate
3256 v3_compiled_cursor.Clear()
3257 try:
3258 v3_compiled_cursor.ParseFromString(v4_cursor)
3259 except ProtocolBuffer.ProtocolBufferDecodeError:
3260 raise datastore_pbs.InvalidConversionError('Invalid query cursor.')
3262 def v3_to_v4_compiled_cursor(self, v3_compiled_cursor):
3263 """Converts a v3 CompiledCursor to a v4 cursor string.
3265 Args:
3266 v3_compiled_cursor: a datastore_pb.CompiledCursor
3268 Returns:
3269 a string representing a v4 query cursor
3271 return v3_compiled_cursor.SerializeToString()
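# Sketch: in this stub a v4 cursor is simply the serialized v3 proto, so the
# two conversions above are inverses of each other:
#
#   s = converter.v3_to_v4_compiled_cursor(v3_cc)
#   v3_cc2 = datastore_pb.CompiledCursor()
#   converter.v4_to_v3_compiled_cursor(s, v3_cc2)  # v3_cc2 now equals v3_cc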
3273 def v4_to_v3_query(self, v4_partition_id, v4_query, v3_query):
3274 """Converts a v4 Query to a v3 Query.
3276 Args:
3277 v4_partition_id: a datastore_v4_pb.PartitionId
3278 v4_query: a datastore_v4_pb.Query
3279 v3_query: a datastore_pb.Query to populate
3281 Raises:
3282 InvalidConversionError if the query cannot be converted
3284 v3_query.Clear()
3286 if v4_partition_id.dataset_id():
3287 v3_query.set_app(v4_partition_id.dataset_id())
3288 if v4_partition_id.has_namespace():
3289 v3_query.set_name_space(v4_partition_id.namespace())
3291 v3_query.set_persist_offset(True)
3292 v3_query.set_require_perfect_plan(True)
3293 v3_query.set_compile(True)
3296 if v4_query.has_limit():
3297 v3_query.set_limit(v4_query.limit())
3298 if v4_query.offset():
3299 v3_query.set_offset(v4_query.offset())
3300 if v4_query.has_start_cursor():
3301 self.v4_to_v3_compiled_cursor(v4_query.start_cursor(),
3302 v3_query.mutable_compiled_cursor())
3303 if v4_query.has_end_cursor():
3304 self.v4_to_v3_compiled_cursor(v4_query.end_cursor(),
3305 v3_query.mutable_end_compiled_cursor())
3308 if v4_query.kind_list():
3309 datastore_pbs.check_conversion(len(v4_query.kind_list()) == 1,
3310 'multiple kinds not supported')
3311 v3_query.set_kind(v4_query.kind(0).name())
3314 has_key_projection = False
3315 for prop in v4_query.projection_list():
3316 if prop.property().name() == datastore_pbs.PROPERTY_NAME_KEY:
3317 has_key_projection = True
3318 else:
3319 v3_query.add_property_name(prop.property().name())
3320 if has_key_projection and not v3_query.property_name_list():
3321 v3_query.set_keys_only(True)
3324 for prop in v4_query.group_by_list():
3325 v3_query.add_group_by_property_name(prop.name())
3328 self.__populate_v3_filters(v4_query.filter(), v3_query)
3331 for v4_order in v4_query.order_list():
3332 v3_order = v3_query.add_order()
3333 v3_order.set_property(v4_order.property().name())
3334 if v4_order.has_direction():
3335 v3_order.set_direction(v4_order.direction())
3337 def v3_to_v4_query(self, v3_query, v4_query):
3338 """Converts a v3 Query to a v4 Query.
3340 Args:
3341 v3_query: a datastore_pb.Query
3342 v4_query: a datastore_v4_pb.Query to populate
3344 Raises:
3345 InvalidConversionError if the query cannot be converted
3347 v4_query.Clear()
3349 datastore_pbs.check_conversion(not v3_query.has_distinct(),
3350 'distinct option not supported')
3351 datastore_pbs.check_conversion(v3_query.require_perfect_plan(),
3352 'non-perfect plans not supported')
3356 if v3_query.has_limit():
3357 v4_query.set_limit(v3_query.limit())
3358 if v3_query.offset():
3359 v4_query.set_offset(v3_query.offset())
3360 if v3_query.has_compiled_cursor():
3361 v4_query.set_start_cursor(
3362 self.v3_to_v4_compiled_cursor(v3_query.compiled_cursor()))
3363 if v3_query.has_end_compiled_cursor():
3364 v4_query.set_end_cursor(
3365 self.v3_to_v4_compiled_cursor(v3_query.end_compiled_cursor()))
3368 if v3_query.has_kind():
3369 v4_query.add_kind().set_name(v3_query.kind())
3372 for name in v3_query.property_name_list():
3373 v4_query.add_projection().mutable_property().set_name(name)
3374 if v3_query.keys_only():
3375 v4_query.add_projection().mutable_property().set_name(
3376 datastore_pbs.PROPERTY_NAME_KEY)
3379 for name in v3_query.group_by_property_name_list():
3380 v4_query.add_group_by().set_name(name)
3383 num_v4_filters = len(v3_query.filter_list())
3384 if v3_query.has_ancestor():
3385 num_v4_filters += 1
3387 if num_v4_filters == 1:
3388 get_property_filter = self.__get_property_filter
3389 elif num_v4_filters > 1:
3390 v4_query.mutable_filter().mutable_composite_filter().set_operator(
3391 datastore_v4_pb.CompositeFilter.AND)
3392 get_property_filter = self.__add_property_filter
3394 if v3_query.has_ancestor():
3395 self.__v3_query_to_v4_ancestor_filter(v3_query,
3396 get_property_filter(v4_query))
3397 for v3_filter in v3_query.filter_list():
3398 self.__v3_filter_to_v4_property_filter(v3_filter,
3399 get_property_filter(v4_query))
3402 for v3_order in v3_query.order_list():
3403 v4_order = v4_query.add_order()
3404 v4_order.mutable_property().set_name(v3_order.property())
3405 if v3_order.has_direction():
3406 v4_order.set_direction(v3_order.direction())
3408 def __get_property_filter(self, v4_query):
3409 """Returns the PropertyFilter from the query's top-level filter."""
3410 return v4_query.mutable_filter().mutable_property_filter()
3412 def __add_property_filter(self, v4_query):
3413 """Adds and returns a PropertyFilter from the query's composite filter."""
3414 v4_comp_filter = v4_query.mutable_filter().mutable_composite_filter()
3415 return v4_comp_filter.add_filter().mutable_property_filter()
3417 def __populate_v3_filters(self, v4_filter, v3_query):
3418 """Populates a filters for a v3 Query.
3420 Args:
3421 v4_filter: a datastore_v4_pb.Filter
3422 v3_query: a datastore_pb.Query to populate with filters
3425 datastore_pbs.check_conversion(not v4_filter.has_bounding_circle_filter(),
3426 'bounding circle filter not supported')
3427 datastore_pbs.check_conversion(not v4_filter.has_bounding_box_filter(),
3428 'bounding box filter not supported')
3430 if v4_filter.has_property_filter():
3431 v4_property_filter = v4_filter.property_filter()
3432 if (v4_property_filter.operator()
3433 == datastore_v4_pb.PropertyFilter.HAS_ANCESTOR):
3434 datastore_pbs.check_conversion(
3435 v4_property_filter.value().has_key_value(),
3436 'HAS_ANCESTOR requires a reference value')
3437 datastore_pbs.check_conversion((v4_property_filter.property().name()
3438 == datastore_pbs.PROPERTY_NAME_KEY),
3439 'unsupported property')
3440 datastore_pbs.check_conversion(not v3_query.has_ancestor(),
3441 'duplicate ancestor constraint')
3442 self._entity_converter.v4_to_v3_reference(
3443 v4_property_filter.value().key_value(),
3444 v3_query.mutable_ancestor())
3445 else:
3446 v3_filter = v3_query.add_filter()
3447 property_name = v4_property_filter.property().name()
3448 v3_filter.set_op(v4_property_filter.operator())
3449 datastore_pbs.check_conversion(
3450 not v4_property_filter.value().list_value_list(),
3451 ('unsupported value type, %s, in property filter'
3452 ' on "%s"' % ('list_value', property_name)))
3453 prop = v3_filter.add_property()
3454 prop.set_multiple(False)
3455 prop.set_name(property_name)
3456 self._entity_converter.v4_value_to_v3_property_value(
3457 v4_property_filter.value(), prop.mutable_value())
3458 elif v4_filter.has_composite_filter():
3459 datastore_pbs.check_conversion((v4_filter.composite_filter().operator()
3460 == datastore_v4_pb.CompositeFilter.AND),
3461 'unsupported composite property operator')
3462 for v4_sub_filter in v4_filter.composite_filter().filter_list():
3463 self.__populate_v3_filters(v4_sub_filter, v3_query)
3465 def __v3_filter_to_v4_property_filter(self, v3_filter, v4_property_filter):
3466 """Converts a v3 Filter to a v4 PropertyFilter.
3468 Args:
3469 v3_filter: a datastore_pb.Filter
3470 v4_property_filter: a datastore_v4_pb.PropertyFilter to populate
3472 Raises:
3473 InvalidConversionError if the filter cannot be converted
3475 datastore_pbs.check_conversion(v3_filter.property_size() == 1,
3476 'invalid filter')
3477 datastore_pbs.check_conversion(v3_filter.op() <= 5,
3478 'unsupported filter op: %d' % v3_filter.op())
3479 v4_property_filter.Clear()
3480 v4_property_filter.set_operator(v3_filter.op())
3481 v4_property_filter.mutable_property().set_name(v3_filter.property(0).name())
3482 self._entity_converter.v3_property_to_v4_value(
3483 v3_filter.property(0), True, v4_property_filter.mutable_value())
3485 def __v3_query_to_v4_ancestor_filter(self, v3_query, v4_property_filter):
3486 """Converts a v3 Query to a v4 ancestor PropertyFilter.
3488 Args:
3489 v3_query: a datastore_pb.Query
3490 v4_property_filter: a datastore_v4_pb.PropertyFilter to populate
3492 v4_property_filter.Clear()
3493 v4_property_filter.set_operator(
3494 datastore_v4_pb.PropertyFilter.HAS_ANCESTOR)
3495 prop = v4_property_filter.mutable_property()
3496 prop.set_name(datastore_pbs.PROPERTY_NAME_KEY)
3497 self._entity_converter.v3_to_v4_key(
3498 v3_query.ancestor(),
3499 v4_property_filter.mutable_value().mutable_key_value())
3503 __query_converter = StubQueryConverter(datastore_pbs.get_entity_converter())
3506 def get_query_converter():
3507 """Returns a converter for v3 and v4 queries (not suitable for production).
3509 This converter is suitable for use in stubs but not for production.
3511 Returns:
3512 a StubQueryConverter
3514 return __query_converter
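# Sketch usage ('v4_partition_id' and 'v4_query' are assumed inputs):
#
#   converter = get_query_converter()
#   v3_query = datastore_pb.Query()
#   converter.v4_to_v3_query(v4_partition_id, v4_query, v3_query)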
3517 class StubServiceConverter(object):
3518 """Converter for v3/v4 request/response protos suitable for use in stubs."""
3520 def __init__(self, entity_converter, query_converter):
3521 self._entity_converter = entity_converter
3522 self._query_converter = query_converter
3524 def v4_to_v3_cursor(self, v4_query_handle, v3_cursor):
3525 """Converts a v4 cursor string to a v3 Cursor.
3527 Args:
3528 v4_query_handle: a string representing a v4 query handle
3529 v3_cursor: a datastore_pb.Cursor to populate
3531 try:
3532 v3_cursor.ParseFromString(v4_query_handle)
3533 except ProtocolBuffer.ProtocolBufferDecodeError:
3534 raise datastore_pbs.InvalidConversionError('Invalid query handle.')
3535 return v3_cursor
3537 def _v3_to_v4_query_handle(self, v3_cursor):
3538 """Converts a v3 Cursor to a v4 query handle string.
3540 Args:
3541 v3_cursor: a datastore_pb.Cursor
3543 Returns:
3544 a string representing a v4 cursor
3546 return v3_cursor.SerializeToString()
3548 def v4_to_v3_txn(self, v4_txn, v3_txn):
3549 """Converts a v4 transaction string to a v3 Transaction.
3551 Args:
3552 v4_txn: a string representing a v4 transaction
3553 v3_txn: a datastore_pb.Transaction to populate
3555 try:
3556 v3_txn.ParseFromString(v4_txn)
3557 except ProtocolBuffer.ProtocolBufferDecodeError:
3558 raise datastore_pbs.InvalidConversionError('Invalid transaction.')
3559 return v3_txn
3561 def _v3_to_v4_txn(self, v3_txn):
3562 """Converts a v3 Transaction to a v4 transaction string.
3564 Args:
3565 v3_txn: a datastore_pb.Transaction
3567 Returns:
3568 a string representing a v4 transaction
3570 return v3_txn.SerializeToString()
3575 def v4_to_v3_begin_transaction_req(self, app_id, v4_req):
3576 """Converts a v4 BeginTransactionRequest to a v3 BeginTransactionRequest.
3578 Args:
3579 app_id: app id
3580 v4_req: a datastore_v4_pb.BeginTransactionRequest
3582 Returns:
3583 a datastore_pb.BeginTransactionRequest
3585 v3_req = datastore_pb.BeginTransactionRequest()
3586 v3_req.set_app(app_id)
3587 v3_req.set_allow_multiple_eg(v4_req.cross_group())
3588 return v3_req
3590 def v3_to_v4_begin_transaction_resp(self, v3_resp):
3591 """Converts a v3 Transaction to a v4 BeginTransactionResponse.
3593 Args:
3594 v3_resp: a datastore_pb.Transaction
3596 Returns:
3597 a datastore_v4_pb.BeginTransactionResponse
3599 v4_resp = datastore_v4_pb.BeginTransactionResponse()
3600 v4_resp.set_transaction(self._v3_to_v4_txn(v3_resp))
3601 return v4_resp
3606 def v4_rollback_req_to_v3_txn(self, v4_req):
3607 """Converts a v4 RollbackRequest to a v3 Transaction.
3609 Args:
3610 v4_req: a datastore_v4_pb.RollbackRequest
3612 Returns:
3613 a datastore_pb.Transaction
3615 v3_txn = datastore_pb.Transaction()
3616 self.v4_to_v3_txn(v4_req.transaction(), v3_txn)
3617 return v3_txn
3622 def v4_commit_req_to_v3_txn(self, v4_req):
3623 """Converts a v4 CommitRequest to a v3 Transaction.
3625 Args:
3626 v4_req: a datastore_v4_pb.CommitRequest
3628 Returns:
3629 a datastore_pb.Transaction
3631 v3_txn = datastore_pb.Transaction()
3632 self.v4_to_v3_txn(v4_req.transaction(), v3_txn)
3633 return v3_txn
3638 def v4_run_query_req_to_v3_query(self, v4_req):
3639 """Converts a v4 RunQueryRequest to a v3 Query.
3641 GQL is not supported.
3643 Args:
3644 v4_req: a datastore_v4_pb.RunQueryRequest
3646 Returns:
3647 a datastore_pb.Query
3650 datastore_pbs.check_conversion(not v4_req.has_gql_query(),
3651 'GQL not supported')
3652 v3_query = datastore_pb.Query()
3653 self._query_converter.v4_to_v3_query(v4_req.partition_id(), v4_req.query(),
3654 v3_query)
3657 if v4_req.has_suggested_batch_size():
3658 v3_query.set_count(v4_req.suggested_batch_size())
3661 read_options = v4_req.read_options()
3662 if read_options.has_transaction():
3663 self.v4_to_v3_txn(read_options.transaction(),
3664 v3_query.mutable_transaction())
3665 elif (read_options.read_consistency()
3666 == datastore_v4_pb.ReadOptions.EVENTUAL):
3667 v3_query.set_strong(False)
3668 v3_query.set_failover_ms(-1)
3669 elif read_options.read_consistency() == datastore_v4_pb.ReadOptions.STRONG:
3670 v3_query.set_strong(True)
3672 if v4_req.has_min_safe_time_seconds():
3673 v3_query.set_min_safe_time_seconds(v4_req.min_safe_time_seconds())
3675 return v3_query
3677 def v3_to_v4_run_query_req(self, v3_req):
3678 """Converts a v3 Query to a v4 RunQueryRequest.
3680 Args:
3681 v3_req: a datastore_pb.Query
3683 Returns:
3684 a datastore_v4_pb.RunQueryRequest
3686 v4_req = datastore_v4_pb.RunQueryRequest()
3689 v4_partition_id = v4_req.mutable_partition_id()
3690 v4_partition_id.set_dataset_id(v3_req.app())
3691 if v3_req.name_space():
3692 v4_partition_id.set_namespace(v3_req.name_space())
3695 if v3_req.has_count():
3696 v4_req.set_suggested_batch_size(v3_req.count())
3698 datastore_pbs.check_conversion(
3699 not (v3_req.has_transaction() and v3_req.has_failover_ms()),
3700 'Cannot set failover and transaction handle.')
3703 if v3_req.has_transaction():
3704 v4_req.mutable_read_options().set_transaction(
3705 self._v3_to_v4_txn(v3_req.transaction()))
3706 elif v3_req.strong():
3707 v4_req.mutable_read_options().set_read_consistency(
3708 datastore_v4_pb.ReadOptions.STRONG)
3709 elif v3_req.has_failover_ms():
3710 v4_req.mutable_read_options().set_read_consistency(
3711 datastore_v4_pb.ReadOptions.EVENTUAL)
3712 if v3_req.has_min_safe_time_seconds():
3713 v4_req.set_min_safe_time_seconds(v3_req.min_safe_time_seconds())
3715 self._query_converter.v3_to_v4_query(v3_req, v4_req.mutable_query())
3717 return v4_req
3719 def v4_run_query_resp_to_v3_query_result(self, v4_resp):
3720 """Converts a V4 RunQueryResponse to a v3 QueryResult.
3722 Args:
3723 v4_resp: a datastore_v4_pb.RunQueryResponse
3725 Returns:
3726 a datastore_pb.QueryResult
3728 v3_resp = self.v4_to_v3_query_result(v4_resp.batch())
3731 if v4_resp.has_query_handle():
3732 self.v4_to_v3_cursor(v4_resp.query_handle(), v3_resp.mutable_cursor())
3734 return v3_resp
3736 def v3_to_v4_run_query_resp(self, v3_resp):
3737 """Converts a v3 QueryResult to a V4 RunQueryResponse.
3739 Args:
3740 v3_resp: a datastore_pb.QueryResult
3742 Returns:
3743 a datastore_v4_pb.RunQueryResponse
3745 v4_resp = datastore_v4_pb.RunQueryResponse()
3746 self.v3_to_v4_query_result_batch(v3_resp, v4_resp.mutable_batch())
3748 if v3_resp.has_cursor():
3749 v4_resp.set_query_handle(
3750 self._query_converter.v3_to_v4_compiled_cursor(v3_resp.cursor()))
3752 return v4_resp
3757 def v4_to_v3_next_req(self, v4_req):
3758 """Converts a v4 ContinueQueryRequest to a v3 NextRequest.
3760 Args:
3761 v4_req: a datastore_v4_pb.ContinueQueryRequest
3763 Returns:
3764 a datastore_pb.NextRequest
3766 v3_req = datastore_pb.NextRequest()
3767 v3_req.set_compile(True)
3768 self.v4_to_v3_cursor(v4_req.query_handle(), v3_req.mutable_cursor())
3769 return v3_req
3771 def v3_to_v4_continue_query_resp(self, v3_resp):
3772 """Converts a v3 QueryResult to a v4 ContinueQueryResponse.
3774 Args:
3775 v3_resp: a datastore_pb.QueryResult
3777 Returns:
3778 a datastore_v4_pb.ContinueQueryResponse
3780 v4_resp = datastore_v4_pb.ContinueQueryResponse()
3781 self.v3_to_v4_query_result_batch(v3_resp, v4_resp.mutable_batch())
3782 return v4_resp
3787 def v4_to_v3_get_req(self, v4_req):
3788 """Converts a v4 LookupRequest to a v3 GetRequest.
3790 Args:
3791 v4_req: a datastore_v4_pb.LookupRequest
3793 Returns:
3794 a datastore_pb.GetRequest
3796 v3_req = datastore_pb.GetRequest()
3797 v3_req.set_allow_deferred(True)
3800 if v4_req.read_options().has_transaction():
3801 self.v4_to_v3_txn(v4_req.read_options().transaction(),
3802 v3_req.mutable_transaction())
3803 elif (v4_req.read_options().read_consistency()
3804 == datastore_v4_pb.ReadOptions.EVENTUAL):
3805 v3_req.set_strong(False)
3806 v3_req.set_failover_ms(-1)
3807 elif (v4_req.read_options().read_consistency()
3808 == datastore_v4_pb.ReadOptions.STRONG):
3809 v3_req.set_strong(True)
3811 for v4_key in v4_req.key_list():
3812 self._entity_converter.v4_to_v3_reference(v4_key, v3_req.add_key())
3814 return v3_req

  def v3_to_v4_lookup_resp(self, v3_resp):
    """Converts a v3 GetResponse to a v4 LookupResponse.

    Args:
      v3_resp: a datastore_pb.GetResponse

    Returns:
      a datastore_v4_pb.LookupResponse
    """
    v4_resp = datastore_v4_pb.LookupResponse()

    for v3_ref in v3_resp.deferred_list():
      self._entity_converter.v3_to_v4_key(v3_ref, v4_resp.add_deferred())
    for v3_entity in v3_resp.entity_list():
      if v3_entity.has_entity():
        self._entity_converter.v3_to_v4_entity(
            v3_entity.entity(),
            v4_resp.add_found().mutable_entity())
      if v3_entity.has_key():
        self._entity_converter.v3_to_v4_key(
            v3_entity.key(),
            v4_resp.add_missing().mutable_entity().mutable_key())

    return v4_resp
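
  # Illustrative note (editor's sketch, not in the original source): a v3
  # GetResponse.Entity with entity() set becomes a v4 "found" result, one
  # with only key() set becomes a v4 "missing" result, and v3 deferred
  # references come back as v4 "deferred" keys.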

  def v4_to_v3_query_result(self, v4_batch):
    """Converts a v4 QueryResultBatch to a v3 QueryResult.

    Args:
      v4_batch: a datastore_v4_pb.QueryResultBatch

    Returns:
      a datastore_pb.QueryResult
    """
    v3_result = datastore_pb.QueryResult()

    v3_result.set_more_results(
        (v4_batch.more_results()
         == datastore_v4_pb.QueryResultBatch.NOT_FINISHED))
    if v4_batch.has_end_cursor():
      self._query_converter.v4_to_v3_compiled_cursor(
          v4_batch.end_cursor(), v3_result.mutable_compiled_cursor())

    if v4_batch.entity_result_type() == datastore_v4_pb.EntityResult.PROJECTION:
      v3_result.set_index_only(True)
    elif v4_batch.entity_result_type() == datastore_v4_pb.EntityResult.KEY_ONLY:
      v3_result.set_keys_only(True)

    if v4_batch.has_skipped_results():
      v3_result.set_skipped_results(v4_batch.skipped_results())
    for v4_entity in v4_batch.entity_result_list():
      v3_entity = v3_result.add_result()
      self._entity_converter.v4_to_v3_entity(v4_entity.entity(), v3_entity)
      if v4_batch.entity_result_type() != datastore_v4_pb.EntityResult.FULL:
        # Projection and key-only results do not expose entity group info.
        v3_entity.clear_entity_group()

    return v3_result

  def v3_to_v4_query_result_batch(self, v3_result, v4_batch):
    """Converts a v3 QueryResult to a v4 QueryResultBatch.

    Args:
      v3_result: a datastore_pb.QueryResult
      v4_batch: a datastore_v4_pb.QueryResultBatch to populate
    """
    v4_batch.Clear()

    if v3_result.more_results():
      v4_batch.set_more_results(datastore_v4_pb.QueryResultBatch.NOT_FINISHED)
    else:
      v4_batch.set_more_results(
          datastore_v4_pb.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT)
    if v3_result.has_compiled_cursor():
      v4_batch.set_end_cursor(
          self._query_converter.v3_to_v4_compiled_cursor(
              v3_result.compiled_cursor()))

    if v3_result.keys_only():
      v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.KEY_ONLY)
    elif v3_result.index_only():
      v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.PROJECTION)
    else:
      v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.FULL)

    if v3_result.has_skipped_results():
      v4_batch.set_skipped_results(v3_result.skipped_results())
    for v3_entity in v3_result.result_list():
      v4_entity_result = datastore_v4_pb.EntityResult()
      self._entity_converter.v3_to_v4_entity(v3_entity,
                                             v4_entity_result.mutable_entity())
      v4_batch.entity_result_list().append(v4_entity_result)
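
  # Illustrative note (editor's sketch, not in the original source): the v3
  # result flags pick the v4 entity_result_type -- keys_only() -> KEY_ONLY,
  # index_only() -> PROJECTION, otherwise FULL -- the exact inverse of the
  # mapping in v4_to_v3_query_result above.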


__service_converter = StubServiceConverter(
    datastore_pbs.get_entity_converter(), __query_converter)


def get_service_converter():
  """Returns a converter for v3 and v4 service request/response protos.

  This converter is suitable for use in stubs but not for production.

  Returns:
    a StubServiceConverter
  """
  return __service_converter
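
# Usage sketch (editor's illustration; v4_req and v3_resp are assumed to be a
# datastore_v4_pb.LookupRequest and the stub's datastore_pb.GetResponse):
#
#   converter = get_service_converter()
#   v3_req = converter.v4_to_v3_get_req(v4_req)        # v4 -> v3 request
#   ...                                                # run the v3 stub call
#   v4_resp = converter.v3_to_v4_lookup_resp(v3_resp)  # v3 -> v4 response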


def ReverseBitsInt64(v):
  """Reverse the bits of a 64-bit integer.

  Args:
    v: Input integer of type 'int' or 'long'.

  Returns:
    Bit-reversed input as 'int' on 64-bit machines or as 'long' otherwise.
  """
  v = ((v >> 1) & 0x5555555555555555) | ((v & 0x5555555555555555) << 1)
  v = ((v >> 2) & 0x3333333333333333) | ((v & 0x3333333333333333) << 2)
  v = ((v >> 4) & 0x0F0F0F0F0F0F0F0F) | ((v & 0x0F0F0F0F0F0F0F0F) << 4)
  v = ((v >> 8) & 0x00FF00FF00FF00FF) | ((v & 0x00FF00FF00FF00FF) << 8)
  v = ((v >> 16) & 0x0000FFFF0000FFFF) | ((v & 0x0000FFFF0000FFFF) << 16)
  v = int((v >> 32) | (v << 32) & 0xFFFFFFFFFFFFFFFF)
  return v
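
# Illustrative example (editor's note, derived from the definition above, not
# in the original source): the swap network exchanges bit i with bit 63 - i,
# so the lowest bit lands at the top:
#
#   ReverseBitsInt64(1) == 1 << 63 == 9223372036854775808
#   ReverseBitsInt64(1 << 63) == 1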


def ToScatteredId(v):
  """Map counter value v to the scattered ID space.

  Translate to scattered ID space, then reverse bits.

  Args:
    v: Counter value from which to produce ID.

  Returns:
    Integer ID.

  Raises:
    datastore_errors.BadArgumentError if counter value exceeds the range of
    the scattered ID space.
  """
  if v >= _MAX_SCATTERED_COUNTER:
    raise datastore_errors.BadArgumentError('counter value too large (%d)' % v)
  return _MAX_SEQUENTIAL_ID + 1 + long(ReverseBitsInt64(v << _SCATTER_SHIFT))
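
# Worked example (editor's illustration, not in the original source): with
# _MAX_SEQUENTIAL_BIT = 52, _SCATTER_SHIFT is 13, so counter value 1 maps to
#
#   ToScatteredId(1) == _MAX_SEQUENTIAL_ID + 1 + ReverseBitsInt64(1 << 13)
#                    == 2**52 + 2**50
#
# and consecutive counter values land far apart in the scattered ID space.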


def IdToCounter(k):
  """Map ID k to the counter value from which it was generated.

  Determines whether k is a sequential or scattered ID.

  Args:
    k: ID from which to infer counter value.

  Returns:
    Tuple of integers (counter_value, id_space).
  """
  if k > _MAX_SCATTERED_ID:
    return 0, SCATTERED
  elif k > _MAX_SEQUENTIAL_ID and k <= _MAX_SCATTERED_ID:
    return long(ReverseBitsInt64(k) >> _SCATTER_SHIFT), SCATTERED
  elif k > 0:
    return long(k), SEQUENTIAL
  else:
    raise datastore_errors.BadArgumentError('invalid id (%d)' % k)
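
# Illustrative example (editor's note, not in the original source):
# IdToCounter inverts ToScatteredId for IDs in the scattered range and is the
# identity for sequential IDs:
#
#   IdToCounter(ToScatteredId(1)) == (1, SCATTERED)
#   IdToCounter(42) == (42, SEQUENTIAL)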


def CompareEntityPbByKey(a, b):
  """Compare two entity protobufs by key.

  Args:
    a: entity_pb.EntityProto to compare
    b: entity_pb.EntityProto to compare

  Returns:
    <0 if a's key is before b's, =0 if they are the same key, and >0 otherwise.
  """
  return cmp(datastore_types.Key._FromPb(a.key()),
             datastore_types.Key._FromPb(b.key()))
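
# Usage sketch (editor's illustration, not in the original source): as a
# cmp-style comparator this can be handed directly to Python 2 sorts, e.g.
#
#   entity_protos.sort(CompareEntityPbByKey)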


def _GuessOrders(filters, orders):
  """Guess any implicit ordering.

  The datastore gives a logical, but not necessarily predictable, ordering
  when orders are not completely explicit. This function guesses at that
  ordering (which is better than always ordering by __key__ for tests).

  Args:
    filters: The datastore_pb.Query_Filter list that has already been
      normalized and checked.
    orders: The datastore_pb.Query_Order list that has already been
      normalized and checked.

  Returns:
    The list of orders with any guessed implicit orders appended, always
    ending with __key__. The input list is copied, not mutated.
  """
  orders = orders[:]

  if not orders:
    for filter_pb in filters:
      if filter_pb.op() in datastore_index.INEQUALITY_OPERATORS:
        order = datastore_pb.Query_Order()
        order.set_property(filter_pb.property(0).name())
        orders.append(order)
        break

  exists_props = (filter_pb.property(0).name() for filter_pb in filters
                  if filter_pb.op() == datastore_pb.Query_Filter.EXISTS)
  for prop in sorted(exists_props):
    order = datastore_pb.Query_Order()
    order.set_property(prop)
    orders.append(order)

  if not orders or orders[-1].property() != '__key__':
    order = datastore_pb.Query_Order()
    order.set_property('__key__')
    orders.append(order)
  return orders
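
# Illustrative example (editor's note, not in the original source): for a
# query with the single inequality filter age > 18 and no explicit orders,
# the guessed ordering is [age, __key__] -- the inequality property first,
# then the key, matching the postfix of the index a real scan would use.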


def _MakeQuery(query_pb, filters, orders, filter_predicate):
  """Make a datastore_query.Query for the given datastore_pb.Query.

  Overrides filters and orders in query with the specified arguments.

  Args:
    query_pb: a datastore_pb.Query.
    filters: the filters from query.
    orders: the orders from query.
    filter_predicate: an additional filter of type
        datastore_query.FilterPredicate. This is passed along to implement V4
        specific filters without changing the entire stub.

  Returns:
    A datastore_query.Query for the datastore_pb.Query.
  """
  clone_pb = datastore_pb.Query()
  clone_pb.CopyFrom(query_pb)
  clone_pb.clear_filter()
  clone_pb.clear_order()
  clone_pb.filter_list().extend(filters)
  clone_pb.order_list().extend(orders)

  query = datastore_query.Query._from_pb(clone_pb)

  assert datastore_v4_pb.CompositeFilter._Operator_NAMES.values() == ['AND']

  if filter_predicate is not None:
    if query.filter_predicate is not None:
      filter_predicate = datastore_query.CompositeFilter(
          datastore_query.CompositeFilter.AND,
          [filter_predicate, query.filter_predicate])

    return datastore_query.Query(app=query.app,
                                 namespace=query.namespace,
                                 ancestor=query.ancestor,
                                 filter_predicate=filter_predicate,
                                 group_by=query.group_by,
                                 order=query.order)
  else:
    return query


def _CreateIndexEntities(entity, postfix_props):
  """Creates entities for index values that would appear in production.

  This function finds all multi-valued properties listed in postfix_props, and
  creates a new entity for each unique combination of values. The resulting
  entities will only have a single value for each property listed in
  postfix_props.

  It reserves the right to include index data that would not be
  seen in production, e.g. by returning the original entity when no splitting
  is needed. LoadEntity will remove any excess fields.

  This simulates the results seen by an index scan in the datastore.

  Args:
    entity: The entity_pb.EntityProto to split.
    postfix_props: A set of property names to split on.

  Returns:
    A list of the split entity_pb.EntityProtos.
  """
  to_split = {}
  split_required = False
  base_props = []
  for prop in entity.property_list():
    if prop.name() in postfix_props:
      values = to_split.get(prop.name())
      if values is None:
        values = []
        to_split[prop.name()] = values
      else:
        split_required = True
      if prop.value() not in values:
        values.append(prop.value())
    else:
      base_props.append(prop)

  if not split_required:
    return [entity]

  clone = entity_pb.EntityProto()
  clone.CopyFrom(entity)
  clone.clear_property()
  clone.property_list().extend(base_props)
  results = [clone]

  for name, splits in to_split.iteritems():
    if len(splits) == 1:
      for result in results:
        prop = result.add_property()
        prop.set_name(name)
        prop.set_multiple(False)
        prop.set_meaning(entity_pb.Property.INDEX_VALUE)
        prop.mutable_value().CopyFrom(splits[0])
      continue

    new_results = []
    for result in results:
      for split in splits:
        clone = entity_pb.EntityProto()
        clone.CopyFrom(result)
        prop = clone.add_property()
        prop.set_name(name)
        prop.set_multiple(False)
        prop.set_meaning(entity_pb.Property.INDEX_VALUE)
        prop.mutable_value().CopyFrom(split)
        new_results.append(clone)
    results = new_results
  return results
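
# Worked example (editor's illustration, not in the original source):
# splitting an entity with properties a = [1, 2] and b = 3 on
# postfix_props = set(['a', 'b']) yields two index entities, {a: 1, b: 3} and
# {a: 2, b: 3} -- one per unique combination of the multi-valued property's
# values.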


def _CreateIndexOnlyQueryResults(results, postfix_props):
  """Creates a result set similar to that returned by an index-only query."""
  new_results = []
  for result in results:
    new_results.extend(_CreateIndexEntities(result, postfix_props))
  return new_results


def _ExecuteQuery(results, query, filters, orders, index_list,
                  filter_predicate=None):
  """Executes the query on a superset of its results.

  Args:
    results: superset of results for query.
    query: a datastore_pb.Query.
    filters: the filters from query.
    orders: the orders from query.
    index_list: the list of indexes used by the query.
    filter_predicate: an additional filter of type
        datastore_query.FilterPredicate. This is passed along to implement V4
        specific filters without changing the entire stub.

  Returns:
    A ListCursor over the results of applying query to results.
  """
  orders = _GuessOrders(filters, orders)
  dsquery = _MakeQuery(query, filters, orders, filter_predicate)

  if query.property_name_size():
    results = _CreateIndexOnlyQueryResults(
        results, set(order.property() for order in orders))

  return ListCursor(query, dsquery, orders, index_list,
                    datastore_query.apply_query(dsquery, results))


def _UpdateCost(cost, entity_writes, index_writes):
  """Updates the provided cost.

  Args:
    cost: Out param. The cost object to update.
    entity_writes: The number of entity writes to add.
    index_writes: The number of index writes to add.
  """
  cost.set_entity_writes(cost.entity_writes() + entity_writes)
  cost.set_index_writes(cost.index_writes() + index_writes)


def _CalculateWriteOps(composite_indexes, old_entity, new_entity):
  """Determines number of entity and index writes needed to write new_entity.

  We assume that old_entity represents the current state of the Datastore.

  Args:
    composite_indexes: The composite_indexes for the kind of the entities.
    old_entity: Entity representing the current state in the Datastore.
    new_entity: Entity representing the desired state in the Datastore.

  Returns:
    A tuple of size 2, where the first value is the number of entity writes
    and the second value is the number of index writes.
  """
  if (old_entity is not None and
      old_entity.property_list() == new_entity.property_list()
      and old_entity.raw_property_list() == new_entity.raw_property_list()):
    return 0, 0

  index_writes = _ChangedIndexRows(composite_indexes, old_entity, new_entity)
  if old_entity is None:
    # A brand-new entity also requires a write for its key index entry.
    index_writes += 1

  return 1, index_writes
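
# Worked example (editor's illustration, following the code above, not in the
# original source): inserting a brand-new entity with one indexed property
# gives _ChangedIndexRows == 2 (an ascending and a descending single-property
# index row), plus 1 because old_entity is None, for a result of
# (1 entity write, 3 index writes).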


def _ChangedIndexRows(composite_indexes, old_entity, new_entity):
  """Determine the number of index rows that need to change.

  We assume that old_entity represents the current state of the Datastore.

  Args:
    composite_indexes: The composite_indexes for the kind of the entities.
    old_entity: Entity representing the current state in the Datastore.
    new_entity: Entity representing the desired state in the Datastore.

  Returns:
    The number of index rows that need to change.
  """

  # Maps property names to the set of serialized (hashable) property pbs
  # present on the old entity.
  unique_old_properties = collections.defaultdict(set)

  # Maps property names to the set of serialized property pbs present on the
  # new entity.
  unique_new_properties = collections.defaultdict(set)

  if old_entity is not None:
    for old_prop in old_entity.property_list():
      _PopulateUniquePropertiesSet(old_prop, unique_old_properties)

  # Maps property names to the number of values unchanged between old and new.
  unchanged = collections.defaultdict(int)

  for new_prop in new_entity.property_list():
    new_prop_as_str = _PopulateUniquePropertiesSet(
        new_prop, unique_new_properties)
    if new_prop_as_str in unique_old_properties[new_prop.name()]:
      unchanged[new_prop.name()] += 1

  all_property_names = set(unique_old_properties.iterkeys())
  all_property_names.update(unique_new_properties.iterkeys())
  all_property_names.update(unchanged.iterkeys())

  all_indexes = _GetEntityByPropertyIndexes(all_property_names)
  all_indexes.extend([comp.definition() for comp in composite_indexes])
  path_size = new_entity.key().path().element_size()
  writes = 0
  for index in all_indexes:
    # Ancestor indexes with multiple properties include a row for every
    # element of the entity's key path.
    ancestor_multiplier = 1
    if index.ancestor() and index.property_size() > 1:
      ancestor_multiplier = path_size
    writes += (_CalculateWritesForCompositeIndex(
        index, unique_old_properties, unique_new_properties, unchanged) *
               ancestor_multiplier)
  return writes


def _PopulateUniquePropertiesSet(prop, unique_properties):
  """Populates a set containing unique properties.

  Args:
    prop: An entity property.
    unique_properties: Dictionary mapping property names to a set of unique
      properties.

  Returns:
    The property pb in string (hashable) form.
  """
  if prop.multiple():
    prop = _CopyAndSetMultipleToFalse(prop)

  prop_as_str = prop.SerializePartialToString()
  unique_properties[prop.name()].add(prop_as_str)
  return prop_as_str


def _CalculateWritesForCompositeIndex(index, unique_old_properties,
                                      unique_new_properties,
                                      common_properties):
  """Calculate the number of writes required to maintain a specific Index.

  Args:
    index: The composite index.
    unique_old_properties: Dictionary mapping property names to a set of props
      present on the old entity.
    unique_new_properties: Dictionary mapping property names to a set of props
      present on the new entity.
    common_properties: Dictionary mapping property names to the number of
      properties with that name that are present on both the old and new
      entities.

  Returns:
    The number of writes required to maintain the provided index.
  """
  old_count = 1
  new_count = 1
  common_count = 1
  for prop in index.property_list():
    old_count *= len(unique_old_properties[prop.name()])
    new_count *= len(unique_new_properties[prop.name()])
    common_count *= common_properties[prop.name()]

  return (old_count - common_count) + (new_count - common_count)
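
# Worked example (editor's illustration, not in the original source): for an
# index on (a, b) with old values a in {1, 2}, b in {5} and new values
# a in {2, 3}, b in {5}, only the row (a=2, b=5) survives unchanged, so
# old_count = 2, new_count = 2, common_count = 1 and the index needs
# (2 - 1) + (2 - 1) = 2 row writes.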


def _GetEntityByPropertyIndexes(all_property_names):
  """Creates ascending and descending single-property indexes for each name."""
  indexes = []
  for prop_name in all_property_names:
    indexes.append(
        _SinglePropertyIndex(prop_name, entity_pb.Index_Property.ASCENDING))
    indexes.append(
        _SinglePropertyIndex(prop_name, entity_pb.Index_Property.DESCENDING))
  return indexes


def _SinglePropertyIndex(prop_name, direction):
  """Creates a single property Index for the given name and direction.

  Args:
    prop_name: The name of the single property on the Index.
    direction: The direction of the Index.

  Returns:
    A single property Index with the given property and direction.
  """
  index = entity_pb.Index()
  prop = index.add_property()
  prop.set_name(prop_name)
  prop.set_direction(direction)
  return index


def _CopyAndSetMultipleToFalse(prop):
  """Copy the provided Property and set its "multiple" attribute to False.

  Args:
    prop: The Property to copy.

  Returns:
    A copy of the given Property with its "multiple" attribute set to False.
  """

  # Clearing "multiple" makes otherwise-identical values of a multi-valued
  # property serialize to the same string, so they can be de-duplicated by
  # _PopulateUniquePropertiesSet.
  prop_copy = entity_pb.Property()
  prop_copy.MergeFrom(prop)
  prop_copy.set_multiple(False)
  return prop_copy


def _SetStartInclusive(position, first_direction):
  """Sets the start_inclusive field in position.

  Args:
    position: datastore_pb.Position
    first_direction: the first sort order from the query
        (a datastore_pb.Query_Order) or None
  """
  position.set_start_inclusive(
      position.before_ascending()
      != (first_direction == datastore_pb.Query_Order.DESCENDING))


def _SetBeforeAscending(position, first_direction):
  """Sets the before_ascending field in position.

  Args:
    position: datastore_pb.Position
    first_direction: the first sort order from the query
        (a datastore_pb.Query_Order) or None
  """
  position.set_before_ascending(
      position.start_inclusive()
      != (first_direction == datastore_pb.Query_Order.DESCENDING))
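
# Illustrative note (editor's observation, not in the original source): the
# two setters above encode the same XOR relation and act as inverses: for an
# ASCENDING (or absent) first sort order start_inclusive equals
# before_ascending, while for DESCENDING each is the negation of the other.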