#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Utility functions shared between the file and sqlite datastore stubs.

This module is internal and should not be used by client applications.
"""

try:
  import hashlib
  _MD5_FUNC = hashlib.md5
except ImportError:
  import md5
  _MD5_FUNC = md5.new

import atexit
import collections
import itertools
import logging
import os
import random
import struct
import threading
import time
import weakref

from google.net.proto import ProtocolBuffer
from google.appengine.datastore import entity_pb

from google.appengine.api import api_base_pb
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import datastore_admin
from google.appengine.api import datastore_errors
from google.appengine.api import datastore_types
from google.appengine.api.taskqueue import taskqueue_service_pb
from google.appengine.datastore import datastore_index
from google.appengine.datastore import datastore_pb
from google.appengine.datastore import datastore_pbs
from google.appengine.datastore import datastore_query
from google.appengine.datastore import datastore_stub_index
from google.appengine.datastore import datastore_v4_pb
from google.appengine.runtime import apiproxy_errors


# Maximum number of results returned in a single query batch.
_MAXIMUM_RESULTS = 300

# Maximum total byte size of the entities returned in a single query batch.
_MAXIMUM_QUERY_RESULT_BYTES = 2000000

# Maximum number of results that may be skipped with a query offset.
_MAX_QUERY_OFFSET = 1000

# Human-readable names for the entity_pb.PropertyValue types.
_PROPERTY_TYPE_NAMES = {
    0: 'NULL',
    entity_pb.PropertyValue.kint64Value: 'INT64',
    entity_pb.PropertyValue.kbooleanValue: 'BOOLEAN',
    entity_pb.PropertyValue.kstringValue: 'STRING',
    entity_pb.PropertyValue.kdoubleValue: 'DOUBLE',
    entity_pb.PropertyValue.kPointValueGroup: 'POINT',
    entity_pb.PropertyValue.kUserValueGroup: 'USER',
    entity_pb.PropertyValue.kReferenceValueGroup: 'REFERENCE'
}

# Threshold in the 16-bit key-hash space below which an entity is given a
# __scatter__ property (see _GetScatterProperty).
_SCATTER_PROPORTION = 32768

# Maximum number of entity groups a cross-group transaction may operate on.
_MAX_EG_PER_TXN = 5

# Property meanings whose values are raw byte strings and must therefore be
# stored unindexed.
_BLOB_MEANINGS = frozenset((entity_pb.Property.BLOB,
                            entity_pb.Property.ENTITY_PROTO,
                            entity_pb.Property.TEXT))

# Retry behavior for transient errors: number of attempts, initial delay,
# exponential backoff multiplier, and maximum delay (delays in milliseconds).
_RETRIES = 3
_INITIAL_RETRY_DELAY_MS = 100
_RETRY_DELAY_MULTIPLIER = 2
_MAX_RETRY_DELAY_MS = 120000

# Supported automatic id allocation policies.
SEQUENTIAL = 'sequential'
SCATTERED = 'scattered'

# Number of bits in the sequential id space.
_MAX_SEQUENTIAL_BIT = 52

# Maximum value of the sequential id counter, and hence of a sequential id.
_MAX_SEQUENTIAL_COUNTER = (1 << _MAX_SEQUENTIAL_BIT) - 1
_MAX_SEQUENTIAL_ID = _MAX_SEQUENTIAL_COUNTER

# Maximum value of the scattered id counter (one bit fewer than sequential).
_MAX_SCATTERED_COUNTER = (1 << (_MAX_SEQUENTIAL_BIT - 1)) - 1

# Upper bound of the scattered id space, which sits directly above the
# sequential id space.
_MAX_SCATTERED_ID = _MAX_SEQUENTIAL_ID + 1 + _MAX_SCATTERED_COUNTER

# Shift that spreads scattered counter values across the upper id space.
_SCATTER_SHIFT = 64 - _MAX_SEQUENTIAL_BIT + 1


def _GetScatterProperty(entity_proto):
  """Gets the scatter property for an object.

  For ease of implementation, this is not synchronized with the actual
  value on the App Engine server, but should work equally well.

  Note: This property may change, either here or in production. No client
  other than the mapper framework should rely on it directly.

  Returns:
    The PropertyValue of the scatter property or None if this entity should
    not have a scatter property.
  """
  hash_obj = _MD5_FUNC()
  for element in entity_proto.key().path().element_list():
    if element.has_name():
      hash_obj.update(element.name())
    elif element.has_id():
      hash_obj.update(str(element.id()))
  hash_bytes = hash_obj.digest()[0:2]
  (hash_int,) = struct.unpack('H', hash_bytes)

  if hash_int >= _SCATTER_PROPORTION:
    return None

  scatter_property = entity_pb.Property()
  scatter_property.set_name(datastore_types.SCATTER_SPECIAL_PROPERTY)
  scatter_property.set_meaning(entity_pb.Property.BYTESTRING)
  scatter_property.set_multiple(False)
  property_value = scatter_property.mutable_value()
  property_value.set_stringvalue(hash_bytes)
  return scatter_property
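

# Editor's illustrative sketch (not part of the original stub): whether an
# entity receives a __scatter__ property depends only on the first two MD5
# bytes of its key path, so with _SCATTER_PROPORTION = 32768 roughly half of
# the 65536 possible 16-bit hash values qualify. The kind and id below are
# hypothetical.
def _ExampleScatterProperty():
  entity = entity_pb.EntityProto()
  element = entity.mutable_key().path().add_element()
  element.set_type('Foo')
  element.set_id(42)
  scatter = _GetScatterProperty(entity)
  if scatter is not None:
    # Eligible entities carry the two hash bytes as a bytestring value.
    assert scatter.name() == datastore_types.SCATTER_SPECIAL_PROPERTY
    assert len(scatter.value().stringvalue()) == 2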


# Maps special property names to (is_visible, is_stored, property_func)
# tuples: whether the property is returned to users on load, whether it is
# persisted on store, and the function that computes it.
_SPECIAL_PROPERTY_MAP = {
    datastore_types.SCATTER_SPECIAL_PROPERTY:
        (False, True, _GetScatterProperty)
}


def GetInvisibleSpecialPropertyNames():
  """Gets the names of all non user-visible special properties."""
  invisible_names = []
  for name, value in _SPECIAL_PROPERTY_MAP.items():
    is_visible, _, _ = value
    if not is_visible:
      invisible_names.append(name)
  return invisible_names


def _PrepareSpecialProperties(entity_proto, is_load):
  """Computes special properties for loading or storing.

  Strips other special properties.
  """
  for i in xrange(entity_proto.property_size() - 1, -1, -1):
    if entity_proto.property(i).name() in _SPECIAL_PROPERTY_MAP:
      del entity_proto.property_list()[i]

  for is_visible, is_stored, property_func in _SPECIAL_PROPERTY_MAP.values():
    if is_load:
      should_process = is_visible
    else:
      should_process = is_stored

    if should_process:
      special_property = property_func(entity_proto)
      if special_property:
        entity_proto.property_list().append(special_property)


def _GetGroupByKey(entity, property_names):
  """Computes a key value that uniquely identifies the 'group' of an entity.

  Args:
    entity: The entity_pb.EntityProto for which to create the group key.
    property_names: The names of the properties in the group by clause.

  Returns:
    A hashable value that uniquely identifies the entity's 'group'.
  """
  return frozenset((prop.name(), prop.value().SerializeToString())
                   for prop in entity.property_list()
                   if prop.name() in property_names)


def PrepareSpecialPropertiesForStore(entity_proto):
  """Computes special properties for storing.

  Strips other special properties.
  """
  _PrepareSpecialProperties(entity_proto, False)


def LoadEntity(entity, keys_only=False, property_names=None):
  """Prepares an entity to be returned to the user.

  Args:
    entity: an entity_pb.EntityProto or None
    keys_only: if a keys-only result should be produced
    property_names: if not None or empty, causes a projected entity to be
        produced with the given properties.

  Returns:
    A user-friendly copy of entity or None.
  """
  if entity:
    clone = entity_pb.EntityProto()
    if property_names:

      clone.mutable_key().CopyFrom(entity.key())
      clone.mutable_entity_group()
      seen = set()
      for prop in entity.property_list():
        if prop.name() in property_names:

          Check(prop.name() not in seen,
                'datastore dev stub produced bad result',
                datastore_pb.Error.INTERNAL_ERROR)
          seen.add(prop.name())
          new_prop = clone.add_property()
          new_prop.set_name(prop.name())
          new_prop.set_meaning(entity_pb.Property.INDEX_VALUE)
          new_prop.mutable_value().CopyFrom(prop.value())
          new_prop.set_multiple(False)
    elif keys_only:

      clone.mutable_key().CopyFrom(entity.key())
      clone.mutable_entity_group()
    else:

      clone.CopyFrom(entity)
    PrepareSpecialPropertiesForLoad(clone)
    return clone
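

# Editor's illustrative sketch (not part of the original stub): projecting
# on 'name' returns a copy holding only that property, marked with the
# INDEX_VALUE meaning so callers can tell it is a partial entity. The kind
# and properties below are hypothetical.
def _ExampleLoadEntityProjection():
  entity = entity_pb.EntityProto()
  element = entity.mutable_key().path().add_element()
  element.set_type('Person')
  element.set_id(1)
  for name, value in (('name', 'bob'), ('role', 'admin')):
    prop = entity.add_property()
    prop.set_name(name)
    prop.set_multiple(False)
    prop.mutable_value().set_stringvalue(value)
  projected = LoadEntity(entity, property_names=frozenset(['name']))
  # Only the projected property survives, re-tagged as an index value.
  assert projected.property_size() == 1
  assert projected.property(0).meaning() == entity_pb.Property.INDEX_VALUE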


def StoreEntity(entity):
  """Prepares an entity for storing.

  Args:
    entity: an entity_pb.EntityProto to prepare

  Returns:
    A copy of entity that should be stored in its place.
  """
  clone = entity_pb.EntityProto()
  clone.CopyFrom(entity)

  PrepareSpecialPropertiesForStore(clone)
  return clone


def PrepareSpecialPropertiesForLoad(entity_proto):
  """Computes special properties that are user-visible.

  Strips other special properties.
  """
  _PrepareSpecialProperties(entity_proto, True)


def Check(test, msg='', error_code=datastore_pb.Error.BAD_REQUEST):
  """Raises an apiproxy_errors.ApplicationError if the condition is false.

  Args:
    test: A condition to test.
    msg: A string to return with the error.
    error_code: One of datastore_pb.Error to use as an error code.

  Raises:
    apiproxy_errors.ApplicationError: If test is false.
  """
  if not test:
    raise apiproxy_errors.ApplicationError(error_code, msg)


def CheckValidUTF8(string, desc):
  """Check that the given string is valid UTF-8.

  Args:
    string: the string to validate.
    desc: a description of the string being validated.

  Raises:
    apiproxy_errors.ApplicationError: if the string is not valid UTF-8.
  """
  try:
    string.decode('utf-8')
  except UnicodeDecodeError:
    Check(False, '%s is not valid UTF-8.' % desc)


def CheckAppId(request_trusted, request_app_id, app_id):
  """Check that this is the stub for app_id.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    app_id: An application ID.

  Raises:
    apiproxy_errors.ApplicationError: if this is not the stub for app_id.
  """
  assert app_id
  CheckValidUTF8(app_id, 'app id')
  Check(request_trusted or app_id == request_app_id,
        'app "%s" cannot access app "%s"\'s data' % (request_app_id, app_id))


def CheckReference(request_trusted,
                   request_app_id,
                   key,
                   require_id_or_name=True):
  """Check this key.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    key: entity_pb.Reference
    require_id_or_name: Boolean indicating if we should enforce the presence
        of an id or name in the last element of the key's path.

  Raises:
    apiproxy_errors.ApplicationError: if the key is invalid
  """
  assert isinstance(key, entity_pb.Reference)

  CheckAppId(request_trusted, request_app_id, key.app())

  Check(key.path().element_size() > 0, "key's path cannot be empty")

  if require_id_or_name:

    last_element = key.path().element_list()[-1]
    has_id_or_name = ((last_element.has_id() and last_element.id() != 0) or
                      (last_element.has_name() and last_element.name() != ''))
    if not has_id_or_name:
      raise datastore_errors.BadRequestError('missing key id/name')

  for elem in key.path().element_list():
    Check(not elem.has_id() or not elem.has_name(),
          'each key path element should have id or name but not both: %r'
          % key)
    CheckValidUTF8(elem.type(), 'key path element type')
    if elem.has_name():
      CheckValidUTF8(elem.name(), 'key path element name')


def CheckEntity(request_trusted, request_app_id, entity):
  """Check if this entity can be stored.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    entity: entity_pb.EntityProto

  Raises:
    apiproxy_errors.ApplicationError: if the entity is invalid
  """
  CheckReference(request_trusted, request_app_id, entity.key(), False)
  for prop in entity.property_list():
    CheckProperty(request_trusted, request_app_id, prop)
  for prop in entity.raw_property_list():
    CheckProperty(request_trusted, request_app_id, prop, indexed=False)


def CheckProperty(request_trusted, request_app_id, prop, indexed=True):
  """Check if this property can be stored.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    prop: entity_pb.Property
    indexed: Whether the property is indexed.

  Raises:
    apiproxy_errors.ApplicationError: if the property is invalid
  """
  name = prop.name()
  value = prop.value()
  meaning = prop.meaning()
  CheckValidUTF8(name, 'property name')
  Check(request_trusted or
        not datastore_types.RESERVED_PROPERTY_NAME.match(name),
        'cannot store entity with reserved property name \'%s\'' % name)
  Check(prop.meaning() != entity_pb.Property.INDEX_VALUE,
        'Entities with incomplete properties cannot be written.')
  is_blob = meaning in _BLOB_MEANINGS
  if indexed:
    Check(not is_blob,
          'BLOB, ENTITY_PROTO or TEXT property ' + name +
          ' must be in a raw_property field')
    max_length = datastore_types._MAX_STRING_LENGTH
  else:
    if is_blob:
      Check(value.has_stringvalue(),
            'BLOB / ENTITY_PROTO / TEXT raw property ' + name +
            ' must have a string value')
    max_length = datastore_types._MAX_RAW_PROPERTY_BYTES
  if meaning == entity_pb.Property.ATOM_LINK:
    max_length = datastore_types._MAX_LINK_PROPERTY_LENGTH

  CheckPropertyValue(name, value, max_length, meaning)


def CheckPropertyValue(name, value, max_length, meaning):
  """Check if this property value can be stored.

  Args:
    name: name of the property
    value: entity_pb.PropertyValue
    max_length: maximum length for string values
    meaning: meaning of the property

  Raises:
    apiproxy_errors.ApplicationError: if the property is invalid
  """
  num_values = (value.has_int64value() +
                value.has_stringvalue() +
                value.has_booleanvalue() +
                value.has_doublevalue() +
                value.has_pointvalue() +
                value.has_uservalue() +
                value.has_referencevalue())
  Check(num_values <= 1, 'PropertyValue for ' + name +
        ' has multiple value fields set')

  if value.has_stringvalue():

    # String length is checked in UTF-16 code units: drop the 2-byte BOM
    # that Python's utf-16 codec prepends, then divide by 2.
    s16 = value.stringvalue().decode('utf-8', 'replace').encode('utf-16')

    Check((len(s16) - 2) / 2 <= max_length,
          'Property %s is too long. Maximum length is %d.' % (name,
                                                              max_length))
    if (meaning not in _BLOB_MEANINGS and
        meaning != entity_pb.Property.BYTESTRING):
      CheckValidUTF8(value.stringvalue(), 'String property value')
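

# Editor's illustrative sketch (not part of the original stub): shows why
# CheckPropertyValue measures string length in UTF-16 code units. A character
# outside the Basic Multilingual Plane counts as two units (a surrogate
# pair), matching the (len(s16) - 2) / 2 computation above.
def _ExampleUtf16LengthCheck():
  def utf16_units(utf8_bytes):
    s16 = utf8_bytes.decode('utf-8', 'replace').encode('utf-16')
    return (len(s16) - 2) / 2  # subtract the 2-byte BOM, 2 bytes per unit
  assert utf16_units('abc') == 3
  assert utf16_units('\xc3\xa9') == 1           # U+00E9: one code unit
  assert utf16_units('\xf0\x9d\x84\x9e') == 2   # U+1D11E: surrogate pair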


def CheckTransaction(request_trusted, request_app_id, transaction):
  """Check that this transaction is valid.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    transaction: datastore_pb.Transaction

  Raises:
    apiproxy_errors.ApplicationError: if the transaction is not valid.
  """
  assert isinstance(transaction, datastore_pb.Transaction)
  CheckAppId(request_trusted, request_app_id, transaction.app())


def CheckQuery(query, filters, orders, max_query_components):
  """Check a datastore query with normalized filters and orders.

  Raises an ApplicationError if any of the following requirements is
  violated:
  - transactional queries must have an ancestor
  - queries must not be too large
    (sum of filters, orders, ancestor <= max_query_components)
  - the ancestor's (if any) app and namespace must match the query's app and
    namespace
  - kindless queries may only filter on __key__ and only sort on __key__
    ascending
  - multiple inequality (<, <=, >, >=) filters must all apply to the same
    property
  - filters on __key__ must compare to a reference in the same app and
    namespace as the query
  - if an inequality filter on prop X is used, the first order (if any) must
    be on X

  Args:
    query: query to validate
    filters: normalized (by datastore_index.Normalize) filters from query
    orders: normalized (by datastore_index.Normalize) orders from query
    max_query_components: limit on query complexity
  """
  Check(query.property_name_size() == 0 or not query.keys_only(),
        'projection and keys_only cannot both be set')

  projected_properties = set(query.property_name_list())
  for prop_name in query.property_name_list():
    Check(not datastore_types.RESERVED_PROPERTY_NAME.match(prop_name),
          'projections are not supported for the property: ' + prop_name)
  Check(len(projected_properties) == len(query.property_name_list()),
        'cannot project a property multiple times')

  key_prop_name = datastore_types.KEY_SPECIAL_PROPERTY
  unapplied_log_timestamp_us_name = (
      datastore_types._UNAPPLIED_LOG_TIMESTAMP_SPECIAL_PROPERTY)

  if query.has_transaction():

    Check(query.has_ancestor(),
          'Only ancestor queries are allowed inside transactions.')

  num_components = len(filters) + len(orders)
  if query.has_ancestor():
    num_components += 1
  Check(num_components <= max_query_components,
        'query is too large. may not have more than %s filters'
        ' + sort orders ancestor total' % max_query_components)

  if query.has_ancestor():
    ancestor = query.ancestor()
    Check(query.app() == ancestor.app(),
          'query app is %s but ancestor app is %s' %
          (query.app(), ancestor.app()))
    Check(query.name_space() == ancestor.name_space(),
          'query namespace is %s but ancestor namespace is %s' %
          (query.name_space(), ancestor.name_space()))

  if query.group_by_property_name_size():
    group_by_set = set(query.group_by_property_name_list())
    for order in orders:
      if not group_by_set:
        break
      Check(order.property() in group_by_set,
            'items in the group by clause must be specified first '
            'in the ordering')
      group_by_set.remove(order.property())

  ineq_prop_name = None
  for filter in filters:
    Check(filter.property_size() == 1,
          'Filter has %d properties, expected 1' % filter.property_size())

    prop = filter.property(0)
    prop_name = prop.name().decode('utf-8')

    if prop_name == key_prop_name:

      Check(prop.value().has_referencevalue(),
            '%s filter value must be a Key' % key_prop_name)
      ref_val = prop.value().referencevalue()
      Check(ref_val.app() == query.app(),
            '%s filter app is %s but query app is %s' %
            (key_prop_name, ref_val.app(), query.app()))
      Check(ref_val.name_space() == query.name_space(),
            '%s filter namespace is %s but query namespace is %s' %
            (key_prop_name, ref_val.name_space(), query.name_space()))

    if filter.op() in datastore_index.EQUALITY_OPERATORS:
      Check(prop_name not in projected_properties,
            'cannot use projection on a property with an equality filter')
    if (filter.op() in datastore_index.INEQUALITY_OPERATORS and
        prop_name != unapplied_log_timestamp_us_name):
      if ineq_prop_name is None:
        ineq_prop_name = prop_name
      else:
        Check(ineq_prop_name == prop_name,
              'Only one inequality filter per query is supported. '
              'Encountered both %s and %s' % (ineq_prop_name, prop_name))

  if (ineq_prop_name is not None
      and query.group_by_property_name_size() > 0
      and not orders):

    Check(ineq_prop_name in group_by_set,
          'Inequality filter on %s must also be a group by '
          'property when group by properties are set.'
          % (ineq_prop_name))

  if ineq_prop_name is not None and orders:

    first_order_prop = orders[0].property().decode('utf-8')
    Check(first_order_prop == ineq_prop_name,
          'The first sort property must be the same as the property '
          'to which the inequality filter is applied. In your query '
          'the first sort property is %s but the inequality filter '
          'is on %s' % (first_order_prop, ineq_prop_name))

  if not query.has_kind():

    for filter in filters:
      prop_name = filter.property(0).name().decode('utf-8')
      Check(prop_name == key_prop_name or
            prop_name == unapplied_log_timestamp_us_name,
            'kind is required for non-__key__ filters')
    for order in orders:
      prop_name = order.property().decode('utf-8')
      Check(prop_name == key_prop_name and
            order.direction() == datastore_pb.Query_Order.ASCENDING,
            'kind is required for all orders except __key__ ascending')
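

# Editor's illustrative sketch (not part of the original stub): CheckQuery
# rejects inequality filters on two different properties. The app id, kind,
# property names, and limit below are hypothetical.
def _ExampleRejectMultipleInequalities():
  query = datastore_pb.Query()
  query.set_app('example-app')
  query.set_kind('Foo')
  filters = []
  for prop_name in ('height', 'width'):
    f = datastore_pb.Query_Filter()
    f.set_op(datastore_pb.Query_Filter.GREATER_THAN)
    prop = f.add_property()
    prop.set_name(prop_name)
    prop.set_multiple(False)
    prop.mutable_value().set_int64value(10)
    filters.append(f)
  try:
    CheckQuery(query, filters, [], 100)
    assert False, 'expected an ApplicationError'
  except apiproxy_errors.ApplicationError:
    pass  # two inequality properties (height, width) are not allowed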


class ValueRange(object):
  """A range of values defined by its two extremes (inclusive or exclusive)."""

  def __init__(self):
    """Constructor.

    Creates an unlimited range.
    """
    self.__start = self.__end = None
    self.__start_inclusive = self.__end_inclusive = False

  def Update(self, rel_op, limit):
    """Filter the range by 'rel_op limit'.

    Args:
      rel_op: relational operator from datastore_pb.Query_Filter.
      limit: the value to limit the range by.
    """
    if rel_op == datastore_pb.Query_Filter.LESS_THAN:
      if self.__end is None or limit <= self.__end:
        self.__end = limit
        self.__end_inclusive = False
    elif (rel_op == datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL or
          rel_op == datastore_pb.Query_Filter.EQUAL):
      if self.__end is None or limit < self.__end:
        self.__end = limit
        self.__end_inclusive = True

    if rel_op == datastore_pb.Query_Filter.GREATER_THAN:
      if self.__start is None or limit >= self.__start:
        self.__start = limit
        self.__start_inclusive = False
    elif (rel_op == datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL or
          rel_op == datastore_pb.Query_Filter.EQUAL):
      if self.__start is None or limit > self.__start:
        self.__start = limit
        self.__start_inclusive = True

  def Contains(self, value):
    """Check if the range contains a specific value.

    Args:
      value: the value to check.
    Returns:
      True iff value is contained in this range.
    """
    if self.__start is not None:
      if self.__start_inclusive and value < self.__start: return False
      if not self.__start_inclusive and value <= self.__start: return False
    if self.__end is not None:
      if self.__end_inclusive and value > self.__end: return False
      if not self.__end_inclusive and value >= self.__end: return False
    return True

  def Remap(self, mapper):
    """Transforms the range extremes with a function.

    The function mapper must preserve order, i.e.
      x rel_op y iff mapper(x) rel_op mapper(y)

    Args:
      mapper: function to apply to the range extremes.
    """
    self.__start = self.__start and mapper(self.__start)
    self.__end = self.__end and mapper(self.__end)

  def MapExtremes(self, mapper):
    """Evaluate a function on the range extremes.

    Args:
      mapper: function to apply to the range extremes.
    Returns:
      (x, y) where x = None if the range has no start,
                       mapper(start, start_inclusive, False) otherwise
                   y = None if the range has no end,
                       mapper(end, end_inclusive, True) otherwise
    """
    return (
        self.__start and mapper(self.__start, self.__start_inclusive, False),
        self.__end and mapper(self.__end, self.__end_inclusive, True))
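

# Editor's illustrative sketch (not part of the original stub): combining
# the filters 'x > 3' and 'x <= 7' yields the half-open range (3, 7].
def _ExampleValueRange():
  r = ValueRange()
  r.Update(datastore_pb.Query_Filter.GREATER_THAN, 3)
  r.Update(datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL, 7)
  assert not r.Contains(3)              # start is exclusive
  assert r.Contains(4) and r.Contains(7)  # end is inclusive
  assert not r.Contains(8)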


def ParseKeyFilteredQuery(filters, orders):
  """Parse queries which only allow filters and ascending-orders on __key__.

  Raises exceptions for illegal queries.
  Args:
    filters: the normalized filters of a query.
    orders: the normalized orders of a query.
  Returns:
    The key range (a ValueRange over datastore_types.Key) requested in the
    query.
  """
  remaining_filters = []
  key_range = ValueRange()
  key_prop = datastore_types.KEY_SPECIAL_PROPERTY
  for f in filters:
    op = f.op()
    if not (f.property_size() == 1 and
            f.property(0).name() == key_prop and
            not (op == datastore_pb.Query_Filter.IN or
                 op == datastore_pb.Query_Filter.EXISTS)):
      remaining_filters.append(f)
      continue

    val = f.property(0).value()
    Check(val.has_referencevalue(), '__key__ kind must be compared to a key')
    limit = datastore_types.FromReferenceProperty(val)
    key_range.Update(op, limit)

  remaining_orders = []
  for o in orders:
    if not (o.direction() == datastore_pb.Query_Order.ASCENDING and
            o.property() == datastore_types.KEY_SPECIAL_PROPERTY):
      remaining_orders.append(o)
    else:
      break

  Check(not remaining_filters,
        'Only comparison filters on ' + key_prop + ' supported')
  Check(not remaining_orders,
        'Only ascending order on ' + key_prop + ' supported')

  return key_range


def ParseKindQuery(query, filters, orders):
  """Parse __kind__ (schema) queries.

  Raises exceptions for illegal queries.
  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.
  Returns:
    The kind range (a ValueRange over string) requested in the query.
  """
  Check(not query.has_ancestor(), 'ancestor queries on __kind__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(_KindKeyToString)

  return key_range


def _KindKeyToString(key):
  """Extract kind name from __kind__ key.

  Raises an ApplicationError if the key is not of the form '__kind__'/name.

  Args:
    key: a key for a __kind__ instance.
  Returns:
    kind specified by key.
  """
  key_path = key.to_path()
  if (len(key_path) == 2 and key_path[0] == '__kind__' and
      isinstance(key_path[1], basestring)):
    return key_path[1]
  Check(False, 'invalid Key for __kind__ table')


def ParseNamespaceQuery(query, filters, orders):
  """Parse __namespace__ queries.

  Raises exceptions for illegal queries.
  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.
  Returns:
    The namespace range (a ValueRange over string) requested in the query.
  """
  Check(not query.has_ancestor(),
        'ancestor queries on __namespace__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(_NamespaceKeyToString)

  return key_range


def _NamespaceKeyToString(key):
  """Extract namespace name from __namespace__ key.

  Raises an ApplicationError if the key is not of the form
  '__namespace__'/name or '__namespace__'/_EMPTY_NAMESPACE_ID.

  Args:
    key: a key for a __namespace__ instance.
  Returns:
    namespace specified by key.
  """
  key_path = key.to_path()
  if len(key_path) == 2 and key_path[0] == '__namespace__':
    if key_path[1] == datastore_types._EMPTY_NAMESPACE_ID:
      return ''
    if isinstance(key_path[1], basestring):
      return key_path[1]
  Check(False, 'invalid Key for __namespace__ table')


def ParsePropertyQuery(query, filters, orders):
  """Parse __property__ queries.

  Raises exceptions for illegal queries.
  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.
  Returns:
    The property range (a ValueRange over (kind, property) pairs) requested
    in the query.
  """
  Check(not query.has_transaction(),
        'transactional queries on __property__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(lambda x: _PropertyKeyToString(x, ''))

  if query.has_ancestor():
    ancestor = datastore_types.Key._FromPb(query.ancestor())
    ancestor_kind, ancestor_property = _PropertyKeyToString(ancestor, None)

    if ancestor_property is not None:
      key_range.Update(datastore_pb.Query_Filter.EQUAL,
                       (ancestor_kind, ancestor_property))
    else:

      key_range.Update(datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL,
                       (ancestor_kind, ''))
      key_range.Update(datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL,
                       (ancestor_kind + '\0', ''))
    query.clear_ancestor()

  return key_range


def _PropertyKeyToString(key, default_property):
  """Extract property name from __property__ key.

  Raises an ApplicationError if the key is not of the form
  '__kind__'/kind, '__property__'/property or '__kind__'/kind.

  Args:
    key: a key for a __property__ instance.
    default_property: property value to return when key only has a kind.
  Returns:
    kind, property if key = '__kind__'/kind, '__property__'/property
    kind, default_property if key = '__kind__'/kind
  """
  key_path = key.to_path()
  if (len(key_path) == 2 and
      key_path[0] == '__kind__' and isinstance(key_path[1], basestring)):
    return (key_path[1], default_property)
  if (len(key_path) == 4 and
      key_path[0] == '__kind__' and isinstance(key_path[1], basestring) and
      key_path[2] == '__property__' and isinstance(key_path[3], basestring)):
    return (key_path[1], key_path[3])

  Check(False, 'invalid Key for __property__ table')


def SynthesizeUserId(email):
  """Return a synthetic user ID from an email address.

  Note that this is not the same user ID found in the production system.

  Args:
    email: An email address.

  Returns:
    A string userid derived from the email address.
  """
  user_id_digest = _MD5_FUNC(email.lower()).digest()
  user_id = '1' + ''.join(['%02d' % ord(x) for x in user_id_digest])[:20]
  return user_id
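

# Editor's illustrative sketch (not part of the original stub): the
# synthetic ID is '1' followed by the first 20 characters of the MD5 digest
# bytes rendered as zero-padded decimals, so it is stable for a given email
# (case-insensitively) but unrelated to production user IDs. The email below
# is hypothetical.
def _ExampleSynthesizeUserId():
  uid = SynthesizeUserId('Alice@Example.com')
  assert uid == SynthesizeUserId('alice@example.com')  # lowercased first
  assert uid.startswith('1') and len(uid) == 21        # '1' + 20 digits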


def FillUsersInQuery(filters):
  """Fill in a synthetic user ID for all user properties in a set of filters.

  Args:
    filters: The normalized filters from query.
  """
  for filter in filters:
    for property in filter.property_list():
      FillUser(property)


def FillUser(property):
  """Fill in a synthetic user ID for a user property.

  Args:
    property: A Property which may have a user value.
  """
  if property.value().has_uservalue():
    uid = SynthesizeUserId(property.value().uservalue().email())
    if uid:
      property.mutable_value().mutable_uservalue().set_obfuscated_gaiaid(uid)


class BaseCursor(object):
  """A base query cursor over a list of entities.

  Public properties:
    cursor: the integer cursor.
    app: the app for which this cursor was created.
    keys_only: whether the query is keys_only.

  Class attributes:
    _next_cursor: the next cursor to allocate.
    _next_cursor_lock: protects _next_cursor.
  """
  _next_cursor = 1
  _next_cursor_lock = threading.Lock()

  def __init__(self, query, dsquery, orders, index_list):
    """Constructor.

    Args:
      query: the query request proto.
      dsquery: a datastore_query.Query over query.
      orders: the orders of query as returned by _GuessOrders.
      index_list: the list of indexes used by the query.
    """
    self.keys_only = query.keys_only()
    self.property_names = set(query.property_name_list())
    self.group_by = set(query.group_by_property_name_list())
    self.app = query.app()
    self.cursor = self._AcquireCursorID()

    self.__order_compare_entities = dsquery._order.cmp_for_filter(
        dsquery._filter_predicate)
    if self.group_by:
      self.__cursor_properties = self.group_by
    else:
      self.__cursor_properties = set(order.property() for order in orders)
      self.__cursor_properties.add('__key__')
      self.__cursor_properties = frozenset(self.__cursor_properties)
    self.__index_list = index_list

  def _PopulateResultMetadata(self, query_result, compile,
                              first_result, last_result):
    query_result.set_keys_only(self.keys_only)
    if query_result.more_results():
      cursor = query_result.mutable_cursor()
      cursor.set_app(self.app)
      cursor.set_cursor(self.cursor)
    if compile:
      self._EncodeCompiledCursor(last_result,
                                 query_result.mutable_compiled_cursor())
    if first_result:
      query_result.index_list().extend(self.__index_list)

  @classmethod
  def _AcquireCursorID(cls):
    """Acquires the next cursor id in a thread safe manner."""
    cls._next_cursor_lock.acquire()
    try:
      cursor_id = cls._next_cursor
      cls._next_cursor += 1
    finally:
      cls._next_cursor_lock.release()
    return cursor_id

  def _IsBeforeCursor(self, entity, cursor):
    """True if entity is before cursor according to the current order.

    Args:
      entity: an entity_pb.EntityProto entity.
      cursor: a compiled cursor as returned by _DecodeCompiledCursor.
    """
    comparison_entity = entity_pb.EntityProto()
    for prop in entity.property_list():
      if prop.name() in self.__cursor_properties:
        comparison_entity.add_property().MergeFrom(prop)
    if cursor[0].has_key():
      comparison_entity.mutable_key().MergeFrom(entity.key())
    x = self.__order_compare_entities(comparison_entity, cursor[0])
    if cursor[1]:
      return x < 0
    else:
      return x <= 0

  def _DecodeCompiledCursor(self, compiled_cursor):
    """Converts a compiled_cursor into a cursor_entity.

    Args:
      compiled_cursor: The datastore_pb.CompiledCursor to decode.

    Returns:
      (cursor_entity, inclusive): an entity_pb.EntityProto and whether it
      should be included in the result set.
    """
    assert len(compiled_cursor.position_list()) == 1

    position = compiled_cursor.position(0)

    remaining_properties = set(self.__cursor_properties)

    cursor_entity = entity_pb.EntityProto()
    if position.has_key():
      cursor_entity.mutable_key().CopyFrom(position.key())
      try:
        remaining_properties.remove('__key__')
      except KeyError:
        Check(False, 'Cursor does not match query: extra value __key__')
    for indexvalue in position.indexvalue_list():
      property = cursor_entity.add_property()
      property.set_name(indexvalue.property())
      property.mutable_value().CopyFrom(indexvalue.value())
      try:
        remaining_properties.remove(indexvalue.property())
      except KeyError:
        Check(False, 'Cursor does not match query: extra value %s' %
              indexvalue.property())
    Check(not remaining_properties,
          'Cursor does not match query: missing values for %r' %
          remaining_properties)

    return (cursor_entity, position.start_inclusive())

  def _EncodeCompiledCursor(self, last_result, compiled_cursor):
    """Converts the current state of the cursor into a compiled_cursor.

    Args:
      last_result: the last result returned by this query.
      compiled_cursor: an empty datastore_pb.CompiledCursor.
    """
    if last_result is not None:

      position = compiled_cursor.add_position()

      if '__key__' in self.__cursor_properties:
        position.mutable_key().MergeFrom(last_result.key())
      for prop in last_result.property_list():
        if prop.name() in self.__cursor_properties:
          indexvalue = position.add_indexvalue()
          indexvalue.set_property(prop.name())
          indexvalue.mutable_value().CopyFrom(prop.value())
      position.set_start_inclusive(False)


class ListCursor(BaseCursor):
  """A query cursor over a list of entities.

  Public properties:
    keys_only: whether the query is keys_only
  """

  def __init__(self, query, dsquery, orders, index_list, results):
    """Constructor.

    Args:
      query: the query request proto
      dsquery: a datastore_query.Query over query.
      orders: the orders of query as returned by _GuessOrders.
      index_list: the list of indexes used by the query.
      results: list of entity_pb.EntityProto
    """
    super(ListCursor, self).__init__(query, dsquery, orders, index_list)

    if self.group_by:
      distincts = set()
      new_results = []
      for result in results:
        key_value = _GetGroupByKey(result, self.group_by)
        if key_value not in distincts:
          distincts.add(key_value)
          new_results.append(result)
      results = new_results

    if query.has_compiled_cursor() and query.compiled_cursor().position_list():
      start_cursor = self._DecodeCompiledCursor(query.compiled_cursor())
      self.__last_result = start_cursor[0]
      start_cursor_position = self._GetCursorOffset(results, start_cursor)
    else:
      self.__last_result = None
      start_cursor_position = 0

    if query.has_end_compiled_cursor():
      if query.end_compiled_cursor().position_list():
        end_cursor = self._DecodeCompiledCursor(query.end_compiled_cursor())
        end_cursor_position = self._GetCursorOffset(results, end_cursor)
      else:
        end_cursor_position = 0
    else:
      end_cursor_position = len(results)

    results = results[start_cursor_position:end_cursor_position]

    if query.has_limit():
      limit = query.limit()
      if query.offset():
        limit += query.offset()
      if limit >= 0 and limit < len(results):
        results = results[:limit]

    self.__results = results
    self.__offset = 0
    self.__count = len(self.__results)

  def _GetCursorOffset(self, results, cursor):
    """Converts a cursor into an offset into the result set even if the
    cursor's entity no longer exists.

    Args:
      results: the query's results (sequence of entity_pb.EntityProto)
      cursor: a compiled cursor as returned by _DecodeCompiledCursor
    Returns:
      the integer offset
    """
    lo = 0
    hi = len(results)
    while lo < hi:
      mid = (lo + hi) // 2
      if self._IsBeforeCursor(results[mid], cursor):
        lo = mid + 1
      else:
        hi = mid
    return lo

  def PopulateQueryResult(self, result, count, offset,
                          compile=False, first_result=False):
    """Populates a QueryResult with this cursor and the given number of results.

    Args:
      result: datastore_pb.QueryResult
      count: integer of how many results to return
      offset: integer of how many results to skip
      compile: boolean, whether we are compiling this query
      first_result: whether the query result is the first for this query
    """
    Check(offset >= 0, 'Offset must be >= 0')

    offset = min(offset, self.__count - self.__offset)
    limited_offset = min(offset, _MAX_QUERY_OFFSET)
    if limited_offset:
      self.__offset += limited_offset
      result.set_skipped_results(limited_offset)

    if offset == limited_offset and count:

      if count > _MAXIMUM_RESULTS:
        count = _MAXIMUM_RESULTS
      results = self.__results[self.__offset:self.__offset + count]
      count = len(results)
      self.__offset += count

      result.result_list().extend(
          LoadEntity(entity, self.keys_only, self.property_names)
          for entity in results)

    if self.__offset:

      self.__last_result = self.__results[self.__offset - 1]

    result.set_more_results(self.__offset < self.__count)
    self._PopulateResultMetadata(result, compile,
                                 first_result, self.__last_result)


def _SynchronizeTxn(function):
  """A decorator that locks a transaction during the function call."""

  def sync(txn, *args, **kwargs):

    txn._lock.acquire()
    try:

      Check(txn._state is LiveTxn.ACTIVE, 'transaction closed')

      return function(txn, *args, **kwargs)
    finally:

      txn._lock.release()
  return sync


def _GetEntityGroup(ref):
  """Returns the entity group key for the given reference."""
  entity_group = entity_pb.Reference()
  entity_group.CopyFrom(ref)
  assert (entity_group.path().element_list()[0].has_id() or
          entity_group.path().element_list()[0].has_name())
  del entity_group.path().element_list()[1:]
  return entity_group
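

# Editor's illustrative sketch (not part of the original stub): the entity
# group of a key is its root path element, so Grandparent/Parent/Child all
# share the group Grandparent. The kinds, ids, and app id below are
# hypothetical.
def _ExampleEntityGroup():
  key = datastore_types.Key.from_path('Grandparent', 1, 'Parent', 2,
                                      'Child', 3, _app='example-app')._ToPb()
  group = _GetEntityGroup(key)
  assert group.path().element_size() == 1
  assert group.path().element(0).type() == 'Grandparent'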


def _GetKeyKind(key):
  """Return the kind of the given key."""
  return key.path().element_list()[-1].type()


def _FilterIndexesByKind(key, indexes):
  """Return only the indexes with the specified kind."""
  return filter((lambda index:
                 index.definition().entity_type() == _GetKeyKind(key)),
                indexes)


class LiveTxn(object):
  """An in-flight transaction."""

  ACTIVE = 1
  COMMITED = 2
  ROLLEDBACK = 3
  FAILED = 4

  _state = ACTIVE
  _commit_time_s = None

  def __init__(self, txn_manager, app, allow_multiple_eg):
    assert isinstance(txn_manager, BaseTransactionManager)
    assert isinstance(app, basestring)

    self._txn_manager = txn_manager
    self._app = app
    self._allow_multiple_eg = allow_multiple_eg

    self._entity_groups = {}

    self._lock = threading.RLock()
    self._apply_lock = threading.Lock()

    self._actions = []
    self._cost = datastore_pb.Cost()

    self._kind_to_indexes = collections.defaultdict(list)

  def _GetTracker(self, reference):
    """Gets the entity group tracker for reference.

    If this is the first time reference's entity group is seen, creates a new
    tracker, checking that the transaction doesn't exceed the entity group
    limit.
    """
    entity_group = _GetEntityGroup(reference)
    key = datastore_types.ReferenceToKeyValue(entity_group)
    tracker = self._entity_groups.get(key, None)
    if tracker is None:
      Check(self._app == reference.app(),
            'Transactions cannot span applications (expected %s, got %s)' %
            (self._app, reference.app()))
      if self._allow_multiple_eg:
        Check(len(self._entity_groups) < _MAX_EG_PER_TXN,
              'operating on too many entity groups in a single transaction.')
      else:
        Check(len(self._entity_groups) < 1,
              'cross-group transactions need to be explicitly '
              'specified (xg=True)')
      tracker = EntityGroupTracker(entity_group)
      self._entity_groups[key] = tracker

    return tracker

  def _GetAllTrackers(self):
    """Get the trackers for the transaction's entity groups.

    If no entity group has been discovered returns a 'global' entity group
    tracker. This is possible if the txn only contains transactional tasks.

    Returns:
      The tracker list for the entity groups used in this txn.
    """
    if not self._entity_groups:
      self._GetTracker(datastore_types.Key.from_path(
          '__global__', 1, _app=self._app)._ToPb())
    return self._entity_groups.values()

  def _GrabSnapshot(self, reference):
    """Gets snapshot for this reference, creating it if necessary.

    If no snapshot has been set for reference's entity group, a snapshot is
    taken and stored for future reads (this also sets the read position),
    and a CONCURRENT_TRANSACTION exception is thrown if we no longer have
    a consistent snapshot.

    Args:
      reference: An entity_pb.Reference from which to extract the entity
          group.
    Raises:
      apiproxy_errors.ApplicationError if the snapshot is not consistent.
    """
    tracker = self._GetTracker(reference)
    check_contention = tracker._snapshot is None
    snapshot = tracker._GrabSnapshot(self._txn_manager)
    if check_contention:

      candidates = [other for other in self._entity_groups.values()
                    if other._snapshot is not None and other != tracker]
      meta_data_list = [other._meta_data for other in candidates]
      self._txn_manager._AcquireWriteLocks(meta_data_list)
      try:
        for other in candidates:
          if other._meta_data._log_pos != other._read_pos:
            self._state = self.FAILED
            raise apiproxy_errors.ApplicationError(
                datastore_pb.Error.CONCURRENT_TRANSACTION,
                'Concurrency exception.')
      finally:
        self._txn_manager._ReleaseWriteLocks(meta_data_list)
    return snapshot

  @_SynchronizeTxn
  def Get(self, reference):
    """Returns the entity associated with the given entity_pb.Reference or None.

    Does not see any modifications in the current txn.

    Args:
      reference: The entity_pb.Reference of the entity to look up.

    Returns:
      The associated entity_pb.EntityProto or None if no such entity exists.
    """
    snapshot = self._GrabSnapshot(reference)
    entity = snapshot.get(datastore_types.ReferenceToKeyValue(reference))
    return LoadEntity(entity)

  @_SynchronizeTxn
  def GetQueryCursor(self, query, filters, orders, index_list):
    """Runs the given datastore_pb.Query and returns a QueryCursor for it.

    Does not see any modifications in the current txn.

    Args:
      query: The datastore_pb.Query to run.
      filters: A list of filters that override the ones found on query.
      orders: A list of orders that override the ones found on query.
      index_list: A list of indexes used by the query.

    Returns:
      A BaseCursor that can be used to fetch query results.
    """
    Check(query.has_ancestor(),
          'Query must have an ancestor when performed in a transaction.')
    snapshot = self._GrabSnapshot(query.ancestor())
    return _ExecuteQuery(snapshot.values(), query, filters, orders,
                         index_list)

  @_SynchronizeTxn
  def Put(self, entity, insert, indexes):
    """Puts the given entity.

    Args:
      entity: The entity_pb.EntityProto to put.
      insert: A boolean that indicates if we should fail if the entity
          already exists.
      indexes: The composite indexes that apply to the entity.
    """
    tracker = self._GetTracker(entity.key())
    key = datastore_types.ReferenceToKeyValue(entity.key())
    tracker._delete.pop(key, None)
    tracker._put[key] = (entity, insert)
    self._kind_to_indexes[_GetKeyKind(entity.key())] = indexes

  @_SynchronizeTxn
  def Delete(self, reference, indexes):
    """Deletes the entity associated with the given reference.

    Args:
      reference: The entity_pb.Reference of the entity to delete.
      indexes: The composite indexes that apply to the entity.
    """
    tracker = self._GetTracker(reference)
    key = datastore_types.ReferenceToKeyValue(reference)
    tracker._put.pop(key, None)
    tracker._delete[key] = reference
    self._kind_to_indexes[_GetKeyKind(reference)] = indexes

  @_SynchronizeTxn
  def AddActions(self, actions, max_actions=None):
    """Adds the given actions to the current txn.

    Args:
      actions: A list of pbs to send to taskqueue.Add when the txn is
          applied.
      max_actions: A number that indicates the maximum number of actions to
          allow on this txn.
    """
    Check(not max_actions or len(self._actions) + len(actions) <= max_actions,
          'Too many messages, maximum allowed %s' % max_actions)
    self._actions.extend(actions)

  def Rollback(self):
    """Rollback the current txn."""

    self._lock.acquire()
    try:
      Check(self._state is self.ACTIVE or self._state is self.FAILED,
            'transaction closed')
      self._state = self.ROLLEDBACK
    finally:
      self._txn_manager._RemoveTxn(self)

      self._lock.release()

  @_SynchronizeTxn
  def Commit(self):
    """Commits the current txn.

    This function hands off the responsibility of calling _Apply to the
    owning TransactionManager.

    Returns:
      The cost of the transaction.
    """
    try:

      trackers = self._GetAllTrackers()
      empty = True
      for tracker in trackers:
        snapshot = tracker._GrabSnapshot(self._txn_manager)
        empty = empty and not tracker._put and not tracker._delete

        for entity, insert in tracker._put.itervalues():
          Check(not insert or self.Get(entity.key()) is None,
                'the id allocated for a new entity was already '
                'in use, please try again')

          old_entity = None
          key = datastore_types.ReferenceToKeyValue(entity.key())
          if key in snapshot:
            old_entity = snapshot[key]
          self._AddWriteOps(old_entity, entity)

        for reference in tracker._delete.itervalues():

          old_entity = None
          key = datastore_types.ReferenceToKeyValue(reference)
          if key in snapshot:
            old_entity = snapshot[key]
            if old_entity is not None:
              self._AddWriteOps(None, old_entity)

      if empty and not self._actions:
        self.Rollback()
        return datastore_pb.Cost()

      meta_data_list = [tracker._meta_data for tracker in trackers]
      self._txn_manager._AcquireWriteLocks(meta_data_list)
    except:

      self.Rollback()
      raise

    try:

      for tracker in trackers:
        Check(tracker._meta_data._log_pos == tracker._read_pos,
              'Concurrency exception.',
              datastore_pb.Error.CONCURRENT_TRANSACTION)

      for tracker in trackers:
        tracker._meta_data.Log(self)
      self._state = self.COMMITED
      self._commit_time_s = time.time()
    except:

      self.Rollback()
      raise
    else:

      for action in self._actions:
        try:
          apiproxy_stub_map.MakeSyncCall(
              'taskqueue', 'Add', action, api_base_pb.VoidProto())
        except apiproxy_errors.ApplicationError, e:
          logging.warning('Transactional task %s has been dropped, %s',
                          action, e)
      self._actions = []
    finally:
      self._txn_manager._RemoveTxn(self)

      self._txn_manager._ReleaseWriteLocks(meta_data_list)

    self._txn_manager._consistency_policy._OnCommit(self)
    return self._cost

  def _AddWriteOps(self, old_entity, new_entity):
    """Adds the cost of writing the new_entity to the _cost member.

    We assume that old_entity represents the current state of the Datastore.

    Args:
      old_entity: Entity representing the current state in the Datastore.
      new_entity: Entity representing the desired state in the Datastore.
    """
    composite_indexes = self._kind_to_indexes[_GetKeyKind(new_entity.key())]
    entity_writes, index_writes = _CalculateWriteOps(
        composite_indexes, old_entity, new_entity)
    _UpdateCost(self._cost, entity_writes, index_writes)

  def _Apply(self, meta_data):
    """Applies the current txn on the given entity group.

    This function blindly performs the operations contained in the current
    txn. The calling function must acquire the entity group write lock and
    ensure transactions are applied in order.
    """

    self._apply_lock.acquire()
    try:

      assert self._state == self.COMMITED
      for tracker in self._entity_groups.values():
        if tracker._meta_data is meta_data:
          break
      else:
        assert False
      assert tracker._read_pos != tracker.APPLIED

      for entity, insert in tracker._put.itervalues():
        self._txn_manager._Put(entity, insert)

      for key in tracker._delete.itervalues():
        self._txn_manager._Delete(key)

      tracker._read_pos = EntityGroupTracker.APPLIED

      tracker._meta_data.Unlog(self)
    finally:
      self._apply_lock.release()


class EntityGroupTracker(object):
  """An entity group involved in a transaction."""

  APPLIED = -2

  _read_pos = None

  _snapshot = None

  _meta_data = None

  def __init__(self, entity_group):
    self._entity_group = entity_group
    self._put = {}
    self._delete = {}

  def _GrabSnapshot(self, txn_manager):
    """Snapshot this entity group, remembering the read position."""
    if self._snapshot is None:
      self._meta_data, self._read_pos, self._snapshot = (
          txn_manager._GrabSnapshot(self._entity_group))
    return self._snapshot


class EntityGroupMetaData(object):
  """The meta_data associated with an entity group."""

  _log_pos = -1

  _snapshot = None

  def __init__(self, entity_group):
    self._entity_group = entity_group
    self._write_lock = threading.Lock()
    self._apply_queue = []

  def CatchUp(self):
    """Applies all outstanding txns."""

    # acquire(False) returns False when the lock is already held, i.e. this
    # asserts that the caller holds the write lock.
    assert self._write_lock.acquire(False) is False

    while self._apply_queue:
      self._apply_queue[0]._Apply(self)

  def Log(self, txn):
    """Add a pending transaction to this entity group.

    Requires that the caller hold the meta data lock.
    This also increments the current log position and clears the snapshot
    cache.
    """

    assert self._write_lock.acquire(False) is False
    self._apply_queue.append(txn)
    self._log_pos += 1
    self._snapshot = None

  def Unlog(self, txn):
    """Remove the first pending transaction from the apply queue.

    Requires that the caller hold the meta data lock.
    This checks that the first pending transaction is indeed txn.
    """

    assert self._write_lock.acquire(False) is False

    Check(self._apply_queue and self._apply_queue[0] is txn,
          'Transaction is not applicable',
          datastore_pb.Error.INTERNAL_ERROR)
    self._apply_queue.pop(0)


class BaseConsistencyPolicy(object):
  """A base class for a consistency policy to be used with a transaction
  manager.
  """

  def _OnCommit(self, txn):
    """Called after a LiveTxn has been committed.

    This function can decide whether to apply the txn right away.

    Args:
      txn: A LiveTxn that has been committed
    """
    raise NotImplementedError

  def _OnGroom(self, meta_data_list):
    """Called once for every global query.

    This function must acquire the write lock for any meta data before
    applying any outstanding txns.

    Args:
      meta_data_list: A list of EntityGroupMetaData objects.
    """
    raise NotImplementedError


class MasterSlaveConsistencyPolicy(BaseConsistencyPolicy):
  """Enforces the Master / Slave consistency policy.

  Applies all txns on commit.
  """

  def _OnCommit(self, txn):

    for tracker in txn._GetAllTrackers():
      tracker._meta_data._write_lock.acquire()
      try:
        tracker._meta_data.CatchUp()
      finally:
        tracker._meta_data._write_lock.release()

    txn._txn_manager.Write()

  def _OnGroom(self, meta_data_list):

    pass


class BaseHighReplicationConsistencyPolicy(BaseConsistencyPolicy):
  """A base class for High Replication Datastore consistency policies.

  All txns are applied asynchronously.
  """

  def _OnCommit(self, txn):
    pass

  def _OnGroom(self, meta_data_list):

    for meta_data in meta_data_list:
      if not meta_data._apply_queue:
        continue

      meta_data._write_lock.acquire()
      try:
        while meta_data._apply_queue:
          txn = meta_data._apply_queue[0]
          if self._ShouldApply(txn, meta_data):
            txn._Apply(meta_data)
          else:
            break
      finally:
        meta_data._write_lock.release()

  def _ShouldApply(self, txn, meta_data):
    """Determines if the given transaction should be applied."""
    raise NotImplementedError


class TimeBasedHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy):
  """A High Replication Datastore consistency policy based on elapsed time.

  This class tries to simulate performance seen in the high replication
  datastore using estimated probabilities of a transaction committing after
  a given amount of time.
  """

  _classification_map = [(.98, 100),
                         (.99, 300),
                         (.995, 2000),
                         (1, 240000)
                        ]

  def SetClassificationMap(self, classification_map):
    """Set the probability a txn will be applied after a given amount of time.

    Args:
      classification_map: A list of tuples containing (float between 0 and 1,
        number of milliseconds) that define the probability of a transaction
        applying after a given amount of time.
    """
    for prob, delay in classification_map:
      if prob < 0 or prob > 1 or delay <= 0:
        raise TypeError(
            'classification_map must be a list of (probability, delay) '
            'tuples, found %r' % (classification_map,))

    self._classification_map = sorted(classification_map)

  def _ShouldApplyImpl(self, elapsed_ms, classification):
    for rate, ms in self._classification_map:
      if classification <= rate:
        break
    return elapsed_ms >= ms

  def _Classify(self, txn, meta_data):
    return random.Random(id(txn) ^ id(meta_data)).random()

  def _ShouldApply(self, txn, meta_data):
    elapsed_ms = (time.time() - txn._commit_time_s) * 1000
    classification = self._Classify(txn, meta_data)
    return self._ShouldApplyImpl(elapsed_ms, classification)
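

# Editor's illustrative sketch (not part of the original stub): with the
# default classification map, a txn/group pair classified at 0.97 falls in
# the first bucket and becomes applicable once 100ms have elapsed, while one
# classified at 0.999 lands in the last bucket and must wait 240000ms.
def _ExampleTimeBasedClassification():
  policy = TimeBasedHRConsistencyPolicy()
  assert policy._ShouldApplyImpl(150, .97)       # 150ms >= 100ms bucket
  assert not policy._ShouldApplyImpl(150, .999)  # needs 240000ms bucket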


class PseudoRandomHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy):
  """A policy that always gives the same sequence of consistency decisions."""

  def __init__(self, probability=.5, seed=0):
    """Constructor.

    Args:
      probability: A number between 0 and 1 that is the likelihood of a
        transaction applying before a global query is executed.
      seed: A hashable object to use as a seed. Use None to use the current
        timestamp.
    """
    self.SetProbability(probability)
    self.SetSeed(seed)

  def SetProbability(self, probability):
    """Change the probability of a transaction applying.

    Args:
      probability: A number between 0 and 1 that determines the probability
        of a transaction applying before a global query is run.
    """
    if probability < 0 or probability > 1:
      raise TypeError('probability must be a number between 0 and 1, '
                      'found %r' % probability)
    self._probability = probability

  def SetSeed(self, seed):
    """Reset the seed."""
    self._random = random.Random(seed)

  def _ShouldApply(self, txn, meta_data):
    return self._random.random() < self._probability
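

# Editor's illustrative sketch (not part of the original stub): a stub that
# mixes in BaseTransactionManager (such as the file or sqlite datastore stub)
# can be forced to simulate maximal eventual consistency in tests by making
# no transaction apply before global queries. 'stub' is a hypothetical,
# caller-supplied stub instance.
def _ExamplePseudoRandomPolicy(stub):
  policy = PseudoRandomHRConsistencyPolicy(probability=0)
  stub.SetConsistencyPolicy(policy)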


class BaseTransactionManager(object):
  """A class that manages the state of transactions.

  This includes creating consistent snapshots for transactions.
  """

  def __init__(self, consistency_policy=None):
    super(BaseTransactionManager, self).__init__()

    self._consistency_policy = (consistency_policy or
                                MasterSlaveConsistencyPolicy())

    self._meta_data_lock = threading.Lock()
    BaseTransactionManager.Clear(self)

  def SetConsistencyPolicy(self, policy):
    """Set the consistency policy to use.

    Causes all data to be flushed.

    Args:
      policy: An object inheriting from BaseConsistencyPolicy.
    """
    if not isinstance(policy, BaseConsistencyPolicy):
      raise TypeError('policy should be of type '
                      'datastore_stub_util.BaseConsistencyPolicy found %r.' %
                      (policy,))
    self.Flush()
    self._consistency_policy = policy

  def Clear(self):
    """Discards any pending transactions and resets the meta data."""

    self._meta_data = {}

    self._txn_map = {}

  def BeginTransaction(self, app, allow_multiple_eg):
    """Start a transaction on the given app.

    Args:
      app: A string representing the app for which to start the transaction.
      allow_multiple_eg: True if transactions can span multiple entity
          groups.

    Returns:
      A datastore_pb.Transaction for the created transaction
    """
    Check(not (allow_multiple_eg and isinstance(
        self._consistency_policy, MasterSlaveConsistencyPolicy)),
          'transactions on multiple entity groups only allowed with the '
          'High Replication datastore')
    txn = self._BeginTransaction(app, allow_multiple_eg)
    self._txn_map[id(txn)] = txn
    transaction = datastore_pb.Transaction()
    transaction.set_app(app)
    transaction.set_handle(id(txn))
    return transaction
1954 def GetTxn(self, transaction, request_trusted, request_app):
1955 """Gets the LiveTxn object associated with the given transaction.
1957 Args:
1958 transaction: The datastore_pb.Transaction to look up.
1959 request_trusted: A boolean indicating If the requesting app is trusted.
1960 request_app: A string representing the app making the request.
1962 Returns:
1963 The associated LiveTxn object.
1964 """
1965 request_app = datastore_types.ResolveAppId(request_app)
1966 CheckTransaction(request_trusted, request_app, transaction)
1967 txn = self._txn_map.get(transaction.handle())
1968 Check(txn and txn._app == transaction.app(),
1969 'Transaction(<%s>) not found' % str(transaction).replace('\n', ', '))
1970 return txn
1972 def Groom(self):
1973 """Attempts to apply any outstanding transactions.
1975 The consistency policy determines if a transaction should be applied.
1976 """
1977 self._meta_data_lock.acquire()
1978 try:
1979 self._consistency_policy._OnGroom(self._meta_data.itervalues())
1980 finally:
1981 self._meta_data_lock.release()
1983 def Flush(self):
1984 """Applies all outstanding transactions."""
1985 self._meta_data_lock.acquire()
1986 try:
1987 for meta_data in self._meta_data.itervalues():
1988 if not meta_data._apply_queue:
1989 continue
1992 meta_data._write_lock.acquire()
1993 try:
1994 meta_data.CatchUp()
1995 finally:
1996 meta_data._write_lock.release()
1997 finally:
1998 self._meta_data_lock.release()
2000 def _GetMetaData(self, entity_group):
2001 """Safely gets the EntityGroupMetaData object for the given entity_group.
2003 self._meta_data_lock.acquire()
2004 try:
2005 key = datastore_types.ReferenceToKeyValue(entity_group)
2007 meta_data = self._meta_data.get(key, None)
2008 if not meta_data:
2009 meta_data = EntityGroupMetaData(entity_group)
2010 self._meta_data[key] = meta_data
2011 return meta_data
2012 finally:
2013 self._meta_data_lock.release()
2015 def _BeginTransaction(self, app, allow_multiple_eg):
2016 """Starts a transaction without storing it in the txn_map."""
2017 return LiveTxn(self, app, allow_multiple_eg)
2019 def _GrabSnapshot(self, entity_group):
2020 """Grabs a consistent snapshot of the given entity group.
2022 Args:
2023 entity_group: An entity_pb.Reference of the entity group of which the
2024 snapshot should be taken.
2026 Returns:
2027 A tuple of (meta_data, log_pos, snapshot) where log_pos is the current log
2028 position and snapshot is a map of reference key value to
2029 entity_pb.EntityProto.
2030 """
2032 meta_data = self._GetMetaData(entity_group)
2033 meta_data._write_lock.acquire()
2034 try:
2035 if not meta_data._snapshot:
2037 meta_data.CatchUp()
2038 meta_data._snapshot = self._GetEntitiesInEntityGroup(entity_group)
2039 return meta_data, meta_data._log_pos, meta_data._snapshot
2040 finally:
2042 meta_data._write_lock.release()
2044 def _AcquireWriteLocks(self, meta_data_list):
2045 """Acquire the write locks for the given entity group meta data.
2047 These locks must be released with _ReleaseWriteLocks before returning to the
2048 user.
2050 Args:
2051 meta_data_list: list of EntityGroupMetaData objects.
2052 """
2053 for meta_data in sorted(meta_data_list):
2054 meta_data._write_lock.acquire()
2056 def _ReleaseWriteLocks(self, meta_data_list):
2057 """Release the write locks of the given entity group meta data.
2059 Args:
2060 meta_data_list: list of EntityGroupMetaData objects.
2061 """
2062 for meta_data in sorted(meta_data_list):
2063 meta_data._write_lock.release()
2065 def _RemoveTxn(self, txn):
2066 """Removes a LiveTxn from the txn_map (if present)."""
2067 self._txn_map.pop(id(txn), None)
2069 def _Put(self, entity, insert):
2070 """Put the given entity.
2072 This must be implemented by a sub-class. The sub-class can assume that any
2073 needed consistency is enforced at a higher level (and can just put blindly).
2075 Args:
2076 entity: The entity_pb.EntityProto to put.
2077 insert: A boolean that indicates if we should fail if the entity already
2078 exists.
2079 """
2080 raise NotImplementedError
2082 def _Delete(self, reference):
2083 """Delete the entity associated with the specified reference.
2085 This must be implemented by a sub-class. The sub-class can assume that any
2086 needed consistency is enforced at a higher level (and can just delete
2087 blindly).
2089 Args:
2090 reference: The entity_pb.Reference of the entity to delete.
2091 """
2092 raise NotImplementedError
2094 def _GetEntitiesInEntityGroup(self, entity_group):
2095 """Gets the contents of a specific entity group.
2097 This must be implemented by a sub-class. The sub-class can assume that any
2098 needed consistency is enforced at a higher level (and can just blindly read).
2100 Other entity groups may be modified concurrently.
2102 Args:
2103 entity_group: An entity_pb.Reference of the entity group to get.
2105 Returns:
2106 A dict mapping datastore_types.ReferenceToKeyValue(key) to EntityProto.
2107 """
2108 raise NotImplementedError
2111 class BaseIndexManager(object):
2112 """A generic index manager that stores all data in memory."""
2121 WRITE_ONLY = entity_pb.CompositeIndex.WRITE_ONLY
2122 READ_WRITE = entity_pb.CompositeIndex.READ_WRITE
2123 DELETED = entity_pb.CompositeIndex.DELETED
2124 ERROR = entity_pb.CompositeIndex.ERROR
2126 _INDEX_STATE_TRANSITIONS = {
2127 WRITE_ONLY: frozenset((READ_WRITE, DELETED, ERROR)),
2128 READ_WRITE: frozenset((DELETED,)),
2129 ERROR: frozenset((DELETED,)),
2130 DELETED: frozenset((ERROR,)),
2131 }
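# Illustrative reading of the transition table above (enforced by UpdateIndex
# below): a WRITE_ONLY index may become READ_WRITE, but a READ_WRITE index
# may only be deleted.
#
#   READ_WRITE in _INDEX_STATE_TRANSITIONS[WRITE_ONLY]  # True
#   WRITE_ONLY in _INDEX_STATE_TRANSITIONS[READ_WRITE]  # False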
2133 def __init__(self):
2137 self.__indexes = collections.defaultdict(list)
2138 self.__indexes_lock = threading.Lock()
2139 self.__next_index_id = 1
2140 self.__index_id_lock = threading.Lock()
2142 def __FindIndex(self, index):
2143 """Finds an existing index by definition.
2145 Args:
2146 index: entity_pb.CompositeIndex
2148 Returns:
2149 entity_pb.CompositeIndex, if it exists; otherwise None
2150 """
2151 app = index.app_id()
2152 if app in self.__indexes:
2153 for stored_index in self.__indexes[app]:
2154 if index.definition() == stored_index.definition():
2155 return stored_index
2157 return None
2159 def CreateIndex(self, index, trusted=False, calling_app=None):
2160 calling_app = datastore_types.ResolveAppId(calling_app)
2161 CheckAppId(trusted, calling_app, index.app_id())
2162 Check(index.id() == 0, 'New index id must be 0.')
2163 Check(not self.__FindIndex(index), 'Index already exists.')
2166 self.__index_id_lock.acquire()
2167 index.set_id(self.__next_index_id)
2168 self.__next_index_id += 1
2169 self.__index_id_lock.release()
2172 clone = entity_pb.CompositeIndex()
2173 clone.CopyFrom(index)
2174 app = index.app_id()
2175 clone.set_app_id(app)
2178 self.__indexes_lock.acquire()
2179 try:
2180 self.__indexes[app].append(clone)
2181 finally:
2182 self.__indexes_lock.release()
2184 self._OnIndexChange(index.app_id())
2186 return index.id()
2188 def GetIndexes(self, app, trusted=False, calling_app=None):
2189 """Get the CompositeIndex objects for the given app."""
2190 calling_app = datastore_types.ResolveAppId(calling_app)
2191 CheckAppId(trusted, calling_app, app)
2193 return self.__indexes[app]
2195 def UpdateIndex(self, index, trusted=False, calling_app=None):
2196 CheckAppId(trusted, calling_app, index.app_id())
2198 stored_index = self.__FindIndex(index)
2199 Check(stored_index, 'Index does not exist.')
2200 Check(index.state() == stored_index.state() or
2201 index.state() in self._INDEX_STATE_TRANSITIONS[stored_index.state()],
2202 'cannot move index state from %s to %s' %
2203 (entity_pb.CompositeIndex.State_Name(stored_index.state()),
2204 (entity_pb.CompositeIndex.State_Name(index.state()))))
2207 self.__indexes_lock.acquire()
2208 try:
2209 stored_index.set_state(index.state())
2210 finally:
2211 self.__indexes_lock.release()
2213 self._OnIndexChange(index.app_id())
2215 def DeleteIndex(self, index, trusted=False, calling_app=None):
2216 CheckAppId(trusted, calling_app, index.app_id())
2218 stored_index = self.__FindIndex(index)
2219 Check(stored_index, 'Index does not exist.')
2222 app = index.app_id()
2223 self.__indexes_lock.acquire()
2224 try:
2225 self.__indexes[app].remove(stored_index)
2226 finally:
2227 self.__indexes_lock.release()
2229 self._OnIndexChange(index.app_id())
2231 def _SideLoadIndex(self, index):
2232 self.__indexes[index.app_id()].append(index)
2234 def _OnIndexChange(self, app_id):
2235 pass
2238 class BaseDatastore(BaseTransactionManager, BaseIndexManager):
2239 """A base implemenation of a Datastore.
2241 This class implements common functions associated with a datastore and
2242 enforces security restrictions passed on by a stub or client. It is designed
2243 to be shared by any number of threads or clients serving any number of apps.
2245 If an app is not specified explicitly it is pulled from the env and assumed to
2246 be untrusted.
2247 """
2251 _MAX_QUERY_COMPONENTS = 100
2255 _BATCH_SIZE = 20
2259 _MAX_ACTIONS_PER_TXN = 5
2261 def __init__(self, require_indexes=False, consistency_policy=None,
2262 use_atexit=True, auto_id_policy=SEQUENTIAL):
2263 BaseTransactionManager.__init__(self, consistency_policy=consistency_policy)
2264 BaseIndexManager.__init__(self)
2266 self._require_indexes = require_indexes
2267 self._pseudo_kinds = {}
2268 self.SetAutoIdPolicy(auto_id_policy)
2270 if use_atexit:
2275 atexit.register(self.Write)
2277 def Clear(self):
2278 """Clears out all stored values."""
2280 BaseTransactionManager.Clear(self)
2283 def _RegisterPseudoKind(self, kind):
2284 """Registers a pseudo kind to be used to satisfy a meta data query."""
2285 self._pseudo_kinds[kind.name] = kind
2286 kind._stub = weakref.proxy(self)
2291 def GetQueryCursor(self, raw_query, trusted=False, calling_app=None):
2292 """Execute a query.
2294 Args:
2295 raw_query: The non-validated datastore_pb.Query to run.
2296 trusted: If the calling app is trusted.
2297 calling_app: The app requesting the results or None to pull the app from
2298 the environment.
2300 Returns:
2301 A BaseCursor that can be used to retrieve results.
2302 """
2304 calling_app = datastore_types.ResolveAppId(calling_app)
2305 CheckAppId(trusted, calling_app, raw_query.app())
2308 filters, orders = datastore_index.Normalize(raw_query.filter_list(),
2309 raw_query.order_list(),
2310 raw_query.property_name_list())
2313 CheckQuery(raw_query, filters, orders, self._MAX_QUERY_COMPONENTS)
2314 FillUsersInQuery(filters)
2317 self._CheckHasIndex(raw_query, trusted, calling_app)
2320 index_list = self.__IndexListForQuery(raw_query)
2323 if raw_query.has_transaction():
2325 Check(raw_query.kind() not in self._pseudo_kinds,
2326 'transactional queries on "%s" not allowed' % raw_query.kind())
2327 txn = self.GetTxn(raw_query.transaction(), trusted, calling_app)
2328 return txn.GetQueryCursor(raw_query, filters, orders, index_list)
2330 if raw_query.has_ancestor() and raw_query.kind() not in self._pseudo_kinds:
2332 txn = self._BeginTransaction(raw_query.app(), False)
2333 return txn.GetQueryCursor(raw_query, filters, orders, index_list)
2336 self.Groom()
2337 return self._GetQueryCursor(raw_query, filters, orders, index_list)
2339 def __IndexListForQuery(self, query):
2340 """Get the single composite index pb used by the query, if any, as a list.
2342 Args:
2343 query: the datastore_pb.Query to compute the index list for
2345 Returns:
2346 A singleton list of the composite index pb used by the query, or an
2347 empty list if the query does not require a composite index.
2348 """
2349 required, kind, ancestor, props = (
2350 datastore_index.CompositeIndexForQuery(query))
2351 if not required:
2352 return []
2353 composite_index_pb = entity_pb.CompositeIndex()
2354 composite_index_pb.set_app_id(query.app())
2355 composite_index_pb.set_id(0)
2356 composite_index_pb.set_state(entity_pb.CompositeIndex.READ_WRITE)
2357 index_pb = composite_index_pb.mutable_definition()
2358 index_pb.set_entity_type(kind)
2359 index_pb.set_ancestor(bool(ancestor))
2360 for name, direction in datastore_index.GetRecommendedIndexProperties(props):
2361 prop_pb = entity_pb.Index_Property()
2362 prop_pb.set_name(name)
2363 prop_pb.set_direction(direction)
2364 index_pb.property_list().append(prop_pb)
2365 return [composite_index_pb]
2367 def Get(self, raw_keys, transaction=None, eventual_consistency=False,
2368 trusted=False, calling_app=None):
2369 """Get the entities for the given keys.
2371 Args:
2372 raw_keys: A list of unverified entity_pb.Reference objects.
2373 transaction: The datastore_pb.Transaction to use or None.
2374 eventual_consistency: If we should allow stale, potentially inconsistent
2375 results.
2376 trusted: If the calling app is trusted.
2377 calling_app: The app requesting the results or None to pull the app from
2378 the environment.
2380 Returns:
2381 A list containing the entity or None if no entity exists.
2382 """
2384 if not raw_keys:
2385 return []
2387 calling_app = datastore_types.ResolveAppId(calling_app)
2389 if not transaction and eventual_consistency:
2391 result = []
2392 for key in raw_keys:
2393 CheckReference(trusted, calling_app, key)
2394 result.append(self._GetWithPseudoKinds(None, key))
2395 return result
2400 grouped_keys = collections.defaultdict(list)
2401 for i, key in enumerate(raw_keys):
2402 CheckReference(trusted, calling_app, key)
2403 entity_group = _GetEntityGroup(key)
2404 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2405 grouped_keys[entity_group_key].append((key, i))
2407 if transaction:
2409 txn = self.GetTxn(transaction, trusted, calling_app)
2410 return [self._GetWithPseudoKinds(txn, key) for key in raw_keys]
2411 else:
2414 result = [None] * len(raw_keys)
2416 def op(txn, v):
2417 key, i = v
2418 result[i] = self._GetWithPseudoKinds(txn, key)
2419 for keys in grouped_keys.itervalues():
2420 self._RunInTxn(keys, keys[0][0].app(), op)
2421 return result
2423 def _GetWithPseudoKinds(self, txn, key):
2424 """Fetch entity key in txn, taking account of pseudo-kinds."""
2425 pseudo_kind = self._pseudo_kinds.get(_GetKeyKind(key), None)
2426 if pseudo_kind:
2427 return pseudo_kind.Get(txn, key)
2428 elif txn:
2429 return txn.Get(key)
2430 else:
2431 return self._Get(key)
2433 def Put(self, raw_entities, cost, transaction=None,
2434 trusted=False, calling_app=None):
2435 """Writes the given given entities.
2437 Updates an entity's key and entity_group in place if needed
2439 Args:
2440 raw_entities: A list of unverified entity_pb.EntityProto objects.
2441 cost: Out param. The cost of putting the provided entities.
2442 transaction: The datastore_pb.Transaction to use or None.
2443 trusted: If the calling app is trusted.
2444 calling_app: The app requesting the results or None to pull the app from
2445 the environment.
2446 Returns:
2447 A list of entity_pb.Reference objects that indicates where each entity
2448 was stored.
2449 """
2450 if not raw_entities:
2451 return []
2453 calling_app = datastore_types.ResolveAppId(calling_app)
2456 result = [None] * len(raw_entities)
2457 grouped_entities = collections.defaultdict(list)
2458 for i, raw_entity in enumerate(raw_entities):
2459 CheckEntity(trusted, calling_app, raw_entity)
2463 entity = entity_pb.EntityProto()
2464 entity.CopyFrom(raw_entity)
2467 for prop in itertools.chain(entity.property_list(),
2468 entity.raw_property_list()):
2469 FillUser(prop)
2471 last_element = entity.key().path().element_list()[-1]
2472 if not (last_element.id() or last_element.has_name()):
2473 insert = True
2476 if self._auto_id_policy == SEQUENTIAL:
2477 last_element.set_id(self._AllocateSequentialIds(entity.key())[0])
2478 else:
2479 full_key = self._AllocateIds([entity.key()])[0]
2480 last_element.set_id(full_key.path().element_list()[-1].id())
2481 else:
2482 insert = False
2484 entity_group = _GetEntityGroup(entity.key())
2485 entity.mutable_entity_group().CopyFrom(entity_group.path())
2486 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2487 grouped_entities[entity_group_key].append((entity, insert))
2491 key = entity_pb.Reference()
2492 key.CopyFrom(entity.key())
2493 result[i] = key
2495 if transaction:
2497 txn = self.GetTxn(transaction, trusted, calling_app)
2498 for group in grouped_entities.values():
2499 for entity, insert in group:
2501 indexes = _FilterIndexesByKind(entity.key(), self.GetIndexes(
2502 entity.key().app(), trusted, calling_app))
2503 txn.Put(entity, insert, indexes)
2504 else:
2506 for entities in grouped_entities.itervalues():
2507 txn_cost = self._RunInTxn(
2508 entities, entities[0][0].key().app(),
2510 lambda txn, v: txn.Put(v[0], v[1], _FilterIndexesByKind(
2511 v[0].key(),
2512 self.GetIndexes(v[0].key().app(), trusted, calling_app))))
2513 _UpdateCost(cost, txn_cost.entity_writes(), txn_cost.index_writes())
2514 return result
2516 def Delete(self, raw_keys, cost, transaction=None,
2517 trusted=False, calling_app=None):
2518 """Deletes the entities associated with the given keys.
2520 Args:
2521 raw_keys: A list of unverified entity_pb.Reference objects.
2522 cost: Out param. The cost of deleting the provided entities.
2523 transaction: The datastore_pb.Transaction to use or None.
2524 trusted: If the calling app is trusted.
2525 calling_app: The app requesting the results or None to pull the app from
2526 the environment.
2527 """
2528 if not raw_keys:
2529 return
2531 calling_app = datastore_types.ResolveAppId(calling_app)
2534 grouped_keys = collections.defaultdict(list)
2535 for key in raw_keys:
2536 CheckReference(trusted, calling_app, key)
2537 entity_group = _GetEntityGroup(key)
2538 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2539 grouped_keys[entity_group_key].append(key)
2541 if transaction:
2543 txn = self.GetTxn(transaction, trusted, calling_app)
2544 for key in raw_keys:
2546 indexes = _FilterIndexesByKind(key, self.GetIndexes(
2547 key.app(), trusted, calling_app))
2548 txn.Delete(key, indexes)
2549 else:
2551 for keys in grouped_keys.itervalues():
2553 txn_cost = self._RunInTxn(
2554 keys, keys[0].app(),
2555 lambda txn, key: txn.Delete(key, _FilterIndexesByKind(
2556 key, self.GetIndexes(key.app(), trusted, calling_app))))
2557 _UpdateCost(cost, txn_cost.entity_writes(), txn_cost.index_writes())
2559 def Touch(self, raw_keys, trusted=False, calling_app=None):
2560 """Applies all outstanding writes."""
2561 calling_app = datastore_types.ResolveAppId(calling_app)
2563 grouped_keys = collections.defaultdict(list)
2564 for key in raw_keys:
2565 CheckReference(trusted, calling_app, key)
2566 entity_group = _GetEntityGroup(key)
2567 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2568 grouped_keys[entity_group_key].append(key)
2570 for keys in grouped_keys.itervalues():
2571 self._RunInTxn(keys, keys[0].app(), lambda txn, key: None)
2573 def _RunInTxn(self, values, app, op):
2574 """Runs the given values in a separate Txn.
2576 Retries up to _RETRIES times on CONCURRENT_TRANSACTION errors.
2578 Args:
2579 values: A list of arguments to op.
2580 app: The app to create the Txn on.
2581 op: A function to run on each value in the Txn.
2583 Returns:
2584 The cost of the txn.
2585 """
2586 retries = 0
2587 backoff = _INITIAL_RETRY_DELAY_MS / 1000.0
2588 while True:
2589 try:
2590 txn = self._BeginTransaction(app, False)
2591 for value in values:
2592 op(txn, value)
2593 return txn.Commit()
2594 except apiproxy_errors.ApplicationError, e:
2595 if e.application_error == datastore_pb.Error.CONCURRENT_TRANSACTION:
2597 retries += 1
2598 if retries <= _RETRIES:
2599 time.sleep(backoff)
2600 backoff *= _RETRY_DELAY_MULTIPLIER
2601 if backoff * 1000.0 > _MAX_RETRY_DELAY_MS:
2602 backoff = _MAX_RETRY_DELAY_MS / 1000.0
2603 continue
2604 raise
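# Illustrative sketch of the retry schedule the loop above produces with the
# module defaults (_INITIAL_RETRY_DELAY_MS=100, _RETRY_DELAY_MULTIPLIER=2,
# _RETRIES=3); a fourth consecutive CONCURRENT_TRANSACTION error is
# re-raised to the caller:
#
#   delays = []
#   backoff = _INITIAL_RETRY_DELAY_MS / 1000.0
#   for _ in range(_RETRIES):
#     delays.append(backoff)
#     backoff *= _RETRY_DELAY_MULTIPLIER
#   assert delays == [0.1, 0.2, 0.4]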
2606 def _CheckHasIndex(self, query, trusted=False, calling_app=None):
2607 """Checks if the query can be satisfied given the existing indexes.
2609 Args:
2610 query: the datastore_pb.Query to check
2611 trusted: True if the calling app is trusted (like dev_admin_console)
2612 calling_app: app_id of the current running application
2613 """
2614 if query.kind() in self._pseudo_kinds or not self._require_indexes:
2615 return
2617 minimal_index = datastore_index.MinimalCompositeIndexForQuery(query,
2618 (datastore_index.ProtoToIndexDefinition(index)
2619 for index in self.GetIndexes(query.app(), trusted, calling_app)
2620 if index.state() == entity_pb.CompositeIndex.READ_WRITE))
2621 if minimal_index is not None:
2622 msg = ('This query requires a composite index that is not defined. '
2623 'You must update the index.yaml file in your application root.')
2624 is_most_efficient, kind, ancestor, properties = minimal_index
2625 if not is_most_efficient:
2627 yaml = datastore_index.IndexYamlForQuery(kind, ancestor,
2628 datastore_index.GetRecommendedIndexProperties(properties))
2629 msg += '\nThe following index is the minimum index required:\n' + yaml
2630 raise apiproxy_errors.ApplicationError(datastore_pb.Error.NEED_INDEX, msg)
2632 def SetAutoIdPolicy(self, auto_id_policy):
2633 """Set value of _auto_id_policy flag (default SEQUENTIAL).
2635 SEQUENTIAL auto ID assignment behavior will eventually be deprecated
2636 and the default will be SCATTERED.
2638 Args:
2639 auto_id_policy: string constant.
2640 Raises:
2641 TypeError: if auto_id_policy is not one of SEQUENTIAL or SCATTERED.
2642 """
2643 valid_policies = (SEQUENTIAL, SCATTERED)
2644 if auto_id_policy not in valid_policies:
2645 raise TypeError('auto_id_policy must be in %s, found %s instead' %
2646 (valid_policies, auto_id_policy))
2647 self._auto_id_policy = auto_id_policy
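# Illustrative usage (datastore stands for any assumed BaseDatastore
# instance): a stub owner can opt in to scattered id allocation, the eventual
# default noted above, before putting entities with incomplete keys:
#
#   datastore.SetAutoIdPolicy(SCATTERED)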
2651 def Write(self):
2652 """Writes the datastore to disk."""
2653 self.Flush()
2655 def _GetQueryCursor(self, query, filters, orders, index_list):
2656 """Runs the given datastore_pb.Query and returns a QueryCursor for it.
2658 This must be implemented by a sub-class. The sub-class does not need to
2659 enforce any consistency guarantees (and can just blindly read).
2661 Args:
2662 query: The datastore_pb.Query to run.
2663 filters: A list of filters that override the ones found on query.
2664 orders: A list of orders that override the ones found on query.
2665 index_list: A list of indexes used by the query.
2667 Returns:
2668 A BaseCursor that can be used to fetch query results.
2669 """
2670 raise NotImplementedError
2672 def _Get(self, reference):
2673 """Get the entity for the given reference or None.
2675 This must be implemented by a sub-class. The sub-class does not need to
2676 enforce any consistency guarantees (and can just blindly read).
2678 Args:
2679 reference: An entity_pb.Reference to look up.
2681 Returns:
2682 The entity_pb.EntityProto associated with the given reference or None.
2683 """
2684 raise NotImplementedError
2686 def _AllocateSequentialIds(self, reference, size=1, max_id=None):
2687 """Allocate sequential ids for given reference.
2689 Args:
2690 reference: An entity_pb.Reference to allocate an id for.
2691 size: The size of the range to allocate.
2692 max_id: The upper bound of the range to allocate.
2694 Returns:
2695 A tuple containing (min, max) of the allocated range.
2696 """
2697 raise NotImplementedError
2699 def _AllocateIds(self, references):
2700 """Allocate or reserves IDs for the v4 datastore API.
2702 Incomplete keys are allocated scattered IDs. Complete keys have every id in
2703 their paths reserved in the appropriate ID space.
2705 Args:
2706 references: a list of entity_pb.Reference objects to allocate or reserve
2708 Returns:
2709 a list of complete entity_pb.Reference objects corresponding to the
2710 incomplete keys in the input, with newly allocated ids.
2711 """
2712 raise NotImplementedError
2715 def _NeedsIndexes(func):
2716 """A decorator for DatastoreStub methods that require or affect indexes.
2718 Updates indexes to match index.yaml before the call and updates index.yaml
2719 after the call if require_indexes is False. If root_path is not set, this is a
2720 a no-op.
2721 """
2723 def UpdateIndexesWrapper(self, *args, **kwargs):
2724 self._SetupIndexes()
2725 try:
2726 return func(self, *args, **kwargs)
2727 finally:
2728 self._UpdateIndexes()
2730 return UpdateIndexesWrapper
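# The decorator is applied to DatastoreStub methods below, for example:
#
#   @_NeedsIndexes
#   def _Dynamic_RunQuery(self, query, query_result):
#     ...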
2733 class EntityGroupPseudoKind(object):
2734 """A common implementation of get() for the __entity_group__ pseudo-kind.
2736 Public properties:
2737 name: the pseudo-kind name
2738 """
2739 name = '__entity_group__'
2749 base_version = int(time.time() * 1e6)
2751 def Get(self, txn, key):
2752 """Fetch key of this pseudo-kind within txn.
2754 Args:
2755 txn: transaction within which Get occurs, may be None if this is an
2756 eventually consistent Get.
2757 key: key of pseudo-entity to Get.
2759 Returns:
2760 An entity for key, or None if it doesn't exist.
2761 """
2763 if not txn:
2764 txn = self._stub._BeginTransaction(key.app(), False)
2765 try:
2766 return self.Get(txn, key)
2767 finally:
2768 txn.Rollback()
2771 if isinstance(txn._txn_manager._consistency_policy,
2772 MasterSlaveConsistencyPolicy):
2773 return None
2780 path = key.path()
2781 if path.element_size() != 2 or path.element_list()[-1].id() != 1:
2782 return None
2784 tracker = txn._GetTracker(key)
2785 tracker._GrabSnapshot(txn._txn_manager)
2787 eg = entity_pb.EntityProto()
2788 eg.mutable_key().CopyFrom(key)
2789 eg.mutable_entity_group().CopyFrom(_GetEntityGroup(key).path())
2790 version = entity_pb.Property()
2791 version.set_name('__version__')
2792 version.set_multiple(False)
2793 version.mutable_value().set_int64value(
2794 tracker._read_pos + self.base_version)
2795 eg.property_list().append(version)
2796 return eg
2798 def Query(self, query, filters, orders):
2799 """Perform a query on this pseudo-kind.
2801 Args:
2802 query: the original datastore_pb.Query.
2803 filters: the filters from query.
2804 orders: the orders from query.
2806 Returns:
2807 always raises an error.
2808 """
2811 raise apiproxy_errors.ApplicationError(
2812 datastore_pb.Error.BAD_REQUEST, 'queries not supported on ' + self.name)
2815 class DatastoreStub(object):
2816 """A stub that maps datastore service calls on to a BaseDatastore.
2818 This class also keeps track of query cursors.
2819 """
2821 def __init__(self,
2822 datastore,
2823 app_id=None,
2824 trusted=None,
2825 root_path=None):
2826 super(DatastoreStub, self).__init__()
2827 self._datastore = datastore
2828 self._app_id = datastore_types.ResolveAppId(app_id)
2829 self._trusted = trusted
2830 self._root_path = root_path
2833 self.__query_history = {}
2836 self.__query_ci_history = set()
2840 self._cached_yaml = (None, None, None)
2842 if self._require_indexes or root_path is None:
2844 self._index_yaml_updater = None
2845 else:
2847 self._index_yaml_updater = datastore_stub_index.IndexYamlUpdater(
2848 root_path)
2850 DatastoreStub.Clear(self)
2852 def Clear(self):
2853 """Clears out all stored values."""
2854 self._query_cursors = {}
2855 self.__query_history = {}
2856 self.__query_ci_history = set()
2858 def QueryHistory(self):
2859 """Returns a dict that maps Query PBs to times they've been run."""
2861 return dict((pb, times) for pb, times in self.__query_history.items()
2862 if pb.app() == self._app_id)
2864 def _QueryCompositeIndexHistoryLength(self):
2865 """Returns the length of the CompositeIndex set for query history."""
2866 return len(self.__query_ci_history)
2868 def SetTrusted(self, trusted):
2869 """Set/clear the trusted bit in the stub.
2871 This bit indicates that the app calling the stub is trusted. A
2872 trusted app can write to datastores of other apps.
2874 Args:
2875 trusted: boolean.
2876 """
2877 self._trusted = trusted
2881 def _Dynamic_Get(self, req, res):
2884 transaction = req.has_transaction() and req.transaction() or None
2887 if req.allow_deferred() and req.key_size() > _MAXIMUM_RESULTS:
2888 keys_to_get = req.key_list()[:_MAXIMUM_RESULTS]
2889 deferred_keys = req.key_list()[_MAXIMUM_RESULTS:]
2890 res.deferred_list().extend(deferred_keys)
2891 else:
2893 keys_to_get = req.key_list()
2895 res.set_in_order(not req.allow_deferred())
2897 total_response_bytes = 0
2898 for index, entity in enumerate(self._datastore.Get(keys_to_get,
2899 transaction,
2900 req.has_failover_ms(),
2901 self._trusted,
2902 self._app_id)):
2903 entity_size = entity and entity.ByteSize() or 0
2906 if (req.allow_deferred()
2907 and index > 0
2908 and total_response_bytes + entity_size > _MAXIMUM_QUERY_RESULT_BYTES):
2910 res.deferred_list().extend(keys_to_get[index:])
2911 break
2912 elif entity:
2913 entity_result = res.add_entity()
2914 entity_result.mutable_entity().CopyFrom(entity)
2915 total_response_bytes += entity_size
2916 else:
2918 entity_result = res.add_entity()
2919 entity_result.mutable_key().CopyFrom(keys_to_get[index])
2921 def _Dynamic_Put(self, req, res):
2922 transaction = req.has_transaction() and req.transaction() or None
2923 res.key_list().extend(self._datastore.Put(req.entity_list(),
2924 res.mutable_cost(),
2925 transaction,
2926 self._trusted, self._app_id))
2928 def _Dynamic_Delete(self, req, res):
2929 transaction = req.has_transaction() and req.transaction() or None
2930 self._datastore.Delete(req.key_list(), res.mutable_cost(), transaction,
2931 self._trusted, self._app_id)
2933 def _Dynamic_Touch(self, req, _):
2934 self._datastore.Touch(req.key_list(), self._trusted, self._app_id)
2936 @_NeedsIndexes
2937 def _Dynamic_RunQuery(self, query, query_result):
2938 cursor = self._datastore.GetQueryCursor(query, self._trusted, self._app_id)
2940 if query.has_count():
2941 count = query.count()
2942 elif query.has_limit():
2943 count = query.limit()
2944 else:
2945 count = self._BATCH_SIZE
2947 cursor.PopulateQueryResult(query_result, count, query.offset(),
2948 query.compile(), first_result=True)
2949 if query_result.has_cursor():
2950 self._query_cursors[query_result.cursor().cursor()] = cursor
2953 if query.compile():
2956 compiled_query = query_result.mutable_compiled_query()
2957 compiled_query.set_keys_only(query.keys_only())
2958 compiled_query.mutable_primaryscan().set_index_name(query.Encode())
2959 self.__UpdateQueryHistory(query)
2961 def __UpdateQueryHistory(self, query):
2963 clone = datastore_pb.Query()
2964 clone.CopyFrom(query)
2965 clone.clear_hint()
2966 clone.clear_limit()
2967 clone.clear_offset()
2968 clone.clear_count()
2969 if clone in self.__query_history:
2970 self.__query_history[clone] += 1
2971 else:
2972 self.__query_history[clone] = 1
2973 if clone.app() == self._app_id:
2974 self.__query_ci_history.add(
2975 datastore_index.CompositeIndexForQuery(clone))
2978 def _Dynamic_Next(self, next_request, query_result):
2979 app = next_request.cursor().app()
2980 CheckAppId(self._trusted, self._app_id, app)
2982 cursor = self._query_cursors.get(next_request.cursor().cursor())
2983 Check(cursor and cursor.app == app,
2984 'Cursor %d not found' % next_request.cursor().cursor())
2986 count = self._BATCH_SIZE
2987 if next_request.has_count():
2988 count = next_request.count()
2990 cursor.PopulateQueryResult(query_result, count, next_request.offset(),
2991 next_request.compile(), first_result=False)
2993 if not query_result.has_cursor():
2994 del self._query_cursors[next_request.cursor().cursor()]
2996 def _Dynamic_AddActions(self, request, _):
2997 """Associates the creation of one or more tasks with a transaction.
2999 Args:
3000 request: A taskqueue_service_pb.TaskQueueBulkAddRequest containing the
3001 tasks that should be created when the transaction is committed.
3002 """
3007 if not request.add_request_list():
3008 return
3010 transaction = request.add_request_list()[0].transaction()
3011 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3012 new_actions = []
3013 for add_request in request.add_request_list():
3017 Check(add_request.transaction() == transaction,
3018 'Cannot add requests to different transactions')
3019 clone = taskqueue_service_pb.TaskQueueAddRequest()
3020 clone.CopyFrom(add_request)
3021 clone.clear_transaction()
3022 new_actions.append(clone)
3024 txn.AddActions(new_actions, self._MAX_ACTIONS_PER_TXN)
3026 def _Dynamic_BeginTransaction(self, req, transaction):
3027 CheckAppId(self._trusted, self._app_id, req.app())
3028 transaction.CopyFrom(self._datastore.BeginTransaction(
3029 req.app(), req.allow_multiple_eg()))
3031 def _Dynamic_Commit(self, transaction, res):
3032 CheckAppId(self._trusted, self._app_id, transaction.app())
3033 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3034 res.mutable_cost().CopyFrom(txn.Commit())
3036 def _Dynamic_Rollback(self, transaction, _):
3037 CheckAppId(self._trusted, self._app_id, transaction.app())
3038 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3039 txn.Rollback()
3041 def _Dynamic_CreateIndex(self, index, id_response):
3042 id_response.set_value(self._datastore.CreateIndex(index,
3043 self._trusted,
3044 self._app_id))
3046 @_NeedsIndexes
3047 def _Dynamic_GetIndices(self, app_str, composite_indices):
3048 composite_indices.index_list().extend(self._datastore.GetIndexes(
3049 app_str.value(), self._trusted, self._app_id))
3051 def _Dynamic_UpdateIndex(self, index, _):
3052 self._datastore.UpdateIndex(index, self._trusted, self._app_id)
3054 def _Dynamic_DeleteIndex(self, index, _):
3055 self._datastore.DeleteIndex(index, self._trusted, self._app_id)
3057 def _Dynamic_AllocateIds(self, allocate_ids_request, allocate_ids_response):
3058 CheckAppId(self._trusted, self._app_id,
3059 allocate_ids_request.model_key().app())
3061 reference = allocate_ids_request.model_key()
3063 (start, end) = self._datastore._AllocateSequentialIds(
3064 reference, allocate_ids_request.size(), allocate_ids_request.max())
3066 allocate_ids_response.set_start(start)
3067 allocate_ids_response.set_end(end)
3069 def _SetupIndexes(self, _open=open):
3070 """Ensure that the set of existing composite indexes matches index.yaml.
3072 Note: this is similar to the algorithm used by the admin console for
3073 the same purpose.
3074 """
3078 if not self._root_path:
3079 return
3080 index_yaml_file = os.path.join(self._root_path, 'index.yaml')
3081 if (self._cached_yaml[0] == index_yaml_file and
3082 os.path.exists(index_yaml_file) and
3083 os.path.getmtime(index_yaml_file) == self._cached_yaml[1]):
3084 requested_indexes = self._cached_yaml[2]
3085 else:
3086 try:
3087 index_yaml_mtime = os.path.getmtime(index_yaml_file)
3088 fh = _open(index_yaml_file, 'r')
3089 except (OSError, IOError):
3090 index_yaml_data = None
3091 else:
3092 try:
3093 index_yaml_data = fh.read()
3094 finally:
3095 fh.close()
3097 requested_indexes = []
3098 if index_yaml_data is not None:
3100 index_defs = datastore_index.ParseIndexDefinitions(index_yaml_data)
3101 if index_defs is not None and index_defs.indexes is not None:
3103 requested_indexes = datastore_index.IndexDefinitionsToProtos(
3104 self._app_id,
3105 index_defs.indexes)
3106 self._cached_yaml = (index_yaml_file, index_yaml_mtime,
3107 requested_indexes)
3110 existing_indexes = self._datastore.GetIndexes(
3111 self._app_id, self._trusted, self._app_id)
3114 requested = dict((x.definition().Encode(), x) for x in requested_indexes)
3115 existing = dict((x.definition().Encode(), x) for x in existing_indexes)
3118 created = 0
3119 for key, index in requested.iteritems():
3120 if key not in existing:
3121 new_index = entity_pb.CompositeIndex()
3122 new_index.CopyFrom(index)
3123 new_index.set_id(datastore_admin.CreateIndex(new_index))
3124 new_index.set_state(entity_pb.CompositeIndex.READ_WRITE)
3125 datastore_admin.UpdateIndex(new_index)
3126 created += 1
3129 deleted = 0
3130 for key, index in existing.iteritems():
3131 if key not in requested:
3132 datastore_admin.DeleteIndex(index)
3133 deleted += 1
3136 if created or deleted:
3137 logging.debug('Created %d and deleted %d index(es); total %d',
3138 created, deleted, len(requested))
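# Illustrative sketch of the reconciliation above: indexes are keyed by their
# encoded definition, so matching is structural rather than by id ('<def-A>'
# and friends are stand-ins for encoded definitions):
#
#   requested = {'<def-A>': index_a, '<def-B>': index_b}
#   existing = {'<def-B>': index_b, '<def-C>': index_c}
#   # -> index_a is created, index_c is deleted, index_b is left untouched.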
3140 def _UpdateIndexes(self):
3141 if self._index_yaml_updater is not None:
3142 self._index_yaml_updater.UpdateIndexYaml()
3145 class StubQueryConverter(object):
3146 """Converter for v3 and v4 queries suitable for use in stubs."""
3148 def __init__(self, entity_converter):
3149 self._entity_converter = entity_converter
3151 def v4_to_v3_compiled_cursor(self, v4_cursor, v3_compiled_cursor):
3152 """Converts a v4 cursor string to a v3 CompiledCursor.
3154 Args:
3155 v4_cursor: a string representing a v4 query cursor
3156 v3_compiled_cursor: a datastore_pb.CompiledCursor to populate
3157 """
3158 v3_compiled_cursor.Clear()
3159 try:
3160 v3_compiled_cursor.ParseFromString(v4_cursor)
3161 except ProtocolBuffer.ProtocolBufferDecodeError:
3162 raise datastore_pbs.InvalidConversionError('Invalid query cursor.')
3164 def v3_to_v4_compiled_cursor(self, v3_compiled_cursor):
3165 """Converts a v3 CompiledCursor to a v4 cursor string.
3167 Args:
3168 v3_compiled_cursor: a datastore_pb.CompiledCursor
3170 Returns:
3171 a string representing a v4 query cursor
3172 """
3173 return v3_compiled_cursor.SerializeToString()
3175 def v4_to_v3_query(self, v4_partition_id, v4_query, v3_query):
3176 """Converts a v4 Query to a v3 Query.
3178 Args:
3179 v4_partition_id: a datastore_v4_pb.PartitionId
3180 v4_query: a datastore_v4_pb.Query
3181 v3_query: a datastore_pb.Query to populate
3183 Raises:
3184 InvalidConversionError if the query cannot be converted
3185 """
3186 v3_query.Clear()
3188 if v4_partition_id.dataset_id():
3189 v3_query.set_app(v4_partition_id.dataset_id())
3190 if v4_partition_id.has_namespace():
3191 v3_query.set_name_space(v4_partition_id.namespace())
3193 v3_query.set_persist_offset(True)
3194 v3_query.set_require_perfect_plan(True)
3195 v3_query.set_compile(True)
3198 if v4_query.has_limit():
3199 v3_query.set_limit(v4_query.limit())
3200 if v4_query.offset():
3201 v3_query.set_offset(v4_query.offset())
3202 if v4_query.has_start_cursor():
3203 self.v4_to_v3_compiled_cursor(v4_query.start_cursor(),
3204 v3_query.mutable_compiled_cursor())
3205 if v4_query.has_end_cursor():
3206 self.v4_to_v3_compiled_cursor(v4_query.end_cursor(),
3207 v3_query.mutable_end_compiled_cursor())
3210 if v4_query.kind_list():
3211 datastore_pbs.check_conversion(len(v4_query.kind_list()) == 1,
3212 'multiple kinds not supported')
3213 v3_query.set_kind(v4_query.kind(0).name())
3216 has_key_projection = False
3217 for prop in v4_query.projection_list():
3218 if prop.property().name() == datastore_pbs.PROPERTY_NAME_KEY:
3219 has_key_projection = True
3220 else:
3221 v3_query.add_property_name(prop.property().name())
3222 if has_key_projection and not v3_query.property_name_list():
3223 v3_query.set_keys_only(True)
3226 for prop in v4_query.group_by_list():
3227 v3_query.add_group_by_property_name(prop.name())
3230 self.__populate_v3_filters(v4_query.filter(), v3_query)
3233 for v4_order in v4_query.order_list():
3234 v3_order = v3_query.add_order()
3235 v3_order.set_property(v4_order.property().name())
3236 if v4_order.has_direction():
3237 v3_order.set_direction(v4_order.direction())
3239 def v3_to_v4_query(self, v3_query, v4_query):
3240 """Converts a v3 Query to a v4 Query.
3242 Args:
3243 v3_query: a datastore_pb.Query
3244 v4_query: a datastore_v4_pb.Query to populate
3246 Raises:
3247 InvalidConversionError if the query cannot be converted
3248 """
3249 v4_query.Clear()
3251 datastore_pbs.check_conversion(not v3_query.has_distinct(),
3252 'distinct option not supported')
3253 datastore_pbs.check_conversion(v3_query.require_perfect_plan(),
3254 'non-perfect plans not supported')
3258 if v3_query.has_limit():
3259 v4_query.set_limit(v3_query.limit())
3260 if v3_query.offset():
3261 v4_query.set_offset(v3_query.offset())
3262 if v3_query.has_compiled_cursor():
3263 v4_query.set_start_cursor(
3264 self.v3_to_v4_compiled_cursor(v3_query.compiled_cursor()))
3265 if v3_query.has_end_compiled_cursor():
3266 v4_query.set_end_cursor(
3267 self.v3_to_v4_compiled_cursor(v3_query.end_compiled_cursor()))
3270 if v3_query.has_kind():
3271 v4_query.add_kind().set_name(v3_query.kind())
3274 for name in v3_query.property_name_list():
3275 v4_query.add_projection().mutable_property().set_name(name)
3276 if v3_query.keys_only():
3277 v4_query.add_projection().mutable_property().set_name(
3278 datastore_pbs.PROPERTY_NAME_KEY)
3281 for name in v3_query.group_by_property_name_list():
3282 v4_query.add_group_by().set_name(name)
3285 num_v4_filters = len(v3_query.filter_list())
3286 if v3_query.has_ancestor():
3287 num_v4_filters += 1
3289 if num_v4_filters == 1:
3290 get_property_filter = self.__get_property_filter
3291 elif num_v4_filters > 1:
3292 v4_query.mutable_filter().mutable_composite_filter().set_operator(
3293 datastore_v4_pb.CompositeFilter.AND)
3294 get_property_filter = self.__add_property_filter
3296 if v3_query.has_ancestor():
3297 self.__v3_query_to_v4_ancestor_filter(v3_query,
3298 get_property_filter(v4_query))
3299 for v3_filter in v3_query.filter_list():
3300 self.__v3_filter_to_v4_property_filter(v3_filter,
3301 get_property_filter(v4_query))
3304 for v3_order in v3_query.order_list():
3305 v4_order = v4_query.add_order()
3306 v4_order.mutable_property().set_name(v3_order.property())
3307 if v3_order.has_direction():
3308 v4_order.set_direction(v3_order.direction())
3310 def __get_property_filter(self, v4_query):
3311 """Returns the PropertyFilter from the query's top-level filter."""
3312 return v4_query.mutable_filter().mutable_property_filter()
3314 def __add_property_filter(self, v4_query):
3315 """Adds and returns a PropertyFilter from the query's composite filter."""
3316 v4_comp_filter = v4_query.mutable_filter().mutable_composite_filter()
3317 return v4_comp_filter.add_filter().mutable_property_filter()
3319 def __populate_v3_filters(self, v4_filter, v3_query):
3320 """Populates a filters for a v3 Query.
3322 Args:
3323 v4_filter: a datastore_v4_pb.Filter
3324 v3_query: a datastore_pb.Query to populate with filters
3325 """
3326 if v4_filter.has_property_filter():
3327 v4_property_filter = v4_filter.property_filter()
3328 if (v4_property_filter.operator()
3329 == datastore_v4_pb.PropertyFilter.HAS_ANCESTOR):
3330 datastore_pbs.check_conversion(
3331 v4_property_filter.value().has_key_value(),
3332 'HAS_ANCESTOR requires a reference value')
3333 datastore_pbs.check_conversion((v4_property_filter.property().name()
3334 == datastore_pbs.PROPERTY_NAME_KEY),
3335 'unsupported property')
3336 datastore_pbs.check_conversion(not v3_query.has_ancestor(),
3337 'duplicate ancestor constraint')
3338 self._entity_converter.v4_to_v3_reference(
3339 v4_property_filter.value().key_value(),
3340 v3_query.mutable_ancestor())
3341 else:
3342 v3_filter = v3_query.add_filter()
3343 property_name = v4_property_filter.property().name()
3344 v3_filter.set_op(v4_property_filter.operator())
3345 datastore_pbs.check_conversion(
3346 not v4_property_filter.value().list_value_list(),
3347 ('unsupported value type, %s, in property filter'
3348 ' on "%s"' % ('list_value', property_name)))
3349 prop = v3_filter.add_property()
3350 prop.set_multiple(False)
3351 prop.set_name(property_name)
3352 self._entity_converter.v4_value_to_v3_property_value(
3353 v4_property_filter.value(), prop.mutable_value())
3354 elif v4_filter.has_composite_filter():
3355 datastore_pbs.check_conversion((v4_filter.composite_filter().operator()
3356 == datastore_v4_pb.CompositeFilter.AND),
3357 'unsupported composite property operator')
3358 for v4_sub_filter in v4_filter.composite_filter().filter_list():
3359 self.__populate_v3_filters(v4_sub_filter, v3_query)
3361 def __v3_filter_to_v4_property_filter(self, v3_filter, v4_property_filter):
3362 """Converts a v3 Filter to a v4 PropertyFilter.
3364 Args:
3365 v3_filter: a datastore_pb.Filter
3366 v4_property_filter: a datastore_v4_pb.PropertyFilter to populate
3368 Raises:
3369 InvalidConversionError if the filter cannot be converted
3370 """
3371 datastore_pbs.check_conversion(v3_filter.property_size() == 1,
3372 'invalid filter')
3373 datastore_pbs.check_conversion(v3_filter.op() <= 5,
3374 'unsupported filter op: %d' % v3_filter.op())
3375 v4_property_filter.Clear()
3376 v4_property_filter.set_operator(v3_filter.op())
3377 v4_property_filter.mutable_property().set_name(v3_filter.property(0).name())
3378 self._entity_converter.v3_property_to_v4_value(
3379 v3_filter.property(0), True, v4_property_filter.mutable_value())
3381 def __v3_query_to_v4_ancestor_filter(self, v3_query, v4_property_filter):
3382 """Converts a v3 Query to a v4 ancestor PropertyFilter.
3384 Args:
3385 v3_query: a datastore_pb.Query
3386 v4_property_filter: a datastore_v4_pb.PropertyFilter to populate
3387 """
3388 v4_property_filter.Clear()
3389 v4_property_filter.set_operator(
3390 datastore_v4_pb.PropertyFilter.HAS_ANCESTOR)
3391 prop = v4_property_filter.mutable_property()
3392 prop.set_name(datastore_pbs.PROPERTY_NAME_KEY)
3393 self._entity_converter.v3_to_v4_key(
3394 v3_query.ancestor(),
3395 v4_property_filter.mutable_value().mutable_key_value())
3399 __query_converter = StubQueryConverter(datastore_pbs.get_entity_converter())
3402 def get_query_converter():
3403 """Returns a converter for v3 and v4 queries (not suitable for production).
3405 This converter is suitable for use in stubs but not for production.
3407 Returns:
3408 a StubQueryConverter
3409 """
3410 return __query_converter
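# Illustrative round trip through the converter (v4_partition_id and v4_query
# are assumed, already populated datastore_v4_pb messages):
#
#   converter = get_query_converter()
#   v3_query = datastore_pb.Query()
#   converter.v4_to_v3_query(v4_partition_id, v4_query, v3_query)
#   v4_copy = datastore_v4_pb.Query()
#   converter.v3_to_v4_query(v3_query, v4_copy)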
3413 class StubServiceConverter(object):
3414 """Converter for v3/v4 request/response protos suitable for use in stubs."""
3416 def __init__(self, entity_converter, query_converter):
3417 self._entity_converter = entity_converter
3418 self._query_converter = query_converter
3420 def v4_to_v3_cursor(self, v4_query_handle, v3_cursor):
3421 """Converts a v4 cursor string to a v3 Cursor.
3423 Args:
3424 v4_query_handle: a string representing a v4 query handle
3425 v3_cursor: a datastore_pb.Cursor to populate
3426 """
3427 try:
3428 v3_cursor.ParseFromString(v4_query_handle)
3429 except ProtocolBuffer.ProtocolBufferDecodeError:
3430 raise datastore_pbs.InvalidConversionError('Invalid query handle.')
3431 return v3_cursor
3433 def _v3_to_v4_query_handle(self, v3_cursor):
3434 """Converts a v3 Cursor to a v4 query handle string.
3436 Args:
3437 v3_cursor: a datastore_pb.Cursor
3439 Returns:
3440 a string representing a v4 cursor
3441 """
3442 return v3_cursor.SerializeToString()
3444 def v4_to_v3_txn(self, v4_txn, v3_txn):
3445 """Converts a v4 transaction string to a v3 Transaction.
3447 Args:
3448 v4_txn: a string representing a v4 transaction
3449 v3_txn: a datastore_pb.Transaction to populate
3450 """
3451 try:
3452 v3_txn.ParseFromString(v4_txn)
3453 except ProtocolBuffer.ProtocolBufferDecodeError:
3454 raise datastore_pbs.InvalidConversionError('Invalid transaction.')
3455 return v3_txn
3457 def _v3_to_v4_txn(self, v3_txn):
3458 """Converts a v3 Transaction to a v4 transaction string.
3460 Args:
3461 v3_txn: a datastore_pb.Transaction
3463 Returns:
3464 a string representing a v4 transaction
3465 """
3466 return v3_txn.SerializeToString()
3471 def v4_to_v3_begin_transaction_req(self, app_id, v4_req):
3472 """Converts a v4 BeginTransactionRequest to a v3 BeginTransactionRequest.
3474 Args:
3475 app_id: app id
3476 v4_req: a datastore_v4_pb.BeginTransactionRequest
3478 Returns:
3479 a datastore_pb.BeginTransactionRequest
3480 """
3481 v3_req = datastore_pb.BeginTransactionRequest()
3482 v3_req.set_app(app_id)
3483 v3_req.set_allow_multiple_eg(v4_req.cross_group())
3484 return v3_req
3486 def v3_to_v4_begin_transaction_resp(self, v3_resp):
3487 """Converts a v3 Transaction to a v4 BeginTransactionResponse.
3489 Args:
3490 v3_resp: a datastore_pb.Transaction
3492 Returns:
3493 a datastore_v4_pb.BeginTransactionResponse
3494 """
3495 v4_resp = datastore_v4_pb.BeginTransactionResponse()
3496 v4_resp.set_transaction(self._v3_to_v4_txn(v3_resp))
3497 return v4_resp
3502 def v4_rollback_req_to_v3_txn(self, v4_req):
3503 """Converts a v4 RollbackRequest to a v3 Transaction.
3505 Args:
3506 v4_req: a datastore_v4_pb.RollbackRequest
3508 Returns:
3509 a datastore_pb.Transaction
3510 """
3511 v3_txn = datastore_pb.Transaction()
3512 self.v4_to_v3_txn(v4_req.transaction(), v3_txn)
3513 return v3_txn
3518 def v4_commit_req_to_v3_txn(self, v4_req):
3519 """Converts a v4 CommitRequest to a v3 Transaction.
3521 Args:
3522 v4_req: a datastore_v4_pb.CommitRequest
3524 Returns:
3525 a datastore_pb.Transaction
3526 """
3527 v3_txn = datastore_pb.Transaction()
3528 self.v4_to_v3_txn(v4_req.transaction(), v3_txn)
3529 return v3_txn
3534 def v4_run_query_req_to_v3_query(self, v4_req):
3535 """Converts a v4 RunQueryRequest to a v3 Query.
3537 GQL is not supported.
3539 Args:
3540 v4_req: a datastore_v4_pb.RunQueryRequest
3542 Returns:
3543 a datastore_pb.Query
3544 """
3546 datastore_pbs.check_conversion(not v4_req.has_gql_query(),
3547 'GQL not supported')
3548 v3_query = datastore_pb.Query()
3549 self._query_converter.v4_to_v3_query(v4_req.partition_id(), v4_req.query(),
3550 v3_query)
3553 if v4_req.has_suggested_batch_size():
3554 v3_query.set_count(v4_req.suggested_batch_size())
3557 read_options = v4_req.read_options()
3558 if read_options.has_transaction():
3559 self.v4_to_v3_txn(read_options.transaction(),
3560 v3_query.mutable_transaction())
3561 elif (read_options.read_consistency()
3562 == datastore_v4_pb.ReadOptions.EVENTUAL):
3563 v3_query.set_strong(False)
3564 v3_query.set_failover_ms(-1)
3565 elif read_options.read_consistency() == datastore_v4_pb.ReadOptions.STRONG:
3566 v3_query.set_strong(True)
3568 if v4_req.has_min_safe_time_seconds():
3569 v3_query.set_min_safe_time_seconds(v4_req.min_safe_time_seconds())
3571 return v3_query
3573 def v3_to_v4_run_query_req(self, v3_req):
3574 """Converts a v3 Query to a v4 RunQueryRequest.
3576 Args:
3577 v3_req: a datastore_pb.Query
3579 Returns:
3580 a datastore_v4_pb.RunQueryRequest
3581 """
3582 v4_req = datastore_v4_pb.RunQueryRequest()
3585 v4_partition_id = v4_req.mutable_partition_id()
3586 v4_partition_id.set_dataset_id(v3_req.app())
3587 if v3_req.name_space():
3588 v4_partition_id.set_namespace(v3_req.name_space())
3591 if v3_req.has_count():
3592 v4_req.set_suggested_batch_size(v3_req.count())
3594 datastore_pbs.check_conversion(
3595 not (v3_req.has_transaction() and v3_req.has_failover_ms()),
3596 'Cannot set failover and transaction handle.')
3599 if v3_req.has_transaction():
3600 v4_req.mutable_read_options().set_transaction(
3601 self._v3_to_v4_txn(v3_req.transaction()))
3602 elif v3_req.strong():
3603 v4_req.mutable_read_options().set_read_consistency(
3604 datastore_v4_pb.ReadOptions.STRONG)
3605 elif v3_req.has_failover_ms():
3606 v4_req.mutable_read_options().set_read_consistency(
3607 datastore_v4_pb.ReadOptions.EVENTUAL)
3608 if v3_req.has_min_safe_time_seconds():
3609 v4_req.set_min_safe_time_seconds(v3_req.min_safe_time_seconds())
3611 self._query_converter.v3_to_v4_query(v3_req, v4_req.mutable_query())
3613 return v4_req
3615 def v4_run_query_resp_to_v3_query_result(self, v4_resp):
3616 """Converts a V4 RunQueryResponse to a v3 QueryResult.
3618 Args:
3619 v4_resp: a datastore_v4_pb.RunQueryResponse
3621 Returns:
3622 a datastore_pb.QueryResult
3623 """
3624 v3_resp = self.v4_to_v3_query_result(v4_resp.batch())
3627 if v4_resp.has_query_handle():
3628 self.v4_to_v3_cursor(v4_resp.query_handle(), v3_resp.mutable_cursor())
3630 return v3_resp
3632 def v3_to_v4_run_query_resp(self, v3_resp):
3633 """Converts a v3 QueryResult to a V4 RunQueryResponse.
3635 Args:
3636 v3_resp: a datastore_pb.QueryResult
3638 Returns:
3639 a datastore_v4_pb.RunQueryResponse
3640 """
3641 v4_resp = datastore_v4_pb.RunQueryResponse()
3642 self.v3_to_v4_query_result_batch(v3_resp, v4_resp.mutable_batch())
3644 if v3_resp.has_cursor():
3645 v4_resp.set_query_handle(
3646 self._query_converter.v3_to_v4_compiled_cursor(v3_resp.cursor()))
3648 return v4_resp
3653 def v4_to_v3_next_req(self, v4_req):
3654 """Converts a v4 ContinueQueryRequest to a v3 NextRequest.
3656 Args:
3657 v4_req: a datastore_v4_pb.ContinueQueryRequest
3659 Returns:
3660 a datastore_pb.NextRequest
3661 """
3662 v3_req = datastore_pb.NextRequest()
3663 v3_req.set_compile(True)
3664 self.v4_to_v3_cursor(v4_req.query_handle(), v3_req.mutable_cursor())
3665 return v3_req
3667 def v3_to_v4_continue_query_resp(self, v3_resp):
3668 """Converts a v3 QueryResult to a v4 ContinueQueryResponse.
3670 Args:
3671 v3_resp: a datastore_pb.QueryResult
3673 Returns:
3674 a datastore_v4_pb.ContinueQueryResponse
3675 """
3676 v4_resp = datastore_v4_pb.ContinueQueryResponse()
3677 self.v3_to_v4_query_result_batch(v3_resp, v4_resp.mutable_batch())
3678 return v4_resp
3683 def v4_to_v3_get_req(self, v4_req):
3684 """Converts a v4 LookupRequest to a v3 GetRequest.
3686 Args:
3687 v4_req: a datastore_v4_pb.LookupRequest
3689 Returns:
3690 a datastore_pb.GetRequest
3691 """
3692 v3_req = datastore_pb.GetRequest()
3693 v3_req.set_allow_deferred(True)
3696 if v4_req.read_options().has_transaction():
3697 self.v4_to_v3_txn(v4_req.read_options().transaction(),
3698 v3_req.mutable_transaction())
3699 elif (v4_req.read_options().read_consistency()
3700 == datastore_v4_pb.ReadOptions.EVENTUAL):
3701 v3_req.set_strong(False)
3702 v3_req.set_failover_ms(-1)
3703 elif (v4_req.read_options().read_consistency()
3704 == datastore_v4_pb.ReadOptions.STRONG):
3705 v3_req.set_strong(True)
3707 for v4_key in v4_req.key_list():
3708 self._entity_converter.v4_to_v3_reference(v4_key, v3_req.add_key())
3710 return v3_req
3712 def v3_to_v4_lookup_resp(self, v3_resp):
3713 """Converts a v3 GetResponse to a v4 LookupResponse.
3715 Args:
3716 v3_resp: a datastore_pb.GetResponse
3718 Returns:
3719 a datastore_v4_pb.LookupResponse
3720 """
3721 v4_resp = datastore_v4_pb.LookupResponse()
3723 for v3_ref in v3_resp.deferred_list():
3724 self._entity_converter.v3_to_v4_key(v3_ref, v4_resp.add_deferred())
3725 for v3_entity in v3_resp.entity_list():
3726 if v3_entity.has_entity():
3727 self._entity_converter.v3_to_v4_entity(
3728 v3_entity.entity(),
3729 v4_resp.add_found().mutable_entity())
3730 if v3_entity.has_key():
3731 self._entity_converter.v3_to_v4_key(
3732 v3_entity.key(),
3733 v4_resp.add_missing().mutable_entity().mutable_key())
3735 return v4_resp
3737 def v4_to_v3_query_result(self, v4_batch):
3738 """Converts a v4 QueryResultBatch to a v3 QueryResult.
3740 Args:
3741 v4_batch: a datastore_v4_pb.QueryResultBatch
3743 Returns:
3744 a datastore_pb.QueryResult
3745 """
3746 v3_result = datastore_pb.QueryResult()
3749 v3_result.set_more_results(
3750 (v4_batch.more_results()
3751 == datastore_v4_pb.QueryResultBatch.NOT_FINISHED))
3752 if v4_batch.has_end_cursor():
3753 self._query_converter.v4_to_v3_compiled_cursor(
3754 v4_batch.end_cursor(), v3_result.mutable_compiled_cursor())
3757 if v4_batch.entity_result_type() == datastore_v4_pb.EntityResult.PROJECTION:
3758 v3_result.set_index_only(True)
3759 elif v4_batch.entity_result_type() == datastore_v4_pb.EntityResult.KEY_ONLY:
3760 v3_result.set_keys_only(True)
3763 if v4_batch.has_skipped_results():
3764 v3_result.set_skipped_results(v4_batch.skipped_results())
3765 for v4_entity in v4_batch.entity_result_list():
3766 v3_entity = v3_result.add_result()
3767 self._entity_converter.v4_to_v3_entity(v4_entity.entity(), v3_entity)
3768 if v4_batch.entity_result_type() != datastore_v4_pb.EntityResult.FULL:
3771 v3_entity.clear_entity_group()
3773 return v3_result
3775 def v3_to_v4_query_result_batch(self, v3_result, v4_batch):
3776 """Converts a v3 QueryResult to a v4 QueryResultBatch.
3778 Args:
3779 v3_result: a datastore_pb.QueryResult
3780 v4_batch: a datastore_v4_pb.QueryResultBatch to populate
3781 """
3782 v4_batch.Clear()
3785 if v3_result.more_results():
3786 v4_batch.set_more_results(datastore_v4_pb.QueryResultBatch.NOT_FINISHED)
3787 else:
3788 v4_batch.set_more_results(
3789 datastore_v4_pb.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT)
3790 if v3_result.has_compiled_cursor():
3791 v4_batch.set_end_cursor(
3792 self._query_converter.v3_to_v4_compiled_cursor(
3793 v3_result.compiled_cursor()))
3796 if v3_result.keys_only():
3797 v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.KEY_ONLY)
3798 elif v3_result.index_only():
3799 v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.PROJECTION)
3800 else:
3801 v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.FULL)
3804 if v3_result.has_skipped_results():
3805 v4_batch.set_skipped_results(v3_result.skipped_results())
3806 for v3_entity in v3_result.result_list():
3807 v4_entity_result = datastore_v4_pb.EntityResult()
3808 self._entity_converter.v3_to_v4_entity(v3_entity,
3809 v4_entity_result.mutable_entity())
3810 v4_batch.entity_result_list().append(v4_entity_result)
3814 __service_converter = StubServiceConverter(
3815 datastore_pbs.get_entity_converter(), __query_converter)
3818 def get_service_converter():
3819 """Returns a converter for v3 and v4 service request/response protos.
3821 This converter is suitable for use in stubs but not for production.
3823 Returns:
3824 a StubServiceConverter
3825 """
3826 return __service_converter
3829 def ReverseBitsInt64(v):
3830 """Reverse the bits of a 64-bit integer.
3832 Args:
3833 v: Input integer of type 'int' or 'long'.
3835 Returns:
3836 Bit-reversed input as 'int' on 64-bit machines or as 'long' otherwise.
3837 """
3839 v = ((v >> 1) & 0x5555555555555555) | ((v & 0x5555555555555555) << 1)
3840 v = ((v >> 2) & 0x3333333333333333) | ((v & 0x3333333333333333) << 2)
3841 v = ((v >> 4) & 0x0F0F0F0F0F0F0F0F) | ((v & 0x0F0F0F0F0F0F0F0F) << 4)
3842 v = ((v >> 8) & 0x00FF00FF00FF00FF) | ((v & 0x00FF00FF00FF00FF) << 8)
3843 v = ((v >> 16) & 0x0000FFFF0000FFFF) | ((v & 0x0000FFFF0000FFFF) << 16)
3844 v = int((v >> 32) | (v << 32) & 0xFFFFFFFFFFFFFFFF)
3845 return v
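# Illustrative examples; the reversal is an involution on 64-bit values:
#
#   >>> ReverseBitsInt64(1) == 1 << 63
#   True
#   >>> ReverseBitsInt64(ReverseBitsInt64(12345)) == 12345
#   True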
3848 def ToScatteredId(v):
3849 """Map counter value v to the scattered ID space.
3851 Shift and bit-reverse the counter, then translate into the scattered ID space.
3853 Args:
3854 v: Counter value from which to produce ID.
3856 Returns:
3857 Integer ID.
3859 Raises:
3860 datastore_errors.BadArgumentError if counter value exceeds the range of
3861 the scattered ID space.
3863 if v >= _MAX_SCATTERED_COUNTER:
3864 raise datastore_errors.BadArgumentError('counter value too large (%d)' %v)
3865 return _MAX_SEQUENTIAL_ID + 1 + long(ReverseBitsInt64(v << _SCATTER_SHIFT))


def IdToCounter(k):
  """Map ID k to the counter value from which it was generated.

  Determines whether k is a sequential or scattered ID.

  Args:
    k: ID from which to infer counter value.

  Returns:
    Tuple of integers (counter_value, id_space).
  """
  if k > _MAX_SCATTERED_ID:
    return 0, SCATTERED
  elif k > _MAX_SEQUENTIAL_ID and k <= _MAX_SCATTERED_ID:
    return long(ReverseBitsInt64(k) >> _SCATTER_SHIFT), SCATTERED
  elif k > 0:
    return long(k), SEQUENTIAL
  else:
    raise datastore_errors.BadArgumentError('invalid id (%d)' % k)
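

# Illustrative sketch, not part of the original SDK: IdToCounter inverts
# ToScatteredId, and sequential IDs map back to themselves. The helper below
# is hypothetical and is never called by the stub.
def _ExampleIdCounterRoundTrip():
  """Shows that scattered and sequential IDs round-trip to their counters."""
  for counter in (1, 42, 12345):
    scattered_id = ToScatteredId(counter)
    # Scattered IDs live strictly above the sequential ID range.
    assert scattered_id > _MAX_SEQUENTIAL_ID
    assert IdToCounter(scattered_id) == (counter, SCATTERED)
  assert IdToCounter(1000) == (1000, SEQUENTIAL)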


def CompareEntityPbByKey(a, b):
  """Compare two entity protobufs by key.

  Args:
    a: entity_pb.EntityProto to compare
    b: entity_pb.EntityProto to compare

  Returns:
    <0 if a's key is before b's, =0 if they are the same key, and >0 otherwise.
  """
  return cmp(datastore_types.Key._FromPb(a.key()),
             datastore_types.Key._FromPb(b.key()))
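

# Illustrative sketch, not part of the original SDK: the comparator can be
# passed directly to Python 2's cmp-based sorting. The helper and the
# 'demo-app'/'Kind' identifiers are made up.
def _ExampleCompareEntityPbByKey():
  """Shows key ordering of entity protos by numeric ID."""
  def _MakeEntity(entity_id):
    entity = entity_pb.EntityProto()
    reference = entity.mutable_key()
    reference.set_app('demo-app')
    element = reference.mutable_path().add_element()
    element.set_type('Kind')
    element.set_id(entity_id)
    return entity
  first, second = _MakeEntity(1), _MakeEntity(2)
  assert CompareEntityPbByKey(first, second) < 0
  assert sorted([second, first], cmp=CompareEntityPbByKey)[0] is first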


def _GuessOrders(filters, orders):
  """Guess any implicit ordering.

  The datastore gives a logical, but not necessarily predictable, ordering when
  orders are not completely explicit. This function guesses at that ordering
  (which is better than always ordering by __key__ for tests).

  Args:
    filters: The datastore_pb.Query_Filter list that has already been
      normalized and checked.
    orders: The datastore_pb.Query_Order list that has already been normalized
      and checked. Not mutated; a copy is extended and returned.

  Returns:
    The list of orders, with any guessed orders appended.
  """
  orders = orders[:]

  # If no order was given, order first by the inequality property (if any),
  # then by any properties mentioned in EXISTS filters.
  if not orders:
    for filter_pb in filters:
      if filter_pb.op() in datastore_index.INEQUALITY_OPERATORS:
        order = datastore_pb.Query_Order()
        order.set_property(filter_pb.property(0).name())
        orders.append(order)
        break

    exists_props = (filter_pb.property(0).name() for filter_pb in filters
                    if filter_pb.op() == datastore_pb.Query_Filter.EXISTS)
    for prop in sorted(exists_props):
      order = datastore_pb.Query_Order()
      order.set_property(prop)
      orders.append(order)

  # Results are always ordered by __key__ last.
  if not orders or orders[-1].property() != '__key__':
    order = datastore_pb.Query_Order()
    order.set_property('__key__')
    orders.append(order)
  return orders
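

# Illustrative sketch, not part of the original SDK: an inequality filter on a
# property implies an order on that property, followed by __key__. The helper
# and the 'age' property name are made up.
def _ExampleGuessOrders():
  """Shows the implicit ordering guessed for an inequality filter."""
  filter_pb = datastore_pb.Query_Filter()
  filter_pb.set_op(datastore_pb.Query_Filter.GREATER_THAN)
  filter_pb.add_property().set_name('age')
  orders = _GuessOrders([filter_pb], [])
  assert [order.property() for order in orders] == ['age', '__key__']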


def _MakeQuery(query, filters, orders):
  """Make a datastore_query.Query for the given datastore_pb.Query.

  Overrides filters and orders in query with the specified arguments."""
  clone = datastore_pb.Query()
  clone.CopyFrom(query)
  clone.clear_filter()
  clone.clear_order()
  clone.filter_list().extend(filters)
  clone.order_list().extend(orders)
  return datastore_query.Query._from_pb(clone)


def _CreateIndexEntities(entity, postfix_props):
  """Creates entities for index values that would appear in production.

  This function finds all multi-valued properties listed in postfix_props, and
  creates a new entity for each unique combination of values. The resulting
  entities will only have a single value for each property listed in
  postfix_props.

  It reserves the right to include index data that would not be
  seen in production, e.g. by returning the original entity when no splitting
  is needed. LoadEntity will remove any excess fields.

  This simulates the results seen by an index scan in the datastore.

  Args:
    entity: The entity_pb.EntityProto to split.
    postfix_props: A set of property names to split on.

  Returns:
    A list of the split entity_pb.EntityProtos.
  """
  to_split = {}
  split_required = False
  base_props = []
  for prop in entity.property_list():
    if prop.name() in postfix_props:
      values = to_split.get(prop.name())
      if values is None:
        values = []
        to_split[prop.name()] = values
      else:
        # A second property with the same name means the entity must be split.
        split_required = True
      if prop.value() not in values:
        values.append(prop.value())
    else:
      base_props.append(prop)

  if not split_required:
    # No property has multiple values; the entity can be used as is.
    return [entity]

  clone = entity_pb.EntityProto()
  clone.CopyFrom(entity)
  clone.clear_property()
  clone.property_list().extend(base_props)
  results = [clone]

  for name, splits in to_split.iteritems():
    if len(splits) == 1:
      # The property only has a single value, so add it to each result as is.
      for result in results:
        prop = result.add_property()
        prop.set_name(name)
        prop.set_multiple(False)
        prop.set_meaning(entity_pb.Property.INDEX_VALUE)
        prop.mutable_value().CopyFrom(splits[0])
      continue

    # Copy each result once per value, giving each copy one of the values.
    new_results = []
    for result in results:
      for split in splits:
        clone = entity_pb.EntityProto()
        clone.CopyFrom(result)
        prop = clone.add_property()
        prop.set_name(name)
        prop.set_multiple(False)
        prop.set_meaning(entity_pb.Property.INDEX_VALUE)
        prop.mutable_value().CopyFrom(split)
        new_results.append(clone)
    results = new_results
  return results
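

# Illustrative sketch, not part of the original SDK: a two-valued property
# splits the entity into two index entities, each carrying one value. The
# helper and the 'tag' property name are made up.
def _ExampleCreateIndexEntities():
  """Shows how a multi-valued property is split into index entities."""
  entity = entity_pb.EntityProto()
  for tag in ('a', 'b'):
    prop = entity.add_property()
    prop.set_name('tag')
    prop.set_multiple(True)
    prop.mutable_value().set_stringvalue(tag)
  split = _CreateIndexEntities(entity, set(['tag']))
  # One entity per distinct value, each with a single 'tag' property.
  assert len(split) == 2
  assert all(e.property_size() == 1 for e in split)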


def _CreateIndexOnlyQueryResults(results, postfix_props):
  """Creates a result set similar to that returned by an index only query."""
  new_results = []
  for result in results:
    new_results.extend(_CreateIndexEntities(result, postfix_props))
  return new_results


def _ExecuteQuery(results, query, filters, orders, index_list):
  """Executes the query on a superset of its results.

  Args:
    results: superset of results for query.
    query: a datastore_pb.Query.
    filters: the filters from query.
    orders: the orders from query.
    index_list: the list of indexes used by the query.

  Returns:
    A ListCursor over the results of applying query to results.
  """
  orders = _GuessOrders(filters, orders)
  dsquery = _MakeQuery(query, filters, orders)

  if query.property_name_size():
    results = _CreateIndexOnlyQueryResults(
        results, set(order.property() for order in orders))

  return ListCursor(query, dsquery, orders, index_list,
                    datastore_query.apply_query(dsquery, results))


def _UpdateCost(cost, entity_writes, index_writes):
  """Updates the provided cost.

  Args:
    cost: Out param. The cost object to update.
    entity_writes: The number of entity writes to add.
    index_writes: The number of index writes to add.
  """
  cost.set_entity_writes(cost.entity_writes() + entity_writes)
  cost.set_index_writes(cost.index_writes() + index_writes)


def _CalculateWriteOps(composite_indexes, old_entity, new_entity):
  """Determines number of entity and index writes needed to write new_entity.

  We assume that old_entity represents the current state of the Datastore.

  Args:
    composite_indexes: The composite_indexes for the kind of the entities.
    old_entity: Entity representing the current state in the Datastore.
    new_entity: Entity representing the desired state in the Datastore.

  Returns:
    A tuple of size 2, where the first value is the number of entity writes and
    the second value is the number of index writes.
  """
  if (old_entity is not None and
      old_entity.property_list() == new_entity.property_list()
      and old_entity.raw_property_list() == new_entity.raw_property_list()):
    return 0, 0

  index_writes = _ChangedIndexRows(composite_indexes, old_entity, new_entity)
  if old_entity is None:
    # A brand new entity costs one additional index write (presumably its row
    # in the built-in kind index).
    index_writes += 1

  return 1, index_writes
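

# Illustrative sketch, not part of the original SDK: inserting a new entity
# with one indexed property costs one entity write plus three index writes
# (one ascending row, one descending row, and the extra write for new
# entities). The helper and the 'Kind'/'age' names are made up.
def _ExampleCalculateWriteOps():
  """Shows the write cost of inserting a brand new entity."""
  new_entity = entity_pb.EntityProto()
  new_entity.mutable_key().mutable_path().add_element().set_type('Kind')
  prop = new_entity.add_property()
  prop.set_name('age')
  prop.mutable_value().set_int64value(7)
  assert _CalculateWriteOps([], None, new_entity) == (1, 3)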


def _ChangedIndexRows(composite_indexes, old_entity, new_entity):
  """Determine the number of index rows that need to change.

  We assume that old_entity represents the current state of the Datastore.

  Args:
    composite_indexes: The composite_indexes for the kind of the entities.
    old_entity: Entity representing the current state in the Datastore.
    new_entity: Entity representing the desired state in the Datastore.

  Returns:
    The number of index rows that need to change.
  """

  # Maps property names to sets of single-valued properties (in serialized,
  # hashable form) present on each entity.
  unique_old_properties = collections.defaultdict(set)
  unique_new_properties = collections.defaultdict(set)

  if old_entity is not None:
    for old_prop in old_entity.property_list():
      _PopulateUniquePropertiesSet(old_prop, unique_old_properties)

  # Maps property names to the number of values unchanged between the old and
  # new entity.
  unchanged = collections.defaultdict(int)

  for new_prop in new_entity.property_list():
    new_prop_as_str = _PopulateUniquePropertiesSet(
        new_prop, unique_new_properties)
    if new_prop_as_str in unique_old_properties[new_prop.name()]:
      unchanged[new_prop.name()] += 1

  # Gather every property name present on either entity.
  all_property_names = set(unique_old_properties.iterkeys())
  all_property_names.update(unique_new_properties.iterkeys())
  all_property_names.update(unchanged.iterkeys())

  all_indexes = _GetEntityByPropertyIndexes(all_property_names)
  all_indexes.extend([comp.definition() for comp in composite_indexes])
  path_size = new_entity.key().path().element_size()
  writes = 0
  for index in all_indexes:
    # Ancestor composite indexes write a row for every element of the key
    # path, so multiply their writes by the path size.
    ancestor_multiplier = 1
    if index.ancestor() and index.property_size() > 1:
      ancestor_multiplier = path_size
    writes += (_CalculateWritesForCompositeIndex(
        index, unique_old_properties, unique_new_properties, unchanged) *
               ancestor_multiplier)
  return writes


def _PopulateUniquePropertiesSet(prop, unique_properties):
  """Populates a set containing unique properties.

  Args:
    prop: An entity property.
    unique_properties: Dictionary mapping property names to a set of unique
      properties.

  Returns:
    The property pb in string (hashable) form.
  """
  if prop.multiple():
    prop = _CopyAndSetMultipleToFalse(prop)

  # Serialize the property so it can be hashed and stored in a set.
  prop_as_str = prop.SerializePartialToString()
  unique_properties[prop.name()].add(prop_as_str)
  return prop_as_str


def _CalculateWritesForCompositeIndex(index, unique_old_properties,
                                      unique_new_properties,
                                      common_properties):
  """Calculate the number of writes required to maintain a specific Index.

  Args:
    index: The composite index.
    unique_old_properties: Dictionary mapping property names to a set of props
      present on the old entity.
    unique_new_properties: Dictionary mapping property names to a set of props
      present on the new entity.
    common_properties: Dictionary mapping property names to the number of
      properties with that name that are present on both the old and new
      entities.

  Returns:
    The number of writes required to maintain the provided Index.
  """
  old_count = 1
  new_count = 1
  common_count = 1
  for prop in index.property_list():
    old_count *= len(unique_old_properties[prop.name()])
    new_count *= len(unique_new_properties[prop.name()])
    common_count *= common_properties[prop.name()]

  return (old_count - common_count) + (new_count - common_count)
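

# Illustrative sketch, not part of the original SDK: a worked example of the
# formula above for a two-property index. The helper, the 'a'/'b' property
# names, and the serialized-property placeholder strings are made up.
def _ExampleCompositeIndexWrites():
  """Shows (old_count - common) + (new_count - common) on concrete inputs."""
  index = _SinglePropertyIndex('a', entity_pb.Index_Property.ASCENDING)
  index.add_property().set_name('b')
  # The old entity occupied 2 * 1 = 2 rows, the new entity needs 1 * 1 = 1
  # row, and 1 * 1 = 1 row is shared, so (2 - 1) + (1 - 1) = 1 row changes.
  unique_old = {'a': set(['a=1', 'a=2']), 'b': set(['b=3'])}
  unique_new = {'a': set(['a=1']), 'b': set(['b=3'])}
  common = collections.defaultdict(int, {'a': 1, 'b': 1})
  assert _CalculateWritesForCompositeIndex(
      index, unique_old, unique_new, common) == 1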


def _GetEntityByPropertyIndexes(all_property_names):
  """Creates the pair of built-in single-property indexes for each name."""
  indexes = []
  for prop_name in all_property_names:
    indexes.append(
        _SinglePropertyIndex(prop_name, entity_pb.Index_Property.ASCENDING))
    indexes.append(
        _SinglePropertyIndex(prop_name, entity_pb.Index_Property.DESCENDING))
  return indexes


def _SinglePropertyIndex(prop_name, direction):
  """Creates a single property Index for the given name and direction.

  Args:
    prop_name: The name of the single property on the Index.
    direction: The direction of the Index.

  Returns:
    A single property Index with the given property and direction.
  """
  index = entity_pb.Index()
  prop = index.add_property()
  prop.set_name(prop_name)
  prop.set_direction(direction)
  return index


def _CopyAndSetMultipleToFalse(prop):
  """Copy the provided Property and set its "multiple" attribute to False.

  Args:
    prop: The Property to copy.

  Returns:
    A copy of the given Property with its "multiple" attribute set to False.
  """
  # Whether a property is multi-valued does not affect its index rows, so
  # normalize "multiple" to False before comparing serialized properties.
  prop_copy = entity_pb.Property()
  prop_copy.MergeFrom(prop)
  prop_copy.set_multiple(False)
  return prop_copy