App Engine Python SDK version 1.9.12
[gae.git] / python / google / appengine / datastore / datastore_stub_util.py
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
21 """Utility functions shared between the file and sqlite datastore stubs.
23 This module is internal and should not be used by client applications.
24 """
27 from __future__ import with_statement

try:
  import hashlib
  _MD5_FUNC = hashlib.md5
except ImportError:
  import md5
  _MD5_FUNC = md5.new

import atexit
import collections
import itertools
import logging
import os
import random
import struct
import threading
import time
import weakref

from google.net.proto import ProtocolBuffer
from google.appengine.datastore import entity_pb

from google.appengine.api import api_base_pb
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import datastore_admin
from google.appengine.api import datastore_errors
from google.appengine.api import datastore_types
from google.appengine.api.taskqueue import taskqueue_service_pb
from google.appengine.datastore import datastore_index
from google.appengine.datastore import datastore_pb
from google.appengine.datastore import datastore_pbs
from google.appengine.datastore import datastore_query
from google.appengine.datastore import datastore_stub_index
from google.appengine.datastore import datastore_v4_pb
from google.appengine.runtime import apiproxy_errors


# The maximum number of entities to return in a single result batch.
_MAXIMUM_RESULTS = 300

# The upper bound on the total byte size of a query result batch.
_MAXIMUM_QUERY_RESULT_BYTES = 2000000

# The maximum query offset the datastore will apply.
_MAX_QUERY_OFFSET = 1000

# Human-readable names for the datastore property value types, keyed by the
# tag constants from entity_pb.PropertyValue.
_PROPERTY_TYPE_NAMES = {
    0: 'NULL',
    entity_pb.PropertyValue.kint64Value: 'INT64',
    entity_pb.PropertyValue.kbooleanValue: 'BOOLEAN',
    entity_pb.PropertyValue.kstringValue: 'STRING',
    entity_pb.PropertyValue.kdoubleValue: 'DOUBLE',
    entity_pb.PropertyValue.kPointValueGroup: 'POINT',
    entity_pb.PropertyValue.kUserValueGroup: 'USER',
    entity_pb.PropertyValue.kReferenceValueGroup: 'REFERENCE',
}

# Keys whose 16-bit path hash falls below this value receive a __scatter__
# property (32768 of 65536, i.e. roughly half of all entities).
_SCATTER_PROPORTION = 32768

# The maximum number of entity groups a single transaction may operate on.
_MAX_EG_PER_TXN = 5

# Property meanings that hold raw (unindexed) payloads.
_BLOB_MEANINGS = frozenset((entity_pb.Property.BLOB,
                            entity_pb.Property.ENTITY_PROTO,
                            entity_pb.Property.TEXT))

# Retry parameters: attempt count and exponential backoff delays (in ms).
_RETRIES = 3
_INITIAL_RETRY_DELAY_MS = 100
_RETRY_DELAY_MULTIPLIER = 2
_MAX_RETRY_DELAY_MS = 120000

# Automatic ID allocation policies.
SEQUENTIAL = 'sequential'
SCATTERED = 'scattered'

# Sequential IDs use the low 52 bits of the ID space; scattered IDs occupy
# the range immediately above them.
_MAX_SEQUENTIAL_BIT = 52
_MAX_SEQUENTIAL_COUNTER = (1 << _MAX_SEQUENTIAL_BIT) - 1
_MAX_SEQUENTIAL_ID = _MAX_SEQUENTIAL_COUNTER
_MAX_SCATTERED_COUNTER = (1 << (_MAX_SEQUENTIAL_BIT - 1)) - 1
_MAX_SCATTERED_ID = _MAX_SEQUENTIAL_ID + 1 + _MAX_SCATTERED_COUNTER
_SCATTER_SHIFT = 64 - _MAX_SEQUENTIAL_BIT + 1
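

# A minimal illustrative sketch (not part of the stub's API) of how the
# constants above partition the 64-bit ID space: sequential IDs occupy
# [1, _MAX_SEQUENTIAL_ID], scattered IDs occupy the disjoint range
# (_MAX_SEQUENTIAL_ID, _MAX_SCATTERED_ID], one ID per scattered counter
# value. The exact counter-to-ID mapping (which uses _SCATTER_SHIFT) lives
# outside this excerpt.
def _IdSpacePartitionSketch():
  assert _MAX_SEQUENTIAL_ID < _MAX_SCATTERED_ID
  assert _MAX_SCATTERED_ID - _MAX_SEQUENTIAL_ID == 1 + _MAX_SCATTERED_COUNTER
  return (_MAX_SEQUENTIAL_ID, _MAX_SCATTERED_ID)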


def _GetScatterProperty(entity_proto):
  """Gets the scatter property for an object.

  For ease of implementation, this is not synchronized with the actual
  value on the App Engine server, but should work equally well.

  Note: This property may change, either here or in production. No client
  other than the mapper framework should rely on it directly.

  Returns:
    The PropertyValue of the scatter property or None if this entity should
    not have a scatter property.
  """
  hash_obj = _MD5_FUNC()
  for element in entity_proto.key().path().element_list():
    if element.has_name():
      hash_obj.update(element.name())
    elif element.has_id():
      hash_obj.update(str(element.id()))
  hash_bytes = hash_obj.digest()[0:2]
  (hash_int,) = struct.unpack('H', hash_bytes)

  if hash_int >= _SCATTER_PROPORTION:
    return None

  scatter_property = entity_pb.Property()
  scatter_property.set_name(datastore_types.SCATTER_SPECIAL_PROPERTY)
  scatter_property.set_meaning(entity_pb.Property.BYTESTRING)
  scatter_property.set_multiple(False)
  property_value = scatter_property.mutable_value()
  property_value.set_stringvalue(hash_bytes)
  return scatter_property
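

# A minimal sketch of _GetScatterProperty's behavior (assuming the entity_pb
# accessors used above): the first two MD5 bytes of the key path, read as a
# native unsigned short, must fall below _SCATTER_PROPORTION, so roughly
# half of all entities receive a __scatter__ property.
def _ScatterPropertySketch():
  proto = entity_pb.EntityProto()
  element = proto.mutable_key().mutable_path().add_element()
  element.set_type('Foo')
  element.set_id(42)
  return _GetScatterProperty(proto)  # a Property for ~half of all keys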


# Maps special property names to a (is_visible, is_stored, property_func)
# triple, where property_func computes the property's value for an entity.
_SPECIAL_PROPERTY_MAP = {
    datastore_types.SCATTER_SPECIAL_PROPERTY:
        (False, True, _GetScatterProperty),
}


def GetInvisibleSpecialPropertyNames():
  """Gets the names of all non user-visible special properties."""
  invisible_names = []
  for name, value in _SPECIAL_PROPERTY_MAP.items():
    is_visible, _, _ = value
    if not is_visible:
      invisible_names.append(name)
  return invisible_names


def _PrepareSpecialProperties(entity_proto, is_load):
  """Computes special properties for loading or storing.
  Strips other special properties."""
  for i in xrange(entity_proto.property_size() - 1, -1, -1):
    if entity_proto.property(i).name() in _SPECIAL_PROPERTY_MAP:
      del entity_proto.property_list()[i]

  for is_visible, is_stored, property_func in _SPECIAL_PROPERTY_MAP.values():
    if is_load:
      should_process = is_visible
    else:
      should_process = is_stored

    if should_process:
      special_property = property_func(entity_proto)
      if special_property:
        entity_proto.property_list().append(special_property)


def _GetGroupByKey(entity, property_names):
  """Computes a key value that uniquely identifies the 'group' of an entity.

  Args:
    entity: The entity_pb.EntityProto for which to create the group key.
    property_names: The names of the properties in the group by clause.

  Returns:
    A hashable value that uniquely identifies the entity's 'group'.
  """
  return frozenset((prop.name(), prop.value().SerializeToString())
                   for prop in entity.property_list()
                   if prop.name() in property_names)


def PrepareSpecialPropertiesForStore(entity_proto):
  """Computes special properties for storing.
  Strips other special properties."""
  _PrepareSpecialProperties(entity_proto, False)


def LoadEntity(entity, keys_only=False, property_names=None):
  """Prepares an entity to be returned to the user.

  Args:
    entity: an entity_pb.EntityProto or None
    keys_only: if a keys-only result should be produced
    property_names: if not None or empty, causes a projected entity
        to be produced with the given properties.

  Returns:
    A user-friendly copy of entity or None.
  """
  if entity:
    clone = entity_pb.EntityProto()
    if property_names:

      clone.mutable_key().CopyFrom(entity.key())
      clone.mutable_entity_group()
      seen = set()
      for prop in entity.property_list():
        if prop.name() in property_names:

          Check(prop.name() not in seen,
                "datastore dev stub produced bad result",
                datastore_pb.Error.INTERNAL_ERROR)
          seen.add(prop.name())
          new_prop = clone.add_property()
          new_prop.set_name(prop.name())
          new_prop.set_meaning(entity_pb.Property.INDEX_VALUE)
          new_prop.mutable_value().CopyFrom(prop.value())
          new_prop.set_multiple(False)
    elif keys_only:

      clone.mutable_key().CopyFrom(entity.key())
      clone.mutable_entity_group()
    else:

      clone.CopyFrom(entity)
    PrepareSpecialPropertiesForLoad(clone)
    return clone


def StoreEntity(entity):
  """Prepares an entity for storing.

  Args:
    entity: an entity_pb.EntityProto to prepare

  Returns:
    A copy of entity that should be stored in its place.
  """
  clone = entity_pb.EntityProto()
  clone.CopyFrom(entity)

  PrepareSpecialPropertiesForStore(clone)
  return clone


def PrepareSpecialPropertiesForLoad(entity_proto):
  """Computes special properties that are user-visible.
  Strips other special properties."""
  _PrepareSpecialProperties(entity_proto, True)
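

# A minimal round-trip sketch (assuming only the helpers defined above):
# StoreEntity attaches stored-only specials such as __scatter__, and
# LoadEntity strips them again on the way out, so callers never observe them.
def _SpecialPropertyRoundTripSketch(entity):
  stored = StoreEntity(entity)  # may gain a __scatter__ property
  loaded = LoadEntity(stored)   # stored-only specials stripped again
  return [prop.name() for prop in loaded.property_list()]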


def Check(test, msg='', error_code=datastore_pb.Error.BAD_REQUEST):
  """Raises an apiproxy_errors.ApplicationError if the condition is false.

  Args:
    test: A condition to test.
    msg: A string to return with the error.
    error_code: One of datastore_pb.Error to use as an error code.

  Raises:
    apiproxy_errors.ApplicationError: If test is false.
  """
  if not test:
    raise apiproxy_errors.ApplicationError(error_code, msg)


def CheckValidUTF8(string, desc):
  """Check that the given string is valid UTF-8.

  Args:
    string: the string to validate.
    desc: a description of the string being validated.

  Raises:
    apiproxy_errors.ApplicationError: if the string is not valid UTF-8.
  """
  try:
    string.decode('utf-8')
  except UnicodeDecodeError:
    Check(False, '%s is not valid UTF-8.' % desc)


def CheckAppId(request_trusted, request_app_id, app_id):
  """Check that this is the stub for app_id.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    app_id: An application ID.

  Raises:
    apiproxy_errors.ApplicationError: if this is not the stub for app_id.
  """
  assert app_id
  CheckValidUTF8(app_id, 'app id')
  Check(request_trusted or app_id == request_app_id,
        'app "%s" cannot access app "%s"\'s data' % (request_app_id, app_id))


def CheckReference(request_trusted,
                   request_app_id,
                   key,
                   require_id_or_name=True):
  """Check this key.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    key: entity_pb.Reference
    require_id_or_name: Boolean indicating if we should enforce the presence
        of an id or name in the last element of the key's path.

  Raises:
    apiproxy_errors.ApplicationError: if the key is invalid
  """
  assert isinstance(key, entity_pb.Reference)

  CheckAppId(request_trusted, request_app_id, key.app())

  Check(key.path().element_size() > 0, "key's path cannot be empty")

  if require_id_or_name:

    last_element = key.path().element_list()[-1]
    has_id_or_name = ((last_element.has_id() and last_element.id() != 0) or
                      (last_element.has_name() and last_element.name() != ''))
    if not has_id_or_name:
      raise datastore_errors.BadRequestError('missing key id/name')

  for elem in key.path().element_list():
    Check(not elem.has_id() or not elem.has_name(),
          'each key path element should have id or name but not both: %r' %
          key)
    CheckValidUTF8(elem.type(), 'key path element type')
    if elem.has_name():
      CheckValidUTF8(elem.name(), 'key path element name')


def CheckEntity(request_trusted, request_app_id, entity):
  """Check if this entity can be stored.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    entity: entity_pb.EntityProto

  Raises:
    apiproxy_errors.ApplicationError: if the entity is invalid
  """
  CheckReference(request_trusted, request_app_id, entity.key(), False)
  for prop in entity.property_list():
    CheckProperty(request_trusted, request_app_id, prop)
  for prop in entity.raw_property_list():
    CheckProperty(request_trusted, request_app_id, prop, indexed=False)


def CheckProperty(request_trusted, request_app_id, prop, indexed=True):
  """Check if this property can be stored.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    prop: entity_pb.Property
    indexed: Whether the property is indexed.

  Raises:
    apiproxy_errors.ApplicationError: if the property is invalid
  """
  name = prop.name()
  value = prop.value()
  meaning = prop.meaning()
  CheckValidUTF8(name, 'property name')
  Check(request_trusted or
        not datastore_types.RESERVED_PROPERTY_NAME.match(name),
        'cannot store entity with reserved property name \'%s\'' % name)
  Check(prop.meaning() != entity_pb.Property.INDEX_VALUE,
        'Entities with incomplete properties cannot be written.')
  is_blob = meaning in _BLOB_MEANINGS
  if indexed:
    Check(not is_blob,
          'BLOB, ENTITY_PROTO or TEXT property ' + name +
          ' must be in a raw_property field')
    max_length = datastore_types._MAX_STRING_LENGTH
  else:
    if is_blob:
      Check(value.has_stringvalue(),
            'BLOB / ENTITY_PROTO / TEXT raw property ' + name +
            ' must have a string value')
    max_length = datastore_types._MAX_RAW_PROPERTY_BYTES
  if meaning == entity_pb.Property.ATOM_LINK:
    max_length = datastore_types._MAX_LINK_PROPERTY_LENGTH

  CheckPropertyValue(name, value, max_length, meaning)


def CheckPropertyValue(name, value, max_length, meaning):
  """Check if this property value can be stored.

  Args:
    name: name of the property
    value: entity_pb.PropertyValue
    max_length: maximum length for string values
    meaning: meaning of the property

  Raises:
    apiproxy_errors.ApplicationError: if the property is invalid
  """
  num_values = (value.has_int64value() +
                value.has_stringvalue() +
                value.has_booleanvalue() +
                value.has_doublevalue() +
                value.has_pointvalue() +
                value.has_uservalue() +
                value.has_referencevalue())
  Check(num_values <= 1, 'PropertyValue for ' + name +
        ' has multiple value fields set')

  if value.has_stringvalue():

    # Length is measured in UTF-16 code units; the leading 2 bytes of the
    # encoded string are the BOM added by encode('utf-16').
    s16 = value.stringvalue().decode('utf-8', 'replace').encode('utf-16')
    Check((len(s16) - 2) / 2 <= max_length,
          'Property %s is too long. Maximum length is %d.' %
          (name, max_length))
    if (meaning not in _BLOB_MEANINGS and
        meaning != entity_pb.Property.BYTESTRING):
      CheckValidUTF8(value.stringvalue(), 'String property value')


def CheckTransaction(request_trusted, request_app_id, transaction):
  """Check that this transaction is valid.

  Args:
    request_trusted: If the request is trusted.
    request_app_id: The application ID of the app making the request.
    transaction: datastore_pb.Transaction

  Raises:
    apiproxy_errors.ApplicationError: if the transaction is not valid.
  """
  assert isinstance(transaction, datastore_pb.Transaction)
  CheckAppId(request_trusted, request_app_id, transaction.app())


def CheckQuery(query, filters, orders, max_query_components):
  """Check a datastore query with normalized filters and orders.

  Raises an ApplicationError when any of the following conditions are
  violated:
  - transactional queries have an ancestor
  - queries that are not too large
    (sum of filters, orders, ancestor <= max_query_components)
  - ancestor (if any) app and namespace match query app and namespace
  - kindless queries only filter on __key__ and only sort on __key__ ascending
  - multiple inequality (<, <=, >, >=) filters all applied to the same property
  - filters on __key__ compare to a reference in the same app and namespace as
    the query
  - if an inequality filter on prop X is used, the first order (if any) must
    be on X

  Args:
    query: query to validate
    filters: normalized (by datastore_index.Normalize) filters from query
    orders: normalized (by datastore_index.Normalize) orders from query
    max_query_components: limit on query complexity
  """
  Check(query.property_name_size() == 0 or not query.keys_only(),
        'projection and keys_only cannot both be set')

  projected_properties = set(query.property_name_list())
  for prop_name in query.property_name_list():
    Check(not datastore_types.RESERVED_PROPERTY_NAME.match(prop_name),
          'projections are not supported for the property: ' + prop_name)
  Check(len(projected_properties) == len(query.property_name_list()),
        'cannot project a property multiple times')

  key_prop_name = datastore_types.KEY_SPECIAL_PROPERTY
  unapplied_log_timestamp_us_name = (
      datastore_types._UNAPPLIED_LOG_TIMESTAMP_SPECIAL_PROPERTY)

  if query.has_transaction():

    Check(query.has_ancestor(),
          'Only ancestor queries are allowed inside transactions.')

  num_components = len(filters) + len(orders)
  if query.has_ancestor():
    num_components += 1
  Check(num_components <= max_query_components,
        'query is too large. may not have more than %s filters'
        ' + sort orders + ancestor total' % max_query_components)

  if query.has_ancestor():
    ancestor = query.ancestor()
    Check(query.app() == ancestor.app(),
          'query app is %s but ancestor app is %s' %
          (query.app(), ancestor.app()))
    Check(query.name_space() == ancestor.name_space(),
          'query namespace is %s but ancestor namespace is %s' %
          (query.name_space(), ancestor.name_space()))

  if query.group_by_property_name_size():
    group_by_set = set(query.group_by_property_name_list())
    for order in orders:
      if not group_by_set:
        break
      Check(order.property() in group_by_set,
            'items in the group by clause must be specified first '
            'in the ordering')
      group_by_set.remove(order.property())

  ineq_prop_name = None
  for filter in filters:
    Check(filter.property_size() == 1,
          'Filter has %d properties, expected 1' % filter.property_size())

    prop = filter.property(0)
    prop_name = prop.name().decode('utf-8')

    if prop_name == key_prop_name:

      Check(prop.value().has_referencevalue(),
            '%s filter value must be a Key' % key_prop_name)
      ref_val = prop.value().referencevalue()
      Check(ref_val.app() == query.app(),
            '%s filter app is %s but query app is %s' %
            (key_prop_name, ref_val.app(), query.app()))
      Check(ref_val.name_space() == query.name_space(),
            '%s filter namespace is %s but query namespace is %s' %
            (key_prop_name, ref_val.name_space(), query.name_space()))

    if filter.op() in datastore_index.EQUALITY_OPERATORS:
      Check(prop_name not in projected_properties,
            'cannot use projection on a property with an equality filter')
    if (filter.op() in datastore_index.INEQUALITY_OPERATORS and
        prop_name != unapplied_log_timestamp_us_name):
      if ineq_prop_name is None:
        ineq_prop_name = prop_name
      else:
        Check(ineq_prop_name == prop_name,
              'Only one inequality filter per query is supported. '
              'Encountered both %s and %s' % (ineq_prop_name, prop_name))

  if (ineq_prop_name is not None
      and query.group_by_property_name_size() > 0
      and not orders):

    Check(ineq_prop_name in group_by_set,
          'Inequality filter on %s must also be a group by '
          'property when group by properties are set.'
          % (ineq_prop_name))

  if ineq_prop_name is not None and orders:

    first_order_prop = orders[0].property().decode('utf-8')
    Check(first_order_prop == ineq_prop_name,
          'The first sort property must be the same as the property '
          'to which the inequality filter is applied. In your query '
          'the first sort property is %s but the inequality filter '
          'is on %s' % (first_order_prop, ineq_prop_name))

  if not query.has_kind():

    for filter in filters:
      prop_name = filter.property(0).name().decode('utf-8')
      Check(prop_name == key_prop_name or
            prop_name == unapplied_log_timestamp_us_name,
            'kind is required for non-__key__ filters')
    for order in orders:
      prop_name = order.property().decode('utf-8')
      Check(prop_name == key_prop_name and
            order.direction() is datastore_pb.Query_Order.ASCENDING,
            'kind is required for all orders except __key__ ascending')


class ValueRange(object):
  """A range of values defined by its two extremes (inclusive or exclusive)."""

  def __init__(self):
    """Constructor.

    Creates an unlimited range.
    """
    self.__start = self.__end = None
    self.__start_inclusive = self.__end_inclusive = False

  def Update(self, rel_op, limit):
    """Filter the range by 'rel_op limit'.

    Args:
      rel_op: relational operator from datastore_pb.Query_Filter.
      limit: the value to limit the range by.
    """
    if rel_op == datastore_pb.Query_Filter.LESS_THAN:
      if self.__end is None or limit <= self.__end:
        self.__end = limit
        self.__end_inclusive = False
    elif (rel_op == datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL or
          rel_op == datastore_pb.Query_Filter.EQUAL):
      if self.__end is None or limit < self.__end:
        self.__end = limit
        self.__end_inclusive = True

    if rel_op == datastore_pb.Query_Filter.GREATER_THAN:
      if self.__start is None or limit >= self.__start:
        self.__start = limit
        self.__start_inclusive = False
    elif (rel_op == datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL or
          rel_op == datastore_pb.Query_Filter.EQUAL):
      if self.__start is None or limit > self.__start:
        self.__start = limit
        self.__start_inclusive = True

  def Contains(self, value):
    """Check if the range contains a specific value.

    Args:
      value: the value to check.
    Returns:
      True iff value is contained in this range.
    """
    if self.__start is not None:
      if self.__start_inclusive and value < self.__start: return False
      if not self.__start_inclusive and value <= self.__start: return False
    if self.__end is not None:
      if self.__end_inclusive and value > self.__end: return False
      if not self.__end_inclusive and value >= self.__end: return False
    return True

  def Remap(self, mapper):
    """Transforms the range extremes with a function.

    The function mapper must preserve order, i.e.
      x rel_op y iff mapper(x) rel_op mapper(y)

    Args:
      mapper: function to apply to the range extremes.
    """
    self.__start = self.__start and mapper(self.__start)
    self.__end = self.__end and mapper(self.__end)

  def MapExtremes(self, mapper):
    """Evaluate a function on the range extremes.

    Args:
      mapper: function to apply to the range extremes.
    Returns:
      (x, y) where x = None if the range has no start,
                       mapper(start, start_inclusive, False) otherwise
             y = None if the range has no end,
                       mapper(end, end_inclusive, True) otherwise
    """
    return (
        self.__start and mapper(self.__start, self.__start_inclusive, False),
        self.__end and mapper(self.__end, self.__end_inclusive, True))
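

# A minimal usage sketch for ValueRange (assumption: plain comparable values
# stand in for the datastore types the stub actually passes through it).
def _ValueRangeSketch():
  value_range = ValueRange()
  value_range.Update(datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL, 10)
  value_range.Update(datastore_pb.Query_Filter.LESS_THAN, 20)
  # Half-open interval [10, 20): 5 and 20 fall outside, 10 and 19 inside.
  return [v for v in (5, 10, 19, 20) if value_range.Contains(v)]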


def ParseKeyFilteredQuery(filters, orders):
  """Parse queries which only allow filters and ascending-orders on __key__.

  Raises exceptions for illegal queries.
  Args:
    filters: the normalized filters of a query.
    orders: the normalized orders of a query.
  Returns:
    The key range (a ValueRange over datastore_types.Key) requested in the
    query.
  """
  remaining_filters = []
  key_range = ValueRange()
  key_prop = datastore_types.KEY_SPECIAL_PROPERTY
  for f in filters:
    op = f.op()
    if not (f.property_size() == 1 and
            f.property(0).name() == key_prop and
            not (op == datastore_pb.Query_Filter.IN or
                 op == datastore_pb.Query_Filter.EXISTS)):
      remaining_filters.append(f)
      continue

    val = f.property(0).value()
    Check(val.has_referencevalue(), '__key__ kind must be compared to a key')
    limit = datastore_types.FromReferenceProperty(val)
    key_range.Update(op, limit)

  remaining_orders = []
  for o in orders:
    if not (o.direction() == datastore_pb.Query_Order.ASCENDING and
            o.property() == datastore_types.KEY_SPECIAL_PROPERTY):
      remaining_orders.append(o)
    else:
      break

  Check(not remaining_filters,
        'Only comparison filters on ' + key_prop + ' supported')
  Check(not remaining_orders,
        'Only ascending order on ' + key_prop + ' supported')

  return key_range


def ParseKindQuery(query, filters, orders):
  """Parse __kind__ (schema) queries.

  Raises exceptions for illegal queries.
  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.
  Returns:
    The kind range (a ValueRange over string) requested in the query.
  """
  Check(not query.has_ancestor(), 'ancestor queries on __kind__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(_KindKeyToString)

  return key_range


def _KindKeyToString(key):
  """Extract kind name from __kind__ key.

  Raises an ApplicationError if the key is not of the form '__kind__'/name.

  Args:
    key: a key for a __kind__ instance.
  Returns:
    kind specified by key.
  """
  key_path = key.to_path()
  if (len(key_path) == 2 and key_path[0] == '__kind__' and
      isinstance(key_path[1], basestring)):
    return key_path[1]
  Check(False, 'invalid Key for __kind__ table')
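

# A minimal sketch (assuming datastore_types.Key.from_path and a resolvable
# application ID in the environment, as there is when the stub is running):
# __kind__ keys carry the kind name as their key name.
def _KindKeySketch():
  key = datastore_types.Key.from_path('__kind__', 'Person')
  return _KindKeyToString(key)  # 'Person'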


def ParseNamespaceQuery(query, filters, orders):
  """Parse __namespace__ queries.

  Raises exceptions for illegal queries.
  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.
  Returns:
    The namespace range (a ValueRange over string) requested in the query.
  """
  Check(not query.has_ancestor(),
        'ancestor queries on __namespace__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(_NamespaceKeyToString)

  return key_range


def _NamespaceKeyToString(key):
  """Extract namespace name from __namespace__ key.

  Raises an ApplicationError if the key is not of the form
  '__namespace__'/name or '__namespace__'/_EMPTY_NAMESPACE_ID.

  Args:
    key: a key for a __namespace__ instance.
  Returns:
    namespace specified by key.
  """
  key_path = key.to_path()
  if len(key_path) == 2 and key_path[0] == '__namespace__':
    if key_path[1] == datastore_types._EMPTY_NAMESPACE_ID:
      return ''
    if isinstance(key_path[1], basestring):
      return key_path[1]
  Check(False, 'invalid Key for __namespace__ table')


def ParsePropertyQuery(query, filters, orders):
  """Parse __property__ queries.

  Raises exceptions for illegal queries.
  Args:
    query: A Query PB.
    filters: the normalized filters from query.
    orders: the normalized orders from query.
  Returns:
    The property range (a ValueRange over (kind, property) pairs) requested
    in the query.
  """
  Check(not query.has_transaction(),
        'transactional queries on __property__ not allowed')

  key_range = ParseKeyFilteredQuery(filters, orders)
  key_range.Remap(lambda x: _PropertyKeyToString(x, ''))

  if query.has_ancestor():
    ancestor = datastore_types.Key._FromPb(query.ancestor())
    ancestor_kind, ancestor_property = _PropertyKeyToString(ancestor, None)

    if ancestor_property is not None:
      key_range.Update(datastore_pb.Query_Filter.EQUAL,
                       (ancestor_kind, ancestor_property))
    else:

      key_range.Update(datastore_pb.Query_Filter.GREATER_THAN_OR_EQUAL,
                       (ancestor_kind, ''))
      key_range.Update(datastore_pb.Query_Filter.LESS_THAN_OR_EQUAL,
                       (ancestor_kind + '\0', ''))
    query.clear_ancestor()

  return key_range


def _PropertyKeyToString(key, default_property):
  """Extract property name from __property__ key.

  Raises an ApplicationError if the key is not of the form
  '__kind__'/kind, '__property__'/property or '__kind__'/kind.

  Args:
    key: a key for a __property__ instance.
    default_property: property value to return when key only has a kind.
  Returns:
    kind, property if key = '__kind__'/kind, '__property__'/property
    kind, default_property if key = '__kind__'/kind
  """
  key_path = key.to_path()
  if (len(key_path) == 2 and
      key_path[0] == '__kind__' and isinstance(key_path[1], basestring)):
    return (key_path[1], default_property)
  if (len(key_path) == 4 and
      key_path[0] == '__kind__' and isinstance(key_path[1], basestring) and
      key_path[2] == '__property__' and isinstance(key_path[3], basestring)):
    return (key_path[1], key_path[3])

  Check(False, 'invalid Key for __property__ table')


def SynthesizeUserId(email):
  """Return a synthetic user ID from an email address.

  Note that this is not the same user ID found in the production system.

  Args:
    email: An email address.

  Returns:
    A string userid derived from the email address.
  """
  user_id_digest = _MD5_FUNC(email.lower()).digest()
  user_id = '1' + ''.join(['%02d' % ord(x) for x in user_id_digest])[:20]
  return user_id
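

# A minimal sketch of SynthesizeUserId's output shape: deterministic per
# (case-insensitive) email, always '1' followed by 20 decimal digits taken
# from the MD5 digest of the lowercased address.
def _SyntheticUserIdSketch():
  uid = SynthesizeUserId('Alice@Example.com')
  assert uid == SynthesizeUserId('alice@example.com')
  assert len(uid) == 21 and uid.startswith('1')
  return uid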


def FillUsersInQuery(filters):
  """Fill in a synthetic user ID for all user properties in a set of filters.

  Args:
    filters: The normalized filters from query.
  """
  for filter in filters:
    for property in filter.property_list():
      FillUser(property)


def FillUser(property):
  """Fill in a synthetic user ID for a user property.

  Args:
    property: A Property which may have a user value.
  """
  if property.value().has_uservalue():
    uid = SynthesizeUserId(property.value().uservalue().email())
    if uid:
      property.mutable_value().mutable_uservalue().set_obfuscated_gaiaid(uid)


class BaseCursor(object):
  """A base query cursor over a list of entities.

  Public properties:
    cursor: the integer cursor.
    app: the app for which this cursor was created.
    keys_only: whether the query is keys_only.

  Class attributes:
    _next_cursor: the next cursor to allocate.
    _next_cursor_lock: protects _next_cursor.
  """
  _next_cursor = 1
  _next_cursor_lock = threading.Lock()

  def __init__(self, query, dsquery, orders, index_list):
    """Constructor.

    Args:
      query: the query request proto.
      dsquery: a datastore_query.Query over query.
      orders: the orders of query as returned by _GuessOrders.
      index_list: the list of indexes used by the query.
    """
    self.keys_only = query.keys_only()
    self.property_names = set(query.property_name_list())
    self.group_by = set(query.group_by_property_name_list())
    self.app = query.app()
    self.cursor = self._AcquireCursorID()

    self.__order_compare_entities = dsquery._order.cmp_for_filter(
        dsquery._filter_predicate)
    if self.group_by:
      self.__cursor_properties = self.group_by
    else:
      self.__cursor_properties = set(order.property() for order in orders)
      self.__cursor_properties.add('__key__')
    self.__cursor_properties = frozenset(self.__cursor_properties)

    self.__first_sort_order = orders[0].direction()
    self.__index_list = index_list

  def _PopulateResultMetadata(self, query_result, compile,
                              first_result, last_result):
    query_result.set_keys_only(self.keys_only)
    if query_result.more_results():
      cursor = query_result.mutable_cursor()
      cursor.set_app(self.app)
      cursor.set_cursor(self.cursor)
    if compile:
      self._EncodeCompiledCursor(last_result,
                                 query_result.mutable_compiled_cursor())
    if first_result:
      query_result.index_list().extend(self.__index_list)

  @classmethod
  def _AcquireCursorID(cls):
    """Acquires the next cursor id in a thread safe manner."""
    cls._next_cursor_lock.acquire()
    try:
      cursor_id = cls._next_cursor
      cls._next_cursor += 1
    finally:
      cls._next_cursor_lock.release()
    return cursor_id

  def _IsBeforeCursor(self, entity, cursor):
    """True if entity is before cursor according to the current order.

    Args:
      entity: an entity_pb.EntityProto entity.
      cursor: a compiled cursor as returned by _DecodeCompiledCursor.
    """
    comparison_entity = entity_pb.EntityProto()
    for prop in entity.property_list():
      if prop.name() in self.__cursor_properties:
        comparison_entity.add_property().MergeFrom(prop)
    if cursor[0].has_key():
      comparison_entity.mutable_key().MergeFrom(entity.key())
    x = self.__order_compare_entities(comparison_entity, cursor[0])
    if cursor[1]:
      return x < 0
    else:
      return x <= 0

  def _DecodeCompiledCursor(self, compiled_cursor):
    """Converts a compiled_cursor into a cursor_entity.

    Args:
      compiled_cursor: The datastore_pb.CompiledCursor to decode.

    Returns:
      (cursor_entity, inclusive): an entity_pb.EntityProto and if it should
      be included in the result set.
    """
    assert compiled_cursor.has_position()

    position = compiled_cursor.position()

    remaining_properties = set(self.__cursor_properties)

    cursor_entity = entity_pb.EntityProto()
    if position.has_key():
      cursor_entity.mutable_key().CopyFrom(position.key())
      try:
        remaining_properties.remove('__key__')
      except KeyError:
        Check(False, 'Cursor does not match query: extra value __key__')
    for indexvalue in position.indexvalue_list():
      property = cursor_entity.add_property()
      property.set_name(indexvalue.property())
      property.mutable_value().CopyFrom(indexvalue.value())
      try:
        remaining_properties.remove(indexvalue.property())
      except KeyError:
        Check(False, 'Cursor does not match query: extra value %s' %
              indexvalue.property())
    Check(not remaining_properties,
          'Cursor does not match query: missing values for %r' %
          remaining_properties)

    return (cursor_entity, position.start_inclusive())

  def _EncodeCompiledCursor(self, last_result, compiled_cursor):
    """Converts the current state of the cursor into a compiled_cursor.

    Args:
      last_result: the last result returned by this query.
      compiled_cursor: an empty datastore_pb.CompiledCursor.
    """
    if last_result is not None:

      position = compiled_cursor.mutable_position()

      if '__key__' in self.__cursor_properties:
        position.mutable_key().MergeFrom(last_result.key())
      for prop in last_result.property_list():
        if prop.name() in self.__cursor_properties:
          indexvalue = position.add_indexvalue()
          indexvalue.set_property(prop.name())
          indexvalue.mutable_value().CopyFrom(prop.value())
      position.set_start_inclusive(False)
      _SetBeforeAscending(position, self.__first_sort_order)


class ListCursor(BaseCursor):
  """A query cursor over a list of entities.

  Public properties:
    keys_only: whether the query is keys_only
  """

  def __init__(self, query, dsquery, orders, index_list, results):
    """Constructor.

    Args:
      query: the query request proto
      dsquery: a datastore_query.Query over query.
      orders: the orders of query as returned by _GuessOrders.
      index_list: the list of indexes used by the query.
      results: list of entity_pb.EntityProto
    """
    super(ListCursor, self).__init__(query, dsquery, orders, index_list)

    if self.group_by:
      distincts = set()
      new_results = []
      for result in results:
        key_value = _GetGroupByKey(result, self.group_by)
        if key_value not in distincts:
          distincts.add(key_value)
          new_results.append(result)
      results = new_results

    if query.has_compiled_cursor() and query.compiled_cursor().has_position():
      start_cursor = self._DecodeCompiledCursor(query.compiled_cursor())
      self.__last_result = start_cursor[0]
      start_cursor_position = self._GetCursorOffset(results, start_cursor)
    else:
      self.__last_result = None
      start_cursor_position = 0

    if query.has_end_compiled_cursor():
      if query.end_compiled_cursor().has_position():
        end_cursor = self._DecodeCompiledCursor(query.end_compiled_cursor())
        end_cursor_position = self._GetCursorOffset(results, end_cursor)
      else:
        end_cursor_position = 0
    else:
      end_cursor_position = len(results)

    results = results[start_cursor_position:end_cursor_position]

    if query.has_limit():
      limit = query.limit()
      if query.offset():
        limit += query.offset()
      if limit >= 0 and limit < len(results):
        results = results[:limit]

    self.__results = results
    self.__offset = 0
    self.__count = len(self.__results)

  def _GetCursorOffset(self, results, cursor):
    """Converts a cursor into an offset into the result set even if the
    cursor's entity no longer exists.

    Args:
      results: the query's results (sequence of entity_pb.EntityProto)
      cursor: a compiled cursor as returned by _DecodeCompiledCursor
    Returns:
      the integer offset
    """
    lo = 0
    hi = len(results)
    while lo < hi:
      mid = (lo + hi) // 2
      if self._IsBeforeCursor(results[mid], cursor):
        lo = mid + 1
      else:
        hi = mid
    return lo
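
  # The loop in _GetCursorOffset is a left-leaning binary search: it returns
  # the first index whose entity is not before the cursor, matching what
  # bisect.bisect_left would return with _IsBeforeCursor as the "less than"
  # predicate, so a cursor whose entity has since been deleted still maps to
  # a stable offset. A minimal sketch over plain ints (illustrative only):
  #
  #   def _CursorOffsetSketch(values, cursor_value):
  #     lo, hi = 0, len(values)
  #     while lo < hi:
  #       mid = (lo + hi) // 2
  #       if values[mid] < cursor_value:  # "is before cursor"
  #         lo = mid + 1
  #       else:
  #         hi = mid
  #     return lo  # _CursorOffsetSketch([1, 3, 5], 4) == 2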

  def PopulateQueryResult(self, result, count, offset,
                          compile=False, first_result=False):
    """Populates a QueryResult with this cursor and the given number of results.

    Args:
      result: datastore_pb.QueryResult
      count: integer of how many results to return
      offset: integer of how many results to skip
      compile: boolean, whether we are compiling this query
      first_result: whether the query result is the first for this query
    """
    Check(offset >= 0, 'Offset must be >= 0')

    offset = min(offset, self.__count - self.__offset)
    limited_offset = min(offset, _MAX_QUERY_OFFSET)
    if limited_offset:
      self.__offset += limited_offset
      result.set_skipped_results(limited_offset)

    if compile and result.skipped_results() > 0:
      self._EncodeCompiledCursor(self.__results[self.__offset - 1],
                                 result.mutable_skipped_results_compiled_cursor())
    if offset == limited_offset and count:

      if count > _MAXIMUM_RESULTS:
        count = _MAXIMUM_RESULTS
      results = self.__results[self.__offset:self.__offset + count]
      count = len(results)
      self.__offset += count

      result.result_list().extend(
          LoadEntity(entity, self.keys_only, self.property_names)
          for entity in results)
      if compile:
        for entity in results:
          self._EncodeCompiledCursor(entity,
                                     result.add_result_compiled_cursor())

    if self.__offset:

      self.__last_result = self.__results[self.__offset - 1]

    result.set_more_results(self.__offset < self.__count)
    self._PopulateResultMetadata(result, compile,
                                 first_result, self.__last_result)


def _SynchronizeTxn(function):
  """A decorator that locks a transaction during the function call."""

  def sync(txn, *args, **kwargs):

    txn._lock.acquire()
    try:

      Check(txn._state is LiveTxn.ACTIVE, 'transaction closed')

      return function(txn, *args, **kwargs)
    finally:

      txn._lock.release()
  return sync


def _GetEntityGroup(ref):
  """Returns the entity group key for the given reference."""
  entity_group = entity_pb.Reference()
  entity_group.CopyFrom(ref)
  assert (entity_group.path().element_list()[0].has_id() or
          entity_group.path().element_list()[0].has_name())
  del entity_group.path().element_list()[1:]
  return entity_group


def _GetKeyKind(key):
  """Return the kind of the given key."""
  return key.path().element_list()[-1].type()


def _FilterIndexesByKind(key, indexes):
  """Return only the indexes with the specified kind."""
  return filter((lambda index:
                 index.definition().entity_type() == _GetKeyKind(key)),
                indexes)


class LiveTxn(object):
  """An in flight transaction."""

  # Transaction states.
  ACTIVE = 1
  COMMITED = 2
  ROLLEDBACK = 3
  FAILED = 4

  _state = ACTIVE
  _commit_time_s = None

  def __init__(self, txn_manager, app, allow_multiple_eg):
    assert isinstance(txn_manager, BaseTransactionManager)
    assert isinstance(app, basestring)

    self._txn_manager = txn_manager
    self._app = app
    self._allow_multiple_eg = allow_multiple_eg

    # Maps entity group key values to EntityGroupTracker objects.
    self._entity_groups = {}

    self._lock = threading.RLock()
    self._apply_lock = threading.Lock()

    self._actions = []
    self._cost = datastore_pb.Cost()

    # Maps kind to the composite indexes that apply to entities of that kind.
    self._kind_to_indexes = collections.defaultdict(list)

  def _GetTracker(self, reference):
    """Gets the entity group tracker for reference.

    If this is the first time reference's entity group is seen, creates a new
    tracker, checking that the transaction doesn't exceed the entity group
    limit.
    """
    entity_group = _GetEntityGroup(reference)
    key = datastore_types.ReferenceToKeyValue(entity_group)
    tracker = self._entity_groups.get(key, None)
    if tracker is None:
      Check(self._app == reference.app(),
            'Transactions cannot span applications (expected %s, got %s)' %
            (self._app, reference.app()))
      if self._allow_multiple_eg:
        Check(len(self._entity_groups) < _MAX_EG_PER_TXN,
              'operating on too many entity groups in a single transaction.')
      else:
        Check(len(self._entity_groups) < 1,
              'cross-group transactions need to be explicitly '
              'specified (xg=True)')
      tracker = EntityGroupTracker(entity_group)
      self._entity_groups[key] = tracker

    return tracker

  def _GetAllTrackers(self):
    """Get the trackers for the transaction's entity groups.

    If no entity group has been discovered returns a 'global' entity group
    tracker. This is possible if the txn only contains transactional tasks.

    Returns:
      The tracker list for the entity groups used in this txn.
    """
    if not self._entity_groups:
      self._GetTracker(datastore_types.Key.from_path(
          '__global__', 1, _app=self._app)._ToPb())
    return self._entity_groups.values()

  def _GrabSnapshot(self, reference):
    """Gets snapshot for this reference, creating it if necessary.

    If no snapshot has been set for reference's entity group, a snapshot is
    taken and stored for future reads (this also sets the read position),
    and a CONCURRENT_TRANSACTION exception is thrown if we no longer have
    a consistent snapshot.

    Args:
      reference: An entity_pb.Reference from which to extract the entity
          group.
    Raises:
      apiproxy_errors.ApplicationError if the snapshot is not consistent.
    """
    tracker = self._GetTracker(reference)
    check_contention = tracker._snapshot is None
    snapshot = tracker._GrabSnapshot(self._txn_manager)
    if check_contention:

      # First read from a new entity group: verify that every other entity
      # group read so far is still at the log position we observed, so the
      # transaction sees a single consistent snapshot.
      candidates = [other for other in self._entity_groups.values()
                    if other._snapshot is not None and other != tracker]
      meta_data_list = [other._meta_data for other in candidates]
      self._txn_manager._AcquireWriteLocks(meta_data_list)
      try:
        for other in candidates:
          if other._meta_data._log_pos != other._read_pos:
            self._state = self.FAILED
            raise apiproxy_errors.ApplicationError(
                datastore_pb.Error.CONCURRENT_TRANSACTION,
                'Concurrency exception.')
      finally:
        self._txn_manager._ReleaseWriteLocks(meta_data_list)
    return snapshot

  @_SynchronizeTxn
  def Get(self, reference):
    """Returns the entity associated with the given entity_pb.Reference or None.

    Does not see any modifications in the current txn.

    Args:
      reference: The entity_pb.Reference of the entity to look up.

    Returns:
      The associated entity_pb.EntityProto or None if no such entity exists.
    """
    snapshot = self._GrabSnapshot(reference)
    entity = snapshot.get(datastore_types.ReferenceToKeyValue(reference))
    return LoadEntity(entity)

  @_SynchronizeTxn
  def GetQueryCursor(self, query, filters, orders, index_list,
                     filter_predicate=None):
    """Runs the given datastore_pb.Query and returns a QueryCursor for it.

    Does not see any modifications in the current txn.

    Args:
      query: The datastore_pb.Query to run.
      filters: A list of filters that override the ones found on query.
      orders: A list of orders that override the ones found on query.
      index_list: A list of indexes used by the query.
      filter_predicate: an additional filter of type
          datastore_query.FilterPredicate. This is passed along to implement
          V4 specific filters without changing the entire stub.

    Returns:
      A BaseCursor that can be used to fetch query results.
    """
    Check(query.has_ancestor(),
          'Query must have an ancestor when performed in a transaction.')
    snapshot = self._GrabSnapshot(query.ancestor())
    return _ExecuteQuery(snapshot.values(), query, filters, orders, index_list,
                         filter_predicate)

  @_SynchronizeTxn
  def Put(self, entity, insert, indexes):
    """Puts the given entity.

    Args:
      entity: The entity_pb.EntityProto to put.
      insert: A boolean that indicates if we should fail if the entity already
          exists.
      indexes: The composite indexes that apply to the entity.
    """
    tracker = self._GetTracker(entity.key())
    key = datastore_types.ReferenceToKeyValue(entity.key())
    tracker._delete.pop(key, None)
    tracker._put[key] = (entity, insert)
    self._kind_to_indexes[_GetKeyKind(entity.key())] = indexes

  @_SynchronizeTxn
  def Delete(self, reference, indexes):
    """Deletes the entity associated with the given reference.

    Args:
      reference: The entity_pb.Reference of the entity to delete.
      indexes: The composite indexes that apply to the entity.
    """
    tracker = self._GetTracker(reference)
    key = datastore_types.ReferenceToKeyValue(reference)
    tracker._put.pop(key, None)
    tracker._delete[key] = reference
    self._kind_to_indexes[_GetKeyKind(reference)] = indexes

  @_SynchronizeTxn
  def AddActions(self, actions, max_actions=None):
    """Adds the given actions to the current txn.

    Args:
      actions: A list of pbs to send to taskqueue.Add when the txn is applied.
      max_actions: A number that indicates the maximum number of actions to
          allow on this txn.
    """
    Check(not max_actions or len(self._actions) + len(actions) <= max_actions,
          'Too many messages, maximum allowed %s' % max_actions)
    self._actions.extend(actions)

  def Rollback(self):
    """Rollback the current txn."""

    self._lock.acquire()
    try:
      Check(self._state is self.ACTIVE or self._state is self.FAILED,
            'transaction closed')
      self._state = self.ROLLEDBACK
    finally:
      self._txn_manager._RemoveTxn(self)

      self._lock.release()

  @_SynchronizeTxn
  def Commit(self):
    """Commits the current txn.

    This function hands off the responsibility of calling _Apply to the owning
    TransactionManager.

    Returns:
      The cost of the transaction.
    """
    try:

      trackers = self._GetAllTrackers()
      empty = True
      for tracker in trackers:
        snapshot = tracker._GrabSnapshot(self._txn_manager)
        empty = empty and not tracker._put and not tracker._delete

        for entity, insert in tracker._put.itervalues():
          Check(not insert or self.Get(entity.key()) is None,
                'the id allocated for a new entity was already '
                'in use, please try again')

          old_entity = None
          key = datastore_types.ReferenceToKeyValue(entity.key())
          if key in snapshot:
            old_entity = snapshot[key]
          self._AddWriteOps(old_entity, entity)

        for reference in tracker._delete.itervalues():

          old_entity = None
          key = datastore_types.ReferenceToKeyValue(reference)
          if key in snapshot:
            old_entity = snapshot[key]
          if old_entity is not None:
            self._AddWriteOps(None, old_entity)

      if empty and not self._actions:
        self.Rollback()
        return datastore_pb.Cost()

      meta_data_list = [tracker._meta_data for tracker in trackers]
      self._txn_manager._AcquireWriteLocks(meta_data_list)
    except:

      self.Rollback()
      raise

    try:

      for tracker in trackers:
        Check(tracker._meta_data._log_pos == tracker._read_pos,
              'Concurrency exception.',
              datastore_pb.Error.CONCURRENT_TRANSACTION)

      for tracker in trackers:
        tracker._meta_data.Log(self)
      self._state = self.COMMITED
      self._commit_time_s = time.time()
    except:

      self.Rollback()
      raise
    else:

      for action in self._actions:
        try:
          apiproxy_stub_map.MakeSyncCall(
              'taskqueue', 'Add', action, api_base_pb.VoidProto())
        except apiproxy_errors.ApplicationError, e:
          logging.warning('Transactional task %s has been dropped, %s',
                          action, e)
      self._actions = []
    finally:
      self._txn_manager._RemoveTxn(self)

      self._txn_manager._ReleaseWriteLocks(meta_data_list)

    self._txn_manager._consistency_policy._OnCommit(self)
    return self._cost

  def _AddWriteOps(self, old_entity, new_entity):
    """Adds the cost of writing the new_entity to the _cost member.

    We assume that old_entity represents the current state of the Datastore.

    Args:
      old_entity: Entity representing the current state in the Datastore.
      new_entity: Entity representing the desired state in the Datastore.
    """
    composite_indexes = self._kind_to_indexes[_GetKeyKind(new_entity.key())]
    entity_writes, index_writes = _CalculateWriteOps(
        composite_indexes, old_entity, new_entity)
    _UpdateCost(self._cost, entity_writes, index_writes)

  def _Apply(self, meta_data):
    """Applies the current txn on the given entity group.

    This function blindly performs the operations contained in the current
    txn. The calling function must acquire the entity group write lock and
    ensure transactions are applied in order.
    """
    self._apply_lock.acquire()
    try:

      assert self._state == self.COMMITED
      for tracker in self._entity_groups.values():
        if tracker._meta_data is meta_data:
          break
      else:
        assert False
      assert tracker._read_pos != tracker.APPLIED

      for entity, insert in tracker._put.itervalues():
        self._txn_manager._Put(entity, insert)

      for key in tracker._delete.itervalues():
        self._txn_manager._Delete(key)

      tracker._read_pos = EntityGroupTracker.APPLIED

      tracker._meta_data.Unlog(self)
    finally:
      self._apply_lock.release()


class EntityGroupTracker(object):
  """An entity group involved in a transaction."""

  APPLIED = -2

  _read_pos = None

  _snapshot = None

  _meta_data = None

  def __init__(self, entity_group):
    self._entity_group = entity_group
    self._put = {}
    self._delete = {}

  def _GrabSnapshot(self, txn_manager):
    """Snapshot this entity group, remembering the read position."""
    if self._snapshot is None:
      self._meta_data, self._read_pos, self._snapshot = (
          txn_manager._GrabSnapshot(self._entity_group))
    return self._snapshot


class EntityGroupMetaData(object):
  """The meta data associated with an entity group."""

  _log_pos = -1

  _snapshot = None

  def __init__(self, entity_group):
    self._entity_group = entity_group
    self._write_lock = threading.Lock()
    self._apply_queue = []

  def CatchUp(self):
    """Applies all outstanding txns."""

    assert self._write_lock.acquire(False) is False

    while self._apply_queue:
      self._apply_queue[0]._Apply(self)

  def Log(self, txn):
    """Add a pending transaction to this entity group.

    Requires that the caller hold the meta data lock.
    This also increments the current log position and clears the snapshot
    cache.
    """
    assert self._write_lock.acquire(False) is False
    self._apply_queue.append(txn)
    self._log_pos += 1
    self._snapshot = None

  def Unlog(self, txn):
    """Remove the first pending transaction from the apply queue.

    Requires that the caller hold the meta data lock.
    This checks that the first pending transaction is indeed txn.
    """
    assert self._write_lock.acquire(False) is False

    Check(self._apply_queue and self._apply_queue[0] is txn,
          'Transaction is not appliable',
          datastore_pb.Error.INTERNAL_ERROR)
    self._apply_queue.pop(0)


class BaseConsistencyPolicy(object):
  """A base class for a consistency policy to be used with a transaction
  manager.
  """

  def _OnCommit(self, txn):
    """Called after a LiveTxn has been committed.

    This function can decide whether to apply the txn right away.

    Args:
      txn: A LiveTxn that has been committed
    """
    raise NotImplementedError

  def _OnGroom(self, meta_data_list):
    """Called once for every global query.

    This function must acquire the write lock for any meta data before
    applying any outstanding txns.

    Args:
      meta_data_list: A list of EntityGroupMetaData objects.
    """
    raise NotImplementedError


class MasterSlaveConsistencyPolicy(BaseConsistencyPolicy):
  """Enforces the Master / Slave consistency policy.

  Applies all txns on commit.
  """

  def _OnCommit(self, txn):

    for tracker in txn._GetAllTrackers():
      tracker._meta_data._write_lock.acquire()
      try:
        tracker._meta_data.CatchUp()
      finally:
        tracker._meta_data._write_lock.release()

    # Data is always consistent under this policy, so it is safe for the
    # manager to write it out immediately.
    txn._txn_manager.Write()

  def _OnGroom(self, meta_data_list):

    # Every txn is applied on commit, so there is never anything to groom.
    pass


class BaseHighReplicationConsistencyPolicy(BaseConsistencyPolicy):
  """A base class for High Replication Datastore consistency policies.

  All txns are applied asynchronously.
  """

  def _OnCommit(self, txn):
    pass

  def _OnGroom(self, meta_data_list):

    for meta_data in meta_data_list:
      if not meta_data._apply_queue:
        continue

      meta_data._write_lock.acquire()
      try:
        while meta_data._apply_queue:
          txn = meta_data._apply_queue[0]
          if self._ShouldApply(txn, meta_data):
            txn._Apply(meta_data)
          else:
            break
      finally:
        meta_data._write_lock.release()

  def _ShouldApply(self, txn, meta_data):
    """Determines if the given transaction should be applied."""
    raise NotImplementedError


class TimeBasedHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy):
  """A High Replication Datastore consistency policy based on elapsed time.

  This class tries to simulate performance seen in the high replication
  datastore using estimated probabilities of a transaction committing after a
  given amount of time.
  """

  _classification_map = [(.98, 100),
                         (.99, 300),
                         (.995, 2000),
                         (1, 240000),
                        ]

  def SetClassificationMap(self, classification_map):
    """Set the probability a txn will be applied after a given amount of time.

    Args:
      classification_map: A list of tuples containing (float between 0 and 1,
          number of milliseconds) that define the probability of a transaction
          applying after a given amount of time.
    """
    for prob, delay in classification_map:
      if prob < 0 or prob > 1 or delay <= 0:
        raise TypeError(
            'classification_map must be a list of (probability, delay) '
            'tuples, found %r' % (classification_map,))

    self._classification_map = sorted(classification_map)

  def _ShouldApplyImpl(self, elapsed_ms, classification):
    for rate, ms in self._classification_map:
      if classification <= rate:
        break
    return elapsed_ms >= ms

  def _Classify(self, txn, meta_data):
    return random.Random(id(txn) ^ id(meta_data)).random()

  def _ShouldApply(self, txn, meta_data):
    elapsed_ms = (time.time() - txn._commit_time_s) * 1000
    classification = self._Classify(txn, meta_data)
    return self._ShouldApplyImpl(elapsed_ms, classification)
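

# A minimal usage sketch for TimeBasedHRConsistencyPolicy (values here are
# illustrative, not defaults): bias the simulated HRD so that 80% of
# transactions apply within 100ms and the rest within one second.
def _TimeBasedPolicySketch():
  policy = TimeBasedHRConsistencyPolicy()
  policy.SetClassificationMap([(.8, 100), (1, 1000)])
  return policy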


class PseudoRandomHRConsistencyPolicy(BaseHighReplicationConsistencyPolicy):
  """A policy that always gives the same sequence of consistency decisions."""

  def __init__(self, probability=.5, seed=0):
    """Constructor.

    Args:
      probability: A number between 0 and 1 that is the likelihood of a
          transaction applying before a global query is executed.
      seed: A hashable object to use as a seed. Use None to use the current
          timestamp.
    """
    self.SetProbability(probability)
    self.SetSeed(seed)

  def SetProbability(self, probability):
    """Change the probability of a transaction applying.

    Args:
      probability: A number between 0 and 1 that determines the probability
          of a transaction applying before a global query is run.
    """
    if probability < 0 or probability > 1:
      raise TypeError('probability must be a number between 0 and 1, '
                      'found %r' % probability)
    self._probability = probability

  def SetSeed(self, seed):
    """Reset the seed."""
    self._random = random.Random(seed)

  def _ShouldApply(self, txn, meta_data):
    return self._random.random() < self._probability
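

# A minimal sketch of the common test setup (assuming google.appengine.ext
# .testbed, whose datastore stub accepts a consistency policy): with
# probability=0 every commit stays unapplied until something grooms it,
# which surfaces eventual-consistency bugs deterministically in tests.
#
#   from google.appengine.ext import testbed
#   tb = testbed.Testbed()
#   tb.activate()
#   policy = PseudoRandomHRConsistencyPolicy(probability=0)
#   tb.init_datastore_v3_stub(consistency_policy=policy)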


class BaseTransactionManager(object):
  """A class that manages the state of transactions.

  This includes creating consistent snapshots for transactions.
  """

  def __init__(self, consistency_policy=None):
    super(BaseTransactionManager, self).__init__()

    self._consistency_policy = (consistency_policy or
                                MasterSlaveConsistencyPolicy())

    self._meta_data_lock = threading.Lock()
    BaseTransactionManager.Clear(self)

  def SetConsistencyPolicy(self, policy):
    """Set the consistency policy to use.

    Causes all data to be flushed.

    Args:
      policy: An object inheriting from BaseConsistencyPolicy.
    """
    if not isinstance(policy, BaseConsistencyPolicy):
      raise TypeError('policy should be of type '
                      'datastore_stub_util.BaseConsistencyPolicy found %r.' %
                      (policy,))
    self.Flush()
    self._consistency_policy = policy

  def Clear(self):
    """Discards any pending transactions and resets the meta data."""

    # Maps entity group key values to EntityGroupMetaData objects.
    self._meta_data = {}
    # Maps transaction handles to LiveTxn objects.
    self._txn_map = {}
1952 def BeginTransaction(self, app, allow_multiple_eg):
1953 """Start a transaction on the given app.
1955 Args:
1956 app: A string representing the app for which to start the transaction.
1957 allow_multiple_eg: True if transactions can span multiple entity groups.
1959 Returns:
1960 A datastore_pb.Transaction for the created transaction
1962 Check(not (allow_multiple_eg and isinstance(
1963 self._consistency_policy, MasterSlaveConsistencyPolicy)),
1964 'transactions on multiple entity groups only allowed with the '
1965 'High Replication datastore')
1966 txn = self._BeginTransaction(app, allow_multiple_eg)
1967 self._txn_map[id(txn)] = txn
1968 transaction = datastore_pb.Transaction()
1969 transaction.set_app(app)
1970 transaction.set_handle(id(txn))
1971 return transaction
1973 def GetTxn(self, transaction, request_trusted, request_app):
1974 """Gets the LiveTxn object associated with the given transaction.
1976 Args:
1977 transaction: The datastore_pb.Transaction to look up.
1978 request_trusted: A boolean indicating if the requesting app is trusted.
1979 request_app: A string representing the app making the request.
1981 Returns:
1982 The associated LiveTxn object.
1983 """
1984 request_app = datastore_types.ResolveAppId(request_app)
1985 CheckTransaction(request_trusted, request_app, transaction)
1986 txn = self._txn_map.get(transaction.handle())
1987 Check(txn and txn._app == transaction.app(),
1988 'Transaction(<%s>) not found' % str(transaction).replace('\n', ', '))
1989 return txn
1991 def Groom(self):
1992 """Attempts to apply any outstanding transactions.
1994 The consistency policy determines if a transaction should be applied.
1995 """
1996 self._meta_data_lock.acquire()
1997 try:
1998 self._consistency_policy._OnGroom(self._meta_data.itervalues())
1999 finally:
2000 self._meta_data_lock.release()
2002 def Flush(self):
2003 """Applies all outstanding transactions."""
2004 self._meta_data_lock.acquire()
2005 try:
2006 for meta_data in self._meta_data.itervalues():
2007 if not meta_data._apply_queue:
2008 continue
2011 meta_data._write_lock.acquire()
2012 try:
2013 meta_data.CatchUp()
2014 finally:
2015 meta_data._write_lock.release()
2016 finally:
2017 self._meta_data_lock.release()
2019 def _GetMetaData(self, entity_group):
2020 """Safely gets the EntityGroupMetaData object for the given entity_group.
2022 self._meta_data_lock.acquire()
2023 try:
2024 key = datastore_types.ReferenceToKeyValue(entity_group)
2026 meta_data = self._meta_data.get(key, None)
2027 if not meta_data:
2028 meta_data = EntityGroupMetaData(entity_group)
2029 self._meta_data[key] = meta_data
2030 return meta_data
2031 finally:
2032 self._meta_data_lock.release()
2034 def _BeginTransaction(self, app, allow_multiple_eg):
2035 """Starts a transaction without storing it in the txn_map."""
2036 return LiveTxn(self, app, allow_multiple_eg)
2038 def _GrabSnapshot(self, entity_group):
2039 """Grabs a consistent snapshot of the given entity group.
2041 Args:
2042 entity_group: An entity_pb.Reference of the entity group of which the
2043 snapshot should be taken.
2045 Returns:
2046 A tuple of (meta_data, log_pos, snapshot) where log_pos is the current log
2047 position and snapshot is a map of reference key value to
2048 entity_pb.EntityProto.
2049 """
2051 meta_data = self._GetMetaData(entity_group)
2052 meta_data._write_lock.acquire()
2053 try:
2054 if not meta_data._snapshot:
2056 meta_data.CatchUp()
2057 meta_data._snapshot = self._GetEntitiesInEntityGroup(entity_group)
2058 return meta_data, meta_data._log_pos, meta_data._snapshot
2059 finally:
2061 meta_data._write_lock.release()
2063 def _AcquireWriteLocks(self, meta_data_list):
2064 """Acquire the write locks for the given entity group meta data.
2066 These locks must be released with _ReleaseWriteLocks before returning to the
2067 user.
2069 Args:
2070 meta_data_list: list of EntityGroupMetaData objects.
2071 """
2072 for meta_data in sorted(meta_data_list):
2073 meta_data._write_lock.acquire()
2075 def _ReleaseWriteLocks(self, meta_data_list):
2076 """Release the write locks of the given entity group meta data.
2078 Args:
2079 meta_data_list: list of EntityGroupMetaData objects.
2080 """
2081 for meta_data in sorted(meta_data_list):
2082 meta_data._write_lock.release()
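# Note: _AcquireWriteLocks and _ReleaseWriteLocks both walk
# sorted(meta_data_list), so every caller takes entity-group write locks in
# one global order. Acquiring locks in a consistent order is what prevents
# two cross-group transactions from deadlocking while each holds a lock the
# other one needs.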
2084 def _RemoveTxn(self, txn):
2085 """Removes a LiveTxn from the txn_map (if present)."""
2086 self._txn_map.pop(id(txn), None)
2088 def _Put(self, entity, insert):
2089 """Put the given entity.
2091 This must be implemented by a sub-class. The sub-class can assume that any
2092 needed consistency is enforced at a higher level (and can just put blindly).
2094 Args:
2095 entity: The entity_pb.EntityProto to put.
2096 insert: A boolean that indicates if we should fail if the entity already
2097 exists.
2098 """
2099 raise NotImplementedError
2101 def _Delete(self, reference):
2102 """Delete the entity associated with the specified reference.
2104 This must be implemented by a sub-class. The sub-class can assume that any
2105 needed consistency is enforced at a higher level (and can just delete
2106 blindly).
2108 Args:
2109 reference: The entity_pb.Reference of the entity to delete.
2110 """
2111 raise NotImplementedError
2113 def _GetEntitiesInEntityGroup(self, entity_group):
2114 """Gets the contents of a specific entity group.
2116 This must be implemented by a sub-class. The sub-class can assume that any
2117 needed consistency is enforced at a higher level (and can just blindly read).
2119 Other entity groups may be modified concurrently.
2121 Args:
2122 entity_group: An entity_pb.Reference of the entity group to get.
2124 Returns:
2125 A dict mapping datastore_types.ReferenceToKeyValue(key) to EntityProto
2126 """
2127 raise NotImplementedError
2130 class BaseIndexManager(object):
2131 """A generic index manager that stores all data in memory."""
2140 WRITE_ONLY = entity_pb.CompositeIndex.WRITE_ONLY
2141 READ_WRITE = entity_pb.CompositeIndex.READ_WRITE
2142 DELETED = entity_pb.CompositeIndex.DELETED
2143 ERROR = entity_pb.CompositeIndex.ERROR
2145 _INDEX_STATE_TRANSITIONS = {
2146 WRITE_ONLY: frozenset((READ_WRITE, DELETED, ERROR)),
2147 READ_WRITE: frozenset((DELETED,)),
2148 ERROR: frozenset((DELETED,)),
2149 DELETED: frozenset((ERROR,)),
2152 def __init__(self):
2156 self.__indexes = collections.defaultdict(list)
2157 self.__indexes_lock = threading.Lock()
2158 self.__next_index_id = 1
2159 self.__index_id_lock = threading.Lock()
2161 def __FindIndex(self, index):
2162 """Finds an existing index by definition.
2164 Args:
2165 index: entity_pb.CompositeIndex
2167 Returns:
2168 entity_pb.CompositeIndex, if it exists; otherwise None
2169 """
2170 app = index.app_id()
2171 if app in self.__indexes:
2172 for stored_index in self.__indexes[app]:
2173 if index.definition() == stored_index.definition():
2174 return stored_index
2176 return None
2178 def CreateIndex(self, index, trusted=False, calling_app=None):
2181 calling_app = datastore_types.ResolveAppId(calling_app)
2182 CheckAppId(trusted, calling_app, index.app_id())
2183 Check(index.id() == 0, 'New index id must be 0.')
2184 Check(not self.__FindIndex(index), 'Index already exists.')
2187 self.__index_id_lock.acquire()
2188 index.set_id(self.__next_index_id)
2189 self.__next_index_id += 1
2190 self.__index_id_lock.release()
2193 clone = entity_pb.CompositeIndex()
2194 clone.CopyFrom(index)
2195 app = index.app_id()
2196 clone.set_app_id(app)
2199 self.__indexes_lock.acquire()
2200 try:
2201 self.__indexes[app].append(clone)
2202 finally:
2203 self.__indexes_lock.release()
2205 self._OnIndexChange(index.app_id())
2207 return index.id()
2209 def GetIndexes(self, app, trusted=False, calling_app=None):
2210 """Get the CompositeIndex objects for the given app."""
2211 calling_app = datastore_types.ResolveAppId(calling_app)
2212 CheckAppId(trusted, calling_app, app)
2214 return self.__indexes[app]
2216 def UpdateIndex(self, index, trusted=False, calling_app=None):
2217 CheckAppId(trusted, calling_app, index.app_id())
2219 stored_index = self.__FindIndex(index)
2220 Check(stored_index, 'Index does not exist.')
2221 Check(index.state() == stored_index.state() or
2222 index.state() in self._INDEX_STATE_TRANSITIONS[stored_index.state()],
2223 'cannot move index state from %s to %s' %
2224 (entity_pb.CompositeIndex.State_Name(stored_index.state()),
2225 (entity_pb.CompositeIndex.State_Name(index.state()))))
2228 self.__indexes_lock.acquire()
2229 try:
2230 stored_index.set_state(index.state())
2231 finally:
2232 self.__indexes_lock.release()
2234 self._OnIndexChange(index.app_id())
2236 def DeleteIndex(self, index, trusted=False, calling_app=None):
2237 CheckAppId(trusted, calling_app, index.app_id())
2239 stored_index = self.__FindIndex(index)
2240 Check(stored_index, 'Index does not exist.')
2243 app = index.app_id()
2244 self.__indexes_lock.acquire()
2245 try:
2246 self.__indexes[app].remove(stored_index)
2247 finally:
2248 self.__indexes_lock.release()
2250 self._OnIndexChange(index.app_id())
2252 def _SideLoadIndex(self, index):
2253 self.__indexes[index.app_id()].append(index)
2255 def _OnIndexChange(self, app_id):
2256 pass
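# A short lifecycle sketch (assumes `manager` is a BaseIndexManager and
# `index` an entity_pb.CompositeIndex in the WRITE_ONLY state; both names are
# hypothetical):
#
#   index.set_id(0)              # CreateIndex requires a zero id
#   manager.CreateIndex(index)   # assigns and returns a fresh id
#   index.set_state(entity_pb.CompositeIndex.READ_WRITE)
#   manager.UpdateIndex(index)   # WRITE_ONLY -> READ_WRITE is a legal move
#   manager.DeleteIndex(index)   # removes the stored index entirely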
2259 class BaseDatastore(BaseTransactionManager, BaseIndexManager):
2260 """A base implemenation of a Datastore.
2262 This class implements common functions associated with a datastore and
2263 enforces security restrictions passed on by a stub or client. It is designed
2264 to be shared by any number of threads or clients serving any number of apps.
2266 If an app is not specified explicitly it is pulled from the env and assumed to
2267 be untrusted.
2268 """
2272 _MAX_QUERY_COMPONENTS = 100
2276 _BATCH_SIZE = 20
2280 _MAX_ACTIONS_PER_TXN = 5
2282 def __init__(self, require_indexes=False, consistency_policy=None,
2283 use_atexit=True, auto_id_policy=SEQUENTIAL):
2284 BaseTransactionManager.__init__(self, consistency_policy=consistency_policy)
2285 BaseIndexManager.__init__(self)
2287 self._require_indexes = require_indexes
2288 self._pseudo_kinds = {}
2289 self.SetAutoIdPolicy(auto_id_policy)
2291 if use_atexit:
2296 atexit.register(self.Write)
2298 def Clear(self):
2299 """Clears out all stored values."""
2301 BaseTransactionManager.Clear(self)
2304 def _RegisterPseudoKind(self, kind):
2305 """Registers a pseudo kind to be used to satisfy a meta data query."""
2306 self._pseudo_kinds[kind.name] = kind
2307 kind._stub = weakref.proxy(self)
2312 def GetQueryCursor(self, raw_query, trusted=False, calling_app=None,
2313 filter_predicate=None):
2314 """Execute a query.
2316 Args:
2317 raw_query: The non-validated datastore_pb.Query to run.
2318 trusted: If the calling app is trusted.
2319 calling_app: The app requesting the results or None to pull the app from
2320 the environment.
2321 filter_predicate: an additional filter of type
2322 datastore_query.FilterPredicate. This is passed along to implement V4
2323 specific filters without changing the entire stub.
2325 Returns:
2326 A BaseCursor that can be used to retrieve results.
2327 """
2329 calling_app = datastore_types.ResolveAppId(calling_app)
2330 CheckAppId(trusted, calling_app, raw_query.app())
2333 filters, orders = datastore_index.Normalize(raw_query.filter_list(),
2334 raw_query.order_list(),
2335 raw_query.property_name_list())
2338 CheckQuery(raw_query, filters, orders, self._MAX_QUERY_COMPONENTS)
2339 FillUsersInQuery(filters)
2341 index_list = []
2345 if filter_predicate is None:
2346 self._CheckHasIndex(raw_query, trusted, calling_app)
2349 index_list = self.__IndexListForQuery(raw_query)
2352 if raw_query.has_transaction():
2354 Check(raw_query.kind() not in self._pseudo_kinds,
2355 'transactional queries on "%s" not allowed' % raw_query.kind())
2356 txn = self.GetTxn(raw_query.transaction(), trusted, calling_app)
2357 return txn.GetQueryCursor(raw_query, filters, orders, index_list)
2359 if raw_query.has_ancestor() and raw_query.kind() not in self._pseudo_kinds:
2361 txn = self._BeginTransaction(raw_query.app(), False)
2362 return txn.GetQueryCursor(raw_query, filters, orders, index_list,
2363 filter_predicate)
2366 self.Groom()
2367 return self._GetQueryCursor(raw_query, filters, orders, index_list,
2368 filter_predicate)
2370 def __IndexListForQuery(self, query):
2371 """Get the single composite index pb used by the query, if any, as a list.
2373 Args:
2374 query: the datastore_pb.Query to compute the index list for
2376 Returns:
2377 A singleton list of the composite index pb used by the query, or an
2378 empty list if no composite index is required.
2379 """
2380 required, kind, ancestor, props = (
2381 datastore_index.CompositeIndexForQuery(query))
2382 if not required:
2383 return []
2384 composite_index_pb = entity_pb.CompositeIndex()
2385 composite_index_pb.set_app_id(query.app())
2386 composite_index_pb.set_id(0)
2387 composite_index_pb.set_state(entity_pb.CompositeIndex.READ_WRITE)
2388 index_pb = composite_index_pb.mutable_definition()
2389 index_pb.set_entity_type(kind)
2390 index_pb.set_ancestor(bool(ancestor))
2391 for name, direction in datastore_index.GetRecommendedIndexProperties(props):
2392 prop_pb = entity_pb.Index_Property()
2393 prop_pb.set_name(name)
2394 prop_pb.set_direction(direction)
2395 index_pb.property_list().append(prop_pb)
2396 return [composite_index_pb]
2398 def Get(self, raw_keys, transaction=None, eventual_consistency=False,
2399 trusted=False, calling_app=None):
2400 """Get the entities for the given keys.
2402 Args:
2403 raw_keys: A list of unverified entity_pb.Reference objects.
2404 transaction: The datastore_pb.Transaction to use or None.
2405 eventual_consistency: If we should allow stale, potentially inconsistent
2406 results.
2407 trusted: If the calling app is trusted.
2408 calling_app: The app requesting the results or None to pull the app from
2409 the environment.
2411 Returns:
2412 A list containing the entity or None if no entity exists.
2413 """
2415 if not raw_keys:
2416 return []
2418 calling_app = datastore_types.ResolveAppId(calling_app)
2420 if not transaction and eventual_consistency:
2422 result = []
2423 for key in raw_keys:
2424 CheckReference(trusted, calling_app, key)
2425 result.append(self._GetWithPseudoKinds(None, key))
2426 return result
2431 grouped_keys = collections.defaultdict(list)
2432 for i, key in enumerate(raw_keys):
2433 CheckReference(trusted, calling_app, key)
2434 entity_group = _GetEntityGroup(key)
2435 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2436 grouped_keys[entity_group_key].append((key, i))
2438 if transaction:
2440 txn = self.GetTxn(transaction, trusted, calling_app)
2441 return [self._GetWithPseudoKinds(txn, key) for key in raw_keys]
2442 else:
2445 result = [None] * len(raw_keys)
2447 def op(txn, v):
2448 key, i = v
2449 result[i] = self._GetWithPseudoKinds(txn, key)
2450 for keys in grouped_keys.itervalues():
2451 self._RunInTxn(keys, keys[0][0].app(), op)
2452 return result
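# Note on the non-transactional path above: keys are grouped by entity group
# and each group is read through its own short-lived transaction via
# _RunInTxn, so a Get without an explicit transaction is still consistent
# within each entity group. Passing eventual_consistency=True skips that
# machinery entirely and may return stale, unapplied results.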
2454 def _GetWithPseudoKinds(self, txn, key):
2455 """Fetch entity key in txn, taking account of pseudo-kinds."""
2456 pseudo_kind = self._pseudo_kinds.get(_GetKeyKind(key), None)
2457 if pseudo_kind:
2458 return pseudo_kind.Get(txn, key)
2459 elif txn:
2460 return txn.Get(key)
2461 else:
2462 return self._Get(key)
2464 def Put(self, raw_entities, cost, transaction=None,
2465 trusted=False, calling_app=None):
2466 """Writes the given given entities.
2468 Updates an entity's key and entity_group in place if needed
2470 Args:
2471 raw_entities: A list of unverified entity_pb.EntityProto objects.
2472 cost: Out param. The cost of putting the provided entities.
2473 transaction: The datastore_pb.Transaction to use or None.
2474 trusted: If the calling app is trusted.
2475 calling_app: The app requesting the results or None to pull the app from
2476 the environment.
2477 Returns:
2478 A list of entity_pb.Reference objects that indicates where each entity
2479 was stored.
2480 """
2481 if not raw_entities:
2482 return []
2484 calling_app = datastore_types.ResolveAppId(calling_app)
2487 result = [None] * len(raw_entities)
2488 grouped_entities = collections.defaultdict(list)
2489 for i, raw_entity in enumerate(raw_entities):
2490 CheckEntity(trusted, calling_app, raw_entity)
2494 entity = entity_pb.EntityProto()
2495 entity.CopyFrom(raw_entity)
2498 for prop in itertools.chain(entity.property_list(),
2499 entity.raw_property_list()):
2500 FillUser(prop)
2502 last_element = entity.key().path().element_list()[-1]
2503 if not (last_element.id() or last_element.has_name()):
2504 insert = True
2507 if self._auto_id_policy == SEQUENTIAL:
2508 last_element.set_id(self._AllocateSequentialIds(entity.key())[0])
2509 else:
2510 full_key = self._AllocateIds([entity.key()])[0]
2511 last_element.set_id(full_key.path().element_list()[-1].id())
2512 else:
2513 insert = False
2515 entity_group = _GetEntityGroup(entity.key())
2516 entity.mutable_entity_group().CopyFrom(entity_group.path())
2517 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2518 grouped_entities[entity_group_key].append((entity, insert))
2522 key = entity_pb.Reference()
2523 key.CopyFrom(entity.key())
2524 result[i] = key
2526 if transaction:
2528 txn = self.GetTxn(transaction, trusted, calling_app)
2529 for group in grouped_entities.values():
2530 for entity, insert in group:
2532 indexes = _FilterIndexesByKind(entity.key(), self.GetIndexes(
2533 entity.key().app(), trusted, calling_app))
2534 txn.Put(entity, insert, indexes)
2535 else:
2537 for entities in grouped_entities.itervalues():
2538 txn_cost = self._RunInTxn(
2539 entities, entities[0][0].key().app(),
2541 lambda txn, v: txn.Put(v[0], v[1], _FilterIndexesByKind(
2542 v[0].key(),
2543 self.GetIndexes(v[0].key().app(), trusted, calling_app))))
2544 _UpdateCost(cost, txn_cost.entity_writes(), txn_cost.index_writes())
2545 return result
2547 def Delete(self, raw_keys, cost, transaction=None,
2548 trusted=False, calling_app=None):
2549 """Deletes the entities associated with the given keys.
2551 Args:
2552 raw_keys: A list of unverified entity_pb.Reference objects.
2553 cost: Out param. The cost of deleting the provided entities.
2554 transaction: The datastore_pb.Transaction to use or None.
2555 trusted: If the calling app is trusted.
2556 calling_app: The app requesting the results or None to pull the app from
2557 the environment.
2558 """
2559 if not raw_keys:
2560 return
2562 calling_app = datastore_types.ResolveAppId(calling_app)
2565 grouped_keys = collections.defaultdict(list)
2566 for key in raw_keys:
2567 CheckReference(trusted, calling_app, key)
2568 entity_group = _GetEntityGroup(key)
2569 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2570 grouped_keys[entity_group_key].append(key)
2572 if transaction:
2574 txn = self.GetTxn(transaction, trusted, calling_app)
2575 for key in raw_keys:
2577 indexes = _FilterIndexesByKind(key, self.GetIndexes(
2578 key.app(), trusted, calling_app))
2579 txn.Delete(key, indexes)
2580 else:
2582 for keys in grouped_keys.itervalues():
2584 txn_cost = self._RunInTxn(
2585 keys, keys[0].app(),
2586 lambda txn, key: txn.Delete(key, _FilterIndexesByKind(
2587 key, self.GetIndexes(key.app(), trusted, calling_app))))
2588 _UpdateCost(cost, txn_cost.entity_writes(), txn_cost.index_writes())
2590 def Touch(self, raw_keys, trusted=False, calling_app=None):
2591 """Applies all outstanding writes."""
2592 calling_app = datastore_types.ResolveAppId(calling_app)
2594 grouped_keys = collections.defaultdict(list)
2595 for key in raw_keys:
2596 CheckReference(trusted, calling_app, key)
2597 entity_group = _GetEntityGroup(key)
2598 entity_group_key = datastore_types.ReferenceToKeyValue(entity_group)
2599 grouped_keys[entity_group_key].append(key)
2601 for keys in grouped_keys.itervalues():
2602 self._RunInTxn(keys, keys[0].app(), lambda txn, key: None)
2604 def _RunInTxn(self, values, app, op):
2605 """Runs the given values in a separate Txn.
2607 Retries up to _RETRIES times on CONCURRENT_TRANSACTION errors.
2609 Args:
2610 values: A list of arguments to op.
2611 app: The app to create the Txn on.
2612 op: A function to run on each value in the Txn.
2614 Returns:
2615 The cost of the txn.
2616 """
2617 retries = 0
2618 backoff = _INITIAL_RETRY_DELAY_MS / 1000.0
2619 while True:
2620 try:
2621 txn = self._BeginTransaction(app, False)
2622 for value in values:
2623 op(txn, value)
2624 return txn.Commit()
2625 except apiproxy_errors.ApplicationError, e:
2626 if e.application_error == datastore_pb.Error.CONCURRENT_TRANSACTION:
2628 retries += 1
2629 if retries <= _RETRIES:
2630 time.sleep(backoff)
2631 backoff *= _RETRY_DELAY_MULTIPLIER
2632 if backoff * 1000.0 > _MAX_RETRY_DELAY_MS:
2633 backoff = _MAX_RETRY_DELAY_MS / 1000.0
2634 continue
2635 raise
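# The retry loop above is capped exponential backoff: successive retries
# sleep 100ms, 200ms, then 400ms, doubling each time but never exceeding
# _MAX_RETRY_DELAY_MS. With _RETRIES = 3 the 120s cap is never actually
# reached; a fourth CONCURRENT_TRANSACTION failure re-raises to the caller.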
2637 def _CheckHasIndex(self, query, trusted=False, calling_app=None):
2638 """Checks if the query can be satisfied given the existing indexes.
2640 Args:
2641 query: the datastore_pb.Query to check
2642 trusted: True if the calling app is trusted (like dev_admin_console)
2643 calling_app: app_id of the current running application
2644 """
2645 if query.kind() in self._pseudo_kinds or not self._require_indexes:
2646 return
2648 minimal_index = datastore_index.MinimalCompositeIndexForQuery(query,
2649 (datastore_index.ProtoToIndexDefinition(index)
2650 for index in self.GetIndexes(query.app(), trusted, calling_app)
2651 if index.state() == entity_pb.CompositeIndex.READ_WRITE))
2652 if minimal_index is not None:
2653 msg = ('This query requires a composite index that is not defined. '
2654 'You must update the index.yaml file in your application root.')
2655 is_most_efficient, kind, ancestor, properties = minimal_index
2656 if not is_most_efficient:
2658 yaml = datastore_index.IndexYamlForQuery(kind, ancestor,
2659 datastore_index.GetRecommendedIndexProperties(properties))
2660 msg += '\nThe following index is the minimum index required:\n' + yaml
2661 raise apiproxy_errors.ApplicationError(datastore_pb.Error.NEED_INDEX, msg)
2663 def SetAutoIdPolicy(self, auto_id_policy):
2664 """Set value of _auto_id_policy flag (default SEQUENTIAL).
2666 SEQUENTIAL auto ID assignment behavior will eventually be deprecated
2667 and the default will be SCATTERED.
2669 Args:
2670 auto_id_policy: string constant.
2671 Raises:
2672 TypeError: if auto_id_policy is not one of SEQUENTIAL or SCATTERED.
2673 """
2674 valid_policies = (SEQUENTIAL, SCATTERED)
2675 if auto_id_policy not in valid_policies:
2676 raise TypeError('auto_id_policy must be in %s, found %s instead' %
2677 (valid_policies, auto_id_policy))
2678 self._auto_id_policy = auto_id_policy
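# A minimal sketch (the name `datastore` is hypothetical and stands for any
# BaseDatastore subclass instance):
#
#   datastore.SetAutoIdPolicy(SCATTERED)   # ids drawn from the scattered space
#   datastore.SetAutoIdPolicy(SEQUENTIAL)  # back to monotonically increasing
#   datastore.SetAutoIdPolicy('bogus')     # raises TypeError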
2682 def Write(self):
2683 """Writes the datastore to disk."""
2684 self.Flush()
2686 def _GetQueryCursor(self, query, filters, orders, index_list,
2687 filter_predicate):
2688 """Runs the given datastore_pb.Query and returns a QueryCursor for it.
2690 This must be implemented by a sub-class. The sub-class does not need to
2691 enforce any consistency guarantees (and can just blindly read).
2693 Args:
2694 query: The datastore_pb.Query to run.
2695 filters: A list of filters that override the ones found on query.
2696 orders: A list of orders that override the ones found on query.
2697 index_list: A list of indexes used by the query.
2698 filter_predicate: an additional filter of type
2699 datastore_query.FilterPredicate. This is passed along to implement V4
2700 specific filters without changing the entire stub.
2702 Returns:
2703 A BaseCursor that can be used to fetch query results.
2704 """
2705 raise NotImplementedError
2707 def _Get(self, reference):
2708 """Get the entity for the given reference or None.
2710 This must be implemented by a sub-class. The sub-class does not need to
2711 enforce any consistency guarantees (and can just blindly read).
2713 Args:
2714 reference: An entity_pb.Reference to look up.
2716 Returns:
2717 The entity_pb.EntityProto associated with the given reference or None.
2718 """
2719 raise NotImplementedError
2721 def _AllocateSequentialIds(self, reference, size=1, max_id=None):
2722 """Allocate sequential ids for given reference.
2724 Args:
2725 reference: An entity_pb.Reference to allocate an id for.
2726 size: The size of the range to allocate
2727 max_id: The upper bound of the range to allocate
2729 Returns:
2730 A tuple containing (min, max) of the allocated range.
2731 """
2732 raise NotImplementedError
2734 def _AllocateIds(self, references):
2735 """Allocate or reserves IDs for the v4 datastore API.
2737 Incomplete keys are allocated scattered IDs. Complete keys have every id in
2738 their paths reserved in the appropriate ID space.
2740 Args:
2741 references: a list of entity_pb.Reference objects to allocate or reserve
2743 Returns:
2744 a list of complete entity_pb.Reference objects corresponding to the
2745 incomplete keys in the input, with newly allocated ids.
2746 """
2747 raise NotImplementedError
2750 def _NeedsIndexes(func):
2751 """A decorator for DatastoreStub methods that require or affect indexes.
2753 Updates indexes to match index.yaml before the call and updates index.yaml
2754 after the call if require_indexes is False. If root_path is not set, this is a
2755 no op.
2756 """
2758 def UpdateIndexesWrapper(self, *args, **kwargs):
2759 self._SetupIndexes()
2760 try:
2761 return func(self, *args, **kwargs)
2762 finally:
2763 self._UpdateIndexes()
2765 return UpdateIndexesWrapper
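# A sketch of the decorator in use (this is exactly how the stub methods
# below apply it): the wrapped body runs after _SetupIndexes(), and
# _UpdateIndexes() always runs afterwards, even if the body raises.
#
#   @_NeedsIndexes
#   def _Dynamic_RunQuery(self, query, query_result):
#     ...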
2768 class EntityGroupPseudoKind(object):
2769 """A common implementation of get() for the __entity_group__ pseudo-kind.
2771 Public properties:
2772 name: the pseudo-kind name
2773 """
2774 name = '__entity_group__'
2784 base_version = int(time.time() * 1e6)
2786 def Get(self, txn, key):
2787 """Fetch key of this pseudo-kind within txn.
2789 Args:
2790 txn: transaction within which Get occurs, may be None if this is an
2791 eventually consistent Get.
2792 key: key of pseudo-entity to Get.
2794 Returns:
2795 An entity for key, or None if it doesn't exist.
2796 """
2798 if not txn:
2799 txn = self._stub._BeginTransaction(key.app(), False)
2800 try:
2801 return self.Get(txn, key)
2802 finally:
2803 txn.Rollback()
2806 if isinstance(txn._txn_manager._consistency_policy,
2807 MasterSlaveConsistencyPolicy):
2808 return None
2815 path = key.path()
2816 if path.element_size() != 2 or path.element_list()[-1].id() != 1:
2817 return None
2819 tracker = txn._GetTracker(key)
2820 tracker._GrabSnapshot(txn._txn_manager)
2822 eg = entity_pb.EntityProto()
2823 eg.mutable_key().CopyFrom(key)
2824 eg.mutable_entity_group().CopyFrom(_GetEntityGroup(key).path())
2825 version = entity_pb.Property()
2826 version.set_name('__version__')
2827 version.set_multiple(False)
2828 version.mutable_value().set_int64value(
2829 tracker._read_pos + self.base_version)
2830 eg.property_list().append(version)
2831 return eg
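# A sketch of the key shape this pseudo-kind answers for, assuming a root
# entity Foo/1 (a hypothetical kind and id): the version entity lives at the
# two-element path Foo:1 -> __entity_group__:1 (the final id must be 1), and
# it carries a single __version__ int64 property derived from the group's
# commit log position plus a process-wide base.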
2833 def Query(self, query, filters, orders):
2834 """Perform a query on this pseudo-kind.
2836 Args:
2837 query: the original datastore_pb.Query.
2838 filters: the filters from query.
2839 orders: the orders from query.
2841 Returns:
2842 always raises an error
2843 """
2846 raise apiproxy_errors.ApplicationError(
2847 datastore_pb.Error.BAD_REQUEST, 'queries not supported on ' + self.name)
2850 class _CachedIndexDefinitions(object):
2851 """Records definitions read from index configuration files for later reuse.
2853 If the names and modification times of the configuration files are unchanged,
2854 then the index configurations previously parsed out of those files can be
2855 reused.
2857 Attributes:
2858 file_names: a list of the names of the configuration files. This will have
2859 one element when the configuration is based on an index.yaml but may have
2860 more than one if it is based on datastore-indexes.xml and
2861 datastore-indexes-auto.xml.
2862 last_modifieds: a list of floats that are the modification times of the
2863 files in file_names.
2864 index_protos: a list of entity_pb.CompositeIndex objects corresponding to
2865 the index definitions read from file_names.
2866 """
2868 def __init__(self, file_names, last_modifieds, index_protos):
2870 assert len(file_names) <= 1
2871 self.file_names = file_names
2872 self.last_modifieds = last_modifieds
2873 self.index_protos = index_protos
2876 class DatastoreStub(object):
2877 """A stub that maps datastore service calls on to a BaseDatastore.
2879 This class also keeps track of query cursors.
2880 """
2882 def __init__(self,
2883 datastore,
2884 app_id=None,
2885 trusted=None,
2886 root_path=None):
2887 super(DatastoreStub, self).__init__()
2888 self._datastore = datastore
2889 self._app_id = datastore_types.ResolveAppId(app_id)
2890 self._trusted = trusted
2891 self._root_path = root_path
2894 self.__query_history = {}
2897 self.__query_ci_history = set()
2901 self._cached_index_definitions = _CachedIndexDefinitions([], [], None)
2903 if self._require_indexes or root_path is None:
2905 self._index_yaml_updater = None
2906 else:
2908 self._index_yaml_updater = datastore_stub_index.IndexYamlUpdater(
2909 root_path)
2911 DatastoreStub.Clear(self)
2913 def Clear(self):
2914 """Clears out all stored values."""
2915 self._query_cursors = {}
2916 self.__query_history = {}
2917 self.__query_ci_history = set()
2919 def QueryHistory(self):
2920 """Returns a dict that maps Query PBs to times they've been run."""
2922 return dict((pb, times) for pb, times in self.__query_history.items()
2923 if pb.app() == self._app_id)
2925 def _QueryCompositeIndexHistoryLength(self):
2926 """Returns the length of the CompositeIndex set for query history."""
2927 return len(self.__query_ci_history)
2929 def SetTrusted(self, trusted):
2930 """Set/clear the trusted bit in the stub.
2932 This bit indicates that the app calling the stub is trusted. A
2933 trusted app can write to datastores of other apps.
2935 Args:
2936 trusted: boolean.
2937 """
2938 self._trusted = trusted
2942 def _Dynamic_Get(self, req, res):
2945 transaction = req.has_transaction() and req.transaction() or None
2948 if req.allow_deferred() and req.key_size() > _MAXIMUM_RESULTS:
2952 keys_to_get = req.key_list()[-_MAXIMUM_RESULTS:]
2953 deferred_keys = req.key_list()[:-_MAXIMUM_RESULTS]
2954 res.deferred_list().extend(deferred_keys)
2955 else:
2957 keys_to_get = req.key_list()
2959 res.set_in_order(not req.allow_deferred())
2961 total_response_bytes = 0
2962 for index, entity in enumerate(self._datastore.Get(keys_to_get,
2963 transaction,
2964 req.has_failover_ms(),
2965 self._trusted,
2966 self._app_id)):
2967 entity_size = entity and entity.ByteSize() or 0
2970 if (req.allow_deferred()
2971 and index > 0
2972 and total_response_bytes + entity_size > _MAXIMUM_QUERY_RESULT_BYTES):
2974 res.deferred_list().extend(keys_to_get[index:])
2975 break
2976 elif entity:
2977 entity_result = res.add_entity()
2978 entity_result.mutable_entity().CopyFrom(entity)
2979 total_response_bytes += entity_size
2980 else:
2982 entity_result = res.add_entity()
2983 entity_result.mutable_key().CopyFrom(keys_to_get[index])
2985 def _Dynamic_Put(self, req, res):
2986 transaction = req.has_transaction() and req.transaction() or None
2987 res.key_list().extend(self._datastore.Put(req.entity_list(),
2988 res.mutable_cost(),
2989 transaction,
2990 self._trusted, self._app_id))
2992 def _Dynamic_Delete(self, req, res):
2993 transaction = req.has_transaction() and req.transaction() or None
2994 self._datastore.Delete(req.key_list(), res.mutable_cost(), transaction,
2995 self._trusted, self._app_id)
2997 def _Dynamic_Touch(self, req, _):
2998 self._datastore.Touch(req.key_list(), self._trusted, self._app_id)
3000 @_NeedsIndexes
3001 def _Dynamic_RunQuery(self, query, query_result, filter_predicate=None):
3002 self.__UpgradeCursors(query)
3003 cursor = self._datastore.GetQueryCursor(query, self._trusted, self._app_id,
3004 filter_predicate)
3006 if query.has_count():
3007 count = query.count()
3008 elif query.has_limit():
3009 count = query.limit()
3010 else:
3011 count = self._BATCH_SIZE
3013 cursor.PopulateQueryResult(query_result, count, query.offset(),
3014 query.compile(), first_result=True)
3015 if query_result.has_cursor():
3016 self._query_cursors[query_result.cursor().cursor()] = cursor
3019 if query.compile():
3022 compiled_query = query_result.mutable_compiled_query()
3023 compiled_query.set_keys_only(query.keys_only())
3024 compiled_query.mutable_primaryscan().set_index_name(query.Encode())
3025 self.__UpdateQueryHistory(query)
3027 def __UpgradeCursors(self, query):
3028 """Upgrades compiled cursors in place.
3030 If the cursor position does not specify before_ascending, populate it.
3031 If before_ascending is already populated, use it and the sort direction
3032 from the query to set an appropriate value for start_inclusive.
3034 Args:
3035 query: datastore_pb.Query
3036 """
3037 first_sort_direction = None
3038 if query.order_list():
3039 first_sort_direction = query.order(0).direction()
3041 for compiled_cursor in [query.compiled_cursor(),
3042 query.end_compiled_cursor()]:
3043 self.__UpgradeCursor(compiled_cursor, first_sort_direction)
3045 def __UpgradeCursor(self, compiled_cursor, first_sort_direction):
3046 """Upgrades a compiled cursor in place.
3048 If the cursor position does not specify before_ascending, populate it.
3049 If before_ascending is already populated, use it and the provided direction
3050 to set an appropriate value for start_inclusive.
3052 Args:
3053 compiled_cursor: datastore_pb.CompiledCursor
3054 first_sort_direction: first sort direction from the query or None
3055 """
3058 if not self.__IsPlannable(compiled_cursor):
3059 return
3060 elif compiled_cursor.position().has_before_ascending():
3061 _SetStartInclusive(compiled_cursor.position(), first_sort_direction)
3062 elif compiled_cursor.position().has_start_inclusive():
3063 _SetBeforeAscending(compiled_cursor.position(), first_sort_direction)
3065 def __IsPlannable(self, compiled_cursor):
3066 """Returns True if compiled_cursor is plannable.
3068 Args:
3069 compiled_cursor: datastore_pb.CompiledCursor
3070 """
3071 position = compiled_cursor.position()
3072 return position.has_key() or position.indexvalue_list()
3074 def __UpdateQueryHistory(self, query):
3076 clone = datastore_pb.Query()
3077 clone.CopyFrom(query)
3078 clone.clear_hint()
3079 clone.clear_limit()
3080 clone.clear_offset()
3081 clone.clear_count()
3082 if clone in self.__query_history:
3083 self.__query_history[clone] += 1
3084 else:
3085 self.__query_history[clone] = 1
3086 if clone.app() == self._app_id:
3087 self.__query_ci_history.add(
3088 datastore_index.CompositeIndexForQuery(clone))
3090 def _Dynamic_Next(self, next_request, query_result):
3091 app = next_request.cursor().app()
3092 CheckAppId(self._trusted, self._app_id, app)
3094 cursor = self._query_cursors.get(next_request.cursor().cursor())
3095 Check(cursor and cursor.app == app,
3096 'Cursor %d not found' % next_request.cursor().cursor())
3098 count = self._BATCH_SIZE
3099 if next_request.has_count():
3100 count = next_request.count()
3102 cursor.PopulateQueryResult(query_result, count, next_request.offset(),
3103 next_request.compile(), first_result=False)
3105 if not query_result.has_cursor():
3106 del self._query_cursors[next_request.cursor().cursor()]
3108 def _Dynamic_AddActions(self, request, _):
3109 """Associates the creation of one or more tasks with a transaction.
3111 Args:
3112 request: A taskqueue_service_pb.TaskQueueBulkAddRequest containing the
3113 tasks that should be created when the transaction is committed.
3114 """
3119 if not request.add_request_list():
3120 return
3122 transaction = request.add_request_list()[0].transaction()
3123 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3124 new_actions = []
3125 for add_request in request.add_request_list():
3129 Check(add_request.transaction() == transaction,
3130 'Cannot add requests to different transactions')
3131 clone = taskqueue_service_pb.TaskQueueAddRequest()
3132 clone.CopyFrom(add_request)
3133 clone.clear_transaction()
3134 new_actions.append(clone)
3136 txn.AddActions(new_actions, self._MAX_ACTIONS_PER_TXN)
3138 def _Dynamic_BeginTransaction(self, req, transaction):
3139 CheckAppId(self._trusted, self._app_id, req.app())
3140 transaction.CopyFrom(self._datastore.BeginTransaction(
3141 req.app(), req.allow_multiple_eg()))
3143 def _Dynamic_Commit(self, transaction, res):
3144 CheckAppId(self._trusted, self._app_id, transaction.app())
3145 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3146 res.mutable_cost().CopyFrom(txn.Commit())
3148 def _Dynamic_Rollback(self, transaction, _):
3149 CheckAppId(self._trusted, self._app_id, transaction.app())
3150 txn = self._datastore.GetTxn(transaction, self._trusted, self._app_id)
3151 txn.Rollback()
3153 def _Dynamic_CreateIndex(self, index, id_response):
3154 id_response.set_value(self._datastore.CreateIndex(index,
3155 self._trusted,
3156 self._app_id))
3158 @_NeedsIndexes
3159 def _Dynamic_GetIndices(self, app_str, composite_indices):
3160 composite_indices.index_list().extend(self._datastore.GetIndexes(
3161 app_str.value(), self._trusted, self._app_id))
3163 def _Dynamic_UpdateIndex(self, index, _):
3164 self._datastore.UpdateIndex(index, self._trusted, self._app_id)
3166 def _Dynamic_DeleteIndex(self, index, _):
3167 self._datastore.DeleteIndex(index, self._trusted, self._app_id)
3169 def _Dynamic_AllocateIds(self, allocate_ids_request, allocate_ids_response):
3170 Check(not allocate_ids_request.has_model_key()
3171 or not allocate_ids_request.reserve_list(),
3172 'Cannot allocate and reserve IDs in the same request')
3173 if allocate_ids_request.reserve_list():
3174 Check(not allocate_ids_request.has_size(),
3175 'Cannot specify size when reserving IDs')
3176 Check(not allocate_ids_request.has_max(),
3177 'Cannot specify max when reserving IDs')
3179 if allocate_ids_request.has_model_key():
3180 CheckAppId(self._trusted, self._app_id,
3181 allocate_ids_request.model_key().app())
3183 reference = allocate_ids_request.model_key()
3185 (start, end) = self._datastore._AllocateSequentialIds(
3186 reference, allocate_ids_request.size(), allocate_ids_request.max())
3188 allocate_ids_response.set_start(start)
3189 allocate_ids_response.set_end(end)
3190 else:
3191 for reference in allocate_ids_request.reserve_list():
3192 CheckAppId(self._trusted, self._app_id, reference.app())
3193 self._datastore._AllocateIds(allocate_ids_request.reserve_list())
3194 allocate_ids_response.set_start(0)
3195 allocate_ids_response.set_end(0)
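# Note: a single AllocateIds request is either an allocation (model_key set,
# optionally with size or max; a fresh [start, end] id range is returned) or
# a reservation (reserve_list set; every id in the given complete keys is
# marked as used and the response range is the empty [0, 0]), never both.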
3197 def _SetupIndexes(self, _open=open):
3198 """Ensure that the set of existing composite indexes matches index.yaml.
3200 Note: this is similar to the algorithm used by the admin console for
3201 the same purpose.
3202 """
3206 if not self._root_path:
3207 return
3208 file_names = [os.path.join(self._root_path, 'index.yaml')]
3209 file_mtimes = [os.path.getmtime(f) for f in file_names if os.path.exists(f)]
3210 if (self._cached_index_definitions.file_names == file_names and
3211 all(os.path.exists(f) for f in file_names) and
3212 self._cached_index_definitions.last_modifieds == file_mtimes):
3213 requested_indexes = self._cached_index_definitions.index_protos
3214 else:
3215 file_mtimes = []
3216 index_texts = []
3217 for file_name in file_names:
3218 try:
3219 file_mtimes.append(os.path.getmtime(file_name))
3220 with _open(file_name, 'r') as fh:
3221 index_texts.append(fh.read())
3222 except (OSError, IOError):
3223 pass
3225 requested_indexes = []
3226 if len(index_texts) == len(file_names):
3227 all_ok = True
3228 for index_text in index_texts:
3230 index_defs = datastore_index.ParseIndexDefinitions(index_text)
3231 if index_defs is None or index_defs.indexes is None:
3232 all_ok = False
3233 else:
3235 requested_indexes.extend(
3236 datastore_index.IndexDefinitionsToProtos(
3237 self._app_id, index_defs.indexes))
3238 if all_ok:
3239 self._cached_index_definitions = _CachedIndexDefinitions(
3240 file_names, file_mtimes, requested_indexes)
3243 existing_indexes = self._datastore.GetIndexes(
3244 self._app_id, self._trusted, self._app_id)
3247 requested = dict((x.definition().Encode(), x) for x in requested_indexes)
3248 existing = dict((x.definition().Encode(), x) for x in existing_indexes)
3251 created = 0
3252 for key, index in requested.iteritems():
3253 if key not in existing:
3254 new_index = entity_pb.CompositeIndex()
3255 new_index.CopyFrom(index)
3256 new_index.set_id(datastore_admin.CreateIndex(new_index))
3257 new_index.set_state(entity_pb.CompositeIndex.READ_WRITE)
3258 datastore_admin.UpdateIndex(new_index)
3259 created += 1
3262 deleted = 0
3263 for key, index in existing.iteritems():
3264 if key not in requested:
3265 datastore_admin.DeleteIndex(index)
3266 deleted += 1
3269 if created or deleted:
3270 logging.debug('Created %d and deleted %d index(es); total %d',
3271 created, deleted, len(requested))
3273 def _UpdateIndexes(self):
3274 if self._index_yaml_updater is not None:
3275 self._index_yaml_updater.UpdateIndexYaml()
3278 class StubQueryConverter(object):
3279 """Converter for v3 and v4 queries suitable for use in stubs."""
3281 def __init__(self, entity_converter):
3282 self._entity_converter = entity_converter
3284 def v4_to_v3_compiled_cursor(self, v4_cursor, v3_compiled_cursor):
3285 """Converts a v4 cursor string to a v3 CompiledCursor.
3287 Args:
3288 v4_cursor: a string representing a v4 query cursor
3289 v3_compiled_cursor: a datastore_pb.CompiledCursor to populate
3290 """
3291 v3_compiled_cursor.Clear()
3292 try:
3293 v3_compiled_cursor.ParseFromString(v4_cursor)
3294 except ProtocolBuffer.ProtocolBufferDecodeError:
3295 raise datastore_pbs.InvalidConversionError('Invalid query cursor.')
3297 def v3_to_v4_compiled_cursor(self, v3_compiled_cursor):
3298 """Converts a v3 CompiledCursor to a v4 cursor string.
3300 Args:
3301 v3_compiled_cursor: a datastore_pb.CompiledCursor
3303 Returns:
3304 a string representing a v4 query cursor
3305 """
3306 return v3_compiled_cursor.SerializeToString()
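# A minimal round-trip sketch (assumes `conv` is a StubQueryConverter and
# `v3_cursor` a populated datastore_pb.CompiledCursor; both names are
# hypothetical):
#
#   v4_cursor = conv.v3_to_v4_compiled_cursor(v3_cursor)  # serialized bytes
#   restored = datastore_pb.CompiledCursor()
#   conv.v4_to_v3_compiled_cursor(v4_cursor, restored)    # parses them back
#   assert restored.Equals(v3_cursor)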
3308 def v4_to_v3_query(self, v4_partition_id, v4_query, v3_query):
3309 """Converts a v4 Query to a v3 Query.
3311 Args:
3312 v4_partition_id: a datastore_v4_pb.PartitionId
3313 v4_query: a datastore_v4_pb.Query
3314 v3_query: a datastore_pb.Query to populate
3316 Raises:
3317 InvalidConversionError if the query cannot be converted
3318 """
3319 v3_query.Clear()
3321 if v4_partition_id.dataset_id():
3322 v3_query.set_app(v4_partition_id.dataset_id())
3323 if v4_partition_id.has_namespace():
3324 v3_query.set_name_space(v4_partition_id.namespace())
3326 v3_query.set_persist_offset(True)
3327 v3_query.set_require_perfect_plan(True)
3328 v3_query.set_compile(True)
3331 if v4_query.has_limit():
3332 v3_query.set_limit(v4_query.limit())
3333 if v4_query.offset():
3334 v3_query.set_offset(v4_query.offset())
3335 if v4_query.has_start_cursor():
3336 self.v4_to_v3_compiled_cursor(v4_query.start_cursor(),
3337 v3_query.mutable_compiled_cursor())
3338 if v4_query.has_end_cursor():
3339 self.v4_to_v3_compiled_cursor(v4_query.end_cursor(),
3340 v3_query.mutable_end_compiled_cursor())
3343 if v4_query.kind_list():
3344 datastore_pbs.check_conversion(len(v4_query.kind_list()) == 1,
3345 'multiple kinds not supported')
3346 v3_query.set_kind(v4_query.kind(0).name())
3349 has_key_projection = False
3350 for prop in v4_query.projection_list():
3351 if prop.property().name() == datastore_pbs.PROPERTY_NAME_KEY:
3352 has_key_projection = True
3353 else:
3354 v3_query.add_property_name(prop.property().name())
3355 if has_key_projection and not v3_query.property_name_list():
3356 v3_query.set_keys_only(True)
3359 for prop in v4_query.group_by_list():
3360 v3_query.add_group_by_property_name(prop.name())
3363 self.__populate_v3_filters(v4_query.filter(), v3_query)
3366 for v4_order in v4_query.order_list():
3367 v3_order = v3_query.add_order()
3368 v3_order.set_property(v4_order.property().name())
3369 if v4_order.has_direction():
3370 v3_order.set_direction(v4_order.direction())
3372 def v3_to_v4_query(self, v3_query, v4_query):
3373 """Converts a v3 Query to a v4 Query.
3375 Args:
3376 v3_query: a datastore_pb.Query
3377 v4_query: a datastore_v4_pb.Query to populate
3379 Raises:
3380 InvalidConversionError if the query cannot be converted
3381 """
3382 v4_query.Clear()
3384 datastore_pbs.check_conversion(not v3_query.has_distinct(),
3385 'distinct option not supported')
3386 datastore_pbs.check_conversion(v3_query.require_perfect_plan(),
3387 'non-perfect plans not supported')
3391 if v3_query.has_limit():
3392 v4_query.set_limit(v3_query.limit())
3393 if v3_query.offset():
3394 v4_query.set_offset(v3_query.offset())
3395 if v3_query.has_compiled_cursor():
3396 v4_query.set_start_cursor(
3397 self.v3_to_v4_compiled_cursor(v3_query.compiled_cursor()))
3398 if v3_query.has_end_compiled_cursor():
3399 v4_query.set_end_cursor(
3400 self.v3_to_v4_compiled_cursor(v3_query.end_compiled_cursor()))
3403 if v3_query.has_kind():
3404 v4_query.add_kind().set_name(v3_query.kind())
3407 for name in v3_query.property_name_list():
3408 v4_query.add_projection().mutable_property().set_name(name)
3409 if v3_query.keys_only():
3410 v4_query.add_projection().mutable_property().set_name(
3411 datastore_pbs.PROPERTY_NAME_KEY)
3414 for name in v3_query.group_by_property_name_list():
3415 v4_query.add_group_by().set_name(name)
3418 num_v4_filters = len(v3_query.filter_list())
3419 if v3_query.has_ancestor():
3420 num_v4_filters += 1
3422 if num_v4_filters == 1:
3423 get_property_filter = self.__get_property_filter
3424 elif num_v4_filters > 1:
3425 v4_query.mutable_filter().mutable_composite_filter().set_operator(
3426 datastore_v4_pb.CompositeFilter.AND)
3427 get_property_filter = self.__add_property_filter
3429 if v3_query.has_ancestor():
3430 self.__v3_query_to_v4_ancestor_filter(v3_query,
3431 get_property_filter(v4_query))
3432 for v3_filter in v3_query.filter_list():
3433 self.__v3_filter_to_v4_property_filter(v3_filter,
3434 get_property_filter(v4_query))
3437 for v3_order in v3_query.order_list():
3438 v4_order = v4_query.add_order()
3439 v4_order.mutable_property().set_name(v3_order.property())
3440 if v3_order.has_direction():
3441 v4_order.set_direction(v3_order.direction())
3443 def __get_property_filter(self, v4_query):
3444 """Returns the PropertyFilter from the query's top-level filter."""
3445 return v4_query.mutable_filter().mutable_property_filter()
3447 def __add_property_filter(self, v4_query):
3448 """Adds and returns a PropertyFilter from the query's composite filter."""
3449 v4_comp_filter = v4_query.mutable_filter().mutable_composite_filter()
3450 return v4_comp_filter.add_filter().mutable_property_filter()
3452 def __populate_v3_filters(self, v4_filter, v3_query):
3453 """Populates a filters for a v3 Query.
3455 Args:
3456 v4_filter: a datastore_v4_pb.Filter
3457 v3_query: a datastore_pb.Query to populate with filters
3458 """
3460 datastore_pbs.check_conversion(not v4_filter.has_bounding_circle_filter(),
3461 'bounding circle filter not supported')
3462 datastore_pbs.check_conversion(not v4_filter.has_bounding_box_filter(),
3463 'bounding box filter not supported')
3465 if v4_filter.has_property_filter():
3466 v4_property_filter = v4_filter.property_filter()
3467 if (v4_property_filter.operator()
3468 == datastore_v4_pb.PropertyFilter.HAS_ANCESTOR):
3469 datastore_pbs.check_conversion(
3470 v4_property_filter.value().has_key_value(),
3471 'HAS_ANCESTOR requires a reference value')
3472 datastore_pbs.check_conversion((v4_property_filter.property().name()
3473 == datastore_pbs.PROPERTY_NAME_KEY),
3474 'unsupported property')
3475 datastore_pbs.check_conversion(not v3_query.has_ancestor(),
3476 'duplicate ancestor constraint')
3477 self._entity_converter.v4_to_v3_reference(
3478 v4_property_filter.value().key_value(),
3479 v3_query.mutable_ancestor())
3480 else:
3481 v3_filter = v3_query.add_filter()
3482 property_name = v4_property_filter.property().name()
3483 v3_filter.set_op(v4_property_filter.operator())
3484 datastore_pbs.check_conversion(
3485 not v4_property_filter.value().list_value_list(),
3486 ('unsupported value type, %s, in property filter'
3487 ' on "%s"' % ('list_value', property_name)))
3488 prop = v3_filter.add_property()
3489 prop.set_multiple(False)
3490 prop.set_name(property_name)
3491 self._entity_converter.v4_value_to_v3_property_value(
3492 v4_property_filter.value(), prop.mutable_value())
3493 elif v4_filter.has_composite_filter():
3494 datastore_pbs.check_conversion((v4_filter.composite_filter().operator()
3495 == datastore_v4_pb.CompositeFilter.AND),
3496 'unsupported composite property operator')
3497 for v4_sub_filter in v4_filter.composite_filter().filter_list():
3498 self.__populate_v3_filters(v4_sub_filter, v3_query)
3500 def __v3_filter_to_v4_property_filter(self, v3_filter, v4_property_filter):
3501 """Converts a v3 Filter to a v4 PropertyFilter.
3503 Args:
3504 v3_filter: a datastore_pb.Filter
3505 v4_property_filter: a datastore_v4_pb.PropertyFilter to populate
3507 Raises:
3508 InvalidConversionError if the filter cannot be converted
3509 """
3510 datastore_pbs.check_conversion(v3_filter.property_size() == 1,
3511 'invalid filter')
3512 datastore_pbs.check_conversion(v3_filter.op() <= 5,
3513 'unsupported filter op: %d' % v3_filter.op())
3514 v4_property_filter.Clear()
3515 v4_property_filter.set_operator(v3_filter.op())
3516 v4_property_filter.mutable_property().set_name(v3_filter.property(0).name())
3517 self._entity_converter.v3_property_to_v4_value(
3518 v3_filter.property(0), True, v4_property_filter.mutable_value())
3520 def __v3_query_to_v4_ancestor_filter(self, v3_query, v4_property_filter):
3521 """Converts a v3 Query to a v4 ancestor PropertyFilter.
3523 Args:
3524 v3_query: a datastore_pb.Query
3525 v4_property_filter: a datastore_v4_pb.PropertyFilter to populate
3526 """
3527 v4_property_filter.Clear()
3528 v4_property_filter.set_operator(
3529 datastore_v4_pb.PropertyFilter.HAS_ANCESTOR)
3530 prop = v4_property_filter.mutable_property()
3531 prop.set_name(datastore_pbs.PROPERTY_NAME_KEY)
3532 self._entity_converter.v3_to_v4_key(
3533 v3_query.ancestor(),
3534 v4_property_filter.mutable_value().mutable_key_value())
3538 __query_converter = StubQueryConverter(datastore_pbs.get_entity_converter())
3541 def get_query_converter():
3542 """Returns a converter for v3 and v4 queries (not suitable for production).
3544 This converter is suitable for use in stubs but not for production.
3546 Returns:
3547 a StubQueryConverter
3548 """
3549 return __query_converter
3552 class StubServiceConverter(object):
3553 """Converter for v3/v4 request/response protos suitable for use in stubs."""
3555 def __init__(self, entity_converter, query_converter):
3556 self._entity_converter = entity_converter
3557 self._query_converter = query_converter
3559 def v4_to_v3_cursor(self, v4_query_handle, v3_cursor):
3560 """Converts a v4 cursor string to a v3 Cursor.
3562 Args:
3563 v4_query_handle: a string representing a v4 query handle
3564 v3_cursor: a datastore_pb.Cursor to populate
3565 """
3566 try:
3567 v3_cursor.ParseFromString(v4_query_handle)
3568 except ProtocolBuffer.ProtocolBufferDecodeError:
3569 raise datastore_pbs.InvalidConversionError('Invalid query handle.')
3570 return v3_cursor
3572 def _v3_to_v4_query_handle(self, v3_cursor):
3573 """Converts a v3 Cursor to a v4 query handle string.
3575 Args:
3576 v3_cursor: a datastore_pb.Cursor
3578 Returns:
3579 a string representing a v4 query handle
3580 """
3581 return v3_cursor.SerializeToString()
3583 def v4_to_v3_txn(self, v4_txn, v3_txn):
3584 """Converts a v4 transaction string to a v3 Transaction.
3586 Args:
3587 v4_txn: a string representing a v4 transaction
3588 v3_txn: a datastore_pb.Transaction to populate
3589 """
3590 try:
3591 v3_txn.ParseFromString(v4_txn)
3592 except ProtocolBuffer.ProtocolBufferDecodeError:
3593 raise datastore_pbs.InvalidConversionError('Invalid transaction.')
3594 return v3_txn
3596 def _v3_to_v4_txn(self, v3_txn):
3597 """Converts a v3 Transaction to a v4 transaction string.
3599 Args:
3600 v3_txn: a datastore_pb.Transaction
3602 Returns:
3603 a string representing a v4 transaction
3604 """
3605 return v3_txn.SerializeToString()
3610 def v4_to_v3_begin_transaction_req(self, app_id, v4_req):
3611 """Converts a v4 BeginTransactionRequest to a v3 BeginTransactionRequest.
3613 Args:
3614 app_id: app id
3615 v4_req: a datastore_v4_pb.BeginTransactionRequest
3617 Returns:
3618 a datastore_pb.BeginTransactionRequest
3619 """
3620 v3_req = datastore_pb.BeginTransactionRequest()
3621 v3_req.set_app(app_id)
3622 v3_req.set_allow_multiple_eg(v4_req.cross_group())
3623 return v3_req
3625 def v3_to_v4_begin_transaction_resp(self, v3_resp):
3626 """Converts a v3 Transaction to a v4 BeginTransactionResponse.
3628 Args:
3629 v3_resp: a datastore_pb.Transaction
3631 Returns:
3632 a datastore_v4_pb.BeginTransactionResponse
3633 """
3634 v4_resp = datastore_v4_pb.BeginTransactionResponse()
3635 v4_resp.set_transaction(self._v3_to_v4_txn(v3_resp))
3636 return v4_resp
3641 def v4_rollback_req_to_v3_txn(self, v4_req):
3642 """Converts a v4 RollbackRequest to a v3 Transaction.
3644 Args:
3645 v4_req: a datastore_v4_pb.RollbackRequest
3647 Returns:
3648 a datastore_pb.Transaction
3649 """
3650 v3_txn = datastore_pb.Transaction()
3651 self.v4_to_v3_txn(v4_req.transaction(), v3_txn)
3652 return v3_txn
3657 def v4_commit_req_to_v3_txn(self, v4_req):
3658 """Converts a v4 CommitRequest to a v3 Transaction.
3660 Args:
3661 v4_req: a datastore_v4_pb.CommitRequest
3663 Returns:
3664 a datastore_pb.Transaction
3665 """
3666 v3_txn = datastore_pb.Transaction()
3667 self.v4_to_v3_txn(v4_req.transaction(), v3_txn)
3668 return v3_txn
3673 def v4_run_query_req_to_v3_query(self, v4_req):
3674 """Converts a v4 RunQueryRequest to a v3 Query.
3676 GQL is not supported.
3678 Args:
3679 v4_req: a datastore_v4_pb.RunQueryRequest
3681 Returns:
3682 a datastore_pb.Query
3683 """
3685 datastore_pbs.check_conversion(not v4_req.has_gql_query(),
3686 'GQL not supported')
3687 v3_query = datastore_pb.Query()
3688 self._query_converter.v4_to_v3_query(v4_req.partition_id(), v4_req.query(),
3689 v3_query)
3692 if v4_req.has_suggested_batch_size():
3693 v3_query.set_count(v4_req.suggested_batch_size())
3696 read_options = v4_req.read_options()
3697 if read_options.has_transaction():
3698 self.v4_to_v3_txn(read_options.transaction(),
3699 v3_query.mutable_transaction())
3700 elif (read_options.read_consistency()
3701 == datastore_v4_pb.ReadOptions.EVENTUAL):
3702 v3_query.set_strong(False)
3703 v3_query.set_failover_ms(-1)
3704 elif read_options.read_consistency() == datastore_v4_pb.ReadOptions.STRONG:
3705 v3_query.set_strong(True)
3707 if v4_req.has_min_safe_time_seconds():
3708 v3_query.set_min_safe_time_seconds(v4_req.min_safe_time_seconds())
3710 return v3_query
3712 def v3_to_v4_run_query_req(self, v3_req):
3713 """Converts a v3 Query to a v4 RunQueryRequest.
3715 Args:
3716 v3_req: a datastore_pb.Query
3718 Returns:
3719 a datastore_v4_pb.RunQueryRequest
3720 """
3721 v4_req = datastore_v4_pb.RunQueryRequest()
3724 v4_partition_id = v4_req.mutable_partition_id()
3725 v4_partition_id.set_dataset_id(v3_req.app())
3726 if v3_req.name_space():
3727 v4_partition_id.set_namespace(v3_req.name_space())
3730 if v3_req.has_count():
3731 v4_req.set_suggested_batch_size(v3_req.count())
3733 datastore_pbs.check_conversion(
3734 not (v3_req.has_transaction() and v3_req.has_failover_ms()),
3735 'Cannot set failover and transaction handle.')
3738 if v3_req.has_transaction():
3739 v4_req.mutable_read_options().set_transaction(
3740 self._v3_to_v4_txn(v3_req.transaction()))
3741 elif v3_req.strong():
3742 v4_req.mutable_read_options().set_read_consistency(
3743 datastore_v4_pb.ReadOptions.STRONG)
3744 elif v3_req.has_failover_ms():
3745 v4_req.mutable_read_options().set_read_consistency(
3746 datastore_v4_pb.ReadOptions.EVENTUAL)
3747 if v3_req.has_min_safe_time_seconds():
3748 v4_req.set_min_safe_time_seconds(v3_req.min_safe_time_seconds())
3750 self._query_converter.v3_to_v4_query(v3_req, v4_req.mutable_query())
3752 return v4_req
3754 def v4_run_query_resp_to_v3_query_result(self, v4_resp):
3755 """Converts a V4 RunQueryResponse to a v3 QueryResult.
3757 Args:
3758 v4_resp: a datastore_v4_pb.RunQueryResponse
3760 Returns:
3761 a datastore_pb.QueryResult
3762 """
3763 v3_resp = self.v4_to_v3_query_result(v4_resp.batch())
3766 if v4_resp.has_query_handle():
3767 self.v4_to_v3_cursor(v4_resp.query_handle(), v3_resp.mutable_cursor())
3769 return v3_resp
3771 def v3_to_v4_run_query_resp(self, v3_resp):
3772 """Converts a v3 QueryResult to a V4 RunQueryResponse.
3774 Args:
3775 v3_resp: a datastore_pb.QueryResult
3777 Returns:
3778 a datastore_v4_pb.RunQueryResponse
3779 """
3780 v4_resp = datastore_v4_pb.RunQueryResponse()
3781 self.v3_to_v4_query_result_batch(v3_resp, v4_resp.mutable_batch())
3783 if v3_resp.has_cursor():
3784 v4_resp.set_query_handle(
3785 self._query_converter.v3_to_v4_compiled_cursor(v3_resp.cursor()))
3787 return v4_resp
3792 def v4_to_v3_next_req(self, v4_req):
3793 """Converts a v4 ContinueQueryRequest to a v3 NextRequest.
3795 Args:
3796 v4_req: a datastore_v4_pb.ContinueQueryRequest
3798 Returns:
3799 a datastore_pb.NextRequest
3800 """
3801 v3_req = datastore_pb.NextRequest()
3802 v3_req.set_compile(True)
3803 self.v4_to_v3_cursor(v4_req.query_handle(), v3_req.mutable_cursor())
3804 return v3_req
3806 def v3_to_v4_continue_query_resp(self, v3_resp):
3807 """Converts a v3 QueryResult to a v4 ContinueQueryResponse.
3809 Args:
3810 v3_resp: a datastore_pb.QueryResult
3812 Returns:
3813 a datastore_v4_pb.ContinueQueryResponse
3814 """
3815 v4_resp = datastore_v4_pb.ContinueQueryResponse()
3816 self.v3_to_v4_query_result_batch(v3_resp, v4_resp.mutable_batch())
3817 return v4_resp
3822 def v4_to_v3_get_req(self, v4_req):
3823 """Converts a v4 LookupRequest to a v3 GetRequest.
3825 Args:
3826 v4_req: a datastore_v4_pb.LookupRequest
3828 Returns:
3829 a datastore_pb.GetRequest
3830 """
3831 v3_req = datastore_pb.GetRequest()
3832 v3_req.set_allow_deferred(True)
3835 if v4_req.read_options().has_transaction():
3836 self.v4_to_v3_txn(v4_req.read_options().transaction(),
3837 v3_req.mutable_transaction())
3838 elif (v4_req.read_options().read_consistency()
3839 == datastore_v4_pb.ReadOptions.EVENTUAL):
3840 v3_req.set_strong(False)
3841 v3_req.set_failover_ms(-1)
3842 elif (v4_req.read_options().read_consistency()
3843 == datastore_v4_pb.ReadOptions.STRONG):
3844 v3_req.set_strong(True)
3846 for v4_key in v4_req.key_list():
3847 self._entity_converter.v4_to_v3_reference(v4_key, v3_req.add_key())
3849 return v3_req
3851 def v3_to_v4_lookup_resp(self, v3_resp):
3852 """Converts a v3 GetResponse to a v4 LookupResponse.
3854 Args:
3855 v3_resp: a datastore_pb.GetResponse
3857 Returns:
3858 a datastore_v4_pb.LookupResponse
3859 """
3860 v4_resp = datastore_v4_pb.LookupResponse()
3862 for v3_ref in v3_resp.deferred_list():
3863 self._entity_converter.v3_to_v4_key(v3_ref, v4_resp.add_deferred())
3864 for v3_entity in v3_resp.entity_list():
3865 if v3_entity.has_entity():
3866 self._entity_converter.v3_to_v4_entity(
3867 v3_entity.entity(),
3868 v4_resp.add_found().mutable_entity())
3869 if v3_entity.has_key():
3870 self._entity_converter.v3_to_v4_key(
3871 v3_entity.key(),
3872 v4_resp.add_missing().mutable_entity().mutable_key())
3874 return v4_resp
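# Note the dispatch above: a v3 GetResponse entry carrying a full entity
# becomes a v4 "found" result, while an entry carrying only a key (the
# entity does not exist) becomes a v4 "missing" result; deferred keys are
# copied through unchanged.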
3876 def v4_to_v3_query_result(self, v4_batch):
3877 """Converts a v4 QueryResultBatch to a v3 QueryResult.
3879 Args:
3880 v4_batch: a datastore_v4_pb.QueryResultBatch
3882 Returns:
3883 a datastore_pb.QueryResult
3884 """
3885 v3_result = datastore_pb.QueryResult()
3888 v3_result.set_more_results(
3889 (v4_batch.more_results()
3890 == datastore_v4_pb.QueryResultBatch.NOT_FINISHED))
3891 if v4_batch.has_end_cursor():
3892 self._query_converter.v4_to_v3_compiled_cursor(
3893 v4_batch.end_cursor(), v3_result.mutable_compiled_cursor())
3894 if v4_batch.has_skipped_cursor():
3895 self._query_converter.v4_to_v3_compiled_cursor(
3896 v4_batch.skipped_cursor(),
3897 v3_result.mutable_skipped_results_compiled_cursor())
3900 if v4_batch.entity_result_type() == datastore_v4_pb.EntityResult.PROJECTION:
3901 v3_result.set_index_only(True)
3902 elif v4_batch.entity_result_type() == datastore_v4_pb.EntityResult.KEY_ONLY:
3903 v3_result.set_keys_only(True)
3906 if v4_batch.has_skipped_results():
3907 v3_result.set_skipped_results(v4_batch.skipped_results())
3908 for v4_entity in v4_batch.entity_result_list():
3909 v3_entity = v3_result.add_result()
3910 self._entity_converter.v4_to_v3_entity(v4_entity.entity(), v3_entity)
3911 if v4_entity.has_cursor():
3912 cursor = v3_result.add_result_compiled_cursor()
3913 self._query_converter.v4_to_v3_compiled_cursor(v4_entity.cursor(),
3914 cursor)
3915 if v4_batch.entity_result_type() != datastore_v4_pb.EntityResult.FULL:
3918 v3_entity.clear_entity_group()
3920 return v3_result
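# Note that v3 QueryResult.more_results is a bool, so the v4 enum is
# collapsed above: NOT_FINISHED maps to True and every other
# MoreResultsType maps to False.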
3922 def v3_to_v4_query_result_batch(self, v3_result, v4_batch):
3923 """Converts a v3 QueryResult to a v4 QueryResultBatch.
3925 Args:
3926 v3_result: a datastore_pb.QueryResult
3927 v4_batch: a datastore_v4_pb.QueryResultBatch to populate
3928 """
3929 v4_batch.Clear()
3932 if v3_result.more_results():
3933 v4_batch.set_more_results(datastore_v4_pb.QueryResultBatch.NOT_FINISHED)
3934 else:
3935 v4_batch.set_more_results(
3936 datastore_v4_pb.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT)
3937 if v3_result.has_compiled_cursor():
3938 v4_batch.set_end_cursor(
3939 self._query_converter.v3_to_v4_compiled_cursor(
3940 v3_result.compiled_cursor()))
3941 if v3_result.has_skipped_results_compiled_cursor():
3942 v4_batch.set_skipped_cursor(
3943 self._query_converter.v3_to_v4_compiled_cursor(
3944 v3_result.skipped_results_compiled_cursor()))
3947 if v3_result.keys_only():
3948 v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.KEY_ONLY)
3949 elif v3_result.index_only():
3950 v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.PROJECTION)
3951 else:
3952 v4_batch.set_entity_result_type(datastore_v4_pb.EntityResult.FULL)
3955 if v3_result.has_skipped_results():
3956 v4_batch.set_skipped_results(v3_result.skipped_results())
3957 for v3_entity, v3_cursor in itertools.izip_longest(
3958 v3_result.result_list(),
3959 v3_result.result_compiled_cursor_list()):
3960 v4_entity_result = datastore_v4_pb.EntityResult()
3961 self._entity_converter.v3_to_v4_entity(v3_entity,
3962 v4_entity_result.mutable_entity())
3963 if v3_cursor is not None:
3964 v4_entity_result.set_cursor(
3965 self._query_converter.v3_to_v4_compiled_cursor(v3_cursor))
3966 v4_batch.entity_result_list().append(v4_entity_result)
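# The inverse direction is lossy in the same place: a v3 more_results=True
# always becomes NOT_FINISHED, and False always becomes
# MORE_RESULTS_AFTER_LIMIT. The izip_longest pairing above relies on the
# per-result compiled cursor list being no longer than the result list;
# missing cursors pad out as None and are skipped.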
3970 __service_converter = StubServiceConverter(
3971 datastore_pbs.get_entity_converter(), __query_converter)
3974 def get_service_converter():
3975 """Returns a converter for v3 and v4 service request/response protos.
3977 This converter is suitable for use in stubs but not for production.
3979 Returns:
3980 a StubServiceConverter
3981 """
3982 return __service_converter
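# Minimal usage sketch (v4_lookup_req and v3_get_resp are hypothetical
# request/response variables):
#
#   conv = get_service_converter()
#   v3_get_req = conv.v4_to_v3_get_req(v4_lookup_req)
#   ...  # serve the v3 GetRequest with the stub
#   v4_lookup_resp = conv.v3_to_v4_lookup_resp(v3_get_resp)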
3985 def ReverseBitsInt64(v):
3986 """Reverse the bits of a 64-bit integer.
3988 Args:
3989 v: Input integer of type 'int' or 'long'.
3991 Returns:
3992 Bit-reversed input as 'int' on 64-bit machines or as 'long' otherwise.
3993 """
3995 v = ((v >> 1) & 0x5555555555555555) | ((v & 0x5555555555555555) << 1)
3996 v = ((v >> 2) & 0x3333333333333333) | ((v & 0x3333333333333333) << 2)
3997 v = ((v >> 4) & 0x0F0F0F0F0F0F0F0F) | ((v & 0x0F0F0F0F0F0F0F0F) << 4)
3998 v = ((v >> 8) & 0x00FF00FF00FF00FF) | ((v & 0x00FF00FF00FF00FF) << 8)
3999 v = ((v >> 16) & 0x0000FFFF0000FFFF) | ((v & 0x0000FFFF0000FFFF) << 16)
4000 v = int((v >> 32) | (v << 32) & 0xFFFFFFFFFFFFFFFF)
4001 return v
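# Example: the cascade of mask-and-shift swaps above exchanges adjacent
# bits, then 2-bit pairs, nibbles, bytes, and 16- and 32-bit halves,
# reversing all 64 bits, so the function is its own inverse:
#
#   >>> ReverseBitsInt64(1) == 1 << 63
#   True
#   >>> ReverseBitsInt64(ReverseBitsInt64(12345))
#   12345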
4004 def ToScatteredId(v):
4005 """Map counter value v to the scattered ID space.
4007 Translate to scattered ID space, then reverse bits.
4009 Args:
4010 v: Counter value from which to produce ID.
4012 Returns:
4013 Integer ID.
4015 Raises:
4016 datastore_errors.BadArgumentError if counter value exceeds the range of
4017 the scattered ID space.
4018 """
4019 if v >= _MAX_SCATTERED_COUNTER:
4020 raise datastore_errors.BadArgumentError('counter value too large (%d)' % v)
4021 return _MAX_SEQUENTIAL_ID + 1 + long(ReverseBitsInt64(v << _SCATTER_SHIFT))
4024 def IdToCounter(k):
4025 """Map ID k to the counter value from which it was generated.
4027 Determine whether k is sequential or scattered ID.
4029 Args:
4030 k: ID from which to infer counter value.
4032 Returns:
4033 Tuple of integers (counter_value, id_space).
4034 """
4035 if k > _MAX_SCATTERED_ID:
4036 return 0, SCATTERED
4037 elif k > _MAX_SEQUENTIAL_ID and k <= _MAX_SCATTERED_ID:
4038 return long(ReverseBitsInt64(k) >> _SCATTER_SHIFT), SCATTERED
4039 elif k > 0:
4040 return long(k), SEQUENTIAL
4041 else:
4042 raise datastore_errors.BadArgumentError('invalid id (%d)' % k)
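# Worked example (values follow from _MAX_SEQUENTIAL_BIT = 52, so
# _SCATTER_SHIFT = 13 and _MAX_SEQUENTIAL_ID + 1 == 2**52):
#
#   >>> ToScatteredId(1)     # 2**52 + ReverseBitsInt64(1 << 13)
#   5629499534213120L
#   >>> IdToCounter(5629499534213120)
#   (1L, 'scattered')
#
# ReverseBitsInt64 is an involution, which is why IdToCounter can recover
# the counter without first subtracting the sequential-space offset: the
# offset bit (2**52) reverses onto bit 11, below the >> _SCATTER_SHIFT cut.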
4045 def CompareEntityPbByKey(a, b):
4046 """Compare two entity protobuf's by key.
4048 Args:
4049 a: entity_pb.EntityProto to compare
4050 b: entity_pb.EntityProto to compare
4051 Returns:
4052 <0 if a's key is before b's, =0 if they are the same key, and >0 otherwise.
4053 """
4054 return cmp(datastore_types.Key._FromPb(a.key()),
4055 datastore_types.Key._FromPb(b.key()))
4058 def _GuessOrders(filters, orders):
4059 """Guess any implicit ordering.
4061 The datastore gives a logical, but not necessarily predictable, ordering when
4062 orders are not completely explicit. This function guesses at that ordering
4063 (which is better than always ordering by __key__ for tests).
4065 Args:
4066 filters: The datastore_pb.Query_Filter that have already been normalized and
4067 checked.
4068 orders: The datastore_pb.Query_Order that have already been normalized and
4069 checked. Not mutated in place; a copy is extended and returned.
4070 """
4071 orders = orders[:]
4074 if not orders:
4075 for filter_pb in filters:
4076 if filter_pb.op() in datastore_index.INEQUALITY_OPERATORS:
4078 order = datastore_pb.Query_Order()
4079 order.set_property(filter_pb.property(0).name())
4080 orders.append(order)
4081 break
4084 exists_props = (filter_pb.property(0).name() for filter_pb in filters
4085 if filter_pb.op() == datastore_pb.Query_Filter.EXISTS)
4086 for prop in sorted(exists_props):
4087 order = datastore_pb.Query_Order()
4088 order.set_property(prop)
4089 orders.append(order)
4092 if not orders or orders[-1].property() != '__key__':
4093 order = datastore_pb.Query_Order()
4094 order.set_property('__key__')
4095 orders.append(order)
4096 return orders
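# Example: with no explicit orders, filters [age > 18] yield the guessed
# ordering [age, __key__]; filters [age > 18, name EXISTS] yield
# [age, name, __key__] (the inequality property first, then EXISTS
# properties sorted by name, then __key__ as the final tie-breaker).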
4099 def _MakeQuery(query_pb, filters, orders, filter_predicate):
4100 """Make a datastore_query.Query for the given datastore_pb.Query.
4102 Overrides filters and orders in query with the specified arguments.
4104 Args:
4105 query_pb: a datastore_pb.Query.
4106 filters: the filters from query.
4107 orders: the orders from query.
4108 filter_predicate: an additional filter of type
4109 datastore_query.FilterPredicate. This is passed along to implement V4
4110 specific filters without changing the entire stub.
4112 Returns:
4113 A datastore_query.Query for the datastore_pb.Query."""
4119 clone_pb = datastore_pb.Query()
4120 clone_pb.CopyFrom(query_pb)
4121 clone_pb.clear_filter()
4122 clone_pb.clear_order()
4123 clone_pb.filter_list().extend(filters)
4124 clone_pb.order_list().extend(orders)
4126 query = datastore_query.Query._from_pb(clone_pb)
4128 assert datastore_v4_pb.CompositeFilter._Operator_NAMES.values() == ['AND']
4133 if filter_predicate is not None:
4134 if query.filter_predicate is not None:
4137 filter_predicate = datastore_query.CompositeFilter(
4138 datastore_query.CompositeFilter.AND,
4139 [filter_predicate, query.filter_predicate])
4141 return datastore_query.Query(app=query.app,
4142 namespace=query.namespace,
4143 ancestor=query.ancestor,
4144 filter_predicate=filter_predicate,
4145 group_by=query.group_by,
4146 order=query.order)
4147 else:
4148 return query
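# When both the query and the caller supply a predicate, the stub-level
# filter_predicate is ANDed with the query's own predicate above; the
# assert on CompositeFilter guards the assumption that AND is the only
# composite operator v4 defines.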
4150 def _CreateIndexEntities(entity, postfix_props):
4151 """Creates entities for index values that would appear in prodcution.
4153 This function finds all multi-valued properties listed in split_props, and
4154 creates a new entity for each unique combination of values. The resulting
4155 entities will only have a single value for each property listed in
4156 split_props.
4158 It reserves the right to include index data that would not be
4159 seen in production, e.g. by returning the original entity when no splitting
4160 is needed. LoadEntity will remove any excess fields.
4162 This simulates the results seen by an index scan in the datastore.
4164 Args:
4165 entity: The entity_pb.EntityProto to split.
4166 postfix_props: A set of property names to split on.
4168 Returns:
4169 A list of the split entity_pb.EntityProtos.
4170 """
4171 to_split = {}
4172 split_required = False
4173 base_props = []
4174 for prop in entity.property_list():
4175 if prop.name() in postfix_props:
4176 values = to_split.get(prop.name())
4177 if values is None:
4178 values = []
4179 to_split[prop.name()] = values
4180 else:
4182 split_required = True
4183 if prop.value() not in values:
4184 values.append(prop.value())
4185 else:
4186 base_props.append(prop)
4188 if not split_required:
4190 return [entity]
4192 clone = entity_pb.EntityProto()
4193 clone.CopyFrom(entity)
4194 clone.clear_property()
4195 clone.property_list().extend(base_props)
4196 results = [clone]
4198 for name, splits in to_split.iteritems():
4199 if len(splits) == 1:
4201 for result in results:
4202 prop = result.add_property()
4203 prop.set_name(name)
4204 prop.set_multiple(False)
4205 prop.set_meaning(entity_pb.Property.INDEX_VALUE)
4206 prop.mutable_value().CopyFrom(splits[0])
4207 continue
4209 new_results = []
4210 for result in results:
4211 for split in splits:
4212 clone = entity_pb.EntityProto()
4213 clone.CopyFrom(result)
4214 prop = clone.add_property()
4215 prop.set_name(name)
4216 prop.set_multiple(False)
4217 prop.set_meaning(entity_pb.Property.INDEX_VALUE)
4218 prop.mutable_value().CopyFrom(split)
4219 new_results.append(clone)
4220 results = new_results
4221 return results
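# Worked example: splitting an entity with properties tag=['a', 'b']
# (multiple) and color='red' on postfix_props={'tag'} produces two index
# entities, [tag='a', color='red'] and [tag='b', color='red'], each with
# tag marked single-valued and given the INDEX_VALUE meaning. If no listed
# property repeats, the entity is returned unsplit.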
4224 def _CreateIndexOnlyQueryResults(results, postfix_props):
4225 """Creates a result set similar to that returned by an index only query."""
4226 new_results = []
4227 for result in results:
4228 new_results.extend(_CreateIndexEntities(result, postfix_props))
4229 return new_results
4232 def _ExecuteQuery(results, query, filters, orders, index_list,
4233 filter_predicate=None):
4234 """Executes the query on a superset of its results.
4236 Args:
4237 results: superset of results for query.
4238 query: a datastore_pb.Query.
4239 filters: the filters from query.
4240 orders: the orders from query.
4241 index_list: the list of indexes used by the query.
4242 filter_predicate: an additional filter of type
4243 datastore_query.FilterPredicate. This is passed along to implement V4
4244 specific filters without changing the entire stub.
4246 Returns:
4247 A ListCursor over the results of applying query to results.
4248 """
4249 orders = _GuessOrders(filters, orders)
4250 dsquery = _MakeQuery(query, filters, orders, filter_predicate)
4252 if query.property_name_size():
4253 results = _CreateIndexOnlyQueryResults(
4254 results, set(order.property() for order in orders))
4256 return ListCursor(query, dsquery, orders, index_list,
4257 datastore_query.apply_query(dsquery, results))
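# Note: a nonzero query.property_name_size() means a projection query, in
# which case the candidate entities are first expanded into index-style
# entities (split on the order properties) so that apply_query sees one
# row per index entry, as a production index scan would.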
4260 def _UpdateCost(cost, entity_writes, index_writes):
4261 """Updates the provided cost.
4263 Args:
4264 cost: Out param. The cost object to update.
4265 entity_writes: The number of entity writes to add.
4266 index_writes: The number of index writes to add.
4267 """
4268 cost.set_entity_writes(cost.entity_writes() + entity_writes)
4269 cost.set_index_writes(cost.index_writes() + index_writes)
4272 def _CalculateWriteOps(composite_indexes, old_entity, new_entity):
4273 """Determines number of entity and index writes needed to write new_entity.
4275 We assume that old_entity represents the current state of the Datastore.
4277 Args:
4278 composite_indexes: The composite_indexes for the kind of the entities.
4279 old_entity: Entity representing the current state in the Datastore.
4280 new_entity: Entity representing the desired state in the Datastore.
4282 Returns:
4283 A tuple of size 2, where the first value is the number of entity writes and
4284 the second value is the number of index writes.
4285 """
4286 if (old_entity is not None and
4287 old_entity.property_list() == new_entity.property_list()
4288 and old_entity.raw_property_list() == new_entity.raw_property_list()):
4289 return 0, 0
4291 index_writes = _ChangedIndexRows(composite_indexes, old_entity, new_entity)
4292 if old_entity is None:
4296 index_writes += 1
4298 return 1, index_writes
4301 def _ChangedIndexRows(composite_indexes, old_entity, new_entity):
4302 """Determine the number of index rows that need to change.
4304 We assume that old_entity represents the current state of the Datastore.
4306 Args:
4307 composite_indexes: The composite_indexes for the kind of the entities.
4308 old_entity: Entity representing the current state in the Datastore.
4309 new_entity: Entity representing the desired state in the Datastore.
4311 Returns:
4312 The number of index rows that need to change.
4313 """
4317 unique_old_properties = collections.defaultdict(set)
4322 unique_new_properties = collections.defaultdict(set)
4324 if old_entity is not None:
4325 for old_prop in old_entity.property_list():
4326 _PopulateUniquePropertiesSet(old_prop, unique_old_properties)
4329 unchanged = collections.defaultdict(int)
4331 for new_prop in new_entity.property_list():
4332 new_prop_as_str = _PopulateUniquePropertiesSet(
4333 new_prop, unique_new_properties)
4334 if new_prop_as_str in unique_old_properties[new_prop.name()]:
4335 unchanged[new_prop.name()] += 1
4340 all_property_names = set(unique_old_properties.iterkeys())
4341 all_property_names.update(unique_new_properties.iterkeys())
4342 all_property_names.update(unchanged.iterkeys())
4344 all_indexes = _GetEntityByPropertyIndexes(all_property_names)
4345 all_indexes.extend([comp.definition() for comp in composite_indexes])
4346 path_size = new_entity.key().path().element_size()
4347 writes = 0
4348 for index in all_indexes:
4352 ancestor_multiplier = 1
4353 if index.ancestor() and index.property_size() > 1:
4354 ancestor_multiplier = path_size
4355 writes += (_CalculateWritesForCompositeIndex(
4356 index, unique_old_properties, unique_new_properties, unchanged) *
4357 ancestor_multiplier)
4358 return writes
4361 def _PopulateUniquePropertiesSet(prop, unique_properties):
4362 """Populates a set containing unique properties.
4364 Args:
4365 prop: An entity property.
4366 unique_properties: Dictionary mapping property names to a set of unique
4367 properties.
4369 Returns:
4370 The property pb in string (hashable) form.
4371 """
4372 if prop.multiple():
4373 prop = _CopyAndSetMultipleToFalse(prop)
4376 prop_as_str = prop.SerializePartialToString()
4377 unique_properties[prop.name()].add(prop_as_str)
4378 return prop_as_str
4381 def _CalculateWritesForCompositeIndex(index, unique_old_properties,
4382 unique_new_properties,
4383 common_properties):
4384 """Calculate the number of writes required to maintain a specific Index.
4386 Args:
4387 index: The composite index.
4388 unique_old_properties: Dictionary mapping property names to a set of props
4389 present on the old entity.
4390 unique_new_properties: Dictionary mapping property names to a set of props
4391 present on the new entity.
4392 common_properties: Dictionary mapping property names to the number of
4393 properties with that name that are present on both the old and new
4394 entities.
4396 Returns:
4397 The number of writes required to maintain the provided index.
4398 """
4399 old_count = 1
4400 new_count = 1
4401 common_count = 1
4402 for prop in index.property_list():
4403 old_count *= len(unique_old_properties[prop.name()])
4404 new_count *= len(unique_new_properties[prop.name()])
4405 common_count *= common_properties[prop.name()]
4407 return (old_count - common_count) + (new_count - common_count)
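# Illustrative arithmetic for a composite index on (tag, color): old entity
# tag=['a', 'b'], color='red'; new entity tag=['b', 'c'], color='red'.
# Then old_count = 2*1, new_count = 2*1, common_count = 1*1 (only
# tag='b', color='red' survives unchanged), so (2-1) + (2-1) = 2 index rows
# change: one delete and one write.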
4410 def _GetEntityByPropertyIndexes(all_property_names):
4411 indexes = []
4412 for prop_name in all_property_names:
4413 indexes.append(
4414 _SinglePropertyIndex(prop_name, entity_pb.Index_Property.ASCENDING))
4415 indexes.append(
4416 _SinglePropertyIndex(prop_name, entity_pb.Index_Property.DESCENDING))
4417 return indexes
4420 def _SinglePropertyIndex(prop_name, direction):
4421 """Creates a single property Index for the given name and direction.
4423 Args:
4424 prop_name: The name of the single property on the Index.
4425 direction: The direction of the Index.
4427 Returns:
4428 A single property Index with the given property and direction.
4429 """
4430 index = entity_pb.Index()
4431 prop = index.add_property()
4432 prop.set_name(prop_name)
4433 prop.set_direction(direction)
4434 return index
4437 def _CopyAndSetMultipleToFalse(prop):
4438 """Copy the provided Property and set its "multiple" attribute to False.
4440 Args:
4441 prop: The Property to copy.
4443 Returns:
4444 A copy of the given Property with its "multiple" attribute set to False.
4445 """
4451 prop_copy = entity_pb.Property()
4452 prop_copy.MergeFrom(prop)
4453 prop_copy.set_multiple(False)
4454 return prop_copy
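# The "multiple" bit is cleared (presumably) so that the serialized form
# used as a set key in _PopulateUniquePropertiesSet compares equal for
# identical values regardless of whether the property happened to be
# multi-valued, since each value is indexed as its own row either way.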
4457 def _SetStartInclusive(position, first_direction):
4458 """Sets the start_inclusive field in position.
4460 Args:
4461 position: datastore_pb.Position
4462 first_direction: the first sort order from the query
4463 (a datastore_pb.Query_Order) or None
4464 """
4465 position.set_start_inclusive(
4466 position.before_ascending()
4467 != (first_direction == datastore_pb.Query_Order.DESCENDING))
4470 def _SetBeforeAscending(position, first_direction):
4471 """Sets the before_ascending field in position.
4473 Args:
4474 position: datastore_pb.Position
4475 first_direction: the first sort order from the query
4476 (a datastore_pb.Query_Order) or None
4477 """
4478 position.set_before_ascending(
4479 position.start_inclusive()
4480 != (first_direction == datastore_pb.Query_Order.DESCENDING))
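# Both helpers compute field = other_field XOR (first order is DESCENDING),
# so they are inverses of each other: with an ascending (or absent) first
# sort order, start_inclusive simply mirrors before_ascending, while a
# descending first order flips it.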