#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Utility functions shared between the file and sqlite datastore stubs.

This module is internal and should not be used by client applications.
"""


from __future__ import with_statement


try:
  import hashlib
  _MD5_FUNC = hashlib.md5
except ImportError:
  import md5
  _MD5_FUNC = md5.new

import atexit
import collections
import itertools
import logging
import os
import random
import struct
import threading
import time
import weakref

from google.net.proto import ProtocolBuffer
from google.appengine.datastore import entity_pb

from google.appengine.api import api_base_pb
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import datastore_admin
from google.appengine.api import datastore_errors
from google.appengine.api import datastore_types
from google.appengine.api import yaml_errors
from google.appengine.api.taskqueue import taskqueue_service_pb
from google.appengine.datastore import datastore_index
from google.appengine.datastore import datastore_pb
from google.appengine.datastore import datastore_pbs
from google.appengine.datastore import datastore_query
from google.appengine.datastore import datastore_stub_index
from google.appengine.datastore import datastore_v4_pb
from google.appengine.runtime import apiproxy_errors


# Maximum number of results to return in a single query batch.
_MAXIMUM_RESULTS = 300


# Maximum number of bytes a single query result batch may contain.
_MAXIMUM_QUERY_RESULT_BYTES = 2000000


# Maximum query offset the stub will honor in a single batch.
_MAX_QUERY_OFFSET = 1000


_PROPERTY_TYPE_NAMES = {
    0: 'NULL',
    entity_pb.PropertyValue.kint64Value: 'INT64',
    entity_pb.PropertyValue.kbooleanValue: 'BOOLEAN',
    entity_pb.PropertyValue.kstringValue: 'STRING',
    entity_pb.PropertyValue.kdoubleValue: 'DOUBLE',
    entity_pb.PropertyValue.kPointValueGroup: 'POINT',
    entity_pb.PropertyValue.kUserValueGroup: 'USER',
    entity_pb.PropertyValue.kReferenceValueGroup: 'REFERENCE',
}


# Entities whose 16-bit key hash falls below this value (out of 2**16) are
# given a __scatter__ property, i.e. roughly half of all entities.
_SCATTER_PROPORTION = 32768


# Maximum number of entity groups a single transaction may operate on.
_MAX_EG_PER_TXN = 5


# Property meanings that carry raw byte payloads and therefore may not be
# valid UTF-8.
_BLOB_MEANINGS = frozenset((entity_pb.Property.BLOB,
                            entity_pb.Property.ENTITY_PROTO,
                            entity_pb.Property.TEXT))


# Retry behavior for failed operations: number of attempts and the delays
# between them.
_RETRIES = 3

_INITIAL_RETRY_DELAY_MS = 100

_RETRY_DELAY_MULTIPLIER = 2

_MAX_RETRY_DELAY_MS = 120000


# Allocation strategies for automatically assigned IDs.
SEQUENTIAL = 'sequential'
SCATTERED = 'scattered'


# Sequential IDs are drawn from a 52-bit counter space; scattered IDs occupy
# the range immediately above it.
_MAX_SEQUENTIAL_BIT = 52

_MAX_SEQUENTIAL_COUNTER = (1 << _MAX_SEQUENTIAL_BIT) - 1

_MAX_SEQUENTIAL_ID = _MAX_SEQUENTIAL_COUNTER

_MAX_SCATTERED_COUNTER = (1 << (_MAX_SEQUENTIAL_BIT - 1)) - 1

_MAX_SCATTERED_ID = _MAX_SEQUENTIAL_ID + 1 + _MAX_SCATTERED_COUNTER

_SCATTER_SHIFT = 64 - _MAX_SEQUENTIAL_BIT + 1


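# Illustrative sketch (not part of the original module): the boundaries of
# the two ID spaces implied by the constants above.
#
#   >>> _MAX_SEQUENTIAL_ID       # 2**52 - 1, top of the sequential space
#   4503599627370495
#   >>> _MAX_SCATTERED_ID        # sequential space plus 2**51 scattered IDs
#   6755399441055743

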
def _GetScatterProperty(entity_proto):
  """Gets the scatter property for an object.

  For ease of implementation, this is not synchronized with the actual
  value on the App Engine server, but should work equally well.

  Note: This property may change, either here or in production. No client
  other than the mapper framework should rely on it directly.

  Returns:
    The PropertyValue of the scatter property or None if this entity should not
    have a scatter property.
  """
  hash_obj = _MD5_FUNC()
  for element in entity_proto.key().path().element_list():
    if element.has_name():
      hash_obj.update(element.name())
    elif element.has_id():
      hash_obj.update(str(element.id()))
  hash_bytes = hash_obj.digest()[0:2]
  (hash_int,) = struct.unpack('H', hash_bytes)

  if hash_int >= _SCATTER_PROPORTION:
    return None

  scatter_property = entity_pb.Property()
  scatter_property.set_name(datastore_types.SCATTER_SPECIAL_PROPERTY)
  scatter_property.set_meaning(entity_pb.Property.BYTESTRING)
  scatter_property.set_multiple(False)
  property_value = scatter_property.mutable_value()
  property_value.set_stringvalue(hash_bytes)
  return scatter_property


# Maps special property names to a tuple of
# (is_visible, is_stored, property_function).
_SPECIAL_PROPERTY_MAP = {
    datastore_types.SCATTER_SPECIAL_PROPERTY: (False, True, _GetScatterProperty)
}


def GetInvisibleSpecialPropertyNames():
  """Gets the names of all non user-visible special properties."""
  invisible_names = []
  for name, value in _SPECIAL_PROPERTY_MAP.items():
    is_visible, _, _ = value
    if not is_visible:
      invisible_names.append(name)
  return invisible_names


def _PrepareSpecialProperties(entity_proto, is_load):
  """Computes special properties for loading or storing.

  Strips other special properties.
  """
  for i in xrange(entity_proto.property_size() - 1, -1, -1):
    if entity_proto.property(i).name() in _SPECIAL_PROPERTY_MAP:
      del entity_proto.property_list()[i]

  for is_visible, is_stored, property_func in _SPECIAL_PROPERTY_MAP.values():
    if is_load:
      should_process = is_visible
    else:
      should_process = is_stored

    if should_process:
      special_property = property_func(entity_proto)
      if special_property:
        entity_proto.property_list().append(special_property)


def _GetGroupByKey(entity, property_names):
  """Computes a key value that uniquely identifies the 'group' of an entity.

  Args:
    entity: The entity_pb.EntityProto for which to create the group key.
    property_names: The names of the properties in the group by clause.

  Returns:
    A hashable value that uniquely identifies the entity's 'group'.
  """
  return frozenset((prop.name(), prop.value().SerializeToString())
                   for prop in entity.property_list()
                   if prop.name() in property_names)


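# Illustrative sketch (not part of the original module): the group key is
# order-insensitive because it is a frozenset of (name, serialized value)
# pairs, so two entities agreeing on every group-by property compare equal.
#
#   >>> frozenset([('a', '\x01'), ('b', '\x02')]) == \
#   ...     frozenset([('b', '\x02'), ('a', '\x01')])
#   True

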
def PrepareSpecialPropertiesForStore(entity_proto):
  """Computes special properties for storing.

  Strips other special properties.
  """
  _PrepareSpecialProperties(entity_proto, False)


def LoadEntity(entity, keys_only=False, property_names=None):
  """Prepares an entity to be returned to the user.

  Args:
    entity: an entity_pb.EntityProto or None
    keys_only: if a keys only result should be produced
    property_names: if not None or empty, cause a projected entity
        to be produced with the given properties.

  Returns:
    A user friendly copy of entity or None.
  """
  if entity:
    clone = entity_pb.EntityProto()
    if property_names:
      clone.mutable_key().CopyFrom(entity.key())
      clone.mutable_entity_group()
      seen = set()
      for prop in entity.property_list():
        if prop.name() in property_names:
          Check(prop.name() not in seen,
                "datastore dev stub produced bad result",
                datastore_pb.Error.INTERNAL_ERROR)
          seen.add(prop.name())
          new_prop = clone.add_property()
          new_prop.set_name(prop.name())
          new_prop.set_meaning(entity_pb.Property.INDEX_VALUE)
          new_prop.mutable_value().CopyFrom(prop.value())
          new_prop.set_multiple(False)
    elif keys_only:
      clone.mutable_key().CopyFrom(entity.key())
      clone.mutable_entity_group()
    else:
      clone.CopyFrom(entity)
    PrepareSpecialPropertiesForLoad(clone)
    return clone


def StoreEntity(entity):
  """Prepares an entity for storing.

  Args:
    entity: an entity_pb.EntityProto to prepare

  Returns:
    A copy of entity that should be stored in its place.
  """
  clone = entity_pb.EntityProto()
  clone.CopyFrom(entity)

  PrepareSpecialPropertiesForStore(clone)
  return clone


def PrepareSpecialPropertiesForLoad(entity_proto):
  """Computes special properties that are user-visible.

  Strips other special properties.
  """
  _PrepareSpecialProperties(entity_proto, True)


def Check(test, msg='', error_code=datastore_pb.Error.BAD_REQUEST):
  """Raises an apiproxy_errors.ApplicationError if the condition is false.

  Args:
    test: A condition to test.
    msg: A string to return with the error.
    error_code: One of datastore_pb.Error to use as an error code.

  Raises:
    apiproxy_errors.ApplicationError: If test is false.
  """
  if not test:
    raise apiproxy_errors.ApplicationError(error_code, msg)


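# Illustrative usage sketch (not part of the original module): Check is the
# stub's assert-with-error-code helper.
#
#   Check(offset >= 0, 'Offset must be >= 0')
#   # Raises apiproxy_errors.ApplicationError(BAD_REQUEST, 'Offset must be
#   # >= 0') when the condition is false; otherwise it is a no-op.

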
def CheckValidUTF8(string, desc):
  """Check that the given string is valid UTF-8.

  Args:
    string: the string to validate.
    desc: a description of the string being validated.

  Raises:
    apiproxy_errors.ApplicationError: if the string is not valid UTF-8.
  """
  try:
    string.decode('utf-8')
  except UnicodeDecodeError:
    Check(False, '%s is not valid UTF-8.' % desc)


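# Illustrative sketch (not part of the original module): under Python 2,
# str.decode('utf-8') accepts well-formed multi-byte sequences and rejects
# stray continuation or truncated bytes.
#
#   CheckValidUTF8('caf\xc3\xa9', 'name')   # ok: '\xc3\xa9' is a valid sequence
#   CheckValidUTF8('\xff', 'name')          # raises ApplicationError

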
def CheckAppId(request_trusted, request_app_id, app_id):
  """Check that this is the stub for app_id.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    app_id: An application ID.

  Raises:
    apiproxy_errors.ApplicationError: if this is not the stub for app_id.
  """
  assert app_id
  CheckValidUTF8(app_id, 'app id')
  Check(request_trusted or app_id == request_app_id,
        'app "%s" cannot access app "%s"\'s data' % (request_app_id, app_id))


def CheckReference(request_trusted,
                   request_app_id,
                   key,
                   require_id_or_name=True):
  """Check this key.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    key: entity_pb.Reference
    require_id_or_name: Boolean indicating if we should enforce the presence of
        an id or name in the last element of the key's path.

  Raises:
    apiproxy_errors.ApplicationError: if the key is invalid
  """
  assert isinstance(key, entity_pb.Reference)

  CheckAppId(request_trusted, request_app_id, key.app())

  Check(key.path().element_size() > 0, 'key\'s path cannot be empty')

  if require_id_or_name:
    last_element = key.path().element_list()[-1]
    has_id_or_name = ((last_element.has_id() and last_element.id() != 0) or
                      (last_element.has_name() and last_element.name() != ""))
    if not has_id_or_name:
      raise datastore_errors.BadRequestError('missing key id/name')

  for elem in key.path().element_list():
    Check(not elem.has_id() or not elem.has_name(),
          'each key path element should have id or name but not both: %r' % key)
    CheckValidUTF8(elem.type(), 'key path element type')
    if elem.has_name():
      CheckValidUTF8(elem.name(), 'key path element name')


def CheckEntity(request_trusted, request_app_id, entity):
  """Check if this entity can be stored.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    entity: entity_pb.EntityProto

  Raises:
    apiproxy_errors.ApplicationError: if the entity is invalid
  """

  CheckReference(request_trusted, request_app_id, entity.key(), False)
  for prop in entity.property_list():
    CheckProperty(request_trusted, request_app_id, prop)
  for prop in entity.raw_property_list():
    CheckProperty(request_trusted, request_app_id, prop, indexed=False)


def CheckProperty(request_trusted, request_app_id, prop, indexed=True):
  """Check if this property can be stored.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    prop: entity_pb.Property
    indexed: Whether the property is indexed.

  Raises:
    apiproxy_errors.ApplicationError: if the property is invalid
  """
  name = prop.name()
  value = prop.value()
  meaning = prop.meaning()
  CheckValidUTF8(name, 'property name')
  Check(request_trusted or
        not datastore_types.RESERVED_PROPERTY_NAME.match(name),
        'cannot store entity with reserved property name \'%s\'' % name)
  Check(prop.meaning() != entity_pb.Property.INDEX_VALUE,
        'Entities with incomplete properties cannot be written.')
  is_blob = meaning in _BLOB_MEANINGS
  if indexed:
    Check(not is_blob,
          'BLOB, ENTITY_PROTO or TEXT property ' + name +
          ' must be in a raw_property field')
    max_length = datastore_types._MAX_STRING_LENGTH
  else:
    if is_blob:
      Check(value.has_stringvalue(),
            'BLOB / ENTITY_PROTO / TEXT raw property ' + name +
            ' must have a string value')
    max_length = datastore_types._MAX_RAW_PROPERTY_BYTES
  if meaning == entity_pb.Property.ATOM_LINK:
    max_length = datastore_types._MAX_LINK_PROPERTY_LENGTH

  CheckPropertyValue(name, value, max_length, meaning)


def CheckPropertyValue(name, value, max_length, meaning):
  """Check if this property value can be stored.

  Args:
    name: name of the property
    value: entity_pb.PropertyValue
    max_length: maximum length for string values
    meaning: meaning of the property

  Raises:
    apiproxy_errors.ApplicationError: if the property is invalid
  """
  num_values = (value.has_int64value() +
                value.has_stringvalue() +
                value.has_booleanvalue() +
                value.has_doublevalue() +
                value.has_pointvalue() +
                value.has_uservalue() +
                value.has_referencevalue())
  Check(num_values <= 1, 'PropertyValue for ' + name +
        ' has multiple value fields set')

  if value.has_stringvalue():

    # Length is measured in UTF-16 code units, as in production: decode the
    # raw bytes (replacing invalid sequences) and count the 2-byte units of
    # the UTF-16 encoding, ignoring the 2-byte BOM.
    s16 = value.stringvalue().decode('utf-8', 'replace').encode('utf-16')

    Check((len(s16) - 2) / 2 <= max_length,
          'Property %s is too long. Maximum length is %d.' % (name, max_length))
    if (meaning not in _BLOB_MEANINGS and
        meaning != entity_pb.Property.BYTESTRING):
      CheckValidUTF8(value.stringvalue(), 'String property value')


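# Illustrative sketch (not part of the original module): the UTF-16 length
# rule counts supplementary-plane characters (surrogate pairs) twice.
#
#   >>> s = u'\U0001F600'.encode('utf-8')          # one astral code point
#   >>> s16 = s.decode('utf-8', 'replace').encode('utf-16')
#   >>> (len(s16) - 2) / 2                         # two UTF-16 code units
#   2

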
def CheckTransaction(request_trusted, request_app_id, transaction):
  """Check that this transaction is valid.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    transaction: datastore_pb.Transaction

  Raises:
    apiproxy_errors.ApplicationError: if the transaction is not valid.
  """
  assert isinstance(transaction, datastore_pb.Transaction)
  CheckAppId(request_trusted, request_app_id, transaction.app())


def CheckQuery(query, filters, orders, max_query_components):
  """Check a datastore query with normalized filters, orders.

  Raises an ApplicationError when any of the following conditions are violated:
  - transactional queries have an ancestor
  - queries that are not too large
    (sum of filters, orders, ancestor <= max_query_components)
  - ancestor (if any) app and namespace match query app and namespace
  - kindless queries only filter on __key__ and only sort on __key__ ascending
  - multiple inequality (<, <=, >, >=) filters all applied to the same property
  - filters on __key__ compare to a reference in the same app and namespace as
    the query
  - if an inequality filter on prop X is used, the first order (if any) must
    be on X

  Args:
    query: query to validate
    filters: normalized (by datastore_index.Normalize) filters from query
    orders: normalized (by datastore_index.Normalize) orders from query
    max_query_components: limit on query complexity
  """
  Check(query.property_name_size() == 0 or not query.keys_only(),
        'projection and keys_only cannot both be set')

  projected_properties = set(query.property_name_list())
  for prop_name in query.property_name_list():
    Check(not datastore_types.RESERVED_PROPERTY_NAME.match(prop_name),
          'projections are not supported for the property: ' + prop_name)
  Check(len(projected_properties) == len(query.property_name_list()),
        'cannot project a property multiple times')

  key_prop_name = datastore_types.KEY_SPECIAL_PROPERTY
  unapplied_log_timestamp_us_name = (
      datastore_types._UNAPPLIED_LOG_TIMESTAMP_SPECIAL_PROPERTY)

  if query.has_transaction():
    Check(query.has_ancestor(),
          'Only ancestor queries are allowed inside transactions.')

  num_components = len(filters) + len(orders)
  if query.has_ancestor():
    num_components += 1
  Check(num_components <= max_query_components,
        'query is too large. may not have more than %s filters'
        ' + sort orders ancestor total' % max_query_components)

  if query.has_ancestor():
    ancestor = query.ancestor()
    Check(query.app() == ancestor.app(),
          'query app is %s but ancestor app is %s' %
          (query.app(), ancestor.app()))
    Check(query.name_space() == ancestor.name_space(),
          'query namespace is %s but ancestor namespace is %s' %
          (query.name_space(), ancestor.name_space()))

  if query.group_by_property_name_size():
    group_by_set = set(query.group_by_property_name_list())
    for order in orders:
      if not group_by_set:
        break
      Check(order.property() in group_by_set,
            'items in the group by clause must be specified first '
            'in the ordering')
      group_by_set.remove(order.property())

  ineq_prop_name = None
  for filter in filters:
    Check(filter.property_size() == 1,
          'Filter has %d properties, expected 1' % filter.property_size())

    prop = filter.property(0)
    prop_name = prop.name().decode('utf-8')

    if prop_name == key_prop_name:
      Check(prop.value().has_referencevalue(),
            '%s filter value must be a Key' % key_prop_name)
      ref_val = prop.value().referencevalue()
      Check(ref_val.app() == query.app(),
            '%s filter app is %s but query app is %s' %
            (key_prop_name, ref_val.app(), query.app()))
      Check(ref_val.name_space() == query.name_space(),
            '%s filter namespace is %s but query namespace is %s' %
            (key_prop_name, ref_val.name_space(), query.name_space()))

    if filter.op() in datastore_index.EQUALITY_OPERATORS:
      Check(prop_name not in projected_properties,
            'cannot use projection on a property with an equality filter')
    if (filter.op() in datastore_index.INEQUALITY_OPERATORS and
        prop_name != unapplied_log_timestamp_us_name):
      if ineq_prop_name is None:
        ineq_prop_name = prop_name
      else:
        Check(ineq_prop_name == prop_name,
              'Only one inequality filter per query is supported. '
              'Encountered both %s and %s' % (ineq_prop_name, prop_name))

  if (ineq_prop_name is not None
      and query.group_by_property_name_size() > 0
      and not orders):

    Check(ineq_prop_name in group_by_set,
          'Inequality filter on %s must also be a group by '
          'property when group by properties are set.'
          % (ineq_prop_name))

  if ineq_prop_name is not None and orders:
    first_order_prop = orders[0].property().decode('utf-8')
    Check(first_order_prop == ineq_prop_name,
          'The first sort property must be the same as the property '
          'to which the inequality filter is applied. In your query '
          'the first sort property is %s but the inequality filter '
          'is on %s' % (first_order_prop, ineq_prop_name))

  if not query.has_kind():
    for filter in filters:
      prop_name = filter.property(0).name().decode('utf-8')
      Check(prop_name == key_prop_name or
            prop_name == unapplied_log_timestamp_us_name,
            'kind is required for non-__key__ filters')
    for order in orders:
      prop_name = order.property().decode('utf-8')
      Check(prop_name == key_prop_name and
            order.direction() is datastore_pb.Query_Order.ASCENDING,
            'kind is required for all orders except __key__ ascending')


class ValueRange(object):
  """A range of values defined by its two extremes (inclusive or exclusive)."""

  def __init__(self):
    """Constructor.

    Creates an unlimited range.
    """
    self.__start = self.__end = None
    self.__start_inclusive = self.__end_inclusive = False

  def Update(self, rel_op, limit):
    """Filter the range by 'rel_op limit'.

    Args:
      rel_op: relational operator from datastore_pb.Query_Filter.
      limit: the value to limit the range by.
    """

    if rel_op == datastore_pb.Query_Filter.LESS_THAN:
      if self.__end is None or limit <= self.__end:
        self.__end = limit
        self.__end_inclusive = False
    elif (rel_op == datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL or
          rel_op == datastore_pb.Query_Filter.EQUAL):
      if self.__end is None or limit < self.__end:
        self.__end = limit
        self.__end_inclusive = True

    if rel_op == datastore_pb.Query_Filter.GREATER_THAN:
      if self.__start is None or limit >= self.__start:
        self.__start = limit
        self.__start_inclusive = False
    elif (rel_op == datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL or
          rel_op == datastore_pb.Query_Filter.EQUAL):
      if self.__start is None or limit > self.__start:
        self.__start = limit
        self.__start_inclusive = True

  def Contains(self, value):
    """Check if the range contains a specific value.

    Args:
      value: the value to check.

    Returns:
      True iff value is contained in this range.
    """
    if self.__start is not None:
      if self.__start_inclusive and value < self.__start: return False
      if not self.__start_inclusive and value <= self.__start: return False
    if self.__end is not None:
      if self.__end_inclusive and value > self.__end: return False
      if not self.__end_inclusive and value >= self.__end: return False
    return True

  def Remap(self, mapper):
    """Transforms the range extremes with a function.

    The function mapper must preserve order, i.e.
      x rel_op y iff mapper(x) rel_op mapper(y)

    Args:
      mapper: function to apply to the range extremes.
    """
    self.__start = self.__start and mapper(self.__start)
    self.__end = self.__end and mapper(self.__end)

  def MapExtremes(self, mapper):
    """Evaluate a function on the range extremes.

    Args:
      mapper: function to apply to the range extremes.

    Returns:
      (x, y) where x = None if the range has no start,
                       mapper(start, start_inclusive, False) otherwise
                   y = None if the range has no end,
                       mapper(end, end_inclusive, True) otherwise
    """
    return (
        self.__start and mapper(self.__start, self.__start_inclusive, False),
        self.__end and mapper(self.__end, self.__end_inclusive, True))


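# Illustrative usage sketch (not part of the original module):
#
#   r = ValueRange()
#   r.Update(datastore_pb.Query_Filter.GREATER_THAN, 3)
#   r.Update(datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL, 7)
#   r.Contains(3)   # -> False: 3 is an exclusive lower bound
#   r.Contains(7)   # -> True:  7 is an inclusive upper bound

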
def ParseKeyFilteredQuery(filters, orders):
  """Parse queries which only allow filters and ascending-orders on __key__.

  Raises exceptions for illegal queries.

  Args:
    filters: the normalized filters of a query.
    orders: the normalized orders of a query.

  Returns:
    The key range (a ValueRange over datastore_types.Key) requested in the
    query.
  """

  remaining_filters = []
  key_range = ValueRange()
  key_prop = datastore_types.KEY_SPECIAL_PROPERTY
  for f in filters:
    op = f.op()
    if not (f.property_size() == 1 and
            f.property(0).name() == key_prop and
            not (op == datastore_pb.Query_Filter.IN or
                 op == datastore_pb.Query_Filter.EXISTS)):
      remaining_filters.append(f)
      continue

    val = f.property(0).value()
    Check(val.has_referencevalue(), '__key__ kind must be compared to a key')
    limit = datastore_types.FromReferenceProperty(val)
    key_range.Update(op, limit)

  remaining_orders = []
  for o in orders:
    if not (o.direction() == datastore_pb.Query_Order.ASCENDING and
            o.property() == datastore_types.KEY_SPECIAL_PROPERTY):
      remaining_orders.append(o)
    else:
      break

  Check(not remaining_filters,
        'Only comparison filters on ' + key_prop + ' supported')
  Check(not remaining_orders,
        'Only ascending order on ' + key_prop + ' supported')

  return key_range


def ParseKindQuery(query, filters, orders):
  """Parse __kind__ (schema) queries.

  Raises exceptions for illegal queries.

  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.

  Returns:
    The kind range (a ValueRange over string) requested in the query.
  """

  Check(not query.has_ancestor(), 'ancestor queries on __kind__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(_KindKeyToString)

  return key_range


def _KindKeyToString(key):
  """Extract kind name from __kind__ key.

  Raises an ApplicationError if the key is not of the form '__kind__'/name.

  Args:
    key: a key for a __kind__ instance.

  Returns:
    kind specified by key.
  """
  key_path = key.to_path()
  if (len(key_path) == 2 and key_path[0] == '__kind__' and
      isinstance(key_path[1], basestring)):
    return key_path[1]
  Check(False, 'invalid Key for __kind__ table')


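# Illustrative sketch (not part of the original module):
#
#   _KindKeyToString(datastore_types.Key.from_path('__kind__', 'Person'))
#   # -> 'Person'

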
def ParseNamespaceQuery(query, filters, orders):
  """Parse __namespace__ queries.

  Raises exceptions for illegal queries.

  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.

  Returns:
    The namespace range (a ValueRange over string) requested in the query.
  """

  Check(not query.has_ancestor(),
        'ancestor queries on __namespace__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(_NamespaceKeyToString)

  return key_range


def _NamespaceKeyToString(key):
  """Extract namespace name from __namespace__ key.

  Raises an ApplicationError if the key is not of the form '__namespace__'/name
  or '__namespace__'/_EMPTY_NAMESPACE_ID.

  Args:
    key: a key for a __namespace__ instance.

  Returns:
    namespace specified by key.
  """
  key_path = key.to_path()
  if len(key_path) == 2 and key_path[0] == '__namespace__':
    if key_path[1] == datastore_types._EMPTY_NAMESPACE_ID:
      return ''
    if isinstance(key_path[1], basestring):
      return key_path[1]
  Check(False, 'invalid Key for __namespace__ table')


def ParsePropertyQuery(query, filters, orders):
  """Parse __property__ queries.

  Raises exceptions for illegal queries.

  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.

  Returns:
    The property range (a ValueRange over (kind, property) pairs) requested
    in the query.
  """

  Check(not query.has_transaction(),
        'transactional queries on __property__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(lambda x: _PropertyKeyToString(x, ''))

  if query.has_ancestor():
    ancestor = datastore_types.Key._FromPb(query.ancestor())
    ancestor_kind, ancestor_property = _PropertyKeyToString(ancestor, None)

    if ancestor_property is not None:
      key_range.Update(datastore_pb.Query_Filter.EQUAL,
                       (ancestor_kind, ancestor_property))
    else:

      key_range.Update(datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL,
                       (ancestor_kind, ''))
      key_range.Update(datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL,
                       (ancestor_kind + '\0', ''))
    query.clear_ancestor()

  return key_range


def _PropertyKeyToString(key, default_property):
  """Extract property name from __property__ key.

  Raises an ApplicationError if the key is not of the form
  '__kind__'/kind, '__property__'/property or '__kind__'/kind.

  Args:
    key: a key for a __property__ instance.
    default_property: property value to return when key only has a kind.

  Returns:
    kind, property if key = '__kind__'/kind, '__property__'/property
    kind, default_property if key = '__kind__'/kind
  """
  key_path = key.to_path()
  if (len(key_path) == 2 and
      key_path[0] == '__kind__' and isinstance(key_path[1], basestring)):
    return (key_path[1], default_property)
  if (len(key_path) == 4 and
      key_path[0] == '__kind__' and isinstance(key_path[1], basestring) and
      key_path[2] == '__property__' and isinstance(key_path[3], basestring)):
    return (key_path[1], key_path[3])

  Check(False, 'invalid Key for __property__ table')


def SynthesizeUserId(email):
  """Return a synthetic user ID from an email address.

  Note that this is not the same user ID found in the production system.

  Args:
    email: An email address.

  Returns:
    A string userid derived from the email address.
  """

  user_id_digest = _MD5_FUNC(email.lower()).digest()
  user_id = '1' + ''.join(['%02d' % ord(x) for x in user_id_digest])[:20]
  return user_id


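# Illustrative sketch (not part of the original module): the synthetic ID is
# deterministic, case-insensitive in the email, and always 21 characters
# ('1' followed by 20 decimal digits derived from the MD5 digest).
#
#   SynthesizeUserId('Alice@Example.com') == SynthesizeUserId('alice@example.com')
#   len(SynthesizeUserId('alice@example.com'))  # -> 21

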
def FillUsersInQuery(filters):
  """Fill in a synthetic user ID for all user properties in a set of filters.

  Args:
    filters: The normalized filters from query.
  """
  for filter in filters:
    for property in filter.property_list():
      FillUser(property)


def FillUser(property):
  """Fill in a synthetic user ID for a user property.

  Args:
    property: A Property which may have a user value.
  """
  if property.value().has_uservalue():
    uid = SynthesizeUserId(property.value().uservalue().email())
    if uid:
      property.mutable_value().mutable_uservalue().set_obfuscated_gaiaid(uid)


class BaseCursor(object):
  """A base query cursor over a list of entities.

  Public properties:
    cursor: the integer cursor.
    app: the app for which this cursor was created.
    keys_only: whether the query is keys_only.

  Class attributes:
    _next_cursor: the next cursor to allocate.
    _next_cursor_lock: protects _next_cursor.
  """
  _next_cursor = 1
  _next_cursor_lock = threading.Lock()

  def __init__(self, query, dsquery, orders, index_list):
    """Constructor.

    Args:
      query: the query request proto.
      dsquery: a datastore_query.Query over query.
      orders: the orders of query as returned by _GuessOrders.
      index_list: the list of indexes used by the query.
    """

    self.keys_only = query.keys_only()
    self.property_names = set(query.property_name_list())
    self.group_by = set(query.group_by_property_name_list())
    self.app = query.app()
    self.cursor = self._AcquireCursorID()

    self.__order_compare_entities = dsquery._order.cmp_for_filter(
        dsquery._filter_predicate)
    if self.group_by:
      self.__cursor_properties = self.group_by
    else:
      self.__cursor_properties = set(order.property() for order in orders)
      self.__cursor_properties.add('__key__')
    self.__cursor_properties = frozenset(self.__cursor_properties)

    self.__first_sort_order = orders[0].direction()
    self.__index_list = index_list

  def _PopulateResultMetadata(self, query_result, compile,
                              first_result, last_result):
    query_result.set_keys_only(self.keys_only)
    if query_result.more_results():
      cursor = query_result.mutable_cursor()
      cursor.set_app(self.app)
      cursor.set_cursor(self.cursor)
    if compile:
      self._EncodeCompiledCursor(last_result,
                                 query_result.mutable_compiled_cursor())
    if first_result:
      query_result.index_list().extend(self.__index_list)

  @classmethod
  def _AcquireCursorID(cls):
    """Acquires the next cursor id in a thread safe manner."""
    cls._next_cursor_lock.acquire()
    try:
      cursor_id = cls._next_cursor
      cls._next_cursor += 1
    finally:
      cls._next_cursor_lock.release()
    return cursor_id

  def _IsBeforeCursor(self, entity, cursor):
    """True if entity is before cursor according to the current order.

    Args:
      entity: an entity_pb.EntityProto entity.
      cursor: a compiled cursor as returned by _DecodeCompiledCursor.
    """
    comparison_entity = entity_pb.EntityProto()
    for prop in entity.property_list():
      if prop.name() in self.__cursor_properties:
        comparison_entity.add_property().MergeFrom(prop)
    if cursor[0].has_key():
      comparison_entity.mutable_key().MergeFrom(entity.key())
    x = self.__order_compare_entities(comparison_entity, cursor[0])
    if cursor[1]:
      return x < 0
    else:
      return x <= 0

  def _DecodeCompiledCursor(self, compiled_cursor):
    """Converts a compiled_cursor into a cursor_entity.

    Args:
      compiled_cursor: The datastore_pb.CompiledCursor to decode.

    Returns:
      (cursor_entity, inclusive): an entity_pb.EntityProto and if it should
      be included in the result set.
    """
    assert compiled_cursor.has_position()

    position = compiled_cursor.position()

    # Every cursor property must be supplied by the compiled cursor exactly
    # once; remaining_properties tracks the values still missing.
    remaining_properties = set(self.__cursor_properties)

    cursor_entity = entity_pb.EntityProto()
    if position.has_key():
      cursor_entity.mutable_key().CopyFrom(position.key())
      try:
        remaining_properties.remove('__key__')
      except KeyError:
        Check(False, 'Cursor does not match query: extra value __key__')
    for indexvalue in position.indexvalue_list():
      property = cursor_entity.add_property()
      property.set_name(indexvalue.property())
      property.mutable_value().CopyFrom(indexvalue.value())
      try:
        remaining_properties.remove(indexvalue.property())
      except KeyError:
        Check(False, 'Cursor does not match query: extra value %s' %
              indexvalue.property())
    Check(not remaining_properties,
          'Cursor does not match query: missing values for %r' %
          remaining_properties)

    return (cursor_entity, position.start_inclusive())

  def _EncodeCompiledCursor(self, last_result, compiled_cursor):
    """Converts the current state of the cursor into a compiled_cursor.

    Args:
      last_result: the last result returned by this query.
      compiled_cursor: an empty datastore_pb.CompiledCursor.
    """
    if last_result is not None:

      position = compiled_cursor.mutable_position()

      if '__key__' in self.__cursor_properties:
        position.mutable_key().MergeFrom(last_result.key())
      for prop in last_result.property_list():
        if prop.name() in self.__cursor_properties:
          indexvalue = position.add_indexvalue()
          indexvalue.set_property(prop.name())
          indexvalue.mutable_value().CopyFrom(prop.value())
      position.set_start_inclusive(False)
      _SetBeforeAscending(position, self.__first_sort_order)


class ListCursor(BaseCursor):
  """A query cursor over a list of entities.

  Public properties:
    keys_only: whether the query is keys_only
  """

  def __init__(self, query, dsquery, orders, index_list, results):
    """Constructor.

    Args:
      query: the query request proto
      dsquery: a datastore_query.Query over query.
      orders: the orders of query as returned by _GuessOrders.
      index_list: the list of indexes used by the query.
      results: list of entity_pb.EntityProto
    """
    super(ListCursor, self).__init__(query, dsquery, orders, index_list)

    # Apply group_by, keeping only the first result for each distinct
    # combination of group-by property values.
    if self.group_by:
      distincts = set()
      new_results = []
      for result in results:
        key_value = _GetGroupByKey(result, self.group_by)
        if key_value not in distincts:
          distincts.add(key_value)
          new_results.append(result)
      results = new_results

    if query.has_compiled_cursor() and query.compiled_cursor().has_position():
      start_cursor = self._DecodeCompiledCursor(query.compiled_cursor())
      self.__last_result = start_cursor[0]
      start_cursor_position = self._GetCursorOffset(results, start_cursor)
    else:
      self.__last_result = None
      start_cursor_position = 0

    if query.has_end_compiled_cursor():
      if query.end_compiled_cursor().has_position():
        end_cursor = self._DecodeCompiledCursor(query.end_compiled_cursor())
        end_cursor_position = self._GetCursorOffset(results, end_cursor)
      else:
        end_cursor_position = 0
    else:
      end_cursor_position = len(results)

    results = results[start_cursor_position:end_cursor_position]

    # Truncate results to the limit; offset results are kept so they can be
    # skipped and counted later.
    if query.has_limit():
      limit = query.limit()
      if query.offset():
        limit += query.offset()
      if limit >= 0 and limit < len(results):
        results = results[:limit]

    self.__results = results
    self.__offset = 0
    self.__count = len(self.__results)

  def _GetCursorOffset(self, results, cursor):
    """Converts a cursor into an offset into the result set even if the
    cursor's entity no longer exists.

    Args:
      results: the query's results (sequence of entity_pb.EntityProto)
      cursor: a compiled cursor as returned by _DecodeCompiledCursor

    Returns:
      the integer offset
    """
    lo = 0
    hi = len(results)
    while lo < hi:
      mid = (lo + hi) // 2
      if self._IsBeforeCursor(results[mid], cursor):
        lo = mid + 1
      else:
        hi = mid
    return lo

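  # Illustrative sketch (not part of the original module): the loop above is
  # a predicate-based lower-bound binary search, equivalent to:
  #
  #   def _LowerBound(items, is_before_cursor):
  #     lo, hi = 0, len(items)
  #     while lo < hi:
  #       mid = (lo + hi) // 2
  #       if is_before_cursor(items[mid]):
  #         lo = mid + 1
  #       else:
  #         hi = mid
  #     return lo
  #
  #   _LowerBound([1, 2, 4, 4, 7], lambda x: x < 4)  # -> 2
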
  def PopulateQueryResult(self, result, count, offset,
                          compile=False, first_result=False):
    """Populates a QueryResult with this cursor and the given number of results.

    Args:
      result: datastore_pb.QueryResult
      count: integer of how many results to return
      offset: integer of how many results to skip
      compile: boolean, whether we are compiling this query
      first_result: whether the query result is the first for this query
    """
    Check(offset >= 0, 'Offset must be >= 0')

    offset = min(offset, self.__count - self.__offset)
    limited_offset = min(offset, _MAX_QUERY_OFFSET)
    if limited_offset:
      self.__offset += limited_offset
      result.set_skipped_results(limited_offset)

    if compile and result.skipped_results() > 0:
      self._EncodeCompiledCursor(
          self.__results[self.__offset - 1],
          result.mutable_skipped_results_compiled_cursor())
    if offset == limited_offset and count:
      # Results are only produced once the entire requested offset has been
      # consumed in this batch.
      if count > _MAXIMUM_RESULTS:
        count = _MAXIMUM_RESULTS
      results = self.__results[self.__offset:self.__offset + count]
      count = len(results)
      self.__offset += count

      result.result_list().extend(
          LoadEntity(entity, self.keys_only, self.property_names)
          for entity in results)
      if compile:
        for entity in results:
          self._EncodeCompiledCursor(entity,
                                     result.add_result_compiled_cursor())

    if self.__offset:
      # The last result processed is the new cursor position.
      self.__last_result = self.__results[self.__offset - 1]

    result.set_more_results(self.__offset < self.__count)
    self._PopulateResultMetadata(result, compile,
                                 first_result, self.__last_result)


def _SynchronizeTxn(function):
  """A decorator that locks a transaction during the function call."""

  def sync(txn, *args, **kwargs):
    txn._lock.acquire()
    try:
      Check(txn._state is LiveTxn.ACTIVE, 'transaction closed')
      return function(txn, *args, **kwargs)
    finally:
      txn._lock.release()
  return sync


def _GetEntityGroup(ref):
  """Returns the entity group key for the given reference."""
  entity_group = entity_pb.Reference()
  entity_group.CopyFrom(ref)
  assert (entity_group.path().element_list()[0].has_id() or
          entity_group.path().element_list()[0].has_name())
  del entity_group.path().element_list()[1:]
  return entity_group


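# Illustrative sketch (not part of the original module): the entity group of
# a key is its root path element, so a reference with path
# ('Parent', 1, 'Child', 2) maps to the entity group ('Parent', 1).

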
def _GetKeyKind(key):
  """Return the kind of the given key."""
  return key.path().element_list()[-1].type()


def _FilterIndexesByKind(key, indexes):
  """Return only the indexes with the specified kind."""
  return filter((lambda index:
                 index.definition().entity_type() == _GetKeyKind(key)), indexes)


class LiveTxn(object):
  """An in flight transaction."""

  # Transaction states.
  ACTIVE = 1
  COMMITED = 2
  ROLLEDBACK = 3
  FAILED = 4

  _state = ACTIVE
  _commit_time_s = None

  def __init__(self, txn_manager, app, allow_multiple_eg):
    assert isinstance(txn_manager, BaseTransactionManager)
    assert isinstance(app, basestring)

    self._txn_manager = txn_manager
    self._app = app
    self._allow_multiple_eg = allow_multiple_eg

    # Map from entity group key (datastore_types.ReferenceToKeyValue) to
    # EntityGroupTracker.
    self._entity_groups = {}

    self._lock = threading.RLock()
    self._apply_lock = threading.Lock()

    self._actions = []
    self._cost = datastore_pb.Cost()

    # Map from kind to the composite indexes that apply to entities of that
    # kind written in this transaction.
    self._kind_to_indexes = collections.defaultdict(list)

  def _GetTracker(self, reference):
    """Gets the entity group tracker for reference.

    If this is the first time reference's entity group is seen, creates a new
    tracker, checking that the transaction doesn't exceed the entity group
    limit.
    """
    entity_group = _GetEntityGroup(reference)
    key = datastore_types.ReferenceToKeyValue(entity_group)
    tracker = self._entity_groups.get(key, None)
    if tracker is None:
      Check(self._app == reference.app(),
            'Transactions cannot span applications (expected %s, got %s)' %
            (self._app, reference.app()))
      if self._allow_multiple_eg:
        Check(len(self._entity_groups) < _MAX_EG_PER_TXN,
              'operating on too many entity groups in a single transaction.')
      else:
        Check(len(self._entity_groups) < 1,
              'cross-group transactions need to be explicitly '
              'specified (xg=True)')
      tracker = EntityGroupTracker(entity_group)
      self._entity_groups[key] = tracker

    return tracker

  def _GetAllTrackers(self):
    """Get the trackers for the transaction's entity groups.

    If no entity group has been discovered returns a 'global' entity group
    tracker. This is possible if the txn only contains transactional tasks.

    Returns:
      The tracker list for the entity groups used in this txn.
    """
    if not self._entity_groups:
      self._GetTracker(datastore_types.Key.from_path(
          '__global__', 1, _app=self._app)._ToPb())
    return self._entity_groups.values()

  def _GrabSnapshot(self, reference):
    """Gets snapshot for this reference, creating it if necessary.

    If no snapshot has been set for reference's entity group, a snapshot is
    taken and stored for future reads (this also sets the read position),
    and a CONCURRENT_TRANSACTION exception is thrown if we no longer have
    a consistent snapshot.

    Args:
      reference: An entity_pb.Reference from which to extract the entity group.

    Raises:
      apiproxy_errors.ApplicationError if the snapshot is not consistent.
    """
    tracker = self._GetTracker(reference)
    check_contention = tracker._snapshot is None
    snapshot = tracker._GrabSnapshot(self._txn_manager)
    if check_contention:

      # Verify that every entity group snapshotted earlier in this txn is
      # still at the log position at which it was read; otherwise the
      # combined snapshot is no longer consistent.
      candidates = [other for other in self._entity_groups.values()
                    if other._snapshot is not None and other != tracker]
      meta_data_list = [other._meta_data for other in candidates]
      self._txn_manager._AcquireWriteLocks(meta_data_list)
      try:
        for other in candidates:
          if other._meta_data._log_pos != other._read_pos:
            self._state = self.FAILED
            raise apiproxy_errors.ApplicationError(
                datastore_pb.Error.CONCURRENT_TRANSACTION,
                'Concurrency exception.')
      finally:
        self._txn_manager._ReleaseWriteLocks(meta_data_list)
    return snapshot

  @_SynchronizeTxn
  def Get(self, reference):
    """Returns the entity associated with the given entity_pb.Reference or None.

    Does not see any modifications in the current txn.

    Args:
      reference: The entity_pb.Reference of the entity to look up.

    Returns:
      The associated entity_pb.EntityProto or None if no such entity exists.
    """
    snapshot = self._GrabSnapshot(reference)
    entity = snapshot.get(datastore_types.ReferenceToKeyValue(reference))
    return LoadEntity(entity)

  @_SynchronizeTxn
  def GetQueryCursor(self, query, filters, orders, index_list,
                     filter_predicate=None):
    """Runs the given datastore_pb.Query and returns a QueryCursor for it.

    Does not see any modifications in the current txn.

    Args:
      query: The datastore_pb.Query to run.
      filters: A list of filters that override the ones found on query.
      orders: A list of orders that override the ones found on query.
      index_list: A list of indexes used by the query.
      filter_predicate: an additional filter of type
          datastore_query.FilterPredicate. This is passed along to implement V4
          specific filters without changing the entire stub.

    Returns:
      A BaseCursor that can be used to fetch query results.
    """
    Check(query.has_ancestor(),
          'Query must have an ancestor when performed in a transaction.')
    snapshot = self._GrabSnapshot(query.ancestor())
    return _ExecuteQuery(snapshot.values(), query, filters, orders, index_list,
                         filter_predicate)

  @_SynchronizeTxn
  def Put(self, entity, insert, indexes):
    """Puts the given entity.

    Args:
      entity: The entity_pb.EntityProto to put.
      insert: A boolean that indicates if we should fail if the entity already
          exists.
      indexes: The composite indexes that apply to the entity.
    """
    tracker = self._GetTracker(entity.key())
    key = datastore_types.ReferenceToKeyValue(entity.key())
    tracker._delete.pop(key, None)
    tracker._put[key] = (entity, insert)
    self._kind_to_indexes[_GetKeyKind(entity.key())] = indexes

  @_SynchronizeTxn
  def Delete(self, reference, indexes):
    """Deletes the entity associated with the given reference.

    Args:
      reference: The entity_pb.Reference of the entity to delete.
      indexes: The composite indexes that apply to the entity.
    """
    tracker = self._GetTracker(reference)
    key = datastore_types.ReferenceToKeyValue(reference)
    tracker._put.pop(key, None)
    tracker._delete[key] = reference
    self._kind_to_indexes[_GetKeyKind(reference)] = indexes

  @_SynchronizeTxn
  def AddActions(self, actions, max_actions=None):
    """Adds the given actions to the current txn.

    Args:
      actions: A list of pbs to send to taskqueue.Add when the txn is applied.
      max_actions: A number that indicates the maximum number of actions to
          allow on this txn.
    """
    Check(not max_actions or len(self._actions) + len(actions) <= max_actions,
          'Too many messages, maximum allowed %s' % max_actions)
    self._actions.extend(actions)

  def Rollback(self):
    """Rollback the current txn."""

    self._lock.acquire()
    try:
      Check(self._state is self.ACTIVE or self._state is self.FAILED,
            'transaction closed')
      self._state = self.ROLLEDBACK
    finally:
      self._txn_manager._RemoveTxn(self)

      self._lock.release()

  @_SynchronizeTxn
  def Commit(self):
    """Commits the current txn.

    This function hands off the responsibility of calling _Apply to the owning
    TransactionManager.

    Returns:
      The cost of the transaction.
    """
    try:

      trackers = self._GetAllTrackers()
      empty = True
      for tracker in trackers:
        snapshot = tracker._GrabSnapshot(self._txn_manager)
        empty = empty and not tracker._put and not tracker._delete

        # Compute write costs for the puts.
        for entity, insert in tracker._put.itervalues():
          Check(not insert or self.Get(entity.key()) is None,
                'the id allocated for a new entity was already '
                'in use, please try again')

          old_entity = None
          key = datastore_types.ReferenceToKeyValue(entity.key())
          if key in snapshot:
            old_entity = snapshot[key]
          self._AddWriteOps(old_entity, entity)

        # Compute write costs for the deletes.
        for reference in tracker._delete.itervalues():

          old_entity = None
          key = datastore_types.ReferenceToKeyValue(reference)
          if key in snapshot:
            old_entity = snapshot[key]
            if old_entity is not None:
              self._AddWriteOps(None, old_entity)

      # A read-only txn with no actions is a no-op; roll it back for free.
      if empty and not self._actions:
        self.Rollback()
        return datastore_pb.Cost()

      meta_data_list = [tracker._meta_data for tracker in trackers]
      self._txn_manager._AcquireWriteLocks(meta_data_list)
    except:

      self.Rollback()
      raise

    try:
      # Check that our snapshots are still valid.
      for tracker in trackers:
        Check(tracker._meta_data._log_pos == tracker._read_pos,
              'Concurrency exception.',
              datastore_pb.Error.CONCURRENT_TRANSACTION)

      # Log the txn on each entity group so it can be applied later.
      for tracker in trackers:
        tracker._meta_data.Log(self)
      self._state = self.COMMITED
      self._commit_time_s = time.time()
    except:

      self.Rollback()
      raise
    else:
      # Send any transactional tasks; failures only drop the task.
      for action in self._actions:
        try:
          apiproxy_stub_map.MakeSyncCall(
              'taskqueue', 'Add', action, api_base_pb.VoidProto())
        except apiproxy_errors.ApplicationError, e:
          logging.warning('Transactional task %s has been dropped, %s',
                          action, e)
      self._actions = []
    finally:
      self._txn_manager._RemoveTxn(self)

      self._txn_manager._ReleaseWriteLocks(meta_data_list)

    # Let the consistency policy decide whether to apply the txn now.
    self._txn_manager._consistency_policy._OnCommit(self)
    return self._cost

  def _AddWriteOps(self, old_entity, new_entity):
    """Adds the cost of writing the new_entity to the _cost member.

    We assume that old_entity represents the current state of the Datastore.

    Args:
      old_entity: Entity representing the current state in the Datastore.
      new_entity: Entity representing the desired state in the Datastore.
    """
    composite_indexes = self._kind_to_indexes[_GetKeyKind(new_entity.key())]
    entity_writes, index_writes = _CalculateWriteOps(
        composite_indexes, old_entity, new_entity)
    _UpdateCost(self._cost, entity_writes, index_writes)

  def _Apply(self, meta_data):
    """Applies the current txn on the given entity group.

    This function blindly performs the operations contained in the current txn.
    The calling function must acquire the entity group write lock and ensure
    transactions are applied in order.
    """

    self._apply_lock.acquire()
    try:
      # Only apply if the txn was committed and hasn't been applied to this
      # entity group yet.
      assert self._state == self.COMMITED
      for tracker in self._entity_groups.values():
        if tracker._meta_data is meta_data:
          break
      else:
        assert False
      assert tracker._read_pos != tracker.APPLIED

      # Perform the puts.
      for entity, insert in tracker._put.itervalues():
        self._txn_manager._Put(entity, insert)

      # Perform the deletes.
      for key in tracker._delete.itervalues():
        self._txn_manager._Delete(key)

      # Mark this entity group as applied.
      tracker._read_pos = EntityGroupTracker.APPLIED

      # Remove this txn from the entity group's apply queue.
      tracker._meta_data.Unlog(self)
    finally:
      self._apply_lock.release()


class EntityGroupTracker(object):
  """An entity group involved in a transaction."""

  APPLIED = -2

  # The log position at which this entity group was read, or APPLIED once
  # the txn has been applied to it.
  _read_pos = None

  # A snapshot of the entity group as of _read_pos, or None if not yet read.
  _snapshot = None

  # The entity group's EntityGroupMetaData, or None if not yet read.
  _meta_data = None

  def __init__(self, entity_group):
    self._entity_group = entity_group
    self._put = {}
    self._delete = {}

  def _GrabSnapshot(self, txn_manager):
    """Snapshot this entity group, remembering the read position."""
    if self._snapshot is None:
      self._meta_data, self._read_pos, self._snapshot = (
          txn_manager._GrabSnapshot(self._entity_group))
    return self._snapshot


class EntityGroupMetaData(object):
  """The meta_data associated with an entity group."""

  # The current log position of the entity group.
  _log_pos = -1

  # A cached snapshot of the entity group at _log_pos, or None.
  _snapshot = None

  def __init__(self, entity_group):
    self._entity_group = entity_group
    self._write_lock = threading.Lock()
    self._apply_queue = []

  def CatchUp(self):
    """Applies all outstanding txns."""

    # The caller must already hold the write lock.
    assert self._write_lock.acquire(False) is False

    while self._apply_queue:
      self._apply_queue[0]._Apply(self)

  def Log(self, txn):
    """Add a pending transaction to this entity group.

    Requires that the caller hold the meta data lock.
    This also increments the current log position and clears the snapshot cache.
    """

    assert self._write_lock.acquire(False) is False
    self._apply_queue.append(txn)
    self._log_pos += 1
    self._snapshot = None

  def Unlog(self, txn):
    """Remove the first pending transaction from the apply queue.

    Requires that the caller hold the meta data lock.
    This checks that the first pending transaction is indeed txn.
    """

    assert self._write_lock.acquire(False) is False

    Check(self._apply_queue and self._apply_queue[0] is txn,
          'Transaction is not applicable',
          datastore_pb.Error.INTERNAL_ERROR)
    self._apply_queue.pop(0)


class BaseConsistencyPolicy(object):
  """A base class for a consistency policy to be used with a transaction
  manager.
  """

  def _OnCommit(self, txn):
    """Called after a LiveTxn has been committed.

    This function can decide whether to apply the txn right away.

    Args:
      txn: A LiveTxn that has been committed
    """
    raise NotImplementedError

  def _OnGroom(self, meta_data_list):
    """Called once for every global query.

    This function must acquire the write lock for any meta data before applying
    any outstanding txns.

    Args:
      meta_data_list: A list of EntityGroupMetaData objects.
    """
    raise NotImplementedError


class MasterSlaveConsistencyPolicy(BaseConsistencyPolicy):
  """Enforces the Master / Slave consistency policy.

  Applies all txns on commit.
  """

  def _OnCommit(self, txn):
    # Apply the txn to every entity group it touched, immediately.
    for tracker in txn._GetAllTrackers():
      tracker._meta_data._write_lock.acquire()
      try:
        tracker._meta_data.CatchUp()
      finally:
        tracker._meta_data._write_lock.release()

    txn._txn_manager.Write()

  def _OnGroom(self, meta_data_list):
    # Nothing to do: every txn is applied at commit time.
    pass


class BaseHighReplicationConsistencyPolicy(BaseConsistencyPolicy):
  """A base class for High Replication Datastore consistency policies.

  All txns are applied asynchronously.
  """

  def _OnCommit(self, txn):
    pass

  def _OnGroom(self, meta_data_list):
    # Apply any outstanding txns that the policy says should now be visible.
    for meta_data in meta_data_list:
      if not meta_data._apply_queue:
        continue

      meta_data._write_lock.acquire()
      try:
        while meta_data._apply_queue:
          txn = meta_data._apply_queue[0]
          if self._ShouldApply(txn, meta_data):
            txn._Apply(meta_data)
          else:
            break
      finally:
        meta_data._write_lock.release()

  def _ShouldApply(self, txn, meta_data):
    """Determines if the given transaction should be applied."""
    raise NotImplementedError


class TimeBasedHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy):
  """A High Replication Datastore consistency policy based on elapsed time.

  This class tries to simulate performance seen in the high replication
  datastore using estimated probabilities of a transaction committing after a
  given amount of time.
  """

  _classification_map = [(.98, 100),
                         (.99, 300),
                         (.995, 2000),
                         (1, 240000)
                        ]

  def SetClassificationMap(self, classification_map):
    """Set the probability a txn will be applied after a given amount of time.

    Args:
      classification_map: A list of tuples containing (float between 0 and 1,
          number of milliseconds) that define the probability of a transaction
          applying after a given amount of time.
    """
    for prob, delay in classification_map:
      if prob < 0 or prob > 1 or delay <= 0:
        raise TypeError(
            'classification_map must be a list of (probability, delay) tuples, '
            'found %r' % (classification_map,))

    self._classification_map = sorted(classification_map)

  def _ShouldApplyImpl(self, elapsed_ms, classification):
    for rate, ms in self._classification_map:
      if classification <= rate:
        break
    return elapsed_ms >= ms

  def _Classify(self, txn, meta_data):
    return random.Random(id(txn) ^ id(meta_data)).random()

  def _ShouldApply(self, txn, meta_data):
    elapsed_ms = (time.time() - txn._commit_time_s) * 1000
    classification = self._Classify(txn, meta_data)
    return self._ShouldApplyImpl(elapsed_ms, classification)


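# Illustrative sketch (not part of the original module): with the default
# _classification_map, 98% of (txn, entity group) pairs become visible once
# 100ms have elapsed, 1% more at 300ms, 0.5% more at 2s, and the remainder
# only after 240s.
#
#   policy = TimeBasedHRConsistencyPolicy()
#   policy._ShouldApplyImpl(150, classification=0.97)   # -> True  (>= 100ms)
#   policy._ShouldApplyImpl(150, classification=0.985)  # -> False (< 300ms)

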
class PseudoRandomHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy):
  """A policy that always gives the same sequence of consistency decisions."""

  def __init__(self, probability=.5, seed=0):
    """Constructor.

    Args:
      probability: A number between 0 and 1 that is the likelihood of a
          transaction applying before a global query is executed.
      seed: A hashable object to use as a seed. Use None to use the current
          timestamp.
    """
    self.SetProbability(probability)
    self.SetSeed(seed)

  def SetProbability(self, probability):
    """Change the probability of a transaction applying.

    Args:
      probability: A number between 0 and 1 that determines the probability of
          a transaction applying before a global query is run.
    """
    if probability < 0 or probability > 1:
      raise TypeError('probability must be a number between 0 and 1, found %r' %
                      probability)
    self._probability = probability

  def SetSeed(self, seed):
    """Reset the seed."""
    self._random = random.Random(seed)

  def _ShouldApply(self, txn, meta_data):
    return self._random.random() < self._probability


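# Illustrative usage sketch (assumes the standard testbed API, which is not
# part of this module): this policy is commonly used in tests to make
# eventual consistency deterministic.
#
#   from google.appengine.ext import testbed
#   tb = testbed.Testbed()
#   tb.activate()
#   policy = PseudoRandomHRConsistencyPolicy(probability=0)
#   tb.init_datastore_v3_stub(consistency_policy=policy)
#   # With probability=0, committed txns are never visible to global queries
#   # until the stub grooms or flushes them.

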
class BaseTransactionManager(object):
  """A class that manages the state of transactions.

  This includes creating consistent snapshots for transactions.
  """

  def __init__(self, consistency_policy=None):
    super(BaseTransactionManager, self).__init__()

    self._consistency_policy = (consistency_policy or
                                MasterSlaveConsistencyPolicy())

    # Protects the meta data map below.
    self._meta_data_lock = threading.Lock()
    BaseTransactionManager.Clear(self)

  def SetConsistencyPolicy(self, policy):
    """Set the consistency policy to use.

    Causes all data to be flushed.

    Args:
      policy: An obj inheriting from BaseConsistencyPolicy.
    """
    if not isinstance(policy, BaseConsistencyPolicy):
      raise TypeError('policy should be of type '
                      'datastore_stub_util.BaseConsistencyPolicy found %r.' %
                      (policy,))
    self.Flush()
    self._consistency_policy = policy

  def Clear(self):
    """Discards any pending transactions and resets the meta data."""

    # Map from entity group key to EntityGroupMetaData.
    self._meta_data = {}
    # Map from transaction handle (id(txn)) to LiveTxn.
    self._txn_map = {}

1953 def BeginTransaction(self, app, allow_multiple_eg):
1954 """Start a transaction on the given app.
1956 Args:
1957 app: A string representing the app for which to start the transaction.
1958 allow_multiple_eg: True if transactions can span multiple entity groups.
1960 Returns:
1961 A datastore_pb.Transaction for the created transaction
1963 Check(not (allow_multiple_eg and isinstance(
1964 self._consistency_policy, MasterSlaveConsistencyPolicy)),
1965 'transactions on multiple entity groups only allowed with the '
1966 'High Replication datastore')
1967 txn = self._BeginTransaction(app, allow_multiple_eg)
1968 self._txn_map[id(txn)] = txn
1969 transaction = datastore_pb.Transaction()
1970 transaction.set_app(app)
1971 transaction.set_handle(id(txn))
1972 return transaction
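# Example (hedged sketch): the returned datastore_pb.Transaction uses the
# id() of the internal LiveTxn as its opaque handle, which GetTxn below uses
# to find the LiveTxn again. `manager` is any BaseTransactionManager
# subclass instance (hypothetical name):
#
#   transaction = manager.BeginTransaction('myapp', allow_multiple_eg=False)
#   txn = manager.GetTxn(transaction, request_trusted=True,
#                        request_app='myapp')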
1974 def GetTxn(self, transaction, request_trusted, request_app):
1975 """Gets the LiveTxn object associated with the given transaction.
1977 Args:
1978 transaction: The datastore_pb.Transaction to look up.
1979 request_trusted: A boolean indicating if the requesting app is trusted.
1980 request_app: A string representing the app making the request.
1982 Returns:
1983 The associated LiveTxn object.
1985 request_app = datastore_types.ResolveAppId(request_app)
1986 CheckTransaction(request_trusted, request_app, transaction)
1987 txn = self._txn_map.get(transaction.handle())
1988 Check(txn and txn._app == transaction.app(),
1989 'Transaction(<%s>) not found' % str(transaction).replace('\n', ', '))
1990 return txn
1992 def Groom(self):
1993 """Attempts to apply any outstanding transactions.
1995 The consistency policy determines whether a transaction should be applied.
1997 self._meta_data_lock.acquire()
1998 try:
1999 self._consistency_policy._OnGroom(self._meta_data.itervalues())
2000 finally:
2001 self._meta_data_lock.release()
2003 def Flush(self):
2004 """Applies all outstanding transactions."""
2005 self._meta_data_lock.acquire()
2006 try:
2007 for meta_data in self._meta_data.itervalues():
2008 if not meta_data._apply_queue:
2009 continue
2012 meta_data._write_lock.acquire()
2013 try:
2014 meta_data.CatchUp()
2015 finally:
2016 meta_data._write_lock.release()
2017 finally:
2018 self._meta_data_lock.release()
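# Example (illustrative): tests that use an eventually consistent policy can
# force every queued transaction to apply instead of waiting for the policy
# to decide during grooming:
#
#   manager.Flush()   # unconditionally applies all outstanding transactions
#   manager.Groom()   # lets the consistency policy pick what to apply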
2020 def _GetMetaData(self, entity_group):
2021 """Safely gets the EntityGroupMetaData object for the given entity_group.
2023 self._meta_data_lock.acquire()
2024 try:
2025 key = datastore_types.ReferenceToKeyValue(entity_group)
2027 meta_data = self._meta_data.get(key, None)
2028 if not meta_data:
2029 meta_data = EntityGroupMetaData(entity_group)
2030 self._meta_data[key] = meta_data
2031 return meta_data
2032 finally:
2033 self._meta_data_lock.release()
2035 def _BeginTransaction(self, app, allow_multiple_eg):
2036 """Starts a transaction without storing it in the txn_map."""
2037 return LiveTxn(self, app, allow_multiple_eg)
2039 def _GrabSnapshot(self, entity_group):
2040 """Grabs a consistent snapshot of the given entity group.
2042 Args:
2043 entity_group: An entity_pb.Reference of the entity group of which the
2044 snapshot should be taken.
2046 Returns:
2047 A tuple of (meta_data, log_pos, snapshot) where log_pos is the current log
2048 position and snapshot is a map of reference key value to
2049 entity_pb.EntityProto.
2052 meta_data = self._GetMetaData(entity_group)
2053 meta_data._write_lock.acquire()
2054 try:
2055 if not meta_data._snapshot:
2057 meta_data.CatchUp()
2058 meta_data._snapshot = self._GetEntitiesInEntityGroup(entity_group)
2059 return meta_data, meta_data._log_pos, meta_data._snapshot
2060 finally:
2062 meta_data._write_lock.release()
2064 def _AcquireWriteLocks(self, meta_data_list):
2065 """Acquire the write locks for the given entity group meta data.
2067 These locks must be released with _ReleaseWriteLocks before returning to the
2068 user.
2070 Args:
2071 meta_data_list: list of EntityGroupMetaData objects.
2073 for meta_data in sorted(meta_data_list):
2074 meta_data._write_lock.acquire()
2076 def _ReleaseWriteLocks(self, meta_data_list):
2077 """Release the write locks of the given entity group meta data.
2079 Args:
2080 meta_data_list: list of EntityGroupMetaData objects.
2082 for meta_data in sorted(meta_data_list):
2083 meta_data._write_lock.release()
2085 def _RemoveTxn(self, txn):
2086 """Removes a LiveTxn from the txn_map (if present)."""
2087 self._txn_map.pop(id(txn), None)
2089 def _Put(self, entity, insert):
2090 """Put the given entity.
2092 This must be implemented by a sub-class. The sub-class can assume that any
2093 needed consistency is enforced at a higher level (and can just put blindly).
2095 Args:
2096 entity: The entity_pb.EntityProto to put.
2097 insert: A boolean that indicates if we should fail if the entity already
2098 exists.
2100 raise NotImplementedError
2102 def _Delete(self, reference):
2103 """Delete the entity associated with the specified reference.
2105 This must be implemented by a sub-class. The sub-class can assume that any
2106 needed consistency is enforced at a higher level (and can just delete
2107 blindly).
2109 Args:
2110 reference: The entity_pb.Reference of the entity to delete.
2112 raise NotImplementedError
2114 def _GetEntitiesInEntityGroup(self, entity_group):
2115 """Gets the contents of a specific entity group.
2117 This must be implemented by a sub-class. The sub-class can assume that any
2118 needed consistency is enforced at a higher level (and can just blindly read).
2120 Other entity groups may be modified concurrently.
2122 Args:
2123 entity_group: An entity_pb.Reference of the entity group to get.
2125 Returns:
2126 A dict mapping datastore_types.ReferenceToKeyValue(key) to EntityProto.
2128 raise NotImplementedError
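# A minimal in-memory sketch of the storage hooks a concrete subclass must
# supply (illustrative only; the real file and sqlite stubs add persistence,
# index maintenance, and error handling; the class and attribute names below
# are hypothetical):
#
#   class DictTransactionManager(BaseTransactionManager):
#
#     def __init__(self, **kwargs):
#       self._store = {}  # ReferenceToKeyValue(key) -> EntityProto
#       super(DictTransactionManager, self).__init__(**kwargs)
#
#     def _Put(self, entity, insert):
#       key = datastore_types.ReferenceToKeyValue(entity.key())
#       assert not (insert and key in self._store)
#       self._store[key] = entity
#
#     def _Delete(self, reference):
#       self._store.pop(datastore_types.ReferenceToKeyValue(reference), None)
#
#     def _GetEntitiesInEntityGroup(self, entity_group):
#       root = entity_group.path().element(0)
#       return dict((key, entity) for key, entity in self._store.items()
#                   if entity.key().path().element(0) == root)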
2131 class BaseIndexManager(object):
2132 """A generic index manager that stores all data in memory."""
2141 WRITE_ONLY = entity_pb.CompositeIndex.WRITE_ONLY
2142 READ_WRITE = entity_pb.CompositeIndex.READ_WRITE
2143 DELETED = entity_pb.CompositeIndex.DELETED
2144 ERROR = entity_pb.CompositeIndex.ERROR
2146 _INDEX_STATE_TRANSITIONS = {
2147 WRITE_ONLY: frozenset((READ_WRITE, DELETED, ERROR)),
2148 READ_WRITE: frozenset((DELETED,)),
2149 ERROR: frozenset((DELETED,)),
2150 DELETED: frozenset((ERROR,)),
2153 def __init__(self):
2157 self.__indexes = collections.defaultdict(list)
2158 self.__indexes_lock = threading.Lock()
2159 self.__next_index_id = 1
2160 self.__index_id_lock = threading.Lock()
2162 def __FindIndex(self, index):
2163 """Finds an existing index by definition.
2165 Args:
2166 index: entity_pb.CompositeIndex
2168 Returns:
2169 entity_pb.CompositeIndex, if it exists; otherwise None
2171 app = index.app_id()
2172 if app in self.__indexes:
2173 for stored_index in self.__indexes[app]:
2174 if index.definition() == stored_index.definition():
2175 return stored_index
2177 return None
2179 def CreateIndex(self, index, trusted=False, calling_app=None):
2182 calling_app = datastore_types.ResolveAppId(calling_app)
2183 CheckAppId(trusted, calling_app, index.app_id())
2184 Check(index.id() == 0, 'New index id must be 0.')
2185 Check(not self.__FindIndex(index), 'Index already exists.')
2188 self.__index_id_lock.acquire()
2189 index.set_id(self.__next_index_id)
2190 self.__next_index_id += 1
2191 self.__index_id_lock.release()
2194 clone = entity_pb.CompositeIndex()
2195 clone.CopyFrom(index)
2196 app = index.app_id()
2197 clone.set_app_id(app)
2200 self.__indexes_lock.acquire()
2201 try:
2202 self.__indexes[app].append(clone)
2203 finally:
2204 self.__indexes_lock.release()
2206 self._OnIndexChange(index.app_id())
2208 return index.id()
2210 def GetIndexes(self, app, trusted=False, calling_app=None):
2211 """Get the CompositeIndex objects for the given app."""
2212 calling_app = datastore_types.ResolveAppId(calling_app)
2213 CheckAppId(trusted, calling_app, app)
2215 return self.__indexes[app]
2217 def UpdateIndex(self, index, trusted=False, calling_app=None):
2218 CheckAppId(trusted, calling_app, index.app_id())
2220 stored_index = self.__FindIndex(index)
2221 Check(stored_index, 'Index does not exist.')
2222 Check(index.state() == stored_index.state() or
2223 index.state() in self._INDEX_STATE_TRANSITIONS[stored_index.state()],
2224 'cannot move index state from %s to %s' %
2225 (entity_pb.CompositeIndex.State_Name(stored_index.state()),
2226 (entity_pb.CompositeIndex.State_Name(index.state()))))
2229 self.__indexes_lock.acquire()
2230 try:
2231 stored_index.set_state(index.state())
2232 finally:
2233 self.__indexes_lock.release()
2235 self._OnIndexChange(index.app_id())
2237 def DeleteIndex(self, index, trusted=False, calling_app=None):
2238 CheckAppId(trusted, calling_app, index.app_id())
2240 stored_index = self.__FindIndex(index)
2241 Check(stored_index, 'Index does not exist.')
2244 app = index.app_id()
2245 self.__indexes_lock.acquire()
2246 try:
2247 self.__indexes[app].remove(stored_index)
2248 finally:
2249 self.__indexes_lock.release()
2251 self._OnIndexChange(index.app_id())
2253 def _SideLoadIndex(self, index):
2254 self.__indexes[index.app_id()].append(index)
2256 def _OnIndexChange(self, app_id):
2257 pass
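# Example (sketch): a new index enters in WRITE_ONLY state and, per
# _INDEX_STATE_TRANSITIONS, may be promoted to READ_WRITE so queries can use
# it, but never demoted back:
#
#   index.set_state(entity_pb.CompositeIndex.WRITE_ONLY)
#   manager.CreateIndex(index, trusted=True)     # assigns and returns an id
#   index.set_state(entity_pb.CompositeIndex.READ_WRITE)
#   manager.UpdateIndex(index, trusted=True)     # WRITE_ONLY -> READ_WRITE ok
#
# A READ_WRITE index may only move to DELETED, so attempting the reverse
# transition fails the Check() above.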
2260 class BaseDatastore(BaseTransactionManager, BaseIndexManager):
2261 """A base implemenation of a Datastore.
2263 This class implements common functions associated with a datastore and
2264 enforces security restrictions passed on by a stub or client. It is designed
2265 to be shared by any number of threads or clients serving any number of apps.
2267 If an app is not specified explicitly, it is pulled from the environment
2268 and assumed to be untrusted.
2273 _MAX_QUERY_COMPONENTS = 100
2277 _BATCH_SIZE = 20
2281 _MAX_ACTIONS_PER_TXN = 5
2283 def __init__(self, require_indexes=False, consistency_policy=None,
2284 use_atexit=True, auto_id_policy=SEQUENTIAL):
2285 BaseTransactionManager.__init__(self, consistency_policy=consistency_policy)
2286 BaseIndexManager.__init__(self)
2288 self._require_indexes = require_indexes
2289 self._pseudo_kinds = {}
2290 self.SetAutoIdPolicy(auto_id_policy)
2292 if use_atexit:
2297 atexit.register(self.Write)
2299 def Clear(self):
2300 """Clears out all stored values."""
2302 BaseTransactionManager.Clear(self)
2305 def _RegisterPseudoKind(self, kind):
2306 """Registers a pseudo kind to be used to satisfy a meta data query."""
2307 self._pseudo_kinds[kind.name] = kind
2308 kind._stub = weakref.proxy(self)
2313 def GetQueryCursor(self, raw_query, trusted=False, calling_app=None,
2314 filter_predicate=None):
2315 """Execute a query.
2317 Args:
2318 raw_query: The non-validated datastore_pb.Query to run.
2319 trusted: If the calling app is trusted.
2320 calling_app: The app requesting the results or None to pull the app from
2321 the environment.
2322 filter_predicate: an additional filter of type
2323 datastore_query.FilterPredicate. This is passed along to implement V4
2324 specific filters without changing the entire stub.
2326 Returns:
2327 A BaseCursor that can be used to retrieve results.
2330 calling_app = datastore_types.ResolveAppId(calling_app)
2331 CheckAppId(trusted, calling_app, raw_query.app())
2334 filters, orders = datastore_index.Normalize(raw_query.filter_list(),
2335 raw_query.order_list(),
2336 raw_query.property_name_list())
2339 CheckQuery(raw_query, filters, orders, self._MAX_QUERY_COMPONENTS)
2340 FillUsersInQuery(filters)
2342 index_list = []
2346 if filter_predicate is None:
2347 self._CheckHasIndex(raw_query, trusted, calling_app)
2350 index_list = self.__IndexListForQuery(raw_query)
2353 if raw_query.has_transaction():
2355 Check(raw_query.kind() not in self._pseudo_kinds,
2356 'transactional queries on "%s" not allowed' % raw_query.kind())
2357 txn = self.GetTxn(raw_query.transaction(), trusted, calling_app)
2358 return txn.GetQueryCursor(raw_query, filters, orders, index_list)
2360 if raw_query.has_ancestor() and raw_query.kind() not in self._pseudo_kinds:
2362 txn = self._BeginTransaction(raw_query.app(), False)
2363 return txn.GetQueryCursor(raw_query, filters, orders, index_list,
2364 filter_predicate)
2367 self.Groom()
2368 return self._GetQueryCursor(raw_query, filters, orders, index_list,
2369 filter_predicate)
2371 def __IndexListForQuery(self, query):
2372 """Get the single composite index pb used by the query, if any, as a list.
2374 Args:
2375 query: the datastore_pb.Query to compute the index list for
2377 Returns:
2378 A singleton list of the composite index pb used by the query, or an empty list if the query does not require a composite index.
2381 required, kind, ancestor, props = (
2382 datastore_index.CompositeIndexForQuery(query))
2383 if not required:
2384 return []
2385 composite_index_pb = entity_pb.CompositeIndex()
2386 composite_index_pb.set_app_id(query.app())
2387 composite_index_pb.set_id(0)
2388 composite_index_pb.set_state(entity_pb.CompositeIndex.READ_WRITE)
2389 index_pb = composite_index_pb.mutable_definition()
2390 index_pb.set_entity_type(kind)
2391 index_pb.set_ancestor(bool(ancestor))
2392 for name, direction in datastore_index.GetRecommendedIndexProperties(props):
2393 prop_pb = entity_pb.Index_Property()
2394 prop_pb.set_name(name)
2395 prop_pb.set_direction(direction)
2396 index_pb.property_list().append(prop_pb)
2397 return [composite_index_pb]
2399 def Get(self, raw_keys, transaction=None, eventual_consistency=False,
2400 trusted=False, calling_app=None):
2401 """Get the entities for the given keys.
2403 Args:
2404 raw_keys: A list of unverified entity_pb.Reference objects.
2405 transaction: The datastore_pb.Transaction to use or None.
2406 eventual_consistency: If we should allow stale, potentially inconsistent
2407 results.
2408 trusted: If the calling app is trusted.
2409 calling_app: The app requesting the results or None to pull the app from
2410 the environment.
2412 Returns:
2413 A list containing the entity or None if no entity exists.
2416 if not raw_keys:
2417 return []
2419 calling_app = datastore_types.ResolveAppId(calling_app)
2421 if not transaction and eventual_consistency:
2423 result = []
2424 for key in raw_keys:
2425 CheckReference(trusted, calling_app, key)
2426 result.append(self._GetWithPseudoKinds(None, key))
2427 return result
2432 grouped_keys = collections.defaultdict(list)
2433 for i, key in enumerate(raw_keys):
2434 CheckReference(trusted, calling_app, key)
2435 entity_group = _GetEntityGroup(key)
2436 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2437 grouped_keys[entity_group_key].append((key, i))
2439 if transaction:
2441 txn = self.GetTxn(transaction, trusted, calling_app)
2442 return [self._GetWithPseudoKinds(txn, key) for key in raw_keys]
2443 else:
2446 result = [None] * len(raw_keys)
2448 def op(txn, v):
2449 key, i = v
2450 result[i] = self._GetWithPseudoKinds(txn, key)
2451 for keys in grouped_keys.itervalues():
2452 self._RunInTxn(keys, keys[0][0].app(), op)
2453 return result
2455 def _GetWithPseudoKinds(self, txn, key):
2456 """Fetch entity key in txn, taking account of pseudo-kinds."""
2457 pseudo_kind = self._pseudo_kinds.get(_GetKeyKind(key), None)
2458 if pseudo_kind:
2459 return pseudo_kind.Get(txn, key)
2460 elif txn:
2461 return txn.Get(key)
2462 else:
2463 return self._Get(key)
2465 def Put(self, raw_entities, cost, transaction=None,
2466 trusted=False, calling_app=None):
2467 """Writes the given given entities.
2469 Updates an entity's key and entity_group in place if needed
2471 Args:
2472 raw_entities: A list of unverified entity_pb.EntityProto objects.
2473 cost: Out param. The cost of putting the provided entities.
2474 transaction: The datastore_pb.Transaction to use or None.
2475 trusted: If the calling app is trusted.
2476 calling_app: The app requesting the results or None to pull the app from
2477 the environment.
2478 Returns:
2479 A list of entity_pb.Reference objects that indicates where each entity
2480 was stored.
2482 if not raw_entities:
2483 return []
2485 calling_app = datastore_types.ResolveAppId(calling_app)
2488 result = [None] * len(raw_entities)
2489 grouped_entities = collections.defaultdict(list)
2490 for i, raw_entity in enumerate(raw_entities):
2491 CheckEntity(trusted, calling_app, raw_entity)
2495 entity = entity_pb.EntityProto()
2496 entity.CopyFrom(raw_entity)
2499 for prop in itertools.chain(entity.property_list(),
2500 entity.raw_property_list()):
2501 FillUser(prop)
2503 last_element = entity.key().path().element_list()[-1]
2504 if not (last_element.id() or last_element.has_name()):
2505 insert = True
2508 if self._auto_id_policy == SEQUENTIAL:
2509 last_element.set_id(self._AllocateSequentialIds(entity.key())[0])
2510 else:
2511 full_key = self._AllocateIds([entity.key()])[0]
2512 last_element.set_id(full_key.path().element_list()[-1].id())
2513 else:
2514 insert = False
2516 entity_group = _GetEntityGroup(entity.key())
2517 entity.mutable_entity_group().CopyFrom(entity_group.path())
2518 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2519 grouped_entities[entity_group_key].append((entity, insert))
2523 key = entity_pb.Reference()
2524 key.CopyFrom(entity.key())
2525 result[i] = key
2527 if transaction:
2529 txn = self.GetTxn(transaction, trusted, calling_app)
2530 for group in grouped_entities.values():
2531 for entity, insert in group:
2533 indexes = _FilterIndexesByKind(entity.key(), self.GetIndexes(
2534 entity.key().app(), trusted, calling_app))
2535 txn.Put(entity, insert, indexes)
2536 else:
2538 for entities in grouped_entities.itervalues():
2539 txn_cost = self._RunInTxn(
2540 entities, entities[0][0].key().app(),
2542 lambda txn, v: txn.Put(v[0], v[1], _FilterIndexesByKind(
2543 v[0].key(),
2544 self.GetIndexes(v[0].key().app(), trusted, calling_app))))
2545 _UpdateCost(cost, txn_cost.entity_writes(), txn_cost.index_writes())
2546 return result
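# Example (illustrative): non-transactional callers pass a datastore_pb.Cost
# out-parameter that accumulates the write cost of each internal transaction:
#
#   cost = datastore_pb.Cost()
#   keys = datastore.Put([entity_proto], cost)  # ids assigned per the policy
#   writes = cost.entity_writes() + cost.index_writes()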
2548 def Delete(self, raw_keys, cost, transaction=None,
2549 trusted=False, calling_app=None):
2550 """Deletes the entities associated with the given keys.
2552 Args:
2553 raw_keys: A list of unverified entity_pb.Reference objects.
2554 cost: Out param. The cost of deleting the provided entities.
2555 transaction: The datastore_pb.Transaction to use or None.
2556 trusted: If the calling app is trusted.
2557 calling_app: The app requesting the results or None to pull the app from
2558 the environment.
2560 if not raw_keys:
2561 return
2563 calling_app = datastore_types.ResolveAppId(calling_app)
2566 grouped_keys = collections.defaultdict(list)
2567 for key in raw_keys:
2568 CheckReference(trusted, calling_app, key)
2569 entity_group = _GetEntityGroup(key)
2570 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2571 grouped_keys[entity_group_key].append(key)
2573 if transaction:
2575 txn = self.GetTxn(transaction, trusted, calling_app)
2576 for key in raw_keys:
2578 indexes = _FilterIndexesByKind(key, self.GetIndexes(
2579 key.app(), trusted, calling_app))
2580 txn.Delete(key, indexes)
2581 else:
2583 for keys in grouped_keys.itervalues():
2585 txn_cost = self._RunInTxn(
2586 keys, keys[0].app(),
2587 lambda txn, key: txn.Delete(key, _FilterIndexesByKind(
2588 key, self.GetIndexes(key.app(), trusted, calling_app))))
2589 _UpdateCost(cost, txn_cost.entity_writes(), txn_cost.index_writes())
2591 def Touch(self, raw_keys, trusted=False, calling_app=None):
2592 """Applies all outstanding writes."""
2593 calling_app = datastore_types.ResolveAppId(calling_app)
2595 grouped_keys = collections.defaultdict(list)
2596 for key in raw_keys:
2597 CheckReference(trusted, calling_app, key)
2598 entity_group = _GetEntityGroup(key)
2599 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2600 grouped_keys[entity_group_key].append(key)
2602 for keys in grouped_keys.itervalues():
2603 self._RunInTxn(keys, keys[0].app(), lambda txn, key: None)
2605 def _RunInTxn(self, values, app, op):
2606 """Runs the given values in a separate Txn.
2608 Retries up to _RETRIES times on CONCURRENT_TRANSACTION errors.
2610 Args:
2611 values: A list of arguments to op.
2612 app: The app to create the Txn on.
2613 op: A function to run on each value in the Txn.
2615 Returns:
2616 The cost of the txn.
2618 retries = 0
2619 backoff = _INITIAL_RETRY_DELAY_MS / 1000.0
2620 while True:
2621 try:
2622 txn = self._BeginTransaction(app, False)
2623 for value in values:
2624 op(txn, value)
2625 return txn.Commit()
2626 except apiproxy_errors.ApplicationError, e:
2627 if e.application_error == datastore_pb.Error.CONCURRENT_TRANSACTION:
2629 retries += 1
2630 if retries <= _RETRIES:
2631 time.sleep(backoff)
2632 backoff *= _RETRY_DELAY_MULTIPLIER
2633 if backoff * 1000.0 > _MAX_RETRY_DELAY_MS:
2634 backoff = _MAX_RETRY_DELAY_MS / 1000.0
2635 continue
2636 raise
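# Worked example of the retry schedule above, using the module defaults
# (_RETRIES = 3, _INITIAL_RETRY_DELAY_MS = 100, _RETRY_DELAY_MULTIPLIER = 2):
# an op that keeps raising CONCURRENT_TRANSACTION sleeps 0.1s, 0.2s, and
# 0.4s between attempts, and the fourth failure is re-raised. The 120s
# _MAX_RETRY_DELAY_MS cap only matters for much larger retry counts.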
2638 def _CheckHasIndex(self, query, trusted=False, calling_app=None):
2639 """Checks if the query can be satisfied given the existing indexes.
2641 Args:
2642 query: the datastore_pb.Query to check
2643 trusted: True if the calling app is trusted (like dev_admin_console)
2644 calling_app: app_id of the current running application
2646 if query.kind() in self._pseudo_kinds or not self._require_indexes:
2647 return
2649 minimal_index = datastore_index.MinimalCompositeIndexForQuery(query,
2650 (datastore_index.ProtoToIndexDefinition(index)
2651 for index in self.GetIndexes(query.app(), trusted, calling_app)
2652 if index.state() == entity_pb.CompositeIndex.READ_WRITE))
2653 if minimal_index is not None:
2654 msg = ('This query requires a composite index that is not defined. '
2655 'You must update the index.yaml file in your application root.')
2656 is_most_efficient, kind, ancestor, properties = minimal_index
2657 if not is_most_efficient:
2659 yaml = datastore_index.IndexYamlForQuery(kind, ancestor,
2660 datastore_index.GetRecommendedIndexProperties(properties))
2661 msg += '\nThe following index is the minimum index required:\n' + yaml
2662 raise apiproxy_errors.ApplicationError(datastore_pb.Error.NEED_INDEX, msg)
2664 def SetAutoIdPolicy(self, auto_id_policy):
2665 """Set value of _auto_id_policy flag (default SEQUENTIAL).
2667 SEQUENTIAL auto ID assignment behavior will eventually be deprecated
2668 and the default will be SCATTERED.
2670 Args:
2671 auto_id_policy: string constant.
2672 Raises:
2673 TypeError: if auto_id_policy is not one of SEQUENTIAL or SCATTERED.
2675 valid_policies = (SEQUENTIAL, SCATTERED)
2676 if auto_id_policy not in valid_policies:
2677 raise TypeError('auto_id_policy must be in %s, found %s instead' %
2678 (valid_policies, auto_id_policy))
2679 self._auto_id_policy = auto_id_policy
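# Example (sketch): switching the id allocator so freshly inserted entities
# draw ids from the high, non-monotonic scattered range:
#
#   datastore.SetAutoIdPolicy(SCATTERED)
#   # ... puts of keys without ids now use scattered ids ...
#   datastore.SetAutoIdPolicy(SEQUENTIAL)  # restore the default policy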
2683 def Write(self):
2684 """Writes the datastore to disk."""
2685 self.Flush()
2687 def _GetQueryCursor(self, query, filters, orders, index_list,
2688 filter_predicate):
2689 """Runs the given datastore_pb.Query and returns a QueryCursor for it.
2691 This must be implemented by a sub-class. The sub-class does not need to
2692 enforce any consistency guarantees (and can just blindly read).
2694 Args:
2695 query: The datastore_pb.Query to run.
2696 filters: A list of filters that override the ones found on query.
2697 orders: A list of orders that override the ones found on query.
2698 index_list: A list of indexes used by the query.
2699 filter_predicate: an additional filter of type
2700 datastore_query.FilterPredicate. This is passed along to implement V4
2701 specific filters without changing the entire stub.
2703 Returns:
2704 A BaseCursor that can be used to fetch query results.
2706 raise NotImplementedError
2708 def _Get(self, reference):
2709 """Get the entity for the given reference or None.
2711 This must be implemented by a sub-class. The sub-class does not need to
2712 enforce any consistency guarantees (and can just blindly read).
2714 Args:
2715 reference: An entity_pb.Reference to look up.
2717 Returns:
2718 The entity_pb.EntityProto associated with the given reference or None.
2720 raise NotImplementedError
2722 def _AllocateSequentialIds(self, reference, size=1, max_id=None):
2723 """Allocate sequential ids for given reference.
2725 Args:
2726 reference: An entity_pb.Reference to allocate an id for.
2727 size: The size of the range to allocate.
2728 max_id: The upper bound of the range to allocate.
2730 Returns:
2731 A tuple containing (min, max) of the allocated range.
2733 raise NotImplementedError
2735 def _AllocateIds(self, references):
2736 """Allocate or reserves IDs for the v4 datastore API.
2738 Incomplete keys are allocated scattered IDs. Complete keys have every id in
2739 their paths reserved in the appropriate ID space.
2741 Args:
2742 references: a list of entity_pb.Reference objects to allocate or reserve
2744 Returns:
2745 a list of complete entity_pb.Reference objects corresponding to the
2746 incomplete keys in the input, with newly allocated ids.
2748 raise NotImplementedError
2751 def _NeedsIndexes(func):
2752 """A decorator for DatastoreStub methods that require or affect indexes.
2754 Updates indexes to match index.yaml before the call and updates index.yaml
2755 after the call if require_indexes is False. If root_path is not set, this is a
2756 no-op.
2759 def UpdateIndexesWrapper(self, *args, **kwargs):
2760 self._SetupIndexes()
2761 try:
2762 return func(self, *args, **kwargs)
2763 finally:
2764 self._UpdateIndexes()
2766 return UpdateIndexesWrapper
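# Example (sketch): stub methods that run queries or touch composite indexes
# are declared with this decorator, so the index configuration is read before
# and rewritten after every call:
#
#   @_NeedsIndexes
#   def _Dynamic_RunQuery(self, query, query_result):
#     ...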
2769 class EntityGroupPseudoKind(object):
2770 """A common implementation of get() for the __entity_group__ pseudo-kind.
2772 Public properties:
2773 name: the pseudo-kind name
2775 name = '__entity_group__'
2785 base_version = int(time.time() * 1e6)
2787 def Get(self, txn, key):
2788 """Fetch key of this pseudo-kind within txn.
2790 Args:
2791 txn: transaction within which Get occurs, may be None if this is an
2792 eventually consistent Get.
2793 key: key of pseudo-entity to Get.
2795 Returns:
2796 An entity for key, or None if it doesn't exist.
2799 if not txn:
2800 txn = self._stub._BeginTransaction(key.app(), False)
2801 try:
2802 return self.Get(txn, key)
2803 finally:
2804 txn.Rollback()
2807 if isinstance(txn._txn_manager._consistency_policy,
2808 MasterSlaveConsistencyPolicy):
2809 return None
2816 path = key.path()
2817 if path.element_size() != 2 or path.element_list()[-1].id() != 1:
2818 return None
2820 tracker = txn._GetTracker(key)
2821 tracker._GrabSnapshot(txn._txn_manager)
2823 eg = entity_pb.EntityProto()
2824 eg.mutable_key().CopyFrom(key)
2825 eg.mutable_entity_group().CopyFrom(_GetEntityGroup(key).path())
2826 version = entity_pb.Property()
2827 version.set_name('__version__')
2828 version.set_multiple(False)
2829 version.mutable_value().set_int64value(
2830 tracker._read_pos + self.base_version)
2831 eg.property_list().append(version)
2832 return eg
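# Example (illustrative; 'Parent' is a hypothetical kind): callers read an
# entity group's version by getting the pseudo-entity with id 1 nested
# directly under the group's root key:
#
#   from google.appengine.api import datastore
#   key = datastore.Key.from_path('Parent', 1, '__entity_group__', 1)
#   version = datastore.Get(key)['__version__']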
2834 def Query(self, query, filters, orders):
2835 """Perform a query on this pseudo-kind.
2837 Args:
2838 query: the original datastore_pb.Query.
2839 filters: the filters from query.
2840 orders: the orders from query.
2842 Returns:
2843 always raises an error
2847 raise apiproxy_errors.ApplicationError(
2848 datastore_pb.Error.BAD_REQUEST, 'queries not supported on ' + self.name)
2851 class _CachedIndexDefinitions(object):
2852 """Records definitions read from index configuration files for later reuse.
2854 If the names and modification times of the configuration files are unchanged,
2855 then the index configurations previously parsed out of those files can be
2856 reused.
2858 Attributes:
2859 file_names: a list of the names of the configuration files. This will have
2860 one element when the configuration is based on an index.yaml but may have
2861 more than one if it is based on datastore-indexes.xml and
2862 datastore-indexes-auto.xml.
2863 last_modifieds: a list of floats that are the modification times of the
2864 files in file_names.
2865 index_protos: a list of entity_pb.CompositeIndex objects corresponding to
2866 the index definitions read from file_names.
2869 def __init__(self, file_names, last_modifieds, index_protos):
2871 assert len(file_names) <= 1
2872 self.file_names = file_names
2873 self.last_modifieds = last_modifieds
2874 self.index_protos = index_protos
2877 class DatastoreStub(object):
2878 """A stub that maps datastore service calls on to a BaseDatastore.
2880 This class also keeps track of query cursors.
2883 def __init__(self,
2884 datastore,
2885 app_id=None,
2886 trusted=None,
2887 root_path=None):
2888 super(DatastoreStub, self).__init__()
2889 self._datastore = datastore
2890 self._app_id = datastore_types.ResolveAppId(app_id)
2891 self._trusted = trusted
2892 self._root_path = root_path
2893 self._xml_configuration = self._XmlConfiguration()
2896 self.__query_history = {}
2899 self.__query_ci_history = set()
2903 self._cached_index_definitions = _CachedIndexDefinitions([], [], None)
2905 if self._require_indexes or root_path is None:
2907 self._index_config_updater = None
2908 else:
2911 updater_class = (
2912 datastore_stub_index.DatastoreIndexesAutoXmlUpdater
2913 if self._xml_configuration else datastore_stub_index.IndexYamlUpdater)
2914 self._index_config_updater = updater_class(root_path)
2916 DatastoreStub.Clear(self)
2918 def _XmlConfiguration(self):
2919 """Return True if the app at self._root_path uses XML configuration files.
2921 An app uses XML configuration files if it has a WEB-INF subdirectory and it
2922 does not have an index.yaml at its root. We assume this even if it doesn't
2923 currently have any configuration files at all, because then we will want to
2924 create a new datastore-indexes-auto.xml rather than create a new index.yaml.
2926 Returns:
2927 True if the app uses XML configuration files, False otherwise.
2929 Raises:
2930 yaml_errors.AmbiguousConfigurationFiles: if there is both an index.yaml
2931 and either or both of the two possible XML configuration files.
2933 if not self._root_path:
2934 return False
2935 index_yaml = os.path.join(self._root_path, 'index.yaml')
2936 web_inf = os.path.join(self._root_path, 'WEB-INF')
2937 datastore_indexes_xml = os.path.join(web_inf, 'datastore-indexes.xml')
2938 datastore_indexes_auto_xml = os.path.join(
2939 web_inf, 'appengine-generated', 'datastore-indexes-auto.xml')
2940 existing = [
2941 f for f in [
2942 index_yaml, datastore_indexes_xml, datastore_indexes_auto_xml]
2943 if os.path.isfile(f)]
2944 if existing == [index_yaml]:
2945 return False
2946 elif index_yaml in existing:
2947 raise yaml_errors.AmbiguousConfigurationFiles(
2948 'App has both XML and YAML configuration files: %s' % existing)
2949 else:
2950 return os.path.isdir(web_inf)
2954 def Clear(self):
2955 """Clears out all stored values."""
2956 self._query_cursors = {}
2957 self.__query_history = {}
2958 self.__query_ci_history = set()
2960 def QueryHistory(self):
2961 """Returns a dict that maps Query PBs to times they've been run."""
2963 return dict((pb, times) for pb, times in self.__query_history.items()
2964 if pb.app() == self._app_id)
2966 def _QueryCompositeIndexHistoryLength(self):
2967 """Returns the length of the CompositeIndex set for query history."""
2968 return len(self.__query_ci_history)
2970 def SetTrusted(self, trusted):
2971 """Set/clear the trusted bit in the stub.
2973 This bit indicates that the app calling the stub is trusted. A
2974 trusted app can write to datastores of other apps.
2976 Args:
2977 trusted: boolean.
2979 self._trusted = trusted
2983 def _Dynamic_Get(self, req, res):
2986 transaction = req.has_transaction() and req.transaction() or None
2989 if req.allow_deferred() and req.key_size() > _MAXIMUM_RESULTS:
2993 keys_to_get = req.key_list()[-_MAXIMUM_RESULTS:]
2994 deferred_keys = req.key_list()[:-_MAXIMUM_RESULTS]
2995 res.deferred_list().extend(deferred_keys)
2996 else:
2998 keys_to_get = req.key_list()
3000 res.set_in_order(not req.allow_deferred())
3002 total_response_bytes = 0
3003 for index, entity in enumerate(self._datastore.Get(keys_to_get,
3004 transaction,
3005 req.has_failover_ms(),
3006 self._trusted,
3007 self._app_id)):
3008 entity_size = entity and entity.ByteSize() or 0
3011 if (req.allow_deferred()
3012 and index > 0
3013 and total_response_bytes + entity_size > _MAXIMUM_QUERY_RESULT_BYTES):
3015 res.deferred_list().extend(keys_to_get[index:])
3016 break
3017 elif entity:
3018 entity_result = res.add_entity()
3019 entity_result.mutable_entity().CopyFrom(entity)
3020 total_response_bytes += entity_size
3021 else:
3023 entity_result = res.add_entity()
3024 entity_result.mutable_key().CopyFrom(keys_to_get[index])
3026 def _Dynamic_Put(self, req, res):
3027 transaction = req.has_transaction() and req.transaction() or None
3028 res.key_list().extend(self._datastore.Put(req.entity_list(),
3029 res.mutable_cost(),
3030 transaction,
3031 self._trusted, self._app_id))
3033 def _Dynamic_Delete(self, req, res):
3034 transaction = req.has_transaction() and req.transaction() or None
3035 self._datastore.Delete(req.key_list(), res.mutable_cost(), transaction,
3036 self._trusted, self._app_id)
3038 def _Dynamic_Touch(self, req, _):
3039 self._datastore.Touch(req.key_list(), self._trusted, self._app_id)
3041 @_NeedsIndexes
3042 def _Dynamic_RunQuery(self, query, query_result, filter_predicate=None):
3043 self.__UpgradeCursors(query)
3044 cursor = self._datastore.GetQueryCursor(query, self._trusted, self._app_id,
3045 filter_predicate)
3047 if query.has_count():
3048 count = query.count()
3049 elif query.has_limit():
3050 count = query.limit()
3051 else:
3052 count = self._BATCH_SIZE
3054 cursor.PopulateQueryResult(query_result, count, query.offset(),
3055 query.compile(), first_result=True)
3056 if query_result.has_cursor():
3057 self._query_cursors[query_result.cursor().cursor()] = cursor
3060 if query.compile():
3063 compiled_query = query_result.mutable_compiled_query()
3064 compiled_query.set_keys_only(query.keys_only())
3065 compiled_query.mutable_primaryscan().set_index_name(query.Encode())
3066 self.__UpdateQueryHistory(query)
3068 def __UpgradeCursors(self, query):
3069 """Upgrades compiled cursors in place.
3071 If the cursor position does not specify before_ascending, populate it.
3072 If before_ascending is already populated, use it and the sort direction
3073 from the query to set an appropriate value for start_inclusive.
3075 Args:
3076 query: datastore_pb.Query
3078 first_sort_direction = None
3079 if query.order_list():
3080 first_sort_direction = query.order(0).direction()
3082 for compiled_cursor in [query.compiled_cursor(),
3083 query.end_compiled_cursor()]:
3084 self.__UpgradeCursor(compiled_cursor, first_sort_direction)
3086 def __UpgradeCursor(self, compiled_cursor, first_sort_direction):
3087 """Upgrades a compiled cursor in place.
3089 If the cursor position does not specify before_ascending, populate it.
3090 If before_ascending is already populated, use it and the provided direction
3091 to set an appropriate value for start_inclusive.
3093 Args:
3094 compiled_cursor: datastore_pb.CompiledCursor
3095 first_sort_direction: first sort direction from the query or None
3099 if not self.__IsPlannable(compiled_cursor):
3100 return
3101 elif compiled_cursor.position().has_before_ascending():
3102 _SetStartInclusive(compiled_cursor.position(), first_sort_direction)
3103 elif compiled_cursor.position().has_start_inclusive():
3104 _SetBeforeAscending(compiled_cursor.position(), first_sort_direction)
3106 def __IsPlannable(self, compiled_cursor):
3107 """Returns True if compiled_cursor is plannable.
3109 Args:
3110 compiled_cursor: datastore_pb.CompiledCursor
3112 position = compiled_cursor.position()
3113 return position.has_key() or position.indexvalue_list()
3115 def __UpdateQueryHistory(self, query):
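"""Records a normalized copy of the query in the query history."""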
3117 clone = datastore_pb.Query()
3118 clone.CopyFrom(query)
3119 clone.clear_hint()
3120 clone.clear_limit()
3121 clone.clear_offset()
3122 clone.clear_count()
3123 if clone in self.__query_history:
3124 self.__query_history[clone] += 1
3125 else:
3126 self.__query_history[clone] = 1
3127 if clone.app() == self._app_id:
3128 self.__query_ci_history.add(
3129 datastore_index.CompositeIndexForQuery(clone))
3131 def _Dynamic_Next(self, next_request, query_result):
3132 app = next_request.cursor().app()
3133 CheckAppId(self._trusted, self._app_id, app)
3135 cursor = self._query_cursors.get(next_request.cursor().cursor())
3136 Check(cursor and cursor.app == app,
3137 'Cursor %d not found' % next_request.cursor().cursor())
3139 count = self._BATCH_SIZE
3140 if next_request.has_count():
3141 count = next_request.count()
3143 cursor.PopulateQueryResult(query_result, count, next_request.offset(),
3144 next_request.compile(), first_result=False)
3146 if not query_result.has_cursor():
3147 del self._query_cursors[next_request.cursor().cursor()]
3149 def _Dynamic_AddActions(self, request, _):
3150 """Associates the creation of one or more tasks with a transaction.
3152 Args:
3153 request: A taskqueue_service_pb.TaskQueueBulkAddRequest containing the
3154 tasks that should be created when the transaction is committed.
3160 if not request.add_request_list():
3161 return
3163 transaction = request.add_request_list()[0].transaction()
3164 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3165 new_actions = []
3166 for add_request in request.add_request_list():
3170 Check(add_request.transaction() == transaction,
3171 'Cannot add requests to different transactions')
3172 clone = taskqueue_service_pb.TaskQueueAddRequest()
3173 clone.CopyFrom(add_request)
3174 clone.clear_transaction()
3175 new_actions.append(clone)
3177 txn.AddActions(new_actions, self._MAX_ACTIONS_PER_TXN)
3179 def _Dynamic_BeginTransaction(self, req, transaction):
3180 CheckAppId(self._trusted, self._app_id, req.app())
3181 transaction.CopyFrom(self._datastore.BeginTransaction(
3182 req.app(), req.allow_multiple_eg()))
3184 def _Dynamic_Commit(self, transaction, res):
3185 CheckAppId(self._trusted, self._app_id, transaction.app())
3186 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3187 res.mutable_cost().CopyFrom(txn.Commit())
3189 def _Dynamic_Rollback(self, transaction, _):
3190 CheckAppId(self._trusted, self._app_id, transaction.app())
3191 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3192 txn.Rollback()
3194 def _Dynamic_CreateIndex(self, index, id_response):
3195 id_response.set_value(self._datastore.CreateIndex(index,
3196 self._trusted,
3197 self._app_id))
3199 @_NeedsIndexes
3200 def _Dynamic_GetIndices(self, app_str, composite_indices):
3201 composite_indices.index_list().extend(self._datastore.GetIndexes(
3202 app_str.value(), self._trusted, self._app_id))
3204 def _Dynamic_UpdateIndex(self, index, _):
3205 self._datastore.UpdateIndex(index, self._trusted, self._app_id)
3207 def _Dynamic_DeleteIndex(self, index, _):
3208 self._datastore.DeleteIndex(index, self._trusted, self._app_id)
3210 def _Dynamic_AllocateIds(self, allocate_ids_request, allocate_ids_response):
3211 Check(not allocate_ids_request.has_model_key()
3212 or not allocate_ids_request.reserve_list(),
3213 'Cannot allocate and reserve IDs in the same request')
3214 if allocate_ids_request.reserve_list():
3215 Check(not allocate_ids_request.has_size(),
3216 'Cannot specify size when reserving IDs')
3217 Check(not allocate_ids_request.has_max(),
3218 'Cannot specify max when reserving IDs')
3220 if allocate_ids_request.has_model_key():
3221 CheckAppId(self._trusted, self._app_id,
3222 allocate_ids_request.model_key().app())
3224 reference = allocate_ids_request.model_key()
3226 (start, end) = self._datastore._AllocateSequentialIds(
3227 reference, allocate_ids_request.size(), allocate_ids_request.max())
3229 allocate_ids_response.set_start(start)
3230 allocate_ids_response.set_end(end)
3231 else:
3232 for reference in allocate_ids_request.reserve_list():
3233 CheckAppId(self._trusted, self._app_id, reference.app())
3234 self._datastore._AllocateIds(allocate_ids_request.reserve_list())
3235 allocate_ids_response.set_start(0)
3236 allocate_ids_response.set_end(0)
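# Example (sketch of the sequential mode): a request with a model key and a
# size yields a contiguous block [start, end]; a request with reserve keys
# instead marks those ids as used and returns start = end = 0:
#
#   req = datastore_pb.AllocateIdsRequest()
#   req.mutable_model_key().CopyFrom(reference)  # any key in the id space
#   req.set_size(10)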
3238 def _SetupIndexes(self, _open=open):
3239 """Ensure that the set of existing composite indexes matches index.yaml.
3241 Note: this is similar to the algorithm used by the admin console for
3242 the same purpose.
3247 if not self._root_path:
3248 return
3249 file_names = [os.path.join(self._root_path, 'index.yaml')]
3250 file_mtimes = [os.path.getmtime(f) for f in file_names if os.path.exists(f)]
3251 if (self._cached_index_definitions.file_names == file_names and
3252 all(os.path.exists(f) for f in file_names) and
3253 self._cached_index_definitions.last_modifieds == file_mtimes):
3254 requested_indexes = self._cached_index_definitions.index_protos
3255 else:
3256 file_mtimes = []
3257 index_texts = []
3258 for file_name in file_names:
3259 try:
3260 file_mtimes.append(os.path.getmtime(file_name))
3261 with _open(file_name, 'r') as fh:
3262 index_texts.append(fh.read())
3263 except (OSError, IOError):
3264 pass
3266 requested_indexes = []
3267 if len(index_texts) == len(file_names):
3268 all_ok = True
3269 for index_text in index_texts:
3271 index_defs = datastore_index.ParseIndexDefinitions(index_text)
3272 if index_defs is None or index_defs.indexes is None:
3273 all_ok = False
3274 else:
3276 requested_indexes.extend(
3277 datastore_index.IndexDefinitionsToProtos(
3278 self._app_id, index_defs.indexes))
3279 if all_ok:
3280 self._cached_index_definitions = _CachedIndexDefinitions(
3281 file_names, file_mtimes, requested_indexes)
3284 existing_indexes = self._datastore.GetIndexes(
3285 self._app_id, self._trusted, self._app_id)
3288 requested = dict((x.definition().Encode(), x) for x in requested_indexes)
3289 existing = dict((x.definition().Encode(), x) for x in existing_indexes)
3292 created = 0
3293 for key, index in requested.iteritems():
3294 if key not in existing:
3295 new_index = entity_pb.CompositeIndex()
3296 new_index.CopyFrom(index)
3297 new_index.set_id(datastore_admin.CreateIndex(new_index))
3298 new_index.set_state(entity_pb.CompositeIndex.READ_WRITE)
3299 datastore_admin.UpdateIndex(new_index)
3300 created += 1
3303 deleted = 0
3304 for key, index in existing.iteritems():
3305 if key not in requested:
3306 datastore_admin.DeleteIndex(index)
3307 deleted += 1
3310 if created or deleted:
3311 logging.debug('Created %d and deleted %d index(es); total %d',
3312 created, deleted, len(requested))
3314 def _UpdateIndexes(self):
3315 if self._index_config_updater is not None:
3316 self._index_config_updater.UpdateIndexConfig()
3319 class StubQueryConverter(object):
3320 """Converter for v3 and v4 queries suitable for use in stubs."""
3322 def __init__(self, entity_converter):
3323 self._entity_converter = entity_converter
3325 def v4_to_v3_compiled_cursor(self, v4_cursor, v3_compiled_cursor):
3326 """Converts a v4 cursor string to a v3 CompiledCursor.
3328 Args:
3329 v4_cursor: a string representing a v4 query cursor
3330 v3_compiled_cursor: a datastore_pb.CompiledCursor to populate
3332 v3_compiled_cursor.Clear()
3333 try:
3334 v3_compiled_cursor.ParseFromString(v4_cursor)
3335 except ProtocolBuffer.ProtocolBufferDecodeError:
3336 raise datastore_pbs.InvalidConversionError('Invalid query cursor.')
3338 def v3_to_v4_compiled_cursor(self, v3_compiled_cursor):
3339 """Converts a v3 CompiledCursor to a v4 cursor string.
3341 Args:
3342 v3_compiled_cursor: a datastore_pb.CompiledCursor
3344 Returns:
3345 a string representing a v4 query cursor
3347 return v3_compiled_cursor.SerializeToString()
3349 def v4_to_v3_query(self, v4_partition_id, v4_query, v3_query):
3350 """Converts a v4 Query to a v3 Query.
3352 Args:
3353 v4_partition_id: a datastore_v4_pb.PartitionId
3354 v4_query: a datastore_v4_pb.Query
3355 v3_query: a datastore_pb.Query to populate
3357 Raises:
3358 InvalidConversionError if the query cannot be converted
3360 v3_query.Clear()
3362 if v4_partition_id.dataset_id():
3363 v3_query.set_app(v4_partition_id.dataset_id())
3364 if v4_partition_id.has_namespace():
3365 v3_query.set_name_space(v4_partition_id.namespace())
3367 v3_query.set_persist_offset(True)
3368 v3_query.set_require_perfect_plan(True)
3369 v3_query.set_compile(True)
3372 if v4_query.has_limit():
3373 v3_query.set_limit(v4_query.limit())
3374 if v4_query.offset():
3375 v3_query.set_offset(v4_query.offset())
3376 if v4_query.has_start_cursor():
3377 self.v4_to_v3_compiled_cursor(v4_query.start_cursor(),
3378 v3_query.mutable_compiled_cursor())
3379 if v4_query.has_end_cursor():
3380 self.v4_to_v3_compiled_cursor(v4_query.end_cursor(),
3381 v3_query.mutable_end_compiled_cursor())
3384 if v4_query.kind_list():
3385 datastore_pbs.check_conversion(len(v4_query.kind_list()) == 1,
3386 'multiple kinds not supported')
3387 v3_query.set_kind(v4_query.kind(0).name())
3390 has_key_projection = False
3391 for prop in v4_query.projection_list():
3392 if prop.property().name() == datastore_pbs.PROPERTY_NAME_KEY:
3393 has_key_projection = True
3394 else:
3395 v3_query.add_property_name(prop.property().name())
3396 if has_key_projection and not v3_query.property_name_list():
3397 v3_query.set_keys_only(True)
3400 for prop in v4_query.group_by_list():
3401 v3_query.add_group_by_property_name(prop.name())
3404 self.__populate_v3_filters(v4_query.filter(), v3_query)
3407 for v4_order in v4_query.order_list():
3408 v3_order = v3_query.add_order()
3409 v3_order.set_property(v4_order.property().name())
3410 if v4_order.has_direction():
3411 v3_order.set_direction(v4_order.direction())
3413 def v3_to_v4_query(self, v3_query, v4_query):
3414 """Converts a v3 Query to a v4 Query.
3416 Args:
3417 v3_query: a datastore_pb.Query
3418 v4_query: a datastore_v4_pb.Query to populate
3420 Raises:
3421 InvalidConversionError if the query cannot be converted
3423 v4_query.Clear()
3425 datastore_pbs.check_conversion(not v3_query.has_distinct(),
3426 'distinct option not supported')
3427 datastore_pbs.check_conversion(v3_query.require_perfect_plan(),
3428 'non-perfect plans not supported')
3432 if v3_query.has_limit():
3433 v4_query.set_limit(v3_query.limit())
3434 if v3_query.offset():
3435 v4_query.set_offset(v3_query.offset())
3436 if v3_query.has_compiled_cursor():
3437 v4_query.set_start_cursor(
3438 self.v3_to_v4_compiled_cursor(v3_query.compiled_cursor()))
3439 if v3_query.has_end_compiled_cursor():
3440 v4_query.set_end_cursor(
3441 self.v3_to_v4_compiled_cursor(v3_query.end_compiled_cursor()))
3444 if v3_query.has_kind():
3445 v4_query.add_kind().set_name(v3_query.kind())
3448 for name in v3_query.property_name_list():
3449 v4_query.add_projection().mutable_property().set_name(name)
3450 if v3_query.keys_only():
3451 v4_query.add_projection().mutable_property().set_name(
3452 datastore_pbs.PROPERTY_NAME_KEY)
3455 for name in v3_query.group_by_property_name_list():
3456 v4_query.add_group_by().set_name(name)
3459 num_v4_filters = len(v3_query.filter_list())
3460 if v3_query.has_ancestor():
3461 num_v4_filters += 1
3463 if num_v4_filters == 1:
3464 get_property_filter = self.__get_property_filter
3465 elif num_v4_filters >= 2:
3466 v4_query.mutable_filter().mutable_composite_filter().set_operator(
3467 datastore_v4_pb.CompositeFilter.AND)
3468 get_property_filter = self.__add_property_filter
3470 if v3_query.has_ancestor():
3471 self.__v3_query_to_v4_ancestor_filter(v3_query,
3472 get_property_filter(v4_query))
3473 for v3_filter in v3_query.filter_list():
3474 self.__v3_filter_to_v4_property_filter(v3_filter,
3475 get_property_filter(v4_query))
3478 for v3_order in v3_query.order_list():
3479 v4_order = v4_query.add_order()
3480 v4_order.mutable_property().set_name(v3_order.property())
3481 if v3_order.has_direction():
3482 v4_order.set_direction(v3_order.direction())
3484 def __get_property_filter(self, v4_query):
3485 """Returns the PropertyFilter from the query's top-level filter."""
3486 return v4_query.mutable_filter().mutable_property_filter()
3488 def __add_property_filter(self, v4_query):
3489 """Adds and returns a PropertyFilter from the query's composite filter."""
3490 v4_comp_filter = v4_query.mutable_filter().mutable_composite_filter()
3491 return v4_comp_filter.add_filter().mutable_property_filter()
3493 def __populate_v3_filters(self, v4_filter, v3_query):
3494 """Populates a filters for a v3 Query.
3496 Args:
3497 v4_filter: a datastore_v4_pb.Filter
3498 v3_query: a datastore_pb.Query to populate with filters
3501 datastore_pbs.check_conversion(not v4_filter.has_bounding_circle_filter(),
3502 'bounding circle filter not supported')
3503 datastore_pbs.check_conversion(not v4_filter.has_bounding_box_filter(),
3504 'bounding box filter not supported')
3506 if v4_filter.has_property_filter():
3507 v4_property_filter = v4_filter.property_filter()
3508 if (v4_property_filter.operator()
3509 == datastore_v4_pb.PropertyFilter.HAS_ANCESTOR):
3510 datastore_pbs.check_conversion(
3511 v4_property_filter.value().has_key_value(),
3512 'HAS_ANCESTOR requires a reference value')
3513 datastore_pbs.check_conversion((v4_property_filter.property().name()
3514 == datastore_pbs.PROPERTY_NAME_KEY),
3515 'unsupported property')
3516 datastore_pbs.check_conversion(not v3_query.has_ancestor(),
3517 'duplicate ancestor constraint')
3518 self._entity_converter.v4_to_v3_reference(
3519 v4_property_filter.value().key_value(),
3520 v3_query.mutable_ancestor())
3521 else:
3522 v3_filter = v3_query.add_filter()
3523 property_name = v4_property_filter.property().name()
3524 v3_filter.set_op(v4_property_filter.operator())
3525 datastore_pbs.check_conversion(
3526 not v4_property_filter.value().list_value_list(),
3527 ('unsupported value type, %s, in property filter'
3528 ' on "%s"' % ('list_value', property_name)))
3529 prop = v3_filter.add_property()
3530 prop.set_multiple(False)
3531 prop.set_name(property_name)
3532 self._entity_converter.v4_value_to_v3_property_value(
3533 v4_property_filter.value(), prop.mutable_value())
3534 elif v4_filter.has_composite_filter():
3535 datastore_pbs.check_conversion((v4_filter.composite_filter().operator()
3536 == datastore_v4_pb.CompositeFilter.AND),
3537 'unsupported composite property operator')
3538 for v4_sub_filter in v4_filter.composite_filter().filter_list():
3539 self.__populate_v3_filters(v4_sub_filter, v3_query)
3541 def __v3_filter_to_v4_property_filter(self, v3_filter, v4_property_filter):
3542 """Converts a v3 Filter to a v4 PropertyFilter.
3544 Args:
3545 v3_filter: a datastore_pb.Filter
3546 v4_property_filter: a datastore_v4_pb.PropertyFilter to populate
3548 Raises:
3549 InvalidConversionError if the filter cannot be converted
3551 datastore_pbs.check_conversion(v3_filter.property_size() == 1,
3552 'invalid filter')
3553 datastore_pbs.check_conversion(v3_filter.op() <= 5,
3554 'unsupported filter op: %d' % v3_filter.op())
3555 v4_property_filter.Clear()
3556 v4_property_filter.set_operator(v3_filter.op())
3557 v4_property_filter.mutable_property().set_name(v3_filter.property(0).name())
3558 self._entity_converter.v3_property_to_v4_value(
3559 v3_filter.property(0), True, v4_property_filter.mutable_value())
3561 def __v3_query_to_v4_ancestor_filter(self, v3_query, v4_property_filter):
3562 """Converts a v3 Query to a v4 ancestor PropertyFilter.
3564 Args:
3565 v3_query: a datastore_pb.Query
3566 v4_property_filter: a datastore_v4_pb.PropertyFilter to populate
3568 v4_property_filter.Clear()
3569 v4_property_filter.set_operator(
3570 datastore_v4_pb.PropertyFilter.HAS_ANCESTOR)
3571 prop = v4_property_filter.mutable_property()
3572 prop.set_name(datastore_pbs.PROPERTY_NAME_KEY)
3573 self._entity_converter.v3_to_v4_key(
3574 v3_query.ancestor(),
3575 v4_property_filter.mutable_value().mutable_key_value())
3579 __query_converter = StubQueryConverter(datastore_pbs.get_entity_converter())
3582 def get_query_converter():
3583 """Returns a converter for v3 and v4 queries (not suitable for production).
3585 This converter is suitable for use in stubs but not for production.
3587 Returns:
3588 a StubQueryConverter
3590 return __query_converter
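# Example (illustrative): converting a v4 query into the v3 representation
# the stubs execute; `v4_partition_id` and `v4_query` are assumed inputs:
#
#   converter = get_query_converter()
#   v3_query = datastore_pb.Query()
#   converter.v4_to_v3_query(v4_partition_id, v4_query, v3_query)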
3593 class StubServiceConverter(object):
3594 """Converter for v3/v4 request/response protos suitable for use in stubs."""
3596 def __init__(self, entity_converter, query_converter):
3597 self._entity_converter = entity_converter
3598 self._query_converter = query_converter
3600 def v4_to_v3_cursor(self, v4_query_handle, v3_cursor):
3601 """Converts a v4 cursor string to a v3 Cursor.
3603 Args:
3604 v4_query_handle: a string representing a v4 query handle
3605 v3_cursor: a datastore_pb.Cursor to populate
3607 try:
3608 v3_cursor.ParseFromString(v4_query_handle)
3609 except ProtocolBuffer.ProtocolBufferDecodeError:
3610 raise datastore_pbs.InvalidConversionError('Invalid query handle.')
3611 return v3_cursor
3613 def _v3_to_v4_query_handle(self, v3_cursor):
3614 """Converts a v3 Cursor to a v4 query handle string.
3616 Args:
3617 v3_cursor: a datastore_pb.Cursor
3619 Returns:
3620 a string representing a v4 cursor
3622 return v3_cursor.SerializeToString()
3624 def v4_to_v3_txn(self, v4_txn, v3_txn):
3625 """Converts a v4 transaction string to a v3 Transaction.
3627 Args:
3628 v4_txn: a string representing a v4 transaction
3629 v3_txn: a datastore_pb.Transaction to populate
3631 try:
3632 v3_txn.ParseFromString(v4_txn)
3633 except ProtocolBuffer.ProtocolBufferDecodeError:
3634 raise datastore_pbs.InvalidConversionError('Invalid transaction.')
3635 return v3_txn
3637 def _v3_to_v4_txn(self, v3_txn):
3638 """Converts a v3 Transaction to a v4 transaction string.
3640 Args:
3641 v3_txn: a datastore_pb.Transaction
3643 Returns:
3644 a string representing a v4 transaction
3646 return v3_txn.SerializeToString()
3651 def v4_to_v3_begin_transaction_req(self, app_id, v4_req):
3652 """Converts a v4 BeginTransactionRequest to a v3 BeginTransactionRequest.
3654 Args:
3655 app_id: app id
3656 v4_req: a datastore_v4_pb.BeginTransactionRequest
3658 Returns:
3659 a datastore_pb.BeginTransactionRequest
3661 v3_req = datastore_pb.BeginTransactionRequest()
3662 v3_req.set_app(app_id)
3663 v3_req.set_allow_multiple_eg(v4_req.cross_group())
3664 return v3_req
3666 def v3_to_v4_begin_transaction_resp(self, v3_resp):
3667 """Converts a v3 Transaction to a v4 BeginTransactionResponse.
3669 Args:
3670 v3_resp: a datastore_pb.Transaction
3672 Returns:
3673 a datastore_v4_pb.BeginTransactionResponse
3675 v4_resp = datastore_v4_pb.BeginTransactionResponse()
3676 v4_resp.set_transaction(self._v3_to_v4_txn(v3_resp))
3677 return v4_resp
3682 def v4_rollback_req_to_v3_txn(self, v4_req):
3683 """Converts a v4 RollbackRequest to a v3 Transaction.
3685 Args:
3686 v4_req: a datastore_v4_pb.RollbackRequest
3688 Returns:
3689 a datastore_pb.Transaction
3691 v3_txn = datastore_pb.Transaction()
3692 self.v4_to_v3_txn(v4_req.transaction(), v3_txn)
3693 return v3_txn
3698 def v4_commit_req_to_v3_txn(self, v4_req):
3699 """Converts a v4 CommitRequest to a v3 Transaction.
3701 Args:
3702 v4_req: a datastore_v4_pb.CommitRequest
3704 Returns:
3705 a datastore_pb.Transaction
3707 v3_txn = datastore_pb.Transaction()
3708 self.v4_to_v3_txn(v4_req.transaction(), v3_txn)
3709 return v3_txn
3714 def v4_run_query_req_to_v3_query(self, v4_req):
3715 """Converts a v4 RunQueryRequest to a v3 Query.
3717 GQL is not supported.
3719 Args:
3720 v4_req: a datastore_v4_pb.RunQueryRequest
3722 Returns:
3723 a datastore_pb.Query
3726 datastore_pbs.check_conversion(not v4_req.has_gql_query(),
3727 'GQL not supported')
3728 v3_query = datastore_pb.Query()
3729 self._query_converter.v4_to_v3_query(v4_req.partition_id(), v4_req.query(),
3730 v3_query)
3733 if v4_req.has_suggested_batch_size():
3734 v3_query.set_count(v4_req.suggested_batch_size())
3737 read_options = v4_req.read_options()
3738 if read_options.has_transaction():
3739 self.v4_to_v3_txn(read_options.transaction(),
3740 v3_query.mutable_transaction())
3741 elif (read_options.read_consistency()
3742 == datastore_v4_pb.ReadOptions.EVENTUAL):
3743 v3_query.set_strong(False)
3744 v3_query.set_failover_ms(-1)
3745 elif read_options.read_consistency() == datastore_v4_pb.ReadOptions.STRONG:
3746 v3_query.set_strong(True)
3748 if v4_req.has_min_safe_time_seconds():
3749 v3_query.set_min_safe_time_seconds(v4_req.min_safe_time_seconds())
3751 return v3_query
3753 def v3_to_v4_run_query_req(self, v3_req):
3754 """Converts a v3 Query to a v4 RunQueryRequest.
3756 Args:
3757 v3_req: a datastore_pb.Query
3759 Returns:
3760 a datastore_v4_pb.RunQueryRequest
3762 v4_req = datastore_v4_pb.RunQueryRequest()
3765 v4_partition_id = v4_req.mutable_partition_id()
3766 v4_partition_id.set_dataset_id(v3_req.app())
3767 if v3_req.name_space():
3768 v4_partition_id.set_namespace(v3_req.name_space())
3771 if v3_req.has_count():
3772 v4_req.set_suggested_batch_size(v3_req.count())
3775 if v3_req.has_transaction():
3776 v4_req.mutable_read_options().set_transaction(
3777 self._v3_to_v4_txn(v3_req.transaction()))
3778 elif v3_req.strong():
3779 v4_req.mutable_read_options().set_read_consistency(
3780 datastore_v4_pb.ReadOptions.STRONG)
3781 elif v3_req.has_strong():
3782 v4_req.mutable_read_options().set_read_consistency(
3783 datastore_v4_pb.ReadOptions.EVENTUAL)
3784 if v3_req.has_min_safe_time_seconds():
3785 v4_req.set_min_safe_time_seconds(v3_req.min_safe_time_seconds())
3787 self._query_converter.v3_to_v4_query(v3_req, v4_req.mutable_query())
3789 return v4_req
3791 def v4_run_query_resp_to_v3_query_result(self, v4_resp):
3792 """Converts a V4 RunQueryResponse to a v3 QueryResult.
3794 Args:
3795 v4_resp: a datastore_v4_pb.QueryResult
3797 Returns:
3798 a datastore_pb.QueryResult
3800 v3_resp = self.v4_to_v3_query_result(v4_resp.batch())
3803 if v4_resp.has_query_handle():
3804 self.v4_to_v3_cursor(v4_resp.query_handle(), v3_resp.mutable_cursor())
3806 return v3_resp
3808 def v3_to_v4_run_query_resp(self, v3_resp):
3809 """Converts a v3 QueryResult to a V4 RunQueryResponse.
3811 Args:
3812 v3_resp: a datastore_pb.QueryResult
3814 Returns:
3815 a datastore_v4_pb.RunQueryResponse
3817 v4_resp = datastore_v4_pb.RunQueryResponse()
3818 self.v3_to_v4_query_result_batch(v3_resp, v4_resp.mutable_batch())
3820 if v3_resp.has_cursor():
3821 v4_resp.set_query_handle(
3822 self._query_converter.v3_to_v4_compiled_cursor(v3_resp.cursor()))
3824 return v4_resp
3829 def v4_to_v3_next_req(self, v4_req):
3830 """Converts a v4 ContinueQueryRequest to a v3 NextRequest.
3832 Args:
3833 v4_req: a datastore_v4_pb.ContinueQueryRequest
3835 Returns:
3836 a datastore_pb.NextRequest
3837 """
3838 v3_req = datastore_pb.NextRequest()
3839 v3_req.set_compile(True)
3840 self.v4_to_v3_cursor(v4_req.query_handle(), v3_req.mutable_cursor())
3841 return v3_req
3843 def v3_to_v4_continue_query_resp(self, v3_resp):
3844 """Converts a v3 QueryResult to a v4 ContinueQueryResponse.
3846 Args:
3847 v3_resp: a datastore_pb.QueryResult
3849 Returns:
3850 a datastore_v4_pb.ContinueQueryResponse
3851 """
3852 v4_resp = datastore_v4_pb.ContinueQueryResponse()
3853 self.v3_to_v4_query_result_batch(v3_resp, v4_resp.mutable_batch())
3854 return v4_resp
3859 def v4_to_v3_get_req(self, v4_req):
3860 """Converts a v4 LookupRequest to a v3 GetRequest.
3862 Args:
3863 v4_req: a datastore_v4_pb.LookupRequest
3865 Returns:
3866 a datastore_pb.GetRequest
3867 """
3868 v3_req = datastore_pb.GetRequest()
3869 v3_req.set_allow_deferred(True)
3872 if v4_req.read_options().has_transaction():
3873 self.v4_to_v3_txn(v4_req.read_options().transaction(),
3874 v3_req.mutable_transaction())
3875 elif (v4_req.read_options().read_consistency()
3876 == datastore_v4_pb.ReadOptions.EVENTUAL):
3877 v3_req.set_strong(False)
3878 v3_req.set_failover_ms(-1)
3879 elif (v4_req.read_options().read_consistency()
3880 == datastore_v4_pb.ReadOptions.STRONG):
3881 v3_req.set_strong(True)
3883 for v4_key in v4_req.key_list():
3884 self._entity_converter.v4_to_v3_reference(v4_key, v3_req.add_key())
3886 return v3_req
3888 def v3_to_v4_lookup_resp(self, v3_resp):
3889 """Converts a v3 GetResponse to a v4 LookupResponse.
3891 Args:
3892 v3_resp: a datastore_pb.GetResponse
3894 Returns:
3895 a datastore_v4_pb.LookupResponse
3896 """
3897 v4_resp = datastore_v4_pb.LookupResponse()
3899 for v3_ref in v3_resp.deferred_list():
3900 self._entity_converter.v3_to_v4_key(v3_ref, v4_resp.add_deferred())
3901 for v3_entity in v3_resp.entity_list():
3902 if v3_entity.has_entity():
3903 self._entity_converter.v3_to_v4_entity(
3904 v3_entity.entity(),
3905 v4_resp.add_found().mutable_entity())
3906 if v3_entity.has_key():
3907 self._entity_converter.v3_to_v4_key(
3908 v3_entity.key(),
3909 v4_resp.add_missing().mutable_entity().mutable_key())
3911 return v4_resp
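# --- Usage sketch (editorial addition, not part of the original source) ---
# A v3 GetResponse entry may carry an entity (hit), only a key (miss), or
# sit in deferred_list (not attempted); the loops above route these to the
# v4 found/missing/deferred lists. A hypothetical caller ('converter' is
# assumed to be a StubServiceConverter):
#
#   v4_resp = converter.v3_to_v4_lookup_resp(v3_resp)
#   hits = [result.entity() for result in v4_resp.found_list()]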
3913 def v4_to_v3_query_result(self, v4_batch):
3914 """Converts a v4 QueryResultBatch to a v3 QueryResult.
3916 Args:
3917 v4_batch: a datastore_v4_pb.QueryResultBatch
3919 Returns:
3920 a datastore_pb.QueryResult
3921 """
3922 v3_result = datastore_pb.QueryResult()
3925 v3_result.set_more_results(
3926 (v4_batch.more_results()
3927 == datastore_v4_pb.QueryResultBatch.NOT_FINISHED))
3928 if v4_batch.has_end_cursor():
3929 self._query_converter.v4_to_v3_compiled_cursor(
3930 v4_batch.end_cursor(), v3_result.mutable_compiled_cursor())
3931 if v4_batch.has_skipped_cursor():
3932 self._query_converter.v4_to_v3_compiled_cursor(
3933 v4_batch.skipped_cursor(),
3934 v3_result.mutable_skipped_results_compiled_cursor())
3937 if v4_batch.entity_result_type() == datastore_v4_pb.EntityResult.PROJECTION:
3938 v3_result.set_index_only(True)
3939 elif v4_batch.entity_result_type() == datastore_v4_pb.EntityResult.KEY_ONLY:
3940 v3_result.set_keys_only(True)
3943 if v4_batch.has_skipped_results():
3944 v3_result.set_skipped_results(v4_batch.skipped_results())
3945 for v4_entity in v4_batch.entity_result_list():
3946 v3_entity = v3_result.add_result()
3947 self._entity_converter.v4_to_v3_entity(v4_entity.entity(), v3_entity)
3948 if v4_entity.has_cursor():
3949 cursor = v3_result.add_result_compiled_cursor()
3950 self._query_converter.v4_to_v3_compiled_cursor(v4_entity.cursor(),
3951 cursor)
3952 if v4_batch.entity_result_type() != datastore_v4_pb.EntityResult.FULL:
3955 v3_entity.clear_entity_group()
3957 return v3_result
3959 def v3_to_v4_query_result_batch(self, v3_result, v4_batch):
3960 """Converts a v3 QueryResult to a v4 QueryResultBatch.
3962 Args:
3963 v3_result: a datastore_pb.QueryResult
3964 v4_batch: a datastore_v4_pb.QueryResultBatch to populate
3965 """
3966 v4_batch.Clear()
3969 if v3_result.more_results():
3970 v4_batch.set_more_results(datastore_v4_pb.QueryResultBatch.NOT_FINISHED)
3971 else:
3972 v4_batch.set_more_results(
3973 datastore_v4_pb.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT)
3974 if v3_result.has_compiled_cursor():
3975 v4_batch.set_end_cursor(
3976 self._query_converter.v3_to_v4_compiled_cursor(
3977 v3_result.compiled_cursor()))
3978 if v3_result.has_skipped_results_compiled_cursor():
3979 v4_batch.set_skipped_cursor(
3980 self._query_converter.v3_to_v4_compiled_cursor(
3981 v3_result.skipped_results_compiled_cursor()))
3984 if v3_result.keys_only():
3985 v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.KEY_ONLY)
3986 elif v3_result.index_only():
3987 v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.PROJECTION)
3988 else:
3989 v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.FULL)
3992 if v3_result.has_skipped_results():
3993 v4_batch.set_skipped_results(v3_result.skipped_results())
3994 for v3_entity, v3_cursor in itertools.izip_longest(
3995 v3_result.result_list(),
3996 v3_result.result_compiled_cursor_list()):
3997 v4_entity_result = datastore_v4_pb.EntityResult()
3998 self._entity_converter.v3_to_v4_entity(v3_entity,
3999 v4_entity_result.mutable_entity())
4000 if v3_cursor is not None:
4001 v4_entity_result.set_cursor(
4002 self._query_converter.v3_to_v4_compiled_cursor(v3_cursor))
4003 v4_batch.entity_result_list().append(v4_entity_result)
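# --- Mapping note (editorial addition, not part of the original source) ---
# v3's boolean more_results() cannot represent the full v4 enum, so the
# conversion above is lossy in one direction:
#
#   v3 more_results() == True   ->  v4 NOT_FINISHED
#   v3 more_results() == False  ->  v4 MORE_RESULTS_AFTER_LIMIT
#
# and v4_to_v3_query_result collapses every value other than NOT_FINISHED
# back to more_results(False).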
4007 __service_converter = StubServiceConverter(
4008 datastore_pbs.get_entity_converter(), __query_converter)
4011 def get_service_converter():
4012 """Returns a converter for v3 and v4 service request/response protos.
4014 This converter is suitable for use in stubs but not for production.
4016 Returns:
4017 a StubServiceConverter
4018 """
4019 return __service_converter
4022 def ReverseBitsInt64(v):
4023 """Reverse the bits of a 64-bit integer.
4025 Args:
4026 v: Input integer of type 'int' or 'long'.
4028 Returns:
4029 Bit-reversed input as 'int' on 64-bit machines or as 'long' otherwise.
4030 """
4032 v = ((v >> 1) & 0x5555555555555555) | ((v & 0x5555555555555555) << 1)
4033 v = ((v >> 2) & 0x3333333333333333) | ((v & 0x3333333333333333) << 2)
4034 v = ((v >> 4) & 0x0F0F0F0F0F0F0F0F) | ((v & 0x0F0F0F0F0F0F0F0F) << 4)
4035 v = ((v >> 8) & 0x00FF00FF00FF00FF) | ((v & 0x00FF00FF00FF00FF) << 8)
4036 v = ((v >> 16) & 0x0000FFFF0000FFFF) | ((v & 0x0000FFFF0000FFFF) << 16)
4037 v = int((v >> 32) | (v << 32) & 0xFFFFFFFFFFFFFFFF)
4038 return v
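# --- Illustrative sketch (editorial addition, not part of the original
# source): a naive per-bit reversal, useful only to sanity-check the
# parallel-swap version above; the helper name is hypothetical.
def _NaiveReverseBitsInt64(v):
  """Reverses the low 64 bits of v one bit at a time (O(64))."""
  result = 0
  for i in xrange(64):
    result = (result << 1) | ((v >> i) & 1)  # peel bits off v, LSB first
  return int(result)
# e.g. _NaiveReverseBitsInt64(1) == ReverseBitsInt64(1) == 1 << 63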
4041 def ToScatteredId(v):
4042 """Map counter value v to the scattered ID space.
4044 Translate to scattered ID space, then reverse bits.
4046 Args:
4047 v: Counter value from which to produce ID.
4049 Returns:
4050 Integer ID.
4052 Raises:
4053 datastore_errors.BadArgumentError if counter value exceeds the range of
4054 the scattered ID space.
4055 """
4056 if v >= _MAX_SCATTERED_COUNTER:
4057 raise datastore_errors.BadArgumentError('counter value too large (%d)' % v)
4058 return _MAX_SEQUENTIAL_ID + 1 + long(ReverseBitsInt64(v << _SCATTER_SHIFT))
4061 def IdToCounter(k):
4062 """Map ID k to the counter value from which it was generated.
4064 Determine whether k is a sequential or scattered ID.
4066 Args:
4067 k: ID from which to infer counter value.
4069 Returns:
4070 Tuple of integers (counter_value, id_space).
4071 """
4072 if k > _MAX_SCATTERED_ID:
4073 return 0, SCATTERED
4074 elif k > _MAX_SEQUENTIAL_ID and k <= _MAX_SCATTERED_ID:
4075 return long(ReverseBitsInt64(k) >> _SCATTER_SHIFT), SCATTERED
4076 elif k > 0:
4077 return long(k), SEQUENTIAL
4078 else:
4079 raise datastore_errors.BadArgumentError('invalid id (%d)' % k)
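# --- Illustrative sketch (editorial addition, not part of the original
# source): a hypothetical helper exercising the counter <-> ID round trip
# with the module constants above; for tests/documentation only.
def _DemoScatteredIdRoundTrip(counter=42):
  """Asserts that ToScatteredId and IdToCounter invert each other."""
  scattered_id = ToScatteredId(counter)
  assert scattered_id > _MAX_SEQUENTIAL_ID  # scattered IDs sit above 2^52 - 1
  assert IdToCounter(scattered_id) == (counter, SCATTERED)
  assert IdToCounter(counter) == (counter, SEQUENTIAL)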
4082 def CompareEntityPbByKey(a, b):
4083 """Compare two entity protobuf's by key.
4085 Args:
4086 a: entity_pb.EntityProto to compare
4087 b: entity_pb.EntityProto to compare
4088 Returns:
4089 <0 if a's key is before b's, =0 if they are the same key, and >0 otherwise.
4090 """
4091 return cmp(datastore_types.Key._FromPb(a.key()),
4092 datastore_types.Key._FromPb(b.key()))
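# --- Usage sketch (editorial addition, not part of the original source) ---
# Python 2's sort machinery accepts a cmp function directly, so entity
# protos can be ordered by key the way the datastore orders them:
#
#   entities.sort(CompareEntityPbByKey)
#   # or: sorted(entities, cmp=CompareEntityPbByKey)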
4095 def _GuessOrders(filters, orders):
4096 """Guess any implicit ordering.
4098 The datastore gives a logical, but not necessarily predictable, ordering when
4099 orders are not completely explicit. This function guesses at that ordering
4100 (which is better than always ordering by __key__ for tests).
4102 Args:
4103 filters: The datastore_pb.Query_Filter that have already been normalized and
4104 checked.
4105 orders: The datastore_pb.Query_Order that have already been normalized and
4106 checked. Not mutated; a copy is extended and returned.
4107 """
4108 orders = orders[:]
4111 if not orders:
4112 for filter_pb in filters:
4113 if filter_pb.op() in datastore_index.INEQUALITY_OPERATORS:
4115 order = datastore_pb.Query_Order()
4116 order.set_property(filter_pb.property(0).name())
4117 orders.append(order)
4118 break
4121 exists_props = (filter_pb.property(0).name() for filter_pb in filters
4122 if filter_pb.op() == datastore_pb.Query_Filter.EXISTS)
4123 for prop in sorted(exists_props):
4124 order = datastore_pb.Query_Order()
4125 order.set_property(prop)
4126 orders.append(order)
4129 if not orders or orders[-1].property() != '__key__':
4130 order = datastore_pb.Query_Order()
4131 order.set_property('__key__')
4132 orders.append(order)
4133 return orders
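# --- Worked example (editorial addition, not part of the original source) -
# With no explicit orders and a single inequality filter on 'age',
# _GuessOrders returns [age ASC, __key__ ASC]: the inequality property
# first, then the trailing __key__ tie-breaker. With only EXISTS filters,
# the EXISTS property names are appended in sorted order instead.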
4136 def _MakeQuery(query_pb, filters, orders, filter_predicate):
4137 """Make a datastore_query.Query for the given datastore_pb.Query.
4139 Overrides filters and orders in query with the specified arguments.
4141 Args:
4142 query_pb: a datastore_pb.Query.
4143 filters: the filters from query.
4144 orders: the orders from query.
4145 filter_predicate: an additional filter of type
4146 datastore_query.FilterPredicate. This is passed along to implement V4
4147 specific filters without changing the entire stub.
4149 Returns:
4150 A datastore_query.Query for the datastore_pb.Query."""
4156 clone_pb = datastore_pb.Query()
4157 clone_pb.CopyFrom(query_pb)
4158 clone_pb.clear_filter()
4159 clone_pb.clear_order()
4160 clone_pb.filter_list().extend(filters)
4161 clone_pb.order_list().extend(orders)
4163 query = datastore_query.Query._from_pb(clone_pb)
4165 assert datastore_v4_pb.CompositeFilter._Operator_NAMES.values() == ['AND']
4170 if filter_predicate is not None:
4171 if query.filter_predicate is not None:
4174 filter_predicate = datastore_query.CompositeFilter(
4175 datastore_query.CompositeFilter.AND,
4176 [filter_predicate, query.filter_predicate])
4178 return datastore_query.Query(app=query.app,
4179 namespace=query.namespace,
4180 ancestor=query.ancestor,
4181 filter_predicate=filter_predicate,
4182 group_by=query.group_by,
4183 order=query.order)
4184 else:
4185 return query
4187 def _CreateIndexEntities(entity, postfix_props):
4188 """Creates entities for index values that would appear in prodcution.
4190 This function finds all multi-valued properties listed in split_props, and
4191 creates a new entity for each unique combination of values. The resulting
4192 entities will only have a single value for each property listed in
4193 split_props.
4195 It reserves the right to include index data that would not be
4196 seen in production, e.g. by returning the original entity when no splitting
4197 is needed. LoadEntity will remove any excess fields.
4199 This simulates the results seen by an index scan in the datastore.
4201 Args:
4202 entity: The entity_pb.EntityProto to split.
4203 split_props: A set of property names to split on.
4205 Returns:
4206 A list of the split entity_pb.EntityProtos.
4208 to_split = {}
4209 split_required = False
4210 base_props = []
4211 for prop in entity.property_list():
4212 if prop.name() in postfix_props:
4213 values = to_split.get(prop.name())
4214 if values is None:
4215 values = []
4216 to_split[prop.name()] = values
4217 else:
4219 split_required = True
4220 if prop.value() not in values:
4221 values.append(prop.value())
4222 else:
4223 base_props.append(prop)
4225 if not split_required:
4227 return [entity]
4229 clone = entity_pb.EntityProto()
4230 clone.CopyFrom(entity)
4231 clone.clear_property()
4232 clone.property_list().extend(base_props)
4233 results = [clone]
4235 for name, splits in to_split.iteritems():
4236 if len(splits) == 1:
4238 for result in results:
4239 prop = result.add_property()
4240 prop.set_name(name)
4241 prop.set_multiple(False)
4242 prop.set_meaning(entity_pb.Property.INDEX_VALUE)
4243 prop.mutable_value().CopyFrom(splits[0])
4244 continue
4246 new_results = []
4247 for result in results:
4248 for split in splits:
4249 clone = entity_pb.EntityProto()
4250 clone.CopyFrom(result)
4251 prop = clone.add_property()
4252 prop.set_name(name)
4253 prop.set_multiple(False)
4254 prop.set_meaning(entity_pb.Property.INDEX_VALUE)
4255 prop.mutable_value().CopyFrom(split)
4256 new_results.append(clone)
4257 results = new_results
4258 return results
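# --- Worked example (editorial addition, not part of the original source) -
# For an entity with a = [1, 2] and b = [3] and postfix_props {'a', 'b'},
# only 'a' has multiple distinct values, so _CreateIndexEntities yields two
# entities, (a=1, b=3) and (a=2, b=3): the cross product of unique values,
# mirroring the rows an index scan would produce.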
4261 def _CreateIndexOnlyQueryResults(results, postfix_props):
4262 """Creates a result set similar to that returned by an index only query."""
4263 new_results = []
4264 for result in results:
4265 new_results.extend(_CreateIndexEntities(result, postfix_props))
4266 return new_results
4269 def _ExecuteQuery(results, query, filters, orders, index_list,
4270 filter_predicate=None):
4271 """Executes the query on a superset of its results.
4273 Args:
4274 results: superset of results for query.
4275 query: a datastore_pb.Query.
4276 filters: the filters from query.
4277 orders: the orders from query.
4278 index_list: the list of indexes used by the query.
4279 filter_predicate: an additional filter of type
4280 datastore_query.FilterPredicate. This is passed along to implement V4
4281 specific filters without changing the entire stub.
4283 Returns:
4284 A ListCursor over the results of applying query to results.
4285 """
4286 orders = _GuessOrders(filters, orders)
4287 dsquery = _MakeQuery(query, filters, orders, filter_predicate)
4289 if query.property_name_size():
4290 results = _CreateIndexOnlyQueryResults(
4291 results, set(order.property() for order in orders))
4293 return ListCursor(query, dsquery, orders, index_list,
4294 datastore_query.apply_query(dsquery, results))
4297 def _UpdateCost(cost, entity_writes, index_writes):
4298 """Updates the provided cost.
4300 Args:
4301 cost: Out param. The cost object to update.
4302 entity_writes: The number of entity writes to add.
4303 index_writes: The number of index writes to add.
4304 """
4305 cost.set_entity_writes(cost.entity_writes() + entity_writes)
4306 cost.set_index_writes(cost.index_writes() + index_writes)
4309 def _CalculateWriteOps(composite_indexes, old_entity, new_entity):
4310 """Determines number of entity and index writes needed to write new_entity.
4312 We assume that old_entity represents the current state of the Datastore.
4314 Args:
4315 composite_indexes: The composite_indexes for the kind of the entities.
4316 old_entity: Entity representing the current state in the Datastore.
4317 new_entity: Entity representing the desired state in the Datastore.
4319 Returns:
4320 A tuple of size 2, where the first value is the number of entity writes and
4321 the second value is the number of index writes.
4322 """
4323 if (old_entity is not None and
4324 old_entity.property_list() == new_entity.property_list()
4325 and old_entity.raw_property_list() == new_entity.raw_property_list()):
4326 return 0, 0
4328 index_writes = _ChangedIndexRows(composite_indexes, old_entity, new_entity)
4329 if old_entity is None:
4333 index_writes += 1
4335 return 1, index_writes
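# --- Cost note (editorial addition, not part of the original source) ---
# An unchanged entity short-circuits to (0, 0) before any index math runs.
# A brand-new entity (old_entity is None) costs 1 entity write plus the
# _ChangedIndexRows total plus one extra index write; the extra write
# appears to mirror the additional built-in index row that production
# charges for a first-time put.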
4338 def _ChangedIndexRows(composite_indexes, old_entity, new_entity):
4339 """Determine the number of index rows that need to change.
4341 We assume that old_entity represents the current state of the Datastore.
4343 Args:
4344 composite_indexes: The composite_indexes for the kind of the entities.
4345 old_entity: Entity representing the current state in the Datastore.
4346 new_entity: Entity representing the desired state in the Datastore.
4348 Returns:
4349 The number of index rows that need to change.
4350 """
4354 unique_old_properties = collections.defaultdict(set)
4359 unique_new_properties = collections.defaultdict(set)
4361 if old_entity is not None:
4362 for old_prop in old_entity.property_list():
4363 _PopulateUniquePropertiesSet(old_prop, unique_old_properties)
4366 unchanged = collections.defaultdict(int)
4368 for new_prop in new_entity.property_list():
4369 new_prop_as_str = _PopulateUniquePropertiesSet(
4370 new_prop, unique_new_properties)
4371 if new_prop_as_str in unique_old_properties[new_prop.name()]:
4372 unchanged[new_prop.name()] += 1
4377 all_property_names = set(unique_old_properties.iterkeys())
4378 all_property_names.update(unique_new_properties.iterkeys())
4379 all_property_names.update(unchanged.iterkeys())
4381 all_indexes = _GetEntityByPropertyIndexes(all_property_names)
4382 all_indexes.extend([comp.definition() for comp in composite_indexes])
4383 path_size = new_entity.key().path().element_size()
4384 writes = 0
4385 for index in all_indexes:
4389 ancestor_multiplier = 1
4390 if index.ancestor() and index.property_size() > 1:
4391 ancestor_multiplier = path_size
4392 writes += (_CalculateWritesForCompositeIndex(
4393 index, unique_old_properties, unique_new_properties, unchanged) *
4394 ancestor_multiplier)
4395 return writes
4398 def _PopulateUniquePropertiesSet(prop, unique_properties):
4399 """Populates a set containing unique properties.
4401 Args:
4402 prop: An entity property.
4403 unique_properties: Dictionary mapping property names to a set of unique
4404 properties.
4406 Returns:
4407 The property pb in string (hashable) form.
4408 """
4409 if prop.multiple():
4410 prop = _CopyAndSetMultipleToFalse(prop)
4413 prop_as_str = prop.SerializePartialToString()
4414 unique_properties[prop.name()].add(prop_as_str)
4415 return prop_as_str
4418 def _CalculateWritesForCompositeIndex(index, unique_old_properties,
4419 unique_new_properties,
4420 common_properties):
4421 """Calculate the number of writes required to maintain a specific Index.
4423 Args:
4424 index: The composite index.
4425 unique_old_properties: Dictionary mapping property names to a set of props
4426 present on the old entity.
4427 unique_new_properties: Dictionary mapping property names to a set of props
4428 present on the new entity.
4429 common_properties: Dictionary mapping property names to the number of
4430 properties with that name that are present on both the old and new
4431 entities.
4433 Returns:
4434 The number of writes required to maintain the provided index.
4435 """
4436 old_count = 1
4437 new_count = 1
4438 common_count = 1
4439 for prop in index.property_list():
4440 old_count *= len(unique_old_properties[prop.name()])
4441 new_count *= len(unique_new_properties[prop.name()])
4442 common_count *= common_properties[prop.name()]
4444 return (old_count - common_count) + (new_count - common_count)
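# --- Worked arithmetic (editorial addition, not part of the original
# source): for an index on (a, b) where the old entity has a in {1, 2} and
# b in {3}, the new entity has a in {2, 4} and b in {3}, and exactly one
# value per property is common (a: 1, b: 1):
#
#   old_count    = 2 * 1 = 2   rows currently in the index
#   new_count    = 2 * 1 = 2   rows the index must end up with
#   common_count = 1 * 1 = 1   rows that can stay untouched
#   writes = (2 - 1) + (2 - 1) = 2   (one row deleted, one inserted)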
4447 def _GetEntityByPropertyIndexes(all_property_names):
4448 indexes = []
4449 for prop_name in all_property_names:
4450 indexes.append(
4451 _SinglePropertyIndex(prop_name, entity_pb.Index_Property.ASCENDING))
4452 indexes.append(
4453 _SinglePropertyIndex(prop_name, entity_pb.Index_Property.DESCENDING))
4454 return indexes
4457 def _SinglePropertyIndex(prop_name, direction):
4458 """Creates a single property Index for the given name and direction.
4460 Args:
4461 prop_name: The name of the single property on the Index.
4462 direction: The direction of the Index.
4464 Returns:
4465 A single property Index with the given property and direction.
4466 """
4467 index = entity_pb.Index()
4468 prop = index.add_property()
4469 prop.set_name(prop_name)
4470 prop.set_direction(direction)
4471 return index
4474 def _CopyAndSetMultipleToFalse(prop):
4475 """Copy the provided Property and set its "multiple" attribute to False.
4477 Args:
4478 prop: The Property to copy.
4480 Returns:
4481 A copy of the given Property with its "multiple" attribute set to False.
4482 """
4488 prop_copy = entity_pb.Property()
4489 prop_copy.MergeFrom(prop)
4490 prop_copy.set_multiple(False)
4491 return prop_copy
4494 def _SetStartInclusive(position, first_direction):
4495 """Sets the start_inclusive field in position.
4497 Args:
4498 position: datastore_pb.Position
4499 first_direction: the first sort order from the query
4500 (a datastore_pb.Query_Order) or None
4501 """
4502 position.set_start_inclusive(
4503 position.before_ascending()
4504 != (first_direction == datastore_pb.Query_Order.DESCENDING))
4507 def _SetBeforeAscending(position, first_direction):
4508 """Sets the before_ascending field in position.
4510 Args:
4511 position: datastore_pb.Position
4512 first_direction: the first sort order from the query
4513 (a datastore_pb.Query_Order) or None
4514 """
4515 position.set_before_ascending(
4516 position.start_inclusive()
4517 != (first_direction == datastore_pb.Query_Order.DESCENDING))
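# --- Mapping note (editorial addition, not part of the original source) ---
# start_inclusive and before_ascending describe the same cursor boundary
# from opposite ends; each setter above is the XOR of the other field with
# "first sort order is DESCENDING":
#
#   before_ascending   first order   start_inclusive
#   True               ASC or None   True
#   True               DESCENDING    False
#   False              ASC or None   False
#   False              DESCENDING    True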