App Engine Python SDK version 1.8.8
[gae.git] / python / google / appengine / datastore / datastore_stub_util.py

#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
21 """Utility functions shared between the file and sqlite datastore stubs.
23 This module is internal and should not be used by client applications.
24 """

try:
  import hashlib
  _MD5_FUNC = hashlib.md5
except ImportError:
  import md5
  _MD5_FUNC = md5.new

import atexit
import collections
import itertools
import logging
import os
import random
import struct
import threading
import time
import weakref
from google.appengine.api import api_base_pb
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import datastore_admin
from google.appengine.api import datastore_errors
from google.appengine.api import datastore_types
from google.appengine.api.taskqueue import taskqueue_service_pb
from google.appengine.datastore import datastore_index
from google.appengine.datastore import datastore_pb
from google.appengine.datastore import datastore_pbs
from google.appengine.datastore import datastore_query
from google.appengine.datastore import datastore_stub_index
from google.appengine.datastore import datastore_v4_pb
from google.appengine.datastore import entity_pb
from google.appengine.runtime import apiproxy_errors


# Maximum number of results to return in a single query result batch.
_MAXIMUM_RESULTS = 300

# Approximate cap on the total size, in bytes, of a query result batch.
_MAXIMUM_QUERY_RESULT_BYTES = 2000000

# Maximum offset that will actually be skipped when running a query.
_MAX_QUERY_OFFSET = 1000

# Names for each of the property value types, keyed by tag number.
_PROPERTY_TYPE_NAMES = {
    0: 'NULL',
    entity_pb.PropertyValue.kint64Value: 'INT64',
    entity_pb.PropertyValue.kbooleanValue: 'BOOLEAN',
    entity_pb.PropertyValue.kstringValue: 'STRING',
    entity_pb.PropertyValue.kdoubleValue: 'DOUBLE',
    entity_pb.PropertyValue.kPointValueGroup: 'POINT',
    entity_pb.PropertyValue.kUserValueGroup: 'USER',
    entity_pb.PropertyValue.kReferenceValueGroup: 'REFERENCE',
}

# An entity receives a scatter property when its 16-bit key hash falls below
# this threshold (32768 of 65536, i.e. half of all entities).
_SCATTER_PROPORTION = 32768

# Maximum number of entity groups a single cross-group transaction may use.
_MAX_EG_PER_TXN = 5

# Property meanings that hold raw (unindexable) byte payloads.
_BLOB_MEANINGS = frozenset((entity_pb.Property.BLOB,
                            entity_pb.Property.ENTITY_PROTO,
                            entity_pb.Property.TEXT))

# Retry schedule: up to _RETRIES retries, starting at _INITIAL_RETRY_DELAY_MS
# and multiplying by _RETRY_DELAY_MULTIPLIER, capped at _MAX_RETRY_DELAY_MS.
_RETRIES = 3
_INITIAL_RETRY_DELAY_MS = 100
_RETRY_DELAY_MULTIPLIER = 2
_MAX_RETRY_DELAY_MS = 120000
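

# Editor-added sketch (not part of the stub): how the retry constants above
# combine into a capped exponential backoff schedule.
def _ExampleRetryDelayMs(attempt):
  """Returns the hypothetical delay before retry number `attempt` (0-based)."""
  delay = _INITIAL_RETRY_DELAY_MS * (_RETRY_DELAY_MULTIPLIER ** attempt)
  return min(delay, _MAX_RETRY_DELAY_MS)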


SEQUENTIAL = 'sequential'
SCATTERED = 'scattered'

# Sequential ids occupy the low 52 bits of the 64-bit id space; scattered ids
# live in the half-sized space directly above it.
_MAX_SEQUENTIAL_BIT = 52
_MAX_SEQUENTIAL_COUNTER = (1 << _MAX_SEQUENTIAL_BIT) - 1
_MAX_SEQUENTIAL_ID = _MAX_SEQUENTIAL_COUNTER
_MAX_SCATTERED_COUNTER = (1 << (_MAX_SEQUENTIAL_BIT - 1)) - 1
_MAX_SCATTERED_ID = _MAX_SEQUENTIAL_ID + 1 + _MAX_SCATTERED_COUNTER
_SCATTER_SHIFT = 64 - _MAX_SEQUENTIAL_BIT + 1
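

# Editor-added sketch (not part of the stub): the relationships the id-space
# constants above encode, spelled out with concrete numbers.
def _ExampleIdSpaceLayout():
  assert _MAX_SEQUENTIAL_ID == (1 << 52) - 1
  assert _MAX_SCATTERED_ID == _MAX_SEQUENTIAL_ID + 1 + ((1 << 51) - 1)
  # A scattered counter is spread over the 64-bit id space by shifting it
  # _SCATTER_SHIFT (= 13) bit positions.
  assert _SCATTER_SHIFT == 13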


def _GetScatterProperty(entity_proto):
  """Gets the scatter property for an object.

  For ease of implementation, this is not synchronized with the actual
  value on the App Engine server, but should work equally well.

  Note: This property may change, either here or in production. No client
  other than the mapper framework should rely on it directly.

  Returns:
    The PropertyValue of the scatter property or None if this entity should not
    have a scatter property.
  """
  hash_obj = _MD5_FUNC()
  for element in entity_proto.key().path().element_list():
    if element.has_name():
      hash_obj.update(element.name())
    elif element.has_id():
      hash_obj.update(str(element.id()))
  hash_bytes = hash_obj.digest()[0:2]
  (hash_int,) = struct.unpack('H', hash_bytes)

  if hash_int >= _SCATTER_PROPORTION:
    return None

  scatter_property = entity_pb.Property()
  scatter_property.set_name(datastore_types.SCATTER_SPECIAL_PROPERTY)
  scatter_property.set_meaning(entity_pb.Property.BYTESTRING)
  scatter_property.set_multiple(False)
  property_value = scatter_property.mutable_value()
  property_value.set_stringvalue(hash_bytes)
  return scatter_property
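

# Editor-added sketch (not part of the stub): the scatter decision above,
# reduced to its essentials for a single-element key path whose name or
# stringified id is `path_element_string`.
def _ExampleScatterDecision(path_element_string):
  digest = _MD5_FUNC(path_element_string).digest()[0:2]
  (hash_int,) = struct.unpack('H', digest)
  # Roughly half of all entities hash below the threshold and receive the
  # scatter property.
  return hash_int < _SCATTER_PROPORTION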


# Maps special property names to a (is_visible, is_stored, property_function)
# tuple; the function computes the property's value for a given entity.
_SPECIAL_PROPERTY_MAP = {
    datastore_types.SCATTER_SPECIAL_PROPERTY:
        (False, True, _GetScatterProperty),
}


def GetInvisibleSpecialPropertyNames():
  """Gets the names of all non user-visible special properties."""
  invisible_names = []
  for name, value in _SPECIAL_PROPERTY_MAP.items():
    is_visible, _, _ = value
    if not is_visible:
      invisible_names.append(name)
  return invisible_names


def _PrepareSpecialProperties(entity_proto, is_load):
  """Computes special properties for loading or storing.

  Strips other special properties.
  """
  for i in xrange(entity_proto.property_size() - 1, -1, -1):
    if entity_proto.property(i).name() in _SPECIAL_PROPERTY_MAP:
      del entity_proto.property_list()[i]

  for is_visible, is_stored, property_func in _SPECIAL_PROPERTY_MAP.values():
    if is_load:
      should_process = is_visible
    else:
      should_process = is_stored

    if should_process:
      special_property = property_func(entity_proto)
      if special_property:
        entity_proto.property_list().append(special_property)


def _GetGroupByKey(entity, property_names):
  """Computes a key value that uniquely identifies the 'group' of an entity.

  Args:
    entity: The entity_pb.EntityProto for which to create the group key.
    property_names: The names of the properties in the group by clause.

  Returns:
    A hashable value that uniquely identifies the entity's 'group'.
  """
  return frozenset((prop.name(), prop.value().SerializeToString())
                   for prop in entity.property_list()
                   if prop.name() in property_names)
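

# Editor-added sketch (not part of the stub): two entities group together
# exactly when they agree on every grouped property; the frozenset makes the
# key order-insensitive and hashable, which is what the cursors below rely on
# to deduplicate results.
def _ExampleGroupByKey():
  entity = entity_pb.EntityProto()
  prop = entity.add_property()
  prop.set_name('color')
  prop.mutable_value().set_stringvalue('red')
  return _GetGroupByKey(entity, set(['color']))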


def PrepareSpecialPropertiesForStore(entity_proto):
  """Computes special properties for storing.

  Strips other special properties.
  """
  _PrepareSpecialProperties(entity_proto, False)


def LoadEntity(entity, keys_only=False, property_names=None):
  """Prepares an entity to be returned to the user.

  Args:
    entity: an entity_pb.EntityProto or None
    keys_only: if a keys only result should be produced
    property_names: if not None or empty, causes a projected entity
        to be produced with the given properties.

  Returns:
    A user friendly copy of entity or None.
  """
  if entity:
    clone = entity_pb.EntityProto()
    if property_names:
      clone.mutable_key().CopyFrom(entity.key())
      clone.mutable_entity_group()
      seen = set()
      for prop in entity.property_list():
        if prop.name() in property_names:
          Check(prop.name() not in seen,
                "datastore dev stub produced bad result",
                datastore_pb.Error.INTERNAL_ERROR)
          seen.add(prop.name())
          new_prop = clone.add_property()
          new_prop.set_name(prop.name())
          new_prop.set_meaning(entity_pb.Property.INDEX_VALUE)
          new_prop.mutable_value().CopyFrom(prop.value())
          new_prop.set_multiple(False)
    elif keys_only:
      clone.mutable_key().CopyFrom(entity.key())
      clone.mutable_entity_group()
    else:
      clone.CopyFrom(entity)
    PrepareSpecialPropertiesForLoad(clone)
    return clone


def StoreEntity(entity):
  """Prepares an entity for storing.

  Args:
    entity: an entity_pb.EntityProto to prepare

  Returns:
    A copy of entity that should be stored in its place.
  """
  clone = entity_pb.EntityProto()
  clone.CopyFrom(entity)
  PrepareSpecialPropertiesForStore(clone)
  return clone


def PrepareSpecialPropertiesForLoad(entity_proto):
  """Computes special properties that are user-visible.

  Strips other special properties.
  """
  _PrepareSpecialProperties(entity_proto, True)


def Check(test, msg='', error_code=datastore_pb.Error.BAD_REQUEST):
  """Raises an apiproxy_errors.ApplicationError if the condition is false.

  Args:
    test: A condition to test.
    msg: A string to return with the error.
    error_code: One of datastore_pb.Error to use as an error code.

  Raises:
    apiproxy_errors.ApplicationError: If test is false.
  """
  if not test:
    raise apiproxy_errors.ApplicationError(error_code, msg)


def CheckValidUTF8(string, desc):
  """Check that the given string is valid UTF-8.

  Args:
    string: the string to validate.
    desc: a description of the string being validated.

  Raises:
    apiproxy_errors.ApplicationError: if the string is not valid UTF-8.
  """
  try:
    string.decode('utf-8')
  except UnicodeDecodeError:
    Check(False, '%s is not valid UTF-8.' % desc)


def CheckAppId(request_trusted, request_app_id, app_id):
  """Check that this is the stub for app_id.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    app_id: An application ID.

  Raises:
    apiproxy_errors.ApplicationError: if this is not the stub for app_id.
  """
  assert app_id
  CheckValidUTF8(app_id, 'app id')
  Check(request_trusted or app_id == request_app_id,
        'app "%s" cannot access app "%s"\'s data' % (request_app_id, app_id))


def CheckReference(request_trusted,
                   request_app_id,
                   key,
                   require_id_or_name=True):
  """Check this key.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    key: entity_pb.Reference
    require_id_or_name: Boolean indicating if we should enforce the presence of
      an id or name in the last element of the key's path.

  Raises:
    apiproxy_errors.ApplicationError: if the key is invalid
  """
  assert isinstance(key, entity_pb.Reference)

  CheckAppId(request_trusted, request_app_id, key.app())

  Check(key.path().element_size() > 0, "key's path cannot be empty")

  if require_id_or_name:
    last_element = key.path().element_list()[-1]
    has_id_or_name = ((last_element.has_id() and last_element.id() != 0) or
                      (last_element.has_name() and last_element.name() != ''))
    if not has_id_or_name:
      raise datastore_errors.BadRequestError('missing key id/name')

  for elem in key.path().element_list():
    Check(not elem.has_id() or not elem.has_name(),
          'each key path element should have id or name but not both: %r' %
          key)
    CheckValidUTF8(elem.type(), 'key path element type')
    if elem.has_name():
      CheckValidUTF8(elem.name(), 'key path element name')


def CheckEntity(request_trusted, request_app_id, entity):
  """Check if this entity can be stored.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    entity: entity_pb.EntityProto

  Raises:
    apiproxy_errors.ApplicationError: if the entity is invalid
  """
  CheckReference(request_trusted, request_app_id, entity.key(), False)
  for prop in entity.property_list():
    CheckProperty(request_trusted, request_app_id, prop)
  for prop in entity.raw_property_list():
    CheckProperty(request_trusted, request_app_id, prop, indexed=False)


def CheckProperty(request_trusted, request_app_id, prop, indexed=True):
  """Check if this property can be stored.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    prop: entity_pb.Property
    indexed: Whether the property is indexed.

  Raises:
    apiproxy_errors.ApplicationError: if the property is invalid
  """
  name = prop.name()
  value = prop.value()
  meaning = prop.meaning()
  CheckValidUTF8(name, 'property name')
  Check(request_trusted or
        not datastore_types.RESERVED_PROPERTY_NAME.match(name),
        'cannot store entity with reserved property name \'%s\'' % name)
  Check(prop.meaning() != entity_pb.Property.INDEX_VALUE,
        'Entities with incomplete properties cannot be written.')
  is_blob = meaning in _BLOB_MEANINGS
  if indexed:
    Check(not is_blob,
          'BLOB, ENTITY_PROTO or TEXT property ' + name +
          ' must be in a raw_property field')
    max_length = datastore_types._MAX_STRING_LENGTH
  else:
    if is_blob:
      Check(value.has_stringvalue(),
            'BLOB / ENTITY_PROTO / TEXT raw property ' + name +
            ' must have a string value')
    max_length = datastore_types._MAX_RAW_PROPERTY_BYTES
  if meaning == entity_pb.Property.ATOM_LINK:
    max_length = datastore_types._MAX_LINK_PROPERTY_LENGTH

  CheckPropertyValue(name, value, max_length, meaning)


def CheckPropertyValue(name, value, max_length, meaning):
  """Check if this property value can be stored.

  Args:
    name: name of the property
    value: entity_pb.PropertyValue
    max_length: maximum length for string values
    meaning: meaning of the property

  Raises:
    apiproxy_errors.ApplicationError: if the property is invalid
  """
  num_values = (value.has_int64value() +
                value.has_stringvalue() +
                value.has_booleanvalue() +
                value.has_doublevalue() +
                value.has_pointvalue() +
                value.has_uservalue() +
                value.has_referencevalue())
  Check(num_values <= 1, 'PropertyValue for ' + name +
        ' has multiple value fields set')

  if value.has_stringvalue():
    # Length limits are enforced in UTF-16 code units: re-encode the value
    # and drop the 2-byte BOM before comparing against max_length.
    s16 = value.stringvalue().decode('utf-8', 'replace').encode('utf-16')
    Check((len(s16) - 2) / 2 <= max_length,
          'Property %s is too long. Maximum length is %d.' % (name, max_length))
    if (meaning not in _BLOB_MEANINGS and
        meaning != entity_pb.Property.BYTESTRING):
      CheckValidUTF8(value.stringvalue(), 'String property value')
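

# Editor-added sketch (not part of the stub): why the length check above
# divides by two. A single non-BMP character costs two UTF-16 code units.
def _ExampleUtf16CodeUnits():
  emoji_utf8 = u'\U0001F600'.encode('utf-8')
  s16 = emoji_utf8.decode('utf-8', 'replace').encode('utf-16')
  return (len(s16) - 2) / 2  # == 2 code units for this one character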


def CheckTransaction(request_trusted, request_app_id, transaction):
  """Check that this transaction is valid.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    transaction: datastore_pb.Transaction

  Raises:
    apiproxy_errors.ApplicationError: if the transaction is not valid.
  """
  assert isinstance(transaction, datastore_pb.Transaction)
  CheckAppId(request_trusted, request_app_id, transaction.app())


def CheckQuery(query, filters, orders, max_query_components):
  """Check a datastore query with normalized filters and orders.

  Raises an ApplicationError when any of the following conditions are violated:
  - transactional queries have an ancestor
  - queries are not too large
    (sum of filters, orders, ancestor <= max_query_components)
  - ancestor (if any) app and namespace match query app and namespace
  - kindless queries only filter on __key__ and only sort on __key__ ascending
  - multiple inequality (<, <=, >, >=) filters all applied to the same property
  - filters on __key__ compare to a reference in the same app and namespace as
    the query
  - if an inequality filter on prop X is used, the first order (if any) must
    be on X

  Args:
    query: query to validate
    filters: normalized (by datastore_index.Normalize) filters from query
    orders: normalized (by datastore_index.Normalize) orders from query
    max_query_components: limit on query complexity
  """
  Check(query.property_name_size() == 0 or not query.keys_only(),
        'projection and keys_only cannot both be set')

  projected_properties = set(query.property_name_list())
  for prop_name in query.property_name_list():
    Check(not datastore_types.RESERVED_PROPERTY_NAME.match(prop_name),
          'projections are not supported for the property: ' + prop_name)
  Check(len(projected_properties) == len(query.property_name_list()),
        'cannot project a property multiple times')

  key_prop_name = datastore_types.KEY_SPECIAL_PROPERTY
  unapplied_log_timestamp_us_name = (
      datastore_types._UNAPPLIED_LOG_TIMESTAMP_SPECIAL_PROPERTY)

  if query.has_transaction():
    Check(query.has_ancestor(),
          'Only ancestor queries are allowed inside transactions.')

  num_components = len(filters) + len(orders)
  if query.has_ancestor():
    num_components += 1
  Check(num_components <= max_query_components,
        'query is too large. may not have more than %s filters'
        ' + sort orders + ancestor total' % max_query_components)

  if query.has_ancestor():
    ancestor = query.ancestor()
    Check(query.app() == ancestor.app(),
          'query app is %s but ancestor app is %s' %
          (query.app(), ancestor.app()))
    Check(query.name_space() == ancestor.name_space(),
          'query namespace is %s but ancestor namespace is %s' %
          (query.name_space(), ancestor.name_space()))

  if query.group_by_property_name_size():
    group_by_set = set(query.group_by_property_name_list())
    for order in orders:
      if not group_by_set:
        break
      Check(order.property() in group_by_set,
            'items in the group by clause must be specified first '
            'in the ordering')
      group_by_set.remove(order.property())

  ineq_prop_name = None
  for filter in filters:
    Check(filter.property_size() == 1,
          'Filter has %d properties, expected 1' % filter.property_size())

    prop = filter.property(0)
    prop_name = prop.name().decode('utf-8')

    if prop_name == key_prop_name:
      Check(prop.value().has_referencevalue(),
            '%s filter value must be a Key' % key_prop_name)
      ref_val = prop.value().referencevalue()
      Check(ref_val.app() == query.app(),
            '%s filter app is %s but query app is %s' %
            (key_prop_name, ref_val.app(), query.app()))
      Check(ref_val.name_space() == query.name_space(),
            '%s filter namespace is %s but query namespace is %s' %
            (key_prop_name, ref_val.name_space(), query.name_space()))

    if filter.op() in datastore_index.EQUALITY_OPERATORS:
      Check(prop_name not in projected_properties,
            'cannot use projection on a property with an equality filter')
    if (filter.op() in datastore_index.INEQUALITY_OPERATORS and
        prop_name != unapplied_log_timestamp_us_name):
      if ineq_prop_name is None:
        ineq_prop_name = prop_name
      else:
        Check(ineq_prop_name == prop_name,
              'Only one inequality filter per query is supported. '
              'Encountered both %s and %s' % (ineq_prop_name, prop_name))

  if (ineq_prop_name is not None
      and query.group_by_property_name_size() > 0
      and not orders):
    Check(ineq_prop_name in group_by_set,
          'Inequality filter on %s must also be a group by '
          'property when group by properties are set.'
          % (ineq_prop_name))

  if ineq_prop_name is not None and orders:
    first_order_prop = orders[0].property().decode('utf-8')
    Check(first_order_prop == ineq_prop_name,
          'The first sort property must be the same as the property '
          'to which the inequality filter is applied. In your query '
          'the first sort property is %s but the inequality filter '
          'is on %s' % (first_order_prop, ineq_prop_name))

  if not query.has_kind():
    for filter in filters:
      prop_name = filter.property(0).name().decode('utf-8')
      Check(prop_name == key_prop_name or
            prop_name == unapplied_log_timestamp_us_name,
            'kind is required for non-__key__ filters')
    for order in orders:
      prop_name = order.property().decode('utf-8')
      Check(prop_name == key_prop_name and
            order.direction() == datastore_pb.Query_Order.ASCENDING,
            'kind is required for all orders except __key__ ascending')


class ValueRange(object):
  """A range of values defined by its two extremes (inclusive or exclusive)."""

  def __init__(self):
    """Constructor.

    Creates an unlimited range.
    """
    self.__start = self.__end = None
    self.__start_inclusive = self.__end_inclusive = False

  def Update(self, rel_op, limit):
    """Filter the range by 'rel_op limit'.

    Args:
      rel_op: relational operator from datastore_pb.Query_Filter.
      limit: the value to limit the range by.
    """
    if rel_op == datastore_pb.Query_Filter.LESS_THAN:
      if self.__end is None or limit <= self.__end:
        self.__end = limit
        self.__end_inclusive = False
    elif (rel_op == datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL or
          rel_op == datastore_pb.Query_Filter.EQUAL):
      if self.__end is None or limit < self.__end:
        self.__end = limit
        self.__end_inclusive = True

    if rel_op == datastore_pb.Query_Filter.GREATER_THAN:
      if self.__start is None or limit >= self.__start:
        self.__start = limit
        self.__start_inclusive = False
    elif (rel_op == datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL or
          rel_op == datastore_pb.Query_Filter.EQUAL):
      if self.__start is None or limit > self.__start:
        self.__start = limit
        self.__start_inclusive = True

  def Contains(self, value):
    """Check if the range contains a specific value.

    Args:
      value: the value to check.

    Returns:
      True iff value is contained in this range.
    """
    if self.__start is not None:
      if self.__start_inclusive and value < self.__start: return False
      if not self.__start_inclusive and value <= self.__start: return False
    if self.__end is not None:
      if self.__end_inclusive and value > self.__end: return False
      if not self.__end_inclusive and value >= self.__end: return False
    return True

  def Remap(self, mapper):
    """Transforms the range extremes with a function.

    The function mapper must preserve order, i.e.
      x rel_op y iff mapper(x) rel_op mapper(y)

    Args:
      mapper: function to apply to the range extremes.
    """
    self.__start = self.__start and mapper(self.__start)
    self.__end = self.__end and mapper(self.__end)

  def MapExtremes(self, mapper):
    """Evaluate a function on the range extremes.

    Args:
      mapper: function to apply to the range extremes.

    Returns:
      (x, y) where x = None if the range has no start,
                       mapper(start, start_inclusive, False) otherwise
                   y = None if the range has no end,
                       mapper(end, end_inclusive, True) otherwise
    """
    return (
        self.__start and mapper(self.__start, self.__start_inclusive, False),
        self.__end and mapper(self.__end, self.__end_inclusive, True))
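

# Editor-added sketch (not part of the stub): combining two filters narrows
# the range, and the inclusive/exclusive bookkeeping matches the operators.
def _ExampleValueRange():
  value_range = ValueRange()
  value_range.Update(datastore_pb.Query_Filter.GREATER_THAN, 3)
  value_range.Update(datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL, 7)
  assert not value_range.Contains(3)  # start is exclusive
  assert value_range.Contains(7)      # end is inclusive
  assert not value_range.Contains(8)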


def ParseKeyFilteredQuery(filters, orders):
  """Parse queries which only allow filters and ascending-orders on __key__.

  Raises exceptions for illegal queries.

  Args:
    filters: the normalized filters of a query.
    orders: the normalized orders of a query.

  Returns:
    The key range (a ValueRange over datastore_types.Key) requested in the
    query.
  """
  remaining_filters = []
  key_range = ValueRange()
  key_prop = datastore_types.KEY_SPECIAL_PROPERTY
  for f in filters:
    op = f.op()
    if not (f.property_size() == 1 and
            f.property(0).name() == key_prop and
            not (op == datastore_pb.Query_Filter.IN or
                 op == datastore_pb.Query_Filter.EXISTS)):
      remaining_filters.append(f)
      continue

    val = f.property(0).value()
    Check(val.has_referencevalue(), '__key__ kind must be compared to a key')
    limit = datastore_types.FromReferenceProperty(val)
    key_range.Update(op, limit)

  remaining_orders = []
  for o in orders:
    if not (o.direction() == datastore_pb.Query_Order.ASCENDING and
            o.property() == datastore_types.KEY_SPECIAL_PROPERTY):
      remaining_orders.append(o)
    else:
      break

  Check(not remaining_filters,
        'Only comparison filters on ' + key_prop + ' supported')
  Check(not remaining_orders,
        'Only ascending order on ' + key_prop + ' supported')

  return key_range


def ParseKindQuery(query, filters, orders):
  """Parse __kind__ (schema) queries.

  Raises exceptions for illegal queries.

  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.

  Returns:
    The kind range (a ValueRange over string) requested in the query.
  """
  Check(not query.has_ancestor(), 'ancestor queries on __kind__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(_KindKeyToString)

  return key_range


def _KindKeyToString(key):
  """Extract kind name from __kind__ key.

  Raises an ApplicationError if the key is not of the form '__kind__'/name.

  Args:
    key: a key for a __kind__ instance.

  Returns:
    kind specified by key.
  """
  key_path = key.to_path()
  if (len(key_path) == 2 and key_path[0] == '__kind__' and
      isinstance(key_path[1], basestring)):
    return key_path[1]
  Check(False, 'invalid Key for __kind__ table')


def ParseNamespaceQuery(query, filters, orders):
  """Parse __namespace__ queries.

  Raises exceptions for illegal queries.

  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.

  Returns:
    The namespace range (a ValueRange over string) requested in the query.
  """
  Check(not query.has_ancestor(),
        'ancestor queries on __namespace__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(_NamespaceKeyToString)

  return key_range


def _NamespaceKeyToString(key):
  """Extract namespace name from __namespace__ key.

  Raises an ApplicationError if the key is not of the form '__namespace__'/name
  or '__namespace__'/_EMPTY_NAMESPACE_ID.

  Args:
    key: a key for a __namespace__ instance.

  Returns:
    namespace specified by key.
  """
  key_path = key.to_path()
  if len(key_path) == 2 and key_path[0] == '__namespace__':
    if key_path[1] == datastore_types._EMPTY_NAMESPACE_ID:
      return ''
    if isinstance(key_path[1], basestring):
      return key_path[1]
  Check(False, 'invalid Key for __namespace__ table')


def ParsePropertyQuery(query, filters, orders):
  """Parse __property__ queries.

  Raises exceptions for illegal queries.

  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.

  Returns:
    The (kind, property) range (a ValueRange over (kind, property) pairs)
    requested in the query.
  """
  Check(not query.has_transaction(),
        'transactional queries on __property__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(lambda x: _PropertyKeyToString(x, ''))

  if query.has_ancestor():
    ancestor = datastore_types.Key._FromPb(query.ancestor())
    ancestor_kind, ancestor_property = _PropertyKeyToString(ancestor, None)

    if ancestor_property is not None:
      key_range.Update(datastore_pb.Query_Filter.EQUAL,
                       (ancestor_kind, ancestor_property))
    else:
      key_range.Update(datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL,
                       (ancestor_kind, ''))
      key_range.Update(datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL,
                       (ancestor_kind + '\0', ''))
    query.clear_ancestor()

  return key_range


def _PropertyKeyToString(key, default_property):
  """Extract property name from __property__ key.

  Raises an ApplicationError if the key is not of the form
  '__kind__'/kind, '__property__'/property or '__kind__'/kind.

  Args:
    key: a key for a __property__ instance.
    default_property: property value to return when key only has a kind.

  Returns:
    kind, property if key = '__kind__'/kind, '__property__'/property
    kind, default_property if key = '__kind__'/kind
  """
  key_path = key.to_path()
  if (len(key_path) == 2 and
      key_path[0] == '__kind__' and isinstance(key_path[1], basestring)):
    return (key_path[1], default_property)
  if (len(key_path) == 4 and
      key_path[0] == '__kind__' and isinstance(key_path[1], basestring) and
      key_path[2] == '__property__' and isinstance(key_path[3], basestring)):
    return (key_path[1], key_path[3])

  Check(False, 'invalid Key for __property__ table')


def SynthesizeUserId(email):
  """Return a synthetic user ID from an email address.

  Note that this is not the same user ID found in the production system.

  Args:
    email: An email address.

  Returns:
    A string userid derived from the email address.
  """
  user_id_digest = _MD5_FUNC(email.lower()).digest()
  user_id = '1' + ''.join(['%02d' % ord(x) for x in user_id_digest])[:20]
  return user_id
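

# Editor-added sketch (not part of the stub): the synthetic id is
# deterministic, case-insensitive, and always 21 characters long
# ('1' followed by 20 digits).
def _ExampleSynthesizeUserId():
  uid = SynthesizeUserId('Alice@Example.com')
  assert uid == SynthesizeUserId('alice@example.com')
  assert len(uid) == 21 and uid.startswith('1')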


def FillUsersInQuery(filters):
  """Fill in a synthetic user ID for all user properties in a set of filters.

  Args:
    filters: The normalized filters from query.
  """
  for filter in filters:
    for property in filter.property_list():
      FillUser(property)


def FillUser(property):
  """Fill in a synthetic user ID for a user property.

  Args:
    property: A Property which may have a user value.
  """
  if property.value().has_uservalue():
    uid = SynthesizeUserId(property.value().uservalue().email())
    if uid:
      property.mutable_value().mutable_uservalue().set_obfuscated_gaiaid(uid)


class BaseCursor(object):
  """A base query cursor over a list of entities.

  Public properties:
    cursor: the integer cursor.
    app: the app for which this cursor was created.
    keys_only: whether the query is keys_only.

  Class attributes:
    _next_cursor: the next cursor to allocate.
    _next_cursor_lock: protects _next_cursor.
  """
  _next_cursor = 1
  _next_cursor_lock = threading.Lock()

  def __init__(self, query, dsquery, orders, index_list):
    """Constructor.

    Args:
      query: the query request proto.
      dsquery: a datastore_query.Query over query.
      orders: the orders of query as returned by _GuessOrders.
      index_list: the list of indexes used by the query.
    """
    self.keys_only = query.keys_only()
    self.property_names = set(query.property_name_list())
    self.group_by = set(query.group_by_property_name_list())
    self.app = query.app()
    self.cursor = self._AcquireCursorID()

    self.__order_compare_entities = dsquery._order.cmp_for_filter(
        dsquery._filter_predicate)
    if self.group_by:
      self.__cursor_properties = self.group_by
    else:
      self.__cursor_properties = set(order.property() for order in orders)
      self.__cursor_properties.add('__key__')
    self.__cursor_properties = frozenset(self.__cursor_properties)
    self.__index_list = index_list

  def _PopulateResultMetadata(self, query_result, compile,
                              first_result, last_result):
    query_result.set_keys_only(self.keys_only)
    if query_result.more_results():
      cursor = query_result.mutable_cursor()
      cursor.set_app(self.app)
      cursor.set_cursor(self.cursor)
    if compile:
      self._EncodeCompiledCursor(last_result,
                                 query_result.mutable_compiled_cursor())
    if first_result:
      query_result.index_list().extend(self.__index_list)

  @classmethod
  def _AcquireCursorID(cls):
    """Acquires the next cursor id in a thread safe manner."""
    cls._next_cursor_lock.acquire()
    try:
      cursor_id = cls._next_cursor
      cls._next_cursor += 1
    finally:
      cls._next_cursor_lock.release()
    return cursor_id

  def _IsBeforeCursor(self, entity, cursor):
    """True if entity is before cursor according to the current order.

    Args:
      entity: an entity_pb.EntityProto entity.
      cursor: a compiled cursor as returned by _DecodeCompiledCursor.
    """
    comparison_entity = entity_pb.EntityProto()
    for prop in entity.property_list():
      if prop.name() in self.__cursor_properties:
        comparison_entity.add_property().MergeFrom(prop)
    if cursor[0].has_key():
      comparison_entity.mutable_key().MergeFrom(entity.key())
    x = self.__order_compare_entities(comparison_entity, cursor[0])
    if cursor[1]:
      return x < 0
    else:
      return x <= 0

  def _DecodeCompiledCursor(self, compiled_cursor):
    """Converts a compiled_cursor into a cursor_entity.

    Args:
      compiled_cursor: The datastore_pb.CompiledCursor to decode.

    Returns:
      (cursor_entity, inclusive): an entity_pb.EntityProto and if it should
      be included in the result set.
    """
    assert len(compiled_cursor.position_list()) == 1

    position = compiled_cursor.position(0)

    remaining_properties = set(self.__cursor_properties)

    cursor_entity = entity_pb.EntityProto()
    if position.has_key():
      cursor_entity.mutable_key().CopyFrom(position.key())
      try:
        remaining_properties.remove('__key__')
      except KeyError:
        Check(False, 'Cursor does not match query: extra value __key__')
    for indexvalue in position.indexvalue_list():
      property = cursor_entity.add_property()
      property.set_name(indexvalue.property())
      property.mutable_value().CopyFrom(indexvalue.value())
      try:
        remaining_properties.remove(indexvalue.property())
      except KeyError:
        Check(False, 'Cursor does not match query: extra value %s' %
              indexvalue.property())
    Check(not remaining_properties,
          'Cursor does not match query: missing values for %r' %
          remaining_properties)

    return (cursor_entity, position.start_inclusive())

  def _EncodeCompiledCursor(self, last_result, compiled_cursor):
    """Converts the current state of the cursor into a compiled_cursor.

    Args:
      last_result: the last result returned by this query.
      compiled_cursor: an empty datastore_pb.CompiledCursor.
    """
    if last_result is not None:
      position = compiled_cursor.add_position()

      if '__key__' in self.__cursor_properties:
        position.mutable_key().MergeFrom(last_result.key())
      for prop in last_result.property_list():
        if prop.name() in self.__cursor_properties:
          indexvalue = position.add_indexvalue()
          indexvalue.set_property(prop.name())
          indexvalue.mutable_value().CopyFrom(prop.value())
      position.set_start_inclusive(False)


class IteratorCursor(BaseCursor):
  """A query cursor over an entity iterator."""

  def __init__(self, query, dsquery, orders, index_list, results):
    """Constructor.

    Args:
      query: the query request proto
      dsquery: a datastore_query.Query over query.
      orders: the orders of query as returned by _GuessOrders.
      index_list: A list of indexes used by the query.
      results: iterator over entity_pb.EntityProto
    """
    super(IteratorCursor, self).__init__(query, dsquery, orders, index_list)

    self.__last_result = None
    self.__next_result = None
    self.__results = results
    self.__distincts = set()
    self.__done = False

    if query.has_end_compiled_cursor():
      if query.end_compiled_cursor().position_list():
        self.__end_cursor = self._DecodeCompiledCursor(
            query.end_compiled_cursor())
      else:
        self.__done = True
    else:
      self.__end_cursor = None

    if query.has_compiled_cursor() and query.compiled_cursor().position_list():
      start_cursor = self._DecodeCompiledCursor(query.compiled_cursor())
      self.__last_result = start_cursor[0]
      try:
        self._Advance()
        while self._IsBeforeCursor(self.__next_result, start_cursor):
          self._Advance()
      except StopIteration:
        pass

    self.__offset = 0
    self.__limit = None
    if query.has_limit():
      limit = query.limit()
      if query.offset():
        limit += query.offset()
      if limit >= 0:
        self.__limit = limit

  def _Done(self):
    self.__done = True
    self.__next_result = None
    raise StopIteration

  def _Advance(self):
    """Advance to next result (handles end cursor, ignores limit)."""
    if self.__done:
      raise StopIteration
    try:
      while True:
        self.__next_result = self.__results.next()
        if not self.group_by:
          break
        next_group = _GetGroupByKey(self.__next_result, self.group_by)
        if next_group not in self.__distincts:
          self.__distincts.add(next_group)
          break
    except StopIteration:
      self._Done()
    if (self.__end_cursor and
        not self._IsBeforeCursor(self.__next_result, self.__end_cursor)):
      self._Done()

  def _GetNext(self):
    """Ensures next result is fetched."""
    if self.__limit is not None and self.__offset >= self.__limit:
      self._Done()
    if self.__next_result is None:
      self._Advance()

  def _Next(self):
    """Returns and consumes next result."""
    self._GetNext()
    self.__last_result = self.__next_result
    self.__next_result = None
    self.__offset += 1
    return self.__last_result

  def PopulateQueryResult(self, result, count, offset,
                          compile=False, first_result=False):
    """Populates a QueryResult with this cursor and the given number of results.

    Args:
      result: datastore_pb.QueryResult
      count: integer of how many results to return
      offset: integer of how many results to skip
      compile: boolean, whether we are compiling this query
      first_result: whether the query result is the first for this query
    """
    Check(offset >= 0, 'Offset must be >= 0')
    skipped = 0
    try:
      limited_offset = min(offset, _MAX_QUERY_OFFSET)
      while skipped < limited_offset:
        self._Next()
        skipped += 1

      if skipped == offset:
        if count > _MAXIMUM_RESULTS:
          count = _MAXIMUM_RESULTS
        while count > 0:
          result.result_list().append(LoadEntity(self._Next(), self.keys_only,
                                                 self.property_names))
          count -= 1

        self._GetNext()
    except StopIteration:
      pass

    result.set_more_results(not self.__done)
    result.set_skipped_results(skipped)
    self._PopulateResultMetadata(result, compile,
                                 first_result, self.__last_result)


class ListCursor(BaseCursor):
  """A query cursor over a list of entities.

  Public properties:
    keys_only: whether the query is keys_only
  """

  def __init__(self, query, dsquery, orders, index_list, results):
    """Constructor.

    Args:
      query: the query request proto
      dsquery: a datastore_query.Query over query.
      orders: the orders of query as returned by _GuessOrders.
      index_list: the list of indexes used by the query.
      results: list of entity_pb.EntityProto
    """
    super(ListCursor, self).__init__(query, dsquery, orders, index_list)

    if self.group_by:
      distincts = set()
      new_results = []
      for result in results:
        key_value = _GetGroupByKey(result, self.group_by)
        if key_value not in distincts:
          distincts.add(key_value)
          new_results.append(result)
      results = new_results

    if query.has_compiled_cursor() and query.compiled_cursor().position_list():
      start_cursor = self._DecodeCompiledCursor(query.compiled_cursor())
      self.__last_result = start_cursor[0]
      start_cursor_position = self._GetCursorOffset(results, start_cursor)
    else:
      self.__last_result = None
      start_cursor_position = 0

    if query.has_end_compiled_cursor():
      if query.end_compiled_cursor().position_list():
        end_cursor = self._DecodeCompiledCursor(query.end_compiled_cursor())
        end_cursor_position = self._GetCursorOffset(results, end_cursor)
      else:
        end_cursor_position = 0
    else:
      end_cursor_position = len(results)

    results = results[start_cursor_position:end_cursor_position]

    if query.has_limit():
      limit = query.limit()
      if query.offset():
        limit += query.offset()
      if limit >= 0 and limit < len(results):
        results = results[:limit]

    self.__results = results
    self.__offset = 0
    self.__count = len(self.__results)

  def _GetCursorOffset(self, results, cursor):
    """Converts a cursor into an offset into the result set even if the
    cursor's entity no longer exists.

    Args:
      results: the query's results (sequence of entity_pb.EntityProto)
      cursor: a compiled cursor as returned by _DecodeCompiledCursor

    Returns:
      the integer offset
    """
    lo = 0
    hi = len(results)
    while lo < hi:
      mid = (lo + hi) // 2
      if self._IsBeforeCursor(results[mid], cursor):
        lo = mid + 1
      else:
        hi = mid
    return lo

  def PopulateQueryResult(self, result, count, offset,
                          compile=False, first_result=False):
    """Populates a QueryResult with this cursor and the given number of results.

    Args:
      result: datastore_pb.QueryResult
      count: integer of how many results to return
      offset: integer of how many results to skip
      compile: boolean, whether we are compiling this query
      first_result: whether the query result is the first for this query
    """
    Check(offset >= 0, 'Offset must be >= 0')

    offset = min(offset, self.__count - self.__offset)
    limited_offset = min(offset, _MAX_QUERY_OFFSET)
    if limited_offset:
      self.__offset += limited_offset
      result.set_skipped_results(limited_offset)

    if offset == limited_offset and count:
      if count > _MAXIMUM_RESULTS:
        count = _MAXIMUM_RESULTS
      results = self.__results[self.__offset:self.__offset + count]
      count = len(results)
      self.__offset += count

      result.result_list().extend(
          LoadEntity(entity, self.keys_only, self.property_names)
          for entity in results)

    if self.__offset:
      self.__last_result = self.__results[self.__offset - 1]

    result.set_more_results(self.__offset < self.__count)
    self._PopulateResultMetadata(result, compile,
                                 first_result, self.__last_result)


def _SynchronizeTxn(function):
  """A decorator that locks a transaction during the function call."""

  def sync(txn, *args, **kwargs):
    txn._lock.acquire()
    try:
      Check(txn._state is LiveTxn.ACTIVE, 'transaction closed')
      return function(txn, *args, **kwargs)
    finally:
      txn._lock.release()
  return sync


def _GetEntityGroup(ref):
  """Returns the entity group key for the given reference."""
  entity_group = entity_pb.Reference()
  entity_group.CopyFrom(ref)
  assert (entity_group.path().element_list()[0].has_id() or
          entity_group.path().element_list()[0].has_name())
  del entity_group.path().element_list()[1:]
  return entity_group
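

# Editor-added sketch (not part of the stub): an entity group is identified
# by the first element of a key's path. 'myapp' is a placeholder app id.
def _ExampleEntityGroup():
  key = datastore_types.Key.from_path('Parent', 1, 'Child', 2, _app='myapp')
  group = _GetEntityGroup(key._ToPb())
  assert group.path().element_size() == 1  # only ('Parent', 1) remains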


def _GetKeyKind(key):
  """Return the kind of the given key."""
  return key.path().element_list()[-1].type()


def _FilterIndexesByKind(key, indexes):
  """Return only the indexes with the specified kind."""
  return filter((lambda index:
                 index.definition().entity_type() == _GetKeyKind(key)),
                indexes)


class LiveTxn(object):
  """An in flight transaction."""

  ACTIVE = 1
  COMMITED = 2
  ROLLEDBACK = 3
  FAILED = 4

  _state = ACTIVE
  _commit_time_s = None

  def __init__(self, txn_manager, app, allow_multiple_eg):
    assert isinstance(txn_manager, BaseTransactionManager)
    assert isinstance(app, basestring)

    self._txn_manager = txn_manager
    self._app = app
    self._allow_multiple_eg = allow_multiple_eg

    # Maps entity group key values to EntityGroupTrackers.
    self._entity_groups = {}

    self._lock = threading.RLock()
    self._apply_lock = threading.Lock()

    self._actions = []
    self._cost = datastore_pb.Cost()

    self._kind_to_indexes = collections.defaultdict(list)

  def _GetTracker(self, reference):
    """Gets the entity group tracker for reference.

    If this is the first time reference's entity group is seen, creates a new
    tracker, checking that the transaction doesn't exceed the entity group
    limit.
    """
    entity_group = _GetEntityGroup(reference)
    key = datastore_types.ReferenceToKeyValue(entity_group)
    tracker = self._entity_groups.get(key, None)
    if tracker is None:
      Check(self._app == reference.app(),
            'Transactions cannot span applications (expected %s, got %s)' %
            (self._app, reference.app()))
      if self._allow_multiple_eg:
        Check(len(self._entity_groups) < _MAX_EG_PER_TXN,
              'operating on too many entity groups in a single transaction.')
      else:
        Check(len(self._entity_groups) < 1,
              'cross-group transactions need to be explicitly '
              'specified (xg=True)')
      tracker = EntityGroupTracker(entity_group)
      self._entity_groups[key] = tracker

    return tracker

  def _GetAllTrackers(self):
    """Get the trackers for the transaction's entity groups.

    If no entity group has been discovered returns a 'global' entity group
    tracker. This is possible if the txn only contains transactional tasks.

    Returns:
      The tracker list for the entity groups used in this txn.
    """
    if not self._entity_groups:
      self._GetTracker(datastore_types.Key.from_path(
          '__global__', 1, _app=self._app)._ToPb())
    return self._entity_groups.values()

  def _GrabSnapshot(self, reference):
    """Gets snapshot for this reference, creating it if necessary.

    If no snapshot has been set for reference's entity group, a snapshot is
    taken and stored for future reads (this also sets the read position),
    and a CONCURRENT_TRANSACTION exception is thrown if we no longer have
    a consistent snapshot.

    Args:
      reference: An entity_pb.Reference from which to extract the entity group.

    Raises:
      apiproxy_errors.ApplicationError if the snapshot is not consistent.
    """
    tracker = self._GetTracker(reference)
    check_contention = tracker._snapshot is None
    snapshot = tracker._GrabSnapshot(self._txn_manager)
    if check_contention:
      # A new snapshot was just taken; verify every other entity group in
      # this txn is still at the log position it was read at, otherwise the
      # transaction has observed inconsistent state and must fail.
      candidates = [other for other in self._entity_groups.values()
                    if other._snapshot is not None and other != tracker]
      meta_data_list = [other._meta_data for other in candidates]
      self._txn_manager._AcquireWriteLocks(meta_data_list)
      try:
        for other in candidates:
          if other._meta_data._log_pos != other._read_pos:
            self._state = self.FAILED
            raise apiproxy_errors.ApplicationError(
                datastore_pb.Error.CONCURRENT_TRANSACTION,
                'Concurrency exception.')
      finally:
        self._txn_manager._ReleaseWriteLocks(meta_data_list)
    return snapshot

  @_SynchronizeTxn
  def Get(self, reference):
    """Returns the entity associated with the given entity_pb.Reference or None.

    Does not see any modifications in the current txn.

    Args:
      reference: The entity_pb.Reference of the entity to look up.

    Returns:
      The associated entity_pb.EntityProto or None if no such entity exists.
    """
    snapshot = self._GrabSnapshot(reference)
    entity = snapshot.get(datastore_types.ReferenceToKeyValue(reference))
    return LoadEntity(entity)

  @_SynchronizeTxn
  def GetQueryCursor(self, query, filters, orders, index_list):
    """Runs the given datastore_pb.Query and returns a QueryCursor for it.

    Does not see any modifications in the current txn.

    Args:
      query: The datastore_pb.Query to run.
      filters: A list of filters that override the ones found on query.
      orders: A list of orders that override the ones found on query.
      index_list: A list of indexes used by the query.

    Returns:
      A BaseCursor that can be used to fetch query results.
    """
    Check(query.has_ancestor(),
          'Query must have an ancestor when performed in a transaction.')
    snapshot = self._GrabSnapshot(query.ancestor())
    return _ExecuteQuery(snapshot.values(), query, filters, orders, index_list)

  @_SynchronizeTxn
  def Put(self, entity, insert, indexes):
    """Puts the given entity.

    Args:
      entity: The entity_pb.EntityProto to put.
      insert: A boolean that indicates if we should fail if the entity already
        exists.
      indexes: The composite indexes that apply to the entity.
    """
    tracker = self._GetTracker(entity.key())
    key = datastore_types.ReferenceToKeyValue(entity.key())
    tracker._delete.pop(key, None)
    tracker._put[key] = (entity, insert)
    self._kind_to_indexes[_GetKeyKind(entity.key())] = indexes

  @_SynchronizeTxn
  def Delete(self, reference, indexes):
    """Deletes the entity associated with the given reference.

    Args:
      reference: The entity_pb.Reference of the entity to delete.
      indexes: The composite indexes that apply to the entity.
    """
    tracker = self._GetTracker(reference)
    key = datastore_types.ReferenceToKeyValue(reference)
    tracker._put.pop(key, None)
    tracker._delete[key] = reference
    self._kind_to_indexes[_GetKeyKind(reference)] = indexes

  @_SynchronizeTxn
  def AddActions(self, actions, max_actions=None):
    """Adds the given actions to the current txn.

    Args:
      actions: A list of pbs to send to taskqueue.Add when the txn is applied.
      max_actions: A number that indicates the maximum number of actions to
        allow on this txn.
    """
    Check(not max_actions or len(self._actions) + len(actions) <= max_actions,
          'Too many messages, maximum allowed %s' % max_actions)
    self._actions.extend(actions)

  def Rollback(self):
    """Rollback the current txn."""
    self._lock.acquire()
    try:
      Check(self._state is self.ACTIVE or self._state is self.FAILED,
            'transaction closed')
      self._state = self.ROLLEDBACK
    finally:
      self._txn_manager._RemoveTxn(self)
      self._lock.release()

  @_SynchronizeTxn
  def Commit(self):
    """Commits the current txn.

    This function hands off the responsibility of calling _Apply to the owning
    TransactionManager.

    Returns:
      The cost of the transaction.
    """
    try:
      trackers = self._GetAllTrackers()
      empty = True
      for tracker in trackers:
        snapshot = tracker._GrabSnapshot(self._txn_manager)
        empty = empty and not tracker._put and not tracker._delete

        for entity, insert in tracker._put.itervalues():
          Check(not insert or self.Get(entity.key()) is None,
                'the id allocated for a new entity was already '
                'in use, please try again')

          old_entity = None
          key = datastore_types.ReferenceToKeyValue(entity.key())
          if key in snapshot:
            old_entity = snapshot[key]
          self._AddWriteOps(old_entity, entity)

        for reference in tracker._delete.itervalues():
          old_entity = None
          key = datastore_types.ReferenceToKeyValue(reference)
          if key in snapshot:
            old_entity = snapshot[key]
          if old_entity is not None:
            self._AddWriteOps(None, old_entity)

      if empty and not self._actions:
        self.Rollback()
        return datastore_pb.Cost()

      meta_data_list = [tracker._meta_data for tracker in trackers]
      self._txn_manager._AcquireWriteLocks(meta_data_list)
    except:
      self.Rollback()
      raise

    try:
      for tracker in trackers:
        Check(tracker._meta_data._log_pos == tracker._read_pos,
              'Concurrency exception.',
              datastore_pb.Error.CONCURRENT_TRANSACTION)

      for tracker in trackers:
        tracker._meta_data.Log(self)
      self._state = self.COMMITED
      self._commit_time_s = time.time()
    except:
      self.Rollback()
      raise
    else:
      for action in self._actions:
        try:
          apiproxy_stub_map.MakeSyncCall(
              'taskqueue', 'Add', action, api_base_pb.VoidProto())
        except apiproxy_errors.ApplicationError, e:
          logging.warning('Transactional task %s has been dropped, %s',
                          action, e)
      self._actions = []
    finally:
      self._txn_manager._RemoveTxn(self)
      self._txn_manager._ReleaseWriteLocks(meta_data_list)

    self._txn_manager._consistency_policy._OnCommit(self)
    return self._cost

  def _AddWriteOps(self, old_entity, new_entity):
    """Adds the cost of writing the new_entity to the _cost member.

    We assume that old_entity represents the current state of the Datastore.

    Args:
      old_entity: Entity representing the current state in the Datastore.
      new_entity: Entity representing the desired state in the Datastore.
    """
    composite_indexes = self._kind_to_indexes[_GetKeyKind(new_entity.key())]
    entity_writes, index_writes = _CalculateWriteOps(
        composite_indexes, old_entity, new_entity)
    _UpdateCost(self._cost, entity_writes, index_writes)

  def _Apply(self, meta_data):
    """Applies the current txn on the given entity group.

    This function blindly performs the operations contained in the current txn.
    The calling function must acquire the entity group write lock and ensure
    transactions are applied in order.
    """
    self._apply_lock.acquire()
    try:
      assert self._state == self.COMMITED
      for tracker in self._entity_groups.values():
        if tracker._meta_data is meta_data:
          break
      else:
        assert False
      assert tracker._read_pos != tracker.APPLIED

      for entity, insert in tracker._put.itervalues():
        self._txn_manager._Put(entity, insert)

      for key in tracker._delete.itervalues():
        self._txn_manager._Delete(key)

      tracker._read_pos = EntityGroupTracker.APPLIED

      tracker._meta_data.Unlog(self)
    finally:
      self._apply_lock.release()


class EntityGroupTracker(object):
  """An entity group involved in a transaction."""

  APPLIED = -2

  # The log position of the entity group when it was read, or APPLIED once
  # this transaction has been applied to it.
  _read_pos = None

  # A consistent snapshot of the entity group's entities, keyed by reference.
  _snapshot = None

  # The EntityGroupMetaData for this entity group.
  _meta_data = None

  def __init__(self, entity_group):
    self._entity_group = entity_group
    self._put = {}
    self._delete = {}

  def _GrabSnapshot(self, txn_manager):
    """Snapshot this entity group, remembering the read position."""
    if self._snapshot is None:
      self._meta_data, self._read_pos, self._snapshot = (
          txn_manager._GrabSnapshot(self._entity_group))
    return self._snapshot


class EntityGroupMetaData(object):
  """The meta data associated with an entity group."""

  # The current log position of the entity group.
  _log_pos = -1

  _snapshot = None

  def __init__(self, entity_group):
    self._entity_group = entity_group
    self._write_lock = threading.Lock()
    self._apply_queue = []

  def CatchUp(self):
    """Applies all outstanding txns."""
    assert self._write_lock.acquire(False) is False

    while self._apply_queue:
      self._apply_queue[0]._Apply(self)

  def Log(self, txn):
    """Add a pending transaction to this entity group.

    Requires that the caller hold the meta data lock.
    This also increments the current log position and clears the snapshot
    cache.
    """
    assert self._write_lock.acquire(False) is False
    self._apply_queue.append(txn)
    self._log_pos += 1
    self._snapshot = None

  def Unlog(self, txn):
    """Remove the first pending transaction from the apply queue.

    Requires that the caller hold the meta data lock.
    This checks that the first pending transaction is indeed txn.
    """
    assert self._write_lock.acquire(False) is False

    Check(self._apply_queue and self._apply_queue[0] is txn,
          'Transaction is not applicable',
          datastore_pb.Error.INTERNAL_ERROR)
    self._apply_queue.pop(0)


class BaseConsistencyPolicy(object):
  """A base class for a consistency policy to be used with a transaction
  manager.
  """

  def _OnCommit(self, txn):
    """Called after a LiveTxn has been committed.

    This function can decide whether to apply the txn right away.

    Args:
      txn: A LiveTxn that has been committed
    """
    raise NotImplementedError

  def _OnGroom(self, meta_data_list):
    """Called once for every global query.

    This function must acquire the write lock for any meta data before applying
    any outstanding txns.

    Args:
      meta_data_list: A list of EntityGroupMetaData objects.
    """
    raise NotImplementedError


class MasterSlaveConsistencyPolicy(BaseConsistencyPolicy):
  """Enforces the Master / Slave consistency policy.

  Applies all txns on commit.
  """

  def _OnCommit(self, txn):
    for tracker in txn._GetAllTrackers():
      tracker._meta_data._write_lock.acquire()
      try:
        tracker._meta_data.CatchUp()
      finally:
        tracker._meta_data._write_lock.release()

    txn._txn_manager.Write()

  def _OnGroom(self, meta_data_list):
    # Everything is applied on commit, so there is nothing left to groom.
    pass


class BaseHighReplicationConsistencyPolicy(BaseConsistencyPolicy):
  """A base class for High Replication Datastore consistency policies.

  All txns are applied asynchronously.
  """

  def _OnCommit(self, txn):
    pass

  def _OnGroom(self, meta_data_list):
    for meta_data in meta_data_list:
      if not meta_data._apply_queue:
        continue

      meta_data._write_lock.acquire()
      try:
        while meta_data._apply_queue:
          txn = meta_data._apply_queue[0]
          if self._ShouldApply(txn, meta_data):
            txn._Apply(meta_data)
          else:
            break
      finally:
        meta_data._write_lock.release()

  def _ShouldApply(self, txn, meta_data):
    """Determines if the given transaction should be applied."""
    raise NotImplementedError
1946 class TimeBasedHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy):
1947 """A High Replication Datastore consiseny policy based on elapsed time.
1949 This class tries to simulate performance seen in the high replication
1950 datastore using estimated probabilities of a transaction commiting after a
1951 given amount of time.
1954 _classification_map = [(.98, 100),
1955 (.99, 300),
1956 (.995, 2000),
1957 (1, 240000)
1960 def SetClassificationMap(self, classification_map):
1961 """Set the probability a txn will be applied after a given amount of time.
1963 Args:
1964 classification_map: A list of tuples containing (float between 0 and 1,
1965 number of miliseconds) that define the probability of a transaction
1966 applying after a given amount of time.
1968 for prob, delay in classification_map:
1969 if prob < 0 or prob > 1 or delay <= 0:
1970 raise TypeError(
1971 'classification_map must be a list of (probability, delay) tuples, '
1972 'found %r' % (classification_map,))
1974 self._classification_map = sorted(classification_map)
1976 def _ShouldApplyImpl(self, elapsed_ms, classification):
1977 for rate, ms in self._classification_map:
1978 if classification <= rate:
1979 break
1980 return elapsed_ms >= ms
1982 def _Classify(self, txn, meta_data):
1983 return random.Random(id(txn) ^ id(meta_data)).random()
1985 def _ShouldApply(self, txn, meta_data):
1986 elapsed_ms = (time.time() - txn._commit_time_s) * 1000
1987 classification = self._Classify(txn, meta_data)
1988 return self._ShouldApplyImpl(elapsed_ms, classification)
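# Illustrative sketch (assumed usage, not part of the stub API): how the
# classification map above drives apply decisions. A classification of
# 0.97 falls in the (.98, 100) bucket, so the txn may apply once 100ms
# have elapsed; 0.985 falls in the (.99, 300) bucket and must wait 300ms.
def _example_time_based_policy():
  policy = TimeBasedHRConsistencyPolicy()
  assert policy._ShouldApplyImpl(150, 0.97)        # 150ms >= 100ms bucket
  assert not policy._ShouldApplyImpl(150, 0.985)   # 150ms < 300ms bucket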
1991 class PseudoRandomHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy):
1992 """A policy that always gives the same sequence of consistency decisions."""
1994 def __init__(self, probability=.5, seed=0):
1995 """Constructor.
1997 Args:
1998 probability: A number between 0 and 1 that is the likelihood of a
1999 transaction applying before a global query is executed.
2000 seed: A hashable object to use as a seed. Use None to use the current
2001 timestamp.
2003 self.SetProbability(probability)
2004 self.SetSeed(seed)
2006 def SetProbability(self, probability):
2007 """Change the probability of a transaction applying.
2009 Args:
2010 probability: A number between 0 and 1 that determines the probability of a
2011 transaction applying before a global query is run.
2013 if probability < 0 or probability > 1:
2014 raise TypeError('probability must be a number between 0 and 1, found %r' %
2015 probability)
2016 self._probability = probability
2018 def SetSeed(self, seed):
2019 """Reset the seed."""
2020 self._random = random.Random(seed)
2022 def _ShouldApply(self, txn, meta_data):
2023 return self._random.random() < self._probability
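# Illustrative sketch (assumed usage): a probability of 0 with a fixed seed
# keeps every commit unapplied until something forces a catch-up (an
# ancestor query or Flush()), which is the usual way to exercise eventual
# consistency deterministically in tests.
def _example_pseudo_random_policy():
  policy = PseudoRandomHRConsistencyPolicy(probability=0, seed=1)
  # _ShouldApply now returns False for every txn, deterministically.
  assert not policy._ShouldApply(None, None)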
2026 class BaseTransactionManager(object):
2027 """A class that manages the state of transactions.
2029 This includes creating consistent snapshots for transactions.
2032 def __init__(self, consistency_policy=None):
2033 super(BaseTransactionManager, self).__init__()
2035 self._consistency_policy = (consistency_policy or
2036 MasterSlaveConsistencyPolicy())
2039 self._meta_data_lock = threading.Lock()
2040 BaseTransactionManager.Clear(self)
2042 def SetConsistencyPolicy(self, policy):
2043 """Set the consistency to use.
2045 Causes all data to be flushed.
2047 Args:
2048 policy: An object inheriting from BaseConsistencyPolicy.
2050 if not isinstance(policy, BaseConsistencyPolicy):
2051 raise TypeError('policy should be of type '
2052 'datastore_stub_util.BaseConsistencyPolicy, found %r.' %
2053 (policy,))
2054 self.Flush()
2055 self._consistency_policy = policy
2057 def Clear(self):
2058 """Discards any pending transactions and resets the meta data."""
2060 self._meta_data = {}
2062 self._txn_map = {}
2064 def BeginTransaction(self, app, allow_multiple_eg):
2065 """Start a transaction on the given app.
2067 Args:
2068 app: A string representing the app for which to start the transaction.
2069 allow_multiple_eg: True if transactions can span multiple entity groups.
2071 Returns:
2072 A datastore_pb.Transaction for the created transaction
2074 Check(not (allow_multiple_eg and isinstance(
2075 self._consistency_policy, MasterSlaveConsistencyPolicy)),
2076 'transactions on multiple entity groups only allowed with the '
2077 'High Replication datastore')
2078 txn = self._BeginTransaction(app, allow_multiple_eg)
2079 self._txn_map[id(txn)] = txn
2080 transaction = datastore_pb.Transaction()
2081 transaction.set_app(app)
2082 transaction.set_handle(id(txn))
2083 return transaction
2085 def GetTxn(self, transaction, request_trusted, request_app):
2086 """Gets the LiveTxn object associated with the given transaction.
2088 Args:
2089 transaction: The datastore_pb.Transaction to look up.
2090 request_trusted: A boolean indicating if the requesting app is trusted.
2091 request_app: A string representing the app making the request.
2093 Returns:
2094 The associated LiveTxn object.
2096 request_app = datastore_types.ResolveAppId(request_app)
2097 CheckTransaction(request_trusted, request_app, transaction)
2098 txn = self._txn_map.get(transaction.handle())
2099 Check(txn and txn._app == transaction.app(),
2100 'Transaction(<%s>) not found' % str(transaction).replace('\n', ', '))
2101 return txn
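# Illustrative round trip (assumed usage; `manager` is a concrete
# BaseTransactionManager subclass and 'example-app' a hypothetical app id):
# the handle on the wire proto is just id(txn), which GetTxn uses to map
# the proto back to its LiveTxn.
def _example_txn_round_trip(manager):
  wire_txn = manager.BeginTransaction('example-app', False)
  live_txn = manager.GetTxn(wire_txn, False, 'example-app')
  return live_txn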
2103 def Groom(self):
2104 """Attempts to apply any outstanding transactions.
2106 The consistency policy determines whether a transaction should be applied.
2108 self._meta_data_lock.acquire()
2109 try:
2110 self._consistency_policy._OnGroom(self._meta_data.itervalues())
2111 finally:
2112 self._meta_data_lock.release()
2114 def Flush(self):
2115 """Applies all outstanding transactions."""
2116 self._meta_data_lock.acquire()
2117 try:
2118 for meta_data in self._meta_data.itervalues():
2119 if not meta_data._apply_queue:
2120 continue
2123 meta_data._write_lock.acquire()
2124 try:
2125 meta_data.CatchUp()
2126 finally:
2127 meta_data._write_lock.release()
2128 finally:
2129 self._meta_data_lock.release()
2131 def _GetMetaData(self, entity_group):
2132 """Safely gets the EntityGroupMetaData object for the given entity_group.
2134 self._meta_data_lock.acquire()
2135 try:
2136 key = datastore_types.ReferenceToKeyValue(entity_group)
2138 meta_data = self._meta_data.get(key, None)
2139 if not meta_data:
2140 meta_data = EntityGroupMetaData(entity_group)
2141 self._meta_data[key] = meta_data
2142 return meta_data
2143 finally:
2144 self._meta_data_lock.release()
2146 def _BeginTransaction(self, app, allow_multiple_eg):
2147 """Starts a transaction without storing it in the txn_map."""
2148 return LiveTxn(self, app, allow_multiple_eg)
2150 def _GrabSnapshot(self, entity_group):
2151 """Grabs a consistent snapshot of the given entity group.
2153 Args:
2154 entity_group: An entity_pb.Reference of the entity group of which the
2155 snapshot should be taken.
2157 Returns:
2158 A tuple of (meta_data, log_pos, snapshot) where log_pos is the current log
2159 position and snapshot is a map of reference key value to
2160 entity_pb.EntityProto.
2163 meta_data = self._GetMetaData(entity_group)
2164 meta_data._write_lock.acquire()
2165 try:
2166 if not meta_data._snapshot:
2168 meta_data.CatchUp()
2169 meta_data._snapshot = self._GetEntitiesInEntityGroup(entity_group)
2170 return meta_data, meta_data._log_pos, meta_data._snapshot
2171 finally:
2173 meta_data._write_lock.release()
2175 def _AcquireWriteLocks(self, meta_data_list):
2176 """Acquire the write locks for the given entity group meta data.
2178 These locks must be released with _ReleaseWriteLocks before returning to the
2179 user.
2181 Args:
2182 meta_data_list: list of EntityGroupMetaData objects.
2184 for meta_data in sorted(meta_data_list):
2185 meta_data._write_lock.acquire()
2187 def _ReleaseWriteLocks(self, meta_data_list):
2188 """Release the write locks of the given entity group meta data.
2190 Args:
2191 meta_data_list: list of EntityGroupMetaData objects.
2193 for meta_data in sorted(meta_data_list):
2194 meta_data._write_lock.release()
2196 def _RemoveTxn(self, txn):
2197 """Removes a LiveTxn from the txn_map (if present)."""
2198 self._txn_map.pop(id(txn), None)
2200 def _Put(self, entity, insert):
2201 """Put the given entity.
2203 This must be implemented by a sub-class. The sub-class can assume that any
2204 needed consistency is enforced at a higher level (and can just put blindly).
2206 Args:
2207 entity: The entity_pb.EntityProto to put.
2208 insert: A boolean that indicates if we should fail if the entity already
2209 exists.
2211 raise NotImplementedError
2213 def _Delete(self, reference):
2214 """Delete the entity associated with the specified reference.
2216 This must be implemented by a sub-class. The sub-class can assume that any
2217 needed consistency is enforced at a higher level (and can just delete
2218 blindly).
2220 Args:
2221 reference: The entity_pb.Reference of the entity to delete.
2223 raise NotImplementedError
2225 def _GetEntitiesInEntityGroup(self, entity_group):
2226 """Gets the contents of a specific entity group.
2228 This must be implemented by a sub-class. The sub-class can assume that any
2229 needed consistency is enforced at a higher level (and can just blindly read).
2231 Other entity groups may be modified concurrently.
2233 Args:
2234 entity_group: An entity_pb.Reference of the entity group to get.
2236 Returns:
2237 A dict mapping datastore_types.ReferenceToKeyValue(key) to EntityProto
2239 raise NotImplementedError
2242 class BaseIndexManager(object):
2243 """A generic index manager that stores all data in memory."""
2252 WRITE_ONLY = entity_pb.CompositeIndex.WRITE_ONLY
2253 READ_WRITE = entity_pb.CompositeIndex.READ_WRITE
2254 DELETED = entity_pb.CompositeIndex.DELETED
2255 ERROR = entity_pb.CompositeIndex.ERROR
2257 _INDEX_STATE_TRANSITIONS = {
2258 WRITE_ONLY: frozenset((READ_WRITE, DELETED, ERROR)),
2259 READ_WRITE: frozenset((DELETED,)),
2260 ERROR: frozenset((DELETED,)),
2261 DELETED: frozenset((ERROR,)),
2264 def __init__(self):
2268 self.__indexes = collections.defaultdict(list)
2269 self.__indexes_lock = threading.Lock()
2270 self.__next_index_id = 1
2271 self.__index_id_lock = threading.Lock()
2273 def __FindIndex(self, index):
2274 """Finds an existing index by definition.
2276 Args:
2277 index: entity_pb.CompositeIndex
2279 Returns:
2280 entity_pb.CompositeIndex, if it exists; otherwise None
2282 app = index.app_id()
2283 if app in self.__indexes:
2284 for stored_index in self.__indexes[app]:
2285 if index.definition() == stored_index.definition():
2286 return stored_index
2288 return None
2290 def CreateIndex(self, index, trusted=False, calling_app=None):
2291 calling_app = datastore_types.ResolveAppId(calling_app)
2292 CheckAppId(trusted, calling_app, index.app_id())
2293 Check(index.id() == 0, 'New index id must be 0.')
2294 Check(not self.__FindIndex(index), 'Index already exists.')
2297 self.__index_id_lock.acquire()
2298 index.set_id(self.__next_index_id)
2299 self.__next_index_id += 1
2300 self.__index_id_lock.release()
2303 clone = entity_pb.CompositeIndex()
2304 clone.CopyFrom(index)
2305 app = index.app_id()
2306 clone.set_app_id(app)
2309 self.__indexes_lock.acquire()
2310 try:
2311 self.__indexes[app].append(clone)
2312 finally:
2313 self.__indexes_lock.release()
2315 self._OnIndexChange(index.app_id())
2317 return index.id()
2319 def GetIndexes(self, app, trusted=False, calling_app=None):
2320 """Get the CompositeIndex objects for the given app."""
2321 calling_app = datastore_types.ResolveAppId(calling_app)
2322 CheckAppId(trusted, calling_app, app)
2324 return self.__indexes[app]
2326 def UpdateIndex(self, index, trusted=False, calling_app=None):
2327 CheckAppId(trusted, calling_app, index.app_id())
2329 stored_index = self.__FindIndex(index)
2330 Check(stored_index, 'Index does not exist.')
2331 Check(index.state() == stored_index.state() or
2332 index.state() in self._INDEX_STATE_TRANSITIONS[stored_index.state()],
2333 'cannot move index state from %s to %s' %
2334 (entity_pb.CompositeIndex.State_Name(stored_index.state()),
2335 (entity_pb.CompositeIndex.State_Name(index.state()))))
2338 self.__indexes_lock.acquire()
2339 try:
2340 stored_index.set_state(index.state())
2341 finally:
2342 self.__indexes_lock.release()
2344 self._OnIndexChange(index.app_id())
2346 def DeleteIndex(self, index, trusted=False, calling_app=None):
2347 CheckAppId(trusted, calling_app, index.app_id())
2349 stored_index = self.__FindIndex(index)
2350 Check(stored_index, 'Index does not exist.')
2353 app = index.app_id()
2354 self.__indexes_lock.acquire()
2355 try:
2356 self.__indexes[app].remove(stored_index)
2357 finally:
2358 self.__indexes_lock.release()
2360 self._OnIndexChange(index.app_id())
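# Illustrative sketch (assumed usage; `manager` is a concrete
# BaseIndexManager subclass and 'example-app' a hypothetical app id):
# creating an index and moving it through a legal state transition.
def _example_index_lifecycle(manager):
  index = entity_pb.CompositeIndex()
  index.set_app_id('example-app')
  index.set_id(0)  # CreateIndex requires a zero id
  index.set_state(entity_pb.CompositeIndex.WRITE_ONLY)
  index.mutable_definition().set_entity_type('Foo')
  index.mutable_definition().set_ancestor(False)
  manager.CreateIndex(index, trusted=True, calling_app='example-app')
  index.set_state(entity_pb.CompositeIndex.READ_WRITE)
  # WRITE_ONLY -> READ_WRITE is permitted by _INDEX_STATE_TRANSITIONS.
  manager.UpdateIndex(index, trusted=True, calling_app='example-app')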
2362 def _SideLoadIndex(self, index):
2363 self.__indexes[index.app_id()].append(index)
2365 def _OnIndexChange(self, app_id):
2366 pass
2369 class BaseDatastore(BaseTransactionManager, BaseIndexManager):
2370 """A base implemenation of a Datastore.
2372 This class implements common functions associated with a datastore and
2373 enforces security restrictions passed on by a stub or client. It is designed
2374 to be shared by any number of threads or clients serving any number of apps.
2376 If an app is not specified explicitly, it is pulled from the env and assumed to
2377 be untrusted.
2382 _MAX_QUERY_COMPONENTS = 100
2386 _BATCH_SIZE = 20
2390 _MAX_ACTIONS_PER_TXN = 5
2392 def __init__(self, require_indexes=False, consistency_policy=None,
2393 use_atexit=True, auto_id_policy=SEQUENTIAL):
2394 BaseTransactionManager.__init__(self, consistency_policy=consistency_policy)
2395 BaseIndexManager.__init__(self)
2397 self._require_indexes = require_indexes
2398 self._pseudo_kinds = {}
2399 self.SetAutoIdPolicy(auto_id_policy)
2401 if use_atexit:
2406 atexit.register(self.Write)
2408 def Clear(self):
2409 """Clears out all stored values."""
2411 BaseTransactionManager.Clear(self)
2414 def _RegisterPseudoKind(self, kind):
2415 """Registers a pseudo kind to be used to satisfy a meta data query."""
2416 self._pseudo_kinds[kind.name] = kind
2417 kind._stub = weakref.proxy(self)
2422 def GetQueryCursor(self, raw_query, trusted=False, calling_app=None):
2423 """Execute a query.
2425 Args:
2426 raw_query: The non-validated datastore_pb.Query to run.
2427 trusted: If the calling app is trusted.
2428 calling_app: The app requesting the results or None to pull the app from
2429 the environment.
2431 Returns:
2432 A BaseCursor that can be used to retrieve results.
2435 calling_app = datastore_types.ResolveAppId(calling_app)
2436 CheckAppId(trusted, calling_app, raw_query.app())
2439 filters, orders = datastore_index.Normalize(raw_query.filter_list(),
2440 raw_query.order_list(),
2441 raw_query.property_name_list())
2444 CheckQuery(raw_query, filters, orders, self._MAX_QUERY_COMPONENTS)
2445 FillUsersInQuery(filters)
2448 self._CheckHasIndex(raw_query, trusted, calling_app)
2451 index_list = self.__IndexListForQuery(raw_query)
2454 if raw_query.has_transaction():
2456 Check(raw_query.kind() not in self._pseudo_kinds,
2457 'transactional queries on "%s" not allowed' % raw_query.kind())
2458 txn = self.GetTxn(raw_query.transaction(), trusted, calling_app)
2459 return txn.GetQueryCursor(raw_query, filters, orders, index_list)
2461 if raw_query.has_ancestor() and raw_query.kind() not in self._pseudo_kinds:
2463 txn = self._BeginTransaction(raw_query.app(), False)
2464 return txn.GetQueryCursor(raw_query, filters, orders, index_list)
2467 self.Groom()
2468 return self._GetQueryCursor(raw_query, filters, orders, index_list)
2470 def __IndexListForQuery(self, query):
2471 """Get the single composite index pb used by the query, if any, as a list.
2473 Args:
2474 query: the datastore_pb.Query to compute the index list for
2476 Returns:
2477 A singleton list of the composite index pb used by the query, or an empty list if the query does not require one.
2480 required, kind, ancestor, props = (
2481 datastore_index.CompositeIndexForQuery(query))
2482 if not required:
2483 return []
2484 composite_index_pb = entity_pb.CompositeIndex()
2485 composite_index_pb.set_app_id(query.app())
2486 composite_index_pb.set_id(0)
2487 composite_index_pb.set_state(entity_pb.CompositeIndex.READ_WRITE)
2488 index_pb = composite_index_pb.mutable_definition()
2489 index_pb.set_entity_type(kind)
2490 index_pb.set_ancestor(bool(ancestor))
2491 for name, direction in datastore_index.GetRecommendedIndexProperties(props):
2492 prop_pb = entity_pb.Index_Property()
2493 prop_pb.set_name(name)
2494 prop_pb.set_direction(direction)
2495 index_pb.property_list().append(prop_pb)
2496 return [composite_index_pb]
2498 def Get(self, raw_keys, transaction=None, eventual_consistency=False,
2499 trusted=False, calling_app=None):
2500 """Get the entities for the given keys.
2502 Args:
2503 raw_keys: A list of unverified entity_pb.Reference objects.
2504 transaction: The datastore_pb.Transaction to use or None.
2505 eventual_consistency: If we should allow stale, potentially inconsistent
2506 results.
2507 trusted: If the calling app is trusted.
2508 calling_app: The app requesting the results or None to pull the app from
2509 the environment.
2511 Returns:
2512 A list containing the entity or None if no entity exists.
2515 if not raw_keys:
2516 return []
2518 calling_app = datastore_types.ResolveAppId(calling_app)
2520 if not transaction and eventual_consistency:
2522 result = []
2523 for key in raw_keys:
2524 CheckReference(trusted, calling_app, key)
2525 result.append(self._GetWithPseudoKinds(None, key))
2526 return result
2531 grouped_keys = collections.defaultdict(list)
2532 for i, key in enumerate(raw_keys):
2533 CheckReference(trusted, calling_app, key)
2534 entity_group = _GetEntityGroup(key)
2535 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2536 grouped_keys[entity_group_key].append((key, i))
2538 if transaction:
2540 txn = self.GetTxn(transaction, trusted, calling_app)
2541 return [self._GetWithPseudoKinds(txn, key) for key in raw_keys]
2542 else:
2545 result = [None] * len(raw_keys)
2547 def op(txn, v):
2548 key, i = v
2549 result[i] = self._GetWithPseudoKinds(txn, key)
2550 for keys in grouped_keys.itervalues():
2551 self._RunInTxn(keys, keys[0][0].app(), op)
2552 return result
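# Illustrative sketch (assumed helper usage): keys are bucketed by their
# entity-group root so each bucket can be read in one implicit txn; a
# child key Foo:1 -> Bar:2 shares the bucket of its root Foo:1.
def _example_group_keys_by_entity_group(raw_keys):
  grouped = collections.defaultdict(list)
  for key in raw_keys:
    root = _GetEntityGroup(key)  # defined elsewhere in this module
    grouped[datastore_types.ReferenceToKeyValue(root)].append(key)
  return grouped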
2554 def _GetWithPseudoKinds(self, txn, key):
2555 """Fetch entity key in txn, taking account of pseudo-kinds."""
2556 pseudo_kind = self._pseudo_kinds.get(_GetKeyKind(key), None)
2557 if pseudo_kind:
2558 return pseudo_kind.Get(txn, key)
2559 elif txn:
2560 return txn.Get(key)
2561 else:
2562 return self._Get(key)
2564 def Put(self, raw_entities, cost, transaction=None,
2565 trusted=False, calling_app=None):
2566 """Writes the given given entities.
2568 Updates an entity's key and entity_group in place if needed
2570 Args:
2571 raw_entities: A list of unverified entity_pb.EntityProto objects.
2572 cost: Out param. The cost of putting the provided entities.
2573 transaction: The datastore_pb.Transaction to use or None.
2574 trusted: If the calling app is trusted.
2575 calling_app: The app requesting the results or None to pull the app from
2576 the environment.
2577 Returns:
2578 A list of entity_pb.Reference objects that indicates where each entity
2579 was stored.
2581 if not raw_entities:
2582 return []
2584 calling_app = datastore_types.ResolveAppId(calling_app)
2587 result = [None] * len(raw_entities)
2588 grouped_entities = collections.defaultdict(list)
2589 for i, raw_entity in enumerate(raw_entities):
2590 CheckEntity(trusted, calling_app, raw_entity)
2594 entity = entity_pb.EntityProto()
2595 entity.CopyFrom(raw_entity)
2598 for prop in itertools.chain(entity.property_list(),
2599 entity.raw_property_list()):
2600 FillUser(prop)
2602 last_element = entity.key().path().element_list()[-1]
2603 if not (last_element.id() or last_element.has_name()):
2604 insert = True
2607 if self._auto_id_policy == SEQUENTIAL:
2608 last_element.set_id(self._AllocateSequentialIds(entity.key())[0])
2609 else:
2610 full_key = self._AllocateIds([entity.key()])[0]
2611 last_element.set_id(full_key.path().element_list()[-1].id())
2612 else:
2613 insert = False
2615 entity_group = _GetEntityGroup(entity.key())
2616 entity.mutable_entity_group().CopyFrom(entity_group.path())
2617 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2618 grouped_entities[entity_group_key].append((entity, insert))
2622 key = entity_pb.Reference()
2623 key.CopyFrom(entity.key())
2624 result[i] = key
2626 if transaction:
2628 txn = self.GetTxn(transaction, trusted, calling_app)
2629 for group in grouped_entities.values():
2630 for entity, insert in group:
2632 indexes = _FilterIndexesByKind(entity.key(), self.GetIndexes(
2633 entity.key().app(), trusted, calling_app))
2634 txn.Put(entity, insert, indexes)
2635 else:
2637 for entities in grouped_entities.itervalues():
2638 txn_cost = self._RunInTxn(
2639 entities, entities[0][0].key().app(),
2641 lambda txn, v: txn.Put(v[0], v[1], _FilterIndexesByKind(
2642 v[0].key(),
2643 self.GetIndexes(v[0].key().app(), trusted, calling_app))))
2644 _UpdateCost(cost, txn_cost.entity_writes(), txn_cost.index_writes())
2645 return result
2647 def Delete(self, raw_keys, cost, transaction=None,
2648 trusted=False, calling_app=None):
2649 """Deletes the entities associated with the given keys.
2651 Args:
2652 raw_keys: A list of unverified entity_pb.Reference objects.
2653 cost: Out param. The cost of deleting the provided entities.
2654 transaction: The datastore_pb.Transaction to use or None.
2655 trusted: If the calling app is trusted.
2656 calling_app: The app requesting the results or None to pull the app from
2657 the environment.
2659 if not raw_keys:
2660 return
2662 calling_app = datastore_types.ResolveAppId(calling_app)
2665 grouped_keys = collections.defaultdict(list)
2666 for key in raw_keys:
2667 CheckReference(trusted, calling_app, key)
2668 entity_group = _GetEntityGroup(key)
2669 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2670 grouped_keys[entity_group_key].append(key)
2672 if transaction:
2674 txn = self.GetTxn(transaction, trusted, calling_app)
2675 for key in raw_keys:
2677 indexes = _FilterIndexesByKind(key, self.GetIndexes(
2678 key.app(), trusted, calling_app))
2679 txn.Delete(key, indexes)
2680 else:
2682 for keys in grouped_keys.itervalues():
2684 txn_cost = self._RunInTxn(
2685 keys, keys[0].app(),
2686 lambda txn, key: txn.Delete(key, _FilterIndexesByKind(
2687 key, self.GetIndexes(key.app(), trusted, calling_app))))
2688 _UpdateCost(cost, txn_cost.entity_writes(), txn_cost.index_writes())
2690 def Touch(self, raw_keys, trusted=False, calling_app=None):
2691 """Applies all outstanding writes."""
2692 calling_app = datastore_types.ResolveAppId(calling_app)
2694 grouped_keys = collections.defaultdict(list)
2695 for key in raw_keys:
2696 CheckReference(trusted, calling_app, key)
2697 entity_group = _GetEntityGroup(key)
2698 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2699 grouped_keys[entity_group_key].append(key)
2701 for keys in grouped_keys.itervalues():
2702 self._RunInTxn(keys, keys[0].app(), lambda txn, key: None)
2704 def _RunInTxn(self, values, app, op):
2705 """Runs the given values in a separate Txn.
2707 Retries up to _RETRIES times on CONCURRENT_TRANSACTION errors.
2709 Args:
2710 values: A list of arguments to op.
2711 app: The app to create the Txn on.
2712 op: A function to run on each value in the Txn.
2714 Returns:
2715 The cost of the txn.
2717 retries = 0
2718 backoff = _INITIAL_RETRY_DELAY_MS / 1000.0
2719 while True:
2720 try:
2721 txn = self._BeginTransaction(app, False)
2722 for value in values:
2723 op(txn, value)
2724 return txn.Commit()
2725 except apiproxy_errors.ApplicationError, e:
2726 if e.application_error == datastore_pb.Error.CONCURRENT_TRANSACTION:
2728 retries += 1
2729 if retries <= _RETRIES:
2730 time.sleep(backoff)
2731 backoff *= _RETRY_DELAY_MULTIPLIER
2732 if backoff * 1000.0 > _MAX_RETRY_DELAY_MS:
2733 backoff = _MAX_RETRY_DELAY_MS / 1000.0
2734 continue
2735 raise
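# Worked example (follows directly from the module constants): with
# _INITIAL_RETRY_DELAY_MS=100, _RETRY_DELAY_MULTIPLIER=2 and _RETRIES=3,
# the loop above sleeps 0.1s, 0.2s and 0.4s before re-raising.
def _example_retry_delays():
  delays = []
  backoff = _INITIAL_RETRY_DELAY_MS / 1000.0
  for _ in range(_RETRIES):
    delays.append(backoff)
    backoff = min(backoff * _RETRY_DELAY_MULTIPLIER,
                  _MAX_RETRY_DELAY_MS / 1000.0)
  return delays  # [0.1, 0.2, 0.4]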
2737 def _CheckHasIndex(self, query, trusted=False, calling_app=None):
2738 """Checks if the query can be satisfied given the existing indexes.
2740 Args:
2741 query: the datastore_pb.Query to check
2742 trusted: True if the calling app is trusted (like dev_admin_console)
2743 calling_app: app_id of the current running application
2745 if query.kind() in self._pseudo_kinds or not self._require_indexes:
2746 return
2748 minimal_index = datastore_index.MinimalCompositeIndexForQuery(query,
2749 (datastore_index.ProtoToIndexDefinition(index)
2750 for index in self.GetIndexes(query.app(), trusted, calling_app)
2751 if index.state() == entity_pb.CompositeIndex.READ_WRITE))
2752 if minimal_index is not None:
2753 msg = ('This query requires a composite index that is not defined. '
2754 'You must update the index.yaml file in your application root.')
2755 is_most_efficient, kind, ancestor, properties = minimal_index
2756 if not is_most_efficient:
2758 yaml = datastore_index.IndexYamlForQuery(kind, ancestor,
2759 datastore_index.GetRecommendedIndexProperties(properties))
2760 msg += '\nThe following index is the minimum index required:\n' + yaml
2761 raise apiproxy_errors.ApplicationError(datastore_pb.Error.NEED_INDEX, msg)
2763 def SetAutoIdPolicy(self, auto_id_policy):
2764 """Set value of _auto_id_policy flag (default SEQUENTIAL).
2766 SEQUENTIAL auto ID assignment behavior will eventually be deprecated
2767 and the default will be SCATTERED.
2769 Args:
2770 auto_id_policy: string constant.
2771 Raises:
2772 TypeError: if auto_id_policy is not one of SEQUENTIAL or SCATTERED.
2774 valid_policies = (SEQUENTIAL, SCATTERED)
2775 if auto_id_policy not in valid_policies:
2776 raise TypeError('auto_id_policy must be in %s, found %s instead' %
2777 (valid_policies, auto_id_policy))
2778 self._auto_id_policy = auto_id_policy
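# Illustrative sketch (assumed usage; `datastore` is a concrete
# BaseDatastore subclass): switching the auto ID allocation policy.
def _example_auto_id_policy(datastore):
  datastore.SetAutoIdPolicy(SCATTERED)   # new entities get scattered ids
  datastore.SetAutoIdPolicy(SEQUENTIAL)  # back to the current default
  # datastore.SetAutoIdPolicy('bogus') would raise TypeError.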
2782 def Write(self):
2783 """Writes the datastore to disk."""
2784 self.Flush()
2786 def _GetQueryCursor(self, query, filters, orders, index_list):
2787 """Runs the given datastore_pb.Query and returns a QueryCursor for it.
2789 This must be implemented by a sub-class. The sub-class does not need to
2790 enforce any consistency guarantees (and can just blindly read).
2792 Args:
2793 query: The datastore_pb.Query to run.
2794 filters: A list of filters that override the ones found on query.
2795 orders: A list of orders that override the ones found on query.
2796 index_list: A list of indexes used by the query.
2798 Returns:
2799 A BaseCursor that can be used to fetch query results.
2801 raise NotImplementedError
2803 def _Get(self, reference):
2804 """Get the entity for the given reference or None.
2806 This must be implemented by a sub-class. The sub-class does not need to
2807 enforce any consistency guarantees (and can just blindly read).
2809 Args:
2810 reference: An entity_pb.Reference to look up.
2812 Returns:
2813 The entity_pb.EntityProto associated with the given reference or None.
2815 raise NotImplementedError
2817 def _AllocateSequentialIds(self, reference, size=1, max_id=None):
2818 """Allocate sequential ids for given reference.
2820 Args:
2821 reference: An entity_pb.Reference to allocate an id for.
2822 size: The size of the range to allocate
2823 max_id: The upper bound of the range to allocate
2825 Returns:
2826 A tuple containing (min, max) of the allocated range.
2828 raise NotImplementedError
2830 def _AllocateIds(self, references):
2831 """Allocate or reserves IDs for the v4 datastore API.
2833 Incomplete keys are allocated scattered IDs. Complete keys have every id in
2834 their paths reserved in the appropriate ID space.
2836 Args:
2837 references: a list of entity_pb.Reference objects to allocate or reserve ids for.
2839 Returns:
2840 a list of complete entity_pb.Reference objects corresponding to the
2841 incomplete keys in the input, with newly allocated ids.
2843 raise NotImplementedError
2846 def _NeedsIndexes(func):
2847 """A decorator for DatastoreStub methods that require or affect indexes.
2849 Updates indexes to match index.yaml before the call and updates index.yaml
2850 after the call if require_indexes is False. If root_path is not set, this is a
2851 no-op.
2854 def UpdateIndexesWrapper(self, *args, **kwargs):
2855 self._SetupIndexes()
2856 try:
2857 return func(self, *args, **kwargs)
2858 finally:
2859 self._UpdateIndexes()
2861 return UpdateIndexesWrapper
2864 class EntityGroupPseudoKind(object):
2865 """A common implementation of get() for the __entity_group__ pseudo-kind.
2867 Public properties:
2868 name: the pseudo-kind name
2870 name = '__entity_group__'
2880 base_version = int(time.time() * 1e6)
2882 def Get(self, txn, key):
2883 """Fetch key of this pseudo-kind within txn.
2885 Args:
2886 txn: transaction within which Get occurs, may be None if this is an
2887 eventually consistent Get.
2888 key: key of pseudo-entity to Get.
2890 Returns:
2891 An entity for key, or None if it doesn't exist.
2894 if not txn:
2895 txn = self._stub._BeginTransaction(key.app(), False)
2896 try:
2897 return self.Get(txn, key)
2898 finally:
2899 txn.Rollback()
2902 if isinstance(txn._txn_manager._consistency_policy,
2903 MasterSlaveConsistencyPolicy):
2904 return None
2911 path = key.path()
2912 if path.element_size() != 2 or path.element_list()[-1].id() != 1:
2913 return None
2915 tracker = txn._GetTracker(key)
2916 tracker._GrabSnapshot(txn._txn_manager)
2918 eg = entity_pb.EntityProto()
2919 eg.mutable_key().CopyFrom(key)
2920 eg.mutable_entity_group().CopyFrom(_GetEntityGroup(key).path())
2921 version = entity_pb.Property()
2922 version.set_name('__version__')
2923 version.set_multiple(False)
2924 version.mutable_value().set_int64value(
2925 tracker._read_pos + self.base_version)
2926 eg.property_list().append(version)
2927 return eg
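# Illustrative sketch (assumed key layout): the pseudo-entity read by Get()
# above lives at <entity group root>/__entity_group__/1; for a group rooted
# at Foo:1 under a hypothetical app id:
def _example_entity_group_key():
  key = entity_pb.Reference()
  key.set_app('example-app')
  elem = key.mutable_path().add_element()
  elem.set_type('Foo')
  elem.set_id(1)
  elem = key.mutable_path().add_element()
  elem.set_type('__entity_group__')
  elem.set_id(1)  # Get() requires two path elements and a final id of 1
  return key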
2929 def Query(self, query, filters, orders):
2930 """Perform a query on this pseudo-kind.
2932 Args:
2933 query: the original datastore_pb.Query.
2934 filters: the filters from query.
2935 orders: the orders from query.
2937 Returns:
2938 always raises an error
2942 raise apiproxy_errors.ApplicationError(
2943 datastore_pb.Error.BAD_REQUEST, 'queries not supported on ' + self.name)
2946 class DatastoreStub(object):
2947 """A stub that maps datastore service calls on to a BaseDatastore.
2949 This class also keeps track of query cursors.
2952 def __init__(self,
2953 datastore,
2954 app_id=None,
2955 trusted=None,
2956 root_path=None):
2957 super(DatastoreStub, self).__init__()
2958 self._datastore = datastore
2959 self._app_id = datastore_types.ResolveAppId(app_id)
2960 self._trusted = trusted
2961 self._root_path = root_path
2964 self.__query_history = {}
2967 self.__query_ci_history = set()
2971 self._cached_yaml = (None, None, None)
2973 if self._require_indexes or root_path is None:
2975 self._index_yaml_updater = None
2976 else:
2978 self._index_yaml_updater = datastore_stub_index.IndexYamlUpdater(
2979 root_path)
2981 DatastoreStub.Clear(self)
2983 def Clear(self):
2984 """Clears out all stored values."""
2985 self._query_cursors = {}
2986 self.__query_history = {}
2987 self.__query_ci_history = set()
2989 def QueryHistory(self):
2990 """Returns a dict that maps Query PBs to times they've been run."""
2992 return dict((pb, times) for pb, times in self.__query_history.items()
2993 if pb.app() == self._app_id)
2995 def _QueryCompositeIndexHistoryLength(self):
2996 """Returns the length of the CompositeIndex set for query history."""
2997 return len(self.__query_ci_history)
2999 def SetTrusted(self, trusted):
3000 """Set/clear the trusted bit in the stub.
3002 This bit indicates that the app calling the stub is trusted. A
3003 trusted app can write to datastores of other apps.
3005 Args:
3006 trusted: boolean.
3008 self._trusted = trusted
3012 def _Dynamic_Get(self, req, res):
3015 transaction = req.has_transaction() and req.transaction() or None
3018 if req.allow_deferred() and req.key_size() > _MAXIMUM_RESULTS:
3019 keys_to_get = req.key_list()[:_MAXIMUM_RESULTS]
3020 deferred_keys = req.key_list()[_MAXIMUM_RESULTS:]
3021 res.deferred_list().extend(deferred_keys)
3022 else:
3024 keys_to_get = req.key_list()
3026 res.set_in_order(not req.allow_deferred())
3028 total_response_bytes = 0
3029 for index, entity in enumerate(self._datastore.Get(keys_to_get,
3030 transaction,
3031 req.has_failover_ms(),
3032 self._trusted,
3033 self._app_id)):
3034 entity_size = entity and entity.ByteSize() or 0
3037 if (req.allow_deferred()
3038 and index > 0
3039 and total_response_bytes + entity_size > _MAXIMUM_QUERY_RESULT_BYTES):
3041 res.deferred_list().extend(keys_to_get[index:])
3042 break
3043 elif entity:
3044 entity_result = res.add_entity()
3045 entity_result.mutable_entity().CopyFrom(entity)
3046 total_response_bytes += entity_size
3047 else:
3049 entity_result = res.add_entity()
3050 entity_result.mutable_key().CopyFrom(keys_to_get[index])
3052 def _Dynamic_Put(self, req, res):
3053 transaction = req.has_transaction() and req.transaction() or None
3054 res.key_list().extend(self._datastore.Put(req.entity_list(),
3055 res.mutable_cost(),
3056 transaction,
3057 self._trusted, self._app_id))
3059 def _Dynamic_Delete(self, req, res):
3060 transaction = req.has_transaction() and req.transaction() or None
3061 self._datastore.Delete(req.key_list(), res.mutable_cost(), transaction,
3062 self._trusted, self._app_id)
3064 def _Dynamic_Touch(self, req, _):
3065 self._datastore.Touch(req.key_list(), self._trusted, self._app_id)
3067 @_NeedsIndexes
3068 def _Dynamic_RunQuery(self, query, query_result):
3069 cursor = self._datastore.GetQueryCursor(query, self._trusted, self._app_id)
3071 if query.has_count():
3072 count = query.count()
3073 elif query.has_limit():
3074 count = query.limit()
3075 else:
3076 count = self._BATCH_SIZE
3078 cursor.PopulateQueryResult(query_result, count, query.offset(),
3079 query.compile(), first_result=True)
3080 if query_result.has_cursor():
3081 self._query_cursors[query_result.cursor().cursor()] = cursor
3084 if query.compile():
3087 compiled_query = query_result.mutable_compiled_query()
3088 compiled_query.set_keys_only(query.keys_only())
3089 compiled_query.mutable_primaryscan().set_index_name(query.Encode())
3090 self.__UpdateQueryHistory(query)
3092 def __UpdateQueryHistory(self, query):
3094 clone = datastore_pb.Query()
3095 clone.CopyFrom(query)
3096 clone.clear_hint()
3097 clone.clear_limit()
3098 clone.clear_offset()
3099 clone.clear_count()
3100 if clone in self.__query_history:
3101 self.__query_history[clone] += 1
3102 else:
3103 self.__query_history[clone] = 1
3104 if clone.app() == self._app_id:
3105 self.__query_ci_history.add(
3106 datastore_index.CompositeIndexForQuery(clone))
3109 def _Dynamic_Next(self, next_request, query_result):
3110 app = next_request.cursor().app()
3111 CheckAppId(self._trusted, self._app_id, app)
3113 cursor = self._query_cursors.get(next_request.cursor().cursor())
3114 Check(cursor and cursor.app == app,
3115 'Cursor %d not found' % next_request.cursor().cursor())
3117 count = self._BATCH_SIZE
3118 if next_request.has_count():
3119 count = next_request.count()
3121 cursor.PopulateQueryResult(query_result, count, next_request.offset(),
3122 next_request.compile(), first_result=False)
3124 if not query_result.has_cursor():
3125 del self._query_cursors[next_request.cursor().cursor()]
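# Illustrative sketch (assumed usage; `stub` is a DatastoreStub and `query`
# a prepared datastore_pb.Query): paging through results with the
# RunQuery/Next pair, mirroring what the client library does.
def _example_run_and_page(stub, query):
  result = datastore_pb.QueryResult()
  stub._Dynamic_RunQuery(query, result)
  while result.has_cursor() and result.more_results():
    next_req = datastore_pb.NextRequest()
    next_req.mutable_cursor().CopyFrom(result.cursor())
    result = datastore_pb.QueryResult()
    stub._Dynamic_Next(next_req, result)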
3127 def _Dynamic_AddActions(self, request, _):
3128 """Associates the creation of one or more tasks with a transaction.
3130 Args:
3131 request: A taskqueue_service_pb.TaskQueueBulkAddRequest containing the
3132 tasks that should be created when the transaction is committed.
3138 if not request.add_request_list():
3139 return
3141 transaction = request.add_request_list()[0].transaction()
3142 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3143 new_actions = []
3144 for add_request in request.add_request_list():
3148 Check(add_request.transaction() == transaction,
3149 'Cannot add requests to different transactions')
3150 clone = taskqueue_service_pb.TaskQueueAddRequest()
3151 clone.CopyFrom(add_request)
3152 clone.clear_transaction()
3153 new_actions.append(clone)
3155 txn.AddActions(new_actions, self._MAX_ACTIONS_PER_TXN)
3157 def _Dynamic_BeginTransaction(self, req, transaction):
3158 CheckAppId(self._trusted, self._app_id, req.app())
3159 transaction.CopyFrom(self._datastore.BeginTransaction(
3160 req.app(), req.allow_multiple_eg()))
3162 def _Dynamic_Commit(self, transaction, res):
3163 CheckAppId(self._trusted, self._app_id, transaction.app())
3164 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3165 res.mutable_cost().CopyFrom(txn.Commit())
3167 def _Dynamic_Rollback(self, transaction, _):
3168 CheckAppId(self._trusted, self._app_id, transaction.app())
3169 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3170 txn.Rollback()
3172 def _Dynamic_CreateIndex(self, index, id_response):
3173 id_response.set_value(self._datastore.CreateIndex(index,
3174 self._trusted,
3175 self._app_id))
3177 @_NeedsIndexes
3178 def _Dynamic_GetIndices(self, app_str, composite_indices):
3179 composite_indices.index_list().extend(self._datastore.GetIndexes(
3180 app_str.value(), self._trusted, self._app_id))
3182 def _Dynamic_UpdateIndex(self, index, _):
3183 self._datastore.UpdateIndex(index, self._trusted, self._app_id)
3185 def _Dynamic_DeleteIndex(self, index, _):
3186 self._datastore.DeleteIndex(index, self._trusted, self._app_id)
3188 def _Dynamic_AllocateIds(self, allocate_ids_request, allocate_ids_response):
3189 CheckAppId(self._trusted, self._app_id,
3190 allocate_ids_request.model_key().app())
3192 reference = allocate_ids_request.model_key()
3194 (start, end) = self._datastore._AllocateSequentialIds(
3195 reference, allocate_ids_request.size(), allocate_ids_request.max())
3197 allocate_ids_response.set_start(start)
3198 allocate_ids_response.set_end(end)
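# Illustrative sketch (assumed usage; `stub` is a DatastoreStub and
# `model_key` a hypothetical entity_pb.Reference naming the id space):
# a v3 AllocateIds call against the stub.
def _example_allocate_ids(stub, model_key):
  req = datastore_pb.AllocateIdsRequest()
  req.mutable_model_key().CopyFrom(model_key)
  req.set_size(10)
  resp = datastore_pb.AllocateIdsResponse()
  stub._Dynamic_AllocateIds(req, resp)
  return resp.start(), resp.end()  # the reserved sequential range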
3200 def _SetupIndexes(self, _open=open):
3201 """Ensure that the set of existing composite indexes matches index.yaml.
3203 Note: this is similar to the algorithm used by the admin console for
3204 the same purpose.
3209 if not self._root_path:
3210 return
3211 index_yaml_file = os.path.join(self._root_path, 'index.yaml')
3212 if (self._cached_yaml[0] == index_yaml_file and
3213 os.path.exists(index_yaml_file) and
3214 os.path.getmtime(index_yaml_file) == self._cached_yaml[1]):
3215 requested_indexes = self._cached_yaml[2]
3216 else:
3217 try:
3218 index_yaml_mtime = os.path.getmtime(index_yaml_file)
3219 fh = _open(index_yaml_file, 'r')
3220 except (OSError, IOError):
3221 index_yaml_data = None
3222 else:
3223 try:
3224 index_yaml_data = fh.read()
3225 finally:
3226 fh.close()
3228 requested_indexes = []
3229 if index_yaml_data is not None:
3231 index_defs = datastore_index.ParseIndexDefinitions(index_yaml_data)
3232 if index_defs is not None and index_defs.indexes is not None:
3234 requested_indexes = datastore_index.IndexDefinitionsToProtos(
3235 self._app_id,
3236 index_defs.indexes)
3237 self._cached_yaml = (index_yaml_file, index_yaml_mtime,
3238 requested_indexes)
3241 existing_indexes = self._datastore.GetIndexes(
3242 self._app_id, self._trusted, self._app_id)
3245 requested = dict((x.definition().Encode(), x) for x in requested_indexes)
3246 existing = dict((x.definition().Encode(), x) for x in existing_indexes)
3249 created = 0
3250 for key, index in requested.iteritems():
3251 if key not in existing:
3252 new_index = entity_pb.CompositeIndex()
3253 new_index.CopyFrom(index)
3254 new_index.set_id(datastore_admin.CreateIndex(new_index))
3255 new_index.set_state(entity_pb.CompositeIndex.READ_WRITE)
3256 datastore_admin.UpdateIndex(new_index)
3257 created += 1
3260 deleted = 0
3261 for key, index in existing.iteritems():
3262 if key not in requested:
3263 datastore_admin.DeleteIndex(index)
3264 deleted += 1
3267 if created or deleted:
3268 logging.debug('Created %d and deleted %d index(es); total %d',
3269 created, deleted, len(requested))
3271 def _UpdateIndexes(self):
3272 if self._index_yaml_updater is not None:
3273 self._index_yaml_updater.UpdateIndexYaml()
3276 class StubQueryConverter(object):
3277 """Converter for v3 and v4 queries suitable for use in stubs."""
3279 def __init__(self, entity_converter):
3280 self._entity_converter = entity_converter
3282 def v4_to_v3_compiled_cursor(self, v4_cursor, v3_compiled_cursor):
3283 """Converts a v4 cursor string to a v3 CompiledCursor.
3285 Args:
3286 v4_cursor: a string representing a v4 query cursor
3287 v3_compiled_cursor: a datastore_pb.CompiledCursor to populate
3289 v3_compiled_cursor.Clear()
3290 v3_compiled_cursor.ParseFromString(v4_cursor)
3292 def v3_to_v4_compiled_cursor(self, v3_compiled_cursor):
3293 """Converts a v3 CompiledCursor to a v4 cursor string.
3295 Args:
3296 v3_compiled_cursor: a datastore_pb.CompiledCursor
3298 Returns:
3299 a string representing a v4 query cursor
3301 return v3_compiled_cursor.SerializeToString()
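# Since both directions are plain (de)serialization, the two cursor
# converters form an exact round trip; a minimal sketch:
def _example_cursor_round_trip(v3_compiled_cursor):
  v4_cursor = get_query_converter().v3_to_v4_compiled_cursor(
      v3_compiled_cursor)
  restored = datastore_pb.CompiledCursor()
  get_query_converter().v4_to_v3_compiled_cursor(v4_cursor, restored)
  return restored  # equal to v3_compiled_cursor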
3303 def v4_to_v3_query(self, v4_partition_id, v4_query, v3_query):
3304 """Converts a v4 Query to a v3 Query.
3306 Args:
3307 v4_partition_id: a datastore_v4_pb.PartitionId
3308 v4_query: a datastore_v4_pb.Query
3309 v3_query: a datastore_pb.Query to populate
3311 Raises:
3312 InvalidConversionError if the query cannot be converted
3314 v3_query.Clear()
3316 if v4_partition_id.dataset_id():
3317 v3_query.set_app(v4_partition_id.dataset_id())
3318 if v4_partition_id.has_namespace():
3319 v3_query.set_name_space(v4_partition_id.namespace())
3321 v3_query.set_persist_offset(True)
3322 v3_query.set_require_perfect_plan(True)
3323 v3_query.set_compile(True)
3326 if v4_query.has_limit():
3327 v3_query.set_limit(v4_query.limit())
3328 if v4_query.offset():
3329 v3_query.set_offset(v4_query.offset())
3330 if v4_query.has_start_cursor():
3331 self.v4_to_v3_compiled_cursor(v4_query.start_cursor(),
3332 v3_query.mutable_compiled_cursor())
3333 if v4_query.has_end_cursor():
3334 self.v4_to_v3_compiled_cursor(v4_query.end_cursor(),
3335 v3_query.mutable_end_compiled_cursor())
3338 if v4_query.kind_list():
3339 datastore_pbs.check_conversion(len(v4_query.kind_list()) == 1,
3340 'multiple kinds not supported')
3341 v3_query.set_kind(v4_query.kind(0).name())
3344 has_key_projection = False
3345 for prop in v4_query.projection_list():
3346 if prop.property().name() == datastore_pbs.PROPERTY_NAME_KEY:
3347 has_key_projection = True
3348 else:
3349 v3_query.add_property_name(prop.property().name())
3350 if has_key_projection and not v3_query.property_name_list():
3351 v3_query.set_keys_only(True)
3354 for prop in v4_query.group_by_list():
3355 v3_query.add_group_by_property_name(prop.name())
3358 self.__populate_v3_filters(v4_query.filter(), v3_query)
3361 for v4_order in v4_query.order_list():
3362 v3_order = v3_query.add_order()
3363 v3_order.set_property(v4_order.property().name())
3364 if v4_order.has_direction():
3365 v3_order.set_direction(v4_order.direction())
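# Illustrative sketch (hypothetical dataset id and kind): converting a
# minimal v4 kind query; the v3 defaults set above (compile, perfect
# plan, persisted offset) come along for free.
def _example_v4_to_v3_query():
  partition_id = datastore_v4_pb.PartitionId()
  partition_id.set_dataset_id('example-app')
  v4_query = datastore_v4_pb.Query()
  v4_query.add_kind().set_name('Foo')
  v3_query = datastore_pb.Query()
  get_query_converter().v4_to_v3_query(partition_id, v4_query, v3_query)
  return v3_query  # kind 'Foo', app 'example-app', compile=True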
3367 def v3_to_v4_query(self, v3_query, v4_query):
3368 """Converts a v3 Query to a v4 Query.
3370 Args:
3371 v3_query: a datastore_pb.Query
3372 v4_query: a datastore_v4_pb.Query to populate
3374 Raises:
3375 InvalidConversionError if the query cannot be converted
3377 v4_query.Clear()
3379 datastore_pbs.check_conversion(not v3_query.has_distinct(),
3380 'distinct option not supported')
3381 datastore_pbs.check_conversion(v3_query.require_perfect_plan(),
3382 'non-perfect plans not supported')
3386 if v3_query.has_limit():
3387 v4_query.set_limit(v3_query.limit())
3388 if v3_query.offset():
3389 v4_query.set_offset(v3_query.offset())
3390 if v3_query.has_compiled_cursor():
3391 v4_query.set_start_cursor(
3392 self.v3_to_v4_compiled_cursor(v3_query.compiled_cursor()))
3393 if v3_query.has_end_compiled_cursor():
3394 v4_query.set_end_cursor(
3395 self.v3_to_v4_compiled_cursor(v3_query.end_compiled_cursor()))
3398 if v3_query.has_kind():
3399 v4_query.add_kind().set_name(v3_query.kind())
3402 for name in v3_query.property_name_list():
3403 v4_query.add_projection().mutable_property().set_name(name)
3404 if v3_query.keys_only():
3405 v4_query.add_projection().mutable_property().set_name(
3406 datastore_pbs.PROPERTY_NAME_KEY)
3409 for name in v3_query.group_by_property_name_list():
3410 v4_query.add_group_by().set_name(name)
3413 num_v4_filters = len(v3_query.filter_list())
3414 if v3_query.has_ancestor():
3415 num_v4_filters += 1
3417 if num_v4_filters == 1:
3418 get_property_filter = self.__get_property_filter
3419 elif num_v4_filters > 1:
3420 v4_query.mutable_filter().mutable_composite_filter().set_operator(
3421 datastore_v4_pb.CompositeFilter.AND)
3422 get_property_filter = self.__add_property_filter
3424 if v3_query.has_ancestor():
3425 self.__v3_query_to_v4_ancestor_filter(v3_query,
3426 get_property_filter(v4_query))
3427 for v3_filter in v3_query.filter_list():
3428 self.__v3_filter_to_v4_property_filter(v3_filter,
3429 get_property_filter(v4_query))
3432 for v3_order in v3_query.order_list():
3433 v4_order = v4_query.add_order()
3434 v4_order.mutable_property().set_name(v3_order.property())
3435 if v3_order.has_direction():
3436 v4_order.set_direction(v3_order.direction())
3438 def __get_property_filter(self, v4_query):
3439 """Returns the PropertyFilter from the query's top-level filter."""
3440 return v4_query.mutable_filter().mutable_property_filter()
3442 def __add_property_filter(self, v4_query):
3443 """Adds and returns a PropertyFilter from the query's composite filter."""
3444 v4_comp_filter = v4_query.mutable_filter().mutable_composite_filter()
3445 return v4_comp_filter.add_filter().mutable_property_filter()
3447 def __populate_v3_filters(self, v4_filter, v3_query):
3448 """Populates a filters for a v3 Query.
3450 Args:
3451 v4_filter: a datastore_v4_pb.Filter
3452 v3_query: a datastore_pb.Query to populate with filters
3454 if v4_filter.has_property_filter():
3455 v4_property_filter = v4_filter.property_filter()
3456 if (v4_property_filter.operator()
3457 == datastore_v4_pb.PropertyFilter.HAS_ANCESTOR):
3458 datastore_pbs.check_conversion(
3459 v4_property_filter.value().has_key_value(),
3460 'HAS_ANCESTOR requires a reference value')
3461 datastore_pbs.check_conversion((v4_property_filter.property().name()
3462 == datastore_pbs.PROPERTY_NAME_KEY),
3463 'unsupported property')
3464 datastore_pbs.check_conversion(not v3_query.has_ancestor(),
3465 'duplicate ancestor constraint')
3466 self._entity_converter.v4_to_v3_reference(
3467 v4_property_filter.value().key_value(),
3468 v3_query.mutable_ancestor())
3469 else:
3470 v3_filter = v3_query.add_filter()
3471 property_name = v4_property_filter.property().name()
3472 v3_filter.set_op(v4_property_filter.operator())
3473 datastore_pbs.check_conversion(
3474 not v4_property_filter.value().list_value_list(),
3475 ('unsupported value type, %s, in property filter'
3476 ' on "%s"' % ('list_value', property_name)))
3477 prop = v3_filter.add_property()
3478 prop.set_multiple(False)
3479 prop.set_name(property_name)
3480 self._entity_converter.v4_value_to_v3_property_value(
3481 v4_property_filter.value(), prop.mutable_value())
3482 elif v4_filter.has_composite_filter():
3483 datastore_pbs.check_conversion((v4_filter.composite_filter().operator()
3484 == datastore_v4_pb.CompositeFilter.AND),
3485 'unsupported composite property operator')
3486 for v4_sub_filter in v4_filter.composite_filter().filter_list():
3487 self.__populate_v3_filters(v4_sub_filter, v3_query)
3489 def __v3_filter_to_v4_property_filter(self, v3_filter, v4_property_filter):
3490 """Converts a v3 Filter to a v4 PropertyFilter.
3492 Args:
3493 v3_filter: a datastore_pb.Filter
3494 v4_property_filter: a datastore_v4_pb.PropertyFilter to populate
3496 Raises:
3497 InvalidConversionError if the filter cannot be converted
3499 datastore_pbs.check_conversion(v3_filter.property_size() == 1,
3500 'invalid filter')
3501 datastore_pbs.check_conversion(v3_filter.op() <= 5,
3502 'unsupported filter op: %d' % v3_filter.op())
3503 v4_property_filter.Clear()
3504 v4_property_filter.set_operator(v3_filter.op())
3505 v4_property_filter.mutable_property().set_name(v3_filter.property(0).name())
3506 self._entity_converter.v3_property_to_v4_value(
3507 v3_filter.property(0), True, v4_property_filter.mutable_value())
3509 def __v3_query_to_v4_ancestor_filter(self, v3_query, v4_property_filter):
3510 """Converts a v3 Query to a v4 ancestor PropertyFilter.
3512 Args:
3513 v3_query: a datastore_pb.Query
3514 v4_property_filter: a datastore_v4_pb.PropertyFilter to populate
3516 v4_property_filter.Clear()
3517 v4_property_filter.set_operator(
3518 datastore_v4_pb.PropertyFilter.HAS_ANCESTOR)
3519 prop = v4_property_filter.mutable_property()
3520 prop.set_name(datastore_pbs.PROPERTY_NAME_KEY)
3521 self._entity_converter.v3_to_v4_key(
3522 v3_query.ancestor(),
3523 v4_property_filter.mutable_value().mutable_key_value())
3527 __query_converter = StubQueryConverter(datastore_pbs.get_entity_converter())
3530 def get_query_converter():
3531 """Returns a converter for v3 and v4 queries (not suitable for production).
3533 This converter is suitable for use in stubs but not for production.
3535 Returns:
3536 a StubQueryConverter
3538 return __query_converter
3541 class StubServiceConverter(object):
3542 """Converter for v3/v4 request/response protos suitable for use in stubs."""
3544 def __init__(self, entity_converter, query_converter):
3545 self._entity_converter = entity_converter
3546 self._query_converter = query_converter
3548 def v4_to_v3_cursor(self, v4_query_handle, v3_cursor):
3549 """Converts a v4 cursor string to a v3 Cursor.
3551 Args:
3552 v4_query_handle: a string representing a v4 query handle
3553 v3_cursor: a datastore_pb.Cursor to populate
3555 v3_cursor.ParseFromString(v4_query_handle)
3556 return v3_cursor
3558 def _v3_to_v4_query_handle(self, v3_cursor):
3559 """Converts a v3 Cursor to a v4 query handle string.
3561 Args:
3562 v3_cursor: a datastore_pb.Cursor
3564 Returns:
3565 a string representing a v4 cursor
3567 return v3_cursor.SerializeToString()
3569 def v4_to_v3_txn(self, v4_txn, v3_txn):
3570 """Converts a v4 transaction string to a v3 Transaction.
3572 Args:
3573 v4_txn: a string representing a v4 transaction
3574 v3_txn: a datastore_pb.Transaction to populate
3576 v3_txn.ParseFromString(v4_txn)
3577 return v3_txn
3579 def _v3_to_v4_txn(self, v3_txn):
3580 """Converts a v3 Transaction to a v4 transaction string.
3582 Args:
3583 v3_txn: a datastore_pb.Transaction
3585 Returns:
3586 a string representing a v4 transaction
3588 return v3_txn.SerializeToString()
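# As with cursors, v4 transactions are just serialized v3 Transaction
# protos, so the converters round-trip exactly; a minimal sketch
# (`converter` is a StubServiceConverter; app and handle hypothetical):
def _example_txn_string_round_trip(converter):
  v3_txn = datastore_pb.Transaction()
  v3_txn.set_app('example-app')
  v3_txn.set_handle(42)
  v4_txn = converter._v3_to_v4_txn(v3_txn)
  restored = datastore_pb.Transaction()
  converter.v4_to_v3_txn(v4_txn, restored)
  return restored  # equal to v3_txn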
3593 def v4_to_v3_begin_transaction_req(self, app_id, v4_req):
3594 """Converts a v4 BeginTransactionRequest to a v3 BeginTransactionRequest.
3596 Args:
3597 app_id: app id
3598 v4_req: a datastore_v4_pb.BeginTransactionRequest
3600 Returns:
3601 a datastore_pb.BeginTransactionRequest
3603 v3_req = datastore_pb.BeginTransactionRequest()
3604 v3_req.set_app(app_id)
3605 v3_req.set_allow_multiple_eg(v4_req.cross_group())
3606 return v3_req
3608 def v3_to_v4_begin_transaction_req(self, v3_req):
3609 """Converts a v3 BeginTransactionRequest to a v4 BeginTransactionRequest.
3611 Args:
3612 v3_req: a datastore_pb.BeginTransactionRequest
3614 Returns:
3615 a datastore_v4_pb.BeginTransactionRequest
3617 v4_req = datastore_v4_pb.BeginTransactionRequest()
3619 if v3_req.has_allow_multiple_eg():
3620 v4_req.set_cross_group(v3_req.allow_multiple_eg())
3622 return v4_req
3624 def v4_begin_transaction_resp_to_v3_txn(self, v4_resp):
3625 """Converts a v4 BeginTransactionResponse to a v3 Transaction.
3627 Args:
3628 v4_resp: datastore_v4_pb.BeginTransactionResponse
3630 Returns:
3631 a datastore_pb.Transaction
3633 v3_txn = datastore_pb.Transaction()
3634 self.v4_to_v3_txn(v4_resp.transaction(), v3_txn)
3635 return v3_txn
3637 def v3_to_v4_begin_transaction_resp(self, v3_resp):
3638 """Converts a v3 Transaction to a v4 BeginTransactionResponse.
3640 Args:
3641 v3_resp: a datastore_pb.Transaction
3643 Returns:
3644 a datastore_v4_pb.BeginTransactionResponse
3646 v4_resp = datastore_v4_pb.BeginTransactionResponse()
3647 v4_resp.set_transaction(self._v3_to_v4_txn(v3_resp))
3648 return v4_resp
3653 def v4_rollback_req_to_v3_txn(self, v4_req):
3654 """Converts a v4 RollbackRequest to a v3 Transaction.
3656 Args:
3657 v4_req: a datastore_v4_pb.RollbackRequest
3659 Returns:
3660 a datastore_pb.Transaction
3662 v3_txn = datastore_pb.Transaction()
3663 self.v4_to_v3_txn(v4_req.transaction(), v3_txn)
3664 return v3_txn
3666 def v3_to_v4_rollback_req(self, v3_req):
3667 """Converts a v3 Transaction to a v4 RollbackRequest.
3669 Args:
3670 v3_req: datastore_pb.Transaction
3672 Returns:
3673 a datastore_v4_pb.RollbackRequest
3675 v4_req = datastore_v4_pb.RollbackRequest()
3676 v4_req.set_transaction(self._v3_to_v4_txn(v3_req))
3677 return v4_req
3682 def v4_commit_req_to_v3_txn(self, v4_req):
3683 """Converts a v4 CommitRequest to a v3 Transaction.
3685 Args:
3686 v4_req: a datastore_v4_pb.CommitRequest
3688 Returns:
3689 a datastore_pb.Transaction
3691 v3_txn = datastore_pb.Transaction()
3692 self.v4_to_v3_txn(v4_req.transaction(), v3_txn)
3693 return v3_txn
3698 def v4_run_query_req_to_v3_query(self, v4_req):
3699 """Converts a v4 RunQueryRequest to a v3 Query.
3701 GQL is not supported.
3703 Args:
3704 v4_req: a datastore_v4_pb.RunQueryRequest
3706 Returns:
3707 a datastore_pb.Query
3710 datastore_pbs.check_conversion(not v4_req.has_gql_query(),
3711 'GQL not supported')
3712 v3_query = datastore_pb.Query()
3713 self._query_converter.v4_to_v3_query(v4_req.partition_id(), v4_req.query(),
3714 v3_query)
3717 if v4_req.has_suggested_batch_size():
3718 v3_query.set_count(v4_req.suggested_batch_size())
3721 read_options = v4_req.read_options()
3722 if read_options.has_transaction():
3723 self.v4_to_v3_txn(read_options.transaction(),
3724 v3_query.mutable_transaction())
3725 elif (read_options.read_consistency()
3726 == datastore_v4_pb.ReadOptions.EVENTUAL):
3727 v3_query.set_strong(False)
3728 v3_query.set_failover_ms(-1)
3729 elif read_options.read_consistency() == datastore_v4_pb.ReadOptions.STRONG:
3730 v3_query.set_strong(True)
3732 if v4_req.has_min_safe_time_seconds():
3733 v3_query.set_min_safe_time_seconds(v4_req.min_safe_time_seconds())
3735 return v3_query
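# Illustrative sketch (assumed usage; `converter` is a
# StubServiceConverter): EVENTUAL read consistency maps onto the v3
# strong/failover_ms pair exactly as handled above.
def _example_eventual_read_options(converter):
  v4_req = datastore_v4_pb.RunQueryRequest()
  v4_req.mutable_read_options().set_read_consistency(
      datastore_v4_pb.ReadOptions.EVENTUAL)
  v3_query = converter.v4_run_query_req_to_v3_query(v4_req)
  return v3_query.strong(), v3_query.failover_ms()  # (False, -1)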
3737 def v3_to_v4_run_query_req(self, v3_req):
3738 """Converts a v3 Query to a v4 RunQueryRequest.
3740 Args:
3741 v3_req: a datastore_pb.Query
3743 Returns:
3744 a datastore_v4_pb.RunQueryRequest
3746 v4_req = datastore_v4_pb.RunQueryRequest()
3749 v4_partition_id = v4_req.mutable_partition_id()
3750 v4_partition_id.set_dataset_id(v3_req.app())
3751 if v3_req.name_space():
3752 v4_partition_id.set_namespace(v3_req.name_space())
3755 if v3_req.has_count():
3756 v4_req.set_suggested_batch_size(v3_req.count())
3759 if v3_req.has_transaction():
3760 v4_req.mutable_read_options().set_transaction(
3761 self._v3_to_v4_txn(v3_req.transaction()))
3762 elif v3_req.strong():
3763 v4_req.mutable_read_options().set_read_consistency(
3764 datastore_v4_pb.ReadOptions.STRONG)
3765 elif v3_req.has_failover_ms():
3766 v4_req.mutable_read_options().set_read_consistency(
3767 datastore_v4_pb.ReadOptions.EVENTUAL)
3768 if v3_req.has_min_safe_time_seconds():
3769 v4_req.set_min_safe_time_seconds(v3_req.min_safe_time_seconds())
3771 self._query_converter.v3_to_v4_query(v3_req, v4_req.mutable_query())
3773 return v4_req
3775 def v4_run_query_resp_to_v3_query_result(self, v4_resp):
3776 """Converts a V4 RunQueryResponse to a v3 QueryResult.
3778 Args:
3779 v4_resp: a datastore_v4_pb.QueryResult
3781 Returns:
3782 a datastore_pb.QueryResult
3784 v3_resp = self.v4_to_v3_query_result(v4_resp.batch())
3787 if v4_resp.has_query_handle():
3788 self.v4_to_v3_cursor(v4_resp.query_handle(), v3_resp.mutable_cursor())
3790 return v3_resp
3792 def v3_to_v4_run_query_resp(self, v3_resp):
3793 """Converts a v3 QueryResult to a V4 RunQueryResponse.
3795 Args:
3796 v3_resp: a datastore_pb.QueryResult
3798 Returns:
3799 a datastore_v4_pb.RunQueryResponse
3801 v4_resp = datastore_v4_pb.RunQueryResponse()
3802 self.v3_to_v4_query_result_batch(v3_resp, v4_resp.mutable_batch())
3804 if v3_resp.has_cursor():
3805 v4_resp.set_query_handle(
3806 self._query_converter.v3_to_v4_compiled_cursor(v3_resp.cursor()))
3808 return v4_resp
3813 def v4_to_v3_next_req(self, v4_req):
3814 """Converts a v4 ContinueQueryRequest to a v3 NextRequest.
3816 Args:
3817 v4_req: a datastore_v4_pb.ContinueQueryRequest
3819 Returns:
3820 a datastore_pb.NextRequest
3822 v3_req = datastore_pb.NextRequest()
3823 v3_req.set_compile(True)
3824 self.v4_to_v3_cursor(v4_req.query_handle(), v3_req.mutable_cursor())
3825 return v3_req
3827 def v3_to_v4_continue_query_resp(self, v3_resp):
3828 """Converts a v3 QueryResult to a v4 ContinueQueryResponse.
3830 Args:
3831 v3_resp: a datastore_pb.QueryResult
3833 Returns:
3834 a datastore_v4_pb.ContinueQueryResponse
3836 v4_resp = datastore_v4_pb.ContinueQueryResponse()
3837 self.v3_to_v4_query_result_batch(v3_resp, v4_resp.mutable_batch())
3838 return v4_resp
3843 def v4_to_v3_get_req(self, v4_req):
3844 """Converts a v4 LookupRequest to a v3 GetRequest.
3846 Args:
3847 v4_req: a datastore_v4_pb.LookupRequest
3849 Returns:
3850 a datastore_pb.GetRequest
3852 v3_req = datastore_pb.GetRequest()
3853 v3_req.set_allow_deferred(True)
3856 if v4_req.read_options().has_transaction():
3857 self.v4_to_v3_txn(v4_req.read_options().transaction(),
3858 v3_req.mutable_transaction())
3859 elif (v4_req.read_options().read_consistency()
3860 == datastore_v4_pb.ReadOptions.EVENTUAL):
3861 v3_req.set_strong(False)
3862 v3_req.set_failover_ms(-1)
3863 elif (v4_req.read_options().read_consistency()
3864 == datastore_v4_pb.ReadOptions.STRONG):
3865 v3_req.set_strong(True)
3867 for v4_key in v4_req.key_list():
3868 self._entity_converter.v4_to_v3_reference(v4_key, v3_req.add_key())
3870 return v3_req

  def v3_to_v4_lookup_req(self, v3_req):
    """Converts a v3 GetRequest to a v4 LookupRequest.

    Args:
      v3_req: a datastore_pb.GetRequest

    Returns:
      a datastore_v4_pb.LookupRequest
    """
    v4_req = datastore_v4_pb.LookupRequest()
    datastore_pbs.check_conversion(v3_req.allow_deferred(),
                                   'allow_deferred must be true')

    if v3_req.has_transaction():
      v4_req.mutable_read_options().set_transaction(
          self._v3_to_v4_txn(v3_req.transaction()))
    elif v3_req.strong():
      v4_req.mutable_read_options().set_read_consistency(
          datastore_v4_pb.ReadOptions.STRONG)
    elif v3_req.has_failover_ms():
      v4_req.mutable_read_options().set_read_consistency(
          datastore_v4_pb.ReadOptions.EVENTUAL)

    for v3_ref in v3_req.key_list():
      self._entity_converter.v3_to_v4_key(v3_ref, v4_req.add_key())

    return v4_req

  def v4_to_v3_get_resp(self, v4_resp):
    """Converts a v4 LookupResponse to a v3 GetResponse.

    Args:
      v4_resp: a datastore_v4_pb.LookupResponse

    Returns:
      a datastore_pb.GetResponse
    """
    v3_resp = datastore_pb.GetResponse()

    for v4_key in v4_resp.deferred_list():
      self._entity_converter.v4_to_v3_reference(v4_key, v3_resp.add_deferred())
    for v4_found in v4_resp.found_list():
      self._entity_converter.v4_to_v3_entity(
          v4_found.entity(), v3_resp.add_entity().mutable_entity())
    for v4_missing in v4_resp.missing_list():
      self._entity_converter.v4_to_v3_reference(
          v4_missing.entity().key(),
          v3_resp.add_entity().mutable_key())

    return v3_resp

  def v3_to_v4_lookup_resp(self, v3_resp):
    """Converts a v3 GetResponse to a v4 LookupResponse.

    Args:
      v3_resp: a datastore_pb.GetResponse

    Returns:
      a datastore_v4_pb.LookupResponse
    """
    v4_resp = datastore_v4_pb.LookupResponse()

    for v3_ref in v3_resp.deferred_list():
      self._entity_converter.v3_to_v4_key(v3_ref, v4_resp.add_deferred())
    for v3_entity in v3_resp.entity_list():
      if v3_entity.has_entity():
        self._entity_converter.v3_to_v4_entity(
            v3_entity.entity(),
            v4_resp.add_found().mutable_entity())
      if v3_entity.has_key():
        self._entity_converter.v3_to_v4_key(
            v3_entity.key(),
            v4_resp.add_missing().mutable_entity().mutable_key())

    return v4_resp
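
  # Illustrative sketch (not part of the original module): deferred, found,
  # and missing entries each have a counterpart in the other protocol, so for
  # a fully populated GetResponse a round trip through v4 should be lossless,
  # assuming the entity converter round-trips keys and entities. Here
  # 'converter' is a hypothetical StubServiceConverter instance:
  #
  #   v4_resp = converter.v3_to_v4_lookup_resp(v3_resp)
  #   v3_again = converter.v4_to_v3_get_resp(v4_resp)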

  def v4_to_v3_query_result(self, v4_batch):
    """Converts a v4 QueryResultBatch to a v3 QueryResult.

    Args:
      v4_batch: a datastore_v4_pb.QueryResultBatch

    Returns:
      a datastore_pb.QueryResult
    """
    v3_result = datastore_pb.QueryResult()

    v3_result.set_more_results(
        (v4_batch.more_results()
         == datastore_v4_pb.QueryResultBatch.NOT_FINISHED))
    if v4_batch.has_end_cursor():
      self._query_converter.v4_to_v3_compiled_cursor(
          v4_batch.end_cursor(), v3_result.mutable_compiled_cursor())

    if v4_batch.entity_result_type() == datastore_v4_pb.EntityResult.PROJECTION:
      v3_result.set_index_only(True)
    elif v4_batch.entity_result_type() == datastore_v4_pb.EntityResult.KEY_ONLY:
      v3_result.set_keys_only(True)

    if v4_batch.has_skipped_results():
      v3_result.set_skipped_results(v4_batch.skipped_results())
    for v4_entity in v4_batch.entity_result_list():
      v3_entity = v3_result.add_result()
      self._entity_converter.v4_to_v3_entity(v4_entity.entity(), v3_entity)
      if v4_batch.entity_result_type() != datastore_v4_pb.EntityResult.FULL:
        # Only FULL results carry a meaningful entity group; clear it for
        # projection and keys-only results.
        v3_entity.clear_entity_group()

    return v3_result

  def v3_to_v4_query_result_batch(self, v3_result, v4_batch):
    """Converts a v3 QueryResult to a v4 QueryResultBatch.

    Args:
      v3_result: a datastore_pb.QueryResult
      v4_batch: a datastore_v4_pb.QueryResultBatch to populate
    """
    v4_batch.Clear()

    if v3_result.more_results():
      v4_batch.set_more_results(datastore_v4_pb.QueryResultBatch.NOT_FINISHED)
    else:
      v4_batch.set_more_results(
          datastore_v4_pb.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT)
    if v3_result.has_compiled_cursor():
      v4_batch.set_end_cursor(
          self._query_converter.v3_to_v4_compiled_cursor(
              v3_result.compiled_cursor()))

    if v3_result.keys_only():
      v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.KEY_ONLY)
    elif v3_result.index_only():
      v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.PROJECTION)
    else:
      v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.FULL)

    if v3_result.has_skipped_results():
      v4_batch.set_skipped_results(v3_result.skipped_results())
    for v3_entity in v3_result.result_list():
      v4_entity_result = datastore_v4_pb.EntityResult()
      self._entity_converter.v3_to_v4_entity(v3_entity,
                                             v4_entity_result.mutable_entity())
      v4_batch.entity_result_list().append(v4_entity_result)


__service_converter = StubServiceConverter(
    datastore_pbs.get_entity_converter(), __query_converter)


def get_service_converter():
  """Returns a converter for v3 and v4 service request/response protos.

  This converter is suitable for use in stubs but not for production.

  Returns:
    a StubServiceConverter
  """
  return __service_converter
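

# Illustrative usage sketch (not part of the original module); 'v4_lookup_req'
# is a hypothetical datastore_v4_pb.LookupRequest built by a caller:
#
#   converter = get_service_converter()
#   v3_req = converter.v4_to_v3_get_req(v4_lookup_req)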


def ReverseBitsInt64(v):
  """Reverse the bits of a 64-bit integer.

  Args:
    v: Input integer of type 'int' or 'long'.

  Returns:
    Bit-reversed input as 'int' on 64-bit machines or as 'long' otherwise.
  """
  v = ((v >> 1) & 0x5555555555555555) | ((v & 0x5555555555555555) << 1)
  v = ((v >> 2) & 0x3333333333333333) | ((v & 0x3333333333333333) << 2)
  v = ((v >> 4) & 0x0F0F0F0F0F0F0F0F) | ((v & 0x0F0F0F0F0F0F0F0F) << 4)
  v = ((v >> 8) & 0x00FF00FF00FF00FF) | ((v & 0x00FF00FF00FF00FF) << 8)
  v = ((v >> 16) & 0x0000FFFF0000FFFF) | ((v & 0x0000FFFF0000FFFF) << 16)
  v = int((v >> 32) | (v << 32) & 0xFFFFFFFFFFFFFFFF)
  return v
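

# Illustrative sketch (not part of the original module): bit 0 maps to bit 63,
# and since bit reversal is an involution, applying it twice is the identity
# for any value below 2**64:
#
#   assert ReverseBitsInt64(1) == 1 << 63
#   assert ReverseBitsInt64(ReverseBitsInt64(12345)) == 12345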


def ToScatteredId(v):
  """Map counter value v to the scattered ID space.

  Translate to scattered ID space, then reverse bits.

  Args:
    v: Counter value from which to produce ID.

  Returns:
    Integer ID.

  Raises:
    datastore_errors.BadArgumentError if counter value exceeds the range of
    the scattered ID space.
  """
  if v >= _MAX_SCATTERED_COUNTER:
    raise datastore_errors.BadArgumentError('counter value too large (%d)' % v)
  return _MAX_SEQUENTIAL_ID + 1 + long(ReverseBitsInt64(v << _SCATTER_SHIFT))


def IdToCounter(k):
  """Map ID k to the counter value from which it was generated.

  Determines whether k is a sequential or scattered ID.

  Args:
    k: ID from which to infer counter value.

  Returns:
    Tuple of integers (counter_value, id_space).

  Raises:
    datastore_errors.BadArgumentError if the ID is not positive.
  """
  if k > _MAX_SCATTERED_ID:
    return 0, SCATTERED
  elif k > _MAX_SEQUENTIAL_ID and k <= _MAX_SCATTERED_ID:
    return long(ReverseBitsInt64(k) >> _SCATTER_SHIFT), SCATTERED
  elif k > 0:
    return long(k), SEQUENTIAL
  else:
    raise datastore_errors.BadArgumentError('invalid id (%d)' % k)
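

# Illustrative sketch (not part of the original module): ToScatteredId and
# IdToCounter invert each other on the scattered counter range, while small
# positive IDs map back to the sequential space:
#
#   assert IdToCounter(ToScatteredId(1)) == (1, SCATTERED)
#   assert IdToCounter(10) == (10, SEQUENTIAL)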


def CompareEntityPbByKey(a, b):
  """Compare two entity protobufs by key.

  Args:
    a: entity_pb.EntityProto to compare
    b: entity_pb.EntityProto to compare

  Returns:
    <0 if a's key is before b's, =0 if they are the same key, and >0 otherwise.
  """
  return cmp(datastore_types.Key._FromPb(a.key()),
             datastore_types.Key._FromPb(b.key()))


def _GuessOrders(filters, orders):
  """Guess any implicit ordering.

  The datastore gives a logical, but not necessarily predictable, ordering when
  orders are not completely explicit. This function guesses at that ordering
  (which is better than always ordering by __key__ for tests).

  Args:
    filters: A list of datastore_pb.Query_Filter that have already been
      normalized and checked.
    orders: A list of datastore_pb.Query_Order that have already been
      normalized and checked.

  Returns:
    A copy of orders with any guessed implicit orders appended; the last
    order is always on __key__.
  """
  orders = orders[:]

  if not orders:
    for filter_pb in filters:
      if filter_pb.op() in datastore_index.INEQUALITY_OPERATORS:

        order = datastore_pb.Query_Order()
        order.set_property(filter_pb.property(0).name())
        orders.append(order)
        break

    exists_props = (filter_pb.property(0).name() for filter_pb in filters
                    if filter_pb.op() == datastore_pb.Query_Filter.EXISTS)
    for prop in sorted(exists_props):
      order = datastore_pb.Query_Order()
      order.set_property(prop)
      orders.append(order)

  if not orders or orders[-1].property() != '__key__':
    order = datastore_pb.Query_Order()
    order.set_property('__key__')
    orders.append(order)
  return orders
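

# Illustrative sketch (not part of the original module): with a single
# inequality filter on 'age' and no explicit orders, _GuessOrders yields an
# order on 'age' followed by the implicit '__key__' order:
#
#   age_filter = datastore_pb.Query_Filter()
#   age_filter.set_op(datastore_pb.Query_Filter.GREATER_THAN)
#   age_filter.add_property().set_name('age')
#   orders = _GuessOrders([age_filter], [])
#   assert [o.property() for o in orders] == ['age', '__key__']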


def _MakeQuery(query, filters, orders):
  """Make a datastore_query.Query for the given datastore_pb.Query.

  Overrides filters and orders in query with the specified arguments.
  """
  clone = datastore_pb.Query()
  clone.CopyFrom(query)
  clone.clear_filter()
  clone.clear_order()
  clone.filter_list().extend(filters)
  clone.order_list().extend(orders)
  return datastore_query.Query._from_pb(clone)


def _CreateIndexEntities(entity, postfix_props):
  """Creates entities for index values that would appear in production.

  This function finds all multi-valued properties listed in postfix_props, and
  creates a new entity for each unique combination of values. The resulting
  entities will only have a single value for each property listed in
  postfix_props.

  It reserves the right to include index data that would not be
  seen in production, e.g. by returning the original entity when no splitting
  is needed. LoadEntity will remove any excess fields.

  This simulates the results seen by an index scan in the datastore.

  Args:
    entity: The entity_pb.EntityProto to split.
    postfix_props: A set of property names to split on.

  Returns:
    A list of the split entity_pb.EntityProtos.
  """
  to_split = {}
  split_required = False
  base_props = []
  for prop in entity.property_list():
    if prop.name() in postfix_props:
      values = to_split.get(prop.name())
      if values is None:
        values = []
        to_split[prop.name()] = values
      else:
        split_required = True
      if prop.value() not in values:
        values.append(prop.value())
    else:
      base_props.append(prop)

  if not split_required:

    return [entity]

  clone = entity_pb.EntityProto()
  clone.CopyFrom(entity)
  clone.clear_property()
  clone.property_list().extend(base_props)
  results = [clone]

  for name, splits in to_split.iteritems():
    if len(splits) == 1:

      for result in results:
        prop = result.add_property()
        prop.set_name(name)
        prop.set_multiple(False)
        prop.set_meaning(entity_pb.Property.INDEX_VALUE)
        prop.mutable_value().CopyFrom(splits[0])
      continue

    new_results = []
    for result in results:
      for split in splits:
        clone = entity_pb.EntityProto()
        clone.CopyFrom(result)
        prop = clone.add_property()
        prop.set_name(name)
        prop.set_multiple(False)
        prop.set_meaning(entity_pb.Property.INDEX_VALUE)
        prop.mutable_value().CopyFrom(split)
        new_results.append(clone)
    results = new_results
  return results
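

# Illustrative sketch (not part of the original module): an entity with two
# values for a multi-valued property 'tag' splits into two index entities,
# each carrying a single INDEX_VALUE property for 'tag'. This assumes the
# proto1-style set_int64value accessor on PropertyValue:
#
#   entity = entity_pb.EntityProto()
#   for tag in (1, 2):
#     prop = entity.add_property()
#     prop.set_name('tag')
#     prop.set_multiple(True)
#     prop.mutable_value().set_int64value(tag)
#   assert len(_CreateIndexEntities(entity, set(['tag']))) == 2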


def _CreateIndexOnlyQueryResults(results, postfix_props):
  """Creates a result set similar to that returned by an index-only query."""
  new_results = []
  for result in results:
    new_results.extend(_CreateIndexEntities(result, postfix_props))
  return new_results


def _ExecuteQuery(results, query, filters, orders, index_list):
  """Executes the query on a superset of its results.

  Args:
    results: superset of results for query.
    query: a datastore_pb.Query.
    filters: the filters from query.
    orders: the orders from query.
    index_list: the list of indexes used by the query.

  Returns:
    A ListCursor over the results of applying query to results.
  """
  orders = _GuessOrders(filters, orders)
  dsquery = _MakeQuery(query, filters, orders)

  if query.property_name_size():
    results = _CreateIndexOnlyQueryResults(
        results, set(order.property() for order in orders))

  return ListCursor(query, dsquery, orders, index_list,
                    datastore_query.apply_query(dsquery, results))


def _UpdateCost(cost, entity_writes, index_writes):
  """Updates the provided cost.

  Args:
    cost: Out param. The cost object to update.
    entity_writes: The number of entity writes to add.
    index_writes: The number of index writes to add.
  """
  cost.set_entity_writes(cost.entity_writes() + entity_writes)
  cost.set_index_writes(cost.index_writes() + index_writes)


def _CalculateWriteOps(composite_indexes, old_entity, new_entity):
  """Determines number of entity and index writes needed to write new_entity.

  We assume that old_entity represents the current state of the Datastore.

  Args:
    composite_indexes: The composite_indexes for the kind of the entities.
    old_entity: Entity representing the current state in the Datastore.
    new_entity: Entity representing the desired state in the Datastore.

  Returns:
    A tuple of size 2, where the first value is the number of entity writes and
    the second value is the number of index writes.
  """
  if (old_entity is not None and
      old_entity.property_list() == new_entity.property_list()
      and old_entity.raw_property_list() == new_entity.raw_property_list()):
    return 0, 0

  index_writes = _ChangedIndexRows(composite_indexes, old_entity, new_entity)
  if old_entity is None:
    # A brand-new entity also needs its key written to the kind index.
    index_writes += 1

  return 1, index_writes


def _ChangedIndexRows(composite_indexes, old_entity, new_entity):
  """Determine the number of index rows that need to change.

  We assume that old_entity represents the current state of the Datastore.

  Args:
    composite_indexes: The composite_indexes for the kind of the entities.
    old_entity: Entity representing the current state in the Datastore.
    new_entity: Entity representing the desired state in the Datastore.

  Returns:
    The number of index rows that need to change.
  """

  unique_old_properties = collections.defaultdict(set)

  unique_new_properties = collections.defaultdict(set)

  if old_entity is not None:
    for old_prop in old_entity.property_list():
      _PopulateUniquePropertiesSet(old_prop, unique_old_properties)

  unchanged = collections.defaultdict(int)

  for new_prop in new_entity.property_list():
    new_prop_as_str = _PopulateUniquePropertiesSet(
        new_prop, unique_new_properties)
    if new_prop_as_str in unique_old_properties[new_prop.name()]:
      unchanged[new_prop.name()] += 1

  all_property_names = set(unique_old_properties.iterkeys())
  all_property_names.update(unique_new_properties.iterkeys())
  all_property_names.update(unchanged.iterkeys())

  all_indexes = _GetEntityByPropertyIndexes(all_property_names)
  all_indexes.extend([comp.definition() for comp in composite_indexes])
  path_size = new_entity.key().path().element_size()
  writes = 0
  for index in all_indexes:

    ancestor_multiplier = 1
    if index.ancestor() and index.property_size() > 1:
      ancestor_multiplier = path_size
    writes += (_CalculateWritesForCompositeIndex(
        index, unique_old_properties, unique_new_properties, unchanged) *
               ancestor_multiplier)
  return writes
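

# Illustrative sketch (not part of the original module): inserting a brand-new
# entity with a single indexed property 'p' touches the ascending and the
# descending EntitiesByProperty row for 'p', so _ChangedIndexRows reports 2,
# and _CalculateWriteOps then reports 1 entity write and 3 index writes
# (the extra write being the kind-index row for the new key).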


def _PopulateUniquePropertiesSet(prop, unique_properties):
  """Populates a set containing unique properties.

  Args:
    prop: An entity property.
    unique_properties: Dictionary mapping property names to a set of unique
      properties.

  Returns:
    The property pb in string (hashable) form.
  """
  if prop.multiple():
    prop = _CopyAndSetMultipleToFalse(prop)

  prop_as_str = prop.SerializePartialToString()
  unique_properties[prop.name()].add(prop_as_str)
  return prop_as_str


def _CalculateWritesForCompositeIndex(index, unique_old_properties,
                                      unique_new_properties,
                                      common_properties):
  """Calculate the number of writes required to maintain a specific Index.

  Args:
    index: The composite index.
    unique_old_properties: Dictionary mapping property names to a set of props
      present on the old entity.
    unique_new_properties: Dictionary mapping property names to a set of props
      present on the new entity.
    common_properties: Dictionary mapping property names to the number of
      properties with that name that are present on both the old and new
      entities.

  Returns:
    The number of writes required to maintain the provided Index.
  """
  old_count = 1
  new_count = 1
  common_count = 1
  for prop in index.property_list():
    old_count *= len(unique_old_properties[prop.name()])
    new_count *= len(unique_new_properties[prop.name()])
    common_count *= common_properties[prop.name()]

  return (old_count - common_count) + (new_count - common_count)
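

# Illustrative worked example (not part of the original module): for an index
# on (a, b) where the old entity has a in {1, 2} and b in {3}, the new entity
# has a in {1} and b in {3, 4}, and each property shares one value between the
# two entities:
#
#   old_count = 2 * 1 = 2, new_count = 1 * 2 = 2, common_count = 1 * 1 = 1
#
# so the index needs (2 - 1) + (2 - 1) = 2 row writes.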


def _GetEntityByPropertyIndexes(all_property_names):
  """Returns ascending and descending single-property indexes for each name."""
  indexes = []
  for prop_name in all_property_names:
    indexes.append(
        _SinglePropertyIndex(prop_name, entity_pb.Index_Property.ASCENDING))
    indexes.append(
        _SinglePropertyIndex(prop_name, entity_pb.Index_Property.DESCENDING))
  return indexes


def _SinglePropertyIndex(prop_name, direction):
  """Creates a single property Index for the given name and direction.

  Args:
    prop_name: The name of the single property on the Index.
    direction: The direction of the Index.

  Returns:
    A single property Index with the given property and direction.
  """
  index = entity_pb.Index()
  prop = index.add_property()
  prop.set_name(prop_name)
  prop.set_direction(direction)
  return index


def _CopyAndSetMultipleToFalse(prop):
  """Copy the provided Property and set its "multiple" attribute to False.

  Args:
    prop: The Property to copy.

  Returns:
    A copy of the given Property with its "multiple" attribute set to False.
  """

  # Clearing 'multiple' lets individual values of a multi-valued property
  # serialize identically, so they can be compared one value at a time.
  prop_copy = entity_pb.Property()
  prop_copy.MergeFrom(prop)
  prop_copy.set_multiple(False)
  return prop_copy